Add nanoswitch and cluster testing
Adds nanoswitch and the `switched-multi2` launch target to launch two Smalltown instances on a switched
network and enroll them into a single cluster. Nanoswitch contains a Linux bridge and a minimal DHCP server
and connects to the two Smalltown instances over virtual Ethernet cables. Also moves the DHCP client out into
its own package since nanoswitch needs it as well.
Test Plan:
Manually tested using `bazel run //:launch -- switched-multi2` and observing that the second VM
(whose serial port is mapped to stdout) prints that it is enrolled. Also validated by `bazel run //core/cmd/dbg -- kubectl get node -o wide` returning two ready nodes.
X-Origin-Diff: phab/D572
GitOrigin-RevId: 9f6e2b3d8268749dd81588205646ae3976ad14b3
diff --git a/core/internal/api/BUILD.bazel b/core/internal/api/BUILD.bazel
index e862340..2f25fe6 100644
--- a/core/internal/api/BUILD.bazel
+++ b/core/internal/api/BUILD.bazel
@@ -18,6 +18,7 @@
"//core/internal/consensus:go_default_library",
"//core/pkg/tpm:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
+ "@com_github_grpc_ecosystem_go_grpc_middleware//retry:go_default_library",
"@io_etcd_go_etcd//clientv3:go_default_library",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes:go_default_library",
diff --git a/core/internal/api/nodemanagement.go b/core/internal/api/nodemanagement.go
index 0a3614e..4bc4659 100644
--- a/core/internal/api/nodemanagement.go
+++ b/core/internal/api/nodemanagement.go
@@ -23,19 +23,26 @@
"crypto/rand"
"crypto/sha256"
"crypto/subtle"
+ "crypto/tls"
"crypto/x509"
"encoding/hex"
"errors"
"fmt"
"io"
+ "net"
+ "time"
"github.com/gogo/protobuf/proto"
+ grpcretry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
+ "google.golang.org/grpc"
"google.golang.org/grpc/codes"
+ "google.golang.org/grpc/credentials"
"google.golang.org/grpc/status"
"git.monogon.dev/source/nexantic.git/core/generated/api"
+ "git.monogon.dev/source/nexantic.git/core/internal/common"
"git.monogon.dev/source/nexantic.git/core/pkg/tpm"
)
@@ -53,7 +60,7 @@
return "", errors.New("invalid node identity certificate")
}
- return "smalltown-" + hex.EncodeToString([]byte(pubKey[:16])), nil
+ return common.NameFromIDKey(pubKey), nil
}
func (s *Server) registerNewNode(node *api.Node) error {
@@ -178,6 +185,53 @@
}})
}
+// dialNode connects to the NodeService of the given node and returns a gRPC client for it.
+//
+// The connection requires TLS 1.3 or newer and presents the certificate returned by loadMasterCert as the client
+// identity. Regular X.509 chain verification is disabled; instead the peer is only accepted if one of the
+// certificates it presents is byte-identical to node.IdCert. A retry interceptor with exponential backoff
+// (starting at 100ms) is installed for unary calls.
+//
+// It returns an error if the master certificate cannot be loaded or if dialing fails.
+// NOTE(review): the underlying connection is not returned, so callers cannot close it explicitly.
+func (s *Server) dialNode(ctx context.Context, node *api.Node) (api.NodeServiceClient, error) {
+	masterID, err := s.loadMasterCert()
+	if err != nil {
+		return nil, err
+	}
+
+	secureTransport := &tls.Config{
+		Certificates: []tls.Certificate{*masterID},
+		// Chain verification is replaced by the certificate pinning in VerifyPeerCertificate below.
+		InsecureSkipVerify: true,
+		// Critical function, please review any changes with care
+		// TODO(lorenz): Actively check that this actually provides the security guarantees that we need
+		VerifyPeerCertificate: func(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error {
+			for _, cert := range rawCerts {
+				// X.509 certificates in DER can be compared like this since DER has a unique representation
+				// for each certificate.
+				if bytes.Equal(cert, node.IdCert) {
+					return nil
+				}
+			}
+			return errors.New("failed to find authorized Node certificate")
+		},
+		MinVersion: tls.VersionTLS13,
+	}
+	// net.JoinHostPort brackets IPv6 literals; formatting with "%v:%v" would yield an ambiguous target for them.
+	target := net.JoinHostPort(net.IP(node.Address).String(), fmt.Sprint(common.NodeServicePort))
+	opts := []grpcretry.CallOption{
+		grpcretry.WithBackoff(grpcretry.BackoffExponential(100 * time.Millisecond)),
+	}
+	clientCreds := grpc.WithTransportCredentials(credentials.NewTLS(secureTransport))
+	clientConn, err := grpc.DialContext(ctx, target, clientCreds,
+		grpc.WithUnaryInterceptor(grpcretry.UnaryClientInterceptor(opts...)))
+	if err != nil {
+		return nil, fmt.Errorf("failed to dial node service: %w", err)
+	}
+	return api.NewNodeServiceClient(clientConn), nil
+}
+
func (s *Server) NewTPM2NodeRegister(registerServer api.NodeManagementService_NewTPM2NodeRegisterServer) error {
registerReqContainer, err := registerServer.Recv()
if err != nil {
@@ -258,8 +301,17 @@
}
// TODO: Plug in policy engine here
+ idCert, err := x509.ParseCertificate(newNodeInfo.IdCert)
+ if err != nil {
+ return err
+ }
+ nodeIdPubKey, ok := idCert.PublicKey.(ed25519.PublicKey)
+ if !ok || len(nodeIdPubKey) != ed25519.PublicKeySize {
+ return status.Error(codes.InvalidArgument, "Invalid ID certificate public key")
+ }
node := api.Node{
+ Name: common.NameFromIDKey(nodeIdPubKey),
Address: newNodeInfo.Ip,
Integrity: &api.Node_Tpm2{Tpm2: &api.NodeTPM2{
AkPub: registerReq.AkPublic,
@@ -268,7 +320,7 @@
}},
GlobalUnlockKey: newNodeInfo.GlobalUnlockKey,
IdCert: newNodeInfo.IdCert,
- State: api.Node_UNININITALIZED,
+ State: api.Node_MASTER,
}
if err := s.registerNewNode(&node); err != nil {
@@ -276,5 +328,27 @@
return status.Error(codes.Internal, "failed to register node")
}
+ go func() {
+ ctx := context.Background()
+ nodeClient, err := s.dialNode(ctx, &node)
+ if err != nil {
+ s.Logger.Warn("Failed to join newly enrolled node", zap.Error(err))
+ return
+ }
+ newCerts, initialCluster, err := s.consensusService.ProvisionMember(node.Name, node.Address)
+ if err != nil {
+ s.Logger.Warn("Failed to join newly enrolled node", zap.Error(err))
+ return
+ }
+ _, err = nodeClient.JoinCluster(ctx, &api.JoinClusterRequest{
+ InitialCluster: initialCluster,
+ Certs: newCerts,
+ }, grpcretry.WithMax(10))
+ if err != nil {
+ s.Logger.Warn("Failed to join newly enrolled node", zap.Error(err))
+ return
+ }
+ }()
+
return nil
}
diff --git a/core/internal/common/setup.go b/core/internal/common/setup.go
index db00692..531b688 100644
--- a/core/internal/common/setup.go
+++ b/core/internal/common/setup.go
@@ -16,6 +16,13 @@
package common
+import (
+ "crypto/ed25519"
+ "encoding/hex"
+ "errors"
+ "strings"
+)
+
type (
SmalltownState string
)
@@ -38,3 +45,33 @@
// Node is fully provisioned.
StateJoined SmalltownState = "enrolled"
)
+
+const (
+	// nodeNamePrefix is the fixed prefix of every node name.
+	nodeNamePrefix = "smalltown-"
+	// idKeyPrefixLen is the number of leading ID public key bytes that are hex-encoded into a node name.
+	idKeyPrefixLen = 16
+)
+
+// NameFromIDKey returns the node name for the given ID public key: "smalltown-" followed by the lowercase hex
+// encoding of the first 16 bytes of the key. pubKey must hold at least 16 bytes.
+func NameFromIDKey(pubKey ed25519.PublicKey) string {
+	return nodeNamePrefix + hex.EncodeToString(pubKey[:idKeyPrefixLen])
+}
+
+// IDKeyPrefixFromName is the inverse of NameFromIDKey: it returns the 16-byte ID public key prefix encoded in a
+// node name. It returns an error (and a nil slice) if name lacks the "smalltown-" prefix, if the remainder is not
+// valid hex, or if it does not decode to exactly 16 bytes, so callers can safely slice the result.
+func IDKeyPrefixFromName(name string) ([]byte, error) {
+	if !strings.HasPrefix(name, nodeNamePrefix) {
+		return nil, errors.New("invalid name")
+	}
+	prefix, err := hex.DecodeString(strings.TrimPrefix(name, nodeNamePrefix))
+	if err != nil {
+		return nil, err
+	}
+	if len(prefix) != idKeyPrefixLen {
+		return nil, errors.New("invalid name")
+	}
+	return prefix, nil
+}
diff --git a/core/internal/consensus/ca/ca.go b/core/internal/consensus/ca/ca.go
index 5952b6f..20f7c31 100644
--- a/core/internal/consensus/ca/ca.go
+++ b/core/internal/consensus/ca/ca.go
@@ -35,6 +35,7 @@
"errors"
"fmt"
"math/big"
+ "net"
"time"
)
@@ -150,7 +151,7 @@
}
// IssueCertificate issues a certificate
-func (ca *CA) IssueCertificate(commonName string) (cert []byte, privkey []byte, err error) {
+func (ca *CA) IssueCertificate(commonName string, ip net.IP) (cert []byte, privkey []byte, err error) {
serialNumberLimit := new(big.Int).Lsh(big.NewInt(1), 127)
serialNumber, err := rand.Int(rand.Reader, serialNumberLimit)
if err != nil {
@@ -179,6 +180,7 @@
NotAfter: unknownNotAfter,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth},
DNSNames: []string{commonName},
+ IPAddresses: []net.IP{ip},
}
cert, err = x509.CreateCertificate(rand.Reader, etcdCert, ca.CACert, pubKey, ca.PrivateKey)
return
diff --git a/core/internal/consensus/consensus.go b/core/internal/consensus/consensus.go
index d401c1a..5885aa8 100644
--- a/core/internal/consensus/consensus.go
+++ b/core/internal/consensus/consensus.go
@@ -21,11 +21,13 @@
"bytes"
"context"
"crypto/x509"
+ "encoding/binary"
"encoding/hex"
"encoding/pem"
"fmt"
"io/ioutil"
"math/rand"
+ "net"
"net/url"
"os"
"path"
@@ -112,6 +114,16 @@
return consensusServer, nil
}
+// peerURL builds the HTTPS URL under which the consensus peer on the given host is addressed. The port is always
+// common.ConsensusPort.
+func peerURL(host string) url.URL {
+	hostPort := fmt.Sprintf("%s:%d", host, common.ConsensusPort)
+	return url.URL{
+		Scheme: "https",
+		Host:   hostPort,
+	}
+}
+
func (s *Service) OnStart() error {
// See: https://godoc.org/github.com/coreos/etcd/embed#Config
@@ -143,19 +149,8 @@
}
cfg.LCUrls = []url.URL{*listenerURL}
- // Advertise Peer URLs
- apURL, err := url.Parse(fmt.Sprintf("https://%s:%d", s.config.ExternalHost, common.ConsensusPort))
- if err != nil {
- return fmt.Errorf("invalid external_host or listen_port: %w", err)
- }
-
- // Listen Peer URLs
- lpURL, err := url.Parse(fmt.Sprintf("https://%s:%d", s.config.ListenHost, common.ConsensusPort))
- if err != nil {
- return fmt.Errorf("invalid listen_host or listen_port: %w", err)
- }
- cfg.APUrls = []url.URL{*apURL}
- cfg.LPUrls = []url.URL{*lpURL}
+ cfg.APUrls = []url.URL{peerURL(s.config.ExternalHost)}
+ cfg.LPUrls = []url.URL{peerURL(s.config.ListenHost)}
cfg.ACUrls = []url.URL{}
cfg.Dir = s.config.DataDir
@@ -200,6 +195,8 @@
// Start CRL watcher
go s.watchCRL()
+ ctx := context.TODO()
+ go s.autoPromote(ctx)
return nil
}
@@ -227,13 +224,13 @@
}
// PrecreateCA generates the etcd cluster certificate authority and writes it to local storage.
-func (s *Service) PrecreateCA() error {
+func (s *Service) PrecreateCA(extIP net.IP) error {
// Provision an etcd CA
etcdRootCA, err := ca.New("Smalltown etcd Root CA")
if err != nil {
return err
}
- cert, privkey, err := etcdRootCA.IssueCertificate(s.config.ExternalHost)
+ cert, privkey, err := etcdRootCA.IssueCertificate(s.config.ExternalHost, extIP)
if err != nil {
return fmt.Errorf("failed to self-issue a certificate: %w", err)
}
@@ -326,30 +323,75 @@
return idCA, crlRevision, nil
}
-func (s *Service) IssueCertificate(hostname string) (*api.ConsensusCertificates, error) {
+// ProvisionMember sets up and returns provisioning data to join another node into the consensus.
+// It issues PKI material, creates a static cluster bootstrap specification string (known as initial-cluster in etcd)
+// and adds the new node as a learner (non-voting) member to the cluster. Once the new node has caught up with the
+// cluster it is automatically promoted to a voting member by the autoPromote process.
+//
+// name must be a node name as produced by common.NameFromIDKey; the etcd member ID is derived from the first 8 bytes
+// of the ID key prefix encoded in it. ip is the address of the new member; it is placed into the issued certificate
+// and used to build the member's peer URL.
+//
+// It returns the certificates for the new member and the initial-cluster string (existing members plus the new one).
+func (s *Service) ProvisionMember(name string, ip net.IP) (*api.ConsensusCertificates, string, error) {
+	// Validate the name before issuing or persisting anything, so a bad name leaves no stray certificate behind.
+	pubKeyPrefix, err := common.IDKeyPrefixFromName(name)
+	if err != nil {
+		return nil, "", fmt.Errorf("invalid new node name: %w", err)
+	}
+	// A shorter prefix would make the [:8] slice below panic or read past the decoded bytes.
+	if len(pubKeyPrefix) < 8 {
+		return nil, "", fmt.Errorf("invalid new node name %q: ID key prefix is too short", name)
+	}
+
idCA, _, err := s.getCAFromEtcd()
if err != nil {
- return nil, err
+		return nil, "", fmt.Errorf("failed to get consensus CA: %w", err)
}
- cert, key, err := idCA.IssueCertificate(hostname)
+	cert, key, err := idCA.IssueCertificate(name, ip)
if err != nil {
- return nil, fmt.Errorf("failed to issue certificate: %w", err)
+		return nil, "", fmt.Errorf("failed to issue certificate: %w", err)
}
certVal, err := x509.ParseCertificate(cert)
if err != nil {
- return nil, err
+		return nil, "", fmt.Errorf("failed to parse just-issued consensus cert: %w", err)
}
serial := hex.EncodeToString(certVal.SerialNumber.Bytes())
if _, err := s.kv.Put(context.Background(), path.Join(certPrefixEtcd, serial), string(cert)); err != nil {
// We issued a certificate, but failed to persist it. Return an error and forget it ever happened.
- return nil, fmt.Errorf("failed to persist certificate: %w", err)
+		return nil, "", fmt.Errorf("failed to persist certificate: %w", err)
+	}
+
+	currentMembers := s.etcd.Server.Cluster().Members()
+	var memberStrs []string
+	for _, member := range currentMembers {
+		memberStrs = append(memberStrs, fmt.Sprintf("%v=%v", member.Name, member.PickPeerURL()))
+	}
+	apURL := peerURL(ip.String())
+	memberStrs = append(memberStrs, fmt.Sprintf("%s=%s", name, apURL.String()))
+
+	// The CRL handed to the new member is read from etcd; fail instead of silently shipping an empty one.
+	crl, _, err := s.etcdGetSingle(crlPathEtcd)
+	if err != nil {
+		return nil, "", fmt.Errorf("failed to get consensus CRL: %w", err)
+	}
+
+	_, err = s.etcd.Server.AddMember(context.Background(), membership.Member{
+		RaftAttributes: membership.RaftAttributes{
+			PeerURLs:  types.URLs{apURL}.StringSlice(),
+			IsLearner: true,
+		},
+		Attributes: membership.Attributes{Name: name},
+		ID:         types.ID(binary.BigEndian.Uint64(pubKeyPrefix[:8])),
+	})
+	if err != nil {
+		return nil, "", fmt.Errorf("failed to provision member: %w", err)
}
return &api.ConsensusCertificates{
Ca: idCA.CACertRaw,
Cert: cert,
- Crl: idCA.CRLRaw,
+		Crl:  crl,
Key: key,
- }, nil
+	}, strings.Join(memberStrs, ","), nil
}
func (s *Service) RevokeCertificate(hostname string) error {
@@ -416,6 +444,39 @@
}
}
+// autoPromote periodically promotes learning (non-voting) members to voting members until ctx is canceled. etcd
+// currently lacks auto-promote capabilities (https://github.com/etcd-io/etcd/issues/10537) so we need to do this
+// ourselves. Promotion is attempted every 5 seconds, and only while this node is the current leader.
+func (s *Service) autoPromote(ctx context.Context) {
+	promoteTicker := time.NewTicker(5 * time.Second)
+	defer promoteTicker.Stop()
+	for {
+		// Ticker.Stop does not close the channel, so ranging over promoteTicker.C would block forever after
+		// cancellation. Select on ctx.Done() so this goroutine can exit.
+		select {
+		case <-ctx.Done():
+			return
+		case <-promoteTicker.C:
+		}
+		if s.etcd.Server.Leader() != s.etcd.Server.ID() {
+			continue
+		}
+		for _, member := range s.etcd.Server.Cluster().Members() {
+			if !member.IsLearner {
+				continue
+			}
+			// We always call PromoteMember since the metadata necessary to decide if we should is private.
+			// Luckily etcd already does sanity checks internally and will refuse to promote nodes that aren't
+			// connected or are still behind on transactions.
+			if _, err := s.etcd.Server.PromoteMember(ctx, uint64(member.ID)); err != nil {
+				s.Logger.Info("Failed to promote consensus node", zap.String("node", member.Name), zap.Error(err))
+				continue
+			}
+			s.Logger.Info("Promoted new consensus node", zap.String("node", member.Name))
+		}
+	}
+}
+
func (s *Service) OnStop() error {
s.watchCRLTicker.Stop()
s.etcd.Close()
@@ -435,33 +489,6 @@
return s.ready.Load()
}
-// AddMember adds a new etcd member to the cluster
-func (s *Service) AddMember(ctx context.Context, name string, url string) (uint64, error) {
- urls, err := types.NewURLs([]string{url})
- if err != nil {
- return 0, err
- }
-
- member := membership.NewMember(name, urls, DefaultClusterToken, nil)
-
- _, err = s.etcd.Server.AddMember(ctx, *member)
- if err != nil {
- return 0, err
- }
-
- return uint64(member.ID), nil
-}
-
-// RemoveMember removes a member from the etcd cluster
-func (s *Service) RemoveMember(ctx context.Context, id uint64) error {
- _, err := s.etcd.Server.RemoveMember(ctx, id)
- return err
-}
-
-// Health returns the current cluster health
-func (s *Service) Health() {
-}
-
// GetConfig returns the current consensus config
func (s *Service) GetConfig() Config {
return *s.config
@@ -472,39 +499,6 @@
s.config = &config
}
-// GetInitialClusterString returns the InitialCluster string that can be used to bootstrap a consensus node
-func (s *Service) GetInitialClusterString() string {
- members := s.etcd.Server.Cluster().Members()
- clusterString := strings.Builder{}
-
- for i, m := range members {
- if i != 0 {
- clusterString.WriteString(",")
- }
- clusterString.WriteString(m.Name)
- clusterString.WriteString("=")
- clusterString.WriteString(m.PickPeerURL())
- }
-
- return clusterString.String()
-}
-
-// GetNodes returns a list of consensus nodes
-func (s *Service) GetNodes() []Member {
- members := s.etcd.Server.Cluster().Members()
- cMembers := make([]Member, len(members))
- for i, m := range members {
- cMembers[i] = Member{
- ID: uint64(m.ID),
- Name: m.Name,
- Address: m.PickPeerURL(),
- Synced: !m.IsLearner,
- }
- }
-
- return cMembers
-}
-
func (s *Service) GetStore(module, space string) clientv3.KV {
return namespace.NewKV(s.kv, fmt.Sprintf("%s:%s", module, space))
}
diff --git a/core/internal/network/BUILD.bazel b/core/internal/network/BUILD.bazel
index 7e45086..9eefc1b 100644
--- a/core/internal/network/BUILD.bazel
+++ b/core/internal/network/BUILD.bazel
@@ -2,16 +2,12 @@
go_library(
name = "go_default_library",
- srcs = [
- "dhcp.go",
- "main.go",
- ],
+ srcs = ["main.go"],
importpath = "git.monogon.dev/source/nexantic.git/core/internal/network",
visibility = ["//:__subpackages__"],
deps = [
"//core/internal/common/supervisor:go_default_library",
- "@com_github_insomniacslk_dhcp//dhcpv4:go_default_library",
- "@com_github_insomniacslk_dhcp//dhcpv4/nclient4:go_default_library",
+ "//core/internal/network/dhcp:go_default_library",
"@com_github_vishvananda_netlink//:go_default_library",
"@org_golang_x_sys//unix:go_default_library",
"@org_uber_go_zap//:go_default_library",
diff --git a/core/internal/network/dhcp/BUILD.bazel b/core/internal/network/dhcp/BUILD.bazel
new file mode 100644
index 0000000..40ac372
--- /dev/null
+++ b/core/internal/network/dhcp/BUILD.bazel
@@ -0,0 +1,15 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+ name = "go_default_library",
+ srcs = ["dhcp.go"],
+ importpath = "git.monogon.dev/source/nexantic.git/core/internal/network/dhcp",
+ visibility = ["//core:__subpackages__"],
+ deps = [
+ "//core/internal/common/supervisor:go_default_library",
+ "@com_github_insomniacslk_dhcp//dhcpv4:go_default_library",
+ "@com_github_insomniacslk_dhcp//dhcpv4/nclient4:go_default_library",
+ "@com_github_vishvananda_netlink//:go_default_library",
+ "@org_uber_go_zap//:go_default_library",
+ ],
+)
diff --git a/core/internal/network/dhcp.go b/core/internal/network/dhcp/dhcp.go
similarity index 70%
rename from core/internal/network/dhcp.go
rename to core/internal/network/dhcp/dhcp.go
index 983c25c..0eef2cc 100644
--- a/core/internal/network/dhcp.go
+++ b/core/internal/network/dhcp/dhcp.go
@@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package network
+package dhcp
import (
"context"
@@ -29,48 +29,48 @@
"go.uber.org/zap"
)
-type dhcpClient struct {
+type Client struct {
reqC chan *dhcpStatusReq
}
-func newDHCPClient() *dhcpClient {
- return &dhcpClient{
+func New() *Client {
+ return &Client{
reqC: make(chan *dhcpStatusReq),
}
}
type dhcpStatusReq struct {
- resC chan *dhcpStatus
+ resC chan *Status
wait bool
}
-func (r *dhcpStatusReq) fulfill(s *dhcpStatus) {
+func (r *dhcpStatusReq) fulfill(s *Status) {
go func() {
r.resC <- s
}()
}
-// dhcpStatus is the IPv4 configuration provisioned via DHCP for a given interface. It does not necessarily represent
+// Status is the IPv4 configuration provisioned via DHCP for a given interface. It does not necessarily represent
// a configuration that is active or even valid.
-type dhcpStatus struct {
- // address is 'our' (the node's) IPv4 address on the network.
- address net.IPNet
- // gateway is the default gateway/router of this network, or 0.0.0.0 if none was given.
- gateway net.IP
- // dns is a list of IPv4 DNS servers to use.
- dns []net.IP
+type Status struct {
+ // Address is 'our' (the node's) IPv4 address on the network.
+ Address net.IPNet
+ // Gateway is the default gateway/router of this network, or 0.0.0.0 if none was given.
+ Gateway net.IP
+ // DNS is a list of IPv4 DNS servers to use.
+ DNS []net.IP
}
-func (s *dhcpStatus) String() string {
- return fmt.Sprintf("Address: %s, Gateway: %s, DNS: %v", s.address.String(), s.gateway.String(), s.dns)
+func (s *Status) String() string {
+ return fmt.Sprintf("Address: %s, Gateway: %s, DNS: %v", s.Address.String(), s.Gateway.String(), s.DNS)
}
-func (c *dhcpClient) run(iface netlink.Link) supervisor.Runnable {
+func (c *Client) Run(iface netlink.Link) supervisor.Runnable {
return func(ctx context.Context) error {
logger := supervisor.Logger(ctx)
- // Channel updated with address once one gets assigned/updated
- newC := make(chan *dhcpStatus)
+ // Channel updated with address once one gets assigned/updated
+ newC := make(chan *Status)
// Status requests waiting for configuration
waiters := []*dhcpStatusReq{}
@@ -106,7 +106,7 @@
// We start at WAITING, once we get a current config we move to ASSIGNED
// Once this becomes more complex (ie. has to handle link state changes)
// this should grow into a real state machine.
- var current *dhcpStatus
+ var current *Status
logger.Info("DHCP client WAITING")
for {
select {
@@ -133,28 +133,28 @@
}
}
-// parseAck turns an internal status (from the dhcpv4 library) into a dhcpStatus
-func parseAck(ack *dhcpv4.DHCPv4) *dhcpStatus {
+// parseAck turns an internal status (from the dhcpv4 library) into a Status
+func parseAck(ack *dhcpv4.DHCPv4) *Status {
address := net.IPNet{IP: ack.YourIPAddr, Mask: ack.SubnetMask()}
- // DHCP routers are optional - if none are provided, assume no router and set gateway to 0.0.0.0
- // (this makes gateway.IsUnspecified() == true)
+ // DHCP routers are optional - if none are provided, assume no router and set Gateway to 0.0.0.0
+ // (this makes Gateway.IsUnspecified() == true)
gateway, _, _ := net.ParseCIDR("0.0.0.0/0")
if routers := ack.Router(); len(routers) > 0 {
gateway = routers[0]
}
- return &dhcpStatus{
- address: address,
- gateway: gateway,
- dns: ack.DNS(),
+ return &Status{
+ Address: address,
+ Gateway: gateway,
+ DNS: ack.DNS(),
}
}
-// status returns the DHCP configuration requested from us by the local DHCP server.
-// If wait is true, this function will block until a DHCP configuration is available. Otherwise, a nil status may be
+// Status returns the DHCP configuration requested from us by the local DHCP server.
+// If wait is true, this function will block until a DHCP configuration is available. Otherwise, a nil Status may be
// returned to indicate that no configuration has been received yet.
-func (c *dhcpClient) status(ctx context.Context, wait bool) (*dhcpStatus, error) {
- resC := make(chan *dhcpStatus)
+func (c *Client) Status(ctx context.Context, wait bool) (*Status, error) {
+ resC := make(chan *Status)
c.reqC <- &dhcpStatusReq{
resC: resC,
wait: wait,
diff --git a/core/internal/network/main.go b/core/internal/network/main.go
index 00d7fb2..2466e05 100644
--- a/core/internal/network/main.go
+++ b/core/internal/network/main.go
@@ -22,11 +22,12 @@
"net"
"os"
- "git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
-
"github.com/vishvananda/netlink"
"go.uber.org/zap"
"golang.org/x/sys/unix"
+
+ "git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
+ "git.monogon.dev/source/nexantic.git/core/internal/network/dhcp"
)
const (
@@ -36,7 +37,7 @@
type Service struct {
config Config
- dhcp *dhcpClient
+ dhcp *dhcp.Client
logger *zap.Logger
}
@@ -47,7 +48,7 @@
func New(config Config) *Service {
return &Service{
config: config,
- dhcp: newDHCPClient(),
+ dhcp: dhcp.New(),
}
}
@@ -76,7 +77,7 @@
func (s *Service) addNetworkRoutes(link netlink.Link, addr net.IPNet, gw net.IP) error {
if err := netlink.AddrReplace(link, &netlink.Addr{IPNet: &addr}); err != nil {
- return err
+ return fmt.Errorf("failed to add DHCP address to network interface \"%v\": %w", link.Attrs().Name, err)
}
if gw.IsUnspecified() {
@@ -96,20 +97,20 @@
}
func (s *Service) useInterface(ctx context.Context, iface netlink.Link) error {
- err := supervisor.Run(ctx, "dhcp", s.dhcp.run(iface))
+ err := supervisor.Run(ctx, "dhcp", s.dhcp.Run(iface))
if err != nil {
return err
}
- status, err := s.dhcp.status(ctx, true)
+ status, err := s.dhcp.Status(ctx, true)
if err != nil {
- return fmt.Errorf("could not get DHCP status: %w", err)
+ return fmt.Errorf("could not get DHCP Status: %w", err)
}
- if err := setResolvconf(status.dns, []string{}); err != nil {
+ if err := setResolvconf(status.DNS, []string{}); err != nil {
s.logger.Warn("failed to set resolvconf", zap.Error(err))
}
- if err := s.addNetworkRoutes(iface, net.IPNet{IP: status.address.IP, Mask: status.address.Mask}, status.gateway); err != nil {
+ if err := s.addNetworkRoutes(iface, status.Address, status.Gateway); err != nil {
s.logger.Warn("failed to add routes", zap.Error(err))
}
@@ -118,11 +119,11 @@
// GetIP returns the current IP (and optionally waits for one to be assigned)
func (s *Service) GetIP(ctx context.Context, wait bool) (*net.IP, error) {
- status, err := s.dhcp.status(ctx, wait)
+ status, err := s.dhcp.Status(ctx, wait)
if err != nil {
return nil, err
}
- return &status.address.IP, nil
+ return &status.Address.IP, nil
}
func (s *Service) Run(ctx context.Context) error {
diff --git a/core/internal/node/main.go b/core/internal/node/main.go
index 2cf88f4..8c40e9f 100644
--- a/core/internal/node/main.go
+++ b/core/internal/node/main.go
@@ -25,7 +25,6 @@
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
- "encoding/hex"
"errors"
"flag"
"fmt"
@@ -215,6 +214,10 @@
return err
}
+ if err := s.initNodeAPI(); err != nil {
+ return err
+ }
+
// We only support TPM2 at the moment, any abstractions here would be premature
trustAgent := tpm2.TPM2Agent{}
@@ -281,7 +284,11 @@
s.Consensus.SetConfig(config)
// Generate the cluster CA and store it to local storage.
- if err := s.Consensus.PrecreateCA(); err != nil {
+ extIP, err := s.Network.GetIP(ctx, true)
+ if err != nil {
+ return err
+ }
+ if err := s.Consensus.PrecreateCA(*extIP); err != nil {
return err
}
@@ -370,7 +377,7 @@
return []byte{}, "", fmt.Errorf("failed to write node key: %w", err)
}
- name := "smalltown-" + hex.EncodeToString([]byte(pubKey[:16]))
+ name := common.NameFromIDKey(pubKey)
// This has no SANs because it authenticates by public key, not by name
nodeCert := &x509.Certificate{
@@ -439,7 +446,7 @@
secureTransport := &tls.Config{
Certificates: []tls.Certificate{nodeID},
- ClientAuth: tls.RequireAndVerifyClientCert,
+ ClientAuth: tls.RequestClientCert,
InsecureSkipVerify: true,
// Critical function, please review any changes with care
// TODO(lorenz): Actively check that this actually provides the security guarantees that we need
@@ -451,6 +458,7 @@
return nil
}
}
+ s.logger.Warn("Rejecting NodeService connection with no trusted client certificate")
return errors.New("failed to find authorized NMS certificate")
},
MinVersion: tls.VersionTLS13,
@@ -470,6 +478,7 @@
panic(err) // Can only happen during initialization and is always fatal
}
}()
+
return nil
}
diff --git a/core/internal/node/setup.go b/core/internal/node/setup.go
index cbbfd4d..a9e841c 100644
--- a/core/internal/node/setup.go
+++ b/core/internal/node/setup.go
@@ -18,10 +18,11 @@
import (
"context"
- "errors"
"fmt"
+ "os"
- "go.uber.org/zap"
+ "git.monogon.dev/source/nexantic.git/core/internal/storage"
+
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -61,28 +62,34 @@
return &api.NewNodeInfo{
EnrolmentConfig: s.enrolmentConfig,
- Ip: []byte(*nodeIP),
+ Ip: *nodeIP,
IdCert: nodeCert,
GlobalUnlockKey: globalUnlockKey,
}, nodeID, nil
}
-func (s *SmalltownNode) JoinCluster(context context.Context, req *api.JoinClusterRequest) (*api.JoinClusterResponse, error) {
+func (s *SmalltownNode) JoinCluster(ctx context.Context, req *api.JoinClusterRequest) (*api.JoinClusterResponse, error) {
if s.state != common.StateEnrollMode {
return nil, ErrNotInJoinMode
}
s.logger.Info("Joining Consenus")
+ dataPath, err := s.Storage.GetPathInPlace(storage.PlaceData, "etcd")
+ if err != nil {
+ return nil, status.Errorf(codes.Unavailable, "Data partition not available: %v", err)
+ }
+
+ if err := os.MkdirAll(dataPath, 0600); err != nil {
+ return nil, status.Errorf(codes.Internal, "Cannot create path on data partition: %v", err)
+ }
+
config := s.Consensus.GetConfig()
config.Name = s.hostname
- config.InitialCluster = "default" // Clusters can't cross-join anyways due to cryptography
+ config.InitialCluster = req.InitialCluster
+ config.DataDir = dataPath
s.Consensus.SetConfig(config)
- var err error
- if err != nil {
- s.logger.Warn("Invalid JoinCluster request", zap.Error(err))
- return nil, errors.New("invalid join request")
- }
+
if err := s.Consensus.WriteCertificateFiles(req.Certs); err != nil {
return nil, err
}
@@ -94,6 +101,8 @@
}
s.state = common.StateJoined
+ go s.Containerd.Run()(context.TODO())
+ s.Kubernetes.Start()
s.logger.Info("Joined cluster. Node is now syncing.")
diff --git a/core/internal/storage/blockdev.go b/core/internal/storage/blockdev.go
index da3dcfa..fc556e1 100644
--- a/core/internal/storage/blockdev.go
+++ b/core/internal/storage/blockdev.go
@@ -104,7 +104,7 @@
return err
}
defer integrityPartition.Close()
- zeroed512BBuf := make([]byte, 4096)
+ zeroed512BBuf := make([]byte, 4096*128)
if _, err := integrityPartition.Write(zeroed512BBuf); err != nil {
return fmt.Errorf("failed to wipe header: %w", err)
}