core/internal/consensus: refactor
This refactors the consensus to:
- use localstorage
- use the supervisor system
- have a significantly simpler API for callers (no more
PrecreateCA, InjectCA, etc.; see the sketch below)
- use a watcher for CRLs
- actually have all bootstrap paths tested
- keep the CA key in memory (keeping it in etcd only seems like odd
threat modelling and can possibly cause issues on quorum losses)
This breaks the build, as it is part of a multi-revision refactor of the
core node service code.
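For illustration, the caller-side flow after this change looks roughly as
follows (a sketch only: the localstorage root fields and the literal values
are hypothetical, and a joining node differs only in the NewCluster /
InitialCluster fields):

    svc := consensus.New(consensus.Config{
        Data:         &root.Data.Etcd,
        Ephemeral:    &root.Ephemeral.Consensus,
        Name:         "node-a",
        NewCluster:   true,
        ExternalHost: "10.0.0.1",
        ListenHost:   "0.0.0.0",
    })
    // From within a supervisor runnable:
    if err := supervisor.Run(ctx, "consensus", svc.Run); err != nil {
        return err
    }
    if err := svc.WaitReady(ctx); err != nil {
        return err
    }
    kv := svc.KV("example", "data")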
Test Plan: adds tests \o/
X-Origin-Diff: phab/D579
GitOrigin-RevId: fadee7785028ef806d8243a770c70cb0fb82c20e
diff --git a/core/internal/consensus/consensus.go b/core/internal/consensus/consensus.go
index 5885aa8..94d84b2 100644
--- a/core/internal/consensus/consensus.go
+++ b/core/internal/consensus/consensus.go
@@ -14,43 +14,42 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-// package consensus manages the embedded etcd cluster.
+// Package consensus implements a managed etcd cluster member service, with a self-hosted CA system for issuing peer
+// certificates. Currently each Smalltown node runs an etcd member, and connects to it locally over a unix domain
+// socket.
+//
+// The service supports two modes of startup:
+// - initializing a new cluster, by bootstrapping the CA in memory, starting a cluster, committing the CA to etcd
+// afterwards, and saving the new node's certificate to local storage
+// - joining an existing cluster, using certificates from local storage and loading the CA from etcd. This flow is also
+//   used when the node joins a cluster for the first time (in which case the required certificates must be
+//   provisioned externally before starting the consensus service).
+//
+// Regardless of how the etcd member service was started, the resulting running service is further managed and used
+// in the same way.
+//
package consensus
import (
- "bytes"
"context"
- "crypto/x509"
- "encoding/binary"
- "encoding/hex"
"encoding/pem"
"fmt"
- "io/ioutil"
- "math/rand"
"net"
"net/url"
- "os"
- "path"
- "path/filepath"
- "strings"
+ "sync"
"time"
- "github.com/pkg/errors"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/namespace"
"go.etcd.io/etcd/embed"
- "go.etcd.io/etcd/etcdserver/api/membership"
- "go.etcd.io/etcd/pkg/types"
- "go.etcd.io/etcd/proxy/grpcproxy/adapter"
"go.uber.org/atomic"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
- "golang.org/x/sys/unix"
- "git.monogon.dev/source/nexantic.git/core/generated/api"
"git.monogon.dev/source/nexantic.git/core/internal/common"
- "git.monogon.dev/source/nexantic.git/core/internal/common/service"
+ "git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
"git.monogon.dev/source/nexantic.git/core/internal/consensus/ca"
+ "git.monogon.dev/source/nexantic.git/core/internal/localstorage"
)
const (
@@ -58,106 +57,98 @@
DefaultLogger = "zap"
)
-const (
- CAPath = "ca.pem"
- CertPath = "cert.pem"
- KeyPath = "cert-key.pem"
- CRLPath = "ca-crl.der"
- CRLSwapPath = "ca-crl.der.swp"
-)
+// Service is the etcd cluster member service.
+type Service struct {
+ // The configuration with which the service was started. This is immutable.
+ config *Config
-const (
- LocalListenerURL = "unix:///consensus/listener.sock:0"
-)
+ // stateMu guards state. This is locked internally on public methods of Service that require access to state. The
+ // state might be recreated on service restart.
+ stateMu sync.Mutex
+ state *state
+}
-type (
- Service struct {
- *service.BaseService
+// state is the runtime state of a running etcd member.
+type state struct {
+ etcd *embed.Etcd
+ ready atomic.Bool
- etcd *embed.Etcd
- kv clientv3.KV
- ready atomic.Bool
+ ca *ca.CA
+ // cl is an etcd client that loops back to the locally running etcd server. This runs over the client unix domain
+ // socket that etcd starts.
+ cl *clientv3.Client
+}
- // bootstrapCA and bootstrapCert cache the etcd cluster CA data during bootstrap.
- bootstrapCA *ca.CA
- bootstrapCert []byte
+type Config struct {
+ // Data directory (persistent, encrypted storage) for etcd.
+ Data *localstorage.DataEtcdDirectory
+ // Ephemeral directory for etcd.
+ Ephemeral *localstorage.EphemeralConsensusDirectory
- watchCRLTicker *time.Ticker
- lastCRL []byte
+ // Name is this node's name, also used as the etcd member name. It must be unique amongst all members within one cluster.
+ Name string
+ // NewCluster selects whether the etcd member will start a new cluster (bootstrapping a CA and the first member
+ // certificate) or load existing PKI certificates from disk.
+ NewCluster bool
+ // InitialCluster sets the initial cluster peer URLs when NewCluster is set, and is ignored otherwise. Usually this
+ // will be just the new, single server, and more members will be added later.
+ InitialCluster string
+ // ExternalHost is the IP address or hostname at which this cluster member is reachable by other cluster members.
+ ExternalHost string
+ // ListenHost is the IP address or hostname at which this cluster member will listen.
+ ListenHost string
+ // Port is the port at which this cluster member will listen for other members. If zero, defaults to the global
+ // Smalltown setting.
+ Port int
+}
- config *Config
- }
-
- Config struct {
- Name string
- DataDir string
- InitialCluster string
- NewCluster bool
- ExternalHost string
- ListenHost string
- }
-
- Member struct {
- ID uint64
- Name string
- Address string
- Synced bool
- }
-)
-
-func NewConsensusService(config Config, logger *zap.Logger) (*Service, error) {
- consensusServer := &Service{
+func New(config Config) *Service {
+ return &Service{
config: &config,
}
- consensusServer.BaseService = service.NewBaseService("consensus", logger, consensusServer)
-
- return consensusServer, nil
}
-func peerURL(host string) url.URL {
- return url.URL{Scheme: "https", Host: fmt.Sprintf("%s:%d", host, common.ConsensusPort)}
-}
+// configure transforms the service configuration into an embedded etcd configuration, and ensures that the ephemeral
+// and data directories required by etcd exist.
+func (s *Service) configure(ctx context.Context) (*embed.Config, error) {
+ if err := s.config.Ephemeral.MkdirAll(0700); err != nil {
+ return nil, fmt.Errorf("failed to create ephemeral directory: %w", err)
+ }
+ if err := s.config.Data.MkdirAll(0700); err != nil {
+ return nil, fmt.Errorf("failed to create data directory: %w", err)
+ }
-func (s *Service) OnStart() error {
- // See: https://godoc.org/github.com/coreos/etcd/embed#Config
-
- if s.config == nil {
- return errors.New("config for consensus is nil")
+ port := s.config.Port
+ if port == 0 {
+ port = common.ConsensusPort
}
cfg := embed.NewConfig()
- cfg.PeerTLSInfo.CertFile = filepath.Join(s.config.DataDir, CertPath)
- cfg.PeerTLSInfo.KeyFile = filepath.Join(s.config.DataDir, KeyPath)
- cfg.PeerTLSInfo.TrustedCAFile = filepath.Join(s.config.DataDir, CAPath)
- cfg.PeerTLSInfo.ClientCertAuth = true
- cfg.PeerTLSInfo.CRLFile = filepath.Join(s.config.DataDir, CRLPath)
-
- lastCRL, err := ioutil.ReadFile(cfg.PeerTLSInfo.CRLFile)
- if err != nil {
- return fmt.Errorf("failed to read etcd CRL: %w", err)
- }
- s.lastCRL = lastCRL
-
- // Expose etcd to local processes
- if err := os.MkdirAll("/consensus", 0700); err != nil {
- return fmt.Errorf("Failed to create consensus runtime state directory: %w", err)
- }
- listenerURL, err := url.Parse(LocalListenerURL)
- if err != nil {
- panic(err)
- }
- cfg.LCUrls = []url.URL{*listenerURL}
-
- cfg.APUrls = []url.URL{peerURL(s.config.ExternalHost)}
- cfg.LPUrls = []url.URL{peerURL(s.config.ListenHost)}
- cfg.ACUrls = []url.URL{}
-
- cfg.Dir = s.config.DataDir
- cfg.InitialClusterToken = DefaultClusterToken
cfg.Name = s.config.Name
+ cfg.Dir = s.config.Data.Data.FullPath()
+ cfg.InitialClusterToken = DefaultClusterToken
- // Only relevant if creating or joining a cluster; otherwise settings will be ignored
+ cfg.PeerTLSInfo.CertFile = s.config.Data.PeerPKI.Certificate.FullPath()
+ cfg.PeerTLSInfo.KeyFile = s.config.Data.PeerPKI.Key.FullPath()
+ cfg.PeerTLSInfo.TrustedCAFile = s.config.Data.PeerPKI.CACertificate.FullPath()
+ cfg.PeerTLSInfo.ClientCertAuth = true
+ cfg.PeerTLSInfo.CRLFile = s.config.Data.PeerCRL.FullPath()
+
+ cfg.LCUrls = []url.URL{{
+ Scheme: "unix",
+ Path: s.config.Ephemeral.ClientSocket.FullPath() + ":0",
+ }}
+ cfg.ACUrls = []url.URL{}
+ cfg.LPUrls = []url.URL{{
+ Scheme: "https",
+ Host: fmt.Sprintf("%s:%d", s.config.ListenHost, port),
+ }}
+ cfg.APUrls = []url.URL{{
+ Scheme: "https",
+ Host: fmt.Sprintf("%s:%d", s.config.ExternalHost, port),
+ }}
+
if s.config.NewCluster {
cfg.ClusterState = "new"
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
@@ -166,339 +157,279 @@
cfg.InitialCluster = s.config.InitialCluster
}
+ logger := supervisor.Logger(ctx)
cfg.Logger = DefaultLogger
cfg.ZapLoggerBuilder = embed.NewZapCoreLoggerBuilder(
- s.Logger.With(zap.String("component", "etcd")).WithOptions(zap.IncreaseLevel(zapcore.WarnLevel)),
- s.Logger.Core(),
+ logger.With(zap.String("component", "etcd")).WithOptions(zap.IncreaseLevel(zapcore.WarnLevel)),
+ logger.Core(),
nil,
)
+ return cfg, nil
+}
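+
+// Note on the unix domain socket URLs above: a port-like ":0" suffix is appended to the socket path, as etcd's URL
+// handling expects a host:port shape even for unix scheme URLs. For example, with an (illustrative) socket path of
+// /ephemeral/consensus/client.sock, the listener URL becomes "unix:///ephemeral/consensus/client.sock:0", and the
+// loopback client later dials the same address.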
+
+// Run is a Supervisor runnable that starts the etcd member service. It will become healthy once the member joins the
+// cluster successfully.
+func (s *Service) Run(ctx context.Context) error {
+ st := &state{
+ ready: *atomic.NewBool(false),
+ }
+ s.stateMu.Lock()
+ s.state = st
+ s.stateMu.Unlock()
+
+ if s.config.NewCluster {
+ // Expect certificates to be absent from disk.
+ absent, err := s.config.Data.PeerPKI.AllAbsent()
+ if err != nil {
+ return fmt.Errorf("checking certificate existence: %w", err)
+ }
+ if !absent {
+ return fmt.Errorf("want new cluster, but certificates already exist on disk")
+ }
+
+ // Generate CA, keep in memory, write it down in etcd later.
+ st.ca, err = ca.New("Smalltown etcd peer Root CA")
+ if err != nil {
+ return fmt.Errorf("when creating new cluster's peer CA: %w", err)
+ }
+
+ ip := net.ParseIP(s.config.ExternalHost)
+ if ip == nil {
+ return fmt.Errorf("configued external host is not an IP address (got %q)", s.config.ExternalHost)
+ }
+
+ cert, key, err := st.ca.Issue(ctx, nil, s.config.Name, ip)
+ if err != nil {
+ return fmt.Errorf("when issuing new cluster's first certificate: %w", err)
+ }
+
+ if err := s.config.Data.PeerPKI.MkdirAll(0700); err != nil {
+ return fmt.Errorf("when creating PKI directory: %w", err)
+ }
+ if err := s.config.Data.PeerPKI.CACertificate.Write(pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: st.ca.CACertRaw}), 0600); err != nil {
+ return fmt.Errorf("when writing CA certificate to disk: %w", err)
+ }
+ if err := s.config.Data.PeerPKI.Certificate.Write(pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: cert}), 0600); err != nil {
+ return fmt.Errorf("when writing certificate to disk: %w", err)
+ }
+ if err := s.config.Data.PeerPKI.Key.Write(pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: key}), 0600); err != nil {
+ return fmt.Errorf("when writing certificate to disk: %w", err)
+ }
+ } else {
+ // Expect certificates to be present on disk.
+ present, err := s.config.Data.PeerPKI.AllExist()
+ if err != nil {
+ return fmt.Errorf("checking certificate existence: %w", err)
+ }
+ if !present {
+ return fmt.Errorf("want existing cluster, but certificate is missing from disk")
+ }
+ }
+
+ if err := s.config.Data.MkdirAll(0700); err != nil {
+ return fmt.Errorf("failed to create data directory: %w", err)
+ }
+
+ cfg, err := s.configure(ctx)
+ if err != nil {
+ return fmt.Errorf("when configuring etcd: %w", err)
+ }
+
server, err := embed.StartEtcd(cfg)
- if err != nil {
- return err
- }
- s.etcd = server
-
- // Override the logger
- //*server.GetLogger() = *s.Logger.With(zap.String("component", "etcd"))
- // TODO(leo): can we uncomment this?
-
- go func() {
- s.Logger.Info("waiting for etcd to become ready")
- <-s.etcd.Server.ReadyNotify()
- s.ready.Store(true)
- s.Logger.Info("etcd is now ready")
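+ // keep marks whether the started etcd server should survive this function returning. It is only set once the
+ // member is fully up; until then, the deferred Close below tears etcd down on any early error return.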
+ keep := false
+ defer func() {
+ if !keep && server != nil {
+ server.Close()
+ }
}()
-
- // Inject kv client
- s.kv = clientv3.NewKVFromKVClient(adapter.KvServerToKvClient(s.etcd.Server), nil)
-
- // Start CRL watcher
- go s.watchCRL()
- ctx := context.TODO()
- go s.autoPromote(ctx)
-
- return nil
-}
-
-// WriteCertificateFiles writes the given node certificate data to local storage
-// such that it can be used by the embedded etcd server.
-// Unfortunately, we cannot pass the certificates directly to etcd.
-func (s *Service) WriteCertificateFiles(certs *api.ConsensusCertificates) error {
- if err := ioutil.WriteFile(filepath.Join(s.config.DataDir, CRLPath), certs.Crl, 0600); err != nil {
- return err
- }
- if err := ioutil.WriteFile(filepath.Join(s.config.DataDir, CertPath),
- pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certs.Cert}), 0600); err != nil {
- return err
- }
- if err := ioutil.WriteFile(filepath.Join(s.config.DataDir, KeyPath),
- pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: certs.Key}), 0600); err != nil {
- return err
- }
- if err := ioutil.WriteFile(filepath.Join(s.config.DataDir, CAPath),
- pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certs.Ca}), 0600); err != nil {
- return err
- }
- return nil
-}
-
-// PrecreateCA generates the etcd cluster certificate authority and writes it to local storage.
-func (s *Service) PrecreateCA(extIP net.IP) error {
- // Provision an etcd CA
- etcdRootCA, err := ca.New("Smalltown etcd Root CA")
if err != nil {
- return err
+ return fmt.Errorf("failed to start etcd: %w", err)
}
- cert, privkey, err := etcdRootCA.IssueCertificate(s.config.ExternalHost, extIP)
- if err != nil {
- return fmt.Errorf("failed to self-issue a certificate: %w", err)
- }
- if err := os.MkdirAll(s.config.DataDir, 0700); err != nil {
- return fmt.Errorf("failed to create consensus data dir: %w", err)
- }
- // Preserve certificate for later injection
- s.bootstrapCert = cert
- if err := s.WriteCertificateFiles(&api.ConsensusCertificates{
- Ca: etcdRootCA.CACertRaw,
- Crl: etcdRootCA.CRLRaw,
- Cert: cert,
- Key: privkey,
- }); err != nil {
- return fmt.Errorf("failed to setup certificates: %w", err)
- }
- s.bootstrapCA = etcdRootCA
- return nil
-}
+ st.etcd = server
-const (
- caPathEtcd = "/etcd-ca/ca.der"
- caKeyPathEtcd = "/etcd-ca/ca-key.der"
- crlPathEtcd = "/etcd-ca/crl.der"
+ supervisor.Logger(ctx).Info("waiting for etcd...")
- // This prefix stores the individual certs the etcd CA has issued.
- certPrefixEtcd = "/etcd-ca/certs"
-)
-
-// InjectCA copies the CA from data cached during PrecreateCA to etcd.
-// Requires a previous call to PrecreateCA.
-func (s *Service) InjectCA() error {
- if s.bootstrapCA == nil || s.bootstrapCert == nil {
- panic("bootstrapCA or bootstrapCert are nil - missing PrecreateCA call?")
- }
- if _, err := s.kv.Put(context.Background(), caPathEtcd, string(s.bootstrapCA.CACertRaw)); err != nil {
- return err
- }
- // TODO(lorenz): Should be wrapped by the master key
- if _, err := s.kv.Put(context.Background(), caKeyPathEtcd, string([]byte(*s.bootstrapCA.PrivateKey))); err != nil {
- return err
- }
- if _, err := s.kv.Put(context.Background(), crlPathEtcd, string(s.bootstrapCA.CRLRaw)); err != nil {
- return err
- }
- certVal, err := x509.ParseCertificate(s.bootstrapCert)
- if err != nil {
- return err
- }
- serial := hex.EncodeToString(certVal.SerialNumber.Bytes())
- if _, err := s.kv.Put(context.Background(), path.Join(certPrefixEtcd, serial), string(s.bootstrapCert)); err != nil {
- return fmt.Errorf("failed to persist certificate: %w", err)
- }
- // Clear out bootstrap CA after injecting
- s.bootstrapCA = nil
- s.bootstrapCert = []byte{}
- return nil
-}
-
-func (s *Service) etcdGetSingle(path string) ([]byte, int64, error) {
- res, err := s.kv.Get(context.Background(), path)
- if err != nil {
- return nil, -1, fmt.Errorf("failed to get key from etcd: %w", err)
- }
- if len(res.Kvs) != 1 {
- return nil, -1, errors.New("key not available or multiple keys returned")
- }
- return res.Kvs[0].Value, res.Kvs[0].ModRevision, nil
-}
-
-func (s *Service) getCAFromEtcd() (*ca.CA, int64, error) {
- // TODO: Technically this could be done in a single request, but it's more logic
- caCert, _, err := s.etcdGetSingle(caPathEtcd)
- if err != nil {
- return nil, -1, fmt.Errorf("failed to get CA certificate from etcd: %w", err)
- }
- caKey, _, err := s.etcdGetSingle(caKeyPathEtcd)
- if err != nil {
- return nil, -1, fmt.Errorf("failed to get CA key from etcd: %w", err)
- }
- // TODO: Unwrap CA key once wrapping is implemented
- crl, crlRevision, err := s.etcdGetSingle(crlPathEtcd)
- if err != nil {
- return nil, -1, fmt.Errorf("failed to get CRL from etcd: %w", err)
- }
- idCA, err := ca.FromCertificates(caCert, caKey, crl)
- if err != nil {
- return nil, -1, fmt.Errorf("failed to take CA online: %w", err)
- }
- return idCA, crlRevision, nil
-}
-
-// ProvisionMember sets up and returns provisioning data to join another node into the consensus.
-// It issues PKI material, creates a static cluster bootstrap specification string (known as initial-cluster in etcd)
-// and adds the new node as a learner (non-voting) member to the cluster. Once the new node has caught up with the
-// cluster it is automatically promoted to a voting member by the autoPromote process.
-func (s *Service) ProvisionMember(name string, ip net.IP) (*api.ConsensusCertificates, string, error) {
- idCA, _, err := s.getCAFromEtcd()
- if err != nil {
- return nil, "", fmt.Errorf("failed to get consensus CA: %w", err)
- }
- cert, key, err := idCA.IssueCertificate(name, ip)
- if err != nil {
- return nil, "", fmt.Errorf("failed to issue certificate: %w", err)
- }
- certVal, err := x509.ParseCertificate(cert)
- if err != nil {
- return nil, "", fmt.Errorf("failed to parse just-issued consensus cert: %w", err)
- }
- serial := hex.EncodeToString(certVal.SerialNumber.Bytes())
- if _, err := s.kv.Put(context.Background(), path.Join(certPrefixEtcd, serial), string(cert)); err != nil {
- // We issued a certificate, but failed to persist it. Return an error and forget it ever happened.
- return nil, "", fmt.Errorf("failed to persist certificate: %w", err)
+ okay := true
+ select {
+ case <-st.etcd.Server.ReadyNotify():
+ case <-ctx.Done():
+ okay = false
}
- currentMembers := s.etcd.Server.Cluster().Members()
- var memberStrs []string
- for _, member := range currentMembers {
- memberStrs = append(memberStrs, fmt.Sprintf("%v=%v", member.Name, member.PickPeerURL()))
- }
- apURL := peerURL(ip.String())
- memberStrs = append(memberStrs, fmt.Sprintf("%s=%s", name, apURL.String()))
-
- pubKeyPrefix, err := common.IDKeyPrefixFromName(name)
- if err != nil {
- return nil, "", fmt.Errorf("invalid new node name: %v", err)
+ if !okay {
+ supervisor.Logger(ctx).Info("context done, aborting wait")
+ return ctx.Err()
}
- crl, _, err := s.etcdGetSingle(crlPathEtcd)
-
- _, err = s.etcd.Server.AddMember(context.Background(), membership.Member{
- RaftAttributes: membership.RaftAttributes{
- PeerURLs: types.URLs{apURL}.StringSlice(),
- IsLearner: true,
- },
- Attributes: membership.Attributes{Name: name},
- ID: types.ID(binary.BigEndian.Uint64(pubKeyPrefix[:8])),
+ socket := s.config.Ephemeral.ClientSocket.FullPath()
+ cl, err := clientv3.New(clientv3.Config{
+ Endpoints: []string{fmt.Sprintf("unix://%s:0", socket)},
+ DialTimeout: time.Second,
})
if err != nil {
- return nil, "", fmt.Errorf("failed to provision member: %w", err)
+ return fmt.Errorf("failed to connect to new etcd instance: %w", err)
}
- return &api.ConsensusCertificates{
- Ca: idCA.CACertRaw,
- Cert: cert,
- Crl: crl,
- Key: key,
- }, strings.Join(memberStrs, ","), nil
+ st.cl = cl
+
+ if s.config.NewCluster {
+ if st.ca == nil {
+ panic("peerCA has not been generated")
+ }
+
+ // Save new CA into etcd.
+ err = st.ca.Save(ctx, cl.KV)
+ if err != nil {
+ return fmt.Errorf("failed to save new CA to etcd: %w", err)
+ }
+ } else {
+ // Load existing CA from etcd.
+ st.ca, err = ca.Load(ctx, cl.KV)
+ if err != nil {
+ return fmt.Errorf("failed to load CA from etcd: %w", err)
+ }
+ }
+
+ // Start CRL watcher.
+ if err := supervisor.Run(ctx, "crl", s.watchCRL); err != nil {
+ return fmt.Errorf("failed to start CRL watcher: %w", err)
+ }
+ // Start autopromoter.
+ if err := supervisor.Run(ctx, "autopromoter", s.autopromoter); err != nil {
+ return fmt.Errorf("failed to start autopromoter: %w", err)
+ }
+
+ supervisor.Logger(ctx).Info("etcd is now ready")
+ keep = true
+ st.ready.Store(true)
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+
+ <-ctx.Done()
+ st.etcd.Close()
+ return ctx.Err()
}
-func (s *Service) RevokeCertificate(hostname string) error {
- rand.Seed(time.Now().UnixNano())
- for {
- idCA, crlRevision, err := s.getCAFromEtcd()
- if err != nil {
- return err
+// watchCRL is a sub-runnable of the etcd cluster member service that updates the on-local-storage CRL to match the
+// newest available version in etcd.
+func (s *Service) watchCRL(ctx context.Context) error {
+ s.stateMu.Lock()
+ cl := s.state.cl
+ ca := s.state.ca
+ s.stateMu.Unlock()
+
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ for e := range ca.WaitCRLChange(ctx, cl.KV, cl.Watcher) {
+ if e.Err != nil {
+ return fmt.Errorf("watching CRL: %w", e.Err)
}
- allIssuedCerts, err := s.kv.Get(context.Background(), certPrefixEtcd, clientv3.WithPrefix())
- for _, cert := range allIssuedCerts.Kvs {
- certVal, err := x509.ParseCertificate(cert.Value)
- if err != nil {
- s.Logger.Error("Failed to parse previously issued certificate, this is a security risk", zap.Error(err))
+
+ if err := s.config.Data.PeerCRL.Write(e.CRL, 0600); err != nil {
+ return fmt.Errorf("saving CRL: %w", err)
+ }
+ }
+
+ // unreachable
+ return nil
+}
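+
+// Note: the CRL is mirrored to local storage because etcd consumes revocations through the PeerTLSInfo.CRLFile path
+// set up in configure, not directly from cluster state.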
+
+func (s *Service) autopromoter(ctx context.Context) error {
+ t := time.NewTicker(5 * time.Second)
+ defer t.Stop()
+
+ autopromote := func() {
+ s.stateMu.Lock()
+ st := s.state
+ s.stateMu.Unlock()
+
+ if st.etcd.Server.Leader() != st.etcd.Server.ID() {
+ return
+ }
+
+ for _, member := range st.etcd.Server.Cluster().Members() {
+ if !member.IsLearner {
continue
}
- for _, dnsName := range certVal.DNSNames {
- if dnsName == hostname {
- // Revoke this
- if err := idCA.Revoke(certVal.SerialNumber); err != nil {
- // We need to fail if any single revocation fails otherwise outer applications
- // have no chance of calling this safely
- return err
- }
- }
- }
- }
- // TODO(leo): this needs a test
- cmp := clientv3.Compare(clientv3.ModRevision(crlPathEtcd), "=", crlRevision)
- op := clientv3.OpPut(crlPathEtcd, string(idCA.CRLRaw))
- res, err := s.kv.Txn(context.Background()).If(cmp).Then(op).Commit()
- if err != nil {
- return fmt.Errorf("failed to persist new CRL in etcd: %w", err)
- }
- if res.Succeeded { // Transaction has succeeded
- break
- }
- // Sleep a random duration between 0 and 300ms to reduce serialization failures
- time.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond)
- }
- return nil
-}
-func (s *Service) watchCRL() {
- // TODO(lorenz): Change etcd client to WatchableKV and make this an actual watch
- // This needs changes in more places, so leaving it now
- s.watchCRLTicker = time.NewTicker(30 * time.Second)
- for range s.watchCRLTicker.C {
- crl, _, err := s.etcdGetSingle(crlPathEtcd)
- if err != nil {
- s.Logger.Warn("Failed to check for new CRL", zap.Error(err))
- continue
- }
- // This is cryptographic material but not secret, so no constant time compare necessary here
- if !bytes.Equal(crl, s.lastCRL) {
- if err := ioutil.WriteFile(filepath.Join(s.config.DataDir, CRLSwapPath), crl, 0600); err != nil {
- s.Logger.Warn("Failed to write updated CRL", zap.Error(err))
- }
- // This uses unix.Rename to guarantee a particular atomic update behavior
- if err := unix.Rename(filepath.Join(s.config.DataDir, CRLSwapPath), filepath.Join(s.config.DataDir, CRLPath)); err != nil {
- s.Logger.Warn("Failed to atomically swap updated CRL", zap.Error(err))
+ // We always call PromoteMember since the metadata necessary to decide if we should is private.
+ // Luckily etcd already does sanity checks internally and will refuse to promote nodes that aren't
+ // connected or are still behind on transactions.
+ if _, err := st.etcd.Server.PromoteMember(ctx, uint64(member.ID)); err != nil {
+ supervisor.Logger(ctx).Info("Failed to promote consensus node", zap.String("node", member.Name), zap.Error(err))
+ } else {
+ supervisor.Logger(ctx).Info("Promoted new consensus node", zap.String("node", member.Name))
}
}
}
-}
-// autoPromote automatically promotes learning (non-voting) members to voting members. etcd currently lacks auto-promote
-// capabilities (https://github.com/etcd-io/etcd/issues/10537) so we need to do this ourselves.
-func (s *Service) autoPromote(ctx context.Context) {
- promoteTicker := time.NewTicker(5 * time.Second)
- go func() {
- <-ctx.Done()
- promoteTicker.Stop()
- }()
- for range promoteTicker.C {
- if s.etcd.Server.Leader() != s.etcd.Server.ID() {
- continue
- }
- for _, member := range s.etcd.Server.Cluster().Members() {
- if member.IsLearner {
- // We always call PromoteMember since the metadata necessary to decide if we should is private.
- // Luckily etcd already does sanity checks internally and will refuse to promote nodes that aren't
- // connected or are still behind on transactions.
- if _, err := s.etcd.Server.PromoteMember(context.Background(), uint64(member.ID)); err != nil {
- s.Logger.Info("Failed to promote consensus node", zap.String("node", member.Name), zap.Error(err))
- }
- s.Logger.Info("Promoted new consensus node", zap.String("node", member.Name))
- }
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-t.C:
+ autopromote()
}
}
}
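+
+// For example (illustrative): a node added elsewhere as a learner through the clientv3 API, e.g. via
+// cl.MemberAddAsLearner(ctx, peerURLs), is picked up by this loop on the current leader and promoted to a voting
+// member once etcd considers it caught up.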
-func (s *Service) OnStop() error {
- s.watchCRLTicker.Stop()
- s.etcd.Close()
-
- return nil
-}
-
-// IsProvisioned returns whether the node has been setup before and etcd has a data directory
-func (s *Service) IsProvisioned() bool {
- _, err := os.Stat(s.config.DataDir)
-
- return !os.IsNotExist(err)
-}
-
// IsReady returns whether etcd is ready and synced
func (s *Service) IsReady() bool {
- return s.ready.Load()
+ s.stateMu.Lock()
+ defer s.stateMu.Unlock()
+ if s.state == nil {
+ return false
+ }
+ return s.state.ready.Load()
}
-// GetConfig returns the current consensus config
-func (s *Service) GetConfig() Config {
- return *s.config
+func (s *Service) WaitReady(ctx context.Context) error {
+ // TODO(q3k): reimplement the atomic ready flag as an event synchronization mechanism
+ if s.IsReady() {
+ return nil
+ }
+ t := time.NewTicker(100 * time.Millisecond)
+ defer t.Stop()
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-t.C:
+ if s.IsReady() {
+ return nil
+ }
+ }
+ }
}
-// SetConfig sets the consensus config. Changes are only applied when the service is restarted.
-func (s *Service) SetConfig(config Config) {
- s.config = &config
+// KV returns an etcd KV client interface to the etcd member/cluster, namespaced to the given module and space.
+func (s *Service) KV(module, space string) clientv3.KV {
+ s.stateMu.Lock()
+ defer s.stateMu.Unlock()
+ return namespace.NewKV(s.state.cl.KV, fmt.Sprintf("%s:%s", module, space))
}
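+
+// For example (illustrative): kv := svc.KV("foo", "bar") returns a KV interface whose operations are transparently
+// prefixed with "foo:bar", so kv.Put(ctx, "some/key", "v") writes the key "foo:bar" + "some/key" in the underlying
+// keyspace, keeping different modules' data separate.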
-func (s *Service) GetStore(module, space string) clientv3.KV {
- return namespace.NewKV(s.kv, fmt.Sprintf("%s:%s", module, space))
+func (s *Service) KVRoot() clientv3.KV {
+ s.stateMu.Lock()
+ defer s.stateMu.Unlock()
+ return s.state.cl.KV
+}
+
+func (s *Service) Cluster() clientv3.Cluster {
+ s.stateMu.Lock()
+ defer s.stateMu.Unlock()
+ return s.state.cl.Cluster
+}
+
+// MemberInfo returns information about this etcd cluster member: its ID and name. This will block until this
+// information is available (i.e. the cluster status is Ready).
+func (s *Service) MemberInfo(ctx context.Context) (id uint64, name string, err error) {
+ if err = s.WaitReady(ctx); err != nil {
+ err = fmt.Errorf("when waiting for cluster readiness: %w", err)
+ return
+ }
+
+ s.stateMu.Lock()
+ defer s.stateMu.Unlock()
+ id = uint64(s.state.etcd.Server.ID())
+ name = s.config.Name
+ return
}
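+
+// For example (illustrative): a caller that needs to remove this member from the cluster can combine MemberInfo with
+// the Cluster() client above:
+//
+//   id, _, err := svc.MemberInfo(ctx)
+//   if err == nil {
+//       _, err = svc.Cluster().MemberRemove(ctx, id)
+//   }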