Add nanoswitch and cluster testing

Adds nanoswitch and the `switched-multi2` launch target, which launches two Smalltown instances on a switched
network and enrolls them into a single cluster. Nanoswitch contains a Linux bridge and a minimal DHCP server
and connects to the two Smalltown instances over virtual Ethernet cables. Also moves the DHCP client out into
its own package, since nanoswitch needs it as well.

Test Plan:
Manually tested using `bazel run //:launch -- switched-multi2` and observing that the second VM
(whose serial port is mapped to stdout) prints that it is enrolled. Also validated by `bazel run //core/cmd/dbg -- kubectl get node -o wide` returning two ready nodes.

X-Origin-Diff: phab/D572
GitOrigin-RevId: 9f6e2b3d8268749dd81588205646ae3976ad14b3
diff --git a/core/internal/consensus/ca/ca.go b/core/internal/consensus/ca/ca.go
index 5952b6f..20f7c31 100644
--- a/core/internal/consensus/ca/ca.go
+++ b/core/internal/consensus/ca/ca.go
@@ -35,6 +35,7 @@
 	"errors"
 	"fmt"
 	"math/big"
+	"net"
 	"time"
 )
 
@@ -150,7 +151,7 @@
 }
 
 // IssueCertificate issues a certificate
-func (ca *CA) IssueCertificate(commonName string) (cert []byte, privkey []byte, err error) {
+func (ca *CA) IssueCertificate(commonName string, ip net.IP) (cert []byte, privkey []byte, err error) {
 	serialNumberLimit := new(big.Int).Lsh(big.NewInt(1), 127)
 	serialNumber, err := rand.Int(rand.Reader, serialNumberLimit)
 	if err != nil {
@@ -179,6 +180,7 @@
 		NotAfter:              unknownNotAfter,
 		ExtKeyUsage:           []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth},
 		DNSNames:              []string{commonName},
+		IPAddresses:           []net.IP{ip},
 	}
 	cert, err = x509.CreateCertificate(rand.Reader, etcdCert, ca.CACert, pubKey, ca.PrivateKey)
 	return
diff --git a/core/internal/consensus/consensus.go b/core/internal/consensus/consensus.go
index d401c1a..5885aa8 100644
--- a/core/internal/consensus/consensus.go
+++ b/core/internal/consensus/consensus.go
@@ -21,11 +21,13 @@
 	"bytes"
 	"context"
 	"crypto/x509"
+	"encoding/binary"
 	"encoding/hex"
 	"encoding/pem"
 	"fmt"
 	"io/ioutil"
 	"math/rand"
+	"net"
 	"net/url"
 	"os"
 	"path"
@@ -112,6 +114,12 @@
 	return consensusServer, nil
 }
 
+// peerURL returns the HTTPS URL under which a node at host serves consensus (etcd) peer traffic.
+// net.JoinHostPort is used so that IPv6 literal hosts are bracketed correctly (this is called
+// with ip.String() from ProvisionMember).
+func peerURL(host string) url.URL {
+	return url.URL{Scheme: "https", Host: net.JoinHostPort(host, fmt.Sprintf("%d", common.ConsensusPort))}
+}
+
 func (s *Service) OnStart() error {
 	// See: https://godoc.org/github.com/coreos/etcd/embed#Config
 
@@ -143,19 +149,8 @@
 	}
 	cfg.LCUrls = []url.URL{*listenerURL}
 
-	// Advertise Peer URLs
-	apURL, err := url.Parse(fmt.Sprintf("https://%s:%d", s.config.ExternalHost, common.ConsensusPort))
-	if err != nil {
-		return fmt.Errorf("invalid external_host or listen_port: %w", err)
-	}
-
-	// Listen Peer URLs
-	lpURL, err := url.Parse(fmt.Sprintf("https://%s:%d", s.config.ListenHost, common.ConsensusPort))
-	if err != nil {
-		return fmt.Errorf("invalid listen_host or listen_port: %w", err)
-	}
-	cfg.APUrls = []url.URL{*apURL}
-	cfg.LPUrls = []url.URL{*lpURL}
+	cfg.APUrls = []url.URL{peerURL(s.config.ExternalHost)}
+	cfg.LPUrls = []url.URL{peerURL(s.config.ListenHost)}
 	cfg.ACUrls = []url.URL{}
 
 	cfg.Dir = s.config.DataDir
