diff options
Diffstat (limited to 'tailsync/tailsyncimpl/session.go')
| -rw-r--r-- | tailsync/tailsyncimpl/session.go | 192 |
1 files changed, 184 insertions, 8 deletions
diff --git a/tailsync/tailsyncimpl/session.go b/tailsync/tailsyncimpl/session.go index 7297aed6f..487ac1ed0 100644 --- a/tailsync/tailsyncimpl/session.go +++ b/tailsync/tailsyncimpl/session.go @@ -6,6 +6,7 @@ package tailsyncimpl import ( "context" "fmt" + "net/http" "os" "path/filepath" "sync" @@ -25,6 +26,10 @@ type sessionRunner struct { root *tailsync.Root idx *index.Index + transport http.RoundTripper + peerURL tailsync.PeerURLFunc + client *syncClient + mu sync.RWMutex state tailsync.SessionState conflicts []tailsync.ConflictInfo @@ -37,16 +42,18 @@ type sessionRunner struct { done chan struct{} } -func newSessionRunner(logf logger.Logf, session *tailsync.Session, root *tailsync.Root) *sessionRunner { +func newSessionRunner(logf logger.Logf, session *tailsync.Session, root *tailsync.Root, transport http.RoundTripper, peerURL tailsync.PeerURLFunc) *sessionRunner { _, cancel := context.WithCancel(context.Background()) return &sessionRunner{ - logf: logf, - session: session, - root: root, - idx: index.New(logf), - state: tailsync.SessionStateIdle, - cancel: cancel, - done: make(chan struct{}), + logf: logf, + session: session, + root: root, + idx: index.New(logf), + transport: transport, + peerURL: peerURL, + state: tailsync.SessionStateIdle, + cancel: cancel, + done: make(chan struct{}), } } @@ -88,6 +95,14 @@ func (sr *sessionRunner) run() { sr.setState(tailsync.SessionStateIdle) sr.logf("tailsync: session %s: initial index built with %d files", sr.session.Name, sr.idx.Len()) + // Set up sync client if transport is available. + var lastPushedSeq uint64 + if sr.transport != nil && sr.peerURL != nil { + sr.client = newSyncClient(sr.logf, sr.transport, sr.peerURL, sr.session.PeerID, sr.session.RemoteRoot) + sr.initialReconcile() + lastPushedSeq = sr.idx.LocalSeq() + } + // Process events from watcher. ctx, cancel := context.WithCancel(context.Background()) sr.mu.Lock() @@ -97,6 +112,9 @@ func (sr *sessionRunner) run() { tombstoneTicker := time.NewTicker(1 * time.Hour) defer tombstoneTicker.Stop() + pullTicker := time.NewTicker(5 * time.Second) + defer pullTicker.Stop() + for { select { case <-ctx.Done(): @@ -106,6 +124,13 @@ func (sr *sessionRunner) run() { return } sr.handleEvents(events) + if sr.client != nil { + lastPushedSeq = sr.pushChanges(lastPushedSeq) + } + case <-pullTicker.C: + if sr.client != nil { + sr.pullRemoteChanges() + } case <-tombstoneTicker.C: if n := sr.idx.PurgeTombstones(); n > 0 { sr.logf("tailsync: session %s: purged %d tombstones", sr.session.Name, n) @@ -243,6 +268,157 @@ func (sr *sessionRunner) fullRescan() { } } +func (sr *sessionRunner) pushChanges(lastPushedSeq uint64) uint64 { + entries := sr.idx.ChangedSince(lastPushedSeq) + if len(entries) == 0 { + return lastPushedSeq + } + + if sr.session.Mode == tailsync.ModePull { + return sr.idx.LocalSeq() + } + + applied, err := sr.client.pushFiles(entries, sr.root.Path) + if err != nil { + sr.logf("tailsync: session %s: push error: %v", sr.session.Name, err) + return lastPushedSeq + } + + sr.mu.Lock() + sr.bytesSent += countBytes(entries) + sr.mu.Unlock() + + sr.logf("tailsync: session %s: pushed %d files", sr.session.Name, applied) + return sr.idx.LocalSeq() +} + +func (sr *sessionRunner) pullRemoteChanges() { + if sr.session.Mode == tailsync.ModePush { + return + } + + remoteSeq := sr.idx.RemoteSeq() + entries, err := sr.client.pullChanges(remoteSeq) + if err != nil { + sr.logf("tailsync: session %s: pull error: %v", sr.session.Name, err) + return + } + + for _, entry := range entries { + if entry.Deleted { + absPath := filepath.Join(sr.root.Path, entry.Path) + os.Remove(absPath) + sr.idx.ApplyRemote(entry) + continue + } + + body, _, err := sr.client.pullFile(entry.Path) + if err != nil { + sr.logf("tailsync: session %s: pull file %s error: %v", sr.session.Name, entry.Path, err) + continue + } + + absPath := filepath.Join(sr.root.Path, entry.Path) + mode := entry.Mode + if mode == 0 { + mode = 0o644 + } + err = fileWriter(absPath, body, mode) + body.Close() + if err != nil { + sr.logf("tailsync: session %s: write %s error: %v", sr.session.Name, entry.Path, err) + continue + } + + sr.idx.ApplyRemote(entry) + + sr.mu.Lock() + sr.bytesRecv += entry.Size + sr.mu.Unlock() + + if entry.Sequence > remoteSeq { + remoteSeq = entry.Sequence + } + } + + sr.idx.SetRemoteSeq(remoteSeq) + + if len(entries) > 0 { + sr.logf("tailsync: session %s: pulled %d files", sr.session.Name, len(entries)) + } +} + +func (sr *sessionRunner) initialReconcile() { + remoteEntries, remoteSeq, err := sr.client.getRemoteIndex() + if err != nil { + sr.logf("tailsync: session %s: initial reconcile: remote index error: %v (will use local-only)", sr.session.Name, err) + return + } + + sr.idx.SetRemoteSeq(remoteSeq) + + // Pull files we're missing from remote. + if sr.session.Mode != tailsync.ModePush { + for path, remoteEntry := range remoteEntries { + if remoteEntry.Deleted { + continue + } + localEntry := sr.idx.Get(path) + if localEntry == nil || localEntry.Hash != remoteEntry.Hash { + body, _, err := sr.client.pullFile(path) + if err != nil { + sr.logf("tailsync: session %s: initial pull %s: %v", sr.session.Name, path, err) + continue + } + absPath := filepath.Join(sr.root.Path, path) + mode := remoteEntry.Mode + if mode == 0 { + mode = 0o644 + } + err = fileWriter(absPath, body, mode) + body.Close() + if err != nil { + sr.logf("tailsync: session %s: initial write %s: %v", sr.session.Name, path, err) + continue + } + sr.idx.ApplyRemote(remoteEntry) + } + } + } + + // Push files remote is missing. + if sr.session.Mode != tailsync.ModePull { + var toPush []*tailsync.FileEntry + for path, localEntry := range sr.idx.Entries() { + if localEntry.Deleted { + continue + } + remoteEntry, exists := remoteEntries[path] + if !exists || remoteEntry.Hash != localEntry.Hash { + toPush = append(toPush, localEntry) + } + } + if len(toPush) > 0 { + applied, err := sr.client.pushFiles(toPush, sr.root.Path) + if err != nil { + sr.logf("tailsync: session %s: initial push error: %v", sr.session.Name, err) + } else { + sr.logf("tailsync: session %s: initial push: %d files", sr.session.Name, applied) + } + } + } +} + +func countBytes(entries []*tailsync.FileEntry) int64 { + var total int64 + for _, e := range entries { + if !e.Deleted { + total += e.Size + } + } + return total +} + func (sr *sessionRunner) status() *tailsync.SessionStatus { sr.mu.RLock() defer sr.mu.RUnlock() |
