summaryrefslogtreecommitdiffhomepage
path: root/cmd/k8s-operator/svc.go
diff options
context:
space:
mode:
Diffstat (limited to 'cmd/k8s-operator/svc.go')
-rw-r--r--cmd/k8s-operator/svc.go248
1 files changed, 115 insertions, 133 deletions
diff --git a/cmd/k8s-operator/svc.go b/cmd/k8s-operator/svc.go
index 363c1c8e3..295f00184 100644
--- a/cmd/k8s-operator/svc.go
+++ b/cmd/k8s-operator/svc.go
@@ -7,12 +7,16 @@ package main
import (
"context"
+ "encoding/binary"
+ "encoding/json"
"fmt"
+ "math/rand/v2"
"net/netip"
"slices"
"strings"
"sync"
+ "github.com/gaissmai/bart"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -20,16 +24,19 @@ import (
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
+ kubeutils "tailscale.com/k8s-operator"
tsoperator "tailscale.com/k8s-operator"
tsapi "tailscale.com/k8s-operator/apis/v1alpha1"
"tailscale.com/net/dns/resolvconffile"
"tailscale.com/util/clientmetric"
+ "tailscale.com/util/mak"
"tailscale.com/util/set"
)
const (
- resolvConfPath = "/etc/resolv.conf"
- defaultClusterDomain = "cluster.local"
+ resolvConfPath = "/etc/resolv.conf"
+ defaultClusterDomain = "cluster.local"
+ serviceDNSNameAnnotation = "tailscale.com/service-dns-name"
)
type ServiceReconciler struct {
@@ -90,13 +97,6 @@ func (a *ServiceReconciler) Reconcile(ctx context.Context, req reconcile.Request
} else if err != nil {
return reconcile.Result{}, fmt.Errorf("failed to get svc: %w", err)
}
- targetIP := tailnetTargetAnnotation(svc)
- targetFQDN := svc.Annotations[AnnotationTailnetTargetFQDN]
- if !svc.DeletionTimestamp.IsZero() || !a.shouldExpose(svc) && targetIP == "" && targetFQDN == "" {
- logger.Debugf("service is being deleted or is (no longer) referring to Tailscale ingress/egress, ensuring any created resources are cleaned up")
- return reconcile.Result{}, a.maybeCleanup(ctx, logger, svc)
- }
-
return reconcile.Result{}, a.maybeProvision(ctx, logger, svc)
}
@@ -150,145 +150,85 @@ func (a *ServiceReconciler) maybeCleanup(ctx context.Context, logger *zap.Sugare
// This function adds a finalizer to svc, ensuring that we can handle orderly
// deprovisioning later.
func (a *ServiceReconciler) maybeProvision(ctx context.Context, logger *zap.SugaredLogger, svc *corev1.Service) error {
- // Run for proxy config related validations here as opposed to running
- // them earlier. This is to prevent cleanup being blocked on a
- // misconfigured proxy param.
- if err := a.ssr.validate(); err != nil {
- msg := fmt.Sprintf("unable to provision proxy resources: invalid config: %v", err)
- a.recorder.Event(svc, corev1.EventTypeWarning, "INVALIDCONFIG", msg)
- a.logger.Error(msg)
- return nil
- }
- if violations := validateService(svc); len(violations) > 0 {
- msg := fmt.Sprintf("unable to provision proxy resources: invalid Service: %s", strings.Join(violations, ", "))
- a.recorder.Event(svc, corev1.EventTypeWarning, "INVALIDSERVCICE", msg)
- a.logger.Error(msg)
- return nil
- }
+ // Take a look at the Service
+ // If it is an ingress Service (expose annotation or load balancer)
+ // Add a record to the config map
- proxyClass := proxyClassForObject(svc)
- if proxyClass != "" {
- if ready, err := proxyClassIsReady(ctx, proxyClass, a.Client); err != nil {
- return fmt.Errorf("error verifying ProxyClass for Service: %w", err)
- } else if !ready {
- logger.Infof("ProxyClass %s specified for the Service, but is not (yet) Ready, waiting..", proxyClass)
- return nil
- }
- }
-
- hostname, err := nameForService(svc)
- if err != nil {
- return err
- }
-
- if !slices.Contains(svc.Finalizers, FinalizerName) {
- // This log line is printed exactly once during initial provisioning,
- // because once the finalizer is in place this block gets skipped. So,
- // this is a nice place to tell the operator that the high level,
- // multi-reconcile operation is underway.
- logger.Infof("exposing service over tailscale")
- svc.Finalizers = append(svc.Finalizers, FinalizerName)
- if err := a.Update(ctx, svc); err != nil {
- return fmt.Errorf("failed to add finalizer: %w", err)
- }
- }
- crl := childResourceLabels(svc.Name, svc.Namespace, "svc")
- var tags []string
- if tstr, ok := svc.Annotations[AnnotationTags]; ok {
- tags = strings.Split(tstr, ",")
- }
-
- sts := &tailscaleSTSConfig{
- ParentResourceName: svc.Name,
- ParentResourceUID: string(svc.UID),
- Hostname: hostname,
- Tags: tags,
- ChildResourceLabels: crl,
- ProxyClass: proxyClass,
- }
-
- a.mu.Lock()
- if a.shouldExposeClusterIP(svc) {
- sts.ClusterTargetIP = svc.Spec.ClusterIP
- a.managedIngressProxies.Add(svc.UID)
- gaugeIngressProxies.Set(int64(a.managedIngressProxies.Len()))
- } else if a.shouldExposeDNSName(svc) {
- sts.ClusterTargetDNSName = svc.Spec.ExternalName
- a.managedIngressProxies.Add(svc.UID)
- gaugeIngressProxies.Set(int64(a.managedIngressProxies.Len()))
- } else if ip := tailnetTargetAnnotation(svc); ip != "" {
- sts.TailnetTargetIP = ip
- a.managedEgressProxies.Add(svc.UID)
- gaugeEgressProxies.Set(int64(a.managedEgressProxies.Len()))
- } else if fqdn := svc.Annotations[AnnotationTailnetTargetFQDN]; fqdn != "" {
- fqdn := svc.Annotations[AnnotationTailnetTargetFQDN]
- if !strings.HasSuffix(fqdn, ".") {
- fqdn = fqdn + "."
- }
- sts.TailnetTargetFQDN = fqdn
- a.managedEgressProxies.Add(svc.UID)
- gaugeEgressProxies.Set(int64(a.managedEgressProxies.Len()))
+ // This prototype only looks at ingress Services
+ if !a.shouldExpose(svc) {
+ return nil
}
- a.mu.Unlock()
- var hsvc *corev1.Service
- if hsvc, err = a.ssr.Provision(ctx, logger, sts); err != nil {
- return fmt.Errorf("failed to provision: %w", err)
+ // get clusterconfig
+ // Exactly one ClusterConfig needs to exist, else we don't proceed.
+ ccl := &tsapi.ClusterConfigList{}
+ if err := a.List(ctx, ccl); err != nil {
+ return fmt.Errorf("error listing ClusterConfigs: %w", err)
}
-
- if sts.TailnetTargetIP != "" || sts.TailnetTargetFQDN != "" {
- clusterDomain := retrieveClusterDomain(a.tsNamespace, logger)
- headlessSvcName := hsvc.Name + "." + hsvc.Namespace + ".svc." + clusterDomain
- if svc.Spec.ExternalName != headlessSvcName || svc.Spec.Type != corev1.ServiceTypeExternalName {
- svc.Spec.ExternalName = headlessSvcName
- svc.Spec.Selector = nil
- svc.Spec.Type = corev1.ServiceTypeExternalName
- if err := a.Update(ctx, svc); err != nil {
- return fmt.Errorf("failed to update service: %w", err)
- }
- }
+ if len(ccl.Items) < 1 {
+ logger.Info("got %d ClusterConfigs", len(ccl.Items))
return nil
}
-
- if !isTailscaleLoadBalancerService(svc, a.isDefaultLoadBalancer) {
- logger.Debugf("service is not a LoadBalancer, so not updating ingress")
+ if svc.Spec.ClusterIP == "" {
+ logger.Info("[unexpected] Service has no ClusterIP")
return nil
}
- _, tsHost, tsIPs, err := a.ssr.DeviceInfo(ctx, crl)
- if err != nil {
- return fmt.Errorf("failed to get device ID: %w", err)
+ cc := ccl.Items[0]
+ svcDNSName := a.fqdnsForSvc(svc, cc.Spec.Domain)
+ logger.Debugf("determined DNS name %s", svcDNSName)
+
+ // Get all ConfigMaps for all proxies
+ cmList := &corev1.ConfigMapList{}
+ if err := a.List(ctx, cmList); err != nil {
+ return fmt.Errorf("error listing proxy ConfigMaps: %w", err)
}
- if tsHost == "" {
- logger.Debugf("no Tailscale hostname known yet, waiting for proxy pod to finish auth")
- // No hostname yet. Wait for the proxy pod to auth.
- svc.Status.LoadBalancer.Ingress = nil
- if err := a.Status().Update(ctx, svc); err != nil {
- return fmt.Errorf("failed to update service status: %w", err)
+ for _, cm := range cmList.Items {
+ pcB := cm.BinaryData["proxyConfig"]
+ if len(pcB) == 0 {
+ a.logger.Info("[unexpected] ConfigMap %s does not contain proxyConfig", cm.Name)
+ continue
+ }
+ pc := &kubeutils.ProxyConfig{}
+ if err := json.Unmarshal(pcB, pc); err != nil {
+ return fmt.Errorf("error unmarshalling proxyconfig for proxy %s: %w", cm.Name, err)
+ }
+ // does it have the service name already?
+ if _, ok := pc.Services[svcDNSName]; ok {
+ logger.Debugf("service %s already configured for proxy %s; do nothing", svcDNSName, cm.Name)
+ // TODO: check if the record is correct
+ continue
}
- return nil
- }
- logger.Debugf("setting ingress to %q, %s", tsHost, strings.Join(tsIPs, ", "))
- ingress := []corev1.LoadBalancerIngress{
- {Hostname: tsHost},
- }
- clusterIPAddr, err := netip.ParseAddr(svc.Spec.ClusterIP)
- if err != nil {
- return fmt.Errorf("failed to parse cluster IP: %w", err)
- }
- for _, ip := range tsIPs {
- addr, err := netip.ParseAddr(ip)
+ // pick an IP
+ ip := unusedIPv4(pc.ServicesCIDRRange, pc.AddrsToDomain)
+ if pc.AddrsToDomain == nil {
+ pc.AddrsToDomain = &bart.Table[string]{}
+ }
+ pc.AddrsToDomain.Insert(netip.PrefixFrom(ip, ip.BitLen()), svcDNSName)
+ clusterIP, err := netip.ParseAddr(svc.Spec.ClusterIP)
if err != nil {
- continue
+ return fmt.Errorf("error marshalling Service Cluster IP %v: %w", svc.Spec.ClusterIP, err)
}
- if addr.Is4() == clusterIPAddr.Is4() { // only add addresses of the same family
- ingress = append(ingress, corev1.LoadBalancerIngress{IP: ip})
+ svcConfig := kubeutils.Service{
+ V4ServiceIPs: []netip.Addr{ip},
+ FQDN: svcDNSName,
+ Ingress: &kubeutils.Ingress{
+ Type: "tcp", // currently unused
+ V4Backends: []netip.Addr{clusterIP},
+ },
}
- }
- svc.Status.LoadBalancer.Ingress = ingress
- if err := a.Status().Update(ctx, svc); err != nil {
- return fmt.Errorf("failed to update service status: %w", err)
+ logger.Info("assigning Service IP %v to %s", ip, svcDNSName)
+ mak.Set(&pc.Services, svcDNSName, svcConfig)
+ pcB, err = json.Marshal(pc)
+ if err != nil {
+ return fmt.Errorf("error marshalling ConfigMap for proxy %s: %w", cm.Name, err)
+ }
+ mak.Set(&cm.BinaryData, "proxyConfig", pcB)
+ if err := a.Update(ctx, &cm); err != nil {
+ return fmt.Errorf("error updating ConfigMap %s: %w", cm.Name, err)
+ }
+ logger.Info("ConfigMap %s updated with a record for %s", cm.Name, svcDNSName)
}
return nil
}
@@ -320,6 +260,12 @@ func (a *ServiceReconciler) shouldExposeClusterIP(svc *corev1.Service) bool {
}
return isTailscaleLoadBalancerService(svc, a.isDefaultLoadBalancer) || hasExposeAnnotation(svc)
}
+func (a *ServiceReconciler) fqdnsForSvc(svc *corev1.Service, clusterDomain string) string {
+ if annot := svc.Annotations["tailscale.com/svc-name"]; annot != "" {
+ return annot + "." + clusterDomain
+ }
+ return svc.Name + "-" + svc.Namespace + "." + clusterDomain
+}
func isTailscaleLoadBalancerService(svc *corev1.Service, isDefaultLoadBalancer bool) bool {
return svc != nil &&
@@ -407,3 +353,39 @@ func clusterDomainFromResolverConf(conf *resolvconffile.Config, namespace string
logger.Infof("Cluster domain %q extracted from resolver config", probablyClusterDomain)
return probablyClusterDomain
}
+
+func unusedIPv4(serviceCIDR netip.Prefix, usedIPs *bart.Table[string]) netip.Addr {
+ ip := randV4(serviceCIDR)
+ if usedIPs == nil {
+ return ip // first IP being assigned
+ }
+ for serviceCIDR.Contains(ip) {
+ if !isIPUsed(ip, usedIPs) {
+ return ip
+ }
+ ip = ip.Next()
+ }
+ return netip.Addr{}
+}
+
+func isIPUsed(ip netip.Addr, usedIPs *bart.Table[string]) bool {
+ _, ok := usedIPs.Get(ip)
+ return ok
+}
+
+// randV4 returns a random IPv4 address within the given prefix.
+func randV4(maskedPfx netip.Prefix) netip.Addr {
+ bits := 32 - maskedPfx.Bits()
+ randBits := rand.Uint32N(1 << uint(bits))
+
+ ip4 := maskedPfx.Addr().As4()
+ pn := binary.BigEndian.Uint32(ip4[:])
+ binary.BigEndian.PutUint32(ip4[:], randBits|pn)
+ return netip.AddrFrom4(ip4)
+}
+
+// domainForIP returns the domain name assigned to the given IP address and
+// whether it was found.
+// func domainForIP(ip netip.Addr, serviceRecords ) (string, bool) {
+// return ps.addrToDomain.Get(ip)
+// }