m/node: use resolver for intra-cluster connections

This replaces all instances of 'just connect to the first node for now'
with a proper, leader-aware cluster resolver.

This isn't tested yet, as all of this will be exercised in an E2E test.

Change-Id: I9f6bfb49ff9ae0dd70ac1a3131c5ee021d9bb5a5
Reviewed-on: https://review.monogon.dev/c/monogon/+/796
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
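
Concretely, call sites that previously picked the first node out of the
cluster directory and dialed it directly now share a single long-lived
resolver and let it track the current leader. A rough before/after sketch
(the field, variable, target and option names here are illustrative, not
code from this change):

    // Before: 'just connect to the first node for now'.
    host := directory.Nodes[0].Addresses[0].Host
    conn, err := grpc.Dial(net.JoinHostPort(host, port), opts...)

    // After: dial through the shared, leader-aware resolver.
    conn, err := grpc.Dial("metropolis:///control",
        append(opts, grpc.WithResolvers(res))...)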
diff --git a/metropolis/node/core/roleserve/roleserve.go b/metropolis/node/core/roleserve/roleserve.go
index 82e7cca..23076a4 100644
--- a/metropolis/node/core/roleserve/roleserve.go
+++ b/metropolis/node/core/roleserve/roleserve.go
@@ -45,9 +45,11 @@
 	"context"
 	"crypto/ed25519"
 
+	common "source.monogon.dev/metropolis/node"
 	"source.monogon.dev/metropolis/node/core/identity"
 	"source.monogon.dev/metropolis/node/core/localstorage"
 	"source.monogon.dev/metropolis/node/core/network"
+	"source.monogon.dev/metropolis/node/core/rpc/resolver"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 	cpb "source.monogon.dev/metropolis/proto/common"
 )
@@ -78,20 +80,31 @@
 	heartbeat    *workerHeartbeat
 	kubernetes   *workerKubernetes
 	rolefetch    *workerRoleFetch
+
+	// resolver is the main, long-lived, authenticated cluster resolver that is used
+	// for all subsequent gRPC calls by the subordinates of the roleserver. It is
+	// created early in the roleserver lifecycle, and is seeded with node
+	// information as the first subordinate runs DialCurator().
+	resolver *resolver.Resolver
 }
 
 // New creates a Role Server service from a Config.
 func New(c Config) *Service {
-	s := &Service{
-		Config: c,
-	}
+	// The resolver runs in the background for the lifetime of the process, so
+	// build it on context.Background() to keep it as long-lived as possible.
+	rctx := context.Background()
 
+	s := &Service{
+		Config:   c,
+		resolver: resolver.New(rctx),
+	}
 	s.controlPlane = &workerControlPlane{
 		storageRoot: s.StorageRoot,
 
 		bootstrapData:     &s.bootstrapData,
 		clusterMembership: &s.ClusterMembership,
 		localRoles:        &s.localRoles,
+		resolver:          s.resolver,
 	}
 
 	s.statusPush = &workerStatusPush{
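
The doc comment on the resolver field mentions DialCurator(); its
implementation lives outside this file, but the general shape of dialing
through a custom gRPC resolver is worth sketching (this assumes
*resolver.Resolver implements gRPC's resolver.Builder; the target string and
credential plumbing are assumptions, not part of this change):

    // The resolver steers the connection to the current curator leader and
    // re-steers it transparently when leadership moves.
    conn, err := grpc.DialContext(ctx, "metropolis:///control",
        grpc.WithTransportCredentials(creds),
        grpc.WithResolvers(r))
    if err != nil {
        return nil, err
    }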
@@ -126,8 +139,16 @@
 }
 
 func (s *Service) ProvideBootstrapData(privkey ed25519.PrivateKey, iok, cuk, nuk, jkey []byte) {
+	pubkey := privkey.Public().(ed25519.PublicKey)
+	nid := identity.NodeID(pubkey)
+
+	// This is the first time we have the node ID; tell the resolver that it's
+	// available on the loopback interface.
+	s.resolver.AddOverride(nid, resolver.NodeByHostPort("127.0.0.1", uint16(common.CuratorServicePort)))
+
 	s.ClusterMembership.set(&ClusterMembership{
-		pubkey: privkey.Public().(ed25519.PublicKey),
+		pubkey:   pubkey,
+		resolver: s.resolver,
 	})
 	s.bootstrapData.set(&bootstrapData{
 		nodePrivateKey:     privkey,
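
The loopback override above is repeated verbatim in ProvideRegisterData and
ProvideJoinData in the next hunk; one possible follow-up (a sketch, not part
of this change) would be to factor it into a small helper:

    // addLoopbackOverride is a hypothetical helper: it tells the resolver
    // that the named node (this one) is reachable on the loopback interface.
    func (s *Service) addLoopbackOverride(nid string) {
        s.resolver.AddOverride(nid, resolver.NodeByHostPort("127.0.0.1", uint16(common.CuratorServicePort)))
    }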
@@ -139,24 +160,38 @@
 }
 
 func (s *Service) ProvideRegisterData(credentials identity.NodeCredentials, directory *cpb.ClusterDirectory) {
+	// This is the first time we have the node ID; tell the resolver that it's
+	// available on the loopback interface.
+	s.resolver.AddOverride(credentials.ID(), resolver.NodeByHostPort("127.0.0.1", uint16(common.CuratorServicePort)))
+
 	s.ClusterMembership.set(&ClusterMembership{
 		remoteCurators: directory,
 		credentials:    &credentials,
 		pubkey:         credentials.PublicKey(),
+		resolver:       s.resolver,
 	})
 }
 
 func (s *Service) ProvideJoinData(credentials identity.NodeCredentials, directory *cpb.ClusterDirectory) {
+	// This is the first time we have the node ID; tell the resolver that it's
+	// available on the loopback interface.
+	s.resolver.AddOverride(credentials.ID(), resolver.NodeByHostPort("127.0.0.1", uint16(common.CuratorServicePort)))
+
 	s.ClusterMembership.set(&ClusterMembership{
 		remoteCurators: directory,
 		credentials:    &credentials,
 		pubkey:         credentials.PublicKey(),
+		resolver:       s.resolver,
 	})
 }
 
 // Run the Role Server service, which uses intermediary workload launchers to
 // start/stop subordinate services as the Node's roles change.
 func (s *Service) Run(ctx context.Context) error {
+	s.resolver.SetLogger(func(f string, args ...interface{}) {
+		supervisor.Logger(ctx).WithAddedStackDepth(2).Infof(f, args...)
+	})
+
 	supervisor.Run(ctx, "controlplane", s.controlPlane.run)
 	supervisor.Run(ctx, "kubernetes", s.kubernetes.run)
 	supervisor.Run(ctx, "statuspush", s.statusPush.run)
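
The WithAddedStackDepth(2) in the logging closure above skips the adapter
closure so log lines are attributed to the resolver code that emitted them.
Taken together, the lifecycle is: New() creates the resolver, exactly one of
the Provide* methods seeds it with the local node's loopback override, and
Run() attaches logging before launching the workers that dial through it. A
condensed usage sketch, with constructor arguments elided and names taken
from this file:

    s := roleserve.New(roleserve.Config{ /* ... */ })
    s.ProvideBootstrapData(privkey, iok, cuk, nuk, jkey)
    supervisor.Run(ctx, "roleserve", s.Run)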