summaryrefslogtreecommitdiffhomepage
path: root/ipn/auditlog/auditlog.go
diff options
context:
space:
mode:
Diffstat (limited to 'ipn/auditlog/auditlog.go')
-rw-r--r--ipn/auditlog/auditlog.go466
1 files changed, 466 insertions, 0 deletions
diff --git a/ipn/auditlog/auditlog.go b/ipn/auditlog/auditlog.go
new file mode 100644
index 000000000..30f39211f
--- /dev/null
+++ b/ipn/auditlog/auditlog.go
@@ -0,0 +1,466 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+// Package auditlog provides a mechanism for logging audit events.
+package auditlog
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "sort"
+ "sync"
+ "time"
+
+ "tailscale.com/ipn"
+ "tailscale.com/tailcfg"
+ "tailscale.com/types/logger"
+ "tailscale.com/util/rands"
+ "tailscale.com/util/set"
+)
+
+// transaction represents an audit log that has not yet been sent to the control plane. It is the unit handed to a [LogStore] for persistence.
+type transaction struct {
+ // EventID is the unique identifier for the event being logged. It is the key used to deduplicate entries and to remove completed ones from the store.
+ // This is used on the client side only and is not sent to control.
+ EventID string `json:",omitempty"`
+ // Retries counts the send attempts for this log that failed with a retryable error; it is compared against the logger's retry limit.
+ // This is used on the client side only and is not sent to control.
+ Retries int `json:",omitempty"`
+
+ // Action is the action to be logged. It must correspond to a known action in the control plane.
+ Action tailcfg.ClientAuditAction `json:",omitempty"`
+ // Details is an opaque string specific to the action being logged. Empty strings may not
+ // be valid depending on the action being logged.
+ Details string `json:",omitempty"`
+ // TimeStamp is the time at which the audit log was generated on the node. Pending logs are sorted by it, oldest first.
+ TimeStamp time.Time `json:",omitzero"`
+}
+
+// Transport provides a means for a client to send audit logs to a consumer (typically the control plane).
+type Transport interface {
+ // SendAuditLog sends a single audit log to a consumer of audit logs. A nil error means the log was accepted.
+ // Errors should be checked with [IsRetryableError] for retryability.
+ SendAuditLog(context.Context, tailcfg.AuditLogRequest) error
+}
+
+// LogStore provides a means for a [Logger] to persist logs to disk or memory. Its methods are unexported, so implementations must live in this package.
+type LogStore interface {
+ // save writes the given transactions to a persistent store. save will overwrite existing data
+ // for the given key.
+ save(key ipn.ProfileID, txns []*transaction) error
+
+ // load retrieves the transactions from a persistent store. Returns a nil slice and
+ // no error if no data exists for the given key.
+ load(key ipn.ProfileID) ([]*transaction, error)
+}
+
+// Opts contains the configuration options for a [Logger]. It is consumed by [NewLogger].
+type Opts struct {
+ // RetryLimit is the maximum number of attempts the logger will make to send a log before giving up. A value of 1 or less means a failed log is never retried.
+ RetryLimit int
+ // Store is the persistent store used to save logs to disk. Must be non-nil.
+ Store LogStore
+ // Logf is the logger used to log messages from the audit logger. Must be non-nil. Messages are prefixed with "auditlog: ".
+ Logf logger.Logf
+}
+
+// IsRetryableError reports whether err, or any error in its chain, declares
+// itself retryable. An error opts in by implementing a Retryable() bool method
+// that returns true; see [controlclient.apiResponseError] for an example.
+// A nil error, or one without such a method, is not retryable.
+func IsRetryableError(err error) bool {
+	var candidate interface{ Retryable() bool }
+	if !errors.As(err, &candidate) {
+		return false
+	}
+	return candidate.Retryable()
+}
+
+// backoffOpts holds the parameters of the retry backoff used by the flush
+// worker after a failed flush.
+type backoffOpts struct {
+	min        time.Duration // smallest delay between retries
+	max        time.Duration // largest delay between retries
+	multiplier float64       // factor applied to the previous delay
+}
+
+// defaultBackoffOpts is the backoff used by every [Logger]. With these values
+// successive retry delays are .5, 1, 2, 4, 8, 10, 10, 10, ... seconds.
+var defaultBackoffOpts = backoffOpts{
+	min:        500 * time.Millisecond,
+	max:        time.Second * 10,
+	multiplier: 2,
+}
+
+// Logger provides a queue-based mechanism for submitting audit logs to the control plane - or
+// another suitable consumer. Logs are persisted to a [LogStore] and retried until they are successfully sent,
+// or until they permanently fail.
+//
+// Each individual profile/controlclient tuple should construct and manage a unique [Logger] instance.
+type Logger struct {
+ logf logger.Logf // prefixed with "auditlog: " by [NewLogger].
+ retryLimit int // the maximum number of attempts to send a log before giving up.
+ flusher chan struct{} // buffered (capacity 1) channel used to signal a flush operation.
+ done chan struct{} // closed when the flush worker exits.
+ ctx context.Context // canceled when the logger is stopped; passed to flushes run by the worker.
+ ctxCancel context.CancelFunc // cancels ctx.
+ backoffOpts // backoff settings for retry operations.
+
+ // mu protects the fields below.
+ mu sync.Mutex
+ store LogStore // persistent storage for unsent logs.
+ profileID ipn.ProfileID // empty if [Logger.SetProfileID] has not been called.
+ transport Transport // nil until [Logger.Start] is called.
+}
+
+// NewLogger builds a [Logger] from opts. The returned logger is idle: it has
+// no profile ID and no transport, so [Logger.SetProfileID] and [Logger.Start]
+// must be called before logs can be enqueued and sent. opts.Store and
+// opts.Logf must be non-nil.
+func NewLogger(opts Opts) *Logger {
+	prefixed := logger.WithPrefix(opts.Logf, "auditlog: ")
+	ctx, cancel := context.WithCancel(context.Background())
+
+	created := &Logger{
+		logf:        prefixed,
+		retryLimit:  opts.RetryLimit,
+		store:       opts.Store,
+		backoffOpts: defaultBackoffOpts,
+		ctx:         ctx,
+		ctxCancel:   cancel,
+		// A single buffered slot lets one flush request be remembered while
+		// the worker is busy.
+		flusher: make(chan struct{}, 1),
+		done:    make(chan struct{}),
+	}
+	prefixed("created")
+	return created
+}
+
+// FlushAndStop stops the background flush worker (if one was started) and then makes one final, synchronous attempt to send all pending logs.
+// This will block until that final flush operation completes or ctx is done.
+// NOTE(review): on an already stopped logger the final flush is still attempted, so the call is not strictly immediate. All unsent
+// logs remain persisted in the store.
+func (al *Logger) FlushAndStop(ctx context.Context) {
+ al.stop() // cancels the worker's context and waits for it to exit; returns early if Start was never called.
+ al.flush(ctx) // the flush error is not reported to the caller.
+}
+
+// SetProfileID assigns the profile whose audit logs this logger manages. Logs
+// cannot be persisted until a profile ID has been set. The ID can be set only
+// once: a second call returns an error and leaves the existing ID untouched.
+func (al *Logger) SetProfileID(profileID ipn.ProfileID) error {
+	al.mu.Lock()
+	defer al.mu.Unlock()
+
+	if al.profileID == "" {
+		al.profileID = profileID
+		return nil
+	}
+	return errors.New("profileID already set")
+}
+
+// Start starts the audit logger with the given transport and launches the
+// background flush worker. If logs are already persisted for the current
+// profile, an asynchronous flush is requested immediately.
+//
+// It returns an error if t is nil or if the logger is already started.
+func (al *Logger) Start(t Transport) error {
+	// A nil transport must be rejected: the rest of the logger treats
+	// "transport == nil" as "no worker running", so accepting it would start a
+	// worker that stop never waits for, and would allow Start to be called
+	// again, spawning a second worker that closes al.done twice.
+	if t == nil {
+		return errors.New("nil transport")
+	}
+
+	al.mu.Lock()
+	defer al.mu.Unlock()
+
+	if al.transport != nil {
+		return errors.New("already started")
+	}
+
+	al.transport = t
+	pending, err := al.storedCountLocked()
+	if err != nil {
+		// Not fatal: the worker is still started so that later enqueues are sent.
+		al.logf("[unexpected] failed to restore logs: %v", err)
+	}
+	go al.flushWorker()
+	if pending > 0 {
+		al.flushAsync()
+	}
+	return nil
+}
+
+// ErrAuditLogStorageFailure is returned, wrapped together with the underlying cause, when the logger fails to persist logs to the store; test for it with errors.Is.
+var ErrAuditLogStorageFailure = errors.New("audit log storage failure")
+
+// Enqueue records an audit log for the given action and queues it for delivery
+// to the control plane (or another suitable consumer/transport). The log is
+// timestamped with the current time and persisted before Enqueue returns.
+//
+// It returns an error wrapping [ErrAuditLogStorageFailure] if the underlying
+// store fails to save the log.
+func (al *Logger) Enqueue(action tailcfg.ClientAuditAction, details string) error {
+	now := time.Now()
+	return al.enqueue(&transaction{
+		// The event ID is the timestamp followed by a random hex string, which
+		// makes it suitably unique for deduplication.
+		EventID:   fmt.Sprint(now, rands.HexString(16)),
+		Action:    action,
+		Details:   details,
+		TimeStamp: now,
+	})
+}
+
+// flushAsync requests an asynchronous flush by signaling the flush worker; it never blocks.
+// It is a no-op if a flush is already pending.
+func (al *Logger) flushAsync() {
+ select {
+ case al.flusher <- struct{}{}: // flusher has capacity 1, so a single pending request is remembered.
+ default: // a request is already queued; drop this one rather than block the caller.
+ }
+}
+
+// flushWorker is the background goroutine started by [Logger.Start]. It waits
+// for flush requests on al.flusher and runs them with the logger's context.
+// After a failed flush it schedules a retry with exponential backoff; after a
+// successful one the backoff is reset. It exits, closing al.done, when the
+// logger's context is canceled.
+func (al *Logger) flushWorker() {
+	defer close(al.done)
+
+	var retryDelay time.Duration
+	// The timer is created stopped; it is only armed after a failed flush.
+	retry := time.NewTimer(0)
+	retry.Stop()
+	defer retry.Stop()
+
+	for {
+		select {
+		case <-al.ctx.Done():
+			return
+		case <-al.flusher:
+			err := al.flush(al.ctx)
+			switch {
+			case errors.Is(err, context.Canceled):
+				// The logger was stopped, no need to retry.
+				return
+			case err != nil:
+				// Scale in floating point: converting the multiplier itself to a
+				// Duration would truncate it (1.5 becomes 1, anything below 1
+				// becomes 0) and silently break the backoff progression.
+				next := time.Duration(float64(retryDelay) * al.backoffOpts.multiplier)
+				retryDelay = max(al.backoffOpts.min, min(next, al.backoffOpts.max))
+				al.logf("retrying after %v, %v", retryDelay, err)
+				retry.Reset(retryDelay)
+			default:
+				retryDelay = 0
+				retry.Stop()
+			}
+		case <-retry.C:
+			// Route the retry through the normal request path so that it
+			// coalesces with any flush that is already pending.
+			al.flushAsync()
+		}
+	}
+}
+
+// flush attempts to send all pending logs to the control plane. Logs that were
+// sent, or that failed permanently, are removed from the store; logs that
+// could not be sent are written back with their updated retry counts.
+//
+// It returns nil if nothing is pending or everything was completed, and an
+// error if the store could not be read, no transport is set, or some logs
+// remain unsent. If logs remain unsent because ctx is done, the returned error
+// wraps ctx.Err() so callers can detect cancellation with errors.Is.
+//
+// al.mu must not be held.
+func (al *Logger) flush(ctx context.Context) error {
+	al.mu.Lock()
+	pending, err := al.store.load(al.profileID)
+	t := al.transport
+	al.mu.Unlock()
+
+	if err != nil {
+		// This will catch empty profileIDs.
+		return fmt.Errorf("failed to restore pending logs: %w", err)
+	}
+	if len(pending) == 0 {
+		return nil
+	}
+	if t == nil {
+		return errors.New("no transport")
+	}
+
+	// Sending happens without the lock held: it may take several round trips.
+	complete, unsent := al.sendToTransport(ctx, pending, t)
+	al.markTransactionsDone(complete)
+
+	al.mu.Lock()
+	defer al.mu.Unlock()
+	if err = al.appendToStoreLocked(unsent); err != nil {
+		al.logf("[unexpected] failed to persist logs: %v", err)
+	}
+
+	if len(unsent) != 0 {
+		if ctxErr := ctx.Err(); ctxErr != nil {
+			// Surface the cancellation so the flush worker can tell a shutdown
+			// apart from a transient failure instead of scheduling a retry.
+			return fmt.Errorf("failed to send %d logs: %w", len(unsent), ctxErr)
+		}
+		return fmt.Errorf("failed to send %d logs", len(unsent))
+	}
+
+	if len(complete) != 0 {
+		al.logf("complete %d audit log transactions", len(complete))
+	}
+	return nil
+}
+
+// sendToTransport submits each pending log to t, in order, and sorts the
+// outcome into two slices: complete holds the logs that were delivered or that
+// failed permanently, unsent holds the logs that should be tried again.
+//
+// This may require multiple round trips to the control plane and can be a long
+// running transaction.
+func (al *Logger) sendToTransport(ctx context.Context, pending []*transaction, t Transport) (complete []*transaction, unsent []*transaction) {
+	for idx, entry := range pending {
+		err := t.SendAuditLog(ctx, tailcfg.AuditLogRequest{
+			Action:    entry.Action,
+			Details:   entry.Details,
+			Timestamp: entry.TimeStamp,
+		})
+		if err == nil {
+			// Delivered.
+			complete = append(complete, entry)
+			continue
+		}
+
+		if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
+			// The context is done, so every further attempt would fail too.
+			// This log and all that follow it stay queued.
+			unsent = append(unsent, pending[idx:]...)
+			return complete, unsent
+		}
+
+		if IsRetryableError(err) && entry.Retries+1 < al.retryLimit {
+			// Retryable errors should be transient, but the number of attempts
+			// is capped so that logs are not persisted indefinitely.
+			entry.Retries++
+			unsent = append(unsent, entry)
+			continue
+		}
+
+		// Not retryable, or out of attempts: give up on this log.
+		complete = append(complete, entry)
+		al.logf("failed permanently: %v", err)
+	}
+
+	return complete, unsent
+}
+
+// stop cancels the logger's context and waits for the flush worker to exit.
+// It is a no-op if the logger was never started.
+//
+// al.mu must not be held.
+func (al *Logger) stop() {
+	al.mu.Lock()
+	t := al.transport
+	al.mu.Unlock()
+
+	if t == nil {
+		// No transport means no worker goroutine and done will not be
+		// closed if we cancel the context.
+		return
+	}
+
+	al.ctxCancel()
+	<-al.done
+
+	// profileID is guarded by al.mu, so read it under the lock rather than
+	// racing with SetProfileID.
+	al.mu.Lock()
+	profileID := al.profileID
+	al.mu.Unlock()
+	al.logf("stopped for profileID: %v", profileID)
+}
+
+// appendToStoreLocked merges txns into the logs already persisted for the
+// current profile and saves the result. Logs are deduplicated by EventID, with
+// the entries in txns taking precedence over persisted ones, so it is safe to
+// call this with the same logs multiple times, to requeue failed transactions
+// for example. It is a no-op when txns is empty.
+//
+// It returns an error if no profile ID has been set or if the store fails to
+// save.
+//
+// al.mu must be held.
+func (al *Logger) appendToStoreLocked(txns []*transaction) error {
+	if len(txns) == 0 {
+		return nil
+	}
+
+	if al.profileID == "" {
+		return errors.New("no profileID set")
+	}
+
+	persisted, err := al.store.load(al.profileID)
+	if err != nil {
+		al.logf("[unexpected] append failed to restore logs: %v", err)
+	}
+
+	// The order is important here. We want the latest transactions first, which will
+	// ensure when we dedup, the new transactions are seen and the older transactions
+	// are discarded.
+	//
+	// Build the merged slice in fresh storage: appending directly to txns could
+	// write into spare capacity of the caller's backing array.
+	txnsOut := make([]*transaction, 0, len(txns)+len(persisted))
+	txnsOut = append(txnsOut, txns...)
+	txnsOut = append(txnsOut, persisted...)
+	txnsOut = deduplicateAndSort(txnsOut)
+
+	return al.store.save(al.profileID, txnsOut)
+}
+
+// storedCountLocked reports how many logs are currently persisted in the
+// store for the logger's profile, along with any error from loading them.
+//
+// al.mu must be held.
+func (al *Logger) storedCountLocked() (int, error) {
+	stored, loadErr := al.store.load(al.profileID)
+	count := len(stored)
+	return count, loadErr
+}
+
+// markTransactionsDone removes logs from the store that are complete (sent or
+// failed permanently). Logs are matched by EventID; persisted logs that are not
+// in sent are kept. Store failures are logged, not returned.
+//
+// al.mu must not be held.
+func (al *Logger) markTransactionsDone(sent []*transaction) {
+	al.mu.Lock()
+	defer al.mu.Unlock()
+
+	ids := set.Set[string]{}
+	for _, txn := range sent {
+		ids.Add(txn.EventID)
+	}
+
+	persisted, err := al.store.load(al.profileID)
+	if err != nil {
+		al.logf("[unexpected] markTransactionsDone failed to restore logs: %v", err)
+	}
+	var unsent []*transaction
+	for _, txn := range persisted {
+		if !ids.Contains(txn.EventID) {
+			unsent = append(unsent, txn)
+		}
+	}
+	// A failed save leaves completed logs in the store, where they would be
+	// sent again on the next flush, so make the failure visible.
+	if err := al.store.save(al.profileID, unsent); err != nil {
+		al.logf("[unexpected] markTransactionsDone failed to persist logs: %v", err)
+	}
+}
+
+// deduplicateAndSort returns the logs in txns with duplicates removed, ordered
+// by timestamp from oldest to newest. Logs are duplicates when they share an
+// EventID; the first occurrence in txns is the one kept. The input slice is
+// not modified.
+func deduplicateAndSort(txns []*transaction) []*transaction {
+	ids := set.Set[string]{}
+	kept := make([]*transaction, 0, len(txns))
+	for _, entry := range txns {
+		if ids.Contains(entry.EventID) {
+			continue
+		}
+		ids.Add(entry.EventID)
+		kept = append(kept, entry)
+	}
+
+	// Oldest first, so the oldest logs sit at the front of the queue.
+	sort.Slice(kept, func(a, b int) bool {
+		return kept[a].TimeStamp.Before(kept[b].TimeStamp)
+	})
+	return kept
+}
+
+// enqueue persists txn to the store and, if the logger has been started,
+// requests an asynchronous flush. A storage failure is returned wrapped in
+// [ErrAuditLogStorageFailure].
+func (al *Logger) enqueue(txn *transaction) error {
+	al.mu.Lock()
+	defer al.mu.Unlock()
+
+	err := al.appendToStoreLocked([]*transaction{txn})
+	if err != nil {
+		return fmt.Errorf("%w: %w", ErrAuditLogStorageFailure, err)
+	}
+
+	// al.transport is nil until the logger is started; without a worker there
+	// is nobody to signal, and the log simply waits in the store.
+	if al.transport == nil {
+		return nil
+	}
+	al.flushAsync()
+	return nil
+}
+
+var _ LogStore = (*logStateStore)(nil) // compile-time check that *logStateStore implements LogStore.
+
+// logStateStore is a concrete implementation of [LogStore]
+// using [ipn.StateStore] as the underlying storage. Transactions are stored as JSON under a per-profile key.
+type logStateStore struct {
+ store ipn.StateStore // backing store that the encoded transactions are read from and written to.
+}
+
+// NewLogStore returns a [LogStore] that keeps audit logs in the given
+// [ipn.StateStore].
+func NewLogStore(store ipn.StateStore) LogStore {
+	return &logStateStore{store: store}
+}
+
+func (s *logStateStore) generateKey(key ipn.ProfileID) string {
+ return "auditlog-" + string(key)
+}
+
+// save JSON-encodes the given logs and writes them to the [ipn.StateStore],
+// replacing any existing entries for the given key. It returns an error if
+// key is empty, if encoding fails, or if the write fails.
+func (s *logStateStore) save(key ipn.ProfileID, txns []*transaction) error {
+	if key == "" {
+		return errors.New("empty key")
+	}
+
+	encoded, err := json.Marshal(txns)
+	if err != nil {
+		return err
+	}
+	return s.store.WriteState(ipn.StateKey(s.generateKey(key)), encoded)
+}
+
+// load retrieves the logs stored for the given key from the [ipn.StateStore].
+// It returns a nil slice and no error if nothing has been stored for the key.
+// It returns an error if key is empty, if the read fails, or if the stored
+// data cannot be decoded; on any error the returned slice is nil.
+func (s *logStateStore) load(key ipn.ProfileID) ([]*transaction, error) {
+	if key == "" {
+		return nil, errors.New("empty key")
+	}
+
+	k := ipn.StateKey(s.generateKey(key))
+	data, err := s.store.ReadState(k)
+
+	switch {
+	case errors.Is(err, ipn.ErrStateNotExist):
+		return nil, nil
+	case err != nil:
+		return nil, err
+	}
+
+	var txns []*transaction
+	if err := json.Unmarshal(data, &txns); err != nil {
+		// Unmarshal can leave txns partially filled (including nil elements) on
+		// a type error. Callers carry on with whatever is returned, so never
+		// hand back a half-decoded slice.
+		return nil, err
+	}
+	return txns, nil
+}