blob: a19fa14ade94cb48375146574ff862d196491a7d [file] [log] [blame]
package consensus
import (
"context"
"crypto/ed25519"
"crypto/x509"
"fmt"
"net"
"strconv"
clientv3 "go.etcd.io/etcd/client/v3"
"source.monogon.dev/metropolis/node"
"source.monogon.dev/metropolis/node/core/consensus/client"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/pkg/event"
"source.monogon.dev/metropolis/pkg/pki"
)
// ServiceHandle is implemented by Service and should be the type expected by
// other code which relies on a Consensus instance. I.e., it's the downstream
// API for a Consensus Service.
type ServiceHandle interface {
// Watch returns an Event Value compatible Watcher for accessing the State of
// the consensus Service in a safe manner.
Watch() Watcher
}
// Watch hands out a Watcher wrapping a fresh watch on the Service's status
// value. It is the Service side of ServiceHandle.Watch; use the returned
// Watcher's Get or GetRunning to retrieve the current *Status.
func (s *Service) Watch() Watcher {
	return Watcher{Watcher: s.value.Watch()}
}
// Watcher wraps an event.Watcher and adds the typed Get and GetRunning
// accessors, which return the watched value as a *Status.
type Watcher struct {
event.Watcher
}
// Get retrieves a value from the embedded event.Watcher, forwarding ctx and
// opts unchanged, and returns it as a *Status. An error from the underlying
// watcher is returned as-is together with a nil Status.
//
// The conversion is an unchecked type assertion, so this panics should the
// underlying watcher ever yield something other than a *Status.
func (w *Watcher) Get(ctx context.Context, opts ...event.GetOption) (*Status, error) {
	value, err := w.Watcher.Get(ctx, opts...)
	if err != nil {
		return nil, err
	}
	status := value.(*Status)
	return status, nil
}
// GetRunning calls Get in a loop until it yields a Status for which Running
// reports true, and returns that Status. The first error returned by Get ends
// the loop and is passed back to the caller with a nil Status.
//
// NOTE(review): presumably Get blocks until a new value is available or ctx
// is done, which is what keeps this loop from spinning - confirm against the
// event package.
func (w *Watcher) GetRunning(ctx context.Context) (*Status, error) {
	for {
		status, err := w.Get(ctx)
		switch {
		case err != nil:
			return nil, err
		case status.Running():
			return status, nil
		}
	}
}
// Status of the consensus service. It represents either a running consensus
// service to which a client can connect and on which management can be
// performed, or a stopped service. Use Running to tell the two apart.
type Status struct {
// localPeerURL and localMemberID are the expected public URL and etcd member ID
// of the etcd server wrapped by this consensus instance. If set, a sub-runnable
// of the consensus will ensure that the given memberID always has localPeerURL
// set as its peer URL.
//
// These will not be set when the Status has been generated by a
// testServiceHandle.
localPeerURL string
localMemberID uint64
// cl is the root etcd client to the underlying cluster. The namespaced clients
// handed out by this Status and the value returned by ClusterClient are all
// derived from it.
cl *clientv3.Client
// ca is the PKI CA used to authenticate etcd members. AddNode uses it as the
// issuer of member certificates and as the source of the initial CRL.
ca *pki.Certificate
// stopped is set to true if the underlying service has been stopped or hasn't
// yet been started. Running returns its negation.
stopped bool
// noClusterMemberManagement disables etcd cluster member management in
// UpdateNodeRoles. This is currently necessary in order to test the call,
// due to limitations of the test harness.
//
// Within this file it is consulted by AddNode, which skips registering the
// new member in etcd when it is set.
noClusterMemberManagement bool
}
// Running reports whether this Status describes a running consensus service,
// i.e. one that can be connected to or managed. It is only a hint: the server
// might have stopped since the Status was generated, so calls made on the
// strength of a true result are still not guaranteed to succeed.
func (s *Status) Running() bool {
	stopped := s.stopped
	return !stopped
}
// pkiClient returns a namespaced etcd client derived from the root client for
// the "etcd-pki" namespace. AddNode uses it for certificate and CRL
// operations.
func (s *Status) pkiClient() (client.Namespaced, error) {
	const namespace = "etcd-pki"
	return clientFor(s.cl, "namespaced", namespace)
}
// CuratorClient returns a namespaced etcd client derived from the root client
// for the "curator" namespace, intended for use by the Curator.
func (s *Status) CuratorClient() (client.Namespaced, error) {
	const namespace = "curator"
	return clientFor(s.cl, "namespaced", namespace)
}
// KubernetesClient returns a namespaced etcd client derived from the root
// client for the "kubernetes" namespace, intended for use by Kubernetes.
func (s *Status) KubernetesClient() (client.Namespaced, error) {
	const namespace = "kubernetes"
	return clientFor(s.cl, "namespaced", namespace)
}
// ClusterClient exposes the root etcd client through the clientv3.Cluster
// management interface, for downstream clients that need to perform
// maintenance operations on the etcd cluster (eg. list/modify members,
// promote learners, ...).
func (s *Status) ClusterClient() clientv3.Cluster {
	var cluster clientv3.Cluster = s.cl
	return cluster
}
// AddNode creates a new consensus member corresponding to a given Ed25519 node
// public key if one does not yet exist. The member will at first be marked as a
// Learner, ensuring it does not take part in quorum until it has finished
// catching up to the state of the etcd store. As it does, the autopromoter will
// turn it into a 'full' node and it will start taking part in the quorum and be
// able to perform all etcd operations.
//
// opts are applied in order, later options overriding earlier ones; nil
// entries are ignored. See AddNodeOption.
//
// The returned JoinCluster carries the CA certificate, the member certificate
// issued for pk, the name and first peer URL of every existing non-learner
// member, and the initial CRL. An error is returned if any PKI or etcd
// operation fails; the member is only registered in etcd after all of that
// data has been gathered successfully.
//
// A member is considered to already exist if etcd lists a member named after
// the node ID, or an unstarted (nameless) member registered under the same
// peer URL. The latter keeps repeated calls working for a node which has been
// added but has not started its etcd instance yet.
func (s *Status) AddNode(ctx context.Context, pk ed25519.PublicKey, opts ...*AddNodeOption) (*JoinCluster, error) {
	clPKI, err := s.pkiClient()
	if err != nil {
		return nil, err
	}

	nodeID := identity.NodeID(pk)
	var extraNames []string
	name := nodeID
	port := int(node.ConsensusPort)
	for _, opt := range opts {
		// Tolerate nil options rather than panicking on dereference.
		if opt == nil {
			continue
		}
		if opt.externalAddress != "" {
			name = opt.externalAddress
			extraNames = append(extraNames, name)
		}
		if opt.externalPort != 0 {
			port = opt.externalPort
		}
	}
	// addr is the peer URL under which the new member gets registered in etcd.
	addr := fmt.Sprintf("https://%s", net.JoinHostPort(name, strconv.Itoa(port)))

	member := pki.Certificate{
		Name:      nodeID,
		Namespace: &pkiNamespace,
		Issuer:    s.ca,
		Template:  pkiPeerCertificate(pk, extraNames),
		Mode:      pki.CertificateExternal,
		PublicKey: pk,
	}
	caBytes, err := s.ca.Ensure(ctx, clPKI)
	if err != nil {
		return nil, fmt.Errorf("could not ensure CA certificate: %w", err)
	}
	memberBytes, err := member.Ensure(ctx, clPKI)
	if err != nil {
		return nil, fmt.Errorf("could not ensure member certificate: %w", err)
	}
	caCert, err := x509.ParseCertificate(caBytes)
	if err != nil {
		return nil, fmt.Errorf("could not parse CA certificate: %w", err)
	}
	memberCert, err := x509.ParseCertificate(memberBytes)
	if err != nil {
		return nil, fmt.Errorf("could not parse newly issued member certificate: %w", err)
	}

	members, err := s.cl.MemberList(ctx)
	if err != nil {
		return nil, fmt.Errorf("could not retrieve existing members: %w", err)
	}

	var existingNodes []ExistingNode
	var newExists bool
	for _, m := range members.Members {
		if m.Name == nodeID {
			newExists = true
		}
		// etcd leaves a member's name empty until that member has started, so
		// a node which was added earlier but has not come up yet can only be
		// recognised by its peer URL. Adding it again would be rejected by
		// etcd as a duplicate peer URL.
		if m.Name == "" {
			for _, u := range m.PeerURLs {
				if u == addr {
					newExists = true
					break
				}
			}
		}
		// Only voting members with a known peer URL are useful for the new
		// node to connect to.
		if m.IsLearner {
			continue
		}
		if len(m.PeerURLs) < 1 {
			continue
		}
		existingNodes = append(existingNodes, ExistingNode{
			Name: m.Name,
			URL:  m.PeerURLs[0],
		})
	}

	crlW := s.ca.WatchCRL(clPKI)
	crl, err := crlW.Get(ctx)
	if err != nil {
		return nil, fmt.Errorf("could not retrieve initial CRL: %w", err)
	}

	if !newExists && !s.noClusterMemberManagement {
		if _, err := s.cl.MemberAddAsLearner(ctx, []string{addr}); err != nil {
			return nil, fmt.Errorf("could not add new member as learner: %w", err)
		}
	}

	return &JoinCluster{
		CACertificate:   caCert,
		NodeCertificate: memberCert,
		ExistingNodes:   existingNodes,
		InitialCRL:      crl,
	}, nil
}
// AddNodeOption can be passed to AddNode to influence the behaviour of the
// function. Currently this is only used internally by tests.
type AddNodeOption struct {
// externalAddress, if non-empty, replaces the node ID as the host part of the
// new member's peer URL and is passed to the peer certificate template as an
// extra name.
externalAddress string
// externalPort, if non-zero, replaces node.ConsensusPort as the port of the
// new member's peer URL.
externalPort int
}