m/n/c/{cluster,roleserve}: implement Join Flow
This implements Join Flow for:
- Registered nodes attempting to re-join the cluster.
- Nodes bootstrapping the cluster.
See: Cluster Lifecycle and Integrity design document
Change-Id: I74ab98fdec650c4f6aa59e34a16c0f95745dc0e9
Reviewed-on: https://review.monogon.dev/c/monogon/+/556
Reviewed-by: Sergiusz Bazanski <serge@monogon.tech>
diff --git a/metropolis/node/core/cluster/cluster.go b/metropolis/node/core/cluster/cluster.go
index d51d7d9..1de24f3 100644
--- a/metropolis/node/core/cluster/cluster.go
+++ b/metropolis/node/core/cluster/cluster.go
@@ -31,20 +31,25 @@
"errors"
"fmt"
"io"
+ "net"
"net/http"
"os"
+ "strings"
"sync"
"github.com/cenkalti/backoff/v4"
"google.golang.org/protobuf/proto"
+ "source.monogon.dev/metropolis/node"
"source.monogon.dev/metropolis/node/core/consensus"
+ "source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/network"
"source.monogon.dev/metropolis/node/core/roleserve"
"source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/supervisor"
apb "source.monogon.dev/metropolis/proto/api"
+ cpb "source.monogon.dev/metropolis/proto/common"
ppb "source.monogon.dev/metropolis/proto/private"
)
@@ -107,7 +112,15 @@
configuration, err := m.storageRoot.ESP.Metropolis.SealedConfiguration.Unseal()
if err == nil {
supervisor.Logger(ctx).Info("Sealed configuration present. attempting to join cluster")
- return m.join(ctx, configuration)
+
+ // Read Cluster Directory and unmarshal it. Since the node is already
+ // registered with the cluster, the directory won't be bootstrapped from
+ // Node Parameters.
+ cd, err := m.storageRoot.ESP.Metropolis.ClusterDirectory.Unmarshal()
+ if err != nil {
+ return fmt.Errorf("while reading cluster directory: %w", err)
+ }
+ return m.join(ctx, configuration, cd)
}
if !errors.Is(err, localstorage.ErrNoSealed) {
@@ -241,6 +254,30 @@
}
}
-func (m *Manager) join(ctx context.Context, cfg *ppb.SealedConfiguration) error {
- return fmt.Errorf("unimplemented")
+// logClusterDirectory verbosely logs the whole Cluster Directory passed to it,
+// one line per node, with each node's ID and its comma-separated addresses.
+func logClusterDirectory(ctx context.Context, cd *cpb.ClusterDirectory) {
+	// Note: the loop variable must not be named `node`, as that would shadow
+	// the imported metropolis/node package used elsewhere in this file.
+	for _, n := range cd.Nodes {
+		id := identity.NodeID(n.PublicKey)
+		var addresses []string
+		for _, addr := range n.Addresses {
+			addresses = append(addresses, addr.Host)
+		}
+		supervisor.Logger(ctx).Infof(" Node ID: %s, Addresses: %s", id, strings.Join(addresses, ","))
+	}
+}
+
+// curatorRemote returns a host:port pair pointing at one of the cluster's
+// available Curator endpoints. It will return an empty string, and an error,
+// if the cluster directory is empty or its first node carries no addresses.
+// TODO(issues/117): use dynamic cluster client instead
+func curatorRemote(cd *cpb.ClusterDirectory) (string, error) {
+	if len(cd.Nodes) == 0 {
+		return "", fmt.Errorf("cluster directory is empty")
+	}
+	// Naively pick the first address of the first node; the dynamic cluster
+	// client (issues/117) will eventually replace this.
+	n := cd.Nodes[0]
+	if len(n.Addresses) == 0 {
+		return "", fmt.Errorf("first node in the cluster directory has no associated address")
+	}
+	r := n.Addresses[0].Host
+	return net.JoinHostPort(r, node.CuratorServicePort.PortString()), nil
}