diff options
Diffstat (limited to 'util')
24 files changed, 2177 insertions, 43 deletions
diff --git a/util/eventbus/assets/event.html b/util/eventbus/assets/event.html new file mode 100644 index 000000000..8e016f583 --- /dev/null +++ b/util/eventbus/assets/event.html @@ -0,0 +1,6 @@ +<li id="monitor" hx-swap-oob="afterbegin"> + <details> + <summary>{{.Count}}: {{.Type}} from {{.Event.From.Name}}, {{len .Event.To}} recipients</summary> + {{.Event.Event}} + </details> +</li> diff --git a/util/eventbus/assets/htmx-websocket.min.js.gz b/util/eventbus/assets/htmx-websocket.min.js.gz Binary files differnew file mode 100644 index 000000000..4ed53be49 --- /dev/null +++ b/util/eventbus/assets/htmx-websocket.min.js.gz diff --git a/util/eventbus/assets/htmx.min.js.gz b/util/eventbus/assets/htmx.min.js.gz Binary files differnew file mode 100644 index 000000000..b75fea8d1 --- /dev/null +++ b/util/eventbus/assets/htmx.min.js.gz diff --git a/util/eventbus/assets/main.html b/util/eventbus/assets/main.html new file mode 100644 index 000000000..51d6b22ad --- /dev/null +++ b/util/eventbus/assets/main.html @@ -0,0 +1,97 @@ +<!DOCTYPE html> +<html> + <head> + <script src="bus/htmx.min.js"></script> + <script src="bus/htmx-websocket.min.js"></script> + <link rel="stylesheet" href="bus/style.css"> + </head> + <body hx-ext="ws"> + <h1>Event bus</h1> + + <section> + <h2>General</h2> + {{with $.PublishQueue}} + {{len .}} pending + {{end}} + + <button hx-post="bus/monitor" hx-swap="outerHTML">Monitor all events</button> + </section> + + <section> + <h2>Clients</h2> + + <table> + <thead> + <tr> + <th>Name</th> + <th>Publishing</th> + <th>Subscribing</th> + <th>Pending</th> + </tr> + </thead> + {{range .Clients}} + <tr id="{{.Name}}"> + <td>{{.Name}}</td> + <td class="list"> + <ul> + {{range .Publish}} + <li><a href="#{{.}}">{{.}}</a></li> + {{end}} + </ul> + </td> + <td class="list"> + <ul> + {{range .Subscribe}} + <li><a href="#{{.}}">{{.}}</a></li> + {{end}} + </ul> + </td> + <td> + {{len ($.SubscribeQueue .Client)}} + </td> + </tr> + {{end}} + </table> + </section> + + <section> + <h2>Types</h2> + + {{range .Types}} + + <section id="{{.}}"> + <h3>{{.Name}}</h3> + <h4>Definition</h4> + <code>{{prettyPrintStruct .}}</code> + + <h4>Published by:</h4> + {{if len (.Publish)}} + <ul> + {{range .Publish}} + <li><a href="#{{.Name}}">{{.Name}}</a></li> + {{end}} + </ul> + {{else}} + <ul> + <li>No publishers.</li> + </ul> + {{end}} + + <h4>Received by:</h4> + {{if len (.Subscribe)}} + <ul> + {{range .Subscribe}} + <li><a href="#{{.Name}}">{{.Name}}</a></li> + {{end}} + </ul> + {{else}} + <ul> + <li>No subscribers.</li> + </ul> + {{end}} + </section> + {{end}} + + </section> + </body> +</html> diff --git a/util/eventbus/assets/monitor.html b/util/eventbus/assets/monitor.html new file mode 100644 index 000000000..1af5bdce6 --- /dev/null +++ b/util/eventbus/assets/monitor.html @@ -0,0 +1,5 @@ +<div> +<ul id="monitor" ws-connect="bus/monitor"> +</ul> +<button hx-get="bus" hx-target="body">Stop monitoring</button> +</div> diff --git a/util/eventbus/assets/style.css b/util/eventbus/assets/style.css new file mode 100644 index 000000000..690bd4f17 --- /dev/null +++ b/util/eventbus/assets/style.css @@ -0,0 +1,90 @@ +/* CSS reset, thanks Josh Comeau: https://www.joshwcomeau.com/css/custom-css-reset/ */ +*, *::before, *::after { box-sizing: border-box; } +* { margin: 0; } +input, button, textarea, select { font: inherit; } +p, h1, h2, h3, h4, h5, h6 { overflow-wrap: break-word; } +p { text-wrap: pretty; } +h1, h2, h3, h4, h5, h6 { text-wrap: balance; } +#root, #__next { isolation: isolate; } +body { + line-height: 1.5; + -webkit-font-smoothing: antialiased; +} +img, picture, video, canvas, svg { + display: block; + max-width: 100%; +} + +/* Local styling begins */ + +body { + padding: 12px; +} + +div { + width: 100%; +} + +section { + display: flex; + flex-direction: column; + flex-gap: 6px; + align-items: flex-start; + padding: 12px 0; +} + +section > * { + margin-left: 24px; +} + +section > h2, section > h3 { + margin-left: 0; + padding-bottom: 6px; + padding-top: 12px; +} + +details { + padding-bottom: 12px; +} + +table { + table-layout: fixed; + width: calc(100% - 48px); + border-collapse: collapse; + border: 1px solid black; +} + +th, td { + padding: 12px; + border: 1px solid black; +} + +td.list { + vertical-align: top; +} + +ul { + list-style: none; +} + +td ul { + margin: 0; + padding: 0; +} + +code { + padding: 12px; + white-space: pre; +} + +#monitor { + width: calc(100% - 48px); + resize: vertical; + padding: 12px; + overflow: scroll; + height: 15lh; + border: 1px inset; + min-height: 1em; + display: flex; + flex-direction: column-reverse; +} diff --git a/util/eventbus/bench_test.go b/util/eventbus/bench_test.go new file mode 100644 index 000000000..25f5b8002 --- /dev/null +++ b/util/eventbus/bench_test.go @@ -0,0 +1,125 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package eventbus_test + +import ( + "math/rand/v2" + "testing" + + "tailscale.com/util/eventbus" +) + +func BenchmarkBasicThroughput(b *testing.B) { + bus := eventbus.New() + pcli := bus.Client(b.Name() + "-pub") + scli := bus.Client(b.Name() + "-sub") + + type emptyEvent [0]byte + + // One publisher and a corresponding subscriber shoveling events as fast as + // they can through the plumbing. + pub := eventbus.Publish[emptyEvent](pcli) + sub := eventbus.Subscribe[emptyEvent](scli) + + go func() { + for { + select { + case <-sub.Events(): + continue + case <-sub.Done(): + return + } + } + }() + + for b.Loop() { + pub.Publish(emptyEvent{}) + } + bus.Close() +} + +func BenchmarkSubsThroughput(b *testing.B) { + bus := eventbus.New() + pcli := bus.Client(b.Name() + "-pub") + scli1 := bus.Client(b.Name() + "-sub1") + scli2 := bus.Client(b.Name() + "-sub2") + + type emptyEvent [0]byte + + // One publisher and two subscribers shoveling events as fast as they can + // through the plumbing. + pub := eventbus.Publish[emptyEvent](pcli) + sub1 := eventbus.Subscribe[emptyEvent](scli1) + sub2 := eventbus.Subscribe[emptyEvent](scli2) + + for _, sub := range []*eventbus.Subscriber[emptyEvent]{sub1, sub2} { + go func() { + for { + select { + case <-sub.Events(): + continue + case <-sub.Done(): + return + } + } + }() + } + + for b.Loop() { + pub.Publish(emptyEvent{}) + } + bus.Close() +} + +func BenchmarkMultiThroughput(b *testing.B) { + bus := eventbus.New() + cli := bus.Client(b.Name()) + + type eventA struct{} + type eventB struct{} + + // Two disjoint event streams routed through the global order. + apub := eventbus.Publish[eventA](cli) + asub := eventbus.Subscribe[eventA](cli) + bpub := eventbus.Publish[eventB](cli) + bsub := eventbus.Subscribe[eventB](cli) + + go func() { + for { + select { + case <-asub.Events(): + continue + case <-asub.Done(): + return + } + } + }() + go func() { + for { + select { + case <-bsub.Events(): + continue + case <-bsub.Done(): + return + } + } + }() + + var rng uint64 + var bits int + for b.Loop() { + if bits == 0 { + rng = rand.Uint64() + bits = 64 + } + if rng&1 == 0 { + apub.Publish(eventA{}) + } else { + bpub.Publish(eventB{}) + } + rng >>= 1 + bits-- + } + bus.Close() +} diff --git a/util/eventbus/bus.go b/util/eventbus/bus.go new file mode 100644 index 000000000..45d12da2f --- /dev/null +++ b/util/eventbus/bus.go @@ -0,0 +1,309 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package eventbus + +import ( + "context" + "reflect" + "slices" + "sync" + + "tailscale.com/util/set" +) + +type PublishedEvent struct { + Event any + From *Client +} + +type RoutedEvent struct { + Event any + From *Client + To []*Client +} + +// Bus is an event bus that distributes published events to interested +// subscribers. +type Bus struct { + router *worker + write chan PublishedEvent + snapshot chan chan []PublishedEvent + routeDebug hook[RoutedEvent] + + topicsMu sync.Mutex + topics map[reflect.Type][]*subscribeState + + // Used for introspection/debugging only, not in the normal event + // publishing path. + clientsMu sync.Mutex + clients set.Set[*Client] +} + +// New returns a new bus. Use [PublisherOf] to make event publishers, +// and [Bus.Queue] and [Subscribe] to make event subscribers. +func New() *Bus { + ret := &Bus{ + write: make(chan PublishedEvent), + snapshot: make(chan chan []PublishedEvent), + topics: map[reflect.Type][]*subscribeState{}, + clients: set.Set[*Client]{}, + } + ret.router = runWorker(ret.pump) + return ret +} + +// Client returns a new client with no subscriptions. Use [Subscribe] +// to receive events, and [Publish] to emit events. +// +// The client's name is used only for debugging, to tell humans what +// piece of code a publisher/subscriber belongs to. Aim for something +// short but unique, for example "kernel-route-monitor" or "taildrop", +// not "watcher". +func (b *Bus) Client(name string) *Client { + ret := &Client{ + name: name, + bus: b, + pub: set.Set[publisher]{}, + } + b.clientsMu.Lock() + defer b.clientsMu.Unlock() + b.clients.Add(ret) + return ret +} + +// Debugger returns the debugging facility for the bus. +func (b *Bus) Debugger() *Debugger { + return &Debugger{b} +} + +// Close closes the bus. Implicitly closes all clients, publishers and +// subscribers attached to the bus. +// +// Close blocks until the bus is fully shut down. The bus is +// permanently unusable after closing. +func (b *Bus) Close() { + b.router.StopAndWait() + + b.clientsMu.Lock() + defer b.clientsMu.Unlock() + for c := range b.clients { + c.Close() + } + b.clients = nil +} + +func (b *Bus) pump(ctx context.Context) { + var vals queue[PublishedEvent] + acceptCh := func() chan PublishedEvent { + if vals.Full() { + return nil + } + return b.write + } + for { + // Drain all pending events. Note that while we're draining + // events into subscriber queues, we continue to + // opportunistically accept more incoming events, if we have + // queue space for it. + for !vals.Empty() { + val := vals.Peek() + dests := b.dest(reflect.ValueOf(val.Event).Type()) + + if b.routeDebug.active() { + clients := make([]*Client, len(dests)) + for i := range len(dests) { + clients[i] = dests[i].client + } + b.routeDebug.run(RoutedEvent{ + Event: val.Event, + From: val.From, + To: clients, + }) + } + + for _, d := range dests { + evt := DeliveredEvent{ + Event: val.Event, + From: val.From, + To: d.client, + } + deliverOne: + for { + select { + case d.write <- evt: + break deliverOne + case <-d.closed(): + // Queue closed, don't block but continue + // delivering to others. + break deliverOne + case in := <-acceptCh(): + vals.Add(in) + in.From.publishDebug.run(in) + case <-ctx.Done(): + return + case ch := <-b.snapshot: + ch <- vals.Snapshot() + } + } + } + vals.Drop() + } + + // Inbound queue empty, wait for at least 1 work item before + // resuming. + for vals.Empty() { + select { + case <-ctx.Done(): + return + case in := <-b.write: + vals.Add(in) + in.From.publishDebug.run(in) + case ch := <-b.snapshot: + ch <- nil + } + } + } +} + +func (b *Bus) dest(t reflect.Type) []*subscribeState { + b.topicsMu.Lock() + defer b.topicsMu.Unlock() + return b.topics[t] +} + +func (b *Bus) shouldPublish(t reflect.Type) bool { + if b.routeDebug.active() { + return true + } + + b.topicsMu.Lock() + defer b.topicsMu.Unlock() + return len(b.topics[t]) > 0 +} + +func (b *Bus) listClients() []*Client { + b.clientsMu.Lock() + defer b.clientsMu.Unlock() + return b.clients.Slice() +} + +func (b *Bus) snapshotPublishQueue() []PublishedEvent { + resp := make(chan []PublishedEvent) + select { + case b.snapshot <- resp: + return <-resp + case <-b.router.Done(): + return nil + } +} + +func (b *Bus) subscribe(t reflect.Type, q *subscribeState) (cancel func()) { + b.topicsMu.Lock() + defer b.topicsMu.Unlock() + b.topics[t] = append(b.topics[t], q) + return func() { + b.unsubscribe(t, q) + } +} + +func (b *Bus) unsubscribe(t reflect.Type, q *subscribeState) { + b.topicsMu.Lock() + defer b.topicsMu.Unlock() + // Topic slices are accessed by pump without holding a lock, so we + // have to replace the entire slice when unsubscribing. + // Unsubscribing should be infrequent enough that this won't + // matter. + i := slices.Index(b.topics[t], q) + if i < 0 { + return + } + b.topics[t] = slices.Delete(slices.Clone(b.topics[t]), i, i+1) +} + +// A worker runs a worker goroutine and helps coordinate its shutdown. +type worker struct { + ctx context.Context + stop context.CancelFunc + stopped chan struct{} +} + +// runWorker creates a worker goroutine running fn. The context passed +// to fn is canceled by [worker.Stop]. +func runWorker(fn func(context.Context)) *worker { + ctx, stop := context.WithCancel(context.Background()) + ret := &worker{ + ctx: ctx, + stop: stop, + stopped: make(chan struct{}), + } + go ret.run(fn) + return ret +} + +func (w *worker) run(fn func(context.Context)) { + defer close(w.stopped) + fn(w.ctx) +} + +// Stop signals the worker goroutine to shut down. +func (w *worker) Stop() { w.stop() } + +// Done returns a channel that is closed when the worker goroutine +// exits. +func (w *worker) Done() <-chan struct{} { return w.stopped } + +// Wait waits until the worker goroutine has exited. +func (w *worker) Wait() { <-w.stopped } + +// StopAndWait signals the worker goroutine to shut down, then waits +// for it to exit. +func (w *worker) StopAndWait() { + w.stop() + <-w.stopped +} + +// stopFlag is a value that can be watched for a notification. The +// zero value is ready for use. +// +// The flag is notified by running [stopFlag.Stop]. Stop can be called +// multiple times. Upon the first call to Stop, [stopFlag.Done] is +// closed, all pending [stopFlag.Wait] calls return, and future Wait +// calls return immediately. +// +// A stopFlag can only notify once, and is intended for use as a +// one-way shutdown signal that's lighter than a cancellable +// context.Context. +type stopFlag struct { + // guards the lazy construction of stopped, and the value of + // alreadyStopped. + mu sync.Mutex + stopped chan struct{} + alreadyStopped bool +} + +func (s *stopFlag) Stop() { + s.mu.Lock() + defer s.mu.Unlock() + if s.alreadyStopped { + return + } + s.alreadyStopped = true + if s.stopped == nil { + s.stopped = make(chan struct{}) + } + close(s.stopped) +} + +func (s *stopFlag) Done() <-chan struct{} { + s.mu.Lock() + defer s.mu.Unlock() + if s.stopped == nil { + s.stopped = make(chan struct{}) + } + return s.stopped +} + +func (s *stopFlag) Wait() { + <-s.Done() +} diff --git a/util/eventbus/bus_test.go b/util/eventbus/bus_test.go new file mode 100644 index 000000000..e159b6a12 --- /dev/null +++ b/util/eventbus/bus_test.go @@ -0,0 +1,203 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package eventbus_test + +import ( + "errors" + "fmt" + "testing" + "time" + + "github.com/creachadair/taskgroup" + "github.com/google/go-cmp/cmp" + "tailscale.com/util/eventbus" +) + +type EventA struct { + Counter int +} + +type EventB struct { + Counter int +} + +func TestBus(t *testing.T) { + b := eventbus.New() + defer b.Close() + + c := b.Client("TestSub") + defer c.Close() + s := eventbus.Subscribe[EventA](c) + + go func() { + p := b.Client("TestPub") + defer p.Close() + pa := eventbus.Publish[EventA](p) + defer pa.Close() + pb := eventbus.Publish[EventB](p) + defer pb.Close() + pa.Publish(EventA{1}) + pb.Publish(EventB{2}) + pa.Publish(EventA{3}) + }() + + want := expectEvents(t, EventA{1}, EventA{3}) + for !want.Empty() { + select { + case got := <-s.Events(): + want.Got(got) + case <-s.Done(): + t.Fatalf("queue closed unexpectedly") + case <-time.After(time.Second): + t.Fatalf("timed out waiting for event") + } + } +} + +func TestBusMultipleConsumers(t *testing.T) { + b := eventbus.New() + defer b.Close() + + c1 := b.Client("TestSubA") + defer c1.Close() + s1 := eventbus.Subscribe[EventA](c1) + + c2 := b.Client("TestSubB") + defer c2.Close() + s2A := eventbus.Subscribe[EventA](c2) + s2B := eventbus.Subscribe[EventB](c2) + + go func() { + p := b.Client("TestPub") + defer p.Close() + pa := eventbus.Publish[EventA](p) + defer pa.Close() + pb := eventbus.Publish[EventB](p) + defer pb.Close() + pa.Publish(EventA{1}) + pb.Publish(EventB{2}) + pa.Publish(EventA{3}) + }() + + wantA := expectEvents(t, EventA{1}, EventA{3}) + wantB := expectEvents(t, EventA{1}, EventB{2}, EventA{3}) + for !wantA.Empty() || !wantB.Empty() { + select { + case got := <-s1.Events(): + wantA.Got(got) + case got := <-s2A.Events(): + wantB.Got(got) + case got := <-s2B.Events(): + wantB.Got(got) + case <-s1.Done(): + t.Fatalf("queue closed unexpectedly") + case <-s2A.Done(): + t.Fatalf("queue closed unexpectedly") + case <-s2B.Done(): + t.Fatalf("queue closed unexpectedly") + case <-time.After(time.Second): + t.Fatalf("timed out waiting for event") + } + } +} + +func TestSpam(t *testing.T) { + b := eventbus.New() + defer b.Close() + + const ( + publishers = 100 + eventsPerPublisher = 20 + wantEvents = publishers * eventsPerPublisher + subscribers = 100 + ) + + var g taskgroup.Group + + received := make([][]EventA, subscribers) + for i := range subscribers { + c := b.Client(fmt.Sprintf("Subscriber%d", i)) + defer c.Close() + s := eventbus.Subscribe[EventA](c) + g.Go(func() error { + for range wantEvents { + select { + case evt := <-s.Events(): + received[i] = append(received[i], evt) + case <-s.Done(): + t.Errorf("queue done before expected number of events received") + return errors.New("queue prematurely closed") + case <-time.After(5 * time.Second): + t.Errorf("timed out waiting for expected bus event after %d events", len(received[i])) + return errors.New("timeout") + } + } + return nil + }) + } + + published := make([][]EventA, publishers) + for i := range publishers { + g.Run(func() { + c := b.Client(fmt.Sprintf("Publisher%d", i)) + p := eventbus.Publish[EventA](c) + for j := range eventsPerPublisher { + evt := EventA{i*eventsPerPublisher + j} + p.Publish(evt) + published[i] = append(published[i], evt) + } + }) + } + + if err := g.Wait(); err != nil { + t.Fatal(err) + } + var last []EventA + for i, got := range received { + if len(got) != wantEvents { + // Receiving goroutine already reported an error, we just need + // to fail early within the main test goroutine. + t.FailNow() + } + if last == nil { + continue + } + if diff := cmp.Diff(got, last); diff != "" { + t.Errorf("Subscriber %d did not see the same events as %d (-got+want):\n%s", i, i-1, diff) + } + last = got + } + for i, sent := range published { + if got := len(sent); got != eventsPerPublisher { + t.Fatalf("Publisher %d sent %d events, want %d", i, got, eventsPerPublisher) + } + } + + // TODO: check that the published sequences are proper + // subsequences of the received slices. +} + +type queueChecker struct { + t *testing.T + want []any +} + +func expectEvents(t *testing.T, want ...any) *queueChecker { + return &queueChecker{t, want} +} + +func (q *queueChecker) Got(v any) { + q.t.Helper() + if q.Empty() { + q.t.Fatalf("queue got unexpected %v", v) + } + if v != q.want[0] { + q.t.Fatalf("queue got %#v, want %#v", v, q.want[0]) + } + q.want = q.want[1:] +} + +func (q *queueChecker) Empty() bool { + return len(q.want) == 0 +} diff --git a/util/eventbus/client.go b/util/eventbus/client.go new file mode 100644 index 000000000..a7a88c0a1 --- /dev/null +++ b/util/eventbus/client.go @@ -0,0 +1,127 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package eventbus + +import ( + "reflect" + "sync" + + "tailscale.com/util/set" +) + +// A Client can publish and subscribe to events on its attached +// bus. See [Publish] to publish events, and [Subscribe] to receive +// events. +// +// Subscribers that share the same client receive events one at a +// time, in the order they were published. +type Client struct { + name string + bus *Bus + publishDebug hook[PublishedEvent] + + mu sync.Mutex + pub set.Set[publisher] + sub *subscribeState // Lazily created on first subscribe +} + +func (c *Client) Name() string { return c.name } + +// Close closes the client. Implicitly closes all publishers and +// subscribers obtained from this client. +func (c *Client) Close() { + var ( + pub set.Set[publisher] + sub *subscribeState + ) + + c.mu.Lock() + pub, c.pub = c.pub, nil + sub, c.sub = c.sub, nil + c.mu.Unlock() + + if sub != nil { + sub.close() + } + for p := range pub { + p.Close() + } +} + +func (c *Client) snapshotSubscribeQueue() []DeliveredEvent { + return c.peekSubscribeState().snapshotQueue() +} + +func (c *Client) peekSubscribeState() *subscribeState { + c.mu.Lock() + defer c.mu.Unlock() + return c.sub +} + +func (c *Client) publishTypes() []reflect.Type { + c.mu.Lock() + defer c.mu.Unlock() + ret := make([]reflect.Type, 0, len(c.pub)) + for pub := range c.pub { + ret = append(ret, pub.publishType()) + } + return ret +} + +func (c *Client) subscribeTypes() []reflect.Type { + return c.peekSubscribeState().subscribeTypes() +} + +func (c *Client) subscribeState() *subscribeState { + c.mu.Lock() + defer c.mu.Unlock() + if c.sub == nil { + c.sub = newSubscribeState(c) + } + return c.sub +} + +func (c *Client) addPublisher(pub publisher) { + c.mu.Lock() + defer c.mu.Unlock() + c.pub.Add(pub) +} + +func (c *Client) deletePublisher(pub publisher) { + c.mu.Lock() + defer c.mu.Unlock() + c.pub.Delete(pub) +} + +func (c *Client) addSubscriber(t reflect.Type, s *subscribeState) { + c.bus.subscribe(t, s) +} + +func (c *Client) deleteSubscriber(t reflect.Type, s *subscribeState) { + c.bus.unsubscribe(t, s) +} + +func (c *Client) publish() chan<- PublishedEvent { + return c.bus.write +} + +func (c *Client) shouldPublish(t reflect.Type) bool { + return c.publishDebug.active() || c.bus.shouldPublish(t) +} + +// Subscribe requests delivery of events of type T through the given +// Queue. Panics if the queue already has a subscriber for T. +func Subscribe[T any](c *Client) *Subscriber[T] { + return newSubscriber[T](c.subscribeState()) +} + +// Publisher returns a publisher for event type T using the given +// client. +func Publish[T any](c *Client) *Publisher[T] { + ret := newPublisher[T](c) + c.mu.Lock() + defer c.mu.Unlock() + c.pub.Add(ret) + return ret +} diff --git a/util/eventbus/debug-demo/main.go b/util/eventbus/debug-demo/main.go new file mode 100644 index 000000000..a6d232d88 --- /dev/null +++ b/util/eventbus/debug-demo/main.go @@ -0,0 +1,103 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +// debug-demo is a program that serves a bus's debug interface over +// HTTP, then generates some fake traffic from a handful of +// clients. It is an aid to development, to have something to present +// on the debug interfaces while writing them. +package main + +import ( + "log" + "math/rand/v2" + "net/http" + "net/netip" + "time" + + "tailscale.com/tsweb" + "tailscale.com/types/key" + "tailscale.com/util/eventbus" +) + +func main() { + b := eventbus.New() + c := b.Client("RouteMonitor") + go testPub[RouteAdded](c, 5*time.Second) + go testPub[RouteRemoved](c, 5*time.Second) + c = b.Client("ControlClient") + go testPub[PeerAdded](c, 3*time.Second) + go testPub[PeerRemoved](c, 6*time.Second) + c = b.Client("Portmapper") + go testPub[PortmapAcquired](c, 10*time.Second) + go testPub[PortmapLost](c, 15*time.Second) + go testSub[RouteAdded](c) + c = b.Client("WireguardConfig") + go testSub[PeerAdded](c) + go testSub[PeerRemoved](c) + c = b.Client("Magicsock") + go testPub[PeerPathChanged](c, 5*time.Second) + go testSub[RouteAdded](c) + go testSub[RouteRemoved](c) + go testSub[PortmapAcquired](c) + go testSub[PortmapLost](c) + + m := http.NewServeMux() + d := tsweb.Debugger(m) + b.Debugger().RegisterHTTP(d) + + m.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + http.Redirect(w, r, "/debug/bus", http.StatusFound) + }) + log.Printf("Serving debug interface at http://localhost:8185/debug/bus") + http.ListenAndServe(":8185", m) +} + +func testPub[T any](c *eventbus.Client, every time.Duration) { + p := eventbus.Publish[T](c) + for { + jitter := time.Duration(rand.N(2000)) * time.Millisecond + time.Sleep(jitter) + var zero T + log.Printf("%s publish: %T", c.Name(), zero) + p.Publish(zero) + time.Sleep(every) + } +} + +func testSub[T any](c *eventbus.Client) { + s := eventbus.Subscribe[T](c) + for v := range s.Events() { + log.Printf("%s received: %T", c.Name(), v) + } +} + +type RouteAdded struct { + Prefix netip.Prefix + Via netip.Addr + Priority int +} +type RouteRemoved struct { + Prefix netip.Addr +} + +type PeerAdded struct { + ID int + Key key.NodePublic +} +type PeerRemoved struct { + ID int + Key key.NodePublic +} + +type PortmapAcquired struct { + Endpoint netip.Addr +} +type PortmapLost struct { + Endpoint netip.Addr +} + +type PeerPathChanged struct { + ID int + EndpointID int + Quality int +} diff --git a/util/eventbus/debug.go b/util/eventbus/debug.go new file mode 100644 index 000000000..832d72ac0 --- /dev/null +++ b/util/eventbus/debug.go @@ -0,0 +1,188 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package eventbus + +import ( + "cmp" + "fmt" + "reflect" + "slices" + "sync" + "sync/atomic" + + "tailscale.com/tsweb" +) + +// A Debugger offers access to a bus's privileged introspection and +// debugging facilities. +// +// The debugger's functionality is intended for humans and their tools +// to examine and troubleshoot bus clients, and should not be used in +// normal codepaths. +// +// In particular, the debugger provides access to information that is +// deliberately withheld from bus clients to encourage more robust and +// maintainable code - for example, the sender of an event, or the +// event streams of other clients. Please don't use the debugger to +// circumvent these restrictions for purposes other than debugging. +type Debugger struct { + bus *Bus +} + +// Clients returns a list of all clients attached to the bus. +func (d *Debugger) Clients() []*Client { + ret := d.bus.listClients() + slices.SortFunc(ret, func(a, b *Client) int { + return cmp.Compare(a.Name(), b.Name()) + }) + return ret +} + +// PublishQueue returns the contents of the publish queue. +// +// The publish queue contains events that have been accepted by the +// bus from Publish() calls, but have not yet been routed to relevant +// subscribers. +// +// This queue is expected to be almost empty in normal operation. A +// full publish queue indicates that a slow subscriber downstream is +// causing backpressure and stalling the bus. +func (d *Debugger) PublishQueue() []PublishedEvent { + return d.bus.snapshotPublishQueue() +} + +// checkClient verifies that client is attached to the same bus as the +// Debugger, and panics if not. +func (d *Debugger) checkClient(client *Client) { + if client.bus != d.bus { + panic(fmt.Errorf("SubscribeQueue given client belonging to wrong bus")) + } +} + +// SubscribeQueue returns the contents of the given client's subscribe +// queue. +// +// The subscribe queue contains events that are to be delivered to the +// client, but haven't yet been handed off to the relevant +// [Subscriber]. +// +// This queue is expected to be almost empty in normal operation. A +// full subscribe queue indicates that the client is accepting events +// too slowly, and may be causing the rest of the bus to stall. +func (d *Debugger) SubscribeQueue(client *Client) []DeliveredEvent { + d.checkClient(client) + return client.snapshotSubscribeQueue() +} + +// WatchBus streams information about all events passing through the +// bus. +// +// Monitored events are delivered in the bus's global publication +// order (see "Concurrency properties" in the package docs). +// +// The caller must consume monitoring events promptly to avoid +// stalling the bus (see "Expected subscriber behavior" in the package +// docs). +func (d *Debugger) WatchBus() *Subscriber[RoutedEvent] { + return newMonitor(d.bus.routeDebug.add) +} + +// WatchPublish streams information about all events published by the +// given client. +// +// Monitored events are delivered in the bus's global publication +// order (see "Concurrency properties" in the package docs). +// +// The caller must consume monitoring events promptly to avoid +// stalling the bus (see "Expected subscriber behavior" in the package +// docs). +func (d *Debugger) WatchPublish(client *Client) *Subscriber[PublishedEvent] { + d.checkClient(client) + return newMonitor(client.publishDebug.add) +} + +// WatchSubscribe streams information about all events received by the +// given client. +// +// Monitored events are delivered in the bus's global publication +// order (see "Concurrency properties" in the package docs). +// +// The caller must consume monitoring events promptly to avoid +// stalling the bus (see "Expected subscriber behavior" in the package +// docs). +func (d *Debugger) WatchSubscribe(client *Client) *Subscriber[DeliveredEvent] { + d.checkClient(client) + return newMonitor(client.subscribeState().debug.add) +} + +// PublishTypes returns the list of types being published by client. +// +// The returned types are those for which the client has obtained a +// [Publisher]. The client may not have ever sent the type in +// question. +func (d *Debugger) PublishTypes(client *Client) []reflect.Type { + d.checkClient(client) + return client.publishTypes() +} + +// SubscribeTypes returns the list of types being subscribed to by +// client. +// +// The returned types are those for which the client has obtained a +// [Subscriber]. The client may not have ever received the type in +// question, and here may not be any publishers of the type. +func (d *Debugger) SubscribeTypes(client *Client) []reflect.Type { + d.checkClient(client) + return client.subscribeTypes() +} + +func (d *Debugger) RegisterHTTP(td *tsweb.DebugHandler) { registerHTTPDebugger(d, td) } + +// A hook collects hook functions that can be run as a group. +type hook[T any] struct { + sync.Mutex + fns []hookFn[T] +} + +var hookID atomic.Uint64 + +// add registers fn to be called when the hook is run. Returns an +// unregistration function that removes fn from the hook when called. +func (h *hook[T]) add(fn func(T)) (remove func()) { + id := hookID.Add(1) + h.Lock() + defer h.Unlock() + h.fns = append(h.fns, hookFn[T]{id, fn}) + return func() { h.remove(id) } +} + +// remove removes the function with the given ID from the hook. +func (h *hook[T]) remove(id uint64) { + h.Lock() + defer h.Unlock() + h.fns = slices.DeleteFunc(h.fns, func(f hookFn[T]) bool { return f.ID == id }) +} + +// active reports whether any functions are registered with the +// hook. This can be used to skip expensive work when the hook is +// inactive. +func (h *hook[T]) active() bool { + h.Lock() + defer h.Unlock() + return len(h.fns) > 0 +} + +// run calls all registered functions with the value v. +func (h *hook[T]) run(v T) { + h.Lock() + defer h.Unlock() + for _, fn := range h.fns { + fn.Fn(v) + } +} + +type hookFn[T any] struct { + ID uint64 + Fn func(T) +} diff --git a/util/eventbus/debughttp.go b/util/eventbus/debughttp.go new file mode 100644 index 000000000..18888cc56 --- /dev/null +++ b/util/eventbus/debughttp.go @@ -0,0 +1,240 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build !ios + +package eventbus + +import ( + "bytes" + "cmp" + "embed" + "fmt" + "html/template" + "io" + "io/fs" + "log" + "net/http" + "path/filepath" + "reflect" + "slices" + "strings" + "sync" + + "github.com/coder/websocket" + "tailscale.com/tsweb" +) + +type httpDebugger struct { + *Debugger +} + +func registerHTTPDebugger(d *Debugger, td *tsweb.DebugHandler) { + dh := httpDebugger{d} + td.Handle("bus", "Event bus", dh) + td.HandleSilent("bus/monitor", http.HandlerFunc(dh.serveMonitor)) + td.HandleSilent("bus/style.css", serveStatic("style.css")) + td.HandleSilent("bus/htmx.min.js", serveStatic("htmx.min.js.gz")) + td.HandleSilent("bus/htmx-websocket.min.js", serveStatic("htmx-websocket.min.js.gz")) +} + +//go:embed assets/*.html +var templatesSrc embed.FS + +var templates = sync.OnceValue(func() *template.Template { + d, err := fs.Sub(templatesSrc, "assets") + if err != nil { + panic(fmt.Errorf("getting eventbus debughttp templates subdir: %w", err)) + } + ret := template.New("").Funcs(map[string]any{ + "prettyPrintStruct": prettyPrintStruct, + }) + return template.Must(ret.ParseFS(d, "*")) +}) + +//go:generate go run fetch-htmx.go + +//go:embed assets/*.css assets/*.min.js.gz +var static embed.FS + +func serveStatic(name string) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case strings.HasSuffix(name, ".css"): + w.Header().Set("Content-Type", "text/css") + case strings.HasSuffix(name, ".min.js.gz"): + w.Header().Set("Content-Type", "text/javascript") + w.Header().Set("Content-Encoding", "gzip") + case strings.HasSuffix(name, ".js"): + w.Header().Set("Content-Type", "text/javascript") + default: + http.Error(w, "not found", http.StatusNotFound) + return + } + + f, err := static.Open(filepath.Join("assets", name)) + if err != nil { + http.Error(w, fmt.Sprintf("opening asset: %v", err), http.StatusInternalServerError) + return + } + defer f.Close() + if _, err := io.Copy(w, f); err != nil { + http.Error(w, fmt.Sprintf("serving asset: %v", err), http.StatusInternalServerError) + return + } + }) +} + +func render(w http.ResponseWriter, name string, data any) { + err := templates().ExecuteTemplate(w, name+".html", data) + if err != nil { + err := fmt.Errorf("rendering template: %v", err) + log.Print(err) + http.Error(w, err.Error(), http.StatusInternalServerError) + } +} + +func (h httpDebugger) ServeHTTP(w http.ResponseWriter, r *http.Request) { + type clientInfo struct { + *Client + Publish []reflect.Type + Subscribe []reflect.Type + } + type typeInfo struct { + reflect.Type + Publish []*Client + Subscribe []*Client + } + type info struct { + *Debugger + Clients map[string]*clientInfo + Types map[string]*typeInfo + } + + data := info{ + Debugger: h.Debugger, + Clients: map[string]*clientInfo{}, + Types: map[string]*typeInfo{}, + } + + getTypeInfo := func(t reflect.Type) *typeInfo { + if data.Types[t.Name()] == nil { + data.Types[t.Name()] = &typeInfo{ + Type: t, + } + } + return data.Types[t.Name()] + } + + for _, c := range h.Clients() { + ci := &clientInfo{ + Client: c, + Publish: h.PublishTypes(c), + Subscribe: h.SubscribeTypes(c), + } + slices.SortFunc(ci.Publish, func(a, b reflect.Type) int { return cmp.Compare(a.Name(), b.Name()) }) + slices.SortFunc(ci.Subscribe, func(a, b reflect.Type) int { return cmp.Compare(a.Name(), b.Name()) }) + data.Clients[c.Name()] = ci + + for _, t := range ci.Publish { + ti := getTypeInfo(t) + ti.Publish = append(ti.Publish, c) + } + for _, t := range ci.Subscribe { + ti := getTypeInfo(t) + ti.Subscribe = append(ti.Subscribe, c) + } + } + + render(w, "main", data) +} + +func (h httpDebugger) serveMonitor(w http.ResponseWriter, r *http.Request) { + if r.Header.Get("Upgrade") == "websocket" { + h.serveMonitorStream(w, r) + return + } + + render(w, "monitor", nil) +} + +func (h httpDebugger) serveMonitorStream(w http.ResponseWriter, r *http.Request) { + conn, err := websocket.Accept(w, r, nil) + if err != nil { + return + } + defer conn.CloseNow() + wsCtx := conn.CloseRead(r.Context()) + + mon := h.WatchBus() + defer mon.Close() + + i := 0 + for { + select { + case <-r.Context().Done(): + return + case <-wsCtx.Done(): + return + case <-mon.Done(): + return + case event := <-mon.Events(): + msg, err := conn.Writer(r.Context(), websocket.MessageText) + if err != nil { + return + } + data := map[string]any{ + "Count": i, + "Type": reflect.TypeOf(event.Event), + "Event": event, + } + i++ + if err := templates().ExecuteTemplate(msg, "event.html", data); err != nil { + log.Println(err) + return + } + if err := msg.Close(); err != nil { + return + } + } + } +} + +func prettyPrintStruct(t reflect.Type) string { + if t.Kind() != reflect.Struct { + return t.String() + } + var rec func(io.Writer, int, reflect.Type) + rec = func(out io.Writer, indent int, t reflect.Type) { + ind := strings.Repeat(" ", indent) + fmt.Fprintf(out, "%s", t.String()) + fs := collectFields(t) + if len(fs) > 0 { + io.WriteString(out, " {\n") + for _, f := range fs { + fmt.Fprintf(out, "%s %s ", ind, f.Name) + if f.Type.Kind() == reflect.Struct { + rec(out, indent+1, f.Type) + } else { + fmt.Fprint(out, f.Type) + } + io.WriteString(out, "\n") + } + fmt.Fprintf(out, "%s}", ind) + } + } + + var ret bytes.Buffer + rec(&ret, 0, t) + return ret.String() +} + +func collectFields(t reflect.Type) (ret []reflect.StructField) { + for _, f := range reflect.VisibleFields(t) { + if !f.IsExported() { + continue + } + ret = append(ret, f) + } + return ret +} diff --git a/util/eventbus/debughttp_ios.go b/util/eventbus/debughttp_ios.go new file mode 100644 index 000000000..a898898b7 --- /dev/null +++ b/util/eventbus/debughttp_ios.go @@ -0,0 +1,18 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build ios + +package eventbus + +import "tailscale.com/tsweb" + +func registerHTTPDebugger(d *Debugger, td *tsweb.DebugHandler) { + // The event bus debugging UI uses html/template, which uses + // reflection for method lookups. This forces the compiler to + // retain a lot more code and information to make dynamic method + // dispatch work, which is unacceptable bloat for the iOS build. + // + // TODO: https://github.com/tailscale/tailscale/issues/15297 to + // bring the debug UI back to iOS somehow. +} diff --git a/util/eventbus/doc.go b/util/eventbus/doc.go new file mode 100644 index 000000000..964a686ea --- /dev/null +++ b/util/eventbus/doc.go @@ -0,0 +1,92 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +// Package eventbus provides an in-process event bus. +// +// An event bus connects publishers of typed events with subscribers +// interested in those events. Typically, there is one global event +// bus per process. +// +// # Usage +// +// To send or receive events, first use [Bus.Client] to register with +// the bus. Clients should register with a human-readable name that +// identifies the code using the client, to aid in debugging. +// +// To publish events, use [Publish] on a Client to get a typed +// publisher for your event type, then call [Publisher.Publish] as +// needed. If your event is expensive to construct, you can optionally +// use [Publisher.ShouldPublish] to skip the work if nobody is +// listening for the event. +// +// To receive events, use [Subscribe] to get a typed subscriber for +// each event type you're interested in. Receive the events themselves +// by selecting over all your [Subscriber.Events] channels, as well as +// [Subscriber.Done] for shutdown notifications. +// +// # Concurrency properties +// +// The bus serializes all published events across all publishers, and +// preserves that ordering when delivering to subscribers that are +// attached to the same Client. In more detail: +// +// - An event is published to the bus at some instant between the +// start and end of the call to [Publisher.Publish]. +// - Two events cannot be published at the same instant, and so are +// totally ordered by their publication time. Given two events E1 +// and E2, either E1 happens before E2, or E2 happens before E1. +// - Clients dispatch events to their Subscribers in publication +// order: if E1 happens before E2, the client always delivers E1 +// before E2. +// - Clients do not synchronize subscriptions with each other: given +// clients C1 and C2, both subscribed to events E1 and E2, C1 may +// deliver both E1 and E2 before C2 delivers E1. +// +// Less formally: there is one true timeline of all published events. +// If you make a Client and subscribe to events, you will receive +// events one at a time, in the same order as the one true +// timeline. You will "skip over" events you didn't subscribe to, but +// your view of the world always moves forward in time, never +// backwards, and you will observe events in the same order as +// everyone else. +// +// However, you cannot assume that what your client see as "now" is +// the same as what other clients. They may be further behind you in +// working through the timeline, or running ahead of you. This means +// you should be careful about reaching out to another component +// directly after receiving an event, as its view of the world may not +// yet (or ever) be exactly consistent with yours. +// +// To make your code more testable and understandable, you should try +// to structure it following the actor model: you have some local +// state over which you have authority, but your only way to interact +// with state elsewhere in the program is to receive and process +// events coming from elsewhere, or to emit events of your own. +// +// # Expected subscriber behavior +// +// Subscribers are expected to promptly receive their events on +// [Subscriber.Events]. The bus has a small, fixed amount of internal +// buffering, meaning that a slow subscriber will eventually cause +// backpressure and block publication of all further events. +// +// In general, you should receive from your subscriber(s) in a loop, +// and only do fast state updates within that loop. Any heavier work +// should be offloaded to another goroutine. +// +// Causing publishers to block from backpressure is considered a bug +// in the slow subscriber causing the backpressure, and should be +// addressed there. Publishers should assume that Publish will not +// block for extended periods of time, and should not make exceptional +// effort to behave gracefully if they do get blocked. +// +// These blocking semantics are provisional and subject to +// change. Please speak up if this causes development pain, so that we +// can adapt the semantics to better suit our needs. +// +// # Debugging facilities +// +// The [Debugger], obtained through [Bus.Debugger], provides +// introspection facilities to monitor events flowing through the bus, +// and inspect publisher and subscriber state. +package eventbus diff --git a/util/eventbus/fetch-htmx.go b/util/eventbus/fetch-htmx.go new file mode 100644 index 000000000..f80d50257 --- /dev/null +++ b/util/eventbus/fetch-htmx.go @@ -0,0 +1,93 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build ignore + +// Program fetch-htmx fetches and installs local copies of the HTMX +// library and its dependencies, used by the debug UI. It is meant to +// be run via go generate. +package main + +import ( + "compress/gzip" + "crypto/sha512" + "encoding/base64" + "fmt" + "io" + "log" + "net/http" + "os" +) + +func main() { + // Hash from https://htmx.org/docs/#installing + htmx, err := fetchHashed("https://unpkg.com/htmx.org@2.0.4", "HGfztofotfshcF7+8n44JQL2oJmowVChPTg48S+jvZoztPfvwD79OC/LTtG6dMp+") + if err != nil { + log.Fatalf("fetching htmx: %v", err) + } + + // Hash SHOULD be from https://htmx.org/extensions/ws/ , but the + // hash is currently incorrect, see + // https://github.com/bigskysoftware/htmx-extensions/issues/153 + // + // Until that bug is resolved, hash was obtained by rebuilding the + // extension from git source, and verifying that the hash matches + // what unpkg is serving. + ws, err := fetchHashed("https://unpkg.com/htmx-ext-ws@2.0.2", "932iIqjARv+Gy0+r6RTGrfCkCKS5MsF539Iqf6Vt8L4YmbnnWI2DSFoMD90bvXd0") + if err != nil { + log.Fatalf("fetching htmx-websockets: %v", err) + } + + if err := writeGz("assets/htmx.min.js.gz", htmx); err != nil { + log.Fatalf("writing htmx.min.js.gz: %v", err) + } + if err := writeGz("assets/htmx-websocket.min.js.gz", ws); err != nil { + log.Fatalf("writing htmx-websocket.min.js.gz: %v", err) + } +} + +func writeGz(path string, bs []byte) error { + f, err := os.Create(path) + if err != nil { + return err + } + defer f.Close() + + g, err := gzip.NewWriterLevel(f, gzip.BestCompression) + if err != nil { + return err + } + + if _, err := g.Write(bs); err != nil { + return err + } + + if err := g.Flush(); err != nil { + return err + } + if err := f.Close(); err != nil { + return err + } + return nil +} + +func fetchHashed(url, wantHash string) ([]byte, error) { + resp, err := http.Get(url) + if err != nil { + return nil, err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("fetching %q returned error status: %s", url, resp.Status) + } + ret, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("reading file from %q: %v", url, err) + } + h := sha512.Sum384(ret) + got := base64.StdEncoding.EncodeToString(h[:]) + if got != wantHash { + return nil, fmt.Errorf("wrong hash for %q: got %q, want %q", url, got, wantHash) + } + return ret, nil +} diff --git a/util/eventbus/publish.go b/util/eventbus/publish.go new file mode 100644 index 000000000..9897114b6 --- /dev/null +++ b/util/eventbus/publish.go @@ -0,0 +1,74 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package eventbus + +import ( + "reflect" +) + +// publisher is a uniformly typed wrapper around Publisher[T], so that +// debugging facilities can look at active publishers. +type publisher interface { + publishType() reflect.Type + Close() +} + +// A Publisher publishes typed events on a bus. +type Publisher[T any] struct { + client *Client + stop stopFlag +} + +func newPublisher[T any](c *Client) *Publisher[T] { + ret := &Publisher[T]{ + client: c, + } + c.addPublisher(ret) + return ret +} + +// Close closes the publisher. +// +// Calls to Publish after Close silently do nothing. +func (p *Publisher[T]) Close() { + // Just unblocks any active calls to Publish, no other + // synchronization needed. + p.stop.Stop() + p.client.deletePublisher(p) +} + +func (p *Publisher[T]) publishType() reflect.Type { + return reflect.TypeFor[T]() +} + +// Publish publishes event v on the bus. +func (p *Publisher[T]) Publish(v T) { + // Check for just a stopped publisher or bus before trying to + // write, so that once closed Publish consistently does nothing. + select { + case <-p.stop.Done(): + return + default: + } + + evt := PublishedEvent{ + Event: v, + From: p.client, + } + + select { + case p.client.publish() <- evt: + case <-p.stop.Done(): + } +} + +// ShouldPublish reports whether anyone is subscribed to the events +// that this publisher emits. +// +// ShouldPublish can be used to skip expensive event construction if +// nobody seems to care. Publishers must not assume that someone will +// definitely receive an event if ShouldPublish returns true. +func (p *Publisher[T]) ShouldPublish() bool { + return p.client.shouldPublish(reflect.TypeFor[T]()) +} diff --git a/util/eventbus/queue.go b/util/eventbus/queue.go new file mode 100644 index 000000000..a62bf3c62 --- /dev/null +++ b/util/eventbus/queue.go @@ -0,0 +1,85 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package eventbus + +import ( + "slices" +) + +const maxQueuedItems = 16 + +// queue is an ordered queue of length up to maxQueuedItems. +type queue[T any] struct { + vals []T + start int +} + +// canAppend reports whether a value can be appended to q.vals without +// shifting values around. +func (q *queue[T]) canAppend() bool { + return cap(q.vals) < maxQueuedItems || len(q.vals) < cap(q.vals) +} + +func (q *queue[T]) Full() bool { + return q.start == 0 && !q.canAppend() +} + +func (q *queue[T]) Empty() bool { + return q.start == len(q.vals) +} + +func (q *queue[T]) Len() int { + return len(q.vals) - q.start +} + +// Add adds v to the end of the queue. Blocks until append can be +// done. +func (q *queue[T]) Add(v T) { + if !q.canAppend() { + if q.start == 0 { + panic("Add on a full queue") + } + + // Slide remaining values back to the start of the array. + n := copy(q.vals, q.vals[q.start:]) + toClear := len(q.vals) - n + clear(q.vals[len(q.vals)-toClear:]) + q.vals = q.vals[:n] + q.start = 0 + } + + q.vals = append(q.vals, v) +} + +// Peek returns the first value in the queue, without removing it from +// the queue, or nil if the queue is empty. +func (q *queue[T]) Peek() T { + if q.Empty() { + var zero T + return zero + } + + return q.vals[q.start] +} + +// Drop discards the first value in the queue, if any. +func (q *queue[T]) Drop() { + if q.Empty() { + return + } + + var zero T + q.vals[q.start] = zero + q.start++ + if q.Empty() { + // Reset cursor to start of array, it's free to do. + q.start = 0 + q.vals = q.vals[:0] + } +} + +// Snapshot returns a copy of the queue's contents. +func (q *queue[T]) Snapshot() []T { + return slices.Clone(q.vals[q.start:]) +} diff --git a/util/eventbus/subscribe.go b/util/eventbus/subscribe.go new file mode 100644 index 000000000..ba17e8548 --- /dev/null +++ b/util/eventbus/subscribe.go @@ -0,0 +1,254 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package eventbus + +import ( + "context" + "fmt" + "reflect" + "sync" +) + +type DeliveredEvent struct { + Event any + From *Client + To *Client +} + +// subscriber is a uniformly typed wrapper around Subscriber[T], so +// that debugging facilities can look at active subscribers. +type subscriber interface { + subscribeType() reflect.Type + // dispatch is a function that dispatches the head value in vals to + // a subscriber, while also handling stop and incoming queue write + // events. + // + // dispatch exists because of the strongly typed Subscriber[T] + // wrapper around subscriptions: within the bus events are boxed in an + // 'any', and need to be unpacked to their full type before delivery + // to the subscriber. This involves writing to a strongly-typed + // channel, so subscribeState cannot handle that dispatch by itself - + // but if that strongly typed send blocks, we also need to keep + // processing other potential sources of wakeups, which is how we end + // up at this awkward type signature and sharing of internal state + // through dispatch. + dispatch(ctx context.Context, vals *queue[DeliveredEvent], acceptCh func() chan DeliveredEvent, snapshot chan chan []DeliveredEvent) bool + Close() +} + +// subscribeState handles dispatching of events received from a Bus. +type subscribeState struct { + client *Client + + dispatcher *worker + write chan DeliveredEvent + snapshot chan chan []DeliveredEvent + debug hook[DeliveredEvent] + + outputsMu sync.Mutex + outputs map[reflect.Type]subscriber +} + +func newSubscribeState(c *Client) *subscribeState { + ret := &subscribeState{ + client: c, + write: make(chan DeliveredEvent), + snapshot: make(chan chan []DeliveredEvent), + outputs: map[reflect.Type]subscriber{}, + } + ret.dispatcher = runWorker(ret.pump) + return ret +} + +func (q *subscribeState) pump(ctx context.Context) { + var vals queue[DeliveredEvent] + acceptCh := func() chan DeliveredEvent { + if vals.Full() { + return nil + } + return q.write + } + for { + if !vals.Empty() { + val := vals.Peek() + sub := q.subscriberFor(val.Event) + if sub == nil { + // Raced with unsubscribe. + vals.Drop() + continue + } + if !sub.dispatch(ctx, &vals, acceptCh, q.snapshot) { + return + } + + if q.debug.active() { + q.debug.run(DeliveredEvent{ + Event: val.Event, + From: val.From, + To: q.client, + }) + } + } else { + // Keep the cases in this select in sync with + // Subscriber.dispatch below. The only different should be + // that this select doesn't deliver queued values to + // anyone, and unconditionally accepts new values. + select { + case val := <-q.write: + vals.Add(val) + case <-ctx.Done(): + return + case ch := <-q.snapshot: + ch <- vals.Snapshot() + } + } + } +} + +func (s *subscribeState) snapshotQueue() []DeliveredEvent { + if s == nil { + return nil + } + + resp := make(chan []DeliveredEvent) + select { + case s.snapshot <- resp: + return <-resp + case <-s.dispatcher.Done(): + return nil + } +} + +func (s *subscribeState) subscribeTypes() []reflect.Type { + if s == nil { + return nil + } + + s.outputsMu.Lock() + defer s.outputsMu.Unlock() + ret := make([]reflect.Type, 0, len(s.outputs)) + for t := range s.outputs { + ret = append(ret, t) + } + return ret +} + +func (s *subscribeState) addSubscriber(t reflect.Type, sub subscriber) { + s.outputsMu.Lock() + defer s.outputsMu.Unlock() + if s.outputs[t] != nil { + panic(fmt.Errorf("double subscription for event %s", t)) + } + s.outputs[t] = sub + s.client.addSubscriber(t, s) +} + +func (s *subscribeState) deleteSubscriber(t reflect.Type) { + s.outputsMu.Lock() + defer s.outputsMu.Unlock() + delete(s.outputs, t) + s.client.deleteSubscriber(t, s) +} + +func (q *subscribeState) subscriberFor(val any) subscriber { + q.outputsMu.Lock() + defer q.outputsMu.Unlock() + return q.outputs[reflect.TypeOf(val)] +} + +// Close closes the subscribeState. Implicitly closes all Subscribers +// linked to this state, and any pending events are discarded. +func (s *subscribeState) close() { + s.dispatcher.StopAndWait() + + var subs map[reflect.Type]subscriber + s.outputsMu.Lock() + subs, s.outputs = s.outputs, nil + s.outputsMu.Unlock() + for _, sub := range subs { + sub.Close() + } +} + +func (s *subscribeState) closed() <-chan struct{} { + return s.dispatcher.Done() +} + +// A Subscriber delivers one type of event from a [Client]. +type Subscriber[T any] struct { + stop stopFlag + read chan T + unregister func() +} + +func newSubscriber[T any](r *subscribeState) *Subscriber[T] { + t := reflect.TypeFor[T]() + + ret := &Subscriber[T]{ + read: make(chan T), + unregister: func() { r.deleteSubscriber(t) }, + } + r.addSubscriber(t, ret) + + return ret +} + +func newMonitor[T any](attach func(fn func(T)) (cancel func())) *Subscriber[T] { + ret := &Subscriber[T]{ + read: make(chan T, 100), // arbitrary, large + } + ret.unregister = attach(ret.monitor) + return ret +} + +func (s *Subscriber[T]) subscribeType() reflect.Type { + return reflect.TypeFor[T]() +} + +func (s *Subscriber[T]) monitor(debugEvent T) { + select { + case s.read <- debugEvent: + case <-s.stop.Done(): + } +} + +func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[DeliveredEvent], acceptCh func() chan DeliveredEvent, snapshot chan chan []DeliveredEvent) bool { + t := vals.Peek().Event.(T) + for { + // Keep the cases in this select in sync with subscribeState.pump + // above. The only different should be that this select + // delivers a value on s.read. + select { + case s.read <- t: + vals.Drop() + return true + case val := <-acceptCh(): + vals.Add(val) + case <-ctx.Done(): + return false + case ch := <-snapshot: + ch <- vals.Snapshot() + } + } +} + +// Events returns a channel on which the subscriber's events are +// delivered. +func (s *Subscriber[T]) Events() <-chan T { + return s.read +} + +// Done returns a channel that is closed when the subscriber is +// closed. +func (s *Subscriber[T]) Done() <-chan struct{} { + return s.stop.Done() +} + +// Close closes the Subscriber, indicating the caller no longer wishes +// to receive this event type. After Close, receives on +// [Subscriber.Events] block for ever. +func (s *Subscriber[T]) Close() { + s.stop.Stop() // unblock receivers + s.unregister() +} diff --git a/util/syspolicy/internal/internal.go b/util/syspolicy/internal/internal.go index 8f2889625..2e1737e5b 100644 --- a/util/syspolicy/internal/internal.go +++ b/util/syspolicy/internal/internal.go @@ -56,10 +56,10 @@ func EqualJSONForTest(tb TB, j1, j2 jsontext.Value) (s1, s2 string, equal bool) return "", "", true } // Otherwise, format the values for display and return false. - if err := j1.Indent("", "\t"); err != nil { + if err := j1.Indent(); err != nil { tb.Fatal(err) } - if err := j2.Indent("", "\t"); err != nil { + if err := j2.Indent(); err != nil { tb.Fatal(err) } return j1.String(), j2.String(), false diff --git a/util/syspolicy/setting/origin.go b/util/syspolicy/setting/origin.go index 078ef758e..4c7cc7025 100644 --- a/util/syspolicy/setting/origin.go +++ b/util/syspolicy/setting/origin.go @@ -50,22 +50,27 @@ func (s Origin) String() string { return s.Scope().String() } -// MarshalJSONV2 implements [jsonv2.MarshalerV2]. -func (s Origin) MarshalJSONV2(out *jsontext.Encoder, opts jsonv2.Options) error { - return jsonv2.MarshalEncode(out, &s.data, opts) +var ( + _ jsonv2.MarshalerTo = (*Origin)(nil) + _ jsonv2.UnmarshalerFrom = (*Origin)(nil) +) + +// MarshalJSONTo implements [jsonv2.MarshalerTo]. +func (s Origin) MarshalJSONTo(out *jsontext.Encoder) error { + return jsonv2.MarshalEncode(out, &s.data) } -// UnmarshalJSONV2 implements [jsonv2.UnmarshalerV2]. -func (s *Origin) UnmarshalJSONV2(in *jsontext.Decoder, opts jsonv2.Options) error { - return jsonv2.UnmarshalDecode(in, &s.data, opts) +// UnmarshalJSONFrom implements [jsonv2.UnmarshalerFrom]. +func (s *Origin) UnmarshalJSONFrom(in *jsontext.Decoder) error { + return jsonv2.UnmarshalDecode(in, &s.data) } // MarshalJSON implements [json.Marshaler]. func (s Origin) MarshalJSON() ([]byte, error) { - return jsonv2.Marshal(s) // uses MarshalJSONV2 + return jsonv2.Marshal(s) // uses MarshalJSONTo } // UnmarshalJSON implements [json.Unmarshaler]. func (s *Origin) UnmarshalJSON(b []byte) error { - return jsonv2.Unmarshal(b, s) // uses UnmarshalJSONV2 + return jsonv2.Unmarshal(b, s) // uses UnmarshalJSONFrom } diff --git a/util/syspolicy/setting/raw_item.go b/util/syspolicy/setting/raw_item.go index cf46e54b7..9a96073b0 100644 --- a/util/syspolicy/setting/raw_item.go +++ b/util/syspolicy/setting/raw_item.go @@ -75,31 +75,36 @@ func (i RawItem) String() string { return fmt.Sprintf("%v%s", i.data.Value.Value, suffix) } -// MarshalJSONV2 implements [jsonv2.MarshalerV2]. -func (i RawItem) MarshalJSONV2(out *jsontext.Encoder, opts jsonv2.Options) error { - return jsonv2.MarshalEncode(out, &i.data, opts) +var ( + _ jsonv2.MarshalerTo = (*RawItem)(nil) + _ jsonv2.UnmarshalerFrom = (*RawItem)(nil) +) + +// MarshalJSONTo implements [jsonv2.MarshalerTo]. +func (i RawItem) MarshalJSONTo(out *jsontext.Encoder) error { + return jsonv2.MarshalEncode(out, &i.data) } -// UnmarshalJSONV2 implements [jsonv2.UnmarshalerV2]. -func (i *RawItem) UnmarshalJSONV2(in *jsontext.Decoder, opts jsonv2.Options) error { - return jsonv2.UnmarshalDecode(in, &i.data, opts) +// UnmarshalJSONFrom implements [jsonv2.UnmarshalerFrom]. +func (i *RawItem) UnmarshalJSONFrom(in *jsontext.Decoder) error { + return jsonv2.UnmarshalDecode(in, &i.data) } // MarshalJSON implements [json.Marshaler]. func (i RawItem) MarshalJSON() ([]byte, error) { - return jsonv2.Marshal(i) // uses MarshalJSONV2 + return jsonv2.Marshal(i) // uses MarshalJSONTo } // UnmarshalJSON implements [json.Unmarshaler]. func (i *RawItem) UnmarshalJSON(b []byte) error { - return jsonv2.Unmarshal(b, i) // uses UnmarshalJSONV2 + return jsonv2.Unmarshal(b, i) // uses UnmarshalJSONFrom } // RawValue represents a raw policy setting value read from a policy store. // It is JSON-marshallable and facilitates unmarshalling of JSON values // into corresponding policy setting types, with special handling for JSON numbers // (unmarshalled as float64) and JSON string arrays (unmarshalled as []string). -// See also [RawValue.UnmarshalJSONV2]. +// See also [RawValue.UnmarshalJSONFrom]. type RawValue struct { opt.Value[any] } @@ -114,16 +119,21 @@ func RawValueOf[T RawValueType](v T) RawValue { return RawValue{opt.ValueOf[any](v)} } -// MarshalJSONV2 implements [jsonv2.MarshalerV2]. -func (v RawValue) MarshalJSONV2(out *jsontext.Encoder, opts jsonv2.Options) error { - return jsonv2.MarshalEncode(out, v.Value, opts) +var ( + _ jsonv2.MarshalerTo = (*RawValue)(nil) + _ jsonv2.UnmarshalerFrom = (*RawValue)(nil) +) + +// MarshalJSONTo implements [jsonv2.MarshalerTo]. +func (v RawValue) MarshalJSONTo(out *jsontext.Encoder) error { + return jsonv2.MarshalEncode(out, v.Value) } -// UnmarshalJSONV2 implements [jsonv2.UnmarshalerV2] by attempting to unmarshal +// UnmarshalJSONFrom implements [jsonv2.UnmarshalerFrom] by attempting to unmarshal // a JSON value as one of the supported policy setting value types (bool, string, uint64, or []string), // based on the JSON value type. It fails if the JSON value is an object, if it's a JSON number that // cannot be represented as a uint64, or if a JSON array contains anything other than strings. -func (v *RawValue) UnmarshalJSONV2(in *jsontext.Decoder, opts jsonv2.Options) error { +func (v *RawValue) UnmarshalJSONFrom(in *jsontext.Decoder) error { var valPtr any switch k := in.PeekKind(); k { case 't', 'f': @@ -139,7 +149,7 @@ func (v *RawValue) UnmarshalJSONV2(in *jsontext.Decoder, opts jsonv2.Options) er default: panic("unreachable") } - if err := jsonv2.UnmarshalDecode(in, valPtr, opts); err != nil { + if err := jsonv2.UnmarshalDecode(in, valPtr); err != nil { v.Value.Clear() return err } @@ -150,12 +160,12 @@ func (v *RawValue) UnmarshalJSONV2(in *jsontext.Decoder, opts jsonv2.Options) er // MarshalJSON implements [json.Marshaler]. func (v RawValue) MarshalJSON() ([]byte, error) { - return jsonv2.Marshal(v) // uses MarshalJSONV2 + return jsonv2.Marshal(v) // uses MarshalJSONTo } // UnmarshalJSON implements [json.Unmarshaler]. func (v *RawValue) UnmarshalJSON(b []byte) error { - return jsonv2.Unmarshal(b, v) // uses UnmarshalJSONV2 + return jsonv2.Unmarshal(b, v) // uses UnmarshalJSONFrom } // RawValues is a map of keyed setting values that can be read from a JSON. diff --git a/util/syspolicy/setting/snapshot.go b/util/syspolicy/setting/snapshot.go index 0af2bae0f..087325a04 100644 --- a/util/syspolicy/setting/snapshot.go +++ b/util/syspolicy/setting/snapshot.go @@ -147,23 +147,28 @@ type snapshotJSON struct { Settings map[Key]RawItem `json:",omitempty"` } -// MarshalJSONV2 implements [jsonv2.MarshalerV2]. -func (s *Snapshot) MarshalJSONV2(out *jsontext.Encoder, opts jsonv2.Options) error { +var ( + _ jsonv2.MarshalerTo = (*Snapshot)(nil) + _ jsonv2.UnmarshalerFrom = (*Snapshot)(nil) +) + +// MarshalJSONTo implements [jsonv2.MarshalerTo]. +func (s *Snapshot) MarshalJSONTo(out *jsontext.Encoder) error { data := &snapshotJSON{} if s != nil { data.Summary = s.summary data.Settings = s.m } - return jsonv2.MarshalEncode(out, data, opts) + return jsonv2.MarshalEncode(out, data) } -// UnmarshalJSONV2 implements [jsonv2.UnmarshalerV2]. -func (s *Snapshot) UnmarshalJSONV2(in *jsontext.Decoder, opts jsonv2.Options) error { +// UnmarshalJSONFrom implements [jsonv2.UnmarshalerFrom]. +func (s *Snapshot) UnmarshalJSONFrom(in *jsontext.Decoder) error { if s == nil { return errors.New("s must not be nil") } data := &snapshotJSON{} - if err := jsonv2.UnmarshalDecode(in, data, opts); err != nil { + if err := jsonv2.UnmarshalDecode(in, data); err != nil { return err } *s = Snapshot{m: data.Settings, sig: deephash.Hash(&data.Settings), summary: data.Summary} @@ -172,12 +177,12 @@ func (s *Snapshot) UnmarshalJSONV2(in *jsontext.Decoder, opts jsonv2.Options) er // MarshalJSON implements [json.Marshaler]. func (s *Snapshot) MarshalJSON() ([]byte, error) { - return jsonv2.Marshal(s) // uses MarshalJSONV2 + return jsonv2.Marshal(s) // uses MarshalJSONTo } // UnmarshalJSON implements [json.Unmarshaler]. func (s *Snapshot) UnmarshalJSON(b []byte) error { - return jsonv2.Unmarshal(b, s) // uses UnmarshalJSONV2 + return jsonv2.Unmarshal(b, s) // uses UnmarshalJSONFrom } // MergeSnapshots returns a [Snapshot] that contains all [RawItem]s diff --git a/util/syspolicy/setting/summary.go b/util/syspolicy/setting/summary.go index 5ff20e0aa..9864822f7 100644 --- a/util/syspolicy/setting/summary.go +++ b/util/syspolicy/setting/summary.go @@ -54,24 +54,29 @@ func (s Summary) String() string { return s.data.Scope.String() } -// MarshalJSONV2 implements [jsonv2.MarshalerV2]. -func (s Summary) MarshalJSONV2(out *jsontext.Encoder, opts jsonv2.Options) error { - return jsonv2.MarshalEncode(out, &s.data, opts) +var ( + _ jsonv2.MarshalerTo = (*Summary)(nil) + _ jsonv2.UnmarshalerFrom = (*Summary)(nil) +) + +// MarshalJSONTo implements [jsonv2.MarshalerTo]. +func (s Summary) MarshalJSONTo(out *jsontext.Encoder) error { + return jsonv2.MarshalEncode(out, &s.data) } -// UnmarshalJSONV2 implements [jsonv2.UnmarshalerV2]. -func (s *Summary) UnmarshalJSONV2(in *jsontext.Decoder, opts jsonv2.Options) error { - return jsonv2.UnmarshalDecode(in, &s.data, opts) +// UnmarshalJSONFrom implements [jsonv2.UnmarshalerFrom]. +func (s *Summary) UnmarshalJSONFrom(in *jsontext.Decoder) error { + return jsonv2.UnmarshalDecode(in, &s.data) } // MarshalJSON implements [json.Marshaler]. func (s Summary) MarshalJSON() ([]byte, error) { - return jsonv2.Marshal(s) // uses MarshalJSONV2 + return jsonv2.Marshal(s) // uses MarshalJSONTo } // UnmarshalJSON implements [json.Unmarshaler]. func (s *Summary) UnmarshalJSON(b []byte) error { - return jsonv2.Unmarshal(b, s) // uses UnmarshalJSONV2 + return jsonv2.Unmarshal(b, s) // uses UnmarshalJSONFrom } // SummaryOption is an option that configures [Summary] |
