summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--ipn/ipnlocal/local.go49
-rw-r--r--wgengine/magicsock/magicsock.go87
2 files changed, 72 insertions, 64 deletions
diff --git a/ipn/ipnlocal/local.go b/ipn/ipnlocal/local.go
index 5fb3d5771..19b7eed32 100644
--- a/ipn/ipnlocal/local.go
+++ b/ipn/ipnlocal/local.go
@@ -99,6 +99,7 @@ import (
"tailscale.com/util/clientmetric"
"tailscale.com/util/deephash"
"tailscale.com/util/dnsname"
+ "tailscale.com/util/eventbus"
"tailscale.com/util/goroutines"
"tailscale.com/util/httpm"
"tailscale.com/util/mak"
@@ -202,6 +203,8 @@ type LocalBackend struct {
keyLogf logger.Logf // for printing list of peers on change
statsLogf logger.Logf // for printing peers stats on change
sys *tsd.System
+ eventbus *eventbus.Bus
+ eventClient *eventbus.Client
health *health.Tracker // always non-nil
metrics metrics
e wgengine.Engine // non-nil; TODO(bradfitz): remove; use sys
@@ -427,6 +430,8 @@ type LocalBackend struct {
//
// See tailscale/corp#29969.
overrideExitNodePolicy bool
+
+ magicSockConfChangeSub *eventbus.Subscriber[magicsock.ConfigurationChanged]
}
// HealthTracker returns the health tracker for the backend.
@@ -532,7 +537,11 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo
captiveCancel: nil, // so that we start checkCaptivePortalLoop when Running
needsCaptiveDetection: make(chan bool),
}
- nb := newNodeBackend(ctx, b.sys.Bus.Get())
+ b.eventbus = sys.Bus.Get()
+ b.eventClient = b.eventbus.Client("ipnlocal.LocalBackend")
+ b.magicSockConfChangeSub = eventbus.Subscribe[magicsock.ConfigurationChanged](b.eventClient)
+
+ nb := newNodeBackend(ctx, b.eventbus)
b.currentNodeAtomic.Store(nb)
nb.ready()
@@ -605,6 +614,21 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo
return b, nil
}
+func (b *LocalBackend) consumeEventbusTopics() {
+ for {
+ select {
+ case <-b.ctx.Done():
+ b.magicSockConfChangeSub.Close()
+ return
+ case <-b.magicSockConfChangeSub.Events():
+ if b.ctx.Err() != nil {
+ return
+ }
+ go b.authReconfig()
+ }
+ }
+}
+
func (b *LocalBackend) Clock() tstime.Clock { return b.clock }
func (b *LocalBackend) Sys() *tsd.System { return b.sys }
@@ -1756,9 +1780,6 @@ func (b *LocalBackend) SetControlClientStatus(c controlclient.Client, st control
b.setAuthURL(st.URL)
}
b.stateMachine()
- // This is currently (2020-07-28) necessary; conditionally disabling it is fragile!
- // This is where netmap information gets propagated to router and magicsock.
- b.authReconfig()
}
type preferencePolicyInfo struct {
@@ -2262,6 +2283,7 @@ func (b *LocalBackend) getNewControlClientFuncLocked() clientGen {
// initOnce is called on the first call to [LocalBackend.Start].
func (b *LocalBackend) initOnce() {
+ go b.consumeEventbusTopics()
b.extHost.Init()
}
@@ -4422,7 +4444,6 @@ func (b *LocalBackend) changeDisablesExitNodeLocked(prefs ipn.PrefsView, change
// but wasn't empty before, then the change disables
// exit node usage.
return tmpPrefs.ExitNodeID == ""
-
}
// adjustEditPrefsLocked applies additional changes to mp if necessary,
@@ -5087,11 +5108,6 @@ func (b *LocalBackend) readvertiseAppConnectorRoutes() {
// updates are not currently blocked, based on the cached netmap and
// user prefs.
func (b *LocalBackend) authReconfig() {
- // Wait for magicsock to process pending [eventbus] events,
- // such as netmap updates. This should be completed before
- // wireguard-go is reconfigured. See tailscale/tailscale#16369.
- b.MagicConn().Synchronize()
-
b.mu.Lock()
blocked := b.blocked
prefs := b.pm.CurrentPrefs()
@@ -7337,7 +7353,7 @@ func (b *LocalBackend) resetForProfileChangeLockedOnEntry(unlock unlockOnce) err
// down, so no need to do any work.
return nil
}
- newNode := newNodeBackend(b.ctx, b.sys.Bus.Get())
+ newNode := newNodeBackend(b.ctx, b.eventbus)
if oldNode := b.currentNodeAtomic.Swap(newNode); oldNode != nil {
oldNode.shutdown(errNodeContextChanged)
}
@@ -7676,10 +7692,8 @@ var (
// allowedAutoRoute determines if the route being added via AdvertiseRoute (the app connector featuge) should be allowed.
func allowedAutoRoute(ipp netip.Prefix) bool {
// Note: blocking the addrs for globals, not solely the prefixes.
- for _, addr := range disallowedAddrs {
- if ipp.Addr() == addr {
- return false
- }
+ if slices.Contains(disallowedAddrs, ipp.Addr()) {
+ return false
}
for _, pfx := range disallowedRanges {
if pfx.Overlaps(ipp) {
@@ -8113,7 +8127,6 @@ func isAllowedAutoExitNodeID(exitNodeID tailcfg.StableNodeID) bool {
}
if nodes, _ := syspolicy.GetStringArray(syspolicy.AllowedSuggestedExitNodes, nil); nodes != nil {
return slices.Contains(nodes, string(exitNodeID))
-
}
return true // no policy configured; allow all exit nodes
}
@@ -8257,9 +8270,7 @@ func (b *LocalBackend) vipServicesFromPrefsLocked(prefs ipn.PrefsView) []*tailcf
return servicesList
}
-var (
- metricCurrentWatchIPNBus = clientmetric.NewGauge("localbackend_current_watch_ipn_bus")
-)
+var metricCurrentWatchIPNBus = clientmetric.NewGauge("localbackend_current_watch_ipn_bus")
func (b *LocalBackend) stateEncrypted() opt.Bool {
switch runtime.GOOS {
diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go
index a59a38f65..08bdb4620 100644
--- a/wgengine/magicsock/magicsock.go
+++ b/wgengine/magicsock/magicsock.go
@@ -14,6 +14,7 @@ import (
"expvar"
"fmt"
"io"
+ "maps"
"net"
"net/netip"
"reflect"
@@ -181,11 +182,11 @@ type Conn struct {
filterSub *eventbus.Subscriber[FilterUpdate]
nodeViewsSub *eventbus.Subscriber[NodeViewsUpdate]
nodeMutsSub *eventbus.Subscriber[NodeMutationsUpdate]
- syncSub *eventbus.Subscriber[syncPoint]
- syncPub *eventbus.Publisher[syncPoint]
allocRelayEndpointPub *eventbus.Publisher[UDPRelayAllocReq]
allocRelayEndpointSub *eventbus.Subscriber[UDPRelayAllocResp]
+ configChangedPub *eventbus.Publisher[ConfigurationChanged]
subsDoneCh chan struct{} // closed when consumeEventbusTopics returns
+ netInfoPub *eventbus.Publisher[tailcfg.NetInfo]
// pconn4 and pconn6 are the underlying UDP sockets used to
// send/receive packets for wireguard and other magicsock
@@ -423,6 +424,8 @@ type Conn struct {
// metrics contains the metrics for the magicsock instance.
metrics *metrics
+
+ hasReconfigured chan any
}
// SetDebugLoggingEnabled controls whether spammy debug logging is enabled.
@@ -562,20 +565,7 @@ type FilterUpdate struct {
*filter.Filter
}
-// syncPoint is an event published over an [eventbus.Bus] by [Conn.Synchronize].
-// It serves as a synchronization point, allowing to wait until magicsock
-// has processed all pending events.
-type syncPoint chan struct{}
-
-// Wait blocks until [syncPoint.Signal] is called.
-func (s syncPoint) Wait() {
- <-s
-}
-
-// Signal signals the sync point, unblocking the [syncPoint.Wait] call.
-func (s syncPoint) Signal() {
- close(s)
-}
+type ConfigurationChanged struct{}
// UDPRelayAllocReq represents a [*disco.AllocateUDPRelayEndpointRequest]
// reception event. This is signaled over an [eventbus.Bus] from
@@ -612,15 +602,16 @@ type UDPRelayAllocResp struct {
func newConn(logf logger.Logf) *Conn {
discoPrivate := key.NewDisco()
c := &Conn{
- logf: logf,
- derpRecvCh: make(chan derpReadResult, 1), // must be buffered, see issue 3736
- derpStarted: make(chan struct{}),
- peerLastDerp: make(map[key.NodePublic]int),
- peerMap: newPeerMap(),
- discoInfo: make(map[key.DiscoPublic]*discoInfo),
- discoPrivate: discoPrivate,
- discoPublic: discoPrivate.Public(),
- cloudInfo: newCloudInfo(logf),
+ logf: logf,
+ derpRecvCh: make(chan derpReadResult, 1), // must be buffered, see issue 3736
+ derpStarted: make(chan struct{}),
+ peerLastDerp: make(map[key.NodePublic]int),
+ peerMap: newPeerMap(),
+ discoInfo: make(map[key.DiscoPublic]*discoInfo),
+ discoPrivate: discoPrivate,
+ discoPublic: discoPrivate.Public(),
+ cloudInfo: newCloudInfo(logf),
+ hasReconfigured: make(chan any, 25),
}
c.discoShort = c.discoPublic.ShortString()
c.bind = &connBind{Conn: c, closed: true}
@@ -658,15 +649,22 @@ func (c *Conn) consumeEventbusTopics() {
c.onPortMapChanged()
case filterUpdate := <-c.filterSub.Events():
c.onFilterUpdate(filterUpdate)
+ c.hasReconfigured <- new(any)
case nodeViews := <-c.nodeViewsSub.Events():
c.onNodeViewsUpdate(nodeViews)
+ c.hasReconfigured <- new(any)
case nodeMuts := <-c.nodeMutsSub.Events():
c.onNodeMutationsUpdate(nodeMuts)
- case syncPoint := <-c.syncSub.Events():
- c.dlogf("magicsock: received sync point after reconfig")
- syncPoint.Signal()
case allocResp := <-c.allocRelayEndpointSub.Events():
c.onUDPRelayAllocResp(allocResp)
+ c.hasReconfigured <- new(any)
+ case <-c.hasReconfigured:
+ c.dlogf("magicsock: configuration has changed")
+ // Drain channel as we only want to reconfigure once
+ for len(c.hasReconfigured) > 0 {
+ <-c.hasReconfigured
+ }
+ c.configChangedPub.Publish(ConfigurationChanged{})
}
}
}
@@ -700,18 +698,6 @@ func (c *Conn) onUDPRelayAllocResp(allocResp UDPRelayAllocResp) {
go c.sendDiscoMessage(epAddr{ap: derpAddr}, ep.publicKey, disco.key, allocResp.Message, discoVerboseLog)
}
-// Synchronize waits for all [eventbus] events published
-// prior to this call to be processed by the receiver.
-func (c *Conn) Synchronize() {
- if c.syncPub == nil {
- // Eventbus is not used; no need to synchronize (in certain tests).
- return
- }
- sp := syncPoint(make(chan struct{}))
- c.syncPub.Publish(sp)
- sp.Wait()
-}
-
// NewConn creates a magic Conn listening on opts.Port.
// As the set of possible endpoints for a Conn changes, the
// callback opts.EndpointsFunc is called.
@@ -741,10 +727,10 @@ func NewConn(opts Options) (*Conn, error) {
c.filterSub = eventbus.Subscribe[FilterUpdate](c.eventClient)
c.nodeViewsSub = eventbus.Subscribe[NodeViewsUpdate](c.eventClient)
c.nodeMutsSub = eventbus.Subscribe[NodeMutationsUpdate](c.eventClient)
- c.syncSub = eventbus.Subscribe[syncPoint](c.eventClient)
- c.syncPub = eventbus.Publish[syncPoint](c.eventClient)
c.allocRelayEndpointPub = eventbus.Publish[UDPRelayAllocReq](c.eventClient)
c.allocRelayEndpointSub = eventbus.Subscribe[UDPRelayAllocResp](c.eventClient)
+ c.netInfoPub = eventbus.Publish[tailcfg.NetInfo](c.eventClient)
+ c.configChangedPub = eventbus.Publish[ConfigurationChanged](c.eventClient)
c.subsDoneCh = make(chan struct{})
go c.consumeEventbusTopics()
@@ -1123,12 +1109,21 @@ func (c *Conn) callNetInfoCallback(ni *tailcfg.NetInfo) {
func (c *Conn) callNetInfoCallbackLocked(ni *tailcfg.NetInfo) {
c.netInfoLast = ni
+ c.publishNetInfo(ni)
if c.netInfoFunc != nil {
c.dlogf("[v1] magicsock: netInfo update: %+v", ni)
go c.netInfoFunc(ni)
}
}
+func (c *Conn) publishNetInfo(ni *tailcfg.NetInfo) {
+ if c.netInfoPub != nil {
+ newNetInfo := *ni
+ newNetInfo.DERPLatency = maps.Clone(ni.DERPLatency)
+ c.netInfoPub.Publish(newNetInfo)
+ }
+}
+
// addValidDiscoPathForTest makes addr a validated disco address for
// discoKey. It's used in tests to enable receiving of packets from
// addr without having to spin up the entire active discovery
@@ -4085,9 +4080,11 @@ type lazyEndpoint struct {
src epAddr
}
-var _ conn.InitiationAwareEndpoint = (*lazyEndpoint)(nil)
-var _ conn.PeerAwareEndpoint = (*lazyEndpoint)(nil)
-var _ conn.Endpoint = (*lazyEndpoint)(nil)
+var (
+ _ conn.InitiationAwareEndpoint = (*lazyEndpoint)(nil)
+ _ conn.PeerAwareEndpoint = (*lazyEndpoint)(nil)
+ _ conn.Endpoint = (*lazyEndpoint)(nil)
+)
// InitiationMessagePublicKey implements [conn.InitiationAwareEndpoint].
// wireguard-go calls us here if we passed it a [*lazyEndpoint] for an