summary | refs | log | tree | commit | diff | homepage
diff options
context:
space:
mode:
-rw-r--r-- derp/derpserver/derpserver.go | 143
-rw-r--r-- derp/derpserver/derpserver_test.go | 202
2 files changed, 305 insertions, 40 deletions
diff --git a/derp/derpserver/derpserver.go b/derp/derpserver/derpserver.go
index 0959a4729..816ed3f83 100644
--- a/derp/derpserver/derpserver.go
+++ b/derp/derpserver/derpserver.go
@@ -87,6 +87,12 @@ const (
defaultPerClientSendQueueDepth = 32 // default packets buffered for sending
DefaultTCPWiteTimeout = 2 * time.Second
privilegedWriteTimeout = 30 * time.Second // for clients with the mesh key
+
+ // notHereCacheTTL is how long we suppress repeated lookups and
+ // peerGone notifications for a destination that is not connected
+ // to this server. Must be longer than the client's disco retry
+ // interval (3-5s) to suppress at least one retry.
+ notHereCacheTTL = 10 * time.Second
)
func getPerClientSendQueueDepth() int {
@@ -140,37 +146,44 @@ type Server struct {
localClient local.Client
// Counters:
- packetsSent, bytesSent expvar.Int
- packetsRecv, bytesRecv expvar.Int
- packetsRecvByKind metrics.LabelMap
- packetsRecvDisco *expvar.Int
- packetsRecvOther *expvar.Int
- _ align64
- packetsForwardedOut expvar.Int
- packetsForwardedIn expvar.Int
- peerGoneDisconnectedFrames expvar.Int // number of peer disconnected frames sent
- peerGoneNotHereFrames expvar.Int // number of peer not here frames sent
- gotPing expvar.Int // number of ping frames from client
- sentPong expvar.Int // number of pong frames enqueued to client
- accepts expvar.Int
- curClients expvar.Int
- curClientsNotIdeal expvar.Int
- curHomeClients expvar.Int // ones with preferred
- dupClientKeys expvar.Int // current number of public keys we have 2+ connections for
- dupClientConns expvar.Int // current number of connections sharing a public key
- dupClientConnTotal expvar.Int // total number of accepted connections when a dup key existed
- unknownFrames expvar.Int
- homeMovesIn expvar.Int // established clients announce home server moves in
- homeMovesOut expvar.Int // established clients announce home server moves out
- multiForwarderCreated expvar.Int
- multiForwarderDeleted expvar.Int
- removePktForwardOther expvar.Int
- sclientWriteTimeouts expvar.Int
- avgQueueDuration *uint64 // In milliseconds; accessed atomically
- tcpRtt metrics.LabelMap // histogram
- meshUpdateBatchSize *metrics.Histogram
- meshUpdateLoopCount *metrics.Histogram
- bufferedWriteFrames *metrics.Histogram // how many sendLoop frames (or groups of related frames) get written per flush
+ packetsSent, bytesSent expvar.Int
+ packetsRecv, bytesRecv expvar.Int
+ packetsRecvByKind metrics.LabelMap
+ packetsRecvDisco *expvar.Int
+ packetsRecvOther *expvar.Int
+ _ align64
+ packetsForwardedOut expvar.Int
+ packetsForwardedIn expvar.Int
+ peerGoneDisconnectedFrames expvar.Int // number of peer disconnected frames sent
+ peerGoneNotHereFrames expvar.Int // number of peer not here frames sent
+ packetsDroppedCachedNotHere expvar.Int // number of packets dropped via not-here cache
+ gotPing expvar.Int // number of ping frames from client
+ sentPong expvar.Int // number of pong frames enqueued to client
+ accepts expvar.Int
+ curClients expvar.Int
+ curClientsNotIdeal expvar.Int
+ curHomeClients expvar.Int // ones with preferred
+ dupClientKeys expvar.Int // current number of public keys we have 2+ connections for
+ dupClientConns expvar.Int // current number of connections sharing a public key
+ dupClientConnTotal expvar.Int // total number of accepted connections when a dup key existed
+ unknownFrames expvar.Int
+ homeMovesIn expvar.Int // established clients announce home server moves in
+ homeMovesOut expvar.Int // established clients announce home server moves out
+ multiForwarderCreated expvar.Int
+ multiForwarderDeleted expvar.Int
+ removePktForwardOther expvar.Int
+ sclientWriteTimeouts expvar.Int
+ avgQueueDuration *uint64 // In milliseconds; accessed atomically
+ tcpRtt metrics.LabelMap // histogram
+ meshUpdateBatchSize *metrics.Histogram
+ meshUpdateLoopCount *metrics.Histogram
+ bufferedWriteFrames *metrics.Histogram // how many sendLoop frames (or groups of related frames) get written per flush
+
+ // notHereCache is a server-wide cache of destination keys that are
+ // not connected to this server. Shared across all client goroutines
+ // to avoid repeated mutex-protected client map lookups.
+ // Key: key.NodePublic, Value: time.Time (when entry was added).
+ notHereCache sync.Map
// verifyClientsLocalTailscaled only accepts client connections to the DERP
// server if the clientKey is a known peer in the network, as specified by a
@@ -667,6 +680,10 @@ func (s *Server) ModifyTLSConfigToAddMetaCert(c *tls.Config) {
// observe EOFs/timeouts) but won't send them frames on the assumption
// that they're dead.
func (s *Server) registerClient(c *sclient) {
+ // Invalidate not-here cache so other clients sending to this
+ // key will do a fresh lookup and find the newly connected peer.
+ s.notHereCache.Delete(c.key)
+
s.mu.Lock()
defer s.mu.Unlock()
@@ -866,15 +883,21 @@ func (s *Server) notePeerGoneFromRegionLocked(key key.NodePublic) {
// requestPeerGoneWriteLimited sends a request to write a "peer gone"
// frame, but only in reply to a disco packet, and only if we haven't
-// sent one recently.
+// exhausted this client's peerGoneLim budget. No per-peer de-duplication
+// happens here: repeats are suppressed by the server-wide notHereCache,
+// which the caller must populate separately. NOTE(review): a cache hit
+// never reaches this function, so that sender gets no frame — confirm.
func (c *sclient) requestPeerGoneWriteLimited(peer key.NodePublic, contents []byte, reason derp.PeerGoneReasonType) {
- if disco.LooksLikeDiscoWrapper(contents) != true {
+ if !disco.LooksLikeDiscoWrapper(contents) {
return
}
- if c.peerGoneLim.Allow() {
- go c.requestPeerGoneWrite(peer, reason)
+ // Per-client limiter (peerGoneLim): when it refuses, stay silent.
+ if !c.peerGoneLim.Allow() {
+ return
}
+
+ go c.requestPeerGoneWrite(peer, reason)
}
func (s *Server) addWatcher(c *sclient) {
@@ -952,7 +975,7 @@ func (s *Server) accept(ctx context.Context, nc derp.Conn, brw *bufio.ReadWriter
peerGone: make(chan peerGoneMsg),
canMesh: s.isMeshPeer(clientInfo),
isNotIdealConn: IdealNodeContextKey.Value(ctx) != "",
- peerGoneLim: rate.NewLimiter(rate.Every(time.Second), 3),
+ peerGoneLim: rate.NewLimiter(rate.Every(time.Second), 30),
}
if c.canMesh {
@@ -1185,10 +1208,14 @@ func (c *sclient) handleFrameForwardPacket(ft derp.FrameType, fl uint32) error {
func (c *sclient) handleFrameSendPacket(ft derp.FrameType, fl uint32) error {
s := c.s
- dstKey, contents, err := s.recvPacket(c.br, fl)
+ dstKey, contents, err := s.recvPacket(c, fl)
if err != nil {
return fmt.Errorf("client %v: recvPacket: %v", c.key, err)
}
+ if contents == nil {
+ // Packet was dropped early by the not-here cache.
+ return nil
+ }
var fwd PacketForwarder
var dstLen int
@@ -1220,6 +1247,7 @@ func (c *sclient) handleFrameSendPacket(ft derp.FrameType, fl uint32) error {
reason = dropReasonDupClient
} else {
c.requestPeerGoneWriteLimited(dstKey, contents, derp.PeerGoneReasonNotHere)
+ s.addNotHereCache(dstKey, s.clock.Now())
}
s.recordDrop(contents, c.key, dstKey, reason)
c.debugLogf("SendPacket for %s, dropping with reason=%s", dstKey.ShortString(), reason)
@@ -1559,19 +1587,34 @@ func (s *Server) recvClientKey(br *bufio.Reader) (clientKey key.NodePublic, info
return clientKey, info, nil
}
-func (s *Server) recvPacket(br *bufio.Reader, frameLen uint32) (dstKey key.NodePublic, contents []byte, err error) {
+// recvPacket reads a send-packet frame from c.br. If the destination key
+// is in the server-wide not-here cache (s.notHereCache), the payload is
+// discarded without allocation and contents is returned as nil.
+func (s *Server) recvPacket(c *sclient, frameLen uint32) (dstKey key.NodePublic, contents []byte, err error) {
if frameLen < derp.KeyLen {
return zpub, nil, errors.New("short send packet frame")
}
- if err := dstKey.ReadRawWithoutAllocating(br); err != nil {
+ if err := dstKey.ReadRawWithoutAllocating(c.br); err != nil {
return zpub, nil, err
}
packetLen := frameLen - derp.KeyLen
if packetLen > derp.MaxPacketSize {
return zpub, nil, fmt.Errorf("data packet longer (%d) than max of %v", packetLen, derp.MaxPacketSize)
}
+
+ // Check not-here cache before reading payload. This avoids
+ // payload allocation, io.ReadFull, s.mu lock, and client map
+ // lookup for repeated packets to absent destinations.
+ if s.isNotHereCached(dstKey, s.clock.Now()) {
+ if _, err := c.br.Discard(int(packetLen)); err != nil {
+ return zpub, nil, err
+ }
+ s.packetsDroppedCachedNotHere.Add(1)
+ return dstKey, nil, nil
+ }
+
contents = make([]byte, packetLen)
- if _, err := io.ReadFull(br, contents); err != nil {
+ if _, err := io.ReadFull(c.br, contents); err != nil {
return zpub, nil, err
}
s.packetsRecv.Add(1)
@@ -1685,6 +1728,25 @@ func (c *sclient) presentFlags() derp.PeerPresentFlags {
return f
}
+// isNotHereCached reports whether dstKey has an unexpired entry in the
+// server's not-here cache, evaluated at time now.
+//
+// An entry is live while now is no more than notHereCacheTTL past the
+// time stored for it. An entry found to be expired is removed as a side
+// effect, so lookups prune the cache lazily.
+func (s *Server) isNotHereCached(dstKey key.NodePublic, now time.Time) bool {
+	v, ok := s.notHereCache.Load(dstKey)
+	if !ok {
+		return false
+	}
+	if now.Sub(v.(time.Time)) <= notHereCacheTTL {
+		return true
+	}
+	// Remove only the exact entry we judged expired. A plain Delete could
+	// race with a concurrent addNotHereCache for the same key and throw
+	// away the fresh timestamp it stored after our Load.
+	s.notHereCache.CompareAndDelete(dstKey, v)
+	return false
+}
+
+// addNotHereCache records that dstKey is not connected to this server,
+// stamping the entry with now. Any existing entry for dstKey is overwritten.
+// NOTE(review): nothing visible here bounds the number of entries — confirm
+// that senders cannot grow the cache without limit.
+func (s *Server) addNotHereCache(dstKey key.NodePublic, now time.Time) {
+ s.notHereCache.Store(dstKey, now)
+}
+
// peerConnState represents whether a peer is connected to the server
// or not.
type peerConnState struct {
@@ -2233,6 +2295,7 @@ func (s *Server) ExpVar() expvar.Var {
m.Set("sent_pong", &s.sentPong)
m.Set("peer_gone_disconnected_frames", &s.peerGoneDisconnectedFrames)
m.Set("peer_gone_not_here_frames", &s.peerGoneNotHereFrames)
+ m.Set("packets_dropped_cached_not_here", &s.packetsDroppedCachedNotHere)
m.Set("packets_forwarded_out", &s.packetsForwardedOut)
m.Set("packets_forwarded_in", &s.packetsForwardedIn)
m.Set("multiforwarder_created", &s.multiForwarderCreated)
diff --git a/derp/derpserver/derpserver_test.go b/derp/derpserver/derpserver_test.go
index 7f956ba78..cb78b0a30 100644
--- a/derp/derpserver/derpserver_test.go
+++ b/derp/derpserver/derpserver_test.go
@@ -27,6 +27,7 @@ import (
"golang.org/x/time/rate"
"tailscale.com/derp"
"tailscale.com/derp/derpconst"
+ tsrate "tailscale.com/tstime/rate"
"tailscale.com/types/key"
"tailscale.com/types/logger"
)
@@ -680,12 +681,114 @@ func BenchmarkConcurrentStreams(b *testing.B) {
<-acceptDone
}
+// loopReader is an io.Reader that repeats data infinitely.
+// data must be non-empty before the first non-empty Read.
+type loopReader struct {
+	data []byte // bytes replayed in a cycle
+	off  int    // index into data of the next byte to hand out
+}
+
+// Read fills all of p by cycling through r.data, resuming where the
+// previous call stopped. It always returns len(p) and a nil error.
+// It panics if asked for bytes while r.data is empty, because there is
+// nothing to repeat and the copy loop below would otherwise never end.
+func (r *loopReader) Read(p []byte) (int, error) {
+	if len(p) > 0 && len(r.data) == 0 {
+		panic("loopReader: empty data")
+	}
+	n := 0
+	for n < len(p) {
+		copied := copy(p[n:], r.data[r.off:])
+		n += copied
+		r.off += copied
+		// Wrap to the start once the tail of data has been consumed.
+		if r.off >= len(r.data) {
+			r.off = 0
+		}
+	}
+	return n, nil
+}
+
func BenchmarkSendRecv(b *testing.B) {
for _, size := range []int{10, 100, 1000, 10000} {
b.Run(fmt.Sprintf("msgsize=%d", size), func(b *testing.B) { benchmarkSendRecvSize(b, size) })
}
}
+
+// BenchmarkHandleFrameSendPacketAbsent benchmarks the server-side
+// handleFrameSendPacket path directly, bypassing TLS and TCP to
+// isolate server processing cost. It compares one repeated absent
+// destination against a rotating pool of absent destinations; in both
+// cases only the first packet to a key misses the not-here cache while
+// that key's entry is still live.
+func BenchmarkHandleFrameSendPacketAbsent(b *testing.B) {
+ const payloadSize = 100
+ const frameSize = derp.KeyLen + payloadSize
+
+ // setup returns a fresh server and a bare sclient with no reader
+ // attached; each sub-benchmark assigns c.br itself.
+ setup := func(b *testing.B) *sclient {
+ b.Helper()
+ s := New(key.NewNode(), logger.Discard)
+ b.Cleanup(func() { s.Close() })
+ c := &sclient{
+ s: s,
+ key: key.NewNode().Public(),
+ logf: logger.Discard,
+ done: make(chan struct{}),
+ peerGone: make(chan peerGoneMsg, 100),
+ peerGoneLim: tsrate.NewLimiter(tsrate.Every(time.Second/50), 50),
+ }
+ return c
+ }
+
+ // buildFrames builds a byte buffer containing n frame payloads; only
+ // the destination key is written, so packet bytes stay zero.
+ // If sameKey, all frames use the same destination key (cache hit
+ // after first). Otherwise, each frame uses a distinct key (a miss
+ // the first time that key is seen).
+ buildFrames := func(n int, sameKey bool) []byte {
+ buf := make([]byte, n*frameSize)
+ var fixedKey []byte
+ if sameKey {
+ fixedKey = key.NewNode().Public().AppendTo(nil)
+ }
+ for i := range n {
+ off := i * frameSize
+ if sameKey {
+ copy(buf[off:], fixedKey)
+ } else {
+ k := key.NewNode().Public()
+ copy(buf[off:], k.AppendTo(nil))
+ }
+ }
+ return buf
+ }
+
+ // same_key: all packets to same absent peer (cache hit after first).
+ b.Run("same_key", func(b *testing.B) {
+ c := setup(b)
+ frame := buildFrames(1, true)
+ // Use an infinite reader that repeats the same frame.
+ c.br = bufio.NewReader(&loopReader{data: frame})
+
+ b.SetBytes(int64(payloadSize))
+ b.ReportAllocs()
+ b.ResetTimer()
+ for range b.N {
+ if err := c.handleFrameSendPacket(derp.FrameSendPacket, frameSize); err != nil {
+ b.Fatal(err)
+ }
+ }
+ })
+
+ // unique_keys: packets rotate through a pool of 1000 absent peers, so
+ // each key is seen once per 1000 iterations. Only a key's first packet
+ // misses; with the 10s notHereCacheTTL, later cycles hit the cache until
+ // entries expire, so this measures a miss-then-hit pattern.
+ b.Run("unique_keys", func(b *testing.B) {
+ const numKeys = 1000
+ c := setup(b)
+ pool := buildFrames(numKeys, false)
+ c.br = bufio.NewReader(&loopReader{data: pool})
+
+ b.SetBytes(int64(payloadSize))
+ b.ReportAllocs()
+ b.ResetTimer()
+ for range b.N {
+ if err := c.handleFrameSendPacket(derp.FrameSendPacket, frameSize); err != nil {
+ b.Fatal(err)
+ }
+ }
+ })
+}
+
func benchmarkSendRecvSize(b *testing.B, packetSize int) {
serverPrivateKey := key.NewNode()
s := New(serverPrivateKey, logger.Discard)
@@ -973,3 +1076,102 @@ func BenchmarkSenderCardinalityOverhead(b *testing.B) {
}
})
}
+
+// TestNotHereCache walks one key through the not-here cache lifecycle:
+// absent, added, still live just inside the TTL, expired just past it,
+// and finally invalidated when a client with that key registers.
+func TestNotHereCache(t *testing.T) {
+	srv := New(key.NewNode(), t.Logf)
+	defer srv.Close()
+
+	peer := key.NewNode().Public()
+	t0 := time.Now()
+
+	// expect fails the test with msg unless looking up peer at time at
+	// reports want.
+	expect := func(at time.Time, want bool, msg string) {
+		t.Helper()
+		if srv.isNotHereCached(peer, at) != want {
+			t.Fatal(msg)
+		}
+	}
+
+	// Nothing has been stored yet.
+	expect(t0, false, "expected not cached initially")
+
+	// One entry stamped t0, probed on both sides of the TTL boundary.
+	srv.addNotHereCache(peer, t0)
+	expect(t0, true, "expected cached after add")
+	expect(t0.Add(notHereCacheTTL-time.Millisecond), true, "expected cached just before TTL")
+	expect(t0.Add(notHereCacheTTL+time.Millisecond), false, "expected expired after TTL")
+
+	// Store a fresh entry, then register a client under the same key.
+	srv.addNotHereCache(peer, time.Now())
+	expect(time.Now(), true, "expected cached after re-add")
+
+	cl := &sclient{
+		s:              srv,
+		key:            peer,
+		logf:           logger.WithPrefix(t.Logf, "test: "),
+		done:           make(chan struct{}),
+		peerGone:       make(chan peerGoneMsg),
+		sendQueue:      make(chan pkt, 1),
+		discoSendQueue: make(chan pkt, 1),
+		sendPongCh:     make(chan [8]byte, 1),
+	}
+	srv.registerClient(cl)
+	defer srv.unregisterClient(cl)
+
+	expect(time.Now(), false, "expected cache invalidated after registerClient")
+}
+
+// TestNotHereCacheRecvPacket verifies recvPacket on both sides of the
+// not-here cache: a miss returns the destination key and the full payload
+// without touching the drop counter, while a hit returns the destination
+// key with nil contents and increments packetsDroppedCachedNotHere.
+func TestNotHereCacheRecvPacket(t *testing.T) {
+	const payloadLen = 100
+
+	s := New(key.NewNode(), t.Logf)
+	defer s.Close()
+
+	absentKey := key.NewNode().Public()
+
+	// Build a frame body: dest key (derp.KeyLen bytes) + payloadLen-byte payload.
+	frame := make([]byte, derp.KeyLen+payloadLen)
+	copy(frame[:derp.KeyLen], absentKey.AppendTo(nil))
+	frameLen := uint32(len(frame))
+
+	c := &sclient{
+		s:           s,
+		key:         key.NewNode().Public(),
+		logf:        logger.WithPrefix(t.Logf, "test: "),
+		done:        make(chan struct{}),
+		peerGone:    make(chan peerGoneMsg, 100),
+		peerGoneLim: tsrate.NewLimiter(tsrate.Every(time.Second/50), 50),
+	}
+
+	// loopReader replays frame endlessly, so both recvPacket calls below
+	// read an identical frame.
+	c.br = bufio.NewReader(&loopReader{data: frame})
+
+	// First call: cache miss, should return the key and the payload.
+	dst, contents, err := s.recvPacket(c, frameLen)
+	if err != nil {
+		t.Fatalf("recvPacket (miss): %v", err)
+	}
+	if dst != absentKey {
+		t.Fatalf("recvPacket (miss): dstKey = %v, want %v", dst, absentKey)
+	}
+	if contents == nil {
+		t.Fatal("expected non-nil contents on cache miss")
+	}
+	if got := len(contents); got != payloadLen {
+		t.Fatalf("recvPacket (miss): len(contents) = %d, want %d", got, payloadLen)
+	}
+	if got := s.packetsDroppedCachedNotHere.Value(); got != 0 {
+		t.Fatalf("packetsDroppedCachedNotHere after miss = %d, want 0", got)
+	}
+
+	// Populate the cache as handleFrameSendPacket would.
+	s.addNotHereCache(absentKey, s.clock.Now())
+
+	// Second call: cache hit, should return the key but nil contents.
+	dst, contents, err = s.recvPacket(c, frameLen)
+	if err != nil {
+		t.Fatalf("recvPacket (hit): %v", err)
+	}
+	if dst != absentKey {
+		t.Fatalf("recvPacket (hit): dstKey = %v, want %v", dst, absentKey)
+	}
+	if contents != nil {
+		t.Fatal("expected nil contents on cache hit")
+	}
+
+	// Verify metric incremented.
+	if got := s.packetsDroppedCachedNotHere.Value(); got != 1 {
+		t.Fatalf("packetsDroppedCachedNotHere = %d, want 1", got)
+	}
+}