@@ -200,6 +195,8 @@
 
 	// Start CRL watcher
 	go s.watchCRL()
+	ctx := context.TODO()
+	go s.autoPromote(ctx)
 
 	return nil
 }
@@ -227,13 +224,13 @@
 }
 
 // PrecreateCA generates the etcd cluster certificate authority and writes it to local storage.
-func (s *Service) PrecreateCA() error {
+func (s *Service) PrecreateCA(extIP net.IP) error {
 	// Provision an etcd CA
 	etcdRootCA, err := ca.New("Smalltown etcd Root CA")
 	if err != nil {
 		return err
 	}
-	cert, privkey, err := etcdRootCA.IssueCertificate(s.config.ExternalHost)
+	cert, privkey, err := etcdRootCA.IssueCertificate(s.config.ExternalHost, extIP)
 	if err != nil {
 		return fmt.Errorf("failed to self-issue a certificate: %w", err)
 	}
@@ -326,30 +323,65 @@
 	return idCA, crlRevision, nil
 }
 
-func (s *Service) IssueCertificate(hostname string) (*api.ConsensusCertificates, error) {
+// ProvisionMember sets up and returns provisioning data to join another node into the consensus.
+// It issues PKI material, creates a static cluster bootstrap specification string (known as initial-cluster in etcd)
+// and adds the new node as a learner (non-voting) member to the cluster. Once the new node has caught up with the
+// cluster it is automatically promoted to a voting member by the autoPromote process.
+func (s *Service) ProvisionMember(name string, ip net.IP) (*api.ConsensusCertificates, string, error) {
 	idCA, _, err := s.getCAFromEtcd()
 	if err != nil {
-		return nil, err
+		return nil, "", fmt.Errorf("failed to get consensus CA: %w", err)
 	}
-	cert, key, err := idCA.IssueCertificate(hostname)
+	cert, key, err := idCA.IssueCertificate(name, ip)
 	if err != nil {
-		return nil, fmt.Errorf("failed to issue certificate: %w", err)
+		return nil, "", fmt.Errorf("failed to issue certificate: %w", err)
 	}
 	certVal, err := x509.ParseCertificate(cert)
 	if err != nil {
-		return nil, err
+		return nil, "", fmt.Errorf("failed to parse just-issued consensus cert: %w", err)
 	}
 	serial := hex.EncodeToString(certVal.SerialNumber.Bytes())
 	if _, err := s.kv.Put(context.Background(), path.Join(certPrefixEtcd, serial), string(cert)); err != nil {
 		// We issued a certificate, but failed to persist it. Return an error and forget it ever happened.
-		return nil, fmt.Errorf("failed to persist certificate: %w", err)
+		return nil, "", fmt.Errorf("failed to persist certificate: %w", err)
+	}
+
+	// Build the etcd initial-cluster string from the current members plus the new node.
+	currentMembers := s.etcd.Server.Cluster().Members()
+	var memberStrs []string
+	for _, member := range currentMembers {
+		memberStrs = append(memberStrs, fmt.Sprintf("%v=%v", member.Name, member.PickPeerURL()))
+	}
+	apURL := peerURL(ip.String())
+	memberStrs = append(memberStrs, fmt.Sprintf("%s=%s", name, apURL.String()))
+
+	pubKeyPrefix, err := common.IDKeyPrefixFromName(name)
+	if err != nil {
+		return nil, "", fmt.Errorf("invalid new node name: %w", err)
+	}
+
+	crl, _, err := s.etcdGetSingle(crlPathEtcd)
+	if err != nil {
+		return nil, "", fmt.Errorf("failed to get current CRL: %w", err)
+	}
+
+	_, err = s.etcd.Server.AddMember(context.Background(), membership.Member{
+		RaftAttributes: membership.RaftAttributes{
+			PeerURLs:  types.URLs{apURL}.StringSlice(),
+			IsLearner: true,
+		},
+		Attributes: membership.Attributes{Name: name},
+		ID:         types.ID(binary.BigEndian.Uint64(pubKeyPrefix[:8])), // assumes the prefix is at least 8 bytes — guaranteed by IDKeyPrefixFromName? verify
+	})
+	if err != nil {
+		return nil, "", fmt.Errorf("failed to provision member: %w", err)
 	}
 	return &api.ConsensusCertificates{
 		Ca:   idCA.CACertRaw,
 		Cert: cert,
-		Crl:  idCA.CRLRaw,
+		Crl:  crl,
 		Key:  key,
-	}, nil
+	}, strings.Join(memberStrs, ","), nil
 }
 
 func (s *Service) RevokeCertificate(hostname string) error {
@@ -416,6 +444,39 @@
 	}
 }
 
