summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorDavid Anderson <dave@tailscale.com>2025-03-05 10:07:48 -0800
committerDavid Anderson <dave@tailscale.com>2025-03-05 10:07:59 -0800
commit43c9228f4a6750bc70d03d4af7130537171a81eb (patch)
tree52f7a93e3a15d56c47d5560d588abaeff9ad73f5
parent24d4846f007d34b160e2dba9fecf95a8357372d7 (diff)
downloadtailscale-push-otwrlsqunmon.tar.xz
tailscale-push-otwrlsqunmon.zip
WIP: internal debugging machinerypush-otwrlsqunmon
Signed-off-by: David Anderson <dave@tailscale.com>
-rw-r--r--util/eventbus/bus.go29
-rw-r--r--util/eventbus/debug.go90
-rw-r--r--util/eventbus/publish.go12
-rw-r--r--util/eventbus/subscribe.go18
4 files changed, 145 insertions, 4 deletions
diff --git a/util/eventbus/bus.go b/util/eventbus/bus.go
index 3520be828..ce888a46b 100644
--- a/util/eventbus/bus.go
+++ b/util/eventbus/bus.go
@@ -8,6 +8,7 @@ import (
"reflect"
"slices"
"sync"
+ "time"
"tailscale.com/util/set"
)
@@ -18,12 +19,14 @@ type Bus struct {
router *worker
write chan any
snapshot chan chan []any
+ debug hook[routedEvent]
topicsMu sync.Mutex // guards everything below.
topics map[reflect.Type][]*subscribeState
// Used for introspection/debugging only, not in the normal event
// publishing path.
+ debugMu sync.Mutex
clients set.Set[*Client]
}
@@ -53,8 +56,8 @@ func (b *Bus) Client(name string) *Client {
bus: b,
pub: set.Set[publisher]{},
}
- b.topicsMu.Lock()
- defer b.topicsMu.Unlock()
+ b.debugMu.Lock()
+ defer b.debugMu.Unlock()
b.clients.Add(ret)
return ret
}
@@ -68,9 +71,9 @@ func (b *Bus) Close() {
b.router.StopAndWait()
var clients set.Set[*Client]
- b.topicsMu.Lock()
+ b.debugMu.Lock()
clients, b.clients = b.clients, set.Set[*Client]{}
- b.topicsMu.Unlock()
+ b.debugMu.Unlock()
for c := range clients {
c.Close()
@@ -91,8 +94,26 @@ func (b *Bus) pump(ctx context.Context) {
// opportunistically accept more incoming events, if we have
// queue space for it.
for !vals.Empty() {
+ popped := time.Now()
val := vals.Peek()
dests := b.dest(reflect.ValueOf(val).Type())
+ routed := time.Now()
+
+ if !b.debug.active() {
+ subscribers := make([]*Client, len(dests))
+ for i := range len(dests) {
+ subscribers[i] = dests[i].client
+ }
+ b.debug.run(routedEvent{
+ Event: val,
+ From: nil, // TODO: publisher queue needs to be of publishedEvent
+ To: subscribers,
+ Published: time.Time{}, // TODO: same
+ ReachedRouter: popped,
+ DestinationsPicked: routed,
+ })
+ }
+
for _, d := range dests {
deliverOne:
for {
diff --git a/util/eventbus/debug.go b/util/eventbus/debug.go
new file mode 100644
index 000000000..9a7c2b643
--- /dev/null
+++ b/util/eventbus/debug.go
@@ -0,0 +1,90 @@
+package eventbus
+
+import (
+ "slices"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+type publishedEvent struct {
+ Event any
+ From *Client
+ Published time.Time
+}
+
+type routedEvent struct {
+ Event any
+ From *Client // publisher's name
+ To []*Client // target names
+
+ Published time.Time
+ ReachedRouter time.Time
+ DestinationsPicked time.Time
+}
+
+type subscribedEvent struct {
+ Event any
+ From *Client
+ To *Client
+
+ Published time.Time
+ ReachedRouter time.Time
+ DestinationsPicked time.Time
+ QueuedAtSubscriber time.Time
+ NextToDeliver time.Time
+}
+
+// A hook is a hook point to which functions can be attached. When
+// the hook is run, attached callbacks are invoked synchronously, in
+// the order they were added.
+type hook[T any] struct {
+ sync.Mutex
+ fns []hookFn[T]
+}
+
+// add registers fn to be called when the hook is run.
+//
+// Returns a cleanup function that unregisters fn when called.
+func (h *hook[T]) add(fn func(T)) (remove func()) {
+ id := hookID.Add(1)
+ h.Lock()
+ defer h.Unlock()
+ h.fns = append(h.fns, hookFn[T]{id, fn})
+ return func() { h.remove(id) }
+}
+
+// remove unregisters the hook function with the given ID.
+func (h *hook[T]) remove(id uint64) {
+ h.Lock()
+ defer h.Unlock()
+ h.fns = slices.DeleteFunc(h.fns, func(f hookFn[T]) bool { return f.ID == id })
+}
+
+// run calls all registered hooks functions with v.
+func (h *hook[T]) run(v T) {
+ h.Lock()
+ defer h.Unlock()
+ for _, f := range h.fns {
+ f.run(v)
+ }
+}
+
+// active reports whether any hook functions are registered. Hook call
+// sites can use this to skip doing work if nobody's listening.
+func (h *hook[T]) active() bool {
+ h.Lock()
+ defer h.Unlock()
+ return len(h.fns) > 0
+}
+
+var hookID atomic.Uint64
+
+// hookFn attaches a comparable ID to a hook function, so that hooks
+// can be found and deleted during cleanup.
+type hookFn[T any] struct {
+ ID uint64
+ Fn func(T)
+}
+
+func (h hookFn[T]) run(v T) { h.Fn(v) }
diff --git a/util/eventbus/publish.go b/util/eventbus/publish.go
index b2d0641d9..a8eefdb97 100644
--- a/util/eventbus/publish.go
+++ b/util/eventbus/publish.go
@@ -5,6 +5,7 @@ package eventbus
import (
"reflect"
+ "time"
)
// publisher is a uniformly typed wrapper around Publisher[T], so that
@@ -18,6 +19,7 @@ type publisher interface {
type Publisher[T any] struct {
client *Client
stop stopFlag
+ debug hook[publishedEvent]
}
func newPublisher[T any](c *Client) *Publisher[T] {
@@ -44,6 +46,8 @@ func (p *Publisher[T]) publishType() reflect.Type {
// Publish publishes event v on the bus.
func (p *Publisher[T]) Publish(v T) {
+ now := time.Now()
+
// Check for just a stopped publisher or bus before trying to
// write, so that once closed Publish consistently does nothing.
select {
@@ -52,6 +56,14 @@ func (p *Publisher[T]) Publish(v T) {
default:
}
+ if p.debug.active() {
+ p.debug.run(publishedEvent{
+ Event: v,
+ From: p.client,
+ Published: now,
+ })
+ }
+
select {
case p.client.publish() <- v:
case <-p.stop.Done():
diff --git a/util/eventbus/subscribe.go b/util/eventbus/subscribe.go
index 606410c8e..77ed194ff 100644
--- a/util/eventbus/subscribe.go
+++ b/util/eventbus/subscribe.go
@@ -8,11 +8,13 @@ import (
"fmt"
"reflect"
"sync"
+ "time"
)
// subscriber is a uniformly typed wrapper around Subscriber[T], so
// that debugging facilities can look at active subscribers.
type subscriber interface {
+ client() *Client
subscribeType() reflect.Type
// dispatch is a function that dispatches the head value in vals to
// a subscriber, while also handling stop and incoming queue write
@@ -38,6 +40,7 @@ type subscribeState struct {
dispatcher *worker
write chan any
snapshot chan chan []any
+ debug hook[subscribedEvent]
outputsMu sync.Mutex
outputs map[reflect.Type]subscriber
@@ -64,6 +67,7 @@ func (q *subscribeState) pump(ctx context.Context) {
}
for {
if !vals.Empty() {
+ popped := time.Now()
val := vals.Peek()
sub := q.subscriberFor(val)
if sub == nil {
@@ -71,6 +75,20 @@ func (q *subscribeState) pump(ctx context.Context) {
vals.Drop()
continue
}
+
+ if q.debug.active() {
+ q.debug.run(subscribedEvent{
+ Event: val,
+ From: nil, // TODO: plumb more
+ To: q.client,
+ Published: time.Time{}, // TODO: plumb
+ ReachedRouter: time.Time{},
+ DestinationsPicked: time.Time{},
+ QueuedAtSubscriber: time.Time{},
+ NextToDeliver: popped,
+ })
+ }
+
if !sub.dispatch(ctx, &vals, acceptCh) {
return
}