blob: 00972de02acf32ecc1b85b8508a4a10e6f893efd [file] [log] [blame]
Serge Bazanski4abeb132022-10-11 11:32:19 +02001package server
2
3import (
4 "context"
5 "flag"
6 "fmt"
7 "net"
8 "os"
9
Serge Bazanski42f13462023-04-19 15:00:06 +020010 "github.com/cenkalti/backoff/v4"
Serge Bazanski4abeb132022-10-11 11:32:19 +020011 "google.golang.org/grpc"
12 "google.golang.org/grpc/reflection"
13 "k8s.io/klog/v2"
14
15 "source.monogon.dev/cloud/bmaas/bmdb"
Serge Bazanskic50f6942023-04-24 18:27:22 +020016 "source.monogon.dev/cloud/bmaas/bmdb/metrics"
Serge Bazanski77628312023-02-15 23:33:22 +010017 "source.monogon.dev/cloud/bmaas/bmdb/webug"
Serge Bazanski4abeb132022-10-11 11:32:19 +020018 apb "source.monogon.dev/cloud/bmaas/server/api"
19 "source.monogon.dev/cloud/lib/component"
20)
21
22type Config struct {
23 Component component.ComponentConfig
24 BMDB bmdb.BMDB
Serge Bazanski77628312023-02-15 23:33:22 +010025 Webug webug.Config
Serge Bazanski4abeb132022-10-11 11:32:19 +020026
27 // PublicListenAddress is the address at which the 'public' (agent-facing) gRPC
28 // server listener will run.
29 PublicListenAddress string
30}
31
// runtimeInfo returns a short description of where this process runs, in the
// form "host <hostname>". When no hostname can be determined the placeholder
// "UNKNOWN" is used instead.
//
// TODO(q3k): factor this out to BMDB library?
func runtimeInfo() string {
	// The error from os.Hostname is discarded; only an empty name triggers the
	// placeholder.
	name, _ := os.Hostname()
	if name != "" {
		return fmt.Sprintf("host %s", name)
	}
	return fmt.Sprintf("host %s", "UNKNOWN")
}
40
41func (c *Config) RegisterFlags() {
42 c.Component.RegisterFlags("srv")
43 c.BMDB.ComponentName = "srv"
44 c.BMDB.RuntimeInfo = runtimeInfo()
45 c.BMDB.Database.RegisterFlags("bmdb")
Serge Bazanski77628312023-02-15 23:33:22 +010046 c.Webug.RegisterFlags()
Serge Bazanski4abeb132022-10-11 11:32:19 +020047
48 flag.StringVar(&c.PublicListenAddress, "srv_public_grpc_listen_address", ":8080", "Address to listen at for public/user gRPC connections for bmdbsrv")
49}
50
51type Server struct {
52 Config Config
53
54 // ListenGRPC will contain the address at which the internal gRPC server is
55 // listening after .Start() has been called. This can differ from the configured
56 // value if the configuration requests any port (via :0).
57 ListenGRPC string
58 // ListenPublic will contain the address at which the 'public' (agent-facing)
59 // gRPC server is lsitening after .Start() has been called.
60 ListenPublic string
61
62 bmdb *bmdb.Connection
63 acsvc *agentCallbackService
Serge Bazanski42f13462023-04-19 15:00:06 +020064
65 sessionC chan *bmdb.Session
66}
67
68// sessionWorker emits a valid BMDB session to sessionC as long as ctx is active.
69func (s *Server) sessionWorker(ctx context.Context) {
70 var session *bmdb.Session
71 for {
72 if session == nil || session.Expired() {
73 klog.Infof("Starting new session...")
74 bo := backoff.NewExponentialBackOff()
75 err := backoff.Retry(func() error {
76 var err error
Serge Bazanskic50f6942023-04-24 18:27:22 +020077 session, err = s.bmdb.StartSession(ctx, bmdb.SessionOption{Processor: metrics.ProcessorBMSRV})
Serge Bazanski42f13462023-04-19 15:00:06 +020078 if err != nil {
79 klog.Errorf("Failed to start session: %v", err)
80 return err
81 } else {
82 return nil
83 }
84 }, backoff.WithContext(bo, ctx))
85 if err != nil {
86 // If something's really wrong just crash.
87 klog.Exitf("Gave up on starting session: %v", err)
88 }
89 klog.Infof("New session: %s", session.UUID)
90 }
91
92 select {
93 case <-ctx.Done():
94 return
95 case s.sessionC <- session:
96 }
97 }
98}
99
100func (s *Server) session(ctx context.Context) (*bmdb.Session, error) {
101 select {
102 case sess := <-s.sessionC:
103 return sess, nil
104 case <-ctx.Done():
105 return nil, ctx.Err()
106 }
Serge Bazanski4abeb132022-10-11 11:32:19 +0200107}
108
109func (s *Server) startPublic(ctx context.Context) {
110 g := grpc.NewServer(s.Config.Component.GRPCServerOptionsPublic()...)
111 lis, err := net.Listen("tcp", s.Config.PublicListenAddress)
112 if err != nil {
113 klog.Exitf("Could not listen: %v", err)
114 }
115 s.ListenPublic = lis.Addr().String()
116 apb.RegisterAgentCallbackServer(g, s.acsvc)
117 reflection.Register(g)
118
119 klog.Infof("Public API listening on %s", s.ListenPublic)
120 go func() {
121 err := g.Serve(lis)
122 if err != ctx.Err() {
123 klog.Exitf("Public gRPC serve failed: %v", err)
124 }
125 }()
126}
127
128func (s *Server) startInternalGRPC(ctx context.Context) {
129 g := grpc.NewServer(s.Config.Component.GRPCServerOptions()...)
130 lis, err := net.Listen("tcp", s.Config.Component.GRPCListenAddress)
131 if err != nil {
132 klog.Exitf("Could not listen: %v", err)
133 }
134 s.ListenGRPC = lis.Addr().String()
135
136 reflection.Register(g)
137 klog.Infof("Internal gRPC listening on %s", s.ListenGRPC)
138 go func() {
139 err := g.Serve(lis)
140 if err != ctx.Err() {
141 klog.Exitf("Internal gRPC serve failed: %v", err)
142 }
143 }()
144}
145
146// Start the BMaaS Server in background goroutines. This should only be called
147// once. The process will exit with debug logs if starting the server failed.
148func (s *Server) Start(ctx context.Context) {
Serge Bazanskic50f6942023-04-24 18:27:22 +0200149 reg := s.Config.Component.PrometheusRegistry()
150 s.Config.BMDB.EnableMetrics(reg)
Serge Bazanskifbda89e2023-04-24 17:43:58 +0200151 s.Config.Component.StartPrometheus(ctx)
152
Serge Bazanski4abeb132022-10-11 11:32:19 +0200153 conn, err := s.Config.BMDB.Open(true)
154 if err != nil {
155 klog.Exitf("Failed to connect to BMDB: %v", err)
156 }
157 s.acsvc = &agentCallbackService{
158 s: s,
159 }
160 s.bmdb = conn
Serge Bazanski42f13462023-04-19 15:00:06 +0200161 s.sessionC = make(chan *bmdb.Session)
162 go s.sessionWorker(ctx)
Serge Bazanski4abeb132022-10-11 11:32:19 +0200163 s.startInternalGRPC(ctx)
164 s.startPublic(ctx)
Serge Bazanski77628312023-02-15 23:33:22 +0100165 go func() {
166 if err := s.Config.Webug.Start(ctx, conn); err != nil && err != ctx.Err() {
167 klog.Exitf("Failed to start webug: %v", err)
168 }
169 }()
Serge Bazanski4abeb132022-10-11 11:32:19 +0200170}