blob: af87c9e8944c7a97a130b35fd95c26ea2fb0bc06 [file] [log] [blame]
Serge Bazanski76003f82021-06-17 16:39:01 +02001package curator
2
3import (
4 "context"
5 "fmt"
6 "io/ioutil"
7 "os"
8 "testing"
9 "time"
10
11 "go.etcd.io/etcd/clientv3"
12 "go.etcd.io/etcd/integration"
13
14 "source.monogon.dev/metropolis/node/core/consensus/client"
Serge Bazanski3379a5d2021-09-09 12:56:40 +020015 "source.monogon.dev/metropolis/node/core/identity"
Serge Bazanski76003f82021-06-17 16:39:01 +020016 "source.monogon.dev/metropolis/node/core/localstorage"
17 "source.monogon.dev/metropolis/node/core/localstorage/declarative"
Serge Bazanski3379a5d2021-09-09 12:56:40 +020018 "source.monogon.dev/metropolis/node/core/rpc"
Serge Bazanski76003f82021-06-17 16:39:01 +020019 "source.monogon.dev/metropolis/pkg/supervisor"
20)
21
var (
	// cluster is a 3-member in-memory etcd cluster for testing. It is created
	// once in TestMain and shared by all tests in this package.
	cluster *integration.ClusterV3
	// endpoints is a list of the three etcd members that make up the cluster
	// above, one client endpoint per member. Each curator DUT connects to a
	// distinct member so that partitions between instances can be injected.
	endpoints []string
)
28
29// TestMain brings up a 3 node etcd cluster for tests to use.
30func TestMain(m *testing.M) {
31 cfg := integration.ClusterConfig{
32 Size: 3,
33 GRPCKeepAliveMinTime: time.Millisecond,
34 }
35 cluster = integration.NewClusterV3(nil, &cfg)
36 endpoints = make([]string, 3)
37 for i := range endpoints {
38 endpoints[i] = cluster.Client(i).Endpoints()[0]
39 }
40
41 v := m.Run()
42 cluster.Terminate(nil)
43 os.Exit(v)
44}
45
// dut is the design under test harness - in this case, a curator instance.
type dut struct {
	// endpoint of the etcd server that this instance is connected to. Each instance
	// connects to a different member of the etcd cluster so that we can easily
	// inject partitions between curator instances.
	endpoint string
	// instance is the curator Service instance itself.
	instance *Service

	// temporary directory in which the Curator's ephemeral directory is placed.
	// Needs to be cleaned up (see dut.cleanup).
	temporary string
}
59
// cleanup removes the temporary directory backing this dut's ephemeral
// storage. Best-effort: the error from RemoveAll is deliberately ignored, as
// this only runs at test teardown.
func (d *dut) cleanup() {
	os.RemoveAll(d.temporary)
}
63
64// newDut creates a new dut harness for a curator instance, connected to a given
65// etcd endpoint.
Serge Bazanski3379a5d2021-09-09 12:56:40 +020066func newDut(ctx context.Context, t *testing.T, endpoint string, n *identity.NodeCredentials) *dut {
Serge Bazanski76003f82021-06-17 16:39:01 +020067 t.Helper()
68 // Create new etcd client to the given endpoint.
69 cli, err := clientv3.New(clientv3.Config{
70 Endpoints: []string{endpoint},
71 DialTimeout: 1 * time.Second,
72 DialKeepAliveTime: 1 * time.Second,
73 DialKeepAliveTimeout: 1 * time.Second,
74 Context: ctx,
75 })
76 if err != nil {
77 t.Fatalf("clientv3.New: %v", err)
78 }
79
80 // Create ephemeral directory for curator and place it into /tmp.
81 dir := localstorage.EphemeralCuratorDirectory{}
Serge Bazanskic1bf6aa2021-08-23 13:05:24 +020082 tmp, err := ioutil.TempDir("/tmp", "curator-test-*")
Serge Bazanski76003f82021-06-17 16:39:01 +020083 if err != nil {
84 t.Fatalf("TempDir: %v", err)
85 }
86 err = declarative.PlaceFS(&dir, tmp)
87 if err != nil {
88 t.Fatalf("PlaceFS: %v", err)
89 }
90
Serge Bazanski76003f82021-06-17 16:39:01 +020091 svc := New(Config{
Serge Bazanski3379a5d2021-09-09 12:56:40 +020092 Etcd: client.NewLocal(cli),
93 NodeCredentials: n,
94 LeaderTTL: time.Second,
95 Directory: &dir,
Serge Bazanski76003f82021-06-17 16:39:01 +020096 })
Serge Bazanski3379a5d2021-09-09 12:56:40 +020097 if err := supervisor.Run(ctx, n.ID(), svc.Run); err != nil {
98 t.Fatalf("Run %s: %v", n.ID(), err)
Serge Bazanski76003f82021-06-17 16:39:01 +020099 }
100 return &dut{
101 endpoint: endpoint,
102 instance: svc,
103 temporary: tmp,
104 }
105}
106
// dutSet is a collection of duts keyed by endpoint to which they're connected.
// Since each dut is connected to a different endpoint in these tests, the
// endpoint is used as a unique identifier for each dut/instance.
type dutSet map[string]*dut

