summaryrefslogtreecommitdiffhomepage
path: root/wgengine/userspace.go
diff options
context:
space:
mode:
Diffstat (limited to 'wgengine/userspace.go')
-rw-r--r--wgengine/userspace.go168
1 files changed, 56 insertions, 112 deletions
diff --git a/wgengine/userspace.go b/wgengine/userspace.go
index 8ad771fc5..9d7449336 100644
--- a/wgengine/userspace.go
+++ b/wgengine/userspace.go
@@ -15,7 +15,6 @@ import (
"net/netip"
"reflect"
"runtime"
- "slices"
"strings"
"sync"
"time"
@@ -46,6 +45,7 @@ import (
"tailscale.com/types/key"
"tailscale.com/types/logger"
"tailscale.com/types/netmap"
+ "tailscale.com/types/topics"
"tailscale.com/types/views"
"tailscale.com/util/backoff"
"tailscale.com/util/checkchange"
@@ -95,8 +95,10 @@ const networkLoggerUploadTimeout = 5 * time.Second
type userspaceEngine struct {
// eventBus will eventually become required, but for now may be nil.
- eventBus *eventbus.Bus
- eventClient *eventbus.Client
+ eventBus *eventbus.Bus
+ eventClient *eventbus.Client
+ tunStatusPub *eventbus.Publisher[topics.TUNStatusChange]
+ peerRecvActivityPub *eventbus.Publisher[topics.PeerRecvActivity]
logf logger.Logf
wgLogger *wglog.Logger // a wireguard-go logging wrapper
@@ -142,13 +144,12 @@ type userspaceEngine struct {
lastStatusPollTime mono.Time // last time we polled the engine status
reconfigureVPN func() error // or nil
- mu sync.Mutex // guards following; see lock order comment below
- netMap *netmap.NetworkMap // or nil
- closing bool // Close was called (even if we're still closing)
- statusCallback StatusCallback
- peerSequence views.Slice[key.NodePublic]
- endpoints []tailcfg.Endpoint
- pendOpen map[flowtrackTuple]*pendingOpenFlow // see pendopen.go
+ mu sync.Mutex // guards following; see lock order comment below
+ netMap *netmap.NetworkMap // or nil
+ closing bool // Close was called (even if we're still closing)
+ peerSequence views.Slice[key.NodePublic]
+ endpoints []tailcfg.Endpoint
+ pendOpen map[flowtrackTuple]*pendingOpenFlow // see pendopen.go
// pongCallback is the map of response handlers waiting for disco or TSMP
// pong callbacks. The map key is a random slice of bytes.
@@ -391,25 +392,16 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error)
logf("link state: %+v", e.netMon.InterfaceState())
- endpointsFn := func(endpoints []tailcfg.Endpoint) {
- e.mu.Lock()
- e.endpoints = append(e.endpoints[:0], endpoints...)
- e.mu.Unlock()
-
- e.RequestStatus()
- }
magicsockOpts := magicsock.Options{
- EventBus: e.eventBus,
- Logf: logf,
- Port: conf.ListenPort,
- EndpointsFunc: endpointsFn,
- DERPActiveFunc: e.RequestStatus,
- IdleFunc: e.tundev.IdleDuration,
- NetMon: e.netMon,
- HealthTracker: e.health,
- Metrics: conf.Metrics,
- ControlKnobs: conf.ControlKnobs,
- PeerByKeyFunc: e.PeerByKey,
+ EventBus: e.eventBus,
+ Logf: logf,
+ Port: conf.ListenPort,
+ IdleFunc: e.tundev.IdleDuration,
+ NetMon: e.netMon,
+ HealthTracker: e.health,
+ Metrics: conf.Metrics,
+ ControlKnobs: conf.ControlKnobs,
+ PeerByKeyFunc: e.PeerByKey,
}
if buildfeatures.HasLazyWG {
magicsockOpts.NoteRecvActivity = e.noteRecvActivity
@@ -477,22 +469,6 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error)
})
go func() {
- up := false
- for event := range e.tundev.EventsUpDown() {
- if event&tun.EventUp != 0 && !up {
- e.logf("external route: up")
- e.RequestStatus()
- up = true
- }
- if event&tun.EventDown != 0 && up {
- e.logf("external route: down")
- e.RequestStatus()
- up = false
- }
- }
- }()
-
- go func() {
select {
case <-e.wgdev.Wait():
e.mu.Lock()
@@ -547,10 +523,29 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error)
e.linkChange(&cd)
})
e.eventClient = ec
+ e.tunStatusPub = eventbus.Publish[topics.TUNStatusChange](ec)
+ e.peerRecvActivityPub = eventbus.Publish[topics.PeerRecvActivity](ec)
+ go e.publishTUNSTatusLoop()
e.logf("Engine created.")
return e, nil
}
+func (e *userspaceEngine) publishTUNSTatusLoop() {
+ up := false
+ for event := range e.tundev.EventsUpDown() {
+ if event&tun.EventUp != 0 && !up {
+ e.logf("external route: up")
+ e.tunStatusPub.Publish(topics.TUNStatusChange{Up: true})
+ up = true
+ }
+ if event&tun.EventDown != 0 && up {
+ e.logf("external route: down")
+ e.tunStatusPub.Publish(topics.TUNStatusChange{Up: true})
+ up = false
+ }
+ }
+}
+
// echoRespondToAll is an inbound post-filter responding to all echo requests.
func echoRespondToAll(p *packet.Parsed, t *tstun.Wrapper, gro *gro.GRO) (filter.Response, *gro.GRO) {
if p.IsEchoRequest() {
@@ -673,7 +668,7 @@ func (e *userspaceEngine) noteRecvActivity(nk key.NodePublic) {
// tailscaled alone did not, hence this.
if e.lastStatusPollTime.IsZero() || now.Sub(e.lastStatusPollTime) >= statusPollInterval {
e.lastStatusPollTime = now
- go e.RequestStatus()
+ e.peerRecvActivityPub.Publish(topics.PeerRecvActivity{PeerKey: nk})
}
// If the last activity time jumped a bunch (say, at least
@@ -940,10 +935,7 @@ func (e *userspaceEngine) ResetAndStop() (*Status, error) {
}
bo := backoff.NewBackoff("UserspaceEngineResetAndStop", e.logf, 1*time.Second)
for {
- st, err := e.getStatus()
- if err != nil {
- return nil, err
- }
+ st := e.GetStatus()
if len(st.Peers) == 0 && st.DERPs == 0 {
return st, nil
}
@@ -1180,18 +1172,6 @@ func (e *userspaceEngine) SetJailedFilter(filt *filter.Filter) {
e.tundev.SetJailedFilter(filt)
}
-func (e *userspaceEngine) SetStatusCallback(cb StatusCallback) {
- e.mu.Lock()
- defer e.mu.Unlock()
- e.statusCallback = cb
-}
-
-func (e *userspaceEngine) getStatusCallback() StatusCallback {
- e.mu.Lock()
- defer e.mu.Unlock()
- return e.statusCallback
-}
-
var ErrEngineClosing = errors.New("engine closing; no status")
func (e *userspaceEngine) PeerByKey(pubKey key.NodePublic) (_ wgint.Peer, ok bool) {
@@ -1221,7 +1201,13 @@ func (e *userspaceEngine) getPeerStatusLite(pk key.NodePublic) (status ipnstate.
return status, true
}
-func (e *userspaceEngine) getStatus() (*Status, error) {
+func (e *userspaceEngine) NumConfiguredPeers() int {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+ return e.peerSequence.Len()
+}
+
+func (e *userspaceEngine) GetStatus() *Status {
// Grab derpConns before acquiring wgLock to not violate lock ordering;
// the DERPs method acquires magicsock.Conn.mu.
// (See comment in userspaceEngine's declaration.)
@@ -1230,60 +1216,22 @@ func (e *userspaceEngine) getStatus() (*Status, error) {
e.mu.Lock()
closing := e.closing
peerKeys := e.peerSequence
- localAddrs := slices.Clone(e.endpoints)
e.mu.Unlock()
if closing {
- return nil, ErrEngineClosing
+ return new(Status)
}
- peers := make([]ipnstate.PeerStatusLite, 0, peerKeys.Len())
+ st := &Status{
+ DERPs: derpConns,
+ Peers: make([]ipnstate.PeerStatusLite, 0, peerKeys.Len()),
+ }
for _, key := range peerKeys.All() {
if status, ok := e.getPeerStatusLite(key); ok {
- peers = append(peers, status)
+ st.Peers = append(st.Peers, status)
}
}
-
- return &Status{
- AsOf: time.Now(),
- LocalAddrs: localAddrs,
- Peers: peers,
- DERPs: derpConns,
- }, nil
-}
-
-func (e *userspaceEngine) RequestStatus() {
- // This is slightly tricky. e.getStatus() can theoretically get
- // blocked inside wireguard for a while, and RequestStatus() is
- // sometimes called from a goroutine, so we don't want a lot of
- // them hanging around. On the other hand, requesting multiple
- // status updates simultaneously is pointless anyway; they will
- // all say the same thing.
-
- // Enqueue at most one request. If one is in progress already, this
- // adds one more to the queue. If one has been requested but not
- // started, it is a no-op.
- select {
- case e.reqCh <- struct{}{}:
- default:
- }
-
- // Dequeue at most one request. Another thread may have already
- // dequeued the request we enqueued above, which is fine, since the
- // information is guaranteed to be at least as recent as the current
- // call to RequestStatus().
- select {
- case <-e.reqCh:
- s, err := e.getStatus()
- if s == nil && err == nil {
- e.logf("[unexpected] RequestStatus: both s and err are nil")
- return
- }
- if cb := e.getStatusCallback(); cb != nil {
- cb(s, err)
- }
- default:
- }
+ return st
}
func (e *userspaceEngine) Close() {
@@ -1388,11 +1336,7 @@ func (e *userspaceEngine) SetNetworkMap(nm *netmap.NetworkMap) {
}
func (e *userspaceEngine) UpdateStatus(sb *ipnstate.StatusBuilder) {
- st, err := e.getStatus()
- if err != nil {
- e.logf("wgengine: getStatus: %v", err)
- return
- }
+ st := e.GetStatus()
if sb.WantPeers {
for _, ps := range st.Peers {
sb.AddPeer(ps.NodeKey, &ipnstate.PeerStatus{