Scott Baker | 82b2b08 | 2018-04-16 16:02:14 -0700 | [diff] [blame] | 1 | # Copyright 2017-present Open Networking Foundation |
| 2 | # |
| 3 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | # you may not use this file except in compliance with the License. |
| 5 | # You may obtain a copy of the License at |
| 6 | # |
| 7 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | # |
| 9 | # Unless required by applicable law or agreed to in writing, software |
| 10 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | # See the License for the specific language governing permissions and |
| 13 | # limitations under the License. |
| 14 | |
| 15 | """ |
| 16 | pull_pods.py |
| 17 | |
| 18 | Implements a syncstep to pull information about pods from Kubernetes. |
| 19 | """ |
| 20 | |
Scott Baker | 987748d | 2018-08-09 16:14:11 -0700 | [diff] [blame] | 21 | import json |
| 22 | |
Scott Baker | a30fae7 | 2019-02-01 16:14:43 -0800 | [diff] [blame] | 23 | from xossynchronizer.pull_steps.pullstep import PullStep |
| 24 | from xossynchronizer.modelaccessor import KubernetesServiceInstance, KubernetesService, Slice, Principal, \ |
Scott Baker | 3fd18e5 | 2018-04-17 09:18:21 -0700 | [diff] [blame] | 25 | TrustDomain, Site, Image |
Scott Baker | 82b2b08 | 2018-04-16 16:02:14 -0700 | [diff] [blame] | 26 | |
| 27 | from xosconfig import Config |
| 28 | from multistructlog import create_logger |
Zack Williams | 3dc9760 | 2018-09-13 22:33:26 -0700 | [diff] [blame] | 29 | from xoskafka import XOSKafkaProducer |
Scott Baker | 8b725f5 | 2019-06-11 16:14:39 -0700 | [diff] [blame] | 30 | from helpers import debug_once |
Scott Baker | 82b2b08 | 2018-04-16 16:02:14 -0700 | [diff] [blame] | 31 | |
Scott Baker | 82b2b08 | 2018-04-16 16:02:14 -0700 | [diff] [blame] | 32 | log = create_logger(Config().get('logging')) |
| 33 | |
Zack Williams | 3dc9760 | 2018-09-13 22:33:26 -0700 | [diff] [blame] | 34 | |
class KubernetesServiceInstancePullStep(PullStep):
    """
    KubernetesServiceInstancePullStep

    Pull pod-related information from Kubernetes. Each pod we find is used to create a KubernetesServiceInstance
    if one does not already exist. Additional support objects (Slices, TrustDomains, Principals, Images) may be
    created as necessary to fill the required dependencies of the KubernetesServiceInstance.

    A Kafka notification is produced whenever a pod is created, updated (its IP address changed) or deleted.
    """

    def __init__(self, *args, **kwargs):
        """ Register KubernetesServiceInstance as the observed model, then create the Kubernetes API clients. """
        super(KubernetesServiceInstancePullStep, self).__init__(
            *args, observed_model=KubernetesServiceInstance, **kwargs)

        self.init_kubernetes_client()

    def init_kubernetes_client(self):
        """ Load the in-cluster Kubernetes configuration and create the API clients used by this step.

            Sets self.v1core (CoreV1Api), self.v1apps (AppsV1Api) and self.v1batch (BatchV1Api).
        """
        # The kubernetes package is imported here, at call time, rather than at module import time.
        from kubernetes import client as kubernetes_client, config as kubernetes_config
        kubernetes_config.load_incluster_config()
        self.v1core = kubernetes_client.CoreV1Api()
        self.v1apps = kubernetes_client.AppsV1Api()
        self.v1batch = kubernetes_client.BatchV1Api()

    def obj_to_handle(self, obj):
        """ Convert a Kubernetes resource into a handle that we can use to uniquely identify the object within
            Kubernetes.

            Returns obj.metadata.self_link.
        """
        # NOTE(review): self_link is deprecated in newer Kubernetes releases -- confirm the target cluster
        # still populates it, otherwise every handle will be empty.
        return obj.metadata.self_link

    def read_obj_kind(self, kind, name, trust_domain):
        """ Given an object kind and name, read it from Kubernetes.

            The namespace is taken from trust_domain.name. Returns the resource, or None if `kind` is not one of
            ReplicaSet, StatefulSet, DaemonSet, Deployment or Job.
        """
        namespace = trust_domain.name

        if kind == "ReplicaSet":
            return self.v1apps.read_namespaced_replica_set(name, namespace)
        if kind == "StatefulSet":
            return self.v1apps.read_namespaced_stateful_set(name, namespace)
        if kind == "DaemonSet":
            return self.v1apps.read_namespaced_daemon_set(name, namespace)
        if kind == "Deployment":
            return self.v1apps.read_namespaced_deployment(name, namespace)
        if kind == "Job":
            return self.v1batch.read_namespaced_job(name, namespace)

        # A kind that we do not know how to read.
        return None

    def get_controller_from_obj(self, pod_name, obj, trust_domain, depth=0):
        """ Given an object, search for its controller. Strategy is to walk backward until we find some object that
            is marked as a controller, but does not have any owners.

            This seems adequate to cover the case where ReplicaSet is owned by a Deployment, and we want to skip over
            the ReplicaSet and return the Deployment.

            pod_name is only used in debug messages. depth is the number of ownership links followed so far;
            callers outside of this method should leave it at 0.

            Returns the controller resource, or None if no controller could be determined.
        """

        owner_references = obj.metadata.owner_references
        if not owner_references:
            if depth == 0:
                # If depth is zero, then we're still looking at the object, not a controller.
                return None
            return obj

        for owner_reference in owner_references:
            # Only follow owners that are flagged as the controller of this object.
            if not getattr(owner_reference, "controller", False):
                continue
            owner = self.read_obj_kind(owner_reference.kind, owner_reference.name, trust_domain)
            if not owner:
                # Failed to fetch the owner, probably because the owner's kind is something we do not understand. An
                # example is the etcd-cluster pod, which is owned by a deployment of kind "EtcdCluster".
                debug_once("Pod %s: Failed to fetch owner" % pod_name, owner_reference=owner_reference)
                continue
            controller = self.get_controller_from_obj(pod_name, owner, trust_domain, depth + 1)
            if controller:
                return controller

        return None

    def get_slice_from_pod(self, pod_name, pod, trust_domain, principal):
        """ Given a pod, determine which XOS Slice goes with it.
            If the Slice doesn't exist, create it.

            The slice is named after the pod's controller, unless the controller carries an "xos_slice_name"
            label, in which case the label value is used. Returns None if the pod has no controller that we
            can resolve.
        """
        controller = self.get_controller_from_obj(pod_name, pod, trust_domain)
        if not controller:
            return None

        slice_name = controller.metadata.name
        labels = getattr(controller.metadata, "labels", None)
        if labels and "xos_slice_name" in labels:
            # Someone has labeled the controller with an xos slice name. Use it.
            slice_name = labels["xos_slice_name"]

        existing_slices = Slice.objects.filter(name=slice_name)
        if existing_slices:
            return existing_slices[0]

        # TODO(smbaker): atomicity
        new_slice = Slice(name=slice_name,
                          site=Site.objects.first(),
                          trust_domain=trust_domain,
                          principal=principal,
                          backend_handle=self.obj_to_handle(controller),
                          controller_kind=controller.kind,
                          xos_managed=False)
        new_slice.save()
        return new_slice

    def get_trustdomain_from_pod(self, pod, owner_service):
        """ Given a pod, determine which XOS TrustDomain goes with it.
            If the TrustDomain doesn't exist, create it.

            The TrustDomain is named after the pod's namespace and is owned by owner_service.
        """
        namespace = pod.metadata.namespace

        existing_trustdomains = TrustDomain.objects.filter(name=namespace)
        if existing_trustdomains:
            return existing_trustdomains[0]

        # Read the namespace from Kubernetes so that we can record its handle.
        k8s_trust_domain = self.v1core.read_namespace(namespace)

        # TODO(smbaker): atomicity
        trust_domain = TrustDomain(name=namespace,
                                   xos_managed=False,
                                   owner=owner_service,
                                   backend_handle=self.obj_to_handle(k8s_trust_domain))
        trust_domain.save()
        return trust_domain

    def get_principal_from_pod(self, pod, trust_domain):
        """ Given a pod, determine which XOS Principal goes with it.
            If the Principal doesn't exist, create it.

            The Principal is named after the pod's service account. Returns None if the pod has no service
            account.
        """
        principal_name = getattr(pod.spec, "service_account", None)
        if not principal_name:
            return None

        # NOTE(review): the lookup is by name only, so service accounts with the same name in different
        # namespaces resolve to the same Principal -- confirm that this is intended.
        existing_principals = Principal.objects.filter(name=principal_name)
        if existing_principals:
            return existing_principals[0]

        # Read the service account from Kubernetes so that we can record its handle.
        k8s_service_account = self.v1core.read_namespaced_service_account(principal_name, trust_domain.name)

        # TODO(smbaker): atomicity
        principal = Principal(name=principal_name,
                              trust_domain=trust_domain,
                              xos_managed=False,
                              backend_handle=self.obj_to_handle(k8s_service_account))
        principal.save()
        return principal

    @staticmethod
    def _split_image_reference(image_ref):
        """ Split a container image reference into a (name, tag) tuple.

            The tag is whatever follows the last ":", provided that it does not contain a "/". A ":" that is
            followed by a "/" belongs to a registry host:port (for example "registry:5000/foo") and is not a tag
            separator. References without a tag are given the tag "master".
        """
        (name, separator, tag) = image_ref.rpartition(":")
        if not separator or "/" in tag:
            # Is assuming a default necessary?
            return (image_ref, "master")
        # NOTE(review): digest references ("name@sha256:...") are still split at the last ":" -- confirm
        # whether they need dedicated handling.
        return (name, tag)

    def get_image_from_pod(self, pod):
        """ Given a pod, determine which XOS Image goes with it.
            If the Image doesn't exist, create it.

            Returns None if the pod has no containers.
        """
        containers = pod.spec.containers
        if not containers:
            return None

        # TODO(smbaker): Assumes all containers in a pod use the same image. Valid assumption for now?
        (name, tag) = self._split_image_reference(containers[0].image)

        # FIXME image.name is unique, but tag may differ. Update validation in the Image model so that the
        # combination of name and tag is unique
        existing_images = Image.objects.filter(name=name, tag=tag, kind="container")
        if existing_images:
            return existing_images[0]

        image = Image(name=name, tag=tag, kind="container", xos_managed=False)
        image.save()
        return image

    def send_notification(self, xos_pod, k8s_pod, status):
        """ Produce a Kafka event on topic "xos.kubernetes.pod-details" that describes a pod.

            xos_pod is the KubernetesServiceInstance, k8s_pod is the Kubernetes pod (may be None, for example
            when the pod has been deleted) and status is the event kind, such as "created", "updated" or
            "deleted".
        """
        event = {"status": status,
                 "name": xos_pod.name,
                 "producer": "k8s-sync"}

        if xos_pod.id:
            event["kubernetesserviceinstance_id"] = xos_pod.id

        if k8s_pod:
            event["labels"] = k8s_pod.metadata.labels

            if k8s_pod.status.pod_ip:
                event["netinterfaces"] = [{"name": "primary",
                                           "addresses": [k8s_pod.status.pod_ip]}]

        topic = "xos.kubernetes.pod-details"
        key = xos_pod.name
        # Anything that json does not know how to serialize is sent as its repr().
        value = json.dumps(event, default=repr)

        XOSKafkaProducer.produce(topic, key, value)

    @staticmethod
    def _next_event_kind(last_event_sent):
        """ Return the kind of event to send for a pod, given the last event kind that was sent for it.

            The first event for a pod is "created"; once the pod has been announced, every further event is
            "updated".
        """
        if last_event_sent in ("created", "updated"):
            return "updated"
        return "created"

    def _create_xos_pod(self, name, pod, kubernetes_service):
        """ Create and save the KubernetesServiceInstance for a Kubernetes pod, along with the TrustDomain,
            Principal, Slice and Image that it depends on.

            Returns the new KubernetesServiceInstance, or None if the pod should be ignored.
        """
        trust_domain = self.get_trustdomain_from_pod(pod, owner_service=kubernetes_service)
        if not trust_domain:
            # All kubernetes pods should belong to a namespace. If we can't find the namespace, then
            # something is very wrong in K8s.
            log.warning("Unable to determine trust_domain for pod %s. Ignoring." % name)
            return None

        principal = self.get_principal_from_pod(pod, trust_domain)
        xos_slice = self.get_slice_from_pod(name, pod, trust_domain=trust_domain, principal=principal)
        image = self.get_image_from_pod(pod)

        if not xos_slice:
            # We could get here if the pod doesn't have a controller, or if the controller is of a kind
            # that we don't understand (such as the Etcd controller). If so, the pod is not something we
            # are interested in.
            debug_once("Pod %s: Unable to determine slice. Ignoring." % name)
            return None

        xos_pod = KubernetesServiceInstance(name=name,
                                            pod_ip=pod.status.pod_ip,
                                            owner=kubernetes_service,
                                            slice=xos_slice,
                                            image=image,
                                            backend_handle=self.obj_to_handle(pod),
                                            xos_managed=False,
                                            need_event=True)
        xos_pod.save()
        return xos_pod

    def _sync_pod_state(self, xos_pod, pod):
        """ Bring an existing KubernetesServiceInstance up to date with its Kubernetes pod, and send any Kafka
            event that is still pending for it.
        """
        # Check to see if the ip address has changed. This can happen for pods that are managed by XOS. The IP
        # isn't available immediately when XOS creates a pod, but shows up a bit later. So handle that case
        # here.
        if (pod.status.pod_ip is not None) and (xos_pod.pod_ip != pod.status.pod_ip):
            xos_pod.pod_ip = pod.status.pod_ip
            xos_pod.need_event = True  # Trigger a new kafka event
            xos_pod.save(update_fields=["pod_ip", "need_event"])
            log.info("Updated XOS POD %s" % xos_pod.name)

        # Check to see if we haven't sent the Kafka event yet. It's possible Kafka could be down. If
        # so, then we'll try to send the event again later.
        if xos_pod.need_event:
            event_kind = self._next_event_kind(xos_pod.last_event_sent)

            self.send_notification(xos_pod, pod, event_kind)

            # Only reached if the event was produced without raising.
            xos_pod.need_event = False
            xos_pod.last_event_sent = event_kind
            xos_pod.save(update_fields=["need_event", "last_event_sent"])

    def pull_records(self):
        """ Reconcile the pods known to XOS with the pods that exist in Kubernetes.

            Pods that exist in Kubernetes but not in XOS are created in XOS, pods whose IP address changed are
            updated, and pods that are not managed by XOS and no longer exist in Kubernetes are deleted from
            XOS. A failure on one pod is logged and does not prevent the remaining pods from being processed.

            Raises Exception if there is not exactly one KubernetesService.
        """
        # Read all pods from Kubernetes, store them in k8s_pods_by_name
        # NOTE(review): pods are keyed by name only while every namespace is listed, so pods with the same
        # name in different namespaces shadow each other -- confirm that pod names are unique in practice.
        k8s_pods_by_name = {}
        ret = self.v1core.list_pod_for_all_namespaces(watch=False)
        for item in ret.items:
            k8s_pods_by_name[item.metadata.name] = item

        # Read all pods from XOS, store them in xos_pods_by_name
        xos_pods_by_name = {}
        for existing_pod in KubernetesServiceInstance.objects.all():
            xos_pods_by_name[existing_pod.name] = existing_pod

        kubernetes_services = KubernetesService.objects.all()
        if len(kubernetes_services) == 0:
            raise Exception("There are no Kubernetes Services yet")
        if len(kubernetes_services) > 1:
            # Simplifying assumption -- there is only one Kubernetes Service
            raise Exception("There are too many Kubernetes Services")
        kubernetes_service = kubernetes_services[0]

        # For each k8s pod, see if there is an xos pod. If there is not, then create the xos pod.
        for (k, pod) in k8s_pods_by_name.items():
            try:
                if k not in xos_pods_by_name:
                    xos_pod = self._create_xos_pod(k, pod, kubernetes_service)
                    if xos_pod is None:
                        continue
                    xos_pods_by_name[k] = xos_pod
                    log.info("Created XOS POD %s" % xos_pod.name)

                self._sync_pod_state(xos_pods_by_name[k], pod)
            except Exception:
                # Best effort: log the failure and carry on with the next pod. KeyboardInterrupt and SystemExit
                # are deliberately not caught, so the process can still be stopped.
                log.exception("Failed to process k8s pod", k=k, pod=pod)

        # For each xos pod, see if there is no k8s pod. If that's the case, then the pod must have been deleted.
        for (k, xos_pod) in xos_pods_by_name.items():
            try:
                if k in k8s_pods_by_name:
                    continue
                if xos_pod.xos_managed:
                    # Should we do something so it gets re-created by the syncstep?
                    continue
                self.send_notification(xos_pod, None, "deleted")
                xos_pod.delete()
                log.info("Deleted XOS POD %s" % k)
            except Exception:
                log.exception("Failed to process xos pod", k=k, xos_pod=xos_pod)