| Serge Bazanski | e6bc227 | 2023-03-28 16:28:13 +0200 | [diff] [blame] | 1 | package curator |
| 2 | |
| 3 | import ( |
| 4 | "context" |
| 5 | "net/netip" |
| 6 | |
| 7 | "golang.zx2c4.com/wireguard/wgctrl/wgtypes" |
| 8 | "google.golang.org/grpc/codes" |
| 9 | "google.golang.org/grpc/status" |
| 10 | |
| 11 | "source.monogon.dev/metropolis/node/core/identity" |
| 12 | "source.monogon.dev/metropolis/node/core/rpc" |
| Serge Bazanski | bc73972 | 2023-03-28 20:12:01 +0200 | [diff] [blame] | 13 | "source.monogon.dev/metropolis/pkg/event" |
| 14 | "source.monogon.dev/metropolis/pkg/event/etcd" |
| Serge Bazanski | e6bc227 | 2023-03-28 16:28:13 +0200 | [diff] [blame] | 15 | |
| 16 | ipb "source.monogon.dev/metropolis/node/core/curator/proto/api" |
| 17 | ) |
| 18 | |
| Serge Bazanski | bc73972 | 2023-03-28 20:12:01 +0200 | [diff] [blame] | 19 | // preapreClusternetCacheUnlocked makes sure the leader's clusternetCache exists, |
| 20 | // and loads it from etcd otherwise. |
| 21 | func (l *leaderCurator) prepareClusternetCacheUnlocked(ctx context.Context) error { |
| 22 | if l.ls.clusternetCache != nil { |
| 23 | return nil |
| 24 | } |
| 25 | |
| 26 | cache := make(map[string]string) |
| 27 | |
| 28 | // Get all nodes. |
| 29 | start, end := nodeEtcdPrefix.KeyRange() |
| 30 | value := etcd.NewValue[*nodeAtID](l.etcd, start, nodeValueConverter, etcd.Range(end)) |
| 31 | w := value.Watch() |
| 32 | defer w.Close() |
| 33 | for { |
| 34 | nodeKV, err := w.Get(ctx, event.BacklogOnly[*nodeAtID]()) |
| 35 | if err == event.BacklogDone { |
| 36 | break |
| 37 | } |
| 38 | if err != nil { |
| 39 | rpc.Trace(ctx).Printf("etcd watch failed (initial fetch): %v", err) |
| 40 | return status.Error(codes.Unavailable, "internal error during clusternet cache load") |
| 41 | } |
| 42 | n := nodeKV.value |
| 43 | if n == nil { |
| 44 | continue |
| 45 | } |
| 46 | |
| 47 | // Ignore nodes without cluster networking. |
| 48 | if n.wireguardKey == "" { |
| 49 | continue |
| 50 | } |
| 51 | |
| 52 | // If we have an inconsistency in the database, just pretend it's not there. |
| 53 | // |
| 54 | // TODO(q3k): try to recover from this. |
| 55 | if id, ok := cache[n.wireguardKey]; ok && id != n.ID() { |
| 56 | continue |
| 57 | } |
| 58 | cache[n.wireguardKey] = n.ID() |
| 59 | } |
| 60 | |
| 61 | l.ls.clusternetCache = cache |
| 62 | return nil |
| 63 | } |
| 64 | |
| Serge Bazanski | e6bc227 | 2023-03-28 16:28:13 +0200 | [diff] [blame] | 65 | func (l *leaderCurator) UpdateNodeClusterNetworking(ctx context.Context, req *ipb.UpdateNodeClusterNetworkingRequest) (*ipb.UpdateNodeClusterNetworkingResponse, error) { |
| 66 | // Ensure that the given node_id matches the calling node. We currently |
| 67 | // only allow for direct self-reporting of status by nodes. |
| 68 | pi := rpc.GetPeerInfo(ctx) |
| 69 | if pi == nil || pi.Node == nil { |
| 70 | return nil, status.Error(codes.PermissionDenied, "only nodes can update node cluster networking") |
| 71 | } |
| 72 | id := identity.NodeID(pi.Node.PublicKey) |
| 73 | |
| 74 | if req.Clusternet == nil { |
| 75 | return nil, status.Error(codes.InvalidArgument, "clusternet must be set") |
| 76 | } |
| 77 | cn := req.Clusternet |
| 78 | if cn.WireguardPubkey == "" { |
| 79 | return nil, status.Error(codes.InvalidArgument, "clusternet.wireguard_pubkey must be set") |
| 80 | } |
| Serge Bazanski | bc73972 | 2023-03-28 20:12:01 +0200 | [diff] [blame] | 81 | key, err := wgtypes.ParseKey(cn.WireguardPubkey) |
| Serge Bazanski | e6bc227 | 2023-03-28 16:28:13 +0200 | [diff] [blame] | 82 | if err != nil { |
| 83 | return nil, status.Error(codes.InvalidArgument, "clusternet.wireguard_pubkey must be a valid wireguard public key") |
| 84 | } |
| 85 | |
| Serge Bazanski | bc73972 | 2023-03-28 20:12:01 +0200 | [diff] [blame] | 86 | // Lock everything, as we're doing a complex read/modify/store here. |
| Serge Bazanski | e6bc227 | 2023-03-28 16:28:13 +0200 | [diff] [blame] | 87 | l.muNodes.Lock() |
| 88 | defer l.muNodes.Unlock() |
| 89 | |
| Serge Bazanski | bc73972 | 2023-03-28 20:12:01 +0200 | [diff] [blame] | 90 | if err := l.prepareClusternetCacheUnlocked(ctx); err != nil { |
| 91 | return nil, err |
| 92 | } |
| 93 | |
| 94 | if nid, ok := l.ls.clusternetCache[key.String()]; ok && nid != id { |
| 95 | return nil, status.Error(codes.InvalidArgument, "public key alread used by another node") |
| 96 | } |
| 97 | |
| 98 | // TODO(q3k): unhardcode this and synchronize with Kubernetes code. |
| Lorenz Brun | 2f7e0a2 | 2023-06-22 16:56:13 +0200 | [diff] [blame] | 99 | clusterNet := netip.MustParsePrefix("10.192.0.0/11") |
| Serge Bazanski | bc73972 | 2023-03-28 20:12:01 +0200 | [diff] [blame] | 100 | |
| Serge Bazanski | e6bc227 | 2023-03-28 16:28:13 +0200 | [diff] [blame] | 101 | // Retrieve node ... |
| 102 | node, err := nodeLoad(ctx, l.leadership, id) |
| 103 | if err != nil { |
| 104 | return nil, err |
| 105 | } |
| 106 | |
| 107 | if node.status == nil { |
| 108 | return nil, status.Error(codes.FailedPrecondition, "node needs to submit at least one status update") |
| 109 | } |
| 110 | externalIP := node.status.ExternalAddress |
| 111 | |
| 112 | // Parse/validate given prefixes. |
| 113 | var prefixes []netip.Prefix |
| 114 | for i, prefix := range cn.Prefixes { |
| 115 | // Parse them. |
| 116 | p, err := netip.ParsePrefix(prefix.Cidr) |
| 117 | if err != nil { |
| 118 | return nil, status.Errorf(codes.InvalidArgument, "clusternet.prefixes[%d].cidr invalid: %v", i, err) |
| 119 | } |
| 120 | |
| 121 | // Make sure they're in canonical form. |
| 122 | masked := p.Masked() |
| 123 | if masked.String() != p.String() { |
| 124 | return nil, status.Errorf(codes.InvalidArgument, "clusternet.prefixes[%d].cidr (%s) must be in canonical format (ie. all address bits within the subnet must be zero)", i, p.String()) |
| 125 | } |
| 126 | |
| 127 | // Make sure they're fully contained within clusterNet or are the /32 of a node's |
| 128 | // externalIP. |
| 129 | |
| 130 | okay := false |
| 131 | if clusterNet.Contains(p.Addr()) && p.Bits() >= clusterNet.Bits() { |
| 132 | okay = true |
| 133 | } |
| 134 | if p.IsSingleIP() && p.Addr().String() == externalIP { |
| 135 | okay = true |
| 136 | } |
| 137 | |
| 138 | if !okay { |
| 139 | return nil, status.Errorf(codes.InvalidArgument, "clusternet.prefixes[%d].cidr (%s) must be fully contained within cluster network (%s) or be the node's external IP (%s)", i, p.String(), clusterNet.String(), externalIP) |
| 140 | } |
| 141 | |
| 142 | prefixes = append(prefixes, p) |
| 143 | |
| 144 | } |
| 145 | |
| Serge Bazanski | bc73972 | 2023-03-28 20:12:01 +0200 | [diff] [blame] | 146 | // Modify and save node. |
| 147 | node.wireguardKey = key.String() |
| Serge Bazanski | e6bc227 | 2023-03-28 16:28:13 +0200 | [diff] [blame] | 148 | node.networkPrefixes = prefixes |
| Serge Bazanski | e6bc227 | 2023-03-28 16:28:13 +0200 | [diff] [blame] | 149 | if err := nodeSave(ctx, l.leadership, node); err != nil { |
| 150 | return nil, err |
| 151 | } |
| 152 | |
| Serge Bazanski | bc73972 | 2023-03-28 20:12:01 +0200 | [diff] [blame] | 153 | // Now that etcd is saved, also modify our cache. |
| 154 | l.ls.clusternetCache[key.String()] = id |
| 155 | |
| Serge Bazanski | e6bc227 | 2023-03-28 16:28:13 +0200 | [diff] [blame] | 156 | return &ipb.UpdateNodeClusterNetworkingResponse{}, nil |
| 157 | } |