metropolis/node/kubernetes: update labels based on node roles

This implements the labelmaker, a reconciling loop running on Kubernetes
controller nodes that updates Kubernetes node labels based on cluster
data.

Currently it only updates role labels based on cluster roles, but this
can be extended in the future to also replicate Metropolis node labels
into Kubernetes node labels.
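
For illustration (node ID hypothetical), a node holding the KubernetesWorker
role ends up with the empty-valued label below, which kubectl in turn surfaces
in the ROLES column of "kubectl get nodes":

  node-role.kubernetes.io/KubernetesWorker=""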

Change-Id: I9c5ba92bb46f064aa03836720d4a80adc6061ab9
Reviewed-on: https://review.monogon.dev/c/monogon/+/3464
Tested-by: Jenkins CI
Reviewed-by: Jan Schär <jan@monogon.tech>
diff --git a/metropolis/node/core/roleserve/worker_kubernetes.go b/metropolis/node/core/roleserve/worker_kubernetes.go
index 7b6038d..898b60e 100644
--- a/metropolis/node/core/roleserve/worker_kubernetes.go
+++ b/metropolis/node/core/roleserve/worker_kubernetes.go
@@ -162,6 +162,7 @@
 			Root:           s.storageRoot,
 			Consensus:      d.lcp.consensus,
 			Network:        s.network,
+			Curator:        d.curator,
 		})
 		// Start Kubernetes.
 		if err := supervisor.Run(ctx, "run", controller.Run); err != nil {
diff --git a/metropolis/node/kubernetes/BUILD.bazel b/metropolis/node/kubernetes/BUILD.bazel
index d4a36dc..787a9c7 100644
--- a/metropolis/node/kubernetes/BUILD.bazel
+++ b/metropolis/node/kubernetes/BUILD.bazel
@@ -8,6 +8,7 @@
         "controller-manager.go",
         "csi.go",
         "kubelet.go",
+        "labelmaker.go",
         "provisioner.go",
         "scheduler.go",
         "service_controller.go",
@@ -49,6 +50,7 @@
         "@io_k8s_apimachinery//pkg/runtime",
         "@io_k8s_apimachinery//pkg/runtime/schema",
         "@io_k8s_apiserver//pkg/apis/apiserver",
+        "@io_k8s_client_go//applyconfigurations/core/v1:core",
         "@io_k8s_client_go//informers",
         "@io_k8s_client_go//informers/core/v1:core",
         "@io_k8s_client_go//informers/storage/v1:storage",
diff --git a/metropolis/node/kubernetes/labelmaker.go b/metropolis/node/kubernetes/labelmaker.go
new file mode 100644
index 0000000..a5f7869
--- /dev/null
+++ b/metropolis/node/kubernetes/labelmaker.go
@@ -0,0 +1,187 @@
+package kubernetes
+
+import (
+	"context"
+	"strings"
+	"time"
+
+	kerrors "k8s.io/apimachinery/pkg/api/errors"
+	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	applycorev1 "k8s.io/client-go/applyconfigurations/core/v1"
+	"k8s.io/client-go/kubernetes"
+
+	common "source.monogon.dev/metropolis/node"
+	"source.monogon.dev/osbase/supervisor"
+
+	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+)
+
+// labelmaker is a service responsible for keeping Kubernetes node labels in sync
+// with Metropolis data. Currently, it synchronizes Metropolis node roles into
+// corresponding Kubernetes roles (implemented as labels).
+//
+// The labelmaker service runs on all Kubernetes controller nodes. This is safe,
+// but it can cause spurious updates or update errors.
+//
+// TODO(q3k): leader elect a node responsible for this (and other Kubernetes
+// control loops, too).
+type labelmaker struct {
+	clientSet kubernetes.Interface
+	curator   ipb.CuratorClient
+}
+
+// managedLabelPrefixes are the prefixes of label keys that we manage. They are
+// used to filter labels when doing a local diff of state vs. intent, before
+// Kubernetes server-side apply is invoked.
+var managedLabelPrefixes = []string{
+	"node-role.kubernetes.io/",
+}
+
+// filterManaged returns only the labels whose keys fall under managedLabelPrefixes.
+func filterManaged(l common.Labels) common.Labels {
+	res := make(map[string]string)
+	for k, v := range l {
+		for _, prefix := range managedLabelPrefixes {
+			if strings.HasPrefix(k, prefix) {
+				res[k] = v
+				break
+			}
+		}
+	}
+	return res
+}
+
+// getIntendedLabelsPerNode returns labels that a node should have according to
+// Curator data.
+func getIntendedLabelsPerNode(node *ipb.Node) common.Labels {
+	labels := make(map[string]string)
+	// Role labels carry an empty string as their value, following the convention
+	// set by kubeadm et al.
+	if node.Roles.ConsensusMember != nil {
+		labels["node-role.kubernetes.io/ConsensusMember"] = ""
+	}
+	if node.Roles.KubernetesController != nil {
+		labels["node-role.kubernetes.io/KubernetesController"] = ""
+	}
+	if node.Roles.KubernetesWorker != nil {
+		labels["node-role.kubernetes.io/KubernetesWorker"] = ""
+	}
+	return labels
+}
+
+// getCurrentLabelsForNode returns the current labels in Kubernetes for a given
+// node.
+//
+// If the given node does not exist in Kubernetes, nil labels (and no error) are
+// returned.
+func getCurrentLabelsForNode(ctx context.Context, clientset kubernetes.Interface, nid string) (common.Labels, error) {
+	node, err := clientset.CoreV1().Nodes().Get(ctx, nid, v1.GetOptions{})
+	if kerrors.IsNotFound(err) {
+		return nil, nil
+	}
+	if err != nil {
+		return nil, err
+	}
+	return node.Labels, nil
+}
+
+// run runs the labelmaker service, restarting its inner Watch loop whenever the
+// ten-minute per-attempt timeout expires.
+func (l *labelmaker) run(ctx context.Context) error {
+	for {
+		ctxT, ctxC := context.WithTimeout(ctx, time.Minute*10)
+		err := l.runInner(ctxT)
+		// runInner returns an error either on context expiry (which should lead to a
+		// silent restart) or on any other failure (a processing error or parent context
+		// expiry), which should be bubbled up.
+		if err == nil {
+			// This shouldn't happen, but let's handle it just in case.
+			ctxC()
+			continue
+		}
+		if ctxT.Err() != nil && ctx.Err() == nil {
+			// We should check with errors.Is whether the returned error is caused by
+			// context expiry, but gRPC apparently doesn't (always?) wrap the context error
+			// into the returned error. Skipping this check might lead us to misidentify
+			// other errors as context expiry if the two race, but there's not much we can
+			// do about this.
+			ctxC()
+			continue
+		}
+		// Probably some other kind of error. Return it.
+		ctxC()
+		return err
+	}
+}
+
+// runInner runs the labelmaker service against one active curator Watch call. It
+// will be interrupted by the given context timing out (as arranged by run), which
+// ensures that nodes which couldn't be processed get another chance later on.
+func (l *labelmaker) runInner(ctx context.Context) error {
+	srv, err := l.curator.Watch(ctx, &ipb.WatchRequest{
+		Kind: &ipb.WatchRequest_NodesInCluster_{
+			NodesInCluster: &ipb.WatchRequest_NodesInCluster{},
+		},
+	})
+	if err != nil {
+		return err
+	}
+	defer srv.CloseSend()
+
+	for {
+		ev, err := srv.Recv()
+		if err != nil {
+			return err
+		}
+
+		supervisor.Logger(ctx).Infof("Processing %d nodes...", len(ev.Nodes))
+
+		for _, node := range ev.Nodes {
+			if err := l.processClusterNode(ctx, node); err != nil {
+				supervisor.Logger(ctx).Warningf("Failed to process labels on node %s: %v", node.Id, err)
+			}
+		}
+
+		if ev.Progress == ipb.WatchEvent_PROGRESS_LAST_BACKLOGGED {
+			supervisor.Logger(ctx).Infof("Caught up with node backlog, now watching for updates.")
+		}
+	}
+}
+
+// processClusterNode runs the label reconciliation algorithm on a single node.
+// It requests the node's current labels from Kubernetes and, if they differ from
+// the intent, issues a server-side apply operation to bring them into the
+// intended state.
+func (l *labelmaker) processClusterNode(ctx context.Context, node *ipb.Node) error {
+	intent := getIntendedLabelsPerNode(node)
+	state, err := getCurrentLabelsForNode(ctx, l.clientSet, node.Id)
+	if err != nil {
+		return err
+	}
+	if state == nil {
+		return nil
+	}
+	intent = filterManaged(intent)
+	state = filterManaged(state)
+
+	if intent.Equals(state) {
+		return nil
+	}
+	supervisor.Logger(ctx).Infof("Updating labels on node %s... ", node.Id)
+	for k, v := range state {
+		if intent[k] != v {
+			supervisor.Logger(ctx).Infof("  Removing %s=%q", k, v)
+		}
+	}
+	for k, v := range intent {
+		if state[k] != v {
+			supervisor.Logger(ctx).Infof("  Adding %s=%q", k, v)
+		}
+	}
+
+	cfg := applycorev1.Node(node.Id)
+	cfg.Labels = intent
+	_, err = l.clientSet.CoreV1().Nodes().Apply(ctx, cfg, v1.ApplyOptions{
+		FieldManager: "metropolis-labelmaker",
+		Force:        true,
+	})
+	return err
+}
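
Because the Apply call above uses a dedicated field manager
("metropolis-labelmaker"), server-side apply also removes labels that this
manager previously set but that are absent from the new intent. A minimal
sketch of that removal path, assuming the same imports and clientSet wiring as
labelmaker.go above (clearManagedLabels is a hypothetical helper, not part of
this change):

	// clearManagedLabels (hypothetical, for illustration) applies an empty label
	// intent for a node. Server-side apply then drops every label previously
	// owned by the "metropolis-labelmaker" field manager, while labels owned by
	// other managers (e.g. the kubelet) are left untouched.
	func clearManagedLabels(ctx context.Context, clientSet kubernetes.Interface, nid string) error {
		cfg := applycorev1.Node(nid)
		cfg.Labels = map[string]string{} // no managed labels left in intent
		_, err := clientSet.CoreV1().Nodes().Apply(ctx, cfg, v1.ApplyOptions{
			FieldManager: "metropolis-labelmaker",
			Force:        true,
		})
		return err
	}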
diff --git a/metropolis/node/kubernetes/service_controller.go b/metropolis/node/kubernetes/service_controller.go
index 7aa91e8..71b133c 100644
--- a/metropolis/node/kubernetes/service_controller.go
+++ b/metropolis/node/kubernetes/service_controller.go
@@ -34,8 +34,10 @@
 	"source.monogon.dev/metropolis/node/kubernetes/metricsproxy"
 	"source.monogon.dev/metropolis/node/kubernetes/pki"
 	"source.monogon.dev/metropolis/node/kubernetes/reconciler"
-	apb "source.monogon.dev/metropolis/proto/api"
 	"source.monogon.dev/osbase/supervisor"
+
+	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+	apb "source.monogon.dev/metropolis/proto/api"
 )
 
 type ConfigController struct {
@@ -47,6 +49,7 @@
 	Consensus consensus.ServiceHandle
 	Network   *network.Service
 	Node      *identity.NodeCredentials
+	Curator   ipb.CuratorClient
 }
 
 type Controller struct {
@@ -157,6 +160,14 @@
 		return fmt.Errorf("could not run sub-service reconciler: %w", err)
 	}
 
+	lm := labelmaker{
+		clientSet: clientSet,
+		curator:   s.c.Curator,
+	}
+	if err := supervisor.Run(ctx, "labelmaker", lm.run); err != nil {
+		return err
+	}
+
 	// Before we start anything else, make sure reconciliation passes at least once.
 	// This makes the initial startup of a cluster much cleaner as we don't end up
 	// starting the scheduler/controller-manager/etc just to get them to immediately
diff --git a/metropolis/node/labels.go b/metropolis/node/labels.go
index 3e51aae..f26e5bd 100644
--- a/metropolis/node/labels.go
+++ b/metropolis/node/labels.go
@@ -75,3 +75,33 @@
 	}
 	return ""
 }
+
+// Labels on a node, a map from label key to value.
+type Labels map[string]string
+
+// Equals returns true if these Labels are equal to some others. Equality is
+// defined by having the same set of keys and corresponding values.
+func (l Labels) Equals(others Labels) bool {
+	for k, v := range l {
+		if v2, ok := others[k]; !ok || v != v2 {
+			return false
+		}
+	}
+	for k, v := range others {
+		if v2, ok := l[k]; !ok || v != v2 {
+			return false
+		}
+	}
+	return true
+}
+
+// Filter returns a subset of labels for which pred returns true.
+func (l Labels) Filter(pred func(k, v string) bool) Labels {
+	res := make(Labels)
+	for k, v := range l {
+		if pred(k, v) {
+			res[k] = v
+		}
+	}
+	return res
+}
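
A hypothetical usage sketch of the new Labels helpers (values made up for
illustration, "common" being the source.monogon.dev/metropolis/node import
alias used in labelmaker.go above):

	labels := common.Labels{
		"node-role.kubernetes.io/KubernetesWorker": "",
		"example.com/rack":                         "r42",
	}
	// Keep only the role labels, mirroring what the labelmaker manages.
	roles := labels.Filter(func(k, v string) bool {
		return strings.HasPrefix(k, "node-role.kubernetes.io/")
	})
	// Equality checks both key sets and values, independently of iteration order.
	same := roles.Equals(common.Labels{"node-role.kubernetes.io/KubernetesWorker": ""})
	// same == true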
diff --git a/metropolis/test/e2e/suites/kubernetes/BUILD.bazel b/metropolis/test/e2e/suites/kubernetes/BUILD.bazel
index a2ba770..3519a17 100644
--- a/metropolis/test/e2e/suites/kubernetes/BUILD.bazel
+++ b/metropolis/test/e2e/suites/kubernetes/BUILD.bazel
@@ -34,6 +34,7 @@
     },
     deps = [
         "//metropolis/node",
+        "//metropolis/proto/api",
         "//metropolis/proto/common",
         "//metropolis/test/launch",
         "//metropolis/test/localregistry",
diff --git a/metropolis/test/e2e/suites/kubernetes/run_test.go b/metropolis/test/e2e/suites/kubernetes/run_test.go
index 9056ec4..f68f0a4 100644
--- a/metropolis/test/e2e/suites/kubernetes/run_test.go
+++ b/metropolis/test/e2e/suites/kubernetes/run_test.go
@@ -24,6 +24,7 @@
 	podv1 "k8s.io/kubernetes/pkg/api/v1/pod"
 
 	common "source.monogon.dev/metropolis/node"
+	apb "source.monogon.dev/metropolis/proto/api"
 	cpb "source.monogon.dev/metropolis/proto/common"
 	mlaunch "source.monogon.dev/metropolis/test/launch"
 	"source.monogon.dev/metropolis/test/localregistry"
@@ -61,6 +62,121 @@
 	largeTestTimeout = 120 * time.Second
 )
 
+// TestE2EKubernetesLabels verifies that Kubernetes node labels are being updated
+// when the cluster state changes.
+func TestE2EKubernetesLabels(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), globalTestTimeout)
+	defer cancel()
+
+	clusterOptions := mlaunch.ClusterOptions{
+		NumNodes: 2,
+		InitialClusterConfiguration: &cpb.ClusterConfiguration{
+			TpmMode:               cpb.ClusterConfiguration_TPM_MODE_DISABLED,
+			StorageSecurityPolicy: cpb.ClusterConfiguration_STORAGE_SECURITY_POLICY_NEEDS_INSECURE,
+		},
+	}
+	cluster, err := mlaunch.LaunchCluster(ctx, clusterOptions)
+	if err != nil {
+		t.Fatalf("LaunchCluster failed: %v", err)
+	}
+	defer func() {
+		err := cluster.Close()
+		if err != nil {
+			t.Fatalf("cluster Close failed: %v", err)
+		}
+	}()
+
+	con, err := cluster.CuratorClient()
+	if err != nil {
+		t.Fatalf("Could not get curator client: %v", err)
+	}
+	mgmt := apb.NewManagementClient(con)
+	clientSet, err := cluster.GetKubeClientSet()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	getLabelsForNode := func(nid string) common.Labels {
+		node, err := clientSet.CoreV1().Nodes().Get(ctx, nid, metav1.GetOptions{})
+		if kerrors.IsNotFound(err) {
+			return nil
+		}
+		if err != nil {
+			t.Fatalf("Could not get node %s: %v", nid, err)
+			return nil
+		}
+		return common.Labels(node.Labels).Filter(func(k, v string) bool {
+			return strings.HasPrefix(k, "node-role.kubernetes.io/")
+		})
+	}
+
+	// Nodes should have no role labels at first.
+	for _, nid := range cluster.NodeIDs {
+		if labels := getLabelsForNode(nid); !labels.Equals(nil) {
+			t.Errorf("Node %s should have no labels, has %s", nid, labels)
+		}
+	}
+	// Nominate both nodes to be Kubernetes workers.
+	for _, nid := range cluster.NodeIDs {
+		yes := true
+		_, err := mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
+			Node: &apb.UpdateNodeRolesRequest_Id{
+				Id: nid,
+			},
+			KubernetesWorker: &yes,
+		})
+		if err != nil {
+			t.Fatalf("Could not make %s a KubernetesWorker: %v", nid, err)
+		}
+	}
+
+	util.MustTestEventual(t, "Labels added", ctx, time.Second*5, func(ctx context.Context) error {
+		// Nodes should have role labels now.
+		for _, nid := range cluster.NodeIDs {
+			want := common.Labels{
+				"node-role.kubernetes.io/KubernetesWorker": "",
+			}
+			if nid == cluster.NodeIDs[0] {
+				want["node-role.kubernetes.io/KubernetesController"] = ""
+				want["node-role.kubernetes.io/ConsensusMember"] = ""
+			}
+			if labels := getLabelsForNode(nid); !want.Equals(labels) {
+				return fmt.Errorf("node %s should have labels %s, has %s", nid, want, labels)
+			}
+		}
+		return nil
+	})
+
+	// Remove the KubernetesWorker role from the first node again. The node will stay
+	// in Kubernetes (arguably, this is a bug) but its role label should be removed.
+	no := false
+	_, err = mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
+		Node: &apb.UpdateNodeRolesRequest_Id{
+			Id: cluster.NodeIDs[0],
+		},
+		KubernetesWorker: &no,
+	})
+	if err != nil {
+		t.Fatalf("Could not remove KubernetesWorker from %s: %v", cluster.NodeIDs[0], err)
+	}
+
+	util.MustTestEventual(t, "Labels removed", ctx, time.Second*5, func(ctx context.Context) error {
+		for _, nid := range cluster.NodeIDs {
+			want := make(common.Labels)
+			if nid == cluster.NodeIDs[0] {
+				want["node-role.kubernetes.io/KubernetesController"] = ""
+				want["node-role.kubernetes.io/ConsensusMember"] = ""
+			} else {
+				want["node-role.kubernetes.io/KubernetesWorker"] = ""
+			}
+			if labels := getLabelsForNode(nid); !want.Equals(labels) {
+				return fmt.Errorf("node %s should have labels %s, has %s", nid, want, labels)
+			}
+		}
+		return nil
+	})
+}
+
 // TestE2EKubernetes exercises the Kubernetes functionality of Metropolis.
 //
 // The tests are performed against an in-memory cluster.