# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

15"""
16 pull_pods.py
17
18 Implements a syncstep to pull information about pods form Kubernetes.
19"""
20
import json

from synchronizers.new_base.pullstep import PullStep
from synchronizers.new_base.modelaccessor import KubernetesServiceInstance, KubernetesService, Slice, Principal, \
    TrustDomain, Site, Image

from xosconfig import Config
from multistructlog import create_logger
from xoskafka import XOSKafkaProducer

log = create_logger(Config().get('logging'))


class KubernetesServiceInstancePullStep(PullStep):
35 """
36 KubernetesServiceInstancePullStep
37
Scott Baker3fd18e52018-04-17 09:18:21 -070038 Pull pod-related information from Kubernetes. Each pod we find is used to create a KubernetesServiceInstance
39 if one does not already exist. Additional support objects (Slices, TrustDomains, Principals) may be created
40 as necessary to fill the required dependencies of the KubernetesServiceInstance.
Scott Baker82b2b082018-04-16 16:02:14 -070041 """
42
    def __init__(self):
        super(KubernetesServiceInstancePullStep, self).__init__(observed_model=KubernetesServiceInstance)

        self.init_kubernetes_client()

    def init_kubernetes_client(self):
        from kubernetes import client as kubernetes_client, config as kubernetes_config
        kubernetes_config.load_incluster_config()
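        # load_incluster_config() reads the service account token and CA certificate mounted
        # into the pod, so this synchronizer is expected to run inside the cluster.
        # Pods live in the core API group, most controllers in apps, and Jobs in batch,
        # hence the three separate client objects below.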
        self.v1core = kubernetes_client.CoreV1Api()
        self.v1apps = kubernetes_client.AppsV1Api()
        self.v1batch = kubernetes_client.BatchV1Api()

    def obj_to_handle(self, obj):
        """ Convert a Kubernetes resource into a handle that we can use to uniquely identify the object within
            Kubernetes.
        """
        return obj.metadata.self_link

    def read_obj_kind(self, kind, name, trust_domain):
        """ Given an object kind and name, read it from Kubernetes """
        if kind == "ReplicaSet":
            resource = self.v1apps.read_namespaced_replica_set(name, trust_domain.name)
        elif kind == "StatefulSet":
            resource = self.v1apps.read_namespaced_stateful_set(name, trust_domain.name)
        elif kind == "DaemonSet":
            resource = self.v1apps.read_namespaced_daemon_set(name, trust_domain.name)
        elif kind == "Deployment":
            resource = self.v1apps.read_namespaced_deployment(name, trust_domain.name)
        elif kind == "Job":
            resource = self.v1batch.read_namespaced_job(name, trust_domain.name)
        else:
            resource = None
        return resource

    def get_controller_from_obj(self, obj, trust_domain, depth=0):
        """ Given an object, search for its controller. The strategy is to walk backward until we find some object
            that is marked as a controller, but does not itself have any owners.

            This is adequate to cover the case where a ReplicaSet is owned by a Deployment, and we want to skip over
            the ReplicaSet and return the Deployment.
        """

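        # Example ownership chain: Pod -> ReplicaSet (controller of the pod) -> Deployment (controller with no owner).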
        owner_references = obj.metadata.owner_references
        if not owner_references:
            if (depth==0):
                # If depth is zero, then we're still looking at the object, not a controller.
                return None
            return obj

        for owner_reference in owner_references:
            if not getattr(owner_reference, "controller", False):
                continue
            owner = self.read_obj_kind(owner_reference.kind, owner_reference.name, trust_domain)
            if not owner:
                log.warning("failed to fetch owner", owner_reference=owner_reference)
                continue
            controller = self.get_controller_from_obj(owner, trust_domain, depth+1)
            if controller:
                return controller

        return None

    def get_slice_from_pod(self, pod, trust_domain, principal):
        """ Given a pod, determine which XOS Slice goes with it.
            If the Slice doesn't exist, create it.
        """
        controller = self.get_controller_from_obj(pod, trust_domain)
        if not controller:
            return None

        slice_name = controller.metadata.name
        if hasattr(controller.metadata, "labels") and controller.metadata.labels is not None:
            if "xos_slice_name" in controller.metadata.labels:
                # Someone has labeled the controller with an xos slice name. Use it.
                slice_name = controller.metadata.labels["xos_slice_name"]

        existing_slices = Slice.objects.filter(name = slice_name)
        if not existing_slices:
            # TODO(smbaker): atomicity
            s = Slice(name=slice_name, site = Site.objects.first(),
                      trust_domain=trust_domain,
                      principal=principal,
                      backend_handle=self.obj_to_handle(controller),
                      controller_kind=controller.kind,
                      xos_managed=False)
            s.save()
            return s
        else:
            return existing_slices[0]

    def get_trustdomain_from_pod(self, pod, owner_service):
        """ Given a pod, determine which XOS TrustDomain goes with it.
            If the TrustDomain doesn't exist, create it.
        """
        existing_trustdomains = TrustDomain.objects.filter(name = pod.metadata.namespace)
        if not existing_trustdomains:
            k8s_trust_domain = self.v1core.read_namespace(pod.metadata.namespace)

            # TODO(smbaker): atomicity
            t = TrustDomain(name = pod.metadata.namespace,
                            xos_managed=False,
                            owner=owner_service,
                            backend_handle = self.obj_to_handle(k8s_trust_domain))
            t.save()
            return t
        else:
            return existing_trustdomains[0]

    def get_principal_from_pod(self, pod, trust_domain):
        """ Given a pod, determine which XOS Principal goes with it.
            If the Principal doesn't exist, create it.
        """
        principal_name = getattr(pod.spec, "service_account", None)
        if not principal_name:
            return None
        existing_principals = Principal.objects.filter(name = principal_name)
        if not existing_principals:
            k8s_service_account = self.v1core.read_namespaced_service_account(principal_name, trust_domain.name)

            # TODO(smbaker): atomicity
            p = Principal(name = principal_name,
                          trust_domain = trust_domain,
                          xos_managed = False,
                          backend_handle = self.obj_to_handle(k8s_service_account))
            p.save()
            return p
        else:
            return existing_principals[0]

    def get_image_from_pod(self, pod):
        """ Given a pod, determine which XOS Image goes with it.
            If the Image doesn't exist, create it.
        """
        containers = pod.spec.containers
        if containers:
            # TODO(smbaker): Assumes all containers in a pod use the same image. Valid assumption for now?
            container = containers[0]
            if ":" in container.image:
                (name, tag) = container.image.rsplit(":", 1)
            else:
                # Is assuming a default necessary?
                name = container.image
                tag = "master"

            existing_images = Image.objects.filter(name=name, tag=tag, kind="container")
            if not existing_images:
                i = Image(name=name, tag=tag, kind="container", xos_managed=False)
                i.save()
                return i
            else:
                return existing_images[0]
        else:
            return None

    def send_notification(self, xos_pod, k8s_pod, status):
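        """ Publish a pod status event for `xos_pod` onto the Kafka bus. """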

        event = {"status": status,
                 "name": xos_pod.name,
                 "producer": "k8s-sync"}

        if xos_pod.id:
            event["kubernetesserviceinstance_id"] = xos_pod.id

        if k8s_pod:
            event["labels"] = k8s_pod.metadata.labels

            if k8s_pod.status.pod_ip:
                event["netinterfaces"] = [{"name": "primary",
                                           "addresses": [k8s_pod.status.pod_ip]}]

        topic = "xos.kubernetes.pod-details"
        key = xos_pod.name
        value = json.dumps(event, default=lambda o: repr(o))

        XOSKafkaProducer.produce(topic, key, value)

    def pull_records(self):
        # Read all pods from Kubernetes, store them in k8s_pods_by_name
        k8s_pods_by_name = {}
        ret = self.v1core.list_pod_for_all_namespaces(watch=False)
        for item in ret.items:
            k8s_pods_by_name[item.metadata.name] = item
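        # NOTE: keying by pod name alone assumes pod names are unique across namespaces.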

        # Read all pods from XOS, store them in xos_pods_by_name
        xos_pods_by_name = {}
        existing_pods = KubernetesServiceInstance.objects.all()
        for pod in existing_pods:
            xos_pods_by_name[pod.name] = pod

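        # Exactly one KubernetesService model is expected; it becomes the owner of every
        # KubernetesServiceInstance created below.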
        kubernetes_services = KubernetesService.objects.all()
        if len(kubernetes_services)==0:
            raise Exception("There are no Kubernetes Services yet")
        if len(kubernetes_services)>1:
            # Simplifying assumption -- there is only one Kubernetes Service
            raise Exception("There are too many Kubernetes Services")
        kubernetes_service = kubernetes_services[0]

        # For each k8s pod, see if there is an xos pod. If there is not, then create the xos pod.
        for (k, pod) in k8s_pods_by_name.items():
            try:
                if not k in xos_pods_by_name:
                    trust_domain = self.get_trustdomain_from_pod(pod, owner_service=kubernetes_service)
                    if not trust_domain:
                        log.warning("Unable to determine trust_domain for %s" % k)
                        continue

                    principal = self.get_principal_from_pod(pod, trust_domain)
                    slice = self.get_slice_from_pod(pod, trust_domain=trust_domain, principal=principal)
                    image = self.get_image_from_pod(pod)

                    if not slice:
                        log.warning("Unable to determine slice for %s" % k)
                        continue

                    xos_pod = KubernetesServiceInstance(name=k,
                                                        pod_ip = pod.status.pod_ip,
                                                        owner = kubernetes_service,
                                                        slice = slice,
                                                        image = image,
                                                        backend_handle = self.obj_to_handle(pod),
                                                        xos_managed = False,
                                                        need_event = True)
                    xos_pod.save()
                    xos_pods_by_name[k] = xos_pod
                    log.info("Created XOS POD %s" % xos_pod.name)

                xos_pod = xos_pods_by_name[k]

                # Check to see if the IP address has changed. This can happen for pods that are managed by XOS. The
                # IP isn't available immediately when XOS creates a pod, but shows up a bit later. So handle that
                # case here.
                if (pod.status.pod_ip is not None) and (xos_pod.pod_ip != pod.status.pod_ip):
                    xos_pod.pod_ip = pod.status.pod_ip
                    xos_pod.need_event = True  # Trigger a new kafka event
                    xos_pod.save(update_fields = ["pod_ip", "need_event"])
                    log.info("Updated XOS POD %s" % xos_pod.name)

                # Check to see if we haven't sent the Kafka event yet. It's possible Kafka could be down. If
                # so, then we'll try to send the event again later.
                if (xos_pod.need_event):
                    if xos_pod.last_event_sent == "created":
                        event_kind = "updated"
                    else:
                        event_kind = "created"

                    self.send_notification(xos_pod, pod, event_kind)

                    xos_pod.need_event = False
                    xos_pod.last_event_sent = event_kind
                    xos_pod.save(update_fields=["need_event", "last_event_sent"])

            except Exception:
                log.exception("Failed to process k8s pod", k=k, pod=pod)

        # For each xos pod, see if there is no k8s pod. If that's the case, then the pod must have been deleted.
        for (k, xos_pod) in xos_pods_by_name.items():
            try:
                if (not k in k8s_pods_by_name):
                    if (xos_pod.xos_managed):
                        # Should we do something so it gets re-created by the syncstep?
                        pass
                    else:
                        self.send_notification(xos_pod, None, "deleted")
                        xos_pod.delete()
                        log.info("Deleted XOS POD %s" % k)
            except Exception:
                log.exception("Failed to process xos pod", k=k, xos_pod=xos_pod)