From f17ddffb70fcba14efb977463048b07ab94ceabd Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Wed, 23 Jan 2019 17:24:01 +0700 Subject: [PATCH] don't delete a stream until it has been accepted --- streams_map_incoming_bidi.go | 36 ++++++++++++++++++++++-- streams_map_incoming_generic.go | 36 ++++++++++++++++++++++-- streams_map_incoming_generic_test.go | 42 ++++++++++++++++++++++++++-- streams_map_incoming_uni.go | 36 ++++++++++++++++++++++-- streams_map_test.go | 26 +++++++++++++++++ 5 files changed, 167 insertions(+), 9 deletions(-) diff --git a/streams_map_incoming_bidi.go b/streams_map_incoming_bidi.go index 8ee8ccdd..787c3d90 100644 --- a/streams_map_incoming_bidi.go +++ b/streams_map_incoming_bidi.go @@ -17,6 +17,9 @@ type incomingBidiStreamsMap struct { cond sync.Cond streams map[protocol.StreamID]streamI + // When a stream is deleted before it was accepted, we can't delete it immediately. + // We need to wait until the application accepts it, and delete it immediately then. + streamsToDelete map[protocol.StreamID]struct{} // used as a set nextStreamToAccept protocol.StreamID // the next stream that will be returned by AcceptStream() nextStreamToOpen protocol.StreamID // the highest stream that the peer openend @@ -38,6 +41,7 @@ func newIncomingBidiStreamsMap( ) *incomingBidiStreamsMap { m := &incomingBidiStreamsMap{ streams: make(map[protocol.StreamID]streamI), + streamsToDelete: make(map[protocol.StreamID]struct{}), nextStreamToAccept: nextStreamToAccept, nextStreamToOpen: nextStreamToAccept, maxStream: initialMaxStreamID, @@ -53,19 +57,28 @@ func (m *incomingBidiStreamsMap) AcceptStream() (streamI, error) { m.mutex.Lock() defer m.mutex.Unlock() + var id protocol.StreamID var str streamI for { + id = m.nextStreamToAccept var ok bool if m.closeErr != nil { return nil, m.closeErr } - str, ok = m.streams[m.nextStreamToAccept] + str, ok = m.streams[id] if ok { break } m.cond.Wait() } m.nextStreamToAccept += 4 + // If this stream was completed before being accepted, we can delete it now. + if _, ok := m.streamsToDelete[id]; ok { + delete(m.streamsToDelete, id) + if err := m.deleteStream(id); err != nil { + return nil, err + } + } return str, nil } @@ -79,7 +92,11 @@ func (m *incomingBidiStreamsMap) GetOrOpenStream(id protocol.StreamID) (streamI, // * this stream exists in the map, and we can return it, or // * this stream was already closed, then we can return the nil if id < m.nextStreamToOpen { - s := m.streams[id] + var s streamI + // If the stream was already queued for deletion, and is just waiting to be accepted, don't return it. + if _, ok := m.streamsToDelete[id]; !ok { + s = m.streams[id] + } m.mutex.RUnlock() return s, nil } @@ -103,9 +120,24 @@ func (m *incomingBidiStreamsMap) DeleteStream(id protocol.StreamID) error { m.mutex.Lock() defer m.mutex.Unlock() + return m.deleteStream(id) +} + +func (m *incomingBidiStreamsMap) deleteStream(id protocol.StreamID) error { if _, ok := m.streams[id]; !ok { return fmt.Errorf("Tried to delete unknown stream %d", id) } + + // Don't delete this stream yet, if it was not yet accepted. + // Just save it to streamsToDelete map, to make sure it is deleted as soon as it gets accepted. + if id >= m.nextStreamToAccept { + if _, ok := m.streamsToDelete[id]; ok { + return fmt.Errorf("Tried to delete stream %d multiple times", id) + } + m.streamsToDelete[id] = struct{}{} + return nil + } + delete(m.streams, id) // queue a MAX_STREAM_ID frame, giving the peer the option to open a new stream if m.maxNumStreams > uint64(len(m.streams)) { diff --git a/streams_map_incoming_generic.go b/streams_map_incoming_generic.go index 0dc6d9fc..503f0290 100644 --- a/streams_map_incoming_generic.go +++ b/streams_map_incoming_generic.go @@ -15,6 +15,9 @@ type incomingItemsMap struct { cond sync.Cond streams map[protocol.StreamID]item + // When a stream is deleted before it was accepted, we can't delete it immediately. + // We need to wait until the application accepts it, and delete it immediately then. + streamsToDelete map[protocol.StreamID]struct{} // used as a set nextStreamToAccept protocol.StreamID // the next stream that will be returned by AcceptStream() nextStreamToOpen protocol.StreamID // the highest stream that the peer openend @@ -36,6 +39,7 @@ func newIncomingItemsMap( ) *incomingItemsMap { m := &incomingItemsMap{ streams: make(map[protocol.StreamID]item), + streamsToDelete: make(map[protocol.StreamID]struct{}), nextStreamToAccept: nextStreamToAccept, nextStreamToOpen: nextStreamToAccept, maxStream: initialMaxStreamID, @@ -51,19 +55,28 @@ func (m *incomingItemsMap) AcceptStream() (item, error) { m.mutex.Lock() defer m.mutex.Unlock() + var id protocol.StreamID var str item for { + id = m.nextStreamToAccept var ok bool if m.closeErr != nil { return nil, m.closeErr } - str, ok = m.streams[m.nextStreamToAccept] + str, ok = m.streams[id] if ok { break } m.cond.Wait() } m.nextStreamToAccept += 4 + // If this stream was completed before being accepted, we can delete it now. + if _, ok := m.streamsToDelete[id]; ok { + delete(m.streamsToDelete, id) + if err := m.deleteStream(id); err != nil { + return nil, err + } + } return str, nil } @@ -77,7 +90,11 @@ func (m *incomingItemsMap) GetOrOpenStream(id protocol.StreamID) (item, error) { // * this stream exists in the map, and we can return it, or // * this stream was already closed, then we can return the nil if id < m.nextStreamToOpen { - s := m.streams[id] + var s item + // If the stream was already queued for deletion, and is just waiting to be accepted, don't return it. + if _, ok := m.streamsToDelete[id]; !ok { + s = m.streams[id] + } m.mutex.RUnlock() return s, nil } @@ -101,9 +118,24 @@ func (m *incomingItemsMap) DeleteStream(id protocol.StreamID) error { m.mutex.Lock() defer m.mutex.Unlock() + return m.deleteStream(id) +} + +func (m *incomingItemsMap) deleteStream(id protocol.StreamID) error { if _, ok := m.streams[id]; !ok { return fmt.Errorf("Tried to delete unknown stream %d", id) } + + // Don't delete this stream yet, if it was not yet accepted. + // Just save it to streamsToDelete map, to make sure it is deleted as soon as it gets accepted. + if id >= m.nextStreamToAccept { + if _, ok := m.streamsToDelete[id]; ok { + return fmt.Errorf("Tried to delete stream %d multiple times", id) + } + m.streamsToDelete[id] = struct{}{} + return nil + } + delete(m.streams, id) // queue a MAX_STREAM_ID frame, giving the peer the option to open a new stream if m.maxNumStreams > uint64(len(m.streams)) { diff --git a/streams_map_incoming_generic_test.go b/streams_map_incoming_generic_test.go index b5a33cfc..96dd87ea 100644 --- a/streams_map_incoming_generic_test.go +++ b/streams_map_incoming_generic_test.go @@ -157,15 +157,46 @@ var _ = Describe("Streams Map (incoming)", func() { It("deletes streams", func() { mockSender.EXPECT().queueControlFrame(gomock.Any()) - _, err := m.GetOrOpenStream(initialMaxStream) + _, err := m.GetOrOpenStream(firstNewStream) Expect(err).ToNot(HaveOccurred()) - err = m.DeleteStream(initialMaxStream) + str, err := m.AcceptStream() Expect(err).ToNot(HaveOccurred()) - str, err := m.GetOrOpenStream(initialMaxStream) + Expect(str.(*mockGenericStream).id).To(Equal(firstNewStream)) + Expect(m.DeleteStream(firstNewStream)).To(Succeed()) + str, err = m.GetOrOpenStream(firstNewStream) Expect(err).ToNot(HaveOccurred()) Expect(str).To(BeNil()) }) + It("waits until a stream is accepted before actually deleting it", func() { + _, err := m.GetOrOpenStream(firstNewStream + 4) + Expect(err).ToNot(HaveOccurred()) + Expect(m.DeleteStream(firstNewStream + 4)).To(Succeed()) + str, err := m.AcceptStream() + Expect(err).ToNot(HaveOccurred()) + Expect(str.(*mockGenericStream).id).To(Equal(firstNewStream)) + // when accepting this stream, it will get deleted, and a MAX_STREAMS frame is queued + mockSender.EXPECT().queueControlFrame(gomock.Any()) + str, err = m.AcceptStream() + Expect(err).ToNot(HaveOccurred()) + Expect(str.(*mockGenericStream).id).To(Equal(firstNewStream + 4)) + }) + + It("doesn't return a stream queued for deleting from GetOrOpenStream", func() { + str, err := m.GetOrOpenStream(firstNewStream) + Expect(err).ToNot(HaveOccurred()) + Expect(str).ToNot(BeNil()) + Expect(m.DeleteStream(firstNewStream)).To(Succeed()) + str, err = m.GetOrOpenStream(firstNewStream) + Expect(err).ToNot(HaveOccurred()) + Expect(str).To(BeNil()) + // when accepting this stream, it will get deleted, and a MAX_STREAMS frame is queued + mockSender.EXPECT().queueControlFrame(gomock.Any()) + str, err = m.AcceptStream() + Expect(err).ToNot(HaveOccurred()) + Expect(str).ToNot(BeNil()) + }) + It("errors when deleting a non-existing stream", func() { err := m.DeleteStream(1337) Expect(err).To(MatchError("Tried to delete unknown stream 1337")) @@ -175,6 +206,11 @@ var _ = Describe("Streams Map (incoming)", func() { // open a bunch of streams _, err := m.GetOrOpenStream(firstNewStream + 4*4) Expect(err).ToNot(HaveOccurred()) + // accept all streams + for i := 0; i < 5; i++ { + _, err := m.AcceptStream() + Expect(err).ToNot(HaveOccurred()) + } mockSender.EXPECT().queueControlFrame(gomock.Any()).Do(func(f wire.Frame) { Expect(f.(*wire.MaxStreamsFrame).MaxStreams).To(Equal(maxNumStreams + 1)) }) diff --git a/streams_map_incoming_uni.go b/streams_map_incoming_uni.go index a1ceadd2..f36fcee5 100644 --- a/streams_map_incoming_uni.go +++ b/streams_map_incoming_uni.go @@ -17,6 +17,9 @@ type incomingUniStreamsMap struct { cond sync.Cond streams map[protocol.StreamID]receiveStreamI + // When a stream is deleted before it was accepted, we can't delete it immediately. + // We need to wait until the application accepts it, and delete it immediately then. + streamsToDelete map[protocol.StreamID]struct{} // used as a set nextStreamToAccept protocol.StreamID // the next stream that will be returned by AcceptStream() nextStreamToOpen protocol.StreamID // the highest stream that the peer openend @@ -38,6 +41,7 @@ func newIncomingUniStreamsMap( ) *incomingUniStreamsMap { m := &incomingUniStreamsMap{ streams: make(map[protocol.StreamID]receiveStreamI), + streamsToDelete: make(map[protocol.StreamID]struct{}), nextStreamToAccept: nextStreamToAccept, nextStreamToOpen: nextStreamToAccept, maxStream: initialMaxStreamID, @@ -53,19 +57,28 @@ func (m *incomingUniStreamsMap) AcceptStream() (receiveStreamI, error) { m.mutex.Lock() defer m.mutex.Unlock() + var id protocol.StreamID var str receiveStreamI for { + id = m.nextStreamToAccept var ok bool if m.closeErr != nil { return nil, m.closeErr } - str, ok = m.streams[m.nextStreamToAccept] + str, ok = m.streams[id] if ok { break } m.cond.Wait() } m.nextStreamToAccept += 4 + // If this stream was completed before being accepted, we can delete it now. + if _, ok := m.streamsToDelete[id]; ok { + delete(m.streamsToDelete, id) + if err := m.deleteStream(id); err != nil { + return nil, err + } + } return str, nil } @@ -79,7 +92,11 @@ func (m *incomingUniStreamsMap) GetOrOpenStream(id protocol.StreamID) (receiveSt // * this stream exists in the map, and we can return it, or // * this stream was already closed, then we can return the nil if id < m.nextStreamToOpen { - s := m.streams[id] + var s receiveStreamI + // If the stream was already queued for deletion, and is just waiting to be accepted, don't return it. + if _, ok := m.streamsToDelete[id]; !ok { + s = m.streams[id] + } m.mutex.RUnlock() return s, nil } @@ -103,9 +120,24 @@ func (m *incomingUniStreamsMap) DeleteStream(id protocol.StreamID) error { m.mutex.Lock() defer m.mutex.Unlock() + return m.deleteStream(id) +} + +func (m *incomingUniStreamsMap) deleteStream(id protocol.StreamID) error { if _, ok := m.streams[id]; !ok { return fmt.Errorf("Tried to delete unknown stream %d", id) } + + // Don't delete this stream yet, if it was not yet accepted. + // Just save it to streamsToDelete map, to make sure it is deleted as soon as it gets accepted. + if id >= m.nextStreamToAccept { + if _, ok := m.streamsToDelete[id]; ok { + return fmt.Errorf("Tried to delete stream %d multiple times", id) + } + m.streamsToDelete[id] = struct{}{} + return nil + } + delete(m.streams, id) // queue a MAX_STREAM_ID frame, giving the peer the option to open a new stream if m.maxNumStreams > uint64(len(m.streams)) { diff --git a/streams_map_test.go b/streams_map_test.go index bec5de22..8a55cbce 100644 --- a/streams_map_test.go +++ b/streams_map_test.go @@ -158,6 +158,17 @@ var _ = Describe("Streams Map", func() { Expect(dstr).To(BeNil()) }) + It("accepts bidirectional streams after they have been deleted", func() { + id := ids.firstIncomingBidiStream + _, err := m.GetOrOpenReceiveStream(id) + Expect(err).ToNot(HaveOccurred()) + Expect(m.DeleteStream(id)).To(Succeed()) + str, err := m.AcceptStream() + Expect(err).ToNot(HaveOccurred()) + Expect(str).ToNot(BeNil()) + Expect(str.StreamID()).To(Equal(id)) + }) + It("deletes outgoing unidirectional streams", func() { id := ids.firstOutgoingUniStream str, err := m.OpenUniStream() @@ -179,6 +190,17 @@ var _ = Describe("Streams Map", func() { Expect(err).ToNot(HaveOccurred()) Expect(dstr).To(BeNil()) }) + + It("accepts unirectional streams after they have been deleted", func() { + id := ids.firstIncomingUniStream + _, err := m.GetOrOpenReceiveStream(id) + Expect(err).ToNot(HaveOccurred()) + Expect(m.DeleteStream(id)).To(Succeed()) + str, err := m.AcceptUniStream() + Expect(err).ToNot(HaveOccurred()) + Expect(str).ToNot(BeNil()) + Expect(str.StreamID()).To(Equal(id)) + }) }) Context("getting streams", func() { @@ -340,6 +362,8 @@ var _ = Describe("Streams Map", func() { It("sends a MAX_STREAMS frame for bidirectional streams", func() { _, err := m.GetOrOpenReceiveStream(ids.firstIncomingBidiStream) Expect(err).ToNot(HaveOccurred()) + _, err = m.AcceptStream() + Expect(err).ToNot(HaveOccurred()) mockSender.EXPECT().queueControlFrame(&wire.MaxStreamsFrame{ Type: protocol.StreamTypeBidi, MaxStreams: maxBidiStreams + 1, @@ -350,6 +374,8 @@ var _ = Describe("Streams Map", func() { It("sends a MAX_STREAMS frame for unidirectional streams", func() { _, err := m.GetOrOpenReceiveStream(ids.firstIncomingUniStream) Expect(err).ToNot(HaveOccurred()) + _, err = m.AcceptUniStream() + Expect(err).ToNot(HaveOccurred()) mockSender.EXPECT().queueControlFrame(&wire.MaxStreamsFrame{ Type: protocol.StreamTypeUni, MaxStreams: maxUniStreams + 1,