+// autoPromote automatically promotes learner (non-voting) members to voting members. etcd currently lacks auto-promote
+// capabilities (https://github.com/etcd-io/etcd/issues/10537) so we need to do this ourselves.
+// It runs until ctx is canceled.
+func (s *Service) autoPromote(ctx context.Context) {
+	promoteTicker := time.NewTicker(5 * time.Second)
+	defer promoteTicker.Stop()
+	for {
+		// Ticker.Stop does not close the channel, so select on ctx.Done() to
+		// actually terminate the loop instead of leaking this goroutine.
+		select {
+		case <-ctx.Done():
+			return
+		case <-promoteTicker.C:
+		}
+		// Only the current leader may promote; every node runs the loop so
+		// promotion continues if leadership moves.
+		if s.etcd.Server.Leader() != s.etcd.Server.ID() {
+			continue
+		}
+		for _, member := range s.etcd.Server.Cluster().Members() {
+			if !member.IsLearner {
+				continue
+			}
+			// We always call PromoteMember since the metadata necessary to decide if we should is private.
+			// Luckily etcd already does sanity checks internally and will refuse to promote nodes that aren't
+			// connected or are still behind on transactions.
+			if _, err := s.etcd.Server.PromoteMember(ctx, uint64(member.ID)); err != nil {
+				s.Logger.Info("Failed to promote consensus node", zap.String("node", member.Name), zap.Error(err))
+				continue
+			}
+			s.Logger.Info("Promoted new consensus node", zap.String("node", member.Name))
+		}
+	}
+}
+
 func (s *Service) OnStop() error {
 	s.watchCRLTicker.Stop()
 	s.etcd.Close()
@@ -435,33 +489,6 @@
 	return s.ready.Load()
 }
 
-// AddMember adds a new etcd member to the cluster
-func (s *Service) AddMember(ctx context.Context, name string, url string) (uint64, error) {
-	urls, err := types.NewURLs([]string{url})
-	if err != nil {
-		return 0, err
-	}
-
-	member := membership.NewMember(name, urls, DefaultClusterToken, nil)
-
-	_, err = s.etcd.Server.AddMember(ctx, *member)
-	if err != nil {
-		return 0, err
-	}
-
-	return uint64(member.ID), nil
-}
-
-// RemoveMember removes a member from the etcd cluster
-func (s *Service) RemoveMember(ctx context.Context, id uint64) error {
-	_, err := s.etcd.Server.RemoveMember(ctx, id)
-	return err
-}
-
-// Health returns the current cluster health
-func (s *Service) Health() {
-}
-
 // GetConfig returns the current consensus config
 func (s *Service) GetConfig() Config {
 	return *s.config
@@ -472,39 +499,6 @@
 	s.config = &config
 }
 
-// GetInitialClusterString returns the InitialCluster string that can be used to bootstrap a consensus node
-func (s *Service) GetInitialClusterString() string {
-	members := s.etcd.Server.Cluster().Members()
-	clusterString := strings.Builder{}
-
-	for i, m := range members {
-		if i != 0 {
-			clusterString.WriteString(",")
-		}
-		clusterString.WriteString(m.Name)
-		clusterString.WriteString("=")
-		clusterString.WriteString(m.PickPeerURL())
-	}
-
-	return clusterString.String()
-}
-
-// GetNodes returns a list of consensus nodes
-func (s *Service) GetNodes() []Member {
-	members := s.etcd.Server.Cluster().Members()
-	cMembers := make([]Member, len(members))
-	for i, m := range members {
-		cMembers[i] = Member{
-			ID:      uint64(m.ID),
-			Name:    m.Name,
-			Address: m.PickPeerURL(),
-			Synced:  !m.IsLearner,
-		}
-	}
-
-	return cMembers
-}
-
 func (s *Service) GetStore(module, space string) clientv3.KV {
 	return namespace.NewKV(s.kv, fmt.Sprintf("%s:%s", module, space))
 }