diff --git a/.github/workflows/go-generate.sh b/.github/workflows/go-generate.sh index 37edcacc..cd84098e 100755 --- a/.github/workflows/go-generate.sh +++ b/.github/workflows/go-generate.sh @@ -11,10 +11,6 @@ cp -r "$DIR" generated cd generated # delete all go-generated files generated (that adhere to the comment convention) grep --include \*.go -lrIZ "^// Code generated .* DO NOT EDIT\.$" . | xargs --null rm -# delete all files generated by Genny -grep --include \*.go -lrIZ "This file was automatically generated by genny." . | xargs --null rm -# first generate Genny files to make the code compile -grep --include \*.go -lrI "//go:generate genny" | xargs -L 1 go generate # now generate everything go generate ./... cd .. diff --git a/.github/workflows/go-generate.yml b/.github/workflows/go-generate.yml index a56a664d..cfe9d1a5 100644 --- a/.github/workflows/go-generate.yml +++ b/.github/workflows/go-generate.yml @@ -11,7 +11,6 @@ jobs: run: go build - name: Install code generators run: | - go install -v github.com/cheekybits/genny go install -v github.com/golang/mock/mockgen go install -v golang.org/x/tools/cmd/goimports - name: Run code generators diff --git a/go.mod b/go.mod index 85341176..bb21d544 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,6 @@ module github.com/lucas-clemente/quic-go go 1.18 require ( - github.com/cheekybits/genny v1.0.0 github.com/francoispqt/gojay v1.2.13 github.com/golang/mock v1.6.0 github.com/marten-seemann/qpack v0.2.1 diff --git a/go.sum b/go.sum index 6b40e48c..81a90dcd 100644 --- a/go.sum +++ b/go.sum @@ -12,8 +12,6 @@ github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYU github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g= github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s= -github.com/cheekybits/genny v1.0.0 h1:uGGa4nei+j20rOSeDeP5Of12XVm7TGUd4dJA9RDitfE= -github.com/cheekybits/genny v1.0.0/go.mod h1:+tQajlRqAUrPI7DOSpB0XAqZYtQakVtB7wXkRAgjxjQ= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/integrationtests/gomodvendor/go.sum b/integrationtests/gomodvendor/go.sum index e3b0a913..99d733e0 100644 --- a/integrationtests/gomodvendor/go.sum +++ b/integrationtests/gomodvendor/go.sum @@ -12,8 +12,6 @@ github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYU github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g= github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s= -github.com/cheekybits/genny v1.0.0 h1:uGGa4nei+j20rOSeDeP5Of12XVm7TGUd4dJA9RDitfE= -github.com/cheekybits/genny v1.0.0/go.mod h1:+tQajlRqAUrPI7DOSpB0XAqZYtQakVtB7wXkRAgjxjQ= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/streams_map.go b/streams_map.go index b7fbeaa1..e9f0c2e1 100644 --- a/streams_map.go +++ b/streams_map.go @@ -57,8 +57,8 @@ type streamsMap struct { mutex sync.Mutex outgoingBidiStreams *outgoingStreamsMap[streamI] outgoingUniStreams *outgoingStreamsMap[sendStreamI] - incomingBidiStreams *incomingBidiStreamsMap - incomingUniStreams *incomingUniStreamsMap + incomingBidiStreams *incomingStreamsMap[streamI] + incomingUniStreams *incomingStreamsMap[receiveStreamI] reset bool } @@ -93,7 +93,8 @@ func (m *streamsMap) initMaps() { }, m.sender.queueControlFrame, ) - m.incomingBidiStreams = newIncomingBidiStreamsMap( + m.incomingBidiStreams = newIncomingStreamsMap( + protocol.StreamTypeBidi, func(num protocol.StreamNum) streamI { id := num.StreamID(protocol.StreamTypeBidi, m.perspective.Opposite()) return newStream(id, m.sender, m.newFlowController(id), m.version) @@ -109,7 +110,8 @@ func (m *streamsMap) initMaps() { }, m.sender.queueControlFrame, ) - m.incomingUniStreams = newIncomingUniStreamsMap( + m.incomingUniStreams = newIncomingStreamsMap( + protocol.StreamTypeUni, func(num protocol.StreamNum) receiveStreamI { id := num.StreamID(protocol.StreamTypeUni, m.perspective.Opposite()) return newReceiveStream(id, m.sender, m.newFlowController(id), m.version) diff --git a/streams_map_generic_helper.go b/streams_map_generic_helper.go deleted file mode 100644 index 26b56233..00000000 --- a/streams_map_generic_helper.go +++ /dev/null @@ -1,18 +0,0 @@ -package quic - -import ( - "github.com/cheekybits/genny/generic" - - "github.com/lucas-clemente/quic-go/internal/protocol" -) - -// In the auto-generated streams maps, we need to be able to close the streams. -// Therefore, extend the generic.Type with the stream close method. -// This definition must be in a file that Genny doesn't process. -type item interface { - generic.Type - updateSendWindow(protocol.ByteCount) - closeForShutdown(error) -} - -const streamTypeGeneric protocol.StreamType = protocol.StreamTypeUni diff --git a/streams_map_incoming_uni.go b/streams_map_incoming.go similarity index 78% rename from streams_map_incoming_uni.go rename to streams_map_incoming.go index 5bddec00..6fe0c61b 100644 --- a/streams_map_incoming_uni.go +++ b/streams_map_incoming.go @@ -1,7 +1,3 @@ -// This file was automatically generated by genny. -// Any changes will be lost if this file is regenerated. -// see https://github.com/cheekybits/genny - package quic import ( @@ -12,38 +8,45 @@ import ( "github.com/lucas-clemente/quic-go/internal/wire" ) +type incomingStream interface { + closeForShutdown(error) +} + // When a stream is deleted before it was accepted, we can't delete it from the map immediately. // We need to wait until the application accepts it, and delete it then. -type receiveStreamIEntry struct { - stream receiveStreamI +type incomingStreamEntry[T incomingStream] struct { + stream T shouldDelete bool } -type incomingUniStreamsMap struct { +type incomingStreamsMap[T incomingStream] struct { mutex sync.RWMutex newStreamChan chan struct{} - streams map[protocol.StreamNum]receiveStreamIEntry + streamType protocol.StreamType + streams map[protocol.StreamNum]incomingStreamEntry[T] nextStreamToAccept protocol.StreamNum // the next stream that will be returned by AcceptStream() nextStreamToOpen protocol.StreamNum // the highest stream that the peer opened maxStream protocol.StreamNum // the highest stream that the peer is allowed to open maxNumStreams uint64 // maximum number of streams - newStream func(protocol.StreamNum) receiveStreamI + newStream func(protocol.StreamNum) T queueMaxStreamID func(*wire.MaxStreamsFrame) closeErr error } -func newIncomingUniStreamsMap( - newStream func(protocol.StreamNum) receiveStreamI, +func newIncomingStreamsMap[T incomingStream]( + streamType protocol.StreamType, + newStream func(protocol.StreamNum) T, maxStreams uint64, queueControlFrame func(wire.Frame), -) *incomingUniStreamsMap { - return &incomingUniStreamsMap{ +) *incomingStreamsMap[T] { + return &incomingStreamsMap[T]{ newStreamChan: make(chan struct{}, 1), - streams: make(map[protocol.StreamNum]receiveStreamIEntry), + streamType: streamType, + streams: make(map[protocol.StreamNum]incomingStreamEntry[T]), maxStream: protocol.StreamNum(maxStreams), maxNumStreams: maxStreams, newStream: newStream, @@ -53,7 +56,7 @@ func newIncomingUniStreamsMap( } } -func (m *incomingUniStreamsMap) AcceptStream(ctx context.Context) (receiveStreamI, error) { +func (m *incomingStreamsMap[T]) AcceptStream(ctx context.Context) (T, error) { // drain the newStreamChan, so we don't check the map twice if the stream doesn't exist select { case <-m.newStreamChan: @@ -63,12 +66,12 @@ func (m *incomingUniStreamsMap) AcceptStream(ctx context.Context) (receiveStream m.mutex.Lock() var num protocol.StreamNum - var entry receiveStreamIEntry + var entry incomingStreamEntry[T] for { num = m.nextStreamToAccept if m.closeErr != nil { m.mutex.Unlock() - return nil, m.closeErr + return *new(T), m.closeErr } var ok bool entry, ok = m.streams[num] @@ -78,7 +81,7 @@ func (m *incomingUniStreamsMap) AcceptStream(ctx context.Context) (receiveStream m.mutex.Unlock() select { case <-ctx.Done(): - return nil, ctx.Err() + return *new(T), ctx.Err() case <-m.newStreamChan: } m.mutex.Lock() @@ -88,18 +91,18 @@ func (m *incomingUniStreamsMap) AcceptStream(ctx context.Context) (receiveStream if entry.shouldDelete { if err := m.deleteStream(num); err != nil { m.mutex.Unlock() - return nil, err + return *new(T), err } } m.mutex.Unlock() return entry.stream, nil } -func (m *incomingUniStreamsMap) GetOrOpenStream(num protocol.StreamNum) (receiveStreamI, error) { +func (m *incomingStreamsMap[T]) GetOrOpenStream(num protocol.StreamNum) (T, error) { m.mutex.RLock() if num > m.maxStream { m.mutex.RUnlock() - return nil, streamError{ + return *new(T), streamError{ message: "peer tried to open stream %d (current limit: %d)", nums: []protocol.StreamNum{num, m.maxStream}, } @@ -108,7 +111,7 @@ func (m *incomingUniStreamsMap) GetOrOpenStream(num protocol.StreamNum) (receive // * this stream exists in the map, and we can return it, or // * this stream was already closed, then we can return the nil if num < m.nextStreamToOpen { - var s receiveStreamI + var s T // If the stream was already queued for deletion, and is just waiting to be accepted, don't return it. if entry, ok := m.streams[num]; ok && !entry.shouldDelete { s = entry.stream @@ -123,7 +126,7 @@ func (m *incomingUniStreamsMap) GetOrOpenStream(num protocol.StreamNum) (receive // * maxStream can only increase, so if the id was valid before, it definitely is valid now // * highestStream is only modified by this function for newNum := m.nextStreamToOpen; newNum <= num; newNum++ { - m.streams[newNum] = receiveStreamIEntry{stream: m.newStream(newNum)} + m.streams[newNum] = incomingStreamEntry[T]{stream: m.newStream(newNum)} select { case m.newStreamChan <- struct{}{}: default: @@ -135,14 +138,14 @@ func (m *incomingUniStreamsMap) GetOrOpenStream(num protocol.StreamNum) (receive return entry.stream, nil } -func (m *incomingUniStreamsMap) DeleteStream(num protocol.StreamNum) error { +func (m *incomingStreamsMap[T]) DeleteStream(num protocol.StreamNum) error { m.mutex.Lock() defer m.mutex.Unlock() return m.deleteStream(num) } -func (m *incomingUniStreamsMap) deleteStream(num protocol.StreamNum) error { +func (m *incomingStreamsMap[T]) deleteStream(num protocol.StreamNum) error { if _, ok := m.streams[num]; !ok { return streamError{ message: "tried to delete unknown incoming stream %d", @@ -173,7 +176,7 @@ func (m *incomingUniStreamsMap) deleteStream(num protocol.StreamNum) error { if maxStream <= protocol.MaxStreamCount { m.maxStream = maxStream m.queueMaxStreamID(&wire.MaxStreamsFrame{ - Type: protocol.StreamTypeUni, + Type: m.streamType, MaxStreamNum: m.maxStream, }) } @@ -181,7 +184,7 @@ func (m *incomingUniStreamsMap) deleteStream(num protocol.StreamNum) error { return nil } -func (m *incomingUniStreamsMap) CloseWithError(err error) { +func (m *incomingStreamsMap[T]) CloseWithError(err error) { m.mutex.Lock() m.closeErr = err for _, entry := range m.streams { diff --git a/streams_map_incoming_bidi.go b/streams_map_incoming_bidi.go deleted file mode 100644 index 46c8c73a..00000000 --- a/streams_map_incoming_bidi.go +++ /dev/null @@ -1,192 +0,0 @@ -// This file was automatically generated by genny. -// Any changes will be lost if this file is regenerated. -// see https://github.com/cheekybits/genny - -package quic - -import ( - "context" - "sync" - - "github.com/lucas-clemente/quic-go/internal/protocol" - "github.com/lucas-clemente/quic-go/internal/wire" -) - -// When a stream is deleted before it was accepted, we can't delete it from the map immediately. -// We need to wait until the application accepts it, and delete it then. -type streamIEntry struct { - stream streamI - shouldDelete bool -} - -type incomingBidiStreamsMap struct { - mutex sync.RWMutex - newStreamChan chan struct{} - - streams map[protocol.StreamNum]streamIEntry - - nextStreamToAccept protocol.StreamNum // the next stream that will be returned by AcceptStream() - nextStreamToOpen protocol.StreamNum // the highest stream that the peer opened - maxStream protocol.StreamNum // the highest stream that the peer is allowed to open - maxNumStreams uint64 // maximum number of streams - - newStream func(protocol.StreamNum) streamI - queueMaxStreamID func(*wire.MaxStreamsFrame) - - closeErr error -} - -func newIncomingBidiStreamsMap( - newStream func(protocol.StreamNum) streamI, - maxStreams uint64, - queueControlFrame func(wire.Frame), -) *incomingBidiStreamsMap { - return &incomingBidiStreamsMap{ - newStreamChan: make(chan struct{}, 1), - streams: make(map[protocol.StreamNum]streamIEntry), - maxStream: protocol.StreamNum(maxStreams), - maxNumStreams: maxStreams, - newStream: newStream, - nextStreamToOpen: 1, - nextStreamToAccept: 1, - queueMaxStreamID: func(f *wire.MaxStreamsFrame) { queueControlFrame(f) }, - } -} - -func (m *incomingBidiStreamsMap) AcceptStream(ctx context.Context) (streamI, error) { - // drain the newStreamChan, so we don't check the map twice if the stream doesn't exist - select { - case <-m.newStreamChan: - default: - } - - m.mutex.Lock() - - var num protocol.StreamNum - var entry streamIEntry - for { - num = m.nextStreamToAccept - if m.closeErr != nil { - m.mutex.Unlock() - return nil, m.closeErr - } - var ok bool - entry, ok = m.streams[num] - if ok { - break - } - m.mutex.Unlock() - select { - case <-ctx.Done(): - return nil, ctx.Err() - case <-m.newStreamChan: - } - m.mutex.Lock() - } - m.nextStreamToAccept++ - // If this stream was completed before being accepted, we can delete it now. - if entry.shouldDelete { - if err := m.deleteStream(num); err != nil { - m.mutex.Unlock() - return nil, err - } - } - m.mutex.Unlock() - return entry.stream, nil -} - -func (m *incomingBidiStreamsMap) GetOrOpenStream(num protocol.StreamNum) (streamI, error) { - m.mutex.RLock() - if num > m.maxStream { - m.mutex.RUnlock() - return nil, streamError{ - message: "peer tried to open stream %d (current limit: %d)", - nums: []protocol.StreamNum{num, m.maxStream}, - } - } - // if the num is smaller than the highest we accepted - // * this stream exists in the map, and we can return it, or - // * this stream was already closed, then we can return the nil - if num < m.nextStreamToOpen { - var s streamI - // If the stream was already queued for deletion, and is just waiting to be accepted, don't return it. - if entry, ok := m.streams[num]; ok && !entry.shouldDelete { - s = entry.stream - } - m.mutex.RUnlock() - return s, nil - } - m.mutex.RUnlock() - - m.mutex.Lock() - // no need to check the two error conditions from above again - // * maxStream can only increase, so if the id was valid before, it definitely is valid now - // * highestStream is only modified by this function - for newNum := m.nextStreamToOpen; newNum <= num; newNum++ { - m.streams[newNum] = streamIEntry{stream: m.newStream(newNum)} - select { - case m.newStreamChan <- struct{}{}: - default: - } - } - m.nextStreamToOpen = num + 1 - entry := m.streams[num] - m.mutex.Unlock() - return entry.stream, nil -} - -func (m *incomingBidiStreamsMap) DeleteStream(num protocol.StreamNum) error { - m.mutex.Lock() - defer m.mutex.Unlock() - - return m.deleteStream(num) -} - -func (m *incomingBidiStreamsMap) deleteStream(num protocol.StreamNum) error { - if _, ok := m.streams[num]; !ok { - return streamError{ - message: "tried to delete unknown incoming stream %d", - nums: []protocol.StreamNum{num}, - } - } - - // Don't delete this stream yet, if it was not yet accepted. - // Just save it to streamsToDelete map, to make sure it is deleted as soon as it gets accepted. - if num >= m.nextStreamToAccept { - entry, ok := m.streams[num] - if ok && entry.shouldDelete { - return streamError{ - message: "tried to delete incoming stream %d multiple times", - nums: []protocol.StreamNum{num}, - } - } - entry.shouldDelete = true - m.streams[num] = entry // can't assign to struct in map, so we need to reassign - return nil - } - - delete(m.streams, num) - // queue a MAX_STREAM_ID frame, giving the peer the option to open a new stream - if m.maxNumStreams > uint64(len(m.streams)) { - maxStream := m.nextStreamToOpen + protocol.StreamNum(m.maxNumStreams-uint64(len(m.streams))) - 1 - // Never send a value larger than protocol.MaxStreamCount. - if maxStream <= protocol.MaxStreamCount { - m.maxStream = maxStream - m.queueMaxStreamID(&wire.MaxStreamsFrame{ - Type: protocol.StreamTypeBidi, - MaxStreamNum: m.maxStream, - }) - } - } - return nil -} - -func (m *incomingBidiStreamsMap) CloseWithError(err error) { - m.mutex.Lock() - m.closeErr = err - for _, entry := range m.streams { - entry.stream.closeForShutdown(err) - } - m.mutex.Unlock() - close(m.newStreamChan) -} diff --git a/streams_map_incoming_generic.go b/streams_map_incoming_generic.go deleted file mode 100644 index 4c7696a0..00000000 --- a/streams_map_incoming_generic.go +++ /dev/null @@ -1,190 +0,0 @@ -package quic - -import ( - "context" - "sync" - - "github.com/lucas-clemente/quic-go/internal/protocol" - "github.com/lucas-clemente/quic-go/internal/wire" -) - -// When a stream is deleted before it was accepted, we can't delete it from the map immediately. -// We need to wait until the application accepts it, and delete it then. -type itemEntry struct { - stream item - shouldDelete bool -} - -//go:generate genny -in $GOFILE -out streams_map_incoming_bidi.go gen "item=streamI Item=BidiStream streamTypeGeneric=protocol.StreamTypeBidi" -//go:generate genny -in $GOFILE -out streams_map_incoming_uni.go gen "item=receiveStreamI Item=UniStream streamTypeGeneric=protocol.StreamTypeUni" -type incomingItemsMap struct { - mutex sync.RWMutex - newStreamChan chan struct{} - - streams map[protocol.StreamNum]itemEntry - - nextStreamToAccept protocol.StreamNum // the next stream that will be returned by AcceptStream() - nextStreamToOpen protocol.StreamNum // the highest stream that the peer opened - maxStream protocol.StreamNum // the highest stream that the peer is allowed to open - maxNumStreams uint64 // maximum number of streams - - newStream func(protocol.StreamNum) item - queueMaxStreamID func(*wire.MaxStreamsFrame) - - closeErr error -} - -func newIncomingItemsMap( - newStream func(protocol.StreamNum) item, - maxStreams uint64, - queueControlFrame func(wire.Frame), -) *incomingItemsMap { - return &incomingItemsMap{ - newStreamChan: make(chan struct{}, 1), - streams: make(map[protocol.StreamNum]itemEntry), - maxStream: protocol.StreamNum(maxStreams), - maxNumStreams: maxStreams, - newStream: newStream, - nextStreamToOpen: 1, - nextStreamToAccept: 1, - queueMaxStreamID: func(f *wire.MaxStreamsFrame) { queueControlFrame(f) }, - } -} - -func (m *incomingItemsMap) AcceptStream(ctx context.Context) (item, error) { - // drain the newStreamChan, so we don't check the map twice if the stream doesn't exist - select { - case <-m.newStreamChan: - default: - } - - m.mutex.Lock() - - var num protocol.StreamNum - var entry itemEntry - for { - num = m.nextStreamToAccept - if m.closeErr != nil { - m.mutex.Unlock() - return nil, m.closeErr - } - var ok bool - entry, ok = m.streams[num] - if ok { - break - } - m.mutex.Unlock() - select { - case <-ctx.Done(): - return nil, ctx.Err() - case <-m.newStreamChan: - } - m.mutex.Lock() - } - m.nextStreamToAccept++ - // If this stream was completed before being accepted, we can delete it now. - if entry.shouldDelete { - if err := m.deleteStream(num); err != nil { - m.mutex.Unlock() - return nil, err - } - } - m.mutex.Unlock() - return entry.stream, nil -} - -func (m *incomingItemsMap) GetOrOpenStream(num protocol.StreamNum) (item, error) { - m.mutex.RLock() - if num > m.maxStream { - m.mutex.RUnlock() - return nil, streamError{ - message: "peer tried to open stream %d (current limit: %d)", - nums: []protocol.StreamNum{num, m.maxStream}, - } - } - // if the num is smaller than the highest we accepted - // * this stream exists in the map, and we can return it, or - // * this stream was already closed, then we can return the nil - if num < m.nextStreamToOpen { - var s item - // If the stream was already queued for deletion, and is just waiting to be accepted, don't return it. - if entry, ok := m.streams[num]; ok && !entry.shouldDelete { - s = entry.stream - } - m.mutex.RUnlock() - return s, nil - } - m.mutex.RUnlock() - - m.mutex.Lock() - // no need to check the two error conditions from above again - // * maxStream can only increase, so if the id was valid before, it definitely is valid now - // * highestStream is only modified by this function - for newNum := m.nextStreamToOpen; newNum <= num; newNum++ { - m.streams[newNum] = itemEntry{stream: m.newStream(newNum)} - select { - case m.newStreamChan <- struct{}{}: - default: - } - } - m.nextStreamToOpen = num + 1 - entry := m.streams[num] - m.mutex.Unlock() - return entry.stream, nil -} - -func (m *incomingItemsMap) DeleteStream(num protocol.StreamNum) error { - m.mutex.Lock() - defer m.mutex.Unlock() - - return m.deleteStream(num) -} - -func (m *incomingItemsMap) deleteStream(num protocol.StreamNum) error { - if _, ok := m.streams[num]; !ok { - return streamError{ - message: "tried to delete unknown incoming stream %d", - nums: []protocol.StreamNum{num}, - } - } - - // Don't delete this stream yet, if it was not yet accepted. - // Just save it to streamsToDelete map, to make sure it is deleted as soon as it gets accepted. - if num >= m.nextStreamToAccept { - entry, ok := m.streams[num] - if ok && entry.shouldDelete { - return streamError{ - message: "tried to delete incoming stream %d multiple times", - nums: []protocol.StreamNum{num}, - } - } - entry.shouldDelete = true - m.streams[num] = entry // can't assign to struct in map, so we need to reassign - return nil - } - - delete(m.streams, num) - // queue a MAX_STREAM_ID frame, giving the peer the option to open a new stream - if m.maxNumStreams > uint64(len(m.streams)) { - maxStream := m.nextStreamToOpen + protocol.StreamNum(m.maxNumStreams-uint64(len(m.streams))) - 1 - // Never send a value larger than protocol.MaxStreamCount. - if maxStream <= protocol.MaxStreamCount { - m.maxStream = maxStream - m.queueMaxStreamID(&wire.MaxStreamsFrame{ - Type: streamTypeGeneric, - MaxStreamNum: m.maxStream, - }) - } - } - return nil -} - -func (m *incomingItemsMap) CloseWithError(err error) { - m.mutex.Lock() - m.closeErr = err - for _, entry := range m.streams { - entry.stream.closeForShutdown(err) - } - m.mutex.Unlock() - close(m.newStreamChan) -} diff --git a/streams_map_incoming_generic_test.go b/streams_map_incoming_test.go similarity index 89% rename from streams_map_incoming_generic_test.go rename to streams_map_incoming_test.go index d84aa047..3001ad20 100644 --- a/streams_map_incoming_generic_test.go +++ b/streams_map_incoming_test.go @@ -34,11 +34,12 @@ func (s *mockGenericStream) updateSendWindow(limit protocol.ByteCount) { var _ = Describe("Streams Map (incoming)", func() { var ( - m *incomingItemsMap + m *incomingStreamsMap[*mockGenericStream] newItemCounter int mockSender *MockStreamSender maxNumStreams uint64 ) + streamType := []protocol.StreamType{protocol.StreamTypeUni, protocol.StreamTypeUni}[rand.Intn(2)] // check that the frame can be serialized and deserialized checkFrameSerialization := func(f wire.Frame) { @@ -54,8 +55,9 @@ var _ = Describe("Streams Map (incoming)", func() { JustBeforeEach(func() { newItemCounter = 0 mockSender = NewMockStreamSender(mockCtrl) - m = newIncomingItemsMap( - func(num protocol.StreamNum) item { + m = newIncomingStreamsMap( + streamType, + func(num protocol.StreamNum) *mockGenericStream { newItemCounter++ return &mockGenericStream{num: num} }, @@ -85,16 +87,16 @@ var _ = Describe("Streams Map (incoming)", func() { Expect(err).ToNot(HaveOccurred()) str, err := m.AcceptStream(context.Background()) Expect(err).ToNot(HaveOccurred()) - Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(1))) + Expect(str.num).To(Equal(protocol.StreamNum(1))) str, err = m.AcceptStream(context.Background()) Expect(err).ToNot(HaveOccurred()) - Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(2))) + Expect(str.num).To(Equal(protocol.StreamNum(2))) }) It("allows opening the maximum stream ID", func() { str, err := m.GetOrOpenStream(1) Expect(err).ToNot(HaveOccurred()) - Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(1))) + Expect(str.num).To(Equal(protocol.StreamNum(1))) }) It("errors when trying to get a stream ID higher than the maximum", func() { @@ -104,7 +106,7 @@ var _ = Describe("Streams Map (incoming)", func() { }) It("blocks AcceptStream until a new stream is available", func() { - strChan := make(chan item) + strChan := make(chan *mockGenericStream) go func() { defer GinkgoRecover() str, err := m.AcceptStream(context.Background()) @@ -114,10 +116,10 @@ var _ = Describe("Streams Map (incoming)", func() { Consistently(strChan).ShouldNot(Receive()) str, err := m.GetOrOpenStream(1) Expect(err).ToNot(HaveOccurred()) - Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(1))) - var acceptedStr item + Expect(str.num).To(Equal(protocol.StreamNum(1))) + var acceptedStr *mockGenericStream Eventually(strChan).Should(Receive(&acceptedStr)) - Expect(acceptedStr.(*mockGenericStream).num).To(Equal(protocol.StreamNum(1))) + Expect(acceptedStr.num).To(Equal(protocol.StreamNum(1))) }) It("unblocks AcceptStream when the context is canceled", func() { @@ -162,10 +164,10 @@ var _ = Describe("Streams Map (incoming)", func() { Expect(err).ToNot(HaveOccurred()) testErr := errors.New("test err") m.CloseWithError(testErr) - Expect(str1.(*mockGenericStream).closed).To(BeTrue()) - Expect(str1.(*mockGenericStream).closeErr).To(MatchError(testErr)) - Expect(str2.(*mockGenericStream).closed).To(BeTrue()) - Expect(str2.(*mockGenericStream).closeErr).To(MatchError(testErr)) + Expect(str1.closed).To(BeTrue()) + Expect(str1.closeErr).To(MatchError(testErr)) + Expect(str2.closed).To(BeTrue()) + Expect(str2.closeErr).To(MatchError(testErr)) }) It("deletes streams", func() { @@ -174,7 +176,7 @@ var _ = Describe("Streams Map (incoming)", func() { Expect(err).ToNot(HaveOccurred()) str, err := m.AcceptStream(context.Background()) Expect(err).ToNot(HaveOccurred()) - Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(1))) + Expect(str.num).To(Equal(protocol.StreamNum(1))) Expect(m.DeleteStream(1)).To(Succeed()) str, err = m.GetOrOpenStream(1) Expect(err).ToNot(HaveOccurred()) @@ -187,12 +189,12 @@ var _ = Describe("Streams Map (incoming)", func() { Expect(m.DeleteStream(2)).To(Succeed()) str, err := m.AcceptStream(context.Background()) Expect(err).ToNot(HaveOccurred()) - Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(1))) + Expect(str.num).To(Equal(protocol.StreamNum(1))) // when accepting this stream, it will get deleted, and a MAX_STREAMS frame is queued mockSender.EXPECT().queueControlFrame(gomock.Any()) str, err = m.AcceptStream(context.Background()) Expect(err).ToNot(HaveOccurred()) - Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(2))) + Expect(str.num).To(Equal(protocol.StreamNum(2))) }) It("doesn't return a stream queued for deleting from GetOrOpenStream", func() { @@ -226,7 +228,9 @@ var _ = Describe("Streams Map (incoming)", func() { Expect(err).ToNot(HaveOccurred()) } mockSender.EXPECT().queueControlFrame(gomock.Any()).Do(func(f wire.Frame) { - Expect(f.(*wire.MaxStreamsFrame).MaxStreamNum).To(Equal(protocol.StreamNum(maxNumStreams + 1))) + msf := f.(*wire.MaxStreamsFrame) + Expect(msf.Type).To(BeEquivalentTo(streamType)) + Expect(msf.MaxStreamNum).To(Equal(protocol.StreamNum(maxNumStreams + 1))) checkFrameSerialization(f) }) Expect(m.DeleteStream(3)).To(Succeed()) diff --git a/tools.go b/tools.go index a0204588..03ddd063 100644 --- a/tools.go +++ b/tools.go @@ -3,6 +3,6 @@ package quic import ( - _ "github.com/cheekybits/genny" + _ "github.com/golang/mock/mockgen" _ "github.com/onsi/ginkgo/ginkgo" )