1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
|
// Copyright (c) 2020 Tailscale Inc & AUTHORS All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package logtail sends logs to log.tailscale.io.
package logtail
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"time"
"tailscale.com/logtail/backoff"
tslogger "tailscale.com/types/logger"
)
// DefaultHost is the host name logs are uploaded to when
// Config.BaseURL is empty. Log prefixes it with "https://" to form the
// default base URL.
const DefaultHost = "log.tailscale.io"
// Logger is the interface returned by Log. It accepts log lines through
// Write and exposes the controls for stopping the background uploader.
type Logger interface {
	// Write logs an encoded JSON blob.
	//
	// If the []byte passed to Write is not an encoded JSON blob,
	// then its contents are wrapped in a JSON blob and written.
	//
	// This is intended as an interface for the stdlib "log" package.
	Write([]byte) (int, error)
	// Flush uploads all logs to the server.
	// It blocks until complete or there is an unrecoverable error.
	Flush() error
	// Shutdown gracefully shuts down the logger while completing any
	// remaining uploads.
	//
	// It will block, continuing to try and upload unless the passed
	// context object interrupts it by being done.
	// If the shutdown is interrupted, an error is returned.
	Shutdown(context.Context) error
	// Close shuts down this logger object, the background log uploader
	// process, and any associated goroutines.
	//
	// Deprecated: use Shutdown.
	Close()
}
// Encoder compresses upload bodies. A logger obtains one from
// Config.NewZstdEncoder and, when it has one, marks uploads with a
// "Content-Encoding: zstd" header.
type Encoder interface {
	// EncodeAll is invoked by the uploader as EncodeAll(body, nil); the
	// returned bytes become the HTTP request body.
	// NOTE(review): the parameter names suggest src is encoded and
	// appended to dst — confirm against the concrete implementation.
	EncodeAll(src, dst []byte) []byte
	// Close is called by Shutdown once the uploader has stopped; its
	// error becomes Shutdown's return value.
	Close() error
}
// Config holds the settings passed to Log. Fields left at their zero
// value are replaced by the defaults noted on each field.
type Config struct {
	Collection string // collection name, a domain name
	PrivateID PrivateID // machine-specific private identifier
	BaseURL string // if empty, defaults to "https://" + DefaultHost
	HTTPC *http.Client // if nil, defaults to http.DefaultClient
	SkipClientTime bool // if true, client_time is not written to logs
	LowMemory bool // if true, logtail minimizes memory use (smaller buffer, truncated text)
	TimeNow func() time.Time // if set, substitutes uses of time.Now
	Stderr io.Writer // if set, logs are sent here instead of os.Stderr
	Buffer Buffer // temp storage; if nil, a buffer from NewMemoryBuffer is used
	NewZstdEncoder func() Encoder // if set, used to compress logs for transmission
	// DrainLogs, if non-nil, disables automatic uploading of new logs,
	// so that logs are only uploaded when a token is sent to DrainLogs.
	DrainLogs <-chan struct{}
}
// Log builds a Logger from cfg and starts its background upload goroutine.
//
// Zero-valued fields of cfg are defaulted first: BaseURL becomes
// "https://" + DefaultHost, HTTPC becomes http.DefaultClient, TimeNow
// becomes time.Now, Stderr becomes os.Stderr, and Buffer becomes an
// in-memory buffer (sized 64 when cfg.LowMemory is set, 256 otherwise).
// logf is handed to the backoff helper used between upload attempts.
//
// Before returning, Log writes a "logtail started" line to the new logger.
func Log(cfg Config, logf tslogger.Logf) Logger {
	// Fill in defaults; each one is independent of the others.
	if cfg.Buffer == nil {
		if cfg.LowMemory {
			cfg.Buffer = NewMemoryBuffer(64)
		} else {
			cfg.Buffer = NewMemoryBuffer(256)
		}
	}
	if cfg.Stderr == nil {
		cfg.Stderr = os.Stderr
	}
	if cfg.TimeNow == nil {
		cfg.TimeNow = time.Now
	}
	if cfg.HTTPC == nil {
		cfg.HTTPC = http.DefaultClient
	}
	if cfg.BaseURL == "" {
		cfg.BaseURL = "https://" + DefaultHost
	}

	// Every upload is POSTed to <base>/c/<collection>/<private id>.
	uploadURL := cfg.BaseURL + "/c/" + cfg.Collection + "/" + cfg.PrivateID.String()

	lg := &logger{
		stderr:         cfg.Stderr,
		httpc:          cfg.HTTPC,
		url:            uploadURL,
		lowMem:         cfg.LowMemory,
		buffer:         cfg.Buffer,
		skipClientTime: cfg.SkipClientTime,
		sent:           make(chan struct{}, 1),
		sentinel:       make(chan int32, 16),
		drainLogs:      cfg.DrainLogs,
		timeNow:        cfg.TimeNow,
		bo:             backoff.NewBackoff("logtail", logf),
		shutdownStart:  make(chan struct{}),
		shutdownDone:   make(chan struct{}),
	}
	if newEncoder := cfg.NewZstdEncoder; newEncoder != nil {
		lg.zstdEncoder = newEncoder()
	}

	// The uploader runs until this context is cancelled or shutdown begins.
	uploadCtx, stopUploads := context.WithCancel(context.Background())
	lg.uploadCancel = stopUploads
	go lg.uploading(uploadCtx)

	lg.Write([]byte("logtail started"))
	return lg
}
// logger is the Logger implementation returned by Log. Write queues
// encoded lines in buffer and a single background goroutine (uploading)
// drains the buffer and POSTs batches to url.
type logger struct {
	stderr io.Writer // every written line is also copied here
	httpc *http.Client // client used for uploads
	url string // full upload URL, built once in Log
	lowMem bool // if true, long text entries are truncated
	skipClientTime bool // if true, no client_time is attached in Write
	buffer Buffer // pending log lines awaiting upload
	sent chan struct{} // signal to speed up drain
	drainLogs <-chan struct{} // if non-nil, external signal to attempt a drain
	sentinel chan int32 // NOTE(review): allocated in Log but not used in the code visible here
	timeNow func() time.Time // clock used for client_time stamps
	bo backoff.Backoff // paces retries after upload attempts
	zstdEncoder Encoder // if non-nil, compresses upload bodies
	uploadCancel func() // cancels the uploader's context
	shutdownStart chan struct{} // closed when shutdown begins
	shutdownDone chan struct{} // closed when shutdown complete
}
// Shutdown stops the background uploader and waits for it to exit.
//
// It closes shutdownStart so the uploader finishes its current batch and
// returns, writes a final "logger closing down" line, and then blocks
// until the uploader has exited. If ctx is done first, the uploader's
// context is cancelled so that any in-flight upload is abandoned; Shutdown
// still waits for the uploader goroutine to exit before returning.
//
// The returned error is the result of closing the compression encoder, if
// one is configured, and nil otherwise.
// NOTE(review): an interruption via ctx is not currently reported as an
// error, although the Logger interface comment says it is — confirm the
// intended contract.
//
// Calling Shutdown again after shutdown has begun is a no-op that returns
// nil; previously a second call panicked by closing shutdownStart twice.
// The check is not synchronized, so concurrent first calls must still be
// avoided by the caller.
func (l *logger) Shutdown(ctx context.Context) error {
	// Closing an already-closed channel panics, so a repeated call (for
	// example Close followed by Shutdown) must return before the close.
	select {
	case <-l.shutdownStart:
		return nil
	default:
	}
	close(l.shutdownStart)

	done := make(chan struct{})
	go func() {
		defer close(done)
		select {
		case <-ctx.Done():
			// Caller gave up waiting: abort pending uploads, then wait
			// for the uploader goroutine to notice and exit.
			l.uploadCancel()
			<-l.shutdownDone
		case <-l.shutdownDone:
		}
	}()

	// Best effort: the uploader may already be on its way out, so this
	// line is not guaranteed to be uploaded.
	io.WriteString(l, "logger closing down\n")
	<-done

	if l.zstdEncoder != nil {
		return l.zstdEncoder.Close()
	}
	return nil
}
// Close calls Shutdown with a context that is never cancelled and
// discards the result.
//
// Deprecated: use Shutdown.
func (l *logger) Close() {
	_ = l.Shutdown(context.Background())
}
// drainBlock parks the uploader while the buffer is empty. It is called
// by drainPending and reports whether shutdown has begun.
//
// Normally each Write pokes the sent channel, which wakes the uploader
// right away so logs go out with very low latency. When the caller
// supplied a DrainLogs channel, that wake-on-Write path is disabled and
// the uploader only wakes when the caller sends on DrainLogs.
func (l *logger) drainBlock() (shuttingDown bool) {
	// Pick the single wake-up source that applies to this logger.
	var wake <-chan struct{} = l.sent
	if l.drainLogs != nil {
		wake = l.drainLogs
	}

	select {
	case <-l.shutdownStart:
		return true
	case <-wake:
		return false
	}
}
// drainPending collects buffered log lines into one upload body.
//
// A single entry is returned as is; two or more entries are returned as a
// JSON array "[e1,e2,...]". Collection stops once the body reaches 256 KiB,
// the buffer reports io.EOF, the buffer runs dry after at least one entry
// was collected, or a read error occurs (the error text is included as a
// final entry). With nothing buffered it blocks in drainBlock until woken;
// it returns nil if shutdown begins while it is still empty-handed.
func (l *logger) drainPending() (res []byte) {
	const maxLen = 256 << 10

	var (
		batch    bytes.Buffer
		count    int
		finished bool
	)

collect:
	for !finished && batch.Len() < maxLen {
		line, err := l.buffer.TryReadLine()
		switch {
		case err == io.EOF:
			break collect
		case err != nil:
			// Report the failure as a log entry and end the batch.
			line = []byte(fmt.Sprintf("reading ringbuffer: %v", err))
			finished = true
		case line == nil:
			// Buffer is empty: ship what we have, or wait for more.
			if count > 0 {
				break collect
			}
			finished = l.drainBlock()
			continue collect
		}

		if len(line) == 0 {
			continue
		}
		if line[0] != '{' || !json.Valid(line) {
			// This is probably a log added to stderr by filch
			// outside of the logtail logger. Encode it.
			// Do not add a client time, as it could have
			// been written a long time ago.
			line = l.encodeText(line, true)
		}

		switch count {
		case 0:
			batch.Write(line)
		case 1:
			// Second entry: promote the lone object to an array.
			first := append([]byte(nil), batch.Bytes()...)
			batch.Reset()
			batch.WriteByte('[')
			batch.Write(first)
			batch.WriteByte(',')
			batch.Write(line)
		default:
			batch.WriteByte(',')
			batch.Write(line)
		}
		count++
	}

	if count > 1 {
		batch.WriteByte(']')
	}
	if batch.Len() == 0 {
		return nil
	}
	return batch.Bytes()
}
// uploading is the background goroutine started by Log. It repeatedly
// drains a batch from the buffer, compresses it when an encoder is
// configured, and retries the upload (with backoff) until it is accepted.
//
// It exits when ctx is cancelled while a batch is still waiting to be
// uploaded, or after a batch has been handled and shutdown has begun.
// shutdownDone is closed on exit.
func (l *logger) uploading(ctx context.Context) {
	defer close(l.shutdownDone)

	for {
		payload := l.drainPending()
		if enc := l.zstdEncoder; enc != nil {
			payload = enc.EncodeAll(payload, nil)
		}

		// Keep trying this payload until the server takes it.
		for accepted := false; !accepted && len(payload) > 0; {
			select {
			case <-ctx.Done():
				return
			default:
			}

			ok, err := l.upload(ctx, payload)
			if err != nil {
				fmt.Fprintf(l.stderr, "logtail: upload: %v\n", err)
			}
			// Called on success too, with a nil error.
			l.bo.BackOff(ctx, err)
			accepted = ok
		}

		select {
		case <-l.shutdownStart:
			return
		default:
		}
	}
}
// upload POSTs body to the log server in a single attempt, bounded by a
// 45 second timeout derived from ctx.
//
// uploaded reports whether the caller should consider the batch done:
// true on HTTP 200, and also on HTTP 400 (where an error describing the
// response is returned as well). Any other status, or a transport
// failure, yields uploaded == false so the caller retries.
//
// A transport error is wrapped with %w, so the underlying cause (for
// example context.DeadlineExceeded) stays reachable through errors.Is and
// errors.As.
//
// upload panics if the request cannot be constructed at all.
func (l *logger) upload(ctx context.Context, body []byte) (uploaded bool, err error) {
	const maxUploadTime = 45 * time.Second
	ctx, cancel := context.WithTimeout(ctx, maxUploadTime)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, "POST", l.url, bytes.NewReader(body))
	if err != nil {
		// I know of no conditions under which this could fail.
		// Report it very loudly.
		// TODO record logs to disk
		panic("logtail: cannot build http request: " + err.Error())
	}

	compressedNote := "not-compressed"
	if l.zstdEncoder != nil {
		req.Header.Add("Content-Encoding", "zstd")
		compressedNote = "compressed"
	}
	req.Header["User-Agent"] = nil // not worth writing one; save some bytes

	resp, err := l.httpc.Do(req)
	if err != nil {
		return false, fmt.Errorf("log upload of %d bytes %s failed: %w", len(body), compressedNote, err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		uploaded = resp.StatusCode == http.StatusBadRequest // the server saved the logs anyway
		// Include (at most 1 MiB of) the response body in the error;
		// a failure to read it is not worth reporting separately.
		b, _ := ioutil.ReadAll(io.LimitReader(resp.Body, 1<<20))
		return uploaded, fmt.Errorf("log upload of %d bytes %s failed %d: %q", len(body), compressedNote, resp.StatusCode, b)
	}

	// Try to read to EOF, in case server's response is
	// chunked. We want to reuse the TCP connection if it's
	// HTTP/1. On success, we expect 0 bytes.
	// TODO(bradfitz): can remove a few days after 2020-04-04 once
	// server is fixed.
	if resp.ContentLength == -1 {
		resp.Body.Read(make([]byte, 1))
	}
	return true, nil
}
// Flush implements Logger.Flush. This implementation performs no work
// and always returns nil.
func (l *logger) Flush() error {
	return nil
}
// send appends an already-encoded entry to the pending buffer and returns
// the buffer's Write result unchanged.
//
// Unless the caller drives draining through DrainLogs, it also pokes the
// uploader via the sent channel. The poke is non-blocking: if a wake-up is
// already queued, it is simply skipped.
func (l *logger) send(jsonBlob []byte) (int, error) {
	written, werr := l.buffer.Write(jsonBlob)
	if l.drainLogs != nil {
		// Draining is externally triggered; do not wake the uploader.
		return written, werr
	}

	select {
	case l.sent <- struct{}{}:
	default:
	}
	return written, werr
}
// encodeText wraps the raw text in buf in a JSON object of the form
//
//	{"logtail": {"client_time": "<RFC3339Nano>"}, "text": "<escaped buf>"}
//
// followed by a newline. The "logtail" member is omitted when
// skipClientTime is true.
//
// Bytes that JSON forbids inside a string literal are escaped: the double
// quote, the backslash, and every control character below 0x20 (as \b,
// \f, \n, \r, \t, or \u00XX for the rest). All other bytes are copied
// through verbatim.
//
// In low-memory mode the text is cut after 256 input bytes and "…" is
// appended. The cut is delayed by up to three bytes so that it does not
// land inside a multi-byte UTF-8 sequence, and no ellipsis is added when
// nothing was actually dropped.
//
// TODO: instead of allocating, this should probably just append
// directly into the output log buffer.
func (l *logger) encodeText(buf []byte, skipClientTime bool) []byte {
	const hexDigits = "0123456789abcdef"

	now := l.timeNow()

	// Factor in JSON encoding overhead to try to only do one alloc
	// in the make below (so appends don't resize the buffer).
	overhead := 13
	if !skipClientTime {
		overhead += 67
	}
	// TODO: do a pass over buf and count how many backslashes will be needed?
	// For now just factor in a dozen.
	overhead += 12

	b := make([]byte, 0, len(buf)+overhead)
	b = append(b, '{')
	if !skipClientTime {
		b = append(b, `"logtail": {"client_time": "`...)
		b = now.AppendFormat(b, time.RFC3339Nano)
		b = append(b, "\"}, "...)
	}
	b = append(b, "\"text\": \""...)
	for i, c := range buf {
		switch c {
		case '\b':
			b = append(b, '\\', 'b')
		case '\f':
			b = append(b, '\\', 'f')
		case '\n':
			b = append(b, '\\', 'n')
		case '\r':
			b = append(b, '\\', 'r')
		case '\t':
			b = append(b, '\\', 't')
		case '"':
			b = append(b, '\\', '"')
		case '\\':
			b = append(b, '\\', '\\')
		default:
			if c < 0x20 {
				// Remaining control characters must not appear raw
				// inside a JSON string; emit \u00XX instead.
				b = append(b, '\\', 'u', '0', '0', hexDigits[c>>4], hexDigits[c&0xf])
			} else {
				// TODO: bytes that are not valid UTF-8 are still
				// passed through unchanged.
				b = append(b, c)
			}
		}
		// Only truncate when there is input left to drop.
		if l.lowMem && i > 254 && i+1 < len(buf) {
			// A following 10xxxxxx byte continues the current UTF-8
			// sequence; keep going so it is not split. Sequences have
			// at most three continuation bytes, so stop waiting after
			// that even if the input is malformed.
			if i < 258 && buf[i+1]&0xC0 == 0x80 {
				continue
			}
			b = append(b, "…"...)
			break
		}
	}
	b = append(b, "\"}\n"...)
	return b
}
// encode turns one caller-supplied log line into the JSON entry that is
// stored in the buffer, terminated by a newline.
//
// Input that does not start with '{' (including empty input) is handed to
// encodeText. Otherwise buf is parsed as a JSON object; if parsing fails,
// the whole input is stored as the object's "text" member instead.
//
// In low-memory mode a string "text" member longer than 254 bytes is cut
// to at most 254 bytes plus "…". The cut is moved back by up to three
// bytes when needed so that it falls on a UTF-8 code point boundary.
//
// A "logtail" member supplied by the caller is moved to
// "error_has_logtail". Unless skipClientTime is set, "logtail" is then set
// to an object carrying the current client_time.
//
// encode panics if the resulting object cannot be marshalled.
func (l *logger) encode(buf []byte) []byte {
	if len(buf) == 0 || buf[0] != '{' {
		return l.encodeText(buf, l.skipClientTime) // text fast-path
	}

	now := l.timeNow()

	obj := make(map[string]interface{})
	if err := json.Unmarshal(buf, &obj); err != nil {
		// Unmarshal may have stored some members before failing;
		// discard them and keep the raw input as text.
		for k := range obj {
			delete(obj, k)
		}
		obj["text"] = string(buf)
	}
	if txt, isStr := obj["text"].(string); l.lowMem && isStr && len(txt) > 254 {
		// txt[cut] is the first byte dropped. While it is a UTF-8
		// continuation byte (10xxxxxx), the cut would split a code
		// point, so back up to the start of that code point. Sequences
		// have at most three continuation bytes.
		cut := 254
		for back := 0; back < 3 && txt[cut]&0xC0 == 0x80; back++ {
			cut--
		}
		obj["text"] = txt[:cut] + "…"
	}

	hasLogtail := obj["logtail"] != nil
	if hasLogtail {
		// "logtail" is reserved for our own metadata; keep the
		// caller's value under a different key.
		obj["error_has_logtail"] = obj["logtail"]
		obj["logtail"] = nil
	}
	if !l.skipClientTime {
		obj["logtail"] = map[string]string{
			"client_time": now.Format(time.RFC3339Nano),
		}
	}

	b, err := json.Marshal(obj)
	if err != nil {
		fmt.Fprintf(l.stderr, "logtail: re-encoding JSON failed: %v\n", err)
		// I know of no conditions under which this could fail.
		// Report it very loudly.
		panic("logtail: re-encoding JSON failed: " + err.Error())
	}
	b = append(b, '\n')
	return b
}
// Write implements io.Writer. It copies buf to the configured stderr
// writer (adding a trailing newline if buf lacks one), encodes buf as a
// JSON log entry, and queues the entry for upload.
//
// An empty buf is ignored and yields (0, nil). On success Write returns
// len(buf): the count refers to the caller's bytes, not to the larger
// encoded entry, as the io.Writer contract requires 0 <= n <= len(p).
// If queueing fails, it returns 0 and the buffer's error.
func (l *logger) Write(buf []byte) (int, error) {
	if len(buf) == 0 {
		return 0, nil
	}
	if l.stderr != nil && l.stderr != ioutil.Discard {
		// Mirroring to stderr is best effort; write errors are ignored.
		if buf[len(buf)-1] == '\n' {
			l.stderr.Write(buf)
		} else {
			// The log package always line-terminates logs,
			// so this is an uncommon path.
			// The capped slice forces append to copy, so the
			// caller's backing array is never modified.
			withNL := append(buf[:len(buf):len(buf)], '\n')
			l.stderr.Write(withNL)
		}
	}

	if _, err := l.send(l.encode(buf)); err != nil {
		return 0, err
	}
	return len(buf), nil
}
|