Add nanoswitch and cluster testing
Adds nanoswitch and the `switched-multi2` launch target to launch two Smalltown instances on a switched
network and enroll them into a single cluster. Nanoswitch contains a Linux bridge and a minimal DHCP server
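and connects to the two Smalltown instances over virtual Ethernet cables. Node registration now also joins the
new node into the consensus (etcd) cluster as a learner, which is automatically promoted to a voting member once
it has caught up. Also moves the DHCP client out into its own package (core/internal/network/dhcp) since
nanoswitch needs it.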
Test Plan:
Manually tested using `bazel run //:launch -- switched-multi2` and observing that the second VM
(whose serial port is mapped to stdout) prints that it is enrolled. Also validated by `bazel run //core/cmd/dbg -- kubectl get node -o wide` returning two ready nodes.
X-Origin-Diff: phab/D572
GitOrigin-RevId: 9f6e2b3d8268749dd81588205646ae3976ad14b3
diff --git a/core/cmd/launch-multi2/BUILD.bazel b/core/cmd/launch-multi2/BUILD.bazel
new file mode 100644
index 0000000..3e3e570
--- /dev/null
+++ b/core/cmd/launch-multi2/BUILD.bazel
@@ -0,0 +1,29 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")
+
+go_library(
+ name = "go_default_library",
+ srcs = ["main.go"],
+ importpath = "git.monogon.dev/source/nexantic.git/core/cmd/launch-multi2",
+ visibility = ["//visibility:private"],
+ # NOTE(review): main.go imports .../core/generated/api while this lists //core/api/api — confirm that
+ # target provides that importpath.
+ deps = [
+ "//core/api/api:go_default_library",
+ "//core/internal/common:go_default_library",
+ "//core/internal/launch:go_default_library",
+ "@com_github_grpc_ecosystem_go_grpc_middleware//retry:go_default_library",
+ "@org_golang_google_grpc//:go_default_library",
+ ],
+)
+
+# launch-multi2 boots a nanoswitch micro-VM plus two Smalltown VMs attached to it; the second VM is
+# enrolled into the cluster formed by the first (see main.go).
+go_binary(
+ name = "launch-multi2",
+ # Runtime artifacts used by the launcher. main.go opens some of them by relative path
+ # (core/tools/ktest/linux-testing.elf, core/cmd/nanoswitch/initramfs.lz4), presumably resolved
+ # against the runfiles tree under `bazel run`.
+ data = [
+ "//core:image",
+ "//core:swtpm_data",
+ "//core/cmd/nanoswitch:initramfs",
+ "//core/tools/ktest:linux-testing",
+ "//third_party/edk2:firmware",
+ "@com_github_bonzini_qboot//:qboot-bin",
+ ],
+ embed = [":go_default_library"],
+ visibility = ["//visibility:public"],
+)
diff --git a/core/cmd/launch-multi2/main.go b/core/cmd/launch-multi2/main.go
new file mode 100644
index 0000000..0b7ef4e
--- /dev/null
+++ b/core/cmd/launch-multi2/main.go
@@ -0,0 +1,96 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+ "context"
+ "log"
+ "os"
+ "os/signal"
+ "syscall"
+ "time"
+
+ grpcretry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
+ "google.golang.org/grpc"
+
+ "git.monogon.dev/source/nexantic.git/core/generated/api"
+ "git.monogon.dev/source/nexantic.git/core/internal/common"
+ "git.monogon.dev/source/nexantic.git/core/internal/launch"
+)
+
+// main boots a nanoswitch micro-VM and two Smalltown VMs attached to it over socket pairs. The first VM is
+// started without an enrolment config. The second is started with an enrolment config requested through the
+// ClusterManagement API, reached via the ports forwarded into nanoswitch. SIGINT/SIGTERM cancels everything and
+// is treated as a normal shutdown rather than a failure.
+func main() {
+	sigs := make(chan os.Signal, 1)
+	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
+	ctx, cancel := context.WithCancel(context.Background())
+	go func() {
+		<-sigs
+		cancel()
+	}()
+	sw0, vm0, err := launch.NewSocketPair()
+	if err != nil {
+		log.Fatalf("Failed to create network pipe: %v", err)
+	}
+	sw1, vm1, err := launch.NewSocketPair()
+	if err != nil {
+		log.Fatalf("Failed to create network pipe: %v", err)
+	}
+
+	go func() {
+		if err := launch.Launch(ctx, launch.Options{ConnectToSocket: vm0, SerialPort: os.Stdout}); err != nil {
+			// Cancellation via signal is a normal shutdown (same convention as //core/cmd/launch).
+			if err == ctx.Err() {
+				return
+			}
+			log.Fatalf("Failed to launch vm0: %v", err)
+		}
+	}()
+	// Forward these host ports 1:1 into nanoswitch, which proxies them on to 10.1.0.2 (the first DHCP lease).
+	nanoswitchPortMap := make(launch.PortMap)
+	identityPorts := []uint16{
+		common.ExternalServicePort,
+		common.DebugServicePort,
+		common.KubernetesAPIPort,
+	}
+	for _, port := range identityPorts {
+		nanoswitchPortMap[port] = port
+	}
+	go func() {
+		opts := []grpcretry.CallOption{
+			grpcretry.WithBackoff(grpcretry.BackoffExponential(100 * time.Millisecond)),
+		}
+		conn, err := nanoswitchPortMap.DialGRPC(common.ExternalServicePort, grpc.WithInsecure(),
+			grpc.WithUnaryInterceptor(grpcretry.UnaryClientInterceptor(opts...)))
+		if err != nil {
+			log.Fatalf("Failed to dial external service: %v", err)
+		}
+		defer conn.Close()
+		cmc := api.NewClusterManagementClient(conn)
+		// Retried with backoff while vm0 is still booting; bound to ctx so a signal aborts the retries.
+		res, err := cmc.NewEnrolmentConfig(ctx, &api.NewEnrolmentConfigRequest{
+			Name: "test",
+		}, grpcretry.WithMax(10))
+		if err != nil {
+			if ctx.Err() != nil {
+				return
+			}
+			log.Fatalf("Failed to get enrolment config: %v", err)
+		}
+		if err := launch.Launch(ctx, launch.Options{ConnectToSocket: vm1, EnrolmentConfig: res.EnrolmentConfig, SerialPort: os.Stdout}); err != nil {
+			if err == ctx.Err() {
+				return
+			}
+			log.Fatalf("Failed to launch vm1: %v", err)
+		}
+	}()
+	if err := launch.RunMicroVM(ctx, &launch.MicroVMOptions{
+		SerialPort:             os.Stdout,
+		KernelPath:             "core/tools/ktest/linux-testing.elf",
+		InitramfsPath:          "core/cmd/nanoswitch/initramfs.lz4",
+		ExtraNetworkInterfaces: []*os.File{sw0, sw1},
+		PortMap:                nanoswitchPortMap,
+	}); err != nil {
+		if err == ctx.Err() {
+			return
+		}
+		log.Fatalf("Failed to launch nanoswitch: %v", err)
+	}
+}
diff --git a/core/cmd/launch/main.go b/core/cmd/launch/main.go
index 100d350..ff5c4d5 100644
--- a/core/cmd/launch/main.go
+++ b/core/cmd/launch/main.go
@@ -18,7 +18,7 @@
import (
"context"
- "fmt"
+ "log"
"os"
"os/signal"
"syscall"
@@ -38,6 +38,6 @@
if err == ctx.Err() {
return
}
- fmt.Printf("Failed to execute: %v\n", err)
+ log.Fatalf("Failed to execute: %v\n", err)
}
}
diff --git a/core/cmd/nanoswitch/BUILD b/core/cmd/nanoswitch/BUILD
new file mode 100644
index 0000000..c70e20f
--- /dev/null
+++ b/core/cmd/nanoswitch/BUILD
@@ -0,0 +1,40 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")
+load("//core/build:def.bzl", "smalltown_initramfs")
+
+go_library(
+ name = "go_default_library",
+ srcs = ["nanoswitch.go"],
+ importpath = "git.monogon.dev/source/nexantic.git/core/cmd/nanoswitch",
+ visibility = ["//visibility:private"],
+ deps = [
+ "//core/internal/common:go_default_library",
+ "//core/internal/common/supervisor:go_default_library",
+ "//core/internal/launch:go_default_library",
+ "//core/internal/network/dhcp:go_default_library",
+ "@com_github_google_nftables//:go_default_library",
+ "@com_github_google_nftables//expr:go_default_library",
+ "@com_github_insomniacslk_dhcp//dhcpv4:go_default_library",
+ "@com_github_insomniacslk_dhcp//dhcpv4/server4:go_default_library",
+ "@com_github_vishvananda_netlink//:go_default_library",
+ "@org_golang_x_sys//unix:go_default_library",
+ "@org_uber_go_zap//:go_default_library",
+ ],
+)
+
+# Built without cgo (pure = "on"), presumably so the binary runs as /init in the minimal initramfs below.
+go_binary(
+ name = "nanoswitch",
+ embed = [":go_default_library"],
+ pure = "on",
+ visibility = ["//visibility:public"],
+)
+
+# Initramfs booted by the nanoswitch micro-VM, with nanoswitch as its init process.
+# //core/cmd/launch-multi2 depends on this target and boots core/cmd/nanoswitch/initramfs.lz4.
+smalltown_initramfs(
+ name = "initramfs",
+ files = {
+ ":nanoswitch": "/init",
+
+ # CA Certificate bundle
+ # NOTE(review): nanoswitch.go makes no TLS connections; this bundle may be unnecessary.
+ "@cacerts//file": "/etc/ssl/cert.pem",
+ },
+ visibility = ["//visibility:public"],
+)
diff --git a/core/cmd/nanoswitch/nanoswitch.go b/core/cmd/nanoswitch/nanoswitch.go
new file mode 100644
index 0000000..73e5135
--- /dev/null
+++ b/core/cmd/nanoswitch/nanoswitch.go
@@ -0,0 +1,298 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// nanoswitch is a virtualized switch/router combo intended for testing.
+// It uses the first interface as an external interface to connect to the host and pass traffic in and out. All other
+// interfaces are switched together and served by a built-in DHCP server. Traffic from that network to the
+// SLIRP/external network is SNATed as the host-side SLIRP ignores routed packets.
+// It also has built-in userspace proxying support for debugging.
+package main
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"net"
+	"os"
+	"sync"
+	"time"
+
+	"github.com/google/nftables"
+	"github.com/google/nftables/expr"
+	"github.com/insomniacslk/dhcp/dhcpv4"
+	"github.com/insomniacslk/dhcp/dhcpv4/server4"
+	"github.com/vishvananda/netlink"
+	"go.uber.org/zap"
+	"golang.org/x/sys/unix"
+
+	"git.monogon.dev/source/nexantic.git/core/internal/common"
+	"git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
+	"git.monogon.dev/source/nexantic.git/core/internal/launch"
+	"git.monogon.dev/source/nexantic.git/core/internal/network/dhcp"
+)
+
+// Addressing of the VM-facing side of nanoswitch: the switch itself owns 10.1.0.1 on vmbridge, and the
+// same /24 is handed out to VMs by the built-in DHCP server.
+var (
+	switchIP         = net.IP{10, 1, 0, 1}
+	switchSubnetMask = net.CIDRMask(24, 32)
+)
+
+// defaultLeaseOptions sets the lease options needed to properly configure connectivity to nanoswitch:
+// nanoswitch as the router, SLIRP's DNS forwarder as resolver, the /24 subnet mask and a 12h lease time.
+func defaultLeaseOptions(reply *dhcpv4.DHCPv4) {
+	// NOTE(review): giaddr is normally set only by relay agents (RFC 2131 has the server echo the request's
+	// giaddr). Kept as-is because the DHCP client may rely on it — confirm before removing.
+	reply.GatewayIPAddr = switchIP
+	reply.UpdateOption(dhcpv4.OptDNS(net.IPv4(10, 42, 0, 3))) // SLIRP fake DNS server
+	reply.UpdateOption(dhcpv4.OptRouter(switchIP))
+	// (*DHCPv4).IPAddressLeaseTime is only a getter, so the lease time option has to be set explicitly.
+	reply.UpdateOption(dhcpv4.OptIPAddressLeaseTime(12 * time.Hour))
+	reply.UpdateOption(dhcpv4.OptSubnetMask(switchSubnetMask))
+}
+
+// runDHCPServer runs an extremely minimal DHCP server on link with most options hardcoded, a wrapping bump
+// allocator for the IPs, 12h leases and no support for DHCP collision detection.
+// Offers cycle through 10.1.0.2-10.1.0.254; the network address, nanoswitch's own 10.1.0.1 and the broadcast
+// address are never handed out. Requests are ACKed for whatever address the client asks for.
+// Release/Decline and unsupported message types are not answered.
+func runDHCPServer(link netlink.Link) supervisor.Runnable {
+	// The allocator state is shared by every invocation of the returned Runnable, so a restart keeps counting up.
+	// It is guarded by mu because the handler may run concurrently.
+	// NOTE(review): server4 appears to start each handler in its own goroutine — confirm against the vendored version.
+	var mu sync.Mutex
+	lastHost := byte(1)
+	allocate := func() net.IP {
+		mu.Lock()
+		defer mu.Unlock()
+		lastHost++
+		if lastHost < 2 || lastHost > 254 { // Works only because it's a /24
+			lastHost = 2
+		}
+		// Fresh slice per lease so replies never alias the allocator state.
+		return net.IP{10, 1, 0, lastHost}
+	}
+
+	return func(ctx context.Context) error {
+		laddr := net.UDPAddr{
+			IP:   net.IPv4(0, 0, 0, 0),
+			Port: 67,
+		}
+		server, err := server4.NewServer(link.Attrs().Name, &laddr, func(conn net.PacketConn, peer net.Addr, m *dhcpv4.DHCPv4) {
+			if m == nil {
+				return
+			}
+			reply, err := dhcpv4.NewReplyFromRequest(m)
+			if err != nil {
+				supervisor.Logger(ctx).Warn("Failed to generate DHCP reply", zap.Error(err))
+				return
+			}
+			reply.UpdateOption(dhcpv4.OptServerIdentifier(switchIP))
+			reply.ServerIPAddr = switchIP
+
+			switch m.MessageType() {
+			case dhcpv4.MessageTypeDiscover:
+				reply.UpdateOption(dhcpv4.OptMessageType(dhcpv4.MessageTypeOffer))
+				defaultLeaseOptions(reply)
+				reply.YourIPAddr = allocate()
+				supervisor.Logger(ctx).Info("Replying with DHCP IP", zap.String("ip", reply.YourIPAddr.String()))
+			case dhcpv4.MessageTypeRequest:
+				reply.UpdateOption(dhcpv4.OptMessageType(dhcpv4.MessageTypeAck))
+				defaultLeaseOptions(reply)
+				reply.YourIPAddr = m.RequestedIPAddress()
+				// Renewing/rebinding clients carry their address in ciaddr instead of option 50.
+				if reply.YourIPAddr == nil || reply.YourIPAddr.IsUnspecified() {
+					reply.YourIPAddr = m.ClientIPAddr
+				}
+			case dhcpv4.MessageTypeRelease, dhcpv4.MessageTypeDecline:
+				// RFC 2131 defines no server response to these.
+				supervisor.Logger(ctx).Info("Ignoring Release/Decline")
+				return
+			default:
+				supervisor.Logger(ctx).Info("Ignoring unsupported DHCP message", zap.String("type", m.MessageType().String()))
+				return
+			}
+			if _, err := conn.WriteTo(reply.ToBytes(), peer); err != nil {
+				supervisor.Logger(ctx).Warn("Cannot reply to client", zap.Error(err))
+			}
+		})
+		if err != nil {
+			return err
+		}
+		supervisor.Signal(ctx, supervisor.SignalHealthy)
+		go func() {
+			<-ctx.Done()
+			server.Close()
+		}()
+		return server.Serve()
+	}
+}
+
+// userspaceProxy listens on port on all local addresses and proxies every accepted TCP connection to the same
+// port on targetIP. The Runnable returns once ctx is done, which closes the listener.
+func userspaceProxy(targetIP net.IP, port uint16) supervisor.Runnable {
+	return func(ctx context.Context) error {
+		logger := supervisor.Logger(ctx)
+		tcpListener, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.IPv4(0, 0, 0, 0), Port: int(port)})
+		if err != nil {
+			return err
+		}
+		supervisor.Signal(ctx, supervisor.SignalHealthy)
+		go func() {
+			<-ctx.Done()
+			tcpListener.Close()
+		}()
+		for {
+			conn, err := tcpListener.AcceptTCP()
+			if err != nil {
+				if ctx.Err() != nil {
+					return ctx.Err()
+				}
+				return err
+			}
+			go func(conn *net.TCPConn) {
+				defer conn.Close()
+				upstreamConn, err := net.DialTCP("tcp", nil, &net.TCPAddr{IP: targetIP, Port: int(port)})
+				if err != nil {
+					logger.Info("Userspace proxy failed to connect to upstream", zap.Error(err))
+					return
+				}
+				defer upstreamConn.Close()
+				go func() {
+					io.Copy(upstreamConn, conn)
+					// Propagate the client's EOF so the upstream finishes too. Otherwise the copy below (and
+					// both connections) would stay open until the upstream times out.
+					upstreamConn.CloseWrite()
+				}()
+				io.Copy(conn, upstreamConn)
+			}(conn)
+		}
+	}
+}
+
+// addNetworkRoutes assigns the DHCP-provided address addr to link and, if a gateway was provided, installs a
+// default route via gw. A nil or unspecified (0.0.0.0) gw means no default route is added.
+func addNetworkRoutes(link netlink.Link, addr net.IPNet, gw net.IP) error {
+	if err := netlink.AddrReplace(link, &netlink.Addr{IPNet: &addr}); err != nil {
+		return fmt.Errorf("failed to add DHCP address to network interface \"%v\": %w", link.Attrs().Name, err)
+	}
+
+	// net.IP(nil).IsUnspecified() is false, so nil has to be checked separately.
+	if gw == nil || gw.IsUnspecified() {
+		return nil
+	}
+
+	route := &netlink.Route{
+		Dst:   &net.IPNet{IP: net.IPv4(0, 0, 0, 0), Mask: net.IPv4Mask(0, 0, 0, 0)},
+		Gw:    gw,
+		Scope: netlink.SCOPE_UNIVERSE,
+	}
+	if err := netlink.RouteAdd(route); err != nil {
+		return fmt.Errorf("could not add default route: netlink.RouteAdd(%+v): %w", route, err)
+	}
+	return nil
+}
+
+// nfifname encodes an interface name as the fixed-size, NUL-padded 16-byte buffer (IFNAMSIZ) that nftables
+// compares against for interface-name meta keys such as OIFNAME. Longer names are truncated to 16 bytes.
+func nfifname(n string) []byte {
+	const ifNameSize = 16
+	padded := make([]byte, ifNameSize) // zero-initialized, so the padding is already NUL
+	copy(padded, n)
+	return padded
+}
+
+// main runs nanoswitch as the micro-VM's init process. It performs these steps:
+//   - mount the pseudo-filesystems;
+//   - bridge every VM-facing Ethernet link into vmbridge, which owns switchIP;
+//   - if the host-facing link (launch.HostInterfaceMAC) exists, configure it via DHCP and masquerade traffic
+//     leaving it;
+//   - start the DHCP server and the userspace proxies towards 10.1.0.2.
+func main() {
+	logger, err := zap.NewDevelopment()
+	if err != nil {
+		panic(err)
+	}
+
+	supervisor.New(context.Background(), logger, func(ctx context.Context) error {
+		logger := supervisor.Logger(ctx)
+		logger.Info("Starting NanoSwitch, a tiny TOR switch emulator")
+
+		// Set up target filesystems.
+		for _, el := range []struct {
+			dir   string
+			fs    string
+			flags uintptr
+		}{
+			{"/sys", "sysfs", unix.MS_NOEXEC | unix.MS_NOSUID | unix.MS_NODEV},
+			{"/proc", "proc", unix.MS_NOEXEC | unix.MS_NOSUID | unix.MS_NODEV},
+			{"/dev", "devtmpfs", unix.MS_NOEXEC | unix.MS_NOSUID},
+			{"/dev/pts", "devpts", unix.MS_NOEXEC | unix.MS_NOSUID},
+		} {
+			if err := os.Mkdir(el.dir, 0755); err != nil && !os.IsExist(err) {
+				return fmt.Errorf("could not make %s: %w", el.dir, err)
+			}
+			if err := unix.Mount(el.fs, el.dir, el.fs, el.flags, ""); err != nil {
+				return fmt.Errorf("could not mount %s on %s: %w", el.fs, el.dir, err)
+			}
+		}
+
+		c := &nftables.Conn{}
+
+		links, err := netlink.LinkList()
+		if err != nil {
+			logger.Panic("Failed to list links", zap.Error(err))
+		}
+		var externalLink netlink.Link
+		var vmLinks []netlink.Link
+		for _, link := range links {
+			attrs := link.Attrs()
+			if link.Type() == "device" && len(attrs.HardwareAddr) > 0 {
+				if attrs.Flags&net.FlagUp != net.FlagUp {
+					// Attempt to take up all ethernet links; a link that stays down is reported, not fatal.
+					if err := netlink.LinkSetUp(link); err != nil {
+						logger.Warn("Failed to bring up link", zap.String("if", attrs.Name), zap.Error(err))
+					}
+				}
+				if bytes.Equal(attrs.HardwareAddr, launch.HostInterfaceMAC) {
+					externalLink = link
+				} else {
+					vmLinks = append(vmLinks, link)
+				}
+			}
+		}
+		vmBridgeLink := &netlink.Bridge{LinkAttrs: netlink.LinkAttrs{Name: "vmbridge", Flags: net.FlagUp}}
+		if err := netlink.LinkAdd(vmBridgeLink); err != nil {
+			logger.Panic("Failed to create vmbridge", zap.Error(err))
+		}
+		for _, link := range vmLinks {
+			if err := netlink.LinkSetMaster(link, vmBridgeLink); err != nil {
+				logger.Panic("Failed to add VM interface to bridge", zap.Error(err))
+			}
+			logger.Info("Assigned interface to bridge", zap.String("if", link.Attrs().Name))
+		}
+		if err := netlink.AddrReplace(vmBridgeLink, &netlink.Addr{IPNet: &net.IPNet{IP: switchIP, Mask: switchSubnetMask}}); err != nil {
+			logger.Panic("Failed to assign static IP to vmbridge", zap.Error(err))
+		}
+		if externalLink != nil {
+			nat := c.AddTable(&nftables.Table{
+				Family: nftables.TableFamilyIPv4,
+				Name:   "nat",
+			})
+
+			postrouting := c.AddChain(&nftables.Chain{
+				Name:     "postrouting",
+				Hooknum:  nftables.ChainHookPostrouting,
+				Priority: nftables.ChainPriorityNATSource,
+				Table:    nat,
+				Type:     nftables.ChainTypeNAT,
+			})
+
+			// Masquerade/SNAT all traffic going out of the external interface
+			c.AddRule(&nftables.Rule{
+				Table: nat,
+				Chain: postrouting,
+				Exprs: []expr.Any{
+					&expr.Meta{Key: expr.MetaKeyOIFNAME, Register: 1},
+					&expr.Cmp{
+						Op:       expr.CmpOpEq,
+						Register: 1,
+						Data:     nfifname(externalLink.Attrs().Name),
+					},
+					&expr.Masq{},
+				},
+			})
+
+			if err := c.Flush(); err != nil {
+				logger.Panic("Failed to apply nftables NAT rules", zap.Error(err))
+			}
+
+			dhcpClient := dhcp.New()
+			supervisor.Run(ctx, "dhcp-client", dhcpClient.Run(externalLink))
+			if err := ioutil.WriteFile("/proc/sys/net/ipv4/ip_forward", []byte("1\n"), 0644); err != nil {
+				logger.Panic("Failed to write ip forwards", zap.Error(err))
+			}
+			status, err := dhcpClient.Status(ctx, true)
+			if err != nil {
+				return err
+			}
+
+			if err := addNetworkRoutes(externalLink, status.Address, status.Gateway); err != nil {
+				return err
+			}
+		} else {
+			logger.Info("No upstream interface detected")
+		}
+		supervisor.Run(ctx, "dhcp-server", runDHCPServer(vmBridgeLink))
+		supervisor.Run(ctx, "proxy-ext1", userspaceProxy(net.IPv4(10, 1, 0, 2), common.ExternalServicePort))
+		supervisor.Run(ctx, "proxy-dbg1", userspaceProxy(net.IPv4(10, 1, 0, 2), common.DebugServicePort))
+		supervisor.Run(ctx, "proxy-k8s-api1", userspaceProxy(net.IPv4(10, 1, 0, 2), common.KubernetesAPIPort))
+		supervisor.Signal(ctx, supervisor.SignalHealthy)
+		supervisor.Signal(ctx, supervisor.SignalDone)
+		return nil
+	})
+	// Block forever: as init, nanoswitch must never exit.
+	select {}
+}
diff --git a/core/internal/api/BUILD.bazel b/core/internal/api/BUILD.bazel
index e862340..2f25fe6 100644
--- a/core/internal/api/BUILD.bazel
+++ b/core/internal/api/BUILD.bazel
@@ -18,6 +18,7 @@
"//core/internal/consensus:go_default_library",
"//core/pkg/tpm:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
+ "@com_github_grpc_ecosystem_go_grpc_middleware//retry:go_default_library",
"@io_etcd_go_etcd//clientv3:go_default_library",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes:go_default_library",
diff --git a/core/internal/api/nodemanagement.go b/core/internal/api/nodemanagement.go
index 0a3614e..4bc4659 100644
--- a/core/internal/api/nodemanagement.go
+++ b/core/internal/api/nodemanagement.go
@@ -23,19 +23,26 @@
"crypto/rand"
"crypto/sha256"
"crypto/subtle"
+ "crypto/tls"
"crypto/x509"
"encoding/hex"
"errors"
"fmt"
"io"
+ "net"
+ "time"
"github.com/gogo/protobuf/proto"
+ grpcretry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
+ "google.golang.org/grpc"
"google.golang.org/grpc/codes"
+ "google.golang.org/grpc/credentials"
"google.golang.org/grpc/status"
"git.monogon.dev/source/nexantic.git/core/generated/api"
+ "git.monogon.dev/source/nexantic.git/core/internal/common"
"git.monogon.dev/source/nexantic.git/core/pkg/tpm"
)
@@ -53,7 +60,7 @@
return "", errors.New("invalid node identity certificate")
}
- return "smalltown-" + hex.EncodeToString([]byte(pubKey[:16])), nil
+ return common.NameFromIDKey(pubKey), nil
}
func (s *Server) registerNewNode(node *api.Node) error {
@@ -178,6 +185,42 @@
}})
}
+// dialNode opens a gRPC connection to the NodeService of node at node.Address. It authenticates with the
+// master certificate and accepts the peer only if it presents node.IdCert as its leaf certificate.
+// Unary calls go through a grpcretry interceptor with exponential backoff (100ms base); the retry count is
+// configured per call (e.g. grpcretry.WithMax).
+//
+// NOTE(review): the underlying *grpc.ClientConn is not returned, so callers cannot Close it and it stays open
+// for the lifetime of the process. Consider returning it alongside the client.
+func (s *Server) dialNode(ctx context.Context, node *api.Node) (api.NodeServiceClient, error) {
+	masterID, err := s.loadMasterCert()
+	if err != nil {
+		return nil, err
+	}
+
+	secureTransport := &tls.Config{
+		Certificates:       []tls.Certificate{*masterID},
+		InsecureSkipVerify: true,
+		// Critical function, please review any changes with care
+		// TODO(lorenz): Actively check that this actually provides the security guarantees that we need
+		VerifyPeerCertificate: func(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error {
+			// Only the leaf (first) certificate is bound to the handshake: the peer proves possession of its
+			// private key in CertificateVerify. Further certificates are unauthenticated extras a peer can send
+			// at will. Matching node.IdCert among them would let any TLS server that appends a copy of the
+			// node's certificate impersonate the node.
+			if len(rawCerts) == 0 {
+				return errors.New("peer presented no certificate")
+			}
+			// X.509 certificates in DER can be compared like this since DER has a unique representation
+			// for each certificate.
+			if !bytes.Equal(rawCerts[0], node.IdCert) {
+				return errors.New("failed to find authorized Node certificate")
+			}
+			return nil
+		},
+		MinVersion: tls.VersionTLS13,
+	}
+	addr := net.IP(node.Address)
+	opts := []grpcretry.CallOption{
+		grpcretry.WithBackoff(grpcretry.BackoffExponential(100 * time.Millisecond)),
+	}
+	clientCreds := grpc.WithTransportCredentials(credentials.NewTLS(secureTransport))
+	// JoinHostPort brackets IPv6 addresses, which a plain "%v:%v" format would not.
+	target := net.JoinHostPort(addr.String(), fmt.Sprint(common.NodeServicePort))
+	clientConn, err := grpc.DialContext(ctx, target, clientCreds,
+		grpc.WithUnaryInterceptor(grpcretry.UnaryClientInterceptor(opts...)))
+	if err != nil {
+		return nil, fmt.Errorf("failed to dial node service: %w", err)
+	}
+	return api.NewNodeServiceClient(clientConn), nil
+}
+
func (s *Server) NewTPM2NodeRegister(registerServer api.NodeManagementService_NewTPM2NodeRegisterServer) error {
registerReqContainer, err := registerServer.Recv()
if err != nil {
@@ -258,8 +301,17 @@
}
// TODO: Plug in policy engine here
+	// A malformed ID certificate is a client error, so report it as InvalidArgument rather than a raw error.
+	idCert, err := x509.ParseCertificate(newNodeInfo.IdCert)
+	if err != nil {
+		return status.Errorf(codes.InvalidArgument, "Invalid ID certificate: %v", err)
+	}
+	// The node name is derived from the ID key, so it must be a well-formed Ed25519 public key.
+	nodeIdPubKey, ok := idCert.PublicKey.(ed25519.PublicKey)
+	if !ok || len(nodeIdPubKey) != ed25519.PublicKeySize {
+		return status.Error(codes.InvalidArgument, "Invalid ID certificate public key")
+	}
node := api.Node{
+ Name: common.NameFromIDKey(nodeIdPubKey),
Address: newNodeInfo.Ip,
Integrity: &api.Node_Tpm2{Tpm2: &api.NodeTPM2{
AkPub: registerReq.AkPublic,
@@ -268,7 +320,7 @@
}},
GlobalUnlockKey: newNodeInfo.GlobalUnlockKey,
IdCert: newNodeInfo.IdCert,
- State: api.Node_UNININITALIZED,
+ State: api.Node_MASTER,
}
if err := s.registerNewNode(&node); err != nil {
@@ -276,5 +328,27 @@
return status.Error(codes.Internal, "failed to register node")
}
+	// Join the newly registered node into the consensus cluster in the background. Registration has already
+	// succeeded at this point and is not rolled back if any of the steps below fail.
+	go func() {
+		ctx := context.Background()
+		nodeClient, err := s.dialNode(ctx, &node)
+		if err != nil {
+			s.Logger.Warn("Failed to dial newly enrolled node", zap.String("node", node.Name), zap.Error(err))
+			return
+		}
+		newCerts, initialCluster, err := s.consensusService.ProvisionMember(node.Name, node.Address)
+		if err != nil {
+			s.Logger.Warn("Failed to provision consensus membership for newly enrolled node",
+				zap.String("node", node.Name), zap.Error(err))
+			return
+		}
+		_, err = nodeClient.JoinCluster(ctx, &api.JoinClusterRequest{
+			InitialCluster: initialCluster,
+			Certs:          newCerts,
+		}, grpcretry.WithMax(10))
+		if err != nil {
+			// NOTE(review): ProvisionMember has already added the node to etcd as a learner. That member is not
+			// removed here and presumably lingers as an unpromotable learner.
+			s.Logger.Warn("Failed to join newly enrolled node", zap.String("node", node.Name), zap.Error(err))
+		}
+	}()
+
return nil
}
diff --git a/core/internal/common/setup.go b/core/internal/common/setup.go
index db00692..531b688 100644
--- a/core/internal/common/setup.go
+++ b/core/internal/common/setup.go
@@ -16,6 +16,13 @@
package common
+import (
+ "crypto/ed25519"
+ "encoding/hex"
+ "errors"
+ "strings"
+)
+
type (
SmalltownState string
)
@@ -38,3 +45,14 @@
// Node is fully provisioned.
StateJoined SmalltownState = "enrolled"
)
+
+// NameFromIDKey returns the node name derived from a node's Ed25519 ID public key: "smalltown-" followed by the
+// hex encoding of the key's first 16 bytes.
+func NameFromIDKey(pubKey ed25519.PublicKey) string {
+	return "smalltown-" + hex.EncodeToString(pubKey[:16])
+}
+
+// IDKeyPrefixFromName is the inverse of NameFromIDKey. It returns the ID public key prefix encoded in a node
+// name, or a nil slice and an error if name lacks the "smalltown-" prefix or is not valid hex after it.
+func IDKeyPrefixFromName(name string) ([]byte, error) {
+	if !strings.HasPrefix(name, "smalltown-") {
+		return nil, errors.New("invalid name")
+	}
+	return hex.DecodeString(strings.TrimPrefix(name, "smalltown-"))
+}
diff --git a/core/internal/consensus/ca/ca.go b/core/internal/consensus/ca/ca.go
index 5952b6f..20f7c31 100644
--- a/core/internal/consensus/ca/ca.go
+++ b/core/internal/consensus/ca/ca.go
@@ -35,6 +35,7 @@
"errors"
"fmt"
"math/big"
+ "net"
"time"
)
@@ -150,7 +151,7 @@
}
// IssueCertificate issues a certificate
-func (ca *CA) IssueCertificate(commonName string) (cert []byte, privkey []byte, err error) {
+func (ca *CA) IssueCertificate(commonName string, ip net.IP) (cert []byte, privkey []byte, err error) {
serialNumberLimit := new(big.Int).Lsh(big.NewInt(1), 127)
serialNumber, err := rand.Int(rand.Reader, serialNumberLimit)
if err != nil {
@@ -179,6 +180,7 @@
NotAfter: unknownNotAfter,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth},
DNSNames: []string{commonName},
+ IPAddresses: []net.IP{ip},
}
cert, err = x509.CreateCertificate(rand.Reader, etcdCert, ca.CACert, pubKey, ca.PrivateKey)
return
diff --git a/core/internal/consensus/consensus.go b/core/internal/consensus/consensus.go
index d401c1a..5885aa8 100644
--- a/core/internal/consensus/consensus.go
+++ b/core/internal/consensus/consensus.go
@@ -21,11 +21,13 @@
"bytes"
"context"
"crypto/x509"
+ "encoding/binary"
"encoding/hex"
"encoding/pem"
"fmt"
"io/ioutil"
"math/rand"
+ "net"
"net/url"
"os"
"path"
@@ -112,6 +114,10 @@
return consensusServer, nil
}
+// peerURL returns the etcd peer URL (https on common.ConsensusPort) for host. host may be a hostname, an IPv4
+// address or an unbracketed IPv6 address; IPv6 addresses are bracketed as URLs require.
+func peerURL(host string) url.URL {
+	return url.URL{Scheme: "https", Host: net.JoinHostPort(host, fmt.Sprint(common.ConsensusPort))}
+}
+
func (s *Service) OnStart() error {
// See: https://godoc.org/github.com/coreos/etcd/embed#Config
@@ -143,19 +149,8 @@
}
cfg.LCUrls = []url.URL{*listenerURL}
- // Advertise Peer URLs
- apURL, err := url.Parse(fmt.Sprintf("https://%s:%d", s.config.ExternalHost, common.ConsensusPort))
- if err != nil {
- return fmt.Errorf("invalid external_host or listen_port: %w", err)
- }
-
- // Listen Peer URLs
- lpURL, err := url.Parse(fmt.Sprintf("https://%s:%d", s.config.ListenHost, common.ConsensusPort))
- if err != nil {
- return fmt.Errorf("invalid listen_host or listen_port: %w", err)
- }
- cfg.APUrls = []url.URL{*apURL}
- cfg.LPUrls = []url.URL{*lpURL}
+ cfg.APUrls = []url.URL{peerURL(s.config.ExternalHost)}
+ cfg.LPUrls = []url.URL{peerURL(s.config.ListenHost)}
cfg.ACUrls = []url.URL{}
cfg.Dir = s.config.DataDir
@@ -200,6 +195,8 @@
// Start CRL watcher
go s.watchCRL()
+ ctx := context.TODO()
+ go s.autoPromote(ctx)
return nil
}
@@ -227,13 +224,13 @@
}
// PrecreateCA generates the etcd cluster certificate authority and writes it to local storage.
-func (s *Service) PrecreateCA() error {
+func (s *Service) PrecreateCA(extIP net.IP) error {
// Provision an etcd CA
etcdRootCA, err := ca.New("Smalltown etcd Root CA")
if err != nil {
return err
}
- cert, privkey, err := etcdRootCA.IssueCertificate(s.config.ExternalHost)
+ cert, privkey, err := etcdRootCA.IssueCertificate(s.config.ExternalHost, extIP)
if err != nil {
return fmt.Errorf("failed to self-issue a certificate: %w", err)
}
@@ -326,30 +323,61 @@
return idCA, crlRevision, nil
}
-func (s *Service) IssueCertificate(hostname string) (*api.ConsensusCertificates, error) {
+// ProvisionMember sets up and returns provisioning data to join another node into the consensus.
+// It issues PKI material, creates a static cluster bootstrap specification string (known as initial-cluster in etcd)
+// and adds the new node as a learner (non-voting) member to the cluster. Once the new node has caught up with the
+// cluster it is automatically promoted to a voting member by the autoPromote process.
+//
+// name must be a node name as produced by common.NameFromIDKey. The etcd member ID is derived from the first 8
+// bytes of the ID key prefix encoded in it. The name is validated before any certificate is issued, so an invalid
+// name does not leave a persisted certificate behind.
+func (s *Service) ProvisionMember(name string, ip net.IP) (*api.ConsensusCertificates, string, error) {
+	pubKeyPrefix, err := common.IDKeyPrefixFromName(name)
+	if err != nil {
+		return nil, "", fmt.Errorf("invalid new node name: %w", err)
+	}
+	if len(pubKeyPrefix) < 8 {
+		return nil, "", fmt.Errorf("invalid new node name: ID key prefix has %d bytes, need at least 8", len(pubKeyPrefix))
+	}
+
 	idCA, _, err := s.getCAFromEtcd()
 	if err != nil {
-		return nil, err
+		return nil, "", fmt.Errorf("failed to get consensus CA: %w", err)
 	}
-	cert, key, err := idCA.IssueCertificate(hostname)
+	cert, key, err := idCA.IssueCertificate(name, ip)
 	if err != nil {
-		return nil, fmt.Errorf("failed to issue certificate: %w", err)
+		return nil, "", fmt.Errorf("failed to issue certificate: %w", err)
 	}
 	certVal, err := x509.ParseCertificate(cert)
 	if err != nil {
-		return nil, err
+		return nil, "", fmt.Errorf("failed to parse just-issued consensus cert: %w", err)
 	}
 	serial := hex.EncodeToString(certVal.SerialNumber.Bytes())
 	if _, err := s.kv.Put(context.Background(), path.Join(certPrefixEtcd, serial), string(cert)); err != nil {
 		// We issued a certificate, but failed to persist it. Return an error and forget it ever happened.
-		return nil, fmt.Errorf("failed to persist certificate: %w", err)
+		return nil, "", fmt.Errorf("failed to persist certificate: %w", err)
+	}
+
+	currentMembers := s.etcd.Server.Cluster().Members()
+	var memberStrs []string
+	for _, member := range currentMembers {
+		memberStrs = append(memberStrs, fmt.Sprintf("%v=%v", member.Name, member.PickPeerURL()))
+	}
+	apURL := peerURL(ip.String())
+	memberStrs = append(memberStrs, fmt.Sprintf("%s=%s", name, apURL.String()))
+
+	// The CRL is handed to the new member; failing to read it must not silently yield an empty CRL.
+	crl, _, err := s.etcdGetSingle(crlPathEtcd)
+	if err != nil {
+		return nil, "", fmt.Errorf("failed to get consensus CRL: %w", err)
+	}
+
+	_, err = s.etcd.Server.AddMember(context.Background(), membership.Member{
+		RaftAttributes: membership.RaftAttributes{
+			PeerURLs:  types.URLs{apURL}.StringSlice(),
+			IsLearner: true,
+		},
+		Attributes: membership.Attributes{Name: name},
+		ID:         types.ID(binary.BigEndian.Uint64(pubKeyPrefix[:8])),
+	})
+	if err != nil {
+		return nil, "", fmt.Errorf("failed to provision member: %w", err)
 	}
 	return &api.ConsensusCertificates{
 		Ca:   idCA.CACertRaw,
 		Cert: cert,
-		Crl:  idCA.CRLRaw,
+		Crl:  crl,
 		Key:  key,
-	}, nil
+	}, strings.Join(memberStrs, ","), nil
 }
func (s *Service) RevokeCertificate(hostname string) error {
@@ -416,6 +444,32 @@
}
}
+// autoPromote automatically promotes learning (non-voting) members to voting members. etcd currently lacks auto-promote
+// capabilities (https://github.com/etcd-io/etcd/issues/10537) so we need to do this ourselves.
+// Every 5 seconds, while this node is the raft leader, it attempts to promote all learners. It returns once ctx
+// is done. (Ticker.Stop does not close the channel, so a bare `for range ticker.C` would never exit.)
+func (s *Service) autoPromote(ctx context.Context) {
+	promoteTicker := time.NewTicker(5 * time.Second)
+	defer promoteTicker.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-promoteTicker.C:
+		}
+		if s.etcd.Server.Leader() != s.etcd.Server.ID() {
+			continue
+		}
+		for _, member := range s.etcd.Server.Cluster().Members() {
+			if !member.IsLearner {
+				continue
+			}
+			// We always call PromoteMember since the metadata necessary to decide if we should is private.
+			// Luckily etcd already does sanity checks internally and will refuse to promote nodes that aren't
+			// connected or are still behind on transactions.
+			if _, err := s.etcd.Server.PromoteMember(ctx, uint64(member.ID)); err != nil {
+				s.Logger.Info("Failed to promote consensus node", zap.String("node", member.Name), zap.Error(err))
+				continue
+			}
+			s.Logger.Info("Promoted new consensus node", zap.String("node", member.Name))
+		}
+	}
+}
+
func (s *Service) OnStop() error {
s.watchCRLTicker.Stop()
s.etcd.Close()
@@ -435,33 +489,6 @@
return s.ready.Load()
}
-// AddMember adds a new etcd member to the cluster
-func (s *Service) AddMember(ctx context.Context, name string, url string) (uint64, error) {
- urls, err := types.NewURLs([]string{url})
- if err != nil {
- return 0, err
- }
-
- member := membership.NewMember(name, urls, DefaultClusterToken, nil)
-
- _, err = s.etcd.Server.AddMember(ctx, *member)
- if err != nil {
- return 0, err
- }
-
- return uint64(member.ID), nil
-}
-
-// RemoveMember removes a member from the etcd cluster
-func (s *Service) RemoveMember(ctx context.Context, id uint64) error {
- _, err := s.etcd.Server.RemoveMember(ctx, id)
- return err
-}
-
-// Health returns the current cluster health
-func (s *Service) Health() {
-}
-
// GetConfig returns the current consensus config
func (s *Service) GetConfig() Config {
return *s.config
@@ -472,39 +499,6 @@
s.config = &config
}
-// GetInitialClusterString returns the InitialCluster string that can be used to bootstrap a consensus node
-func (s *Service) GetInitialClusterString() string {
- members := s.etcd.Server.Cluster().Members()
- clusterString := strings.Builder{}
-
- for i, m := range members {
- if i != 0 {
- clusterString.WriteString(",")
- }
- clusterString.WriteString(m.Name)
- clusterString.WriteString("=")
- clusterString.WriteString(m.PickPeerURL())
- }
-
- return clusterString.String()
-}
-
-// GetNodes returns a list of consensus nodes
-func (s *Service) GetNodes() []Member {
- members := s.etcd.Server.Cluster().Members()
- cMembers := make([]Member, len(members))
- for i, m := range members {
- cMembers[i] = Member{
- ID: uint64(m.ID),
- Name: m.Name,
- Address: m.PickPeerURL(),
- Synced: !m.IsLearner,
- }
- }
-
- return cMembers
-}
-
func (s *Service) GetStore(module, space string) clientv3.KV {
return namespace.NewKV(s.kv, fmt.Sprintf("%s:%s", module, space))
}
diff --git a/core/internal/network/BUILD.bazel b/core/internal/network/BUILD.bazel
index 7e45086..9eefc1b 100644
--- a/core/internal/network/BUILD.bazel
+++ b/core/internal/network/BUILD.bazel
@@ -2,16 +2,12 @@
go_library(
name = "go_default_library",
- srcs = [
- "dhcp.go",
- "main.go",
- ],
+ srcs = ["main.go"],
importpath = "git.monogon.dev/source/nexantic.git/core/internal/network",
visibility = ["//:__subpackages__"],
deps = [
"//core/internal/common/supervisor:go_default_library",
- "@com_github_insomniacslk_dhcp//dhcpv4:go_default_library",
- "@com_github_insomniacslk_dhcp//dhcpv4/nclient4:go_default_library",
+ "//core/internal/network/dhcp:go_default_library",
"@com_github_vishvananda_netlink//:go_default_library",
"@org_golang_x_sys//unix:go_default_library",
"@org_uber_go_zap//:go_default_library",
diff --git a/core/internal/network/dhcp/BUILD.bazel b/core/internal/network/dhcp/BUILD.bazel
new file mode 100644
index 0000000..40ac372
--- /dev/null
+++ b/core/internal/network/dhcp/BUILD.bazel
@@ -0,0 +1,15 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+ name = "go_default_library",
+ srcs = ["dhcp.go"],
+ importpath = "git.monogon.dev/source/nexantic.git/core/internal/network/dhcp",
+ visibility = ["//core:__subpackages__"],  # shared by core/internal/network and nanoswitch
+ deps = [
+ "//core/internal/common/supervisor:go_default_library",
+ "@com_github_insomniacslk_dhcp//dhcpv4:go_default_library",
+ "@com_github_insomniacslk_dhcp//dhcpv4/nclient4:go_default_library",
+ "@com_github_vishvananda_netlink//:go_default_library",
+ "@org_uber_go_zap//:go_default_library",
+ ],
+)
diff --git a/core/internal/network/dhcp.go b/core/internal/network/dhcp/dhcp.go
similarity index 70%
rename from core/internal/network/dhcp.go
rename to core/internal/network/dhcp/dhcp.go
index 983c25c..0eef2cc 100644
--- a/core/internal/network/dhcp.go
+++ b/core/internal/network/dhcp/dhcp.go
@@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package network
+package dhcp
import (
"context"
@@ -29,48 +29,48 @@
"go.uber.org/zap"
)
-type dhcpClient struct {
+type Client struct {
reqC chan *dhcpStatusReq
}
-func newDHCPClient() *dhcpClient {
- return &dhcpClient{
+func New() *Client {
+ return &Client{
reqC: make(chan *dhcpStatusReq),
}
}
type dhcpStatusReq struct {
- resC chan *dhcpStatus
+ resC chan *Status
wait bool
}
-func (r *dhcpStatusReq) fulfill(s *dhcpStatus) {
+func (r *dhcpStatusReq) fulfill(s *Status) {
go func() {
r.resC <- s
}()
}
-// dhcpStatus is the IPv4 configuration provisioned via DHCP for a given interface. It does not necessarily represent
+// Status is the IPv4 configuration provisioned via DHCP for a given interface. It does not necessarily represent
// a configuration that is active or even valid.
-type dhcpStatus struct {
- // address is 'our' (the node's) IPv4 address on the network.
- address net.IPNet
- // gateway is the default gateway/router of this network, or 0.0.0.0 if none was given.
- gateway net.IP
- // dns is a list of IPv4 DNS servers to use.
- dns []net.IP
+type Status struct {
+ // Address is 'our' (the node's) IPv4 address on the network.
+ Address net.IPNet
+ // Gateway is the default gateway/router of this network, or 0.0.0.0 if none was given.
+ Gateway net.IP
+ // DNS is a list of IPv4 DNS servers to use.
+ DNS []net.IP
}
-func (s *dhcpStatus) String() string {
- return fmt.Sprintf("Address: %s, Gateway: %s, DNS: %v", s.address.String(), s.gateway.String(), s.dns)
+func (s *Status) String() string {
+ return fmt.Sprintf("Address: %s, Gateway: %s, DNS: %v", s.Address.String(), s.Gateway.String(), s.DNS)
}
-func (c *dhcpClient) run(iface netlink.Link) supervisor.Runnable {
+func (c *Client) Run(iface netlink.Link) supervisor.Runnable {
return func(ctx context.Context) error {
logger := supervisor.Logger(ctx)
- // Channel updated with address once one gets assigned/updated
- newC := make(chan *dhcpStatus)
+ // Channel receiving a new Status once an address gets assigned/updated
+ newC := make(chan *Status)
// Status requests waiting for configuration
waiters := []*dhcpStatusReq{}
@@ -106,7 +106,7 @@
// We start at WAITING, once we get a current config we move to ASSIGNED
// Once this becomes more complex (ie. has to handle link state changes)
// this should grow into a real state machine.
- var current *dhcpStatus
+ var current *Status
logger.Info("DHCP client WAITING")
for {
select {
@@ -133,28 +133,28 @@
}
}
-// parseAck turns an internal status (from the dhcpv4 library) into a dhcpStatus
-func parseAck(ack *dhcpv4.DHCPv4) *dhcpStatus {
+// parseAck turns a DHCPv4 message (as parsed by the dhcpv4 library) into a Status
+func parseAck(ack *dhcpv4.DHCPv4) *Status {
address := net.IPNet{IP: ack.YourIPAddr, Mask: ack.SubnetMask()}
- // DHCP routers are optional - if none are provided, assume no router and set gateway to 0.0.0.0
- // (this makes gateway.IsUnspecified() == true)
+ // DHCP routers are optional - if none are provided, assume no router and set the gateway to 0.0.0.0
+ // (this makes Status.Gateway.IsUnspecified() == true)
gateway, _, _ := net.ParseCIDR("0.0.0.0/0")
if routers := ack.Router(); len(routers) > 0 {
gateway = routers[0]
}
- return &dhcpStatus{
- address: address,
- gateway: gateway,
- dns: ack.DNS(),
+ return &Status{
+ Address: address,
+ Gateway: gateway,
+ DNS: ack.DNS(),
}
}
-// status returns the DHCP configuration requested from us by the local DHCP server.
-// If wait is true, this function will block until a DHCP configuration is available. Otherwise, a nil status may be
+// Status returns the DHCP configuration requested from us by the local DHCP server.
+// If wait is true, this function will block until a DHCP configuration is available. Otherwise, a nil Status may be
// returned to indicate that no configuration has been received yet.
-func (c *dhcpClient) status(ctx context.Context, wait bool) (*dhcpStatus, error) {
- resC := make(chan *dhcpStatus)
+func (c *Client) Status(ctx context.Context, wait bool) (*Status, error) {
+ resC := make(chan *Status)
c.reqC <- &dhcpStatusReq{
resC: resC,
wait: wait,
diff --git a/core/internal/network/main.go b/core/internal/network/main.go
index 00d7fb2..2466e05 100644
--- a/core/internal/network/main.go
+++ b/core/internal/network/main.go
@@ -22,11 +22,12 @@
"net"
"os"
- "git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
-
"github.com/vishvananda/netlink"
"go.uber.org/zap"
"golang.org/x/sys/unix"
+
+ "git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
+ "git.monogon.dev/source/nexantic.git/core/internal/network/dhcp"
)
const (
@@ -36,7 +37,7 @@
type Service struct {
config Config
- dhcp *dhcpClient
+ dhcp *dhcp.Client // run on an interface by useInterface, queried by GetIP
logger *zap.Logger
}
@@ -47,7 +48,7 @@
func New(config Config) *Service {
return &Service{
config: config,
- dhcp: newDHCPClient(),
+ dhcp: dhcp.New(),
}
}
@@ -76,7 +77,7 @@
func (s *Service) addNetworkRoutes(link netlink.Link, addr net.IPNet, gw net.IP) error {
if err := netlink.AddrReplace(link, &netlink.Addr{IPNet: &addr}); err != nil {
- return err
+ return fmt.Errorf("failed to add DHCP address to network interface \"%v\": %w", link.Attrs().Name, err)
}
if gw.IsUnspecified() {
@@ -96,20 +97,20 @@
}
func (s *Service) useInterface(ctx context.Context, iface netlink.Link) error {
- err := supervisor.Run(ctx, "dhcp", s.dhcp.run(iface))
+ err := supervisor.Run(ctx, "dhcp", s.dhcp.Run(iface))
if err != nil {
return err
}
- status, err := s.dhcp.status(ctx, true)
+ status, err := s.dhcp.Status(ctx, true)
if err != nil {
return fmt.Errorf("could not get DHCP status: %w", err)
}
- if err := setResolvconf(status.dns, []string{}); err != nil {
+ if err := setResolvconf(status.DNS, []string{}); err != nil {
s.logger.Warn("failed to set resolvconf", zap.Error(err))
}
- if err := s.addNetworkRoutes(iface, net.IPNet{IP: status.address.IP, Mask: status.address.Mask}, status.gateway); err != nil {
+ if err := s.addNetworkRoutes(iface, status.Address, status.Gateway); err != nil {
s.logger.Warn("failed to add routes", zap.Error(err))
}
@@ -118,11 +119,14 @@
-// GetIP returns the current IP (and optionally waits for one to be assigned)
+// GetIP returns the current IP (and optionally waits for one to be assigned). If wait is false and no address has been assigned yet, it returns a nil IP and a nil error.
func (s *Service) GetIP(ctx context.Context, wait bool) (*net.IP, error) {
- status, err := s.dhcp.status(ctx, wait)
+ status, err := s.dhcp.Status(ctx, wait)
if err != nil {
return nil, err
}
- return &status.address.IP, nil
+ if status == nil {
+ return nil, nil
+ }
+ return &status.Address.IP, nil
}
func (s *Service) Run(ctx context.Context) error {
diff --git a/core/internal/node/main.go b/core/internal/node/main.go
index 2cf88f4..8c40e9f 100644
--- a/core/internal/node/main.go
+++ b/core/internal/node/main.go
@@ -25,7 +25,6 @@
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
- "encoding/hex"
"errors"
"flag"
"fmt"
@@ -215,6 +214,10 @@
return err
}
+ if err := s.initNodeAPI(); err != nil {
+ return err
+ }
+
// We only support TPM2 at the moment, any abstractions here would be premature
trustAgent := tpm2.TPM2Agent{}
@@ -281,7 +284,11 @@
s.Consensus.SetConfig(config)
// Generate the cluster CA and store it to local storage.
- if err := s.Consensus.PrecreateCA(); err != nil {
+ extIP, err := s.Network.GetIP(ctx, true)
+ if err != nil {
+ return err
+ }
+ if err := s.Consensus.PrecreateCA(*extIP); err != nil {
return err
}
@@ -370,7 +377,7 @@
return []byte{}, "", fmt.Errorf("failed to write node key: %w", err)
}
- name := "smalltown-" + hex.EncodeToString([]byte(pubKey[:16]))
+ name := common.NameFromIDKey(pubKey)
// This has no SANs because it authenticates by public key, not by name
nodeCert := &x509.Certificate{
@@ -439,7 +446,7 @@
secureTransport := &tls.Config{
Certificates: []tls.Certificate{nodeID},
- ClientAuth: tls.RequireAndVerifyClientCert,
+ ClientAuth: tls.RequestClientCert,
InsecureSkipVerify: true,
// Critical function, please review any changes with care
// TODO(lorenz): Actively check that this actually provides the security guarantees that we need
@@ -451,6 +458,7 @@
return nil
}
}
+ s.logger.Warn("Rejecting NodeService connection with no trusted client certificate")
return errors.New("failed to find authorized NMS certificate")
},
MinVersion: tls.VersionTLS13,
@@ -470,6 +478,7 @@
panic(err) // Can only happen during initialization and is always fatal
}
}()
+
return nil
}
diff --git a/core/internal/node/setup.go b/core/internal/node/setup.go
index cbbfd4d..a9e841c 100644
--- a/core/internal/node/setup.go
+++ b/core/internal/node/setup.go
@@ -18,10 +18,11 @@
import (
"context"
- "errors"
"fmt"
+ "os"
- "go.uber.org/zap"
+ "git.monogon.dev/source/nexantic.git/core/internal/storage"
+
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -61,28 +62,34 @@
return &api.NewNodeInfo{
EnrolmentConfig: s.enrolmentConfig,
- Ip: []byte(*nodeIP),
+ Ip: *nodeIP,
IdCert: nodeCert,
GlobalUnlockKey: globalUnlockKey,
}, nodeID, nil
}
-func (s *SmalltownNode) JoinCluster(context context.Context, req *api.JoinClusterRequest) (*api.JoinClusterResponse, error) {
+func (s *SmalltownNode) JoinCluster(ctx context.Context, req *api.JoinClusterRequest) (*api.JoinClusterResponse, error) {
if s.state != common.StateEnrollMode {
return nil, ErrNotInJoinMode
}
- s.logger.Info("Joining Consenus")
+ s.logger.Info("Joining Consensus")
+ dataPath, err := s.Storage.GetPathInPlace(storage.PlaceData, "etcd")
+ if err != nil {
+ return nil, status.Errorf(codes.Unavailable, "Data partition not available: %v", err)
+ }
+
+ if err := os.MkdirAll(dataPath, 0700); err != nil { // directories need the x bit; etcd expects a 0700 data dir
+ return nil, status.Errorf(codes.Internal, "Cannot create path on data partition: %v", err)
+ }
+
config := s.Consensus.GetConfig()
config.Name = s.hostname
- config.InitialCluster = "default" // Clusters can't cross-join anyways due to cryptography
+ config.InitialCluster = req.InitialCluster
+ config.DataDir = dataPath
s.Consensus.SetConfig(config)
- var err error
- if err != nil {
- s.logger.Warn("Invalid JoinCluster request", zap.Error(err))
- return nil, errors.New("invalid join request")
- }
+
if err := s.Consensus.WriteCertificateFiles(req.Certs); err != nil {
return nil, err
}
@@ -94,6 +101,8 @@
}
s.state = common.StateJoined
+ go s.Containerd.Run()(context.TODO()) // NOTE(review): deliberately not ctx, which gRPC cancels when this handler returns; any result of Run is discarded
+ s.Kubernetes.Start()
s.logger.Info("Joined cluster. Node is now syncing.")
diff --git a/core/internal/storage/blockdev.go b/core/internal/storage/blockdev.go
index da3dcfa..fc556e1 100644
--- a/core/internal/storage/blockdev.go
+++ b/core/internal/storage/blockdev.go
@@ -104,7 +104,7 @@
return err
}
defer integrityPartition.Close()
- zeroed512BBuf := make([]byte, 4096)
+ zeroed512BBuf := make([]byte, 4096*128) // 4096*128 B = 512 KiB of zeroes; the variable name does not reflect this size
if _, err := integrityPartition.Write(zeroed512BBuf); err != nil {
return fmt.Errorf("failed to wipe header: %w", err)
}