summaryrefslogtreecommitdiffhomepage
path: root/cmd/k8s-proxy/k8s-proxy.go
blob: e00d43a948dbad183fa99879a408754833da0631 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
// Copyright (c) Tailscale Inc & contributors
// SPDX-License-Identifier: BSD-3-Clause

//go:build !plan9

// k8s-proxy proxies between tailnet and Kubernetes cluster traffic.
// Currently, it only supports proxying tailnet clients to the Kubernetes API
// server.
package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"net/http"
	"os"
	"os/signal"
	"reflect"
	"strconv"
	"strings"
	"syscall"
	"time"

	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
	"golang.org/x/sync/errgroup"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/utils/strings/slices"
	"tailscale.com/client/local"
	"tailscale.com/cmd/k8s-proxy/internal/config"
	"tailscale.com/hostinfo"
	"tailscale.com/ipn"
	"tailscale.com/ipn/store"

	// we need to import this package so that the `kube:` ipn store gets registered
	_ "tailscale.com/ipn/store/kubestore"
	apiproxy "tailscale.com/k8s-operator/api-proxy"
	"tailscale.com/kube/certs"
	healthz "tailscale.com/kube/health"
	"tailscale.com/kube/k8s-proxy/conf"
	"tailscale.com/kube/kubetypes"
	klc "tailscale.com/kube/localclient"
	"tailscale.com/kube/metrics"
	"tailscale.com/kube/services"
	"tailscale.com/kube/state"
	"tailscale.com/tailcfg"
	"tailscale.com/tsnet"
)

// main constructs a JSON zap logger that writes to stderr at debug level with
// RFC 3339 timestamps, then hands it to run. Any error returned by run is
// reported through logger.Fatal.
func main() {
	encCfg := zap.NewProductionEncoderConfig()
	encCfg.EncodeTime = zapcore.RFC3339TimeEncoder

	zapCfg := zap.Config{
		Level:            zap.NewAtomicLevelAt(zap.DebugLevel),
		Encoding:         "json",
		OutputPaths:      []string{"stderr"},
		ErrorOutputPaths: []string{"stderr"},
		EncoderConfig:    encCfg,
	}
	// zap.Must panics if the logger cannot be built.
	logger := zap.Must(zapCfg.Build()).Sugar()
	defer logger.Sync()

	err := run(logger)
	if err == nil {
		return
	}
	logger.Fatal(err.Error())
}

// run is the body of the k8s-proxy process. It:
//
//   - reads TS_K8S_PROXY_CONFIG (required), POD_UID and POD_IP from the
//     environment;
//   - starts watching the config file and waits for the initial config;
//   - applies process-wide settings derived from that config (log level,
//     static endpoints, app name, cert share mode);
//   - opens the state store and brings up a tsnet server, failing if that
//     takes longer than a minute;
//   - optionally serves health and metrics endpoints on a local HTTP server;
//   - sets prefs and serve config, then runs the API server proxy;
//   - loops handling config reloads until the context is cancelled, at which
//     point it unadvertises services and waits for background goroutines.
//
// It returns the first setup or reload error encountered, or on shutdown the
// joined result of unadvertising services and waiting for the goroutine group.
func run(logger *zap.SugaredLogger) error {
	var (
		configPath = os.Getenv("TS_K8S_PROXY_CONFIG")
		podUID     = os.Getenv("POD_UID")
		podIP      = os.Getenv("POD_IP")
	)
	if configPath == "" {
		return errors.New("TS_K8S_PROXY_CONFIG unset")
	}

	// serveCtx to live for the lifetime of the process, only gets cancelled
	// once the Tailscale Service has been drained
	serveCtx, serveCancel := context.WithCancel(context.Background())
	defer serveCancel()

	// ctx to cancel to start the shutdown process.
	ctx, cancel := context.WithCancel(serveCtx)
	defer cancel()

	// Derive the errgroup context before starting any goroutine that reads
	// ctx, so that ctx is never reassigned while another goroutine may be
	// reading it.
	var group *errgroup.Group
	group, ctx = errgroup.WithContext(ctx)

	sigsChan := make(chan os.Signal, 1)
	signal.Notify(sigsChan, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		select {
		case <-ctx.Done():
		case s := <-sigsChan:
			logger.Infof("Received shutdown signal %s, exiting", s)
			cancel()
		}
	}()

	restConfig, err := getRestConfig(logger)
	if err != nil {
		return fmt.Errorf("error getting rest config: %w", err)
	}
	clientset, err := kubernetes.NewForConfig(restConfig)
	if err != nil {
		return fmt.Errorf("error creating Kubernetes clientset: %w", err)
	}

	// Load and watch config.
	cfgChan := make(chan *conf.Config)
	cfgLoader := config.NewConfigLoader(logger, clientset.CoreV1(), cfgChan)
	group.Go(func() error {
		return cfgLoader.WatchConfig(ctx, configPath)
	})

	// Get initial config.
	var cfg *conf.Config
	select {
	case <-ctx.Done():
		return group.Wait()
	case cfg = <-cfgChan:
	}

	if cfg.Parsed.LogLevel != nil {
		level, err := zapcore.ParseLevel(*cfg.Parsed.LogLevel)
		if err != nil {
			return fmt.Errorf("error parsing log level %q: %w", *cfg.Parsed.LogLevel, err)
		}
		logger = logger.WithOptions(zap.IncreaseLevel(level))
	}

	// TODO:(ChaosInTheCRD) This is a temporary workaround until we can set static endpoints using prefs
	if se := cfg.Parsed.StaticEndpoints; len(se) > 0 {
		logger.Debugf("setting static endpoints '%v' via TS_DEBUG_PRETENDPOINT environment variable", cfg.Parsed.StaticEndpoints)
		ses := make([]string, len(se))
		for i, e := range se {
			ses[i] = e.String()
		}

		err := os.Setenv("TS_DEBUG_PRETENDPOINT", strings.Join(ses, ","))
		if err != nil {
			return err
		}
	}

	if cfg.Parsed.App != nil {
		hostinfo.SetApp(*cfg.Parsed.App)
	}

	// TODO(tomhjp): Pass this setting directly into the store instead of using
	// environment variables.
	certShareMode := "ro"
	if shouldIssueCerts(cfg) {
		certShareMode = "rw"
	}
	if err := os.Setenv("TS_CERT_SHARE_MODE", certShareMode); err != nil {
		return fmt.Errorf("error setting TS_CERT_SHARE_MODE: %w", err)
	}

	st, err := getStateStore(cfg.Parsed.State, logger)
	if err != nil {
		return err
	}

	// If Pod UID unset, assume we're running outside of a cluster/not managed
	// by the operator, so no need to set additional state keys.
	if podUID != "" {
		if err := state.SetInitialKeys(st, podUID); err != nil {
			return fmt.Errorf("error setting initial state: %w", err)
		}
	}

	var authKey string
	if cfg.Parsed.AuthKey != nil {
		authKey = *cfg.Parsed.AuthKey
	}

	ts := &tsnet.Server{
		Logf:     logger.Named("tsnet").Debugf,
		UserLogf: logger.Named("tsnet").Infof,
		Store:    st,
		AuthKey:  authKey,
	}

	if cfg.Parsed.ServerURL != nil {
		ts.ControlURL = *cfg.Parsed.ServerURL
	}

	if cfg.Parsed.Hostname != nil {
		ts.Hostname = *cfg.Parsed.Hostname
	}

	// Make sure we crash loop if Up doesn't complete in reasonable time.
	upCtx, upCancel := context.WithTimeout(ctx, time.Minute)
	defer upCancel()
	if _, err := ts.Up(upCtx); err != nil {
		return fmt.Errorf("error starting tailscale server: %w", err)
	}
	defer ts.Close()
	lc, err := ts.LocalClient()
	if err != nil {
		return fmt.Errorf("error getting local client: %w", err)
	}

	// Setup for updating state keys.
	if podUID != "" {
		group.Go(func() error {
			return state.KeepKeysUpdated(ctx, st, klc.New(lc))
		})
	}

	if cfg.Parsed.HealthCheckEnabled.EqualBool(true) || cfg.Parsed.MetricsEnabled.EqualBool(true) {
		// Prefer the Pod IP when set; otherwise use the configured local
		// address.
		addr := podIP
		if addr == "" {
			addr = cfg.GetLocalAddr()
		}

		addrPort := getLocalAddrPort(addr, cfg.GetLocalPort())
		mux := http.NewServeMux()
		// ReadHeaderTimeout bounds how long a client may take to send its
		// request headers, so slow or idle connections cannot hold server
		// goroutines open indefinitely.
		localSrv := &http.Server{
			Addr:              addrPort,
			Handler:           mux,
			ReadHeaderTimeout: 10 * time.Second,
		}

		if cfg.Parsed.MetricsEnabled.EqualBool(true) {
			logger.Infof("Running metrics endpoint at %s/metrics", addrPort)
			metrics.RegisterMetricsHandlers(mux, lc, "")
		}

		if cfg.Parsed.HealthCheckEnabled.EqualBool(true) {
			ipV4, _ := ts.TailscaleIPs()
			hz := healthz.RegisterHealthHandlers(mux, ipV4.String(), logger.Infof)
			group.Go(func() error {
				err := hz.MonitorHealth(ctx, lc)
				// Cancellation is the normal shutdown path, not a failure.
				if err == nil || errors.Is(err, context.Canceled) {
					return nil
				}
				return err
			})
		}

		group.Go(func() error {
			// Buffered so the serving goroutine can always deliver the
			// result of ListenAndServe and exit, even when nobody is
			// receiving any more because the ctx.Done branch below was
			// taken.
			errChan := make(chan error, 1)
			go func() {
				if err := localSrv.ListenAndServe(); err != nil {
					errChan <- err
				}
				close(errChan)
			}()

			select {
			case <-ctx.Done():
				// Use serveCtx as the parent since ctx is already done.
				sCtx, scancel := context.WithTimeout(serveCtx, 10*time.Second)
				defer scancel()
				return localSrv.Shutdown(sCtx)
			case err := <-errChan:
				return err
			}
		})
	}

	if v, ok := cfg.Parsed.AcceptRoutes.Get(); ok {
		_, err = lc.EditPrefs(ctx, &ipn.MaskedPrefs{
			RouteAllSet: true,
			Prefs:       ipn.Prefs{RouteAll: v},
		})
		if err != nil {
			return fmt.Errorf("error editing prefs: %w", err)
		}
	}

	// TODO(tomhjp): There seems to be a bug that on restart the device does
	// not get reassigned its already working Service IPs unless we clear and
	// reset the serve config.
	if err := lc.SetServeConfig(ctx, &ipn.ServeConfig{}); err != nil {
		return fmt.Errorf("error clearing existing ServeConfig: %w", err)
	}

	// cm stays nil when cert issuance is disabled; setServeConfig handles a
	// nil cert manager.
	var cm *certs.CertManager
	if shouldIssueCerts(cfg) {
		logger.Infof("Will issue TLS certs for Tailscale Service")
		cm = certs.NewCertManager(klc.New(lc), logger.Infof)
	}
	if err := setServeConfig(ctx, lc, cm, apiServerProxyService(cfg)); err != nil {
		return err
	}

	if cfg.Parsed.AdvertiseServices != nil {
		if _, err := lc.EditPrefs(ctx, &ipn.MaskedPrefs{
			AdvertiseServicesSet: true,
			Prefs: ipn.Prefs{
				AdvertiseServices: cfg.Parsed.AdvertiseServices,
			},
		}); err != nil {
			return fmt.Errorf("error setting prefs AdvertiseServices: %w", err)
		}
	}

	// Setup for the API server proxy.
	mode := kubetypes.APIServerProxyModeAuth
	if cfg.Parsed.APIServerProxy != nil && cfg.Parsed.APIServerProxy.Mode != nil {
		mode = *cfg.Parsed.APIServerProxy.Mode
	}
	ap, err := apiproxy.NewAPIServerProxy(logger.Named("apiserver-proxy"), restConfig, ts, mode, false)
	if err != nil {
		return fmt.Errorf("error creating api server proxy: %w", err)
	}

	// The proxy runs on serveCtx rather than ctx so that it keeps running
	// after shutdown has started, until serveCancel is called below.
	group.Go(func() error {
		if err := ap.Run(serveCtx); err != nil {
			return fmt.Errorf("error running API server proxy: %w", err)
		}

		return nil
	})

	for {
		select {
		case <-ctx.Done():
			// Context cancelled, exit.
			logger.Info("Context cancelled, exiting")
			shutdownCtx, shutdownCancel := context.WithTimeout(serveCtx, 20*time.Second)
			unadvertiseErr := services.EnsureServicesNotAdvertised(shutdownCtx, lc, logger.Infof)
			shutdownCancel()
			serveCancel()
			return errors.Join(unadvertiseErr, group.Wait())
		case cfg = <-cfgChan:
			// Handle config reload.
			// TODO(tomhjp): Make auth mode reloadable.
			var prefs ipn.MaskedPrefs
			cfgLogger := logger
			currentPrefs, err := lc.GetPrefs(ctx)
			if err != nil {
				return fmt.Errorf("error getting current prefs: %w", err)
			}
			// Only include prefs that differ from the current ones, and
			// attach each change to the logger for the final log line.
			if !slices.Equal(currentPrefs.AdvertiseServices, cfg.Parsed.AdvertiseServices) {
				cfgLogger = cfgLogger.With("AdvertiseServices", fmt.Sprintf("%v -> %v", currentPrefs.AdvertiseServices, cfg.Parsed.AdvertiseServices))
				prefs.AdvertiseServicesSet = true
				prefs.Prefs.AdvertiseServices = cfg.Parsed.AdvertiseServices
			}
			if cfg.Parsed.Hostname != nil && *cfg.Parsed.Hostname != currentPrefs.Hostname {
				cfgLogger = cfgLogger.With("Hostname", fmt.Sprintf("%s -> %s", currentPrefs.Hostname, *cfg.Parsed.Hostname))
				prefs.HostnameSet = true
				prefs.Hostname = *cfg.Parsed.Hostname
			}
			if v, ok := cfg.Parsed.AcceptRoutes.Get(); ok && v != currentPrefs.RouteAll {
				cfgLogger = cfgLogger.With("AcceptRoutes", fmt.Sprintf("%v -> %v", currentPrefs.RouteAll, v))
				prefs.RouteAllSet = true
				prefs.Prefs.RouteAll = v
			}
			if !prefs.IsEmpty() {
				if _, err := lc.EditPrefs(ctx, &prefs); err != nil {
					return fmt.Errorf("error editing prefs: %w", err)
				}
			}
			if err := setServeConfig(ctx, lc, cm, apiServerProxyService(cfg)); err != nil {
				return fmt.Errorf("error setting serve config: %w", err)
			}

			cfgLogger.Infof("Config reloaded")
		}
	}
}

// getLocalAddrPort joins addr and port into a "host:port" string using
// net.JoinHostPort, which brackets addr when it contains a colon (for example
// an IPv6 literal).
func getLocalAddrPort(addr string, port uint16) string {
	portStr := strconv.Itoa(int(port))
	return net.JoinHostPort(addr, portStr)
}

// getStateStore creates the ipn.StateStore for the location that path points
// to. When path is nil it logs a warning and uses the "mem:" store instead.
// The store is given logger.Errorf as its log function. Any error from
// store.New is returned wrapped.
func getStateStore(path *string, logger *zap.SugaredLogger) (ipn.StateStore, error) {
	var location string
	switch {
	case path == nil:
		logger.Warn("No state Secret provided; using in-memory store, which will lose state on restart")
		location = "mem:"
	default:
		location = *path
	}

	stateStore, err := store.New(logger.Errorf, location)
	if err != nil {
		return nil, fmt.Errorf("error creating state store: %w", err)
	}
	return stateStore, nil
}

// getRestConfig returns a Kubernetes REST config. It first tries the
// in-cluster config; if that reports rest.ErrNotInCluster it logs the fact and
// falls back to loading a kubeconfig using clientcmd's default loading rules.
// Any other in-cluster error, or a failure to load the kubeconfig, is returned
// wrapped.
func getRestConfig(logger *zap.SugaredLogger) (*rest.Config, error) {
	restConfig, err := rest.InClusterConfig()
	switch {
	case err == nil:
		return restConfig, nil
	case errors.Is(err, rest.ErrNotInCluster):
		// errors.Is rather than == so the fallback still triggers if the
		// sentinel is ever returned wrapped.
		logger.Info("Not running in-cluster, falling back to kubeconfig")
	default:
		return nil, fmt.Errorf("error getting in-cluster config: %w", err)
	}

	loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
	clientConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, nil)
	restConfig, err = clientConfig.ClientConfig()
	if err != nil {
		return nil, fmt.Errorf("error loading kubeconfig: %w", err)
	}

	return restConfig, nil
}

