| Serge Bazanski | 76003f8 | 2021-06-17 16:39:01 +0200 | [diff] [blame] | 1 | package curator |
| 2 | |
| 3 | import ( |
| 4 | "context" |
| 5 | "fmt" |
| 6 | "io/ioutil" |
| 7 | "os" |
| 8 | "testing" |
| 9 | "time" |
| 10 | |
| 11 | "go.etcd.io/etcd/clientv3" |
| 12 | "go.etcd.io/etcd/integration" |
| 13 | |
| 14 | "source.monogon.dev/metropolis/node/core/consensus/client" |
| Serge Bazanski | 3379a5d | 2021-09-09 12:56:40 +0200 | [diff] [blame] | 15 | "source.monogon.dev/metropolis/node/core/identity" |
| Serge Bazanski | 76003f8 | 2021-06-17 16:39:01 +0200 | [diff] [blame] | 16 | "source.monogon.dev/metropolis/node/core/localstorage" |
| 17 | "source.monogon.dev/metropolis/node/core/localstorage/declarative" |
| Serge Bazanski | 3379a5d | 2021-09-09 12:56:40 +0200 | [diff] [blame] | 18 | "source.monogon.dev/metropolis/node/core/rpc" |
| Serge Bazanski | 76003f8 | 2021-06-17 16:39:01 +0200 | [diff] [blame] | 19 | "source.monogon.dev/metropolis/pkg/supervisor" |
| 20 | ) |
| 21 | |
var (
	// cluster is a 3-member in-memory etcd cluster for testing. It is started
	// by TestMain before any test runs and torn down afterwards.
	cluster *integration.ClusterV3
	// endpoints is a list of the three etcd members that make up the cluster above.
	// Populated by TestMain; each test dut connects to exactly one of these.
	endpoints []string
)
| 28 | |
| 29 | // TestMain brings up a 3 node etcd cluster for tests to use. |
| 30 | func TestMain(m *testing.M) { |
| 31 | cfg := integration.ClusterConfig{ |
| 32 | Size: 3, |
| 33 | GRPCKeepAliveMinTime: time.Millisecond, |
| 34 | } |
| 35 | cluster = integration.NewClusterV3(nil, &cfg) |
| 36 | endpoints = make([]string, 3) |
| 37 | for i := range endpoints { |
| 38 | endpoints[i] = cluster.Client(i).Endpoints()[0] |
| 39 | } |
| 40 | |
| 41 | v := m.Run() |
| 42 | cluster.Terminate(nil) |
| 43 | os.Exit(v) |
| 44 | } |
| 45 | |
// dut is the design under test harness - in this case, a curator instance.
type dut struct {
	// endpoint of the etcd server that this instance is connected to. Each instance
	// connects to a different member of the etcd cluster so that we can easily
	// inject partitions between curator instances.
	endpoint string
	// instance is the curator Service instance itself, as started under the
	// test supervisor.
	instance *Service

	// temporary directory in which the Curator's ephemeral directory is placed.
	// Needs to be cleaned up by calling cleanup().
	temporary string
}
| 59 | |
| 60 | func (d *dut) cleanup() { |
| 61 | os.RemoveAll(d.temporary) |
| 62 | } |
| 63 | |
| 64 | // newDut creates a new dut harness for a curator instance, connected to a given |
| 65 | // etcd endpoint. |
| Serge Bazanski | 3379a5d | 2021-09-09 12:56:40 +0200 | [diff] [blame] | 66 | func newDut(ctx context.Context, t *testing.T, endpoint string, n *identity.NodeCredentials) *dut { |
| Serge Bazanski | 76003f8 | 2021-06-17 16:39:01 +0200 | [diff] [blame] | 67 | t.Helper() |
| 68 | // Create new etcd client to the given endpoint. |
| 69 | cli, err := clientv3.New(clientv3.Config{ |
| 70 | Endpoints: []string{endpoint}, |
| 71 | DialTimeout: 1 * time.Second, |
| 72 | DialKeepAliveTime: 1 * time.Second, |
| 73 | DialKeepAliveTimeout: 1 * time.Second, |
| 74 | Context: ctx, |
| 75 | }) |
| 76 | if err != nil { |
| 77 | t.Fatalf("clientv3.New: %v", err) |
| 78 | } |
| 79 | |
| 80 | // Create ephemeral directory for curator and place it into /tmp. |
| 81 | dir := localstorage.EphemeralCuratorDirectory{} |
| Serge Bazanski | c1bf6aa | 2021-08-23 13:05:24 +0200 | [diff] [blame] | 82 | tmp, err := ioutil.TempDir("/tmp", "curator-test-*") |
| Serge Bazanski | 76003f8 | 2021-06-17 16:39:01 +0200 | [diff] [blame] | 83 | if err != nil { |
| 84 | t.Fatalf("TempDir: %v", err) |
| 85 | } |
| 86 | err = declarative.PlaceFS(&dir, tmp) |
| 87 | if err != nil { |
| 88 | t.Fatalf("PlaceFS: %v", err) |
| 89 | } |
| 90 | |
| Serge Bazanski | 76003f8 | 2021-06-17 16:39:01 +0200 | [diff] [blame] | 91 | svc := New(Config{ |
| Serge Bazanski | 3379a5d | 2021-09-09 12:56:40 +0200 | [diff] [blame] | 92 | Etcd: client.NewLocal(cli), |
| 93 | NodeCredentials: n, |
| 94 | LeaderTTL: time.Second, |
| 95 | Directory: &dir, |
| Serge Bazanski | 76003f8 | 2021-06-17 16:39:01 +0200 | [diff] [blame] | 96 | }) |
| Serge Bazanski | 3379a5d | 2021-09-09 12:56:40 +0200 | [diff] [blame] | 97 | if err := supervisor.Run(ctx, n.ID(), svc.Run); err != nil { |
| 98 | t.Fatalf("Run %s: %v", n.ID(), err) |
| Serge Bazanski | 76003f8 | 2021-06-17 16:39:01 +0200 | [diff] [blame] | 99 | } |
| 100 | return &dut{ |
| 101 | endpoint: endpoint, |
| 102 | instance: svc, |
| 103 | temporary: tmp, |
| 104 | } |
| 105 | } |
| 106 | |
// dutSet is a collection of duts keyed by endpoint to which they're connected.
// Since each dut is connected to a different endpoint in these tests, the
// endpoint is used as a unique identifier for each dut/instance.
type dutSet map[string]*dut
| 111 | |
// dutUpdate is an update from a dut's Curator instance - either a new
// electionStatus or an error while retrieving it. Exactly one of status/err is
// set.
type dutUpdate struct {
	// endpoint to which this dut's Curator instance is connected.
	endpoint string
	// status received from the dut's Curator instance, or nil if err is set.
	status *electionStatus
	// err encountered while retrieving the status, or nil if status is set.
	err error
}
| 121 | |
// dutSetStatus is a point-in-time snapshot of the electionStatus of Curator
// instances, keyed by endpoints in the same way as dutSet.
type dutSetStatus map[string]*electionStatus
| 125 | |
| 126 | // leaders returns a list of endpoints that currently see themselves as leaders. |
| 127 | func (d dutSetStatus) leaders() []string { |
| 128 | var res []string |
| 129 | for e, s := range d { |
| 130 | if s.leader != nil { |
| 131 | res = append(res, e) |
| 132 | } |
| 133 | } |
| 134 | return res |
| 135 | } |
| 136 | |
| 137 | // followers returns a list of endpoints that currently see themselves as |
| 138 | // followers. |
| 139 | func (d dutSetStatus) followers() []string { |
| 140 | var res []string |
| 141 | for e, s := range d { |
| 142 | if s.follower != nil { |
| 143 | res = append(res, e) |
| 144 | } |
| 145 | } |
| 146 | return res |
| 147 | } |
| 148 | |
| 149 | // wait blocks until the dutSetStatus of a given dutSet reaches some state (as |
| 150 | // implemented by predicate f). |
| 151 | func (s dutSet) wait(ctx context.Context, f func(s dutSetStatus) bool) (dutSetStatus, error) { |
| 152 | ctx2, ctxC := context.WithCancel(ctx) |
| 153 | defer ctxC() |
| 154 | |
| 155 | // dss is the dutSetStatus which we will keep updating with the electionStatus |
| 156 | // of each dut's Curator as long as predicate f returns false. |
| 157 | dss := make(dutSetStatus) |
| 158 | |
| 159 | // updC is a channel of updates from all dut's electionStatus watchers. The |
| 160 | // dutUpdate type contains the endpoint to distinguish the source of each |
| 161 | // update. |
| 162 | updC := make(chan dutUpdate) |
| 163 | |
| 164 | // Run a watcher for each dut which sends that dut's newest available |
| 165 | // electionStatus (or error) to updC. |
| 166 | for e, d := range s { |
| 167 | w := d.instance.electionWatch() |
| 168 | go func(e string, w electionWatcher) { |
| 169 | defer w.Close() |
| 170 | for { |
| 171 | s, err := w.get(ctx2) |
| 172 | if err != nil { |
| 173 | updC <- dutUpdate{ |
| 174 | endpoint: e, |
| 175 | err: err, |
| 176 | } |
| 177 | return |
| 178 | } |
| 179 | updC <- dutUpdate{ |
| 180 | endpoint: e, |
| 181 | status: s, |
| 182 | } |
| 183 | } |
| 184 | }(e, w) |
| 185 | } |
| 186 | |
| 187 | // Keep updating dss with updates from updC and call f on every change. |
| 188 | for { |
| 189 | select { |
| 190 | case <-ctx.Done(): |
| 191 | return nil, ctx.Err() |
| 192 | case u := <-updC: |
| 193 | if u.err != nil { |
| 194 | return nil, fmt.Errorf("from %q: %w", u.endpoint, u.err) |
| 195 | } |
| 196 | dss[u.endpoint] = u.status |
| 197 | } |
| 198 | |
| 199 | if f(dss) { |
| 200 | return dss, nil |
| 201 | } |
| 202 | } |
| 203 | } |
| 204 | |
| 205 | // TestLeaderElectionStatus exercises the electionStatus watch/get functionality |
| 206 | // from the Curator code. It spawns a cluster of three curators and ensures all |
| 207 | // of them respond correctly to election, partitioning and subsequent |
| 208 | // re-election. |
| 209 | func TestLeaderElectionStatus(t *testing.T) { |
| 210 | ctx, ctxC := context.WithCancel(context.Background()) |
| 211 | defer ctxC() |
| 212 | |
| 213 | // Map from endpoint name to etcd member list index. Alongside with the |
| 214 | // endpoints list, this is used to quickly look up endpoint<->member_num. Since |
| 215 | // we only have one Curator instance per etcd member, we can use the instance's |
| 216 | // etcd endpoint as a unique key to identify it. |
| 217 | endpointToNum := map[string]int{ |
| 218 | endpoints[0]: 0, |
| 219 | endpoints[1]: 1, |
| 220 | endpoints[2]: 2, |
| 221 | } |
| 222 | |
| 223 | // Start a new supervisor in which we create all curator DUTs. |
| Serge Bazanski | 3379a5d | 2021-09-09 12:56:40 +0200 | [diff] [blame] | 224 | ephemeral := rpc.NewEphemeralClusterCredentials(t, 3) |
| Serge Bazanski | 76003f8 | 2021-06-17 16:39:01 +0200 | [diff] [blame] | 225 | dutC := make(chan *dut) |
| Serge Bazanski | 79fc1e9 | 2021-07-06 16:25:22 +0200 | [diff] [blame] | 226 | supervisor.TestHarness(t, func(ctx context.Context) error { |
| Serge Bazanski | 3379a5d | 2021-09-09 12:56:40 +0200 | [diff] [blame] | 227 | for e, n := range endpointToNum { |
| 228 | dutC <- newDut(ctx, t, e, ephemeral.Nodes[n]) |
| Serge Bazanski | 76003f8 | 2021-06-17 16:39:01 +0200 | [diff] [blame] | 229 | } |
| 230 | close(dutC) |
| 231 | supervisor.Signal(ctx, supervisor.SignalHealthy) |
| 232 | supervisor.Signal(ctx, supervisor.SignalDone) |
| 233 | return nil |
| 234 | }) |
| 235 | |
| 236 | // Build dutSet, ie. map from endpoint to Curator DUT. |
| 237 | duts := make(dutSet) |
| 238 | for d := range dutC { |
| 239 | duts[d.endpoint] = d |
| 240 | } |
| 241 | // Schedule cleanup for all DUTs. |
| 242 | defer func() { |
| 243 | for _, dut := range duts { |
| 244 | dut.cleanup() |
| 245 | } |
| 246 | }() |
| 247 | |
| 248 | // Wait until we have a Curator leader. |
| 249 | dss, err := duts.wait(ctx, func(dss dutSetStatus) bool { |
| 250 | return len(dss.leaders()) == 1 && len(dss.followers()) == 2 |
| 251 | }) |
| 252 | if err != nil { |
| 253 | t.Fatalf("waiting for dut set: %v", err) |
| 254 | } |
| 255 | leaderEndpoint := dss.leaders()[0] |
| 256 | |
| 257 | // Retrieve key and rev from Curator's leader. We will later test to ensure |
| 258 | // these have changed when we switch to another leader and back. |
| 259 | key := dss[leaderEndpoint].leader.lockKey |
| 260 | rev := dss[leaderEndpoint].leader.lockRev |
| Serge Bazanski | 3379a5d | 2021-09-09 12:56:40 +0200 | [diff] [blame] | 261 | leaderNodeID := duts[leaderEndpoint].instance.config.NodeCredentials.ID() |
| Serge Bazanski | 76003f8 | 2021-06-17 16:39:01 +0200 | [diff] [blame] | 262 | leaderNum := endpointToNum[leaderEndpoint] |
| 263 | |
| 264 | // Ensure the leader/follower data in the electionStatus are as expected. |
| 265 | for endpoint, status := range dss { |
| 266 | if endpoint == leaderEndpoint { |
| 267 | // The leader instance should not also be a follower. |
| 268 | if status.follower != nil { |
| 269 | t.Errorf("leader cannot also be a follower") |
| 270 | } |
| 271 | } else { |
| 272 | // The follower instances should also not be leaders. |
| 273 | if status.leader != nil { |
| 274 | t.Errorf("instance %q is leader", endpoint) |
| 275 | } |
| 276 | follower := status.follower |
| 277 | if follower == nil { |
| 278 | t.Errorf("instance %q is not a follower", endpoint) |
| 279 | continue |
| 280 | } |
| 281 | // The follower instances should point to the leader in their seen lock. |
| 282 | if want, got := leaderNodeID, follower.lock.NodeId; want != got { |
| 283 | t.Errorf("instance %q sees node id %q as follower, wanted %q", endpoint, want, got) |
| 284 | } |
| 285 | } |
| 286 | } |
| 287 | |
| 288 | // Partition off leader's etcd instance from other instances. |
| 289 | for n, member := range cluster.Members { |
| 290 | if n == leaderNum { |
| 291 | continue |
| 292 | } |
| 293 | cluster.Members[leaderNum].InjectPartition(t, member) |
| 294 | } |
| 295 | |
| 296 | // Wait until we switch leaders |
| 297 | dss, err = duts.wait(ctx, func(dss dutSetStatus) bool { |
| 298 | // Ensure we've lost leadership on the initial leader. |
| 299 | if i, ok := dss[leaderEndpoint]; ok && i.leader != nil { |
| 300 | return false |
| 301 | } |
| 302 | return len(dss.leaders()) == 1 && len(dss.followers()) == 1 |
| 303 | }) |
| 304 | if err != nil { |
| 305 | t.Fatalf("waiting for dut set: %v", err) |
| 306 | } |
| 307 | newLeaderEndpoint := dss.leaders()[0] |
| 308 | |
| 309 | // Ensure the old instance is neither leader nor follower (signaling loss of |
| 310 | // quorum). |
| 311 | if want, got := false, dss[leaderEndpoint].leader != nil; want != got { |
| 312 | t.Errorf("old leader's leadership is %v, wanted %v", want, got) |
| 313 | } |
| 314 | if want, got := false, dss[leaderEndpoint].follower != nil; want != got { |
| 315 | t.Errorf("old leader's followership is %v, wanted %v", want, got) |
| 316 | } |
| 317 | |
| 318 | // Get new leader's key and rev. |
| 319 | newKey := dss[newLeaderEndpoint].leader.lockKey |
| 320 | newRev := dss[newLeaderEndpoint].leader.lockRev |
| Serge Bazanski | 3379a5d | 2021-09-09 12:56:40 +0200 | [diff] [blame] | 321 | newLeaderNodeID := duts[newLeaderEndpoint].instance.config.NodeCredentials.ID() |
| Serge Bazanski | 76003f8 | 2021-06-17 16:39:01 +0200 | [diff] [blame] | 322 | |
| 323 | if leaderEndpoint == newLeaderEndpoint { |
| 324 | t.Errorf("leader endpoint didn't change (%q -> %q)", leaderEndpoint, newLeaderEndpoint) |
| 325 | } |
| 326 | if key == newKey { |
| 327 | t.Errorf("leader election key didn't change (%q -> %q)", key, newKey) |
| 328 | } |
| 329 | if rev == newRev { |
| 330 | t.Errorf("leader election rev didn't change (%d -> %d)", rev, newRev) |
| 331 | } |
| 332 | |
| 333 | // Ensure the last node of the cluster (not the current leader and not the |
| 334 | // previous leader) is now a follower pointing at the new leader. |
| 335 | for endpoint, status := range dss { |
| 336 | if endpoint == leaderEndpoint || endpoint == newLeaderEndpoint { |
| 337 | continue |
| 338 | } |
| 339 | follower := status.follower |
| 340 | if follower == nil { |
| 341 | t.Errorf("instance %q is not a follower", endpoint) |
| 342 | continue |
| 343 | } |
| 344 | if want, got := newLeaderNodeID, follower.lock.NodeId; want != got { |
| 345 | t.Errorf("instance %q sees node id %q as follower, wanted %q", endpoint, want, got) |
| 346 | } |
| 347 | } |
| 348 | } |