diff options
| author | Jordan Whited <jordan@tailscale.com> | 2024-12-17 20:23:52 -0800 |
|---|---|---|
| committer | Jordan Whited <jordan@tailscale.com> | 2024-12-17 20:23:52 -0800 |
| commit | 9b9d74ccf5986ec111113ae6647759b3611454f9 (patch) | |
| tree | bbaff9616693a3545a80e81691a6315ccc2093d0 | |
| parent | 9d8c0c665e22c1adbf4b0e12b2d85cdb882d92ac (diff) | |
| download | tailscale-jwhited/qd-slice.tar.xz tailscale-jwhited/qd-slice.zip | |
prober: qd example using slicejwhited/qd-slice
Signed-off-by: Jordan Whited <jordan@tailscale.com>
| -rw-r--r-- | prober/derp.go | 51 |
1 files changed, 30 insertions, 21 deletions
diff --git a/prober/derp.go b/prober/derp.go index b3e6b28f9..379c8094c 100644 --- a/prober/derp.go +++ b/prober/derp.go @@ -38,7 +38,6 @@ import ( "tailscale.com/tailcfg" "tailscale.com/types/key" "tailscale.com/types/logger" - "tailscale.com/util/circularqueue" ) // derpProber dynamically manages several probes for each DERP server @@ -387,17 +386,13 @@ func derpProbeQueuingDelay(ctx context.Context, dm *tailcfg.DERPMap, from, to *t } func runDerpProbeQueuingDelayContinously(ctx context.Context, from, to *tailcfg.DERPNode, fromc, toc *derphttp.Client, packetsDropped *expvar.Float, qdh *queuingDelayHistogram) error { - // Circular buffer to hold packet send times. It is sized to hold timings - // for up to 5 seconds when sending packets at a rate of 10 per second. - // It assumes that packets may be dropped, but that they will generally - // arrive in order. Packets arriving out of order will result in older - // packets being ignored, effectively overcounting the number of dropped - // packets. - sentTimes := circularqueue.NewFIFO(50, func(t time.Time) { - // If a sent time is evicted, that means we'll never record a timing - // for this packet, so we considered it dropped. - packetsDropped.Add(1) - }) + type txRecord struct { + at time.Time + seq uint64 + } + // TODO: slice cap should be result of arithmetic involving tx Hz and slowest bucket bounds + txRecords := make([]txRecord, 0, 50) + var txRecordsMu sync.Mutex // Send the packets. sendErrC := make(chan error, 1) @@ -408,14 +403,20 @@ func runDerpProbeQueuingDelayContinously(ctx context.Context, from, to *tailcfg. t := time.NewTicker(time.Second / 10) // 10 packets per second defer t.Stop() - seq := 0 + seq := uint64(0) for { select { case <-ctx.Done(): return case <-t.C: - sentTimes.Push(time.Now()) - binary.BigEndian.PutUint64(pkt, uint64(seq)) + txRecordsMu.Lock() + if len(txRecords) == cap(txRecords) { + txRecords = append(txRecords[:0], txRecords[1:]...) + // TODO: increment drop counter + } + txRecords = append(txRecords, txRecord{time.Now(), seq}) + txRecordsMu.Unlock() + binary.BigEndian.PutUint64(pkt, seq) seq++ if err := fromc.Send(toc.SelfPublicKey(), pkt); err != nil { sendErrC <- fmt.Errorf("sending packet %w", err) @@ -443,13 +444,21 @@ func runDerpProbeQueuingDelayContinously(ctx context.Context, from, to *tailcfg. return } seq := binary.BigEndian.Uint64(v.Data) - sent := sentTimes.Pop(int(seq)) - if sent == nil { - // No sent time found, ignore - continue + txRecordsMu.Lock() + for _, record := range txRecords { + switch { + case record.seq == seq: + rtt := now.Sub(record.at) + qdh.add(rtt) // TODO: use prometheus native histo if possible + case record.seq > seq: + // No sent time found, probably a late arrival already + // recorded as drop by sender when deleted + break + case record.seq < seq: + continue + } } - qdh.add(now.Sub(*sent)) - + txRecordsMu.Unlock() case derp.KeepAliveMessage: // Silently ignore. default: |
