go/net/psample: init
This adds a minimal Go implementation facilitating network packet
sampling based on the 'psample' kernel module.
Metropolis kernel configuration was modified both in order for this
change to be testable in a ktest, as well as to make sure Metropolis
will be able to run the included code.
Change-Id: Ie6a4721455f26644b6be01aa6190cf87f21355f3
Reviewed-on: https://review.monogon.dev/c/monogon/+/1102
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/go.mod b/go.mod
index b4c7ced..44db341 100644
--- a/go.mod
+++ b/go.mod
@@ -96,6 +96,8 @@
github.com/lib/pq v1.10.6
github.com/mattn/go-shellwords v1.0.12
github.com/mdlayher/ethtool v0.0.0-20211028163843-288d040e9d60
+ github.com/mdlayher/genetlink v1.2.0
+ github.com/mdlayher/netlink v1.6.0
github.com/mdlayher/raw v0.1.0
github.com/opencontainers/runc v1.1.3
github.com/pierrec/lz4/v4 v4.1.14
@@ -276,8 +278,6 @@
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/mattn/go-runewidth v0.0.13 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
- github.com/mdlayher/genetlink v1.2.0 // indirect
- github.com/mdlayher/netlink v1.6.0 // indirect
github.com/mdlayher/packet v0.0.0-20220221164757-67998ac0ff93 // indirect
github.com/mdlayher/socket v0.2.1 // indirect
github.com/miekg/dns v1.1.48 // indirect
diff --git a/go/net/psample/BUILD.bazel b/go/net/psample/BUILD.bazel
new file mode 100644
index 0000000..36e19d3
--- /dev/null
+++ b/go/net/psample/BUILD.bazel
@@ -0,0 +1,29 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+load("//metropolis/test/ktest:ktest.bzl", "ktest")
+
+go_library(
+ name = "psample",
+ srcs = ["subscriber.go"],
+ importpath = "source.monogon.dev/go/net/psample",
+ visibility = ["//visibility:public"],
+ deps = [
+ "@com_github_mdlayher_genetlink//:genetlink",
+ "@com_github_mdlayher_netlink//:netlink",
+ ],
+)
+
+go_test(
+ name = "psample_test",
+ srcs = ["psample_test.go"],
+ embed = [":psample"],
+ deps = [
+ "@com_github_google_gopacket//:gopacket",
+ "@com_github_google_gopacket//layers",
+ "@com_github_vishvananda_netlink//:netlink",
+ "@org_golang_x_sys//unix",
+ ],
+)
+
+ktest(
+ tester = ":psample_test",
+)
diff --git a/go/net/psample/psample_test.go b/go/net/psample/psample_test.go
new file mode 100644
index 0000000..925ae0d
--- /dev/null
+++ b/go/net/psample/psample_test.go
@@ -0,0 +1,220 @@
+// This test requires the following Linux kernel configuration options to be
+// set:
+//
+// CONFIG_NET_CLS_ACT
+// CONFIG_NET_CLS_MATCHALL
+// CONFIG_NET_SCHED
+// CONFIG_NET_SCH_INGRESS
+// CONFIG_PSAMPLE
+// CONFIG_NET_ACT_SAMPLE
+package psample
+
+import (
+ "errors"
+ "net"
+ "os"
+ "strings"
+ "syscall"
+ "testing"
+
+ "golang.org/x/sys/unix"
+
+ "github.com/google/gopacket"
+ "github.com/google/gopacket/layers"
+ "github.com/vishvananda/netlink"
+)
+
+// setupLink creates a dummy link called 'name' and brings it up. The test is
+// aborted if either of these steps fails.
+func setupLink(t *testing.T, name string) netlink.Link {
+	t.Helper()
+
+	attrs := netlink.LinkAttrs{Name: name}
+	dummy := &netlink.Dummy{LinkAttrs: attrs}
+	if err := netlink.LinkAdd(dummy); err != nil {
+		t.Fatalf("while adding link: %v", err)
+	}
+	if err := netlink.LinkSetUp(dummy); err != nil {
+		t.Fatalf("while setting up link: %v", err)
+	}
+	return dummy
+}
+
+// setupQdisc attaches a clsact qdisc to link lk, and returns it. As clsact
+// works on both the ingress and the egress, it enables sampling of the packets
+// leaving the test interface.
+// More on clsact: https://lwn.net/Articles/671458/
+func setupQdisc(t *testing.T, lk netlink.Link) netlink.GenericQdisc {
+	t.Helper()
+
+	// Describe a qdisc of link lk whose parent is the special clsact handle.
+	attrs := netlink.QdiscAttrs{
+		LinkIndex: lk.Attrs().Index,
+		Handle:    netlink.MakeHandle(0xffff, 0),
+		Parent:    netlink.HANDLE_CLSACT,
+	}
+	clsact := netlink.GenericQdisc{QdiscAttrs: attrs, QdiscType: "clsact"}
+	// Register the qdisc, aborting the test on failure.
+	if err := netlink.QdiscAdd(&clsact); err != nil {
+		t.Fatalf("while adding qdisc: %v", err)
+	}
+	return clsact
+}
+
+// setupSamplingFilter installs a matchall filter on the egress hook of link
+// 'lk'. The filter carries a sample action, and is returned to the caller.
+func setupSamplingFilter(t *testing.T, lk netlink.Link) netlink.Filter {
+	t.Helper()
+
+	action := netlink.NewSampleAction()
+	// The group number tags the samples, so that multiple flows can be
+	// sampled at the same time, and still be analyzed separately.
+	action.Group = 7
+	// Sample one packet in every ten.
+	action.Rate = 10
+	// Packet samples get truncated to action.TruncSize.
+	action.TruncSize = 1500
+
+	// Attach the filter to the egress of lk, for packets of any protocol.
+	attrs := netlink.FilterAttrs{
+		LinkIndex: lk.Attrs().Index,
+		Parent:    netlink.HANDLE_MIN_EGRESS,
+		Priority:  1,
+		Protocol:  unix.ETH_P_ALL,
+	}
+	// The sample action is the only action carried by the filter.
+	matchAll := &netlink.MatchAll{
+		FilterAttrs: attrs,
+		ClassId:     netlink.MakeHandle(1, 1),
+		Actions:     []netlink.Action{action},
+	}
+	if err := netlink.FilterAdd(matchAll); err != nil {
+		t.Fatalf("while adding filter: %v", err)
+	}
+	return matchAll
+}
+
+// packetAttrs describes the packet sample that a test is waiting for.
+type packetAttrs struct {
+	// magic is a string that must occur in the application layer payload of
+	// the sampled packet.
+	magic string
+	// oifIdx is the index of the interface the sampled packet must egress from.
+	oifIdx uint16
+}
+
+// match reports whether the sample 'raw' egressed from interface pa.oifIdx and
+// carries pa.magic in its application layer payload. A sample lacking packet
+// data fails the test through t.Errorf. FailNow is avoided, as match may be
+// called from a goroutine other than the one running the test.
+func (pa packetAttrs) match(t *testing.T, raw Packet) bool {
+	t.Helper()
+
+	// Check the packet's indicated egress interface.
+	if raw.OutgoingInterfaceIndex != pa.oifIdx {
+		return false
+	}
+	// Check the packet's payload.
+	if raw.Data == nil {
+		t.Errorf("missing payload")
+		return false
+	}
+	// Decode the sample as an Ethernet frame to reach its application layer.
+	p := gopacket.NewPacket(raw.Data, layers.LayerTypeEthernet, gopacket.Default)
+	app := p.ApplicationLayer()
+	return app != nil && strings.Contains(string(app.Payload()), pa.magic)
+}
+
+// TestSampling ascertains that packet samples can be obtained through use of
+// this package's Subscribe(), and Receive(). It keeps sending test datagrams
+// until a matching sample has been received, or until receiving fails.
+func TestSampling(t *testing.T) {
+	if os.Getenv("IN_KTEST") != "true" {
+		t.Skip("Not in ktest")
+	}
+
+	// Make sure 'psample' module is loaded.
+	if _, err := netlink.GenlFamilyGet("psample"); err != nil {
+		t.Fatalf("psample genetlink family unavailable - is CONFIG_PSAMPLE enabled? %v", err)
+	}
+
+	// Set up the test link/interface, and supply it with a network address,
+	// which will enable routing for the test packets.
+	var localA = netlink.Addr{
+		IPNet: &net.IPNet{
+			IP:   net.IPv4(10, 0, 0, 4),
+			Mask: net.CIDRMask(24, 32),
+		},
+	}
+	lk := setupLink(t, "if1")
+	if err := netlink.AddrAdd(lk, &localA); err != nil {
+		t.Fatalf("while adding network address: %v", err)
+	}
+
+	// Set up sampling.
+	setupQdisc(t, lk)
+	setupSamplingFilter(t, lk)
+
+	c, err := Subscribe()
+	if err != nil {
+		t.Fatalf("while subscribing to psample notifications: %v", err)
+	}
+	defer c.Close()
+
+	// Test case: we'll send UDP datagrams to a remote address within the network
+	// associated with link lk, expecting these packets to show up in the sampled
+	// egress traffic.
+	dstE := net.JoinHostPort(net.IPv4(10, 0, 0, 5).String(), "1234")
+
+	// The sampled packets are expected to:
+	// - egress from interface 'if1'
+	// - contain a magic payload
+	sa := packetAttrs{
+		magic:  "test datagram",
+		oifIdx: uint16(lk.Attrs().Index),
+	}
+
+	// Look for packets matching attributes defined in 'sa'. The receiver closes
+	// 'resC' whenever it exits, having first sent any receive error over it.
+	// Failing the test is left to the test goroutine, as FailNow requires.
+	resC := make(chan error, 1)
+	go func() {
+		defer close(resC)
+		for {
+			pkts, err := Receive(c)
+			// ENOBUFS is expected here: it signals that some of the sampled
+			// traffic couldn't be captured, and was dropped instead.
+			if err != nil && !errors.Is(err, syscall.ENOBUFS) {
+				resC <- err
+				return
+			}
+			for _, raw := range pkts {
+				if sa.match(t, raw) {
+					t.Logf("Got the expected packet sample.")
+					return
+				}
+			}
+		}
+	}()
+
+	// Send out the test datagrams. Return as soon as the receiver is done.
+	conn, err := net.Dial("udp", dstE)
+	if err != nil {
+		t.Fatalf("while dialing UDP address: %v", err)
+	}
+	defer conn.Close()
+
+	for {
+		select {
+		case err := <-resC:
+			if err != nil {
+				t.Fatalf("while receiving psamples: %v", err)
+			}
+			return
+		default:
+			if _, err := conn.Write([]byte(sa.magic)); err != nil {
+				t.Fatalf("while sending test datagram: %v", err)
+			}
+		}
+	}
+}
diff --git a/go/net/psample/subscriber.go b/go/net/psample/subscriber.go
new file mode 100644
index 0000000..065c6b6
--- /dev/null
+++ b/go/net/psample/subscriber.go
@@ -0,0 +1,179 @@
+// Package psample provides a receiver for sampled network packets using the
+// Netlink psample interface.
+package psample
+
+import (
+ "fmt"
+
+ "github.com/mdlayher/genetlink"
+ "github.com/mdlayher/netlink"
+)
+
+// attrId identifies psample netlink message attributes. The constants below
+// follow the order of the attribute enum in the psample kernel module sources:
+// https://git.kernel.org/pub/scm/linux/kernel/git/stable/linux.git/tree/include/uapi/linux/psample.h?h=v5.15.89#n5
+type attrId uint16
+
+const (
+	aIIfIndex      attrId = iota // u16
+	aOIfIndex                    // u16
+	aOrigSize                    // u32
+	aSampleGroup                 // u32
+	aGroupSeq                    // u32
+	aSampleRate                  // u32
+	aData                        // []byte
+	aGroupRefcount               // u32
+	aTunnel                      // not decoded by this package
+
+	aPad       // not decoded by this package
+	aOutTC     // u16
+	aOutTCOCC  // u64
+	aLatency   // u64, nanoseconds
+	aTimestamp // u64, nanoseconds
+	aProto     // u16
+)
+
+// Packet is a single sampled packet in its raw form, along with the metadata
+// that 'psample' attached to it.
+type Packet struct {
+	// IncomingInterfaceIndex is the incoming interface index of the packet or 0
+	// if not applicable.
+	IncomingInterfaceIndex uint16
+	// OutgoingInterfaceIndex is the outgoing interface index of the packet or 0
+	// if not applicable.
+	OutgoingInterfaceIndex uint16
+	// OriginalSize is the size of the packet in bytes, as it was before any
+	// truncation took place.
+	OriginalSize uint32
+	// SampleGroup is the sample group to which this packet belongs. This is set
+	// by the sampling action and can be used to tell apart different sampling
+	// streams.
+	SampleGroup uint32
+	// GroupSequence is a monotonically-increasing counter of packets sampled
+	// for each sample group.
+	GroupSequence uint32
+	// SampleRate is the sampling rate (1 in SampleRate packets) used to capture
+	// this packet.
+	SampleRate uint32
+	// Data contains the packet data up to the specified size for truncation.
+	Data []byte
+
+	// The following attributes are only available on kernel versions 5.13+
+
+	// Latency is the sampled packet's latency as indicated by psample. It's
+	// expressed in nanoseconds.
+	Latency uint64
+	// Timestamp is the time at which the packet was sampled. It's set by the
+	// kernel, and expressed in Unix nanoseconds.
+	Timestamp uint64
+}
+
+// decode parses psample netlink attributes 'b' into a Packet. Fields lacking a
+// matching attribute stay zero. An error is returned if 'b' is malformed.
+func decode(b []byte) (*Packet, error) {
+	ad, err := netlink.NewAttributeDecoder(b)
+	if err != nil {
+		return nil, err
+	}
+	var p Packet
+	for ad.Next() {
+		switch attrId(ad.Type()) {
+		case aIIfIndex:
+			p.IncomingInterfaceIndex = ad.Uint16()
+		case aOIfIndex:
+			p.OutgoingInterfaceIndex = ad.Uint16()
+		case aOrigSize:
+			p.OriginalSize = ad.Uint32()
+		case aSampleGroup:
+			p.SampleGroup = ad.Uint32()
+		case aGroupSeq:
+			p.GroupSequence = ad.Uint32()
+		case aSampleRate:
+			p.SampleRate = ad.Uint32()
+		case aData:
+			p.Data = ad.Bytes()
+		case aLatency:
+			p.Latency = ad.Uint64()
+		case aTimestamp:
+			p.Timestamp = ad.Uint64()
+		}
+	}
+	if err := ad.Err(); err != nil { // set if iteration stopped on a bad attribute
+		return nil, err
+	}
+	return &p, nil
+}
+
+// Subscribe opens a generic netlink connection and joins the "packets"
+// multicast group of the psample family, which makes the connection ready to
+// be passed to Receive. The caller should Close the returned connection once
+// it's no longer needed.
+func Subscribe() (*genetlink.Conn, error) {
+	conn, err := genetlink.Dial(nil)
+	if err != nil {
+		return nil, fmt.Errorf("while dialing netlink socket: %w", err)
+	}
+
+	// Resolve the generic netlink family of the psample kernel module.
+	family, err := conn.GetFamily("psample")
+	if err != nil {
+		conn.Close()
+		return nil, fmt.Errorf("couldn't lookup \"psample\" netlink family: %w", err)
+	}
+
+	// Find the ID of the multicast group named "packets". An ID of zero is
+	// taken to mean that the group wasn't found.
+	var groupID uint32
+	for i := range family.Groups {
+		if g := family.Groups[i]; g.Name == "packets" {
+			groupID = g.ID
+			break
+		}
+	}
+	if groupID == 0 {
+		conn.Close()
+		return nil, fmt.Errorf("packets multicast group not found")
+	}
+
+	// Packet samples are received by members of the 'packets' group only.
+	if err := conn.JoinGroup(groupID); err != nil {
+		conn.Close()
+		return nil, fmt.Errorf("couldn't join multicast group: %w", err)
+	}
+	return conn, nil
+}
+
+// Receive waits for notifications to arrive over c, and returns the packet
+// samples decoded from them. Messages other than multicast psample samples
+// are skipped. An error wrapping syscall.ENOBUFS indicates that the
+// kernel-side buffer of the connection has overflowed and lost packets. It's
+// a transient error: calling Receive again will retrieve future samples.
+func Receive(c *genetlink.Conn) ([]Packet, error) {
+	// Wait for the next batch of messages on the generic netlink connection.
+	genlMsgs, nlMsgs, err := c.Receive()
+	if err != nil {
+		return nil, fmt.Errorf("while receiving netlink notifications: %w", err)
+	}
+
+	var samples []Packet
+	for i, nlMsg := range nlMsgs {
+		// Skip anything that isn't a psample multicast notification.
+		switch {
+		case nlMsg.Header.PID != 0:
+			// Only multicast notifications are processed.
+			continue
+		case genlMsgs[i].Header.Command != 0:
+			// PSAMPLE_CMD_SAMPLE should be zero in multicast notifications.
+			continue
+		}
+
+		// The generic netlink attributes present in the message describe
+		// the sampled packet.
+		sample, err := decode(genlMsgs[i].Data)
+		if err != nil {
+			return nil, fmt.Errorf("while decoding netlink notification: %w", err)
+		}
+		samples = append(samples, *sample)
+	}
+	return samples, nil
+}
diff --git a/metropolis/test/ktest/BUILD b/metropolis/test/ktest/BUILD
index 0a5f5b3..952d7e8 100644
--- a/metropolis/test/ktest/BUILD
+++ b/metropolis/test/ktest/BUILD
@@ -14,7 +14,10 @@
name = "ktest",
embed = [":ktest_lib"],
pure = "on",
- visibility = ["//metropolis:__subpackages__"],
+ visibility = [
+ "//go/net/psample:__pkg__",
+ "//metropolis:__subpackages__",
+ ],
)
kconfig_patch(
@@ -41,6 +44,7 @@
# This image is directly used by the ktest macro, thus it needs a pretty
# wide visibility.
visibility = [
+ "//go/net/psample:__pkg__",
"//metropolis:__subpackages__",
],
)
@@ -48,5 +52,8 @@
filegroup(
name = "test-script",
srcs = ["run_ktest.sh"],
- visibility = ["//metropolis:__subpackages__"],
+ visibility = [
+ "//go/net/psample:__pkg__",
+ "//metropolis:__subpackages__",
+ ],
)
diff --git a/metropolis/test/ktest/init/BUILD.bazel b/metropolis/test/ktest/init/BUILD.bazel
index f6b540f..e48bcb2 100644
--- a/metropolis/test/ktest/init/BUILD.bazel
+++ b/metropolis/test/ktest/init/BUILD.bazel
@@ -12,5 +12,8 @@
name = "init",
embed = [":init_lib"],
pure = "on",
- visibility = ["//metropolis:__subpackages__"],
+ visibility = [
+ "//go/net/psample:__pkg__",
+ "//metropolis:__subpackages__",
+ ],
)
diff --git a/third_party/linux/linux-metropolis.config b/third_party/linux/linux-metropolis.config
index 1877ff0..d13769d 100644
--- a/third_party/linux/linux-metropolis.config
+++ b/third_party/linux/linux-metropolis.config
@@ -884,6 +884,7 @@
# end of Memory Management options
CONFIG_NET=y
+CONFIG_NET_ACT_SAMPLE=y
CONFIG_NET_INGRESS=y
#
@@ -1120,7 +1121,10 @@
# CONFIG_PHONET is not set
# CONFIG_6LOWPAN is not set
# CONFIG_IEEE802154 is not set
-# CONFIG_NET_SCHED is not set
+CONFIG_NET_CLS_ACT=y
+CONFIG_NET_CLS_MATCHALL=y
+CONFIG_NET_SCHED=y
+CONFIG_NET_SCH_INGRESS=y
# CONFIG_DCB is not set
# CONFIG_DNS_RESOLVER is not set
# CONFIG_BATMAN_ADV is not set
@@ -1165,7 +1169,7 @@
# CONFIG_CAIF is not set
# CONFIG_CEPH_LIB is not set
# CONFIG_NFC is not set
-# CONFIG_PSAMPLE is not set
+CONFIG_PSAMPLE=y
# CONFIG_NET_IFE is not set
# CONFIG_LWTUNNEL is not set
CONFIG_DST_CACHE=y