package reconciler
2
3import (
4 "context"
5 "testing"
6 "time"
7
8 "go.etcd.io/etcd/tests/v3/integration"
9 "google.golang.org/protobuf/proto"
10 "k8s.io/client-go/kubernetes/fake"
11
12 "source.monogon.dev/metropolis/node/core/consensus/client"
13 "source.monogon.dev/metropolis/node/core/curator"
14 ppb "source.monogon.dev/metropolis/node/core/curator/proto/private"
15 "source.monogon.dev/metropolis/pkg/supervisor"
16 cpb "source.monogon.dev/metropolis/proto/common"
17 mversion "source.monogon.dev/metropolis/version"
18 "source.monogon.dev/version"
19 vpb "source.monogon.dev/version/spec"
20)
21
22// TestMinimumReleasesNotAboveMetropolisRelease tests that minimum releases
23// are not above the metropolis release itself, because that would cause
24// things to get stuck.
25func TestMinimumReleasesNotAboveMetropolisRelease(t *testing.T) {
26 if version.ReleaseLessThan(mversion.Version.Release, minReconcilerRelease) {
27 t.Errorf("Metropolis release %s is below the minimum reconciler release %s",
28 version.Semver(mversion.Version),
29 version.Release(minReconcilerRelease),
30 )
31 }
32 if version.ReleaseLessThan(mversion.Version.Release, minApiserverRelease) {
33 t.Errorf("Metropolis release %s is below the minimum apiserver release %s",
34 version.Semver(mversion.Version),
35 version.Release(minApiserverRelease),
36 )
37 }
38}
39
40// startEtcd creates an etcd cluster and client for testing.
41func startEtcd(t *testing.T) client.Namespaced {
42 t.Helper()
43 // Start a single-node etcd cluster.
44 integration.BeforeTestExternal(t)
45 cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
46 t.Cleanup(func() {
47 cluster.Terminate(t)
48 })
49 // Create etcd client to test cluster.
50 curEtcd, _ := client.NewLocal(cluster.Client(0)).Sub("curator")
51 return curEtcd
52}
53
54func setStatus(t *testing.T, cl client.Namespaced, status *ppb.KubernetesReconcilerStatus) {
55 t.Helper()
56 ctx := context.Background()
57
58 statusBytes, err := proto.Marshal(status)
59 if err != nil {
60 t.Fatalf("Failed to marshal status: %v", err)
61 }
62
63 _, err = cl.Put(ctx, statusKey, string(statusBytes))
64 if err != nil {
65 t.Fatalf("Put: %v", err)
66 }
67}
68
69func makeNode(isController bool, release *vpb.Version_Release) *ppb.Node {
70 node := &ppb.Node{
71 Roles: &cpb.NodeRoles{},
72 Status: &cpb.NodeStatus{
73 Version: &vpb.Version{Release: release},
74 },
75 }
76 if isController {
77 node.Roles.KubernetesController = &cpb.NodeRoles_KubernetesController{}
78 }
79 return node
80}
81
82// putNode puts the node into etcd, or deletes if nil.
83// It returns the etcd revision of the operation.
84func putNode(t *testing.T, cl client.Namespaced, id string, node *ppb.Node) int64 {
85 t.Helper()
86 ctx := context.Background()
87
88 nkey, err := curator.NodeEtcdPrefix.Key(id)
89 if err != nil {
90 t.Fatal(err)
91 }
92 if node != nil {
93 nodeBytes, err := proto.Marshal(node)
94 if err != nil {
95 t.Fatalf("Failed to marshal node: %v", err)
96 }
97 resp, err := cl.Put(ctx, nkey, string(nodeBytes))
98 if err != nil {
99 t.Fatalf("Put: %v", err)
100 }
101 return resp.Header.Revision
102 } else {
103 resp, err := cl.Delete(ctx, nkey)
104 if err != nil {
105 t.Fatalf("Delete: %v", err)
106 }
107 return resp.Header.Revision
108 }
109}
110
111// TestWaitReady tests that WaitReady does not return too early, and the test
112// will time out if WaitReady fails to return when it is supposed to.
113func TestWaitReady(t *testing.T) {
114 cl := startEtcd(t)
115
116 isReady := make(chan struct{})
117 supervisor.TestHarness(t, func(ctx context.Context) error {
118 err := WaitReady(ctx, cl)
119 if err != nil {
120 t.Error(err)
121 }
122 close(isReady)
123 supervisor.Signal(ctx, supervisor.SignalHealthy)
124 supervisor.Signal(ctx, supervisor.SignalDone)
125 return nil
126 })
127
128 // status does not exist.
129 time.Sleep(10 * time.Millisecond)
130
131 // Version is too old.
132 setStatus(t, cl, &ppb.KubernetesReconcilerStatus{
133 State: ppb.KubernetesReconcilerStatus_STATE_DONE,
134 Version: &vpb.Version{
135 Release: &vpb.Version_Release{Major: 0, Minor: 0, Patch: 0},
136 },
137 MinimumCompatibleRelease: &vpb.Version_Release{Major: 0, Minor: 0, Patch: 0},
138 })
139 time.Sleep(10 * time.Millisecond)
140
141 // MinimumCompatibleRelease is too new.
142 setStatus(t, cl, &ppb.KubernetesReconcilerStatus{
143 State: ppb.KubernetesReconcilerStatus_STATE_DONE,
144 Version: &vpb.Version{
145 Release: &vpb.Version_Release{Major: 10000, Minor: 0, Patch: 0},
146 },
147 MinimumCompatibleRelease: &vpb.Version_Release{Major: 10000, Minor: 0, Patch: 0},
148 })
149 time.Sleep(10 * time.Millisecond)
150
151 select {
152 case <-isReady:
153 t.Fatal("WaitReady returned too early.")
154 default:
155 }
156
157 // Now set the status to something compatible.
158 setStatus(t, cl, &ppb.KubernetesReconcilerStatus{
159 State: ppb.KubernetesReconcilerStatus_STATE_DONE,
160 Version: &vpb.Version{
161 Release: &vpb.Version_Release{Major: 10000, Minor: 0, Patch: 0},
162 },
163 MinimumCompatibleRelease: mversion.Version.Release,
164 })
165
166 <-isReady
167}
168
169// TestWatchNodes ensures that WatchNodes always updates releases correctly
170// as nodes are changed in various ways.
171func TestWatchNodes(t *testing.T) {
172 ctx := context.Background()
173 cl := startEtcd(t)
174 s := Service{
175 Etcd: cl,
176 }
177 w := s.releases.Watch()
178 defer w.Close()
179
180 expectReleases := func(expectMin, expectMax string, expectRev int64) {
181 t.Helper()
182 releases, err := w.Get(ctx)
183 if err != nil {
184 t.Fatal(err)
185 }
186 if actualMin := version.Release(releases.minRelease); actualMin != expectMin {
187 t.Fatalf("Expected minimum release %s, got %s", expectMin, actualMin)
188 }
189 if actualMax := version.Release(releases.maxRelease); actualMax != expectMax {
190 t.Fatalf("Expected maximum release %s, got %s", expectMax, actualMax)
191 }
192 if releases.revision != expectRev {
193 t.Fatalf("Expected revision %v, got %v", expectRev, releases.revision)
194 }
195 }
196
197 putNode(t, cl, "a1", makeNode(true, &vpb.Version_Release{Major: 0, Minor: 0, Patch: 2}))
198 putNode(t, cl, "a2", makeNode(true, &vpb.Version_Release{Major: 0, Minor: 0, Patch: 2}))
199 putNode(t, cl, "a3", makeNode(true, &vpb.Version_Release{Major: 0, Minor: 0, Patch: 2}))
200 putNode(t, cl, "b", makeNode(true, &vpb.Version_Release{Major: 0, Minor: 0, Patch: 3}))
201 rev := putNode(t, cl, "c", makeNode(true, &vpb.Version_Release{Major: 10000, Minor: 0, Patch: 0}))
202
203 supervisor.TestHarness(t, s.watchNodes)
204 expectReleases("0.0.2", "10000.0.0", rev)
205 // Node a1 is no longer a Kubernetes controller.
206 rev = putNode(t, cl, "a1", makeNode(false, &vpb.Version_Release{Major: 0, Minor: 0, Patch: 2}))
207 expectReleases("0.0.2", "10000.0.0", rev)
208 // Node a2 is deleted.
209 rev = putNode(t, cl, "a2", nil)
210 expectReleases("0.0.2", "10000.0.0", rev)
211 // Node a3 changes release. Now, the minimum should change.
212 rev = putNode(t, cl, "a3", makeNode(true, &vpb.Version_Release{Major: 0, Minor: 0, Patch: 4}))
213 expectReleases("0.0.3", "10000.0.0", rev)
214}
215
// TestService tests the entire service, checking that it reconciles
// only in situations where it should.
//
// NOTE(review): this test is timing-based (sleeps sized against the
// supervisor restart backoff) rather than fully event-driven; the intervals
// below look generous but are not proven race-free.
func TestService(t *testing.T) {
	// Shorten the package-level reconcile interval so the test runs quickly.
	reconcileWait = 10 * time.Millisecond
	cl := startEtcd(t)
	clientset := fake.NewSimpleClientset()
	s := Service{
		Etcd:      cl,
		ClientSet: clientset,
		NodeID:    "testnode",
	}

	// This node is newer than the local node, election should not start.
	putNode(t, cl, "a", makeNode(true, &vpb.Version_Release{Major: 10000, Minor: 0, Patch: 0}))

	cancelService, _ := supervisor.TestHarness(t, s.Run)

	// Give the service time to (incorrectly) act, then verify the fake
	// clientset saw no Kubernetes API calls.
	time.Sleep(50 * time.Millisecond)
	if len(clientset.Actions()) != 0 {
		t.Fatal("Actions shouldn't have been performed yet.")
	}

	// The status allows a too old node to start.
	setStatus(t, cl, &ppb.KubernetesReconcilerStatus{
		State: ppb.KubernetesReconcilerStatus_STATE_DONE,
		Version: &vpb.Version{
			Release: &vpb.Version_Release{Major: 0, Minor: 0, Patch: 2},
		},
		MinimumCompatibleRelease: &vpb.Version_Release{Major: 0, Minor: 0, Patch: 2},
	})

	// This node is too old, before minApiserverRelease.
	putNode(t, cl, "a", makeNode(true, &vpb.Version_Release{Major: 0, Minor: 0, Patch: 2}))

	// watch-releases restarts with 500 ms backoff + randomization, so wait 1s.
	time.Sleep(time.Second)
	if len(clientset.Actions()) != 0 {
		t.Fatal("Actions shouldn't have been performed yet.")
	}

	// Upgrade the node.
	putNode(t, cl, "a", makeNode(true, minApiserverRelease))

	// Wait for status to be set: waitForActions blocks on WaitReady, then
	// asserts the reconciler issued Kubernetes API calls, and clears the
	// recorded actions so the helper can be reused later in the test.
	waitForActions := func() {
		isReady := make(chan struct{})
		supervisor.TestHarness(t, func(ctx context.Context) error {
			err := WaitReady(ctx, cl)
			if err != nil {
				t.Error(err)
			}
			close(isReady)
			supervisor.Signal(ctx, supervisor.SignalHealthy)
			supervisor.Signal(ctx, supervisor.SignalDone)
			return nil
		})
		<-isReady

		if len(clientset.Actions()) == 0 {
			t.Fatal("Actions should have been performed.")
		}
		clientset.ClearActions()
	}
	waitForActions()

	// The status does not allow a too old node to start.
	setStatus(t, cl, &ppb.KubernetesReconcilerStatus{
		State: ppb.KubernetesReconcilerStatus_STATE_DONE,
		Version: &vpb.Version{
			Release: &vpb.Version_Release{Major: 0, Minor: 0, Patch: 2},
		},
		MinimumCompatibleRelease: &vpb.Version_Release{Major: 10000, Minor: 0, Patch: 0},
	})

	// This node is too old, before minApiserverRelease. But because it is not
	// allowed to start, the reconciler is not blocked.
	putNode(t, cl, "a", makeNode(true, &vpb.Version_Release{Major: 0, Minor: 0, Patch: 2}))

	// Start another instance. The old node is still leader.
	supervisor.TestHarness(t, s.Run)

	time.Sleep(50 * time.Millisecond)
	if len(clientset.Actions()) != 0 {
		t.Fatal("Actions shouldn't have been performed yet.")
	}

	// Stop the first instance. Now the second instance should get elected.
	cancelService()
	waitForActions()
}