summaryrefslogtreecommitdiff
path: root/internal/events.go
diff options
context:
space:
mode:
authorWayne-Cole <77279425+Wacky404@users.noreply.github.com>2026-06-13 20:26:39 -0500
committerWayne-Cole <77279425+Wacky404@users.noreply.github.com>2026-06-13 20:26:39 -0500
commit1cfd8a143107ab4bc50092daa45407196df0b75d (patch)
treec25097cdcfe39a875dad3eb483c59a95c5e8b1be /internal/events.go
parentbb0d5fecf8d839efa0e89c33d310c5202c6f8919 (diff)
downloadlurchers-1cfd8a143107ab4bc50092daa45407196df0b75d.tar.xz
lurchers-1cfd8a143107ab4bc50092daa45407196df0b75d.zip
update: dove deep into unix api and made the base orchestration system
Diffstat (limited to 'internal/events.go')
-rw-r--r--internal/events.go199
1 files changed, 199 insertions, 0 deletions
diff --git a/internal/events.go b/internal/events.go
new file mode 100644
index 0000000..56ac1cb
--- /dev/null
+++ b/internal/events.go
@@ -0,0 +1,199 @@
+package internal
+
+import (
+ "encoding/binary"
+ "fmt"
+ "time"
+)
+
+type EventType int
+
+const (
+ ChldProcStart = iota
+ ChldProcDone
+ ChldProcFailed
+ ChldProcHurt
+ ChldProcHealing
+ ChldProcHealed
+ ChldProcTimedOut
+)
+
+var eventName = map[EventType]string{
+ ChldProcStart: "child_start",
+ ChldProcDone: "child_done",
+ ChldProcFailed: "child_failed",
+ ChldProcHurt: "child_hurt",
+ ChldProcHealing: "child_healing",
+ ChldProcHealed: "child_healed",
+ ChldProcTimedOut: "child_timedout",
+}
+
+func (et EventType) String() string {
+ return eventName[et]
+}
+
+// SysLurchEventT mirrors the on-disk slot layout (32-byte header + payload).
+type SysLurchEventT struct {
+ EventTime int64
+ EventID int64
+ EventKind EventType
+ Data1 int32
+ Data2 int32
+ PayloadLen uint32
+ Payload []byte
+}
+
+// EncodeEvent serializes ev into buf (must be >= SlotSize).
+// Payload is truncated to fit if it exceeds the slot's payload capacity.
+func EncodeEvent(buf []byte, ev SysLurchEventT) error {
+ if len(buf) < SlotSize {
+ return fmt.Errorf("encode event: buffer too small: %d < %d", len(buf), SlotSize)
+ }
+
+ payloadCap := SlotSize - SlotHeaderSize
+ payload := ev.Payload
+ if len(payload) > payloadCap {
+ payload = payload[:payloadCap]
+ }
+
+ binary.LittleEndian.PutUint64(buf[0:8], uint64(ev.EventTime))
+ binary.LittleEndian.PutUint64(buf[8:16], uint64(ev.EventID))
+ binary.LittleEndian.PutUint32(buf[16:20], uint32(ev.EventKind))
+ binary.LittleEndian.PutUint32(buf[20:24], uint32(ev.Data1))
+ binary.LittleEndian.PutUint32(buf[24:28], uint32(ev.Data2))
+ binary.LittleEndian.PutUint32(buf[28:32], uint32(len(payload)))
+
+ copy(buf[SlotHeaderSize:], payload)
+
+ // zero any leftover bytes from a previous longer write at this slot
+ for i := SlotHeaderSize + len(payload); i < SlotSize; i++ {
+ buf[i] = 0
+ }
+
+ return nil
+}
+
+// DecodeEvent deserializes a SysLurchEvent_t from buf (must be >= SlotSize).
+func DecodeEvent(buf []byte) (SysLurchEventT, error) {
+ var ev SysLurchEventT
+ if len(buf) < SlotSize {
+ return ev, fmt.Errorf("decode event: buffer too small: %d < %d", len(buf), SlotSize)
+ }
+
+ ev.EventTime = int64(binary.LittleEndian.Uint64(buf[0:8]))
+ ev.EventID = int64(binary.LittleEndian.Uint64(buf[8:16]))
+ ev.EventKind = EventType(binary.LittleEndian.Uint32(buf[16:20]))
+ ev.Data1 = int32(binary.LittleEndian.Uint32(buf[20:24]))
+ ev.Data2 = int32(binary.LittleEndian.Uint32(buf[24:28]))
+ ev.PayloadLen = binary.LittleEndian.Uint32(buf[28:32])
+
+ if ev.PayloadLen > uint32(SlotSize-SlotHeaderSize) {
+ return ev, fmt.Errorf("decode event: invalid payload length %d", ev.PayloadLen)
+ }
+
+ payload := make([]byte, ev.PayloadLen)
+ copy(payload, buf[SlotHeaderSize:SlotHeaderSize+ev.PayloadLen])
+ ev.Payload = payload
+
+ return ev, nil
+}
+
+type pendingEntry struct {
+ deadline time.Time
+}
+
+// WatchMen is the in-process consumer view over a SharedRing.
+type WatchMen struct {
+ ring *SharedRing
+ pending map[uint64]pendingEntry
+}
+
+func (w *WatchMen) MarkPending(slotIdx uint64, deadline time.Time) {
+ w.pending[slotIdx] = pendingEntry{deadline: deadline}
+}
+
+func NewWatchMen(ring *SharedRing) *WatchMen {
+ return &WatchMen{
+ ring: ring,
+ pending: make(map[uint64]pendingEntry),
+ }
+}
+
+// Ingest reads all unread events from the ring, advances ReadIndex, and
+// returns them in order. Returns an error if the consumer has fallen behind
+// and data has been overwritten (overflow).
+func (w *WatchMen) Ingest() ([]SysLurchEventT, error) {
+ writeIdx := w.ring.WriteIndex()
+ readIdx := w.ring.ReadIndex()
+
+ if writeIdx < readIdx {
+ return nil, fmt.Errorf("ingest: corrupt indices: write %d < read %d", writeIdx, readIdx)
+ }
+
+ total := writeIdx - readIdx
+ if total == 0 {
+ return nil, nil
+ }
+ if total > uint64(MaxSlots) {
+ skipped := total - uint64(MaxSlots)
+ for s := readIdx; s < readIdx+skipped; s++ {
+ delete(w.pending, s)
+ }
+ readIdx += skipped
+ total = uint64(MaxSlots)
+ // TODO: caller needs to log skipped as dropped events
+ }
+
+ events := make([]SysLurchEventT, 0, total)
+ resolved := make(map[uint64]bool, total)
+
+ // first pass is decoding every slot in range, collect the resolved ones
+ for i := uint64(0); i < total; i++ {
+ slotIdx := readIdx + i
+
+ buf, err := w.ring.SlotBytes(slotIdx)
+ if err != nil {
+ return events, fmt.Errorf("ingest: read slot %d: %v", slotIdx, err)
+ }
+
+ ev, err := DecodeEvent(buf)
+ if err != nil {
+ return events, fmt.Errorf("ingest: decode slot %d: %v", slotIdx, err)
+ }
+
+ if ev.PayloadLen == 0 {
+
+ entry, tracked := w.pending[slotIdx]
+ if tracked && time.Now().After(entry.deadline) {
+ // means that this is still pending; worker is not done yet
+ // or it has timed out
+ delete(w.pending, slotIdx)
+ resolved[slotIdx] = true
+ ev.EventKind = ChldProcTimedOut
+ events = append(events, ev)
+ continue
+ }
+
+ w.pending[slotIdx] = entry // keeps tracking; deadline unchanged
+ continue
+ }
+
+ // rdy
+ delete(w.pending, slotIdx)
+ resolved[slotIdx] = true
+ events = append(events, ev)
+ }
+
+ // second pass advances the ReadIndex past the longest resolved
+ // prefix starting at readIdx
+ advanceTo := readIdx
+ for resolved[advanceTo] {
+ advanceTo++
+ }
+
+ if advanceTo > readIdx {
+ w.ring.SetReadIndex(advanceTo)
+ }
+
+ return events, nil
+}