blob: cde3e0e9dd898c881eed6ba4c9dd9bc12440ed09 [file] [log] [blame]
Serge Bazanski93d593b2023-03-28 16:43:47 +02001// Package clusternet implements a Cluster Networking mesh service running on all
2// Metropolis nodes.
3//
4// The mesh is based on wireguard and a centralized configuration store in the
5// cluster Curator (in etcd).
6//
7// While the implementation is nearly generic, it currently makes an assumption
8// that it is used only for Kubernetes pod networking. That has a few
9// implications:
10//
11// First, we only have a single real route on the host into the wireguard
12// networking mesh / interface, and that is configured ahead of time in the
13// Service as ClusterNet. All destination addresses that should be carried by the
14// mesh must thus be part of this single route. Otherwise, traffic will be able
15// to flow into the node from other nodes, but will exit through another
16// interface. This is used in practice to allow other host nodes (whose external
17// addresses are outside the cluster network) to access the cluster network.
18//
Serge Bazanskib565cc62023-03-30 18:43:51 +020019// Second, we have two hardcoded/purpose-specific sources of prefixes:
20// 1. Pod networking node prefixes from the kubelet
21// 2. The host's external IP address (as a /32) from the network service.
Serge Bazanski93d593b2023-03-28 16:43:47 +020022package clusternet
23
import (
	"context"
	"errors"
	"fmt"
	"net"
	"net/netip"
	"slices"

	"github.com/cenkalti/backoff/v4"
	"github.com/vishvananda/netlink"

	"source.monogon.dev/metropolis/node/core/curator/watcher"
	"source.monogon.dev/metropolis/node/core/localstorage"
	"source.monogon.dev/metropolis/node/core/network"
	"source.monogon.dev/metropolis/pkg/event"
	"source.monogon.dev/metropolis/pkg/supervisor"

	apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
	cpb "source.monogon.dev/metropolis/proto/common"
)
43
44// Service implements the Cluster Networking Mesh. See package-level docs for
45// more details.
46type Service struct {
47 // Curator is the gRPC client that the service will use to reach the cluster's
48 // Curator, for pushing locally announced prefixes and pulling information about
49 // other nodes.
50 Curator apb.CuratorClient
51 // ClusterNet is the prefix that will be programmed to exit through the wireguard
52 // mesh.
53 ClusterNet net.IPNet
54 // DataDirectory is where the WireGuard key of this node will be stored.
55 DataDirectory *localstorage.DataKubernetesClusterNetworkingDirectory
56 // LocalKubernetesPodNetwork is an event.Value watched for prefixes that should
57 // be announced into the mesh. This is to be Set by the Kubernetes service once
58 // it knows about the local node's IPAM address assignment.
59 LocalKubernetesPodNetwork event.Value[*Prefixes]
Serge Bazanskib565cc62023-03-30 18:43:51 +020060 // Network service used to get the local node's IP address to submit it as a /32.
61 Network event.Value[*network.Status]
Serge Bazanski93d593b2023-03-28 16:43:47 +020062
63 // wg is the interface to all the low-level interactions with WireGuard (and
64 // kernel routing). If not set, this defaults to a production implementation.
65 // This can be overridden by test to a test implementation instead.
66 wg wireguard
67}
68
69// Run the Service. This must be used in a supervisor Runnable.
70func (s *Service) Run(ctx context.Context) error {
71 if s.wg == nil {
72 s.wg = &localWireguard{}
73 }
74 if err := s.wg.ensureOnDiskKey(s.DataDirectory); err != nil {
75 return fmt.Errorf("could not ensure wireguard key: %w", err)
76 }
77 if err := s.wg.setup(&s.ClusterNet); err != nil {
78 return fmt.Errorf("could not setup wireguard: %w", err)
79 }
80
81 supervisor.Logger(ctx).Infof("Wireguard setup complete, starting updaters...")
82
Serge Bazanskib565cc62023-03-30 18:43:51 +020083 kubeC := make(chan *Prefixes)
84 netC := make(chan *network.Status)
85 if err := supervisor.RunGroup(ctx, map[string]supervisor.Runnable{
86 "source-kubernetes": event.Pipe(s.LocalKubernetesPodNetwork, kubeC),
87 "source-network": event.Pipe(s.Network, netC),
88 "push": func(ctx context.Context) error {
89 return s.push(ctx, kubeC, netC)
90 },
91 }); err != nil {
Serge Bazanski93d593b2023-03-28 16:43:47 +020092 return err
93 }
Serge Bazanskib565cc62023-03-30 18:43:51 +020094
95 if err := supervisor.Run(ctx, "pull", s.pull); err != nil {
Serge Bazanski93d593b2023-03-28 16:43:47 +020096 return err
97 }
98 supervisor.Signal(ctx, supervisor.SignalHealthy)
99 <-ctx.Done()
100 return ctx.Err()
101}
102
103// push is the sub-runnable responsible for letting the Curator know about what
104// prefixes that are originated by this node.
Serge Bazanskib565cc62023-03-30 18:43:51 +0200105func (s *Service) push(ctx context.Context, kubeC chan *Prefixes, netC chan *network.Status) error {
Serge Bazanski93d593b2023-03-28 16:43:47 +0200106 supervisor.Signal(ctx, supervisor.SignalHealthy)
107
Serge Bazanskib565cc62023-03-30 18:43:51 +0200108 var kubePrefixes *Prefixes
Serge Bazanski521a8352023-06-29 12:38:17 +0200109 var prevKubePrefixes *Prefixes
110
Serge Bazanskib565cc62023-03-30 18:43:51 +0200111 var localAddr net.IP
Serge Bazanski521a8352023-06-29 12:38:17 +0200112 var prevLocalAddr net.IP
113
Serge Bazanski93d593b2023-03-28 16:43:47 +0200114 for {
Serge Bazanski521a8352023-06-29 12:38:17 +0200115 kubeChanged := false
116 localChanged := false
117
Serge Bazanskib565cc62023-03-30 18:43:51 +0200118 select {
119 case <-ctx.Done():
120 return ctx.Err()
121 case kubePrefixes = <-kubeC:
Serge Bazanski521a8352023-06-29 12:38:17 +0200122 if !kubePrefixes.Equal(prevKubePrefixes) {
123 kubeChanged = true
124 }
Serge Bazanskib565cc62023-03-30 18:43:51 +0200125 case n := <-netC:
126 localAddr = n.ExternalAddress
Serge Bazanski521a8352023-06-29 12:38:17 +0200127 if !localAddr.Equal(prevLocalAddr) {
128 localChanged = true
129 }
130 }
131
Tim Windelschmidt011dce62024-03-03 16:00:52 +0100132 if kubeChanged {
133 if err := configureKubeNetwork(prevKubePrefixes, kubePrefixes); err != nil {
134 supervisor.Logger(ctx).Warningf("Could not configure cluster networking update: %v", err)
135 }
136 }
137
Serge Bazanski521a8352023-06-29 12:38:17 +0200138 // Ignore spurious updates.
139 if !localChanged && !kubeChanged {
140 continue
Serge Bazanski93d593b2023-03-28 16:43:47 +0200141 }
142
Serge Bazanskib565cc62023-03-30 18:43:51 +0200143 // Prepare prefixes to submit to cluster.
144 var prefixes Prefixes
145
146 // Do we have a local node address? Add it to the prefixes.
147 if len(localAddr) > 0 {
148 addr, ok := netip.AddrFromSlice(localAddr)
149 if ok {
150 prefixes = append(prefixes, netip.PrefixFrom(addr, 32))
151 }
152 }
153 // Do we have any kubelet prefixes? Add them, too.
154 if kubePrefixes != nil {
155 prefixes.Update(kubePrefixes)
156 }
157
Serge Bazanski521a8352023-06-29 12:38:17 +0200158 supervisor.Logger(ctx).Infof("Submitting prefixes: %s (kube update: %v, local update: %v)", prefixes, kubeChanged, localChanged)
Serge Bazanskib565cc62023-03-30 18:43:51 +0200159
160 err := backoff.Retry(func() error {
Serge Bazanski93d593b2023-03-28 16:43:47 +0200161 _, err := s.Curator.UpdateNodeClusterNetworking(ctx, &apb.UpdateNodeClusterNetworkingRequest{
162 Clusternet: &cpb.NodeClusterNetworking{
163 WireguardPubkey: s.wg.key().PublicKey().String(),
Serge Bazanskib565cc62023-03-30 18:43:51 +0200164 Prefixes: prefixes.proto(),
Serge Bazanski93d593b2023-03-28 16:43:47 +0200165 },
166 })
167 if err != nil {
168 supervisor.Logger(ctx).Warningf("Could not submit cluster networking update: %v", err)
169 }
170 return err
171 }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
172 if err != nil {
173 return fmt.Errorf("couldn't update curator: %w", err)
174 }
Serge Bazanski521a8352023-06-29 12:38:17 +0200175
176 prevKubePrefixes = kubePrefixes
177 prevLocalAddr = localAddr
178
Serge Bazanski93d593b2023-03-28 16:43:47 +0200179 }
180}
181
Tim Windelschmidt011dce62024-03-03 16:00:52 +0100182// configureKubeNetwork configures the point-to-point peer IP address of the
183// node host network namespace (i.e. the one container P2P interfaces point to)
184// on its loopback interface to make it eligible to be used as a source IP
185// address for communication into the clusternet overlay.
186func configureKubeNetwork(oldPrefixes *Prefixes, newPrefixes *Prefixes) error {
187 // diff maps prefixes to be removed to false
188 // and prefixes to be added to true.
189 diff := make(map[netip.Prefix]bool)
190
191 if newPrefixes != nil {
192 for _, newAddr := range *newPrefixes {
193 diff[newAddr] = true
194 }
195 }
196
197 if oldPrefixes != nil {
198 for _, oldAddr := range *oldPrefixes {
199 // Remove all prefixes in both the old
200 // and new prefix sets from `diff`.
201 if diff[oldAddr] {
202 delete(diff, oldAddr)
203 continue
204 }
205
206 // Mark all remaining (i.e. ones not in the new prefix set)
207 // prefixes for removal.
208 diff[oldAddr] = false
209 }
210 }
211
212 loInterface, err := netlink.LinkByName("lo")
213 if err != nil {
214 return fmt.Errorf("while getting lo interface: %w", err)
215 }
216
217 for prefix, shouldAdd := range diff {
218 // By CNI convention the first IP after the subnet base address is the
219 // point-to-point partner for all pod veths. To make this IP eligible
220 // to be used as a general host network namespace source IP we also add
221 // it to the loopback interface. This ensures that the kernel picks it
222 // as the source IP for traffic flowing into clusternet
223 // (due to its preference for source IPs in the same subnet).
224 addr := &netlink.Addr{
225 IPNet: &net.IPNet{
226 IP: prefix.Addr().Next().AsSlice(),
227 Mask: net.CIDRMask(prefix.Addr().BitLen(), prefix.Addr().BitLen()),
228 },
229 }
230
231 if shouldAdd {
232 if err := netlink.AddrAdd(loInterface, addr); err != nil {
233 return fmt.Errorf("assigning extra loopback IP: %v", err)
234 }
235 } else {
236 if err := netlink.AddrDel(loInterface, addr); err != nil {
237 return fmt.Errorf("removing extra loopback IP: %v", err)
238 }
239 }
240 }
241
242 return nil
243}
244
Serge Bazanski93d593b2023-03-28 16:43:47 +0200245// pull is the sub-runnable responsible for fetching information about the
246// cluster networking setup/status of other nodes, and programming it as
247// WireGuard peers.
248func (s *Service) pull(ctx context.Context) error {
249 supervisor.Signal(ctx, supervisor.SignalHealthy)
250
Serge Bazanski60461b22023-10-26 19:16:59 +0200251 var batch []*apb.Node
252 return watcher.WatchNodes(ctx, s.Curator, watcher.SimpleFollower{
253 FilterFn: func(a *apb.Node) bool {
254 if a.Clusternet == nil {
255 return false
256 }
257 if a.Clusternet.WireguardPubkey == "" {
258 return false
259 }
260 return true
261 },
262 EqualsFn: func(a *apb.Node, b *apb.Node) bool {
263 if a.Status.ExternalAddress != b.Status.ExternalAddress {
264 return false
265 }
266 if a.Clusternet.WireguardPubkey != b.Clusternet.WireguardPubkey {
267 return false
268 }
269 if !slices.Equal(a.Clusternet.Prefixes, b.Clusternet.Prefixes) {
270 return false
271 }
272 return true
273 },
274 OnNewUpdated: func(new *apb.Node) error {
275 batch = append(batch, new)
276 return nil
277 },
278 OnBatchDone: func() error {
279 if err := s.wg.configurePeers(batch); err != nil {
280 supervisor.Logger(ctx).Errorf("nodes couldn't be configured: %v", err)
281 }
282 batch = nil
283 return nil
284 },
285 OnDeleted: func(prev *apb.Node) error {
286 if err := s.wg.unconfigurePeer(prev); err != nil {
287 supervisor.Logger(ctx).Errorf("Node %s couldn't be unconfigured: %v", prev.Id, err)
288 }
289 return nil
Serge Bazanski93d593b2023-03-28 16:43:47 +0200290 },
291 })
Serge Bazanski93d593b2023-03-28 16:43:47 +0200292}