Add nanoswitch and cluster testing
Adds nanoswitch and the `switched-multi2` launch target to launch two Smalltown instances on a switched
network and enroll them into a single cluster. Nanoswitch contains a Linux bridge and a minimal DHCP server
and connects to the two Smalltown instances over virtual Ethernet cables. Also moves the DHCP client out
into its own package, since nanoswitch needs it as well.
Test Plan:
Manually tested using `bazel run //:launch -- switched-multi2` and observing that the second VM
(whose serial port is mapped to stdout) prints that it is enrolled. Also validated by `bazel run //core/cmd/dbg -- kubectl get node -o wide` returning two ready nodes.
X-Origin-Diff: phab/D572
GitOrigin-RevId: 9f6e2b3d8268749dd81588205646ae3976ad14b3
diff --git a/core/internal/consensus/ca/ca.go b/core/internal/consensus/ca/ca.go
index 5952b6f..20f7c31 100644
--- a/core/internal/consensus/ca/ca.go
+++ b/core/internal/consensus/ca/ca.go
@@ -35,6 +35,7 @@
"errors"
"fmt"
"math/big"
+ "net"
"time"
)
@@ -150,7 +151,7 @@
}
// IssueCertificate issues a certificate
-func (ca *CA) IssueCertificate(commonName string) (cert []byte, privkey []byte, err error) {
+func (ca *CA) IssueCertificate(commonName string, ip net.IP) (cert []byte, privkey []byte, err error) {
serialNumberLimit := new(big.Int).Lsh(big.NewInt(1), 127)
serialNumber, err := rand.Int(rand.Reader, serialNumberLimit)
if err != nil {
@@ -179,6 +180,7 @@
NotAfter: unknownNotAfter,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth},
DNSNames: []string{commonName},
+ IPAddresses: []net.IP{ip},
}
cert, err = x509.CreateCertificate(rand.Reader, etcdCert, ca.CACert, pubKey, ca.PrivateKey)
return
diff --git a/core/internal/consensus/consensus.go b/core/internal/consensus/consensus.go
index d401c1a..5885aa8 100644
--- a/core/internal/consensus/consensus.go
+++ b/core/internal/consensus/consensus.go
@@ -21,11 +21,13 @@
"bytes"
"context"
"crypto/x509"
+ "encoding/binary"
"encoding/hex"
"encoding/pem"
"fmt"
"io/ioutil"
"math/rand"
+ "net"
"net/url"
"os"
"path"
@@ -112,6 +114,10 @@
return consensusServer, nil
}
+func peerURL(host string) url.URL {
+ return url.URL{Scheme: "https", Host: fmt.Sprintf("%s:%d", host, common.ConsensusPort)}
+}
+
func (s *Service) OnStart() error {
// See: https://godoc.org/github.com/coreos/etcd/embed#Config
@@ -143,19 +149,8 @@
}
cfg.LCUrls = []url.URL{*listenerURL}
- // Advertise Peer URLs
- apURL, err := url.Parse(fmt.Sprintf("https://%s:%d", s.config.ExternalHost, common.ConsensusPort))
- if err != nil {
- return fmt.Errorf("invalid external_host or listen_port: %w", err)
- }
-
- // Listen Peer URLs
- lpURL, err := url.Parse(fmt.Sprintf("https://%s:%d", s.config.ListenHost, common.ConsensusPort))
- if err != nil {
- return fmt.Errorf("invalid listen_host or listen_port: %w", err)
- }
- cfg.APUrls = []url.URL{*apURL}
- cfg.LPUrls = []url.URL{*lpURL}
+ cfg.APUrls = []url.URL{peerURL(s.config.ExternalHost)}
+ cfg.LPUrls = []url.URL{peerURL(s.config.ListenHost)}
cfg.ACUrls = []url.URL{}
cfg.Dir = s.config.DataDir
@@ -200,6 +195,8 @@
// Start CRL watcher
go s.watchCRL()
+ ctx := context.TODO()
+ go s.autoPromote(ctx)
return nil
}
@@ -227,13 +224,13 @@
}
// PrecreateCA generates the etcd cluster certificate authority and writes it to local storage.
-func (s *Service) PrecreateCA() error {
+func (s *Service) PrecreateCA(extIP net.IP) error {
// Provision an etcd CA
etcdRootCA, err := ca.New("Smalltown etcd Root CA")
if err != nil {
return err
}
- cert, privkey, err := etcdRootCA.IssueCertificate(s.config.ExternalHost)
+ cert, privkey, err := etcdRootCA.IssueCertificate(s.config.ExternalHost, extIP)
if err != nil {
return fmt.Errorf("failed to self-issue a certificate: %w", err)
}
@@ -326,30 +323,65 @@
return idCA, crlRevision, nil
}
-func (s *Service) IssueCertificate(hostname string) (*api.ConsensusCertificates, error) {
+// ProvisionMember sets up and returns provisioning data to join another node into the consensus.
+// It issues PKI material, creates a static cluster bootstrap specification string (known as initial-cluster in etcd)
+// and adds the new node as a learner (non-voting) member to the cluster. Once the new node has caught up with the
+// cluster it is automatically promoted to a voting member by the autoPromote process.
+func (s *Service) ProvisionMember(name string, ip net.IP) (*api.ConsensusCertificates, string, error) {
idCA, _, err := s.getCAFromEtcd()
if err != nil {
- return nil, err
+ return nil, "", fmt.Errorf("failed to get consensus CA: %w", err)
}
- cert, key, err := idCA.IssueCertificate(hostname)
+ cert, key, err := idCA.IssueCertificate(name, ip)
if err != nil {
- return nil, fmt.Errorf("failed to issue certificate: %w", err)
+ return nil, "", fmt.Errorf("failed to issue certificate: %w", err)
}
certVal, err := x509.ParseCertificate(cert)
if err != nil {
- return nil, err
+ return nil, "", fmt.Errorf("failed to parse just-issued consensus cert: %w", err)
}
serial := hex.EncodeToString(certVal.SerialNumber.Bytes())
if _, err := s.kv.Put(context.Background(), path.Join(certPrefixEtcd, serial), string(cert)); err != nil {
// We issued a certificate, but failed to persist it. Return an error and forget it ever happened.
- return nil, fmt.Errorf("failed to persist certificate: %w", err)
+ return nil, "", fmt.Errorf("failed to persist certificate: %w", err)
+ }
+
+ // Build the initial-cluster bootstrap string from the existing members plus the new node.
+ currentMembers := s.etcd.Server.Cluster().Members()
+ var memberStrs []string
+ for _, member := range currentMembers {
+ memberStrs = append(memberStrs, fmt.Sprintf("%s=%s", member.Name, member.PickPeerURL()))
+ }
+ apURL := peerURL(ip.String())
+ memberStrs = append(memberStrs, fmt.Sprintf("%s=%s", name, apURL.String()))
+
+ pubKeyPrefix, err := common.IDKeyPrefixFromName(name)
+ if err != nil {
+ return nil, "", fmt.Errorf("invalid new node name: %w", err)
+ }
+
+ // Fetch the current CRL so the new member starts out with up-to-date revocation data.
+ crl, _, err := s.etcdGetSingle(crlPathEtcd)
+ if err != nil {
+ return nil, "", fmt.Errorf("failed to get current CRL: %w", err)
+ }
+
+ // The member ID is derived deterministically from the node's identity key prefix.
+ _, err = s.etcd.Server.AddMember(context.Background(), membership.Member{
+ RaftAttributes: membership.RaftAttributes{
+ PeerURLs: types.URLs{apURL}.StringSlice(),
+ IsLearner: true,
+ },
+ Attributes: membership.Attributes{Name: name},
+ ID: types.ID(binary.BigEndian.Uint64(pubKeyPrefix[:8])),
+ })
+ if err != nil {
+ return nil, "", fmt.Errorf("failed to provision member: %w", err)
}
return &api.ConsensusCertificates{
Ca: idCA.CACertRaw,
Cert: cert,
- Crl: idCA.CRLRaw,
+ Crl: crl,
Key: key,
- }, nil
+ }, strings.Join(memberStrs, ","), nil
}
func (s *Service) RevokeCertificate(hostname string) error {
@@ -416,6 +444,40 @@
}
}
+// autoPromote automatically promotes learning (non-voting) members to voting members. etcd currently lacks auto-promote
+// capabilities (https://github.com/etcd-io/etcd/issues/10537) so we need to do this ourselves.
+// It runs until ctx is cancelled.
+func (s *Service) autoPromote(ctx context.Context) {
+	promoteTicker := time.NewTicker(5 * time.Second)
+	// Stop the ticker from this goroutine via defer. Ticker.Stop does not close the channel,
+	// so a for-range over promoteTicker.C would block forever after an external Stop and leak.
+	defer promoteTicker.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-promoteTicker.C:
+		}
+		// Only the leader may change cluster membership.
+		if s.etcd.Server.Leader() != s.etcd.Server.ID() {
+			continue
+		}
+		for _, member := range s.etcd.Server.Cluster().Members() {
+			if !member.IsLearner {
+				continue
+			}
+			// We always call PromoteMember since the metadata necessary to decide if we should is private.
+			// Luckily etcd already does sanity checks internally and will refuse to promote nodes that aren't
+			// connected or are still behind on transactions.
+			if _, err := s.etcd.Server.PromoteMember(ctx, uint64(member.ID)); err != nil {
+				s.Logger.Info("Failed to promote consensus node", zap.String("node", member.Name), zap.Error(err))
+				continue
+			}
+			s.Logger.Info("Promoted new consensus node", zap.String("node", member.Name))
+		}
+	}
+}
+
+
func (s *Service) OnStop() error {
s.watchCRLTicker.Stop()
s.etcd.Close()
@@ -435,33 +489,6 @@
return s.ready.Load()
}
-// AddMember adds a new etcd member to the cluster
-func (s *Service) AddMember(ctx context.Context, name string, url string) (uint64, error) {
- urls, err := types.NewURLs([]string{url})
- if err != nil {
- return 0, err
- }
-
- member := membership.NewMember(name, urls, DefaultClusterToken, nil)
-
- _, err = s.etcd.Server.AddMember(ctx, *member)
- if err != nil {
- return 0, err
- }
-
- return uint64(member.ID), nil
-}
-
-// RemoveMember removes a member from the etcd cluster
-func (s *Service) RemoveMember(ctx context.Context, id uint64) error {
- _, err := s.etcd.Server.RemoveMember(ctx, id)
- return err
-}
-
-// Health returns the current cluster health
-func (s *Service) Health() {
-}
-
// GetConfig returns the current consensus config
func (s *Service) GetConfig() Config {
return *s.config
@@ -472,39 +499,6 @@
s.config = &config
}
-// GetInitialClusterString returns the InitialCluster string that can be used to bootstrap a consensus node
-func (s *Service) GetInitialClusterString() string {
- members := s.etcd.Server.Cluster().Members()
- clusterString := strings.Builder{}
-
- for i, m := range members {
- if i != 0 {
- clusterString.WriteString(",")
- }
- clusterString.WriteString(m.Name)
- clusterString.WriteString("=")
- clusterString.WriteString(m.PickPeerURL())
- }
-
- return clusterString.String()
-}
-
-// GetNodes returns a list of consensus nodes
-func (s *Service) GetNodes() []Member {
- members := s.etcd.Server.Cluster().Members()
- cMembers := make([]Member, len(members))
- for i, m := range members {
- cMembers[i] = Member{
- ID: uint64(m.ID),
- Name: m.Name,
- Address: m.PickPeerURL(),
- Synced: !m.IsLearner,
- }
- }
-
- return cMembers
-}
-
func (s *Service) GetStore(module, space string) clientv3.KV {
return namespace.NewKV(s.kv, fmt.Sprintf("%s:%s", module, space))
}