m/n/core/roleserve: rework cluster membership, reuse control plane connections
This reworks roleserver internals to simplify the handling of cluster
membership state. The end goal is to allow reusing control plane gRPC
connections across different components in a node, but the refactor goes
a bit beyond that.
Ever since the introduction of the RPC resolver, the control plane
startup problem has effectively been simplified: the resolver allows the
rest of the system to dynamically switch between different gRPC
endpoints for the control plane.
What this means is that some of the existing complexity in the
roleserver (which predates the resolver) can be thrown away. Notably, we
remove the ClusterMembership structure and replace it with two
significantly simpler structures that represent two separate facts about
the local node:
1. localControlPlane carries information about whether this node has a
locally running control plane. This is only used by the statuspusher
(to report whether the control plane is running) and by the
Kubernetes control plane.
2. curatorConnection carries the node credentials, the resolver, and an
   open gRPC connection to the control plane; it is now the single
   roleserver EventValue consumed by the vast majority of the roleserver
   runnables (see the sketch after this list).
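For illustration, here is the sketch referenced above: the pattern most
runnables now follow, condensed from worker_clusternet.go and
worker_heartbeat.go in the diff. The workerExample type is a
hypothetical stand-in, and the event/memory import path is an
assumption:

    package roleserve

    import (
        "context"

        ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
        "source.monogon.dev/metropolis/pkg/event/memory"
        "source.monogon.dev/metropolis/pkg/supervisor"
    )

    // workerExample stands in for any of the reworked runnables (clusternet,
    // heartbeat, rolefetch, hostsfile, ...).
    type workerExample struct {
        curatorConnection *memory.Value[*curatorConnection]
    }

    func (s *workerExample) run(ctx context.Context) error {
        w := s.curatorConnection.Watch()
        defer w.Close()
        supervisor.Logger(ctx).Infof("Waiting for curator connection...")
        cc, err := w.Get(ctx)
        if err != nil {
            return err
        }
        // The shared cc.conn replaces the old per-runnable DialCurator()/Close().
        cur := ipb.NewCuratorClient(cc.conn)
        _ = cur // ... start the actual service using cur and cc.nodeID() ...
        return nil
    }
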
The resulting code, especially inside the control plane roleserver
runnable, is now less complex, at the cost of a bit of an ugly refactor.
Change-Id: Idbe1ff2ac3bfb2d570bed040a2f78ccabb66caba
Reviewed-on: https://review.monogon.dev/c/monogon/+/1749
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/roleserve/BUILD.bazel b/metropolis/node/core/roleserve/BUILD.bazel
index 2d62ef4..ab84395 100644
--- a/metropolis/node/core/roleserve/BUILD.bazel
+++ b/metropolis/node/core/roleserve/BUILD.bazel
@@ -4,10 +4,7 @@
name = "roleserve",
srcs = [
"roleserve.go",
- "value_bootstrapdata.go",
- "value_clustermembership.go",
- "value_kubernetes.go",
- "value_node.go",
+ "values.go",
"worker_clusternet.go",
"worker_controlplane.go",
"worker_heartbeat.go",
@@ -52,7 +49,10 @@
embed = [":roleserve"],
deps = [
"//metropolis/node",
+ "//metropolis/node/core/consensus",
+ "//metropolis/node/core/curator",
"//metropolis/node/core/curator/proto/api",
+ "//metropolis/node/core/rpc",
"//metropolis/pkg/supervisor",
"//metropolis/proto/common",
"@com_github_cenkalti_backoff_v4//:backoff",
diff --git a/metropolis/node/core/roleserve/roleserve.go b/metropolis/node/core/roleserve/roleserve.go
index 3af5319..68b9f59 100644
--- a/metropolis/node/core/roleserve/roleserve.go
+++ b/metropolis/node/core/roleserve/roleserve.go
@@ -31,12 +31,8 @@
// The Role Server also has to handle the complex bootstrap problem involved in
// simultaneously accessing the control plane (for node roles and other cluster
// data) while maintaining (possibly the only one in the cluster) control plane
-// instance. The state of of resolution of this bootstrap problem is maintained
-// within ClusterMembership, which contains critical information about the
-// control plane, like the information required to connect to a Curator (local
-// or remote). It is updated both by external processes (ie. data from the
-// Cluster Enrolment) as well as logic responsible for spawning the control
-// plane.
+// instance. This problem is resolved by using the RPC resolver package which
+// allows dynamic reconfiguration of endpoints as the cluster is running.
package roleserve
import (
@@ -70,7 +66,7 @@
// resolver is the main, long-lived, authenticated cluster resolver that is used
// for all subsequent gRPC calls by the subordinates of the roleserver. It is
// created early in the roleserver lifecycle, and is seeded with node
- // information as the first subordinate runs DialCurator().
+ // information from the ProvideXXX methods.
Resolver *resolver.Resolver
LogTree *logtree.LogTree
@@ -81,12 +77,13 @@
type Service struct {
Config
- ClusterMembership memory.Value[*ClusterMembership]
KubernetesStatus memory.Value[*KubernetesStatus]
bootstrapData memory.Value[*bootstrapData]
localRoles memory.Value[*cpb.NodeRoles]
podNetwork memory.Value[*clusternet.Prefixes]
clusterDirectorySaved memory.Value[bool]
+ localControlPlane memory.Value[*localControlPlane]
+ CuratorConnection memory.Value[*curatorConnection]
controlPlane *workerControlPlane
statusPush *workerStatusPush
@@ -106,23 +103,26 @@
s.controlPlane = &workerControlPlane{
storageRoot: s.StorageRoot,
- bootstrapData: &s.bootstrapData,
- clusterMembership: &s.ClusterMembership,
- localRoles: &s.localRoles,
- resolver: s.Resolver,
+ bootstrapData: &s.bootstrapData,
+ localRoles: &s.localRoles,
+ resolver: s.Resolver,
+
+ localControlPlane: &s.localControlPlane,
+ curatorConnection: &s.CuratorConnection,
}
s.statusPush = &workerStatusPush{
network: s.Network,
- clusterMembership: &s.ClusterMembership,
+ curatorConnection: &s.CuratorConnection,
+ localControlPlane: &s.localControlPlane,
clusterDirectorySaved: &s.clusterDirectorySaved,
}
s.heartbeat = &workerHeartbeat{
network: s.Network,
- clusterMembership: &s.ClusterMembership,
+ curatorConnection: &s.CuratorConnection,
}
s.kubernetes = &workerKubernetes{
@@ -130,27 +130,28 @@
storageRoot: s.StorageRoot,
localRoles: &s.localRoles,
- clusterMembership: &s.ClusterMembership,
+ localControlPlane: &s.localControlPlane,
+ curatorConnection: &s.CuratorConnection,
kubernetesStatus: &s.KubernetesStatus,
podNetwork: &s.podNetwork,
}
s.rolefetch = &workerRoleFetch{
- clusterMembership: &s.ClusterMembership,
+ curatorConnection: &s.CuratorConnection,
localRoles: &s.localRoles,
}
s.nodeMgmt = &workerNodeMgmt{
- clusterMembership: &s.ClusterMembership,
+ curatorConnection: &s.CuratorConnection,
logTree: s.LogTree,
}
s.clusternet = &workerClusternet{
storageRoot: s.StorageRoot,
- clusterMembership: &s.ClusterMembership,
+ curatorConnection: &s.CuratorConnection,
podNetwork: &s.podNetwork,
network: s.Network,
}
@@ -158,7 +159,7 @@
s.hostsfile = &workerHostsfile{
storageRoot: s.StorageRoot,
network: s.Network,
- clusterMembership: &s.ClusterMembership,
+ curatorConnection: &s.CuratorConnection,
clusterDirectorySaved: &s.clusterDirectorySaved,
}
@@ -174,10 +175,6 @@
s.Resolver.AddOverride(nid, resolver.NodeByHostPort("127.0.0.1", uint16(common.CuratorServicePort)))
s.Resolver.AddEndpoint(resolver.NodeByHostPort("127.0.0.1", uint16(common.CuratorServicePort)))
- s.ClusterMembership.Set(&ClusterMembership{
- pubkey: pubkey,
- resolver: s.Resolver,
- })
s.bootstrapData.Set(&bootstrapData{
nodePrivateKey: privkey,
initialOwnerKey: iok,
@@ -194,19 +191,16 @@
// available on the loopback interface.
s.Resolver.AddOverride(credentials.ID(), resolver.NodeByHostPort("127.0.0.1", uint16(common.CuratorServicePort)))
// Also tell the resolver about all the existing nodes in the cluster we just
- // registered into.
+ // registered into. The directory passed here was used to issue the initial
+ // Register call, which means at least one of the nodes was running the control
+ // plane and thus can be used to seed the rest of the resolver.
for _, n := range directory.Nodes {
- // TODO(q3k): only add control plane nodes.
for _, addr := range n.Addresses {
- s.Resolver.AddEndpoint(resolver.NodeByHostPort(addr.Host, uint16(common.CuratorServicePort)))
+ s.Resolver.AddEndpoint(resolver.NodeAtAddressWithDefaultPort(addr.Host))
}
}
- s.ClusterMembership.Set(&ClusterMembership{
- credentials: &credentials,
- pubkey: credentials.PublicKey(),
- resolver: s.Resolver,
- })
+ s.CuratorConnection.Set(newCuratorConnection(&credentials, s.Resolver))
}
func (s *Service) ProvideJoinData(credentials identity.NodeCredentials, directory *cpb.ClusterDirectory) {
@@ -214,19 +208,16 @@
// available on the loopback interface.
s.Resolver.AddOverride(credentials.ID(), resolver.NodeByHostPort("127.0.0.1", uint16(common.CuratorServicePort)))
// Also tell the resolver about all the existing nodes in the cluster we just
- // joined into.
+ // joined into. The directory passed here was used to issue the initial
+ // Join call, which means at least one of the nodes was running the control
+ // plane and thus can be used to seed the rest of the resolver.
for _, n := range directory.Nodes {
- // TODO(q3k): only add control plane nodes.
for _, addr := range n.Addresses {
- s.Resolver.AddEndpoint(resolver.NodeByHostPort(addr.Host, uint16(common.CuratorServicePort)))
+ s.Resolver.AddEndpoint(resolver.NodeAtAddressWithDefaultPort(addr.Host))
}
}
- s.ClusterMembership.Set(&ClusterMembership{
- credentials: &credentials,
- pubkey: credentials.PublicKey(),
- resolver: s.Resolver,
- })
+ s.CuratorConnection.Set(newCuratorConnection(&credentials, s.Resolver))
s.clusterDirectorySaved.Set(true)
}
diff --git a/metropolis/node/core/roleserve/value_bootstrapdata.go b/metropolis/node/core/roleserve/value_bootstrapdata.go
deleted file mode 100644
index f2ed064..0000000
--- a/metropolis/node/core/roleserve/value_bootstrapdata.go
+++ /dev/null
@@ -1,22 +0,0 @@
-package roleserve
-
-import (
- "crypto/ed25519"
-
- "source.monogon.dev/metropolis/node/core/curator"
- cpb "source.monogon.dev/metropolis/proto/common"
-)
-
-// bootstrapData is an internal EventValue structure which is populated by the
-// Cluster Enrolment logic via ProvideBootstrapData. It contains data needed by
-// the control plane logic to go into bootstrap mode and bring up a control
-// plane from scratch.
-type bootstrapData struct {
- nodePrivateKey ed25519.PrivateKey
- clusterUnlockKey []byte
- nodeUnlockKey []byte
- initialOwnerKey []byte
- nodePrivateJoinKey ed25519.PrivateKey
- initialClusterConfiguration *curator.Cluster
- nodeTPMUsage cpb.NodeTPMUsage
-}
diff --git a/metropolis/node/core/roleserve/value_clustermembership.go b/metropolis/node/core/roleserve/value_clustermembership.go
deleted file mode 100644
index 7801650..0000000
--- a/metropolis/node/core/roleserve/value_clustermembership.go
+++ /dev/null
@@ -1,79 +0,0 @@
-package roleserve
-
-import (
- "crypto/ed25519"
-
- "google.golang.org/grpc"
-
- "source.monogon.dev/metropolis/node/core/consensus"
- "source.monogon.dev/metropolis/node/core/curator"
- "source.monogon.dev/metropolis/node/core/identity"
- "source.monogon.dev/metropolis/node/core/rpc"
- "source.monogon.dev/metropolis/node/core/rpc/resolver"
- "source.monogon.dev/metropolis/pkg/event"
-)
-
-// ClusterMembership is an Event Value structure used to keep state of the
-// membership of this node in a cluster, the location of a working Curator API
-// (local or remote) and the state of a locally running control plane.
-//
-// This amalgam of seemingly unrelated data is all required to have a single
-// structure that can answer a seemingly trivial question: “Who am I and how do
-// I contact a Curator?”.
-//
-// This structure is available to roleserver-internal workers (eg. the Kubernetes
-// Worker Launcher and Updater) and to external code (eg. the Hostsfile
-// service). It is also deeply intertwined with the Control Plane Worker which
-// not only populates it when a Control Plane (and thus Curator) gets started,
-// but also accesses it to pass over information about already known remote
-// curators and to get the local node's identity.
-type ClusterMembership struct {
- // localConsensus and localCurator are set by the Control Plane Worker when this
- // node runs control plane services.
- localConsensus *consensus.Service
- localCurator *curator.Service
- // credentials is set whenever this node has full access to the Cluster and is
- // the a of credentials which can be used to perform authenticated (as the node)
- // access to the Curator.
- credentials *identity.NodeCredentials
- // pubkey is the public key of the local node, and is always set.
- pubkey ed25519.PublicKey
- // resolver will be used to dial the cluster via DialCurator().
- resolver *resolver.Resolver
-}
-
-// FilterHome returns a ClusterMembership whenever the local node is HOME to a
-// cluster (ie. whenever the node is fully a member of a cluster and can dial
-// the cluster's Curator). See proto.common.ClusterState for more information
-// about cluster states. The watcher will then block all futher Get calls until
-// new information is available.
-func FilterHome() event.GetOption[*ClusterMembership] {
- return event.Filter(func(cm *ClusterMembership) bool {
- if cm.credentials == nil {
- return false
- }
- return true
- })
-}
-
-// DialCurator returns an authenticated gRPC client connection to the Curator
-// using the long-lived roleserver cluster resolver. RPCs will automatically be
-// forwarded to the current control plane leader, and this gRPC client
-// connection can be used long-term by callers.
-func (m *ClusterMembership) DialCurator() (*grpc.ClientConn, error) {
- creds := rpc.NewAuthenticatedCredentials(m.credentials.TLSCredentials(), rpc.WantRemoteCluster(m.credentials.ClusterCA()))
- return grpc.Dial(resolver.MetropolisControlAddress, grpc.WithTransportCredentials(creds), grpc.WithResolvers(m.resolver))
-}
-
-func (m *ClusterMembership) NodePubkey() ed25519.PublicKey {
- if m.pubkey == nil {
- // This shouldn't happen - it means a user got this structure too early or
- // constructed it from scratch.
- panic("node pubkey not available")
- }
- return m.pubkey
-}
-
-func (m *ClusterMembership) NodeID() string {
- return identity.NodeID(m.NodePubkey())
-}
diff --git a/metropolis/node/core/roleserve/value_kubernetes.go b/metropolis/node/core/roleserve/value_kubernetes.go
deleted file mode 100644
index 68580e7..0000000
--- a/metropolis/node/core/roleserve/value_kubernetes.go
+++ /dev/null
@@ -1,12 +0,0 @@
-package roleserve
-
-import (
- "source.monogon.dev/metropolis/node/kubernetes"
-)
-
-// KubernetesStatus is an Event Value structure populated by a running
-// Kubernetes instance. It allows external services to access the Kubernetes
-// Service whenever available (ie. enabled and started by the Role Server).
-type KubernetesStatus struct {
- Controller *kubernetes.Controller
-}
diff --git a/metropolis/node/core/roleserve/value_node.go b/metropolis/node/core/roleserve/value_node.go
deleted file mode 100644
index 26050ee..0000000
--- a/metropolis/node/core/roleserve/value_node.go
+++ /dev/null
@@ -1 +0,0 @@
-package roleserve
diff --git a/metropolis/node/core/roleserve/values.go b/metropolis/node/core/roleserve/values.go
new file mode 100644
index 0000000..3b7ff25
--- /dev/null
+++ b/metropolis/node/core/roleserve/values.go
@@ -0,0 +1,92 @@
+package roleserve
+
+import (
+ "crypto/ed25519"
+
+ "google.golang.org/grpc"
+
+ "source.monogon.dev/metropolis/node/core/consensus"
+ "source.monogon.dev/metropolis/node/core/curator"
+ "source.monogon.dev/metropolis/node/core/identity"
+ "source.monogon.dev/metropolis/node/core/rpc"
+ "source.monogon.dev/metropolis/node/core/rpc/resolver"
+ "source.monogon.dev/metropolis/node/kubernetes"
+
+ cpb "source.monogon.dev/metropolis/proto/common"
+)
+
+// bootstrapData is an internal EventValue structure which is populated by the
+// Cluster Enrolment logic via ProvideBootstrapData. It contains data needed by
+// the control plane logic to go into bootstrap mode and bring up a control
+// plane from scratch.
+type bootstrapData struct {
+ nodePrivateKey ed25519.PrivateKey
+ clusterUnlockKey []byte
+ nodeUnlockKey []byte
+ initialOwnerKey []byte
+ nodePrivateJoinKey ed25519.PrivateKey
+ initialClusterConfiguration *curator.Cluster
+ nodeTPMUsage cpb.NodeTPMUsage
+}
+
+// localControlPlane is an internal EventValue structure which carries
+// information about whether the node has a locally running consensus and curator
+// service. When it does, the structure pointer inside the EventValue will be
+// non-nil and its consensus and curator members will also be non-nil. If it
+// doesn't, either the pointer inside the EventValue will be nil, or it will carry
+// nil pointers. Because of this, it is recommended to use the exists() method to
+// check for consensus/curator presence.
+type localControlPlane struct {
+ consensus *consensus.Service
+ curator *curator.Service
+}
+
+func (l *localControlPlane) exists() bool {
+ if l == nil {
+ return false
+ }
+ if l.consensus == nil || l.curator == nil {
+ return false
+ }
+ return true
+}
+
+// curatorConnection carries information about the node having successfully
+// established connectivity to its cluster's control plane.
+//
+// It carries inside it a single gRPC client connection which is built using the
+// main roleserver resolver. This connection will automatically use any available
+// curator, whether running locally or remotely.
+//
+// This structure should also be used by roleserver runnables that simply wish to
+// access the node's credentials.
+type curatorConnection struct {
+ credentials *identity.NodeCredentials
+ resolver *resolver.Resolver
+ conn *grpc.ClientConn
+}
+
+func newCuratorConnection(creds *identity.NodeCredentials, res *resolver.Resolver) *curatorConnection {
+ c := rpc.NewAuthenticatedCredentials(creds.TLSCredentials(), rpc.WantRemoteCluster(creds.ClusterCA()))
+ conn, err := grpc.Dial(resolver.MetropolisControlAddress, grpc.WithTransportCredentials(c), grpc.WithResolvers(res))
+ if err != nil {
+ // TODO(q3k): triple check that Dial will not fail
+ panic(err)
+ }
+ return &curatorConnection{
+ credentials: creds,
+ resolver: res,
+ conn: conn,
+ }
+}
+
+func (c *curatorConnection) nodeID() string {
+ return identity.NodeID(c.credentials.PublicKey())
+}
+
+// KubernetesStatus is an Event Value structure populated by a running
+// Kubernetes instance. It allows external services to access the Kubernetes
+// Service whenever available (ie. enabled and started by the Role Server).
+type KubernetesStatus struct {
+ Controller *kubernetes.Controller
+}
diff --git a/metropolis/node/core/roleserve/worker_clusternet.go b/metropolis/node/core/roleserve/worker_clusternet.go
index 7dfeba6..2e56098 100644
--- a/metropolis/node/core/roleserve/worker_clusternet.go
+++ b/metropolis/node/core/roleserve/worker_clusternet.go
@@ -16,29 +16,23 @@
type workerClusternet struct {
storageRoot *localstorage.Root
- // clusterMembership will be read.
- clusterMembership *memory.Value[*ClusterMembership]
+ // curatorConnection will be read.
+ curatorConnection *memory.Value[*curatorConnection]
// podNetwork will be read.
podNetwork *memory.Value[*clusternet.Prefixes]
network *network.Service
}
func (s *workerClusternet) run(ctx context.Context) error {
- w := s.clusterMembership.Watch()
+ w := s.curatorConnection.Watch()
defer w.Close()
- supervisor.Logger(ctx).Infof("Waiting for cluster membership...")
- cm, err := w.Get(ctx, FilterHome())
+ supervisor.Logger(ctx).Infof("Waiting for curator connection...")
+ cc, err := w.Get(ctx)
if err != nil {
return err
}
- supervisor.Logger(ctx).Infof("Got cluster membership, starting...")
-
- conn, err := cm.DialCurator()
- if err != nil {
- return err
- }
- defer conn.Close()
- cur := ipb.NewCuratorClient(conn)
+ supervisor.Logger(ctx).Infof("Got curator connection, starting...")
+ cur := ipb.NewCuratorClient(cc.conn)
svc := clusternet.Service{
Curator: cur,
diff --git a/metropolis/node/core/roleserve/worker_controlplane.go b/metropolis/node/core/roleserve/worker_controlplane.go
index eb85f5d..a49343f 100644
--- a/metropolis/node/core/roleserve/worker_controlplane.go
+++ b/metropolis/node/core/roleserve/worker_controlplane.go
@@ -31,22 +31,25 @@
// which are REGISTERing into the cluster, as well as already running nodes that
// have been assigned the role.
//
-// In either case, ClusterMembership will be updated to allow connecting to the
-// newly locally running control plane. For nodes that are bootstrapping the
-// cluster, this will be the fist time the rest of the node can reach the
-// Curator. For other cases, this will be the new, preferred way to reach
-// consensus, without having to rely on external Control Plane nodes.
+// In either case, localControlPlane will be updated to allow direct access to
+// the now locally running control plane. For bootstrapping nodes,
+// curatorConnection is also populated, as that is now the first time the rest of
+// the node services can reach the newly minted cluster control plane.
type workerControlPlane struct {
storageRoot *localstorage.Root
// bootstrapData will be read.
bootstrapData *memory.Value[*bootstrapData]
- // clusterMembership will be read and written.
- clusterMembership *memory.Value[*ClusterMembership]
// localRoles will be read.
localRoles *memory.Value[*cpb.NodeRoles]
- // resolver will be read and used to populate ClusterMembership.
+ // resolver will be read and used to populate curatorConnection when
+ // bootstrapping consensus.
resolver *resolver.Resolver
+
+ // localControlPlane will be written.
+ localControlPlane *memory.Value[*localControlPlane]
+ // curatorConnection will be written.
+ curatorConnection *memory.Value[*curatorConnection]
}
// controlPlaneStartup is used internally to provide a reduced (as in MapReduce)
@@ -59,19 +62,14 @@
// bootstrap is set if this node should bootstrap consensus. It contains all
// data required to perform this bootstrap step.
bootstrap *bootstrapData
-
- // existingMembership is the ClusterMembership that the node already had
- // available before deciding to run the Control Plane. This will be used to
- // carry over existing data from the membership into the new membership as
- // affected by starting the control plane.
- existingMembership *ClusterMembership
+ existing *curatorConnection
}
// changed informs the Control Plane launcher whether two different
// controlPlaneStartups differ to the point where a restart of the control plane
// should happen.
//
-// Currently this is only true when a node switches to/from having a Control
+// Currently, this is only true when a node switches to/from having a Control
// Plane role.
func (c *controlPlaneStartup) changed(o *controlPlaneStartup) bool {
hasConsensusA := c.consensusConfig != nil
@@ -90,7 +88,7 @@
//
// bootstrapData -M-> bootstrapDataC ------.
// |
- // ClusterMambership -M-> clusterMembershipC --R---> startupV
+ // curatorConnection -M-> curatorConnectionC --R---> startupV
// |
// NodeRoles -M-> rolesC --------------'
//
@@ -100,28 +98,28 @@
// which is okay as long as the entire tree restarts simultaneously (which we
// ensure via RunGroup).
bootstrapDataC := make(chan *bootstrapData)
- clusterMembershipC := make(chan *ClusterMembership)
+ curatorConnectionC := make(chan *curatorConnection)
rolesC := make(chan *cpb.NodeRoles)
supervisor.RunGroup(ctx, map[string]supervisor.Runnable{
// Plain conversion from Event Value to channel.
"map-bootstrap-data": event.Pipe[*bootstrapData](s.bootstrapData, bootstrapDataC),
// Plain conversion from Event Value to channel.
- "map-cluster-membership": event.Pipe[*ClusterMembership](s.clusterMembership, clusterMembershipC, FilterHome()),
+ "map-curator-connection": event.Pipe[*curatorConnection](s.curatorConnection, curatorConnectionC),
// Plain conversion from Event Value to channel.
"map-roles": event.Pipe[*cpb.NodeRoles](s.localRoles, rolesC),
- // Provide config from clusterMembership and roles.
+ // Provide config from above.
"reduce-config": func(ctx context.Context) error {
supervisor.Signal(ctx, supervisor.SignalHealthy)
var lr *cpb.NodeRoles
- var cm *ClusterMembership
+ var cc *curatorConnection
var bd *bootstrapData
for {
select {
case <-ctx.Done():
return ctx.Err()
case lr = <-rolesC:
- case cm = <-clusterMembershipC:
+ case cc = <-curatorConnectionC:
case bd = <-bootstrapDataC:
}
@@ -138,7 +136,7 @@
//
// The only problem is when we remove a ConsensusMember from a node which still
// has BootstrapData lingering from first bootup. However, we currently do not
- // support removing consensus roles (or any roles for that matter).
+ // support removing consensus roles.
//
// TODO(q3k): support the above edge case. This can be done, for example, by
// rewriting the reduction to wait for all data to be available and by
@@ -158,7 +156,7 @@
}
// Otherwise, try to interpret node roles if available.
- if lr != nil && cm != nil {
+ if lr != nil && cc != nil {
supervisor.Logger(ctx).Infof("Using role assigned by cluster...")
role := lr.ConsensusMember
if role == nil {
@@ -202,7 +200,7 @@
consensusConfig: &consensus.Config{
Data: &s.storageRoot.Data.Etcd,
Ephemeral: &s.storageRoot.Ephemeral.Consensus,
- NodePrivateKey: cm.credentials.TLSCredentials().PrivateKey.(ed25519.PrivateKey),
+ NodePrivateKey: cc.credentials.TLSCredentials().PrivateKey.(ed25519.PrivateKey),
JoinCluster: &consensus.JoinCluster{
CACertificate: caCert,
NodeCertificate: peerCert,
@@ -213,7 +211,7 @@
ExistingNodes: nodes,
},
},
- existingMembership: cm,
+ existing: cc,
})
}
}
@@ -236,6 +234,7 @@
// Start Control Plane if we have a config.
if startup.consensusConfig == nil {
supervisor.Logger(ctx).Infof("No consensus config, not starting up control plane.")
+ s.localControlPlane.Set(nil)
} else {
supervisor.Logger(ctx).Infof("Got config, starting consensus and curator...")
@@ -247,9 +246,8 @@
}
// Prepare curator config, notably performing a bootstrap step if necessary. The
- // preparation will result in a set of node credentials to run the curator with
- // nd a previously used cluster directory to be passed over to the new
- // ClusterMembership, if any.
+ // preparation will result in a set of node credentials that will be used to
+ // fully continue bringing up this node.
var creds *identity.NodeCredentials
var caCert []byte
if b := startup.bootstrap; b != nil {
@@ -316,17 +314,17 @@
// First, run a few assertions. This should never happen with the Map/Reduce
// logic above, ideally we would encode this in the type system.
- if startup.existingMembership == nil {
- panic("no existingMembership but not bootstrapping either")
+ if startup.existing == nil {
+ panic("no existing curator connection but not bootstrapping either")
}
- if startup.existingMembership.credentials == nil {
- panic("no existingMembership.credentials but not bootstrapping either")
+ if startup.existing.credentials == nil {
+ panic("no existing.credentials but not bootstrapping either")
}
// Use already existing credentials, and pass over already known curators (as
// we're not the only node, and we'd like downstream consumers to be able to
// keep connecting to existing curators in case the local one fails).
- creds = startup.existingMembership.credentials
+ creds = startup.existing.credentials
}
// Start curator.
@@ -340,24 +338,17 @@
}
supervisor.Signal(ctx, supervisor.SignalHealthy)
- supervisor.Logger(ctx).Infof("Control plane running, submitting clusterMembership.")
+ supervisor.Logger(ctx).Infof("Control plane running, submitting localControlPlane.")
- // We now have a locally running ControlPlane. Reflect that in a new
- // ClusterMembership.
- s.clusterMembership.Set(&ClusterMembership{
- localConsensus: con,
- localCurator: cur,
- credentials: creds,
- pubkey: creds.PublicKey(),
- resolver: s.resolver,
- })
+ s.localControlPlane.Set(&localControlPlane{consensus: con, curator: cur})
+ if startup.bootstrap != nil {
+ // Feed curatorConnection if bootstrapping to continue the node bringup.
+ s.curatorConnection.Set(newCuratorConnection(creds, s.resolver))
+ }
}
- // Restart everything if we get a significantly different config (ie., a config
+ // Restart everything if we get a significantly different config (i.e. a config
// whose change would/should either turn up or tear down the Control Plane).
- //
- // Not restarting on every single change prevents us from going in a
- // ClusterMembership -> ClusterDirectory -> ClusterMembership thrashing loop.
for {
nc, err := w.Get(ctx)
if err != nil {
diff --git a/metropolis/node/core/roleserve/worker_heartbeat.go b/metropolis/node/core/roleserve/worker_heartbeat.go
index fdaa9be..f79951c 100644
--- a/metropolis/node/core/roleserve/worker_heartbeat.go
+++ b/metropolis/node/core/roleserve/worker_heartbeat.go
@@ -18,29 +18,23 @@
type workerHeartbeat struct {
network *network.Service
- // clusterMembership will be read.
- clusterMembership *memory.Value[*ClusterMembership]
+ // curatorConnection will be read.
+ curatorConnection *memory.Value[*curatorConnection]
}
func (s *workerHeartbeat) run(ctx context.Context) error {
nw := s.network.Watch()
defer nw.Close()
- w := s.clusterMembership.Watch()
+ w := s.curatorConnection.Watch()
defer w.Close()
- supervisor.Logger(ctx).Infof("Waiting for cluster membership...")
- cm, err := w.Get(ctx, FilterHome())
+ supervisor.Logger(ctx).Infof("Waiting for curator connection...")
+ cc, err := w.Get(ctx)
if err != nil {
return err
}
- supervisor.Logger(ctx).Infof("Got cluster membership, starting...")
-
- conn, err := cm.DialCurator()
- if err != nil {
- return err
- }
- defer conn.Close()
- cur := ipb.NewCuratorClient(conn)
+ supervisor.Logger(ctx).Infof("Got curator connection, starting...")
+ cur := ipb.NewCuratorClient(cc.conn)
stream, err := cur.Heartbeat(ctx)
if err != nil {
diff --git a/metropolis/node/core/roleserve/worker_hostsfile.go b/metropolis/node/core/roleserve/worker_hostsfile.go
index 6cc35cc..c454ff2 100644
--- a/metropolis/node/core/roleserve/worker_hostsfile.go
+++ b/metropolis/node/core/roleserve/worker_hostsfile.go
@@ -19,10 +19,8 @@
storageRoot *localstorage.Root
// network will be read. It provides data about the local node's address.
- network *network.Service
- // clusterMembership will be read. It provides data about the node identity but
- // also provides access to the rest of the cluster's data via the Curator API.
- clusterMembership *memory.Value[*ClusterMembership]
+ network *network.Service
+ curatorConnection *memory.Value[*curatorConnection]
// clusterDirectorySaved will be written. A value of true indicates that the
// cluster directory has been successfully written at least once to the ESP.
@@ -30,29 +28,23 @@
}
func (s *workerHostsfile) run(ctx context.Context) error {
- w := s.clusterMembership.Watch()
+ w := s.curatorConnection.Watch()
defer w.Close()
- supervisor.Logger(ctx).Infof("Waiting for cluster membership...")
- cm, err := w.Get(ctx, FilterHome())
+ supervisor.Logger(ctx).Infof("Waiting for curator connection...")
+ cc, err := w.Get(ctx)
if err != nil {
return err
}
supervisor.Logger(ctx).Infof("Got cluster membership, starting...")
- // TODO(issues/193): stop dialing the curator everywhere.
- conn, err := cm.DialCurator()
- if err != nil {
- return err
- }
- defer conn.Close()
- cur := ipb.NewCuratorClient(conn)
+ cur := ipb.NewCuratorClient(cc.conn)
svc := hostsfile.Service{
Config: hostsfile.Config{
Network: s.network,
Ephemeral: &s.storageRoot.Ephemeral,
ESP: &s.storageRoot.ESP,
- NodeID: cm.NodeID(),
+ NodeID: cc.nodeID(),
Curator: cur,
ClusterDirectorySaved: s.clusterDirectorySaved,
},
diff --git a/metropolis/node/core/roleserve/worker_kubernetes.go b/metropolis/node/core/roleserve/worker_kubernetes.go
index d8c1b1f..14781e3 100644
--- a/metropolis/node/core/roleserve/worker_kubernetes.go
+++ b/metropolis/node/core/roleserve/worker_kubernetes.go
@@ -6,6 +6,7 @@
"net"
"source.monogon.dev/metropolis/node/core/clusternet"
+ "source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/network"
"source.monogon.dev/metropolis/node/kubernetes"
@@ -31,7 +32,8 @@
storageRoot *localstorage.Root
localRoles *memory.Value[*cpb.NodeRoles]
- clusterMembership *memory.Value[*ClusterMembership]
+ localControlPlane *memory.Value[*localControlPlane]
+ curatorConnection *memory.Value[*curatorConnection]
kubernetesStatus *memory.Value[*KubernetesStatus]
podNetwork *memory.Value[*clusternet.Prefixes]
}
@@ -40,8 +42,10 @@
// reduced) datum for the main Kubernetes launcher responsible for starting the
// service, if at all.
type kubernetesStartup struct {
- roles *cpb.NodeRoles
- membership *ClusterMembership
+ roles *cpb.NodeRoles
+ lcp *localControlPlane
+ curator ipb.CuratorClient
+ node *identity.Node
}
// changed informs the Kubernetes launcher whether two different
@@ -68,43 +72,39 @@
}
func (s *workerKubernetes) run(ctx context.Context) error {
- // TODO(q3k): stop depending on local consensus, split up k8s into control plane
- // and workers.
-
- // Map/Reduce a *kubernetesStartup from different data sources. This will then
- // populate an Event Value that the actual launcher will use to start
- // Kubernetes.
- //
- // ClusterMambership -M-> clusterMembershipC --R---> startupV
- // |
- // NodeRoles -M-> rolesC --------------'
- //
var startupV memory.Value[*kubernetesStartup]
- clusterMembershipC := make(chan *ClusterMembership)
+ localControlPlaneC := make(chan *localControlPlane)
+ curatorConnectionC := make(chan *curatorConnection)
rolesC := make(chan *cpb.NodeRoles)
supervisor.RunGroup(ctx, map[string]supervisor.Runnable{
// Plain conversion from Event Value to channel.
- "map-cluster-membership": event.Pipe[*ClusterMembership](s.clusterMembership, clusterMembershipC, FilterHome()),
+ "map-local-control-plane": event.Pipe[*localControlPlane](s.localControlPlane, localControlPlaneC),
+ // Plain conversion from Event Value to channel.
+ "map-curator-connection": event.Pipe[*curatorConnection](s.curatorConnection, curatorConnectionC),
// Plain conversion from Event Value to channel.
"map-roles": event.Pipe[*cpb.NodeRoles](s.localRoles, rolesC),
- // Provide config from clusterMembership and roles.
+ // Provide config from the above.
"reduce-config": func(ctx context.Context) error {
supervisor.Signal(ctx, supervisor.SignalHealthy)
var lr *cpb.NodeRoles
- var cm *ClusterMembership
+ var lcp *localControlPlane
+ var cc *curatorConnection
for {
select {
case <-ctx.Done():
return ctx.Err()
case lr = <-rolesC:
- case cm = <-clusterMembershipC:
+ case lcp = <-localControlPlaneC:
+ case cc = <-curatorConnectionC:
}
- if lr != nil && cm != nil {
+ if lr != nil && cc != nil {
startupV.Set(&kubernetesStartup{
- roles: lr,
- membership: cm,
+ roles: lr,
+ lcp: lcp,
+ node: &cc.credentials.Node,
+ curator: ipb.NewCuratorClient(cc.conn),
})
}
}
@@ -146,23 +146,22 @@
supervisor.Logger(ctx).Infof("No Kubernetes controller role, not starting.")
continue
}
- if d.membership.localConsensus == nil {
+ if !d.lcp.exists() {
supervisor.Logger(ctx).Warningf("No local consensus, cannot start.")
continue
}
break
}
- supervisor.Logger(ctx).Infof("Waiting for local consensus...")
- pki, err := kpki.FromLocalConsensus(ctx, d.membership.localConsensus)
+ pki, err := kpki.FromLocalConsensus(ctx, d.lcp.consensus)
if err != nil {
return fmt.Errorf("getting kubernetes PKI client: %w", err)
}
- supervisor.Logger(ctx).Infof("Got data, starting Kubernetes...")
+ supervisor.Logger(ctx).Infof("Starting Kubernetes controller...")
controller := kubernetes.NewController(kubernetes.ConfigController{
- Node: &d.membership.credentials.Node,
+ Node: d.node,
ServiceIPRange: serviceIPRange,
ClusterNet: clusterIPRange,
ClusterDomain: clusterDomain,
@@ -218,12 +217,6 @@
break
}
- cur, err := d.membership.DialCurator()
- if err != nil {
- return fmt.Errorf("could not dial curator: %w", err)
- }
- ccli := ipb.NewCuratorClient(cur)
-
// Start containerd.
containerdSvc := &containerd.Service{
EphemeralVolume: &s.storageRoot.Ephemeral.Containerd,
@@ -239,8 +232,8 @@
Root: s.storageRoot,
Network: s.network,
- NodeID: d.membership.NodeID(),
- CuratorClient: ccli,
+ NodeID: d.node.ID(),
+ CuratorClient: d.curator,
PodNetwork: s.podNetwork,
})
// Start Kubernetes.
diff --git a/metropolis/node/core/roleserve/worker_nodemgmt.go b/metropolis/node/core/roleserve/worker_nodemgmt.go
index 9ffb112..7516f2d 100644
--- a/metropolis/node/core/roleserve/worker_nodemgmt.go
+++ b/metropolis/node/core/roleserve/worker_nodemgmt.go
@@ -10,22 +10,22 @@
)
type workerNodeMgmt struct {
- clusterMembership *memory.Value[*ClusterMembership]
+ curatorConnection *memory.Value[*curatorConnection]
logTree *logtree.LogTree
}
func (s *workerNodeMgmt) run(ctx context.Context) error {
- w := s.clusterMembership.Watch()
+ w := s.curatorConnection.Watch()
defer w.Close()
supervisor.Logger(ctx).Infof("Waiting for cluster membership...")
- cm, err := w.Get(ctx, FilterHome())
+ cc, err := w.Get(ctx)
if err != nil {
return err
}
supervisor.Logger(ctx).Infof("Got cluster membership, starting...")
srv := mgmt.Service{
- NodeCredentials: cm.credentials,
+ NodeCredentials: cc.credentials,
LogTree: s.logTree,
}
return srv.Run(ctx)
diff --git a/metropolis/node/core/roleserve/worker_rolefetch.go b/metropolis/node/core/roleserve/worker_rolefetch.go
index 5f64676..fb1fb33 100644
--- a/metropolis/node/core/roleserve/worker_rolefetch.go
+++ b/metropolis/node/core/roleserve/worker_rolefetch.go
@@ -11,32 +11,27 @@
)
// workerRoleFetch is the Role Fetcher, an internal bookkeeping service
-// responsible for populating localRoles based on a clusterMembership whenever
+// responsible for populating localRoles based on a curatorConnection whenever
// the node is HOME and cluster credentials / curator access is available.
type workerRoleFetch struct {
- clusterMembership *memory.Value[*ClusterMembership]
+ curatorConnection *memory.Value[*curatorConnection]
// localRoles will be written.
localRoles *memory.Value[*cpb.NodeRoles]
}
func (s *workerRoleFetch) run(ctx context.Context) error {
- w := s.clusterMembership.Watch()
+ w := s.curatorConnection.Watch()
defer w.Close()
- supervisor.Logger(ctx).Infof("Waiting for cluster membership...")
- cm, err := w.Get(ctx, FilterHome())
+ supervisor.Logger(ctx).Infof("Waiting for curator connection...")
+ cc, err := w.Get(ctx)
if err != nil {
return err
}
- supervisor.Logger(ctx).Infof("Got cluster membership, starting...")
+ supervisor.Logger(ctx).Infof("Got curator connection, starting...")
- nodeID := cm.NodeID()
- conn, err := cm.DialCurator()
- if err != nil {
- return err
- }
- defer conn.Close()
- cur := ipb.NewCuratorClient(conn)
+ nodeID := cc.nodeID()
+ cur := ipb.NewCuratorClient(cc.conn)
// Start watch for current node, update localRoles whenever we get something
// new.
diff --git a/metropolis/node/core/roleserve/worker_statuspush.go b/metropolis/node/core/roleserve/worker_statuspush.go
index 4cea7c9..c27605f 100644
--- a/metropolis/node/core/roleserve/worker_statuspush.go
+++ b/metropolis/node/core/roleserve/worker_statuspush.go
@@ -4,7 +4,6 @@
"context"
"fmt"
- "google.golang.org/grpc"
"google.golang.org/protobuf/encoding/prototext"
common "source.monogon.dev/metropolis/node"
@@ -22,8 +21,10 @@
type workerStatusPush struct {
network *network.Service
- // clusterMembership will be read.
- clusterMembership *memory.Value[*ClusterMembership]
+ // localControlPlane will be read.
+ localControlPlane *memory.Value[*localControlPlane]
+ // curatorConnection will be read.
+ curatorConnection *memory.Value[*curatorConnection]
// clusterDirectorySaved will be read.
clusterDirectorySaved *memory.Value[bool]
}
@@ -32,15 +33,9 @@
// 'map' runnables (waiting on Event Values) and the main loop.
type workerStatusPushChannels struct {
// address of the node, or empty if none. Retrieved from network service.
- address chan string
- // nodeID of this node. Populated whenever available from ClusterMembership.
- nodeID chan string
- // curatorClient connecting to the cluster, populated whenever available from
- // ClusterMembership. Used to actually submit the update.
- curatorClient chan ipb.CuratorClient
- // localCurator describing whether this node has a locally running curator on
- // the default port. Retrieved from ClusterMembership.
- localCurator chan bool
+ address chan string
+ localControlPlane chan *localControlPlane
+ curatorConnection chan *curatorConnection
}
// workerStatusPushLoop runs the main loop acting on data received from
@@ -64,25 +59,28 @@
changed = true
}
- case newNodeID := <-chans.nodeID:
+ case curCon := <-chans.curatorConnection:
+ newNodeID := curCon.nodeID()
if nodeID != newNodeID {
supervisor.Logger(ctx).Infof("Got new NodeID: %s", newNodeID)
nodeID = newNodeID
changed = true
}
+ if cur == nil {
+ cur = ipb.NewCuratorClient(curCon.conn)
+ supervisor.Logger(ctx).Infof("Got curator connection.")
+ changed = true
+ }
- case cur = <-chans.curatorClient:
- supervisor.Logger(ctx).Infof("Got curator connection.")
-
- case localCurator := <-chans.localCurator:
- if status.RunningCurator == nil && localCurator {
+ case lcp := <-chans.localControlPlane:
+ if status.RunningCurator == nil && lcp.exists() {
supervisor.Logger(ctx).Infof("Got new local curator state: running")
status.RunningCurator = &cpb.NodeStatus_RunningCurator{
Port: int32(common.CuratorServicePort),
}
changed = true
}
- if status.RunningCurator != nil && !localCurator {
+ if status.RunningCurator != nil && !lcp.exists() {
supervisor.Logger(ctx).Infof("Got new local curator state: not running")
status.RunningCurator = nil
changed = true
@@ -105,10 +103,9 @@
func (s *workerStatusPush) run(ctx context.Context) error {
chans := workerStatusPushChannels{
- address: make(chan string),
- nodeID: make(chan string),
- curatorClient: make(chan ipb.CuratorClient),
- localCurator: make(chan bool),
+ address: make(chan string),
+ curatorConnection: make(chan *curatorConnection),
+ localControlPlane: make(chan *localControlPlane),
}
// All the channel sends in the map runnables are preemptible by a context
@@ -133,59 +130,8 @@
}
}
})
- supervisor.Run(ctx, "map-cluster-membership", func(ctx context.Context) error {
- supervisor.Signal(ctx, supervisor.SignalHealthy)
-
- // Do not submit heartbeat until we've got the cluster directory saved.
- //
- // TODO(q3k): add a node status field for this instead.
- supervisor.Logger(ctx).Infof("Waiting for cluster directory to be saved...")
- cdw := s.clusterDirectorySaved.Watch()
- _, err := cdw.Get(ctx, event.Filter(func(t bool) bool { return t }))
- if err != nil {
- return fmt.Errorf("getting cluster directory state failed: %w", err)
- }
-
- var conn *grpc.ClientConn
- defer func() {
- if conn != nil {
- conn.Close()
- }
- }()
-
- w := s.clusterMembership.Watch()
- defer w.Close()
- supervisor.Logger(ctx).Infof("Cluster directory saved, waiting for cluster membership...")
- for {
- cm, err := w.Get(ctx, FilterHome())
- if err != nil {
- return fmt.Errorf("getting cluster membership status failed: %w", err)
- }
-
- if conn == nil {
- conn, err = cm.DialCurator()
- if err != nil {
- return fmt.Errorf("when attempting to connect to curator: %w", err)
- }
- select {
- case chans.curatorClient <- ipb.NewCuratorClient(conn):
- case <-ctx.Done():
- return ctx.Err()
- }
- }
-
- select {
- case chans.localCurator <- cm.localCurator != nil:
- case <-ctx.Done():
- return ctx.Err()
- }
- select {
- case chans.nodeID <- cm.NodeID():
- case <-ctx.Done():
- return ctx.Err()
- }
- }
- })
+ supervisor.Run(ctx, "pipe-local-control-plane", event.Pipe[*localControlPlane](s.localControlPlane, chans.localControlPlane))
+ supervisor.Run(ctx, "pipe-curator-connection", event.Pipe[*curatorConnection](s.curatorConnection, chans.curatorConnection))
supervisor.Signal(ctx, supervisor.SignalHealthy)
return workerStatusPushLoop(ctx, &chans)
diff --git a/metropolis/node/core/roleserve/worker_statuspush_test.go b/metropolis/node/core/roleserve/worker_statuspush_test.go
index a285c77..d8a0a8c 100644
--- a/metropolis/node/core/roleserve/worker_statuspush_test.go
+++ b/metropolis/node/core/roleserve/worker_statuspush_test.go
@@ -16,8 +16,12 @@
"google.golang.org/protobuf/testing/protocmp"
common "source.monogon.dev/metropolis/node"
- ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+ "source.monogon.dev/metropolis/node/core/consensus"
+ "source.monogon.dev/metropolis/node/core/curator"
+ "source.monogon.dev/metropolis/node/core/rpc"
"source.monogon.dev/metropolis/pkg/supervisor"
+
+ ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
cpb "source.monogon.dev/metropolis/proto/common"
)
@@ -62,10 +66,9 @@
// expected. It does not exercise the 'map' runnables.
func TestWorkerStatusPush(t *testing.T) {
chans := workerStatusPushChannels{
- address: make(chan string),
- nodeID: make(chan string),
- curatorClient: make(chan ipb.CuratorClient),
- localCurator: make(chan bool),
+ address: make(chan string),
+ localControlPlane: make(chan *localControlPlane),
+ curatorConnection: make(chan *curatorConnection),
}
go supervisor.TestHarness(t, func(ctx context.Context) error {
@@ -95,16 +98,20 @@
}
defer cl.Close()
- // Actual test code starts here.
+ eph := rpc.NewEphemeralClusterCredentials(t, 1)
+ nodeID := eph.Nodes[0].ID()
- chans.curatorClient <- ipb.NewCuratorClient(cl)
+ // Actual test code starts here.
+ chans.curatorConnection <- &curatorConnection{
+ credentials: eph.Nodes[0],
+ conn: cl,
+ }
cur.expectReports(t, nil)
// Provide enough data for the first status report to be submitted.
- chans.nodeID <- "1234"
chans.address <- "192.0.2.10"
cur.expectReports(t, []*ipb.UpdateNodeStatusRequest{
- {NodeId: "1234", Status: &cpb.NodeStatus{
+ {NodeId: nodeID, Status: &cpb.NodeStatus{
ExternalAddress: "192.0.2.10",
}},
})
@@ -113,31 +120,37 @@
chans.address <- "192.0.2.10"
chans.address <- "192.0.2.11"
cur.expectReports(t, []*ipb.UpdateNodeStatusRequest{
- {NodeId: "1234", Status: &cpb.NodeStatus{
+ {NodeId: nodeID, Status: &cpb.NodeStatus{
ExternalAddress: "192.0.2.10",
}},
- {NodeId: "1234", Status: &cpb.NodeStatus{
+ {NodeId: nodeID, Status: &cpb.NodeStatus{
ExternalAddress: "192.0.2.11",
}},
})
// Enabling and disabling local curator should work.
- chans.localCurator <- true
- chans.localCurator <- false
+ chans.localControlPlane <- &localControlPlane{
+ curator: &curator.Service{},
+ consensus: &consensus.Service{},
+ }
+ chans.localControlPlane <- &localControlPlane{
+ curator: nil,
+ consensus: nil,
+ }
cur.expectReports(t, []*ipb.UpdateNodeStatusRequest{
- {NodeId: "1234", Status: &cpb.NodeStatus{
+ {NodeId: nodeID, Status: &cpb.NodeStatus{
ExternalAddress: "192.0.2.10",
}},
- {NodeId: "1234", Status: &cpb.NodeStatus{
+ {NodeId: nodeID, Status: &cpb.NodeStatus{
ExternalAddress: "192.0.2.11",
}},
- {NodeId: "1234", Status: &cpb.NodeStatus{
+ {NodeId: nodeID, Status: &cpb.NodeStatus{
ExternalAddress: "192.0.2.11",
RunningCurator: &cpb.NodeStatus_RunningCurator{
Port: int32(common.CuratorServicePort),
},
}},
- {NodeId: "1234", Status: &cpb.NodeStatus{
+ {NodeId: nodeID, Status: &cpb.NodeStatus{
ExternalAddress: "192.0.2.11",
}},
})