diff options
Diffstat (limited to 'util/latencyqueue/latencyqueue.go')
| -rw-r--r-- | util/latencyqueue/latencyqueue.go | 25 |
1 files changed, 13 insertions, 12 deletions
diff --git a/util/latencyqueue/latencyqueue.go b/util/latencyqueue/latencyqueue.go index 65f62a304..26d7b4db4 100644 --- a/util/latencyqueue/latencyqueue.go +++ b/util/latencyqueue/latencyqueue.go @@ -11,6 +11,8 @@ import ( "sync" "sync/atomic" "time" + + "tailscale.com/util/ringbuffer" ) var ( @@ -52,7 +54,7 @@ type Queue[T any] struct { cancel context.CancelCauseFunc mu sync.Mutex - items []queueItem[T] + items *ringbuffer.RingBuffer[queueItem[T]] wakeup chan struct{} started bool @@ -93,7 +95,7 @@ func New[T any](parent context.Context, maxLag time.Duration) *Queue[T] { q := &Queue[T]{ ctx: ctx, cancel: cancel, - items: make([]queueItem[T], 0, 128), + items: ringbuffer.New[queueItem[T]](), wakeup: make(chan struct{}, 1), maxLag: maxLag, done: make(chan struct{}), @@ -154,7 +156,7 @@ func (q *Queue[T]) Enqueue(batch []T) bool { default: } - q.items = append(q.items, item) + q.items.Push(item) q.numEnqueued.Add(uint64(len(batch))) q.mu.Unlock() @@ -181,7 +183,7 @@ func (q *Queue[T]) Barrier() <-chan struct{} { kind: kindBarrier, barrier: ch, } - q.items = append(q.items, item) + q.items.Push(item) q.mu.Unlock() q.wake() @@ -227,12 +229,7 @@ func (q *Queue[T]) run(processor func(context.Context, T)) { for { if processing == nil { q.mu.Lock() - hasItems := len(q.items) > 0 - var item queueItem[T] - if hasItems { - item = q.items[0] - q.items = q.items[1:] - } + item, hasItems := q.items.Pop() q.mu.Unlock() if !hasItems { @@ -323,10 +320,14 @@ func (q *Queue[T]) drainAndReleaseBarriers() { q.mu.Lock() defer q.mu.Unlock() - for _, item := range q.items { + for !q.items.IsEmpty() { + item, ok := q.items.Pop() + if !ok { + break + } if item.kind == kindBarrier { close(item.barrier) } } - q.items = nil + q.items.Clear() } |
