diff options
Diffstat (limited to 'util/eventbus')
| -rw-r--r-- | util/eventbus/bus.go | 6 | ||||
| -rw-r--r-- | util/eventbus/debug.go | 4 | ||||
| -rw-r--r-- | util/eventbus/publish.go | 6 | ||||
| -rw-r--r-- | util/eventbus/subscribe.go | 10 |
4 files changed, 21 insertions, 5 deletions
diff --git a/util/eventbus/bus.go b/util/eventbus/bus.go index 880e075cc..0a4674081 100644 --- a/util/eventbus/bus.go +++ b/util/eventbus/bus.go @@ -59,6 +59,10 @@ func NewWithOptions(opts BusOptions) *Bus { clients: set.Set[*Client]{}, logf: opts.logger(), } + + syncs.RegisterMutex(&ret.topicsMu, "eventbus.Bus.topicsMu") + syncs.RegisterMutex(&ret.clientsMu, "eventbus.Bus.clientsMu") + ret.router = runWorker(ret.pump) return ret } @@ -92,6 +96,8 @@ func (b *Bus) Client(name string) *Client { bus: b, pub: set.Set[publisher]{}, } + syncs.RegisterMutex(&ret.mu, "eventbus.Client.mu") + syncs.RegisterMutex(&ret.stop.mu, "eventbus.Client.stop.mu") b.clientsMu.Lock() defer b.clientsMu.Unlock() b.clients.Add(ret) diff --git a/util/eventbus/debug.go b/util/eventbus/debug.go index 0453defb1..2f2c9589a 100644 --- a/util/eventbus/debug.go +++ b/util/eventbus/debug.go @@ -11,10 +11,10 @@ import ( "runtime" "slices" "strings" + "sync" "sync/atomic" "time" - "tailscale.com/syncs" "tailscale.com/types/logger" ) @@ -147,7 +147,7 @@ func (d *Debugger) SubscribeTypes(client *Client) []reflect.Type { // A hook collects hook functions that can be run as a group. type hook[T any] struct { - syncs.Mutex + sync.Mutex fns []hookFn[T] } diff --git a/util/eventbus/publish.go b/util/eventbus/publish.go index 348bb9dff..b35af28fd 100644 --- a/util/eventbus/publish.go +++ b/util/eventbus/publish.go @@ -5,6 +5,8 @@ package eventbus import ( "reflect" + + "tailscale.com/syncs" ) // publisher is a uniformly typed wrapper around Publisher[T], so that @@ -21,7 +23,9 @@ type Publisher[T any] struct { } func newPublisher[T any](c *Client) *Publisher[T] { - return &Publisher[T]{client: c} + p := &Publisher[T]{client: c} + syncs.RegisterMutex(&p.stop.mu, "eventbus.Publisher.stop.mu") + return p } // Close closes the publisher. diff --git a/util/eventbus/subscribe.go b/util/eventbus/subscribe.go index b0348e125..24886ee08 100644 --- a/util/eventbus/subscribe.go +++ b/util/eventbus/subscribe.go @@ -62,6 +62,7 @@ func newSubscribeState(c *Client) *subscribeState { snapshot: make(chan chan []DeliveredEvent), outputs: map[reflect.Type]subscriber{}, } + syncs.RegisterMutex(&ret.outputsMu, "eventbus.subscribeState.outputsMu") ret.dispatcher = runWorker(ret.pump) return ret } @@ -194,18 +195,21 @@ type Subscriber[T any] struct { func newSubscriber[T any](r *subscribeState, logf logger.Logf) *Subscriber[T] { slow := time.NewTimer(0) slow.Stop() // reset in dispatch - return &Subscriber[T]{ + s := &Subscriber[T]{ read: make(chan T), unregister: func() { r.deleteSubscriber(reflect.TypeFor[T]()) }, logf: logf, slow: slow, } + syncs.RegisterMutex(&s.stop.mu, "eventbus.Subscriber.stop.mu") + return s } func newMonitor[T any](attach func(fn func(T)) (cancel func())) *Subscriber[T] { ret := &Subscriber[T]{ read: make(chan T, 100), // arbitrary, large } + syncs.RegisterMutex(&ret.stop.mu, "eventbus.Subscriber.stop.mu") ret.unregister = attach(ret.monitor) return ret } @@ -286,12 +290,14 @@ type SubscriberFunc[T any] struct { func newSubscriberFunc[T any](r *subscribeState, f func(T), logf logger.Logf) *SubscriberFunc[T] { slow := time.NewTimer(0) slow.Stop() // reset in dispatch - return &SubscriberFunc[T]{ + s := &SubscriberFunc[T]{ read: f, unregister: func() { r.deleteSubscriber(reflect.TypeFor[T]()) }, logf: logf, slow: slow, } + syncs.RegisterMutex(&s.stop.mu, "eventbus.Subscriber.stop.mu") + return s } // Close closes the SubscriberFunc, indicating the caller no longer wishes to |
