summary | refs | log | tree | commit | diff | homepage
diff options
context:
space:
mode:
-rw-r--r-- logtail/iopipe/ephemeral.go 115
-rw-r--r-- logtail/iopipe/iopipe.go 98
-rw-r--r-- logtail/iopipe/iopipe_test.go 240
-rw-r--r-- logtail/iopipe/persistent.go 520
4 files changed, 973 insertions, 0 deletions
diff --git a/logtail/iopipe/ephemeral.go b/logtail/iopipe/ephemeral.go
new file mode 100644
index 000000000..f87bd60ae
--- /dev/null
+++ b/logtail/iopipe/ephemeral.go
@@ -0,0 +1,115 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+package iopipe
+
+import (
+ "cmp"
+ "io"
+ "sync"
+)
+
+// EphemeralBuffer is an in-memory implementation of [Buffer].
+// The zero value is an empty buffer ready for use.
+type EphemeralBuffer struct {
+	mu     sync.Mutex    // held by every exported method while it touches the fields below
+	buf    []byte        // unread data is in buf[idx:]
+	idx    int           // read offset into buf; advanced by discardLocked
+	waiter chan struct{} // created by Wait while the buffer is empty; closed and cleared by Write
+}
+
+// Len returns how many bytes have been written to the buffer
+// but not yet consumed by Read or Discard.
+func (b *EphemeralBuffer) Len() int64 {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	unread := b.buf[b.idx:]
+	return int64(len(unread))
+}
+
+// Write appends p to the end of the buffer, growing Len by len(p).
+// It always consumes all of p and always returns a nil error.
+// If p is non-empty, a channel previously handed out by Wait is closed.
+func (b *EphemeralBuffer) Write(p []byte) (int, error) {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	b.buf = append(b.buf, p...)
+	if len(p) == 0 || b.waiter == nil {
+		return len(p), nil // nothing new to announce, or nobody is waiting
+	}
+
+	// Wake up everyone blocked on the channel returned by Wait.
+	close(b.waiter)
+	b.waiter = nil
+	return len(p), nil
+}
+
+// Read copies data from the front of the buffer into p and consumes it,
+// shrinking Len by the number of bytes returned.
+// If fewer than len(p) bytes are buffered, it returns what is available
+// together with [io.EOF].
+func (b *EphemeralBuffer) Read(p []byte) (int, error) {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	// Peek at up to len(p) bytes, copy them out, then consume what was copied.
+	avail, peekErr := b.peekLocked(len(p))
+	copied := copy(p, avail)
+	n, discErr := b.discardLocked(copied)
+	if discErr != nil {
+		return n, discErr // a discard failure takes precedence
+	}
+	return n, peekErr
+}
+
+// Peek returns the first n unread bytes without consuming them.
+// The returned slice is only valid until the next Read, Peek, or Discard call.
+// If fewer than n bytes are buffered, it returns what is available
+// along with a non-nil error.
+func (b *EphemeralBuffer) Peek(n int) ([]byte, error) {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	data, err := b.peekLocked(n)
+	return data, err
+}
+
+// Discard drops up to n bytes from the front of the buffer,
+// shrinking Len by the number of bytes dropped.
+// It reports how many bytes were dropped and a non-nil error
+// if that count is less than n.
+func (b *EphemeralBuffer) Discard(n int) (int, error) {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	dropped, err := b.discardLocked(n)
+	return dropped, err
+}
+
+// peekLocked is the body of Peek; the caller must hold mu.
+// A negative n yields a wrapped errNegative. If n exceeds the unread
+// length, all unread bytes are returned together with [io.EOF].
+func (b *EphemeralBuffer) peekLocked(n int) ([]byte, error) {
+	if n < 0 {
+		return nil, wrapError("peek", errNegative)
+	}
+	unread := b.buf[b.idx:]
+	if n > len(unread) {
+		return unread, io.EOF // short peek: hand back everything there is
+	}
+	return unread[:n], nil
+}
+
+// discardLocked is the body of Discard; the caller must hold mu.
+// It returns the number of bytes actually dropped and any error
+// reported by the underlying peek, re-labelled as a discard error.
+func (b *EphemeralBuffer) discardLocked(n int) (int, error) {
+	// Peeking tells us how many of the requested bytes actually exist;
+	// consuming them is then just a matter of advancing idx.
+	dropped, err := b.peekLocked(n)
+	b.idx += len(dropped)
+
+	// Once the consumed prefix exceeds half of buf, slide the unread
+	// tail down to offset zero so the consumed space is reclaimed.
+	// TODO: Allow shrinking the buffer if unused enough?
+	if half := len(b.buf) / 2; b.idx > half {
+		kept := copy(b.buf[:cap(b.buf)], b.buf[b.idx:])
+		b.buf, b.idx = b.buf[:kept], 0
+	}
+
+	return len(dropped), wrapError("discard", err) // nil and io.EOF pass through unwrapped
+}
+
+// Wait returns a channel that is closed once the buffer is non-empty.
+// If data is already buffered, the returned channel is already closed.
+func (b *EphemeralBuffer) Wait() <-chan struct{} {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	switch {
+	case len(b.buf[b.idx:]) != 0:
+		return alreadyClosed // data is available
+	case b.waiter == nil:
+		b.waiter = make(chan struct{}) // first waiter since the last wake-up
+	}
+	return b.waiter
+}
diff --git a/logtail/iopipe/iopipe.go b/logtail/iopipe/iopipe.go
new file mode 100644
index 000000000..e4573400e
--- /dev/null
+++ b/logtail/iopipe/iopipe.go
@@ -0,0 +1,98 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+// Package iopipe provides a ring buffer for writing and reading bytes.
+package iopipe
+
+import (
+ "bytes"
+ "errors"
+ "fmt"
+ "io"
+)
+
+// Buffer is a ring buffer semantically similar to a [bytes.Buffer].
+// It is an infinitely sized buffer, so it is the application's
+// responsibility to drain and/or avoid writing if it is too full.
+// It does not provide any form of message framing,
+// which is the responsibility of the application logic.
+// All methods must be safe for concurrent use.
+type Buffer interface {
+	// Len reports the size of the buffer,
+	// which is the number of written, but unread bytes.
+	Len() int64
+
+	// Write writes data to the end of the buffer,
+	// incrementing Len by the number of bytes written.
+	// Concurrent Write calls are atomically performed.
+	// Write does not block.
+	Write([]byte) (int, error)
+
+	// Read reads data from the front of the buffer,
+	// decrementing Len by the number of bytes read.
+	// It cannot read partially written data for a concurrent Write call.
+	// Rather than blocking, it returns [io.EOF] when the buffer is empty.
+	Read([]byte) (int, error)
+
+	// Peek peeks n bytes from the front of the buffer
+	// without affecting the read offset or changing the Len.
+	// It cannot peek partially written data for a concurrent Write call.
+	// The returned slice is only valid until the next Read, Peek, or Discard call.
+	// It reports an error if the buffer length is less than n.
+	// If n is greater than Len, then the error is usually [io.EOF].
+	Peek(n int) ([]byte, error)
+
+	// Discard discards n bytes from the front of the buffer,
+	// decrementing Len by the number of bytes discarded.
+	// It reports an error if the number of discarded bytes is less than n.
+	Discard(n int) (int, error)
+
+	// Wait returns a channel that is closed when the buffer is non-empty.
+	Wait() <-chan struct{}
+}
+
+// alreadyClosed is a channel that is closed during package initialization.
+// The Wait methods return it when there is nothing to wait for,
+// so that a receive on the result completes immediately.
+var alreadyClosed = func() chan struct{} {
+	c := make(chan struct{})
+	close(c)
+	return c
+}()
+
+// Blank declarations that exist only for their compile-time effect.
+var (
+	_ bytes.Buffer // for godoc hot-linking
+
+	// Statically verify concrete implementations against interface.
+	_ Buffer = (*PersistentBuffer)(nil)
+	_ Buffer = (*EphemeralBuffer)(nil)
+)
+
+// Sentinel errors shared by the buffer implementations.
+// They are passed through wrapError before being returned to callers.
+var (
+	errClosed   = errors.New("closed buffer")  // operation attempted on a closed buffer
+	errNegative = errors.New("negative count") // Peek or Discard called with n < 0
+)
+
+// iopipeError annotates an underlying error with the name of the
+// buffer operation that produced it. Values are created by wrapError.
+type iopipeError struct {
+	op  string // operation name, e.g. "peek"; may be empty
+	err error  // underlying cause, exposed via Unwrap
+}
+
+func wrapError(op string, err error) error {
+ if err == nil || err == io.EOF {
+ return err
+ }
+ if e, ok := err.(*iopipeError); ok {
+ err = e.err // avoid double wrapping
+ }
+ return &iopipeError{op: op, err: err}
+}
+
+// Error formats the error as "iopipe OP: CAUSE",
+// or "iopipe: CAUSE" when no operation name was recorded.
+func (e *iopipeError) Error() string {
+	if e.op != "" {
+		return fmt.Sprintf("iopipe %s: %v", e.op, e.err)
+	}
+	return fmt.Sprintf("iopipe: %v", e.err)
+}
+
+// Unwrap returns the underlying cause so that
+// [errors.Is] and [errors.As] can inspect it.
+func (e *iopipeError) Unwrap() error {
+	return e.err
+}
diff --git a/logtail/iopipe/iopipe_test.go b/logtail/iopipe/iopipe_test.go
new file mode 100644
index 000000000..9b864c317
--- /dev/null
+++ b/logtail/iopipe/iopipe_test.go
@@ -0,0 +1,240 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+package iopipe
+
+import (
+ "bytes"
+ "encoding/binary"
+ "flag"
+ "io"
+ "io/fs"
+ "math/rand/v2"
+ "os"
+ "path/filepath"
+ "runtime"
+ "slices"
+ "sync"
+ "testing"
+
+ "github.com/google/go-cmp/cmp"
+ "tailscale.com/types/bools"
+ "tailscale.com/util/must"
+)
+
+// testFile implements [file], but allows individual methods
+// of the wrapped file to be overridden.
+// Each method calls the matching override when it is non-nil
+// and otherwise forwards to the embedded file.
+type testFile struct {
+	file // fallback implementation for every method without an override
+
+	stat     func() (fs.FileInfo, error)
+	writeAt  func([]byte, int64) (int, error)
+	readAt   func([]byte, int64) (int, error)
+	truncate func(int64) error
+	close    func() error
+}
+
+// Stat calls the stat override if set, else the embedded file's Stat.
+func (f testFile) Stat() (fs.FileInfo, error) {
+	call := f.file.Stat
+	if f.stat != nil {
+		call = f.stat
+	}
+	return call()
+}
+// WriteAt calls the writeAt override if set, else the embedded file's WriteAt.
+func (f testFile) WriteAt(b []byte, p int64) (int, error) {
+	call := f.file.WriteAt
+	if f.writeAt != nil {
+		call = f.writeAt
+	}
+	return call(b, p)
+}
+// ReadAt calls the readAt override if set, else the embedded file's ReadAt.
+func (f testFile) ReadAt(b []byte, p int64) (int, error) {
+	call := f.file.ReadAt
+	if f.readAt != nil {
+		call = f.readAt
+	}
+	return call(b, p)
+}
+// Truncate calls the truncate override if set, else the embedded file's Truncate.
+func (f testFile) Truncate(size int64) error {
+	call := f.file.Truncate
+	if f.truncate != nil {
+		call = f.truncate
+	}
+	return call(size)
+}
+// Close calls the close override if set, else the embedded file's Close.
+func (f testFile) Close() error {
+	call := f.file.Close
+	if f.close != nil {
+		call = f.close
+	}
+	return call()
+}
+
+// mustOpenPersistent returns a PersistentBuffer backed by a new file
+// inside t.TempDir and registers a cleanup that closes the buffer.
+// Any failure while opening panics via must.Get.
+//
+// When the test binary runs in verbose mode, the file is wrapped in a
+// testFile that logs every WriteAt, ReadAt, and Truncate call through t.Logf.
+func mustOpenPersistent(t *testing.T) *PersistentBuffer {
+	fp := filepath.Join(t.TempDir(), "file")
+	var f file = must.Get(os.OpenFile(fp, os.O_RDWR|os.O_CREATE, 0600))
+	if testing.Verbose() {
+		f0 := f
+		f = testFile{file: f0,
+			writeAt: func(b []byte, p int64) (int, error) {
+				n, err := f0.WriteAt(b, p)
+				if n != len(b) || err != nil {
+					// Short or failed write: log the result as well.
+					t.Logf("WriteAt(pos:%d, len:%d) = (%v, %v)", p, len(b), n, err)
+				} else if uint64(len(b)) != offsetsSize || p != 0 {
+					t.Logf("WriteAt(pos:%d, len:%d)", p, len(b))
+				} else {
+					// A write of exactly offsetsSize bytes at position 0
+					// is taken to be an update of the offsets header.
+					t.Logf("WriteOffsets(rd:%d, wr:%d)", int64(binary.LittleEndian.Uint64(b[:8])), int64(binary.LittleEndian.Uint64(b[8:])))
+				}
+				return n, err
+			},
+			readAt: func(b []byte, p int64) (int, error) {
+				n, err := f0.ReadAt(b, p)
+				if n != len(b) || err != nil {
+					// Short or failed read: log the result as well.
+					t.Logf("ReadAt(pos:%d, len:%d) = (%v, %v)", p, len(b), n, err)
+				} else if uint64(len(b)) != offsetsSize || p != 0 {
+					t.Logf("ReadAt(pos:%d, len:%d)", p, len(b))
+				} else {
+					// A read of exactly offsetsSize bytes at position 0
+					// is taken to be a load of the offsets header.
+					t.Logf("ReadOffsets() = (rd:%d, wr:%d)", int64(binary.LittleEndian.Uint64(b[:8])), int64(binary.LittleEndian.Uint64(b[8:])))
+				}
+				return n, err
+			},
+			truncate: func(p int64) error {
+				err := f0.Truncate(p)
+				if err == nil {
+					t.Logf("Truncate(pos:%d)", p)
+				} else {
+					t.Logf("Truncate(pos:%d) = (%v)", p, err)
+				}
+				return err
+			},
+		}
+	}
+	b := must.Get(newPersistent(f))
+	t.Cleanup(func() { b.Close() })
+	return b
+}
+
+func testAll(t *testing.T, f func(t *testing.T, b Buffer)) {
+ t.Run("Ephemeral", func(t *testing.T) { f(t, new(EphemeralBuffer)) })
+ t.Run("Persistent", func(t *testing.T) { f(t, mustOpenPersistent(t)) })
+}
+
+// streamTestLength is the total number of bytes that TestBufferStream
+// pushes through each buffer; it defaults to 1 MiB.
+var streamTestLength = flag.Int64("buffer-stream-size", 1<<20, "number of bytes to stream")
+
+// TestBufferStream streams pseudo-random data through each Buffer
+// with one writer goroutine and one reader goroutine.
+// Both sides use a ChaCha8 generator with the same all-zero seed,
+// so the reader can regenerate the expected bytes independently.
+// The reader randomly mixes Read, Peek+Discard, and Discard-only calls.
+//
+// NOTE(review): t.Fatalf is called from goroutines started by group.Go
+// rather than from the goroutine running the test; the testing package
+// documents that FailNow must be called from the test goroutine — confirm
+// whether this is acceptable here.
+func TestBufferStream(t *testing.T) {
+	testAll(t, func(t *testing.T, b Buffer) {
+		maxSize := *streamTestLength
+		var group sync.WaitGroup
+		defer group.Wait()
+		// Writer: emit randomly sized chunks until at least maxSize bytes are written.
+		group.Go(func() {
+			var written int64
+			var data []byte
+			stream := rand.NewChaCha8([32]byte{})
+			for written < maxSize {
+				n := rand.IntN(1 << 16)
+				data = slices.Grow(data[:0], n)[:n]
+				must.Get(stream.Read(data))
+				m := must.Get(b.Write(data))
+				if n != m {
+					t.Fatalf("Write = %v, want %v", m, n)
+				}
+				written += int64(n)
+				runtime.Gosched()
+			}
+		})
+		// Reader: consume until at least maxSize bytes are read, verifying content.
+		group.Go(func() {
+			var read, maxLen int64
+			var got, want []byte
+			stream := rand.NewChaCha8([32]byte{})
+			for read < maxSize {
+				blen := b.Len()
+				maxLen = max(maxLen, blen)
+				// Request up to 1.5x the current length so that
+				// short reads are exercised as well.
+				nn := rand.IntN(1 + int(min(3*blen/2, 1<<20)))
+				noEOF := rand.IntN(2) == 0
+				if noEOF && int64(nn) > blen {
+					nn = int(blen) // reading up to Buffer.Len should never report EOF
+				}
+				want = slices.Grow(want[:0], nn)[:nn]
+				switch rand.IntN(3) {
+				case 0: // Read
+					got = slices.Grow(got[:0], nn)[:nn]
+					n, err := b.Read(got)
+					if err != nil && (noEOF || err != io.EOF) {
+						t.Fatalf("Read error: %v", err)
+					} else if err == nil && n != nn {
+						t.Fatalf("Read = %d, want %d", n, nn)
+					}
+					must.Get(stream.Read(want[:n]))
+					if !bytes.Equal(got[:n], want[:n]) {
+						t.Fatalf("data mismatch:\n%s", cmp.Diff(got[:n], want[:n]))
+					}
+					read += int64(n)
+				case 1: // Peek+Discard
+					data, err := b.Peek(nn)
+					got = append(got[:0], data...) // copy out before Discard invalidates data
+					if err != nil && (noEOF || err != io.EOF) {
+						t.Fatalf("Peek error: %v", err)
+					} else if err == nil && len(got) != nn {
+						t.Fatalf("Peek = %d, want %d", len(got), nn)
+					}
+					n, err := b.Discard(len(got))
+					if err != nil {
+						t.Fatalf("Discard error: %v", err)
+					} else if n != len(got) {
+						t.Fatalf("Discard = %d, want %d", n, len(got))
+					}
+					must.Get(stream.Read(want[:n]))
+					if !bytes.Equal(got[:n], want[:n]) {
+						t.Fatalf("data mismatch:\n%s", cmp.Diff(got[:n], want[:n]))
+					}
+					read += int64(n)
+				case 2: // Discard only
+					n, err := b.Discard(nn)
+					if err != nil && (noEOF || err != io.EOF) {
+						t.Fatalf("Discard error: %v", err)
+					} else if err == nil && n != nn {
+						t.Fatalf("Discard = %d, want %d", n, nn)
+					}
+					// Advance the reference stream past the discarded bytes.
+					must.Get(stream.Read(want[:n]))
+					read += int64(n)
+				}
+			}
+			t.Logf("peak Buffer.Len: %d", maxLen)
+		})
+	})
+}
+
+// TestPersistentRestart verifies that unread data and the read position
+// survive closing and reopening a PersistentBuffer on the same path.
+func TestPersistentRestart(t *testing.T) {
+	fp := filepath.Join(t.TempDir(), "file")
+	b := must.Get(OpenPersistent(fp))
+	// Write 100 filler bytes followed by the payload, then consume the filler
+	// so that only the payload remains unread when the buffer is closed.
+	must.Get(b.Write(make([]byte, 100)))
+	want := "Hello, world!"
+	must.Get(b.Write([]byte(want)))
+	must.Get(b.Discard(100))
+	must.Do(b.Close())
+	// Reopen and check that exactly the payload is still available.
+	b = must.Get(OpenPersistent(fp))
+	got := string(must.Get(b.Peek(int(b.Len()))))
+	if got != want {
+		t.Errorf("Peek = %s, want %s", got, want)
+	}
+	must.Do(b.Close())
+}
+
+// TestBufferWait checks that the channel returned by Wait is closed
+// once a concurrent Write makes data available, and that the data
+// observed afterwards is exactly what was written.
+func TestBufferWait(t *testing.T) {
+	testAll(t, func(t *testing.T, b Buffer) {
+		var want [8]byte
+		for i := range 1000 {
+			binary.LittleEndian.PutUint64(want[:], uint64(i))
+			// Wrap the call in a closure so that b.Write itself runs on the
+			// new goroutine. With "go must.Get(b.Write(...))" the arguments,
+			// including the b.Write call, are evaluated in this goroutine
+			// before the new one starts, so the write would never actually
+			// race with Wait below.
+			//
+			// Sharing want is safe: each iteration does not proceed until
+			// it has observed the written data through the buffer.
+			go func() { must.Get(b.Write(want[:])) }()
+			if i%2 == 0 {
+				runtime.Gosched() // increase probability of a race
+			}
+			select {
+			case <-b.Wait():
+				got := must.Get(b.Peek(len(want)))
+				if !bytes.Equal(got, want[:]) {
+					t.Errorf("Peek = %x, want %x", got, want)
+				}
+				must.Get(b.Discard(len(want)))
+			case <-t.Context().Done():
+				t.Fatalf("test timeout: %v", t.Context().Err())
+			}
+		}
+	})
+}
+
+// TestBufferNoWait checks that the channel returned by Wait on an
+// empty buffer is not closed while nothing is written.
+func TestBufferNoWait(t *testing.T) {
+	testAll(t, func(t *testing.T, b Buffer) {
+		// done is closed after yielding the processor many times, giving
+		// a misbehaving Wait channel ample opportunity to fire first.
+		done := make(chan struct{})
+		go func() {
+			for range 1000 {
+				runtime.Gosched()
+			}
+			close(done)
+		}()
+		select {
+		case <-b.Wait():
+			t.Fatalf("Wait unexpectedly closed early")
+		case <-done:
+		}
+	})
+}
diff --git a/logtail/iopipe/persistent.go b/logtail/iopipe/persistent.go
new file mode 100644
index 000000000..dbaa8909a
--- /dev/null
+++ b/logtail/iopipe/persistent.go
@@ -0,0 +1,520 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+package iopipe
+
+import (
+ "cmp"
+ "encoding/binary"
+ "errors"
+ "io"
+ "io/fs"
+ "math"
+ "os"
+ "slices"
+ "sync"
+ "sync/atomic"
+
+ "tailscale.com/types/bools"
+)
+
+// file is a general-purpose interface for a file.
+// It lists the operations that PersistentBuffer needs from its backing
+// file, which allows tests to substitute a wrapper.
+type file interface {
+	Stat() (fs.FileInfo, error)
+	io.WriterAt
+	io.ReaderAt
+	Truncate(int64) error // used for compaction
+	io.Closer
+}
+
+// offsetsSize is the size in bytes of the offsets header
+// stored at the start of the file.
+const offsetsSize = uint64(len(offsets{}))
+
+// offsets is the in-memory image of the file header:
+// bytes 0-7 hold ReadOffset and bytes 8-15 hold WriteOffset,
+// both as little-endian unsigned 64-bit integers.
+type offsets [16]byte // tuple of ReadOffset and WriteOffset
+
+// Accessors for the two little-endian fields of the header.
+func (o *offsets) ReadOffset() uint64      { return binary.LittleEndian.Uint64(o[0:]) }
+func (o *offsets) WriteOffset() uint64     { return binary.LittleEndian.Uint64(o[8:]) }
+func (o *offsets) PutReadOffset(n uint64)  { binary.LittleEndian.PutUint64(o[0:], n) }
+func (o *offsets) PutWriteOffset(n uint64) { binary.LittleEndian.PutUint64(o[8:], n) }
+
+// PersistentBuffer is an on-disk implementation of [Buffer].
+type PersistentBuffer struct {
+	// The on-disk format of the buffer is sequentially organized as:
+	//
+	//   - ReadOffset: 64-bit little endian unsigned integer that
+	//     contains the offset to the start of DataBuffer (inclusive).
+	//     The offset should always be ≥ [offsetsSize] and ≤ WriteOffset.
+	//
+	//   - WriteOffset: 64-bit little endian unsigned integer that
+	//     contains the offset to the end of DataBuffer (exclusive).
+	//     The offset should always be ≥ ReadOffset and ≤ the file size.
+	//     As a special case, if this value is 0 or [math.MaxUint64],
+	//     then it is implicitly the current file size.
+	//
+	//   - FreeBuffer: A variable-length buffer that occupies space
+	//     after the WriteOffset field until the offset in ReadOffset.
+	//     The FreeBuffer contains already consumed data,
+	//     where the actual content is not meaningful.
+	//     As an optimization, the file may be sparse where FreeBuffer
+	//     is mostly unallocated disk blocks.
+	//
+	//   - DataBuffer: A variable-length buffer starting at the offset
+	//     in ReadOffset and contains written, but unread data.
+	//     Reads start at the beginning of buffer and
+	//     ReadOffset is incremented by the amount of bytes read.
+	//     Writes are appended to the end of the buffer starting at the
+	//     offset in WriteOffset, which usually grows the size of the file.
+	//
+	// A naive implementation of file buffer can grow indefinitely
+	// due to the ever increasing size of FreeBuffer.
+	// Compaction is needed to reduce the file size:
+	//
+	//   - In the simple case where ReadOffset equals WriteOffset,
+	//     the ReadOffset and WriteOffset can both be set to [offsetsSize],
+	//     and the file be truncated to [offsetsSize].
+	//     If successfully truncated, the WriteOffset may be set to [math.MaxUint64].
+	//     NOTE(review): truncateLocked currently truncates the file to
+	//     zero bytes in this case rather than to [offsetsSize] — confirm
+	//     which of the two is intended.
+	//
+	//   - If the underlying filesystem supports sparse files,
+	//     a hole can be punched that covers the FreeBuffer range.
+	//     With sparse files, it is technically fine if the file size grows
+	//     indefinitely since the on-disk size is mainly the DataBuffer.
+	//     However, a corrupted ReadOffset could end up causing the buffer
+	//     to mistakenly report a massive number of zero bytes,
+	//     so there is still wisdom in compacting eventually.
+	//
+	//   - If size of DataBuffer is smaller than the FreeBuffer,
+	//     then the content of DataBuffer can be copied to the start
+	//     of FreeBuffer, ReadOffset set to [offsetsSize], and the file size
+	//     truncated to the number of copied bytes plus [offsetsSize].
+	//
+	//   - The WriteOffset field is not strictly needed,
+	//     but is useful for data resilience.
+	//     Under normal operation, it will be set to [math.MaxUint64]
+	//     and simply rely on the file size to determine the WriteOffset.
+	//     However, compaction requires two non-atomic operations
+	//     (updating the offset fields and file truncation).
+	//     If the offsets are updated, but file truncation failed,
+	//     then prior data may accidentally be "added" to the DataBuffer.
+	//     Since it is highly likely that two adjacent offsets
+	//     can be written atomically to disk,
+	//     we can update both ReadOffset and WriteOffset together
+	//     and use that to help protect against failed truncation.
+
+	file   file
+	closed atomic.Bool // set to true while holding both rdMu and wrMu
+
+	// rdMu is held by Read, Peek, Discard, Wait, and Close.
+	rdMu           sync.Mutex    // may acquire wrMu while holding rdMu
+	rdPos          atomic.Uint64 // may only decrement while holding both rdMu and wrMu
+	peekPos        uint64        // offset into peekBuf
+	peekBuf        []byte        // contains file data at rdPos-peekPos
+	offsets        offsets       // offsets in the file
+	lastCompactPos uint64        // rdPos of when a compaction was last attempted
+	blockSize      int64         // block size used by the file (best-effort)
+
+	// wrMu is held by Len, Write, Discard, Wait, and Close.
+	wrMu   sync.Mutex    // must never acquire rdMu while holding wrMu
+	wrPos  atomic.Uint64 // may only decrement while holding both rdMu and wrMu
+	waiter chan struct{} // closed by Write if non-nil
+
+	// While more complicated, there are two different mutexes
+	// to minimize how often Read and Write may block each other.
+	// Some operations need to hold both mutexes. To avoid a deadlock,
+	// the wrMu must always be acquired after the rdMu.
+}
+
+// OpenPersistent opens or creates a persistent [Buffer]
+// backed on disk by a file located at path.
+// The buffer must be closed to release resources.
+func OpenPersistent(path string) (*PersistentBuffer, error) {
+ f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0600)
+ if err != nil {
+ return nil, wrapError("open", err)
+ }
+ b, err := newPersistent(f)
+ if err != nil {
+ return nil, err
+ }
+ return b, nil
+}
+
+// newPersistent builds a PersistentBuffer on top of f.
+// It takes ownership of f: the file is closed here on every failure
+// path, and by the buffer's Close method otherwise.
+func newPersistent(f file) (*PersistentBuffer, error) {
+	b := &PersistentBuffer{file: f}
+
+	// fail releases the file and reports err as an "open" failure.
+	fail := func(err error) (*PersistentBuffer, error) {
+		f.Close()
+		return nil, wrapError("open", err)
+	}
+
+	// Load the ReadOffset and WriteOffset header, then the file size.
+	// A missing or truncated header is tolerated; the clamping below
+	// brings whatever was read back into a valid range.
+	_, err := readFullAt(b.file, b.offsets[:], 0)
+	if err != nil && err != io.ErrUnexpectedEOF {
+		return fail(err)
+	}
+	fi, err := b.file.Stat()
+	if err != nil {
+		return fail(err)
+	}
+	// TODO: Populate blockSize.
+
+	// Enforce the following invariant:
+	//	offsetsSize ≤ ReadOffset ≤ WriteOffset ≤ fileSize
+	size := max(int64(offsetsSize), fi.Size()) // the header always counts towards the size
+	fileSize := uint64(size)
+	writeOffset := cmp.Or(b.offsets.WriteOffset(), fileSize) // zero means "use the file size"
+	writeOffset = clamp(offsetsSize, writeOffset, fileSize)
+	readOffset := clamp(offsetsSize, b.offsets.ReadOffset(), fileSize)
+	readOffset = min(readOffset, writeOffset)
+
+	// Always update the offsets (even if unchanged).
+	// This helps detect read-only files before they become a problem.
+	if err := b.truncateLocked(readOffset, writeOffset); err != nil {
+		return fail(err)
+	}
+	return b, nil
+}
+
+// Len reports the size of the buffer,
+// which is the number of written, but unread bytes.
+// It reports zero if the buffer is closed.
+func (b *PersistentBuffer) Len() int64 {
+	b.wrMu.Lock() // generally faster to acquire
+	defer b.wrMu.Unlock()
+	// Honor the documented contract for closed buffers. Close leaves
+	// rdPos and wrPos untouched, so without this check a closed buffer
+	// would keep reporting its last length even though every Read, Peek,
+	// and Discard fails. The flag is stable here because it is only set
+	// while wrMu is held.
+	if b.closed.Load() {
+		return 0
+	}
+	return int64(b.wrPos.Load() - b.rdPos.Load()) // rdPos may increase asynchronously
+}
+
+// Write appends p to the end of the buffer,
+// growing Len by the number of bytes written.
+// It fails with a wrapped errClosed if the buffer is closed.
+// If any bytes were written, a channel previously handed out by Wait is closed.
+func (b *PersistentBuffer) Write(p []byte) (int, error) {
+	b.wrMu.Lock()
+	defer b.wrMu.Unlock()
+	if b.closed.Load() {
+		return 0, wrapError("write", errClosed)
+	}
+
+	// Append at the current write position, which cannot move while
+	// wrMu is held, and account for however many bytes made it to the file.
+	pos := int64(b.wrPos.Load())
+	n, err := b.file.WriteAt(p, pos)
+	b.wrPos.Add(uint64(n))
+
+	// Wake up everyone blocked on the channel returned by Wait.
+	if waiter := b.waiter; waiter != nil && n > 0 {
+		close(waiter)
+		b.waiter = nil
+	}
+
+	return n, wrapError("write", err) // a nil err stays nil
+}
+
+// Read copies data from the front of the buffer into p and consumes it,
+// shrinking Len by the number of bytes returned.
+// If fewer than len(p) bytes could be obtained, it returns a non-nil
+// error, which is [io.EOF] when the buffer simply ran out of data.
+func (b *PersistentBuffer) Read(p []byte) (int, error) {
+	b.rdMu.Lock()
+	defer b.rdMu.Unlock()
+
+	// Peek at up to len(p) bytes, copy them out, then consume what was copied.
+	head, peekErr := b.peekReadLocked(len(p))
+	copied := copy(p, head)
+	n, discErr := b.discardReadLocked(copied)
+	if discErr != nil {
+		return n, discErr // a discard failure takes precedence
+	}
+	return n, peekErr
+}
+
+// Peek returns the first n unread bytes without consuming them.
+// The returned slice is only valid until the next Read, Peek, or Discard call.
+// If fewer than n bytes are available, it returns what it could obtain
+// along with a non-nil error.
+func (b *PersistentBuffer) Peek(n int) ([]byte, error) {
+	b.rdMu.Lock()
+	defer b.rdMu.Unlock()
+	data, err := b.peekReadLocked(n)
+	return data, err
+}
+
+// Discard drops up to n bytes from the front of the buffer,
+// shrinking Len by the number of bytes dropped.
+// It reports a non-nil error if fewer than n bytes were dropped.
+func (b *PersistentBuffer) Discard(n int) (int, error) {
+	b.rdMu.Lock()
+	defer b.rdMu.Unlock()
+	dropped, err := b.discardReadLocked(n)
+	return dropped, err
+}
+
+// peekReadLocked implements Peek while rdMu is already held.
+//
+// It serves the request from peekBuf when possible and otherwise tops
+// peekBuf up from the file. If fewer than n bytes end up available,
+// it returns them together with the read error, or [io.EOF] if the
+// read itself did not fail.
+func (b *PersistentBuffer) peekReadLocked(n int) ([]byte, error) {
+	switch {
+	case b.closed.Load():
+		return nil, wrapError("peek", errClosed)
+	case n < 0:
+		return nil, wrapError("peek", errNegative)
+	}
+
+	// Fill the peek buffer if necessary.
+	var rdErr error
+	// peekBuf is the portion of b.peekBuf that has not been discarded yet;
+	// peekPos is capped so that the slice expression cannot go out of range.
+	peekBuf := b.peekBuf[min(b.peekPos, uint64(len(b.peekBuf))):]
+	if n > len(peekBuf) {
+		// Move data in peek buffer to the front.
+		m := copy(b.peekBuf[:cap(b.peekBuf)], peekBuf)
+		b.peekPos, b.peekBuf = 0, b.peekBuf[:m]
+
+		// Read data into the peek buffer.
+		// availData is the unread data in the file that is not yet in peekBuf.
+		availData := max(0, int64(b.wrPos.Load()-b.rdPos.Load())-int64(len(b.peekBuf)))
+		// Ensure room for the missing bytes, but never for more than exists.
+		b.peekBuf = slices.Grow(b.peekBuf, int(min(int64(n-len(peekBuf)), availData)))
+		// Fill all spare capacity, which may exceed what was requested.
+		m = int(min(int64(cap(b.peekBuf)-len(b.peekBuf)), availData))
+		// After the move above, peekBuf[0] corresponds to file offset rdPos,
+		// so the next byte to fetch lives at rdPos+len(peekBuf).
+		m, rdErr = readFullAt(b.file, b.peekBuf[len(b.peekBuf):cap(b.peekBuf)][:m], int64(b.rdPos.Load())+int64(len(b.peekBuf)))
+		rdErr = wrapError("peek", rdErr) // remains nil if already nil
+		b.peekBuf = b.peekBuf[:len(b.peekBuf)+m]
+		peekBuf = b.peekBuf
+	}
+
+	// Return the available data in the peek buffer.
+	if n > len(peekBuf) {
+		return peekBuf, cmp.Or(rdErr, io.EOF)
+	}
+	return peekBuf[:n], nil
+}
+
+// discardReadLocked implements Discard while rdMu is already held.
+//
+// It returns the number of bytes discarded. The error is [io.EOF] when
+// fewer than n bytes were available, or a wrapped error when updating
+// the offsets or compacting the file failed.
+func (b *PersistentBuffer) discardReadLocked(n int) (m int, err error) {
+	switch {
+	case b.closed.Load():
+		return 0, wrapError("discard", errClosed)
+	case n < 0:
+		return 0, wrapError("discard", errNegative)
+	}
+
+	avail := max(0, int64(b.wrPos.Load()-b.rdPos.Load())) // wrPos may increase asynchronously
+	if int64(n) > avail {
+		// Discard only what exists and remember to report the shortfall.
+		n, err = int(avail), io.EOF
+	}
+	if n > 0 {
+		if err := b.updateOffsetsReadLocked(n); err != nil {
+			return 0, wrapError("discard", err) // nothing was discarded
+		}
+		if err := b.mayCompactReadLocked(); err != nil {
+			// The discard itself succeeded, so n is still reported.
+			return n, wrapError("compact", err)
+		}
+	}
+	return n, err // either nil or [io.EOF]
+}
+
+// errMoreData reports that the DataBuffer is non-empty.
+// It is only used inside updateOffsetsReadLocked and never returned from it.
+var errMoreData = errors.New("more data available")
+
+// updateOffsetsReadLocked advances the read position by n bytes
+// and persists the new offsets to the file.
+// If that would leave the buffer empty, the file is truncated instead.
+// The rdMu must already be held.
+func (b *PersistentBuffer) updateOffsetsReadLocked(n int) error {
+	readOffset := b.rdPos.Load() + uint64(n) // rdPos is stable
+
+	// Check if the file would be empty, in which case, just truncate.
+	if readOffset == b.wrPos.Load() { // wrPos may increase asynchronously
+		if err := func() error {
+			b.wrMu.Lock() // properly acquired after rdMu
+			defer b.wrMu.Unlock()
+			// Re-check now that writers are excluded.
+			if readOffset == b.wrPos.Load() { // wrPos is stable
+				if err := b.truncateLocked(readOffset, b.wrPos.Load()); err != nil {
+					return err
+				}
+				b.peekPos, b.peekBuf = 0, b.peekBuf[:0] // invalidate peek buffer
+				return nil
+			}
+			return errMoreData
+		}(); (err != nil && err != errMoreData) || err == nil {
+			// The condition is equivalent to err != errMoreData:
+			// the truncation was attempted, so report its outcome.
+			return err
+		}
+		// errMoreData: a Write slipped in; fall through to the regular path.
+	}
+
+	// Otherwise, we need to write the offsets.
+	offsetsOld := b.offsets
+	b.offsets.PutReadOffset(readOffset)
+	// A WriteOffset of math.MaxUint64 means "use the file size" and is
+	// left alone; an explicit WriteOffset is refreshed.
+	if b.offsets.WriteOffset() < math.MaxUint64 {
+		b.offsets.PutWriteOffset(b.wrPos.Load()) // wrPos may increase asynchronously
+	}
+	if _, err := b.file.WriteAt(b.offsets[:], 0); err != nil {
+		b.offsets = offsetsOld // keep the in-memory copy in sync with the file
+		return err
+	}
+
+	// Update the offsets.
+	b.rdPos.Add(uint64(n))
+	b.peekPos += uint64(n) // invalidate leading bytes of peekBuf
+	return nil
+}
+
+// mayCompactReadLocked optionally compacts the file.
+// It returns any error reported by the compaction that it attempts.
+// The rdMu must already be held.
+func (b *PersistentBuffer) mayCompactReadLocked() error {
+	// Always trying to compact for every read could be expensive.
+	// Similar to GOGC, only attempt compaction when the FreeBuffer
+	// grows by some fraction (chosen default is 25%).
+	//
+	// Also, skip compaction if the entire file fits in a single block,
+	// since it will generally occupy the same amount of disk space.
+	// The block size is clamped to the range [4 KiB, 1 MiB].
+	singleBlock := b.wrPos.Load() <= clamp(1<<12, uint64(b.blockSize), 1<<20)
+	compactedRecently := b.rdPos.Load() < 5*b.lastCompactPos/4 // rdPos is stable
+	if singleBlock || compactedRecently {
+		return nil
+	}
+
+	freeLen := max(0, int64(b.rdPos.Load()-offsetsSize))    // rdPos is stable
+	dataLen := max(0, int64(b.wrPos.Load()-b.rdPos.Load())) // wrPos may increase asynchronously
+
+	// Rely on hole-punching to reclaim disk space.
+	// If the file supports sparse holes, then we can tolerate a higher
+	// logical file size since the physical size on disk is smaller.
+	// This branch is currently a placeholder with no effect.
+	if freeLen < 16*dataLen && int64(b.rdPos.Load()) > 2*b.blockSize && b.blockSize > 0 {
+		// TODO: Implement support for punching holes.
+	}
+
+	// Move the data to the front of the file.
+	// Ensure there is notably more free space than data to reduce
+	// probability that data grows beyond free space while copying.
+	if freeLen > 3*dataLen/2 {
+		if err := b.copyingCompactReadLocked(); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// errNoSpace reports that the DataBuffer is larger than the FreeBuffer.
+// This is an internal error and should not be exposed to the external API.
+var errNoSpace = errors.New("insufficient free space")
+
+// copyingCompactReadLocked copies the DataBuffer into the FreeBuffer
+// and updates the ReadOffset and WriteOffset.
+// The rdMu must already be held; wrMu is acquired internally for
+// what appears to be the final block.
+// Running out of free space aborts the compaction without reporting an error.
+func (b *PersistentBuffer) copyingCompactReadLocked() error {
+	// Copy DataBuffer to FreeBuffer on a block-by-block basis.
+	var blockBuffer [1 << 12]byte // TODO: Pool this?
+	dstPos := uint64(offsetsSize) // next offset to write within the FreeBuffer
+	srcPos := b.rdPos.Load()      // next offset to read within the DataBuffer
+	for {
+		// Each iteration runs in a closure so that the deferred
+		// unlock and finalization fire once per block.
+		if err := func() (err error) {
+			// If this seems like the last block, acquire wrMu beforehand
+			// to ensure that copying does not race with concurrent Writes.
+			// Thus, we can know for certain that this is truly the last block.
+			availData := int64(b.wrPos.Load() - srcPos) // wrPos may increase asynchronously
+			if availData <= int64(len(blockBuffer)) {
+				b.wrMu.Lock() // properly acquired after rdMu
+				defer b.wrMu.Unlock()
+
+				// After copying the last block, update the offsets.
+				// This runs before the deferred unlock above, i.e. with wrMu held.
+				defer func() {
+					availData = int64(b.wrPos.Load() - srcPos) // wrPos is stable
+					if err != nil || availData != 0 {
+						return // still more data to copy
+					}
+					dataLen := b.wrPos.Load() - b.rdPos.Load()
+					// io.EOF signals the outer loop that compaction is complete.
+					err = cmp.Or(b.truncateLocked(dstPos-dataLen, dstPos), io.EOF)
+				}()
+			}
+
+			// Read a block from the DataBuffer.
+			availData = int64(b.wrPos.Load() - srcPos) // wrPos may increase asynchronously unless wrMu is held
+			n := int(min(int64(len(blockBuffer)), availData))
+			if _, err := readFullAt(b.file, blockBuffer[:n], int64(srcPos)); err != nil {
+				return err
+			}
+			srcPos += uint64(n) // should never run past b.wrPos
+
+			// Write a block into the FreeBuffer.
+			// All remaining data must fit below rdPos, so that the
+			// original DataBuffer is never overwritten.
+			availFree := int64(b.rdPos.Load() - dstPos) // rdPos may increase asynchronously unless rdMu is held
+			if availData > availFree {
+				return errNoSpace
+			}
+			if _, err := b.file.WriteAt(blockBuffer[:n], int64(dstPos)); err != nil {
+				return err
+			}
+			dstPos += uint64(n) // should never run past b.rdPos
+
+			return nil
+		}(); err != nil {
+			// errNoSpace and io.EOF are internal signals, not failures.
+			return bools.IfElse(err != errNoSpace && err != io.EOF, err, nil)
+		}
+	}
+}
+
+// truncateLocked truncates the file according to the specified offsets
+// and updates rdPos, wrPos, lastCompactPos, and the cached offsets to match.
+// Both rdMu and wrMu must be held.
+func (b *PersistentBuffer) truncateLocked(readOffset, writeOffset uint64) error {
+	// Special-case: If all data is read, then just truncate the file.
+	// This reduces IO operations from 3 down to 1.
+	// The header is not rewritten here; only the in-memory state is reset.
+	if readOffset == writeOffset {
+		if err := b.file.Truncate(0); err != nil {
+			return err
+		}
+		b.offsets.PutReadOffset(offsetsSize)
+		b.offsets.PutWriteOffset(math.MaxUint64)
+		b.rdPos.Store(offsetsSize)
+		b.wrPos.Store(offsetsSize)
+		b.lastCompactPos = offsetsSize
+		return nil
+	}
+
+	// Step 1: Update both offsets.
+	// A modern disk should be able to update both offsets atomically.
+	offsetsOld := b.offsets
+	b.offsets.PutReadOffset(readOffset)
+	b.offsets.PutWriteOffset(writeOffset)
+	if _, err := b.file.WriteAt(b.offsets[:], 0); err != nil {
+		b.offsets = offsetsOld // keep the in-memory copy in sync with the file
+		return err
+	}
+	b.rdPos.Store(readOffset)   // only time rdPos is possibly decremented
+	b.wrPos.Store(writeOffset) // only time wrPos is possibly decremented
+	b.lastCompactPos = readOffset
+
+	// Step 2: Truncate the file.
+	// If this fails, then WriteOffset holds the real end of the data,
+	// allowing OpenPersistent to reliably resume the file.
+	if err := b.file.Truncate(int64(b.wrPos.Load())); err != nil {
+		return err
+	}
+
+	// Step 3: Update WriteOffset to use the file size.
+	// Since the file was successfully truncated,
+	// we can rely on the file size to implicitly be the WriteOffset.
+	offsetsOld = b.offsets
+	b.offsets.PutWriteOffset(math.MaxUint64) // use file size as WriteOffset
+	if _, err := b.file.WriteAt(b.offsets[:], 0); err != nil {
+		b.offsets = offsetsOld
+		return err
+	}
+
+	return nil
+}
+
+// Wait returns a channel that is closed when the buffer is non-empty
+// or when the buffer itself is closed.
+// If either already holds, the returned channel is already closed.
+func (b *PersistentBuffer) Wait() <-chan struct{} {
+	b.rdMu.Lock()
+	defer b.rdMu.Unlock()
+	b.wrMu.Lock() // properly acquired after rdMu
+	defer b.wrMu.Unlock()
+
+	// With both mutexes held, closed, wrPos, and rdPos are all stable.
+	hasData := b.wrPos.Load() > b.rdPos.Load()
+	if b.closed.Load() || hasData {
+		return alreadyClosed // already closed or data is available
+	}
+	if b.waiter == nil {
+		b.waiter = make(chan struct{}) // first waiter since the last wake-up
+	}
+	return b.waiter
+}
+
+// Close marks the buffer as closed, wakes up any goroutine blocked on
+// a channel returned by Wait, and closes the underlying file.
+// Closing an already closed buffer reports an error.
+func (b *PersistentBuffer) Close() error {
+	b.rdMu.Lock()
+	defer b.rdMu.Unlock()
+	b.wrMu.Lock() // properly acquired after rdMu
+	defer b.wrMu.Unlock()
+
+	if b.closed.Load() {
+		return wrapError("close", errors.New("buffer already closed"))
+	}
+	b.closed.Store(true)
+
+	if waiter := b.waiter; waiter != nil {
+		close(waiter)
+		b.waiter = nil
+	}
+
+	err := b.file.Close()
+	return wrapError("close", err)
+}
+
+// readFullAt calls r.ReadAt(b, pos) and normalizes an [io.EOF] result:
+// it becomes [io.ErrUnexpectedEOF] if b was only partially filled,
+// and nil if b was filled completely. Other errors are returned as is.
+func readFullAt(r io.ReaderAt, b []byte, pos int64) (int, error) {
+	n, err := r.ReadAt(b, pos)
+	if err != io.EOF {
+		return n, err
+	}
+	if n < len(b) {
+		return n, io.ErrUnexpectedEOF
+	}
+	return n, nil
+}
+
+// clamp restricts val to the inclusive range [lo, hi].
+// The lower bound is applied first, so hi wins if lo > hi.
+func clamp[T cmp.Ordered](lo, val, hi T) T {
+	raised := max(lo, val)
+	return min(raised, hi)
+}