| Serge Bazanski | 99f4774 | 2021-08-04 20:21:42 +0200 | [diff] [blame] | 1 | package curator |
| 2 | |
| 3 | import ( |
| Serge Bazanski | 2893e98 | 2021-09-09 13:06:16 +0200 | [diff] [blame] | 4 | "context" |
| Serge Bazanski | 2893e98 | 2021-09-09 13:06:16 +0200 | [diff] [blame] | 5 | |
| 6 | "go.etcd.io/etcd/clientv3" |
| Serge Bazanski | 99f4774 | 2021-08-04 20:21:42 +0200 | [diff] [blame] | 7 | "google.golang.org/grpc/codes" |
| 8 | "google.golang.org/grpc/status" |
| Serge Bazanski | 2893e98 | 2021-09-09 13:06:16 +0200 | [diff] [blame] | 9 | "google.golang.org/protobuf/proto" |
| Serge Bazanski | 99f4774 | 2021-08-04 20:21:42 +0200 | [diff] [blame] | 10 | |
| 11 | cpb "source.monogon.dev/metropolis/node/core/curator/proto/api" |
| Serge Bazanski | 2893e98 | 2021-09-09 13:06:16 +0200 | [diff] [blame] | 12 | "source.monogon.dev/metropolis/node/core/identity" |
| 13 | "source.monogon.dev/metropolis/node/core/rpc" |
| Serge Bazanski | 99f4774 | 2021-08-04 20:21:42 +0200 | [diff] [blame] | 14 | "source.monogon.dev/metropolis/pkg/event/etcd" |
| 15 | ) |
| 16 | |
// leaderCurator implements the Curator gRPC API (cpb.Curator) as a curator
// leader.
//
// All state needed to act as leader (etcd client, node mutex, leader
// transaction helpers) comes from the embedded *leadership.
type leaderCurator struct {
	*leadership
}
| 22 | |
| 23 | // Watch returns a stream of updates concerning some part of the cluster |
| 24 | // managed by the curator. |
| 25 | // |
| 26 | // See metropolis.node.core.curator.proto.api.Curator for more information. |
| 27 | func (l *leaderCurator) Watch(req *cpb.WatchRequest, srv cpb.Curator_WatchServer) error { |
| 28 | nic, ok := req.Kind.(*cpb.WatchRequest_NodeInCluster_) |
| 29 | if !ok { |
| 30 | return status.Error(codes.Unimplemented, "unsupported watch kind") |
| 31 | } |
| 32 | nodeID := nic.NodeInCluster.NodeId |
| 33 | // Constructing arbitrary etcd path: this is okay, as we only have node objects |
| 34 | // underneath the NodeEtcdPrefix. Worst case an attacker can do is request a node |
| 35 | // that doesn't exist, and that will just hang . All access is privileged, so |
| 36 | // there's also no need to filter anything. |
| 37 | // TODO(q3k): formalize and strongly type etcd paths for cluster state? |
| 38 | // Probably worth waiting for type parameters before attempting to do that. |
| Serge Bazanski | 080f7ff | 2021-09-09 13:01:00 +0200 | [diff] [blame] | 39 | nodePath, err := nodeEtcdPrefix.Key(nodeID) |
| 40 | if err != nil { |
| 41 | return status.Errorf(codes.InvalidArgument, "invalid node name: %v", err) |
| 42 | } |
| Serge Bazanski | 99f4774 | 2021-08-04 20:21:42 +0200 | [diff] [blame] | 43 | |
| 44 | value := etcd.NewValue(l.etcd, nodePath, func(data []byte) (interface{}, error) { |
| 45 | return nodeUnmarshal(data) |
| 46 | }) |
| 47 | w := value.Watch() |
| 48 | defer w.Close() |
| 49 | |
| 50 | for { |
| 51 | v, err := w.Get(srv.Context()) |
| 52 | if err != nil { |
| 53 | if rpcErr, ok := rpcError(err); ok { |
| 54 | return rpcErr |
| 55 | } |
| Serge Bazanski | bc7614e | 2021-09-09 13:07:09 +0200 | [diff] [blame] | 56 | // TODO(q3k): log err |
| 57 | return status.Error(codes.Unavailable, "internal error") |
| Serge Bazanski | 99f4774 | 2021-08-04 20:21:42 +0200 | [diff] [blame] | 58 | } |
| 59 | node := v.(*Node) |
| 60 | ev := &cpb.WatchEvent{ |
| 61 | Nodes: []*cpb.Node{ |
| 62 | { |
| Serge Bazanski | 2893e98 | 2021-09-09 13:06:16 +0200 | [diff] [blame] | 63 | Id: node.ID(), |
| 64 | Roles: node.proto().Roles, |
| 65 | Status: node.status, |
| Serge Bazanski | 99f4774 | 2021-08-04 20:21:42 +0200 | [diff] [blame] | 66 | }, |
| 67 | }, |
| 68 | } |
| 69 | if err := srv.Send(ev); err != nil { |
| 70 | return err |
| 71 | } |
| 72 | } |
| 73 | } |
| Serge Bazanski | 2893e98 | 2021-09-09 13:06:16 +0200 | [diff] [blame] | 74 | |
| 75 | // UpdateNodeStatus is called by nodes in the cluster to report their own |
| 76 | // status. This status is recorded by the curator and can be retrieed via |
| 77 | // Watch. |
| 78 | func (l *leaderCurator) UpdateNodeStatus(ctx context.Context, req *cpb.UpdateNodeStatusRequest) (*cpb.UpdateNodeStatusResponse, error) { |
| 79 | // Ensure that the given node_id matches the calling node. We currently |
| 80 | // only allow for direct self-reporting of status by nodes. |
| 81 | pi := rpc.GetPeerInfo(ctx) |
| 82 | if pi == nil || pi.Node == nil { |
| 83 | return nil, status.Error(codes.PermissionDenied, "only nodes can update node status") |
| 84 | } |
| 85 | id := identity.NodeID(pi.Node.PublicKey) |
| 86 | if id != req.NodeId { |
| 87 | return nil, status.Errorf(codes.PermissionDenied, "node %q cannot update the status of node %q", id, req.NodeId) |
| 88 | } |
| 89 | |
| 90 | // Verify sent status. Currently we assume the entire status must be set at |
| 91 | // once, and cannot be unset. |
| 92 | if req.Status == nil || req.Status.ExternalAddress == "" { |
| 93 | return nil, status.Errorf(codes.InvalidArgument, "Status and Status.ExternalAddress must be set") |
| 94 | } |
| 95 | |
| 96 | // As we're performing a node update with two etcd transactions below (one |
| 97 | // to retrieve, one to save and upate node), take a local lock to ensure |
| 98 | // that we don't have a race between either two UpdateNodeStatus calls or |
| 99 | // an UpdateNodeStatus call and some other mutation to the node store. |
| 100 | l.muNodes.Lock() |
| 101 | defer l.muNodes.Unlock() |
| 102 | |
| 103 | // Retrieve node ... |
| 104 | key, err := nodeEtcdPrefix.Key(id) |
| 105 | if err != nil { |
| 106 | return nil, status.Errorf(codes.InvalidArgument, "invalid node id") |
| 107 | } |
| 108 | res, err := l.txnAsLeader(ctx, clientv3.OpGet(key)) |
| 109 | if err != nil { |
| 110 | if rpcErr, ok := rpcError(err); ok { |
| 111 | return nil, rpcErr |
| 112 | } |
| 113 | return nil, status.Errorf(codes.Unavailable, "could not retrieve node: %v", err) |
| 114 | } |
| 115 | kvs := res.Responses[0].GetResponseRange().Kvs |
| 116 | if len(kvs) < 1 { |
| 117 | return nil, status.Error(codes.NotFound, "no such node") |
| 118 | } |
| 119 | node, err := nodeUnmarshal(kvs[0].Value) |
| 120 | if err != nil { |
| 121 | return nil, status.Errorf(codes.Unavailable, "failed to unmarshal node: %v", err) |
| 122 | } |
| 123 | // ... update its' status ... |
| 124 | node.status = req.Status |
| 125 | // ... and save it to etcd. |
| 126 | bytes, err := proto.Marshal(node.proto()) |
| 127 | if err != nil { |
| 128 | return nil, status.Errorf(codes.Unavailable, "failed to marshal node: %v", err) |
| 129 | } |
| 130 | _, err = l.txnAsLeader(ctx, clientv3.OpPut(key, string(bytes))) |
| 131 | if err != nil { |
| 132 | if rpcErr, ok := rpcError(err); ok { |
| 133 | return nil, rpcErr |
| 134 | } |
| 135 | return nil, status.Errorf(codes.Unavailable, "could not update node: %v", err) |
| 136 | } |
| 137 | |
| 138 | return &cpb.UpdateNodeStatusResponse{}, nil |
| 139 | } |