diff options
Diffstat (limited to 'cmd/k8s-operator/proxygroup.go')
| -rw-r--r-- | cmd/k8s-operator/proxygroup.go | 148 |
1 files changed, 129 insertions, 19 deletions
diff --git a/cmd/k8s-operator/proxygroup.go b/cmd/k8s-operator/proxygroup.go index 13c3d7b71..126082643 100644 --- a/cmd/k8s-operator/proxygroup.go +++ b/cmd/k8s-operator/proxygroup.go @@ -16,10 +16,12 @@ import ( "sort" "strings" "sync" + "time" dockerref "github.com/distribution/reference" "go.uber.org/zap" xslices "golang.org/x/exp/slices" + "golang.org/x/time/rate" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" @@ -95,10 +97,11 @@ type ProxyGroupReconciler struct { defaultProxyClass string loginServer string - mu sync.Mutex // protects following - egressProxyGroups set.Slice[types.UID] // for egress proxygroups gauge - ingressProxyGroups set.Slice[types.UID] // for ingress proxygroups gauge - apiServerProxyGroups set.Slice[types.UID] // for kube-apiserver proxygroups gauge + mu sync.Mutex // protects following + egressProxyGroups set.Slice[types.UID] // for egress proxygroups gauge + ingressProxyGroups set.Slice[types.UID] // for ingress proxygroups gauge + apiServerProxyGroups set.Slice[types.UID] // for kube-apiserver proxygroups gauge + authKeyRateLimits map[string]*rate.Limiter // per-ProxyGroup rate limiters for auth key re-issuance. } func (r *ProxyGroupReconciler) logger(name string) *zap.SugaredLogger { @@ -300,7 +303,7 @@ func (r *ProxyGroupReconciler) validate(ctx context.Context, pg *tsapi.ProxyGrou func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, tailscaleClient tsClient, pg *tsapi.ProxyGroup, proxyClass *tsapi.ProxyClass) (map[string][]netip.AddrPort, *notReadyReason, error) { logger := r.logger(pg.Name) r.mu.Lock() - r.ensureAddedToGaugeForProxyGroup(pg) + r.ensureStateAddedForProxyGroup(pg) r.mu.Unlock() svcToNodePorts := make(map[string]uint16) @@ -637,13 +640,13 @@ func (r *ProxyGroupReconciler) cleanupDanglingResources(ctx context.Context, tai } for _, m := range metadata { - if m.ordinal+1 <= int(pgReplicas(pg)) { + if m.ordinal+1 <= pgReplicas(pg) { continue } // Dangling resource, delete the config + state Secrets, as well as // deleting the device from the tailnet. - if err := r.deleteTailnetDevice(ctx, tailscaleClient, m.tsID, logger); err != nil { + if err := r.ensureDeviceDeleted(ctx, tailscaleClient, m.tsID, logger); err != nil { return err } if err := r.Delete(ctx, m.stateSecret); err != nil && !apierrors.IsNotFound(err) { @@ -695,7 +698,7 @@ func (r *ProxyGroupReconciler) maybeCleanup(ctx context.Context, tailscaleClient } for _, m := range metadata { - if err := r.deleteTailnetDevice(ctx, tailscaleClient, m.tsID, logger); err != nil { + if err := r.ensureDeviceDeleted(ctx, tailscaleClient, m.tsID, logger); err != nil { return false, err } } @@ -711,12 +714,12 @@ func (r *ProxyGroupReconciler) maybeCleanup(ctx context.Context, tailscaleClient logger.Infof("cleaned up ProxyGroup resources") r.mu.Lock() - r.ensureRemovedFromGaugeForProxyGroup(pg) + r.ensureStateRemovedForProxyGroup(pg) r.mu.Unlock() return true, nil } -func (r *ProxyGroupReconciler) deleteTailnetDevice(ctx context.Context, tailscaleClient tsClient, id tailcfg.StableNodeID, logger *zap.SugaredLogger) error { +func (r *ProxyGroupReconciler) ensureDeviceDeleted(ctx context.Context, tailscaleClient tsClient, id tailcfg.StableNodeID, logger *zap.SugaredLogger) error { logger.Debugf("deleting device %s from control", string(id)) if err := tailscaleClient.DeleteDevice(ctx, string(id)); err != nil { errResp := &tailscale.ErrResponse{} @@ -742,6 +745,7 @@ func (r *ProxyGroupReconciler) ensureConfigSecretsCreated( logger := r.logger(pg.Name) endpoints = make(map[string][]netip.AddrPort, pgReplicas(pg)) // keyed by Service name. for i := range pgReplicas(pg) { + logger = logger.With("Pod", fmt.Sprintf("%s-%d", pg.Name, i)) cfgSecret := &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: pgConfigSecretName(pg.Name, i), @@ -785,7 +789,7 @@ func (r *ProxyGroupReconciler) ensureConfigSecretsCreated( return nil, err } - if shouldRetainAuthKey(stateSecret) && existingCfgSecret != nil { + if deviceAuthed(stateSecret) && existingCfgSecret != nil { authKey, err = authKeyFromSecret(existingCfgSecret) if err != nil { return nil, fmt.Errorf("error retrieving auth key from existing config Secret: %w", err) @@ -931,6 +935,105 @@ func (r *ProxyGroupReconciler) ensureConfigSecretsCreated( return endpoints, nil } +// getAuthKey looks at the proxy's state and config Secrets, and may return: +// * a newly created auth key, +// * an existing auth key from the config Secret, +// * or nil if the device is authed. +// +// It will create a new auth key if the config Secret is not yet created, +// or if the proxy has set reissue_authkey in its state Secret. +func (r *ProxyGroupReconciler) getAuthKey(ctx context.Context, tailscaleClient tsClient, pg *tsapi.ProxyGroup, existingCfgSecret *corev1.Secret, ordinal int32, logger *zap.SugaredLogger) (*string, error) { + // Get state Secret to check if it's already authed or has requested + // a fresh auth key. + stateSecret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: pgStateSecretName(pg.Name, ordinal), + Namespace: r.tsNamespace, + }, + } + if err := r.Get(ctx, client.ObjectKeyFromObject(stateSecret), stateSecret); err != nil && !apierrors.IsNotFound(err) { + return nil, err + } + + var createAuthKey bool + var cfgAuthKey *string + if existingCfgSecret == nil { + createAuthKey = true + } else { + var err error + cfgAuthKey, err = authKeyFromSecret(existingCfgSecret) + if err != nil { + return nil, fmt.Errorf("error retrieving auth key from existing config Secret: %w", err) + } + } + + if shouldReissueAuthKey(stateSecret, cfgAuthKey) { + logger.Infof("Proxy is failing to auth; will attempt to clean up the old device if any found and issue a new auth key") + + // If the device is already authed, we want to delete it from the tailnet. + if tsID, ok := stateSecret.Data[kubetypes.KeyDeviceID]; ok && len(tsID) > 0 { + id := tailcfg.StableNodeID(tsID) + if err := r.ensureDeviceDeleted(ctx, tailscaleClient, id, logger); err != nil { + return nil, err + } + } + + if lim := r.authKeyRateLimits[pg.Name]; lim.Allow() { + createAuthKey = true + } else { + logger.Debugf("auth key re-issuance rate limit exceeded, limit: %.2f, burst: %d, tokens: %.2f", lim.Limit(), lim.Burst(), lim.Tokens()) + return nil, fmt.Errorf("auth key re-issuance rate limit exceeded for ProxyGroup %q, will retry with backoff", pg.Name) + } + } + + var authKey *string + if createAuthKey { + logger.Debugf("Creating auth key for ProxyGroup proxy") + + tags := pg.Spec.Tags.Stringify() + if len(tags) == 0 { + tags = r.defaultTags + } + key, err := newAuthKey(ctx, r.tsClient, tags) + if err != nil { + return nil, err + } + authKey = &key + } else if !deviceAuthed(stateSecret) { + // Retain auth key from existing config. + authKey = cfgAuthKey + } + + return authKey, nil +} + +// shouldReissueAuthKey extracts the value of reissue_authkey from the proxy's +// state Secret, and returns true if a new auth key is needed. The proxy will +// set the value of reissue_authkey to the auth key with which the it failed to +// auth, or empty if it didn't have an auth key in its config file. +func shouldReissueAuthKey(s *corev1.Secret, authKeyInConfig *string) bool { + // If the key exists but the value is empty, that means a previous reissue + // request got cleared. + brokenAuthkey, reissueRequested := s.Data[kubetypes.KeyReissueAuthkey] + if !reissueRequested { + return false + } + + // Reissue requested and no auth key in config, definitely reissue. + if authKeyInConfig == nil || *authKeyInConfig == "" { + return true + } + + // The auth key we were going to use is already reported broken, reissue. + if *authKeyInConfig == string(brokenAuthkey) { + return true + } + + // Make sure we don't reissue again if we happened to reconcile again before + // the proxy got a chance to auth with a reissued auth key. + return false +} + type FindStaticEndpointErr struct { msg string } @@ -1024,9 +1127,9 @@ func getStaticEndpointAddress(a *corev1.NodeAddress, port uint16) *netip.AddrPor return ptr.To(netip.AddrPortFrom(addr, port)) } -// ensureAddedToGaugeForProxyGroup ensures the gauge metric for the ProxyGroup resource is updated when the ProxyGroup -// is created. r.mu must be held. -func (r *ProxyGroupReconciler) ensureAddedToGaugeForProxyGroup(pg *tsapi.ProxyGroup) { +// ensureStateAddedForProxyGroup ensures the gauge metric for the ProxyGroup resource is updated when the ProxyGroup +// is created, and initialises per-ProxyGroup rate limits on re-issuing auth keys. r.mu must be held. +func (r *ProxyGroupReconciler) ensureStateAddedForProxyGroup(pg *tsapi.ProxyGroup) { switch pg.Spec.Type { case tsapi.ProxyGroupTypeEgress: r.egressProxyGroups.Add(pg.UID) @@ -1038,11 +1141,17 @@ func (r *ProxyGroupReconciler) ensureAddedToGaugeForProxyGroup(pg *tsapi.ProxyGr gaugeEgressProxyGroupResources.Set(int64(r.egressProxyGroups.Len())) gaugeIngressProxyGroupResources.Set(int64(r.ingressProxyGroups.Len())) gaugeAPIServerProxyGroupResources.Set(int64(r.apiServerProxyGroups.Len())) + + if _, ok := r.authKeyRateLimits[pg.Name]; !ok { + // Allow every replica to have its auth key re-issued quickly the first + // time, but with an overall limit of 1 every 30s after a burst. + r.authKeyRateLimits[pg.Name] = rate.NewLimiter(rate.Every(30*time.Second), int(pgReplicas(pg))) + } } -// ensureRemovedFromGaugeForProxyGroup ensures the gauge metric for the ProxyGroup resource type is updated when the -// ProxyGroup is deleted. r.mu must be held. -func (r *ProxyGroupReconciler) ensureRemovedFromGaugeForProxyGroup(pg *tsapi.ProxyGroup) { +// ensureStateRemovedForProxyGroup ensures the gauge metric for the ProxyGroup resource type is updated when the +// ProxyGroup is deleted, and deletes the per-ProxyGroup rate limiter to free memory. r.mu must be held. +func (r *ProxyGroupReconciler) ensureStateRemovedForProxyGroup(pg *tsapi.ProxyGroup) { switch pg.Spec.Type { case tsapi.ProxyGroupTypeEgress: r.egressProxyGroups.Remove(pg.UID) @@ -1054,6 +1163,7 @@ func (r *ProxyGroupReconciler) ensureRemovedFromGaugeForProxyGroup(pg *tsapi.Pro gaugeEgressProxyGroupResources.Set(int64(r.egressProxyGroups.Len())) gaugeIngressProxyGroupResources.Set(int64(r.ingressProxyGroups.Len())) gaugeAPIServerProxyGroupResources.Set(int64(r.apiServerProxyGroups.Len())) + delete(r.authKeyRateLimits, pg.Name) } func pgTailscaledConfig(pg *tsapi.ProxyGroup, pc *tsapi.ProxyClass, idx int32, authKey *string, staticEndpoints []netip.AddrPort, oldAdvertiseServices []string, loginServer string) (tailscaledConfigs, error) { @@ -1114,7 +1224,7 @@ func (r *ProxyGroupReconciler) getNodeMetadata(ctx context.Context, pg *tsapi.Pr return nil, fmt.Errorf("failed to list state Secrets: %w", err) } for _, secret := range secrets.Items { - var ordinal int + var ordinal int32 if _, err := fmt.Sscanf(secret.Name, pg.Name+"-%d", &ordinal); err != nil { return nil, fmt.Errorf("unexpected secret %s was labelled as owned by the ProxyGroup %s: %w", secret.Name, pg.Name, err) } @@ -1198,7 +1308,7 @@ func (r *ProxyGroupReconciler) getRunningProxies(ctx context.Context, pg *tsapi. } type nodeMetadata struct { - ordinal int + ordinal int32 stateSecret *corev1.Secret podUID string // or empty if the Pod no longer exists. tsID tailcfg.StableNodeID |
