feat: BBR

This commit is contained in:
Toby 2023-08-04 17:29:15 -07:00
parent 02fca02ddc
commit 45c3fc54bd
16 changed files with 1612 additions and 32 deletions

View file

@ -185,7 +185,8 @@ func (c *clientConfig) fillQUICConfig(hyConfig *client.Config) error {
func (c *clientConfig) fillBandwidthConfig(hyConfig *client.Config) error {
if c.Bandwidth.Up == "" || c.Bandwidth.Down == "" {
return configError{Field: "bandwidth", Err: errors.New("both up and down bandwidth must be set")}
// New core now allows users to omit bandwidth values and use built-in congestion control
return nil
}
var err error
hyConfig.BandwidthConfig.MaxTx, err = convBandwidth(c.Bandwidth.Up)

View file

@ -42,6 +42,7 @@ require (
github.com/stretchr/objx v0.5.0 // indirect
github.com/subosito/gotenv v1.4.2 // indirect
github.com/txthinking/runnergroup v0.0.0-20210608031112-152c7c4432bf // indirect
github.com/zhangyunhao116/fastrand v0.3.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.11.0 // indirect

View file

@ -224,6 +224,8 @@ github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/zhangyunhao116/fastrand v0.3.0 h1:7bwe124xcckPulX6fxtr2lFdO2KQqaefdtbk+mqO/Ig=
github.com/zhangyunhao116/fastrand v0.3.0/go.mod h1:0v5KgHho0VE6HU192HnY15de/oDS8UrbBChIFjIhBtc=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=

View file

@ -9,7 +9,9 @@ import (
"time"
coreErrs "github.com/apernet/hysteria/core/errors"
"github.com/apernet/hysteria/core/internal/congestion"
"github.com/apernet/hysteria/core/internal/congestion/bbr"
"github.com/apernet/hysteria/core/internal/congestion/brutal"
"github.com/apernet/hysteria/core/internal/congestion/common"
"github.com/apernet/hysteria/core/internal/protocol"
"github.com/apernet/hysteria/core/internal/utils"
@ -123,9 +125,16 @@ func (c *clientImpl) connect() error {
if actualTx == 0 || actualTx > c.config.BandwidthConfig.MaxTx {
actualTx = c.config.BandwidthConfig.MaxTx
}
// Set congestion control when applicable
// Use Brutal CC if actualTx > 0, otherwise use BBR
if actualTx > 0 {
conn.SetCongestionControl(congestion.NewBrutalSender(actualTx))
conn.SetCongestionControl(brutal.NewBrutalSender(actualTx))
} else {
conn.SetCongestionControl(bbr.NewBBRSender(
bbr.DefaultClock{},
bbr.GetInitialPacketSize(conn.RemoteAddr()),
32*common.InitMaxDatagramSize,
bbr.DefaultBBRMaxCongestionWindow*common.InitMaxDatagramSize,
))
}
_ = resp.Body.Close()

View file

@ -5,6 +5,7 @@ go 1.20
require (
github.com/apernet/quic-go v0.37.3-0.20230804221709-80e23a7cabb6
github.com/stretchr/testify v1.8.4
github.com/zhangyunhao116/fastrand v0.3.0
go.uber.org/goleak v1.2.1
golang.org/x/time v0.3.0
)

View file

@ -44,6 +44,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/zhangyunhao116/fastrand v0.3.0 h1:7bwe124xcckPulX6fxtr2lFdO2KQqaefdtbk+mqO/Ig=
github.com/zhangyunhao116/fastrand v0.3.0/go.mod h1:0v5KgHho0VE6HU192HnY15de/oDS8UrbBChIFjIhBtc=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=

View file

@ -0,0 +1,25 @@
package bbr
import (
"math"
"time"
"github.com/apernet/quic-go/congestion"
)
// Bandwidth of a connection
type Bandwidth uint64
const infBandwidth Bandwidth = math.MaxUint64
const (
// BitsPerSecond is 1 bit per second
BitsPerSecond Bandwidth = 1
// BytesPerSecond is 1 byte per second
BytesPerSecond = 8 * BitsPerSecond
)
// BandwidthFromDelta calculates the bandwidth from a number of bytes and a time delta
func BandwidthFromDelta(bytes congestion.ByteCount, delta time.Duration) Bandwidth {
return Bandwidth(bytes) * Bandwidth(time.Second) / Bandwidth(delta) * BytesPerSecond
}

View file

@ -0,0 +1,374 @@
package bbr
import (
"math"
"time"
"github.com/apernet/quic-go/congestion"
)
var InfiniteBandwidth = Bandwidth(math.MaxUint64)
// SendTimeState is a subset of ConnectionStateOnSentPacket which is returned
// to the caller when the packet is acked or lost.
type SendTimeState struct {
// Whether other states in this object is valid.
isValid bool
// Whether the sender is app limited at the time the packet was sent.
// App limited bandwidth sample might be artificially low because the sender
// did not have enough data to send in order to saturate the link.
isAppLimited bool
// Total number of sent bytes at the time the packet was sent.
// Includes the packet itself.
totalBytesSent congestion.ByteCount
// Total number of acked bytes at the time the packet was sent.
totalBytesAcked congestion.ByteCount
// Total number of lost bytes at the time the packet was sent.
totalBytesLost congestion.ByteCount
}
// ConnectionStateOnSentPacket represents the information about a sent packet
// and the state of the connection at the moment the packet was sent,
// specifically the information about the most recently acknowledged packet at
// that moment.
type ConnectionStateOnSentPacket struct {
packetNumber congestion.PacketNumber
// Time at which the packet is sent.
sendTime time.Time
// Size of the packet.
size congestion.ByteCount
// The value of |totalBytesSentAtLastAckedPacket| at the time the
// packet was sent.
totalBytesSentAtLastAckedPacket congestion.ByteCount
// The value of |lastAckedPacketSentTime| at the time the packet was
// sent.
lastAckedPacketSentTime time.Time
// The value of |lastAckedPacketAckTime| at the time the packet was
// sent.
lastAckedPacketAckTime time.Time
// Send time states that are returned to the congestion controller when the
// packet is acked or lost.
sendTimeState SendTimeState
}
// BandwidthSample
type BandwidthSample struct {
// The bandwidth at that particular sample. Zero if no valid bandwidth sample
// is available.
bandwidth Bandwidth
// The RTT measurement at this particular sample. Zero if no RTT sample is
// available. Does not correct for delayed ack time.
rtt time.Duration
// States captured when the packet was sent.
stateAtSend SendTimeState
}
func NewBandwidthSample() *BandwidthSample {
return &BandwidthSample{
// FIXME: the default value of original code is zero.
rtt: InfiniteRTT,
}
}
// BandwidthSampler keeps track of sent and acknowledged packets and outputs a
// bandwidth sample for every packet acknowledged. The samples are taken for
// individual packets, and are not filtered; the consumer has to filter the
// bandwidth samples itself. In certain cases, the sampler will locally severely
// underestimate the bandwidth, hence a maximum filter with a size of at least
// one RTT is recommended.
//
// This class bases its samples on the slope of two curves: the number of bytes
// sent over time, and the number of bytes acknowledged as received over time.
// It produces a sample of both slopes for every packet that gets acknowledged,
// based on a slope between two points on each of the corresponding curves. Note
// that due to the packet loss, the number of bytes on each curve might get
// further and further away from each other, meaning that it is not feasible to
// compare byte values coming from different curves with each other.
//
// The obvious points for measuring slope sample are the ones corresponding to
// the packet that was just acknowledged. Let us denote them as S_1 (point at
// which the current packet was sent) and A_1 (point at which the current packet
// was acknowledged). However, taking a slope requires two points on each line,
// so estimating bandwidth requires picking a packet in the past with respect to
// which the slope is measured.
//
// For that purpose, BandwidthSampler always keeps track of the most recently
// acknowledged packet, and records it together with every outgoing packet.
// When a packet gets acknowledged (A_1), it has not only information about when
// it itself was sent (S_1), but also the information about the latest
// acknowledged packet right before it was sent (S_0 and A_0).
//
// Based on that data, send and ack rate are estimated as:
//
// send_rate = (bytes(S_1) - bytes(S_0)) / (time(S_1) - time(S_0))
// ack_rate = (bytes(A_1) - bytes(A_0)) / (time(A_1) - time(A_0))
//
// Here, the ack rate is intuitively the rate we want to treat as bandwidth.
// However, in certain cases (e.g. ack compression) the ack rate at a point may
// end up higher than the rate at which the data was originally sent, which is
// not indicative of the real bandwidth. Hence, we use the send rate as an upper
// bound, and the sample value is
//
// rate_sample = min(send_rate, ack_rate)
//
// An important edge case handled by the sampler is tracking the app-limited
// samples. There are multiple meaning of "app-limited" used interchangeably,
// hence it is important to understand and to be able to distinguish between
// them.
//
// Meaning 1: connection state. The connection is said to be app-limited when
// there is no outstanding data to send. This means that certain bandwidth
// samples in the future would not be an accurate indication of the link
// capacity, and it is important to inform consumer about that. Whenever
// connection becomes app-limited, the sampler is notified via OnAppLimited()
// method.
//
// Meaning 2: a phase in the bandwidth sampler. As soon as the bandwidth
// sampler becomes notified about the connection being app-limited, it enters
// app-limited phase. In that phase, all *sent* packets are marked as
// app-limited. Note that the connection itself does not have to be
// app-limited during the app-limited phase, and in fact it will not be
// (otherwise how would it send packets?). The boolean flag below indicates
// whether the sampler is in that phase.
//
// Meaning 3: a flag on the sent packet and on the sample. If a sent packet is
// sent during the app-limited phase, the resulting sample related to the
// packet will be marked as app-limited.
//
// With the terminology issue out of the way, let us consider the question of
// what kind of situation it addresses.
//
// Consider a scenario where we first send packets 1 to 20 at a regular
// bandwidth, and then immediately run out of data. After a few seconds, we send
// packets 21 to 60, and only receive ack for 21 between sending packets 40 and
// 41. In this case, when we sample bandwidth for packets 21 to 40, the S_0/A_0
// we use to compute the slope is going to be packet 20, a few seconds apart
// from the current packet, hence the resulting estimate would be extremely low
// and not indicative of anything. Only at packet 41 the S_0/A_0 will become 21,
// meaning that the bandwidth sample would exclude the quiescence.
//
// Based on the analysis of that scenario, we implement the following rule: once
// OnAppLimited() is called, all sent packets will produce app-limited samples
// up until an ack for a packet that was sent after OnAppLimited() was called.
// Note that while the scenario above is not the only scenario when the
// connection is app-limited, the approach works in other cases too.
type BandwidthSampler struct {
// The total number of congestion controlled bytes sent during the connection.
totalBytesSent congestion.ByteCount
// The total number of congestion controlled bytes which were acknowledged.
totalBytesAcked congestion.ByteCount
// The total number of congestion controlled bytes which were lost.
totalBytesLost congestion.ByteCount
// The value of |totalBytesSent| at the time the last acknowledged packet
// was sent. Valid only when |lastAckedPacketSentTime| is valid.
totalBytesSentAtLastAckedPacket congestion.ByteCount
// The time at which the last acknowledged packet was sent. Set to
// QuicTime::Zero() if no valid timestamp is available.
lastAckedPacketSentTime time.Time
// The time at which the most recent packet was acknowledged.
lastAckedPacketAckTime time.Time
// The most recently sent packet.
lastSendPacket congestion.PacketNumber
// Indicates whether the bandwidth sampler is currently in an app-limited
// phase.
isAppLimited bool
// The packet that will be acknowledged after this one will cause the sampler
// to exit the app-limited phase.
endOfAppLimitedPhase congestion.PacketNumber
// Record of the connection state at the point where each packet in flight was
// sent, indexed by the packet number.
connectionStats *ConnectionStates
}
func NewBandwidthSampler() *BandwidthSampler {
return &BandwidthSampler{
connectionStats: &ConnectionStates{
stats: make(map[congestion.PacketNumber]*ConnectionStateOnSentPacket),
},
}
}
// OnPacketSent Inputs the sent packet information into the sampler. Assumes that all
// packets are sent in order. The information about the packet will not be
// released from the sampler until it the packet is either acknowledged or
// declared lost.
func (s *BandwidthSampler) OnPacketSent(sentTime time.Time, lastSentPacket congestion.PacketNumber, sentBytes, bytesInFlight congestion.ByteCount, hasRetransmittableData bool) {
s.lastSendPacket = lastSentPacket
if !hasRetransmittableData {
return
}
s.totalBytesSent += sentBytes
// If there are no packets in flight, the time at which the new transmission
// opens can be treated as the A_0 point for the purpose of bandwidth
// sampling. This underestimates bandwidth to some extent, and produces some
// artificially low samples for most packets in flight, but it provides with
// samples at important points where we would not have them otherwise, most
// importantly at the beginning of the connection.
if bytesInFlight == 0 {
s.lastAckedPacketAckTime = sentTime
s.totalBytesSentAtLastAckedPacket = s.totalBytesSent
// In this situation ack compression is not a concern, set send rate to
// effectively infinite.
s.lastAckedPacketSentTime = sentTime
}
s.connectionStats.Insert(lastSentPacket, sentTime, sentBytes, s)
}
// OnPacketAcked Notifies the sampler that the |lastAckedPacket| is acknowledged. Returns a
// bandwidth sample. If no bandwidth sample is available,
// QuicBandwidth::Zero() is returned.
func (s *BandwidthSampler) OnPacketAcked(ackTime time.Time, lastAckedPacket congestion.PacketNumber) *BandwidthSample {
sentPacketState := s.connectionStats.Get(lastAckedPacket)
if sentPacketState == nil {
return NewBandwidthSample()
}
sample := s.onPacketAckedInner(ackTime, lastAckedPacket, sentPacketState)
s.connectionStats.Remove(lastAckedPacket)
return sample
}
// onPacketAckedInner Handles the actual bandwidth calculations, whereas the outer method handles
// retrieving and removing |sentPacket|.
func (s *BandwidthSampler) onPacketAckedInner(ackTime time.Time, lastAckedPacket congestion.PacketNumber, sentPacket *ConnectionStateOnSentPacket) *BandwidthSample {
s.totalBytesAcked += sentPacket.size
s.totalBytesSentAtLastAckedPacket = sentPacket.sendTimeState.totalBytesSent
s.lastAckedPacketSentTime = sentPacket.sendTime
s.lastAckedPacketAckTime = ackTime
// Exit app-limited phase once a packet that was sent while the connection is
// not app-limited is acknowledged.
if s.isAppLimited && lastAckedPacket > s.endOfAppLimitedPhase {
s.isAppLimited = false
}
// There might have been no packets acknowledged at the moment when the
// current packet was sent. In that case, there is no bandwidth sample to
// make.
if sentPacket.lastAckedPacketSentTime.IsZero() {
return NewBandwidthSample()
}
// Infinite rate indicates that the sampler is supposed to discard the
// current send rate sample and use only the ack rate.
sendRate := InfiniteBandwidth
if sentPacket.sendTime.After(sentPacket.lastAckedPacketSentTime) {
sendRate = BandwidthFromDelta(sentPacket.sendTimeState.totalBytesSent-sentPacket.totalBytesSentAtLastAckedPacket, sentPacket.sendTime.Sub(sentPacket.lastAckedPacketSentTime))
}
// During the slope calculation, ensure that ack time of the current packet is
// always larger than the time of the previous packet, otherwise division by
// zero or integer underflow can occur.
if !ackTime.After(sentPacket.lastAckedPacketAckTime) {
// TODO(wub): Compare this code count before and after fixing clock jitter
// issue.
// if sentPacket.lastAckedPacketAckTime.Equal(sentPacket.sendTime) {
// This is the 1st packet after quiescense.
// QUIC_CODE_COUNT_N(quic_prev_ack_time_larger_than_current_ack_time, 1, 2);
// } else {
// QUIC_CODE_COUNT_N(quic_prev_ack_time_larger_than_current_ack_time, 2, 2);
// }
return NewBandwidthSample()
}
ackRate := BandwidthFromDelta(s.totalBytesAcked-sentPacket.sendTimeState.totalBytesAcked,
ackTime.Sub(sentPacket.lastAckedPacketAckTime))
// Note: this sample does not account for delayed acknowledgement time. This
// means that the RTT measurements here can be artificially high, especially
// on low bandwidth connections.
sample := &BandwidthSample{
bandwidth: minBandwidth(sendRate, ackRate),
rtt: ackTime.Sub(sentPacket.sendTime),
}
SentPacketToSendTimeState(sentPacket, &sample.stateAtSend)
return sample
}
// OnPacketLost Informs the sampler that a packet is considered lost and it should no
// longer keep track of it.
func (s *BandwidthSampler) OnPacketLost(packetNumber congestion.PacketNumber) SendTimeState {
ok, sentPacket := s.connectionStats.Remove(packetNumber)
sendTimeState := SendTimeState{
isValid: ok,
}
if sentPacket != nil {
s.totalBytesLost += sentPacket.size
SentPacketToSendTimeState(sentPacket, &sendTimeState)
}
return sendTimeState
}
// OnAppLimited Informs the sampler that the connection is currently app-limited, causing
// the sampler to enter the app-limited phase. The phase will expire by
// itself.
func (s *BandwidthSampler) OnAppLimited() {
s.isAppLimited = true
s.endOfAppLimitedPhase = s.lastSendPacket
}
// SentPacketToSendTimeState Copy a subset of the (private) ConnectionStateOnSentPacket to the (public)
// SendTimeState. Always set send_time_state->is_valid to true.
func SentPacketToSendTimeState(sentPacket *ConnectionStateOnSentPacket, sendTimeState *SendTimeState) {
sendTimeState.isAppLimited = sentPacket.sendTimeState.isAppLimited
sendTimeState.totalBytesSent = sentPacket.sendTimeState.totalBytesSent
sendTimeState.totalBytesAcked = sentPacket.sendTimeState.totalBytesAcked
sendTimeState.totalBytesLost = sentPacket.sendTimeState.totalBytesLost
sendTimeState.isValid = true
}
// ConnectionStates Record of the connection state at the point where each packet in flight was
// sent, indexed by the packet number.
// FIXME: using LinkedList replace map to fast remove all the packets lower than the specified packet number.
type ConnectionStates struct {
stats map[congestion.PacketNumber]*ConnectionStateOnSentPacket
}
func (s *ConnectionStates) Insert(packetNumber congestion.PacketNumber, sentTime time.Time, bytes congestion.ByteCount, sampler *BandwidthSampler) bool {
if _, ok := s.stats[packetNumber]; ok {
return false
}
s.stats[packetNumber] = NewConnectionStateOnSentPacket(packetNumber, sentTime, bytes, sampler)
return true
}
func (s *ConnectionStates) Get(packetNumber congestion.PacketNumber) *ConnectionStateOnSentPacket {
return s.stats[packetNumber]
}
func (s *ConnectionStates) Remove(packetNumber congestion.PacketNumber) (bool, *ConnectionStateOnSentPacket) {
state, ok := s.stats[packetNumber]
if ok {
delete(s.stats, packetNumber)
}
return ok, state
}
func NewConnectionStateOnSentPacket(packetNumber congestion.PacketNumber, sentTime time.Time, bytes congestion.ByteCount, sampler *BandwidthSampler) *ConnectionStateOnSentPacket {
return &ConnectionStateOnSentPacket{
packetNumber: packetNumber,
sendTime: sentTime,
size: bytes,
lastAckedPacketSentTime: sampler.lastAckedPacketSentTime,
lastAckedPacketAckTime: sampler.lastAckedPacketAckTime,
totalBytesSentAtLastAckedPacket: sampler.totalBytesSentAtLastAckedPacket,
sendTimeState: SendTimeState{
isValid: true,
isAppLimited: sampler.isAppLimited,
totalBytesSent: sampler.totalBytesSent,
totalBytesAcked: sampler.totalBytesAcked,
totalBytesLost: sampler.totalBytesLost,
},
}
}

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,18 @@
package bbr
import "time"
// A Clock returns the current time
type Clock interface {
Now() time.Time
}
// DefaultClock implements the Clock interface using the Go stdlib clock.
type DefaultClock struct{}
var _ Clock = DefaultClock{}
// Now gets the current time
func (DefaultClock) Now() time.Time {
return time.Now()
}

View file

@ -0,0 +1,132 @@
package bbr
// WindowedFilter Use the following to construct a windowed filter object of type T.
// For example, a min filter using QuicTime as the time type:
//
// WindowedFilter<T, MinFilter<T>, QuicTime, QuicTime::Delta> ObjectName;
//
// A max filter using 64-bit integers as the time type:
//
// WindowedFilter<T, MaxFilter<T>, uint64_t, int64_t> ObjectName;
//
// Specifically, this template takes four arguments:
// 1. T -- type of the measurement that is being filtered.
// 2. Compare -- MinFilter<T> or MaxFilter<T>, depending on the type of filter
// desired.
// 3. TimeT -- the type used to represent timestamps.
// 4. TimeDeltaT -- the type used to represent continuous time intervals between
// two timestamps. Has to be the type of (a - b) if both |a| and |b| are
// of type TimeT.
type WindowedFilter struct {
// Time length of window.
windowLength int64
estimates []Sample
comparator func(int64, int64) bool
}
type Sample struct {
sample int64
time int64
}
// Compares two values and returns true if the first is greater than or equal
// to the second.
func MaxFilter(a, b int64) bool {
return a >= b
}
// Compares two values and returns true if the first is less than or equal
// to the second.
func MinFilter(a, b int64) bool {
return a <= b
}
func NewWindowedFilter(windowLength int64, comparator func(int64, int64) bool) *WindowedFilter {
return &WindowedFilter{
windowLength: windowLength,
estimates: make([]Sample, 3),
comparator: comparator,
}
}
// Changes the window length. Does not update any current samples.
func (f *WindowedFilter) SetWindowLength(windowLength int64) {
f.windowLength = windowLength
}
func (f *WindowedFilter) GetBest() int64 {
return f.estimates[0].sample
}
func (f *WindowedFilter) GetSecondBest() int64 {
return f.estimates[1].sample
}
func (f *WindowedFilter) GetThirdBest() int64 {
return f.estimates[2].sample
}
func (f *WindowedFilter) Update(sample, time int64) {
if f.estimates[0].time == 0 || f.comparator(sample, f.estimates[0].sample) || (time-f.estimates[2].time) > f.windowLength {
f.Reset(sample, time)
return
}
if f.comparator(sample, f.estimates[1].sample) {
f.estimates[1].sample = sample
f.estimates[1].time = time
f.estimates[2].sample = sample
f.estimates[2].time = time
} else if f.comparator(sample, f.estimates[2].sample) {
f.estimates[2].sample = sample
f.estimates[2].time = time
}
// Expire and update estimates as necessary.
if time-f.estimates[0].time > f.windowLength {
// The best estimate hasn't been updated for an entire window, so promote
// second and third best estimates.
f.estimates[0].sample = f.estimates[1].sample
f.estimates[0].time = f.estimates[1].time
f.estimates[1].sample = f.estimates[2].sample
f.estimates[1].time = f.estimates[2].time
f.estimates[2].sample = sample
f.estimates[2].time = time
// Need to iterate one more time. Check if the new best estimate is
// outside the window as well, since it may also have been recorded a
// long time ago. Don't need to iterate once more since we cover that
// case at the beginning of the method.
if time-f.estimates[0].time > f.windowLength {
f.estimates[0].sample = f.estimates[1].sample
f.estimates[0].time = f.estimates[1].time
f.estimates[1].sample = f.estimates[2].sample
f.estimates[1].time = f.estimates[2].time
}
return
}
if f.estimates[1].sample == f.estimates[0].sample && time-f.estimates[1].time > f.windowLength>>2 {
// A quarter of the window has passed without a better sample, so the
// second-best estimate is taken from the second quarter of the window.
f.estimates[1].sample = sample
f.estimates[1].time = time
f.estimates[2].sample = sample
f.estimates[2].time = time
return
}
if f.estimates[2].sample == f.estimates[1].sample && time-f.estimates[2].time > f.windowLength>>1 {
// We've passed a half of the window without a better estimate, so take
// a third-best estimate from the second half of the window.
f.estimates[2].sample = sample
f.estimates[2].time = time
}
}
func (f *WindowedFilter) Reset(newSample, newTime int64) {
f.estimates[0].sample = newSample
f.estimates[0].time = newTime
f.estimates[1].sample = newSample
f.estimates[1].time = newTime
f.estimates[2].sample = newSample
f.estimates[2].time = newTime
}

View file

@ -1,14 +1,14 @@
package congestion
package brutal
import (
"time"
"github.com/apernet/hysteria/core/internal/congestion/common"
"github.com/apernet/quic-go/congestion"
)
const (
initMaxDatagramSize = 1252
pktInfoSlotCount = 4
minSampleCount = 50
minAckRate = 0.8
@ -20,7 +20,7 @@ type BrutalSender struct {
rttStats congestion.RTTStatsProvider
bps congestion.ByteCount
maxDatagramSize congestion.ByteCount
pacer *pacer
pacer *common.Pacer
pktInfoSlots [pktInfoSlotCount]pktInfo
ackRate float64
@ -35,10 +35,10 @@ type pktInfo struct {
func NewBrutalSender(bps uint64) *BrutalSender {
bs := &BrutalSender{
bps: congestion.ByteCount(bps),
maxDatagramSize: initMaxDatagramSize,
maxDatagramSize: common.InitMaxDatagramSize,
ackRate: 1,
}
bs.pacer = newPacer(func() congestion.ByteCount {
bs.pacer = common.NewPacer(func() congestion.ByteCount {
return congestion.ByteCount(float64(bs.bps) / bs.ackRate)
})
return bs
@ -142,10 +142,3 @@ func (b *BrutalSender) InRecovery() bool {
func (b *BrutalSender) MaybeExitSlowStart() {}
func (b *BrutalSender) OnRetransmissionTimeout(packetsRetransmitted bool) {}
func maxDuration(a, b time.Duration) time.Duration {
if a > b {
return a
}
return b
}

View file

@ -1,4 +1,4 @@
package congestion
package common
import (
"math"
@ -8,28 +8,30 @@ import (
)
const (
InitMaxDatagramSize = 1252
maxBurstPackets = 10
minPacingDelay = time.Millisecond
)
// The pacer implements a token bucket pacing algorithm.
type pacer struct {
// Pacer implements a token bucket pacing algorithm.
type Pacer struct {
budgetAtLastSent congestion.ByteCount
maxDatagramSize congestion.ByteCount
lastSentTime time.Time
getBandwidth func() congestion.ByteCount // in bytes/s
}
func newPacer(getBandwidth func() congestion.ByteCount) *pacer {
p := &pacer{
budgetAtLastSent: maxBurstPackets * initMaxDatagramSize,
maxDatagramSize: initMaxDatagramSize,
func NewPacer(getBandwidth func() congestion.ByteCount) *Pacer {
p := &Pacer{
budgetAtLastSent: maxBurstPackets * InitMaxDatagramSize,
maxDatagramSize: InitMaxDatagramSize,
getBandwidth: getBandwidth,
}
return p
}
func (p *pacer) SentPacket(sendTime time.Time, size congestion.ByteCount) {
func (p *Pacer) SentPacket(sendTime time.Time, size congestion.ByteCount) {
budget := p.Budget(sendTime)
if size > budget {
p.budgetAtLastSent = 0
@ -39,7 +41,7 @@ func (p *pacer) SentPacket(sendTime time.Time, size congestion.ByteCount) {
p.lastSentTime = sendTime
}
func (p *pacer) Budget(now time.Time) congestion.ByteCount {
func (p *Pacer) Budget(now time.Time) congestion.ByteCount {
if p.lastSentTime.IsZero() {
return p.maxBurstSize()
}
@ -47,7 +49,7 @@ func (p *pacer) Budget(now time.Time) congestion.ByteCount {
return minByteCount(p.maxBurstSize(), budget)
}
func (p *pacer) maxBurstSize() congestion.ByteCount {
func (p *Pacer) maxBurstSize() congestion.ByteCount {
return maxByteCount(
congestion.ByteCount((minPacingDelay+time.Millisecond).Nanoseconds())*p.getBandwidth()/1e9,
maxBurstPackets*p.maxDatagramSize,
@ -56,7 +58,7 @@ func (p *pacer) maxBurstSize() congestion.ByteCount {
// TimeUntilSend returns when the next packet should be sent.
// It returns the zero value of time.Time if a packet can be sent immediately.
func (p *pacer) TimeUntilSend() time.Time {
func (p *Pacer) TimeUntilSend() time.Time {
if p.budgetAtLastSent >= p.maxDatagramSize {
return time.Time{}
}
@ -67,7 +69,7 @@ func (p *pacer) TimeUntilSend() time.Time {
))
}
func (p *pacer) SetMaxDatagramSize(s congestion.ByteCount) {
func (p *Pacer) SetMaxDatagramSize(s congestion.ByteCount) {
p.maxDatagramSize = s
}
@ -84,3 +86,10 @@ func minByteCount(a, b congestion.ByteCount) congestion.ByteCount {
}
return b
}
func maxDuration(a, b time.Duration) time.Duration {
if a > b {
return a
}
return b
}

View file

@ -9,7 +9,9 @@ import (
"github.com/apernet/quic-go"
"github.com/apernet/quic-go/http3"
"github.com/apernet/hysteria/core/internal/congestion"
"github.com/apernet/hysteria/core/internal/congestion/bbr"
"github.com/apernet/hysteria/core/internal/congestion/brutal"
"github.com/apernet/hysteria/core/internal/congestion/common"
"github.com/apernet/hysteria/core/internal/protocol"
"github.com/apernet/hysteria/core/internal/utils"
)
@ -126,9 +128,16 @@ func (h *h3sHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Set authenticated flag
h.authenticated = true
h.authID = id
// Update congestion control when applicable
// Use Brutal CC if actualTx > 0, otherwise use BBR
if actualTx > 0 {
h.conn.SetCongestionControl(congestion.NewBrutalSender(actualTx))
h.conn.SetCongestionControl(brutal.NewBrutalSender(actualTx))
} else {
h.conn.SetCongestionControl(bbr.NewBBRSender(
bbr.DefaultClock{},
bbr.GetInitialPacketSize(h.conn.RemoteAddr()),
32*common.InitMaxDatagramSize,
bbr.DefaultBBRMaxCongestionWindow*common.InitMaxDatagramSize,
))
}
// Auth OK, send response
protocol.AuthResponseDataToHeader(w.Header(), !h.config.DisableUDP, h.config.BandwidthConfig.MaxRx)

View file

@ -20,6 +20,7 @@ require (
github.com/quic-go/qpack v0.4.0 // indirect
github.com/quic-go/qtls-go1-20 v0.3.1 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/zhangyunhao116/fastrand v0.3.0 // indirect
golang.org/x/exp v0.0.0-20221205204356-47842c84f3db // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.12.0 // indirect

View file

@ -39,6 +39,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/zhangyunhao116/fastrand v0.3.0 h1:7bwe124xcckPulX6fxtr2lFdO2KQqaefdtbk+mqO/Ig=
github.com/zhangyunhao116/fastrand v0.3.0/go.mod h1:0v5KgHho0VE6HU192HnY15de/oDS8UrbBChIFjIhBtc=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=