diff options
| -rw-r--r-- | derp/derp_server.go | 132 |
1 files changed, 121 insertions, 11 deletions
diff --git a/derp/derp_server.go b/derp/derp_server.go index 42855e0b3..3d91bffea 100644 --- a/derp/derp_server.go +++ b/derp/derp_server.go @@ -41,6 +41,7 @@ import ( "tailscale.com/client/tailscale" "tailscale.com/disco" "tailscale.com/metrics" + "tailscale.com/syncs" "tailscale.com/types/key" "tailscale.com/types/logger" "tailscale.com/version" @@ -129,6 +130,8 @@ type Server struct { removePktForwardOther expvar.Int avgQueueDuration *uint64 // In milliseconds; accessed atomically + avgFlowDuration *uint64 // In seconds; accessed atomically + // verifyClients only accepts client connections to the DERP server if the clientKey is a // known peer in the network, as specified by a running tailscaled's client's local api. verifyClients bool @@ -197,6 +200,7 @@ func NewServer(privateKey key.Private, logf logger.Logf) *Server { watchers: map[*sclient]bool{}, sentTo: map[key.Public]map[key.Public]int64{}, avgQueueDuration: new(uint64), + avgFlowDuration: new(uint64), keyOfAddr: map[netaddr.IPPort]key.Public{}, } s.initMetacert() @@ -514,6 +518,8 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connN // minute is less than the client's 2 minute // inactivity timeout. replaceLimiter: rate.NewLimiter(rate.Every(time.Minute), 100), + + flows: make(map[key.Public]*flow), } if c.canMesh { @@ -592,6 +598,37 @@ func (c *sclient) run(ctx context.Context) error { if err != nil { return err } + + c.SweepFlows() + } +} + +// SweepFlows deletes old flows from this client, removing any old items and +// updating avgFlowDuration. +func (c *sclient) SweepFlows() { + + // delete flows which are dirty, but only when a large number have accumulated + if atomic.LoadUint32(&c.dirtyFlows) > 5 { + total := 0 + totalFlowDurationSec := 0.0 + // TODO do we need to iterate through the entire flow set? + for k, f := range c.flows { + if f.idle.Get() { + delete(c.flows, k) + total++ + totalFlowDurationSec += f.closedAt.Sub(f.createdAt).Seconds() + } + } + atomic.AddUint32(&c.dirtyFlows, ^uint32(total-1)) + + avgFlowDurationSec := totalFlowDurationSec / float64(total) + for { + old := atomic.LoadUint64(c.s.avgFlowDuration) + newAvg := expMovingAverage(math.Float64frombits(old), avgFlowDurationSec, 0.1) + if atomic.CompareAndSwapUint64(c.s.avgFlowDuration, old, math.Float64bits(newAvg)) { + break + } + } } } @@ -704,20 +741,47 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error { } var fwd PacketForwarder - s.mu.Lock() - dst := s.clients[dstKey] - if dst == nil { - fwd = s.clientsMesh[dstKey] - } else { - s.notePeerSendLocked(c.key, dst) + var dst *sclient + + f, ok := c.flows[dstKey] + if ok { + if f.dst != nil { + select { + case <-f.dst.done: + default: + dst = f.dst + } + } + } else /* dst still is nil */ { + s.mu.Lock() + dst = s.clients[dstKey] + if dst == nil { + fwd = s.clientsMesh[dstKey] + } else { + s.notePeerSendLocked(c.key, dst) + } + s.mu.Unlock() + + if dst != nil { + f = &flow{ + c: c, + dst: dst, + idleTimer: time.AfterFunc(FlowTimerDuration, f.close), + createdAt: time.Now(), + } + c.flows[dstKey] = f + } + } + + if f != nil { + f.sawPacket(uint64(len(contents))) } - s.mu.Unlock() if dst == nil { if fwd != nil { s.packetsForwardedOut.Add(1) if err := fwd.ForwardPacket(c.key, dstKey, contents); err != nil { - // TODO: + // TODO: failed to forward packet when intended to send it. return nil } return nil @@ -726,12 +790,11 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error { return nil } - p := pkt{ + return c.sendPkt(dst, pkt{ bs: contents, enqueuedAt: time.Now(), src: c.key, - } - return c.sendPkt(dst, p) + }) } // dropReason is why we dropped a DERP frame. @@ -971,6 +1034,46 @@ func (s *Server) recvForwardPacket(br *bufio.Reader, frameLen uint32) (srcKey, d return srcKey, dstKey, contents, nil } +// FlowTimerDuration is the time which we consider a flow to be active. +// After this time, we consider it to be idle, and can never return to an active state. +var FlowTimerDuration = time.Minute + +type flow struct { + c *sclient // source of flow + dst *sclient // non-nil for local client; nil means use forwarder + + // Various stats: + pkts uint64 + bytes uint64 + + idleTimer *time.Timer + createdAt time.Time + closedAt time.Time + + // idle indicates this flow expired once, and should be cleaned up. + // Once set, it should always remain true. + idle syncs.AtomicBool +} + +func (f *flow) sawPacket(bytes uint64) { + if f.idle.Get() { + return + } + f.pkts += 1 + f.bytes += bytes + // do not care if f.close is called twice, as it's idempotent. + f.idleTimer.Reset(FlowTimerDuration) +} + +func (f *flow) close() { + if changed := f.idle.Swap(true); !changed { + return + } + f.closedAt = time.Now() + atomic.AddUint32(&f.c.dirtyFlows, 1) + // TODO(jknodt): log items to the server here? Or lazily log them when the flow is closed? +} + // sclient is a client connection to the server. // // (The "s" prefix is to more explicitly distinguish it from Client in derp_client.go) @@ -1001,6 +1104,10 @@ type sclient struct { connectedAt time.Time preferred bool + // dirtyFlows is the number of dirty flows owned by this client that needs to be cleaned up. + dirtyFlows uint32 + flows map[key.Public]*flow + // Owned by sender, not thread-safe. bw *bufio.Writer @@ -1417,6 +1524,9 @@ func (s *Server) ExpVar() expvar.Var { m.Set("average_queue_duration_ms", expvar.Func(func() interface{} { return math.Float64frombits(atomic.LoadUint64(s.avgQueueDuration)) })) + m.Set("average_flow_duration_sec", expvar.Func(func() interface{} { + return math.Float64frombits(atomic.LoadUint64(s.avgFlowDuration)) + })) var expvarVersion expvar.String expvarVersion.Set(version.Long) m.Set("version", &expvarVersion) |
