use a single map in the incoming streams map

This commit is contained in:
Marten Seemann 2020-11-15 10:39:36 +07:00
parent 69158cf5f1
commit 0b267408c5
3 changed files with 72 additions and 60 deletions

View file

@ -8,16 +8,20 @@ import (
"github.com/lucas-clemente/quic-go/internal/wire"
)
// When a stream is deleted before it was accepted, we can't delete it from the map immediately.
// We need to wait until the application accepts it, and delete it then.
type itemEntry struct {
stream item
shouldDelete bool
}
//go:generate genny -in $GOFILE -out streams_map_incoming_bidi.go gen "item=streamI Item=BidiStream streamTypeGeneric=protocol.StreamTypeBidi"
//go:generate genny -in $GOFILE -out streams_map_incoming_uni.go gen "item=receiveStreamI Item=UniStream streamTypeGeneric=protocol.StreamTypeUni"
type incomingItemsMap struct {
mutex sync.RWMutex
newStreamChan chan struct{}
streams map[protocol.StreamNum]item
// When a stream is deleted before it was accepted, we can't delete it immediately.
// We need to wait until the application accepts it, and delete it immediately then.
streamsToDelete map[protocol.StreamNum]struct{} // used as a set
streams map[protocol.StreamNum]itemEntry
nextStreamToAccept protocol.StreamNum // the next stream that will be returned by AcceptStream()
nextStreamToOpen protocol.StreamNum // the highest stream that the peer opened
@ -37,8 +41,7 @@ func newIncomingItemsMap(
) *incomingItemsMap {
return &incomingItemsMap{
newStreamChan: make(chan struct{}, 1),
streams: make(map[protocol.StreamNum]item),
streamsToDelete: make(map[protocol.StreamNum]struct{}),
streams: make(map[protocol.StreamNum]itemEntry),
maxStream: protocol.StreamNum(maxStreams),
maxNumStreams: maxStreams,
newStream: newStream,
@ -58,7 +61,7 @@ func (m *incomingItemsMap) AcceptStream(ctx context.Context) (item, error) {
m.mutex.Lock()
var num protocol.StreamNum
var str item
var entry itemEntry
for {
num = m.nextStreamToAccept
if m.closeErr != nil {
@ -66,7 +69,7 @@ func (m *incomingItemsMap) AcceptStream(ctx context.Context) (item, error) {
return nil, m.closeErr
}
var ok bool
str, ok = m.streams[num]
entry, ok = m.streams[num]
if ok {
break
}
@ -80,15 +83,14 @@ func (m *incomingItemsMap) AcceptStream(ctx context.Context) (item, error) {
}
m.nextStreamToAccept++
// If this stream was completed before being accepted, we can delete it now.
if _, ok := m.streamsToDelete[num]; ok {
delete(m.streamsToDelete, num)
if entry.shouldDelete {
if err := m.deleteStream(num); err != nil {
m.mutex.Unlock()
return nil, err
}
}
m.mutex.Unlock()
return str, nil
return entry.stream, nil
}
func (m *incomingItemsMap) GetOrOpenStream(num protocol.StreamNum) (item, error) {
@ -106,8 +108,8 @@ func (m *incomingItemsMap) GetOrOpenStream(num protocol.StreamNum) (item, error)
if num < m.nextStreamToOpen {
var s item
// If the stream was already queued for deletion, and is just waiting to be accepted, don't return it.
if _, ok := m.streamsToDelete[num]; !ok {
s = m.streams[num]
if entry, ok := m.streams[num]; ok && !entry.shouldDelete {
s = entry.stream
}
m.mutex.RUnlock()
return s, nil
@ -119,16 +121,16 @@ func (m *incomingItemsMap) GetOrOpenStream(num protocol.StreamNum) (item, error)
// * maxStream can only increase, so if the id was valid before, it definitely is valid now
// * highestStream is only modified by this function
for newNum := m.nextStreamToOpen; newNum <= num; newNum++ {
m.streams[newNum] = m.newStream(newNum)
m.streams[newNum] = itemEntry{stream: m.newStream(newNum)}
select {
case m.newStreamChan <- struct{}{}:
default:
}
}
m.nextStreamToOpen = num + 1
s := m.streams[num]
entry := m.streams[num]
m.mutex.Unlock()
return s, nil
return entry.stream, nil
}
func (m *incomingItemsMap) DeleteStream(num protocol.StreamNum) error {
@ -149,13 +151,15 @@ func (m *incomingItemsMap) deleteStream(num protocol.StreamNum) error {
// Don't delete this stream yet, if it was not yet accepted.
// Just save it to streamsToDelete map, to make sure it is deleted as soon as it gets accepted.
if num >= m.nextStreamToAccept {
if _, ok := m.streamsToDelete[num]; ok {
entry, ok := m.streams[num]
if ok && entry.shouldDelete {
return streamError{
message: "Tried to delete incoming stream %d multiple times",
nums: []protocol.StreamNum{num},
}
}
m.streamsToDelete[num] = struct{}{}
entry.shouldDelete = true
m.streams[num] = entry // can't assign to struct in map, so we need to reassign
return nil
}
@ -178,8 +182,8 @@ func (m *incomingItemsMap) deleteStream(num protocol.StreamNum) error {
func (m *incomingItemsMap) CloseWithError(err error) {
m.mutex.Lock()
m.closeErr = err
for _, str := range m.streams {
str.closeForShutdown(err)
for _, entry := range m.streams {
entry.stream.closeForShutdown(err)
}
m.mutex.Unlock()
close(m.newStreamChan)