diff options
Diffstat (limited to 'util/latencyqueue/latencyqueue_test.go')
| -rw-r--r-- | util/latencyqueue/latencyqueue_test.go | 98 |
1 files changed, 98 insertions, 0 deletions
diff --git a/util/latencyqueue/latencyqueue_test.go b/util/latencyqueue/latencyqueue_test.go index 6a6051ee1..dcf4b38d5 100644 --- a/util/latencyqueue/latencyqueue_test.go +++ b/util/latencyqueue/latencyqueue_test.go @@ -722,3 +722,101 @@ func TestZeroMaxLag(t *testing.T) { } }) } + +// BenchmarkVariableLoad tests memory efficiency under variable load patterns. +// The ringbuffer-based implementation should efficiently handle: +// - Bursts of enqueues followed by processing +// - Growing and shrinking queue sizes +// - Memory compaction during idle periods +func BenchmarkVariableLoad(b *testing.B) { + q := New[int](context.Background(), 10*time.Second) + + processed := atomic.Int64{} + q.Start(func(ctx context.Context, val int) { + processed.Add(1) + // Simulate some processing time + time.Sleep(10 * time.Microsecond) + }) + defer q.Close() + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + // Simulate bursty traffic - enqueue in batches + batchSize := 10 + (i % 50) // Variable batch sizes from 10-59 + batch := make([]int, batchSize) + for j := range batch { + batch[j] = i*100 + j + } + q.Enqueue(batch) + + // Occasionally wait for processing to catch up + if i%100 == 99 { + barrier := q.Barrier() + <-barrier + } + } + + // Final barrier to ensure all items are processed + barrier := q.Barrier() + <-barrier + + b.ReportMetric(float64(processed.Load()), "items") +} + +// BenchmarkSteadyState tests performance under steady-state conditions. +func BenchmarkSteadyState(b *testing.B) { + q := New[int](context.Background(), 10*time.Second) + + q.Start(func(ctx context.Context, val int) { + // Fast processing + }) + defer q.Close() + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + q.Enqueue([]int{i}) + } + + barrier := q.Barrier() + <-barrier +} + +// BenchmarkBurstThenDrain tests memory efficiency in burst-then-drain scenarios. +// This pattern exposes inefficiencies in slice-based implementations where +// the underlying array never shrinks. The ringbuffer should compact efficiently. +func BenchmarkBurstThenDrain(b *testing.B) { + q := New[int](context.Background(), 10*time.Second) + + processDelay := atomic.Bool{} + q.Start(func(ctx context.Context, val int) { + if processDelay.Load() { + time.Sleep(100 * time.Microsecond) + } + }) + defer q.Close() + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + // Burst phase: enqueue many items with slow processing + processDelay.Store(true) + largeBatch := make([]int, 1000) + for j := range largeBatch { + largeBatch[j] = i*1000 + j + } + q.Enqueue(largeBatch) + + // Let queue fill up a bit + time.Sleep(500 * time.Microsecond) + + // Drain phase: speed up processing + processDelay.Store(false) + barrier := q.Barrier() + <-barrier + + // Allow time for compaction to potentially occur + time.Sleep(100 * time.Microsecond) + } +} |
