Scott Baker | 82b2b08 | 2018-04-16 16:02:14 -0700 | [diff] [blame] | 1 | # Copyright 2017-present Open Networking Foundation |
| 2 | # |
| 3 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | # you may not use this file except in compliance with the License. |
| 5 | # You may obtain a copy of the License at |
| 6 | # |
| 7 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | # |
| 9 | # Unless required by applicable law or agreed to in writing, software |
| 10 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | # See the License for the specific language governing permissions and |
| 13 | # limitations under the License. |
| 14 | |
| 15 | """ |
| 16 | pull_pods.py |
| 17 | |
| 18 | Implements a syncstep to pull information about pods from Kubernetes. |
| 19 | """ |
| 20 | |
Scott Baker | 987748d | 2018-08-09 16:14:11 -0700 | [diff] [blame] | 21 | import json |
| 22 | |
Scott Baker | 82b2b08 | 2018-04-16 16:02:14 -0700 | [diff] [blame] | 23 | from synchronizers.new_base.pullstep import PullStep |
Scott Baker | 3fd18e5 | 2018-04-17 09:18:21 -0700 | [diff] [blame] | 24 | from synchronizers.new_base.modelaccessor import KubernetesServiceInstance, KubernetesService, Slice, Principal, \ |
| 25 | TrustDomain, Site, Image |
Scott Baker | 82b2b08 | 2018-04-16 16:02:14 -0700 | [diff] [blame] | 26 | |
| 27 | from xosconfig import Config |
| 28 | from multistructlog import create_logger |
Zack Williams | 3dc9760 | 2018-09-13 22:33:26 -0700 | [diff] [blame] | 29 | from xoskafka import XOSKafkaProducer |
Scott Baker | 82b2b08 | 2018-04-16 16:02:14 -0700 | [diff] [blame] | 30 | |
Scott Baker | 82b2b08 | 2018-04-16 16:02:14 -0700 | [diff] [blame] | 31 | log = create_logger(Config().get('logging')) |
| 32 | |
Zack Williams | 3dc9760 | 2018-09-13 22:33:26 -0700 | [diff] [blame] | 33 | |
class KubernetesServiceInstancePullStep(PullStep):
    """
    KubernetesServiceInstancePullStep

    Pull pod-related information from Kubernetes. Each pod we find is used to create a KubernetesServiceInstance
    if one does not already exist. Additional support objects (Slices, TrustDomains, Principals, Images) may be
    created as necessary to fill the required dependencies of the KubernetesServiceInstance.

    Whenever a KubernetesServiceInstance is created or its pod IP changes, an event is published to Kafka.
    XOS pods that no longer exist in Kubernetes and are not xos_managed are deleted after a "deleted" event has
    been published.
    """

    # Tag recorded for a container image whose reference carries no explicit tag.
    DEFAULT_IMAGE_TAG = "master"

    # Kafka topic that pod events are published to.
    POD_DETAILS_TOPIC = "xos.kubernetes.pod-details"

    def __init__(self):
        super(KubernetesServiceInstancePullStep, self).__init__(observed_model=KubernetesServiceInstance)

        self.init_kubernetes_client()

    def init_kubernetes_client(self):
        """ Load the in-cluster Kubernetes configuration and create the API clients used by this step. """
        # The kubernetes package is imported here, not at module level, exactly as in the original code.
        from kubernetes import client as kubernetes_client, config as kubernetes_config
        kubernetes_config.load_incluster_config()
        self.v1core = kubernetes_client.CoreV1Api()
        self.v1apps = kubernetes_client.AppsV1Api()
        self.v1batch = kubernetes_client.BatchV1Api()

    def obj_to_handle(self, obj):
        """ Convert a Kubernetes resource into a handle that we can use to uniquely identify the object within
            Kubernetes.

            Returns the resource's metadata.self_link.
        """
        return obj.metadata.self_link

    def read_obj_kind(self, kind, name, trust_domain):
        """ Given an object kind and name, read it from Kubernetes.

            The namespace that is queried is trust_domain.name.

            Returns the Kubernetes resource, or None if `kind` is not one of ReplicaSet, StatefulSet, DaemonSet,
            Deployment or Job. Errors raised by the Kubernetes client are not caught here.
        """
        namespace = trust_domain.name
        if kind == "ReplicaSet":
            return self.v1apps.read_namespaced_replica_set(name, namespace)
        if kind == "StatefulSet":
            return self.v1apps.read_namespaced_stateful_set(name, namespace)
        if kind == "DaemonSet":
            return self.v1apps.read_namespaced_daemon_set(name, namespace)
        if kind == "Deployment":
            return self.v1apps.read_namespaced_deployment(name, namespace)
        if kind == "Job":
            return self.v1batch.read_namespaced_job(name, namespace)
        return None

    def get_controller_from_obj(self, obj, trust_domain, depth=0):
        """ Given an object, search for its controller. Strategy is to walk backward until we find some object that
            is marked as a controller, but does not have any owners.

            This seems adequate to cover the case where ReplicaSet is owned by a Deployment, and we want to skip over
            the ReplicaSet and return the Deployment.

            `depth` is the number of owner hops already taken; callers outside this method leave it at 0.

            Returns the top-most controller resource, or None if the object has no owners or none of its
            controlling owners could be resolved.
        """
        owner_references = obj.metadata.owner_references
        if not owner_references:
            if depth == 0:
                # If depth is zero, then we're still looking at the object, not a controller.
                return None
            return obj

        for owner_reference in owner_references:
            # Only follow owners that are flagged as the managing controller.
            if not getattr(owner_reference, "controller", False):
                continue
            owner = self.read_obj_kind(owner_reference.kind, owner_reference.name, trust_domain)
            if not owner:
                log.warning("failed to fetch owner", owner_reference=owner_reference)
                continue
            controller = self.get_controller_from_obj(owner, trust_domain, depth + 1)
            if controller:
                return controller

        return None

    def get_slice_from_pod(self, pod, trust_domain, principal):
        """ Given a pod, determine which XOS Slice goes with it.
            If the Slice doesn't exist, create it.

            The slice is named after the pod's controller, unless the controller carries an "xos_slice_name"
            label, in which case the label value is used.

            Returns the Slice, or None if no controller could be found for the pod.
        """
        controller = self.get_controller_from_obj(pod, trust_domain)
        if not controller:
            return None

        slice_name = controller.metadata.name
        labels = getattr(controller.metadata, "labels", None)
        if labels is not None and "xos_slice_name" in labels:
            # Someone has labeled the controller with an xos slice name. Use it.
            slice_name = labels["xos_slice_name"]

        existing_slices = Slice.objects.filter(name=slice_name)
        if existing_slices:
            return existing_slices[0]

        # TODO(smbaker): atomicity
        new_slice = Slice(name=slice_name,
                          site=Site.objects.first(),
                          trust_domain=trust_domain,
                          principal=principal,
                          backend_handle=self.obj_to_handle(controller),
                          controller_kind=controller.kind,
                          xos_managed=False)
        new_slice.save()
        return new_slice

    def get_trustdomain_from_pod(self, pod, owner_service):
        """ Given a pod, determine which XOS TrustDomain goes with it.
            If the TrustDomain doesn't exist, create it.

            The TrustDomain is named after the pod's namespace; a new one is owned by `owner_service`.
        """
        namespace = pod.metadata.namespace
        existing_trustdomains = TrustDomain.objects.filter(name=namespace)
        if existing_trustdomains:
            return existing_trustdomains[0]

        k8s_trust_domain = self.v1core.read_namespace(namespace)

        # TODO(smbaker): atomicity
        trust_domain = TrustDomain(name=namespace,
                                   xos_managed=False,
                                   owner=owner_service,
                                   backend_handle=self.obj_to_handle(k8s_trust_domain))
        trust_domain.save()
        return trust_domain

    def get_principal_from_pod(self, pod, trust_domain):
        """ Given a pod, determine which XOS Principal goes with it.
            If the Principal doesn't exist, create it.

            The Principal is named after the pod's service account.

            Returns the Principal, or None if the pod spec has no service account.
        """
        principal_name = getattr(pod.spec, "service_account", None)
        if not principal_name:
            return None

        # NOTE(review): the lookup is by name only, so a Principal of the same name that belongs to a different
        # trust domain would be reused here -- confirm whether that is intended.
        existing_principals = Principal.objects.filter(name=principal_name)
        if existing_principals:
            return existing_principals[0]

        k8s_service_account = self.v1core.read_namespaced_service_account(principal_name, trust_domain.name)

        # TODO(smbaker): atomicity
        principal = Principal(name=principal_name,
                              trust_domain=trust_domain,
                              xos_managed=False,
                              backend_handle=self.obj_to_handle(k8s_service_account))
        principal.save()
        return principal

    @classmethod
    def _split_image_reference(cls, image_reference):
        """ Split a container image reference into a (name, tag) tuple.

            Only a ":" that appears after the last "/" separates a tag. A ":" that is followed by a "/" belongs
            to a registry host:port (e.g. "registry:5000/foo") and is left in the name. When there is no tag,
            DEFAULT_IMAGE_TAG is returned as the tag.
        """
        name, separator, tag = image_reference.rpartition(":")
        if not separator or "/" in tag:
            # Is assuming a default necessary?
            return (image_reference, cls.DEFAULT_IMAGE_TAG)
        return (name, tag)

    def get_image_from_pod(self, pod):
        """ Given a pod, determine which XOS Image goes with it.
            If the Image doesn't exist, create it.

            Returns the Image, or None if the pod spec lists no containers.
        """
        containers = pod.spec.containers
        if not containers:
            return None

        # TODO(smbaker): Assumes all containers in a pod use the same image. Valid assumption for now?
        (name, tag) = self._split_image_reference(containers[0].image)

        existing_images = Image.objects.filter(name=name, tag=tag, kind="container")
        if existing_images:
            return existing_images[0]

        image = Image(name=name, tag=tag, kind="container", xos_managed=False)
        image.save()
        return image

    def send_notification(self, xos_pod, k8s_pod, status):
        """ Publish an event describing `xos_pod` to the Kafka topic POD_DETAILS_TOPIC.

            xos_pod: the KubernetesServiceInstance the event is about; its name is used as the message key.
            k8s_pod: the matching Kubernetes pod, or None (e.g. when the pod has been deleted). When present,
                     its labels and, if set, its pod IP are included in the event.
            status:  value placed in the event's "status" field.
        """
        event = {"status": status,
                 "name": xos_pod.name,
                 "producer": "k8s-sync"}

        if xos_pod.id:
            event["kubernetesserviceinstance_id"] = xos_pod.id

        if k8s_pod:
            event["labels"] = k8s_pod.metadata.labels

            if k8s_pod.status.pod_ip:
                event["netinterfaces"] = [{"name": "primary",
                                           "addresses": [k8s_pod.status.pod_ip]}]

        key = xos_pod.name
        # Anything json cannot serialize natively is emitted as its repr().
        value = json.dumps(event, default=repr)

        XOSKafkaProducer.produce(self.POD_DETAILS_TOPIC, key, value)

    def _get_kubernetes_service(self):
        """ Return the one and only KubernetesService; raise Exception if there are none or more than one. """
        kubernetes_services = KubernetesService.objects.all()
        if len(kubernetes_services) == 0:
            raise Exception("There are no Kubernetes Services yet")
        if len(kubernetes_services) > 1:
            # Simplifying assumption -- there is only one Kubernetes Service
            raise Exception("There are too many Kubernetes Services")
        return kubernetes_services[0]

    def _create_xos_pod(self, name, pod, kubernetes_service):
        """ Create and save a KubernetesServiceInstance for a Kubernetes pod that XOS does not know about yet.

            Returns the new object, or None (after logging a warning) if no trust domain or slice could be
            determined for the pod.
        """
        trust_domain = self.get_trustdomain_from_pod(pod, owner_service=kubernetes_service)
        if not trust_domain:
            log.warning("Unable to determine trust_domain for %s" % name)
            return None

        principal = self.get_principal_from_pod(pod, trust_domain)
        xos_slice = self.get_slice_from_pod(pod, trust_domain=trust_domain, principal=principal)
        image = self.get_image_from_pod(pod)

        if not xos_slice:
            log.warning("Unable to determine slice for %s" % name)
            return None

        xos_pod = KubernetesServiceInstance(name=name,
                                            pod_ip=pod.status.pod_ip,
                                            owner=kubernetes_service,
                                            slice=xos_slice,
                                            image=image,
                                            backend_handle=self.obj_to_handle(pod),
                                            xos_managed=False,
                                            need_event=True)
        xos_pod.save()
        log.info("Created XOS POD %s" % xos_pod.name)
        return xos_pod

    def _update_pod_ip(self, xos_pod, pod):
        """ Copy the pod IP from Kubernetes into XOS if it changed, and flag the pod as needing a new event.

            This can happen for pods that are managed by XOS. The IP isn't available immediately when XOS
            creates a pod, but shows up a bit later.
        """
        if (pod.status.pod_ip is not None) and (xos_pod.pod_ip != pod.status.pod_ip):
            xos_pod.pod_ip = pod.status.pod_ip
            xos_pod.need_event = True  # Trigger a new kafka event
            xos_pod.save(update_fields=["pod_ip", "need_event"])
            log.info("Updated XOS POD %s" % xos_pod.name)

    def _send_pending_event(self, xos_pod, pod):
        """ Publish the pending Kafka event for `xos_pod`, if it has one, and record that it was sent.

            need_event is only cleared after send_notification() returns, so if publishing raises (it's possible
            Kafka could be down) the event stays pending and is tried again on a later pull.
        """
        if not xos_pod.need_event:
            return

        # Once a pod has been announced, every later event is an update. Checking for "updated" as well as
        # "created" keeps a second IP change from being announced as "created" again.
        if xos_pod.last_event_sent in ("created", "updated"):
            event_kind = "updated"
        else:
            event_kind = "created"

        self.send_notification(xos_pod, pod, event_kind)

        xos_pod.need_event = False
        xos_pod.last_event_sent = event_kind
        xos_pod.save(update_fields=["need_event", "last_event_sent"])

    def pull_records(self):
        """ Reconcile the pods known to Kubernetes with the KubernetesServiceInstances known to XOS.

            * Pods present in Kubernetes but not in XOS get a KubernetesServiceInstance created.
            * Pods whose IP changed have the IP updated in XOS.
            * Pending Kafka events are published.
            * XOS pods that are gone from Kubernetes and are not xos_managed are deleted.

            A failure while processing one pod is logged and does not stop the remaining pods from being
            processed.

            Raises Exception if there is not exactly one KubernetesService.
        """
        # Read all pods from Kubernetes, store them in k8s_pods_by_name
        # NOTE(review): pods are keyed by name only, so pods that share a name across namespaces overwrite each
        # other here -- confirm whether that can happen in practice.
        ret = self.v1core.list_pod_for_all_namespaces(watch=False)
        k8s_pods_by_name = {item.metadata.name: item for item in ret.items}

        # Read all pods from XOS, store them in xos_pods_by_name
        xos_pods_by_name = {}
        for existing_pod in KubernetesServiceInstance.objects.all():
            xos_pods_by_name[existing_pod.name] = existing_pod

        kubernetes_service = self._get_kubernetes_service()

        # For each k8s pod, see if there is an xos pod. If there is not, then create the xos pod.
        for (k, pod) in k8s_pods_by_name.items():
            try:
                if k not in xos_pods_by_name:
                    created_pod = self._create_xos_pod(k, pod, kubernetes_service)
                    if created_pod is None:
                        continue
                    xos_pods_by_name[k] = created_pod

                xos_pod = xos_pods_by_name[k]
                self._update_pod_ip(xos_pod, pod)
                self._send_pending_event(xos_pod, pod)
            except Exception:
                # Exception rather than a bare except, so KeyboardInterrupt/SystemExit still propagate.
                log.exception("Failed to process k8s pod", k=k, pod=pod)

        # For each xos pod, see if there is no k8s pod. If that's the case, then the pod must have been deleted.
        for (k, xos_pod) in xos_pods_by_name.items():
            try:
                if k in k8s_pods_by_name:
                    continue
                if xos_pod.xos_managed:
                    # Should we do something so it gets re-created by the syncstep?
                    continue
                self.send_notification(xos_pod, None, "deleted")
                xos_pod.delete()
                log.info("Deleted XOS POD %s" % k)
            except Exception:
                log.exception("Failed to process xos pod", k=k, xos_pod=xos_pod)