blob: f9d839fce2f3fae3dd63b1ef2d471946acb092e7 [file] [log] [blame]
Tim Windelschmidt6d33a432025-02-04 14:34:25 +01001// Copyright The Monogon Project Authors.
2// SPDX-License-Identifier: Apache-2.0
3
Jan Schärd20ddcc2024-05-08 14:18:29 +02004package reconciler
5
6import (
7 "context"
8 "testing"
9 "time"
10
11 "go.etcd.io/etcd/tests/v3/integration"
12 "google.golang.org/protobuf/proto"
13 "k8s.io/client-go/kubernetes/fake"
14
15 "source.monogon.dev/metropolis/node/core/consensus/client"
16 "source.monogon.dev/metropolis/node/core/curator"
17 ppb "source.monogon.dev/metropolis/node/core/curator/proto/private"
Jan Schärd20ddcc2024-05-08 14:18:29 +020018 cpb "source.monogon.dev/metropolis/proto/common"
19 mversion "source.monogon.dev/metropolis/version"
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020020 "source.monogon.dev/osbase/supervisor"
Jan Schärd20ddcc2024-05-08 14:18:29 +020021 "source.monogon.dev/version"
22 vpb "source.monogon.dev/version/spec"
23)
24
25// TestMinimumReleasesNotAboveMetropolisRelease tests that minimum releases
26// are not above the metropolis release itself, because that would cause
27// things to get stuck.
28func TestMinimumReleasesNotAboveMetropolisRelease(t *testing.T) {
29 if version.ReleaseLessThan(mversion.Version.Release, minReconcilerRelease) {
30 t.Errorf("Metropolis release %s is below the minimum reconciler release %s",
31 version.Semver(mversion.Version),
32 version.Release(minReconcilerRelease),
33 )
34 }
35 if version.ReleaseLessThan(mversion.Version.Release, minApiserverRelease) {
36 t.Errorf("Metropolis release %s is below the minimum apiserver release %s",
37 version.Semver(mversion.Version),
38 version.Release(minApiserverRelease),
39 )
40 }
41}
42
43// startEtcd creates an etcd cluster and client for testing.
44func startEtcd(t *testing.T) client.Namespaced {
45 t.Helper()
46 // Start a single-node etcd cluster.
47 integration.BeforeTestExternal(t)
48 cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
49 t.Cleanup(func() {
50 cluster.Terminate(t)
51 })
52 // Create etcd client to test cluster.
53 curEtcd, _ := client.NewLocal(cluster.Client(0)).Sub("curator")
54 return curEtcd
55}
56
57func setStatus(t *testing.T, cl client.Namespaced, status *ppb.KubernetesReconcilerStatus) {
58 t.Helper()
59 ctx := context.Background()
60
61 statusBytes, err := proto.Marshal(status)
62 if err != nil {
63 t.Fatalf("Failed to marshal status: %v", err)
64 }
65
66 _, err = cl.Put(ctx, statusKey, string(statusBytes))
67 if err != nil {
68 t.Fatalf("Put: %v", err)
69 }
70}
71
72func makeNode(isController bool, release *vpb.Version_Release) *ppb.Node {
73 node := &ppb.Node{
74 Roles: &cpb.NodeRoles{},
75 Status: &cpb.NodeStatus{
76 Version: &vpb.Version{Release: release},
77 },
78 }
79 if isController {
80 node.Roles.KubernetesController = &cpb.NodeRoles_KubernetesController{}
81 }
82 return node
83}
84
85// putNode puts the node into etcd, or deletes if nil.
86// It returns the etcd revision of the operation.
87func putNode(t *testing.T, cl client.Namespaced, id string, node *ppb.Node) int64 {
88 t.Helper()
89 ctx := context.Background()
90
91 nkey, err := curator.NodeEtcdPrefix.Key(id)
92 if err != nil {
93 t.Fatal(err)
94 }
95 if node != nil {
96 nodeBytes, err := proto.Marshal(node)
97 if err != nil {
98 t.Fatalf("Failed to marshal node: %v", err)
99 }
100 resp, err := cl.Put(ctx, nkey, string(nodeBytes))
101 if err != nil {
102 t.Fatalf("Put: %v", err)
103 }
104 return resp.Header.Revision
105 } else {
106 resp, err := cl.Delete(ctx, nkey)
107 if err != nil {
108 t.Fatalf("Delete: %v", err)
109 }
110 return resp.Header.Revision
111 }
112}
113
// TestWaitReady tests that WaitReady does not return too early, and the test
// will time out if WaitReady fails to return when it is supposed to.
func TestWaitReady(t *testing.T) {
	cl := startEtcd(t)

	// Run WaitReady in a supervised runnable; isReady is closed as soon as
	// WaitReady returns, letting the test observe when it unblocked.
	isReady := make(chan struct{})
	supervisor.TestHarness(t, func(ctx context.Context) error {
		err := WaitReady(ctx, cl)
		if err != nil {
			t.Error(err)
		}
		close(isReady)
		supervisor.Signal(ctx, supervisor.SignalHealthy)
		supervisor.Signal(ctx, supervisor.SignalDone)
		return nil
	})

	// status does not exist.
	// The sleeps below give WaitReady a chance to (incorrectly) return before
	// we check; they do not guarantee it observed each state.
	time.Sleep(10 * time.Millisecond)

	// Version is too old.
	setStatus(t, cl, &ppb.KubernetesReconcilerStatus{
		State: ppb.KubernetesReconcilerStatus_STATE_DONE,
		Version: &vpb.Version{
			Release: &vpb.Version_Release{Major: 0, Minor: 0, Patch: 0},
		},
		MinimumCompatibleRelease: &vpb.Version_Release{Major: 0, Minor: 0, Patch: 0},
	})
	time.Sleep(10 * time.Millisecond)

	// MinimumCompatibleRelease is too new.
	setStatus(t, cl, &ppb.KubernetesReconcilerStatus{
		State: ppb.KubernetesReconcilerStatus_STATE_DONE,
		Version: &vpb.Version{
			Release: &vpb.Version_Release{Major: 10000, Minor: 0, Patch: 0},
		},
		MinimumCompatibleRelease: &vpb.Version_Release{Major: 10000, Minor: 0, Patch: 0},
	})
	time.Sleep(10 * time.Millisecond)

	// Non-blocking check: after the incompatible statuses above, WaitReady
	// must not have returned yet.
	select {
	case <-isReady:
		t.Fatal("WaitReady returned too early.")
	default:
	}

	// Now set the status to something compatible.
	setStatus(t, cl, &ppb.KubernetesReconcilerStatus{
		State: ppb.KubernetesReconcilerStatus_STATE_DONE,
		Version: &vpb.Version{
			Release: &vpb.Version_Release{Major: 10000, Minor: 0, Patch: 0},
		},
		MinimumCompatibleRelease: mversion.Version.Release,
	})

	// Block until WaitReady observes the compatible status; the overall test
	// timeout catches the failure case where it never does.
	<-isReady
}
171
// TestWatchNodes ensures that WatchNodes always updates releases correctly
// as nodes are changed in various ways.
func TestWatchNodes(t *testing.T) {
	ctx := context.Background()
	cl := startEtcd(t)
	s := Service{
		Etcd: cl,
	}
	// Watch the service's releases event value; watchNodes publishes an
	// updated snapshot there after processing each etcd change.
	w := s.releases.Watch()
	defer w.Close()

	// expectReleases asserts the next observed snapshot: min/max release of
	// Kubernetes controller nodes (as strings) and the etcd revision at
	// which the snapshot was taken.
	expectReleases := func(expectMin, expectMax string, expectRev int64) {
		t.Helper()
		releases, err := w.Get(ctx)
		if err != nil {
			t.Fatal(err)
		}
		if actualMin := version.Release(releases.minRelease); actualMin != expectMin {
			t.Fatalf("Expected minimum release %s, got %s", expectMin, actualMin)
		}
		if actualMax := version.Release(releases.maxRelease); actualMax != expectMax {
			t.Fatalf("Expected maximum release %s, got %s", expectMax, actualMax)
		}
		if releases.revision != expectRev {
			t.Fatalf("Expected revision %v, got %v", expectRev, releases.revision)
		}
	}

	// Seed five controller nodes before the watcher starts; the initial
	// snapshot must reflect all of them at the revision of the last put.
	putNode(t, cl, "a1", makeNode(true, &vpb.Version_Release{Major: 0, Minor: 0, Patch: 2}))
	putNode(t, cl, "a2", makeNode(true, &vpb.Version_Release{Major: 0, Minor: 0, Patch: 2}))
	putNode(t, cl, "a3", makeNode(true, &vpb.Version_Release{Major: 0, Minor: 0, Patch: 2}))
	putNode(t, cl, "b", makeNode(true, &vpb.Version_Release{Major: 0, Minor: 0, Patch: 3}))
	rev := putNode(t, cl, "c", makeNode(true, &vpb.Version_Release{Major: 10000, Minor: 0, Patch: 0}))

	supervisor.TestHarness(t, s.watchNodes)
	expectReleases("0.0.2", "10000.0.0", rev)
	// Node a1 is no longer a Kubernetes controller.
	rev = putNode(t, cl, "a1", makeNode(false, &vpb.Version_Release{Major: 0, Minor: 0, Patch: 2}))
	expectReleases("0.0.2", "10000.0.0", rev)
	// Node a2 is deleted.
	rev = putNode(t, cl, "a2", nil)
	expectReleases("0.0.2", "10000.0.0", rev)
	// Node a3 changes release. Now, the minimum should change.
	rev = putNode(t, cl, "a3", makeNode(true, &vpb.Version_Release{Major: 0, Minor: 0, Patch: 4}))
	expectReleases("0.0.3", "10000.0.0", rev)
}
218
// TestService tests the entire service, checking that it reconciles
// only in situations where it should.
func TestService(t *testing.T) {
	// Shorten the package-level reconcile delay so the test runs quickly.
	reconcileWait = 10 * time.Millisecond
	cl := startEtcd(t)
	// The fake clientset records all Kubernetes API calls, which is how this
	// test detects whether reconciliation happened.
	clientset := fake.NewSimpleClientset()
	s := Service{
		Etcd:      cl,
		ClientSet: clientset,
		NodeID:    "testnode",
	}

	// This node is newer than the local node, election should not start.
	putNode(t, cl, "a", makeNode(true, &vpb.Version_Release{Major: 10000, Minor: 0, Patch: 0}))

	cancelService, _ := supervisor.TestHarness(t, s.Run)

	time.Sleep(50 * time.Millisecond)
	if len(clientset.Actions()) != 0 {
		t.Fatal("Actions shouldn't have been performed yet.")
	}

	// The status allows a too old node to start.
	setStatus(t, cl, &ppb.KubernetesReconcilerStatus{
		State: ppb.KubernetesReconcilerStatus_STATE_DONE,
		Version: &vpb.Version{
			Release: &vpb.Version_Release{Major: 0, Minor: 0, Patch: 2},
		},
		MinimumCompatibleRelease: &vpb.Version_Release{Major: 0, Minor: 0, Patch: 2},
	})

	// This node is too old, before minApiserverRelease.
	putNode(t, cl, "a", makeNode(true, &vpb.Version_Release{Major: 0, Minor: 0, Patch: 2}))

	// watch-releases restarts with 500 ms backoff + randomization, so wait 1s.
	time.Sleep(time.Second)
	if len(clientset.Actions()) != 0 {
		t.Fatal("Actions shouldn't have been performed yet.")
	}

	// Upgrade the node.
	putNode(t, cl, "a", makeNode(true, minApiserverRelease))

	// Wait for status to be set.
	// waitForActions blocks until WaitReady unblocks (i.e. the reconciler
	// published a compatible status), then asserts that reconciliation
	// performed Kubernetes API actions and clears them for the next round.
	waitForActions := func() {
		isReady := make(chan struct{})
		supervisor.TestHarness(t, func(ctx context.Context) error {
			err := WaitReady(ctx, cl)
			if err != nil {
				t.Error(err)
			}
			close(isReady)
			supervisor.Signal(ctx, supervisor.SignalHealthy)
			supervisor.Signal(ctx, supervisor.SignalDone)
			return nil
		})
		<-isReady

		if len(clientset.Actions()) == 0 {
			t.Fatal("Actions should have been performed.")
		}
		clientset.ClearActions()
	}
	waitForActions()

	// The status does not allow a too old node to start.
	setStatus(t, cl, &ppb.KubernetesReconcilerStatus{
		State: ppb.KubernetesReconcilerStatus_STATE_DONE,
		Version: &vpb.Version{
			Release: &vpb.Version_Release{Major: 0, Minor: 0, Patch: 2},
		},
		MinimumCompatibleRelease: &vpb.Version_Release{Major: 10000, Minor: 0, Patch: 0},
	})

	// This node is too old, before minApiserverRelease. But because it is not
	// allowed to start, the reconciler is not blocked.
	putNode(t, cl, "a", makeNode(true, &vpb.Version_Release{Major: 0, Minor: 0, Patch: 2}))

	// Start another instance. The old node is still leader.
	supervisor.TestHarness(t, s.Run)

	time.Sleep(50 * time.Millisecond)
	if len(clientset.Actions()) != 0 {
		t.Fatal("Actions shouldn't have been performed yet.")
	}

	// Stop the first instance. Now the second instance should get elected.
	cancelService()
	waitForActions()
}
308}