package curator
import (
"bytes"
"context"
"crypto/ed25519"
"crypto/rand"
"crypto/tls"
"crypto/x509"
"encoding/hex"
"io"
"net"
"strings"
"testing"
"time"
"go.etcd.io/etcd/tests/v3/integration"
"google.golang.org/grpc"
"google.golang.org/grpc/test/bufconn"
"google.golang.org/protobuf/proto"
common "source.monogon.dev/metropolis/node"
"source.monogon.dev/metropolis/node/core/consensus"
"source.monogon.dev/metropolis/node/core/consensus/client"
ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
ppb "source.monogon.dev/metropolis/node/core/curator/proto/private"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/rpc"
"source.monogon.dev/metropolis/pkg/logtree"
"source.monogon.dev/metropolis/pkg/pki"
apb "source.monogon.dev/metropolis/proto/api"
cpb "source.monogon.dev/metropolis/proto/common"
)
// fakeLeader creates a curatorLeader without any underlying leader election, in
// its own etcd namespace. It starts the gRPC listener and returns clients to
// it, from the point of view of the local node and some other external node.
//
// The gRPC listeners are set up to behave as they would when running the
// Curator within Metropolis, so all calls performed are authenticated and
// encrypted the same way.
//
// This is used to test functionality of the individual curatorLeader RPC
// implementations without the overhead of having to wait for a leader election.
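//
// A typical test then builds clients on top of the returned connections, e.g.:
//
//	cl := fakeLeader(t)
//	mgmt := apb.NewManagementClient(cl.mgmtConn)
//	cur := ipb.NewCuratorClient(cl.localNodeConn)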
func fakeLeader(t *testing.T) fakeLeaderData {
t.Helper()
lt := logtree.New()
logtree.PipeAllToTest(t, lt)
// Set up a context used to terminate all harnesses started by this function;
// its cancel function is invoked on test cleanup.
ctx, ctxC := context.WithCancel(context.Background())
// Start a single-node etcd cluster.
integration.BeforeTestExternal(t)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{
Size: 1,
})
// Clean up the etcd cluster and cancel the context on test end. We don't just
// use a context because we need the cluster to terminate synchronously before
// another test is run.
t.Cleanup(func() {
ctxC()
cluster.Terminate(t)
})
// Create an etcd client to the test cluster.
curEtcd, _ := client.NewLocal(cluster.Client(0)).Sub("curator")
// Create a fake lock key/value and retrieve its revision. This replaces the
// leader election functionality in the curator to enable faster and more
// focused tests.
lockKey := "/test-lock"
res, err := curEtcd.Put(ctx, lockKey, "fake key")
if err != nil {
t.Fatalf("setting fake leader key failed: %v", err)
}
lockRev := res.Header.Revision
// Generate the node's public join key to be used in the bootstrap process.
nodeJoinPub, _, err := ed25519.GenerateKey(rand.Reader)
if err != nil {
t.Fatalf("could not generate node join keypair: %v", err)
}
// Build cluster PKI with first node, replicating the cluster bootstrap process.
nodePub, nodePriv, err := ed25519.GenerateKey(rand.Reader)
if err != nil {
t.Fatalf("could not generate node keypair: %v", err)
}
cNode := NewNodeForBootstrap(nil, nodePub, nodeJoinPub)
// Here we would enable the leader node's roles, but for tests we don't
// enable any.
caCertBytes, nodeCertBytes, err := BootstrapNodeFinish(ctx, curEtcd, &cNode, nil)
if err != nil {
t.Fatalf("could not finish node bootstrap: %v", err)
}
nodeCredentials, err := identity.NewNodeCredentials(nodePriv, nodeCertBytes, caCertBytes)
if err != nil {
t.Fatalf("could not build node credentials: %v", err)
}
// Generate credentials for the cluster manager. This doesn't go through the
// owner credentials escrow; instead, it manually generates the certificate
// that the escrow call would otherwise issue.
ownerPub, ownerPriv, err := ed25519.GenerateKey(rand.Reader)
if err != nil {
t.Fatalf("could not generate owner keypair: %v", err)
}
oc := pki.Certificate{
Namespace: &pkiNamespace,
Issuer: pkiCA,
Template: identity.UserCertificate("owner"),
Name: "owner",
Mode: pki.CertificateExternal,
PublicKey: ownerPub,
}
ownerCertBytes, err := oc.Ensure(ctx, curEtcd)
if err != nil {
t.Fatalf("could not issue owner certificate: %v", err)
}
ownerCreds := tls.Certificate{
PrivateKey: ownerPriv,
Certificate: [][]byte{ownerCertBytes},
}
// Generate keypair for some third-party, unknown node.
otherPub, otherPriv, err := ed25519.GenerateKey(rand.Reader)
if err != nil {
t.Fatalf("could not generate unknown node keypair: %v", err)
}
// Create security interceptors for the gRPC listener.
sec := &rpc.ServerSecurity{
NodeCredentials: nodeCredentials,
}
// Build a curator leader object. This implements methods that will be
// exercised by tests.
leadership := &leadership{
lockKey: lockKey,
lockRev: lockRev,
leaderID: identity.NodeID(nodePub),
etcd: curEtcd,
consensus: consensus.TestServiceHandle(t, cluster.Client(0)),
}
leader := newCuratorLeader(leadership, &nodeCredentials.Node)
// Create a curator gRPC server which performs authentication as per the created
// ServerSecurity and is backed by the created leader.
srv := grpc.NewServer(sec.GRPCOptions(lt.MustLeveledFor("leader"))...)
ipb.RegisterCuratorServer(srv, leader)
ipb.RegisterCuratorLocalServer(srv, leader)
apb.RegisterAAAServer(srv, leader)
apb.RegisterManagementServer(srv, leader)
// The gRPC server will listen on an internal 'loopback' buffer.
externalLis := bufconn.Listen(1024 * 1024)
go func() {
// Serve returns nil after srv.Stop() is called during cleanup. t.Fatalf must
// not be called from outside the test goroutine, so report failures with
// t.Errorf instead.
if err := srv.Serve(externalLis); err != nil {
t.Errorf("GRPC serve failed: %v", err)
}
}()
// Stop the gRPC server on context cancel.
go func() {
<-ctx.Done()
srv.Stop()
}()
withLocalDialer := grpc.WithContextDialer(func(_ context.Context, _ string) (net.Conn, error) {
return externalLis.Dial()
})
ca := nodeCredentials.ClusterCA()
// Create an authenticated manager gRPC client.
creds := rpc.NewAuthenticatedCredentials(ownerCreds, rpc.WantRemoteCluster(ca))
gcreds := grpc.WithTransportCredentials(creds)
mcl, err := grpc.Dial("local", withLocalDialer, gcreds)
if err != nil {
t.Fatalf("Dialing external GRPC failed: %v", err)
}
// Create a node gRPC client for the local node.
creds = rpc.NewAuthenticatedCredentials(nodeCredentials.TLSCredentials(), rpc.WantRemoteCluster(ca))
gcreds = grpc.WithTransportCredentials(creds)
lcl, err := grpc.Dial("local", withLocalDialer, gcreds)
if err != nil {
t.Fatalf("Dialing external GRPC failed: %v", err)
}
// Create an ephemeral node gRPC client for the 'other node'.
otherEphCreds, err := rpc.NewEphemeralCredentials(otherPriv, ca)
if err != nil {
t.Fatalf("NewEphemeralCredentials: %v", err)
}
ocl, err := grpc.Dial("local", withLocalDialer, grpc.WithTransportCredentials(otherEphCreds))
if err != nil {
t.Fatalf("Dialing external GRPC failed: %v", err)
}
// Close the clients on context cancel.
go func() {
<-ctx.Done()
mcl.Close()
lcl.Close()
ocl.Close()
}()
return fakeLeaderData{
l: leadership,
curatorLis: externalLis,
mgmtConn: mcl,
localNodeConn: lcl,
localNodeID: nodeCredentials.ID(),
otherNodeConn: ocl,
otherNodeID: identity.NodeID(otherPub),
otherNodePriv: otherPriv,
ca: nodeCredentials.ClusterCA(),
etcd: curEtcd,
}
}
// fakeLeaderData is returned by fakeLeader and contains information about the
// newly created leader and connections to its gRPC listeners.
type fakeLeaderData struct {
// l is the leadership object, a type internal to Curator, representing its
// ability to perform actions as a leader.
l *leadership
// curatorLis is a listener intended for Node connections.
curatorLis *bufconn.Listener
// mgmtConn is a gRPC connection to the leader's public gRPC interface,
// authenticated as a cluster manager.
mgmtConn grpc.ClientConnInterface
// localNodeConn is a gRPC connection to the leader's internal/local node gRPC
// interface, which usually runs on a domain socket and is only available to
// other Metropolis node code.
localNodeConn grpc.ClientConnInterface
// localNodeID is the NodeID of the fake node that the leader is running on.
localNodeID string
// otherNodeConn is a connection from some other node (otherNodeID) into the
// cluster, authenticated using an ephemeral certificate.
otherNodeConn grpc.ClientConnInterface
// otherNodeID is the NodeID of some other node present in the curator
// state.
otherNodeID string
// otherNodePriv is the private key of the other node present in the curator
// state, ie. the private key for otherNodeID.
otherNodePriv ed25519.PrivateKey
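// ca is the cluster CA certificate, as generated during cluster bootstrap.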
ca *x509.Certificate
// etcd contains a low-level connection to the curator K/V store, which can be
// used to perform low-level changes to the store in tests.
etcd client.Namespaced
}
// putNode is a helper function that creates a new node within the cluster. The
// new node will have its Cluster Unlock Key, Public Key and Join Key set. A
// non-nil mut can further mutate the node before it's saved.
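//
// For example, as used by tests in this file:
//
//	n := putNode(t, ctx, cl.l, func(n *Node) { n.state = cpb.NodeState_NODE_STATE_UP })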
func putNode(t *testing.T, ctx context.Context, l *leadership, mut func(*Node)) *Node {
t.Helper()
npub, _, err := ed25519.GenerateKey(rand.Reader)
if err != nil {
t.Fatalf("could not generate node keypair: %v", err)
}
jpub, _, err := ed25519.GenerateKey(rand.Reader)
if err != nil {
t.Fatalf("could not generate join keypair: %v", err)
}
cuk := []byte("fakefakefakefakefakefakefakefake")
node := &Node{
clusterUnlockKey: cuk,
pubkey: npub,
jkey: jpub,
}
if mut != nil {
mut(node)
}
if err := nodeSave(ctx, l, node); err != nil {
t.Fatalf("nodeSave failed: %v", err)
}
return node
}
// getNodes wraps management.GetNodes: it issues a request with the given CEL
// filter expression and collects all nodes from the response stream.
func getNodes(t *testing.T, ctx context.Context, mgmt apb.ManagementClient, filter string) []*apb.Node {
t.Helper()
res, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{
Filter: filter,
})
if err != nil {
t.Fatalf("GetNodes failed: %v", err)
}
var nodes []*apb.Node
for {
node, err := res.Recv()
if err == io.EOF {
break
}
if err != nil {
t.Fatalf("Recv failed: %v", err)
}
nodes = append(nodes, node)
}
return nodes
}
// TestWatchNodeInCluster exercises a NodeInCluster Watch, from node creation,
// through updates, to its deletion.
func TestWatchNodeInCluster(t *testing.T) {
cl := fakeLeader(t)
ctx, ctxC := context.WithCancel(context.Background())
defer ctxC()
cur := ipb.NewCuratorClient(cl.localNodeConn)
// We'll be using a fake node throughout, manually updating it in the etcd
// cluster.
fakeNodePub, _, err := ed25519.GenerateKey(rand.Reader)
if err != nil {
t.Fatalf("GenerateKey: %v", err)
}
fakeNodeID := identity.NodeID(fakeNodePub)
fakeNodeKey, _ := nodeEtcdPrefix.Key(fakeNodeID)
w, err := cur.Watch(ctx, &ipb.WatchRequest{
Kind: &ipb.WatchRequest_NodeInCluster_{
NodeInCluster: &ipb.WatchRequest_NodeInCluster{
NodeId: fakeNodeID,
},
},
})
if err != nil {
t.Fatalf("Watch: %v", err)
}
// Recv() should block here, as we don't yet have a node in the cluster. We
// can't really test that reliably, unfortunately.
// Populate new node.
fakeNode := &ppb.Node{
PublicKey: fakeNodePub,
Roles: &cpb.NodeRoles{},
}
fakeNodeInit, err := proto.Marshal(fakeNode)
if err != nil {
t.Fatalf("Marshal: %v", err)
}
_, err = cl.etcd.Put(ctx, fakeNodeKey, string(fakeNodeInit))
if err != nil {
t.Fatalf("Put: %v", err)
}
// Receive cluster node status. This should go through immediately.
ev, err := w.Recv()
if err != nil {
t.Fatalf("Recv: %v", err)
}
if want, got := 1, len(ev.Nodes); want != got {
t.Errorf("wanted %d nodes, got %d", want, got)
} else {
n := ev.Nodes[0]
if want, got := fakeNodeID, n.Id; want != got {
t.Errorf("wanted node %q, got %q", want, got)
}
if n.Status != nil {
t.Errorf("wanted nil status, got %v", n.Status)
}
}
// Update node status. This should trigger an update from the watcher.
fakeNode.Status = &cpb.NodeStatus{
ExternalAddress: "203.0.113.42",
RunningCurator: &cpb.NodeStatus_RunningCurator{
Port: 1234,
},
}
fakeNodeInit, err = proto.Marshal(fakeNode)
if err != nil {
t.Fatalf("Marshal: %v", err)
}
_, err = cl.etcd.Put(ctx, fakeNodeKey, string(fakeNodeInit))
if err != nil {
t.Fatalf("Put: %v", err)
}
// Receive new node. This should go through immediately.
ev, err = w.Recv()
if err != nil {
t.Fatalf("Recv: %v", err)
}
if want, got := 1, len(ev.Nodes); want != got {
t.Errorf("wanted %d nodes, got %d", want, got)
} else {
n := ev.Nodes[0]
if want, got := fakeNodeID, n.Id; want != got {
t.Errorf("wanted node %q, got %q", want, got)
}
if n.Status == nil {
t.Errorf("node status is nil")
} else {
if want := "203.0.113.42"; n.Status.ExternalAddress != want {
t.Errorf("wanted status with ip address %q, got %v", want, n.Status)
}
if want := int32(1234); n.Status.RunningCurator == nil || n.Status.RunningCurator.Port != want {
t.Errorf("wanted running curator with port %d, got %d", want, n.Status.RunningCurator.Port)
}
}
}
// Remove node. This should trigger an update from the watcher.
k, _ := nodeEtcdPrefix.Key(fakeNodeID)
if _, err := cl.etcd.Delete(ctx, k); err != nil {
t.Fatalf("could not delete node from etcd: %v", err)
}
ev, err = w.Recv()
if err != nil {
t.Fatalf("Recv: %v", err)
}
if want, got := 1, len(ev.NodeTombstones); want != got {
t.Errorf("wanted %d node tombstoness, got %d", want, got)
} else {
n := ev.NodeTombstones[0]
if want, got := fakeNodeID, n.NodeId; want != got {
t.Errorf("wanted node %q, got %q", want, got)
}
}
}
// TestWatchNodesInCluster exercises a NodesInCluster Watch, from node
// creation, through updates, to deletion.
func TestWatchNodesInCluster(t *testing.T) {
cl := fakeLeader(t)
ctx, ctxC := context.WithCancel(context.Background())
defer ctxC()
cur := ipb.NewCuratorClient(cl.localNodeConn)
w, err := cur.Watch(ctx, &ipb.WatchRequest{
Kind: &ipb.WatchRequest_NodesInCluster_{
NodesInCluster: &ipb.WatchRequest_NodesInCluster{},
},
})
if err != nil {
t.Fatalf("Watch: %v", err)
}
nodes := make(map[string]*ipb.Node)
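// syncNodes receives a single WatchEvent and applies it to the nodes map:
// new and updated nodes are stored, tombstoned nodes are deleted.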
syncNodes := func() *ipb.WatchEvent {
t.Helper()
ev, err := w.Recv()
if err != nil {
t.Fatalf("Recv: %v", err)
}
for _, n := range ev.Nodes {
n := n
nodes[n.Id] = n
}
for _, nt := range ev.NodeTombstones {
delete(nodes, nt.NodeId)
}
return ev
}
// Receive the initial node backlog. This should yield exactly one node.
for {
ev := syncNodes()
if ev.Progress == ipb.WatchEvent_PROGRESS_LAST_BACKLOGGED {
break
}
}
if n := nodes[cl.localNodeID]; n == nil || n.Id != cl.localNodeID {
t.Errorf("Expected node %q to be present, got %v", cl.localNodeID, nodes[cl.localNodeID])
}
if len(nodes) != 1 {
t.Errorf("Expected exactly one node, got %d", len(nodes))
}
// Update the node status and expect a corresponding WatchEvent.
_, err = cur.UpdateNodeStatus(ctx, &ipb.UpdateNodeStatusRequest{
NodeId: cl.localNodeID,
Status: &cpb.NodeStatus{
ExternalAddress: "203.0.113.43",
},
})
if err != nil {
t.Fatalf("UpdateNodeStatus: %v", err)
}
for {
syncNodes()
n := nodes[cl.localNodeID]
if n == nil {
continue
}
if n.Status == nil || n.Status.ExternalAddress != "203.0.113.43" {
continue
}
break
}
// Add a new (fake) node, and expect a corresponding WatchEvent.
fakeNodePub, _, err := ed25519.GenerateKey(rand.Reader)
if err != nil {
t.Fatalf("GenerateKey: %v", err)
}
fakeNodeID := identity.NodeID(fakeNodePub)
fakeNodeKey, _ := nodeEtcdPrefix.Key(fakeNodeID)
fakeNode := &ppb.Node{
PublicKey: fakeNodePub,
Roles: &cpb.NodeRoles{},
}
fakeNodeInit, err := proto.Marshal(fakeNode)
if err != nil {
t.Fatalf("Marshal: %v", err)
}
_, err = cl.etcd.Put(ctx, fakeNodeKey, string(fakeNodeInit))
if err != nil {
t.Fatalf("Put: %v", err)
}
for {
syncNodes()
n := nodes[fakeNodeID]
if n == nil {
continue
}
if n.Id != fakeNodeID {
t.Errorf("Wanted faked node ID %q, got %q", fakeNodeID, n.Id)
}
break
}
// Re-open watcher, resynchronize, expect two nodes to be present.
nodes = make(map[string]*ipb.Node)
w, err = cur.Watch(ctx, &ipb.WatchRequest{
Kind: &ipb.WatchRequest_NodesInCluster_{
NodesInCluster: &ipb.WatchRequest_NodesInCluster{},
},
})
if err != nil {
t.Fatalf("Watch: %v", err)
}
for {
ev := syncNodes()
if ev.Progress == ipb.WatchEvent_PROGRESS_LAST_BACKLOGGED {
break
}
}
if n := nodes[cl.localNodeID]; n == nil || n.Status == nil || n.Status.ExternalAddress != "203.0.113.43" {
t.Errorf("Node %q should exist and have external address, got %v", cl.localNodeID, n)
}
if n := nodes[fakeNodeID]; n == nil {
t.Errorf("Node %q should exist, got %v", fakeNodeID, n)
}
if len(nodes) != 2 {
t.Errorf("Exptected two nodes in map, got %d", len(nodes))
}
// Remove fake node, expect it to be removed from synced map.
k, _ := nodeEtcdPrefix.Key(fakeNodeID)
if _, err := cl.etcd.Delete(ctx, k); err != nil {
t.Fatalf("could not delete node from etcd: %v", err)
}
for {
syncNodes()
n := nodes[fakeNodeID]
if n == nil {
break
}
}
}
// TestRegistration exercises the node 'Register' (a.k.a. Registration) flow,
// which is described in the Cluster Lifecycle design document.
//
// It starts out with a node that's foreign to the cluster, and performs all
// the steps required to make that node part of a cluster. It calls into the
// Curator service as the registering node and the Management service as a
// cluster manager. The node registered into the cluster is fully fake, ie. it
// is not an actual Metropolis node but is instead fully managed from within
// the test as a set of credentials.
func TestRegistration(t *testing.T) {
cl := fakeLeader(t)
mgmt := apb.NewManagementClient(cl.mgmtConn)
ctx, ctxC := context.WithCancel(context.Background())
defer ctxC()
// Retrieve ticket twice.
res1, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
if err != nil {
t.Fatalf("GetRegisterTicket failed: %v", err)
}
res2, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
if err != nil {
t.Fatalf("GetRegisterTicket failed: %v", err)
}
// Ensure tickets are set and the same.
if len(res1.Ticket) == 0 {
t.Errorf("Ticket is empty")
}
if !bytes.Equal(res1.Ticket, res2.Ticket) {
t.Errorf("Unexpected ticket change between calls")
}
// Generate the node's public join key to be used in the registration process.
nodeJoinPub, _, err := ed25519.GenerateKey(rand.Reader)
if err != nil {
t.Fatalf("could not generate node join keypair: %v", err)
}
// Register 'other node' into cluster.
cur := ipb.NewCuratorClient(cl.otherNodeConn)
_, err = cur.RegisterNode(ctx, &ipb.RegisterNodeRequest{
RegisterTicket: res1.Ticket,
JoinKey: nodeJoinPub,
})
if err != nil {
t.Fatalf("RegisterNode failed: %v", err)
}
expectOtherNode := func(state cpb.NodeState) {
t.Helper()
res, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
if err != nil {
t.Fatalf("GetNodes failed: %v", err)
}
for {
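// Scan the GetNodes stream until the other node is found.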
node, err := res.Recv()
if err != nil {
t.Fatalf("Recv failed: %v", err)
}
if identity.NodeID(node.Pubkey) != cl.otherNodeID {
continue
}
if node.State != state {
t.Fatalf("Expected node to be %s, got %s", state, node.State)
}
return
}
}
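// Recover the other node's public key from its private key.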
otherNodePub := cl.otherNodePriv.Public().(ed25519.PublicKey)
// Expect node to now be 'NEW'.
expectOtherNode(cpb.NodeState_NODE_STATE_NEW)
// Approve node.
_, err = mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{Pubkey: otherNodePub})
if err != nil {
t.Fatalf("ApproveNode failed: %v", err)
}
// Expect node to be 'STANDBY'.
expectOtherNode(cpb.NodeState_NODE_STATE_STANDBY)
// Approve call should be idempotent and not fail when called a second time.
_, err = mgmt.ApproveNode(ctx, &apb.ApproveNodeRequest{Pubkey: otherNodePub})
if err != nil {
t.Fatalf("ApproveNode failed: %v", err)
}
// Make 'other node' commit itself into the cluster.
_, err = cur.CommitNode(ctx, &ipb.CommitNodeRequest{
ClusterUnlockKey: []byte("fakefakefakefakefakefakefakefake"),
})
if err != nil {
t.Fatalf("CommitNode failed: %v", err)
}
// TODO(q3k): once the test cluster's PKI is consistent with the curator's
// PKI, test the emitted credentials.
// Expect node to be 'UP'.
expectOtherNode(cpb.NodeState_NODE_STATE_UP)
}
// TestJoin exercises the Join Flow, as described in the "Cluster Lifecycle"
// design document, assuming the node has already completed the Register Flow.
func TestJoin(t *testing.T) {
cl := fakeLeader(t)
ctx, ctxC := context.WithCancel(context.Background())
defer ctxC()
// Build the test node and save it into etcd.
npub, _, err := ed25519.GenerateKey(rand.Reader)
if err != nil {
t.Fatalf("could not generate node keypair: %v", err)
}
jpub, jpriv, err := ed25519.GenerateKey(rand.Reader)
if err != nil {
t.Fatalf("could not generate join keypair: %v", err)
}
cuk := []byte("fakefakefakefakefakefakefakefake")
node := Node{
clusterUnlockKey: cuk,
pubkey: npub,
jkey: jpub,
state: cpb.NodeState_NODE_STATE_UP,
}
if err := nodeSave(ctx, cl.l, &node); err != nil {
t.Fatalf("nodeSave failed: %v", err)
}
// Connect to Curator using the node's Join Credentials, as opposed to the
// node keypair, then join the cluster.
withLocalDialer := grpc.WithContextDialer(func(_ context.Context, _ string) (net.Conn, error) {
return cl.curatorLis.Dial()
})
ephCreds, err := rpc.NewEphemeralCredentials(jpriv, cl.ca)
if err != nil {
t.Fatalf("NewEphemeralCredentials: %v", err)
}
eph, err := grpc.Dial("local", withLocalDialer, grpc.WithTransportCredentials(ephCreds))
if err != nil {
t.Fatalf("Dialing external GRPC failed: %v", err)
}
cur := ipb.NewCuratorClient(eph)
jr, err := cur.JoinNode(ctx, &ipb.JoinNodeRequest{})
if err != nil {
t.Fatalf("JoinNode failed: %v", err)
}
// Compare the received CUK with the one we started out with.
if !bytes.Equal(cuk, jr.ClusterUnlockKey) {
t.Fatal("JoinNode returned an invalid CUK.")
}
}
// TestClusterUpdateNodeStatus exercises the Curator.UpdateNodeStatus RPC by
// sending node updates and making sure they are reflected in subsequent Watch
// events.
func TestClusterUpdateNodeStatus(t *testing.T) {
cl := fakeLeader(t)
curator := ipb.NewCuratorClient(cl.localNodeConn)
ctx, ctxC := context.WithCancel(context.Background())
defer ctxC()
// Retrieve initial node data, it should have no status set.
value, err := curator.Watch(ctx, &ipb.WatchRequest{
Kind: &ipb.WatchRequest_NodeInCluster_{
NodeInCluster: &ipb.WatchRequest_NodeInCluster{
NodeId: cl.localNodeID,
},
},
})
if err != nil {
t.Fatalf("Could not request node watch: %v", err)
}
ev, err := value.Recv()
if err != nil {
t.Fatalf("Could not receive initial node value: %v", err)
}
if status := ev.Nodes[0].Status; status != nil {
t.Errorf("Initial node value contains status, should be nil: %+v", status)
}
// Update status...
_, err = curator.UpdateNodeStatus(ctx, &ipb.UpdateNodeStatusRequest{
NodeId: cl.localNodeID,
Status: &cpb.NodeStatus{
ExternalAddress: "192.0.2.10",
},
})
if err != nil {
t.Fatalf("UpdateNodeStatus: %v", err)
}
// ... and expect it to be reflected in the new node value.
for {
ev, err = value.Recv()
if err != nil {
t.Fatalf("Could not receive second node value: %v", err)
}
// Keep waiting until we get a status.
status := ev.Nodes[0].Status
if status == nil {
continue
}
if want, got := "192.0.2.10", status.ExternalAddress; want != got {
t.Errorf("Wanted external address %q, got %q", want, got)
}
break
}
// Expect updating some other node's ID to fail.
_, err = curator.UpdateNodeStatus(ctx, &ipb.UpdateNodeStatusRequest{
NodeId: cl.otherNodeID,
Status: &cpb.NodeStatus{
ExternalAddress: "192.0.2.10",
},
})
if err == nil {
t.Errorf("UpdateNodeStatus for other node (%q vs local %q) succeeded, should have failed", cl.localNodeID, cl.otherNodeID)
}
}
// TestClusterHeartbeat exercises curator.Heartbeat and mgmt.GetNodes RPCs by
// verifying proper node health transitions as affected by leadership changes
// and timely arrival of node heartbeat updates.
func TestClusterHeartbeat(t *testing.T) {
cl := fakeLeader(t)
ctx, ctxC := context.WithCancel(context.Background())
defer ctxC()
curator := ipb.NewCuratorClient(cl.localNodeConn)
mgmt := apb.NewManagementClient(cl.mgmtConn)
// expectNode is a helper function that fails the test if the node's health,
// as returned by a mgmt.GetNodes call, does not match the given expectation.
expectNode := func(id string, health apb.Node_Health) {
t.Helper()
res, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
if err != nil {
t.Fatalf("GetNodes failed: %v", err)
}
for {
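// Scan the GetNodes stream until the node with the given ID is found.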
node, err := res.Recv()
if err != nil {
t.Fatalf("Recv failed: %v", err)
}
if id != identity.NodeID(node.Pubkey) {
continue
}
if node.Health != health {
t.Fatalf("Expected node to be %s, got %s.", health, node.Health)
}
return
}
}
// Test case: the node is UP, and the curator leader has just been elected,
// with no recorded node heartbeats. In this case the node's health is
// UNKNOWN, since it wasn't given enough time to submit a single heartbeat.
cl.l.ls.startTs = time.Now()
expectNode(cl.localNodeID, apb.Node_UNKNOWN)
// Let's turn the clock forward a bit. In this case the node is still UP,
// but the current leadership has been assumed more than HeartbeatTimeout
// ago. If no heartbeats arrived during this period, the node is timing out.
cl.l.ls.startTs = cl.l.ls.startTs.Add(-HeartbeatTimeout)
expectNode(cl.localNodeID, apb.Node_HEARTBEAT_TIMEOUT)
// Now we'll simulate the node sending a couple of heartbeats. The node is
// expected to be HEALTHY after the first of them arrives.
stream, err := curator.Heartbeat(ctx)
if err != nil {
t.Fatalf("While initializing heartbeat stream: %v", err)
}
for i := 0; i < 3; i++ {
if err := stream.Send(&ipb.HeartbeatUpdateRequest{}); err != nil {
t.Fatalf("While sending a heartbeat: %v", err)
}
_, err := stream.Recv()
if err != nil {
t.Fatalf("While receiving a heartbeat reply: %v", err)
}
expectNode(cl.localNodeID, apb.Node_HEALTHY)
}
// This case tests timing out from a healthy state. The passage of time is
// simulated by an adjustment of curator leader's timestamp entry
// corresponding to the tested node's ID.
smv, _ := cl.l.ls.heartbeatTimestamps.Load(cl.localNodeID)
lts := smv.(time.Time)
lts = lts.Add(-HeartbeatTimeout)
cl.l.ls.heartbeatTimestamps.Store(cl.localNodeID, lts)
expectNode(cl.localNodeID, apb.Node_HEARTBEAT_TIMEOUT)
// This case verifies that health of non-UP nodes is assessed to be UNKNOWN,
// regardless of leadership tenure, since only UP nodes are capable of
// sending heartbeats.
npub, _, err := ed25519.GenerateKey(rand.Reader)
if err != nil {
t.Fatalf("could not generate node keypair: %v", err)
}
jpub, _, err := ed25519.GenerateKey(rand.Reader)
if err != nil {
t.Fatalf("could not generate join keypair: %v", err)
}
cuk := []byte("fakefakefakefakefakefakefakefake")
node := Node{
clusterUnlockKey: cuk,
pubkey: npub,
jkey: jpub,
state: cpb.NodeState_NODE_STATE_NEW,
}
if err := nodeSave(ctx, cl.l, &node); err != nil {
t.Fatalf("nodeSave failed: %v", err)
}
expectNode(identity.NodeID(npub), apb.Node_UNKNOWN)
}
// TestManagementClusterInfo exercises GetClusterInfo after setting a status.
func TestManagementClusterInfo(t *testing.T) {
cl := fakeLeader(t)
mgmt := apb.NewManagementClient(cl.mgmtConn)
curator := ipb.NewCuratorClient(cl.localNodeConn)
ctx, ctxC := context.WithCancel(context.Background())
defer ctxC()
// Update status to set an external address.
_, err := curator.UpdateNodeStatus(ctx, &ipb.UpdateNodeStatusRequest{
NodeId: cl.localNodeID,
Status: &cpb.NodeStatus{
ExternalAddress: "192.0.2.10",
},
})
if err != nil {
t.Fatalf("UpdateNodeStatus: %v", err)
}
// Retrieve cluster info and make sure it's as expected.
res, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
if err != nil {
t.Fatalf("GetClusterInfo failed: %v", err)
}
nodes := res.ClusterDirectory.Nodes
if want, got := 1, len(nodes); want != got {
t.Fatalf("ClusterDirectory.Nodes contains %d elements, wanted %d", got, want)
}
node := nodes[0]
// Address should match address set from status.
if want, got := 1, len(node.Addresses); want != got {
t.Fatalf("ClusterDirectory.Nodes[0].Addresses has %d elements, wanted %d", got, want)
}
if want, got := "192.0.2.10", node.Addresses[0].Host; want != got {
t.Errorf("Nodes[0].Addresses[0].Host is %q, wanted %q", got, want)
}
// Cluster CA public key should match the one in the node credentials.
ca, err := x509.ParseCertificate(res.CaCertificate)
if err != nil {
t.Fatalf("CaCertificate could not be parsed: %v", err)
}
if want, got := cl.ca.PublicKey.(ed25519.PublicKey), ca.PublicKey.(ed25519.PublicKey); !bytes.Equal(want, got) {
t.Fatalf("CaPublicKey mismatch (wanted %s, got %s)", hex.EncodeToString(want), hex.EncodeToString(got))
}
}
// TestGetNodes exercises management.GetNodes call.
func TestGetNodes(t *testing.T) {
cl := fakeLeader(t)
ctx, ctxC := context.WithCancel(context.Background())
defer ctxC()
mgmt := apb.NewManagementClient(cl.mgmtConn)
// exists returns true if node n exists within the nodes returned by getNodes.
exists := func(n *Node, nodes []*apb.Node) bool {
for _, e := range nodes {
if bytes.Equal(e.Pubkey, n.pubkey) {
return true
}
}
return false
}
// Create additional nodes, to be used in test cases below.
var nodes []*Node
nodes = append(nodes, putNode(t, ctx, cl.l, func(n *Node) { n.state = cpb.NodeState_NODE_STATE_NEW }))
nodes = append(nodes, putNode(t, ctx, cl.l, func(n *Node) { n.state = cpb.NodeState_NODE_STATE_UP }))
nodes = append(nodes, putNode(t, ctx, cl.l, func(n *Node) { n.state = cpb.NodeState_NODE_STATE_UP }))
// Call mgmt.GetNodes without a filter expression. The result should contain
// all existing nodes.
an := getNodes(t, ctx, mgmt, "")
if !exists(nodes[0], an) {
t.Fatalf("a node is missing in management.GetNodes result.")
}
if len(an) < len(nodes) {
t.Fatalf("management.GetNodes didn't return expected node count.")
}
// mgmt.GetNodes, provided with the below expression, should return all nodes
// for which state matches NODE_STATE_UP.
upn := getNodes(t, ctx, mgmt, "node.state == NODE_STATE_UP")
// Hence, the second and third node both should be included in the query
// result.
if !exists(nodes[1], upn) {
t.Fatalf("a node is missing in management.GetNodes result.")
}
if !exists(nodes[2], upn) {
t.Fatalf("a node is missing in management.GetNodes result.")
}
// ...but not the first node.
if exists(nodes[0], upn) {
t.Fatalf("management.GetNodes didn't filter out an undesired node.")
}
// Since node roles are protobuf messages, their presence needs to be tested
// with the "has" macro.
kwn := putNode(t, ctx, cl.l, func(n *Node) {
n.kubernetesWorker = &NodeRoleKubernetesWorker{}
})
in := putNode(t, ctx, cl.l, func(n *Node) {
n.kubernetesWorker = nil
})
wnr := getNodes(t, ctx, mgmt, "has(node.roles.kubernetes_worker)")
if !exists(kwn, wnr) {
t.Fatalf("management.GetNodes didn't return the Kubernetes worker node.")
}
if exists(in, wnr) {
t.Fatalf("management.GetNodes returned a node which isn't a Kubernetes worker.")
}
// Exercise duration-based filtering. Start with setting up node and
// leadership timestamps much like in TestClusterHeartbeat.
tsn := putNode(t, ctx, cl.l, func(n *Node) { n.state = cpb.NodeState_NODE_STATE_UP })
nid := identity.NodeID(tsn.pubkey)
// The last of node tsn's heartbeats was received 5 seconds ago,
cl.l.ls.heartbeatTimestamps.Store(nid, time.Now().Add(-5*time.Second))
// ...while the current leader's tenure started 15 seconds ago.
cl.l.ls.startTs = time.Now().Add(-15 * time.Second)
// Get all nodes that sent their last heartbeat between 4 and 6 seconds ago.
// Node tsn should be among the results.
tsr := getNodes(t, ctx, mgmt, "node.time_since_heartbeat < duration('6s') && node.time_since_heartbeat > duration('4s')")
if !exists(tsn, tsr) {
t.Fatalf("node was filtered out where it shouldn't be")
}
// Now, get all nodes that sent their last heartbeat more than 7 seconds ago.
// In this case, node tsn should be filtered out.
tsr = getNodes(t, ctx, mgmt, "node.time_since_heartbeat > duration('7s')")
if exists(tsn, tsr) {
t.Fatalf("node wasn't filtered out where it should be")
}
}
// TestUpdateNodeRoles exercises management.UpdateNodeRoles by running it
// against some newly created nodes, and verifying the effect by examining
// results delivered by a subsequent call to management.GetNodes.
func TestUpdateNodeRoles(t *testing.T) {
cl := fakeLeader(t)
ctx, ctxC := context.WithCancel(context.Background())
defer ctxC()
// Create the test nodes.
var tn []*Node
tn = append(tn, putNode(t, ctx, cl.l, func(n *Node) { n.state = cpb.NodeState_NODE_STATE_UP }))
tn = append(tn, putNode(t, ctx, cl.l, func(n *Node) { n.state = cpb.NodeState_NODE_STATE_UP }))
tn = append(tn, putNode(t, ctx, cl.l, func(n *Node) { n.state = cpb.NodeState_NODE_STATE_UP }))
// Define the test payloads. Each role is optional and will be updated only
// if it's not nil and its value differs from the current state.
opt := func(v bool) *bool { return &v }
ue := []*apb.UpdateNodeRolesRequest{
&apb.UpdateNodeRolesRequest{
Node: &apb.UpdateNodeRolesRequest_Pubkey{
Pubkey: tn[0].pubkey,
},
KubernetesWorker: opt(false),
ConsensusMember: opt(false),
},
&apb.UpdateNodeRolesRequest{
Node: &apb.UpdateNodeRolesRequest_Pubkey{
Pubkey: tn[1].pubkey,
},
KubernetesWorker: opt(false),
ConsensusMember: opt(true),
},
&apb.UpdateNodeRolesRequest{
Node: &apb.UpdateNodeRolesRequest_Pubkey{
Pubkey: tn[2].pubkey,
},
KubernetesWorker: opt(true),
ConsensusMember: opt(true),
},
&apb.UpdateNodeRolesRequest{
Node: &apb.UpdateNodeRolesRequest_Pubkey{
Pubkey: tn[2].pubkey,
},
KubernetesWorker: nil,
ConsensusMember: nil,
},
}
// The following UpdateNodeRoles requests rely on noClusterMemberManagement
// being set in the running consensus Status (see: consensus/testhelpers.go).
// Normally, adding another ConsensusMember would result in the creation of
// another etcd learner. Since the cluster can accommodate only one learner
// at a time, UpdateNodeRoles would block.
// Run all the request payloads defined in ue, expecting all of them to
// succeed.
mgmt := apb.NewManagementClient(cl.mgmtConn)
for _, e := range ue {
_, err := mgmt.UpdateNodeRoles(ctx, e)
if err != nil {
t.Fatalf("management.UpdateNodeRoles: %v", err)
}
}
// Verify that node roles have indeed been updated.
cn := getNodes(t, ctx, mgmt, "")
for i, e := range ue {
for _, n := range cn {
if bytes.Equal(n.Pubkey, e.GetPubkey()) {
if e.KubernetesWorker != nil {
if *e.KubernetesWorker != (n.Roles.KubernetesWorker != nil) {
t.Fatalf("KubernetesWorker role mismatch (node %d/%d).", i+1, len(ue))
}
}
if e.ConsensusMember != nil {
if *e.ConsensusMember != (n.Roles.ConsensusMember != nil) {
t.Fatalf("ConsensusMember role mismatch (node %d/%d).", i+1, len(ue))
}
}
}
}
}
// Try running requests containing contradictory sets of roles. A cluster
// node currently can't be a KubernetesController if it's not a
// ConsensusMember as well.
uf := []*apb.UpdateNodeRolesRequest{
&apb.UpdateNodeRolesRequest{
Node: &apb.UpdateNodeRolesRequest_Pubkey{
Pubkey: tn[0].pubkey,
},
KubernetesController: opt(true),
ConsensusMember: opt(false),
},
&apb.UpdateNodeRolesRequest{
Node: &apb.UpdateNodeRolesRequest_Pubkey{
Pubkey: tn[0].pubkey,
},
KubernetesController: opt(true),
ConsensusMember: nil,
},
}
for _, e := range uf {
_, err := mgmt.UpdateNodeRoles(ctx, e)
if err == nil {
t.Fatalf("expected an error from management.UpdateNodeRoles, got nil.")
}
}
}
// TestGetCurrentLeader ensures that a leader responds with its own information
// when asked for information about the current leader.
func TestGetCurrentLeader(t *testing.T) {
cl := fakeLeader(t)
ctx, ctxC := context.WithCancel(context.Background())
defer ctxC()
curl := ipb.NewCuratorLocalClient(cl.localNodeConn)
srv, err := curl.GetCurrentLeader(ctx, &ipb.GetCurrentLeaderRequest{})
if err != nil {
t.Fatalf("GetCurrentLeader: %v", err)
}
res, err := srv.Recv()
if err != nil {
t.Fatalf("GetCurrentLeader.Recv: %v", err)
}
if want, got := cl.localNodeID, res.LeaderNodeId; want != got {
t.Errorf("Wanted leader node ID %q, got %q", want, got)
}
if want, got := cl.localNodeID, res.ThisNodeId; want != got {
t.Errorf("Wanted local node ID %q, got %q", want, got)
}
if want, got := int32(common.CuratorServicePort), res.LeaderPort; want != got {
t.Errorf("Wanted leader port %d, got %d", want, got)
}
}
// TestIssueKubernetesWorkerCertificate exercises whether we can retrieve
// Kubernetes Worker certificates from the curator.
func TestIssueKubernetesWorkerCertificate(t *testing.T) {
cl := fakeLeader(t)
ctx, ctxC := context.WithCancel(context.Background())
defer ctxC()
mgmt := apb.NewManagementClient(cl.mgmtConn)
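// Make the local node a Kubernetes worker. Role flags in the request are
// optional booleans, passed as pointers.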
true_ := true
_, err := mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
Node: &apb.UpdateNodeRolesRequest_Id{
Id: cl.localNodeID,
},
KubernetesWorker: &true_,
})
if err != nil {
t.Fatalf("Could not make node into Kubernetes worker: %v", err)
}
// Issue certificates for some random pubkeys.
kpub, _, _ := ed25519.GenerateKey(rand.Reader)
cpub, _, _ := ed25519.GenerateKey(rand.Reader)
curator := ipb.NewCuratorClient(cl.localNodeConn)
res, err := curator.IssueCertificate(ctx, &ipb.IssueCertificateRequest{
Kind: &ipb.IssueCertificateRequest_KubernetesWorker_{
KubernetesWorker: &ipb.IssueCertificateRequest_KubernetesWorker{
KubeletPubkey: kpub,
CsiProvisionerPubkey: cpub,
},
},
})
if err != nil {
t.Fatalf("IssueCertificate: %v", err)
}
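// Unwrap the KubernetesWorker certificate bundle from the response.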
kw := res.Kind.(*ipb.IssueCertificateResponse_KubernetesWorker_).KubernetesWorker
idca, err := x509.ParseCertificate(kw.IdentityCaCertificate)
if err != nil {
t.Fatalf("Could not parse IdCA cert: %v", err)
}
scert, err := x509.ParseCertificate(kw.KubeletServerCertificate)
if err != nil {
t.Fatalf("Could not parse server certificate: %v", err)
}
ccert, err := x509.ParseCertificate(kw.KubeletClientCertificate)
if err != nil {
t.Fatalf("Could not parse client certificate: %v", err)
}
pcert, err := x509.ParseCertificate(kw.CsiProvisionerCertificate)
if err != nil {
t.Fatalf("Could not parse CSI provisiooner certificate: %v", err)
}
if err := scert.CheckSignatureFrom(idca); err != nil {
t.Errorf("Server certificate not signed by IdCA: %v", err)
}
if err := ccert.CheckSignatureFrom(idca); err != nil {
t.Errorf("Client certificate not signed by IdCA: %v", err)
}
if err := pcert.CheckSignatureFrom(idca); err != nil {
t.Errorf("CSI provisioner certificate not signed by IdCA: %v", err)
}
scertPubkey := scert.PublicKey.(ed25519.PublicKey)
if !bytes.Equal(scertPubkey, kpub) {
t.Errorf("Server certificate not emitted for requested key")
}
ccertPubkey := ccert.PublicKey.(ed25519.PublicKey)
if !bytes.Equal(ccertPubkey, kpub) {
t.Errorf("Client certificate not emitted for requested key")
}
pcertPubkey := pcert.PublicKey.(ed25519.PublicKey)
if !bytes.Equal(pcertPubkey, cpub) {
t.Errorf("CSI provisioner certificate not emitted for requested key")
}
// Try issuing again for the same pubkeys. This should work.
_, err = curator.IssueCertificate(ctx, &ipb.IssueCertificateRequest{
Kind: &ipb.IssueCertificateRequest_KubernetesWorker_{
KubernetesWorker: &ipb.IssueCertificateRequest_KubernetesWorker{
KubeletPubkey: kpub,
CsiProvisionerPubkey: cpub,
},
},
})
if err != nil {
t.Errorf("Certificate should have been re-issued: %v", err)
}
// Try issuing again for different pubkeys. These requests should be rejected.
kpub2, _, _ := ed25519.GenerateKey(rand.Reader)
cpub2, _, _ := ed25519.GenerateKey(rand.Reader)
_, err = curator.IssueCertificate(ctx, &ipb.IssueCertificateRequest{
Kind: &ipb.IssueCertificateRequest_KubernetesWorker_{
KubernetesWorker: &ipb.IssueCertificateRequest_KubernetesWorker{
KubeletPubkey: kpub2,
CsiProvisionerPubkey: cpub,
},
},
})
if err == nil {
t.Errorf("Certificate has been issued again for a different pubkey")
}
_, err = curator.IssueCertificate(ctx, &ipb.IssueCertificateRequest{
Kind: &ipb.IssueCertificateRequest_KubernetesWorker_{
KubernetesWorker: &ipb.IssueCertificateRequest_KubernetesWorker{
KubeletPubkey: kpub,
CsiProvisionerPubkey: cpub2,
},
},
})
if err == nil {
t.Errorf("Certificate has been issued again for a different pubkey")
}
}
// TestUpdateNodeClusterNetworking exercises the validation and mutation
// functionality of the UpdateNodeClusterNetworking implementation in the
// curator leader.
func TestUpdateNodeClusterNetworking(t *testing.T) {
cl := fakeLeader(t)
ctx, ctxC := context.WithCancel(context.Background())
defer ctxC()
// Make another fake node out-of-band. We'll be using it at the end to make sure
// that we can't add another node with the same pubkey. We have to do it as early
// as possible to bypass caching by the leader.
//
// TODO(q3k): implement adding more nodes in harness, and just add another node
// normally. This will actually exercise the cache better.
putNode(t, ctx, cl.l, func(n *Node) { n.wireguardKey = "+nb5grgIKQEbHm5JrUZovPQ9Bv04jR2TtY6sgS0dGG4=" })
cur := ipb.NewCuratorClient(cl.localNodeConn)
// Update the node's external address, as it's used by the test cases below.
_, err := cur.UpdateNodeStatus(ctx, &ipb.UpdateNodeStatusRequest{
NodeId: cl.localNodeID,
Status: &cpb.NodeStatus{
ExternalAddress: "203.0.113.43",
},
})
if err != nil {
t.Errorf("Failed to update node external address: %v", err)
}
// Run a few table-driven test cases, ending with an update that goes through.
for i, te := range []struct {
req *ipb.UpdateNodeClusterNetworkingRequest
wantErr string
}{
{&ipb.UpdateNodeClusterNetworkingRequest{}, "clusternet must be set"},
{&ipb.UpdateNodeClusterNetworkingRequest{
Clusternet: &cpb.NodeClusterNetworking{},
}, "wireguard_pubkey must be set"},
{&ipb.UpdateNodeClusterNetworkingRequest{
Clusternet: &cpb.NodeClusterNetworking{
WireguardPubkey: "beep boop i'm a key",
},
}, "must be a valid wireguard"},
{&ipb.UpdateNodeClusterNetworkingRequest{
Clusternet: &cpb.NodeClusterNetworking{
WireguardPubkey: "w9RbFvF14pytyraq16IEuMov032XXrPBOQUr59kcxHg=",
},
}, ""},
{&ipb.UpdateNodeClusterNetworkingRequest{
Clusternet: &cpb.NodeClusterNetworking{
WireguardPubkey: "w9RbFvF14pytyraq16IEuMov032XXrPBOQUr59kcxHg=",
Prefixes: []*cpb.NodeClusterNetworking_Prefix{
{Cidr: ""},
},
},
}, "no '/'"},
{&ipb.UpdateNodeClusterNetworkingRequest{
Clusternet: &cpb.NodeClusterNetworking{
WireguardPubkey: "w9RbFvF14pytyraq16IEuMov032XXrPBOQUr59kcxHg=",
Prefixes: []*cpb.NodeClusterNetworking_Prefix{
{Cidr: "10.0.0.128/16"},
},
},
}, "must be in canonical format"},
{&ipb.UpdateNodeClusterNetworkingRequest{
Clusternet: &cpb.NodeClusterNetworking{
WireguardPubkey: "w9RbFvF14pytyraq16IEuMov032XXrPBOQUr59kcxHg=",
Prefixes: []*cpb.NodeClusterNetworking_Prefix{
// Prefix outside of cluster net should not be allowed.
{Cidr: "10.0.0.0/15"},
},
},
}, "must be fully contained"},
{&ipb.UpdateNodeClusterNetworkingRequest{
Clusternet: &cpb.NodeClusterNetworking{
WireguardPubkey: "w9RbFvF14pytyraq16IEuMov032XXrPBOQUr59kcxHg=",
Prefixes: []*cpb.NodeClusterNetworking_Prefix{
// Random /32 should not be allowed.
{Cidr: "8.8.8.8/32"},
},
},
}, "must be fully contained"},
{&ipb.UpdateNodeClusterNetworkingRequest{
Clusternet: &cpb.NodeClusterNetworking{
WireguardPubkey: "GaNXuc/yl8IaXduX6PQ+ZxIG4HtBACubHrRI7rqfA20=",
Prefixes: []*cpb.NodeClusterNetworking_Prefix{
{Cidr: "10.0.0.0/24"},
// Yes, this is allowed.
{Cidr: "10.0.0.0/16"},
{Cidr: "10.0.12.23/32"},
// External address should be allowed.
{Cidr: "203.0.113.43/32"},
},
},
}, ""},
} {
_, err := cur.UpdateNodeClusterNetworking(ctx, te.req)
if te.wantErr != "" {
if !strings.Contains(err.Error(), te.wantErr) {
t.Errorf("case %d: error should've contained %q, got %q", i, te.wantErr, err.Error())
}
} else {
if err != nil {
t.Errorf("case %d: should've passed, got: %v", i, err)
}
}
}
// Make sure the last request actually ended up in a node mutation.
w, err := cur.Watch(ctx, &ipb.WatchRequest{
Kind: &ipb.WatchRequest_NodeInCluster_{
NodeInCluster: &ipb.WatchRequest_NodeInCluster{
NodeId: cl.localNodeID,
},
},
})
if err != nil {
t.Fatal(err)
}
ev, err := w.Recv()
if err != nil {
t.Fatal(err)
}
cn := ev.Nodes[0].Clusternet
if want, got := "GaNXuc/yl8IaXduX6PQ+ZxIG4HtBACubHrRI7rqfA20=", cn.WireguardPubkey; want != got {
t.Errorf("Wrong wireguard key: wanted %q, got %q", want, got)
}
if want, got := 4, len(cn.Prefixes); want != got {
t.Errorf("Wanted %d prefixes, got %d", want, got)
} else {
for i, want := range []string{"10.0.0.0/24", "10.0.0.0/16", "10.0.12.23/32", "203.0.113.43/32"} {
if got := cn.Prefixes[i].Cidr; want != got {
t.Errorf("Prefix %d should be %q, got %q", i, want, got)
}
}
}
// Make sure adding another node with the same pubkey fails.
_, err = cur.UpdateNodeClusterNetworking(ctx, &ipb.UpdateNodeClusterNetworkingRequest{
Clusternet: &cpb.NodeClusterNetworking{
WireguardPubkey: "+nb5grgIKQEbHm5JrUZovPQ9Bv04jR2TtY6sgS0dGG4=",
}})
if err == nil || !strings.Contains(err.Error(), "public key alread used by another node") {
t.Errorf("Adding same pubkey to different node should have failed, got %v", err)
}
}