// dutUpdate is an update from a dut's Curator instance - either a new
// electionStatus or an error while retrieving it.
type dutUpdate struct {
	// endpoint to which this dut's Curator instance is connected.
	endpoint string
	// status received from the dut's Curator instance, or nil if err is set.
	status *electionStatus
	// err encountered while retrieving the status, if any. When set, status
	// must be ignored.
	err error
}

// dutSetStatus is a point-in-time snapshot of the electionStatus of Curator
// instances, keyed by endpoints in the same way as dutSet.
type dutSetStatus map[string]*electionStatus
125
126// leaders returns a list of endpoints that currently see themselves as leaders.
127func (d dutSetStatus) leaders() []string {
128 var res []string
129 for e, s := range d {
130 if s.leader != nil {
131 res = append(res, e)
132 }
133 }
134 return res
135}
136
137// followers returns a list of endpoints that currently see themselves as
138// followers.
139func (d dutSetStatus) followers() []string {
140 var res []string
141 for e, s := range d {
142 if s.follower != nil {
143 res = append(res, e)
144 }
145 }
146 return res
147}
148
149// wait blocks until the dutSetStatus of a given dutSet reaches some state (as
150// implemented by predicate f).
151func (s dutSet) wait(ctx context.Context, f func(s dutSetStatus) bool) (dutSetStatus, error) {
152 ctx2, ctxC := context.WithCancel(ctx)
153 defer ctxC()
154
155 // dss is the dutSetStatus which we will keep updating with the electionStatus
156 // of each dut's Curator as long as predicate f returns false.
157 dss := make(dutSetStatus)
158
159 // updC is a channel of updates from all dut's electionStatus watchers. The
160 // dutUpdate type contains the endpoint to distinguish the source of each
161 // update.
162 updC := make(chan dutUpdate)
163
164 // Run a watcher for each dut which sends that dut's newest available
165 // electionStatus (or error) to updC.
166 for e, d := range s {
167 w := d.instance.electionWatch()
168 go func(e string, w electionWatcher) {
169 defer w.Close()
170 for {
171 s, err := w.get(ctx2)
172 if err != nil {
173 updC <- dutUpdate{
174 endpoint: e,
175 err: err,
176 }
177 return
178 }
179 updC <- dutUpdate{
180 endpoint: e,
181 status: s,
182 }
183 }
184 }(e, w)
185 }
186
187 // Keep updating dss with updates from updC and call f on every change.
188 for {
189 select {
190 case <-ctx.Done():
191 return nil, ctx.Err()
192 case u := <-updC:
193 if u.err != nil {
194 return nil, fmt.Errorf("from %q: %w", u.endpoint, u.err)
195 }
196 dss[u.endpoint] = u.status
197 }
198
199 if f(dss) {
200 return dss, nil
201 }
202 }
203}
204
205// TestLeaderElectionStatus exercises the electionStatus watch/get functionality
206// from the Curator code. It spawns a cluster of three curators and ensures all
207// of them respond correctly to election, partitioning and subsequent
208// re-election.
209func TestLeaderElectionStatus(t *testing.T) {
210 ctx, ctxC := context.WithCancel(context.Background())
211 defer ctxC()
212
213 // Map from endpoint name to etcd member list index. Alongside with the
214 // endpoints list, this is used to quickly look up endpoint<->member_num. Since
215 // we only have one Curator instance per etcd member, we can use the instance's
216 // etcd endpoint as a unique key to identify it.
217 endpointToNum := map[string]int{
218 endpoints[0]: 0,
219 endpoints[1]: 1,
220 endpoints[2]: 2,
221 }
222
223 // Start a new supervisor in which we create all curator DUTs.
Serge Bazanski3379a5d2021-09-09 12:56:40 +0200224 ephemeral := rpc.NewEphemeralClusterCredentials(t, 3)
Serge Bazanski76003f82021-06-17 16:39:01 +0200225 dutC := make(chan *dut)
Serge Bazanski79fc1e92021-07-06 16:25:22 +0200226 supervisor.TestHarness(t, func(ctx context.Context) error {
Serge Bazanski3379a5d2021-09-09 12:56:40 +0200227 for e, n := range endpointToNum {
228 dutC <- newDut(ctx, t, e, ephemeral.Nodes[n])
Serge Bazanski76003f82021-06-17 16:39:01 +0200229 }
230 close(dutC)
231 supervisor.Signal(ctx, supervisor.SignalHealthy)
232 supervisor.Signal(ctx, supervisor.SignalDone)
233 return nil
234 })
235
236 // Build dutSet, ie. map from endpoint to Curator DUT.
237 duts := make(dutSet)
238 for d := range dutC {
239 duts[d.endpoint] = d
240 }
241 // Schedule cleanup for all DUTs.
242 defer func() {
243 for _, dut := range duts {
244 dut.cleanup()
245 }
246 }()
247
248 // Wait until we have a Curator leader.
249 dss, err := duts.wait(ctx, func(dss dutSetStatus) bool {
250 return len(dss.leaders()) == 1 && len(dss.followers()) == 2
251 })
252 if err != nil {
253 t.Fatalf("waiting for dut set: %v", err)
254 }
255 leaderEndpoint := dss.leaders()[0]
256
257 // Retrieve key and rev from Curator's leader. We will later test to ensure
258 // these have changed when we switch to another leader and back.
259 key := dss[leaderEndpoint].leader.lockKey
260 rev := dss[leaderEndpoint].leader.lockRev
Serge Bazanski3379a5d2021-09-09 12:56:40 +0200261 leaderNodeID := duts[leaderEndpoint].instance.config.NodeCredentials.ID()
Serge Bazanski76003f82021-06-17 16:39:01 +0200262 leaderNum := endpointToNum[leaderEndpoint]
263
264 // Ensure the leader/follower data in the electionStatus are as expected.
265 for endpoint, status := range dss {
266 if endpoint == leaderEndpoint {
267 // The leader instance should not also be a follower.
268 if status.follower != nil {
269 t.Errorf("leader cannot also be a follower")
270 }
271 } else {
272 // The follower instances should also not be leaders.
273 if status.leader != nil {
274 t.Errorf("instance %q is leader", endpoint)
275 }
276 follower := status.follower
277 if follower == nil {
278 t.Errorf("instance %q is not a follower", endpoint)
279 continue
280 }
281 // The follower instances should point to the leader in their seen lock.
282 if want, got := leaderNodeID, follower.lock.NodeId; want != got {
283 t.Errorf("instance %q sees node id %q as follower, wanted %q", endpoint, want, got)
284 }
285 }
286 }
287
288 // Partition off leader's etcd instance from other instances.
289 for n, member := range cluster.Members {
290 if n == leaderNum {
291 continue
292 }
293 cluster.Members[leaderNum].InjectPartition(t, member)
294 }
295
296 // Wait until we switch leaders
297 dss, err = duts.wait(ctx, func(dss dutSetStatus) bool {
298 // Ensure we've lost leadership on the initial leader.
299 if i, ok := dss[leaderEndpoint]; ok && i.leader != nil {
300 return false
301 }
302 return len(dss.leaders()) == 1 && len(dss.followers()) == 1
303 })
304 if err != nil {
305 t.Fatalf("waiting for dut set: %v", err)
306 }
307 newLeaderEndpoint := dss.leaders()[0]
308
309 // Ensure the old instance is neither leader nor follower (signaling loss of
310 // quorum).
311 if want, got := false, dss[leaderEndpoint].leader != nil; want != got {
312 t.Errorf("old leader's leadership is %v, wanted %v", want, got)
313 }
314 if want, got := false, dss[leaderEndpoint].follower != nil; want != got {
315 t.Errorf("old leader's followership is %v, wanted %v", want, got)
316 }
317
318 // Get new leader's key and rev.
319 newKey := dss[newLeaderEndpoint].leader.lockKey
320 newRev := dss[newLeaderEndpoint].leader.lockRev
Serge Bazanski3379a5d2021-09-09 12:56:40 +0200321 newLeaderNodeID := duts[newLeaderEndpoint].instance.config.NodeCredentials.ID()
Serge Bazanski76003f82021-06-17 16:39:01 +0200322
323 if leaderEndpoint == newLeaderEndpoint {
324 t.Errorf("leader endpoint didn't change (%q -> %q)", leaderEndpoint, newLeaderEndpoint)
325 }
326 if key == newKey {
327 t.Errorf("leader election key didn't change (%q -> %q)", key, newKey)
328 }
329 if rev == newRev {
330 t.Errorf("leader election rev didn't change (%d -> %d)", rev, newRev)
331 }
332
333 // Ensure the last node of the cluster (not the current leader and not the
334 // previous leader) is now a follower pointing at the new leader.
335 for endpoint, status := range dss {
336 if endpoint == leaderEndpoint || endpoint == newLeaderEndpoint {
337 continue
338 }
339 follower := status.follower
340 if follower == nil {
341 t.Errorf("instance %q is not a follower", endpoint)
342 continue
343 }
344 if want, got := newLeaderNodeID, follower.lock.NodeId; want != got {
345 t.Errorf("instance %q sees node id %q as follower, wanted %q", endpoint, want, got)
346 }
347 }
348}