// apiServerProxyService returns the Tailscale Service name configured for the
// API server proxy. It returns the empty name unless the APIServerProxy
// section is present, is explicitly enabled, and carries a non-empty service
// name.
func apiServerProxyService(cfg *conf.Config) tailcfg.ServiceName {
	proxyCfg := cfg.Parsed.APIServerProxy
	if proxyCfg == nil || !proxyCfg.Enabled.EqualBool(true) {
		return ""
	}

	svcName := proxyCfg.ServiceName
	if svcName == nil || *svcName == "" {
		return ""
	}

	return tailcfg.ServiceName(*svcName)
}

// shouldIssueCerts reports whether the config has an APIServerProxy section
// with IssueCerts explicitly set to true.
func shouldIssueCerts(cfg *conf.Config) bool {
	apiProxy := cfg.Parsed.APIServerProxy
	if apiProxy == nil {
		return false
	}
	return apiProxy.IssueCerts.EqualBool(true)
}

// setServeConfig sets up serve config such that it's serving for the passed in
// Tailscale Service, and does nothing if it's already up to date.
//
// If name is empty, the serve config is cleared instead. Otherwise the config
// terminates HTTPS on port 443 for the Service's MagicDNS hostname and proxies
// "/" to http://localhost:80. When cm is non-nil, it is told about the new
// serve config via EnsureCertLoops before the config is applied.
//
// It returns an error if the existing serve config or local status cannot be
// fetched, if the node has no current tailnet (so the Service hostname cannot
// be built), or if applying the new serve config fails.
func setServeConfig(ctx context.Context, lc *local.Client, cm *certs.CertManager, name tailcfg.ServiceName) error {
	existingServeConfig, err := lc.GetServeConfig(ctx)
	if err != nil {
		return fmt.Errorf("error getting existing serve config: %w", err)
	}
	if existingServeConfig == nil {
		// Treat a missing serve config as empty rather than dereferencing a
		// nil pointer in the comparisons below.
		existingServeConfig = &ipn.ServeConfig{}
	}

	// Ensure serve config is cleared if no Tailscale Service.
	if name == "" {
		if reflect.DeepEqual(*existingServeConfig, ipn.ServeConfig{}) {
			// Already up to date.
			return nil
		}

		if cm != nil {
			cm.EnsureCertLoops(ctx, &ipn.ServeConfig{})
		}
		return lc.SetServeConfig(ctx, &ipn.ServeConfig{})
	}

	status, err := lc.StatusWithoutPeers(ctx)
	if err != nil {
		return fmt.Errorf("error getting local client status: %w", err)
	}
	if status.CurrentTailnet == nil {
		// Without a current tailnet there is no MagicDNS suffix to build the
		// Service hostname from; fail with an error instead of panicking.
		return fmt.Errorf("error setting serve config for %q: no current tailnet in local client status", name)
	}
	serviceHostPort := ipn.HostPort(fmt.Sprintf("%s.%s:443", name.WithoutPrefix(), status.CurrentTailnet.MagicDNSSuffix))

	serveConfig := ipn.ServeConfig{
		// Configure for the Service hostname.
		Services: map[tailcfg.ServiceName]*ipn.ServiceConfig{
			name: {
				TCP: map[uint16]*ipn.TCPPortHandler{
					443: {
						HTTPS: true,
					},
				},
				Web: map[ipn.HostPort]*ipn.WebServerConfig{
					serviceHostPort: {
						Handlers: map[string]*ipn.HTTPHandler{
							"/": {
								Proxy: "http://localhost:80",
							},
						},
					},
				},
			},
		},
	}

	if reflect.DeepEqual(*existingServeConfig, serveConfig) {
		// Already up to date.
		return nil
	}

	if cm != nil {
		cm.EnsureCertLoops(ctx, &serveConfig)
	}
	return lc.SetServeConfig(ctx, &serveConfig)
}