summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorBrad Fitzpatrick <bradfitz@tailscale.com>2024-09-15 08:56:16 -0700
committerPatrick O'Doherty <patrick@tailscale.com>2024-12-11 13:27:58 -0800
commitef68b4c0045936c88fa09bc7bcd1282e3b0d176a (patch)
tree6a8e21b7f3570bb799a163e276175dc209db7e98
parent6e552f66a0289f6309477fb024019b62a251da16 (diff)
downloadtailscale-patrickod/bradtfitz-flow-rebased.tar.xz
tailscale-patrickod/bradtfitz-flow-rebased.zip
derp: start adding flow tracking statspatrickod/bradtfitz-flow-rebased
This starts adding flow tracking stats, without exposing them anywhere yet. Flow structs are created as needed and metrics are bumped, and benchmarks show no change in performance. Updates #3560 Change-Id: I376187a8452ec92d49effcbf48a6fb4f4d787b8a Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
-rw-r--r--derp/derp_server.go81
-rw-r--r--derp/flow.go182
-rw-r--r--derp/flow_test.go52
-rw-r--r--util/lru/lru.go9
4 files changed, 290 insertions, 34 deletions
diff --git a/derp/derp_server.go b/derp/derp_server.go
index 8066b7f19..ed82a3f30 100644
--- a/derp/derp_server.go
+++ b/derp/derp_server.go
@@ -48,6 +48,7 @@ import (
"tailscale.com/types/key"
"tailscale.com/types/logger"
"tailscale.com/util/ctxkey"
+ "tailscale.com/util/lru"
"tailscale.com/util/mak"
"tailscale.com/util/set"
"tailscale.com/util/slicesx"
@@ -178,11 +179,14 @@ type Server struct {
verifyClientsURL string
verifyClientsURLFailOpen bool
- mu sync.Mutex
- closed bool
- netConns map[Conn]chan struct{} // chan is closed when conn closes
- clients map[key.NodePublic]*clientSet
- watchers set.Set[*sclient] // mesh peers
+ mu sync.Mutex
+ closed bool
+ flow map[flowKey]*flow
+ flows []*flow // slice of values of flow map
+ flowCleanIndex int
+ netConns map[Conn]chan struct{} // chan is closed when conn closes
+ clients map[key.NodePublic]*clientSet
+ watchers set.Set[*sclient] // mesh peers
// clientsMesh tracks all clients in the cluster, both locally
// and to mesh peers. If the value is nil, that means the
// peer is only local (and thus in the clients Map, but not
@@ -368,6 +372,7 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server {
packetsDroppedType: metrics.LabelMap{Label: "type"},
clients: map[key.NodePublic]*clientSet{},
clientsMesh: map[key.NodePublic]PacketForwarder{},
+ flow: map[flowKey]*flow{},
netConns: map[Conn]chan struct{}{},
memSys0: ms.Sys,
watchers: set.Set[*sclient]{},
@@ -901,9 +906,20 @@ func (s *Server) debugLogf(format string, v ...any) {
}
}
+// onRunLoopDone is called when the run loop is done
+// to clean up.
+//
+// It must only be called from the [slient.run] goroutine.
+func (c *sclient) onRunLoopDone() {
+ c.flows.ForEach(func(k key.NodePublic, peer flowAndClientSet) {
+ peer.f.ref.Add(-1)
+ })
+}
+
// run serves the client until there's an error.
// If the client hangs up or the server is closed, run returns nil, otherwise run returns an error.
func (c *sclient) run(ctx context.Context) error {
+ defer c.onRunLoopDone()
// Launch sender, but don't return from run until sender goroutine is done.
var grp errgroup.Group
sendCtx, cancelSender := context.WithCancel(ctx)
@@ -1066,6 +1082,7 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error {
var dst *sclient
s.mu.Lock()
+ flo := s.getMakeFlowLocked(srcKey, dstKey)
if set, ok := s.clients[dstKey]; ok {
dstLen = set.Len()
dst = set.activeClient.Load()
@@ -1088,7 +1105,7 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error {
return c.sendPkt(dst, pkt{
bs: contents,
enqueuedAt: c.s.clock.Now(),
- src: srcKey,
+ flow: flo,
})
}
@@ -1101,22 +1118,13 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
return fmt.Errorf("client %v: recvPacket: %v", c.key, err)
}
- var fwd PacketForwarder
- var dstLen int
- var dst *sclient
-
- s.mu.Lock()
- if set, ok := s.clients[dstKey]; ok {
- dstLen = set.Len()
- dst = set.activeClient.Load()
- }
- if dst == nil && dstLen < 1 {
- fwd = s.clientsMesh[dstKey]
- }
- s.mu.Unlock()
+ flo, dst, fwd := c.lookupDest(dstKey)
+ flo.noteActivity()
if dst == nil {
if fwd != nil {
+ flo.pktSendRegion.Add(1)
+ flo.byteSendRegion.Add(1)
s.packetsForwardedOut.Add(1)
err := fwd.ForwardPacket(c.key, dstKey, contents)
c.debugLogf("SendPacket for %s, forwarding via %s: %v", dstKey.ShortString(), fwd, err)
@@ -1126,22 +1134,22 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
}
return nil
}
+ flo.dropUnknownDest.Add(1)
reason := dropReasonUnknownDest
- if dstLen > 1 {
- reason = dropReasonDupClient
- } else {
- c.requestPeerGoneWriteLimited(dstKey, contents, PeerGoneReasonNotHere)
- }
+ c.requestPeerGoneWriteLimited(dstKey, contents, PeerGoneReasonNotHere)
s.recordDrop(contents, c.key, dstKey, reason)
c.debugLogf("SendPacket for %s, dropping with reason=%s", dstKey.ShortString(), reason)
return nil
}
c.debugLogf("SendPacket for %s, sending directly", dstKey.ShortString())
+ flo.pktSendLocal.Add(1)
+ flo.byteSendLocal.Add(1)
+
p := pkt{
bs: contents,
enqueuedAt: c.s.clock.Now(),
- src: c.key,
+ flow: flo,
}
return c.sendPkt(dst, p)
}
@@ -1189,6 +1197,7 @@ func (s *Server) recordDrop(packetBytes []byte, srcKey, dstKey key.NodePublic, r
}
func (c *sclient) sendPkt(dst *sclient, p pkt) error {
+ // TODO(bradfitz): bump metrics on p.flow
s := c.s
dstKey := dst.key
@@ -1550,6 +1559,7 @@ type sclient struct {
br *bufio.Reader
connectedAt time.Time
preferred bool
+ flows lru.Cache[key.NodePublic, flowAndClientSet] // keyed by dest
// Owned by sendLoop, not thread-safe.
sawSrc map[key.NodePublic]set.Handle
@@ -1605,8 +1615,12 @@ type pkt struct {
// The memory is owned by pkt.
bs []byte
- // src is the who's the sender of the packet.
- src key.NodePublic
+ // flow is the flow stats from the src to the dest.
+ flow *flow
+}
+
+func (p pkt) src() key.NodePublic {
+ return p.flow.flowKey.Value().src
}
// peerGoneMsg is a request to write a peerGone frame to an sclient
@@ -1677,14 +1691,13 @@ func (c *sclient) onSendLoopDone() {
for {
select {
case pkt := <-c.sendQueue:
- c.s.recordDrop(pkt.bs, pkt.src, c.key, dropReasonGoneDisconnected)
+ c.s.recordDrop(pkt.bs, pkt.src(), c.key, dropReasonGoneDisconnected)
case pkt := <-c.discoSendQueue:
- c.s.recordDrop(pkt.bs, pkt.src, c.key, dropReasonGoneDisconnected)
+ c.s.recordDrop(pkt.bs, pkt.src(), c.key, dropReasonGoneDisconnected)
default:
return
}
}
-
}
func (c *sclient) sendLoop(ctx context.Context) error {
@@ -1713,11 +1726,11 @@ func (c *sclient) sendLoop(ctx context.Context) error {
werr = c.sendMeshUpdates()
continue
case msg := <-c.sendQueue:
- werr = c.sendPacket(msg.src, msg.bs)
+ werr = c.sendPacket(msg.src(), msg.bs)
c.recordQueueTime(msg.enqueuedAt)
continue
case msg := <-c.discoSendQueue:
- werr = c.sendPacket(msg.src, msg.bs)
+ werr = c.sendPacket(msg.src(), msg.bs)
c.recordQueueTime(msg.enqueuedAt)
continue
case msg := <-c.sendPongCh:
@@ -1747,10 +1760,10 @@ func (c *sclient) sendLoop(ctx context.Context) error {
case <-c.meshUpdate:
werr = c.sendMeshUpdates()
case msg := <-c.sendQueue:
- werr = c.sendPacket(msg.src, msg.bs)
+ werr = c.sendPacket(msg.src(), msg.bs)
c.recordQueueTime(msg.enqueuedAt)
case msg := <-c.discoSendQueue:
- werr = c.sendPacket(msg.src, msg.bs)
+ werr = c.sendPacket(msg.src(), msg.bs)
c.recordQueueTime(msg.enqueuedAt)
case msg := <-c.sendPongCh:
werr = c.sendPong(msg)
diff --git a/derp/flow.go b/derp/flow.go
new file mode 100644
index 000000000..95f81cd6d
--- /dev/null
+++ b/derp/flow.go
@@ -0,0 +1,182 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+package derp
+
+import (
+ "sync/atomic"
+ "time"
+ "unique"
+
+ "tailscale.com/types/key"
+)
+
+type flowKey struct {
+ src, dst key.NodePublic
+}
+
+// flow tracks metadata about a directional flow of packets from a source
+// node to a destination node. The public keys of the src is known
+// by the caller.
+type flow struct {
+ createdUnixNano int64 // int64 instead of time.Time to keep flow smaller
+ index int // index in Server.flows slice or -1 if not; guarded by Server.mu
+ flowKey unique.Handle[flowKey] // TODO: make this a unique handle of two unique handles for each NodePublic?
+
+ roughActivityUnixTime atomic.Int64 // unix sec of recent activity, updated at most once a minute
+ pktSendRegion atomic.Int64
+ byteSendRegion atomic.Int64
+ pktSendLocal atomic.Int64
+ byteSendLocal atomic.Int64
+ dropUnknownDest atomic.Int64 // no local or region client for dest
+ dropGone atomic.Int64
+
+ // ref is the reference count of things (*Server, *sclient) holding on
+ // to this flow. As of 2024-09-18 it is currently only informational
+ // and not used for anything. The Server adds/removes a ref count when
+ // it's remove from its map and each 0, 1 or more sclients for a given
+ // recently active flow also add/remove a ref count.
+ //
+ // This might be used in the future as an alternate Server.flow eviction
+ // strategy but for now it's just a debug tool. We do want to keep flow
+ // stats surviving a brief client disconnections, so we do want Server
+ // to keep at least a momentary ref count alive.
+ ref atomic.Int64
+}
+
+// noteActivity updates f.recentActivityUnixTime if it's been
+// more than a minute.
+func (f *flow) noteActivity() {
+ now := time.Now().Unix()
+ if now-f.roughActivityUnixTime.Load() > 60 {
+ f.roughActivityUnixTime.Store(now)
+ }
+}
+
+// getMakeFlow either gets or makes a new flow for the given source and
+// destination nodes.
+func (s *Server) getMakeFlow(src, dst key.NodePublic) *flow {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ return s.getMakeFlowLocked(src, dst)
+}
+
+func (s *Server) getMakeFlowLocked(src, dst key.NodePublic) *flow {
+ k := flowKey{src, dst}
+ f, ok := s.flow[k]
+ if ok {
+ return f
+ }
+ now := time.Now()
+ f = &flow{
+ createdUnixNano: now.UnixNano(),
+ index: len(s.flows),
+ flowKey: unique.Make(k),
+ }
+ f.roughActivityUnixTime.Store(now.Unix())
+ f.ref.Add(1) // for Server's ref in the s.flows map itself
+
+ // As penance for the one flow we're about to add to the map and slice
+ // above, check two old flows for removal. We roll around and around the
+ // flows slice, so this is a simple way to eventually check everything for
+ // removal before we double in size.
+ for range 2 {
+ s.maybeCleanOldFlowLocked()
+ }
+
+ s.flow[k] = f
+ s.flows = append(s.flows, f)
+
+ return f
+}
+
+func (s *Server) maybeCleanOldFlowLocked() {
+ if len(s.flows) == 0 {
+ return
+ }
+ s.flowCleanIndex++
+ if s.flowCleanIndex >= len(s.flows) {
+ s.flowCleanIndex = 0
+ }
+ f := s.flows[s.flowCleanIndex]
+
+ now := time.Now().Unix()
+ ageSec := now - f.roughActivityUnixTime.Load()
+ if ageSec > 3600 {
+ // No activity in an hour. Remove it.
+ delete(s.flow, f.flowKey.Value())
+ holeIdx := f.index
+ s.flows[holeIdx] = s.flows[len(s.flows)-1]
+ s.flows[holeIdx].index = holeIdx
+ s.flows = s.flows[:len(s.flows)-1]
+ f.ref.Add(-1)
+ return
+ }
+}
+
+type flowAndClientSet struct {
+ f *flow // always non-nil
+ cs *clientSet // may be nil if peer not connected/known
+}
+
+// lookupDest returns the flow (always non-nil) and sclient and/or
+// PacketForwarder (at least one of which will be nil, possibly both) for the
+// given destination node.
+
+// It must only be called from the [sclient.run] goroutine.
+func (c *sclient) lookupDest(dst key.NodePublic) (_ *flow, _ *sclient, fwd PacketForwarder) {
+ peer, ok := c.flows.GetOk(dst)
+ if ok && peer.cs != nil {
+ if c := peer.cs.activeClient.Load(); c != nil {
+ // Common case for hot flows within the same node: we know the
+ // clientSet and no mutex is needed.
+ return peer.f, c, nil
+ }
+ }
+
+ if peer.f == nil {
+ peer.f = c.s.getMakeFlow(c.key, dst)
+ peer.f.ref.Add(1)
+ // At least store the flow in the map, even if we don't find the
+ // clientSet later. In theory we could coallesce this map write with a
+ // possible one later, but they should be rare and uncontended so we
+ // don't care as of 2024-09-18.
+ c.flows.Set(dst, peer)
+ c.maybeCleanFlows()
+ }
+
+ srv := c.s
+ srv.mu.Lock()
+ set, ok := srv.clients[dst]
+ if ok {
+ if c := set.activeClient.Load(); c != nil {
+ srv.mu.Unlock()
+ peer.cs = set
+ c.flows.Set(dst, peer)
+ c.maybeCleanFlows()
+ return peer.f, c, nil
+ }
+ fwd = srv.clientsMesh[dst]
+ }
+ srv.mu.Unlock()
+ return peer.f, nil, fwd // fwd may be nil too
+}
+
+// maybeCleanFlows cleans the oldest element from the client flows cache if
+// it's too big.
+//
+// It must only be called from the [sclient.run] goroutine.
+func (c *sclient) maybeCleanFlows() {
+ const maxClientFlowTrack = 100
+ if c.flows.Len() <= maxClientFlowTrack {
+ return
+ }
+
+ oldest, _ := c.flows.OldestKey()
+ facs, ok := c.flows.PeekOk(oldest)
+ if !ok {
+ panic("lookupDest: OldestKey lied")
+ }
+ facs.f.ref.Add(-1)
+ c.flows.Delete(oldest)
+}
diff --git a/derp/flow_test.go b/derp/flow_test.go
new file mode 100644
index 000000000..b3a740bc3
--- /dev/null
+++ b/derp/flow_test.go
@@ -0,0 +1,52 @@
+package derp
+
+import (
+ "testing"
+ "unique"
+
+ "go4.org/mem"
+ "tailscale.com/types/key"
+)
+
+func BenchmarkUnique(b *testing.B) {
+ var keys [100]key.NodePublic
+ for i := range keys {
+ keys[i] = key.NodePublicFromRaw32(mem.B([]byte{31: byte(i)}))
+ }
+ b.Run("raw", func(b *testing.B) {
+ m := map[flowKey]bool{}
+ for range b.N {
+ for _, k := range keys {
+ key := flowKey{k, k}
+ if _, ok := m[key]; !ok {
+ m[key] = true
+ }
+ }
+ }
+ })
+ b.Run("unique-tightmake", func(b *testing.B) {
+ m := map[unique.Handle[flowKey]]bool{}
+ for range b.N {
+ for _, k := range keys {
+ key := unique.Make(flowKey{k, k})
+ if _, ok := m[key]; !ok {
+ m[key] = true
+ }
+ }
+ }
+ })
+ b.Run("unique-makeonce", func(b *testing.B) {
+ m := map[unique.Handle[flowKey]]bool{}
+ ukeys := make([]unique.Handle[flowKey], len(keys))
+ for i, k := range keys {
+ ukeys[i] = unique.Make(flowKey{k, k})
+ }
+ for range b.N {
+ for _, key := range ukeys {
+ if _, ok := m[key]; !ok {
+ m[key] = true
+ }
+ }
+ }
+ })
+}
diff --git a/util/lru/lru.go b/util/lru/lru.go
index 8e4dd417b..d4e836cf1 100644
--- a/util/lru/lru.go
+++ b/util/lru/lru.go
@@ -133,6 +133,15 @@ func (c *Cache[K, V]) DeleteOldest() {
}
}
+// OldestKey returns the oldest key, without bumping it to the head.
+// If the cache is empty, it returns ok false.
+func (c *Cache[K, V]) OldestKey() (key K, ok bool) {
+ if c.head == nil {
+ return key, false
+ }
+ return c.head.prev.key, true
+}
+
// Len returns the number of items in the cache.
func (c *Cache[K, V]) Len() int { return len(c.lookup) }