blob: fd8602d967c645542694a2e11ae7fc8be0ae874a [file] [log] [blame]
Serge Bazanski99f47742021-08-04 20:21:42 +02001package curator
2
3import (
Serge Bazanski2893e982021-09-09 13:06:16 +02004 "context"
Serge Bazanski2893e982021-09-09 13:06:16 +02005
6 "go.etcd.io/etcd/clientv3"
Serge Bazanski99f47742021-08-04 20:21:42 +02007 "google.golang.org/grpc/codes"
8 "google.golang.org/grpc/status"
Serge Bazanski2893e982021-09-09 13:06:16 +02009 "google.golang.org/protobuf/proto"
Serge Bazanski99f47742021-08-04 20:21:42 +020010
11 cpb "source.monogon.dev/metropolis/node/core/curator/proto/api"
Serge Bazanski2893e982021-09-09 13:06:16 +020012 "source.monogon.dev/metropolis/node/core/identity"
13 "source.monogon.dev/metropolis/node/core/rpc"
Serge Bazanski99f47742021-08-04 20:21:42 +020014 "source.monogon.dev/metropolis/pkg/event/etcd"
15)
16
17// leaderCurator implements the Curator gRPC API (cpb.Curator) as a curator
18// leader.
19type leaderCurator struct {
Serge Bazanski3be48322021-10-05 17:24:26 +020020 *leadership
Serge Bazanski99f47742021-08-04 20:21:42 +020021}
22
23// Watch returns a stream of updates concerning some part of the cluster
24// managed by the curator.
25//
26// See metropolis.node.core.curator.proto.api.Curator for more information.
27func (l *leaderCurator) Watch(req *cpb.WatchRequest, srv cpb.Curator_WatchServer) error {
28 nic, ok := req.Kind.(*cpb.WatchRequest_NodeInCluster_)
29 if !ok {
30 return status.Error(codes.Unimplemented, "unsupported watch kind")
31 }
32 nodeID := nic.NodeInCluster.NodeId
33 // Constructing arbitrary etcd path: this is okay, as we only have node objects
34 // underneath the NodeEtcdPrefix. Worst case an attacker can do is request a node
35 // that doesn't exist, and that will just hang . All access is privileged, so
36 // there's also no need to filter anything.
37 // TODO(q3k): formalize and strongly type etcd paths for cluster state?
38 // Probably worth waiting for type parameters before attempting to do that.
Serge Bazanski080f7ff2021-09-09 13:01:00 +020039 nodePath, err := nodeEtcdPrefix.Key(nodeID)
40 if err != nil {
41 return status.Errorf(codes.InvalidArgument, "invalid node name: %v", err)
42 }
Serge Bazanski99f47742021-08-04 20:21:42 +020043
44 value := etcd.NewValue(l.etcd, nodePath, func(data []byte) (interface{}, error) {
45 return nodeUnmarshal(data)
46 })
47 w := value.Watch()
48 defer w.Close()
49
50 for {
51 v, err := w.Get(srv.Context())
52 if err != nil {
53 if rpcErr, ok := rpcError(err); ok {
54 return rpcErr
55 }
Serge Bazanskibc7614e2021-09-09 13:07:09 +020056 // TODO(q3k): log err
57 return status.Error(codes.Unavailable, "internal error")
Serge Bazanski99f47742021-08-04 20:21:42 +020058 }
59 node := v.(*Node)
60 ev := &cpb.WatchEvent{
61 Nodes: []*cpb.Node{
62 {
Serge Bazanski2893e982021-09-09 13:06:16 +020063 Id: node.ID(),
64 Roles: node.proto().Roles,
65 Status: node.status,
Serge Bazanski99f47742021-08-04 20:21:42 +020066 },
67 },
68 }
69 if err := srv.Send(ev); err != nil {
70 return err
71 }
72 }
73}
Serge Bazanski2893e982021-09-09 13:06:16 +020074
75// UpdateNodeStatus is called by nodes in the cluster to report their own
76// status. This status is recorded by the curator and can be retrieed via
77// Watch.
78func (l *leaderCurator) UpdateNodeStatus(ctx context.Context, req *cpb.UpdateNodeStatusRequest) (*cpb.UpdateNodeStatusResponse, error) {
79 // Ensure that the given node_id matches the calling node. We currently
80 // only allow for direct self-reporting of status by nodes.
81 pi := rpc.GetPeerInfo(ctx)
82 if pi == nil || pi.Node == nil {
83 return nil, status.Error(codes.PermissionDenied, "only nodes can update node status")
84 }
85 id := identity.NodeID(pi.Node.PublicKey)
86 if id != req.NodeId {
87 return nil, status.Errorf(codes.PermissionDenied, "node %q cannot update the status of node %q", id, req.NodeId)
88 }
89
90 // Verify sent status. Currently we assume the entire status must be set at
91 // once, and cannot be unset.
92 if req.Status == nil || req.Status.ExternalAddress == "" {
93 return nil, status.Errorf(codes.InvalidArgument, "Status and Status.ExternalAddress must be set")
94 }
95
96 // As we're performing a node update with two etcd transactions below (one
97 // to retrieve, one to save and upate node), take a local lock to ensure
98 // that we don't have a race between either two UpdateNodeStatus calls or
99 // an UpdateNodeStatus call and some other mutation to the node store.
100 l.muNodes.Lock()
101 defer l.muNodes.Unlock()
102
103 // Retrieve node ...
104 key, err := nodeEtcdPrefix.Key(id)
105 if err != nil {
106 return nil, status.Errorf(codes.InvalidArgument, "invalid node id")
107 }
108 res, err := l.txnAsLeader(ctx, clientv3.OpGet(key))
109 if err != nil {
110 if rpcErr, ok := rpcError(err); ok {
111 return nil, rpcErr
112 }
113 return nil, status.Errorf(codes.Unavailable, "could not retrieve node: %v", err)
114 }
115 kvs := res.Responses[0].GetResponseRange().Kvs
116 if len(kvs) < 1 {
117 return nil, status.Error(codes.NotFound, "no such node")
118 }
119 node, err := nodeUnmarshal(kvs[0].Value)
120 if err != nil {
121 return nil, status.Errorf(codes.Unavailable, "failed to unmarshal node: %v", err)
122 }
123 // ... update its' status ...
124 node.status = req.Status
125 // ... and save it to etcd.
126 bytes, err := proto.Marshal(node.proto())
127 if err != nil {
128 return nil, status.Errorf(codes.Unavailable, "failed to marshal node: %v", err)
129 }
130 _, err = l.txnAsLeader(ctx, clientv3.OpPut(key, string(bytes)))
131 if err != nil {
132 if rpcErr, ok := rpcError(err); ok {
133 return nil, rpcErr
134 }
135 return nil, status.Errorf(codes.Unavailable, "could not update node: %v", err)
136 }
137
138 return &cpb.UpdateNodeStatusResponse{}, nil
139}