diff options
Diffstat (limited to 'cmd/k8s-operator/svc.go')
| -rw-r--r-- | cmd/k8s-operator/svc.go | 248 |
1 files changed, 115 insertions, 133 deletions
diff --git a/cmd/k8s-operator/svc.go b/cmd/k8s-operator/svc.go index 363c1c8e3..295f00184 100644 --- a/cmd/k8s-operator/svc.go +++ b/cmd/k8s-operator/svc.go @@ -7,12 +7,16 @@ package main import ( "context" + "encoding/binary" + "encoding/json" "fmt" + "math/rand/v2" "net/netip" "slices" "strings" "sync" + "github.com/gaissmai/bart" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -20,16 +24,19 @@ import ( "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" + kubeutils "tailscale.com/k8s-operator" tsoperator "tailscale.com/k8s-operator" tsapi "tailscale.com/k8s-operator/apis/v1alpha1" "tailscale.com/net/dns/resolvconffile" "tailscale.com/util/clientmetric" + "tailscale.com/util/mak" "tailscale.com/util/set" ) const ( - resolvConfPath = "/etc/resolv.conf" - defaultClusterDomain = "cluster.local" + resolvConfPath = "/etc/resolv.conf" + defaultClusterDomain = "cluster.local" + serviceDNSNameAnnotation = "tailscale.com/service-dns-name" ) type ServiceReconciler struct { @@ -90,13 +97,6 @@ func (a *ServiceReconciler) Reconcile(ctx context.Context, req reconcile.Request } else if err != nil { return reconcile.Result{}, fmt.Errorf("failed to get svc: %w", err) } - targetIP := tailnetTargetAnnotation(svc) - targetFQDN := svc.Annotations[AnnotationTailnetTargetFQDN] - if !svc.DeletionTimestamp.IsZero() || !a.shouldExpose(svc) && targetIP == "" && targetFQDN == "" { - logger.Debugf("service is being deleted or is (no longer) referring to Tailscale ingress/egress, ensuring any created resources are cleaned up") - return reconcile.Result{}, a.maybeCleanup(ctx, logger, svc) - } - return reconcile.Result{}, a.maybeProvision(ctx, logger, svc) } @@ -150,145 +150,85 @@ func (a *ServiceReconciler) maybeCleanup(ctx context.Context, logger *zap.Sugare // This function adds a finalizer to svc, ensuring that we can handle orderly // deprovisioning later. func (a *ServiceReconciler) maybeProvision(ctx context.Context, logger *zap.SugaredLogger, svc *corev1.Service) error { - // Run for proxy config related validations here as opposed to running - // them earlier. This is to prevent cleanup being blocked on a - // misconfigured proxy param. - if err := a.ssr.validate(); err != nil { - msg := fmt.Sprintf("unable to provision proxy resources: invalid config: %v", err) - a.recorder.Event(svc, corev1.EventTypeWarning, "INVALIDCONFIG", msg) - a.logger.Error(msg) - return nil - } - if violations := validateService(svc); len(violations) > 0 { - msg := fmt.Sprintf("unable to provision proxy resources: invalid Service: %s", strings.Join(violations, ", ")) - a.recorder.Event(svc, corev1.EventTypeWarning, "INVALIDSERVCICE", msg) - a.logger.Error(msg) - return nil - } + // Take a look at the Service + // If it is an ingress Service (expose annotation or load balancer) + // Add a record to the config map - proxyClass := proxyClassForObject(svc) - if proxyClass != "" { - if ready, err := proxyClassIsReady(ctx, proxyClass, a.Client); err != nil { - return fmt.Errorf("error verifying ProxyClass for Service: %w", err) - } else if !ready { - logger.Infof("ProxyClass %s specified for the Service, but is not (yet) Ready, waiting..", proxyClass) - return nil - } - } - - hostname, err := nameForService(svc) - if err != nil { - return err - } - - if !slices.Contains(svc.Finalizers, FinalizerName) { - // This log line is printed exactly once during initial provisioning, - // because once the finalizer is in place this block gets skipped. So, - // this is a nice place to tell the operator that the high level, - // multi-reconcile operation is underway. - logger.Infof("exposing service over tailscale") - svc.Finalizers = append(svc.Finalizers, FinalizerName) - if err := a.Update(ctx, svc); err != nil { - return fmt.Errorf("failed to add finalizer: %w", err) - } - } - crl := childResourceLabels(svc.Name, svc.Namespace, "svc") - var tags []string - if tstr, ok := svc.Annotations[AnnotationTags]; ok { - tags = strings.Split(tstr, ",") - } - - sts := &tailscaleSTSConfig{ - ParentResourceName: svc.Name, - ParentResourceUID: string(svc.UID), - Hostname: hostname, - Tags: tags, - ChildResourceLabels: crl, - ProxyClass: proxyClass, - } - - a.mu.Lock() - if a.shouldExposeClusterIP(svc) { - sts.ClusterTargetIP = svc.Spec.ClusterIP - a.managedIngressProxies.Add(svc.UID) - gaugeIngressProxies.Set(int64(a.managedIngressProxies.Len())) - } else if a.shouldExposeDNSName(svc) { - sts.ClusterTargetDNSName = svc.Spec.ExternalName - a.managedIngressProxies.Add(svc.UID) - gaugeIngressProxies.Set(int64(a.managedIngressProxies.Len())) - } else if ip := tailnetTargetAnnotation(svc); ip != "" { - sts.TailnetTargetIP = ip - a.managedEgressProxies.Add(svc.UID) - gaugeEgressProxies.Set(int64(a.managedEgressProxies.Len())) - } else if fqdn := svc.Annotations[AnnotationTailnetTargetFQDN]; fqdn != "" { - fqdn := svc.Annotations[AnnotationTailnetTargetFQDN] - if !strings.HasSuffix(fqdn, ".") { - fqdn = fqdn + "." - } - sts.TailnetTargetFQDN = fqdn - a.managedEgressProxies.Add(svc.UID) - gaugeEgressProxies.Set(int64(a.managedEgressProxies.Len())) + // This prototype only looks at ingress Services + if !a.shouldExpose(svc) { + return nil } - a.mu.Unlock() - var hsvc *corev1.Service - if hsvc, err = a.ssr.Provision(ctx, logger, sts); err != nil { - return fmt.Errorf("failed to provision: %w", err) + // get clusterconfig + // Exactly one ClusterConfig needs to exist, else we don't proceed. + ccl := &tsapi.ClusterConfigList{} + if err := a.List(ctx, ccl); err != nil { + return fmt.Errorf("error listing ClusterConfigs: %w", err) } - - if sts.TailnetTargetIP != "" || sts.TailnetTargetFQDN != "" { - clusterDomain := retrieveClusterDomain(a.tsNamespace, logger) - headlessSvcName := hsvc.Name + "." + hsvc.Namespace + ".svc." + clusterDomain - if svc.Spec.ExternalName != headlessSvcName || svc.Spec.Type != corev1.ServiceTypeExternalName { - svc.Spec.ExternalName = headlessSvcName - svc.Spec.Selector = nil - svc.Spec.Type = corev1.ServiceTypeExternalName - if err := a.Update(ctx, svc); err != nil { - return fmt.Errorf("failed to update service: %w", err) - } - } + if len(ccl.Items) < 1 { + logger.Info("got %d ClusterConfigs", len(ccl.Items)) return nil } - - if !isTailscaleLoadBalancerService(svc, a.isDefaultLoadBalancer) { - logger.Debugf("service is not a LoadBalancer, so not updating ingress") + if svc.Spec.ClusterIP == "" { + logger.Info("[unexpected] Service has no ClusterIP") return nil } - _, tsHost, tsIPs, err := a.ssr.DeviceInfo(ctx, crl) - if err != nil { - return fmt.Errorf("failed to get device ID: %w", err) + cc := ccl.Items[0] + svcDNSName := a.fqdnsForSvc(svc, cc.Spec.Domain) + logger.Debugf("determined DNS name %s", svcDNSName) + + // Get all ConfigMaps for all proxies + cmList := &corev1.ConfigMapList{} + if err := a.List(ctx, cmList); err != nil { + return fmt.Errorf("error listing proxy ConfigMaps: %w", err) } - if tsHost == "" { - logger.Debugf("no Tailscale hostname known yet, waiting for proxy pod to finish auth") - // No hostname yet. Wait for the proxy pod to auth. - svc.Status.LoadBalancer.Ingress = nil - if err := a.Status().Update(ctx, svc); err != nil { - return fmt.Errorf("failed to update service status: %w", err) + for _, cm := range cmList.Items { + pcB := cm.BinaryData["proxyConfig"] + if len(pcB) == 0 { + a.logger.Info("[unexpected] ConfigMap %s does not contain proxyConfig", cm.Name) + continue + } + pc := &kubeutils.ProxyConfig{} + if err := json.Unmarshal(pcB, pc); err != nil { + return fmt.Errorf("error unmarshalling proxyconfig for proxy %s: %w", cm.Name, err) + } + // does it have the service name already? + if _, ok := pc.Services[svcDNSName]; ok { + logger.Debugf("service %s already configured for proxy %s; do nothing", svcDNSName, cm.Name) + // TODO: check if the record is correct + continue } - return nil - } - logger.Debugf("setting ingress to %q, %s", tsHost, strings.Join(tsIPs, ", ")) - ingress := []corev1.LoadBalancerIngress{ - {Hostname: tsHost}, - } - clusterIPAddr, err := netip.ParseAddr(svc.Spec.ClusterIP) - if err != nil { - return fmt.Errorf("failed to parse cluster IP: %w", err) - } - for _, ip := range tsIPs { - addr, err := netip.ParseAddr(ip) + // pick an IP + ip := unusedIPv4(pc.ServicesCIDRRange, pc.AddrsToDomain) + if pc.AddrsToDomain == nil { + pc.AddrsToDomain = &bart.Table[string]{} + } + pc.AddrsToDomain.Insert(netip.PrefixFrom(ip, ip.BitLen()), svcDNSName) + clusterIP, err := netip.ParseAddr(svc.Spec.ClusterIP) if err != nil { - continue + return fmt.Errorf("error marshalling Service Cluster IP %v: %w", svc.Spec.ClusterIP, err) } - if addr.Is4() == clusterIPAddr.Is4() { // only add addresses of the same family - ingress = append(ingress, corev1.LoadBalancerIngress{IP: ip}) + svcConfig := kubeutils.Service{ + V4ServiceIPs: []netip.Addr{ip}, + FQDN: svcDNSName, + Ingress: &kubeutils.Ingress{ + Type: "tcp", // currently unused + V4Backends: []netip.Addr{clusterIP}, + }, } - } - svc.Status.LoadBalancer.Ingress = ingress - if err := a.Status().Update(ctx, svc); err != nil { - return fmt.Errorf("failed to update service status: %w", err) + logger.Info("assigning Service IP %v to %s", ip, svcDNSName) + mak.Set(&pc.Services, svcDNSName, svcConfig) + pcB, err = json.Marshal(pc) + if err != nil { + return fmt.Errorf("error marshalling ConfigMap for proxy %s: %w", cm.Name, err) + } + mak.Set(&cm.BinaryData, "proxyConfig", pcB) + if err := a.Update(ctx, &cm); err != nil { + return fmt.Errorf("error updating ConfigMap %s: %w", cm.Name, err) + } + logger.Info("ConfigMap %s updated with a record for %s", cm.Name, svcDNSName) } return nil } @@ -320,6 +260,12 @@ func (a *ServiceReconciler) shouldExposeClusterIP(svc *corev1.Service) bool { } return isTailscaleLoadBalancerService(svc, a.isDefaultLoadBalancer) || hasExposeAnnotation(svc) } +func (a *ServiceReconciler) fqdnsForSvc(svc *corev1.Service, clusterDomain string) string { + if annot := svc.Annotations["tailscale.com/svc-name"]; annot != "" { + return annot + "." + clusterDomain + } + return svc.Name + "-" + svc.Namespace + "." + clusterDomain +} func isTailscaleLoadBalancerService(svc *corev1.Service, isDefaultLoadBalancer bool) bool { return svc != nil && @@ -407,3 +353,39 @@ func clusterDomainFromResolverConf(conf *resolvconffile.Config, namespace string logger.Infof("Cluster domain %q extracted from resolver config", probablyClusterDomain) return probablyClusterDomain } + +func unusedIPv4(serviceCIDR netip.Prefix, usedIPs *bart.Table[string]) netip.Addr { + ip := randV4(serviceCIDR) + if usedIPs == nil { + return ip // first IP being assigned + } + for serviceCIDR.Contains(ip) { + if !isIPUsed(ip, usedIPs) { + return ip + } + ip = ip.Next() + } + return netip.Addr{} +} + +func isIPUsed(ip netip.Addr, usedIPs *bart.Table[string]) bool { + _, ok := usedIPs.Get(ip) + return ok +} + +// randV4 returns a random IPv4 address within the given prefix. +func randV4(maskedPfx netip.Prefix) netip.Addr { + bits := 32 - maskedPfx.Bits() + randBits := rand.Uint32N(1 << uint(bits)) + + ip4 := maskedPfx.Addr().As4() + pn := binary.BigEndian.Uint32(ip4[:]) + binary.BigEndian.PutUint32(ip4[:], randBits|pn) + return netip.AddrFrom4(ip4) +} + +// domainForIP returns the domain name assigned to the given IP address and +// whether it was found. +// func domainForIP(ip netip.Addr, serviceRecords ) (string, bool) { +// return ps.addrToDomain.Get(ip) +// } |
