summaryrefslogtreecommitdiffhomepage
path: root/derp/derp_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'derp/derp_server.go')
-rw-r--r--derp/derp_server.go81
1 files changed, 47 insertions, 34 deletions
diff --git a/derp/derp_server.go b/derp/derp_server.go
index 8066b7f19..ed82a3f30 100644
--- a/derp/derp_server.go
+++ b/derp/derp_server.go
@@ -48,6 +48,7 @@ import (
"tailscale.com/types/key"
"tailscale.com/types/logger"
"tailscale.com/util/ctxkey"
+ "tailscale.com/util/lru"
"tailscale.com/util/mak"
"tailscale.com/util/set"
"tailscale.com/util/slicesx"
@@ -178,11 +179,14 @@ type Server struct {
verifyClientsURL string
verifyClientsURLFailOpen bool
- mu sync.Mutex
- closed bool
- netConns map[Conn]chan struct{} // chan is closed when conn closes
- clients map[key.NodePublic]*clientSet
- watchers set.Set[*sclient] // mesh peers
+ mu sync.Mutex
+ closed bool
+ flow map[flowKey]*flow
+ flows []*flow // slice of values of flow map
+ flowCleanIndex int
+ netConns map[Conn]chan struct{} // chan is closed when conn closes
+ clients map[key.NodePublic]*clientSet
+ watchers set.Set[*sclient] // mesh peers
// clientsMesh tracks all clients in the cluster, both locally
// and to mesh peers. If the value is nil, that means the
// peer is only local (and thus in the clients Map, but not
@@ -368,6 +372,7 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server {
packetsDroppedType: metrics.LabelMap{Label: "type"},
clients: map[key.NodePublic]*clientSet{},
clientsMesh: map[key.NodePublic]PacketForwarder{},
+ flow: map[flowKey]*flow{},
netConns: map[Conn]chan struct{}{},
memSys0: ms.Sys,
watchers: set.Set[*sclient]{},
@@ -901,9 +906,20 @@ func (s *Server) debugLogf(format string, v ...any) {
}
}
+// onRunLoopDone is called when the run loop is done; it decrements
+// the ref count of each flow held in c.flows.
+//
+// It must only be called from the [sclient.run] goroutine.
+func (c *sclient) onRunLoopDone() {
+ c.flows.ForEach(func(k key.NodePublic, peer flowAndClientSet) {
+ peer.f.ref.Add(-1)
+ })
+}
+
// run serves the client until there's an error.
// If the client hangs up or the server is closed, run returns nil, otherwise run returns an error.
func (c *sclient) run(ctx context.Context) error {
+ defer c.onRunLoopDone()
// Launch sender, but don't return from run until sender goroutine is done.
var grp errgroup.Group
sendCtx, cancelSender := context.WithCancel(ctx)
@@ -1066,6 +1082,7 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error {
var dst *sclient
s.mu.Lock()
+ flo := s.getMakeFlowLocked(srcKey, dstKey)
if set, ok := s.clients[dstKey]; ok {
dstLen = set.Len()
dst = set.activeClient.Load()
@@ -1088,7 +1105,7 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error {
return c.sendPkt(dst, pkt{
bs: contents,
enqueuedAt: c.s.clock.Now(),
- src: srcKey,
+ flow: flo,
})
}
@@ -1101,22 +1118,13 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
return fmt.Errorf("client %v: recvPacket: %v", c.key, err)
}
- var fwd PacketForwarder
- var dstLen int
- var dst *sclient
-
- s.mu.Lock()
- if set, ok := s.clients[dstKey]; ok {
- dstLen = set.Len()
- dst = set.activeClient.Load()
- }
- if dst == nil && dstLen < 1 {
- fwd = s.clientsMesh[dstKey]
- }
- s.mu.Unlock()
+ flo, dst, fwd := c.lookupDest(dstKey)
+ flo.noteActivity()
if dst == nil {
if fwd != nil {
+ flo.pktSendRegion.Add(1)
+ flo.byteSendRegion.Add(1)
s.packetsForwardedOut.Add(1)
err := fwd.ForwardPacket(c.key, dstKey, contents)
c.debugLogf("SendPacket for %s, forwarding via %s: %v", dstKey.ShortString(), fwd, err)
@@ -1126,22 +1134,22 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
}
return nil
}
+ flo.dropUnknownDest.Add(1)
reason := dropReasonUnknownDest
- if dstLen > 1 {
- reason = dropReasonDupClient
- } else {
- c.requestPeerGoneWriteLimited(dstKey, contents, PeerGoneReasonNotHere)
- }
+ c.requestPeerGoneWriteLimited(dstKey, contents, PeerGoneReasonNotHere)
s.recordDrop(contents, c.key, dstKey, reason)
c.debugLogf("SendPacket for %s, dropping with reason=%s", dstKey.ShortString(), reason)
return nil
}
c.debugLogf("SendPacket for %s, sending directly", dstKey.ShortString())
+ flo.pktSendLocal.Add(1)
+ flo.byteSendLocal.Add(1)
+
p := pkt{
bs: contents,
enqueuedAt: c.s.clock.Now(),
- src: c.key,
+ flow: flo,
}
return c.sendPkt(dst, p)
}
@@ -1189,6 +1197,7 @@ func (s *Server) recordDrop(packetBytes []byte, srcKey, dstKey key.NodePublic, r
}
func (c *sclient) sendPkt(dst *sclient, p pkt) error {
+ // TODO(bradfitz): bump metrics on p.flow
s := c.s
dstKey := dst.key
@@ -1550,6 +1559,7 @@ type sclient struct {
br *bufio.Reader
connectedAt time.Time
preferred bool
+ flows lru.Cache[key.NodePublic, flowAndClientSet] // keyed by dest
// Owned by sendLoop, not thread-safe.
sawSrc map[key.NodePublic]set.Handle
@@ -1605,8 +1615,12 @@ type pkt struct {
// The memory is owned by pkt.
bs []byte
- // src is the who's the sender of the packet.
- src key.NodePublic
+ // flow is the flow stats from the src to the dest.
+ flow *flow
+}
+
+func (p pkt) src() key.NodePublic {
+ return p.flow.flowKey.Value().src
}
// peerGoneMsg is a request to write a peerGone frame to an sclient
@@ -1677,14 +1691,13 @@ func (c *sclient) onSendLoopDone() {
for {
select {
case pkt := <-c.sendQueue:
- c.s.recordDrop(pkt.bs, pkt.src, c.key, dropReasonGoneDisconnected)
+ c.s.recordDrop(pkt.bs, pkt.src(), c.key, dropReasonGoneDisconnected)
case pkt := <-c.discoSendQueue:
- c.s.recordDrop(pkt.bs, pkt.src, c.key, dropReasonGoneDisconnected)
+ c.s.recordDrop(pkt.bs, pkt.src(), c.key, dropReasonGoneDisconnected)
default:
return
}
}
-
}
func (c *sclient) sendLoop(ctx context.Context) error {
@@ -1713,11 +1726,11 @@ func (c *sclient) sendLoop(ctx context.Context) error {
werr = c.sendMeshUpdates()
continue
case msg := <-c.sendQueue:
- werr = c.sendPacket(msg.src, msg.bs)
+ werr = c.sendPacket(msg.src(), msg.bs)
c.recordQueueTime(msg.enqueuedAt)
continue
case msg := <-c.discoSendQueue:
- werr = c.sendPacket(msg.src, msg.bs)
+ werr = c.sendPacket(msg.src(), msg.bs)
c.recordQueueTime(msg.enqueuedAt)
continue
case msg := <-c.sendPongCh:
@@ -1747,10 +1760,10 @@ func (c *sclient) sendLoop(ctx context.Context) error {
case <-c.meshUpdate:
werr = c.sendMeshUpdates()
case msg := <-c.sendQueue:
- werr = c.sendPacket(msg.src, msg.bs)
+ werr = c.sendPacket(msg.src(), msg.bs)
c.recordQueueTime(msg.enqueuedAt)
case msg := <-c.discoSendQueue:
- werr = c.sendPacket(msg.src, msg.bs)
+ werr = c.sendPacket(msg.src(), msg.bs)
c.recordQueueTime(msg.enqueuedAt)
case msg := <-c.sendPongCh:
werr = c.sendPong(msg)