go/net/psample: init

This adds a minimal Go implementation facilitating network packet
sampling based on the 'psample' kernel module.

The Metropolis kernel configuration was modified both to make this
change testable in a ktest and to make sure Metropolis will be able to
run the included code.

Change-Id: Ie6a4721455f26644b6be01aa6190cf87f21355f3
Reviewed-on: https://review.monogon.dev/c/monogon/+/1102
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/go.mod b/go.mod
index b4c7ced..44db341 100644
--- a/go.mod
+++ b/go.mod
@@ -96,6 +96,8 @@
 	github.com/lib/pq v1.10.6
 	github.com/mattn/go-shellwords v1.0.12
 	github.com/mdlayher/ethtool v0.0.0-20211028163843-288d040e9d60
+	github.com/mdlayher/genetlink v1.2.0
+	github.com/mdlayher/netlink v1.6.0
 	github.com/mdlayher/raw v0.1.0
 	github.com/opencontainers/runc v1.1.3
 	github.com/pierrec/lz4/v4 v4.1.14
@@ -276,8 +278,6 @@
 	github.com/mattn/go-isatty v0.0.14 // indirect
 	github.com/mattn/go-runewidth v0.0.13 // indirect
 	github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
-	github.com/mdlayher/genetlink v1.2.0 // indirect
-	github.com/mdlayher/netlink v1.6.0 // indirect
 	github.com/mdlayher/packet v0.0.0-20220221164757-67998ac0ff93 // indirect
 	github.com/mdlayher/socket v0.2.1 // indirect
 	github.com/miekg/dns v1.1.48 // indirect
