summaryrefslogtreecommitdiffhomepage
path: root/util/execqueue/execqueue.go
blob: d3f4f4cca6957ebc852424203cba431fccaff6ba (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause

// Package execqueue implements an ordered asynchronous queue for executing functions.
package execqueue

import (
	"context"
	"errors"
	"expvar"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type ExecQueue struct {
	mu         sync.Mutex
	closed     bool
	inFlight   bool          // whether a goroutine is running q.run
	doneWaiter chan struct{} // non-nil if waiter is waiting, then closed
	queue      []func()

	// metrics follow
	metricsRegisterOnce  sync.Once
	metricInserts        expvar.Int
	metricRemovals       expvar.Int
	metricQueueLastDrain expvar.Int // unix millis
}

// This is extremely silly but is for debugging
var metricsCounter atomic.Int64

// registerMetrics registers the queue's metrics with expvar, using a unique name.
func (q *ExecQueue) registerMetrics() {
	q.metricsRegisterOnce.Do(func() {
		m := new(expvar.Map).Init()
		m.Set("inserts", &q.metricInserts)
		m.Set("removals", &q.metricRemovals)
		m.Set("length", expvar.Func(func() any {
			return q.metricInserts.Value() - q.metricRemovals.Value()
		}))
		m.Set("last_drain", &q.metricQueueLastDrain)

		name := fmt.Sprintf("execqueue-%d", metricsCounter.Add(1))
		expvar.Publish(name, m)
	})
}

func (q *ExecQueue) Add(f func()) {
	q.registerMetrics()

	q.mu.Lock()
	defer q.mu.Unlock()
	if q.closed {
		return
	}
	if q.inFlight {
		q.queue = append(q.queue, f)
		q.metricInserts.Add(1)
	} else {
		q.inFlight = true
		go q.run(f)
	}
}

// RunSync waits for the queue to be drained and then synchronously runs f.
// It returns an error if the queue is closed before f is run or ctx expires.
func (q *ExecQueue) RunSync(ctx context.Context, f func()) error {
	q.registerMetrics()

	for {
		if err := q.Wait(ctx); err != nil {
			return err
		}
		q.mu.Lock()
		if q.inFlight {
			q.mu.Unlock()
			continue
		}
		defer q.mu.Unlock()
		if q.closed {
			return errors.New("closed")
		}
		f()
		return nil
	}
}

func (q *ExecQueue) run(f func()) {
	f()

	q.mu.Lock()
	for len(q.queue) > 0 && !q.closed {
		f := q.queue[0]
		q.queue[0] = nil
		q.queue = q.queue[1:]
		q.metricRemovals.Add(1)
		q.mu.Unlock()
		f()
		q.mu.Lock()
	}
	q.inFlight = false
	q.metricQueueLastDrain.Set(int64(time.Now().UnixMilli()))
	q.queue = nil
	if q.doneWaiter != nil {
		close(q.doneWaiter)
		q.doneWaiter = nil
	}
	q.mu.Unlock()
}

// Shutdown asynchronously signals the queue to stop.
func (q *ExecQueue) Shutdown() {
	q.registerMetrics()

	q.mu.Lock()
	defer q.mu.Unlock()
	q.closed = true
}

// Wait waits for the queue to be empty.
func (q *ExecQueue) Wait(ctx context.Context) error {
	q.registerMetrics()

	q.mu.Lock()
	waitCh := q.doneWaiter
	if q.inFlight && waitCh == nil {
		waitCh = make(chan struct{})
		q.doneWaiter = waitCh
	}
	q.mu.Unlock()

	if waitCh == nil {
		return nil
	}

	select {
	case <-waitCh:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}