mirror of
https://github.com/refraction-networking/uquic.git
synced 2025-04-04 12:47:36 +03:00
remove unneeded function from the crypto stream interface
This commit is contained in:
parent
3d47284e1c
commit
cd5e7ae177
10 changed files with 134 additions and 39 deletions
41
crypto_stream.go
Normal file
41
crypto_stream.go
Normal file
|
@ -0,0 +1,41 @@
|
||||||
|
package quic
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"github.com/lucas-clemente/quic-go/internal/flowcontrol"
|
||||||
|
"github.com/lucas-clemente/quic-go/internal/protocol"
|
||||||
|
"github.com/lucas-clemente/quic-go/internal/wire"
|
||||||
|
)
|
||||||
|
|
||||||
|
type cryptoStreamI interface {
|
||||||
|
io.Reader
|
||||||
|
io.Writer
|
||||||
|
AddStreamFrame(*wire.StreamFrame) error
|
||||||
|
HasDataForWriting() bool
|
||||||
|
GetDataForWriting(maxBytes protocol.ByteCount) (data []byte, shouldSendFin bool)
|
||||||
|
GetWriteOffset() protocol.ByteCount
|
||||||
|
Cancel(error)
|
||||||
|
SetReadOffset(protocol.ByteCount)
|
||||||
|
// methods needed for flow control
|
||||||
|
GetWindowUpdate() protocol.ByteCount
|
||||||
|
UpdateSendWindow(protocol.ByteCount)
|
||||||
|
IsFlowControlBlocked() bool
|
||||||
|
}
|
||||||
|
|
||||||
|
type cryptoStream struct {
|
||||||
|
*stream
|
||||||
|
}
|
||||||
|
|
||||||
|
func newCryptoStream(onData func(), flowController flowcontrol.StreamFlowController, version protocol.VersionNumber) cryptoStreamI {
|
||||||
|
str := newStream(version.CryptoStreamID(), onData, nil, flowController, version)
|
||||||
|
return &cryptoStream{str}
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetReadOffset sets the read offset.
|
||||||
|
// It is only needed for the crypto stream.
|
||||||
|
// It must not be called concurrently with any other stream methods, especially Read and Write.
|
||||||
|
func (s *cryptoStream) SetReadOffset(offset protocol.ByteCount) {
|
||||||
|
s.readOffset = offset
|
||||||
|
s.frameQueue.readPosition = offset
|
||||||
|
}
|
20
crypto_stream_test.go
Normal file
20
crypto_stream_test.go
Normal file
|
@ -0,0 +1,20 @@
|
||||||
|
package quic
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/lucas-clemente/quic-go/internal/protocol"
|
||||||
|
|
||||||
|
. "github.com/onsi/ginkgo"
|
||||||
|
. "github.com/onsi/gomega"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ = Describe("Stream", func() {
|
||||||
|
var str *cryptoStream
|
||||||
|
|
||||||
|
str = newCryptoStream(nil, nil, protocol.VersionWhatever).(*cryptoStream)
|
||||||
|
|
||||||
|
It("sets the read offset", func() {
|
||||||
|
str.SetReadOffset(0x42)
|
||||||
|
Expect(str.readOffset).To(Equal(protocol.ByteCount(0x42)))
|
||||||
|
Expect(str.frameQueue.readPosition).To(Equal(protocol.ByteCount(0x42)))
|
||||||
|
})
|
||||||
|
})
|
|
@ -56,12 +56,12 @@ var _ = Describe("Packet packer", func() {
|
||||||
publicHeaderLen protocol.ByteCount
|
publicHeaderLen protocol.ByteCount
|
||||||
maxFrameSize protocol.ByteCount
|
maxFrameSize protocol.ByteCount
|
||||||
streamFramer *streamFramer
|
streamFramer *streamFramer
|
||||||
cryptoStream *stream
|
cryptoStream cryptoStreamI
|
||||||
)
|
)
|
||||||
|
|
||||||
BeforeEach(func() {
|
BeforeEach(func() {
|
||||||
version := versionGQUICFrames
|
version := versionGQUICFrames
|
||||||
cryptoStream = &stream{streamID: version.CryptoStreamID(), flowController: flowcontrol.NewStreamFlowController(version.CryptoStreamID(), false, flowcontrol.NewConnectionFlowController(1000, 1000, nil), 1000, 1000, 1000, nil)}
|
cryptoStream = newCryptoStream(func() {}, flowcontrol.NewStreamFlowController(version.CryptoStreamID(), false, flowcontrol.NewConnectionFlowController(1000, 1000, nil), 1000, 1000, 1000, nil), version)
|
||||||
streamsMap := newStreamsMap(nil, protocol.PerspectiveServer, versionGQUICFrames)
|
streamsMap := newStreamsMap(nil, protocol.PerspectiveServer, versionGQUICFrames)
|
||||||
streamFramer = newStreamFramer(cryptoStream, streamsMap, nil, versionGQUICFrames)
|
streamFramer = newStreamFramer(cryptoStream, streamsMap, nil, versionGQUICFrames)
|
||||||
|
|
||||||
|
@ -585,29 +585,55 @@ var _ = Describe("Packet packer", func() {
|
||||||
})
|
})
|
||||||
|
|
||||||
It("sends unencrypted stream data on the crypto stream", func() {
|
It("sends unencrypted stream data on the crypto stream", func() {
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer GinkgoRecover()
|
||||||
|
_, err := cryptoStream.Write([]byte("foobar"))
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
packer.cryptoSetup.(*mockCryptoSetup).encLevelSealCrypto = protocol.EncryptionUnencrypted
|
packer.cryptoSetup.(*mockCryptoSetup).encLevelSealCrypto = protocol.EncryptionUnencrypted
|
||||||
cryptoStream.dataForWriting = []byte("foobar")
|
var p *packedPacket
|
||||||
p, err := packer.PackPacket()
|
Eventually(func() *packedPacket {
|
||||||
Expect(err).ToNot(HaveOccurred())
|
defer GinkgoRecover()
|
||||||
|
var err error
|
||||||
|
p, err = packer.PackPacket()
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
return p
|
||||||
|
}).ShouldNot(BeNil())
|
||||||
Expect(p.encryptionLevel).To(Equal(protocol.EncryptionUnencrypted))
|
Expect(p.encryptionLevel).To(Equal(protocol.EncryptionUnencrypted))
|
||||||
Expect(p.frames).To(HaveLen(1))
|
Expect(p.frames).To(HaveLen(1))
|
||||||
Expect(p.frames[0]).To(Equal(&wire.StreamFrame{
|
Expect(p.frames[0]).To(Equal(&wire.StreamFrame{
|
||||||
StreamID: packer.version.CryptoStreamID(),
|
StreamID: packer.version.CryptoStreamID(),
|
||||||
Data: []byte("foobar"),
|
Data: []byte("foobar"),
|
||||||
}))
|
}))
|
||||||
|
Eventually(done).Should(BeClosed())
|
||||||
})
|
})
|
||||||
|
|
||||||
It("sends encrypted stream data on the crypto stream", func() {
|
It("sends encrypted stream data on the crypto stream", func() {
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer GinkgoRecover()
|
||||||
|
_, err := cryptoStream.Write([]byte("foobar"))
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
packer.cryptoSetup.(*mockCryptoSetup).encLevelSealCrypto = protocol.EncryptionSecure
|
packer.cryptoSetup.(*mockCryptoSetup).encLevelSealCrypto = protocol.EncryptionSecure
|
||||||
cryptoStream.dataForWriting = []byte("foobar")
|
var p *packedPacket
|
||||||
p, err := packer.PackPacket()
|
Eventually(func() *packedPacket {
|
||||||
Expect(err).ToNot(HaveOccurred())
|
defer GinkgoRecover()
|
||||||
|
var err error
|
||||||
|
p, err = packer.PackPacket()
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
return p
|
||||||
|
}).ShouldNot(BeNil())
|
||||||
Expect(p.encryptionLevel).To(Equal(protocol.EncryptionSecure))
|
Expect(p.encryptionLevel).To(Equal(protocol.EncryptionSecure))
|
||||||
Expect(p.frames).To(HaveLen(1))
|
Expect(p.frames).To(HaveLen(1))
|
||||||
Expect(p.frames[0]).To(Equal(&wire.StreamFrame{
|
Expect(p.frames[0]).To(Equal(&wire.StreamFrame{
|
||||||
StreamID: packer.version.CryptoStreamID(),
|
StreamID: packer.version.CryptoStreamID(),
|
||||||
Data: []byte("foobar"),
|
Data: []byte("foobar"),
|
||||||
}))
|
}))
|
||||||
|
Eventually(done).Should(BeClosed())
|
||||||
})
|
})
|
||||||
|
|
||||||
It("does not pack stream frames if not allowed", func() {
|
It("does not pack stream frames if not allowed", func() {
|
||||||
|
@ -766,14 +792,27 @@ var _ = Describe("Packet packer", func() {
|
||||||
packer.hasSentPacket = false
|
packer.hasSentPacket = false
|
||||||
packer.perspective = protocol.PerspectiveClient
|
packer.perspective = protocol.PerspectiveClient
|
||||||
packer.cryptoSetup.(*mockCryptoSetup).encLevelSealCrypto = protocol.EncryptionUnencrypted
|
packer.cryptoSetup.(*mockCryptoSetup).encLevelSealCrypto = protocol.EncryptionUnencrypted
|
||||||
cryptoStream.dataForWriting = []byte("foobar")
|
done := make(chan struct{})
|
||||||
packet, err := packer.PackPacket()
|
go func() {
|
||||||
Expect(err).ToNot(HaveOccurred())
|
defer GinkgoRecover()
|
||||||
|
_, err := cryptoStream.Write([]byte("foobar"))
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
var packet *packedPacket
|
||||||
|
Eventually(func() *packedPacket {
|
||||||
|
defer GinkgoRecover()
|
||||||
|
var err error
|
||||||
|
packet, err = packer.PackPacket()
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
return packet
|
||||||
|
}).ShouldNot(BeNil())
|
||||||
Expect(packet.raw).To(HaveLen(protocol.MinInitialPacketSize))
|
Expect(packet.raw).To(HaveLen(protocol.MinInitialPacketSize))
|
||||||
Expect(packet.frames).To(HaveLen(1))
|
Expect(packet.frames).To(HaveLen(1))
|
||||||
sf := packet.frames[0].(*wire.StreamFrame)
|
sf := packet.frames[0].(*wire.StreamFrame)
|
||||||
Expect(sf.Data).To(Equal([]byte("foobar")))
|
Expect(sf.Data).To(Equal([]byte("foobar")))
|
||||||
Expect(sf.DataLenPresent).To(BeTrue())
|
Expect(sf.DataLenPresent).To(BeTrue())
|
||||||
|
Eventually(done).Should(BeClosed())
|
||||||
})
|
})
|
||||||
|
|
||||||
It("refuses to retransmit packets that were sent with forward-secure encryption", func() {
|
It("refuses to retransmit packets that were sent with forward-secure encryption", func() {
|
||||||
|
|
|
@ -19,7 +19,7 @@ import (
|
||||||
// packetHandler handles packets
|
// packetHandler handles packets
|
||||||
type packetHandler interface {
|
type packetHandler interface {
|
||||||
Session
|
Session
|
||||||
getCryptoStream() cryptoStream
|
getCryptoStream() cryptoStreamI
|
||||||
handshakeStatus() <-chan handshakeEvent
|
handshakeStatus() <-chan handshakeEvent
|
||||||
handlePacket(*receivedPacket)
|
handlePacket(*receivedPacket)
|
||||||
GetVersion() protocol.VersionNumber
|
GetVersion() protocol.VersionNumber
|
||||||
|
|
|
@ -68,7 +68,7 @@ func (s *mockSession) RemoteAddr() net.Addr { panic("not imple
|
||||||
func (*mockSession) Context() context.Context { panic("not implemented") }
|
func (*mockSession) Context() context.Context { panic("not implemented") }
|
||||||
func (*mockSession) GetVersion() protocol.VersionNumber { return protocol.VersionWhatever }
|
func (*mockSession) GetVersion() protocol.VersionNumber { return protocol.VersionWhatever }
|
||||||
func (s *mockSession) handshakeStatus() <-chan handshakeEvent { return s.handshakeChan }
|
func (s *mockSession) handshakeStatus() <-chan handshakeEvent { return s.handshakeChan }
|
||||||
func (*mockSession) getCryptoStream() cryptoStream { panic("not implemented") }
|
func (*mockSession) getCryptoStream() cryptoStreamI { panic("not implemented") }
|
||||||
|
|
||||||
var _ Session = &mockSession{}
|
var _ Session = &mockSession{}
|
||||||
var _ NonFWSession = &mockSession{}
|
var _ NonFWSession = &mockSession{}
|
||||||
|
|
20
session.go
20
session.go
|
@ -56,7 +56,7 @@ type session struct {
|
||||||
conn connection
|
conn connection
|
||||||
|
|
||||||
streamsMap *streamsMap
|
streamsMap *streamsMap
|
||||||
cryptoStream cryptoStream
|
cryptoStream cryptoStreamI
|
||||||
|
|
||||||
rttStats *congestion.RTTStats
|
rttStats *congestion.RTTStats
|
||||||
|
|
||||||
|
@ -294,7 +294,7 @@ func (s *session) preSetup() {
|
||||||
protocol.ByteCount(s.config.MaxReceiveConnectionFlowControlWindow),
|
protocol.ByteCount(s.config.MaxReceiveConnectionFlowControlWindow),
|
||||||
s.rttStats,
|
s.rttStats,
|
||||||
)
|
)
|
||||||
s.cryptoStream = s.newStream(s.version.CryptoStreamID()).(cryptoStream)
|
s.cryptoStream = s.newCryptoStream()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *session) postSetup(initialPacketNumber protocol.PacketNumber) error {
|
func (s *session) postSetup(initialPacketNumber protocol.PacketNumber) error {
|
||||||
|
@ -880,6 +880,20 @@ func (s *session) newStream(id protocol.StreamID) streamI {
|
||||||
return newStream(id, s.scheduleSending, s.queueResetStreamFrame, flowController, s.version)
|
return newStream(id, s.scheduleSending, s.queueResetStreamFrame, flowController, s.version)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *session) newCryptoStream() cryptoStreamI {
|
||||||
|
id := s.version.CryptoStreamID()
|
||||||
|
flowController := flowcontrol.NewStreamFlowController(
|
||||||
|
id,
|
||||||
|
s.version.StreamContributesToConnectionFlowControl(id),
|
||||||
|
s.connFlowController,
|
||||||
|
protocol.ReceiveStreamFlowControlWindow,
|
||||||
|
protocol.ByteCount(s.config.MaxReceiveStreamFlowControlWindow),
|
||||||
|
0,
|
||||||
|
s.rttStats,
|
||||||
|
)
|
||||||
|
return newCryptoStream(s.scheduleSending, flowController, s.version)
|
||||||
|
}
|
||||||
|
|
||||||
func (s *session) sendPublicReset(rejectedPacketNumber protocol.PacketNumber) error {
|
func (s *session) sendPublicReset(rejectedPacketNumber protocol.PacketNumber) error {
|
||||||
utils.Infof("Sending public reset for connection %x, packet number %d", s.connectionID, rejectedPacketNumber)
|
utils.Infof("Sending public reset for connection %x, packet number %d", s.connectionID, rejectedPacketNumber)
|
||||||
return s.conn.Write(wire.WritePublicReset(s.connectionID, rejectedPacketNumber, 0))
|
return s.conn.Write(wire.WritePublicReset(s.connectionID, rejectedPacketNumber, 0))
|
||||||
|
@ -949,7 +963,7 @@ func (s *session) handshakeStatus() <-chan handshakeEvent {
|
||||||
return s.handshakeChan
|
return s.handshakeChan
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *session) getCryptoStream() cryptoStream {
|
func (s *session) getCryptoStream() cryptoStreamI {
|
||||||
return s.cryptoStream
|
return s.cryptoStream
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -414,7 +414,7 @@ var _ = Describe("Session", func() {
|
||||||
fc := mocks.NewMockStreamFlowController(mockCtrl)
|
fc := mocks.NewMockStreamFlowController(mockCtrl)
|
||||||
offset := protocol.ByteCount(0x4321)
|
offset := protocol.ByteCount(0x4321)
|
||||||
fc.EXPECT().UpdateSendWindow(offset)
|
fc.EXPECT().UpdateSendWindow(offset)
|
||||||
sess.cryptoStream.(*stream).flowController = fc
|
sess.cryptoStream.(*cryptoStream).flowController = fc
|
||||||
err := sess.handleMaxStreamDataFrame(&wire.MaxStreamDataFrame{
|
err := sess.handleMaxStreamDataFrame(&wire.MaxStreamDataFrame{
|
||||||
StreamID: sess.version.CryptoStreamID(),
|
StreamID: sess.version.CryptoStreamID(),
|
||||||
ByteOffset: offset,
|
ByteOffset: offset,
|
||||||
|
|
13
stream.go
13
stream.go
|
@ -30,11 +30,6 @@ type streamI interface {
|
||||||
IsFlowControlBlocked() bool
|
IsFlowControlBlocked() bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type cryptoStream interface {
|
|
||||||
streamI
|
|
||||||
SetReadOffset(protocol.ByteCount)
|
|
||||||
}
|
|
||||||
|
|
||||||
// A Stream assembles the data from StreamFrames and provides a super-convenient Read-Interface
|
// A Stream assembles the data from StreamFrames and provides a super-convenient Read-Interface
|
||||||
//
|
//
|
||||||
// Read() and Write() may be called concurrently, but multiple calls to Read() or Write() individually must be synchronized manually.
|
// Read() and Write() may be called concurrently, but multiple calls to Read() or Write() individually must be synchronized manually.
|
||||||
|
@ -481,11 +476,3 @@ func (s *stream) IsFlowControlBlocked() bool {
|
||||||
func (s *stream) GetWindowUpdate() protocol.ByteCount {
|
func (s *stream) GetWindowUpdate() protocol.ByteCount {
|
||||||
return s.flowController.GetWindowUpdate()
|
return s.flowController.GetWindowUpdate()
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetReadOffset sets the read offset.
|
|
||||||
// It is only needed for the crypto stream.
|
|
||||||
// It must not be called concurrently with any other stream methods, especially Read and Write.
|
|
||||||
func (s *stream) SetReadOffset(offset protocol.ByteCount) {
|
|
||||||
s.readOffset = offset
|
|
||||||
s.frameQueue.readPosition = offset
|
|
||||||
}
|
|
||||||
|
|
|
@ -8,7 +8,7 @@ import (
|
||||||
|
|
||||||
type streamFramer struct {
|
type streamFramer struct {
|
||||||
streamsMap *streamsMap
|
streamsMap *streamsMap
|
||||||
cryptoStream streamI
|
cryptoStream cryptoStreamI
|
||||||
version protocol.VersionNumber
|
version protocol.VersionNumber
|
||||||
|
|
||||||
connFlowController flowcontrol.ConnectionFlowController
|
connFlowController flowcontrol.ConnectionFlowController
|
||||||
|
@ -18,7 +18,7 @@ type streamFramer struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func newStreamFramer(
|
func newStreamFramer(
|
||||||
cryptoStream streamI,
|
cryptoStream cryptoStreamI,
|
||||||
streamsMap *streamsMap,
|
streamsMap *streamsMap,
|
||||||
cfc flowcontrol.ConnectionFlowController,
|
cfc flowcontrol.ConnectionFlowController,
|
||||||
v protocol.VersionNumber,
|
v protocol.VersionNumber,
|
||||||
|
@ -63,7 +63,7 @@ func (f *streamFramer) PopCryptoStreamFrame(maxLen protocol.ByteCount) *wire.Str
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
frame := &wire.StreamFrame{
|
frame := &wire.StreamFrame{
|
||||||
StreamID: f.cryptoStream.StreamID(),
|
StreamID: f.version.CryptoStreamID(),
|
||||||
Offset: f.cryptoStream.GetWriteOffset(),
|
Offset: f.cryptoStream.GetWriteOffset(),
|
||||||
}
|
}
|
||||||
frame.Data, frame.FinBit = f.cryptoStream.GetDataForWriting(maxLen - frame.MinLength(f.version))
|
frame.Data, frame.FinBit = f.cryptoStream.GetDataForWriting(maxLen - frame.MinLength(f.version))
|
||||||
|
|
|
@ -267,12 +267,6 @@ var _ = Describe("Stream", func() {
|
||||||
Expect(onDataCalled).To(BeTrue())
|
Expect(onDataCalled).To(BeTrue())
|
||||||
})
|
})
|
||||||
|
|
||||||
It("sets the read offset", func() {
|
|
||||||
str.SetReadOffset(0x42)
|
|
||||||
Expect(str.readOffset).To(Equal(protocol.ByteCount(0x42)))
|
|
||||||
Expect(str.frameQueue.readPosition).To(Equal(protocol.ByteCount(0x42)))
|
|
||||||
})
|
|
||||||
|
|
||||||
Context("deadlines", func() {
|
Context("deadlines", func() {
|
||||||
It("the deadline error has the right net.Error properties", func() {
|
It("the deadline error has the right net.Error properties", func() {
|
||||||
Expect(errDeadline.Temporary()).To(BeTrue())
|
Expect(errDeadline.Temporary()).To(BeTrue())
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue