summaryrefslogtreecommitdiffhomepage
path: root/util/latencyqueue/latencyqueue.go
diff options
context:
space:
mode:
Diffstat (limited to 'util/latencyqueue/latencyqueue.go')
-rw-r--r--util/latencyqueue/latencyqueue.go25
1 files changed, 13 insertions, 12 deletions
diff --git a/util/latencyqueue/latencyqueue.go b/util/latencyqueue/latencyqueue.go
index 65f62a304..26d7b4db4 100644
--- a/util/latencyqueue/latencyqueue.go
+++ b/util/latencyqueue/latencyqueue.go
@@ -11,6 +11,8 @@ import (
"sync"
"sync/atomic"
"time"
+
+ "tailscale.com/util/ringbuffer"
)
var (
@@ -52,7 +54,7 @@ type Queue[T any] struct {
cancel context.CancelCauseFunc
mu sync.Mutex
- items []queueItem[T]
+ items *ringbuffer.RingBuffer[queueItem[T]]
wakeup chan struct{}
started bool
@@ -93,7 +95,7 @@ func New[T any](parent context.Context, maxLag time.Duration) *Queue[T] {
q := &Queue[T]{
ctx: ctx,
cancel: cancel,
- items: make([]queueItem[T], 0, 128),
+ items: ringbuffer.New[queueItem[T]](),
wakeup: make(chan struct{}, 1),
maxLag: maxLag,
done: make(chan struct{}),
@@ -154,7 +156,7 @@ func (q *Queue[T]) Enqueue(batch []T) bool {
default:
}
- q.items = append(q.items, item)
+ q.items.Push(item)
q.numEnqueued.Add(uint64(len(batch)))
q.mu.Unlock()
@@ -181,7 +183,7 @@ func (q *Queue[T]) Barrier() <-chan struct{} {
kind: kindBarrier,
barrier: ch,
}
- q.items = append(q.items, item)
+ q.items.Push(item)
q.mu.Unlock()
q.wake()
@@ -227,12 +229,7 @@ func (q *Queue[T]) run(processor func(context.Context, T)) {
for {
if processing == nil {
q.mu.Lock()
- hasItems := len(q.items) > 0
- var item queueItem[T]
- if hasItems {
- item = q.items[0]
- q.items = q.items[1:]
- }
+ item, hasItems := q.items.Pop()
q.mu.Unlock()
if !hasItems {
@@ -323,10 +320,14 @@ func (q *Queue[T]) drainAndReleaseBarriers() {
q.mu.Lock()
defer q.mu.Unlock()
- for _, item := range q.items {
+ for !q.items.IsEmpty() {
+ item, ok := q.items.Pop()
+ if !ok {
+ break
+ }
if item.kind == kindBarrier {
close(item.barrier)
}
}
- q.items = nil
+ q.items.Clear()
}