blob: 7d1f36ad422fa294969d1f221d33e1b9dd34baaf [file] [log] [blame]
Tim Windelschmidt5d0906e2023-07-20 20:23:57 +02001package util
2
3import (
4 "context"
5 "net"
6 "testing"
7
8 "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
9 "google.golang.org/grpc"
10 "google.golang.org/grpc/credentials/insecure"
11 "google.golang.org/grpc/test/bufconn"
12
13 apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
14 cpb "source.monogon.dev/metropolis/proto/common"
15
16 "source.monogon.dev/metropolis/pkg/event/memory"
17)
18
19// TestCurator is a shim Curator implementation that serves pending Watch
20// requests based on data submitted to a channel.
21type TestCurator struct {
22 apb.UnimplementedCuratorServer
23
24 watchC chan *apb.WatchEvent
25 updateReq memory.Value[*apb.UpdateNodeClusterNetworkingRequest]
26}
27
28// Watch implements a minimum Watch which just returns all nodes at once.
29func (t *TestCurator) Watch(_ *apb.WatchRequest, srv apb.Curator_WatchServer) error {
30 ctx := srv.Context()
31 for {
32 select {
33 case <-ctx.Done():
34 return ctx.Err()
35 case ev := <-t.watchC:
36 if err := srv.Send(ev); err != nil {
37 return err
38 }
39 }
40 }
41}
42
43func (t *TestCurator) UpdateNodeClusterNetworking(ctx context.Context, req *apb.UpdateNodeClusterNetworkingRequest) (*apb.UpdateNodeClusterNetworkingResponse, error) {
44 t.updateReq.Set(req)
45 return &apb.UpdateNodeClusterNetworkingResponse{}, nil
46}
47
48// NodeWithPrefixes submits a given node/key/address with prefixes to the Watch
49// event channel.
50func (t *TestCurator) NodeWithPrefixes(key wgtypes.Key, id, address string, prefixes ...string) {
51 var p []*cpb.NodeClusterNetworking_Prefix
52 for _, prefix := range prefixes {
53 p = append(p, &cpb.NodeClusterNetworking_Prefix{Cidr: prefix})
54 }
55 n := &apb.Node{
56 Id: id,
57 Status: &cpb.NodeStatus{
58 ExternalAddress: address,
59 },
60 Clusternet: &cpb.NodeClusterNetworking{
61 WireguardPubkey: key.PublicKey().String(),
62 Prefixes: p,
63 },
64 Roles: &cpb.NodeRoles{
65 ConsensusMember: &cpb.NodeRoles_ConsensusMember{},
66 },
67 }
68 t.watchC <- &apb.WatchEvent{
69 Nodes: []*apb.Node{
70 n,
71 },
72 }
73}
74
75// DeleteNode submits a given node for deletion to the Watch event channel.
76func (t *TestCurator) DeleteNode(id string) {
77 t.watchC <- &apb.WatchEvent{
78 NodeTombstones: []*apb.WatchEvent_NodeTombstone{
79 {
80 NodeId: id,
81 },
82 },
83 }
84}
85
86// MakeTestCurator returns a working TestCurator alongside a grpc connection to
87// it.
88func MakeTestCurator(t *testing.T) (*TestCurator, *grpc.ClientConn) {
89 cur := &TestCurator{
90 watchC: make(chan *apb.WatchEvent),
91 }
92
93 srv := grpc.NewServer()
94 apb.RegisterCuratorServer(srv, cur)
95 externalLis := bufconn.Listen(1024 * 1024)
96 go func() {
97 if err := srv.Serve(externalLis); err != nil {
98 t.Fatalf("GRPC serve failed: %v", err)
99 }
100 }()
101 withLocalDialer := grpc.WithContextDialer(func(_ context.Context, _ string) (net.Conn, error) {
102 return externalLis.Dial()
103 })
104 cl, err := grpc.Dial("local", withLocalDialer, grpc.WithTransportCredentials(insecure.NewCredentials()))
105 if err != nil {
106 t.Fatalf("Dialing GRPC failed: %v", err)
107 }
108
109 return cur, cl
110}