m/n/c/cluster: implement register flow
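Allows LaunchCluster to start multi-node clusters: the first node
bootstraps the cluster, and every further node registers into it using a
register ticket and cluster directory retrieved from the first node, is
approved by the cluster owner, and is then waited upon until it reports
UP.

A minimal sketch of how this is meant to be used from a test, assuming
the usual testing plumbing (ctx, t) around it:

    // Launch a two-node cluster: node 1 bootstraps, node 2 registers.
    cl, err := cluster.LaunchCluster(ctx, cluster.ClusterOptions{
        NumNodes: 2,
    })
    if err != nil {
        t.Fatalf("LaunchCluster: %v", err)
    }
    // Close terminates all qemu instances started by the cluster.
    defer cl.Close()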
Change-Id: I197cbfa96d34c9912c7fc19710db25276e7440fc
Reviewed-on: https://review.monogon.dev/c/monogon/+/454
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/test/launch/cluster/cluster.go b/metropolis/test/launch/cluster/cluster.go
index 7af1f55..4d5a772 100644
--- a/metropolis/test/launch/cluster/cluster.go
+++ b/metropolis/test/launch/cluster/cluster.go
@@ -27,8 +27,10 @@
"google.golang.org/protobuf/proto"
"source.monogon.dev/metropolis/node"
+ "source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/rpc"
apb "source.monogon.dev/metropolis/proto/api"
+ cpb "source.monogon.dev/metropolis/proto/common"
"source.monogon.dev/metropolis/test/launch"
)
@@ -233,6 +235,27 @@
return out.Close()
}
+// getNodes wraps Management.GetNodes, collecting the streamed response into
+// a list of all nodes in the cluster.
+func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) {
+ srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
+ if err != nil {
+ return nil, fmt.Errorf("GetNodes: %w", err)
+ }
+ var res []*apb.Node
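+ // Drain the response stream until io.EOF to collect all nodes.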
+ for {
+ node, err := srvN.Recv()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ return nil, fmt.Errorf("GetNodes.Recv: %w", err)
+ }
+ res = append(res, node)
+ }
+ return res, nil
+}
+
// Gets a random EUI-48 Ethernet MAC address
func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
macBuf := make([]byte, 6)
@@ -294,12 +317,9 @@
// The given context will be used to run all qemu instances in the cluster, and
// canceling the context or calling Close() will terminate them.
func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) {
- if opts.NumNodes == 0 {
- return nil, errors.New("refusing to start cluster with zero nodes")
+ if opts.NumNodes <= 0 {
+ return nil, errors.New("refusing to start cluster with no nodes")
}
- if opts.NumNodes > 1 {
- return nil, errors.New("unimplemented")
- }
ctxT, ctxC := context.WithCancel(ctx)
@@ -316,14 +336,15 @@
vmPorts = append(vmPorts, vmPort)
}
- // Make a list of channels that will be populated and closed by all running node
- // qemu processes.
+ // Make a list of channels that will be populated by all running node qemu
+ // processes.
done := make([]chan error, opts.NumNodes)
for i, _ := range done {
done[i] = make(chan error, 1)
}
// Start first node.
+ log.Printf("Cluster: Starting node %d...", 1)
go func() {
err := LaunchNode(ctxT, NodeOptions{
ConnectToSocket: vmPorts[0],
@@ -338,6 +359,7 @@
done[0] <- err
}()
+ // Launch nanoswitch.
portMap, err := launch.ConflictFreePortMap(ClusterPorts)
if err != nil {
ctxC()
@@ -378,7 +400,7 @@
// Retrieve owner certificate - this can take a while because the node is still
// coming up, so do it in a backoff loop.
- log.Printf("Cluster: retrieving owner certificate...")
+ log.Printf("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
aaa := apb.NewAAAClient(initClient)
var cert *tls.Certificate
err = backoff.Retry(func() error {
@@ -391,15 +413,112 @@
}
log.Printf("Cluster: retrieved owner certificate.")
+ // Build an authenticated owner client to the first node.
authClient, err := rpc.NewAuthenticatedClient(remote, *cert, nil)
if err != nil {
ctxC()
return nil, fmt.Errorf("NewAuthenticatedClient: %w", err)
}
+ mgmt := apb.NewManagementClient(authClient)
+
+ // Retrieve register ticket to register further nodes.
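+ // The ticket is an opaque secret passed to registering nodes in their
+ // NodeParameters, allowing them to join the cluster.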
+ log.Printf("Cluster: retrieving register ticket...")
+ resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
+ if err != nil {
+ ctxC()
+ return nil, fmt.Errorf("GetRegisterTicket: %w", err)
+ }
+ ticket := resT.Ticket
+ log.Printf("Cluster: retrieved register ticket (%d bytes).", len(ticket))
+
+ // Retrieve cluster info (for the cluster directory and CA certificate),
+ // needed to register further nodes.
+ resI, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
+ if err != nil {
+ ctxC()
+ return nil, fmt.Errorf("GetClusterInfo: %w", err)
+ }
+
+ // Now run the rest of the nodes.
+ //
+ // TODO(q3k): parallelize this
+ for i := 1; i < opts.NumNodes; i++ {
+ log.Printf("Cluster: Starting node %d...", i+1)
+ go func(i int) {
+ err := LaunchNode(ctxT, NodeOptions{
+ ConnectToSocket: vmPorts[i],
+ NodeParameters: &apb.NodeParameters{
+ Cluster: &apb.NodeParameters_ClusterRegister_{
+ ClusterRegister: &apb.NodeParameters_ClusterRegister{
+ RegisterTicket: ticket,
+ ClusterDirectory: resI.ClusterDirectory,
+ CaCertificate: resI.CaCertificate,
+ },
+ },
+ },
+ SerialPort: os.Stdout,
+ })
+ done[i] <- err
+ }(i)
+ var newNode *apb.Node
+
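+ // The registering node will contact the cluster and appear in GetNodes
+ // as NEW, pending approval by the owner.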
+ log.Printf("Cluster: waiting for node %d to appear as NEW...", i)
+ for {
+ nodes, err := getNodes(ctx, mgmt)
+ if err != nil {
+ ctxC()
+ return nil, fmt.Errorf("could not get nodes: %w", err)
+ }
+ for _, n := range nodes {
+ if n.State == cpb.NodeState_NODE_STATE_NEW {
+ newNode = n
+ break
+ }
+ }
+ if newNode != nil {
+ break
+ }
+ time.Sleep(1 * time.Second)
+ }
+ id := identity.NodeID(newNode.Pubkey)
+ log.Printf("Cluster: node %d is %s", i, id)
+
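+ // Approve the node so it can proceed past NEW; once it has joined the
+ // cluster it will eventually report as UP.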
+ log.Printf("Cluster: approving node %d", i)
+ _, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
+ Pubkey: newNode.Pubkey,
+ })
+ if err != nil {
+ ctxC()
+ return nil, fmt.Errorf("ApproveNode(%s): %w", id, err)
+ }
+ log.Printf("Cluster: node %d approved, waiting for it to appear as UP...", i)
+ for {
+ nodes, err := getNodes(ctx, mgmt)
+ if err != nil {
+ ctxC()
+ return nil, fmt.Errorf("could not get nodes: %w", err)
+ }
+ found := false
+ for _, n := range nodes {
+ if !bytes.Equal(n.Pubkey, newNode.Pubkey) {
+ continue
+ }
+ if n.State == cpb.NodeState_NODE_STATE_UP {
+ found = true
+ }
+ break
+ }
+ if found {
+ break
+ }
+ time.Sleep(time.Second)
+ }
+ log.Printf("Cluster: node %d (%s) UP!", i, id)
+ }
return &Cluster{
Debug: apb.NewNodeDebugServiceClient(debugConn),
- Management: apb.NewManagementClient(authClient),
+ Management: mgmt,
Owner: *cert,
Ports: portMap,
nodesDone: done,