summaryrefslogtreecommitdiffhomepage
path: root/derp/derp_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'derp/derp_server.go')
-rw-r--r--derp/derp_server.go254
1 files changed, 209 insertions, 45 deletions
diff --git a/derp/derp_server.go b/derp/derp_server.go
index 08fd280a9..df5a1b2e3 100644
--- a/derp/derp_server.go
+++ b/derp/derp_server.go
@@ -52,6 +52,8 @@ import (
"tailscale.com/util/set"
"tailscale.com/util/slicesx"
"tailscale.com/version"
+
+ "github.com/prometheus/client_golang/prometheus"
)
// verboseDropKeys is the set of destination public keys that should
@@ -139,37 +141,39 @@ type Server struct {
debug bool
// Counters:
- packetsSent, bytesSent expvar.Int
- packetsRecv, bytesRecv expvar.Int
- packetsRecvByKind metrics.LabelMap
- packetsRecvDisco *expvar.Int
- packetsRecvOther *expvar.Int
- _ align64
- packetsForwardedOut expvar.Int
- packetsForwardedIn expvar.Int
- peerGoneDisconnectedFrames expvar.Int // number of peer disconnected frames sent
- peerGoneNotHereFrames expvar.Int // number of peer not here frames sent
- gotPing expvar.Int // number of ping frames from client
- sentPong expvar.Int // number of pong frames enqueued to client
- accepts expvar.Int
- curClients expvar.Int
- curClientsNotIdeal expvar.Int
- curHomeClients expvar.Int // ones with preferred
- dupClientKeys expvar.Int // current number of public keys we have 2+ connections for
- dupClientConns expvar.Int // current number of connections sharing a public key
- dupClientConnTotal expvar.Int // total number of accepted connections when a dup key existed
- unknownFrames expvar.Int
- homeMovesIn expvar.Int // established clients announce home server moves in
- homeMovesOut expvar.Int // established clients announce home server moves out
- multiForwarderCreated expvar.Int
- multiForwarderDeleted expvar.Int
- removePktForwardOther expvar.Int
- sclientWriteTimeouts expvar.Int
- avgQueueDuration *uint64 // In milliseconds; accessed atomically
- tcpRtt metrics.LabelMap // histogram
- meshUpdateBatchSize *metrics.Histogram
- meshUpdateLoopCount *metrics.Histogram
- bufferedWriteFrames *metrics.Histogram // how many sendLoop frames (or groups of related frames) get written per flush
+ packetsSent, bytesSent expvar.Int
+ packetsRecv, bytesRecv expvar.Int
+ packetsRecvByKind metrics.LabelMap
+ packetsRecvDisco *expvar.Int
+ packetsRecvOther *expvar.Int
+ _ align64
+ packetsForwardedOut expvar.Int
+ packetsForwardedIn expvar.Int
+ peerGoneDisconnectedFrames expvar.Int // number of peer disconnected frames sent
+ peerGoneNotHereFrames expvar.Int // number of peer not here frames sent
+ gotPing expvar.Int // number of ping frames from client
+ sentPong expvar.Int // number of pong frames enqueued to client
+ accepts expvar.Int
+ curClients expvar.Int
+ curClientsNotIdeal expvar.Int
+ curHomeClients expvar.Int // ones with preferred
+ dupClientKeys expvar.Int // current number of public keys we have 2+ connections for
+ dupClientConns expvar.Int // current number of connections sharing a public key
+ dupClientConnTotal expvar.Int // total number of accepted connections when a dup key existed
+ unknownFrames expvar.Int
+ homeMovesIn expvar.Int // established clients announce home server moves in
+ homeMovesOut expvar.Int // established clients announce home server moves out
+ multiForwarderCreated expvar.Int
+ multiForwarderDeleted expvar.Int
+ removePktForwardOther expvar.Int
+ sclientWriteTimeouts expvar.Int
+ avgQueueDuration *uint64 // In milliseconds; accessed atomically
+ tcpRtt metrics.LabelMap // histogram
+ meshUpdateBatchSize *metrics.Histogram
+ meshUpdateLoopCount *metrics.Histogram
+ bufferedWriteFrames *metrics.Histogram // how many sendLoop frames (or groups of related frames) get written per flush
+ packetsRecvDropRateByClient *prometheus.HistogramVec
+ packetsSentDropRateByClient *prometheus.HistogramVec
// verifyClientsLocalTailscaled only accepts client connections to the DERP
// server if the clientKey is a known peer in the network, as specified by a
@@ -381,8 +385,18 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server {
meshUpdateBatchSize: metrics.NewHistogram([]float64{0, 1, 2, 5, 10, 20, 50, 100, 200, 500, 1000}),
meshUpdateLoopCount: metrics.NewHistogram([]float64{0, 1, 2, 5, 10, 20, 50, 100}),
bufferedWriteFrames: metrics.NewHistogram([]float64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 20, 25, 50, 100}),
- keyOfAddr: map[netip.AddrPort]key.NodePublic{},
- clock: tstime.StdClock{},
+ packetsRecvDropRateByClient: prometheus.NewHistogramVec(prometheus.HistogramOpts{
+ Name: "clients_by_packet_loss_rate_recv",
+ Help: "Histogram counting clients by the packet loss rate of packets received from those clients, by reason and kind",
+ Buckets: []float64{0.001, .005, .01, .025, .05, .1, .25, .5, 1},
+ }, []string{"kind", "reason"}),
+ packetsSentDropRateByClient: prometheus.NewHistogramVec(prometheus.HistogramOpts{
+ Name: "clients_by_packet_loss_rate_send",
+ Help: "Histogram counting clients by the packet loss rate of packets sent to those clients, by reason and kind",
+ Buckets: []float64{0.001, .005, .01, .025, .05, .1, .25, .5, 1},
+ }, []string{"kind", "reason"}),
+ keyOfAddr: map[netip.AddrPort]key.NodePublic{},
+ clock: tstime.StdClock{},
}
s.initMetacert()
s.packetsRecvDisco = s.packetsRecvByKind.Get(string(packetKindDisco))
@@ -391,6 +405,8 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server {
genPacketsDroppedCounters()
s.perClientSendQueueDepth = getPerClientSendQueueDepth()
+
+ go s.collectDropsByClient()
return s
}
@@ -1105,7 +1121,7 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error {
} else {
c.requestPeerGoneWriteLimited(dstKey, contents, PeerGoneReasonNotHere)
}
- s.recordDrop(contents, srcKey, dstKey, reason)
+ s.recordDrop(c, dst, contents, srcKey, dstKey, reason)
return nil
}
@@ -1122,7 +1138,7 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error {
func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
s := c.s
- dstKey, contents, err := s.recvPacket(c.br, fl)
+ dstKey, contents, err := c.recvPacket(fl)
if err != nil {
return fmt.Errorf("client %v: recvPacket: %v", c.key, err)
}
@@ -1158,7 +1174,7 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
} else {
c.requestPeerGoneWriteLimited(dstKey, contents, PeerGoneReasonNotHere)
}
- s.recordDrop(contents, c.key, dstKey, reason)
+ s.recordDrop(c, dst, contents, c.key, dstKey, reason)
c.debugLogf("SendPacket for %s, dropping with reason=%s", dstKey.ShortString(), reason)
return nil
}
@@ -1196,16 +1212,108 @@ const (
dropReasonDupClient dropReason = "dup_client" // the public key is connected 2+ times (active/active, fighting)
)
-func (s *Server) recordDrop(packetBytes []byte, srcKey, dstKey key.NodePublic, reason dropReason) {
+type dropsByReason struct {
+ total float64
+ unknownDest float64
+ unknownDestOnFwd float64
+ goneDisconnected float64
+ queueHead float64
+ queueTail float64
+ writeError float64
+ dupClient float64
+}
+
+func (d *dropsByReason) recordDrop(reason dropReason) {
+ d.total += 1
+ switch reason {
+ case dropReasonUnknownDest:
+ d.unknownDest += 1
+ case dropReasonUnknownDestOnFwd:
+ d.unknownDestOnFwd += 1
+ case dropReasonGoneDisconnected:
+ d.goneDisconnected += 1
+ case dropReasonQueueHead:
+ d.queueHead += 1
+ case dropReasonQueueTail:
+ d.queueTail += 1
+ case dropReasonWriteError:
+ d.writeError += 1
+ case dropReasonDupClient:
+ d.dupClient += 1
+ }
+}
+
+func (d *dropsByReason) reset() {
+ d.total = 0
+ d.unknownDest = 0
+ d.unknownDestOnFwd = 0
+ d.goneDisconnected = 0
+ d.queueHead = 0
+ d.queueTail = 0
+ d.writeError = 0
+ d.dupClient = 0
+}
+
+// collect collects packet drop rates into the given HistogramVec. The rates
+// are labeled with the given kind and the relevant drop reasons. The rates are
+// calculated as the number of drops divided by the total number of packets.
+// If includeDrops is true, the total drops are added to the given total. This
+// is used for send statistics since the packets sent counter only includes
+// packets that weren't dropped.
+func (d *dropsByReason) collect(hv *prometheus.HistogramVec, kind packetKind, total float64, includeDrops bool) {
+ if includeDrops {
+ total += d.total
+ }
+
+ if total == 0 {
+ return
+ }
+
+ kindString := string(kind)
+ hv.WithLabelValues(kindString, string(dropReasonUnknownDest)).Observe(d.unknownDest / total)
+ hv.WithLabelValues(kindString, string(dropReasonUnknownDestOnFwd)).Observe(d.unknownDestOnFwd / total)
+ hv.WithLabelValues(kindString, string(dropReasonGoneDisconnected)).Observe(d.goneDisconnected / total)
+ hv.WithLabelValues(kindString, string(dropReasonQueueHead)).Observe(d.queueHead / total)
+ hv.WithLabelValues(kindString, string(dropReasonQueueTail)).Observe(d.queueTail / total)
+ hv.WithLabelValues(kindString, string(dropReasonWriteError)).Observe(d.writeError / total)
+ hv.WithLabelValues(kindString, string(dropReasonDupClient)).Observe(d.dupClient / total)
+}
+
+func (s *Server) recordDrop(src, dst *sclient, packetBytes []byte, srcKey, dstKey key.NodePublic, reason dropReason) {
labels := dropReasonKindLabels{
Reason: string(reason),
}
looksDisco := disco.LooksLikeDiscoWrapper(packetBytes)
+ if src != nil {
+ src.dropsMu.Lock()
+ }
+ if dst != nil {
+ dst.dropsMu.Lock()
+ }
if looksDisco {
labels.Kind = string(packetKindDisco)
+ if src != nil {
+ src.discoRecvDropsByReason.recordDrop(reason)
+ }
+ if dst != nil {
+ dst.discoSendDropsByReason.recordDrop(reason)
+ }
} else {
labels.Kind = string(packetKindOther)
+ if src != nil {
+ src.otherRecvDropsByReason.recordDrop(reason)
+ }
+ if dst != nil {
+ dst.otherSendDropsByReason.recordDrop(reason)
+ }
}
+ if src != nil {
+ src.dropsMu.Unlock()
+ }
+ if dst != nil {
+ dst.dropsMu.Unlock()
+ }
+
packetsDropped.Add(labels, 1)
if verboseDropKeys[dstKey] {
@@ -1220,7 +1328,6 @@ func (s *Server) recordDrop(packetBytes []byte, srcKey, dstKey key.NodePublic, r
}
func (c *sclient) sendPkt(dst *sclient, p pkt) error {
- s := c.s
dstKey := dst.key
// Attempt to queue for sending up to 3 times. On each attempt, if
@@ -1233,7 +1340,7 @@ func (c *sclient) sendPkt(dst *sclient, p pkt) error {
for attempt := 0; attempt < 3; attempt++ {
select {
case <-dst.done:
- s.recordDrop(p.bs, c.key, dstKey, dropReasonGoneDisconnected)
+ c.s.recordDrop(c, dst, p.bs, c.key, dstKey, dropReasonGoneDisconnected)
dst.debugLogf("sendPkt attempt %d dropped, dst gone", attempt)
return nil
default:
@@ -1247,7 +1354,7 @@ func (c *sclient) sendPkt(dst *sclient, p pkt) error {
select {
case pkt := <-sendQueue:
- s.recordDrop(pkt.bs, c.key, dstKey, dropReasonQueueHead)
+ c.s.recordDrop(c, dst, pkt.bs, c.key, dstKey, dropReasonQueueHead)
c.recordQueueTime(pkt.enqueuedAt)
default:
}
@@ -1255,7 +1362,7 @@ func (c *sclient) sendPkt(dst *sclient, p pkt) error {
// Failed to make room for packet. This can happen in a heavily
// contended queue with racing writers. Give up and tail-drop in
// this case to keep reader unblocked.
- s.recordDrop(p.bs, c.key, dstKey, dropReasonQueueTail)
+ c.s.recordDrop(c, dst, p.bs, c.key, dstKey, dropReasonQueueTail)
dst.debugLogf("sendPkt attempt %d dropped, queue full")
return nil
@@ -1497,7 +1604,10 @@ func (s *Server) recvClientKey(br *bufio.Reader) (clientKey key.NodePublic, info
return clientKey, info, nil
}
-func (s *Server) recvPacket(br *bufio.Reader, frameLen uint32) (dstKey key.NodePublic, contents []byte, err error) {
+func (c *sclient) recvPacket(frameLen uint32) (dstKey key.NodePublic, contents []byte, err error) {
+ s := c.s
+ br := c.br
+
if frameLen < keyLen {
return zpub, nil, errors.New("short send packet frame")
}
@@ -1512,12 +1622,15 @@ func (s *Server) recvPacket(br *bufio.Reader, frameLen uint32) (dstKey key.NodeP
if _, err := io.ReadFull(br, contents); err != nil {
return zpub, nil, err
}
+
s.packetsRecv.Add(1)
s.bytesRecv.Add(int64(len(contents)))
if disco.LooksLikeDiscoWrapper(contents) {
s.packetsRecvDisco.Add(1)
+ c.packetsRecvDisco.Add(1)
} else {
s.packetsRecvOther.Add(1)
+ c.packetsRecvOther.Add(1)
}
return dstKey, contents, nil
}
@@ -1598,6 +1711,15 @@ type sclient struct {
// client that it's trying to establish a direct connection
// through us with a peer we have no record of.
peerGoneLim *rate.Limiter
+
+ packetsRecvDisco, packetsRecvOther, packetsSentDisco, packetsSentOther atomic.Uint64
+
+ // dropsMu guards the below packet drop metrics
+ dropsMu sync.Mutex
+ discoRecvDropsByReason dropsByReason
+ discoSendDropsByReason dropsByReason
+ otherRecvDropsByReason dropsByReason
+ otherSendDropsByReason dropsByReason
}
func (c *sclient) presentFlags() PeerPresentFlags {
@@ -1708,9 +1830,9 @@ func (c *sclient) onSendLoopDone() {
for {
select {
case pkt := <-c.sendQueue:
- c.s.recordDrop(pkt.bs, pkt.src, c.key, dropReasonGoneDisconnected)
+ c.s.recordDrop(nil, c, pkt.bs, pkt.src, c.key, dropReasonGoneDisconnected)
case pkt := <-c.discoSendQueue:
- c.s.recordDrop(pkt.bs, pkt.src, c.key, dropReasonGoneDisconnected)
+ c.s.recordDrop(nil, c, pkt.bs, pkt.src, c.key, dropReasonGoneDisconnected)
default:
return
}
@@ -1917,7 +2039,7 @@ func (c *sclient) sendPacket(srcKey key.NodePublic, contents []byte) (err error)
defer func() {
// Stats update.
if err != nil {
- c.s.recordDrop(contents, srcKey, c.key, dropReasonWriteError)
+ c.s.recordDrop(nil, c, contents, srcKey, c.key, dropReasonWriteError)
} else {
c.s.packetsSent.Add(1)
c.s.bytesSent.Add(int64(len(contents)))
@@ -2149,6 +2271,8 @@ func (s *Server) ExpVar() expvar.Var {
m.Set("counter_mesh_update_batch_size", s.meshUpdateBatchSize)
m.Set("counter_mesh_update_loop_count", s.meshUpdateLoopCount)
m.Set("counter_buffered_write_frames", s.bufferedWriteFrames)
+ m.Set("packets_recv_drop_rate_by_client", collectorVar{s.packetsRecvDropRateByClient})
+ m.Set("packets_send_drop_rate_by_client", collectorVar{s.packetsSentDropRateByClient})
var expvarVersion expvar.String
expvarVersion.Set(version.Long())
m.Set("version", &expvarVersion)
@@ -2341,3 +2465,43 @@ func (w *lazyBufioWriter) Flush() error {
return err
}
+
+// monitorQueueDepths maintains histograms of send queue depths for disco and
+// non-disco traffic. It observes queue depths for all active clients every 10
+// seconds.
+func (s *Server) collectDropsByClient() {
+ t := time.NewTicker(10 * time.Second)
+
+ var clients []*clientSet
+
+ for {
+ select {
+ case <-t.C:
+ clients = clients[:0]
+ s.mu.Lock()
+ for _, cs := range s.clients {
+ clients = append(clients, cs)
+ }
+ s.mu.Unlock()
+
+ for _, cs := range clients {
+ if c := cs.activeClient.Load(); c != nil {
+ discoRecv, otherRecv, discoSent, otherSent := c.packetsRecvDisco.Swap(0), c.packetsRecvOther.Swap(0), c.packetsSentDisco.Swap(0), c.packetsSentOther.Swap(0)
+ c.dropsMu.Lock()
+ c.discoRecvDropsByReason.collect(s.packetsRecvDropRateByClient, packetKindDisco, float64(discoRecv), false)
+ c.otherRecvDropsByReason.collect(s.packetsRecvDropRateByClient, packetKindOther, float64(otherRecv), false)
+ c.discoSendDropsByReason.collect(s.packetsSentDropRateByClient, packetKindDisco, float64(discoSent), true)
+ c.otherSendDropsByReason.collect(s.packetsSentDropRateByClient, packetKindOther, float64(otherSent), true)
+ c.discoRecvDropsByReason.reset()
+ c.otherRecvDropsByReason.reset()
+ c.discoSendDropsByReason.reset()
+ c.otherSendDropsByReason.reset()
+ c.dropsMu.Unlock()
+ }
+ }
+
+ // clients does retain clientSets, but these are fairly lightweight since they just point at actual clients
+ clients = clients[:0]
+ }
+ }
+}