diff --git a/go/net/psample/BUILD.bazel b/go/net/psample/BUILD.bazel
new file mode 100644
index 0000000..36e19d3
--- /dev/null
+++ b/go/net/psample/BUILD.bazel
@@ -0,0 +1,29 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+load("//metropolis/test/ktest:ktest.bzl", "ktest")
+
+go_library(
+    name = "psample",
+    srcs = ["subscriber.go"],
+    importpath = "source.monogon.dev/go/net/psample",
+    visibility = ["//visibility:public"],
+    deps = [
+        "@com_github_mdlayher_genetlink//:genetlink",
+        "@com_github_mdlayher_netlink//:netlink",
+    ],
+)
+
+go_test(
+    name = "psample_test",
+    srcs = ["psample_test.go"],
+    embed = [":psample"],
+    deps = [
+        "@com_github_google_gopacket//:gopacket",
+        "@com_github_google_gopacket//layers",
+        "@com_github_vishvananda_netlink//:netlink",
+        "@org_golang_x_sys//unix",
+    ],
+)
+
+ktest(
+    tester = ":psample_test",
+)
diff --git a/go/net/psample/psample_test.go b/go/net/psample/psample_test.go
new file mode 100644
index 0000000..925ae0d
--- /dev/null
+++ b/go/net/psample/psample_test.go
@@ -0,0 +1,239 @@
+// This test requires the following Linux kernel configuration options to be
+// set:
+//
+// CONFIG_NET_CLS_ACT
+// CONFIG_NET_CLS_MATCHALL
+// CONFIG_NET_SCHED
+// CONFIG_NET_SCH_INGRESS
+// CONFIG_PSAMPLE
+// CONFIG_NET_ACT_SAMPLE
+package psample
+
+import (
+	"errors"
+	"fmt"
+	"net"
+	"os"
+	"strings"
+	"syscall"
+	"testing"
+
+	"golang.org/x/sys/unix"
+
+	"github.com/google/gopacket"
+	"github.com/google/gopacket/layers"
+	"github.com/vishvananda/netlink"
+)
+
+// setupLink adds and brings up a named link suited for packet capture.
+func setupLink(t *testing.T, name string) netlink.Link {
+	t.Helper()
+
+	lnk := &netlink.Dummy{
+		LinkAttrs: netlink.LinkAttrs{Name: name},
+	}
+	if err := netlink.LinkAdd(lnk); err != nil {
+		t.Fatalf("while adding link: %v", err)
+	}
+	if err := netlink.LinkSetUp(lnk); err != nil {
+		t.Fatalf("while setting up link: %v", err)
+	}
+	return lnk
+}
+
+// setupQdisc registers a clsact qdisc, which works on both the ingress and
+// the egress. This is important as we'll be sampling packets leaving the test
+// interface 'lk'. The qdisc is registered with the link lk.
+// More on clsact: https://lwn.net/Articles/671458/
+func setupQdisc(t *testing.T, lk netlink.Link) netlink.GenericQdisc {
+	t.Helper()
+
+	qdisc := netlink.GenericQdisc{
+		QdiscAttrs: netlink.QdiscAttrs{
+			LinkIndex: lk.Attrs().Index,
+			Handle:    netlink.MakeHandle(0xffff, 0),
+			Parent:    netlink.HANDLE_CLSACT,
+		},
+		QdiscType: "clsact",
+	}
+	if err := netlink.QdiscAdd(&qdisc); err != nil {
+		t.Fatalf("while adding qdisc: %v", err)
+	}
+	return qdisc
+}
+
+// setupSamplingFilter adds a filter on 'lk' link that will sample packets
+// exiting the interface.
+func setupSamplingFilter(t *testing.T, lk netlink.Link) netlink.Filter {
+	t.Helper()
+
+	sa := netlink.NewSampleAction()
+	// Sampled packets can be assigned their distinct group, allowing for
+	// multiple flows to be sampled and analyzed separately at the same time.
+	sa.Group = 7
+	// Every 10th packet will be sampled.
+	sa.Rate = 10
+	// Packet samples will be truncated to sa.TruncSize.
+	sa.TruncSize = 1500
+
+	fcid := netlink.MakeHandle(1, 1)
+	filter := &netlink.MatchAll{
+		FilterAttrs: netlink.FilterAttrs{
+			LinkIndex: lk.Attrs().Index,
+			Parent:    netlink.HANDLE_MIN_EGRESS,
+			Priority:  1,
+			Protocol:  unix.ETH_P_ALL,
+		},
+		ClassId: fcid,
+		Actions: []netlink.Action{
+			sa,
+		},
+	}
+	if err := netlink.FilterAdd(filter); err != nil {
+		t.Fatalf("while adding filter: %v", err)
+	}
+	return filter
+}
+
+// packetAttrs contains the test attributes looked for in a packet sample.
+type packetAttrs struct {
+	// magic is the string expected to be found in the packet's application layer
+	// contents.
+	magic string
+	// oifIdx identifies the egress interface the packet is exiting.
+	oifIdx uint16
+}
+
+// match reports whether packet 'raw' matches attributes specified in 'pa'. An
+// error is returned if the packet left through the expected interface, but
+// carries no payload. Failures are returned instead of being reported through
+// testing.T, because match is called from a goroutine other than the one
+// running the test, where FailNow-style calls must not be made.
+func (pa packetAttrs) match(raw Packet) (bool, error) {
+	// Check the packet's indicated egress interface.
+	if raw.OutgoingInterfaceIndex != pa.oifIdx {
+		return false, nil
+	}
+
+	// Check the packet's payload.
+	if raw.Data == nil {
+		return false, errors.New("missing payload")
+	}
+	p := gopacket.NewPacket(raw.Data, layers.LayerTypeEthernet, gopacket.Default)
+	app := p.ApplicationLayer()
+	if app == nil {
+		return false, nil
+	}
+	return strings.Contains(string(app.Payload()), pa.magic), nil
+}
+
+// TestSampling ascertains that packet samples can be obtained through use of
+// this package's Subscribe(), and Receive().
+func TestSampling(t *testing.T) {
+	if os.Getenv("IN_KTEST") != "true" {
+		t.Skip("Not in ktest")
+	}
+
+	// Make sure 'psample' module is loaded.
+	if _, err := netlink.GenlFamilyGet("psample"); err != nil {
+		t.Fatalf("psample genetlink family unavailable - is CONFIG_PSAMPLE enabled? (%v)", err)
+	}
+
+	// Set up the test link/interface, and supply it with a network address,
+	// which will enable routing for the test packets.
+	var localA = netlink.Addr{
+		IPNet: &net.IPNet{
+			IP:   net.IPv4(10, 0, 0, 4),
+			Mask: net.CIDRMask(24, 32),
+		},
+	}
+	lk := setupLink(t, "if1")
+	if err := netlink.AddrAdd(lk, &localA); err != nil {
+		t.Fatalf("while adding network address: %v", err)
+	}
+
+	// Set up sampling.
+
+	setupQdisc(t, lk)
+	setupSamplingFilter(t, lk)
+
+	c, err := Subscribe()
+	if err != nil {
+		t.Fatalf("while subscribing to psample notifications: %v", err)
+	}
+	defer c.Close()
+
+	// Test case: we'll send UDP datagrams to a remote address within the network
+	// associated with link lk, expecting these packets to show up in the sampled
+	// egress traffic.
+	var dstA = netlink.Addr{
+		IPNet: &net.IPNet{
+			IP:   net.IPv4(10, 0, 0, 5),
+			Mask: net.CIDRMask(24, 32),
+		},
+	}
+
+	// The sampled packets are expected to:
+	// - egress from interface 'if1'
+	// - contain a magic payload
+	sa := packetAttrs{
+		magic:  "test datagram",
+		oifIdx: uint16(lk.Attrs().Index),
+	}
+
+	// Look for packets matching attributes defined in 'sa'. The outcome is sent
+	// over 'resC': nil as soon as the expected packet has been received, or an
+	// error otherwise. The goroutine returns right after. The channel is
+	// buffered, so that the send never blocks, even if the test has already
+	// failed for another reason. The test is failed only from the test
+	// goroutine below, as t.Fatalf must not be called from any other one.
+	resC := make(chan error, 1)
+	go func() {
+		for {
+			pkts, err := Receive(c)
+			// Receiving ENOBUFS is expected in this case. It signals that some of
+			// the sampled traffic could not have been captured, and had been
+			// dropped instead.
+			if err != nil && !errors.Is(err, syscall.ENOBUFS) {
+				resC <- fmt.Errorf("while receiving psamples: %w", err)
+				return
+			}
+			for _, raw := range pkts {
+				ok, err := sa.match(raw)
+				if err != nil {
+					resC <- err
+					return
+				}
+				if ok {
+					resC <- nil
+					return
+				}
+			}
+		}
+	}()
+
+	// Send out the test datagrams. Return as soon as the test succeeds.
+
+	dstE := net.JoinHostPort(dstA.IP.String(), "1234")
+	conn, err := net.Dial("udp", dstE)
+	if err != nil {
+		t.Fatalf("while dialing UDP address: %v", err)
+	}
+	defer conn.Close()
+
+	payload := []byte(sa.magic)
+	for {
+		select {
+		case err := <-resC:
+			if err != nil {
+				t.Fatal(err)
+			}
+			t.Logf("Got the expected packet sample.")
+			return
+		default:
+			// Write errors are deliberately ignored: datagrams are sent until a
+			// matching sample shows up, so a failed write is simply retried.
+			conn.Write(payload)
+		}
+	}
+}
diff --git a/go/net/psample/subscriber.go b/go/net/psample/subscriber.go
new file mode 100644
index 0000000..065c6b6
--- /dev/null
+++ b/go/net/psample/subscriber.go
@@ -0,0 +1,193 @@
+// Package psample provides a receiver for sampled network packets using the
+// Netlink psample interface.
+package psample
+
+import (
+	"fmt"
+
+	"github.com/mdlayher/genetlink"
+	"github.com/mdlayher/netlink"
+)
+
+// attrID identifies psample netlink message attributes.
+// Identifier numbers are based on psample kernel module sources:
+// https://git.kernel.org/pub/scm/linux/kernel/git/stable/linux.git/tree/include/uapi/linux/psample.h?h=v5.15.89#n5
+type attrID uint16
+
+const (
+	aIIfIndex      attrID = iota // u16
+	aOIfIndex                    // u16
+	aOrigSize                    // u32
+	aSampleGroup                 // u32
+	aGroupSeq                    // u32
+	aSampleRate                  // u32
+	aData                        // []byte
+	aGroupRefcount               // u32
+	aTunnel
+
+	aPad
+	aOutTC     // u16
+	aOutTCOCC  // u64
+	aLatency   // u64, nanoseconds
+	aTimestamp // u64, nanoseconds
+	aProto     // u16
+)
+
+// cmdSample is the psample generic netlink command carried by packet sample
+// notifications. It corresponds to PSAMPLE_CMD_SAMPLE in the kernel header
+// referenced above.
+const cmdSample = 0
+
+// Packet contains the sampled packet in its raw form, along with its
+// 'psample' metadata.
+type Packet struct {
+	// IncomingInterfaceIndex is the incoming interface index of the packet or 0
+	// if not applicable.
+	IncomingInterfaceIndex uint16
+	// OutgoingInterfaceIndex is the outgoing interface index of the packet or 0
+	// if not applicable.
+	OutgoingInterfaceIndex uint16
+	// OriginalSize is the packet's original size in bytes without any
+	// truncation.
+	OriginalSize uint32
+	// SampleGroup is the sample group to which this packet belongs. This is set
+	// by the sampling action and can be used to differentiate different
+	// sampling streams.
+	SampleGroup uint32
+	// GroupSequence is a monotonically-increasing counter of packets sampled
+	// for each sample group.
+	GroupSequence uint32
+	// SampleRate is the sampling rate (1 in SampleRate packets) used to capture
+	// this packet.
+	SampleRate uint32
+	// Data contains the packet data up to the specified size for truncation.
+	Data []byte
+
+	// The following attributes are only available on kernel versions 5.13+
+
+	// Latency is the sampled packet's latency as indicated by psample. It's
+	// expressed in nanoseconds.
+	Latency uint64
+	// Timestamp marks time of the packet's sampling. It's set by the kernel, and
+	// expressed in Unix nanoseconds.
+	Timestamp uint64
+}
+
+// decode converts raw generic netlink message attributes into a Packet. In
+// cases where some of the known psample attributes were left unspecified in
+// the message, appropriate Packet member variables will be left with their
+// zero values. An error is returned if the attributes could not be parsed, or
+// if the decoder reported a failure while reading any of the known ones.
+func decode(b []byte) (*Packet, error) {
+	ad, err := netlink.NewAttributeDecoder(b)
+	if err != nil {
+		return nil, err
+	}
+
+	var p Packet
+	for ad.Next() {
+		switch attrID(ad.Type()) {
+		case aIIfIndex:
+			p.IncomingInterfaceIndex = ad.Uint16()
+		case aOIfIndex:
+			p.OutgoingInterfaceIndex = ad.Uint16()
+		case aOrigSize:
+			p.OriginalSize = ad.Uint32()
+		case aSampleGroup:
+			p.SampleGroup = ad.Uint32()
+		case aGroupSeq:
+			p.GroupSequence = ad.Uint32()
+		case aSampleRate:
+			p.SampleRate = ad.Uint32()
+		case aData:
+			p.Data = ad.Bytes()
+		case aLatency:
+			p.Latency = ad.Uint64()
+		case aTimestamp:
+			p.Timestamp = ad.Uint64()
+		}
+	}
+	// Next stops on the first decoding failure, and the typed accessors record
+	// their own failures in the decoder as well. Err has to be consulted, or
+	// a malformed message would silently yield a partially filled Packet.
+	if err := ad.Err(); err != nil {
+		return nil, err
+	}
+	return &p, nil
+}
+
+// Subscribe returns a generic netlink connection that's already subscribed to
+// the "packets" psample multicast group, which makes it ready to receive
+// packet samples. Close should be called on the returned connection.
+func Subscribe() (*genetlink.Conn, error) {
+	// Create a netlink socket.
+	c, err := genetlink.Dial(nil)
+	if err != nil {
+		return nil, fmt.Errorf("while dialing netlink socket: %w", err)
+	}
+
+	// Lookup the netlink family id associated with psample kernel module.
+	f, err := c.GetFamily("psample")
+	if err != nil {
+		c.Close()
+		return nil, fmt.Errorf("couldn't lookup \"psample\" netlink family: %w", err)
+	}
+
+	// Lookup psample's packet sampling netlink multicast group.
+	var pktGrpID uint32
+	for _, mgrp := range f.Groups {
+		if mgrp.Name == "packets" {
+			pktGrpID = mgrp.ID
+			break
+		}
+	}
+	if pktGrpID == 0 {
+		c.Close()
+		return nil, fmt.Errorf("packets multicast group not found")
+	}
+
+	// Subscribe to 'packets' multicast group in order to receive packet
+	// samples.
+	if err := c.JoinGroup(pktGrpID); err != nil {
+		c.Close()
+		return nil, fmt.Errorf("couldn't join multicast group: %w", err)
+	}
+	return c, nil
+}
+
+// Receive waits for netlink notifications to arrive over c, and returns the
+// packet samples they carry. The result may be empty if none of the received
+// notifications was a packet sample. It may return an error wrapping
+// syscall.ENOBUFS (test for it with errors.Is), which indicates that the
+// kernel-side buffer of the netlink connection has overflowed and lost
+// packets. This is a transient error, calling Receive again will retrieve
+// future packet samples.
+func Receive(c *genetlink.Conn) ([]Packet, error) {
+	// Wait for the samples to arrive over generic netlink connection c.
+	gnms, nms, err := c.Receive()
+	if err != nil {
+		return nil, fmt.Errorf("while receiving netlink notifications: %w", err)
+	}
+
+	var pkts []Packet
+	for i := range nms {
+		// Only process multicast notifications.
+		if nms[i].Header.PID != 0 {
+			continue
+		}
+
+		// Only process packet samples, skipping other psample commands.
+		if gnms[i].Header.Command != cmdSample {
+			continue
+		}
+
+		// Iterate over the Generic Netlink attributes present in the message,
+		// extracting any relating to the sampled packet.
+		pkt, err := decode(gnms[i].Data)
+		if err != nil {
+			return nil, fmt.Errorf("while decoding netlink notification: %w", err)
+		}
+		pkts = append(pkts, *pkt)
+	}
+	return pkts, nil
+}
diff --git a/metropolis/test/ktest/BUILD b/metropolis/test/ktest/BUILD
index 0a5f5b3..952d7e8 100644
--- a/metropolis/test/ktest/BUILD
+++ b/metropolis/test/ktest/BUILD
@@ -14,7 +14,10 @@
     name = "ktest",
     embed = [":ktest_lib"],
     pure = "on",
