diff options
Diffstat (limited to 'cmd')
| -rw-r--r-- | cmd/eks-nlb/README.md | 63 | ||||
| -rw-r--r-- | cmd/eks-nlb/deploy.yaml | 63 | ||||
| -rw-r--r-- | cmd/eks-nlb/example.yaml | 95 | ||||
| -rw-r--r-- | cmd/eks-nlb/main.go | 68 | ||||
| -rw-r--r-- | cmd/eks-nlb/reconciler.go | 372 |
5 files changed, 661 insertions, 0 deletions
diff --git a/cmd/eks-nlb/README.md b/cmd/eks-nlb/README.md new file mode 100644 index 000000000..3f4eb4907 --- /dev/null +++ b/cmd/eks-nlb/README.md @@ -0,0 +1,63 @@ +eks-nlb can be used to set up routing from an AWS NLB to wireguard port of Tailscale running in a Pod. + +### Pods must: + +- have tailscale.com/enlb-configmap annotation set to a ConfigMap that contains NLB ARN and the ID of the EKS cluster VPC +(see structure in example.yamls) + +- have TS_DEBUG_PRETENDPOINT env var set directly on 'tailscale' container config or provided via ConfigMap + +- have a container named 'tailscale' that runs tailscale + +- have wireguard port set to 41641 + +- have metrics exposed on port 9001 (temporary health check solution) + +## Deploy + +Deploy (in default namespace): + +1. Create a Secret with AWS creds + +```sh +kubectl create secret generic aws-creds --from-literal aws_access_key_id=<AWS_ACCESS_KEY_ID> \ +--from-literal aws_secret_access_key=<AWS_SECRET_ACCESS_KEY> +``` + +2. (Optional) Modify image in ./deploy.yaml + +3. Deploy: + +``` +$ kubectl apply -f ./deploy.yaml +``` + +## Usage example + +See an example manifest in ./example.yaml + +To use: +- deploy the controller +- create an NLB load balancer, set up security groups etc +- create a Secret with tailscale auth key +``` +kubectl create secret generic ts-creds --from-literal=authkey=<ts-auth-key> +``` +- populate 'eks-config' ConfigMap with NLB ARN and the VPC of the EKS cluster + +- poulate 'pretendpoint' ConfigMap with pairs of load balancer external IPs + port + + +For this, eks-nlb will ensure that the single replica is exposed on the port specified in via TS_DEBUG_PRETENDPOINT env var read from 'pretendpoint' ConfigMap on the load balancer whose ARN is passed via tailscale.com/awsnlbarn annotation to the StatefulSet. + +TODO: this flow is inconvenient. We should be able to make eks-nlb dynamically set TS_DEBUG_PRETENDPOINT once we can have tailscaled dynamically reloading its config. + +The controller will: + +- create a target group with the Pod IP routing traffic to 41641 and using 9001 as health check port + +- expose this target on the NLB via the port parsed from TS_DEBUG_PRETENDPOINT + +## Dev + +Build and push images with `REPO="<registry>/eksnlb" TAGS=<tags> make publishdeveksnlb` diff --git a/cmd/eks-nlb/deploy.yaml b/cmd/eks-nlb/deploy.yaml new file mode 100644 index 000000000..cd2fa07ac --- /dev/null +++ b/cmd/eks-nlb/deploy.yaml @@ -0,0 +1,63 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: eks-nlb + namespace: tailscale +spec: + replicas: 1 + selector: + matchLabels: + app: eks-nlb + template: + metadata: + labels: + app: eks-nlb + spec: + serviceAccountName: eks-nlb + containers: + - name: eks-nlb + image: gcr.io/csi-test-290908/eksnlb:v0.0.15 # this image is publicly available + env: + - name: AWS_ACCESS_KEY_ID + valueFrom: + secretKeyRef: + key: aws_access_key_id + name: aws-creds + - name: AWS_SECRET_ACCESS_KEY + valueFrom: + secretKeyRef: + key: aws_secret_access_key + name: aws-creds + - name: AWS_DEFAULT_REGION + value: eu-central-1 +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: eks-nlb +rules: + - apiGroups: [""] + resources: ["configmaps"] + verbs: ["get", "list", "update", "patch", "create", "watch"] + - apiGroups: [""] + resources: ["pods"] + verbs: ["get", "list", "watch"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: eks-nlb +subjects: + - kind: ServiceAccount + name: eks-nlb + namespace: tailscale +roleRef: + kind: ClusterRole + name: eks-nlb + apiGroup: rbac.authorization.k8s.io +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + namespace: tailscale + name: eks-nlb
\ No newline at end of file diff --git a/cmd/eks-nlb/example.yaml b/cmd/eks-nlb/example.yaml new file mode 100644 index 000000000..ff6a880c1 --- /dev/null +++ b/cmd/eks-nlb/example.yaml @@ -0,0 +1,95 @@ +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: tailscale + namespace: tailscale +spec: + replicas: 1 + selector: + matchLabels: + app: tailscale + template: + metadata: + labels: + app: tailscale + annotations: + tailscale.com/eksnlb-configmap: eks-config + spec: + serviceAccountName: tailscale + containers: + - name: tailscale + image: tailscale/tailscale:unstable + env: + - name: TS_AUTHKEY + valueFrom: + secretKeyRef: + name: ts-creds + key: authkey + - name: TS_KUBE_SECRET + value: tailscale-secret + - name: TS_HOSTNAME + value: eks-nlb-test + - name: TS_USERSPACE + value: "false" + - name: TS_TAILSCALED_EXTRA_ARGS + value: "--port=41641 --debug=0.0.0.0:9001" + - name: TS_DEBUG_PRETENDPOINT + valueFrom: + configMapKeyRef: + name: pretendpoint + key: pretendpoint + securityContext: + capabilities: + add: + - NET_ADMIN + resources: + limits: + memory: 64Mi + cpu: 10m +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: tailscale + namespace: tailscale +rules: + - apiGroups: [""] # "" indicates the core API group + resources: ["secrets"] + verbs: ["get", "update", "patch", "create"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: tailscale + namespace: tailscale +subjects: + - kind: ServiceAccount + name: tailscale + namespace: tailscale +roleRef: + kind: Role + name: tailscale + apiGroup: rbac.authorization.k8s.io +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: tailscale + namespace: tailscale +--- +apiVersion: v1 +data: + vpc_id: + lb_arn: +kind: ConfigMap +metadata: + name: eks-config + namespace: tailscale +--- +apiVersion: v1 +data: + pretendpoint: <lb-ip-1>:<port>,<lb-ip-2>:<port> +kind: ConfigMap +metadata: + name: pretendpoint + namespace: tailscale
\ No newline at end of file diff --git a/cmd/eks-nlb/main.go b/cmd/eks-nlb/main.go new file mode 100644 index 000000000..2597ff5ff --- /dev/null +++ b/cmd/eks-nlb/main.go @@ -0,0 +1,68 @@ +package main + +import ( + "github.com/go-logr/zapr" + "go.uber.org/zap/zapcore" + corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/config" + logf "sigs.k8s.io/controller-runtime/pkg/log" + kzap "sigs.k8s.io/controller-runtime/pkg/log/zap" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/manager/signals" + "tailscale.com/version" +) + +// TODO: add an option to configure namespaces to watch +const tsNamespace = "tailscale" + +func main() { + + zlog := kzap.NewRaw(kzap.Level(zapcore.DebugLevel)).Sugar() + logf.SetLogger(zapr.NewLogger(zlog.Desugar())) + + startLog := zlog.Named("startup") + + restConfig := config.GetConfigOrDie() + + nsFilter := cache.ByObject{ + Field: client.InNamespace(tsNamespace).AsSelector(), + } + mgrOpts := manager.Options{ + // TODO (irbekrm): stricter filtering what we watch/cache/call + // reconcilers on. c/r by default starts a watch on any + // resources that we GET via the controller manager's client. + Cache: cache.Options{ + ByObject: map[client.Object]cache.ByObject{ + &corev1.Pod{}: nsFilter, + &corev1.ConfigMap{}: nsFilter, + }, + }, + } + + mgr, err := manager.New(restConfig, mgrOpts) + if err != nil { + startLog.Fatalf("could not create manager: %v", err) + } + + // TODO: cache metadata only as else this will cache all Pods in cluster + // -> high memory consumption. + err = builder. + ControllerManagedBy(mgr). + Named("pods-reconciler"). + For(&corev1.Pod{}). + Complete(&podReconciler{ + logger: zlog.Named("pods-reconciler"), + Client: mgr.GetClient(), + }) + if err != nil { + startLog.Fatalf("could not create pods reconciler: %v", err) + } + + zlog.Infof("Startup complete, operator running, version: %s", version.Long()) + if err := mgr.Start(signals.SetupSignalHandler()); err != nil { + startLog.Fatalf("could not start manager: %v", err) + } +} diff --git a/cmd/eks-nlb/reconciler.go b/cmd/eks-nlb/reconciler.go new file mode 100644 index 000000000..53a2e931a --- /dev/null +++ b/cmd/eks-nlb/reconciler.go @@ -0,0 +1,372 @@ +package main + +import ( + "context" + "errors" + "fmt" + "math/rand" + "net" + "strconv" + "strings" + + "github.com/aws/aws-sdk-go-v2/config" + elb "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2" + elbtypes "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2/types" + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "tailscale.com/types/ptr" +) + +const ( + eksNLBConfigAnnotation = "tailscale.com/eksnlb-configmap" + pretendpointEnvVar = "TS_DEBUG_PRETENDPOINT" + + wireguardPort int32 = 41641 + metricsPort string = "9001" +) + +type podReconciler struct { + client.Client + logger *zap.SugaredLogger +} + +type podConfig struct { + portFromEnv int32 + lbAddrsFromEnv []string + lbARN string + vpcID string + podLabels map[string]string + backendIP string // Pod IP +} + +func (pr *podReconciler) Reconcile(ctx context.Context, req reconcile.Request) (res reconcile.Result, err error) { + logger := pr.logger.With("pod-ns", req.Namespace, "pod-name", req.Name) + logger.Debugf("starting reconcile") + defer logger.Debugf("reconcile finished") + + pod := new(corev1.Pod) + err = pr.Get(ctx, req.NamespacedName, pod) + if apierrors.IsNotFound(err) { + logger.Debugf("Pod not found, assuming it was deleted") + return reconcile.Result{}, nil + } else if err != nil { + return reconcile.Result{}, fmt.Errorf("failed to get Pod: %w", err) + } + + if !pod.DeletionTimestamp.IsZero() { + logger.Debugf("Pod is being deleted; currently doing nothing") + // TODO: clean up load balancer resources + return reconcile.Result{}, nil + } + + if pod.Annotations[eksNLBConfigAnnotation] == "" { + logger.Debugf("Pod does not have %s annotation, do nothing", eksNLBConfigAnnotation) + return res, nil + // TODO: clean up if removed + } + + // TODO: validate Pod config + + // TODO: add a finalizer + + // Parse Pod config + pc, err := pr.parseClusterConfig(ctx, pod) + if err != nil { + return res, fmt.Errorf("error parsing Pod config: %w", err) + } + if pc.backendIP == "" { + logger.Info("[unexpected] Pod does not have an IP address allocated, waiting...") + return res, nil + } + + cfg, err := config.LoadDefaultConfig(ctx) + if err != nil { + return res, fmt.Errorf("unable to load SDK config, %v", err) + } + cl := elb.NewFromConfig(cfg) + + resourceName := fmt.Sprintf("%s-%s", pod.Name, pod.Namespace) + + tgci := elb.CreateTargetGroupInput{ + VpcId: &pc.vpcID, + Name: &resourceName, + HealthCheckEnabled: ptr.To(true), // TODO: internal pointer + HealthCheckPort: ptr.To(metricsPort), + HealthCheckProtocol: "TCP", + // TODO: other health check params + // IpAddressType: "ipv4", // TODO: determine from Pod IP + Port: ptr.To(wireguardPort), + Protocol: "UDP", + TargetType: elbtypes.TargetTypeEnumIp, + } + // CreateTargetGroup is idempotent + tgco, err := cl.CreateTargetGroup(ctx, &tgci) + if err != nil { + return res, fmt.Errorf("error creating target group %q", err) + } + if len(tgco.TargetGroups) == 0 { + logger.Debugf("No target groups found after creation, waiting...") + return res, nil + } + // Loop over and look up matching IP addresses + var tg *elbtypes.TargetGroup + for _, maybeTG := range tgco.TargetGroups { + if strings.EqualFold(*maybeTG.TargetGroupName, resourceName) { + logger.Debugf("found target group %s", resourceName) + tg = &maybeTG + // TODO: verify ports etc + } + } + if tg == nil { + logger.Infof("[unexpected] target group not found") + return res, nil + } + + if tg.TargetGroupArn == nil { + logger.Infof("[unexpected] target group %+#v has no ARN", tg) + return res, nil + } + logger.Debugf("found target group %v", tg.TargetGroupArn) + + // List targets + hi := elb.DescribeTargetHealthInput{TargetGroupArn: tg.TargetGroupArn} + ho, err := cl.DescribeTargetHealth(ctx, &hi) + if err != nil { + return res, fmt.Errorf("error describing target health: %w", err) + } + var targetExists bool + for _, health := range ho.TargetHealthDescriptions { + if health.Target.Id == &pc.backendIP { + logger.Debugf("Target found %#+v", health.Target) + targetExists = true + } else { + // TODO: Deregister the target + logger.Debugf("Found target that should be deregistered: %#+v", health.Target) + } + } + if !targetExists { + logger.Debugf("target for %v does not exist, creating...", pc.backendIP) + target := elb.RegisterTargetsInput{TargetGroupArn: tg.TargetGroupArn, Targets: []elbtypes.TargetDescription{ + {Id: ptr.To(pc.backendIP), Port: ptr.To(wireguardPort)}, + }} + _, err := cl.RegisterTargets(ctx, &target) + if err != nil { + return res, fmt.Errorf("error registering target: %w", err) + } + } + + li := elb.DescribeListenersInput{LoadBalancerArn: &pc.lbARN} + lo, err := cl.DescribeListeners(ctx, &li) + if err != nil { + return res, fmt.Errorf("error listing listeners: %w", err) + } + var lis *elbtypes.Listener + port := pc.portFromEnv + if port != 0 { + for _, l := range lo.Listeners { + if l.Port == &pc.portFromEnv { + logger.Debugf("found existing listener on port %q", pc.portFromEnv) + lis = &l + } + } + } else { + // figure out a free port + searchFreePort := true + for searchFreePort { + suggestPort := int32(rand.Intn(65535)) // 1 - 65335 + found := false + for _, l := range lo.Listeners { + if l.Port == &suggestPort { + found = true + break + } + } + if !found { + port = suggestPort + searchFreePort = false + } + } + if port == 0 { + return res, fmt.Errorf("unable to find a free port to expose on the listener: %w", err) + } + } + for _, maybeLB := range lo.Listeners { + if maybeLB.Port == ptr.To(port) { + logger.Debugf("Found listener for port %v", port) + lis = &maybeLB + break + } + } + + if lis == nil { + logger.Infof("listener for port %v not found, creating", port) + lci := elb.CreateListenerInput{ + LoadBalancerArn: &pc.lbARN, + Port: ptr.To(port), + Protocol: "UDP", + DefaultActions: []elbtypes.Action{ + {TargetGroupArn: tg.TargetGroupArn, Type: elbtypes.ActionTypeEnumForward}, + }, + } + lco, err := cl.CreateListener(ctx, &lci) + if err != nil { + return res, fmt.Errorf("error creating listener: %w", err) + } + logger.Infof("created listener with arn: %v", lco.Listeners[0].ListenerArn) + } + + dli := elb.DescribeLoadBalancersInput{LoadBalancerArns: []string{pc.lbARN}} + dlo, err := cl.DescribeLoadBalancers(ctx, &dli) + if len(dlo.LoadBalancers) != 1 { + return res, fmt.Errorf("expected exactly 1 NLB with ARN %s, got %d", pc.lbARN, len(dlo.LoadBalancers)) + } + lb := dlo.LoadBalancers[0] + addrs := make([]string, 0) + for _, z := range lb.AvailabilityZones { + for _, a := range z.LoadBalancerAddresses { + addrs = append(addrs, *a.IpAddress) // IPv6? + } + } + if err := pr.ensurePretendPointUpToDate(ctx, pod, port, addrs); err != nil { + return res, fmt.Errorf("error ensuring TS_DEBUG_PRETENDPOINT value is up to date: %w", err) + } + return reconcile.Result{}, nil +} + +func (pr *podReconciler) ensurePretendPointUpToDate(ctx context.Context, p *corev1.Pod, port int32, addrs []string) error { + var cont *corev1.Container + for _, c := range p.Spec.Containers { + if c.Name == "tailscale" { + cont = &c + break + } + } + if cont == nil { + return errors.New("pod does not have a 'tailscale' container") + } + + // calculate value + addrPorts := make([]string, 0) + + for _, a := range addrs { + addrPorts = append(addrPorts, strings.Join([]string{a, string(port)}, ",")) + } + pretendpoint := strings.Join(addrPorts, ",") + + for _, envVar := range cont.Env { + if envVar.Name == pretendpointEnvVar { + if envVar.Value != "" { + // TODO: log an error out if this is not up to date + pr.logger.Infof("env var set, do nothing") + return nil + } else if cmConfig := envVar.ValueFrom.ConfigMapKeyRef; cmConfig != nil { + cm := &corev1.ConfigMap{} + n := types.NamespacedName{Name: cmConfig.Name, Namespace: p.Namespace} + err := pr.Get(ctx, n, cm) + if err != nil && !apierrors.IsNotFound(err) { + return fmt.Errorf("error retrieving ConfigMap: %w", err) + } + if apierrors.IsNotFound(err) { + pr.logger.Infof("Creating ConfigMap") + cm := &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: cmConfig.Name}, + Data: map[string]string{cmConfig.Key: pretendpoint}} + return pr.Create(ctx, cm) + + } + if cm.Data[cmConfig.Key] != pretendpoint { + pr.logger.Infof("Updating ConfigMap with wireguard endpoints value: %v", pretendpoint) + cm.Data[cmConfig.Key] = pretendpoint + return pr.Update(ctx, cm) + } + } + } + } + return nil +} + +func (pr *podReconciler) parseClusterConfig(ctx context.Context, p *corev1.Pod) (*podConfig, error) { + + var cont *corev1.Container + for _, c := range p.Spec.Containers { + if c.Name == "tailscale" { + cont = &c + break + } + } + if cont == nil { + return nil, errors.New("pod does not have a 'tailscale' container") + } + var pretendpoint string + for _, envVar := range cont.Env { + if envVar.Name == pretendpointEnvVar { + if envVar.Value != "" { + pretendpoint = envVar.Value + } else if cmConfig := envVar.ValueFrom.ConfigMapKeyRef; cmConfig != nil { + // Get the configmap + // Read the value if exists + cm := &corev1.ConfigMap{} + n := types.NamespacedName{Name: cmConfig.Name, Namespace: p.Namespace} + err := pr.Get(ctx, n, cm) + if apierrors.IsNotFound(err) { + pr.logger.Info("ConfigMap %s does not exist, it will be created") + } else if err != nil { + return nil, fmt.Errorf("error retrieving ConfigMap: %w", err) + } else if cm.Data[cmConfig.Key] != "" { + pretendpoint = cm.Data[cmConfig.Key] + pr.logger.Infof("read wireguard endoints for ConfigMap: %v", pretendpoint) + } + } + break + } + } + if pretendpoint == "" { + return nil, nil + } + addrs := strings.Split(pretendpoint, ",") + + var maybePort string + var lbAddrs []string + for _, a := range addrs { + h, port, err := net.SplitHostPort(a) + if err != nil { + return nil, fmt.Errorf("error splitting host port: %v", err) + } + // if the ports are not the same, there is probably some issue, recreate the listener + if maybePort != "" && maybePort != port { + return nil, nil + } + maybePort = port + lbAddrs = append(lbAddrs, h) + } + port, err := strconv.ParseInt(maybePort, 10, 32) + if err != nil { + return nil, fmt.Errorf("error parsing port %q as int: %w", maybePort, err) + } + + cm := &corev1.ConfigMap{} + if err := pr.Get(ctx, types.NamespacedName{Namespace: p.Namespace, Name: p.Annotations[eksNLBConfigAnnotation]}, cm); err != nil { + return nil, fmt.Errorf("ConfigMap %s not found", eksNLBConfigAnnotation) + } + vpcID := cm.Data["vpc_id"] + if vpcID == "" { + return nil, fmt.Errorf("vpc_id field not set for %s ConfigMap", eksNLBConfigAnnotation) + } + lbARN := cm.Data["lb_arn"] + if lbARN == "" { + return nil, fmt.Errorf("lb_arn not set for %s ConfigMap", eksNLBConfigAnnotation) + } + + return &podConfig{ + portFromEnv: int32(port), + lbAddrsFromEnv: lbAddrs, + vpcID: vpcID, + lbARN: lbARN, + podLabels: p.Labels, + backendIP: p.Status.PodIP, + }, nil +} |
