summaryrefslogtreecommitdiffhomepage
path: root/util/eventbus
diff options
context:
space:
mode:
Diffstat (limited to 'util/eventbus')
-rw-r--r--util/eventbus/bus.go6
-rw-r--r--util/eventbus/debug.go4
-rw-r--r--util/eventbus/publish.go6
-rw-r--r--util/eventbus/subscribe.go10
4 files changed, 21 insertions, 5 deletions
diff --git a/util/eventbus/bus.go b/util/eventbus/bus.go
index 880e075cc..0a4674081 100644
--- a/util/eventbus/bus.go
+++ b/util/eventbus/bus.go
@@ -59,6 +59,10 @@ func NewWithOptions(opts BusOptions) *Bus {
clients: set.Set[*Client]{},
logf: opts.logger(),
}
+
+ syncs.RegisterMutex(&ret.topicsMu, "eventbus.Bus.topicsMu")
+ syncs.RegisterMutex(&ret.clientsMu, "eventbus.Bus.clientsMu")
+
ret.router = runWorker(ret.pump)
return ret
}
@@ -92,6 +96,8 @@ func (b *Bus) Client(name string) *Client {
bus: b,
pub: set.Set[publisher]{},
}
+ syncs.RegisterMutex(&ret.mu, "eventbus.Client.mu")
+ syncs.RegisterMutex(&ret.stop.mu, "eventbus.Client.stop.mu")
b.clientsMu.Lock()
defer b.clientsMu.Unlock()
b.clients.Add(ret)
diff --git a/util/eventbus/debug.go b/util/eventbus/debug.go
index 0453defb1..2f2c9589a 100644
--- a/util/eventbus/debug.go
+++ b/util/eventbus/debug.go
@@ -11,10 +11,10 @@ import (
"runtime"
"slices"
"strings"
+ "sync"
"sync/atomic"
"time"
- "tailscale.com/syncs"
"tailscale.com/types/logger"
)
@@ -147,7 +147,7 @@ func (d *Debugger) SubscribeTypes(client *Client) []reflect.Type {
// A hook collects hook functions that can be run as a group.
type hook[T any] struct {
- syncs.Mutex
+ sync.Mutex
fns []hookFn[T]
}
diff --git a/util/eventbus/publish.go b/util/eventbus/publish.go
index 348bb9dff..b35af28fd 100644
--- a/util/eventbus/publish.go
+++ b/util/eventbus/publish.go
@@ -5,6 +5,8 @@ package eventbus
import (
"reflect"
+
+ "tailscale.com/syncs"
)
// publisher is a uniformly typed wrapper around Publisher[T], so that
@@ -21,7 +23,9 @@ type Publisher[T any] struct {
}
func newPublisher[T any](c *Client) *Publisher[T] {
- return &Publisher[T]{client: c}
+ p := &Publisher[T]{client: c}
+ syncs.RegisterMutex(&p.stop.mu, "eventbus.Publisher.stop.mu")
+ return p
}
// Close closes the publisher.
diff --git a/util/eventbus/subscribe.go b/util/eventbus/subscribe.go
index b0348e125..24886ee08 100644
--- a/util/eventbus/subscribe.go
+++ b/util/eventbus/subscribe.go
@@ -62,6 +62,7 @@ func newSubscribeState(c *Client) *subscribeState {
snapshot: make(chan chan []DeliveredEvent),
outputs: map[reflect.Type]subscriber{},
}
+ syncs.RegisterMutex(&ret.outputsMu, "eventbus.subscribeState.outputsMu")
ret.dispatcher = runWorker(ret.pump)
return ret
}
@@ -194,18 +195,21 @@ type Subscriber[T any] struct {
func newSubscriber[T any](r *subscribeState, logf logger.Logf) *Subscriber[T] {
slow := time.NewTimer(0)
slow.Stop() // reset in dispatch
- return &Subscriber[T]{
+ s := &Subscriber[T]{
read: make(chan T),
unregister: func() { r.deleteSubscriber(reflect.TypeFor[T]()) },
logf: logf,
slow: slow,
}
+ syncs.RegisterMutex(&s.stop.mu, "eventbus.Subscriber.stop.mu")
+ return s
}
func newMonitor[T any](attach func(fn func(T)) (cancel func())) *Subscriber[T] {
ret := &Subscriber[T]{
read: make(chan T, 100), // arbitrary, large
}
+ syncs.RegisterMutex(&ret.stop.mu, "eventbus.Subscriber.stop.mu")
ret.unregister = attach(ret.monitor)
return ret
}
@@ -286,12 +290,14 @@ type SubscriberFunc[T any] struct {
func newSubscriberFunc[T any](r *subscribeState, f func(T), logf logger.Logf) *SubscriberFunc[T] {
slow := time.NewTimer(0)
slow.Stop() // reset in dispatch
- return &SubscriberFunc[T]{
+ s := &SubscriberFunc[T]{
read: f,
unregister: func() { r.deleteSubscriber(reflect.TypeFor[T]()) },
logf: logf,
slow: slow,
}
+ syncs.RegisterMutex(&s.stop.mu, "eventbus.Subscriber.stop.mu")
+ return s
}
// Close closes the SubscriberFunc, indicating the caller no longer wishes to