diff options
Diffstat (limited to 'cmd/k8s-nameserver/main.go')
| -rw-r--r-- | cmd/k8s-nameserver/main.go | 427 |
1 files changed, 219 insertions, 208 deletions
diff --git a/cmd/k8s-nameserver/main.go b/cmd/k8s-nameserver/main.go index 53c0fee39..df062aec6 100644 --- a/cmd/k8s-nameserver/main.go +++ b/cmd/k8s-nameserver/main.go @@ -13,28 +13,30 @@ import ( "encoding/json" "fmt" "log" + "math/rand" "net" + "net/netip" "os" - "os/signal" + "path" "path/filepath" + "strings" "sync" - "syscall" + "time" "github.com/fsnotify/fsnotify" - "github.com/miekg/dns" + "golang.org/x/net/dns/dnsmessage" + "k8s.io/utils/pointer" + "tailscale.com/ipn/store/kubestore" operatorutils "tailscale.com/k8s-operator" + "tailscale.com/tsnet" + "tailscale.com/types/nettype" "tailscale.com/util/dnsname" ) const ( - // tsNetDomain is the domain that this DNS nameserver has registered a handler for. - tsNetDomain = "ts.net" // addr is the the address that the UDP and TCP listeners will listen on. - addr = ":1053" + addr = ":53" - // The following constants are specific to the nameserver configuration - // provided by a mounted Kubernetes Configmap. The Configmap mounted at - // /config is the only supported way for configuring this nameserver. defaultDNSConfigDir = "/config" kubeletMountedConfigLn = "..data" ) @@ -55,122 +57,187 @@ type nameserver struct { // configuration has changed and the nameserver should update the // in-memory records. configWatcher <-chan string + proxies []string - mu sync.Mutex // protects following - // ip4 are the in-memory hostname -> IP4 mappings that the nameserver - // uses to respond to A record queries. - ip4 map[dnsname.FQDN][]net.IP + mu sync.Mutex // protects following + serviceIPs map[dnsname.FQDN][]netip.Addr } func main() { ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - // Ensure that we watch the kube Configmap mounted at /config for - // nameserver configuration updates and send events when updates happen. - c := ensureWatcherForKubeConfigMap(ctx) + // state always in 'dnsrecords' Secret + kubeStateStore, err := kubestore.New(log.Printf, *pointer.StringPtr("nameserver-state")) + if err != nil { + log.Fatalf("error starting kube state store: %v", err) + } + ts := tsnet.Server{ + Logf: log.Printf, + Hostname: "dns-server", + Dir: "/tmp", + Store: kubeStateStore, + } + if _, err := ts.Up(ctx); err != nil { + log.Fatalf("ts.Up: %v", err) + } + defer ts.Close() + + // hardcoded for this prototype + proxies := []string{"proxies-0", "proxies-1", "proxies-2", "proxies-3"} + c := ensureWatcherForServiceConfigMaps(ctx, proxies) ns := &nameserver{ configReader: configMapConfigReader, configWatcher: c, + proxies: proxies, } - // Ensure that in-memory records get set up to date now and will get - // reset when the configuration changes. - ns.runRecordsReconciler(ctx) + ns.runServiceRecordsReconciler(ctx) - // Register a DNS server handle for ts.net domain names. Not having a - // handle registered for any other domain names is how we enforce that - // this nameserver can only be used for ts.net domains - querying any - // other domain names returns Rcode Refused. - dns.HandleFunc(tsNetDomain, ns.handleFunc()) + var wg sync.WaitGroup + + udpListener, err := ts.Listen("udp", addr) + if err != nil { + log.Fatalf("failed listening on udp port :53") + } + defer udpListener.Close() + wg.Add(1) + go func() { + ns.serveDNS(udpListener) + }() + log.Printf("Listening for DNS on UDP %s", udpListener.Addr()) - // Listen for DNS queries over UDP and TCP. - udpSig := make(chan os.Signal) - tcpSig := make(chan os.Signal) - go listenAndServe("udp", addr, udpSig) - go listenAndServe("tcp", addr, tcpSig) - sig := make(chan os.Signal, 1) - signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) - s := <-sig - log.Printf("OS signal (%s) received, shutting down", s) - cancel() // exit the records reconciler and configmap watcher goroutines - udpSig <- s // stop the UDP listener - tcpSig <- s // stop the TCP listener + tcpListener, err := ts.Listen("tcp", addr) + if err != nil { + log.Fatalf("failed listening on tcp port :53") + } + defer tcpListener.Close() + wg.Add(1) + go func() { + ns.serveDNS(tcpListener) + }() + log.Printf("Listening for DNS on TCP %s", tcpListener.Addr()) + wg.Wait() } -// handleFunc is a DNS query handler that can respond to A record queries from -// the nameserver's in-memory records. -// - If an A record query is received and the -// nameserver's in-memory records contain records for the queried domain name, -// return a success response. -// - If an A record query is received, but the -// nameserver's in-memory records do not contain records for the queried domain name, -// return NXDOMAIN. -// - If an A record query is received, but the queried domain name is not valid, return Format Error. -// - If a query is received for any other record type than A, return Not Implemented. -func (n *nameserver) handleFunc() func(w dns.ResponseWriter, r *dns.Msg) { - h := func(w dns.ResponseWriter, r *dns.Msg) { - m := new(dns.Msg) - defer func() { - w.WriteMsg(m) - }() - if len(r.Question) < 1 { - log.Print("[unexpected] nameserver received a request with no questions") - m = r.SetRcodeFormatError(r) +func (c *nameserver) serveDNS(ln net.Listener) { + for { + conn, err := ln.Accept() + if err != nil { + log.Printf("serveDNS accept: %v", err) return } - // TODO (irbekrm): maybe set message compression - switch r.Question[0].Qtype { - case dns.TypeA: - q := r.Question[0].Name - fqdn, err := dnsname.ToFQDN(q) - if err != nil { - m = r.SetRcodeFormatError(r) - return - } - // The only supported use of this nameserver is as a - // single source of truth for MagicDNS names by - // non-tailnet Kubernetes workloads. - m.Authoritative = true - m.RecursionAvailable = false + go c.handleServiceName(conn.(nettype.ConnPacketConn)) + } +} - ips := n.lookupIP4(fqdn) - if ips == nil || len(ips) == 0 { - // As we are the authoritative nameserver for MagicDNS - // names, if we do not have a record for this MagicDNS - // name, it does not exist. - m = m.SetRcode(r, dns.RcodeNameError) - return - } - // TODO (irbekrm): TTL is currently set to 0, meaning - // that cluster workloads will not cache the DNS - // records. Revisit this in future when we understand - // the usage patterns better- is it putting too much - // load on kube DNS server or is this fine? - for _, ip := range ips { - rr := &dns.A{Hdr: dns.RR_Header{Name: q, Rrtype: dns.TypeA, Class: dns.ClassINET, Ttl: 0}, A: ip} - m.SetRcode(r, dns.RcodeSuccess) - m.Answer = append(m.Answer, rr) - } - case dns.TypeAAAA: - // TODO (irbekrm): implement IPv6 support. - // Kubernetes distributions that I am most familiar with - // default to IPv4 for Pod CIDR ranges and often many cases don't - // support IPv6 at all, so this should not be crucial for now. - fallthrough - default: - log.Printf("[unexpected] nameserver received a query for an unsupported record type: %s", r.Question[0].String()) - m.SetRcode(r, dns.RcodeNotImplemented) +var tsMBox = dnsmessage.MustNewName("support.tailscale.com.") + +func (ns *nameserver) handleServiceName(conn nettype.ConnPacketConn) { + defer conn.Close() + conn.SetReadDeadline(time.Now().Add(5 * time.Second)) + buf := make([]byte, 1500) + n, err := conn.Read(buf) + if err != nil { + log.Printf("handeServiceName: read failed: %v\n ", err) + return + } + var msg dnsmessage.Message + err = msg.Unpack(buf[:n]) + if err != nil { + log.Printf("handleServiceName: dnsmessage unpack failed: %v\n ", err) + return + } + resp, err := ns.generateDNSResponse(&msg) + if err != nil { + log.Printf("handleServiceName: DNS response generation failed: %v\n", err) + return + } + if len(resp) == 0 { + return + } + _, err = conn.Write(resp) + if err != nil { + log.Printf("handleServiceName: write failed: %v\n", err) + } +} + +func (ns *nameserver) generateDNSResponse(req *dnsmessage.Message) ([]byte, error) { + b := dnsmessage.NewBuilder(nil, + dnsmessage.Header{ + ID: req.Header.ID, + Response: true, + Authoritative: true, + }) + b.EnableCompression() + + if len(req.Questions) == 0 { + return b.Finish() + } + q := req.Questions[0] + if err := b.StartQuestions(); err != nil { + return nil, err + } + if err := b.Question(q); err != nil { + return nil, err + } + if err := b.StartAnswers(); err != nil { + return nil, err + } + + var err error + switch q.Type { + case dnsmessage.TypeA: + log.Printf("query for an A record") + var fqdn dnsname.FQDN + fqdn, err = dnsname.ToFQDN(q.Name.String()) + if err != nil { + log.Print("format error") + return nil, err } + + log.Print("locking service IPs") + ns.mu.Lock() + ips := ns.serviceIPs[fqdn] + ns.mu.Unlock() + log.Print("unlocking service IPs") + + if ips == nil || len(ips) == 0 { + log.Printf("nameserver has no IPs for %s", fqdn) + // NXDOMAIN? + return nil, fmt.Errorf("no address found for %s", fqdn) + } + + // return a random IP + i := rand.Intn(len(ips)) + ip := ips[i] + log.Printf("produced IP address %s", ip) + err = b.AResource( + dnsmessage.ResourceHeader{Name: q.Name, Class: q.Class, TTL: 5}, + dnsmessage.AResource{A: ip.As4()}, + ) + case dnsmessage.TypeSOA: + err = b.SOAResource( + dnsmessage.ResourceHeader{Name: q.Name, Class: q.Class, TTL: 120}, + dnsmessage.SOAResource{NS: q.Name, MBox: tsMBox, Serial: 2023030600, + Refresh: 120, Retry: 120, Expire: 120, MinTTL: 60}, + ) + case dnsmessage.TypeNS: + err = b.NSResource( + dnsmessage.ResourceHeader{Name: q.Name, Class: q.Class, TTL: 120}, + dnsmessage.NSResource{NS: tsMBox}, + ) } - return h + if err != nil { + return nil, err + } + return b.Finish() } -// runRecordsReconciler ensures that nameserver's in-memory records are -// reset when the provided configuration changes. -func (n *nameserver) runRecordsReconciler(ctx context.Context) { - log.Print("updating nameserver's records from the provided configuration...") - if err := n.resetRecords(); err != nil { // ensure records are up to date before the nameserver starts +func (n *nameserver) runServiceRecordsReconciler(ctx context.Context) { + log.Print("updating nameserver's records from the provided services configuration...") + if err := n.resetServiceRecords(); err != nil { // ensure records are up to date before the nameserver starts log.Fatalf("error setting nameserver's records: %v", err) } log.Print("nameserver's records were updated") @@ -182,13 +249,7 @@ func (n *nameserver) runRecordsReconciler(ctx context.Context) { return case <-n.configWatcher: log.Print("configuration update detected, resetting records") - if err := n.resetRecords(); err != nil { - // TODO (irbekrm): this runs in a - // container that will be thrown away, - // so this should be ok. But maybe still - // need to ensure that the DNS server - // terminates connections more - // gracefully. + if err := n.resetServiceRecords(); err != nil { log.Fatalf("error resetting records: %v", err) } log.Print("nameserver records were reset") @@ -197,93 +258,47 @@ func (n *nameserver) runRecordsReconciler(ctx context.Context) { }() } -// resetRecords sets the in-memory DNS records of this nameserver from the -// provided configuration. It does not check for the diff, so the caller is -// expected to ensure that this is only called when reset is needed. -func (n *nameserver) resetRecords() error { - dnsCfgBytes, err := n.configReader() - if err != nil { - log.Printf("error reading nameserver's configuration: %v", err) - return err - } - if dnsCfgBytes == nil || len(dnsCfgBytes) < 1 { - log.Print("nameserver's configuration is empty, any in-memory records will be unset") - n.mu.Lock() - n.ip4 = make(map[dnsname.FQDN][]net.IP) - n.mu.Unlock() - return nil - } - dnsCfg := &operatorutils.Records{} - err = json.Unmarshal(dnsCfgBytes, dnsCfg) - if err != nil { - return fmt.Errorf("error unmarshalling nameserver configuration: %v\n", err) - } - - if dnsCfg.Version != operatorutils.Alpha1Version { - return fmt.Errorf("unsupported configuration version %s, supported versions are %s\n", dnsCfg.Version, operatorutils.Alpha1Version) - } - - ip4 := make(map[dnsname.FQDN][]net.IP) - defer func() { - n.mu.Lock() - defer n.mu.Unlock() - n.ip4 = ip4 - }() - - if len(dnsCfg.IP4) == 0 { - log.Print("nameserver's configuration contains no records, any in-memory records will be unset") - return nil - } +func (n *nameserver) resetServiceRecords() error { + ip4 := make(map[dnsname.FQDN][]netip.Addr) + for _, proxy := range n.proxies { + dnsCfgBytes, err := proxyConfigReader(proxy) + if err != nil { + log.Printf("error reading proxy config for %s configuration: %v", proxy, err) + return err + } + if dnsCfgBytes == nil || len(dnsCfgBytes) == 0 { + log.Printf("configuration for proxy %s is empty; do nothing", proxy) + continue + } + proxyCfg := &operatorutils.ProxyConfig{} - for fqdn, ips := range dnsCfg.IP4 { - fqdn, err := dnsname.ToFQDN(fqdn) + err = json.Unmarshal(dnsCfgBytes, proxyCfg) if err != nil { - log.Printf("invalid nameserver's configuration: %s is not a valid FQDN: %v; skipping this record", fqdn, err) - continue // one invalid hostname should not break the whole nameserver + return fmt.Errorf("error unmarshalling proxy config: %v\n", err) } - for _, ipS := range ips { - ip := net.ParseIP(ipS).To4() - if ip == nil { // To4 returns nil if IP is not a IPv4 address - log.Printf("invalid nameserver's configuration: %v does not appear to be an IPv4 address; skipping this record", ipS) - continue // one invalid IP address should not break the whole nameserver - } - ip4[fqdn] = []net.IP{ip} + for _, svc := range proxyCfg.Services { + log.Printf("adding record for Service %s", svc.FQDN) + ip4[dnsname.FQDN(svc.FQDN)] = append(ip4[dnsname.FQDN(svc.FQDN)], svc.V4ServiceIPs...) } } + log.Printf("after update DNS records are %#+v", ip4) + n.mu.Lock() + n.serviceIPs = ip4 + n.mu.Unlock() return nil } -// listenAndServe starts a DNS server for the provided network and address. -func listenAndServe(net, addr string, shutdown chan os.Signal) { - s := &dns.Server{Addr: addr, Net: net} - go func() { - <-shutdown - log.Printf("shutting down server for %s", net) - s.Shutdown() - }() - log.Printf("listening for %s queries on %s", net, addr) - if err := s.ListenAndServe(); err != nil { - log.Fatalf("error running %s server: %v", net, err) - } -} - -// ensureWatcherForKubeConfigMap sets up a new file watcher for the ConfigMap -// that's expected to be mounted at /config. Returns a channel that receives an -// event every time the contents get updated. -func ensureWatcherForKubeConfigMap(ctx context.Context) chan string { +// ensureWatcherForServiceConfigMaps sets up a new file watcher for the +// ConfigMaps containing records for Services served by the operator proxies. +func ensureWatcherForServiceConfigMaps(ctx context.Context, proxies []string) chan string { c := make(chan string) watcher, err := fsnotify.NewWatcher() if err != nil { - log.Fatalf("error creating a new watcher for the mounted ConfigMap: %v", err) + log.Fatalf("error creating a new watcher for the services ConfigMap: %v", err) } - // kubelet mounts configmap to a Pod using a series of symlinks, one of - // which is <mount-dir>/..data that Kubernetes recommends consumers to - // use if they need to monitor changes - // https://github.com/kubernetes/kubernetes/blob/v1.28.1/pkg/volume/util/atomic_writer.go#L39-L61 - toWatch := filepath.Join(defaultDNSConfigDir, kubeletMountedConfigLn) go func() { defer watcher.Close() - log.Printf("starting file watch for %s", defaultDNSConfigDir) + log.Printf("starting file watch for %s", "/services/") for { select { case <-ctx.Done(): @@ -293,35 +308,31 @@ func ensureWatcherForKubeConfigMap(ctx context.Context) chan string { if !ok { log.Fatal("watcher finished; exiting") } - if event.Name == toWatch { + // kubelet mounts configmap to a Pod using a series of symlinks, one of + // which is <mount-dir>/..data that Kubernetes recommends consumers to + // use if they need to monitor changes + // https://github.com/kubernetes/kubernetes/blob/v1.28.1/pkg/volume/util/atomic_writer.go#L39-L61 + if strings.HasSuffix(event.Name, kubeletMountedConfigLn) { msg := fmt.Sprintf("ConfigMap update received: %s", event) log.Print(msg) - c <- msg + n := path.Dir(event.Name) + base := path.Base(n) + c <- base // which proxy's ConfigMap should be updated } case err, ok := <-watcher.Errors: if err != nil { - // TODO (irbekrm): this runs in a - // container that will be thrown away, - // so this should be ok. But maybe still - // need to ensure that the DNS server - // terminates connections more - // gracefully. - log.Fatalf("[unexpected] error watching configuration: %v", err) + log.Fatalf("[unexpected] error watching services configuration: %v", err) } if !ok { - // TODO (irbekrm): this runs in a - // container that will be thrown away, - // so this should be ok. But maybe still - // need to ensure that the DNS server - // terminates connections more - // gracefully. log.Fatalf("[unexpected] errors watcher exited") } } } }() - if err = watcher.Add(defaultDNSConfigDir); err != nil { - log.Fatalf("failed setting up a watcher for the mounted ConfigMap: %v", err) + for _, name := range proxies { + if err = watcher.Add(filepath.Join("/services", name)); err != nil { + log.Fatalf("failed setting up a watcher for config for %s : %v", name, err) + } } return c } @@ -341,14 +352,14 @@ var configMapConfigReader configReaderFunc = func() ([]byte, error) { } } -// lookupIP4 returns any IPv4 addresses for the given FQDN from nameserver's -// in-memory records. -func (n *nameserver) lookupIP4(fqdn dnsname.FQDN) []net.IP { - if n.ip4 == nil { - return nil +func proxyConfigReader(proxy string) ([]byte, error) { + path := filepath.Join("/services", proxy, "proxyConfig") + if bs, err := os.ReadFile(path); err == nil { + return bs, err + } else if os.IsNotExist(err) { + log.Printf("path %s does not exist", path) + return nil, nil + } else { + return nil, fmt.Errorf("error reading %s: %w", path, err) } - n.mu.Lock() - defer n.mu.Unlock() - f := n.ip4[fqdn] - return f } |
