m/test/e2e: split core/kubernetes tests, clean up

This splits the large TestE2E function into two separate functions and
tests: one which exercises the core functionality of Metropolis, the
other which exercises just the Kubernetes bits.

This allows for easier testing during development, and generally trades
off higher resource usage for faster execution time in CI.

At the same time we do some small cleanups of the E2E functionality:

 1. Node startup is now parallelized.
 2. Non-bootstrap nodes can now be left in NEW (this was used in
    diagnosing issue #234, but is currently unused in the main code).
 3. Kubernetes access now goes over SOCKS.
 4. Some Cluster helper functions have been added (a usage sketch
    follows below).
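
As a rough illustration only, a future E2E test might combine these
pieces as follows. The LaunchCluster call shape, the assumption that
NodeIDs[0] is the bootstrap node, and the surrounding test harness are
taken as given here and are not prescribed by this change:

    cl, err := cluster.LaunchCluster(ctx, cluster.ClusterOptions{
        NumNodes: 3,
        // Leave the non-bootstrap nodes in NEW so the test can drive
        // their lifecycle explicitly.
        LeaveNodesNew: true,
    })
    if err != nil {
        t.Fatalf("LaunchCluster: %v", err)
    }

    // Assuming NodeIDs[0] is the bootstrap node, pick one of the NEW
    // nodes, approve it and give it the KubernetesWorker role.
    id := cl.NodeIDs[1]
    if err := cl.ApproveNode(ctx, id); err != nil {
        t.Fatalf("ApproveNode: %v", err)
    }
    if err := cl.MakeKubernetesWorker(ctx, id); err != nil {
        t.Fatalf("MakeKubernetesWorker: %v", err)
    }

    // Kubernetes API access is dialed through the nanoswitch SOCKS
    // proxy under the hood.
    clientSet, err := cl.GetKubeClientSet()
    if err != nil {
        t.Fatalf("GetKubeClientSet: %v", err)
    }
    // metav1 is k8s.io/apimachinery/pkg/apis/meta/v1.
    pods, err := clientSet.CoreV1().Pods("default").List(
        ctx, metav1.ListOptions{})
    if err != nil {
        t.Fatalf("listing pods: %v", err)
    }
    t.Logf("%d pods in the default namespace", len(pods.Items))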

All in all this should allow us to write more E2E tests in the future,
and at some point maybe also turn Cluster into an interface that is
implemented both by the current framework and by persistent test setups
running against long-term VMs/physical machines.
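
As a purely hypothetical sketch, such an interface could simply collect
the methods Cluster already exposes after this change (the name and the
exact method set are not decided here):

    type TestCluster interface {
        DialNode(ctx context.Context, addr string) (net.Conn, error)
        GetKubeClientSet() (kubernetes.Interface, error)
        AllNodesHealthy(ctx context.Context) error
        ApproveNode(ctx context.Context, id string) error
        MakeKubernetesWorker(ctx context.Context, id string) error
    }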

Change-Id: Ia4586b2aaa5fc8c979d35f4b49513638481e4c10
Reviewed-on: https://review.monogon.dev/c/monogon/+/1870
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/test/launch/cluster/cluster.go b/metropolis/test/launch/cluster/cluster.go
index 615a9cc..5b39f67 100644
--- a/metropolis/test/launch/cluster/cluster.go
+++ b/metropolis/test/launch/cluster/cluster.go
@@ -11,6 +11,7 @@
 	"crypto/rand"
 	"crypto/tls"
 	"crypto/x509"
+	"encoding/pem"
 	"errors"
 	"fmt"
 	"io"
@@ -30,6 +31,8 @@
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 	"google.golang.org/protobuf/proto"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
 
 	metroctl "source.monogon.dev/metropolis/cli/metroctl/core"
 	"source.monogon.dev/metropolis/cli/pkg/datafile"
@@ -487,6 +490,11 @@
 	// The files will be located within the launch directory inside TEST_TMPDIR (or
 	// the default tempdir location, if not set).
 	NodeLogsToFiles bool
+
+	// LeaveNodesNew, if set, will leave all non-bootstrap nodes in NEW, without
+	// bootstrapping them. The nodes' address information in Cluster.Nodes will be
+	// incomplete.
+	LeaveNodesNew bool
 }
 
 // Cluster is the running Metropolis cluster launched using the LaunchCluster
@@ -543,7 +551,8 @@
 // NodeInCluster represents information about a node that's part of a Cluster.
 type NodeInCluster struct {
 	// ID of the node, which can be used to dial this node's services via DialNode.
-	ID string
+	ID     string
+	Pubkey []byte
 	// Address of the node on the network ran by nanoswitch. Not reachable from the
 	// host unless dialed via DialNode or via the nanoswitch SOCKS proxy (reachable
 	// on Cluster.Ports[SOCKSPort]).
@@ -868,8 +877,6 @@
 	}
 
 	// Now run the rest of the nodes.
-	//
-	// TODO(q3k): parallelize this
 	for i := 1; i < opts.NumNodes; i++ {
 		launch.Log("Cluster: Starting node %d...", i+1)
 		go func(i int) {
@@ -879,9 +886,11 @@
 			}
 			done[i] <- err
 		}(i)
-		var newNode *apb.Node
+	}
 
-		launch.Log("Cluster: waiting for node %d to appear as NEW...", i)
+	seenNodes := make(map[string]bool)
+	launch.Log("Cluster: waiting for nodes to appear as NEW...")
+	for i := 1; i < opts.NumNodes; i++ {
 		for {
 			nodes, err := getNodes(ctx, mgmt)
 			if err != nil {
@@ -889,65 +898,75 @@
 				return nil, fmt.Errorf("could not get nodes: %w", err)
 			}
 			for _, n := range nodes {
-				if n.State == cpb.NodeState_NODE_STATE_NEW {
-					newNode = n
-					break
+				if n.State != cpb.NodeState_NODE_STATE_NEW {
+					continue
 				}
+				seenNodes[n.Id] = true
+				cluster.Nodes[n.Id] = &NodeInCluster{
+					ID:     n.Id,
+					Pubkey: n.Pubkey,
+				}
+				cluster.NodeIDs = append(cluster.NodeIDs, n.Id)
 			}
-			if newNode != nil {
+
+			if len(seenNodes) == opts.NumNodes-1 {
 				break
 			}
 			time.Sleep(1 * time.Second)
 		}
-		id := identity.NodeID(newNode.Pubkey)
-		launch.Log("Cluster: node %d is %s", i, id)
+	}
+	launch.Log("Found all expected nodes")
 
-		launch.Log("Cluster: approving node %d", i)
-		_, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
-			Pubkey: newNode.Pubkey,
-		})
-		if err != nil {
-			ctxC()
-			return nil, fmt.Errorf("ApproveNode(%s): %w", id, err)
-		}
-		launch.Log("Cluster: node %d approved, waiting for it to appear as UP and with a network address...", i)
+	approvedNodes := make(map[string]bool)
+	upNodes := make(map[string]bool)
+	if !opts.LeaveNodesNew {
 		for {
 			nodes, err := getNodes(ctx, mgmt)
 			if err != nil {
 				ctxC()
 				return nil, fmt.Errorf("could not get nodes: %w", err)
 			}
-			found := false
-			for _, n := range nodes {
-				if !bytes.Equal(n.Pubkey, newNode.Pubkey) {
+			for _, node := range nodes {
+				if !seenNodes[node.Id] {
+					// Skip nodes that weren't NEW in the previous step.
 					continue
 				}
-				if n.Status == nil || n.Status.ExternalAddress == "" {
-					break
+
+				if node.State == cpb.NodeState_NODE_STATE_UP && node.Status != nil && node.Status.ExternalAddress != "" {
+					launch.Log("Cluster: node %s is up", node.Id)
+					upNodes[node.Id] = true
+					cluster.Nodes[node.Id].ManagementAddress = node.Status.ExternalAddress
 				}
-				if n.State != cpb.NodeState_NODE_STATE_UP {
-					break
+				if upNodes[node.Id] {
+					continue
 				}
-				found = true
-				cluster.Nodes[identity.NodeID(n.Pubkey)] = &NodeInCluster{
-					ID:                identity.NodeID(n.Pubkey),
-					ManagementAddress: n.Status.ExternalAddress,
+
+				if !approvedNodes[node.Id] {
+					launch.Log("Cluster: approving node %s", node.Id)
+					_, err := mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
+						Pubkey: node.Pubkey,
+					})
+					if err != nil {
+						ctxC()
+						return nil, fmt.Errorf("ApproveNode(%s): %w", node.Id, err)
+					}
+					approvedNodes[node.Id] = true
 				}
-				cluster.NodeIDs = append(cluster.NodeIDs, identity.NodeID(n.Pubkey))
-				break
 			}
-			if found {
+
+			launch.Log("Cluster: want %d up nodes, have %d", opts.NumNodes-1, len(upNodes))
+			if len(upNodes) == opts.NumNodes-1 {
 				break
 			}
 			time.Sleep(time.Second)
 		}
-		launch.Log("Cluster: node %d (%s) UP!", i, id)
 	}
 
 	launch.Log("Cluster: all nodes up:")
 	for _, node := range cluster.Nodes {
 		launch.Log("Cluster:  - %s at %s", node.ID, node.ManagementAddress)
 	}
+	launch.Log("Cluster: starting tests...")
 
 	return cluster, nil
 }
@@ -1077,6 +1096,34 @@
 	return c.socksDialer.Dial("tcp", addr)
 }
 
+// GetKubeClientSet gets a Kubernetes client set accessing the Metropolis
+// Kubernetes authenticating proxy using the cluster owner identity.
+// It currently has access to everything (i.e. the cluster-admin role)
+// via the owner-admin binding.
+func (c *Cluster) GetKubeClientSet() (kubernetes.Interface, error) {
+	pkcs8Key, err := x509.MarshalPKCS8PrivateKey(c.Owner.PrivateKey)
+	if err != nil {
+		// We explicitly pass an Ed25519 private key in, so this can't happen
+		panic(err)
+	}
+
+	host := net.JoinHostPort(c.NodeIDs[0], node.KubernetesAPIWrappedPort.PortString())
+	var clientConfig = rest.Config{
+		Host: host,
+		TLSClientConfig: rest.TLSClientConfig{
+			// TODO(q3k): use CA certificate
+			Insecure:   true,
+			ServerName: "kubernetes.default.svc",
+			CertData:   pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: c.Owner.Certificate[0]}),
+			KeyData:    pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: pkcs8Key}),
+		},
+		Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
+			return c.DialNode(ctx, address)
+		},
+	}
+	return kubernetes.NewForConfig(&clientConfig)
+}
+
 // KubernetesControllerNodeAddresses returns the list of IP addresses of nodes
 // which are currently Kubernetes controllers, ie. run an apiserver. This list
 // might be empty if no node is currently configured with the
@@ -1111,6 +1158,8 @@
 	return res, nil
 }
 
+// AllNodesHealthy returns nil if all the nodes in the cluster are seemingly
+// healthy.
 func (c *Cluster) AllNodesHealthy(ctx context.Context) error {
 	// Get an authenticated owner client within the cluster.
 	curC, err := c.CuratorClient()
@@ -1135,3 +1184,71 @@
 	}
 	return fmt.Errorf("nodes unhealthy: %s", strings.Join(unhealthy, ", "))
 }
+
+// ApproveNode approves a node by ID, waiting for it to become UP.
+func (c *Cluster) ApproveNode(ctx context.Context, id string) error {
+	curC, err := c.CuratorClient()
+	if err != nil {
+		return err
+	}
+	mgmt := apb.NewManagementClient(curC)
+
+	_, err = mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{
+		Pubkey: c.Nodes[id].Pubkey,
+	})
+	if err != nil {
+		return fmt.Errorf("ApproveNode: %w", err)
+	}
+	launch.Log("Cluster: %s: approved, waiting for UP", id)
+	for {
+		nodes, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
+		if err != nil {
+			return fmt.Errorf("GetNodes: %w", err)
+		}
+		found := false
+		for {
+			node, err := nodes.Recv()
+			if errors.Is(err, io.EOF) {
+				break
+			}
+			if err != nil {
+				return fmt.Errorf("Nodes.Recv: %w", err)
+			}
+			if node.Id != id {
+				continue
+			}
+			if node.State != cpb.NodeState_NODE_STATE_UP {
+				continue
+			}
+			found = true
+			break
+		}
+		nodes.CloseSend()
+
+		if found {
+			break
+		}
+		time.Sleep(time.Second)
+	}
+	launch.Log("Cluster: %s: UP", id)
+	return nil
+}
+
+// MakeKubernetesWorker adds the KubernetesWorker role to a node by ID.
+func (c *Cluster) MakeKubernetesWorker(ctx context.Context, id string) error {
+	curC, err := c.CuratorClient()
+	if err != nil {
+		return err
+	}
+	mgmt := apb.NewManagementClient(curC)
+
+	tr := true
+	launch.Log("Cluster: %s: adding KubernetesWorker", id)
+	_, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
+		Node: &apb.UpdateNodeRolesRequest_Id{
+			Id: id,
+		},
+		KubernetesWorker: &tr,
+	})
+	return err
+}