| Serge Bazanski | cf864da | 2024-07-31 11:23:34 +0000 | [diff] [blame] | 1 | package supervisor |
| 2 | |
| 3 | import ( |
| 4 | "sync" |
| 5 | "time" |
| 6 | ) |
| 7 | |
| 8 | // Metrics is an interface from the supervisor to any kind of metrics-collecting |
| 9 | // component. |
| 10 | type Metrics interface { |
| 11 | // NotifyNodeState is called whenever a given runnable at a given DN changes |
| 12 | // state. Called synchronously from the supervisor's processor loop, so must not |
| 13 | // block, but is also guaranteed to only be called from a single goroutine. |
| 14 | NotifyNodeState(dn string, state NodeState) |
| 15 | } |
| 16 | |
| 17 | // metricsFanout is used internally to fan out a single Metrics interface (which |
| 18 | // it implements) onto multiple subordinate Metrics interfaces (as provided by |
| 19 | // the user via WithMetrics). |
| 20 | type metricsFanout struct { |
| 21 | sub []Metrics |
| 22 | } |
| 23 | |
| 24 | func (m *metricsFanout) NotifyNodeState(dn string, state NodeState) { |
| 25 | for _, sub := range m.sub { |
| 26 | sub.NotifyNodeState(dn, state) |
| 27 | } |
| 28 | } |
| 29 | |
| 30 | // InMemoryMetrics is a simple Metrics implementation that keeps an in-memory |
| 31 | // mirror of the state of all DNs in the supervisor. The zero value for |
| 32 | // InMemoryMetrics is ready to use. |
| 33 | type InMemoryMetrics struct { |
| 34 | mu sync.RWMutex |
| 35 | dns map[string]DNState |
| 36 | } |
| 37 | |
| 38 | // DNState is the state of a supervisor runnable, recorded alongside a timestamp |
| 39 | // of when the State changed. |
| 40 | type DNState struct { |
| 41 | // State is the current state of the runnable. |
| 42 | State NodeState |
| 43 | // Transition is the time at which the runnable reached its State. |
| 44 | Transition time.Time |
| 45 | } |
| 46 | |
| 47 | func (m *InMemoryMetrics) NotifyNodeState(dn string, state NodeState) { |
| 48 | m.mu.Lock() |
| 49 | defer m.mu.Unlock() |
| 50 | if m.dns == nil { |
| 51 | m.dns = make(map[string]DNState) |
| 52 | } |
| 53 | m.dns[dn] = DNState{ |
| 54 | State: state, |
| 55 | Transition: time.Now(), |
| 56 | } |
| 57 | } |
| 58 | |
| 59 | // DNs returns a copy (snapshot in time) of the recorded DN states, in a map from |
| 60 | // DN to DNState. The returned value can be mutated. |
| 61 | func (m *InMemoryMetrics) DNs() map[string]DNState { |
| 62 | m.mu.RLock() |
| 63 | defer m.mu.RUnlock() |
| 64 | |
| 65 | res := make(map[string]DNState) |
| 66 | for k, v := range m.dns { |
| 67 | res[k] = v |
| 68 | } |
| 69 | return res |
| 70 | } |