summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorSimeng He <simeng@tailscale.com>2021-07-07 13:35:30 -0400
committerSimeng He <simeng@tailscale.com>2021-07-07 13:35:30 -0400
commitccde87bee581ef4680c08569475dbfcb32762ddb (patch)
treefefca50a3584f661c5f097ea0d6fb25201fb1a5d
parentfcbabd35b2506c9d5bc961ba3444c32f9e25d289 (diff)
downloadtailscale-simenghe/tcpnodeping.tar.xz
tailscale-simenghe/tcpnodeping.zip
Signed-off-by: Simeng He <simeng@tailscale.com>
-rw-r--r--net/tstun/fake.go3
-rw-r--r--net/tstun/wrap.go127
-rw-r--r--tstest/integration/integration_test.go40
-rw-r--r--wgengine/magicsock/magicsock.go15
-rw-r--r--wgengine/netstack/netstack.go2
5 files changed, 88 insertions, 99 deletions
diff --git a/net/tstun/fake.go b/net/tstun/fake.go
index 09d68b6ba..83c573a55 100644
--- a/net/tstun/fake.go
+++ b/net/tstun/fake.go
@@ -6,6 +6,7 @@ package tstun
import (
"io"
+ "log"
"os"
"golang.zx2c4.com/wireguard/tun"
@@ -35,11 +36,13 @@ func (t *fakeTUN) Close() error {
}
func (t *fakeTUN) Read(out []byte, offset int) (int, error) {
+ log.Println("TSTUN : FAKE")
<-t.closechan
return 0, io.EOF
}
func (t *fakeTUN) Write(b []byte, n int) (int, error) {
+ log.Println("FAKE : Write Called")
select {
case <-t.closechan:
return 0, ErrClosed
diff --git a/net/tstun/wrap.go b/net/tstun/wrap.go
index 226ea3e10..e3fde0df0 100644
--- a/net/tstun/wrap.go
+++ b/net/tstun/wrap.go
@@ -8,9 +8,11 @@ package tstun
import (
"errors"
+ "fmt"
"io"
"log"
"os"
+ "runtime/debug"
"sync"
"sync/atomic"
"time"
@@ -141,6 +143,8 @@ type tunReadResult struct {
}
func Wrap(logf logger.Logf, tdev tun.Device) *Wrapper {
+ fmt.Printf("Tunnel Type %T", tdev)
+ debug.PrintStack()
tun := &Wrapper{
logf: logger.WithPrefix(logf, "tstun: "),
tdev: tdev,
@@ -276,8 +280,8 @@ func allowSendOnClosedChannel() {
// This is needed because t.tdev.Read in general may block (it does on Windows),
// so packets may be stuck in t.outbound if t.Read called t.tdev.Read directly.
func (t *Wrapper) poll() {
-<<<<<<< HEAD
defer allowSendOnClosedChannel() // for send to t.outbound
+ log.Println("TSTUN : POLL Started with len ", len(t.bufferConsumed))
for range t.bufferConsumed {
var n int
var err error
@@ -289,55 +293,20 @@ func (t *Wrapper) poll() {
// We don't need this loop for correctness,
// but wireguard-go will skip an empty read,
// so we might as well avoid the send through t.outbound.
+ log.Println("TSTUN : BEFORE READ")
for n == 0 && err == nil {
+ log.Println("TSTUN : BEFORE READ IN FOR")
if t.isClosed() {
-=======
- for {
- log.Println("TSTUN: POLL START")
- select {
- case <-t.closed:
- return
- case <-t.bufferConsumed:
- // continue
- }
-
- log.Println("TSTUN: AFTER FIRST SELECT")
-
- // Read may use memory in t.buffer before PacketStartOffset for mandatory headers.
- // This is the rationale behind the tun.Wrapper.{Read,Write} interfaces
- // and the reason t.buffer has size MaxMessageSize and not MaxContentSize.
- n, err := t.tdev.Read(t.buffer[:], PacketStartOffset)
- log.Println("TSTUN : readerr,", err)
- if err != nil {
- select {
- case <-t.closed:
->>>>>>> Many logs
+ log.Println("TSTUN : BEFORE T CLOSED")
return
}
-<<<<<<< HEAD
n, err = t.tdev.Read(t.buffer[:], PacketStartOffset)
-=======
- continue
- }
-
- // Wireguard will skip an empty read,
- // so we might as well do it here to avoid the send through t.outbound.
- if n == 0 {
- t.bufferConsumed <- struct{}{}
- continue
- }
- log.Println("TSTUN : after err check", err)
-
- select {
- case <-t.closed:
- return
- case t.outbound <- t.buffer[PacketStartOffset : PacketStartOffset+n]:
- log.Println("TSTUN : Outbound Sent To")
- // continue
->>>>>>> Many logs
}
+ log.Println("TSTUN : BEFORE OUTBOUND")
t.outbound <- tunReadResult{data: t.buffer[PacketStartOffset : PacketStartOffset+n], err: err}
+ log.Println("TSTUN : sent to outbound")
}
+ log.Println("TSTUN : POLL FINISHED")
}
var magicDNSIPPort = netaddr.MustParseIPPort("100.100.100.100:0")
@@ -392,13 +361,16 @@ func (t *Wrapper) IdleDuration() time.Duration {
}
func (t *Wrapper) Read(buf []byte, offset int) (int, error) {
-<<<<<<< HEAD
+ now := time.Now()
res, ok := <-t.outbound
+ log.Println("TSTUN : outbound wait for channel read time, ", time.Since(now))
if !ok {
// Wrapper is closed.
+ log.Println("TSTUN : EOF")
return 0, io.EOF
}
if res.err != nil {
+ log.Println("TSTUN : err: ", res.err)
return 0, res.err
}
defer allowSendOnClosedChannel() // for send to t.bufferConsumed
@@ -411,42 +383,11 @@ func (t *Wrapper) Read(buf []byte, offset int) (int, error) {
if !isInjectedPacket {
// We are done with t.buffer. Let poll re-use it.
t.bufferConsumed <- struct{}{}
-=======
- log.Println("TSTUN: WRAPPERREAD")
- now := time.Now()
- var n int
-
- wasInjectedPacket := false
-
- log.Println("TSTUN : Before Select")
- select {
- case <-t.closed:
- log.Println("TSTUN : EOF")
- return 0, io.EOF
- case err := <-t.errors:
- log.Println("TSTUN : ", err)
- return 0, err
- case pkt := <-t.outbound:
- log.Println("TSTUN : t<-outbound")
- n = copy(buf[offset:], pkt)
- // t.buffer has a fixed location in memory,
- // so this is the easiest way to tell when it has been consumed.
- // &pkt[0] can be used because empty packets do not reach t.outbound.
- if &pkt[0] == &t.buffer[PacketStartOffset] {
- t.bufferConsumed <- struct{}{}
- } else {
- // If the packet is not from t.buffer, then it is an injected packet.
- wasInjectedPacket = true
- }
->>>>>>> Many logs
}
- log.Println("TSTUN : After SELECT, took ", time.Since(now))
-
p := parsedPacketPool.Get().(*packet.Parsed)
defer parsedPacketPool.Put(p)
p.Decode(buf[offset : offset+n])
- log.Println("TSTUN : After DECODE, took ", time.Since(now))
if m, ok := t.destIPActivity.Load().(map[netaddr.IP]func()); ok {
if fn := m[p.Dst.IP()]; fn != nil {
@@ -454,22 +395,8 @@ func (t *Wrapper) Read(buf []byte, offset int) (int, error) {
}
}
-<<<<<<< HEAD
// Do not filter injected packets.
if !isInjectedPacket && !t.disableFilter {
-=======
- log.Println("TSTUN : After DestIP, took ", time.Since(now))
-
- // For injected packets, we return early to bypass filtering.
- if wasInjectedPacket {
- log.Println("TSTUN : WasInjectedPacket, took ", time.Since(now))
- t.noteActivity()
- log.Println("TSTUN BODY: ", n)
- return n, nil
- }
-
- if !t.disableFilter {
->>>>>>> Many logs
response := t.filterOut(p)
if response != filter.Accept {
// Wireguard considers read errors fatal; pretend nothing was read
@@ -478,11 +405,13 @@ func (t *Wrapper) Read(buf []byte, offset int) (int, error) {
}
t.noteActivity()
- log.Printf("TSTUN: Full read took %vs", time.Since(now))
+ log.Printf("TSTUN : Read Completed in %v\n", time.Since(now).Seconds())
return n, nil
}
func (t *Wrapper) filterIn(buf []byte) filter.Response {
+ log.Println("TSTUN : Filter In called")
+ now := time.Now()
p := parsedPacketPool.Get().(*packet.Parsed)
defer parsedPacketPool.Put(p)
p.Decode(buf)
@@ -498,12 +427,14 @@ func (t *Wrapper) filterIn(buf []byte) filter.Response {
}
}
}
+ log.Println("TSTUN : Filter In After TSMP")
if t.PreFilterIn != nil {
if res := t.PreFilterIn(p, t); res.IsDrop() {
return res
}
}
+ log.Println("TSTUN : Filter In After PreFilter")
filt, _ := t.filter.Load().(*filter.Filter)
@@ -512,6 +443,7 @@ func (t *Wrapper) filterIn(buf []byte) filter.Response {
}
outcome := filt.RunIn(p, t.filterFlags)
+ log.Println("TSTUN : Filter In After Outcome")
// Let peerapi through the filter; its ACLs are handled at L7,
// not at the packet level.
@@ -523,14 +455,17 @@ func (t *Wrapper) filterIn(buf []byte) filter.Response {
outcome = filter.Accept
}
}
+ log.Println("TSTUN : Filter In After Outcome check2 type : ", outcome.String())
if outcome != filter.Accept {
+ log.Println("TSTUN : Filter In After Outcome check3")
// Tell them, via TSMP, we're dropping them due to the ACL.
// Their host networking stack can translate this into ICMP
// or whatnot as required. But notably, their GUI or tailscale CLI
// can show them a rejection history with reasons.
if p.IPVersion == 4 && p.IPProto == ipproto.TCP && p.TCPFlags&packet.TCPSyn != 0 && !t.disableTSMPRejected {
+ log.Println("TSTUN : Filter In After Outcome check4")
rj := packet.TailscaleRejectedHeader{
IPSrc: p.Dst.IP(),
IPDst: p.Src.IP(),
@@ -544,18 +479,21 @@ func (t *Wrapper) filterIn(buf []byte) filter.Response {
}
pkt := packet.Generate(rj, nil)
t.InjectOutbound(pkt)
+ log.Println("TSTUN : FilterIn Inject took ,", time.Since(now))
// TODO(bradfitz): also send a TCP RST, after the TSMP message.
}
return filter.Drop
}
+ log.Println("TSTUN : Filter In After Outcome check5")
if t.PostFilterIn != nil {
if res := t.PostFilterIn(p, t); res.IsDrop() {
return res
}
}
+ log.Println("TSTUN : Filter In After Outcome check6")
return filter.Accept
}
@@ -563,6 +501,7 @@ func (t *Wrapper) filterIn(buf []byte) filter.Response {
// Write accepts an incoming packet. The packet begins at buf[offset:],
// like wireguard-go/tun.Device.Write.
func (t *Wrapper) Write(buf []byte, offset int) (int, error) {
+ now := time.Now()
if !t.disableFilter {
if t.filterIn(buf[offset:]) != filter.Accept {
// If we're not accepting the packet, lie to wireguard-go and pretend
@@ -578,11 +517,16 @@ func (t *Wrapper) Write(buf []byte, offset int) (int, error) {
// device/receive.go: _, err = device.tun.device.Write(....)
//
// TODO(bradfitz): fix upstream interface docs, implementation.
+ log.Println("TSTUN : Write completed early in ", time.Since(now))
return len(buf), nil
}
}
+ // Causes a data race ???
+ // defer log.Println("TSTUN : Write completed in ", time.Since(now))
+ // debug.PrintStack()
t.noteActivity()
+ // log.Println("TSTUN : Write completed in ", time.Since(now))
return t.tdev.Write(buf, offset)
}
@@ -639,6 +583,7 @@ func (t *Wrapper) InjectInboundCopy(packet []byte) error {
}
func (t *Wrapper) injectOutboundPong(pp *packet.Parsed, req packet.TSMPPingRequest) {
+ now := time.Now()
pong := packet.TSMPPongReply{
Data: req.Data,
}
@@ -659,6 +604,7 @@ func (t *Wrapper) injectOutboundPong(pp *packet.Parsed, req packet.TSMPPingReque
}
t.InjectOutbound(packet.Generate(pong, nil))
+ log.Println("TSTUN : Inject outbound pong took ", time.Since(now))
}
// InjectOutbound makes the Wrapper device behave as if a packet
@@ -667,6 +613,8 @@ func (t *Wrapper) injectOutboundPong(pp *packet.Parsed, req packet.TSMPPingReque
// The injected packet will not pass through outbound filters.
// Injecting an empty packet is a no-op.
func (t *Wrapper) InjectOutbound(packet []byte) error {
+ log.Println("TSTUN: Inject Outbound")
+ now := time.Now()
if len(packet) > MaxPacketSize {
return errPacketTooBig
}
@@ -675,6 +623,7 @@ func (t *Wrapper) InjectOutbound(packet []byte) error {
}
defer allowSendOnClosedChannel() // for send to t.outbound
t.outbound <- tunReadResult{data: packet}
+ log.Println("TSTUN : Inject took ", time.Since(now))
return nil
}
diff --git a/tstest/integration/integration_test.go b/tstest/integration/integration_test.go
index a19862cc5..d003017e7 100644
--- a/tstest/integration/integration_test.go
+++ b/tstest/integration/integration_test.go
@@ -298,12 +298,12 @@ func TestTwoNodeConnectivity(t *testing.T) {
// Create two nodes and hope that logs come out correctly
n1 := newTestNode(t, env)
n1SocksAddrCh := n1.socks5AddrChan()
- d1 := n1.StartDaemon(t)
+ d1 := n1.StartDaemonPrefix(t, "Node1 ")
defer d1.Kill()
n2 := newTestNode(t, env)
n2SocksAddrCh := n2.socks5AddrChan()
- d2 := n2.StartDaemon(t)
+ d2 := n2.StartDaemonPrefix(t, "Node2 ")
defer d2.Kill()
n1Socks := n1.AwaitSocksAddr(t, n1SocksAddrCh)
@@ -402,11 +402,11 @@ func TestTwoNodeConnectivity(t *testing.T) {
}
// Read the bytes in
- // p := make([]byte, 1024)
- // _, err = dialerConn.Read(p)
- // if err != nil {
- // return err
- // }
+ p := make([]byte, 1024)
+ _, err = dialerConn.Read(p)
+ if err != nil {
+ return err
+ }
t.Logf("Time taken for this run : %vs", time.Since(now).Seconds())
return nil
}); err != nil {
@@ -609,9 +609,29 @@ func (d *Daemon) MustCleanShutdown(t testing.TB) {
}
}
+type PrefixedWriter struct {
+ prefix string
+ w io.Writer
+}
+
+func (p *PrefixedWriter) Write(b []byte) (int, error) {
+ var buf []byte
+ buf = append(buf, p.prefix...)
+ buf = append(buf, b...)
+ _, err := p.w.Write(buf)
+ if err != nil {
+ return 0, err
+ }
+ return len(b), err
+}
+
+func (n *testNode) StartDaemon(t testing.TB) *Daemon {
+ return n.StartDaemonPrefix(t, "")
+}
+
// StartDaemon starts the node's tailscaled, failing if it fails to
// start.
-func (n *testNode) StartDaemon(t testing.TB) *Daemon {
+func (n *testNode) StartDaemonPrefix(t testing.TB, prefix string) *Daemon {
cmd := exec.Command(n.env.Binaries.Daemon,
"--tun=userspace-networking",
"--state="+n.stateFile,
@@ -625,8 +645,8 @@ func (n *testNode) StartDaemon(t testing.TB) *Daemon {
)
cmd.Stderr = &nodeOutputParser{n: n}
if *verboseTailscaled {
- cmd.Stdout = os.Stdout
- cmd.Stderr = io.MultiWriter(cmd.Stderr, os.Stderr)
+ cmd.Stdout = &PrefixedWriter{prefix: prefix, w: os.Stdout}
+ cmd.Stderr = io.MultiWriter(cmd.Stderr, &PrefixedWriter{prefix: prefix, w: os.Stderr})
}
if err := cmd.Start(); err != nil {
t.Fatalf("starting tailscaled: %v", err)
diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go
index ba3bc5a24..83de1465f 100644
--- a/wgengine/magicsock/magicsock.go
+++ b/wgengine/magicsock/magicsock.go
@@ -1632,6 +1632,7 @@ func (c *Conn) receiveIPv4(b []byte) (n int, ep conn.Endpoint, err error) {
// ok is whether this read should be reported up to wireguard-go (our
// caller).
func (c *Conn) receiveIP(b []byte, ipp netaddr.IPPort, cache *ippEndpointCache) (ep conn.Endpoint, ok bool) {
+ log.Println("MS : Received IP")
if stun.Is(b) {
c.stunReceiveFunc.Load().(func([]byte, netaddr.IPPort))(b, ipp)
return nil, false
@@ -1639,6 +1640,7 @@ func (c *Conn) receiveIP(b []byte, ipp netaddr.IPPort, cache *ippEndpointCache)
if c.handleDiscoMessage(b, ipp) {
return nil, false
}
+ log.Println("MS : ReceivedIP called HandleDiscoMessage")
if !c.havePrivateKey.Get() {
// If we have no private key, we're logged out or
// stopped. Don't try to pass these wireguard packets
@@ -1685,6 +1687,7 @@ func (c *connBind) receiveDERP(b []byte) (n int, ep conn.Endpoint, err error) {
}
func (c *Conn) processDERPReadResult(dm derpReadResult, b []byte) (n int, ep conn.Endpoint) {
+ log.Println("MS : Process DERP READ RESULT")
if dm.copyBuf == nil {
return 0, nil
}
@@ -1699,8 +1702,10 @@ func (c *Conn) processDERPReadResult(dm derpReadResult, b []byte) (n int, ep con
ipp := netaddr.IPPortFrom(derpMagicIPAddr, uint16(regionID))
if c.handleDiscoMessage(b[:n], ipp) {
+ log.Println("MS : c.HandleDiscoMessage")
return 0, nil
}
+ log.Println("MS : ProcessDerp HandleDiscoMessage")
var (
didNoteRecvActivity bool
@@ -1764,6 +1769,7 @@ const (
)
func (c *Conn) sendDiscoMessage(dst netaddr.IPPort, dstKey tailcfg.NodeKey, dstDisco tailcfg.DiscoKey, m disco.Message, logLevel discoLogLevel) (sent bool, err error) {
+ log.Println("MS: SENDDISCOMESSAGE")
c.mu.Lock()
if c.closed {
c.mu.Unlock()
@@ -1936,6 +1942,7 @@ func (c *Conn) handleDiscoMessage(msg []byte, src netaddr.IPPort) (isDiscoMsg bo
switch dm := dm.(type) {
case *disco.Ping:
+ log.Println("Pinger", de)
c.handlePingLocked(dm, de, src, sender, peerNode)
case *disco.Pong:
log.Println("Ponger", de)
@@ -1957,6 +1964,7 @@ func (c *Conn) handleDiscoMessage(msg []byte, src netaddr.IPPort) (isDiscoMsg bo
go de.handleCallMeMaybe(dm)
}
}
+ log.Println("DISCO MESSAGE COMPLETE")
return
}
@@ -2655,6 +2663,7 @@ func (c *Conn) listenPacket(network string, host netaddr.IP, port uint16) (net.P
// If curPortFate is set to dropCurrentPort, no attempt is made to reuse
// the current port.
func (c *Conn) bindSocket(rucPtr **RebindingUDPConn, network string, curPortFate currentPortFate) error {
+ log.Println("MS: BINDSOCKET")
var host netaddr.IP
if inTest() && !c.simulatedNetwork {
switch network {
@@ -2842,6 +2851,7 @@ func (c *RebindingUDPConn) currentConn() net.PacketConn {
// ReadFrom reads a packet from c into b.
// It returns the number of bytes copied and the source address.
func (c *RebindingUDPConn) ReadFrom(b []byte) (int, net.Addr, error) {
+ log.Println("MS : ReadFrom ", c.pconn.LocalAddr())
for {
pconn := c.currentConn()
n, addr, err := pconn.ReadFrom(b)
@@ -2927,6 +2937,9 @@ func (c *RebindingUDPConn) closeLocked() error {
func (c *RebindingUDPConn) WriteTo(b []byte, addr net.Addr) (int, error) {
for {
+ log.Printf("MS : WRITETO %v", c.pconn.LocalAddr())
+ log.Println(string(b))
+ log.Println(b)
c.mu.Lock()
pconn := c.pconn
c.mu.Unlock()
@@ -2941,6 +2954,7 @@ func (c *RebindingUDPConn) WriteTo(b []byte, addr net.Addr) (int, error) {
continue
}
}
+ log.Println("MS : WRITETO complete ", n, err)
return n, err
}
}
@@ -2968,6 +2982,7 @@ func (c *blockForeverConn) ReadFrom(p []byte) (n int, addr net.Addr, err error)
}
func (c *blockForeverConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
+ log.Println("DROP WRITE")
// Silently drop writes.
return len(p), nil
}
diff --git a/wgengine/netstack/netstack.go b/wgengine/netstack/netstack.go
index 4bd00f041..c86641b43 100644
--- a/wgengine/netstack/netstack.go
+++ b/wgengine/netstack/netstack.go
@@ -399,6 +399,7 @@ func (ns *Impl) DialContextUDP(ctx context.Context, addr string) (*gonet.UDPConn
}
func (ns *Impl) injectOutbound() {
+ log.Println("NETSTACK Inject outbound")
for {
packetInfo, ok := ns.linkEP.ReadContext(context.Background())
if !ok {
@@ -431,6 +432,7 @@ func (ns *Impl) isLocalIP(ip netaddr.IP) bool {
}
func (ns *Impl) injectInbound(p *packet.Parsed, t *tstun.Wrapper) filter.Response {
+ log.Println("NETSTACK: injectInbound")
if ns.onlySubnets && ns.isLocalIP(p.Dst.IP()) {
// In hybrid ("only subnets") mode, bail out early if
// the traffic is destined for an actual Tailscale