m/n/c/cluster: implement register flow
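
This implements multi-node test clusters in LaunchCluster: the first
node still bootstraps the cluster as before, while any further nodes
are launched with ClusterRegister parameters (a register ticket,
cluster directory and CA certificate retrieved from the first node via
the Management API), then approved and waited on until they report UP.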

Change-Id: I197cbfa96d34c9912c7fc19710db25276e7440fc
Reviewed-on: https://review.monogon.dev/c/monogon/+/454
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/test/launch/cluster/cluster.go b/metropolis/test/launch/cluster/cluster.go
index 7af1f55..4d5a772 100644
--- a/metropolis/test/launch/cluster/cluster.go
+++ b/metropolis/test/launch/cluster/cluster.go
@@ -27,8 +27,10 @@
 	"google.golang.org/protobuf/proto"
 
 	"source.monogon.dev/metropolis/node"
+	"source.monogon.dev/metropolis/node/core/identity"
 	"source.monogon.dev/metropolis/node/core/rpc"
 	apb "source.monogon.dev/metropolis/proto/api"
+	cpb "source.monogon.dev/metropolis/proto/common"
 	"source.monogon.dev/metropolis/test/launch"
 )
 
@@ -233,6 +235,27 @@
 	return out.Close()
 }
 
+// getNodes wraps Management.GetNodes, collecting the streamed responses into
+// a list of all nodes in the cluster.
+func getNodes(ctx context.Context, mgmt apb.ManagementClient) ([]*apb.Node, error) {
+	srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
+	if err != nil {
+		return nil, fmt.Errorf("GetNodes: %w", err)
+	}
+	var res []*apb.Node
+	for {
+		node, err := srvN.Recv()
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			return nil, fmt.Errorf("GetNodes.Recv: %w", err)
+		}
+		res = append(res, node)
+	}
+	return res, nil
+}
+
 // Gets a random EUI-48 Ethernet MAC address
 func generateRandomEthernetMAC() (*net.HardwareAddr, error) {
 	macBuf := make([]byte, 6)
@@ -294,12 +317,9 @@
 // The given context will be used to run all qemu instances in the cluster, and
 // canceling the context or calling Close() will terminate them.
 func LaunchCluster(ctx context.Context, opts ClusterOptions) (*Cluster, error) {
-	if opts.NumNodes == 0 {
-		return nil, errors.New("refusing to start cluster with zero nodes")
+	if opts.NumNodes <= 0 {
+		return nil, errors.New("refusing to start cluster with fewer than one node")
 	}
-	if opts.NumNodes > 1 {
-		return nil, errors.New("unimplemented")
-	}
 
 	ctxT, ctxC := context.WithCancel(ctx)
 
@@ -316,14 +336,15 @@
 		vmPorts = append(vmPorts, vmPort)
 	}
 
-	// Make a list of channels that will be populated and closed by all running node
-	// qemu processes.
+	// Make a list of channels that will be populated by all running node qemu
+	// processes.
 	done := make([]chan error, opts.NumNodes)
 	for i, _ := range done {
 		done[i] = make(chan error, 1)
 	}
 
 	// Start first node.
+	log.Printf("Cluster: Starting node %d...", 1)
 	go func() {
 		err := LaunchNode(ctxT, NodeOptions{
 			ConnectToSocket: vmPorts[0],
@@ -338,6 +359,7 @@
 		done[0] <- err
 	}()
 
+	// Launch nanoswitch.
 	portMap, err := launch.ConflictFreePortMap(ClusterPorts)
 	if err != nil {
 		ctxC()
@@ -378,7 +400,7 @@
 
 	// Retrieve owner certificate - this can take a while because the node is still
 	// coming up, so do it in a backoff loop.
-	log.Printf("Cluster: retrieving owner certificate...")
+	log.Printf("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
 	aaa := apb.NewAAAClient(initClient)
 	var cert *tls.Certificate
 	err = backoff.Retry(func() error {
@@ -391,15 +413,112 @@
 	}
 	log.Printf("Cluster: retrieved owner certificate.")
 
+	// Build an authenticated owner client to the first node.
 	authClient, err := rpc.NewAuthenticatedClient(remote, *cert, nil)
 	if err != nil {
 		ctxC()
 		return nil, fmt.Errorf("NewAuthenticatedClient: %w", err)
 	}
+	mgmt := apb.NewManagementClient(authClient)
+
+	// Retrieve register ticket to register further nodes.
+	log.Printf("Cluster: retrieving register ticket...")
+	resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
+	if err != nil {
+		ctxC()
+		return nil, fmt.Errorf("GetRegisterTicket: %w", err)
+	}
+	ticket := resT.Ticket
+	log.Printf("Cluster: retrieved register ticket (%d bytes).", len(ticket))
+
+	// Retrieve cluster info (containing the cluster directory and CA
+	// certificate) needed to register further nodes.
+	resI, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
+	if err != nil {
+		ctxC()
+		return nil, fmt.Errorf("GetClusterInfo: %w", err)
+	}
+
+	// Now run the rest of the nodes.
+	//
+	// TODO(q3k): parallelize this
+	for i := 1; i < opts.NumNodes; i++ {
+		log.Printf("Cluster: Starting node %d...", i+1)
+		go func(i int) {
+			err := LaunchNode(ctxT, NodeOptions{
+				ConnectToSocket: vmPorts[i],
+				NodeParameters: &apb.NodeParameters{
+					Cluster: &apb.NodeParameters_ClusterRegister_{
+						ClusterRegister: &apb.NodeParameters_ClusterRegister{
+							RegisterTicket:   ticket,
+							ClusterDirectory: resI.ClusterDirectory,
+							CaCertificate:    resI.CaCertificate,
+						},
+					},
+				},
+				SerialPort: os.Stdout,
+			})
+			done[i] <- err
+		}(i)
+		var newNode *apb.Node
+
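+		// Poll Management.GetNodes until the newly started node shows up as
+		// NEW. Nodes are registered one at a time, so there is at most one
+		// NEW node at any point and matching on state alone is unambiguous.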
+		log.Printf("Cluster: waiting for node %d to appear as NEW...", i)
+		for {
+			nodes, err := getNodes(ctx, mgmt)
+			if err != nil {
+				ctxC()
+				return nil, fmt.Errorf("could not get nodes: %w", err)
+			}
+			for _, n := range nodes {
+				if n.State == cpb.NodeState_NODE_STATE_NEW {
+					newNode = n
+					break
+				}
+			}
+			if newNode != nil {
+				break
+			}
+			time.Sleep(1 * time.Second)
+		}
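+		// The node's ID is derived from its public key.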
+		id := identity.NodeID(newNode.Pubkey)
+		log.Printf("Cluster: node %d is %s", i, id)
+
+		log.Printf("Cluster: approving node %d", i)
+		_, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
+			Pubkey: newNode.Pubkey,
+		})
+		if err != nil {
+			ctxC()
+			return nil, fmt.Errorf("ApproveNode(%s): %w", id, err)
+		}
+		log.Printf("Cluster: node %d approved, waiting for it to appear as UP...", i)
+		for {
+			nodes, err := getNodes(ctx, mgmt)
+			if err != nil {
+				ctxC()
+				return nil, fmt.Errorf("could not get nodes: %w", err)
+			}
+			found := false
+			for _, n := range nodes {
+				if !bytes.Equal(n.Pubkey, newNode.Pubkey) {
+					continue
+				}
+				if n.State == cpb.NodeState_NODE_STATE_UP {
+					found = true
+				}
+				break
+			}
+			if found {
+				break
+			}
+			time.Sleep(time.Second)
+		}
+		log.Printf("Cluster: node %d (%s) UP!", i, id)
+	}
 
 	return &Cluster{
 		Debug:      apb.NewNodeDebugServiceClient(debugConn),
-		Management: apb.NewManagementClient(authClient),
+		Management: mgmt,
 		Owner:      *cert,
 		Ports:      portMap,
 		nodesDone:  done,