| Serge Bazanski | 3c885de | 2021-06-17 17:21:00 +0200 | [diff] [blame] | 1 | package curator |
| 2 | |
| 3 | import ( |
| 4 | "context" |
| 5 | "errors" |
| 6 | "io/ioutil" |
| 7 | "testing" |
| 8 | |
| 9 | "google.golang.org/grpc/codes" |
| 10 | "google.golang.org/grpc/status" |
| 11 | |
| Serge Bazanski | 3c885de | 2021-06-17 17:21:00 +0200 | [diff] [blame] | 12 | "source.monogon.dev/metropolis/node/core/localstorage" |
| 13 | "source.monogon.dev/metropolis/node/core/localstorage/declarative" |
| Serge Bazanski | d7d6e02 | 2021-09-01 15:03:06 +0200 | [diff] [blame] | 14 | "source.monogon.dev/metropolis/node/core/rpc" |
| Serge Bazanski | 3c885de | 2021-06-17 17:21:00 +0200 | [diff] [blame] | 15 | "source.monogon.dev/metropolis/pkg/event/memory" |
| 16 | "source.monogon.dev/metropolis/pkg/supervisor" |
| 17 | ) |
| 18 | |
| 19 | // TestListenerSwitch exercises the curator listener's |
| 20 | // switch-to-different-implementation functionality, notably ensuring that the |
| 21 | // correct implementation is called and that the context is canceled accordingly |
| 22 | // on implementation switch. |
| 23 | // |
| 24 | // It does not test the gRPC listener socket itself and the actual |
| 25 | // implementations - that is deferred to curator functionality tests. |
| 26 | func TestListenerSwitch(t *testing.T) { |
| 27 | // Create ephemeral directory for curator and place it into /tmp. |
| 28 | dir := localstorage.EphemeralCuratorDirectory{} |
| 29 | // Force usage of /tmp as temp directory root, otherwsie TMPDIR from Bazel |
| 30 | // returns a path long enough that socket binds in the localstorage fail |
| 31 | // (as socket names are limited to 108 characters). |
| 32 | tmp, err := ioutil.TempDir("/tmp", "curator-test-*") |
| 33 | if err != nil { |
| 34 | t.Fatalf("TempDir: %v", err) |
| 35 | } |
| 36 | err = declarative.PlaceFS(&dir, tmp) |
| 37 | if err != nil { |
| 38 | t.Fatalf("PlaceFS: %v", err) |
| 39 | } |
| 40 | |
| 41 | // Create test event value. |
| 42 | var val memory.Value |
| 43 | |
| Serge Bazanski | 3379a5d | 2021-09-09 12:56:40 +0200 | [diff] [blame] | 44 | eph := rpc.NewEphemeralClusterCredentials(t, 1) |
| 45 | creds := eph.Nodes[0] |
| 46 | |
| Serge Bazanski | 3c885de | 2021-06-17 17:21:00 +0200 | [diff] [blame] | 47 | // Create DUT listener. |
| 48 | l := &listener{ |
| 49 | etcd: nil, |
| 50 | directory: &dir, |
| 51 | electionWatch: func() electionWatcher { |
| 52 | return electionWatcher{ |
| 53 | Watcher: val.Watch(), |
| 54 | } |
| 55 | }, |
| 56 | dispatchC: make(chan dispatchRequest), |
| Serge Bazanski | 3379a5d | 2021-09-09 12:56:40 +0200 | [diff] [blame] | 57 | node: creds, |
| Serge Bazanski | 3c885de | 2021-06-17 17:21:00 +0200 | [diff] [blame] | 58 | } |
| 59 | |
| 60 | // Start listener under supervisor. |
| Serge Bazanski | 79fc1e9 | 2021-07-06 16:25:22 +0200 | [diff] [blame] | 61 | supervisor.TestHarness(t, l.run) |
| Serge Bazanski | 3c885de | 2021-06-17 17:21:00 +0200 | [diff] [blame] | 62 | |
| 63 | // Begin with a follower. |
| 64 | val.Set(electionStatus{ |
| 65 | follower: &electionStatusFollower{}, |
| 66 | }) |
| 67 | |
| Serge Bazanski | 79fc1e9 | 2021-07-06 16:25:22 +0200 | [diff] [blame] | 68 | // Context for this test. |
| 69 | ctx, ctxC := context.WithCancel(context.Background()) |
| 70 | defer ctxC() |
| 71 | |
| Serge Bazanski | 3c885de | 2021-06-17 17:21:00 +0200 | [diff] [blame] | 72 | // Simulate a request context. |
| 73 | ctxR, ctxRC := context.WithCancel(ctx) |
| 74 | |
| 75 | // Check that canceling the request unblocks a pending dispatched call. |
| 76 | errC := make(chan error) |
| 77 | go func() { |
| Serge Bazanski | 3379a5d | 2021-09-09 12:56:40 +0200 | [diff] [blame] | 78 | errC <- l.callImpl(ctxR, func(ctx context.Context, impl rpc.ClusterExternalServices) error { |
| Serge Bazanski | 3c885de | 2021-06-17 17:21:00 +0200 | [diff] [blame] | 79 | <-ctx.Done() |
| 80 | return ctx.Err() |
| 81 | }) |
| 82 | }() |
| 83 | ctxRC() |
| 84 | err = <-errC |
| 85 | if err == nil || !errors.Is(err, context.Canceled) { |
| 86 | t.Fatalf("callImpl context should have returned context error, got %v", err) |
| 87 | } |
| 88 | |
| 89 | // Check that switching implementations unblocks a pending dispatched call. |
| 90 | scheduledC := make(chan struct{}) |
| 91 | go func() { |
| Serge Bazanski | 3379a5d | 2021-09-09 12:56:40 +0200 | [diff] [blame] | 92 | errC <- l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterExternalServices) error { |
| Serge Bazanski | 3c885de | 2021-06-17 17:21:00 +0200 | [diff] [blame] | 93 | close(scheduledC) |
| 94 | <-ctx.Done() |
| 95 | return ctx.Err() |
| 96 | }) |
| 97 | }() |
| 98 | // Block until we actually start executing on the follower listener. |
| 99 | <-scheduledC |
| 100 | // Switch over to leader listener. |
| 101 | val.Set(electionStatus{ |
| 102 | leader: &electionStatusLeader{}, |
| 103 | }) |
| 104 | // Check returned error. |
| 105 | err = <-errC |
| 106 | if err == nil { |
| 107 | t.Fatalf("callImpl context should have returned error, got nil") |
| 108 | } |
| 109 | if serr, ok := status.FromError(err); !ok || serr.Code() != codes.Unavailable { |
| 110 | t.Fatalf("callImpl context should have returned unavailable, got %v", err) |
| 111 | } |
| 112 | } |