summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorJason O'Donnell <2160810+jasonodonnell@users.noreply.github.com>2026-03-07 09:32:06 -0500
committerJason O'Donnell <2160810+jasonodonnell@users.noreply.github.com>2026-03-07 09:32:06 -0500
commit9cce0baa43eddf3e75b31ffecf3649093a07e7a1 (patch)
tree83c304758b8acdfa2f38b38b3e6e133bdc644dbd
parent6e71a15e595a3461c719e5ede6581f9e7b46621f (diff)
downloadtailscale-jasonodonnell/sync.tar.xz
tailscale-jasonodonnell/sync.zip
-rw-r--r--ipn/ipnlocal/sync.go24
-rw-r--r--tailcfg/tailcfg.go4
-rw-r--r--tailsync/service.go7
-rw-r--r--tailsync/tailsyncimpl/service.go135
-rw-r--r--tailsync/tailsyncimpl/session.go192
-rw-r--r--tailsync/tailsyncimpl/syncclient.go186
6 files changed, 523 insertions, 25 deletions
diff --git a/ipn/ipnlocal/sync.go b/ipn/ipnlocal/sync.go
index 0daaa92bb..58bb156cf 100644
--- a/ipn/ipnlocal/sync.go
+++ b/ipn/ipnlocal/sync.go
@@ -10,6 +10,28 @@ import (
"tailscale.com/tailsync"
)
+// SyncSetTransport configures the sync service with the PeerAPI transport.
+func (b *LocalBackend) SyncSetTransport() {
+ svc, ok := b.sys.FileSync.GetOK()
+ if !ok {
+ return
+ }
+ transport := b.Dialer().PeerAPITransport()
+ peerURL := func(peerID string) string {
+ cn := b.currentNode()
+ for _, p := range cn.Peers() {
+ if string(p.StableID()) == peerID || p.DisplayName(false) == peerID || p.Name() == peerID {
+ base := cn.PeerAPIBase(p)
+ if base != "" {
+ return base
+ }
+ }
+ }
+ return ""
+ }
+ svc.SetTransport(transport, peerURL)
+}
+
// SyncSharingEnabled reports whether sharing sync roots via Tailsync is
// enabled. This is currently based on checking for the sync:share node
// attribute.
@@ -50,6 +72,8 @@ func (b *LocalBackend) SyncSetSession(session *tailsync.Session) error {
if !ok {
return tailsync.ErrSyncNotEnabled
}
+ // Ensure transport is configured before starting session.
+ b.SyncSetTransport()
return svc.SetSession(session)
}
diff --git a/tailcfg/tailcfg.go b/tailcfg/tailcfg.go
index c90a66f0f..c71ba728e 100644
--- a/tailcfg/tailcfg.go
+++ b/tailcfg/tailcfg.go
@@ -1555,10 +1555,10 @@ const (
// PeerCapabilityTailsync grants the ability for a peer to sync files
// with this node's exported sync roots.
- PeerCapabilityTailsync PeerCapability = "tailscale.com/cap/sync"
+ PeerCapabilityTailsync PeerCapability = "jasonodonnell.com/cap/tailsync"
// PeerCapabilityTailsyncSharer indicates that a peer has the ability to
// export sync roots to us.
- PeerCapabilityTailsyncSharer PeerCapability = "tailscale.com/cap/sync-sharer"
+ PeerCapabilityTailsyncSharer PeerCapability = "jasonodonnell.com/cap/tailsync-sharer"
// PeerCapabilityKubernetes grants a peer Kubernetes-specific
// capabilities, such as the ability to impersonate specific Tailscale
diff --git a/tailsync/service.go b/tailsync/service.go
index f1981079b..089517e8b 100644
--- a/tailsync/service.go
+++ b/tailsync/service.go
@@ -9,6 +9,9 @@ import (
"strings"
)
+// PeerURLFunc returns the PeerAPI base URL for a given peer ID (stable node ID or hostname).
+type PeerURLFunc func(peerID string) string
+
var (
ErrSyncNotEnabled = errors.New("tailsync not enabled")
ErrInvalidRootName = errors.New("root names may only contain the letters a-z, digits 0-9, underscore _, or hyphens -")
@@ -41,6 +44,10 @@ type Service interface {
// GetSessionStatus returns the status of a named session.
GetSessionStatus(name string) (*SessionStatus, error)
+ // SetTransport configures the HTTP transport and peer URL resolver
+ // used for reaching remote peers via PeerAPI.
+ SetTransport(transport http.RoundTripper, peerURL PeerURLFunc)
+
// ServeHTTPWithPerms handles incoming PeerAPI sync requests.
ServeHTTPWithPerms(permissions Permissions, w http.ResponseWriter, r *http.Request)
diff --git a/tailsync/tailsyncimpl/service.go b/tailsync/tailsyncimpl/service.go
index 83a78a7bc..f5ab30752 100644
--- a/tailsync/tailsyncimpl/service.go
+++ b/tailsync/tailsyncimpl/service.go
@@ -5,9 +5,12 @@
package tailsyncimpl
import (
+ "bytes"
"encoding/json"
"fmt"
"io"
+ "mime"
+ "mime/multipart"
"net/http"
"os"
"path/filepath"
@@ -25,10 +28,12 @@ const ConflictDir = ".tailsync-conflicts"
type Service struct {
logf logger.Logf
- mu sync.RWMutex
- roots map[string]*tailsync.Root
- sessions map[string]*sessionRunner
- closed bool
+ mu sync.RWMutex
+ roots map[string]*tailsync.Root
+ sessions map[string]*sessionRunner
+ closed bool
+ transport http.RoundTripper
+ peerURL tailsync.PeerURLFunc
}
// NewService creates a new tailsync Service.
@@ -43,6 +48,13 @@ func NewService(logf logger.Logf) *Service {
}
}
+func (s *Service) SetTransport(transport http.RoundTripper, peerURL tailsync.PeerURLFunc) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ s.transport = transport
+ s.peerURL = peerURL
+}
+
func (s *Service) SetRoot(root *tailsync.Root) error {
name, err := tailsync.NormalizeRootName(root.Name)
if err != nil {
@@ -131,7 +143,7 @@ func (s *Service) SetSession(session *tailsync.Session) error {
existing.stop()
}
- sr := newSessionRunner(s.logf, session, root)
+ sr := newSessionRunner(s.logf, session, root, s.transport, s.peerURL)
s.sessions[session.Name] = sr
go sr.run()
@@ -196,6 +208,8 @@ func (s *Service) ServeHTTPWithPerms(permissions tailsync.Permissions, w http.Re
s.handleRemotePush(permissions, w, r)
case "pull":
s.handleRemotePull(permissions, w, r)
+ case "file":
+ s.handleRemoteFile(permissions, w, r)
default:
http.Error(w, "unknown action", http.StatusNotFound)
}
@@ -247,12 +261,6 @@ func (s *Service) handleRemoteIndex(permissions tailsync.Permissions, w http.Res
w.Write(data)
}
-// pushRequest is the JSON payload for a push.
-type pushRequest struct {
- RootName string `json:"root"`
- Entries []*tailsync.FileEntry `json:"entries"`
-}
-
func (s *Service) handleRemotePush(permissions tailsync.Permissions, w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
@@ -273,14 +281,63 @@ func (s *Service) handleRemotePush(permissions tailsync.Permissions, w http.Resp
return
}
- var req pushRequest
- if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
- http.Error(w, err.Error(), http.StatusBadRequest)
+ contentType := r.Header.Get("Content-Type")
+ mediaType, params, err := mime.ParseMediaType(contentType)
+ if err != nil || !strings.HasPrefix(mediaType, "multipart/") {
+ http.Error(w, "expected multipart content", http.StatusBadRequest)
+ return
+ }
+
+ mr := multipart.NewReader(r.Body, params["boundary"])
+
+ // First part: JSON metadata.
+ metaPart, err := mr.NextPart()
+ if err != nil {
+ http.Error(w, "read metadata part: "+err.Error(), http.StatusBadRequest)
+ return
+ }
+ var entries []*tailsync.FileEntry
+ if err := json.NewDecoder(metaPart).Decode(&entries); err != nil {
+ http.Error(w, "decode metadata: "+err.Error(), http.StatusBadRequest)
return
}
+ metaPart.Close()
+
+ // Build a map of paths that need file data.
+ needsData := make(map[string]bool)
+ for _, entry := range entries {
+ if !entry.Deleted && !entry.IsSymlink {
+ needsData[entry.Path] = true
+ }
+ }
+
+ // Read file data parts.
+ fileData := make(map[string][]byte)
+ for len(needsData) > 0 {
+ part, err := mr.NextPart()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ s.logf("tailsync: push: read file part: %v", err)
+ break
+ }
+ path := part.Header.Get("X-Tailsync-Path")
+ if path == "" {
+ path = part.FileName()
+ }
+ data, err := io.ReadAll(part)
+ part.Close()
+ if err != nil {
+ s.logf("tailsync: push: read file data for %s: %v", path, err)
+ continue
+ }
+ fileData[path] = data
+ delete(needsData, path)
+ }
applied := 0
- for _, entry := range req.Entries {
+ for _, entry := range entries {
if entry.Deleted {
absPath := filepath.Join(root.Path, entry.Path)
if err := os.Remove(absPath); err != nil && !os.IsNotExist(err) {
@@ -289,6 +346,24 @@ func (s *Service) handleRemotePush(permissions tailsync.Permissions, w http.Resp
applied++
continue
}
+ if entry.IsSymlink {
+ applied++
+ continue
+ }
+ data, ok := fileData[entry.Path]
+ if !ok {
+ s.logf("tailsync: push: missing file data for %s", entry.Path)
+ continue
+ }
+ absPath := filepath.Join(root.Path, entry.Path)
+ mode := entry.Mode
+ if mode == 0 {
+ mode = 0o644
+ }
+ if err := fileWriter(absPath, bytes.NewReader(data), mode); err != nil {
+ s.logf("tailsync: push: write %s: %v", entry.Path, err)
+ continue
+ }
applied++
}
@@ -327,6 +402,36 @@ func (s *Service) handleRemotePull(permissions tailsync.Permissions, w http.Resp
json.NewEncoder(w).Encode(entries)
}
+func (s *Service) handleRemoteFile(permissions tailsync.Permissions, w http.ResponseWriter, r *http.Request) {
+ if r.Method != http.MethodGet {
+ http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
+ return
+ }
+ rootName := r.URL.Query().Get("root")
+ if permissions.For(rootName) == tailsync.PermissionNone {
+ http.Error(w, "not found", http.StatusNotFound)
+ return
+ }
+ relPath := r.URL.Query().Get("path")
+ if relPath == "" {
+ http.Error(w, "missing path", http.StatusBadRequest)
+ return
+ }
+ s.mu.RLock()
+ root, ok := s.roots[rootName]
+ s.mu.RUnlock()
+ if !ok {
+ http.Error(w, "root not found", http.StatusNotFound)
+ return
+ }
+ absPath := filepath.Join(root.Path, filepath.FromSlash(relPath))
+ if !strings.HasPrefix(absPath, root.Path) {
+ http.Error(w, "invalid path", http.StatusBadRequest)
+ return
+ }
+ http.ServeFile(w, r, absPath)
+}
+
func (s *Service) findSessionForRoot(rootName string) *sessionRunner {
for _, sr := range s.sessions {
if sr.session.LocalRoot == rootName {
diff --git a/tailsync/tailsyncimpl/session.go b/tailsync/tailsyncimpl/session.go
index 7297aed6f..487ac1ed0 100644
--- a/tailsync/tailsyncimpl/session.go
+++ b/tailsync/tailsyncimpl/session.go
@@ -6,6 +6,7 @@ package tailsyncimpl
import (
"context"
"fmt"
+ "net/http"
"os"
"path/filepath"
"sync"
@@ -25,6 +26,10 @@ type sessionRunner struct {
root *tailsync.Root
idx *index.Index
+ transport http.RoundTripper
+ peerURL tailsync.PeerURLFunc
+ client *syncClient
+
mu sync.RWMutex
state tailsync.SessionState
conflicts []tailsync.ConflictInfo
@@ -37,16 +42,18 @@ type sessionRunner struct {
done chan struct{}
}
-func newSessionRunner(logf logger.Logf, session *tailsync.Session, root *tailsync.Root) *sessionRunner {
+func newSessionRunner(logf logger.Logf, session *tailsync.Session, root *tailsync.Root, transport http.RoundTripper, peerURL tailsync.PeerURLFunc) *sessionRunner {
_, cancel := context.WithCancel(context.Background())
return &sessionRunner{
- logf: logf,
- session: session,
- root: root,
- idx: index.New(logf),
- state: tailsync.SessionStateIdle,
- cancel: cancel,
- done: make(chan struct{}),
+ logf: logf,
+ session: session,
+ root: root,
+ idx: index.New(logf),
+ transport: transport,
+ peerURL: peerURL,
+ state: tailsync.SessionStateIdle,
+ cancel: cancel,
+ done: make(chan struct{}),
}
}
@@ -88,6 +95,14 @@ func (sr *sessionRunner) run() {
sr.setState(tailsync.SessionStateIdle)
sr.logf("tailsync: session %s: initial index built with %d files", sr.session.Name, sr.idx.Len())
+ // Set up sync client if transport is available.
+ var lastPushedSeq uint64
+ if sr.transport != nil && sr.peerURL != nil {
+ sr.client = newSyncClient(sr.logf, sr.transport, sr.peerURL, sr.session.PeerID, sr.session.RemoteRoot)
+ sr.initialReconcile()
+ lastPushedSeq = sr.idx.LocalSeq()
+ }
+
// Process events from watcher.
ctx, cancel := context.WithCancel(context.Background())
sr.mu.Lock()
@@ -97,6 +112,9 @@ func (sr *sessionRunner) run() {
tombstoneTicker := time.NewTicker(1 * time.Hour)
defer tombstoneTicker.Stop()
+ pullTicker := time.NewTicker(5 * time.Second)
+ defer pullTicker.Stop()
+
for {
select {
case <-ctx.Done():
@@ -106,6 +124,13 @@ func (sr *sessionRunner) run() {
return
}
sr.handleEvents(events)
+ if sr.client != nil {
+ lastPushedSeq = sr.pushChanges(lastPushedSeq)
+ }
+ case <-pullTicker.C:
+ if sr.client != nil {
+ sr.pullRemoteChanges()
+ }
case <-tombstoneTicker.C:
if n := sr.idx.PurgeTombstones(); n > 0 {
sr.logf("tailsync: session %s: purged %d tombstones", sr.session.Name, n)
@@ -243,6 +268,157 @@ func (sr *sessionRunner) fullRescan() {
}
}
+func (sr *sessionRunner) pushChanges(lastPushedSeq uint64) uint64 {
+ entries := sr.idx.ChangedSince(lastPushedSeq)
+ if len(entries) == 0 {
+ return lastPushedSeq
+ }
+
+ if sr.session.Mode == tailsync.ModePull {
+ return sr.idx.LocalSeq()
+ }
+
+ applied, err := sr.client.pushFiles(entries, sr.root.Path)
+ if err != nil {
+ sr.logf("tailsync: session %s: push error: %v", sr.session.Name, err)
+ return lastPushedSeq
+ }
+
+ sr.mu.Lock()
+ sr.bytesSent += countBytes(entries)
+ sr.mu.Unlock()
+
+ sr.logf("tailsync: session %s: pushed %d files", sr.session.Name, applied)
+ return sr.idx.LocalSeq()
+}
+
+func (sr *sessionRunner) pullRemoteChanges() {
+ if sr.session.Mode == tailsync.ModePush {
+ return
+ }
+
+ remoteSeq := sr.idx.RemoteSeq()
+ entries, err := sr.client.pullChanges(remoteSeq)
+ if err != nil {
+ sr.logf("tailsync: session %s: pull error: %v", sr.session.Name, err)
+ return
+ }
+
+ for _, entry := range entries {
+ if entry.Deleted {
+ absPath := filepath.Join(sr.root.Path, entry.Path)
+ os.Remove(absPath)
+ sr.idx.ApplyRemote(entry)
+ continue
+ }
+
+ body, _, err := sr.client.pullFile(entry.Path)
+ if err != nil {
+ sr.logf("tailsync: session %s: pull file %s error: %v", sr.session.Name, entry.Path, err)
+ continue
+ }
+
+ absPath := filepath.Join(sr.root.Path, entry.Path)
+ mode := entry.Mode
+ if mode == 0 {
+ mode = 0o644
+ }
+ err = fileWriter(absPath, body, mode)
+ body.Close()
+ if err != nil {
+ sr.logf("tailsync: session %s: write %s error: %v", sr.session.Name, entry.Path, err)
+ continue
+ }
+
+ sr.idx.ApplyRemote(entry)
+
+ sr.mu.Lock()
+ sr.bytesRecv += entry.Size
+ sr.mu.Unlock()
+
+ if entry.Sequence > remoteSeq {
+ remoteSeq = entry.Sequence
+ }
+ }
+
+ sr.idx.SetRemoteSeq(remoteSeq)
+
+ if len(entries) > 0 {
+ sr.logf("tailsync: session %s: pulled %d files", sr.session.Name, len(entries))
+ }
+}
+
+func (sr *sessionRunner) initialReconcile() {
+ remoteEntries, remoteSeq, err := sr.client.getRemoteIndex()
+ if err != nil {
+ sr.logf("tailsync: session %s: initial reconcile: remote index error: %v (will use local-only)", sr.session.Name, err)
+ return
+ }
+
+ sr.idx.SetRemoteSeq(remoteSeq)
+
+ // Pull files we're missing from remote.
+ if sr.session.Mode != tailsync.ModePush {
+ for path, remoteEntry := range remoteEntries {
+ if remoteEntry.Deleted {
+ continue
+ }
+ localEntry := sr.idx.Get(path)
+ if localEntry == nil || localEntry.Hash != remoteEntry.Hash {
+ body, _, err := sr.client.pullFile(path)
+ if err != nil {
+ sr.logf("tailsync: session %s: initial pull %s: %v", sr.session.Name, path, err)
+ continue
+ }
+ absPath := filepath.Join(sr.root.Path, path)
+ mode := remoteEntry.Mode
+ if mode == 0 {
+ mode = 0o644
+ }
+ err = fileWriter(absPath, body, mode)
+ body.Close()
+ if err != nil {
+ sr.logf("tailsync: session %s: initial write %s: %v", sr.session.Name, path, err)
+ continue
+ }
+ sr.idx.ApplyRemote(remoteEntry)
+ }
+ }
+ }
+
+ // Push files remote is missing.
+ if sr.session.Mode != tailsync.ModePull {
+ var toPush []*tailsync.FileEntry
+ for path, localEntry := range sr.idx.Entries() {
+ if localEntry.Deleted {
+ continue
+ }
+ remoteEntry, exists := remoteEntries[path]
+ if !exists || remoteEntry.Hash != localEntry.Hash {
+ toPush = append(toPush, localEntry)
+ }
+ }
+ if len(toPush) > 0 {
+ applied, err := sr.client.pushFiles(toPush, sr.root.Path)
+ if err != nil {
+ sr.logf("tailsync: session %s: initial push error: %v", sr.session.Name, err)
+ } else {
+ sr.logf("tailsync: session %s: initial push: %d files", sr.session.Name, applied)
+ }
+ }
+ }
+}
+
+func countBytes(entries []*tailsync.FileEntry) int64 {
+ var total int64
+ for _, e := range entries {
+ if !e.Deleted {
+ total += e.Size
+ }
+ }
+ return total
+}
+
func (sr *sessionRunner) status() *tailsync.SessionStatus {
sr.mu.RLock()
defer sr.mu.RUnlock()
diff --git a/tailsync/tailsyncimpl/syncclient.go b/tailsync/tailsyncimpl/syncclient.go
new file mode 100644
index 000000000..6b8c9d08a
--- /dev/null
+++ b/tailsync/tailsyncimpl/syncclient.go
@@ -0,0 +1,186 @@
+// Copyright (c) Tailscale Inc & contributors
+// SPDX-License-Identifier: BSD-3-Clause
+
+package tailsyncimpl
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "io"
+ "mime/multipart"
+ "net/http"
+ "net/textproto"
+ "os"
+ "path/filepath"
+ "strconv"
+
+ "tailscale.com/tailsync"
+ "tailscale.com/types/logger"
+)
+
+// syncClient handles network communication with a remote peer's tailsync PeerAPI.
+type syncClient struct {
+ logf logger.Logf
+ transport http.RoundTripper
+ peerURL tailsync.PeerURLFunc
+ peerID string
+ rootName string
+}
+
+func newSyncClient(logf logger.Logf, transport http.RoundTripper, peerURL tailsync.PeerURLFunc, peerID, rootName string) *syncClient {
+ return &syncClient{
+ logf: logf,
+ transport: transport,
+ peerURL: peerURL,
+ peerID: peerID,
+ rootName: rootName,
+ }
+}
+
+func (c *syncClient) baseURL() string {
+ return c.peerURL(c.peerID) + "/v0/sync"
+}
+
+func (c *syncClient) httpClient() *http.Client {
+ return &http.Client{Transport: c.transport}
+}
+
+// pushFiles sends changed files to the remote peer.
+// entries are the metadata, rootPath is the local root to read files from.
+func (c *syncClient) pushFiles(entries []*tailsync.FileEntry, rootPath string) (int, error) {
+ if len(entries) == 0 {
+ return 0, nil
+ }
+
+ // Build multipart body: first part is JSON metadata, subsequent parts are file data.
+ var buf bytes.Buffer
+ mw := multipart.NewWriter(&buf)
+
+ // Write metadata part.
+ metaHeader := make(textproto.MIMEHeader)
+ metaHeader.Set("Content-Type", "application/json")
+ metaHeader.Set("Content-Disposition", `form-data; name="metadata"`)
+ metaPart, err := mw.CreatePart(metaHeader)
+ if err != nil {
+ return 0, fmt.Errorf("create metadata part: %w", err)
+ }
+ if err := json.NewEncoder(metaPart).Encode(entries); err != nil {
+ return 0, fmt.Errorf("encode metadata: %w", err)
+ }
+
+ // Write file data parts for non-deleted entries.
+ for _, entry := range entries {
+ if entry.Deleted || entry.IsSymlink {
+ continue
+ }
+ fileHeader := make(textproto.MIMEHeader)
+ fileHeader.Set("Content-Type", "application/octet-stream")
+ fileHeader.Set("Content-Disposition", fmt.Sprintf(`form-data; name="file"; filename=%q`, entry.Path))
+ fileHeader.Set("X-Tailsync-Path", entry.Path)
+ fileHeader.Set("X-Tailsync-Size", strconv.FormatInt(entry.Size, 10))
+ fileHeader.Set("X-Tailsync-Mode", strconv.FormatUint(uint64(entry.Mode), 8))
+
+ filePart, err := mw.CreatePart(fileHeader)
+ if err != nil {
+ return 0, fmt.Errorf("create file part for %s: %w", entry.Path, err)
+ }
+
+ absPath := filepath.Join(rootPath, entry.Path)
+ f, err := os.Open(absPath)
+ if err != nil {
+ c.logf("tailsync: push: skip %s: %v", entry.Path, err)
+ continue
+ }
+ _, err = io.Copy(filePart, f)
+ f.Close()
+ if err != nil {
+ return 0, fmt.Errorf("copy file %s: %w", entry.Path, err)
+ }
+ }
+
+ mw.Close()
+
+ url := c.baseURL() + "/push?root=" + c.rootName
+ req, err := http.NewRequest("POST", url, &buf)
+ if err != nil {
+ return 0, fmt.Errorf("create request: %w", err)
+ }
+ req.Header.Set("Content-Type", mw.FormDataContentType())
+
+ resp, err := c.httpClient().Do(req)
+ if err != nil {
+ return 0, fmt.Errorf("push request: %w", err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ body, _ := io.ReadAll(resp.Body)
+ return 0, fmt.Errorf("push failed: %s: %s", resp.Status, string(body))
+ }
+
+ var result struct {
+ Applied int `json:"applied"`
+ }
+ json.NewDecoder(resp.Body).Decode(&result)
+ return result.Applied, nil
+}
+
+// pullChanges fetches changed entries from the remote since the given sequence.
+func (c *syncClient) pullChanges(sinceSeq uint64) ([]*tailsync.FileEntry, error) {
+ url := fmt.Sprintf("%s/pull?root=%s&since=%d", c.baseURL(), c.rootName, sinceSeq)
+ resp, err := c.httpClient().Get(url)
+ if err != nil {
+ return nil, fmt.Errorf("pull request: %w", err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ body, _ := io.ReadAll(resp.Body)
+ return nil, fmt.Errorf("pull failed: %s: %s", resp.Status, string(body))
+ }
+
+ var entries []*tailsync.FileEntry
+ if err := json.NewDecoder(resp.Body).Decode(&entries); err != nil {
+ return nil, fmt.Errorf("decode pull response: %w", err)
+ }
+ return entries, nil
+}
+
+// pullFile downloads a single file from the remote peer.
+func (c *syncClient) pullFile(relPath string) (io.ReadCloser, int64, error) {
+ url := fmt.Sprintf("%s/file?root=%s&path=%s", c.baseURL(), c.rootName, relPath)
+ resp, err := c.httpClient().Get(url)
+ if err != nil {
+ return nil, 0, fmt.Errorf("pull file request: %w", err)
+ }
+ if resp.StatusCode != http.StatusOK {
+ resp.Body.Close()
+ return nil, 0, fmt.Errorf("pull file %s: %s", relPath, resp.Status)
+ }
+ return resp.Body, resp.ContentLength, nil
+}
+
+// getRemoteIndex fetches the full index from the remote peer.
+func (c *syncClient) getRemoteIndex() (map[string]*tailsync.FileEntry, uint64, error) {
+ url := c.baseURL() + "/index?root=" + c.rootName
+ resp, err := c.httpClient().Get(url)
+ if err != nil {
+ return nil, 0, fmt.Errorf("index request: %w", err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ body, _ := io.ReadAll(resp.Body)
+ return nil, 0, fmt.Errorf("index failed: %s: %s", resp.Status, string(body))
+ }
+
+ var snap struct {
+ Entries map[string]*tailsync.FileEntry `json:"entries"`
+ LocalSeq uint64 `json:"localSeq"`
+ }
+ if err := json.NewDecoder(resp.Body).Decode(&snap); err != nil {
+ return nil, 0, fmt.Errorf("decode index: %w", err)
+ }
+ return snap.Entries, snap.LocalSeq, nil
+}