m/node: use resolver for intra-cluster connections

This replaces all instances of 'just connect to the first node for now'
with a proper leader-aware cluster resolver.

This isn't tested yet, as all of this will be exercised in an E2E test.
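
At the join/register call sites, the new dialing pattern is roughly the
following (sketch only, simplified from cluster_join.go; credential setup
and error handling elided):

    r := resolver.New(ctx)
    for _, n := range cd.Nodes {
        if len(n.Addresses) == 0 {
            continue
        }
        // For now, curators are assumed to listen on the default port.
        r.AddEndpoint(resolver.NodeAtAddressWithDefaultPort(n.Addresses[0].Host))
    }
    eph, err := grpc.Dial(resolver.MetropolisControlAddress,
        grpc.WithTransportCredentials(ephCreds), grpc.WithResolvers(r))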

Change-Id: I9f6bfb49ff9ae0dd70ac1a3131c5ee021d9bb5a5
Reviewed-on: https://review.monogon.dev/c/monogon/+/796
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/cluster/BUILD.bazel b/metropolis/node/core/cluster/BUILD.bazel
index e8c7ff2..4f0ab36 100644
--- a/metropolis/node/core/cluster/BUILD.bazel
+++ b/metropolis/node/core/cluster/BUILD.bazel
@@ -12,7 +12,6 @@
     importpath = "source.monogon.dev/metropolis/node/core/cluster",
     visibility = ["//metropolis/node/core:__subpackages__"],
     deps = [
-        "//metropolis/node",
         "//metropolis/node/core/consensus",
         "//metropolis/node/core/curator/proto/api",
         "//metropolis/node/core/identity",
@@ -20,6 +19,7 @@
         "//metropolis/node/core/network",
         "//metropolis/node/core/roleserve",
         "//metropolis/node/core/rpc",
+        "//metropolis/node/core/rpc/resolver",
         "//metropolis/pkg/event/memory",
         "//metropolis/pkg/supervisor",
         "//metropolis/proto/api",
diff --git a/metropolis/node/core/cluster/cluster.go b/metropolis/node/core/cluster/cluster.go
index 4df489d..d5eee03 100644
--- a/metropolis/node/core/cluster/cluster.go
+++ b/metropolis/node/core/cluster/cluster.go
@@ -31,7 +31,6 @@
 	"errors"
 	"fmt"
 	"io"
-	"net"
 	"net/http"
 	"os"
 	"strings"
@@ -40,7 +39,6 @@
 	"github.com/cenkalti/backoff/v4"
 	"google.golang.org/protobuf/proto"
 
-	"source.monogon.dev/metropolis/node"
 	"source.monogon.dev/metropolis/node/core/consensus"
 	"source.monogon.dev/metropolis/node/core/localstorage"
 	"source.monogon.dev/metropolis/node/core/network"
@@ -263,19 +261,3 @@
 		supervisor.Logger(ctx).Infof("    Addresses: %s", strings.Join(addresses, ","))
 	}
 }
-
-// curatorRemote returns a host:port pair pointing at one of the cluster's
-// available Curator endpoints. It will return an empty string, and an error,
-// if the cluster directory is empty.
-// TODO(issues/117): use dynamic cluster client instead
-func curatorRemote(cd *cpb.ClusterDirectory) (string, error) {
-	if len(cd.Nodes) == 0 {
-		return "", fmt.Errorf("the Cluster Directory is empty.")
-	}
-	n := cd.Nodes[0]
-	if len(n.Addresses) == 0 {
-		return "", fmt.Errorf("the first node in the Cluster Directory doesn't have an associated Address.")
-	}
-	r := n.Addresses[0].Host
-	return net.JoinHostPort(r, node.CuratorServicePort.PortString()), nil
-}
diff --git a/metropolis/node/core/cluster/cluster_join.go b/metropolis/node/core/cluster/cluster_join.go
index d2e5ad5..47f7589 100644
--- a/metropolis/node/core/cluster/cluster_join.go
+++ b/metropolis/node/core/cluster/cluster_join.go
@@ -13,6 +13,7 @@
 	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
 	"source.monogon.dev/metropolis/node/core/identity"
 	"source.monogon.dev/metropolis/node/core/rpc"
+	"source.monogon.dev/metropolis/node/core/rpc/resolver"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 	cpb "source.monogon.dev/metropolis/proto/common"
 	ppb "source.monogon.dev/metropolis/proto/private"
@@ -37,18 +38,33 @@
 	supervisor.Logger(ctx).Infof("  Directory:")
 	logClusterDirectory(ctx, cd)
 
-	// Attempt to connect to the first node in the cluster directory.
-	r, err := curatorRemote(cd)
-	if err != nil {
-		return fmt.Errorf("while picking a Curator endpoint: %w", err)
+	// Build the resolver used by the join process, which authenticates with
+	// join credentials. Once the join is complete, the roleserver will start
+	// its own long-term resolver.
+	r := resolver.New(ctx)
+	r.SetLogger(func(f string, args ...interface{}) {
+		supervisor.Logger(ctx).WithAddedStackDepth(1).Infof(f, args...)
+	})
+	addedNodes := 0
+	for _, node := range cd.Nodes {
+		if len(node.Addresses) == 0 {
+			continue
+		}
+		// MVP: only the first address is used, and curators at non-default
+		// ports are not handled yet.
+		r.AddEndpoint(resolver.NodeAtAddressWithDefaultPort(node.Addresses[0].Host))
+		addedNodes++
 	}
+	if addedNodes == 0 {
+		return fmt.Errorf("no remote node available, cannot join cluster")
+	}
+
 	ephCreds, err := rpc.NewEphemeralCredentials(jpriv, ca)
 	if err != nil {
 		return fmt.Errorf("could not create ephemeral credentials: %w", err)
 	}
-	eph, err := grpc.Dial(r, grpc.WithTransportCredentials(ephCreds))
+	eph, err := grpc.Dial(resolver.MetropolisControlAddress, grpc.WithTransportCredentials(ephCreds), grpc.WithResolvers(r))
 	if err != nil {
-		return fmt.Errorf("could not create ephemeral client to %q: %w", r, err)
+		return fmt.Errorf("could not dial cluster with join credentials: %w", err)
 	}
 	cur := ipb.NewCuratorClient(eph)
 
diff --git a/metropolis/node/core/cluster/cluster_register.go b/metropolis/node/core/cluster/cluster_register.go
index f2ccbff..9e67000 100644
--- a/metropolis/node/core/cluster/cluster_register.go
+++ b/metropolis/node/core/cluster/cluster_register.go
@@ -17,6 +17,7 @@
 	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
 	"source.monogon.dev/metropolis/node/core/identity"
 	"source.monogon.dev/metropolis/node/core/rpc"
+	"source.monogon.dev/metropolis/node/core/rpc/resolver"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 	apb "source.monogon.dev/metropolis/proto/api"
 	ppb "source.monogon.dev/metropolis/proto/private"
@@ -96,18 +97,33 @@
 	}
 	supervisor.Logger(ctx).Infof("Registering: node public key: %s", hex.EncodeToString([]byte(pub)))
 
-	// Attempt to connect to first node in cluster directory and to call Register.
-	r, err := curatorRemote(register.ClusterDirectory)
-	if err != nil {
-		return fmt.Errorf("while picking a Curator endpoint: %w", err)
+	// Build the resolver used by the register process, which authenticates
+	// with ephemeral credentials. Once registration is complete, the
+	// roleserver will start its own long-term resolver.
+	r := resolver.New(ctx)
+	r.SetLogger(func(f string, args ...interface{}) {
+		supervisor.Logger(ctx).WithAddedStackDepth(1).Infof(f, args...)
+	})
+	addedNodes := 0
+	for _, node := range register.ClusterDirectory.Nodes {
+		if len(node.Addresses) == 0 {
+			continue
+		}
+		// MVP: only the first address is used, and curators at non-default
+		// ports are not handled yet.
+		r.AddEndpoint(resolver.NodeAtAddressWithDefaultPort(node.Addresses[0].Host))
+		addedNodes++
 	}
+	if addedNodes == 0 {
+		return fmt.Errorf("no remote node available, cannot register into cluster")
+	}
+
 	ephCreds, err := rpc.NewEphemeralCredentials(priv, ca)
 	if err != nil {
 		return fmt.Errorf("could not create ephemeral credentials: %w", err)
 	}
-	eph, err := grpc.Dial(r, grpc.WithTransportCredentials(ephCreds))
+	eph, err := grpc.Dial(resolver.MetropolisControlAddress, grpc.WithTransportCredentials(ephCreds), grpc.WithResolvers(r))
 	if err != nil {
-		return fmt.Errorf("could not create ephemeral client to %q: %w", r, err)
+		return fmt.Errorf("could not dial cluster with ephemeral credentials: %w", err)
 	}
 	cur := ipb.NewCuratorClient(eph)
 
diff --git a/metropolis/node/core/roleserve/BUILD.bazel b/metropolis/node/core/roleserve/BUILD.bazel
index 2c9610c..5980563 100644
--- a/metropolis/node/core/roleserve/BUILD.bazel
+++ b/metropolis/node/core/roleserve/BUILD.bazel
@@ -25,6 +25,7 @@
         "//metropolis/node/core/localstorage",
         "//metropolis/node/core/network",
         "//metropolis/node/core/rpc",
+        "//metropolis/node/core/rpc/resolver",
         "//metropolis/node/kubernetes",
         "//metropolis/node/kubernetes/containerd",
         "//metropolis/node/kubernetes/pki",
diff --git a/metropolis/node/core/roleserve/roleserve.go b/metropolis/node/core/roleserve/roleserve.go
index 82e7cca..23076a4 100644
--- a/metropolis/node/core/roleserve/roleserve.go
+++ b/metropolis/node/core/roleserve/roleserve.go
@@ -45,9 +45,11 @@
 	"context"
 	"crypto/ed25519"
 
+	common "source.monogon.dev/metropolis/node"
 	"source.monogon.dev/metropolis/node/core/identity"
 	"source.monogon.dev/metropolis/node/core/localstorage"
 	"source.monogon.dev/metropolis/node/core/network"
+	"source.monogon.dev/metropolis/node/core/rpc/resolver"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 	cpb "source.monogon.dev/metropolis/proto/common"
 )
@@ -78,20 +80,31 @@
 	heartbeat    *workerHeartbeat
 	kubernetes   *workerKubernetes
 	rolefetch    *workerRoleFetch
+
+	// resolver is the main, long-lived, authenticated cluster resolver that is used
+	// for all subsequent gRPC calls by the subordinates of the roleserver. It is
+	// created early in the roleserver lifecycle, and is seeded with node
+	// information as the first subordinate runs DialCurator().
+	resolver *resolver.Resolver
 }
 
 // New creates a Role Server services from a Config.
 func New(c Config) *Service {
-	s := &Service{
-		Config: c,
-	}
+	// Create the resolver with a background context so that it keeps running
+	// in the background for as long as possible.
+	rctx := context.Background()
 
+	s := &Service{
+		Config:   c,
+		resolver: resolver.New(rctx),
+	}
 	s.controlPlane = &workerControlPlane{
 		storageRoot: s.StorageRoot,
 
 		bootstrapData:     &s.bootstrapData,
 		clusterMembership: &s.ClusterMembership,
 		localRoles:        &s.localRoles,
+		resolver:          s.resolver,
 	}
 
 	s.statusPush = &workerStatusPush{
@@ -126,8 +139,16 @@
 }
 
 func (s *Service) ProvideBootstrapData(privkey ed25519.PrivateKey, iok, cuk, nuk, jkey []byte) {
+	pubkey := privkey.Public().(ed25519.PublicKey)
+	nid := identity.NodeID(pubkey)
+
+	// This is the first time we have the node ID; tell the resolver that it's
+	// available on the loopback interface.
+	s.resolver.AddOverride(nid, resolver.NodeByHostPort("127.0.0.1", uint16(common.CuratorServicePort)))
+
 	s.ClusterMembership.set(&ClusterMembership{
-		pubkey: privkey.Public().(ed25519.PublicKey),
+		pubkey:   pubkey,
+		resolver: s.resolver,
 	})
 	s.bootstrapData.set(&bootstrapData{
 		nodePrivateKey:     privkey,
@@ -139,24 +160,38 @@
 }
 
 func (s *Service) ProvideRegisterData(credentials identity.NodeCredentials, directory *cpb.ClusterDirectory) {
+	// This is the first time we have the node ID; tell the resolver that it's
+	// available on the loopback interface.
+	s.resolver.AddOverride(credentials.ID(), resolver.NodeByHostPort("127.0.0.1", uint16(common.CuratorServicePort)))
+
 	s.ClusterMembership.set(&ClusterMembership{
 		remoteCurators: directory,
 		credentials:    &credentials,
 		pubkey:         credentials.PublicKey(),
+		resolver:       s.resolver,
 	})
 }
 
 func (s *Service) ProvideJoinData(credentials identity.NodeCredentials, directory *cpb.ClusterDirectory) {
+	// This is the first time we have the node ID; tell the resolver that it's
+	// available on the loopback interface.
+	s.resolver.AddOverride(credentials.ID(), resolver.NodeByHostPort("127.0.0.1", uint16(common.CuratorServicePort)))
+
 	s.ClusterMembership.set(&ClusterMembership{
 		remoteCurators: directory,
 		credentials:    &credentials,
 		pubkey:         credentials.PublicKey(),
+		resolver:       s.resolver,
 	})
 }
 
 // Run the Role Server service, which uses intermediary workload launchers to
 // start/stop subordinate services as the Node's roles change.
 func (s *Service) Run(ctx context.Context) error {
+	s.resolver.SetLogger(func(f string, args ...interface{}) {
+		supervisor.Logger(ctx).WithAddedStackDepth(2).Infof(f, args...)
+	})
+
 	supervisor.Run(ctx, "controlplane", s.controlPlane.run)
 	supervisor.Run(ctx, "kubernetes", s.kubernetes.run)
 	supervisor.Run(ctx, "statuspush", s.statusPush.run)
diff --git a/metropolis/node/core/roleserve/value_clustermembership.go b/metropolis/node/core/roleserve/value_clustermembership.go
index 37f67e7..f14e2c4 100644
--- a/metropolis/node/core/roleserve/value_clustermembership.go
+++ b/metropolis/node/core/roleserve/value_clustermembership.go
@@ -3,8 +3,6 @@
 import (
 	"context"
 	"crypto/ed25519"
-	"fmt"
-	"net"
 
 	"google.golang.org/grpc"
 
@@ -13,6 +11,7 @@
 	"source.monogon.dev/metropolis/node/core/curator"
 	"source.monogon.dev/metropolis/node/core/identity"
 	"source.monogon.dev/metropolis/node/core/rpc"
+	"source.monogon.dev/metropolis/node/core/rpc/resolver"
 	"source.monogon.dev/metropolis/pkg/event"
 	"source.monogon.dev/metropolis/pkg/event/memory"
 	cpb "source.monogon.dev/metropolis/proto/common"
@@ -50,6 +49,8 @@
 	credentials *identity.NodeCredentials
 	// pubkey is the public key of the local node, and is always set.
 	pubkey ed25519.PublicKey
+	// resolver will be used to dial the cluster via DialCurator().
+	resolver *resolver.Resolver
 }
 
 type ClusterMembershipValue struct {
@@ -120,26 +121,26 @@
 	}
 }
 
-// DialCurator returns an authenticated gRPC client connection to the Curator,
-// either local or remote. No load balancing will be performed across local and
-// remote curators, so if the local node starts running a local curator but old
-// connections are still used, they will continue to target only remote
-// curators. Same goes for local consensus being turned down - however, in this
-// case, calls will error out and the client can be redialed on errors.
-//
-// It is thus recommended to only use DialCurator in short-lived contexts, and
-// perform a GetHome/DialCurator process on any gRPC error. A smarter
-// load-balancing/re-dialing client will be implemented in the future.
+// DialCurator returns an authenticated gRPC client connection to the Curator
+// using the long-lived roleserver cluster resolver. RPCs will automatically be
+// forwarded to the current control plane leader, and this gRPC client
+// connection can be used long-term by callers.
 func (m *ClusterMembership) DialCurator() (*grpc.ClientConn, error) {
-	// Dial first curator.
-	// TODO(q3k): load balance
-	if m.remoteCurators == nil || len(m.remoteCurators.Nodes) < 1 {
-		return nil, fmt.Errorf("no curators available")
+	// Always make sure the resolver has fresh data about curators, both local and
+	// remote. This would be better done only when ClusterMembership is set() with
+	// new data, but that would require a bit of a refactor.
+	//
+	// TODO(q3k): take care of the above, possibly when the roleserver is made more generic.
+	if m.localConsensus != nil {
+		m.resolver.AddEndpoint(resolver.NodeByHostPort("127.0.0.1", uint16(common.CuratorServicePort)))
 	}
-	host := m.remoteCurators.Nodes[0].Addresses[0].Host
-	addr := net.JoinHostPort(host, common.CuratorServicePort.PortString())
+	for _, n := range m.remoteCurators.Nodes {
+		for _, addr := range n.Addresses {
+			m.resolver.AddEndpoint(resolver.NodeByHostPort(addr.Host, uint16(common.CuratorServicePort)))
+		}
+	}
 	creds := rpc.NewAuthenticatedCredentials(m.credentials.TLSCredentials(), m.credentials.ClusterCA())
-	return grpc.Dial(addr, grpc.WithTransportCredentials(creds))
+	return grpc.Dial(resolver.MetropolisControlAddress, grpc.WithTransportCredentials(creds), grpc.WithResolvers(m.resolver))
 }
 
 func (m *ClusterMembership) NodePubkey() ed25519.PublicKey {
diff --git a/metropolis/node/core/roleserve/worker_controlplane.go b/metropolis/node/core/roleserve/worker_controlplane.go
index 469df16..c581c3d 100644
--- a/metropolis/node/core/roleserve/worker_controlplane.go
+++ b/metropolis/node/core/roleserve/worker_controlplane.go
@@ -15,6 +15,7 @@
 	"source.monogon.dev/metropolis/node/core/curator"
 	"source.monogon.dev/metropolis/node/core/identity"
 	"source.monogon.dev/metropolis/node/core/localstorage"
+	"source.monogon.dev/metropolis/node/core/rpc/resolver"
 	"source.monogon.dev/metropolis/pkg/event/memory"
 	"source.monogon.dev/metropolis/pkg/pki"
 	"source.monogon.dev/metropolis/pkg/supervisor"
@@ -48,6 +49,8 @@
 	clusterMembership *ClusterMembershipValue
 	// localRoles will be read.
 	localRoles *localRolesValue
+	// resolver will be read and used to populate ClusterMembership.
+	resolver *resolver.Resolver
 }
 
 // controlPlaneStartup is used internally to provide a reduced (as in MapReduce)
@@ -427,6 +430,7 @@
 				credentials:    creds,
 				remoteCurators: directory,
 				pubkey:         creds.PublicKey(),
+				resolver:       s.resolver,
 			})
 		}
 
diff --git a/metropolis/node/core/rpc/resolver/resolver.go b/metropolis/node/core/rpc/resolver/resolver.go
index 39a0124..41fa8a1 100644
--- a/metropolis/node/core/rpc/resolver/resolver.go
+++ b/metropolis/node/core/rpc/resolver/resolver.go
@@ -118,6 +118,12 @@
 	return NodeByHostPort(id, uint16(common.CuratorServicePort)), nil
 }
 
+// NodeAtAddressWithDefaultPort returns a NodeEndpoint referencing the default
+// control plane port (the Curator port) of a node at a given address.
+func NodeAtAddressWithDefaultPort(host string) *NodeEndpoint {
+	return NodeByHostPort(host, uint16(common.CuratorServicePort))
+}
+
 // NodeByHostPort returns a NodeEndpoint for a fully specified host + port pair.
 // The host can either be a hostname or an IP address.
 func NodeByHostPort(host string, port uint16) *NodeEndpoint {