summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorJordan Whited <jordan@tailscale.com>2024-12-17 20:23:52 -0800
committerJordan Whited <jordan@tailscale.com>2024-12-17 20:23:52 -0800
commit9b9d74ccf5986ec111113ae6647759b3611454f9 (patch)
treebbaff9616693a3545a80e81691a6315ccc2093d0
parent9d8c0c665e22c1adbf4b0e12b2d85cdb882d92ac (diff)
downloadtailscale-jwhited/qd-slice.tar.xz
tailscale-jwhited/qd-slice.zip
prober: qd example using slicejwhited/qd-slice
Signed-off-by: Jordan Whited <jordan@tailscale.com>
-rw-r--r--prober/derp.go51
1 files changed, 30 insertions, 21 deletions
diff --git a/prober/derp.go b/prober/derp.go
index b3e6b28f9..379c8094c 100644
--- a/prober/derp.go
+++ b/prober/derp.go
@@ -38,7 +38,6 @@ import (
"tailscale.com/tailcfg"
"tailscale.com/types/key"
"tailscale.com/types/logger"
- "tailscale.com/util/circularqueue"
)
// derpProber dynamically manages several probes for each DERP server
@@ -387,17 +386,13 @@ func derpProbeQueuingDelay(ctx context.Context, dm *tailcfg.DERPMap, from, to *t
}
func runDerpProbeQueuingDelayContinously(ctx context.Context, from, to *tailcfg.DERPNode, fromc, toc *derphttp.Client, packetsDropped *expvar.Float, qdh *queuingDelayHistogram) error {
- // Circular buffer to hold packet send times. It is sized to hold timings
- // for up to 5 seconds when sending packets at a rate of 10 per second.
- // It assumes that packets may be dropped, but that they will generally
- // arrive in order. Packets arriving out of order will result in older
- // packets being ignored, effectively overcounting the number of dropped
- // packets.
- sentTimes := circularqueue.NewFIFO(50, func(t time.Time) {
- // If a sent time is evicted, that means we'll never record a timing
- // for this packet, so we considered it dropped.
- packetsDropped.Add(1)
- })
+ type txRecord struct {
+ at time.Time
+ seq uint64
+ }
+ // TODO: slice cap should be result of arithmetic involving tx Hz and slowest bucket bounds
+ txRecords := make([]txRecord, 0, 50)
+ var txRecordsMu sync.Mutex
// Send the packets.
sendErrC := make(chan error, 1)
@@ -408,14 +403,20 @@ func runDerpProbeQueuingDelayContinously(ctx context.Context, from, to *tailcfg.
t := time.NewTicker(time.Second / 10) // 10 packets per second
defer t.Stop()
- seq := 0
+ seq := uint64(0)
for {
select {
case <-ctx.Done():
return
case <-t.C:
- sentTimes.Push(time.Now())
- binary.BigEndian.PutUint64(pkt, uint64(seq))
+ txRecordsMu.Lock()
+ if len(txRecords) == cap(txRecords) {
+ txRecords = append(txRecords[:0], txRecords[1:]...)
+ // TODO: increment drop counter
+ }
+ txRecords = append(txRecords, txRecord{time.Now(), seq})
+ txRecordsMu.Unlock()
+ binary.BigEndian.PutUint64(pkt, seq)
seq++
if err := fromc.Send(toc.SelfPublicKey(), pkt); err != nil {
sendErrC <- fmt.Errorf("sending packet %w", err)
@@ -443,13 +444,21 @@ func runDerpProbeQueuingDelayContinously(ctx context.Context, from, to *tailcfg.
return
}
seq := binary.BigEndian.Uint64(v.Data)
- sent := sentTimes.Pop(int(seq))
- if sent == nil {
- // No sent time found, ignore
- continue
+ txRecordsMu.Lock()
+ for _, record := range txRecords {
+ switch {
+ case record.seq == seq:
+ rtt := now.Sub(record.at)
+ qdh.add(rtt) // TODO: use prometheus native histo if possible
+ case record.seq > seq:
+ // No sent time found, probably a late arrival already
+ // recorded as drop by sender when deleted
+ break
+ case record.seq < seq:
+ continue
+ }
}
- qdh.add(now.Sub(*sent))
-
+ txRecordsMu.Unlock()
case derp.KeepAliveMessage:
// Silently ignore.
default: