summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--derp/derp_server.go132
1 files changed, 121 insertions, 11 deletions
diff --git a/derp/derp_server.go b/derp/derp_server.go
index 42855e0b3..3d91bffea 100644
--- a/derp/derp_server.go
+++ b/derp/derp_server.go
@@ -41,6 +41,7 @@ import (
"tailscale.com/client/tailscale"
"tailscale.com/disco"
"tailscale.com/metrics"
+ "tailscale.com/syncs"
"tailscale.com/types/key"
"tailscale.com/types/logger"
"tailscale.com/version"
@@ -129,6 +130,8 @@ type Server struct {
removePktForwardOther expvar.Int
avgQueueDuration *uint64 // In milliseconds; accessed atomically
+ avgFlowDuration *uint64 // In seconds; accessed atomically
+
// verifyClients only accepts client connections to the DERP server if the clientKey is a
// known peer in the network, as specified by a running tailscaled's client's local api.
verifyClients bool
@@ -197,6 +200,7 @@ func NewServer(privateKey key.Private, logf logger.Logf) *Server {
watchers: map[*sclient]bool{},
sentTo: map[key.Public]map[key.Public]int64{},
avgQueueDuration: new(uint64),
+ avgFlowDuration: new(uint64),
keyOfAddr: map[netaddr.IPPort]key.Public{},
}
s.initMetacert()
@@ -514,6 +518,8 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connN
// minute is less than the client's 2 minute
// inactivity timeout.
replaceLimiter: rate.NewLimiter(rate.Every(time.Minute), 100),
+
+ flows: make(map[key.Public]*flow),
}
if c.canMesh {
@@ -592,6 +598,37 @@ func (c *sclient) run(ctx context.Context) error {
if err != nil {
return err
}
+
+ c.SweepFlows()
+ }
+}
+
+// SweepFlows removes idle flows from this client's flow map and folds their
+// average lifetime into the server-wide avgFlowDuration moving average.
+//
+// The sweep only runs once more than five dirty flows have accumulated, so
+// most calls return after a single atomic load.
+func (c *sclient) SweepFlows() {
+	// Delete flows which are dirty, but only when a large number have accumulated.
+	if atomic.LoadUint32(&c.dirtyFlows) <= 5 {
+		return
+	}
+
+	total := 0
+	totalFlowDurationSec := 0.0
+	// TODO do we need to iterate through the entire flow set?
+	for k, f := range c.flows {
+		if f.idle.Get() {
+			delete(c.flows, k)
+			total++
+			totalFlowDurationSec += f.closedAt.Sub(f.createdAt).Seconds()
+		}
+	}
+	if total == 0 {
+		// Nothing was removed, so there is no sample to record. Dividing by
+		// zero below would yield NaN, which would presumably poison the
+		// moving average from then on.
+		return
+	}
+
+	// Adding ^uint32(total-1) subtracts total from the unsigned counter.
+	atomic.AddUint32(&c.dirtyFlows, ^uint32(total-1))
+
+	avgFlowDurationSec := totalFlowDurationSec / float64(total)
+	// avgFlowDuration stores float64 bits in a uint64; retry the
+	// compare-and-swap until no other writer updated it in between.
+	for {
+		old := atomic.LoadUint64(c.s.avgFlowDuration)
+		newAvg := expMovingAverage(math.Float64frombits(old), avgFlowDurationSec, 0.1)
+		if atomic.CompareAndSwapUint64(c.s.avgFlowDuration, old, math.Float64bits(newAvg)) {
+			break
+		}
+	}
+}
@@ -704,20 +741,47 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
}
var fwd PacketForwarder
- s.mu.Lock()
- dst := s.clients[dstKey]
- if dst == nil {
- fwd = s.clientsMesh[dstKey]
- } else {
- s.notePeerSendLocked(c.key, dst)
+ var dst *sclient
+
+ f, ok := c.flows[dstKey]
+ if ok {
+ if f.dst != nil {
+ select {
+ case <-f.dst.done:
+ default:
+ dst = f.dst
+ }
+ }
+ } else /* dst still is nil */ {
+ s.mu.Lock()
+ dst = s.clients[dstKey]
+ if dst == nil {
+ fwd = s.clientsMesh[dstKey]
+ } else {
+ s.notePeerSendLocked(c.key, dst)
+ }
+ s.mu.Unlock()
+
+ if dst != nil {
+ f = &flow{
+ c: c,
+ dst: dst,
+ idleTimer: time.AfterFunc(FlowTimerDuration, f.close),
+ createdAt: time.Now(),
+ }
+ c.flows[dstKey] = f
+ }
+ }
+
+ if f != nil {
+ f.sawPacket(uint64(len(contents)))
}
- s.mu.Unlock()
if dst == nil {
if fwd != nil {
s.packetsForwardedOut.Add(1)
if err := fwd.ForwardPacket(c.key, dstKey, contents); err != nil {
- // TODO:
+ // TODO: failed to forward packet when intended to send it.
return nil
}
return nil
@@ -726,12 +790,11 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
return nil
}
- p := pkt{
+ return c.sendPkt(dst, pkt{
bs: contents,
enqueuedAt: time.Now(),
src: c.key,
- }
- return c.sendPkt(dst, p)
+ })
}
// dropReason is why we dropped a DERP frame.
@@ -971,6 +1034,46 @@ func (s *Server) recvForwardPacket(br *bufio.Reader, frameLen uint32) (srcKey, d
return srcKey, dstKey, contents, nil
}
+// FlowTimerDuration is how long a flow may go without a packet before its idle
+// timer fires and the flow is marked idle. Each packet seen on an active flow
+// resets the timer; once a flow is marked idle it never returns to the active state.
+var FlowTimerDuration = time.Minute
+
+// flow tracks the traffic a source client sends toward one destination, along
+// with the timer that marks the flow idle once packets stop arriving.
+type flow struct {
+ c *sclient // source of flow
+ dst *sclient // non-nil for local client; nil means use forwarder
+
+ // Various stats:
+ // pkts and bytes are advanced by sawPacket while the flow is not idle.
+ pkts uint64
+ bytes uint64
+
+ // idleTimer is reset to FlowTimerDuration by sawPacket.
+ // createdAt is stamped when the flow is built; closedAt is stamped by close.
+ idleTimer *time.Timer
+ createdAt time.Time
+ closedAt time.Time
+
+ // idle indicates this flow expired once, and should be cleaned up.
+ // Once set, it should always remain true.
+ idle syncs.AtomicBool
+}
+
+// sawPacket records one packet of the given size against the flow and pushes
+// its idle deadline out by FlowTimerDuration. A flow that has already been
+// marked idle is left untouched.
+func (f *flow) sawPacket(bytes uint64) {
+	if !f.idle.Get() {
+		f.bytes += bytes
+		f.pkts++
+		// The timer may fire around the time it is reset, so f.close can run
+		// more than once; that is fine because close is idempotent.
+		f.idleTimer.Reset(FlowTimerDuration)
+	}
+}
+
+// close marks the flow idle, stamps closedAt, and increments the owning
+// client's dirtyFlows counter. Only the call that actually flips idle does
+// this bookkeeping, so repeated calls are harmless.
+//
+// This assumes idle.Swap reports whether the value changed — confirm against
+// syncs.AtomicBool.
+// NOTE(review): the flow creation site passes f.close to time.AfterFunc before
+// f is assigned, so the receiver may be nil when the timer fires — confirm.
+func (f *flow) close() {
+	if flipped := f.idle.Swap(true); flipped {
+		f.closedAt = time.Now()
+		atomic.AddUint32(&f.c.dirtyFlows, 1)
+		// TODO(jknodt): log items to the server here? Or lazily log them when the flow is closed?
+	}
+}
+
// sclient is a client connection to the server.
//
// (The "s" prefix is to more explicitly distinguish it from Client in derp_client.go)
@@ -1001,6 +1104,10 @@ type sclient struct {
connectedAt time.Time
preferred bool
+ // dirtyFlows is the number of dirty flows owned by this client that needs to be cleaned up.
+ dirtyFlows uint32
+ flows map[key.Public]*flow
+
// Owned by sender, not thread-safe.
bw *bufio.Writer
@@ -1417,6 +1524,9 @@ func (s *Server) ExpVar() expvar.Var {
m.Set("average_queue_duration_ms", expvar.Func(func() interface{} {
return math.Float64frombits(atomic.LoadUint64(s.avgQueueDuration))
}))
+ m.Set("average_flow_duration_sec", expvar.Func(func() interface{} {
+ return math.Float64frombits(atomic.LoadUint64(s.avgFlowDuration))
+ }))
var expvarVersion expvar.String
expvarVersion.Set(version.Long)
m.Set("version", &expvarVersion)