summaryrefslogtreecommitdiff
path: root/internal/ring.go
blob: 3cf8ada6d44c78dccfa83e9cc5342f4e1d04bcd6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package internal

import (
	"encoding/binary"
	"fmt"
	"sync/atomic"
)

const (
	// MaxSlots is the source of truth for ring capacity.
	MaxSlots = 2560

	// SlotHeaderSize is the fixed per-slot metadata size:
	// EventTime(8) + EventID(8) + EventKind(4) + Data1(4) + Data2(4) + PayloadLen(4)
	SlotHeaderSize = 32

	// SlotSize is the total size of one slot, including payload.
	SlotSize = 65536

	// HeaderSize is the shared ring header: WriteIndex, ReadIndex, EventCount (3x uint64).
	HeaderSize = 24

	// SizeFile is the total backing file size.
	SizeFile = HeaderSize + MaxSlots*SlotSize
)

// SharedRing wraps a ReaderAt and provides atomic access to the shared
// ring buffer header and slot regions of the mmap'd file.
type SharedRing struct {
	r *ReaderAt
}

func NewSharedRing(r *ReaderAt) *SharedRing {
	return &SharedRing{r: r}
}

// header field accessors via unsafe pointers into the mmap'd region.
// Offsets: WriteIndex=0, ReadIndex=8, EventCount=16.

func (s *SharedRing) writeIndexPtr() *uint64 {
	return (*uint64)(unsafePointerAt(s.r.data, 0))
}

func (s *SharedRing) readIndexPtr() *uint64 {
	return (*uint64)(unsafePointerAt(s.r.data, 8))
}

func (s *SharedRing) eventCountPtr() *uint64 {
	return (*uint64)(unsafePointerAt(s.r.data, 16))
}

// WriteIndex returns the current write index (next slot to be claimed).
func (s *SharedRing) WriteIndex() uint64 {
	return atomic.LoadUint64(s.writeIndexPtr())
}

// ReadIndex returns the current read index (next slot to be consumed).
func (s *SharedRing) ReadIndex() uint64 {
	return atomic.LoadUint64(s.readIndexPtr())
}

// SetReadIndex updates the read index after the consumer processes events.
func (s *SharedRing) SetReadIndex(idx uint64) {
	atomic.StoreUint64(s.readIndexPtr(), idx)
}

// ClaimSlot atomically reserves the next slot for a producer to write into.
// Returns the byte offset into the mmap'd region and the raw (unwrapped) slot index.
func (s *SharedRing) ClaimSlot() (offset int64, slotIndex uint64, err error) {
	idx := atomic.AddUint64(s.writeIndexPtr(), 1) - 1
	atomic.AddUint64(s.eventCountPtr(), 1)

	wrapped := idx % uint64(MaxSlots)
	offset = int64(HeaderSize) + int64(wrapped)*int64(SlotSize)

	if offset+int64(SlotSize) > int64(len(s.r.data)) {
		return 0, 0, fmt.Errorf("claim slot: offset out of range: %d", offset)
	}

	return offset, idx, nil
}

// SlotBytes returns the raw slot buffer for the given (already-wrapped)
// logical slot index, computing its position in the ring.
func (s *SharedRing) SlotBytes(slotIndex uint64) ([]byte, error) {
	wrapped := slotIndex % uint64(MaxSlots)
	offset := int64(HeaderSize) + int64(wrapped)*int64(SlotSize)

	if offset+int64(SlotSize) > int64(len(s.r.data)) {
		return nil, fmt.Errorf("slot bytes: offset out of range: %d", offset)
	}

	return s.r.data[offset : offset+int64(SlotSize)], nil
}

// WriteSlot writes encoded event bytes at the given offset (from ClaimSlot).
func (s *SharedRing) WriteSlot(offset int64, encoded []byte) error {
	if len(encoded) != SlotSize {
		return fmt.Errorf("write slot: encoded size %d != %d", len(encoded), SlotSize)
	}
	if offset+int64(SlotSize) > int64(len(s.r.data)) {
		return fmt.Errorf("write slot: offset out of range: %d", offset)
	}

	copy(s.r.data[offset:offset+int64(SlotSize)], encoded)
	return nil
}

// InitHeader zero-initializes the header region. Call this only when the
// backing file was just created (size == 0 case in Open).
func (s *SharedRing) InitHeader() {
	binary.LittleEndian.PutUint64(s.r.data[0:8], 0)
	binary.LittleEndian.PutUint64(s.r.data[8:16], 0)
	binary.LittleEndian.PutUint64(s.r.data[16:24], 0)
}

func (s *SharedRing) WriteHeader(offset int64, ev *SysLurchEventT) error {
	hdr := make([]byte, SlotHeaderSize)
	binary.LittleEndian.PutUint64(hdr[0:8], uint64(ev.EventTime))
	binary.LittleEndian.PutUint64(hdr[8:16], uint64(ev.EventID))
	binary.LittleEndian.PutUint32(hdr[28:32], 0)

	if offset*int64(SlotHeaderSize) > int64(len(s.r.data)) {
		return fmt.Errorf("write header: offset is out of range - %d", offset)
	}

	copy(s.r.data[offset:offset*int64(SlotHeaderSize)], hdr)

	return nil
}