summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorJames Tucker <james@tailscale.com>2025-11-09 23:29:15 -0800
committerJames Tucker <james@tailscale.com>2025-11-09 23:29:15 -0800
commitfdfe5309acf1f8535d451db8fb55c8df485dc419 (patch)
treec1d18fce4ef40195539b2df920d8d44dc3bfb2d1
parent3ef8ae083ae48bb2270b3996a67145f90ddcea47 (diff)
downloadtailscale-fdfe5309acf1f8535d451db8fb55c8df485dc419.tar.xz
tailscale-fdfe5309acf1f8535d451db8fb55c8df485dc419.zip
util/latencyqueue: use util/ringbuffer for improved memory costraggi/latencyqueue
Updates tailscale/corp#34129 Signed-off-by: James Tucker <james@tailscale.com>
-rw-r--r--util/latencyqueue/latencyqueue.go25
-rw-r--r--util/latencyqueue/latencyqueue_test.go98
2 files changed, 111 insertions, 12 deletions
diff --git a/util/latencyqueue/latencyqueue.go b/util/latencyqueue/latencyqueue.go
index 65f62a304..26d7b4db4 100644
--- a/util/latencyqueue/latencyqueue.go
+++ b/util/latencyqueue/latencyqueue.go
@@ -11,6 +11,8 @@ import (
"sync"
"sync/atomic"
"time"
+
+ "tailscale.com/util/ringbuffer"
)
var (
@@ -52,7 +54,7 @@ type Queue[T any] struct {
cancel context.CancelCauseFunc
mu sync.Mutex
- items []queueItem[T]
+ items *ringbuffer.RingBuffer[queueItem[T]]
wakeup chan struct{}
started bool
@@ -93,7 +95,7 @@ func New[T any](parent context.Context, maxLag time.Duration) *Queue[T] {
q := &Queue[T]{
ctx: ctx,
cancel: cancel,
- items: make([]queueItem[T], 0, 128),
+ items: ringbuffer.New[queueItem[T]](),
wakeup: make(chan struct{}, 1),
maxLag: maxLag,
done: make(chan struct{}),
@@ -154,7 +156,7 @@ func (q *Queue[T]) Enqueue(batch []T) bool {
default:
}
- q.items = append(q.items, item)
+ q.items.Push(item)
q.numEnqueued.Add(uint64(len(batch)))
q.mu.Unlock()
@@ -181,7 +183,7 @@ func (q *Queue[T]) Barrier() <-chan struct{} {
kind: kindBarrier,
barrier: ch,
}
- q.items = append(q.items, item)
+ q.items.Push(item)
q.mu.Unlock()
q.wake()
@@ -227,12 +229,7 @@ func (q *Queue[T]) run(processor func(context.Context, T)) {
for {
if processing == nil {
q.mu.Lock()
- hasItems := len(q.items) > 0
- var item queueItem[T]
- if hasItems {
- item = q.items[0]
- q.items = q.items[1:]
- }
+ item, hasItems := q.items.Pop()
q.mu.Unlock()
if !hasItems {
@@ -323,10 +320,14 @@ func (q *Queue[T]) drainAndReleaseBarriers() {
q.mu.Lock()
defer q.mu.Unlock()
- for _, item := range q.items {
+ for !q.items.IsEmpty() {
+ item, ok := q.items.Pop()
+ if !ok {
+ break
+ }
if item.kind == kindBarrier {
close(item.barrier)
}
}
- q.items = nil
+ q.items.Clear()
}
diff --git a/util/latencyqueue/latencyqueue_test.go b/util/latencyqueue/latencyqueue_test.go
index 6a6051ee1..dcf4b38d5 100644
--- a/util/latencyqueue/latencyqueue_test.go
+++ b/util/latencyqueue/latencyqueue_test.go
@@ -722,3 +722,101 @@ func TestZeroMaxLag(t *testing.T) {
}
})
}
+
+// BenchmarkVariableLoad tests memory efficiency under variable load patterns.
+// The ringbuffer-based implementation should efficiently handle:
+// - Bursts of enqueues followed by processing
+// - Growing and shrinking queue sizes
+// - Memory compaction during idle periods
+func BenchmarkVariableLoad(b *testing.B) {
+ q := New[int](context.Background(), 10*time.Second)
+
+ processed := atomic.Int64{}
+ q.Start(func(ctx context.Context, val int) {
+ processed.Add(1)
+ // Simulate some processing time
+ time.Sleep(10 * time.Microsecond)
+ })
+ defer q.Close()
+
+ b.ResetTimer()
+
+ for i := 0; i < b.N; i++ {
+ // Simulate bursty traffic - enqueue in batches
+ batchSize := 10 + (i % 50) // Variable batch sizes from 10-59
+ batch := make([]int, batchSize)
+ for j := range batch {
+ batch[j] = i*100 + j
+ }
+ q.Enqueue(batch)
+
+ // Occasionally wait for processing to catch up
+ if i%100 == 99 {
+ barrier := q.Barrier()
+ <-barrier
+ }
+ }
+
+ // Final barrier to ensure all items are processed
+ barrier := q.Barrier()
+ <-barrier
+
+ b.ReportMetric(float64(processed.Load()), "items")
+}
+
+// BenchmarkSteadyState tests performance under steady-state conditions.
+func BenchmarkSteadyState(b *testing.B) {
+ q := New[int](context.Background(), 10*time.Second)
+
+ q.Start(func(ctx context.Context, val int) {
+ // Fast processing
+ })
+ defer q.Close()
+
+ b.ResetTimer()
+
+ for i := 0; i < b.N; i++ {
+ q.Enqueue([]int{i})
+ }
+
+ barrier := q.Barrier()
+ <-barrier
+}
+
+// BenchmarkBurstThenDrain tests memory efficiency in burst-then-drain scenarios.
+// This pattern exposes inefficiencies in slice-based implementations where
+// the underlying array never shrinks. The ringbuffer should compact efficiently.
+func BenchmarkBurstThenDrain(b *testing.B) {
+ q := New[int](context.Background(), 10*time.Second)
+
+ processDelay := atomic.Bool{}
+ q.Start(func(ctx context.Context, val int) {
+ if processDelay.Load() {
+ time.Sleep(100 * time.Microsecond)
+ }
+ })
+ defer q.Close()
+
+ b.ResetTimer()
+
+ for i := 0; i < b.N; i++ {
+ // Burst phase: enqueue many items with slow processing
+ processDelay.Store(true)
+ largeBatch := make([]int, 1000)
+ for j := range largeBatch {
+ largeBatch[j] = i*1000 + j
+ }
+ q.Enqueue(largeBatch)
+
+ // Let queue fill up a bit
+ time.Sleep(500 * time.Microsecond)
+
+ // Drain phase: speed up processing
+ processDelay.Store(false)
+ barrier := q.Barrier()
+ <-barrier
+
+ // Allow time for compaction to potentially occur
+ time.Sleep(100 * time.Microsecond)
+ }
+}