core/internal: move containerd and kubernetes to localstorage
This moves the last users of the old 'storage' library onto 'localstorage'. We also consolidate a number of scattered 'runtime' directories under a single `/ephemeral` root. This root could have been called `/run`, but that might imply FHS compliance, which we neither have nor want.
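As a hedged sketch of what the new layout looks like to a consumer, the snippet below resolves paths and reads/writes files through the declarative localstorage tree instead of hardcoding strings. The EphemeralContainerdDirectory and DataKubernetesClusterNetworkingDirectory types, FullPath(), Read() and Write() all appear in this diff; the Root type and its exact field layout are assumptions made for illustration only.

package example // sketch; the real package placement is an assumption

import "git.monogon.dev/source/nexantic.git/core/internal/localstorage"

// examplePaths demonstrates declarative storage access: every path is
// derived from the typed tree, never from a string literal.
func examplePaths(root *localstorage.Root) error {
	// Ephemeral containerd state, rooted under /ephemeral/containerd per this
	// diff. The root.Ephemeral.Containerd field path is an assumption.
	containerd := &root.Ephemeral.Containerd
	_ = containerd.ClientSocket.FullPath() // e.g. /ephemeral/containerd/client.sock

	// Persistent files are accessed through their placement; permissions are
	// applied at write time.
	clusternet := &root.Data.Kubernetes.ClusterNetworking
	key, err := clusternet.Key.Read()
	if err != nil {
		return err
	}
	return clusternet.Key.Write(key, 0600)
}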
We also slightly refactor the Kubernetes services to make them nicer to spawn: instead of constructors that return closures, each service is now a struct whose dependencies are exported fields and whose Run method serves as a supervisor.Runnable (see the sketch below). Otherwise this is a pure refactor, with no functional changes.
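The spawning change is easiest to see at a call site. A minimal sketch, assuming the caller holds a wired-up localstorage Root (supervisor.Run with this signature appears in this diff; the startContainerd helper and its wiring are hypothetical):

package example // sketch; the real package placement is an assumption

import (
	"context"

	"git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
	"git.monogon.dev/source/nexantic.git/core/internal/containerd"
	"git.monogon.dev/source/nexantic.git/core/internal/localstorage"
)

// startContainerd sketches the new pattern: dependencies are plain exported
// struct fields, and the method value s.Run is itself a supervisor.Runnable,
// so no closure-returning constructor is needed.
func startContainerd(ctx context.Context, root *localstorage.Root) error {
	s := &containerd.Service{
		EphemeralVolume: &root.Ephemeral.Containerd, // assumed wiring, as above
	}
	return supervisor.Run(ctx, "containerd", s.Run)
}

Previously this took two steps (roughly svc, err := containerd.New() followed by supervisor.Run(ctx, "containerd", svc.Run())), with paths hardcoded inside the returned closure rather than injected via struct fields.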
Test Plan: this should fail; it is part of a larger stack. D590 is the first tip of the stack that should work.
X-Origin-Diff: phab/D589
GitOrigin-RevId: d2a7c0bb52c2a7c753199221c609e03474936c22
diff --git a/core/cmd/init/BUILD.bazel b/core/cmd/init/BUILD.bazel
index af37ea6..368470f 100644
--- a/core/cmd/init/BUILD.bazel
+++ b/core/cmd/init/BUILD.bazel
@@ -16,9 +16,9 @@
"//core/internal/common:go_default_library", # keep
"//core/internal/common/supervisor:go_default_library",
"//core/internal/network:go_default_library",
- "//core/internal/node:go_default_library",
- "//core/internal/storage:go_default_library",
"//core/pkg/tpm:go_default_library",
+ "@de_cinfra_git_source_nexantic_git//core/internal/node:go_default_library",
+ "@de_cinfra_git_source_nexantic_git//core/internal/storage:go_default_library",
"@org_golang_x_sys//unix:go_default_library",
"@org_uber_go_zap//:go_default_library",
],
diff --git a/core/internal/containerd/BUILD.bazel b/core/internal/containerd/BUILD.bazel
index a1deae0..77262b5 100644
--- a/core/internal/containerd/BUILD.bazel
+++ b/core/internal/containerd/BUILD.bazel
@@ -6,9 +6,8 @@
importpath = "git.monogon.dev/source/nexantic.git/core/internal/containerd",
visibility = ["//core:__subpackages__"],
deps = [
- "//core/internal/common/supervisor:go_default_library",
+ "//core/internal/localstorage:go_default_library",
"//core/pkg/logbuffer:go_default_library",
- "@org_golang_x_sys//unix:go_default_library",
],
)
diff --git a/core/internal/containerd/config.toml b/core/internal/containerd/config.toml
index 415391a..75d0a69 100644
--- a/core/internal/containerd/config.toml
+++ b/core/internal/containerd/config.toml
@@ -1,13 +1,13 @@
version = 2
root = "/data/containerd"
-state = "/containerd/run"
+state = "/ephemeral/containerd"
plugin_dir = ""
disabled_plugins = []
required_plugins = []
oom_score = 0
[grpc]
- address = "/containerd/run/containerd.sock"
+ address = "/ephemeral/containerd/client.sock"
tcp_address = ""
tcp_tls_cert = ""
tcp_tls_key = ""
diff --git a/core/internal/containerd/main.go b/core/internal/containerd/main.go
index 9abc976..289efe7 100644
--- a/core/internal/containerd/main.go
+++ b/core/internal/containerd/main.go
@@ -24,63 +24,54 @@
"os/exec"
"time"
- "git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
-
+ "git.monogon.dev/source/nexantic.git/core/internal/localstorage"
"git.monogon.dev/source/nexantic.git/core/pkg/logbuffer"
-
- "golang.org/x/sys/unix"
)
-const runscLogsFIFOPath = "/containerd/run/runsc-logs.fifo"
-
type Service struct {
+ EphemeralVolume *localstorage.EphemeralContainerdDirectory
+
Log *logbuffer.LogBuffer
RunscLog *logbuffer.LogBuffer
}
-func New() (*Service, error) {
- return &Service{Log: logbuffer.New(5000, 16384), RunscLog: logbuffer.New(5000, 16384)}, nil
-}
+func (s *Service) Run(ctx context.Context) error {
+ if s.Log == nil {
+ s.Log = logbuffer.New(5000, 16384)
+ }
+ if s.RunscLog == nil {
+ s.RunscLog = logbuffer.New(5000, 16384)
+ }
-func (s *Service) Run() supervisor.Runnable {
- return func(ctx context.Context) error {
- cmd := exec.CommandContext(ctx, "/containerd/bin/containerd", "--config", "/containerd/conf/config.toml")
- cmd.Stdout = s.Log
- cmd.Stderr = s.Log
- cmd.Env = []string{"PATH=/containerd/bin", "TMPDIR=/containerd/run/tmp"}
+ cmd := exec.CommandContext(ctx, "/containerd/bin/containerd", "--config", "/containerd/conf/config.toml")
+ cmd.Stdout = s.Log
+ cmd.Stderr = s.Log
+ cmd.Env = []string{"PATH=/containerd/bin", "TMPDIR=" + s.EphemeralVolume.Tmp.FullPath()}
- if err := unix.Mount("tmpfs", "/containerd/run", "tmpfs", 0, ""); err != nil {
- panic(err)
- }
- if err := os.MkdirAll("/containerd/run/tmp", 0755); err != nil {
- panic(err)
- }
-
- runscFifo, err := os.OpenFile(runscLogsFIFOPath, os.O_CREATE|os.O_RDONLY, os.ModeNamedPipe|0777)
- if err != nil {
- return err
- }
- go func() {
- for {
- n, err := io.Copy(s.RunscLog, runscFifo)
- if n == 0 && err == nil {
- // Hack because pipes/FIFOs can return zero reads when nobody is writing. To avoid busy-looping,
- // sleep a bit before retrying. This does not loose data since the FIFO internal buffer will
- // stall writes when it becomes full. 10ms maximum stall in a non-latency critical process (reading
- // debug logs) is not an issue for us.
- time.Sleep(10 * time.Millisecond)
- } else if err != nil {
- // TODO: Use supervisor.Logger() and Error() before exiting. Should never happen.
- fmt.Println(err)
- return // It's likely that this will busy-loop printing errors if it encounters one, so bail
- }
- }
- }()
-
- // TODO(lorenz): Healthcheck against CRI RuntimeService.Status() and SignalHealthy
-
- err = cmd.Run()
- fmt.Fprintf(s.Log, "containerd stopped: %v\n", err)
+ runscFifo, err := os.OpenFile(s.EphemeralVolume.RunSCLogsFIFO.FullPath(), os.O_CREATE|os.O_RDONLY, os.ModeNamedPipe|0777)
+ if err != nil {
return err
}
+ go func() {
+ for {
+ n, err := io.Copy(s.RunscLog, runscFifo)
+ if n == 0 && err == nil {
+ // Hack because pipes/FIFOs can return zero reads when nobody is writing. To avoid busy-looping,
+ // sleep a bit before retrying. This does not lose data since the FIFO internal buffer will
+ // stall writes when it becomes full. 10ms maximum stall in a non-latency critical process (reading
+ // debug logs) is not an issue for us.
+ time.Sleep(10 * time.Millisecond)
+ } else if err != nil {
+ // TODO: Use supervisor.Logger() and Error() before exiting. Should never happen.
+ fmt.Println(err)
+ return // It's likely that this will busy-loop printing errors if it encounters one, so bail
+ }
+ }
+ }()
+
+ // TODO(lorenz): Healthcheck against CRI RuntimeService.Status() and SignalHealthy
+
+ err = cmd.Run()
+ fmt.Fprintf(s.Log, "containerd stopped: %v\n", err)
+ return err
}
diff --git a/core/internal/containerd/runsc.toml b/core/internal/containerd/runsc.toml
index 15126b9..4fe0751 100644
--- a/core/internal/containerd/runsc.toml
+++ b/core/internal/containerd/runsc.toml
@@ -1,6 +1,6 @@
-root = "/containerd/run/runsc"
+root = "/ephemeral/containerd/runsc"
[runsc_config]
debug = "true"
-debug-log = "/containerd/run/runsc-logs.fifo"
-panic-log = "/containerd/run/runsc-logs.fifo"
-log = "/containerd/run/runsc-logs.fifo"
+debug-log = "/ephemeral/containerd/runsc-logs.fifo"
+panic-log = "/ephemeral/containerd/runsc-logs.fifo"
+log = "/ephemeral/containerd/runsc-logs.fifo"
diff --git a/core/internal/kubernetes/BUILD.bazel b/core/internal/kubernetes/BUILD.bazel
index 0a7fa22..3bcbe6a 100644
--- a/core/internal/kubernetes/BUILD.bazel
+++ b/core/internal/kubernetes/BUILD.bazel
@@ -14,20 +14,19 @@
importpath = "git.monogon.dev/source/nexantic.git/core/internal/kubernetes",
visibility = ["//core:__subpackages__"],
deps = [
- "//core/api/api:go_default_library",
"//core/internal/common:go_default_library",
"//core/internal/common/supervisor:go_default_library",
- "//core/internal/consensus:go_default_library",
"//core/internal/kubernetes/clusternet:go_default_library",
"//core/internal/kubernetes/pki:go_default_library",
"//core/internal/kubernetes/reconciler:go_default_library",
- "//core/internal/storage:go_default_library",
+ "//core/internal/localstorage:go_default_library",
+ "//core/internal/localstorage/declarative:go_default_library",
"//core/pkg/fileargs:go_default_library",
"//core/pkg/fsquota:go_default_library",
"//core/pkg/logbuffer:go_default_library",
+ "//core/proto/api:go_default_library",
"@com_github_container_storage_interface_spec//lib/go/csi:go_default_library",
"@io_bazel_rules_go//proto/wkt:wrappers_go_proto",
- "@io_etcd_go_etcd//clientv3:go_default_library",
"@io_k8s_api//core/v1:go_default_library",
"@io_k8s_api//storage/v1:go_default_library",
"@io_k8s_apimachinery//pkg/api/errors:go_default_library",
diff --git a/core/internal/kubernetes/apiserver.go b/core/internal/kubernetes/apiserver.go
index 26c258a..58c3d1e 100644
--- a/core/internal/kubernetes/apiserver.go
+++ b/core/internal/kubernetes/apiserver.go
@@ -27,14 +27,17 @@
"git.monogon.dev/source/nexantic.git/core/internal/common"
"git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
"git.monogon.dev/source/nexantic.git/core/internal/kubernetes/pki"
+ "git.monogon.dev/source/nexantic.git/core/internal/localstorage"
"git.monogon.dev/source/nexantic.git/core/pkg/fileargs"
-
- "go.etcd.io/etcd/clientv3"
)
-type apiserverConfig struct {
- advertiseAddress net.IP
- serviceIPRange net.IPNet
+type apiserverService struct {
+ KPKI *pki.KubernetesPKI
+ AdvertiseAddress net.IP
+ ServiceIPRange net.IPNet
+ Output io.Writer
+ EphemeralConsensusDirectory *localstorage.EphemeralConsensusDirectory
+
// All PKI-related things are in DER
idCA []byte
kubeletClientCert []byte
@@ -47,23 +50,21 @@
serverKey []byte
}
-func getPKIApiserverConfig(ctx context.Context, kv clientv3.KV, kpki *pki.KubernetesPKI) (*apiserverConfig, error) {
- var config apiserverConfig
-
+func (s *apiserverService) loadPKI(ctx context.Context) error {
for _, el := range []struct {
targetCert *[]byte
targetKey *[]byte
name pki.KubeCertificateName
}{
- {&config.idCA, nil, pki.IdCA},
- {&config.kubeletClientCert, &config.kubeletClientKey, pki.KubeletClient},
- {&config.aggregationCA, nil, pki.AggregationCA},
- {&config.aggregationClientCert, &config.aggregationClientKey, pki.FrontProxyClient},
- {&config.serverCert, &config.serverKey, pki.APIServer},
+ {&s.idCA, nil, pki.IdCA},
+ {&s.kubeletClientCert, &s.kubeletClientKey, pki.KubeletClient},
+ {&s.aggregationCA, nil, pki.AggregationCA},
+ {&s.aggregationClientCert, &s.aggregationClientKey, pki.FrontProxyClient},
+ {&s.serverCert, &s.serverKey, pki.APIServer},
} {
- cert, key, err := kpki.Certificate(ctx, el.name, kv)
+ cert, key, err := s.KPKI.Certificate(ctx, el.name)
if err != nil {
- return nil, fmt.Errorf("could not load certificate %q from PKI: %w", el.name, err)
+ return fmt.Errorf("could not load certificate %q from PKI: %w", el.name, err)
}
if el.targetCert != nil {
*el.targetCert = cert
@@ -74,62 +75,63 @@
}
var err error
- config.serviceAccountPrivKey, err = kpki.ServiceAccountKey(ctx, kv)
+ s.serviceAccountPrivKey, err = s.KPKI.ServiceAccountKey(ctx)
if err != nil {
- return nil, fmt.Errorf("could not load serviceaccount privkey: %w", err)
+ return fmt.Errorf("could not load serviceaccount privkey: %w", err)
}
- return &config, nil
+ return nil
}
-func runAPIServer(config apiserverConfig, output io.Writer) supervisor.Runnable {
- return func(ctx context.Context) error {
- args, err := fileargs.New()
- if err != nil {
- panic(err) // If this fails, something is very wrong. Just crash.
- }
- defer args.Close()
- cmd := exec.CommandContext(ctx, "/kubernetes/bin/kube", "kube-apiserver",
- fmt.Sprintf("--advertise-address=%v", config.advertiseAddress.String()),
- "--authorization-mode=Node,RBAC",
- args.FileOpt("--client-ca-file", "client-ca.pem",
- pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: config.idCA})),
- "--enable-admission-plugins=NodeRestriction,PodSecurityPolicy",
- "--enable-aggregator-routing=true",
- "--insecure-port=0",
- fmt.Sprintf("--secure-port=%v", common.KubernetesAPIPort),
- // Due to the magic of GRPC this really needs four slashes and a :0
- fmt.Sprintf("--etcd-servers=%v", "unix:////consensus/listener.sock:0"),
- args.FileOpt("--kubelet-client-certificate", "kubelet-client-cert.pem",
- pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: config.kubeletClientCert})),
- args.FileOpt("--kubelet-client-key", "kubelet-client-key.pem",
- pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: config.kubeletClientKey})),
- "--kubelet-preferred-address-types=InternalIP",
- args.FileOpt("--proxy-client-cert-file", "aggregation-client-cert.pem",
- pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: config.aggregationClientCert})),
- args.FileOpt("--proxy-client-key-file", "aggregation-client-key.pem",
- pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: config.aggregationClientKey})),
- "--requestheader-allowed-names=front-proxy-client",
- args.FileOpt("--requestheader-client-ca-file", "aggregation-ca.pem",
- pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: config.aggregationCA})),
- "--requestheader-extra-headers-prefix=X-Remote-Extra-",
- "--requestheader-group-headers=X-Remote-Group",
- "--requestheader-username-headers=X-Remote-User",
- args.FileOpt("--service-account-key-file", "service-account-pubkey.pem",
- pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: config.serviceAccountPrivKey})),
- fmt.Sprintf("--service-cluster-ip-range=%v", config.serviceIPRange.String()),
- args.FileOpt("--tls-cert-file", "server-cert.pem",
- pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: config.serverCert})),
- args.FileOpt("--tls-private-key-file", "server-key.pem",
- pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: config.serverKey})),
- )
- if args.Error() != nil {
- return err
- }
- cmd.Stdout = output
- cmd.Stderr = output
- supervisor.Signal(ctx, supervisor.SignalHealthy)
- err = cmd.Run()
- fmt.Fprintf(output, "apiserver stopped: %v\n", err)
+func (s *apiserverService) Run(ctx context.Context) error {
+ if err := s.loadPKI(ctx); err != nil {
+ return fmt.Errorf("loading PKI data failed: %w", err)
+ }
+ args, err := fileargs.New()
+ if err != nil {
+ panic(err) // If this fails, something is very wrong. Just crash.
+ }
+ defer args.Close()
+
+ cmd := exec.CommandContext(ctx, "/kubernetes/bin/kube", "kube-apiserver",
+ fmt.Sprintf("--advertise-address=%v", s.AdvertiseAddress.String()),
+ "--authorization-mode=Node,RBAC",
+ args.FileOpt("--client-ca-file", "client-ca.pem",
+ pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: s.idCA})),
+ "--enable-admission-plugins=NodeRestriction,PodSecurityPolicy",
+ "--enable-aggregator-routing=true",
+ "--insecure-port=0",
+ fmt.Sprintf("--secure-port=%v", common.KubernetesAPIPort),
+ fmt.Sprintf("--etcd-servers=unix:///%s:0", s.EphemeralConsensusDirectory.ClientSocket.FullPath()),
+ args.FileOpt("--kubelet-client-certificate", "kubelet-client-cert.pem",
+ pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: s.kubeletClientCert})),
+ args.FileOpt("--kubelet-client-key", "kubelet-client-key.pem",
+ pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: s.kubeletClientKey})),
+ "--kubelet-preferred-address-types=InternalIP",
+ args.FileOpt("--proxy-client-cert-file", "aggregation-client-cert.pem",
+ pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: s.aggregationClientCert})),
+ args.FileOpt("--proxy-client-key-file", "aggregation-client-key.pem",
+ pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: s.aggregationClientKey})),
+ "--requestheader-allowed-names=front-proxy-client",
+ args.FileOpt("--requestheader-client-ca-file", "aggregation-ca.pem",
+ pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: s.aggregationCA})),
+ "--requestheader-extra-headers-prefix=X-Remote-Extra-",
+ "--requestheader-group-headers=X-Remote-Group",
+ "--requestheader-username-headers=X-Remote-User",
+ args.FileOpt("--service-account-key-file", "service-account-pubkey.pem",
+ pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: s.serviceAccountPrivKey})),
+ fmt.Sprintf("--service-cluster-ip-range=%v", s.ServiceIPRange.String()),
+ args.FileOpt("--tls-cert-file", "server-cert.pem",
+ pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: s.serverCert})),
+ args.FileOpt("--tls-private-key-file", "server-key.pem",
+ pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: s.serverKey})),
+ )
+ if args.Error() != nil {
return args.Error()
}
+ cmd.Stdout = s.Output
+ cmd.Stderr = s.Output
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ err = cmd.Run()
+ fmt.Fprintf(s.Output, "apiserver stopped: %v\n", err)
+ return err
}
diff --git a/core/internal/kubernetes/clusternet/BUILD.bazel b/core/internal/kubernetes/clusternet/BUILD.bazel
index 484439c..dd5c58e 100644
--- a/core/internal/kubernetes/clusternet/BUILD.bazel
+++ b/core/internal/kubernetes/clusternet/BUILD.bazel
@@ -11,6 +11,7 @@
deps = [
"//core/internal/common:go_default_library",
"//core/internal/common/supervisor:go_default_library",
+ "//core/internal/localstorage:go_default_library",
"//core/pkg/jsonpatch:go_default_library",
"@com_github_vishvananda_netlink//:go_default_library",
"@com_zx2c4_golang_wireguard_wgctrl//:go_default_library",
@@ -19,7 +20,6 @@
"@io_k8s_apimachinery//pkg/apis/meta/v1:go_default_library",
"@io_k8s_apimachinery//pkg/types:go_default_library",
"@io_k8s_client_go//informers:go_default_library",
- "@io_k8s_client_go//informers/core/v1:go_default_library",
"@io_k8s_client_go//kubernetes:go_default_library",
"@io_k8s_client_go//tools/cache:go_default_library",
"@org_uber_go_zap//:go_default_library",
diff --git a/core/internal/kubernetes/clusternet/clusternet.go b/core/internal/kubernetes/clusternet/clusternet.go
index aa3e7ce..5c42bb8 100644
--- a/core/internal/kubernetes/clusternet/clusternet.go
+++ b/core/internal/kubernetes/clusternet/clusternet.go
@@ -30,10 +30,13 @@
"encoding/json"
"errors"
"fmt"
- "io/ioutil"
"net"
"os"
+ "k8s.io/client-go/informers"
+
+ "k8s.io/client-go/kubernetes"
+
"github.com/vishvananda/netlink"
"go.uber.org/zap"
"golang.zx2c4.com/wireguard/wgctrl"
@@ -41,33 +44,34 @@
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
- "k8s.io/client-go/informers"
- coreinformers "k8s.io/client-go/informers/core/v1"
- "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"git.monogon.dev/source/nexantic.git/core/internal/common"
"git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
+ "git.monogon.dev/source/nexantic.git/core/internal/localstorage"
"git.monogon.dev/source/nexantic.git/core/pkg/jsonpatch"
)
const (
clusterNetDeviceName = "clusternet"
publicKeyAnnotation = "node.smalltown.nexantic.com/wg-pubkey"
-
- privateKeyPath = "/data/kubernetes/clusternet.key"
)
-type clusternet struct {
- nodeName string
- wgClient *wgctrl.Client
- nodeInformer coreinformers.NodeInformer
- logger *zap.Logger
+type Service struct {
+ NodeName string
+ Kubernetes kubernetes.Interface
+ ClusterNet net.IPNet
+ InformerFactory informers.SharedInformerFactory
+ DataDirectory *localstorage.DataKubernetesClusterNetworkingDirectory
+
+ wgClient *wgctrl.Client
+ privKey wgtypes.Key
+ logger *zap.Logger
}
// ensureNode creates/updates the corresponding WireGuard peer entry for the given node object
-func (c *clusternet) ensureNode(newNode *corev1.Node) error {
- if newNode.Name == c.nodeName {
+func (s *Service) ensureNode(newNode *corev1.Node) error {
+ if newNode.Name == s.NodeName {
// Node doesn't need to connect to itself
return nil
}
@@ -83,12 +87,12 @@
for _, addr := range newNode.Status.Addresses {
if addr.Type == corev1.NodeInternalIP {
if internalIP != nil {
- c.logger.Warn("More than one NodeInternalIP specified, using the first one")
+ s.logger.Warn("More than one NodeInternalIP specified, using the first one")
break
}
internalIP = net.ParseIP(addr.Address)
if internalIP == nil {
- c.logger.Warn("failed to parse Internal IP")
+ s.logger.Warn("failed to parse Internal IP")
}
}
}
@@ -99,16 +103,16 @@
for _, podNetStr := range newNode.Spec.PodCIDRs {
_, podNet, err := net.ParseCIDR(podNetStr)
if err != nil {
- c.logger.Warn("Node PodCIDR failed to parse, ignored", zap.Error(err), zap.String("node", newNode.Name))
+ s.logger.Warn("Node PodCIDR failed to parse, ignored", zap.Error(err), zap.String("node", newNode.Name))
continue
}
allowedIPs = append(allowedIPs, *podNet)
}
- c.logger.Debug("Adding/Updating WireGuard peer node", zap.String("node", newNode.Name),
+ s.logger.Debug("Adding/Updating WireGuard peer node", zap.String("node", newNode.Name),
zap.String("endpointIP", internalIP.String()), zap.Any("allowedIPs", allowedIPs))
// WireGuard's kernel side has create/update semantics on peers by default. So we can just add the peer multiple
// times to update it.
- err = c.wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
+ err = s.wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
Peers: []wgtypes.PeerConfig{{
PublicKey: pubKey,
Endpoint: &net.UDPAddr{Port: common.WireGuardPort, IP: internalIP},
@@ -123,8 +127,8 @@
}
// removeNode removes the corresponding WireGuard peer entry for the given node object
-func (c *clusternet) removeNode(oldNode *corev1.Node) error {
- if oldNode.Name == c.nodeName {
+func (s *Service) removeNode(oldNode *corev1.Node) error {
+ if oldNode.Name == s.NodeName {
// Node doesn't need to connect to itself
return nil
}
@@ -136,7 +140,7 @@
if err != nil {
return fmt.Errorf("node public-key annotation not decodable: %w", err)
}
- err = c.wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
+ err = s.wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
Peers: []wgtypes.PeerConfig{{
PublicKey: pubKey,
Remove: true,
@@ -148,119 +152,121 @@
return nil
}
-// EnsureOnDiskKey loads the private key from disk or (if none exists) generates one and persists it.
-func EnsureOnDiskKey() (*wgtypes.Key, error) {
- privKeyRaw, err := ioutil.ReadFile(privateKeyPath)
+// ensureOnDiskKey loads the private key from disk or (if none exists) generates one and persists it.
+func (s *Service) ensureOnDiskKey() error {
+ keyRaw, err := s.DataDirectory.Key.Read()
if os.IsNotExist(err) {
- privKey, err := wgtypes.GeneratePrivateKey()
+ key, err := wgtypes.GeneratePrivateKey()
if err != nil {
- return nil, fmt.Errorf("failed to generate private key: %w", err)
+ return fmt.Errorf("failed to generate private key: %w", err)
}
- if err := ioutil.WriteFile(privateKeyPath, []byte(privKey.String()), 0600); err != nil {
- return nil, fmt.Errorf("failed to store newly generated key: %w", err)
+ if err := s.DataDirectory.Key.Write([]byte(key.String()), 0600); err != nil {
+ return fmt.Errorf("failed to store newly generated key: %w", err)
}
- return &privKey, nil
+
+ s.privKey = key
+ return nil
} else if err != nil {
- return nil, fmt.Errorf("failed to load on-disk key: %w", err)
+ return fmt.Errorf("failed to load on-disk key: %w", err)
}
- privKey, err := wgtypes.ParseKey(string(privKeyRaw))
+
+ key, err := wgtypes.ParseKey(string(keyRaw))
if err != nil {
- return nil, fmt.Errorf("invalid private key in file: %w", err)
+ return fmt.Errorf("invalid private key in file: %w", err)
}
- return &privKey, nil
+ s.privKey = key
+ return nil
+}
+
+// annotateThisNode annotates the node (as defined by NodeName) with the WireGuard public key of this node.
+func (s *Service) annotateThisNode(ctx context.Context) error {
+ patch := []jsonpatch.JsonPatchOp{{
+ Operation: "add",
+ Path: "/metadata/annotations/" + jsonpatch.EncodeJSONRefToken(publicKeyAnnotation),
+ Value: s.privKey.PublicKey().String(),
+ }}
+
+ patchRaw, err := json.Marshal(patch)
+ if err != nil {
+ return fmt.Errorf("failed to encode JSONPatch: %w", err)
+ }
+
+ if _, err := s.Kubernetes.CoreV1().Nodes().Patch(ctx, s.NodeName, types.JSONPatchType, patchRaw, metav1.PatchOptions{}); err != nil {
+ return fmt.Errorf("failed to patch resource: %w", err)
+ }
+
+ return nil
}
// Run runs the ClusterNet service. See package description for what it does.
-func Run(informerFactory informers.SharedInformerFactory, clusterNet net.IPNet, clientSet kubernetes.Interface, key *wgtypes.Key) supervisor.Runnable {
- return func(ctx context.Context) error {
- logger := supervisor.Logger(ctx)
- nodeName, err := os.Hostname()
- if err != nil {
- return fmt.Errorf("failed to determine hostname: %w", err)
- }
- wgClient, err := wgctrl.New()
- if err != nil {
- return fmt.Errorf("failed to connect to netlink's WireGuard config endpoint: %w", err)
- }
+func (s *Service) Run(ctx context.Context) error {
+ logger := supervisor.Logger(ctx)
- nodeAnnotationPatch := []jsonpatch.JsonPatchOp{{
- Operation: "add",
- Path: "/metadata/annotations/" + jsonpatch.EncodeJSONRefToken(publicKeyAnnotation),
- Value: key.PublicKey().String(),
- }}
-
- nodeAnnotationPatchRaw, err := json.Marshal(nodeAnnotationPatch)
- if err != nil {
- return fmt.Errorf("failed to encode JSONPatch: %w", err)
- }
-
- if _, err := clientSet.CoreV1().Nodes().Patch(ctx, nodeName, types.JSONPatchType, nodeAnnotationPatchRaw, metav1.PatchOptions{}); err != nil {
- return fmt.Errorf("failed to set ClusterNet public key for node: %w", err)
- }
-
- nodeInformer := informerFactory.Core().V1().Nodes()
- wgInterface := &Wireguard{LinkAttrs: netlink.LinkAttrs{Name: clusterNetDeviceName, Flags: net.FlagUp}}
- if err := netlink.LinkAdd(wgInterface); err != nil {
- return fmt.Errorf("failed to add WireGuard network interfacee: %w", err)
- }
- defer netlink.LinkDel(wgInterface)
-
- listenPort := common.WireGuardPort
- if err := wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
- PrivateKey: key,
- ListenPort: &listenPort,
- }); err != nil {
- return fmt.Errorf("failed to set up WireGuard interface: %w", err)
- }
-
- if err := netlink.RouteAdd(&netlink.Route{
- Dst: &clusterNet,
- LinkIndex: wgInterface.Index,
- }); err != nil && !os.IsExist(err) {
- return fmt.Errorf("failed to add cluster net route to Wireguard interface: %w", err)
- }
-
- cnet := clusternet{
- wgClient: wgClient,
- nodeInformer: nodeInformer,
- logger: logger,
- nodeName: nodeName,
- }
-
- nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
- AddFunc: func(new interface{}) {
- newNode, ok := new.(*corev1.Node)
- if !ok {
- logger.Error("Received non-node item in node event handler", zap.Reflect("item", new))
- return
- }
- if err := cnet.ensureNode(newNode); err != nil {
- logger.Warn("Failed to sync node", zap.Error(err))
- }
- },
- UpdateFunc: func(old, new interface{}) {
- newNode, ok := new.(*corev1.Node)
- if !ok {
- logger.Error("Received non-node item in node event handler", zap.Reflect("item", new))
- return
- }
- if err := cnet.ensureNode(newNode); err != nil {
- logger.Warn("Failed to sync node", zap.Error(err))
- }
- },
- DeleteFunc: func(old interface{}) {
- oldNode, ok := old.(*corev1.Node)
- if !ok {
- logger.Error("Received non-node item in node event handler", zap.Reflect("item", oldNode))
- return
- }
- if err := cnet.removeNode(oldNode); err != nil {
- logger.Warn("Failed to sync node", zap.Error(err))
- }
- },
- })
- supervisor.Signal(ctx, supervisor.SignalHealthy)
- nodeInformer.Informer().Run(ctx.Done())
- return ctx.Err()
+ wgClient, err := wgctrl.New()
+ if err != nil {
+ return fmt.Errorf("failed to connect to netlink's WireGuard config endpoint: %w", err)
}
+
+ wgInterface := &Wireguard{LinkAttrs: netlink.LinkAttrs{Name: clusterNetDeviceName, Flags: net.FlagUp}}
+ if err := netlink.LinkAdd(wgInterface); err != nil {
+ return fmt.Errorf("failed to add WireGuard network interfacee: %w", err)
+ }
+ defer netlink.LinkDel(wgInterface)
+
+ listenPort := common.WireGuardPort
+ if err := wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
+ PrivateKey: &s.privKey,
+ ListenPort: &listenPort,
+ }); err != nil {
+ return fmt.Errorf("failed to set up WireGuard interface: %w", err)
+ }
+
+ if err := netlink.RouteAdd(&netlink.Route{
+ Dst: &s.ClusterNet,
+ LinkIndex: wgInterface.Index,
+ }); err != nil && !os.IsExist(err) {
+ return fmt.Errorf("failed to add cluster net route to Wireguard interface: %w", err)
+ }
+
+ if err := s.annotateThisNode(ctx); err != nil {
+ return fmt.Errorf("when annotating this node with public key: %w", err)
+ }
+
+ nodeInformer := s.InformerFactory.Core().V1().Nodes()
+ nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+ AddFunc: func(new interface{}) {
+ newNode, ok := new.(*corev1.Node)
+ if !ok {
+ logger.Error("Received non-node item in node event handler", zap.Reflect("item", new))
+ return
+ }
+ if err := s.ensureNode(newNode); err != nil {
+ logger.Warn("Failed to sync node", zap.Error(err))
+ }
+ },
+ UpdateFunc: func(old, new interface{}) {
+ newNode, ok := new.(*corev1.Node)
+ if !ok {
+ logger.Error("Received non-node item in node event handler", zap.Reflect("item", new))
+ return
+ }
+ if err := s.ensureNode(newNode); err != nil {
+ logger.Warn("Failed to sync node", zap.Error(err))
+ }
+ },
+ DeleteFunc: func(old interface{}) {
+ oldNode, ok := old.(*corev1.Node)
+ if !ok {
+ logger.Error("Received non-node item in node event handler", zap.Reflect("item", oldNode))
+ return
+ }
+ if err := s.removeNode(oldNode); err != nil {
+ logger.Warn("Failed to sync node", zap.Error(err))
+ }
+ },
+ })
+
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ nodeInformer.Informer().Run(ctx.Done())
+ return ctx.Err()
}
diff --git a/core/internal/kubernetes/controller-manager.go b/core/internal/kubernetes/controller-manager.go
index 8a85a99..126076e 100644
--- a/core/internal/kubernetes/controller-manager.go
+++ b/core/internal/kubernetes/controller-manager.go
@@ -24,8 +24,6 @@
"net"
"os/exec"
- "go.etcd.io/etcd/clientv3"
-
"git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
"git.monogon.dev/source/nexantic.git/core/internal/kubernetes/pki"
"git.monogon.dev/source/nexantic.git/core/pkg/fileargs"
@@ -41,24 +39,22 @@
serverKey []byte
}
-var clusterNet = net.IPNet{IP: net.IP{10, 0, 0, 0}, Mask: net.IPMask{255, 255, 0, 0}}
-
-func getPKIControllerManagerConfig(ctx context.Context, kv clientv3.KV, kpki *pki.KubernetesPKI) (*controllerManagerConfig, error) {
+func getPKIControllerManagerConfig(ctx context.Context, kpki *pki.KubernetesPKI) (*controllerManagerConfig, error) {
var config controllerManagerConfig
var err error
- config.rootCA, _, err = kpki.Certificate(ctx, pki.IdCA, kv)
+ config.rootCA, _, err = kpki.Certificate(ctx, pki.IdCA)
if err != nil {
return nil, fmt.Errorf("failed to get ID root CA: %w", err)
}
- config.serverCert, config.serverKey, err = kpki.Certificate(ctx, pki.ControllerManager, kv)
+ config.serverCert, config.serverKey, err = kpki.Certificate(ctx, pki.ControllerManager)
if err != nil {
return nil, fmt.Errorf("failed to get controller-manager serving certificate: %w", err)
}
- config.serviceAccountPrivKey, err = kpki.ServiceAccountKey(ctx, kv)
+ config.serviceAccountPrivKey, err = kpki.ServiceAccountKey(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get serviceaccount privkey: %w", err)
}
- config.kubeConfig, err = kpki.Kubeconfig(ctx, pki.ControllerManagerClient, kv)
+ config.kubeConfig, err = kpki.Kubeconfig(ctx, pki.ControllerManagerClient)
if err != nil {
return nil, fmt.Errorf("failed to get controller-manager kubeconfig: %w", err)
}
@@ -87,7 +83,7 @@
args.FileOpt("--tls-private-key-file", "server-key.pem",
pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: config.serverKey})),
"--allocate-node-cidrs",
- "--cluster-cidr="+clusterNet.String(),
+ "--cluster-cidr="+config.clusterNet.String(),
)
if args.Error() != nil {
diff --git a/core/internal/kubernetes/csi.go b/core/internal/kubernetes/csi.go
index 6db82bc..e151396 100644
--- a/core/internal/kubernetes/csi.go
+++ b/core/internal/kubernetes/csi.go
@@ -24,8 +24,6 @@
"path/filepath"
"regexp"
- "git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
-
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/protobuf/ptypes/wrappers"
"go.uber.org/zap"
@@ -35,7 +33,8 @@
"google.golang.org/grpc/status"
pluginregistration "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
- "git.monogon.dev/source/nexantic.git/core/internal/storage"
+ "git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
+ "git.monogon.dev/source/nexantic.git/core/internal/localstorage"
"git.monogon.dev/source/nexantic.git/core/pkg/fsquota"
)
@@ -46,69 +45,63 @@
const volumeDir = "volumes"
-const pluginSocketPath = "/data/kubernetes/kubelet/plugins/com.smalltown.vfs.sock"
+type csiPluginServer struct {
+ KubeletDirectory *localstorage.DataKubernetesKubeletDirectory
+ VolumesDirectory *localstorage.DataVolumesDirectory
-type csiServer struct {
- manager *storage.Manager
- logger *zap.Logger
+ logger *zap.Logger
}
-func runCSIPlugin(storMan *storage.Manager) supervisor.Runnable {
- return func(ctx context.Context) error {
- s := &csiServer{
- manager: storMan,
- logger: supervisor.Logger(ctx),
- }
- pluginListener, err := net.ListenUnix("unix", &net.UnixAddr{Name: pluginSocketPath, Net: "unix"})
- if err != nil {
- return fmt.Errorf("failed to listen on CSI socket: %w", err)
- }
- pluginListener.SetUnlinkOnClose(true)
+func (s *csiPluginServer) Run(ctx context.Context) error {
+ s.logger = supervisor.Logger(ctx)
- pluginServer := grpc.NewServer()
- csi.RegisterIdentityServer(pluginServer, s)
- csi.RegisterNodeServer(pluginServer, s)
- // Enable graceful shutdown since we don't have long-running RPCs and most of them shouldn't and can't be
- // cancelled anyways.
- if err := supervisor.Run(ctx, "csi-node", supervisor.GRPCServer(pluginServer, pluginListener, true)); err != nil {
- return err
- }
-
- registrationListener, err := net.ListenUnix("unix", &net.UnixAddr{
- Name: "/data/kubernetes/kubelet/plugins_registry/com.smalltown.vfs-reg.sock",
- Net: "unix",
- })
- if err != nil {
- return fmt.Errorf("failed to listen on CSI registration socket: %w", err)
- }
- registrationListener.SetUnlinkOnClose(true)
-
- registrationServer := grpc.NewServer()
- pluginregistration.RegisterRegistrationServer(registrationServer, s)
- if err := supervisor.Run(ctx, "registration", supervisor.GRPCServer(registrationServer, registrationListener, true)); err != nil {
- return err
- }
- supervisor.Signal(ctx, supervisor.SignalHealthy)
- supervisor.Signal(ctx, supervisor.SignalDone)
- return nil
+ pluginListener, err := net.ListenUnix("unix", &net.UnixAddr{Name: s.KubeletDirectory.Plugins.VFS.FullPath(), Net: "unix"})
+ if err != nil {
+ return fmt.Errorf("failed to listen on CSI socket: %w", err)
}
+ pluginListener.SetUnlinkOnClose(true)
+
+ pluginServer := grpc.NewServer()
+ csi.RegisterIdentityServer(pluginServer, s)
+ csi.RegisterNodeServer(pluginServer, s)
+ // Enable graceful shutdown since we don't have long-running RPCs and most of them shouldn't and can't be
+ // cancelled anyway.
+ if err := supervisor.Run(ctx, "csi-node", supervisor.GRPCServer(pluginServer, pluginListener, true)); err != nil {
+ return err
+ }
+
+ registrationListener, err := net.ListenUnix("unix", &net.UnixAddr{Name: s.KubeletDirectory.PluginsRegistry.VFSReg.FullPath(), Net: "unix"})
+ if err != nil {
+ return fmt.Errorf("failed to listen on CSI registration socket: %w", err)
+ }
+ registrationListener.SetUnlinkOnClose(true)
+
+ registrationServer := grpc.NewServer()
+ pluginregistration.RegisterRegistrationServer(registrationServer, s)
+ if err := supervisor.Run(ctx, "registration", supervisor.GRPCServer(registrationServer, registrationListener, true)); err != nil {
+ return err
+ }
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ supervisor.Signal(ctx, supervisor.SignalDone)
+ return nil
}
-func (*csiServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
+func (*csiPluginServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method NodeStageVolume not supported")
}
-func (*csiServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
+
+func (*csiPluginServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method NodeUnstageVolume not supported")
}
-func (s *csiServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
+func (s *csiPluginServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
if !acceptableNames.MatchString(req.VolumeId) {
return nil, status.Error(codes.InvalidArgument, "invalid characters in volume id")
}
- volumePath, err := s.manager.GetPathInPlace(storage.PlaceData, filepath.Join(volumeDir, req.VolumeId))
- if err != nil {
- return nil, status.Error(codes.Unavailable, "persistent data storage not available")
- }
+
+ // TODO(q3k): move this logic to localstorage?
+ volumePath := filepath.Join(s.VolumesDirectory.FullPath(), req.VolumeId)
+
switch req.VolumeCapability.AccessMode.Mode {
case csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER:
case csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY:
@@ -121,12 +114,14 @@
return nil, status.Error(codes.InvalidArgument, "unsupported access type")
}
- err = unix.Mount(volumePath, req.TargetPath, "", unix.MS_BIND, "")
- if err == unix.ENOENT {
+ err := unix.Mount(volumePath, req.TargetPath, "", unix.MS_BIND, "")
+ switch {
+ case err == unix.ENOENT:
return nil, status.Error(codes.NotFound, "volume not found")
- } else if err != nil {
+ case err != nil:
return nil, status.Errorf(codes.Unavailable, "failed to bind-mount volume: %v", err)
}
+
if req.Readonly {
err := unix.Mount(volumePath, req.TargetPath, "", unix.MS_BIND|unix.MS_REMOUNT|unix.MS_RDONLY, "")
if err != nil {
@@ -136,13 +131,15 @@
}
return &csi.NodePublishVolumeResponse{}, nil
}
-func (*csiServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
+
+func (*csiPluginServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
if err := unix.Unmount(req.TargetPath, 0); err != nil {
return nil, status.Errorf(codes.Unavailable, "failed to unmount volume: %v", err)
}
return &csi.NodeUnpublishVolumeResponse{}, nil
}
-func (*csiServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
+
+func (*csiPluginServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
quota, err := fsquota.GetQuota(req.VolumePath)
if os.IsNotExist(err) {
return nil, status.Error(codes.NotFound, "volume does not exist at this path")
@@ -167,7 +164,8 @@
},
}, nil
}
-func (*csiServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
+
+func (*csiPluginServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
if req.CapacityRange.LimitBytes <= 0 {
return nil, status.Error(codes.InvalidArgument, "invalid expanded volume size: at or below zero bytes")
}
@@ -184,7 +182,8 @@
},
}
}
-func (*csiServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
+
+func (*csiPluginServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
return &csi.NodeGetCapabilitiesResponse{
Capabilities: []*csi.NodeServiceCapability{
rpcCapability(csi.NodeServiceCapability_RPC_EXPAND_VOLUME),
@@ -192,7 +191,8 @@
},
}, nil
}
-func (*csiServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
+
+func (*csiPluginServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
hostname, err := os.Hostname()
if err != nil {
return nil, status.Errorf(codes.Unavailable, "failed to get node identity: %v", err)
@@ -203,13 +203,14 @@
}
// CSI Identity endpoints
-func (*csiServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
+func (*csiPluginServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
return &csi.GetPluginInfoResponse{
Name: "com.smalltown.vfs",
VendorVersion: "0.0.1", // TODO(lorenz): Maybe stamp?
}, nil
}
-func (*csiServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
+
+func (*csiPluginServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
return &csi.GetPluginCapabilitiesResponse{
Capabilities: []*csi.PluginCapability{
{
@@ -223,23 +224,21 @@
}, nil
}
-func (s *csiServer) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) {
- _, err := s.manager.GetPathInPlace(storage.PlaceData, volumeDir)
- ready := err == nil
- return &csi.ProbeResponse{Ready: &wrappers.BoolValue{Value: ready}}, nil
+func (s *csiPluginServer) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) {
+ return &csi.ProbeResponse{Ready: &wrappers.BoolValue{Value: true}}, nil
}
// Registration endpoints
-func (s *csiServer) GetInfo(ctx context.Context, req *pluginregistration.InfoRequest) (*pluginregistration.PluginInfo, error) {
+func (s *csiPluginServer) GetInfo(ctx context.Context, req *pluginregistration.InfoRequest) (*pluginregistration.PluginInfo, error) {
return &pluginregistration.PluginInfo{
Type: "CSIPlugin",
Name: "com.smalltown.vfs",
- Endpoint: pluginSocketPath,
+ Endpoint: s.KubeletDirectory.Plugins.VFS.FullPath(),
SupportedVersions: []string{"1.2"}, // Keep in sync with container-storage-interface/spec package version
}, nil
}
-func (s *csiServer) NotifyRegistrationStatus(ctx context.Context, req *pluginregistration.RegistrationStatus) (*pluginregistration.RegistrationStatusResponse, error) {
+func (s *csiPluginServer) NotifyRegistrationStatus(ctx context.Context, req *pluginregistration.RegistrationStatus) (*pluginregistration.RegistrationStatusResponse, error) {
if req.Error != "" {
s.logger.Warn("Kubelet failed registering CSI plugin", zap.String("error", req.Error))
}
diff --git a/core/internal/kubernetes/kubelet.go b/core/internal/kubernetes/kubelet.go
index 639e891..d45a238 100644
--- a/core/internal/kubernetes/kubelet.go
+++ b/core/internal/kubernetes/kubelet.go
@@ -22,133 +22,130 @@
"encoding/pem"
"fmt"
"io"
- "io/ioutil"
"net"
- "os"
"os/exec"
- "go.etcd.io/etcd/clientv3"
-
"git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
"git.monogon.dev/source/nexantic.git/core/internal/kubernetes/pki"
"git.monogon.dev/source/nexantic.git/core/internal/kubernetes/reconciler"
+ "git.monogon.dev/source/nexantic.git/core/internal/localstorage"
+ "git.monogon.dev/source/nexantic.git/core/internal/localstorage/declarative"
"git.monogon.dev/source/nexantic.git/core/pkg/fileargs"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeletconfig "k8s.io/kubelet/config/v1beta1"
)
-var (
- kubeletRoot = "/data/kubernetes"
- kubeletKubeconfig = kubeletRoot + "/kubelet.kubeconfig"
- kubeletCACert = kubeletRoot + "/ca.crt"
- kubeletCert = kubeletRoot + "/kubelet.crt"
- kubeletKey = kubeletRoot + "/kubelet.key"
-)
-
-type KubeletSpec struct {
- clusterDNS []net.IP
+type kubeletService struct {
+ NodeName string
+ ClusterDNS []net.IP
+ KubeletDirectory *localstorage.DataKubernetesKubeletDirectory
+ EphemeralDirectory *localstorage.EphemeralDirectory
+ Output io.Writer
+ KPKI *pki.KubernetesPKI
}
-func createKubeletConfig(ctx context.Context, kv clientv3.KV, kpki *pki.KubernetesPKI, nodeName string) error {
- identity := fmt.Sprintf("system:node:%s", nodeName)
+func (s *kubeletService) createCertificates(ctx context.Context) error {
+ identity := fmt.Sprintf("system:node:%s", s.NodeName)
- ca := kpki.Certificates[pki.IdCA]
- cacert, _, err := ca.Ensure(ctx, kv)
+ ca := s.KPKI.Certificates[pki.IdCA]
+ cacert, _, err := ca.Ensure(ctx, s.KPKI.KV)
if err != nil {
return fmt.Errorf("could not ensure ca certificate: %w", err)
}
- kubeconfig, err := pki.New(ca, "", pki.Client(identity, []string{"system:nodes"})).Kubeconfig(ctx, kv)
+ kubeconfig, err := pki.New(ca, "", pki.Client(identity, []string{"system:nodes"})).Kubeconfig(ctx, s.KPKI.KV)
if err != nil {
return fmt.Errorf("could not create volatile kubelet client cert: %w", err)
}
- cert, key, err := pki.New(ca, "volatile", pki.Server([]string{nodeName}, nil)).Ensure(ctx, kv)
+ cert, key, err := pki.New(ca, "", pki.Server([]string{s.NodeName}, nil)).Ensure(ctx, s.KPKI.KV)
if err != nil {
return fmt.Errorf("could not create volatile kubelet server cert: %w", err)
}
- if err := os.MkdirAll(kubeletRoot, 0755); err != nil {
- return fmt.Errorf("could not create kubelet root directory: %w", err)
- }
// TODO(q3k): this should probably become its own function //core/internal/kubernetes/pki.
for _, el := range []struct {
- target string
+ target declarative.FilePlacement
data []byte
}{
- {kubeletKubeconfig, kubeconfig},
- {kubeletCACert, pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: cacert})},
- {kubeletCert, pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: cert})},
- {kubeletKey, pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: key})},
+ {s.KubeletDirectory.Kubeconfig, kubeconfig},
+ {s.KubeletDirectory.PKI.CACertificate, pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: cacert})},
+ {s.KubeletDirectory.PKI.Certificate, pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: cert})},
+ {s.KubeletDirectory.PKI.Key, pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: key})},
} {
- if err := ioutil.WriteFile(el.target, el.data, 0400); err != nil {
- return fmt.Errorf("could not write %q: %w", el.target, err)
+ if err := el.target.Write(el.data, 0400); err != nil {
+ return fmt.Errorf("could not write %v: %w", el.target, err)
}
}
return nil
}
-func runKubelet(spec *KubeletSpec, output io.Writer) supervisor.Runnable {
- return func(ctx context.Context) error {
- fargs, err := fileargs.New()
- if err != nil {
- return err
- }
- var clusterDNS []string
- for _, dnsIP := range spec.clusterDNS {
- clusterDNS = append(clusterDNS, dnsIP.String())
- }
+func (s *kubeletService) configure() *kubeletconfig.KubeletConfiguration {
+ var clusterDNS []string
+ for _, dnsIP := range s.ClusterDNS {
+ clusterDNS = append(clusterDNS, dnsIP.String())
+ }
- kubeletConf := &kubeletconfig.KubeletConfiguration{
- TypeMeta: v1.TypeMeta{
- Kind: "KubeletConfiguration",
- APIVersion: kubeletconfig.GroupName + "/v1beta1",
+ return &kubeletconfig.KubeletConfiguration{
+ TypeMeta: v1.TypeMeta{
+ Kind: "KubeletConfiguration",
+ APIVersion: kubeletconfig.GroupName + "/v1beta1",
+ },
+ TLSCertFile: s.KubeletDirectory.PKI.Certificate.FullPath(),
+ TLSPrivateKeyFile: s.KubeletDirectory.PKI.Key.FullPath(),
+ TLSMinVersion: "VersionTLS13",
+ ClusterDNS: clusterDNS,
+ Authentication: kubeletconfig.KubeletAuthentication{
+ X509: kubeletconfig.KubeletX509Authentication{
+ ClientCAFile: s.KubeletDirectory.PKI.CACertificate.FullPath(),
},
- TLSCertFile: "/data/kubernetes/kubelet.crt",
- TLSPrivateKeyFile: "/data/kubernetes/kubelet.key",
- TLSMinVersion: "VersionTLS13",
- ClusterDNS: clusterDNS,
- Authentication: kubeletconfig.KubeletAuthentication{
- X509: kubeletconfig.KubeletX509Authentication{
- ClientCAFile: "/data/kubernetes/ca.crt",
- },
- },
- // TODO(q3k): move reconciler.False to a generic package, fix the following references.
- ClusterDomain: "cluster.local", // cluster.local is hardcoded in the certificate too currently
- EnableControllerAttachDetach: reconciler.False(),
- HairpinMode: "none",
- MakeIPTablesUtilChains: reconciler.False(), // We don't have iptables
- FailSwapOn: reconciler.False(), // Our kernel doesn't have swap enabled which breaks Kubelet's detection
- KubeReserved: map[string]string{
- "cpu": "200m",
- "memory": "300Mi",
- },
+ },
+ // TODO(q3k): move reconciler.False to a generic package, fix the following references.
+ ClusterDomain: "cluster.local", // cluster.local is hardcoded in the certificate too currently
+ EnableControllerAttachDetach: reconciler.False(),
+ HairpinMode: "none",
+ MakeIPTablesUtilChains: reconciler.False(), // We don't have iptables
+ FailSwapOn: reconciler.False(), // Our kernel doesn't have swap enabled which breaks Kubelet's detection
+ KubeReserved: map[string]string{
+ "cpu": "200m",
+ "memory": "300Mi",
+ },
- // We're not going to use this, but let's make it point to a known-empty directory in case anybody manages to
- // trigger it.
- VolumePluginDir: "/kubernetes/conf/flexvolume-plugins",
- }
+ // We're not going to use this, but let's make it point to a known-empty directory in case anybody manages to
+ // trigger it.
+ VolumePluginDir: s.EphemeralDirectory.FlexvolumePlugins.FullPath(),
+ }
+}
- configRaw, err := json.Marshal(kubeletConf)
- if err != nil {
- return err
- }
- cmd := exec.CommandContext(ctx, "/kubernetes/bin/kube", "kubelet",
- fargs.FileOpt("--config", "config.json", configRaw),
- "--container-runtime=remote",
- "--container-runtime-endpoint=unix:///containerd/run/containerd.sock",
- "--kubeconfig=/data/kubernetes/kubelet.kubeconfig",
- "--root-dir=/data/kubernetes/kubelet",
- )
- cmd.Env = []string{"PATH=/kubernetes/bin"}
- cmd.Stdout = output
- cmd.Stderr = output
+func (s *kubeletService) Run(ctx context.Context) error {
+ if err := s.createCertificates(ctx); err != nil {
+ return fmt.Errorf("when creating certificates: %w", err)
+ }
- supervisor.Signal(ctx, supervisor.SignalHealthy)
- err = cmd.Run()
- fmt.Fprintf(output, "kubelet stopped: %v\n", err)
+ configRaw, err := json.Marshal(s.configure())
+ if err != nil {
+ return fmt.Errorf("when marshaling kubelet configuration: %w", err)
+ }
+
+ fargs, err := fileargs.New()
+ if err != nil {
return err
}
+ cmd := exec.CommandContext(ctx, "/kubernetes/bin/kube", "kubelet",
+ fargs.FileOpt("--config", "config.json", configRaw),
+ "--container-runtime=remote",
+ fmt.Sprintf("--container-runtime-endpoint=unix://%s", s.EphemeralDirectory.Containerd.ClientSocket.FullPath()),
+ fmt.Sprintf("--kubeconfig=%s", s.KubeletDirectory.Kubeconfig.FullPath()),
+ fmt.Sprintf("--root-dir=%s", s.KubeletDirectory.FullPath()),
+ )
+ cmd.Env = []string{"PATH=/kubernetes/bin"}
+ cmd.Stdout = s.Output
+ cmd.Stderr = s.Output
+
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ err = cmd.Run()
+ fmt.Fprintf(s.Output, "kubelet stopped: %v\n", err)
+ return err
}
diff --git a/core/internal/kubernetes/pki/kubernetes.go b/core/internal/kubernetes/pki/kubernetes.go
index ed70b87..48ce6e9 100644
--- a/core/internal/kubernetes/pki/kubernetes.go
+++ b/core/internal/kubernetes/pki/kubernetes.go
@@ -78,12 +78,14 @@
// which can be retrieved, or be used to generate Kubeconfigs from.
type KubernetesPKI struct {
logger *zap.Logger
+ KV clientv3.KV
Certificates map[KubeCertificateName]*Certificate
}
-func NewKubernetes(l *zap.Logger) *KubernetesPKI {
+func NewKubernetes(l *zap.Logger, kv clientv3.KV) *KubernetesPKI {
pki := KubernetesPKI{
logger: l,
+ KV: kv,
Certificates: make(map[KubeCertificateName]*Certificate),
}
@@ -117,15 +119,15 @@
}
// EnsureAll ensures that all static certificates (and the serviceaccount key) are present on etcd.
-func (k *KubernetesPKI) EnsureAll(ctx context.Context, kv clientv3.KV) error {
+func (k *KubernetesPKI) EnsureAll(ctx context.Context) error {
for n, v := range k.Certificates {
k.logger.Info("ensureing certificate existence", zap.String("name", string(n)))
- _, _, err := v.Ensure(ctx, kv)
+ _, _, err := v.Ensure(ctx, k.KV)
if err != nil {
return fmt.Errorf("could not ensure certificate %q exists: %w", n, err)
}
}
- _, err := k.ServiceAccountKey(ctx, kv)
+ _, err := k.ServiceAccountKey(ctx)
if err != nil {
return fmt.Errorf("could not ensure service account key exists: %w", err)
}
@@ -134,23 +136,23 @@
// Kubeconfig generates a kubeconfig blob for a given certificate name. The same lifetime semantics as in .Certificate
// apply.
-func (k *KubernetesPKI) Kubeconfig(ctx context.Context, name KubeCertificateName, kv clientv3.KV) ([]byte, error) {
+func (k *KubernetesPKI) Kubeconfig(ctx context.Context, name KubeCertificateName) ([]byte, error) {
c, ok := k.Certificates[name]
if !ok {
return nil, fmt.Errorf("no certificate %q", name)
}
- return c.Kubeconfig(ctx, kv)
+ return c.Kubeconfig(ctx, k.KV)
}
// Certificate retrieves an x509 DER-encoded (but not PEM-wrapped) key and certificate for a given certificate name.
// If the requested certificate is volatile, it will be created on demand. Otherwise it will be created on etcd (if not
// present), and retrieved from there.
-func (k *KubernetesPKI) Certificate(ctx context.Context, name KubeCertificateName, kv clientv3.KV) (cert, key []byte, err error) {
+func (k *KubernetesPKI) Certificate(ctx context.Context, name KubeCertificateName) (cert, key []byte, err error) {
c, ok := k.Certificates[name]
if !ok {
return nil, nil, fmt.Errorf("no certificate %q", name)
}
- return c.Ensure(ctx, kv)
+ return c.Ensure(ctx, k.KV)
}
// Kubeconfig generates a kubeconfig blob for this certificate. The same lifetime semantics as in .Ensure apply.
@@ -191,14 +193,14 @@
// ServiceAccountKey retrieves (and possibly generates and stores on etcd) the Kubernetes service account key. The
// returned data is ready to be used by Kubernetes components (in PKIX form).
-func (k *KubernetesPKI) ServiceAccountKey(ctx context.Context, kv clientv3.KV) ([]byte, error) {
+func (k *KubernetesPKI) ServiceAccountKey(ctx context.Context) ([]byte, error) {
// TODO(q3k): this should be abstracted away once we abstract away etcd access into a library with try-or-create
// semantics.
path := etcdPath("%s.der", serviceAccountKeyName)
// Try loading key from etcd.
- keyRes, err := kv.Get(ctx, path)
+ keyRes, err := k.KV.Get(ctx, path)
if err != nil {
return nil, fmt.Errorf("failed to get key from etcd: %w", err)
}
@@ -219,7 +221,7 @@
}
// Save to etcd.
- _, err = kv.Put(ctx, path, string(key))
+ _, err = k.KV.Put(ctx, path, string(key))
if err != nil {
err = fmt.Errorf("failed to write newly generated key: %w", err)
}
diff --git a/core/internal/kubernetes/provisioner.go b/core/internal/kubernetes/provisioner.go
index c864715..0e9e419 100644
--- a/core/internal/kubernetes/provisioner.go
+++ b/core/internal/kubernetes/provisioner.go
@@ -25,11 +25,6 @@
"path/filepath"
"go.uber.org/zap"
-
- "git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
- "git.monogon.dev/source/nexantic.git/core/internal/storage"
- "git.monogon.dev/source/nexantic.git/core/pkg/fsquota"
-
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
@@ -44,25 +39,31 @@
"k8s.io/client-go/tools/record"
ref "k8s.io/client-go/tools/reference"
"k8s.io/client-go/util/workqueue"
+
+ "git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
+ "git.monogon.dev/source/nexantic.git/core/internal/localstorage"
+ "git.monogon.dev/source/nexantic.git/core/pkg/fsquota"
)
-// ONCHANGE(//core/internal/kubernetes/reconciler:resources_csi.go): needs to match csiProvisionerName declared.
-const csiProvisionerName = "com.nexantic.smalltown.vfs"
+// ONCHANGE(//core/internal/kubernetes/reconciler:resources_csi.go): needs to match csiProvisionerServerName declared.
+const csiProvisionerServerName = "com.nexantic.smalltown.vfs"
-// csiProvisioner is responsible for the provisioning and deprovisioning of CSI-based container volumes. It runs on all
+// csiProvisionerServer is responsible for the provisioning and deprovisioning of CSI-based container volumes. It runs on all
// nodes and watches PVCs for ones assigned to the node it's running on and fulfills the provisioning request by
// creating a directory, applying a quota and creating the corresponding PV. When the PV is released and its retention
// policy is Delete, the directory and the PV resource are deleted.
-type csiProvisioner struct {
- nodeName string
- kubeclientset kubernetes.Interface
+type csiProvisionerServer struct {
+ NodeName string
+ Kubernetes kubernetes.Interface
+ InformerFactory informers.SharedInformerFactory
+ VolumesDirectory *localstorage.DataVolumesDirectory
+
claimQueue workqueue.RateLimitingInterface
pvQueue workqueue.RateLimitingInterface
recorder record.EventRecorder
pvcInformer coreinformers.PersistentVolumeClaimInformer
pvInformer coreinformers.PersistentVolumeInformer
storageClassInformer storageinformers.StorageClassInformer
- storageManager *storage.Manager
logger *zap.Logger
}
@@ -70,79 +71,77 @@
// the events happening on the Kubernetes control plane and informs us when something happens. If anything happens to
// PVCs or PVs, we enqueue the identifier of that resource in a work queue. Queues are being worked on by only one
// worker to limit load and avoid complicated locking infrastructure. Failed items are requeued.
-func runCSIProvisioner(storMan *storage.Manager, kubeclientset kubernetes.Interface, informerFactory informers.SharedInformerFactory) supervisor.Runnable {
- return func(ctx context.Context) error {
- nodeName, err := os.Hostname()
- if err != nil {
- panic(err)
- }
+func (p *csiProvisionerServer) Run(ctx context.Context) error {
+ // The recorder is used to log Kubernetes events for successful or failed volume provisions. These events then
+ // show up in `kubectl describe pvc` and can be used by admins to debug issues with this provisioner.
+ eventBroadcaster := record.NewBroadcaster()
+ eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: p.Kubernetes.CoreV1().Events("")})
+ p.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: csiProvisionerServerName, Host: p.NodeName})
- // The recorder is used to log Kubernetes events for successful or failed volume provisions. These events then
- // show up in `kubectl describe pvc` and can be used by admins to debug issues with this provisioner.
- eventBroadcaster := record.NewBroadcaster()
- eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
- recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: csiProvisionerName, Host: nodeName})
+ p.pvInformer = p.InformerFactory.Core().V1().PersistentVolumes()
+ p.pvcInformer = p.InformerFactory.Core().V1().PersistentVolumeClaims()
+ p.storageClassInformer = p.InformerFactory.Storage().V1().StorageClasses()
- p := &csiProvisioner{
- nodeName: nodeName,
- recorder: recorder,
- kubeclientset: kubeclientset,
- pvInformer: informerFactory.Core().V1().PersistentVolumes(),
- pvcInformer: informerFactory.Core().V1().PersistentVolumeClaims(),
- storageClassInformer: informerFactory.Storage().V1().StorageClasses(),
- claimQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
- pvQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
- storageManager: storMan,
- logger: supervisor.Logger(ctx),
- }
+ p.claimQueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
+ p.pvQueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
- p.pvcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
- AddFunc: p.enqueueClaim,
- UpdateFunc: func(old, new interface{}) {
- p.enqueueClaim(new)
- },
- })
- p.pvInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
- AddFunc: p.enqueuePV,
- UpdateFunc: func(old, new interface{}) {
- p.enqueuePV(new)
- },
- })
+ p.pvcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+ AddFunc: p.enqueueClaim,
+ UpdateFunc: func(old, new interface{}) {
+ p.enqueueClaim(new)
+ },
+ })
+ p.pvInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+ AddFunc: p.enqueuePV,
+ UpdateFunc: func(old, new interface{}) {
+ p.enqueuePV(new)
+ },
+ })
+ p.logger = supervisor.Logger(ctx)
- go p.pvcInformer.Informer().Run(ctx.Done())
- go p.pvInformer.Informer().Run(ctx.Done())
- go p.storageClassInformer.Informer().Run(ctx.Done())
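+ // Start the informers; they run until the context is canceled.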
+ go p.pvcInformer.Informer().Run(ctx.Done())
+ go p.pvInformer.Informer().Run(ctx.Done())
+ go p.storageClassInformer.Informer().Run(ctx.Done())
- // These will self-terminate once the queues are shut down
- go p.processQueueItems(p.claimQueue, func(key string) error {
- return p.processPVC(key)
- })
- go p.processQueueItems(p.pvQueue, func(key string) error {
- return p.processPV(key)
- })
+ // These will self-terminate once the queues are shut down
+ go p.processQueueItems(p.claimQueue, func(key string) error {
+ return p.processPVC(key)
+ })
+ go p.processQueueItems(p.pvQueue, func(key string) error {
+ return p.processPV(key)
+ })
- supervisor.Signal(ctx, supervisor.SignalHealthy)
- <-ctx.Done()
- p.claimQueue.ShutDown()
- p.pvQueue.ShutDown()
- return nil
- }
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ <-ctx.Done()
+ p.claimQueue.ShutDown()
+ p.pvQueue.ShutDown()
+ return nil
}
// isOurPVC checks if the given PVC is to be provisioned by this provisioner and has been scheduled onto this node
-func (p *csiProvisioner) isOurPVC(pvc *v1.PersistentVolumeClaim) bool {
- return pvc.ObjectMeta.Annotations["volume.beta.kubernetes.io/storage-provisioner"] == csiProvisionerName &&
- (pvc.ObjectMeta.Annotations["volume.kubernetes.io/selected-node"] == p.nodeName)
+func (p *csiProvisionerServer) isOurPVC(pvc *v1.PersistentVolumeClaim) bool {
+ if pvc.ObjectMeta.Annotations["volume.beta.kubernetes.io/storage-provisioner"] != csiProvisionerServerName {
+ return false
+ }
+ if pvc.ObjectMeta.Annotations["volume.kubernetes.io/selected-node"] != p.NodeName {
+ return false
+ }
+ return true
}
// isOurPV checks if the given PV has been provisioned by this provisioner and has been scheduled onto this node
-func (p *csiProvisioner) isOurPV(pv *v1.PersistentVolume) bool {
- return pv.ObjectMeta.Annotations["pv.kubernetes.io/provisioned-by"] == csiProvisionerName &&
- pv.Spec.NodeAffinity.Required.NodeSelectorTerms[0].MatchExpressions[0].Values[0] == p.nodeName
+func (p *csiProvisionerServer) isOurPV(pv *v1.PersistentVolume) bool {
+ if pv.ObjectMeta.Annotations["pv.kubernetes.io/provisioned-by"] != csiProvisionerServerName {
+ return false
+ }
+ if pv.Spec.NodeAffinity.Required.NodeSelectorTerms[0].MatchExpressions[0].Values[0] != p.NodeName {
+ return false
+ }
+ return true
}
// enqueueClaim adds an added/changed PVC to the work queue
-func (p *csiProvisioner) enqueueClaim(obj interface{}) {
+func (p *csiProvisionerServer) enqueueClaim(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
p.logger.Error("Not queuing PVC because key could not be derived", zap.Error(err))
@@ -152,7 +151,7 @@
}
// enqueuePV adds an added/changed PV to the work queue
-func (p *csiProvisioner) enqueuePV(obj interface{}) {
+func (p *csiProvisionerServer) enqueuePV(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
p.logger.Error("Not queuing PV because key could not be derived", zap.Error(err))
@@ -163,7 +162,7 @@
// processQueueItems gets items from the given work queue and calls the process function for each of them. It self-
// terminates once the queue is shut down.
-func (p *csiProvisioner) processQueueItems(queue workqueue.RateLimitingInterface, process func(key string) error) {
+func (p *csiProvisionerServer) processQueueItems(queue workqueue.RateLimitingInterface, process func(key string) error) {
for {
obj, shutdown := queue.Get()
if shutdown {
@@ -190,25 +189,14 @@
}
}
-// getVolumePath gets the path where the volume is stored or an error if the storage manager doesn't
-// have the volume available
-func (p *csiProvisioner) getVolumePath(volumeID string) (string, error) {
- return p.storageManager.GetPathInPlace(storage.PlaceData, filepath.Join(volumeDir, volumeID))
-}
-
-// ensureVolumePath ensures that the top-level volume directory is created. It fails if the storage manager doesn't
-// have the volume available.
-func (p *csiProvisioner) ensureVolumePath() error {
- path, err := p.storageManager.GetPathInPlace(storage.PlaceData, volumeDir)
- if err != nil {
- return err
- }
- return os.MkdirAll(path, 0640)
+// volumePath gets the path where the volume is stored.
+func (p *csiProvisionerServer) volumePath(volumeID string) string {
+ return filepath.Join(p.VolumesDirectory.FullPath(), volumeID)
}
// processPVC looks at a single PVC item from the queue, determines if it needs to be provisioned and logs the
// provisioning result to the recorder
-func (p *csiProvisioner) processPVC(key string) error {
+func (p *csiProvisionerServer) processPVC(key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return fmt.Errorf("invalid resource key: %s", key)
@@ -234,7 +222,7 @@
return fmt.Errorf("")
}
- if storageClass.Provisioner != csiProvisionerName {
+ if storageClass.Provisioner != csiProvisionerServerName {
// We're not responsible for this PVC. Can only happen if controller-manager makes a mistake
// setting the annotations, but we're bailing here anyways for safety.
return nil
@@ -253,7 +241,7 @@
// provisionPVC creates the directory where the volume lives, sets a quota for the requested amount of storage and
// creates the PV object representing this new volume
-func (p *csiProvisioner) provisionPVC(pvc *v1.PersistentVolumeClaim, storageClass *storagev1.StorageClass) error {
+func (p *csiProvisionerServer) provisionPVC(pvc *v1.PersistentVolumeClaim, storageClass *storagev1.StorageClass) error {
claimRef, err := ref.GetReference(scheme.Scheme, pvc)
if err != nil {
return fmt.Errorf("failed to get reference to PVC: %w", err)
@@ -273,14 +261,7 @@
}
volumeID := "pvc-" + string(pvc.ObjectMeta.UID)
- volumePath, err := p.getVolumePath(volumeID)
- if err != nil {
- return fmt.Errorf("unable to access volumes: %w", err)
- }
-
- if err := p.ensureVolumePath(); err != nil {
- return fmt.Errorf("failed to create volume location: %w", err)
- }
+ volumePath := p.volumePath(volumeID)
p.logger.Info("Creating local PV", zap.String("volume-id", volumeID))
if err := os.Mkdir(volumePath, 0644); err != nil && !os.IsExist(err) {
@@ -301,7 +282,7 @@
ObjectMeta: metav1.ObjectMeta{
Name: volumeID,
Annotations: map[string]string{
- "pv.kubernetes.io/provisioned-by": csiProvisionerName},
+ "pv.kubernetes.io/provisioned-by": csiProvisionerServerName},
},
Spec: v1.PersistentVolumeSpec{
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
@@ -310,7 +291,7 @@
},
PersistentVolumeSource: v1.PersistentVolumeSource{
CSI: &v1.CSIPersistentVolumeSource{
- Driver: csiProvisionerName,
+ Driver: csiProvisionerServerName,
VolumeHandle: volumeID,
},
},
@@ -323,7 +304,7 @@
{
Key: "kubernetes.io/hostname",
Operator: v1.NodeSelectorOpIn,
- Values: []string{p.nodeName},
+ Values: []string{p.NodeName},
},
},
},
@@ -335,7 +316,8 @@
},
}
- if _, err = p.kubeclientset.CoreV1().PersistentVolumes().Create(context.Background(), vol, metav1.CreateOptions{}); !apierrs.IsAlreadyExists(err) && err != nil {
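+ // An already-existing PV means a previous provisioning attempt already got this far; treat it as success.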
+ _, err = p.Kubernetes.CoreV1().PersistentVolumes().Create(context.Background(), vol, metav1.CreateOptions{})
+ if err != nil && !apierrs.IsAlreadyExists(err) {
return fmt.Errorf("failed to create PV object: %w", err)
}
return nil
@@ -343,7 +325,7 @@
// processPV looks at a single PV item from the queue and checks if it has been released and needs to be deleted. If yes
// it deletes the associated quota, directory and the PV object and logs the result to the recorder.
-func (p *csiProvisioner) processPV(key string) error {
+func (p *csiProvisionerServer) processPV(key string) error {
_, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return fmt.Errorf("invalid resource key: %s", key)
@@ -361,7 +343,7 @@
if pv.Spec.PersistentVolumeReclaimPolicy != v1.PersistentVolumeReclaimDelete || pv.Status.Phase != "Released" {
return nil
}
- volumePath, err := p.getVolumePath(pv.Spec.CSI.VolumeHandle)
+ volumePath := p.volumePath(pv.Spec.CSI.VolumeHandle)
// Log deletes for auditing purposes
p.logger.Info("Deleting persistent volume", zap.String("name", pv.Spec.CSI.VolumeHandle))
@@ -378,7 +360,7 @@
return fmt.Errorf("failed to delete volume: %w", err)
}
- err = p.kubeclientset.CoreV1().PersistentVolumes().Delete(context.Background(), pv.Name, metav1.DeleteOptions{})
+ err = p.Kubernetes.CoreV1().PersistentVolumes().Delete(context.Background(), pv.Name, metav1.DeleteOptions{})
if err != nil && !apierrs.IsNotFound(err) {
p.recorder.Eventf(pv, v1.EventTypeWarning, "DeprovisioningFailed", "Failed to delete PV object from K8s API: %v", err)
return fmt.Errorf("failed to delete PV object: %w", err)
diff --git a/core/internal/kubernetes/scheduler.go b/core/internal/kubernetes/scheduler.go
index 35b1e64..5a91134 100644
--- a/core/internal/kubernetes/scheduler.go
+++ b/core/internal/kubernetes/scheduler.go
@@ -23,8 +23,6 @@
"io"
"os/exec"
- "go.etcd.io/etcd/clientv3"
-
"git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
"git.monogon.dev/source/nexantic.git/core/internal/kubernetes/pki"
"git.monogon.dev/source/nexantic.git/core/pkg/fileargs"
@@ -36,14 +34,14 @@
serverKey []byte
}
-func getPKISchedulerConfig(ctx context.Context, kv clientv3.KV, kpki *pki.KubernetesPKI) (*schedulerConfig, error) {
+func getPKISchedulerConfig(ctx context.Context, kpki *pki.KubernetesPKI) (*schedulerConfig, error) {
var config schedulerConfig
var err error
- config.serverCert, config.serverKey, err = kpki.Certificate(ctx, pki.Scheduler, kv)
+ config.serverCert, config.serverKey, err = kpki.Certificate(ctx, pki.Scheduler)
if err != nil {
return nil, fmt.Errorf("failed to get scheduler serving certificate: %w", err)
}
- config.kubeConfig, err = kpki.Kubeconfig(ctx, pki.SchedulerClient, kv)
+ config.kubeConfig, err = kpki.Kubeconfig(ctx, pki.SchedulerClient)
if err != nil {
return nil, fmt.Errorf("failed to get scheduler kubeconfig: %w", err)
}
diff --git a/core/internal/kubernetes/service.go b/core/internal/kubernetes/service.go
index ccfb41c..2396066 100644
--- a/core/internal/kubernetes/service.go
+++ b/core/internal/kubernetes/service.go
@@ -22,191 +22,192 @@
"fmt"
"net"
"os"
+ "sync"
"time"
- "go.etcd.io/etcd/clientv3"
- "go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
- schema "git.monogon.dev/source/nexantic.git/core/generated/api"
"git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
- "git.monogon.dev/source/nexantic.git/core/internal/consensus"
"git.monogon.dev/source/nexantic.git/core/internal/kubernetes/clusternet"
"git.monogon.dev/source/nexantic.git/core/internal/kubernetes/pki"
"git.monogon.dev/source/nexantic.git/core/internal/kubernetes/reconciler"
- "git.monogon.dev/source/nexantic.git/core/internal/storage"
+ "git.monogon.dev/source/nexantic.git/core/internal/localstorage"
"git.monogon.dev/source/nexantic.git/core/pkg/logbuffer"
+ apb "git.monogon.dev/source/nexantic.git/core/proto/api"
)
type Config struct {
AdvertiseAddress net.IP
ServiceIPRange net.IPNet
ClusterNet net.IPNet
+
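+ // KPKI is the etcd-backed Kubernetes PKI used to issue certificates and kubeconfigs for all components.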
+ KPKI *pki.KubernetesPKI
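+ // Root is the node's localstorage root, from which data and ephemeral directories are handed out to sub-services.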
+ Root *localstorage.Root
}
-type Service struct {
- consensusService *consensus.Service
- storageService *storage.Manager
- logger *zap.Logger
-
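+// state is the runtime state of a running Kubernetes service, recreated on every Run.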
+type state struct {
apiserverLogs *logbuffer.LogBuffer
controllerManagerLogs *logbuffer.LogBuffer
schedulerLogs *logbuffer.LogBuffer
kubeletLogs *logbuffer.LogBuffer
-
- kpki *pki.KubernetesPKI
}
-func New(logger *zap.Logger, consensusService *consensus.Service, storageService *storage.Manager) *Service {
- s := &Service{
- consensusService: consensusService,
- storageService: storageService,
- logger: logger,
+type Service struct {
+ c Config
+ stateMu sync.Mutex
+ state *state
+}
+func New(c Config) *Service {
+ s := &Service{
+ c: c,
+ }
+ return s
+}
+
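+// getState returns the current runtime state under lock, or nil if Run has not been started yet.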
+func (s *Service) getState() *state {
+ s.stateMu.Lock()
+ defer s.stateMu.Unlock()
+ return s.state
+}
+
+func (s *Service) Run(ctx context.Context) error {
+ st := &state{
apiserverLogs: logbuffer.New(5000, 16384),
controllerManagerLogs: logbuffer.New(5000, 16384),
schedulerLogs: logbuffer.New(5000, 16384),
kubeletLogs: logbuffer.New(5000, 16384),
+ }
+ s.stateMu.Lock()
+ s.state = st
+ s.stateMu.Unlock()
- kpki: pki.NewKubernetes(logger.Named("pki")),
+ controllerManagerConfig, err := getPKIControllerManagerConfig(ctx, s.c.KPKI)
+ if err != nil {
+ return fmt.Errorf("could not generate controller manager pki config: %w", err)
+ }
+ controllerManagerConfig.clusterNet = s.c.ClusterNet
+ schedulerConfig, err := getPKISchedulerConfig(ctx, s.c.KPKI)
+ if err != nil {
+ return fmt.Errorf("could not generate scheduler pki config: %w", err)
}
- return s
-}
+ masterKubeconfig, err := s.c.KPKI.Kubeconfig(ctx, pki.Master)
+ if err != nil {
+ return fmt.Errorf("could not generate master kubeconfig: %w", err)
+ }
-func (s *Service) getKV() clientv3.KV {
- return s.consensusService.GetStore("kubernetes", "")
-}
+ rawClientConfig, err := clientcmd.NewClientConfigFromBytes(masterKubeconfig)
+ if err != nil {
+ return fmt.Errorf("could not generate kubernetes client config: %w", err)
+ }
-func (s *Service) NewCluster() error {
- // TODO(q3k): this needs to be passed by the caller.
- ctx := context.TODO()
- return s.kpki.EnsureAll(ctx, s.getKV())
+ clientConfig, err := rawClientConfig.ClientConfig()
+ if err != nil {
+ return fmt.Errorf("could not resolve kubernetes client config: %w", err)
+ }
+ clientSet, err := kubernetes.NewForConfig(clientConfig)
+ if err != nil {
+ return fmt.Errorf("could not generate kubernetes client: %w", err)
+ }
+
+ informerFactory := informers.NewSharedInformerFactory(clientSet, 5*time.Minute)
+
+ hostname, err := os.Hostname()
+ if err != nil {
+ return fmt.Errorf("failed to get hostname: %w", err)
+ }
+
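+ // Wire up all sub-services, handing each the storage directories it owns from the localstorage root.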
+ apiserver := &apiserverService{
+ KPKI: s.c.KPKI,
+ AdvertiseAddress: s.c.AdvertiseAddress,
+ ServiceIPRange: s.c.ServiceIPRange,
+ Output: st.apiserverLogs,
+ EphemeralConsensusDirectory: &s.c.Root.Ephemeral.Consensus,
+ }
+
+ kubelet := kubeletService{
+ NodeName: hostname,
+ ClusterDNS: nil,
+ KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
+ EphemeralDirectory: &s.c.Root.Ephemeral,
+ Output: st.kubeletLogs,
+ KPKI: s.c.KPKI,
+ }
+
+ csiPlugin := csiPluginServer{
+ KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
+ VolumesDirectory: &s.c.Root.Data.Volumes,
+ }
+
+ csiProvisioner := csiProvisionerServer{
+ NodeName: hostname,
+ Kubernetes: clientSet,
+ InformerFactory: informerFactory,
+ VolumesDirectory: &s.c.Root.Data.Volumes,
+ }
+
+ clusternet := clusternet.Service{
+ NodeName: hostname,
+ Kubernetes: clientSet,
+ ClusterNet: s.c.ClusterNet,
+ InformerFactory: informerFactory,
+ DataDirectory: &s.c.Root.Data.Kubernetes.ClusterNetworking,
+ }
+
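+ // Start all sub-services under the supervisor; failure to start any of them aborts the whole service.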
+ for _, sub := range []struct {
+ name string
+ runnable supervisor.Runnable
+ }{
+ {"apiserver", apiserver.Run},
+ {"controller-manager", runControllerManager(*controllerManagerConfig, st.controllerManagerLogs)},
+ {"scheduler", runScheduler(*schedulerConfig, st.schedulerLogs)},
+ {"kubelet", kubelet.Run},
+ {"reconciler", reconciler.Run(clientSet)},
+ {"csi-plugin", csiPlugin.Run},
+ {"csi-provisioner", csiProvisioner.Run},
+ {"clusternet", clusternet.Run},
+ } {
+ err := supervisor.Run(ctx, sub.name, sub.runnable)
+ if err != nil {
+ return fmt.Errorf("could not run sub-service %q: %w", sub.name, err)
+ }
+ }
+
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ supervisor.Signal(ctx, supervisor.SignalDone)
+ return nil
}
// GetComponentLogs grabs logs from various Kubernetes binaries
func (s *Service) GetComponentLogs(component string, n int) ([]string, error) {
+ s.stateMu.Lock()
+ defer s.stateMu.Unlock()
+ if s.state == nil {
+ return nil, errors.New("kubernetes not started yet")
+ }
+
switch component {
case "apiserver":
- return s.apiserverLogs.ReadLinesTruncated(n, "..."), nil
+ return s.state.apiserverLogs.ReadLinesTruncated(n, "..."), nil
case "controller-manager":
- return s.controllerManagerLogs.ReadLinesTruncated(n, "..."), nil
+ return s.state.controllerManagerLogs.ReadLinesTruncated(n, "..."), nil
case "scheduler":
- return s.schedulerLogs.ReadLinesTruncated(n, "..."), nil
+ return s.state.schedulerLogs.ReadLinesTruncated(n, "..."), nil
case "kubelet":
- return s.kubeletLogs.ReadLinesTruncated(n, "..."), nil
+ return s.state.kubeletLogs.ReadLinesTruncated(n, "..."), nil
default:
- return []string{}, errors.New("component not available")
+ return nil, errors.New("component not available")
}
}
// GetDebugKubeconfig issues a kubeconfig for an arbitrary given identity. Useful for debugging and testing.
-func (s *Service) GetDebugKubeconfig(ctx context.Context, request *schema.GetDebugKubeconfigRequest) (*schema.GetDebugKubeconfigResponse, error) {
- if !s.consensusService.IsReady() {
- return nil, status.Error(codes.Unavailable, "Consensus not ready yet")
- }
- ca := s.kpki.Certificates[pki.IdCA]
- debugKubeconfig, err := pki.New(ca, "", pki.Client(request.Id, request.Groups)).Kubeconfig(ctx, s.getKV())
+func (s *Service) GetDebugKubeconfig(ctx context.Context, request *apb.GetDebugKubeconfigRequest) (*apb.GetDebugKubeconfigResponse, error) {
+ ca := s.c.KPKI.Certificates[pki.IdCA]
+ debugKubeconfig, err := pki.New(ca, "", pki.Client(request.Id, request.Groups)).Kubeconfig(ctx, s.c.KPKI.KV)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "Failed to generate kubeconfig: %v", err)
}
- return &schema.GetDebugKubeconfigResponse{DebugKubeconfig: string(debugKubeconfig)}, nil
-}
-
-func (s *Service) Start() error {
- // TODO(lorenz): This creates a new supervision tree, it should instead attach to the root one. But for that SmalltownNode needs
- // to be ported to supervisor.
- supervisor.New(context.TODO(), s.logger, s.Run())
- return nil
-}
-
-func (s *Service) Run() supervisor.Runnable {
- return func(ctx context.Context) error {
- config := Config{
- AdvertiseAddress: net.IP{10, 0, 2, 15}, // Depends on networking
- ServiceIPRange: net.IPNet{ // TODO: Decide if configurable / final value
- IP: net.IP{192, 168, 188, 0},
- Mask: net.IPMask{0xff, 0xff, 0xff, 0x00}, // /24, but Go stores as a literal mask
- },
- ClusterNet: net.IPNet{
- IP: net.IP{192, 168, 188, 0},
- Mask: net.IPMask{0xff, 0xff, 0xfd, 0x00}, // /22
- },
- }
- consensusKV := s.getKV()
- apiserverConfig, err := getPKIApiserverConfig(ctx, consensusKV, s.kpki)
- if err != nil {
- return fmt.Errorf("could not generate apiserver pki config: %w", err)
- }
- apiserverConfig.advertiseAddress = config.AdvertiseAddress
- apiserverConfig.serviceIPRange = config.ServiceIPRange
- controllerManagerConfig, err := getPKIControllerManagerConfig(ctx, consensusKV, s.kpki)
- if err != nil {
- return fmt.Errorf("could not generate controller manager pki config: %w", err)
- }
- controllerManagerConfig.clusterNet = config.ClusterNet
- schedulerConfig, err := getPKISchedulerConfig(ctx, consensusKV, s.kpki)
- if err != nil {
- return fmt.Errorf("could not generate scheduler pki config: %w", err)
- }
-
- masterKubeconfig, err := s.kpki.Kubeconfig(ctx, pki.Master, consensusKV)
- if err != nil {
- return fmt.Errorf("could not generate master kubeconfig: %w", err)
- }
-
- rawClientConfig, err := clientcmd.NewClientConfigFromBytes(masterKubeconfig)
- if err != nil {
- return fmt.Errorf("could not generate kubernetes client config: %w", err)
- }
-
- clientConfig, err := rawClientConfig.ClientConfig()
- clientSet, err := kubernetes.NewForConfig(clientConfig)
- if err != nil {
- return fmt.Errorf("could not generate kubernetes client: %w", err)
- }
-
- informerFactory := informers.NewSharedInformerFactory(clientSet, 5*time.Minute)
-
- hostname, err := os.Hostname()
- if err != nil {
- return fmt.Errorf("failed to get hostname: %w", err)
- }
- err = createKubeletConfig(ctx, consensusKV, s.kpki, hostname)
- if err != nil {
- return fmt.Errorf("could not created kubelet config: %w", err)
- }
-
- key, err := clusternet.EnsureOnDiskKey()
- if err != nil {
- return fmt.Errorf("failed to ensure cluster key: %w", err)
- }
-
- for _, sub := range []struct {
- name string
- runnable supervisor.Runnable
- }{
- {"apiserver", runAPIServer(*apiserverConfig, s.apiserverLogs)},
- {"controller-manager", runControllerManager(*controllerManagerConfig, s.controllerManagerLogs)},
- {"scheduler", runScheduler(*schedulerConfig, s.schedulerLogs)},
- {"kubelet", runKubelet(&KubeletSpec{}, s.kubeletLogs)},
- {"reconciler", reconciler.Run(clientSet)},
- {"csi-plugin", runCSIPlugin(s.storageService)},
- {"pv-provisioner", runCSIProvisioner(s.storageService, clientSet, informerFactory)},
- {"clusternet", clusternet.Run(informerFactory, clusterNet, clientSet, key)},
- } {
- err := supervisor.Run(ctx, sub.name, sub.runnable)
- if err != nil {
- return fmt.Errorf("could not run sub-service %q: %w", sub.name, err)
- }
- }
-
- supervisor.Signal(ctx, supervisor.SignalHealthy)
- supervisor.Signal(ctx, supervisor.SignalDone)
- return nil
- }
+ return &apb.GetDebugKubeconfigResponse{DebugKubeconfig: string(debugKubeconfig)}, nil
}
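With this change, spawning Kubernetes no longer builds its own supervision tree: the caller constructs a Config and hands Service.Run to an existing supervisor. A minimal sketch, assuming the caller has already initialized the localstorage root and the Kubernetes PKI (the variable names below are hypothetical; their construction is outside this diff):

    svc := kubernetes.New(kubernetes.Config{
        AdvertiseAddress: advertiseIP,  // hypothetical: provided by the network service
        ServiceIPRange:   serviceRange, // hypothetical
        ClusterNet:       clusterNet,   // hypothetical
        KPKI:             kpki,         // *pki.KubernetesPKI, initialized elsewhere
        Root:             root,         // *localstorage.Root, already started
    })
    if err := supervisor.Run(ctx, "kubernetes", svc.Run); err != nil {
        return fmt.Errorf("could not run kubernetes service: %w", err)
    }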
diff --git a/core/internal/localstorage/declarative/placement_local.go b/core/internal/localstorage/declarative/placement_local.go
index 1ebdba5..82b6a71 100644
--- a/core/internal/localstorage/declarative/placement_local.go
+++ b/core/internal/localstorage/declarative/placement_local.go
@@ -20,6 +20,7 @@
"fmt"
"io/ioutil"
"os"
+ "sync"
"golang.org/x/sys/unix"
)
@@ -31,8 +32,9 @@
}
type FSPlacement struct {
- root *FSRoot
- path string
+ root *FSRoot
+ path string
+ writeLock sync.Mutex
}
func (f *FSPlacement) FullPath() string {
@@ -60,18 +62,17 @@
// Write performs an atomic file write, via a temporary file.
func (f *FSPlacement) Write(d []byte, mode os.FileMode) error {
- tmp, err := ioutil.TempFile("", "")
- if err != nil {
- return fmt.Errorf("temporary file creation failed: %w", err)
- }
- defer tmp.Close()
- defer os.Remove(tmp.Name())
- if _, err := tmp.Write(d); err != nil {
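+ // The temporary file name is derived from the target path, so concurrent writes to the same placement must be serialized.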
+ f.writeLock.Lock()
+ defer f.writeLock.Unlock()
+
+ // TODO(q3k): ensure that these do not collide with an existing sibling file, or generate this suffix randomly.
+ tmp := f.FullPath() + ".__smalltown_tmp"
+ defer os.Remove(tmp)
+ if err := ioutil.WriteFile(tmp, d, mode); err != nil {
return fmt.Errorf("temporary file write failed: %w", err)
}
- tmp.Close()
- if err := unix.Rename(tmp.Name(), f.FullPath()); err != nil {
+ if err := unix.Rename(tmp, f.FullPath()); err != nil {
return fmt.Errorf("renaming target file failed: %w", err)
}
diff --git a/core/internal/localstorage/directory_data.go b/core/internal/localstorage/directory_data.go
index ae842d9..60f030d 100644
--- a/core/internal/localstorage/directory_data.go
+++ b/core/internal/localstorage/directory_data.go
@@ -21,6 +21,8 @@
"os"
"os/exec"
+ "git.monogon.dev/source/nexantic.git/core/internal/localstorage/declarative"
+
"golang.org/x/sys/unix"
"git.monogon.dev/source/nexantic.git/core/internal/localstorage/crypt"
@@ -114,6 +116,22 @@
return nil, fmt.Errorf("mounting: %w", err)
}
+ // TODO(q3k): do this automatically?
+ for _, d := range []declarative.DirectoryPlacement{
+ d.Etcd, d.Etcd.Data, d.Etcd.PeerPKI,
+ d.Containerd,
+ d.Kubernetes,
+ d.Kubernetes.Kubelet, d.Kubernetes.Kubelet.PKI, d.Kubernetes.Kubelet.Plugins, d.Kubernetes.Kubelet.PluginsRegistry,
+ d.Kubernetes.ClusterNetworking,
+ d.Node,
+ d.Volumes,
+ } {
+ err := d.MkdirAll(0700)
+ if err != nil {
+ return nil, fmt.Errorf("creating directory failed: %w", err)
+ }
+ }
+
if err := unlock.Write(localUnlockBlob, 0600); err != nil {
return nil, fmt.Errorf("writing unlock blob: %w", err)
}
diff --git a/core/internal/localstorage/directory_root.go b/core/internal/localstorage/directory_root.go
index 324350a..128cb49 100644
--- a/core/internal/localstorage/directory_root.go
+++ b/core/internal/localstorage/directory_root.go
@@ -24,6 +24,7 @@
"golang.org/x/sys/unix"
"git.monogon.dev/source/nexantic.git/core/internal/localstorage/crypt"
+ "git.monogon.dev/source/nexantic.git/core/internal/localstorage/declarative"
)
func (r *Root) Start(ctx context.Context) error {
@@ -48,5 +49,35 @@
r.Data.canMount = true
+ if err := os.Mkdir(r.Tmp.FullPath(), 0777); err != nil {
+ return fmt.Errorf("making /tmp directory: %w", err)
+ }
+
+ if err := unix.Mount("tmpfs", r.Tmp.FullPath(), "tmpfs", unix.MS_NOEXEC|unix.MS_NODEV, ""); err != nil {
+ return fmt.Errorf("mounting /tmp: %w", err)
+ }
+
+ // TODO(q3k): do this automatically?
+ for _, d := range []declarative.DirectoryPlacement{
+ r.Etc,
+ r.Ephemeral,
+ r.Ephemeral.Consensus,
+ r.Ephemeral.Containerd, r.Ephemeral.Containerd.Tmp, r.Ephemeral.Containerd.RunSC, r.Ephemeral.Containerd.IPAM,
+ r.Ephemeral.FlexvolumePlugins,
+ } {
+ err := d.MkdirAll(0700)
+ if err != nil {
+ return fmt.Errorf("creating directory failed: %w", err)
+ }
+ }
+
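+ // The directories above are created 0700; widen the containerd runtime paths to 0755.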
+ for _, d := range []declarative.DirectoryPlacement{
+ r.Ephemeral, r.Ephemeral.Containerd, r.Ephemeral.Containerd.Tmp,
+ } {
+ if err := os.Chmod(d.FullPath(), 0755); err != nil {
+ return fmt.Errorf("failed to chmod containerd tmp path: %w", err)
+ }
+ }
+
return nil
}
diff --git a/core/internal/localstorage/storage.go b/core/internal/localstorage/storage.go
index 91153cf..fae92e4 100644
--- a/core/internal/localstorage/storage.go
+++ b/core/internal/localstorage/storage.go
@@ -39,10 +39,16 @@
type Root struct {
declarative.Directory
- ESP ESPDirectory `dir:"esp"`
- Data DataDirectory `dir:"data"`
- Etc EtcDirectory `dir:"etc"`
+ // UEFI ESP partition, mounted from plaintext storage.
+ ESP ESPDirectory `dir:"esp"`
+ // Persistent Data partition, mounted from encrypted and authenticated storage.
+ Data DataDirectory `dir:"data"`
+ // FHS-standard /etc directory, contains /etc/hosts, /etc/machine-id, and other compatibility files.
+ Etc EtcDirectory `dir:"etc"`
+ // Ephemeral runtime data, stored on a tmpfs: sockets, temporary config files, etc.
Ephemeral EphemeralDirectory `dir:"ephemeral"`
+ // FHS-standard /tmp directory, used by ioutil.TempFile.
+ Tmp TmpDirectory `dir:"tmp"`
}
type PKIDirectory struct {
@@ -78,9 +84,11 @@
// mounted is set by DataDirectory when it is mounted. It ensures it's only mounted once.
mounted bool
- Etcd DataEtcdDirectory `dir:"etcd"`
- Node PKIDirectory `dir:"node_pki"`
- Volumes declarative.Directory `dir:"volumes"`
+ Containerd declarative.Directory `dir:"containerd"`
+ Etcd DataEtcdDirectory `dir:"etcd"`
+ Kubernetes DataKubernetesDirectory `dir:"kubernetes"`
+ Node PKIDirectory `dir:"node_pki"`
+ Volumes DataVolumesDirectory `dir:"volumes"`
}
type DataEtcdDirectory struct {
@@ -90,18 +98,64 @@
Data declarative.Directory `dir:"data"`
}
+type DataKubernetesDirectory struct {
+ declarative.Directory
+ ClusterNetworking DataKubernetesClusterNetworkingDirectory `dir:"clusternet"`
+ Kubelet DataKubernetesKubeletDirectory `dir:"kubelet"`
+}
+
+type DataKubernetesClusterNetworkingDirectory struct {
+ declarative.Directory
+ Key declarative.File `file:"private.key"`
+}
+
+type DataKubernetesKubeletDirectory struct {
+ declarative.Directory
+ Kubeconfig declarative.File `file:"kubeconfig"`
+ PKI PKIDirectory `dir:"pki"`
+
+ Plugins struct {
+ declarative.Directory
+ VFS declarative.File `file:"com.smalltown.vfs.sock"`
+ } `dir:"plugins"`
+
+ PluginsRegistry struct {
+ declarative.Directory
+ VFSReg declarative.File `file:"com.smalltown.vfs-reg.sock"`
+ } `dir:"plugins_registry"`
+}
+
+type DataVolumesDirectory struct {
+ declarative.Directory
+}
+
type EtcDirectory struct {
declarative.Directory
Hosts declarative.File `file:"hosts"`
- MachineID declarative.File `file:"machine_id"`
+ MachineID declarative.File `file:"machine-id"`
}
type EphemeralDirectory struct {
declarative.Directory
- Consensus EphemeralConsensusDirectory `dir:"consensus"`
+ Consensus EphemeralConsensusDirectory `dir:"consensus"`
+ Containerd EphemeralContainerdDirectory `dir:"containerd"`
+ FlexvolumePlugins declarative.Directory `dir:"flexvolume_plugins"`
}
type EphemeralConsensusDirectory struct {
declarative.Directory
ClientSocket declarative.File `file:"client.sock"`
}
+
+type EphemeralContainerdDirectory struct {
+ declarative.Directory
+ ClientSocket declarative.File `file:"client.sock"`
+ RunSCLogsFIFO declarative.File `file:"runsc-logs.fifo"`
+ Tmp declarative.Directory `dir:"tmp"`
+ RunSC declarative.Directory `dir:"runsc"`
+ IPAM declarative.Directory `dir:"ipam"`
+}
+
+type TmpDirectory struct {
+ declarative.Directory
+}
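As an illustration of the declarative tree, the containerd client socket configured in config.toml now resolves through placements rather than hardcoded strings, roughly as follows (a sketch, assuming the root is placed at the filesystem root):

    // root is a *localstorage.Root placed at "/".
    sock := root.Ephemeral.Containerd.ClientSocket.FullPath()
    // sock == "/ephemeral/containerd/client.sock"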
diff --git a/core/internal/storage/BUILD.bazel b/core/internal/storage/BUILD.bazel
deleted file mode 100644
index 545f9a9..0000000
--- a/core/internal/storage/BUILD.bazel
+++ /dev/null
@@ -1,21 +0,0 @@
-load("@io_bazel_rules_go//go:def.bzl", "go_library")
-
-go_library(
- name = "go_default_library",
- srcs = [
- "blockdev.go",
- "data.go",
- "find.go",
- "storage.go",
- ],
- importpath = "git.monogon.dev/source/nexantic.git/core/internal/storage",
- visibility = ["//:__subpackages__"],
- deps = [
- "//core/pkg/devicemapper:go_default_library",
- "//core/pkg/sysfs:go_default_library",
- "//core/pkg/tpm:go_default_library",
- "@com_github_rekby_gpt//:go_default_library",
- "@org_golang_x_sys//unix:go_default_library",
- "@org_uber_go_zap//:go_default_library",
- ],
-)
diff --git a/core/internal/storage/blockdev.go b/core/internal/storage/blockdev.go
deleted file mode 100644
index 660abe6..0000000
--- a/core/internal/storage/blockdev.go
+++ /dev/null
@@ -1,149 +0,0 @@
-// Copyright 2020 The Monogon Project Authors.
-//
-// SPDX-License-Identifier: Apache-2.0
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package storage
-
-import (
- "encoding/binary"
- "encoding/hex"
- "fmt"
- "os"
- "syscall"
-
- "git.monogon.dev/source/nexantic.git/core/pkg/devicemapper"
-
- "golang.org/x/sys/unix"
-)
-
-func readDataSectors(path string) (uint64, error) {
- integrityPartition, err := os.Open(path)
- if err != nil {
- return 0, err
- }
- defer integrityPartition.Close()
- // Based on structure defined in
- // https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/drivers/md/dm-integrity.c#n59
- if _, err := integrityPartition.Seek(16, 0); err != nil {
- return 0, err
- }
- var providedDataSectors uint64
- if err := binary.Read(integrityPartition, binary.LittleEndian, &providedDataSectors); err != nil {
- return 0, err
- }
- return providedDataSectors, nil
-}
-
-// MapEncryptedBlockDevice maps an encrypted device (node) at baseName to a
-// decrypted device at /dev/$name using the given encryptionKey
-func MapEncryptedBlockDevice(name string, baseName string, encryptionKey []byte) error {
- integritySectors, err := readDataSectors(baseName)
- if err != nil {
- return fmt.Errorf("failed to read the number of usable sectors on the integrity device: %w", err)
- }
-
- integrityDevName := fmt.Sprintf("/dev/%v-integrity", name)
- integrityDMName := fmt.Sprintf("%v-integrity", name)
- integrityDev, err := devicemapper.CreateActiveDevice(integrityDMName, []devicemapper.Target{
- devicemapper.Target{
- Length: integritySectors,
- Type: "integrity",
- Parameters: fmt.Sprintf("%v 0 28 J 1 journal_sectors:1024", baseName),
- },
- })
- if err != nil {
- return fmt.Errorf("failed to create Integrity device: %w", err)
- }
- if err := unix.Mknod(integrityDevName, 0600|unix.S_IFBLK, int(integrityDev)); err != nil {
- unix.Unlink(integrityDevName)
- devicemapper.RemoveDevice(integrityDMName)
- return fmt.Errorf("failed to create integrity device node: %w", err)
- }
-
- cryptDevName := fmt.Sprintf("/dev/%v", name)
- cryptDev, err := devicemapper.CreateActiveDevice(name, []devicemapper.Target{
- devicemapper.Target{
- Length: integritySectors,
- Type: "crypt",
- Parameters: fmt.Sprintf("capi:gcm(aes)-random %v 0 %v 0 1 integrity:28:aead", hex.EncodeToString(encryptionKey), integrityDevName),
- },
- })
- if err != nil {
- unix.Unlink(integrityDevName)
- devicemapper.RemoveDevice(integrityDMName)
- return fmt.Errorf("failed to create crypt device: %w", err)
- }
- if err := unix.Mknod(cryptDevName, 0600|unix.S_IFBLK, int(cryptDev)); err != nil {
- unix.Unlink(cryptDevName)
- devicemapper.RemoveDevice(name)
-
- unix.Unlink(integrityDevName)
- devicemapper.RemoveDevice(integrityDMName)
- return fmt.Errorf("failed to create crypt device node: %w", err)
- }
- return nil
-}
-
-// InitializeEncryptedBlockDevice initializes a new encrypted block device. This can take a long
-// time since all bytes on the mapped block device need to be zeroed.
-func InitializeEncryptedBlockDevice(name, baseName string, encryptionKey []byte) error {
- integrityPartition, err := os.OpenFile(baseName, os.O_WRONLY, 0)
- if err != nil {
- return err
- }
- defer integrityPartition.Close()
- zeroed512BBuf := make([]byte, 4096*128)
- if _, err := integrityPartition.Write(zeroed512BBuf); err != nil {
- return fmt.Errorf("failed to wipe header: %w", err)
- }
- integrityPartition.Close()
-
- integrityDMName := fmt.Sprintf("%v-integrity", name)
- _, err = devicemapper.CreateActiveDevice(integrityDMName, []devicemapper.Target{
- devicemapper.Target{
- Length: 1,
- Type: "integrity",
- Parameters: fmt.Sprintf("%v 0 28 J 1 journal_sectors:1024", baseName),
- },
- })
- if err != nil {
- return fmt.Errorf("failed to create discovery integrity device: %w", err)
- }
- if err := devicemapper.RemoveDevice(integrityDMName); err != nil {
- return fmt.Errorf("failed to remove discovery integrity device: %w", err)
- }
-
- if err := MapEncryptedBlockDevice(name, baseName, encryptionKey); err != nil {
- return err
- }
-
- blkdev, err := os.OpenFile(fmt.Sprintf("/dev/%v", name), unix.O_DIRECT|os.O_WRONLY|os.O_SYNC, 0000)
- if err != nil {
- return fmt.Errorf("failed to open new encrypted device for zeroing: %w", err)
- }
- defer blkdev.Close()
- blockSize, err := unix.IoctlGetUint32(int(blkdev.Fd()), unix.BLKSSZGET)
- zeroedBuf := make([]byte, blockSize*10000) // Make it faster
- for {
- _, err := blkdev.Write(zeroedBuf)
- if e, ok := err.(*os.PathError); ok && e.Err == syscall.ENOSPC {
- break
- }
- if err != nil {
- return fmt.Errorf("failed to zero-initalize new encrypted device: %w", err)
- }
- }
- return nil
-}
diff --git a/core/internal/storage/data.go b/core/internal/storage/data.go
deleted file mode 100644
index 337aae0..0000000
--- a/core/internal/storage/data.go
+++ /dev/null
@@ -1,192 +0,0 @@
-// Copyright 2020 The Monogon Project Authors.
-//
-// SPDX-License-Identifier: Apache-2.0
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package storage
-
-import (
- "fmt"
- "io/ioutil"
- "os"
- "os/exec"
- "path/filepath"
- "sync"
-
- "go.uber.org/zap"
- "golang.org/x/sys/unix"
-
- "git.monogon.dev/source/nexantic.git/core/pkg/tpm"
-)
-
-const (
- dataMountPath = "/data"
- espMountPath = "/esp"
- espDataPath = espMountPath + "/EFI/smalltown"
- etcdSealedKeyLocation = espDataPath + "/data-key.bin"
-)
-
-type Manager struct {
- logger *zap.Logger
- dataReady bool
- initializationError error
- mutex sync.RWMutex
-}
-
-func Initialize(logger *zap.Logger) (*Manager, error) {
- if err := FindPartitions(); err != nil {
- return nil, err
- }
-
- if err := os.Mkdir("/esp", 0755); err != nil {
- return nil, err
- }
-
- // We're mounting ESP sync for reliability, this lowers our chances of getting half-written files
- if err := unix.Mount(ESPDevicePath, espMountPath, "vfat", unix.MS_NOEXEC|unix.MS_NODEV|unix.MS_SYNC, ""); err != nil {
- return nil, err
- }
-
- manager := &Manager{
- logger: logger,
- dataReady: false,
- }
-
- manager.mutex.Lock()
- defer manager.mutex.Unlock()
-
- return manager, nil
-}
-
-var keySize uint16 = 256 / 8
-
-// MountData mounts the Smalltown data partition with the given global unlock key. It automatically
-// unseals the local unlock key from the TPM.
-func (s *Manager) MountData(globalUnlockKey []byte) error {
- localPath, err := s.GetPathInPlace(PlaceESP, "local_unlock.bin")
- if err != nil {
- return fmt.Errorf("failed to find ESP mount: %w", err)
- }
- localUnlockBlob, err := ioutil.ReadFile(localPath)
- if err != nil {
- return fmt.Errorf("failed to read local unlock file from ESP: %w", err)
- }
- localUnlockKey, err := tpm.Unseal(localUnlockBlob)
- if err != nil {
- return fmt.Errorf("failed to unseal local unlock key: %w", err)
- }
-
- key := make([]byte, keySize)
- for i := uint16(0); i < keySize; i++ {
- key[i] = localUnlockKey[i] ^ globalUnlockKey[i]
- }
-
- if err := MapEncryptedBlockDevice("data", SmalltownDataCryptPath, key); err != nil {
- return err
- }
- if err := s.mountData(); err != nil {
- return err
- }
- s.mutex.Lock()
- s.dataReady = true
- s.mutex.Unlock()
- s.logger.Info("Mounted encrypted storage")
- return nil
-}
-
-// InitializeData initializes the Smalltown data partition and returns the global unlock key. It seals
-// the local portion into the TPM and stores the blob on the ESP. This is a potentially slow
-// operation since it touches the whole partition.
-func (s *Manager) InitializeData() ([]byte, error) {
- localUnlockKey, err := tpm.GenerateSafeKey(keySize)
- if err != nil {
- return []byte{}, fmt.Errorf("failed to generate safe key: %w", err)
- }
- globalUnlockKey, err := tpm.GenerateSafeKey(keySize)
- if err != nil {
- return []byte{}, fmt.Errorf("failed to generate safe key: %w", err)
- }
-
- localUnlockBlob, err := tpm.Seal(localUnlockKey, tpm.SecureBootPCRs)
- if err != nil {
- return []byte{}, fmt.Errorf("failed to seal local unlock key: %w", err)
- }
-
- // The actual key is generated by XORing together the localUnlockKey and the globalUnlockKey
- // This provides us with a mathematical guarantee that the resulting key cannot be recovered
- // whithout knowledge of both parts.
- key := make([]byte, keySize)
- for i := uint16(0); i < keySize; i++ {
- key[i] = localUnlockKey[i] ^ globalUnlockKey[i]
- }
-
- if err := InitializeEncryptedBlockDevice("data", SmalltownDataCryptPath, key); err != nil {
- s.logger.Error("Failed to initialize encrypted block device", zap.Error(err))
- return []byte{}, fmt.Errorf("failed to initialize encrypted block device: %w", err)
- }
- mkfsCmd := exec.Command("/bin/mkfs.xfs", "-qf", "/dev/data")
- if _, err := mkfsCmd.Output(); err != nil {
- s.logger.Error("Failed to format encrypted block device", zap.Error(err))
- return []byte{}, fmt.Errorf("failed to format encrypted block device: %w", err)
- }
-
- if err := s.mountData(); err != nil {
- return []byte{}, err
- }
-
- s.mutex.Lock()
- s.dataReady = true
- s.mutex.Unlock()
-
- localPath, err := s.GetPathInPlace(PlaceESP, "local_unlock.bin")
- if err != nil {
- return []byte{}, fmt.Errorf("failed to find ESP mount: %w", err)
- }
- if err := ioutil.WriteFile(localPath, localUnlockBlob, 0600); err != nil {
- return []byte{}, fmt.Errorf("failed to write local unlock file to ESP: %w", err)
- }
-
- s.logger.Info("Initialized encrypted storage")
- return globalUnlockKey, nil
-}
-
-func (s *Manager) mountData() error {
- if err := os.Mkdir("/data", 0755); err != nil {
- return err
- }
-
- if err := unix.Mount("/dev/data", "/data", "xfs", unix.MS_NOEXEC|unix.MS_NODEV, "pquota"); err != nil {
- return err
- }
- return nil
-}
-
-// GetPathInPlace returns a path in the given place
-// It may return ErrNotInitialized if the place you're trying to access
-// is not initialized or ErrUnknownPlace if the place is not known
-func (s *Manager) GetPathInPlace(place DataPlace, path string) (string, error) {
- s.mutex.RLock()
- defer s.mutex.RUnlock()
- switch place {
- case PlaceESP:
- return filepath.Join(espDataPath, path), nil
- case PlaceData:
- if s.dataReady {
- return filepath.Join(dataMountPath, path), nil
- }
- return "", ErrNotInitialized
- default:
- return "", ErrUnknownPlace
- }
-}
diff --git a/core/internal/storage/find.go b/core/internal/storage/find.go
deleted file mode 100644
index 8d83510..0000000
--- a/core/internal/storage/find.go
+++ /dev/null
@@ -1,90 +0,0 @@
-// Copyright 2020 The Monogon Project Authors.
-//
-// SPDX-License-Identifier: Apache-2.0
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package storage
-
-import (
- "fmt"
- "io/ioutil"
- "os"
- "path/filepath"
- "strconv"
-
- "git.monogon.dev/source/nexantic.git/core/pkg/sysfs"
-
- "github.com/rekby/gpt"
- "golang.org/x/sys/unix"
-)
-
-// EFIPartitionType is the standardized partition type value for the EFI ESP partition. The human readable GUID is C12A7328-F81F-11D2-BA4B-00A0C93EC93B.
-var EFIPartitionType = gpt.PartType{0x28, 0x73, 0x2a, 0xc1, 0x1f, 0xf8, 0xd2, 0x11, 0xba, 0x4b, 0x00, 0xa0, 0xc9, 0x3e, 0xc9, 0x3b}
-
-// SmalltownDataPartitionType is the partition type value for a Smalltown data partition. The human-readable GUID is 9eeec464-6885-414a-b278-4305c51f7966.
-var SmalltownDataPartitionType = gpt.PartType{0x64, 0xc4, 0xee, 0x9e, 0x85, 0x68, 0x4a, 0x41, 0xb2, 0x78, 0x43, 0x05, 0xc5, 0x1f, 0x79, 0x66}
-
-var ESPDevicePath = "/dev/esp"
-var SmalltownDataCryptPath = "/dev/data-crypt"
-
-// FindPartitions looks for the ESP and the Smalltown data partition and maps them to ESPDevicePath and
-// SmalltownDataCryptPath respectively. This doesn't fail if it doesn't find the partitions, only if
-// something goes catastrophically wrong.
-func FindPartitions() error {
- blockdevNames, err := ioutil.ReadDir("/sys/class/block")
- if err != nil {
- return fmt.Errorf("failed to read sysfs block class: %w", err)
- }
- for _, blockdevName := range blockdevNames {
- ueventData, err := sysfs.ReadUevents(filepath.Join("/sys/class/block", blockdevName.Name(), "uevent"))
- if err != nil {
- return fmt.Errorf("failed to read uevent for block device %v: %w", blockdevName.Name(), err)
- }
- if ueventData["DEVTYPE"] == "disk" {
- majorDev, err := strconv.Atoi(ueventData["MAJOR"])
- if err != nil {
- return fmt.Errorf("failed to convert uevent: %w", err)
- }
- devNodeName := fmt.Sprintf("/dev/%v", ueventData["DEVNAME"])
- blkdev, err := os.Open(devNodeName)
- if err != nil {
- return fmt.Errorf("failed to open block device %v: %w", devNodeName, err)
- }
- defer blkdev.Close()
- blockSize, err := unix.IoctlGetUint32(int(blkdev.Fd()), unix.BLKSSZGET)
- if err != nil {
- continue // This is not a regular block device
- }
- blkdev.Seek(int64(blockSize), 0)
- table, err := gpt.ReadTable(blkdev, uint64(blockSize))
- if err != nil {
- // Probably just not a GPT-partitioned disk
- continue
- }
- for partNumber, part := range table.Partitions {
- if part.Type == EFIPartitionType {
- if err := unix.Mknod(ESPDevicePath, 0600|unix.S_IFBLK, int(unix.Mkdev(uint32(majorDev), uint32(partNumber+1)))); err != nil {
- return fmt.Errorf("failed to create device node for ESP partition: %w", err)
- }
- }
- if part.Type == SmalltownDataPartitionType {
- if err := unix.Mknod(SmalltownDataCryptPath, 0600|unix.S_IFBLK, int(unix.Mkdev(uint32(majorDev), uint32(partNumber+1)))); err != nil {
- return fmt.Errorf("failed to create device node for Smalltown encrypted data partition: %w", err)
- }
- }
- }
- }
- }
- return nil
-}
diff --git a/core/internal/storage/storage.go b/core/internal/storage/storage.go
deleted file mode 100644
index d0743e8..0000000
--- a/core/internal/storage/storage.go
+++ /dev/null
@@ -1,37 +0,0 @@
-// Copyright 2020 The Monogon Project Authors.
-//
-// SPDX-License-Identifier: Apache-2.0
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package storage
-
-import "errors"
-
-type DataPlace uint32
-
-const (
- PlaceESP DataPlace = 0
- PlaceData = 1
-)
-
-var (
- // ErrNotInitialized will be returned when trying to access a place that's not yet initialized
- ErrNotInitialized = errors.New("this place is not initialized")
- // ErrUnknownPlace will be returned when trying to access a place that's not known
- ErrUnknownPlace = errors.New("this place is not known")
-)
-
-type StorageManager interface {
- GetPathInPlace(place DataPlace, path string) (string, error)
-}