summaryrefslogtreecommitdiffhomepage
path: root/tailsync/tailsyncimpl/session.go
diff options
context:
space:
mode:
authorJason O'Donnell <2160810+jasonodonnell@users.noreply.github.com>2026-03-07 09:32:06 -0500
committerJason O'Donnell <2160810+jasonodonnell@users.noreply.github.com>2026-03-07 09:32:06 -0500
commit9cce0baa43eddf3e75b31ffecf3649093a07e7a1 (patch)
tree83c304758b8acdfa2f38b38b3e6e133bdc644dbd /tailsync/tailsyncimpl/session.go
parent6e71a15e595a3461c719e5ede6581f9e7b46621f (diff)
downloadtailscale-jasonodonnell/sync.tar.xz
tailscale-jasonodonnell/sync.zip
Diffstat (limited to 'tailsync/tailsyncimpl/session.go')
-rw-r--r--tailsync/tailsyncimpl/session.go192
1 files changed, 184 insertions, 8 deletions
diff --git a/tailsync/tailsyncimpl/session.go b/tailsync/tailsyncimpl/session.go
index 7297aed6f..487ac1ed0 100644
--- a/tailsync/tailsyncimpl/session.go
+++ b/tailsync/tailsyncimpl/session.go
@@ -6,6 +6,7 @@ package tailsyncimpl
import (
"context"
"fmt"
+ "net/http"
"os"
"path/filepath"
"sync"
@@ -25,6 +26,10 @@ type sessionRunner struct {
root *tailsync.Root
idx *index.Index
+ transport http.RoundTripper
+ peerURL tailsync.PeerURLFunc
+ client *syncClient
+
mu sync.RWMutex
state tailsync.SessionState
conflicts []tailsync.ConflictInfo
@@ -37,16 +42,18 @@ type sessionRunner struct {
done chan struct{}
}
-func newSessionRunner(logf logger.Logf, session *tailsync.Session, root *tailsync.Root) *sessionRunner {
+func newSessionRunner(logf logger.Logf, session *tailsync.Session, root *tailsync.Root, transport http.RoundTripper, peerURL tailsync.PeerURLFunc) *sessionRunner {
_, cancel := context.WithCancel(context.Background())
return &sessionRunner{
- logf: logf,
- session: session,
- root: root,
- idx: index.New(logf),
- state: tailsync.SessionStateIdle,
- cancel: cancel,
- done: make(chan struct{}),
+ logf: logf,
+ session: session,
+ root: root,
+ idx: index.New(logf),
+ transport: transport,
+ peerURL: peerURL,
+ state: tailsync.SessionStateIdle,
+ cancel: cancel,
+ done: make(chan struct{}),
}
}
@@ -88,6 +95,14 @@ func (sr *sessionRunner) run() {
sr.setState(tailsync.SessionStateIdle)
sr.logf("tailsync: session %s: initial index built with %d files", sr.session.Name, sr.idx.Len())
+ // Set up sync client if transport is available.
+ var lastPushedSeq uint64
+ if sr.transport != nil && sr.peerURL != nil {
+ sr.client = newSyncClient(sr.logf, sr.transport, sr.peerURL, sr.session.PeerID, sr.session.RemoteRoot)
+ sr.initialReconcile()
+ lastPushedSeq = sr.idx.LocalSeq()
+ }
+
// Process events from watcher.
ctx, cancel := context.WithCancel(context.Background())
sr.mu.Lock()
@@ -97,6 +112,9 @@ func (sr *sessionRunner) run() {
tombstoneTicker := time.NewTicker(1 * time.Hour)
defer tombstoneTicker.Stop()
+ pullTicker := time.NewTicker(5 * time.Second)
+ defer pullTicker.Stop()
+
for {
select {
case <-ctx.Done():
@@ -106,6 +124,13 @@ func (sr *sessionRunner) run() {
return
}
sr.handleEvents(events)
+ if sr.client != nil {
+ lastPushedSeq = sr.pushChanges(lastPushedSeq)
+ }
+ case <-pullTicker.C:
+ if sr.client != nil {
+ sr.pullRemoteChanges()
+ }
case <-tombstoneTicker.C:
if n := sr.idx.PurgeTombstones(); n > 0 {
sr.logf("tailsync: session %s: purged %d tombstones", sr.session.Name, n)
@@ -243,6 +268,157 @@ func (sr *sessionRunner) fullRescan() {
}
}
+func (sr *sessionRunner) pushChanges(lastPushedSeq uint64) uint64 {
+ entries := sr.idx.ChangedSince(lastPushedSeq)
+ if len(entries) == 0 {
+ return lastPushedSeq
+ }
+
+ if sr.session.Mode == tailsync.ModePull {
+ return sr.idx.LocalSeq()
+ }
+
+ applied, err := sr.client.pushFiles(entries, sr.root.Path)
+ if err != nil {
+ sr.logf("tailsync: session %s: push error: %v", sr.session.Name, err)
+ return lastPushedSeq
+ }
+
+ sr.mu.Lock()
+ sr.bytesSent += countBytes(entries)
+ sr.mu.Unlock()
+
+ sr.logf("tailsync: session %s: pushed %d files", sr.session.Name, applied)
+ return sr.idx.LocalSeq()
+}
+
+func (sr *sessionRunner) pullRemoteChanges() {
+ if sr.session.Mode == tailsync.ModePush {
+ return
+ }
+
+ remoteSeq := sr.idx.RemoteSeq()
+ entries, err := sr.client.pullChanges(remoteSeq)
+ if err != nil {
+ sr.logf("tailsync: session %s: pull error: %v", sr.session.Name, err)
+ return
+ }
+
+ for _, entry := range entries {
+ if entry.Deleted {
+ absPath := filepath.Join(sr.root.Path, entry.Path)
+ os.Remove(absPath)
+ sr.idx.ApplyRemote(entry)
+ continue
+ }
+
+ body, _, err := sr.client.pullFile(entry.Path)
+ if err != nil {
+ sr.logf("tailsync: session %s: pull file %s error: %v", sr.session.Name, entry.Path, err)
+ continue
+ }
+
+ absPath := filepath.Join(sr.root.Path, entry.Path)
+ mode := entry.Mode
+ if mode == 0 {
+ mode = 0o644
+ }
+ err = fileWriter(absPath, body, mode)
+ body.Close()
+ if err != nil {
+ sr.logf("tailsync: session %s: write %s error: %v", sr.session.Name, entry.Path, err)
+ continue
+ }
+
+ sr.idx.ApplyRemote(entry)
+
+ sr.mu.Lock()
+ sr.bytesRecv += entry.Size
+ sr.mu.Unlock()
+
+ if entry.Sequence > remoteSeq {
+ remoteSeq = entry.Sequence
+ }
+ }
+
+ sr.idx.SetRemoteSeq(remoteSeq)
+
+ if len(entries) > 0 {
+ sr.logf("tailsync: session %s: pulled %d files", sr.session.Name, len(entries))
+ }
+}
+
+func (sr *sessionRunner) initialReconcile() {
+ remoteEntries, remoteSeq, err := sr.client.getRemoteIndex()
+ if err != nil {
+ sr.logf("tailsync: session %s: initial reconcile: remote index error: %v (will use local-only)", sr.session.Name, err)
+ return
+ }
+
+ sr.idx.SetRemoteSeq(remoteSeq)
+
+ // Pull files we're missing from remote.
+ if sr.session.Mode != tailsync.ModePush {
+ for path, remoteEntry := range remoteEntries {
+ if remoteEntry.Deleted {
+ continue
+ }
+ localEntry := sr.idx.Get(path)
+ if localEntry == nil || localEntry.Hash != remoteEntry.Hash {
+ body, _, err := sr.client.pullFile(path)
+ if err != nil {
+ sr.logf("tailsync: session %s: initial pull %s: %v", sr.session.Name, path, err)
+ continue
+ }
+ absPath := filepath.Join(sr.root.Path, path)
+ mode := remoteEntry.Mode
+ if mode == 0 {
+ mode = 0o644
+ }
+ err = fileWriter(absPath, body, mode)
+ body.Close()
+ if err != nil {
+ sr.logf("tailsync: session %s: initial write %s: %v", sr.session.Name, path, err)
+ continue
+ }
+ sr.idx.ApplyRemote(remoteEntry)
+ }
+ }
+ }
+
+ // Push files remote is missing.
+ if sr.session.Mode != tailsync.ModePull {
+ var toPush []*tailsync.FileEntry
+ for path, localEntry := range sr.idx.Entries() {
+ if localEntry.Deleted {
+ continue
+ }
+ remoteEntry, exists := remoteEntries[path]
+ if !exists || remoteEntry.Hash != localEntry.Hash {
+ toPush = append(toPush, localEntry)
+ }
+ }
+ if len(toPush) > 0 {
+ applied, err := sr.client.pushFiles(toPush, sr.root.Path)
+ if err != nil {
+ sr.logf("tailsync: session %s: initial push error: %v", sr.session.Name, err)
+ } else {
+ sr.logf("tailsync: session %s: initial push: %d files", sr.session.Name, applied)
+ }
+ }
+ }
+}
+
+func countBytes(entries []*tailsync.FileEntry) int64 {
+ var total int64
+ for _, e := range entries {
+ if !e.Deleted {
+ total += e.Size
+ }
+ }
+ return total
+}
+
func (sr *sessionRunner) status() *tailsync.SessionStatus {
sr.mu.RLock()
defer sr.mu.RUnlock()