-    visibility = ["//metropolis:__subpackages__"],
+    visibility = [
+        "//go/net/psample:__pkg__",
+        "//metropolis:__subpackages__",
+    ],
 )
 
 kconfig_patch(
@@ -41,6 +44,7 @@
     # This image is directly used by the ktest macro, thus it needs a pretty
     # wide visibility.
     visibility = [
+        "//go/net/psample:__pkg__",
         "//metropolis:__subpackages__",
     ],
 )
@@ -48,5 +52,8 @@
 filegroup(
     name = "test-script",
     srcs = ["run_ktest.sh"],
-    visibility = ["//metropolis:__subpackages__"],
+    visibility = [
+        "//go/net/psample:__pkg__",
+        "//metropolis:__subpackages__",
+    ],
 )
diff --git a/metropolis/test/ktest/init/BUILD.bazel b/metropolis/test/ktest/init/BUILD.bazel
index f6b540f..e48bcb2 100644
--- a/metropolis/test/ktest/init/BUILD.bazel
+++ b/metropolis/test/ktest/init/BUILD.bazel
@@ -12,5 +12,8 @@
     name = "init",
     embed = [":init_lib"],
     pure = "on",
-    visibility = ["//metropolis:__subpackages__"],
+    visibility = [
+        "//go/net/psample:__pkg__",
+        "//metropolis:__subpackages__",
+    ],
 )
