summaryrefslogtreecommitdiffhomepage
path: root/util
diff options
context:
space:
mode:
authorBrad Fitzpatrick <bradfitz@tailscale.com>2025-10-02 13:51:02 -0700
committerBrad Fitzpatrick <bradfitz@tailscale.com>2025-10-03 10:21:17 -0700
commitabb4c7ec18f4131c1709a6ec806cbfb006a5c1ea (patch)
tree5eab66928cb7d5e9c45f4c75a20954662ad587e4 /util
parentf42be719de9ef38d1dc22ea48f590a01a227bfe5 (diff)
downloadtailscale-bradfitz/evsub.tar.xz
tailscale-bradfitz/evsub.zip
util/eventbus: [DRAFT] add sketch of Subscribe with funcsbradfitz/evsub
Updates #DRAFT Change-Id: Id1f208bdd55a9ae4eccc07afc44eade6e67db5bb Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
Diffstat (limited to 'util')
-rw-r--r--util/eventbus/client.go20
-rw-r--r--util/eventbus/subscribe.go33
2 files changed, 52 insertions, 1 deletions
diff --git a/util/eventbus/client.go b/util/eventbus/client.go
index 7c0268886..b5333292a 100644
--- a/util/eventbus/client.go
+++ b/util/eventbus/client.go
@@ -147,6 +147,26 @@ func Subscribe[T any](c *Client) *Subscriber[T] {
return s
}
+// SubscribeFunc is like [Subscribe] but calls the provided func
+// for each event of type T.
+//
+// The func is not called from a new goroutine. It is called
+// from the eventbus's goroutine.
+func SubscribeFunc[T any](c *Client, f func(T)) *SubscriberFunc[T] {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ if c.isClosed() {
+ panic("cannot SubscribeFunc on a closed client")
+ }
+
+ r := c.subscribeStateLocked()
+ s := newSubscriberFunc(r, f)
+ r.addSubscriber(s)
+
+ return &SubscriberFunc[T]{s: s}
+}
+
// Publish returns a publisher for event type T using the given client.
// It panics if c is closed.
func Publish[T any](c *Client) *Publisher[T] {
diff --git a/util/eventbus/subscribe.go b/util/eventbus/subscribe.go
index ef155e621..cfa810daa 100644
--- a/util/eventbus/subscribe.go
+++ b/util/eventbus/subscribe.go
@@ -179,10 +179,21 @@ func (s *subscribeState) closed() <-chan struct{} {
// A Subscriber delivers one type of event from a [Client].
type Subscriber[T any] struct {
stop stopFlag
- read chan T
+ read chan T // mutually exclusive with readFunc
+ readFunc func(T) // mutually exclusive with read
unregister func()
}
+// SubscriberFunc is like [Subscriber] but has no channel
+// for delivery. They're returned by [SubscribeFunc].
+type SubscriberFunc[T any] struct {
+ s *Subscriber[T] // but don't use its Events or Done methods
+}
+
+func (s *SubscriberFunc[T]) Close() {
+ s.s.Close()
+}
+
func newSubscriber[T any](r *subscribeState) *Subscriber[T] {
return &Subscriber[T]{
read: make(chan T),
@@ -190,6 +201,13 @@ func newSubscriber[T any](r *subscribeState) *Subscriber[T] {
}
}
+func newSubscriberFunc[T any](r *subscribeState, f func(T)) *Subscriber[T] {
+ return &Subscriber[T]{
+ readFunc: f,
+ unregister: func() { r.deleteSubscriber(reflect.TypeFor[T]()) },
+ }
+}
+
func newMonitor[T any](attach func(fn func(T)) (cancel func())) *Subscriber[T] {
ret := &Subscriber[T]{
read: make(chan T, 100), // arbitrary, large
@@ -211,6 +229,19 @@ func (s *Subscriber[T]) monitor(debugEvent T) {
func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[DeliveredEvent], acceptCh func() chan DeliveredEvent, snapshot chan chan []DeliveredEvent) bool {
t := vals.Peek().Event.(T)
+
+ if s.readFunc != nil {
+ select {
+ case <-ctx.Done():
+ return false
+ case ch := <-snapshot:
+ ch <- vals.Snapshot()
+ default:
+ }
+ s.readFunc(t)
+ vals.Drop()
+ return true
+ }
for {
// Keep the cases in this select in sync with subscribeState.pump
// above. The only different should be that this select