summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorchaosinthecrd <tom@tmlabs.co.uk>2026-01-23 12:19:38 +0000
committerchaosinthecrd <tom@tmlabs.co.uk>2026-02-02 15:50:38 +0000
commitbf7508dbf66a927b7600cc2968f03c81a0fe7e54 (patch)
treec3ef2c7b2108ad7510ddfcd9018b690117938dce
parent63d563e7340b4712b9f2933f663057ce2dcfa4a4 (diff)
downloadtailscale-chaosinthecrd/userspace-tsnet-proxy.tar.xz
tailscale-chaosinthecrd/userspace-tsnet-proxy.zip
cmd/k8s-proxy,kube/k8s-proxy: starting userspace proxychaosinthecrd/userspace-tsnet-proxy
Signed-off-by: chaosinthecrd <tom@tmlabs.co.uk>
-rw-r--r--cmd/k8s-proxy/k8s-proxy.go91
-rw-r--r--cmd/k8s-proxy/l4-forwarder.go269
-rw-r--r--kube/egressservices/egressservices.go2
-rw-r--r--kube/ingressservices/ingressservices.go18
-rw-r--r--kube/k8s-proxy/conf/conf.go8
-rw-r--r--wgengine/netstack/netstack.go28
6 files changed, 383 insertions, 33 deletions
diff --git a/cmd/k8s-proxy/k8s-proxy.go b/cmd/k8s-proxy/k8s-proxy.go
index 9b2bb6749..fbcde1564 100644
--- a/cmd/k8s-proxy/k8s-proxy.go
+++ b/cmd/k8s-proxy/k8s-proxy.go
@@ -10,6 +10,7 @@ package main
import (
"context"
+ "encoding/json"
"errors"
"fmt"
"net"
@@ -104,6 +105,7 @@ func run(logger *zap.SugaredLogger) error {
if err != nil {
return fmt.Errorf("error getting rest config: %w", err)
}
+
clientset, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return fmt.Errorf("error creating Kubernetes clientset: %w", err)
@@ -152,10 +154,12 @@ func run(logger *zap.SugaredLogger) error {
// TODO(tomhjp): Pass this setting directly into the store instead of using
// environment variables.
- if cfg.Parsed.APIServerProxy != nil && cfg.Parsed.APIServerProxy.IssueCerts.EqualBool(true) {
- os.Setenv("TS_CERT_SHARE_MODE", "rw")
- } else {
- os.Setenv("TS_CERT_SHARE_MODE", "ro")
+ if cfg.Parsed.APIServerProxy != nil {
+ if cfg.Parsed.APIServerProxy.IssueCerts.EqualBool(true) {
+ os.Setenv("TS_CERT_SHARE_MODE", "rw")
+ } else {
+ os.Setenv("TS_CERT_SHARE_MODE", "ro")
+ }
}
st, err := getStateStore(cfg.Parsed.State, logger)
@@ -275,43 +279,64 @@ func run(logger *zap.SugaredLogger) error {
}
var cm *certs.CertManager
- if shouldIssueCerts(cfg) {
- logger.Infof("Will issue TLS certs for Tailscale Service")
- cm = certs.NewCertManager(klc.New(lc), logger.Infof)
+
+ if cfg.Parsed.APIServerProxy != nil && cfg.Parsed.L4Proxy != nil {
+ return fmt.Errorf("proxy configured for both api-server-proxy and l4-proxy")
}
- if err := setServeConfig(ctx, lc, cm, apiServerProxyService(cfg)); err != nil {
- return err
+
+ if cfg.Parsed.APIServerProxy != nil {
+ // Setup for the API server proxy.
+ mode := kubetypes.APIServerProxyModeAuth
+ if cfg.Parsed.APIServerProxy != nil && cfg.Parsed.APIServerProxy.Mode != nil {
+ mode = *cfg.Parsed.APIServerProxy.Mode
+ }
+
+ ap, err := apiproxy.NewAPIServerProxy(logger.Named("apiserver-proxy"), restConfig, ts, mode, false)
+ if err != nil {
+ return fmt.Errorf("error creating api server proxy: %w", err)
+ }
+
+ group.Go(func() error {
+ if err := ap.Run(serveCtx); err != nil {
+ return fmt.Errorf("error running API server proxy: %w", err)
+ }
+
+ return nil
+ })
+
+ if shouldIssueCerts(cfg) {
+ logger.Infof("Will issue TLS certs for Tailscale Service")
+ cm = certs.NewCertManager(klc.New(lc), logger.Infof)
+ }
+ if err := setServeConfig(ctx, lc, cm, apiServerProxyService(cfg)); err != nil {
+ return err
+ }
+ } else if cfg.Parsed.L4Proxy != nil {
+ err := setupL4Proxies(serveCtx, ts, lc, logger, cfg, group)
+ if err != nil {
+ return fmt.Errorf("failed to setup l4 proxies: %w", err)
+ }
+ } else {
+ return fmt.Errorf("please configure proxy either as api-server-proxy or l4-proxy")
}
if cfg.Parsed.AdvertiseServices != nil {
- if _, err := lc.EditPrefs(ctx, &ipn.MaskedPrefs{
+ if prefs, err := lc.EditPrefs(ctx, &ipn.MaskedPrefs{
AdvertiseServicesSet: true,
Prefs: ipn.Prefs{
AdvertiseServices: cfg.Parsed.AdvertiseServices,
},
}); err != nil {
return fmt.Errorf("error setting prefs AdvertiseServices: %w", err)
+ } else {
+ prefsJSON, _ := json.Marshal(prefs)
+ logger.Infof("new prefs: %q", string(prefsJSON))
}
+ logger.Infof("Successfully set AdvertiseServices")
+ } else {
+ logger.Infof("No AdvertiseServices configured")
}
- // Setup for the API server proxy.
- mode := kubetypes.APIServerProxyModeAuth
- if cfg.Parsed.APIServerProxy != nil && cfg.Parsed.APIServerProxy.Mode != nil {
- mode = *cfg.Parsed.APIServerProxy.Mode
- }
- ap, err := apiproxy.NewAPIServerProxy(logger.Named("apiserver-proxy"), restConfig, ts, mode, false)
- if err != nil {
- return fmt.Errorf("error creating api server proxy: %w", err)
- }
-
- group.Go(func() error {
- if err := ap.Run(serveCtx); err != nil {
- return fmt.Errorf("error running API server proxy: %w", err)
- }
-
- return nil
- })
-
for {
select {
case <-ctx.Done():
@@ -325,6 +350,7 @@ func run(logger *zap.SugaredLogger) error {
case cfg = <-cfgChan:
// Handle config reload.
// TODO(tomhjp): Make auth mode reloadable.
+ // TODO(ChaosInTheCRD): Make UDP and TCP forwarders reloadable.
var prefs ipn.MaskedPrefs
cfgLogger := logger
currentPrefs, err := lc.GetPrefs(ctx)
@@ -347,12 +373,16 @@ func run(logger *zap.SugaredLogger) error {
prefs.Prefs.RouteAll = v
}
if !prefs.IsEmpty() {
+ logger.Infof("Advertising Service: %v", cfg.Parsed.AdvertiseServices)
if _, err := lc.EditPrefs(ctx, &prefs); err != nil {
return fmt.Errorf("error editing prefs: %w", err)
}
}
- if err := setServeConfig(ctx, lc, cm, apiServerProxyService(cfg)); err != nil {
- return fmt.Errorf("error setting serve config: %w", err)
+
+ if cfg.Parsed.APIServerProxy != nil {
+ if err := setServeConfig(ctx, lc, cm, apiServerProxyService(cfg)); err != nil {
+ return fmt.Errorf("error setting serve config: %w", err)
+ }
}
cfgLogger.Infof("Config reloaded")
@@ -441,6 +471,7 @@ func setServeConfig(ctx context.Context, lc *local.Client, cm *certs.CertManager
if err != nil {
return fmt.Errorf("error getting local client status: %w", err)
}
+
serviceHostPort := ipn.HostPort(fmt.Sprintf("%s.%s:443", name.WithoutPrefix(), status.CurrentTailnet.MagicDNSSuffix))
serveConfig := ipn.ServeConfig{
diff --git a/cmd/k8s-proxy/l4-forwarder.go b/cmd/k8s-proxy/l4-forwarder.go
new file mode 100644
index 000000000..008d7d64f
--- /dev/null
+++ b/cmd/k8s-proxy/l4-forwarder.go
@@ -0,0 +1,269 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+//go:build !plan9
+
+package main
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "net/netip"
+ "slices"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "go.uber.org/zap"
+ "golang.org/x/sync/errgroup"
+ "tailscale.com/client/local"
+ "tailscale.com/ipn"
+ "tailscale.com/kube/ingressservices"
+ "tailscale.com/kube/k8s-proxy/conf"
+ "tailscale.com/tailcfg"
+ "tailscale.com/tsnet"
+)
+
+type udpForwarder struct {
+ listener net.PacketConn
+ backend string
+ connMap map[netip.AddrPort]*natEntry
+ timeout time.Duration
+ l *zap.SugaredLogger
+ m sync.Mutex
+}
+
+type natEntry struct {
+ conn net.Conn
+ timestamp atomic.Int64
+ cancel context.CancelFunc
+}
+
+func (f *udpForwarder) run(ctx context.Context) error {
+ buf := make([]byte, 65535)
+
+ f.l.Infof("UDP forwarder started, listening on %s, forwarding to %s", f.listener.LocalAddr().String(), f.backend)
+
+ // TODO: Cleanup goroutine
+ for {
+ select {
+ case <-ctx.Done():
+ return nil
+ default:
+ }
+
+ n, addr, err := f.listener.ReadFrom(buf)
+ if err != nil {
+ f.l.Errorf("failed to read from listener: %v", err)
+ return err
+ }
+
+ f.l.Debugf("Received %d bytes from %s", n, addr.String())
+
+ addrp, err := netip.ParseAddrPort(addr.String())
+ if err != nil {
+ f.l.Errorf("failed to parse address as address and port: %v", err)
+ return err
+ }
+
+ f.m.Lock()
+ entry, ok := f.connMap[addrp]
+ if !ok {
+ c, err := net.Dial("udp", f.backend)
+ if err != nil {
+ f.l.Errorf("failed to dial: %v", err)
+ f.m.Unlock()
+ return err
+ }
+
+ entryCtx, cancel := context.WithCancel(ctx)
+
+ entry = &natEntry{
+ conn: c,
+ cancel: cancel,
+ timestamp: atomic.Int64{},
+ }
+ f.connMap[addrp] = entry
+
+ go func(ctx context.Context, ne *natEntry) {
+ defer ne.conn.Close()
+ buf := make([]byte, 65535)
+
+ for {
+ select {
+ case <-ctx.Done():
+ f.l.Infof("context for relay with address %q done, exiting", addrp.String())
+ return
+ default:
+ }
+
+ n, err := ne.conn.Read(buf)
+ if err != nil {
+ f.l.Errorf("failed to read from connection with address %q: %v", addrp.String(), err)
+ return
+ }
+
+ ne.timestamp.Store(time.Now().Unix())
+
+ _, err = f.listener.WriteTo(buf[:n], net.UDPAddrFromAddrPort(addrp))
+ if err != nil {
+ f.l.Errorf("failed to write response to address %q: %v", addrp.String(), err)
+ return
+ }
+ }
+ }(entryCtx, entry)
+ }
+ f.m.Unlock()
+
+ _, err = entry.conn.Write(buf[:n])
+ if err != nil {
+ f.l.Errorf("failed to write bytes to %q: %v", f.backend, err)
+ return err
+ }
+
+ entry.timestamp.Store(time.Now().Unix())
+ }
+}
+
+func setupL4Proxies(ctx context.Context, ts *tsnet.Server, lc *local.Client, logger *zap.SugaredLogger, cfg *conf.Config, group *errgroup.Group) (err error) {
+ sc := &ipn.ServeConfig{}
+ sc.Services = make(map[tailcfg.ServiceName]*ipn.ServiceConfig)
+
+ // Store proxies to start later
+ udpProxies := []ingressservices.Config{}
+
+ // Build up the ServeConfig
+ for _, p := range cfg.Parsed.L4Proxy.Ingress {
+ // Register empty service config to trigger IP assignment
+ for _, m := range p.Mappings() {
+ if sc.Services[tailcfg.ServiceName(m.TailscaleServiceName)] == nil {
+ sc.Services[tailcfg.ServiceName(m.TailscaleServiceName)] = &ipn.ServiceConfig{}
+ }
+ }
+ udpProxies = append(udpProxies, p)
+
+ status, err := lc.StatusWithoutPeers(ctx)
+ if err != nil {
+ return fmt.Errorf("error getting local client status: %w", err)
+ }
+ err = setTCPForwardingForProxy(p, status.CurrentTailnet.MagicDNSSuffix, sc, lc, logger)
+ if err != nil {
+ return fmt.Errorf("failed to set tcp forwarding for services: %w", err)
+ }
+ }
+
+ // Apply the ServeConfig
+ logger.Infof("Applying ServeConfig...")
+ err = lc.SetServeConfig(ctx, sc)
+ if err != nil {
+ logger.Errorf("Failed to set ServeConfig: %v", err)
+ return err
+ }
+
+ // Setup the UDP Forwarders
+ for _, p := range udpProxies {
+ status, err := lc.StatusWithoutPeers(ctx)
+ if err != nil {
+ return fmt.Errorf("error getting status: %w", err)
+ }
+
+ // We can validate that the Service IP is in this node's capmap, to ensure that the advertisement was successful
+ found := false
+ serviceIPMaps, err := tailcfg.UnmarshalNodeCapJSON[tailcfg.ServiceIPMappings](status.Self.CapMap, tailcfg.NodeAttrServiceHost)
+ if err != nil {
+ return fmt.Errorf("error unmarshaling service IP mappings: %w", err)
+ }
+ if len(serviceIPMaps) == 0 {
+ logger.Warnf("no service IP mappings found for this node")
+ } else {
+ for _, m := range p.Mappings() {
+ ipMatches := false
+ for serviceName, addrs := range serviceIPMaps[0] {
+ if string(serviceName) == m.TailscaleServiceName {
+ found = true
+ if len(addrs) == 0 {
+ logger.Warnf("service %s has no assigned VIP addresses", m.TailscaleServiceName)
+ break
+ }
+ // Check if the configured IP is in the capmap. There can be scenarios where it isn't (no autoapproval, tag problems)
+ if slices.Contains(addrs, m.TailscaleServiceIP) {
+ ipMatches = true
+ logger.Infof("Found matching VIP %s for service %s in capmap", m.TailscaleServiceIP, m.TailscaleServiceName)
+ }
+ if !ipMatches {
+ logger.Warnf("Service %s configured with IP %s, but capmap reports %v. Routing may not work.",
+ m.TailscaleServiceName, m.TailscaleServiceIP, addrs)
+ }
+ break
+ }
+ }
+ if !found {
+ logger.Warnf("Tailscale Service %q not found in capmap. Routing may not work.", m.TailscaleServiceName)
+ }
+ }
+ }
+
+ fs, err := setupUDPForwardingForProxy(ts, p, logger)
+ if err != nil {
+ return fmt.Errorf("failed to setup udp forwarding: %w", err)
+ }
+
+ for _, f := range fs {
+ group.Go(func() error {
+ logger.Infof("Starting UDP forwarder goroutine for %s (%v)", f.backend, f.listener.LocalAddr())
+ return f.run(ctx)
+ })
+
+ logger.Infof("successfully created UDP listener on %s", f.listener.LocalAddr())
+ }
+
+ }
+
+ logger.Infof("Successfully applied ServeConfig and started all L4 proxies")
+ return nil
+}
+
+func setTCPForwardingForProxy(p ingressservices.Config, magicDNSSuffix string, serveConfig *ipn.ServeConfig, lc *local.Client, logger *zap.SugaredLogger) error {
+ for _, m := range p.Mappings() {
+ for _, port := range m.Ports {
+ svcName := tailcfg.ServiceName(m.TailscaleServiceName)
+ logger.Infof("Setting TCP forwarding for service=%s, port=%d, backend=%s", svcName, port, m.ClusterIP)
+
+ serveConfig.SetTCPForwardingForService(
+ port,
+ m.ClusterIP.String(),
+ false,
+ svcName,
+ 0,
+ magicDNSSuffix,
+ )
+ }
+ }
+
+ return nil
+}
+
+func setupUDPForwardingForProxy(ts *tsnet.Server, p ingressservices.Config, logger *zap.SugaredLogger) (fs []*udpForwarder, err error) {
+ for _, m := range p.Mappings() {
+ for _, port := range m.Ports {
+ f := &udpForwarder{
+ l: logger.Named(fmt.Sprintf("udp-forwarder-%v", m.ClusterIP)),
+ backend: fmt.Sprintf("%s:%d", m.ClusterIP.String(), port),
+ connMap: make(map[netip.AddrPort]*natEntry),
+ }
+ listenAddr := fmt.Sprintf("%s:%d", m.TailscaleServiceIP, port)
+ logger.Infof("Attempting to listen on UDP address: %s", listenAddr)
+
+ f.listener, err = ts.ListenPacket("udp", listenAddr)
+ if err != nil {
+ logger.Warnf("Failed to listen on %s: %v", listenAddr, err)
+ return nil, err
+ }
+
+ fs = append(fs, f)
+ }
+ }
+
+ return
+}
diff --git a/kube/egressservices/egressservices.go b/kube/egressservices/egressservices.go
index 56c874f31..d61a9547d 100644
--- a/kube/egressservices/egressservices.go
+++ b/kube/egressservices/egressservices.go
@@ -47,7 +47,7 @@ type TailnetTarget struct {
FQDN string `json:"fqdn"`
}
-// PorMap is a mapping between match port on which proxy receives cluster
+// PortMap is a mapping between match port on which proxy receives cluster
// traffic and target port where traffic received on match port should be
// fowardded to.
type PortMap struct {
diff --git a/kube/ingressservices/ingressservices.go b/kube/ingressservices/ingressservices.go
index f79410761..ba8d0f63e 100644
--- a/kube/ingressservices/ingressservices.go
+++ b/kube/ingressservices/ingressservices.go
@@ -48,6 +48,20 @@ type Config struct {
// Mapping describes a rule that forwards traffic from Tailscale Service IP to a
// Kubernetes Service IP.
type Mapping struct {
- TailscaleServiceIP netip.Addr `json:"TailscaleServiceIP"`
- ClusterIP netip.Addr `json:"ClusterIP"`
+ TailscaleServiceName string `json:"TailscaleServiceName"`
+ TailscaleServiceIP netip.Addr `json:"TailscaleServiceIP"`
+ ClusterIP netip.Addr `json:"ClusterIP"`
+ Ports []uint16 `json:"ports"`
+}
+
+// Mappings returns all non-nil mappings for this config
+func (c *Config) Mappings() []*Mapping {
+ var mappings []*Mapping
+ if c.IPv4Mapping != nil {
+ mappings = append(mappings, c.IPv4Mapping)
+ }
+ if c.IPv6Mapping != nil {
+ mappings = append(mappings, c.IPv6Mapping)
+ }
+ return mappings
}
diff --git a/kube/k8s-proxy/conf/conf.go b/kube/k8s-proxy/conf/conf.go
index 529495243..c854d7c13 100644
--- a/kube/k8s-proxy/conf/conf.go
+++ b/kube/k8s-proxy/conf/conf.go
@@ -14,6 +14,8 @@ import (
"net/netip"
"github.com/tailscale/hujson"
+ "tailscale.com/kube/egressservices"
+ "tailscale.com/kube/ingressservices"
"tailscale.com/kube/kubetypes"
"tailscale.com/tailcfg"
"tailscale.com/types/opt"
@@ -66,6 +68,12 @@ type ConfigV1Alpha1 struct {
AdvertiseServices []string `json:",omitempty"` // Tailscale Services to advertise.
APIServerProxy *APIServerProxyConfig `json:",omitempty"` // Config specific to the API Server proxy.
StaticEndpoints []netip.AddrPort `json:",omitempty"` // StaticEndpoints are additional, user-defined endpoints that this node should advertise amongst its wireguard endpoints.
+ L4Proxy *L4ProxyConfig `json:",omitempty"`
+}
+
+type L4ProxyConfig struct {
+ Ingress []ingressservices.Config `json:",omitempty"`
+ Egress []egressservices.Config `json:",omitempty"`
}
type APIServerProxyConfig struct {
diff --git a/wgengine/netstack/netstack.go b/wgengine/netstack/netstack.go
index e05846e15..27aa39014 100644
--- a/wgengine/netstack/netstack.go
+++ b/wgengine/netstack/netstack.go
@@ -1109,6 +1109,34 @@ func (ns *Impl) shouldProcessInbound(p *packet.Parsed, t *tstun.Wrapper) bool {
return true
}
}
+ // check if there's a registered UDP endpoint for this service VIP
+ // This allows userspace UDP listeners (e.g., via tsnet.ListenPacket) to
+ // receive traffic on service VIP addresses.
+ if p.IPProto == ipproto.UDP {
+ var netProto tcpip.NetworkProtocolNumber
+ var id stack.TransportEndpointID
+ if p.Dst.Addr().Is4() {
+ netProto = ipv4.ProtocolNumber
+ id = stack.TransportEndpointID{
+ LocalAddress: tcpip.AddrFrom4(p.Dst.Addr().As4()),
+ LocalPort: p.Dst.Port(),
+ RemoteAddress: tcpip.AddrFrom4(p.Src.Addr().As4()),
+ RemotePort: p.Src.Port(),
+ }
+ } else {
+ netProto = ipv6.ProtocolNumber
+ id = stack.TransportEndpointID{
+ LocalAddress: tcpip.AddrFrom16(p.Dst.Addr().As16()),
+ LocalPort: p.Dst.Port(),
+ RemoteAddress: tcpip.AddrFrom16(p.Src.Addr().As16()),
+ RemotePort: p.Src.Port(),
+ }
+ }
+ ep := ns.ipstack.FindTransportEndpoint(netProto, udp.ProtocolNumber, id, nicID)
+ if ep != nil {
+ return true
+ }
+ }
return false
}
if p.IPVersion == 6 && !isLocal && viaRange.Contains(dstIP) {