From 23a1e08e90943936c8cc2daa32b8463dbf797def Mon Sep 17 00:00:00 2001 From: Lucas Clemente Date: Mon, 25 Apr 2016 17:50:35 +0200 Subject: [PATCH] make stream private --- example/main.go | 7 +- handshake/crypto_setup_test.go | 4 + session.go | 28 +++--- session_test.go | 5 +- stream.go | 104 +++++++++++----------- stream_test.go | 152 ++++++++++++++++----------------- utils/utils.go | 1 + 7 files changed, 155 insertions(+), 146 deletions(-) diff --git a/example/main.go b/example/main.go index acf81d79..ec24d0cc 100644 --- a/example/main.go +++ b/example/main.go @@ -12,6 +12,7 @@ import ( "github.com/lucas-clemente/quic-go" "github.com/lucas-clemente/quic-go/protocol" + "github.com/lucas-clemente/quic-go/utils" ) var supportedVersions = map[protocol.VersionNumber]bool{ @@ -40,8 +41,8 @@ func main() { type responseWriter struct { session *quic.Session dataStreamID protocol.StreamID - headerStream *quic.Stream - dataStream *quic.Stream + headerStream utils.Stream + dataStream utils.Stream header http.Header headerWritten bool @@ -105,7 +106,7 @@ func (w *responseWriter) Write(p []byte) (int, error) { return 0, nil } -func handleStream(session *quic.Session, headerStream *quic.Stream) { +func handleStream(session *quic.Session, headerStream utils.Stream) { hpackDecoder := hpack.NewDecoder(4096, nil) h2framer := http2.NewFramer(nil, headerStream) diff --git a/handshake/crypto_setup_test.go b/handshake/crypto_setup_test.go index 215b09c5..f808a9ee 100644 --- a/handshake/crypto_setup_test.go +++ b/handshake/crypto_setup_test.go @@ -78,6 +78,10 @@ func (s *mockStream) Write(p []byte) (int, error) { return s.dataWritten.Write(p) } +func (s *mockStream) Close() error { + panic("not implemented") +} + var _ = Describe("Crypto setup", func() { var ( kex *mockKEX diff --git a/session.go b/session.go index c040d191..100725aa 100644 --- a/session.go +++ b/session.go @@ -12,6 +12,7 @@ import ( "github.com/lucas-clemente/quic-go/frames" "github.com/lucas-clemente/quic-go/handshake" "github.com/lucas-clemente/quic-go/protocol" + "github.com/lucas-clemente/quic-go/utils" ) type receivedPacket struct { @@ -21,7 +22,7 @@ type receivedPacket struct { } // StreamCallback gets a stream frame and returns a reply frame -type StreamCallback func(*Session, *Stream) +type StreamCallback func(*Session, utils.Stream) // A Session is a QUIC session type Session struct { @@ -30,7 +31,7 @@ type Session struct { connection *net.UDPConn currentRemoteAddr *net.UDPAddr - streams map[protocol.StreamID]*Stream + streams map[protocol.StreamID]*stream streamsMutex sync.RWMutex outgoingAckHandler ackhandler.OutgoingPacketAckHandler @@ -47,7 +48,7 @@ func NewSession(conn *net.UDPConn, v protocol.VersionNumber, connectionID protoc session := &Session{ connection: conn, streamCallback: streamCallback, - streams: make(map[protocol.StreamID]*Stream), + streams: make(map[protocol.StreamID]*stream), outgoingAckHandler: ackhandler.NewOutgoingPacketAckHandler(), incomingAckHandler: ackhandler.NewIncomingPacketAckHandler(), receivedPackets: make(chan receivedPacket), @@ -133,21 +134,22 @@ func (s *Session) handleStreamFrame(frame *frames.StreamFrame) error { return errors.New("Session: 0 is not a valid Stream ID") } s.streamsMutex.RLock() - stream, existingStream := s.streams[frame.StreamID] + str, streamExists := s.streams[frame.StreamID] s.streamsMutex.RUnlock() - if !existingStream { - stream, _ = s.NewStream(frame.StreamID) + if !streamExists { + ss, _ := s.NewStream(frame.StreamID) + str = ss.(*stream) } - if stream == nil { + if str == nil { return errors.New("Session: reopening streams is not allowed") } - err := stream.AddStreamFrame(frame) + err := str.AddStreamFrame(frame) if err != nil { return err } - if !existingStream { - s.streamCallback(s, stream) + if !streamExists { + s.streamCallback(s, str) } return nil } @@ -193,11 +195,11 @@ func (s *Session) QueueFrame(frame frames.Frame) error { return nil } -// NewStream creates a new strean open for reading and writing -func (s *Session) NewStream(id protocol.StreamID) (*Stream, error) { +// NewStream creates a new stream open for reading and writing +func (s *Session) NewStream(id protocol.StreamID) (utils.Stream, error) { s.streamsMutex.Lock() defer s.streamsMutex.Unlock() - stream := NewStream(s, id) + stream := newStream(s, id) if s.streams[id] != nil { return nil, fmt.Errorf("Session: stream with ID %d already exists", id) } diff --git a/session_test.go b/session_test.go index fd228082..de0e7369 100644 --- a/session_test.go +++ b/session_test.go @@ -8,6 +8,7 @@ import ( "github.com/lucas-clemente/quic-go/frames" "github.com/lucas-clemente/quic-go/protocol" + "github.com/lucas-clemente/quic-go/utils" ) var _ = Describe("Session", func() { @@ -19,8 +20,8 @@ var _ = Describe("Session", func() { BeforeEach(func() { callbackCalled = false session = &Session{ - streams: make(map[protocol.StreamID]*Stream), - streamCallback: func(*Session, *Stream) { callbackCalled = true }, + streams: make(map[protocol.StreamID]*stream), + streamCallback: func(*Session, utils.Stream) { callbackCalled = true }, } }) diff --git a/stream.go b/stream.go index 3aaf03ba..035745b3 100644 --- a/stream.go +++ b/stream.go @@ -15,66 +15,66 @@ type streamHandler interface { } // A Stream assembles the data from StreamFrames and provides a super-convenient Read-Interface -type Stream struct { - Session streamHandler - StreamID protocol.StreamID - StreamFrames chan *frames.StreamFrame - CurrentFrame *frames.StreamFrame - ReadPosInFrame int - WriteOffset uint64 - ReadOffset uint64 +type stream struct { + session streamHandler + streamID protocol.StreamID + streamFrames chan *frames.StreamFrame + currentFrame *frames.StreamFrame + readPosInFrame int + writeOffset uint64 + readOffset uint64 frameQueue []*frames.StreamFrame // TODO: replace with heap currentErr error } -// NewStream creates a new Stream -func NewStream(session streamHandler, StreamID protocol.StreamID) *Stream { - return &Stream{ - Session: session, - StreamID: StreamID, - StreamFrames: make(chan *frames.StreamFrame, 8), // ToDo: add config option for this number +// newStream creates a new Stream +func newStream(session streamHandler, StreamID protocol.StreamID) *stream { + return &stream{ + session: session, + streamID: StreamID, + streamFrames: make(chan *frames.StreamFrame, 8), // ToDo: add config option for this number } } // Read reads data -func (s *Stream) Read(p []byte) (int, error) { +func (s *stream) Read(p []byte) (int, error) { bytesRead := 0 for bytesRead < len(p) { - if s.CurrentFrame == nil { + if s.currentFrame == nil { var err error - s.CurrentFrame, err = s.getNextFrameInOrder(bytesRead == 0) + s.currentFrame, err = s.getNextFrameInOrder(bytesRead == 0) if err != nil { return bytesRead, err } - if s.CurrentFrame == nil { + if s.currentFrame == nil { return bytesRead, nil } - s.ReadPosInFrame = 0 + s.readPosInFrame = 0 } - m := utils.Min(len(p)-bytesRead, len(s.CurrentFrame.Data)-s.ReadPosInFrame) - copy(p[bytesRead:], s.CurrentFrame.Data[s.ReadPosInFrame:]) - s.ReadPosInFrame += m + m := utils.Min(len(p)-bytesRead, len(s.currentFrame.Data)-s.readPosInFrame) + copy(p[bytesRead:], s.currentFrame.Data[s.readPosInFrame:]) + s.readPosInFrame += m bytesRead += m - s.ReadOffset += uint64(m) - if s.ReadPosInFrame >= len(s.CurrentFrame.Data) { - if s.CurrentFrame.FinBit { + s.readOffset += uint64(m) + if s.readPosInFrame >= len(s.currentFrame.Data) { + if s.currentFrame.FinBit { s.currentErr = io.EOF - close(s.StreamFrames) - s.CurrentFrame = nil - s.Session.closeStream(s.StreamID) + close(s.streamFrames) + s.currentFrame = nil + s.session.closeStream(s.streamID) return bytesRead, io.EOF } - s.CurrentFrame = nil + s.currentFrame = nil } } return bytesRead, nil } -func (s *Stream) getNextFrameInOrder(wait bool) (*frames.StreamFrame, error) { +func (s *stream) getNextFrameInOrder(wait bool) (*frames.StreamFrame, error) { // First, check the queue for i, f := range s.frameQueue { - if f.Offset == s.ReadOffset { + if f.Offset == s.readOffset { // Move last element into position i s.frameQueue[i] = s.frameQueue[len(s.frameQueue)-1] s.frameQueue = s.frameQueue[:len(s.frameQueue)-1] @@ -91,12 +91,12 @@ func (s *Stream) getNextFrameInOrder(wait bool) (*frames.StreamFrame, error) { return nil, nil } - if nextFrameFromChannel.Offset == s.ReadOffset { + if nextFrameFromChannel.Offset == s.readOffset { return nextFrameFromChannel, nil } // Discard if we already know it - if nextFrameFromChannel.Offset < s.ReadOffset { + if nextFrameFromChannel.Offset < s.readOffset { continue } @@ -105,15 +105,15 @@ func (s *Stream) getNextFrameInOrder(wait bool) (*frames.StreamFrame, error) { } } -func (s *Stream) nextFrameInChan(blocking bool) (f *frames.StreamFrame, err error) { +func (s *stream) nextFrameInChan(blocking bool) (f *frames.StreamFrame, err error) { var ok bool if blocking { select { - case f, ok = <-s.StreamFrames: + case f, ok = <-s.streamFrames: } } else { select { - case f, ok = <-s.StreamFrames: + case f, ok = <-s.streamFrames: default: } } @@ -124,48 +124,48 @@ func (s *Stream) nextFrameInChan(blocking bool) (f *frames.StreamFrame, err erro } // ReadByte implements io.ByteReader -func (s *Stream) ReadByte() (byte, error) { +func (s *stream) ReadByte() (byte, error) { // TODO: Optimize p := make([]byte, 1) _, err := io.ReadFull(s, p) return p[0], err } -func (s *Stream) Write(p []byte) (int, error) { +func (s *stream) Write(p []byte) (int, error) { data := make([]byte, len(p)) copy(data, p) - err := s.Session.QueueFrame(&frames.StreamFrame{ - StreamID: s.StreamID, - Offset: s.WriteOffset, + err := s.session.QueueFrame(&frames.StreamFrame{ + StreamID: s.streamID, + Offset: s.writeOffset, Data: data, }) if err != nil { return 0, err } - s.WriteOffset += uint64(len(p)) + s.writeOffset += uint64(len(p)) return len(p), nil } // Close implements io.Closer -func (s *Stream) Close() error { - fmt.Printf("Closing stream %d\n", s.StreamID) - return s.Session.QueueFrame(&frames.StreamFrame{ - StreamID: s.StreamID, - Offset: s.WriteOffset, +func (s *stream) Close() error { + fmt.Printf("Closing stream %d\n", s.streamID) + return s.session.QueueFrame(&frames.StreamFrame{ + StreamID: s.streamID, + Offset: s.writeOffset, FinBit: true, }) } // AddStreamFrame adds a new stream frame -func (s *Stream) AddStreamFrame(frame *frames.StreamFrame) error { - s.StreamFrames <- frame +func (s *stream) AddStreamFrame(frame *frames.StreamFrame) error { + s.streamFrames <- frame return nil } // RegisterError is called by session to indicate that an error occured and the // stream should be closed. -func (s *Stream) RegisterError(err error) { +func (s *stream) RegisterError(err error) { s.currentErr = err - s.Session.closeStream(s.StreamID) - close(s.StreamFrames) + s.session.closeStream(s.streamID) + close(s.streamFrames) } diff --git a/stream_test.go b/stream_test.go index fb9dab20..37819343 100644 --- a/stream_test.go +++ b/stream_test.go @@ -27,13 +27,13 @@ func (m *mockStreamHandler) closeStream(protocol.StreamID) { var _ = Describe("Stream", func() { var ( - stream *Stream + str *stream handler *mockStreamHandler ) BeforeEach(func() { handler = &mockStreamHandler{} - stream = NewStream(handler, 1337) + str = newStream(handler, 1337) }) Context("reading", func() { @@ -42,9 +42,9 @@ var _ = Describe("Stream", func() { Offset: 0, Data: []byte{0xDE, 0xAD, 0xBE, 0xEF}, } - stream.AddStreamFrame(&frame) + str.AddStreamFrame(&frame) b := make([]byte, 4) - n, err := stream.Read(b) + n, err := str.Read(b) Expect(err).ToNot(HaveOccurred()) Expect(n).To(Equal(4)) Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF})) @@ -55,13 +55,13 @@ var _ = Describe("Stream", func() { Offset: 0, Data: []byte{0xDE, 0xAD, 0xBE, 0xEF}, } - stream.AddStreamFrame(&frame) + str.AddStreamFrame(&frame) b := make([]byte, 2) - n, err := stream.Read(b) + n, err := str.Read(b) Expect(err).ToNot(HaveOccurred()) Expect(n).To(Equal(2)) Expect(b).To(Equal([]byte{0xDE, 0xAD})) - n, err = stream.Read(b) + n, err = str.Read(b) Expect(err).ToNot(HaveOccurred()) Expect(n).To(Equal(2)) Expect(b).To(Equal([]byte{0xBE, 0xEF})) @@ -72,17 +72,17 @@ var _ = Describe("Stream", func() { Offset: 0, Data: []byte{0xDE, 0xAD, 0xBE, 0xEF}, } - stream.AddStreamFrame(&frame) - b, err := stream.ReadByte() + str.AddStreamFrame(&frame) + b, err := str.ReadByte() Expect(err).ToNot(HaveOccurred()) Expect(b).To(Equal(byte(0xDE))) - b, err = stream.ReadByte() + b, err = str.ReadByte() Expect(err).ToNot(HaveOccurred()) Expect(b).To(Equal(byte(0xAD))) - b, err = stream.ReadByte() + b, err = str.ReadByte() Expect(err).ToNot(HaveOccurred()) Expect(b).To(Equal(byte(0xBE))) - b, err = stream.ReadByte() + b, err = str.ReadByte() Expect(err).ToNot(HaveOccurred()) Expect(b).To(Equal(byte(0xEF))) }) @@ -96,10 +96,10 @@ var _ = Describe("Stream", func() { Offset: 2, Data: []byte{0xBE, 0xEF}, } - stream.AddStreamFrame(&frame1) - stream.AddStreamFrame(&frame2) + str.AddStreamFrame(&frame1) + str.AddStreamFrame(&frame2) b := make([]byte, 6) - n, err := stream.Read(b) + n, err := str.Read(b) Expect(err).ToNot(HaveOccurred()) Expect(n).To(Equal(4)) Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF, 0x00, 0x00})) @@ -114,10 +114,10 @@ var _ = Describe("Stream", func() { Offset: 2, Data: []byte{0xBE, 0xEF}, } - stream.AddStreamFrame(&frame1) - stream.AddStreamFrame(&frame2) + str.AddStreamFrame(&frame1) + str.AddStreamFrame(&frame2) b := make([]byte, 4) - n, err := stream.Read(b) + n, err := str.Read(b) Expect(err).ToNot(HaveOccurred()) Expect(n).To(Equal(4)) Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF})) @@ -130,10 +130,10 @@ var _ = Describe("Stream", func() { Data: []byte{0xDE, 0xAD}, } time.Sleep(time.Millisecond) - stream.AddStreamFrame(&frame) + str.AddStreamFrame(&frame) }() b := make([]byte, 2) - n, err := stream.Read(b) + n, err := str.Read(b) Expect(err).ToNot(HaveOccurred()) Expect(n).To(Equal(2)) }) @@ -147,10 +147,10 @@ var _ = Describe("Stream", func() { Offset: 0, Data: []byte{0xDE, 0xAD}, } - stream.AddStreamFrame(&frame1) - stream.AddStreamFrame(&frame2) + str.AddStreamFrame(&frame1) + str.AddStreamFrame(&frame2) b := make([]byte, 4) - n, err := stream.Read(b) + n, err := str.Read(b) Expect(err).ToNot(HaveOccurred()) Expect(n).To(Equal(4)) Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF})) @@ -169,17 +169,17 @@ var _ = Describe("Stream", func() { Offset: 2, Data: []byte{0xBE, 0xEF}, } - stream.AddStreamFrame(&frame1) - stream.AddStreamFrame(&frame2) - stream.AddStreamFrame(&frame3) + str.AddStreamFrame(&frame1) + str.AddStreamFrame(&frame2) + str.AddStreamFrame(&frame3) b := make([]byte, 4) - n, err := stream.Read(b) + n, err := str.Read(b) Expect(err).ToNot(HaveOccurred()) Expect(n).To(Equal(4)) Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF})) }) - It("discards unneeded stream frames", func() { + It("discards unneeded str frames", func() { frame1 := frames.StreamFrame{ Offset: 0, Data: []byte{0xDE, 0xAD}, @@ -192,11 +192,11 @@ var _ = Describe("Stream", func() { Offset: 2, Data: []byte{0xBE, 0xEF}, } - stream.AddStreamFrame(&frame1) - stream.AddStreamFrame(&frame2) - stream.AddStreamFrame(&frame3) + str.AddStreamFrame(&frame1) + str.AddStreamFrame(&frame2) + str.AddStreamFrame(&frame3) b := make([]byte, 4) - n, err := stream.Read(b) + n, err := str.Read(b) Expect(err).ToNot(HaveOccurred()) Expect(n).To(Equal(4)) Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF})) @@ -204,8 +204,8 @@ var _ = Describe("Stream", func() { }) Context("writing", func() { - It("writes stream frames", func() { - n, err := stream.Write([]byte("foobar")) + It("writes str frames", func() { + n, err := str.Write([]byte("foobar")) Expect(err).ToNot(HaveOccurred()) Expect(n).To(Equal(6)) Expect(handler.frames).To(HaveLen(1)) @@ -215,11 +215,11 @@ var _ = Describe("Stream", func() { })) }) - It("writes multiple stream frames", func() { - n, err := stream.Write([]byte("foo")) + It("writes multiple str frames", func() { + n, err := str.Write([]byte("foo")) Expect(err).ToNot(HaveOccurred()) Expect(n).To(Equal(3)) - n, err = stream.Write([]byte("bar")) + n, err = str.Write([]byte("bar")) Expect(err).ToNot(HaveOccurred()) Expect(n).To(Equal(3)) Expect(handler.frames).To(HaveLen(2)) @@ -235,7 +235,7 @@ var _ = Describe("Stream", func() { }) It("closes", func() { - err := stream.Close() + err := str.Close() Expect(err).ToNot(HaveOccurred()) Expect(handler.frames).To(HaveLen(1)) Expect(handler.frames[0]).To(Equal(&frames.StreamFrame{ @@ -246,13 +246,13 @@ var _ = Describe("Stream", func() { }) }) - Context("getting next stream frame", func() { + Context("getting next str frame", func() { It("gets next frame", func() { - stream.AddStreamFrame(&frames.StreamFrame{ + str.AddStreamFrame(&frames.StreamFrame{ Offset: 0, Data: []byte{0xDE, 0xAD}, }) - f, err := stream.getNextFrameInOrder(true) + f, err := str.getNextFrameInOrder(true) Expect(err).ToNot(HaveOccurred()) Expect(f.Data).To(Equal([]byte{0xDE, 0xAD})) }) @@ -262,80 +262,80 @@ var _ = Describe("Stream", func() { go func() { time.Sleep(time.Millisecond) b = true - stream.AddStreamFrame(&frames.StreamFrame{ + str.AddStreamFrame(&frames.StreamFrame{ Offset: 0, Data: []byte{0xDE, 0xAD}, }) }() - f, err := stream.getNextFrameInOrder(true) + f, err := str.getNextFrameInOrder(true) Expect(err).ToNot(HaveOccurred()) Expect(b).To(BeTrue()) Expect(f.Data).To(Equal([]byte{0xDE, 0xAD})) }) - It("queues non-matching stream frames", func() { + It("queues non-matching str frames", func() { var b bool - stream.AddStreamFrame(&frames.StreamFrame{ + str.AddStreamFrame(&frames.StreamFrame{ Offset: 2, Data: []byte{0xBE, 0xEF}, }) go func() { time.Sleep(time.Millisecond) b = true - stream.AddStreamFrame(&frames.StreamFrame{ + str.AddStreamFrame(&frames.StreamFrame{ Offset: 0, Data: []byte{0xDE, 0xAD}, }) }() - f, err := stream.getNextFrameInOrder(true) + f, err := str.getNextFrameInOrder(true) Expect(err).ToNot(HaveOccurred()) Expect(b).To(BeTrue()) Expect(f.Data).To(Equal([]byte{0xDE, 0xAD})) - stream.ReadOffset += 2 - f, err = stream.getNextFrameInOrder(true) + str.readOffset += 2 + f, err = str.getNextFrameInOrder(true) Expect(err).ToNot(HaveOccurred()) Expect(f.Data).To(Equal([]byte{0xBE, 0xEF})) }) It("returns nil if non-blocking", func() { - Expect(stream.getNextFrameInOrder(false)).To(BeNil()) + Expect(str.getNextFrameInOrder(false)).To(BeNil()) }) It("returns properly if non-blocking", func() { - stream.AddStreamFrame(&frames.StreamFrame{ + str.AddStreamFrame(&frames.StreamFrame{ Offset: 0, Data: []byte{0xDE, 0xAD}, }) - Expect(stream.getNextFrameInOrder(false)).ToNot(BeNil()) + Expect(str.getNextFrameInOrder(false)).ToNot(BeNil()) }) It("dequeues 3rd frame after blocking on 1st", func() { - stream.AddStreamFrame(&frames.StreamFrame{ + str.AddStreamFrame(&frames.StreamFrame{ Offset: 4, Data: []byte{0x23, 0x42}, }) - stream.AddStreamFrame(&frames.StreamFrame{ + str.AddStreamFrame(&frames.StreamFrame{ Offset: 2, Data: []byte{0xBE, 0xEF}, }) go func() { time.Sleep(time.Millisecond) - stream.AddStreamFrame(&frames.StreamFrame{ + str.AddStreamFrame(&frames.StreamFrame{ Offset: 0, Data: []byte{0xDE, 0xAD}, }) }() - Expect(stream.getNextFrameInOrder(true)).ToNot(BeNil()) - stream.ReadOffset += 2 - Expect(stream.getNextFrameInOrder(true)).ToNot(BeNil()) - stream.ReadOffset += 2 - Expect(stream.getNextFrameInOrder(true)).ToNot(BeNil()) + Expect(str.getNextFrameInOrder(true)).ToNot(BeNil()) + str.readOffset += 2 + Expect(str.getNextFrameInOrder(true)).ToNot(BeNil()) + str.readOffset += 2 + Expect(str.getNextFrameInOrder(true)).ToNot(BeNil()) }) }) Context("closing", func() { AfterEach(func() { - Expect(stream.StreamFrames).To(BeClosed()) + Expect(str.streamFrames).To(BeClosed()) Expect(handler.closedStream).To(BeTrue()) }) @@ -346,13 +346,13 @@ var _ = Describe("Stream", func() { Data: []byte{0xDE, 0xAD, 0xBE, 0xEF}, FinBit: true, } - stream.AddStreamFrame(&frame) + str.AddStreamFrame(&frame) b := make([]byte, 4) - n, err := stream.Read(b) + n, err := str.Read(b) Expect(err).To(Equal(io.EOF)) Expect(n).To(Equal(4)) Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF})) - n, err = stream.Read(b) + n, err = str.Read(b) Expect(n).To(BeZero()) Expect(err).To(Equal(io.EOF)) }) @@ -367,14 +367,14 @@ var _ = Describe("Stream", func() { Offset: 0, Data: []byte{0xDE, 0xAD}, } - stream.AddStreamFrame(&frame1) - stream.AddStreamFrame(&frame2) + str.AddStreamFrame(&frame1) + str.AddStreamFrame(&frame2) b := make([]byte, 4) - n, err := stream.Read(b) + n, err := str.Read(b) Expect(err).To(Equal(io.EOF)) Expect(n).To(Equal(4)) Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF})) - n, err = stream.Read(b) + n, err = str.Read(b) Expect(n).To(BeZero()) Expect(err).To(Equal(io.EOF)) }) @@ -385,9 +385,9 @@ var _ = Describe("Stream", func() { Data: []byte{0xDE, 0xAD}, FinBit: true, } - stream.AddStreamFrame(&frame) + str.AddStreamFrame(&frame) b := make([]byte, 4) - n, err := stream.Read(b) + n, err := str.Read(b) Expect(err).To(Equal(io.EOF)) Expect(n).To(Equal(2)) Expect(b[:n]).To(Equal([]byte{0xDE, 0xAD})) @@ -399,9 +399,9 @@ var _ = Describe("Stream", func() { Data: []byte{}, FinBit: true, } - stream.AddStreamFrame(&frame) + str.AddStreamFrame(&frame) b := make([]byte, 4) - n, err := stream.Read(b) + n, err := str.Read(b) Expect(n).To(BeZero()) Expect(err).To(Equal(io.EOF)) }) @@ -415,14 +415,14 @@ var _ = Describe("Stream", func() { Offset: 0, Data: []byte{0xDE, 0xAD, 0xBE, 0xEF}, } - stream.AddStreamFrame(&frame) - stream.RegisterError(testErr) + str.AddStreamFrame(&frame) + str.RegisterError(testErr) b := make([]byte, 4) - n, err := stream.Read(b) + n, err := str.Read(b) Expect(err).ToNot(HaveOccurred()) Expect(n).To(Equal(4)) Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF})) - n, err = stream.Read(b) + n, err = str.Read(b) Expect(n).To(BeZero()) Expect(err).To(Equal(testErr)) }) diff --git a/utils/utils.go b/utils/utils.go index d103c01e..3e3c1338 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -17,6 +17,7 @@ type Stream interface { io.Reader io.ByteReader io.Writer + io.Closer } // ReadUintN reads N bytes