diff options
Diffstat (limited to 'util')
| -rw-r--r-- | util/eventbus/client.go | 20 | ||||
| -rw-r--r-- | util/eventbus/subscribe.go | 33 |
2 files changed, 52 insertions, 1 deletions
diff --git a/util/eventbus/client.go b/util/eventbus/client.go index 7c0268886..b5333292a 100644 --- a/util/eventbus/client.go +++ b/util/eventbus/client.go @@ -147,6 +147,26 @@ func Subscribe[T any](c *Client) *Subscriber[T] { return s } +// SubscribeFunc is like [Subscribe] but calls the provided func +// for each event of type T. +// +// The func is not called from a new goroutine. It is called +// from the eventbus's goroutine. +func SubscribeFunc[T any](c *Client, f func(T)) *SubscriberFunc[T] { + c.mu.Lock() + defer c.mu.Unlock() + + if c.isClosed() { + panic("cannot SubscribeFunc on a closed client") + } + + r := c.subscribeStateLocked() + s := newSubscriberFunc(r, f) + r.addSubscriber(s) + + return &SubscriberFunc[T]{s: s} +} + // Publish returns a publisher for event type T using the given client. // It panics if c is closed. func Publish[T any](c *Client) *Publisher[T] { diff --git a/util/eventbus/subscribe.go b/util/eventbus/subscribe.go index ef155e621..cfa810daa 100644 --- a/util/eventbus/subscribe.go +++ b/util/eventbus/subscribe.go @@ -179,10 +179,21 @@ func (s *subscribeState) closed() <-chan struct{} { // A Subscriber delivers one type of event from a [Client]. type Subscriber[T any] struct { stop stopFlag - read chan T + read chan T // mutually exclusive with readFunc + readFunc func(T) // mutually exclusive with read unregister func() } +// SubscriberFunc is like [Subscriber] but has no channel +// for delivery. They're returned by [SubscribeFunc]. +type SubscriberFunc[T any] struct { + s *Subscriber[T] // but don't use its Events or Done methods +} + +func (s *SubscriberFunc[T]) Close() { + s.s.Close() +} + func newSubscriber[T any](r *subscribeState) *Subscriber[T] { return &Subscriber[T]{ read: make(chan T), @@ -190,6 +201,13 @@ func newSubscriber[T any](r *subscribeState) *Subscriber[T] { } } +func newSubscriberFunc[T any](r *subscribeState, f func(T)) *Subscriber[T] { + return &Subscriber[T]{ + readFunc: f, + unregister: func() { r.deleteSubscriber(reflect.TypeFor[T]()) }, + } +} + func newMonitor[T any](attach func(fn func(T)) (cancel func())) *Subscriber[T] { ret := &Subscriber[T]{ read: make(chan T, 100), // arbitrary, large @@ -211,6 +229,19 @@ func (s *Subscriber[T]) monitor(debugEvent T) { func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[DeliveredEvent], acceptCh func() chan DeliveredEvent, snapshot chan chan []DeliveredEvent) bool { t := vals.Peek().Event.(T) + + if s.readFunc != nil { + select { + case <-ctx.Done(): + return false + case ch := <-snapshot: + ch <- vals.Snapshot() + default: + } + s.readFunc(t) + vals.Drop() + return true + } for { // Keep the cases in this select in sync with subscribeState.pump // above. The only different should be that this select |
