diff options
Diffstat (limited to 'util/execqueue/execqueue.go')
| -rw-r--r-- | util/execqueue/execqueue.go | 21 |
1 files changed, 20 insertions, 1 deletions
diff --git a/util/execqueue/execqueue.go b/util/execqueue/execqueue.go index 889cea255..dce70c542 100644 --- a/util/execqueue/execqueue.go +++ b/util/execqueue/execqueue.go @@ -12,6 +12,8 @@ import ( type ExecQueue struct { mu sync.Mutex + ctx context.Context // context.Background + closed on Shutdown + cancel context.CancelFunc // closes ctx closed bool inFlight bool // whether a goroutine is running q.run doneWaiter chan struct{} // non-nil if waiter is waiting, then closed @@ -24,6 +26,7 @@ func (q *ExecQueue) Add(f func()) { if q.closed { return } + q.initCtxLocked() if q.inFlight { q.queue = append(q.queue, f) } else { @@ -79,18 +82,32 @@ func (q *ExecQueue) Shutdown() { q.mu.Lock() defer q.mu.Unlock() q.closed = true + if q.cancel != nil { + q.cancel() + } } -// Wait waits for the queue to be empty. +func (q *ExecQueue) initCtxLocked() { + if q.ctx == nil { + q.ctx, q.cancel = context.WithCancel(context.Background()) + } +} + +// Wait waits for the queue to be empty or shut down. func (q *ExecQueue) Wait(ctx context.Context) error { q.mu.Lock() + q.initCtxLocked() waitCh := q.doneWaiter if q.inFlight && waitCh == nil { waitCh = make(chan struct{}) q.doneWaiter = waitCh } + closed := q.closed q.mu.Unlock() + if closed { + return errors.New("execqueue shut down") + } if waitCh == nil { return nil } @@ -98,6 +115,8 @@ func (q *ExecQueue) Wait(ctx context.Context) error { select { case <-waitCh: return nil + case <-q.ctx.Done(): + return errors.New("execqueue shut down") case <-ctx.Done(): return ctx.Err() } |
