summaryrefslogtreecommitdiffhomepage
path: root/util/execqueue/execqueue.go
diff options
context:
space:
mode:
Diffstat (limited to 'util/execqueue/execqueue.go')
-rw-r--r--util/execqueue/execqueue.go37
1 files changed, 20 insertions, 17 deletions
diff --git a/util/execqueue/execqueue.go b/util/execqueue/execqueue.go
index 2ea0c1f2f..87616a6b5 100644
--- a/util/execqueue/execqueue.go
+++ b/util/execqueue/execqueue.go
@@ -39,21 +39,21 @@ func (q *ExecQueue) Add(f func()) {
// RunSync waits for the queue to be drained and then synchronously runs f.
// It returns an error if the queue is closed before f is run or ctx expires.
func (q *ExecQueue) RunSync(ctx context.Context, f func()) error {
- for {
- if err := q.Wait(ctx); err != nil {
- return err
- }
- q.mu.Lock()
- if q.inFlight {
- q.mu.Unlock()
- continue
- }
- defer q.mu.Unlock()
- if q.closed {
- return errors.New("closed")
- }
- f()
+ q.mu.Lock()
+ q.initCtxLocked()
+ shutdownCtx := q.ctx
+ q.mu.Unlock()
+
+ ch := make(chan struct{})
+ q.Add(f)
+ q.Add(func() { close(ch) })
+ select {
+ case <-ch:
return nil
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-shutdownCtx.Done():
+ return errExecQueueShutdown
}
}
@@ -94,6 +94,8 @@ func (q *ExecQueue) initCtxLocked() {
}
}
+var errExecQueueShutdown = errors.New("execqueue shut down")
+
// Wait waits for the queue to be empty or shut down.
func (q *ExecQueue) Wait(ctx context.Context) error {
q.mu.Lock()
@@ -104,10 +106,11 @@ func (q *ExecQueue) Wait(ctx context.Context) error {
q.doneWaiter = waitCh
}
closed := q.closed
+ shutdownCtx := q.ctx
q.mu.Unlock()
if closed {
- return errors.New("execqueue shut down")
+ return errExecQueueShutdown
}
if waitCh == nil {
return nil
@@ -116,8 +119,8 @@ func (q *ExecQueue) Wait(ctx context.Context) error {
select {
case <-waitCh:
return nil
- case <-q.ctx.Done():
- return errors.New("execqueue shut down")
+ case <-shutdownCtx.Done():
+ return errExecQueueShutdown
case <-ctx.Done():
return ctx.Err()
}