Scott Baker | 82b2b08 | 2018-04-16 16:02:14 -0700 | [diff] [blame] | 1 | # Copyright 2017-present Open Networking Foundation |
| 2 | # |
| 3 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | # you may not use this file except in compliance with the License. |
| 5 | # You may obtain a copy of the License at |
| 6 | # |
| 7 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | # |
| 9 | # Unless required by applicable law or agreed to in writing, software |
| 10 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | # See the License for the specific language governing permissions and |
| 13 | # limitations under the License. |
| 14 | |
| 15 | """ |
| 16 | pull_pods.py |
| 17 | |
| 18 | Implements a syncstep to pull information about pods from Kubernetes. |
| 19 | """ |
| 20 | |
Scott Baker | 987748d | 2018-08-09 16:14:11 -0700 | [diff] [blame] | 21 | import json |
| 22 | |
Scott Baker | 82b2b08 | 2018-04-16 16:02:14 -0700 | [diff] [blame] | 23 | from synchronizers.new_base.pullstep import PullStep |
Scott Baker | 3fd18e5 | 2018-04-17 09:18:21 -0700 | [diff] [blame] | 24 | from synchronizers.new_base.modelaccessor import KubernetesServiceInstance, KubernetesService, Slice, Principal, \ |
| 25 | TrustDomain, Site, Image |
Scott Baker | 82b2b08 | 2018-04-16 16:02:14 -0700 | [diff] [blame] | 26 | |
| 27 | from xosconfig import Config |
| 28 | from multistructlog import create_logger |
| 29 | |
# Module-level logger, built from the 'logging' entry of the XOS configuration.
log = create_logger(Config().get('logging'))
| 31 | |
class KubernetesServiceInstancePullStep(PullStep):
    """
        KubernetesServiceInstancePullStep

        Pull pod-related information from Kubernetes. Each pod we find is used to create a KubernetesServiceInstance
        if one does not already exist. Additional support objects (Slices, TrustDomains, Principals) may be created
        as necessary to fill the required dependencies of the KubernetesServiceInstance.
    """

    def __init__(self):
        super(KubernetesServiceInstancePullStep, self).__init__(observed_model=KubernetesServiceInstance)

        # The Kafka producer is optional. It is only created when an event bus endpoint is configured, and a
        # failure to create it is logged rather than raised so that the pull step can still run.
        self.kafka_producer = None
        if Config.get("event_bus.endpoint"):
            try:
                self.init_kafka_producer()
            except Exception:
                log.exception("Failed to initialize Kafka producer")

        self.init_kubernetes_client()

    def init_kubernetes_client(self):
        """ Load the in-cluster Kubernetes configuration and create the API clients used by this step. """
        # Imported here rather than at module level, as in the original code.
        from kubernetes import client as kubernetes_client, config as kubernetes_config
        kubernetes_config.load_incluster_config()
        self.v1core = kubernetes_client.CoreV1Api()
        self.v1apps = kubernetes_client.AppsV1Api()
        self.v1batch = kubernetes_client.BatchV1Api()

    def init_kafka_producer(self):
        """ Create self.kafka_producer from the event_bus.kind / event_bus.endpoint config settings.

            If the kind is missing or is not "kafka", an error is logged and self.kafka_producer is left unchanged.
        """
        from kafka import KafkaProducer
        eventbus_kind = Config.get("event_bus.kind")
        eventbus_endpoint = Config.get("event_bus.endpoint")

        if not eventbus_kind:
            log.error("Eventbus kind is not configured in synchronizer config file.")
            return

        if eventbus_kind not in ["kafka"]:
            log.error("Eventbus kind is set to a technology we do not implement.", eventbus_kind=eventbus_kind)
            return

        self.kafka_producer = KafkaProducer(bootstrap_servers=[eventbus_endpoint])

    def obj_to_handle(self, obj):
        """ Convert a Kubernetes resource into a handle that we can use to uniquely identify the object within
            Kubernetes.
        """
        return obj.metadata.self_link

    def read_obj_kind(self, kind, name, trust_domain):
        """ Given an object kind and name, read it from Kubernetes.

            The namespace used for the read is trust_domain.name. Returns None for kinds that are not handled here.
        """
        if kind == "ReplicaSet":
            resource = self.v1apps.read_namespaced_replica_set(name, trust_domain.name)
        elif kind == "StatefulSet":
            resource = self.v1apps.read_namespaced_stateful_set(name, trust_domain.name)
        elif kind == "DaemonSet":
            resource = self.v1apps.read_namespaced_daemon_set(name, trust_domain.name)
        elif kind == "Deployment":
            resource = self.v1apps.read_namespaced_deployment(name, trust_domain.name)
        elif kind == "Job":
            resource = self.v1batch.read_namespaced_job(name, trust_domain.name)
        else:
            resource = None
        return resource

    def get_controller_from_obj(self, obj, trust_domain, depth=0):
        """ Given an object, search for its controller. Strategy is to walk backward until we find some object that
            is marked as a controller, but does not have any owners.

            This seems adequate to cover the case where ReplicaSet is owned by a Deployment, and we want to skip over
            the ReplicaSet and return the Deployment.

            Returns the controller object, or None if no controller could be found.
        """

        owner_references = obj.metadata.owner_references
        if not owner_references:
            if depth == 0:
                # If depth is zero, then we're still looking at the object, not a controller.
                return None
            return obj

        for owner_reference in owner_references:
            # Only follow owner references that are flagged as the controlling owner.
            if not getattr(owner_reference, "controller", False):
                continue
            owner = self.read_obj_kind(owner_reference.kind, owner_reference.name, trust_domain)
            if not owner:
                log.warning("failed to fetch owner", owner_reference=owner_reference)
                continue
            controller = self.get_controller_from_obj(owner, trust_domain, depth + 1)
            if controller:
                return controller

        return None

    def get_slice_from_pod(self, pod, trust_domain, principal):
        """ Given a pod, determine which XOS Slice goes with it
            If the Slice doesn't exist, create it.

            Returns None if the pod has no controller.
        """
        controller = self.get_controller_from_obj(pod, trust_domain)
        if not controller:
            return None

        slice_name = controller.metadata.name

        # The labels attribute can be present but set to None; treat that the same as "no labels" rather than
        # failing on the membership test.
        labels = getattr(controller.metadata, "labels", None) or {}
        if "xos_slice_name" in labels:
            # Someone has labeled the controller with an xos slice name. Use it.
            slice_name = labels["xos_slice_name"]

        existing_slices = Slice.objects.filter(name=slice_name)
        if existing_slices:
            return existing_slices[0]

        # TODO(smbaker): atomicity
        s = Slice(name=slice_name, site=Site.objects.first(),
                  trust_domain=trust_domain,
                  principal=principal,
                  backend_handle=self.obj_to_handle(controller),
                  controller_kind=controller.kind,
                  xos_managed=False)
        s.save()
        return s

    def get_trustdomain_from_pod(self, pod, owner_service):
        """ Given a pod, determine which XOS TrustDomain goes with it
            If the TrustDomain doesn't exist, create it.

            The TrustDomain is named after the pod's namespace.
        """
        existing_trustdomains = TrustDomain.objects.filter(name=pod.metadata.namespace)
        if existing_trustdomains:
            return existing_trustdomains[0]

        k8s_trust_domain = self.v1core.read_namespace(pod.metadata.namespace)

        # TODO(smbaker): atomicity
        t = TrustDomain(name=pod.metadata.namespace,
                        xos_managed=False,
                        owner=owner_service,
                        backend_handle=self.obj_to_handle(k8s_trust_domain))
        t.save()
        return t

    def get_principal_from_pod(self, pod, trust_domain):
        """ Given a pod, determine which XOS Principal goes with it
            If the Principal doesn't exist, create it.

            Returns None if the pod spec has no service account.
        """
        principal_name = getattr(pod.spec, "service_account", None)
        if not principal_name:
            return None

        # NOTE(review): the lookup is by name only, so a service account name that exists in several namespaces
        # resolves to whichever Principal was created first -- confirm this is intended.
        existing_principals = Principal.objects.filter(name=principal_name)
        if existing_principals:
            return existing_principals[0]

        k8s_service_account = self.v1core.read_namespaced_service_account(principal_name, trust_domain.name)

        # TODO(smbaker): atomicity
        p = Principal(name=principal_name,
                      trust_domain=trust_domain,
                      xos_managed=False,
                      backend_handle=self.obj_to_handle(k8s_service_account))
        p.save()
        return p

    @staticmethod
    def _split_image_reference(image_ref):
        """ Split a container image reference into a (name, tag) tuple.

            The tag is whatever follows the last ":" in the reference. If there is no ":", or if the text after the
            last ":" contains a "/" (meaning the colon belongs to a registry host:port, not a tag), then the whole
            reference is the name and the tag defaults to "master".
        """
        (name, separator, tag) = image_ref.rpartition(":")
        if (not separator) or ("/" in tag):
            # Is assuming a default necessary?
            return (image_ref, "master")
        return (name, tag)

    def get_image_from_pod(self, pod):
        """ Given a pod, determine which XOS Image goes with it
            If the Image doesn't exist, create it.

            Returns None if the pod has no containers.
        """
        containers = pod.spec.containers
        if not containers:
            return None

        # TODO(smbaker): Assumes all containers in a pod use the same image. Valid assumption for now?
        container = containers[0]

        # Split on the last colon only, so that references with a registry port (host:5000/repo:tag) do not raise.
        (name, tag) = self._split_image_reference(container.image)

        existing_images = Image.objects.filter(name=name, tag=tag, kind="container")
        if existing_images:
            return existing_images[0]

        i = Image(name=name, tag=tag, kind="container", xos_managed=False)
        i.save()
        return i

    def send_notification(self, xos_pod, k8s_pod, status):
        """ Publish a JSON event describing a pod to the "xos.kubernetes.pod-details" Kafka topic.

            xos_pod: the KubernetesServiceInstance the event is about
            k8s_pod: the matching Kubernetes pod, or None (as is passed for "deleted" events)
            status: the event status string, for example "created", "updated" or "deleted"

            Does nothing if no Kafka producer was initialized.
        """
        if not self.kafka_producer:
            return

        event = {"status": status,
                 "name": xos_pod.name,
                 "producer": "k8s-sync"}

        if xos_pod.id:
            event["kubernetesserviceinstance_id"] = xos_pod.id

        if k8s_pod:
            event["labels"] = k8s_pod.metadata.labels

            if k8s_pod.status.pod_ip:
                event["netinterfaces"] = [{"name": "primary",
                                           "addresses": [k8s_pod.status.pod_ip]}]

        self.kafka_producer.send("xos.kubernetes.pod-details", json.dumps(event))
        self.kafka_producer.flush()

    def pull_records(self):
        """ Reconcile the pods known to Kubernetes with the KubernetesServiceInstances known to XOS.

            Pods that exist in Kubernetes but not in XOS are created in XOS, pod IP changes are copied over,
            pending Kafka events are sent, and XOS pods that are not xos_managed and no longer exist in Kubernetes
            are deleted. A failure on one pod is logged and does not stop the others from being processed.

            Raises an Exception unless exactly one KubernetesService exists.
        """
        # Read all pods from Kubernetes, store them in k8s_pods_by_name
        # NOTE(review): pods are keyed by name only, so pods with the same name in different namespaces would
        # overwrite each other here -- confirm whether that can occur.
        k8s_pods_by_name = {}
        ret = self.v1core.list_pod_for_all_namespaces(watch=False)
        for item in ret.items:
            k8s_pods_by_name[item.metadata.name] = item

        # Read all pods from XOS, store them in xos_pods_by_name
        xos_pods_by_name = {}
        existing_pods = KubernetesServiceInstance.objects.all()
        for pod in existing_pods:
            xos_pods_by_name[pod.name] = pod

        kubernetes_services = KubernetesService.objects.all()
        if len(kubernetes_services) == 0:
            raise Exception("There are no Kubernetes Services yet")
        if len(kubernetes_services) > 1:
            # Simplifying assumption -- there is only one Kubernetes Service
            raise Exception("There are too many Kubernetes Services")
        kubernetes_service = kubernetes_services[0]

        # For each k8s pod, see if there is an xos pod. If there is not, then create the xos pod.
        for (k, pod) in k8s_pods_by_name.items():
            try:
                if k not in xos_pods_by_name:
                    trust_domain = self.get_trustdomain_from_pod(pod, owner_service=kubernetes_service)
                    if not trust_domain:
                        log.warning("Unable to determine trust_domain for %s" % k)
                        continue

                    principal = self.get_principal_from_pod(pod, trust_domain)
                    xos_slice = self.get_slice_from_pod(pod, trust_domain=trust_domain, principal=principal)
                    image = self.get_image_from_pod(pod)

                    if not xos_slice:
                        log.warning("Unable to determine slice for %s" % k)
                        continue

                    xos_pod = KubernetesServiceInstance(name=k,
                                                        pod_ip=pod.status.pod_ip,
                                                        owner=kubernetes_service,
                                                        slice=xos_slice,
                                                        image=image,
                                                        backend_handle=self.obj_to_handle(pod),
                                                        xos_managed=False,
                                                        need_event=True)
                    xos_pod.save()
                    xos_pods_by_name[k] = xos_pod
                    log.info("Created XOS POD %s" % xos_pod.name)

                xos_pod = xos_pods_by_name[k]

                # Check to see if the ip address has changed. This can happen for pods that are managed by XOS. The IP
                # isn't available immediately when XOS creates a pod, but shows up a bit later. So handle that case
                # here.
                if (pod.status.pod_ip is not None) and (xos_pod.pod_ip != pod.status.pod_ip):
                    xos_pod.pod_ip = pod.status.pod_ip
                    xos_pod.need_event = True  # Trigger a new kafka event
                    xos_pod.save(update_fields=["pod_ip", "need_event"])
                    log.info("Updated XOS POD %s" % xos_pod.name)

                # Check to see if we haven't sent the Kafka event yet. It's possible Kafka could be down. If
                # so, then we'll try to send the event again later.
                if (xos_pod.need_event) and (self.kafka_producer):
                    if xos_pod.last_event_sent == "created":
                        event_kind = "updated"
                    else:
                        event_kind = "created"

                    self.send_notification(xos_pod, pod, event_kind)

                    # Only reached if the send did not raise, so a failed send leaves need_event set for a retry.
                    xos_pod.need_event = False
                    xos_pod.last_event_sent = event_kind
                    xos_pod.save(update_fields=["need_event", "last_event_sent"])

            except Exception:
                log.exception("Failed to process k8s pod", k=k, pod=pod)

        # For each xos pod, see if there is no k8s pod. If that's the case, then the pod must have been deleted.
        for (k, xos_pod) in xos_pods_by_name.items():
            try:
                if k not in k8s_pods_by_name:
                    if xos_pod.xos_managed:
                        # Should we do something so it gets re-created by the syncstep?
                        pass
                    else:
                        self.send_notification(xos_pod, None, "deleted")
                        xos_pod.delete()
                        log.info("Deleted XOS POD %s" % k)
            except Exception:
                log.exception("Failed to process xos pod", k=k, xos_pod=xos_pod)