m/n/c/curator: add result filtering to GetNodes
This introduces result filtering to management.GetNodes Curator API
call. GetNodesRequest payload was modified to contain an optional CEL
expression. GetNodes will return only node protobuf messages for which
the expression evaluates to boolean truth. GetNodes behavior remains
unchanged for empty expression strings, returning all nodes.
See: https://github.com/google/cel-go
https: //github.com/google/cel-spec
Change-Id: Ibdd847c73d305de22b7df496c401e9bc37f9f0bc
Reviewed-on: https://review.monogon.dev/c/monogon/+/768
Reviewed-by: Sergiusz Bazanski <serge@monogon.tech>
Vouch-Run-CI: Mateusz Zalega <mateusz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/node/core/curator/BUILD.bazel b/metropolis/node/core/curator/BUILD.bazel
index 681b637..af0e11f 100644
--- a/metropolis/node/core/curator/BUILD.bazel
+++ b/metropolis/node/core/curator/BUILD.bazel
@@ -5,6 +5,7 @@
srcs = [
"bootstrap.go",
"curator.go",
+ "filters.go",
"impl_follower.go",
"impl_leader.go",
"impl_leader_aaa.go",
@@ -33,6 +34,10 @@
"//metropolis/pkg/supervisor",
"//metropolis/proto/api",
"//metropolis/proto/common",
+ "@com_github_google_cel_go//cel:go_default_library",
+ "@com_github_google_cel_go//checker/decls:go_default_library",
+ "@com_github_google_cel_go//common/types:go_default_library",
+ "@go_googleapis//google/api/expr/v1alpha1:expr_go_proto",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_client_v3//concurrency",
"@org_golang_google_grpc//codes",
diff --git a/metropolis/node/core/curator/filters.go b/metropolis/node/core/curator/filters.go
new file mode 100644
index 0000000..1e43314
--- /dev/null
+++ b/metropolis/node/core/curator/filters.go
@@ -0,0 +1,123 @@
+// This file contains common implementation related to filtering of protobuf
+// messages with Common Expression Language.
+package curator
+
+import (
+ "context"
+
+ "github.com/google/cel-go/cel"
+ celdecls "github.com/google/cel-go/checker/decls"
+ celtypes "github.com/google/cel-go/common/types"
+ exprpb "google.golang.org/genproto/googleapis/api/expr/v1alpha1"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+
+ "source.monogon.dev/metropolis/node/core/rpc"
+ apb "source.monogon.dev/metropolis/proto/api"
+ cpb "source.monogon.dev/metropolis/proto/common"
+)
+
+// enumDeclarations is a helper function returning a cel.EnvOption
+// that contains CEL integer constant declarations matching em. It's
+// purpose is to facilitate importing of proto-defined enums into CEL
+// environments.
+func enumDeclarations(em map[int32]string) cel.EnvOption {
+ var ds []*exprpb.Decl
+ for i, n := range em {
+ ds = append(ds, celdecls.NewConst(n, celdecls.Int,
+ &exprpb.Constant{
+ ConstantKind: &exprpb.Constant_Int64Value{Int64Value: int64(i)},
+ },
+ ))
+ }
+ return cel.Declarations(ds...)
+}
+
+// buildFilter takes the CEL expression fexpr, and CEL environment options
+// opts, to produce a filter program. Since its anticipated usage context
+// resides in RPC handlers, it returns RPC-safe, sanitized error messages,
+// while utilizing rpc.Trace to log relevant details.
+func buildFilter(ctx context.Context, fexpr string, opts ...cel.EnvOption) (cel.Program, error) {
+ // Create the CEL environment, containing a node the filter is evaluated
+ // against, and related enum constants.
+ env, err := cel.NewEnv(opts...)
+ if err != nil {
+ rpc.Trace(ctx).Printf("Couldn't create a CEL environment: %v", err)
+ return nil, status.Errorf(codes.Unavailable, "couldn't process the filter expression")
+ }
+
+ // Parse and type-check the expression.
+ p, iss := env.Parse(fexpr)
+ if iss != nil && iss.Err() != nil {
+ return nil, status.Errorf(codes.InvalidArgument, "while parsing the filter expression: %v", iss.Err())
+ }
+ c, iss := env.Check(p)
+ if iss != nil && iss.Err() != nil {
+ return nil, status.Errorf(codes.InvalidArgument, "while checking the filter expression: %v", iss.Err())
+ }
+
+ // Create the filter program.
+ fprg, err := env.Program(c)
+ if err != nil {
+ rpc.Trace(ctx).Printf("Couldn't create a CEL filter program: %v", err)
+ return nil, status.Errorf(codes.Unavailable, "couldn't create the filter program")
+ }
+ return fprg, nil
+}
+
+// evaluateFilter is a helper function that takes a CEL program fprg along with
+// its runtime environment variables varmap, and evaluates it, expecting a
+// boolean result. It returns RPC-safe, sanitized error messages, while
+// utilizing rpc.Trace to log relevant details.
+func evaluateFilter(ctx context.Context, fprg cel.Program, varmap map[string]interface{}) (bool, error) {
+ out, _, err := fprg.Eval(varmap)
+ if err != nil {
+ rpc.Trace(ctx).Printf("Couldn't evaluate a CEL program: %v", err)
+ return false, status.Errorf(codes.Unavailable, "couldn't process the filter expression")
+ }
+
+ res := out.ConvertToType(celtypes.BoolType)
+ if celtypes.IsError(res) {
+ return false, status.Errorf(codes.InvalidArgument, "filter did not evaluate to a boolean value")
+ }
+ return res == celtypes.True, nil
+}
+
+// nodeFilter are functions created by buildNodeFilter, corresponding to a
+// specific CEL filter expression, that wrap evaluateFilter.
+type nodeFilter func(ctx context.Context, node *apb.Node) (bool, error)
+
+// buildNodeFilter wraps buildFilter to return a node filtering function based
+// on the CEL filter expression expr. Given an empty filter expression, it
+// returns a function that keeps every node.
+func buildNodeFilter(ctx context.Context, expr string) (nodeFilter, error) {
+ if expr == "" {
+ return func(_ context.Context, _ *apb.Node) (bool, error) {
+ return true, nil
+ }, nil
+ }
+
+ // Build the filtering CEL program using the expression, along with
+ // node-specific CEL environment options.
+ fprg, err := buildFilter(ctx, expr,
+ cel.Types(&apb.Node{}),
+ cel.Declarations(
+ celdecls.NewVar("node", celdecls.NewTypeParamType("metropolis.proto.api.Node")),
+ ),
+ // There doesn't seem to be an easier way of importing protobuf enums
+ // into CEL environments.
+ enumDeclarations(cpb.NodeState_name),
+ enumDeclarations(apb.Node_Health_name),
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ // Return a filtering function that captures fprg.
+ return func(ctx context.Context, n *apb.Node) (bool, error) {
+ keep, err := evaluateFilter(ctx, fprg, map[string]interface{}{
+ "node": n,
+ })
+ return keep, err
+ }, nil
+}
diff --git a/metropolis/node/core/curator/impl_leader_management.go b/metropolis/node/core/curator/impl_leader_management.go
index 86a1af6..5592984 100644
--- a/metropolis/node/core/curator/impl_leader_management.go
+++ b/metropolis/node/core/curator/impl_leader_management.go
@@ -134,7 +134,7 @@
// nodeHealth returns the node's health, along with the duration since last
// heartbeat was received, given a current timestamp.
func (l *leaderManagement) nodeHealth(node *Node, now time.Time) (apb.Node_Health, time.Duration) {
- // Get the last received node heartbeat's timestamp.
+ // Get the last received node heartbeat's timestamp.
nid := identity.NodeID(node.pubkey)
nts := l.nodeHeartbeatTimestamp(nid)
// lhb is the duration since the last heartbeat was received.
@@ -166,7 +166,7 @@
// GetNodes implements Management.GetNodes, which returns a list of nodes from
// the point of view of the cluster.
-func (l *leaderManagement) GetNodes(_ *apb.GetNodesRequest, srv apb.Management_GetNodesServer) error {
+func (l *leaderManagement) GetNodes(req *apb.GetNodesRequest, srv apb.Management_GetNodesServer) error {
ctx := srv.Context()
l.muNodes.Lock()
@@ -178,6 +178,12 @@
return status.Errorf(codes.Unavailable, "could not retrieve list of nodes: %v", err)
}
+ // Create a CEL filter program, to be used in the reply loop below.
+ filter, err := buildNodeFilter(ctx, req.Filter)
+ if err != nil {
+ return err
+ }
+
// Get a singular monotonic timestamp to reference node heartbeat timestamps
// against.
now := time.Now()
@@ -201,18 +207,33 @@
// Assess the node's health.
health, lhb := l.nodeHealth(node, now)
- if err := srv.Send(&apb.Node{
- Pubkey: node.pubkey,
- State: node.state,
- Status: node.status,
- Roles: roles,
+ entry := apb.Node{
+ Pubkey: node.pubkey,
+ State: node.state,
+ Status: node.status,
+ Roles: roles,
+ // TODO(mateusz@monogon.tech): update the API to use protobuf Duration
+ // message, in order to facilitate filter expressions like
+ // 'node.HeartbeatTimestamp > duration("30s")'.
+ // TODO(mateusz@monogon.tech): change HeartbeatTimestamp proto field
+ // name to TimeSinceHeartbeat, since it's not really a timestamp.
HeartbeatTimestamp: lhb.Nanoseconds(),
Health: health,
- }); err != nil {
+ }
+
+ // Evaluate the filter expression for this node. Send the node, if it's
+ // kept by the filter.
+ keep, err := filter(ctx, &entry)
+ if err != nil {
+ return err
+ }
+ if !keep {
+ continue
+ }
+ if err := srv.Send(&entry); err != nil {
return err
}
}
-
return nil
}
diff --git a/metropolis/node/core/curator/impl_leader_test.go b/metropolis/node/core/curator/impl_leader_test.go
index 9fa6bb4..337be8e 100644
--- a/metropolis/node/core/curator/impl_leader_test.go
+++ b/metropolis/node/core/curator/impl_leader_test.go
@@ -8,6 +8,7 @@
"crypto/tls"
"crypto/x509"
"encoding/hex"
+ "io"
"net"
"testing"
"time"
@@ -870,3 +871,102 @@
t.Fatalf("CaPublicKey mismatch (wanted %s, got %s)", hex.EncodeToString(want), hex.EncodeToString(got))
}
}
+
+// TestGetNodes exercises management.GetNodes call.
+func TestGetNodes(t *testing.T) {
+ cl := fakeLeader(t)
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+
+ // putNode creates a new node within the cluster, given its initial state.
+ putNode := func(state cpb.NodeState) *Node {
+ npub, _, err := ed25519.GenerateKey(rand.Reader)
+ if err != nil {
+ t.Fatalf("could not generate node keypair: %v", err)
+ }
+ jpub, _, err := ed25519.GenerateKey(rand.Reader)
+ if err != nil {
+ t.Fatalf("could not generate join keypair: %v", err)
+ }
+ cuk := []byte("fakefakefakefakefakefakefakefake")
+ node := &Node{
+ clusterUnlockKey: cuk,
+ pubkey: npub,
+ jkey: jpub,
+ state: state,
+ }
+ if err := nodeSave(ctx, cl.l, node); err != nil {
+ t.Fatalf("nodeSave failed: %v", err)
+ }
+ return node
+ }
+
+ mgmt := apb.NewManagementClient(cl.mgmtConn)
+
+ // getNodes calls mgmt.GetNodes, given a CEL filter expression as
+ // an argument.
+ getNodes := func(filter string) []*apb.Node {
+ var nodes []*apb.Node
+
+ res, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{
+ Filter: filter,
+ })
+ if err != nil {
+ t.Fatalf("GetNodes failed: %v", err)
+ }
+
+ for {
+ node, err := res.Recv()
+ if err != nil && err != io.EOF {
+ t.Fatalf("Recv failed: %v", err)
+ }
+ if err == io.EOF {
+ break
+ }
+ nodes = append(nodes, node)
+ }
+ return nodes
+ }
+
+ // exists returns true, if node n exists within nodes returned by getNodes.
+ exists := func(n *Node, nodes []*apb.Node) bool {
+ for _, e := range nodes {
+ if bytes.Equal(e.Pubkey, n.pubkey) {
+ return true
+ }
+ }
+ return false
+ }
+
+ // Create additional nodes, to be used in test cases below.
+ var nodes []*Node
+ nodes = append(nodes, putNode(cpb.NodeState_NODE_STATE_NEW))
+ nodes = append(nodes, putNode(cpb.NodeState_NODE_STATE_UP))
+ nodes = append(nodes, putNode(cpb.NodeState_NODE_STATE_UP))
+
+ // Call mgmt.GetNodes without a filter expression. The result r should contain
+ // all existing nodes.
+ r := getNodes("")
+ if !exists(nodes[0], r) {
+ t.Fatalf("a node is missing in management.GetNodes result.")
+ }
+ if len(r) < len(nodes) {
+ t.Fatalf("management.GetNodes didn't return expected node count.")
+ }
+
+ // mgmt.GetNodes, provided with the below expression, should return all nodes
+ // which state matches NODE_STATE_UP.
+ r = getNodes("node.state == NODE_STATE_UP")
+ // Hence, the second and third node both should be included in the query
+ // result.
+ if !exists(nodes[1], r) {
+ t.Fatalf("a node is missing in management.GetNodes result.")
+ }
+ if !exists(nodes[2], r) {
+ t.Fatalf("a node is missing in management.GetNodes result.")
+ }
+ // ...but not the first node.
+ if exists(nodes[0], r) {
+ t.Fatalf("management.GetNodes didn't filter out an undesired node.")
+ }
+}
diff --git a/metropolis/proto/api/management.proto b/metropolis/proto/api/management.proto
index 88cab77..17ff6af 100644
--- a/metropolis/proto/api/management.proto
+++ b/metropolis/proto/api/management.proto
@@ -83,6 +83,13 @@
}
message GetNodesRequest {
+ // filter is a CEL expression used to limit the count of GetNodes results.
+ // Each processed node protobuf message is exposed to the filter as
+ // "node" variable, while related state and health enum constants are
+ // anchored in the root namespace, eg. NODE_STATE_UP, or HEARTBEAT_TIMEOUT.
+ // A node is returned each time the expression is evaluated as true. If
+ // empty, all nodes are returned.
+ string filter = 1;
}
// Node in a Metropolis cluster, streamed by Management.GetNodes. For each node