core -> metropolis
Smalltown is now called Metropolis!
This is the first commit in a series of cleanup commits that prepare us
for an open source release. This one just moves some Bazel packages around to
follow a stricter directory layout.
All of Metropolis now lives in `//metropolis`.
All of Metropolis Node code now lives in `//metropolis/node`.
All of the main /init now lives in `//m/n/core`.
All of the Kubernetes functionality/glue now lives in `//m/n/kubernetes`.
Next steps:
- hunt down all references to Smalltown and replace them appropriately
- narrow down visibility rules
- document new code organization
- move `//build/toolchain` to `//monogon/build/toolchain`
- do another cleanup pass between `//golibs` and
`//monogon/node/{core,common}`.
- remove `//delta` and `//anubis`
Fixes T799.
Test Plan: Just a very large refactor. CI should help us out here.
Bug: T799
X-Origin-Diff: phab/D667
GitOrigin-RevId: 6029b8d4edc42325d50042596b639e8b122d0ded
diff --git a/metropolis/node/kubernetes/BUILD.bazel b/metropolis/node/kubernetes/BUILD.bazel
new file mode 100644
index 0000000..f1fa849
--- /dev/null
+++ b/metropolis/node/kubernetes/BUILD.bazel
@@ -0,0 +1,54 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+ name = "go_default_library",
+ srcs = [
+ "apiserver.go",
+ "controller-manager.go",
+ "csi.go",
+ "kubelet.go",
+ "provisioner.go",
+ "scheduler.go",
+ "service.go",
+ ],
+ importpath = "git.monogon.dev/source/nexantic.git/metropolis/node/kubernetes",
+ visibility = ["//metropolis/node:__subpackages__"],
+ deps = [
+ "//metropolis/node:go_default_library",
+ "//metropolis/node/common/fileargs:go_default_library",
+ "//metropolis/node/common/fsquota:go_default_library",
+ "//metropolis/node/common/supervisor:go_default_library",
+ "//metropolis/node/core/localstorage:go_default_library",
+ "//metropolis/node/core/localstorage/declarative:go_default_library",
+ "//metropolis/node/core/logtree:go_default_library",
+ "//metropolis/node/core/network/dns:go_default_library",
+ "//metropolis/node/kubernetes/clusternet:go_default_library",
+ "//metropolis/node/kubernetes/nfproxy:go_default_library",
+ "//metropolis/node/kubernetes/pki:go_default_library",
+ "//metropolis/node/kubernetes/reconciler:go_default_library",
+ "//metropolis/proto/api:go_default_library",
+ "@com_github_container_storage_interface_spec//lib/go/csi:go_default_library",
+ "@io_bazel_rules_go//proto/wkt:wrappers_go_proto",
+ "@io_k8s_api//core/v1:go_default_library",
+ "@io_k8s_api//storage/v1:go_default_library",
+ "@io_k8s_apimachinery//pkg/api/errors:go_default_library",
+ "@io_k8s_apimachinery//pkg/apis/meta/v1:go_default_library",
+ "@io_k8s_client_go//informers:go_default_library",
+ "@io_k8s_client_go//informers/core/v1:go_default_library",
+ "@io_k8s_client_go//informers/storage/v1:go_default_library",
+ "@io_k8s_client_go//kubernetes:go_default_library",
+ "@io_k8s_client_go//kubernetes/scheme:go_default_library",
+ "@io_k8s_client_go//kubernetes/typed/core/v1:go_default_library",
+ "@io_k8s_client_go//tools/cache:go_default_library",
+ "@io_k8s_client_go//tools/clientcmd:go_default_library",
+ "@io_k8s_client_go//tools/record:go_default_library",
+ "@io_k8s_client_go//tools/reference:go_default_library",
+ "@io_k8s_client_go//util/workqueue:go_default_library",
+ "@io_k8s_kubelet//config/v1beta1:go_default_library",
+ "@io_k8s_kubelet//pkg/apis/pluginregistration/v1:go_default_library",
+ "@org_golang_google_grpc//:go_default_library",
+ "@org_golang_google_grpc//codes:go_default_library",
+ "@org_golang_google_grpc//status:go_default_library",
+ "@org_golang_x_sys//unix:go_default_library",
+ ],
+)
diff --git a/metropolis/node/kubernetes/apiserver.go b/metropolis/node/kubernetes/apiserver.go
new file mode 100644
index 0000000..583e268
--- /dev/null
+++ b/metropolis/node/kubernetes/apiserver.go
@@ -0,0 +1,137 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kubernetes
+
+import (
+ "context"
+ "encoding/pem"
+ "fmt"
+ "io"
+ "net"
+ "os/exec"
+
+ common "git.monogon.dev/source/nexantic.git/metropolis/node"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/common/fileargs"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/common/supervisor"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/core/localstorage"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/kubernetes/pki"
+)
+
+type apiserverService struct {
+ KPKI *pki.KubernetesPKI // source of the certificates and keys fetched by loadPKI
+ AdvertiseAddress net.IP // passed to kube-apiserver as --advertise-address
+ ServiceIPRange net.IPNet // passed to kube-apiserver as --service-cluster-ip-range
+ Output io.Writer // receives kube-apiserver's stdout and stderr, plus the final "apiserver stopped" line
+ EphemeralConsensusDirectory *localstorage.EphemeralConsensusDirectory // its ClientSocket path is used for --etcd-servers
+
+ // All PKI-related things are in DER; they are populated by loadPKI and PEM-encoded in Run.
+ idCA []byte
+ kubeletClientCert []byte
+ kubeletClientKey []byte
+ aggregationCA []byte
+ aggregationClientCert []byte
+ aggregationClientKey []byte
+ serviceAccountPrivKey []byte // In PKIX form. NOTE(review): Run PEM-encodes this as "PRIVATE KEY" - confirm the actual encoding
+ serverCert []byte
+ serverKey []byte
+}
+
+func (s *apiserverService) loadPKI(ctx context.Context) error {
+ for _, el := range []struct {
+ targetCert *[]byte
+ targetKey *[]byte
+ name pki.KubeCertificateName
+ }{
+ {&s.idCA, nil, pki.IdCA},
+ {&s.kubeletClientCert, &s.kubeletClientKey, pki.KubeletClient},
+ {&s.aggregationCA, nil, pki.AggregationCA},
+ {&s.aggregationClientCert, &s.aggregationClientKey, pki.FrontProxyClient},
+ {&s.serverCert, &s.serverKey, pki.APIServer},
+ } {
+ cert, key, err := s.KPKI.Certificate(ctx, el.name)
+ if err != nil {
+ return fmt.Errorf("could not load certificate %q from PKI: %w", el.name, err)
+ }
+ if el.targetCert != nil {
+ *el.targetCert = cert
+ }
+ if el.targetKey != nil {
+ *el.targetKey = key
+ }
+ }
+
+ var err error
+ s.serviceAccountPrivKey, err = s.KPKI.ServiceAccountKey(ctx)
+ if err != nil {
+ return fmt.Errorf("could not load serviceaccount privkey: %w", err)
+ }
+ return nil
+}
+
+// Run loads the PKI material, starts kube-apiserver and blocks until it exits, returning the process error.
+func (s *apiserverService) Run(ctx context.Context) error {
+	if err := s.loadPKI(ctx); err != nil {
+		return fmt.Errorf("loading PKI data failed: %w", err)
+	}
+	args, err := fileargs.New()
+	if err != nil {
+		panic(err) // If this fails, something is very wrong. Just crash.
+	}
+	defer args.Close()
+
+	cmd := exec.CommandContext(ctx, "/kubernetes/bin/kube", "kube-apiserver",
+		fmt.Sprintf("--advertise-address=%v", s.AdvertiseAddress.String()),
+		"--authorization-mode=Node,RBAC",
+		args.FileOpt("--client-ca-file", "client-ca.pem",
+			pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: s.idCA})),
+		"--enable-admission-plugins=NodeRestriction,PodSecurityPolicy",
+		"--enable-aggregator-routing=true",
+		"--insecure-port=0",
+		fmt.Sprintf("--secure-port=%v", common.KubernetesAPIPort),
+		fmt.Sprintf("--etcd-servers=unix:///%s:0", s.EphemeralConsensusDirectory.ClientSocket.FullPath()),
+		args.FileOpt("--kubelet-client-certificate", "kubelet-client-cert.pem",
+			pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: s.kubeletClientCert})),
+		args.FileOpt("--kubelet-client-key", "kubelet-client-key.pem",
+			pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: s.kubeletClientKey})),
+		"--kubelet-preferred-address-types=InternalIP",
+		args.FileOpt("--proxy-client-cert-file", "aggregation-client-cert.pem",
+			pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: s.aggregationClientCert})),
+		args.FileOpt("--proxy-client-key-file", "aggregation-client-key.pem",
+			pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: s.aggregationClientKey})),
+		"--requestheader-allowed-names=front-proxy-client",
+		args.FileOpt("--requestheader-client-ca-file", "aggregation-ca.pem",
+			pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: s.aggregationCA})),
+		"--requestheader-extra-headers-prefix=X-Remote-Extra-",
+		"--requestheader-group-headers=X-Remote-Group",
+		"--requestheader-username-headers=X-Remote-User",
+		args.FileOpt("--service-account-key-file", "service-account-pubkey.pem",
+			pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: s.serviceAccountPrivKey})),
+		fmt.Sprintf("--service-cluster-ip-range=%v", s.ServiceIPRange.String()),
+		args.FileOpt("--tls-cert-file", "server-cert.pem",
+			pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: s.serverCert})),
+		args.FileOpt("--tls-private-key-file", "server-key.pem",
+			pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: s.serverKey})),
+	)
+	if err := args.Error(); err != nil { // the outer err is always nil here, so it must not be returned
+		return fmt.Errorf("could not prepare kube-apiserver file arguments: %w", err)
+	}
+	cmd.Stdout, cmd.Stderr = s.Output, s.Output
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+	err = cmd.Run()
+	fmt.Fprintf(s.Output, "apiserver stopped: %v\n", err)
+	return err
+}
diff --git a/metropolis/node/kubernetes/clusternet/BUILD.bazel b/metropolis/node/kubernetes/clusternet/BUILD.bazel
new file mode 100644
index 0000000..9e9cc01
--- /dev/null
+++ b/metropolis/node/kubernetes/clusternet/BUILD.bazel
@@ -0,0 +1,27 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+ name = "go_default_library",
+ srcs = [
+ "clusternet.go",
+ "netlink_compat.go",
+ ],
+ importpath = "git.monogon.dev/source/nexantic.git/metropolis/node/kubernetes/clusternet",
+ visibility = ["//metropolis/node/kubernetes:__subpackages__"],
+ deps = [
+ "//metropolis/node:go_default_library",
+ "//metropolis/node/common/jsonpatch:go_default_library",
+ "//metropolis/node/common/supervisor:go_default_library",
+ "//metropolis/node/core/localstorage:go_default_library",
+ "//metropolis/node/core/logtree:go_default_library",
+ "@com_github_vishvananda_netlink//:go_default_library",
+ "@com_zx2c4_golang_wireguard_wgctrl//:go_default_library",
+ "@com_zx2c4_golang_wireguard_wgctrl//wgtypes:go_default_library",
+ "@io_k8s_api//core/v1:go_default_library",
+ "@io_k8s_apimachinery//pkg/apis/meta/v1:go_default_library",
+ "@io_k8s_apimachinery//pkg/types:go_default_library",
+ "@io_k8s_client_go//informers:go_default_library",
+ "@io_k8s_client_go//kubernetes:go_default_library",
+ "@io_k8s_client_go//tools/cache:go_default_library",
+ ],
+)
diff --git a/metropolis/node/kubernetes/clusternet/clusternet.go b/metropolis/node/kubernetes/clusternet/clusternet.go
new file mode 100644
index 0000000..d8dc7ad
--- /dev/null
+++ b/metropolis/node/kubernetes/clusternet/clusternet.go
@@ -0,0 +1,276 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package clusternet implements a WireGuard-based overlay network for Kubernetes. It relies on controller-manager's
+// IPAM to assign IP ranges to nodes and on Kubernetes' Node objects to distribute the Node IPs and public keys.
+//
+// It sets up a single WireGuard network interface and routes the entire ClusterCIDR into that network interface,
+// relying on WireGuard's AllowedIPs mechanism to look up the correct peer node to send the traffic to. This means
+// that the routing table doesn't change and doesn't have to be separately managed. When clusternet is started
+// it annotates its WireGuard public key onto its node object.
+// For each node object that's created or updated on the K8s apiserver it checks if a public key annotation is set and
+// if yes a peer with that public key, its InternalIP as endpoint and the CIDR for that node as AllowedIPs is created.
+package clusternet
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "net"
+ "os"
+
+ "github.com/vishvananda/netlink"
+ "golang.zx2c4.com/wireguard/wgctrl"
+ "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/client-go/informers"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/tools/cache"
+
+ common "git.monogon.dev/source/nexantic.git/metropolis/node"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/common/jsonpatch"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/common/supervisor"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/core/localstorage"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/core/logtree"
+)
+
+const (
+ clusterNetDeviceName = "clusternet"
+ publicKeyAnnotation = "node.smalltown.nexantic.com/wg-pubkey"
+)
+
+type Service struct {
+ NodeName string
+ Kubernetes kubernetes.Interface
+ ClusterNet net.IPNet
+ InformerFactory informers.SharedInformerFactory
+ DataDirectory *localstorage.DataKubernetesClusterNetworkingDirectory
+
+ wgClient *wgctrl.Client
+ privKey wgtypes.Key
+ logger logtree.LeveledLogger
+}
+
+// ensureNode creates/updates the corresponding WireGuard peer entry for the given node object
+func (s *Service) ensureNode(newNode *corev1.Node) error {
+ if newNode.Name == s.NodeName {
+ // Node doesn't need to connect to itself
+ return nil
+ }
+ pubKeyRaw := newNode.Annotations[publicKeyAnnotation]
+ if pubKeyRaw == "" {
+ return nil
+ }
+ pubKey, err := wgtypes.ParseKey(pubKeyRaw)
+ if err != nil {
+ return fmt.Errorf("failed to parse public-key annotation: %w", err)
+ }
+ var internalIP net.IP
+ for _, addr := range newNode.Status.Addresses {
+ if addr.Type == corev1.NodeInternalIP {
+ if internalIP != nil {
+ s.logger.Warningf("More than one NodeInternalIP specified, using the first one")
+ break
+ }
+ internalIP = net.ParseIP(addr.Address)
+ if internalIP == nil {
+ s.logger.Warningf("Failed to parse Internal IP %s", addr.Address)
+ }
+ }
+ }
+ if internalIP == nil {
+ return errors.New("node has no Internal IP")
+ }
+ var allowedIPs []net.IPNet
+ for _, podNetStr := range newNode.Spec.PodCIDRs {
+ _, podNet, err := net.ParseCIDR(podNetStr)
+ if err != nil {
+ s.logger.Warningf("Node %s PodCIDR failed to parse, ignored: %v", newNode.Name, err)
+ continue
+ }
+ allowedIPs = append(allowedIPs, *podNet)
+ }
+ allowedIPs = append(allowedIPs, net.IPNet{IP: internalIP, Mask: net.CIDRMask(32, 32)})
+ s.logger.V(1).Infof("Adding/Updating WireGuard peer node %s, endpoint %s, allowedIPs %+v", newNode.Name, internalIP.String(), allowedIPs)
+ // WireGuard's kernel side has create/update semantics on peers by default. So we can just add the peer multiple
+ // times to update it.
+ err = s.wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
+ Peers: []wgtypes.PeerConfig{{
+ PublicKey: pubKey,
+ Endpoint: &net.UDPAddr{Port: common.WireGuardPort, IP: internalIP},
+ ReplaceAllowedIPs: true,
+ AllowedIPs: allowedIPs,
+ }},
+ })
+ if err != nil {
+ return fmt.Errorf("failed to add WireGuard peer node: %w", err)
+ }
+ return nil
+}
+
+// removeNode removes the corresponding WireGuard peer entry for the given node object
+func (s *Service) removeNode(oldNode *corev1.Node) error {
+ if oldNode.Name == s.NodeName {
+ // Node doesn't need to connect to itself
+ return nil
+ }
+ pubKeyRaw := oldNode.Annotations[publicKeyAnnotation]
+ if pubKeyRaw == "" {
+ return nil
+ }
+ pubKey, err := wgtypes.ParseKey(pubKeyRaw)
+ if err != nil {
+ return fmt.Errorf("node public-key annotation not decodable: %w", err)
+ }
+ err = s.wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
+ Peers: []wgtypes.PeerConfig{{
+ PublicKey: pubKey,
+ Remove: true,
+ }},
+ })
+ if err != nil {
+ return fmt.Errorf("failed to remove WireGuard peer node: %w", err)
+ }
+ return nil
+}
+
+// ensureOnDiskKey loads the private key from disk or (if none exists) generates one and persists it.
+func (s *Service) ensureOnDiskKey() error {
+ keyRaw, err := s.DataDirectory.Key.Read()
+ if os.IsNotExist(err) {
+ key, err := wgtypes.GeneratePrivateKey()
+ if err != nil {
+ return fmt.Errorf("failed to generate private key: %w", err)
+ }
+ if err := s.DataDirectory.Key.Write([]byte(key.String()), 0600); err != nil {
+ return fmt.Errorf("failed to store newly generated key: %w", err)
+ }
+
+ s.privKey = key
+ return nil
+ } else if err != nil {
+ return fmt.Errorf("failed to load on-disk key: %w", err)
+ }
+
+ key, err := wgtypes.ParseKey(string(keyRaw))
+ if err != nil {
+ return fmt.Errorf("invalid private key in file: %w", err)
+ }
+ s.privKey = key
+ return nil
+}
+
+// annotateThisNode annotates the node (as defined by NodeName) with the wireguard public key of this node.
+func (s *Service) annotateThisNode(ctx context.Context) error {
+ patch := []jsonpatch.JsonPatchOp{{
+ Operation: "add",
+ Path: "/metadata/annotations/" + jsonpatch.EncodeJSONRefToken(publicKeyAnnotation),
+ Value: s.privKey.PublicKey().String(),
+ }}
+
+ patchRaw, err := json.Marshal(patch)
+ if err != nil {
+ return fmt.Errorf("failed to encode JSONPatch: %w", err)
+ }
+
+ if _, err := s.Kubernetes.CoreV1().Nodes().Patch(ctx, s.NodeName, types.JSONPatchType, patchRaw, metav1.PatchOptions{}); err != nil {
+ return fmt.Errorf("failed to patch resource: %w", err)
+ }
+
+ return nil
+}
+
+// Run sets up the WireGuard device and cluster route, publishes this node's key and syncs peers until ctx is done.
+func (s *Service) Run(ctx context.Context) error {
+	logger := supervisor.Logger(ctx)
+	s.logger = logger
+
+	wgClient, err := wgctrl.New()
+	if err != nil {
+		return fmt.Errorf("failed to connect to netlink's WireGuard config endpoint: %w", err)
+	}
+	s.wgClient = wgClient
+
+	if err := s.ensureOnDiskKey(); err != nil {
+		return fmt.Errorf("failed to ensure on-disk key: %w", err)
+	}
+
+	wgInterface := &Wireguard{LinkAttrs: netlink.LinkAttrs{Name: clusterNetDeviceName, Flags: net.FlagUp}}
+	if err := netlink.LinkAdd(wgInterface); err != nil {
+		return fmt.Errorf("failed to add WireGuard network interfacee: %w", err)
+	}
+	defer netlink.LinkDel(wgInterface)
+
+	listenPort := common.WireGuardPort
+	if err := wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
+		PrivateKey: &s.privKey,
+		ListenPort: &listenPort,
+	}); err != nil {
+		return fmt.Errorf("failed to set up WireGuard interface: %w", err)
+	}
+
+	if err := netlink.RouteAdd(&netlink.Route{
+		Dst:       &s.ClusterNet,
+		LinkIndex: wgInterface.Index,
+	}); err != nil && !os.IsExist(err) {
+		return fmt.Errorf("failed to add cluster net route to Wireguard interface: %w", err)
+	}
+
+	if err := s.annotateThisNode(ctx); err != nil {
+		return fmt.Errorf("when annotating this node with public key: %w", err)
+	}
+
+	nodeInformer := s.InformerFactory.Core().V1().Nodes()
+	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: func(new interface{}) {
+			newNode, ok := new.(*corev1.Node)
+			if !ok {
+				logger.Errorf("Received non-node item %+v in node event handler", new)
+				return
+			}
+			if err := s.ensureNode(newNode); err != nil {
+				logger.Warningf("Failed to sync node: %v", err)
+			}
+		},
+		UpdateFunc: func(old, new interface{}) {
+			newNode, ok := new.(*corev1.Node)
+			if !ok {
+				logger.Errorf("Received non-node item %+v in node event handler", new)
+				return
+			}
+			if err := s.ensureNode(newNode); err != nil {
+				logger.Warningf("Failed to sync node: %v", err)
+			}
+		},
+		DeleteFunc: func(old interface{}) {
+			oldNode, ok := old.(*corev1.Node)
+			if !ok {
+				logger.Errorf("Received non-node item %+v in node event handler", old) // oldNode is nil on this path; log the raw object
+				return
+			}
+			if err := s.removeNode(oldNode); err != nil {
+				logger.Warningf("Failed to sync node: %v", err)
+			}
+		},
+	})
+
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+	nodeInformer.Informer().Run(ctx.Done())
+	return ctx.Err()
+}
diff --git a/metropolis/node/kubernetes/clusternet/netlink_compat.go b/metropolis/node/kubernetes/clusternet/netlink_compat.go
new file mode 100644
index 0000000..a90cc47
--- /dev/null
+++ b/metropolis/node/kubernetes/clusternet/netlink_compat.go
@@ -0,0 +1,33 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Can be removed once https://github.com/vishvananda/netlink/pull/464 lands
+package clusternet
+
+import "github.com/vishvananda/netlink"
+
+// Wireguard represents links of type "wireguard", see https://www.wireguard.com/
+type Wireguard struct {
+ netlink.LinkAttrs
+}
+
+func (wg *Wireguard) Attrs() *netlink.LinkAttrs {
+ return &wg.LinkAttrs
+}
+
+func (wg *Wireguard) Type() string {
+ return "wireguard"
+}
diff --git a/metropolis/node/kubernetes/containerd/BUILD.bazel b/metropolis/node/kubernetes/containerd/BUILD.bazel
new file mode 100644
index 0000000..9e42595
--- /dev/null
+++ b/metropolis/node/kubernetes/containerd/BUILD.bazel
@@ -0,0 +1,20 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+ name = "go_default_library",
+ srcs = ["main.go"],
+ importpath = "git.monogon.dev/source/nexantic.git/metropolis/node/kubernetes/containerd",
+ visibility = ["//metropolis/node/core:__subpackages__"],
+ deps = [
+ "//metropolis/node/common/supervisor:go_default_library",
+ "//metropolis/node/core/localstorage:go_default_library",
+ "@com_github_containerd_containerd//:go_default_library",
+ "@com_github_containerd_containerd//namespaces:go_default_library",
+ ],
+)
+
+exports_files([
+ "config.toml",
+ "runsc.toml",
+ "cnispec.gojson",
+])
diff --git a/metropolis/node/kubernetes/containerd/cnispec.gojson b/metropolis/node/kubernetes/containerd/cnispec.gojson
new file mode 100644
index 0000000..0057036
--- /dev/null
+++ b/metropolis/node/kubernetes/containerd/cnispec.gojson
@@ -0,0 +1,29 @@
+{{- /*gotype: github.com/containerd/cri/pkg/server.cniConfigTemplate*/ -}}
+{
+ "name": "k8s-pod-network",
+ "cniVersion": "0.3.1",
+ "plugins": [
+ {
+ "type": "ptp",
+ "mtu": 1420,
+ "ipam": {
+ "type": "host-local",
+ "dataDir": "/containerd/run/ipam",
+ "ranges": [
+ {{range $i, $range := .PodCIDRRanges}}{{if $i}},
+ {{end}}[
+ {
+ "subnet": "{{$range}}"
+ }
+ ]
+ {{end}}
+ ],
+ "routes": [
+ {{range $i, $route := .Routes}}{{if $i}},
+ {{end}}{
+ "dst": "{{$route}}"
+}{{end}}]
+}
+}
+]
+}
\ No newline at end of file
diff --git a/metropolis/node/kubernetes/containerd/config.toml b/metropolis/node/kubernetes/containerd/config.toml
new file mode 100644
index 0000000..f8c7fb1
--- /dev/null
+++ b/metropolis/node/kubernetes/containerd/config.toml
@@ -0,0 +1,125 @@
+version = 2
+root = "/data/containerd"
+state = "/ephemeral/containerd"
+plugin_dir = ""
+disabled_plugins = []
+required_plugins = []
+oom_score = 0
+
+[grpc]
+ address = "/ephemeral/containerd/client.sock"
+ tcp_address = ""
+ tcp_tls_cert = ""
+ tcp_tls_key = ""
+ uid = 0
+ gid = 0
+ max_recv_message_size = 16777216
+ max_send_message_size = 16777216
+
+[ttrpc]
+ address = ""
+ uid = 0
+ gid = 0
+
+[debug]
+ address = ""
+ uid = 0
+ gid = 0
+ level = ""
+
+[metrics]
+ address = ""
+ grpc_histogram = false
+
+[cgroup]
+ path = ""
+
+[timeouts]
+ "io.containerd.timeout.shim.cleanup" = "5s"
+ "io.containerd.timeout.shim.load" = "5s"
+ "io.containerd.timeout.shim.shutdown" = "3s"
+ "io.containerd.timeout.task.state" = "2s"
+
+[plugins]
+ [plugins."io.containerd.gc.v1.scheduler"]
+ pause_threshold = 0.02
+ deletion_threshold = 0
+ mutation_threshold = 100
+ schedule_delay = "0s"
+ startup_delay = "100ms"
+ [plugins."io.containerd.grpc.v1.cri"]
+ disable_tcp_service = true
+ stream_server_address = "127.0.0.1"
+ stream_server_port = "0"
+ stream_idle_timeout = "4h0m0s"
+ enable_selinux = false
+ sandbox_image = "k8s.gcr.io/pause:3.1"
+ stats_collect_period = 10
+ systemd_cgroup = false
+ enable_tls_streaming = false
+ ignore_image_defined_volumes = true
+ max_container_log_line_size = 16384
+ disable_cgroup = false
+ disable_apparmor = true
+ restrict_oom_score_adj = false
+ max_concurrent_downloads = 3
+ disable_proc_mount = false
+ [plugins."io.containerd.grpc.v1.cri".containerd]
+ snapshotter = "overlayfs"
+ default_runtime_name = "runsc"
+ no_pivot = false
+ [plugins."io.containerd.grpc.v1.cri".containerd.default_runtime]
+ runtime_type = ""
+ runtime_engine = ""
+ runtime_root = ""
+ privileged_without_host_devices = false
+ [plugins."io.containerd.grpc.v1.cri".containerd.untrusted_workload_runtime]
+ runtime_type = ""
+ runtime_engine = ""
+ runtime_root = ""
+ privileged_without_host_devices = false
+ [plugins."io.containerd.grpc.v1.cri".containerd.runtimes]
+ [plugins."io.containerd.grpc.v1.cri".containerd.runtimes.runsc]
+ runtime_type = "io.containerd.runsc.v1"
+ runtime_engine = ""
+ runtime_root = ""
+ privileged_without_host_devices = false
+ [plugins."io.containerd.grpc.v1.cri".containerd.runtimes.runsc.options]
+ TypeUrl = "io.containerd.runsc.v1.options"
+ ConfigPath = "/containerd/conf/runsc.toml"
+ [plugins."io.containerd.grpc.v1.cri".containerd.runtimes.runc]
+ runtime_type = "io.containerd.runc.v2"
+ runtime_engine = ""
+ runtime_root = ""
+ privileged_without_host_devices = false
+ base_runtime_spec = ""
+ [plugins."io.containerd.grpc.v1.cri".cni]
+ bin_dir = "/containerd/bin/cni"
+ conf_dir = "/containerd/conf/cni"
+ max_conf_num = 0
+ conf_template = "/containerd/conf/cnispec.gojson"
+ [plugins."io.containerd.grpc.v1.cri".registry]
+ [plugins."io.containerd.grpc.v1.cri".registry.mirrors]
+ [plugins."io.containerd.grpc.v1.cri".registry.mirrors."docker.io"]
+ endpoint = ["https://registry-1.docker.io"]
+ [plugins."io.containerd.grpc.v1.cri".x509_key_pair_streaming]
+ tls_cert_file = ""
+ tls_key_file = ""
+ [plugins."io.containerd.internal.v1.opt"]
+ path = "/containerd/bin"
+ [plugins."io.containerd.internal.v1.restart"]
+ interval = "10s"
+ [plugins."io.containerd.metadata.v1.bolt"]
+ content_sharing_policy = "shared"
+ [plugins."io.containerd.monitor.v1.cgroups"]
+ no_prometheus = false
+ [plugins."io.containerd.runtime.v1.linux"]
+ shim = "containerd-shim"
+ runtime = "noop"
+ runtime_root = ""
+ no_shim = false
+ shim_debug = false
+ [plugins."io.containerd.runtime.v2.task"]
+ platforms = ["linux/amd64"]
+ [plugins."io.containerd.service.v1.diff-service"]
+ default = ["walking"]
\ No newline at end of file
diff --git a/metropolis/node/kubernetes/containerd/main.go b/metropolis/node/kubernetes/containerd/main.go
new file mode 100644
index 0000000..366f902
--- /dev/null
+++ b/metropolis/node/kubernetes/containerd/main.go
@@ -0,0 +1,146 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package containerd
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "strings"
+ "time"
+
+ ctr "github.com/containerd/containerd"
+ "github.com/containerd/containerd/namespaces"
+
+ "git.monogon.dev/source/nexantic.git/metropolis/node/common/supervisor"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/core/localstorage"
+)
+
+const (
+ preseedNamespacesDir = "/containerd/preseed/"
+)
+
+type Service struct {
+ EphemeralVolume *localstorage.EphemeralContainerdDirectory
+}
+
+func (s *Service) Run(ctx context.Context) error {
+ cmd := exec.CommandContext(ctx, "/containerd/bin/containerd", "--config", "/containerd/conf/config.toml")
+ cmd.Env = []string{"PATH=/containerd/bin", "TMPDIR=" + s.EphemeralVolume.Tmp.FullPath()}
+
+ runscFifo, err := os.OpenFile(s.EphemeralVolume.RunSCLogsFIFO.FullPath(), os.O_CREATE|os.O_RDONLY, os.ModeNamedPipe|0777)
+ if err != nil {
+ return err
+ }
+
+ if err := supervisor.Run(ctx, "runsc", s.logPump(runscFifo)); err != nil {
+ return fmt.Errorf("failed to start runsc log pump: %w", err)
+ }
+
+ if err := supervisor.Run(ctx, "preseed", s.runPreseed); err != nil {
+ return fmt.Errorf("failed to start preseed runnable: %w", err)
+ }
+ return supervisor.RunCommand(ctx, cmd)
+}
+
+// logPump returns a runnable that pipes data from a file/FIFO into its raw logger.
+// TODO(q3k): refactor this out to a generic function in supervisor or logtree.
+func (s *Service) logPump(fifo *os.File) supervisor.Runnable {
+	return func(ctx context.Context) error {
+		supervisor.Signal(ctx, supervisor.SignalHealthy)
+		for {
+			// Quit if requested.
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			default:
+			}
+
+			n, err := io.Copy(supervisor.RawLogger(ctx), fifo)
+			if n == 0 && err == nil {
+				// Hack because pipes/FIFOs can return zero reads when nobody is writing. To avoid busy-looping,
+				// sleep a bit before retrying. This does not lose data since the FIFO internal buffer will
+				// stall writes when it becomes full. 10ms maximum stall in a non-latency critical process (reading
+				// debug logs) is not an issue for us.
+				time.Sleep(10 * time.Millisecond)
+			} else if err != nil {
+				return fmt.Errorf("log pump failed: %w", err) // %w keeps the cause inspectable via errors.Is/As
+			}
+		}
+	}
+}
+
+// runPreseed loads OCI bundles in tar form from preseedNamespacesDir into containerd at startup.
+// This can be run multiple times, containerd will automatically dedup the layers.
+// containerd uses namespaces to keep images (and everything else) separate so to define where the images will be loaded
+// to they need to be in a folder named after the namespace they should be loaded into.
+// containerd's CRI plugin (which is built as part of containerd) uses a hardcoded namespace ("k8s.io") for everything
+// accessed through CRI, so if an image should be available on K8s it needs to be in that namespace.
+// As an example if image helloworld should be loaded for use with Kubernetes, the OCI bundle needs to be at
+// <preseedNamespacesDir>/k8s.io/helloworld.tar. No tagging beyond what's in the bundle is performed.
+func (s *Service) runPreseed(ctx context.Context) error {
+ client, err := ctr.New(s.EphemeralVolume.ClientSocket.FullPath())
+ if err != nil {
+ return fmt.Errorf("failed to connect to containerd: %w", err)
+ }
+ logger := supervisor.Logger(ctx)
+ preseedNamespaceDirs, err := ioutil.ReadDir(preseedNamespacesDir)
+ if err != nil {
+ return fmt.Errorf("failed to open preseed dir: %w", err)
+ }
+ for _, dir := range preseedNamespaceDirs {
+ if !dir.IsDir() {
+ logger.Warningf("Non-Directory %q found in preseed folder, ignoring", dir.Name())
+ continue
+ }
+ namespace := dir.Name()
+ images, err := ioutil.ReadDir(filepath.Join(preseedNamespacesDir, namespace))
+ if err != nil {
+ return fmt.Errorf("failed to list namespace preseed directory for ns \"%v\": %w", namespace, err)
+ }
+ ctxWithNS := namespaces.WithNamespace(ctx, namespace)
+ for _, image := range images {
+ if image.IsDir() {
+ logger.Warningf("Directory %q found in preseed namespaced folder, ignoring", image.Name())
+ continue
+ }
+ imageFile, err := os.Open(filepath.Join(preseedNamespacesDir, namespace, image.Name()))
+ if err != nil {
+ return fmt.Errorf("failed to open preseed image \"%v\": %w", image.Name(), err)
+ }
+ // defer in this loop is fine since we're never going to preseed more than ~1M images which is where our
+ // file descriptor limit is.
+ defer imageFile.Close()
+ importedImages, err := client.Import(ctxWithNS, imageFile)
+ if err != nil {
+ return fmt.Errorf("failed to import preseed image: %w", err)
+ }
+ var importedImageNames []string
+ for _, img := range importedImages {
+ importedImageNames = append(importedImageNames, img.Name)
+ }
+ logger.Infof("Successfully imported preseeded bundle %s/%s into containerd", namespace, strings.Join(importedImageNames, ","))
+ }
+ }
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ supervisor.Signal(ctx, supervisor.SignalDone)
+ return nil
+}
diff --git a/metropolis/node/kubernetes/containerd/runsc.toml b/metropolis/node/kubernetes/containerd/runsc.toml
new file mode 100644
index 0000000..4fe0751
--- /dev/null
+++ b/metropolis/node/kubernetes/containerd/runsc.toml
@@ -0,0 +1,6 @@
+root = "/ephemeral/containerd/runsc"
+[runsc_config]
+debug = "true"
+debug-log = "/ephemeral/containerd/runsc-logs.fifo"
+panic-log = "/ephemeral/containerd/runsc-logs.fifo"
+log = "/ephemeral/containerd/runsc-logs.fifo"
diff --git a/metropolis/node/kubernetes/controller-manager.go b/metropolis/node/kubernetes/controller-manager.go
new file mode 100644
index 0000000..487511f
--- /dev/null
+++ b/metropolis/node/kubernetes/controller-manager.go
@@ -0,0 +1,93 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kubernetes
+
+import (
+ "context"
+ "encoding/pem"
+ "fmt"
+ "net"
+ "os/exec"
+
+ "git.monogon.dev/source/nexantic.git/metropolis/node/common/fileargs"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/common/supervisor"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/kubernetes/pki"
+)
+
+type controllerManagerConfig struct {
+ clusterNet net.IPNet
+ // All PKI-related things are in DER
+ kubeConfig []byte
+ rootCA []byte
+ serviceAccountPrivKey []byte // In PKCS#8 form
+ serverCert []byte
+ serverKey []byte
+}
+
+// getPKIControllerManagerConfig gathers what kube-controller-manager needs from the Kubernetes PKI: the ID CA
+// certificate, the controller-manager serving certificate and key, the service account private key and a
+// client kubeconfig. clusterNet is left at its zero value. The first retrieval that fails aborts the whole
+// call with a wrapped error.
+func getPKIControllerManagerConfig(ctx context.Context, kpki *pki.KubernetesPKI) (*controllerManagerConfig, error) {
+	var err error
+	cfg := new(controllerManagerConfig)
+	if cfg.rootCA, _, err = kpki.Certificate(ctx, pki.IdCA); err != nil {
+		return nil, fmt.Errorf("failed to get ID root CA: %w", err)
+	}
+	if cfg.serverCert, cfg.serverKey, err = kpki.Certificate(ctx, pki.ControllerManager); err != nil {
+		return nil, fmt.Errorf("failed to get controller-manager serving certificate: %w", err)
+	}
+	if cfg.serviceAccountPrivKey, err = kpki.ServiceAccountKey(ctx); err != nil {
+		return nil, fmt.Errorf("failed to get serviceaccount privkey: %w", err)
+	}
+	if cfg.kubeConfig, err = kpki.Kubeconfig(ctx, pki.ControllerManagerClient); err != nil {
+		return nil, fmt.Errorf("failed to get controller-manager kubeconfig: %w", err)
+	}
+	return cfg, nil
+}
+
+// runControllerManager returns a supervisor.Runnable that starts kube-controller-manager from the
+// /kubernetes/bin/kube binary, handing it the kubeconfig and PEM-encoded PKI material from config via fileargs.
+func runControllerManager(config controllerManagerConfig) supervisor.Runnable {
+	return func(ctx context.Context) error {
+		args, err := fileargs.New()
+		if err != nil {
+			panic(err) // If this fails, something is very wrong. Just crash.
+		}
+		defer args.Close()
+		cmd := exec.CommandContext(ctx, "/kubernetes/bin/kube", "kube-controller-manager",
+			args.FileOpt("--kubeconfig", "kubeconfig", config.kubeConfig),
+			args.FileOpt("--service-account-private-key-file", "service-account-privkey.pem",
+				pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: config.serviceAccountPrivKey})),
+			args.FileOpt("--root-ca-file", "root-ca.pem",
+				pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: config.rootCA})),
+			"--port=0",                               // Kill insecure serving
+			"--use-service-account-credentials=true", // Enables things like PSP enforcement
+			args.FileOpt("--tls-cert-file", "server-cert.pem",
+				pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: config.serverCert})),
+			args.FileOpt("--tls-private-key-file", "server-key.pem",
+				pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: config.serverKey})),
+			"--allocate-node-cidrs",
+			"--cluster-cidr="+config.clusterNet.String(),
+		)
+		// Wrap the error recorded by fileargs itself; the outer err is always nil at this point.
+		if err := args.Error(); err != nil {
+			return fmt.Errorf("failed to use fileargs: %w", err)
+		}
+		return supervisor.RunCommand(ctx, cmd)
+	}
+}
diff --git a/metropolis/node/kubernetes/csi.go b/metropolis/node/kubernetes/csi.go
new file mode 100644
index 0000000..4b44a1a
--- /dev/null
+++ b/metropolis/node/kubernetes/csi.go
@@ -0,0 +1,246 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kubernetes
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "os"
+ "path/filepath"
+ "regexp"
+
+ "github.com/container-storage-interface/spec/lib/go/csi"
+ "github.com/golang/protobuf/ptypes/wrappers"
+ "golang.org/x/sys/unix"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ pluginregistration "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
+
+ "git.monogon.dev/source/nexantic.git/metropolis/node/common/fsquota"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/common/supervisor"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/core/localstorage"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/core/logtree"
+)
+
+// Derived from K8s spec for acceptable names, but shortened to 130 characters to avoid issues with
+// maximum path length. We don't provision longer names so this applies only if you manually create
+// a volume with a name of more than 130 characters.
+var acceptableNames = regexp.MustCompile("^[a-z][a-z0-9-.]{0,128}[a-z0-9]$")
+
+const volumeDir = "volumes"
+
+type csiPluginServer struct {
+ KubeletDirectory *localstorage.DataKubernetesKubeletDirectory
+ VolumesDirectory *localstorage.DataVolumesDirectory
+
+ logger logtree.LeveledLogger
+}
+
+func (s *csiPluginServer) Run(ctx context.Context) error {
+ s.logger = supervisor.Logger(ctx)
+
+ pluginListener, err := net.ListenUnix("unix", &net.UnixAddr{Name: s.KubeletDirectory.Plugins.VFS.FullPath(), Net: "unix"})
+ if err != nil {
+ return fmt.Errorf("failed to listen on CSI socket: %w", err)
+ }
+ pluginListener.SetUnlinkOnClose(true)
+
+ pluginServer := grpc.NewServer()
+ csi.RegisterIdentityServer(pluginServer, s)
+ csi.RegisterNodeServer(pluginServer, s)
+ // Enable graceful shutdown since we don't have long-running RPCs and most of them shouldn't and can't be
+ // cancelled anyways.
+ if err := supervisor.Run(ctx, "csi-node", supervisor.GRPCServer(pluginServer, pluginListener, true)); err != nil {
+ return err
+ }
+
+ registrationListener, err := net.ListenUnix("unix", &net.UnixAddr{Name: s.KubeletDirectory.PluginsRegistry.VFSReg.FullPath(), Net: "unix"})
+ if err != nil {
+ return fmt.Errorf("failed to listen on CSI registration socket: %w", err)
+ }
+ registrationListener.SetUnlinkOnClose(true)
+
+ registrationServer := grpc.NewServer()
+ pluginregistration.RegisterRegistrationServer(registrationServer, s)
+ if err := supervisor.Run(ctx, "registration", supervisor.GRPCServer(registrationServer, registrationListener, true)); err != nil {
+ return err
+ }
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ supervisor.Signal(ctx, supervisor.SignalDone)
+ return nil
+}
+
+func (*csiPluginServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
+ return nil, status.Errorf(codes.Unimplemented, "method NodeStageVolume not supported")
+}
+
+func (*csiPluginServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
+ return nil, status.Errorf(codes.Unimplemented, "method NodeUnstageVolume not supported")
+}
+
+// NodePublishVolume bind-mounts the directory of volume req.VolumeId onto req.TargetPath, read-only if
+// req.Readonly is set. Only single-node access modes and mount access types are accepted.
+func (s *csiPluginServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
+	if !acceptableNames.MatchString(req.VolumeId) {
+		return nil, status.Error(codes.InvalidArgument, "invalid characters in volume id")
+	}
+	// TODO(q3k): move this logic to localstorage?
+	volumePath := filepath.Join(s.VolumesDirectory.FullPath(), req.VolumeId)
+
+	// Generated getters are nil-safe: a request without a capability hits the default branches, not a nil panic.
+	switch req.GetVolumeCapability().GetAccessMode().GetMode() {
+	case csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER:
+	case csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY:
+	default:
+		return nil, status.Error(codes.InvalidArgument, "unsupported access mode")
+	}
+	switch req.GetVolumeCapability().GetAccessType().(type) {
+	case *csi.VolumeCapability_Mount:
+	default:
+		return nil, status.Error(codes.InvalidArgument, "unsupported access type")
+	}
+	err := unix.Mount(volumePath, req.TargetPath, "", unix.MS_BIND, "")
+	switch {
+	case err == unix.ENOENT:
+		return nil, status.Error(codes.NotFound, "volume not found")
+	case err != nil:
+		return nil, status.Errorf(codes.Unavailable, "failed to bind-mount volume: %v", err)
+	}
+	if req.Readonly {
+		err := unix.Mount(volumePath, req.TargetPath, "", unix.MS_BIND|unix.MS_REMOUNT|unix.MS_RDONLY, "")
+		if err != nil {
+			_ = unix.Unmount(req.TargetPath, 0) // Best-effort
+			return nil, status.Errorf(codes.Unavailable, "failed to remount volume: %v", err)
+		}
+	}
+	return &csi.NodePublishVolumeResponse{}, nil
+}
+
+func (*csiPluginServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
+ if err := unix.Unmount(req.TargetPath, 0); err != nil {
+ return nil, status.Errorf(codes.Unavailable, "failed to unmount volume: %v", err)
+ }
+ return &csi.NodeUnpublishVolumeResponse{}, nil
+}
+
+// NodeGetVolumeStats reports byte and inode usage of the volume at req.VolumePath, as read from its fsquota.
+func (*csiPluginServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
+	quota, err := fsquota.GetQuota(req.VolumePath)
+	if os.IsNotExist(err) {
+		return nil, status.Error(codes.NotFound, "volume does not exist at this path")
+	} else if err != nil {
+		return nil, status.Errorf(codes.Unavailable, "failed to get quota: %v", err)
+	}
+
+	// usage builds one VolumeUsage entry. Available is clamped at zero because usage above the limit (or a
+	// limit of zero) would otherwise yield a negative value, which CSI does not permit in this field.
+	usage := func(unit csi.VolumeUsage_Unit, total, used int64) *csi.VolumeUsage {
+		available := total - used
+		if available < 0 {
+			available = 0
+		}
+		return &csi.VolumeUsage{Total: total, Unit: unit, Used: used, Available: available}
+	}
+	return &csi.NodeGetVolumeStatsResponse{
+		Usage: []*csi.VolumeUsage{
+			usage(csi.VolumeUsage_BYTES, int64(quota.Bytes), int64(quota.BytesUsed)),
+			usage(csi.VolumeUsage_INODES, int64(quota.Inodes), int64(quota.InodesUsed)),
+		},
+	}, nil
+}
+
+func (*csiPluginServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
+	limit := req.GetCapacityRange().GetLimitBytes() // nil-safe getters: a missing CapacityRange reads as 0
+	if limit <= 0 {
+		return nil, status.Error(codes.InvalidArgument, "invalid expanded volume size: at or below zero bytes")
+	} else if err := fsquota.SetQuota(req.VolumePath, uint64(limit), 0); err != nil {
+		return nil, status.Errorf(codes.Unavailable, "failed to update quota: %v", err)
+	}
+	return &csi.NodeExpandVolumeResponse{CapacityBytes: limit}, nil
+}
+
+func rpcCapability(cap csi.NodeServiceCapability_RPC_Type) *csi.NodeServiceCapability {
+ return &csi.NodeServiceCapability{
+ Type: &csi.NodeServiceCapability_Rpc{
+ Rpc: &csi.NodeServiceCapability_RPC{Type: cap},
+ },
+ }
+}
+
+func (*csiPluginServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
+ return &csi.NodeGetCapabilitiesResponse{
+ Capabilities: []*csi.NodeServiceCapability{
+ rpcCapability(csi.NodeServiceCapability_RPC_EXPAND_VOLUME),
+ rpcCapability(csi.NodeServiceCapability_RPC_GET_VOLUME_STATS),
+ },
+ }, nil
+}
+
+func (*csiPluginServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
+ hostname, err := os.Hostname()
+ if err != nil {
+ return nil, status.Errorf(codes.Unavailable, "failed to get node identity: %v", err)
+ }
+ return &csi.NodeGetInfoResponse{
+ NodeId: hostname,
+ }, nil
+}
+
+// CSI Identity endpoints
+func (*csiPluginServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
+ return &csi.GetPluginInfoResponse{
+ Name: "com.smalltown.vfs",
+ VendorVersion: "0.0.1", // TODO(lorenz): Maybe stamp?
+ }, nil
+}
+
+func (*csiPluginServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
+ return &csi.GetPluginCapabilitiesResponse{
+ Capabilities: []*csi.PluginCapability{
+ {
+ Type: &csi.PluginCapability_VolumeExpansion_{
+ VolumeExpansion: &csi.PluginCapability_VolumeExpansion{
+ Type: csi.PluginCapability_VolumeExpansion_ONLINE,
+ },
+ },
+ },
+ },
+ }, nil
+}
+
+func (s *csiPluginServer) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) {
+ return &csi.ProbeResponse{Ready: &wrappers.BoolValue{Value: true}}, nil
+}
+
+// Registration endpoints
+func (s *csiPluginServer) GetInfo(ctx context.Context, req *pluginregistration.InfoRequest) (*pluginregistration.PluginInfo, error) {
+ return &pluginregistration.PluginInfo{
+ Type: "CSIPlugin",
+ Name: "com.smalltown.vfs",
+ Endpoint: s.KubeletDirectory.Plugins.VFS.FullPath(),
+ SupportedVersions: []string{"1.2"}, // Keep in sync with container-storage-interface/spec package version
+ }, nil
+}
+
+func (s *csiPluginServer) NotifyRegistrationStatus(ctx context.Context, req *pluginregistration.RegistrationStatus) (*pluginregistration.RegistrationStatusResponse, error) {
+ if req.Error != "" {
+ s.logger.Warningf("Kubelet failed registering CSI plugin: %v", req.Error)
+ }
+ return &pluginregistration.RegistrationStatusResponse{}, nil
+}
diff --git a/metropolis/node/kubernetes/hyperkube/BUILD b/metropolis/node/kubernetes/hyperkube/BUILD
new file mode 100644
index 0000000..dced1c7
--- /dev/null
+++ b/metropolis/node/kubernetes/hyperkube/BUILD
@@ -0,0 +1,29 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")
+load("@//third_party/go:kubernetes_version_def.bzl", "version_x_defs")
+
+go_library(
+ name = "go_default_library",
+ srcs = ["main.go"],
+ importpath = "git.monogon.dev/source/nexantic.git/metropolis/node/kubernetes/hyperkube",
+ visibility = ["//visibility:private"],
+ deps = [
+ "@com_github_spf13_cobra//:go_default_library",
+ "@com_github_spf13_pflag//:go_default_library",
+ "@io_k8s_component_base//cli/flag:go_default_library",
+ "@io_k8s_component_base//logs:go_default_library",
+ "@io_k8s_component_base//metrics/prometheus/restclient:go_default_library",
+ "@io_k8s_component_base//metrics/prometheus/version:go_default_library",
+ "@io_k8s_kubernetes//cmd/kube-apiserver/app:go_default_library",
+ "@io_k8s_kubernetes//cmd/kube-controller-manager/app:go_default_library",
+ "@io_k8s_kubernetes//cmd/kube-scheduler/app:go_default_library",
+ "@io_k8s_kubernetes//cmd/kubelet/app:go_default_library",
+ ],
+)
+
+go_binary(
+ name = "hyperkube",
+ embed = [":go_default_library"],
+ pure = "on",
+ visibility = ["//visibility:public"],
+ x_defs = version_x_defs(),
+)
diff --git a/metropolis/node/kubernetes/hyperkube/main.go b/metropolis/node/kubernetes/hyperkube/main.go
new file mode 100644
index 0000000..3b4ac08
--- /dev/null
+++ b/metropolis/node/kubernetes/hyperkube/main.go
@@ -0,0 +1,122 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+/*
+Copyright 2014 The Kubernetes Authors.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Adapted from https://github.com/dims/hyperkube
+
+package main
+
+import (
+ goflag "flag"
+ "math/rand"
+ "os"
+ "path/filepath"
+ "time"
+
+ "github.com/spf13/cobra"
+ "github.com/spf13/pflag"
+
+ cliflag "k8s.io/component-base/cli/flag"
+ "k8s.io/component-base/logs"
+ _ "k8s.io/component-base/metrics/prometheus/restclient" // for client metric registration
+ _ "k8s.io/component-base/metrics/prometheus/version" // for version metric registration
+ kubeapiserver "k8s.io/kubernetes/cmd/kube-apiserver/app"
+ kubecontrollermanager "k8s.io/kubernetes/cmd/kube-controller-manager/app"
+ kubescheduler "k8s.io/kubernetes/cmd/kube-scheduler/app"
+ kubelet "k8s.io/kubernetes/cmd/kubelet/app"
+)
+
+// main runs the Kubernetes component selected by the executable's basename, defaulting to the kube command.
+func main() {
+	rand.Seed(time.Now().UnixNano())
+	hyperkubeCommand, allCommandFns := NewHyperKubeCommand()
+
+	// TODO: once we switch everything over to Cobra commands, we can go back to calling
+	// cliflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
+	// normalize func and add the go flag set by hand.
+	pflag.CommandLine.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
+	pflag.CommandLine.AddGoFlagSet(goflag.CommandLine)
+	// cliflag.InitFlags()
+	logs.InitLogs()
+	defer logs.FlushLogs()
+	basename := filepath.Base(os.Args[0])
+	if err := commandFor(basename, hyperkubeCommand, allCommandFns).Execute(); err != nil {
+		logs.FlushLogs() // os.Exit skips deferred calls, so flush explicitly on the error path.
+		os.Exit(1)
+	}
+}
+
+// commandFor picks the command to execute for the given binary basename: the first entry of commands whose
+// name or one of whose aliases equals basename, or defaultCommand when none does. Candidates are
+// instantiated through their constructor functions, in order, until a match is found.
+func commandFor(basename string, defaultCommand *cobra.Command, commands []func() *cobra.Command) *cobra.Command {
+	for _, construct := range commands {
+		candidate := construct()
+		names := append([]string{candidate.Name()}, candidate.Aliases...)
+		for _, name := range names {
+			if name == basename {
+				return candidate
+			}
+		}
+	}
+	return defaultCommand
+}
+
+// NewHyperKubeCommand is the entry point for hyperkube
+func NewHyperKubeCommand() (*cobra.Command, []func() *cobra.Command) {
+ // these have to be functions since the command is polymorphic. Cobra wants you to be top level
+ // command to get executed
+ apiserver := func() *cobra.Command { return kubeapiserver.NewAPIServerCommand() }
+ controller := func() *cobra.Command { return kubecontrollermanager.NewControllerManagerCommand() }
+ scheduler := func() *cobra.Command { return kubescheduler.NewSchedulerCommand() }
+ kubelet := func() *cobra.Command { return kubelet.NewKubeletCommand() }
+
+ commandFns := []func() *cobra.Command{
+ apiserver,
+ controller,
+ scheduler,
+ kubelet,
+ }
+
+ cmd := &cobra.Command{
+ Use: "kube",
+ Short: "Combines all Kubernetes components in a single binary",
+ Run: func(cmd *cobra.Command, args []string) {
+ if len(args) != 0 {
+ cmd.Help()
+ os.Exit(1)
+ }
+ },
+ }
+
+ for i := range commandFns {
+ cmd.AddCommand(commandFns[i]())
+ }
+
+ return cmd, commandFns
+}
diff --git a/metropolis/node/kubernetes/kubelet.go b/metropolis/node/kubernetes/kubelet.go
new file mode 100644
index 0000000..e9c6ce5
--- /dev/null
+++ b/metropolis/node/kubernetes/kubelet.go
@@ -0,0 +1,145 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kubernetes
+
+import (
+ "context"
+ "encoding/json"
+ "encoding/pem"
+ "fmt"
+ "io"
+ "net"
+ "os/exec"
+
+ v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ kubeletconfig "k8s.io/kubelet/config/v1beta1"
+
+ "git.monogon.dev/source/nexantic.git/metropolis/node/common/fileargs"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/common/supervisor"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/core/localstorage"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/core/localstorage/declarative"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/kubernetes/pki"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/kubernetes/reconciler"
+)
+
+type kubeletService struct {
+ NodeName string
+ ClusterDNS []net.IP
+ KubeletDirectory *localstorage.DataKubernetesKubeletDirectory
+ EphemeralDirectory *localstorage.EphemeralDirectory
+ Output io.Writer
+ KPKI *pki.KubernetesPKI
+}
+
+func (s *kubeletService) createCertificates(ctx context.Context) error {
+ identity := fmt.Sprintf("system:node:%s", s.NodeName)
+
+ ca := s.KPKI.Certificates[pki.IdCA]
+ cacert, _, err := ca.Ensure(ctx, s.KPKI.KV)
+ if err != nil {
+ return fmt.Errorf("could not ensure ca certificate: %w", err)
+ }
+
+ kubeconfig, err := pki.New(ca, "", pki.Client(identity, []string{"system:nodes"})).Kubeconfig(ctx, s.KPKI.KV)
+ if err != nil {
+ return fmt.Errorf("could not create volatile kubelet client cert: %w", err)
+ }
+
+ cert, key, err := pki.New(ca, "", pki.Server([]string{s.NodeName}, nil)).Ensure(ctx, s.KPKI.KV)
+ if err != nil {
+ return fmt.Errorf("could not create volatile kubelet server cert: %w", err)
+ }
+
+ // TODO(q3k): this should probably become its own function //metropolis/node/kubernetes/pki.
+ for _, el := range []struct {
+ target declarative.FilePlacement
+ data []byte
+ }{
+ {s.KubeletDirectory.Kubeconfig, kubeconfig},
+ {s.KubeletDirectory.PKI.CACertificate, pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: cacert})},
+ {s.KubeletDirectory.PKI.Certificate, pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: cert})},
+ {s.KubeletDirectory.PKI.Key, pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: key})},
+ } {
+ if err := el.target.Write(el.data, 0400); err != nil {
+ return fmt.Errorf("could not write %v: %w", el.target, err)
+ }
+ }
+
+ return nil
+}
+
+func (s *kubeletService) configure() *kubeletconfig.KubeletConfiguration {
+ var clusterDNS []string
+ for _, dnsIP := range s.ClusterDNS {
+ clusterDNS = append(clusterDNS, dnsIP.String())
+ }
+
+ return &kubeletconfig.KubeletConfiguration{
+ TypeMeta: v1.TypeMeta{
+ Kind: "KubeletConfiguration",
+ APIVersion: kubeletconfig.GroupName + "/v1beta1",
+ },
+ TLSCertFile: s.KubeletDirectory.PKI.Certificate.FullPath(),
+ TLSPrivateKeyFile: s.KubeletDirectory.PKI.Key.FullPath(),
+ TLSMinVersion: "VersionTLS13",
+ ClusterDNS: clusterDNS,
+ Authentication: kubeletconfig.KubeletAuthentication{
+ X509: kubeletconfig.KubeletX509Authentication{
+ ClientCAFile: s.KubeletDirectory.PKI.CACertificate.FullPath(),
+ },
+ },
+ // TODO(q3k): move reconciler.False to a generic package, fix the following references.
+ ClusterDomain: "cluster.local", // cluster.local is hardcoded in the certificate too currently
+ EnableControllerAttachDetach: reconciler.False(),
+ HairpinMode: "none",
+ MakeIPTablesUtilChains: reconciler.False(), // We don't have iptables
+ FailSwapOn: reconciler.False(), // Our kernel doesn't have swap enabled which breaks Kubelet's detection
+ KubeReserved: map[string]string{
+ "cpu": "200m",
+ "memory": "300Mi",
+ },
+
+ // We're not going to use this, but let's make it point to a known-empty directory in case anybody manages to
+ // trigger it.
+ VolumePluginDir: s.EphemeralDirectory.FlexvolumePlugins.FullPath(),
+ }
+}
+
+// Run writes the kubelet certificates, renders its configuration and hands the kubelet to supervisor.RunCommand.
+func (s *kubeletService) Run(ctx context.Context) error {
+	if err := s.createCertificates(ctx); err != nil {
+		return fmt.Errorf("when creating certificates: %w", err)
+	}
+	configRaw, err := json.Marshal(s.configure())
+	if err != nil {
+		return fmt.Errorf("when marshaling kubelet configuration: %w", err)
+	}
+	fargs, err := fileargs.New()
+	if err != nil {
+		return err
+	}
+	defer fargs.Close() // Release the fileargs resources on return, matching runControllerManager.
+	cmd := exec.CommandContext(ctx, "/kubernetes/bin/kube", "kubelet",
+		fargs.FileOpt("--config", "config.json", configRaw),
+		"--container-runtime=remote",
+		fmt.Sprintf("--container-runtime-endpoint=unix://%s", s.EphemeralDirectory.Containerd.ClientSocket.FullPath()),
+		fmt.Sprintf("--kubeconfig=%s", s.KubeletDirectory.Kubeconfig.FullPath()),
+		fmt.Sprintf("--root-dir=%s", s.KubeletDirectory.FullPath()),
+	)
+	cmd.Env = []string{"PATH=/kubernetes/bin"}
+	return supervisor.RunCommand(ctx, cmd)
+}
diff --git a/metropolis/node/kubernetes/nfproxy/BUILD.bazel b/metropolis/node/kubernetes/nfproxy/BUILD.bazel
new file mode 100644
index 0000000..29124a6
--- /dev/null
+++ b/metropolis/node/kubernetes/nfproxy/BUILD.bazel
@@ -0,0 +1,22 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+ name = "go_default_library",
+ srcs = ["nfproxy.go"],
+ importpath = "git.monogon.dev/source/nexantic.git/metropolis/node/kubernetes/nfproxy",
+ visibility = ["//metropolis/node/kubernetes:__subpackages__"],
+ deps = [
+ "//metropolis/node/common/supervisor:go_default_library",
+ "@com_github_sbezverk_nfproxy//pkg/controller:go_default_library",
+ "@com_github_sbezverk_nfproxy//pkg/nftables:go_default_library",
+ "@com_github_sbezverk_nfproxy//pkg/proxy:go_default_library",
+ "@io_k8s_api//core/v1:go_default_library",
+ "@io_k8s_apimachinery//pkg/apis/meta/v1:go_default_library",
+ "@io_k8s_apimachinery//pkg/labels:go_default_library",
+ "@io_k8s_apimachinery//pkg/selection:go_default_library",
+ "@io_k8s_client_go//informers:go_default_library",
+ "@io_k8s_client_go//kubernetes:go_default_library",
+ "@io_k8s_client_go//kubernetes/scheme:go_default_library",
+ "@io_k8s_client_go//tools/record:go_default_library",
+ ],
+)
diff --git a/metropolis/node/kubernetes/nfproxy/nfproxy.go b/metropolis/node/kubernetes/nfproxy/nfproxy.go
new file mode 100644
index 0000000..5fc9a11
--- /dev/null
+++ b/metropolis/node/kubernetes/nfproxy/nfproxy.go
@@ -0,0 +1,104 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package nfproxy is a Kubernetes Service IP proxy based exclusively on the Linux nftables interface.
+// It uses netfilter's NAT capabilities to accept traffic on service IPs and DNAT it to the respective endpoint.
+package nfproxy
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "net"
+ "os"
+ "time"
+
+ "github.com/sbezverk/nfproxy/pkg/controller"
+ "github.com/sbezverk/nfproxy/pkg/nftables"
+ "github.com/sbezverk/nfproxy/pkg/proxy"
+ v1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/labels"
+ "k8s.io/apimachinery/pkg/selection"
+ kubeinformers "k8s.io/client-go/informers"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/kubernetes/scheme"
+ "k8s.io/client-go/tools/record"
+
+ "git.monogon.dev/source/nexantic.git/metropolis/node/common/supervisor"
+)
+
+type Service struct {
+ // Traffic in ClusterCIDR is assumed to be originated inside the cluster and will not be SNATed
+ ClusterCIDR net.IPNet
+ // A Kubernetes ClientSet with read access to endpoints and services
+ ClientSet kubernetes.Interface
+}
+
+// Run initializes nftables for the cluster CIDR, wires nfproxy's service and endpoint slice controllers to
+// informers that skip headless services and starts them, then signals the supervisor healthy and done.
+func (s *Service) Run(ctx context.Context) error {
+	var v4CIDR, v6CIDR string
+	switch ip := s.ClusterCIDR.IP; {
+	case ip.To4() != nil:
+		v4CIDR = s.ClusterCIDR.String()
+	case ip.To16() != nil:
+		v6CIDR = s.ClusterCIDR.String()
+	default:
+		return errors.New("invalid ClusterCIDR")
+	}
+	nfti, err := nftables.InitNFTables(v4CIDR, v6CIDR)
+	if err != nil {
+		return fmt.Errorf("failed to initialize nftables with error: %w", err)
+	}
+
+	// Events are reported into K8s through a recorder attributed to this host.
+	hostname, err := os.Hostname()
+	if err != nil {
+		return fmt.Errorf("failed to get local host name with error: %w", err)
+	}
+	broadcaster := record.NewBroadcaster()
+	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "nfproxy", Host: hostname})
+
+	// The trailing true enables endpoint slices in the proxy controller, see
+	// https://kubernetes.io/docs/concepts/services-networking/endpoint-slices/
+	proxier := proxy.NewProxy(nfti, hostname, recorder, true)
+
+	// Headless services are filtered out at the informer level via a label selector.
+	noHeadless, err := labels.NewRequirement(v1.IsHeadlessService, selection.DoesNotExist, nil)
+	if err != nil {
+		return fmt.Errorf("failed to create Requirement for noHeadlessEndpoints: %w", err)
+	}
+	selector := labels.NewSelector().Add(*noHeadless)
+
+	informers := kubeinformers.NewSharedInformerFactoryWithOptions(s.ClientSet, time.Minute*5,
+		kubeinformers.WithTweakListOptions(func(options *metav1.ListOptions) {
+			options.LabelSelector = selector.String()
+		}))
+
+	services := controller.NewServiceController(proxier, s.ClientSet, informers.Core().V1().Services())
+	slices := controller.NewEndpointSliceController(proxier, s.ClientSet, informers.Discovery().V1beta1().EndpointSlices())
+	informers.Start(ctx.Done())
+	if err := services.Start(ctx.Done()); err != nil {
+		return fmt.Errorf("error running Service controller: %w", err)
+	}
+	if err := slices.Start(ctx.Done()); err != nil {
+		return fmt.Errorf("error running endpoint controller: %w", err)
+	}
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+	supervisor.Signal(ctx, supervisor.SignalDone)
+	return nil
+}
diff --git a/metropolis/node/kubernetes/pki/BUILD.bazel b/metropolis/node/kubernetes/pki/BUILD.bazel
new file mode 100644
index 0000000..f82603d
--- /dev/null
+++ b/metropolis/node/kubernetes/pki/BUILD.bazel
@@ -0,0 +1,19 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+ name = "go_default_library",
+ srcs = [
+ "ca.go",
+ "certificate.go",
+ "kubernetes.go",
+ ],
+ importpath = "git.monogon.dev/source/nexantic.git/metropolis/node/kubernetes/pki",
+ visibility = ["//metropolis/node:__subpackages__"],
+ deps = [
+ "//metropolis/node:go_default_library",
+ "//metropolis/node/core/logtree:go_default_library",
+ "@io_etcd_go_etcd//clientv3:go_default_library",
+ "@io_k8s_client_go//tools/clientcmd:go_default_library",
+ "@io_k8s_client_go//tools/clientcmd/api:go_default_library",
+ ],
+)
diff --git a/metropolis/node/kubernetes/pki/ca.go b/metropolis/node/kubernetes/pki/ca.go
new file mode 100644
index 0000000..64453cd
--- /dev/null
+++ b/metropolis/node/kubernetes/pki/ca.go
@@ -0,0 +1,151 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pki
+
+import (
+ "context"
+ "crypto"
+ "crypto/ed25519"
+ "crypto/rand"
+ "crypto/sha1"
+ "crypto/x509"
+ "crypto/x509/pkix"
+ "encoding/asn1"
+ "fmt"
+ "math/big"
+ "time"
+
+ "go.etcd.io/etcd/clientv3"
+)
+
+// Issuer is a CA that can issue certificates. Two issuers are currently implemented:
+// - SelfSigned, which will generate a certificate signed by its corresponding private key.
+// - Certificate, which will use another existing Certificate as a CA.
+type Issuer interface {
+ // CACertificate returns the DER-encoded x509 certificate of the CA that will sign certificates when Issue is
+ // called, or nil if this is a self-signing issuer.
+ CACertificate(ctx context.Context, kv clientv3.KV) ([]byte, error)
+ // Issue will generate a key and certificate signed by the Issuer. The returned certificate is x509 DER-encoded,
+ // while the key is a bare ed25519 key.
+ Issue(ctx context.Context, template x509.Certificate, kv clientv3.KV) (cert, key []byte, err error)
+}
+
+var (
+ // unknownNotAfter is 9999-12-31 23:59:59 UTC, the notAfter value that RFC 5280 Section 4.1.2.5 prescribes for
+ // certificates without a well-defined expiration date. issueCertificate sets it on every certificate it issues.
+ unknownNotAfter = time.Unix(253402300799, 0)
+)
+
+// calculateSKID derives a Subject Key Identifier for pubKey: the SHA-1 digest of the raw subjectPublicKey bit
+// string found in the key's DER-encoded SubjectPublicKeyInfo.
+//
+// This is a workaround for https://github.com/golang/go/issues/26676 in Go's crypto/x509; without it Go violates
+// Section 4.2.1.2 of RFC 5280. Fixed for 1.15 in https://go-review.googlesource.com/c/go/+/227098/.
+//
+// Taken from https://github.com/FiloSottile/mkcert/blob/master/cert.go#L295 written by one of Go's crypto
+// engineers.
+func calculateSKID(pubKey crypto.PublicKey) ([]byte, error) {
+	encoded, err := x509.MarshalPKIXPublicKey(pubKey)
+	if err != nil {
+		return nil, err
+	}
+
+	// Only the two top-level fields of SubjectPublicKeyInfo are needed to get at the key bits.
+	type subjectPublicKeyInfo struct {
+		Algorithm        pkix.AlgorithmIdentifier
+		SubjectPublicKey asn1.BitString
+	}
+	info := subjectPublicKeyInfo{}
+	if _, err := asn1.Unmarshal(encoded, &info); err != nil {
+		return nil, err
+	}
+
+	digest := sha1.Sum(info.SubjectPublicKey.Bytes)
+	return digest[:], nil
+}
+
+// issueCertificate is a generic low level certificate-and-key issuance function. It generates a fresh ed25519 key
+// pair and a random serial number, fills in the validity, SubjectKeyId and AuthorityKeyId fields of template (which
+// is passed by value, so the caller's copy is untouched) and signs the result.
+//
+// If ca or caKey is nil, the certificate will be self-signed with the newly generated key. Otherwise it is signed
+// by caKey and issued by ca.
+//
+// The returned certificate is DER-encoded, while the returned key is a bare ed25519 private key. On error, both
+// cert and key are nil.
+func issueCertificate(template x509.Certificate, ca *x509.Certificate, caKey interface{}) (cert, key []byte, err error) {
+	pubKey, privKey, err := ed25519.GenerateKey(rand.Reader)
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to generate key: %w", err)
+	}
+
+	// Serial numbers are picked uniformly at random from [0, 2^127).
+	serialNumberLimit := new(big.Int).Lsh(big.NewInt(1), 127)
+	serialNumber, err := rand.Int(rand.Reader, serialNumberLimit)
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to generate serial number: %w", err)
+	}
+
+	skid, err := calculateSKID(pubKey)
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to calculate subject key ID: %w", err)
+	}
+
+	template.SerialNumber = serialNumber
+	template.NotBefore = time.Now()
+	template.NotAfter = unknownNotAfter
+	template.BasicConstraintsValid = true
+	template.SubjectKeyId = skid
+
+	// Set the AuthorityKeyID to the SKID of the signing certificate (or self, if self-signing).
+	if ca == nil || caKey == nil {
+		template.AuthorityKeyId = template.SubjectKeyId
+		ca = &template
+		caKey = privKey
+	} else {
+		template.AuthorityKeyId = ca.SubjectKeyId
+	}
+
+	certRaw, err := x509.CreateCertificate(rand.Reader, &template, ca, pubKey, caKey)
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to create certificate: %w", err)
+	}
+	return certRaw, privKey, nil
+}
+
+// selfSigned is the Issuer implementation backing SelfSigned. It carries no state.
+type selfSigned struct{}
+
+// Issue generates a new key and a certificate for template that is signed by that same key. ctx and kv are unused.
+func (*selfSigned) Issue(ctx context.Context, template x509.Certificate, kv clientv3.KV) (cert, key []byte, err error) {
+	cert, key, err = issueCertificate(template, nil, nil)
+	return cert, key, err
+}
+
+// CACertificate always returns a nil certificate and a nil error, as a self-signing issuer has no CA certificate.
+func (*selfSigned) CACertificate(ctx context.Context, kv clientv3.KV) ([]byte, error) {
+	var noCA []byte
+	return noCA, nil
+}
+
+// SelfSigned is an Issuer that generates self-signed certificates.
+var SelfSigned = &selfSigned{}
+
+// Issue makes Certificate usable as an Issuer: it ensures that c (the CA) exists, then issues a new key and
+// certificate for template signed by c's key. The IsCA flag of template is always cleared.
+func (c *Certificate) Issue(ctx context.Context, template x509.Certificate, kv clientv3.KV) (cert, key []byte, err error) {
+	rawCA, rawCAKey, ensureErr := c.ensure(ctx, kv)
+	if ensureErr != nil {
+		return nil, nil, fmt.Errorf("could not ensure CA certificate %q exists: %w", c.name, ensureErr)
+	}
+
+	parent, parseErr := x509.ParseCertificate(rawCA)
+	if parseErr != nil {
+		return nil, nil, fmt.Errorf("could not parse CA certificate: %w", parseErr)
+	}
+
+	// Ensure only one level of CAs exist, and that they are created explicitly.
+	template.IsCA = false
+
+	signingKey := ed25519.PrivateKey(rawCAKey)
+	return issueCertificate(template, parent, signingKey)
+}
+
+// CACertificate returns the DER-encoded certificate of c, which is the CA certificate when c acts as an Issuer.
+// The certificate is obtained through ensure, so it is loaded or created as needed.
+func (c *Certificate) CACertificate(ctx context.Context, kv clientv3.KV) ([]byte, error) {
+	der, _, err := c.ensure(ctx, kv)
+	return der, err
+}
diff --git a/metropolis/node/kubernetes/pki/certificate.go b/metropolis/node/kubernetes/pki/certificate.go
new file mode 100644
index 0000000..6bd50f9
--- /dev/null
+++ b/metropolis/node/kubernetes/pki/certificate.go
@@ -0,0 +1,192 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pki
+
+import (
+ "context"
+ "crypto/ed25519"
+ "crypto/x509"
+ "crypto/x509/pkix"
+ "fmt"
+ "net"
+
+ "go.etcd.io/etcd/clientv3"
+)
+
+// Certificate is the promise of a Certificate being available to the caller. In this case, Certificate refers to a
+// pair of x509 certificate and corresponding private key.
+// Certificates can be stored in etcd, and their issuers might also be stored in etcd. As such, this type's methods
+// contain references to an etcd KV client.
+// This Certificate type is agnostic to usage, but mostly geared towards Kubernetes certificates.
+type Certificate struct {
+ // issuer is the Issuer that will generate this certificate if one doesn't yet exist in etcd, or the requested
+ // certificate is volatile (not to be stored in etcd).
+ issuer Issuer
+ // name is a unique key for storing the certificate in etcd. If empty, the certificate is 'volatile', will not be
+ // stored in etcd, and every Ensure call will generate a new pair.
+ name string
+ // template is an x509 certificate definition that will be used to generate the certificate when issuing it.
+ template x509.Certificate
+}
+
+const (
+	// etcdPrefix is where all the PKI data is stored in etcd.
+	etcdPrefix = "/kube-pki/"
+)
+
+// etcdPath formats f with args (as fmt.Sprintf does) and returns the result prefixed with etcdPrefix.
+func etcdPath(f string, args ...interface{}) string {
+	suffix := fmt.Sprintf(f, args...)
+	return etcdPrefix + suffix
+}
+
+// New returns a Certificate, that is, a promise that a certificate will exist once Ensure is called on it. Nothing
+// is generated or stored by New itself.
+// issuer must be a valid certificate issuer (SelfSigned or another Certificate). name must be unique among all
+// certificates, or empty (which will cause the certificate to be volatile, ie. not stored in etcd). template is
+// the x509 definition used when the certificate gets issued.
+func New(issuer Issuer, name string, template x509.Certificate) *Certificate {
+	c := new(Certificate)
+	c.issuer = issuer
+	c.name = name
+	c.template = template
+	return c
+}
+
+// Client makes a Kubernetes PKI-compatible client certificate template.
+// Directly derived from Kubernetes PKI requirements documented at
+// https://kubernetes.io/docs/setup/best-practices/certificates/#configure-certificates-manually
+func Client(identity string, groups []string) x509.Certificate {
+ return x509.Certificate{
+ Subject: pkix.Name{
+ CommonName: identity,
+ Organization: groups,
+ },
+ KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageKeyEncipherment,
+ ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth},
+ }
+}
+
+// Server makes a Kubernetes PKI-compatible server certificate template.
+func Server(dnsNames []string, ips []net.IP) x509.Certificate {
+ return x509.Certificate{
+ Subject: pkix.Name{},
+ KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageKeyEncipherment,
+ ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
+ DNSNames: dnsNames,
+ IPAddresses: ips,
+ }
+}
+
+// CA makes a Certificate that can sign other certificates.
+func CA(cn string) x509.Certificate {
+ return x509.Certificate{
+ Subject: pkix.Name{
+ CommonName: cn,
+ },
+ IsCA: true,
+ KeyUsage: x509.KeyUsageCertSign | x509.KeyUsageCRLSign | x509.KeyUsageDigitalSignature,
+ ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageOCSPSigning},
+ }
+}
+
+// etcdPaths returns the etcd keys under which the certificate and the private key of c are stored. Both are
+// derived from c.name.
+func (c *Certificate) etcdPaths() (cert, key string) {
+	cert = etcdPath("%s-cert.der", c.name)
+	key = etcdPath("%s-key.der", c.name)
+	return cert, key
+}
+
+// ensure returns a DER-encoded x509 certificate and internally encoded bare ed25519 key for a given Certificate,
+// in memory (if volatile), loading it from etcd, or creating and saving it on etcd if needed.
+// This function is safe to call in parallel from multiple etcd clients (including across machines), but it will error
+// in case a concurrent certificate generation happens. These errors are, however, safe to retry - as long as all the
+// certificate creators (ie., Metropolis nodes) run the same version of this code.
+// On any error, both cert and key are nil: a pair that was issued but could not be stored is never handed out.
+// TODO(q3k): in the future, this should be handled better - especially as we introduce new certificates, or worse,
+// change the issuance chain. As a stopgap measure, an explicit per-certificate or even global lock can be implemented.
+// And, even before that, we can handle concurrency errors in a smarter way.
+func (c *Certificate) ensure(ctx context.Context, kv clientv3.KV) (cert, key []byte, err error) {
+	if c.name == "" {
+		// Volatile certificate - generate.
+		// TODO(q3k): cache internally?
+		cert, key, err = c.issuer.Issue(ctx, c.template, kv)
+		if err != nil {
+			return nil, nil, fmt.Errorf("failed to issue: %w", err)
+		}
+		return cert, key, nil
+	}
+
+	certPath, keyPath := c.etcdPaths()
+
+	// Try loading certificate and key from etcd.
+	certRes, err := kv.Get(ctx, certPath)
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to get certificate from etcd: %w", err)
+	}
+	keyRes, err := kv.Get(ctx, keyPath)
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to get key from etcd: %w", err)
+	}
+
+	if len(certRes.Kvs) == 1 && len(keyRes.Kvs) == 1 {
+		// Certificate and key exists in etcd, return that.
+		// TODO(q3k): check for expiration
+		return certRes.Kvs[0].Value, keyRes.Kvs[0].Value, nil
+	}
+
+	// No certificate found - issue one.
+	cert, key, err = c.issuer.Issue(ctx, c.template, kv)
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to issue: %w", err)
+	}
+
+	// Save to etcd in transaction. This ensures that no partial writes happen, and that we haven't been raced to the
+	// save: both keys must still be absent (create revision 0) for the puts to be applied.
+	res, err := kv.Txn(ctx).
+		If(
+			clientv3.Compare(clientv3.CreateRevision(certPath), "=", 0),
+			clientv3.Compare(clientv3.CreateRevision(keyPath), "=", 0),
+		).
+		Then(
+			clientv3.OpPut(certPath, string(cert)),
+			clientv3.OpPut(keyPath, string(key)),
+		).Commit()
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to write newly issued certificate: %w", err)
+	}
+	if !res.Succeeded {
+		return nil, nil, fmt.Errorf("certificate issuance transaction failed: concurrent write")
+	}
+
+	return cert, key, nil
+}
+
+// Ensure returns an x509 DER-encoded (but not PEM-encoded) certificate and a PKCS#8 DER-encoded private key for a
+// given Certificate.
+// If the certificate is volatile, each call to Ensure will cause a new certificate to be generated.
+// Otherwise, it will be retrieved from etcd, or generated and stored there if needed.
+// On error, both cert and key are nil.
+func (c *Certificate) Ensure(ctx context.Context, kv clientv3.KV) (cert, key []byte, err error) {
+	cert, rawKey, err := c.ensure(ctx, kv)
+	if err != nil {
+		return nil, nil, err
+	}
+	// Converting a byte slice of the wrong length into an ed25519.PrivateKey and marshaling it panics instead of
+	// returning an error, so validate the length of what came back (possibly from etcd) first.
+	if len(rawKey) != ed25519.PrivateKeySize {
+		return nil, nil, fmt.Errorf("could not marshal private key (data corruption?): key is %d bytes long, expected %d", len(rawKey), ed25519.PrivateKeySize)
+	}
+	key, err = x509.MarshalPKCS8PrivateKey(ed25519.PrivateKey(rawKey))
+	if err != nil {
+		return nil, nil, fmt.Errorf("could not marshal private key (data corruption?): %w", err)
+	}
+	return cert, key, nil
+}
diff --git a/metropolis/node/kubernetes/pki/kubernetes.go b/metropolis/node/kubernetes/pki/kubernetes.go
new file mode 100644
index 0000000..c4827a9
--- /dev/null
+++ b/metropolis/node/kubernetes/pki/kubernetes.go
@@ -0,0 +1,228 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pki
+
+import (
+ "context"
+ "crypto/rand"
+ "crypto/rsa"
+ "crypto/x509"
+ "encoding/pem"
+ "fmt"
+ "net"
+
+ "go.etcd.io/etcd/clientv3"
+ "k8s.io/client-go/tools/clientcmd"
+ configapi "k8s.io/client-go/tools/clientcmd/api"
+
+ common "git.monogon.dev/source/nexantic.git/metropolis/node"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/core/logtree"
+)
+
+// KubeCertificateName is an enum-like unique name of a static Kubernetes certificate. The value of the name is used
+// as the unique part of an etcd path where the certificate and key are stored.
+type KubeCertificateName string
+
+const (
+ // The main Kubernetes CA, used to authenticate API consumers, and servers.
+ IdCA KubeCertificateName = "id-ca"
+
+ // Kubernetes apiserver server certificate.
+ APIServer KubeCertificateName = "apiserver"
+
+ // Kubelet client certificate, used to authenticate to the apiserver.
+ KubeletClient KubeCertificateName = "kubelet-client"
+
+ // Kubernetes Controller manager client certificate, used to authenticate to the apiserver.
+ ControllerManagerClient KubeCertificateName = "controller-manager-client"
+ // Kubernetes Controller manager server certificate, used to run its HTTP server.
+ ControllerManager KubeCertificateName = "controller-manager"
+
+ // Kubernetes Scheduler client certificate, used to authenticate to the apiserver.
+ SchedulerClient KubeCertificateName = "scheduler-client"
+ // Kubernetes scheduler server certificate, used to run its HTTP server.
+ Scheduler KubeCertificateName = "scheduler"
+
+ // Root-on-kube (system:masters) client certificate. Used to control the apiserver (and resources) by Metropolis
+ // internally.
+ Master KubeCertificateName = "master"
+
+ // OpenAPI Kubernetes Aggregation CA.
+ // See: https://kubernetes.io/docs/tasks/extend-kubernetes/configure-aggregation-layer/#ca-reusage-and-conflicts
+ AggregationCA KubeCertificateName = "aggregation-ca"
+ // Front proxy client certificate, issued by the Aggregation CA.
+ FrontProxyClient KubeCertificateName = "front-proxy-client"
+)
+
+const (
+ // serviceAccountKeyName is the etcd path part that is used to store the ServiceAccount authentication secret.
+ // This is not a certificate, just an RSA key (stored PKCS#8 DER-encoded, see ServiceAccountKey).
+ serviceAccountKeyName = "service-account-privkey"
+)
+
+// KubernetesPKI manages all PKI resources required to run Kubernetes on Metropolis. It contains all static
+// certificates, which can be retrieved, or be used to generate Kubeconfigs from.
+type KubernetesPKI struct {
+ // logger receives progress messages, eg. from EnsureAll.
+ logger logtree.LeveledLogger
+ // KV is the etcd client through which certificates and keys are loaded and stored.
+ KV clientv3.KV
+ // Certificates maps every static certificate name to its Certificate. It is populated by NewKubernetes.
+ Certificates map[KubeCertificateName]*Certificate
+}
+
+// NewKubernetes builds a KubernetesPKI that logs to l and stores its data through kv. It only declares the static
+// certificates (two self-signed CAs and the certificates issued by them); nothing is generated or written to etcd
+// until the certificates are ensured.
+func NewKubernetes(l logtree.LeveledLogger, kv clientv3.KV) *KubernetesPKI {
+	pki := &KubernetesPKI{
+		logger:       l,
+		KV:           kv,
+		Certificates: map[KubeCertificateName]*Certificate{},
+	}
+
+	// root declares a self-signed CA, issued declares a certificate signed by an already declared CA.
+	root := func(name KubeCertificateName, cn string) {
+		pki.Certificates[name] = New(SelfSigned, string(name), CA(cn))
+	}
+	issued := func(issuer, name KubeCertificateName, template x509.Certificate) {
+		pki.Certificates[name] = New(pki.Certificates[issuer], string(name), template)
+	}
+
+	root(IdCA, "Smalltown Kubernetes ID CA")
+	apiserverNames := []string{
+		"kubernetes",
+		"kubernetes.default",
+		"kubernetes.default.svc",
+		"kubernetes.default.svc.cluster",
+		"kubernetes.default.svc.cluster.local",
+		"localhost",
+	}
+	// TODO(q3k): add service network internal apiserver address
+	apiserverIPs := []net.IP{{10, 0, 255, 1}, {127, 0, 0, 1}}
+	issued(IdCA, APIServer, Server(apiserverNames, apiserverIPs))
+	issued(IdCA, KubeletClient, Client("smalltown:apiserver-kubelet-client", nil))
+	issued(IdCA, ControllerManagerClient, Client("system:kube-controller-manager", nil))
+	issued(IdCA, ControllerManager, Server([]string{"kube-controller-manager.local"}, nil))
+	issued(IdCA, SchedulerClient, Client("system:kube-scheduler", nil))
+	issued(IdCA, Scheduler, Server([]string{"kube-scheduler.local"}, nil))
+	issued(IdCA, Master, Client("smalltown:master", []string{"system:masters"}))
+
+	root(AggregationCA, "Smalltown OpenAPI Aggregation CA")
+	issued(AggregationCA, FrontProxyClient, Client("front-proxy-client", nil))
+
+	return pki
+}
+
+// EnsureAll ensures that all static certificates (and the serviceaccount key) are present on etcd. It stops at and
+// returns the first error encountered. Certificates are visited in map iteration order.
+func (k *KubernetesPKI) EnsureAll(ctx context.Context) error {
+	for name, certificate := range k.Certificates {
+		k.logger.Infof("Ensuring %s exists", string(name))
+		if _, _, err := certificate.Ensure(ctx, k.KV); err != nil {
+			return fmt.Errorf("could not ensure certificate %q exists: %w", name, err)
+		}
+	}
+	if _, err := k.ServiceAccountKey(ctx); err != nil {
+		return fmt.Errorf("could not ensure service account key exists: %w", err)
+	}
+	return nil
+}
+
+// Kubeconfig generates a kubeconfig blob for a given certificate name, or returns an error if no certificate of
+// that name is known. The same lifetime semantics as in .Certificate apply.
+func (k *KubernetesPKI) Kubeconfig(ctx context.Context, name KubeCertificateName) ([]byte, error) {
+	if c, ok := k.Certificates[name]; ok {
+		return c.Kubeconfig(ctx, k.KV)
+	}
+	return nil, fmt.Errorf("no certificate %q", name)
+}
+
+// Certificate retrieves an x509 DER-encoded (but not PEM-wrapped) key and certificate for a given certificate name,
+// or returns an error if no certificate of that name is known.
+// If the requested certificate is volatile, it will be created on demand. Otherwise it will be created on etcd (if not
+// present), and retrieved from there.
+func (k *KubernetesPKI) Certificate(ctx context.Context, name KubeCertificateName) (cert, key []byte, err error) {
+	if c, ok := k.Certificates[name]; ok {
+		return c.Ensure(ctx, k.KV)
+	}
+	return nil, nil, fmt.Errorf("no certificate %q", name)
+}
+
+// Kubeconfig generates a kubeconfig blob for this certificate. The same lifetime semantics as in .Ensure apply.
+func (c *Certificate) Kubeconfig(ctx context.Context, kv clientv3.KV) ([]byte, error) {
+
+ cert, key, err := c.Ensure(ctx, kv)
+ if err != nil {
+ return nil, fmt.Errorf("could not ensure certificate exists: %w", err)
+ }
+
+ kubeconfig := configapi.NewConfig()
+
+ cluster := configapi.NewCluster()
+ cluster.Server = fmt.Sprintf("https://127.0.0.1:%v", common.KubernetesAPIPort)
+
+ ca, err := c.issuer.CACertificate(ctx, kv)
+ if err != nil {
+ return nil, fmt.Errorf("could not get CA certificate: %w", err)
+ }
+ if ca != nil {
+ cluster.CertificateAuthorityData = pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: ca})
+ }
+ kubeconfig.Clusters["default"] = cluster
+
+ authInfo := configapi.NewAuthInfo()
+ authInfo.ClientCertificateData = pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: cert})
+ authInfo.ClientKeyData = pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: key})
+ kubeconfig.AuthInfos["default"] = authInfo
+
+ ct := configapi.NewContext()
+ ct.Cluster = "default"
+ ct.AuthInfo = "default"
+ kubeconfig.Contexts["default"] = ct
+
+ kubeconfig.CurrentContext = "default"
+ return clientcmd.Write(*kubeconfig)
+}
+
+// ServiceAccountKey retrieves (and possibly generates and stores on etcd) the Kubernetes service account key. The
+// returned data is a PKCS#8 DER-encoded RSA private key. If a newly generated key cannot be stored on etcd, an
+// error is returned instead of the key, as that key would not be seen by anyone else.
+func (k *KubernetesPKI) ServiceAccountKey(ctx context.Context) ([]byte, error) {
+	// TODO(q3k): this should be abstracted away once we abstract away etcd access into a library with try-or-create
+	// semantics.
+
+	path := etcdPath("%s.der", serviceAccountKeyName)
+
+	// Try loading key from etcd.
+	keyRes, err := k.KV.Get(ctx, path)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get key from etcd: %w", err)
+	}
+
+	if len(keyRes.Kvs) == 1 {
+		// Key exists in etcd, return that.
+		return keyRes.Kvs[0].Value, nil
+	}
+
+	// No key found - generate one.
+	keyRaw, err := rsa.GenerateKey(rand.Reader, 2048)
+	if err != nil {
+		return nil, fmt.Errorf("failed to generate key: %w", err)
+	}
+	key, err := x509.MarshalPKCS8PrivateKey(keyRaw)
+	if err != nil {
+		panic(err) // Always a programmer error
+	}
+
+	// Save to etcd.
+	if _, err := k.KV.Put(ctx, path, string(key)); err != nil {
+		return nil, fmt.Errorf("failed to write newly generated key: %w", err)
+	}
+	return key, nil
+}
diff --git a/metropolis/node/kubernetes/provisioner.go b/metropolis/node/kubernetes/provisioner.go
new file mode 100644
index 0000000..b671125
--- /dev/null
+++ b/metropolis/node/kubernetes/provisioner.go
@@ -0,0 +1,368 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kubernetes
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+
+ v1 "k8s.io/api/core/v1"
+ storagev1 "k8s.io/api/storage/v1"
+ apierrs "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/client-go/informers"
+ coreinformers "k8s.io/client-go/informers/core/v1"
+ storageinformers "k8s.io/client-go/informers/storage/v1"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/kubernetes/scheme"
+ typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
+ "k8s.io/client-go/tools/cache"
+ "k8s.io/client-go/tools/record"
+ ref "k8s.io/client-go/tools/reference"
+ "k8s.io/client-go/util/workqueue"
+
+ "git.monogon.dev/source/nexantic.git/metropolis/node/common/fsquota"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/common/supervisor"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/core/localstorage"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/core/logtree"
+)
+
+// csiProvisionerServerName is the name under which this provisioner identifies itself: it is compared against PVC/PV
+// provisioner annotations and StorageClass provisioners, and used as the CSI driver name and event source component.
+// ONCHANGE(//metropolis/node/kubernetes/reconciler:resources_csi.go): needs to match csiProvisionerServerName declared.
+const csiProvisionerServerName = "com.nexantic.smalltown.vfs"
+
+// csiProvisionerServer is responsible for the provisioning and deprovisioning of CSI-based container volumes. It runs on all
+// nodes and watches PVCs for ones assigned to the node it's running on and fulfills the provisioning request by
+// creating a directory, applying a quota and creating the corresponding PV. When the PV is released and its retention
+// policy is Delete, the directory and the PV resource are deleted.
+type csiProvisionerServer struct {
+ // NodeName is the name of the node this provisioner runs on; only PVCs/PVs bound to it are handled.
+ NodeName string
+ // Kubernetes is the API client used to emit events and to create/delete PV objects.
+ Kubernetes kubernetes.Interface
+ // InformerFactory provides the PV, PVC and StorageClass informers.
+ InformerFactory informers.SharedInformerFactory
+ // VolumesDirectory is the directory under which one subdirectory per volume is created.
+ VolumesDirectory *localstorage.DataVolumesDirectory
+
+ // The fields below are populated by Run.
+ claimQueue workqueue.RateLimitingInterface
+ pvQueue workqueue.RateLimitingInterface
+ recorder record.EventRecorder
+ pvcInformer coreinformers.PersistentVolumeClaimInformer
+ pvInformer coreinformers.PersistentVolumeInformer
+ storageClassInformer storageinformers.StorageClassInformer
+ logger logtree.LeveledLogger
+}
+
+// Run runs the main provisioning machinery. It consists of a bunch of informers which keep track of the events
+// happening on the Kubernetes control plane and inform us when something happens. If anything happens to PVCs or
+// PVs, we enqueue the identifier of that resource in a work queue. Queues are being worked on by only one worker to
+// limit load and avoid complicated locking infrastructure. Failed items are requeued.
+// Run blocks until ctx is done, then shuts down both queues and returns nil.
+func (p *csiProvisionerServer) Run(ctx context.Context) error {
+	// The recorder is used to log Kubernetes events for successful or failed volume provisions. These events then
+	// show up in `kubectl describe pvc` and can be used by admins to debug issues with this provisioner.
+	broadcaster := record.NewBroadcaster()
+	sink := &typedcorev1.EventSinkImpl{Interface: p.Kubernetes.CoreV1().Events("")}
+	broadcaster.StartRecordingToSink(sink)
+	source := v1.EventSource{Component: csiProvisionerServerName, Host: p.NodeName}
+	p.recorder = broadcaster.NewRecorder(scheme.Scheme, source)
+
+	p.pvInformer = p.InformerFactory.Core().V1().PersistentVolumes()
+	p.pvcInformer = p.InformerFactory.Core().V1().PersistentVolumeClaims()
+	p.storageClassInformer = p.InformerFactory.Storage().V1().StorageClasses()
+
+	p.claimQueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
+	p.pvQueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
+
+	// Both additions and updates simply enqueue the (new) object.
+	claimHandlers := cache.ResourceEventHandlerFuncs{
+		AddFunc:    p.enqueueClaim,
+		UpdateFunc: func(_, updated interface{}) { p.enqueueClaim(updated) },
+	}
+	p.pvcInformer.Informer().AddEventHandler(claimHandlers)
+	pvHandlers := cache.ResourceEventHandlerFuncs{
+		AddFunc:    p.enqueuePV,
+		UpdateFunc: func(_, updated interface{}) { p.enqueuePV(updated) },
+	}
+	p.pvInformer.Informer().AddEventHandler(pvHandlers)
+	p.logger = supervisor.Logger(ctx)
+
+	stop := ctx.Done()
+	go p.pvcInformer.Informer().Run(stop)
+	go p.pvInformer.Informer().Run(stop)
+	go p.storageClassInformer.Informer().Run(stop)
+
+	// These will self-terminate once the queues are shut down
+	go p.processQueueItems(p.claimQueue, p.processPVC)
+	go p.processQueueItems(p.pvQueue, p.processPV)
+
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+	<-stop
+	p.claimQueue.ShutDown()
+	p.pvQueue.ShutDown()
+	return nil
+}
+
+// isOurPVC checks if the given PVC is to be provisioned by this provisioner and has been scheduled onto this node.
+// Both facts are read from the PVC's annotations.
+func (p *csiProvisionerServer) isOurPVC(pvc *v1.PersistentVolumeClaim) bool {
+	annotations := pvc.ObjectMeta.Annotations
+	provisionedByUs := annotations["volume.beta.kubernetes.io/storage-provisioner"] == csiProvisionerServerName
+	scheduledHere := annotations["volume.kubernetes.io/selected-node"] == p.NodeName
+	return provisionedByUs && scheduledHere
+}
+
+// isOurPV checks if the given PV has been provisioned by this provisioner and has been scheduled onto this node.
+// The node is read from the first value of the first match expression of the first required node selector term,
+// which is where provisionPVC puts it. A PV that carries our provisioner annotation but lacks that node affinity
+// shape is reported as not ours instead of causing a panic.
+func (p *csiProvisionerServer) isOurPV(pv *v1.PersistentVolume) bool {
+	if pv.ObjectMeta.Annotations["pv.kubernetes.io/provisioned-by"] != csiProvisionerServerName {
+		return false
+	}
+	affinity := pv.Spec.NodeAffinity
+	if affinity == nil || affinity.Required == nil || len(affinity.Required.NodeSelectorTerms) == 0 {
+		return false
+	}
+	expressions := affinity.Required.NodeSelectorTerms[0].MatchExpressions
+	if len(expressions) == 0 || len(expressions[0].Values) == 0 {
+		return false
+	}
+	return expressions[0].Values[0] == p.NodeName
+}
+
+// enqueueClaim adds the key of an added/changed PVC to the claim work queue. Objects for which no key can be
+// derived are logged and dropped.
+func (p *csiProvisionerServer) enqueueClaim(obj interface{}) {
+	if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
+		p.claimQueue.Add(key)
+	} else {
+		p.logger.Errorf("Not queuing PVC because key could not be derived: %v", err)
+	}
+}
+
+// enqueuePV adds the key of an added/changed PV to the PV work queue. Objects for which no key can be derived are
+// logged and dropped.
+func (p *csiProvisionerServer) enqueuePV(obj interface{}) {
+	if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
+		p.pvQueue.Add(key)
+	} else {
+		p.logger.Errorf("Not queuing PV because key could not be derived: %v", err)
+	}
+}
+
+// processQueueItems gets items from the given work queue and calls the process function for each of them. It self-
+// terminates once the queue is shut down.
+// Items for which process fails are requeued with rate limiting; their failure history is kept so that repeated
+// failures back off. Items that succeed (or are not strings at all) have their failure history forgotten.
+func (p *csiProvisionerServer) processQueueItems(queue workqueue.RateLimitingInterface, process func(key string) error) {
+	for {
+		obj, shutdown := queue.Get()
+		if shutdown {
+			return
+		}
+
+		// Run in a closure so that Done is called for every item, whichever way processing ends.
+		func(obj interface{}) {
+			defer queue.Done(obj)
+			key, ok := obj.(string)
+			if !ok {
+				queue.Forget(obj)
+				p.logger.Errorf("Expected string in workqueue, got %+v", obj)
+				return
+			}
+
+			if err := process(key); err != nil {
+				p.logger.Warningf("Failed processing item %q, requeueing (numrequeues: %d): %v", key, queue.NumRequeues(obj), err)
+				queue.AddRateLimited(obj)
+				// Do not Forget the item here: that would reset its failure count and with it the backoff.
+				return
+			}
+
+			queue.Forget(obj)
+		}(obj)
+	}
+}
+
+// volumePath gets the path where the volume with the given ID is stored: a child of the volumes directory named
+// after the volume ID.
+func (p *csiProvisionerServer) volumePath(volumeID string) string {
+	base := p.VolumesDirectory.FullPath()
+	return filepath.Join(base, volumeID)
+}
+
+// processPVC looks at a single PVC item from the queue, determines if it needs to be provisioned and logs the
+// provisioning result to the recorder.
+// It returns nil if there is nothing to do (PVC gone, not ours, not pending) or provisioning succeeded, and an error
+// if the item should be retried.
+func (p *csiProvisionerServer) processPVC(key string) error {
+	namespace, name, err := cache.SplitMetaNamespaceKey(key)
+	if err != nil {
+		return fmt.Errorf("invalid resource key: %s", key)
+	}
+	pvc, err := p.pvcInformer.Lister().PersistentVolumeClaims(namespace).Get(name)
+	if apierrs.IsNotFound(err) {
+		return nil // nothing to do, no error
+	} else if err != nil {
+		return fmt.Errorf("failed to get PVC for processing: %w", err)
+	}
+
+	if !p.isOurPVC(pvc) {
+		return nil
+	}
+
+	if pvc.Status.Phase != "Pending" {
+		// If the PVC is not pending, we don't need to provision anything
+		return nil
+	}
+
+	if pvc.Spec.StorageClassName == nil {
+		// Without a storage class we cannot tell how to provision this PVC. Retrying would not change that, so
+		// log and skip instead of requeueing.
+		p.logger.Warningf("PVC %q is annotated for this provisioner but has no storage class, ignoring", key)
+		return nil
+	}
+	storageClass, err := p.storageClassInformer.Lister().Get(*pvc.Spec.StorageClassName)
+	if err != nil {
+		return fmt.Errorf("failed to get storage class %q: %w", *pvc.Spec.StorageClassName, err)
+	}
+
+	if storageClass.Provisioner != csiProvisionerServerName {
+		// We're not responsible for this PVC. Can only happen if controller-manager makes a mistake
+		// setting the annotations, but we're bailing here anyways for safety.
+		return nil
+	}
+
+	err = p.provisionPVC(pvc, storageClass)
+
+	if err != nil {
+		p.recorder.Eventf(pvc, v1.EventTypeWarning, "ProvisioningFailed", "Failed to provision PV: %v", err)
+		return err
+	}
+	p.recorder.Eventf(pvc, v1.EventTypeNormal, "Provisioned", "Successfully provisioned PV")
+
+	return nil
+}
+
+// provisionPVC creates the directory where the volume lives, sets a quota for the requested amount of storage and
+// creates the PV object representing this new volume.
+// The volume is named "pvc-<UID of the PVC>", is pinned to this node through node affinity and gets exactly the
+// requested capacity. An already existing (but empty) directory or PV object is not treated as an error, so the
+// operation can be retried. Optional API fields that are unset are treated as their Kubernetes defaults: a nil
+// volume mode means Filesystem, a nil reclaim policy means Delete.
+func (p *csiProvisionerServer) provisionPVC(pvc *v1.PersistentVolumeClaim, storageClass *storagev1.StorageClass) error {
+	claimRef, err := ref.GetReference(scheme.Scheme, pvc)
+	if err != nil {
+		return fmt.Errorf("failed to get reference to PVC: %w", err)
+	}
+
+	storageReq := pvc.Spec.Resources.Requests[v1.ResourceStorage]
+	if storageReq.IsZero() {
+		return fmt.Errorf("PVC is not requesting any storage, this is not supported")
+	}
+	capacity, ok := storageReq.AsInt64()
+	if !ok {
+		return fmt.Errorf("PVC requesting more than 2^63 bytes of storage, this is not supported")
+	}
+
+	if pvc.Spec.VolumeMode != nil && *pvc.Spec.VolumeMode == v1.PersistentVolumeBlock {
+		return fmt.Errorf("Block PVCs are not supported by Smalltown")
+	}
+	if pvc.Spec.StorageClassName == nil {
+		return fmt.Errorf("PVC has no storage class, this is not supported")
+	}
+	reclaimPolicy := v1.PersistentVolumeReclaimDelete
+	if storageClass.ReclaimPolicy != nil {
+		reclaimPolicy = *storageClass.ReclaimPolicy
+	}
+
+	volumeID := "pvc-" + string(pvc.ObjectMeta.UID)
+	volumePath := p.volumePath(volumeID)
+
+	p.logger.Infof("Creating local PV %s", volumeID)
+	// NOTE(review): 0644 has no search (x) bit, which is unusual for a directory - confirm this is intended.
+	if err := os.Mkdir(volumePath, 0644); err != nil && !os.IsExist(err) {
+		return fmt.Errorf("failed to create volume directory: %w", err)
+	}
+	// The directory might be left over from an earlier attempt; only proceed if it holds no data.
+	files, err := ioutil.ReadDir(volumePath)
+	if err != nil {
+		return fmt.Errorf("failed to list files in newly-created volume: %w", err)
+	}
+	if len(files) > 0 {
+		return errors.New("newly-created volume already contains data, bailing")
+	}
+	if err := fsquota.SetQuota(volumePath, uint64(capacity), 100000); err != nil {
+		return fmt.Errorf("failed to update quota: %w", err)
+	}
+
+	vol := &v1.PersistentVolume{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: volumeID,
+			Annotations: map[string]string{
+				"pv.kubernetes.io/provisioned-by": csiProvisionerServerName},
+		},
+		Spec: v1.PersistentVolumeSpec{
+			AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
+			Capacity: v1.ResourceList{
+				v1.ResourceStorage: storageReq, // We're always giving the exact amount
+			},
+			PersistentVolumeSource: v1.PersistentVolumeSource{
+				CSI: &v1.CSIPersistentVolumeSource{
+					Driver:       csiProvisionerServerName,
+					VolumeHandle: volumeID,
+				},
+			},
+			ClaimRef: claimRef,
+			// isOurPV relies on the node name being the first value of the first expression of the first term.
+			NodeAffinity: &v1.VolumeNodeAffinity{
+				Required: &v1.NodeSelector{
+					NodeSelectorTerms: []v1.NodeSelectorTerm{
+						{
+							MatchExpressions: []v1.NodeSelectorRequirement{
+								{
+									Key:      "kubernetes.io/hostname",
+									Operator: v1.NodeSelectorOpIn,
+									Values:   []string{p.NodeName},
+								},
+							},
+						},
+					},
+				},
+			},
+			StorageClassName:              *pvc.Spec.StorageClassName,
+			PersistentVolumeReclaimPolicy: reclaimPolicy,
+		},
+	}
+
+	_, err = p.Kubernetes.CoreV1().PersistentVolumes().Create(context.Background(), vol, metav1.CreateOptions{})
+	if err != nil && !apierrs.IsAlreadyExists(err) {
+		return fmt.Errorf("failed to create PV object: %w", err)
+	}
+	return nil
+}
+
+// processPV looks at a single PV item from the queue and checks if it has been released and needs to be deleted. If yes
+// it deletes the associated quota, directory and the PV object and logs the result to the recorder.
+// It returns nil if there is nothing to do (PV gone, not ours, not released, reclaim policy not Delete) or deletion
+// succeeded, and an error if the item should be retried. If the volume directory is already gone (eg. when a previous
+// attempt removed it but failed to delete the PV object), only the PV object is deleted.
+func (p *csiProvisionerServer) processPV(key string) error {
+	_, name, err := cache.SplitMetaNamespaceKey(key)
+	if err != nil {
+		return fmt.Errorf("invalid resource key: %s", key)
+	}
+	pv, err := p.pvInformer.Lister().Get(name)
+	if apierrs.IsNotFound(err) {
+		return nil // nothing to do, no error
+	} else if err != nil {
+		return fmt.Errorf("failed to get PV for processing: %w", err)
+	}
+
+	if !p.isOurPV(pv) {
+		return nil
+	}
+	if pv.Spec.PersistentVolumeReclaimPolicy != v1.PersistentVolumeReclaimDelete || pv.Status.Phase != "Released" {
+		return nil
+	}
+	if pv.Spec.CSI == nil {
+		// Every PV created by provisionPVC has a CSI source. Without one we don't know which directory backs the
+		// volume, and retrying would not change that.
+		p.logger.Warningf("PV %q is annotated for this provisioner but has no CSI volume source, ignoring", pv.Name)
+		return nil
+	}
+	volumePath := p.volumePath(pv.Spec.CSI.VolumeHandle)
+
+	// Log deletes for auditing purposes
+	p.logger.Infof("Deleting persistent volume %s", pv.Spec.CSI.VolumeHandle)
+	if _, statErr := os.Stat(volumePath); !os.IsNotExist(statErr) {
+		if err := fsquota.SetQuota(volumePath, 0, 0); err != nil {
+			// We record these here manually since a successful deletion removes the PV we'd be attaching them to
+			p.recorder.Eventf(pv, v1.EventTypeWarning, "DeprovisioningFailed", "Failed to remove quota: %v", err)
+			return fmt.Errorf("failed to remove quota: %w", err)
+		}
+		// RemoveAll returns nil if the path does not exist, so there is no not-exist case to handle here.
+		if err := os.RemoveAll(volumePath); err != nil {
+			p.recorder.Eventf(pv, v1.EventTypeWarning, "DeprovisioningFailed", "Failed to delete volume: %v", err)
+			return fmt.Errorf("failed to delete volume: %w", err)
+		}
+	}
+
+	err = p.Kubernetes.CoreV1().PersistentVolumes().Delete(context.Background(), pv.Name, metav1.DeleteOptions{})
+	if err != nil && !apierrs.IsNotFound(err) {
+		p.recorder.Eventf(pv, v1.EventTypeWarning, "DeprovisioningFailed", "Failed to delete PV object from K8s API: %v", err)
+		return fmt.Errorf("failed to delete PV object: %w", err)
+	}
+	return nil
+}
diff --git a/metropolis/node/kubernetes/reconciler/BUILD.bazel b/metropolis/node/kubernetes/reconciler/BUILD.bazel
new file mode 100644
index 0000000..d8f2db6
--- /dev/null
+++ b/metropolis/node/kubernetes/reconciler/BUILD.bazel
@@ -0,0 +1,38 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+ name = "go_default_library",
+ srcs = [
+ "reconciler.go",
+ "resources_csi.go",
+ "resources_podsecuritypolicy.go",
+ "resources_rbac.go",
+ "resources_runtimeclass.go",
+ "resources_storageclass.go",
+ ],
+ importpath = "git.monogon.dev/source/nexantic.git/metropolis/node/kubernetes/reconciler",
+ visibility = ["//metropolis/node:__subpackages__"],
+ deps = [
+ "//metropolis/node/common/supervisor:go_default_library",
+ "@io_k8s_api//core/v1:go_default_library",
+ "@io_k8s_api//node/v1beta1:go_default_library",
+ "@io_k8s_api//policy/v1beta1:go_default_library",
+ "@io_k8s_api//rbac/v1:go_default_library",
+ "@io_k8s_api//storage/v1:go_default_library",
+ "@io_k8s_apimachinery//pkg/apis/meta/v1:go_default_library",
+ "@io_k8s_client_go//kubernetes:go_default_library",
+ ],
+)
+
+go_test(
+ name = "go_default_test",
+ srcs = ["reconciler_test.go"],
+ embed = [":go_default_library"],
+ deps = [
+ "@io_k8s_api//node/v1beta1:go_default_library",
+ "@io_k8s_api//policy/v1beta1:go_default_library",
+ "@io_k8s_api//rbac/v1:go_default_library",
+ "@io_k8s_api//storage/v1:go_default_library",
+ "@io_k8s_apimachinery//pkg/apis/meta/v1:go_default_library",
+ ],
+)
diff --git a/metropolis/node/kubernetes/reconciler/reconciler.go b/metropolis/node/kubernetes/reconciler/reconciler.go
new file mode 100644
index 0000000..9c5ba4e
--- /dev/null
+++ b/metropolis/node/kubernetes/reconciler/reconciler.go
@@ -0,0 +1,163 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// The reconciler ensures that a base set of K8s resources is always available in the cluster. These are necessary to
+// ensure correct out-of-the-box functionality. All resources containing the smalltown.com/builtin=true label are assumed
+// to be managed by the reconciler.
+// It currently does not revert modifications made by admins, it is planned to create an admission plugin prohibiting
+// such modifications to resources with the smalltown.com/builtin label to deal with that problem. This would also solve a
+// potential issue where you could delete resources just by adding the smalltown.com/builtin=true label.
+package reconciler
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ meta "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/client-go/kubernetes"
+
+ "git.monogon.dev/source/nexantic.git/metropolis/node/common/supervisor"
+)
+
+// True returns a pointer to a freshly allocated true. It is a sad workaround for all the pointer booleans in K8s
+// specs.
+func True() *bool {
+	b := new(bool)
+	*b = true
+	return b
+}
+// False returns a pointer to a freshly allocated false, for use in pointer boolean fields of K8s specs.
+func False() *bool {
+	return new(bool)
+}
+
+const (
+ // BuiltinLabelKey is used as a k8s label to mark built-in objects (ie., managed by the reconciler)
+ BuiltinLabelKey = "smalltown.com/builtin"
+ // BuiltinLabelValue is used as a k8s label value, under the BuiltinLabelKey key.
+ BuiltinLabelValue = "true"
+ // BuiltinRBACPrefix is used to prefix all built-in objects that are part of the rbac/v1 API (eg.
+ // {Cluster,}Role{Binding,} objects). This corresponds to the colon-separated 'namespaces' notation used by
+ // Kubernetes system (system:) objects.
+ BuiltinRBACPrefix = "smalltown:"
+)
+
+// builtinLabels makes a kubernetes-compatible label dictionary (key->value) that is used to mark objects that are
+// built-in into Smalltown (ie., managed by the reconciler). These are then subsequently retrieved by listBuiltins.
+// The extra argument specifies what other labels are to be merged into the labels dictionary, for convenience. If
+// nil or empty, no extra labels will be applied.
+func builtinLabels(extra map[string]string) map[string]string {
+	labels := make(map[string]string, len(extra)+1)
+	labels[BuiltinLabelKey] = BuiltinLabelValue
+	// Ranging over a nil map is a no-op, so no explicit nil check is needed. Extras are applied last and therefore
+	// take precedence on key collisions.
+	for k, v := range extra {
+		labels[k] = v
+	}
+	return labels
+}
+
+// listBuiltins is a k8s client ListOptions structure that selects all objects built into Smalltown that are currently
+// present in the API server (ie., ones that are to be managed by the reconciler). These objects are created by
+// applying builtinLabels to their metadata labels.
+var listBuiltins = meta.ListOptions{
+ LabelSelector: fmt.Sprintf("%s=%s", BuiltinLabelKey, BuiltinLabelValue),
+}
+
+// builtinRBACName returns a name that is compatible with colon-delimited 'namespaced' objects, a la system:*.
+// These names are to be used by all builtins created as part of the rbac/v1 Kubernetes API.
+func builtinRBACName(name string) string {
+ return BuiltinRBACPrefix + name
+}
+
+// resource is a type of resource to be managed by the reconciler. All built-ins/reconciled objects must implement
+// this interface to be managed correctly by the reconciler.
+type resource interface {
+ // List returns a list of names of objects currently present on the target (ie. k8s API server).
+ List(ctx context.Context) ([]string, error)
+ // Create creates an object on the target. The el interface{} argument is the black box object returned by the
+ // Expected() call.
+ Create(ctx context.Context, el interface{}) error
+ // Delete deletes an object, by name, from the target.
+ Delete(ctx context.Context, name string) error
+ // Expected returns a map of all objects expected to be present on the target. The keys are names (which must
+ // correspond to the names returned by List() and used by Delete()), and the values are black boxes that will then
+ // be passed to the Create() call if their corresponding key (name) does not exist on the target.
+ Expected() map[string]interface{}
+}
+
+// allResources returns every resource type managed by the reconciler, each backed by clientSet. The map keys are
+// short names for the resource types; Run uses them in its log messages.
+func allResources(clientSet kubernetes.Interface) map[string]resource {
+ return map[string]resource{
+ "psps": resourcePodSecurityPolicies{clientSet},
+ "clusterroles": resourceClusterRoles{clientSet},
+ "clusterrolebindings": resourceClusterRoleBindings{clientSet},
+ "storageclasses": resourceStorageClasses{clientSet},
+ "csidrivers": resourceCSIDrivers{clientSet},
+ "runtimeclasses": resourceRuntimeClasses{clientSet},
+ }
+}
+
+// Run returns a supervisor runnable that keeps all built-in resources (see allResources) reconciled against the
+// API server reachable through clientSet. It signals healthy, reconciles once immediately and then again every
+// 10 seconds until ctx is done, at which point it returns nil.
+func Run(clientSet kubernetes.Interface) supervisor.Runnable {
+	return func(ctx context.Context) error {
+		log := supervisor.Logger(ctx)
+		resources := allResources(clientSet)
+		t := time.NewTicker(10 * time.Second)
+		// Stop the ticker when the runnable exits so it is released rather than left running.
+		defer t.Stop()
+		reconcileAll := func() {
+			for name, res := range resources {
+				// A failing resource type is only logged; it is retried on the next tick.
+				if err := reconcile(ctx, res); err != nil {
+					log.Warningf("Failed to reconcile built-in resources %s: %v", name, err)
+				}
+			}
+		}
+		supervisor.Signal(ctx, supervisor.SignalHealthy)
+		reconcileAll()
+		for {
+			select {
+			case <-t.C:
+				reconcileAll()
+			case <-ctx.Done():
+				return nil
+			}
+		}
+	}
+}
+
+// reconcile brings the target behind r in line with r.Expected(): expected objects that List does not report are
+// created, and listed objects that are not expected are deleted. It returns the first error encountered.
+func reconcile(ctx context.Context, r resource) error {
+	names, err := r.List(ctx)
+	if err != nil {
+		return err
+	}
+	onTarget := make(map[string]struct{})
+	for _, name := range names {
+		onTarget[name] = struct{}{}
+	}
+	wanted := r.Expected()
+	for name, obj := range wanted {
+		if _, ok := onTarget[name]; ok {
+			continue
+		}
+		if err := r.Create(ctx, obj); err != nil {
+			return err
+		}
+	}
+	for name := range onTarget {
+		if _, ok := wanted[name]; ok {
+			continue
+		}
+		if err := r.Delete(ctx, name); err != nil {
+			return err
+		}
+	}
+	return nil
+}
diff --git a/metropolis/node/kubernetes/reconciler/reconciler_test.go b/metropolis/node/kubernetes/reconciler/reconciler_test.go
new file mode 100644
index 0000000..b58d4af
--- /dev/null
+++ b/metropolis/node/kubernetes/reconciler/reconciler_test.go
@@ -0,0 +1,184 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package reconciler
+
+import (
+ "context"
+ "fmt"
+ "testing"
+
+ node "k8s.io/api/node/v1beta1"
+ policy "k8s.io/api/policy/v1beta1"
+ rbac "k8s.io/api/rbac/v1"
+ storage "k8s.io/api/storage/v1"
+ meta "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+// kubernetesMeta unwraps an interface{} that might contain a Kubernetes resource of type that is managed by the
+// reconciler. Any time a new Kubernetes type is managed by the reconciler, the following switch should be extended
+// to cover that type.
+// It returns a pointer to the ObjectMeta of the wrapped object, or nil if v is not one of the types handled below.
+func kubernetesMeta(v interface{}) *meta.ObjectMeta {
+ switch v2 := v.(type) {
+ case *rbac.ClusterRole:
+ return &v2.ObjectMeta
+ case *rbac.ClusterRoleBinding:
+ return &v2.ObjectMeta
+ case *storage.CSIDriver:
+ return &v2.ObjectMeta
+ case *storage.StorageClass:
+ return &v2.ObjectMeta
+ case *policy.PodSecurityPolicy:
+ return &v2.ObjectMeta
+ case *node.RuntimeClass:
+ return &v2.ObjectMeta
+ }
+ return nil
+}
+
+// TestExpectedNamedCorrectly ensures that all the Expected objects of all resource types have a correspondence between
+// their returned key and inner name. This contract must be met in order for the reconciler to not create runaway
+// resources. This assumes all managed resources are Kubernetes resources.
+func TestExpectedNamedCorrectly(t *testing.T) {
+	for reconciler, r := range allResources(nil) {
+		for outer, v := range r.Expected() {
+			// objMeta (rather than meta) avoids shadowing the imported meta package.
+			objMeta := kubernetesMeta(v)
+			switch {
+			case objMeta == nil:
+				t.Errorf("reconciler %q, object %q: could not decode kubernetes metadata", reconciler, outer)
+			case objMeta.Name != outer:
+				t.Errorf("reconciler %q, object %q: inner name mismatch (%q)", reconciler, outer, objMeta.Name)
+			}
+		}
+	}
+}
+
+// TestExpectedLabeledCorrectly ensures that all the Expected objects of all resource types have a Kubernetes metadata
+// label that signifies it's a builtin object, to be retrieved afterwards. This contract must be met in order for the
+// reconciler to not keep overwriting objects (and possibly failing), when a newly created object is not then
+// retrievable using a selector corresponding to this label. This assumes all managed resources are Kubernetes objects.
+func TestExpectedLabeledCorrectly(t *testing.T) {
+	for reconciler, r := range allResources(nil) {
+		for outer, v := range r.Expected() {
+			// objMeta (rather than meta) avoids shadowing the imported meta package.
+			objMeta := kubernetesMeta(v)
+			if objMeta == nil {
+				t.Errorf("reconciler %q, object %q: could not decode kubernetes metadata", reconciler, outer)
+				continue
+			}
+			got := objMeta.Labels[BuiltinLabelKey]
+			if got == BuiltinLabelValue {
+				continue
+			}
+			t.Errorf("reconciler %q, object %q: %q=%q, wanted =%q", reconciler, outer, BuiltinLabelKey, got, BuiltinLabelValue)
+		}
+	}
+}
+
+// testResource is a resource type used for testing. The inner type is a string that is equal to its name (key).
+// It simulates a target (ie. k8s apiserver mock) that always acts nominally (all resources are created, deleted as
+// requested, and the state is consistent with requests).
+type testResource struct {
+ // current is the simulated state of resources in the target.
+ current map[string]string
+ // expected is what this type will report as the Expected() resources.
+ expected map[string]string
+}
+
+// List reports the names of everything held in the simulated target. It never fails.
+func (r *testResource) List(ctx context.Context) ([]string, error) {
+	var names []string
+	for name := range r.current {
+		names = append(names, name)
+	}
+	return names, nil
+}
+
+// Create stores el, which must be a string, in the simulated target under its own value as name.
+func (r *testResource) Create(ctx context.Context, el interface{}) error {
+	name := el.(string)
+	r.current[name] = name
+	return nil
+}
+
+// Delete removes name from the simulated target. Deleting a name that is not present is a no-op; it never fails.
+func (r *testResource) Delete(ctx context.Context, name string) error {
+ delete(r.current, name)
+ return nil
+}
+
+// Expected returns a copy of r.expected, with the string values boxed as interface{} as the resource interface
+// requires.
+func (r *testResource) Expected() map[string]interface{} {
+	boxed := make(map[string]interface{}, len(r.expected))
+	for name, el := range r.expected {
+		boxed[name] = el
+	}
+	return boxed
+}
+
+// newTestResource builds a test resource whose target starts out empty and whose Expected() set consists of the
+// given resource strings.
+func newTestResource(want ...string) *testResource {
+	r := &testResource{
+		current:  make(map[string]string),
+		expected: make(map[string]string),
+	}
+	for _, name := range want {
+		r.expected[name] = name
+	}
+	return r
+}
+
+// currentDiff returns a human-readable string showing the difference between the current state and the given resource
+// strings. If no difference is present, the returned string is empty.
+func (r *testResource) currentDiff(want ...string) string {
+	wanted := make(map[string]bool, len(want))
+	for _, w := range want {
+		if _, ok := r.current[w]; !ok {
+			return fmt.Sprintf("%q missing in current", w)
+		}
+		wanted[w] = true
+	}
+	// Compare by name (map key), since names are what want is expressed in and what List reports.
+	for name := range r.current {
+		if !wanted[name] {
+			return fmt.Sprintf("%q spurious in current", name)
+		}
+	}
+	return ""
+}
+
+// TestBasicReconciliation ensures that the reconcile function does manipulate a target state based on a set of
+// expected resources: missing resources get created, and resources that are no longer expected get deleted.
+func TestBasicReconciliation(t *testing.T) {
+ ctx := context.Background()
+ r := newTestResource("foo", "bar", "baz")
+
+ // nothing should have happened yet (testing the test)
+ if diff := r.currentDiff(); diff != "" {
+ t.Fatalf("wrong state after creation: %s", diff)
+ }
+
+ if err := reconcile(ctx, r); err != nil {
+ t.Fatalf("reconcile: %v", err)
+ }
+ // everything requested should have been created
+ if diff := r.currentDiff("foo", "bar", "baz"); diff != "" {
+ t.Fatalf("wrong state after reconciliation: %s", diff)
+ }
+
+ delete(r.expected, "foo")
+ if err := reconcile(ctx, r); err != nil {
+ t.Fatalf("reconcile: %v", err)
+ }
+ // foo is no longer expected, so it should now be missing from the target
+ if diff := r.currentDiff("bar", "baz"); diff != "" {
+ t.Fatalf("wrong state after deleting foo: %s", diff)
+ }
+}
diff --git a/metropolis/node/kubernetes/reconciler/resources_csi.go b/metropolis/node/kubernetes/reconciler/resources_csi.go
new file mode 100644
index 0000000..ecbcb4b
--- /dev/null
+++ b/metropolis/node/kubernetes/reconciler/resources_csi.go
@@ -0,0 +1,71 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package reconciler
+
+import (
+ "context"
+
+ storage "k8s.io/api/storage/v1"
+ meta "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/client-go/kubernetes"
+)
+
+// TODO(q3k): this is duplicated with //metropolis/node/kubernetes:provisioner.go; integrate this once provisioner.go
+// gets moved into a subpackage.
+// ONCHANGE(//metropolis/node/kubernetes:provisioner.go): needs to match csiProvisionerName declared.
+const csiProvisionerName = "com.nexantic.smalltown.vfs"
+
+// resourceCSIDrivers implements the resource interface for storage/v1 CSIDriver objects. All API calls go through
+// the embedded clientset.
+type resourceCSIDrivers struct {
+ kubernetes.Interface
+}
+
+// List returns the names of all CSIDrivers matched by the listBuiltins selector.
+func (r resourceCSIDrivers) List(ctx context.Context) ([]string, error) {
+	drivers, err := r.StorageV1().CSIDrivers().List(ctx, listBuiltins)
+	if err != nil {
+		return nil, err
+	}
+	names := make([]string, 0, len(drivers.Items))
+	for i := range drivers.Items {
+		names = append(names, drivers.Items[i].Name)
+	}
+	return names, nil
+}
+
+// Create creates el, which must be a *storage.CSIDriver as returned by Expected, on the API server.
+func (r resourceCSIDrivers) Create(ctx context.Context, el interface{}) error {
+	client := r.StorageV1().CSIDrivers()
+	_, err := client.Create(ctx, el.(*storage.CSIDriver), meta.CreateOptions{})
+	return err
+}
+
+// Delete deletes the CSIDriver with the given name from the API server, using default delete options.
+func (r resourceCSIDrivers) Delete(ctx context.Context, name string) error {
+ return r.StorageV1().CSIDrivers().Delete(ctx, name, meta.DeleteOptions{})
+}
+
+// Expected returns the single built-in CSIDriver, keyed and named csiProvisionerName. It is declared with
+// AttachRequired and PodInfoOnMount set to false, and only supports the persistent volume lifecycle mode.
+func (r resourceCSIDrivers) Expected() map[string]interface{} {
+ return map[string]interface{}{
+ csiProvisionerName: &storage.CSIDriver{
+ ObjectMeta: meta.ObjectMeta{
+ Name: csiProvisionerName,
+ Labels: builtinLabels(nil),
+ },
+ Spec: storage.CSIDriverSpec{
+ AttachRequired: False(),
+ PodInfoOnMount: False(),
+ VolumeLifecycleModes: []storage.VolumeLifecycleMode{storage.VolumeLifecyclePersistent},
+ },
+ },
+ }
+}
diff --git a/metropolis/node/kubernetes/reconciler/resources_podsecuritypolicy.go b/metropolis/node/kubernetes/reconciler/resources_podsecuritypolicy.go
new file mode 100644
index 0000000..507089f
--- /dev/null
+++ b/metropolis/node/kubernetes/reconciler/resources_podsecuritypolicy.go
@@ -0,0 +1,108 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package reconciler
+
+import (
+ "context"
+
+ core "k8s.io/api/core/v1"
+ policy "k8s.io/api/policy/v1beta1"
+ meta "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/client-go/kubernetes"
+)
+
+// resourcePodSecurityPolicies implements the resource interface for policy/v1beta1 PodSecurityPolicy objects. All
+// API calls go through the embedded clientset.
+type resourcePodSecurityPolicies struct {
+ kubernetes.Interface
+}
+
+// List returns the names of all PodSecurityPolicies matched by the listBuiltins selector.
+func (r resourcePodSecurityPolicies) List(ctx context.Context) ([]string, error) {
+	psps, err := r.PolicyV1beta1().PodSecurityPolicies().List(ctx, listBuiltins)
+	if err != nil {
+		return nil, err
+	}
+	names := make([]string, 0, len(psps.Items))
+	for i := range psps.Items {
+		names = append(names, psps.Items[i].Name)
+	}
+	return names, nil
+}
+
+// Create creates el, which must be a *policy.PodSecurityPolicy as returned by Expected, on the API server.
+func (r resourcePodSecurityPolicies) Create(ctx context.Context, el interface{}) error {
+	client := r.PolicyV1beta1().PodSecurityPolicies()
+	_, err := client.Create(ctx, el.(*policy.PodSecurityPolicy), meta.CreateOptions{})
+	return err
+}
+
+// Delete deletes the PodSecurityPolicy with the given name from the API server, using default delete options.
+func (r resourcePodSecurityPolicies) Delete(ctx context.Context, name string) error {
+ return r.PolicyV1beta1().PodSecurityPolicies().Delete(ctx, name, meta.DeleteOptions{})
+}
+
+// Expected returns the single built-in PodSecurityPolicy, keyed and named "default". Its spec allows privilege
+// escalation and runc's default capability list, sets HostNetwork, HostIPC and HostPID to false, uses the RunAsAny
+// rule for users, groups and SELinux, and restricts pods to the volume types listed below.
+func (r resourcePodSecurityPolicies) Expected() map[string]interface{} {
+ return map[string]interface{}{
+ "default": &policy.PodSecurityPolicy{
+ ObjectMeta: meta.ObjectMeta{
+ Name: "default",
+ Labels: builtinLabels(nil),
+ Annotations: map[string]string{
+ "kubernetes.io/description": "This default PSP allows the creation of pods using features that are" +
+ " generally considered safe against any sort of escape.",
+ },
+ },
+ Spec: policy.PodSecurityPolicySpec{
+ AllowPrivilegeEscalation: True(),
+ AllowedCapabilities: []core.Capability{ // runc's default list of allowed capabilities
+ "SETPCAP",
+ "MKNOD",
+ "AUDIT_WRITE",
+ "CHOWN",
+ "NET_RAW",
+ "DAC_OVERRIDE",
+ "FOWNER",
+ "FSETID",
+ "KILL",
+ "SETGID",
+ "SETUID",
+ "NET_BIND_SERVICE",
+ "SYS_CHROOT",
+ "SETFCAP",
+ },
+ HostNetwork: false,
+ HostIPC: false,
+ HostPID: false,
+ FSGroup: policy.FSGroupStrategyOptions{
+ Rule: policy.FSGroupStrategyRunAsAny,
+ },
+ RunAsUser: policy.RunAsUserStrategyOptions{
+ Rule: policy.RunAsUserStrategyRunAsAny,
+ },
+ SELinux: policy.SELinuxStrategyOptions{
+ Rule: policy.SELinuxStrategyRunAsAny,
+ },
+ SupplementalGroups: policy.SupplementalGroupsStrategyOptions{
+ Rule: policy.SupplementalGroupsStrategyRunAsAny,
+ },
+ Volumes: []policy.FSType{ // Volumes considered safe to use
+ policy.ConfigMap,
+ policy.EmptyDir,
+ policy.Projected,
+ policy.Secret,
+ policy.DownwardAPI,
+ policy.PersistentVolumeClaim,
+ },
+ },
+ },
+ }
+}
diff --git a/metropolis/node/kubernetes/reconciler/resources_rbac.go b/metropolis/node/kubernetes/reconciler/resources_rbac.go
new file mode 100644
index 0000000..40ca879
--- /dev/null
+++ b/metropolis/node/kubernetes/reconciler/resources_rbac.go
@@ -0,0 +1,154 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package reconciler
+
+import (
+ "context"
+
+ rbac "k8s.io/api/rbac/v1"
+ meta "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/client-go/kubernetes"
+)
+
+var (
+ clusterRolePSPDefault = builtinRBACName("psp-default")
+ clusterRoleBindingDefaultPSP = builtinRBACName("default-psp-for-sa")
+ clusterRoleBindingAPIServerKubeletClient = builtinRBACName("apiserver-kubelet-client")
+)
+
+// resourceClusterRoles implements the resource interface for rbac/v1 ClusterRole objects. All API calls go through
+// the embedded clientset.
+type resourceClusterRoles struct {
+ kubernetes.Interface
+}
+
+// List returns the names of all ClusterRoles matched by the listBuiltins selector.
+func (r resourceClusterRoles) List(ctx context.Context) ([]string, error) {
+	roles, err := r.RbacV1().ClusterRoles().List(ctx, listBuiltins)
+	if err != nil {
+		return nil, err
+	}
+	names := make([]string, 0, len(roles.Items))
+	for i := range roles.Items {
+		names = append(names, roles.Items[i].Name)
+	}
+	return names, nil
+}
+
+// Create creates el, which must be a *rbac.ClusterRole as returned by Expected, on the API server.
+func (r resourceClusterRoles) Create(ctx context.Context, el interface{}) error {
+	client := r.RbacV1().ClusterRoles()
+	_, err := client.Create(ctx, el.(*rbac.ClusterRole), meta.CreateOptions{})
+	return err
+}
+
+// Delete deletes the ClusterRole with the given name from the API server, using default delete options.
+func (r resourceClusterRoles) Delete(ctx context.Context, name string) error {
+ return r.RbacV1().ClusterRoles().Delete(ctx, name, meta.DeleteOptions{})
+}
+
+// Expected returns the single built-in ClusterRole, keyed and named clusterRolePSPDefault. Its only rule grants the
+// "use" verb on the podsecuritypolicies resource named "default" in the policy API group.
+func (r resourceClusterRoles) Expected() map[string]interface{} {
+ return map[string]interface{}{
+ clusterRolePSPDefault: &rbac.ClusterRole{
+ ObjectMeta: meta.ObjectMeta{
+ Name: clusterRolePSPDefault,
+ Labels: builtinLabels(nil),
+ Annotations: map[string]string{
+ "kubernetes.io/description": "This role grants access to the \"default\" PSP.",
+ },
+ },
+ Rules: []rbac.PolicyRule{
+ {
+ APIGroups: []string{"policy"},
+ Resources: []string{"podsecuritypolicies"},
+ ResourceNames: []string{"default"},
+ Verbs: []string{"use"},
+ },
+ },
+ },
+ }
+}
+
+// resourceClusterRoleBindings implements the resource interface for rbac/v1 ClusterRoleBinding objects. All API
+// calls go through the embedded clientset.
+type resourceClusterRoleBindings struct {
+ kubernetes.Interface
+}
+
+// List returns the names of all ClusterRoleBindings matched by the listBuiltins selector.
+func (r resourceClusterRoleBindings) List(ctx context.Context) ([]string, error) {
+	bindings, err := r.RbacV1().ClusterRoleBindings().List(ctx, listBuiltins)
+	if err != nil {
+		return nil, err
+	}
+	names := make([]string, 0, len(bindings.Items))
+	for i := range bindings.Items {
+		names = append(names, bindings.Items[i].Name)
+	}
+	return names, nil
+}
+
+// Create creates el, which must be a *rbac.ClusterRoleBinding as returned by Expected, on the API server.
+func (r resourceClusterRoleBindings) Create(ctx context.Context, el interface{}) error {
+	client := r.RbacV1().ClusterRoleBindings()
+	_, err := client.Create(ctx, el.(*rbac.ClusterRoleBinding), meta.CreateOptions{})
+	return err
+}
+
+// Delete deletes the ClusterRoleBinding with the given name from the API server, using default delete options.
+func (r resourceClusterRoleBindings) Delete(ctx context.Context, name string) error {
+ return r.RbacV1().ClusterRoleBindings().Delete(ctx, name, meta.DeleteOptions{})
+}
+
+// Expected returns the two built-in ClusterRoleBindings, keyed by their names:
+//   - clusterRoleBindingDefaultPSP binds the clusterRolePSPDefault ClusterRole to the system:serviceaccounts group.
+//   - clusterRoleBindingAPIServerKubeletClient binds the system:kubelet-api-admin ClusterRole to the
+//     smalltown:apiserver-kubelet-client user.
+func (r resourceClusterRoleBindings) Expected() map[string]interface{} {
+ return map[string]interface{}{
+ clusterRoleBindingDefaultPSP: &rbac.ClusterRoleBinding{
+ ObjectMeta: meta.ObjectMeta{
+ Name: clusterRoleBindingDefaultPSP,
+ Labels: builtinLabels(nil),
+ Annotations: map[string]string{
+ "kubernetes.io/description": "This binding grants every service account access to the \"default\" PSP. " +
+ "Creation of Pods is still restricted by other RBAC roles. Otherwise no pods (unprivileged or not) " +
+ "can be created.",
+ },
+ },
+ RoleRef: rbac.RoleRef{
+ APIGroup: rbac.GroupName,
+ Kind: "ClusterRole",
+ Name: clusterRolePSPDefault,
+ },
+ Subjects: []rbac.Subject{
+ {
+ APIGroup: rbac.GroupName,
+ Kind: "Group",
+ Name: "system:serviceaccounts",
+ },
+ },
+ },
+ clusterRoleBindingAPIServerKubeletClient: &rbac.ClusterRoleBinding{
+ ObjectMeta: meta.ObjectMeta{
+ Name: clusterRoleBindingAPIServerKubeletClient,
+ Labels: builtinLabels(nil),
+ Annotations: map[string]string{
+ "kubernetes.io/description": "This binding grants the apiserver access to the kubelets. This enables " +
+ "lots of built-in functionality like reading logs or forwarding ports via the API.",
+ },
+ },
+ RoleRef: rbac.RoleRef{
+ APIGroup: rbac.GroupName,
+ Kind: "ClusterRole",
+ Name: "system:kubelet-api-admin",
+ },
+ Subjects: []rbac.Subject{
+ {
+ APIGroup: rbac.GroupName,
+ Kind: "User",
+ // TODO(q3k): describe this name's contract, or unify with whatever creates this.
+ Name: "smalltown:apiserver-kubelet-client",
+ },
+ },
+ },
+ }
+}
diff --git a/metropolis/node/kubernetes/reconciler/resources_runtimeclass.go b/metropolis/node/kubernetes/reconciler/resources_runtimeclass.go
new file mode 100644
index 0000000..c202c0e
--- /dev/null
+++ b/metropolis/node/kubernetes/reconciler/resources_runtimeclass.go
@@ -0,0 +1,69 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package reconciler
+
+import (
+ "context"
+
+ node "k8s.io/api/node/v1beta1"
+ meta "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/client-go/kubernetes"
+)
+
+// resourceRuntimeClasses implements the resource interface for node/v1beta1 RuntimeClass objects. All API calls go
+// through the embedded clientset.
+type resourceRuntimeClasses struct {
+ kubernetes.Interface
+}
+
+// List returns the names of all RuntimeClasses matched by the listBuiltins selector.
+func (r resourceRuntimeClasses) List(ctx context.Context) ([]string, error) {
+	classes, err := r.NodeV1beta1().RuntimeClasses().List(ctx, listBuiltins)
+	if err != nil {
+		return nil, err
+	}
+	names := make([]string, 0, len(classes.Items))
+	for i := range classes.Items {
+		names = append(names, classes.Items[i].Name)
+	}
+	return names, nil
+}
+
+// Create creates el, which must be a *node.RuntimeClass as returned by Expected, on the API server.
+func (r resourceRuntimeClasses) Create(ctx context.Context, el interface{}) error {
+	client := r.NodeV1beta1().RuntimeClasses()
+	_, err := client.Create(ctx, el.(*node.RuntimeClass), meta.CreateOptions{})
+	return err
+}
+
+// Delete deletes the RuntimeClass with the given name from the API server, using default delete options.
+func (r resourceRuntimeClasses) Delete(ctx context.Context, name string) error {
+ return r.NodeV1beta1().RuntimeClasses().Delete(ctx, name, meta.DeleteOptions{})
+}
+
+// Expected returns the two built-in RuntimeClasses, keyed by their names: "gvisor", which uses the "runsc" handler,
+// and "runc", which uses the "runc" handler.
+func (r resourceRuntimeClasses) Expected() map[string]interface{} {
+ return map[string]interface{}{
+ "gvisor": &node.RuntimeClass{
+ ObjectMeta: meta.ObjectMeta{
+ Name: "gvisor",
+ Labels: builtinLabels(nil),
+ },
+ Handler: "runsc",
+ },
+ "runc": &node.RuntimeClass{
+ ObjectMeta: meta.ObjectMeta{
+ Name: "runc",
+ Labels: builtinLabels(nil),
+ },
+ Handler: "runc",
+ },
+ }
+}
diff --git a/metropolis/node/kubernetes/reconciler/resources_storageclass.go b/metropolis/node/kubernetes/reconciler/resources_storageclass.go
new file mode 100644
index 0000000..72476ec
--- /dev/null
+++ b/metropolis/node/kubernetes/reconciler/resources_storageclass.go
@@ -0,0 +1,72 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package reconciler
+
+import (
+ "context"
+
+ core "k8s.io/api/core/v1"
+ storage "k8s.io/api/storage/v1"
+ meta "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/client-go/kubernetes"
+)
+
+var reclaimPolicyDelete = core.PersistentVolumeReclaimDelete
+var waitForConsumerBinding = storage.VolumeBindingWaitForFirstConsumer
+
+// resourceStorageClasses implements the resource interface for storage/v1 StorageClass objects. All API calls go
+// through the embedded clientset.
+type resourceStorageClasses struct {
+ kubernetes.Interface
+}
+
+// List returns the names of all StorageClasses matched by the listBuiltins selector.
+func (r resourceStorageClasses) List(ctx context.Context) ([]string, error) {
+	classes, err := r.StorageV1().StorageClasses().List(ctx, listBuiltins)
+	if err != nil {
+		return nil, err
+	}
+	names := make([]string, 0, len(classes.Items))
+	for i := range classes.Items {
+		names = append(names, classes.Items[i].Name)
+	}
+	return names, nil
+}
+
+// Create creates el, which must be a *storage.StorageClass as returned by Expected, on the API server.
+func (r resourceStorageClasses) Create(ctx context.Context, el interface{}) error {
+	client := r.StorageV1().StorageClasses()
+	_, err := client.Create(ctx, el.(*storage.StorageClass), meta.CreateOptions{})
+	return err
+}
+
+// Delete deletes the StorageClass with the given name from the API server, using default delete options.
+func (r resourceStorageClasses) Delete(ctx context.Context, name string) error {
+ return r.StorageV1().StorageClasses().Delete(ctx, name, meta.DeleteOptions{})
+}
+
+// Expected returns the single built-in StorageClass, keyed and named "local". It is annotated as the default class,
+// is provisioned by csiProvisionerName, allows volume expansion, uses the Delete reclaim policy and binds volumes
+// with the WaitForFirstConsumer mode.
+func (r resourceStorageClasses) Expected() map[string]interface{} {
+ return map[string]interface{}{
+ "local": &storage.StorageClass{
+ ObjectMeta: meta.ObjectMeta{
+ Name: "local",
+ Labels: builtinLabels(nil),
+ Annotations: map[string]string{
+ "storageclass.kubernetes.io/is-default-class": "true",
+ },
+ },
+ AllowVolumeExpansion: True(),
+ Provisioner: csiProvisionerName,
+ ReclaimPolicy: &reclaimPolicyDelete,
+ VolumeBindingMode: &waitForConsumerBinding,
+ },
+ }
+}
diff --git a/metropolis/node/kubernetes/scheduler.go b/metropolis/node/kubernetes/scheduler.go
new file mode 100644
index 0000000..21e6663
--- /dev/null
+++ b/metropolis/node/kubernetes/scheduler.go
@@ -0,0 +1,70 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kubernetes
+
+import (
+ "context"
+ "encoding/pem"
+ "fmt"
+ "os/exec"
+
+ "git.monogon.dev/source/nexantic.git/metropolis/node/common/fileargs"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/common/supervisor"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/kubernetes/pki"
+)
+
+// schedulerConfig bundles the material runScheduler passes to kube-scheduler, as filled in by
+// getPKISchedulerConfig.
+type schedulerConfig struct {
+ // kubeConfig is the content of the kubeconfig file passed via --kubeconfig.
+ kubeConfig []byte
+ // serverCert and serverKey are wrapped into CERTIFICATE and PRIVATE KEY PEM blocks by runScheduler and passed
+ // via --tls-cert-file and --tls-private-key-file.
+ serverCert []byte
+ serverKey []byte
+}
+
+func getPKISchedulerConfig(ctx context.Context, kpki *pki.KubernetesPKI) (*schedulerConfig, error) {
+ var config schedulerConfig
+ var err error
+ config.serverCert, config.serverKey, err = kpki.Certificate(ctx, pki.Scheduler)
+ if err != nil {
+ return nil, fmt.Errorf("failed to get scheduler serving certificate: %w", err)
+ }
+ config.kubeConfig, err = kpki.Kubeconfig(ctx, pki.SchedulerClient)
+ if err != nil {
+ return nil, fmt.Errorf("failed to get scheduler kubeconfig: %w", err)
+ }
+ return &config, nil
+}
+
+// runScheduler returns a supervisor runnable that executes kube-scheduler with the given configuration. The
+// kubeconfig and the PEM-encoded serving certificate and key are handed to the process through fileargs.
+func runScheduler(config schedulerConfig) supervisor.Runnable {
+	return func(ctx context.Context) error {
+		args, err := fileargs.New()
+		if err != nil {
+			panic(err) // If this fails, something is very wrong. Just crash.
+		}
+		defer args.Close()
+		cmd := exec.CommandContext(ctx, "/kubernetes/bin/kube", "kube-scheduler",
+			args.FileOpt("--kubeconfig", "kubeconfig", config.kubeConfig),
+			"--port=0", // Kill insecure serving
+			args.FileOpt("--tls-cert-file", "server-cert.pem",
+				pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: config.serverCert})),
+			args.FileOpt("--tls-private-key-file", "server-key.pem",
+				pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: config.serverKey})),
+		)
+		// The outer err is always nil at this point, so the error to wrap is the one fileargs reports here.
+		if err := args.Error(); err != nil {
+			return fmt.Errorf("failed to use fileargs: %w", err)
+		}
+		return supervisor.RunCommand(ctx, cmd)
+	}
+}
diff --git a/metropolis/node/kubernetes/service.go b/metropolis/node/kubernetes/service.go
new file mode 100644
index 0000000..2917bfc
--- /dev/null
+++ b/metropolis/node/kubernetes/service.go
@@ -0,0 +1,177 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kubernetes
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "os"
+ "time"
+
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "k8s.io/client-go/informers"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/tools/clientcmd"
+
+ "git.monogon.dev/source/nexantic.git/metropolis/node/common/supervisor"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/core/localstorage"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/core/network/dns"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/kubernetes/clusternet"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/kubernetes/nfproxy"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/kubernetes/pki"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/kubernetes/reconciler"
+ apb "git.monogon.dev/source/nexantic.git/metropolis/proto/api"
+)
+
+// Config carries everything the Kubernetes Service needs in order to start its sub-services.
+type Config struct {
+ // AdvertiseAddress is passed to the apiserver as its advertise address and is also used as the cluster DNS
+ // address given to the kubelet.
+ AdvertiseAddress net.IP
+ // ServiceIPRange is passed to the apiserver as its service IP range.
+ ServiceIPRange net.IPNet
+ // ClusterNet is the cluster network handed to the controller manager, clusternet and nfproxy.
+ ClusterNet net.IPNet
+
+ // KPKI is the Kubernetes PKI from which certificates and kubeconfigs for all components are obtained.
+ KPKI *pki.KubernetesPKI
+ // Root is the local storage root; the sub-services are given directories from below it.
+ Root *localstorage.Root
+ // CorednsRegistrationChan receives the Kubernetes DNS directive once the sub-services have been started, and
+ // the matching cancel directive once the Service's context is done.
+ CorednsRegistrationChan chan *dns.ExtraDirective
+}
+
+// Service runs the Kubernetes components of a node. Create one with New and start it with Run.
+type Service struct {
+ // c is the configuration supplied to New.
+ c Config
+}
+
+// New creates a Service that will use the given configuration. Nothing is started until Run is called.
+func New(c Config) *Service {
+	return &Service{c: c}
+}
+
+// Run starts the Kubernetes sub-services (apiserver, controller-manager, scheduler, kubelet, reconciler,
+// csi-plugin, csi-provisioner, clusternet and nfproxy) under the supervisor, sends the "cluster.local" Kubernetes
+// DNS directive on CorednsRegistrationChan, signals healthy and then blocks until ctx is done. Once ctx is done
+// it sends the matching cancel directive and returns nil.
+//
+// An error is returned if any PKI material, the Kubernetes client configuration or client set cannot be
+// created, if the hostname cannot be determined, or if a sub-service cannot be handed to the supervisor.
+func (s *Service) Run(ctx context.Context) error {
+	controllerManagerConfig, err := getPKIControllerManagerConfig(ctx, s.c.KPKI)
+	if err != nil {
+		return fmt.Errorf("could not generate controller manager pki config: %w", err)
+	}
+	controllerManagerConfig.clusterNet = s.c.ClusterNet
+	schedConfig, err := getPKISchedulerConfig(ctx, s.c.KPKI)
+	if err != nil {
+		return fmt.Errorf("could not generate scheduler pki config: %w", err)
+	}
+
+	masterKubeconfig, err := s.c.KPKI.Kubeconfig(ctx, pki.Master)
+	if err != nil {
+		return fmt.Errorf("could not generate master kubeconfig: %w", err)
+	}
+
+	rawClientConfig, err := clientcmd.NewClientConfigFromBytes(masterKubeconfig)
+	if err != nil {
+		return fmt.Errorf("could not generate kubernetes client config: %w", err)
+	}
+
+	// The error must be checked here: clientConfig is not usable if ClientConfig failed, and the next
+	// assignment would otherwise silently overwrite err.
+	clientConfig, err := rawClientConfig.ClientConfig()
+	if err != nil {
+		return fmt.Errorf("could not generate kubernetes client config: %w", err)
+	}
+	clientSet, err := kubernetes.NewForConfig(clientConfig)
+	if err != nil {
+		return fmt.Errorf("could not generate kubernetes client: %w", err)
+	}
+
+	// A single informer factory is shared by the csi-provisioner and clusternet.
+	informerFactory := informers.NewSharedInformerFactory(clientSet, 5*time.Minute)
+
+	hostname, err := os.Hostname()
+	if err != nil {
+		return fmt.Errorf("failed to get hostname: %w", err)
+	}
+
+	dnsHostIP := s.c.AdvertiseAddress // TODO: Which IP to use
+
+	apiserver := &apiserverService{
+		KPKI:                        s.c.KPKI,
+		AdvertiseAddress:            s.c.AdvertiseAddress,
+		ServiceIPRange:              s.c.ServiceIPRange,
+		EphemeralConsensusDirectory: &s.c.Root.Ephemeral.Consensus,
+	}
+
+	kubelet := kubeletService{
+		NodeName:           hostname,
+		ClusterDNS:         []net.IP{dnsHostIP},
+		KubeletDirectory:   &s.c.Root.Data.Kubernetes.Kubelet,
+		EphemeralDirectory: &s.c.Root.Ephemeral,
+		KPKI:               s.c.KPKI,
+	}
+
+	csiPlugin := csiPluginServer{
+		KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
+		VolumesDirectory: &s.c.Root.Data.Volumes,
+	}
+
+	csiProvisioner := csiProvisionerServer{
+		NodeName:         hostname,
+		Kubernetes:       clientSet,
+		InformerFactory:  informerFactory,
+		VolumesDirectory: &s.c.Root.Data.Volumes,
+	}
+
+	// Named with a Svc suffix so that the locals do not shadow the clusternet and nfproxy packages.
+	clusternetSvc := clusternet.Service{
+		NodeName:        hostname,
+		Kubernetes:      clientSet,
+		ClusterNet:      s.c.ClusterNet,
+		InformerFactory: informerFactory,
+		DataDirectory:   &s.c.Root.Data.Kubernetes.ClusterNetworking,
+	}
+
+	nfproxySvc := nfproxy.Service{
+		ClusterCIDR: s.c.ClusterNet,
+		ClientSet:   clientSet,
+	}
+
+	for _, sub := range []struct {
+		name     string
+		runnable supervisor.Runnable
+	}{
+		{"apiserver", apiserver.Run},
+		{"controller-manager", runControllerManager(*controllerManagerConfig)},
+		{"scheduler", runScheduler(*schedConfig)},
+		{"kubelet", kubelet.Run},
+		{"reconciler", reconciler.Run(clientSet)},
+		{"csi-plugin", csiPlugin.Run},
+		{"csi-provisioner", csiProvisioner.Run},
+		{"clusternet", clusternetSvc.Run},
+		{"nfproxy", nfproxySvc.Run},
+	} {
+		if err := supervisor.Run(ctx, sub.name, sub.runnable); err != nil {
+			return fmt.Errorf("could not run sub-service %q: %w", sub.name, err)
+		}
+	}
+
+	supervisor.Logger(ctx).Info("Registering K8s CoreDNS")
+	clusterDNSDirective := dns.NewKubernetesDirective("cluster.local", masterKubeconfig)
+	s.c.CorednsRegistrationChan <- clusterDNSDirective
+
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+	<-ctx.Done()
+	// Withdraw the directive registered above now that this service is going away.
+	s.c.CorednsRegistrationChan <- dns.CancelDirective(clusterDNSDirective)
+	return nil
+}
+
+// GetDebugKubeconfig issues a kubeconfig for the identity and groups named in the request, using the PKI's CA
+// entry. It is meant for debugging and testing. A failure to generate the kubeconfig is reported as a gRPC
+// Unavailable status.
+func (s *Service) GetDebugKubeconfig(ctx context.Context, request *apb.GetDebugKubeconfigRequest) (*apb.GetDebugKubeconfigResponse, error) {
+	kpki := s.c.KPKI
+	clientCert := pki.New(kpki.Certificates[pki.IdCA], "", pki.Client(request.Id, request.Groups))
+
+	kubeconfig, err := clientCert.Kubeconfig(ctx, kpki.KV)
+	if err != nil {
+		return nil, status.Errorf(codes.Unavailable, "Failed to generate kubeconfig: %v", err)
+	}
+
+	resp := &apb.GetDebugKubeconfigResponse{
+		DebugKubeconfig: string(kubeconfig),
+	}
+	return resp, nil
+}