blob: e1f59d6ad55a90a839ac4c9088fadd3e78ddb568 [file] [log] [blame]
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +02001// Copyright 2020 The Monogon Project Authors.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17package consensus
18
19import (
20 "context"
21 "fmt"
22 "git.monogon.dev/source/nexantic.git/core/internal/common"
23 "github.com/pkg/errors"
24 "go.etcd.io/etcd/clientv3"
25 "go.etcd.io/etcd/clientv3/namespace"
26 "go.etcd.io/etcd/embed"
27 "go.etcd.io/etcd/etcdserver/api/membership"
28 "go.etcd.io/etcd/pkg/types"
29 "go.etcd.io/etcd/proxy/grpcproxy/adapter"
30 "go.uber.org/zap"
31 "net/url"
32 "os"
33 "strings"
34)
35
36const (
37 DefaultClusterToken = "SIGNOS"
38 DefaultLogger = "zap"
39)
40
41type (
42 Service struct {
43 *common.BaseService
44
45 etcd *embed.Etcd
46 kv clientv3.KV
47 ready bool
48
49 config *Config
50 }
51
52 Config struct {
53 Name string
54 DataDir string
55 InitialCluster string
56 NewCluster bool
57
58 ExternalHost string
59 ListenHost string
60 ListenPort uint16
61 }
62
63 Member struct {
64 ID uint64
65 Name string
66 Address string
67 Synced bool
68 }
69)
70
71func NewConsensusService(config Config, logger *zap.Logger) (*Service, error) {
72 consensusServer := &Service{
73 config: &config,
74 }
75 consensusServer.BaseService = common.NewBaseService("consensus", logger, consensusServer)
76
77 return consensusServer, nil
78}
79
80func (s *Service) OnStart() error {
81 if s.config == nil {
82 return errors.New("config for consensus is nil")
83 }
84
85 cfg := embed.NewConfig()
86
87 // Reset LCUrls because we don't want to expose any client
88 cfg.LCUrls = nil
89
90 apURL, err := url.Parse(fmt.Sprintf("http://%s:%d", s.config.ExternalHost, s.config.ListenPort))
91 if err != nil {
92 return errors.Wrap(err, "invalid external_host or listen_port")
93 }
94
95 lpURL, err := url.Parse(fmt.Sprintf("http://%s:%d", s.config.ListenHost, s.config.ListenPort))
96 if err != nil {
97 return errors.Wrap(err, "invalid listen_host or listen_port")
98 }
99 cfg.APUrls = []url.URL{*apURL}
100 cfg.LPUrls = []url.URL{*lpURL}
101 cfg.ACUrls = []url.URL{}
102
103 cfg.Dir = s.config.DataDir
104 cfg.InitialClusterToken = DefaultClusterToken
105 cfg.Name = s.config.Name
106
107 // Only relevant if creating or joining a cluster; otherwise settings will be ignored
108 if s.config.NewCluster {
109 cfg.ClusterState = "new"
110 cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
111 } else if s.config.InitialCluster != "" {
112 cfg.ClusterState = "existing"
113 cfg.InitialCluster = s.config.InitialCluster
114 }
115
116 cfg.Logger = DefaultLogger
117
118 server, err := embed.StartEtcd(cfg)
119 if err != nil {
120 return err
121 }
122 s.etcd = server
123
124 // Override the logger
125 //*server.GetLogger() = *s.Logger.With(zap.String("component", "etcd"))
126
127 go func() {
128 s.Logger.Info("waiting for etcd to become ready")
129 <-s.etcd.Server.ReadyNotify()
130 s.ready = true
131 s.Logger.Info("etcd is now ready")
132 }()
133
134 // Inject kv client
135 s.kv = clientv3.NewKVFromKVClient(adapter.KvServerToKvClient(s.etcd.Server), nil)
136
137 return nil
138}
139
140func (s *Service) OnStop() error {
141 s.etcd.Close()
142
143 return nil
144}
145
146// IsProvisioned returns whether the node has been setup before and etcd has a data directory
147func (s *Service) IsProvisioned() bool {
148 _, err := os.Stat(s.config.DataDir)
149
150 return !os.IsNotExist(err)
151}
152
153// IsReady returns whether etcd is ready and synced
154func (s *Service) IsReady() bool {
155 return s.ready
156}
157
158// AddMember adds a new etcd member to the cluster
159func (s *Service) AddMember(ctx context.Context, name string, url string) (uint64, error) {
160 urls, err := types.NewURLs([]string{url})
161 if err != nil {
162 return 0, err
163 }
164
165 member := membership.NewMember(name, urls, DefaultClusterToken, nil)
166
167 _, err = s.etcd.Server.AddMember(ctx, *member)
168 if err != nil {
169 return 0, err
170 }
171
172 return uint64(member.ID), nil
173}
174
175// RemoveMember removes a member from the etcd cluster
176func (s *Service) RemoveMember(ctx context.Context, id uint64) error {
177 _, err := s.etcd.Server.RemoveMember(ctx, id)
178 return err
179}
180
181// Health returns the current cluster health
182func (s *Service) Health() {
183}
184
185// GetConfig returns the current consensus config
186func (s *Service) GetConfig() Config {
187 return *s.config
188}
189
190// SetConfig sets the consensus config. Changes are only applied when the service is restarted.
191func (s *Service) SetConfig(config Config) {
192 s.config = &config
193}
194
195// GetInitialClusterString returns the InitialCluster string that can be used to bootstrap a consensus node
196func (s *Service) GetInitialClusterString() string {
197 members := s.etcd.Server.Cluster().Members()
198 clusterString := strings.Builder{}
199
200 for i, m := range members {
201 if i != 0 {
202 clusterString.WriteString(",")
203 }
204 clusterString.WriteString(m.Name)
205 clusterString.WriteString("=")
206 clusterString.WriteString(m.PickPeerURL())
207 }
208
209 return clusterString.String()
210}
211
212// GetNodes returns a list of consensus nodes
213func (s *Service) GetNodes() []Member {
214 members := s.etcd.Server.Cluster().Members()
215 cMembers := make([]Member, len(members))
216 for i, m := range members {
217 cMembers[i] = Member{
218 ID: uint64(m.ID),
219 Name: m.Name,
220 Address: m.PickPeerURL(),
221 Synced: !m.IsLearner,
222 }
223 }
224
225 return cMembers
226}
227
228func (s *Service) GetStore(module, space string) clientv3.KV {
229 return namespace.NewKV(s.kv, fmt.Sprintf("%s:%s", module, space))
230}