m/pkg/event: make type-safe
This is a fairly large change that uses Go type parameters
(“generics”) to make the event library (and its memory/etcd
implementations) type-safe.
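
As a rough sketch of the resulting API (based on the usage visible in
this change, not an exhaustive reference; ctx is assumed to be in
scope), a typed in-memory Value now looks like this:

    var status memory.Value[*Status]
    status.Set(&Status{})     // Set accepts a *Status directly.

    w := status.Watch()       // w is an event.Watcher[*Status].
    defer w.Close()
    st, err := w.Get(ctx)     // st is a *Status, no type assertion.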
Since the event.Value interface is now strongly typed, we also move
implementation-specific options (like BacklogOnly) into that
interface, instead of keeping them on the individual implementations
behind type assertions. Using an option that a particular
implementation does not handle is a runtime error; expressing this in
the type system is probably not worth the effort.
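
For example, the initial-fetch loop in impl_leader_curator.go now uses
the typed BacklogOnly option from the event package (sketched below,
mirroring the code in this change):

    for {
        nodeKV, err := w.Get(ctx, event.BacklogOnly[*nodeAtID]())
        if err == event.BacklogDone {
            break // initial cluster state fully retrieved
        }
        if err != nil {
            return err
        }
        // ... accumulate nodeKV into the initial node map ...
    }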
We also implement Filter, which moves some of the functionality
previously implemented in type-assertion wrappers into the library
itself.
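
For example, consensus' GetRunning wrapper becomes a filter option
(taken from status.go in this change):

    var FilterRunning = event.Filter(func(st *Status) bool {
        return st.Running()
    })

Callers then pass the filter to Get instead of calling GetRunning:

    st, err := w.Get(ctx, consensus.FilterRunning)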
In the end, this removes a good amount of type assertion code, at the
cost of a fairly sweeping change. Unfortunately, some of the churn is
due to IntelliJ suddenly deciding to reformat comments.
Change-Id: I1ca6d93db1b5c4055a21af3fb9e5e3d425c0d86e
Reviewed-on: https://review.monogon.dev/c/monogon/+/1322
Tested-by: Jenkins CI
Reviewed-by: Leopold Schabel <leo@monogon.tech>
diff --git a/metropolis/node/core/cluster/BUILD.bazel b/metropolis/node/core/cluster/BUILD.bazel
index 4f0ab36..246fe1f 100644
--- a/metropolis/node/core/cluster/BUILD.bazel
+++ b/metropolis/node/core/cluster/BUILD.bazel
@@ -20,7 +20,6 @@
"//metropolis/node/core/roleserve",
"//metropolis/node/core/rpc",
"//metropolis/node/core/rpc/resolver",
- "//metropolis/pkg/event/memory",
"//metropolis/pkg/supervisor",
"//metropolis/proto/api",
"//metropolis/proto/common",
diff --git a/metropolis/node/core/cluster/cluster.go b/metropolis/node/core/cluster/cluster.go
index d5eee03..6254669 100644
--- a/metropolis/node/core/cluster/cluster.go
+++ b/metropolis/node/core/cluster/cluster.go
@@ -14,15 +14,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-// cluster implements low-level clustering logic, especially logic regarding to
-// bootstrapping, registering into and joining a cluster. Its goal is to provide
-// the rest of the node code with the following:
-// - A mounted plaintext storage.
-// - Node credentials/identity.
-// - A locally running etcd server if the node is supposed to run one, and a
-// client connection to that etcd cluster if so.
-// - The state of the cluster as seen by the node, to enable code to respond to
-// node lifecycle changes.
+// Package cluster implements low-level clustering logic, especially logic
+// regarding to bootstrapping, registering into and joining a cluster. Its goal
+// is to provide the rest of the node code with the following:
+// - A mounted plaintext storage.
+// - Node credentials/identity.
+// - A locally running etcd server if the node is supposed to run one, and a
+// client connection to that etcd cluster if so.
+// - The state of the cluster as seen by the node, to enable code to respond to
+// node lifecycle changes.
package cluster
import (
@@ -43,7 +43,6 @@
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/network"
"source.monogon.dev/metropolis/node/core/roleserve"
- "source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/supervisor"
apb "source.monogon.dev/metropolis/proto/api"
cpb "source.monogon.dev/metropolis/proto/common"
@@ -62,7 +61,6 @@
storageRoot *localstorage.Root
networkService *network.Service
roleServer *roleserve.Service
- status memory.Value
state
diff --git a/metropolis/node/core/consensus/consensus.go b/metropolis/node/core/consensus/consensus.go
index b570c69..5daa02f 100644
--- a/metropolis/node/core/consensus/consensus.go
+++ b/metropolis/node/core/consensus/consensus.go
@@ -99,6 +99,7 @@
"source.monogon.dev/metropolis/node/core/consensus/client"
"source.monogon.dev/metropolis/node/core/identity"
+ "source.monogon.dev/metropolis/pkg/event"
"source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/logtree/unraw"
"source.monogon.dev/metropolis/pkg/pki"
@@ -144,7 +145,7 @@
type Service struct {
config *Config
- value memory.Value
+ value memory.Value[*Status]
ca *pki.Certificate
}
@@ -454,7 +455,7 @@
}
}
- w := s.Watch()
+ w := s.value.Watch()
for {
st, err := w.Get(ctx)
if err != nil {
@@ -473,6 +474,10 @@
}
}
+func (s *Service) Watch() event.Watcher[*Status] {
+ return s.value.Watch()
+}
+
// selfupdater is a runnable that performs a one-shot (once per Service Run,
// thus once for each configuration) update of the node's Peer URL in etcd. This
// is currently only really needed because the first node in the cluster
@@ -482,7 +487,7 @@
// more robust.
func (s *Service) selfupdater(ctx context.Context) error {
supervisor.Signal(ctx, supervisor.SignalHealthy)
- w := s.Watch()
+ w := s.value.Watch()
for {
st, err := w.Get(ctx)
if err != nil {
diff --git a/metropolis/node/core/consensus/status.go b/metropolis/node/core/consensus/status.go
index 992d0ac..17df8f9 100644
--- a/metropolis/node/core/consensus/status.go
+++ b/metropolis/node/core/consensus/status.go
@@ -23,38 +23,12 @@
type ServiceHandle interface {
// Watch returns a Event Value compatible Watcher for accessing the State of the
// consensus Service in a safe manner.
- Watch() Watcher
+ Watch() event.Watcher[*Status]
}
-// Watch returns a Event Value compatible Watcher for accessing the State of the
-// consensus Service in a safe manner.
-func (s *Service) Watch() Watcher {
- return Watcher{s.value.Watch()}
-}
-
-type Watcher struct {
- event.Watcher
-}
-
-func (w *Watcher) Get(ctx context.Context, opts ...event.GetOption) (*Status, error) {
- v, err := w.Watcher.Get(ctx, opts...)
- if err != nil {
- return nil, err
- }
- return v.(*Status), nil
-}
-
-func (w *Watcher) GetRunning(ctx context.Context) (*Status, error) {
- for {
- st, err := w.Get(ctx)
- if err != nil {
- return nil, err
- }
- if st.Running() {
- return st, nil
- }
- }
-}
+var FilterRunning = event.Filter(func(st *Status) bool {
+ return st.Running()
+})
// Status of the consensus service. It represents either a running consensus
// service to which a client can connect and on which management can be
diff --git a/metropolis/node/core/consensus/testhelpers.go b/metropolis/node/core/consensus/testhelpers.go
index f69f73e..8b0b213 100644
--- a/metropolis/node/core/consensus/testhelpers.go
+++ b/metropolis/node/core/consensus/testhelpers.go
@@ -10,7 +10,7 @@
)
type testServiceHandle struct {
- s memory.Value
+ s memory.Value[*Status]
}
// TestServiceHandle builds a somewhat functioning ServiceHandle from a bare
@@ -41,9 +41,5 @@
t.Fatalf("failed to ensure PKI CA: %v", err)
}
tsh.s.Set(st)
- return &tsh
-}
-
-func (h *testServiceHandle) Watch() Watcher {
- return Watcher{h.s.Watch()}
+ return &tsh.s
}
diff --git a/metropolis/node/core/curator/BUILD.bazel b/metropolis/node/core/curator/BUILD.bazel
index be23b55..2283b34 100644
--- a/metropolis/node/core/curator/BUILD.bazel
+++ b/metropolis/node/core/curator/BUILD.bazel
@@ -66,6 +66,7 @@
"//metropolis/node/core/curator/proto/private",
"//metropolis/node/core/identity",
"//metropolis/node/core/rpc",
+ "//metropolis/pkg/event",
"//metropolis/pkg/logtree",
"//metropolis/pkg/pki",
"//metropolis/pkg/supervisor",
diff --git a/metropolis/node/core/curator/curator.go b/metropolis/node/core/curator/curator.go
index 400f08c..336b7d6 100644
--- a/metropolis/node/core/curator/curator.go
+++ b/metropolis/node/core/curator/curator.go
@@ -25,7 +25,6 @@
"source.monogon.dev/metropolis/node/core/consensus"
ppb "source.monogon.dev/metropolis/node/core/curator/proto/private"
"source.monogon.dev/metropolis/node/core/identity"
- "source.monogon.dev/metropolis/pkg/event"
"source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/supervisor"
)
@@ -57,7 +56,7 @@
// status is a memory Event Value for keeping the electionStatus of this
// instance. It is not exposed to users of the Curator.
- status memory.Value
+ status memory.Value[*electionStatus]
}
// New creates a new curator Service.
@@ -94,28 +93,6 @@
lock *ppb.LeaderElectionValue
}
-func (s *Service) electionWatch() electionWatcher {
- return electionWatcher{
- Watcher: s.status.Watch(),
- }
-}
-
-// electionWatcher is a type-safe wrapper around event.Watcher which provides
-// electionStatus values.
-type electionWatcher struct {
- event.Watcher
-}
-
-// get retrieves an electionStatus from the electionWatcher.
-func (w *electionWatcher) get(ctx context.Context) (*electionStatus, error) {
- val, err := w.Watcher.Get(ctx)
- if err != nil {
- return nil, err
- }
- status := val.(electionStatus)
- return &status, err
-}
-
// buildLockValue returns a serialized etcd value that will be set by the
// instance when it becomes a leader. This value is a serialized
// LeaderElectionValue from private/storage.proto.
@@ -206,7 +183,7 @@
if err := proto.Unmarshal(o.Kvs[0].Value, &lock); err != nil {
return fmt.Errorf("parsing existing lock value failed: %w", err)
}
- s.status.Set(electionStatus{
+ s.status.Set(&electionStatus{
follower: &electionStatusFollower{
lock: &lock,
},
@@ -223,7 +200,7 @@
supervisor.Logger(ctx).Info("Curator became leader.")
// Update status, watchers will now know that this curator is the leader.
- s.status.Set(electionStatus{
+ s.status.Set(&electionStatus{
leader: &electionStatusLeader{
lockKey: election.Key(),
lockRev: election.Rev(),
@@ -246,9 +223,9 @@
// Start local election watcher. This logs what this curator knows about its own
// leadership.
go func() {
- w := s.electionWatch()
+ w := s.status.Watch()
for {
- s, err := w.get(ctx)
+ s, err := w.Get(ctx)
if err != nil {
supervisor.Logger(ctx).Warningf("Election watcher exiting: get(): %v", err)
return
@@ -264,7 +241,7 @@
supervisor.Logger(ctx).Infof("Waiting for consensus...")
w := s.config.Consensus.Watch()
defer w.Close()
- st, err := w.GetRunning(ctx)
+ st, err := w.Get(ctx, consensus.FilterRunning)
if err != nil {
return fmt.Errorf("while waiting for consensus: %w", err)
}
@@ -279,7 +256,7 @@
// or forwarding to a remotely running leader.
lis := listener{
node: s.config.NodeCredentials,
- electionWatch: s.electionWatch,
+ electionWatch: s.status.Watch,
consensus: s.config.Consensus,
etcd: etcd,
}
@@ -297,9 +274,9 @@
supervisor.Signal(ctx, supervisor.SignalHealthy)
for {
- s.status.Set(electionStatus{})
+ s.status.Set(&electionStatus{})
err := s.elect(ctx)
- s.status.Set(electionStatus{})
+ s.status.Set(&electionStatus{})
if err != nil && errors.Is(err, ctx.Err()) {
return fmt.Errorf("election round failed due to context cancelation, not attempting to re-elect: %w", err)
diff --git a/metropolis/node/core/curator/curator_test.go b/metropolis/node/core/curator/curator_test.go
index b1a9671..e9674f5 100644
--- a/metropolis/node/core/curator/curator_test.go
+++ b/metropolis/node/core/curator/curator_test.go
@@ -13,6 +13,7 @@
"source.monogon.dev/metropolis/node/core/consensus"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/rpc"
+ "source.monogon.dev/metropolis/pkg/event"
"source.monogon.dev/metropolis/pkg/supervisor"
)
@@ -131,11 +132,11 @@
// Run a watcher for each dut which sends that dut's newest available
// electionStatus (or error) to updC.
for e, d := range s {
- w := d.instance.electionWatch()
- go func(e string, w electionWatcher) {
+ w := d.instance.status.Watch()
+ go func(e string, w event.Watcher[*electionStatus]) {
defer w.Close()
for {
- s, err := w.get(ctx2)
+ s, err := w.Get(ctx2)
if err != nil {
updC <- dutUpdate{
endpoint: e,
diff --git a/metropolis/node/core/curator/impl_leader_curator.go b/metropolis/node/core/curator/impl_leader_curator.go
index a06f8cd..869cfd9 100644
--- a/metropolis/node/core/curator/impl_leader_curator.go
+++ b/metropolis/node/core/curator/impl_leader_curator.go
@@ -15,9 +15,11 @@
tpb "google.golang.org/protobuf/types/known/timestamppb"
common "source.monogon.dev/metropolis/node"
+ "source.monogon.dev/metropolis/node/core/consensus"
ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/rpc"
+ "source.monogon.dev/metropolis/pkg/event"
"source.monogon.dev/metropolis/pkg/event/etcd"
"source.monogon.dev/metropolis/pkg/pki"
cpb "source.monogon.dev/metropolis/proto/common"
@@ -74,7 +76,7 @@
defer w.Close()
for {
- v, err := w.Get(ctx)
+ nodeKV, err := w.Get(ctx)
if err != nil {
if rpcErr, ok := rpcError(err); ok {
return rpcErr
@@ -84,7 +86,6 @@
}
ev := &ipb.WatchEvent{}
- nodeKV := v.(nodeAtID)
nodeKV.appendToEvent(ev)
if err := srv.Send(ev); err != nil {
return err
@@ -99,7 +100,7 @@
ctx := srv.Context()
start, end := nodeEtcdPrefix.KeyRange()
- value := etcd.NewValue(l.etcd, start, nodeValueConverter, etcd.Range(end))
+ value := etcd.NewValue[*nodeAtID](l.etcd, start, nodeValueConverter, etcd.Range(end))
w := value.Watch()
defer w.Close()
@@ -107,15 +108,14 @@
// Perform initial fetch from etcd.
nodes := make(map[string]*Node)
for {
- v, err := w.Get(ctx, etcd.BacklogOnly)
- if err == etcd.BacklogDone {
+ nodeKV, err := w.Get(ctx, event.BacklogOnly[*nodeAtID]())
+ if err == event.BacklogDone {
break
}
if err != nil {
rpc.Trace(ctx).Printf("etcd watch failed (initial fetch): %v", err)
return status.Error(codes.Unavailable, "internal error during initial fetch")
}
- nodeKV := v.(nodeAtID)
if nodeKV.value != nil {
nodes[nodeKV.id] = nodeKV.value
}
@@ -148,13 +148,12 @@
// Send updates as they arrive from etcd watcher.
for {
- v, err := w.Get(ctx)
+ nodeKV, err := w.Get(ctx)
if err != nil {
rpc.Trace(ctx).Printf("etcd watch failed (update): %v", err)
return status.Errorf(codes.Unavailable, "internal error during update")
}
we := &ipb.WatchEvent{}
- nodeKV := v.(nodeAtID)
nodeKV.appendToEvent(we)
if err := srv.Send(we); err != nil {
return err
@@ -173,7 +172,7 @@
// nodeValueConverter is called by etcd node value watchers to convert updates
// from the cluster into nodeAtID, ensuring data integrity and checking
// invariants.
-func nodeValueConverter(key, value []byte) (interface{}, error) {
+func nodeValueConverter(key, value []byte) (*nodeAtID, error) {
res := nodeAtID{
id: nodeEtcdPrefix.ExtractID(string(key)),
}
@@ -192,7 +191,7 @@
// panic.
return nil, fmt.Errorf("invalid node key %q", key)
}
- return res, nil
+ return &res, nil
}
// appendToId records a node update represented by nodeAtID into a Curator
@@ -429,7 +428,7 @@
w := l.consensus.Watch()
defer w.Close()
- st, err := w.GetRunning(ctx)
+ st, err := w.Get(ctx, consensus.FilterRunning)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "could not get running consensus: %v", err)
}
diff --git a/metropolis/node/core/curator/impl_leader_management.go b/metropolis/node/core/curator/impl_leader_management.go
index b725759..69c1445 100644
--- a/metropolis/node/core/curator/impl_leader_management.go
+++ b/metropolis/node/core/curator/impl_leader_management.go
@@ -11,6 +11,7 @@
"google.golang.org/grpc/status"
dpb "google.golang.org/protobuf/types/known/durationpb"
+ "source.monogon.dev/metropolis/node/core/consensus"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/rpc"
apb "source.monogon.dev/metropolis/proto/api"
@@ -325,7 +326,7 @@
w := l.consensus.Watch()
defer w.Close()
- st, err := w.GetRunning(ctx)
+ st, err := w.Get(ctx, consensus.FilterRunning)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "could not get running consensus: %v", err)
}
diff --git a/metropolis/node/core/curator/listener.go b/metropolis/node/core/curator/listener.go
index e093d31..2290ab9 100644
--- a/metropolis/node/core/curator/listener.go
+++ b/metropolis/node/core/curator/listener.go
@@ -15,6 +15,7 @@
cpb "source.monogon.dev/metropolis/node/core/curator/proto/api"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/rpc"
+ "source.monogon.dev/metropolis/pkg/event"
"source.monogon.dev/metropolis/pkg/supervisor"
apb "source.monogon.dev/metropolis/proto/api"
)
@@ -45,7 +46,7 @@
// listener to use when determining local leadership. As the listener may
// restart on error, this factory-function is used instead of an electionWatcher
// directly.
- electionWatch func() electionWatcher
+ electionWatch func() event.Watcher[*electionStatus]
consensus consensus.ServiceHandle
}
@@ -57,7 +58,7 @@
// waiting for a result.
w := l.electionWatch()
supervisor.Logger(ctx).Infof("Waiting for election status...")
- st, err := w.get(ctx)
+ st, err := w.Get(ctx)
if err != nil {
return fmt.Errorf("could not get election status: %w", err)
}
@@ -141,7 +142,7 @@
case st.leader != nil:
supervisor.Logger(ctx).Infof("Leader running until leadership lost.")
for {
- nst, err := w.get(ctx)
+ nst, err := w.Get(ctx)
if err != nil {
return fmt.Errorf("getting election status after starting listener failed, bailing just in case: %w", err)
}
@@ -152,7 +153,7 @@
case st.follower != nil:
supervisor.Logger(ctx).Infof("Follower running until leadership change.")
for {
- nst, err := w.get(ctx)
+ nst, err := w.Get(ctx)
if err != nil {
return fmt.Errorf("getting election status after starting listener failed, bailing just in case: %w", err)
}
diff --git a/metropolis/node/core/network/hostsfile/hostsfile.go b/metropolis/node/core/network/hostsfile/hostsfile.go
index 8b5eb9a..a4455ec 100644
--- a/metropolis/node/core/network/hostsfile/hostsfile.go
+++ b/metropolis/node/core/network/hostsfile/hostsfile.go
@@ -2,12 +2,12 @@
// files/interfaces used by the system to resolve the local node's name and the
// names of other nodes in the cluster:
//
-// 1. All cluster node names are written into /etc/hosts for DNS resolution.
-// 2. The local node's name is written into /etc/machine-id.
-// 3. The local node's name is set as the UNIX hostname of the machine (via the
-// sethostname call).
-// 4. The local node's ClusterDirectory is updated with the same set of
-// addresses as the one used in /etc/hosts.
+// 1. All cluster node names are written into /etc/hosts for DNS resolution.
+// 2. The local node's name is written into /etc/machine-id.
+// 3. The local node's name is set as the UNIX hostname of the machine (via the
+// sethostname call).
+// 4. The local node's ClusterDirectory is updated with the same set of
+// addresses as the one used in /etc/hosts.
//
// The hostsfile Service can start up in two modes: with cluster connectivity
// and without cluster connectivity. Without cluster connectivity, only
@@ -137,7 +137,7 @@
cmw := s.Roleserver.ClusterMembership.Watch()
defer cmw.Close()
supervisor.Logger(ctx).Infof("Waiting for node ID...")
- nodeID, err := cmw.GetNodeID(ctx)
+ nodeID, err := roleserve.GetNodeID(ctx, cmw)
if err != nil {
return err
}
@@ -261,7 +261,7 @@
defer cmw.Close()
supervisor.Logger(ctx).Infof("Waiting for cluster membership...")
- cm, err := cmw.GetHome(ctx)
+ cm, err := cmw.Get(ctx, roleserve.FilterHome())
if err != nil {
return err
}
diff --git a/metropolis/node/core/network/main.go b/metropolis/node/core/network/main.go
index 45de6b2..8740d5a 100644
--- a/metropolis/node/core/network/main.go
+++ b/metropolis/node/core/network/main.go
@@ -55,7 +55,7 @@
natTable *nftables.Table
natPostroutingChain *nftables.Chain
- status memory.Value
+ status memory.Value[*Status]
}
func New() *Service {
@@ -76,35 +76,12 @@
DNSServers dhcp4c.DNSServers
}
-// Watcher allows network Service consumers to watch for updates of the current
-// Status.
-type Watcher struct {
- watcher event.Watcher
-}
-
-// Get returns the newest network Status from a Watcher. It will block until a
-// new Status is available.
-func (w *Watcher) Get(ctx context.Context) (*Status, error) {
- val, err := w.watcher.Get(ctx)
- if err != nil {
- return nil, err
- }
- status := val.(Status)
- return &status, err
-}
-
-func (w *Watcher) Close() error {
- return w.watcher.Close()
-}
-
// Watch returns a Watcher, which can be used by consumers of the network
// Service to retrieve the current network status.
// Close must be called on the Watcher when it is not used anymore in order to
// prevent goroutine leaks.
-func (s *Service) Watch() Watcher {
- return Watcher{
- watcher: s.status.Watch(),
- }
+func (s *Service) Watch() event.Watcher[*Status] {
+ return s.status.Watch()
}
// ConfigureDNS sets a DNS ExtraDirective on the built-in DNS server of the
@@ -134,7 +111,7 @@
s.ConfigureDNS(dns.NewUpstreamDirective(newServers))
}
// Notify status waiters.
- s.status.Set(Status{
+ s.status.Set(&Status{
ExternalAddress: new.AssignedIP,
DNSServers: new.DNSServers(),
})
diff --git a/metropolis/node/core/roleserve/roleserve.go b/metropolis/node/core/roleserve/roleserve.go
index 5a0e2f6..0d0997d 100644
--- a/metropolis/node/core/roleserve/roleserve.go
+++ b/metropolis/node/core/roleserve/roleserve.go
@@ -6,19 +6,18 @@
// cluster's curator, updates the status of the node within the curator, and
// spawns on-demand services.
//
-//
-// .-----------. .--------. Watches .------------.
-// | Cluster |--------->| Role |<----------| Node Roles |
-// | Enrolment | Provides | Server | Updates '------------'
-// '-----------' Data | |----. .-------------.
-// '--------' '----->| Node Status |
-// Spawns | | Spawns '-------------'
-// .-----' '-----.
-// V V
-// .-----------. .------------.
-// | Consensus | | Kubernetes |
-// | & Curator | | |
-// '-----------' '------------'
+// .-----------. .--------. Watches .------------.
+// | Cluster |--------->| Role |<----------| Node Roles |
+// | Enrolment | Provides | Server | Updates '------------'
+// '-----------' Data | |----. .-------------.
+// '--------' '----->| Node Status |
+// Spawns | | Spawns '-------------'
+// .-----' '-----.
+// V V
+// .-----------. .------------.
+// | Consensus | | Kubernetes |
+// | & Curator | | |
+// '-----------' '------------'
//
// The internal state of the Role Server (eg. status of services, input from
// Cluster Enrolment, current node roles as retrieved from the cluster) is
@@ -38,7 +37,6 @@
// or remote). It is updated both by external processes (ie. data from the
// Cluster Enrolment) as well as logic responsible for spawning the control
// plane.
-//
package roleserve
import (
@@ -50,6 +48,7 @@
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/network"
"source.monogon.dev/metropolis/node/core/rpc/resolver"
+ "source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/supervisor"
cpb "source.monogon.dev/metropolis/proto/common"
)
@@ -76,10 +75,10 @@
type Service struct {
Config
- ClusterMembership ClusterMembershipValue
- KubernetesStatus KubernetesStatusValue
- bootstrapData bootstrapDataValue
- localRoles localRolesValue
+ ClusterMembership memory.Value[*ClusterMembership]
+ KubernetesStatus memory.Value[*KubernetesStatus]
+ bootstrapData memory.Value[*bootstrapData]
+ localRoles memory.Value[*cpb.NodeRoles]
controlPlane *workerControlPlane
statusPush *workerStatusPush
@@ -141,11 +140,11 @@
// available on the loopback interface.
s.Resolver.AddOverride(nid, resolver.NodeByHostPort("127.0.0.1", uint16(common.CuratorServicePort)))
- s.ClusterMembership.set(&ClusterMembership{
+ s.ClusterMembership.Set(&ClusterMembership{
pubkey: pubkey,
resolver: s.Resolver,
})
- s.bootstrapData.set(&bootstrapData{
+ s.bootstrapData.Set(&bootstrapData{
nodePrivateKey: privkey,
initialOwnerKey: iok,
clusterUnlockKey: cuk,
@@ -159,7 +158,7 @@
// available on the loopback interface.
s.Resolver.AddOverride(credentials.ID(), resolver.NodeByHostPort("127.0.0.1", uint16(common.CuratorServicePort)))
- s.ClusterMembership.set(&ClusterMembership{
+ s.ClusterMembership.Set(&ClusterMembership{
remoteCurators: directory,
credentials: &credentials,
pubkey: credentials.PublicKey(),
@@ -172,7 +171,7 @@
// available on the loopback interface.
s.Resolver.AddOverride(credentials.ID(), resolver.NodeByHostPort("127.0.0.1", uint16(common.CuratorServicePort)))
- s.ClusterMembership.set(&ClusterMembership{
+ s.ClusterMembership.Set(&ClusterMembership{
remoteCurators: directory,
credentials: &credentials,
pubkey: credentials.PublicKey(),
diff --git a/metropolis/node/core/roleserve/value_bootstrapdata.go b/metropolis/node/core/roleserve/value_bootstrapdata.go
index 85618bc..29a6ae2 100644
--- a/metropolis/node/core/roleserve/value_bootstrapdata.go
+++ b/metropolis/node/core/roleserve/value_bootstrapdata.go
@@ -1,11 +1,7 @@
package roleserve
import (
- "context"
"crypto/ed25519"
-
- "source.monogon.dev/metropolis/pkg/event"
- "source.monogon.dev/metropolis/pkg/event/memory"
)
// bootstrapData is an internal EventValue structure which is populated by the
@@ -19,29 +15,3 @@
initialOwnerKey []byte
nodePrivateJoinKey ed25519.PrivateKey
}
-
-type bootstrapDataValue struct {
- value memory.Value
-}
-
-func (c *bootstrapDataValue) Watch() *bootstrapDataWatcher {
- return &bootstrapDataWatcher{
- Watcher: c.value.Watch(),
- }
-}
-
-func (c *bootstrapDataValue) set(v *bootstrapData) {
- c.value.Set(v)
-}
-
-type bootstrapDataWatcher struct {
- event.Watcher
-}
-
-func (c *bootstrapDataWatcher) Get(ctx context.Context) (*bootstrapData, error) {
- v, err := c.Watcher.Get(ctx)
- if err != nil {
- return nil, err
- }
- return v.(*bootstrapData), nil
-}
diff --git a/metropolis/node/core/roleserve/value_clustermembership.go b/metropolis/node/core/roleserve/value_clustermembership.go
index f14e2c4..e956d10 100644
--- a/metropolis/node/core/roleserve/value_clustermembership.go
+++ b/metropolis/node/core/roleserve/value_clustermembership.go
@@ -13,7 +13,6 @@
"source.monogon.dev/metropolis/node/core/rpc"
"source.monogon.dev/metropolis/node/core/rpc/resolver"
"source.monogon.dev/metropolis/pkg/event"
- "source.monogon.dev/metropolis/pkg/event/memory"
cpb "source.monogon.dev/metropolis/proto/common"
)
@@ -53,44 +52,14 @@
resolver *resolver.Resolver
}
-type ClusterMembershipValue struct {
- value memory.Value
-}
-
-func (c *ClusterMembershipValue) Watch() *ClusterMembershipWatcher {
- return &ClusterMembershipWatcher{
- w: c.value.Watch(),
- }
-}
-
-func (c *ClusterMembershipValue) set(v *ClusterMembership) {
- c.value.Set(v)
-}
-
-type ClusterMembershipWatcher struct {
- w event.Watcher
-}
-
-func (c *ClusterMembershipWatcher) Close() error {
- return c.w.Close()
-}
-
-func (c *ClusterMembershipWatcher) getAny(ctx context.Context) (*ClusterMembership, error) {
- v, err := c.w.Get(ctx)
- if err != nil {
- return nil, err
- }
- return v.(*ClusterMembership), nil
-}
-
// GetNodeID returns the Node ID of the locally running node whenever available.
// NodeIDs are available early on in the node startup process and are guaranteed
// to never change at runtime. The watcher will then block all further Get calls
// until new information is available. This method should only be used if
// GetNodeID is the only method ran on the watcher.
-func (c *ClusterMembershipWatcher) GetNodeID(ctx context.Context) (string, error) {
+func GetNodeID(ctx context.Context, watcher event.Watcher[*ClusterMembership]) (string, error) {
for {
- cm, err := c.getAny(ctx)
+ cm, err := watcher.Get(ctx)
if err != nil {
return "", err
}
@@ -105,20 +74,16 @@
// the cluster's Curator). See proto.common.ClusterState for more information
// about cluster states. The watcher will then block all futher Get calls until
// new information is available.
-func (c *ClusterMembershipWatcher) GetHome(ctx context.Context) (*ClusterMembership, error) {
- for {
- cm, err := c.getAny(ctx)
- if err != nil {
- return nil, err
- }
+func FilterHome() event.GetOption[*ClusterMembership] {
+ return event.Filter(func(cm *ClusterMembership) bool {
if cm.credentials == nil {
- continue
+ return false
}
if cm.remoteCurators == nil {
- continue
+ return false
}
- return cm, nil
- }
+ return true
+ })
}
// DialCurator returns an authenticated gRPC client connection to the Curator
diff --git a/metropolis/node/core/roleserve/value_kubernetes.go b/metropolis/node/core/roleserve/value_kubernetes.go
index 4334dc3..88bdfeb 100644
--- a/metropolis/node/core/roleserve/value_kubernetes.go
+++ b/metropolis/node/core/roleserve/value_kubernetes.go
@@ -1,11 +1,7 @@
package roleserve
import (
- "context"
-
"source.monogon.dev/metropolis/node/kubernetes"
- "source.monogon.dev/metropolis/pkg/event"
- "source.monogon.dev/metropolis/pkg/event/memory"
)
// KubernetesStatus is an Event Value structure populated by a running
@@ -14,33 +10,3 @@
type KubernetesStatus struct {
Svc *kubernetes.Service
}
-
-type KubernetesStatusValue struct {
- value memory.Value
-}
-
-func (k *KubernetesStatusValue) Watch() *KubernetesStatusWatcher {
- return &KubernetesStatusWatcher{
- Watcher: k.value.Watch(),
- }
-}
-
-func (k *KubernetesStatusValue) set(v *KubernetesStatus) {
- k.value.Set(v)
-}
-
-type KubernetesStatusWatcher struct {
- event.Watcher
-}
-
-// Get waits until the Kubernetes services is available. The returned
-// KubernetesStatus is guaranteed to contain a kubernetes.Service that was
-// running at the time of this call returning (but which might have since been
-// stopped).
-func (k *KubernetesStatusWatcher) Get(ctx context.Context) (*KubernetesStatus, error) {
- v, err := k.Watcher.Get(ctx)
- if err != nil {
- return nil, err
- }
- return v.(*KubernetesStatus), nil
-}
diff --git a/metropolis/node/core/roleserve/value_node.go b/metropolis/node/core/roleserve/value_node.go
index f095f2f..26050ee 100644
--- a/metropolis/node/core/roleserve/value_node.go
+++ b/metropolis/node/core/roleserve/value_node.go
@@ -1,36 +1 @@
package roleserve
-
-import (
- "context"
-
- "source.monogon.dev/metropolis/pkg/event"
- "source.monogon.dev/metropolis/pkg/event/memory"
- cpb "source.monogon.dev/metropolis/proto/common"
-)
-
-type localRolesValue struct {
- value memory.Value
-}
-
-func (c *localRolesValue) Watch() *localRolesWatcher {
- return &localRolesWatcher{
- Watcher: c.value.Watch(),
- }
-}
-
-func (c *localRolesValue) set(v *cpb.NodeRoles) {
- c.value.Set(v)
-}
-
-type localRolesWatcher struct {
- event.Watcher
-}
-
-// Get retrieves the roles assigned to the local node by the cluster.
-func (c *localRolesWatcher) Get(ctx context.Context) (*cpb.NodeRoles, error) {
- v, err := c.Watcher.Get(ctx)
- if err != nil {
- return nil, err
- }
- return v.(*cpb.NodeRoles), nil
-}
diff --git a/metropolis/node/core/roleserve/worker_controlplane.go b/metropolis/node/core/roleserve/worker_controlplane.go
index 5c8c4b2..354e67e 100644
--- a/metropolis/node/core/roleserve/worker_controlplane.go
+++ b/metropolis/node/core/roleserve/worker_controlplane.go
@@ -44,11 +44,11 @@
storageRoot *localstorage.Root
// bootstrapData will be read.
- bootstrapData *bootstrapDataValue
+ bootstrapData *memory.Value[*bootstrapData]
// clusterMembership will be read and written.
- clusterMembership *ClusterMembershipValue
+ clusterMembership *memory.Value[*ClusterMembership]
// localRoles will be read.
- localRoles *localRolesValue
+ localRoles *memory.Value[*cpb.NodeRoles]
// resolver will be read and used to populate ClusterMembership.
resolver *resolver.Resolver
}
@@ -98,7 +98,7 @@
// |
// NodeRoles -M-> rolesC --------------'
//
- var startupV memory.Value
+ var startupV memory.Value[*controlPlaneStartup]
// Channels are used as intermediaries between map stages and the final reduce,
// which is okay as long as the entire tree restarts simultaneously (which we
@@ -126,7 +126,7 @@
w := s.clusterMembership.Watch()
defer w.Close()
for {
- v, err := w.GetHome(ctx)
+ v, err := w.Get(ctx, FilterHome())
if err != nil {
return err
}
@@ -264,11 +264,10 @@
// Read config from startupV.
w := startupV.Watch()
defer w.Close()
- startupI, err := w.Get(ctx)
+ startup, err := w.Get(ctx)
if err != nil {
return err
}
- startup := startupI.(*controlPlaneStartup)
// Start Control Plane if we have a config.
if startup.consensusConfig == nil {
@@ -424,7 +423,7 @@
// We now have a locally running ControlPlane. Reflect that in a new
// ClusterMembership.
- s.clusterMembership.set(&ClusterMembership{
+ s.clusterMembership.Set(&ClusterMembership{
localConsensus: con,
localCurator: cur,
credentials: creds,
@@ -440,11 +439,10 @@
// Not restarting on every single change prevents us from going in a
// ClusterMembership -> ClusterDirectory -> ClusterMembership thrashing loop.
for {
- ncI, err := w.Get(ctx)
+ nc, err := w.Get(ctx)
if err != nil {
return err
}
- nc := ncI.(*controlPlaneStartup)
if nc.changed(startup) {
supervisor.Logger(ctx).Infof("Configuration changed, restarting...")
return fmt.Errorf("config changed, restarting")
diff --git a/metropolis/node/core/roleserve/worker_heartbeat.go b/metropolis/node/core/roleserve/worker_heartbeat.go
index db06845..fdaa9be 100644
--- a/metropolis/node/core/roleserve/worker_heartbeat.go
+++ b/metropolis/node/core/roleserve/worker_heartbeat.go
@@ -9,6 +9,7 @@
"source.monogon.dev/metropolis/node/core/curator"
ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
"source.monogon.dev/metropolis/node/core/network"
+ "source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/supervisor"
)
@@ -18,7 +19,7 @@
network *network.Service
// clusterMembership will be read.
- clusterMembership *ClusterMembershipValue
+ clusterMembership *memory.Value[*ClusterMembership]
}
func (s *workerHeartbeat) run(ctx context.Context) error {
@@ -28,7 +29,7 @@
w := s.clusterMembership.Watch()
defer w.Close()
supervisor.Logger(ctx).Infof("Waiting for cluster membership...")
- cm, err := w.GetHome(ctx)
+ cm, err := w.Get(ctx, FilterHome())
if err != nil {
return err
}
diff --git a/metropolis/node/core/roleserve/worker_kubernetes.go b/metropolis/node/core/roleserve/worker_kubernetes.go
index 3be3c41..e31cf59 100644
--- a/metropolis/node/core/roleserve/worker_kubernetes.go
+++ b/metropolis/node/core/roleserve/worker_kubernetes.go
@@ -5,6 +5,7 @@
"fmt"
"net"
+ "source.monogon.dev/metropolis/node/core/consensus"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/network"
"source.monogon.dev/metropolis/node/kubernetes"
@@ -26,9 +27,9 @@
network *network.Service
storageRoot *localstorage.Root
- localRoles *localRolesValue
- clusterMembership *ClusterMembershipValue
- kubernetesStatus *KubernetesStatusValue
+ localRoles *memory.Value[*cpb.NodeRoles]
+ clusterMembership *memory.Value[*ClusterMembership]
+ kubernetesStatus *memory.Value[*KubernetesStatus]
}
// kubernetesStartup is used internally to provide a reduced (as in MapReduce
@@ -64,7 +65,7 @@
// |
// NodeRoles -M-> rolesC --------------'
//
- var startupV memory.Value
+ var startupV memory.Value[*kubernetesStartup]
clusterMembershipC := make(chan *ClusterMembership)
rolesC := make(chan *cpb.NodeRoles)
@@ -76,7 +77,7 @@
w := s.clusterMembership.Watch()
defer w.Close()
for {
- v, err := w.GetHome(ctx)
+ v, err := w.Get(ctx, FilterHome())
if err != nil {
return err
}
@@ -127,11 +128,11 @@
// KubernetesController local role.
var d *kubernetesStartup
for {
- dV, err := w.Get(ctx)
+ var err error
+ d, err = w.Get(ctx)
if err != nil {
return err
}
- d = dV.(*kubernetesStartup)
supervisor.Logger(ctx).Infof("Got new startup data.")
if d.roles.KubernetesController == nil {
supervisor.Logger(ctx).Infof("No Kubernetes role, not starting.")
@@ -144,10 +145,10 @@
break
}
- supervisor.Logger(ctx).Infof("Waiting for local consensus...")
+ supervisor.Logger(ctx).Infof("Waiting for local consensus (%+v)...")
cstW := d.membership.localConsensus.Watch()
defer cstW.Close()
- cst, err := cstW.GetRunning(ctx)
+ cst, err := cstW.Get(ctx, consensus.FilterRunning)
if err != nil {
return fmt.Errorf("waiting for local consensus: %w", err)
}
@@ -197,7 +198,7 @@
}
// Let downstream know that Kubernetes is running.
- s.kubernetesStatus.set(&KubernetesStatus{
+ s.kubernetesStatus.Set(&KubernetesStatus{
Svc: kubeSvc,
})
@@ -206,11 +207,10 @@
// Restart everything if we get a significantly different config (ie., a config
// whose change would/should either turn up or tear down Kubernetes).
for {
- ncI, err := w.Get(ctx)
+ nc, err := w.Get(ctx)
if err != nil {
return err
}
- nc := ncI.(*kubernetesStartup)
if nc.changed(d) {
supervisor.Logger(ctx).Errorf("watcher got new config, restarting")
return fmt.Errorf("restarting")
diff --git a/metropolis/node/core/roleserve/worker_rolefetch.go b/metropolis/node/core/roleserve/worker_rolefetch.go
index 3856f62..5f64676 100644
--- a/metropolis/node/core/roleserve/worker_rolefetch.go
+++ b/metropolis/node/core/roleserve/worker_rolefetch.go
@@ -5,24 +5,26 @@
"fmt"
ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+ "source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/supervisor"
+ cpb "source.monogon.dev/metropolis/proto/common"
)
// workerRoleFetch is the Role Fetcher, an internal bookkeeping service
// responsible for populating localRoles based on a clusterMembership whenever
// the node is HOME and cluster credentials / curator access is available.
type workerRoleFetch struct {
- clusterMembership *ClusterMembershipValue
+ clusterMembership *memory.Value[*ClusterMembership]
// localRoles will be written.
- localRoles *localRolesValue
+ localRoles *memory.Value[*cpb.NodeRoles]
}
func (s *workerRoleFetch) run(ctx context.Context) error {
w := s.clusterMembership.Watch()
defer w.Close()
supervisor.Logger(ctx).Infof("Waiting for cluster membership...")
- cm, err := w.GetHome(ctx)
+ cm, err := w.Get(ctx, FilterHome())
if err != nil {
return err
}
@@ -70,7 +72,7 @@
if n.Roles.KubernetesWorker != nil {
supervisor.Logger(ctx).Infof(" - kubernetes worker")
}
- s.localRoles.set(n.Roles)
+ s.localRoles.Set(n.Roles)
break
}
}
diff --git a/metropolis/node/core/roleserve/worker_statuspush.go b/metropolis/node/core/roleserve/worker_statuspush.go
index 732508a..ed806a0 100644
--- a/metropolis/node/core/roleserve/worker_statuspush.go
+++ b/metropolis/node/core/roleserve/worker_statuspush.go
@@ -10,6 +10,7 @@
common "source.monogon.dev/metropolis/node"
ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
"source.monogon.dev/metropolis/node/core/network"
+ "source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/supervisor"
cpb "source.monogon.dev/metropolis/proto/common"
)
@@ -20,7 +21,7 @@
network *network.Service
// clusterMembership will be read.
- clusterMembership *ClusterMembershipValue
+ clusterMembership *memory.Value[*ClusterMembership]
}
// workerStatusPushChannels contain all the channels between the status pusher's
@@ -142,7 +143,7 @@
defer w.Close()
supervisor.Logger(ctx).Infof("Waiting for cluster membership...")
for {
- cm, err := w.GetHome(ctx)
+ cm, err := w.Get(ctx, FilterHome())
if err != nil {
return fmt.Errorf("getting cluster membership status failed: %w", err)
}
diff --git a/metropolis/pkg/event/etcd/etcd.go b/metropolis/pkg/event/etcd/etcd.go
index d49a592..14dfd99 100644
--- a/metropolis/pkg/event/etcd/etcd.go
+++ b/metropolis/pkg/event/etcd/etcd.go
@@ -19,17 +19,17 @@
// artificially, as there currently is no code path that needs this to be
// strictly true. However, users of this library might want to rely on the
// Value type instead of particular Value implementations.
- _ event.ValueWatch = &Value{}
+ _ event.ValueWatch[StringAt] = &Value[StringAt]{}
)
// Value is an 'Event Value' backed in an etcd cluster, accessed over an
// etcd client. This is a stateless handle and can be copied and shared across
// goroutines.
-type Value struct {
+type Value[T any] struct {
+ decoder func(key, value []byte) (T, error)
etcd client.Namespaced
key string
keyEnd string
- decoder BytesDecoder
}
type Option struct {
@@ -67,12 +67,12 @@
// NewValue creates a new Value for a given key(s) in an etcd client. The
// given decoder will be used to convert bytes retrieved from etcd into the
// interface{} value retrieved by Get by this value's watcher.
-func NewValue(etcd client.Namespaced, key string, decoder BytesDecoder, options ...*Option) *Value {
- res := &Value{
+func NewValue[T any](etcd client.Namespaced, key string, decoder func(key, value []byte) (T, error), options ...*Option) *Value[T] {
+ res := &Value[T]{
+ decoder: decoder,
etcd: etcd,
key: key,
keyEnd: key,
- decoder: decoder,
}
for _, opt := range options {
@@ -84,26 +84,25 @@
return res
}
-// BytesDecoder is a function that converts bytes retrieved from etcd into an
-// end-user facing value. Additionally, a key is available so that returned
-// values can be augmented with the location they were retrieved from. This is
-// especially useful when returning values resulting from an etcd range.
-//
-// If an error is returned, the Get call performed on a watcher configured with
-// this decoder will fail, swallowing that particular update, but the watcher
-// will continue to work. Any provided BytesDecoder implementations must be safe
-// to copy.
-type BytesDecoder = func(key []byte, data []byte) (interface{}, error)
-
-// NoDecoder is a no-op decoder which passes through the retrieved bytes as a
-// []byte type to the user.
-func NoDecoder(key []byte, data []byte) (interface{}, error) {
- return data, nil
+func DecoderNoop(_, value []byte) ([]byte, error) {
+ return value, nil
}
-func (e *Value) Watch() event.Watcher {
+func DecoderStringAt(key, value []byte) (StringAt, error) {
+ return StringAt{
+ Key: string(key),
+ Value: string(value),
+ }, nil
+}
+
+type StringAt struct {
+ Key string
+ Value string
+}
+
+func (e *Value[T]) Watch() event.Watcher[T] {
ctx, ctxC := context.WithCancel(context.Background())
- return &watcher{
+ return &watcher[T]{
Value: *e,
ctx: ctx,
@@ -115,9 +114,9 @@
}
}
-type watcher struct {
+type watcher[T any] struct {
// Value copy, used to configure the behaviour of this watcher.
- Value
+ Value[T]
// ctx is the context that expresses the liveness of this watcher. It is
// canceled when the watcher is closed, and the etcd Watch hangs off of it.
@@ -163,7 +162,7 @@
// setup initiates wc (the watch channel from etcd) after retrieving the initial
// value(s) with a get operation.
-func (w *watcher) setup(ctx context.Context) error {
+func (w *watcher[T]) setup(ctx context.Context) error {
if w.wc != nil {
return nil
}
@@ -231,7 +230,7 @@
// backfill blocks until a backlog of items is available. An error is returned
// if the context is canceled.
-func (w *watcher) backfill(ctx context.Context) error {
+func (w *watcher[T]) backfill(ctx context.Context) error {
// Keep watching for watch events.
for {
var resp *clientv3.WatchResponse
@@ -327,44 +326,16 @@
backlogOnly bool
}
-var (
- // BacklogOnly will prevent Get from blocking on waiting for more updates from
- // etcd, by instead returning BacklogDone whenever no more data is currently
- // locally available. This is different however, from establishing that there
- // are no more pending updates from the etcd cluster - the only way to ensure
- // the local client is up to date is by performing Get calls without this option
- // set.
- //
- // This mode of retrieval should only be used for the retrieval of the existing
- // data in the etcd cluster on the initial creation of the Watcher (by
- // repeatedly calling Get until BacklogDone isreturned), and shouldn't be set
- // for any subsequent call. Any use of this option after that initial fetch is
- // undefined behaviour that exposes the internals of the Get implementation, and
- // must not be relied on. However, in the future, this behaviour might be
- // formalized.
- //
- // This mode is particularly useful for ranged watchers. Non-ranged watchers can
- // still use this option to distinguish between blocking because of the
- // nonexistence of an object vs. blocking because of networking issues. However,
- // non-ranged retrieval semantics generally will rarely need to make this
- // distinction.
- BacklogOnly = GetOption{backlogOnly: true}
-
- // BacklogDone is returned by Get when BacklogOnly is set and there is no more
- // event data stored in the Watcher client, ie. when the initial cluster state
- // of the requested key has been retrieved.
- BacklogDone = errors.New("no more backlogged data")
-)
-
// Get implements the Get method of the Watcher interface.
// It can return an error in three cases:
-// - the given context is canceled (in which case, the given error will wrap
-// the context error)
-// - the watcher's BytesDecoder returned an error (in which case the error
-// returned by the BytesDecoder will be returned verbatim)
-// - it has been called with BacklogOnly and the Watcher has no more local
-// event data to return (see BacklogOnly for more information on the
-// semantics of this mode of operation)
+// - the given context is canceled (in which case, the given error will wrap
+// the context error)
+// - the watcher's BytesDecoder returned an error (in which case the error
+// returned by the BytesDecoder will be returned verbatim)
+// - it has been called with BacklogOnly and the Watcher has no more local
+// event data to return (see BacklogOnly for more information on the
+// semantics of this mode of operation)
+//
// Note that transient and permanent etcd errors are never returned, and the
// Get call will attempt to recover from these errors as much as possible. This
// also means that the user of the Watcher will not be notified if the
@@ -377,51 +348,72 @@
// TODO(q3k): implement internal, limited buffering for backlogged data not yet
// consumed by client, as etcd client library seems to use an unbound buffer in
// case this happens ( see: watcherStream.buf in clientv3).
-func (w *watcher) Get(ctx context.Context, opts ...event.GetOption) (interface{}, error) {
+func (w *watcher[T]) Get(ctx context.Context, opts ...event.GetOption[T]) (T, error) {
+ var empty T
select {
case w.getSem <- struct{}{}:
default:
- return nil, fmt.Errorf("cannot Get() concurrently on a single waiter")
+ return empty, fmt.Errorf("cannot Get() concurrently on a single waiter")
}
defer func() {
<-w.getSem
}()
backlogOnly := false
- for _, optI := range opts {
- opt, ok := optI.(GetOption)
- if !ok {
- return nil, fmt.Errorf("get options must be of type etcd.GetOption")
+ var predicate func(t T) bool
+ for _, opt := range opts {
+ if opt.Predicate != nil {
+ predicate = opt.Predicate
}
- if opt.backlogOnly {
+ if opt.BacklogOnly {
backlogOnly = true
}
}
+ ranged := w.key != w.keyEnd
+ if ranged && predicate != nil {
+ return empty, errors.New("filtering unimplemented for ranged etcd values")
+ }
+ if backlogOnly && predicate != nil {
+ return empty, errors.New("filtering unimplemented for backlog-only requests")
+ }
+
+ for {
+ v, err := w.getUnlocked(ctx, ranged, backlogOnly)
+ if err != nil {
+ return empty, err
+ }
+ if predicate == nil || predicate(v) {
+ return v, nil
+ }
+ }
+}
+
+func (w *watcher[T]) getUnlocked(ctx context.Context, ranged, backlogOnly bool) (T, error) {
+ var empty T
// Early check for context cancelations, preventing spurious contact with etcd
// if there's no need to.
if w.ctx.Err() != nil {
- return nil, w.ctx.Err()
+ return empty, w.ctx.Err()
}
if err := w.setup(ctx); err != nil {
- return nil, fmt.Errorf("when setting up watcher: %w", err)
+ return empty, fmt.Errorf("when setting up watcher: %w", err)
}
if backlogOnly && len(w.backlogged) == 0 {
- return nil, BacklogDone
+ return empty, event.BacklogDone
}
// Update backlog from etcd if needed.
if len(w.backlogged) < 1 {
err := w.backfill(ctx)
if err != nil {
- return nil, fmt.Errorf("when watching for new value: %w", err)
+ return empty, fmt.Errorf("when watching for new value: %w", err)
}
}
// Backlog is now guaranteed to contain at least one element.
- ranged := w.key != w.keyEnd
if !ranged {
// For non-ranged queries, drain backlog fully.
if len(w.backlogged) != 1 {
@@ -440,7 +432,7 @@
}
}
-func (w *watcher) Close() error {
+func (w *watcher[T]) Close() error {
w.ctxC()
return nil
}
diff --git a/metropolis/pkg/event/etcd/etcd_test.go b/metropolis/pkg/event/etcd/etcd_test.go
index 13f6ea8..81aee51 100644
--- a/metropolis/pkg/event/etcd/etcd_test.go
+++ b/metropolis/pkg/event/etcd/etcd_test.go
@@ -52,18 +52,18 @@
// WG after it performs the initial retrieval of a value from etcd, but before
// it starts the watcher. This is used to test potential race conditions
// present between these two steps.
-func setRaceWg(w event.Watcher) *sync.WaitGroup {
+func setRaceWg[T any](w event.Watcher[T]) *sync.WaitGroup {
wg := sync.WaitGroup{}
- w.(*watcher).testRaceWG = &wg
+ w.(*watcher[T]).testRaceWG = &wg
return &wg
}
// setSetupWg creates a new WaitGroup and sets the given watcher to wait on
// thie WG after an etcd watch channel is created. This is used in tests to
// ensure that the watcher is fully created before it is tested.
-func setSetupWg(w event.Watcher) *sync.WaitGroup {
+func setSetupWg[T any](w event.Watcher[T]) *sync.WaitGroup {
wg := sync.WaitGroup{}
- w.(*watcher).testSetupWG = &wg
+ w.(*watcher[T]).testSetupWG = &wg
return &wg
}
@@ -152,7 +152,7 @@
// expect runs a Get on the given Watcher, ensuring the returned value is a
// given string.
-func expect(t *testing.T, w event.Watcher, value string) {
+func expect(t *testing.T, w event.Watcher[StringAt], value string) {
t.Helper()
ctx, ctxC := context.WithCancel(context.Background())
defer ctxC()
@@ -162,7 +162,7 @@
t.Fatalf("Get: %v", err)
}
- if got, want := string(got.([]byte)), value; got != want {
+ if got, want := got.Value, value; got != want {
t.Errorf("Wanted value %q, got %q", want, got)
}
}
@@ -173,7 +173,7 @@
// blocks for 101 milliseconds). Thus, this function should be used sparingly
// and in tests that perform other baseline behaviour checks alongside this
// test.
-func expectTimeout(t *testing.T, w event.Watcher) {
+func expectTimeout[T any](t *testing.T, w event.Watcher[T]) {
t.Helper()
ctx, ctxC := context.WithTimeout(context.Background(), 100*time.Millisecond)
got, err := w.Get(ctx)
@@ -186,7 +186,7 @@
// wait wraps a watcher into a channel of strings, ensuring that the watcher
// never errors on Get calls and always returns strings.
-func wait(t *testing.T, w event.Watcher) (chan string, func()) {
+func wait(t *testing.T, w event.Watcher[StringAt]) (chan string, func()) {
t.Helper()
ctx, ctxC := context.WithCancel(context.Background())
@@ -201,7 +201,7 @@
if err != nil {
t.Fatalf("Get: %v", err)
}
- c <- string(got.([]byte))
+ c <- got.Value
}
}()
@@ -214,7 +214,7 @@
defer tc.close()
k := "test-simple"
- value := NewValue(tc.namespaced, k, NoDecoder)
+ value := NewValue(tc.namespaced, k, DecoderStringAt)
tc.put(t, k, "one")
watcher := value.Watch()
@@ -231,42 +231,27 @@
q, cancel := wait(t, watcher)
// Test will hang here if the above value does not receive the set "six".
+ log.Printf("a")
for el := range q {
+ log.Printf("%q", el)
if el == "six" {
break
}
}
+ log.Printf("b")
cancel()
}
-// stringAt is a helper type for testing ranged watchers. It's returned by a
-// watcher whose decoder is set to stringDecoder.
-type stringAt struct {
- key, value string
-}
-
-func stringAtDecoder(key, value []byte) (interface{}, error) {
- valueS := ""
- if value != nil {
- valueS = string(value)
- }
- return stringAt{
- key: string(key),
- value: valueS,
- }, nil
-}
-
// stringAtGet performs a Get from a Watcher, expecting a stringAt and updating
// the given map with the retrieved value.
-func stringAtGet(ctx context.Context, t *testing.T, w event.Watcher, m map[string]string) {
+func stringAtGet(ctx context.Context, t *testing.T, w event.Watcher[StringAt], m map[string]string) {
t.Helper()
vr, err := w.Get(ctx)
if err != nil {
t.Fatalf("Get: %v", err)
}
- v := vr.(stringAt)
- m[v.key] = v.value
+ m[vr.Key] = vr.Value
}
// TestSimpleRange exercises the simplest behaviour of a ranged watcher,
@@ -277,7 +262,7 @@
ks := "test-simple-range/"
ke := "test-simple-range0"
- value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
+ value := NewValue(tc.namespaced, ks, DecoderStringAt, Range(ke))
tc.put(t, ks+"a", "one")
tc.put(t, ks+"b", "two")
tc.put(t, ks+"c", "three")
@@ -320,7 +305,7 @@
defer tc.close()
k := "test-cancel"
- value := NewValue(tc.namespaced, k, NoDecoder)
+ value := NewValue(tc.namespaced, k, DecoderStringAt)
tc.put(t, k, "one")
watcher := value.Watch()
@@ -355,7 +340,7 @@
defer tc.close()
k := "test-cancel-on-get"
- value := NewValue(tc.namespaced, k, NoDecoder)
+ value := NewValue(tc.namespaced, k, DecoderStringAt)
watcher := value.Watch()
tc.put(t, k, "one")
@@ -419,7 +404,7 @@
tc.setEndpoints(0)
k := "test-client-reconnect"
- value := NewValue(tc.namespaced, k, NoDecoder)
+ value := NewValue(tc.namespaced, k, DecoderStringAt)
tc.put(t, k, "one")
watcher := value.Watch()
@@ -455,10 +440,10 @@
tcRest.setEndpoints(1, 2)
k := "test-client-partition"
- valueOne := NewValue(tcOne.namespaced, k, NoDecoder)
+ valueOne := NewValue(tcOne.namespaced, k, DecoderStringAt)
watcherOne := valueOne.Watch()
defer watcherOne.Close()
- valueRest := NewValue(tcRest.namespaced, k, NoDecoder)
+ valueRest := NewValue(tcRest.namespaced, k, DecoderStringAt)
watcherRest := valueRest.Watch()
defer watcherRest.Close()
@@ -489,7 +474,7 @@
k := "test-early-use"
- value := NewValue(tc.namespaced, k, NoDecoder)
+ value := NewValue(tc.namespaced, k, DecoderStringAt)
watcher := value.Watch()
defer watcher.Close()
@@ -515,7 +500,7 @@
k := "test-remove"
tc.put(t, k, "one")
- value := NewValue(tc.namespaced, k, NoDecoder)
+ value := NewValue(tc.namespaced, k, DecoderStringAt)
watcher := value.Watch()
defer watcher.Close()
@@ -532,7 +517,7 @@
ks := "test-remove-range/"
ke := "test-remove-range0"
- value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
+ value := NewValue(tc.namespaced, ks, DecoderStringAt, Range(ke))
tc.put(t, ks+"a", "one")
tc.put(t, ks+"b", "two")
tc.put(t, ks+"c", "three")
@@ -573,7 +558,7 @@
tc.put(t, k, "one")
tc.remove(t, k)
- value := NewValue(tc.namespaced, k, NoDecoder)
+ value := NewValue(tc.namespaced, k, DecoderStringAt)
watcher := value.Watch()
defer watcher.Close()
@@ -602,13 +587,13 @@
// on Get, but that the watcher continues to work after the error has been
// returned.
func TestDecoder(t *testing.T) {
- decodeStringifiedNumbersDivisibleBy3 := func(_, data []byte) (interface{}, error) {
- num, err := strconv.ParseInt(string(data), 10, 64)
+ decoderDivisibleByThree := func(_, value []byte) (int64, error) {
+ num, err := strconv.ParseInt(string(value), 10, 64)
if err != nil {
- return nil, fmt.Errorf("not a valid number")
+ return 0, fmt.Errorf("not a valid number")
}
if (num % 3) != 0 {
- return nil, fmt.Errorf("not divisible by 3")
+ return 0, fmt.Errorf("not divisible by 3")
}
return num, nil
}
@@ -620,7 +605,7 @@
defer ctxC()
k := "test-decoder"
- value := NewValue(tc.namespaced, k, decodeStringifiedNumbersDivisibleBy3)
+ value := NewValue(tc.namespaced, k, decoderDivisibleByThree)
watcher := value.Watch()
defer watcher.Close()
tc.put(t, k, "3")
@@ -643,7 +628,7 @@
}
} else {
queue <- errOrInt{
- val: res.(int64),
+ val: res,
}
}
}
@@ -686,7 +671,7 @@
defer tc.close()
k := "test-backlog"
- value := NewValue(tc.namespaced, k, NoDecoder)
+ value := NewValue(tc.namespaced, k, DecoderStringAt)
watcher := value.Watch()
defer watcher.Close()
@@ -704,8 +689,7 @@
if err != nil {
t.Fatalf("Get() returned error before expected final value: %v", err)
}
- val := string(valB.([]byte))
- if val == "val-999" {
+ if valB.Value == "val-999" {
break
}
}
@@ -719,7 +703,7 @@
ks := "test-backlog-range/"
ke := "test-backlog-range0"
- value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
+ value := NewValue(tc.namespaced, ks, DecoderStringAt, Range(ke))
w := value.Watch()
defer w.Close()
@@ -764,30 +748,30 @@
k := "test-backlog-only"
tc.put(t, k, "initial")
- value := NewValue(tc.namespaced, k, NoDecoder)
+ value := NewValue(tc.namespaced, k, DecoderStringAt)
watcher := value.Watch()
defer watcher.Close()
- d, err := watcher.Get(ctx, BacklogOnly)
+ d, err := watcher.Get(ctx, event.BacklogOnly[StringAt]())
if err != nil {
t.Fatalf("First Get failed: %v", err)
}
- if want, got := "initial", string(d.([]byte)); want != got {
+ if want, got := "initial", d.Value; want != got {
t.Fatalf("First Get: wanted value %q, got %q", want, got)
}
// As expected, next call to Get with BacklogOnly fails - there truly is no new
// updates to emit.
- _, err = watcher.Get(ctx, BacklogOnly)
- if want, got := BacklogDone, err; want != got {
+ _, err = watcher.Get(ctx, event.BacklogOnly[StringAt]())
+ if want, got := event.BacklogDone, err; want != got {
t.Fatalf("Second Get: wanted %v, got %v", want, got)
}
// Implementation detail: even though there is a new value ('second'),
// BacklogOnly will still return BacklogDone.
tc.put(t, k, "second")
- _, err = watcher.Get(ctx, BacklogOnly)
- if want, got := BacklogDone, err; want != got {
+ _, err = watcher.Get(ctx, event.BacklogOnly[StringAt]())
+ if want, got := event.BacklogDone, err; want != got {
t.Fatalf("Third Get: wanted %v, got %v", want, got)
}
@@ -796,7 +780,7 @@
if err != nil {
t.Fatalf("Fourth Get failed: %v", err)
}
- if want, got := "second", string(d.([]byte)); want != got {
+ if want, got := "second", d.Value; want != got {
t.Fatalf("Fourth Get: wanted value %q, got %q", want, got)
}
}
@@ -821,7 +805,7 @@
}
}
- value := NewValue(tc.namespaced, ks, stringAtDecoder, Range(ke))
+ value := NewValue(tc.namespaced, ks, DecoderStringAt, Range(ke))
w := value.Watch()
defer w.Close()
@@ -829,12 +813,11 @@
res := make(map[string]string)
// Run first Get - this is the barrier defining what's part of the backlog.
- g, err := w.Get(ctx, BacklogOnly)
+ g, err := w.Get(ctx, event.BacklogOnly[StringAt]())
if err != nil {
t.Fatalf("Get: %v", err)
}
- kv := g.(stringAt)
- res[kv.key] = kv.value
+ res[g.Key] = g.Value
// These won't be part of the backlog.
tc.put(t, ks+"a", fmt.Sprintf("val-100"))
@@ -843,16 +826,15 @@
// Retrieve the rest of the backlog until BacklogDone is returned.
nUpdates := 1
for {
- g, err := w.Get(ctx, BacklogOnly)
- if err == BacklogDone {
+ g, err := w.Get(ctx, event.BacklogOnly[StringAt]())
+ if err == event.BacklogDone {
break
}
if err != nil {
t.Fatalf("Get: %v", err)
}
nUpdates += 1
- kv := g.(stringAt)
- res[kv.key] = kv.value
+ res[g.Key] = g.Value
}
// The backlog should've been compacted to just two entries at their newest
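For reference, the retrieval pattern exercised by this test - drain the backlog of a ranged value with BacklogOnly until BacklogDone, then fall back to plain Get calls for live updates - would look roughly as follows in consumer code. This is a sketch only: the etcd import path and the exported StringAt/BacklogOnly identifiers are assumptions based on this change, and the helper itself is hypothetical.

package example

import (
	"context"

	"source.monogon.dev/metropolis/pkg/event"
	"source.monogon.dev/metropolis/pkg/event/etcd" // assumed path, following the repository layout
)

// drainBacklog drains the locally known backlog of an already created ranged
// watcher, returning the initial state of the range keyed by etcd key.
func drainBacklog(ctx context.Context, w event.Watcher[etcd.StringAt]) (map[string]string, error) {
	current := make(map[string]string)
	for {
		kv, err := w.Get(ctx, event.BacklogOnly[etcd.StringAt]())
		if err == event.BacklogDone {
			// The backlog has been drained; from here on, plain Get calls
			// (without BacklogOnly) block for live updates.
			return current, nil
		}
		if err != nil {
			return nil, err
		}
		current[kv.Key] = kv.Value
	}
}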
diff --git a/metropolis/pkg/event/event.go b/metropolis/pkg/event/event.go
index bc46525..ba9190c 100644
--- a/metropolis/pkg/event/event.go
+++ b/metropolis/pkg/event/event.go
@@ -20,7 +20,7 @@
// Values currently are kept in memory (see: MemoryValue), but a future
// implementation might exist for other storage backends, eg. etcd.
//
-// Background and intended use
+// # Background and intended use
//
// The Event Value library is intended to be used within Metropolis'
// supervisor-based runnables to communicate state changes to other runnables,
@@ -33,20 +33,20 @@
// Why not just channels?
//
// Plain channels have multiple deficiencies for this usecase:
-// - Strict FIFO behaviour: all values sent to a channel must be received, and
-// historic and newest data must be treated in the same way. This means that
-// a consumer of state changes must process all updates to the value as if
-// they are the newest, and unable to skip rapid updates when a system is
-// slowly settling due to a cascading state change.
-// - Implementation overhead: implementing an observer
-// registration/unregistration pattern is prone to programming bugs,
-// especially for features like always first sending the current state to a
-// new observer.
-// - Strict buffer size: due to their FIFO nature and the possibility of
-// consumers not receiving actively, channels would have to buffer all
-// existing updates, requiring some arbitrary best-guess channel buffer
-// sizing that would still not prevent blocking writes or data loss in a
-// worst case scenario.
+// - Strict FIFO behaviour: all values sent to a channel must be received, and
+// historic and newest data must be treated in the same way. This means that
+// a consumer of state changes must process all updates to the value as if
+// they are the newest, and unable to skip rapid updates when a system is
+// slowly settling due to a cascading state change.
+// - Implementation overhead: implementing an observer
+// registration/unregistration pattern is prone to programming bugs,
+// especially for features like always first sending the current state to a
+// new observer.
+// - Strict buffer size: due to their FIFO nature and the possibility of
+// consumers not receiving actively, channels would have to buffer all
+// existing updates, requiring some arbitrary best-guess channel buffer
+// sizing that would still not prevent blocking writes or data loss in a
+// worst case scenario.
//
// Or, in other words: Go channels are a synchronization primitive, not a
// ready-made solution to this problem. The Event Value implementation in fact
@@ -56,40 +56,30 @@
//
// Go's condition variable implementation doesn't fully address our needs
// either:
-// - No context/canceling support: once a condition is being Wait()ed on,
-// this cannot be interrupted. This is especially painful and unwieldy when
-// dealing with context-heavy code, such as Metropolis.
-// - Spartan API: expecting users to plainly use sync.Cond is risky, as the API
-// is fairly low-level.
-// - No solution for late consumers: late consumers (ones that missed the value
-// being set by a producer) would still have to implement logic in order to
-// find out such a value, as sync.Cond only supports what amounts to
-// edge-level triggers as part of its Broadcast/Signal system.
+// - No context/canceling support: once a condition is being Wait()ed on,
+// this cannot be interrupted. This is especially painful and unwieldy when
+// dealing with context-heavy code, such as Metropolis.
+// - Spartan API: expecting users to plainly use sync.Cond is risky, as the API
+// is fairly low-level.
+// - No solution for late consumers: late consumers (ones that missed the value
+// being set by a producer) would still have to implement logic in order to
+// find out such a value, as sync.Cond only supports what amounts to
+// edge-level triggers as part of its Broadcast/Signal system.
//
// It would be possible to implement MemoryValue using a sync.Cond internally,
// but such an implementation would likely be more complex than the current
// implementation based on channels and mutexes, as it would have to work
// around issues like lack of canceling, etc.
-//
-// Type safety
-//
-// The Value/Watcher interfaces are, unfortunately, implemented using
-// interface{}. There was an attempt to use Go's existing generic types facility
-// (interfaces) to solve this problem. However, with Type Parameters likely soon
-// appearing in mainline Go, this was not a priority, as that will fully solve
-// this problem without requiring mental gymnastics. For now, users of this
-// library will have to write some boilerplate code to allow consumers/watchers
-// to access the data in a a typesafe manner without assertions. See
-// ExampleValue_full for one possible approach to this.
package event
import (
"context"
+ "errors"
)
// A Value is an 'Event Value', some piece of data that can be updated ('Set')
// by Producers and retrieved by Consumers.
-type Value interface {
+type Value[T any] interface {
// Set updates the Value to the given data. It is safe to call this from
// multiple goroutines, including concurrently.
//
@@ -103,24 +93,24 @@
// All updates will be serialized in an arbitrary order - if multiple
// producers wish to perform concurrent actions to update the Value partially,
// this should be negotiated and serialized externally by the producers.
- Set(val interface{})
+ Set(val T)
// ValueWatch implements the Watch method. It is split out into another
// interface to allow some 'Event Values' to implement only the watch/read
// part, with the write side being implicit or defined by a more complex
- // interface then a simple Set().
- ValueWatch
+ // interface than a simple Set().
+ ValueWatch[T]
}
// ValueWatch is the read side of an 'Event Value', which can be retrieved by
// Consumers by performing a Watch operation on it.
-type ValueWatch interface {
+type ValueWatch[T any] interface {
// Watch retrieves a Watcher that keeps track on the version of the data
// contained within the Value that was last seen by a consumer. Once a
// Watcher is retrieved, it can be used to then get the actual data stored
// within the Value, and to reliably retrieve updates to it without having
// to poll for changes.
- Watch() Watcher
+ Watch() Watcher[T]
}
// A Watcher keeps track of the last version of data seen by a consumer for a
@@ -128,11 +118,11 @@
// safe to use this type concurrently. However, it is safe to move/copy it
// across different goroutines, as long as no two goroutines access it
// simultaneously.
-type Watcher interface {
+type Watcher[T any] interface {
// Get blocks until a Value's data is available:
// - On first use of a Watcher, Get will return the data contained in the
// value at the time of calling .Watch(), or block if no data has been
- // .Set() on it yet. If a value has been Set() since the the initial
+ // .Set() on it yet. If a value has been Set() since the initial
// creation of the Watch() but before Get() is called for the first
// time, the first Get() call will immediately return the new value.
// - On subsequent uses of a Watcher, Get will block until the given Value
@@ -168,11 +158,51 @@
// continue skipping some updates.
// If multiple goroutines need to access the Value, they should each use
// their own Watcher.
- Get(context.Context, ...GetOption) (interface{}, error)
+ Get(context.Context, ...GetOption[T]) (T, error)
// Close must be called if the Watcher is not going to be used anymore -
// otherwise, a goroutine will leak.
Close() error
}
-type GetOption interface{}
+type GetOption[T any] struct {
+ Predicate func(t T) bool
+ BacklogOnly bool
+}
+
+func Filter[T any](pred func(T) bool) GetOption[T] {
+ return GetOption[T]{
+ Predicate: pred,
+ }
+}
+
+// BacklogOnly will prevent Get from blocking while waiting for more updates
+// from etcd, by instead returning BacklogDone whenever no more data is
+// currently available locally. This is, however, different from establishing
+// that there are no more pending updates from the etcd cluster - the only way
+// to ensure the local client is up to date is by performing Get calls without
+// this option set.
+//
+// This mode of retrieval should only be used for the retrieval of the existing
+// data in the etcd cluster on the initial creation of the Watcher (by
+// repeatedly calling Get until BacklogDone is returned), and shouldn't be set
+// for any subsequent call. Any use of this option after that initial fetch is
+// undefined behaviour that exposes the internals of the Get implementation, and
+// must not be relied on. However, in the future, this behaviour might be
+// formalized.
+//
+// This mode is particularly useful for ranged watchers. Non-ranged watchers can
+// still use this option to distinguish between blocking because of the
+// nonexistence of an object vs. blocking because of networking issues. However,
+// non-ranged retrieval semantics will rarely need to make this
+// distinction.
+func BacklogOnly[T any]() GetOption[T] {
+ return GetOption[T]{BacklogOnly: true}
+}
+
+var (
+ // BacklogDone is returned by Get when BacklogOnly is set and there is no more
+ // event data stored in the Watcher client, i.e. when the initial cluster state
+ // of the requested key has been retrieved.
+ BacklogDone = errors.New("no more backlogged data")
+)
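To illustrate the generic surface defined above, here is a minimal sketch of a producer/consumer pair using the in-memory implementation changed later in this diff. The serviceHealth type and its field are hypothetical, introduced only for the example.

package main

import (
	"context"
	"fmt"

	"source.monogon.dev/metropolis/pkg/event"
	"source.monogon.dev/metropolis/pkg/event/memory"
)

// serviceHealth is a hypothetical payload type used only for this example.
type serviceHealth struct {
	Healthy bool
}

func main() {
	ctx := context.Background()

	var health memory.Value[serviceHealth]
	w := health.Watch()
	defer w.Close()

	// Producer side: publish an unhealthy state, then a healthy one.
	go func() {
		health.Set(serviceHealth{Healthy: false})
		health.Set(serviceHealth{Healthy: true})
	}()

	// Consumer side: Filter makes Get skip updates that do not satisfy the
	// predicate, so this call returns only once the service reports healthy.
	h, err := w.Get(ctx, event.Filter(func(s serviceHealth) bool { return s.Healthy }))
	if err != nil {
		panic(err)
	}
	fmt.Println("healthy:", h.Healthy)
}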
diff --git a/metropolis/pkg/event/memory/BUILD.bazel b/metropolis/pkg/event/memory/BUILD.bazel
index da07dc3..d8c1990 100644
--- a/metropolis/pkg/event/memory/BUILD.bazel
+++ b/metropolis/pkg/event/memory/BUILD.bazel
@@ -15,5 +15,4 @@
"memory_test.go",
],
embed = [":memory"],
- deps = ["//metropolis/pkg/event"],
)
diff --git a/metropolis/pkg/event/memory/example_test.go b/metropolis/pkg/event/memory/example_test.go
index a119666..583650c 100644
--- a/metropolis/pkg/event/memory/example_test.go
+++ b/metropolis/pkg/event/memory/example_test.go
@@ -21,8 +21,6 @@
"fmt"
"net"
"time"
-
- "source.monogon.dev/metropolis/pkg/event"
)
// NetworkStatus is example data that will be stored in a Value.
@@ -31,35 +29,11 @@
DefaultGateway net.IP
}
-// NetworkStatusWatcher is a typesafe wrapper around a Watcher.
-type NetworkStatusWatcher struct {
- watcher event.Watcher
-}
-
-// Get wraps Watcher.Get and performs type assertion.
-func (s *NetworkStatusWatcher) Get(ctx context.Context) (*NetworkStatus, error) {
- val, err := s.watcher.Get(ctx)
- if err != nil {
- return nil, err
- }
- ns := val.(NetworkStatus)
- return &ns, nil
-}
-
// NetworkService is a fake/example network service that is responsible for
// communicating the newest information about a machine's network configuration
// to consumers/watchers.
type NetworkService struct {
- Provider Value
-}
-
-// Watch is a thin wrapper around Value.Watch that returns the typesafe
-// NetworkStatusWatcher wrapper.
-func (s *NetworkService) Watch() NetworkStatusWatcher {
- watcher := s.Provider.Watch()
- return NetworkStatusWatcher{
- watcher: watcher,
- }
+ Provider Value[NetworkStatus]
}
// Run pretends to execute the network service's main logic loop, in which it
@@ -113,7 +87,7 @@
// Run an /etc/hosts updater. It will watch for updates from the NetworkService
// about the current IP address of the node.
go func() {
- w := ns.Watch()
+ w := ns.Provider.Watch()
for {
status, err := w.Get(ctx)
if err != nil {
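With the wrapper types gone, a service that wants to expose its state read-only can simply hand out the ValueWatch side of its typed Value. A minimal sketch, assuming a hypothetical timesync-style service (the package and type names are illustrative):

package timesync

import (
	"source.monogon.dev/metropolis/pkg/event"
	"source.monogon.dev/metropolis/pkg/event/memory"
)

// Service is a hypothetical runnable that publishes whether the local clock
// is synchronized. Only the read side of its status is exposed to consumers.
type Service struct {
	status memory.Value[bool]
}

// Status returns the watch-only view of the status: callers can Watch() and
// Get() typed bools, but cannot Set().
func (s *Service) Status() event.ValueWatch[bool] {
	return &s.status
}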
diff --git a/metropolis/pkg/event/memory/memory.go b/metropolis/pkg/event/memory/memory.go
index 0db2524..f0a2ab9 100644
--- a/metropolis/pkg/event/memory/memory.go
+++ b/metropolis/pkg/event/memory/memory.go
@@ -18,6 +18,7 @@
import (
"context"
+ "errors"
"fmt"
"sync"
@@ -29,25 +30,25 @@
// there currently is no code path that needs this to be strictly true. However,
// users of this library might want to rely on the Value type instead of
// particular Value implementations.
- _ event.Value = &Value{}
+ _ event.Value[int] = &Value[int]{}
)
// Value is a 'memory value', which implements an event.Value stored in memory.
// It is safe to construct an empty object of this type. However, this must not
// be copied.
-type Value struct {
+type Value[T any] struct {
// mu guards the inner, innerSet and watchers fields.
mu sync.RWMutex
// inner is the latest data Set on the Value. It is used to provide the
// newest version of the Set data to new watchers.
- inner interface{}
+ inner T
// innerSet is true when inner has been Set at least once. It is used to
// differentiate between a nil and unset value.
innerSet bool
// watchers is the list of watchers that should be updated when new data is
// Set. It will grow on every .Watch() and shrink any time a watcher is
// determined to have been closed.
- watchers []*watcher
+ watchers []*watcher[T]
// Sync, if set to true, blocks all .Set() calls on the Value until all
// Watchers derived from it actively .Get() the new value. This can be used
@@ -65,7 +66,7 @@
// multiple goroutines, including concurrently.
//
// For more information about guarantees, see event.Value.Set.
-func (m *Value) Set(val interface{}) {
+func (m *Value[T]) Set(val T) {
m.mu.Lock()
defer m.mu.Unlock()
@@ -75,7 +76,7 @@
// Go through all watchers, updating them on the new value and filtering out
// all closed watchers.
- newWatchers := make([]*watcher, 0, len(m.watchers))
+ newWatchers := make([]*watcher[T], 0, len(m.watchers))
for _, w := range m.watchers {
w := w
if w.closed() {
@@ -89,15 +90,15 @@
// watcher implements the event.Watcher interface for watchers returned by
// Value.
-type watcher struct {
+type watcher[T any] struct {
// activeReqC is a channel used to request an active submission channel
// from a pending Get function, if any.
- activeReqC chan chan interface{}
+ activeReqC chan chan T
// deadletterSubmitC is a channel used to communicate a value that
// attempted to be submitted via activeReqC. This will be received by the
// deadletter worker of this watcher and passed on to the next .Get call
// that occurs.
- deadletterSubmitC chan interface{}
+ deadletterSubmitC chan T
// getSem is a channel-based semaphore (which is of size 1, and thus in
// fact a mutex) that is used to ensure that only a single .Get() call is
@@ -112,10 +113,10 @@
// contained within the Value that was last seen by a consumer.
//
// For more information about guarantees, see event.Value.Watch.
-func (m *Value) Watch() event.Watcher {
- waiter := &watcher{
- activeReqC: make(chan chan interface{}),
- deadletterSubmitC: make(chan interface{}),
+func (m *Value[T]) Watch() event.Watcher[T] {
+ waiter := &watcher[T]{
+ activeReqC: make(chan chan T),
+ deadletterSubmitC: make(chan T),
close: make(chan struct{}),
getSem: make(chan struct{}, 1),
}
@@ -143,9 +144,9 @@
// It watches the deadletterSubmitC channel for updated data, and overrides
// previously received data. Then, when a .Get() begins to pend (and respond to
// activeReqC receives), the deadletter worker will deliver that value.
-func (m *watcher) deadletterWorker() {
+func (m *watcher[T]) deadletterWorker() {
// Current value, and flag to mark it as set (vs. nil).
- var cur interface{}
+ var cur T
var set bool
for {
@@ -186,7 +187,7 @@
}
// closed returns whether this watcher has been closed.
-func (m *watcher) closed() bool {
+func (m *watcher[T]) closed() bool {
select {
case _, ok := <-m.close:
if !ok {
@@ -198,7 +199,7 @@
}
// update is the high level update-this-watcher function called by Value.
-func (m *watcher) update(sync bool, val interface{}) {
+func (m *watcher[T]) update(sync bool, val T) {
// If synchronous delivery was requested, block until a watcher .Gets it.
if sync {
c := <-m.activeReqC
@@ -224,38 +225,37 @@
}
}
-func (m *watcher) Close() error {
+func (m *watcher[T]) Close() error {
close(m.deadletterSubmitC)
close(m.close)
return nil
}
-// GetOption is a memory.Get-specific option passed to Get. Currently no options
-// are implemented.
-type GetOption struct {
-}
-
// Get blocks until a Value's data is available. See event.Watcher.Get for
// guarantees and more information.
-func (m *watcher) Get(ctx context.Context, opts ...event.GetOption) (interface{}, error) {
+func (m *watcher[T]) Get(ctx context.Context, opts ...event.GetOption[T]) (T, error) {
// Make sure we're the only active .Get call.
+ var empty T
select {
case m.getSem <- struct{}{}:
default:
- return nil, fmt.Errorf("cannot Get() concurrently on a single waiter")
+ return empty, fmt.Errorf("cannot Get() concurrently on a single waiter")
}
defer func() {
<-m.getSem
}()
- for _, optI := range opts {
- _, ok := optI.(GetOption)
- if !ok {
- return nil, fmt.Errorf("get options must be of type memory.GetOption")
+ var predicate func(t T) bool
+ for _, opt := range opts {
+ if opt.Predicate != nil {
+ predicate = opt.Predicate
+ }
+ if opt.BacklogOnly {
+ return empty, errors.New("BacklogOnly is not implemented for memory watchers")
}
}
- c := make(chan interface{})
+ c := make(chan T)
// Start responding on activeReqC. This signals to .update and to the
// deadletter worker that we're ready to accept data updates.
@@ -285,9 +285,12 @@
for {
select {
case <-ctx.Done():
- return nil, ctx.Err()
+ return empty, ctx.Err()
case m.activeReqC <- c:
case val := <-c:
+ if predicate != nil && !predicate(val) {
+ continue
+ }
return val, nil
}
}
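For completeness, a small sketch of the Sync behaviour documented in this file, using the generic memory Value; the values and counts are arbitrary.

package main

import (
	"context"
	"fmt"

	"source.monogon.dev/metropolis/pkg/event/memory"
)

func main() {
	ctx := context.Background()

	// With Sync set, every Set blocks until all derived Watchers have picked
	// the value up via Get, so no updates are coalesced.
	v := memory.Value[int]{Sync: true}
	w := v.Watch()
	defer w.Close()

	go func() {
		for i := 1; i <= 3; i++ {
			// Blocks until the consumer below Gets the value.
			v.Set(i)
		}
	}()

	for i := 1; i <= 3; i++ {
		got, err := w.Get(ctx)
		if err != nil {
			panic(err)
		}
		fmt.Println(got) // Prints 1, 2, 3 - a lossless, ordered stream.
	}
}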
diff --git a/metropolis/pkg/event/memory/memory_test.go b/metropolis/pkg/event/memory/memory_test.go
index f4feb33..41f121e 100644
--- a/metropolis/pkg/event/memory/memory_test.go
+++ b/metropolis/pkg/event/memory/memory_test.go
@@ -28,7 +28,7 @@
// TestAsync exercises the high-level behaviour of a Value, in which a
// watcher is able to catch up to the newest Set value.
func TestAsync(t *testing.T) {
- p := Value{}
+ p := Value[int]{}
p.Set(0)
ctx := context.Background()
@@ -39,7 +39,7 @@
if err != nil {
t.Fatalf("Get: %v", err)
}
- if want, got := 0, val.(int); want != got {
+ if want, got := 0, val; want != got {
t.Fatalf("Value: got %d, wanted %d", got, want)
}
@@ -54,7 +54,7 @@
if err != nil {
t.Fatalf("Get: %v", err)
}
- if want, got := 100, val.(int); want != got {
+ if want, got := 100, val; want != got {
t.Fatalf("Value: got %d, wanted %d", got, want)
}
}
@@ -64,7 +64,7 @@
// This particular test ensures that .Set() calls to a Watcher result in a
// perfect log of updates being transmitted to a watcher.
func TestSync(t *testing.T) {
- p := Value{
+ p := Value[int]{
Sync: true,
}
values := make(chan int, 100)
@@ -79,7 +79,7 @@
if err != nil {
panic(err)
}
- values <- value.(int)
+ values <- value
}
}()
@@ -109,7 +109,7 @@
// This particular test ensures that .Set() calls actually block when a watcher
// is unattended.
func TestSyncBlocks(t *testing.T) {
- p := Value{
+ p := Value[int]{
Sync: true,
}
ctx := context.Background()
@@ -124,7 +124,7 @@
if err != nil {
t.Fatalf("Get: %v", err)
}
- if want, got := 0, value.(int); want != got {
+ if want, got := 0, value; want != got {
t.Fatalf("Got initial value %d, wanted %d", got, want)
}
@@ -160,7 +160,7 @@
t.Fatalf("Set() returned before Get()")
}
- if want, got := 1, value.(int); want != got {
+ if want, got := 1, value; want != got {
t.Fatalf("Wanted value %d, got %d", want, got)
}
@@ -175,7 +175,7 @@
// TestMultipleGets verifies that calling .Get() on a single watcher from two
// goroutines is prevented by returning an error in exactly one of them.
func TestMultipleGets(t *testing.T) {
- p := Value{}
+ p := Value[int]{}
ctx := context.Background()
w := p.Watch()
@@ -204,7 +204,7 @@
func TestConcurrency(t *testing.T) {
ctx := context.Background()
- p := Value{}
+ p := Value[int]{}
p.Set(0)
// Number of watchers to create.
@@ -236,10 +236,10 @@
}
// Ensure monotonicity of received data.
- if val.(int) <= prev {
+ if val <= prev {
done(fmt.Errorf("received out of order data: %d after %d", val, prev))
}
- prev = val.(int)
+ prev = val
// Quit when the final value is received.
if val == final {
@@ -274,7 +274,7 @@
// aborts that particular Get call, but also allows subsequent use of the same
// watcher.
func TestCanceling(t *testing.T) {
- p := Value{
+ p := Value[int]{
Sync: true,
}
@@ -316,7 +316,7 @@
func TestSetAfterWatch(t *testing.T) {
ctx := context.Background()
- p := Value{}
+ p := Value[int]{}
p.Set(0)
watcher := p.Watch()
@@ -326,7 +326,7 @@
if err != nil {
t.Fatalf("Get: %v", err)
}
- if want, got := 1, data.(int); want != got {
+ if want, got := 1, data; want != got {
t.Errorf("Get should've returned %v, got %v", want, got)
}
}
diff --git a/metropolis/pkg/pki/crl.go b/metropolis/pkg/pki/crl.go
index 8b886bf..23838a1 100644
--- a/metropolis/pkg/pki/crl.go
+++ b/metropolis/pkg/pki/crl.go
@@ -145,8 +145,8 @@
// WatchCRL returns an Event Value compatible Watcher which can be used to
// retrieve and watch for the newest CRL available from this CA certificate.
-func (c *Certificate) WatchCRL(cl client.Namespaced) CRLWatcher {
- value := etcd.NewValue(cl, c.crlPath(), func(_, data []byte) (interface{}, error) {
+func (c *Certificate) WatchCRL(cl client.Namespaced) event.Watcher[*CRL] {
+ value := etcd.NewValue(cl, c.crlPath(), func(_, data []byte) (*CRL, error) {
crl, err := x509.ParseCRL(data)
if err != nil {
return nil, fmt.Errorf("could not parse CRL from etcd: %w", err)
@@ -156,29 +156,10 @@
List: crl,
}, nil
})
- return CRLWatcher{value.Watch()}
-}
-
-// CRLWatcher is a Event Value compatible Watcher which will be updated any time
-// a given CA certificate's CRL gets updated.
-type CRLWatcher struct {
- event.Watcher
+ return value.Watch()
}
type CRL struct {
Raw []byte
List *pkix.CertificateList
}
-
-// Retrieve the newest available CRL from etcd, blocking until one is available
-// or updated.
-//
-// The first call will block until a CRL is available, which happens the first
-// time a given CA certificate is stored in etcd (eg. through an Ensure call).
-func (c *CRLWatcher) Get(ctx context.Context, opts ...event.GetOption) (*CRL, error) {
- v, err := c.Watcher.Get(ctx, opts...)
- if err != nil {
- return nil, err
- }
- return v.(*CRL), nil
-}
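A consumer of the new WatchCRL signature now receives typed *CRL values directly. A minimal sketch, assuming a watcher obtained from Certificate.WatchCRL; the logCRL helper and its logging are illustrative, and the pki import path is assumed from the repository layout.

package example

import (
	"context"
	"log"

	"source.monogon.dev/metropolis/pkg/event"
	"source.monogon.dev/metropolis/pkg/pki"
)

// logCRL blocks until a CRL is available for the watched CA (which happens
// once the CA certificate has been stored in etcd, e.g. via Ensure) and logs
// how many certificates it revokes. No type assertion is needed anymore.
func logCRL(ctx context.Context, w event.Watcher[*pki.CRL]) error {
	defer w.Close()
	crl, err := w.Get(ctx)
	if err != nil {
		return err
	}
	log.Printf("CRL revokes %d certificates", len(crl.List.TBSCertList.RevokedCertificates))
	return nil
}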