diff options
Diffstat (limited to 'derp/derp_server.go')
| -rw-r--r-- | derp/derp_server.go | 81 |
1 files changed, 47 insertions, 34 deletions
diff --git a/derp/derp_server.go b/derp/derp_server.go index 8066b7f19..ed82a3f30 100644 --- a/derp/derp_server.go +++ b/derp/derp_server.go @@ -48,6 +48,7 @@ import ( "tailscale.com/types/key" "tailscale.com/types/logger" "tailscale.com/util/ctxkey" + "tailscale.com/util/lru" "tailscale.com/util/mak" "tailscale.com/util/set" "tailscale.com/util/slicesx" @@ -178,11 +179,14 @@ type Server struct { verifyClientsURL string verifyClientsURLFailOpen bool - mu sync.Mutex - closed bool - netConns map[Conn]chan struct{} // chan is closed when conn closes - clients map[key.NodePublic]*clientSet - watchers set.Set[*sclient] // mesh peers + mu sync.Mutex + closed bool + flow map[flowKey]*flow + flows []*flow // slice of values of flow map + flowCleanIndex int + netConns map[Conn]chan struct{} // chan is closed when conn closes + clients map[key.NodePublic]*clientSet + watchers set.Set[*sclient] // mesh peers // clientsMesh tracks all clients in the cluster, both locally // and to mesh peers. If the value is nil, that means the // peer is only local (and thus in the clients Map, but not @@ -368,6 +372,7 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server { packetsDroppedType: metrics.LabelMap{Label: "type"}, clients: map[key.NodePublic]*clientSet{}, clientsMesh: map[key.NodePublic]PacketForwarder{}, + flow: map[flowKey]*flow{}, netConns: map[Conn]chan struct{}{}, memSys0: ms.Sys, watchers: set.Set[*sclient]{}, @@ -901,9 +906,20 @@ func (s *Server) debugLogf(format string, v ...any) { } } +// onRunLoopDone is called when the run loop is done +// to clean up. +// +// It must only be called from the [slient.run] goroutine. +func (c *sclient) onRunLoopDone() { + c.flows.ForEach(func(k key.NodePublic, peer flowAndClientSet) { + peer.f.ref.Add(-1) + }) +} + // run serves the client until there's an error. // If the client hangs up or the server is closed, run returns nil, otherwise run returns an error. func (c *sclient) run(ctx context.Context) error { + defer c.onRunLoopDone() // Launch sender, but don't return from run until sender goroutine is done. var grp errgroup.Group sendCtx, cancelSender := context.WithCancel(ctx) @@ -1066,6 +1082,7 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error { var dst *sclient s.mu.Lock() + flo := s.getMakeFlowLocked(srcKey, dstKey) if set, ok := s.clients[dstKey]; ok { dstLen = set.Len() dst = set.activeClient.Load() @@ -1088,7 +1105,7 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error { return c.sendPkt(dst, pkt{ bs: contents, enqueuedAt: c.s.clock.Now(), - src: srcKey, + flow: flo, }) } @@ -1101,22 +1118,13 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error { return fmt.Errorf("client %v: recvPacket: %v", c.key, err) } - var fwd PacketForwarder - var dstLen int - var dst *sclient - - s.mu.Lock() - if set, ok := s.clients[dstKey]; ok { - dstLen = set.Len() - dst = set.activeClient.Load() - } - if dst == nil && dstLen < 1 { - fwd = s.clientsMesh[dstKey] - } - s.mu.Unlock() + flo, dst, fwd := c.lookupDest(dstKey) + flo.noteActivity() if dst == nil { if fwd != nil { + flo.pktSendRegion.Add(1) + flo.byteSendRegion.Add(1) s.packetsForwardedOut.Add(1) err := fwd.ForwardPacket(c.key, dstKey, contents) c.debugLogf("SendPacket for %s, forwarding via %s: %v", dstKey.ShortString(), fwd, err) @@ -1126,22 +1134,22 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error { } return nil } + flo.dropUnknownDest.Add(1) reason := dropReasonUnknownDest - if dstLen > 1 { - reason = dropReasonDupClient - } else { - c.requestPeerGoneWriteLimited(dstKey, contents, PeerGoneReasonNotHere) - } + c.requestPeerGoneWriteLimited(dstKey, contents, PeerGoneReasonNotHere) s.recordDrop(contents, c.key, dstKey, reason) c.debugLogf("SendPacket for %s, dropping with reason=%s", dstKey.ShortString(), reason) return nil } c.debugLogf("SendPacket for %s, sending directly", dstKey.ShortString()) + flo.pktSendLocal.Add(1) + flo.byteSendLocal.Add(1) + p := pkt{ bs: contents, enqueuedAt: c.s.clock.Now(), - src: c.key, + flow: flo, } return c.sendPkt(dst, p) } @@ -1189,6 +1197,7 @@ func (s *Server) recordDrop(packetBytes []byte, srcKey, dstKey key.NodePublic, r } func (c *sclient) sendPkt(dst *sclient, p pkt) error { + // TODO(bradfitz): bump metrics on p.flow s := c.s dstKey := dst.key @@ -1550,6 +1559,7 @@ type sclient struct { br *bufio.Reader connectedAt time.Time preferred bool + flows lru.Cache[key.NodePublic, flowAndClientSet] // keyed by dest // Owned by sendLoop, not thread-safe. sawSrc map[key.NodePublic]set.Handle @@ -1605,8 +1615,12 @@ type pkt struct { // The memory is owned by pkt. bs []byte - // src is the who's the sender of the packet. - src key.NodePublic + // flow is the flow stats from the src to the dest. + flow *flow +} + +func (p pkt) src() key.NodePublic { + return p.flow.flowKey.Value().src } // peerGoneMsg is a request to write a peerGone frame to an sclient @@ -1677,14 +1691,13 @@ func (c *sclient) onSendLoopDone() { for { select { case pkt := <-c.sendQueue: - c.s.recordDrop(pkt.bs, pkt.src, c.key, dropReasonGoneDisconnected) + c.s.recordDrop(pkt.bs, pkt.src(), c.key, dropReasonGoneDisconnected) case pkt := <-c.discoSendQueue: - c.s.recordDrop(pkt.bs, pkt.src, c.key, dropReasonGoneDisconnected) + c.s.recordDrop(pkt.bs, pkt.src(), c.key, dropReasonGoneDisconnected) default: return } } - } func (c *sclient) sendLoop(ctx context.Context) error { @@ -1713,11 +1726,11 @@ func (c *sclient) sendLoop(ctx context.Context) error { werr = c.sendMeshUpdates() continue case msg := <-c.sendQueue: - werr = c.sendPacket(msg.src, msg.bs) + werr = c.sendPacket(msg.src(), msg.bs) c.recordQueueTime(msg.enqueuedAt) continue case msg := <-c.discoSendQueue: - werr = c.sendPacket(msg.src, msg.bs) + werr = c.sendPacket(msg.src(), msg.bs) c.recordQueueTime(msg.enqueuedAt) continue case msg := <-c.sendPongCh: @@ -1747,10 +1760,10 @@ func (c *sclient) sendLoop(ctx context.Context) error { case <-c.meshUpdate: werr = c.sendMeshUpdates() case msg := <-c.sendQueue: - werr = c.sendPacket(msg.src, msg.bs) + werr = c.sendPacket(msg.src(), msg.bs) c.recordQueueTime(msg.enqueuedAt) case msg := <-c.discoSendQueue: - werr = c.sendPacket(msg.src, msg.bs) + werr = c.sendPacket(msg.src(), msg.bs) c.recordQueueTime(msg.enqueuedAt) case msg := <-c.sendPongCh: werr = c.sendPong(msg) |
