summaryrefslogtreecommitdiffhomepage
path: root/wgengine/magicsock/magicsock.go
diff options
context:
space:
mode:
Diffstat (limited to 'wgengine/magicsock/magicsock.go')
-rw-r--r--wgengine/magicsock/magicsock.go60
1 files changed, 17 insertions, 43 deletions
diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go
index e3cf249c5..db93ec0bb 100644
--- a/wgengine/magicsock/magicsock.go
+++ b/wgengine/magicsock/magicsock.go
@@ -156,7 +156,7 @@ type Conn struct {
// struct. Initialized once at construction, then constant.
eventBus *eventbus.Bus
- eventSubs eventbus.Monitor
+ eventBusClient *eventbus.Client
logf logger.Logf
epFunc func([]tailcfg.Endpoint)
derpActiveFunc func()
@@ -631,43 +631,6 @@ func newConn(logf logger.Logf) *Conn {
return c
}
-// consumeEventbusTopics consumes events from all [Conn]-relevant
-// [eventbus.Subscriber]'s and passes them to their related handler. Events are
-// always handled in the order they are received, i.e. the next event is not
-// read until the previous event's handler has returned. It returns when the
-// [eventbus.Client] is closed.
-func (c *Conn) consumeEventbusTopics(cli *eventbus.Client) func(*eventbus.Client) {
- // Subscribe calls must return before NewConn otherwise published
- // events can be missed.
- pmSub := eventbus.Subscribe[portmappertype.Mapping](cli)
- filterSub := eventbus.Subscribe[FilterUpdate](cli)
- nodeViewsSub := eventbus.Subscribe[NodeViewsUpdate](cli)
- nodeMutsSub := eventbus.Subscribe[NodeMutationsUpdate](cli)
- syncSub := eventbus.Subscribe[syncPoint](cli)
- allocRelayEndpointSub := eventbus.Subscribe[UDPRelayAllocResp](cli)
- return func(cli *eventbus.Client) {
- for {
- select {
- case <-cli.Done():
- return
- case <-pmSub.Events():
- c.onPortMapChanged()
- case filterUpdate := <-filterSub.Events():
- c.onFilterUpdate(filterUpdate)
- case nodeViews := <-nodeViewsSub.Events():
- c.onNodeViewsUpdate(nodeViews)
- case nodeMuts := <-nodeMutsSub.Events():
- c.onNodeMutationsUpdate(nodeMuts)
- case syncPoint := <-syncSub.Events():
- c.dlogf("magicsock: received sync point after reconfig")
- syncPoint.Signal()
- case allocResp := <-allocRelayEndpointSub.Events():
- c.onUDPRelayAllocResp(allocResp)
- }
- }
- }
-}
-
func (c *Conn) onUDPRelayAllocResp(allocResp UDPRelayAllocResp) {
c.mu.Lock()
defer c.mu.Unlock()
@@ -732,10 +695,10 @@ func NewConn(opts Options) (*Conn, error) {
// Set up publishers and subscribers. Subscribe calls must return before
// NewConn otherwise published events can be missed.
- cli := c.eventBus.Client("magicsock.Conn")
- c.syncPub = eventbus.Publish[syncPoint](cli)
- c.allocRelayEndpointPub = eventbus.Publish[UDPRelayAllocReq](cli)
- c.eventSubs = cli.Monitor(c.consumeEventbusTopics(cli))
+ ec := c.eventBus.Client("magicsock.Conn")
+ c.eventBusClient = ec
+ c.syncPub = eventbus.Publish[syncPoint](ec)
+ c.allocRelayEndpointPub = eventbus.Publish[UDPRelayAllocReq](ec)
c.connCtx, c.connCtxCancel = context.WithCancel(context.Background())
c.donec = c.connCtx.Done()
@@ -791,6 +754,17 @@ func NewConn(opts Options) (*Conn, error) {
}
c.logf("magicsock: disco key = %v", c.discoShort)
+
+ eventbus.SubscribeFunc(ec, func(portmappertype.Mapping) { c.onPortMapChanged() })
+ eventbus.SubscribeFunc(ec, c.onFilterUpdate)
+ eventbus.SubscribeFunc(ec, c.onNodeViewsUpdate)
+ eventbus.SubscribeFunc(ec, c.onNodeMutationsUpdate)
+ eventbus.SubscribeFunc(ec, c.onUDPRelayAllocResp)
+ eventbus.SubscribeFunc(ec, func(p syncPoint) {
+ c.dlogf("magicsock: received sync point after reconfig")
+ p.Signal()
+ })
+
return c, nil
}
@@ -3317,7 +3291,7 @@ func (c *Conn) Close() error {
// deadlock with c.Close().
// 2. Conn.consumeEventbusTopics event handlers may not guard against
// undesirable post/in-progress Conn.Close() behaviors.
- c.eventSubs.Close()
+ c.eventBusClient.Close()
c.mu.Lock()
defer c.mu.Unlock()