mirror of
https://github.com/refraction-networking/uquic.git
synced 2025-04-05 13:17:36 +03:00
remove obsolete garbage collection from streamsMap
This commit is contained in:
parent
6d3e94bf21
commit
92aa9c2b13
2 changed files with 9 additions and 125 deletions
|
@ -17,23 +17,19 @@ type streamsMap struct {
|
|||
connectionParameters handshake.ConnectionParametersManager
|
||||
|
||||
streams map[protocol.StreamID]*stream
|
||||
// TODO: remove this
|
||||
// needed for round-robin scheduling
|
||||
openStreams []protocol.StreamID
|
||||
roundRobinIndex uint32
|
||||
|
||||
nextStream protocol.StreamID // StreamID of the next Stream that will be returned by OpenStream()
|
||||
highestStreamOpenedByPeer protocol.StreamID
|
||||
|
||||
// TODO: remove this
|
||||
streamsOpenedAfterLastGarbageCollect int
|
||||
|
||||
newStream newStreamLambda
|
||||
|
||||
maxOutgoingStreams uint32
|
||||
numOutgoingStreams uint32
|
||||
maxIncomingStreams uint32
|
||||
numIncomingStreams uint32
|
||||
|
||||
roundRobinIndex uint32
|
||||
}
|
||||
|
||||
type streamLambda func(*stream) (bool, error)
|
||||
|
@ -98,12 +94,6 @@ func (m *streamsMap) GetOrOpenStream(id protocol.StreamID) (*stream, error) {
|
|||
sid -= 2
|
||||
}
|
||||
|
||||
// maybe trigger garbage collection of streams map
|
||||
m.streamsOpenedAfterLastGarbageCollect++
|
||||
if m.streamsOpenedAfterLastGarbageCollect%protocol.MaxNewStreamIDDelta == 0 {
|
||||
m.garbageCollectClosedStreams()
|
||||
}
|
||||
|
||||
return m.streams[id], nil
|
||||
}
|
||||
|
||||
|
@ -194,7 +184,7 @@ func (m *streamsMap) RoundRobinIterate(fn streamLambda) error {
|
|||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
|
||||
numStreams := uint32(len(m.openStreams))
|
||||
numStreams := uint32(len(m.streams))
|
||||
startIndex := m.roundRobinIndex
|
||||
|
||||
for _, i := range []protocol.StreamID{1, 3} {
|
||||
|
@ -209,7 +199,6 @@ func (m *streamsMap) RoundRobinIterate(fn streamLambda) error {
|
|||
|
||||
for i := uint32(0); i < numStreams; i++ {
|
||||
streamID := m.openStreams[(i+startIndex)%numStreams]
|
||||
|
||||
if streamID == 1 || streamID == 3 {
|
||||
continue
|
||||
}
|
||||
|
@ -231,9 +220,6 @@ func (m *streamsMap) iterateFunc(streamID protocol.StreamID, fn streamLambda) (b
|
|||
if !ok {
|
||||
return true, errMapAccess
|
||||
}
|
||||
if str == nil {
|
||||
return false, fmt.Errorf("BUG: Stream %d is closed, but still in openStreams map", streamID)
|
||||
}
|
||||
return fn(str)
|
||||
}
|
||||
|
||||
|
@ -245,7 +231,6 @@ func (m *streamsMap) putStream(s *stream) error {
|
|||
|
||||
m.streams[id] = s
|
||||
m.openStreams = append(m.openStreams, id)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -256,7 +241,6 @@ func (m *streamsMap) RemoveStream(id protocol.StreamID) error {
|
|||
return fmt.Errorf("attempted to remove non-existing stream: %d", id)
|
||||
}
|
||||
|
||||
m.streams[id] = nil
|
||||
if id%2 == 0 {
|
||||
m.numOutgoingStreams--
|
||||
} else {
|
||||
|
@ -275,22 +259,6 @@ func (m *streamsMap) RemoveStream(id protocol.StreamID) error {
|
|||
}
|
||||
}
|
||||
|
||||
delete(m.streams, id)
|
||||
return nil
|
||||
}
|
||||
|
||||
// garbageCollectClosedStreams deletes nil values in the streams if they are smaller than protocol.MaxNewStreamIDDelta than the highest stream opened by the client
|
||||
// note that this garbage collection is relatively expensive, since it iterates over the whole streams map. It should not be called every time a stream is openend or closed
|
||||
func (m *streamsMap) garbageCollectClosedStreams() {
|
||||
for id, str := range m.streams {
|
||||
if str != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// server-side streams can be gargage collected immediately
|
||||
// client-side streams need to be kept as nils in the streams map for a bit longer, in order to prevent a client from reopening closed streams
|
||||
if id%2 == 0 || id+protocol.MaxNewStreamIDDelta <= m.highestStreamOpenedByPeer {
|
||||
delete(m.streams, id)
|
||||
}
|
||||
}
|
||||
m.streamsOpenedAfterLastGarbageCollect = 0
|
||||
}
|
||||
|
|
|
@ -71,6 +71,10 @@ var _ = Describe("Streams Map", func() {
|
|||
}
|
||||
})
|
||||
|
||||
AfterEach(func() {
|
||||
Expect(m.openStreams).To(HaveLen(len(m.streams)))
|
||||
})
|
||||
|
||||
Context("getting and creating streams", func() {
|
||||
Context("as a server", func() {
|
||||
BeforeEach(func() {
|
||||
|
@ -262,94 +266,6 @@ var _ = Describe("Streams Map", func() {
|
|||
setNewStreamsMap(protocol.PerspectiveServer)
|
||||
})
|
||||
|
||||
// TODO: remove when removing the openStreams slice
|
||||
Context("DoS mitigation", func() {
|
||||
It("opens and closes a lot of streams", func() {
|
||||
for i := 1; i < 2*protocol.MaxNewStreamIDDelta; i += 2 {
|
||||
streamID := protocol.StreamID(i)
|
||||
_, err := m.GetOrOpenStream(streamID)
|
||||
Expect(m.highestStreamOpenedByPeer).To(Equal(streamID))
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
err = m.RemoveStream(streamID)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
}
|
||||
})
|
||||
|
||||
PIt("prevents opening of streams with very low StreamIDs, if higher streams have already been opened", func() {
|
||||
for i := 1; i < protocol.MaxNewStreamIDDelta+14; i += 2 {
|
||||
if i == 11 || i == 13 {
|
||||
continue
|
||||
}
|
||||
streamID := protocol.StreamID(i)
|
||||
_, err := m.GetOrOpenStream(streamID)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
err = m.RemoveStream(streamID)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
}
|
||||
Expect(m.highestStreamOpenedByPeer).To(Equal(protocol.StreamID(protocol.MaxNewStreamIDDelta + 13)))
|
||||
_, err := m.GetOrOpenStream(11)
|
||||
Expect(err).To(MatchError("InvalidStreamID: attempted to open stream 11, which is a lot smaller than the highest opened stream, 413"))
|
||||
_, err = m.GetOrOpenStream(13)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
})
|
||||
|
||||
It("garbage-collects closed streams", func() {
|
||||
for i := 1; i < 4*protocol.MaxNewStreamIDDelta; i += 2 {
|
||||
streamID := protocol.StreamID(i)
|
||||
_, err := m.GetOrOpenStream(streamID)
|
||||
Expect(m.highestStreamOpenedByPeer).To(Equal(streamID))
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
err = m.RemoveStream(streamID)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
}
|
||||
m.garbageCollectClosedStreams()
|
||||
for i := 1; i < 3*protocol.MaxNewStreamIDDelta; i += 2 {
|
||||
Expect(m.streams).ToNot(HaveKey(protocol.StreamID(i)))
|
||||
}
|
||||
for i := 3*protocol.MaxNewStreamIDDelta + 1; i < 4*protocol.MaxNewStreamIDDelta; i += 2 {
|
||||
Expect(m.streams).To(HaveKey(protocol.StreamID(i)))
|
||||
}
|
||||
})
|
||||
|
||||
It("does not garbage-collects open streams", func() {
|
||||
for i := 1; i < 1002; i += 2 {
|
||||
streamID := protocol.StreamID(i)
|
||||
_, err := m.GetOrOpenStream(streamID)
|
||||
Expect(m.highestStreamOpenedByPeer).To(Equal(streamID))
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
if streamID != 23 {
|
||||
err = m.RemoveStream(streamID)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
}
|
||||
}
|
||||
lengthBefore := len(m.streams)
|
||||
m.garbageCollectClosedStreams()
|
||||
Expect(len(m.streams)).To(BeNumerically("<", lengthBefore))
|
||||
Expect(m.streams).To(HaveKey(protocol.StreamID(23)))
|
||||
Expect(m.streams[23]).ToNot(BeNil())
|
||||
})
|
||||
|
||||
It("runs garbage-collection after a bunch of streams have been opened", func() {
|
||||
numGarbageCollections := 0
|
||||
numSavedStreams := 0
|
||||
for i := 1; i < 4*protocol.MaxNewStreamIDDelta; i += 2 {
|
||||
streamID := protocol.StreamID(i)
|
||||
_, err := m.GetOrOpenStream(streamID)
|
||||
Expect(m.highestStreamOpenedByPeer).To(Equal(streamID))
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
err = m.RemoveStream(streamID)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
if len(m.streams) != numSavedStreams+1 {
|
||||
numGarbageCollections++
|
||||
}
|
||||
numSavedStreams = len(m.streams)
|
||||
}
|
||||
Expect(numGarbageCollections).ToNot(BeZero())
|
||||
Expect(numGarbageCollections).To(BeNumerically("<", 4))
|
||||
Expect(len(m.streams)).To(BeNumerically("<", 2*protocol.MaxNewStreamIDDelta))
|
||||
})
|
||||
})
|
||||
|
||||
Context("deleting streams", func() {
|
||||
BeforeEach(func() {
|
||||
for i := 1; i <= 5; i++ {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue