| package util |
| |
| import ( |
| "context" |
| "net" |
| "testing" |
| |
| "golang.zx2c4.com/wireguard/wgctrl/wgtypes" |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/credentials/insecure" |
| "google.golang.org/grpc/test/bufconn" |
| |
| apb "source.monogon.dev/metropolis/node/core/curator/proto/api" |
| cpb "source.monogon.dev/metropolis/proto/common" |
| |
| "source.monogon.dev/metropolis/pkg/event/memory" |
| ) |
| |
| // TestCurator is a shim Curator implementation that serves pending Watch |
| // requests based on data submitted to a channel. |
| type TestCurator struct { |
| apb.UnimplementedCuratorServer |
| |
| watchC chan *apb.WatchEvent |
| updateReq memory.Value[*apb.UpdateNodeClusterNetworkingRequest] |
| } |
| |
| // Watch implements a minimum Watch which just returns all nodes at once. |
| func (t *TestCurator) Watch(_ *apb.WatchRequest, srv apb.Curator_WatchServer) error { |
| ctx := srv.Context() |
| for { |
| select { |
| case <-ctx.Done(): |
| return ctx.Err() |
| case ev := <-t.watchC: |
| if err := srv.Send(ev); err != nil { |
| return err |
| } |
| } |
| } |
| } |
| |
| func (t *TestCurator) UpdateNodeClusterNetworking(ctx context.Context, req *apb.UpdateNodeClusterNetworkingRequest) (*apb.UpdateNodeClusterNetworkingResponse, error) { |
| t.updateReq.Set(req) |
| return &apb.UpdateNodeClusterNetworkingResponse{}, nil |
| } |
| |
| // NodeWithPrefixes submits a given node/key/address with prefixes to the Watch |
| // event channel. |
| func (t *TestCurator) NodeWithPrefixes(key wgtypes.Key, id, address string, prefixes ...string) { |
| var p []*cpb.NodeClusterNetworking_Prefix |
| for _, prefix := range prefixes { |
| p = append(p, &cpb.NodeClusterNetworking_Prefix{Cidr: prefix}) |
| } |
| n := &apb.Node{ |
| Id: id, |
| Status: &cpb.NodeStatus{ |
| ExternalAddress: address, |
| }, |
| Clusternet: &cpb.NodeClusterNetworking{ |
| WireguardPubkey: key.PublicKey().String(), |
| Prefixes: p, |
| }, |
| Roles: &cpb.NodeRoles{ |
| ConsensusMember: &cpb.NodeRoles_ConsensusMember{}, |
| }, |
| } |
| t.watchC <- &apb.WatchEvent{ |
| Nodes: []*apb.Node{ |
| n, |
| }, |
| } |
| } |
| |
| // DeleteNode submits a given node for deletion to the Watch event channel. |
| func (t *TestCurator) DeleteNode(id string) { |
| t.watchC <- &apb.WatchEvent{ |
| NodeTombstones: []*apb.WatchEvent_NodeTombstone{ |
| { |
| NodeId: id, |
| }, |
| }, |
| } |
| } |
| |
| // MakeTestCurator returns a working TestCurator alongside a grpc connection to |
| // it. |
| func MakeTestCurator(t *testing.T) (*TestCurator, *grpc.ClientConn) { |
| cur := &TestCurator{ |
| watchC: make(chan *apb.WatchEvent), |
| } |
| |
| srv := grpc.NewServer() |
| apb.RegisterCuratorServer(srv, cur) |
| externalLis := bufconn.Listen(1024 * 1024) |
| go func() { |
| if err := srv.Serve(externalLis); err != nil { |
| t.Fatalf("GRPC serve failed: %v", err) |
| } |
| }() |
| withLocalDialer := grpc.WithContextDialer(func(_ context.Context, _ string) (net.Conn, error) { |
| return externalLis.Dial() |
| }) |
| cl, err := grpc.Dial("local", withLocalDialer, grpc.WithTransportCredentials(insecure.NewCredentials())) |
| if err != nil { |
| t.Fatalf("Dialing GRPC failed: %v", err) |
| } |
| |
| return cur, cl |
| } |