summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorchaosinthecrd <tom@tmlabs.co.uk>2025-05-22 16:22:48 +0100
committerchaosinthecrd <tom@tmlabs.co.uk>2025-05-22 16:27:50 +0100
commit05ecda9855f975c42063cb09a0c446cd4de96ce6 (patch)
tree10eb5d57f15f2e3c08c888ecc7ee64cb299b5ffb
parent95dcd133dd87808d663c0e0f09dd9330e8ca1a0c (diff)
downloadtailscale-irbekrm/pretendpoints.tar.xz
tailscale-irbekrm/pretendpoints.zip
cmd/k8s-operator, k8s-operator: support direct connections on ProxyGroupsirbekrm/pretendpoints
updates: #14674 Signed-off-by: chaosinthecrd <tom@tmlabs.co.uk>
-rw-r--r--cmd/k8s-operator/proxygroup.go262
-rw-r--r--cmd/k8s-operator/proxygroup_specs.go7
-rw-r--r--k8s-operator/apis/v1alpha1/types_proxyclass.go1
3 files changed, 257 insertions, 13 deletions
diff --git a/cmd/k8s-operator/proxygroup.go b/cmd/k8s-operator/proxygroup.go
index 936a29325..77a644e67 100644
--- a/cmd/k8s-operator/proxygroup.go
+++ b/cmd/k8s-operator/proxygroup.go
@@ -11,7 +11,10 @@ import (
"encoding/json"
"fmt"
"net/http"
+ "net/netip"
"slices"
+ "sort"
+ "strconv"
"strings"
"sync"
@@ -25,6 +28,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -43,11 +47,15 @@ import (
)
const (
+ nodePortType = "NodePort"
+ directConnPortName = "direct-connection-proxy"
+ directConnProxyPort = 30052
reasonProxyGroupCreationFailed = "ProxyGroupCreationFailed"
reasonProxyGroupReady = "ProxyGroupReady"
reasonProxyGroupCreating = "ProxyGroupCreating"
reasonProxyGroupInvalid = "ProxyGroupInvalid"
+ statefulSetPodNameSelector = "statefulset.kubernetes.io/pod-name"
// Copied from k8s.io/apiserver/pkg/registry/generic/registry/store.go@cccad306d649184bf2a0e319ba830c53f65c445c
optimisticLockErrorMsg = "the object has been modified; please apply your changes to the latest version and try again"
)
@@ -206,20 +214,191 @@ func (r *ProxyGroupReconciler) Reconcile(ctx context.Context, req reconcile.Requ
return setStatusReady(pg, metav1.ConditionTrue, reasonProxyGroupReady, reasonProxyGroupReady)
}
-func (r *ProxyGroupReconciler) maybeExposeViaNodePort(ctx context.Context, pc *tsapi.ProxyClass, pg *tsapi.ProxyGroup) error {
- // TODO: make NodePort a const
- if pc == nil || pc.Spec.TailnetListenerConfig == nil || pc.Spec.TailnetListenerConfig.Type != "NodePort" {
- return nil
+func allocatePorts(pg *tsapi.ProxyGroup, pr []string, ports map[string]int32) error {
+ ranges, err := validatePortRanges(pr)
+ if err != nil {
+ return fmt.Errorf("configured port ranges invalid: %w", err)
+ }
+
+ i := 0
+ replicaCount := int(*pg.Spec.Replicas)
+ for _, r := range ranges {
+ for p := r.Start; p <= r.End && len(ports) < replicaCount; p++ {
+ ports[fmt.Sprintf("%s-%d", pg.Name, i)] = int32(p)
+ i++
+ }
+ if i-1 >= replicaCount {
+ break
+ }
}
- // 1. Create a NodePort Service per each replica
- // TODO: support setting NodePort range
- for i := range *(pg.Spec.Replicas) {
+ if len(ports) < replicaCount {
+ return fmt.Errorf("not enough ports in configured ranges: needed %d, found %d", replicaCount, len(ports))
}
return nil
}
+func validateRange(s int, e int) error {
+ if s < 0 || s > 65535 {
+ return fmt.Errorf("invalid port value: %q", s)
+ }
+ if e < 0 || e > 65535 {
+ return fmt.Errorf("invalid port value: %q", e)
+ }
+ if s > e {
+ return fmt.Errorf("invalid port range: '%d-%d'", s, e)
+ }
+
+ return nil
+}
+
+type portRange struct {
+ Start int
+ End int
+ String string
+}
+
+func validatePortRanges(pr []string) ([]portRange, error) {
+ ranges := []portRange{}
+ for _, p := range pr {
+ parts := strings.Split(p, "-")
+ switch len(parts) {
+ case 1:
+ s, err := strconv.Atoi(parts[0])
+ if err != nil {
+ return nil, fmt.Errorf("failed to parse port range %q: %w", p, err)
+ }
+ e := s
+
+ err = validateRange(s, e)
+ if err != nil {
+ return nil, err
+ }
+
+ ranges = append(ranges, portRange{Start: s, End: e, String: p})
+ case 2:
+ s, err := strconv.Atoi(parts[0])
+ if err != nil {
+ return nil, fmt.Errorf("failed to parse port range %q: %w", p, err)
+ }
+ e, err := strconv.Atoi(parts[1])
+ if err != nil {
+ return nil, fmt.Errorf("failed to parse port range %q: %w", p, err)
+ }
+
+ err = validateRange(s, e)
+ if err != nil {
+ return nil, err
+ }
+
+ ranges = append(ranges, portRange{Start: s, End: e, String: p})
+ default:
+ return nil, fmt.Errorf("failed to parse port range %q", p)
+ }
+ }
+
+ if len(ranges) < 2 {
+ return ranges, nil
+ }
+
+ sort.Slice(ranges, func(i, j int) bool {
+ return ranges[i].Start < ranges[j].Start
+ })
+
+ for i := 1; i < len(ranges); i++ {
+ prev := ranges[i-1]
+ curr := ranges[i]
+ if curr.Start <= prev.End {
+ return nil, fmt.Errorf("overlapping ranges: %q and %q", prev.String, curr.String)
+ }
+ }
+
+ return ranges, nil
+}
+
+func (r *ProxyGroupReconciler) maybeExposeViaNodePort(ctx context.Context, pc *tsapi.ProxyClass, pg *tsapi.ProxyGroup, logger *zap.SugaredLogger) (map[string]int32, error) {
+ if pc == nil || pc.Spec.TailnetListenerConfig == nil || pc.Spec.TailnetListenerConfig.Type != nodePortType {
+ return nil, nil
+ }
+
+ ports := make(map[string]int32)
+ pr := pc.Spec.TailnetListenerConfig.NodePortConfig.PortRanges
+ if len(pr) == 0 {
+ logger.Infof("no port ranges specified in ProxyClass config, leaving NodePort unspecified")
+ } else {
+ err := allocatePorts(pg, pr, ports)
+ if err != nil {
+ return nil, fmt.Errorf("failed to allocate NodePorts to ProxyGroup Services: %w", err)
+ }
+ }
+
+ for i := range *(pg.Spec.Replicas) {
+ replicaName := fmt.Sprintf("%s-%d", pg.Name, i)
+ port, ok := ports[replicaName]
+ if !ok {
+ // NOTE: if port ranges have not been configured we want to leave Kubernetes to set the NodePort
+ port = 0
+ }
+
+ svc := &corev1.Service{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: replicaName,
+ Namespace: r.tsNamespace,
+ Labels: pgLabels(pg.Name, nil),
+ OwnerReferences: pgOwnerReference(pg),
+ },
+ Spec: corev1.ServiceSpec{
+ Type: corev1.ServiceTypeNodePort,
+ Ports: []corev1.ServicePort{
+ {
+ Name: directConnPortName,
+ Port: int32(directConnProxyPort),
+ Protocol: corev1.ProtocolUDP,
+ NodePort: int32(port),
+ TargetPort: intstr.FromInt(directConnProxyPort),
+ },
+ },
+ Selector: map[string]string{
+ statefulSetPodNameSelector: replicaName,
+ },
+ },
+ }
+
+ createOrUpdate(ctx, r.Client, r.tsNamespace, svc, func(s *corev1.Service) {
+ s.ObjectMeta.Labels = svc.ObjectMeta.Labels
+ s.ObjectMeta.Annotations = svc.ObjectMeta.Annotations
+ s.ObjectMeta.OwnerReferences = svc.ObjectMeta.OwnerReferences
+ s.Spec.Selector = svc.Spec.Selector
+ if port != 0 {
+ s.Spec.Ports = svc.Spec.Ports
+ }
+ })
+
+ if port == 0 {
+ if err := r.Get(ctx, client.ObjectKeyFromObject(svc), svc); err != nil && !apierrors.IsNotFound(err) {
+ return nil, fmt.Errorf("error retrieving Kubernetes NodePort Service %s: %w", svc.Name, err)
+ }
+
+ for _, p := range svc.Spec.Ports {
+ if p.Name == directConnPortName {
+ port = p.NodePort
+ ports[replicaName] = port
+ }
+ }
+
+ if port == 0 {
+ logger.Warn("ProxyGroup %q replica %q NodePort not configured")
+ return nil, nil
+ }
+
+ logger.Info("ProxyGroup %q replica %q exposed on NodePort %q. Please ensure the appropriate firewall rules are configured to expose it on the desired network.", pg.Name, svc.Name, port)
+ }
+ }
+
+ return ports, nil
+}
+
// validateProxyClassForPG applies custom validation logic for ProxyClass applied to ProxyGroup.
func validateProxyClassForPG(logger *zap.SugaredLogger, pg *tsapi.ProxyGroup, pc *tsapi.ProxyClass) {
if pg.Spec.Type == tsapi.ProxyGroupTypeIngress {
@@ -251,7 +430,12 @@ func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, pg *tsapi.Pro
r.ensureAddedToGaugeForProxyGroup(pg)
r.mu.Unlock()
- cfgHash, err := r.ensureConfigSecretsCreated(ctx, pg, proxyClass)
+ ports, err := r.maybeExposeViaNodePort(ctx, proxyClass, pg, logger)
+ if err != nil {
+ return fmt.Errorf("error getting device info: %w", err)
+ }
+
+ cfgHash, err := r.ensureConfigSecretsCreated(ctx, pg, proxyClass, ports)
if err != nil {
return fmt.Errorf("error provisioning config Secrets: %w", err)
}
@@ -326,7 +510,6 @@ func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, pg *tsapi.Pro
}
updateSS := func(s *appsv1.StatefulSet) {
-
// This is a temporary workaround to ensure that egress ProxyGroup proxies with capver older than 110
// are restarted when tailscaled configfile contents have changed.
// This workaround ensures that:
@@ -435,7 +618,8 @@ func (r *ProxyGroupReconciler) maybeCleanup(ctx context.Context, pg *tsapi.Proxy
mo := &metricsOpts{
proxyLabels: pgLabels(pg.Name, nil),
tsNamespace: r.tsNamespace,
- proxyType: "proxygroup"}
+ proxyType: "proxygroup",
+ }
if err := maybeCleanupMetricsResources(ctx, mo, r.Client); err != nil {
return false, fmt.Errorf("error cleaning up metrics resources: %w", err)
}
@@ -463,7 +647,7 @@ func (r *ProxyGroupReconciler) deleteTailnetDevice(ctx context.Context, id tailc
return nil
}
-func (r *ProxyGroupReconciler) ensureConfigSecretsCreated(ctx context.Context, pg *tsapi.ProxyGroup, proxyClass *tsapi.ProxyClass) (hash string, err error) {
+func (r *ProxyGroupReconciler) ensureConfigSecretsCreated(ctx context.Context, pg *tsapi.ProxyGroup, proxyClass *tsapi.ProxyClass, ports map[string]int32) (hash string, err error) {
logger := r.logger(pg.Name)
var configSHA256Sum string
for i := range pgReplicas(pg) {
@@ -497,7 +681,25 @@ func (r *ProxyGroupReconciler) ensureConfigSecretsCreated(ctx context.Context, p
}
}
- configs, err := pgTailscaledConfig(pg, proxyClass, i, authKey, existingCfgSecret)
+ endpoints := []netip.AddrPort{}
+ if proxyClass != nil && proxyClass.Spec.TailnetListenerConfig.Type == nodePortType {
+ replicaName := fmt.Sprintf("%s-%d", pg.Name, i)
+ port, ok := ports[replicaName]
+ if !ok {
+ err := fmt.Errorf("could not find configured NodePort for ProxyGroup replica %q", replicaName)
+ logger.Warn(err.Error())
+ return "", err
+ }
+
+ err := r.findStaticEndpoints(ctx, port, endpoints, proxyClass, logger)
+ if err != nil {
+ err := fmt.Errorf("could not find static endpoints for replica %q: %w", replicaName, err)
+ logger.Warn(err.Error())
+ return "", err
+ }
+ }
+
+ configs, err := pgTailscaledConfig(pg, proxyClass, i, authKey, existingCfgSecret, endpoints)
if err != nil {
return "", fmt.Errorf("error creating tailscaled config: %w", err)
}
@@ -554,6 +756,36 @@ func (r *ProxyGroupReconciler) ensureConfigSecretsCreated(ctx context.Context, p
return configSHA256Sum, nil
}
+func (r *ProxyGroupReconciler) findStaticEndpoints(ctx context.Context, port int32, endpoints []netip.AddrPort, proxyClass *tsapi.ProxyClass, logger *zap.SugaredLogger) error {
+ nodes := new(corev1.NodeList)
+ err := r.List(ctx, nodes, client.MatchingLabels(proxyClass.Spec.TailnetListenerConfig.NodePortConfig.Selector))
+ if err != nil {
+ return fmt.Errorf("failed to list nodes: %w", err)
+ }
+
+ if len(nodes.Items) == 0 {
+ err := fmt.Errorf("failed to match nodes to configured NodeSelectors in TailnetListenerConfig")
+ logger.Warn(err.Error())
+ return err
+ }
+
+ for _, n := range nodes.Items {
+ for _, a := range n.Status.Addresses {
+ if a.Type == corev1.NodeExternalIP {
+ addrPort := fmt.Sprintf("%s:%d", a.Address, port)
+ i, err := netip.ParseAddrPort(addrPort)
+ if err != nil {
+ logger.Debugf("failed to parse external address on node %q: %q", n.Name, addrPort)
+ }
+ logger.Debugf("adding endpoint %q to staticEndpoints config", addrPort)
+ endpoints = append(endpoints, i)
+ }
+ }
+ }
+
+ return nil
+}
+
// ensureAddedToGaugeForProxyGroup ensures the gauge metric for the ProxyGroup resource is updated when the ProxyGroup
// is created. r.mu must be held.
func (r *ProxyGroupReconciler) ensureAddedToGaugeForProxyGroup(pg *tsapi.ProxyGroup) {
@@ -580,7 +812,7 @@ func (r *ProxyGroupReconciler) ensureRemovedFromGaugeForProxyGroup(pg *tsapi.Pro
gaugeIngressProxyGroupResources.Set(int64(r.ingressProxyGroups.Len()))
}
-func pgTailscaledConfig(pg *tsapi.ProxyGroup, class *tsapi.ProxyClass, idx int32, authKey string, oldSecret *corev1.Secret) (tailscaledConfigs, error) {
+func pgTailscaledConfig(pg *tsapi.ProxyGroup, class *tsapi.ProxyClass, idx int32, authKey string, oldSecret *corev1.Secret, staticEndpoints []netip.AddrPort) (tailscaledConfigs, error) {
conf := &ipn.ConfigVAlpha{
Version: "alpha0",
AcceptDNS: "false",
@@ -597,6 +829,10 @@ func pgTailscaledConfig(pg *tsapi.ProxyGroup, class *tsapi.ProxyClass, idx int32
conf.AcceptRoutes = "true"
}
+ if len(staticEndpoints) > 0 {
+ conf.StaticEndpoints = staticEndpoints
+ }
+
deviceAuthed := false
for _, d := range pg.Status.Devices {
if d.Hostname == *conf.Hostname {
diff --git a/cmd/k8s-operator/proxygroup_specs.go b/cmd/k8s-operator/proxygroup_specs.go
index 1d12c39e0..ee53b0e91 100644
--- a/cmd/k8s-operator/proxygroup_specs.go
+++ b/cmd/k8s-operator/proxygroup_specs.go
@@ -144,6 +144,13 @@ func pgStatefulSet(pg *tsapi.ProxyGroup, namespace, image, tsFirewallMode string
},
}
+ if len(proxyClass.Spec.TailnetListenerConfig.NodePortConfig.PortRanges) > 0 {
+ envs = append(envs, corev1.EnvVar{
+ Name: "PORT",
+ Value: strconv.Itoa(directConnProxyPort),
+ })
+ }
+
if tsFirewallMode != "" {
envs = append(envs, corev1.EnvVar{
Name: "TS_DEBUG_FIREWALL_MODE",
diff --git a/k8s-operator/apis/v1alpha1/types_proxyclass.go b/k8s-operator/apis/v1alpha1/types_proxyclass.go
index 3b6bf8376..f1e850a37 100644
--- a/k8s-operator/apis/v1alpha1/types_proxyclass.go
+++ b/k8s-operator/apis/v1alpha1/types_proxyclass.go
@@ -92,6 +92,7 @@ type TailnetListenerConfig struct {
}
type TailnetListenerConfigMode string
+
type NodePort struct {
PortRanges []string `json:"portRanges,omitempty"`
Selector map[string]string `json:"selector,omitempty"`