| package util | 
 |  | 
 | import ( | 
 | 	"context" | 
 | 	"net" | 
 | 	"testing" | 
 |  | 
 | 	"golang.zx2c4.com/wireguard/wgctrl/wgtypes" | 
 | 	"google.golang.org/grpc" | 
 | 	"google.golang.org/grpc/credentials/insecure" | 
 | 	"google.golang.org/grpc/test/bufconn" | 
 |  | 
 | 	apb "source.monogon.dev/metropolis/node/core/curator/proto/api" | 
 | 	cpb "source.monogon.dev/metropolis/proto/common" | 
 |  | 
 | 	"source.monogon.dev/metropolis/pkg/event/memory" | 
 | ) | 
 |  | 
 | // TestCurator is a shim Curator implementation that serves pending Watch | 
 | // requests based on data submitted to a channel. | 
 | type TestCurator struct { | 
 | 	apb.UnimplementedCuratorServer | 
 |  | 
 | 	watchC    chan *apb.WatchEvent | 
 | 	updateReq memory.Value[*apb.UpdateNodeClusterNetworkingRequest] | 
 | } | 
 |  | 
 | // Watch implements a minimum Watch which just returns all nodes at once. | 
 | func (t *TestCurator) Watch(_ *apb.WatchRequest, srv apb.Curator_WatchServer) error { | 
 | 	ctx := srv.Context() | 
 | 	for { | 
 | 		select { | 
 | 		case <-ctx.Done(): | 
 | 			return ctx.Err() | 
 | 		case ev := <-t.watchC: | 
 | 			if err := srv.Send(ev); err != nil { | 
 | 				return err | 
 | 			} | 
 | 		} | 
 | 	} | 
 | } | 
 |  | 
 | func (t *TestCurator) UpdateNodeClusterNetworking(ctx context.Context, req *apb.UpdateNodeClusterNetworkingRequest) (*apb.UpdateNodeClusterNetworkingResponse, error) { | 
 | 	t.updateReq.Set(req) | 
 | 	return &apb.UpdateNodeClusterNetworkingResponse{}, nil | 
 | } | 
 |  | 
 | // NodeWithPrefixes submits a given node/key/address with prefixes to the Watch | 
 | // event channel. | 
 | func (t *TestCurator) NodeWithPrefixes(key wgtypes.Key, id, address string, prefixes ...string) { | 
 | 	var p []*cpb.NodeClusterNetworking_Prefix | 
 | 	for _, prefix := range prefixes { | 
 | 		p = append(p, &cpb.NodeClusterNetworking_Prefix{Cidr: prefix}) | 
 | 	} | 
 | 	n := &apb.Node{ | 
 | 		Id: id, | 
 | 		Status: &cpb.NodeStatus{ | 
 | 			ExternalAddress: address, | 
 | 		}, | 
 | 		Clusternet: &cpb.NodeClusterNetworking{ | 
 | 			WireguardPubkey: key.PublicKey().String(), | 
 | 			Prefixes:        p, | 
 | 		}, | 
 | 		Roles: &cpb.NodeRoles{ | 
 | 			ConsensusMember: &cpb.NodeRoles_ConsensusMember{}, | 
 | 		}, | 
 | 	} | 
 | 	t.watchC <- &apb.WatchEvent{ | 
 | 		Nodes: []*apb.Node{ | 
 | 			n, | 
 | 		}, | 
 | 	} | 
 | } | 
 |  | 
 | // DeleteNode submits a given node for deletion to the Watch event channel. | 
 | func (t *TestCurator) DeleteNode(id string) { | 
 | 	t.watchC <- &apb.WatchEvent{ | 
 | 		NodeTombstones: []*apb.WatchEvent_NodeTombstone{ | 
 | 			{ | 
 | 				NodeId: id, | 
 | 			}, | 
 | 		}, | 
 | 	} | 
 | } | 
 |  | 
 | // MakeTestCurator returns a working TestCurator alongside a grpc connection to | 
 | // it. | 
 | func MakeTestCurator(t *testing.T) (*TestCurator, *grpc.ClientConn) { | 
 | 	cur := &TestCurator{ | 
 | 		watchC: make(chan *apb.WatchEvent), | 
 | 	} | 
 |  | 
 | 	srv := grpc.NewServer() | 
 | 	apb.RegisterCuratorServer(srv, cur) | 
 | 	externalLis := bufconn.Listen(1024 * 1024) | 
 | 	go func() { | 
 | 		if err := srv.Serve(externalLis); err != nil { | 
 | 			t.Fatalf("GRPC serve failed: %v", err) | 
 | 		} | 
 | 	}() | 
 | 	withLocalDialer := grpc.WithContextDialer(func(_ context.Context, _ string) (net.Conn, error) { | 
 | 		return externalLis.Dial() | 
 | 	}) | 
 | 	cl, err := grpc.Dial("local", withLocalDialer, grpc.WithTransportCredentials(insecure.NewCredentials())) | 
 | 	if err != nil { | 
 | 		t.Fatalf("Dialing GRPC failed: %v", err) | 
 | 	} | 
 |  | 
 | 	return cur, cl | 
 | } |