blob: 0c7050761b0a4636b7db4248513652a98781c78b [file] [log] [blame]
Serge Bazanski93d593b2023-03-28 16:43:47 +02001package clusternet
2
3import (
4 "context"
5 "fmt"
6 "net"
7 "os"
8 "strings"
9 "sync"
10 "testing"
11 "time"
12
13 "golang.zx2c4.com/wireguard/wgctrl"
14 "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
15 "google.golang.org/grpc"
16 "google.golang.org/grpc/credentials/insecure"
17 "google.golang.org/grpc/test/bufconn"
18
19 common "source.monogon.dev/metropolis/node"
20 "source.monogon.dev/metropolis/node/core/localstorage"
21 "source.monogon.dev/metropolis/node/core/localstorage/declarative"
22 "source.monogon.dev/metropolis/pkg/event/memory"
23 "source.monogon.dev/metropolis/pkg/supervisor"
24
25 apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
26 cpb "source.monogon.dev/metropolis/proto/common"
27)
28
29// testCurator is a shim Curator implementation that serves pending Watch
30// requests based on data submitted to a channel.
31type testCurator struct {
32 apb.UnimplementedCuratorServer
33
34 watchC chan *apb.WatchEvent
35 updateReq memory.Value[*apb.UpdateNodeClusterNetworkingRequest]
36}
37
38// Watch implements a minimum Watch which just returns all nodes at once.
39func (t *testCurator) Watch(_ *apb.WatchRequest, srv apb.Curator_WatchServer) error {
40 ctx := srv.Context()
41 for {
42 select {
43 case <-ctx.Done():
44 return ctx.Err()
45 case ev := <-t.watchC:
46 if err := srv.Send(ev); err != nil {
47 return err
48 }
49 }
50 }
51}
52
53func (t *testCurator) UpdateNodeClusterNetworking(ctx context.Context, req *apb.UpdateNodeClusterNetworkingRequest) (*apb.UpdateNodeClusterNetworkingResponse, error) {
54 t.updateReq.Set(req)
55 return &apb.UpdateNodeClusterNetworkingResponse{}, nil
56}
57
58// nodeWithPrefix submits a given node/key/address with prefixes to the Watch
59// event channel.
60func (t *testCurator) nodeWithPrefixes(key wgtypes.Key, id, address string, prefixes ...string) {
61 var p []*cpb.NodeClusterNetworking_Prefix
62 for _, prefix := range prefixes {
63 p = append(p, &cpb.NodeClusterNetworking_Prefix{Cidr: prefix})
64 }
65 n := &apb.Node{
66 Id: id,
67 Status: &cpb.NodeStatus{
68 ExternalAddress: address,
69 },
70 Clusternet: &cpb.NodeClusterNetworking{
71 WireguardPubkey: key.PublicKey().String(),
72 Prefixes: p,
73 },
74 }
75 t.watchC <- &apb.WatchEvent{
76 Nodes: []*apb.Node{
77 n,
78 },
79 }
80}
81
82// deleteNode submits a given node for deletion to the Watch event channel.
83func (t *testCurator) deleteNode(id string) {
84 t.watchC <- &apb.WatchEvent{
85 NodeTombstones: []*apb.WatchEvent_NodeTombstone{
86 {
87 NodeId: id,
88 },
89 },
90 }
91}
92
93// makeTestCurator returns a working testCurator alongside a grpc connection to
94// it.
95func makeTestCurator(t *testing.T) (*testCurator, *grpc.ClientConn) {
96 cur := &testCurator{
97 watchC: make(chan *apb.WatchEvent),
98 }
99
100 srv := grpc.NewServer()
101 apb.RegisterCuratorServer(srv, cur)
102 externalLis := bufconn.Listen(1024 * 1024)
103 go func() {
104 if err := srv.Serve(externalLis); err != nil {
105 t.Fatalf("GRPC serve failed: %v", err)
106 }
107 }()
108 withLocalDialer := grpc.WithContextDialer(func(_ context.Context, _ string) (net.Conn, error) {
109 return externalLis.Dial()
110 })
111 cl, err := grpc.Dial("local", withLocalDialer, grpc.WithTransportCredentials(insecure.NewCredentials()))
112 if err != nil {
113 t.Fatalf("Dialing GRPC failed: %v", err)
114 }
115
116 return cur, cl
117}
118
119// fakeWireguard implements wireguard while keeping peer information internally.
120type fakeWireguard struct {
121 k wgtypes.Key
122
123 muNodes sync.Mutex
124 nodes map[string]*node
125 failNextUpdate bool
126}
127
128func (f *fakeWireguard) ensureOnDiskKey(_ *localstorage.DataKubernetesClusterNetworkingDirectory) error {
129 f.k, _ = wgtypes.GeneratePrivateKey()
130 return nil
131}
132
133func (f *fakeWireguard) setup(clusterNet *net.IPNet) error {
134 f.muNodes.Lock()
135 defer f.muNodes.Unlock()
136 f.nodes = make(map[string]*node)
137 return nil
138}
139
140func (f *fakeWireguard) configurePeers(nodes []*node) error {
141 f.muNodes.Lock()
142 defer f.muNodes.Unlock()
143 if f.failNextUpdate {
144 f.failNextUpdate = false
145 return fmt.Errorf("synthetic test failure")
146 }
147 for _, n := range nodes {
148 f.nodes[n.id] = n
149 }
150 return nil
151}
152
153func (f *fakeWireguard) unconfigurePeer(n *node) error {
154 f.muNodes.Lock()
155 defer f.muNodes.Unlock()
156 delete(f.nodes, n.id)
157 return nil
158}
159
160func (f *fakeWireguard) key() wgtypes.Key {
161 return f.k
162}
163
164func (f *fakeWireguard) close() {
165}
166
167// TestClusternetBasic exercises clusternet with a fake curator and fake
168// wireguard, trying to exercise as many edge cases as possible.
169func TestClusternetBasic(t *testing.T) {
170 key1, err := wgtypes.GeneratePrivateKey()
171 if err != nil {
172 t.Fatalf("Failed to generate private key: %v", err)
173 }
174 key2, err := wgtypes.GeneratePrivateKey()
175 if err != nil {
176 t.Fatalf("Failed to generate private key: %v", err)
177 }
178
179 cur, cl := makeTestCurator(t)
180 defer cl.Close()
181 curator := apb.NewCuratorClient(cl)
182
183 var podNetwork memory.Value[*Prefixes]
184 wg := &fakeWireguard{}
185 svc := Service{
186 Curator: curator,
187 ClusterNet: net.IPNet{
188 IP: net.IP([]byte{10, 10, 0, 0}),
189 Mask: net.IPv4Mask(255, 255, 0, 0),
190 },
191 DataDirectory: nil,
192 LocalKubernetesPodNetwork: &podNetwork,
193
194 wg: wg,
195 }
196 supervisor.TestHarness(t, svc.Run)
197
198 checkState := func(nodes map[string]*node) error {
199 t.Helper()
200 wg.muNodes.Lock()
201 defer wg.muNodes.Unlock()
202 for nid, n := range nodes {
203 n2, ok := wg.nodes[nid]
204 if !ok {
205 return fmt.Errorf("node %q missing in programmed peers", nid)
206 }
207 if n2.pubkey != n.pubkey {
208 return fmt.Errorf("node %q pubkey mismatch: %q in programmed peers, %q wanted", nid, n2.pubkey, n.pubkey)
209 }
210 if n2.address != n.address {
211 return fmt.Errorf("node %q address mismatch: %q in programmed peers, %q wanted", nid, n2.address, n.address)
212 }
213 p := strings.Join(n.prefixes, ",")
214 p2 := strings.Join(n2.prefixes, ",")
215 if p != p2 {
216 return fmt.Errorf("node %q prefixes mismatch: %v in programmed peers, %v wanted", nid, n2.prefixes, n.prefixes)
217 }
218 }
219 for nid, _ := range wg.nodes {
220 if _, ok := nodes[nid]; !ok {
221 return fmt.Errorf("node %q present in programmed peers", nid)
222 }
223 }
224 return nil
225 }
226
227 assertStateEventual := func(nodes map[string]*node) {
228 t.Helper()
229 deadline := time.Now().Add(5 * time.Second)
230 for {
231 err := checkState(nodes)
232 if err == nil {
233 break
234 }
235 if time.Now().After(deadline) {
236 t.Error(err)
237 return
238 }
239 }
240
241 }
242
243 // Start with a single node.
244 cur.nodeWithPrefixes(key1, "metropolis-fake-1", "1.2.3.4")
245 assertStateEventual(map[string]*node{
246 "metropolis-fake-1": {
247 pubkey: key1.PublicKey().String(),
248 address: "1.2.3.4",
249 prefixes: nil,
250 },
251 })
252 // Change the node's peer address.
253 cur.nodeWithPrefixes(key1, "metropolis-fake-1", "1.2.3.5")
254 assertStateEventual(map[string]*node{
255 "metropolis-fake-1": {
256 pubkey: key1.PublicKey().String(),
257 address: "1.2.3.5",
258 prefixes: nil,
259 },
260 })
261 // Add another node.
262 cur.nodeWithPrefixes(key2, "metropolis-fake-2", "1.2.3.6")
263 assertStateEventual(map[string]*node{
264 "metropolis-fake-1": {
265 pubkey: key1.PublicKey().String(),
266 address: "1.2.3.5",
267 prefixes: nil,
268 },
269 "metropolis-fake-2": {
270 pubkey: key2.PublicKey().String(),
271 address: "1.2.3.6",
272 prefixes: nil,
273 },
274 })
275 // Add some prefixes to both nodes, but fail the next configurePeers call.
276 wg.muNodes.Lock()
277 wg.failNextUpdate = true
278 wg.muNodes.Unlock()
279 cur.nodeWithPrefixes(key1, "metropolis-fake-1", "1.2.3.5", "10.100.10.0/24", "10.100.20.0/24")
280 cur.nodeWithPrefixes(key2, "metropolis-fake-2", "1.2.3.6", "10.100.30.0/24", "10.100.40.0/24")
281 assertStateEventual(map[string]*node{
282 "metropolis-fake-1": {
283 pubkey: key1.PublicKey().String(),
284 address: "1.2.3.5",
285 prefixes: []string{
286 "10.100.10.0/24", "10.100.20.0/24",
287 },
288 },
289 "metropolis-fake-2": {
290 pubkey: key2.PublicKey().String(),
291 address: "1.2.3.6",
292 prefixes: []string{
293 "10.100.30.0/24", "10.100.40.0/24",
294 },
295 },
296 })
297 // Delete one of the nodes.
298 cur.deleteNode("metropolis-fake-1")
299 assertStateEventual(map[string]*node{
300 "metropolis-fake-2": {
301 pubkey: key2.PublicKey().String(),
302 address: "1.2.3.6",
303 prefixes: []string{
304 "10.100.30.0/24", "10.100.40.0/24",
305 },
306 },
307 })
308}
309
310// TestWireguardImplementation makes sure localWireguard behaves as expected.
311func TestWireguardIntegration(t *testing.T) {
312 if os.Getenv("IN_KTEST") != "true" {
313 t.Skip("Not in ktest")
314 }
315
316 root := &localstorage.Root{}
317 tmp, err := os.MkdirTemp("", "clusternet")
318 if err != nil {
319 t.Fatal(err)
320 }
321 err = declarative.PlaceFS(root, tmp)
322 if err != nil {
323 t.Fatal(err)
324 }
325 os.MkdirAll(root.Data.Kubernetes.ClusterNetworking.FullPath(), 0700)
326 wg := &localWireguard{}
327
328 // Ensure key once and make note of it.
329 if err := wg.ensureOnDiskKey(&root.Data.Kubernetes.ClusterNetworking); err != nil {
330 t.Fatalf("Could not ensure wireguard key: %v", err)
331 }
332 key := wg.key().String()
333 // Do it again, and make sure the key hasn't changed.
334 wg = &localWireguard{}
335 if err := wg.ensureOnDiskKey(&root.Data.Kubernetes.ClusterNetworking); err != nil {
336 t.Fatalf("Could not ensure wireguard key second time: %v", err)
337 }
338 if want, got := key, wg.key().String(); want != got {
339 t.Fatalf("Key changed, was %q, became %q", want, got)
340 }
341
342 // Setup the interface.
343 cnet := net.IPNet{
344 IP: net.IP([]byte{10, 10, 0, 0}),
345 Mask: net.IPv4Mask(255, 255, 0, 0),
346 }
347 if err := wg.setup(&cnet); err != nil {
348 t.Fatalf("Failed to setup interface: %v", err)
349 }
350 // Do it again.
351 wg.close()
352 if err := wg.setup(&cnet); err != nil {
353 t.Fatalf("Failed to setup interface second time: %v", err)
354 }
355
356 // Check that the key and listen port are configured correctly.
357 wgClient, err := wgctrl.New()
358 if err != nil {
359 t.Fatalf("Failed to create wireguard client: %v", err)
360 }
361 wgDev, err := wgClient.Device(clusterNetDeviceName)
362 if err != nil {
363 t.Fatalf("Failed to connect to netlink's WireGuard config endpoint: %v", err)
364 }
365 if want, got := key, wgDev.PrivateKey.String(); want != got {
366 t.Errorf("Wireguard key mismatch, wanted %q, got %q", want, got)
367 }
368 if want, got := int(common.WireGuardPort), wgDev.ListenPort; want != got {
369 t.Errorf("Wireguard port mismatch, wanted %d, got %d", want, got)
370 }
371
372 // Add some peers and check that we got them.
373 pkeys := make([]wgtypes.Key, 2)
374 pkeys[0], err = wgtypes.GeneratePrivateKey()
375 if err != nil {
376 t.Fatalf("Failed to generate private key: %v", err)
377 }
378 pkeys[1], err = wgtypes.GeneratePrivateKey()
379 if err != nil {
380 t.Fatalf("Failed to generate private key: %v", err)
381 }
382 err = wg.configurePeers([]*node{
383 {
384 pubkey: pkeys[0].PublicKey().String(),
385 address: "10.100.0.1",
386 prefixes: []string{
387 "10.0.0.0/24",
388 "10.0.1.0/24",
389 },
390 },
391 {
392 pubkey: pkeys[1].PublicKey().String(),
393 address: "10.100.1.1",
394 prefixes: []string{
395 "10.1.0.0/24",
396 "10.1.1.0/24",
397 },
398 },
399 })
400 if err != nil {
401 t.Fatalf("Configuring peers failed: %v", err)
402 }
403
404 wgDev, err = wgClient.Device(clusterNetDeviceName)
405 if err != nil {
406 t.Fatalf("Failed to connect to netlink's WireGuard config endpoint: %v", err)
407 }
408 if want, got := 2, len(wgDev.Peers); want != got {
409 t.Errorf("Wanted %d peers, got %d", want, got)
410 } else {
411 for i := 0; i < 2; i++ {
412 if want, got := pkeys[i].PublicKey().String(), wgDev.Peers[i].PublicKey.String(); want != got {
413 t.Errorf("Peer %d should have key %q, got %q", i, want, got)
414 }
415 if want, got := fmt.Sprintf("10.100.%d.1:%s", i, common.WireGuardPort.PortString()), wgDev.Peers[i].Endpoint.String(); want != got {
416 t.Errorf("Peer %d should have endpoint %q, got %q", i, want, got)
417 }
418 if want, got := 2, len(wgDev.Peers[i].AllowedIPs); want != got {
419 t.Errorf("Peer %d should have %d peers, got %d", i, want, got)
420 } else {
421 for j := 0; j < 2; j++ {
422 if want, got := fmt.Sprintf("10.%d.%d.0/24", i, j), wgDev.Peers[i].AllowedIPs[j].String(); want != got {
423 t.Errorf("Peer %d should have allowed ip %d %q, got %q", i, j, want, got)
424 }
425 }
426 }
427 }
428 }
429
430 // Update one of the peers and check that things got applied.
431 err = wg.configurePeers([]*node{
432 {
433 pubkey: pkeys[0].PublicKey().String(),
434 address: "10.100.0.3",
435 prefixes: []string{
436 "10.0.0.0/24",
437 },
438 },
439 })
440 if err != nil {
441 t.Fatalf("Failed to connect to netlink's WireGuard config endpoint: %v", err)
442 }
443 wgDev, err = wgClient.Device(clusterNetDeviceName)
444 if err != nil {
445 t.Fatalf("Failed to connect to netlink's WireGuard config endpoint: %v", err)
446 }
447 if want, got := 2, len(wgDev.Peers); want != got {
448 t.Errorf("Wanted %d peers, got %d", want, got)
449 } else {
450 if want, got := pkeys[0].PublicKey().String(), wgDev.Peers[0].PublicKey.String(); want != got {
451 t.Errorf("Peer 0 should have key %q, got %q", want, got)
452 }
453 if want, got := fmt.Sprintf("10.100.0.3:%s", common.WireGuardPort.PortString()), wgDev.Peers[0].Endpoint.String(); want != got {
454 t.Errorf("Peer 0 should have endpoint %q, got %q", want, got)
455 }
456 if want, got := 1, len(wgDev.Peers[0].AllowedIPs); want != got {
457 t.Errorf("Peer 0 should have %d peers, got %d", want, got)
458 } else {
459 if want, got := "10.0.0.0/24", wgDev.Peers[0].AllowedIPs[0].String(); want != got {
460 t.Errorf("Peer 0 should have allowed ip 0 %q, got %q", want, got)
461 }
462 }
463 }
464
465 // Remove one of the peers and make sure it's gone.
466 err = wg.unconfigurePeer(&node{
467 pubkey: pkeys[0].PublicKey().String(),
468 })
469 if err != nil {
470 t.Fatalf("Failed to unconfigure peer: %v", err)
471 }
472 err = wg.unconfigurePeer(&node{
473 pubkey: pkeys[0].PublicKey().String(),
474 })
475 if err != nil {
476 t.Fatalf("Failed to unconfigure peer a second time: %v", err)
477 }
478 wgDev, err = wgClient.Device(clusterNetDeviceName)
479 if err != nil {
480 t.Fatalf("Failed to connect to netlink's WireGuard config endpoint: %v", err)
481 }
482 if want, got := 1, len(wgDev.Peers); want != got {
483 t.Errorf("Wanted %d peer, got %d", want, got)
484 }
485}