diff --git a/third_party/linux/linux-metropolis.config b/third_party/linux/linux-metropolis.config
index 1877ff0..d13769d 100644
--- a/third_party/linux/linux-metropolis.config
+++ b/third_party/linux/linux-metropolis.config
@@ -884,6 +884,7 @@
 # end of Memory Management options
 
 CONFIG_NET=y
+CONFIG_NET_ACT_SAMPLE=y
 CONFIG_NET_INGRESS=y
 
 #
@@ -1120,7 +1121,10 @@
 # CONFIG_PHONET is not set
 # CONFIG_6LOWPAN is not set
 # CONFIG_IEEE802154 is not set
-# CONFIG_NET_SCHED is not set
+CONFIG_NET_CLS_ACT=y
+CONFIG_NET_CLS_MATCHALL=y
+CONFIG_NET_SCHED=y
+CONFIG_NET_SCH_INGRESS=y
 # CONFIG_DCB is not set
 # CONFIG_DNS_RESOLVER is not set
 # CONFIG_BATMAN_ADV is not set
@@ -1165,7 +1169,7 @@
 # CONFIG_CAIF is not set
 # CONFIG_CEPH_LIB is not set
 # CONFIG_NFC is not set
-# CONFIG_PSAMPLE is not set
+CONFIG_PSAMPLE=y
 # CONFIG_NET_IFE is not set
 # CONFIG_LWTUNNEL is not set
 CONFIG_DST_CACHE=y