summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorAnton Tolchanov <anton@tailscale.com>2023-10-25 21:41:24 +0100
committerAnton Tolchanov <anton@tailscale.com>2023-10-30 16:47:34 +0000
commit90c26fb09f2dd7f3fdbebb981f060a433a941314 (patch)
treec22936f50f53ce8a529ce23296985dc1281ca62c
parent03d4d859bf38ea553b72e0c213366659979a4b26 (diff)
downloadtailscale-knyar/derpmesh.tar.xz
tailscale-knyar/derpmesh.zip
derp/derphttp: add watch reconnection tests from #9719knyar/derpmesh
Co-authored-by: Val <valerie@tailscale.com> Signed-off-by: Anton Tolchanov <anton@tailscale.com>
-rw-r--r--derp/derphttp/derphttp_test.go191
-rw-r--r--derp/derphttp/mesh_client.go3
2 files changed, 193 insertions, 1 deletions
diff --git a/derp/derphttp/derphttp_test.go b/derp/derphttp/derphttp_test.go
index b121c097c..0ec14e0ee 100644
--- a/derp/derphttp/derphttp_test.go
+++ b/derp/derphttp/derphttp_test.go
@@ -9,6 +9,7 @@ import (
"crypto/tls"
"net"
"net/http"
+ "net/netip"
"sync"
"testing"
"time"
@@ -206,3 +207,193 @@ func TestPing(t *testing.T) {
t.Fatalf("Ping: %v", err)
}
}
+
+func newTestServer(t *testing.T, k key.NodePrivate) (serverURL string, s *derp.Server) {
+ s = derp.NewServer(k, t.Logf)
+ httpsrv := &http.Server{
+ TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)),
+ Handler: Handler(s),
+ }
+
+ ln, err := net.Listen("tcp4", "localhost:0")
+ if err != nil {
+ t.Fatal(err)
+ }
+ serverURL = "http://" + ln.Addr().String()
+ s.SetMeshKey("1234")
+
+ go func() {
+ if err := httpsrv.Serve(ln); err != nil {
+ if err == http.ErrServerClosed {
+ t.Logf("server closed")
+ return
+ }
+ panic(err)
+ }
+ }()
+ return
+}
+
+func newWatcherClient(t *testing.T, watcherPrivateKey key.NodePrivate, serverToWatchURL string) (c *Client) {
+ c, err := NewClient(watcherPrivateKey, serverToWatchURL, t.Logf)
+ if err != nil {
+ t.Fatal(err)
+ }
+ c.MeshKey = "1234"
+ return
+}
+
+// breakConnection breaks the connection, which should trigger a reconnect.
+func (c *Client) breakConnection(brokenClient *derp.Client) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ if c.client != brokenClient {
+ return
+ }
+ if c.netConn != nil {
+ c.netConn.Close()
+ c.netConn = nil
+ }
+ c.client = nil
+}
+
+// Test that a watcher connection successfully reconnects and processes peer
+// updates after a different thread breaks and reconnects the connection, while
+// the watcher is waiting on recv().
+func TestBreakWatcherConnRecv(t *testing.T) {
+ var wg sync.WaitGroup
+ defer wg.Wait()
+ // Make the watcher server
+ serverPrivateKey1 := key.NewNode()
+ _, s1 := newTestServer(t, serverPrivateKey1)
+ defer s1.Close()
+
+ // Make the watched server
+ serverPrivateKey2 := key.NewNode()
+ serverURL2, s2 := newTestServer(t, serverPrivateKey2)
+ defer s2.Close()
+
+ // Make the watcher (but it is not connected yet)
+ watcher1 := newWatcherClient(t, serverPrivateKey1, serverURL2)
+ defer watcher1.Close()
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ watcherChan := make(chan int, 1)
+
+ // Set the wait time after a connection fails to much lower
+ origRetryInterval := retryInterval
+ retryInterval = 50 * time.Millisecond
+ defer func() { retryInterval = origRetryInterval }()
+
+ // Start the watcher thread (which connects to the watched server)
+ wg.Add(1) // To avoid using t.Logf after the test ends. See https://golang.org/issue/40343
+ go func() {
+ defer wg.Done()
+ var peers int
+ add := func(k key.NodePublic, _ netip.AddrPort) {
+ t.Logf("add: %v", k.ShortString())
+ peers++
+ // Signal that the watcher has run
+ watcherChan <- peers
+ }
+ remove := func(k key.NodePublic) { t.Logf("remove: %v", k.ShortString()); peers-- }
+
+ watcher1.RunWatchConnectionLoop(ctx, serverPrivateKey1.Public(), t.Logf, add, remove)
+ }()
+
+ timer := time.NewTimer(5 * time.Second)
+ defer timer.Stop()
+
+ // Wait for the watcher to run, then break the connection and check if it
+ // reconnected and received peer updates.
+ for i := 0; i < 10; i++ {
+ select {
+ case peers := <-watcherChan:
+ if peers != 1 {
+ t.Fatal("wrong number of peers added during watcher connection")
+ }
+ case <-timer.C:
+ t.Fatalf("watcher did not process the peer update")
+ }
+ watcher1.breakConnection(watcher1.client)
+ // re-establish connection by sending a packet
+ watcher1.ForwardPacket(key.NodePublic{}, key.NodePublic{}, []byte("bogus"))
+
+ timer.Reset(5 * time.Second)
+ }
+}
+
+// Test that a watcher connection successfully reconnects and processes peer
+// updates after a different thread breaks and reconnects the connection, while
+// the watcher is not waiting on recv().
+func TestBreakWatcherConn(t *testing.T) {
+ var wg sync.WaitGroup
+ defer wg.Wait()
+ // Make the watcher server
+ serverPrivateKey1 := key.NewNode()
+ _, s1 := newTestServer(t, serverPrivateKey1)
+ defer s1.Close()
+
+ // Make the watched server
+ serverPrivateKey2 := key.NewNode()
+ serverURL2, s2 := newTestServer(t, serverPrivateKey2)
+ defer s2.Close()
+
+ // Make the watcher (but it is not connected yet)
+ watcher1 := newWatcherClient(t, serverPrivateKey1, serverURL2)
+ defer watcher1.Close()
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ watcherChan := make(chan int, 1)
+ breakerChan := make(chan bool, 1)
+
+ // Set the wait time after a connection fails to much lower
+ origRetryInterval := retryInterval
+ retryInterval = 50 * time.Millisecond
+ defer func() { retryInterval = origRetryInterval }()
+
+ // Start the watcher thread (which connects to the watched server)
+ wg.Add(1) // To avoid using t.Logf after the test ends. See https://golang.org/issue/40343
+ go func() {
+ defer wg.Done()
+ var peers int
+ add := func(k key.NodePublic, _ netip.AddrPort) {
+ t.Logf("add: %v", k.ShortString())
+ peers++
+ // Signal that the watcher has run
+ watcherChan <- peers
+ // Wait for breaker to run
+ <-breakerChan
+ }
+ remove := func(k key.NodePublic) { t.Logf("remove: %v", k.ShortString()); peers-- }
+
+ watcher1.RunWatchConnectionLoop(ctx, serverPrivateKey1.Public(), t.Logf, add, remove)
+ }()
+
+ timer := time.NewTimer(5 * time.Second)
+ defer timer.Stop()
+
+ // Wait for the watcher to run, then break the connection and check if it
+ // reconnected and received peer updates.
+ for i := 0; i < 10; i++ {
+ select {
+ case peers := <-watcherChan:
+ if peers != 1 {
+ t.Fatal("wrong number of peers added during watcher connection")
+ }
+ case <-timer.C:
+ t.Fatalf("watcher did not process the peer update")
+ }
+ watcher1.breakConnection(watcher1.client)
+ // re-establish connection by sending a packet
+ watcher1.ForwardPacket(key.NodePublic{}, key.NodePublic{}, []byte("bogus"))
+ // signal that the breaker is done
+ breakerChan <- true
+
+ timer.Reset(5 * time.Second)
+ }
+}
diff --git a/derp/derphttp/mesh_client.go b/derp/derphttp/mesh_client.go
index 2793fd068..9e9e518e1 100644
--- a/derp/derphttp/mesh_client.go
+++ b/derp/derphttp/mesh_client.go
@@ -14,6 +14,8 @@ import (
"tailscale.com/types/logger"
)
+var retryInterval = 5 * time.Second
+
// RunWatchConnectionLoop loops until ctx is done, sending
// WatchConnectionChanges and subscribing to connection changes.
//
@@ -42,7 +44,6 @@ func (c *Client) RunWatchConnectionLoop(ctx context.Context, ignoreServerKey key
infoLogf = logger.Discard
}
logf := c.logf
- const retryInterval = 5 * time.Second
const statusInterval = 10 * time.Second
var (
mu sync.Mutex