summaryrefslogtreecommitdiffhomepage
path: root/util/execqueue/execqueue.go
diff options
context:
space:
mode:
authorBrad Fitzpatrick <bradfitz@tailscale.com>2025-11-26 09:26:08 -0800
committerBrad Fitzpatrick <brad@danga.com>2025-11-26 10:09:23 -0800
commit8af7778ce04457a5f84a45e7cc8f58f02b7bfb4c (patch)
treecfa670e69ca997499de5772045179147a496ad43 /util/execqueue/execqueue.go
parentb7658a4ad2d13da515daee2bd8dd7d50a9067708 (diff)
downloadtailscale-8af7778ce04457a5f84a45e7cc8f58f02b7bfb4c.tar.xz
tailscale-8af7778ce04457a5f84a45e7cc8f58f02b7bfb4c.zip
util/execqueue: don't hold mutex in RunSync
We don't hold q.mu while running normal ExecQueue.Add funcs, so we shouldn't in RunSync either. Otherwise code it calls can't shut down the queue, as seen in #18502. Updates #18052 Co-authored-by: Nick Khyl <nickk@tailscale.com> Change-Id: Ic5e53440411eca5e9fabac7f4a68a9f6ef026de1 Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
Diffstat (limited to 'util/execqueue/execqueue.go')
-rw-r--r--util/execqueue/execqueue.go37
1 files changed, 20 insertions, 17 deletions
diff --git a/util/execqueue/execqueue.go b/util/execqueue/execqueue.go
index 2ea0c1f2f..87616a6b5 100644
--- a/util/execqueue/execqueue.go
+++ b/util/execqueue/execqueue.go
@@ -39,21 +39,21 @@ func (q *ExecQueue) Add(f func()) {
// RunSync waits for the queue to be drained and then synchronously runs f.
// It returns an error if the queue is closed before f is run or ctx expires.
func (q *ExecQueue) RunSync(ctx context.Context, f func()) error {
- for {
- if err := q.Wait(ctx); err != nil {
- return err
- }
- q.mu.Lock()
- if q.inFlight {
- q.mu.Unlock()
- continue
- }
- defer q.mu.Unlock()
- if q.closed {
- return errors.New("closed")
- }
- f()
+ q.mu.Lock()
+ q.initCtxLocked()
+ shutdownCtx := q.ctx
+ q.mu.Unlock()
+
+ ch := make(chan struct{})
+ q.Add(f)
+ q.Add(func() { close(ch) })
+ select {
+ case <-ch:
return nil
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-shutdownCtx.Done():
+ return errExecQueueShutdown
}
}
@@ -94,6 +94,8 @@ func (q *ExecQueue) initCtxLocked() {
}
}
+var errExecQueueShutdown = errors.New("execqueue shut down")
+
// Wait waits for the queue to be empty or shut down.
func (q *ExecQueue) Wait(ctx context.Context) error {
q.mu.Lock()
@@ -104,10 +106,11 @@ func (q *ExecQueue) Wait(ctx context.Context) error {
q.doneWaiter = waitCh
}
closed := q.closed
+ shutdownCtx := q.ctx
q.mu.Unlock()
if closed {
- return errors.New("execqueue shut down")
+ return errExecQueueShutdown
}
if waitCh == nil {
return nil
@@ -116,8 +119,8 @@ func (q *ExecQueue) Wait(ctx context.Context) error {
select {
case <-waitCh:
return nil
- case <-q.ctx.Done():
- return errors.New("execqueue shut down")
+ case <-shutdownCtx.Done():
+ return errExecQueueShutdown
case <-ctx.Done():
return ctx.Err()
}