summaryrefslogtreecommitdiffhomepage
path: root/wgengine
diff options
context:
space:
mode:
authorBrad Fitzpatrick <bradfitz@tailscale.com>2025-11-14 10:18:30 -0800
committerBrad Fitzpatrick <bradfitz@tailscale.com>2025-11-14 10:18:30 -0800
commit5b0997536fa8ea9cbde7961e08bbe78378d0fde4 (patch)
tree99ac1621342910b9f7c54bcc1ca80c179c0a2dd4 /wgengine
parent124301fbb651382959f8bfe9b1f1765e42e8a3ef (diff)
downloadtailscale-bradfitz/getstatus.tar.xz
tailscale-bradfitz/getstatus.zip
wgengine, ipn/ipnlocal, wgengine/magicsock: remove RequestStatus, eventbus-ify thingsbradfitz/getstatus
Updates #17900 Change-Id: Ia53a3f195a82256d8f915c8928d8a775f723259d Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
Diffstat (limited to 'wgengine')
-rw-r--r--wgengine/magicsock/derp.go13
-rw-r--r--wgengine/magicsock/magicsock.go33
-rw-r--r--wgengine/userspace.go168
-rw-r--r--wgengine/watchdog.go12
-rw-r--r--wgengine/wgengine.go25
5 files changed, 88 insertions, 163 deletions
diff --git a/wgengine/magicsock/derp.go b/wgengine/magicsock/derp.go
index 37a4f1a64..8f44d3977 100644
--- a/wgengine/magicsock/derp.go
+++ b/wgengine/magicsock/derp.go
@@ -27,6 +27,7 @@ import (
"tailscale.com/tstime/mono"
"tailscale.com/types/key"
"tailscale.com/types/logger"
+ "tailscale.com/types/topics"
"tailscale.com/util/backoff"
"tailscale.com/util/mak"
"tailscale.com/util/rands"
@@ -396,6 +397,11 @@ func (c *Conn) derpWriteChanForRegion(regionID int, peer key.NodePublic) chan de
*ad.lastWrite = time.Now()
ad.createTime = time.Now()
c.activeDerp[regionID] = ad
+ c.derpConnChangePub.Publish(topics.DERPConnChange{
+ RegionID: regionID,
+ Connected: true,
+ LiveDERPs: len(c.activeDerp),
+ })
metricNumDERPConns.Set(int64(len(c.activeDerp)))
c.logActiveDerpLocked()
c.setPeerLastDerpLocked(peer, regionID, regionID)
@@ -424,8 +430,6 @@ func (c *Conn) derpWriteChanForRegion(regionID int, peer key.NodePublic) chan de
go c.runDerpReader(ctx, regionID, dc, wg, startGate)
go c.runDerpWriter(ctx, dc, ch, wg, startGate)
- go c.derpActiveFunc()
-
return ad.writeCh
}
@@ -874,6 +878,11 @@ func (c *Conn) closeDerpLocked(regionID int, why string) {
go ad.c.Close()
ad.cancel()
delete(c.activeDerp, regionID)
+ c.derpConnChangePub.Publish(topics.DERPConnChange{
+ RegionID: regionID,
+ Connected: false,
+ LiveDERPs: len(c.activeDerp),
+ })
metricNumDERPConns.Set(int64(len(c.activeDerp)))
}
}
diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go
index d44cf1c11..00ea623a2 100644
--- a/wgengine/magicsock/magicsock.go
+++ b/wgengine/magicsock/magicsock.go
@@ -58,6 +58,7 @@ import (
"tailscale.com/types/netlogfunc"
"tailscale.com/types/netmap"
"tailscale.com/types/nettype"
+ "tailscale.com/types/topics"
"tailscale.com/types/views"
"tailscale.com/util/clientmetric"
"tailscale.com/util/eventbus"
@@ -158,8 +159,6 @@ type Conn struct {
eventBus *eventbus.Bus
eventClient *eventbus.Client
logf logger.Logf
- epFunc func([]tailcfg.Endpoint)
- derpActiveFunc func()
idleFunc func() time.Duration // nil means unknown
testOnlyPacketListener nettype.PacketListener
noteRecvActivity func(key.NodePublic) // or nil, see Options.NoteRecvActivity
@@ -181,6 +180,8 @@ type Conn struct {
syncPub *eventbus.Publisher[syncPoint]
allocRelayEndpointPub *eventbus.Publisher[UDPRelayAllocReq]
portUpdatePub *eventbus.Publisher[router.PortUpdate]
+ derpConnChangePub *eventbus.Publisher[topics.DERPConnChange]
+ epChangePub *eventbus.Publisher[topics.EndpointsChanged]
// pconn4 and pconn6 are the underlying UDP sockets used to
// send/receive packets for wireguard and other magicsock
@@ -446,14 +447,6 @@ type Options struct {
// Zero means to pick one automatically.
Port uint16
- // EndpointsFunc optionally provides a func to be called when
- // endpoints change. The called func does not own the slice.
- EndpointsFunc func([]tailcfg.Endpoint)
-
- // DERPActiveFunc optionally provides a func to be called when
- // a connection is made to a DERP server.
- DERPActiveFunc func()
-
// IdleFunc optionally provides a func to return how long
// it's been since a TUN packet was sent or received.
IdleFunc func() time.Duration
@@ -507,20 +500,6 @@ func (o *Options) logf() logger.Logf {
return o.Logf
}
-func (o *Options) endpointsFunc() func([]tailcfg.Endpoint) {
- if o == nil || o.EndpointsFunc == nil {
- return func([]tailcfg.Endpoint) {}
- }
- return o.EndpointsFunc
-}
-
-func (o *Options) derpActiveFunc() func() {
- if o == nil || o.DERPActiveFunc == nil {
- return func() {}
- }
- return o.DERPActiveFunc
-}
-
// NodeViewsUpdate represents an update event of [tailcfg.NodeView] for all
// nodes. This event is published over an [eventbus.Bus]. It may be published
// with an invalid SelfNode, and/or zero/nil Peers. [magicsock.Conn] is the sole
@@ -686,8 +665,6 @@ func NewConn(opts Options) (*Conn, error) {
c.eventBus = opts.EventBus
c.port.Store(uint32(opts.Port))
c.controlKnobs = opts.ControlKnobs
- c.epFunc = opts.endpointsFunc()
- c.derpActiveFunc = opts.derpActiveFunc()
c.idleFunc = opts.IdleFunc
c.testOnlyPacketListener = opts.TestOnlyPacketListener
c.noteRecvActivity = opts.NoteRecvActivity
@@ -699,6 +676,8 @@ func NewConn(opts Options) (*Conn, error) {
c.syncPub = eventbus.Publish[syncPoint](ec)
c.allocRelayEndpointPub = eventbus.Publish[UDPRelayAllocReq](ec)
c.portUpdatePub = eventbus.Publish[router.PortUpdate](ec)
+ c.derpConnChangePub = eventbus.Publish[topics.DERPConnChange](ec)
+ c.epChangePub = eventbus.Publish[topics.EndpointsChanged](ec)
eventbus.SubscribeFunc(ec, c.onPortMapChanged)
eventbus.SubscribeFunc(ec, c.onFilterUpdate)
eventbus.SubscribeFunc(ec, c.onNodeViewsUpdate)
@@ -973,7 +952,7 @@ func (c *Conn) updateEndpoints(why string) {
if c.setEndpoints(endpoints) {
c.logEndpointChange(endpoints)
- c.epFunc(endpoints)
+ c.epChangePub.Publish(topics.EndpointsChanged(endpoints))
}
}
diff --git a/wgengine/userspace.go b/wgengine/userspace.go
index 8ad771fc5..9d7449336 100644
--- a/wgengine/userspace.go
+++ b/wgengine/userspace.go
@@ -15,7 +15,6 @@ import (
"net/netip"
"reflect"
"runtime"
- "slices"
"strings"
"sync"
"time"
@@ -46,6 +45,7 @@ import (
"tailscale.com/types/key"
"tailscale.com/types/logger"
"tailscale.com/types/netmap"
+ "tailscale.com/types/topics"
"tailscale.com/types/views"
"tailscale.com/util/backoff"
"tailscale.com/util/checkchange"
@@ -95,8 +95,10 @@ const networkLoggerUploadTimeout = 5 * time.Second
type userspaceEngine struct {
// eventBus will eventually become required, but for now may be nil.
- eventBus *eventbus.Bus
- eventClient *eventbus.Client
+ eventBus *eventbus.Bus
+ eventClient *eventbus.Client
+ tunStatusPub *eventbus.Publisher[topics.TUNStatusChange]
+ peerRecvActivityPub *eventbus.Publisher[topics.PeerRecvActivity]
logf logger.Logf
wgLogger *wglog.Logger // a wireguard-go logging wrapper
@@ -142,13 +144,12 @@ type userspaceEngine struct {
lastStatusPollTime mono.Time // last time we polled the engine status
reconfigureVPN func() error // or nil
- mu sync.Mutex // guards following; see lock order comment below
- netMap *netmap.NetworkMap // or nil
- closing bool // Close was called (even if we're still closing)
- statusCallback StatusCallback
- peerSequence views.Slice[key.NodePublic]
- endpoints []tailcfg.Endpoint
- pendOpen map[flowtrackTuple]*pendingOpenFlow // see pendopen.go
+ mu sync.Mutex // guards following; see lock order comment below
+ netMap *netmap.NetworkMap // or nil
+ closing bool // Close was called (even if we're still closing)
+ peerSequence views.Slice[key.NodePublic]
+ endpoints []tailcfg.Endpoint
+ pendOpen map[flowtrackTuple]*pendingOpenFlow // see pendopen.go
// pongCallback is the map of response handlers waiting for disco or TSMP
// pong callbacks. The map key is a random slice of bytes.
@@ -391,25 +392,16 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error)
logf("link state: %+v", e.netMon.InterfaceState())
- endpointsFn := func(endpoints []tailcfg.Endpoint) {
- e.mu.Lock()
- e.endpoints = append(e.endpoints[:0], endpoints...)
- e.mu.Unlock()
-
- e.RequestStatus()
- }
magicsockOpts := magicsock.Options{
- EventBus: e.eventBus,
- Logf: logf,
- Port: conf.ListenPort,
- EndpointsFunc: endpointsFn,
- DERPActiveFunc: e.RequestStatus,
- IdleFunc: e.tundev.IdleDuration,
- NetMon: e.netMon,
- HealthTracker: e.health,
- Metrics: conf.Metrics,
- ControlKnobs: conf.ControlKnobs,
- PeerByKeyFunc: e.PeerByKey,
+ EventBus: e.eventBus,
+ Logf: logf,
+ Port: conf.ListenPort,
+ IdleFunc: e.tundev.IdleDuration,
+ NetMon: e.netMon,
+ HealthTracker: e.health,
+ Metrics: conf.Metrics,
+ ControlKnobs: conf.ControlKnobs,
+ PeerByKeyFunc: e.PeerByKey,
}
if buildfeatures.HasLazyWG {
magicsockOpts.NoteRecvActivity = e.noteRecvActivity
@@ -477,22 +469,6 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error)
})
go func() {
- up := false
- for event := range e.tundev.EventsUpDown() {
- if event&tun.EventUp != 0 && !up {
- e.logf("external route: up")
- e.RequestStatus()
- up = true
- }
- if event&tun.EventDown != 0 && up {
- e.logf("external route: down")
- e.RequestStatus()
- up = false
- }
- }
- }()
-
- go func() {
select {
case <-e.wgdev.Wait():
e.mu.Lock()
@@ -547,10 +523,29 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error)
e.linkChange(&cd)
})
e.eventClient = ec
+ e.tunStatusPub = eventbus.Publish[topics.TUNStatusChange](ec)
+ e.peerRecvActivityPub = eventbus.Publish[topics.PeerRecvActivity](ec)
+ go e.publishTUNSTatusLoop()
e.logf("Engine created.")
return e, nil
}
+func (e *userspaceEngine) publishTUNSTatusLoop() {
+ up := false
+ for event := range e.tundev.EventsUpDown() {
+ if event&tun.EventUp != 0 && !up {
+ e.logf("external route: up")
+ e.tunStatusPub.Publish(topics.TUNStatusChange{Up: true})
+ up = true
+ }
+ if event&tun.EventDown != 0 && up {
+ e.logf("external route: down")
+ e.tunStatusPub.Publish(topics.TUNStatusChange{Up: true})
+ up = false
+ }
+ }
+}
+
// echoRespondToAll is an inbound post-filter responding to all echo requests.
func echoRespondToAll(p *packet.Parsed, t *tstun.Wrapper, gro *gro.GRO) (filter.Response, *gro.GRO) {
if p.IsEchoRequest() {
@@ -673,7 +668,7 @@ func (e *userspaceEngine) noteRecvActivity(nk key.NodePublic) {
// tailscaled alone did not, hence this.
if e.lastStatusPollTime.IsZero() || now.Sub(e.lastStatusPollTime) >= statusPollInterval {
e.lastStatusPollTime = now
- go e.RequestStatus()
+ e.peerRecvActivityPub.Publish(topics.PeerRecvActivity{PeerKey: nk})
}
// If the last activity time jumped a bunch (say, at least
@@ -940,10 +935,7 @@ func (e *userspaceEngine) ResetAndStop() (*Status, error) {
}
bo := backoff.NewBackoff("UserspaceEngineResetAndStop", e.logf, 1*time.Second)
for {
- st, err := e.getStatus()
- if err != nil {
- return nil, err
- }
+ st := e.GetStatus()
if len(st.Peers) == 0 && st.DERPs == 0 {
return st, nil
}
@@ -1180,18 +1172,6 @@ func (e *userspaceEngine) SetJailedFilter(filt *filter.Filter) {
e.tundev.SetJailedFilter(filt)
}
-func (e *userspaceEngine) SetStatusCallback(cb StatusCallback) {
- e.mu.Lock()
- defer e.mu.Unlock()
- e.statusCallback = cb
-}
-
-func (e *userspaceEngine) getStatusCallback() StatusCallback {
- e.mu.Lock()
- defer e.mu.Unlock()
- return e.statusCallback
-}
-
var ErrEngineClosing = errors.New("engine closing; no status")
func (e *userspaceEngine) PeerByKey(pubKey key.NodePublic) (_ wgint.Peer, ok bool) {
@@ -1221,7 +1201,13 @@ func (e *userspaceEngine) getPeerStatusLite(pk key.NodePublic) (status ipnstate.
return status, true
}
-func (e *userspaceEngine) getStatus() (*Status, error) {
+func (e *userspaceEngine) NumConfiguredPeers() int {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+ return e.peerSequence.Len()
+}
+
+func (e *userspaceEngine) GetStatus() *Status {
// Grab derpConns before acquiring wgLock to not violate lock ordering;
// the DERPs method acquires magicsock.Conn.mu.
// (See comment in userspaceEngine's declaration.)
@@ -1230,60 +1216,22 @@ func (e *userspaceEngine) getStatus() (*Status, error) {
e.mu.Lock()
closing := e.closing
peerKeys := e.peerSequence
- localAddrs := slices.Clone(e.endpoints)
e.mu.Unlock()
if closing {
- return nil, ErrEngineClosing
+ return new(Status)
}
- peers := make([]ipnstate.PeerStatusLite, 0, peerKeys.Len())
+ st := &Status{
+ DERPs: derpConns,
+ Peers: make([]ipnstate.PeerStatusLite, 0, peerKeys.Len()),
+ }
for _, key := range peerKeys.All() {
if status, ok := e.getPeerStatusLite(key); ok {
- peers = append(peers, status)
+ st.Peers = append(st.Peers, status)
}
}
-
- return &Status{
- AsOf: time.Now(),
- LocalAddrs: localAddrs,
- Peers: peers,
- DERPs: derpConns,
- }, nil
-}
-
-func (e *userspaceEngine) RequestStatus() {
- // This is slightly tricky. e.getStatus() can theoretically get
- // blocked inside wireguard for a while, and RequestStatus() is
- // sometimes called from a goroutine, so we don't want a lot of
- // them hanging around. On the other hand, requesting multiple
- // status updates simultaneously is pointless anyway; they will
- // all say the same thing.
-
- // Enqueue at most one request. If one is in progress already, this
- // adds one more to the queue. If one has been requested but not
- // started, it is a no-op.
- select {
- case e.reqCh <- struct{}{}:
- default:
- }
-
- // Dequeue at most one request. Another thread may have already
- // dequeued the request we enqueued above, which is fine, since the
- // information is guaranteed to be at least as recent as the current
- // call to RequestStatus().
- select {
- case <-e.reqCh:
- s, err := e.getStatus()
- if s == nil && err == nil {
- e.logf("[unexpected] RequestStatus: both s and err are nil")
- return
- }
- if cb := e.getStatusCallback(); cb != nil {
- cb(s, err)
- }
- default:
- }
+ return st
}
func (e *userspaceEngine) Close() {
@@ -1388,11 +1336,7 @@ func (e *userspaceEngine) SetNetworkMap(nm *netmap.NetworkMap) {
}
func (e *userspaceEngine) UpdateStatus(sb *ipnstate.StatusBuilder) {
- st, err := e.getStatus()
- if err != nil {
- e.logf("wgengine: getStatus: %v", err)
- return
- }
+ st := e.GetStatus()
if sb.WantPeers {
for _, ps := range st.Peers {
sb.AddPeer(ps.NodeKey, &ipnstate.PeerStatus{
diff --git a/wgengine/watchdog.go b/wgengine/watchdog.go
index 9cc4ed3b5..e3e9a024c 100644
--- a/wgengine/watchdog.go
+++ b/wgengine/watchdog.go
@@ -142,14 +142,16 @@ func (e *watchdogEngine) GetJailedFilter() *filter.Filter {
func (e *watchdogEngine) SetJailedFilter(filt *filter.Filter) {
e.watchdog("SetJailedFilter", func() { e.wrap.SetJailedFilter(filt) })
}
-func (e *watchdogEngine) SetStatusCallback(cb StatusCallback) {
- e.watchdog("SetStatusCallback", func() { e.wrap.SetStatusCallback(cb) })
-}
func (e *watchdogEngine) UpdateStatus(sb *ipnstate.StatusBuilder) {
e.watchdog("UpdateStatus", func() { e.wrap.UpdateStatus(sb) })
}
-func (e *watchdogEngine) RequestStatus() {
- e.watchdog("RequestStatus", func() { e.wrap.RequestStatus() })
+func (e *watchdogEngine) GetStatus() (st *Status) {
+ e.watchdog("GetStatus", func() { st = e.wrap.GetStatus() })
+ return st
+}
+func (e *watchdogEngine) NumConfiguredPeers() (n int) {
+ e.watchdog("NumConfiguredPeers", func() { n = e.wrap.NumConfiguredPeers() })
+ return n
}
func (e *watchdogEngine) SetNetworkMap(nm *netmap.NetworkMap) {
e.watchdog("SetNetworkMap", func() { e.wrap.SetNetworkMap(nm) })
diff --git a/wgengine/wgengine.go b/wgengine/wgengine.go
index be7873147..04de2bed2 100644
--- a/wgengine/wgengine.go
+++ b/wgengine/wgengine.go
@@ -7,7 +7,6 @@ package wgengine
import (
"errors"
"net/netip"
- "time"
"tailscale.com/ipn/ipnstate"
"tailscale.com/net/dns"
@@ -24,19 +23,12 @@ import (
// Status is the Engine status.
//
// TODO(bradfitz): remove this, subset of ipnstate? Need to migrate users.
+// TODO(bradfitz): at least view-ify this?
type Status struct {
- AsOf time.Time // the time at which the status was calculated
- Peers []ipnstate.PeerStatusLite
- LocalAddrs []tailcfg.Endpoint // the set of possible endpoints for the magic conn
- DERPs int // number of active DERP connections
+ Peers []ipnstate.PeerStatusLite
+ DERPs int // number of active DERP connections
}
-// StatusCallback is the type of status callbacks used by
-// Engine.SetStatusCallback.
-//
-// Exactly one of Status or error is non-nil.
-type StatusCallback func(*Status, error)
-
// NetworkMapCallback is the type used by callbacks that hook
// into network map updates.
type NetworkMapCallback func(*netmap.NetworkMap)
@@ -93,13 +85,12 @@ type Engine interface {
// SetJailedFilter updates the packet filter for jailed nodes.
SetJailedFilter(*filter.Filter)
- // SetStatusCallback sets the function to call when the
- // WireGuard status changes.
- SetStatusCallback(StatusCallback)
+ // GetStatus returns the current Engine status.
+ GetStatus() *Status
- // RequestStatus requests a WireGuard status update right
- // away, sent to the callback registered via SetStatusCallback.
- RequestStatus()
+ // NumConfiguredPeers returns the number of currently configured peers,
+ // regardless of activity.
+ NumConfiguredPeers() int
// PeerByKey returns the WireGuard status of the provided peer.
// If the peer is not found, ok is false.