diff options
Diffstat (limited to 'wgengine/magicsock')
| -rw-r--r-- | wgengine/magicsock/magicsock.go | 60 |
1 files changed, 17 insertions, 43 deletions
diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index e3cf249c5..db93ec0bb 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -156,7 +156,7 @@ type Conn struct { // struct. Initialized once at construction, then constant. eventBus *eventbus.Bus - eventSubs eventbus.Monitor + eventBusClient *eventbus.Client logf logger.Logf epFunc func([]tailcfg.Endpoint) derpActiveFunc func() @@ -631,43 +631,6 @@ func newConn(logf logger.Logf) *Conn { return c } -// consumeEventbusTopics consumes events from all [Conn]-relevant -// [eventbus.Subscriber]'s and passes them to their related handler. Events are -// always handled in the order they are received, i.e. the next event is not -// read until the previous event's handler has returned. It returns when the -// [eventbus.Client] is closed. -func (c *Conn) consumeEventbusTopics(cli *eventbus.Client) func(*eventbus.Client) { - // Subscribe calls must return before NewConn otherwise published - // events can be missed. - pmSub := eventbus.Subscribe[portmappertype.Mapping](cli) - filterSub := eventbus.Subscribe[FilterUpdate](cli) - nodeViewsSub := eventbus.Subscribe[NodeViewsUpdate](cli) - nodeMutsSub := eventbus.Subscribe[NodeMutationsUpdate](cli) - syncSub := eventbus.Subscribe[syncPoint](cli) - allocRelayEndpointSub := eventbus.Subscribe[UDPRelayAllocResp](cli) - return func(cli *eventbus.Client) { - for { - select { - case <-cli.Done(): - return - case <-pmSub.Events(): - c.onPortMapChanged() - case filterUpdate := <-filterSub.Events(): - c.onFilterUpdate(filterUpdate) - case nodeViews := <-nodeViewsSub.Events(): - c.onNodeViewsUpdate(nodeViews) - case nodeMuts := <-nodeMutsSub.Events(): - c.onNodeMutationsUpdate(nodeMuts) - case syncPoint := <-syncSub.Events(): - c.dlogf("magicsock: received sync point after reconfig") - syncPoint.Signal() - case allocResp := <-allocRelayEndpointSub.Events(): - c.onUDPRelayAllocResp(allocResp) - } - } - } -} - func (c *Conn) onUDPRelayAllocResp(allocResp UDPRelayAllocResp) { c.mu.Lock() defer c.mu.Unlock() @@ -732,10 +695,10 @@ func NewConn(opts Options) (*Conn, error) { // Set up publishers and subscribers. Subscribe calls must return before // NewConn otherwise published events can be missed. - cli := c.eventBus.Client("magicsock.Conn") - c.syncPub = eventbus.Publish[syncPoint](cli) - c.allocRelayEndpointPub = eventbus.Publish[UDPRelayAllocReq](cli) - c.eventSubs = cli.Monitor(c.consumeEventbusTopics(cli)) + ec := c.eventBus.Client("magicsock.Conn") + c.eventBusClient = ec + c.syncPub = eventbus.Publish[syncPoint](ec) + c.allocRelayEndpointPub = eventbus.Publish[UDPRelayAllocReq](ec) c.connCtx, c.connCtxCancel = context.WithCancel(context.Background()) c.donec = c.connCtx.Done() @@ -791,6 +754,17 @@ func NewConn(opts Options) (*Conn, error) { } c.logf("magicsock: disco key = %v", c.discoShort) + + eventbus.SubscribeFunc(ec, func(portmappertype.Mapping) { c.onPortMapChanged() }) + eventbus.SubscribeFunc(ec, c.onFilterUpdate) + eventbus.SubscribeFunc(ec, c.onNodeViewsUpdate) + eventbus.SubscribeFunc(ec, c.onNodeMutationsUpdate) + eventbus.SubscribeFunc(ec, c.onUDPRelayAllocResp) + eventbus.SubscribeFunc(ec, func(p syncPoint) { + c.dlogf("magicsock: received sync point after reconfig") + p.Signal() + }) + return c, nil } @@ -3317,7 +3291,7 @@ func (c *Conn) Close() error { // deadlock with c.Close(). // 2. Conn.consumeEventbusTopics event handlers may not guard against // undesirable post/in-progress Conn.Close() behaviors. - c.eventSubs.Close() + c.eventBusClient.Close() c.mu.Lock() defer c.mu.Unlock() |
