summaryrefslogtreecommitdiffhomepage
path: root/util/latencyqueue/latencyqueue_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'util/latencyqueue/latencyqueue_test.go')
-rw-r--r--util/latencyqueue/latencyqueue_test.go98
1 files changed, 98 insertions, 0 deletions
diff --git a/util/latencyqueue/latencyqueue_test.go b/util/latencyqueue/latencyqueue_test.go
index 6a6051ee1..dcf4b38d5 100644
--- a/util/latencyqueue/latencyqueue_test.go
+++ b/util/latencyqueue/latencyqueue_test.go
@@ -722,3 +722,101 @@ func TestZeroMaxLag(t *testing.T) {
}
})
}
+
+// BenchmarkVariableLoad tests memory efficiency under variable load patterns.
+// The ringbuffer-based implementation should efficiently handle:
+// - Bursts of enqueues followed by processing
+// - Growing and shrinking queue sizes
+// - Memory compaction during idle periods
+func BenchmarkVariableLoad(b *testing.B) {
+ q := New[int](context.Background(), 10*time.Second)
+
+ processed := atomic.Int64{}
+ q.Start(func(ctx context.Context, val int) {
+ processed.Add(1)
+ // Simulate some processing time
+ time.Sleep(10 * time.Microsecond)
+ })
+ defer q.Close()
+
+ b.ResetTimer()
+
+ for i := 0; i < b.N; i++ {
+ // Simulate bursty traffic - enqueue in batches
+ batchSize := 10 + (i % 50) // Variable batch sizes from 10-59
+ batch := make([]int, batchSize)
+ for j := range batch {
+ batch[j] = i*100 + j
+ }
+ q.Enqueue(batch)
+
+ // Occasionally wait for processing to catch up
+ if i%100 == 99 {
+ barrier := q.Barrier()
+ <-barrier
+ }
+ }
+
+ // Final barrier to ensure all items are processed
+ barrier := q.Barrier()
+ <-barrier
+
+ b.ReportMetric(float64(processed.Load()), "items")
+}
+
+// BenchmarkSteadyState tests performance under steady-state conditions.
+func BenchmarkSteadyState(b *testing.B) {
+ q := New[int](context.Background(), 10*time.Second)
+
+ q.Start(func(ctx context.Context, val int) {
+ // Fast processing
+ })
+ defer q.Close()
+
+ b.ResetTimer()
+
+ for i := 0; i < b.N; i++ {
+ q.Enqueue([]int{i})
+ }
+
+ barrier := q.Barrier()
+ <-barrier
+}
+
+// BenchmarkBurstThenDrain tests memory efficiency in burst-then-drain scenarios.
+// This pattern exposes inefficiencies in slice-based implementations where
+// the underlying array never shrinks. The ringbuffer should compact efficiently.
+func BenchmarkBurstThenDrain(b *testing.B) {
+ q := New[int](context.Background(), 10*time.Second)
+
+ processDelay := atomic.Bool{}
+ q.Start(func(ctx context.Context, val int) {
+ if processDelay.Load() {
+ time.Sleep(100 * time.Microsecond)
+ }
+ })
+ defer q.Close()
+
+ b.ResetTimer()
+
+ for i := 0; i < b.N; i++ {
+ // Burst phase: enqueue many items with slow processing
+ processDelay.Store(true)
+ largeBatch := make([]int, 1000)
+ for j := range largeBatch {
+ largeBatch[j] = i*1000 + j
+ }
+ q.Enqueue(largeBatch)
+
+ // Let queue fill up a bit
+ time.Sleep(500 * time.Microsecond)
+
+ // Drain phase: speed up processing
+ processDelay.Store(false)
+ barrier := q.Barrier()
+ <-barrier
+
+ // Allow time for compaction to potentially occur
+ time.Sleep(100 * time.Microsecond)
+ }
+}