m/n/c/{cluster,roleserve}: implement Join Flow

This implements the Join Flow for:
- Registered nodes attempting to re-join the cluster.
- Nodes bootstrapping the cluster.

See: Cluster Lifecycle and Integrity design document
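
For reference, the decision the cluster Manager now makes at startup can be
sketched roughly as follows. All types and helpers below are placeholders
standing in for the real metropolis packages (SealedConfiguration and
ClusterDirectory live in proto/private and proto/common respectively), and
error handling is reduced to the essentials:

package main

import (
	"errors"
	"fmt"
)

// Placeholder types; their contents are elided here.
type sealedConfiguration struct{}
type clusterDirectory struct{}

var errNoSealed = errors.New("no sealed configuration")

// unseal and readDirectory stand in for reading the node's ESP partition.
func unseal() (*sealedConfiguration, error)     { return nil, errNoSealed }
func readDirectory() (*clusterDirectory, error) { return &clusterDirectory{}, nil }

func start() error {
	cfg, err := unseal()
	switch {
	case err == nil:
		// The node was registered before: re-join using the Cluster
		// Directory persisted next to the Sealed Configuration.
		cd, err := readDirectory()
		if err != nil {
			return fmt.Errorf("while reading cluster directory: %w", err)
		}
		return join(cfg, cd)
	case errors.Is(err, errNoSealed):
		// First boot: bootstrap a new cluster or register into an
		// existing one, based on Node Parameters.
		return bootstrapOrRegister()
	default:
		return fmt.Errorf("unsealing configuration: %w", err)
	}
}

func join(cfg *sealedConfiguration, cd *clusterDirectory) error { return nil }
func bootstrapOrRegister() error                                { return nil }

func main() {
	if err := start(); err != nil {
		fmt.Println("startup failed:", err)
	}
}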

Change-Id: I74ab98fdec650c4f6aa59e34a16c0f95745dc0e9
Reviewed-on: https://review.monogon.dev/c/monogon/+/556
Reviewed-by: Sergiusz Bazanski <serge@monogon.tech>
diff --git a/metropolis/node/core/cluster/cluster.go b/metropolis/node/core/cluster/cluster.go
index d51d7d9..1de24f3 100644
--- a/metropolis/node/core/cluster/cluster.go
+++ b/metropolis/node/core/cluster/cluster.go
@@ -31,20 +31,25 @@
 	"errors"
 	"fmt"
 	"io"
+	"net"
 	"net/http"
 	"os"
+	"strings"
 	"sync"
 
 	"github.com/cenkalti/backoff/v4"
 	"google.golang.org/protobuf/proto"
 
+	"source.monogon.dev/metropolis/node"
 	"source.monogon.dev/metropolis/node/core/consensus"
+	"source.monogon.dev/metropolis/node/core/identity"
 	"source.monogon.dev/metropolis/node/core/localstorage"
 	"source.monogon.dev/metropolis/node/core/network"
 	"source.monogon.dev/metropolis/node/core/roleserve"
 	"source.monogon.dev/metropolis/pkg/event/memory"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 	apb "source.monogon.dev/metropolis/proto/api"
+	cpb "source.monogon.dev/metropolis/proto/common"
 	ppb "source.monogon.dev/metropolis/proto/private"
 )
 
@@ -107,7 +112,15 @@
 	configuration, err := m.storageRoot.ESP.Metropolis.SealedConfiguration.Unseal()
 	if err == nil {
 		supervisor.Logger(ctx).Info("Sealed configuration present. attempting to join cluster")
-		return m.join(ctx, configuration)
+
+		// Read Cluster Directory and unmarshal it. Since the node is already
+		// registered with the cluster, the directory won't be bootstrapped from
+		// Node Parameters.
+		cd, err := m.storageRoot.ESP.Metropolis.ClusterDirectory.Unmarshal()
+		if err != nil {
+			return fmt.Errorf("while reading cluster directory: %w", err)
+		}
+		return m.join(ctx, configuration, cd)
 	}
 
 	if !errors.Is(err, localstorage.ErrNoSealed) {
@@ -241,6 +254,30 @@
 	}
 }
 
-func (m *Manager) join(ctx context.Context, cfg *ppb.SealedConfiguration) error {
-	return fmt.Errorf("unimplemented")
+// logClusterDirectory verbosely logs the whole Cluster Directory passed to it.
+func logClusterDirectory(ctx context.Context, cd *cpb.ClusterDirectory) {
+	for _, node := range cd.Nodes {
+		id := identity.NodeID(node.PublicKey)
+		var addresses []string
+		for _, add := range node.Addresses {
+			addresses = append(addresses, add.Host)
+		}
+		supervisor.Logger(ctx).Infof("    Node ID: %s, Addresses: %s", id, strings.Join(addresses, ","))
+	}
+}
+
+// curatorRemote returns a host:port pair pointing at one of the cluster's
+// available Curator endpoints. It returns an empty string and an error if the
+// Cluster Directory is empty or its first node has no addresses.
+// TODO(issues/117): use dynamic cluster client instead
+func curatorRemote(cd *cpb.ClusterDirectory) (string, error) {
+	if len(cd.Nodes) == 0 {
+		return "", fmt.Errorf("the Cluster Directory is empty")
+	}
+	n := cd.Nodes[0]
+	if len(n.Addresses) == 0 {
+		return "", fmt.Errorf("the first node in the Cluster Directory doesn't have an associated address")
+	}
+	r := n.Addresses[0].Host
+	return net.JoinHostPort(r, node.CuratorServicePort.PortString()), nil
 }