// blob: e1f59d6ad55a90a839ac4c9088fadd3e78ddb568 [file] [log] [blame]
// Copyright 2020 The Monogon Project Authors.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package consensus
import (
"context"
"fmt"
"git.monogon.dev/source/nexantic.git/core/internal/common"
"github.com/pkg/errors"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/namespace"
"go.etcd.io/etcd/embed"
"go.etcd.io/etcd/etcdserver/api/membership"
"go.etcd.io/etcd/pkg/types"
"go.etcd.io/etcd/proxy/grpcproxy/adapter"
"go.uber.org/zap"
"net/url"
"os"
"strings"
)
// Defaults applied to the embedded etcd configuration built by this package.
const (
// DefaultClusterToken is assigned to the etcd config's InitialClusterToken in
// OnStart and is passed as the cluster name to membership.NewMember in
// AddMember.
DefaultClusterToken = "SIGNOS"
// DefaultLogger is the value assigned to the etcd config's Logger field in
// OnStart.
DefaultLogger = "zap"
)
type (
// Service wraps an embedded etcd server and exposes helpers for cluster
// membership management and namespaced key-value access.
Service struct {
*common.BaseService
// etcd is the handle of the embedded etcd server; assigned by OnStart.
etcd *embed.Etcd
// kv is the KV client built directly on top of the embedded server;
// assigned by OnStart.
kv clientv3.KV
// ready is set to true by a goroutine started in OnStart once etcd
// signals readiness.
// NOTE(review): written from that goroutine and read by IsReady without
// any synchronization — confirm whether this needs an atomic or a mutex.
ready bool
// config holds the settings OnStart uses to build the etcd configuration.
config *Config
}
// Config carries the settings from which OnStart derives the embedded etcd
// configuration.
Config struct {
// Name is used as the etcd member name.
Name string
// DataDir is used as the etcd data directory; IsProvisioned checks for
// its existence.
DataDir string
// InitialCluster is used as etcd's initial cluster string when joining an
// existing cluster. It is only consulted when NewCluster is false.
InitialCluster string
// NewCluster makes OnStart start etcd with cluster state "new" and an
// initial cluster derived from Name.
NewCluster bool
// ExternalHost is the host part of the advertised peer URL.
ExternalHost string
// ListenHost is the host part of the peer listen URL.
ListenHost string
// ListenPort is the port used for both the advertised and the listen
// peer URL.
ListenPort uint16
}
// Member describes a single etcd cluster member as reported by GetNodes.
Member struct {
// ID is the etcd member ID.
ID uint64
// Name is the etcd member name.
Name string
// Address is one of the member's peer URLs, as chosen by PickPeerURL.
Address string
// Synced is true when the member is not a learner.
Synced bool
}
)
// NewConsensusService creates a consensus Service that keeps its own copy of
// config. The embedded BaseService is created under the name "consensus" with
// the given logger and the new Service itself. The etcd server is not started
// here; that happens in OnStart. The returned error is currently always nil.
func NewConsensusService(config Config, logger *zap.Logger) (*Service, error) {
	svc := new(Service)
	svc.config = &config
	svc.BaseService = common.NewBaseService("consensus", logger, svc)

	return svc, nil
}
// OnStart configures and launches the embedded etcd server.
//
// It builds the advertised and listen peer URLs from the configured hosts and
// port, disables the client listeners, starts etcd and installs a KV client
// that talks directly to the embedded server. Readiness is tracked
// asynchronously: a background goroutine sets the ready flag once etcd signals
// readiness, and exits without setting it if the server is stopped first.
//
// It returns an error if the config is nil, if one of the peer URLs cannot be
// parsed, or if etcd fails to start.
func (s *Service) OnStart() error {
	if s.config == nil {
		return errors.New("config for consensus is nil")
	}

	// A previous run may have left the flag set; the server started below is
	// not ready yet.
	s.ready = false

	cfg := embed.NewConfig()

	// Reset LCUrls because we don't want to expose any client
	cfg.LCUrls = nil

	apURL, err := url.Parse(fmt.Sprintf("http://%s:%d", s.config.ExternalHost, s.config.ListenPort))
	if err != nil {
		return errors.Wrap(err, "invalid external_host or listen_port")
	}

	lpURL, err := url.Parse(fmt.Sprintf("http://%s:%d", s.config.ListenHost, s.config.ListenPort))
	if err != nil {
		return errors.Wrap(err, "invalid listen_host or listen_port")
	}

	cfg.APUrls = []url.URL{*apURL}
	cfg.LPUrls = []url.URL{*lpURL}
	cfg.ACUrls = []url.URL{}

	cfg.Dir = s.config.DataDir
	cfg.InitialClusterToken = DefaultClusterToken
	cfg.Name = s.config.Name

	// Only relevant if creating or joining a cluster; otherwise settings will be ignored
	if s.config.NewCluster {
		cfg.ClusterState = "new"
		cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
	} else if s.config.InitialCluster != "" {
		cfg.ClusterState = "existing"
		cfg.InitialCluster = s.config.InitialCluster
	}

	cfg.Logger = DefaultLogger

	server, err := embed.StartEtcd(cfg)
	if err != nil {
		return err
	}
	s.etcd = server

	// Override the logger
	//*server.GetLogger() = *s.Logger.With(zap.String("component", "etcd"))

	// Inject kv client
	s.kv = clientv3.NewKVFromKVClient(adapter.KvServerToKvClient(server.Server), nil)

	// Wait on the local server handle rather than s.etcd so the goroutine is
	// tied to exactly this server instance. Also watch for shutdown: without
	// it the goroutine would block forever if etcd is stopped before it ever
	// becomes ready.
	go func() {
		s.Logger.Info("waiting for etcd to become ready")
		select {
		case <-server.Server.ReadyNotify():
			s.ready = true
			s.Logger.Info("etcd is now ready")
		case <-server.Server.StopNotify():
			s.Logger.Info("etcd stopped before becoming ready")
		}
	}()

	return nil
}
// OnStop shuts down the embedded etcd server, if one was started, and clears
// the ready flag so that IsReady does not keep reporting a stopped server as
// ready. It always returns nil.
func (s *Service) OnStop() error {
	// OnStart may have failed or never run, in which case there is no server
	// to close and calling Close on the nil handle would panic.
	if s.etcd != nil {
		s.etcd.Close()
	}
	s.ready = false

	return nil
}
// IsProvisioned reports whether the configured etcd data directory is present.
// It returns false only when os.Stat fails with a "does not exist" error; any
// other outcome, including other Stat errors, counts as provisioned.
func (s *Service) IsProvisioned() bool {
	if _, err := os.Stat(s.config.DataDir); os.IsNotExist(err) {
		return false
	}
	return true
}
// IsReady reports the current value of the ready flag, which the goroutine
// started by OnStart sets once etcd has signalled readiness.
// NOTE(review): the flag is read here without synchronization while that
// goroutine may write it concurrently.
func (s *Service) IsReady() bool {
	ready := s.ready
	return ready
}
// AddMember registers a new member called name, reachable at the peer URL url,
// with the etcd cluster and returns the ID of the newly created member.
// An error is returned if url is not a valid peer URL or if the embedded
// server rejects the membership change; the returned ID is 0 in that case.
func (s *Service) AddMember(ctx context.Context, name string, url string) (uint64, error) {
	peerURLs, err := types.NewURLs([]string{url})
	if err != nil {
		return 0, err
	}

	newMember := membership.NewMember(name, peerURLs, DefaultClusterToken, nil)
	if _, err := s.etcd.Server.AddMember(ctx, *newMember); err != nil {
		return 0, err
	}

	return uint64(newMember.ID), nil
}
// RemoveMember asks the embedded etcd server to drop the member with the given
// ID from the cluster and returns the server's error, if any.
func (s *Service) RemoveMember(ctx context.Context, id uint64) error {
	if _, err := s.etcd.Server.RemoveMember(ctx, id); err != nil {
		return err
	}
	return nil
}
// Health is meant to report the current cluster health.
// NOTE(review): this is currently a stub — the body is empty and the method
// has no return value, so callers get no health information from it.
func (s *Service) Health() {
}
// GetConfig returns a copy of the consensus config currently held by the
// service. Modifying the returned value does not affect the service.
func (s *Service) GetConfig() Config {
	current := *s.config
	return current
}
// SetConfig replaces the consensus config held by the service with a copy of
// config. The new values only take effect once the service is restarted,
// since the etcd configuration is built from them in OnStart.
func (s *Service) SetConfig(config Config) {
	updated := config
	s.config = &updated
}
// GetInitialClusterString renders the current cluster membership as a
// comma-separated list of "name=peerURL" pairs, which is the format of the
// InitialCluster setting used to bootstrap another consensus node.
func (s *Service) GetInitialClusterString() string {
	members := s.etcd.Server.Cluster().Members()

	entries := make([]string, 0, len(members))
	for _, member := range members {
		entries = append(entries, member.Name+"="+member.PickPeerURL())
	}

	return strings.Join(entries, ",")
}
// GetNodes returns one Member entry per member of the etcd cluster, in the
// order reported by the embedded server. A member is marked as Synced when it
// is not a learner.
func (s *Service) GetNodes() []Member {
	etcdMembers := s.etcd.Server.Cluster().Members()

	nodes := make([]Member, 0, len(etcdMembers))
	for _, em := range etcdMembers {
		var node Member
		node.ID = uint64(em.ID)
		node.Name = em.Name
		node.Address = em.PickPeerURL()
		node.Synced = !em.IsLearner
		nodes = append(nodes, node)
	}

	return nodes
}
// GetStore returns a KV view of the service's etcd client whose keys are all
// prefixed with "<module>:<space>".
// NOTE(review): the prefix carries no trailing separator, so two stores whose
// prefixes are string prefixes of one another (e.g. space "a" and space "ab")
// presumably share part of their key space — confirm whether callers rely on
// the current key layout before changing it.
func (s *Service) GetStore(module, space string) clientv3.KV {
	prefix := module + ":" + space
	return namespace.NewKV(s.kv, prefix)
}