summaryrefslogtreecommitdiffhomepage
path: root/cmd/k8s-nameserver/main.go
diff options
context:
space:
mode:
authorIrbe Krumina <irbe@tailscale.com>2024-05-30 10:09:21 +0100
committerIrbe Krumina <irbe@tailscale.com>2024-08-15 11:19:32 +0300
commit5156ec6a3b70190875ff05708763dd9cf24ddcd5 (patch)
treed81968a65614bbb304a5a96ec155e8df2b24f60c /cmd/k8s-nameserver/main.go
parent436794cf7ab944d40e93f3cc81f25ec06f79bf86 (diff)
downloadtailscale-irbekrm/proxycidrs.tar.xz
tailscale-irbekrm/proxycidrs.zip
Diffstat (limited to 'cmd/k8s-nameserver/main.go')
-rw-r--r--cmd/k8s-nameserver/main.go427
1 files changed, 219 insertions, 208 deletions
diff --git a/cmd/k8s-nameserver/main.go b/cmd/k8s-nameserver/main.go
index 53c0fee39..df062aec6 100644
--- a/cmd/k8s-nameserver/main.go
+++ b/cmd/k8s-nameserver/main.go
@@ -13,28 +13,30 @@ import (
"encoding/json"
"fmt"
"log"
+ "math/rand"
"net"
+ "net/netip"
"os"
- "os/signal"
+ "path"
"path/filepath"
+ "strings"
"sync"
- "syscall"
+ "time"
"github.com/fsnotify/fsnotify"
- "github.com/miekg/dns"
+ "golang.org/x/net/dns/dnsmessage"
+ "k8s.io/utils/pointer"
+ "tailscale.com/ipn/store/kubestore"
operatorutils "tailscale.com/k8s-operator"
+ "tailscale.com/tsnet"
+ "tailscale.com/types/nettype"
"tailscale.com/util/dnsname"
)
const (
- // tsNetDomain is the domain that this DNS nameserver has registered a handler for.
- tsNetDomain = "ts.net"
// addr is the the address that the UDP and TCP listeners will listen on.
- addr = ":1053"
+ addr = ":53"
- // The following constants are specific to the nameserver configuration
- // provided by a mounted Kubernetes Configmap. The Configmap mounted at
- // /config is the only supported way for configuring this nameserver.
defaultDNSConfigDir = "/config"
kubeletMountedConfigLn = "..data"
)
@@ -55,122 +57,187 @@ type nameserver struct {
// configuration has changed and the nameserver should update the
// in-memory records.
configWatcher <-chan string
+ proxies []string
- mu sync.Mutex // protects following
- // ip4 are the in-memory hostname -> IP4 mappings that the nameserver
- // uses to respond to A record queries.
- ip4 map[dnsname.FQDN][]net.IP
+ mu sync.Mutex // protects following
+ serviceIPs map[dnsname.FQDN][]netip.Addr
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
- // Ensure that we watch the kube Configmap mounted at /config for
- // nameserver configuration updates and send events when updates happen.
- c := ensureWatcherForKubeConfigMap(ctx)
+ // state always in 'dnsrecords' Secret
+ kubeStateStore, err := kubestore.New(log.Printf, *pointer.StringPtr("nameserver-state"))
+ if err != nil {
+ log.Fatalf("error starting kube state store: %v", err)
+ }
+ ts := tsnet.Server{
+ Logf: log.Printf,
+ Hostname: "dns-server",
+ Dir: "/tmp",
+ Store: kubeStateStore,
+ }
+ if _, err := ts.Up(ctx); err != nil {
+ log.Fatalf("ts.Up: %v", err)
+ }
+ defer ts.Close()
+
+ // hardcoded for this prototype
+ proxies := []string{"proxies-0", "proxies-1", "proxies-2", "proxies-3"}
+ c := ensureWatcherForServiceConfigMaps(ctx, proxies)
ns := &nameserver{
configReader: configMapConfigReader,
configWatcher: c,
+ proxies: proxies,
}
- // Ensure that in-memory records get set up to date now and will get
- // reset when the configuration changes.
- ns.runRecordsReconciler(ctx)
+ ns.runServiceRecordsReconciler(ctx)
- // Register a DNS server handle for ts.net domain names. Not having a
- // handle registered for any other domain names is how we enforce that
- // this nameserver can only be used for ts.net domains - querying any
- // other domain names returns Rcode Refused.
- dns.HandleFunc(tsNetDomain, ns.handleFunc())
+ var wg sync.WaitGroup
+
+ udpListener, err := ts.Listen("udp", addr)
+ if err != nil {
+ log.Fatalf("failed listening on udp port :53")
+ }
+ defer udpListener.Close()
+ wg.Add(1)
+ go func() {
+ ns.serveDNS(udpListener)
+ }()
+ log.Printf("Listening for DNS on UDP %s", udpListener.Addr())
- // Listen for DNS queries over UDP and TCP.
- udpSig := make(chan os.Signal)
- tcpSig := make(chan os.Signal)
- go listenAndServe("udp", addr, udpSig)
- go listenAndServe("tcp", addr, tcpSig)
- sig := make(chan os.Signal, 1)
- signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
- s := <-sig
- log.Printf("OS signal (%s) received, shutting down", s)
- cancel() // exit the records reconciler and configmap watcher goroutines
- udpSig <- s // stop the UDP listener
- tcpSig <- s // stop the TCP listener
+ tcpListener, err := ts.Listen("tcp", addr)
+ if err != nil {
+ log.Fatalf("failed listening on tcp port :53")
+ }
+ defer tcpListener.Close()
+ wg.Add(1)
+ go func() {
+ ns.serveDNS(tcpListener)
+ }()
+ log.Printf("Listening for DNS on TCP %s", tcpListener.Addr())
+ wg.Wait()
}
-// handleFunc is a DNS query handler that can respond to A record queries from
-// the nameserver's in-memory records.
-// - If an A record query is received and the
-// nameserver's in-memory records contain records for the queried domain name,
-// return a success response.
-// - If an A record query is received, but the
-// nameserver's in-memory records do not contain records for the queried domain name,
-// return NXDOMAIN.
-// - If an A record query is received, but the queried domain name is not valid, return Format Error.
-// - If a query is received for any other record type than A, return Not Implemented.
-func (n *nameserver) handleFunc() func(w dns.ResponseWriter, r *dns.Msg) {
- h := func(w dns.ResponseWriter, r *dns.Msg) {
- m := new(dns.Msg)
- defer func() {
- w.WriteMsg(m)
- }()
- if len(r.Question) < 1 {
- log.Print("[unexpected] nameserver received a request with no questions")
- m = r.SetRcodeFormatError(r)
+func (c *nameserver) serveDNS(ln net.Listener) {
+ for {
+ conn, err := ln.Accept()
+ if err != nil {
+ log.Printf("serveDNS accept: %v", err)
return
}
- // TODO (irbekrm): maybe set message compression
- switch r.Question[0].Qtype {
- case dns.TypeA:
- q := r.Question[0].Name
- fqdn, err := dnsname.ToFQDN(q)
- if err != nil {
- m = r.SetRcodeFormatError(r)
- return
- }
- // The only supported use of this nameserver is as a
- // single source of truth for MagicDNS names by
- // non-tailnet Kubernetes workloads.
- m.Authoritative = true
- m.RecursionAvailable = false
+ go c.handleServiceName(conn.(nettype.ConnPacketConn))
+ }
+}
- ips := n.lookupIP4(fqdn)
- if ips == nil || len(ips) == 0 {
- // As we are the authoritative nameserver for MagicDNS
- // names, if we do not have a record for this MagicDNS
- // name, it does not exist.
- m = m.SetRcode(r, dns.RcodeNameError)
- return
- }
- // TODO (irbekrm): TTL is currently set to 0, meaning
- // that cluster workloads will not cache the DNS
- // records. Revisit this in future when we understand
- // the usage patterns better- is it putting too much
- // load on kube DNS server or is this fine?
- for _, ip := range ips {
- rr := &dns.A{Hdr: dns.RR_Header{Name: q, Rrtype: dns.TypeA, Class: dns.ClassINET, Ttl: 0}, A: ip}
- m.SetRcode(r, dns.RcodeSuccess)
- m.Answer = append(m.Answer, rr)
- }
- case dns.TypeAAAA:
- // TODO (irbekrm): implement IPv6 support.
- // Kubernetes distributions that I am most familiar with
- // default to IPv4 for Pod CIDR ranges and often many cases don't
- // support IPv6 at all, so this should not be crucial for now.
- fallthrough
- default:
- log.Printf("[unexpected] nameserver received a query for an unsupported record type: %s", r.Question[0].String())
- m.SetRcode(r, dns.RcodeNotImplemented)
+var tsMBox = dnsmessage.MustNewName("support.tailscale.com.")
+
+func (ns *nameserver) handleServiceName(conn nettype.ConnPacketConn) {
+ defer conn.Close()
+ conn.SetReadDeadline(time.Now().Add(5 * time.Second))
+ buf := make([]byte, 1500)
+ n, err := conn.Read(buf)
+ if err != nil {
+ log.Printf("handeServiceName: read failed: %v\n ", err)
+ return
+ }
+ var msg dnsmessage.Message
+ err = msg.Unpack(buf[:n])
+ if err != nil {
+ log.Printf("handleServiceName: dnsmessage unpack failed: %v\n ", err)
+ return
+ }
+ resp, err := ns.generateDNSResponse(&msg)
+ if err != nil {
+ log.Printf("handleServiceName: DNS response generation failed: %v\n", err)
+ return
+ }
+ if len(resp) == 0 {
+ return
+ }
+ _, err = conn.Write(resp)
+ if err != nil {
+ log.Printf("handleServiceName: write failed: %v\n", err)
+ }
+}
+
+func (ns *nameserver) generateDNSResponse(req *dnsmessage.Message) ([]byte, error) {
+ b := dnsmessage.NewBuilder(nil,
+ dnsmessage.Header{
+ ID: req.Header.ID,
+ Response: true,
+ Authoritative: true,
+ })
+ b.EnableCompression()
+
+ if len(req.Questions) == 0 {
+ return b.Finish()
+ }
+ q := req.Questions[0]
+ if err := b.StartQuestions(); err != nil {
+ return nil, err
+ }
+ if err := b.Question(q); err != nil {
+ return nil, err
+ }
+ if err := b.StartAnswers(); err != nil {
+ return nil, err
+ }
+
+ var err error
+ switch q.Type {
+ case dnsmessage.TypeA:
+ log.Printf("query for an A record")
+ var fqdn dnsname.FQDN
+ fqdn, err = dnsname.ToFQDN(q.Name.String())
+ if err != nil {
+ log.Print("format error")
+ return nil, err
}
+
+ log.Print("locking service IPs")
+ ns.mu.Lock()
+ ips := ns.serviceIPs[fqdn]
+ ns.mu.Unlock()
+ log.Print("unlocking service IPs")
+
+ if ips == nil || len(ips) == 0 {
+ log.Printf("nameserver has no IPs for %s", fqdn)
+ // NXDOMAIN?
+ return nil, fmt.Errorf("no address found for %s", fqdn)
+ }
+
+ // return a random IP
+ i := rand.Intn(len(ips))
+ ip := ips[i]
+ log.Printf("produced IP address %s", ip)
+ err = b.AResource(
+ dnsmessage.ResourceHeader{Name: q.Name, Class: q.Class, TTL: 5},
+ dnsmessage.AResource{A: ip.As4()},
+ )
+ case dnsmessage.TypeSOA:
+ err = b.SOAResource(
+ dnsmessage.ResourceHeader{Name: q.Name, Class: q.Class, TTL: 120},
+ dnsmessage.SOAResource{NS: q.Name, MBox: tsMBox, Serial: 2023030600,
+ Refresh: 120, Retry: 120, Expire: 120, MinTTL: 60},
+ )
+ case dnsmessage.TypeNS:
+ err = b.NSResource(
+ dnsmessage.ResourceHeader{Name: q.Name, Class: q.Class, TTL: 120},
+ dnsmessage.NSResource{NS: tsMBox},
+ )
}
- return h
+ if err != nil {
+ return nil, err
+ }
+ return b.Finish()
}
-// runRecordsReconciler ensures that nameserver's in-memory records are
-// reset when the provided configuration changes.
-func (n *nameserver) runRecordsReconciler(ctx context.Context) {
- log.Print("updating nameserver's records from the provided configuration...")
- if err := n.resetRecords(); err != nil { // ensure records are up to date before the nameserver starts
+func (n *nameserver) runServiceRecordsReconciler(ctx context.Context) {
+ log.Print("updating nameserver's records from the provided services configuration...")
+ if err := n.resetServiceRecords(); err != nil { // ensure records are up to date before the nameserver starts
log.Fatalf("error setting nameserver's records: %v", err)
}
log.Print("nameserver's records were updated")
@@ -182,13 +249,7 @@ func (n *nameserver) runRecordsReconciler(ctx context.Context) {
return
case <-n.configWatcher:
log.Print("configuration update detected, resetting records")
- if err := n.resetRecords(); err != nil {
- // TODO (irbekrm): this runs in a
- // container that will be thrown away,
- // so this should be ok. But maybe still
- // need to ensure that the DNS server
- // terminates connections more
- // gracefully.
+ if err := n.resetServiceRecords(); err != nil {
log.Fatalf("error resetting records: %v", err)
}
log.Print("nameserver records were reset")
@@ -197,93 +258,47 @@ func (n *nameserver) runRecordsReconciler(ctx context.Context) {
}()
}
-// resetRecords sets the in-memory DNS records of this nameserver from the
-// provided configuration. It does not check for the diff, so the caller is
-// expected to ensure that this is only called when reset is needed.
-func (n *nameserver) resetRecords() error {
- dnsCfgBytes, err := n.configReader()
- if err != nil {
- log.Printf("error reading nameserver's configuration: %v", err)
- return err
- }
- if dnsCfgBytes == nil || len(dnsCfgBytes) < 1 {
- log.Print("nameserver's configuration is empty, any in-memory records will be unset")
- n.mu.Lock()
- n.ip4 = make(map[dnsname.FQDN][]net.IP)
- n.mu.Unlock()
- return nil
- }
- dnsCfg := &operatorutils.Records{}
- err = json.Unmarshal(dnsCfgBytes, dnsCfg)
- if err != nil {
- return fmt.Errorf("error unmarshalling nameserver configuration: %v\n", err)
- }
-
- if dnsCfg.Version != operatorutils.Alpha1Version {
- return fmt.Errorf("unsupported configuration version %s, supported versions are %s\n", dnsCfg.Version, operatorutils.Alpha1Version)
- }
-
- ip4 := make(map[dnsname.FQDN][]net.IP)
- defer func() {
- n.mu.Lock()
- defer n.mu.Unlock()
- n.ip4 = ip4
- }()
-
- if len(dnsCfg.IP4) == 0 {
- log.Print("nameserver's configuration contains no records, any in-memory records will be unset")
- return nil
- }
+func (n *nameserver) resetServiceRecords() error {
+ ip4 := make(map[dnsname.FQDN][]netip.Addr)
+ for _, proxy := range n.proxies {
+ dnsCfgBytes, err := proxyConfigReader(proxy)
+ if err != nil {
+ log.Printf("error reading proxy config for %s configuration: %v", proxy, err)
+ return err
+ }
+ if dnsCfgBytes == nil || len(dnsCfgBytes) == 0 {
+ log.Printf("configuration for proxy %s is empty; do nothing", proxy)
+ continue
+ }
+ proxyCfg := &operatorutils.ProxyConfig{}
- for fqdn, ips := range dnsCfg.IP4 {
- fqdn, err := dnsname.ToFQDN(fqdn)
+ err = json.Unmarshal(dnsCfgBytes, proxyCfg)
if err != nil {
- log.Printf("invalid nameserver's configuration: %s is not a valid FQDN: %v; skipping this record", fqdn, err)
- continue // one invalid hostname should not break the whole nameserver
+ return fmt.Errorf("error unmarshalling proxy config: %v\n", err)
}
- for _, ipS := range ips {
- ip := net.ParseIP(ipS).To4()
- if ip == nil { // To4 returns nil if IP is not a IPv4 address
- log.Printf("invalid nameserver's configuration: %v does not appear to be an IPv4 address; skipping this record", ipS)
- continue // one invalid IP address should not break the whole nameserver
- }
- ip4[fqdn] = []net.IP{ip}
+ for _, svc := range proxyCfg.Services {
+ log.Printf("adding record for Service %s", svc.FQDN)
+ ip4[dnsname.FQDN(svc.FQDN)] = append(ip4[dnsname.FQDN(svc.FQDN)], svc.V4ServiceIPs...)
}
}
+ log.Printf("after update DNS records are %#+v", ip4)
+ n.mu.Lock()
+ n.serviceIPs = ip4
+ n.mu.Unlock()
return nil
}
-// listenAndServe starts a DNS server for the provided network and address.
-func listenAndServe(net, addr string, shutdown chan os.Signal) {
- s := &dns.Server{Addr: addr, Net: net}
- go func() {
- <-shutdown
- log.Printf("shutting down server for %s", net)
- s.Shutdown()
- }()
- log.Printf("listening for %s queries on %s", net, addr)
- if err := s.ListenAndServe(); err != nil {
- log.Fatalf("error running %s server: %v", net, err)
- }
-}
-
-// ensureWatcherForKubeConfigMap sets up a new file watcher for the ConfigMap
-// that's expected to be mounted at /config. Returns a channel that receives an
-// event every time the contents get updated.
-func ensureWatcherForKubeConfigMap(ctx context.Context) chan string {
+// ensureWatcherForServiceConfigMaps sets up a new file watcher for the
+// ConfigMaps containing records for Services served by the operator proxies.
+func ensureWatcherForServiceConfigMaps(ctx context.Context, proxies []string) chan string {
c := make(chan string)
watcher, err := fsnotify.NewWatcher()
if err != nil {
- log.Fatalf("error creating a new watcher for the mounted ConfigMap: %v", err)
+ log.Fatalf("error creating a new watcher for the services ConfigMap: %v", err)
}
- // kubelet mounts configmap to a Pod using a series of symlinks, one of
- // which is <mount-dir>/..data that Kubernetes recommends consumers to
- // use if they need to monitor changes
- // https://github.com/kubernetes/kubernetes/blob/v1.28.1/pkg/volume/util/atomic_writer.go#L39-L61
- toWatch := filepath.Join(defaultDNSConfigDir, kubeletMountedConfigLn)
go func() {
defer watcher.Close()
- log.Printf("starting file watch for %s", defaultDNSConfigDir)
+ log.Printf("starting file watch for %s", "/services/")
for {
select {
case <-ctx.Done():
@@ -293,35 +308,31 @@ func ensureWatcherForKubeConfigMap(ctx context.Context) chan string {
if !ok {
log.Fatal("watcher finished; exiting")
}
- if event.Name == toWatch {
+ // kubelet mounts configmap to a Pod using a series of symlinks, one of
+ // which is <mount-dir>/..data that Kubernetes recommends consumers to
+ // use if they need to monitor changes
+ // https://github.com/kubernetes/kubernetes/blob/v1.28.1/pkg/volume/util/atomic_writer.go#L39-L61
+ if strings.HasSuffix(event.Name, kubeletMountedConfigLn) {
msg := fmt.Sprintf("ConfigMap update received: %s", event)
log.Print(msg)
- c <- msg
+ n := path.Dir(event.Name)
+ base := path.Base(n)
+ c <- base // which proxy's ConfigMap should be updated
}
case err, ok := <-watcher.Errors:
if err != nil {
- // TODO (irbekrm): this runs in a
- // container that will be thrown away,
- // so this should be ok. But maybe still
- // need to ensure that the DNS server
- // terminates connections more
- // gracefully.
- log.Fatalf("[unexpected] error watching configuration: %v", err)
+ log.Fatalf("[unexpected] error watching services configuration: %v", err)
}
if !ok {
- // TODO (irbekrm): this runs in a
- // container that will be thrown away,
- // so this should be ok. But maybe still
- // need to ensure that the DNS server
- // terminates connections more
- // gracefully.
log.Fatalf("[unexpected] errors watcher exited")
}
}
}
}()
- if err = watcher.Add(defaultDNSConfigDir); err != nil {
- log.Fatalf("failed setting up a watcher for the mounted ConfigMap: %v", err)
+ for _, name := range proxies {
+ if err = watcher.Add(filepath.Join("/services", name)); err != nil {
+ log.Fatalf("failed setting up a watcher for config for %s : %v", name, err)
+ }
}
return c
}
@@ -341,14 +352,14 @@ var configMapConfigReader configReaderFunc = func() ([]byte, error) {
}
}
-// lookupIP4 returns any IPv4 addresses for the given FQDN from nameserver's
-// in-memory records.
-func (n *nameserver) lookupIP4(fqdn dnsname.FQDN) []net.IP {
- if n.ip4 == nil {
- return nil
+func proxyConfigReader(proxy string) ([]byte, error) {
+ path := filepath.Join("/services", proxy, "proxyConfig")
+ if bs, err := os.ReadFile(path); err == nil {
+ return bs, err
+ } else if os.IsNotExist(err) {
+ log.Printf("path %s does not exist", path)
+ return nil, nil
+ } else {
+ return nil, fmt.Errorf("error reading %s: %w", path, err)
}
- n.mu.Lock()
- defer n.mu.Unlock()
- f := n.ip4[fqdn]
- return f
}