diff options
| author | Irbe Krumina <irbe@tailscale.com> | 2024-07-26 20:05:49 +0300 |
|---|---|---|
| committer | Irbe Krumina <irbe@tailscale.com> | 2024-07-26 21:32:37 +0300 |
| commit | 69c27b23cb8ae46e6f0845817e879d636f26e70a (patch) | |
| tree | 62d00d75cc340334c636c4b58e85ab1aba6dd507 /k8s-operator | |
| parent | 8d7b78f3f795e781d939750893610639b224d81a (diff) | |
| download | tailscale-irbekrm/websocket.tar.xz tailscale-irbekrm/websocket.zip | |
cmd/k8s-operator,k8s-operator/session-recording: implement support for WebSocket protocolirbekrm/websocket
Kubernetes currently supports two streaming protocols- SPDY and
WebSockets. WebSockets are replacing SPDY, see
https://github.com/kubernetes/enhancements/issues/4006
Our 'kubectl exec' session recording was only supporting
SPDY.
This change:
- adds functionality to parse streaming sessions over WebSockets
- for sessions that the API server proxy has determined need to be
recorded, determines if the session is over SPDY or WebSockets and
invoke the relevant parser accordingly
- refactors the session recording logic into its own package
Updates tailscale/corp#19821
Signed-off-by: Irbe Krumina <irbe@tailscale.com>
Diffstat (limited to 'k8s-operator')
| -rw-r--r-- | k8s-operator/session-recording/fakes/fakes.go | 117 | ||||
| -rw-r--r-- | k8s-operator/session-recording/hijacker.go | 205 | ||||
| -rw-r--r-- | k8s-operator/session-recording/hijacker_test.go | 123 | ||||
| -rw-r--r-- | k8s-operator/session-recording/spdy/conn.go | 205 | ||||
| -rw-r--r-- | k8s-operator/session-recording/spdy/conn_test.go | 243 | ||||
| -rw-r--r-- | k8s-operator/session-recording/spdy/frame.go | 285 | ||||
| -rw-r--r-- | k8s-operator/session-recording/spdy/frame_test.go | 293 | ||||
| -rw-r--r-- | k8s-operator/session-recording/spdy/zlib-reader.go | 221 | ||||
| -rw-r--r-- | k8s-operator/session-recording/tsrecorder/header.go | 54 | ||||
| -rw-r--r-- | k8s-operator/session-recording/tsrecorder/tsrecorder.go | 104 | ||||
| -rw-r--r-- | k8s-operator/session-recording/ws/conn.go | 244 | ||||
| -rw-r--r-- | k8s-operator/session-recording/ws/conn_test.go | 171 | ||||
| -rw-r--r-- | k8s-operator/session-recording/ws/message.go | 253 | ||||
| -rw-r--r-- | k8s-operator/session-recording/ws/message_test.go | 125 |
14 files changed, 2643 insertions, 0 deletions
diff --git a/k8s-operator/session-recording/fakes/fakes.go b/k8s-operator/session-recording/fakes/fakes.go new file mode 100644 index 000000000..9f5c349d4 --- /dev/null +++ b/k8s-operator/session-recording/fakes/fakes.go @@ -0,0 +1,117 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build !plan9 + +// package fakes contains utils for testing session recording behaviour. +package fakes + +import ( + "bytes" + "encoding/json" + "net" + "sync" + "testing" + + "tailscale.com/k8s-operator/session-recording/tsrecorder" + "tailscale.com/tstime" +) + +func New(conn net.Conn, wb bytes.Buffer, rb bytes.Buffer, closed bool) net.Conn { + return &TestConn{ + Conn: conn, + writeBuf: wb, + readBuf: rb, + closed: closed, + } +} + +type TestConn struct { + net.Conn + // writeBuf contains whatever was send to the conn via Write. + writeBuf bytes.Buffer + // readBuf contains whatever was sent to the conn via Read. + readBuf bytes.Buffer + sync.RWMutex // protects the following + closed bool +} + +var _ net.Conn = &TestConn{} + +func (tc *TestConn) Read(b []byte) (int, error) { + return tc.readBuf.Read(b) +} + +func (tc *TestConn) Write(b []byte) (int, error) { + return tc.writeBuf.Write(b) +} + +func (tc *TestConn) Close() error { + tc.Lock() + defer tc.Unlock() + tc.closed = true + return nil +} + +func (tc *TestConn) IsClosed() bool { + tc.Lock() + defer tc.Unlock() + return tc.closed +} + +func (tc *TestConn) WriteBufBytes() []byte { + return tc.writeBuf.Bytes() +} + +func (tc *TestConn) ResetReadBuf() { + tc.readBuf.Reset() +} + +func (tc *TestConn) WriteReadBufBytes(b []byte) error { + _, err := tc.readBuf.Write(b) + return err +} + +type TestSessionRecorder struct { + // buf holds data that was sent to the session recorder. + buf bytes.Buffer +} + +func (t *TestSessionRecorder) Write(b []byte) (int, error) { + return t.buf.Write(b) +} + +func (t *TestSessionRecorder) Close() error { + t.buf.Reset() + return nil +} + +func (t *TestSessionRecorder) Bytes() []byte { + return t.buf.Bytes() +} + +func CastLine(t *testing.T, p []byte, clock tstime.Clock) []byte { + t.Helper() + j, err := json.Marshal([]any{ + clock.Now().Sub(clock.Now()).Seconds(), + "o", + string(p), + }) + if err != nil { + t.Fatalf("error marshalling cast line: %v", err) + } + return append(j, '\n') +} + +func AsciinemaResizeMsg(t *testing.T, width, height int) []byte { + t.Helper() + ch := tsrecorder.CastHeader{ + Width: width, + Height: height, + } + bs, err := json.Marshal(ch) + if err != nil { + t.Fatalf("error marshalling CastHeader: %v", err) + } + return append(bs, '\n') +} diff --git a/k8s-operator/session-recording/hijacker.go b/k8s-operator/session-recording/hijacker.go new file mode 100644 index 000000000..bbaee3ba7 --- /dev/null +++ b/k8s-operator/session-recording/hijacker.go @@ -0,0 +1,205 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build !plan9 + +// Package sessionrecording has functionality for recording 'kubectl exec' +// sessions and sending to a tsrecorder. +package sessionrecording + +import ( + "bufio" + "bytes" + "context" + "errors" + "fmt" + "io" + "net" + "net/http" + "net/netip" + "strings" + + "go.uber.org/zap" + "tailscale.com/client/tailscale/apitype" + "tailscale.com/k8s-operator/session-recording/spdy" + "tailscale.com/k8s-operator/session-recording/tsrecorder" + "tailscale.com/k8s-operator/session-recording/ws" + "tailscale.com/tailcfg" + "tailscale.com/tsnet" + "tailscale.com/tstime" + "tailscale.com/util/clientmetric" + "tailscale.com/util/multierr" +) + +const ( + SPDYProtocol = "SPDY" + WebSocketsProtocol = "WebSockets" +) + +var ( + // counterSessionRecordingsAttempted counts the number of session recording attempts. + CounterSessionRecordingsAttempted = clientmetric.NewCounter("k8s_auth_proxy__session_recordings_attempted") + + // counterSessionRecordingsUploaded counts the number of successfully uploaded session recordings. + CounterSessionRecordingsUploaded = clientmetric.NewCounter("k8s_auth_proxy_session_recordings_uploaded") +) + +func New(ts *tsnet.Server, req *http.Request, who *apitype.WhoIsResponse, w http.ResponseWriter, pod, ns string, proto protocol, addrs []netip.AddrPort, failOpen bool, connFunc RecorderDialFn, log *zap.SugaredLogger) *SpdyHijacker { + return &SpdyHijacker{ + ts: ts, + req: req, + who: who, + ResponseWriter: w, + pod: pod, + ns: ns, + addrs: addrs, + failOpen: failOpen, + connectToRecorder: connFunc, + proto: proto, + log: log, + } +} + +// spdyHijacker implements [net/http.Hijacker] interface. +// It must be configured with an http request for a 'kubectl exec' session that +// needs to be recorded. It knows how to hijack the connection and configure for +// the session contents to be sent to a tsrecorder instance. +type SpdyHijacker struct { + http.ResponseWriter + ts *tsnet.Server + req *http.Request + who *apitype.WhoIsResponse + log *zap.SugaredLogger + pod string // pod being exec-d + ns string // namespace of the pod being exec-d + addrs []netip.AddrPort // tsrecorder addresses + failOpen bool // whether to fail open if recording fails + connectToRecorder RecorderDialFn + proto protocol +} + +// protocol is the streaming protocol of the hijacked session. Supported +// protocols are SPDY and WebSockets. +type protocol string + +// RecorderDialFn dials the specified netip.AddrPorts that should be tsrecorder +// addresses. It tries to connect to recorder endpoints one by one, till one +// connection succeeds. In case of success, returns a list with a single +// successful recording attempt and an error channel. If the connection errors +// after having been established, an error is sent down the channel. +type RecorderDialFn func(context.Context, []netip.AddrPort, func(context.Context, string, string) (net.Conn, error)) (io.WriteCloser, []*tailcfg.SSHRecordingAttempt, <-chan error, error) + +// Hijack hijacks a 'kubectl exec' session and configures for the session +// contents to be sent to a recorder. +func (h *SpdyHijacker) Hijack() (net.Conn, *bufio.ReadWriter, error) { + h.log.Infof("recorder addrs: %v, failOpen: %v", h.addrs, h.failOpen) + reqConn, brw, err := h.ResponseWriter.(http.Hijacker).Hijack() + if err != nil { + return nil, nil, fmt.Errorf("error hijacking connection: %w", err) + } + + conn, err := h.setUpRecording(context.Background(), reqConn) + if err != nil { + return nil, nil, fmt.Errorf("error setting up session recording: %w", err) + } + return conn, brw, nil +} + +// setupRecording attempts to connect to the recorders set via +// spdyHijacker.addrs. Returns conn from provided opts, wrapped in recording +// logic. If connecting to the recorder fails or an error is received during the +// session and spdyHijacker.failOpen is false, connection will be closed. +func (h *SpdyHijacker) setUpRecording(ctx context.Context, conn net.Conn) (net.Conn, error) { + const ( + // https://docs.asciinema.org/manual/asciicast/v2/ + asciicastv2 = 2 + ) + var wc io.WriteCloser + h.log.Infof("kubectl exec session will be recorded, recorders: %v, fail open policy: %t", h.addrs, h.failOpen) + // TODO (irbekrm): send client a message that session will be recorded. + rw, _, errChan, err := h.connectToRecorder(ctx, h.addrs, h.ts.Dial) + if err != nil { + msg := fmt.Sprintf("error connecting to session recorders: %v", err) + if h.failOpen { + msg = msg + "; failure mode is 'fail open'; continuing session without recording." + h.log.Warnf(msg) + return conn, nil + } + msg = msg + "; failure mode is 'fail closed'; closing connection." + if err := closeConnWithWarning(conn, msg); err != nil { + return nil, multierr.New(errors.New(msg), err) + } + return nil, errors.New(msg) + } + // TODO (irbekrm): log which recorder + h.log.Info("successfully connected to a session recorder") + wc = rw + cl := tstime.DefaultClock{} + rec := tsrecorder.New(wc, cl, cl.Now(), h.failOpen) + + qp := h.req.URL.Query() + ch := tsrecorder.CastHeader{ + Version: asciicastv2, + Timestamp: cl.Now().Unix(), + Command: strings.Join(qp["command"], " "), + SrcNode: strings.TrimSuffix(h.who.Node.Name, "."), + SrcNodeID: h.who.Node.StableID, + Kubernetes: &tsrecorder.Kubernetes{ + PodName: h.pod, + Namespace: h.ns, + Container: strings.Join(qp["container"], " "), + }, + } + if !h.who.Node.IsTagged() { + ch.SrcNodeUser = h.who.UserProfile.LoginName + ch.SrcNodeUserID = h.who.Node.User + } else { + ch.SrcNodeTags = h.who.Node.Tags + } + var lc net.Conn + switch h.proto { + case SPDYProtocol: + lc = spdy.New(conn, rec, ch, h.log) + case WebSocketsProtocol: + lc = ws.New(conn, rec, ch, h.log) + default: + return nil, fmt.Errorf("unknown protocol: %s", h.proto) + } + + go func() { + var err error + select { + case <-ctx.Done(): + return + case err = <-errChan: + } + if err == nil { + CounterSessionRecordingsUploaded.Add(1) + h.log.Info("finished uploading the recording") + return + } + msg := fmt.Sprintf("connection to the session recorder errorred: %v;", err) + if h.failOpen { + msg += msg + "; failure mode is 'fail open'; continuing session without recording." + h.log.Info(msg) + return + } + msg += "; failure mode set to 'fail closed'; closing connection" + h.log.Error(msg) + if err := lc.Close(); err != nil { + h.log.Infof("error closing recorder connections: %v", err) + } + return + }() + + return lc, nil +} + +func closeConnWithWarning(conn net.Conn, msg string) error { + b := io.NopCloser(bytes.NewBuffer([]byte(msg))) + resp := http.Response{Status: http.StatusText(http.StatusForbidden), StatusCode: http.StatusForbidden, Body: b} + if err := resp.Write(conn); err != nil { + return multierr.New(fmt.Errorf("error writing msg %q to conn: %v", msg, err), conn.Close()) + } + return conn.Close() +} diff --git a/k8s-operator/session-recording/hijacker_test.go b/k8s-operator/session-recording/hijacker_test.go new file mode 100644 index 000000000..cfc694d26 --- /dev/null +++ b/k8s-operator/session-recording/hijacker_test.go @@ -0,0 +1,123 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build !plan9 + +package sessionrecording + +import ( + "context" + "errors" + "fmt" + "io" + "net" + "net/http" + "net/netip" + "net/url" + "testing" + "time" + + "go.uber.org/zap" + "tailscale.com/client/tailscale/apitype" + "tailscale.com/k8s-operator/session-recording/fakes" + "tailscale.com/tailcfg" + "tailscale.com/tsnet" + "tailscale.com/tstest" +) + +func Test_SPDYHijacker(t *testing.T) { + zl, err := zap.NewDevelopment() + if err != nil { + t.Fatal(err) + } + tests := []struct { + name string + failOpen bool + failRecorderConnect bool // fail initial connect to the recorder + failRecorderConnPostConnect bool // send error down the error channel + proto protocol + wantsConnClosed bool + wantsSetupErr bool + }{ + { + name: "spdy_setup_succeeds_conn_stays_open", + proto: SPDYProtocol, + }, + { + name: "ws_setup_succeeds_conn_stays_open", + proto: WebSocketsProtocol, + }, + { + name: "setup_fails_policy_is_to_fail_open_conn_stays_open", + failOpen: true, + failRecorderConnect: true, + proto: SPDYProtocol, + }, + { + name: "setup_fails_policy_is_to_fail_closed_conn_is_closed", + failRecorderConnect: true, + wantsSetupErr: true, + wantsConnClosed: true, + proto: SPDYProtocol, + }, + { + name: "connection_fails_post-initial_connect_policy_is_to_fail_open_conn_stays_open", + failRecorderConnPostConnect: true, + failOpen: true, + proto: SPDYProtocol, + }, + { + name: "connection_fails_post-initial_connect_policy_is_to_fail_closed_conn_is_closed", + failRecorderConnPostConnect: true, + wantsConnClosed: true, + proto: SPDYProtocol, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tc := &fakes.TestConn{} + ch := make(chan error) + h := &SpdyHijacker{ + connectToRecorder: func(context.Context, []netip.AddrPort, func(context.Context, string, string) (net.Conn, error)) (wc io.WriteCloser, rec []*tailcfg.SSHRecordingAttempt, _ <-chan error, err error) { + if tt.failRecorderConnect { + err = errors.New("test") + } + return wc, rec, ch, err + }, + failOpen: tt.failOpen, + who: &apitype.WhoIsResponse{Node: &tailcfg.Node{}, UserProfile: &tailcfg.UserProfile{}}, + log: zl.Sugar(), + ts: &tsnet.Server{}, + req: &http.Request{URL: &url.URL{}}, + proto: tt.proto, + } + ctx := context.Background() + _, err := h.setUpRecording(ctx, tc) + if (err != nil) != tt.wantsSetupErr { + t.Errorf("spdyHijacker.setupRecording() error = %v, wantErr %v", err, tt.wantsSetupErr) + return + } + if tt.failRecorderConnPostConnect { + select { + case ch <- errors.New("err"): + case <-time.After(time.Second * 15): + t.Errorf("error from recorder conn was not read within 15 seconds") + } + } + timeout := time.Second * 20 + // TODO (irbekrm): cover case where an error is received + // over channel and the failure policy is to fail open + // (test that connection remains open over some period + // of time). + if err := tstest.WaitFor(timeout, func() (err error) { + if tt.wantsConnClosed != tc.IsClosed() { + return fmt.Errorf("got conIection state: %t, wants connection state: %t", tc.IsClosed(), tt.wantsConnClosed) + } + return nil + }); err != nil { + t.Errorf("connection did not reach the desired state within %s", timeout.String()) + } + ctx.Done() + }) + } +} diff --git a/k8s-operator/session-recording/spdy/conn.go b/k8s-operator/session-recording/spdy/conn.go new file mode 100644 index 000000000..af27f27e6 --- /dev/null +++ b/k8s-operator/session-recording/spdy/conn.go @@ -0,0 +1,205 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build !plan9 + +// Package spdy has functionality to parse 'kubectl exec' sessions streamed over +// SPDY. +package spdy + +import ( + "bytes" + "encoding/binary" + "encoding/json" + "fmt" + "net" + "net/http" + "sync" + "sync/atomic" + + "tailscale.com/k8s-operator/session-recording/tsrecorder" + + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" +) + +func New(conn net.Conn, rec *tsrecorder.Client, ch tsrecorder.CastHeader, log *zap.SugaredLogger) net.Conn { + return &spdyRemoteConnRecorder{ + Conn: conn, + rec: rec, + ch: ch, + log: log, + } + +} + +// spdyRemoteConnRecorder is a wrapper around net.Conn. It reads the bytestream +// for a 'kubectl exec' session, sends session recording data to the configured +// recorder and forwards the raw bytes to the original destination. +type spdyRemoteConnRecorder struct { + net.Conn + // rec knows how to send data written to it to a tsrecorder instance. + rec *tsrecorder.Client + ch tsrecorder.CastHeader + + stdoutStreamID atomic.Uint32 + stderrStreamID atomic.Uint32 + resizeStreamID atomic.Uint32 + + wmu sync.Mutex // sequences writes + closed bool + + rmu sync.Mutex // sequences reads + writeCastHeaderOnce sync.Once + + zlibReqReader zlibReader + // writeBuf is used to store data written to the connection that has not + // yet been parsed as SPDY frames. + writeBuf bytes.Buffer + // readBuf is used to store data read from the connection that has not + // yet been parsed as SPDY frames. + readBuf bytes.Buffer + log *zap.SugaredLogger +} + +// Read reads bytes from the original connection and parses them as SPDY frames. +// If the frame is a data frame for resize stream, sends resize message to the +// recorder. If the frame is a SYN_STREAM control frame that starts stdout, +// stderr or resize stream, store the stream ID. +func (c *spdyRemoteConnRecorder) Read(b []byte) (int, error) { + c.rmu.Lock() + defer c.rmu.Unlock() + n, err := c.Conn.Read(b) + if err != nil { + return n, fmt.Errorf("error reading from connection: %w", err) + } + c.readBuf.Write(b[:n]) + + var sf spdyFrame + ok, err := sf.Parse(c.readBuf.Bytes(), c.log) + if err != nil { + return 0, fmt.Errorf("error parsing data read from connection: %w", err) + } + if !ok { + // The parsed data in the buffer will be processed together with + // the new data on the next call to Read. + return n, nil + } + c.readBuf.Next(len(sf.Raw)) // advance buffer past the parsed frame + + if !sf.Ctrl { // data frame + switch sf.StreamID { + case c.resizeStreamID.Load(): + var err error + var msg tsrecorder.ResizeMsg + if err = json.Unmarshal(sf.Payload, &msg); err != nil { + return 0, err + } + c.ch.Width = msg.Width + c.ch.Height = msg.Height + } + return n, nil + } + // We always want to parse the headers, even if we don't care about the + // frame, as we need to advance the zlib reader otherwise we will get + // garbage. + header, err := sf.parseHeaders(&c.zlibReqReader, c.log) + if err != nil { + return 0, fmt.Errorf("error parsing frame headers: %w", err) + } + if sf.Type == SYN_STREAM { + c.storeStreamID(sf, header) + } + return n, nil +} + +// Write forwards the raw data of the latest parsed SPDY frame to the original +// destination. If the frame is an SPDY data frame, it also sends the payload to +// the connected session recorder. +func (c *spdyRemoteConnRecorder) Write(b []byte) (int, error) { + c.wmu.Lock() + defer c.wmu.Unlock() + c.writeBuf.Write(b) + + var sf spdyFrame + ok, err := sf.Parse(c.writeBuf.Bytes(), c.log) + if err != nil { + return 0, fmt.Errorf("error parsing data: %w", err) + } + if !ok { + // The parsed data in the buffer will be processed together with + // the new data on the next call to Write. + return len(b), nil + } + c.writeBuf.Next(len(sf.Raw)) // advance buffer past the parsed frame + + // If this is a stdout or stderr data frame, send its payload to the + // session recorder. + if !sf.Ctrl { + switch sf.StreamID { + case c.stdoutStreamID.Load(), c.stderrStreamID.Load(): + var err error + c.writeCastHeaderOnce.Do(func() { + + var j []byte + j, err = json.Marshal(c.ch) + if err != nil { + return + } + j = append(j, '\n') + err = c.rec.WriteCastLine(j) + if err != nil { + c.log.Errorf("received error from recorder: %v", err) + } + }) + if err != nil { + return 0, fmt.Errorf("error writing CastHeader: %w", err) + } + if err := c.rec.Write(sf.Payload); err != nil { + return 0, fmt.Errorf("error sending payload to session recorder: %w", err) + } + } + } + // Forward the whole frame to the original destination. + _, err = c.Conn.Write(sf.Raw) // send to net.Conn + return len(b), err +} + +func (c *spdyRemoteConnRecorder) Close() error { + c.wmu.Lock() + defer c.wmu.Unlock() + if c.closed { + return nil + } + // TODO: only do this if this is a normal closure rather than the + // reocrding has failed. + if c.writeBuf.Len() > 0 { + c.Conn.Write(c.writeBuf.Bytes()) + } + c.writeBuf.Reset() + c.closed = true + err := c.Conn.Close() + c.rec.Close() + return err +} + +// parseSynStream parses SYN_STREAM SPDY control frame and updates +// spdyRemoteConnRecorder to store the newly created stream's ID if it is one of +// the stream types we care about. Storing stream_id:stream_type mapping allows +// us to parse received data frames (that have stream IDs) differently depening +// on which stream they belong to (i.e send data frame payload for stdout stream +// to session recorder). +func (c *spdyRemoteConnRecorder) storeStreamID(sf spdyFrame, header http.Header) { + const ( + streamTypeHeaderKey = "Streamtype" + ) + id := binary.BigEndian.Uint32(sf.Payload[0:4]) + switch header.Get(streamTypeHeaderKey) { + case corev1.StreamTypeStdout: + c.stdoutStreamID.Store(id) + case corev1.StreamTypeStderr: + c.stderrStreamID.Store(id) + case corev1.StreamTypeResize: + c.resizeStreamID.Store(id) + } +} diff --git a/k8s-operator/session-recording/spdy/conn_test.go b/k8s-operator/session-recording/spdy/conn_test.go new file mode 100644 index 000000000..ce8c9ae49 --- /dev/null +++ b/k8s-operator/session-recording/spdy/conn_test.go @@ -0,0 +1,243 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build !plan9 + +package spdy + +import ( + "encoding/json" + "reflect" + "testing" + + "go.uber.org/zap" + "tailscale.com/k8s-operator/session-recording/fakes" + "tailscale.com/k8s-operator/session-recording/tsrecorder" + "tailscale.com/tstest" +) + +// Test_Writes tests that 1 or more Write calls to spdyRemoteConnRecorder +// results in the expected data being forwarded to the original destination and +// the session recorder. +func Test_Writes(t *testing.T) { + var stdoutStreamID, stderrStreamID uint32 = 1, 2 + zl, err := zap.NewDevelopment() + if err != nil { + t.Fatal(err) + } + cl := tstest.NewClock(tstest.ClockOpts{}) + tests := []struct { + name string + inputs [][]byte + wantForwarded []byte + wantRecorded []byte + firstWrite bool + width int + height int + }{ + { + name: "single_write_control_frame_with_payload", + inputs: [][]byte{{0x80, 0x3, 0x0, 0x1, 0x0, 0x0, 0x0, 0x1, 0x5}}, + wantForwarded: []byte{0x80, 0x3, 0x0, 0x1, 0x0, 0x0, 0x0, 0x1, 0x5}, + }, + { + name: "two_writes_control_frame_with_leftover", + inputs: [][]byte{{0x80, 0x3, 0x0, 0x1}, {0x0, 0x0, 0x0, 0x1, 0x5, 0x80, 0x3}}, + wantForwarded: []byte{0x80, 0x3, 0x0, 0x1, 0x0, 0x0, 0x0, 0x1, 0x5}, + }, + { + name: "single_write_stdout_data_frame", + inputs: [][]byte{{0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0}}, + wantForwarded: []byte{0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0}, + }, + { + name: "single_write_stdout_data_frame_with_payload", + inputs: [][]byte{{0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x5, 0x1, 0x2, 0x3, 0x4, 0x5}}, + wantForwarded: []byte{0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x5, 0x1, 0x2, 0x3, 0x4, 0x5}, + wantRecorded: fakes.CastLine(t, []byte{0x1, 0x2, 0x3, 0x4, 0x5}, cl), + }, + { + name: "single_write_stderr_data_frame_with_payload", + inputs: [][]byte{{0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x5, 0x1, 0x2, 0x3, 0x4, 0x5}}, + wantForwarded: []byte{0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x5, 0x1, 0x2, 0x3, 0x4, 0x5}, + wantRecorded: fakes.CastLine(t, []byte{0x1, 0x2, 0x3, 0x4, 0x5}, cl), + }, + { + name: "single_data_frame_unknow_stream_with_payload", + inputs: [][]byte{{0x0, 0x0, 0x0, 0x7, 0x0, 0x0, 0x0, 0x5, 0x1, 0x2, 0x3, 0x4, 0x5}}, + wantForwarded: []byte{0x0, 0x0, 0x0, 0x7, 0x0, 0x0, 0x0, 0x5, 0x1, 0x2, 0x3, 0x4, 0x5}, + }, + { + name: "control_frame_and_data_frame_split_across_two_writes", + inputs: [][]byte{{0x80, 0x3, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1}, {0x0, 0x0, 0x0, 0x5, 0x1, 0x2, 0x3, 0x4, 0x5}}, + wantForwarded: []byte{0x80, 0x3, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x5, 0x1, 0x2, 0x3, 0x4, 0x5}, + wantRecorded: fakes.CastLine(t, []byte{0x1, 0x2, 0x3, 0x4, 0x5}, cl), + }, + { + name: "single_first_write_stdout_data_frame_with_payload", + inputs: [][]byte{{0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x5, 0x1, 0x2, 0x3, 0x4, 0x5}}, + wantForwarded: []byte{0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x5, 0x1, 0x2, 0x3, 0x4, 0x5}, + wantRecorded: append(fakes.AsciinemaResizeMsg(t, 10, 20), fakes.CastLine(t, []byte{0x1, 0x2, 0x3, 0x4, 0x5}, cl)...), + width: 10, + height: 20, + firstWrite: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tc := &fakes.TestConn{} + sr := &fakes.TestSessionRecorder{} + rec := tsrecorder.New(sr, cl, cl.Now(), true) + + c := &spdyRemoteConnRecorder{ + Conn: tc, + log: zl.Sugar(), + rec: rec, + ch: tsrecorder.CastHeader{ + Width: tt.width, + Height: tt.height, + }, + } + if !tt.firstWrite { + // This test case does not intend to test that cast header gets written once. + c.writeCastHeaderOnce.Do(func() {}) + } + + c.stdoutStreamID.Store(stdoutStreamID) + c.stderrStreamID.Store(stderrStreamID) + for i, input := range tt.inputs { + if _, err := c.Write(input); err != nil { + t.Errorf("[%d] spdyRemoteConnRecorder.Write() unexpected error %v", i, err) + } + } + + // Assert that the expected bytes have been forwarded to the original destination. + gotForwarded := tc.WriteBufBytes() + if !reflect.DeepEqual(gotForwarded, tt.wantForwarded) { + t.Errorf("expected bytes not forwarded, wants\n%v\ngot\n%v", tt.wantForwarded, gotForwarded) + } + + // Assert that the expected bytes have been forwarded to the session recorder. + gotRecorded := sr.Bytes() + if !reflect.DeepEqual(gotRecorded, tt.wantRecorded) { + t.Errorf("expected bytes not recorded, wants\n%v\ngot\n%v", tt.wantRecorded, gotRecorded) + } + }) + } +} + +// Test_Reads tests that 1 or more Read calls to spdyRemoteConnRecorder results +// in the expected data being forwarded to the original destination and the +// session recorder. +func Test_Reads(t *testing.T) { + zl, err := zap.NewDevelopment() + if err != nil { + t.Fatal(err) + } + cl := tstest.NewClock(tstest.ClockOpts{}) + var reader zlibReader + resizeMsg := resizeMsgBytes(t, 10, 20) + synStreamStdoutPayload := payload(t, map[string]string{"Streamtype": "stdout"}, SYN_STREAM, 1) + synStreamStderrPayload := payload(t, map[string]string{"Streamtype": "stderr"}, SYN_STREAM, 2) + synStreamResizePayload := payload(t, map[string]string{"Streamtype": "resize"}, SYN_STREAM, 3) + syn_stream_ctrl_header := []byte{0x80, 0x3, 0x0, 0x1, 0x0, 0x0, 0x0, uint8(len(synStreamStdoutPayload))} + + tests := []struct { + name string + inputs [][]byte + wantStdoutStreamID uint32 + wantStderrStreamID uint32 + wantResizeStreamID uint32 + wantWidth int + wantHeight int + resizeStreamIDBeforeRead uint32 + }{ + { + name: "resize_data_frame_single_read", + inputs: [][]byte{append([]byte{0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, uint8(len(resizeMsg))}, resizeMsg...)}, + resizeStreamIDBeforeRead: 1, + wantWidth: 10, + wantHeight: 20, + }, + { + name: "resize_data_frame_two_reads", + inputs: [][]byte{{0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, uint8(len(resizeMsg))}, resizeMsg}, + resizeStreamIDBeforeRead: 1, + wantWidth: 10, + wantHeight: 20, + }, + { + name: "syn_stream_ctrl_frame_stdout_single_read", + inputs: [][]byte{append(syn_stream_ctrl_header, synStreamStdoutPayload...)}, + wantStdoutStreamID: 1, + }, + { + name: "syn_stream_ctrl_frame_stderr_single_read", + inputs: [][]byte{append(syn_stream_ctrl_header, synStreamStderrPayload...)}, + wantStderrStreamID: 2, + }, + { + name: "syn_stream_ctrl_frame_resize_single_read", + inputs: [][]byte{append(syn_stream_ctrl_header, synStreamResizePayload...)}, + wantResizeStreamID: 3, + }, + { + name: "syn_stream_ctrl_frame_resize_four_reads_with_leftover", + inputs: [][]byte{syn_stream_ctrl_header, append(synStreamResizePayload, syn_stream_ctrl_header...), append(synStreamStderrPayload, syn_stream_ctrl_header...), append(synStreamStdoutPayload, 0x0, 0x3)}, + wantStdoutStreamID: 1, + wantStderrStreamID: 2, + wantResizeStreamID: 3, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tc := &fakes.TestConn{} + sr := &fakes.TestSessionRecorder{} + rec := tsrecorder.New(sr, cl, cl.Now(), true) + c := &spdyRemoteConnRecorder{ + Conn: tc, + log: zl.Sugar(), + rec: rec, + } + c.resizeStreamID.Store(tt.resizeStreamIDBeforeRead) + + for i, input := range tt.inputs { + c.zlibReqReader = reader + tc.ResetReadBuf() + if err := tc.WriteReadBufBytes(input); err != nil { + t.Fatalf("writing bytes to test conn: %v", err) + } + _, err = c.Read(make([]byte, len(input))) + if err != nil { + t.Errorf("[%d] spdyRemoteConnRecorder.Read() resulted in an unexpected error: %v", i, err) + } + } + if id := c.resizeStreamID.Load(); id != tt.wantResizeStreamID && id != tt.resizeStreamIDBeforeRead { + t.Errorf("wants resizeStreamID: %d, got %d", tt.wantResizeStreamID, id) + } + if id := c.stderrStreamID.Load(); id != tt.wantStderrStreamID { + t.Errorf("wants stderrStreamID: %d, got %d", tt.wantStderrStreamID, id) + } + if id := c.stdoutStreamID.Load(); id != tt.wantStdoutStreamID { + t.Errorf("wants stdoutStreamID: %d, got %d", tt.wantStdoutStreamID, id) + } + if tt.wantHeight != 0 || tt.wantWidth != 0 { + if tt.wantWidth != c.ch.Width { + t.Errorf("wants width: %v, got %v", tt.wantWidth, c.ch.Width) + } + if tt.wantHeight != c.ch.Height { + t.Errorf("want height: %v, got %v", tt.wantHeight, c.ch.Height) + } + } + }) + } +} + +func resizeMsgBytes(t *testing.T, width, height int) []byte { + t.Helper() + bs, err := json.Marshal(tsrecorder.ResizeMsg{Width: width, Height: height}) + if err != nil { + t.Fatalf("error marshalling resizeMsg: %v", err) + } + return bs +} diff --git a/k8s-operator/session-recording/spdy/frame.go b/k8s-operator/session-recording/spdy/frame.go new file mode 100644 index 000000000..54b29d33a --- /dev/null +++ b/k8s-operator/session-recording/spdy/frame.go @@ -0,0 +1,285 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build !plan9 + +package spdy + +import ( + "bytes" + "encoding/binary" + "fmt" + "io" + "net/http" + "sync" + + "go.uber.org/zap" +) + +const ( + SYN_STREAM ControlFrameType = 1 // https://www.ietf.org/archive/id/draft-mbelshe-httpbis-spdy-00.txt section 2.6.1 + SYN_REPLY ControlFrameType = 2 // https://www.ietf.org/archive/id/draft-mbelshe-httpbis-spdy-00.txt section 2.6.2 + SYN_PING ControlFrameType = 6 // https://www.ietf.org/archive/id/draft-mbelshe-httpbis-spdy-00.txt section 2.6.5 +) + +// spdyFrame is a parsed SPDY frame as defined in +// https://www.ietf.org/archive/id/draft-mbelshe-httpbis-spdy-00.txt +// A SPDY frame can be either a control frame or a data frame. +type spdyFrame struct { + Raw []byte // full frame as raw bytes + + // Common frame fields: + Ctrl bool // true if this is a SPDY control frame + Payload []byte // payload as raw bytes + + // Control frame fields: + Version uint16 // SPDY protocol version + Type ControlFrameType + + // Data frame fields: + // StreamID is the id of the steam to which this data frame belongs. + // SPDY allows transmitting multiple data streams concurrently. + StreamID uint32 +} + +// Type of an SPDY control frame. +type ControlFrameType uint16 + +// Parse parses bytes into spdyFrame. +// If the bytes don't contain a full frame, return false. +// +// Control frame structure: +// +// +----------------------------------+ +// |C| Version(15bits) | Type(16bits) | +// +----------------------------------+ +// | Flags (8) | Length (24 bits) | +// +----------------------------------+ +// | Data | +// +----------------------------------+ +// +// Data frame structure: +// +// +----------------------------------+ +// |C| Stream-ID (31bits) | +// +----------------------------------+ +// | Flags (8) | Length (24 bits) | +// +----------------------------------+ +// | Data | +// +----------------------------------+ +// +// https://www.ietf.org/archive/id/draft-mbelshe-httpbis-spdy-00.txt +func (sf *spdyFrame) Parse(b []byte, log *zap.SugaredLogger) (ok bool, _ error) { + const ( + spdyHeaderLength = 8 + ) + have := len(b) + if have < spdyHeaderLength { // input does not contain full frame + return false, nil + } + + if !isSPDYFrameHeader(b) { + return false, fmt.Errorf("bytes %v do not seem to contain SPDY frames. Ensure that you are using a SPDY based client to 'kubectl exec'.", b) + } + + payloadLength := readInt24(b[5:8]) + frameLength := payloadLength + spdyHeaderLength + if have < frameLength { // input does not contain full frame + return false, nil + } + + frame := b[:frameLength:frameLength] // enforce frameLength capacity + + sf.Raw = frame + sf.Payload = frame[spdyHeaderLength:frameLength] + + sf.Ctrl = hasControlBitSet(frame) + + if !sf.Ctrl { // data frame + sf.StreamID = dataFrameStreamID(frame) + return true, nil + } + + sf.Version = controlFrameVersion(frame) + sf.Type = controlFrameType(frame) + return true, nil +} + +// parseHeaders retrieves any headers from this spdyFrame. +func (sf *spdyFrame) parseHeaders(z *zlibReader, log *zap.SugaredLogger) (http.Header, error) { + if !sf.Ctrl { + return nil, fmt.Errorf("[unexpected] parseHeaders called for a frame that is not a control frame") + } + const ( + // +------------------------------------+ + // |X| Stream-ID (31bits) | + // +------------------------------------+ + // |X| Associated-To-Stream-ID (31bits) | + // +------------------------------------+ + // | Pri|Unused | Slot | | + // +-------------------+ | + synStreamPayloadLengthBeforeHeaders = 10 + + // +------------------------------------+ + // |X| Stream-ID (31bits) | + //+------------------------------------+ + synReplyPayloadLengthBeforeHeaders = 4 + + // +----------------------------------| + // | 32-bit ID | + // +----------------------------------+ + pingPayloadLength = 4 + ) + + switch sf.Type { + case SYN_STREAM: + if len(sf.Payload) < synStreamPayloadLengthBeforeHeaders { + return nil, fmt.Errorf("SYN_STREAM frame too short: %v", len(sf.Payload)) + } + z.Set(sf.Payload[synStreamPayloadLengthBeforeHeaders:]) + return parseHeaders(z, log) + case SYN_REPLY: + if len(sf.Payload) < synReplyPayloadLengthBeforeHeaders { + return nil, fmt.Errorf("SYN_REPLY frame too short: %v", len(sf.Payload)) + } + if len(sf.Payload) == synReplyPayloadLengthBeforeHeaders { + return nil, nil // no headers + } + z.Set(sf.Payload[synReplyPayloadLengthBeforeHeaders:]) + return parseHeaders(z, log) + case SYN_PING: + if len(sf.Payload) != pingPayloadLength { + return nil, fmt.Errorf("PING frame with unexpected length %v", len(sf.Payload)) + } + return nil, nil // ping frame has no headers + + default: + log.Infof("[unexpected] unknown control frame type %v", sf.Type) + } + return nil, nil +} + +// parseHeaders expects to be passed a reader that contains a compressed SPDY control +// frame Name/Value Header Block with 0 or more headers: +// +// | Number of Name/Value pairs (int32) | <+ +// +------------------------------------+ | +// | Length of name (int32) | | This section is the "Name/Value +// +------------------------------------+ | Header Block", and is compressed. +// | Name (string) | | +// +------------------------------------+ | +// | Length of value (int32) | | +// +------------------------------------+ | +// | Value (string) | | +// +------------------------------------+ | +// | (repeats) | <+ +// +// It extracts the headers and returns them as http.Header. By doing that it +// also advances the provided reader past the headers block. +// See also https://www.ietf.org/archive/id/draft-mbelshe-httpbis-spdy-00.txt section 2.6.10 +func parseHeaders(decompressor io.Reader, log *zap.SugaredLogger) (http.Header, error) { + buf := bufPool.Get().(*bytes.Buffer) + defer bufPool.Put(buf) + buf.Reset() + + // readUint32 reads the next 4 decompressed bytes from the decompressor + // as a uint32. + readUint32 := func() (uint32, error) { + const uint32Length = 4 + if _, err := io.CopyN(buf, decompressor, uint32Length); err != nil { // decompress + return 0, fmt.Errorf("error decompressing bytes: %w", err) + } + return binary.BigEndian.Uint32(buf.Next(uint32Length)), nil // return as uint32 + } + + // readLenBytes decompresses and returns as bytes the next 'Name' or 'Value' + // field from SPDY Name/Value header block. decompressor must be at + // 'Length of name'/'Length of value' field. + readLenBytes := func() ([]byte, error) { + xLen, err := readUint32() // length of field to read + if err != nil { + return nil, err + } + if _, err := io.CopyN(buf, decompressor, int64(xLen)); err != nil { // decompress + return nil, err + } + return buf.Next(int(xLen)), nil + } + + numHeaders, err := readUint32() + if err != nil { + return nil, fmt.Errorf("error determining num headers: %v", err) + } + h := make(http.Header, numHeaders) + for i := uint32(0); i < numHeaders; i++ { + name, err := readLenBytes() + if err != nil { + return nil, err + } + ns := string(name) + if _, ok := h[ns]; ok { + return nil, fmt.Errorf("invalid data: duplicate header %q", ns) + } + val, err := readLenBytes() + if err != nil { + return nil, fmt.Errorf("error reading header data: %w", err) + } + for _, v := range bytes.Split(val, headerSep) { + h.Add(ns, string(v)) + } + } + return h, nil +} + +// isSPDYFrame validates that the input bytes start with a valid SPDY frame +// header. +func isSPDYFrameHeader(f []byte) bool { + if hasControlBitSet(f) { + // If this is a control frame, version and type must be set. + return controlFrameVersion(f) != uint16(0) && uint16(controlFrameType(f)) != uint16(0) + } + // If this is a data frame, stream ID must be set. + return dataFrameStreamID(f) != uint32(0) +} + +// spdyDataFrameStreamID returns stream ID for an SPDY data frame passed as the +// input data slice. StreaID is contained within bits [0-31) of a data frame +// header. +func dataFrameStreamID(frame []byte) uint32 { + return binary.BigEndian.Uint32(frame[0:4]) & 0x7f +} + +// controlFrameType returns the type of a SPDY control frame. +// See https://www.ietf.org/archive/id/draft-mbelshe-httpbis-spdy-00.txt section 2.6 +func controlFrameType(f []byte) ControlFrameType { + return ControlFrameType(binary.BigEndian.Uint16(f[2:4])) +} + +// spdyControlFrameVersion returns SPDY version extracted from input bytes that +// must be a SPDY control frame. +func controlFrameVersion(frame []byte) uint16 { + bs := binary.BigEndian.Uint16(frame[0:2]) // first 16 bits + return bs & 0x7f // discard control bit +} + +// hasControlBitSet returns true if the passsed bytes have SPDY control bit set. +// SPDY frames can be either control frames or data frames. A control frame has +// control bit set to 1 and a data frame has it set to 0. +func hasControlBitSet(frame []byte) bool { + return frame[0]&0x80 == 128 // 0x80 +} + +var bufPool = sync.Pool{ + New: func() any { + return new(bytes.Buffer) + }, +} + +// Headers in SPDY header name/value block are separated by a 0 byte. +// https://www.ietf.org/archive/id/draft-mbelshe-httpbis-spdy-00.txt section 2.6.10 +var headerSep = []byte{0} + +func readInt24(b []byte) int { + _ = b[2] // bounds check hint to compiler; see golang.org/issue/14808 + return int(b[0])<<16 | int(b[1])<<8 | int(b[2]) +} diff --git a/k8s-operator/session-recording/spdy/frame_test.go b/k8s-operator/session-recording/spdy/frame_test.go new file mode 100644 index 000000000..c6aa4cf01 --- /dev/null +++ b/k8s-operator/session-recording/spdy/frame_test.go @@ -0,0 +1,293 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build !plan9 + +package spdy + +import ( + "bytes" + "compress/zlib" + "encoding/binary" + "io" + "net/http" + "reflect" + "strings" + "testing" + + "github.com/google/go-cmp/cmp" + "go.uber.org/zap" +) + +func Test_spdyFrame_Parse(t *testing.T) { + zl, err := zap.NewDevelopment() + if err != nil { + t.Fatal(err) + } + tests := []struct { + name string + gotBytes []byte + wantFrame spdyFrame + wantOk bool + wantErr bool + }{ + { + name: "control_frame_syn_stream", + gotBytes: []byte{0x80, 0x3, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0}, + wantFrame: spdyFrame{ + Version: 3, + Type: SYN_STREAM, + Ctrl: true, + Raw: []byte{0x80, 0x3, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0}, + Payload: []byte{}, + }, + wantOk: true, + }, + { + name: "control_frame_syn_reply", + gotBytes: []byte{0x80, 0x3, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0}, + wantFrame: spdyFrame{ + Ctrl: true, + Version: 3, + Type: SYN_REPLY, + Raw: []byte{0x80, 0x3, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0}, + Payload: []byte{}, + }, + wantOk: true, + }, + { + name: "control_frame_headers", + gotBytes: []byte{0x80, 0x3, 0x0, 0x8, 0x0, 0x0, 0x0, 0x0}, + wantFrame: spdyFrame{ + Ctrl: true, + Version: 3, + Type: 8, + Raw: []byte{0x80, 0x3, 0x0, 0x8, 0x0, 0x0, 0x0, 0x0}, + Payload: []byte{}, + }, + wantOk: true, + }, + { + name: "data_frame_stream_id_5", + gotBytes: []byte{0x0, 0x0, 0x0, 0x5, 0x0, 0x0, 0x0, 0x0}, + wantFrame: spdyFrame{ + Payload: []byte{}, + StreamID: 5, + Raw: []byte{0x0, 0x0, 0x0, 0x5, 0x0, 0x0, 0x0, 0x0}, + }, + wantOk: true, + }, + { + name: "frame_with_incomplete_header", + gotBytes: []byte{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, + }, + { + name: "frame_with_incomplete_payload", + gotBytes: []byte{0x0, 0x0, 0x0, 0x5, 0x0, 0x0, 0x0, 0x2}, // header specifies payload length of 2 + }, + { + name: "control_bit_set_not_spdy_frame", + gotBytes: []byte{0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, // header specifies payload length of 2 + wantErr: true, + }, + { + name: "control_bit_not_set_not_spdy_frame", + gotBytes: []byte{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, // header specifies payload length of 2 + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + sf := &spdyFrame{} + gotOk, err := sf.Parse(tt.gotBytes, zl.Sugar()) + if (err != nil) != tt.wantErr { + t.Errorf("spdyFrame.Parse() error = %v, wantErr %v", err, tt.wantErr) + return + } + if gotOk != tt.wantOk { + t.Errorf("spdyFrame.Parse() = %v, want %v", gotOk, tt.wantOk) + } + if diff := cmp.Diff(*sf, tt.wantFrame); diff != "" { + t.Errorf("Unexpected SPDY frame (-got +want):\n%s", diff) + } + }) + } +} + +func Test_spdyFrame_parseHeaders(t *testing.T) { + zl, err := zap.NewDevelopment() + if err != nil { + t.Fatal(err) + } + tests := []struct { + name string + isCtrl bool + payload []byte + typ ControlFrameType + wantHeader http.Header + wantErr bool + }{ + { + name: "syn_stream_with_header", + payload: payload(t, map[string]string{"Streamtype": "stdin"}, SYN_STREAM, 1), + typ: SYN_STREAM, + isCtrl: true, + wantHeader: header(map[string]string{"Streamtype": "stdin"}), + }, + { + name: "syn_ping", + payload: payload(t, nil, SYN_PING, 0), + typ: SYN_PING, + isCtrl: true, + }, + { + name: "syn_reply_headers", + payload: payload(t, map[string]string{"foo": "bar", "bar": "baz"}, SYN_REPLY, 0), + typ: SYN_REPLY, + isCtrl: true, + wantHeader: header(map[string]string{"foo": "bar", "bar": "baz"}), + }, + { + name: "syn_reply_no_headers", + payload: payload(t, nil, SYN_REPLY, 0), + typ: SYN_REPLY, + isCtrl: true, + }, + { + name: "syn_stream_too_short_payload", + payload: []byte{0, 1, 2, 3, 4}, + typ: SYN_STREAM, + isCtrl: true, + wantErr: true, + }, + { + name: "syn_reply_too_short_payload", + payload: []byte{0, 1, 2}, + typ: SYN_REPLY, + isCtrl: true, + wantErr: true, + }, + { + name: "syn_ping_too_short_payload", + payload: []byte{0, 1, 2}, + typ: SYN_PING, + isCtrl: true, + wantErr: true, + }, + { + name: "not_a_control_frame", + payload: []byte{0, 1, 2, 3}, + typ: SYN_PING, + wantErr: true, + }, + } + for _, tt := range tests { + var reader zlibReader + t.Run(tt.name, func(t *testing.T) { + sf := &spdyFrame{ + Ctrl: tt.isCtrl, + Type: tt.typ, + Payload: tt.payload, + } + gotHeader, err := sf.parseHeaders(&reader, zl.Sugar()) + if (err != nil) != tt.wantErr { + t.Errorf("spdyFrame.parseHeaders() error = %v, wantErr %v", err, tt.wantErr) + } + if !reflect.DeepEqual(gotHeader, tt.wantHeader) { + t.Errorf("spdyFrame.parseHeaders() = %v, want %v", gotHeader, tt.wantHeader) + } + }) + } +} + +// payload takes a control frame type and a map with 0 or more header keys and +// values and returns a SPDY control frame payload with the header as SPDY zlib +// compressed header name/value block. The payload is padded with arbitrary +// bytes to ensure the header name/value block is in the correct position for +// the frame type. +func payload(t *testing.T, headerM map[string]string, typ ControlFrameType, streamID int) []byte { + t.Helper() + + buf := bytes.NewBuffer([]byte{}) + writeControlFramePayloadBeforeHeaders(t, buf, typ, streamID) + if len(headerM) == 0 { + return buf.Bytes() + } + + w, err := zlib.NewWriterLevelDict(buf, zlib.BestCompression, spdyTxtDictionary) + if err != nil { + t.Fatalf("error creating new zlib writer: %v", err) + } + if len(headerM) != 0 { + writeHeaderValueBlock(t, w, headerM) + } + if err != nil { + t.Fatalf("error writing headers: %v", err) + } + w.Flush() + return buf.Bytes() +} + +// writeControlFramePayloadBeforeHeaders writes to w N bytes, N being the number +// of bytes that control frame payload for that control frame is required to +// contain before the name/value header block. +func writeControlFramePayloadBeforeHeaders(t *testing.T, w io.Writer, typ ControlFrameType, streamID int) { + t.Helper() + switch typ { + case SYN_STREAM: + // needs 10 bytes in payload before any headers + if err := binary.Write(w, binary.BigEndian, uint32(streamID)); err != nil { + t.Fatalf("writing streamID: %v", err) + } + if err := binary.Write(w, binary.BigEndian, [6]byte{0}); err != nil { + t.Fatalf("writing payload: %v", err) + } + case SYN_REPLY: + // needs 4 bytes in payload before any headers + if err := binary.Write(w, binary.BigEndian, uint32(0)); err != nil { + t.Fatalf("writing payload: %v", err) + } + case SYN_PING: + // needs 4 bytes in payload + if err := binary.Write(w, binary.BigEndian, uint32(0)); err != nil { + t.Fatalf("writing payload: %v", err) + } + default: + t.Fatalf("unexpected frame type: %v", typ) + } +} + +// writeHeaderValue block takes http.Header and zlib writer, writes the headers +// as SPDY zlib compressed bytes to the writer. +// Adopted from https://github.com/moby/spdystream/blob/v0.2.0/spdy/write.go#L171-L198 (which is also what Kubernetes uses). +func writeHeaderValueBlock(t *testing.T, w io.Writer, headerM map[string]string) { + t.Helper() + h := header(headerM) + if err := binary.Write(w, binary.BigEndian, uint32(len(h))); err != nil { + t.Fatalf("error writing header block length: %v", err) + } + for name, values := range h { + if err := binary.Write(w, binary.BigEndian, uint32(len(name))); err != nil { + t.Fatalf("error writing name length for name %q: %v", name, err) + } + name = strings.ToLower(name) + if _, err := io.WriteString(w, name); err != nil { + t.Fatalf("error writing name %q: %v", name, err) + } + v := strings.Join(values, string(headerSep)) + if err := binary.Write(w, binary.BigEndian, uint32(len(v))); err != nil { + t.Fatalf("error writing value length for value %q: %v", v, err) + } + if _, err := io.WriteString(w, v); err != nil { + t.Fatalf("error writing value %q: %v", v, err) + } + } +} + +func header(hs map[string]string) http.Header { + h := make(http.Header, len(hs)) + for key, val := range hs { + h.Add(key, val) + } + return h +} diff --git a/k8s-operator/session-recording/spdy/zlib-reader.go b/k8s-operator/session-recording/spdy/zlib-reader.go new file mode 100644 index 000000000..1eb654be3 --- /dev/null +++ b/k8s-operator/session-recording/spdy/zlib-reader.go @@ -0,0 +1,221 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build !plan9 + +package spdy + +import ( + "bytes" + "compress/zlib" + "io" +) + +// zlibReader contains functionality to parse zlib compressed SPDY data. +// See https://www.ietf.org/archive/id/draft-mbelshe-httpbis-spdy-00.txt section 2.6.10.1 +type zlibReader struct { + io.ReadCloser + underlying io.LimitedReader // zlib compressed SPDY data +} + +// Read decompresses zlibReader's underlying zlib compressed SPDY data and reads +// it into b. +func (z *zlibReader) Read(b []byte) (int, error) { + if z.ReadCloser == nil { + r, err := zlib.NewReaderDict(&z.underlying, spdyTxtDictionary) + if err != nil { + return 0, err + } + z.ReadCloser = r + } + return z.ReadCloser.Read(b) +} + +// Set sets zlibReader's underlying data. b must be zlib compressed SPDY data. +func (z *zlibReader) Set(b []byte) { + z.underlying.R = bytes.NewReader(b) + z.underlying.N = int64(len(b)) +} + +// spdyTxtDictionary is the dictionary defined in the SPDY spec. +// https://datatracker.ietf.org/doc/html/draft-mbelshe-httpbis-spdy-00#section-2.6.10.1 +var spdyTxtDictionary = []byte{ + 0x00, 0x00, 0x00, 0x07, 0x6f, 0x70, 0x74, 0x69, // - - - - o p t i + 0x6f, 0x6e, 0x73, 0x00, 0x00, 0x00, 0x04, 0x68, // o n s - - - - h + 0x65, 0x61, 0x64, 0x00, 0x00, 0x00, 0x04, 0x70, // e a d - - - - p + 0x6f, 0x73, 0x74, 0x00, 0x00, 0x00, 0x03, 0x70, // o s t - - - - p + 0x75, 0x74, 0x00, 0x00, 0x00, 0x06, 0x64, 0x65, // u t - - - - d e + 0x6c, 0x65, 0x74, 0x65, 0x00, 0x00, 0x00, 0x05, // l e t e - - - - + 0x74, 0x72, 0x61, 0x63, 0x65, 0x00, 0x00, 0x00, // t r a c e - - - + 0x06, 0x61, 0x63, 0x63, 0x65, 0x70, 0x74, 0x00, // - a c c e p t - + 0x00, 0x00, 0x0e, 0x61, 0x63, 0x63, 0x65, 0x70, // - - - a c c e p + 0x74, 0x2d, 0x63, 0x68, 0x61, 0x72, 0x73, 0x65, // t - c h a r s e + 0x74, 0x00, 0x00, 0x00, 0x0f, 0x61, 0x63, 0x63, // t - - - - a c c + 0x65, 0x70, 0x74, 0x2d, 0x65, 0x6e, 0x63, 0x6f, // e p t - e n c o + 0x64, 0x69, 0x6e, 0x67, 0x00, 0x00, 0x00, 0x0f, // d i n g - - - - + 0x61, 0x63, 0x63, 0x65, 0x70, 0x74, 0x2d, 0x6c, // a c c e p t - l + 0x61, 0x6e, 0x67, 0x75, 0x61, 0x67, 0x65, 0x00, // a n g u a g e - + 0x00, 0x00, 0x0d, 0x61, 0x63, 0x63, 0x65, 0x70, // - - - a c c e p + 0x74, 0x2d, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x73, // t - r a n g e s + 0x00, 0x00, 0x00, 0x03, 0x61, 0x67, 0x65, 0x00, // - - - - a g e - + 0x00, 0x00, 0x05, 0x61, 0x6c, 0x6c, 0x6f, 0x77, // - - - a l l o w + 0x00, 0x00, 0x00, 0x0d, 0x61, 0x75, 0x74, 0x68, // - - - - a u t h + 0x6f, 0x72, 0x69, 0x7a, 0x61, 0x74, 0x69, 0x6f, // o r i z a t i o + 0x6e, 0x00, 0x00, 0x00, 0x0d, 0x63, 0x61, 0x63, // n - - - - c a c + 0x68, 0x65, 0x2d, 0x63, 0x6f, 0x6e, 0x74, 0x72, // h e - c o n t r + 0x6f, 0x6c, 0x00, 0x00, 0x00, 0x0a, 0x63, 0x6f, // o l - - - - c o + 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, // n n e c t i o n + 0x00, 0x00, 0x00, 0x0c, 0x63, 0x6f, 0x6e, 0x74, // - - - - c o n t + 0x65, 0x6e, 0x74, 0x2d, 0x62, 0x61, 0x73, 0x65, // e n t - b a s e + 0x00, 0x00, 0x00, 0x10, 0x63, 0x6f, 0x6e, 0x74, // - - - - c o n t + 0x65, 0x6e, 0x74, 0x2d, 0x65, 0x6e, 0x63, 0x6f, // e n t - e n c o + 0x64, 0x69, 0x6e, 0x67, 0x00, 0x00, 0x00, 0x10, // d i n g - - - - + 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x2d, // c o n t e n t - + 0x6c, 0x61, 0x6e, 0x67, 0x75, 0x61, 0x67, 0x65, // l a n g u a g e + 0x00, 0x00, 0x00, 0x0e, 0x63, 0x6f, 0x6e, 0x74, // - - - - c o n t + 0x65, 0x6e, 0x74, 0x2d, 0x6c, 0x65, 0x6e, 0x67, // e n t - l e n g + 0x74, 0x68, 0x00, 0x00, 0x00, 0x10, 0x63, 0x6f, // t h - - - - c o + 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x2d, 0x6c, 0x6f, // n t e n t - l o + 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x00, 0x00, // c a t i o n - - + 0x00, 0x0b, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, // - - c o n t e n + 0x74, 0x2d, 0x6d, 0x64, 0x35, 0x00, 0x00, 0x00, // t - m d 5 - - - + 0x0d, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, // - c o n t e n t + 0x2d, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x00, 0x00, // - r a n g e - - + 0x00, 0x0c, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, // - - c o n t e n + 0x74, 0x2d, 0x74, 0x79, 0x70, 0x65, 0x00, 0x00, // t - t y p e - - + 0x00, 0x04, 0x64, 0x61, 0x74, 0x65, 0x00, 0x00, // - - d a t e - - + 0x00, 0x04, 0x65, 0x74, 0x61, 0x67, 0x00, 0x00, // - - e t a g - - + 0x00, 0x06, 0x65, 0x78, 0x70, 0x65, 0x63, 0x74, // - - e x p e c t + 0x00, 0x00, 0x00, 0x07, 0x65, 0x78, 0x70, 0x69, // - - - - e x p i + 0x72, 0x65, 0x73, 0x00, 0x00, 0x00, 0x04, 0x66, // r e s - - - - f + 0x72, 0x6f, 0x6d, 0x00, 0x00, 0x00, 0x04, 0x68, // r o m - - - - h + 0x6f, 0x73, 0x74, 0x00, 0x00, 0x00, 0x08, 0x69, // o s t - - - - i + 0x66, 0x2d, 0x6d, 0x61, 0x74, 0x63, 0x68, 0x00, // f - m a t c h - + 0x00, 0x00, 0x11, 0x69, 0x66, 0x2d, 0x6d, 0x6f, // - - - i f - m o + 0x64, 0x69, 0x66, 0x69, 0x65, 0x64, 0x2d, 0x73, // d i f i e d - s + 0x69, 0x6e, 0x63, 0x65, 0x00, 0x00, 0x00, 0x0d, // i n c e - - - - + 0x69, 0x66, 0x2d, 0x6e, 0x6f, 0x6e, 0x65, 0x2d, // i f - n o n e - + 0x6d, 0x61, 0x74, 0x63, 0x68, 0x00, 0x00, 0x00, // m a t c h - - - + 0x08, 0x69, 0x66, 0x2d, 0x72, 0x61, 0x6e, 0x67, // - i f - r a n g + 0x65, 0x00, 0x00, 0x00, 0x13, 0x69, 0x66, 0x2d, // e - - - - i f - + 0x75, 0x6e, 0x6d, 0x6f, 0x64, 0x69, 0x66, 0x69, // u n m o d i f i + 0x65, 0x64, 0x2d, 0x73, 0x69, 0x6e, 0x63, 0x65, // e d - s i n c e + 0x00, 0x00, 0x00, 0x0d, 0x6c, 0x61, 0x73, 0x74, // - - - - l a s t + 0x2d, 0x6d, 0x6f, 0x64, 0x69, 0x66, 0x69, 0x65, // - m o d i f i e + 0x64, 0x00, 0x00, 0x00, 0x08, 0x6c, 0x6f, 0x63, // d - - - - l o c + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x00, 0x00, 0x00, // a t i o n - - - + 0x0c, 0x6d, 0x61, 0x78, 0x2d, 0x66, 0x6f, 0x72, // - m a x - f o r + 0x77, 0x61, 0x72, 0x64, 0x73, 0x00, 0x00, 0x00, // w a r d s - - - + 0x06, 0x70, 0x72, 0x61, 0x67, 0x6d, 0x61, 0x00, // - p r a g m a - + 0x00, 0x00, 0x12, 0x70, 0x72, 0x6f, 0x78, 0x79, // - - - p r o x y + 0x2d, 0x61, 0x75, 0x74, 0x68, 0x65, 0x6e, 0x74, // - a u t h e n t + 0x69, 0x63, 0x61, 0x74, 0x65, 0x00, 0x00, 0x00, // i c a t e - - - + 0x13, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2d, 0x61, // - p r o x y - a + 0x75, 0x74, 0x68, 0x6f, 0x72, 0x69, 0x7a, 0x61, // u t h o r i z a + 0x74, 0x69, 0x6f, 0x6e, 0x00, 0x00, 0x00, 0x05, // t i o n - - - - + 0x72, 0x61, 0x6e, 0x67, 0x65, 0x00, 0x00, 0x00, // r a n g e - - - + 0x07, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x72, // - r e f e r e r + 0x00, 0x00, 0x00, 0x0b, 0x72, 0x65, 0x74, 0x72, // - - - - r e t r + 0x79, 0x2d, 0x61, 0x66, 0x74, 0x65, 0x72, 0x00, // y - a f t e r - + 0x00, 0x00, 0x06, 0x73, 0x65, 0x72, 0x76, 0x65, // - - - s e r v e + 0x72, 0x00, 0x00, 0x00, 0x02, 0x74, 0x65, 0x00, // r - - - - t e - + 0x00, 0x00, 0x07, 0x74, 0x72, 0x61, 0x69, 0x6c, // - - - t r a i l + 0x65, 0x72, 0x00, 0x00, 0x00, 0x11, 0x74, 0x72, // e r - - - - t r + 0x61, 0x6e, 0x73, 0x66, 0x65, 0x72, 0x2d, 0x65, // a n s f e r - e + 0x6e, 0x63, 0x6f, 0x64, 0x69, 0x6e, 0x67, 0x00, // n c o d i n g - + 0x00, 0x00, 0x07, 0x75, 0x70, 0x67, 0x72, 0x61, // - - - u p g r a + 0x64, 0x65, 0x00, 0x00, 0x00, 0x0a, 0x75, 0x73, // d e - - - - u s + 0x65, 0x72, 0x2d, 0x61, 0x67, 0x65, 0x6e, 0x74, // e r - a g e n t + 0x00, 0x00, 0x00, 0x04, 0x76, 0x61, 0x72, 0x79, // - - - - v a r y + 0x00, 0x00, 0x00, 0x03, 0x76, 0x69, 0x61, 0x00, // - - - - v i a - + 0x00, 0x00, 0x07, 0x77, 0x61, 0x72, 0x6e, 0x69, // - - - w a r n i + 0x6e, 0x67, 0x00, 0x00, 0x00, 0x10, 0x77, 0x77, // n g - - - - w w + 0x77, 0x2d, 0x61, 0x75, 0x74, 0x68, 0x65, 0x6e, // w - a u t h e n + 0x74, 0x69, 0x63, 0x61, 0x74, 0x65, 0x00, 0x00, // t i c a t e - - + 0x00, 0x06, 0x6d, 0x65, 0x74, 0x68, 0x6f, 0x64, // - - m e t h o d + 0x00, 0x00, 0x00, 0x03, 0x67, 0x65, 0x74, 0x00, // - - - - g e t - + 0x00, 0x00, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, // - - - s t a t u + 0x73, 0x00, 0x00, 0x00, 0x06, 0x32, 0x30, 0x30, // s - - - - 2 0 0 + 0x20, 0x4f, 0x4b, 0x00, 0x00, 0x00, 0x07, 0x76, // - O K - - - - v + 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x00, 0x00, // e r s i o n - - + 0x00, 0x08, 0x48, 0x54, 0x54, 0x50, 0x2f, 0x31, // - - H T T P - 1 + 0x2e, 0x31, 0x00, 0x00, 0x00, 0x03, 0x75, 0x72, // - 1 - - - - u r + 0x6c, 0x00, 0x00, 0x00, 0x06, 0x70, 0x75, 0x62, // l - - - - p u b + 0x6c, 0x69, 0x63, 0x00, 0x00, 0x00, 0x0a, 0x73, // l i c - - - - s + 0x65, 0x74, 0x2d, 0x63, 0x6f, 0x6f, 0x6b, 0x69, // e t - c o o k i + 0x65, 0x00, 0x00, 0x00, 0x0a, 0x6b, 0x65, 0x65, // e - - - - k e e + 0x70, 0x2d, 0x61, 0x6c, 0x69, 0x76, 0x65, 0x00, // p - a l i v e - + 0x00, 0x00, 0x06, 0x6f, 0x72, 0x69, 0x67, 0x69, // - - - o r i g i + 0x6e, 0x31, 0x30, 0x30, 0x31, 0x30, 0x31, 0x32, // n 1 0 0 1 0 1 2 + 0x30, 0x31, 0x32, 0x30, 0x32, 0x32, 0x30, 0x35, // 0 1 2 0 2 2 0 5 + 0x32, 0x30, 0x36, 0x33, 0x30, 0x30, 0x33, 0x30, // 2 0 6 3 0 0 3 0 + 0x32, 0x33, 0x30, 0x33, 0x33, 0x30, 0x34, 0x33, // 2 3 0 3 3 0 4 3 + 0x30, 0x35, 0x33, 0x30, 0x36, 0x33, 0x30, 0x37, // 0 5 3 0 6 3 0 7 + 0x34, 0x30, 0x32, 0x34, 0x30, 0x35, 0x34, 0x30, // 4 0 2 4 0 5 4 0 + 0x36, 0x34, 0x30, 0x37, 0x34, 0x30, 0x38, 0x34, // 6 4 0 7 4 0 8 4 + 0x30, 0x39, 0x34, 0x31, 0x30, 0x34, 0x31, 0x31, // 0 9 4 1 0 4 1 1 + 0x34, 0x31, 0x32, 0x34, 0x31, 0x33, 0x34, 0x31, // 4 1 2 4 1 3 4 1 + 0x34, 0x34, 0x31, 0x35, 0x34, 0x31, 0x36, 0x34, // 4 4 1 5 4 1 6 4 + 0x31, 0x37, 0x35, 0x30, 0x32, 0x35, 0x30, 0x34, // 1 7 5 0 2 5 0 4 + 0x35, 0x30, 0x35, 0x32, 0x30, 0x33, 0x20, 0x4e, // 5 0 5 2 0 3 - N + 0x6f, 0x6e, 0x2d, 0x41, 0x75, 0x74, 0x68, 0x6f, // o n - A u t h o + 0x72, 0x69, 0x74, 0x61, 0x74, 0x69, 0x76, 0x65, // r i t a t i v e + 0x20, 0x49, 0x6e, 0x66, 0x6f, 0x72, 0x6d, 0x61, // - I n f o r m a + 0x74, 0x69, 0x6f, 0x6e, 0x32, 0x30, 0x34, 0x20, // t i o n 2 0 4 - + 0x4e, 0x6f, 0x20, 0x43, 0x6f, 0x6e, 0x74, 0x65, // N o - C o n t e + 0x6e, 0x74, 0x33, 0x30, 0x31, 0x20, 0x4d, 0x6f, // n t 3 0 1 - M o + 0x76, 0x65, 0x64, 0x20, 0x50, 0x65, 0x72, 0x6d, // v e d - P e r m + 0x61, 0x6e, 0x65, 0x6e, 0x74, 0x6c, 0x79, 0x34, // a n e n t l y 4 + 0x30, 0x30, 0x20, 0x42, 0x61, 0x64, 0x20, 0x52, // 0 0 - B a d - R + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x34, 0x30, // e q u e s t 4 0 + 0x31, 0x20, 0x55, 0x6e, 0x61, 0x75, 0x74, 0x68, // 1 - U n a u t h + 0x6f, 0x72, 0x69, 0x7a, 0x65, 0x64, 0x34, 0x30, // o r i z e d 4 0 + 0x33, 0x20, 0x46, 0x6f, 0x72, 0x62, 0x69, 0x64, // 3 - F o r b i d + 0x64, 0x65, 0x6e, 0x34, 0x30, 0x34, 0x20, 0x4e, // d e n 4 0 4 - N + 0x6f, 0x74, 0x20, 0x46, 0x6f, 0x75, 0x6e, 0x64, // o t - F o u n d + 0x35, 0x30, 0x30, 0x20, 0x49, 0x6e, 0x74, 0x65, // 5 0 0 - I n t e + 0x72, 0x6e, 0x61, 0x6c, 0x20, 0x53, 0x65, 0x72, // r n a l - S e r + 0x76, 0x65, 0x72, 0x20, 0x45, 0x72, 0x72, 0x6f, // v e r - E r r o + 0x72, 0x35, 0x30, 0x31, 0x20, 0x4e, 0x6f, 0x74, // r 5 0 1 - N o t + 0x20, 0x49, 0x6d, 0x70, 0x6c, 0x65, 0x6d, 0x65, // - I m p l e m e + 0x6e, 0x74, 0x65, 0x64, 0x35, 0x30, 0x33, 0x20, // n t e d 5 0 3 - + 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x20, // S e r v i c e - + 0x55, 0x6e, 0x61, 0x76, 0x61, 0x69, 0x6c, 0x61, // U n a v a i l a + 0x62, 0x6c, 0x65, 0x4a, 0x61, 0x6e, 0x20, 0x46, // b l e J a n - F + 0x65, 0x62, 0x20, 0x4d, 0x61, 0x72, 0x20, 0x41, // e b - M a r - A + 0x70, 0x72, 0x20, 0x4d, 0x61, 0x79, 0x20, 0x4a, // p r - M a y - J + 0x75, 0x6e, 0x20, 0x4a, 0x75, 0x6c, 0x20, 0x41, // u n - J u l - A + 0x75, 0x67, 0x20, 0x53, 0x65, 0x70, 0x74, 0x20, // u g - S e p t - + 0x4f, 0x63, 0x74, 0x20, 0x4e, 0x6f, 0x76, 0x20, // O c t - N o v - + 0x44, 0x65, 0x63, 0x20, 0x30, 0x30, 0x3a, 0x30, // D e c - 0 0 - 0 + 0x30, 0x3a, 0x30, 0x30, 0x20, 0x4d, 0x6f, 0x6e, // 0 - 0 0 - M o n + 0x2c, 0x20, 0x54, 0x75, 0x65, 0x2c, 0x20, 0x57, // - - T u e - - W + 0x65, 0x64, 0x2c, 0x20, 0x54, 0x68, 0x75, 0x2c, // e d - - T h u - + 0x20, 0x46, 0x72, 0x69, 0x2c, 0x20, 0x53, 0x61, // - F r i - - S a + 0x74, 0x2c, 0x20, 0x53, 0x75, 0x6e, 0x2c, 0x20, // t - - S u n - - + 0x47, 0x4d, 0x54, 0x63, 0x68, 0x75, 0x6e, 0x6b, // G M T c h u n k + 0x65, 0x64, 0x2c, 0x74, 0x65, 0x78, 0x74, 0x2f, // e d - t e x t - + 0x68, 0x74, 0x6d, 0x6c, 0x2c, 0x69, 0x6d, 0x61, // h t m l - i m a + 0x67, 0x65, 0x2f, 0x70, 0x6e, 0x67, 0x2c, 0x69, // g e - p n g - i + 0x6d, 0x61, 0x67, 0x65, 0x2f, 0x6a, 0x70, 0x67, // m a g e - j p g + 0x2c, 0x69, 0x6d, 0x61, 0x67, 0x65, 0x2f, 0x67, // - i m a g e - g + 0x69, 0x66, 0x2c, 0x61, 0x70, 0x70, 0x6c, 0x69, // i f - a p p l i + 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x78, // c a t i o n - x + 0x6d, 0x6c, 0x2c, 0x61, 0x70, 0x70, 0x6c, 0x69, // m l - a p p l i + 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x78, // c a t i o n - x + 0x68, 0x74, 0x6d, 0x6c, 0x2b, 0x78, 0x6d, 0x6c, // h t m l - x m l + 0x2c, 0x74, 0x65, 0x78, 0x74, 0x2f, 0x70, 0x6c, // - t e x t - p l + 0x61, 0x69, 0x6e, 0x2c, 0x74, 0x65, 0x78, 0x74, // a i n - t e x t + 0x2f, 0x6a, 0x61, 0x76, 0x61, 0x73, 0x63, 0x72, // - j a v a s c r + 0x69, 0x70, 0x74, 0x2c, 0x70, 0x75, 0x62, 0x6c, // i p t - p u b l + 0x69, 0x63, 0x70, 0x72, 0x69, 0x76, 0x61, 0x74, // i c p r i v a t + 0x65, 0x6d, 0x61, 0x78, 0x2d, 0x61, 0x67, 0x65, // e m a x - a g e + 0x3d, 0x67, 0x7a, 0x69, 0x70, 0x2c, 0x64, 0x65, // - g z i p - d e + 0x66, 0x6c, 0x61, 0x74, 0x65, 0x2c, 0x73, 0x64, // f l a t e - s d + 0x63, 0x68, 0x63, 0x68, 0x61, 0x72, 0x73, 0x65, // c h c h a r s e + 0x74, 0x3d, 0x75, 0x74, 0x66, 0x2d, 0x38, 0x63, // t - u t f - 8 c + 0x68, 0x61, 0x72, 0x73, 0x65, 0x74, 0x3d, 0x69, // h a r s e t - i + 0x73, 0x6f, 0x2d, 0x38, 0x38, 0x35, 0x39, 0x2d, // s o - 8 8 5 9 - + 0x31, 0x2c, 0x75, 0x74, 0x66, 0x2d, 0x2c, 0x2a, // 1 - u t f - - - + 0x2c, 0x65, 0x6e, 0x71, 0x3d, 0x30, 0x2e, // - e n q - 0 - +} diff --git a/k8s-operator/session-recording/tsrecorder/header.go b/k8s-operator/session-recording/tsrecorder/header.go new file mode 100644 index 000000000..45c50ca1e --- /dev/null +++ b/k8s-operator/session-recording/tsrecorder/header.go @@ -0,0 +1,54 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build !plan9 + +package tsrecorder + +import "tailscale.com/tailcfg" + +// CastHeader is the asciicast header to be sent to the recorder at the start of +// the recording of a session. +// https://docs.asciinema.org/manual/asciicast/v2/#header +type CastHeader struct { + // Version is the asciinema file format version. + Version int `json:"version"` + + // Width is the terminal width in characters. + Width int `json:"width"` + + // Height is the terminal height in characters. + Height int `json:"height"` + + // Timestamp is the unix timestamp of when the recording started. + Timestamp int64 `json:"timestamp"` + + // Tailscale-specific fields: SrcNode is the full MagicDNS name of the + // tailnet node originating the connection, without the trailing dot. + SrcNode string `json:"srcNode"` + + // SrcNodeID is the node ID of the tailnet node originating the connection. + SrcNodeID tailcfg.StableNodeID `json:"srcNodeID"` + + // SrcNodeTags is the list of tags on the node originating the connection (if any). + SrcNodeTags []string `json:"srcNodeTags,omitempty"` + + // SrcNodeUserID is the user ID of the node originating the connection (if not tagged). + SrcNodeUserID tailcfg.UserID `json:"srcNodeUserID,omitempty"` // if not tagged + + // SrcNodeUser is the LoginName of the node originating the connection (if not tagged). + SrcNodeUser string `json:"srcNodeUser,omitempty"` + + Command string + + // Kubernetes-specific fields: + Kubernetes *Kubernetes `json:"kubernetes,omitempty"` +} + +// Kubernetes contains 'kubectl exec' session specific information for +// tsrecorder. +type Kubernetes struct { + PodName string + Namespace string + Container string +} diff --git a/k8s-operator/session-recording/tsrecorder/tsrecorder.go b/k8s-operator/session-recording/tsrecorder/tsrecorder.go new file mode 100644 index 000000000..4ce78a882 --- /dev/null +++ b/k8s-operator/session-recording/tsrecorder/tsrecorder.go @@ -0,0 +1,104 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build !plan9 + +// Package tsrecorder contains functionality to send recorded kubectl-exec +// sessions to tsrecorder. +package tsrecorder + +import ( + "encoding/json" + "fmt" + "io" + "sync" + "time" + + "github.com/pkg/errors" + "tailscale.com/tstime" +) + +func New(conn io.WriteCloser, clock tstime.Clock, start time.Time, failOpen bool) *Client { + return &Client{ + start: start, + clock: clock, + conn: conn, + failOpen: failOpen, + } +} + +// Client knows how to send the provided bytes to the configured tsrecorder +// instance in asciinema format. +type Client struct { + start time.Time + clock tstime.Clock + + // failOpen specifies whether the session should be allowed to + // continue if writing to the recording fails. + failOpen bool + + // backOff is set to true if we've failed open and should stop + // attempting to write to tsrecorder. + backOff bool + + mu sync.Mutex // guards writes to conn + conn io.WriteCloser // connection to a tsrecorder instance +} + +// Write appends timestamp to the provided bytes and sends them to the +// configured tsrecorder. +func (c *Client) Write(p []byte) (err error) { + if len(p) == 0 { + return nil + } + if c.backOff { + return nil + } + j, err := json.Marshal([]any{ + c.clock.Now().Sub(c.start).Seconds(), + "o", + string(p), + }) + if err != nil { + return fmt.Errorf("error marhalling payload: %w", err) + } + j = append(j, '\n') + if err := c.WriteCastLine(j); err != nil { + if !c.failOpen { + return fmt.Errorf("error writing payload to recorder: %w", err) + } + c.backOff = true + } + return nil +} + +func (c *Client) Close() error { + c.mu.Lock() + defer c.mu.Unlock() + if c.conn == nil { + return nil + } + err := c.conn.Close() + c.conn = nil + return err +} + +// writeCastLine sends bytes to the tsrecorder. The bytes should be in +// asciinema format. +func (c *Client) WriteCastLine(j []byte) error { + c.mu.Lock() + defer c.mu.Unlock() + if c.conn == nil { + return errors.New("recorder closed") + } + _, err := c.conn.Write(j) + if err != nil { + return fmt.Errorf("recorder write error: %w", err) + } + return nil +} + +type ResizeMsg struct { + Width int `json:"width"` + Height int `json:"height"` +} diff --git a/k8s-operator/session-recording/ws/conn.go b/k8s-operator/session-recording/ws/conn.go new file mode 100644 index 000000000..88bbc2a7f --- /dev/null +++ b/k8s-operator/session-recording/ws/conn.go @@ -0,0 +1,244 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build !plan9 + +// package ws has functionality to parse 'kubectl exec' sessions streamed using +// WebSockets protocol. +package ws + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io" + "net" + "sync" + + "go.uber.org/zap" + "k8s.io/apimachinery/pkg/util/remotecommand" + "tailscale.com/k8s-operator/session-recording/tsrecorder" + "tailscale.com/util/multierr" +) + +// New returns a wrapper around net.Conn that intercepts reads and writes for a +// websocket streaming session over the provided net.Conn, parses the data as +// websocket messages and sends message payloads for STDIN/STDOUT streams to a +// tsrecorder instance using the provided client. Caller must ensure that the +// session is streamed using WebSockets protocol. +func New(c net.Conn, rec *tsrecorder.Client, ch tsrecorder.CastHeader, log *zap.SugaredLogger) net.Conn { + return &conn{ + Conn: c, + rec: rec, + ch: ch, + log: log, + } +} + +// conn is a wrapper around net.Conn. It reads the bytestream +// for a 'kubectl exec' session, sends session recording data to the configured +// recorder and forwards the raw bytes to the original destination. +// A new conn is created per session. +// conn only knows to how to read a 'kubectl exec' session that is streamed using WebSocket protocol. +// https://www.rfc-editor.org/rfc/rfc6455 +type conn struct { + net.Conn + // rec knows how to send data to a tsrecorder instance. + rec *tsrecorder.Client + // ch is the asiinema CastHeader for a session. + ch tsrecorder.CastHeader + log *zap.SugaredLogger + + rmu sync.Mutex // sequences reads + // currentReadMsg contains parsed contents of a websocket binary data message that + // is currently being read from the underlying net.Conn. + currentReadMsg *message + // readBuf contains bytes for a currently parsed binary data message + // read from the underlying conn. If the message is masked, it is + // unmasked in place, so having this buffer allows us to avoid modifying + // the original byte array. + readBuf bytes.Buffer + + wmu sync.Mutex // sequences writes + writeCastHeaderOnce sync.Once + closed bool + // writeBuf contains bytes for a currently parsed binary data message + // being written to the underlying conn. If the message is masked, it is + // unmasked in place, so having this buffer allows us to avoid modifying + // the original byte array. + writeBuf bytes.Buffer + // currentWriteMsg contains parsed contents of a websocket binary data message that + // is currently being written to the underlying net.Conn. + currentWriteMsg *message +} + +// Read reads bytes from the original connection and parses them as websocket +// message fragments. If the message is for the resize stream, sets the width +// and height of the CastHeader for this connection. +// The fragment can be incomplete. +func (c *conn) Read(b []byte) (int, error) { + c.rmu.Lock() + defer c.rmu.Unlock() + n, err := c.Conn.Read(b) + if err != nil { + // It seems that we sometimes get a wrapped io.EOF, but the + // caller checks for io.EOF with ==. + if errors.Is(err, io.EOF) { + err = io.EOF + } + return 0, err + } + + typ := messageType(opcode(b)) + if typ == noOpcode && c.currentReadMsg != nil && !c.currentReadMsg.isFinalized { // subsequent fragment + typ = c.currentReadMsg.typ + } + + // A control message can not be fragmented and we are not interested in + // these messages. Just return. + if isControlMessage(typ) { + return n, nil + } + + // The only data message type that Kubernetes supports is binary message. + // If we received another message type, return and let the API server close the connection. + // https://github.com/kubernetes/client-go/blob/release-1.30/tools/remotecommand/websocket.go#L281 + if typ != binaryMessage { + c.log.Info("[unexpected] received a data message with a type that is not binary message type %d", typ) + return n, nil + } + if _, err := c.readBuf.Write(b[:n]); err != nil { + return 0, fmt.Errorf("[unexpected] error writing message contents to read buffer: %w", err) + } + + readMsg := &message{typ: typ} // start a new message... + // ... or pick up an already started one if the previous fragment was not final. + if c.currentReadMsg != nil && !c.currentReadMsg.isFinalized { + readMsg = c.currentReadMsg + } + + ok, err := readMsg.Parse(c.readBuf.Bytes(), c.log) + if err != nil { + return 0, fmt.Errorf("error parsing message: %v", err) + } + if !ok { // incomplete fragment + return n, nil + } + c.readBuf.Next(len(readMsg.raw)) + + if readMsg.isFinalized { + // Stream IDs for websocket streams are static. + // https://github.com/kubernetes/client-go/blob/v0.30.0-rc.1/tools/remotecommand/websocket.go#L218 + if readMsg.streamID.Load() == remotecommand.StreamResize { + var err error + var msg tsrecorder.ResizeMsg + if err = json.Unmarshal(readMsg.payload, &msg); err != nil { + return 0, fmt.Errorf("error umarshalling resize message: %w", err) + } + c.ch.Width = msg.Width + c.ch.Height = msg.Height + } + } + c.currentReadMsg = readMsg + return n, err +} + +// Write parses the written bytes as WebSocket message fragment. If the message +// is for stdout or stderr streams, it is written to the configured tsrecorder. +// A message fragment can be incomplete. +func (c *conn) Write(b []byte) (int, error) { + c.wmu.Lock() + defer c.wmu.Unlock() + + typ := messageType(opcode(b)) + // If we are in process of parsing a message fragment, the received + // bytes are not structured as a message fragment and can not be used to + // determine a message fragment. + if len(c.writeBuf.Bytes()) > 0 { // buffer contains previous incomplete fragment + typ = c.currentWriteMsg.typ + } + + if isControlMessage(typ) { + n, err := c.Conn.Write(b) + return n, err + } + + if _, err := c.writeBuf.Write(b); err != nil { + c.log.Errorf("write: error writing to write buf: %v", err) + return 0, fmt.Errorf("[unexpected] error writing to internal write buffer: %w", err) + } + + writeMsg := &message{typ: typ} // start a new message... + // ... or continue the existing one if it has not been finalized. + if c.currentWriteMsg != nil && !c.currentWriteMsg.isFinalized { + writeMsg = c.currentWriteMsg + } + + ok, err := writeMsg.Parse(c.writeBuf.Bytes(), c.log) + if err != nil { + c.log.Errorf("write: parsing a message errored: %v", err) + return 0, fmt.Errorf("write: error parsing message: %v", err) + } + c.currentWriteMsg = writeMsg + if !ok { // incomplete fragment + return len(b), nil + } + c.writeBuf.Next(len(writeMsg.raw)) // advance frame + + if len(writeMsg.payload) != 0 && writeMsg.isFinalized { + if writeMsg.streamID.Load() == remotecommand.StreamStdOut || writeMsg.streamID.Load() == remotecommand.StreamStdErr { + var err error + c.writeCastHeaderOnce.Do(func() { + var j []byte + j, err = json.Marshal(c.ch) + if err != nil { + c.log.Infof("error marhsalling conn: %v", err) + return + } + j = append(j, '\n') + err = c.rec.WriteCastLine(j) + if err != nil { + c.log.Errorf("received error from recorder: %v", err) + } + }) + if err != nil { + return 0, fmt.Errorf("error writing CastHeader: %w", err) + } + if err := c.rec.Write(writeMsg.payload); err != nil { + return 0, fmt.Errorf("error writing message to recorder: %v", err) + } + } + } + _, err = c.Conn.Write(c.currentWriteMsg.raw) + if err != nil { + c.log.Errorf("write: error writing to conn: %v", err) + } + return len(b), err +} + +func (c *conn) Close() error { + c.wmu.Lock() + defer c.wmu.Unlock() + if c.closed { + return nil + } + // TODO: only do this if this is a normal closure rather than the + // reocrding has failed. + if c.writeBuf.Len() > 0 { + c.Conn.Write(c.writeBuf.Bytes()) + } + c.closed = true + connCloseErr := c.Conn.Close() + recCloseErr := c.rec.Close() + return multierr.New(connCloseErr, recCloseErr) +} + +// opcode reads the websocket message opcode that denotes the message type. +// opcode is contained in bits [4-8] of the message. +// https://www.rfc-editor.org/rfc/rfc6455#section-5.2 +func opcode(b []byte) int { + // 0xf = 00001111; b & 00001111 zeroes out bits [0 - 3] of b + var mask byte = 0xf + return int(b[0] & mask) +} diff --git a/k8s-operator/session-recording/ws/conn_test.go b/k8s-operator/session-recording/ws/conn_test.go new file mode 100644 index 000000000..a64b89c56 --- /dev/null +++ b/k8s-operator/session-recording/ws/conn_test.go @@ -0,0 +1,171 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build !plan9 + +package ws + +import ( + "reflect" + "testing" + + "go.uber.org/zap" + "k8s.io/apimachinery/pkg/util/remotecommand" + "tailscale.com/k8s-operator/session-recording/fakes" + "tailscale.com/k8s-operator/session-recording/tsrecorder" + "tailscale.com/tstest" +) + +func Test_conn_Read(t *testing.T) { + zl, err := zap.NewDevelopment() + if err != nil { + t.Fatal(err) + } + // Resize stream ID + {"width": 10, "height": 20} + testResizeMsg := []byte{byte(remotecommand.StreamResize), 0x7b, 0x22, 0x77, 0x69, 0x64, 0x74, 0x68, 0x22, 0x3a, 0x31, 0x30, 0x2c, 0x22, 0x68, 0x65, 0x69, 0x67, 0x68, 0x74, 0x22, 0x3a, 0x32, 0x30, 0x7d} + lenResizeMsgPayload := byte(len(testResizeMsg)) + + tests := []struct { + name string + inputs [][]byte + wantWidth int + wantHeight int + }{ + { + name: "single_read_control_message", + inputs: [][]byte{{0x88, 0x0}}, + }, + { + name: "single_read_resize_message", + inputs: [][]byte{append([]byte{0x82, lenResizeMsgPayload}, testResizeMsg...)}, + wantWidth: 10, + wantHeight: 20, + }, + { + name: "two_reads_resize_message", + inputs: [][]byte{{0x2, 0x9, 0x4, 0x7b, 0x22, 0x77, 0x69, 0x64, 0x74, 0x68, 0x22}, {0x80, 0x11, 0x4, 0x3a, 0x31, 0x30, 0x2c, 0x22, 0x68, 0x65, 0x69, 0x67, 0x68, 0x74, 0x22, 0x3a, 0x32, 0x30, 0x7d}}, + wantWidth: 10, + wantHeight: 20, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tc := &fakes.TestConn{} + tc.ResetReadBuf() + c := &conn{ + Conn: tc, + log: zl.Sugar(), + } + for i, input := range tt.inputs { + if err := tc.WriteReadBufBytes(input); err != nil { + t.Fatalf("writing bytes to test conn: %v", err) + } + _, err := c.Read(make([]byte, len(input))) + if err != nil { + t.Errorf("[%d] conn.Read() errored %v", i, err) + return + } + } + if tt.wantHeight != 0 || tt.wantWidth != 0 { + if tt.wantWidth != c.ch.Width { + t.Errorf("wants width: %v, got %v", tt.wantWidth, c.ch.Width) + } + if tt.wantHeight != c.ch.Height { + t.Errorf("want height: %v, got %v", tt.wantHeight, c.ch.Height) + } + } + }) + } +} + +func Test_conn_Write(t *testing.T) { + zl, err := zap.NewDevelopment() + if err != nil { + t.Fatal(err) + } + cl := tstest.NewClock(tstest.ClockOpts{}) + tests := []struct { + name string + inputs [][]byte + wantForwarded []byte + wantRecorded []byte + firstWrite bool + width int + height int + }{ + { + name: "single_write_control_frame", + inputs: [][]byte{{0x88, 0x0}}, + wantForwarded: []byte{0x88, 0x0}, + }, + { + name: "single_write_stdout_data_message", + inputs: [][]byte{{0x82, 0x3, 0x1, 0x7, 0x8}}, + wantForwarded: []byte{0x82, 0x3, 0x1, 0x7, 0x8}, + wantRecorded: fakes.CastLine(t, []byte{0x7, 0x8}, cl), + }, + { + name: "single_write_stderr_data_message", + inputs: [][]byte{{0x82, 0x3, 0x2, 0x7, 0x8}}, + wantForwarded: []byte{0x82, 0x3, 0x2, 0x7, 0x8}, + wantRecorded: fakes.CastLine(t, []byte{0x7, 0x8}, cl), + }, + { + name: "single_write_stdin_data_message", + inputs: [][]byte{{0x82, 0x3, 0x0, 0x7, 0x8}}, + wantForwarded: []byte{0x82, 0x3, 0x0, 0x7, 0x8}, + }, + { + name: "single_write_stdout_data_message_with_cast_header", + inputs: [][]byte{{0x82, 0x3, 0x1, 0x7, 0x8}}, + wantForwarded: []byte{0x82, 0x3, 0x1, 0x7, 0x8}, + wantRecorded: append(fakes.AsciinemaResizeMsg(t, 10, 20), fakes.CastLine(t, []byte{0x7, 0x8}, cl)...), + width: 10, + height: 20, + firstWrite: true, + }, + { + name: "two_writes_stdout_data_message", + inputs: [][]byte{{0x2, 0x3, 0x1, 0x7, 0x8}, {0x80, 0x6, 0x1, 0x1, 0x2, 0x3, 0x4, 0x5}}, + wantForwarded: []byte{0x2, 0x3, 0x1, 0x7, 0x8, 0x80, 0x6, 0x1, 0x1, 0x2, 0x3, 0x4, 0x5}, + wantRecorded: fakes.CastLine(t, []byte{0x7, 0x8, 0x1, 0x2, 0x3, 0x4, 0x5}, cl), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tc := &fakes.TestConn{} + sr := &fakes.TestSessionRecorder{} + rec := tsrecorder.New(sr, cl, cl.Now(), true) + c := &conn{ + Conn: tc, + log: zl.Sugar(), + ch: tsrecorder.CastHeader{ + Width: tt.width, + Height: tt.height, + }, + rec: rec, + } + if !tt.firstWrite { + // This test case does not intend to test that cast header gets written once. + c.writeCastHeaderOnce.Do(func() {}) + } + for i, input := range tt.inputs { + _, err := c.Write(input) + if err != nil { + t.Fatalf("[%d] conn.Write() errored: %v", i, err) + } + } + // Assert that the expected bytes have been forwarded to the original destination. + gotForwarded := tc.WriteBufBytes() + if !reflect.DeepEqual(gotForwarded, tt.wantForwarded) { + t.Errorf("expected bytes not forwarded, wants\n%v\ngot\n%v", tt.wantForwarded, gotForwarded) + } + + // Assert that the expected bytes have been forwarded to the session recorder. + gotRecorded := sr.Bytes() + if !reflect.DeepEqual(gotRecorded, tt.wantRecorded) { + t.Errorf("expected bytes not recorded, wants\n%v\ngot\n%v", tt.wantRecorded, gotRecorded) + } + }) + } +} diff --git a/k8s-operator/session-recording/ws/message.go b/k8s-operator/session-recording/ws/message.go new file mode 100644 index 000000000..bf33e6bb2 --- /dev/null +++ b/k8s-operator/session-recording/ws/message.go @@ -0,0 +1,253 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build !plan9 + +package ws + +import ( + "encoding/binary" + "fmt" + "sync/atomic" + + "github.com/pkg/errors" + "go.uber.org/zap" +) + +const ( + noOpcode messageType = 0 // continuation frame for fragmented messages + binaryMessage messageType = 2 +) + +// messageType is the type of a websocket data or control message as defined by opcode. +// https://www.rfc-editor.org/rfc/rfc6455#section-5.2 +// Known types of control messages are close, ping and pong. +// https://www.rfc-editor.org/rfc/rfc6455#section-5.5 +// The only data message type supported by Kubernetes is binary message +// https://github.com/kubernetes/client-go/blob/v0.30.0-rc.1/tools/remotecommand/websocket.go#L281 +type messageType int + +// message is a parsed Websocket Message. +type message struct { + // payload is the contents of the so far parsed Websocket + // data Message payload, potentially from multiple fragments written by + // multiple invocations of Parse. As per RFC 6455 We can assume that the + // fragments will always arrive in order and data messages will not be + // interleaved. + payload []byte + + // isFinalized is set to true if msgPayload contains full contents of + // the message (the final fragment has been received). + isFinalized bool + + // streamID is the stream to which the message belongs, i.e stdin, stout + // etc. It is one of the stream IDs defined in + // https://github.com/kubernetes/apimachinery/commit/73d12d09c5be8703587b5127416eb83dc3b7e182#diff-291f96e8632d04d2d20f5fb00f6b323492670570d65434e8eac90c7a442d13bdR23-R36 + streamID atomic.Uint32 + + // typ is the type of a WebsocketMessage as defined by its opcode + // https://www.rfc-editor.org/rfc/rfc6455#section-5.2 + typ messageType + raw []byte +} + +// Parse accepts a websocket message fragment as a byte slice and parses its contents. +// The fragment can be: +// - a fragment that consists of a whole message +// - an initial fragment for a message for which we expect more fragments +// - a subsequent fragment for a message that we are currently parsing and whose so-far parsed contents are stored in msg. +// It is not expected that the byte slice would contain an incomplete fragment or fragment for a different message than the one currently being parsed (if any). +// Message fragment structure: +// 0 1 2 3 +// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +// +-+-+-+-+-------+-+-------------+-------------------------------+ +// |F|R|R|R| opcode|M| Payload len | Extended payload length | +// |I|S|S|S| (4) |A| (7) | (16/64) | +// |N|V|V|V| |S| | (if payload len==126/127) | +// | |1|2|3| |K| | | +// +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - + +// | Extended payload length continued, if payload len == 127 | +// + - - - - - - - - - - - - - - - +-------------------------------+ +// | |Masking-key, if MASK set to 1 | +// +-------------------------------+-------------------------------+ +// | Masking-key (continued) | Payload Data | +// +-------------------------------- - - - - - - - - - - - - - - - + +// : Payload Data continued ... : +// + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +// | Payload Data continued ... | +// +---------------------------------------------------------------+ +// https://www.rfc-editor.org/rfc/rfc6455#section-5.2 +// +// Fragmentation rules: +// An unfragmented message consists of a single frame with the FIN +// bit set (Section 5.2) and an opcode other than 0. +// A fragmented message consists of a single frame with the FIN bit +// clear and an opcode other than 0, followed by zero or more frames +// with the FIN bit clear and the opcode set to 0, and terminated by +// a single frame with the FIN bit set and an opcode of 0. +// https://www.rfc-editor.org/rfc/rfc6455#section-5.4 +func (msg *message) Parse(b []byte, log *zap.SugaredLogger) (bool, error) { + if msg.typ != binaryMessage { + return false, fmt.Errorf("[unexpected] internal error: attempted to parse a message with type %d", msg.typ) + } + + msg.isFinalized = isFinalFragment(b) + + maskSet := isMasked(b) + + payloadLength, payloadOffset, maskOffset, err := fragmentDimensions(b, maskSet) + if err != nil { + return false, fmt.Errorf("error determining payload length: %w", err) + } + log.Debugf("parse: parsing a message with payload length: %d payload offset: %d maskOffset: %d mask set: %t, is finalized: %t", payloadLength, payloadOffset, maskOffset, maskSet, msg.isFinalized) + + if len(b) < int(payloadOffset)+int(payloadLength) { // incomplete fragment + return false, nil + } + msg.raw = make([]byte, int(payloadOffset)+int(payloadLength)) + copy(msg.raw, b[:payloadOffset+payloadLength]) + + // Extract the payload. + msgPayload := b[payloadOffset : payloadOffset+payloadLength] + + // Unmask the payload if needed. + if maskSet { + m := b[maskOffset:payloadOffset] + var mask [4]byte + copy(mask[:], m) + maskBytes(mask, msgPayload) + } + + // Determine what stream the message is for. Stream ID of a Kubernetes + // streaming session is a 32bit integer, stored in the first byte of the + // message payload. + // https://github.com/kubernetes/apimachinery/commit/73d12d09c5be8703587b5127416eb83dc3b7e182#diff-291f96e8632d04d2d20f5fb00f6b323492670570d65434e8eac90c7a442d13bdR23-R36 + if len(msgPayload) == 0 { + return false, errors.New("[unexpected] received a message fragment with no stream ID") + } + + streamId := uint32(msgPayload[0]) + if msg.streamID.Load() != 0 && msg.streamID.Load() != streamId { + return false, fmt.Errorf("[unexpected] received message fragments with mismatched streamIDs %d and %d", msg.streamID.Load(), streamId) + } + msg.streamID.Store(streamId) + + // This is normal, Kubernetes seem to send a couple data messages with + // no payloads at the start. + if len(msgPayload) < 2 { + return true, nil + } + msgPayload = msgPayload[1:] // remove the stream ID byte + msg.payload = append(msg.payload, msgPayload...) + return true, nil +} + +// maskBytes applies mask to bytes in place. +// https://www.rfc-editor.org/rfc/rfc6455#section-5.3 +func maskBytes(key [4]byte, b []byte) { + for i := range b { + b[i] = b[i] ^ key[i%4] + } +} + +// isControlMessage returns true if the message type is one of the know control +// frame message types. +// https://www.rfc-editor.org/rfc/rfc6455#section-5.5 +func isControlMessage(t messageType) bool { + const ( + closeMessage messageType = 8 + pingMessage messageType = 9 + pongMessage messageType = 10 + ) + return t == closeMessage || t == pingMessage || t == pongMessage +} + +// isFinalFragment can be called with websocket message fragment and returns true if +// the fragment is the final fragment of a websocket message. +func isFinalFragment(b []byte) bool { + // Extract FIN bit. FIN bit is the first bit of a message fragment. + const finBitMask byte = 1 << 7 + finBit := b[0] & finBitMask + return finBit != 0 +} + +// isMasked can be called with a websocket message fragment and returns true if +// the payload of the message is masked. It uses the mask bit to determine if +// the payload is masked. +// https://www.rfc-editor.org/rfc/rfc6455#section-5.3 +func isMasked(b []byte) bool { + return extractFirstBit(b[1]) != 0 +} + +// extractFirstBit extracts first bit of a byte by zeroing out all the other +// bits. +func extractFirstBit(b byte) byte { + const mask byte = 1 << 7 + return b & mask +} + +// zeroFirstBit returns the provided byte with the first bit set to 0. +func zeroFirstBit(b byte) byte { + const revMask byte = 1 << 7 + return b & (^revMask) +} + +// fragmentDimensions returns payload length as well as payload offset and mask offset. +func fragmentDimensions(b []byte, maskSet bool) (payloadLength, payloadOffset, maskOffset int64, _ error) { + + // payload length can be stored either in bits [9-15] or in bytes 2, 3 + // or in bytes 2, 3, 4, 5, 6, 7. + // https://www.rfc-editor.org/rfc/rfc6455#section-5.2 + // 0 1 2 3 + // 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + // +-+-+-+-+-------+-+-------------+-------------------------------+ + // |F|R|R|R| opcode|M| Payload len | Extended payload length | + // |I|S|S|S| (4) |A| (7) | (16/64) | + // |N|V|V|V| |S| | (if payload len==126/127) | + // | |1|2|3| |K| | | + // +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - + + // | Extended payload length continued, if payload len == 127 | + // + - - - - - - - - - - - - - - - +-------------------------------+ + payloadLengthIndicator := zeroFirstBit(b[1]) + var lengthOffset int64 + switch { + case payloadLengthIndicator < 126: + lengthOffset = 1 + maskOffset = 2 + payloadLength = int64(payloadLengthIndicator) + case payloadLengthIndicator == 126: + maskOffset = 4 + lengthOffset = 2 + payloadLength = extractInt64(b, lengthOffset, 2) + case payloadLengthIndicator == 127: + maskOffset = 10 + lengthOffset = 2 + payloadLength = extractInt64(b, lengthOffset, 6) + default: + return -1, -1, -1, fmt.Errorf("unexpected payload length indicator value: %v", payloadLengthIndicator) + } + + // Masking key can take up 0 or 4 bytes- we need to take that into + // account when determining payload offset. + // 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + // .... + // + - - - - - - - - - - - - - - - +-------------------------------+ + // | |Masking-key, if MASK set to 1 | + // +-------------------------------+-------------------------------+ + // | Masking-key (continued) | Payload Data | + // + - - - - - - - - - - - - - - - +-------------------------------+ + // ... + if maskSet { + payloadOffset = maskOffset + 4 + } else { + payloadOffset = maskOffset + } + return +} + +func extractInt64(b []byte, offset, length int64) int64 { + payloadLengthBytes := b[offset : offset+length] + payloadLengthBytesPadded := append(make([]byte, 8-len(payloadLengthBytes)), payloadLengthBytes...) + + return int64(binary.BigEndian.Uint64(payloadLengthBytesPadded)) +} diff --git a/k8s-operator/session-recording/ws/message_test.go b/k8s-operator/session-recording/ws/message_test.go new file mode 100644 index 000000000..63a80ade9 --- /dev/null +++ b/k8s-operator/session-recording/ws/message_test.go @@ -0,0 +1,125 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build !plan9 + +package ws + +import ( + "reflect" + "testing" + + "go.uber.org/zap" +) + +func Test_msg_Parse(t *testing.T) { + zl, err := zap.NewDevelopment() + if err != nil { + t.Fatalf("error creating a test logger: %v", err) + } + testMask := [4]byte{1, 2, 3, 4} + tests := []struct { + name string + b []byte + initialPayload []byte + wantPayload []byte + wantIsFinalized bool + wantStreamID uint32 + }{ + { + name: "single_fragment_stdout_stream_no_payload_no_mask", + b: []byte{0x82, 0x1, 0x1}, + wantPayload: nil, + wantIsFinalized: true, + wantStreamID: 1, + }, + { + name: "single_fragment_stderr_steam_no_payload_has_mask", + b: append([]byte{0x82, 0x81, 0x1, 0x2, 0x3, 0x4}, maskedBytes(testMask, []byte{0x2})...), + wantPayload: nil, + wantIsFinalized: true, + wantStreamID: 2, + }, + { + name: "single_fragment_stdout_stream_no_mask_has_payload", + b: []byte{0x82, 0x3, 0x1, 0x7, 0x8}, + wantPayload: []byte{0x7, 0x8}, + wantIsFinalized: true, + wantStreamID: 1, + }, + { + name: "single_fragment_stdout_stream_has_mask_has_payload", + b: append([]byte{0x82, 0x83, 0x1, 0x2, 0x3, 0x4}, maskedBytes(testMask, []byte{0x1, 0x7, 0x8})...), + wantPayload: []byte{0x7, 0x8}, + wantIsFinalized: true, + wantStreamID: 1, + }, + { + name: "initial_fragment_stdout_stream_no_mask_has_payload", + b: []byte{0x2, 0x3, 0x1, 0x7, 0x8}, + wantPayload: []byte{0x7, 0x8}, + wantStreamID: 1, + }, + { + name: "initial_fragment_stdout_stream_has_mask_has_payload", + b: append([]byte{0x2, 0x83, 0x1, 0x2, 0x3, 0x4}, maskedBytes(testMask, []byte{0x1, 0x7, 0x8})...), + wantPayload: []byte{0x7, 0x8}, + wantStreamID: 1, + }, + { + name: "subsequent_fragment_stdout_stream_no_mask_has_payload", + b: []byte{0x0, 0x3, 0x1, 0x7, 0x8}, + initialPayload: []byte{0x1, 0x2, 0x3}, + wantPayload: []byte{0x1, 0x2, 0x3, 0x7, 0x8}, + wantStreamID: 1, + }, + { + name: "subsequent_fragment_stdout_stream_has_mask_has_payload", + b: append([]byte{0x0, 0x83, 0x1, 0x2, 0x3, 0x4}, maskedBytes(testMask, []byte{0x1, 0x7, 0x8})...), + initialPayload: []byte{0x1, 0x2, 0x3}, + wantPayload: []byte{0x1, 0x2, 0x3, 0x7, 0x8}, + wantStreamID: 1, + }, + { + name: "final_fragment_stdout_stream_no_mask_has_payload", + b: []byte{0x80, 0x3, 0x1, 0x7, 0x8}, + initialPayload: []byte{0x1, 0x2, 0x3}, + wantIsFinalized: true, + wantPayload: []byte{0x1, 0x2, 0x3, 0x7, 0x8}, + wantStreamID: 1, + }, + { + name: "final_fragment_stdout_stream_has_mask_has_payload", + b: append([]byte{0x80, 0x83, 0x1, 0x2, 0x3, 0x4}, maskedBytes(testMask, []byte{0x1, 0x7, 0x8})...), + initialPayload: []byte{0x1, 0x2, 0x3}, + wantIsFinalized: true, + wantPayload: []byte{0x1, 0x2, 0x3, 0x7, 0x8}, + wantStreamID: 1, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + msg := &message{ + typ: binaryMessage, + payload: tt.initialPayload, + } + if _, err := msg.Parse(tt.b, zl.Sugar()); err != nil { + t.Errorf("msg.Parse() errored %v", err) + } + if msg.isFinalized != tt.wantIsFinalized { + t.Errorf("wants message to be finalized: %t, got: %t", tt.wantIsFinalized, msg.isFinalized) + } + if msg.streamID.Load() != tt.wantStreamID { + t.Errorf("wants stream ID: %d, got: %d", tt.wantStreamID, msg.streamID.Load()) + } + if !reflect.DeepEqual(msg.payload, tt.wantPayload) { + t.Errorf("unexpected message payload after Parse, wants %b, got %b", tt.wantPayload, msg.payload) + } + }) + } +} + +func maskedBytes(mask [4]byte, b []byte) []byte { + maskBytes(mask, b) + return b +} |
