diff options
Diffstat (limited to 'wgengine/userspace.go')
| -rw-r--r-- | wgengine/userspace.go | 168 |
1 files changed, 56 insertions, 112 deletions
diff --git a/wgengine/userspace.go b/wgengine/userspace.go index 8ad771fc5..9d7449336 100644 --- a/wgengine/userspace.go +++ b/wgengine/userspace.go @@ -15,7 +15,6 @@ import ( "net/netip" "reflect" "runtime" - "slices" "strings" "sync" "time" @@ -46,6 +45,7 @@ import ( "tailscale.com/types/key" "tailscale.com/types/logger" "tailscale.com/types/netmap" + "tailscale.com/types/topics" "tailscale.com/types/views" "tailscale.com/util/backoff" "tailscale.com/util/checkchange" @@ -95,8 +95,10 @@ const networkLoggerUploadTimeout = 5 * time.Second type userspaceEngine struct { // eventBus will eventually become required, but for now may be nil. - eventBus *eventbus.Bus - eventClient *eventbus.Client + eventBus *eventbus.Bus + eventClient *eventbus.Client + tunStatusPub *eventbus.Publisher[topics.TUNStatusChange] + peerRecvActivityPub *eventbus.Publisher[topics.PeerRecvActivity] logf logger.Logf wgLogger *wglog.Logger // a wireguard-go logging wrapper @@ -142,13 +144,12 @@ type userspaceEngine struct { lastStatusPollTime mono.Time // last time we polled the engine status reconfigureVPN func() error // or nil - mu sync.Mutex // guards following; see lock order comment below - netMap *netmap.NetworkMap // or nil - closing bool // Close was called (even if we're still closing) - statusCallback StatusCallback - peerSequence views.Slice[key.NodePublic] - endpoints []tailcfg.Endpoint - pendOpen map[flowtrackTuple]*pendingOpenFlow // see pendopen.go + mu sync.Mutex // guards following; see lock order comment below + netMap *netmap.NetworkMap // or nil + closing bool // Close was called (even if we're still closing) + peerSequence views.Slice[key.NodePublic] + endpoints []tailcfg.Endpoint + pendOpen map[flowtrackTuple]*pendingOpenFlow // see pendopen.go // pongCallback is the map of response handlers waiting for disco or TSMP // pong callbacks. The map key is a random slice of bytes. @@ -391,25 +392,16 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error) logf("link state: %+v", e.netMon.InterfaceState()) - endpointsFn := func(endpoints []tailcfg.Endpoint) { - e.mu.Lock() - e.endpoints = append(e.endpoints[:0], endpoints...) - e.mu.Unlock() - - e.RequestStatus() - } magicsockOpts := magicsock.Options{ - EventBus: e.eventBus, - Logf: logf, - Port: conf.ListenPort, - EndpointsFunc: endpointsFn, - DERPActiveFunc: e.RequestStatus, - IdleFunc: e.tundev.IdleDuration, - NetMon: e.netMon, - HealthTracker: e.health, - Metrics: conf.Metrics, - ControlKnobs: conf.ControlKnobs, - PeerByKeyFunc: e.PeerByKey, + EventBus: e.eventBus, + Logf: logf, + Port: conf.ListenPort, + IdleFunc: e.tundev.IdleDuration, + NetMon: e.netMon, + HealthTracker: e.health, + Metrics: conf.Metrics, + ControlKnobs: conf.ControlKnobs, + PeerByKeyFunc: e.PeerByKey, } if buildfeatures.HasLazyWG { magicsockOpts.NoteRecvActivity = e.noteRecvActivity @@ -477,22 +469,6 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error) }) go func() { - up := false - for event := range e.tundev.EventsUpDown() { - if event&tun.EventUp != 0 && !up { - e.logf("external route: up") - e.RequestStatus() - up = true - } - if event&tun.EventDown != 0 && up { - e.logf("external route: down") - e.RequestStatus() - up = false - } - } - }() - - go func() { select { case <-e.wgdev.Wait(): e.mu.Lock() @@ -547,10 +523,29 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error) e.linkChange(&cd) }) e.eventClient = ec + e.tunStatusPub = eventbus.Publish[topics.TUNStatusChange](ec) + e.peerRecvActivityPub = eventbus.Publish[topics.PeerRecvActivity](ec) + go e.publishTUNSTatusLoop() e.logf("Engine created.") return e, nil } +func (e *userspaceEngine) publishTUNSTatusLoop() { + up := false + for event := range e.tundev.EventsUpDown() { + if event&tun.EventUp != 0 && !up { + e.logf("external route: up") + e.tunStatusPub.Publish(topics.TUNStatusChange{Up: true}) + up = true + } + if event&tun.EventDown != 0 && up { + e.logf("external route: down") + e.tunStatusPub.Publish(topics.TUNStatusChange{Up: true}) + up = false + } + } +} + // echoRespondToAll is an inbound post-filter responding to all echo requests. func echoRespondToAll(p *packet.Parsed, t *tstun.Wrapper, gro *gro.GRO) (filter.Response, *gro.GRO) { if p.IsEchoRequest() { @@ -673,7 +668,7 @@ func (e *userspaceEngine) noteRecvActivity(nk key.NodePublic) { // tailscaled alone did not, hence this. if e.lastStatusPollTime.IsZero() || now.Sub(e.lastStatusPollTime) >= statusPollInterval { e.lastStatusPollTime = now - go e.RequestStatus() + e.peerRecvActivityPub.Publish(topics.PeerRecvActivity{PeerKey: nk}) } // If the last activity time jumped a bunch (say, at least @@ -940,10 +935,7 @@ func (e *userspaceEngine) ResetAndStop() (*Status, error) { } bo := backoff.NewBackoff("UserspaceEngineResetAndStop", e.logf, 1*time.Second) for { - st, err := e.getStatus() - if err != nil { - return nil, err - } + st := e.GetStatus() if len(st.Peers) == 0 && st.DERPs == 0 { return st, nil } @@ -1180,18 +1172,6 @@ func (e *userspaceEngine) SetJailedFilter(filt *filter.Filter) { e.tundev.SetJailedFilter(filt) } -func (e *userspaceEngine) SetStatusCallback(cb StatusCallback) { - e.mu.Lock() - defer e.mu.Unlock() - e.statusCallback = cb -} - -func (e *userspaceEngine) getStatusCallback() StatusCallback { - e.mu.Lock() - defer e.mu.Unlock() - return e.statusCallback -} - var ErrEngineClosing = errors.New("engine closing; no status") func (e *userspaceEngine) PeerByKey(pubKey key.NodePublic) (_ wgint.Peer, ok bool) { @@ -1221,7 +1201,13 @@ func (e *userspaceEngine) getPeerStatusLite(pk key.NodePublic) (status ipnstate. return status, true } -func (e *userspaceEngine) getStatus() (*Status, error) { +func (e *userspaceEngine) NumConfiguredPeers() int { + e.mu.Lock() + defer e.mu.Unlock() + return e.peerSequence.Len() +} + +func (e *userspaceEngine) GetStatus() *Status { // Grab derpConns before acquiring wgLock to not violate lock ordering; // the DERPs method acquires magicsock.Conn.mu. // (See comment in userspaceEngine's declaration.) @@ -1230,60 +1216,22 @@ func (e *userspaceEngine) getStatus() (*Status, error) { e.mu.Lock() closing := e.closing peerKeys := e.peerSequence - localAddrs := slices.Clone(e.endpoints) e.mu.Unlock() if closing { - return nil, ErrEngineClosing + return new(Status) } - peers := make([]ipnstate.PeerStatusLite, 0, peerKeys.Len()) + st := &Status{ + DERPs: derpConns, + Peers: make([]ipnstate.PeerStatusLite, 0, peerKeys.Len()), + } for _, key := range peerKeys.All() { if status, ok := e.getPeerStatusLite(key); ok { - peers = append(peers, status) + st.Peers = append(st.Peers, status) } } - - return &Status{ - AsOf: time.Now(), - LocalAddrs: localAddrs, - Peers: peers, - DERPs: derpConns, - }, nil -} - -func (e *userspaceEngine) RequestStatus() { - // This is slightly tricky. e.getStatus() can theoretically get - // blocked inside wireguard for a while, and RequestStatus() is - // sometimes called from a goroutine, so we don't want a lot of - // them hanging around. On the other hand, requesting multiple - // status updates simultaneously is pointless anyway; they will - // all say the same thing. - - // Enqueue at most one request. If one is in progress already, this - // adds one more to the queue. If one has been requested but not - // started, it is a no-op. - select { - case e.reqCh <- struct{}{}: - default: - } - - // Dequeue at most one request. Another thread may have already - // dequeued the request we enqueued above, which is fine, since the - // information is guaranteed to be at least as recent as the current - // call to RequestStatus(). - select { - case <-e.reqCh: - s, err := e.getStatus() - if s == nil && err == nil { - e.logf("[unexpected] RequestStatus: both s and err are nil") - return - } - if cb := e.getStatusCallback(); cb != nil { - cb(s, err) - } - default: - } + return st } func (e *userspaceEngine) Close() { @@ -1388,11 +1336,7 @@ func (e *userspaceEngine) SetNetworkMap(nm *netmap.NetworkMap) { } func (e *userspaceEngine) UpdateStatus(sb *ipnstate.StatusBuilder) { - st, err := e.getStatus() - if err != nil { - e.logf("wgengine: getStatus: %v", err) - return - } + st := e.GetStatus() if sb.WantPeers { for _, ps := range st.Peers { sb.AddPeer(ps.NodeKey, &ipnstate.PeerStatus{ |
