summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWayne-Cole <77279425+Wacky404@users.noreply.github.com>2026-06-13 20:26:39 -0500
committerWayne-Cole <77279425+Wacky404@users.noreply.github.com>2026-06-13 20:26:39 -0500
commit1cfd8a143107ab4bc50092daa45407196df0b75d (patch)
treec25097cdcfe39a875dad3eb483c59a95c5e8b1be
parentbb0d5fecf8d839efa0e89c33d310c5202c6f8919 (diff)
downloadlurchers-1cfd8a143107ab4bc50092daa45407196df0b75d.tar.xz
lurchers-1cfd8a143107ab4bc50092daa45407196df0b75d.zip
update: dove deep into unix api and made the base orchestration system
-rw-r--r--cmd/lurchers/main.go92
-rw-r--r--howlers/src/scripts/workspace_1/example.py0
-rw-r--r--internal/api/logic.go16
-rw-r--r--internal/daemon.go82
-rw-r--r--internal/events.go199
-rw-r--r--internal/lcommon.go111
-rw-r--r--internal/logic.go28
-rw-r--r--internal/lurchql.go (renamed from internal/api/lurchql.go)12
-rw-r--r--internal/ring.go130
9 files changed, 485 insertions, 185 deletions
diff --git a/cmd/lurchers/main.go b/cmd/lurchers/main.go
index 57d2019..ca45eaf 100644
--- a/cmd/lurchers/main.go
+++ b/cmd/lurchers/main.go
@@ -1,22 +1,16 @@
package main
import (
- "bufio"
"context"
- "fmt"
"log"
"log/slog"
"net"
"os"
"os/signal"
"runtime"
- "sync"
- "sync/atomic"
"syscall"
- "time"
"lurchers/internal"
- "lurchers/internal/api"
)
var (
@@ -37,12 +31,6 @@ func init() {
}
}
-type Daemon struct {
- mem *internal.ReaderAt
- workerWg sync.WaitGroup
- workerCount atomic.Int64
-}
-
func main() {
if _, err := os.Stat(FileLogs); os.IsNotExist(err) {
fpath := "/var/log/lurchers/"
@@ -54,14 +42,6 @@ func main() {
defer f.Close()
}
- if _, err := os.Stat(FileMem); os.IsNotExist(err) {
- m, err := os.OpenFile(FileMem, os.O_RDWR|os.O_CREATE, 0o644)
- if err != nil {
- log.Fatal(err)
- }
- defer m.Close()
- }
-
logFile, err := internal.SetupLogger(internal.WithLogName(FileLogs))
if err != nil {
log.Fatal("setuplogger: error setting up logger", err)
@@ -81,14 +61,23 @@ func main() {
slog.Error("failed to listen", "error", err)
os.Exit(1)
}
+
// allows non-root to connect to socket
os.Chmod(SocketPath, 0o666)
defer ln.Close()
- slog.Info("daemon started", "socket", SocketPath)
-
ctx, cancel := context.WithCancel(context.Background())
+ // idea of workspaces: Workspace 1, 2, 3, ..., N
+ // default ring + watchmen for init workspace
+ ring := internal.NewSharedRing(mem)
+ watchmen := internal.NewWatchMen(ring)
+ d := &internal.Daemon{
+ Ring: ring,
+ Watchmen: watchmen,
+ Ctx: ctx,
+ }
+
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
go func() {
@@ -98,15 +87,17 @@ func main() {
ln.Close()
}()
- d := &Daemon{mem: mem}
+ slog.Info("daemon started", "socket", SocketPath)
+ // a reference to the daemon in context
+ ctxD := context.WithValue(ctx, internal.LurchersContextKey("daemon"), &d)
for {
conn, err := ln.Accept()
if err != nil {
select {
case <-ctx.Done():
slog.Info("waiting for workers to finish...")
- d.workerWg.Wait()
+ d.WorkerWg.Wait()
slog.Info("daemon stopped")
return
default:
@@ -114,57 +105,6 @@ func main() {
continue
}
}
- go d.handleConn(ctx, conn)
+ go d.HandleConn(ctxD, conn)
}
}
-
-// {"method": "query","function": "status","params": {"id": 1, "user": "wcole"}}
-func (d *Daemon) handleConn(ctx context.Context, conn net.Conn) {
- defer conn.Close()
-
- scanner := bufio.NewScanner(conn)
- for scanner.Scan() {
- var msg api.LurchMsg
-
- if err := api.Decode(&msg, scanner.Bytes()); err != nil {
- slog.Error("decode: failed to decode", "error", err)
- fmt.Fprintln(conn, "error: invalid json")
- continue
- }
-
- slog.Info("received", "json", msg)
-
- if err := api.Compute(ctx, &msg, conn); err != nil {
- slog.Error("parse: failed to parse lurch logic", "error", err)
- fmt.Fprintln(conn, "error: failed to parse")
- continue
- }
- }
-
- if err := scanner.Err(); err != nil {
- slog.Error("scanner: something broke, fix it", "error", err)
- }
-}
-
-func (d *Daemon) worker(ctx context.Context) {
- defer d.workerWg.Done()
- defer d.workerCount.Add(-1)
-
- // replace eventually with shared memory polling
- ticker := time.NewTicker(5 * time.Second)
- defer ticker.Stop()
-
- for {
- select {
- case <-ctx.Done():
- slog.Info("worker shutting down")
- return
- case <-ticker.C:
- go doWork()
- }
- }
-}
-
-func doWork() {
- time.Sleep(5 * time.Second)
-}
diff --git a/howlers/src/scripts/workspace_1/example.py b/howlers/src/scripts/workspace_1/example.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/howlers/src/scripts/workspace_1/example.py
diff --git a/internal/api/logic.go b/internal/api/logic.go
deleted file mode 100644
index e60ec50..0000000
--- a/internal/api/logic.go
+++ /dev/null
@@ -1,16 +0,0 @@
-package api
-
-import "log/slog"
-
-var (
- statusArgs lurchArgs = lurchArgs{}
- status lurchCallback = func(args lurchArgs) error {
- if len(args) == 0 {
- slog.Debug("empty args map", "length", len(args))
- }
-
- slog.Debug("stuff is happening eventually")
-
- return nil
- }
-)
diff --git a/internal/daemon.go b/internal/daemon.go
new file mode 100644
index 0000000..cc202f5
--- /dev/null
+++ b/internal/daemon.go
@@ -0,0 +1,82 @@
+package internal
+
+import (
+ "bufio"
+ "context"
+ "fmt"
+ "log/slog"
+ "net"
+ "os/exec"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "lurchers/internal/api"
+)
+
+type Daemon struct {
+ Ring *SharedRing
+ Watchmen *WatchMen
+ Ctx context.Context
+ WorkerWg sync.WaitGroup
+ WorkerCount atomic.Int64
+}
+
+// {"method": "query","function": "status","params": {"id": 1, "user": "wcole"}}
+func (d *Daemon) HandleConn(ctx context.Context, conn net.Conn) {
+ defer conn.Close()
+
+ scanner := bufio.NewScanner(conn)
+ for scanner.Scan() {
+ var msg api.LurchMsg
+
+ if err := api.Decode(&msg, scanner.Bytes()); err != nil {
+ slog.Error("decode: failed to decode", "error", err)
+ fmt.Fprintln(conn, "error: invalid json")
+ continue
+ }
+
+ slog.Info("received", "json", msg)
+
+ if err := api.Compute(ctx, &msg, conn); err != nil {
+ slog.Error("parse: failed to parse lurch logic", "error", err)
+ fmt.Fprintln(conn, "error: failed to parse")
+ continue
+ }
+ }
+
+ if err := scanner.Err(); err != nil {
+ slog.Error("scanner: something broke, fix it", "error", err)
+ }
+}
+
+func (d *Daemon) SpawnWorker(ev SysLurchEventT, timeout time.Duration) (uint64, error) {
+ offset, slotIdx, err := d.Ring.ClaimSlot()
+ if err != nil {
+ return 0, err
+ }
+
+ if err := d.Ring.WriteHeader(offset, ev); err != nil {
+ return 0, err
+ }
+
+ d.Watchmen.MarkPending(slotIdx, time.Now().Add(timeout))
+ payloadOffset := offset + SlotHeaderSize
+
+ ctx, cancel := context.WithTimeout(d.Ctx, timeout)
+ cmd := exec.CommandContext(ctx, "jac", "howler,.py", fmt.Sprintf("%d", payloadOffset))
+
+ // explicit tracking of workers
+ d.WorkerWg.Add(1)
+ d.WorkerCount.Add(1)
+ go func() {
+ defer d.WorkerWg.Done()
+ defer d.WorkerCount.Add(-1)
+ defer cancel()
+ if err := cmd.Run(); err != nil {
+ slog.Error("spawn worker: worker failed to spawn / timed out on ingest deadline", "slotIdx", slotIdx)
+ }
+ }()
+
+ return slotIdx, nil
+}
diff --git a/internal/events.go b/internal/events.go
new file mode 100644
index 0000000..56ac1cb
--- /dev/null
+++ b/internal/events.go
@@ -0,0 +1,199 @@
+package internal
+
+import (
+ "encoding/binary"
+ "fmt"
+ "time"
+)
+
+type EventType int
+
+const (
+ ChldProcStart = iota
+ ChldProcDone
+ ChldProcFailed
+ ChldProcHurt
+ ChldProcHealing
+ ChldProcHealed
+ ChldProcTimedOut
+)
+
+var eventName = map[EventType]string{
+ ChldProcStart: "child_start",
+ ChldProcDone: "child_done",
+ ChldProcFailed: "child_failed",
+ ChldProcHurt: "child_hurt",
+ ChldProcHealing: "child_healing",
+ ChldProcHealed: "child_healed",
+ ChldProcTimedOut: "child_timedout",
+}
+
+func (et EventType) String() string {
+ return eventName[et]
+}
+
+// SysLurchEventT mirrors the on-disk slot layout (32-byte header + payload).
+type SysLurchEventT struct {
+ EventTime int64
+ EventID int64
+ EventKind EventType
+ Data1 int32
+ Data2 int32
+ PayloadLen uint32
+ Payload []byte
+}
+
+// EncodeEvent serializes ev into buf (must be >= SlotSize).
+// Payload is truncated to fit if it exceeds the slot's payload capacity.
+func EncodeEvent(buf []byte, ev SysLurchEventT) error {
+ if len(buf) < SlotSize {
+ return fmt.Errorf("encode event: buffer too small: %d < %d", len(buf), SlotSize)
+ }
+
+ payloadCap := SlotSize - SlotHeaderSize
+ payload := ev.Payload
+ if len(payload) > payloadCap {
+ payload = payload[:payloadCap]
+ }
+
+ binary.LittleEndian.PutUint64(buf[0:8], uint64(ev.EventTime))
+ binary.LittleEndian.PutUint64(buf[8:16], uint64(ev.EventID))
+ binary.LittleEndian.PutUint32(buf[16:20], uint32(ev.EventKind))
+ binary.LittleEndian.PutUint32(buf[20:24], uint32(ev.Data1))
+ binary.LittleEndian.PutUint32(buf[24:28], uint32(ev.Data2))
+ binary.LittleEndian.PutUint32(buf[28:32], uint32(len(payload)))
+
+ copy(buf[SlotHeaderSize:], payload)
+
+ // zero any leftover bytes from a previous longer write at this slot
+ for i := SlotHeaderSize + len(payload); i < SlotSize; i++ {
+ buf[i] = 0
+ }
+
+ return nil
+}
+
+// DecodeEvent deserializes a SysLurchEvent_t from buf (must be >= SlotSize).
+func DecodeEvent(buf []byte) (SysLurchEventT, error) {
+ var ev SysLurchEventT
+ if len(buf) < SlotSize {
+ return ev, fmt.Errorf("decode event: buffer too small: %d < %d", len(buf), SlotSize)
+ }
+
+ ev.EventTime = int64(binary.LittleEndian.Uint64(buf[0:8]))
+ ev.EventID = int64(binary.LittleEndian.Uint64(buf[8:16]))
+ ev.EventKind = EventType(binary.LittleEndian.Uint32(buf[16:20]))
+ ev.Data1 = int32(binary.LittleEndian.Uint32(buf[20:24]))
+ ev.Data2 = int32(binary.LittleEndian.Uint32(buf[24:28]))
+ ev.PayloadLen = binary.LittleEndian.Uint32(buf[28:32])
+
+ if ev.PayloadLen > uint32(SlotSize-SlotHeaderSize) {
+ return ev, fmt.Errorf("decode event: invalid payload length %d", ev.PayloadLen)
+ }
+
+ payload := make([]byte, ev.PayloadLen)
+ copy(payload, buf[SlotHeaderSize:SlotHeaderSize+ev.PayloadLen])
+ ev.Payload = payload
+
+ return ev, nil
+}
+
+type pendingEntry struct {
+ deadline time.Time
+}
+
+// WatchMen is the in-process consumer view over a SharedRing.
+type WatchMen struct {
+ ring *SharedRing
+ pending map[uint64]pendingEntry
+}
+
+func (w *WatchMen) MarkPending(slotIdx uint64, deadline time.Time) {
+ w.pending[slotIdx] = pendingEntry{deadline: deadline}
+}
+
+func NewWatchMen(ring *SharedRing) *WatchMen {
+ return &WatchMen{
+ ring: ring,
+ pending: make(map[uint64]pendingEntry),
+ }
+}
+
+// Ingest reads all unread events from the ring, advances ReadIndex, and
+// returns them in order. Returns an error if the consumer has fallen behind
+// and data has been overwritten (overflow).
+func (w *WatchMen) Ingest() ([]SysLurchEventT, error) {
+ writeIdx := w.ring.WriteIndex()
+ readIdx := w.ring.ReadIndex()
+
+ if writeIdx < readIdx {
+ return nil, fmt.Errorf("ingest: corrupt indices: write %d < read %d", writeIdx, readIdx)
+ }
+
+ total := writeIdx - readIdx
+ if total == 0 {
+ return nil, nil
+ }
+ if total > uint64(MaxSlots) {
+ skipped := total - uint64(MaxSlots)
+ for s := readIdx; s < readIdx+skipped; s++ {
+ delete(w.pending, s)
+ }
+ readIdx += skipped
+ total = uint64(MaxSlots)
+ // TODO: caller needs to log skipped as dropped events
+ }
+
+ events := make([]SysLurchEventT, 0, total)
+ resolved := make(map[uint64]bool, total)
+
+ // first pass is decoding every slot in range, collect the resolved ones
+ for i := uint64(0); i < total; i++ {
+ slotIdx := readIdx + i
+
+ buf, err := w.ring.SlotBytes(slotIdx)
+ if err != nil {
+ return events, fmt.Errorf("ingest: read slot %d: %v", slotIdx, err)
+ }
+
+ ev, err := DecodeEvent(buf)
+ if err != nil {
+ return events, fmt.Errorf("ingest: decode slot %d: %v", slotIdx, err)
+ }
+
+ if ev.PayloadLen == 0 {
+
+ entry, tracked := w.pending[slotIdx]
+ if tracked && time.Now().After(entry.deadline) {
+ // means that this is still pending; worker is not done yet
+ // or it has timed out
+ delete(w.pending, slotIdx)
+ resolved[slotIdx] = true
+ ev.EventKind = ChldProcTimedOut
+ events = append(events, ev)
+ continue
+ }
+
+ w.pending[slotIdx] = entry // keeps tracking; deadline unchanged
+ continue
+ }
+
+ // rdy
+ delete(w.pending, slotIdx)
+ resolved[slotIdx] = true
+ events = append(events, ev)
+ }
+
+ // second pass advances the ReadIndex past the longest resolved
+ // prefix starting at readIdx
+ advanceTo := readIdx
+ for resolved[advanceTo] {
+ advanceTo++
+ }
+
+ if advanceTo > readIdx {
+ w.ring.SetReadIndex(advanceTo)
+ }
+
+ return events, nil
+}
diff --git a/internal/lcommon.go b/internal/lcommon.go
index 49c5b28..f6a846f 100644
--- a/internal/lcommon.go
+++ b/internal/lcommon.go
@@ -1,4 +1,4 @@
-/*
+/* Package internal
* [https://cs.opensource.google/go/x/exp/+/master:mmap/mmap_unix.go]
* for mmap implementation that I borrowed
*/
@@ -13,8 +13,11 @@ import (
"os"
"runtime"
"syscall"
+ "unsafe"
)
+type LurchersContextKey string
+
func GetVar(key string, fallback string) string {
val, ok := os.LookupEnv(key)
if !ok {
@@ -128,63 +131,9 @@ func WithFileLevel(level slog.Level) func(*options) {
* =======================================================
*/
const (
- MAX_EVENTS int = 256
- MAX_CHLD_PROC int = 10000
- SIZE_RING_BUFF int = 16384
- SIZE_FILE int = SIZE_RING_BUFF * MAX_CHLD_PROC // 160 mb
- debug bool = false
-)
-
-type EventType int
-
-const (
- CHLD_PROC_START = iota
- CHLD_PROC_DONE
- CHLD_PROC_FAILED
- CHLD_PROC_HURT
- CHLD_PROC_HEALING
- CHLD_PROC_HEALED
+ debug bool = false
)
-var eventName = map[EventType]string{
- CHLD_PROC_START: "child_start",
- CHLD_PROC_DONE: "child_done",
- CHLD_PROC_FAILED: "child_failed",
- CHLD_PROC_HURT: "child_hurt",
- CHLD_PROC_HEALING: "child_healing",
- CHLD_PROC_HEALED: "child_healed",
-}
-
-func (et EventType) String() string {
- return eventName[et]
-}
-
-type SysLurchEvent_t struct {
- EventTime int
- EventID int
- Data1, Data2 int
- EventKind EventType
-}
-
-type WatchMen struct {
- eventHead int
- eventTail int
- EventsTotal int
- EventQue [MAX_EVENTS]SysLurchEvent_t
-}
-
-// single consumer
-func (w *WatchMen) SysGetEvent() *SysLurchEvent_t {
- if (*w).eventHead == (*w).eventTail {
- return nil // buffer is empty
- }
-
- ev := &(*w).EventQue[(*w).eventTail]
- (*w).eventTail = ((*w).eventTail + 1) % MAX_EVENTS
-
- return ev
-}
-
// ReaderAt reads a memory-mapped file (.mem)
// Like any io.ReaderAt, clients can execute parallel ReadAt calls, but it is
// not safe to call Close and reading methods concurrently.
@@ -193,7 +142,7 @@ type ReaderAt struct {
file *os.File
}
-// implements the io.ReaderAt interface
+// ReadAt implements the io.ReaderAt interface
func (r *ReaderAt) ReadAt(p []byte, offset int64) (int, error) {
if r.data == nil {
return 0, errors.New("mmap: closed")
@@ -242,9 +191,6 @@ func (r *ReaderAt) Close() error {
}
func Open(filename string) (*ReaderAt, error) {
- // just opening a file that will store
- // bytes of data that parent proc and
- // and child proc(s) share
f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0o666)
if err != nil {
return nil, fmt.Errorf("open: %v", err)
@@ -256,18 +202,21 @@ func Open(filename string) (*ReaderAt, error) {
return nil, fmt.Errorf("stat: %v", err)
}
- // size of the memory file in question
size := fs.Size()
+ freshFile := false
+
if size == 0 {
- // Treat (size == 0) as a special case, truncating the
- // file to the specified file size, in the case that the
- // file is new; upon first run of program.
- if err := f.Truncate(int64(SIZE_FILE)); err != nil {
+ if err := f.Truncate(int64(SizeFile)); err != nil {
f.Close()
return nil, fmt.Errorf("truncate: %v", err)
}
+ size = int64(SizeFile)
+ freshFile = true
+ }
- size = int64(SIZE_FILE)
+ if size != int64(SizeFile) {
+ f.Close()
+ return nil, fmt.Errorf("mmap: file %q has size %d, expected %d (format/version mismatch)", filename, size, SizeFile)
}
if size < 0 {
f.Close()
@@ -289,6 +238,11 @@ func Open(filename string) (*ReaderAt, error) {
file: f,
}
+ if freshFile {
+ ring := NewSharedRing(r)
+ ring.InitHeader()
+ }
+
if debug {
var p *byte
if len(data) != 0 {
@@ -302,27 +256,6 @@ func Open(filename string) (*ReaderAt, error) {
return r, nil
}
-/*
-func isBufferFull(eventHead int, eventTail int) bool {
- return ((eventHead + 1) % MAX_EVENTS) == eventTail
-}
-
-// single producer
-func SysQueueEvent(time int, kind EventType, id int, data1 int, data2 int) error {
- if IsBufferFull() {
- // overflow protection
- fmt.Printf("warning: Event queue overflow! Event %s dropped\n", kind.String())
- return fmt.Errorf("warning: Event queue overflow! Event %s dropped", kind.String())
- }
-
- EventQue[eventHead].EventTime = time
- EventQue[eventHead].EventKind = kind
- EventQue[eventHead].EventID = id
- EventQue[eventHead].Data1 = data1
- EventQue[eventHead].Data2 = data2
-
- eventHead = (eventHead + 1) % MAX_EVENTS
-
- return nil
+func unsafePointerAt(data []byte, offset int) unsafe.Pointer {
+ return unsafe.Pointer(&data[offset])
}
-*/
diff --git a/internal/logic.go b/internal/logic.go
new file mode 100644
index 0000000..f6c096f
--- /dev/null
+++ b/internal/logic.go
@@ -0,0 +1,28 @@
+package internal
+
+import (
+ "log/slog"
+)
+
+var (
+ statusArgs lurchArgs = lurchArgs{}
+ status lurchCallback = func(args lurchArgs) error {
+ if len(args) == 0 {
+ slog.Debug("empty args map", "length", len(args))
+ }
+
+ slog.Debug("stuff is happening eventually")
+
+ return nil
+ }
+)
+
+var (
+ spawnWorkerArgs lurchArgs = lurchArgs{"timeout_ns": int64(0)}
+ spawnWorker lurchCallback = func(args lurchArgs) error {
+ // tired TODO: finish impl of SpawnWorker from unix api
+ if dP, ok := args["daemon"].(*Daemon); ok {
+ dP.SpawnWorker()
+ }
+ }
+)
diff --git a/internal/api/lurchql.go b/internal/lurchql.go
index 2f5c125..b4fad40 100644
--- a/internal/api/lurchql.go
+++ b/internal/lurchql.go
@@ -1,6 +1,4 @@
-// Package api: programming interface for encoding and decoding
-// of lurch messages via the Unix Socket.
-package api
+package internal
/* TODO: Keep any actual logic/compute out of this file */
@@ -94,11 +92,16 @@ func Decode(lp interface{}, data []byte) error {
// {"method": "query","function": "get User","params": {"id": 1, "user": "wcole"}}
var fns = lurchLogic{
// using the base rep of the type
- "workerStart": []any{lurchArgs{"id": int(0), "user": ""}, func(args map[string]any) error { return nil }},
+ "spawnWorker": []any{spawnWorkerArgs, spawnWorker},
"status": []any{statusArgs, status},
}
func Compute(ctx context.Context, m *LurchMsg, conn net.Conn) error {
+ d := ctx.Value("daemon")
+ if d != nil {
+ return errors.New("compute: daemon reference not found in context")
+ }
+
a := func(args lurchArgs, pArgs lurchArgs) (lurchArgs, bool) {
buf := lurchArgs{}
for n, v := range args {
@@ -113,6 +116,7 @@ func Compute(ctx context.Context, m *LurchMsg, conn net.Conn) error {
return buf, false
}
+ args["daemon"] = d
return args, true
}
f := func(fn string) error {
diff --git a/internal/ring.go b/internal/ring.go
new file mode 100644
index 0000000..e691949
--- /dev/null
+++ b/internal/ring.go
@@ -0,0 +1,130 @@
+package internal
+
+import (
+ "encoding/binary"
+ "fmt"
+ "sync/atomic"
+)
+
+const (
+ // MaxSlots is the source of truth for ring capacity.
+ MaxSlots = 2560
+
+ // SlotHeaderSize is the fixed per-slot metadata size:
+ // EventTime(8) + EventID(8) + EventKind(4) + Data1(4) + Data2(4) + PayloadLen(4)
+ SlotHeaderSize = 32
+
+ // SlotSize is the total size of one slot, including payload.
+ SlotSize = 65536
+
+ // HeaderSize is the shared ring header: WriteIndex, ReadIndex, EventCount (3x uint64).
+ HeaderSize = 24
+
+ // SizeFile is the total backing file size.
+ SizeFile = HeaderSize + MaxSlots*SlotSize
+)
+
+// SharedRing wraps a ReaderAt and provides atomic access to the shared
+// ring buffer header and slot regions of the mmap'd file.
+type SharedRing struct {
+ r *ReaderAt
+}
+
+func NewSharedRing(r *ReaderAt) *SharedRing {
+ return &SharedRing{r: r}
+}
+
+// header field accessors via unsafe pointers into the mmap'd region.
+// Offsets: WriteIndex=0, ReadIndex=8, EventCount=16.
+
+func (s *SharedRing) writeIndexPtr() *uint64 {
+ return (*uint64)(unsafePointerAt(s.r.data, 0))
+}
+
+func (s *SharedRing) readIndexPtr() *uint64 {
+ return (*uint64)(unsafePointerAt(s.r.data, 8))
+}
+
+func (s *SharedRing) eventCountPtr() *uint64 {
+ return (*uint64)(unsafePointerAt(s.r.data, 16))
+}
+
+// WriteIndex returns the current write index (next slot to be claimed).
+func (s *SharedRing) WriteIndex() uint64 {
+ return atomic.LoadUint64(s.writeIndexPtr())
+}
+
+// ReadIndex returns the current read index (next slot to be consumed).
+func (s *SharedRing) ReadIndex() uint64 {
+ return atomic.LoadUint64(s.readIndexPtr())
+}
+
+// SetReadIndex updates the read index after the consumer processes events.
+func (s *SharedRing) SetReadIndex(idx uint64) {
+ atomic.StoreUint64(s.readIndexPtr(), idx)
+}
+
+// ClaimSlot atomically reserves the next slot for a producer to write into.
+// Returns the byte offset into the mmap'd region and the raw (unwrapped) slot index.
+func (s *SharedRing) ClaimSlot() (offset int64, slotIndex uint64, err error) {
+ idx := atomic.AddUint64(s.writeIndexPtr(), 1) - 1
+ atomic.AddUint64(s.eventCountPtr(), 1)
+
+ wrapped := idx % uint64(MaxSlots)
+ offset = int64(HeaderSize) + int64(wrapped)*int64(SlotSize)
+
+ if offset+int64(SlotSize) > int64(len(s.r.data)) {
+ return 0, 0, fmt.Errorf("claim slot: offset out of range: %d", offset)
+ }
+
+ return offset, idx, nil
+}
+
+// SlotBytes returns the raw slot buffer for the given (already-wrapped)
+// logical slot index, computing its position in the ring.
+func (s *SharedRing) SlotBytes(slotIndex uint64) ([]byte, error) {
+ wrapped := slotIndex % uint64(MaxSlots)
+ offset := int64(HeaderSize) + int64(wrapped)*int64(SlotSize)
+
+ if offset+int64(SlotSize) > int64(len(s.r.data)) {
+ return nil, fmt.Errorf("slot bytes: offset out of range: %d", offset)
+ }
+
+ return s.r.data[offset : offset+int64(SlotSize)], nil
+}
+
+// WriteSlot writes encoded event bytes at the given offset (from ClaimSlot).
+func (s *SharedRing) WriteSlot(offset int64, encoded []byte) error {
+ if len(encoded) != SlotSize {
+ return fmt.Errorf("write slot: encoded size %d != %d", len(encoded), SlotSize)
+ }
+ if offset+int64(SlotSize) > int64(len(s.r.data)) {
+ return fmt.Errorf("write slot: offset out of range: %d", offset)
+ }
+
+ copy(s.r.data[offset:offset+int64(SlotSize)], encoded)
+ return nil
+}
+
+// InitHeader zero-initializes the header region. Call this only when the
+// backing file was just created (size == 0 case in Open).
+func (s *SharedRing) InitHeader() {
+ binary.LittleEndian.PutUint64(s.r.data[0:8], 0)
+ binary.LittleEndian.PutUint64(s.r.data[8:16], 0)
+ binary.LittleEndian.PutUint64(s.r.data[16:24], 0)
+}
+
+func (s *SharedRing) WriteHeader(offset int64, ev SysLurchEventT) error {
+ hdr := make([]byte, SlotHeaderSize)
+ binary.LittleEndian.PutUint64(hdr[0:8], uint64(ev.EventTime))
+ binary.LittleEndian.PutUint64(hdr[8:16], uint64(ev.EventID))
+ binary.LittleEndian.PutUint32(hdr[28:32], 0)
+
+ if offset*int64(SlotHeaderSize) > int64(len(s.r.data)) {
+ return fmt.Errorf("write header: offset is out of range - %d", offset)
+ }
+
+ copy(s.r.data[offset:offset*int64(SlotHeaderSize)], hdr)
+
+ return nil
+}