diff options
Diffstat (limited to 'util')
| -rw-r--r-- | util/eventbus/bus.go | 6 | ||||
| -rw-r--r-- | util/eventbus/debug.go | 4 | ||||
| -rw-r--r-- | util/eventbus/publish.go | 6 | ||||
| -rw-r--r-- | util/eventbus/subscribe.go | 10 | ||||
| -rw-r--r-- | util/execqueue/execqueue.go | 15 | ||||
| -rw-r--r-- | util/goroutines/tracker.go | 4 | ||||
| -rw-r--r-- | util/ringlog/ringlog.go | 4 | ||||
| -rw-r--r-- | util/syspolicy/rsop/change_callbacks.go | 3 | ||||
| -rw-r--r-- | util/syspolicy/rsop/resultant_policy.go | 2 | ||||
| -rw-r--r-- | util/syspolicy/rsop/rsop.go | 5 | ||||
| -rw-r--r-- | util/syspolicy/setting/setting.go | 5 |
11 files changed, 54 insertions, 10 deletions
diff --git a/util/eventbus/bus.go b/util/eventbus/bus.go index 880e075cc..0a4674081 100644 --- a/util/eventbus/bus.go +++ b/util/eventbus/bus.go @@ -59,6 +59,10 @@ func NewWithOptions(opts BusOptions) *Bus { clients: set.Set[*Client]{}, logf: opts.logger(), } + + syncs.RegisterMutex(&ret.topicsMu, "eventbus.Bus.topicsMu") + syncs.RegisterMutex(&ret.clientsMu, "eventbus.Bus.clientsMu") + ret.router = runWorker(ret.pump) return ret } @@ -92,6 +96,8 @@ func (b *Bus) Client(name string) *Client { bus: b, pub: set.Set[publisher]{}, } + syncs.RegisterMutex(&ret.mu, "eventbus.Client.mu") + syncs.RegisterMutex(&ret.stop.mu, "eventbus.Client.stop.mu") b.clientsMu.Lock() defer b.clientsMu.Unlock() b.clients.Add(ret) diff --git a/util/eventbus/debug.go b/util/eventbus/debug.go index 0453defb1..2f2c9589a 100644 --- a/util/eventbus/debug.go +++ b/util/eventbus/debug.go @@ -11,10 +11,10 @@ import ( "runtime" "slices" "strings" + "sync" "sync/atomic" "time" - "tailscale.com/syncs" "tailscale.com/types/logger" ) @@ -147,7 +147,7 @@ func (d *Debugger) SubscribeTypes(client *Client) []reflect.Type { // A hook collects hook functions that can be run as a group. type hook[T any] struct { - syncs.Mutex + sync.Mutex fns []hookFn[T] } diff --git a/util/eventbus/publish.go b/util/eventbus/publish.go index 348bb9dff..b35af28fd 100644 --- a/util/eventbus/publish.go +++ b/util/eventbus/publish.go @@ -5,6 +5,8 @@ package eventbus import ( "reflect" + + "tailscale.com/syncs" ) // publisher is a uniformly typed wrapper around Publisher[T], so that @@ -21,7 +23,9 @@ type Publisher[T any] struct { } func newPublisher[T any](c *Client) *Publisher[T] { - return &Publisher[T]{client: c} + p := &Publisher[T]{client: c} + syncs.RegisterMutex(&p.stop.mu, "eventbus.Publisher.stop.mu") + return p } // Close closes the publisher. diff --git a/util/eventbus/subscribe.go b/util/eventbus/subscribe.go index b0348e125..24886ee08 100644 --- a/util/eventbus/subscribe.go +++ b/util/eventbus/subscribe.go @@ -62,6 +62,7 @@ func newSubscribeState(c *Client) *subscribeState { snapshot: make(chan chan []DeliveredEvent), outputs: map[reflect.Type]subscriber{}, } + syncs.RegisterMutex(&ret.outputsMu, "eventbus.subscribeState.outputsMu") ret.dispatcher = runWorker(ret.pump) return ret } @@ -194,18 +195,21 @@ type Subscriber[T any] struct { func newSubscriber[T any](r *subscribeState, logf logger.Logf) *Subscriber[T] { slow := time.NewTimer(0) slow.Stop() // reset in dispatch - return &Subscriber[T]{ + s := &Subscriber[T]{ read: make(chan T), unregister: func() { r.deleteSubscriber(reflect.TypeFor[T]()) }, logf: logf, slow: slow, } + syncs.RegisterMutex(&s.stop.mu, "eventbus.Subscriber.stop.mu") + return s } func newMonitor[T any](attach func(fn func(T)) (cancel func())) *Subscriber[T] { ret := &Subscriber[T]{ read: make(chan T, 100), // arbitrary, large } + syncs.RegisterMutex(&ret.stop.mu, "eventbus.Subscriber.stop.mu") ret.unregister = attach(ret.monitor) return ret } @@ -286,12 +290,14 @@ type SubscriberFunc[T any] struct { func newSubscriberFunc[T any](r *subscribeState, f func(T), logf logger.Logf) *SubscriberFunc[T] { slow := time.NewTimer(0) slow.Stop() // reset in dispatch - return &SubscriberFunc[T]{ + s := &SubscriberFunc[T]{ read: f, unregister: func() { r.deleteSubscriber(reflect.TypeFor[T]()) }, logf: logf, slow: slow, } + syncs.RegisterMutex(&s.stop.mu, "eventbus.Subscriber.stop.mu") + return s } // Close closes the SubscriberFunc, indicating the caller no longer wishes to diff --git a/util/execqueue/execqueue.go b/util/execqueue/execqueue.go index 87616a6b5..acf25f645 100644 --- a/util/execqueue/execqueue.go +++ b/util/execqueue/execqueue.go @@ -7,11 +7,14 @@ package execqueue import ( "context" "errors" + "sync" "tailscale.com/syncs" ) type ExecQueue struct { + regMutexOnce sync.Once + mu syncs.Mutex ctx context.Context // context.Background + closed on Shutdown cancel context.CancelFunc // closes ctx @@ -21,7 +24,13 @@ type ExecQueue struct { queue []func() } +func (q *ExecQueue) registerMutex() { + syncs.RegisterMutex(&q.mu, "execqueue.ExecQueue.mu") +} + func (q *ExecQueue) Add(f func()) { + q.regMutexOnce.Do(q.registerMutex) + q.mu.Lock() defer q.mu.Unlock() if q.closed { @@ -39,6 +48,8 @@ func (q *ExecQueue) Add(f func()) { // RunSync waits for the queue to be drained and then synchronously runs f. // It returns an error if the queue is closed before f is run or ctx expires. func (q *ExecQueue) RunSync(ctx context.Context, f func()) error { + q.regMutexOnce.Do(q.registerMutex) + q.mu.Lock() q.initCtxLocked() shutdownCtx := q.ctx @@ -80,6 +91,8 @@ func (q *ExecQueue) run(f func()) { // Shutdown asynchronously signals the queue to stop. func (q *ExecQueue) Shutdown() { + q.regMutexOnce.Do(q.registerMutex) + q.mu.Lock() defer q.mu.Unlock() q.closed = true @@ -98,6 +111,8 @@ var errExecQueueShutdown = errors.New("execqueue shut down") // Wait waits for the queue to be empty or shut down. func (q *ExecQueue) Wait(ctx context.Context) error { + q.regMutexOnce.Do(q.registerMutex) + q.mu.Lock() q.initCtxLocked() waitCh := q.doneWaiter diff --git a/util/goroutines/tracker.go b/util/goroutines/tracker.go index c2a0cb8c3..044843d33 100644 --- a/util/goroutines/tracker.go +++ b/util/goroutines/tracker.go @@ -4,9 +4,9 @@ package goroutines import ( + "sync" "sync/atomic" - "tailscale.com/syncs" "tailscale.com/util/set" ) @@ -15,7 +15,7 @@ type Tracker struct { started atomic.Int64 // counter running atomic.Int64 // gauge - mu syncs.Mutex + mu sync.Mutex onDone set.HandleSet[func()] } diff --git a/util/ringlog/ringlog.go b/util/ringlog/ringlog.go index 62dfbae5b..781e8f5ca 100644 --- a/util/ringlog/ringlog.go +++ b/util/ringlog/ringlog.go @@ -8,9 +8,11 @@ import "tailscale.com/syncs" // New creates a new [RingLog] containing at most max items. func New[T any](max int) *RingLog[T] { - return &RingLog[T]{ + rl := &RingLog[T]{ max: max, } + syncs.RegisterMutex(&rl.mu, "ringlog.RingLog.mu") + return rl } // RingLog is a concurrency-safe fixed size log window containing entries of [T]. diff --git a/util/syspolicy/rsop/change_callbacks.go b/util/syspolicy/rsop/change_callbacks.go index 71135bb2a..fdf51c253 100644 --- a/util/syspolicy/rsop/change_callbacks.go +++ b/util/syspolicy/rsop/change_callbacks.go @@ -9,7 +9,6 @@ import ( "sync" "time" - "tailscale.com/syncs" "tailscale.com/util/set" "tailscale.com/util/syspolicy/internal/loggerx" "tailscale.com/util/syspolicy/pkey" @@ -71,7 +70,7 @@ func (c PolicyChange) HasChangedAnyOf(keys ...pkey.Key) bool { // policyChangeCallbacks are the callbacks to invoke when the effective policy changes. // It is safe for concurrent use. type policyChangeCallbacks struct { - mu syncs.Mutex + mu sync.Mutex cbs set.HandleSet[PolicyChangeCallback] } diff --git a/util/syspolicy/rsop/resultant_policy.go b/util/syspolicy/rsop/resultant_policy.go index bdda90976..67e13ab56 100644 --- a/util/syspolicy/rsop/resultant_policy.go +++ b/util/syspolicy/rsop/resultant_policy.go @@ -96,6 +96,8 @@ func newPolicy(scope setting.PolicyScope, sources ...*source.Source) (_ *Policy, closeCh: make(chan struct{}), doneCh: make(chan struct{}), } + syncs.RegisterMutex(&p.mu, "syspolicy/rsop.Policy.mu") + if _, err := p.reloadNow(false); err != nil { p.Close() return nil, err diff --git a/util/syspolicy/rsop/rsop.go b/util/syspolicy/rsop/rsop.go index 333dca643..d7e50a004 100644 --- a/util/syspolicy/rsop/rsop.go +++ b/util/syspolicy/rsop/rsop.go @@ -32,6 +32,11 @@ var ( effectivePolicyLRU [setting.NumScopes]syncs.AtomicValue[*Policy] ) +var _ = func() bool { + syncs.RegisterMutex(&policyMu, "syspolicy/rsop.policyMu") + return true +}() + // PolicyFor returns the [Policy] for the specified scope, // creating it from the registered [source.Store]s if it doesn't already exist. func PolicyFor(scope setting.PolicyScope) (*Policy, error) { diff --git a/util/syspolicy/setting/setting.go b/util/syspolicy/setting/setting.go index 97362b1dc..d0df2436c 100644 --- a/util/syspolicy/setting/setting.go +++ b/util/syspolicy/setting/setting.go @@ -220,6 +220,11 @@ var ( definitionsUsed bool ) +var _ = func() bool { + syncs.RegisterMutex(&definitionsMu, "syspolicy/setting.definitionsMu") + return true +}() + // Register registers a policy setting with the specified key, scope, value type, // and an optional list of supported platforms. All policy settings must be // registered before any of them can be used. Register panics if called after |
