diff options
| author | Brad Fitzpatrick <bradfitz@tailscale.com> | 2021-07-16 11:53:19 -0700 |
|---|---|---|
| committer | julianknodt <julianknodt@gmail.com> | 2021-07-27 15:17:56 -0700 |
| commit | 0a317a963b3950d0bc91352648ed926202ba1e2f (patch) | |
| tree | 4f5a275a20b5c42b35c565e5989b380b904edf8e | |
| parent | 1b14e1d6bdca832c84bd01a7f17fb3a246c9176e (diff) | |
| download | tailscale-jknodt/derp_flow.tar.xz tailscale-jknodt/derp_flow.zip | |
derp: add session flows (jknodt/derp_flow)
A flow tracks exchanges from a source to a destination, in order to better understand
who is using DERP. It currently tracks the number of packets sent, as well as the number of
bytes sent in those packets.
Signed-off-by: julianknodt <julianknodt@gmail.com>
| -rw-r--r-- | derp/derp_server.go | 132 |
1 files changed, 121 insertions, 11 deletions
diff --git a/derp/derp_server.go b/derp/derp_server.go index 42855e0b3..3d91bffea 100644 --- a/derp/derp_server.go +++ b/derp/derp_server.go @@ -41,6 +41,7 @@ import ( "tailscale.com/client/tailscale" "tailscale.com/disco" "tailscale.com/metrics" + "tailscale.com/syncs" "tailscale.com/types/key" "tailscale.com/types/logger" "tailscale.com/version" @@ -129,6 +130,8 @@ type Server struct { removePktForwardOther expvar.Int avgQueueDuration *uint64 // In milliseconds; accessed atomically + avgFlowDuration *uint64 // In seconds; accessed atomically + // verifyClients only accepts client connections to the DERP server if the clientKey is a // known peer in the network, as specified by a running tailscaled's client's local api. verifyClients bool @@ -197,6 +200,7 @@ func NewServer(privateKey key.Private, logf logger.Logf) *Server { watchers: map[*sclient]bool{}, sentTo: map[key.Public]map[key.Public]int64{}, avgQueueDuration: new(uint64), + avgFlowDuration: new(uint64), keyOfAddr: map[netaddr.IPPort]key.Public{}, } s.initMetacert() @@ -514,6 +518,8 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connN // minute is less than the client's 2 minute // inactivity timeout. replaceLimiter: rate.NewLimiter(rate.Every(time.Minute), 100), + + flows: make(map[key.Public]*flow), } if c.canMesh { @@ -592,6 +598,37 @@ func (c *sclient) run(ctx context.Context) error { if err != nil { return err } + + c.SweepFlows() + } +} + +// SweepFlows deletes old flows from this client, removing any old items and +// updating avgFlowDuration. +func (c *sclient) SweepFlows() { + + // delete flows which are dirty, but only when a large number have accumulated + if atomic.LoadUint32(&c.dirtyFlows) > 5 { + total := 0 + totalFlowDurationSec := 0.0 + // TODO do we need to iterate through the entire flow set? 
+ for k, f := range c.flows { + if f.idle.Get() { + delete(c.flows, k) + total++ + totalFlowDurationSec += f.closedAt.Sub(f.createdAt).Seconds() + } + } + atomic.AddUint32(&c.dirtyFlows, ^uint32(total-1)) + + avgFlowDurationSec := totalFlowDurationSec / float64(total) + for { + old := atomic.LoadUint64(c.s.avgFlowDuration) + newAvg := expMovingAverage(math.Float64frombits(old), avgFlowDurationSec, 0.1) + if atomic.CompareAndSwapUint64(c.s.avgFlowDuration, old, math.Float64bits(newAvg)) { + break + } + } } } @@ -704,20 +741,47 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error { } var fwd PacketForwarder - s.mu.Lock() - dst := s.clients[dstKey] - if dst == nil { - fwd = s.clientsMesh[dstKey] - } else { - s.notePeerSendLocked(c.key, dst) + var dst *sclient + + f, ok := c.flows[dstKey] + if ok { + if f.dst != nil { + select { + case <-f.dst.done: + default: + dst = f.dst + } + } + } else /* dst still is nil */ { + s.mu.Lock() + dst = s.clients[dstKey] + if dst == nil { + fwd = s.clientsMesh[dstKey] + } else { + s.notePeerSendLocked(c.key, dst) + } + s.mu.Unlock() + + if dst != nil { + f = &flow{ + c: c, + dst: dst, + idleTimer: time.AfterFunc(FlowTimerDuration, f.close), + createdAt: time.Now(), + } + c.flows[dstKey] = f + } + } + + if f != nil { + f.sawPacket(uint64(len(contents))) } - s.mu.Unlock() if dst == nil { if fwd != nil { s.packetsForwardedOut.Add(1) if err := fwd.ForwardPacket(c.key, dstKey, contents); err != nil { - // TODO: + // TODO: failed to forward packet when intended to send it. return nil } return nil @@ -726,12 +790,11 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error { return nil } - p := pkt{ + return c.sendPkt(dst, pkt{ bs: contents, enqueuedAt: time.Now(), src: c.key, - } - return c.sendPkt(dst, p) + }) } // dropReason is why we dropped a DERP frame. 
@@ -971,6 +1034,46 @@ func (s *Server) recvForwardPacket(br *bufio.Reader, frameLen uint32) (srcKey, d return srcKey, dstKey, contents, nil } +// FlowTimerDuration is the time which we consider a flow to be active. +// After this time, we consider it to be idle, and can never return to an active state. +var FlowTimerDuration = time.Minute + +type flow struct { + c *sclient // source of flow + dst *sclient // non-nil for local client; nil means use forwarder + + // Various stats: + pkts uint64 + bytes uint64 + + idleTimer *time.Timer + createdAt time.Time + closedAt time.Time + + // idle indicates this flow expired once, and should be cleaned up. + // Once set, it should always remain true. + idle syncs.AtomicBool +} + +func (f *flow) sawPacket(bytes uint64) { + if f.idle.Get() { + return + } + f.pkts += 1 + f.bytes += bytes + // do not care if f.close is called twice, as it's idempotent. + f.idleTimer.Reset(FlowTimerDuration) +} + +func (f *flow) close() { + if changed := f.idle.Swap(true); !changed { + return + } + f.closedAt = time.Now() + atomic.AddUint32(&f.c.dirtyFlows, 1) + // TODO(jknodt): log items to the server here? Or lazily log them when the flow is closed? +} + // sclient is a client connection to the server. // // (The "s" prefix is to more explicitly distinguish it from Client in derp_client.go) @@ -1001,6 +1104,10 @@ type sclient struct { connectedAt time.Time preferred bool + // dirtyFlows is the number of dirty flows owned by this client that needs to be cleaned up. + dirtyFlows uint32 + flows map[key.Public]*flow + // Owned by sender, not thread-safe. 
bw *bufio.Writer @@ -1417,6 +1524,9 @@ func (s *Server) ExpVar() expvar.Var { m.Set("average_queue_duration_ms", expvar.Func(func() interface{} { return math.Float64frombits(atomic.LoadUint64(s.avgQueueDuration)) })) + m.Set("average_flow_duration_sec", expvar.Func(func() interface{} { + return math.Float64frombits(atomic.LoadUint64(s.avgFlowDuration)) + })) var expvarVersion expvar.String expvarVersion.Set(version.Long) m.Set("version", &expvarVersion) |
