m/node: use resolver for intra-cluster connections
This replaces all instances of 'just connect to the first node for now'
with a proper leader-aware cluster resolver.
This isn't tested yet, as all of this will be exercised in an E2E test.
Change-Id: I9f6bfb49ff9ae0dd70ac1a3131c5ee021d9bb5a5
Reviewed-on: https://review.monogon.dev/c/monogon/+/796
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
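For context, a minimal sketch (not code from this CL) of the dialing pattern this
change introduces. The resolver, rpc and identity identifiers are the ones used in
the diff below; the helper itself, its package placement and the remote curator
address are hypothetical placeholders:

    package example // hypothetical placement, illustration only

    import (
        "context"

        "google.golang.org/grpc"

        common "source.monogon.dev/metropolis/node"
        "source.monogon.dev/metropolis/node/core/identity"
        "source.monogon.dev/metropolis/node/core/rpc"
        "source.monogon.dev/metropolis/node/core/rpc/resolver"
    )

    // dialClusterSketch shows how a long-lived, leader-aware curator connection
    // is built: create one resolver, seed it with known curator endpoints, and
    // dial its virtual address once.
    func dialClusterSketch(creds *identity.NodeCredentials) (*grpc.ClientConn, error) {
        // The resolver is long-lived, so it gets a background context.
        r := resolver.New(context.Background())

        // Seed the resolver: the local node (by ID) is reachable over loopback,
        // and 192.0.2.10 stands in for any known remote curator. From here the
        // resolver keeps tracking the current control plane leader on its own.
        r.AddOverride(creds.ID(), resolver.NodeByHostPort("127.0.0.1", uint16(common.CuratorServicePort)))
        r.AddEndpoint(resolver.NodeByHostPort("192.0.2.10", uint16(common.CuratorServicePort)))

        // Dial the resolver's virtual address once; RPCs on the resulting
        // connection are routed to whichever curator currently leads.
        tc := rpc.NewAuthenticatedCredentials(creds.TLSCredentials(), creds.ClusterCA())
        return grpc.Dial(resolver.MetropolisControlAddress,
            grpc.WithTransportCredentials(tc),
            grpc.WithResolvers(r))
    }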
diff --git a/metropolis/node/core/roleserve/BUILD.bazel b/metropolis/node/core/roleserve/BUILD.bazel
index 2c9610c..5980563 100644
--- a/metropolis/node/core/roleserve/BUILD.bazel
+++ b/metropolis/node/core/roleserve/BUILD.bazel
@@ -25,6 +25,7 @@
"//metropolis/node/core/localstorage",
"//metropolis/node/core/network",
"//metropolis/node/core/rpc",
+ "//metropolis/node/core/rpc/resolver",
"//metropolis/node/kubernetes",
"//metropolis/node/kubernetes/containerd",
"//metropolis/node/kubernetes/pki",
diff --git a/metropolis/node/core/roleserve/roleserve.go b/metropolis/node/core/roleserve/roleserve.go
index 82e7cca..23076a4 100644
--- a/metropolis/node/core/roleserve/roleserve.go
+++ b/metropolis/node/core/roleserve/roleserve.go
@@ -45,9 +45,11 @@
"context"
"crypto/ed25519"
+ common "source.monogon.dev/metropolis/node"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/network"
+ "source.monogon.dev/metropolis/node/core/rpc/resolver"
"source.monogon.dev/metropolis/pkg/supervisor"
cpb "source.monogon.dev/metropolis/proto/common"
)
@@ -78,20 +80,31 @@
heartbeat *workerHeartbeat
kubernetes *workerKubernetes
rolefetch *workerRoleFetch
+
+ // resolver is the main, long-lived, authenticated cluster resolver that is used
+ // for all subsequent gRPC calls by the subordinates of the roleserver. It is
+ // created early in the roleserver lifecycle, and is seeded with node
+ // information as the first subordinate runs DialCurator().
+ resolver *resolver.Resolver
}
// New creates a Role Server service from a Config.
func New(c Config) *Service {
- s := &Service{
- Config: c,
- }
+ // The resolver runs in the background for the whole lifetime of the process,
+ // so it is given a background context rather than a per-runnable one.
+ rctx := context.Background()
+ s := &Service{
+ Config: c,
+ resolver: resolver.New(rctx),
+ }
s.controlPlane = &workerControlPlane{
storageRoot: s.StorageRoot,
bootstrapData: &s.bootstrapData,
clusterMembership: &s.ClusterMembership,
localRoles: &s.localRoles,
+ resolver: s.resolver,
}
s.statusPush = &workerStatusPush{
@@ -126,8 +139,16 @@
}
func (s *Service) ProvideBootstrapData(privkey ed25519.PrivateKey, iok, cuk, nuk, jkey []byte) {
+ pubkey := privkey.Public().(ed25519.PublicKey)
+ nid := identity.NodeID(pubkey)
+
+ // This is the first time we have the node ID; tell the resolver that it's
+ // available on the loopback interface.
+ s.resolver.AddOverride(nid, resolver.NodeByHostPort("127.0.0.1", uint16(common.CuratorServicePort)))
+
s.ClusterMembership.set(&ClusterMembership{
- pubkey: privkey.Public().(ed25519.PublicKey),
+ pubkey: pubkey,
+ resolver: s.resolver,
})
s.bootstrapData.set(&bootstrapData{
nodePrivateKey: privkey,
@@ -139,24 +160,38 @@
}
func (s *Service) ProvideRegisterData(credentials identity.NodeCredentials, directory *cpb.ClusterDirectory) {
+ // This is the first time we have the node ID; tell the resolver that it's
+ // available on the loopback interface.
+ s.resolver.AddOverride(credentials.ID(), resolver.NodeByHostPort("127.0.0.1", uint16(common.CuratorServicePort)))
+
s.ClusterMembership.set(&ClusterMembership{
remoteCurators: directory,
credentials: &credentials,
pubkey: credentials.PublicKey(),
+ resolver: s.resolver,
})
}
func (s *Service) ProvideJoinData(credentials identity.NodeCredentials, directory *cpb.ClusterDirectory) {
+ // This is the first time we have the node ID; tell the resolver that it's
+ // available on the loopback interface.
+ s.resolver.AddOverride(credentials.ID(), resolver.NodeByHostPort("127.0.0.1", uint16(common.CuratorServicePort)))
+
s.ClusterMembership.set(&ClusterMembership{
remoteCurators: directory,
credentials: &credentials,
pubkey: credentials.PublicKey(),
+ resolver: s.resolver,
})
}
// Run the Role Server service, which uses intermediary workload launchers to
// start/stop subordinate services as the Node's roles change.
func (s *Service) Run(ctx context.Context) error {
+ s.resolver.SetLogger(func(f string, args ...interface{}) {
+ supervisor.Logger(ctx).WithAddedStackDepth(2).Infof(f, args...)
+ })
+
supervisor.Run(ctx, "controlplane", s.controlPlane.run)
supervisor.Run(ctx, "kubernetes", s.kubernetes.run)
supervisor.Run(ctx, "statuspush", s.statusPush.run)
diff --git a/metropolis/node/core/roleserve/value_clustermembership.go b/metropolis/node/core/roleserve/value_clustermembership.go
index 37f67e7..f14e2c4 100644
--- a/metropolis/node/core/roleserve/value_clustermembership.go
+++ b/metropolis/node/core/roleserve/value_clustermembership.go
@@ -3,8 +3,6 @@
import (
"context"
"crypto/ed25519"
- "fmt"
- "net"
"google.golang.org/grpc"
@@ -13,6 +11,7 @@
"source.monogon.dev/metropolis/node/core/curator"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/rpc"
+ "source.monogon.dev/metropolis/node/core/rpc/resolver"
"source.monogon.dev/metropolis/pkg/event"
"source.monogon.dev/metropolis/pkg/event/memory"
cpb "source.monogon.dev/metropolis/proto/common"
@@ -50,6 +49,8 @@
credentials *identity.NodeCredentials
// pubkey is the public key of the local node, and is always set.
pubkey ed25519.PublicKey
+ // resolver will be used to dial the cluster via DialCurator().
+ resolver *resolver.Resolver
}
type ClusterMembershipValue struct {
@@ -120,26 +121,26 @@
}
}
-// DialCurator returns an authenticated gRPC client connection to the Curator,
-// either local or remote. No load balancing will be performed across local and
-// remote curators, so if the local node starts running a local curator but old
-// connections are still used, they will continue to target only remote
-// curators. Same goes for local consensus being turned down - however, in this
-// case, calls will error out and the client can be redialed on errors.
-//
-// It is thus recommended to only use DialCurator in short-lived contexts, and
-// perform a GetHome/DialCurator process on any gRPC error. A smarter
-// load-balancing/re-dialing client will be implemented in the future.
+// DialCurator returns an authenticated gRPC client connection to the Curator
+// using the long-lived roleserver cluster resolver. RPCs will automatically be
+// forwarded to the current control plane leader, and this gRPC client
+// connection can be used long-term by callers.
func (m *ClusterMembership) DialCurator() (*grpc.ClientConn, error) {
- // Dial first curator.
- // TODO(q3k): load balance
- if m.remoteCurators == nil || len(m.remoteCurators.Nodes) < 1 {
- return nil, fmt.Errorf("no curators available")
+ // Always make sure the resolver has fresh data about curators, both local and
+ // remote. This would be better done only when ClusterMembership is set() with
+ // new data, but that would require a bit of a refactor.
+ //
+ // TODO(q3k): take care of the above, possibly when the roleserver is made more generic.
+ if m.localConsensus != nil {
+ m.resolver.AddEndpoint(resolver.NodeByHostPort("127.0.0.1", uint16(common.CuratorServicePort)))
}
- host := m.remoteCurators.Nodes[0].Addresses[0].Host
- addr := net.JoinHostPort(host, common.CuratorServicePort.PortString())
+ for _, n := range m.remoteCurators.Nodes {
+ for _, addr := range n.Addresses {
+ m.resolver.AddEndpoint(resolver.NodeByHostPort(addr.Host, uint16(common.CuratorServicePort)))
+ }
+ }
creds := rpc.NewAuthenticatedCredentials(m.credentials.TLSCredentials(), m.credentials.ClusterCA())
- return grpc.Dial(addr, grpc.WithTransportCredentials(creds))
+ return grpc.Dial(resolver.MetropolisControlAddress, grpc.WithTransportCredentials(creds), grpc.WithResolvers(m.resolver))
}
func (m *ClusterMembership) NodePubkey() ed25519.PublicKey {
diff --git a/metropolis/node/core/roleserve/worker_controlplane.go b/metropolis/node/core/roleserve/worker_controlplane.go
index 469df16..c581c3d 100644
--- a/metropolis/node/core/roleserve/worker_controlplane.go
+++ b/metropolis/node/core/roleserve/worker_controlplane.go
@@ -15,6 +15,7 @@
"source.monogon.dev/metropolis/node/core/curator"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/localstorage"
+ "source.monogon.dev/metropolis/node/core/rpc/resolver"
"source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/pki"
"source.monogon.dev/metropolis/pkg/supervisor"
@@ -48,6 +49,8 @@
clusterMembership *ClusterMembershipValue
// localRoles will be read.
localRoles *localRolesValue
+ // resolver will be read and used to populate ClusterMembership.
+ resolver *resolver.Resolver
}
// controlPlaneStartup is used internally to provide a reduced (as in MapReduce)
@@ -427,6 +430,7 @@
credentials: creds,
remoteCurators: directory,
pubkey: creds.PublicKey(),
+ resolver: s.resolver,
})
}
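As a closing note on the DialCurator change above, a hedged sketch of the new caller
contract (hypothetical worker code, not part of this CL; a real worker would obtain
ClusterMembership from its value watcher, elided here, and the curator client stubs
are assumed to be the generated ones in metropolis/node/core/curator/proto/api,
aliased as ipb):

    // runWorker illustrates that the connection returned by DialCurator can
    // now be established once and reused for the worker's whole lifetime;
    // there is no longer any need to re-dial on gRPC errors, as the removed
    // DialCurator comment used to recommend.
    func runWorker(ctx context.Context, membership *ClusterMembership) error {
        conn, err := membership.DialCurator()
        if err != nil {
            return fmt.Errorf("dialing curator: %w", err)
        }
        defer conn.Close()

        // Clients built on conn keep working across control plane leadership
        // changes, since the underlying resolver re-routes RPCs to the
        // current leader.
        cur := ipb.NewCuratorClient(conn)
        _ = cur // issue Watch/heartbeat/etc. RPCs here

        <-ctx.Done()
        return ctx.Err()
    }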