m/node: use resolver for intra-cluster connections
This replaces all instances of 'just connect to the first node for now'
with a proper leader-aware cluster resolver.
This isn't tested yet, as all of this will be exercised in an E2E test.
Change-Id: I9f6bfb49ff9ae0dd70ac1a3131c5ee021d9bb5a5
Reviewed-on: https://review.monogon.dev/c/monogon/+/796
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/cluster/BUILD.bazel b/metropolis/node/core/cluster/BUILD.bazel
index e8c7ff2..4f0ab36 100644
--- a/metropolis/node/core/cluster/BUILD.bazel
+++ b/metropolis/node/core/cluster/BUILD.bazel
@@ -12,7 +12,6 @@
importpath = "source.monogon.dev/metropolis/node/core/cluster",
visibility = ["//metropolis/node/core:__subpackages__"],
deps = [
- "//metropolis/node",
"//metropolis/node/core/consensus",
"//metropolis/node/core/curator/proto/api",
"//metropolis/node/core/identity",
@@ -20,6 +19,7 @@
"//metropolis/node/core/network",
"//metropolis/node/core/roleserve",
"//metropolis/node/core/rpc",
+ "//metropolis/node/core/rpc/resolver",
"//metropolis/pkg/event/memory",
"//metropolis/pkg/supervisor",
"//metropolis/proto/api",
diff --git a/metropolis/node/core/cluster/cluster.go b/metropolis/node/core/cluster/cluster.go
index 4df489d..d5eee03 100644
--- a/metropolis/node/core/cluster/cluster.go
+++ b/metropolis/node/core/cluster/cluster.go
@@ -31,7 +31,6 @@
"errors"
"fmt"
"io"
- "net"
"net/http"
"os"
"strings"
@@ -40,7 +39,6 @@
"github.com/cenkalti/backoff/v4"
"google.golang.org/protobuf/proto"
- "source.monogon.dev/metropolis/node"
"source.monogon.dev/metropolis/node/core/consensus"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/network"
@@ -263,19 +261,3 @@
supervisor.Logger(ctx).Infof(" Addresses: %s", strings.Join(addresses, ","))
}
}
-
-// curatorRemote returns a host:port pair pointing at one of the cluster's
-// available Curator endpoints. It will return an empty string, and an error,
-// if the cluster directory is empty.
-// TODO(issues/117): use dynamic cluster client instead
-func curatorRemote(cd *cpb.ClusterDirectory) (string, error) {
- if len(cd.Nodes) == 0 {
- return "", fmt.Errorf("the Cluster Directory is empty.")
- }
- n := cd.Nodes[0]
- if len(n.Addresses) == 0 {
- return "", fmt.Errorf("the first node in the Cluster Directory doesn't have an associated Address.")
- }
- r := n.Addresses[0].Host
- return net.JoinHostPort(r, node.CuratorServicePort.PortString()), nil
-}
diff --git a/metropolis/node/core/cluster/cluster_join.go b/metropolis/node/core/cluster/cluster_join.go
index d2e5ad5..47f7589 100644
--- a/metropolis/node/core/cluster/cluster_join.go
+++ b/metropolis/node/core/cluster/cluster_join.go
@@ -13,6 +13,7 @@
ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/rpc"
+ "source.monogon.dev/metropolis/node/core/rpc/resolver"
"source.monogon.dev/metropolis/pkg/supervisor"
cpb "source.monogon.dev/metropolis/proto/common"
ppb "source.monogon.dev/metropolis/proto/private"
@@ -37,18 +38,33 @@
supervisor.Logger(ctx).Infof(" Directory:")
logClusterDirectory(ctx, cd)
- // Attempt to connect to the first node in the cluster directory.
- r, err := curatorRemote(cd)
- if err != nil {
- return fmt.Errorf("while picking a Curator endpoint: %w", err)
+ // Build resolver used by the join process, authenticating with join
+ // credentials. Once the join is complete, the roleserver will start its own
+ // long-term resolver.
+ r := resolver.New(ctx)
+ r.SetLogger(func(f string, args ...interface{}) {
+ supervisor.Logger(ctx).WithAddedStackDepth(1).Infof(f, args...)
+ })
+ addedNodes := 0
+ for _, node := range cd.Nodes {
+ if len(node.Addresses) == 0 {
+ continue
+ }
+ // TODO(MVP): handle curators listening on non-default ports; for now, assume the default Curator port.
+ r.AddEndpoint(resolver.NodeAtAddressWithDefaultPort(node.Addresses[0].Host))
+ addedNodes += 1
}
+ if addedNodes == 0 {
+ return fmt.Errorf("no remote node available, cannot join cluster")
+ }
+
ephCreds, err := rpc.NewEphemeralCredentials(jpriv, ca)
if err != nil {
return fmt.Errorf("could not create ephemeral credentials: %w", err)
}
- eph, err := grpc.Dial(r, grpc.WithTransportCredentials(ephCreds))
+ eph, err := grpc.Dial(resolver.MetropolisControlAddress, grpc.WithTransportCredentials(ephCreds), grpc.WithResolvers(r))
if err != nil {
- return fmt.Errorf("could not create ephemeral client to %q: %w", r, err)
+ return fmt.Errorf("could not dial cluster with join credentials: %w", err)
}
cur := ipb.NewCuratorClient(eph)
diff --git a/metropolis/node/core/cluster/cluster_register.go b/metropolis/node/core/cluster/cluster_register.go
index f2ccbff..9e67000 100644
--- a/metropolis/node/core/cluster/cluster_register.go
+++ b/metropolis/node/core/cluster/cluster_register.go
@@ -17,6 +17,7 @@
ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/rpc"
+ "source.monogon.dev/metropolis/node/core/rpc/resolver"
"source.monogon.dev/metropolis/pkg/supervisor"
apb "source.monogon.dev/metropolis/proto/api"
ppb "source.monogon.dev/metropolis/proto/private"
@@ -96,18 +97,33 @@
}
supervisor.Logger(ctx).Infof("Registering: node public key: %s", hex.EncodeToString([]byte(pub)))
- // Attempt to connect to first node in cluster directory and to call Register.
- r, err := curatorRemote(register.ClusterDirectory)
- if err != nil {
- return fmt.Errorf("while picking a Curator endpoint: %w", err)
+ // Build resolver used by the register process, authenticating with ephemeral
+ // credentials. Once registration is complete, the roleserver will start its own
+ // long-term resolver.
+ r := resolver.New(ctx)
+ r.SetLogger(func(f string, args ...interface{}) {
+ supervisor.Logger(ctx).WithAddedStackDepth(1).Infof(f, args...)
+ })
+ addedNodes := 0
+ for _, node := range register.ClusterDirectory.Nodes {
+ if len(node.Addresses) == 0 {
+ continue
+ }
+ // TODO(MVP): handle curators listening on non-default ports; for now, assume the default Curator port.
+ r.AddEndpoint(resolver.NodeAtAddressWithDefaultPort(node.Addresses[0].Host))
+ addedNodes += 1
}
+ if addedNodes == 0 {
+ return fmt.Errorf("no remote node available, cannot register into cluster")
+ }
+
ephCreds, err := rpc.NewEphemeralCredentials(priv, ca)
if err != nil {
return fmt.Errorf("could not create ephemeral credentials: %w", err)
}
- eph, err := grpc.Dial(r, grpc.WithTransportCredentials(ephCreds))
+ eph, err := grpc.Dial(resolver.MetropolisControlAddress, grpc.WithTransportCredentials(ephCreds), grpc.WithResolvers(r))
if err != nil {
- return fmt.Errorf("could not create ephemeral client to %q: %w", r, err)
+ return fmt.Errorf("could not dial cluster with ephemeral credentials: %w", err)
}
cur := ipb.NewCuratorClient(eph)