blob: 661d30474b2076497ea33c36d2cad325106237e8 [file] [log] [blame]
Scott Baker82b2b082018-04-16 16:02:14 -07001# Copyright 2017-present Open Networking Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
"""
    pull_pods.py

    Implements a pull step to pull information about pods from Kubernetes.
"""
20
Scott Baker987748d2018-08-09 16:14:11 -070021import json
22
Scott Bakera30fae72019-02-01 16:14:43 -080023from xossynchronizer.pull_steps.pullstep import PullStep
24from xossynchronizer.modelaccessor import KubernetesServiceInstance, KubernetesService, Slice, Principal, \
Scott Baker3fd18e52018-04-17 09:18:21 -070025 TrustDomain, Site, Image
Scott Baker82b2b082018-04-16 16:02:14 -070026
27from xosconfig import Config
28from multistructlog import create_logger
Zack Williams3dc97602018-09-13 22:33:26 -070029from xoskafka import XOSKafkaProducer
Scott Baker82b2b082018-04-16 16:02:14 -070030
Scott Baker82b2b082018-04-16 16:02:14 -070031log = create_logger(Config().get('logging'))
32
Zack Williams3dc97602018-09-13 22:33:26 -070033
Scott Baker82b2b082018-04-16 16:02:14 -070034class KubernetesServiceInstancePullStep(PullStep):
35 """
36 KubernetesServiceInstancePullStep
37
Scott Baker3fd18e52018-04-17 09:18:21 -070038 Pull pod-related information from Kubernetes. Each pod we find is used to create a KubernetesServiceInstance
39 if one does not already exist. Additional support objects (Slices, TrustDomains, Principals) may be created
40 as necessary to fill the required dependencies of the KubernetesServiceInstance.
Scott Baker82b2b082018-04-16 16:02:14 -070041 """
42
43 def __init__(self):
44 super(KubernetesServiceInstancePullStep, self).__init__(observed_model=KubernetesServiceInstance)
Scott Baker987748d2018-08-09 16:14:11 -070045
Scott Baker13e953c2018-05-17 09:19:15 -070046 self.init_kubernetes_client()
Scott Baker82b2b082018-04-16 16:02:14 -070047
Scott Baker13e953c2018-05-17 09:19:15 -070048 def init_kubernetes_client(self):
49 from kubernetes import client as kubernetes_client, config as kubernetes_config
Scott Baker82b2b082018-04-16 16:02:14 -070050 kubernetes_config.load_incluster_config()
Scott Baker3fd18e52018-04-17 09:18:21 -070051 self.v1core = kubernetes_client.CoreV1Api()
52 self.v1apps = kubernetes_client.AppsV1Api()
Scott Bakerd4c5bbe2018-05-10 11:52:36 -070053 self.v1batch = kubernetes_client.BatchV1Api()
Scott Baker3fd18e52018-04-17 09:18:21 -070054
55 def obj_to_handle(self, obj):
56 """ Convert a Kubernetes resource into a handle that we can use to uniquely identify the object within
57 Kubernetes.
58 """
59 return obj.metadata.self_link
60
61 def read_obj_kind(self, kind, name, trust_domain):
62 """ Given an object kind and name, read it from Kubernetes """
63 if kind == "ReplicaSet":
64 resource = self.v1apps.read_namespaced_replica_set(name, trust_domain.name)
65 elif kind == "StatefulSet":
Scott Bakere7943542018-05-15 10:00:05 -070066 resource = self.v1apps.read_namespaced_stateful_set(name, trust_domain.name)
Scott Baker3fd18e52018-04-17 09:18:21 -070067 elif kind == "DaemonSet":
68 resource = self.v1apps.read_namespaced_daemon_set(name, trust_domain.name)
69 elif kind == "Deployment":
70 resource = self.v1apps.read_namespaced_deployment(name, trust_domain.name)
Scott Bakerd4c5bbe2018-05-10 11:52:36 -070071 elif kind == "Job":
72 resource = self.v1batch.read_namespaced_job(name, trust_domain.name)
Scott Baker3fd18e52018-04-17 09:18:21 -070073 else:
74 resource = None
75 return resource
76
77 def get_controller_from_obj(self, obj, trust_domain, depth=0):
78 """ Given an object, Search for its controller. Strategy is to walk backward until we find some object that
79 is marked as a controller, but does not have any owners.
80
81 This seems adequate to cover the case where ReplicaSet is owned by a Deployment, and we want to skup over
82 the ReplicaSet and return the Deployment.
83 """
84
85 owner_references = obj.metadata.owner_references
86 if not owner_references:
87 if (depth==0):
88 # If depth is zero, then we're still looking at the object, not a controller.
89 return None
90 return obj
91
92 for owner_reference in owner_references:
93 if not getattr(owner_reference, "controller", False):
94 continue
95 owner = self.read_obj_kind(owner_reference.kind, owner_reference.name, trust_domain)
Scott Bakerd4c5bbe2018-05-10 11:52:36 -070096 if not owner:
Scott Baker04dd3e62018-11-27 17:17:13 -080097 # Failed to fetch the owner, probably because the owner's kind is something we do not understand. An
98 # example is the etcd-cluser pod, which is owned by a deployment of kind "EtcdCluster".
99 log.debug("failed to fetch owner", owner_reference=owner_reference)
Scott Bakerd4c5bbe2018-05-10 11:52:36 -0700100 continue
Scott Baker3fd18e52018-04-17 09:18:21 -0700101 controller = self.get_controller_from_obj(owner, trust_domain, depth+1)
102 if controller:
103 return controller
104
105 return None
106
107 def get_slice_from_pod(self, pod, trust_domain, principal):
108 """ Given a pod, determine which XOS Slice goes with it
109 If the Slice doesn't exist, create it.
110 """
111 controller = self.get_controller_from_obj(pod, trust_domain)
112 if not controller:
113 return None
114
115 slice_name = controller.metadata.name
Matteo Scandolof5a65452018-08-16 18:08:03 -0700116 if hasattr(controller.metadata, "labels") and controller.metadata.labels is not None:
Scott Baker3fd18e52018-04-17 09:18:21 -0700117 if "xos_slice_name" in controller.metadata.labels:
118 # Someone has labeled the controller with an xos slice name. Use it.
119 slice_name = controller.metadata.labels["xos_slice_name"]
120
121 existing_slices = Slice.objects.filter(name = slice_name)
122 if not existing_slices:
123 # TODO(smbaker): atomicity
124 s = Slice(name=slice_name, site = Site.objects.first(),
125 trust_domain=trust_domain,
126 principal=principal,
127 backend_handle=self.obj_to_handle(controller),
128 controller_kind=controller.kind,
129 xos_managed=False)
130 s.save()
131 return s
132 else:
133 return existing_slices[0]
134
135 def get_trustdomain_from_pod(self, pod, owner_service):
136 """ Given a pod, determine which XOS TrustDomain goes with it
137 If the TrustDomain doesn't exist, create it.
138 """
139 existing_trustdomains = TrustDomain.objects.filter(name = pod.metadata.namespace)
140 if not existing_trustdomains:
141 k8s_trust_domain = self.v1core.read_namespace(pod.metadata.namespace)
142
143 # TODO(smbaker): atomicity
144 t = TrustDomain(name = pod.metadata.namespace,
145 xos_managed=False,
146 owner=owner_service,
147 backend_handle = self.obj_to_handle(k8s_trust_domain))
148 t.save()
149 return t
150 else:
151 return existing_trustdomains[0]
152
153 def get_principal_from_pod(self, pod, trust_domain):
154 """ Given a pod, determine which XOS Principal goes with it
155 If the Principal doesn't exist, create it.
156 """
157 principal_name = getattr(pod.spec, "service_account", None)
158 if not principal_name:
159 return None
160 existing_principals = Principal.objects.filter(name = principal_name)
161 if not existing_principals:
162 k8s_service_account = self.v1core.read_namespaced_service_account(principal_name, trust_domain.name)
163
164 # TODO(smbaker): atomicity
165 p = Principal(name = principal_name,
166 trust_domain = trust_domain,
167 xos_managed = False,
168 backend_handle = self.obj_to_handle(k8s_service_account))
169 p.save()
170 return p
171 else:
172 return existing_principals[0]
173
174 def get_image_from_pod(self, pod):
175 """ Given a pod, determine which XOS Image goes with it
176 If the Image doesn't exist, create it.
177 """
178 containers = pod.spec.containers
179 if containers:
180 # TODO(smbaker): Assumes all containers in a pod use the same image. Valid assumption for now?
181 container = containers[0]
182 if ":" in container.image:
Matteo Scandolof5a65452018-08-16 18:08:03 -0700183 (name, tag) = container.image.rsplit(":", 1)
Scott Baker3fd18e52018-04-17 09:18:21 -0700184 else:
185 # Is assuming a default necessary?
186 name = container.image
187 tag = "master"
188
Matteo Scandoloc72dce02019-02-22 10:23:28 -0800189 # FIXME image.name is unique, but tag may differ. Update validation in the Image model so that the combination of name and tag is unique
Scott Baker3fd18e52018-04-17 09:18:21 -0700190 existing_images = Image.objects.filter(name=name, tag=tag, kind="container")
191 if not existing_images:
192 i = Image(name=name, tag=tag, kind="container", xos_managed=False)
193 i.save()
194 return i
195 else:
196 return existing_images[0]
197 else:
198 return None
Scott Baker82b2b082018-04-16 16:02:14 -0700199
Scott Baker987748d2018-08-09 16:14:11 -0700200 def send_notification(self, xos_pod, k8s_pod, status):
Scott Baker987748d2018-08-09 16:14:11 -0700201
202 event = {"status": status,
Scott Baker3c2b8202018-08-15 10:51:55 -0700203 "name": xos_pod.name,
204 "producer": "k8s-sync"}
Scott Baker987748d2018-08-09 16:14:11 -0700205
206 if xos_pod.id:
207 event["kubernetesserviceinstance_id"] = xos_pod.id
208
209 if k8s_pod:
210 event["labels"] = k8s_pod.metadata.labels
211
212 if k8s_pod.status.pod_ip:
213 event["netinterfaces"] = [{"name": "primary",
214 "addresses": [k8s_pod.status.pod_ip]}]
215
Zack Williams3dc97602018-09-13 22:33:26 -0700216 topic = "xos.kubernetes.pod-details"
217 key = xos_pod.name
218 value = json.dumps(event, default=lambda o: repr(o))
219
220 XOSKafkaProducer.produce(topic, key, value)
221
Scott Baker987748d2018-08-09 16:14:11 -0700222
Scott Baker82b2b082018-04-16 16:02:14 -0700223 def pull_records(self):
Scott Baker3fd18e52018-04-17 09:18:21 -0700224 # Read all pods from Kubernetes, store them in k8s_pods_by_name
225 k8s_pods_by_name = {}
226 ret = self.v1core.list_pod_for_all_namespaces(watch=False)
227 for item in ret.items:
228 k8s_pods_by_name[item.metadata.name] = item
Scott Baker82b2b082018-04-16 16:02:14 -0700229
Scott Baker3fd18e52018-04-17 09:18:21 -0700230 # Read all pods from XOS, store them in xos_pods_by_name
231 xos_pods_by_name = {}
232 existing_pods = KubernetesServiceInstance.objects.all()
233 for pod in existing_pods:
234 xos_pods_by_name[pod.name] = pod
235
236 kubernetes_services = KubernetesService.objects.all()
237 if len(kubernetes_services)==0:
238 raise Exception("There are no Kubernetes Services yet")
239 if len(kubernetes_services)>1:
240 # Simplifying assumption -- there is only one Kubernetes Service
241 raise Exception("There are too many Kubernetes Services")
242 kubernetes_service = kubernetes_services[0]
243
244 # For each k8s pod, see if there is an xos pod. If there is not, then create the xos pod.
245 for (k,pod) in k8s_pods_by_name.items():
Scott Bakere7943542018-05-15 10:00:05 -0700246 try:
247 if not k in xos_pods_by_name:
248 trust_domain = self.get_trustdomain_from_pod(pod, owner_service=kubernetes_service)
249 if not trust_domain:
Scott Baker04dd3e62018-11-27 17:17:13 -0800250 # All kubernetes pods should belong to a namespace. If we can't find the namespace, then
251 # something is very wrong in K8s.
252 log.warning("Unable to determine trust_domain for pod %s. Ignoring." % k)
Scott Bakere7943542018-05-15 10:00:05 -0700253 continue
Scott Baker3fd18e52018-04-17 09:18:21 -0700254
Scott Bakere7943542018-05-15 10:00:05 -0700255 principal = self.get_principal_from_pod(pod, trust_domain)
256 slice = self.get_slice_from_pod(pod, trust_domain=trust_domain, principal=principal)
257 image = self.get_image_from_pod(pod)
Scott Baker3fd18e52018-04-17 09:18:21 -0700258
Scott Bakere7943542018-05-15 10:00:05 -0700259 if not slice:
Scott Baker04dd3e62018-11-27 17:17:13 -0800260 # We could get here if the pod doesn't have a controller, or if the controller is of a kind
261 # that we don't understand (such as the Etcd controller). If so, the pod is not something we
262 # are interested in.
263 log.debug("Unable to determine slice for pod %s. Ignoring." % k)
Scott Bakere7943542018-05-15 10:00:05 -0700264 continue
Scott Baker3fd18e52018-04-17 09:18:21 -0700265
Scott Bakere7943542018-05-15 10:00:05 -0700266 xos_pod = KubernetesServiceInstance(name=k,
267 pod_ip = pod.status.pod_ip,
268 owner = kubernetes_service,
269 slice = slice,
270 image = image,
271 backend_handle = self.obj_to_handle(pod),
Scott Baker987748d2018-08-09 16:14:11 -0700272 xos_managed = False,
273 need_event = True)
Scott Bakere7943542018-05-15 10:00:05 -0700274 xos_pod.save()
275 xos_pods_by_name[k] = xos_pod
276 log.info("Created XOS POD %s" % xos_pod.name)
Scott Baker3fd18e52018-04-17 09:18:21 -0700277
Scott Baker987748d2018-08-09 16:14:11 -0700278 xos_pod = xos_pods_by_name[k]
279
Scott Bakere7943542018-05-15 10:00:05 -0700280 # Check to see if the ip address has changed. This can happen for pods that are managed by XOS. The IP
281 # isn't available immediately when XOS creates a pod, but shows up a bit later. So handle that case
282 # here.
Scott Bakere7943542018-05-15 10:00:05 -0700283 if (pod.status.pod_ip is not None) and (xos_pod.pod_ip != pod.status.pod_ip):
284 xos_pod.pod_ip = pod.status.pod_ip
Scott Baker987748d2018-08-09 16:14:11 -0700285 xos_pod.need_event = True # Trigger a new kafka event
286 xos_pod.save(update_fields = ["pod_ip", "need_event"])
287 log.info("Updated XOS POD %s" % xos_pod.name)
288
289 # Check to see if we haven't sent the Kafka event yet. It's possible Kafka could be down. If
290 # so, then we'll try to send the event again later.
Zack Williams3dc97602018-09-13 22:33:26 -0700291 if (xos_pod.need_event):
Scott Baker987748d2018-08-09 16:14:11 -0700292 if xos_pod.last_event_sent == "created":
293 event_kind = "updated"
294 else:
295 event_kind = "created"
296
297 self.send_notification(xos_pod, pod, event_kind)
298
299 xos_pod.need_event = False
300 xos_pod.last_event_sent = event_kind
301 xos_pod.save(update_fields=["need_event", "last_event_sent"])
302
Scott Bakere7943542018-05-15 10:00:05 -0700303 except:
304 log.exception("Failed to process k8s pod", k=k, pod=pod)
Scott Bakerac43a742018-05-07 16:54:03 -0700305
Scott Baker3fd18e52018-04-17 09:18:21 -0700306 # For each xos pod, see if there is no k8s pod. If that's the case, then the pud must have been deleted.
307 for (k,xos_pod) in xos_pods_by_name.items():
Scott Baker987748d2018-08-09 16:14:11 -0700308 try:
309 if (not k in k8s_pods_by_name):
310 if (xos_pod.xos_managed):
311 # Should we do something so it gets re-created by the syncstep?
312 pass
313 else:
314 self.send_notification(xos_pod, None, "deleted")
315 xos_pod.delete()
316 log.info("Deleted XOS POD %s" % k)
317 except:
318 log.exception("Failed to process xos pod", k=k, xos_pod=xos_pod)