summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorBrad Fitzpatrick <bradfitz@tailscale.com>2025-10-02 13:51:02 -0700
committerBrad Fitzpatrick <bradfitz@tailscale.com>2025-10-03 10:21:17 -0700
commitabb4c7ec18f4131c1709a6ec806cbfb006a5c1ea (patch)
tree5eab66928cb7d5e9c45f4c75a20954662ad587e4
parentf42be719de9ef38d1dc22ea48f590a01a227bfe5 (diff)
downloadtailscale-bradfitz/evsub.tar.xz
tailscale-bradfitz/evsub.zip
util/eventbus: [DRAFT] add sketch of Subscribe with funcsbradfitz/evsub
Updates #DRAFT Change-Id: Id1f208bdd55a9ae4eccc07afc44eade6e67db5bb Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
-rw-r--r--ipn/ipnlocal/local.go58
-rw-r--r--util/eventbus/client.go20
-rw-r--r--util/eventbus/subscribe.go33
-rw-r--r--wgengine/magicsock/magicsock.go60
4 files changed, 84 insertions, 87 deletions
diff --git a/ipn/ipnlocal/local.go b/ipn/ipnlocal/local.go
index 7488a06a9..3aa625ecc 100644
--- a/ipn/ipnlocal/local.go
+++ b/ipn/ipnlocal/local.go
@@ -548,7 +548,7 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo
b.prevIfState = netMon.InterfaceState()
// Call our linkChange code once with the current state.
// Following changes are triggered via the eventbus.
- b.linkChange(&netmon.ChangeDelta{New: netMon.InterfaceState()})
+ b.linkChange(netmon.ChangeDelta{New: netMon.InterfaceState()})
if buildfeatures.HasPeerAPIServer {
if tunWrap, ok := b.sys.Tun.GetOK(); ok {
@@ -573,48 +573,15 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo
// Start the event bus late, once all the assignments above are done.
// (See previous race in tailscale/tailscale#17252)
ec := b.Sys().Bus.Get().Client("ipnlocal.LocalBackend")
- b.eventSubs = ec.Monitor(b.consumeEventbusTopics(ec))
-
- return b, nil
-}
-
-// consumeEventbusTopics consumes events from all relevant
-// [eventbus.Subscriber]'s and passes them to their related handler. Events are
-// always handled in the order they are received, i.e. the next event is not
-// read until the previous event's handler has returned. It returns when the
-// [eventbus.Client] is closed.
-func (b *LocalBackend) consumeEventbusTopics(ec *eventbus.Client) func(*eventbus.Client) {
- clientVersionSub := eventbus.Subscribe[tailcfg.ClientVersion](ec)
- autoUpdateSub := eventbus.Subscribe[controlclient.AutoUpdate](ec)
- healthChangeSub := eventbus.Subscribe[health.Change](ec)
- changeDeltaSub := eventbus.Subscribe[netmon.ChangeDelta](ec)
-
- var portlist <-chan PortlistServices
+ eventbus.SubscribeFunc(ec, b.onClientVersion)
+ eventbus.SubscribeFunc(ec, b.onTailnetDefaultAutoUpdateEvent)
+ eventbus.SubscribeFunc(ec, b.onHealthChange)
+ eventbus.SubscribeFunc(ec, b.linkChange)
if buildfeatures.HasPortList {
- portlistSub := eventbus.Subscribe[PortlistServices](ec)
- portlist = portlistSub.Events()
+ eventbus.SubscribeFunc(ec, b.setPortlistServices)
}
- return func(ec *eventbus.Client) {
- for {
- select {
- case <-ec.Done():
- return
- case clientVersion := <-clientVersionSub.Events():
- b.onClientVersion(&clientVersion)
- case au := <-autoUpdateSub.Events():
- b.onTailnetDefaultAutoUpdate(au.Value)
- case change := <-healthChangeSub.Events():
- b.onHealthChange(change)
- case changeDelta := <-changeDeltaSub.Events():
- b.linkChange(&changeDelta)
- case pl := <-portlist:
- if buildfeatures.HasPortList { // redundant, but explicit for linker deadcode and humans
- b.setPortlistServices(pl)
- }
- }
- }
- }
+ return b, nil
}
func (b *LocalBackend) Clock() tstime.Clock { return b.clock }
@@ -933,7 +900,7 @@ func (b *LocalBackend) DisconnectControl() {
}
// linkChange is our network monitor callback, called whenever the network changes.
-func (b *LocalBackend) linkChange(delta *netmon.ChangeDelta) {
+func (b *LocalBackend) linkChange(delta netmon.ChangeDelta) {
b.mu.Lock()
defer b.mu.Unlock()
@@ -3399,7 +3366,8 @@ func (b *LocalBackend) tellRecipientToBrowseToURL(url string, recipient notifica
// onClientVersion is called on MapResponse updates when a MapResponse contains
// a non-nil ClientVersion message.
-func (b *LocalBackend) onClientVersion(v *tailcfg.ClientVersion) {
+func (b *LocalBackend) onClientVersion(cv tailcfg.ClientVersion) {
+ v := &cv
b.mu.Lock()
b.lastClientVersion = v
b.health.SetLatestVersion(v)
@@ -3407,6 +3375,10 @@ func (b *LocalBackend) onClientVersion(v *tailcfg.ClientVersion) {
b.send(ipn.Notify{ClientVersion: v})
}
+func (b *LocalBackend) onTailnetDefaultAutoUpdateEvent(a controlclient.AutoUpdate) {
+ b.onTailnetDefaultAutoUpdate(a.Value)
+}
+
func (b *LocalBackend) onTailnetDefaultAutoUpdate(au bool) {
unlock := b.lockAndGetUnlock()
defer unlock()
@@ -4708,7 +4680,7 @@ func (b *LocalBackend) peerAPIServicesLocked() (ret []tailcfg.Service) {
// to advertise the running services on the host.
type PortlistServices []tailcfg.Service
-func (b *LocalBackend) setPortlistServices(sl []tailcfg.Service) {
+func (b *LocalBackend) setPortlistServices(sl PortlistServices) {
if !buildfeatures.HasPortList { // redundant, but explicit for linker deadcode and humans
return
}
diff --git a/util/eventbus/client.go b/util/eventbus/client.go
index 7c0268886..b5333292a 100644
--- a/util/eventbus/client.go
+++ b/util/eventbus/client.go
@@ -147,6 +147,26 @@ func Subscribe[T any](c *Client) *Subscriber[T] {
return s
}
+// SubscribeFunc is like [Subscribe] but calls the provided func
+// for each event of type T.
+//
+// The func is not called from a new goroutine. It is called
+// from the eventbus's goroutine.
+func SubscribeFunc[T any](c *Client, f func(T)) *SubscriberFunc[T] {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ if c.isClosed() {
+ panic("cannot SubscribeFunc on a closed client")
+ }
+
+ r := c.subscribeStateLocked()
+ s := newSubscriberFunc(r, f)
+ r.addSubscriber(s)
+
+ return &SubscriberFunc[T]{s: s}
+}
+
// Publish returns a publisher for event type T using the given client.
// It panics if c is closed.
func Publish[T any](c *Client) *Publisher[T] {
diff --git a/util/eventbus/subscribe.go b/util/eventbus/subscribe.go
index ef155e621..cfa810daa 100644
--- a/util/eventbus/subscribe.go
+++ b/util/eventbus/subscribe.go
@@ -179,10 +179,21 @@ func (s *subscribeState) closed() <-chan struct{} {
// A Subscriber delivers one type of event from a [Client].
type Subscriber[T any] struct {
stop stopFlag
- read chan T
+ read chan T // mutually exclusive with readFunc
+ readFunc func(T) // mutually exclusive with read
unregister func()
}
+// SubscriberFunc is like [Subscriber] but has no channel
+// for delivery. They're returned by [SubscribeFunc].
+type SubscriberFunc[T any] struct {
+ s *Subscriber[T] // but don't use its Events or Done methods
+}
+
+func (s *SubscriberFunc[T]) Close() {
+ s.s.Close()
+}
+
func newSubscriber[T any](r *subscribeState) *Subscriber[T] {
return &Subscriber[T]{
read: make(chan T),
@@ -190,6 +201,13 @@ func newSubscriber[T any](r *subscribeState) *Subscriber[T] {
}
}
+func newSubscriberFunc[T any](r *subscribeState, f func(T)) *Subscriber[T] {
+ return &Subscriber[T]{
+ readFunc: f,
+ unregister: func() { r.deleteSubscriber(reflect.TypeFor[T]()) },
+ }
+}
+
func newMonitor[T any](attach func(fn func(T)) (cancel func())) *Subscriber[T] {
ret := &Subscriber[T]{
read: make(chan T, 100), // arbitrary, large
@@ -211,6 +229,19 @@ func (s *Subscriber[T]) monitor(debugEvent T) {
func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[DeliveredEvent], acceptCh func() chan DeliveredEvent, snapshot chan chan []DeliveredEvent) bool {
t := vals.Peek().Event.(T)
+
+ if s.readFunc != nil {
+ select {
+ case <-ctx.Done():
+ return false
+ case ch := <-snapshot:
+ ch <- vals.Snapshot()
+ default:
+ }
+ s.readFunc(t)
+ vals.Drop()
+ return true
+ }
for {
// Keep the cases in this select in sync with subscribeState.pump
// above. The only different should be that this select
diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go
index e3cf249c5..db93ec0bb 100644
--- a/wgengine/magicsock/magicsock.go
+++ b/wgengine/magicsock/magicsock.go
@@ -156,7 +156,7 @@ type Conn struct {
// struct. Initialized once at construction, then constant.
eventBus *eventbus.Bus
- eventSubs eventbus.Monitor
+ eventBusClient *eventbus.Client
logf logger.Logf
epFunc func([]tailcfg.Endpoint)
derpActiveFunc func()
@@ -631,43 +631,6 @@ func newConn(logf logger.Logf) *Conn {
return c
}
-// consumeEventbusTopics consumes events from all [Conn]-relevant
-// [eventbus.Subscriber]'s and passes them to their related handler. Events are
-// always handled in the order they are received, i.e. the next event is not
-// read until the previous event's handler has returned. It returns when the
-// [eventbus.Client] is closed.
-func (c *Conn) consumeEventbusTopics(cli *eventbus.Client) func(*eventbus.Client) {
- // Subscribe calls must return before NewConn otherwise published
- // events can be missed.
- pmSub := eventbus.Subscribe[portmappertype.Mapping](cli)
- filterSub := eventbus.Subscribe[FilterUpdate](cli)
- nodeViewsSub := eventbus.Subscribe[NodeViewsUpdate](cli)
- nodeMutsSub := eventbus.Subscribe[NodeMutationsUpdate](cli)
- syncSub := eventbus.Subscribe[syncPoint](cli)
- allocRelayEndpointSub := eventbus.Subscribe[UDPRelayAllocResp](cli)
- return func(cli *eventbus.Client) {
- for {
- select {
- case <-cli.Done():
- return
- case <-pmSub.Events():
- c.onPortMapChanged()
- case filterUpdate := <-filterSub.Events():
- c.onFilterUpdate(filterUpdate)
- case nodeViews := <-nodeViewsSub.Events():
- c.onNodeViewsUpdate(nodeViews)
- case nodeMuts := <-nodeMutsSub.Events():
- c.onNodeMutationsUpdate(nodeMuts)
- case syncPoint := <-syncSub.Events():
- c.dlogf("magicsock: received sync point after reconfig")
- syncPoint.Signal()
- case allocResp := <-allocRelayEndpointSub.Events():
- c.onUDPRelayAllocResp(allocResp)
- }
- }
- }
-}
-
func (c *Conn) onUDPRelayAllocResp(allocResp UDPRelayAllocResp) {
c.mu.Lock()
defer c.mu.Unlock()
@@ -732,10 +695,10 @@ func NewConn(opts Options) (*Conn, error) {
// Set up publishers and subscribers. Subscribe calls must return before
// NewConn otherwise published events can be missed.
- cli := c.eventBus.Client("magicsock.Conn")
- c.syncPub = eventbus.Publish[syncPoint](cli)
- c.allocRelayEndpointPub = eventbus.Publish[UDPRelayAllocReq](cli)
- c.eventSubs = cli.Monitor(c.consumeEventbusTopics(cli))
+ ec := c.eventBus.Client("magicsock.Conn")
+ c.eventBusClient = ec
+ c.syncPub = eventbus.Publish[syncPoint](ec)
+ c.allocRelayEndpointPub = eventbus.Publish[UDPRelayAllocReq](ec)
c.connCtx, c.connCtxCancel = context.WithCancel(context.Background())
c.donec = c.connCtx.Done()
@@ -791,6 +754,17 @@ func NewConn(opts Options) (*Conn, error) {
}
c.logf("magicsock: disco key = %v", c.discoShort)
+
+ eventbus.SubscribeFunc(ec, func(portmappertype.Mapping) { c.onPortMapChanged() })
+ eventbus.SubscribeFunc(ec, c.onFilterUpdate)
+ eventbus.SubscribeFunc(ec, c.onNodeViewsUpdate)
+ eventbus.SubscribeFunc(ec, c.onNodeMutationsUpdate)
+ eventbus.SubscribeFunc(ec, c.onUDPRelayAllocResp)
+ eventbus.SubscribeFunc(ec, func(p syncPoint) {
+ c.dlogf("magicsock: received sync point after reconfig")
+ p.Signal()
+ })
+
return c, nil
}
@@ -3317,7 +3291,7 @@ func (c *Conn) Close() error {
// deadlock with c.Close().
// 2. Conn.consumeEventbusTopics event handlers may not guard against
// undesirable post/in-progress Conn.Close() behaviors.
- c.eventSubs.Close()
+ c.eventBusClient.Close()
c.mu.Lock()
defer c.mu.Unlock()