summaryrefslogtreecommitdiffhomepage
path: root/util
diff options
context:
space:
mode:
authorkari-ts <kari@tailscale.com>2025-03-19 11:28:04 -0700
committerkari-ts <kari@tailscale.com>2025-04-04 14:24:56 -0700
commit6d5c7b11913e09b061e863411ad488dc44a13870 (patch)
tree9e1789b5080ae4a92523611e49920dcb1102604b /util
parentca50599c95e0a4cb7b4aab179e866e202f10c0c4 (diff)
parent3a2c92f08eac8cd8f50356ff288e40a28636ee42 (diff)
downloadtailscale-kari/taildropsaf.tar.xz
tailscale-kari/taildropsaf.zip
-check if Context.getExternalFilesDirs works as is for private dir
Diffstat (limited to 'util')
-rw-r--r--util/eventbus/assets/event.html6
-rw-r--r--util/eventbus/assets/htmx-websocket.min.js.gzbin0 -> 4249 bytes
-rw-r--r--util/eventbus/assets/htmx.min.js.gzbin0 -> 16409 bytes
-rw-r--r--util/eventbus/assets/main.html97
-rw-r--r--util/eventbus/assets/monitor.html5
-rw-r--r--util/eventbus/assets/style.css90
-rw-r--r--util/eventbus/bench_test.go125
-rw-r--r--util/eventbus/bus.go309
-rw-r--r--util/eventbus/bus_test.go203
-rw-r--r--util/eventbus/client.go127
-rw-r--r--util/eventbus/debug-demo/main.go103
-rw-r--r--util/eventbus/debug.go188
-rw-r--r--util/eventbus/debughttp.go240
-rw-r--r--util/eventbus/debughttp_ios.go18
-rw-r--r--util/eventbus/doc.go92
-rw-r--r--util/eventbus/fetch-htmx.go93
-rw-r--r--util/eventbus/publish.go74
-rw-r--r--util/eventbus/queue.go85
-rw-r--r--util/eventbus/subscribe.go254
-rw-r--r--util/syspolicy/internal/internal.go4
-rw-r--r--util/syspolicy/setting/origin.go21
-rw-r--r--util/syspolicy/setting/raw_item.go44
-rw-r--r--util/syspolicy/setting/snapshot.go21
-rw-r--r--util/syspolicy/setting/summary.go21
24 files changed, 2177 insertions, 43 deletions
diff --git a/util/eventbus/assets/event.html b/util/eventbus/assets/event.html
new file mode 100644
index 000000000..8e016f583
--- /dev/null
+++ b/util/eventbus/assets/event.html
@@ -0,0 +1,6 @@
+<li id="monitor" hx-swap-oob="afterbegin">
+ <details>
+ <summary>{{.Count}}: {{.Type}} from {{.Event.From.Name}}, {{len .Event.To}} recipients</summary>
+ {{.Event.Event}}
+ </details>
+</li>
diff --git a/util/eventbus/assets/htmx-websocket.min.js.gz b/util/eventbus/assets/htmx-websocket.min.js.gz
new file mode 100644
index 000000000..4ed53be49
--- /dev/null
+++ b/util/eventbus/assets/htmx-websocket.min.js.gz
Binary files differ
diff --git a/util/eventbus/assets/htmx.min.js.gz b/util/eventbus/assets/htmx.min.js.gz
new file mode 100644
index 000000000..b75fea8d1
--- /dev/null
+++ b/util/eventbus/assets/htmx.min.js.gz
Binary files differ
diff --git a/util/eventbus/assets/main.html b/util/eventbus/assets/main.html
new file mode 100644
index 000000000..51d6b22ad
--- /dev/null
+++ b/util/eventbus/assets/main.html
@@ -0,0 +1,97 @@
+<!DOCTYPE html>
+<html>
+ <head>
+ <script src="bus/htmx.min.js"></script>
+ <script src="bus/htmx-websocket.min.js"></script>
+ <link rel="stylesheet" href="bus/style.css">
+ </head>
+ <body hx-ext="ws">
+ <h1>Event bus</h1>
+
+ <section>
+ <h2>General</h2>
+ {{with $.PublishQueue}}
+ {{len .}} pending
+ {{end}}
+
+ <button hx-post="bus/monitor" hx-swap="outerHTML">Monitor all events</button>
+ </section>
+
+ <section>
+ <h2>Clients</h2>
+
+ <table>
+ <thead>
+ <tr>
+ <th>Name</th>
+ <th>Publishing</th>
+ <th>Subscribing</th>
+ <th>Pending</th>
+ </tr>
+ </thead>
+ {{range .Clients}}
+ <tr id="{{.Name}}">
+ <td>{{.Name}}</td>
+ <td class="list">
+ <ul>
+ {{range .Publish}}
+ <li><a href="#{{.}}">{{.}}</a></li>
+ {{end}}
+ </ul>
+ </td>
+ <td class="list">
+ <ul>
+ {{range .Subscribe}}
+ <li><a href="#{{.}}">{{.}}</a></li>
+ {{end}}
+ </ul>
+ </td>
+ <td>
+ {{len ($.SubscribeQueue .Client)}}
+ </td>
+ </tr>
+ {{end}}
+ </table>
+ </section>
+
+ <section>
+ <h2>Types</h2>
+
+ {{range .Types}}
+
+ <section id="{{.}}">
+ <h3>{{.Name}}</h3>
+ <h4>Definition</h4>
+ <code>{{prettyPrintStruct .}}</code>
+
+ <h4>Published by:</h4>
+ {{if len (.Publish)}}
+ <ul>
+ {{range .Publish}}
+ <li><a href="#{{.Name}}">{{.Name}}</a></li>
+ {{end}}
+ </ul>
+ {{else}}
+ <ul>
+ <li>No publishers.</li>
+ </ul>
+ {{end}}
+
+ <h4>Received by:</h4>
+ {{if len (.Subscribe)}}
+ <ul>
+ {{range .Subscribe}}
+ <li><a href="#{{.Name}}">{{.Name}}</a></li>
+ {{end}}
+ </ul>
+ {{else}}
+ <ul>
+ <li>No subscribers.</li>
+ </ul>
+ {{end}}
+ </section>
+ {{end}}
+
+ </section>
+ </body>
+</html>
diff --git a/util/eventbus/assets/monitor.html b/util/eventbus/assets/monitor.html
new file mode 100644
index 000000000..1af5bdce6
--- /dev/null
+++ b/util/eventbus/assets/monitor.html
@@ -0,0 +1,5 @@
+<div>
+<ul id="monitor" ws-connect="bus/monitor">
+</ul>
+<button hx-get="bus" hx-target="body">Stop monitoring</button>
+</div>
diff --git a/util/eventbus/assets/style.css b/util/eventbus/assets/style.css
new file mode 100644
index 000000000..690bd4f17
--- /dev/null
+++ b/util/eventbus/assets/style.css
@@ -0,0 +1,90 @@
+/* CSS reset, thanks Josh Comeau: https://www.joshwcomeau.com/css/custom-css-reset/ */
+*, *::before, *::after { box-sizing: border-box; }
+* { margin: 0; }
+input, button, textarea, select { font: inherit; }
+p, h1, h2, h3, h4, h5, h6 { overflow-wrap: break-word; }
+p { text-wrap: pretty; }
+h1, h2, h3, h4, h5, h6 { text-wrap: balance; }
+#root, #__next { isolation: isolate; }
+body {
+ line-height: 1.5;
+ -webkit-font-smoothing: antialiased;
+}
+img, picture, video, canvas, svg {
+ display: block;
+ max-width: 100%;
+}
+
+/* Local styling begins */
+
+body {
+ padding: 12px;
+}
+
+div {
+ width: 100%;
+}
+
+section {
+ display: flex;
+ flex-direction: column;
+ flex-gap: 6px;
+ align-items: flex-start;
+ padding: 12px 0;
+}
+
+section > * {
+ margin-left: 24px;
+}
+
+section > h2, section > h3 {
+ margin-left: 0;
+ padding-bottom: 6px;
+ padding-top: 12px;
+}
+
+details {
+ padding-bottom: 12px;
+}
+
+table {
+ table-layout: fixed;
+ width: calc(100% - 48px);
+ border-collapse: collapse;
+ border: 1px solid black;
+}
+
+th, td {
+ padding: 12px;
+ border: 1px solid black;
+}
+
+td.list {
+ vertical-align: top;
+}
+
+ul {
+ list-style: none;
+}
+
+td ul {
+ margin: 0;
+ padding: 0;
+}
+
+code {
+ padding: 12px;
+ white-space: pre;
+}
+
+#monitor {
+ width: calc(100% - 48px);
+ resize: vertical;
+ padding: 12px;
+ overflow: scroll;
+ height: 15lh;
+ border: 1px inset;
+ min-height: 1em;
+ display: flex;
+ flex-direction: column-reverse;
+}
diff --git a/util/eventbus/bench_test.go b/util/eventbus/bench_test.go
new file mode 100644
index 000000000..25f5b8002
--- /dev/null
+++ b/util/eventbus/bench_test.go
@@ -0,0 +1,125 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+package eventbus_test
+
+import (
+ "math/rand/v2"
+ "testing"
+
+ "tailscale.com/util/eventbus"
+)
+
+func BenchmarkBasicThroughput(b *testing.B) {
+ bus := eventbus.New()
+ pcli := bus.Client(b.Name() + "-pub")
+ scli := bus.Client(b.Name() + "-sub")
+
+ type emptyEvent [0]byte
+
+ // One publisher and a corresponding subscriber shoveling events as fast as
+ // they can through the plumbing.
+ pub := eventbus.Publish[emptyEvent](pcli)
+ sub := eventbus.Subscribe[emptyEvent](scli)
+
+ go func() {
+ for {
+ select {
+ case <-sub.Events():
+ continue
+ case <-sub.Done():
+ return
+ }
+ }
+ }()
+
+ for b.Loop() {
+ pub.Publish(emptyEvent{})
+ }
+ bus.Close()
+}
+
+func BenchmarkSubsThroughput(b *testing.B) {
+ bus := eventbus.New()
+ pcli := bus.Client(b.Name() + "-pub")
+ scli1 := bus.Client(b.Name() + "-sub1")
+ scli2 := bus.Client(b.Name() + "-sub2")
+
+ type emptyEvent [0]byte
+
+ // One publisher and two subscribers shoveling events as fast as they can
+ // through the plumbing.
+ pub := eventbus.Publish[emptyEvent](pcli)
+ sub1 := eventbus.Subscribe[emptyEvent](scli1)
+ sub2 := eventbus.Subscribe[emptyEvent](scli2)
+
+ for _, sub := range []*eventbus.Subscriber[emptyEvent]{sub1, sub2} {
+ go func() {
+ for {
+ select {
+ case <-sub.Events():
+ continue
+ case <-sub.Done():
+ return
+ }
+ }
+ }()
+ }
+
+ for b.Loop() {
+ pub.Publish(emptyEvent{})
+ }
+ bus.Close()
+}
+
+func BenchmarkMultiThroughput(b *testing.B) {
+ bus := eventbus.New()
+ cli := bus.Client(b.Name())
+
+ type eventA struct{}
+ type eventB struct{}
+
+ // Two disjoint event streams routed through the global order.
+ apub := eventbus.Publish[eventA](cli)
+ asub := eventbus.Subscribe[eventA](cli)
+ bpub := eventbus.Publish[eventB](cli)
+ bsub := eventbus.Subscribe[eventB](cli)
+
+ go func() {
+ for {
+ select {
+ case <-asub.Events():
+ continue
+ case <-asub.Done():
+ return
+ }
+ }
+ }()
+ go func() {
+ for {
+ select {
+ case <-bsub.Events():
+ continue
+ case <-bsub.Done():
+ return
+ }
+ }
+ }()
+
+ var rng uint64
+ var bits int
+ for b.Loop() {
+ if bits == 0 {
+ rng = rand.Uint64()
+ bits = 64
+ }
+ if rng&1 == 0 {
+ apub.Publish(eventA{})
+ } else {
+ bpub.Publish(eventB{})
+ }
+ rng >>= 1
+ bits--
+ }
+ bus.Close()
+}
diff --git a/util/eventbus/bus.go b/util/eventbus/bus.go
new file mode 100644
index 000000000..45d12da2f
--- /dev/null
+++ b/util/eventbus/bus.go
@@ -0,0 +1,309 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+package eventbus
+
+import (
+ "context"
+ "reflect"
+ "slices"
+ "sync"
+
+ "tailscale.com/util/set"
+)
+
+type PublishedEvent struct {
+ Event any
+ From *Client
+}
+
+type RoutedEvent struct {
+ Event any
+ From *Client
+ To []*Client
+}
+
+// Bus is an event bus that distributes published events to interested
+// subscribers.
+type Bus struct {
+ router *worker
+ write chan PublishedEvent
+ snapshot chan chan []PublishedEvent
+ routeDebug hook[RoutedEvent]
+
+ topicsMu sync.Mutex
+ topics map[reflect.Type][]*subscribeState
+
+ // Used for introspection/debugging only, not in the normal event
+ // publishing path.
+ clientsMu sync.Mutex
+ clients set.Set[*Client]
+}
+
+// New returns a new bus. Use [PublisherOf] to make event publishers,
+// and [Bus.Queue] and [Subscribe] to make event subscribers.
+func New() *Bus {
+ ret := &Bus{
+ write: make(chan PublishedEvent),
+ snapshot: make(chan chan []PublishedEvent),
+ topics: map[reflect.Type][]*subscribeState{},
+ clients: set.Set[*Client]{},
+ }
+ ret.router = runWorker(ret.pump)
+ return ret
+}
+
+// Client returns a new client with no subscriptions. Use [Subscribe]
+// to receive events, and [Publish] to emit events.
+//
+// The client's name is used only for debugging, to tell humans what
+// piece of code a publisher/subscriber belongs to. Aim for something
+// short but unique, for example "kernel-route-monitor" or "taildrop",
+// not "watcher".
+func (b *Bus) Client(name string) *Client {
+ ret := &Client{
+ name: name,
+ bus: b,
+ pub: set.Set[publisher]{},
+ }
+ b.clientsMu.Lock()
+ defer b.clientsMu.Unlock()
+ b.clients.Add(ret)
+ return ret
+}
+
+// Debugger returns the debugging facility for the bus.
+func (b *Bus) Debugger() *Debugger {
+ return &Debugger{b}
+}
+
+// Close closes the bus. Implicitly closes all clients, publishers and
+// subscribers attached to the bus.
+//
+// Close blocks until the bus is fully shut down. The bus is
+// permanently unusable after closing.
+func (b *Bus) Close() {
+ b.router.StopAndWait()
+
+ b.clientsMu.Lock()
+ defer b.clientsMu.Unlock()
+ for c := range b.clients {
+ c.Close()
+ }
+ b.clients = nil
+}
+
+func (b *Bus) pump(ctx context.Context) {
+ var vals queue[PublishedEvent]
+ acceptCh := func() chan PublishedEvent {
+ if vals.Full() {
+ return nil
+ }
+ return b.write
+ }
+ for {
+ // Drain all pending events. Note that while we're draining
+ // events into subscriber queues, we continue to
+ // opportunistically accept more incoming events, if we have
+ // queue space for it.
+ for !vals.Empty() {
+ val := vals.Peek()
+ dests := b.dest(reflect.ValueOf(val.Event).Type())
+
+ if b.routeDebug.active() {
+ clients := make([]*Client, len(dests))
+ for i := range len(dests) {
+ clients[i] = dests[i].client
+ }
+ b.routeDebug.run(RoutedEvent{
+ Event: val.Event,
+ From: val.From,
+ To: clients,
+ })
+ }
+
+ for _, d := range dests {
+ evt := DeliveredEvent{
+ Event: val.Event,
+ From: val.From,
+ To: d.client,
+ }
+ deliverOne:
+ for {
+ select {
+ case d.write <- evt:
+ break deliverOne
+ case <-d.closed():
+ // Queue closed, don't block but continue
+ // delivering to others.
+ break deliverOne
+ case in := <-acceptCh():
+ vals.Add(in)
+ in.From.publishDebug.run(in)
+ case <-ctx.Done():
+ return
+ case ch := <-b.snapshot:
+ ch <- vals.Snapshot()
+ }
+ }
+ }
+ vals.Drop()
+ }
+
+ // Inbound queue empty, wait for at least 1 work item before
+ // resuming.
+ for vals.Empty() {
+ select {
+ case <-ctx.Done():
+ return
+ case in := <-b.write:
+ vals.Add(in)
+ in.From.publishDebug.run(in)
+ case ch := <-b.snapshot:
+ ch <- nil
+ }
+ }
+ }
+}
+
+func (b *Bus) dest(t reflect.Type) []*subscribeState {
+ b.topicsMu.Lock()
+ defer b.topicsMu.Unlock()
+ return b.topics[t]
+}
+
+func (b *Bus) shouldPublish(t reflect.Type) bool {
+ if b.routeDebug.active() {
+ return true
+ }
+
+ b.topicsMu.Lock()
+ defer b.topicsMu.Unlock()
+ return len(b.topics[t]) > 0
+}
+
+func (b *Bus) listClients() []*Client {
+ b.clientsMu.Lock()
+ defer b.clientsMu.Unlock()
+ return b.clients.Slice()
+}
+
+func (b *Bus) snapshotPublishQueue() []PublishedEvent {
+ resp := make(chan []PublishedEvent)
+ select {
+ case b.snapshot <- resp:
+ return <-resp
+ case <-b.router.Done():
+ return nil
+ }
+}
+
+func (b *Bus) subscribe(t reflect.Type, q *subscribeState) (cancel func()) {
+ b.topicsMu.Lock()
+ defer b.topicsMu.Unlock()
+ b.topics[t] = append(b.topics[t], q)
+ return func() {
+ b.unsubscribe(t, q)
+ }
+}
+
+func (b *Bus) unsubscribe(t reflect.Type, q *subscribeState) {
+ b.topicsMu.Lock()
+ defer b.topicsMu.Unlock()
+ // Topic slices are accessed by pump without holding a lock, so we
+ // have to replace the entire slice when unsubscribing.
+ // Unsubscribing should be infrequent enough that this won't
+ // matter.
+ i := slices.Index(b.topics[t], q)
+ if i < 0 {
+ return
+ }
+ b.topics[t] = slices.Delete(slices.Clone(b.topics[t]), i, i+1)
+}
+
+// A worker runs a worker goroutine and helps coordinate its shutdown.
+type worker struct {
+ ctx context.Context
+ stop context.CancelFunc
+ stopped chan struct{}
+}
+
+// runWorker creates a worker goroutine running fn. The context passed
+// to fn is canceled by [worker.Stop].
+func runWorker(fn func(context.Context)) *worker {
+ ctx, stop := context.WithCancel(context.Background())
+ ret := &worker{
+ ctx: ctx,
+ stop: stop,
+ stopped: make(chan struct{}),
+ }
+ go ret.run(fn)
+ return ret
+}
+
+func (w *worker) run(fn func(context.Context)) {
+ defer close(w.stopped)
+ fn(w.ctx)
+}
+
+// Stop signals the worker goroutine to shut down.
+func (w *worker) Stop() { w.stop() }
+
+// Done returns a channel that is closed when the worker goroutine
+// exits.
+func (w *worker) Done() <-chan struct{} { return w.stopped }
+
+// Wait waits until the worker goroutine has exited.
+func (w *worker) Wait() { <-w.stopped }
+
+// StopAndWait signals the worker goroutine to shut down, then waits
+// for it to exit.
+func (w *worker) StopAndWait() {
+ w.stop()
+ <-w.stopped
+}
+
+// stopFlag is a value that can be watched for a notification. The
+// zero value is ready for use.
+//
+// The flag is notified by running [stopFlag.Stop]. Stop can be called
+// multiple times. Upon the first call to Stop, [stopFlag.Done] is
+// closed, all pending [stopFlag.Wait] calls return, and future Wait
+// calls return immediately.
+//
+// A stopFlag can only notify once, and is intended for use as a
+// one-way shutdown signal that's lighter than a cancellable
+// context.Context.
+type stopFlag struct {
+ // guards the lazy construction of stopped, and the value of
+ // alreadyStopped.
+ mu sync.Mutex
+ stopped chan struct{}
+ alreadyStopped bool
+}
+
+func (s *stopFlag) Stop() {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if s.alreadyStopped {
+ return
+ }
+ s.alreadyStopped = true
+ if s.stopped == nil {
+ s.stopped = make(chan struct{})
+ }
+ close(s.stopped)
+}
+
+func (s *stopFlag) Done() <-chan struct{} {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if s.stopped == nil {
+ s.stopped = make(chan struct{})
+ }
+ return s.stopped
+}
+
+func (s *stopFlag) Wait() {
+ <-s.Done()
+}
diff --git a/util/eventbus/bus_test.go b/util/eventbus/bus_test.go
new file mode 100644
index 000000000..e159b6a12
--- /dev/null
+++ b/util/eventbus/bus_test.go
@@ -0,0 +1,203 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+package eventbus_test
+
+import (
+ "errors"
+ "fmt"
+ "testing"
+ "time"
+
+ "github.com/creachadair/taskgroup"
+ "github.com/google/go-cmp/cmp"
+ "tailscale.com/util/eventbus"
+)
+
+type EventA struct {
+ Counter int
+}
+
+type EventB struct {
+ Counter int
+}
+
+func TestBus(t *testing.T) {
+ b := eventbus.New()
+ defer b.Close()
+
+ c := b.Client("TestSub")
+ defer c.Close()
+ s := eventbus.Subscribe[EventA](c)
+
+ go func() {
+ p := b.Client("TestPub")
+ defer p.Close()
+ pa := eventbus.Publish[EventA](p)
+ defer pa.Close()
+ pb := eventbus.Publish[EventB](p)
+ defer pb.Close()
+ pa.Publish(EventA{1})
+ pb.Publish(EventB{2})
+ pa.Publish(EventA{3})
+ }()
+
+ want := expectEvents(t, EventA{1}, EventA{3})
+ for !want.Empty() {
+ select {
+ case got := <-s.Events():
+ want.Got(got)
+ case <-s.Done():
+ t.Fatalf("queue closed unexpectedly")
+ case <-time.After(time.Second):
+ t.Fatalf("timed out waiting for event")
+ }
+ }
+}
+
+func TestBusMultipleConsumers(t *testing.T) {
+ b := eventbus.New()
+ defer b.Close()
+
+ c1 := b.Client("TestSubA")
+ defer c1.Close()
+ s1 := eventbus.Subscribe[EventA](c1)
+
+ c2 := b.Client("TestSubB")
+ defer c2.Close()
+ s2A := eventbus.Subscribe[EventA](c2)
+ s2B := eventbus.Subscribe[EventB](c2)
+
+ go func() {
+ p := b.Client("TestPub")
+ defer p.Close()
+ pa := eventbus.Publish[EventA](p)
+ defer pa.Close()
+ pb := eventbus.Publish[EventB](p)
+ defer pb.Close()
+ pa.Publish(EventA{1})
+ pb.Publish(EventB{2})
+ pa.Publish(EventA{3})
+ }()
+
+ wantA := expectEvents(t, EventA{1}, EventA{3})
+ wantB := expectEvents(t, EventA{1}, EventB{2}, EventA{3})
+ for !wantA.Empty() || !wantB.Empty() {
+ select {
+ case got := <-s1.Events():
+ wantA.Got(got)
+ case got := <-s2A.Events():
+ wantB.Got(got)
+ case got := <-s2B.Events():
+ wantB.Got(got)
+ case <-s1.Done():
+ t.Fatalf("queue closed unexpectedly")
+ case <-s2A.Done():
+ t.Fatalf("queue closed unexpectedly")
+ case <-s2B.Done():
+ t.Fatalf("queue closed unexpectedly")
+ case <-time.After(time.Second):
+ t.Fatalf("timed out waiting for event")
+ }
+ }
+}
+
+func TestSpam(t *testing.T) {
+ b := eventbus.New()
+ defer b.Close()
+
+ const (
+ publishers = 100
+ eventsPerPublisher = 20
+ wantEvents = publishers * eventsPerPublisher
+ subscribers = 100
+ )
+
+ var g taskgroup.Group
+
+ received := make([][]EventA, subscribers)
+ for i := range subscribers {
+ c := b.Client(fmt.Sprintf("Subscriber%d", i))
+ defer c.Close()
+ s := eventbus.Subscribe[EventA](c)
+ g.Go(func() error {
+ for range wantEvents {
+ select {
+ case evt := <-s.Events():
+ received[i] = append(received[i], evt)
+ case <-s.Done():
+ t.Errorf("queue done before expected number of events received")
+ return errors.New("queue prematurely closed")
+ case <-time.After(5 * time.Second):
+ t.Errorf("timed out waiting for expected bus event after %d events", len(received[i]))
+ return errors.New("timeout")
+ }
+ }
+ return nil
+ })
+ }
+
+ published := make([][]EventA, publishers)
+ for i := range publishers {
+ g.Run(func() {
+ c := b.Client(fmt.Sprintf("Publisher%d", i))
+ p := eventbus.Publish[EventA](c)
+ for j := range eventsPerPublisher {
+ evt := EventA{i*eventsPerPublisher + j}
+ p.Publish(evt)
+ published[i] = append(published[i], evt)
+ }
+ })
+ }
+
+ if err := g.Wait(); err != nil {
+ t.Fatal(err)
+ }
+ var last []EventA
+ for i, got := range received {
+ if len(got) != wantEvents {
+ // Receiving goroutine already reported an error, we just need
+ // to fail early within the main test goroutine.
+ t.FailNow()
+ }
+ if last == nil {
+ continue
+ }
+ if diff := cmp.Diff(got, last); diff != "" {
+ t.Errorf("Subscriber %d did not see the same events as %d (-got+want):\n%s", i, i-1, diff)
+ }
+ last = got
+ }
+ for i, sent := range published {
+ if got := len(sent); got != eventsPerPublisher {
+ t.Fatalf("Publisher %d sent %d events, want %d", i, got, eventsPerPublisher)
+ }
+ }
+
+ // TODO: check that the published sequences are proper
+ // subsequences of the received slices.
+}
+
+type queueChecker struct {
+ t *testing.T
+ want []any
+}
+
+func expectEvents(t *testing.T, want ...any) *queueChecker {
+ return &queueChecker{t, want}
+}
+
+func (q *queueChecker) Got(v any) {
+ q.t.Helper()
+ if q.Empty() {
+ q.t.Fatalf("queue got unexpected %v", v)
+ }
+ if v != q.want[0] {
+ q.t.Fatalf("queue got %#v, want %#v", v, q.want[0])
+ }
+ q.want = q.want[1:]
+}
+
+func (q *queueChecker) Empty() bool {
+ return len(q.want) == 0
+}
diff --git a/util/eventbus/client.go b/util/eventbus/client.go
new file mode 100644
index 000000000..a7a88c0a1
--- /dev/null
+++ b/util/eventbus/client.go
@@ -0,0 +1,127 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+package eventbus
+
+import (
+ "reflect"
+ "sync"
+
+ "tailscale.com/util/set"
+)
+
+// A Client can publish and subscribe to events on its attached
+// bus. See [Publish] to publish events, and [Subscribe] to receive
+// events.
+//
+// Subscribers that share the same client receive events one at a
+// time, in the order they were published.
+type Client struct {
+ name string
+ bus *Bus
+ publishDebug hook[PublishedEvent]
+
+ mu sync.Mutex
+ pub set.Set[publisher]
+ sub *subscribeState // Lazily created on first subscribe
+}
+
+func (c *Client) Name() string { return c.name }
+
+// Close closes the client. Implicitly closes all publishers and
+// subscribers obtained from this client.
+func (c *Client) Close() {
+ var (
+ pub set.Set[publisher]
+ sub *subscribeState
+ )
+
+ c.mu.Lock()
+ pub, c.pub = c.pub, nil
+ sub, c.sub = c.sub, nil
+ c.mu.Unlock()
+
+ if sub != nil {
+ sub.close()
+ }
+ for p := range pub {
+ p.Close()
+ }
+}
+
+func (c *Client) snapshotSubscribeQueue() []DeliveredEvent {
+ return c.peekSubscribeState().snapshotQueue()
+}
+
+func (c *Client) peekSubscribeState() *subscribeState {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ return c.sub
+}
+
+func (c *Client) publishTypes() []reflect.Type {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ ret := make([]reflect.Type, 0, len(c.pub))
+ for pub := range c.pub {
+ ret = append(ret, pub.publishType())
+ }
+ return ret
+}
+
+func (c *Client) subscribeTypes() []reflect.Type {
+ return c.peekSubscribeState().subscribeTypes()
+}
+
+func (c *Client) subscribeState() *subscribeState {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ if c.sub == nil {
+ c.sub = newSubscribeState(c)
+ }
+ return c.sub
+}
+
+func (c *Client) addPublisher(pub publisher) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ c.pub.Add(pub)
+}
+
+func (c *Client) deletePublisher(pub publisher) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ c.pub.Delete(pub)
+}
+
+func (c *Client) addSubscriber(t reflect.Type, s *subscribeState) {
+ c.bus.subscribe(t, s)
+}
+
+func (c *Client) deleteSubscriber(t reflect.Type, s *subscribeState) {
+ c.bus.unsubscribe(t, s)
+}
+
+func (c *Client) publish() chan<- PublishedEvent {
+ return c.bus.write
+}
+
+func (c *Client) shouldPublish(t reflect.Type) bool {
+ return c.publishDebug.active() || c.bus.shouldPublish(t)
+}
+
+// Subscribe requests delivery of events of type T through the given
+// Queue. Panics if the queue already has a subscriber for T.
+func Subscribe[T any](c *Client) *Subscriber[T] {
+ return newSubscriber[T](c.subscribeState())
+}
+
+// Publisher returns a publisher for event type T using the given
+// client.
+func Publish[T any](c *Client) *Publisher[T] {
+ ret := newPublisher[T](c)
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ c.pub.Add(ret)
+ return ret
+}
diff --git a/util/eventbus/debug-demo/main.go b/util/eventbus/debug-demo/main.go
new file mode 100644
index 000000000..a6d232d88
--- /dev/null
+++ b/util/eventbus/debug-demo/main.go
@@ -0,0 +1,103 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+// debug-demo is a program that serves a bus's debug interface over
+// HTTP, then generates some fake traffic from a handful of
+// clients. It is an aid to development, to have something to present
+// on the debug interfaces while writing them.
+package main
+
+import (
+ "log"
+ "math/rand/v2"
+ "net/http"
+ "net/netip"
+ "time"
+
+ "tailscale.com/tsweb"
+ "tailscale.com/types/key"
+ "tailscale.com/util/eventbus"
+)
+
+func main() {
+ b := eventbus.New()
+ c := b.Client("RouteMonitor")
+ go testPub[RouteAdded](c, 5*time.Second)
+ go testPub[RouteRemoved](c, 5*time.Second)
+ c = b.Client("ControlClient")
+ go testPub[PeerAdded](c, 3*time.Second)
+ go testPub[PeerRemoved](c, 6*time.Second)
+ c = b.Client("Portmapper")
+ go testPub[PortmapAcquired](c, 10*time.Second)
+ go testPub[PortmapLost](c, 15*time.Second)
+ go testSub[RouteAdded](c)
+ c = b.Client("WireguardConfig")
+ go testSub[PeerAdded](c)
+ go testSub[PeerRemoved](c)
+ c = b.Client("Magicsock")
+ go testPub[PeerPathChanged](c, 5*time.Second)
+ go testSub[RouteAdded](c)
+ go testSub[RouteRemoved](c)
+ go testSub[PortmapAcquired](c)
+ go testSub[PortmapLost](c)
+
+ m := http.NewServeMux()
+ d := tsweb.Debugger(m)
+ b.Debugger().RegisterHTTP(d)
+
+ m.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
+ http.Redirect(w, r, "/debug/bus", http.StatusFound)
+ })
+ log.Printf("Serving debug interface at http://localhost:8185/debug/bus")
+ http.ListenAndServe(":8185", m)
+}
+
+func testPub[T any](c *eventbus.Client, every time.Duration) {
+ p := eventbus.Publish[T](c)
+ for {
+ jitter := time.Duration(rand.N(2000)) * time.Millisecond
+ time.Sleep(jitter)
+ var zero T
+ log.Printf("%s publish: %T", c.Name(), zero)
+ p.Publish(zero)
+ time.Sleep(every)
+ }
+}
+
+func testSub[T any](c *eventbus.Client) {
+ s := eventbus.Subscribe[T](c)
+ for v := range s.Events() {
+ log.Printf("%s received: %T", c.Name(), v)
+ }
+}
+
+type RouteAdded struct {
+ Prefix netip.Prefix
+ Via netip.Addr
+ Priority int
+}
+type RouteRemoved struct {
+ Prefix netip.Addr
+}
+
+type PeerAdded struct {
+ ID int
+ Key key.NodePublic
+}
+type PeerRemoved struct {
+ ID int
+ Key key.NodePublic
+}
+
+type PortmapAcquired struct {
+ Endpoint netip.Addr
+}
+type PortmapLost struct {
+ Endpoint netip.Addr
+}
+
+type PeerPathChanged struct {
+ ID int
+ EndpointID int
+ Quality int
+}
diff --git a/util/eventbus/debug.go b/util/eventbus/debug.go
new file mode 100644
index 000000000..832d72ac0
--- /dev/null
+++ b/util/eventbus/debug.go
@@ -0,0 +1,188 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+package eventbus
+
+import (
+ "cmp"
+ "fmt"
+ "reflect"
+ "slices"
+ "sync"
+ "sync/atomic"
+
+ "tailscale.com/tsweb"
+)
+
+// A Debugger offers access to a bus's privileged introspection and
+// debugging facilities.
+//
+// The debugger's functionality is intended for humans and their tools
+// to examine and troubleshoot bus clients, and should not be used in
+// normal codepaths.
+//
+// In particular, the debugger provides access to information that is
+// deliberately withheld from bus clients to encourage more robust and
+// maintainable code - for example, the sender of an event, or the
+// event streams of other clients. Please don't use the debugger to
+// circumvent these restrictions for purposes other than debugging.
+type Debugger struct {
+ bus *Bus
+}
+
+// Clients returns a list of all clients attached to the bus.
+func (d *Debugger) Clients() []*Client {
+ ret := d.bus.listClients()
+ slices.SortFunc(ret, func(a, b *Client) int {
+ return cmp.Compare(a.Name(), b.Name())
+ })
+ return ret
+}
+
+// PublishQueue returns the contents of the publish queue.
+//
+// The publish queue contains events that have been accepted by the
+// bus from Publish() calls, but have not yet been routed to relevant
+// subscribers.
+//
+// This queue is expected to be almost empty in normal operation. A
+// full publish queue indicates that a slow subscriber downstream is
+// causing backpressure and stalling the bus.
+func (d *Debugger) PublishQueue() []PublishedEvent {
+ return d.bus.snapshotPublishQueue()
+}
+
+// checkClient verifies that client is attached to the same bus as the
+// Debugger, and panics if not.
+func (d *Debugger) checkClient(client *Client) {
+ if client.bus != d.bus {
+ panic(fmt.Errorf("SubscribeQueue given client belonging to wrong bus"))
+ }
+}
+
+// SubscribeQueue returns the contents of the given client's subscribe
+// queue.
+//
+// The subscribe queue contains events that are to be delivered to the
+// client, but haven't yet been handed off to the relevant
+// [Subscriber].
+//
+// This queue is expected to be almost empty in normal operation. A
+// full subscribe queue indicates that the client is accepting events
+// too slowly, and may be causing the rest of the bus to stall.
+func (d *Debugger) SubscribeQueue(client *Client) []DeliveredEvent {
+ d.checkClient(client)
+ return client.snapshotSubscribeQueue()
+}
+
+// WatchBus streams information about all events passing through the
+// bus.
+//
+// Monitored events are delivered in the bus's global publication
+// order (see "Concurrency properties" in the package docs).
+//
+// The caller must consume monitoring events promptly to avoid
+// stalling the bus (see "Expected subscriber behavior" in the package
+// docs).
+func (d *Debugger) WatchBus() *Subscriber[RoutedEvent] {
+ return newMonitor(d.bus.routeDebug.add)
+}
+
+// WatchPublish streams information about all events published by the
+// given client.
+//
+// Monitored events are delivered in the bus's global publication
+// order (see "Concurrency properties" in the package docs).
+//
+// The caller must consume monitoring events promptly to avoid
+// stalling the bus (see "Expected subscriber behavior" in the package
+// docs).
+func (d *Debugger) WatchPublish(client *Client) *Subscriber[PublishedEvent] {
+ d.checkClient(client)
+ return newMonitor(client.publishDebug.add)
+}
+
+// WatchSubscribe streams information about all events received by the
+// given client.
+//
+// Monitored events are delivered in the bus's global publication
+// order (see "Concurrency properties" in the package docs).
+//
+// The caller must consume monitoring events promptly to avoid
+// stalling the bus (see "Expected subscriber behavior" in the package
+// docs).
+func (d *Debugger) WatchSubscribe(client *Client) *Subscriber[DeliveredEvent] {
+ d.checkClient(client)
+ return newMonitor(client.subscribeState().debug.add)
+}
+
+// PublishTypes returns the list of types being published by client.
+//
+// The returned types are those for which the client has obtained a
+// [Publisher]. The client may not have ever sent the type in
+// question.
+func (d *Debugger) PublishTypes(client *Client) []reflect.Type {
+ d.checkClient(client)
+ return client.publishTypes()
+}
+
+// SubscribeTypes returns the list of types being subscribed to by
+// client.
+//
+// The returned types are those for which the client has obtained a
+// [Subscriber]. The client may not have ever received the type in
+// question, and here may not be any publishers of the type.
+func (d *Debugger) SubscribeTypes(client *Client) []reflect.Type {
+ d.checkClient(client)
+ return client.subscribeTypes()
+}
+
+func (d *Debugger) RegisterHTTP(td *tsweb.DebugHandler) { registerHTTPDebugger(d, td) }
+
+// A hook collects hook functions that can be run as a group.
+type hook[T any] struct {
+ sync.Mutex
+ fns []hookFn[T]
+}
+
+var hookID atomic.Uint64
+
+// add registers fn to be called when the hook is run. Returns an
+// unregistration function that removes fn from the hook when called.
+func (h *hook[T]) add(fn func(T)) (remove func()) {
+ id := hookID.Add(1)
+ h.Lock()
+ defer h.Unlock()
+ h.fns = append(h.fns, hookFn[T]{id, fn})
+ return func() { h.remove(id) }
+}
+
+// remove removes the function with the given ID from the hook.
+func (h *hook[T]) remove(id uint64) {
+ h.Lock()
+ defer h.Unlock()
+ h.fns = slices.DeleteFunc(h.fns, func(f hookFn[T]) bool { return f.ID == id })
+}
+
+// active reports whether any functions are registered with the
+// hook. This can be used to skip expensive work when the hook is
+// inactive.
+func (h *hook[T]) active() bool {
+ h.Lock()
+ defer h.Unlock()
+ return len(h.fns) > 0
+}
+
+// run calls all registered functions with the value v.
+func (h *hook[T]) run(v T) {
+ h.Lock()
+ defer h.Unlock()
+ for _, fn := range h.fns {
+ fn.Fn(v)
+ }
+}
+
+type hookFn[T any] struct {
+ ID uint64
+ Fn func(T)
+}
diff --git a/util/eventbus/debughttp.go b/util/eventbus/debughttp.go
new file mode 100644
index 000000000..18888cc56
--- /dev/null
+++ b/util/eventbus/debughttp.go
@@ -0,0 +1,240 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+//go:build !ios
+
+package eventbus
+
+import (
+ "bytes"
+ "cmp"
+ "embed"
+ "fmt"
+ "html/template"
+ "io"
+ "io/fs"
+ "log"
+ "net/http"
+ "path/filepath"
+ "reflect"
+ "slices"
+ "strings"
+ "sync"
+
+ "github.com/coder/websocket"
+ "tailscale.com/tsweb"
+)
+
+type httpDebugger struct {
+ *Debugger
+}
+
+func registerHTTPDebugger(d *Debugger, td *tsweb.DebugHandler) {
+ dh := httpDebugger{d}
+ td.Handle("bus", "Event bus", dh)
+ td.HandleSilent("bus/monitor", http.HandlerFunc(dh.serveMonitor))
+ td.HandleSilent("bus/style.css", serveStatic("style.css"))
+ td.HandleSilent("bus/htmx.min.js", serveStatic("htmx.min.js.gz"))
+ td.HandleSilent("bus/htmx-websocket.min.js", serveStatic("htmx-websocket.min.js.gz"))
+}
+
+//go:embed assets/*.html
+var templatesSrc embed.FS
+
+var templates = sync.OnceValue(func() *template.Template {
+ d, err := fs.Sub(templatesSrc, "assets")
+ if err != nil {
+ panic(fmt.Errorf("getting eventbus debughttp templates subdir: %w", err))
+ }
+ ret := template.New("").Funcs(map[string]any{
+ "prettyPrintStruct": prettyPrintStruct,
+ })
+ return template.Must(ret.ParseFS(d, "*"))
+})
+
+//go:generate go run fetch-htmx.go
+
+//go:embed assets/*.css assets/*.min.js.gz
+var static embed.FS
+
+func serveStatic(name string) http.Handler {
+ return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ switch {
+ case strings.HasSuffix(name, ".css"):
+ w.Header().Set("Content-Type", "text/css")
+ case strings.HasSuffix(name, ".min.js.gz"):
+ w.Header().Set("Content-Type", "text/javascript")
+ w.Header().Set("Content-Encoding", "gzip")
+ case strings.HasSuffix(name, ".js"):
+ w.Header().Set("Content-Type", "text/javascript")
+ default:
+ http.Error(w, "not found", http.StatusNotFound)
+ return
+ }
+
+ f, err := static.Open(filepath.Join("assets", name))
+ if err != nil {
+ http.Error(w, fmt.Sprintf("opening asset: %v", err), http.StatusInternalServerError)
+ return
+ }
+ defer f.Close()
+ if _, err := io.Copy(w, f); err != nil {
+ http.Error(w, fmt.Sprintf("serving asset: %v", err), http.StatusInternalServerError)
+ return
+ }
+ })
+}
+
+func render(w http.ResponseWriter, name string, data any) {
+ err := templates().ExecuteTemplate(w, name+".html", data)
+ if err != nil {
+ err := fmt.Errorf("rendering template: %v", err)
+ log.Print(err)
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ }
+}
+
+func (h httpDebugger) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ type clientInfo struct {
+ *Client
+ Publish []reflect.Type
+ Subscribe []reflect.Type
+ }
+ type typeInfo struct {
+ reflect.Type
+ Publish []*Client
+ Subscribe []*Client
+ }
+ type info struct {
+ *Debugger
+ Clients map[string]*clientInfo
+ Types map[string]*typeInfo
+ }
+
+ data := info{
+ Debugger: h.Debugger,
+ Clients: map[string]*clientInfo{},
+ Types: map[string]*typeInfo{},
+ }
+
+ getTypeInfo := func(t reflect.Type) *typeInfo {
+ if data.Types[t.Name()] == nil {
+ data.Types[t.Name()] = &typeInfo{
+ Type: t,
+ }
+ }
+ return data.Types[t.Name()]
+ }
+
+ for _, c := range h.Clients() {
+ ci := &clientInfo{
+ Client: c,
+ Publish: h.PublishTypes(c),
+ Subscribe: h.SubscribeTypes(c),
+ }
+ slices.SortFunc(ci.Publish, func(a, b reflect.Type) int { return cmp.Compare(a.Name(), b.Name()) })
+ slices.SortFunc(ci.Subscribe, func(a, b reflect.Type) int { return cmp.Compare(a.Name(), b.Name()) })
+ data.Clients[c.Name()] = ci
+
+ for _, t := range ci.Publish {
+ ti := getTypeInfo(t)
+ ti.Publish = append(ti.Publish, c)
+ }
+ for _, t := range ci.Subscribe {
+ ti := getTypeInfo(t)
+ ti.Subscribe = append(ti.Subscribe, c)
+ }
+ }
+
+ render(w, "main", data)
+}
+
+func (h httpDebugger) serveMonitor(w http.ResponseWriter, r *http.Request) {
+ if r.Header.Get("Upgrade") == "websocket" {
+ h.serveMonitorStream(w, r)
+ return
+ }
+
+ render(w, "monitor", nil)
+}
+
+func (h httpDebugger) serveMonitorStream(w http.ResponseWriter, r *http.Request) {
+ conn, err := websocket.Accept(w, r, nil)
+ if err != nil {
+ return
+ }
+ defer conn.CloseNow()
+ wsCtx := conn.CloseRead(r.Context())
+
+ mon := h.WatchBus()
+ defer mon.Close()
+
+ i := 0
+ for {
+ select {
+ case <-r.Context().Done():
+ return
+ case <-wsCtx.Done():
+ return
+ case <-mon.Done():
+ return
+ case event := <-mon.Events():
+ msg, err := conn.Writer(r.Context(), websocket.MessageText)
+ if err != nil {
+ return
+ }
+ data := map[string]any{
+ "Count": i,
+ "Type": reflect.TypeOf(event.Event),
+ "Event": event,
+ }
+ i++
+ if err := templates().ExecuteTemplate(msg, "event.html", data); err != nil {
+ log.Println(err)
+ return
+ }
+ if err := msg.Close(); err != nil {
+ return
+ }
+ }
+ }
+}
+
+func prettyPrintStruct(t reflect.Type) string {
+ if t.Kind() != reflect.Struct {
+ return t.String()
+ }
+ var rec func(io.Writer, int, reflect.Type)
+ rec = func(out io.Writer, indent int, t reflect.Type) {
+ ind := strings.Repeat(" ", indent)
+ fmt.Fprintf(out, "%s", t.String())
+ fs := collectFields(t)
+ if len(fs) > 0 {
+ io.WriteString(out, " {\n")
+ for _, f := range fs {
+ fmt.Fprintf(out, "%s %s ", ind, f.Name)
+ if f.Type.Kind() == reflect.Struct {
+ rec(out, indent+1, f.Type)
+ } else {
+ fmt.Fprint(out, f.Type)
+ }
+ io.WriteString(out, "\n")
+ }
+ fmt.Fprintf(out, "%s}", ind)
+ }
+ }
+
+ var ret bytes.Buffer
+ rec(&ret, 0, t)
+ return ret.String()
+}
+
+func collectFields(t reflect.Type) (ret []reflect.StructField) {
+ for _, f := range reflect.VisibleFields(t) {
+ if !f.IsExported() {
+ continue
+ }
+ ret = append(ret, f)
+ }
+ return ret
+}
diff --git a/util/eventbus/debughttp_ios.go b/util/eventbus/debughttp_ios.go
new file mode 100644
index 000000000..a898898b7
--- /dev/null
+++ b/util/eventbus/debughttp_ios.go
@@ -0,0 +1,18 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+//go:build ios
+
+package eventbus
+
+import "tailscale.com/tsweb"
+
+func registerHTTPDebugger(d *Debugger, td *tsweb.DebugHandler) {
+ // The event bus debugging UI uses html/template, which uses
+ // reflection for method lookups. This forces the compiler to
+ // retain a lot more code and information to make dynamic method
+ // dispatch work, which is unacceptable bloat for the iOS build.
+ //
+ // TODO: https://github.com/tailscale/tailscale/issues/15297 to
+ // bring the debug UI back to iOS somehow.
+}
diff --git a/util/eventbus/doc.go b/util/eventbus/doc.go
new file mode 100644
index 000000000..964a686ea
--- /dev/null
+++ b/util/eventbus/doc.go
@@ -0,0 +1,92 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+// Package eventbus provides an in-process event bus.
+//
+// An event bus connects publishers of typed events with subscribers
+// interested in those events. Typically, there is one global event
+// bus per process.
+//
+// # Usage
+//
+// To send or receive events, first use [Bus.Client] to register with
+// the bus. Clients should register with a human-readable name that
+// identifies the code using the client, to aid in debugging.
+//
+// To publish events, use [Publish] on a Client to get a typed
+// publisher for your event type, then call [Publisher.Publish] as
+// needed. If your event is expensive to construct, you can optionally
+// use [Publisher.ShouldPublish] to skip the work if nobody is
+// listening for the event.
+//
+// To receive events, use [Subscribe] to get a typed subscriber for
+// each event type you're interested in. Receive the events themselves
+// by selecting over all your [Subscriber.Events] channels, as well as
+// [Subscriber.Done] for shutdown notifications.
+//
+// # Concurrency properties
+//
+// The bus serializes all published events across all publishers, and
+// preserves that ordering when delivering to subscribers that are
+// attached to the same Client. In more detail:
+//
+// - An event is published to the bus at some instant between the
+// start and end of the call to [Publisher.Publish].
+// - Two events cannot be published at the same instant, and so are
+// totally ordered by their publication time. Given two events E1
+// and E2, either E1 happens before E2, or E2 happens before E1.
+// - Clients dispatch events to their Subscribers in publication
+// order: if E1 happens before E2, the client always delivers E1
+// before E2.
+// - Clients do not synchronize subscriptions with each other: given
+// clients C1 and C2, both subscribed to events E1 and E2, C1 may
+// deliver both E1 and E2 before C2 delivers E1.
+//
+// Less formally: there is one true timeline of all published events.
+// If you make a Client and subscribe to events, you will receive
+// events one at a time, in the same order as the one true
+// timeline. You will "skip over" events you didn't subscribe to, but
+// your view of the world always moves forward in time, never
+// backwards, and you will observe events in the same order as
+// everyone else.
+//
+// However, you cannot assume that what your client see as "now" is
+// the same as what other clients. They may be further behind you in
+// working through the timeline, or running ahead of you. This means
+// you should be careful about reaching out to another component
+// directly after receiving an event, as its view of the world may not
+// yet (or ever) be exactly consistent with yours.
+//
+// To make your code more testable and understandable, you should try
+// to structure it following the actor model: you have some local
+// state over which you have authority, but your only way to interact
+// with state elsewhere in the program is to receive and process
+// events coming from elsewhere, or to emit events of your own.
+//
+// # Expected subscriber behavior
+//
+// Subscribers are expected to promptly receive their events on
+// [Subscriber.Events]. The bus has a small, fixed amount of internal
+// buffering, meaning that a slow subscriber will eventually cause
+// backpressure and block publication of all further events.
+//
+// In general, you should receive from your subscriber(s) in a loop,
+// and only do fast state updates within that loop. Any heavier work
+// should be offloaded to another goroutine.
+//
+// Causing publishers to block from backpressure is considered a bug
+// in the slow subscriber causing the backpressure, and should be
+// addressed there. Publishers should assume that Publish will not
+// block for extended periods of time, and should not make exceptional
+// effort to behave gracefully if they do get blocked.
+//
+// These blocking semantics are provisional and subject to
+// change. Please speak up if this causes development pain, so that we
+// can adapt the semantics to better suit our needs.
+//
+// # Debugging facilities
+//
+// The [Debugger], obtained through [Bus.Debugger], provides
+// introspection facilities to monitor events flowing through the bus,
+// and inspect publisher and subscriber state.
+package eventbus
diff --git a/util/eventbus/fetch-htmx.go b/util/eventbus/fetch-htmx.go
new file mode 100644
index 000000000..f80d50257
--- /dev/null
+++ b/util/eventbus/fetch-htmx.go
@@ -0,0 +1,93 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+//go:build ignore
+
+// Program fetch-htmx fetches and installs local copies of the HTMX
+// library and its dependencies, used by the debug UI. It is meant to
+// be run via go generate.
+package main
+
+import (
+ "compress/gzip"
+ "crypto/sha512"
+ "encoding/base64"
+ "fmt"
+ "io"
+ "log"
+ "net/http"
+ "os"
+)
+
+func main() {
+ // Hash from https://htmx.org/docs/#installing
+ htmx, err := fetchHashed("https://unpkg.com/htmx.org@2.0.4", "HGfztofotfshcF7+8n44JQL2oJmowVChPTg48S+jvZoztPfvwD79OC/LTtG6dMp+")
+ if err != nil {
+ log.Fatalf("fetching htmx: %v", err)
+ }
+
+ // Hash SHOULD be from https://htmx.org/extensions/ws/ , but the
+ // hash is currently incorrect, see
+ // https://github.com/bigskysoftware/htmx-extensions/issues/153
+ //
+ // Until that bug is resolved, hash was obtained by rebuilding the
+ // extension from git source, and verifying that the hash matches
+ // what unpkg is serving.
+ ws, err := fetchHashed("https://unpkg.com/htmx-ext-ws@2.0.2", "932iIqjARv+Gy0+r6RTGrfCkCKS5MsF539Iqf6Vt8L4YmbnnWI2DSFoMD90bvXd0")
+ if err != nil {
+ log.Fatalf("fetching htmx-websockets: %v", err)
+ }
+
+ if err := writeGz("assets/htmx.min.js.gz", htmx); err != nil {
+ log.Fatalf("writing htmx.min.js.gz: %v", err)
+ }
+ if err := writeGz("assets/htmx-websocket.min.js.gz", ws); err != nil {
+ log.Fatalf("writing htmx-websocket.min.js.gz: %v", err)
+ }
+}
+
+func writeGz(path string, bs []byte) error {
+ f, err := os.Create(path)
+ if err != nil {
+ return err
+ }
+ defer f.Close()
+
+ g, err := gzip.NewWriterLevel(f, gzip.BestCompression)
+ if err != nil {
+ return err
+ }
+
+ if _, err := g.Write(bs); err != nil {
+ return err
+ }
+
+ if err := g.Flush(); err != nil {
+ return err
+ }
+ if err := f.Close(); err != nil {
+ return err
+ }
+ return nil
+}
+
+func fetchHashed(url, wantHash string) ([]byte, error) {
+ resp, err := http.Get(url)
+ if err != nil {
+ return nil, err
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != http.StatusOK {
+ return nil, fmt.Errorf("fetching %q returned error status: %s", url, resp.Status)
+ }
+ ret, err := io.ReadAll(resp.Body)
+ if err != nil {
+ return nil, fmt.Errorf("reading file from %q: %v", url, err)
+ }
+ h := sha512.Sum384(ret)
+ got := base64.StdEncoding.EncodeToString(h[:])
+ if got != wantHash {
+ return nil, fmt.Errorf("wrong hash for %q: got %q, want %q", url, got, wantHash)
+ }
+ return ret, nil
+}
diff --git a/util/eventbus/publish.go b/util/eventbus/publish.go
new file mode 100644
index 000000000..9897114b6
--- /dev/null
+++ b/util/eventbus/publish.go
@@ -0,0 +1,74 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+package eventbus
+
+import (
+ "reflect"
+)
+
+// publisher is a uniformly typed wrapper around Publisher[T], so that
+// debugging facilities can look at active publishers.
+type publisher interface {
+ publishType() reflect.Type
+ Close()
+}
+
+// A Publisher publishes typed events on a bus.
+type Publisher[T any] struct {
+ client *Client
+ stop stopFlag
+}
+
+func newPublisher[T any](c *Client) *Publisher[T] {
+ ret := &Publisher[T]{
+ client: c,
+ }
+ c.addPublisher(ret)
+ return ret
+}
+
+// Close closes the publisher.
+//
+// Calls to Publish after Close silently do nothing.
+func (p *Publisher[T]) Close() {
+ // Just unblocks any active calls to Publish, no other
+ // synchronization needed.
+ p.stop.Stop()
+ p.client.deletePublisher(p)
+}
+
+func (p *Publisher[T]) publishType() reflect.Type {
+ return reflect.TypeFor[T]()
+}
+
+// Publish publishes event v on the bus.
+func (p *Publisher[T]) Publish(v T) {
+ // Check for just a stopped publisher or bus before trying to
+ // write, so that once closed Publish consistently does nothing.
+ select {
+ case <-p.stop.Done():
+ return
+ default:
+ }
+
+ evt := PublishedEvent{
+ Event: v,
+ From: p.client,
+ }
+
+ select {
+ case p.client.publish() <- evt:
+ case <-p.stop.Done():
+ }
+}
+
+// ShouldPublish reports whether anyone is subscribed to the events
+// that this publisher emits.
+//
+// ShouldPublish can be used to skip expensive event construction if
+// nobody seems to care. Publishers must not assume that someone will
+// definitely receive an event if ShouldPublish returns true.
+func (p *Publisher[T]) ShouldPublish() bool {
+ return p.client.shouldPublish(reflect.TypeFor[T]())
+}
diff --git a/util/eventbus/queue.go b/util/eventbus/queue.go
new file mode 100644
index 000000000..a62bf3c62
--- /dev/null
+++ b/util/eventbus/queue.go
@@ -0,0 +1,85 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+package eventbus
+
+import (
+ "slices"
+)
+
+const maxQueuedItems = 16
+
+// queue is an ordered queue of length up to maxQueuedItems.
+type queue[T any] struct {
+ vals []T
+ start int
+}
+
+// canAppend reports whether a value can be appended to q.vals without
+// shifting values around.
+func (q *queue[T]) canAppend() bool {
+ return cap(q.vals) < maxQueuedItems || len(q.vals) < cap(q.vals)
+}
+
+func (q *queue[T]) Full() bool {
+ return q.start == 0 && !q.canAppend()
+}
+
+func (q *queue[T]) Empty() bool {
+ return q.start == len(q.vals)
+}
+
+func (q *queue[T]) Len() int {
+ return len(q.vals) - q.start
+}
+
+// Add adds v to the end of the queue. Blocks until append can be
+// done.
+func (q *queue[T]) Add(v T) {
+ if !q.canAppend() {
+ if q.start == 0 {
+ panic("Add on a full queue")
+ }
+
+ // Slide remaining values back to the start of the array.
+ n := copy(q.vals, q.vals[q.start:])
+ toClear := len(q.vals) - n
+ clear(q.vals[len(q.vals)-toClear:])
+ q.vals = q.vals[:n]
+ q.start = 0
+ }
+
+ q.vals = append(q.vals, v)
+}
+
+// Peek returns the first value in the queue, without removing it from
+// the queue, or nil if the queue is empty.
+func (q *queue[T]) Peek() T {
+ if q.Empty() {
+ var zero T
+ return zero
+ }
+
+ return q.vals[q.start]
+}
+
+// Drop discards the first value in the queue, if any.
+func (q *queue[T]) Drop() {
+ if q.Empty() {
+ return
+ }
+
+ var zero T
+ q.vals[q.start] = zero
+ q.start++
+ if q.Empty() {
+ // Reset cursor to start of array, it's free to do.
+ q.start = 0
+ q.vals = q.vals[:0]
+ }
+}
+
+// Snapshot returns a copy of the queue's contents.
+func (q *queue[T]) Snapshot() []T {
+ return slices.Clone(q.vals[q.start:])
+}
diff --git a/util/eventbus/subscribe.go b/util/eventbus/subscribe.go
new file mode 100644
index 000000000..ba17e8548
--- /dev/null
+++ b/util/eventbus/subscribe.go
@@ -0,0 +1,254 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+package eventbus
+
+import (
+ "context"
+ "fmt"
+ "reflect"
+ "sync"
+)
+
+type DeliveredEvent struct {
+ Event any
+ From *Client
+ To *Client
+}
+
+// subscriber is a uniformly typed wrapper around Subscriber[T], so
+// that debugging facilities can look at active subscribers.
+type subscriber interface {
+ subscribeType() reflect.Type
+ // dispatch is a function that dispatches the head value in vals to
+ // a subscriber, while also handling stop and incoming queue write
+ // events.
+ //
+ // dispatch exists because of the strongly typed Subscriber[T]
+ // wrapper around subscriptions: within the bus events are boxed in an
+ // 'any', and need to be unpacked to their full type before delivery
+ // to the subscriber. This involves writing to a strongly-typed
+ // channel, so subscribeState cannot handle that dispatch by itself -
+ // but if that strongly typed send blocks, we also need to keep
+ // processing other potential sources of wakeups, which is how we end
+ // up at this awkward type signature and sharing of internal state
+ // through dispatch.
+ dispatch(ctx context.Context, vals *queue[DeliveredEvent], acceptCh func() chan DeliveredEvent, snapshot chan chan []DeliveredEvent) bool
+ Close()
+}
+
+// subscribeState handles dispatching of events received from a Bus.
+type subscribeState struct {
+ client *Client
+
+ dispatcher *worker
+ write chan DeliveredEvent
+ snapshot chan chan []DeliveredEvent
+ debug hook[DeliveredEvent]
+
+ outputsMu sync.Mutex
+ outputs map[reflect.Type]subscriber
+}
+
+func newSubscribeState(c *Client) *subscribeState {
+ ret := &subscribeState{
+ client: c,
+ write: make(chan DeliveredEvent),
+ snapshot: make(chan chan []DeliveredEvent),
+ outputs: map[reflect.Type]subscriber{},
+ }
+ ret.dispatcher = runWorker(ret.pump)
+ return ret
+}
+
+func (q *subscribeState) pump(ctx context.Context) {
+ var vals queue[DeliveredEvent]
+ acceptCh := func() chan DeliveredEvent {
+ if vals.Full() {
+ return nil
+ }
+ return q.write
+ }
+ for {
+ if !vals.Empty() {
+ val := vals.Peek()
+ sub := q.subscriberFor(val.Event)
+ if sub == nil {
+ // Raced with unsubscribe.
+ vals.Drop()
+ continue
+ }
+ if !sub.dispatch(ctx, &vals, acceptCh, q.snapshot) {
+ return
+ }
+
+ if q.debug.active() {
+ q.debug.run(DeliveredEvent{
+ Event: val.Event,
+ From: val.From,
+ To: q.client,
+ })
+ }
+ } else {
+ // Keep the cases in this select in sync with
+ // Subscriber.dispatch below. The only different should be
+ // that this select doesn't deliver queued values to
+ // anyone, and unconditionally accepts new values.
+ select {
+ case val := <-q.write:
+ vals.Add(val)
+ case <-ctx.Done():
+ return
+ case ch := <-q.snapshot:
+ ch <- vals.Snapshot()
+ }
+ }
+ }
+}
+
+func (s *subscribeState) snapshotQueue() []DeliveredEvent {
+ if s == nil {
+ return nil
+ }
+
+ resp := make(chan []DeliveredEvent)
+ select {
+ case s.snapshot <- resp:
+ return <-resp
+ case <-s.dispatcher.Done():
+ return nil
+ }
+}
+
+func (s *subscribeState) subscribeTypes() []reflect.Type {
+ if s == nil {
+ return nil
+ }
+
+ s.outputsMu.Lock()
+ defer s.outputsMu.Unlock()
+ ret := make([]reflect.Type, 0, len(s.outputs))
+ for t := range s.outputs {
+ ret = append(ret, t)
+ }
+ return ret
+}
+
+func (s *subscribeState) addSubscriber(t reflect.Type, sub subscriber) {
+ s.outputsMu.Lock()
+ defer s.outputsMu.Unlock()
+ if s.outputs[t] != nil {
+ panic(fmt.Errorf("double subscription for event %s", t))
+ }
+ s.outputs[t] = sub
+ s.client.addSubscriber(t, s)
+}
+
+func (s *subscribeState) deleteSubscriber(t reflect.Type) {
+ s.outputsMu.Lock()
+ defer s.outputsMu.Unlock()
+ delete(s.outputs, t)
+ s.client.deleteSubscriber(t, s)
+}
+
+func (q *subscribeState) subscriberFor(val any) subscriber {
+ q.outputsMu.Lock()
+ defer q.outputsMu.Unlock()
+ return q.outputs[reflect.TypeOf(val)]
+}
+
+// Close closes the subscribeState. Implicitly closes all Subscribers
+// linked to this state, and any pending events are discarded.
+func (s *subscribeState) close() {
+ s.dispatcher.StopAndWait()
+
+ var subs map[reflect.Type]subscriber
+ s.outputsMu.Lock()
+ subs, s.outputs = s.outputs, nil
+ s.outputsMu.Unlock()
+ for _, sub := range subs {
+ sub.Close()
+ }
+}
+
+func (s *subscribeState) closed() <-chan struct{} {
+ return s.dispatcher.Done()
+}
+
+// A Subscriber delivers one type of event from a [Client].
+type Subscriber[T any] struct {
+ stop stopFlag
+ read chan T
+ unregister func()
+}
+
+func newSubscriber[T any](r *subscribeState) *Subscriber[T] {
+ t := reflect.TypeFor[T]()
+
+ ret := &Subscriber[T]{
+ read: make(chan T),
+ unregister: func() { r.deleteSubscriber(t) },
+ }
+ r.addSubscriber(t, ret)
+
+ return ret
+}
+
+func newMonitor[T any](attach func(fn func(T)) (cancel func())) *Subscriber[T] {
+ ret := &Subscriber[T]{
+ read: make(chan T, 100), // arbitrary, large
+ }
+ ret.unregister = attach(ret.monitor)
+ return ret
+}
+
+func (s *Subscriber[T]) subscribeType() reflect.Type {
+ return reflect.TypeFor[T]()
+}
+
+func (s *Subscriber[T]) monitor(debugEvent T) {
+ select {
+ case s.read <- debugEvent:
+ case <-s.stop.Done():
+ }
+}
+
+func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[DeliveredEvent], acceptCh func() chan DeliveredEvent, snapshot chan chan []DeliveredEvent) bool {
+ t := vals.Peek().Event.(T)
+ for {
+ // Keep the cases in this select in sync with subscribeState.pump
+ // above. The only different should be that this select
+ // delivers a value on s.read.
+ select {
+ case s.read <- t:
+ vals.Drop()
+ return true
+ case val := <-acceptCh():
+ vals.Add(val)
+ case <-ctx.Done():
+ return false
+ case ch := <-snapshot:
+ ch <- vals.Snapshot()
+ }
+ }
+}
+
+// Events returns a channel on which the subscriber's events are
+// delivered.
+func (s *Subscriber[T]) Events() <-chan T {
+ return s.read
+}
+
+// Done returns a channel that is closed when the subscriber is
+// closed.
+func (s *Subscriber[T]) Done() <-chan struct{} {
+ return s.stop.Done()
+}
+
+// Close closes the Subscriber, indicating the caller no longer wishes
+// to receive this event type. After Close, receives on
+// [Subscriber.Events] block for ever.
+func (s *Subscriber[T]) Close() {
+ s.stop.Stop() // unblock receivers
+ s.unregister()
+}
diff --git a/util/syspolicy/internal/internal.go b/util/syspolicy/internal/internal.go
index 8f2889625..2e1737e5b 100644
--- a/util/syspolicy/internal/internal.go
+++ b/util/syspolicy/internal/internal.go
@@ -56,10 +56,10 @@ func EqualJSONForTest(tb TB, j1, j2 jsontext.Value) (s1, s2 string, equal bool)
return "", "", true
}
// Otherwise, format the values for display and return false.
- if err := j1.Indent("", "\t"); err != nil {
+ if err := j1.Indent(); err != nil {
tb.Fatal(err)
}
- if err := j2.Indent("", "\t"); err != nil {
+ if err := j2.Indent(); err != nil {
tb.Fatal(err)
}
return j1.String(), j2.String(), false
diff --git a/util/syspolicy/setting/origin.go b/util/syspolicy/setting/origin.go
index 078ef758e..4c7cc7025 100644
--- a/util/syspolicy/setting/origin.go
+++ b/util/syspolicy/setting/origin.go
@@ -50,22 +50,27 @@ func (s Origin) String() string {
return s.Scope().String()
}
-// MarshalJSONV2 implements [jsonv2.MarshalerV2].
-func (s Origin) MarshalJSONV2(out *jsontext.Encoder, opts jsonv2.Options) error {
- return jsonv2.MarshalEncode(out, &s.data, opts)
+var (
+ _ jsonv2.MarshalerTo = (*Origin)(nil)
+ _ jsonv2.UnmarshalerFrom = (*Origin)(nil)
+)
+
+// MarshalJSONTo implements [jsonv2.MarshalerTo].
+func (s Origin) MarshalJSONTo(out *jsontext.Encoder) error {
+ return jsonv2.MarshalEncode(out, &s.data)
}
-// UnmarshalJSONV2 implements [jsonv2.UnmarshalerV2].
-func (s *Origin) UnmarshalJSONV2(in *jsontext.Decoder, opts jsonv2.Options) error {
- return jsonv2.UnmarshalDecode(in, &s.data, opts)
+// UnmarshalJSONFrom implements [jsonv2.UnmarshalerFrom].
+func (s *Origin) UnmarshalJSONFrom(in *jsontext.Decoder) error {
+ return jsonv2.UnmarshalDecode(in, &s.data)
}
// MarshalJSON implements [json.Marshaler].
func (s Origin) MarshalJSON() ([]byte, error) {
- return jsonv2.Marshal(s) // uses MarshalJSONV2
+ return jsonv2.Marshal(s) // uses MarshalJSONTo
}
// UnmarshalJSON implements [json.Unmarshaler].
func (s *Origin) UnmarshalJSON(b []byte) error {
- return jsonv2.Unmarshal(b, s) // uses UnmarshalJSONV2
+ return jsonv2.Unmarshal(b, s) // uses UnmarshalJSONFrom
}
diff --git a/util/syspolicy/setting/raw_item.go b/util/syspolicy/setting/raw_item.go
index cf46e54b7..9a96073b0 100644
--- a/util/syspolicy/setting/raw_item.go
+++ b/util/syspolicy/setting/raw_item.go
@@ -75,31 +75,36 @@ func (i RawItem) String() string {
return fmt.Sprintf("%v%s", i.data.Value.Value, suffix)
}
-// MarshalJSONV2 implements [jsonv2.MarshalerV2].
-func (i RawItem) MarshalJSONV2(out *jsontext.Encoder, opts jsonv2.Options) error {
- return jsonv2.MarshalEncode(out, &i.data, opts)
+var (
+ _ jsonv2.MarshalerTo = (*RawItem)(nil)
+ _ jsonv2.UnmarshalerFrom = (*RawItem)(nil)
+)
+
+// MarshalJSONTo implements [jsonv2.MarshalerTo].
+func (i RawItem) MarshalJSONTo(out *jsontext.Encoder) error {
+ return jsonv2.MarshalEncode(out, &i.data)
}
-// UnmarshalJSONV2 implements [jsonv2.UnmarshalerV2].
-func (i *RawItem) UnmarshalJSONV2(in *jsontext.Decoder, opts jsonv2.Options) error {
- return jsonv2.UnmarshalDecode(in, &i.data, opts)
+// UnmarshalJSONFrom implements [jsonv2.UnmarshalerFrom].
+func (i *RawItem) UnmarshalJSONFrom(in *jsontext.Decoder) error {
+ return jsonv2.UnmarshalDecode(in, &i.data)
}
// MarshalJSON implements [json.Marshaler].
func (i RawItem) MarshalJSON() ([]byte, error) {
- return jsonv2.Marshal(i) // uses MarshalJSONV2
+ return jsonv2.Marshal(i) // uses MarshalJSONTo
}
// UnmarshalJSON implements [json.Unmarshaler].
func (i *RawItem) UnmarshalJSON(b []byte) error {
- return jsonv2.Unmarshal(b, i) // uses UnmarshalJSONV2
+ return jsonv2.Unmarshal(b, i) // uses UnmarshalJSONFrom
}
// RawValue represents a raw policy setting value read from a policy store.
// It is JSON-marshallable and facilitates unmarshalling of JSON values
// into corresponding policy setting types, with special handling for JSON numbers
// (unmarshalled as float64) and JSON string arrays (unmarshalled as []string).
-// See also [RawValue.UnmarshalJSONV2].
+// See also [RawValue.UnmarshalJSONFrom].
type RawValue struct {
opt.Value[any]
}
@@ -114,16 +119,21 @@ func RawValueOf[T RawValueType](v T) RawValue {
return RawValue{opt.ValueOf[any](v)}
}
-// MarshalJSONV2 implements [jsonv2.MarshalerV2].
-func (v RawValue) MarshalJSONV2(out *jsontext.Encoder, opts jsonv2.Options) error {
- return jsonv2.MarshalEncode(out, v.Value, opts)
+var (
+ _ jsonv2.MarshalerTo = (*RawValue)(nil)
+ _ jsonv2.UnmarshalerFrom = (*RawValue)(nil)
+)
+
+// MarshalJSONTo implements [jsonv2.MarshalerTo].
+func (v RawValue) MarshalJSONTo(out *jsontext.Encoder) error {
+ return jsonv2.MarshalEncode(out, v.Value)
}
-// UnmarshalJSONV2 implements [jsonv2.UnmarshalerV2] by attempting to unmarshal
+// UnmarshalJSONFrom implements [jsonv2.UnmarshalerFrom] by attempting to unmarshal
// a JSON value as one of the supported policy setting value types (bool, string, uint64, or []string),
// based on the JSON value type. It fails if the JSON value is an object, if it's a JSON number that
// cannot be represented as a uint64, or if a JSON array contains anything other than strings.
-func (v *RawValue) UnmarshalJSONV2(in *jsontext.Decoder, opts jsonv2.Options) error {
+func (v *RawValue) UnmarshalJSONFrom(in *jsontext.Decoder) error {
var valPtr any
switch k := in.PeekKind(); k {
case 't', 'f':
@@ -139,7 +149,7 @@ func (v *RawValue) UnmarshalJSONV2(in *jsontext.Decoder, opts jsonv2.Options) er
default:
panic("unreachable")
}
- if err := jsonv2.UnmarshalDecode(in, valPtr, opts); err != nil {
+ if err := jsonv2.UnmarshalDecode(in, valPtr); err != nil {
v.Value.Clear()
return err
}
@@ -150,12 +160,12 @@ func (v *RawValue) UnmarshalJSONV2(in *jsontext.Decoder, opts jsonv2.Options) er
// MarshalJSON implements [json.Marshaler].
func (v RawValue) MarshalJSON() ([]byte, error) {
- return jsonv2.Marshal(v) // uses MarshalJSONV2
+ return jsonv2.Marshal(v) // uses MarshalJSONTo
}
// UnmarshalJSON implements [json.Unmarshaler].
func (v *RawValue) UnmarshalJSON(b []byte) error {
- return jsonv2.Unmarshal(b, v) // uses UnmarshalJSONV2
+ return jsonv2.Unmarshal(b, v) // uses UnmarshalJSONFrom
}
// RawValues is a map of keyed setting values that can be read from a JSON.
diff --git a/util/syspolicy/setting/snapshot.go b/util/syspolicy/setting/snapshot.go
index 0af2bae0f..087325a04 100644
--- a/util/syspolicy/setting/snapshot.go
+++ b/util/syspolicy/setting/snapshot.go
@@ -147,23 +147,28 @@ type snapshotJSON struct {
Settings map[Key]RawItem `json:",omitempty"`
}
-// MarshalJSONV2 implements [jsonv2.MarshalerV2].
-func (s *Snapshot) MarshalJSONV2(out *jsontext.Encoder, opts jsonv2.Options) error {
+var (
+ _ jsonv2.MarshalerTo = (*Snapshot)(nil)
+ _ jsonv2.UnmarshalerFrom = (*Snapshot)(nil)
+)
+
+// MarshalJSONTo implements [jsonv2.MarshalerTo].
+func (s *Snapshot) MarshalJSONTo(out *jsontext.Encoder) error {
data := &snapshotJSON{}
if s != nil {
data.Summary = s.summary
data.Settings = s.m
}
- return jsonv2.MarshalEncode(out, data, opts)
+ return jsonv2.MarshalEncode(out, data)
}
-// UnmarshalJSONV2 implements [jsonv2.UnmarshalerV2].
-func (s *Snapshot) UnmarshalJSONV2(in *jsontext.Decoder, opts jsonv2.Options) error {
+// UnmarshalJSONFrom implements [jsonv2.UnmarshalerFrom].
+func (s *Snapshot) UnmarshalJSONFrom(in *jsontext.Decoder) error {
if s == nil {
return errors.New("s must not be nil")
}
data := &snapshotJSON{}
- if err := jsonv2.UnmarshalDecode(in, data, opts); err != nil {
+ if err := jsonv2.UnmarshalDecode(in, data); err != nil {
return err
}
*s = Snapshot{m: data.Settings, sig: deephash.Hash(&data.Settings), summary: data.Summary}
@@ -172,12 +177,12 @@ func (s *Snapshot) UnmarshalJSONV2(in *jsontext.Decoder, opts jsonv2.Options) er
// MarshalJSON implements [json.Marshaler].
func (s *Snapshot) MarshalJSON() ([]byte, error) {
- return jsonv2.Marshal(s) // uses MarshalJSONV2
+ return jsonv2.Marshal(s) // uses MarshalJSONTo
}
// UnmarshalJSON implements [json.Unmarshaler].
func (s *Snapshot) UnmarshalJSON(b []byte) error {
- return jsonv2.Unmarshal(b, s) // uses UnmarshalJSONV2
+ return jsonv2.Unmarshal(b, s) // uses UnmarshalJSONFrom
}
// MergeSnapshots returns a [Snapshot] that contains all [RawItem]s
diff --git a/util/syspolicy/setting/summary.go b/util/syspolicy/setting/summary.go
index 5ff20e0aa..9864822f7 100644
--- a/util/syspolicy/setting/summary.go
+++ b/util/syspolicy/setting/summary.go
@@ -54,24 +54,29 @@ func (s Summary) String() string {
return s.data.Scope.String()
}
-// MarshalJSONV2 implements [jsonv2.MarshalerV2].
-func (s Summary) MarshalJSONV2(out *jsontext.Encoder, opts jsonv2.Options) error {
- return jsonv2.MarshalEncode(out, &s.data, opts)
+var (
+ _ jsonv2.MarshalerTo = (*Summary)(nil)
+ _ jsonv2.UnmarshalerFrom = (*Summary)(nil)
+)
+
+// MarshalJSONTo implements [jsonv2.MarshalerTo].
+func (s Summary) MarshalJSONTo(out *jsontext.Encoder) error {
+ return jsonv2.MarshalEncode(out, &s.data)
}
-// UnmarshalJSONV2 implements [jsonv2.UnmarshalerV2].
-func (s *Summary) UnmarshalJSONV2(in *jsontext.Decoder, opts jsonv2.Options) error {
- return jsonv2.UnmarshalDecode(in, &s.data, opts)
+// UnmarshalJSONFrom implements [jsonv2.UnmarshalerFrom].
+func (s *Summary) UnmarshalJSONFrom(in *jsontext.Decoder) error {
+ return jsonv2.UnmarshalDecode(in, &s.data)
}
// MarshalJSON implements [json.Marshaler].
func (s Summary) MarshalJSON() ([]byte, error) {
- return jsonv2.Marshal(s) // uses MarshalJSONV2
+ return jsonv2.Marshal(s) // uses MarshalJSONTo
}
// UnmarshalJSON implements [json.Unmarshaler].
func (s *Summary) UnmarshalJSON(b []byte) error {
- return jsonv2.Unmarshal(b, s) // uses UnmarshalJSONV2
+ return jsonv2.Unmarshal(b, s) // uses UnmarshalJSONFrom
}
// SummaryOption is an option that configures [Summary]