core/internal: move containerd and kubernetes to localstorage
This moves the last users of the old `storage` library onto `localstorage`. We move many runtime directories under a single `/ephemeral` root. This could have been called `/run`, but that might imply FHS compliance, which we neither have nor want.
We also slightly refactor the Kubernetes services so that they are easier to spawn. Otherwise, this is a pure refactor with no functional changes.
Test Plan: this is expected to fail on its own, as it is part of a larger stack. D590 is the first tip of the stack that should work.
X-Origin-Diff: phab/D589
GitOrigin-RevId: d2a7c0bb52c2a7c753199221c609e03474936c22
diff --git a/core/internal/kubernetes/clusternet/clusternet.go b/core/internal/kubernetes/clusternet/clusternet.go
index aa3e7ce..5c42bb8 100644
--- a/core/internal/kubernetes/clusternet/clusternet.go
+++ b/core/internal/kubernetes/clusternet/clusternet.go
@@ -30,10 +30,13 @@
"encoding/json"
"errors"
"fmt"
- "io/ioutil"
"net"
"os"
+ "k8s.io/client-go/informers"
+
+ "k8s.io/client-go/kubernetes"
+
"github.com/vishvananda/netlink"
"go.uber.org/zap"
"golang.zx2c4.com/wireguard/wgctrl"
@@ -41,33 +44,34 @@
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
- "k8s.io/client-go/informers"
- coreinformers "k8s.io/client-go/informers/core/v1"
- "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"git.monogon.dev/source/nexantic.git/core/internal/common"
"git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
+ "git.monogon.dev/source/nexantic.git/core/internal/localstorage"
"git.monogon.dev/source/nexantic.git/core/pkg/jsonpatch"
)
const (
clusterNetDeviceName = "clusternet"
publicKeyAnnotation = "node.smalltown.nexantic.com/wg-pubkey"
-
- privateKeyPath = "/data/kubernetes/clusternet.key"
)
-type clusternet struct {
- nodeName string
- wgClient *wgctrl.Client
- nodeInformer coreinformers.NodeInformer
- logger *zap.Logger
+type Service struct {
+ NodeName string
+ Kubernetes kubernetes.Interface
+ ClusterNet net.IPNet
+ InformerFactory informers.SharedInformerFactory
+ DataDirectory *localstorage.DataKubernetesClusterNetworkingDirectory
+
+ wgClient *wgctrl.Client
+ privKey wgtypes.Key
+ logger *zap.Logger
}
// ensureNode creates/updates the corresponding WireGuard peer entry for the given node objet
-func (c *clusternet) ensureNode(newNode *corev1.Node) error {
- if newNode.Name == c.nodeName {
+func (s *Service) ensureNode(newNode *corev1.Node) error {
+ if newNode.Name == s.NodeName {
// Node doesn't need to connect to itself
return nil
}
@@ -83,12 +87,12 @@
for _, addr := range newNode.Status.Addresses {
if addr.Type == corev1.NodeInternalIP {
if internalIP != nil {
- c.logger.Warn("More than one NodeInternalIP specified, using the first one")
+ s.logger.Warn("More than one NodeInternalIP specified, using the first one")
break
}
internalIP = net.ParseIP(addr.Address)
if internalIP == nil {
- c.logger.Warn("failed to parse Internal IP")
+ s.logger.Warn("failed to parse Internal IP")
}
}
}
@@ -99,16 +103,16 @@
for _, podNetStr := range newNode.Spec.PodCIDRs {
_, podNet, err := net.ParseCIDR(podNetStr)
if err != nil {
- c.logger.Warn("Node PodCIDR failed to parse, ignored", zap.Error(err), zap.String("node", newNode.Name))
+ s.logger.Warn("Node PodCIDR failed to parse, ignored", zap.Error(err), zap.String("node", newNode.Name))
continue
}
allowedIPs = append(allowedIPs, *podNet)
}
- c.logger.Debug("Adding/Updating WireGuard peer node", zap.String("node", newNode.Name),
+ s.logger.Debug("Adding/Updating WireGuard peer node", zap.String("node", newNode.Name),
zap.String("endpointIP", internalIP.String()), zap.Any("allowedIPs", allowedIPs))
// WireGuard's kernel side has create/update semantics on peers by default. So we can just add the peer multiple
// times to update it.
- err = c.wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
+ err = s.wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
Peers: []wgtypes.PeerConfig{{
PublicKey: pubKey,
Endpoint: &net.UDPAddr{Port: common.WireGuardPort, IP: internalIP},
@@ -123,8 +127,8 @@
}
// removeNode removes the corresponding WireGuard peer entry for the given node object
-func (c *clusternet) removeNode(oldNode *corev1.Node) error {
- if oldNode.Name == c.nodeName {
+func (s *Service) removeNode(oldNode *corev1.Node) error {
+ if oldNode.Name == s.NodeName {
// Node doesn't need to connect to itself
return nil
}
@@ -136,7 +140,7 @@
if err != nil {
return fmt.Errorf("node public-key annotation not decodable: %w", err)
}
- err = c.wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
+ err = s.wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
Peers: []wgtypes.PeerConfig{{
PublicKey: pubKey,
Remove: true,
@@ -148,119 +152,121 @@
return nil
}
-// EnsureOnDiskKey loads the private key from disk or (if none exists) generates one and persists it.
-func EnsureOnDiskKey() (*wgtypes.Key, error) {
- privKeyRaw, err := ioutil.ReadFile(privateKeyPath)
+// ensureOnDiskKey loads the private key from disk or (if none exists) generates one and persists it.
+func (s *Service) ensureOnDiskKey() error {
+ keyRaw, err := s.DataDirectory.Key.Read()
if os.IsNotExist(err) {
- privKey, err := wgtypes.GeneratePrivateKey()
+ key, err := wgtypes.GeneratePrivateKey()
if err != nil {
- return nil, fmt.Errorf("failed to generate private key: %w", err)
+ return fmt.Errorf("failed to generate private key: %w", err)
}
- if err := ioutil.WriteFile(privateKeyPath, []byte(privKey.String()), 0600); err != nil {
- return nil, fmt.Errorf("failed to store newly generated key: %w", err)
+ if err := s.DataDirectory.Key.Write([]byte(key.String()), 0600); err != nil {
+ return fmt.Errorf("failed to store newly generated key: %w", err)
}
- return &privKey, nil
+
+ s.privKey = key
+ return nil
} else if err != nil {
- return nil, fmt.Errorf("failed to load on-disk key: %w", err)
+ return fmt.Errorf("failed to load on-disk key: %w", err)
}
- privKey, err := wgtypes.ParseKey(string(privKeyRaw))
+
+ key, err := wgtypes.ParseKey(string(keyRaw))
if err != nil {
- return nil, fmt.Errorf("invalid private key in file: %w", err)
+ return fmt.Errorf("invalid private key in file: %w", err)
}
- return &privKey, nil
+ s.privKey = key
+ return nil
+}
+
+// annotateThisNode annotates the node (as defined by NodeName) with the wireguard public key of this node.
+func (s *Service) annotateThisNode(ctx context.Context) error {
+ patch := []jsonpatch.JsonPatchOp{{
+ Operation: "add",
+ Path: "/metadata/annotations/" + jsonpatch.EncodeJSONRefToken(publicKeyAnnotation),
+ Value: s.privKey.PublicKey().String(),
+ }}
+
+ patchRaw, err := json.Marshal(patch)
+ if err != nil {
+ return fmt.Errorf("failed to encode JSONPatch: %w", err)
+ }
+
+ if _, err := s.Kubernetes.CoreV1().Nodes().Patch(ctx, s.NodeName, types.JSONPatchType, patchRaw, metav1.PatchOptions{}); err != nil {
+ return fmt.Errorf("failed to patch resource: %w", err)
+ }
+
+ return nil
}
// Run runs the ClusterNet service. See package description for what it does.
-func Run(informerFactory informers.SharedInformerFactory, clusterNet net.IPNet, clientSet kubernetes.Interface, key *wgtypes.Key) supervisor.Runnable {
- return func(ctx context.Context) error {
- logger := supervisor.Logger(ctx)
- nodeName, err := os.Hostname()
- if err != nil {
- return fmt.Errorf("failed to determine hostname: %w", err)
- }
- wgClient, err := wgctrl.New()
- if err != nil {
- return fmt.Errorf("failed to connect to netlink's WireGuard config endpoint: %w", err)
- }
+func (s *Service) Run(ctx context.Context) error {
+ logger := supervisor.Logger(ctx)
- nodeAnnotationPatch := []jsonpatch.JsonPatchOp{{
- Operation: "add",
- Path: "/metadata/annotations/" + jsonpatch.EncodeJSONRefToken(publicKeyAnnotation),
- Value: key.PublicKey().String(),
- }}
-
- nodeAnnotationPatchRaw, err := json.Marshal(nodeAnnotationPatch)
- if err != nil {
- return fmt.Errorf("failed to encode JSONPatch: %w", err)
- }
-
- if _, err := clientSet.CoreV1().Nodes().Patch(ctx, nodeName, types.JSONPatchType, nodeAnnotationPatchRaw, metav1.PatchOptions{}); err != nil {
- return fmt.Errorf("failed to set ClusterNet public key for node: %w", err)
- }
-
- nodeInformer := informerFactory.Core().V1().Nodes()
- wgInterface := &Wireguard{LinkAttrs: netlink.LinkAttrs{Name: clusterNetDeviceName, Flags: net.FlagUp}}
- if err := netlink.LinkAdd(wgInterface); err != nil {
- return fmt.Errorf("failed to add WireGuard network interfacee: %w", err)
- }
- defer netlink.LinkDel(wgInterface)
-
- listenPort := common.WireGuardPort
- if err := wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
- PrivateKey: key,
- ListenPort: &listenPort,
- }); err != nil {
- return fmt.Errorf("failed to set up WireGuard interface: %w", err)
- }
-
- if err := netlink.RouteAdd(&netlink.Route{
- Dst: &clusterNet,
- LinkIndex: wgInterface.Index,
- }); err != nil && !os.IsExist(err) {
- return fmt.Errorf("failed to add cluster net route to Wireguard interface: %w", err)
- }
-
- cnet := clusternet{
- wgClient: wgClient,
- nodeInformer: nodeInformer,
- logger: logger,
- nodeName: nodeName,
- }
-
- nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
- AddFunc: func(new interface{}) {
- newNode, ok := new.(*corev1.Node)
- if !ok {
- logger.Error("Received non-node item in node event handler", zap.Reflect("item", new))
- return
- }
- if err := cnet.ensureNode(newNode); err != nil {
- logger.Warn("Failed to sync node", zap.Error(err))
- }
- },
- UpdateFunc: func(old, new interface{}) {
- newNode, ok := new.(*corev1.Node)
- if !ok {
- logger.Error("Received non-node item in node event handler", zap.Reflect("item", new))
- return
- }
- if err := cnet.ensureNode(newNode); err != nil {
- logger.Warn("Failed to sync node", zap.Error(err))
- }
- },
- DeleteFunc: func(old interface{}) {
- oldNode, ok := old.(*corev1.Node)
- if !ok {
- logger.Error("Received non-node item in node event handler", zap.Reflect("item", oldNode))
- return
- }
- if err := cnet.removeNode(oldNode); err != nil {
- logger.Warn("Failed to sync node", zap.Error(err))
- }
- },
- })
- supervisor.Signal(ctx, supervisor.SignalHealthy)
- nodeInformer.Informer().Run(ctx.Done())
- return ctx.Err()
+ wgClient, err := wgctrl.New()
+ if err != nil {
+ return fmt.Errorf("failed to connect to netlink's WireGuard config endpoint: %w", err)
}
+
+ wgInterface := &Wireguard{LinkAttrs: netlink.LinkAttrs{Name: clusterNetDeviceName, Flags: net.FlagUp}}
+ if err := netlink.LinkAdd(wgInterface); err != nil {
+ return fmt.Errorf("failed to add WireGuard network interfacee: %w", err)
+ }
+ defer netlink.LinkDel(wgInterface)
+
+ listenPort := common.WireGuardPort
+ if err := wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
+ PrivateKey: &s.privKey,
+ ListenPort: &listenPort,
+ }); err != nil {
+ return fmt.Errorf("failed to set up WireGuard interface: %w", err)
+ }
+
+ if err := netlink.RouteAdd(&netlink.Route{
+ Dst: &s.ClusterNet,
+ LinkIndex: wgInterface.Index,
+ }); err != nil && !os.IsExist(err) {
+ return fmt.Errorf("failed to add cluster net route to Wireguard interface: %w", err)
+ }
+
+ if err := s.annotateThisNode(ctx); err != nil {
+ return fmt.Errorf("when annotating this node with public key: %w", err)
+ }
+
+ nodeInformer := s.InformerFactory.Core().V1().Nodes()
+ nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+ AddFunc: func(new interface{}) {
+ newNode, ok := new.(*corev1.Node)
+ if !ok {
+ logger.Error("Received non-node item in node event handler", zap.Reflect("item", new))
+ return
+ }
+ if err := s.ensureNode(newNode); err != nil {
+ logger.Warn("Failed to sync node", zap.Error(err))
+ }
+ },
+ UpdateFunc: func(old, new interface{}) {
+ newNode, ok := new.(*corev1.Node)
+ if !ok {
+ logger.Error("Received non-node item in node event handler", zap.Reflect("item", new))
+ return
+ }
+ if err := s.ensureNode(newNode); err != nil {
+ logger.Warn("Failed to sync node", zap.Error(err))
+ }
+ },
+ DeleteFunc: func(old interface{}) {
+ oldNode, ok := old.(*corev1.Node)
+ if !ok {
+ logger.Error("Received non-node item in node event handler", zap.Reflect("item", oldNode))
+ return
+ }
+ if err := s.removeNode(oldNode); err != nil {
+ logger.Warn("Failed to sync node", zap.Error(err))
+ }
+ },
+ })
+
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ nodeInformer.Informer().Run(ctx.Done())
+ return ctx.Err()
}