blob: 1427eb9ed1593c6d1ab0b052e6bebad5ba32b6e9 [file] [log] [blame]
Serge Bazanskie6bc2272023-03-28 16:28:13 +02001package curator
2
import (
	"context"
	"errors"
	"net/netip"

	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	"source.monogon.dev/metropolis/node/core/identity"
	"source.monogon.dev/metropolis/node/core/rpc"
	"source.monogon.dev/metropolis/pkg/event"
	"source.monogon.dev/metropolis/pkg/event/etcd"

	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
)
18
Serge Bazanskibc739722023-03-28 20:12:01 +020019// preapreClusternetCacheUnlocked makes sure the leader's clusternetCache exists,
20// and loads it from etcd otherwise.
21func (l *leaderCurator) prepareClusternetCacheUnlocked(ctx context.Context) error {
22 if l.ls.clusternetCache != nil {
23 return nil
24 }
25
26 cache := make(map[string]string)
27
28 // Get all nodes.
29 start, end := nodeEtcdPrefix.KeyRange()
30 value := etcd.NewValue[*nodeAtID](l.etcd, start, nodeValueConverter, etcd.Range(end))
31 w := value.Watch()
32 defer w.Close()
33 for {
34 nodeKV, err := w.Get(ctx, event.BacklogOnly[*nodeAtID]())
35 if err == event.BacklogDone {
36 break
37 }
38 if err != nil {
39 rpc.Trace(ctx).Printf("etcd watch failed (initial fetch): %v", err)
40 return status.Error(codes.Unavailable, "internal error during clusternet cache load")
41 }
42 n := nodeKV.value
43 if n == nil {
44 continue
45 }
46
47 // Ignore nodes without cluster networking.
48 if n.wireguardKey == "" {
49 continue
50 }
51
52 // If we have an inconsistency in the database, just pretend it's not there.
53 //
54 // TODO(q3k): try to recover from this.
55 if id, ok := cache[n.wireguardKey]; ok && id != n.ID() {
56 continue
57 }
58 cache[n.wireguardKey] = n.ID()
59 }
60
61 l.ls.clusternetCache = cache
62 return nil
63}
64
Serge Bazanskie6bc2272023-03-28 16:28:13 +020065func (l *leaderCurator) UpdateNodeClusterNetworking(ctx context.Context, req *ipb.UpdateNodeClusterNetworkingRequest) (*ipb.UpdateNodeClusterNetworkingResponse, error) {
66 // Ensure that the given node_id matches the calling node. We currently
67 // only allow for direct self-reporting of status by nodes.
68 pi := rpc.GetPeerInfo(ctx)
69 if pi == nil || pi.Node == nil {
70 return nil, status.Error(codes.PermissionDenied, "only nodes can update node cluster networking")
71 }
72 id := identity.NodeID(pi.Node.PublicKey)
73
74 if req.Clusternet == nil {
75 return nil, status.Error(codes.InvalidArgument, "clusternet must be set")
76 }
77 cn := req.Clusternet
78 if cn.WireguardPubkey == "" {
79 return nil, status.Error(codes.InvalidArgument, "clusternet.wireguard_pubkey must be set")
80 }
Serge Bazanskibc739722023-03-28 20:12:01 +020081 key, err := wgtypes.ParseKey(cn.WireguardPubkey)
Serge Bazanskie6bc2272023-03-28 16:28:13 +020082 if err != nil {
83 return nil, status.Error(codes.InvalidArgument, "clusternet.wireguard_pubkey must be a valid wireguard public key")
84 }
85
Serge Bazanskibc739722023-03-28 20:12:01 +020086 // Lock everything, as we're doing a complex read/modify/store here.
Serge Bazanskie6bc2272023-03-28 16:28:13 +020087 l.muNodes.Lock()
88 defer l.muNodes.Unlock()
89
Serge Bazanskibc739722023-03-28 20:12:01 +020090 if err := l.prepareClusternetCacheUnlocked(ctx); err != nil {
91 return nil, err
92 }
93
94 if nid, ok := l.ls.clusternetCache[key.String()]; ok && nid != id {
95 return nil, status.Error(codes.InvalidArgument, "public key alread used by another node")
96 }
97
98 // TODO(q3k): unhardcode this and synchronize with Kubernetes code.
Lorenz Brun2f7e0a22023-06-22 16:56:13 +020099 clusterNet := netip.MustParsePrefix("10.192.0.0/11")
Serge Bazanskibc739722023-03-28 20:12:01 +0200100
Serge Bazanskie6bc2272023-03-28 16:28:13 +0200101 // Retrieve node ...
102 node, err := nodeLoad(ctx, l.leadership, id)
103 if err != nil {
104 return nil, err
105 }
106
107 if node.status == nil {
108 return nil, status.Error(codes.FailedPrecondition, "node needs to submit at least one status update")
109 }
110 externalIP := node.status.ExternalAddress
111
112 // Parse/validate given prefixes.
113 var prefixes []netip.Prefix
114 for i, prefix := range cn.Prefixes {
115 // Parse them.
116 p, err := netip.ParsePrefix(prefix.Cidr)
117 if err != nil {
118 return nil, status.Errorf(codes.InvalidArgument, "clusternet.prefixes[%d].cidr invalid: %v", i, err)
119 }
120
121 // Make sure they're in canonical form.
122 masked := p.Masked()
123 if masked.String() != p.String() {
124 return nil, status.Errorf(codes.InvalidArgument, "clusternet.prefixes[%d].cidr (%s) must be in canonical format (ie. all address bits within the subnet must be zero)", i, p.String())
125 }
126
127 // Make sure they're fully contained within clusterNet or are the /32 of a node's
128 // externalIP.
129
130 okay := false
131 if clusterNet.Contains(p.Addr()) && p.Bits() >= clusterNet.Bits() {
132 okay = true
133 }
134 if p.IsSingleIP() && p.Addr().String() == externalIP {
135 okay = true
136 }
137
138 if !okay {
139 return nil, status.Errorf(codes.InvalidArgument, "clusternet.prefixes[%d].cidr (%s) must be fully contained within cluster network (%s) or be the node's external IP (%s)", i, p.String(), clusterNet.String(), externalIP)
140 }
141
142 prefixes = append(prefixes, p)
143
144 }
145
Serge Bazanskibc739722023-03-28 20:12:01 +0200146 // Modify and save node.
147 node.wireguardKey = key.String()
Serge Bazanskie6bc2272023-03-28 16:28:13 +0200148 node.networkPrefixes = prefixes
Serge Bazanskie6bc2272023-03-28 16:28:13 +0200149 if err := nodeSave(ctx, l.leadership, node); err != nil {
150 return nil, err
151 }
152
Serge Bazanskibc739722023-03-28 20:12:01 +0200153 // Now that etcd is saved, also modify our cache.
154 l.ls.clusternetCache[key.String()] = id
155
Serge Bazanskie6bc2272023-03-28 16:28:13 +0200156 return &ipb.UpdateNodeClusterNetworkingResponse{}, nil
157}