package curator
import (
"context"
"sync"
"go.etcd.io/etcd/clientv3"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
cpb "source.monogon.dev/metropolis/node/core/curator/proto/api"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/rpc"
"source.monogon.dev/metropolis/pkg/event/etcd"
)
// leaderCurator implements the Curator gRPC API (cpb.Curator) as a curator
// leader.
type leaderCurator struct {
leadership
// muNodes guards any changes to nodes, and prevents race conditions where the
// curator performs a read-modify-write operation to node data. The curator's
// leadership ensure no two curators run simultaneously, and this lock ensures
// no two parallel curator operations race eachother.
//
// This lock has to be taken any time such an RMW operation takes place when not
// additionally guarded using etcd transactions.
muNodes sync.Mutex
}
// Watch returns a stream of updates concerning some part of the cluster
// managed by the curator.
//
// See metropolis.node.core.curator.proto.api.Curator for more information.
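//
// A rough client-side sketch (the connected cpb.CuratorClient cl and the node
// ID nodeID are assumptions for illustration, not part of this package):
//
//   srv, err := cl.Watch(ctx, &cpb.WatchRequest{
//       Kind: &cpb.WatchRequest_NodeInCluster_{
//           NodeInCluster: &cpb.WatchRequest_NodeInCluster{NodeId: nodeID},
//       },
//   })
//   if err != nil {
//       // handle RPC error
//   }
//   for {
//       ev, err := srv.Recv()
//       if err != nil {
//           break // stream closed or failed
//       }
//       // ev.Nodes contains the latest known state of the watched node.
//   }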
func (l *leaderCurator) Watch(req *cpb.WatchRequest, srv cpb.Curator_WatchServer) error {
nic, ok := req.Kind.(*cpb.WatchRequest_NodeInCluster_)
if !ok {
return status.Error(codes.Unimplemented, "unsupported watch kind")
}
nodeID := nic.NodeInCluster.NodeId
// Constructing an arbitrary etcd path: this is okay, as we only have node
// objects underneath nodeEtcdPrefix. The worst an attacker can do is request a
// node that doesn't exist, and that will just hang. All access is privileged, so
// there's also no need to filter anything.
// TODO(q3k): formalize and strongly type etcd paths for cluster state?
// Probably worth waiting for type parameters before attempting to do that.
nodePath, err := nodeEtcdPrefix.Key(nodeID)
if err != nil {
return status.Errorf(codes.InvalidArgument, "invalid node name: %v", err)
}
value := etcd.NewValue(l.etcd, nodePath, func(data []byte) (interface{}, error) {
return nodeUnmarshal(data)
})
w := value.Watch()
defer w.Close()
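// Stream updates to the client for as long as the watcher yields new node
// data; Get returns an error once the RPC context is canceled, which ends the
// stream.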
for {
v, err := w.Get(srv.Context())
if err != nil {
if rpcErr, ok := rpcError(err); ok {
return rpcErr
}
// TODO(q3k): log err
return status.Error(codes.Unavailable, "internal error")
}
node := v.(*Node)
ev := &cpb.WatchEvent{
Nodes: []*cpb.Node{
{
Id: node.ID(),
Roles: node.proto().Roles,
Status: node.status,
},
},
}
if err := srv.Send(ev); err != nil {
return err
}
}
}
// UpdateNodeStatus is called by nodes in the cluster to report their own
// status. This status is recorded by the curator and can be retrieved via
// Watch.
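//
// A minimal caller sketch (cl as a cpb.CuratorClient connected with this
// node's own authenticated credentials, and nodeID as this node's ID, are
// assumptions for illustration):
//
//   req := &cpb.UpdateNodeStatusRequest{NodeId: nodeID}
//   // Populate req.Status, including its ExternalAddress, with the node's
//   // current status before sending.
//   _, err := cl.UpdateNodeStatus(ctx, req)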
func (l *leaderCurator) UpdateNodeStatus(ctx context.Context, req *cpb.UpdateNodeStatusRequest) (*cpb.UpdateNodeStatusResponse, error) {
// Ensure that the given node_id matches the calling node. We currently
// only allow for direct self-reporting of status by nodes.
pi := rpc.GetPeerInfo(ctx)
if pi == nil || pi.Node == nil {
return nil, status.Error(codes.PermissionDenied, "only nodes can update node status")
}
id := identity.NodeID(pi.Node.PublicKey)
if id != req.NodeId {
return nil, status.Errorf(codes.PermissionDenied, "node %q cannot update the status of node %q", id, req.NodeId)
}
// Verify sent status. Currently we assume the entire status must be set at
// once, and cannot be unset.
if req.Status == nil || req.Status.ExternalAddress == "" {
return nil, status.Errorf(codes.InvalidArgument, "Status and Status.ExternalAddress must be set")
}
// As we're performing a node update with two etcd transactions below (one
// to retrieve, one to save and update the node), take a local lock to ensure
// that we don't have a race between either two UpdateNodeStatus calls or
// an UpdateNodeStatus call and some other mutation to the node store.
l.muNodes.Lock()
defer l.muNodes.Unlock()
// Retrieve node ...
key, err := nodeEtcdPrefix.Key(id)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "invalid node id")
}
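// Read the current node data via a leader-scoped transaction, so the get is
// only performed if this curator still holds leadership.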
res, err := l.txnAsLeader(ctx, clientv3.OpGet(key))
if err != nil {
if rpcErr, ok := rpcError(err); ok {
return nil, rpcErr
}
return nil, status.Errorf(codes.Unavailable, "could not retrieve node: %v", err)
}
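// The transaction contained a single Get, so exactly one range response is
// expected; an empty result means the node does not exist.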
kvs := res.Responses[0].GetResponseRange().Kvs
if len(kvs) < 1 {
return nil, status.Error(codes.NotFound, "no such node")
}
node, err := nodeUnmarshal(kvs[0].Value)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "failed to unmarshal node: %v", err)
}
// ... update its status ...
node.status = req.Status
// ... and save it to etcd.
bytes, err := proto.Marshal(node.proto())
if err != nil {
return nil, status.Errorf(codes.Unavailable, "failed to marshal node: %v", err)
}
_, err = l.txnAsLeader(ctx, clientv3.OpPut(key, string(bytes)))
if err != nil {
if rpcErr, ok := rpcError(err); ok {
return nil, rpcErr
}
return nil, status.Errorf(codes.Unavailable, "could not update node: %v", err)
}
return &cpb.UpdateNodeStatusResponse{}, nil
}