remove the flow control manager

This commit is contained in:
Marten Seemann 2017-10-17 11:14:58 +07:00
parent 6dc453caa3
commit 268c3859fc
25 changed files with 1710 additions and 1942 deletions

View file

@ -14,6 +14,24 @@ import (
"github.com/lucas-clemente/quic-go/internal/wire"
)
type streamI interface {
Stream
AddStreamFrame(*wire.StreamFrame) error
RegisterRemoteError(error, protocol.ByteCount) error
LenOfDataForWriting() protocol.ByteCount
GetDataForWriting(maxBytes protocol.ByteCount) []byte
GetWriteOffset() protocol.ByteCount
Finished() bool
Cancel(error)
ShouldSendFin() bool
SentFin()
// methods needed for flow control
GetWindowUpdate() protocol.ByteCount
UpdateSendWindow(protocol.ByteCount)
IsFlowControlBlocked() bool
}
// A Stream assembles the data from StreamFrames and provides a super-convenient Read-Interface
//
// Read() and Write() may be called concurrently, but multiple calls to Read() or Write() individually must be synchronized manually.
@ -56,10 +74,11 @@ type stream struct {
writeChan chan struct{}
writeDeadline time.Time
flowControlManager flowcontrol.FlowControlManager
flowController flowcontrol.StreamFlowController
}
var _ Stream = &stream{}
var _ streamI = &stream{}
type deadlineError struct{}
@ -73,15 +92,16 @@ var errDeadline net.Error = &deadlineError{}
func newStream(StreamID protocol.StreamID,
onData func(),
onReset func(protocol.StreamID, protocol.ByteCount),
flowControlManager flowcontrol.FlowControlManager) *stream {
flowController flowcontrol.StreamFlowController,
) *stream {
s := &stream{
onData: onData,
onReset: onReset,
streamID: StreamID,
flowControlManager: flowControlManager,
frameQueue: newStreamFrameSorter(),
readChan: make(chan struct{}, 1),
writeChan: make(chan struct{}, 1),
onData: onData,
onReset: onReset,
streamID: StreamID,
flowController: flowController,
frameQueue: newStreamFrameSorter(),
readChan: make(chan struct{}, 1),
writeChan: make(chan struct{}, 1),
}
s.ctx, s.ctxCancel = context.WithCancel(context.Background())
return s
@ -162,7 +182,7 @@ func (s *stream) Read(p []byte) (int, error) {
// when a RST_STREAM was received, the was already informed about the final byteOffset for this stream
if !s.resetRemotely.Get() {
s.flowControlManager.AddBytesRead(s.streamID, protocol.ByteCount(m))
s.flowController.AddBytesRead(protocol.ByteCount(m))
}
s.onData() // so that a possible WINDOW_UPDATE is sent
@ -231,7 +251,11 @@ func (s *stream) Write(p []byte) (int, error) {
return len(p), nil
}
func (s *stream) lenOfDataForWriting() protocol.ByteCount {
func (s *stream) GetWriteOffset() protocol.ByteCount {
return s.writeOffset
}
func (s *stream) LenOfDataForWriting() protocol.ByteCount {
s.mutex.Lock()
var l protocol.ByteCount
if s.err == nil {
@ -241,7 +265,7 @@ func (s *stream) lenOfDataForWriting() protocol.ByteCount {
return l
}
func (s *stream) getDataForWriting(maxBytes protocol.ByteCount) []byte {
func (s *stream) GetDataForWriting(maxBytes protocol.ByteCount) []byte {
s.mutex.Lock()
defer s.mutex.Unlock()
@ -249,6 +273,14 @@ func (s *stream) getDataForWriting(maxBytes protocol.ByteCount) []byte {
return nil
}
// TODO(#657): Flow control for the crypto stream
if s.streamID != 1 {
maxBytes = utils.MinByteCount(maxBytes, s.flowController.SendWindowSize())
}
if maxBytes == 0 {
return nil
}
var ret []byte
if protocol.ByteCount(len(s.dataForWriting)) > maxBytes {
ret = s.dataForWriting[:maxBytes]
@ -259,6 +291,7 @@ func (s *stream) getDataForWriting(maxBytes protocol.ByteCount) []byte {
s.signalWrite()
}
s.writeOffset += protocol.ByteCount(len(ret))
s.flowController.AddBytesSent(protocol.ByteCount(len(ret)))
return ret
}
@ -277,29 +310,27 @@ func (s *stream) shouldSendReset() bool {
return (s.resetLocally.Get() || s.resetRemotely.Get()) && !s.finishedWriteAndSentFin()
}
func (s *stream) shouldSendFin() bool {
func (s *stream) ShouldSendFin() bool {
s.mutex.Lock()
res := s.finishedWriting.Get() && !s.finSent.Get() && s.err == nil && s.dataForWriting == nil
s.mutex.Unlock()
return res
}
func (s *stream) sentFin() {
func (s *stream) SentFin() {
s.finSent.Set(true)
}
// AddStreamFrame adds a new stream frame
func (s *stream) AddStreamFrame(frame *wire.StreamFrame) error {
maxOffset := frame.Offset + frame.DataLen()
err := s.flowControlManager.UpdateHighestReceived(s.streamID, maxOffset)
if err != nil {
if err := s.flowController.UpdateHighestReceived(maxOffset, frame.FinBit); err != nil {
return err
}
s.mutex.Lock()
defer s.mutex.Unlock()
err = s.frameQueue.Push(frame)
if err != nil && err != errDuplicateStreamData {
if err := s.frameQueue.Push(frame); err != nil && err != errDuplicateStreamData {
return err
}
s.signalRead()
@ -393,9 +424,9 @@ func (s *stream) Reset(err error) {
}
// resets the stream remotely
func (s *stream) RegisterRemoteError(err error) {
func (s *stream) RegisterRemoteError(err error, offset protocol.ByteCount) error {
if s.resetRemotely.Get() {
return
return nil
}
s.mutex.Lock()
s.resetRemotely.Set(true)
@ -405,18 +436,22 @@ func (s *stream) RegisterRemoteError(err error) {
s.err = err
s.signalWrite()
}
if err := s.flowController.UpdateHighestReceived(offset, true); err != nil {
return err
}
if s.shouldSendReset() {
s.onReset(s.streamID, s.writeOffset)
s.rstSent.Set(true)
}
s.mutex.Unlock()
return nil
}
func (s *stream) finishedWriteAndSentFin() bool {
return s.finishedWriting.Get() && s.finSent.Get()
}
func (s *stream) finished() bool {
func (s *stream) Finished() bool {
return s.cancelled.Get() ||
(s.finishedReading.Get() && s.finishedWriteAndSentFin()) ||
(s.resetRemotely.Get() && s.rstSent.Get()) ||
@ -431,3 +466,15 @@ func (s *stream) Context() context.Context {
func (s *stream) StreamID() protocol.StreamID {
return s.streamID
}
func (s *stream) UpdateSendWindow(n protocol.ByteCount) {
s.flowController.UpdateSendWindow(n)
}
func (s *stream) IsFlowControlBlocked() bool {
return s.flowController.IsBlocked()
}
func (s *stream) GetWindowUpdate() protocol.ByteCount {
return s.flowController.GetWindowUpdate()
}