summaryrefslogtreecommitdiffhomepage
path: root/util
diff options
context:
space:
mode:
Diffstat (limited to 'util')
-rw-r--r--util/eventbus/bus.go6
-rw-r--r--util/eventbus/debug.go4
-rw-r--r--util/eventbus/publish.go6
-rw-r--r--util/eventbus/subscribe.go10
-rw-r--r--util/execqueue/execqueue.go15
-rw-r--r--util/goroutines/tracker.go4
-rw-r--r--util/ringlog/ringlog.go4
-rw-r--r--util/syspolicy/rsop/change_callbacks.go3
-rw-r--r--util/syspolicy/rsop/resultant_policy.go2
-rw-r--r--util/syspolicy/rsop/rsop.go5
-rw-r--r--util/syspolicy/setting/setting.go5
11 files changed, 54 insertions, 10 deletions
diff --git a/util/eventbus/bus.go b/util/eventbus/bus.go
index 880e075cc..0a4674081 100644
--- a/util/eventbus/bus.go
+++ b/util/eventbus/bus.go
@@ -59,6 +59,10 @@ func NewWithOptions(opts BusOptions) *Bus {
clients: set.Set[*Client]{},
logf: opts.logger(),
}
+
+ syncs.RegisterMutex(&ret.topicsMu, "eventbus.Bus.topicsMu")
+ syncs.RegisterMutex(&ret.clientsMu, "eventbus.Bus.clientsMu")
+
ret.router = runWorker(ret.pump)
return ret
}
@@ -92,6 +96,8 @@ func (b *Bus) Client(name string) *Client {
bus: b,
pub: set.Set[publisher]{},
}
+ syncs.RegisterMutex(&ret.mu, "eventbus.Client.mu")
+ syncs.RegisterMutex(&ret.stop.mu, "eventbus.Client.stop.mu")
b.clientsMu.Lock()
defer b.clientsMu.Unlock()
b.clients.Add(ret)
diff --git a/util/eventbus/debug.go b/util/eventbus/debug.go
index 0453defb1..2f2c9589a 100644
--- a/util/eventbus/debug.go
+++ b/util/eventbus/debug.go
@@ -11,10 +11,10 @@ import (
"runtime"
"slices"
"strings"
+ "sync"
"sync/atomic"
"time"
- "tailscale.com/syncs"
"tailscale.com/types/logger"
)
@@ -147,7 +147,7 @@ func (d *Debugger) SubscribeTypes(client *Client) []reflect.Type {
// A hook collects hook functions that can be run as a group.
type hook[T any] struct {
- syncs.Mutex
+ sync.Mutex
fns []hookFn[T]
}
diff --git a/util/eventbus/publish.go b/util/eventbus/publish.go
index 348bb9dff..b35af28fd 100644
--- a/util/eventbus/publish.go
+++ b/util/eventbus/publish.go
@@ -5,6 +5,8 @@ package eventbus
import (
"reflect"
+
+ "tailscale.com/syncs"
)
// publisher is a uniformly typed wrapper around Publisher[T], so that
@@ -21,7 +23,9 @@ type Publisher[T any] struct {
}
func newPublisher[T any](c *Client) *Publisher[T] {
- return &Publisher[T]{client: c}
+ p := &Publisher[T]{client: c}
+ syncs.RegisterMutex(&p.stop.mu, "eventbus.Publisher.stop.mu")
+ return p
}
// Close closes the publisher.
diff --git a/util/eventbus/subscribe.go b/util/eventbus/subscribe.go
index b0348e125..24886ee08 100644
--- a/util/eventbus/subscribe.go
+++ b/util/eventbus/subscribe.go
@@ -62,6 +62,7 @@ func newSubscribeState(c *Client) *subscribeState {
snapshot: make(chan chan []DeliveredEvent),
outputs: map[reflect.Type]subscriber{},
}
+ syncs.RegisterMutex(&ret.outputsMu, "eventbus.subscribeState.outputsMu")
ret.dispatcher = runWorker(ret.pump)
return ret
}
@@ -194,18 +195,21 @@ type Subscriber[T any] struct {
func newSubscriber[T any](r *subscribeState, logf logger.Logf) *Subscriber[T] {
slow := time.NewTimer(0)
slow.Stop() // reset in dispatch
- return &Subscriber[T]{
+ s := &Subscriber[T]{
read: make(chan T),
unregister: func() { r.deleteSubscriber(reflect.TypeFor[T]()) },
logf: logf,
slow: slow,
}
+ syncs.RegisterMutex(&s.stop.mu, "eventbus.Subscriber.stop.mu")
+ return s
}
func newMonitor[T any](attach func(fn func(T)) (cancel func())) *Subscriber[T] {
ret := &Subscriber[T]{
read: make(chan T, 100), // arbitrary, large
}
+ syncs.RegisterMutex(&ret.stop.mu, "eventbus.Subscriber.stop.mu")
ret.unregister = attach(ret.monitor)
return ret
}
@@ -286,12 +290,14 @@ type SubscriberFunc[T any] struct {
func newSubscriberFunc[T any](r *subscribeState, f func(T), logf logger.Logf) *SubscriberFunc[T] {
slow := time.NewTimer(0)
slow.Stop() // reset in dispatch
- return &SubscriberFunc[T]{
+ s := &SubscriberFunc[T]{
read: f,
unregister: func() { r.deleteSubscriber(reflect.TypeFor[T]()) },
logf: logf,
slow: slow,
}
+ syncs.RegisterMutex(&s.stop.mu, "eventbus.Subscriber.stop.mu")
+ return s
}
// Close closes the SubscriberFunc, indicating the caller no longer wishes to
diff --git a/util/execqueue/execqueue.go b/util/execqueue/execqueue.go
index 87616a6b5..acf25f645 100644
--- a/util/execqueue/execqueue.go
+++ b/util/execqueue/execqueue.go
@@ -7,11 +7,14 @@ package execqueue
import (
"context"
"errors"
+ "sync"
"tailscale.com/syncs"
)
type ExecQueue struct {
+ regMutexOnce sync.Once
+
mu syncs.Mutex
ctx context.Context // context.Background + closed on Shutdown
cancel context.CancelFunc // closes ctx
@@ -21,7 +24,13 @@ type ExecQueue struct {
queue []func()
}
+func (q *ExecQueue) registerMutex() {
+ syncs.RegisterMutex(&q.mu, "execqueue.ExecQueue.mu")
+}
+
func (q *ExecQueue) Add(f func()) {
+ q.regMutexOnce.Do(q.registerMutex)
+
q.mu.Lock()
defer q.mu.Unlock()
if q.closed {
@@ -39,6 +48,8 @@ func (q *ExecQueue) Add(f func()) {
// RunSync waits for the queue to be drained and then synchronously runs f.
// It returns an error if the queue is closed before f is run or ctx expires.
func (q *ExecQueue) RunSync(ctx context.Context, f func()) error {
+ q.regMutexOnce.Do(q.registerMutex)
+
q.mu.Lock()
q.initCtxLocked()
shutdownCtx := q.ctx
@@ -80,6 +91,8 @@ func (q *ExecQueue) run(f func()) {
// Shutdown asynchronously signals the queue to stop.
func (q *ExecQueue) Shutdown() {
+ q.regMutexOnce.Do(q.registerMutex)
+
q.mu.Lock()
defer q.mu.Unlock()
q.closed = true
@@ -98,6 +111,8 @@ var errExecQueueShutdown = errors.New("execqueue shut down")
// Wait waits for the queue to be empty or shut down.
func (q *ExecQueue) Wait(ctx context.Context) error {
+ q.regMutexOnce.Do(q.registerMutex)
+
q.mu.Lock()
q.initCtxLocked()
waitCh := q.doneWaiter
diff --git a/util/goroutines/tracker.go b/util/goroutines/tracker.go
index c2a0cb8c3..044843d33 100644
--- a/util/goroutines/tracker.go
+++ b/util/goroutines/tracker.go
@@ -4,9 +4,9 @@
package goroutines
import (
+ "sync"
"sync/atomic"
- "tailscale.com/syncs"
"tailscale.com/util/set"
)
@@ -15,7 +15,7 @@ type Tracker struct {
started atomic.Int64 // counter
running atomic.Int64 // gauge
- mu syncs.Mutex
+ mu sync.Mutex
onDone set.HandleSet[func()]
}
diff --git a/util/ringlog/ringlog.go b/util/ringlog/ringlog.go
index 62dfbae5b..781e8f5ca 100644
--- a/util/ringlog/ringlog.go
+++ b/util/ringlog/ringlog.go
@@ -8,9 +8,11 @@ import "tailscale.com/syncs"
// New creates a new [RingLog] containing at most max items.
func New[T any](max int) *RingLog[T] {
- return &RingLog[T]{
+ rl := &RingLog[T]{
max: max,
}
+ syncs.RegisterMutex(&rl.mu, "ringlog.RingLog.mu")
+ return rl
}
// RingLog is a concurrency-safe fixed size log window containing entries of [T].
diff --git a/util/syspolicy/rsop/change_callbacks.go b/util/syspolicy/rsop/change_callbacks.go
index 71135bb2a..fdf51c253 100644
--- a/util/syspolicy/rsop/change_callbacks.go
+++ b/util/syspolicy/rsop/change_callbacks.go
@@ -9,7 +9,6 @@ import (
"sync"
"time"
- "tailscale.com/syncs"
"tailscale.com/util/set"
"tailscale.com/util/syspolicy/internal/loggerx"
"tailscale.com/util/syspolicy/pkey"
@@ -71,7 +70,7 @@ func (c PolicyChange) HasChangedAnyOf(keys ...pkey.Key) bool {
// policyChangeCallbacks are the callbacks to invoke when the effective policy changes.
// It is safe for concurrent use.
type policyChangeCallbacks struct {
- mu syncs.Mutex
+ mu sync.Mutex
cbs set.HandleSet[PolicyChangeCallback]
}
diff --git a/util/syspolicy/rsop/resultant_policy.go b/util/syspolicy/rsop/resultant_policy.go
index bdda90976..67e13ab56 100644
--- a/util/syspolicy/rsop/resultant_policy.go
+++ b/util/syspolicy/rsop/resultant_policy.go
@@ -96,6 +96,8 @@ func newPolicy(scope setting.PolicyScope, sources ...*source.Source) (_ *Policy,
closeCh: make(chan struct{}),
doneCh: make(chan struct{}),
}
+ syncs.RegisterMutex(&p.mu, "syspolicy/rsop.Policy.mu")
+
if _, err := p.reloadNow(false); err != nil {
p.Close()
return nil, err
diff --git a/util/syspolicy/rsop/rsop.go b/util/syspolicy/rsop/rsop.go
index 333dca643..d7e50a004 100644
--- a/util/syspolicy/rsop/rsop.go
+++ b/util/syspolicy/rsop/rsop.go
@@ -32,6 +32,11 @@ var (
effectivePolicyLRU [setting.NumScopes]syncs.AtomicValue[*Policy]
)
+var _ = func() bool {
+ syncs.RegisterMutex(&policyMu, "syspolicy/rsop.policyMu")
+ return true
+}()
+
// PolicyFor returns the [Policy] for the specified scope,
// creating it from the registered [source.Store]s if it doesn't already exist.
func PolicyFor(scope setting.PolicyScope) (*Policy, error) {
diff --git a/util/syspolicy/setting/setting.go b/util/syspolicy/setting/setting.go
index 97362b1dc..d0df2436c 100644
--- a/util/syspolicy/setting/setting.go
+++ b/util/syspolicy/setting/setting.go
@@ -220,6 +220,11 @@ var (
definitionsUsed bool
)
+var _ = func() bool {
+ syncs.RegisterMutex(&definitionsMu, "syspolicy/setting.definitionsMu")
+ return true
+}()
+
// Register registers a policy setting with the specified key, scope, value type,
// and an optional list of supported platforms. All policy settings must be
// registered before any of them can be used. Register panics if called after