summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--cmd/derper/derper.go11
-rw-r--r--derp/derp_server.go69
-rw-r--r--derp/dropreason_string.go5
-rw-r--r--derp/limiter.go171
-rw-r--r--derp/limiter_test.go56
5 files changed, 310 insertions, 2 deletions
diff --git a/cmd/derper/derper.go b/cmd/derper/derper.go
index f98b45375..07a38af9f 100644
--- a/cmd/derper/derper.go
+++ b/cmd/derper/derper.go
@@ -57,6 +57,11 @@ var (
acceptConnLimit = flag.Float64("accept-connection-limit", math.Inf(+1), "rate limit for accepting new connection")
acceptConnBurst = flag.Int("accept-connection-burst", math.MaxInt, "burst limit for accepting new connection")
+
+ egressInterface = flag.String("egress-interface", "", "the interface to monitor for automatic ratelimit tuning")
+ egressDataLimit = flag.Int("egress-data-limit", 100*1024*1024/8, "the bandwidth in bytes/s the server will try to stay under, only applies if egress-interface is set")
+ clientDataMin = flag.Int("client-data-min-limit", 1024*1024/8, "minimum bandwidth in bytes/s for a single client, only applies if egress-interface is set")
+ clientDataBurst = flag.Int("client-data-burst", 3*1024*1024, "burst limit in bytes for forwarded data from a single client, only applies if egress-interface is set")
)
var (
@@ -154,6 +159,12 @@ func main() {
s := derp.NewServer(cfg.PrivateKey, log.Printf)
s.SetVerifyClient(*verifyClients)
+ if *egressInterface != "" && *egressDataLimit > 0 {
+ if err := s.StartEgressRateLimiter(*egressInterface, *egressDataLimit, *clientDataMin, *clientDataBurst); err != nil {
+ log.Fatalf("failed to start egress rate limiter: %v", err)
+ }
+ }
+
if *meshPSKFile != "" {
b, err := ioutil.ReadFile(*meshPSKFile)
if err != nil {
diff --git a/derp/derp_server.go b/derp/derp_server.go
index 52680cace..afb0e6b23 100644
--- a/derp/derp_server.go
+++ b/derp/derp_server.go
@@ -107,6 +107,9 @@ type Server struct {
metaCert []byte // the encoded x509 cert to send after LetsEncrypt cert+intermediate
dupPolicy dupPolicy
+ clientDataLimit *uint64 // limit for how many bytes/s of content a client can send; atomic
+ clientDataBurst int // burst limit for how many bytes/s of content a client can send
+
// Counters:
packetsSent, bytesSent expvar.Int
packetsRecv, bytesRecv expvar.Int
@@ -314,7 +317,10 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server {
sentTo: map[key.NodePublic]map[key.NodePublic]int64{},
avgQueueDuration: new(uint64),
keyOfAddr: map[netip.AddrPort]key.NodePublic{},
+ clientDataLimit: new(uint64),
+ clientDataBurst: 10 * 1024 * 1024, // 10Mb default burst
}
+ atomic.StoreUint64(s.clientDataLimit, 12*1024*1024) // 12Mb/s default ratelimit
s.initMetacert()
s.packetsRecvDisco = s.packetsRecvByKind.Get("disco")
s.packetsRecvOther = s.packetsRecvByKind.Get("other")
@@ -325,12 +331,48 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server {
s.packetsDroppedReason.Get("queue_head"),
s.packetsDroppedReason.Get("queue_tail"),
s.packetsDroppedReason.Get("write_error"),
+ s.packetsDroppedReason.Get("rate_limited"),
}
s.packetsDroppedTypeDisco = s.packetsDroppedType.Get("disco")
s.packetsDroppedTypeOther = s.packetsDroppedType.Get("other")
return s
}
+// StartEgressRateLimiter starts dynamically adjusting the per-client data
+// rate limit based on the desired egress limit and the utilization of the
+// specified interface.
+//
+// egressDataLimit is the egress rate the server tries to stay under and
+// clientDataMin is the lowest limit a single client can be given, both in
+// bytes/s. clientDataBurst is the per-client burst size in bytes. Negative
+// values are rejected, because they would wrap around to enormous values
+// when converted to the unsigned limits used internally.
+//
+// It must be called before serving begins. It returns an error if an
+// argument is negative or the limiter cannot be created.
+func (s *Server) StartEgressRateLimiter(interfaceName string, egressDataLimit, clientDataMin, clientDataBurst int) error {
+	if egressDataLimit < 0 || clientDataMin < 0 || clientDataBurst < 0 {
+		return fmt.Errorf("starting limiter: negative limit (egress=%d, client min=%d, client burst=%d)",
+			egressDataLimit, clientDataMin, clientDataBurst)
+	}
+	limiter, err := newEgressLimiter(interfaceName, uint64(egressDataLimit), uint64(clientDataMin))
+	if err != nil {
+		return fmt.Errorf("starting limiter: %w", err)
+	}
+
+	// Seed the shared limit; the goroutine below overwrites it as soon as
+	// it has taken its first sample.
+	atomic.StoreUint64(s.clientDataLimit, uint64(egressDataLimit))
+	s.clientDataBurst = clientDataBurst
+
+	go func() {
+		t := time.NewTicker(time.Second)
+		defer t.Stop()
+
+		for {
+			limit, err := limiter.Limit()
+			if err != nil {
+				// The limit stays at the last stored value once the
+				// updater gives up.
+				s.logf("derp: failed to update egress limiter: %v", err)
+				return
+			}
+			atomic.StoreUint64(s.clientDataLimit, limit)
+
+			<-t.C
+			// NOTE(review): s.closed is read here without any visible
+			// synchronization; confirm how the field is guarded.
+			if s.closed {
+				return
+			}
+		}
+	}()
+
+	return nil
+}
+
+
// SetMesh sets the pre-shared key that regional DERP servers used to mesh
// amongst themselves.
//
@@ -664,6 +706,7 @@ func (s *Server) accept(ctx context.Context, nc Conn, brw *bufio.ReadWriter, rem
remoteIPPort, _ := netip.ParseAddrPort(remoteAddr)
+ rateLimiter := rate.NewLimiter(rate.Limit(atomic.LoadUint64(s.clientDataLimit)), s.clientDataBurst)
c := &sclient{
connNum: connNum,
s: s,
@@ -681,6 +724,7 @@ func (s *Server) accept(ctx context.Context, nc Conn, brw *bufio.ReadWriter, rem
sendPongCh: make(chan [8]byte, 1),
peerGone: make(chan key.NodePublic),
canMesh: clientInfo.MeshKey != "" && clientInfo.MeshKey == s.meshKey,
+ rateLimiter: rateLimiter,
}
if c.canMesh {
@@ -757,6 +801,18 @@ func (c *sclient) run(ctx context.Context) error {
}
}
+// shouldRatelimitData reports whether a payload of dataLen bytes received
+// from this client exceeds its rate limit and should be dropped.
+//
+// Before charging the payload, the client's limiter is brought in line with
+// the server-wide limit if the two differ.
+func (c *sclient) shouldRatelimitData(dataLen int) bool {
+	// Connections that presented the mesh key are not regular clients and
+	// are never limited.
+	if c.canMesh {
+		return false
+	}
+
+	ts := time.Now()
+	want := atomic.LoadUint64(c.s.clientDataLimit)
+	if uint64(c.rateLimiter.Limit()) != want {
+		c.rateLimiter.SetLimitAt(ts, rate.Limit(want))
+	}
+	allowed := c.rateLimiter.AllowN(ts, dataLen)
+	return !allowed
+}
+
+
func (c *sclient) handleUnknownFrame(ft frameType, fl uint32) error {
_, err := io.CopyN(ioutil.Discard, c.br, int64(fl))
return err
@@ -858,6 +914,11 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error {
}
s.packetsForwardedIn.Add(1)
+ if c.shouldRatelimitData(len(contents)) {
+ s.recordDrop(contents, c.key, dstKey, dropReasonRateLimited)
+ return nil
+ }
+
var dstLen int
var dst *sclient
@@ -908,6 +969,11 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
return fmt.Errorf("client %x: recvPacket: %v", c.key, err)
}
+ if c.shouldRatelimitData(len(contents)) {
+ s.recordDrop(contents, c.key, dstKey, dropReasonRateLimited)
+ return nil
+ }
+
var fwd PacketForwarder
var dstLen int
var dst *sclient
@@ -962,6 +1028,7 @@ const (
dropReasonQueueTail // destination queue is full, dropped packet at queue tail
dropReasonWriteError // OS write() failed
dropReasonDupClient // the public key is connected 2+ times (active/active, fighting)
+ dropReasonRateLimited // send/forward packet content exceeds rate limit
)
func (s *Server) recordDrop(packetBytes []byte, srcKey, dstKey key.NodePublic, reason dropReason) {
@@ -1254,6 +1321,7 @@ type sclient struct {
canMesh bool // clientInfo had correct mesh token for inter-region routing
isDup atomic.Bool // whether more than 1 sclient for key is connected
isDisabled atomic.Bool // whether sends to this peer are disabled due to active/active dups
+ rateLimiter *rate.Limiter
// replaceLimiter controls how quickly two connections with
// the same client key can kick each other off the server by
@@ -1700,6 +1768,7 @@ func (s *Server) ExpVar() expvar.Var {
m.Set("average_queue_duration_ms", expvar.Func(func() any {
return math.Float64frombits(atomic.LoadUint64(s.avgQueueDuration))
}))
+ m.Set("client_ratelimit_bytes_per_second", expvar.Func(func() any { return atomic.LoadUint64(s.clientDataLimit) }))
var expvarVersion expvar.String
expvarVersion.Set(version.Long)
m.Set("version", &expvarVersion)
diff --git a/derp/dropreason_string.go b/derp/dropreason_string.go
index 5ed41a26b..1f58e88a6 100644
--- a/derp/dropreason_string.go
+++ b/derp/dropreason_string.go
@@ -19,11 +19,12 @@ func _() {
_ = x[dropReasonQueueTail-4]
_ = x[dropReasonWriteError-5]
_ = x[dropReasonDupClient-6]
+ _ = x[dropReasonRateLimited-7]
}
-const _dropReason_name = "UnknownDestUnknownDestOnFwdGoneQueueHeadQueueTailWriteErrorDupClient"
+const _dropReason_name = "UnknownDestUnknownDestOnFwdGoneQueueHeadQueueTailWriteErrorDupClientRateLimited"
-var _dropReason_index = [...]uint8{0, 11, 27, 31, 40, 49, 59, 68}
+var _dropReason_index = [...]uint8{0, 11, 27, 31, 40, 49, 59, 68, 79}
func (i dropReason) String() string {
if i < 0 || i >= dropReason(len(_dropReason_index)-1) {
diff --git a/derp/limiter.go b/derp/limiter.go
new file mode 100644
index 000000000..1cd653013
--- /dev/null
+++ b/derp/limiter.go
@@ -0,0 +1,171 @@
+// Copyright (c) 2022 Tailscale Inc & AUTHORS All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package derp
+
+import (
+ "io/ioutil"
+ "strconv"
+ "strings"
+ "time"
+)
+
+// readTxBytes returns the cumulative transmitted-bytes counter published
+// for interfaceName under /sys/class/net.
+//
+// It returns an error if the statistics file cannot be read or does not
+// contain an unsigned decimal integer.
+func readTxBytes(interfaceName string) (uint64, error) {
+	v, err := ioutil.ReadFile("/sys/class/net/" + interfaceName + "/statistics/tx_bytes")
+	if err != nil {
+		return 0, err
+	}
+	// Parse straight into uint64: strconv.Atoi is limited to the range of
+	// int, so it rejects large counter values and lets negative ones wrap.
+	tx, err := strconv.ParseUint(strings.TrimSpace(string(v)), 10, 64)
+	if err != nil {
+		return 0, err
+	}
+	return tx, nil
+}
+
+// egressLimiter derives a rate limit from the transmit counter of a
+// network interface by feeding counter deltas into a limiterLoop.
+type egressLimiter struct {
+ interfaceName string // interface whose tx_bytes counter is sampled.
+ limitBytesSec uint64 // the egress bytes/s we want to stay under.
+ minBytesSec uint64 // the minimum bytes/s rate limit.
+
+ lastTxBytes uint64 // counter value seen by the previous sample.
+ controlLoop limiterLoop // PID loop that turns egress samples into a limit.
+}
+
+// newEgressLimiter builds an egressLimiter for interfaceName that aims to
+// keep egress under limitBytesSec and never reports a limit below
+// minBytesSec. The interface counter is sampled once up front so the first
+// call to Limit measures a delta; an error from that read is returned
+// unchanged.
+func newEgressLimiter(interfaceName string, limitBytesSec, minBytesSec uint64) (*egressLimiter, error) {
+	txNow, err := readTxBytes(interfaceName)
+	if err != nil {
+		return nil, err
+	}
+
+	e := &egressLimiter{
+		interfaceName: interfaceName,
+		limitBytesSec: limitBytesSec,
+		minBytesSec:   minBytesSec,
+		lastTxBytes:   txNow,
+	}
+	e.controlLoop = newLimiterLoop(limitBytesSec, time.Now())
+	return e, nil
+}
+
+// Limit samples the interface counter and returns the current rate limit in
+// bytes/s, bounded to [minBytesSec, limitBytesSec]. If the minimum exceeds
+// the limit, the limit wins.
+//
+// It returns an error if the interface counter cannot be read.
+//
+// NOTE(review): the control loop is fed the raw byte delta since the
+// previous call, which only equals bytes/s when Limit is called about once
+// per second.
+func (e *egressLimiter) Limit() (uint64, error) {
+	tx, err := readTxBytes(e.interfaceName)
+	if err != nil {
+		return 0, err
+	}
+
+	sent := tx - e.lastTxBytes
+	if tx < e.lastTxBytes {
+		// The counter went backwards (for example the interface was
+		// recreated). Count only the bytes seen since it restarted
+		// instead of letting the unsigned subtraction wrap around.
+		sent = tx
+	}
+	e.lastTxBytes = tx
+
+	limit := e.controlLoop.tick(sent, time.Now())
+
+	// Clamp in the float domain so negative or oversized outputs are never
+	// converted to uint64 before being bounded.
+	lo, hi := float64(e.minBytesSec), float64(e.limitBytesSec)
+	if !(limit >= lo) { // also catches NaN
+		limit = lo
+	}
+	if limit > hi {
+		limit = hi
+	}
+	return uint64(limit), nil
+}
+
+// PID loop values for the dynamic ratelimit.
+// The Wikipedia page on PID controllers is recommended reading if you are
+// not familiar with PID loops or feedback control theory.
+//
+// Gain values are unitless, but operate on a feedback value in bytes/s,
+// a setpoint value in bytes/s, and a time delta (dt) in seconds.
+//
+// These are only initial values, based on first principles and on
+// eyeballing graphs; they should be tuned.
+const (
+ // Proportional gain.
+ // Given this represents a global ratelimit, the P term doesn't make a lot
+ // of sense, as each client's contribution to link utilization is entirely
+ // dependent on the client workload.
+ //
+ // For this reason, it's set super low: it's expected the I term will do
+ // most of the heavy lifting.
+ limiterP float64 = 1.0 / 1024
+ // Derivative gain.
+ // This term reacts against 'trends', that is, the first derivative of
+ // the feedback value. Think of it like a rapid-change damper.
+ //
+ // This isn't super important, so again we've set it fairly low.
+ limiterD float64 = 0.003
+ // Integral gain.
+ //
+ // This is where all the heavy lifting happens. Basically, we increase
+ // the ratelimit (by limiterIP) when we have room to spare, and
+ // decrease it once we exceed 4/5ths of the limit (by limiterIN).
+ // The change is linear in the error between feedback and the setpoint,
+ // but clamped to a fraction of the setpoint per second.
+ //
+ // The decrease term is stronger than the increase term, so we 'back off
+ // quickly' when we are approaching limits, but test the waters on
+ // the other end cautiously.
+ limiterIP float64 = 0.008
+ limiterIN float64 = 0.3
+)
+
+// limiterLoop exposes a dynamic ratelimit, based on the egress rate
+// of some interface. The PID loop tries to keep egress at 4/5 of the limit.
+type limiterLoop struct {
+ limitBytesSec uint64 // the setpoint in bytes/s; newLimiterLoop stores 4/5 of the configured limit here.
+
+ integral float64 // the integral sum at lastUpdate instant
+ lastEgress uint64 // feedback value of previous iteration, bytes/s
+ lastUpdate time.Time // instant at which last iteration occurred.
+}
+
+// newLimiterLoop returns a limiterLoop whose setpoint is 4/5 of
+// limitBytesSec. The integral sum starts at the full limit and the previous
+// egress sample starts at zero, with now recorded as the last update.
+func newLimiterLoop(limitBytesSec uint64, now time.Time) limiterLoop {
+	var l limiterLoop
+	l.limitBytesSec = limitBytesSec * 4 / 5
+	l.integral = float64(limitBytesSec)
+	l.lastUpdate = now
+	return l
+}
+
+// tick computes the next iteration of the PID loop and returns the
+// resulting ratelimit value in bytes/s.
+//
+// egressBytesPerSec is the feedback value (observed egress) and now is the
+// instant of the observation. The returned value is not clamped and may be
+// negative; callers are expected to bound it.
+func (l *limiterLoop) tick(egressBytesPerSec uint64, now time.Time) float64 {
+	var (
+		dt       = now.Sub(l.lastUpdate).Seconds()
+		setpoint = float64(l.limitBytesSec)
+		egress   = float64(egressBytesPerSec)
+		errVal   = setpoint - egress
+	)
+
+	// Integral term: grow slowly while under the setpoint, shrink quickly
+	// once at or over it.
+	gain := limiterIN
+	if errVal > 0 {
+		gain = limiterIP
+	}
+	iDelta := errVal * dt * gain
+	// Constrain integral sum change to a 20th of the setpoint per second.
+	maxDelta := dt * setpoint / 20
+	if iDelta > maxDelta {
+		iDelta = maxDelta
+	} else if iDelta < -maxDelta {
+		iDelta = -maxDelta
+	}
+	l.integral += iDelta
+	// Constrain integral sum to prevent windup.
+	if l.integral > setpoint {
+		l.integral = setpoint
+	} else if l.integral < -setpoint {
+		l.integral = -setpoint
+	}
+
+	// Derivative term. The difference is taken in float64: subtracting the
+	// uint64 samples directly wraps around whenever egress decreases,
+	// which would yield an enormous negative term.
+	var d float64
+	if dt > 0 {
+		d = -((egress - float64(l.lastEgress)) / dt) * limiterD
+	}
+	// Proportional term.
+	p := limiterP * errVal
+
+	l.lastEgress = egressBytesPerSec
+	l.lastUpdate = now
+	return p + l.integral + d
+}
diff --git a/derp/limiter_test.go b/derp/limiter_test.go
new file mode 100644
index 000000000..be0466143
--- /dev/null
+++ b/derp/limiter_test.go
@@ -0,0 +1,56 @@
+// Copyright (c) 2022 Tailscale Inc & AUTHORS All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package derp
+
+import (
+ "testing"
+ "time"
+)
+
+// mb converts a count of mebi-units (multiples of 1024*1024) into base units.
+func mb(mb uint64) uint64 {
+	return mb << 20
+}
+
+// TestLimiterLoopGradual drives a limiterLoop through low, steadily rising,
+// and saturated egress, checking that the limit stays high while usage is
+// low and drops once usage reaches the configured limit.
+func TestLimiterLoopGradual(t *testing.T) {
+ // Make a limiter that tries to keep under 200Mb/s.
+ limit := mb(200)
+ start := time.Now()
+ l := newLimiterLoop(limit, start)
+
+ // Make sure the initial value is sane.
+ // Let's imagine the egress is only like 1Mb/s.
+ now := start.Add(time.Second)
+ if v := uint64(l.tick(1024*1024, now)); v < mb(150) || v > limit {
+ t.Errorf("initial value = %dMb/s, want 150 < value < limit", v/1024/1024)
+ }
+
+ // Tick through 10 minutes of low usage. Let's make sure the limit stays high.
+ lowUsage := limit / 10
+ for i := 0; i < 600; i++ {
+ now = now.Add(time.Second)
+ v := uint64(l.tick(lowUsage, now))
+
+ if v < mb(150) {
+ t.Errorf("[t=%0.f] limit too low for low usage: %d (expected >150)", now.Sub(start).Seconds(), v/1024/1024)
+ }
+ }
+
+ // Let's tick through 60 seconds of steadily-increasing usage.
+ for i := 0; i < 60; i++ {
+ now = now.Add(time.Second)
+ l.tick(uint64(i)*limit/60, now)
+ }
+ // now is not advanced before this tick, so it runs with a zero time delta.
+ if v := uint64(l.tick(limit, now)); v > mb(100) || v < mb(1) {
+ t.Errorf("[t=%0.f] limit = %dMb/s, want 1-100Mb/s", now.Sub(start).Seconds(), v/1024/1024)
+ }
+ // Let's imagine we are at limits for 10s. Does the limit drop pretty hard?
+ for i := 0; i < 10; i++ {
+ now = now.Add(time.Second)
+ l.tick(limit, now)
+ }
+ // As above, this final tick reuses the previous instant.
+ if v := uint64(l.tick(limit, now)); v > mb(20) || v < mb(1) {
+ t.Errorf("[t=%0.f] limit = %dMb/s, want 1-20Mb/s", now.Sub(start).Seconds(), v/1024/1024)
+ }
+}