summaryrefslogtreecommitdiffhomepage
path: root/k8s-operator
diff options
context:
space:
mode:
authorIrbe Krumina <irbe@tailscale.com>2024-07-26 20:05:49 +0300
committerIrbe Krumina <irbe@tailscale.com>2024-07-26 21:32:37 +0300
commit69c27b23cb8ae46e6f0845817e879d636f26e70a (patch)
tree62d00d75cc340334c636c4b58e85ab1aba6dd507 /k8s-operator
parent8d7b78f3f795e781d939750893610639b224d81a (diff)
downloadtailscale-irbekrm/websocket.tar.xz
tailscale-irbekrm/websocket.zip
cmd/k8s-operator,k8s-operator/session-recording: implement support for WebSocket protocolirbekrm/websocket
Kubernetes currently supports two streaming protocols- SPDY and WebSockets. WebSockets are replacing SPDY, see https://github.com/kubernetes/enhancements/issues/4006 Our 'kubectl exec' session recording was only supporting SPDY. This change: - adds functionality to parse streaming sessions over WebSockets - for sessions that the API server proxy has determined need to be recorded, determines if the session is over SPDY or WebSockets and invoke the relevant parser accordingly - refactors the session recording logic into its own package Updates tailscale/corp#19821 Signed-off-by: Irbe Krumina <irbe@tailscale.com>
Diffstat (limited to 'k8s-operator')
-rw-r--r--k8s-operator/session-recording/fakes/fakes.go117
-rw-r--r--k8s-operator/session-recording/hijacker.go205
-rw-r--r--k8s-operator/session-recording/hijacker_test.go123
-rw-r--r--k8s-operator/session-recording/spdy/conn.go205
-rw-r--r--k8s-operator/session-recording/spdy/conn_test.go243
-rw-r--r--k8s-operator/session-recording/spdy/frame.go285
-rw-r--r--k8s-operator/session-recording/spdy/frame_test.go293
-rw-r--r--k8s-operator/session-recording/spdy/zlib-reader.go221
-rw-r--r--k8s-operator/session-recording/tsrecorder/header.go54
-rw-r--r--k8s-operator/session-recording/tsrecorder/tsrecorder.go104
-rw-r--r--k8s-operator/session-recording/ws/conn.go244
-rw-r--r--k8s-operator/session-recording/ws/conn_test.go171
-rw-r--r--k8s-operator/session-recording/ws/message.go253
-rw-r--r--k8s-operator/session-recording/ws/message_test.go125
14 files changed, 2643 insertions, 0 deletions
diff --git a/k8s-operator/session-recording/fakes/fakes.go b/k8s-operator/session-recording/fakes/fakes.go
new file mode 100644
index 000000000..9f5c349d4
--- /dev/null
+++ b/k8s-operator/session-recording/fakes/fakes.go
@@ -0,0 +1,117 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+//go:build !plan9
+
+// package fakes contains utils for testing session recording behaviour.
+package fakes
+
+import (
+ "bytes"
+ "encoding/json"
+ "net"
+ "sync"
+ "testing"
+
+ "tailscale.com/k8s-operator/session-recording/tsrecorder"
+ "tailscale.com/tstime"
+)
+
+func New(conn net.Conn, wb bytes.Buffer, rb bytes.Buffer, closed bool) net.Conn {
+ return &TestConn{
+ Conn: conn,
+ writeBuf: wb,
+ readBuf: rb,
+ closed: closed,
+ }
+}
+
+type TestConn struct {
+ net.Conn
+ // writeBuf contains whatever was send to the conn via Write.
+ writeBuf bytes.Buffer
+ // readBuf contains whatever was sent to the conn via Read.
+ readBuf bytes.Buffer
+ sync.RWMutex // protects the following
+ closed bool
+}
+
+var _ net.Conn = &TestConn{}
+
+func (tc *TestConn) Read(b []byte) (int, error) {
+ return tc.readBuf.Read(b)
+}
+
+func (tc *TestConn) Write(b []byte) (int, error) {
+ return tc.writeBuf.Write(b)
+}
+
+func (tc *TestConn) Close() error {
+ tc.Lock()
+ defer tc.Unlock()
+ tc.closed = true
+ return nil
+}
+
+func (tc *TestConn) IsClosed() bool {
+ tc.Lock()
+ defer tc.Unlock()
+ return tc.closed
+}
+
+func (tc *TestConn) WriteBufBytes() []byte {
+ return tc.writeBuf.Bytes()
+}
+
+func (tc *TestConn) ResetReadBuf() {
+ tc.readBuf.Reset()
+}
+
+func (tc *TestConn) WriteReadBufBytes(b []byte) error {
+ _, err := tc.readBuf.Write(b)
+ return err
+}
+
+type TestSessionRecorder struct {
+ // buf holds data that was sent to the session recorder.
+ buf bytes.Buffer
+}
+
+func (t *TestSessionRecorder) Write(b []byte) (int, error) {
+ return t.buf.Write(b)
+}
+
+func (t *TestSessionRecorder) Close() error {
+ t.buf.Reset()
+ return nil
+}
+
+func (t *TestSessionRecorder) Bytes() []byte {
+ return t.buf.Bytes()
+}
+
+func CastLine(t *testing.T, p []byte, clock tstime.Clock) []byte {
+ t.Helper()
+ j, err := json.Marshal([]any{
+ clock.Now().Sub(clock.Now()).Seconds(),
+ "o",
+ string(p),
+ })
+ if err != nil {
+ t.Fatalf("error marshalling cast line: %v", err)
+ }
+ return append(j, '\n')
+}
+
+func AsciinemaResizeMsg(t *testing.T, width, height int) []byte {
+ t.Helper()
+ ch := tsrecorder.CastHeader{
+ Width: width,
+ Height: height,
+ }
+ bs, err := json.Marshal(ch)
+ if err != nil {
+ t.Fatalf("error marshalling CastHeader: %v", err)
+ }
+ return append(bs, '\n')
+}
diff --git a/k8s-operator/session-recording/hijacker.go b/k8s-operator/session-recording/hijacker.go
new file mode 100644
index 000000000..bbaee3ba7
--- /dev/null
+++ b/k8s-operator/session-recording/hijacker.go
@@ -0,0 +1,205 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+//go:build !plan9
+
+// Package sessionrecording has functionality for recording 'kubectl exec'
+// sessions and sending to a tsrecorder.
+package sessionrecording
+
+import (
+ "bufio"
+ "bytes"
+ "context"
+ "errors"
+ "fmt"
+ "io"
+ "net"
+ "net/http"
+ "net/netip"
+ "strings"
+
+ "go.uber.org/zap"
+ "tailscale.com/client/tailscale/apitype"
+ "tailscale.com/k8s-operator/session-recording/spdy"
+ "tailscale.com/k8s-operator/session-recording/tsrecorder"
+ "tailscale.com/k8s-operator/session-recording/ws"
+ "tailscale.com/tailcfg"
+ "tailscale.com/tsnet"
+ "tailscale.com/tstime"
+ "tailscale.com/util/clientmetric"
+ "tailscale.com/util/multierr"
+)
+
+const (
+ SPDYProtocol = "SPDY"
+ WebSocketsProtocol = "WebSockets"
+)
+
+var (
+ // counterSessionRecordingsAttempted counts the number of session recording attempts.
+ CounterSessionRecordingsAttempted = clientmetric.NewCounter("k8s_auth_proxy__session_recordings_attempted")
+
+ // counterSessionRecordingsUploaded counts the number of successfully uploaded session recordings.
+ CounterSessionRecordingsUploaded = clientmetric.NewCounter("k8s_auth_proxy_session_recordings_uploaded")
+)
+
+func New(ts *tsnet.Server, req *http.Request, who *apitype.WhoIsResponse, w http.ResponseWriter, pod, ns string, proto protocol, addrs []netip.AddrPort, failOpen bool, connFunc RecorderDialFn, log *zap.SugaredLogger) *SpdyHijacker {
+ return &SpdyHijacker{
+ ts: ts,
+ req: req,
+ who: who,
+ ResponseWriter: w,
+ pod: pod,
+ ns: ns,
+ addrs: addrs,
+ failOpen: failOpen,
+ connectToRecorder: connFunc,
+ proto: proto,
+ log: log,
+ }
+}
+
+// spdyHijacker implements [net/http.Hijacker] interface.
+// It must be configured with an http request for a 'kubectl exec' session that
+// needs to be recorded. It knows how to hijack the connection and configure for
+// the session contents to be sent to a tsrecorder instance.
+type SpdyHijacker struct {
+ http.ResponseWriter
+ ts *tsnet.Server
+ req *http.Request
+ who *apitype.WhoIsResponse
+ log *zap.SugaredLogger
+ pod string // pod being exec-d
+ ns string // namespace of the pod being exec-d
+ addrs []netip.AddrPort // tsrecorder addresses
+ failOpen bool // whether to fail open if recording fails
+ connectToRecorder RecorderDialFn
+ proto protocol
+}
+
+// protocol is the streaming protocol of the hijacked session. Supported
+// protocols are SPDY and WebSockets.
+type protocol string
+
+// RecorderDialFn dials the specified netip.AddrPorts that should be tsrecorder
+// addresses. It tries to connect to recorder endpoints one by one, till one
+// connection succeeds. In case of success, returns a list with a single
+// successful recording attempt and an error channel. If the connection errors
+// after having been established, an error is sent down the channel.
+type RecorderDialFn func(context.Context, []netip.AddrPort, func(context.Context, string, string) (net.Conn, error)) (io.WriteCloser, []*tailcfg.SSHRecordingAttempt, <-chan error, error)
+
+// Hijack hijacks a 'kubectl exec' session and configures for the session
+// contents to be sent to a recorder.
+func (h *SpdyHijacker) Hijack() (net.Conn, *bufio.ReadWriter, error) {
+ h.log.Infof("recorder addrs: %v, failOpen: %v", h.addrs, h.failOpen)
+ reqConn, brw, err := h.ResponseWriter.(http.Hijacker).Hijack()
+ if err != nil {
+ return nil, nil, fmt.Errorf("error hijacking connection: %w", err)
+ }
+
+ conn, err := h.setUpRecording(context.Background(), reqConn)
+ if err != nil {
+ return nil, nil, fmt.Errorf("error setting up session recording: %w", err)
+ }
+ return conn, brw, nil
+}
+
+// setupRecording attempts to connect to the recorders set via
+// spdyHijacker.addrs. Returns conn from provided opts, wrapped in recording
+// logic. If connecting to the recorder fails or an error is received during the
+// session and spdyHijacker.failOpen is false, connection will be closed.
+func (h *SpdyHijacker) setUpRecording(ctx context.Context, conn net.Conn) (net.Conn, error) {
+ const (
+ // https://docs.asciinema.org/manual/asciicast/v2/
+ asciicastv2 = 2
+ )
+ var wc io.WriteCloser
+ h.log.Infof("kubectl exec session will be recorded, recorders: %v, fail open policy: %t", h.addrs, h.failOpen)
+ // TODO (irbekrm): send client a message that session will be recorded.
+ rw, _, errChan, err := h.connectToRecorder(ctx, h.addrs, h.ts.Dial)
+ if err != nil {
+ msg := fmt.Sprintf("error connecting to session recorders: %v", err)
+ if h.failOpen {
+ msg = msg + "; failure mode is 'fail open'; continuing session without recording."
+ h.log.Warnf(msg)
+ return conn, nil
+ }
+ msg = msg + "; failure mode is 'fail closed'; closing connection."
+ if err := closeConnWithWarning(conn, msg); err != nil {
+ return nil, multierr.New(errors.New(msg), err)
+ }
+ return nil, errors.New(msg)
+ }
+ // TODO (irbekrm): log which recorder
+ h.log.Info("successfully connected to a session recorder")
+ wc = rw
+ cl := tstime.DefaultClock{}
+ rec := tsrecorder.New(wc, cl, cl.Now(), h.failOpen)
+
+ qp := h.req.URL.Query()
+ ch := tsrecorder.CastHeader{
+ Version: asciicastv2,
+ Timestamp: cl.Now().Unix(),
+ Command: strings.Join(qp["command"], " "),
+ SrcNode: strings.TrimSuffix(h.who.Node.Name, "."),
+ SrcNodeID: h.who.Node.StableID,
+ Kubernetes: &tsrecorder.Kubernetes{
+ PodName: h.pod,
+ Namespace: h.ns,
+ Container: strings.Join(qp["container"], " "),
+ },
+ }
+ if !h.who.Node.IsTagged() {
+ ch.SrcNodeUser = h.who.UserProfile.LoginName
+ ch.SrcNodeUserID = h.who.Node.User
+ } else {
+ ch.SrcNodeTags = h.who.Node.Tags
+ }
+ var lc net.Conn
+ switch h.proto {
+ case SPDYProtocol:
+ lc = spdy.New(conn, rec, ch, h.log)
+ case WebSocketsProtocol:
+ lc = ws.New(conn, rec, ch, h.log)
+ default:
+ return nil, fmt.Errorf("unknown protocol: %s", h.proto)
+ }
+
+ go func() {
+ var err error
+ select {
+ case <-ctx.Done():
+ return
+ case err = <-errChan:
+ }
+ if err == nil {
+ CounterSessionRecordingsUploaded.Add(1)
+ h.log.Info("finished uploading the recording")
+ return
+ }
+ msg := fmt.Sprintf("connection to the session recorder errorred: %v;", err)
+ if h.failOpen {
+ msg += msg + "; failure mode is 'fail open'; continuing session without recording."
+ h.log.Info(msg)
+ return
+ }
+ msg += "; failure mode set to 'fail closed'; closing connection"
+ h.log.Error(msg)
+ if err := lc.Close(); err != nil {
+ h.log.Infof("error closing recorder connections: %v", err)
+ }
+ return
+ }()
+
+ return lc, nil
+}
+
+func closeConnWithWarning(conn net.Conn, msg string) error {
+ b := io.NopCloser(bytes.NewBuffer([]byte(msg)))
+ resp := http.Response{Status: http.StatusText(http.StatusForbidden), StatusCode: http.StatusForbidden, Body: b}
+ if err := resp.Write(conn); err != nil {
+ return multierr.New(fmt.Errorf("error writing msg %q to conn: %v", msg, err), conn.Close())
+ }
+ return conn.Close()
+}
diff --git a/k8s-operator/session-recording/hijacker_test.go b/k8s-operator/session-recording/hijacker_test.go
new file mode 100644
index 000000000..cfc694d26
--- /dev/null
+++ b/k8s-operator/session-recording/hijacker_test.go
@@ -0,0 +1,123 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+//go:build !plan9
+
+package sessionrecording
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "io"
+ "net"
+ "net/http"
+ "net/netip"
+ "net/url"
+ "testing"
+ "time"
+
+ "go.uber.org/zap"
+ "tailscale.com/client/tailscale/apitype"
+ "tailscale.com/k8s-operator/session-recording/fakes"
+ "tailscale.com/tailcfg"
+ "tailscale.com/tsnet"
+ "tailscale.com/tstest"
+)
+
+func Test_SPDYHijacker(t *testing.T) {
+ zl, err := zap.NewDevelopment()
+ if err != nil {
+ t.Fatal(err)
+ }
+ tests := []struct {
+ name string
+ failOpen bool
+ failRecorderConnect bool // fail initial connect to the recorder
+ failRecorderConnPostConnect bool // send error down the error channel
+ proto protocol
+ wantsConnClosed bool
+ wantsSetupErr bool
+ }{
+ {
+ name: "spdy_setup_succeeds_conn_stays_open",
+ proto: SPDYProtocol,
+ },
+ {
+ name: "ws_setup_succeeds_conn_stays_open",
+ proto: WebSocketsProtocol,
+ },
+ {
+ name: "setup_fails_policy_is_to_fail_open_conn_stays_open",
+ failOpen: true,
+ failRecorderConnect: true,
+ proto: SPDYProtocol,
+ },
+ {
+ name: "setup_fails_policy_is_to_fail_closed_conn_is_closed",
+ failRecorderConnect: true,
+ wantsSetupErr: true,
+ wantsConnClosed: true,
+ proto: SPDYProtocol,
+ },
+ {
+ name: "connection_fails_post-initial_connect_policy_is_to_fail_open_conn_stays_open",
+ failRecorderConnPostConnect: true,
+ failOpen: true,
+ proto: SPDYProtocol,
+ },
+ {
+ name: "connection_fails_post-initial_connect_policy_is_to_fail_closed_conn_is_closed",
+ failRecorderConnPostConnect: true,
+ wantsConnClosed: true,
+ proto: SPDYProtocol,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ tc := &fakes.TestConn{}
+ ch := make(chan error)
+ h := &SpdyHijacker{
+ connectToRecorder: func(context.Context, []netip.AddrPort, func(context.Context, string, string) (net.Conn, error)) (wc io.WriteCloser, rec []*tailcfg.SSHRecordingAttempt, _ <-chan error, err error) {
+ if tt.failRecorderConnect {
+ err = errors.New("test")
+ }
+ return wc, rec, ch, err
+ },
+ failOpen: tt.failOpen,
+ who: &apitype.WhoIsResponse{Node: &tailcfg.Node{}, UserProfile: &tailcfg.UserProfile{}},
+ log: zl.Sugar(),
+ ts: &tsnet.Server{},
+ req: &http.Request{URL: &url.URL{}},
+ proto: tt.proto,
+ }
+ ctx := context.Background()
+ _, err := h.setUpRecording(ctx, tc)
+ if (err != nil) != tt.wantsSetupErr {
+ t.Errorf("spdyHijacker.setupRecording() error = %v, wantErr %v", err, tt.wantsSetupErr)
+ return
+ }
+ if tt.failRecorderConnPostConnect {
+ select {
+ case ch <- errors.New("err"):
+ case <-time.After(time.Second * 15):
+ t.Errorf("error from recorder conn was not read within 15 seconds")
+ }
+ }
+ timeout := time.Second * 20
+ // TODO (irbekrm): cover case where an error is received
+ // over channel and the failure policy is to fail open
+ // (test that connection remains open over some period
+ // of time).
+ if err := tstest.WaitFor(timeout, func() (err error) {
+ if tt.wantsConnClosed != tc.IsClosed() {
+ return fmt.Errorf("got conIection state: %t, wants connection state: %t", tc.IsClosed(), tt.wantsConnClosed)
+ }
+ return nil
+ }); err != nil {
+ t.Errorf("connection did not reach the desired state within %s", timeout.String())
+ }
+ ctx.Done()
+ })
+ }
+}
diff --git a/k8s-operator/session-recording/spdy/conn.go b/k8s-operator/session-recording/spdy/conn.go
new file mode 100644
index 000000000..af27f27e6
--- /dev/null
+++ b/k8s-operator/session-recording/spdy/conn.go
@@ -0,0 +1,205 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+//go:build !plan9
+
+// Package spdy has functionality to parse 'kubectl exec' sessions streamed over
+// SPDY.
+package spdy
+
+import (
+ "bytes"
+ "encoding/binary"
+ "encoding/json"
+ "fmt"
+ "net"
+ "net/http"
+ "sync"
+ "sync/atomic"
+
+ "tailscale.com/k8s-operator/session-recording/tsrecorder"
+
+ "go.uber.org/zap"
+ corev1 "k8s.io/api/core/v1"
+)
+
+func New(conn net.Conn, rec *tsrecorder.Client, ch tsrecorder.CastHeader, log *zap.SugaredLogger) net.Conn {
+ return &spdyRemoteConnRecorder{
+ Conn: conn,
+ rec: rec,
+ ch: ch,
+ log: log,
+ }
+
+}
+
+// spdyRemoteConnRecorder is a wrapper around net.Conn. It reads the bytestream
+// for a 'kubectl exec' session, sends session recording data to the configured
+// recorder and forwards the raw bytes to the original destination.
+type spdyRemoteConnRecorder struct {
+ net.Conn
+ // rec knows how to send data written to it to a tsrecorder instance.
+ rec *tsrecorder.Client
+ ch tsrecorder.CastHeader
+
+ stdoutStreamID atomic.Uint32
+ stderrStreamID atomic.Uint32
+ resizeStreamID atomic.Uint32
+
+ wmu sync.Mutex // sequences writes
+ closed bool
+
+ rmu sync.Mutex // sequences reads
+ writeCastHeaderOnce sync.Once
+
+ zlibReqReader zlibReader
+ // writeBuf is used to store data written to the connection that has not
+ // yet been parsed as SPDY frames.
+ writeBuf bytes.Buffer
+ // readBuf is used to store data read from the connection that has not
+ // yet been parsed as SPDY frames.
+ readBuf bytes.Buffer
+ log *zap.SugaredLogger
+}
+
+// Read reads bytes from the original connection and parses them as SPDY frames.
+// If the frame is a data frame for resize stream, sends resize message to the
+// recorder. If the frame is a SYN_STREAM control frame that starts stdout,
+// stderr or resize stream, store the stream ID.
+func (c *spdyRemoteConnRecorder) Read(b []byte) (int, error) {
+ c.rmu.Lock()
+ defer c.rmu.Unlock()
+ n, err := c.Conn.Read(b)
+ if err != nil {
+ return n, fmt.Errorf("error reading from connection: %w", err)
+ }
+ c.readBuf.Write(b[:n])
+
+ var sf spdyFrame
+ ok, err := sf.Parse(c.readBuf.Bytes(), c.log)
+ if err != nil {
+ return 0, fmt.Errorf("error parsing data read from connection: %w", err)
+ }
+ if !ok {
+ // The parsed data in the buffer will be processed together with
+ // the new data on the next call to Read.
+ return n, nil
+ }
+ c.readBuf.Next(len(sf.Raw)) // advance buffer past the parsed frame
+
+ if !sf.Ctrl { // data frame
+ switch sf.StreamID {
+ case c.resizeStreamID.Load():
+ var err error
+ var msg tsrecorder.ResizeMsg
+ if err = json.Unmarshal(sf.Payload, &msg); err != nil {
+ return 0, err
+ }
+ c.ch.Width = msg.Width
+ c.ch.Height = msg.Height
+ }
+ return n, nil
+ }
+ // We always want to parse the headers, even if we don't care about the
+ // frame, as we need to advance the zlib reader otherwise we will get
+ // garbage.
+ header, err := sf.parseHeaders(&c.zlibReqReader, c.log)
+ if err != nil {
+ return 0, fmt.Errorf("error parsing frame headers: %w", err)
+ }
+ if sf.Type == SYN_STREAM {
+ c.storeStreamID(sf, header)
+ }
+ return n, nil
+}
+
+// Write forwards the raw data of the latest parsed SPDY frame to the original
+// destination. If the frame is an SPDY data frame, it also sends the payload to
+// the connected session recorder.
+func (c *spdyRemoteConnRecorder) Write(b []byte) (int, error) {
+ c.wmu.Lock()
+ defer c.wmu.Unlock()
+ c.writeBuf.Write(b)
+
+ var sf spdyFrame
+ ok, err := sf.Parse(c.writeBuf.Bytes(), c.log)
+ if err != nil {
+ return 0, fmt.Errorf("error parsing data: %w", err)
+ }
+ if !ok {
+ // The parsed data in the buffer will be processed together with
+ // the new data on the next call to Write.
+ return len(b), nil
+ }
+ c.writeBuf.Next(len(sf.Raw)) // advance buffer past the parsed frame
+
+ // If this is a stdout or stderr data frame, send its payload to the
+ // session recorder.
+ if !sf.Ctrl {
+ switch sf.StreamID {
+ case c.stdoutStreamID.Load(), c.stderrStreamID.Load():
+ var err error
+ c.writeCastHeaderOnce.Do(func() {
+
+ var j []byte
+ j, err = json.Marshal(c.ch)
+ if err != nil {
+ return
+ }
+ j = append(j, '\n')
+ err = c.rec.WriteCastLine(j)
+ if err != nil {
+ c.log.Errorf("received error from recorder: %v", err)
+ }
+ })
+ if err != nil {
+ return 0, fmt.Errorf("error writing CastHeader: %w", err)
+ }
+ if err := c.rec.Write(sf.Payload); err != nil {
+ return 0, fmt.Errorf("error sending payload to session recorder: %w", err)
+ }
+ }
+ }
+ // Forward the whole frame to the original destination.
+ _, err = c.Conn.Write(sf.Raw) // send to net.Conn
+ return len(b), err
+}
+
+func (c *spdyRemoteConnRecorder) Close() error {
+ c.wmu.Lock()
+ defer c.wmu.Unlock()
+ if c.closed {
+ return nil
+ }
+ // TODO: only do this if this is a normal closure rather than the
+ // reocrding has failed.
+ if c.writeBuf.Len() > 0 {
+ c.Conn.Write(c.writeBuf.Bytes())
+ }
+ c.writeBuf.Reset()
+ c.closed = true
+ err := c.Conn.Close()
+ c.rec.Close()
+ return err
+}
+
+// parseSynStream parses SYN_STREAM SPDY control frame and updates
+// spdyRemoteConnRecorder to store the newly created stream's ID if it is one of
+// the stream types we care about. Storing stream_id:stream_type mapping allows
+// us to parse received data frames (that have stream IDs) differently depening
+// on which stream they belong to (i.e send data frame payload for stdout stream
+// to session recorder).
+func (c *spdyRemoteConnRecorder) storeStreamID(sf spdyFrame, header http.Header) {
+ const (
+ streamTypeHeaderKey = "Streamtype"
+ )
+ id := binary.BigEndian.Uint32(sf.Payload[0:4])
+ switch header.Get(streamTypeHeaderKey) {
+ case corev1.StreamTypeStdout:
+ c.stdoutStreamID.Store(id)
+ case corev1.StreamTypeStderr:
+ c.stderrStreamID.Store(id)
+ case corev1.StreamTypeResize:
+ c.resizeStreamID.Store(id)
+ }
+}
diff --git a/k8s-operator/session-recording/spdy/conn_test.go b/k8s-operator/session-recording/spdy/conn_test.go
new file mode 100644
index 000000000..ce8c9ae49
--- /dev/null
+++ b/k8s-operator/session-recording/spdy/conn_test.go
@@ -0,0 +1,243 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+//go:build !plan9
+
+package spdy
+
+import (
+ "encoding/json"
+ "reflect"
+ "testing"
+
+ "go.uber.org/zap"
+ "tailscale.com/k8s-operator/session-recording/fakes"
+ "tailscale.com/k8s-operator/session-recording/tsrecorder"
+ "tailscale.com/tstest"
+)
+
+// Test_Writes tests that 1 or more Write calls to spdyRemoteConnRecorder
+// results in the expected data being forwarded to the original destination and
+// the session recorder.
+func Test_Writes(t *testing.T) {
+ var stdoutStreamID, stderrStreamID uint32 = 1, 2
+ zl, err := zap.NewDevelopment()
+ if err != nil {
+ t.Fatal(err)
+ }
+ cl := tstest.NewClock(tstest.ClockOpts{})
+ tests := []struct {
+ name string
+ inputs [][]byte
+ wantForwarded []byte
+ wantRecorded []byte
+ firstWrite bool
+ width int
+ height int
+ }{
+ {
+ name: "single_write_control_frame_with_payload",
+ inputs: [][]byte{{0x80, 0x3, 0x0, 0x1, 0x0, 0x0, 0x0, 0x1, 0x5}},
+ wantForwarded: []byte{0x80, 0x3, 0x0, 0x1, 0x0, 0x0, 0x0, 0x1, 0x5},
+ },
+ {
+ name: "two_writes_control_frame_with_leftover",
+ inputs: [][]byte{{0x80, 0x3, 0x0, 0x1}, {0x0, 0x0, 0x0, 0x1, 0x5, 0x80, 0x3}},
+ wantForwarded: []byte{0x80, 0x3, 0x0, 0x1, 0x0, 0x0, 0x0, 0x1, 0x5},
+ },
+ {
+ name: "single_write_stdout_data_frame",
+ inputs: [][]byte{{0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0}},
+ wantForwarded: []byte{0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0},
+ },
+ {
+ name: "single_write_stdout_data_frame_with_payload",
+ inputs: [][]byte{{0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x5, 0x1, 0x2, 0x3, 0x4, 0x5}},
+ wantForwarded: []byte{0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x5, 0x1, 0x2, 0x3, 0x4, 0x5},
+ wantRecorded: fakes.CastLine(t, []byte{0x1, 0x2, 0x3, 0x4, 0x5}, cl),
+ },
+ {
+ name: "single_write_stderr_data_frame_with_payload",
+ inputs: [][]byte{{0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x5, 0x1, 0x2, 0x3, 0x4, 0x5}},
+ wantForwarded: []byte{0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x5, 0x1, 0x2, 0x3, 0x4, 0x5},
+ wantRecorded: fakes.CastLine(t, []byte{0x1, 0x2, 0x3, 0x4, 0x5}, cl),
+ },
+ {
+ name: "single_data_frame_unknow_stream_with_payload",
+ inputs: [][]byte{{0x0, 0x0, 0x0, 0x7, 0x0, 0x0, 0x0, 0x5, 0x1, 0x2, 0x3, 0x4, 0x5}},
+ wantForwarded: []byte{0x0, 0x0, 0x0, 0x7, 0x0, 0x0, 0x0, 0x5, 0x1, 0x2, 0x3, 0x4, 0x5},
+ },
+ {
+ name: "control_frame_and_data_frame_split_across_two_writes",
+ inputs: [][]byte{{0x80, 0x3, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1}, {0x0, 0x0, 0x0, 0x5, 0x1, 0x2, 0x3, 0x4, 0x5}},
+ wantForwarded: []byte{0x80, 0x3, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x5, 0x1, 0x2, 0x3, 0x4, 0x5},
+ wantRecorded: fakes.CastLine(t, []byte{0x1, 0x2, 0x3, 0x4, 0x5}, cl),
+ },
+ {
+ name: "single_first_write_stdout_data_frame_with_payload",
+ inputs: [][]byte{{0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x5, 0x1, 0x2, 0x3, 0x4, 0x5}},
+ wantForwarded: []byte{0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x5, 0x1, 0x2, 0x3, 0x4, 0x5},
+ wantRecorded: append(fakes.AsciinemaResizeMsg(t, 10, 20), fakes.CastLine(t, []byte{0x1, 0x2, 0x3, 0x4, 0x5}, cl)...),
+ width: 10,
+ height: 20,
+ firstWrite: true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ tc := &fakes.TestConn{}
+ sr := &fakes.TestSessionRecorder{}
+ rec := tsrecorder.New(sr, cl, cl.Now(), true)
+
+ c := &spdyRemoteConnRecorder{
+ Conn: tc,
+ log: zl.Sugar(),
+ rec: rec,
+ ch: tsrecorder.CastHeader{
+ Width: tt.width,
+ Height: tt.height,
+ },
+ }
+ if !tt.firstWrite {
+ // This test case does not intend to test that cast header gets written once.
+ c.writeCastHeaderOnce.Do(func() {})
+ }
+
+ c.stdoutStreamID.Store(stdoutStreamID)
+ c.stderrStreamID.Store(stderrStreamID)
+ for i, input := range tt.inputs {
+ if _, err := c.Write(input); err != nil {
+ t.Errorf("[%d] spdyRemoteConnRecorder.Write() unexpected error %v", i, err)
+ }
+ }
+
+ // Assert that the expected bytes have been forwarded to the original destination.
+ gotForwarded := tc.WriteBufBytes()
+ if !reflect.DeepEqual(gotForwarded, tt.wantForwarded) {
+ t.Errorf("expected bytes not forwarded, wants\n%v\ngot\n%v", tt.wantForwarded, gotForwarded)
+ }
+
+ // Assert that the expected bytes have been forwarded to the session recorder.
+ gotRecorded := sr.Bytes()
+ if !reflect.DeepEqual(gotRecorded, tt.wantRecorded) {
+ t.Errorf("expected bytes not recorded, wants\n%v\ngot\n%v", tt.wantRecorded, gotRecorded)
+ }
+ })
+ }
+}
+
+// Test_Reads tests that 1 or more Read calls to spdyRemoteConnRecorder results
+// in the expected data being forwarded to the original destination and the
+// session recorder.
+func Test_Reads(t *testing.T) {
+ zl, err := zap.NewDevelopment()
+ if err != nil {
+ t.Fatal(err)
+ }
+ cl := tstest.NewClock(tstest.ClockOpts{})
+ var reader zlibReader
+ resizeMsg := resizeMsgBytes(t, 10, 20)
+ synStreamStdoutPayload := payload(t, map[string]string{"Streamtype": "stdout"}, SYN_STREAM, 1)
+ synStreamStderrPayload := payload(t, map[string]string{"Streamtype": "stderr"}, SYN_STREAM, 2)
+ synStreamResizePayload := payload(t, map[string]string{"Streamtype": "resize"}, SYN_STREAM, 3)
+ syn_stream_ctrl_header := []byte{0x80, 0x3, 0x0, 0x1, 0x0, 0x0, 0x0, uint8(len(synStreamStdoutPayload))}
+
+ tests := []struct {
+ name string
+ inputs [][]byte
+ wantStdoutStreamID uint32
+ wantStderrStreamID uint32
+ wantResizeStreamID uint32
+ wantWidth int
+ wantHeight int
+ resizeStreamIDBeforeRead uint32
+ }{
+ {
+ name: "resize_data_frame_single_read",
+ inputs: [][]byte{append([]byte{0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, uint8(len(resizeMsg))}, resizeMsg...)},
+ resizeStreamIDBeforeRead: 1,
+ wantWidth: 10,
+ wantHeight: 20,
+ },
+ {
+ name: "resize_data_frame_two_reads",
+ inputs: [][]byte{{0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, uint8(len(resizeMsg))}, resizeMsg},
+ resizeStreamIDBeforeRead: 1,
+ wantWidth: 10,
+ wantHeight: 20,
+ },
+ {
+ name: "syn_stream_ctrl_frame_stdout_single_read",
+ inputs: [][]byte{append(syn_stream_ctrl_header, synStreamStdoutPayload...)},
+ wantStdoutStreamID: 1,
+ },
+ {
+ name: "syn_stream_ctrl_frame_stderr_single_read",
+ inputs: [][]byte{append(syn_stream_ctrl_header, synStreamStderrPayload...)},
+ wantStderrStreamID: 2,
+ },
+ {
+ name: "syn_stream_ctrl_frame_resize_single_read",
+ inputs: [][]byte{append(syn_stream_ctrl_header, synStreamResizePayload...)},
+ wantResizeStreamID: 3,
+ },
+ {
+ name: "syn_stream_ctrl_frame_resize_four_reads_with_leftover",
+ inputs: [][]byte{syn_stream_ctrl_header, append(synStreamResizePayload, syn_stream_ctrl_header...), append(synStreamStderrPayload, syn_stream_ctrl_header...), append(synStreamStdoutPayload, 0x0, 0x3)},
+ wantStdoutStreamID: 1,
+ wantStderrStreamID: 2,
+ wantResizeStreamID: 3,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ tc := &fakes.TestConn{}
+ sr := &fakes.TestSessionRecorder{}
+ rec := tsrecorder.New(sr, cl, cl.Now(), true)
+ c := &spdyRemoteConnRecorder{
+ Conn: tc,
+ log: zl.Sugar(),
+ rec: rec,
+ }
+ c.resizeStreamID.Store(tt.resizeStreamIDBeforeRead)
+
+ for i, input := range tt.inputs {
+ c.zlibReqReader = reader
+ tc.ResetReadBuf()
+ if err := tc.WriteReadBufBytes(input); err != nil {
+ t.Fatalf("writing bytes to test conn: %v", err)
+ }
+ _, err = c.Read(make([]byte, len(input)))
+ if err != nil {
+ t.Errorf("[%d] spdyRemoteConnRecorder.Read() resulted in an unexpected error: %v", i, err)
+ }
+ }
+ if id := c.resizeStreamID.Load(); id != tt.wantResizeStreamID && id != tt.resizeStreamIDBeforeRead {
+ t.Errorf("wants resizeStreamID: %d, got %d", tt.wantResizeStreamID, id)
+ }
+ if id := c.stderrStreamID.Load(); id != tt.wantStderrStreamID {
+ t.Errorf("wants stderrStreamID: %d, got %d", tt.wantStderrStreamID, id)
+ }
+ if id := c.stdoutStreamID.Load(); id != tt.wantStdoutStreamID {
+ t.Errorf("wants stdoutStreamID: %d, got %d", tt.wantStdoutStreamID, id)
+ }
+ if tt.wantHeight != 0 || tt.wantWidth != 0 {
+ if tt.wantWidth != c.ch.Width {
+ t.Errorf("wants width: %v, got %v", tt.wantWidth, c.ch.Width)
+ }
+ if tt.wantHeight != c.ch.Height {
+ t.Errorf("want height: %v, got %v", tt.wantHeight, c.ch.Height)
+ }
+ }
+ })
+ }
+}
+
+func resizeMsgBytes(t *testing.T, width, height int) []byte {
+ t.Helper()
+ bs, err := json.Marshal(tsrecorder.ResizeMsg{Width: width, Height: height})
+ if err != nil {
+ t.Fatalf("error marshalling resizeMsg: %v", err)
+ }
+ return bs
+}
diff --git a/k8s-operator/session-recording/spdy/frame.go b/k8s-operator/session-recording/spdy/frame.go
new file mode 100644
index 000000000..54b29d33a
--- /dev/null
+++ b/k8s-operator/session-recording/spdy/frame.go
@@ -0,0 +1,285 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+//go:build !plan9
+
+package spdy
+
+import (
+ "bytes"
+ "encoding/binary"
+ "fmt"
+ "io"
+ "net/http"
+ "sync"
+
+ "go.uber.org/zap"
+)
+
+const (
+ SYN_STREAM ControlFrameType = 1 // https://www.ietf.org/archive/id/draft-mbelshe-httpbis-spdy-00.txt section 2.6.1
+ SYN_REPLY ControlFrameType = 2 // https://www.ietf.org/archive/id/draft-mbelshe-httpbis-spdy-00.txt section 2.6.2
+ SYN_PING ControlFrameType = 6 // https://www.ietf.org/archive/id/draft-mbelshe-httpbis-spdy-00.txt section 2.6.5
+)
+
+// spdyFrame is a parsed SPDY frame as defined in
+// https://www.ietf.org/archive/id/draft-mbelshe-httpbis-spdy-00.txt
+// A SPDY frame can be either a control frame or a data frame.
+type spdyFrame struct {
+ Raw []byte // full frame as raw bytes
+
+ // Common frame fields:
+ Ctrl bool // true if this is a SPDY control frame
+ Payload []byte // payload as raw bytes
+
+ // Control frame fields:
+ Version uint16 // SPDY protocol version
+ Type ControlFrameType
+
+ // Data frame fields:
+ // StreamID is the id of the steam to which this data frame belongs.
+ // SPDY allows transmitting multiple data streams concurrently.
+ StreamID uint32
+}
+
+// Type of an SPDY control frame.
+type ControlFrameType uint16
+
+// Parse parses bytes into spdyFrame.
+// If the bytes don't contain a full frame, return false.
+//
+// Control frame structure:
+//
+// +----------------------------------+
+// |C| Version(15bits) | Type(16bits) |
+// +----------------------------------+
+// | Flags (8) | Length (24 bits) |
+// +----------------------------------+
+// | Data |
+// +----------------------------------+
+//
+// Data frame structure:
+//
+// +----------------------------------+
+// |C| Stream-ID (31bits) |
+// +----------------------------------+
+// | Flags (8) | Length (24 bits) |
+// +----------------------------------+
+// | Data |
+// +----------------------------------+
+//
+// https://www.ietf.org/archive/id/draft-mbelshe-httpbis-spdy-00.txt
+func (sf *spdyFrame) Parse(b []byte, log *zap.SugaredLogger) (ok bool, _ error) {
+ const (
+ spdyHeaderLength = 8
+ )
+ have := len(b)
+ if have < spdyHeaderLength { // input does not contain full frame
+ return false, nil
+ }
+
+ if !isSPDYFrameHeader(b) {
+ return false, fmt.Errorf("bytes %v do not seem to contain SPDY frames. Ensure that you are using a SPDY based client to 'kubectl exec'.", b)
+ }
+
+ payloadLength := readInt24(b[5:8])
+ frameLength := payloadLength + spdyHeaderLength
+ if have < frameLength { // input does not contain full frame
+ return false, nil
+ }
+
+ frame := b[:frameLength:frameLength] // enforce frameLength capacity
+
+ sf.Raw = frame
+ sf.Payload = frame[spdyHeaderLength:frameLength]
+
+ sf.Ctrl = hasControlBitSet(frame)
+
+ if !sf.Ctrl { // data frame
+ sf.StreamID = dataFrameStreamID(frame)
+ return true, nil
+ }
+
+ sf.Version = controlFrameVersion(frame)
+ sf.Type = controlFrameType(frame)
+ return true, nil
+}
+
+// parseHeaders retrieves any headers from this spdyFrame.
+func (sf *spdyFrame) parseHeaders(z *zlibReader, log *zap.SugaredLogger) (http.Header, error) {
+ if !sf.Ctrl {
+ return nil, fmt.Errorf("[unexpected] parseHeaders called for a frame that is not a control frame")
+ }
+ const (
+ // +------------------------------------+
+ // |X| Stream-ID (31bits) |
+ // +------------------------------------+
+ // |X| Associated-To-Stream-ID (31bits) |
+ // +------------------------------------+
+ // | Pri|Unused | Slot | |
+ // +-------------------+ |
+ synStreamPayloadLengthBeforeHeaders = 10
+
+ // +------------------------------------+
+ // |X| Stream-ID (31bits) |
+ //+------------------------------------+
+ synReplyPayloadLengthBeforeHeaders = 4
+
+ // +----------------------------------|
+ // | 32-bit ID |
+ // +----------------------------------+
+ pingPayloadLength = 4
+ )
+
+ switch sf.Type {
+ case SYN_STREAM:
+ if len(sf.Payload) < synStreamPayloadLengthBeforeHeaders {
+ return nil, fmt.Errorf("SYN_STREAM frame too short: %v", len(sf.Payload))
+ }
+ z.Set(sf.Payload[synStreamPayloadLengthBeforeHeaders:])
+ return parseHeaders(z, log)
+ case SYN_REPLY:
+ if len(sf.Payload) < synReplyPayloadLengthBeforeHeaders {
+ return nil, fmt.Errorf("SYN_REPLY frame too short: %v", len(sf.Payload))
+ }
+ if len(sf.Payload) == synReplyPayloadLengthBeforeHeaders {
+ return nil, nil // no headers
+ }
+ z.Set(sf.Payload[synReplyPayloadLengthBeforeHeaders:])
+ return parseHeaders(z, log)
+ case SYN_PING:
+ if len(sf.Payload) != pingPayloadLength {
+ return nil, fmt.Errorf("PING frame with unexpected length %v", len(sf.Payload))
+ }
+ return nil, nil // ping frame has no headers
+
+ default:
+ log.Infof("[unexpected] unknown control frame type %v", sf.Type)
+ }
+ return nil, nil
+}
+
+// parseHeaders expects to be passed a reader that contains a compressed SPDY control
+// frame Name/Value Header Block with 0 or more headers:
+//
+// | Number of Name/Value pairs (int32) | <+
+// +------------------------------------+ |
+// | Length of name (int32) | | This section is the "Name/Value
+// +------------------------------------+ | Header Block", and is compressed.
+// | Name (string) | |
+// +------------------------------------+ |
+// | Length of value (int32) | |
+// +------------------------------------+ |
+// | Value (string) | |
+// +------------------------------------+ |
+// | (repeats) | <+
+//
+// It extracts the headers and returns them as http.Header. By doing that it
+// also advances the provided reader past the headers block.
+// See also https://www.ietf.org/archive/id/draft-mbelshe-httpbis-spdy-00.txt section 2.6.10
+func parseHeaders(decompressor io.Reader, log *zap.SugaredLogger) (http.Header, error) {
+ buf := bufPool.Get().(*bytes.Buffer)
+ defer bufPool.Put(buf)
+ buf.Reset()
+
+ // readUint32 reads the next 4 decompressed bytes from the decompressor
+ // as a uint32.
+ readUint32 := func() (uint32, error) {
+ const uint32Length = 4
+ if _, err := io.CopyN(buf, decompressor, uint32Length); err != nil { // decompress
+ return 0, fmt.Errorf("error decompressing bytes: %w", err)
+ }
+ return binary.BigEndian.Uint32(buf.Next(uint32Length)), nil // return as uint32
+ }
+
+ // readLenBytes decompresses and returns as bytes the next 'Name' or 'Value'
+ // field from SPDY Name/Value header block. decompressor must be at
+ // 'Length of name'/'Length of value' field.
+ readLenBytes := func() ([]byte, error) {
+ xLen, err := readUint32() // length of field to read
+ if err != nil {
+ return nil, err
+ }
+ if _, err := io.CopyN(buf, decompressor, int64(xLen)); err != nil { // decompress
+ return nil, err
+ }
+ return buf.Next(int(xLen)), nil
+ }
+
+ numHeaders, err := readUint32()
+ if err != nil {
+ return nil, fmt.Errorf("error determining num headers: %v", err)
+ }
+ h := make(http.Header, numHeaders)
+ for i := uint32(0); i < numHeaders; i++ {
+ name, err := readLenBytes()
+ if err != nil {
+ return nil, err
+ }
+ ns := string(name)
+ if _, ok := h[ns]; ok {
+ return nil, fmt.Errorf("invalid data: duplicate header %q", ns)
+ }
+ val, err := readLenBytes()
+ if err != nil {
+ return nil, fmt.Errorf("error reading header data: %w", err)
+ }
+ for _, v := range bytes.Split(val, headerSep) {
+ h.Add(ns, string(v))
+ }
+ }
+ return h, nil
+}
+
+// isSPDYFrame validates that the input bytes start with a valid SPDY frame
+// header.
+func isSPDYFrameHeader(f []byte) bool {
+ if hasControlBitSet(f) {
+ // If this is a control frame, version and type must be set.
+ return controlFrameVersion(f) != uint16(0) && uint16(controlFrameType(f)) != uint16(0)
+ }
+ // If this is a data frame, stream ID must be set.
+ return dataFrameStreamID(f) != uint32(0)
+}
+
+// spdyDataFrameStreamID returns stream ID for an SPDY data frame passed as the
+// input data slice. StreaID is contained within bits [0-31) of a data frame
+// header.
+func dataFrameStreamID(frame []byte) uint32 {
+ return binary.BigEndian.Uint32(frame[0:4]) & 0x7f
+}
+
+// controlFrameType returns the type of a SPDY control frame.
+// See https://www.ietf.org/archive/id/draft-mbelshe-httpbis-spdy-00.txt section 2.6
+func controlFrameType(f []byte) ControlFrameType {
+ return ControlFrameType(binary.BigEndian.Uint16(f[2:4]))
+}
+
+// spdyControlFrameVersion returns SPDY version extracted from input bytes that
+// must be a SPDY control frame.
+func controlFrameVersion(frame []byte) uint16 {
+ bs := binary.BigEndian.Uint16(frame[0:2]) // first 16 bits
+ return bs & 0x7f // discard control bit
+}
+
+// hasControlBitSet returns true if the passsed bytes have SPDY control bit set.
+// SPDY frames can be either control frames or data frames. A control frame has
+// control bit set to 1 and a data frame has it set to 0.
+func hasControlBitSet(frame []byte) bool {
+ return frame[0]&0x80 == 128 // 0x80
+}
+
+var bufPool = sync.Pool{
+ New: func() any {
+ return new(bytes.Buffer)
+ },
+}
+
+// Headers in SPDY header name/value block are separated by a 0 byte.
+// https://www.ietf.org/archive/id/draft-mbelshe-httpbis-spdy-00.txt section 2.6.10
+var headerSep = []byte{0}
+
+func readInt24(b []byte) int {
+ _ = b[2] // bounds check hint to compiler; see golang.org/issue/14808
+ return int(b[0])<<16 | int(b[1])<<8 | int(b[2])
+}
diff --git a/k8s-operator/session-recording/spdy/frame_test.go b/k8s-operator/session-recording/spdy/frame_test.go
new file mode 100644
index 000000000..c6aa4cf01
--- /dev/null
+++ b/k8s-operator/session-recording/spdy/frame_test.go
@@ -0,0 +1,293 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+//go:build !plan9
+
+package spdy
+
+import (
+ "bytes"
+ "compress/zlib"
+ "encoding/binary"
+ "io"
+ "net/http"
+ "reflect"
+ "strings"
+ "testing"
+
+ "github.com/google/go-cmp/cmp"
+ "go.uber.org/zap"
+)
+
+func Test_spdyFrame_Parse(t *testing.T) {
+ zl, err := zap.NewDevelopment()
+ if err != nil {
+ t.Fatal(err)
+ }
+ tests := []struct {
+ name string
+ gotBytes []byte
+ wantFrame spdyFrame
+ wantOk bool
+ wantErr bool
+ }{
+ {
+ name: "control_frame_syn_stream",
+ gotBytes: []byte{0x80, 0x3, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0},
+ wantFrame: spdyFrame{
+ Version: 3,
+ Type: SYN_STREAM,
+ Ctrl: true,
+ Raw: []byte{0x80, 0x3, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0},
+ Payload: []byte{},
+ },
+ wantOk: true,
+ },
+ {
+ name: "control_frame_syn_reply",
+ gotBytes: []byte{0x80, 0x3, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0},
+ wantFrame: spdyFrame{
+ Ctrl: true,
+ Version: 3,
+ Type: SYN_REPLY,
+ Raw: []byte{0x80, 0x3, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0},
+ Payload: []byte{},
+ },
+ wantOk: true,
+ },
+ {
+ name: "control_frame_headers",
+ gotBytes: []byte{0x80, 0x3, 0x0, 0x8, 0x0, 0x0, 0x0, 0x0},
+ wantFrame: spdyFrame{
+ Ctrl: true,
+ Version: 3,
+ Type: 8,
+ Raw: []byte{0x80, 0x3, 0x0, 0x8, 0x0, 0x0, 0x0, 0x0},
+ Payload: []byte{},
+ },
+ wantOk: true,
+ },
+ {
+ name: "data_frame_stream_id_5",
+ gotBytes: []byte{0x0, 0x0, 0x0, 0x5, 0x0, 0x0, 0x0, 0x0},
+ wantFrame: spdyFrame{
+ Payload: []byte{},
+ StreamID: 5,
+ Raw: []byte{0x0, 0x0, 0x0, 0x5, 0x0, 0x0, 0x0, 0x0},
+ },
+ wantOk: true,
+ },
+ {
+ name: "frame_with_incomplete_header",
+ gotBytes: []byte{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
+ },
+ {
+ name: "frame_with_incomplete_payload",
+ gotBytes: []byte{0x0, 0x0, 0x0, 0x5, 0x0, 0x0, 0x0, 0x2}, // header specifies payload length of 2
+ },
+ {
+ name: "control_bit_set_not_spdy_frame",
+ gotBytes: []byte{0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, // header specifies payload length of 2
+ wantErr: true,
+ },
+ {
+ name: "control_bit_not_set_not_spdy_frame",
+ gotBytes: []byte{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, // header specifies payload length of 2
+ wantErr: true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ sf := &spdyFrame{}
+ gotOk, err := sf.Parse(tt.gotBytes, zl.Sugar())
+ if (err != nil) != tt.wantErr {
+ t.Errorf("spdyFrame.Parse() error = %v, wantErr %v", err, tt.wantErr)
+ return
+ }
+ if gotOk != tt.wantOk {
+ t.Errorf("spdyFrame.Parse() = %v, want %v", gotOk, tt.wantOk)
+ }
+ if diff := cmp.Diff(*sf, tt.wantFrame); diff != "" {
+ t.Errorf("Unexpected SPDY frame (-got +want):\n%s", diff)
+ }
+ })
+ }
+}
+
+func Test_spdyFrame_parseHeaders(t *testing.T) {
+ zl, err := zap.NewDevelopment()
+ if err != nil {
+ t.Fatal(err)
+ }
+ tests := []struct {
+ name string
+ isCtrl bool
+ payload []byte
+ typ ControlFrameType
+ wantHeader http.Header
+ wantErr bool
+ }{
+ {
+ name: "syn_stream_with_header",
+ payload: payload(t, map[string]string{"Streamtype": "stdin"}, SYN_STREAM, 1),
+ typ: SYN_STREAM,
+ isCtrl: true,
+ wantHeader: header(map[string]string{"Streamtype": "stdin"}),
+ },
+ {
+ name: "syn_ping",
+ payload: payload(t, nil, SYN_PING, 0),
+ typ: SYN_PING,
+ isCtrl: true,
+ },
+ {
+ name: "syn_reply_headers",
+ payload: payload(t, map[string]string{"foo": "bar", "bar": "baz"}, SYN_REPLY, 0),
+ typ: SYN_REPLY,
+ isCtrl: true,
+ wantHeader: header(map[string]string{"foo": "bar", "bar": "baz"}),
+ },
+ {
+ name: "syn_reply_no_headers",
+ payload: payload(t, nil, SYN_REPLY, 0),
+ typ: SYN_REPLY,
+ isCtrl: true,
+ },
+ {
+ name: "syn_stream_too_short_payload",
+ payload: []byte{0, 1, 2, 3, 4},
+ typ: SYN_STREAM,
+ isCtrl: true,
+ wantErr: true,
+ },
+ {
+ name: "syn_reply_too_short_payload",
+ payload: []byte{0, 1, 2},
+ typ: SYN_REPLY,
+ isCtrl: true,
+ wantErr: true,
+ },
+ {
+ name: "syn_ping_too_short_payload",
+ payload: []byte{0, 1, 2},
+ typ: SYN_PING,
+ isCtrl: true,
+ wantErr: true,
+ },
+ {
+ name: "not_a_control_frame",
+ payload: []byte{0, 1, 2, 3},
+ typ: SYN_PING,
+ wantErr: true,
+ },
+ }
+ for _, tt := range tests {
+ var reader zlibReader
+ t.Run(tt.name, func(t *testing.T) {
+ sf := &spdyFrame{
+ Ctrl: tt.isCtrl,
+ Type: tt.typ,
+ Payload: tt.payload,
+ }
+ gotHeader, err := sf.parseHeaders(&reader, zl.Sugar())
+ if (err != nil) != tt.wantErr {
+ t.Errorf("spdyFrame.parseHeaders() error = %v, wantErr %v", err, tt.wantErr)
+ }
+ if !reflect.DeepEqual(gotHeader, tt.wantHeader) {
+ t.Errorf("spdyFrame.parseHeaders() = %v, want %v", gotHeader, tt.wantHeader)
+ }
+ })
+ }
+}
+
+// payload takes a control frame type and a map with 0 or more header keys and
+// values and returns a SPDY control frame payload with the header as SPDY zlib
+// compressed header name/value block. The payload is padded with arbitrary
+// bytes to ensure the header name/value block is in the correct position for
+// the frame type.
+func payload(t *testing.T, headerM map[string]string, typ ControlFrameType, streamID int) []byte {
+ t.Helper()
+
+ buf := bytes.NewBuffer([]byte{})
+ writeControlFramePayloadBeforeHeaders(t, buf, typ, streamID)
+ if len(headerM) == 0 {
+ return buf.Bytes()
+ }
+
+ w, err := zlib.NewWriterLevelDict(buf, zlib.BestCompression, spdyTxtDictionary)
+ if err != nil {
+ t.Fatalf("error creating new zlib writer: %v", err)
+ }
+ if len(headerM) != 0 {
+ writeHeaderValueBlock(t, w, headerM)
+ }
+ if err != nil {
+ t.Fatalf("error writing headers: %v", err)
+ }
+ w.Flush()
+ return buf.Bytes()
+}
+
+// writeControlFramePayloadBeforeHeaders writes to w N bytes, N being the number
+// of bytes that control frame payload for that control frame is required to
+// contain before the name/value header block.
+func writeControlFramePayloadBeforeHeaders(t *testing.T, w io.Writer, typ ControlFrameType, streamID int) {
+ t.Helper()
+ switch typ {
+ case SYN_STREAM:
+ // needs 10 bytes in payload before any headers
+ if err := binary.Write(w, binary.BigEndian, uint32(streamID)); err != nil {
+ t.Fatalf("writing streamID: %v", err)
+ }
+ if err := binary.Write(w, binary.BigEndian, [6]byte{0}); err != nil {
+ t.Fatalf("writing payload: %v", err)
+ }
+ case SYN_REPLY:
+ // needs 4 bytes in payload before any headers
+ if err := binary.Write(w, binary.BigEndian, uint32(0)); err != nil {
+ t.Fatalf("writing payload: %v", err)
+ }
+ case SYN_PING:
+ // needs 4 bytes in payload
+ if err := binary.Write(w, binary.BigEndian, uint32(0)); err != nil {
+ t.Fatalf("writing payload: %v", err)
+ }
+ default:
+ t.Fatalf("unexpected frame type: %v", typ)
+ }
+}
+
+// writeHeaderValue block takes http.Header and zlib writer, writes the headers
+// as SPDY zlib compressed bytes to the writer.
+// Adopted from https://github.com/moby/spdystream/blob/v0.2.0/spdy/write.go#L171-L198 (which is also what Kubernetes uses).
+func writeHeaderValueBlock(t *testing.T, w io.Writer, headerM map[string]string) {
+ t.Helper()
+ h := header(headerM)
+ if err := binary.Write(w, binary.BigEndian, uint32(len(h))); err != nil {
+ t.Fatalf("error writing header block length: %v", err)
+ }
+ for name, values := range h {
+ if err := binary.Write(w, binary.BigEndian, uint32(len(name))); err != nil {
+ t.Fatalf("error writing name length for name %q: %v", name, err)
+ }
+ name = strings.ToLower(name)
+ if _, err := io.WriteString(w, name); err != nil {
+ t.Fatalf("error writing name %q: %v", name, err)
+ }
+ v := strings.Join(values, string(headerSep))
+ if err := binary.Write(w, binary.BigEndian, uint32(len(v))); err != nil {
+ t.Fatalf("error writing value length for value %q: %v", v, err)
+ }
+ if _, err := io.WriteString(w, v); err != nil {
+ t.Fatalf("error writing value %q: %v", v, err)
+ }
+ }
+}
+
+func header(hs map[string]string) http.Header {
+ h := make(http.Header, len(hs))
+ for key, val := range hs {
+ h.Add(key, val)
+ }
+ return h
+}
diff --git a/k8s-operator/session-recording/spdy/zlib-reader.go b/k8s-operator/session-recording/spdy/zlib-reader.go
new file mode 100644
index 000000000..1eb654be3
--- /dev/null
+++ b/k8s-operator/session-recording/spdy/zlib-reader.go
@@ -0,0 +1,221 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+//go:build !plan9
+
+package spdy
+
+import (
+ "bytes"
+ "compress/zlib"
+ "io"
+)
+
+// zlibReader contains functionality to parse zlib compressed SPDY data.
+// See https://www.ietf.org/archive/id/draft-mbelshe-httpbis-spdy-00.txt section 2.6.10.1
+type zlibReader struct {
+ io.ReadCloser
+ underlying io.LimitedReader // zlib compressed SPDY data
+}
+
+// Read decompresses zlibReader's underlying zlib compressed SPDY data and reads
+// it into b.
+func (z *zlibReader) Read(b []byte) (int, error) {
+ if z.ReadCloser == nil {
+ r, err := zlib.NewReaderDict(&z.underlying, spdyTxtDictionary)
+ if err != nil {
+ return 0, err
+ }
+ z.ReadCloser = r
+ }
+ return z.ReadCloser.Read(b)
+}
+
+// Set sets zlibReader's underlying data. b must be zlib compressed SPDY data.
+func (z *zlibReader) Set(b []byte) {
+ z.underlying.R = bytes.NewReader(b)
+ z.underlying.N = int64(len(b))
+}
+
+// spdyTxtDictionary is the dictionary defined in the SPDY spec.
+// https://datatracker.ietf.org/doc/html/draft-mbelshe-httpbis-spdy-00#section-2.6.10.1
+var spdyTxtDictionary = []byte{
+ 0x00, 0x00, 0x00, 0x07, 0x6f, 0x70, 0x74, 0x69, // - - - - o p t i
+ 0x6f, 0x6e, 0x73, 0x00, 0x00, 0x00, 0x04, 0x68, // o n s - - - - h
+ 0x65, 0x61, 0x64, 0x00, 0x00, 0x00, 0x04, 0x70, // e a d - - - - p
+ 0x6f, 0x73, 0x74, 0x00, 0x00, 0x00, 0x03, 0x70, // o s t - - - - p
+ 0x75, 0x74, 0x00, 0x00, 0x00, 0x06, 0x64, 0x65, // u t - - - - d e
+ 0x6c, 0x65, 0x74, 0x65, 0x00, 0x00, 0x00, 0x05, // l e t e - - - -
+ 0x74, 0x72, 0x61, 0x63, 0x65, 0x00, 0x00, 0x00, // t r a c e - - -
+ 0x06, 0x61, 0x63, 0x63, 0x65, 0x70, 0x74, 0x00, // - a c c e p t -
+ 0x00, 0x00, 0x0e, 0x61, 0x63, 0x63, 0x65, 0x70, // - - - a c c e p
+ 0x74, 0x2d, 0x63, 0x68, 0x61, 0x72, 0x73, 0x65, // t - c h a r s e
+ 0x74, 0x00, 0x00, 0x00, 0x0f, 0x61, 0x63, 0x63, // t - - - - a c c
+ 0x65, 0x70, 0x74, 0x2d, 0x65, 0x6e, 0x63, 0x6f, // e p t - e n c o
+ 0x64, 0x69, 0x6e, 0x67, 0x00, 0x00, 0x00, 0x0f, // d i n g - - - -
+ 0x61, 0x63, 0x63, 0x65, 0x70, 0x74, 0x2d, 0x6c, // a c c e p t - l
+ 0x61, 0x6e, 0x67, 0x75, 0x61, 0x67, 0x65, 0x00, // a n g u a g e -
+ 0x00, 0x00, 0x0d, 0x61, 0x63, 0x63, 0x65, 0x70, // - - - a c c e p
+ 0x74, 0x2d, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x73, // t - r a n g e s
+ 0x00, 0x00, 0x00, 0x03, 0x61, 0x67, 0x65, 0x00, // - - - - a g e -
+ 0x00, 0x00, 0x05, 0x61, 0x6c, 0x6c, 0x6f, 0x77, // - - - a l l o w
+ 0x00, 0x00, 0x00, 0x0d, 0x61, 0x75, 0x74, 0x68, // - - - - a u t h
+ 0x6f, 0x72, 0x69, 0x7a, 0x61, 0x74, 0x69, 0x6f, // o r i z a t i o
+ 0x6e, 0x00, 0x00, 0x00, 0x0d, 0x63, 0x61, 0x63, // n - - - - c a c
+ 0x68, 0x65, 0x2d, 0x63, 0x6f, 0x6e, 0x74, 0x72, // h e - c o n t r
+ 0x6f, 0x6c, 0x00, 0x00, 0x00, 0x0a, 0x63, 0x6f, // o l - - - - c o
+ 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, // n n e c t i o n
+ 0x00, 0x00, 0x00, 0x0c, 0x63, 0x6f, 0x6e, 0x74, // - - - - c o n t
+ 0x65, 0x6e, 0x74, 0x2d, 0x62, 0x61, 0x73, 0x65, // e n t - b a s e
+ 0x00, 0x00, 0x00, 0x10, 0x63, 0x6f, 0x6e, 0x74, // - - - - c o n t
+ 0x65, 0x6e, 0x74, 0x2d, 0x65, 0x6e, 0x63, 0x6f, // e n t - e n c o
+ 0x64, 0x69, 0x6e, 0x67, 0x00, 0x00, 0x00, 0x10, // d i n g - - - -
+ 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x2d, // c o n t e n t -
+ 0x6c, 0x61, 0x6e, 0x67, 0x75, 0x61, 0x67, 0x65, // l a n g u a g e
+ 0x00, 0x00, 0x00, 0x0e, 0x63, 0x6f, 0x6e, 0x74, // - - - - c o n t
+ 0x65, 0x6e, 0x74, 0x2d, 0x6c, 0x65, 0x6e, 0x67, // e n t - l e n g
+ 0x74, 0x68, 0x00, 0x00, 0x00, 0x10, 0x63, 0x6f, // t h - - - - c o
+ 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x2d, 0x6c, 0x6f, // n t e n t - l o
+ 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x00, 0x00, // c a t i o n - -
+ 0x00, 0x0b, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, // - - c o n t e n
+ 0x74, 0x2d, 0x6d, 0x64, 0x35, 0x00, 0x00, 0x00, // t - m d 5 - - -
+ 0x0d, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, // - c o n t e n t
+ 0x2d, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x00, 0x00, // - r a n g e - -
+ 0x00, 0x0c, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, // - - c o n t e n
+ 0x74, 0x2d, 0x74, 0x79, 0x70, 0x65, 0x00, 0x00, // t - t y p e - -
+ 0x00, 0x04, 0x64, 0x61, 0x74, 0x65, 0x00, 0x00, // - - d a t e - -
+ 0x00, 0x04, 0x65, 0x74, 0x61, 0x67, 0x00, 0x00, // - - e t a g - -
+ 0x00, 0x06, 0x65, 0x78, 0x70, 0x65, 0x63, 0x74, // - - e x p e c t
+ 0x00, 0x00, 0x00, 0x07, 0x65, 0x78, 0x70, 0x69, // - - - - e x p i
+ 0x72, 0x65, 0x73, 0x00, 0x00, 0x00, 0x04, 0x66, // r e s - - - - f
+ 0x72, 0x6f, 0x6d, 0x00, 0x00, 0x00, 0x04, 0x68, // r o m - - - - h
+ 0x6f, 0x73, 0x74, 0x00, 0x00, 0x00, 0x08, 0x69, // o s t - - - - i
+ 0x66, 0x2d, 0x6d, 0x61, 0x74, 0x63, 0x68, 0x00, // f - m a t c h -
+ 0x00, 0x00, 0x11, 0x69, 0x66, 0x2d, 0x6d, 0x6f, // - - - i f - m o
+ 0x64, 0x69, 0x66, 0x69, 0x65, 0x64, 0x2d, 0x73, // d i f i e d - s
+ 0x69, 0x6e, 0x63, 0x65, 0x00, 0x00, 0x00, 0x0d, // i n c e - - - -
+ 0x69, 0x66, 0x2d, 0x6e, 0x6f, 0x6e, 0x65, 0x2d, // i f - n o n e -
+ 0x6d, 0x61, 0x74, 0x63, 0x68, 0x00, 0x00, 0x00, // m a t c h - - -
+ 0x08, 0x69, 0x66, 0x2d, 0x72, 0x61, 0x6e, 0x67, // - i f - r a n g
+ 0x65, 0x00, 0x00, 0x00, 0x13, 0x69, 0x66, 0x2d, // e - - - - i f -
+ 0x75, 0x6e, 0x6d, 0x6f, 0x64, 0x69, 0x66, 0x69, // u n m o d i f i
+ 0x65, 0x64, 0x2d, 0x73, 0x69, 0x6e, 0x63, 0x65, // e d - s i n c e
+ 0x00, 0x00, 0x00, 0x0d, 0x6c, 0x61, 0x73, 0x74, // - - - - l a s t
+ 0x2d, 0x6d, 0x6f, 0x64, 0x69, 0x66, 0x69, 0x65, // - m o d i f i e
+ 0x64, 0x00, 0x00, 0x00, 0x08, 0x6c, 0x6f, 0x63, // d - - - - l o c
+ 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x00, 0x00, 0x00, // a t i o n - - -
+ 0x0c, 0x6d, 0x61, 0x78, 0x2d, 0x66, 0x6f, 0x72, // - m a x - f o r
+ 0x77, 0x61, 0x72, 0x64, 0x73, 0x00, 0x00, 0x00, // w a r d s - - -
+ 0x06, 0x70, 0x72, 0x61, 0x67, 0x6d, 0x61, 0x00, // - p r a g m a -
+ 0x00, 0x00, 0x12, 0x70, 0x72, 0x6f, 0x78, 0x79, // - - - p r o x y
+ 0x2d, 0x61, 0x75, 0x74, 0x68, 0x65, 0x6e, 0x74, // - a u t h e n t
+ 0x69, 0x63, 0x61, 0x74, 0x65, 0x00, 0x00, 0x00, // i c a t e - - -
+ 0x13, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2d, 0x61, // - p r o x y - a
+ 0x75, 0x74, 0x68, 0x6f, 0x72, 0x69, 0x7a, 0x61, // u t h o r i z a
+ 0x74, 0x69, 0x6f, 0x6e, 0x00, 0x00, 0x00, 0x05, // t i o n - - - -
+ 0x72, 0x61, 0x6e, 0x67, 0x65, 0x00, 0x00, 0x00, // r a n g e - - -
+ 0x07, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x72, // - r e f e r e r
+ 0x00, 0x00, 0x00, 0x0b, 0x72, 0x65, 0x74, 0x72, // - - - - r e t r
+ 0x79, 0x2d, 0x61, 0x66, 0x74, 0x65, 0x72, 0x00, // y - a f t e r -
+ 0x00, 0x00, 0x06, 0x73, 0x65, 0x72, 0x76, 0x65, // - - - s e r v e
+ 0x72, 0x00, 0x00, 0x00, 0x02, 0x74, 0x65, 0x00, // r - - - - t e -
+ 0x00, 0x00, 0x07, 0x74, 0x72, 0x61, 0x69, 0x6c, // - - - t r a i l
+ 0x65, 0x72, 0x00, 0x00, 0x00, 0x11, 0x74, 0x72, // e r - - - - t r
+ 0x61, 0x6e, 0x73, 0x66, 0x65, 0x72, 0x2d, 0x65, // a n s f e r - e
+ 0x6e, 0x63, 0x6f, 0x64, 0x69, 0x6e, 0x67, 0x00, // n c o d i n g -
+ 0x00, 0x00, 0x07, 0x75, 0x70, 0x67, 0x72, 0x61, // - - - u p g r a
+ 0x64, 0x65, 0x00, 0x00, 0x00, 0x0a, 0x75, 0x73, // d e - - - - u s
+ 0x65, 0x72, 0x2d, 0x61, 0x67, 0x65, 0x6e, 0x74, // e r - a g e n t
+ 0x00, 0x00, 0x00, 0x04, 0x76, 0x61, 0x72, 0x79, // - - - - v a r y
+ 0x00, 0x00, 0x00, 0x03, 0x76, 0x69, 0x61, 0x00, // - - - - v i a -
+ 0x00, 0x00, 0x07, 0x77, 0x61, 0x72, 0x6e, 0x69, // - - - w a r n i
+ 0x6e, 0x67, 0x00, 0x00, 0x00, 0x10, 0x77, 0x77, // n g - - - - w w
+ 0x77, 0x2d, 0x61, 0x75, 0x74, 0x68, 0x65, 0x6e, // w - a u t h e n
+ 0x74, 0x69, 0x63, 0x61, 0x74, 0x65, 0x00, 0x00, // t i c a t e - -
+ 0x00, 0x06, 0x6d, 0x65, 0x74, 0x68, 0x6f, 0x64, // - - m e t h o d
+ 0x00, 0x00, 0x00, 0x03, 0x67, 0x65, 0x74, 0x00, // - - - - g e t -
+ 0x00, 0x00, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, // - - - s t a t u
+ 0x73, 0x00, 0x00, 0x00, 0x06, 0x32, 0x30, 0x30, // s - - - - 2 0 0
+ 0x20, 0x4f, 0x4b, 0x00, 0x00, 0x00, 0x07, 0x76, // - O K - - - - v
+ 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x00, 0x00, // e r s i o n - -
+ 0x00, 0x08, 0x48, 0x54, 0x54, 0x50, 0x2f, 0x31, // - - H T T P - 1
+ 0x2e, 0x31, 0x00, 0x00, 0x00, 0x03, 0x75, 0x72, // - 1 - - - - u r
+ 0x6c, 0x00, 0x00, 0x00, 0x06, 0x70, 0x75, 0x62, // l - - - - p u b
+ 0x6c, 0x69, 0x63, 0x00, 0x00, 0x00, 0x0a, 0x73, // l i c - - - - s
+ 0x65, 0x74, 0x2d, 0x63, 0x6f, 0x6f, 0x6b, 0x69, // e t - c o o k i
+ 0x65, 0x00, 0x00, 0x00, 0x0a, 0x6b, 0x65, 0x65, // e - - - - k e e
+ 0x70, 0x2d, 0x61, 0x6c, 0x69, 0x76, 0x65, 0x00, // p - a l i v e -
+ 0x00, 0x00, 0x06, 0x6f, 0x72, 0x69, 0x67, 0x69, // - - - o r i g i
+ 0x6e, 0x31, 0x30, 0x30, 0x31, 0x30, 0x31, 0x32, // n 1 0 0 1 0 1 2
+ 0x30, 0x31, 0x32, 0x30, 0x32, 0x32, 0x30, 0x35, // 0 1 2 0 2 2 0 5
+ 0x32, 0x30, 0x36, 0x33, 0x30, 0x30, 0x33, 0x30, // 2 0 6 3 0 0 3 0
+ 0x32, 0x33, 0x30, 0x33, 0x33, 0x30, 0x34, 0x33, // 2 3 0 3 3 0 4 3
+ 0x30, 0x35, 0x33, 0x30, 0x36, 0x33, 0x30, 0x37, // 0 5 3 0 6 3 0 7
+ 0x34, 0x30, 0x32, 0x34, 0x30, 0x35, 0x34, 0x30, // 4 0 2 4 0 5 4 0
+ 0x36, 0x34, 0x30, 0x37, 0x34, 0x30, 0x38, 0x34, // 6 4 0 7 4 0 8 4
+ 0x30, 0x39, 0x34, 0x31, 0x30, 0x34, 0x31, 0x31, // 0 9 4 1 0 4 1 1
+ 0x34, 0x31, 0x32, 0x34, 0x31, 0x33, 0x34, 0x31, // 4 1 2 4 1 3 4 1
+ 0x34, 0x34, 0x31, 0x35, 0x34, 0x31, 0x36, 0x34, // 4 4 1 5 4 1 6 4
+ 0x31, 0x37, 0x35, 0x30, 0x32, 0x35, 0x30, 0x34, // 1 7 5 0 2 5 0 4
+ 0x35, 0x30, 0x35, 0x32, 0x30, 0x33, 0x20, 0x4e, // 5 0 5 2 0 3 - N
+ 0x6f, 0x6e, 0x2d, 0x41, 0x75, 0x74, 0x68, 0x6f, // o n - A u t h o
+ 0x72, 0x69, 0x74, 0x61, 0x74, 0x69, 0x76, 0x65, // r i t a t i v e
+ 0x20, 0x49, 0x6e, 0x66, 0x6f, 0x72, 0x6d, 0x61, // - I n f o r m a
+ 0x74, 0x69, 0x6f, 0x6e, 0x32, 0x30, 0x34, 0x20, // t i o n 2 0 4 -
+ 0x4e, 0x6f, 0x20, 0x43, 0x6f, 0x6e, 0x74, 0x65, // N o - C o n t e
+ 0x6e, 0x74, 0x33, 0x30, 0x31, 0x20, 0x4d, 0x6f, // n t 3 0 1 - M o
+ 0x76, 0x65, 0x64, 0x20, 0x50, 0x65, 0x72, 0x6d, // v e d - P e r m
+ 0x61, 0x6e, 0x65, 0x6e, 0x74, 0x6c, 0x79, 0x34, // a n e n t l y 4
+ 0x30, 0x30, 0x20, 0x42, 0x61, 0x64, 0x20, 0x52, // 0 0 - B a d - R
+ 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x34, 0x30, // e q u e s t 4 0
+ 0x31, 0x20, 0x55, 0x6e, 0x61, 0x75, 0x74, 0x68, // 1 - U n a u t h
+ 0x6f, 0x72, 0x69, 0x7a, 0x65, 0x64, 0x34, 0x30, // o r i z e d 4 0
+ 0x33, 0x20, 0x46, 0x6f, 0x72, 0x62, 0x69, 0x64, // 3 - F o r b i d
+ 0x64, 0x65, 0x6e, 0x34, 0x30, 0x34, 0x20, 0x4e, // d e n 4 0 4 - N
+ 0x6f, 0x74, 0x20, 0x46, 0x6f, 0x75, 0x6e, 0x64, // o t - F o u n d
+ 0x35, 0x30, 0x30, 0x20, 0x49, 0x6e, 0x74, 0x65, // 5 0 0 - I n t e
+ 0x72, 0x6e, 0x61, 0x6c, 0x20, 0x53, 0x65, 0x72, // r n a l - S e r
+ 0x76, 0x65, 0x72, 0x20, 0x45, 0x72, 0x72, 0x6f, // v e r - E r r o
+ 0x72, 0x35, 0x30, 0x31, 0x20, 0x4e, 0x6f, 0x74, // r 5 0 1 - N o t
+ 0x20, 0x49, 0x6d, 0x70, 0x6c, 0x65, 0x6d, 0x65, // - I m p l e m e
+ 0x6e, 0x74, 0x65, 0x64, 0x35, 0x30, 0x33, 0x20, // n t e d 5 0 3 -
+ 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x20, // S e r v i c e -
+ 0x55, 0x6e, 0x61, 0x76, 0x61, 0x69, 0x6c, 0x61, // U n a v a i l a
+ 0x62, 0x6c, 0x65, 0x4a, 0x61, 0x6e, 0x20, 0x46, // b l e J a n - F
+ 0x65, 0x62, 0x20, 0x4d, 0x61, 0x72, 0x20, 0x41, // e b - M a r - A
+ 0x70, 0x72, 0x20, 0x4d, 0x61, 0x79, 0x20, 0x4a, // p r - M a y - J
+ 0x75, 0x6e, 0x20, 0x4a, 0x75, 0x6c, 0x20, 0x41, // u n - J u l - A
+ 0x75, 0x67, 0x20, 0x53, 0x65, 0x70, 0x74, 0x20, // u g - S e p t -
+ 0x4f, 0x63, 0x74, 0x20, 0x4e, 0x6f, 0x76, 0x20, // O c t - N o v -
+ 0x44, 0x65, 0x63, 0x20, 0x30, 0x30, 0x3a, 0x30, // D e c - 0 0 - 0
+ 0x30, 0x3a, 0x30, 0x30, 0x20, 0x4d, 0x6f, 0x6e, // 0 - 0 0 - M o n
+ 0x2c, 0x20, 0x54, 0x75, 0x65, 0x2c, 0x20, 0x57, // - - T u e - - W
+ 0x65, 0x64, 0x2c, 0x20, 0x54, 0x68, 0x75, 0x2c, // e d - - T h u -
+ 0x20, 0x46, 0x72, 0x69, 0x2c, 0x20, 0x53, 0x61, // - F r i - - S a
+ 0x74, 0x2c, 0x20, 0x53, 0x75, 0x6e, 0x2c, 0x20, // t - - S u n - -
+ 0x47, 0x4d, 0x54, 0x63, 0x68, 0x75, 0x6e, 0x6b, // G M T c h u n k
+ 0x65, 0x64, 0x2c, 0x74, 0x65, 0x78, 0x74, 0x2f, // e d - t e x t -
+ 0x68, 0x74, 0x6d, 0x6c, 0x2c, 0x69, 0x6d, 0x61, // h t m l - i m a
+ 0x67, 0x65, 0x2f, 0x70, 0x6e, 0x67, 0x2c, 0x69, // g e - p n g - i
+ 0x6d, 0x61, 0x67, 0x65, 0x2f, 0x6a, 0x70, 0x67, // m a g e - j p g
+ 0x2c, 0x69, 0x6d, 0x61, 0x67, 0x65, 0x2f, 0x67, // - i m a g e - g
+ 0x69, 0x66, 0x2c, 0x61, 0x70, 0x70, 0x6c, 0x69, // i f - a p p l i
+ 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x78, // c a t i o n - x
+ 0x6d, 0x6c, 0x2c, 0x61, 0x70, 0x70, 0x6c, 0x69, // m l - a p p l i
+ 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x78, // c a t i o n - x
+ 0x68, 0x74, 0x6d, 0x6c, 0x2b, 0x78, 0x6d, 0x6c, // h t m l - x m l
+ 0x2c, 0x74, 0x65, 0x78, 0x74, 0x2f, 0x70, 0x6c, // - t e x t - p l
+ 0x61, 0x69, 0x6e, 0x2c, 0x74, 0x65, 0x78, 0x74, // a i n - t e x t
+ 0x2f, 0x6a, 0x61, 0x76, 0x61, 0x73, 0x63, 0x72, // - j a v a s c r
+ 0x69, 0x70, 0x74, 0x2c, 0x70, 0x75, 0x62, 0x6c, // i p t - p u b l
+ 0x69, 0x63, 0x70, 0x72, 0x69, 0x76, 0x61, 0x74, // i c p r i v a t
+ 0x65, 0x6d, 0x61, 0x78, 0x2d, 0x61, 0x67, 0x65, // e m a x - a g e
+ 0x3d, 0x67, 0x7a, 0x69, 0x70, 0x2c, 0x64, 0x65, // - g z i p - d e
+ 0x66, 0x6c, 0x61, 0x74, 0x65, 0x2c, 0x73, 0x64, // f l a t e - s d
+ 0x63, 0x68, 0x63, 0x68, 0x61, 0x72, 0x73, 0x65, // c h c h a r s e
+ 0x74, 0x3d, 0x75, 0x74, 0x66, 0x2d, 0x38, 0x63, // t - u t f - 8 c
+ 0x68, 0x61, 0x72, 0x73, 0x65, 0x74, 0x3d, 0x69, // h a r s e t - i
+ 0x73, 0x6f, 0x2d, 0x38, 0x38, 0x35, 0x39, 0x2d, // s o - 8 8 5 9 -
+ 0x31, 0x2c, 0x75, 0x74, 0x66, 0x2d, 0x2c, 0x2a, // 1 - u t f - - -
+ 0x2c, 0x65, 0x6e, 0x71, 0x3d, 0x30, 0x2e, // - e n q - 0 -
+}
diff --git a/k8s-operator/session-recording/tsrecorder/header.go b/k8s-operator/session-recording/tsrecorder/header.go
new file mode 100644
index 000000000..45c50ca1e
--- /dev/null
+++ b/k8s-operator/session-recording/tsrecorder/header.go
@@ -0,0 +1,54 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+//go:build !plan9
+
+package tsrecorder
+
+import "tailscale.com/tailcfg"
+
+// CastHeader is the asciicast header to be sent to the recorder at the start of
+// the recording of a session.
+// https://docs.asciinema.org/manual/asciicast/v2/#header
+type CastHeader struct {
+ // Version is the asciinema file format version.
+ Version int `json:"version"`
+
+ // Width is the terminal width in characters.
+ Width int `json:"width"`
+
+ // Height is the terminal height in characters.
+ Height int `json:"height"`
+
+ // Timestamp is the unix timestamp of when the recording started.
+ Timestamp int64 `json:"timestamp"`
+
+ // Tailscale-specific fields: SrcNode is the full MagicDNS name of the
+ // tailnet node originating the connection, without the trailing dot.
+ SrcNode string `json:"srcNode"`
+
+ // SrcNodeID is the node ID of the tailnet node originating the connection.
+ SrcNodeID tailcfg.StableNodeID `json:"srcNodeID"`
+
+ // SrcNodeTags is the list of tags on the node originating the connection (if any).
+ SrcNodeTags []string `json:"srcNodeTags,omitempty"`
+
+ // SrcNodeUserID is the user ID of the node originating the connection (if not tagged).
+ SrcNodeUserID tailcfg.UserID `json:"srcNodeUserID,omitempty"` // if not tagged
+
+ // SrcNodeUser is the LoginName of the node originating the connection (if not tagged).
+ SrcNodeUser string `json:"srcNodeUser,omitempty"`
+
+ Command string
+
+ // Kubernetes-specific fields:
+ Kubernetes *Kubernetes `json:"kubernetes,omitempty"`
+}
+
+// Kubernetes contains 'kubectl exec' session specific information for
+// tsrecorder.
+type Kubernetes struct {
+ PodName string
+ Namespace string
+ Container string
+}
diff --git a/k8s-operator/session-recording/tsrecorder/tsrecorder.go b/k8s-operator/session-recording/tsrecorder/tsrecorder.go
new file mode 100644
index 000000000..4ce78a882
--- /dev/null
+++ b/k8s-operator/session-recording/tsrecorder/tsrecorder.go
@@ -0,0 +1,104 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+//go:build !plan9
+
+// Package tsrecorder contains functionality to send recorded kubectl-exec
+// sessions to tsrecorder.
+package tsrecorder
+
+import (
+ "encoding/json"
+ "fmt"
+ "io"
+ "sync"
+ "time"
+
+ "github.com/pkg/errors"
+ "tailscale.com/tstime"
+)
+
+func New(conn io.WriteCloser, clock tstime.Clock, start time.Time, failOpen bool) *Client {
+ return &Client{
+ start: start,
+ clock: clock,
+ conn: conn,
+ failOpen: failOpen,
+ }
+}
+
+// Client knows how to send the provided bytes to the configured tsrecorder
+// instance in asciinema format.
+type Client struct {
+ start time.Time
+ clock tstime.Clock
+
+ // failOpen specifies whether the session should be allowed to
+ // continue if writing to the recording fails.
+ failOpen bool
+
+ // backOff is set to true if we've failed open and should stop
+ // attempting to write to tsrecorder.
+ backOff bool
+
+ mu sync.Mutex // guards writes to conn
+ conn io.WriteCloser // connection to a tsrecorder instance
+}
+
+// Write appends timestamp to the provided bytes and sends them to the
+// configured tsrecorder.
+func (c *Client) Write(p []byte) (err error) {
+ if len(p) == 0 {
+ return nil
+ }
+ if c.backOff {
+ return nil
+ }
+ j, err := json.Marshal([]any{
+ c.clock.Now().Sub(c.start).Seconds(),
+ "o",
+ string(p),
+ })
+ if err != nil {
+ return fmt.Errorf("error marhalling payload: %w", err)
+ }
+ j = append(j, '\n')
+ if err := c.WriteCastLine(j); err != nil {
+ if !c.failOpen {
+ return fmt.Errorf("error writing payload to recorder: %w", err)
+ }
+ c.backOff = true
+ }
+ return nil
+}
+
+func (c *Client) Close() error {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ if c.conn == nil {
+ return nil
+ }
+ err := c.conn.Close()
+ c.conn = nil
+ return err
+}
+
+// writeCastLine sends bytes to the tsrecorder. The bytes should be in
+// asciinema format.
+func (c *Client) WriteCastLine(j []byte) error {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ if c.conn == nil {
+ return errors.New("recorder closed")
+ }
+ _, err := c.conn.Write(j)
+ if err != nil {
+ return fmt.Errorf("recorder write error: %w", err)
+ }
+ return nil
+}
+
+type ResizeMsg struct {
+ Width int `json:"width"`
+ Height int `json:"height"`
+}
diff --git a/k8s-operator/session-recording/ws/conn.go b/k8s-operator/session-recording/ws/conn.go
new file mode 100644
index 000000000..88bbc2a7f
--- /dev/null
+++ b/k8s-operator/session-recording/ws/conn.go
@@ -0,0 +1,244 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+//go:build !plan9
+
+// package ws has functionality to parse 'kubectl exec' sessions streamed using
+// WebSockets protocol.
+package ws
+
+import (
+ "bytes"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "net"
+ "sync"
+
+ "go.uber.org/zap"
+ "k8s.io/apimachinery/pkg/util/remotecommand"
+ "tailscale.com/k8s-operator/session-recording/tsrecorder"
+ "tailscale.com/util/multierr"
+)
+
+// New returns a wrapper around net.Conn that intercepts reads and writes for a
+// websocket streaming session over the provided net.Conn, parses the data as
+// websocket messages and sends message payloads for STDIN/STDOUT streams to a
+// tsrecorder instance using the provided client. Caller must ensure that the
+// session is streamed using WebSockets protocol.
+func New(c net.Conn, rec *tsrecorder.Client, ch tsrecorder.CastHeader, log *zap.SugaredLogger) net.Conn {
+ return &conn{
+ Conn: c,
+ rec: rec,
+ ch: ch,
+ log: log,
+ }
+}
+
+// conn is a wrapper around net.Conn. It reads the bytestream
+// for a 'kubectl exec' session, sends session recording data to the configured
+// recorder and forwards the raw bytes to the original destination.
+// A new conn is created per session.
+// conn only knows to how to read a 'kubectl exec' session that is streamed using WebSocket protocol.
+// https://www.rfc-editor.org/rfc/rfc6455
+type conn struct {
+ net.Conn
+ // rec knows how to send data to a tsrecorder instance.
+ rec *tsrecorder.Client
+ // ch is the asiinema CastHeader for a session.
+ ch tsrecorder.CastHeader
+ log *zap.SugaredLogger
+
+ rmu sync.Mutex // sequences reads
+ // currentReadMsg contains parsed contents of a websocket binary data message that
+ // is currently being read from the underlying net.Conn.
+ currentReadMsg *message
+ // readBuf contains bytes for a currently parsed binary data message
+ // read from the underlying conn. If the message is masked, it is
+ // unmasked in place, so having this buffer allows us to avoid modifying
+ // the original byte array.
+ readBuf bytes.Buffer
+
+ wmu sync.Mutex // sequences writes
+ writeCastHeaderOnce sync.Once
+ closed bool
+ // writeBuf contains bytes for a currently parsed binary data message
+ // being written to the underlying conn. If the message is masked, it is
+ // unmasked in place, so having this buffer allows us to avoid modifying
+ // the original byte array.
+ writeBuf bytes.Buffer
+ // currentWriteMsg contains parsed contents of a websocket binary data message that
+ // is currently being written to the underlying net.Conn.
+ currentWriteMsg *message
+}
+
+// Read reads bytes from the original connection and parses them as websocket
+// message fragments. If the message is for the resize stream, sets the width
+// and height of the CastHeader for this connection.
+// The fragment can be incomplete.
+func (c *conn) Read(b []byte) (int, error) {
+ c.rmu.Lock()
+ defer c.rmu.Unlock()
+ n, err := c.Conn.Read(b)
+ if err != nil {
+ // It seems that we sometimes get a wrapped io.EOF, but the
+ // caller checks for io.EOF with ==.
+ if errors.Is(err, io.EOF) {
+ err = io.EOF
+ }
+ return 0, err
+ }
+
+ typ := messageType(opcode(b))
+ if typ == noOpcode && c.currentReadMsg != nil && !c.currentReadMsg.isFinalized { // subsequent fragment
+ typ = c.currentReadMsg.typ
+ }
+
+ // A control message can not be fragmented and we are not interested in
+ // these messages. Just return.
+ if isControlMessage(typ) {
+ return n, nil
+ }
+
+ // The only data message type that Kubernetes supports is binary message.
+ // If we received another message type, return and let the API server close the connection.
+ // https://github.com/kubernetes/client-go/blob/release-1.30/tools/remotecommand/websocket.go#L281
+ if typ != binaryMessage {
+ c.log.Info("[unexpected] received a data message with a type that is not binary message type %d", typ)
+ return n, nil
+ }
+ if _, err := c.readBuf.Write(b[:n]); err != nil {
+ return 0, fmt.Errorf("[unexpected] error writing message contents to read buffer: %w", err)
+ }
+
+ readMsg := &message{typ: typ} // start a new message...
+ // ... or pick up an already started one if the previous fragment was not final.
+ if c.currentReadMsg != nil && !c.currentReadMsg.isFinalized {
+ readMsg = c.currentReadMsg
+ }
+
+ ok, err := readMsg.Parse(c.readBuf.Bytes(), c.log)
+ if err != nil {
+ return 0, fmt.Errorf("error parsing message: %v", err)
+ }
+ if !ok { // incomplete fragment
+ return n, nil
+ }
+ c.readBuf.Next(len(readMsg.raw))
+
+ if readMsg.isFinalized {
+ // Stream IDs for websocket streams are static.
+ // https://github.com/kubernetes/client-go/blob/v0.30.0-rc.1/tools/remotecommand/websocket.go#L218
+ if readMsg.streamID.Load() == remotecommand.StreamResize {
+ var err error
+ var msg tsrecorder.ResizeMsg
+ if err = json.Unmarshal(readMsg.payload, &msg); err != nil {
+ return 0, fmt.Errorf("error umarshalling resize message: %w", err)
+ }
+ c.ch.Width = msg.Width
+ c.ch.Height = msg.Height
+ }
+ }
+ c.currentReadMsg = readMsg
+ return n, err
+}
+
+// Write parses the written bytes as WebSocket message fragment. If the message
+// is for stdout or stderr streams, it is written to the configured tsrecorder.
+// A message fragment can be incomplete.
+func (c *conn) Write(b []byte) (int, error) {
+ c.wmu.Lock()
+ defer c.wmu.Unlock()
+
+ typ := messageType(opcode(b))
+ // If we are in process of parsing a message fragment, the received
+ // bytes are not structured as a message fragment and can not be used to
+ // determine a message fragment.
+ if len(c.writeBuf.Bytes()) > 0 { // buffer contains previous incomplete fragment
+ typ = c.currentWriteMsg.typ
+ }
+
+ if isControlMessage(typ) {
+ n, err := c.Conn.Write(b)
+ return n, err
+ }
+
+ if _, err := c.writeBuf.Write(b); err != nil {
+ c.log.Errorf("write: error writing to write buf: %v", err)
+ return 0, fmt.Errorf("[unexpected] error writing to internal write buffer: %w", err)
+ }
+
+ writeMsg := &message{typ: typ} // start a new message...
+ // ... or continue the existing one if it has not been finalized.
+ if c.currentWriteMsg != nil && !c.currentWriteMsg.isFinalized {
+ writeMsg = c.currentWriteMsg
+ }
+
+ ok, err := writeMsg.Parse(c.writeBuf.Bytes(), c.log)
+ if err != nil {
+ c.log.Errorf("write: parsing a message errored: %v", err)
+ return 0, fmt.Errorf("write: error parsing message: %v", err)
+ }
+ c.currentWriteMsg = writeMsg
+ if !ok { // incomplete fragment
+ return len(b), nil
+ }
+ c.writeBuf.Next(len(writeMsg.raw)) // advance frame
+
+ if len(writeMsg.payload) != 0 && writeMsg.isFinalized {
+ if writeMsg.streamID.Load() == remotecommand.StreamStdOut || writeMsg.streamID.Load() == remotecommand.StreamStdErr {
+ var err error
+ c.writeCastHeaderOnce.Do(func() {
+ var j []byte
+ j, err = json.Marshal(c.ch)
+ if err != nil {
+ c.log.Infof("error marhsalling conn: %v", err)
+ return
+ }
+ j = append(j, '\n')
+ err = c.rec.WriteCastLine(j)
+ if err != nil {
+ c.log.Errorf("received error from recorder: %v", err)
+ }
+ })
+ if err != nil {
+ return 0, fmt.Errorf("error writing CastHeader: %w", err)
+ }
+ if err := c.rec.Write(writeMsg.payload); err != nil {
+ return 0, fmt.Errorf("error writing message to recorder: %v", err)
+ }
+ }
+ }
+ _, err = c.Conn.Write(c.currentWriteMsg.raw)
+ if err != nil {
+ c.log.Errorf("write: error writing to conn: %v", err)
+ }
+ return len(b), err
+}
+
+func (c *conn) Close() error {
+ c.wmu.Lock()
+ defer c.wmu.Unlock()
+ if c.closed {
+ return nil
+ }
+ // TODO: only do this if this is a normal closure rather than the
+ // reocrding has failed.
+ if c.writeBuf.Len() > 0 {
+ c.Conn.Write(c.writeBuf.Bytes())
+ }
+ c.closed = true
+ connCloseErr := c.Conn.Close()
+ recCloseErr := c.rec.Close()
+ return multierr.New(connCloseErr, recCloseErr)
+}
+
+// opcode reads the websocket message opcode that denotes the message type.
+// opcode is contained in bits [4-8] of the message.
+// https://www.rfc-editor.org/rfc/rfc6455#section-5.2
+func opcode(b []byte) int {
+ // 0xf = 00001111; b & 00001111 zeroes out bits [0 - 3] of b
+ var mask byte = 0xf
+ return int(b[0] & mask)
+}
diff --git a/k8s-operator/session-recording/ws/conn_test.go b/k8s-operator/session-recording/ws/conn_test.go
new file mode 100644
index 000000000..a64b89c56
--- /dev/null
+++ b/k8s-operator/session-recording/ws/conn_test.go
@@ -0,0 +1,171 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+//go:build !plan9
+
+package ws
+
+import (
+ "reflect"
+ "testing"
+
+ "go.uber.org/zap"
+ "k8s.io/apimachinery/pkg/util/remotecommand"
+ "tailscale.com/k8s-operator/session-recording/fakes"
+ "tailscale.com/k8s-operator/session-recording/tsrecorder"
+ "tailscale.com/tstest"
+)
+
+func Test_conn_Read(t *testing.T) {
+ zl, err := zap.NewDevelopment()
+ if err != nil {
+ t.Fatal(err)
+ }
+ // Resize stream ID + {"width": 10, "height": 20}
+ testResizeMsg := []byte{byte(remotecommand.StreamResize), 0x7b, 0x22, 0x77, 0x69, 0x64, 0x74, 0x68, 0x22, 0x3a, 0x31, 0x30, 0x2c, 0x22, 0x68, 0x65, 0x69, 0x67, 0x68, 0x74, 0x22, 0x3a, 0x32, 0x30, 0x7d}
+ lenResizeMsgPayload := byte(len(testResizeMsg))
+
+ tests := []struct {
+ name string
+ inputs [][]byte
+ wantWidth int
+ wantHeight int
+ }{
+ {
+ name: "single_read_control_message",
+ inputs: [][]byte{{0x88, 0x0}},
+ },
+ {
+ name: "single_read_resize_message",
+ inputs: [][]byte{append([]byte{0x82, lenResizeMsgPayload}, testResizeMsg...)},
+ wantWidth: 10,
+ wantHeight: 20,
+ },
+ {
+ name: "two_reads_resize_message",
+ inputs: [][]byte{{0x2, 0x9, 0x4, 0x7b, 0x22, 0x77, 0x69, 0x64, 0x74, 0x68, 0x22}, {0x80, 0x11, 0x4, 0x3a, 0x31, 0x30, 0x2c, 0x22, 0x68, 0x65, 0x69, 0x67, 0x68, 0x74, 0x22, 0x3a, 0x32, 0x30, 0x7d}},
+ wantWidth: 10,
+ wantHeight: 20,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ tc := &fakes.TestConn{}
+ tc.ResetReadBuf()
+ c := &conn{
+ Conn: tc,
+ log: zl.Sugar(),
+ }
+ for i, input := range tt.inputs {
+ if err := tc.WriteReadBufBytes(input); err != nil {
+ t.Fatalf("writing bytes to test conn: %v", err)
+ }
+ _, err := c.Read(make([]byte, len(input)))
+ if err != nil {
+ t.Errorf("[%d] conn.Read() errored %v", i, err)
+ return
+ }
+ }
+ if tt.wantHeight != 0 || tt.wantWidth != 0 {
+ if tt.wantWidth != c.ch.Width {
+ t.Errorf("wants width: %v, got %v", tt.wantWidth, c.ch.Width)
+ }
+ if tt.wantHeight != c.ch.Height {
+ t.Errorf("want height: %v, got %v", tt.wantHeight, c.ch.Height)
+ }
+ }
+ })
+ }
+}
+
+func Test_conn_Write(t *testing.T) {
+ zl, err := zap.NewDevelopment()
+ if err != nil {
+ t.Fatal(err)
+ }
+ cl := tstest.NewClock(tstest.ClockOpts{})
+ tests := []struct {
+ name string
+ inputs [][]byte
+ wantForwarded []byte
+ wantRecorded []byte
+ firstWrite bool
+ width int
+ height int
+ }{
+ {
+ name: "single_write_control_frame",
+ inputs: [][]byte{{0x88, 0x0}},
+ wantForwarded: []byte{0x88, 0x0},
+ },
+ {
+ name: "single_write_stdout_data_message",
+ inputs: [][]byte{{0x82, 0x3, 0x1, 0x7, 0x8}},
+ wantForwarded: []byte{0x82, 0x3, 0x1, 0x7, 0x8},
+ wantRecorded: fakes.CastLine(t, []byte{0x7, 0x8}, cl),
+ },
+ {
+ name: "single_write_stderr_data_message",
+ inputs: [][]byte{{0x82, 0x3, 0x2, 0x7, 0x8}},
+ wantForwarded: []byte{0x82, 0x3, 0x2, 0x7, 0x8},
+ wantRecorded: fakes.CastLine(t, []byte{0x7, 0x8}, cl),
+ },
+ {
+ name: "single_write_stdin_data_message",
+ inputs: [][]byte{{0x82, 0x3, 0x0, 0x7, 0x8}},
+ wantForwarded: []byte{0x82, 0x3, 0x0, 0x7, 0x8},
+ },
+ {
+ name: "single_write_stdout_data_message_with_cast_header",
+ inputs: [][]byte{{0x82, 0x3, 0x1, 0x7, 0x8}},
+ wantForwarded: []byte{0x82, 0x3, 0x1, 0x7, 0x8},
+ wantRecorded: append(fakes.AsciinemaResizeMsg(t, 10, 20), fakes.CastLine(t, []byte{0x7, 0x8}, cl)...),
+ width: 10,
+ height: 20,
+ firstWrite: true,
+ },
+ {
+ name: "two_writes_stdout_data_message",
+ inputs: [][]byte{{0x2, 0x3, 0x1, 0x7, 0x8}, {0x80, 0x6, 0x1, 0x1, 0x2, 0x3, 0x4, 0x5}},
+ wantForwarded: []byte{0x2, 0x3, 0x1, 0x7, 0x8, 0x80, 0x6, 0x1, 0x1, 0x2, 0x3, 0x4, 0x5},
+ wantRecorded: fakes.CastLine(t, []byte{0x7, 0x8, 0x1, 0x2, 0x3, 0x4, 0x5}, cl),
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ tc := &fakes.TestConn{}
+ sr := &fakes.TestSessionRecorder{}
+ rec := tsrecorder.New(sr, cl, cl.Now(), true)
+ c := &conn{
+ Conn: tc,
+ log: zl.Sugar(),
+ ch: tsrecorder.CastHeader{
+ Width: tt.width,
+ Height: tt.height,
+ },
+ rec: rec,
+ }
+ if !tt.firstWrite {
+ // This test case does not intend to test that cast header gets written once.
+ c.writeCastHeaderOnce.Do(func() {})
+ }
+ for i, input := range tt.inputs {
+ _, err := c.Write(input)
+ if err != nil {
+ t.Fatalf("[%d] conn.Write() errored: %v", i, err)
+ }
+ }
+ // Assert that the expected bytes have been forwarded to the original destination.
+ gotForwarded := tc.WriteBufBytes()
+ if !reflect.DeepEqual(gotForwarded, tt.wantForwarded) {
+ t.Errorf("expected bytes not forwarded, wants\n%v\ngot\n%v", tt.wantForwarded, gotForwarded)
+ }
+
+ // Assert that the expected bytes have been forwarded to the session recorder.
+ gotRecorded := sr.Bytes()
+ if !reflect.DeepEqual(gotRecorded, tt.wantRecorded) {
+ t.Errorf("expected bytes not recorded, wants\n%v\ngot\n%v", tt.wantRecorded, gotRecorded)
+ }
+ })
+ }
+}
diff --git a/k8s-operator/session-recording/ws/message.go b/k8s-operator/session-recording/ws/message.go
new file mode 100644
index 000000000..bf33e6bb2
--- /dev/null
+++ b/k8s-operator/session-recording/ws/message.go
@@ -0,0 +1,253 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+//go:build !plan9
+
+package ws
+
+import (
+ "encoding/binary"
+ "fmt"
+ "sync/atomic"
+
+ "github.com/pkg/errors"
+ "go.uber.org/zap"
+)
+
+const (
+ noOpcode messageType = 0 // continuation frame for fragmented messages
+ binaryMessage messageType = 2
+)
+
+// messageType is the type of a websocket data or control message as defined by opcode.
+// https://www.rfc-editor.org/rfc/rfc6455#section-5.2
+// Known types of control messages are close, ping and pong.
+// https://www.rfc-editor.org/rfc/rfc6455#section-5.5
+// The only data message type supported by Kubernetes is binary message
+// https://github.com/kubernetes/client-go/blob/v0.30.0-rc.1/tools/remotecommand/websocket.go#L281
+type messageType int
+
+// message is a parsed Websocket Message.
+type message struct {
+ // payload is the contents of the so far parsed Websocket
+ // data Message payload, potentially from multiple fragments written by
+ // multiple invocations of Parse. As per RFC 6455 We can assume that the
+ // fragments will always arrive in order and data messages will not be
+ // interleaved.
+ payload []byte
+
+ // isFinalized is set to true if msgPayload contains full contents of
+ // the message (the final fragment has been received).
+ isFinalized bool
+
+ // streamID is the stream to which the message belongs, i.e stdin, stout
+ // etc. It is one of the stream IDs defined in
+ // https://github.com/kubernetes/apimachinery/commit/73d12d09c5be8703587b5127416eb83dc3b7e182#diff-291f96e8632d04d2d20f5fb00f6b323492670570d65434e8eac90c7a442d13bdR23-R36
+ streamID atomic.Uint32
+
+ // typ is the type of a WebsocketMessage as defined by its opcode
+ // https://www.rfc-editor.org/rfc/rfc6455#section-5.2
+ typ messageType
+ raw []byte
+}
+
+// Parse accepts a websocket message fragment as a byte slice and parses its contents.
+// The fragment can be:
+// - a fragment that consists of a whole message
+// - an initial fragment for a message for which we expect more fragments
+// - a subsequent fragment for a message that we are currently parsing and whose so-far parsed contents are stored in msg.
+// It is not expected that the byte slice would contain an incomplete fragment or fragment for a different message than the one currently being parsed (if any).
+// Message fragment structure:
+// 0 1 2 3
+// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+// +-+-+-+-+-------+-+-------------+-------------------------------+
+// |F|R|R|R| opcode|M| Payload len | Extended payload length |
+// |I|S|S|S| (4) |A| (7) | (16/64) |
+// |N|V|V|V| |S| | (if payload len==126/127) |
+// | |1|2|3| |K| | |
+// +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
+// | Extended payload length continued, if payload len == 127 |
+// + - - - - - - - - - - - - - - - +-------------------------------+
+// | |Masking-key, if MASK set to 1 |
+// +-------------------------------+-------------------------------+
+// | Masking-key (continued) | Payload Data |
+// +-------------------------------- - - - - - - - - - - - - - - - +
+// : Payload Data continued ... :
+// + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
+// | Payload Data continued ... |
+// +---------------------------------------------------------------+
+// https://www.rfc-editor.org/rfc/rfc6455#section-5.2
+//
+// Fragmentation rules:
+// An unfragmented message consists of a single frame with the FIN
+// bit set (Section 5.2) and an opcode other than 0.
+// A fragmented message consists of a single frame with the FIN bit
+// clear and an opcode other than 0, followed by zero or more frames
+// with the FIN bit clear and the opcode set to 0, and terminated by
+// a single frame with the FIN bit set and an opcode of 0.
+// https://www.rfc-editor.org/rfc/rfc6455#section-5.4
+func (msg *message) Parse(b []byte, log *zap.SugaredLogger) (bool, error) {
+ if msg.typ != binaryMessage {
+ return false, fmt.Errorf("[unexpected] internal error: attempted to parse a message with type %d", msg.typ)
+ }
+
+ msg.isFinalized = isFinalFragment(b)
+
+ maskSet := isMasked(b)
+
+ payloadLength, payloadOffset, maskOffset, err := fragmentDimensions(b, maskSet)
+ if err != nil {
+ return false, fmt.Errorf("error determining payload length: %w", err)
+ }
+ log.Debugf("parse: parsing a message with payload length: %d payload offset: %d maskOffset: %d mask set: %t, is finalized: %t", payloadLength, payloadOffset, maskOffset, maskSet, msg.isFinalized)
+
+ if len(b) < int(payloadOffset)+int(payloadLength) { // incomplete fragment
+ return false, nil
+ }
+ msg.raw = make([]byte, int(payloadOffset)+int(payloadLength))
+ copy(msg.raw, b[:payloadOffset+payloadLength])
+
+ // Extract the payload.
+ msgPayload := b[payloadOffset : payloadOffset+payloadLength]
+
+ // Unmask the payload if needed.
+ if maskSet {
+ m := b[maskOffset:payloadOffset]
+ var mask [4]byte
+ copy(mask[:], m)
+ maskBytes(mask, msgPayload)
+ }
+
+ // Determine what stream the message is for. Stream ID of a Kubernetes
+ // streaming session is a 32bit integer, stored in the first byte of the
+ // message payload.
+ // https://github.com/kubernetes/apimachinery/commit/73d12d09c5be8703587b5127416eb83dc3b7e182#diff-291f96e8632d04d2d20f5fb00f6b323492670570d65434e8eac90c7a442d13bdR23-R36
+ if len(msgPayload) == 0 {
+ return false, errors.New("[unexpected] received a message fragment with no stream ID")
+ }
+
+ streamId := uint32(msgPayload[0])
+ if msg.streamID.Load() != 0 && msg.streamID.Load() != streamId {
+ return false, fmt.Errorf("[unexpected] received message fragments with mismatched streamIDs %d and %d", msg.streamID.Load(), streamId)
+ }
+ msg.streamID.Store(streamId)
+
+ // This is normal, Kubernetes seem to send a couple data messages with
+ // no payloads at the start.
+ if len(msgPayload) < 2 {
+ return true, nil
+ }
+ msgPayload = msgPayload[1:] // remove the stream ID byte
+ msg.payload = append(msg.payload, msgPayload...)
+ return true, nil
+}
+
+// maskBytes applies mask to bytes in place.
+// https://www.rfc-editor.org/rfc/rfc6455#section-5.3
+func maskBytes(key [4]byte, b []byte) {
+ for i := range b {
+ b[i] = b[i] ^ key[i%4]
+ }
+}
+
+// isControlMessage returns true if the message type is one of the know control
+// frame message types.
+// https://www.rfc-editor.org/rfc/rfc6455#section-5.5
+func isControlMessage(t messageType) bool {
+ const (
+ closeMessage messageType = 8
+ pingMessage messageType = 9
+ pongMessage messageType = 10
+ )
+ return t == closeMessage || t == pingMessage || t == pongMessage
+}
+
+// isFinalFragment can be called with websocket message fragment and returns true if
+// the fragment is the final fragment of a websocket message.
+func isFinalFragment(b []byte) bool {
+ // Extract FIN bit. FIN bit is the first bit of a message fragment.
+ const finBitMask byte = 1 << 7
+ finBit := b[0] & finBitMask
+ return finBit != 0
+}
+
+// isMasked can be called with a websocket message fragment and returns true if
+// the payload of the message is masked. It uses the mask bit to determine if
+// the payload is masked.
+// https://www.rfc-editor.org/rfc/rfc6455#section-5.3
+func isMasked(b []byte) bool {
+ return extractFirstBit(b[1]) != 0
+}
+
+// extractFirstBit extracts first bit of a byte by zeroing out all the other
+// bits.
+func extractFirstBit(b byte) byte {
+ const mask byte = 1 << 7
+ return b & mask
+}
+
+// zeroFirstBit returns the provided byte with the first bit set to 0.
+func zeroFirstBit(b byte) byte {
+ const revMask byte = 1 << 7
+ return b & (^revMask)
+}
+
+// fragmentDimensions returns payload length as well as payload offset and mask offset.
+func fragmentDimensions(b []byte, maskSet bool) (payloadLength, payloadOffset, maskOffset int64, _ error) {
+
+ // payload length can be stored either in bits [9-15] or in bytes 2, 3
+ // or in bytes 2, 3, 4, 5, 6, 7.
+ // https://www.rfc-editor.org/rfc/rfc6455#section-5.2
+ // 0 1 2 3
+ // 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+ // +-+-+-+-+-------+-+-------------+-------------------------------+
+ // |F|R|R|R| opcode|M| Payload len | Extended payload length |
+ // |I|S|S|S| (4) |A| (7) | (16/64) |
+ // |N|V|V|V| |S| | (if payload len==126/127) |
+ // | |1|2|3| |K| | |
+ // +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
+ // | Extended payload length continued, if payload len == 127 |
+ // + - - - - - - - - - - - - - - - +-------------------------------+
+ payloadLengthIndicator := zeroFirstBit(b[1])
+ var lengthOffset int64
+ switch {
+ case payloadLengthIndicator < 126:
+ lengthOffset = 1
+ maskOffset = 2
+ payloadLength = int64(payloadLengthIndicator)
+ case payloadLengthIndicator == 126:
+ maskOffset = 4
+ lengthOffset = 2
+ payloadLength = extractInt64(b, lengthOffset, 2)
+ case payloadLengthIndicator == 127:
+ maskOffset = 10
+ lengthOffset = 2
+ payloadLength = extractInt64(b, lengthOffset, 6)
+ default:
+ return -1, -1, -1, fmt.Errorf("unexpected payload length indicator value: %v", payloadLengthIndicator)
+ }
+
+ // Masking key can take up 0 or 4 bytes- we need to take that into
+ // account when determining payload offset.
+ // 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+ // ....
+ // + - - - - - - - - - - - - - - - +-------------------------------+
+ // | |Masking-key, if MASK set to 1 |
+ // +-------------------------------+-------------------------------+
+ // | Masking-key (continued) | Payload Data |
+ // + - - - - - - - - - - - - - - - +-------------------------------+
+ // ...
+ if maskSet {
+ payloadOffset = maskOffset + 4
+ } else {
+ payloadOffset = maskOffset
+ }
+ return
+}
+
+func extractInt64(b []byte, offset, length int64) int64 {
+ payloadLengthBytes := b[offset : offset+length]
+ payloadLengthBytesPadded := append(make([]byte, 8-len(payloadLengthBytes)), payloadLengthBytes...)
+
+ return int64(binary.BigEndian.Uint64(payloadLengthBytesPadded))
+}
diff --git a/k8s-operator/session-recording/ws/message_test.go b/k8s-operator/session-recording/ws/message_test.go
new file mode 100644
index 000000000..63a80ade9
--- /dev/null
+++ b/k8s-operator/session-recording/ws/message_test.go
@@ -0,0 +1,125 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+//go:build !plan9
+
+package ws
+
+import (
+ "reflect"
+ "testing"
+
+ "go.uber.org/zap"
+)
+
+func Test_msg_Parse(t *testing.T) {
+ zl, err := zap.NewDevelopment()
+ if err != nil {
+ t.Fatalf("error creating a test logger: %v", err)
+ }
+ testMask := [4]byte{1, 2, 3, 4}
+ tests := []struct {
+ name string
+ b []byte
+ initialPayload []byte
+ wantPayload []byte
+ wantIsFinalized bool
+ wantStreamID uint32
+ }{
+ {
+ name: "single_fragment_stdout_stream_no_payload_no_mask",
+ b: []byte{0x82, 0x1, 0x1},
+ wantPayload: nil,
+ wantIsFinalized: true,
+ wantStreamID: 1,
+ },
+ {
+ name: "single_fragment_stderr_steam_no_payload_has_mask",
+ b: append([]byte{0x82, 0x81, 0x1, 0x2, 0x3, 0x4}, maskedBytes(testMask, []byte{0x2})...),
+ wantPayload: nil,
+ wantIsFinalized: true,
+ wantStreamID: 2,
+ },
+ {
+ name: "single_fragment_stdout_stream_no_mask_has_payload",
+ b: []byte{0x82, 0x3, 0x1, 0x7, 0x8},
+ wantPayload: []byte{0x7, 0x8},
+ wantIsFinalized: true,
+ wantStreamID: 1,
+ },
+ {
+ name: "single_fragment_stdout_stream_has_mask_has_payload",
+ b: append([]byte{0x82, 0x83, 0x1, 0x2, 0x3, 0x4}, maskedBytes(testMask, []byte{0x1, 0x7, 0x8})...),
+ wantPayload: []byte{0x7, 0x8},
+ wantIsFinalized: true,
+ wantStreamID: 1,
+ },
+ {
+ name: "initial_fragment_stdout_stream_no_mask_has_payload",
+ b: []byte{0x2, 0x3, 0x1, 0x7, 0x8},
+ wantPayload: []byte{0x7, 0x8},
+ wantStreamID: 1,
+ },
+ {
+ name: "initial_fragment_stdout_stream_has_mask_has_payload",
+ b: append([]byte{0x2, 0x83, 0x1, 0x2, 0x3, 0x4}, maskedBytes(testMask, []byte{0x1, 0x7, 0x8})...),
+ wantPayload: []byte{0x7, 0x8},
+ wantStreamID: 1,
+ },
+ {
+ name: "subsequent_fragment_stdout_stream_no_mask_has_payload",
+ b: []byte{0x0, 0x3, 0x1, 0x7, 0x8},
+ initialPayload: []byte{0x1, 0x2, 0x3},
+ wantPayload: []byte{0x1, 0x2, 0x3, 0x7, 0x8},
+ wantStreamID: 1,
+ },
+ {
+ name: "subsequent_fragment_stdout_stream_has_mask_has_payload",
+ b: append([]byte{0x0, 0x83, 0x1, 0x2, 0x3, 0x4}, maskedBytes(testMask, []byte{0x1, 0x7, 0x8})...),
+ initialPayload: []byte{0x1, 0x2, 0x3},
+ wantPayload: []byte{0x1, 0x2, 0x3, 0x7, 0x8},
+ wantStreamID: 1,
+ },
+ {
+ name: "final_fragment_stdout_stream_no_mask_has_payload",
+ b: []byte{0x80, 0x3, 0x1, 0x7, 0x8},
+ initialPayload: []byte{0x1, 0x2, 0x3},
+ wantIsFinalized: true,
+ wantPayload: []byte{0x1, 0x2, 0x3, 0x7, 0x8},
+ wantStreamID: 1,
+ },
+ {
+ name: "final_fragment_stdout_stream_has_mask_has_payload",
+ b: append([]byte{0x80, 0x83, 0x1, 0x2, 0x3, 0x4}, maskedBytes(testMask, []byte{0x1, 0x7, 0x8})...),
+ initialPayload: []byte{0x1, 0x2, 0x3},
+ wantIsFinalized: true,
+ wantPayload: []byte{0x1, 0x2, 0x3, 0x7, 0x8},
+ wantStreamID: 1,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ msg := &message{
+ typ: binaryMessage,
+ payload: tt.initialPayload,
+ }
+ if _, err := msg.Parse(tt.b, zl.Sugar()); err != nil {
+ t.Errorf("msg.Parse() errored %v", err)
+ }
+ if msg.isFinalized != tt.wantIsFinalized {
+ t.Errorf("wants message to be finalized: %t, got: %t", tt.wantIsFinalized, msg.isFinalized)
+ }
+ if msg.streamID.Load() != tt.wantStreamID {
+ t.Errorf("wants stream ID: %d, got: %d", tt.wantStreamID, msg.streamID.Load())
+ }
+ if !reflect.DeepEqual(msg.payload, tt.wantPayload) {
+ t.Errorf("unexpected message payload after Parse, wants %b, got %b", tt.wantPayload, msg.payload)
+ }
+ })
+ }
+}
+
+func maskedBytes(mask [4]byte, b []byte) []byte {
+ maskBytes(mask, b)
+ return b
+}