summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--net/tunstats/stats.go367
-rw-r--r--net/tunstats/stats_test.go325
2 files changed, 692 insertions, 0 deletions
diff --git a/net/tunstats/stats.go b/net/tunstats/stats.go
new file mode 100644
index 000000000..9d1a8ae55
--- /dev/null
+++ b/net/tunstats/stats.go
@@ -0,0 +1,367 @@
+// Copyright (c) 2022 Tailscale Inc & AUTHORS All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package tunstats maintains statistics about connections
+// flowing through a TUN device (which operate at the IP layer).
+package tunstats
+
+import (
+ "encoding/binary"
+ "hash/maphash"
+ "math/bits"
+ "net/netip"
+ "sync"
+ "sync/atomic"
+
+ "tailscale.com/net/flowtrack"
+ "tailscale.com/types/ipproto"
+)
+
+// Statistics maintains counters for every connection.
+// All methods are safe for concurrent use.
+// The zero value is ready for use.
+type Statistics struct {
+ v4 hashTable[addrsPortsV4]
+ v6 hashTable[addrsPortsV6]
+}
+
+// Counts are statistics about a particular connection.
+type Counts struct {
+ TxPackets uint64 `json:"txPkts,omitempty"`
+ TxBytes uint64 `json:"txBytes,omitempty"`
+ RxPackets uint64 `json:"rxPkts,omitempty"`
+ RxBytes uint64 `json:"rxBytes,omitempty"`
+}
+
+const (
+ minTableLen = 8
+ maxProbeLen = 64
+)
+
+// hashTable is a hash table that uses open addressing with probing.
+// See https://en.wikipedia.org/wiki/Hash_table#Open_addressing.
+// The primary table is in the active field and can be retrieved atomically.
+// In the common case, this data structure is mostly lock free.
+//
+// If the current table is too small, a new table is allocated that
+// replaces the current active table. The contents of the older table are
+// NOT copied to the new table, but rather the older table is appended
+// to a list of outgrown tables. Re-growth happens under a lock,
+// but is expected to happen rarely as the table size grows exponentially.
+//
+// To reduce memory usage, the counters uses 32-bit unsigned integers,
+// which carry the risk of overflowing. If an overflow is detected,
+// we add the amount overflowed to the overflow map. This is a naive Go map
+// protected by a sync.Mutex. Overflow is rare that contention is not a concern.
+//
+// To extract all counters, we replace the active table with a zeroed table,
+// and clear out the outgrown and overflow tables.
+// We take advantage of the fact that all the tables can be merged together
+// by simply adding up all the counters for each connection.
+type hashTable[AddrsPorts addrsPorts] struct {
+ // TODO: Get rid of this. It is just an atomic update in the common case,
+ // but contention updating the same word still incurs a 25% performance hit.
+ mu sync.RWMutex // RLock held while updating, Lock held while extracting
+
+ active atomic.Pointer[countsTable[AddrsPorts]]
+ inserts atomic.Uint32 // heuristic for next active table to allocate
+
+ muGrow sync.Mutex // muGrow.Lock implies that mu.RLock held
+ outgrown []countsTable[AddrsPorts]
+
+ muOverflow sync.Mutex // muOverflow.Lock implies that mu.RLock held
+ overflow map[flowtrack.Tuple]Counts
+}
+
+type countsTable[AddrsPorts addrsPorts] []counts[AddrsPorts]
+
+func (t *countsTable[AddrsPorts]) len() int {
+ if t == nil {
+ return 0
+ }
+ return len(*t)
+}
+
+type counts[AddrsPorts addrsPorts] struct {
+ // initProto is both an initialization flag and the IP protocol.
+ // It is 0 if uninitialized, 1 if initializing, and
+ // 2+ipproto.Proto if initialized.
+ initProto atomic.Uint32
+
+ addrsPorts AddrsPorts // only valid if initProto is initialized
+
+ txPackets atomic.Uint32
+ txBytes atomic.Uint32
+ rxPackets atomic.Uint32
+ rxBytes atomic.Uint32
+}
+
+// NOTE: There is some degree of duplicated code.
+// For example, the functionality to swap the addrsPorts and compute the hash
+// should be performed by hashTable.update rather than Statistics.update.
+// However, Go generics cannot invoke pointer methods on addressable values.
+// See https://go.googlesource.com/proposal/+/refs/heads/master/design/43651-type-parameters.md#no-way-to-require-pointer-methods
+
+type addrsPorts interface {
+ comparable
+ asTuple(ipproto.Proto) flowtrack.Tuple
+}
+
+type addrsPortsV4 [4 + 4 + 2 + 2]byte
+
+func (x *addrsPortsV4) addrs() *[8]byte { return (*[8]byte)(x[:]) }
+func (x *addrsPortsV4) ports() *[4]byte { return (*[4]byte)(x[8:]) }
+func (x *addrsPortsV4) swap() {
+ *(*[4]byte)(x[0:]), *(*[4]byte)(x[4:]) = *(*[4]byte)(x[4:]), *(*[4]byte)(x[0:])
+ *(*[2]byte)(x[8:]), *(*[2]byte)(x[10:]) = *(*[2]byte)(x[10:]), *(*[2]byte)(x[8:])
+}
+func (x addrsPortsV4) asTuple(proto ipproto.Proto) flowtrack.Tuple {
+ return flowtrack.Tuple{Proto: proto,
+ Src: netip.AddrPortFrom(netip.AddrFrom4(*(*[4]byte)(x[0:])), binary.BigEndian.Uint16(x[8:])),
+ Dst: netip.AddrPortFrom(netip.AddrFrom4(*(*[4]byte)(x[4:])), binary.BigEndian.Uint16(x[10:])),
+ }
+}
+
+type addrsPortsV6 [16 + 16 + 2 + 2]byte
+
+func (x *addrsPortsV6) addrs() *[32]byte { return (*[32]byte)(x[:]) }
+func (x *addrsPortsV6) ports() *[4]byte { return (*[4]byte)(x[32:]) }
+func (x *addrsPortsV6) swap() {
+ *(*[16]byte)(x[0:]), *(*[16]byte)(x[16:]) = *(*[16]byte)(x[16:]), *(*[16]byte)(x[0:])
+ *(*[2]byte)(x[32:]), *(*[2]byte)(x[34:]) = *(*[2]byte)(x[34:]), *(*[2]byte)(x[32:])
+}
+func (x addrsPortsV6) asTuple(proto ipproto.Proto) flowtrack.Tuple {
+ return flowtrack.Tuple{Proto: proto,
+ Src: netip.AddrPortFrom(netip.AddrFrom16(*(*[16]byte)(x[0:])), binary.BigEndian.Uint16(x[32:])),
+ Dst: netip.AddrPortFrom(netip.AddrFrom16(*(*[16]byte)(x[16:])), binary.BigEndian.Uint16(x[34:])),
+ }
+}
+
+// UpdateTx updates the statistics for a transmitted IP packet.
+func (s *Statistics) UpdateTx(b []byte) {
+ s.update(b, false)
+}
+
+// UpdateRx updates the statistics for a received IP packet.
+func (s *Statistics) UpdateRx(b []byte) {
+ s.update(b, true)
+}
+
+var seed = maphash.MakeSeed()
+
+func (s *Statistics) update(b []byte, receive bool) {
+ switch {
+ case len(b) >= 20 && b[0]>>4 == 4: // IPv4
+ proto := ipproto.Proto(b[9])
+ hasPorts := proto == ipproto.TCP || proto == ipproto.UDP
+ var addrsPorts addrsPortsV4
+ if hdrLen := int(4 * (b[0] & 0xf)); hdrLen == 20 && len(b) >= 24 && hasPorts {
+ addrsPorts = *(*addrsPortsV4)(b[12:]) // addresses and ports are contiguous
+ } else {
+ *addrsPorts.addrs() = *(*[8]byte)(b[12:])
+ // May have IPv4 options in-between address and ports.
+ if len(b) >= hdrLen+4 && hasPorts {
+ *addrsPorts.ports() = *(*[4]byte)(b[hdrLen:])
+ }
+ }
+ if receive {
+ addrsPorts.swap()
+ }
+ hash := maphash.Bytes(seed, addrsPorts[:]) ^ uint64(proto) // TODO: Hash proto better?
+ s.v4.update(receive, proto, &addrsPorts, hash, uint32(len(b)))
+ return
+ case len(b) >= 40 && b[0]>>4 == 6: // IPv6
+ proto := ipproto.Proto(b[6])
+ hasPorts := proto == ipproto.TCP || proto == ipproto.UDP
+ var addrsPorts addrsPortsV6
+ if len(b) >= 44 && hasPorts {
+ addrsPorts = *(*addrsPortsV6)(b[8:]) // addresses and ports are contiguous
+ } else {
+ *addrsPorts.addrs() = *(*[32]byte)(b[8:])
+ // TODO: Support IPv6 extension headers?
+ if hdrLen := 40; len(b) > hdrLen+4 && hasPorts {
+ *addrsPorts.ports() = *(*[4]byte)(b[hdrLen:])
+ }
+ }
+ if receive {
+ addrsPorts.swap()
+ }
+ hash := maphash.Bytes(seed, addrsPorts[:]) ^ uint64(proto) // TODO: Hash proto better?
+ s.v6.update(receive, proto, &addrsPorts, hash, uint32(len(b)))
+ return
+ }
+ // TODO: Track malformed packets?
+}
+
+func (h *hashTable[AddrsPorts]) update(receive bool, proto ipproto.Proto, addrsPorts *AddrsPorts, hash uint64, size uint32) {
+ h.mu.RLock()
+ defer h.mu.RUnlock()
+
+ table := h.active.Load()
+ for {
+ // Start with an initialized table.
+ if table.len() == 0 {
+ table = h.grow(table)
+ }
+
+ // Try to update an entry in the currently active table.
+ for i := 0; i < len(*table) && i < maxProbeLen; i++ {
+ probe := uint64(i) // linear probing for small tables
+ if len(*table) > 2*maxProbeLen {
+ probe *= probe // quadratic probing for large tables
+ }
+ entry := &(*table)[(hash+probe)%uint64(len(*table))]
+
+ // Spin-lock waiting for the entry to be initialized,
+ // which should be quick as it only stores the AddrsPort.
+ retry:
+ switch initProto := entry.initProto.Load(); initProto {
+ case 0: // uninitialized
+ if !entry.initProto.CompareAndSwap(0, 1) {
+ goto retry // raced with another initialization attempt
+ }
+ entry.addrsPorts = *addrsPorts
+ entry.initProto.Store(uint32(proto) + 2) // initialization done
+ h.inserts.Add(1)
+ case 1: // initializing
+ goto retry
+ default: // initialized
+ if ipproto.Proto(initProto-2) != proto || entry.addrsPorts != *addrsPorts {
+ continue // this entry is for a different connection; try next entry
+ }
+ }
+
+ // Atomically update the counters for the connection entry.
+ var overflowPackets, overflowBytes bool
+ if receive {
+ overflowPackets = entry.rxPackets.Add(1) < 1
+ overflowBytes = entry.rxBytes.Add(size) < size
+ } else {
+ overflowPackets = entry.txPackets.Add(1) < 1
+ overflowBytes = entry.txBytes.Add(size) < size
+ }
+ if overflowPackets || overflowBytes {
+ h.updateOverflow(receive, proto, addrsPorts, overflowPackets, overflowBytes)
+ }
+ return
+ }
+
+ // Unable to update, so grow the table and try again.
+ // TODO: Use overflow map instead if table utilization is too low.
+ table = h.grow(table)
+ }
+}
+
+// grow grows the table unless the active table is larger than oldTable.
+func (h *hashTable[AddrsPorts]) grow(oldTable *countsTable[AddrsPorts]) (newTable *countsTable[AddrsPorts]) {
+ h.muGrow.Lock()
+ defer h.muGrow.Unlock()
+
+ if newTable = h.active.Load(); newTable.len() > oldTable.len() {
+ return newTable // raced with another grow
+ }
+ newTable = new(countsTable[AddrsPorts])
+ if oldTable.len() == 0 {
+ *newTable = make(countsTable[AddrsPorts], minTableLen)
+ } else {
+ *newTable = make(countsTable[AddrsPorts], 2*len(*oldTable))
+ h.outgrown = append(h.outgrown, *oldTable)
+ }
+ h.active.Store(newTable)
+ return newTable
+}
+
+// updateOverflow updates the overflow map for counters that overflowed.
+// Using 32-bit counters, this condition happens rarely as it only triggers
+// after every 4 GiB of unidirectional network traffic on the same connection.
+func (h *hashTable[AddrsPorts]) updateOverflow(receive bool, proto ipproto.Proto, addrsPorts *AddrsPorts, overflowPackets, overflowBytes bool) {
+ h.muOverflow.Lock()
+ defer h.muOverflow.Unlock()
+ if h.overflow == nil {
+ h.overflow = make(map[flowtrack.Tuple]Counts)
+ }
+ tuple := (*addrsPorts).asTuple(proto)
+ cnts := h.overflow[tuple]
+ if overflowPackets {
+ if receive {
+ cnts.RxPackets += 1 << 32
+ } else {
+ cnts.TxPackets += 1 << 32
+ }
+ }
+ if overflowBytes {
+ if receive {
+ cnts.RxBytes += 1 << 32
+ } else {
+ cnts.TxBytes += 1 << 32
+ }
+ }
+ h.overflow[tuple] = cnts
+}
+
+func (h *hashTable[AddrsPorts]) extractInto(out map[flowtrack.Tuple]Counts) {
+ // Allocate a new table based on previous usage.
+ var newTable *countsTable[AddrsPorts]
+ if numInserts := h.inserts.Load(); numInserts > 0 {
+ newLen := 1 << bits.Len(uint(4*numInserts/3)|uint(minTableLen-1))
+ newTable = new(countsTable[AddrsPorts])
+ *newTable = make(countsTable[AddrsPorts], newLen)
+ }
+
+ // Swap out the old tables for new tables.
+ // We do not need to lock h.muGrow or h.muOverflow since holding h.mu
+ // implies that nothing else could be holding those locks.
+ h.mu.Lock()
+ oldTable := h.active.Swap(newTable)
+ oldOutgrown := h.outgrown
+ oldOverflow := h.overflow
+ h.outgrown = nil
+ h.overflow = nil
+ h.inserts.Store(0)
+ h.mu.Unlock()
+
+ // Merge tables into output.
+ if oldTable != nil {
+ mergeTable(out, *oldTable)
+ }
+ for _, table := range oldOutgrown {
+ mergeTable(out, table)
+ }
+ mergeMap(out, oldOverflow)
+}
+
+// Extract extracts and resets the counters for all active connections.
+// It must be called periodically otherwise the memory used is unbounded.
+func (s *Statistics) Extract() map[flowtrack.Tuple]Counts {
+ out := make(map[flowtrack.Tuple]Counts)
+ s.v4.extractInto(out)
+ s.v6.extractInto(out)
+ return out
+}
+
+func mergeTable[AddrsPorts addrsPorts](dst map[flowtrack.Tuple]Counts, src countsTable[AddrsPorts]) {
+ for i := range src {
+ entry := &src[i]
+ if initProto := entry.initProto.Load(); initProto > 0 {
+ tuple := entry.addrsPorts.asTuple(ipproto.Proto(initProto - 2))
+ cnts := dst[tuple]
+ cnts.TxPackets += uint64(entry.txPackets.Load())
+ cnts.TxBytes += uint64(entry.txBytes.Load())
+ cnts.RxPackets += uint64(entry.rxPackets.Load())
+ cnts.RxBytes += uint64(entry.rxBytes.Load())
+ dst[tuple] = cnts
+ }
+ }
+}
+
+func mergeMap(dst, src map[flowtrack.Tuple]Counts) {
+ for tuple, cntsSrc := range src {
+ cntsDst := dst[tuple]
+ cntsDst.TxPackets += cntsSrc.TxPackets
+ cntsDst.TxBytes += cntsSrc.TxBytes
+ cntsDst.RxPackets += cntsSrc.RxPackets
+ cntsDst.RxBytes += cntsSrc.RxBytes
+ dst[tuple] = cntsDst
+ }
+}
diff --git a/net/tunstats/stats_test.go b/net/tunstats/stats_test.go
new file mode 100644
index 000000000..6992d99e6
--- /dev/null
+++ b/net/tunstats/stats_test.go
@@ -0,0 +1,325 @@
+// Copyright (c) 2022 Tailscale Inc & AUTHORS All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package tunstats
+
+import (
+ "encoding/binary"
+ "fmt"
+ "hash/maphash"
+ "math"
+ "runtime"
+ "sync"
+ "testing"
+
+ qt "github.com/frankban/quicktest"
+ "tailscale.com/net/flowtrack"
+ "tailscale.com/types/ipproto"
+)
+
+type SimpleStatistics struct {
+ mu sync.Mutex
+ m map[flowtrack.Tuple]Counts
+}
+
+func (s *SimpleStatistics) UpdateTx(b []byte) {
+ s.update(b, false)
+}
+func (s *SimpleStatistics) UpdateRx(b []byte) {
+ s.update(b, true)
+}
+func (s *SimpleStatistics) update(b []byte, receive bool) {
+ var tuple flowtrack.Tuple
+ var size uint64
+ if len(b) >= 1 {
+ // This logic is mostly copied from Statistics.update.
+ switch v := b[0] >> 4; {
+ case v == 4 && len(b) >= 20: // IPv4
+ proto := ipproto.Proto(b[9])
+ size = uint64(binary.BigEndian.Uint16(b[2:]))
+ var addrsPorts addrsPortsV4
+ *(*[8]byte)(addrsPorts[0:]) = *(*[8]byte)(b[12:])
+ if hdrLen := int(4 * (b[0] & 0xf)); len(b) >= hdrLen+4 && (proto == ipproto.TCP || proto == ipproto.UDP) {
+ *(*[4]byte)(addrsPorts[8:]) = *(*[4]byte)(b[hdrLen:])
+ }
+ if receive {
+ addrsPorts.swap()
+ }
+ tuple = addrsPorts.asTuple(proto)
+ case v == 6 && len(b) >= 40: // IPv6
+ proto := ipproto.Proto(b[6])
+ size = uint64(binary.BigEndian.Uint16(b[4:]))
+ var addrsPorts addrsPortsV6
+ *(*[32]byte)(addrsPorts[0:]) = *(*[32]byte)(b[8:])
+ if hdrLen := 40; len(b) > hdrLen+4 && (proto == ipproto.TCP || proto == ipproto.UDP) {
+ *(*[4]byte)(addrsPorts[32:]) = *(*[4]byte)(b[hdrLen:])
+ }
+ if receive {
+ addrsPorts.swap()
+ }
+ tuple = addrsPorts.asTuple(proto)
+ default:
+ return // non-IP packet
+ }
+ } else {
+ return // invalid packet
+ }
+
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if s.m == nil {
+ s.m = make(map[flowtrack.Tuple]Counts)
+ }
+ cnts := s.m[tuple]
+ if receive {
+ cnts.RxPackets++
+ cnts.RxBytes += size
+ } else {
+ cnts.TxPackets++
+ cnts.TxBytes += size
+ }
+ s.m[tuple] = cnts
+}
+
+func TestEmpty(t *testing.T) {
+ c := qt.New(t)
+ var s Statistics
+ c.Assert(s.Extract(), qt.DeepEquals, map[flowtrack.Tuple]Counts{})
+ c.Assert(s.Extract(), qt.DeepEquals, map[flowtrack.Tuple]Counts{})
+}
+
+func TestOverflow(t *testing.T) {
+ c := qt.New(t)
+ var s Statistics
+ var cnts Counts
+
+ a := &addrsPortsV4{192, 168, 0, 1, 192, 168, 0, 2, 12, 34, 56, 78}
+ h := maphash.Bytes(seed, a[:])
+
+ cnts.TxPackets++
+ cnts.TxBytes += math.MaxUint32
+ s.v4.update(false, ipproto.UDP, a, h, math.MaxUint32)
+ for i := 0; i < 1e6; i++ {
+ cnts.TxPackets++
+ cnts.TxBytes += uint64(i)
+ s.v4.update(false, ipproto.UDP, a, h, uint32(i))
+ }
+ c.Assert(s.Extract(), qt.DeepEquals, map[flowtrack.Tuple]Counts{a.asTuple(ipproto.UDP): cnts})
+ c.Assert(s.Extract(), qt.DeepEquals, map[flowtrack.Tuple]Counts{})
+}
+
+func FuzzParse(f *testing.F) {
+ f.Fuzz(func(t *testing.T, b []byte) {
+ var s Statistics
+ s.UpdateRx(b) // must not panic
+ s.UpdateTx(b) // must not panic
+ s.Extract() // must not panic
+ })
+}
+
+var testV4 = func() (b [24]byte) {
+ b[0] = 4<<4 | 5 // version and header length
+ binary.BigEndian.PutUint16(b[2:], 1234) // size
+ b[9] = byte(ipproto.UDP) // protocol
+ *(*[4]byte)(b[12:]) = [4]byte{192, 168, 0, 1} // src addr
+ *(*[4]byte)(b[16:]) = [4]byte{192, 168, 0, 2} // dst addr
+ binary.BigEndian.PutUint16(b[20:], 456) // src port
+ binary.BigEndian.PutUint16(b[22:], 789) // dst port
+ return b
+}()
+
+/*
+func BenchmarkA(b *testing.B) {
+ b.ReportAllocs()
+ for i := 0; i < b.N; i++ {
+ var s Statistics
+ for j := 0; j < 1e3; j++ {
+ s.UpdateTx(testV4[:])
+ }
+ }
+}
+
+func BenchmarkB(b *testing.B) {
+ b.ReportAllocs()
+ for i := 0; i < b.N; i++ {
+ var s SimpleStatistics
+ for j := 0; j < 1e3; j++ {
+ s.UpdateTx(testV4[:])
+ }
+ }
+}
+
+func BenchmarkC(b *testing.B) {
+ b.ReportAllocs()
+ for i := 0; i < b.N; i++ {
+ var s Statistics
+ var group sync.WaitGroup
+ for k := 0; k < runtime.NumCPU(); k++ {
+ group.Add(1)
+ go func(k int) {
+ defer group.Done()
+ b := testV4
+ for j := 0; j < 1e3; j++ {
+ binary.LittleEndian.PutUint32(b[12:], uint32(k))
+ binary.LittleEndian.PutUint32(b[16:], uint32(j))
+ s.UpdateTx(b[:])
+ }
+ }(k)
+ }
+ group.Wait()
+ }
+}
+
+func BenchmarkD(b *testing.B) {
+ b.ReportAllocs()
+ for i := 0; i < b.N; i++ {
+ var s SimpleStatistics
+ var group sync.WaitGroup
+ for k := 0; k < runtime.NumCPU(); k++ {
+ group.Add(1)
+ go func(k int) {
+ defer group.Done()
+ b := testV4
+ for j := 0; j < 1e3; j++ {
+ binary.LittleEndian.PutUint32(b[12:], uint32(k))
+ binary.LittleEndian.PutUint32(b[16:], uint32(j))
+ s.UpdateTx(b[:])
+ }
+ }(k)
+ }
+ group.Wait()
+ }
+}
+*/
+
+// FUZZ
+// Benchmark:
+// IPv4 vs IPv6
+// single vs all cores
+// same vs unique addresses
+
+/*
+linear probing
+
+ 1 => 115595714 ns/op 859003746 B/op
+ 2 => 9355585 ns/op 46454947 B/op
+ 4 => 3301663 ns/op 8706967 B/op
+ 8 => 2775162 ns/op 4176433 B/op
+ 16 => 2517899 ns/op 2099434 B/op
+ 32 => 2397939 ns/op 2098986 B/op
+ 64 => 2118390 ns/op 1197352 B/op
+ 128 => 2029255 ns/op 1046729 B/op
+ 256 => 2069939 ns/op 1042577 B/op
+
+quadratic probing
+
+ 1 => 111134367 ns/op 825962200 B/op
+ 2 => 8061189 ns/op 45106117 B/op
+ 4 => 3216728 ns/op 8079556 B/op
+ 8 => 2576443 ns/op 2355890 B/op
+ 16 => 2471713 ns/op 2097196 B/op
+ 32 => 2108294 ns/op 1050225 B/op
+ 64 => 1964441 ns/op 1048736 B/op
+ 128 => 2118538 ns/op 1046663 B/op
+ 256 => 1968353 ns/op 1042568 B/op
+ 512 => 2049336 ns/op 1034306 B/op
+ 1024 => 2001605 ns/op 1017786 B/op
+ 2048 => 2046972 ns/op 984988 B/op
+ 4096 => 2108753 ns/op 919105 B/op
+*/
+
+func testPacketV4(proto ipproto.Proto, srcAddr, dstAddr [4]byte, srcPort, dstPort, size uint16) (out []byte) {
+ var ipHdr [20]byte
+ ipHdr[0] = 4<<4 | 5
+ binary.BigEndian.PutUint16(ipHdr[2:], size)
+ ipHdr[9] = byte(proto)
+ *(*[4]byte)(ipHdr[12:]) = srcAddr
+ *(*[4]byte)(ipHdr[16:]) = dstAddr
+ out = append(out, ipHdr[:]...)
+ switch proto {
+ case ipproto.TCP:
+ var tcpHdr [20]byte
+ binary.BigEndian.PutUint16(tcpHdr[0:], srcPort)
+ binary.BigEndian.PutUint16(tcpHdr[2:], dstPort)
+ out = append(out, tcpHdr[:]...)
+ case ipproto.UDP:
+ var udpHdr [8]byte
+ binary.BigEndian.PutUint16(udpHdr[0:], srcPort)
+ binary.BigEndian.PutUint16(udpHdr[2:], dstPort)
+ out = append(out, udpHdr[:]...)
+ default:
+ panic(fmt.Sprintf("unknown proto: %d", proto))
+ }
+ return append(out, make([]byte, int(size)-len(out))...)
+}
+
+func Benchmark(b *testing.B) {
+ b.Run("SingleRoutine/SameConn", func(b *testing.B) {
+ p := testPacketV4(ipproto.UDP, [4]byte{192, 168, 0, 1}, [4]byte{192, 168, 0, 2}, 123, 456, 789)
+ b.ResetTimer()
+ b.ReportAllocs()
+ for i := 0; i < b.N; i++ {
+ var s Statistics
+ for j := 0; j < 1e3; j++ {
+ s.UpdateTx(p)
+ }
+ }
+ })
+ b.Run("SingleRoutine/UniqueConns", func(b *testing.B) {
+ p := testPacketV4(ipproto.UDP, [4]byte{}, [4]byte{}, 0, 0, 789)
+ b.ResetTimer()
+ b.ReportAllocs()
+ for i := 0; i < b.N; i++ {
+ var s Statistics
+ for j := 0; j < 1e3; j++ {
+ binary.BigEndian.PutUint32(p[20:], uint32(j)) // unique port combination
+ s.UpdateTx(p)
+ }
+ }
+ })
+ b.Run("MultiRoutine/SameConn", func(b *testing.B) {
+ p := testPacketV4(ipproto.UDP, [4]byte{192, 168, 0, 1}, [4]byte{192, 168, 0, 2}, 123, 456, 789)
+ b.ResetTimer()
+ b.ReportAllocs()
+ for i := 0; i < b.N; i++ {
+ var s Statistics
+ var group sync.WaitGroup
+ for j := 0; j < runtime.NumCPU(); j++ {
+ group.Add(1)
+ go func() {
+ defer group.Done()
+ for k := 0; k < 1e3; k++ {
+ s.UpdateTx(p)
+ }
+ }()
+ }
+ group.Wait()
+ }
+ })
+ b.Run("MultiRoutine/UniqueConns", func(b *testing.B) {
+ ps := make([][]byte, runtime.NumCPU())
+ for i := range ps {
+ ps[i] = testPacketV4(ipproto.UDP, [4]byte{192, 168, 0, 1}, [4]byte{192, 168, 0, 2}, 0, 0, 789)
+ }
+ b.ResetTimer()
+ b.ReportAllocs()
+ for i := 0; i < b.N; i++ {
+ var s Statistics
+ var group sync.WaitGroup
+ for j := 0; j < runtime.NumCPU(); j++ {
+ group.Add(1)
+ go func(j int) {
+ defer group.Done()
+ p := ps[j]
+ j *= 1e3
+ for k := 0; k < 1e3; k++ {
+ binary.BigEndian.PutUint32(p[20:], uint32(j+k)) // unique port combination
+ s.UpdateTx(p)
+ }
+ }(j)
+ }
+ group.Wait()
+ }
+ })
+}