summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--cmd/tailscale/cli/debug.go5
-rw-r--r--ipn/ipnlocal/local.go12
-rw-r--r--ipn/localapi/localapi.go2
-rw-r--r--net/packet/packet.go10
-rw-r--r--wgengine/filter/filter.go161
5 files changed, 181 insertions, 9 deletions
diff --git a/cmd/tailscale/cli/debug.go b/cmd/tailscale/cli/debug.go
index c23d976af..7f763753c 100644
--- a/cmd/tailscale/cli/debug.go
+++ b/cmd/tailscale/cli/debug.go
@@ -94,6 +94,11 @@ var debugCmd = &ffcli.Command{
ShortHelp: "force a magicsock rebind",
},
{
+ Name: "kick-all-tcp-in",
+ Exec: localAPIAction("kick-all-tcp-in"),
+ ShortHelp: "test TCP flow kick [incoming]",
+ },
+ {
Name: "prefs",
Exec: runPrefs,
ShortHelp: "print prefs",
diff --git a/ipn/ipnlocal/local.go b/ipn/ipnlocal/local.go
index 2d2d0f90d..d6b1d9daa 100644
--- a/ipn/ipnlocal/local.go
+++ b/ipn/ipnlocal/local.go
@@ -9,6 +9,7 @@ import (
"errors"
"fmt"
"io"
+ "log"
"net"
"net/http"
"os"
@@ -3283,6 +3284,17 @@ func (b *LocalBackend) DebugReSTUN() error {
return nil
}
+func (b *LocalBackend) DebugKickAllTCPIn() error {
+ filt, ok := b.filterAtomic.Load().(*filter.Filter)
+ if !ok {
+ return errors.New("no filter")
+ }
+ for _, flow := range filt.OpenTCPFlows() {
+ log.Printf("XXX: flow open: %+v", flow)
+ }
+ return nil
+}
+
func (b *LocalBackend) magicConn() (*magicsock.Conn, error) {
ig, ok := b.e.(wgengine.InternalsGetter)
if !ok {
diff --git a/ipn/localapi/localapi.go b/ipn/localapi/localapi.go
index 699758d3e..286a54754 100644
--- a/ipn/localapi/localapi.go
+++ b/ipn/localapi/localapi.go
@@ -278,6 +278,8 @@ func (h *Handler) serveDebug(w http.ResponseWriter, r *http.Request) {
err = h.b.DebugRebind()
case "restun":
err = h.b.DebugReSTUN()
+ case "kick-all-tcp-in":
+ err = h.b.DebugKickAllTCPIn()
case "":
err = fmt.Errorf("missing parameter 'action'")
default:
diff --git a/net/packet/packet.go b/net/packet/packet.go
index 48633b89d..f9d912d03 100644
--- a/net/packet/packet.go
+++ b/net/packet/packet.go
@@ -390,6 +390,16 @@ func (q *Parsed) IsTCPSyn() bool {
return (q.TCPFlags & TCPSynAck) == TCPSyn
}
+// IsTCPRst reports whether q is a TCP RST packet.
+func (q *Parsed) IsTCPRst() bool {
+ return (q.TCPFlags & TCPRst) != 0
+}
+
+// IsTCPFin reports whether q is a TCP FIN packet.
+func (q *Parsed) IsTCPFin() bool {
+ return (q.TCPFlags & TCPFin) != 0
+}
+
// IsError reports whether q is an ICMP "Error" packet.
func (q *Parsed) IsError() bool {
switch q.IPProto {
diff --git a/wgengine/filter/filter.go b/wgengine/filter/filter.go
index cda28eb30..8aea09e12 100644
--- a/wgengine/filter/filter.go
+++ b/wgengine/filter/filter.go
@@ -7,6 +7,7 @@ package filter
import (
"fmt"
+ "log"
"sync"
"time"
@@ -17,6 +18,7 @@ import (
"tailscale.com/tstime/rate"
"tailscale.com/types/ipproto"
"tailscale.com/types/logger"
+ "tailscale.com/util/mak"
)
// Filter is a stateful packet filter.
@@ -56,8 +58,106 @@ type Filter struct {
// filterState is a state cache of past seen packets.
type filterState struct {
- mu sync.Mutex
+ mu sync.Mutex // guards following
+
+ // lru is the flow track cached used by UDP & SCTP.
lru *flowtrack.Cache // from flowtrack.Tuple -> nil
+
+ // tcpFlows tracks open TCP flows, both inbound and outbound. Regardless of
+ // which direction initiated it, the Tuple's Src is always the remote side
+ // and Dst is the local side.
+ tcpFlows map[flowtrack.Tuple]*TCPFlow
+}
+
+// OpenTCPFlows returns the set of open TCP flows in an unsorted order.
+func (f *Filter) OpenTCPFlows() []*TCPFlow {
+ st := f.state
+ st.mu.Lock()
+ defer st.mu.Unlock()
+ ret := make([]*TCPFlow, 0, len(st.tcpFlows))
+ for _, f := range st.tcpFlows {
+ ret = append(ret, f)
+ }
+ return ret
+}
+
+type TCPFlow struct {
+ flowtrack.Tuple
+ Out bool // was an outbound connection (from local tailscale)
+ Created time.Time
+ // TODO(bradfitz): lastActivity mono.Time, once we can do it fast enough
+ // to update on all packets.
+
+ mu sync.Mutex // guards finIn/finOut; lock order: filterState.mu, then TCPFlow.mu
+ // finOut and finIn record whether we've seen a FIN packet in or out
+ // for this flow.
+ finOut bool
+ finIn bool
+}
+
+func (s *filterState) addOutgoingTCPFlow(t flowtrack.Tuple) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ log.Printf("XXX adding out flow %v", t)
+ mak.Set(&s.tcpFlows, t, &TCPFlow{
+ Tuple: t,
+ Out: true,
+ Created: time.Now(),
+ })
+}
+
+func (s *filterState) addIncomingTCPFlow(t flowtrack.Tuple) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ log.Printf("XXX adding in flow %v", t)
+ mak.Set(&s.tcpFlows, t, &TCPFlow{
+ Tuple: t,
+ Out: false,
+ Created: time.Now(),
+ })
+}
+
+func (s *filterState) removeTCPFlow(t flowtrack.Tuple) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ delete(s.tcpFlows, t)
+ log.Printf("XXX removing flow %v", t)
+}
+
+func (s *filterState) setFinOut(t flowtrack.Tuple) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if f, ok := s.tcpFlows[t]; ok {
+ f.mu.Lock()
+ f.finOut = true
+ done := f.finOut && f.finIn
+ f.mu.Unlock()
+ log.Printf("XXX FIN out for flow %v", t)
+ if done {
+ delete(s.tcpFlows, t)
+ log.Printf("XXX due to FINs, removing flow %v", t)
+ }
+ } else {
+ log.Printf("XXX FIN out for unknown flow %v", t)
+ }
+}
+
+func (s *filterState) setFinIn(t flowtrack.Tuple) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if f, ok := s.tcpFlows[t]; ok {
+ f.mu.Lock()
+ f.finIn = true
+ done := f.finOut && f.finIn
+ f.mu.Unlock()
+ log.Printf("XXX FIN in for flow %v", t)
+ if done {
+ delete(s.tcpFlows, t)
+ log.Printf("XXX due to FINs, removing flow %v", t)
+ }
+ } else {
+ log.Printf("XXX FIN in for unknown flow %v", t)
+ }
}
// lruMax is the size of the LRU cache in filterState.
@@ -416,12 +516,25 @@ func (f *Filter) runIn4(q *packet.Parsed) (r Response, why string) {
// can't be initiated without first sending a SYN.
// It happens to also be much faster.
// TODO(apenwarr): Skip the rest of decoding in this path?
- if !q.IsTCPSyn() {
- return Accept, "tcp non-syn"
+ if q.IsTCPSyn() {
+ if f.matches4.match(q) {
+ t := flowtrack.Tuple{Proto: q.IPProto, Src: q.Src, Dst: q.Dst}
+ f.state.addIncomingTCPFlow(t)
+ return Accept, "tcp ok"
+ }
+ return Drop, "no rules matched"
}
- if f.matches4.match(q) {
- return Accept, "tcp ok"
+ isFin := q.IsTCPFin()
+ isRst := q.IsTCPRst()
+ if isFin || isRst {
+ t := flowtrack.Tuple{Proto: q.IPProto, Src: q.Src, Dst: q.Dst}
+ if isFin {
+ f.state.setFinIn(t)
+ } else if isRst {
+ f.state.removeTCPFlow(t)
+ }
}
+ return Accept, "tcp non-syn"
case ipproto.UDP, ipproto.SCTP:
t := flowtrack.Tuple{Proto: q.IPProto, Src: q.Src, Dst: q.Dst}
@@ -476,12 +589,25 @@ func (f *Filter) runIn6(q *packet.Parsed) (r Response, why string) {
// can't be initiated without first sending a SYN.
// It happens to also be much faster.
// TODO(apenwarr): Skip the rest of decoding in this path?
- if q.IPProto == ipproto.TCP && !q.IsTCPSyn() {
- return Accept, "tcp non-syn"
+ if q.IsTCPSyn() {
+ if f.matches6.match(q) {
+ t := flowtrack.Tuple{Proto: q.IPProto, Src: q.Src, Dst: q.Dst}
+ f.state.addIncomingTCPFlow(t)
+ return Accept, "tcp ok"
+ }
+ return Drop, "no rules matched"
}
- if f.matches6.match(q) {
- return Accept, "tcp ok"
+ isFin := q.IsTCPFin()
+ isRst := q.IsTCPRst()
+ if isFin || isRst {
+ t := flowtrack.Tuple{Proto: q.IPProto, Src: q.Src, Dst: q.Dst}
+ if isFin {
+ f.state.setFinIn(t)
+ } else if isRst {
+ f.state.removeTCPFlow(t)
+ }
}
+ return Accept, "tcp non-syn"
case ipproto.UDP, ipproto.SCTP:
t := flowtrack.Tuple{Proto: q.IPProto, Src: q.Src, Dst: q.Dst}
@@ -517,6 +643,23 @@ func (f *Filter) runOut(q *packet.Parsed) (r Response, why string) {
f.state.mu.Lock()
f.state.lru.Add(tuple, nil)
f.state.mu.Unlock()
+ case ipproto.TCP:
+ isSyn := q.IsTCPSyn()
+ isRst := q.IsTCPRst()
+ isFin := q.IsTCPFin()
+ if isSyn || isRst || isFin {
+ tuple := flowtrack.Tuple{
+ Proto: q.IPProto,
+ Src: q.Dst, Dst: q.Src, // src/dst reversed
+ }
+ if isSyn {
+ f.state.addOutgoingTCPFlow(tuple)
+ } else if isRst {
+ f.state.removeTCPFlow(tuple)
+ } else if isFin {
+ f.state.setFinOut(tuple)
+ }
+ }
}
return Accept, "ok out"
}