use Stream object in the Session

This commit is contained in:
Marten Seemann 2016-04-17 00:19:23 +07:00
parent 66b52e915d
commit 3eabb4a46c
4 changed files with 27 additions and 12 deletions

View file

@ -32,10 +32,9 @@ func main() {
} }
} }
func handleStream(frame *frames.StreamFrame) []frames.Frame { func handleStream(stream *quic.Stream) []frames.Frame {
h2r := bytes.NewReader(frame.Data)
var reply bytes.Buffer var reply bytes.Buffer
h2framer := http2.NewFramer(&reply, h2r) h2framer := http2.NewFramer(&reply, stream)
h2framer.ReadMetaHeaders = hpack.NewDecoder(1024, nil) h2framer.ReadMetaHeaders = hpack.NewDecoder(1024, nil)
h2frame, err := h2framer.ReadFrame() h2frame, err := h2framer.ReadFrame()
if err != nil { if err != nil {
@ -55,7 +54,7 @@ func handleStream(frame *frames.StreamFrame) []frames.Frame {
BlockFragment: replyHeaders.Bytes(), BlockFragment: replyHeaders.Bytes(),
}) })
headerStreamFrame := &frames.StreamFrame{ headerStreamFrame := &frames.StreamFrame{
StreamID: frame.StreamID, StreamID: stream.StreamID,
Data: reply.Bytes(), Data: reply.Bytes(),
FinBit: true, FinBit: true,
} }

View file

@ -12,7 +12,7 @@ import (
) )
// StreamCallback gets a stream frame and returns a reply frame // StreamCallback gets a stream frame and returns a reply frame
type StreamCallback func(*frames.StreamFrame) []frames.Frame type StreamCallback func(*Stream) []frames.Frame
// A Session is a QUIC session // A Session is a QUIC session
type Session struct { type Session struct {
@ -30,6 +30,8 @@ type Session struct {
lastSentPacketNumber protocol.PacketNumber lastSentPacketNumber protocol.PacketNumber
lastObservedPacketNumber protocol.PacketNumber lastObservedPacketNumber protocol.PacketNumber
Streams map[protocol.StreamID]*Stream
streamCallback StreamCallback streamCallback StreamCallback
} }
@ -43,6 +45,7 @@ func NewSession(conn *net.UDPConn, v protocol.VersionNumber, connectionID protoc
cryptoSetup: handshake.NewCryptoSetup(connectionID, v, sCfg), cryptoSetup: handshake.NewCryptoSetup(connectionID, v, sCfg),
streamCallback: streamCallback, streamCallback: streamCallback,
lastObservedPacketNumber: 0, lastObservedPacketNumber: 0,
Streams: make(map[protocol.StreamID]*Stream),
} }
} }
@ -115,7 +118,17 @@ func (s *Session) HandlePacket(addr *net.UDPAddr, publicHeaderBinary []byte, pub
s.SendFrames([]frames.Frame{&frames.StreamFrame{StreamID: 1, Data: reply}}) s.SendFrames([]frames.Frame{&frames.StreamFrame{StreamID: 1, Data: reply}})
} }
} else { } else {
replyFrames := s.streamCallback(frame) stream, ok := s.Streams[frame.StreamID]
if !ok {
stream = NewStream(frame.StreamID)
s.Streams[frame.StreamID] = stream
}
err := stream.AddStreamFrame(frame)
if err != nil {
return err
}
replyFrames := s.streamCallback(stream)
if replyFrames != nil { if replyFrames != nil {
s.SendFrames(replyFrames) s.SendFrames(replyFrames)
} }

View file

@ -2,19 +2,22 @@ package quic
import ( import (
"github.com/lucas-clemente/quic-go/frames" "github.com/lucas-clemente/quic-go/frames"
"github.com/lucas-clemente/quic-go/protocol"
"github.com/lucas-clemente/quic-go/utils" "github.com/lucas-clemente/quic-go/utils"
) )
// A Stream assembles the data from StreamFrames and provides a super-convenient Read-Interface // A Stream assembles the data from StreamFrames and provides a super-convenient Read-Interface
type Stream struct { type Stream struct {
StreamID protocol.StreamID
StreamFrames chan *frames.StreamFrame StreamFrames chan *frames.StreamFrame
CurrentFrame *frames.StreamFrame CurrentFrame *frames.StreamFrame
ReadPosInFrame int ReadPosInFrame int
} }
// NewStream creates a new Stream // NewStream creates a new Stream
func NewStream() *Stream { func NewStream(StreamID protocol.StreamID) *Stream {
return &Stream{ return &Stream{
StreamID: StreamID,
StreamFrames: make(chan *frames.StreamFrame, 8), // ToDo: add config option for this number StreamFrames: make(chan *frames.StreamFrame, 8), // ToDo: add config option for this number
} }
} }

View file

@ -14,7 +14,7 @@ var _ = Describe("Stream", func() {
Offset: 0, Offset: 0,
Data: []byte{0xDE, 0xAD, 0xBE, 0xEF}, Data: []byte{0xDE, 0xAD, 0xBE, 0xEF},
} }
stream := NewStream() stream := NewStream(1337)
stream.AddStreamFrame(&frame) stream.AddStreamFrame(&frame)
b := make([]byte, 4) b := make([]byte, 4)
n, err := stream.Read(b) n, err := stream.Read(b)
@ -32,7 +32,7 @@ var _ = Describe("Stream", func() {
Offset: 2, Offset: 2,
Data: []byte{0xBE, 0xEF}, Data: []byte{0xBE, 0xEF},
} }
stream := NewStream() stream := NewStream(1337)
stream.AddStreamFrame(&frame1) stream.AddStreamFrame(&frame1)
stream.AddStreamFrame(&frame2) stream.AddStreamFrame(&frame2)
b := make([]byte, 6) b := make([]byte, 6)
@ -51,7 +51,7 @@ var _ = Describe("Stream", func() {
Offset: 2, Offset: 2,
Data: []byte{0xBE, 0xEF}, Data: []byte{0xBE, 0xEF},
} }
stream := NewStream() stream := NewStream(1337)
stream.AddStreamFrame(&frame1) stream.AddStreamFrame(&frame1)
stream.AddStreamFrame(&frame2) stream.AddStreamFrame(&frame2)
b := make([]byte, 4) b := make([]byte, 4)
@ -62,7 +62,7 @@ var _ = Describe("Stream", func() {
}) })
It("waits until data is available", func() { It("waits until data is available", func() {
stream := NewStream() stream := NewStream(1337)
go func() { go func() {
frame := frames.StreamFrame{ frame := frames.StreamFrame{
Offset: 0, Offset: 0,
@ -86,7 +86,7 @@ var _ = Describe("Stream", func() {
Offset: 1, Offset: 1,
Data: []byte{0xBE, 0xEF}, Data: []byte{0xBE, 0xEF},
} }
stream := NewStream() stream := NewStream(1337)
stream.AddStreamFrame(&frame1) stream.AddStreamFrame(&frame1)
err := stream.AddStreamFrame(&frame2) err := stream.AddStreamFrame(&frame2)
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())