m/n/core: remove local listener from curator
While working on a client library to access a cluster reliably, a
thought popped into my head:
Do we even need to run the local (UNIX domain socket) listener in the
Curator?
And, after checking all code paths, to my surprise... no, not really.
Why did we ever do it? Perhaps because we started out with a differently
structured cluster bootstrap codebase that made it a hard requirement. Or
maybe it was just a momentary lapse of reason. Regardless, with the
current codebase, it makes no sense: we always have Node credentials
available, and we run the Curator on all network interfaces. So why not
just connect over loopback and use TLS?
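Concretely, here's a minimal sketch of such a loopback dial, assuming stock
grpc-go (the real helpers live in //metropolis/node/core/rpc, and the real
client verifies the server certificate against the Cluster CA with custom
logic rather than hostname-based verification):

    import (
        "crypto/tls"
        "crypto/x509"
        "net"

        "google.golang.org/grpc"
        "google.golang.org/grpc/credentials"

        common "source.monogon.dev/metropolis/node"
    )

    // dialLocalCurator dials the Curator over loopback, presenting the
    // node's TLS client certificate - exactly as a remote client would.
    func dialLocalCurator(cert tls.Certificate, ca *x509.Certificate) (*grpc.ClientConn, error) {
        pool := x509.NewCertPool()
        pool.AddCert(ca)
        creds := credentials.NewTLS(&tls.Config{
            Certificates: []tls.Certificate{cert},
            // Illustrative only: node certificates might not carry a
            // loopback SAN, so the real client plugs in custom
            // verification against the Cluster CA here.
            RootCAs: pool,
        })
        addr := net.JoinHostPort("127.0.0.1", common.CuratorServicePort.PortString())
        return grpc.Dial(addr, grpc.WithTransportCredentials(creds))
    }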
Here are some of the benefits of removing the local listener:

 - It removes a whole bunch of code, and pulling at a few more threads in
   the Curator and RPC codebases will probably let us remove quite a few
   more now-unused abstractions.
 - It leads to a more secure product, as we have one less privileged
   domain socket to worry about (although we still have the etcd one...
   but that's a whole different can of worms).
 - Most importantly, it paves the way for a vastly simplified cluster
   client - one in which the transport is the same regardless of whether
   we connect to a local or remote Curator. This should let us use
   bog-standard gRPC load balancing / resolver extensions (see the sketch
   below) to reach the Curator in an idiomatic and robust way.
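To illustrate that direction (scheme and type names here are hypothetical,
not part of this change): a future cluster client can register a resolver
that reports all known Curator endpoints and let gRPC's stock round-robin
balancer spread connections across them:

    import "google.golang.org/grpc/resolver"

    // curatorResolverBuilder reports a static set of Curator endpoints
    // to gRPC; the configured balancer then picks between them.
    type curatorResolverBuilder struct {
        endpoints []string
    }

    func (b *curatorResolverBuilder) Scheme() string { return "metropolis" }

    func (b *curatorResolverBuilder) Build(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
        var addrs []resolver.Address
        for _, e := range b.endpoints {
            addrs = append(addrs, resolver.Address{Addr: e})
        }
        cc.UpdateState(resolver.State{Addresses: addrs})
        return nopResolver{}, nil
    }

    type nopResolver struct{}

    func (nopResolver) ResolveNow(resolver.ResolveNowOptions) {}
    func (nopResolver) Close()                                {}

Usage would then be a plain grpc.Dial("metropolis:///curator", ...) with a
`{"loadBalancingConfig": [{"round_robin":{}}]}` service config and the same
TLS credentials as above.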
Change-Id: I1fe9b04ba3b5f4e001050c25aec61a761077492f
Reviewed-on: https://review.monogon.dev/c/monogon/+/624
Reviewed-by: Mateusz Zalega <mateusz@monogon.tech>
diff --git a/metropolis/node/core/curator/BUILD.bazel b/metropolis/node/core/curator/BUILD.bazel
index 571a7ca..90ae216 100644
--- a/metropolis/node/core/curator/BUILD.bazel
+++ b/metropolis/node/core/curator/BUILD.bazel
@@ -25,7 +25,6 @@
"//metropolis/node/core/curator/proto/api:go_default_library",
"//metropolis/node/core/curator/proto/private:go_default_library",
"//metropolis/node/core/identity:go_default_library",
- "//metropolis/node/core/localstorage:go_default_library",
"//metropolis/node/core/rpc:go_default_library",
"//metropolis/pkg/combinectx:go_default_library",
"//metropolis/pkg/event:go_default_library",
@@ -59,8 +58,6 @@
"//metropolis/node/core/curator/proto/api:go_default_library",
"//metropolis/node/core/curator/proto/private:go_default_library",
"//metropolis/node/core/identity:go_default_library",
- "//metropolis/node/core/localstorage:go_default_library",
- "//metropolis/node/core/localstorage/declarative:go_default_library",
"//metropolis/node/core/rpc:go_default_library",
"//metropolis/pkg/event/memory:go_default_library",
"//metropolis/pkg/pki:go_default_library",
diff --git a/metropolis/node/core/curator/curator.go b/metropolis/node/core/curator/curator.go
index e247296..5d6eedd 100644
--- a/metropolis/node/core/curator/curator.go
+++ b/metropolis/node/core/curator/curator.go
@@ -4,9 +4,11 @@
// The Curator is implemented as a leader-elected service. Instances of the
// service are running colocated with all nodes that run a consensus (etcd)
// server.
-// Each instance listens locally over gRPC for requests from code running on the
-// same node, and publicly over gRPC for traffic from other nodes (eg. ones that
-// do not run an instance of the Curator) and external users.
+//
+// Each instance listens on all network interfaces, serving requests from code
+// running on the same node, from other nodes (eg. ones that do not run an
+// instance of the Curator), and from external users.
+//
// The curator leader keeps its state fully in etcd. Followers forward all
// requests to the active leader.
package curator
@@ -18,14 +20,11 @@
"time"
"go.etcd.io/etcd/clientv3/concurrency"
- "google.golang.org/grpc"
"google.golang.org/protobuf/proto"
"source.monogon.dev/metropolis/node/core/consensus"
ppb "source.monogon.dev/metropolis/node/core/curator/proto/private"
"source.monogon.dev/metropolis/node/core/identity"
- "source.monogon.dev/metropolis/node/core/localstorage"
- "source.monogon.dev/metropolis/node/core/rpc"
"source.monogon.dev/metropolis/pkg/event"
"source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/supervisor"
@@ -44,9 +43,6 @@
// resiliency against short network partitions.
// A value less or equal to zero will default to 60 seconds.
LeaderTTL time.Duration
- // Directory is the curator ephemeral directory in which the curator will
- // store its local domain socket for connections from the node.
- Directory *localstorage.EphemeralCuratorDirectory
}
// Service is the Curator service. See the package-level documentation for more
@@ -278,11 +274,10 @@
return fmt.Errorf("while retrieving consensus client: %w", err)
}
- // Start listener. This is a gRPC service listening on a local socket,
- // providing the Curator API to consumers, dispatching to either a locally
- // running leader, or forwarding to a remotely running leader.
+ // Start listener. This is a gRPC service listening on all interfaces, providing
+ // the Curator API to consumers, dispatching to either a locally running leader,
+ // or forwarding to a remotely running leader.
lis := listener{
- directory: s.config.Directory,
node: s.config.NodeCredentials,
electionWatch: s.electionWatch,
dispatchC: make(chan dispatchRequest),
@@ -314,8 +309,3 @@
supervisor.Logger(ctx).Info("Curator election restarting...")
}
}
-
-func (s *Service) DialCluster() (*grpc.ClientConn, error) {
- remote := fmt.Sprintf("unix://%s", s.config.Directory.ClientSocket.FullPath())
- return rpc.NewNodeClient(remote)
-}
diff --git a/metropolis/node/core/curator/curator_test.go b/metropolis/node/core/curator/curator_test.go
index 9212a52..cd5889b 100644
--- a/metropolis/node/core/curator/curator_test.go
+++ b/metropolis/node/core/curator/curator_test.go
@@ -12,8 +12,6 @@
"source.monogon.dev/metropolis/node/core/consensus"
"source.monogon.dev/metropolis/node/core/identity"
- "source.monogon.dev/metropolis/node/core/localstorage"
- "source.monogon.dev/metropolis/node/core/localstorage/declarative"
"source.monogon.dev/metropolis/node/core/rpc"
"source.monogon.dev/metropolis/pkg/supervisor"
)
@@ -76,30 +74,17 @@
t.Fatalf("clientv3.New: %v", err)
}
- // Create ephemeral directory for curator and place it into /tmp.
- dir := localstorage.EphemeralCuratorDirectory{}
- tmp, err := os.MkdirTemp("/tmp", "curator-test-*")
- if err != nil {
- t.Fatalf("TempDir: %v", err)
- }
- err = declarative.PlaceFS(&dir, tmp)
- if err != nil {
- t.Fatalf("PlaceFS: %v", err)
- }
-
svc := New(Config{
NodeCredentials: n,
LeaderTTL: time.Second,
- Directory: &dir,
Consensus: consensus.TestServiceHandle(t, cli),
})
if err := supervisor.Run(ctx, n.ID(), svc.Run); err != nil {
t.Fatalf("Run %s: %v", n.ID(), err)
}
return &dut{
- endpoint: endpoint,
- instance: svc,
- temporary: tmp,
+ endpoint: endpoint,
+ instance: svc,
}
}
diff --git a/metropolis/node/core/curator/impl_leader_test.go b/metropolis/node/core/curator/impl_leader_test.go
index e4829bf..174b53c 100644
--- a/metropolis/node/core/curator/impl_leader_test.go
+++ b/metropolis/node/core/curator/impl_leader_test.go
@@ -27,8 +27,8 @@
)
// fakeLeader creates a curatorLeader without any underlying leader election, in
-// its own etcd namespace. It starts public and local gRPC listeners and returns
-// clients to them.
+// its own etcd namespace. It starts the gRPC listener and returns clients to
+// it, from the point of view of the local node and some other external node.
//
// The gRPC listeners are replicated to behave as when running the Curator
// within Metropolis, so all calls performed will be authenticated and encrypted
@@ -110,13 +110,10 @@
t.Fatalf("could not generate unknown node keypair: %v", err)
}
- // Create security interceptors for both gRPC listeners.
+ // Create security interceptor for the gRPC listener.
externalSec := &rpc.ExternalServerSecurity{
NodeCredentials: nodeCredentials,
}
- localSec := &rpc.LocalServerSecurity{
- Node: &nodeCredentials.Node,
- }
// Build a curator leader object. This implements methods that will be
// exercised by tests.
@@ -130,26 +127,18 @@
// Create a curator gRPC server which performs authentication as per the created
// listenerSecurity and is backed by the created leader.
externalSrv := externalSec.SetupExternalGRPC(nil, leader)
- localSrv := localSec.SetupLocalGRPC(nil, leader)
// The gRPC server will listen on an internal 'loopback' buffer.
externalLis := bufconn.Listen(1024 * 1024)
- localLis := bufconn.Listen(1024 * 1024)
go func() {
if err := externalSrv.Serve(externalLis); err != nil {
t.Fatalf("GRPC serve failed: %v", err)
}
}()
- go func() {
- if err := localSrv.Serve(localLis); err != nil {
- t.Fatalf("GRPC serve failed: %v", err)
- }
- }()
// Stop the gRPC server on context cancel.
go func() {
<-ctx.Done()
externalSrv.Stop()
- localSrv.Stop()
}()
// Create an authenticated manager gRPC client.
@@ -158,12 +147,11 @@
t.Fatalf("Dialing external GRPC failed: %v", err)
}
- // Create a locally authenticated node gRPC client.
- lcl, err := rpc.NewNodeClientTest(localLis)
+ // Create an authenticated node gRPC client for the local node.
+ lcl, err := rpc.NewAuthenticatedClientTest(externalLis, nodeCredentials.TLSCredentials(), nodeCredentials.ClusterCA())
if err != nil {
- t.Fatalf("Dialing local GRPC failed: %v", err)
+ t.Fatalf("Dialing external GRPC failed: %v", err)
}
-
// Create an ephemeral node gRPC client for the 'other node'.
ocl, err := rpc.NewEphemeralClientTest(externalLis, otherPriv, nodeCredentials.ClusterCA())
if err != nil {
diff --git a/metropolis/node/core/curator/listener.go b/metropolis/node/core/curator/listener.go
index d142851..a8d7e42 100644
--- a/metropolis/node/core/curator/listener.go
+++ b/metropolis/node/core/curator/listener.go
@@ -15,7 +15,6 @@
"source.monogon.dev/metropolis/node/core/consensus/client"
cpb "source.monogon.dev/metropolis/node/core/curator/proto/api"
"source.monogon.dev/metropolis/node/core/identity"
- "source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/rpc"
"source.monogon.dev/metropolis/pkg/combinectx"
"source.monogon.dev/metropolis/pkg/supervisor"
@@ -44,9 +43,6 @@
// etcd is a client to the locally running consensus (etcd) server which is used
// both for storing lock/leader election status and actual Curator data.
etcd client.Namespaced
- // directory is the ephemeral directory in which the local gRPC socket will
- // be available for node-local consumers.
- directory *localstorage.EphemeralCuratorDirectory
// electionWatch is a function that returns an active electionWatcher for the
// listener to use when determining local leadership. As the listener may
// restart on error, this factory-function is used instead of an electionWatcher
@@ -90,6 +86,8 @@
// Respond to requests and status updates.
for {
select {
+ case <-ctx.Done():
+ return
case r := <-l.dispatchC:
// Handle request.
r.resC <- listenerTarget{
@@ -211,25 +209,8 @@
es := rpc.ExternalServerSecurity{
NodeCredentials: l.node,
}
- ls := rpc.LocalServerSecurity{
- Node: &l.node.Node,
- }
- err := supervisor.Run(ctx, "local", func(ctx context.Context) error {
- lisLocal, err := net.ListenUnix("unix", &net.UnixAddr{Name: l.directory.ClientSocket.FullPath(), Net: "unix"})
- if err != nil {
- return fmt.Errorf("failed to listen: %w", err)
- }
- defer lisLocal.Close()
-
- runnable := supervisor.GRPCServer(ls.SetupLocalGRPC(supervisor.MustSubLogger(ctx, "rpc"), l), lisLocal, true)
- return runnable(ctx)
- })
- if err != nil {
- return fmt.Errorf("while starting local gRPC listener: %w", err)
- }
-
- err = supervisor.Run(ctx, "external", func(ctx context.Context) error {
+ err := supervisor.Run(ctx, "external", func(ctx context.Context) error {
lisExternal, err := net.Listen("tcp", fmt.Sprintf(":%d", node.CuratorServicePort))
if err != nil {
return fmt.Errorf("failed to listen on external curator socket: %w", err)
diff --git a/metropolis/node/core/curator/listener_test.go b/metropolis/node/core/curator/listener_test.go
index 7c1744e..ccd094c 100644
--- a/metropolis/node/core/curator/listener_test.go
+++ b/metropolis/node/core/curator/listener_test.go
@@ -3,14 +3,11 @@
import (
"context"
"errors"
- "os"
"testing"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
- "source.monogon.dev/metropolis/node/core/localstorage"
- "source.monogon.dev/metropolis/node/core/localstorage/declarative"
"source.monogon.dev/metropolis/node/core/rpc"
"source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/supervisor"
@@ -24,20 +21,6 @@
// It does not test the gRPC listener socket itself and the actual
// implementations - that is deferred to curator functionality tests.
func TestListenerSwitch(t *testing.T) {
- // Create ephemeral directory for curator and place it into /tmp.
- dir := localstorage.EphemeralCuratorDirectory{}
- // Force usage of /tmp as temp directory root, otherwsie TMPDIR from Bazel
- // returns a path long enough that socket binds in the localstorage fail
- // (as socket names are limited to 108 characters).
- tmp, err := os.MkdirTemp("/tmp", "curator-test-*")
- if err != nil {
- t.Fatalf("TempDir: %v", err)
- }
- err = declarative.PlaceFS(&dir, tmp)
- if err != nil {
- t.Fatalf("PlaceFS: %v", err)
- }
-
// Create test event value.
var val memory.Value
@@ -46,8 +29,7 @@
// Create DUT listener.
l := &listener{
- etcd: nil,
- directory: &dir,
+ etcd: nil,
electionWatch: func() electionWatcher {
return electionWatcher{
Watcher: val.Watch(),
@@ -81,7 +63,7 @@
})
}()
ctxRC()
- err = <-errC
+ err := <-errC
if err == nil || !errors.Is(err, context.Canceled) {
t.Fatalf("callImpl context should have returned context error, got %v", err)
}
diff --git a/metropolis/node/core/curator/proto/api/api.proto b/metropolis/node/core/curator/proto/api/api.proto
index da54d03..9a03a15 100644
--- a/metropolis/node/core/curator/proto/api/api.proto
+++ b/metropolis/node/core/curator/proto/api/api.proto
@@ -14,16 +14,13 @@
// that run a consensus server (etcd). Every instance either serves traffic
// directly (if it is the leader) or passes all RPCs over to the current
// leader.
-// The curator listens on gRPC over a local UNIX domain socket accessible to the
-// rest of the node code, and on a node's port over TLS with a certificate
-// issued by the Cluster CA.
+// The curator listens for gRPC on all network interfaces at a well-known port,
+// with access encrypted and authenticated by TLS using certificates issued by
+// the Cluster CA.
//
// The curator is a privileged service, and performs per-RPC authorization based
-// on the identity of the client:
-// - When serving traffic locally over a UNIX domain socket, the service
-// attaches the identity of this node to the RPCs.
-// - When serving over public gRPC, cluster authentication is required and gRPC
-// client identity will be tied to the RPCs.
+// on the identity of the client, which is determined by the client certificate
+// supplied over TLS.
//
// TODO(q3k): implement and document public Cluster gRPC.
// TODO(q3k): implement and document cluster auth for nodes and escrowed user
diff --git a/metropolis/node/core/localstorage/directory_root.go b/metropolis/node/core/localstorage/directory_root.go
index 39fda2c..c3a49cb 100644
--- a/metropolis/node/core/localstorage/directory_root.go
+++ b/metropolis/node/core/localstorage/directory_root.go
@@ -60,7 +60,6 @@
// TODO(q3k): do this automatically?
for _, d := range []declarative.DirectoryPlacement{
r.Ephemeral.Consensus,
- r.Ephemeral.Curator,
r.Ephemeral.Containerd, r.Ephemeral.Containerd.Tmp, r.Ephemeral.Containerd.RunSC, r.Ephemeral.Containerd.IPAM,
r.Ephemeral.FlexvolumePlugins,
} {
diff --git a/metropolis/node/core/localstorage/storage.go b/metropolis/node/core/localstorage/storage.go
index d067438..27ffd1d 100644
--- a/metropolis/node/core/localstorage/storage.go
+++ b/metropolis/node/core/localstorage/storage.go
@@ -154,7 +154,6 @@
type EphemeralDirectory struct {
declarative.Directory
Consensus EphemeralConsensusDirectory `dir:"consensus"`
- Curator EphemeralCuratorDirectory `dir:"curator"`
Containerd EphemeralContainerdDirectory `dir:"containerd"`
FlexvolumePlugins declarative.Directory `dir:"flexvolume_plugins"`
Hosts declarative.File `file:"hosts"`
@@ -167,13 +166,6 @@
ServerLogsFIFO declarative.File `file:"server-logs.fifo"`
}
-type EphemeralCuratorDirectory struct {
- declarative.Directory
- // Curator ephemeral socket, dialed by local curator clients.
- // See: //metropolis/node/core/curator.
- ClientSocket declarative.File `file:"client.sock"`
-}
-
type EphemeralContainerdDirectory struct {
declarative.Directory
ClientSocket declarative.File `file:"client.sock"`
diff --git a/metropolis/node/core/roleserve/value_clustermembership.go b/metropolis/node/core/roleserve/value_clustermembership.go
index 4f9d196..d699473 100644
--- a/metropolis/node/core/roleserve/value_clustermembership.go
+++ b/metropolis/node/core/roleserve/value_clustermembership.go
@@ -10,7 +10,6 @@
common "source.monogon.dev/metropolis/node"
"source.monogon.dev/metropolis/node/core/consensus"
- "source.monogon.dev/metropolis/node/core/curator"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/rpc"
"source.monogon.dev/metropolis/pkg/event"
@@ -33,9 +32,6 @@
// but also accesses it to pass over information about already known remote
// curators and to get the local node's identity.
type ClusterMembership struct {
- // localCurator is set by the Control Plane Worker when this node runs control
- // plane services.
- localCurator *curator.Service
// localConsensus is set by the Control Plane Worker when this node runs control
// plane services.
localConsensus *consensus.Service
@@ -115,7 +111,7 @@
if cm.credentials == nil {
continue
}
- if cm.localCurator == nil && cm.remoteCurators == nil {
+ if cm.remoteCurators == nil {
continue
}
return cm, nil
@@ -133,14 +129,10 @@
// perform a GetHome/DialCurator process on any gRPC error. A smarter
// load-balancing/re-dialing client will be implemented in the future.
func (m *ClusterMembership) DialCurator() (*grpc.ClientConn, error) {
- if m.localCurator != nil {
- return m.localCurator.DialCluster()
- }
-
// Dial first curator.
// TODO(q3k): load balance
if m.remoteCurators == nil || len(m.remoteCurators.Nodes) < 1 {
- return nil, fmt.Errorf("no local or remote curators available")
+ return nil, fmt.Errorf("no curators available")
}
host := m.remoteCurators.Nodes[0].Addresses[0].Host
addr := net.JoinHostPort(host, common.CuratorServicePort.PortString())
diff --git a/metropolis/node/core/roleserve/worker_controlplane.go b/metropolis/node/core/roleserve/worker_controlplane.go
index 6193c5a..aa5b4a3 100644
--- a/metropolis/node/core/roleserve/worker_controlplane.go
+++ b/metropolis/node/core/roleserve/worker_controlplane.go
@@ -1,6 +1,7 @@
package roleserve
import (
+ "bytes"
"context"
"crypto/ed25519"
"crypto/x509"
@@ -337,12 +338,33 @@
directory = startup.existingMembership.remoteCurators
}
+ // Ensure this node is present in the cluster directory.
+ if directory == nil {
+ directory = &cpb.ClusterDirectory{}
+ }
+ missing := true
+ for _, n := range directory.Nodes {
+ if bytes.Equal(n.PublicKey, creds.PublicKey()) {
+ missing = false
+ break
+ }
+ }
+ if missing {
+ directory.Nodes = append(directory.Nodes, &cpb.ClusterDirectory_Node{
+ PublicKey: creds.PublicKey(),
+ Addresses: []*cpb.ClusterDirectory_Node_Address{
+ {
+ Host: "127.0.0.1",
+ },
+ },
+ })
+ }
+
// Start curator.
cur := curator.New(curator.Config{
NodeCredentials: creds,
Consensus: con,
LeaderTTL: 10 * time.Second,
- Directory: &s.storageRoot.Ephemeral.Curator,
})
if err := supervisor.Run(ctx, "curator", cur.Run); err != nil {
return fmt.Errorf("failed to start curator: %w", err)
@@ -354,7 +376,6 @@
// We now have a locally running ControlPlane. Reflect that in a new
// ClusterMembership.
s.clusterMembership.set(&ClusterMembership{
- localCurator: cur,
localConsensus: con,
credentials: creds,
remoteCurators: directory,
diff --git a/metropolis/node/core/rpc/server_authentication.go b/metropolis/node/core/rpc/server_authentication.go
index 50cf36a..82cdf9c 100644
--- a/metropolis/node/core/rpc/server_authentication.go
+++ b/metropolis/node/core/rpc/server_authentication.go
@@ -18,8 +18,12 @@
apb "source.monogon.dev/metropolis/proto/api"
)
-// authenticationStrategy is implemented by {Local,External}ServerSecurity to
-// share logic between the two implementations.
+// authenticationStrategy is implemented by ExternalServerSecurity. Historically
+// it has also been implemented by LocalServerSecurity (listening on a local
+// domain socket), but this implementation has since been removed.
+//
+// TODO(q3k): simplify this code and remove this interface now that there's only
+// ExternalServerSecurity.
type authenticationStrategy interface {
// getPeerInfo will be called by the stream and unary gRPC server interceptors
// to authenticate incoming gRPC calls. It's given the gRPC context of the call
@@ -194,52 +198,3 @@
return cert, nil
}
-
-// LocalServerSecurity are the security options of an RPC server that will run
-// the Curator service over a local domain socket. When set up using
-// LocalServerSecurity, all incoming RPCs will be authenticated as coming from
-// the node that this service is running on.
-//
-// It implements authenticationStrategy.
-type LocalServerSecurity struct {
- // Node for which the gRPC server will authenticate all incoming requests as
- // originating from.
- Node *identity.Node
-
- // nodePermissions is used by tests to inject the permissions available to a
- // node. When not set, it defaults to the global nodePermissions map.
- nodePermissions Permissions
-}
-
-// SetupLocalGRPC returns a grpc.Server ready to listen on a local domain socket
-// and serve the Curator service. All incoming RPCs will be authenticated as
-// originating from the node for which LocalServerSecurity has been configured.
-func (l *LocalServerSecurity) SetupLocalGRPC(logger logtree.LeveledLogger, impls ClusterInternalServices) *grpc.Server {
- s := grpc.NewServer(
- grpc.UnaryInterceptor(unaryInterceptor(logger, l)),
- grpc.StreamInterceptor(streamInterceptor(logger, l)),
- )
- cpb.RegisterCuratorServer(s, impls)
- return s
-}
-
-func (l *LocalServerSecurity) getPeerInfo(_ context.Context) (*PeerInfo, error) {
- // Local connections are always node connections.
- np := l.nodePermissions
- if np == nil {
- np = nodePermissions
- }
- return &PeerInfo{
- Node: &PeerInfoNode{
- PublicKey: l.Node.PublicKey(),
- Permissions: np,
- },
- }, nil
-}
-
-func (l *LocalServerSecurity) getPeerInfoUnauthenticated(_ context.Context) (*PeerInfo, error) {
- // This shouldn't happen - why would we call unauthenticated methods locally?
- // This can be implemented, but doesn't really make sense. For now, assume this
- // is a programming error. This can be changed if needed.
- return nil, status.Errorf(codes.Unauthenticated, "unauthenticated methods not supported over local connections")
-}
diff --git a/metropolis/node/core/rpc/server_authentication_test.go b/metropolis/node/core/rpc/server_authentication_test.go
index 795b460..f0ae6a2 100644
--- a/metropolis/node/core/rpc/server_authentication_test.go
+++ b/metropolis/node/core/rpc/server_authentication_test.go
@@ -102,70 +102,3 @@
t.Errorf("GetRegisterTicket (by ephemeral cert) returned %v, wanted codes.Unauthenticated", err)
}
}
-
-// TestLocalServerSecurity ensures that the unary interceptor of the
-// LocalServerSecurity structure works, and authenticates/authorizes incoming
-// RPCs as expected.
-func TestLocalServerSecurity(t *testing.T) {
- ctx, ctxC := context.WithCancel(context.Background())
- defer ctxC()
-
- eph := NewEphemeralClusterCredentials(t, 1)
-
- permissions := make(Permissions)
- for k, v := range nodePermissions {
- permissions[k] = v
- }
-
- ls := LocalServerSecurity{
- Node: &eph.Nodes[0].Node,
- nodePermissions: permissions,
- }
-
- impl := &testImplementation{}
- srv := ls.SetupLocalGRPC(nil, impl)
- lis := bufconn.Listen(1024 * 1024)
- go func() {
- if err := srv.Serve(lis); err != nil {
- t.Fatalf("GRPC serve failed: %v", err)
- }
- }()
- defer lis.Close()
- defer srv.Stop()
-
- // Nodes should have access to Curator.Watch.
- cl, err := NewNodeClientTest(lis)
- if err != nil {
- t.Fatalf("NewAuthenticatedClient: %v", err)
- }
- defer cl.Close()
-
- curator := cpb.NewCuratorClient(cl)
- req := &cpb.WatchRequest{
- Kind: &cpb.WatchRequest_NodeInCluster_{
- NodeInCluster: &cpb.WatchRequest_NodeInCluster{
- NodeId: eph.Nodes[0].ID(),
- },
- },
- }
- w, err := curator.Watch(ctx, req)
- if err != nil {
- t.Fatalf("Watch: %v", err)
- }
- _, err = w.Recv()
- if s, ok := status.FromError(err); !ok || s.Code() != codes.Unimplemented {
- t.Errorf("Watch (by local node) returned %v, wanted codes.Unimplemented", err)
- }
-
- // Take away the node's PERMISSION_READ_CLUSTER_STATUS permissions and try
- // again. This should fail.
- permissions[epb.Permission_PERMISSION_READ_CLUSTER_STATUS] = false
- w, err = curator.Watch(ctx, req)
- if err != nil {
- t.Fatalf("Watch: %v", err)
- }
- _, err = w.Recv()
- if s, ok := status.FromError(err); !ok || s.Code() != codes.PermissionDenied {
- t.Errorf("Watch (by local node after removing permission) returned %v, wanted codes.PermissionDenied", err)
- }
-}