metropolis/node/kubernetes: synchronize metropolis node labels to kubernetes

This extends the labelmaker to manage Kubernetes node labels mirrored
from Metropolis node labels. Which labels get mirrored is controlled by
regexes in the cluster configuration's KubernetesConfig
(NodeLabelsToSynchronize); matching labels are written to the
Kubernetes node object via server-side apply under a dedicated field
manager, so only labels previously applied by the labelmaker are
considered when diffing state against intent.

Note that there is currently no way to edit a ClusterConfiguration at
cluster runtime; this will come in a future CL.
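
A label is mirrored iff its key matches one of the configured regexes.
A minimal, self-contained sketch of that selection logic (the pattern
below is an invented example, not a shipped default):

    package main

    import (
    	"fmt"
    	"regexp"
    )

    func main() {
    	// Rules as the labelmaker would compile them from the cluster
    	// configuration; the pattern is hypothetical.
    	rules := []*regexp.Regexp{regexp.MustCompile(`^example\.com/`)}

    	labels := map[string]string{
    		"example.com/rack": "r12", // matches: mirrored to Kubernetes
    		"internal-scratch": "x",   // no match: left alone
    	}
    	for k, v := range labels {
    		for _, re := range rules {
    			if re.MatchString(k) {
    				fmt.Printf("mirroring %s=%s\n", k, v)
    				break
    			}
    		}
    	}
    }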

Change-Id: If7dbc3796085a8b85c1b5b2a181bcb1cee3d1db4
Reviewed-on: https://review.monogon.dev/c/monogon/+/3469
Reviewed-by: Jan Schär <jan@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/node/kubernetes/labelmaker.go b/metropolis/node/kubernetes/labelmaker.go
index a5f7869..1713223 100644
--- a/metropolis/node/kubernetes/labelmaker.go
+++ b/metropolis/node/kubernetes/labelmaker.go
@@ -2,11 +2,12 @@
 
 import (
 	"context"
-	"strings"
+	"fmt"
+	"regexp"
 	"time"
 
 	kerrors "k8s.io/apimachinery/pkg/api/errors"
-	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	applycorev1 "k8s.io/client-go/applyconfigurations/core/v1"
 	"k8s.io/client-go/kubernetes"
 
@@ -14,6 +15,7 @@
 	"source.monogon.dev/osbase/supervisor"
 
 	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+	apb "source.monogon.dev/metropolis/proto/api"
 )
 
 // labelmaker is a service responsible for keeping Kubernetes node labels in sync
@@ -28,34 +30,16 @@
 type labelmaker struct {
 	clientSet kubernetes.Interface
 	curator   ipb.CuratorClient
+	mgmt      apb.ManagementClient
 }
 
-// managedLabelPrefixes are string prefixes to the keys of labels that we manage.
-// This is used to filter out labels when doing a local diff of state vs. intent,
-// before Kubernetes server-side apply is used.
-var managedLabelPrefixes = []string{
-	"node-role.kubernetes.io/",
-}
-
-// filterManaged filters out all labels not falling under managedLabelPrefixes.
-func filterManaged(l common.Labels) common.Labels {
-	res := make(map[string]string)
-	for k, v := range l {
-		for _, prefix := range managedLabelPrefixes {
-			if strings.HasPrefix(k, prefix) {
-				res[k] = v
-				break
-			}
-		}
-	}
-	return res
-}
+const labelmakerFieldManager string = "metropolis-labelmaker"
 
 // getIntendedLabelsPerNode returns labels that a node should have according to
-// Curator data.
-func getIntendedLabelsPerNode(node *ipb.Node) common.Labels {
+// Curator data and the given list of custom label regexes.
+func getIntendedLabelsPerNode(labelRegexes []*regexp.Regexp, node *ipb.Node) common.Labels {
 	labels := make(map[string]string)
-	// Mark role labels with empty string as content, following convention set by
+	// Add role labels. Use empty string as content, following convention set by
 	// kubeadm et al.
 	if node.Roles.ConsensusMember != nil {
 		labels["node-role.kubernetes.io/ConsensusMember"] = ""
@@ -66,6 +50,16 @@
 	if node.Roles.KubernetesWorker != nil {
 		labels["node-role.kubernetes.io/KubernetesWorker"] = ""
 	}
+
+	// Add user-managed labels whose keys match the configured regexes.
+	for _, pair := range node.Labels.Pairs {
+		for _, regex := range labelRegexes {
+			if regex.MatchString(pair.Key) {
+				labels[pair.Key] = pair.Value
+				break
+			}
+		}
+	}
 	return labels
 }
 
@@ -74,14 +68,25 @@
 //
 // If the given node does not exist in Kubernetes, an empty label map is returned.
 func getCurrentLabelsForNode(ctx context.Context, clientset kubernetes.Interface, nid string) (common.Labels, error) {
-	node, err := clientset.CoreV1().Nodes().Get(ctx, nid, v1.GetOptions{})
+	node, err := clientset.CoreV1().Nodes().Get(ctx, nid, metav1.GetOptions{})
 	if kerrors.IsNotFound(err) {
 		return nil, nil
 	}
 	if err != nil {
 		return nil, err
 	}
-	return node.Labels, nil
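+	// ExtractNode returns an apply configuration containing only the fields
+	// owned by our field manager, so the diff against intent is computed
+	// purely over labels the labelmaker itself has previously applied.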
+	nac, err := applycorev1.ExtractNode(node, labelmakerFieldManager)
+	if err != nil {
+		return nil, err
+	}
+	// If there are no managed labels on this node, the Labels field will be nil. We
+	// can't return that, as our caller would think it means that this node doesn't
+	// exist in Kubernetes.
+	labels := nac.Labels
+	if labels == nil {
+		labels = make(common.Labels)
+	}
+	return labels, nil
 }
 
 // run the labelmaker service.
@@ -117,6 +122,24 @@
 // order to ensure nodes that couldn't get processed have another chance later
 // on.
 func (l *labelmaker) runInner(ctx context.Context) error {
+	var labelRegexes []*regexp.Regexp
+	info, err := l.mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
+	if err != nil {
+		return fmt.Errorf("could not get cluster info: %w", err)
+	}
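+	// Compile the label selection rules from the cluster configuration.
+	// Invalid rules are logged and skipped rather than failing the service.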
+	if info.ClusterConfiguration != nil && info.ClusterConfiguration.KubernetesConfig != nil {
+		for _, rule := range info.ClusterConfiguration.KubernetesConfig.NodeLabelsToSynchronize {
+			if rule.Regexp != "" {
+				re, err := regexp.Compile(rule.Regexp)
+				if err != nil {
+					supervisor.Logger(ctx).Warningf("Invalid regexp rule %q for node labels: %v", rule.Regexp, err)
+					continue
+				}
+				labelRegexes = append(labelRegexes, re)
+			}
+		}
+	}
+
 	srv, err := l.curator.Watch(ctx, &ipb.WatchRequest{
 		Kind: &ipb.WatchRequest_NodesInCluster_{
 			NodesInCluster: &ipb.WatchRequest_NodesInCluster{},
@@ -127,6 +150,8 @@
 	}
 	defer srv.CloseSend()
 
+	supervisor.Logger(ctx).Infof("Starting node watch with %d regexes", len(labelRegexes))
+
 	for {
 		ev, err := srv.Recv()
 		if err != nil {
@@ -136,7 +161,7 @@
 		supervisor.Logger(ctx).Infof("Processing %d nodes...", len(ev.Nodes))
 
 		for _, node := range ev.Nodes {
-			if err := l.processClusterNode(ctx, node); err != nil {
+			if err := l.processClusterNode(ctx, node, labelRegexes); err != nil {
 				supervisor.Logger(ctx).Warningf("Failed to process labels on node %s: %v", node.Id, err)
 			}
 		}
@@ -150,17 +175,16 @@
 // processClusterNode runs the label reconciliation algorithm on a single node.
 // It requests current labels from Kubernetes and issues a server-side-apply
 // operation to bring them into the requested state, if there is a diff.
-func (l *labelmaker) processClusterNode(ctx context.Context, node *ipb.Node) error {
-	intent := getIntendedLabelsPerNode(node)
+func (l *labelmaker) processClusterNode(ctx context.Context, node *ipb.Node, labelRegexes []*regexp.Regexp) error {
+	intent := getIntendedLabelsPerNode(labelRegexes, node)
 	state, err := getCurrentLabelsForNode(ctx, l.clientSet, node.Id)
 	if err != nil {
 		return err
 	}
 	if state == nil {
+		supervisor.Logger(ctx).V(1).Infof("Node %s does not exist in Kubernetes, skipping", node.Id)
 		return nil
 	}
-	intent = filterManaged(intent)
-	state = filterManaged(state)
 
 	if intent.Equals(state) {
 		return nil
@@ -179,8 +203,8 @@
 
 	cfg := applycorev1.Node(node.Id)
 	cfg.Labels = intent
-	_, err = l.clientSet.CoreV1().Nodes().Apply(ctx, cfg, v1.ApplyOptions{
-		FieldManager: "metropolis-labelmaker",
+	_, err = l.clientSet.CoreV1().Nodes().Apply(ctx, cfg, metav1.ApplyOptions{
+		FieldManager: labelmakerFieldManager,
 		Force:        true,
 	})
 	return err
diff --git a/metropolis/node/kubernetes/service_controller.go b/metropolis/node/kubernetes/service_controller.go
index 71b133c..02cce8c 100644
--- a/metropolis/node/kubernetes/service_controller.go
+++ b/metropolis/node/kubernetes/service_controller.go
@@ -44,12 +44,13 @@
 	ServiceIPRange net.IPNet
 	ClusterNet     net.IPNet
 
-	KPKI      *pki.PKI
-	Root      *localstorage.Root
-	Consensus consensus.ServiceHandle
-	Network   *network.Service
-	Node      *identity.NodeCredentials
-	Curator   ipb.CuratorClient
+	KPKI       *pki.PKI
+	Root       *localstorage.Root
+	Consensus  consensus.ServiceHandle
+	Network    *network.Service
+	Node       *identity.NodeCredentials
+	Curator    ipb.CuratorClient
+	Management apb.ManagementClient
 }
 
 type Controller struct {
@@ -163,6 +164,7 @@
 	lm := labelmaker{
 		clientSet: clientSet,
 		curator:   s.c.Curator,
+		mgmt:      s.c.Management,
 	}
 	if err := supervisor.Run(ctx, "labelmaker", lm.run); err != nil {
 		return err