blob: 34dd62b6d68a40e834708ad027430e1caff5cbb8 [file] [log] [blame]
package curator
import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
cpb "source.monogon.dev/metropolis/node/core/curator/proto/api"
"source.monogon.dev/metropolis/pkg/event/etcd"
)
// leaderCurator implements the Curator gRPC API (cpb.Curator) as a curator
// leader.
type leaderCurator struct {
leadership
}
// Watch returns a stream of updates concerning some part of the cluster
// managed by the curator.
//
// See metropolis.node.core.curator.proto.api.Curator for more information.
func (l *leaderCurator) Watch(req *cpb.WatchRequest, srv cpb.Curator_WatchServer) error {
nic, ok := req.Kind.(*cpb.WatchRequest_NodeInCluster_)
if !ok {
return status.Error(codes.Unimplemented, "unsupported watch kind")
}
nodeID := nic.NodeInCluster.NodeId
// Constructing arbitrary etcd path: this is okay, as we only have node objects
// underneath the NodeEtcdPrefix. Worst case an attacker can do is request a node
// that doesn't exist, and that will just hang . All access is privileged, so
// there's also no need to filter anything.
// TODO(q3k): formalize and strongly type etcd paths for cluster state?
// Probably worth waiting for type parameters before attempting to do that.
nodePath := nodeEtcdPath(nodeID)
value := etcd.NewValue(l.etcd, nodePath, func(data []byte) (interface{}, error) {
return nodeUnmarshal(data)
})
w := value.Watch()
defer w.Close()
for {
v, err := w.Get(srv.Context())
if err != nil {
if rpcErr, ok := rpcError(err); ok {
return rpcErr
}
}
node := v.(*Node)
ev := &cpb.WatchEvent{
Nodes: []*cpb.Node{
{
Id: node.ID(),
Roles: node.proto().Roles,
},
},
}
if err := srv.Send(ev); err != nil {
return err
}
}
}