summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorBrad Fitzpatrick <bradfitz@tailscale.com>2021-07-16 11:53:19 -0700
committerBrad Fitzpatrick <bradfitz@tailscale.com>2021-07-16 11:53:19 -0700
commit2f7b567bfe1aef8e3c392c0e855a036dcb4e7d19 (patch)
treed9a9542fb86a1095db8d999817cdf6257cee9136
parent391207bbcf11171bd3fbf6a8667f94fea0b1e23f (diff)
downloadtailscale-bradfitz/derp_flow.tar.xz
tailscale-bradfitz/derp_flow.zip
derp: WIP notes on adding a flow typebradfitz/derp_flow
very rough, uncompiled.
-rw-r--r--derp/derp_server.go63
1 files changed, 57 insertions, 6 deletions
diff --git a/derp/derp_server.go b/derp/derp_server.go
index 32b09fb7b..75a10ab1c 100644
--- a/derp/derp_server.go
+++ b/derp/derp_server.go
@@ -490,6 +490,7 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connN
discoSendQueue: make(chan pkt, perClientSendQueueDepth),
peerGone: make(chan key.Public),
canMesh: clientInfo.MeshKey != "" && clientInfo.MeshKey == s.meshKey,
+ flows: make(map[key.Public]*flow),
}
if c.canMesh {
c.meshUpdate = make(chan struct{})
@@ -536,6 +537,8 @@ func (c *sclient) run(ctx context.Context) error {
}
return fmt.Errorf("client %x: readFrameHeader: %w", c.key, err)
}
+ // read c.crapIsDirty atomic bool, do some flow cleanup
+
switch ft {
case frameNotePreferred:
err = c.handleFrameNotePreferred(ft, fl)
@@ -665,14 +668,41 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
}
var fwd PacketForwarder
- s.mu.Lock()
- dst := s.clients[dstKey]
+ var dst *sclient
+
+ var f *flow
+ var ok bool
+ if f, ok = c.flows[dstKey]; ok {
+ if f.dst != nil {
+ select {
+ case <-f.dst.done:
+ flow.close()
+ default:
+ dst = f.dst
+ }
+ }
+ }
if dst == nil {
- fwd = s.clientsMesh[dstKey]
- } else {
- s.notePeerSendLocked(c.key, dst)
+ s.mu.Lock()
+ dst = s.clients[dstKey]
+ if dst == nil {
+ fwd = s.clientsMesh[dstKey]
+ } else {
+ s.notePeerSendLocked(c.key, dst)
+ }
+ s.mu.Unlock()
}
- s.mu.Unlock()
+ if f == nil {
+ f = &flow{
+ c: c,
+ dst: dst,
+ }
+ f.idleTimer = time.AfterFunc(f.onIdle, time.Minute)
+ c.flows[dstKey] = f
+ }
+ f.pkts++
+ f.bytes += int64(len(contents))
+ f.markActive()
if dst == nil {
if fwd != nil {
@@ -932,6 +962,26 @@ func (s *Server) recvForwardPacket(br *bufio.Reader, frameLen uint32) (srcKey, d
return srcKey, dstKey, contents, nil
}
+type flow struct {
+ c *sclient // source of flow
+ dst *sclient // non-nil for local client; nil means use forwarder
+
+ // Various stats:
+ pkts int64
+ bytes int64
+
+ lastLog time.Time // etc
+ idleTimer *time.Timer
+}
+
+func (f *flow) markActive() {
+ f.idleTimer.Reset(time.Minute)
+}
+
+func (f *flow) onIdle() {
+ panic("TODOXXX")
+}
+
// sclient is a client connection to the server.
//
// (The "s" prefix is to more explicitly distinguish it from Client in derp_client.go)
@@ -956,6 +1006,7 @@ type sclient struct {
br *bufio.Reader
connectedAt time.Time
preferred bool
+ flows map[key.Public]*flow
// Owned by sender, not thread-safe.
bw *bufio.Writer