summaryrefslogtreecommitdiffhomepage
path: root/util/goroutines/tracker.go
blob: b0513ef4efa3fa3672859170b3422ae7f9d80e01 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// Copyright (c) Tailscale Inc & contributors
// SPDX-License-Identifier: BSD-3-Clause

package goroutines

import (
	"sync/atomic"

	"tailscale.com/syncs"
	"tailscale.com/util/set"
)

// Tracker tracks a set of goroutines started through its Go method.
//
// It records how many goroutines have been started in total and how many
// have not yet finished, and lets callers register callbacks that are
// fired each time a tracked goroutine finishes.
//
// A Tracker holds atomic counters and a mutex, so it must be used through
// a pointer and not copied.
type Tracker struct {
	started atomic.Int64 // counter: incremented once per call to Go, never decremented
	running atomic.Int64 // gauge: incremented by Go, decremented when the goroutine's function finishes

	// mu guards onDone, the set of callbacks launched (each in its own
	// goroutine) whenever a tracked goroutine finishes. onDone stays nil
	// until the first call to AddDoneCallback.
	mu     syncs.Mutex
	onDone set.HandleSet[func()]
}

// Go calls f in a new goroutine that is tracked by t.
//
// The started counter and the running gauge are both incremented before
// the goroutine is spawned, so StartedGoroutines and RunningGoroutines
// already account for it by the time Go returns. Once f returns, the
// running gauge is decremented and any registered done callbacks are
// fired.
func (t *Tracker) Go(f func()) {
	t.started.Add(1)
	t.running.Add(1)
	go t.goAndDecr(f)
}

// goAndDecr runs f and then calls t.decr. It is the function executed by
// every goroutine that Go starts.
func (t *Tracker) goAndDecr(f func()) {
	// Deferred so the bookkeeping in decr also runs if f panics or
	// terminates the goroutine via runtime.Goexit.
	defer t.decr()
	f()
}

// decr records that one tracked goroutine has finished.
//
// It decrements the running gauge and then, while holding t.mu, starts
// every registered done callback in a goroutine of its own. Those
// goroutines are launched with a plain go statement rather than through
// t.Go, so they do not touch the counters and do not trigger further
// callbacks.
func (t *Tracker) decr() {
	t.running.Add(-1)

	t.mu.Lock()
	defer t.mu.Unlock()
	for _, callback := range t.onDone {
		go callback()
	}
}

// AddDoneCallback registers f to be invoked, in a freshly started
// goroutine, each time a goroutine managed by t finishes. The goroutines
// that run callbacks are not themselves managed by t, so they never
// cause further callbacks.
//
// The returned remove function unregisters f.
func (t *Tracker) AddDoneCallback(f func()) (remove func()) {
	t.mu.Lock()
	defer t.mu.Unlock()

	// The callback set is allocated lazily on first registration.
	if t.onDone == nil {
		t.onDone = make(set.HandleSet[func()])
	}
	handle := t.onDone.Add(f)

	remove = func() {
		t.mu.Lock()
		defer t.mu.Unlock()
		delete(t.onDone, handle)
	}
	return remove
}

// RunningGoroutines returns the current value of the running gauge: the
// number of goroutines started by Go whose function has not yet finished.
func (t *Tracker) RunningGoroutines() int64 {
	n := t.running.Load()
	return n
}

// StartedGoroutines returns the total number of goroutines that have been
// started by Go over the lifetime of t, including ones that have already
// finished.
func (t *Tracker) StartedGoroutines() int64 {
	n := t.started.Load()
	return n
}