diff options
| author | Andrew Dunham <andrew@du.nham.ca> | 2025-01-24 15:42:12 -0500 |
|---|---|---|
| committer | James Tucker <james@tailscale.com> | 2025-01-24 13:17:19 -0800 |
| commit | f3db0011217c3bdba9c06ba189523962e655f501 (patch) | |
| tree | e303ba93b57b76d67445e22d874c35e6a5eb2e9c /util | |
| parent | 69bc164c621b8dc920b4208b389bd4a8f87c3d9f (diff) | |
| download | tailscale-andrew/execqueue-metrics.tar.xz tailscale-andrew/execqueue-metrics.zip | |
util/execqueue: add metricsandrew/execqueue-metrics
Expose enough metrics to get a sense of queue depth, use and if it has
stalled.
Updates tailscale/corp#26058
Signed-off-by: Andrew Dunham <andrew@du.nham.ca>
Change-Id: I271ac8d03f3db587a33aca6964fe92f2833e1251
Diffstat (limited to 'util')
| -rw-r--r-- | util/execqueue/execqueue.go | 40 |
1 files changed, 40 insertions, 0 deletions
diff --git a/util/execqueue/execqueue.go b/util/execqueue/execqueue.go index 889cea255..d3f4f4cca 100644 --- a/util/execqueue/execqueue.go +++ b/util/execqueue/execqueue.go @@ -7,7 +7,11 @@ package execqueue import ( "context" "errors" + "expvar" + "fmt" "sync" + "sync/atomic" + "time" ) type ExecQueue struct { @@ -16,9 +20,36 @@ type ExecQueue struct { inFlight bool // whether a goroutine is running q.run doneWaiter chan struct{} // non-nil if waiter is waiting, then closed queue []func() + + // metrics follow + metricsRegisterOnce sync.Once + metricInserts expvar.Int + metricRemovals expvar.Int + metricQueueLastDrain expvar.Int // unix millis +} + +// This is extremely silly but is for debugging +var metricsCounter atomic.Int64 + +// registerMetrics registers the queue's metrics with expvar, using a unique name. +func (q *ExecQueue) registerMetrics() { + q.metricsRegisterOnce.Do(func() { + m := new(expvar.Map).Init() + m.Set("inserts", &q.metricInserts) + m.Set("removals", &q.metricRemovals) + m.Set("length", expvar.Func(func() any { + return q.metricInserts.Value() - q.metricRemovals.Value() + })) + m.Set("last_drain", &q.metricQueueLastDrain) + + name := fmt.Sprintf("execqueue-%d", metricsCounter.Add(1)) + expvar.Publish(name, m) + }) } func (q *ExecQueue) Add(f func()) { + q.registerMetrics() + q.mu.Lock() defer q.mu.Unlock() if q.closed { @@ -26,6 +57,7 @@ func (q *ExecQueue) Add(f func()) { } if q.inFlight { q.queue = append(q.queue, f) + q.metricInserts.Add(1) } else { q.inFlight = true go q.run(f) @@ -35,6 +67,8 @@ func (q *ExecQueue) Add(f func()) { // RunSync waits for the queue to be drained and then synchronously runs f. // It returns an error if the queue is closed before f is run or ctx expires. func (q *ExecQueue) RunSync(ctx context.Context, f func()) error { + q.registerMetrics() + for { if err := q.Wait(ctx); err != nil { return err @@ -61,11 +95,13 @@ func (q *ExecQueue) run(f func()) { f := q.queue[0] q.queue[0] = nil q.queue = q.queue[1:] + q.metricRemovals.Add(1) q.mu.Unlock() f() q.mu.Lock() } q.inFlight = false + q.metricQueueLastDrain.Set(int64(time.Now().UnixMilli())) q.queue = nil if q.doneWaiter != nil { close(q.doneWaiter) @@ -76,6 +112,8 @@ func (q *ExecQueue) run(f func()) { // Shutdown asynchronously signals the queue to stop. func (q *ExecQueue) Shutdown() { + q.registerMetrics() + q.mu.Lock() defer q.mu.Unlock() q.closed = true @@ -83,6 +121,8 @@ func (q *ExecQueue) Shutdown() { // Wait waits for the queue to be empty. func (q *ExecQueue) Wait(ctx context.Context) error { + q.registerMetrics() + q.mu.Lock() waitCh := q.doneWaiter if q.inFlight && waitCh == nil { |
