metropolis: first pass API for reconfiguring cluster
This implements management.ConfigureCluster. This API is based around
Protobuf FieldMasks, which is a new thing in the Metropolis codebase
(node config mutation is performed via optional fields).
Whether this is the right way to do this is to be discussed.
Alternatives considered are:
1. Always insert a full new config, providing the old one as a base. The
downside of that is the potential conflicts that will spring up the
moment we have systems regularly mutate independent parts of the
config. Additionally, this might lead to some odd behaviour when
dealing with clients that don't have support for newer versions of
the config proto.
2. Use optional fields, like in Node role code. However, this has the
downside of duplicating protos (one for the config state, one for the
mutation request). Plus, protobuf optionals are still somewhat
unusual.
3. Provide individual requests for mutating fields (like with Node
labels). This also results in a lot of boilerplate code.
4. Something akin to JSON Patch, but for protobufs, which doesn't seem
to exist.
Change-Id: I42e5eabd42076e947f4bc8399b843e0e1fd48548
Reviewed-on: https://review.monogon.dev/c/monogon/+/3591
Tested-by: Jenkins CI
Reviewed-by: Tim Windelschmidt <tim@monogon.tech>
diff --git a/metropolis/node/core/curator/BUILD.bazel b/metropolis/node/core/curator/BUILD.bazel
index 1361c95..86d2629 100644
--- a/metropolis/node/core/curator/BUILD.bazel
+++ b/metropolis/node/core/curator/BUILD.bazel
@@ -15,6 +15,7 @@
"impl_leader_curator.go",
"impl_leader_management.go",
"listener.go",
+ "reconfigure.go",
"state.go",
"state_cluster.go",
"state_node.go",
@@ -42,6 +43,7 @@
"@com_github_google_cel_go//cel:go_default_library",
"@com_github_google_cel_go//checker/decls:go_default_library",
"@com_github_google_cel_go//common/types:go_default_library",
+ "@com_github_google_go_cmp//cmp",
"@com_zx2c4_golang_wireguard_wgctrl//wgtypes",
"@io_etcd_go_etcd_api_v3//mvccpb",
"@io_etcd_go_etcd_client_v3//:client",
@@ -51,8 +53,11 @@
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//keepalive",
"@org_golang_google_grpc//status",
+ "@org_golang_google_protobuf//encoding/prototext",
"@org_golang_google_protobuf//proto",
+ "@org_golang_google_protobuf//testing/protocmp",
"@org_golang_google_protobuf//types/known/durationpb",
+ "@org_golang_google_protobuf//types/known/fieldmaskpb",
"@org_golang_google_protobuf//types/known/timestamppb",
],
)
@@ -62,6 +67,7 @@
srcs = [
"curator_test.go",
"impl_leader_test.go",
+ "reconfigure_test.go",
"state_test.go",
],
embed = [":curator"],
@@ -90,6 +96,7 @@
"@org_golang_google_grpc//test/bufconn",
"@org_golang_google_protobuf//proto",
"@org_golang_google_protobuf//testing/protocmp",
+ "@org_golang_google_protobuf//types/known/fieldmaskpb",
"@org_golang_google_protobuf//types/known/timestamppb",
"@org_uber_go_zap//:zap",
],
diff --git a/metropolis/node/core/curator/impl_leader.go b/metropolis/node/core/curator/impl_leader.go
index 6d5be0f..f40c8d2 100644
--- a/metropolis/node/core/curator/impl_leader.go
+++ b/metropolis/node/core/curator/impl_leader.go
@@ -61,6 +61,10 @@
// additionally guarded using etcd transactions.
muNodes sync.Mutex
+ // muCluster is like muNodes, but for cluster configuration accessed via
+ // clusterLoad/clusterSave.
+ muCluster sync.Mutex
+
consensusStatus *consensus.Status
consensus consensus.ServiceHandle
diff --git a/metropolis/node/core/curator/impl_leader_management.go b/metropolis/node/core/curator/impl_leader_management.go
index 585cb94..0666e9d 100644
--- a/metropolis/node/core/curator/impl_leader_management.go
+++ b/metropolis/node/core/curator/impl_leader_management.go
@@ -10,6 +10,7 @@
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
+ "google.golang.org/protobuf/encoding/prototext"
dpb "google.golang.org/protobuf/types/known/durationpb"
common "source.monogon.dev/metropolis/node"
@@ -553,3 +554,52 @@
return &apb.UpdateNodeLabelsResponse{}, nil
}
+
+// ConfigureCluster applies a FieldMask-based change to the stored cluster
+// configuration, holding muCluster across the whole load/merge/save cycle.
+func (l *leaderManagement) ConfigureCluster(ctx context.Context, req *apb.ConfigureClusterRequest) (*apb.ConfigureClusterResponse, error) {
+	l.muCluster.Lock()
+	defer l.muCluster.Unlock()
+
+	// Get existing config.
+	cl, err := clusterLoad(ctx, l.leadership)
+	if err != nil {
+		return nil, status.Errorf(codes.Internal, "could not load cluster: %v", err)
+	}
+	existing, err := cl.proto()
+	if err != nil {
+		return nil, status.Errorf(codes.Internal, "could not serialize cluster config: %v", err)
+	}
+
+	// Trace all inputs. Marshal errors are ignored: this is best-effort debug output.
+	nct, _ := prototext.Marshal(req.NewConfig)
+	rpc.Trace(ctx).Printf("New config: %s", nct)
+	bct := []byte("not provided")
+	if req.BaseConfig != nil {
+		bct, _ = prototext.Marshal(req.BaseConfig)
+	}
+	rpc.Trace(ctx).Printf("Base config: %s", bct)
+	// Nil-safe getter: a missing update_mask is rejected by reconfigureCluster below.
+	rpc.Trace(ctx).Printf("Fields: %v", req.UpdateMask.GetPaths())
+	ect, _ := prototext.Marshal(existing)
+	rpc.Trace(ctx).Printf("Existing config: %s", ect)
+
+	// Mutate.
+	merged, err := reconfigureCluster(req.BaseConfig, req.NewConfig, existing, req.UpdateMask)
+	if err != nil {
+		return nil, err
+	}
+	mct, _ := prototext.Marshal(merged)
+	rpc.Trace(ctx).Printf("Merged config: %s", mct)
+
+	// Save new config.
+	cl, err = clusterFromProto(merged)
+	if err != nil {
+		return nil, status.Errorf(codes.Internal, "failed to rebuild cluster config: %v", err)
+	}
+	if err := clusterSave(ctx, l.leadership, cl); err != nil {
+		return nil, status.Errorf(codes.Internal, "failed to save cluster config: %v", err)
+	}
+
+	return &apb.ConfigureClusterResponse{ResultingConfig: merged}, nil
+}
diff --git a/metropolis/node/core/curator/reconfigure.go b/metropolis/node/core/curator/reconfigure.go
new file mode 100644
index 0000000..705aa41
--- /dev/null
+++ b/metropolis/node/core/curator/reconfigure.go
@@ -0,0 +1,98 @@
+package curator
+
+import (
+ "strings"
+
+ "github.com/google/go-cmp/cmp"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "google.golang.org/protobuf/proto"
+ "google.golang.org/protobuf/testing/protocmp"
+ "google.golang.org/protobuf/types/known/fieldmaskpb"
+
+ cpb "source.monogon.dev/metropolis/proto/common"
+)
+
+// reconfigureCluster three-way merges cluster configuration: every path listed
+// in mask is taken from new and applied onto a deep copy of existing, with the
+// optional base (may be nil) passed along to the per-field merge code. The copy
+// is returned as the merged configuration.
+//
+// It fails if new or mask is nil, if mask is not valid for the configuration
+// message, or if any path in mask names a field that cannot be changed.
+func reconfigureCluster(base, new, existing *cpb.ClusterConfiguration, mask *fieldmaskpb.FieldMask) (*cpb.ClusterConfiguration, error) {
+	switch {
+	case new == nil:
+		return nil, status.Error(codes.InvalidArgument, "new_config must be set")
+	case mask == nil:
+		return nil, status.Error(codes.InvalidArgument, "update_mask must be set")
+	}
+
+	// Bring the mask into its canonical form before validating its paths.
+	mask.Normalize()
+	if !mask.IsValid(new) {
+		return nil, status.Error(codes.InvalidArgument, "update_mask is invalid for new_config")
+	}
+	if base != nil && !mask.IsValid(base) {
+		return nil, status.Error(codes.InvalidArgument, "update_mask is invalid for base_config")
+	}
+
+	// Changes are applied onto a deep copy of the existing configuration.
+	result := proto.Clone(existing).(*cpb.ClusterConfiguration)
+
+	for _, p := range mask.Paths {
+		ok, err := reconfigureKubernetesConfig(base, new, existing, result, p)
+		switch {
+		case err != nil:
+			return nil, err
+		case !ok:
+			// The path was not claimed by any handler.
+			return nil, status.Errorf(codes.InvalidArgument, "cannot modify %s", p)
+		}
+	}
+
+	return result, nil
+}
+
+// reconfigureKubernetesConfig does a three-way merge of Kubernetes configuration
+// (new, existing and optional base) of a given protobuf field path into merged.
+//
+// An error is returned if there is an issue applying the given change.
+// Otherwise, a boolean value is returned, indicating whether this given field
+// path was handled. Only merged is modified; the other configs are only read.
+func reconfigureKubernetesConfig(base, new, existing, merged *cpb.ClusterConfiguration, path string) (bool, error) {
+	if path == "kubernetes_config" {
+		return false, status.Error(codes.InvalidArgument, "cannot mutate kubernetes_config directly, only subfields")
+	}
+	if !strings.HasPrefix(path, "kubernetes_config.") {
+		return false, nil
+	}
+
+	// KubernetesConfig should always exist in stored configs.
+	if merged.KubernetesConfig == nil {
+		merged.KubernetesConfig = &cpb.ClusterConfiguration_KubernetesConfig{}
+	}
+
+	// Check KubernetesConfig in user structs.
+	if new.KubernetesConfig == nil {
+		return false, status.Errorf(codes.InvalidArgument, "cannot reference field %s in new_config", path)
+	}
+	if base != nil && base.KubernetesConfig == nil {
+		return false, status.Errorf(codes.InvalidArgument, "cannot reference field %s in base_config", path)
+	}
+
+	switch path {
+	case "kubernetes_config.node_labels_to_synchronize":
+		// Nil-safe getters are used so that a stored config without
+		// KubernetesConfig is read as empty instead of being modified in place.
+		current := existing.GetKubernetesConfig().GetNodeLabelsToSynchronize()
+		if base != nil && cmp.Diff(base.KubernetesConfig.NodeLabelsToSynchronize, current, protocmp.Transform()) != "" {
+			return false, status.Error(codes.FailedPrecondition, "base_config.kubernetes_config.node_labels_to_synchronize different from current value")
+		}
+		merged.KubernetesConfig.NodeLabelsToSynchronize = new.KubernetesConfig.NodeLabelsToSynchronize
+	default:
+		return false, status.Errorf(codes.InvalidArgument, "cannot mutate %s", path)
+	}
+
+	return true, nil
+}
diff --git a/metropolis/node/core/curator/reconfigure_test.go b/metropolis/node/core/curator/reconfigure_test.go
new file mode 100644
index 0000000..41000a4
--- /dev/null
+++ b/metropolis/node/core/curator/reconfigure_test.go
@@ -0,0 +1,162 @@
+package curator
+
+import (
+ "testing"
+
+ "github.com/google/go-cmp/cmp"
+ "google.golang.org/protobuf/testing/protocmp"
+ "google.golang.org/protobuf/types/known/fieldmaskpb"
+
+ cpb "source.monogon.dev/metropolis/proto/common"
+)
+
+// TestReconfigureCluster exercises reconfigureCluster with a table of merges.
+func TestReconfigureCluster(t *testing.T) {
+	mkCfg := func(regexes ...string) *cpb.ClusterConfiguration {
+		res := &cpb.ClusterConfiguration{
+			TpmMode:               cpb.ClusterConfiguration_TPM_MODE_BEST_EFFORT,
+			StorageSecurityPolicy: cpb.ClusterConfiguration_STORAGE_SECURITY_POLICY_PERMISSIVE,
+			KubernetesConfig:      &cpb.ClusterConfiguration_KubernetesConfig{},
+		}
+		for _, regex := range regexes {
+			res.KubernetesConfig.NodeLabelsToSynchronize = append(res.KubernetesConfig.NodeLabelsToSynchronize, &cpb.ClusterConfiguration_KubernetesConfig_NodeLabelsToSynchronize{
+				Regexp: regex,
+			})
+		}
+		return res
+	}
+
+	for i, te := range []struct {
+		base       *cpb.ClusterConfiguration
+		new        *cpb.ClusterConfiguration
+		existing   *cpb.ClusterConfiguration
+		mask       *fieldmaskpb.FieldMask
+		result     *cpb.ClusterConfiguration
+		shouldFail bool
+	}{
+		// Case 0: no-op on an empty config.
+		{
+			base:     &cpb.ClusterConfiguration{},
+			new:      &cpb.ClusterConfiguration{},
+			existing: &cpb.ClusterConfiguration{},
+			mask:     &fieldmaskpb.FieldMask{Paths: []string{}},
+			result:   &cpb.ClusterConfiguration{},
+		},
+		// Case 1: no-op with an empty base.
+		{
+			new:      &cpb.ClusterConfiguration{},
+			existing: &cpb.ClusterConfiguration{},
+			mask:     &fieldmaskpb.FieldMask{Paths: []string{}},
+			result:   &cpb.ClusterConfiguration{},
+		},
+		// Case 2: no-op on a populated config.
+		{
+			base:     &cpb.ClusterConfiguration{},
+			new:      &cpb.ClusterConfiguration{},
+			existing: mkCfg("^foo$"),
+			mask:     &fieldmaskpb.FieldMask{Paths: []string{}},
+			result:   mkCfg("^foo$"),
+		},
+		// Case 3: reconfigure kubernetes node labels.
+		{
+			base: &cpb.ClusterConfiguration{
+				KubernetesConfig: &cpb.ClusterConfiguration_KubernetesConfig{
+					NodeLabelsToSynchronize: []*cpb.ClusterConfiguration_KubernetesConfig_NodeLabelsToSynchronize{
+						{Regexp: "^foo$"},
+					},
+				},
+			},
+			new: &cpb.ClusterConfiguration{
+				KubernetesConfig: &cpb.ClusterConfiguration_KubernetesConfig{
+					NodeLabelsToSynchronize: []*cpb.ClusterConfiguration_KubernetesConfig_NodeLabelsToSynchronize{
+						{Regexp: "^bar$"},
+					},
+				},
+			},
+			existing: mkCfg("^foo$"),
+			mask:     &fieldmaskpb.FieldMask{Paths: []string{"kubernetes_config.node_labels_to_synchronize"}},
+			result:   mkCfg("^bar$"),
+		},
+		// Case 4: reconfigure kubernetes node labels without base.
+		{
+			new: &cpb.ClusterConfiguration{
+				KubernetesConfig: &cpb.ClusterConfiguration_KubernetesConfig{
+					NodeLabelsToSynchronize: []*cpb.ClusterConfiguration_KubernetesConfig_NodeLabelsToSynchronize{
+						{Regexp: "^bar$"},
+					},
+				},
+			},
+			existing: mkCfg("^foo$"),
+			mask:     &fieldmaskpb.FieldMask{Paths: []string{"kubernetes_config.node_labels_to_synchronize"}},
+			result:   mkCfg("^bar$"),
+		},
+		// Case 5: base is out of date (differs from existing), must be rejected.
+		{
+			base:       mkCfg("^baz$"),
+			new:        mkCfg("^bar$"),
+			existing:   mkCfg("^foo$"),
+			mask:       &fieldmaskpb.FieldMask{Paths: []string{"kubernetes_config.node_labels_to_synchronize"}},
+			shouldFail: true,
+		},
+		// Case 6: missing new.
+		{
+			new:        nil,
+			existing:   &cpb.ClusterConfiguration{},
+			mask:       &fieldmaskpb.FieldMask{Paths: []string{}},
+			result:     &cpb.ClusterConfiguration{},
+			shouldFail: true,
+		},
+		// Case 7: missing mask.
+		{
+			new:        &cpb.ClusterConfiguration{},
+			existing:   &cpb.ClusterConfiguration{},
+			mask:       nil,
+			result:     &cpb.ClusterConfiguration{},
+			shouldFail: true,
+		},
+		// Case 8: mask references unknown field.
+		{
+			new:        &cpb.ClusterConfiguration{},
+			existing:   &cpb.ClusterConfiguration{},
+			mask:       &fieldmaskpb.FieldMask{Paths: []string{"foo"}},
+			result:     &cpb.ClusterConfiguration{},
+			shouldFail: true,
+		},
+		// Case 9: mask references field unset in new.
+		{
+			new:        &cpb.ClusterConfiguration{},
+			existing:   &cpb.ClusterConfiguration{},
+			mask:       &fieldmaskpb.FieldMask{Paths: []string{"kubernetes_config.node_labels_to_synchronize"}},
+			result:     &cpb.ClusterConfiguration{},
+			shouldFail: true,
+		},
+		// Case 10: mask references field unset in base.
+		{
+			base: &cpb.ClusterConfiguration{},
+			new: &cpb.ClusterConfiguration{
+				KubernetesConfig: &cpb.ClusterConfiguration_KubernetesConfig{
+					NodeLabelsToSynchronize: []*cpb.ClusterConfiguration_KubernetesConfig_NodeLabelsToSynchronize{},
+				},
+			},
+			existing:   &cpb.ClusterConfiguration{},
+			mask:       &fieldmaskpb.FieldMask{Paths: []string{"kubernetes_config.node_labels_to_synchronize"}},
+			result:     &cpb.ClusterConfiguration{},
+			shouldFail: true,
+		},
+	} {
+		got, err := reconfigureCluster(te.base, te.new, te.existing, te.mask)
+		if te.shouldFail {
+			if err == nil {
+				t.Errorf("Case %d: should've failed, got success", i)
+			}
+			continue
+		}
+		if err != nil {
+			t.Errorf("Case %d: %v", i, err)
+			continue
+		}
+		if diff := cmp.Diff(te.result, got, protocmp.Transform()); diff != "" {
+			t.Errorf("Case %d: %s", i, diff)
+		}
+	}
+}
diff --git a/metropolis/proto/api/BUILD.bazel b/metropolis/proto/api/BUILD.bazel
index f90885a..294dcb8 100644
--- a/metropolis/proto/api/BUILD.bazel
+++ b/metropolis/proto/api/BUILD.bazel
@@ -17,6 +17,7 @@
"//osbase/logtree/proto:proto_proto",
"//osbase/net/proto:net_proto_proto",
"@protobuf//:duration_proto",
+ "@protobuf//:field_mask_proto",
],
)
diff --git a/metropolis/proto/api/management.proto b/metropolis/proto/api/management.proto
index f6900df..0302a79 100644
--- a/metropolis/proto/api/management.proto
+++ b/metropolis/proto/api/management.proto
@@ -3,6 +3,7 @@
option go_package = "source.monogon.dev/metropolis/proto/api";
import "google/protobuf/duration.proto";
+import "google/protobuf/field_mask.proto";
import "osbase/logtree/proto/logtree.proto";
import "metropolis/proto/common/common.proto";
@@ -117,6 +118,12 @@
need: PERMISSION_UPDATE_NODE_LABELS
};
}
+
+ rpc ConfigureCluster(ConfigureClusterRequest) returns (ConfigureClusterResponse) {
+ option (metropolis.proto.ext.authorization) = {
+ need: PERMISSION_CONFIGURE_CLUSTER
+ };
+ }
}
message GetRegisterTicketRequest {
@@ -506,3 +513,29 @@
message UpdateNodeLabelsResponse {
}
+message ConfigureClusterRequest {
+ // Base configuration to apply the change on. If set, the server will verify
+ // that the fields in this message (referenced by update_mask) have the same
+ // value as the current configuration. If there is a difference, an error will
+ // be returned and the configuration change will be aborted.
+ //
+ // This field _should_ be set to prevent race conditions with other clients
+ // attempting to mutate the configuration.
+ common.ClusterConfiguration base_config = 1;
+
+ // New configuration to set. Must be provided. Only fields referenced by
+ // update_mask will be updated.
+ common.ClusterConfiguration new_config = 2;
+
+ // Fields that should be changed from the current state (and base config state,
+ // if set) into the new config state. Must be provided.
+ //
+ // Currently, only the following fields can be mutated:
+ // 1. kubernetes_config.node_labels_to_synchronize
+ google.protobuf.FieldMask update_mask = 3;
+}
+
+message ConfigureClusterResponse {
+ // Resulting config as set on the server, merged from the user's new_config.
+ common.ClusterConfiguration resulting_config = 1;
+}
\ No newline at end of file
diff --git a/metropolis/proto/ext/authorization.proto b/metropolis/proto/ext/authorization.proto
index e526ec3..81de8bd 100644
--- a/metropolis/proto/ext/authorization.proto
+++ b/metropolis/proto/ext/authorization.proto
@@ -30,6 +30,7 @@
PERMISSION_DELETE_NODE = 9;
PERMISSION_UPDATE_NODE_LABELS = 10;
PERMISSION_NODE_POWER_MANAGEMENT = 11;
+ PERMISSION_CONFIGURE_CLUSTER = 12;
}
// Authorization policy for an RPC method. This message/API does not have the
diff --git a/metropolis/test/e2e/suites/kubernetes/BUILD.bazel b/metropolis/test/e2e/suites/kubernetes/BUILD.bazel
index 3519a17..a5ec530 100644
--- a/metropolis/test/e2e/suites/kubernetes/BUILD.bazel
+++ b/metropolis/test/e2e/suites/kubernetes/BUILD.bazel
@@ -45,5 +45,6 @@
"@io_k8s_apimachinery//pkg/api/resource",
"@io_k8s_apimachinery//pkg/apis/meta/v1:meta",
"@io_k8s_kubernetes//pkg/api/v1/pod",
+ "@org_golang_google_protobuf//types/known/fieldmaskpb",
],
)
diff --git a/metropolis/test/e2e/suites/kubernetes/run_test.go b/metropolis/test/e2e/suites/kubernetes/run_test.go
index cf33126..80a8292 100644
--- a/metropolis/test/e2e/suites/kubernetes/run_test.go
+++ b/metropolis/test/e2e/suites/kubernetes/run_test.go
@@ -17,6 +17,7 @@
"time"
"github.com/bazelbuild/rules_go/go/runfiles"
+ "google.golang.org/protobuf/types/known/fieldmaskpb"
corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
@@ -206,10 +207,41 @@
"test.monogon.dev/foo": "bar",
}
if labels := getLabelsForNode(cluster.NodeIDs[1]); !want.Equals(labels) {
- return fmt.Errorf("Node %s should have labels %s, has %s", cluster.NodeIDs[1], want, labels)
+ return fmt.Errorf("node %s should have labels %s, has %s", cluster.NodeIDs[1], want, labels)
}
return nil
})
+
+ // Reconfigure node label rules.
+ _, err = mgmt.ConfigureCluster(ctx, &apb.ConfigureClusterRequest{
+ BaseConfig: &cpb.ClusterConfiguration{
+ KubernetesConfig: &cpb.ClusterConfiguration_KubernetesConfig{
+ NodeLabelsToSynchronize: []*cpb.ClusterConfiguration_KubernetesConfig_NodeLabelsToSynchronize{
+ {Regexp: `^test\.monogon\.dev/`},
+ },
+ },
+ },
+ NewConfig: &cpb.ClusterConfiguration{
+ KubernetesConfig: &cpb.ClusterConfiguration_KubernetesConfig{},
+ },
+ UpdateMask: &fieldmaskpb.FieldMask{
+ Paths: []string{"kubernetes_config.node_labels_to_synchronize"},
+ },
+ })
+ if err != nil {
+ t.Fatalf("Could not update cluster configuration: %v", err)
+ }
+
+ ci, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
+ if err != nil {
+ t.Fatalf("Could not get cluster info")
+ }
+ // See if the config changed.
+ if rules := ci.ClusterConfiguration.KubernetesConfig.NodeLabelsToSynchronize; len(rules) != 0 {
+ t.Fatalf("Wanted 0 label rules in config after reconfiguration, have %d: %v", len(rules), rules)
+ }
+ // TODO: ensure new rules get applied, but that will require watching the cluster
+ // config for changes in the labelmaker.
}
// TestE2EKubernetes exercises the Kubernetes functionality of Metropolis.