summaryrefslogtreecommitdiffhomepage
path: root/util
diff options
context:
space:
mode:
author: Andrew Dunham <andrew@du.nham.ca> 2025-01-24 15:42:12 -0500
committer: James Tucker <james@tailscale.com> 2025-01-24 13:17:19 -0800
commit: f3db0011217c3bdba9c06ba189523962e655f501 (patch)
tree: e303ba93b57b76d67445e22d874c35e6a5eb2e9c /util
parent: 69bc164c621b8dc920b4208b389bd4a8f87c3d9f (diff)
downloadtailscale-andrew/execqueue-metrics.tar.xz
tailscale-andrew/execqueue-metrics.zip
util/execqueue: add metrics (branch: andrew/execqueue-metrics)
Expose enough metrics to get a sense of queue depth, usage, and whether it has stalled.

Updates tailscale/corp#26058

Signed-off-by: Andrew Dunham <andrew@du.nham.ca>
Change-Id: I271ac8d03f3db587a33aca6964fe92f2833e1251
Diffstat (limited to 'util')
-rw-r--r-- util/execqueue/execqueue.go 40
1 files changed, 40 insertions, 0 deletions
diff --git a/util/execqueue/execqueue.go b/util/execqueue/execqueue.go
index 889cea255..d3f4f4cca 100644
--- a/util/execqueue/execqueue.go
+++ b/util/execqueue/execqueue.go
@@ -7,7 +7,11 @@ package execqueue
import (
"context"
"errors"
+ "expvar"
+ "fmt"
"sync"
+ "sync/atomic"
+ "time"
)
type ExecQueue struct {
@@ -16,9 +20,36 @@ type ExecQueue struct {
inFlight bool // whether a goroutine is running q.run
doneWaiter chan struct{} // non-nil if waiter is waiting, then closed
queue []func()
+
+ // metrics follow
+ metricsRegisterOnce sync.Once
+ metricInserts expvar.Int
+ metricRemovals expvar.Int
+ metricQueueLastDrain expvar.Int // unix millis
+}
+
+// This is extremely silly but is for debugging
+var metricsCounter atomic.Int64
+
+// registerMetrics registers the queue's metrics with expvar, using a unique name.
+func (q *ExecQueue) registerMetrics() {
+ q.metricsRegisterOnce.Do(func() {
+ m := new(expvar.Map).Init()
+ m.Set("inserts", &q.metricInserts)
+ m.Set("removals", &q.metricRemovals)
+ m.Set("length", expvar.Func(func() any {
+ return q.metricInserts.Value() - q.metricRemovals.Value()
+ }))
+ m.Set("last_drain", &q.metricQueueLastDrain)
+
+ name := fmt.Sprintf("execqueue-%d", metricsCounter.Add(1))
+ expvar.Publish(name, m)
+ })
}
func (q *ExecQueue) Add(f func()) {
+ q.registerMetrics()
+
q.mu.Lock()
defer q.mu.Unlock()
if q.closed {
@@ -26,6 +57,7 @@ func (q *ExecQueue) Add(f func()) {
}
if q.inFlight {
q.queue = append(q.queue, f)
+ q.metricInserts.Add(1)
} else {
q.inFlight = true
go q.run(f)
@@ -35,6 +67,8 @@ func (q *ExecQueue) Add(f func()) {
// RunSync waits for the queue to be drained and then synchronously runs f.
// It returns an error if the queue is closed before f is run or ctx expires.
func (q *ExecQueue) RunSync(ctx context.Context, f func()) error {
+ q.registerMetrics()
+
for {
if err := q.Wait(ctx); err != nil {
return err
@@ -61,11 +95,13 @@ func (q *ExecQueue) run(f func()) {
f := q.queue[0]
q.queue[0] = nil
q.queue = q.queue[1:]
+ q.metricRemovals.Add(1)
q.mu.Unlock()
f()
q.mu.Lock()
}
q.inFlight = false
+ q.metricQueueLastDrain.Set(int64(time.Now().UnixMilli()))
q.queue = nil
if q.doneWaiter != nil {
close(q.doneWaiter)
@@ -76,6 +112,8 @@ func (q *ExecQueue) run(f func()) {
// Shutdown asynchronously signals the queue to stop.
func (q *ExecQueue) Shutdown() {
+ q.registerMetrics()
+
q.mu.Lock()
defer q.mu.Unlock()
q.closed = true
@@ -83,6 +121,8 @@ func (q *ExecQueue) Shutdown() {
// Wait waits for the queue to be empty.
func (q *ExecQueue) Wait(ctx context.Context) error {
+ q.registerMetrics()
+
q.mu.Lock()
waitCh := q.doneWaiter
if q.inFlight && waitCh == nil {