summary | refs | log | tree | commit | diff | homepage
path: root/cmd/k8s-operator/proxygroup.go
diff options
context:
space:
mode:
Diffstat (limited to 'cmd/k8s-operator/proxygroup.go')
-rw-r--r-- cmd/k8s-operator/proxygroup.go 148
1 files changed, 129 insertions, 19 deletions
diff --git a/cmd/k8s-operator/proxygroup.go b/cmd/k8s-operator/proxygroup.go
index 13c3d7b71..126082643 100644
--- a/cmd/k8s-operator/proxygroup.go
+++ b/cmd/k8s-operator/proxygroup.go
@@ -16,10 +16,12 @@ import (
"sort"
"strings"
"sync"
+ "time"
dockerref "github.com/distribution/reference"
"go.uber.org/zap"
xslices "golang.org/x/exp/slices"
+ "golang.org/x/time/rate"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
@@ -95,10 +97,11 @@ type ProxyGroupReconciler struct {
defaultProxyClass string
loginServer string
- mu sync.Mutex // protects following
- egressProxyGroups set.Slice[types.UID] // for egress proxygroups gauge
- ingressProxyGroups set.Slice[types.UID] // for ingress proxygroups gauge
- apiServerProxyGroups set.Slice[types.UID] // for kube-apiserver proxygroups gauge
+ mu sync.Mutex // protects following
+ egressProxyGroups set.Slice[types.UID] // for egress proxygroups gauge
+ ingressProxyGroups set.Slice[types.UID] // for ingress proxygroups gauge
+ apiServerProxyGroups set.Slice[types.UID] // for kube-apiserver proxygroups gauge
+ authKeyRateLimits map[string]*rate.Limiter // per-ProxyGroup rate limiters for auth key re-issuance.
}
func (r *ProxyGroupReconciler) logger(name string) *zap.SugaredLogger {
@@ -300,7 +303,7 @@ func (r *ProxyGroupReconciler) validate(ctx context.Context, pg *tsapi.ProxyGrou
func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, tailscaleClient tsClient, pg *tsapi.ProxyGroup, proxyClass *tsapi.ProxyClass) (map[string][]netip.AddrPort, *notReadyReason, error) {
logger := r.logger(pg.Name)
r.mu.Lock()
- r.ensureAddedToGaugeForProxyGroup(pg)
+ r.ensureStateAddedForProxyGroup(pg)
r.mu.Unlock()
svcToNodePorts := make(map[string]uint16)
@@ -637,13 +640,13 @@ func (r *ProxyGroupReconciler) cleanupDanglingResources(ctx context.Context, tai
}
for _, m := range metadata {
- if m.ordinal+1 <= int(pgReplicas(pg)) {
+ if m.ordinal+1 <= pgReplicas(pg) {
continue
}
// Dangling resource, delete the config + state Secrets, as well as
// deleting the device from the tailnet.
- if err := r.deleteTailnetDevice(ctx, tailscaleClient, m.tsID, logger); err != nil {
+ if err := r.ensureDeviceDeleted(ctx, tailscaleClient, m.tsID, logger); err != nil {
return err
}
if err := r.Delete(ctx, m.stateSecret); err != nil && !apierrors.IsNotFound(err) {
@@ -695,7 +698,7 @@ func (r *ProxyGroupReconciler) maybeCleanup(ctx context.Context, tailscaleClient
}
for _, m := range metadata {
- if err := r.deleteTailnetDevice(ctx, tailscaleClient, m.tsID, logger); err != nil {
+ if err := r.ensureDeviceDeleted(ctx, tailscaleClient, m.tsID, logger); err != nil {
return false, err
}
}
@@ -711,12 +714,12 @@ func (r *ProxyGroupReconciler) maybeCleanup(ctx context.Context, tailscaleClient
logger.Infof("cleaned up ProxyGroup resources")
r.mu.Lock()
- r.ensureRemovedFromGaugeForProxyGroup(pg)
+ r.ensureStateRemovedForProxyGroup(pg)
r.mu.Unlock()
return true, nil
}
-func (r *ProxyGroupReconciler) deleteTailnetDevice(ctx context.Context, tailscaleClient tsClient, id tailcfg.StableNodeID, logger *zap.SugaredLogger) error {
+func (r *ProxyGroupReconciler) ensureDeviceDeleted(ctx context.Context, tailscaleClient tsClient, id tailcfg.StableNodeID, logger *zap.SugaredLogger) error {
logger.Debugf("deleting device %s from control", string(id))
if err := tailscaleClient.DeleteDevice(ctx, string(id)); err != nil {
errResp := &tailscale.ErrResponse{}
@@ -742,6 +745,7 @@ func (r *ProxyGroupReconciler) ensureConfigSecretsCreated(
logger := r.logger(pg.Name)
endpoints = make(map[string][]netip.AddrPort, pgReplicas(pg)) // keyed by Service name.
for i := range pgReplicas(pg) {
+ logger = logger.With("Pod", fmt.Sprintf("%s-%d", pg.Name, i))
cfgSecret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: pgConfigSecretName(pg.Name, i),
@@ -785,7 +789,7 @@ func (r *ProxyGroupReconciler) ensureConfigSecretsCreated(
return nil, err
}
- if shouldRetainAuthKey(stateSecret) && existingCfgSecret != nil {
+ if deviceAuthed(stateSecret) && existingCfgSecret != nil {
authKey, err = authKeyFromSecret(existingCfgSecret)
if err != nil {
return nil, fmt.Errorf("error retrieving auth key from existing config Secret: %w", err)
@@ -931,6 +935,105 @@ func (r *ProxyGroupReconciler) ensureConfigSecretsCreated(
return endpoints, nil
}
+// getAuthKey inspects the proxy's state and config Secrets and returns the
+// auth key that should be written to the proxy's config. The result is one of:
+//   - a newly created auth key,
+//   - the existing auth key from the config Secret,
+//   - nil if the device is authed.
+//
+// A new auth key is created if the config Secret does not exist yet
+// (existingCfgSecret is nil), or if shouldReissueAuthKey reports that the
+// proxy has requested one via reissue_authkey in its state Secret. Re-issuance
+// is subject to the per-ProxyGroup rate limiter; when the limit is exceeded an
+// error is returned so that the caller retries later.
+//
+// All calls to the Tailscale API go through tailscaleClient. r.mu must NOT be
+// held by the caller: it is taken briefly to look up the rate limiter.
+func (r *ProxyGroupReconciler) getAuthKey(ctx context.Context, tailscaleClient tsClient, pg *tsapi.ProxyGroup, existingCfgSecret *corev1.Secret, ordinal int32, logger *zap.SugaredLogger) (*string, error) {
+	// Get state Secret to check if it's already authed or has requested
+	// a fresh auth key. A missing state Secret is not an error; the object
+	// below is then left without any Data.
+	stateSecret := &corev1.Secret{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      pgStateSecretName(pg.Name, ordinal),
+			Namespace: r.tsNamespace,
+		},
+	}
+	if err := r.Get(ctx, client.ObjectKeyFromObject(stateSecret), stateSecret); err != nil && !apierrors.IsNotFound(err) {
+		return nil, err
+	}
+
+	var createAuthKey bool
+	var cfgAuthKey *string
+	if existingCfgSecret == nil {
+		createAuthKey = true
+	} else {
+		var err error
+		cfgAuthKey, err = authKeyFromSecret(existingCfgSecret)
+		if err != nil {
+			return nil, fmt.Errorf("error retrieving auth key from existing config Secret: %w", err)
+		}
+	}
+
+	if shouldReissueAuthKey(stateSecret, cfgAuthKey) {
+		logger.Infof("Proxy is failing to auth; will attempt to clean up the old device if any found and issue a new auth key")
+
+		// If the state Secret records a device ID, delete that device from
+		// the tailnet before issuing a replacement key.
+		if tsID, ok := stateSecret.Data[kubetypes.KeyDeviceID]; ok && len(tsID) > 0 {
+			id := tailcfg.StableNodeID(tsID)
+			if err := r.ensureDeviceDeleted(ctx, tailscaleClient, id, logger); err != nil {
+				return nil, err
+			}
+		}
+
+		// authKeyRateLimits is protected by r.mu. Only the map lookup needs
+		// the lock; rate.Limiter is safe for concurrent use.
+		r.mu.Lock()
+		lim := r.authKeyRateLimits[pg.Name]
+		r.mu.Unlock()
+		if lim == nil {
+			// Calling Allow on a nil limiter would panic; surface an error
+			// instead so that the reconcile is retried.
+			return nil, fmt.Errorf("no auth key re-issuance rate limiter found for ProxyGroup %q", pg.Name)
+		}
+		if !lim.Allow() {
+			logger.Debugf("auth key re-issuance rate limit exceeded, limit: %.2f, burst: %d, tokens: %.2f", lim.Limit(), lim.Burst(), lim.Tokens())
+			return nil, fmt.Errorf("auth key re-issuance rate limit exceeded for ProxyGroup %q, will retry with backoff", pg.Name)
+		}
+		createAuthKey = true
+	}
+
+	if createAuthKey {
+		logger.Debugf("Creating auth key for ProxyGroup proxy")
+
+		tags := pg.Spec.Tags.Stringify()
+		if len(tags) == 0 {
+			tags = r.defaultTags
+		}
+		// Use the client passed in by the caller, the same one used for
+		// device deletion above, rather than the reconciler-wide client.
+		key, err := newAuthKey(ctx, tailscaleClient, tags)
+		if err != nil {
+			return nil, err
+		}
+		return &key, nil
+	}
+
+	if !deviceAuthed(stateSecret) {
+		// Retain auth key from existing config.
+		return cfgAuthKey, nil
+	}
+
+	// Device is authed: no auth key is needed in the config.
+	return nil, nil
+}
+
+// shouldReissueAuthKey reports whether a new auth key must be issued for a
+// proxy, based on the reissue_authkey entry in the proxy's state Secret. The
+// proxy sets that entry to the auth key with which it failed to auth, or to
+// empty if it didn't have an auth key in its config file.
+//
+// The result is false when the entry is absent. When the entry is present,
+// the result is true if the config holds no auth key (nil or empty), or if it
+// holds exactly the key that was reported as broken. Any other key in the
+// config is taken to be an already reissued one that the proxy has not yet had
+// a chance to auth with, so it is not reissued again.
+func shouldReissueAuthKey(s *corev1.Secret, authKeyInConfig *string) bool {
+	reportedBroken, requested := s.Data[kubetypes.KeyReissueAuthkey]
+	switch {
+	case !requested:
+		// No reissue request recorded in the state Secret.
+		return false
+	case authKeyInConfig == nil || *authKeyInConfig == "":
+		// Reissue requested and there is nothing in the config to retry with.
+		return true
+	default:
+		// Reissue only if the configured key is the one reported as broken.
+		return *authKeyInConfig == string(reportedBroken)
+	}
+}
+
+
// FindStaticEndpointErr wraps a single message string.
// NOTE(review): presumably an error type returned when static endpoints
// cannot be found; its methods are not visible in this excerpt, so confirm.
type FindStaticEndpointErr struct {
msg string // message text carried by this value
}
@@ -1024,9 +1127,9 @@ func getStaticEndpointAddress(a *corev1.NodeAddress, port uint16) *netip.AddrPor
return ptr.To(netip.AddrPortFrom(addr, port))
}
-// ensureAddedToGaugeForProxyGroup ensures the gauge metric for the ProxyGroup resource is updated when the ProxyGroup
-// is created. r.mu must be held.
-func (r *ProxyGroupReconciler) ensureAddedToGaugeForProxyGroup(pg *tsapi.ProxyGroup) {
+// ensureStateAddedForProxyGroup ensures the gauge metric for the ProxyGroup resource is updated when the ProxyGroup
+// is created, and initialises per-ProxyGroup rate limits on re-issuing auth keys. r.mu must be held.
+func (r *ProxyGroupReconciler) ensureStateAddedForProxyGroup(pg *tsapi.ProxyGroup) {
switch pg.Spec.Type {
case tsapi.ProxyGroupTypeEgress:
r.egressProxyGroups.Add(pg.UID)
@@ -1038,11 +1141,17 @@ func (r *ProxyGroupReconciler) ensureAddedToGaugeForProxyGroup(pg *tsapi.ProxyGr
gaugeEgressProxyGroupResources.Set(int64(r.egressProxyGroups.Len()))
gaugeIngressProxyGroupResources.Set(int64(r.ingressProxyGroups.Len()))
gaugeAPIServerProxyGroupResources.Set(int64(r.apiServerProxyGroups.Len()))
+
+ if _, ok := r.authKeyRateLimits[pg.Name]; !ok {
+ // Allow every replica to have its auth key re-issued quickly the first
+ // time, but with an overall limit of 1 every 30s after a burst.
+ r.authKeyRateLimits[pg.Name] = rate.NewLimiter(rate.Every(30*time.Second), int(pgReplicas(pg)))
+ }
}
-// ensureRemovedFromGaugeForProxyGroup ensures the gauge metric for the ProxyGroup resource type is updated when the
-// ProxyGroup is deleted. r.mu must be held.
-func (r *ProxyGroupReconciler) ensureRemovedFromGaugeForProxyGroup(pg *tsapi.ProxyGroup) {
+// ensureStateRemovedForProxyGroup ensures the gauge metric for the ProxyGroup resource type is updated when the
+// ProxyGroup is deleted, and deletes the per-ProxyGroup rate limiter to free memory. r.mu must be held.
+func (r *ProxyGroupReconciler) ensureStateRemovedForProxyGroup(pg *tsapi.ProxyGroup) {
switch pg.Spec.Type {
case tsapi.ProxyGroupTypeEgress:
r.egressProxyGroups.Remove(pg.UID)
@@ -1054,6 +1163,7 @@ func (r *ProxyGroupReconciler) ensureRemovedFromGaugeForProxyGroup(pg *tsapi.Pro
gaugeEgressProxyGroupResources.Set(int64(r.egressProxyGroups.Len()))
gaugeIngressProxyGroupResources.Set(int64(r.ingressProxyGroups.Len()))
gaugeAPIServerProxyGroupResources.Set(int64(r.apiServerProxyGroups.Len()))
+ delete(r.authKeyRateLimits, pg.Name)
}
func pgTailscaledConfig(pg *tsapi.ProxyGroup, pc *tsapi.ProxyClass, idx int32, authKey *string, staticEndpoints []netip.AddrPort, oldAdvertiseServices []string, loginServer string) (tailscaledConfigs, error) {
@@ -1114,7 +1224,7 @@ func (r *ProxyGroupReconciler) getNodeMetadata(ctx context.Context, pg *tsapi.Pr
return nil, fmt.Errorf("failed to list state Secrets: %w", err)
}
for _, secret := range secrets.Items {
- var ordinal int
+ var ordinal int32
if _, err := fmt.Sscanf(secret.Name, pg.Name+"-%d", &ordinal); err != nil {
return nil, fmt.Errorf("unexpected secret %s was labelled as owned by the ProxyGroup %s: %w", secret.Name, pg.Name, err)
}
@@ -1198,7 +1308,7 @@ func (r *ProxyGroupReconciler) getRunningProxies(ctx context.Context, pg *tsapi.
}
type nodeMetadata struct {
- ordinal int
+ ordinal int32
stateSecret *corev1.Secret
podUID string // or empty if the Pod no longer exists.
tsID tailcfg.StableNodeID