diff options
| author | Wayne-Cole <77279425+Wacky404@users.noreply.github.com> | 2026-06-13 20:26:39 -0500 |
|---|---|---|
| committer | Wayne-Cole <77279425+Wacky404@users.noreply.github.com> | 2026-06-13 20:26:39 -0500 |
| commit | 1cfd8a143107ab4bc50092daa45407196df0b75d (patch) | |
| tree | c25097cdcfe39a875dad3eb483c59a95c5e8b1be /internal/events.go | |
| parent | bb0d5fecf8d839efa0e89c33d310c5202c6f8919 (diff) | |
| download | lurchers-1cfd8a143107ab4bc50092daa45407196df0b75d.tar.xz lurchers-1cfd8a143107ab4bc50092daa45407196df0b75d.zip | |
update: dove deep into unix api and made the base orchestration system
Diffstat (limited to 'internal/events.go')
| -rw-r--r-- | internal/events.go | 199 |
1 files changed, 199 insertions, 0 deletions
diff --git a/internal/events.go b/internal/events.go new file mode 100644 index 0000000..56ac1cb --- /dev/null +++ b/internal/events.go @@ -0,0 +1,199 @@ +package internal + +import ( + "encoding/binary" + "fmt" + "time" +) + +type EventType int + +const ( + ChldProcStart = iota + ChldProcDone + ChldProcFailed + ChldProcHurt + ChldProcHealing + ChldProcHealed + ChldProcTimedOut +) + +var eventName = map[EventType]string{ + ChldProcStart: "child_start", + ChldProcDone: "child_done", + ChldProcFailed: "child_failed", + ChldProcHurt: "child_hurt", + ChldProcHealing: "child_healing", + ChldProcHealed: "child_healed", + ChldProcTimedOut: "child_timedout", +} + +func (et EventType) String() string { + return eventName[et] +} + +// SysLurchEventT mirrors the on-disk slot layout (32-byte header + payload). +type SysLurchEventT struct { + EventTime int64 + EventID int64 + EventKind EventType + Data1 int32 + Data2 int32 + PayloadLen uint32 + Payload []byte +} + +// EncodeEvent serializes ev into buf (must be >= SlotSize). +// Payload is truncated to fit if it exceeds the slot's payload capacity. +func EncodeEvent(buf []byte, ev SysLurchEventT) error { + if len(buf) < SlotSize { + return fmt.Errorf("encode event: buffer too small: %d < %d", len(buf), SlotSize) + } + + payloadCap := SlotSize - SlotHeaderSize + payload := ev.Payload + if len(payload) > payloadCap { + payload = payload[:payloadCap] + } + + binary.LittleEndian.PutUint64(buf[0:8], uint64(ev.EventTime)) + binary.LittleEndian.PutUint64(buf[8:16], uint64(ev.EventID)) + binary.LittleEndian.PutUint32(buf[16:20], uint32(ev.EventKind)) + binary.LittleEndian.PutUint32(buf[20:24], uint32(ev.Data1)) + binary.LittleEndian.PutUint32(buf[24:28], uint32(ev.Data2)) + binary.LittleEndian.PutUint32(buf[28:32], uint32(len(payload))) + + copy(buf[SlotHeaderSize:], payload) + + // zero any leftover bytes from a previous longer write at this slot + for i := SlotHeaderSize + len(payload); i < SlotSize; i++ { + buf[i] = 0 + } + + return nil +} + +// DecodeEvent deserializes a SysLurchEvent_t from buf (must be >= SlotSize). +func DecodeEvent(buf []byte) (SysLurchEventT, error) { + var ev SysLurchEventT + if len(buf) < SlotSize { + return ev, fmt.Errorf("decode event: buffer too small: %d < %d", len(buf), SlotSize) + } + + ev.EventTime = int64(binary.LittleEndian.Uint64(buf[0:8])) + ev.EventID = int64(binary.LittleEndian.Uint64(buf[8:16])) + ev.EventKind = EventType(binary.LittleEndian.Uint32(buf[16:20])) + ev.Data1 = int32(binary.LittleEndian.Uint32(buf[20:24])) + ev.Data2 = int32(binary.LittleEndian.Uint32(buf[24:28])) + ev.PayloadLen = binary.LittleEndian.Uint32(buf[28:32]) + + if ev.PayloadLen > uint32(SlotSize-SlotHeaderSize) { + return ev, fmt.Errorf("decode event: invalid payload length %d", ev.PayloadLen) + } + + payload := make([]byte, ev.PayloadLen) + copy(payload, buf[SlotHeaderSize:SlotHeaderSize+ev.PayloadLen]) + ev.Payload = payload + + return ev, nil +} + +type pendingEntry struct { + deadline time.Time +} + +// WatchMen is the in-process consumer view over a SharedRing. +type WatchMen struct { + ring *SharedRing + pending map[uint64]pendingEntry +} + +func (w *WatchMen) MarkPending(slotIdx uint64, deadline time.Time) { + w.pending[slotIdx] = pendingEntry{deadline: deadline} +} + +func NewWatchMen(ring *SharedRing) *WatchMen { + return &WatchMen{ + ring: ring, + pending: make(map[uint64]pendingEntry), + } +} + +// Ingest reads all unread events from the ring, advances ReadIndex, and +// returns them in order. Returns an error if the consumer has fallen behind +// and data has been overwritten (overflow). +func (w *WatchMen) Ingest() ([]SysLurchEventT, error) { + writeIdx := w.ring.WriteIndex() + readIdx := w.ring.ReadIndex() + + if writeIdx < readIdx { + return nil, fmt.Errorf("ingest: corrupt indices: write %d < read %d", writeIdx, readIdx) + } + + total := writeIdx - readIdx + if total == 0 { + return nil, nil + } + if total > uint64(MaxSlots) { + skipped := total - uint64(MaxSlots) + for s := readIdx; s < readIdx+skipped; s++ { + delete(w.pending, s) + } + readIdx += skipped + total = uint64(MaxSlots) + // TODO: caller needs to log skipped as dropped events + } + + events := make([]SysLurchEventT, 0, total) + resolved := make(map[uint64]bool, total) + + // first pass is decoding every slot in range, collect the resolved ones + for i := uint64(0); i < total; i++ { + slotIdx := readIdx + i + + buf, err := w.ring.SlotBytes(slotIdx) + if err != nil { + return events, fmt.Errorf("ingest: read slot %d: %v", slotIdx, err) + } + + ev, err := DecodeEvent(buf) + if err != nil { + return events, fmt.Errorf("ingest: decode slot %d: %v", slotIdx, err) + } + + if ev.PayloadLen == 0 { + + entry, tracked := w.pending[slotIdx] + if tracked && time.Now().After(entry.deadline) { + // means that this is still pending; worker is not done yet + // or it has timed out + delete(w.pending, slotIdx) + resolved[slotIdx] = true + ev.EventKind = ChldProcTimedOut + events = append(events, ev) + continue + } + + w.pending[slotIdx] = entry // keeps tracking; deadline unchanged + continue + } + + // rdy + delete(w.pending, slotIdx) + resolved[slotIdx] = true + events = append(events, ev) + } + + // second pass advances the ReadIndex past the longest resolved + // prefix starting at readIdx + advanceTo := readIdx + for resolved[advanceTo] { + advanceTo++ + } + + if advanceTo > readIdx { + w.ring.SetReadIndex(advanceTo) + } + + return events, nil +} |
