diff options
Diffstat (limited to 'util/execqueue/execqueue.go')
| -rw-r--r-- | util/execqueue/execqueue.go | 37 |
1 files changed, 20 insertions, 17 deletions
diff --git a/util/execqueue/execqueue.go b/util/execqueue/execqueue.go index 2ea0c1f2f..87616a6b5 100644 --- a/util/execqueue/execqueue.go +++ b/util/execqueue/execqueue.go @@ -39,21 +39,21 @@ func (q *ExecQueue) Add(f func()) { // RunSync waits for the queue to be drained and then synchronously runs f. // It returns an error if the queue is closed before f is run or ctx expires. func (q *ExecQueue) RunSync(ctx context.Context, f func()) error { - for { - if err := q.Wait(ctx); err != nil { - return err - } - q.mu.Lock() - if q.inFlight { - q.mu.Unlock() - continue - } - defer q.mu.Unlock() - if q.closed { - return errors.New("closed") - } - f() + q.mu.Lock() + q.initCtxLocked() + shutdownCtx := q.ctx + q.mu.Unlock() + + ch := make(chan struct{}) + q.Add(f) + q.Add(func() { close(ch) }) + select { + case <-ch: return nil + case <-ctx.Done(): + return ctx.Err() + case <-shutdownCtx.Done(): + return errExecQueueShutdown } } @@ -94,6 +94,8 @@ func (q *ExecQueue) initCtxLocked() { } } +var errExecQueueShutdown = errors.New("execqueue shut down") + // Wait waits for the queue to be empty or shut down. func (q *ExecQueue) Wait(ctx context.Context) error { q.mu.Lock() @@ -104,10 +106,11 @@ func (q *ExecQueue) Wait(ctx context.Context) error { q.doneWaiter = waitCh } closed := q.closed + shutdownCtx := q.ctx q.mu.Unlock() if closed { - return errors.New("execqueue shut down") + return errExecQueueShutdown } if waitCh == nil { return nil @@ -116,8 +119,8 @@ func (q *ExecQueue) Wait(ctx context.Context) error { select { case <-waitCh: return nil - case <-q.ctx.Done(): - return errors.New("execqueue shut down") + case <-shutdownCtx.Done(): + return errExecQueueShutdown case <-ctx.Done(): return ctx.Err() } |
