m/n/kubernetes: use node clusternet to submit cluster networking routes
This completes the work on using the new cluster networking service from
Kubernetes, thereby allowing non-worker nodes to participate in cluster
networking.
Change-Id: I7f3759186d7c8cc49833be29963f82a1714d293e
Reviewed-on: https://review.monogon.dev/c/monogon/+/1418
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/node/core/roleserve/BUILD.bazel b/metropolis/node/core/roleserve/BUILD.bazel
index 8aa4fc0..1818bf4 100644
--- a/metropolis/node/core/roleserve/BUILD.bazel
+++ b/metropolis/node/core/roleserve/BUILD.bazel
@@ -8,6 +8,7 @@
"value_clustermembership.go",
"value_kubernetes.go",
"value_node.go",
+ "worker_clusternet.go",
"worker_controlplane.go",
"worker_heartbeat.go",
"worker_kubernetes.go",
@@ -19,6 +20,7 @@
visibility = ["//visibility:public"],
deps = [
"//metropolis/node",
+ "//metropolis/node/core/clusternet",
"//metropolis/node/core/consensus",
"//metropolis/node/core/curator",
"//metropolis/node/core/curator/proto/api",
diff --git a/metropolis/node/core/roleserve/roleserve.go b/metropolis/node/core/roleserve/roleserve.go
index a1969bf..86b5b5a 100644
--- a/metropolis/node/core/roleserve/roleserve.go
+++ b/metropolis/node/core/roleserve/roleserve.go
@@ -44,6 +44,7 @@
"crypto/ed25519"
common "source.monogon.dev/metropolis/node"
+ "source.monogon.dev/metropolis/node/core/clusternet"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/network"
@@ -83,6 +84,7 @@
KubernetesStatus memory.Value[*KubernetesStatus]
bootstrapData memory.Value[*bootstrapData]
localRoles memory.Value[*cpb.NodeRoles]
+ podNetwork memory.Value[*clusternet.Prefixes]
controlPlane *workerControlPlane
statusPush *workerStatusPush
@@ -90,6 +92,7 @@
kubernetes *workerKubernetes
rolefetch *workerRoleFetch
nodeMgmt *workerNodeMgmt
+ clusternet *workerClusternet
}
// New creates a Role Server services from a Config.
@@ -126,6 +129,7 @@
clusterMembership: &s.ClusterMembership,
kubernetesStatus: &s.KubernetesStatus,
+ podNetwork: &s.podNetwork,
}
s.rolefetch = &workerRoleFetch{
@@ -138,6 +142,12 @@
clusterMembership: &s.ClusterMembership,
logTree: s.LogTree,
}
+ s.clusternet = &workerClusternet{
+ storageRoot: s.StorageRoot,
+
+ clusterMembership: &s.ClusterMembership,
+ podNetwork: &s.podNetwork,
+ }
return s
}
@@ -198,6 +208,7 @@
supervisor.Run(ctx, "heartbeat", s.heartbeat.run)
supervisor.Run(ctx, "rolefetch", s.rolefetch.run)
supervisor.Run(ctx, "nodemgmt", s.nodeMgmt.run)
+ supervisor.Run(ctx, "clusternet", s.clusternet.run)
supervisor.Signal(ctx, supervisor.SignalHealthy)
<-ctx.Done()
diff --git a/metropolis/node/core/roleserve/worker_clusternet.go b/metropolis/node/core/roleserve/worker_clusternet.go
new file mode 100644
index 0000000..d80dea1
--- /dev/null
+++ b/metropolis/node/core/roleserve/worker_clusternet.go
@@ -0,0 +1,51 @@
+package roleserve
+
+import (
+ "context"
+ "net"
+
+ "source.monogon.dev/metropolis/node/core/clusternet"
+ "source.monogon.dev/metropolis/node/core/localstorage"
+ "source.monogon.dev/metropolis/pkg/event/memory"
+ "source.monogon.dev/metropolis/pkg/supervisor"
+
+ ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+)
+// workerClusternet is the roleserve worker which runs clusternet.Service.
+type workerClusternet struct {
+ storageRoot *localstorage.Root // provides the ClusterNetworking data directory
+
+ // clusterMembership will be read by run to obtain a curator connection.
+ clusterMembership *memory.Value[*ClusterMembership]
+ // podNetwork will be read, via clusternet's LocalKubernetesPodNetwork.
+ podNetwork *memory.Value[*clusternet.Prefixes]
+}
+// run waits for cluster membership, dials the curator, then runs clusternet.
+func (s *workerClusternet) run(ctx context.Context) error {
+ w := s.clusterMembership.Watch()
+ defer w.Close()
+ supervisor.Logger(ctx).Infof("Waiting for cluster membership...")
+ cm, err := w.Get(ctx, FilterHome())
+ if err != nil {
+ return err
+ }
+ supervisor.Logger(ctx).Infof("Got cluster membership, starting...")
+ // Dial the curator via the membership; the conn is closed when run returns.
+ conn, err := cm.DialCurator()
+ if err != nil {
+ return err
+ }
+ defer conn.Close()
+ cur := ipb.NewCuratorClient(conn)
+ // NOTE(review): ClusterNet is hard-coded to 10.0.0.0/16; confirm intended.
+ svc := clusternet.Service{
+ Curator: cur,
+ ClusterNet: net.IPNet{
+ IP: []byte{10, 0, 0, 0},
+ Mask: net.IPMask{255, 255, 0, 0},
+ },
+ DataDirectory: &s.storageRoot.Data.Kubernetes.ClusterNetworking,
+ LocalKubernetesPodNetwork: s.podNetwork,
+ }
+ return svc.Run(ctx)
+}
diff --git a/metropolis/node/core/roleserve/worker_kubernetes.go b/metropolis/node/core/roleserve/worker_kubernetes.go
index f8e8438..da2aa9f 100644
--- a/metropolis/node/core/roleserve/worker_kubernetes.go
+++ b/metropolis/node/core/roleserve/worker_kubernetes.go
@@ -5,6 +5,7 @@
"fmt"
"net"
+ "source.monogon.dev/metropolis/node/core/clusternet"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/network"
"source.monogon.dev/metropolis/node/kubernetes"
@@ -32,6 +33,7 @@
localRoles *memory.Value[*cpb.NodeRoles]
clusterMembership *memory.Value[*ClusterMembership]
kubernetesStatus *memory.Value[*KubernetesStatus]
+ podNetwork *memory.Value[*clusternet.Prefixes]
}
// kubernetesStartup is used internally to provide a reduced (as in MapReduce
@@ -167,7 +169,6 @@
return fmt.Errorf("failed to start containerd service: %w", err)
}
-
controller := kubernetes.NewController(kubernetes.ConfigController{
Node: &d.membership.credentials.Node,
ServiceIPRange: serviceIPRange,
@@ -176,6 +177,7 @@
KPKI: pki,
Root: s.storageRoot,
Network: s.network,
+ PodNetwork: s.podNetwork,
})
// Start Kubernetes.
if err := supervisor.Run(ctx, "run", controller.Run); err != nil {