fix busy-looping in OpenStreamSync

This commit is contained in:
Marten Seemann 2020-10-16 11:47:11 +07:00
parent 1aa35722a1
commit e94b5e8234
3 changed files with 27 additions and 9 deletions

View file

@ -106,6 +106,7 @@ func (m *outgoingBidiStreamsMap) OpenStreamSync(ctx context.Context) (streamI, e
} }
str := m.openStream() str := m.openStream()
delete(m.openQueue, queuePos) delete(m.openQueue, queuePos)
m.lowestInQueue = queuePos + 1
m.unblockOpenSync() m.unblockOpenSync()
return str, nil return str, nil
} }
@ -172,8 +173,10 @@ func (m *outgoingBidiStreamsMap) SetMaxStream(num protocol.StreamNum) {
m.maxStream = num m.maxStream = num
m.blockedSent = false m.blockedSent = false
m.unblockOpenSync() m.unblockOpenSync()
// TODO(#2826): it might be necessary to send a STREAMS_BLOCKED frame
} }
// unblockOpenSync unblocks the next OpenStreamSync go-routine to open a new stream
func (m *outgoingBidiStreamsMap) unblockOpenSync() { func (m *outgoingBidiStreamsMap) unblockOpenSync() {
if len(m.openQueue) == 0 { if len(m.openQueue) == 0 {
return return
@ -183,9 +186,12 @@ func (m *outgoingBidiStreamsMap) unblockOpenSync() {
if !ok { // entry was deleted because the context was canceled if !ok { // entry was deleted because the context was canceled
continue continue
} }
close(c) // unblockOpenSync is called both from OpenStreamSync and from SetMaxStream.
m.openQueue[qp] = nil // It's sufficient to only unblock OpenStreamSync once.
m.lowestInQueue = qp + 1 select {
case c <- struct{}{}:
default:
}
return return
} }
} }

View file

@ -104,6 +104,7 @@ func (m *outgoingItemsMap) OpenStreamSync(ctx context.Context) (item, error) {
} }
str := m.openStream() str := m.openStream()
delete(m.openQueue, queuePos) delete(m.openQueue, queuePos)
m.lowestInQueue = queuePos + 1
m.unblockOpenSync() m.unblockOpenSync()
return str, nil return str, nil
} }
@ -170,8 +171,10 @@ func (m *outgoingItemsMap) SetMaxStream(num protocol.StreamNum) {
m.maxStream = num m.maxStream = num
m.blockedSent = false m.blockedSent = false
m.unblockOpenSync() m.unblockOpenSync()
// TODO(#2826): it might be necessary to send a STREAMS_BLOCKED frame
} }
// unblockOpenSync unblocks the next OpenStreamSync go-routine to open a new stream
func (m *outgoingItemsMap) unblockOpenSync() { func (m *outgoingItemsMap) unblockOpenSync() {
if len(m.openQueue) == 0 { if len(m.openQueue) == 0 {
return return
@ -181,9 +184,12 @@ func (m *outgoingItemsMap) unblockOpenSync() {
if !ok { // entry was deleted because the context was canceled if !ok { // entry was deleted because the context was canceled
continue continue
} }
close(c) // unblockOpenSync is called both from OpenStreamSync and from SetMaxStream.
m.openQueue[qp] = nil // It's sufficient to only unblock OpenStreamSync once.
m.lowestInQueue = qp + 1 select {
case c <- struct{}{}:
default:
}
return return
} }
} }

View file

@ -106,6 +106,7 @@ func (m *outgoingUniStreamsMap) OpenStreamSync(ctx context.Context) (sendStreamI
} }
str := m.openStream() str := m.openStream()
delete(m.openQueue, queuePos) delete(m.openQueue, queuePos)
m.lowestInQueue = queuePos + 1
m.unblockOpenSync() m.unblockOpenSync()
return str, nil return str, nil
} }
@ -172,8 +173,10 @@ func (m *outgoingUniStreamsMap) SetMaxStream(num protocol.StreamNum) {
m.maxStream = num m.maxStream = num
m.blockedSent = false m.blockedSent = false
m.unblockOpenSync() m.unblockOpenSync()
// TODO(#2826): it might be necessary to send a STREAMS_BLOCKED frame
} }
// unblockOpenSync unblocks the next OpenStreamSync go-routine to open a new stream
func (m *outgoingUniStreamsMap) unblockOpenSync() { func (m *outgoingUniStreamsMap) unblockOpenSync() {
if len(m.openQueue) == 0 { if len(m.openQueue) == 0 {
return return
@ -183,9 +186,12 @@ func (m *outgoingUniStreamsMap) unblockOpenSync() {
if !ok { // entry was deleted because the context was canceled if !ok { // entry was deleted because the context was canceled
continue continue
} }
close(c) // unblockOpenSync is called both from OpenStreamSync and from SetMaxStream.
m.openQueue[qp] = nil // It's sufficient to only unblock OpenStreamSync once.
m.lowestInQueue = qp + 1 select {
case c <- struct{}{}:
default:
}
return return
} }
} }