metropolis/node/kubernetes: synchronize metropolis node labels to kubernetes
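This extends the labelmaker to mirror Metropolis node labels onto the
corresponding Kubernetes nodes. Only labels whose keys match one of the
regexes in the cluster's KubernetesConfig.NodeLabelsToSynchronize are
synchronized.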
Note that there is currently no way to edit a ClusterConfiguration at
cluster runtime; support for that will come in a future CL.
Change-Id: If7dbc3796085a8b85c1b5b2a181bcb1cee3d1db4
Reviewed-on: https://review.monogon.dev/c/monogon/+/3469
Reviewed-by: Jan Schär <jan@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/node/core/roleserve/BUILD.bazel b/metropolis/node/core/roleserve/BUILD.bazel
index 7f3f0d3..f5e140a 100644
--- a/metropolis/node/core/roleserve/BUILD.bazel
+++ b/metropolis/node/core/roleserve/BUILD.bazel
@@ -36,6 +36,7 @@
"//metropolis/node/kubernetes",
"//metropolis/node/kubernetes/containerd",
"//metropolis/node/kubernetes/pki",
+ "//metropolis/proto/api",
"//metropolis/proto/common",
"//metropolis/version",
"//osbase/event",
diff --git a/metropolis/node/core/roleserve/worker_kubernetes.go b/metropolis/node/core/roleserve/worker_kubernetes.go
index 898b60e..cfab7cb 100644
--- a/metropolis/node/core/roleserve/worker_kubernetes.go
+++ b/metropolis/node/core/roleserve/worker_kubernetes.go
@@ -6,18 +6,20 @@
"net"
"source.monogon.dev/metropolis/node/core/clusternet"
- ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/network"
"source.monogon.dev/metropolis/node/kubernetes"
"source.monogon.dev/metropolis/node/kubernetes/containerd"
kpki "source.monogon.dev/metropolis/node/kubernetes/pki"
- cpb "source.monogon.dev/metropolis/proto/common"
"source.monogon.dev/osbase/event"
"source.monogon.dev/osbase/event/memory"
"source.monogon.dev/osbase/net/dns"
"source.monogon.dev/osbase/supervisor"
+
+ ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+ apb "source.monogon.dev/metropolis/proto/api"
+ cpb "source.monogon.dev/metropolis/proto/common"
)
// workerKubernetes is the Kubernetes Worker, responsible for launching
@@ -42,10 +44,11 @@
// reduced) datum for the main Kubernetes launcher responsible for starting the
// service, if at all.
type kubernetesStartup struct {
- roles *cpb.NodeRoles
- lcp *localControlPlane
- curator ipb.CuratorClient
- node *identity.NodeCredentials
+ roles *cpb.NodeRoles
+ lcp *localControlPlane
+ curator ipb.CuratorClient
+ management apb.ManagementClient
+ node *identity.NodeCredentials
}
// changed informs the Kubernetes launcher whether two different
@@ -95,10 +98,11 @@
}
if lr != nil && cc != nil {
startupV.Set(&kubernetesStartup{
- roles: lr,
- lcp: lcp,
- node: cc.Credentials,
- curator: ipb.NewCuratorClient(cc.conn),
+ roles: lr,
+ lcp: lcp,
+ node: cc.Credentials,
+ curator: ipb.NewCuratorClient(cc.conn),
+ management: apb.NewManagementClient(cc.conn),
})
}
}
@@ -163,6 +167,7 @@
Consensus: d.lcp.consensus,
Network: s.network,
Curator: d.curator,
+ Management: d.management,
})
// Start Kubernetes.
if err := supervisor.Run(ctx, "run", controller.Run); err != nil {
diff --git a/metropolis/node/kubernetes/labelmaker.go b/metropolis/node/kubernetes/labelmaker.go
index a5f7869..1713223 100644
--- a/metropolis/node/kubernetes/labelmaker.go
+++ b/metropolis/node/kubernetes/labelmaker.go
@@ -2,11 +2,12 @@
import (
"context"
- "strings"
+ "fmt"
+ "regexp"
"time"
kerrors "k8s.io/apimachinery/pkg/api/errors"
- v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
applycorev1 "k8s.io/client-go/applyconfigurations/core/v1"
"k8s.io/client-go/kubernetes"
@@ -14,6 +15,7 @@
"source.monogon.dev/osbase/supervisor"
ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+ apb "source.monogon.dev/metropolis/proto/api"
)
// labelmaker is a service responsible for keeping Kubernetes node labels in sync
@@ -28,34 +30,16 @@
type labelmaker struct {
clientSet kubernetes.Interface
curator ipb.CuratorClient
+ mgmt apb.ManagementClient
}
-// managedLabelPrefixes are string prefixes to the keys of labels that we manage.
-// This is used to filter out labels when doing a local diff of state vs. intent,
-// before Kubernetes server-side apply is used.
-var managedLabelPrefixes = []string{
- "node-role.kubernetes.io/",
-}
-
-// filterManaged filters out all labels not falling under managedLabelPrefixes.
-func filterManaged(l common.Labels) common.Labels {
- res := make(map[string]string)
- for k, v := range l {
- for _, prefix := range managedLabelPrefixes {
- if strings.HasPrefix(k, prefix) {
- res[k] = v
- break
- }
- }
- }
- return res
-}
+const labelmakerFieldManager string = "metropolis-label"
// getIntendedLabelsPerNode returns labels that a node should have according to
-// Curator data.
-func getIntendedLabelsPerNode(node *ipb.Node) common.Labels {
+// Curator data and the given list of custom label regexes.
+func getIntendedLabelsPerNode(labelRegexes []*regexp.Regexp, node *ipb.Node) common.Labels {
labels := make(map[string]string)
- // Mark role labels with empty string as content, following convention set by
+ // Add role labels. Use empty string as content, following convention set by
// kubeadm et al.
if node.Roles.ConsensusMember != nil {
labels["node-role.kubernetes.io/ConsensusMember"] = ""
@@ -66,6 +50,16 @@
if node.Roles.KubernetesWorker != nil {
labels["node-role.kubernetes.io/KubernetesWorker"] = ""
}
+
+ // Add user managed labels.
+ for _, pair := range node.Labels.Pairs {
+ for _, regex := range labelRegexes {
+ if regex.MatchString(pair.Key) {
+ labels[pair.Key] = pair.Value
+ break
+ }
+ }
+ }
return labels
}
@@ -74,14 +68,25 @@
//
// If the given node does not exist in Kubernetes, an empty label map is returned.
func getCurrentLabelsForNode(ctx context.Context, clientset kubernetes.Interface, nid string) (common.Labels, error) {
- node, err := clientset.CoreV1().Nodes().Get(ctx, nid, v1.GetOptions{})
+ node, err := clientset.CoreV1().Nodes().Get(ctx, nid, metav1.GetOptions{})
if kerrors.IsNotFound(err) {
return nil, nil
}
if err != nil {
return nil, err
}
- return node.Labels, nil
+ nac, err := applycorev1.ExtractNode(node, labelmakerFieldManager)
+ if err != nil {
+ return nil, err
+ }
+ // If there are no managed labels in this node, the Labels field will be nil. We
+ // can't return that, as our caller would think it means that this node doesn't
+ // exist in Kubernetes.
+ labels := nac.Labels
+ if labels == nil {
+ labels = make(common.Labels)
+ }
+ return labels, nil
}
// run the labelmaker service.
@@ -117,6 +122,24 @@
// order to ensure nodes that couldn't get processed have another chance later
// on.
func (l *labelmaker) runInner(ctx context.Context) error {
+ var labelRegexes []*regexp.Regexp
+ info, err := l.mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
+ if err != nil {
+ return fmt.Errorf("could not get cluster info: %w", err)
+ }
+ if info.ClusterConfiguration != nil && info.ClusterConfiguration.KubernetesConfig != nil {
+ for _, rule := range info.ClusterConfiguration.KubernetesConfig.NodeLabelsToSynchronize {
+ if rule.Regexp != "" {
+ re, err := regexp.Compile(rule.Regexp)
+ if err != nil {
+ supervisor.Logger(ctx).Warningf("Invalid regexp rule %q for node labels: %v", rule.Regexp, err)
+ continue
+ }
+ labelRegexes = append(labelRegexes, re)
+ }
+ }
+ }
+
srv, err := l.curator.Watch(ctx, &ipb.WatchRequest{
Kind: &ipb.WatchRequest_NodesInCluster_{
NodesInCluster: &ipb.WatchRequest_NodesInCluster{},
@@ -127,6 +150,8 @@
}
defer srv.CloseSend()
+ supervisor.Logger(ctx).Infof("Starting node watch with %d regexes", len(labelRegexes))
+
for {
ev, err := srv.Recv()
if err != nil {
@@ -136,7 +161,7 @@
supervisor.Logger(ctx).Infof("Processing %d nodes...", len(ev.Nodes))
for _, node := range ev.Nodes {
- if err := l.processClusterNode(ctx, node); err != nil {
+ if err := l.processClusterNode(ctx, node, labelRegexes); err != nil {
supervisor.Logger(ctx).Warningf("Failed to process labels on node %s: %v", node.Id, err)
}
}
@@ -150,17 +175,16 @@
// processClusterNodes runs the label reconciliation algorithm on a single node.
// It requests current labels from Kubernetes and issues a server-side-apply
// operation to bring them into the requested state, if there is a diff.
-func (l *labelmaker) processClusterNode(ctx context.Context, node *ipb.Node) error {
- intent := getIntendedLabelsPerNode(node)
+func (l *labelmaker) processClusterNode(ctx context.Context, node *ipb.Node, labelRegexes []*regexp.Regexp) error {
+ intent := getIntendedLabelsPerNode(labelRegexes, node)
state, err := getCurrentLabelsForNode(ctx, l.clientSet, node.Id)
if err != nil {
return err
}
if state == nil {
+ supervisor.Logger(ctx).V(1).Infof("Node %s does not exist in Kubernetes, skipping", node.Id)
return nil
}
- intent = filterManaged(intent)
- state = filterManaged(state)
if intent.Equals(state) {
return nil
@@ -179,8 +203,8 @@
cfg := applycorev1.Node(node.Id)
cfg.Labels = intent
- _, err = l.clientSet.CoreV1().Nodes().Apply(ctx, cfg, v1.ApplyOptions{
- FieldManager: "metropolis-labelmaker",
+ _, err = l.clientSet.CoreV1().Nodes().Apply(ctx, cfg, metav1.ApplyOptions{
+ FieldManager: labelmakerFieldManager,
Force: true,
})
return err
diff --git a/metropolis/node/kubernetes/service_controller.go b/metropolis/node/kubernetes/service_controller.go
index 71b133c..02cce8c 100644
--- a/metropolis/node/kubernetes/service_controller.go
+++ b/metropolis/node/kubernetes/service_controller.go
@@ -44,12 +44,13 @@
ServiceIPRange net.IPNet
ClusterNet net.IPNet
- KPKI *pki.PKI
- Root *localstorage.Root
- Consensus consensus.ServiceHandle
- Network *network.Service
- Node *identity.NodeCredentials
- Curator ipb.CuratorClient
+ KPKI *pki.PKI
+ Root *localstorage.Root
+ Consensus consensus.ServiceHandle
+ Network *network.Service
+ Node *identity.NodeCredentials
+ Curator ipb.CuratorClient
+ Management apb.ManagementClient
}
type Controller struct {
@@ -163,6 +164,7 @@
lm := labelmaker{
clientSet: clientSet,
curator: s.c.Curator,
+ mgmt: s.c.Management,
}
if err := supervisor.Run(ctx, "labelmaker", lm.run); err != nil {
return err