summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
author: Brad Fitzpatrick <bradfitz@tailscale.com>, 2021-07-16 11:53:19 -0700
committer: julianknodt <julianknodt@gmail.com>, 2021-07-27 15:17:56 -0700
commit: 0a317a963b3950d0bc91352648ed926202ba1e2f (patch)
tree: 4f5a275a20b5c42b35c565e5989b380b904edf8e
parent: 1b14e1d6bdca832c84bd01a7f17fb3a246c9176e (diff)
downloadtailscale-jknodt/derp_flow.tar.xz
tailscale-jknodt/derp_flow.zip
derp: add session flows (branch: jknodt/derp_flow)
A flow tracks exchanges from a source to a destination, for the purpose of better understanding who is using DERP. It currently tracks the number of packets sent, as well as the number of bytes sent in those packets. Signed-off-by: julianknodt <julianknodt@gmail.com>
-rw-r--r--derp/derp_server.go132
1 files changed, 121 insertions, 11 deletions
diff --git a/derp/derp_server.go b/derp/derp_server.go
index 42855e0b3..3d91bffea 100644
--- a/derp/derp_server.go
+++ b/derp/derp_server.go
@@ -41,6 +41,7 @@ import (
"tailscale.com/client/tailscale"
"tailscale.com/disco"
"tailscale.com/metrics"
+ "tailscale.com/syncs"
"tailscale.com/types/key"
"tailscale.com/types/logger"
"tailscale.com/version"
@@ -129,6 +130,8 @@ type Server struct {
removePktForwardOther expvar.Int
avgQueueDuration *uint64 // In milliseconds; accessed atomically
+ avgFlowDuration *uint64 // In seconds; accessed atomically
+
// verifyClients only accepts client connections to the DERP server if the clientKey is a
// known peer in the network, as specified by a running tailscaled's client's local api.
verifyClients bool
@@ -197,6 +200,7 @@ func NewServer(privateKey key.Private, logf logger.Logf) *Server {
watchers: map[*sclient]bool{},
sentTo: map[key.Public]map[key.Public]int64{},
avgQueueDuration: new(uint64),
+ avgFlowDuration: new(uint64),
keyOfAddr: map[netaddr.IPPort]key.Public{},
}
s.initMetacert()
@@ -514,6 +518,8 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connN
// minute is less than the client's 2 minute
// inactivity timeout.
replaceLimiter: rate.NewLimiter(rate.Every(time.Minute), 100),
+
+ flows: make(map[key.Public]*flow),
}
if c.canMesh {
@@ -592,6 +598,37 @@ func (c *sclient) run(ctx context.Context) error {
if err != nil {
return err
}
+
+ c.SweepFlows()
+ }
+}
+
+// SweepFlows removes idle flows from c.flows once more than 5 have been marked
+// dirty, and folds their mean lifetime (in seconds) into c.s.avgFlowDuration.
+func (c *sclient) SweepFlows() {
+
+ // Skip the sweep entirely until more than 5 dirty flows have accumulated.
+ if atomic.LoadUint32(&c.dirtyFlows) > 5 {
+ total := 0 // number of flows removed by this sweep
+ totalFlowDurationSec := 0.0 // sum of closedAt-createdAt over the removed flows, in seconds
+ // TODO do we need to iterate through the entire flow set?
+ for k, f := range c.flows {
+ if f.idle.Get() {
+ delete(c.flows, k)
+ total++
+ totalFlowDurationSec += f.closedAt.Sub(f.createdAt).Seconds()
+ }
+ }
+ atomic.AddUint32(&c.dirtyFlows, ^uint32(total-1)) // adding ^uint32(total-1) subtracts total from the counter
+
+ avgFlowDurationSec := totalFlowDurationSec / float64(total) // NOTE(review): this is NaN if total == 0 — confirm that cannot happen here
+ for {
+ old := atomic.LoadUint64(c.s.avgFlowDuration) // the average is stored as float64 bits
+ newAvg := expMovingAverage(math.Float64frombits(old), avgFlowDurationSec, 0.1)
+ if atomic.CompareAndSwapUint64(c.s.avgFlowDuration, old, math.Float64bits(newAvg)) { // on failure, loop and recompute from the current value
+ break
+ }
+ }
+ }
+}
@@ -704,20 +741,47 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
}
var fwd PacketForwarder
- s.mu.Lock()
- dst := s.clients[dstKey]
- if dst == nil {
- fwd = s.clientsMesh[dstKey]
- } else {
- s.notePeerSendLocked(c.key, dst)
+ var dst *sclient
+
+ f, ok := c.flows[dstKey]
+ if ok {
+ if f.dst != nil {
+ select {
+ case <-f.dst.done:
+ default:
+ dst = f.dst
+ }
+ }
+ } else /* dst still is nil */ {
+ s.mu.Lock()
+ dst = s.clients[dstKey]
+ if dst == nil {
+ fwd = s.clientsMesh[dstKey]
+ } else {
+ s.notePeerSendLocked(c.key, dst)
+ }
+ s.mu.Unlock()
+
+ if dst != nil {
+ f = &flow{
+ c: c,
+ dst: dst,
+ idleTimer: time.AfterFunc(FlowTimerDuration, f.close),
+ createdAt: time.Now(),
+ }
+ c.flows[dstKey] = f
+ }
+ }
+
+ if f != nil {
+ f.sawPacket(uint64(len(contents)))
}
- s.mu.Unlock()
if dst == nil {
if fwd != nil {
s.packetsForwardedOut.Add(1)
if err := fwd.ForwardPacket(c.key, dstKey, contents); err != nil {
- // TODO:
+ // TODO: failed to forward packet when intended to send it.
return nil
}
return nil
@@ -726,12 +790,11 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
return nil
}
- p := pkt{
+ return c.sendPkt(dst, pkt{
bs: contents,
enqueuedAt: time.Now(),
src: c.key,
- }
- return c.sendPkt(dst, p)
+ })
}
// dropReason is why we dropped a DERP frame.
@@ -971,6 +1034,46 @@ func (s *Server) recvForwardPacket(br *bufio.Reader, frameLen uint32) (srcKey, d
return srcKey, dstKey, contents, nil
}
+// FlowTimerDuration is how long a flow's idle timer runs; sawPacket restarts it on each packet.
+// When the timer fires the flow is closed and marked idle, after which sawPacket ignores it.
+var FlowTimerDuration = time.Minute
+
+type flow struct { // traffic record kept per destination key in the sending sclient's flows map
+ c *sclient // source of flow
+ dst *sclient // non-nil for local client; nil means use forwarder
+
+ // Various stats:
+ pkts uint64 // incremented by one per sawPacket call while the flow is not idle
+ bytes uint64 // running sum of the sizes passed to sawPacket while the flow is not idle
+
+ idleTimer *time.Timer // reset to FlowTimerDuration by sawPacket
+ createdAt time.Time // read by SweepFlows as the start of the flow's lifetime
+ closedAt time.Time // written by close; read by SweepFlows as the end of the flow's lifetime
+
+ // idle indicates this flow expired once, and should be cleaned up.
+ // Once set, it should always remain true.
+ idle syncs.AtomicBool
+}
+
+func (f *flow) sawPacket(bytes uint64) { // records one packet of the given size and restarts the idle timer
+ if f.idle.Get() {
+ return // an idle flow is no longer updated
+ }
+ f.pkts += 1
+ f.bytes += bytes
+ // Restart the idle timer. A duplicate f.close is assumed harmless, since close guards on the idle flag.
+ f.idleTimer.Reset(FlowTimerDuration)
+}
+
+func (f *flow) close() { // swaps the idle flag to true, then stamps closedAt and bumps the owner's dirtyFlows
+ if changed := f.idle.Swap(true); !changed { // NOTE(review): assumes Swap reports "changed", not the previous value — confirm against syncs.AtomicBool
+ return
+ }
+ f.closedAt = time.Now() // NOTE(review): idle is already true here; SweepFlows reads closedAt after seeing idle — confirm ordering is safe
+ atomic.AddUint32(&f.c.dirtyFlows, 1)
+ // TODO(jknodt): log items to the server here? Or lazily log them when the flow is closed?
+}
+
// sclient is a client connection to the server.
//
// (The "s" prefix is to more explicitly distinguish it from Client in derp_client.go)
@@ -1001,6 +1104,10 @@ type sclient struct {
connectedAt time.Time
preferred bool
+ // dirtyFlows is the number of dirty flows owned by this client that needs to be cleaned up.
+ dirtyFlows uint32
+ flows map[key.Public]*flow
+
// Owned by sender, not thread-safe.
bw *bufio.Writer
@@ -1417,6 +1524,9 @@ func (s *Server) ExpVar() expvar.Var {
m.Set("average_queue_duration_ms", expvar.Func(func() interface{} {
return math.Float64frombits(atomic.LoadUint64(s.avgQueueDuration))
}))
+ m.Set("average_flow_duration_sec", expvar.Func(func() interface{} {
+ return math.Float64frombits(atomic.LoadUint64(s.avgFlowDuration))
+ }))
var expvarVersion expvar.String
expvarVersion.Set(version.Long)
m.Set("version", &expvarVersion)