summaryrefslogtreecommitdiffhomepage
path: root/derp
diff options
context:
space:
mode:
Diffstat (limited to 'derp')
-rw-r--r--derp/derp_server.go79
1 files changed, 60 insertions, 19 deletions
diff --git a/derp/derp_server.go b/derp/derp_server.go
index 08fd280a9..ad75dd6d0 100644
--- a/derp/derp_server.go
+++ b/derp/derp_server.go
@@ -170,6 +170,8 @@ type Server struct {
meshUpdateBatchSize *metrics.Histogram
meshUpdateLoopCount *metrics.Histogram
bufferedWriteFrames *metrics.Histogram // how many sendLoop frames (or groups of related frames) get written per flush
+ nonDiscoSendQueueDepths *metrics.Histogram
+ discoSendQueueDepths *metrics.Histogram
// verifyClientsLocalTailscaled only accepts client connections to the DERP
// server if the clientKey is a known peer in the network, as specified by a
@@ -364,25 +366,27 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server {
runtime.ReadMemStats(&ms)
s := &Server{
- debug: envknob.Bool("DERP_DEBUG_LOGS"),
- privateKey: privateKey,
- publicKey: privateKey.Public(),
- logf: logf,
- limitedLogf: logger.RateLimitedFn(logf, 30*time.Second, 5, 100),
- packetsRecvByKind: metrics.LabelMap{Label: "kind"},
- clients: map[key.NodePublic]*clientSet{},
- clientsMesh: map[key.NodePublic]PacketForwarder{},
- netConns: map[Conn]chan struct{}{},
- memSys0: ms.Sys,
- watchers: set.Set[*sclient]{},
- peerGoneWatchers: map[key.NodePublic]set.HandleSet[func(key.NodePublic)]{},
- avgQueueDuration: new(uint64),
- tcpRtt: metrics.LabelMap{Label: "le"},
- meshUpdateBatchSize: metrics.NewHistogram([]float64{0, 1, 2, 5, 10, 20, 50, 100, 200, 500, 1000}),
- meshUpdateLoopCount: metrics.NewHistogram([]float64{0, 1, 2, 5, 10, 20, 50, 100}),
- bufferedWriteFrames: metrics.NewHistogram([]float64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 20, 25, 50, 100}),
- keyOfAddr: map[netip.AddrPort]key.NodePublic{},
- clock: tstime.StdClock{},
+ debug: envknob.Bool("DERP_DEBUG_LOGS"),
+ privateKey: privateKey,
+ publicKey: privateKey.Public(),
+ logf: logf,
+ limitedLogf: logger.RateLimitedFn(logf, 30*time.Second, 5, 100),
+ packetsRecvByKind: metrics.LabelMap{Label: "kind"},
+ clients: map[key.NodePublic]*clientSet{},
+ clientsMesh: map[key.NodePublic]PacketForwarder{},
+ netConns: map[Conn]chan struct{}{},
+ memSys0: ms.Sys,
+ watchers: set.Set[*sclient]{},
+ peerGoneWatchers: map[key.NodePublic]set.HandleSet[func(key.NodePublic)]{},
+ avgQueueDuration: new(uint64),
+ tcpRtt: metrics.LabelMap{Label: "le"},
+ meshUpdateBatchSize: metrics.NewHistogram([]float64{0, 1, 2, 5, 10, 20, 50, 100, 200, 500, 1000}),
+ meshUpdateLoopCount: metrics.NewHistogram([]float64{0, 1, 2, 5, 10, 20, 50, 100}),
+ bufferedWriteFrames: metrics.NewHistogram([]float64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 20, 25, 50, 100}),
+ nonDiscoSendQueueDepths: metrics.NewHistogram([]float64{0, 1, 2, 4, 8, 16, 32}),
+ discoSendQueueDepths: metrics.NewHistogram([]float64{0, 1, 2, 4, 8, 16, 32}),
+ keyOfAddr: map[netip.AddrPort]key.NodePublic{},
+ clock: tstime.StdClock{},
}
s.initMetacert()
s.packetsRecvDisco = s.packetsRecvByKind.Get(string(packetKindDisco))
@@ -391,6 +395,8 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server {
genPacketsDroppedCounters()
s.perClientSendQueueDepth = getPerClientSendQueueDepth()
+
+ go s.monitorQueueDepths()
return s
}
@@ -2149,6 +2155,8 @@ func (s *Server) ExpVar() expvar.Var {
m.Set("counter_mesh_update_batch_size", s.meshUpdateBatchSize)
m.Set("counter_mesh_update_loop_count", s.meshUpdateLoopCount)
m.Set("counter_buffered_write_frames", s.bufferedWriteFrames)
+ m.Set("counter_non_disco_sendqueue_depth", s.nonDiscoSendQueueDepths)
+ m.Set("counter_disco_sendqueue_depth", s.discoSendQueueDepths)
var expvarVersion expvar.String
expvarVersion.Set(version.Long())
m.Set("version", &expvarVersion)
@@ -2341,3 +2349,36 @@ func (w *lazyBufioWriter) Flush() error {
return err
}
+
+// monitorQueueDepths maintains histograms of send queue depths for disco and
+// non-disco traffic. It observes queue depths for all active clients every 10
+// seconds.
+func (s *Server) monitorQueueDepths() {
+ t := time.NewTicker(10 * time.Second)
+ var nonDiscoDepths []int
+ var discoDepths []int
+
+ for {
+ select {
+ case <-t.C:
+ nonDiscoDepths = nonDiscoDepths[:0]
+ discoDepths = nonDiscoDepths[:0]
+ s.mu.Lock()
+ for _, cs := range s.clients {
+ c := cs.activeClient.Load()
+ if c != nil {
+ nonDiscoDepths = append(nonDiscoDepths, len(c.sendQueue))
+ discoDepths = append(discoDepths, len(c.discoSendQueue))
+ }
+ }
+ s.mu.Unlock()
+
+ for _, depth := range nonDiscoDepths {
+ s.nonDiscoSendQueueDepths.Observe(float64(depth))
+ }
+ for _, depth := range discoDepths {
+ s.discoSendQueueDepths.Observe(float64(depth))
+ }
+ }
+ }
+}