core/internal/consensus: refactor
This refactors the consensus to:
- use localstorage
- use the supervisor system
- have a significantly simpler API for callers (no more
PrecreateCertificate, etc.)
- use a watcher for CRLs
- actually have all bootstrap paths tested
- keep the CA key in memory (keeping it in etcd only seems like odd
threat modelling and can possibly cause issues on quorum losses)
This breaks the build, as it is part of a multi-revision refactor of the
core node service code.
Test Plan: adds tests \o/
X-Origin-Diff: phab/D579
GitOrigin-RevId: fadee7785028ef806d8243a770c70cb0fb82c20e
diff --git a/core/internal/consensus/BUILD.bazel b/core/internal/consensus/BUILD.bazel
index f0246f7..74b70d9 100644
--- a/core/internal/consensus/BUILD.bazel
+++ b/core/internal/consensus/BUILD.bazel
@@ -1,4 +1,4 @@
-load("@io_bazel_rules_go//go:def.bzl", "go_library")
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
@@ -6,20 +6,28 @@
importpath = "git.monogon.dev/source/nexantic.git/core/internal/consensus",
visibility = ["//:__subpackages__"],
deps = [
- "//core/api/api:go_default_library",
"//core/internal/common:go_default_library",
- "//core/internal/common/service:go_default_library",
+ "//core/internal/common/supervisor:go_default_library",
"//core/internal/consensus/ca:go_default_library",
- "@com_github_pkg_errors//:go_default_library",
+ "//core/internal/localstorage:go_default_library",
"@io_etcd_go_etcd//clientv3:go_default_library",
"@io_etcd_go_etcd//clientv3/namespace:go_default_library",
"@io_etcd_go_etcd//embed:go_default_library",
- "@io_etcd_go_etcd//etcdserver/api/membership:go_default_library",
- "@io_etcd_go_etcd//pkg/types:go_default_library",
- "@io_etcd_go_etcd//proxy/grpcproxy/adapter:go_default_library",
- "@org_golang_x_sys//unix:go_default_library",
"@org_uber_go_atomic//:go_default_library",
"@org_uber_go_zap//:go_default_library",
"@org_uber_go_zap//zapcore:go_default_library",
],
)
+
+go_test(
+ name = "go_default_test",
+ srcs = ["consensus_test.go"],
+ embed = [":go_default_library"],
+ deps = [
+ "//core/internal/common/supervisor:go_default_library",
+ "//core/internal/localstorage:go_default_library",
+ "//core/internal/localstorage/declarative:go_default_library",
+ "//golibs/common:go_default_library",
+ "@org_uber_go_zap//:go_default_library",
+ ],
+)
diff --git a/core/internal/consensus/ca/BUILD.bazel b/core/internal/consensus/ca/BUILD.bazel
index c048d4c..5f3c006 100644
--- a/core/internal/consensus/ca/BUILD.bazel
+++ b/core/internal/consensus/ca/BUILD.bazel
@@ -5,4 +5,5 @@
srcs = ["ca.go"],
importpath = "git.monogon.dev/source/nexantic.git/core/internal/consensus/ca",
visibility = ["//:__subpackages__"],
+ deps = ["@io_etcd_go_etcd//clientv3:go_default_library"],
)
diff --git a/core/internal/consensus/ca/ca.go b/core/internal/consensus/ca/ca.go
index 20f7c31..9a1b634 100644
--- a/core/internal/consensus/ca/ca.go
+++ b/core/internal/consensus/ca/ca.go
@@ -17,6 +17,9 @@
// package ca implements a simple standards-compliant certificate authority.
// It only supports ed25519 keys, and does not maintain any persistent state.
//
+// The CA is backed by etcd storage, and can also bootstrap itself before etcd storage is running (committing its
+// in-memory secrets to etcd at a later date).
+//
// CA and certificates successfully pass https://github.com/zmap/zlint
// (minus the CA/B rules that a public CA would adhere to, which requires
// things like OCSP servers, Certificate Policies and ECDSA/RSA-only keys).
@@ -25,6 +28,7 @@
// TODO(leo): add zlint test
import (
+ "context"
"crypto"
"crypto/ed25519"
"crypto/rand"
@@ -32,13 +36,28 @@
"crypto/x509"
"crypto/x509/pkix"
"encoding/asn1"
+ "encoding/hex"
"errors"
"fmt"
"math/big"
"net"
"time"
+
+ "go.etcd.io/etcd/clientv3"
)
+const (
+ // TODO(q3k): move this to a declarative storage layer
+ pathCACertificate = "/etcd-ca/ca.der"
+ pathCAKey = "/etcd-ca/ca-key.der"
+ pathCACRL = "/etcd-ca/crl.der"
+ pathIssuedCertificates = "/etcd-ca/certs/"
+)
+
+func pathIssuedCertificate(serial *big.Int) string {
+ return pathIssuedCertificates + hex.EncodeToString(serial.Bytes())
+}
+
var (
// From RFC 5280 Section 4.1.2.5
unknownNotAfter = time.Unix(253402300799, 0)
@@ -46,11 +65,15 @@
type CA struct {
// TODO: Potentially protect the key with memguard
- PrivateKey *ed25519.PrivateKey
+ privateKey *ed25519.PrivateKey
CACert *x509.Certificate
CACertRaw []byte
- CRLRaw []byte
- Revoked []pkix.RevokedCertificate
+
+ // bootstrapIssued are certificates that have been issued by the CA before it has been successfully Saved to etcd.
+ bootstrapIssued [][]byte
+ // canBootstrapIssue is set on CAs that have been created by New and not yet stored to etcd. If not set,
+ // certificates cannot be issued in-memory.
+ canBootstrapIssue bool
}
// Workaround for https://github.com/golang/go/issues/26676 in Go's crypto/x509. Specifically Go
@@ -77,7 +100,8 @@
return skid[:], nil
}
-// New creates a new certificate authority with the given common name.
+// New creates a new certificate authority with the given common name. The newly created CA will be stored in memory
+// until committed to etcd by calling .Save.
func New(name string) (*CA, error) {
pubKey, privKey, err := ed25519.GenerateKey(rand.Reader)
if err != nil {
@@ -116,19 +140,44 @@
}
ca := &CA{
- PrivateKey: &privKey,
+ privateKey: &privKey,
CACertRaw: caCertRaw,
CACert: caCert,
- }
- if ca.ReissueCRL() != nil {
- return nil, fmt.Errorf("failed to create initial CRL: %w", err)
+
+ canBootstrapIssue: true,
}
return ca, nil
}
-// FromCertificates restores CA state.
-func FromCertificates(caCert []byte, caKey []byte, crl []byte) (*CA, error) {
+// Load restores CA state from etcd.
+func Load(ctx context.Context, kv clientv3.KV) (*CA, error) {
+ resp, err := kv.Txn(ctx).Then(
+ clientv3.OpGet(pathCACertificate),
+ clientv3.OpGet(pathCAKey),
+ // We only read the CRL to ensure it exists on etcd (and early fail on inconsistency)
+ clientv3.OpGet(pathCACRL)).Commit()
+ if err != nil {
+ return nil, fmt.Errorf("failed to retrieve CA from etcd: %w", err)
+ }
+
+ var caCert, caKey, caCRL []byte
+ for _, el := range resp.Responses {
+ for _, kv := range el.GetResponseRange().GetKvs() {
+ switch string(kv.Key) {
+ case pathCACertificate:
+ caCert = kv.Value
+ case pathCAKey:
+ caKey = kv.Value
+ case pathCACRL:
+ caCRL = kv.Value
+ }
+ }
+ }
+ if caCert == nil || caKey == nil || caCRL == nil {
+ return nil, fmt.Errorf("failed to retrieve CA from etcd, missing at least one of {ca key, ca crt, ca crl}")
+ }
+
if len(caKey) != ed25519.PrivateKeySize {
return nil, errors.New("invalid CA private key size")
}
@@ -136,26 +185,58 @@
caCertVal, err := x509.ParseCertificate(caCert)
if err != nil {
- return nil, err
- }
- crlVal, err := x509.ParseCRL(crl)
- if err != nil {
- return nil, err
+ return nil, fmt.Errorf("failed to parse CA certificate: %w", err)
}
return &CA{
- PrivateKey: &privateKey,
+ privateKey: &privateKey,
CACertRaw: caCert,
CACert: caCertVal,
- Revoked: crlVal.TBSCertList.RevokedCertificates,
}, nil
}
-// IssueCertificate issues a certificate
-func (ca *CA) IssueCertificate(commonName string, ip net.IP) (cert []byte, privkey []byte, err error) {
+// Save stores a newly created CA into etcd, committing both the CA data and any certificates issued until then.
+func (c *CA) Save(ctx context.Context, kv clientv3.KV) error {
+ crl, err := c.makeCRL(nil)
+ if err != nil {
+ return fmt.Errorf("failed to generate initial CRL: %w", err)
+ }
+
+ ops := []clientv3.Op{
+ clientv3.OpPut(pathCACertificate, string(c.CACertRaw)),
+ clientv3.OpPut(pathCAKey, string([]byte(*c.privateKey))),
+ clientv3.OpPut(pathCACRL, string(crl)),
+ }
+ for i, certRaw := range c.bootstrapIssued {
+ cert, err := x509.ParseCertificate(certRaw)
+ if err != nil {
+ return fmt.Errorf("failed to parse in-memory certificate %d", i)
+ }
+ ops = append(ops, clientv3.OpPut(pathIssuedCertificate(cert.SerialNumber), string(certRaw)))
+ }
+
+ res, err := kv.Txn(ctx).If(
+ clientv3.Compare(clientv3.CreateRevision(pathCAKey), "=", 0),
+ ).Then(ops...).Commit()
+ if err != nil {
+ return fmt.Errorf("failed to store CA to etcd: %w", err)
+ }
+ if !res.Succeeded {
+ // This should pretty much never happen, but we want to catch it just in case.
+ return fmt.Errorf("failed to store CA to etcd: CA already present - cluster-level data inconsistency")
+ }
+ c.bootstrapIssued = nil
+ c.canBootstrapIssue = false
+ return nil
+}
+
+// Issue issues a certificate. If kv is non-nil, the newly issued certificate will be immediately stored to etcd,
+// otherwise it will be kept in memory (until .Save is called). Certificates can only be issued to memory on
+// newly-created CAs that have not been saved to etcd yet.
+func (c *CA) Issue(ctx context.Context, kv clientv3.KV, commonName string, externalAddress net.IP) (cert []byte, privkey []byte, err error) {
serialNumberLimit := new(big.Int).Lsh(big.NewInt(1), 127)
serialNumber, err := rand.Int(rand.Reader, serialNumberLimit)
if err != nil {
- err = fmt.Errorf("Failed to generate serial number: %w", err)
+ err = fmt.Errorf("failed to generate serial number: %w", err)
return
}
@@ -180,30 +261,180 @@
NotAfter: unknownNotAfter,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth},
DNSNames: []string{commonName},
- IPAddresses: []net.IP{ip},
+ IPAddresses: []net.IP{externalAddress},
}
- cert, err = x509.CreateCertificate(rand.Reader, etcdCert, ca.CACert, pubKey, ca.PrivateKey)
+ cert, err = x509.CreateCertificate(rand.Reader, etcdCert, c.CACert, pubKey, c.privateKey)
+ if err != nil {
+ err = fmt.Errorf("failed to sign new certificate: %w", err)
+ return
+ }
+
+ if kv != nil {
+ path := pathIssuedCertificate(serialNumber)
+ _, err = kv.Put(ctx, path, string(cert))
+ if err != nil {
+ err = fmt.Errorf("failed to commit new certificate to etcd: %w", err)
+ return
+ }
+ } else {
+ if !c.canBootstrapIssue {
+ err = fmt.Errorf("cannot issue new certificate to memory on existing, etcd-backed CA")
+ return
+ }
+ c.bootstrapIssued = append(c.bootstrapIssued, cert)
+ }
return
}
-func (ca *CA) ReissueCRL() error {
- newCRL, err := ca.CACert.CreateCRL(rand.Reader, ca.PrivateKey, ca.Revoked, time.Now(), unknownNotAfter)
+func (c *CA) makeCRL(revoked []pkix.RevokedCertificate) ([]byte, error) {
+ crl, err := c.CACert.CreateCRL(rand.Reader, c.privateKey, revoked, time.Now(), unknownNotAfter)
if err != nil {
- return err
+ return nil, fmt.Errorf("failed to generate CRL: %w", err)
}
- ca.CRLRaw = newCRL
- return nil
+ return crl, nil
}
-func (ca *CA) Revoke(serial *big.Int) error {
- for _, revokedCert := range ca.Revoked {
+// Revoke revokes a certificate by hostname. The serial number of the first issued certificate found for that hostname
+// is added to the CRL stored in etcd. This call might fail (safely) if a simultaneous revoke happened that caused the
+// CRL to be bumped. The call can then be retried safely.
+func (c *CA) Revoke(ctx context.Context, kv clientv3.KV, hostname string) error {
+ res, err := kv.Txn(ctx).Then(
+ clientv3.OpGet(pathCACRL),
+ clientv3.OpGet(pathIssuedCertificates, clientv3.WithPrefix())).Commit()
+ if err != nil {
+ return fmt.Errorf("failed to retrieve certificates and CRL from etcd: %w", err)
+ }
+
+ var certs []*x509.Certificate
+ var crlRevision int64
+ var crl *pkix.CertificateList
+ for _, el := range res.Responses {
+ for _, kv := range el.GetResponseRange().GetKvs() {
+ if string(kv.Key) == pathCACRL {
+ crl, err = x509.ParseCRL(kv.Value)
+ if err != nil {
+ return fmt.Errorf("could not parse CRL from etcd: %w", err)
+ }
+ crlRevision = kv.CreateRevision
+ } else {
+ cert, err := x509.ParseCertificate(kv.Value)
+ if err != nil {
+ return fmt.Errorf("could not parse certificate %q from etcd: %w", string(kv.Key), err)
+ }
+ certs = append(certs, cert)
+ }
+ }
+ }
+
+ if crl == nil {
+ return fmt.Errorf("could not find CRL in etcd")
+ }
+ revoked := crl.TBSCertList.RevokedCertificates
+
+ // Find requested hostname in issued certificates.
+ var serial *big.Int
+ for _, cert := range certs {
+ for _, dnsName := range cert.DNSNames {
+ if dnsName == hostname {
+ serial = cert.SerialNumber
+ break
+ }
+ }
+ if serial != nil {
+ break
+ }
+ }
+ if serial == nil {
+ return fmt.Errorf("could not find requested hostname")
+ }
+
+ // Check if certificate has already been revoked.
+ for _, revokedCert := range revoked {
if revokedCert.SerialNumber.Cmp(serial) == 0 {
return nil // Already revoked
}
}
- ca.Revoked = append(ca.Revoked, pkix.RevokedCertificate{
+
+ revoked = append(revoked, pkix.RevokedCertificate{
SerialNumber: serial,
RevocationTime: time.Now(),
})
- return ca.ReissueCRL()
+
+ crlRaw, err := c.makeCRL(revoked)
+ if err != nil {
+ return fmt.Errorf("when generating new CRL for revocation: %w", err)
+ }
+
+ res, err = kv.Txn(ctx).If(
+ clientv3.Compare(clientv3.CreateRevision(pathCACRL), "=", crlRevision),
+ ).Then(
+ clientv3.OpPut(pathCACRL, string(crlRaw)),
+ ).Commit()
+ if err != nil {
+ return fmt.Errorf("when saving new CRL: %w", err)
+ }
+ if !res.Succeeded {
+ return fmt.Errorf("CRL save transaction failed, retry possibly")
+ }
+
+ return nil
+}
+
+// WaitCRLChange returns a channel that will receive a CRLUpdate any time the remote CRL changed. Immediately after
+// calling this method, the current CRL is retrieved from the cluster and put into the channel.
+func (c *CA) WaitCRLChange(ctx context.Context, kv clientv3.KV, w clientv3.Watcher) <-chan CRLUpdate {
+ C := make(chan CRLUpdate)
+
+ go func(ctx context.Context) {
+ ctxC, cancel := context.WithCancel(ctx)
+ defer cancel()
+
+ fail := func(f string, args ...interface{}) {
+ C <- CRLUpdate{Err: fmt.Errorf(f, args...)}
+ close(C)
+ }
+
+ initial, err := kv.Get(ctx, pathCACRL)
+ if err != nil {
+ fail("failed to retrieve initial CRL: %w", err)
+ return
+ }
+
+ C <- CRLUpdate{CRL: initial.Kvs[0].Value}
+
+ for wr := range w.Watch(ctxC, pathCACRL, clientv3.WithRev(initial.Kvs[0].CreateRevision)) {
+ if wr.Err() != nil {
+ fail("failed watching CRL: %w", wr.Err())
+ return
+ }
+
+ for _, e := range wr.Events {
+ if string(e.Kv.Key) != pathCACRL {
+ continue
+ }
+
+ C <- CRLUpdate{CRL: e.Kv.Value}
+ }
+ }
+ }(ctx)
+
+ return C
+}
+
+// CRLUpdate is emitted for every remote CRL change, and spuriously on every new WaitCRLChange.
+type CRLUpdate struct {
+ // The new (or existing, in the case of the first call) CRL. If nil, Err will be set.
+ CRL []byte
+ // If set, an error occurred and the WaitCRLChange call must be restarted. If set, CRL will be nil.
+ Err error
+}
+
+// GetCurrentCRL returns the current CRL for the CA. This should only be used for one-shot operations like
+// bootstrapping a new node that doesn't yet have access to etcd - otherwise, WaitCRLChange should be used.
+func (c *CA) GetCurrentCRL(ctx context.Context, kv clientv3.KV) ([]byte, error) {
+ initial, err := kv.Get(ctx, pathCACRL)
+ if err != nil {
+ return nil, fmt.Errorf("failed to retrieve initial CRL: %w", err)
+ }
+ return initial.Kvs[0].Value, nil
}
diff --git a/core/internal/consensus/consensus.go b/core/internal/consensus/consensus.go
index 5885aa8..94d84b2 100644
--- a/core/internal/consensus/consensus.go
+++ b/core/internal/consensus/consensus.go
@@ -14,43 +14,42 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-// package consensus manages the embedded etcd cluster.
+// Package consensus implements a managed etcd cluster member service, with a self-hosted CA system for issuing peer
+// certificates. Currently each Smalltown node runs an etcd member, and connects to the etcd member locally over a unix
+// domain socket.
+//
+// The service supports two modes of startup:
+// - initializing a new cluster, by bootstrapping the CA in memory, starting a cluster, committing the CA to etcd
+// afterwards, and saving the new node's certificate to local storage
+// - joining an existing cluster, using certificates from local storage and loading the CA from etcd. This flow is also
+// used when the node joins a cluster for the first time (then the certificates required must be provisioned
+// externally before starting the consensus service).
+//
+// Regardless of how the etcd member service was started, the resulting running service is further managed and used
+// in the same way.
+//
package consensus
import (
- "bytes"
"context"
- "crypto/x509"
- "encoding/binary"
- "encoding/hex"
"encoding/pem"
"fmt"
- "io/ioutil"
- "math/rand"
"net"
"net/url"
- "os"
- "path"
- "path/filepath"
- "strings"
+ "sync"
"time"
- "github.com/pkg/errors"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/namespace"
"go.etcd.io/etcd/embed"
- "go.etcd.io/etcd/etcdserver/api/membership"
- "go.etcd.io/etcd/pkg/types"
- "go.etcd.io/etcd/proxy/grpcproxy/adapter"
"go.uber.org/atomic"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
- "golang.org/x/sys/unix"
- "git.monogon.dev/source/nexantic.git/core/generated/api"
"git.monogon.dev/source/nexantic.git/core/internal/common"
- "git.monogon.dev/source/nexantic.git/core/internal/common/service"
+ "git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
"git.monogon.dev/source/nexantic.git/core/internal/consensus/ca"
+ "git.monogon.dev/source/nexantic.git/core/internal/localstorage"
)
const (
@@ -58,106 +57,98 @@
DefaultLogger = "zap"
)
-const (
- CAPath = "ca.pem"
- CertPath = "cert.pem"
- KeyPath = "cert-key.pem"
- CRLPath = "ca-crl.der"
- CRLSwapPath = "ca-crl.der.swp"
-)
+// Service is the etcd cluster member service.
+type Service struct {
+ // The configuration with which the service was started. This is immutable.
+ config *Config
-const (
- LocalListenerURL = "unix:///consensus/listener.sock:0"
-)
+ // stateMu guards state. This is locked internally on public methods of Service that require access to state. The
+ // state might be recreated on service restart.
+ stateMu sync.Mutex
+ state *state
+}
-type (
- Service struct {
- *service.BaseService
+// state is the runtime state of a running etcd member.
+type state struct {
+ etcd *embed.Etcd
+ ready atomic.Bool
- etcd *embed.Etcd
- kv clientv3.KV
- ready atomic.Bool
+ ca *ca.CA
+ // cl is an etcd client that loops back to the locally running etcd server. This runs over the Client unix domain
+ // socket that etcd starts.
+ cl *clientv3.Client
+}
- // bootstrapCA and bootstrapCert cache the etcd cluster CA data during bootstrap.
- bootstrapCA *ca.CA
- bootstrapCert []byte
+type Config struct {
+ // Data directory (persistent, encrypted storage) for etcd.
+ Data *localstorage.DataEtcdDirectory
+ // Ephemeral directory for etcd.
+ Ephemeral *localstorage.EphemeralConsensusDirectory
- watchCRLTicker *time.Ticker
- lastCRL []byte
+ // Name is the cluster name. This must be the same amongst all etcd members within one cluster.
+ Name string
+ // NewCluster selects whether the etcd member will start a new cluster and bootstrap a CA and the first member
+ // certificate, or load existing PKI certificates from disk.
+ NewCluster bool
+ // InitialCluster sets the initial cluster peer URLs when NewCluster is set, and is ignored otherwise. Usually this
+ // will be just the new, single server, and more members will be added later.
+ InitialCluster string
+ // ExternalHost is the IP address or hostname at which this cluster member is reachable to other cluster members.
+ ExternalHost string
+ // ListenHost is the IP address or hostname at which this cluster member will listen.
+ ListenHost string
+ // Port is the port at which this cluster member will listen for other members. If zero, defaults to the global
+ // Smalltown setting.
+ Port int
+}
- config *Config
- }
-
- Config struct {
- Name string
- DataDir string
- InitialCluster string
- NewCluster bool
- ExternalHost string
- ListenHost string
- }
-
- Member struct {
- ID uint64
- Name string
- Address string
- Synced bool
- }
-)
-
-func NewConsensusService(config Config, logger *zap.Logger) (*Service, error) {
- consensusServer := &Service{
+func New(config Config) *Service {
+ return &Service{
config: &config,
}
- consensusServer.BaseService = service.NewBaseService("consensus", logger, consensusServer)
-
- return consensusServer, nil
}
-func peerURL(host string) url.URL {
- return url.URL{Scheme: "https", Host: fmt.Sprintf("%s:%d", host, common.ConsensusPort)}
-}
+// configure transforms the service configuration into an embedded etcd configuration. Note that it also creates the
+// ephemeral and data directories (via MkdirAll) before building the configuration.
+func (s *Service) configure(ctx context.Context) (*embed.Config, error) {
+ if err := s.config.Ephemeral.MkdirAll(0700); err != nil {
+ return nil, fmt.Errorf("failed to create ephemeral directory: %w", err)
+ }
+ if err := s.config.Data.MkdirAll(0700); err != nil {
+ return nil, fmt.Errorf("failed to create data directory: %w", err)
+ }
-func (s *Service) OnStart() error {
- // See: https://godoc.org/github.com/coreos/etcd/embed#Config
-
- if s.config == nil {
- return errors.New("config for consensus is nil")
+ port := s.config.Port
+ if port == 0 {
+ port = common.ConsensusPort
}
cfg := embed.NewConfig()
- cfg.PeerTLSInfo.CertFile = filepath.Join(s.config.DataDir, CertPath)
- cfg.PeerTLSInfo.KeyFile = filepath.Join(s.config.DataDir, KeyPath)
- cfg.PeerTLSInfo.TrustedCAFile = filepath.Join(s.config.DataDir, CAPath)
- cfg.PeerTLSInfo.ClientCertAuth = true
- cfg.PeerTLSInfo.CRLFile = filepath.Join(s.config.DataDir, CRLPath)
-
- lastCRL, err := ioutil.ReadFile(cfg.PeerTLSInfo.CRLFile)
- if err != nil {
- return fmt.Errorf("failed to read etcd CRL: %w", err)
- }
- s.lastCRL = lastCRL
-
- // Expose etcd to local processes
- if err := os.MkdirAll("/consensus", 0700); err != nil {
- return fmt.Errorf("Failed to create consensus runtime state directory: %w", err)
- }
- listenerURL, err := url.Parse(LocalListenerURL)
- if err != nil {
- panic(err)
- }
- cfg.LCUrls = []url.URL{*listenerURL}
-
- cfg.APUrls = []url.URL{peerURL(s.config.ExternalHost)}
- cfg.LPUrls = []url.URL{peerURL(s.config.ListenHost)}
- cfg.ACUrls = []url.URL{}
-
- cfg.Dir = s.config.DataDir
- cfg.InitialClusterToken = DefaultClusterToken
cfg.Name = s.config.Name
+ cfg.Dir = s.config.Data.Data.FullPath()
+ cfg.InitialClusterToken = DefaultClusterToken
- // Only relevant if creating or joining a cluster; otherwise settings will be ignored
+ cfg.PeerTLSInfo.CertFile = s.config.Data.PeerPKI.Certificate.FullPath()
+ cfg.PeerTLSInfo.KeyFile = s.config.Data.PeerPKI.Key.FullPath()
+ cfg.PeerTLSInfo.TrustedCAFile = s.config.Data.PeerPKI.CACertificate.FullPath()
+ cfg.PeerTLSInfo.ClientCertAuth = true
+ cfg.PeerTLSInfo.CRLFile = s.config.Data.PeerCRL.FullPath()
+
+ cfg.LCUrls = []url.URL{{
+ Scheme: "unix",
+ Path: s.config.Ephemeral.ClientSocket.FullPath() + ":0",
+ }}
+ cfg.ACUrls = []url.URL{}
+ cfg.LPUrls = []url.URL{{
+ Scheme: "https",
+ Host: fmt.Sprintf("%s:%d", s.config.ListenHost, port),
+ }}
+ cfg.APUrls = []url.URL{{
+ Scheme: "https",
+ Host: fmt.Sprintf("%s:%d", s.config.ExternalHost, port),
+ }}
+
if s.config.NewCluster {
cfg.ClusterState = "new"
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
@@ -166,339 +157,279 @@
cfg.InitialCluster = s.config.InitialCluster
}
+ logger := supervisor.Logger(ctx)
cfg.Logger = DefaultLogger
cfg.ZapLoggerBuilder = embed.NewZapCoreLoggerBuilder(
- s.Logger.With(zap.String("component", "etcd")).WithOptions(zap.IncreaseLevel(zapcore.WarnLevel)),
- s.Logger.Core(),
+ logger.With(zap.String("component", "etcd")).WithOptions(zap.IncreaseLevel(zapcore.WarnLevel)),
+ logger.Core(),
nil,
)
+ return cfg, nil
+}
+
+// Run is a Supervisor runnable that starts the etcd member service. It will become healthy once the member joins the
+// cluster successfully.
+func (s *Service) Run(ctx context.Context) error {
+ st := &state{
+ ready: *atomic.NewBool(false),
+ }
+ s.stateMu.Lock()
+ s.state = st
+ s.stateMu.Unlock()
+
+ if s.config.NewCluster {
+ // Expect certificate to be absent from disk.
+ absent, err := s.config.Data.PeerPKI.AllAbsent()
+ if err != nil {
+ return fmt.Errorf("checking certificate existence: %w", err)
+ }
+ if !absent {
+ return fmt.Errorf("want new cluster, but certificates already exist on disk")
+ }
+
+ // Generate CA, keep in memory, write it down in etcd later.
+ st.ca, err = ca.New("Smalltown etcd peer Root CA")
+ if err != nil {
+ return fmt.Errorf("when creating new cluster's peer CA: %w", err)
+ }
+
+ ip := net.ParseIP(s.config.ExternalHost)
+ if ip == nil {
+ return fmt.Errorf("configued external host is not an IP address (got %q)", s.config.ExternalHost)
+ }
+
+ cert, key, err := st.ca.Issue(ctx, nil, s.config.Name, ip)
+ if err != nil {
+ return fmt.Errorf("when issuing new cluster's first certificate: %w", err)
+ }
+
+ if err := s.config.Data.PeerPKI.MkdirAll(0600); err != nil {
+ return fmt.Errorf("when creating PKI directory: %w", err)
+ }
+ if err := s.config.Data.PeerPKI.CACertificate.Write(pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: st.ca.CACertRaw}), 0600); err != nil {
+ return fmt.Errorf("when writing CA certificate to disk: %w", err)
+ }
+ if err := s.config.Data.PeerPKI.Certificate.Write(pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: cert}), 0600); err != nil {
+ return fmt.Errorf("when writing certificate to disk: %w", err)
+ }
+ if err := s.config.Data.PeerPKI.Key.Write(pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: key}), 0600); err != nil {
+ return fmt.Errorf("when writing certificate to disk: %w", err)
+ }
+ } else {
+ // Expect certificate to be present on disk.
+ present, err := s.config.Data.PeerPKI.AllExist()
+ if err != nil {
+ return fmt.Errorf("checking certificate existence: %w", err)
+ }
+ if !present {
+ return fmt.Errorf("want existing cluster, but certificate is missing from disk")
+ }
+ }
+
+ if err := s.config.Data.MkdirAll(0600); err != nil {
+ return fmt.Errorf("failed to create data directory; %w", err)
+ }
+
+ cfg, err := s.configure(ctx)
+ if err != nil {
+ return fmt.Errorf("when configuring etcd: %w", err)
+ }
+
server, err := embed.StartEtcd(cfg)
- if err != nil {
- return err
- }
- s.etcd = server
-
- // Override the logger
- //*server.GetLogger() = *s.Logger.With(zap.String("component", "etcd"))
- // TODO(leo): can we uncomment this?
-
- go func() {
- s.Logger.Info("waiting for etcd to become ready")
- <-s.etcd.Server.ReadyNotify()
- s.ready.Store(true)
- s.Logger.Info("etcd is now ready")
+ keep := false
+ defer func() {
+ if !keep && server != nil {
+ server.Close()
+ }
}()
-
- // Inject kv client
- s.kv = clientv3.NewKVFromKVClient(adapter.KvServerToKvClient(s.etcd.Server), nil)
-
- // Start CRL watcher
- go s.watchCRL()
- ctx := context.TODO()
- go s.autoPromote(ctx)
-
- return nil
-}
-
-// WriteCertificateFiles writes the given node certificate data to local storage
-// such that it can be used by the embedded etcd server.
-// Unfortunately, we cannot pass the certificates directly to etcd.
-func (s *Service) WriteCertificateFiles(certs *api.ConsensusCertificates) error {
- if err := ioutil.WriteFile(filepath.Join(s.config.DataDir, CRLPath), certs.Crl, 0600); err != nil {
- return err
- }
- if err := ioutil.WriteFile(filepath.Join(s.config.DataDir, CertPath),
- pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certs.Cert}), 0600); err != nil {
- return err
- }
- if err := ioutil.WriteFile(filepath.Join(s.config.DataDir, KeyPath),
- pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: certs.Key}), 0600); err != nil {
- return err
- }
- if err := ioutil.WriteFile(filepath.Join(s.config.DataDir, CAPath),
- pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certs.Ca}), 0600); err != nil {
- return err
- }
- return nil
-}
-
-// PrecreateCA generates the etcd cluster certificate authority and writes it to local storage.
-func (s *Service) PrecreateCA(extIP net.IP) error {
- // Provision an etcd CA
- etcdRootCA, err := ca.New("Smalltown etcd Root CA")
if err != nil {
- return err
+ return fmt.Errorf("failed to start etcd: %w", err)
}
- cert, privkey, err := etcdRootCA.IssueCertificate(s.config.ExternalHost, extIP)
- if err != nil {
- return fmt.Errorf("failed to self-issue a certificate: %w", err)
- }
- if err := os.MkdirAll(s.config.DataDir, 0700); err != nil {
- return fmt.Errorf("failed to create consensus data dir: %w", err)
- }
- // Preserve certificate for later injection
- s.bootstrapCert = cert
- if err := s.WriteCertificateFiles(&api.ConsensusCertificates{
- Ca: etcdRootCA.CACertRaw,
- Crl: etcdRootCA.CRLRaw,
- Cert: cert,
- Key: privkey,
- }); err != nil {
- return fmt.Errorf("failed to setup certificates: %w", err)
- }
- s.bootstrapCA = etcdRootCA
- return nil
-}
+ st.etcd = server
-const (
- caPathEtcd = "/etcd-ca/ca.der"
- caKeyPathEtcd = "/etcd-ca/ca-key.der"
- crlPathEtcd = "/etcd-ca/crl.der"
+ supervisor.Logger(ctx).Info("waiting for etcd...")
- // This prefix stores the individual certs the etcd CA has issued.
- certPrefixEtcd = "/etcd-ca/certs"
-)
-
-// InjectCA copies the CA from data cached during PrecreateCA to etcd.
-// Requires a previous call to PrecreateCA.
-func (s *Service) InjectCA() error {
- if s.bootstrapCA == nil || s.bootstrapCert == nil {
- panic("bootstrapCA or bootstrapCert are nil - missing PrecreateCA call?")
- }
- if _, err := s.kv.Put(context.Background(), caPathEtcd, string(s.bootstrapCA.CACertRaw)); err != nil {
- return err
- }
- // TODO(lorenz): Should be wrapped by the master key
- if _, err := s.kv.Put(context.Background(), caKeyPathEtcd, string([]byte(*s.bootstrapCA.PrivateKey))); err != nil {
- return err
- }
- if _, err := s.kv.Put(context.Background(), crlPathEtcd, string(s.bootstrapCA.CRLRaw)); err != nil {
- return err
- }
- certVal, err := x509.ParseCertificate(s.bootstrapCert)
- if err != nil {
- return err
- }
- serial := hex.EncodeToString(certVal.SerialNumber.Bytes())
- if _, err := s.kv.Put(context.Background(), path.Join(certPrefixEtcd, serial), string(s.bootstrapCert)); err != nil {
- return fmt.Errorf("failed to persist certificate: %w", err)
- }
- // Clear out bootstrap CA after injecting
- s.bootstrapCA = nil
- s.bootstrapCert = []byte{}
- return nil
-}
-
-func (s *Service) etcdGetSingle(path string) ([]byte, int64, error) {
- res, err := s.kv.Get(context.Background(), path)
- if err != nil {
- return nil, -1, fmt.Errorf("failed to get key from etcd: %w", err)
- }
- if len(res.Kvs) != 1 {
- return nil, -1, errors.New("key not available or multiple keys returned")
- }
- return res.Kvs[0].Value, res.Kvs[0].ModRevision, nil
-}
-
-func (s *Service) getCAFromEtcd() (*ca.CA, int64, error) {
- // TODO: Technically this could be done in a single request, but it's more logic
- caCert, _, err := s.etcdGetSingle(caPathEtcd)
- if err != nil {
- return nil, -1, fmt.Errorf("failed to get CA certificate from etcd: %w", err)
- }
- caKey, _, err := s.etcdGetSingle(caKeyPathEtcd)
- if err != nil {
- return nil, -1, fmt.Errorf("failed to get CA key from etcd: %w", err)
- }
- // TODO: Unwrap CA key once wrapping is implemented
- crl, crlRevision, err := s.etcdGetSingle(crlPathEtcd)
- if err != nil {
- return nil, -1, fmt.Errorf("failed to get CRL from etcd: %w", err)
- }
- idCA, err := ca.FromCertificates(caCert, caKey, crl)
- if err != nil {
- return nil, -1, fmt.Errorf("failed to take CA online: %w", err)
- }
- return idCA, crlRevision, nil
-}
-
-// ProvisionMember sets up and returns provisioning data to join another node into the consensus.
-// It issues PKI material, creates a static cluster bootstrap specification string (known as initial-cluster in etcd)
-// and adds the new node as a learner (non-voting) member to the cluster. Once the new node has caught up with the
-// cluster it is automatically promoted to a voting member by the autoPromote process.
-func (s *Service) ProvisionMember(name string, ip net.IP) (*api.ConsensusCertificates, string, error) {
- idCA, _, err := s.getCAFromEtcd()
- if err != nil {
- return nil, "", fmt.Errorf("failed to get consensus CA: %w", err)
- }
- cert, key, err := idCA.IssueCertificate(name, ip)
- if err != nil {
- return nil, "", fmt.Errorf("failed to issue certificate: %w", err)
- }
- certVal, err := x509.ParseCertificate(cert)
- if err != nil {
- return nil, "", fmt.Errorf("failed to parse just-issued consensus cert: %w", err)
- }
- serial := hex.EncodeToString(certVal.SerialNumber.Bytes())
- if _, err := s.kv.Put(context.Background(), path.Join(certPrefixEtcd, serial), string(cert)); err != nil {
- // We issued a certificate, but failed to persist it. Return an error and forget it ever happened.
- return nil, "", fmt.Errorf("failed to persist certificate: %w", err)
+ okay := true
+ select {
+ case <-st.etcd.Server.ReadyNotify():
+ case <-ctx.Done():
+ okay = false
}
- currentMembers := s.etcd.Server.Cluster().Members()
- var memberStrs []string
- for _, member := range currentMembers {
- memberStrs = append(memberStrs, fmt.Sprintf("%v=%v", member.Name, member.PickPeerURL()))
- }
- apURL := peerURL(ip.String())
- memberStrs = append(memberStrs, fmt.Sprintf("%s=%s", name, apURL.String()))
-
- pubKeyPrefix, err := common.IDKeyPrefixFromName(name)
- if err != nil {
- return nil, "", fmt.Errorf("invalid new node name: %v", err)
+ if !okay {
+ supervisor.Logger(ctx).Info("context done, aborting wait")
+ return ctx.Err()
}
- crl, _, err := s.etcdGetSingle(crlPathEtcd)
-
- _, err = s.etcd.Server.AddMember(context.Background(), membership.Member{
- RaftAttributes: membership.RaftAttributes{
- PeerURLs: types.URLs{apURL}.StringSlice(),
- IsLearner: true,
- },
- Attributes: membership.Attributes{Name: name},
- ID: types.ID(binary.BigEndian.Uint64(pubKeyPrefix[:8])),
+ socket := s.config.Ephemeral.ClientSocket.FullPath()
+ cl, err := clientv3.New(clientv3.Config{
+ Endpoints: []string{fmt.Sprintf("unix://%s:0", socket)},
+ DialTimeout: time.Second,
})
if err != nil {
- return nil, "", fmt.Errorf("failed to provision member: %w", err)
+ return fmt.Errorf("failed to connect to new etcd instance: %w", err)
}
- return &api.ConsensusCertificates{
- Ca: idCA.CACertRaw,
- Cert: cert,
- Crl: crl,
- Key: key,
- }, strings.Join(memberStrs, ","), nil
+ st.cl = cl
+
+ if s.config.NewCluster {
+ if st.ca == nil {
+ panic("peerCA has not been generated")
+ }
+
+ // Save new CA into etcd.
+ err = st.ca.Save(ctx, cl.KV)
+ if err != nil {
+ return fmt.Errorf("failed to save new CA to etcd: %w", err)
+ }
+ } else {
+ // Load existing CA from etcd.
+ st.ca, err = ca.Load(ctx, cl.KV)
+ if err != nil {
+ return fmt.Errorf("failed to load CA from etcd: %w", err)
+ }
+ }
+
+ // Start CRL watcher.
+ if err := supervisor.Run(ctx, "crl", s.watchCRL); err != nil {
+ return fmt.Errorf("failed to start CRL watcher: %w", err)
+ }
+ // Start autopromoter.
+ if err := supervisor.Run(ctx, "autopromoter", s.autopromoter); err != nil {
+ return fmt.Errorf("failed to start autopromoter: %w", err)
+ }
+
+ supervisor.Logger(ctx).Info("etcd is now ready")
+ keep = true
+ st.ready.Store(true)
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+
+ <-ctx.Done()
+ st.etcd.Close()
+ return ctx.Err()
}
-func (s *Service) RevokeCertificate(hostname string) error {
- rand.Seed(time.Now().UnixNano())
- for {
- idCA, crlRevision, err := s.getCAFromEtcd()
- if err != nil {
- return err
+// watchCRL is a sub-runnable of the etcd cluster member service that updates the on-local-storage CRL to match the
+// newest available version in etcd. It signals healthy before entering the watch loop, and returns an error as soon
+// as either the watch or the local write fails.
+func (s *Service) watchCRL(ctx context.Context) error {
+ // Take the etcd client and CA from the shared state under the lock, then release it before the long-lived loop.
+ s.stateMu.Lock()
+ cl := s.state.cl
+ ca := s.state.ca
+ s.stateMu.Unlock()
+
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ for e := range ca.WaitCRLChange(ctx, cl.KV, cl.Watcher) {
+ if e.Err != nil {
+ return fmt.Errorf("watching CRL: %w", e.Err)
}
- allIssuedCerts, err := s.kv.Get(context.Background(), certPrefixEtcd, clientv3.WithPrefix())
- for _, cert := range allIssuedCerts.Kvs {
- certVal, err := x509.ParseCertificate(cert.Value)
- if err != nil {
- s.Logger.Error("Failed to parse previously issued certificate, this is a security risk", zap.Error(err))
+
+ // Every event without an error carries a CRL, which replaces the file on local storage.
+ if err := s.config.Data.PeerCRL.Write(e.CRL, 0600); err != nil {
+ return fmt.Errorf("saving CRL: %w", err)
+ }
+ }
+
+ // NOTE(review): this is only reached if the channel returned by WaitCRLChange gets closed without delivering an
+ // error event first. The original author marked it unreachable - confirm against WaitCRLChange.
+ return nil
+}
+
+// autopromoter is a runnable that, every 5 seconds, tries to promote all learner (non-voting) members of the cluster
+// to voting members. Promotion is only attempted while this member is the leader. It runs until ctx is done and then
+// returns ctx.Err().
+func (s *Service) autopromoter(ctx context.Context) error {
+ t := time.NewTicker(5 * time.Second)
+ defer t.Stop()
+
+ // autopromote performs a single promotion pass over the current member list.
+ autopromote := func() {
+ s.stateMu.Lock()
+ st := s.state
+ s.stateMu.Unlock()
+
+ // Only act when the current leader is this member.
+ if st.etcd.Server.Leader() != st.etcd.Server.ID() {
+ return
+ }
+
+ for _, member := range st.etcd.Server.Cluster().Members() {
+ if !member.IsLearner {
continue
}
- for _, dnsName := range certVal.DNSNames {
- if dnsName == hostname {
- // Revoke this
- if err := idCA.Revoke(certVal.SerialNumber); err != nil {
- // We need to fail if any single revocation fails otherwise outer applications
- // have no chance of calling this safely
- return err
- }
- }
- }
- }
- // TODO(leo): this needs a test
- cmp := clientv3.Compare(clientv3.ModRevision(crlPathEtcd), "=", crlRevision)
- op := clientv3.OpPut(crlPathEtcd, string(idCA.CRLRaw))
- res, err := s.kv.Txn(context.Background()).If(cmp).Then(op).Commit()
- if err != nil {
- return fmt.Errorf("failed to persist new CRL in etcd: %w", err)
- }
- if res.Succeeded { // Transaction has succeeded
- break
- }
- // Sleep a random duration between 0 and 300ms to reduce serialization failures
- time.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond)
- }
- return nil
-}
-func (s *Service) watchCRL() {
- // TODO(lorenz): Change etcd client to WatchableKV and make this an actual watch
- // This needs changes in more places, so leaving it now
- s.watchCRLTicker = time.NewTicker(30 * time.Second)
- for range s.watchCRLTicker.C {
- crl, _, err := s.etcdGetSingle(crlPathEtcd)
- if err != nil {
- s.Logger.Warn("Failed to check for new CRL", zap.Error(err))
- continue
- }
- // This is cryptographic material but not secret, so no constant time compare necessary here
- if !bytes.Equal(crl, s.lastCRL) {
- if err := ioutil.WriteFile(filepath.Join(s.config.DataDir, CRLSwapPath), crl, 0600); err != nil {
- s.Logger.Warn("Failed to write updated CRL", zap.Error(err))
- }
- // This uses unix.Rename to guarantee a particular atomic update behavior
- if err := unix.Rename(filepath.Join(s.config.DataDir, CRLSwapPath), filepath.Join(s.config.DataDir, CRLPath)); err != nil {
- s.Logger.Warn("Failed to atomically swap updated CRL", zap.Error(err))
+ // We always call PromoteMember since the metadata necessary to decide if we should is private.
+ // Luckily etcd already does sanity checks internally and will refuse to promote nodes that aren't
+ // connected or are still behind on transactions.
+ if _, err := st.etcd.Server.PromoteMember(ctx, uint64(member.ID)); err != nil {
+ supervisor.Logger(ctx).Info("Failed to promote consensus node", zap.String("node", member.Name), zap.Error(err))
+ } else {
+ supervisor.Logger(ctx).Info("Promoted new consensus node", zap.String("node", member.Name))
}
}
}
-}
-// autoPromote automatically promotes learning (non-voting) members to voting members. etcd currently lacks auto-promote
-// capabilities (https://github.com/etcd-io/etcd/issues/10537) so we need to do this ourselves.
-func (s *Service) autoPromote(ctx context.Context) {
- promoteTicker := time.NewTicker(5 * time.Second)
- go func() {
- <-ctx.Done()
- promoteTicker.Stop()
- }()
- for range promoteTicker.C {
- if s.etcd.Server.Leader() != s.etcd.Server.ID() {
- continue
- }
- for _, member := range s.etcd.Server.Cluster().Members() {
- if member.IsLearner {
- // We always call PromoteMember since the metadata necessary to decide if we should is private.
- // Luckily etcd already does sanity checks internally and will refuse to promote nodes that aren't
- // connected or are still behind on transactions.
- if _, err := s.etcd.Server.PromoteMember(context.Background(), uint64(member.ID)); err != nil {
- s.Logger.Info("Failed to promote consensus node", zap.String("node", member.Name), zap.Error(err))
- }
- s.Logger.Info("Promoted new consensus node", zap.String("node", member.Name))
- }
+ // Run one promotion pass per tick until the context is canceled.
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-t.C:
+ autopromote()
}
}
}
-func (s *Service) OnStop() error {
- s.watchCRLTicker.Stop()
- s.etcd.Close()
-
- return nil
-}
-
-// IsProvisioned returns whether the node has been setup before and etcd has a data directory
-func (s *Service) IsProvisioned() bool {
- _, err := os.Stat(s.config.DataDir)
-
- return !os.IsNotExist(err)
-}
-
// IsReady returns whether etcd is ready and synced
func (s *Service) IsReady() bool {
- return s.ready.Load()
+ s.stateMu.Lock()
+ defer s.stateMu.Unlock()
+ // Without any state on the service there is nothing that could be ready yet.
+ if s.state == nil {
+ return false
+ }
+ return s.state.ready.Load()
}
-// GetConfig returns the current consensus config
-func (s *Service) GetConfig() Config {
- return *s.config
+// WaitReady blocks until the service reports ready (as per IsReady) or ctx is done, whichever comes first. In the
+// latter case ctx.Err() is returned. Readiness is polled every 100 milliseconds.
+func (s *Service) WaitReady(ctx context.Context) error {
+	// TODO(q3k): reimplement the atomic ready flag as an event synchronization mechanism
+	if s.IsReady() {
+		return nil
+	}
+
+	poll := time.NewTicker(100 * time.Millisecond)
+	defer poll.Stop()
+	// Readiness is re-evaluated after every tick; cancellation is noticed while waiting for the next one.
+	for ready := false; !ready; ready = s.IsReady() {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-poll.C:
+		}
+	}
+	return nil
}
-// SetConfig sets the consensus config. Changes are only applied when the service is restarted.
-func (s *Service) SetConfig(config Config) {
- s.config = &config
+// KV returns an etcd KV client interface to the etcd member/cluster, namespaced under the prefix "module:space".
+// The service state is dereferenced without a nil check here, while IsReady treats a nil state as "not ready": only
+// call this once the service is ready (for example after WaitReady returned nil).
+func (s *Service) KV(module, space string) clientv3.KV {
+ s.stateMu.Lock()
+ defer s.stateMu.Unlock()
+ return namespace.NewKV(s.state.cl.KV, fmt.Sprintf("%s:%s", module, space))
}
-func (s *Service) GetStore(module, space string) clientv3.KV {
- return namespace.NewKV(s.kv, fmt.Sprintf("%s:%s", module, space))
+// KVRoot returns the KV interface of the service's etcd client as is, without wrapping it in a namespace.
+// The service state is dereferenced without a nil check here, while IsReady treats a nil state as "not ready": only
+// call this once the service is ready (for example after WaitReady returned nil).
+func (s *Service) KVRoot() clientv3.KV {
+ s.stateMu.Lock()
+ defer s.stateMu.Unlock()
+ return s.state.cl.KV
+}
+
+// Cluster returns the Cluster interface of the service's etcd client.
+// The service state is dereferenced without a nil check here, while IsReady treats a nil state as "not ready": only
+// call this once the service is ready (for example after WaitReady returned nil).
+func (s *Service) Cluster() clientv3.Cluster {
+ s.stateMu.Lock()
+ defer s.stateMu.Unlock()
+ return s.state.cl.Cluster
+}
+
+// MemberInfo reports the identity of this etcd cluster member: the ID of the embedded etcd server and the name this
+// service was configured with. It blocks until the cluster status is Ready; if waiting fails (ie. ctx is done first),
+// the wrapped error is returned alongside zero values.
+func (s *Service) MemberInfo(ctx context.Context) (id uint64, name string, err error) {
+	if waitErr := s.WaitReady(ctx); waitErr != nil {
+		return 0, "", fmt.Errorf("when waiting for cluster readiness: %w", waitErr)
+	}
+
+	s.stateMu.Lock()
+	defer s.stateMu.Unlock()
+	return uint64(s.state.etcd.Server.ID()), s.config.Name, nil
}
diff --git a/core/internal/consensus/consensus_test.go b/core/internal/consensus/consensus_test.go
new file mode 100644
index 0000000..6e225e0
--- /dev/null
+++ b/core/internal/consensus/consensus_test.go
@@ -0,0 +1,267 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package consensus
+
+import (
+ "bytes"
+ "context"
+ "crypto/x509"
+ "io/ioutil"
+ "net"
+ "os"
+ "testing"
+ "time"
+
+ "go.uber.org/zap"
+
+ "git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
+ "git.monogon.dev/source/nexantic.git/core/internal/localstorage"
+ "git.monogon.dev/source/nexantic.git/core/internal/localstorage/declarative"
+ "git.monogon.dev/source/nexantic.git/golibs/common"
+)
+
+// boilerplate is the fixture shared by the tests in this file, as built by prep and torn down by close.
+type boilerplate struct {
+ // ctx is a cancellable context for the test; ctxC is its cancel function, called by close.
+ ctx context.Context
+ ctxC context.CancelFunc
+ // root is a localstorage root that prep places inside tmpdir.
+ root *localstorage.Root
+ // logger is the logger that tests hand to the supervisor.
+ logger *zap.Logger
+ // tmpdir is the temporary directory backing root; close removes it recursively.
+ tmpdir string
+}
+
+// prep builds the fixture shared by the tests in this file: a cancellable context, a localstorage root placed in a
+// fresh temporary directory (with the etcd data and ephemeral consensus directories created), and a development
+// logger. Any setup failure aborts the test; once the temporary directory exists it is removed again before aborting,
+// so that failed runs leave nothing behind.
+func prep(t *testing.T) *boilerplate {
+	tmp, err := ioutil.TempDir("", "smalltown-test")
+	if err != nil {
+		t.Fatal(err)
+	}
+	// fail removes the temporary directory and then aborts the test.
+	fail := func(format string, args ...interface{}) {
+		os.RemoveAll(tmp)
+		t.Fatalf(format, args...)
+	}
+
+	root := &localstorage.Root{}
+	if err := declarative.PlaceFS(root, tmp); err != nil {
+		fail("placing localstorage root: %v", err)
+	}
+	// The consensus service expects these directories to exist; a failure here would otherwise only surface later as
+	// a confusing etcd startup error.
+	for _, dir := range []string{root.Data.Etcd.FullPath(), root.Ephemeral.Consensus.FullPath()} {
+		if err := os.MkdirAll(dir, 0700); err != nil {
+			fail("creating directory %q: %v", dir, err)
+		}
+	}
+
+	logger, err := zap.NewDevelopment()
+	if err != nil {
+		fail("creating logger: %v", err)
+	}
+
+	ctx, ctxC := context.WithCancel(context.Background())
+	return &boilerplate{
+		ctx:    ctx,
+		ctxC:   ctxC,
+		root:   root,
+		logger: logger,
+		tmpdir: tmp,
+	}
+}
+
+// close tears the fixture down: it cancels the fixture's context and then recursively removes its temporary
+// directory. The error from the removal is not checked.
+func (b *boilerplate) close() {
+ b.ctxC()
+ os.RemoveAll(b.tmpdir)
+}
+
+// waitEtcd blocks until s reports ready, checking every 100 milliseconds. If that has not happened once 5 seconds
+// have passed, the test is failed.
+func waitEtcd(t *testing.T, s *Service) {
+	limit := time.Now().Add(5 * time.Second)
+	// The deadline is checked before readiness on every pass; the sleep happens between passes.
+	for ; ; time.Sleep(100 * time.Millisecond) {
+		if time.Now().After(limit) {
+			t.Fatalf("etcd did not start up on time")
+		}
+		if s.IsReady() {
+			return
+		}
+	}
+}
+
+// TestBootstrap starts a brand new single-member cluster and checks that a key written through a namespaced KV
+// client can be read back with the value that was stored.
+func TestBootstrap(t *testing.T) {
+	b := prep(t)
+	defer b.close()
+	etcd := New(Config{
+		Data:           &b.root.Data.Etcd,
+		Ephemeral:      &b.root.Ephemeral.Consensus,
+		Name:           "test",
+		NewCluster:     true,
+		InitialCluster: "127.0.0.1",
+		ExternalHost:   "127.0.0.1",
+		ListenHost:     "127.0.0.1",
+		Port:           common.MustConsume(common.AllocateTCPPort()),
+	})
+
+	supervisor.New(b.ctx, b.logger, etcd.Run)
+	waitEtcd(t, etcd)
+
+	kv := etcd.KV("foo", "bar")
+	if _, err := kv.Put(b.ctx, "/foo", "bar"); err != nil {
+		t.Fatalf("test key creation failed: %v", err)
+	}
+	res, err := kv.Get(b.ctx, "/foo")
+	if err != nil {
+		t.Fatalf("test key retrieval failed: %v", err)
+	}
+	// A Get for a key that does not exist succeeds with an empty result, so the absence of an error alone does not
+	// prove that the write went through.
+	if len(res.Kvs) != 1 {
+		t.Fatalf("test key retrieval returned %d keys, wanted 1", len(res.Kvs))
+	}
+	if want, got := "bar", string(res.Kvs[0].Value); want != got {
+		t.Fatalf("test key has value %q, wanted %q", got, want)
+	}
+}
+
+// TestMemberInfo starts a new single-member cluster and checks that the ID and name returned by MemberInfo agree
+// with both the configuration and the member list reported by the cluster itself.
+func TestMemberInfo(t *testing.T) {
+	b := prep(t)
+	defer b.close()
+	etcd := New(Config{
+		Data:           &b.root.Data.Etcd,
+		Ephemeral:      &b.root.Ephemeral.Consensus,
+		Name:           "test",
+		NewCluster:     true,
+		InitialCluster: "127.0.0.1",
+		ExternalHost:   "127.0.0.1",
+		ListenHost:     "127.0.0.1",
+		Port:           common.MustConsume(common.AllocateTCPPort()),
+	})
+	supervisor.New(b.ctx, b.logger, etcd.Run)
+	waitEtcd(t, etcd)
+
+	id, name, err := etcd.MemberInfo(b.ctx)
+	if err != nil {
+		t.Fatalf("MemberInfo: %v", err)
+	}
+
+	// Compare name with configured name.
+	if want, got := "test", name; want != got {
+		t.Errorf("MemberInfo returned name %q, wanted %q (per config)", got, want)
+	}
+
+	// Compare ID and name with cluster information.
+	members, err := etcd.Cluster().MemberList(b.ctx)
+	if err != nil {
+		// This has to be fatal: the response is dereferenced right below and must not be used after an error.
+		t.Fatalf("MemberList: %v", err)
+	}
+
+	if want, got := 1, len(members.Members); want != got {
+		t.Fatalf("expected one cluster member, got %d", got)
+	}
+	if want, got := id, members.Members[0].ID; want != got {
+		t.Errorf("MemberInfo returned ID %d, Cluster endpoint says %d", want, got)
+	}
+	if want, got := name, members.Members[0].Name; want != got {
+		t.Errorf("MemberInfo returned name %q, Cluster endpoint says %q", want, got)
+	}
+}
+
+// TestRestartFromDisk bootstraps a new cluster, stops it, and starts a second instance on top of the same data
+// directory. The second instance must still serve the key written by the first one, and must come up with the same
+// CA certificate.
+func TestRestartFromDisk(t *testing.T) {
+	b := prep(t)
+	defer b.close()
+
+	// startEtcd runs an instance on the fixture's directories and waits for it to become ready. With newCluster set,
+	// the instance bootstraps a cluster and the test key is written; in both cases the key is then read back. The
+	// returned cancel function stops the instance.
+	startEtcd := func(newCluster bool) (*Service, context.CancelFunc) {
+		etcd := New(Config{
+			Data:           &b.root.Data.Etcd,
+			Ephemeral:      &b.root.Ephemeral.Consensus,
+			Name:           "test",
+			NewCluster:     newCluster,
+			InitialCluster: "127.0.0.1",
+			ExternalHost:   "127.0.0.1",
+			ListenHost:     "127.0.0.1",
+			Port:           common.MustConsume(common.AllocateTCPPort()),
+		})
+		ctx, ctxC := context.WithCancel(b.ctx)
+		supervisor.New(ctx, b.logger, etcd.Run)
+		waitEtcd(t, etcd)
+		kv := etcd.KV("foo", "bar")
+		if newCluster {
+			if _, err := kv.Put(b.ctx, "/foo", "bar"); err != nil {
+				t.Fatalf("test key creation failed: %v", err)
+			}
+		}
+		res, err := kv.Get(b.ctx, "/foo")
+		if err != nil {
+			t.Fatalf("test key retrieval failed: %v", err)
+		}
+		// A Get for a missing key does not fail, so check that the key is really there - this is what shows that
+		// the data survived the restart.
+		if len(res.Kvs) != 1 {
+			t.Fatalf("test key retrieval returned %d keys, wanted 1", len(res.Kvs))
+		}
+		if want, got := "bar", string(res.Kvs[0].Value); want != got {
+			t.Fatalf("test key has value %q, wanted %q", got, want)
+		}
+
+		return etcd, ctxC
+	}
+
+	etcd, ctxC := startEtcd(true)
+	etcd.stateMu.Lock()
+	firstCA := etcd.state.ca.CACertRaw
+	etcd.stateMu.Unlock()
+	ctxC()
+
+	etcd, ctxC = startEtcd(false)
+	etcd.stateMu.Lock()
+	secondCA := etcd.state.ca.CACertRaw
+	etcd.stateMu.Unlock()
+	ctxC()
+
+	if !bytes.Equal(firstCA, secondCA) {
+		t.Fatalf("wanted same, got different CAs accross runs")
+	}
+}
+
+// TestCRL issues a certificate, revokes it, and then waits up to 5 seconds for the serial number of that certificate
+// to show up in the CRL file kept on local storage.
+func TestCRL(t *testing.T) {
+	b := prep(t)
+	defer b.close()
+	etcd := New(Config{
+		Data:           &b.root.Data.Etcd,
+		Ephemeral:      &b.root.Ephemeral.Consensus,
+		Name:           "test",
+		NewCluster:     true,
+		InitialCluster: "127.0.0.1",
+		ExternalHost:   "127.0.0.1",
+		ListenHost:     "127.0.0.1",
+		Port:           common.MustConsume(common.AllocateTCPPort()),
+	})
+	supervisor.New(b.ctx, b.logger, etcd.Run)
+	waitEtcd(t, etcd)
+
+	etcd.stateMu.Lock()
+	peerCA, rootKV := etcd.state.ca, etcd.state.cl.KV
+	etcd.stateMu.Unlock()
+
+	issued, _, err := peerCA.Issue(b.ctx, rootKV, "revoketest", net.ParseIP("1.2.3.4"))
+	if err != nil {
+		t.Fatalf("cert issue failed: %v", err)
+	}
+	parsed, err := x509.ParseCertificate(issued)
+	if err != nil {
+		t.Fatalf("cert parse failed: %v", err)
+	}
+
+	if err := peerCA.Revoke(b.ctx, rootKV, "revoketest"); err != nil {
+		t.Fatalf("cert revoke failed: %v", err)
+	}
+
+	// revokedOnDisk reports whether the CRL on local storage lists the serial of the issued certificate. A CRL that
+	// cannot be read or parsed counts as "not yet": maybe it hasn't been written yet.
+	revokedOnDisk := func() bool {
+		raw, err := b.root.Data.Etcd.PeerCRL.Read()
+		if err != nil {
+			return false
+		}
+		list, err := x509.ParseCRL(raw)
+		if err != nil {
+			return false
+		}
+		for _, entry := range list.TBSCertList.RevokedCertificates {
+			if entry.SerialNumber.Cmp(parsed.SerialNumber) == 0 {
+				return true
+			}
+		}
+		return false
+	}
+
+	limit := time.Now().Add(5 * time.Second)
+	for {
+		if time.Now().After(limit) {
+			t.Fatalf("CRL did not get updated in time")
+		}
+		time.Sleep(100 * time.Millisecond)
+		if revokedOnDisk() {
+			return
+		}
+	}
+}
diff --git a/core/internal/launch/BUILD.bazel b/core/internal/launch/BUILD.bazel
index 382c73b..47cbc95 100644
--- a/core/internal/launch/BUILD.bazel
+++ b/core/internal/launch/BUILD.bazel
@@ -8,6 +8,7 @@
deps = [
"//core/api/api:go_default_library",
"//core/internal/common:go_default_library",
+ "//golibs/common:go_default_library",
"@com_github_golang_protobuf//proto:go_default_library",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_x_sys//unix:go_default_library",
diff --git a/core/internal/launch/launch.go b/core/internal/launch/launch.go
index 774b432..d08117d 100644
--- a/core/internal/launch/launch.go
+++ b/core/internal/launch/launch.go
@@ -39,23 +39,9 @@
apipb "git.monogon.dev/source/nexantic.git/core/generated/api"
"git.monogon.dev/source/nexantic.git/core/internal/common"
+ freeport "git.monogon.dev/source/nexantic.git/golibs/common"
)
-// This is more of a best-effort solution and not guaranteed to give us unused ports (since we're not immediately using
-// them), but AFAIK qemu cannot dynamically select hostfwd ports
-func getFreePort() (uint16, io.Closer, error) {
- addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
- if err != nil {
- return 0, nil, err
- }
-
- l, err := net.ListenTCP("tcp", addr)
- if err != nil {
- return 0, nil, err
- }
- return uint16(l.Addr().(*net.TCPAddr).Port), l, nil
-}
-
type qemuValue map[string][]string
// toOption encodes structured data into a QEMU option.
@@ -167,7 +153,7 @@
func ConflictFreePortMap() (PortMap, error) {
portMap := make(PortMap)
for _, port := range requiredPorts {
- mappedPort, listenCloser, err := getFreePort()
+ mappedPort, listenCloser, err := freeport.AllocateTCPPort()
if err != nil {
return portMap, fmt.Errorf("failed to get free host port: %w", err)
}
diff --git a/core/internal/localstorage/declarative/BUILD.bazel b/core/internal/localstorage/declarative/BUILD.bazel
index 7c84d9d..c181fd1 100644
--- a/core/internal/localstorage/declarative/BUILD.bazel
+++ b/core/internal/localstorage/declarative/BUILD.bazel
@@ -9,4 +9,5 @@
],
importpath = "git.monogon.dev/source/nexantic.git/core/internal/localstorage/declarative",
visibility = ["//core:__subpackages__"],
+ deps = ["@org_golang_x_sys//unix:go_default_library"],
)
diff --git a/core/internal/localstorage/declarative/placement.go b/core/internal/localstorage/declarative/placement.go
index 252dbdf..c16da1d 100644
--- a/core/internal/localstorage/declarative/placement.go
+++ b/core/internal/localstorage/declarative/placement.go
@@ -47,6 +47,9 @@
// backends, and set on all directories during placement by a given backend.
type DirectoryPlacement interface {
Placement
+ // MkdirAll creates this directory and all its parents on backing stores that have a physical directory
+ // structure. Despite its name, the file argument is the permission mode for the created directories: the local
+ // filesystem backend hands it to os.MkdirAll.
+ MkdirAll(file os.FileMode) error
}
// DirectoryPlacer is a placement backend-defined function that, given the path returned by the parent of a directory,
diff --git a/core/internal/localstorage/declarative/placement_local.go b/core/internal/localstorage/declarative/placement_local.go
index 38fe98d..1ebdba5 100644
--- a/core/internal/localstorage/declarative/placement_local.go
+++ b/core/internal/localstorage/declarative/placement_local.go
@@ -20,6 +20,8 @@
"fmt"
"io/ioutil"
"os"
+
+ "golang.org/x/sys/unix"
)
// FSRoot is a root of a storage backend that resides on the local filesystem.
@@ -56,8 +58,28 @@
return ioutil.ReadFile(f.FullPath())
}
+// Write performs an atomic file write, via a temporary file: d is written to a sibling of the target (its full path
+// plus a ".tmp" suffix), created with the given mode, which is then renamed over the target. Readers therefore see
+// either the old or the new contents, never a partial write.
+//
+// All writers of one file share the same temporary name, so concurrent writes to the same file must be serialized by
+// the caller.
func (f *FSPlacement) Write(d []byte, mode os.FileMode) error {
- return ioutil.WriteFile(f.FullPath(), d, mode)
+	// The temporary file has to live next to the target. A file in the default temporary directory can end up on a
+	// different filesystem than the target, and rename(2) refuses to move files across filesystems (EXDEV).
+	tmp := f.FullPath() + ".tmp"
+	// Clean up after a failed write or rename. After a successful rename there is nothing left to remove.
+	defer os.Remove(tmp)
+	// ioutil.WriteFile honors the requested mode when creating the file, and reports write and close errors.
+	if err := ioutil.WriteFile(tmp, d, mode); err != nil {
+		return fmt.Errorf("temporary file write failed: %w", err)
+	}
+
+	if err := unix.Rename(tmp, f.FullPath()); err != nil {
+		return fmt.Errorf("renaming target file failed: %w", err)
+	}
+
+	return nil
+}
+
+// MkdirAll creates the directory at this placement's full path, along with any missing parents, by calling
+// os.MkdirAll with perm as the permission bits.
+func (f *FSPlacement) MkdirAll(perm os.FileMode) error {
+ return os.MkdirAll(f.FullPath(), perm)
}
// PlaceFS takes a pointer to a Directory or a pointer to a structure embedding Directory and places it at a given
@@ -84,7 +106,7 @@
np := pathFor(parent, this)
return &FSPlacement{path: np, root: r}
}
- err := place(dd, "", "", dp, fp)
+ err := place(dd, r.root, "", dp, fp)
if err != nil {
return fmt.Errorf("could not place: %w", err)
}
diff --git a/core/internal/localstorage/directory_pki.go b/core/internal/localstorage/directory_pki.go
index a2d4424..f6ebb11 100644
--- a/core/internal/localstorage/directory_pki.go
+++ b/core/internal/localstorage/directory_pki.go
@@ -135,3 +135,33 @@
Leaf: cert,
}, nil
}
+
+// AllExist returns true if all PKI files (cert, key, CA cert) are present on the backing
+// store. If the existence of a file cannot be determined, the check stops there and the underlying error is
+// returned wrapped, so that callers can inspect its cause with errors.Is and errors.As.
+func (p *PKIDirectory) AllExist() (bool, error) {
+	for _, d := range []*declarative.File{&p.CACertificate, &p.Certificate, &p.Key} {
+		exists, err := d.Exists()
+		if err != nil {
+			return false, fmt.Errorf("failed to check %q: %w", d.FullPath(), err)
+		}
+		if !exists {
+			return false, nil
+		}
+	}
+	return true, nil
+}
+
+// AllAbsent returns true if all PKI files (cert, key, CA cert) are missing from the backing
+// store. If the existence of a file cannot be determined, the check stops there and the underlying error is
+// returned wrapped, so that callers can inspect its cause with errors.Is and errors.As.
+func (p *PKIDirectory) AllAbsent() (bool, error) {
+	for _, d := range []*declarative.File{&p.CACertificate, &p.Certificate, &p.Key} {
+		exists, err := d.Exists()
+		if err != nil {
+			return false, fmt.Errorf("failed to check %q: %w", d.FullPath(), err)
+		}
+		if exists {
+			return false, nil
+		}
+	}
+	return true, nil
+}
diff --git a/core/internal/localstorage/storage.go b/core/internal/localstorage/storage.go
index 1aab262..91153cf 100644
--- a/core/internal/localstorage/storage.go
+++ b/core/internal/localstorage/storage.go
@@ -39,9 +39,10 @@
type Root struct {
declarative.Directory
- ESP ESPDirectory `dir:"esp"`
- Data DataDirectory `dir:"data"`
- Etc EtcDirectory `dif:"etc"`
+ ESP ESPDirectory `dir:"esp"`
+ Data DataDirectory `dir:"data"`
+ Etc EtcDirectory `dir:"etc"`
+ Ephemeral EphemeralDirectory `dir:"ephemeral"`
}
type PKIDirectory struct {
@@ -77,16 +78,30 @@
// mounted is set by DataDirectory when it is mounted. It ensures it's only mounted once.
mounted bool
- Etcd struct {
- declarative.Directory
- MemberPKI PKIDirectory `dir:"member_pki"`
- } `dir:"etcd"`
+ Etcd DataEtcdDirectory `dir:"etcd"`
Node PKIDirectory `dir:"node_pki"`
Volumes declarative.Directory `dir:"volumes"`
}
+// DataEtcdDirectory is the layout of the etcd directory within the data directory.
+type DataEtcdDirectory struct {
+ declarative.Directory
+ // PeerPKI holds PKI material (NOTE(review): presumably the certificates used for etcd peer traffic - confirm).
+ PeerPKI PKIDirectory `dir:"peer_pki"`
+ // PeerCRL is the CRL file; the consensus service's CRL watcher writes the CRLs it receives from etcd here.
+ PeerCRL declarative.File `file:"peer_crl"`
+ // Data is a plain directory (NOTE(review): presumably etcd's own data directory - confirm).
+ Data declarative.Directory `dir:"data"`
+}
+
type EtcDirectory struct {
declarative.Directory
Hosts declarative.File `file:"hosts"`
MachineID declarative.File `file:"machine_id"`
}
+
+// EphemeralDirectory is the layout of the ephemeral directory of the storage root.
+// NOTE(review): the name suggests that its contents do not survive reboots - confirm.
+type EphemeralDirectory struct {
+ declarative.Directory
+ // Consensus is the subdirectory handed to the consensus service as its ephemeral storage.
+ Consensus EphemeralConsensusDirectory `dir:"consensus"`
+}
+
+// EphemeralConsensusDirectory is the layout of the consensus service's ephemeral directory.
+type EphemeralConsensusDirectory struct {
+ declarative.Directory
+ // ClientSocket is the UNIX socket over which the consensus service connects its etcd client to the local member.
+ ClientSocket declarative.File `file:"client.sock"`
+}