Scott Baker | 82b2b08 | 2018-04-16 16:02:14 -0700 | [diff] [blame] | 1 | # Copyright 2017-present Open Networking Foundation |
| 2 | # |
| 3 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | # you may not use this file except in compliance with the License. |
| 5 | # You may obtain a copy of the License at |
| 6 | # |
| 7 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | # |
| 9 | # Unless required by applicable law or agreed to in writing, software |
| 10 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | # See the License for the specific language governing permissions and |
| 13 | # limitations under the License. |
| 14 | |
| 15 | """ |
| 16 | pull_pods.py |
| 17 | |
| 18 | Implements a syncstep to pull information about pods from Kubernetes. |
| 19 | """ |
| 20 | |
| 21 | from synchronizers.new_base.pullstep import PullStep |
Scott Baker | 3fd18e5 | 2018-04-17 09:18:21 -0700 | [diff] [blame] | 22 | from synchronizers.new_base.modelaccessor import KubernetesServiceInstance, KubernetesService, Slice, Principal, \ |
| 23 | TrustDomain, Site, Image |
Scott Baker | 82b2b08 | 2018-04-16 16:02:14 -0700 | [diff] [blame] | 24 | |
| 25 | from xosconfig import Config |
| 26 | from multistructlog import create_logger |
| 27 | |
| 28 | from kubernetes import client as kubernetes_client, config as kubernetes_config |
| 29 | |
| 30 | log = create_logger(Config().get('logging')) |
| 31 | |
class KubernetesServiceInstancePullStep(PullStep):
    """
    KubernetesServiceInstancePullStep

    Pull pod-related information from Kubernetes. Each pod we find is used to create a KubernetesServiceInstance
    if one does not already exist. Additional support objects (Slices, TrustDomains, Principals, Images) may be
    created as necessary to fill the required dependencies of the KubernetesServiceInstance.

    XOS pods that are not marked xos_managed and no longer have a matching Kubernetes pod are deleted.
    """

    # Tag recorded for an image reference that does not carry an explicit tag.
    DEFAULT_IMAGE_TAG = "master"

    def __init__(self):
        super(KubernetesServiceInstancePullStep, self).__init__(observed_model=KubernetesServiceInstance)

        # load_incluster_config() is used, so this step presumably runs inside the cluster it inspects.
        kubernetes_config.load_incluster_config()
        self.v1core = kubernetes_client.CoreV1Api()
        self.v1apps = kubernetes_client.AppsV1Api()

    def obj_to_handle(self, obj):
        """ Convert a Kubernetes resource into a handle that we can use to uniquely identify the object within
            Kubernetes.

            Returns the resource's metadata.self_link.
        """
        return obj.metadata.self_link

    def read_obj_kind(self, kind, name, trust_domain):
        """ Given an object kind and name, read it from Kubernetes.

            kind: Kubernetes kind of the object; ReplicaSet, StatefulSet, DaemonSet and Deployment are supported.
            name: name of the object.
            trust_domain: XOS TrustDomain whose name is used as the Kubernetes namespace.

            Returns the resource read from the apps API, or None if the kind is not supported.
        """
        namespace = trust_domain.name
        if kind == "ReplicaSet":
            return self.v1apps.read_namespaced_replica_set(name, namespace)
        if kind == "StatefulSet":
            return self.v1apps.read_namespaced_stateful_set(name, namespace)
        if kind == "DaemonSet":
            return self.v1apps.read_namespaced_daemon_set(name, namespace)
        if kind == "Deployment":
            return self.v1apps.read_namespaced_deployment(name, namespace)
        return None

    def get_controller_from_obj(self, obj, trust_domain, depth=0):
        """ Given an object, search for its controller. Strategy is to walk backward until we find some object that
            is marked as a controller, but does not have any owners.

            This seems adequate to cover the case where ReplicaSet is owned by a Deployment, and we want to skip
            over the ReplicaSet and return the Deployment.

            obj: Kubernetes resource to start from.
            trust_domain: XOS TrustDomain whose name is the namespace used to read owners.
            depth: recursion depth; callers should leave this at 0.

            Returns the controlling resource, or None if there is none or it is of a kind that read_obj_kind
            does not support.
        """
        owner_references = obj.metadata.owner_references
        if not owner_references:
            if depth == 0:
                # If depth is zero, then we're still looking at the object, not a controller.
                return None
            return obj

        for owner_reference in owner_references:
            if not getattr(owner_reference, "controller", False):
                continue
            owner = self.read_obj_kind(owner_reference.kind, owner_reference.name, trust_domain)
            if owner is None:
                # Owner is of a kind we cannot read, so the chain cannot be followed through it.
                continue
            controller = self.get_controller_from_obj(owner, trust_domain, depth + 1)
            if controller:
                return controller

        return None

    def get_slice_from_pod(self, pod, trust_domain, principal):
        """ Given a pod, determine which XOS Slice goes with it.
            If the Slice doesn't exist, create it.

            The slice name is the controller's "xos_slice_name" label if present, otherwise the controller's name.

            Returns the Slice, or None if no controller could be found for the pod.
        """
        controller = self.get_controller_from_obj(pod, trust_domain)
        if not controller:
            return None

        # labels may be missing or None when the controller carries no labels at all.
        labels = getattr(controller.metadata, "labels", None) or {}
        # If someone has labeled the controller with an xos slice name, use it.
        slice_name = labels.get("xos_slice_name", controller.metadata.name)

        # NOTE(review): lookup is by name only, not by trust domain -- confirm slice names are globally unique.
        existing_slices = Slice.objects.filter(name=slice_name)
        if existing_slices:
            return existing_slices[0]

        # TODO(smbaker): atomicity
        new_slice = Slice(name=slice_name,
                          site=Site.objects.first(),
                          trust_domain=trust_domain,
                          principal=principal,
                          backend_handle=self.obj_to_handle(controller),
                          controller_kind=controller.kind,
                          xos_managed=False)
        new_slice.save()
        return new_slice

    def get_trustdomain_from_pod(self, pod, owner_service):
        """ Given a pod, determine which XOS TrustDomain goes with it.
            If the TrustDomain doesn't exist, create it.

            The TrustDomain is named after the pod's namespace and is owned by owner_service.
        """
        namespace = pod.metadata.namespace
        existing_trustdomains = TrustDomain.objects.filter(name=namespace)
        if existing_trustdomains:
            return existing_trustdomains[0]

        k8s_trust_domain = self.v1core.read_namespace(namespace)

        # TODO(smbaker): atomicity
        trust_domain = TrustDomain(name=namespace,
                                   xos_managed=False,
                                   owner=owner_service,
                                   backend_handle=self.obj_to_handle(k8s_trust_domain))
        trust_domain.save()
        return trust_domain

    def get_principal_from_pod(self, pod, trust_domain):
        """ Given a pod, determine which XOS Principal goes with it.
            If the Principal doesn't exist, create it.

            The Principal is named after the pod's service account.

            Returns the Principal, or None if the pod spec has no service account.
        """
        principal_name = getattr(pod.spec, "service_account", None)
        if not principal_name:
            return None

        # NOTE(review): lookup is by name only, so same-named service accounts in different namespaces
        # resolve to the same Principal -- confirm this is intended.
        existing_principals = Principal.objects.filter(name=principal_name)
        if existing_principals:
            return existing_principals[0]

        k8s_service_account = self.v1core.read_namespaced_service_account(principal_name, trust_domain.name)

        # TODO(smbaker): atomicity
        principal = Principal(name=principal_name,
                              trust_domain=trust_domain,
                              xos_managed=False,
                              backend_handle=self.obj_to_handle(k8s_service_account))
        principal.save()
        return principal

    @staticmethod
    def _split_image_reference(image, default_tag):
        """ Split a container image reference into (name, tag).

            Only a colon that appears after the last "/" separates the tag; an earlier colon belongs to a
            registry host:port. References without a tag get default_tag.
        """
        (name, separator, tag) = image.rpartition(":")
        if not separator or "/" in tag:
            return (image, default_tag)
        return (name, tag)

    def get_image_from_pod(self, pod):
        """ Given a pod, determine which XOS Image goes with it.
            If the Image doesn't exist, create it.

            Returns the Image, or None if the pod has no containers.
        """
        containers = pod.spec.containers
        if not containers:
            return None

        # TODO(smbaker): Assumes all containers in a pod use the same image. Valid assumption for now?
        container = containers[0]
        # Is assuming a default tag necessary?
        (name, tag) = self._split_image_reference(container.image, self.DEFAULT_IMAGE_TAG)

        existing_images = Image.objects.filter(name=name, tag=tag, kind="container")
        if existing_images:
            return existing_images[0]

        image = Image(name=name, tag=tag, kind="container", xos_managed=False)
        image.save()
        return image

    def pull_records(self):
        """ Reconcile XOS KubernetesServiceInstances against the pods that currently exist in Kubernetes.

            Creates an XOS pod for every Kubernetes pod that lacks one, refreshes pod_ip when it has changed,
            and deletes XOS pods that are not xos_managed and no longer exist in Kubernetes.

            Raises Exception unless there is exactly one KubernetesService.
        """
        # Read all pods from Kubernetes, store them in k8s_pods_by_name
        # NOTE(review): keyed by pod name only, so same-named pods in different namespaces overwrite each other.
        k8s_pods_by_name = {}
        ret = self.v1core.list_pod_for_all_namespaces(watch=False)
        for item in ret.items:
            k8s_pods_by_name[item.metadata.name] = item

        # Read all pods from XOS, store them in xos_pods_by_name
        xos_pods_by_name = {}
        for existing_pod in KubernetesServiceInstance.objects.all():
            xos_pods_by_name[existing_pod.name] = existing_pod

        kubernetes_services = KubernetesService.objects.all()
        if len(kubernetes_services) == 0:
            raise Exception("There are no Kubernetes Services yet")
        if len(kubernetes_services) > 1:
            # Simplifying assumption -- there is only one Kubernetes Service
            raise Exception("There are too many Kubernetes Services")
        kubernetes_service = kubernetes_services[0]

        # For each k8s pod, see if there is an xos pod. If there is not, then create the xos pod.
        for (k, pod) in k8s_pods_by_name.items():
            if k not in xos_pods_by_name:
                trust_domain = self.get_trustdomain_from_pod(pod, owner_service=kubernetes_service)
                if not trust_domain:
                    log.warning("Unable to determine trust_domain for %s" % k)
                    continue

                principal = self.get_principal_from_pod(pod, trust_domain)
                xos_slice = self.get_slice_from_pod(pod, trust_domain=trust_domain, principal=principal)
                image = self.get_image_from_pod(pod)

                if not xos_slice:
                    log.warning("Unable to determine slice for %s" % k)
                    continue

                xos_pod = KubernetesServiceInstance(name=k,
                                                    pod_ip=pod.status.pod_ip,
                                                    owner=kubernetes_service,
                                                    slice=xos_slice,
                                                    image=image,
                                                    backend_handle=self.obj_to_handle(pod),
                                                    xos_managed=False)
                xos_pod.save()
                xos_pods_by_name[k] = xos_pod
                log.info("Created XOS POD %s" % xos_pod.name)

            # Check to see if the ip address has changed. This can happen for pods that are managed by XOS. The IP
            # isn't available immediately when XOS creates a pod, but shows up a bit later. So handle that case
            # here.
            xos_pod = xos_pods_by_name[k]
            if (pod.status.pod_ip is not None) and (xos_pod.pod_ip != pod.status.pod_ip):
                xos_pod.pod_ip = pod.status.pod_ip
                xos_pod.save(update_fields=["pod_ip"])

        # For each xos pod, see if there is no k8s pod. If that's the case, then the pod must have been deleted.
        for (k, xos_pod) in xos_pods_by_name.items():
            if k in k8s_pods_by_name:
                continue
            if xos_pod.xos_managed:
                # Should we do something so it gets re-created by the syncstep?
                continue
            xos_pod.delete()
            log.info("Deleted XOS POD %s" % k)