diff options
| -rw-r--r-- | cmd/lurchers/main.go | 92 | ||||
| -rw-r--r-- | howlers/src/scripts/workspace_1/example.py | 0 | ||||
| -rw-r--r-- | internal/api/logic.go | 16 | ||||
| -rw-r--r-- | internal/daemon.go | 82 | ||||
| -rw-r--r-- | internal/events.go | 199 | ||||
| -rw-r--r-- | internal/lcommon.go | 111 | ||||
| -rw-r--r-- | internal/logic.go | 28 | ||||
| -rw-r--r-- | internal/lurchql.go (renamed from internal/api/lurchql.go) | 12 | ||||
| -rw-r--r-- | internal/ring.go | 130 |
9 files changed, 485 insertions, 185 deletions
diff --git a/cmd/lurchers/main.go b/cmd/lurchers/main.go index 57d2019..ca45eaf 100644 --- a/cmd/lurchers/main.go +++ b/cmd/lurchers/main.go @@ -1,22 +1,16 @@ package main import ( - "bufio" "context" - "fmt" "log" "log/slog" "net" "os" "os/signal" "runtime" - "sync" - "sync/atomic" "syscall" - "time" "lurchers/internal" - "lurchers/internal/api" ) var ( @@ -37,12 +31,6 @@ func init() { } } -type Daemon struct { - mem *internal.ReaderAt - workerWg sync.WaitGroup - workerCount atomic.Int64 -} - func main() { if _, err := os.Stat(FileLogs); os.IsNotExist(err) { fpath := "/var/log/lurchers/" @@ -54,14 +42,6 @@ func main() { defer f.Close() } - if _, err := os.Stat(FileMem); os.IsNotExist(err) { - m, err := os.OpenFile(FileMem, os.O_RDWR|os.O_CREATE, 0o644) - if err != nil { - log.Fatal(err) - } - defer m.Close() - } - logFile, err := internal.SetupLogger(internal.WithLogName(FileLogs)) if err != nil { log.Fatal("setuplogger: error setting up logger", err) @@ -81,14 +61,23 @@ func main() { slog.Error("failed to listen", "error", err) os.Exit(1) } + // allows non-root to connect to socket os.Chmod(SocketPath, 0o666) defer ln.Close() - slog.Info("daemon started", "socket", SocketPath) - ctx, cancel := context.WithCancel(context.Background()) + // idea of workspaces: Workspace 1, 2, 3, ..., N + // default ring + watchmen for init workspace + ring := internal.NewSharedRing(mem) + watchmen := internal.NewWatchMen(ring) + d := &internal.Daemon{ + Ring: ring, + Watchmen: watchmen, + Ctx: ctx, + } + sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) go func() { @@ -98,15 +87,17 @@ func main() { ln.Close() }() - d := &Daemon{mem: mem} + slog.Info("daemon started", "socket", SocketPath) + // a reference to the daemon in context + ctxD := context.WithValue(ctx, internal.LurchersContextKey("daemon"), &d) for { conn, err := ln.Accept() if err != nil { select { case <-ctx.Done(): slog.Info("waiting for workers to finish...") - d.workerWg.Wait() + d.WorkerWg.Wait() slog.Info("daemon stopped") return default: @@ -114,57 +105,6 @@ func main() { continue } } - go d.handleConn(ctx, conn) + go d.HandleConn(ctxD, conn) } } - -// {"method": "query","function": "status","params": {"id": 1, "user": "wcole"}} -func (d *Daemon) handleConn(ctx context.Context, conn net.Conn) { - defer conn.Close() - - scanner := bufio.NewScanner(conn) - for scanner.Scan() { - var msg api.LurchMsg - - if err := api.Decode(&msg, scanner.Bytes()); err != nil { - slog.Error("decode: failed to decode", "error", err) - fmt.Fprintln(conn, "error: invalid json") - continue - } - - slog.Info("received", "json", msg) - - if err := api.Compute(ctx, &msg, conn); err != nil { - slog.Error("parse: failed to parse lurch logic", "error", err) - fmt.Fprintln(conn, "error: failed to parse") - continue - } - } - - if err := scanner.Err(); err != nil { - slog.Error("scanner: something broke, fix it", "error", err) - } -} - -func (d *Daemon) worker(ctx context.Context) { - defer d.workerWg.Done() - defer d.workerCount.Add(-1) - - // replace eventually with shared memory polling - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - slog.Info("worker shutting down") - return - case <-ticker.C: - go doWork() - } - } -} - -func doWork() { - time.Sleep(5 * time.Second) -} diff --git a/howlers/src/scripts/workspace_1/example.py b/howlers/src/scripts/workspace_1/example.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/howlers/src/scripts/workspace_1/example.py diff --git a/internal/api/logic.go b/internal/api/logic.go deleted file mode 100644 index e60ec50..0000000 --- a/internal/api/logic.go +++ /dev/null @@ -1,16 +0,0 @@ -package api - -import "log/slog" - -var ( - statusArgs lurchArgs = lurchArgs{} - status lurchCallback = func(args lurchArgs) error { - if len(args) == 0 { - slog.Debug("empty args map", "length", len(args)) - } - - slog.Debug("stuff is happening eventually") - - return nil - } -) diff --git a/internal/daemon.go b/internal/daemon.go new file mode 100644 index 0000000..cc202f5 --- /dev/null +++ b/internal/daemon.go @@ -0,0 +1,82 @@ +package internal + +import ( + "bufio" + "context" + "fmt" + "log/slog" + "net" + "os/exec" + "sync" + "sync/atomic" + "time" + + "lurchers/internal/api" +) + +type Daemon struct { + Ring *SharedRing + Watchmen *WatchMen + Ctx context.Context + WorkerWg sync.WaitGroup + WorkerCount atomic.Int64 +} + +// {"method": "query","function": "status","params": {"id": 1, "user": "wcole"}} +func (d *Daemon) HandleConn(ctx context.Context, conn net.Conn) { + defer conn.Close() + + scanner := bufio.NewScanner(conn) + for scanner.Scan() { + var msg api.LurchMsg + + if err := api.Decode(&msg, scanner.Bytes()); err != nil { + slog.Error("decode: failed to decode", "error", err) + fmt.Fprintln(conn, "error: invalid json") + continue + } + + slog.Info("received", "json", msg) + + if err := api.Compute(ctx, &msg, conn); err != nil { + slog.Error("parse: failed to parse lurch logic", "error", err) + fmt.Fprintln(conn, "error: failed to parse") + continue + } + } + + if err := scanner.Err(); err != nil { + slog.Error("scanner: something broke, fix it", "error", err) + } +} + +func (d *Daemon) SpawnWorker(ev SysLurchEventT, timeout time.Duration) (uint64, error) { + offset, slotIdx, err := d.Ring.ClaimSlot() + if err != nil { + return 0, err + } + + if err := d.Ring.WriteHeader(offset, ev); err != nil { + return 0, err + } + + d.Watchmen.MarkPending(slotIdx, time.Now().Add(timeout)) + payloadOffset := offset + SlotHeaderSize + + ctx, cancel := context.WithTimeout(d.Ctx, timeout) + cmd := exec.CommandContext(ctx, "jac", "howler,.py", fmt.Sprintf("%d", payloadOffset)) + + // explicit tracking of workers + d.WorkerWg.Add(1) + d.WorkerCount.Add(1) + go func() { + defer d.WorkerWg.Done() + defer d.WorkerCount.Add(-1) + defer cancel() + if err := cmd.Run(); err != nil { + slog.Error("spawn worker: worker failed to spawn / timed out on ingest deadline", "slotIdx", slotIdx) + } + }() + + return slotIdx, nil +} diff --git a/internal/events.go b/internal/events.go new file mode 100644 index 0000000..56ac1cb --- /dev/null +++ b/internal/events.go @@ -0,0 +1,199 @@ +package internal + +import ( + "encoding/binary" + "fmt" + "time" +) + +type EventType int + +const ( + ChldProcStart = iota + ChldProcDone + ChldProcFailed + ChldProcHurt + ChldProcHealing + ChldProcHealed + ChldProcTimedOut +) + +var eventName = map[EventType]string{ + ChldProcStart: "child_start", + ChldProcDone: "child_done", + ChldProcFailed: "child_failed", + ChldProcHurt: "child_hurt", + ChldProcHealing: "child_healing", + ChldProcHealed: "child_healed", + ChldProcTimedOut: "child_timedout", +} + +func (et EventType) String() string { + return eventName[et] +} + +// SysLurchEventT mirrors the on-disk slot layout (32-byte header + payload). +type SysLurchEventT struct { + EventTime int64 + EventID int64 + EventKind EventType + Data1 int32 + Data2 int32 + PayloadLen uint32 + Payload []byte +} + +// EncodeEvent serializes ev into buf (must be >= SlotSize). +// Payload is truncated to fit if it exceeds the slot's payload capacity. +func EncodeEvent(buf []byte, ev SysLurchEventT) error { + if len(buf) < SlotSize { + return fmt.Errorf("encode event: buffer too small: %d < %d", len(buf), SlotSize) + } + + payloadCap := SlotSize - SlotHeaderSize + payload := ev.Payload + if len(payload) > payloadCap { + payload = payload[:payloadCap] + } + + binary.LittleEndian.PutUint64(buf[0:8], uint64(ev.EventTime)) + binary.LittleEndian.PutUint64(buf[8:16], uint64(ev.EventID)) + binary.LittleEndian.PutUint32(buf[16:20], uint32(ev.EventKind)) + binary.LittleEndian.PutUint32(buf[20:24], uint32(ev.Data1)) + binary.LittleEndian.PutUint32(buf[24:28], uint32(ev.Data2)) + binary.LittleEndian.PutUint32(buf[28:32], uint32(len(payload))) + + copy(buf[SlotHeaderSize:], payload) + + // zero any leftover bytes from a previous longer write at this slot + for i := SlotHeaderSize + len(payload); i < SlotSize; i++ { + buf[i] = 0 + } + + return nil +} + +// DecodeEvent deserializes a SysLurchEvent_t from buf (must be >= SlotSize). +func DecodeEvent(buf []byte) (SysLurchEventT, error) { + var ev SysLurchEventT + if len(buf) < SlotSize { + return ev, fmt.Errorf("decode event: buffer too small: %d < %d", len(buf), SlotSize) + } + + ev.EventTime = int64(binary.LittleEndian.Uint64(buf[0:8])) + ev.EventID = int64(binary.LittleEndian.Uint64(buf[8:16])) + ev.EventKind = EventType(binary.LittleEndian.Uint32(buf[16:20])) + ev.Data1 = int32(binary.LittleEndian.Uint32(buf[20:24])) + ev.Data2 = int32(binary.LittleEndian.Uint32(buf[24:28])) + ev.PayloadLen = binary.LittleEndian.Uint32(buf[28:32]) + + if ev.PayloadLen > uint32(SlotSize-SlotHeaderSize) { + return ev, fmt.Errorf("decode event: invalid payload length %d", ev.PayloadLen) + } + + payload := make([]byte, ev.PayloadLen) + copy(payload, buf[SlotHeaderSize:SlotHeaderSize+ev.PayloadLen]) + ev.Payload = payload + + return ev, nil +} + +type pendingEntry struct { + deadline time.Time +} + +// WatchMen is the in-process consumer view over a SharedRing. +type WatchMen struct { + ring *SharedRing + pending map[uint64]pendingEntry +} + +func (w *WatchMen) MarkPending(slotIdx uint64, deadline time.Time) { + w.pending[slotIdx] = pendingEntry{deadline: deadline} +} + +func NewWatchMen(ring *SharedRing) *WatchMen { + return &WatchMen{ + ring: ring, + pending: make(map[uint64]pendingEntry), + } +} + +// Ingest reads all unread events from the ring, advances ReadIndex, and +// returns them in order. Returns an error if the consumer has fallen behind +// and data has been overwritten (overflow). +func (w *WatchMen) Ingest() ([]SysLurchEventT, error) { + writeIdx := w.ring.WriteIndex() + readIdx := w.ring.ReadIndex() + + if writeIdx < readIdx { + return nil, fmt.Errorf("ingest: corrupt indices: write %d < read %d", writeIdx, readIdx) + } + + total := writeIdx - readIdx + if total == 0 { + return nil, nil + } + if total > uint64(MaxSlots) { + skipped := total - uint64(MaxSlots) + for s := readIdx; s < readIdx+skipped; s++ { + delete(w.pending, s) + } + readIdx += skipped + total = uint64(MaxSlots) + // TODO: caller needs to log skipped as dropped events + } + + events := make([]SysLurchEventT, 0, total) + resolved := make(map[uint64]bool, total) + + // first pass is decoding every slot in range, collect the resolved ones + for i := uint64(0); i < total; i++ { + slotIdx := readIdx + i + + buf, err := w.ring.SlotBytes(slotIdx) + if err != nil { + return events, fmt.Errorf("ingest: read slot %d: %v", slotIdx, err) + } + + ev, err := DecodeEvent(buf) + if err != nil { + return events, fmt.Errorf("ingest: decode slot %d: %v", slotIdx, err) + } + + if ev.PayloadLen == 0 { + + entry, tracked := w.pending[slotIdx] + if tracked && time.Now().After(entry.deadline) { + // means that this is still pending; worker is not done yet + // or it has timed out + delete(w.pending, slotIdx) + resolved[slotIdx] = true + ev.EventKind = ChldProcTimedOut + events = append(events, ev) + continue + } + + w.pending[slotIdx] = entry // keeps tracking; deadline unchanged + continue + } + + // rdy + delete(w.pending, slotIdx) + resolved[slotIdx] = true + events = append(events, ev) + } + + // second pass advances the ReadIndex past the longest resolved + // prefix starting at readIdx + advanceTo := readIdx + for resolved[advanceTo] { + advanceTo++ + } + + if advanceTo > readIdx { + w.ring.SetReadIndex(advanceTo) + } + + return events, nil +} diff --git a/internal/lcommon.go b/internal/lcommon.go index 49c5b28..f6a846f 100644 --- a/internal/lcommon.go +++ b/internal/lcommon.go @@ -1,4 +1,4 @@ -/* +/* Package internal * [https://cs.opensource.google/go/x/exp/+/master:mmap/mmap_unix.go] * for mmap implementation that I borrowed */ @@ -13,8 +13,11 @@ import ( "os" "runtime" "syscall" + "unsafe" ) +type LurchersContextKey string + func GetVar(key string, fallback string) string { val, ok := os.LookupEnv(key) if !ok { @@ -128,63 +131,9 @@ func WithFileLevel(level slog.Level) func(*options) { * ======================================================= */ const ( - MAX_EVENTS int = 256 - MAX_CHLD_PROC int = 10000 - SIZE_RING_BUFF int = 16384 - SIZE_FILE int = SIZE_RING_BUFF * MAX_CHLD_PROC // 160 mb - debug bool = false -) - -type EventType int - -const ( - CHLD_PROC_START = iota - CHLD_PROC_DONE - CHLD_PROC_FAILED - CHLD_PROC_HURT - CHLD_PROC_HEALING - CHLD_PROC_HEALED + debug bool = false ) -var eventName = map[EventType]string{ - CHLD_PROC_START: "child_start", - CHLD_PROC_DONE: "child_done", - CHLD_PROC_FAILED: "child_failed", - CHLD_PROC_HURT: "child_hurt", - CHLD_PROC_HEALING: "child_healing", - CHLD_PROC_HEALED: "child_healed", -} - -func (et EventType) String() string { - return eventName[et] -} - -type SysLurchEvent_t struct { - EventTime int - EventID int - Data1, Data2 int - EventKind EventType -} - -type WatchMen struct { - eventHead int - eventTail int - EventsTotal int - EventQue [MAX_EVENTS]SysLurchEvent_t -} - -// single consumer -func (w *WatchMen) SysGetEvent() *SysLurchEvent_t { - if (*w).eventHead == (*w).eventTail { - return nil // buffer is empty - } - - ev := &(*w).EventQue[(*w).eventTail] - (*w).eventTail = ((*w).eventTail + 1) % MAX_EVENTS - - return ev -} - // ReaderAt reads a memory-mapped file (.mem) // Like any io.ReaderAt, clients can execute parallel ReadAt calls, but it is // not safe to call Close and reading methods concurrently. @@ -193,7 +142,7 @@ type ReaderAt struct { file *os.File } -// implements the io.ReaderAt interface +// ReadAt implements the io.ReaderAt interface func (r *ReaderAt) ReadAt(p []byte, offset int64) (int, error) { if r.data == nil { return 0, errors.New("mmap: closed") @@ -242,9 +191,6 @@ func (r *ReaderAt) Close() error { } func Open(filename string) (*ReaderAt, error) { - // just opening a file that will store - // bytes of data that parent proc and - // and child proc(s) share f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0o666) if err != nil { return nil, fmt.Errorf("open: %v", err) @@ -256,18 +202,21 @@ func Open(filename string) (*ReaderAt, error) { return nil, fmt.Errorf("stat: %v", err) } - // size of the memory file in question size := fs.Size() + freshFile := false + if size == 0 { - // Treat (size == 0) as a special case, truncating the - // file to the specified file size, in the case that the - // file is new; upon first run of program. - if err := f.Truncate(int64(SIZE_FILE)); err != nil { + if err := f.Truncate(int64(SizeFile)); err != nil { f.Close() return nil, fmt.Errorf("truncate: %v", err) } + size = int64(SizeFile) + freshFile = true + } - size = int64(SIZE_FILE) + if size != int64(SizeFile) { + f.Close() + return nil, fmt.Errorf("mmap: file %q has size %d, expected %d (format/version mismatch)", filename, size, SizeFile) } if size < 0 { f.Close() @@ -289,6 +238,11 @@ func Open(filename string) (*ReaderAt, error) { file: f, } + if freshFile { + ring := NewSharedRing(r) + ring.InitHeader() + } + if debug { var p *byte if len(data) != 0 { @@ -302,27 +256,6 @@ func Open(filename string) (*ReaderAt, error) { return r, nil } -/* -func isBufferFull(eventHead int, eventTail int) bool { - return ((eventHead + 1) % MAX_EVENTS) == eventTail -} - -// single producer -func SysQueueEvent(time int, kind EventType, id int, data1 int, data2 int) error { - if IsBufferFull() { - // overflow protection - fmt.Printf("warning: Event queue overflow! Event %s dropped\n", kind.String()) - return fmt.Errorf("warning: Event queue overflow! Event %s dropped", kind.String()) - } - - EventQue[eventHead].EventTime = time - EventQue[eventHead].EventKind = kind - EventQue[eventHead].EventID = id - EventQue[eventHead].Data1 = data1 - EventQue[eventHead].Data2 = data2 - - eventHead = (eventHead + 1) % MAX_EVENTS - - return nil +func unsafePointerAt(data []byte, offset int) unsafe.Pointer { + return unsafe.Pointer(&data[offset]) } -*/ diff --git a/internal/logic.go b/internal/logic.go new file mode 100644 index 0000000..f6c096f --- /dev/null +++ b/internal/logic.go @@ -0,0 +1,28 @@ +package internal + +import ( + "log/slog" +) + +var ( + statusArgs lurchArgs = lurchArgs{} + status lurchCallback = func(args lurchArgs) error { + if len(args) == 0 { + slog.Debug("empty args map", "length", len(args)) + } + + slog.Debug("stuff is happening eventually") + + return nil + } +) + +var ( + spawnWorkerArgs lurchArgs = lurchArgs{"timeout_ns": int64(0)} + spawnWorker lurchCallback = func(args lurchArgs) error { + // tired TODO: finish impl of SpawnWorker from unix api + if dP, ok := args["daemon"].(*Daemon); ok { + dP.SpawnWorker() + } + } +) diff --git a/internal/api/lurchql.go b/internal/lurchql.go index 2f5c125..b4fad40 100644 --- a/internal/api/lurchql.go +++ b/internal/lurchql.go @@ -1,6 +1,4 @@ -// Package api: programming interface for encoding and decoding -// of lurch messages via the Unix Socket. -package api +package internal /* TODO: Keep any actual logic/compute out of this file */ @@ -94,11 +92,16 @@ func Decode(lp interface{}, data []byte) error { // {"method": "query","function": "get User","params": {"id": 1, "user": "wcole"}} var fns = lurchLogic{ // using the base rep of the type - "workerStart": []any{lurchArgs{"id": int(0), "user": ""}, func(args map[string]any) error { return nil }}, + "spawnWorker": []any{spawnWorkerArgs, spawnWorker}, "status": []any{statusArgs, status}, } func Compute(ctx context.Context, m *LurchMsg, conn net.Conn) error { + d := ctx.Value("daemon") + if d != nil { + return errors.New("compute: daemon reference not found in context") + } + a := func(args lurchArgs, pArgs lurchArgs) (lurchArgs, bool) { buf := lurchArgs{} for n, v := range args { @@ -113,6 +116,7 @@ func Compute(ctx context.Context, m *LurchMsg, conn net.Conn) error { return buf, false } + args["daemon"] = d return args, true } f := func(fn string) error { diff --git a/internal/ring.go b/internal/ring.go new file mode 100644 index 0000000..e691949 --- /dev/null +++ b/internal/ring.go @@ -0,0 +1,130 @@ +package internal + +import ( + "encoding/binary" + "fmt" + "sync/atomic" +) + +const ( + // MaxSlots is the source of truth for ring capacity. + MaxSlots = 2560 + + // SlotHeaderSize is the fixed per-slot metadata size: + // EventTime(8) + EventID(8) + EventKind(4) + Data1(4) + Data2(4) + PayloadLen(4) + SlotHeaderSize = 32 + + // SlotSize is the total size of one slot, including payload. + SlotSize = 65536 + + // HeaderSize is the shared ring header: WriteIndex, ReadIndex, EventCount (3x uint64). + HeaderSize = 24 + + // SizeFile is the total backing file size. + SizeFile = HeaderSize + MaxSlots*SlotSize +) + +// SharedRing wraps a ReaderAt and provides atomic access to the shared +// ring buffer header and slot regions of the mmap'd file. +type SharedRing struct { + r *ReaderAt +} + +func NewSharedRing(r *ReaderAt) *SharedRing { + return &SharedRing{r: r} +} + +// header field accessors via unsafe pointers into the mmap'd region. +// Offsets: WriteIndex=0, ReadIndex=8, EventCount=16. + +func (s *SharedRing) writeIndexPtr() *uint64 { + return (*uint64)(unsafePointerAt(s.r.data, 0)) +} + +func (s *SharedRing) readIndexPtr() *uint64 { + return (*uint64)(unsafePointerAt(s.r.data, 8)) +} + +func (s *SharedRing) eventCountPtr() *uint64 { + return (*uint64)(unsafePointerAt(s.r.data, 16)) +} + +// WriteIndex returns the current write index (next slot to be claimed). +func (s *SharedRing) WriteIndex() uint64 { + return atomic.LoadUint64(s.writeIndexPtr()) +} + +// ReadIndex returns the current read index (next slot to be consumed). +func (s *SharedRing) ReadIndex() uint64 { + return atomic.LoadUint64(s.readIndexPtr()) +} + +// SetReadIndex updates the read index after the consumer processes events. +func (s *SharedRing) SetReadIndex(idx uint64) { + atomic.StoreUint64(s.readIndexPtr(), idx) +} + +// ClaimSlot atomically reserves the next slot for a producer to write into. +// Returns the byte offset into the mmap'd region and the raw (unwrapped) slot index. +func (s *SharedRing) ClaimSlot() (offset int64, slotIndex uint64, err error) { + idx := atomic.AddUint64(s.writeIndexPtr(), 1) - 1 + atomic.AddUint64(s.eventCountPtr(), 1) + + wrapped := idx % uint64(MaxSlots) + offset = int64(HeaderSize) + int64(wrapped)*int64(SlotSize) + + if offset+int64(SlotSize) > int64(len(s.r.data)) { + return 0, 0, fmt.Errorf("claim slot: offset out of range: %d", offset) + } + + return offset, idx, nil +} + +// SlotBytes returns the raw slot buffer for the given (already-wrapped) +// logical slot index, computing its position in the ring. +func (s *SharedRing) SlotBytes(slotIndex uint64) ([]byte, error) { + wrapped := slotIndex % uint64(MaxSlots) + offset := int64(HeaderSize) + int64(wrapped)*int64(SlotSize) + + if offset+int64(SlotSize) > int64(len(s.r.data)) { + return nil, fmt.Errorf("slot bytes: offset out of range: %d", offset) + } + + return s.r.data[offset : offset+int64(SlotSize)], nil +} + +// WriteSlot writes encoded event bytes at the given offset (from ClaimSlot). +func (s *SharedRing) WriteSlot(offset int64, encoded []byte) error { + if len(encoded) != SlotSize { + return fmt.Errorf("write slot: encoded size %d != %d", len(encoded), SlotSize) + } + if offset+int64(SlotSize) > int64(len(s.r.data)) { + return fmt.Errorf("write slot: offset out of range: %d", offset) + } + + copy(s.r.data[offset:offset+int64(SlotSize)], encoded) + return nil +} + +// InitHeader zero-initializes the header region. Call this only when the +// backing file was just created (size == 0 case in Open). +func (s *SharedRing) InitHeader() { + binary.LittleEndian.PutUint64(s.r.data[0:8], 0) + binary.LittleEndian.PutUint64(s.r.data[8:16], 0) + binary.LittleEndian.PutUint64(s.r.data[16:24], 0) +} + +func (s *SharedRing) WriteHeader(offset int64, ev SysLurchEventT) error { + hdr := make([]byte, SlotHeaderSize) + binary.LittleEndian.PutUint64(hdr[0:8], uint64(ev.EventTime)) + binary.LittleEndian.PutUint64(hdr[8:16], uint64(ev.EventID)) + binary.LittleEndian.PutUint32(hdr[28:32], 0) + + if offset*int64(SlotHeaderSize) > int64(len(s.r.data)) { + return fmt.Errorf("write header: offset is out of range - %d", offset) + } + + copy(s.r.data[offset:offset*int64(SlotHeaderSize)], hdr) + + return nil +} |
