treewide: update nftables to 0.3 with patches
This update nftables to 0.3. The bigger-buffer patch is no longer
needed, because it turns out that it is possible to increase the buffer
size using nftables.WithSockOptions, which k8s-nft-npc now does. I added
two new patches which implement features which we need for k8s-nft-npc.
I made upstream PRs for these which are still pending review.
Change-Id: Iefbf850147a4c6dfd110e356fb43e822f436e843
Reviewed-on: https://review.monogon.dev/c/monogon/+/3994
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/third_party/com_github_google_nftables/nftables-rule-handle.patch b/third_party/com_github_google_nftables/nftables-rule-handle.patch
new file mode 100644
index 0000000..df00fef
--- /dev/null
+++ b/third_party/com_github_google_nftables/nftables-rule-handle.patch
@@ -0,0 +1,636 @@
+commit b230daafa27f7cf22c9b9795aee2f0116f108a70
+Author: Jan Schär <jan@monogon.tech>
+Date: Mon Feb 24 10:52:11 2025 +0100
+
+ Set rule handle during flush
+
+ This change makes it possible to delete rules after inserting them,
+ without needing to query the rules first. Rules can be deleted both
+ before and after they are flushed. Additionally, this allows positioning
+ a new rule next to an existing rule, both before and after the existing
+ rule is flushed.
+
+ Upstream PR: https://github.com/google/nftables/pull/299
+
+diff --git a/chain.go b/chain.go
+index 4f4c0a5..f1853cf 100644
+--- a/chain.go
++++ b/chain.go
+@@ -140,7 +140,7 @@ func (cc *Conn) AddChain(c *Chain) *Chain {
+ {Type: unix.NFTA_CHAIN_TYPE, Data: []byte(c.Type + "\x00")},
+ })...)
+ }
+- cc.messages = append(cc.messages, netlink.Message{
++ cc.messages = append(cc.messages, netlinkMessage{
+ Header: netlink.Header{
+ Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_NEWCHAIN),
+ Flags: netlink.Request | netlink.Acknowledge | netlink.Create,
+@@ -161,7 +161,7 @@ func (cc *Conn) DelChain(c *Chain) {
+ {Type: unix.NFTA_CHAIN_NAME, Data: []byte(c.Name + "\x00")},
+ })
+
+- cc.messages = append(cc.messages, netlink.Message{
++ cc.messages = append(cc.messages, netlinkMessage{
+ Header: netlink.Header{
+ Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_DELCHAIN),
+ Flags: netlink.Request | netlink.Acknowledge,
+@@ -179,7 +179,7 @@ func (cc *Conn) FlushChain(c *Chain) {
+ {Type: unix.NFTA_RULE_TABLE, Data: []byte(c.Table.Name + "\x00")},
+ {Type: unix.NFTA_RULE_CHAIN, Data: []byte(c.Name + "\x00")},
+ })
+- cc.messages = append(cc.messages, netlink.Message{
++ cc.messages = append(cc.messages, netlinkMessage{
+ Header: netlink.Header{
+ Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_DELRULE),
+ Flags: netlink.Request | netlink.Acknowledge,
+diff --git a/conn.go b/conn.go
+index fef9c2a..6b10844 100644
+--- a/conn.go
++++ b/conn.go
+@@ -17,6 +17,7 @@ package nftables
+ import (
+ "errors"
+ "fmt"
++ "math"
+ "os"
+ "sync"
+ "syscall"
+@@ -38,12 +39,20 @@ type Conn struct {
+ TestDial nltest.Func // for testing only; passed to nltest.Dial
+ NetNS int // fd referencing the network namespace netlink will interact with.
+
+- lasting bool // establish a lasting connection to be used across multiple netlink operations.
+- mu sync.Mutex // protects the following state
+- messages []netlink.Message
+- err error
+- nlconn *netlink.Conn // netlink socket using NETLINK_NETFILTER protocol.
+- sockOptions []SockOption
++ lasting bool // establish a lasting connection to be used across multiple netlink operations.
++ mu sync.Mutex // protects the following state
++ messages []netlinkMessage
++ err error
++ nlconn *netlink.Conn // netlink socket using NETLINK_NETFILTER protocol.
++ sockOptions []SockOption
++ lastID uint32
++ allocatedIDs uint32
++}
++
++type netlinkMessage struct {
++ Header netlink.Header
++ Data []byte
++ handleReply func(reply netlink.Message) error
+ }
+
+ // ConnOption is an option to change the behavior of the nftables Conn returned by Open.
+@@ -168,24 +177,6 @@ func receiveAckAware(nlconn *netlink.Conn, sentMsgFlags netlink.HeaderFlags) ([]
+ return reply, nil
+ }
+
+- if len(reply) != 0 {
+- last := reply[len(reply)-1]
+- for re := last.Header.Type; (re&netlink.Overrun) == netlink.Overrun && (re&netlink.Done) != netlink.Done; re = last.Header.Type {
+- // we are not finished, the message is overrun
+- r, err := nlconn.Receive()
+- if err != nil {
+- return nil, err
+- }
+- reply = append(reply, r...)
+- last = reply[len(reply)-1]
+- }
+-
+- if last.Header.Type == netlink.Error && binaryutil.BigEndian.Uint32(last.Data[:4]) == 0 {
+- // we have already collected an ack
+- return reply, nil
+- }
+- }
+-
+ // Now we expect an ack
+ ack, err := nlconn.Receive()
+ if err != nil {
+@@ -193,8 +184,7 @@ func receiveAckAware(nlconn *netlink.Conn, sentMsgFlags netlink.HeaderFlags) ([]
+ }
+
+ if len(ack) == 0 {
+- // received an empty ack?
+- return reply, nil
++ return nil, errors.New("received an empty ack")
+ }
+
+ msg := ack[0]
+@@ -244,6 +234,7 @@ func (cc *Conn) Flush() error {
+ cc.mu.Lock()
+ defer func() {
+ cc.messages = nil
++ cc.allocatedIDs = 0
+ cc.mu.Unlock()
+ }()
+ if len(cc.messages) == 0 {
+@@ -259,15 +250,53 @@ func (cc *Conn) Flush() error {
+ }
+ defer func() { _ = closer() }()
+
+- if _, err := conn.SendMessages(batch(cc.messages)); err != nil {
++ messages, err := conn.SendMessages(batch(cc.messages))
++ if err != nil {
+ return fmt.Errorf("SendMessages: %w", err)
+ }
+
+ var errs error
++
++ // Fetch replies. Each message with the Echo flag triggers a reply of the same
++ // type. Additionally, if the first message of the batch has the Echo flag, we
++ // get a reply of type NFT_MSG_NEWGEN, which we ignore.
++ replyIndex := 0
++ for replyIndex < len(cc.messages) && cc.messages[replyIndex].Header.Flags&netlink.Echo == 0 {
++ replyIndex++
++ }
++ replies, err := conn.Receive()
++ for err == nil && len(replies) != 0 {
++ reply := replies[0]
++ if reply.Header.Type == netlink.Error && reply.Header.Sequence == messages[1].Header.Sequence {
++ // The next message is the acknowledgement for the first message in the
++ // batch; stop looking for replies.
++ break
++ } else if replyIndex < len(cc.messages) {
++ msg := messages[replyIndex+1]
++ if msg.Header.Sequence == reply.Header.Sequence && msg.Header.Type == reply.Header.Type {
++ err := cc.messages[replyIndex].handleReply(reply)
++ if err != nil {
++ errs = errors.Join(errs, err)
++ }
++ replyIndex++
++ for replyIndex < len(cc.messages) && cc.messages[replyIndex].Header.Flags&netlink.Echo == 0 {
++ replyIndex++
++ }
++ }
++ }
++ replies = replies[1:]
++ if len(replies) == 0 {
++ replies, err = conn.Receive()
++ }
++ }
++
+ // Fetch the requested acknowledgement for each message we sent.
+- for _, msg := range cc.messages {
+- if _, err := receiveAckAware(conn, msg.Header.Flags); err != nil {
+- if errors.Is(err, os.ErrPermission) || errors.Is(err, syscall.ENOBUFS) {
++ for i := range cc.messages {
++ if i != 0 {
++ _, err = conn.Receive()
++ }
++ if err != nil {
++ if errors.Is(err, os.ErrPermission) || errors.Is(err, syscall.ENOBUFS) || errors.Is(err, syscall.ENOMEM) {
+ // Kernel will only send one error to user space.
+ return err
+ }
+@@ -278,6 +307,9 @@ func (cc *Conn) Flush() error {
+ if errs != nil {
+ return fmt.Errorf("conn.Receive: %w", errs)
+ }
++ if replyIndex < len(cc.messages) {
++ return fmt.Errorf("missing reply for message %d in batch", replyIndex)
++ }
+
+ return nil
+ }
+@@ -287,7 +319,7 @@ func (cc *Conn) Flush() error {
+ func (cc *Conn) FlushRuleset() {
+ cc.mu.Lock()
+ defer cc.mu.Unlock()
+- cc.messages = append(cc.messages, netlink.Message{
++ cc.messages = append(cc.messages, netlinkMessage{
+ Header: netlink.Header{
+ Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_DELTABLE),
+ Flags: netlink.Request | netlink.Acknowledge | netlink.Create,
+@@ -346,26 +378,47 @@ func (cc *Conn) marshalExpr(fam byte, e expr.Any) []byte {
+ return b
+ }
+
+-func batch(messages []netlink.Message) []netlink.Message {
+- batch := []netlink.Message{
+- {
+- Header: netlink.Header{
+- Type: netlink.HeaderType(unix.NFNL_MSG_BATCH_BEGIN),
+- Flags: netlink.Request,
+- },
+- Data: extraHeader(0, unix.NFNL_SUBSYS_NFTABLES),
++func batch(messages []netlinkMessage) []netlink.Message {
++ batch := make([]netlink.Message, len(messages)+2)
++ batch[0] = netlink.Message{
++ Header: netlink.Header{
++ Type: netlink.HeaderType(unix.NFNL_MSG_BATCH_BEGIN),
++ Flags: netlink.Request,
+ },
++ Data: extraHeader(0, unix.NFNL_SUBSYS_NFTABLES),
+ }
+
+- batch = append(batch, messages...)
++ for i, msg := range messages {
++ batch[i+1] = netlink.Message{
++ Header: msg.Header,
++ Data: msg.Data,
++ }
++ }
+
+- batch = append(batch, netlink.Message{
++ batch[len(messages)+1] = netlink.Message{
+ Header: netlink.Header{
+ Type: netlink.HeaderType(unix.NFNL_MSG_BATCH_END),
+ Flags: netlink.Request,
+ },
+ Data: extraHeader(0, unix.NFNL_SUBSYS_NFTABLES),
+- })
++ }
+
+ return batch
+ }
++
++// allocateTransactionID allocates an identifier which is only valid in the
++// current transaction.
++func (cc *Conn) allocateTransactionID() uint32 {
++ if cc.allocatedIDs == math.MaxUint32 {
++ panic(fmt.Sprintf("trying to allocate more than %d IDs in a single nftables transaction", math.MaxUint32))
++ }
++ // To make it more likely to catch when a transaction ID is erroneously used
++ // in a later transaction, cc.lastID is not reset after each transaction;
++ // instead it is only reset once it rolls over from math.MaxUint32 to 0.
++ cc.allocatedIDs++
++ cc.lastID++
++ if cc.lastID == 0 {
++ cc.lastID = 1
++ }
++ return cc.lastID
++}
+diff --git a/flowtable.go b/flowtable.go
+index 93dbcb5..a35712f 100644
+--- a/flowtable.go
++++ b/flowtable.go
+@@ -142,7 +142,7 @@ func (cc *Conn) AddFlowtable(f *Flowtable) *Flowtable {
+ {Type: unix.NLA_F_NESTED | NFTA_FLOWTABLE_HOOK, Data: cc.marshalAttr(hookAttr)},
+ })...)
+
+- cc.messages = append(cc.messages, netlink.Message{
++ cc.messages = append(cc.messages, netlinkMessage{
+ Header: netlink.Header{
+ Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | NFT_MSG_NEWFLOWTABLE),
+ Flags: netlink.Request | netlink.Acknowledge | netlink.Create,
+@@ -162,7 +162,7 @@ func (cc *Conn) DelFlowtable(f *Flowtable) {
+ {Type: NFTA_FLOWTABLE_NAME, Data: []byte(f.Name)},
+ })
+
+- cc.messages = append(cc.messages, netlink.Message{
++ cc.messages = append(cc.messages, netlinkMessage{
+ Header: netlink.Header{
+ Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | NFT_MSG_DELFLOWTABLE),
+ Flags: netlink.Request | netlink.Acknowledge,
+diff --git a/obj.go b/obj.go
+index 3fcd6d7..60d6f76 100644
+--- a/obj.go
++++ b/obj.go
+@@ -124,7 +124,7 @@ func (cc *Conn) AddObj(o Obj) Obj {
+ attrs = append(attrs, netlink.Attribute{Type: unix.NLA_F_NESTED | unix.NFTA_OBJ_DATA, Data: data})
+ }
+
+- cc.messages = append(cc.messages, netlink.Message{
++ cc.messages = append(cc.messages, netlinkMessage{
+ Header: netlink.Header{
+ Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_NEWOBJ),
+ Flags: netlink.Request | netlink.Acknowledge | netlink.Create,
+@@ -146,7 +146,7 @@ func (cc *Conn) DeleteObject(o Obj) {
+ data := cc.marshalAttr(attrs)
+ data = append(data, cc.marshalAttr([]netlink.Attribute{{Type: unix.NLA_F_NESTED | unix.NFTA_OBJ_DATA}})...)
+
+- cc.messages = append(cc.messages, netlink.Message{
++ cc.messages = append(cc.messages, netlinkMessage{
+ Header: netlink.Header{
+ Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_DELOBJ),
+ Flags: netlink.Request | netlink.Acknowledge,
+diff --git a/rule.go b/rule.go
+index 0706834..7798150 100644
+--- a/rule.go
++++ b/rule.go
+@@ -30,6 +30,9 @@ var (
+ delRuleHeaderType = netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_DELRULE)
+ )
+
++// This constant is missing at unix.NFTA_RULE_POSITION_ID.
++const nfta_rule_position_id = 0xa
++
+ type ruleOperation uint32
+
+ // Possible PayloadOperationType values.
+@@ -42,15 +45,27 @@ const (
+ // A Rule does something with a packet. See also
+ // https://wiki.nftables.org/wiki-nftables/index.php/Simple_rule_management
+ type Rule struct {
+- Table *Table
+- Chain *Chain
++ Table *Table
++ Chain *Chain
++ // Position can be set to the Handle of another Rule to insert the new Rule
++ // before (InsertRule) or after (AddRule) the existing rule.
+ Position uint64
+- Handle uint64
+ // The list of possible flags are specified by nftnl_rule_attr, see
+ // https://git.netfilter.org/libnftnl/tree/include/libnftnl/rule.h#n21
+ // Current nftables go implementation supports only
+ // NFTNL_RULE_POSITION flag for setting rule at position 0
+- Flags uint32
++ Flags uint32
++ // PositionID can be set to the ID of another Rule, same as Position, for when
++ // the existing rule is not yet committed.
++ PositionID uint32
++ // Handle identifies an existing Rule. For a new Rule, this field is set
++ // during the Flush() in which the rule is committed. Make sure to not access
++ // this field concurrently with this Flush() to avoid data races.
++ Handle uint64
++ // ID is an identifier for a new Rule, which is assigned by
++ // AddRule/InsertRule, and only valid before the rule is committed by Flush().
++ // The field is set to 0 during Flush().
++ ID uint32
+ Exprs []expr.Any
+ UserData []byte
+ }
+@@ -81,7 +96,7 @@ func (cc *Conn) GetRules(t *Table, c *Chain) ([]*Rule, error) {
+ message := netlink.Message{
+ Header: netlink.Header{
+ Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_GETRULE),
+- Flags: netlink.Request | netlink.Acknowledge | netlink.Dump | unix.NLM_F_ECHO,
++ Flags: netlink.Request | netlink.Acknowledge | netlink.Dump,
+ },
+ Data: append(extraHeader(uint8(t.Family), 0), data...),
+ }
+@@ -106,7 +121,6 @@ func (cc *Conn) GetRules(t *Table, c *Chain) ([]*Rule, error) {
+ return rules, nil
+ }
+
+-// AddRule adds the specified Rule
+ func (cc *Conn) newRule(r *Rule, op ruleOperation) *Rule {
+ cc.mu.Lock()
+ defer cc.mu.Unlock()
+@@ -127,6 +141,11 @@ func (cc *Conn) newRule(r *Rule, op ruleOperation) *Rule {
+ data = append(data, cc.marshalAttr([]netlink.Attribute{
+ {Type: unix.NFTA_RULE_HANDLE, Data: binaryutil.BigEndian.PutUint64(r.Handle)},
+ })...)
++ } else {
++ r.ID = cc.allocateTransactionID()
++ data = append(data, cc.marshalAttr([]netlink.Attribute{
++ {Type: unix.NFTA_RULE_ID, Data: binaryutil.BigEndian.PutUint32(r.ID)},
++ })...)
+ }
+
+ data = append(data, cc.marshalAttr([]netlink.Attribute{
+@@ -147,43 +166,77 @@ func (cc *Conn) newRule(r *Rule, op ruleOperation) *Rule {
+ msgData := []byte{}
+
+ msgData = append(msgData, data...)
+- var flags netlink.HeaderFlags
+ if r.UserData != nil {
+ msgData = append(msgData, cc.marshalAttr([]netlink.Attribute{
+ {Type: unix.NFTA_RULE_USERDATA, Data: r.UserData},
+ })...)
+ }
+
++ var flags netlink.HeaderFlags
++ var handleReply func(reply netlink.Message) error
+ switch op {
+ case operationAdd:
+- flags = netlink.Request | netlink.Acknowledge | netlink.Create | unix.NLM_F_ECHO | unix.NLM_F_APPEND
++ flags = netlink.Request | netlink.Acknowledge | netlink.Create | netlink.Echo | netlink.Append
++ handleReply = r.handleCreateReply
+ case operationInsert:
+- flags = netlink.Request | netlink.Acknowledge | netlink.Create | unix.NLM_F_ECHO
++ flags = netlink.Request | netlink.Acknowledge | netlink.Create | netlink.Echo
++ handleReply = r.handleCreateReply
+ case operationReplace:
+- flags = netlink.Request | netlink.Acknowledge | netlink.Replace | unix.NLM_F_ECHO | unix.NLM_F_REPLACE
++ flags = netlink.Request | netlink.Acknowledge | netlink.Replace
+ }
+
+ if r.Position != 0 || (r.Flags&(1<<unix.NFTA_RULE_POSITION)) != 0 {
+ msgData = append(msgData, cc.marshalAttr([]netlink.Attribute{
+ {Type: unix.NFTA_RULE_POSITION, Data: binaryutil.BigEndian.PutUint64(r.Position)},
+ })...)
++ } else if r.PositionID != 0 {
++ msgData = append(msgData, cc.marshalAttr([]netlink.Attribute{
++ {Type: nfta_rule_position_id, Data: binaryutil.BigEndian.PutUint32(r.PositionID)},
++ })...)
+ }
+
+- cc.messages = append(cc.messages, netlink.Message{
++ cc.messages = append(cc.messages, netlinkMessage{
+ Header: netlink.Header{
+ Type: newRuleHeaderType,
+ Flags: flags,
+ },
+- Data: append(extraHeader(uint8(r.Table.Family), 0), msgData...),
++ Data: append(extraHeader(uint8(r.Table.Family), 0), msgData...),
++ handleReply: handleReply,
+ })
+
+ return r
+ }
+
++func (r *Rule) handleCreateReply(reply netlink.Message) error {
++ ad, err := netlink.NewAttributeDecoder(reply.Data[4:])
++ if err != nil {
++ return err
++ }
++ ad.ByteOrder = binary.BigEndian
++ var handle uint64
++ for ad.Next() {
++ switch ad.Type() {
++ case unix.NFTA_RULE_HANDLE:
++ handle = ad.Uint64()
++ }
++ }
++ if ad.Err() != nil {
++ return ad.Err()
++ }
++ if handle == 0 {
++ return fmt.Errorf("missing rule handle in create reply")
++ }
++ r.Handle = handle
++ r.ID = 0
++ return nil
++}
++
+ func (cc *Conn) ReplaceRule(r *Rule) *Rule {
+ return cc.newRule(r, operationReplace)
+ }
+
++// AddRule inserts the specified Rule after the existing Rule referenced by
++// Position/PositionID if set, otherwise at the end of the chain.
+ func (cc *Conn) AddRule(r *Rule) *Rule {
+ if r.Handle != 0 {
+ return cc.newRule(r, operationReplace)
+@@ -192,6 +245,8 @@ func (cc *Conn) AddRule(r *Rule) *Rule {
+ return cc.newRule(r, operationAdd)
+ }
+
++// InsertRule inserts the specified Rule before the existing Rule referenced by
++// Position/PositionID if set, otherwise at the beginning of the chain.
+ func (cc *Conn) InsertRule(r *Rule) *Rule {
+ if r.Handle != 0 {
+ return cc.newRule(r, operationReplace)
+@@ -200,7 +255,8 @@ func (cc *Conn) InsertRule(r *Rule) *Rule {
+ return cc.newRule(r, operationInsert)
+ }
+
+-// DelRule deletes the specified Rule, rule's handle cannot be 0
++// DelRule deletes the specified Rule. Either the Handle or ID of the
++// rule must be set.
+ func (cc *Conn) DelRule(r *Rule) error {
+ cc.mu.Lock()
+ defer cc.mu.Unlock()
+@@ -208,15 +264,20 @@ func (cc *Conn) DelRule(r *Rule) error {
+ {Type: unix.NFTA_RULE_TABLE, Data: []byte(r.Table.Name + "\x00")},
+ {Type: unix.NFTA_RULE_CHAIN, Data: []byte(r.Chain.Name + "\x00")},
+ })
+- if r.Handle == 0 {
+- return fmt.Errorf("rule's handle cannot be 0")
++ if r.Handle != 0 {
++ data = append(data, cc.marshalAttr([]netlink.Attribute{
++ {Type: unix.NFTA_RULE_HANDLE, Data: binaryutil.BigEndian.PutUint64(r.Handle)},
++ })...)
++ } else if r.ID != 0 {
++ data = append(data, cc.marshalAttr([]netlink.Attribute{
++ {Type: unix.NFTA_RULE_ID, Data: binaryutil.BigEndian.PutUint32(r.ID)},
++ })...)
++ } else {
++ return fmt.Errorf("rule must have a handle or ID")
+ }
+- data = append(data, cc.marshalAttr([]netlink.Attribute{
+- {Type: unix.NFTA_RULE_HANDLE, Data: binaryutil.BigEndian.PutUint64(r.Handle)},
+- })...)
+ flags := netlink.Request | netlink.Acknowledge
+
+- cc.messages = append(cc.messages, netlink.Message{
++ cc.messages = append(cc.messages, netlinkMessage{
+ Header: netlink.Header{
+ Type: delRuleHeaderType,
+ Flags: flags,
+diff --git a/set.go b/set.go
+index a7441d9..412d75a 100644
+--- a/set.go
++++ b/set.go
+@@ -45,8 +45,6 @@ const (
+ NFTA_SET_ELEM_EXPRESSIONS = 0x11
+ )
+
+-var allocSetID uint32
+-
+ // SetDatatype represents a datatype declared by nft.
+ type SetDatatype struct {
+ Name string
+@@ -382,7 +380,7 @@ func (cc *Conn) SetAddElements(s *Set, vals []SetElement) error {
+ if err != nil {
+ return err
+ }
+- cc.messages = append(cc.messages, netlink.Message{
++ cc.messages = append(cc.messages, netlinkMessage{
+ Header: netlink.Header{
+ Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_NEWSETELEM),
+ Flags: netlink.Request | netlink.Acknowledge | netlink.Create,
+@@ -502,8 +500,7 @@ func (cc *Conn) AddSet(s *Set, vals []SetElement) error {
+ }
+
+ if s.ID == 0 {
+- allocSetID++
+- s.ID = allocSetID
++ s.ID = cc.allocateTransactionID()
+ if s.Anonymous {
+ s.Name = "__set%d"
+ if s.IsMap {
+@@ -653,7 +650,7 @@ func (cc *Conn) AddSet(s *Set, vals []SetElement) error {
+ tableInfo = append(tableInfo, netlink.Attribute{Type: unix.NLA_F_NESTED | NFTA_SET_ELEM_EXPRESSIONS, Data: data})
+ }
+
+- cc.messages = append(cc.messages, netlink.Message{
++ cc.messages = append(cc.messages, netlinkMessage{
+ Header: netlink.Header{
+ Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_NEWSET),
+ Flags: netlink.Request | netlink.Acknowledge | netlink.Create,
+@@ -668,7 +665,7 @@ func (cc *Conn) AddSet(s *Set, vals []SetElement) error {
+ if err != nil {
+ return err
+ }
+- cc.messages = append(cc.messages, netlink.Message{
++ cc.messages = append(cc.messages, netlinkMessage{
+ Header: netlink.Header{
+ Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | hdrType),
+ Flags: netlink.Request | netlink.Acknowledge | netlink.Create,
+@@ -688,7 +685,7 @@ func (cc *Conn) DelSet(s *Set) {
+ {Type: unix.NFTA_SET_TABLE, Data: []byte(s.Table.Name + "\x00")},
+ {Type: unix.NFTA_SET_NAME, Data: []byte(s.Name + "\x00")},
+ })
+- cc.messages = append(cc.messages, netlink.Message{
++ cc.messages = append(cc.messages, netlinkMessage{
+ Header: netlink.Header{
+ Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_DELSET),
+ Flags: netlink.Request | netlink.Acknowledge,
+@@ -709,7 +706,7 @@ func (cc *Conn) SetDeleteElements(s *Set, vals []SetElement) error {
+ if err != nil {
+ return err
+ }
+- cc.messages = append(cc.messages, netlink.Message{
++ cc.messages = append(cc.messages, netlinkMessage{
+ Header: netlink.Header{
+ Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_DELSETELEM),
+ Flags: netlink.Request | netlink.Acknowledge | netlink.Create,
+@@ -728,7 +725,7 @@ func (cc *Conn) FlushSet(s *Set) {
+ {Type: unix.NFTA_SET_TABLE, Data: []byte(s.Table.Name + "\x00")},
+ {Type: unix.NFTA_SET_NAME, Data: []byte(s.Name + "\x00")},
+ })
+- cc.messages = append(cc.messages, netlink.Message{
++ cc.messages = append(cc.messages, netlinkMessage{
+ Header: netlink.Header{
+ Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_DELSETELEM),
+ Flags: netlink.Request | netlink.Acknowledge,
+diff --git a/set_test.go b/set_test.go
+index 65a8e00..dd30f45 100644
+--- a/set_test.go
++++ b/set_test.go
+@@ -254,7 +254,10 @@ func TestMarshalSet(t *testing.T) {
+ }
+ msg := c.messages[connMsgSetIdx]
+
+- nset, err := setsFromMsg(msg)
++ nset, err := setsFromMsg(netlink.Message{
++ Header: msg.Header,
++ Data: msg.Data,
++ })
+ if err != nil {
+ t.Fatalf("setsFromMsg() error: %+v", err)
+ }
+diff --git a/table.go b/table.go
+index c391b7b..f7ed1ca 100644
+--- a/table.go
++++ b/table.go
+@@ -57,7 +57,7 @@ func (cc *Conn) DelTable(t *Table) {
+ {Type: unix.NFTA_TABLE_NAME, Data: []byte(t.Name + "\x00")},
+ {Type: unix.NFTA_TABLE_FLAGS, Data: []byte{0, 0, 0, 0}},
+ })
+- cc.messages = append(cc.messages, netlink.Message{
++ cc.messages = append(cc.messages, netlinkMessage{
+ Header: netlink.Header{
+ Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_DELTABLE),
+ Flags: netlink.Request | netlink.Acknowledge,
+@@ -73,7 +73,7 @@ func (cc *Conn) addTable(t *Table, flag netlink.HeaderFlags) *Table {
+ {Type: unix.NFTA_TABLE_NAME, Data: []byte(t.Name + "\x00")},
+ {Type: unix.NFTA_TABLE_FLAGS, Data: []byte{0, 0, 0, 0}},
+ })
+- cc.messages = append(cc.messages, netlink.Message{
++ cc.messages = append(cc.messages, netlinkMessage{
+ Header: netlink.Header{
+ Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_NEWTABLE),
+ Flags: netlink.Request | netlink.Acknowledge | flag,
+@@ -103,7 +103,7 @@ func (cc *Conn) FlushTable(t *Table) {
+ data := cc.marshalAttr([]netlink.Attribute{
+ {Type: unix.NFTA_RULE_TABLE, Data: []byte(t.Name + "\x00")},
+ })
+- cc.messages = append(cc.messages, netlink.Message{
++ cc.messages = append(cc.messages, netlinkMessage{
+ Header: netlink.Header{
+ Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_DELRULE),
+ Flags: netlink.Request | netlink.Acknowledge,