diff options
| author | Andrew Dunham <andrew@tailscale.com> | 2022-10-24 19:18:47 -0400 |
|---|---|---|
| committer | Andrew Dunham <andrew@tailscale.com> | 2022-10-24 19:29:36 -0400 |
| commit | d608fcd7ca6e886eee2d5edadd8a189d9c37d438 (patch) | |
| tree | 2a954c6b233c3bc3efb19da573d6e243f197d51a /wgengine/monitor/monitor.go | |
| parent | 9f39c3b10f7f0f9b995c396da7a0c84ad62b53be (diff) | |
| download | tailscale-andrew/monitor-link-change.tar.xz tailscale-andrew/monitor-link-change.zip | |
wgengine/monitor: add monitor for link change eventsandrew/monitor-link-change
Change-Id: I45100c7a5b785ad6824080fe0a38751e3d246eaa
Signed-off-by: Andrew Dunham <andrew@tailscale.com>
Diffstat (limited to 'wgengine/monitor/monitor.go')
| -rw-r--r-- | wgengine/monitor/monitor.go | 98 |
1 files changed, 80 insertions, 18 deletions
diff --git a/wgengine/monitor/monitor.go b/wgengine/monitor/monitor.go index ba099196d..c3a9804aa 100644 --- a/wgengine/monitor/monitor.go +++ b/wgengine/monitor/monitor.go @@ -10,13 +10,16 @@ package monitor import ( "encoding/json" "errors" + "net" "net/netip" "runtime" "sync" "time" + "golang.org/x/exp/slices" "tailscale.com/net/interfaces" "tailscale.com/types/logger" + "tailscale.com/util/mak" ) // pollWallTimeInterval is how often we check the time to check @@ -64,19 +67,21 @@ type Mon struct { change chan struct{} stop chan struct{} // closed on Stop - mu sync.Mutex // guards all following fields - cbs map[*callbackHandle]ChangeFunc - ruleDelCB map[*callbackHandle]RuleDeleteCallback - ifState *interfaces.State - gwValid bool // whether gw and gwSelfIP are valid - gw netip.Addr // our gateway's IP - gwSelfIP netip.Addr // our own IP address (that corresponds to gw) - started bool - closed bool - goroutines sync.WaitGroup - wallTimer *time.Timer // nil until Started; re-armed AfterFunc per tick - lastWall time.Time - timeJumped bool // whether we need to send a changed=true after a big time jump + mu sync.Mutex // guards all following fields + cbs map[*callbackHandle]ChangeFunc + ruleDelCB map[*callbackHandle]RuleDeleteCallback + linkChangedCB map[*callbackHandle]LinkChangedCallback + ifState *interfaces.State + ifState2 []*net.Interface + gwValid bool // whether gw and gwSelfIP are valid + gw netip.Addr // our gateway's IP + gwSelfIP netip.Addr // our own IP address (that corresponds to gw) + started bool + closed bool + goroutines sync.WaitGroup + wallTimer *time.Timer // nil until Started; re-armed AfterFunc per tick + lastWall time.Time + timeJumped bool // whether we need to send a changed=true after a big time jump } // New instantiates and starts a monitoring instance. @@ -179,6 +184,26 @@ func (m *Mon) RegisterRuleDeleteCallback(callback RuleDeleteCallback) (unregiste } } +// LinkChangedCallback is a callback when a network link changes. +type LinkChangedCallback func(iif *net.Interface, deleted bool) + +// RegisterLinkChangedCallback adds a callback to the set of parties to be +// notified (in their own goroutine) whenever a link (a.k.a. "interface") is +// changed. +// +// To remove this callback, call unregister or close the monitor. +func (m *Mon) RegisterLinkChangedCallback(callback LinkChangedCallback) (unregister func()) { + handle := new(callbackHandle) + m.mu.Lock() + defer m.mu.Unlock() + mak.Set(&m.linkChangedCB, handle, callback) + return func() { + m.mu.Lock() + defer m.mu.Unlock() + delete(m.linkChangedCB, handle) + } +} + // Start starts the monitor. // A monitor can only be started & closed once. func (m *Mon) Start() { @@ -256,6 +281,7 @@ func (m *Mon) stopped() bool { // the change channel of changes, and stopping when a stop is issued. func (m *Mon) pump() { defer m.goroutines.Done() +pumpLoop: for !m.stopped() { msg, err := m.om.Receive() if err != nil { @@ -265,14 +291,18 @@ func (m *Mon) pump() { // Keep retrying while we're not closed. m.logf("error from link monitor: %v", err) time.Sleep(time.Second) - continue + continue pumpLoop } - if rdm, ok := msg.(ipRuleDeletedMessage); ok { - m.notifyRuleDeleted(rdm) - continue + switch v := msg.(type) { + case ipRuleDeletedMessage: + m.notifyRuleDeleted(v) + continue pumpLoop + case newLinkMessage: + m.notifyLinkChanged(v) + continue pumpLoop } if msg.ignore() { - continue + continue pumpLoop } m.InjectEvent() } @@ -286,6 +316,38 @@ func (m *Mon) notifyRuleDeleted(rdm ipRuleDeletedMessage) { } } +func (m *Mon) notifyLinkChanged(nlm newLinkMessage) { + m.mu.Lock() + defer m.mu.Unlock() + for _, cb := range m.linkChangedCB { + go cb(nlm.Link, nlm.Delete) + } + + // Update our cached state + updated := false + for i, iif := range m.ifState2 { + if iif.Index == nlm.Link.Index { + if nlm.Delete { + m.ifState2 = slices.Delete(m.ifState2, i, i) + } else { + m.ifState2[i] = nlm.Link + } + updated = true + break + } + } + if updated { + return + } + + // Need to append + // TODO(andrew): insert sorted instead of insert then sort? + m.ifState2 = append(m.ifState2, nlm.Link) + slices.SortFunc(m.ifState2, func(x, y *net.Interface) bool { + return x.Index < y.Index + }) +} + // isInterestingInterface reports whether the provided interface should be // considered when checking for network state changes. // The ips parameter should be the IPs of the provided interface. |
