summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorjulianknodt <julianknodt@gmail.com>2021-06-08 12:57:33 -0700
committerjulianknodt <julianknodt@gmail.com>2021-06-08 17:14:02 -0700
commit152707960eaeed3afcca1aacee458eb0f4d44d8a (patch)
treef5e6b532a7736777a68554e8d31f03b440df1b82
parent7423c72f8463625e6b2f1e7e2ceeb8a5d734436f (diff)
downloadtailscale-rec_in_use_after_5_sec.tar.xz
tailscale-rec_in_use_after_5_sec.zip
derp/: Switch approach for metricsrec_in_use_after_5_sec
Instead of explicitly encoding 5 and 10 seconds and checking for then, use a timer to keep track of how much total time has elapsed since a flow started, where a flow is defined for any period of time where every contiguous 3 minute window has at least 1 packet. Signed-off-by: julianknodt <julianknodt@gmail.com>
-rw-r--r--.gitignore1
-rw-r--r--derp/derp_server.go87
-rw-r--r--derp/derp_test.go18
3 files changed, 78 insertions, 28 deletions
diff --git a/.gitignore b/.gitignore
index 39a4e8702..2d067ea62 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,6 +5,7 @@
*.dll
*.so
*.dylib
+*.swp
cmd/tailscale/tailscale
cmd/tailscaled/tailscaled
diff --git a/derp/derp_server.go b/derp/derp_server.go
index 64828d58f..5a80c3018 100644
--- a/derp/derp_server.go
+++ b/derp/derp_server.go
@@ -20,6 +20,7 @@ import (
"io"
"io/ioutil"
"log"
+ "math"
"math/big"
"math/rand"
"os"
@@ -41,6 +42,9 @@ import (
var debug, _ = strconv.ParseBool(os.Getenv("DERP_DEBUG_LOGS"))
+// How long a flow is considered to be active for.
+var DerpFlowLogTime = 3 * time.Minute
+
// verboseDropKeys is the set of destination public keys that should
// verbosely log whenever DERP drops a packet.
var verboseDropKeys = map[key.Public]bool{}
@@ -120,8 +124,10 @@ type Server struct {
multiForwarderCreated expvar.Int
multiForwarderDeleted expvar.Int
removePktForwardOther expvar.Int
- clientsInUse5Sec expvar.Int // Number of clients using Derp after 5 seconds.
- clientsInUse10Sec expvar.Int
+
+ flow_mu sync.Mutex
+ activeFlows map[*sclient]flow
+ flowLogs metrics.LabelMap
mu sync.Mutex
closed bool
@@ -142,6 +148,17 @@ type Server struct {
sentTo map[key.Public]map[key.Public]int64 // src => dst => dst's latest sclient.connNum
}
+// Flow retains metrics about packets sent in serial.
+//
+type flow struct {
+ packetKinds struct {
+ disco expvar.Int
+ other expvar.Int
+ }
+ createdAt time.Time
+ timer *time.Timer
+}
+
// PacketForwarder is something that can forward packets.
//
// It's mostly an inteface for circular dependency reasons; the
@@ -184,6 +201,9 @@ func NewServer(privateKey key.Private, logf logger.Logf) *Server {
memSys0: ms.Sys,
watchers: map[*sclient]bool{},
sentTo: map[key.Public]map[key.Public]int64{},
+
+ activeFlows: map[*sclient]flow{},
+ flowLogs: metrics.LabelMap{Label: "minutes"},
}
s.initMetacert()
s.packetsRecvDisco = s.packetsRecvByKind.Get("disco")
@@ -459,7 +479,6 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connN
done: ctx.Done(),
remoteAddr: remoteAddr,
connectedAt: time.Now(),
- lastPktAt: time.Now(),
sendQueue: make(chan pkt, perClientSendQueueDepth),
peerGone: make(chan key.Public),
canMesh: clientInfo.MeshKey != "" && clientInfo.MeshKey == s.meshKey,
@@ -639,6 +658,8 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
if err != nil {
return fmt.Errorf("client %x: recvPacket: %v", c.key, err)
}
+ /// Do not need to block on updating metrics
+ go s.updateFlow(c, disco.LooksLikeDiscoWrapper(contents), nil)
var fwd PacketForwarder
s.mu.Lock()
@@ -666,7 +687,6 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
}
return nil
}
- c.markLastPktAt()
p := pkt{
bs: contents,
@@ -858,6 +878,46 @@ func (s *Server) recvPacket(br *bufio.Reader, frameLen uint32) (dstKey key.Publi
return dstKey, contents, nil
}
+// Updates an active flow for a given client given that we
+// saw a packet from them
+func (s *Server) updateFlow(c *sclient, isDiscoPacket bool, done chan<- bool) {
+ s.flow_mu.Lock()
+ defer s.flow_mu.Unlock()
+
+ flow, exists := s.activeFlows[c]
+ if isDiscoPacket {
+ flow.packetKinds.disco.Add(1)
+ } else {
+ flow.packetKinds.other.Add(1)
+ }
+
+ if !exists {
+ flow.createdAt = time.Now()
+ s.activeFlows[c] = flow
+ }
+
+ if flow.timer == nil {
+ flow.timer = time.AfterFunc(DerpFlowLogTime, func() {
+ s.flow_mu.Lock()
+ defer s.flow_mu.Unlock()
+ running_time := time.Since(flow.createdAt)
+ // report how many flows were alive for how many minutes
+ s.flowLogs.Get(fmt.Sprint(math.Ceil(running_time.Minutes()))).Add(1)
+ delete(s.activeFlows, c)
+ if done != nil {
+ done <- true
+ }
+ })
+ } else {
+ if !flow.timer.Reset(DerpFlowLogTime) {
+ // If the previous timer already ran, just stop timer and exit since it either removed it
+ // from the map or is waiting on the lock.
+ flow.timer.Stop()
+ }
+ }
+
+}
+
// zpub is the key.Public zero value.
var zpub key.Public
@@ -905,7 +965,6 @@ type sclient struct {
// Owned by run, not thread-safe.
br *bufio.Reader
connectedAt time.Time
- lastPktAt time.Time
preferred bool
// Owned by sender, not thread-safe.
@@ -1067,21 +1126,6 @@ func (c *sclient) sendPeerPresent(peer key.Public) error {
return err
}
-func (c *sclient) markLastPktAt() {
- old := c.lastPktAt
- curr := time.Now()
- c.lastPktAt = curr
- // If we've been connected for over 5 seconds and haven't previously
- since_old := old.Sub(c.connectedAt)
- since_now := curr.Sub(c.connectedAt)
- if since_old <= 5*time.Second && since_now > 5*time.Second {
- c.s.clientsInUse5Sec.Add(1)
- }
- if since_old <= 10*time.Second && since_now > 10*time.Second {
- c.s.clientsInUse10Sec.Add(1)
- }
-}
-
// sendMeshUpdates drains as many mesh peerStateChange entries as
// possible into the write buffer WITHOUT flushing or otherwise
// blocking (as it holds c.s.mu while working). If it can't drain them
@@ -1310,8 +1354,7 @@ func (s *Server) ExpVar() expvar.Var {
m.Set("multiforwarder_created", &s.multiForwarderCreated)
m.Set("multiforwarder_deleted", &s.multiForwarderDeleted)
m.Set("packet_forwarder_delete_other_value", &s.removePktForwardOther)
- m.Set("clients_inuse_after_5_sec", &s.clientsInUse5Sec)
- m.Set("clients_inuse_after_10_sec", &s.clientsInUse10Sec)
+ m.Set("live_flow_durations", &s.flowLogs)
var expvarVersion expvar.String
expvarVersion.Set(version.Long)
m.Set("version", &expvarVersion)
diff --git a/derp/derp_test.go b/derp/derp_test.go
index 1c4d186ff..a429186d3 100644
--- a/derp/derp_test.go
+++ b/derp/derp_test.go
@@ -28,6 +28,10 @@ import (
"tailscale.com/types/logger"
)
+func init() {
+ DerpFlowLogTime = time.Nanosecond
+}
+
func newPrivateKey(tb testing.TB) (k key.Private) {
tb.Helper()
if _, err := crand.Read(k[:]); err != nil {
@@ -774,7 +778,7 @@ func TestForwarderRegistration(t *testing.T) {
})
}
-func TestLastAliveCounter(t *testing.T) {
+func TestDerpFlowLogging(t *testing.T) {
ts := newTestServer(t)
defer ts.close(t)
wantCounter := func(c *expvar.Int, want int) {
@@ -783,14 +787,16 @@ func TestLastAliveCounter(t *testing.T) {
t.Errorf("counter = %v; want %v", got, want)
}
}
- wantCounter(&ts.s.clientsInUse5Sec, 0)
+ wantCounter(ts.s.flowLogs.Get("1"), 0)
tc0 := newRegularClient(t, ts, "c0")
- time.Sleep(6 * time.Second)
+ defer tc0.close(t)
+ time.Sleep(10 * time.Microsecond)
for _, sc := range ts.s.clients {
- sc.markLastPktAt()
+ done := make(chan bool, 1)
+ ts.s.updateFlow(sc, false, done)
+ <-done
}
- wantCounter(&ts.s.clientsInUse5Sec, 1)
- tc0.close(t)
+ wantCounter(ts.s.flowLogs.Get("1"), 1)
}
func TestMetaCert(t *testing.T) {