summaryrefslogtreecommitdiffhomepage
path: root/util
diff options
context:
space:
mode:
Diffstat (limited to 'util')
-rw-r--r--util/eventbus/client.go20
-rw-r--r--util/eventbus/subscribe.go33
2 files changed, 52 insertions, 1 deletions
diff --git a/util/eventbus/client.go b/util/eventbus/client.go
index 7c0268886..b5333292a 100644
--- a/util/eventbus/client.go
+++ b/util/eventbus/client.go
@@ -147,6 +147,26 @@ func Subscribe[T any](c *Client) *Subscriber[T] {
return s
}
+// SubscribeFunc is like [Subscribe] but calls the provided func
+// for each event of type T.
+//
+// The func is not called from a new goroutine. It is called
+// from the eventbus's goroutine.
+func SubscribeFunc[T any](c *Client, f func(T)) *SubscriberFunc[T] {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ if c.isClosed() {
+ panic("cannot SubscribeFunc on a closed client")
+ }
+
+ r := c.subscribeStateLocked()
+ s := newSubscriberFunc(r, f)
+ r.addSubscriber(s)
+
+ return &SubscriberFunc[T]{s: s}
+}
+
// Publish returns a publisher for event type T using the given client.
// It panics if c is closed.
func Publish[T any](c *Client) *Publisher[T] {
diff --git a/util/eventbus/subscribe.go b/util/eventbus/subscribe.go
index ef155e621..cfa810daa 100644
--- a/util/eventbus/subscribe.go
+++ b/util/eventbus/subscribe.go
@@ -179,10 +179,21 @@ func (s *subscribeState) closed() <-chan struct{} {
// A Subscriber delivers one type of event from a [Client].
type Subscriber[T any] struct {
stop stopFlag
- read chan T
+ read chan T // mutually exclusive with readFunc
+ readFunc func(T) // mutually exclusive with read
unregister func()
}
+// SubscriberFunc is like [Subscriber] but has no channel
+// for delivery. They're returned by [SubscribeFunc].
+type SubscriberFunc[T any] struct {
+ s *Subscriber[T] // but don't use its Events or Done methods
+}
+
+func (s *SubscriberFunc[T]) Close() {
+ s.s.Close()
+}
+
func newSubscriber[T any](r *subscribeState) *Subscriber[T] {
return &Subscriber[T]{
read: make(chan T),
@@ -190,6 +201,13 @@ func newSubscriber[T any](r *subscribeState) *Subscriber[T] {
}
}
+func newSubscriberFunc[T any](r *subscribeState, f func(T)) *Subscriber[T] {
+ return &Subscriber[T]{
+ readFunc: f,
+ unregister: func() { r.deleteSubscriber(reflect.TypeFor[T]()) },
+ }
+}
+
func newMonitor[T any](attach func(fn func(T)) (cancel func())) *Subscriber[T] {
ret := &Subscriber[T]{
read: make(chan T, 100), // arbitrary, large
@@ -211,6 +229,19 @@ func (s *Subscriber[T]) monitor(debugEvent T) {
func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[DeliveredEvent], acceptCh func() chan DeliveredEvent, snapshot chan chan []DeliveredEvent) bool {
t := vals.Peek().Event.(T)
+
+ if s.readFunc != nil {
+ select {
+ case <-ctx.Done():
+ return false
+ case ch := <-snapshot:
+ ch <- vals.Snapshot()
+ default:
+ }
+ s.readFunc(t)
+ vals.Drop()
+ return true
+ }
for {
// Keep the cases in this select in sync with subscribeState.pump
// above. The only different should be that this select