core -> metropolis
Smalltown is now called Metropolis!
This is the first commit in a series of cleanup commits that prepare us
for an open source release. This one just moves some Bazel packages around to
follow a stricter directory layout.
All of Metropolis now lives in `//metropolis`.
All of Metropolis Node code now lives in `//metropolis/node`.
All of the main /init code now lives in `//metropolis/node/core`.
All of the Kubernetes functionality/glue now lives in `//metropolis/node/kubernetes`.
Next steps:
- hunt down all references to Smalltown and replace them appropriately
- narrow down visibility rules
- document new code organization
- move `//build/toolchain` to `//monogon/build/toolchain`
- do another cleanup pass between `//golibs` and
`//monogon/node/{core,common}`.
- remove `//delta` and `//anubis`
Fixes T799.
Test Plan: Just a very large refactor. CI should help us out here.
Bug: T799
X-Origin-Diff: phab/D667
GitOrigin-RevId: 6029b8d4edc42325d50042596b639e8b122d0ded
diff --git a/metropolis/node/core/consensus/consensus.go b/metropolis/node/core/consensus/consensus.go
new file mode 100644
index 0000000..8916164
--- /dev/null
+++ b/metropolis/node/core/consensus/consensus.go
@@ -0,0 +1,429 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package consensus implements a managed etcd cluster member service, with a self-hosted CA system for issuing peer
+// certificates. Currently each Metropolis node runs an etcd member, and connects to the etcd member locally over a unix
+// domain socket.
+//
+// The service supports two modes of startup:
+// - initializing a new cluster, by bootstrapping the CA in memory, starting a cluster, committing the CA to etcd
+// afterwards, and saving the new node's certificate to local storage
+// - joining an existing cluster, using certificates from local storage and loading the CA from etcd. This flow is also
+// used when the node joins a cluster for the first time (then the certificates required must be provisioned
+// externally before starting the consensus service).
+//
+// Regardless of how the etcd member service was started, the resulting running service is further managed and used
+// in the same way.
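+//
+// As an illustrative sketch only (the values below are placeholders, not a canonical configuration), one way a
+// caller could start the service as a supervisor runnable:
+//
+//   svc := consensus.New(consensus.Config{
+//       Data:         dataDir,      // *localstorage.DataEtcdDirectory, wired up by the caller
+//       Ephemeral:    ephemeralDir, // *localstorage.EphemeralConsensusDirectory, wired up by the caller
+//       Name:         "node-a",
+//       NewCluster:   true,         // bootstrap a new cluster and its peer CA
+//       ExternalHost: "10.0.0.1",
+//       ListenHost:   "0.0.0.0",
+//   })
+//   if err := supervisor.Run(ctx, "consensus", svc.Run); err != nil {
+//       // handle startup error
+//   }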
+//
+package consensus
+
+import (
+ "context"
+ "encoding/pem"
+ "fmt"
+ "net"
+ "net/url"
+ "sync"
+ "time"
+
+ "go.etcd.io/etcd/clientv3"
+ "go.etcd.io/etcd/clientv3/namespace"
+ "go.etcd.io/etcd/embed"
+ "go.uber.org/atomic"
+
+ common "git.monogon.dev/source/nexantic.git/metropolis/node"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/common/supervisor"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/core/consensus/ca"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/core/localstorage"
+)
+
+const (
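+ // DefaultClusterToken is the etcd initial cluster token used for clusters started by this service.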
+ DefaultClusterToken = "SIGNOS"
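+ // DefaultLogger is the logging backend requested from the embedded etcd server.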
+ DefaultLogger = "zap"
+)
+
+// Service is the etcd cluster member service.
+type Service struct {
+ // The configuration with which the service was started. This is immutable.
+ config *Config
+
+ // stateMu guards state. This is locked internally on public methods of Service that require access to state. The
+ // state might be recreated on service restart.
+ stateMu sync.Mutex
+ state *state
+}
+
+// state is the runtime state of a running etcd member.
+type state struct {
+ etcd *embed.Etcd
+ ready atomic.Bool
+
+ ca *ca.CA
+ // cl is an etcd client that loops back to the locally running etcd server. This runs over the Client unix domain
+ // socket that etcd starts.
+ cl *clientv3.Client
+}
+
+type Config struct {
+ // Data directory (persistent, encrypted storage) for etcd.
+ Data *localstorage.DataEtcdDirectory
+ // Ephemeral directory for etcd.
+ Ephemeral *localstorage.EphemeralConsensusDirectory
+
+ // Name is the cluster name. This must be the same amongst all etcd members within one cluster.
+ Name string
+ // NewCluster selects whether the etcd member will start a new cluster and bootstrap a CA and the first member
+ // certificate, or load existing PKI certificates from disk.
+ NewCluster bool
+ // InitialCluster sets the initial cluster peer URLs when NewCluster is set, and is ignored otherwise. Usually this
+ // will be just the new, single server, and more members will be added later.
+ InitialCluster string
+ // ExternalHost is the IP address or hostname at which this cluster member is reachable by other cluster members.
+ ExternalHost string
+ // ListenHost is the IP address or hostname at which this cluster member will listen.
+ ListenHost string
+ // Port is the port at which this cluster member will listen for other members. If zero, defaults to the global
+ // Metropolis setting (common.ConsensusPort).
+ Port int
+}
+
+func New(config Config) *Service {
+ return &Service{
+ config: &config,
+ }
+}
+
+// configure transforms the service configuration into an embedded etcd configuration. It also ensures that the
+// required data and ephemeral directories exist on disk.
+func (s *Service) configure(ctx context.Context) (*embed.Config, error) {
+ if err := s.config.Ephemeral.MkdirAll(0700); err != nil {
+ return nil, fmt.Errorf("failed to create ephemeral directory: %w", err)
+ }
+ if err := s.config.Data.MkdirAll(0700); err != nil {
+ return nil, fmt.Errorf("failed to create data directory: %w", err)
+ }
+
+ port := s.config.Port
+ if port == 0 {
+ port = common.ConsensusPort
+ }
+
+ cfg := embed.NewConfig()
+
+ cfg.Name = s.config.Name
+ cfg.Dir = s.config.Data.Data.FullPath()
+ cfg.InitialClusterToken = DefaultClusterToken
+
+ cfg.PeerTLSInfo.CertFile = s.config.Data.PeerPKI.Certificate.FullPath()
+ cfg.PeerTLSInfo.KeyFile = s.config.Data.PeerPKI.Key.FullPath()
+ cfg.PeerTLSInfo.TrustedCAFile = s.config.Data.PeerPKI.CACertificate.FullPath()
+ cfg.PeerTLSInfo.ClientCertAuth = true
+ cfg.PeerTLSInfo.CRLFile = s.config.Data.PeerCRL.FullPath()
+
+ cfg.LCUrls = []url.URL{{
+ Scheme: "unix",
+ Path: s.config.Ephemeral.ClientSocket.FullPath() + ":0",
+ }}
+ cfg.ACUrls = []url.URL{}
+ cfg.LPUrls = []url.URL{{
+ Scheme: "https",
+ Host: fmt.Sprintf("%s:%d", s.config.ListenHost, port),
+ }}
+ cfg.APUrls = []url.URL{{
+ Scheme: "https",
+ Host: fmt.Sprintf("%s:%d", s.config.ExternalHost, port),
+ }}
+
+ if s.config.NewCluster {
+ cfg.ClusterState = "new"
+ cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
+ } else if s.config.InitialCluster != "" {
+ cfg.ClusterState = "existing"
+ cfg.InitialCluster = s.config.InitialCluster
+ }
+
+ // TODO(q3k): pipe logs from etcd to supervisor.RawLogger via a file.
+ cfg.Logger = DefaultLogger
+ cfg.LogOutputs = []string{"stderr"}
+
+ return cfg, nil
+}
+
+// Run is a Supervisor runnable that starts the etcd member service. It will become healthy once the member joins the
+// cluster successfully.
+func (s *Service) Run(ctx context.Context) error {
+ st := &state{
+ ready: *atomic.NewBool(false),
+ }
+ s.stateMu.Lock()
+ s.state = st
+ s.stateMu.Unlock()
+
+ if s.config.NewCluster {
+ // Expect certificate to be absent from disk.
+ absent, err := s.config.Data.PeerPKI.AllAbsent()
+ if err != nil {
+ return fmt.Errorf("checking certificate existence: %w", err)
+ }
+ if !absent {
+ return fmt.Errorf("want new cluster, but certificates already exist on disk")
+ }
+
+ // Generate CA, keep in memory, write it down in etcd later.
+ st.ca, err = ca.New("Smalltown etcd peer Root CA")
+ if err != nil {
+ return fmt.Errorf("when creating new cluster's peer CA: %w", err)
+ }
+
+ ip := net.ParseIP(s.config.ExternalHost)
+ if ip == nil {
+ return fmt.Errorf("configued external host is not an IP address (got %q)", s.config.ExternalHost)
+ }
+
+ cert, key, err := st.ca.Issue(ctx, nil, s.config.Name, ip)
+ if err != nil {
+ return fmt.Errorf("when issuing new cluster's first certificate: %w", err)
+ }
+
+ if err := s.config.Data.PeerPKI.MkdirAll(0600); err != nil {
+ return fmt.Errorf("when creating PKI directory: %w", err)
+ }
+ if err := s.config.Data.PeerPKI.CACertificate.Write(pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: st.ca.CACertRaw}), 0600); err != nil {
+ return fmt.Errorf("when writing CA certificate to disk: %w", err)
+ }
+ if err := s.config.Data.PeerPKI.Certificate.Write(pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: cert}), 0600); err != nil {
+ return fmt.Errorf("when writing certificate to disk: %w", err)
+ }
+ if err := s.config.Data.PeerPKI.Key.Write(pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: key}), 0600); err != nil {
+ return fmt.Errorf("when writing certificate to disk: %w", err)
+ }
+ } else {
+ // Expect certificate to be present on disk.
+ present, err := s.config.Data.PeerPKI.AllExist()
+ if err != nil {
+ return fmt.Errorf("checking certificate existence: %w", err)
+ }
+ if !present {
+ return fmt.Errorf("want existing cluster, but certificate is missing from disk")
+ }
+ }
+
+ if err := s.config.Data.MkdirAll(0600); err != nil {
+ return fmt.Errorf("failed to create data directory; %w", err)
+ }
+
+ cfg, err := s.configure(ctx)
+ if err != nil {
+ return fmt.Errorf("when configuring etcd: %w", err)
+ }
+
+ server, err := embed.StartEtcd(cfg)
+ keep := false
+ defer func() {
+ if !keep && server != nil {
+ server.Close()
+ }
+ }()
+ if err != nil {
+ return fmt.Errorf("failed to start etcd: %w", err)
+ }
+ st.etcd = server
+
+ supervisor.Logger(ctx).Info("waiting for etcd...")
+
+ okay := true
+ select {
+ case <-st.etcd.Server.ReadyNotify():
+ case <-ctx.Done():
+ okay = false
+ }
+
+ if !okay {
+ supervisor.Logger(ctx).Info("context done, aborting wait")
+ return ctx.Err()
+ }
+
+ socket := s.config.Ephemeral.ClientSocket.FullPath()
+ cl, err := clientv3.New(clientv3.Config{
+ Endpoints: []string{fmt.Sprintf("unix://%s:0", socket)},
+ DialTimeout: time.Second,
+ })
+ if err != nil {
+ return fmt.Errorf("failed to connect to new etcd instance: %w", err)
+ }
+ st.cl = cl
+
+ if s.config.NewCluster {
+ if st.ca == nil {
+ panic("peerCA has not been generated")
+ }
+
+ // Save new CA into etcd.
+ err = st.ca.Save(ctx, cl.KV)
+ if err != nil {
+ return fmt.Errorf("failed to save new CA to etcd: %w", err)
+ }
+ } else {
+ // Load existing CA from etcd.
+ st.ca, err = ca.Load(ctx, cl.KV)
+ if err != nil {
+ return fmt.Errorf("failed to load CA from etcd: %w", err)
+ }
+ }
+
+ // Start CRL watcher.
+ if err := supervisor.Run(ctx, "crl", s.watchCRL); err != nil {
+ return fmt.Errorf("failed to start CRL watcher: %w", err)
+ }
+ // Start autopromoter.
+ if err := supervisor.Run(ctx, "autopromoter", s.autopromoter); err != nil {
+ return fmt.Errorf("failed to start autopromoter: %w", err)
+ }
+
+ supervisor.Logger(ctx).Info("etcd is now ready")
+ keep = true
+ st.ready.Store(true)
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+
+ <-ctx.Done()
+ st.etcd.Close()
+ return ctx.Err()
+}
+
+// watchCRL is a sub-runnable of the etcd cluster member service that updates the CRL stored on local storage to
+// match the newest version available in etcd.
+func (s *Service) watchCRL(ctx context.Context) error {
+ s.stateMu.Lock()
+ cl := s.state.cl
+ ca := s.state.ca
+ s.stateMu.Unlock()
+
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ for e := range ca.WaitCRLChange(ctx, cl.KV, cl.Watcher) {
+ if e.Err != nil {
+ return fmt.Errorf("watching CRL: %w", e.Err)
+ }
+
+ if err := s.config.Data.PeerCRL.Write(e.CRL, 0600); err != nil {
+ return fmt.Errorf("saving CRL: %w", err)
+ }
+ }
+
+ // unreachable
+ return nil
+}
+
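+// autopromoter is a sub-runnable of the etcd cluster member service that periodically promotes etcd learner members
+// to full voting members, as long as this node is the current cluster leader.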
+func (s *Service) autopromoter(ctx context.Context) error {
+ t := time.NewTicker(5 * time.Second)
+ defer t.Stop()
+
+ autopromote := func() {
+ s.stateMu.Lock()
+ st := s.state
+ s.stateMu.Unlock()
+
+ if st.etcd.Server.Leader() != st.etcd.Server.ID() {
+ return
+ }
+
+ for _, member := range st.etcd.Server.Cluster().Members() {
+ if !member.IsLearner {
+ continue
+ }
+
+ // We always call PromoteMember since the metadata necessary to decide if we should is private.
+ // Luckily etcd already does sanity checks internally and will refuse to promote nodes that aren't
+ // connected or are still behind on transactions.
+ if _, err := st.etcd.Server.PromoteMember(ctx, uint64(member.ID)); err != nil {
+ supervisor.Logger(ctx).Infof("Failed to promote consensus node %s: %v", member.Name, err)
+ } else {
+ supervisor.Logger(ctx).Infof("Promoted new consensus node %s", member.Name)
+ }
+ }
+ }
+
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-t.C:
+ autopromote()
+ }
+ }
+}
+
+// IsReady returns whether etcd is ready and synced.
+func (s *Service) IsReady() bool {
+ s.stateMu.Lock()
+ defer s.stateMu.Unlock()
+ if s.state == nil {
+ return false
+ }
+ return s.state.ready.Load()
+}
+
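+// WaitReady blocks until etcd is ready and synced (see IsReady), or until the context is canceled, in which case the
+// context error is returned.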
+func (s *Service) WaitReady(ctx context.Context) error {
+ // TODO(q3k): reimplement the atomic ready flag as an event synchronization mechanism
+ if s.IsReady() {
+ return nil
+ }
+ t := time.NewTicker(100 * time.Millisecond)
+ defer t.Stop()
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-t.C:
+ if s.IsReady() {
+ return nil
+ }
+ }
+ }
+}
+
+// KV returns an etcd KV client interface to the etcd member/cluster, namespaced to the given module and space.
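+// As an illustration (the module/space names, kv and caCertPEM below are hypothetical), keys written through the
+// returned client are transparently prefixed with "<module>:<space>":
+//
+//   kv := svc.KV("kubernetes", "pki")
+//   // Stored in etcd under the key "kubernetes:pki/ca-certificate".
+//   _, err := kv.Put(ctx, "/ca-certificate", string(caCertPEM))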
+func (s *Service) KV(module, space string) clientv3.KV {
+ s.stateMu.Lock()
+ defer s.stateMu.Unlock()
+ return namespace.NewKV(s.state.cl.KV, fmt.Sprintf("%s:%s", module, space))
+}
+
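+// KVRoot returns the root (unnamespaced) etcd KV client interface to the etcd member/cluster.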
+func (s *Service) KVRoot() clientv3.KV {
+ s.stateMu.Lock()
+ defer s.stateMu.Unlock()
+ return s.state.cl.KV
+}
+
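+// Cluster returns the etcd Cluster client interface to the etcd member/cluster, used for membership management
+// (listing, adding, promoting and removing members).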
+func (s *Service) Cluster() clientv3.Cluster {
+ s.stateMu.Lock()
+ defer s.stateMu.Unlock()
+ return s.state.cl.Cluster
+}
+
+// MemberInfo returns information about this etcd cluster member: its ID and name. This will block until this
+// information is available (i.e. the cluster status is Ready).
+func (s *Service) MemberInfo(ctx context.Context) (id uint64, name string, err error) {
+ if err = s.WaitReady(ctx); err != nil {
+ err = fmt.Errorf("when waiting for cluster readiness: %w", err)
+ return
+ }
+
+ s.stateMu.Lock()
+ defer s.stateMu.Unlock()
+ id = uint64(s.state.etcd.Server.ID())
+ name = s.config.Name
+ return
+}