summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorjulianknodt <julianknodt@gmail.com>2021-06-18 14:44:46 -0700
committerjulianknodt <julianknodt@gmail.com>2021-07-08 10:43:38 -0700
commit1bc7c0ce7616e1f22a3034377dd8e2bd622946fb (patch)
tree58aa5f97088fae39401888fc6378562fdf2f4092
parent944e967a734017215d5c2614f2674365cd7ae87d (diff)
downloadtailscale-jknodt/periodic_probe.tar.xz
tailscale-jknodt/periodic_probe.zip
Remove old portmapping codejknodt/periodic_probe
Signed-off-by: julianknodt <julianknodt@gmail.com>
-rw-r--r--net/portmapper/portmapper.go31
-rw-r--r--net/portmapper/portmapper_test.go6
-rw-r--r--net/portmapper/probe.go344
-rw-r--r--syncs/syncs.go59
-rw-r--r--wgengine/magicsock/magicsock.go23
5 files changed, 170 insertions, 293 deletions
diff --git a/net/portmapper/portmapper.go b/net/portmapper/portmapper.go
index d2deee536..938cd39d5 100644
--- a/net/portmapper/portmapper.go
+++ b/net/portmapper/portmapper.go
@@ -63,7 +63,7 @@ type Client struct {
localPort uint16
pmpMapping *pmpMapping // non-nil if we have a PMP mapping
- *Prober // non-nil once the prober has started
+ prober *Prober // non-nil once probe has been called.
}
// HaveMapping reports whether we have a current valid mapping.
@@ -131,6 +131,9 @@ func (c *Client) Close() error {
if c.closed {
return nil
}
+ if c.prober != nil {
+ c.prober.Close()
+ }
c.closed = true
c.invalidateMappingsLocked(true)
// TODO: close some future ever-listening UDP socket(s),
@@ -181,6 +184,24 @@ func (c *Client) invalidateMappingsLocked(releaseOld bool) {
c.uPnPSawTime = time.Time{}
}
+// Probe will assess the network for the presence of portmapping services.
+func (c *Client) Probe(ctx context.Context) (ProbeResult, error) {
+ c.mu.Lock()
+ if c.prober == nil {
+ c.initProberLocked(ctx)
+ }
+ c.mu.Unlock()
+ return c.prober.Complete()
+}
+
+func (c *Client) StartProbing() {
+ c.mu.Lock()
+ if c.prober == nil {
+ c.initProberLocked(context.Background())
+ }
+ c.mu.Unlock()
+}
+
func (c *Client) sawPMPRecently() bool {
c.mu.Lock()
defer c.mu.Unlock()
@@ -435,19 +456,25 @@ func parsePMPResponse(pkt []byte) (res pmpResponse, ok bool) {
return res, true
}
+// ProbeResults indicates which services are present after a probe.
+// The presense of services may change over time, so it represents the presense
+// of these items at a given time.
type ProbeResult struct {
PCP bool
PMP bool
UPnP bool
}
+// oldProbe is the old API for probing, retained in order to ensure back-compatibility.
+// It's currently used in TestProberEquivalent.
+//
// Probe returns a summary of which port mapping services are
// available on the network.
//
// If a probe has run recently and there haven't been any network changes since,
// the returned result might be server from the Client's cache, without
// sending any network traffic.
-func (c *Client) Probe(ctx context.Context) (res ProbeResult, err error) {
+func (c *Client) oldProbe(ctx context.Context) (res ProbeResult, err error) {
gw, myIP, ok := c.gatewayAndSelfIP()
if !ok {
return res, ErrGatewayNotFound
diff --git a/net/portmapper/portmapper_test.go b/net/portmapper/portmapper_test.go
index 995347fa9..7c5da438d 100644
--- a/net/portmapper/portmapper_test.go
+++ b/net/portmapper/portmapper_test.go
@@ -59,12 +59,12 @@ func TestProberEquivalent(t *testing.T) {
}
c := NewClient(t.Logf)
c.SetLocalPort(1234)
- res, err := c.Probe(context.Background())
+ res, err := c.oldProbe(context.Background())
if err != nil {
return
}
- proberRes, proberErr := c.NewProber(context.Background()).StatusBlock()
- if err == nil && proberErr != nil {
+ proberRes, proberErr := c.Probe(context.Background())
+ if proberErr != nil {
t.Errorf("prober returned err while probe did not: %v, %v", err, proberErr)
}
if res.PCP && !proberRes.PCP {
diff --git a/net/portmapper/probe.go b/net/portmapper/probe.go
index dca7647b8..83123884d 100644
--- a/net/portmapper/probe.go
+++ b/net/portmapper/probe.go
@@ -1,332 +1,114 @@
// Copyright (c) 2021 Tailscale Inc & AUTHORS All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
+
package portmapper
import (
"context"
- "net"
- "sync"
"time"
- "go4.org/mem"
- "inet.af/netaddr"
- "tailscale.com/net/netns"
+ "tailscale.com/syncs"
)
+// Prober periodically pings the network and checks for port-mapping services.
type Prober struct {
- // pause signals the probe to either pause temporarily (true), or stop entirely (false)
- // to restart the probe, send another pause to it.
- pause chan<- bool
+ // stop will stop the prober
+ stop func()
- PMP *ProbeSubResult
- PCP *ProbeSubResult
- UPnP *ProbeSubResult
-}
+ // Each of the SubResults below is intended to expose whether a specific service is available
+ // for use on a client, and the most recent seen time. Should not be modified externally, and
+ // will be periodically updated.
-// NewProber creates a new prober for a given client. Should not be called concurrently.
-//
-// It is not currently the only method to probe the network, so that it can be tested for
-// compatibility with the prior method.
-func (c *Client) NewProber(ctx context.Context) (p *Prober) {
- if c.Prober != nil {
- return c.Prober
- }
- pause := make(chan bool)
- p = &Prober{
- pause: pause,
+ // PMP stores the result of probing pmp services and is populated by the prober.
+ PMP syncs.WaitableResult
+ // PCP stores the result of probing pcp services and is populated by the prober.
+ PCP syncs.WaitableResult
+ // UPnP stores the result of probing pcp services and is populated by the prober.
+ UPnP syncs.WaitableResult
+}
- PMP: NewProbeSubResult(),
- PCP: NewProbeSubResult(),
- UPnP: NewProbeSubResult(),
+// initProberLocked will start a prober if it does not exist on the given portmapping client.
+// The prober will run until the context terminates or stop is called, probing whether services
+// are available periodically. c.mu must be held.
+func (c *Client) initProberLocked(ctx context.Context) {
+ stop := make(chan struct{})
+ p := &Prober{
+ PMP: syncs.NewWaitableResult(),
+ PCP: syncs.NewWaitableResult(),
+ UPnP: syncs.NewWaitableResult(),
+ stop: func() { close(stop) },
}
- c.Prober = p
-
+ c.prober = p
go func() {
- defer p.PMP.Set(false, nil)
- defer p.PCP.Set(false, nil)
for {
- pmp_ctx, cancel := context.WithTimeout(ctx, portMapServiceTimeout)
- hasPCP, hasPMP, err := c.probePMPAndPCP(pmp_ctx)
- if err != nil {
- if ctx.Err() != nil {
- err = nil
- // the global context has passed, exit cleanly
- cancel()
- return
- }
- if pmp_ctx.Err() == context.DeadlineExceeded {
- err = nil
- }
- }
- cancel()
- p.PMP.Set(hasPMP, err)
- p.PCP.Set(hasPCP, err)
-
- t := time.NewTimer(trustServiceStillAvailableDuration * 3 / 4)
+ res, err := c.oldProbe(ctx)
+ p.PMP.Set(res.PMP, err)
+ p.PCP.Set(res.PCP, err)
+ p.UPnP.Set(res.UPnP, err)
select {
- case should_pause := <-pause:
- if !should_pause {
- t.Stop()
- return
- }
- restart := <-pause
- if !restart {
- t.Stop()
- return
- }
- case <-t.C: // break through and retry the connection
+ case <-time.After(trustServiceStillAvailableDuration * 3 / 4):
+ case <-ctx.Done():
+ return
+ case <-stop:
+ return
}
}
}()
-
- go func() {
- defer p.UPnP.Set(false, nil)
- for {
- upnp_ctx, cancel := context.WithTimeout(ctx, portMapServiceTimeout)
- hasUPnP, err := c.probeUPnP(upnp_ctx)
- if err != nil {
- if ctx.Err() != nil {
- // the global context has passed, exit cleanly
- cancel()
- return
- }
- if upnp_ctx.Err() == context.DeadlineExceeded {
- err = nil
- }
- }
- cancel()
- t := time.NewTimer(trustServiceStillAvailableDuration * 3 / 4)
- p.UPnP.Set(hasUPnP, err)
-
- select {
- case should_pause := <-pause:
- if !should_pause {
- t.Stop()
- return
- }
- restart := <-pause
- if !restart {
- t.Stop()
- return
- }
- case <-t.C: // break through and retry the connection
- }
- }
- }()
-
- return
}
-// Stop gracefully turns the Prober off, completing the current probes before exiting.
-func (p *Prober) Stop() { close(p.pause) }
-
-// Pauses the prober if currently running, or starts if it was previously paused.
-func (p *Prober) Toggle() { p.pause <- true }
+// Close gracefully turns the Prober off, completing the current probes before exiting.
+//
+// Calling stop Close multiple times will have no additional effects.
+func (p *Prober) Close() { p.stop() }
-// CurrentStatus returns the current results of the prober, regardless of whether they have
-// completed or not.
-func (p *Prober) CurrentStatus() (res ProbeResult, err error) {
- hasPMP, errPMP := p.PMP.PresentCurrent()
+// Current returns the current results of the prober, regardless of whether they have completed
+// or not. The returned probe result returns whether any of the services have been known to be
+// detected and if a value is true it will be available. If any of the services recently
+// returned an error due to inability to reach it, some failure of protocol, it will also be
+// returned, but if one of the probe results returned true it can still be used. Notably, it is
+// not an error to not yet have completed, or for a limited number of services to be available.
+func (p *Prober) Current() (ProbeResult, error) {
+ var res ProbeResult
+ _, hasPMP, errPMP := p.PMP.Peek()
res.PMP = hasPMP
- err = errPMP
+ err := errPMP
- hasUPnP, errUPnP := p.UPnP.PresentCurrent()
+ _, hasUPnP, errUPnP := p.UPnP.Peek()
res.UPnP = hasUPnP
if err == nil {
err = errUPnP
}
- hasPCP, errPCP := p.PCP.PresentCurrent()
+ _, hasPCP, errPCP := p.PCP.Peek()
res.PCP = hasPCP
if err == nil {
err = errPCP
}
- return
+ return res, err
}
-func (p *Prober) StatusBlock() (res ProbeResult, err error) {
- hasPMP, errPMP := p.PMP.PresentBlock()
+// Complete blocks the caller until probing all services has completed, regardless of success
+// or failure. It returns the result of probing each of UPnP, PMP, and PCP, and if there is an
+// error on any service, it will be returned. If any result is true, that service completed without
+// error and can be used.
+func (p *Prober) Complete() (ProbeResult, error) {
+ var res ProbeResult
+ hasPMP, errPMP := p.PMP.Get()
res.PMP = hasPMP
- err = errPMP
+ err := errPMP
- hasUPnP, errUPnP := p.UPnP.PresentBlock()
+ hasUPnP, errUPnP := p.UPnP.Get()
res.UPnP = hasUPnP
if err == nil {
err = errUPnP
}
- hasPCP, errPCP := p.PCP.PresentBlock()
+ hasPCP, errPCP := p.PCP.Get()
res.PCP = hasPCP
if err == nil {
err = errPCP
}
- return
-}
-
-type ProbeSubResult struct {
- cond *sync.Cond
- // If this probe has finished, regardless of success or failure
- completed bool
-
- // whether or not this feature is present
- present bool
- // most recent error
- err error
-
- // time we last saw it to be available.
- sawTime time.Time
-}
-
-func NewProbeSubResult() *ProbeSubResult {
- return &ProbeSubResult{
- cond: &sync.Cond{
- L: &sync.Mutex{},
- },
- }
-}
-
-// PresentBlock blocks until the probe completes, then returns the result.
-func (psr *ProbeSubResult) PresentBlock() (bool, error) {
- psr.cond.L.Lock()
- defer psr.cond.L.Unlock()
- for !psr.completed {
- psr.cond.Wait()
- }
- return psr.present, psr.err
-}
-
-// PresentCurrent returns the current state, regardless whether or not the probe has completed.
-func (psr *ProbeSubResult) PresentCurrent() (bool, error) {
- psr.cond.L.Lock()
- defer psr.cond.L.Unlock()
- present := psr.present && psr.sawTime.After(time.Now().Add(-trustServiceStillAvailableDuration))
- return present, psr.err
-}
-
-func (psr *ProbeSubResult) Set(present bool, err error) {
- saw := time.Now()
- psr.cond.L.Lock()
- psr.sawTime = saw
- psr.completed = true
- psr.err = err
- psr.present = present
- psr.cond.L.Unlock()
-
- psr.cond.Broadcast()
-}
-
-func (c *Client) probePMPAndPCP(ctx context.Context) (pcp bool, pmp bool, err error) {
- gw, myIP, ok := c.gatewayAndSelfIP()
- if !ok {
- return false, false, ErrGatewayNotFound
- }
-
- uc, err := netns.Listener().ListenPacket(ctx, "udp4", ":0")
- if err != nil {
- c.logf("ProbePCP/PMP: %v", err)
- return false, false, err
- }
- defer uc.Close()
- defer closeCloserOnContextDone(ctx, uc)()
-
- pcpAddr := netaddr.IPPortFrom(gw, pcpPort).UDPAddr()
- pmpAddr := netaddr.IPPortFrom(gw, pmpPort).UDPAddr()
-
- // Don't send probes to services that we recently learned (for
- // the same gw/myIP) are available. See
- // https://github.com/tailscale/tailscale/issues/1001
- if c.sawPMPRecently() {
- pmp = true
- } else {
- uc.WriteTo(pmpReqExternalAddrPacket, pmpAddr)
- }
- if c.sawPCPRecently() {
- pcp = true
- } else {
- uc.WriteTo(pcpAnnounceRequest(myIP), pcpAddr)
- }
-
- buf := make([]byte, 1500)
- pcpHeard := false // true when we get any PCP response
- for {
- if pcpHeard && pmp {
- // Nothing more to discover.
- return
- }
- n, _, err := uc.ReadFrom(buf)
- if err != nil {
- if ctx.Err() == context.DeadlineExceeded {
- err = nil
- }
- return pcp, pmp, err
- }
- if pres, ok := parsePCPResponse(buf[:n]); ok {
- if pres.OpCode == pcpOpReply|pcpOpAnnounce {
- pcpHeard = true
- //c.mu.Lock()
- //c.pcpSawTime = time.Now()
- //c.mu.Unlock()
- switch pres.ResultCode {
- case pcpCodeOK:
- c.logf("Got PCP response: epoch: %v", pres.Epoch)
- pcp = true
- continue
- case pcpCodeNotAuthorized:
- // A PCP service is running, but refuses to
- // provide port mapping services.
- pcp = false
- continue
- default:
- // Fall through to unexpected log line.
- }
- }
- c.logf("unexpected PCP probe response: %+v", pres)
- }
- if pres, ok := parsePMPResponse(buf[:n]); ok {
- if pres.OpCode == pmpOpReply|pmpOpMapPublicAddr && pres.ResultCode == pmpCodeOK {
- c.logf("Got PMP response; IP: %v, epoch: %v", pres.PublicAddr, pres.SecondsSinceEpoch)
- pmp = true
- c.mu.Lock()
- c.pmpPubIP = pres.PublicAddr
- c.pmpPubIPTime = time.Now()
- c.pmpLastEpoch = pres.SecondsSinceEpoch
- c.mu.Unlock()
- continue
- }
- c.logf("unexpected PMP probe response: %+v", pres)
- }
- }
-}
-
-func (c *Client) probeUPnP(ctx context.Context) (upnp bool, err error) {
- gw, _, ok := c.gatewayAndSelfIP()
- if !ok {
- return false, ErrGatewayNotFound
- }
- if c.sawUPnPRecently() {
- return true, nil
- }
- upnpAddr := netaddr.IPPortFrom(gw, upnpPort).UDPAddr()
- uc, err := netns.Listener().ListenPacket(ctx, "udp4", ":0")
- if deadline, ok := ctx.Deadline(); ok {
- uc.SetDeadline(deadline)
- }
- if err != nil {
- c.logf("ProbeUPnP: %v", err)
- return false, err
- }
- defer uc.Close()
- uc.WriteTo(uPnPPacket, upnpAddr)
- buf := make([]byte, 1500)
- n, _, err := uc.ReadFrom(buf)
- if err != nil {
- if err.(net.Error).Timeout() || err.(net.Error).Temporary() {
- err = nil
- }
- return false, err
- }
- upnp = mem.Contains(mem.B(buf[:n]), mem.S(":InternetGatewayDevice:"))
- return upnp, err
+ return res, err
}
diff --git a/syncs/syncs.go b/syncs/syncs.go
index 46861af63..c305ba550 100644
--- a/syncs/syncs.go
+++ b/syncs/syncs.go
@@ -7,7 +7,9 @@ package syncs
import (
"context"
+ "sync"
"sync/atomic"
+ "time"
)
// ClosedChan returns a channel that's already closed.
@@ -135,3 +137,60 @@ func (s Semaphore) TryAcquire() bool {
func (s Semaphore) Release() {
<-s.c
}
+
+// WaitableResult allows for blocking on a repeated, fallible operation until it completes,
+// and getting the result.
+type WaitableResult struct {
+ // sync.Cond.L guards all the fields below, and is used to wait until completed is true.
+ cond *sync.Cond
+ // Completed is set after the first operation has completed, and should be used in conjunction
+ // with `cond` above in order to block.
+ completed bool
+
+ result bool // result is whether or not the most recent operation succeeded or not.
+ err error // err indicates the most recent error during the operation.
+
+ // sawTime is the last time this result was updated.
+ sawTime time.Time
+}
+
+func NewWaitableResult() WaitableResult {
+ return WaitableResult{
+ cond: &sync.Cond{
+ L: &sync.Mutex{},
+ },
+ }
+}
+
+// Get blocks until an operation completes, then returns true if it was a success.
+// Otherwise, it returns returns false, with a possible error.
+func (wr *WaitableResult) Get() (bool, error) {
+ wr.cond.L.Lock()
+ defer wr.cond.L.Unlock()
+ for !wr.completed {
+ wr.cond.Wait()
+ }
+ return wr.result, wr.err
+}
+
+// Current returns the current state of the result without blocking, regardless of whether or
+// not it has completed, as well as the completion time of the operation.
+func (wr *WaitableResult) Peek() (time.Time, bool, error) {
+ wr.cond.L.Lock()
+ defer wr.cond.L.Unlock()
+ return wr.sawTime, wr.result, wr.err
+}
+
+// Set should be called when an operation has completed. It will unblock any items waiting for
+// the completed operation, and overwrite previous the results of previous operations.
+func (wr *WaitableResult) Set(result bool, err error) {
+ saw := time.Now()
+ wr.cond.L.Lock()
+ wr.sawTime = saw
+ wr.completed = true
+ wr.err = err
+ wr.result = result
+ wr.cond.L.Unlock()
+
+ wr.cond.Broadcast()
+}
diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go
index a2c200be2..1422b0e95 100644
--- a/wgengine/magicsock/magicsock.go
+++ b/wgengine/magicsock/magicsock.go
@@ -487,6 +487,7 @@ func NewConn(opts Options) (*Conn, error) {
c.simulatedNetwork = opts.SimulatedNetwork
c.disableLegacy = opts.DisableLegacyNetworking
c.portMapper = portmapper.NewClient(logger.WithPrefix(c.logf, "portmapper: "))
+ c.portMapper.StartProbing()
if opts.LinkMonitor != nil {
c.portMapper.SetGatewayLookupFunc(opts.LinkMonitor.GatewayAndSelfIP)
}
@@ -985,6 +986,15 @@ func (c *Conn) determineEndpoints(ctx context.Context) ([]tailcfg.Endpoint, erro
return nil, err
}
+ portmap := make(chan netaddr.IPPort, 1)
+ go func() {
+ if ext, err := c.portMapper.CreateOrGetMapping(ctx); err == nil {
+ portmap <- ext
+ } else if !portmapper.IsNoMappingError(err) {
+ c.logf("portmapper: %v", err)
+ }
+ }()
+
already := make(map[netaddr.IPPort]tailcfg.EndpointType) // endpoint -> how it was found
var eps []tailcfg.Endpoint // unique endpoints
@@ -1002,13 +1012,6 @@ func (c *Conn) determineEndpoints(ctx context.Context) ([]tailcfg.Endpoint, erro
}
}
- if ext, err := c.portMapper.CreateOrGetMapping(ctx); err == nil {
- addAddr(ext, tailcfg.EndpointPortmapped)
- c.setNetInfoHavePortMap()
- } else if !portmapper.IsNoMappingError(err) {
- c.logf("portmapper: %v", err)
- }
-
if nr.GlobalV4 != "" {
addAddr(ipp(nr.GlobalV4), tailcfg.EndpointSTUN)
@@ -1050,6 +1053,12 @@ func (c *Conn) determineEndpoints(ctx context.Context) ([]tailcfg.Endpoint, erro
// Do not offer addresses on other local interfaces.
addAddr(ipp(localAddr.String()), tailcfg.EndpointLocal)
}
+ select {
+ case ext := <-portmap:
+ addAddr(ext, tailcfg.EndpointPortmapped)
+ c.setNetInfoHavePortMap()
+ case <-time.After(200 * time.Millisecond):
+ }
// Note: the endpoints are intentionally returned in priority order,
// from "farthest but most reliable" to "closest but least