diff --git a/example/main.go b/example/main.go index 53d7d5bd..0dee5ecc 100644 --- a/example/main.go +++ b/example/main.go @@ -32,10 +32,9 @@ func main() { } } -func handleStream(frame *frames.StreamFrame) []frames.Frame { - h2r := bytes.NewReader(frame.Data) +func handleStream(stream *quic.Stream) []frames.Frame { var reply bytes.Buffer - h2framer := http2.NewFramer(&reply, h2r) + h2framer := http2.NewFramer(&reply, stream) h2framer.ReadMetaHeaders = hpack.NewDecoder(1024, nil) h2frame, err := h2framer.ReadFrame() if err != nil { @@ -55,7 +54,7 @@ func handleStream(frame *frames.StreamFrame) []frames.Frame { BlockFragment: replyHeaders.Bytes(), }) headerStreamFrame := &frames.StreamFrame{ - StreamID: frame.StreamID, + StreamID: stream.StreamID, Data: reply.Bytes(), FinBit: true, } diff --git a/session.go b/session.go index e29d493b..ba23368d 100644 --- a/session.go +++ b/session.go @@ -12,7 +12,7 @@ import ( ) // StreamCallback gets a stream frame and returns a reply frame -type StreamCallback func(*frames.StreamFrame) []frames.Frame +type StreamCallback func(*Stream) []frames.Frame // A Session is a QUIC session type Session struct { @@ -30,6 +30,8 @@ type Session struct { lastSentPacketNumber protocol.PacketNumber lastObservedPacketNumber protocol.PacketNumber + Streams map[protocol.StreamID]*Stream + streamCallback StreamCallback } @@ -43,6 +45,7 @@ func NewSession(conn *net.UDPConn, v protocol.VersionNumber, connectionID protoc cryptoSetup: handshake.NewCryptoSetup(connectionID, v, sCfg), streamCallback: streamCallback, lastObservedPacketNumber: 0, + Streams: make(map[protocol.StreamID]*Stream), } } @@ -115,7 +118,17 @@ func (s *Session) HandlePacket(addr *net.UDPAddr, publicHeaderBinary []byte, pub s.SendFrames([]frames.Frame{&frames.StreamFrame{StreamID: 1, Data: reply}}) } } else { - replyFrames := s.streamCallback(frame) + stream, ok := s.Streams[frame.StreamID] + if !ok { + stream = NewStream(frame.StreamID) + s.Streams[frame.StreamID] = stream + } + err := stream.AddStreamFrame(frame) + if err != nil { + return err + } + + replyFrames := s.streamCallback(stream) if replyFrames != nil { s.SendFrames(replyFrames) } diff --git a/stream.go b/stream.go index df52b54c..4dac00b9 100644 --- a/stream.go +++ b/stream.go @@ -2,19 +2,22 @@ package quic import ( "github.com/lucas-clemente/quic-go/frames" + "github.com/lucas-clemente/quic-go/protocol" "github.com/lucas-clemente/quic-go/utils" ) // A Stream assembles the data from StreamFrames and provides a super-convenient Read-Interface type Stream struct { + StreamID protocol.StreamID StreamFrames chan *frames.StreamFrame CurrentFrame *frames.StreamFrame ReadPosInFrame int } // NewStream creates a new Stream -func NewStream() *Stream { +func NewStream(StreamID protocol.StreamID) *Stream { return &Stream{ + StreamID: StreamID, StreamFrames: make(chan *frames.StreamFrame, 8), // ToDo: add config option for this number } } diff --git a/stream_test.go b/stream_test.go index 916e2ad0..c554e246 100644 --- a/stream_test.go +++ b/stream_test.go @@ -14,7 +14,7 @@ var _ = Describe("Stream", func() { Offset: 0, Data: []byte{0xDE, 0xAD, 0xBE, 0xEF}, } - stream := NewStream() + stream := NewStream(1337) stream.AddStreamFrame(&frame) b := make([]byte, 4) n, err := stream.Read(b) @@ -32,7 +32,7 @@ var _ = Describe("Stream", func() { Offset: 2, Data: []byte{0xBE, 0xEF}, } - stream := NewStream() + stream := NewStream(1337) stream.AddStreamFrame(&frame1) stream.AddStreamFrame(&frame2) b := make([]byte, 6) @@ -51,7 +51,7 @@ var _ = Describe("Stream", func() { Offset: 2, Data: []byte{0xBE, 0xEF}, } - stream := NewStream() + stream := NewStream(1337) stream.AddStreamFrame(&frame1) stream.AddStreamFrame(&frame2) b := make([]byte, 4) @@ -62,7 +62,7 @@ var _ = Describe("Stream", func() { }) It("waits until data is available", func() { - stream := NewStream() + stream := NewStream(1337) go func() { frame := frames.StreamFrame{ Offset: 0, @@ -86,7 +86,7 @@ var _ = Describe("Stream", func() { Offset: 1, Data: []byte{0xBE, 0xEF}, } - stream := NewStream() + stream := NewStream(1337) stream.AddStreamFrame(&frame1) err := stream.AddStreamFrame(&frame2) Expect(err).To(HaveOccurred())