# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
    pull_pods.py

    Implements a syncstep to pull information about pods from Kubernetes.
"""

import json

from synchronizers.new_base.pullstep import PullStep
from synchronizers.new_base.modelaccessor import KubernetesServiceInstance, KubernetesService, Slice, Principal, \
    TrustDomain, Site, Image

from xosconfig import Config
from multistructlog import create_logger

# Module-level structured logger, configured from the "logging" section of the synchronizer config.
log = create_logger(Config().get('logging'))

class KubernetesServiceInstancePullStep(PullStep):
    """
        KubernetesServiceInstancePullStep

        Pull pod-related information from Kubernetes. Each pod we find is used to create a KubernetesServiceInstance
        if one does not already exist. Additional support objects (Slices, TrustDomains, Principals, Images) may be
        created as necessary to fill the required dependencies of the KubernetesServiceInstance.

        When a Kafka producer is available, an event is published to "xos.kubernetes.pod-details" when a pod is
        first seen, when its IP address changes, and when it disappears from Kubernetes.
    """

    def __init__(self):
        super(KubernetesServiceInstancePullStep, self).__init__(observed_model=KubernetesServiceInstance)

        # The Kafka producer is optional. If it cannot be created the step still pulls pods, it just sends no events.
        self.kafka_producer = None
        if Config.get("event_bus.endpoint"):
            try:
                self.init_kafka_producer()
            except Exception:
                log.exception("Failed to initialize Kafka producer")

        self.init_kubernetes_client()

    def init_kubernetes_client(self):
        """ Load the in-cluster Kubernetes configuration and create the Core, Apps and Batch API clients. """
        from kubernetes import client as kubernetes_client, config as kubernetes_config
        kubernetes_config.load_incluster_config()
        self.v1core = kubernetes_client.CoreV1Api()
        self.v1apps = kubernetes_client.AppsV1Api()
        self.v1batch = kubernetes_client.BatchV1Api()

    def init_kafka_producer(self):
        """ Create self.kafka_producer from the "event_bus" config section.

            Logs an error and leaves self.kafka_producer unchanged if the event bus kind is missing or is not
            "kafka". Exceptions raised while constructing the KafkaProducer propagate to the caller.
        """
        from kafka import KafkaProducer
        eventbus_kind = Config.get("event_bus.kind")
        eventbus_endpoint = Config.get("event_bus.endpoint")

        if not eventbus_kind:
            log.error("Eventbus kind is not configured in synchronizer config file.")
            return

        if eventbus_kind not in ["kafka"]:
            log.error("Eventbus kind is set to a technology we do not implement.", eventbus_kind=eventbus_kind)
            return

        self.kafka_producer = KafkaProducer(bootstrap_servers=[eventbus_endpoint])

    def obj_to_handle(self, obj):
        """ Convert a Kubernetes resource into a handle that we can use to uniquely identify the object within
            Kubernetes. The handle is the resource's metadata.self_link.
        """
        return obj.metadata.self_link

    def read_obj_kind(self, kind, name, trust_domain):
        """ Given an object kind and name, read it from Kubernetes.

            The object is read from the namespace named by trust_domain.name. Returns None for kinds that are
            not in the table below.
        """
        readers = {
            "ReplicaSet": self.v1apps.read_namespaced_replica_set,
            "StatefulSet": self.v1apps.read_namespaced_stateful_set,
            "DaemonSet": self.v1apps.read_namespaced_daemon_set,
            "Deployment": self.v1apps.read_namespaced_deployment,
            "Job": self.v1batch.read_namespaced_job,
        }
        reader = readers.get(kind)
        if reader is None:
            return None
        return reader(name, trust_domain.name)

    def get_controller_from_obj(self, obj, trust_domain, depth=0):
        """ Given an object, search for its controller. The strategy is to walk backward through owner references
            marked as controller until we find an object that does not have any owners.

            This seems adequate to cover the case where a ReplicaSet is owned by a Deployment, and we want to skip
            over the ReplicaSet and return the Deployment.

            Returns None if the object itself has no owners (depth 0), or if no controller chain could be resolved.
        """

        owner_references = obj.metadata.owner_references
        if not owner_references:
            if depth == 0:
                # If depth is zero, then we're still looking at the object, not a controller.
                return None
            return obj

        for owner_reference in owner_references:
            if not getattr(owner_reference, "controller", False):
                continue
            owner = self.read_obj_kind(owner_reference.kind, owner_reference.name, trust_domain)
            if not owner:
                log.warning("failed to fetch owner", owner_reference=owner_reference)
                continue
            controller = self.get_controller_from_obj(owner, trust_domain, depth + 1)
            if controller:
                return controller

        return None

    def get_slice_from_pod(self, pod, trust_domain, principal):
        """ Given a pod, determine which XOS Slice goes with it.
            If the Slice doesn't exist, create it.

            The slice name is the controller's "xos_slice_name" label if present, otherwise the controller's name.
            Returns None if the pod has no resolvable controller.
        """
        controller = self.get_controller_from_obj(pod, trust_domain)
        if not controller:
            return None

        slice_name = controller.metadata.name
        # metadata.labels is None (not absent) when the object has no labels, so normalize to an empty dict
        # before testing membership.
        labels = getattr(controller.metadata, "labels", None) or {}
        if "xos_slice_name" in labels:
            # Someone has labeled the controller with an xos slice name. Use it.
            slice_name = labels["xos_slice_name"]

        # NOTE(review): lookup is by name only; same-named controllers in different namespaces share one Slice.
        existing_slices = Slice.objects.filter(name=slice_name)
        if existing_slices:
            return existing_slices[0]

        # TODO(smbaker): atomicity
        s = Slice(name=slice_name, site=Site.objects.first(),
                  trust_domain=trust_domain,
                  principal=principal,
                  backend_handle=self.obj_to_handle(controller),
                  controller_kind=controller.kind,
                  xos_managed=False)
        s.save()
        return s

    def get_trustdomain_from_pod(self, pod, owner_service):
        """ Given a pod, determine which XOS TrustDomain goes with it (one TrustDomain per namespace).
            If the TrustDomain doesn't exist, create it, owned by owner_service.
        """
        existing_trustdomains = TrustDomain.objects.filter(name=pod.metadata.namespace)
        if existing_trustdomains:
            return existing_trustdomains[0]

        k8s_trust_domain = self.v1core.read_namespace(pod.metadata.namespace)

        # TODO(smbaker): atomicity
        t = TrustDomain(name=pod.metadata.namespace,
                        xos_managed=False,
                        owner=owner_service,
                        backend_handle=self.obj_to_handle(k8s_trust_domain))
        t.save()
        return t

    def get_principal_from_pod(self, pod, trust_domain):
        """ Given a pod, determine which XOS Principal goes with it, based on the pod's service account.
            If the Principal doesn't exist, create it. Returns None if the pod has no service account.
        """
        principal_name = getattr(pod.spec, "service_account", None)
        if not principal_name:
            return None
        # NOTE(review): lookup is by name only, so e.g. the "default" service account of different namespaces
        # maps to whichever Principal was created first. Consider also filtering on trust_domain -- confirm the
        # modelaccessor supports that filter.
        existing_principals = Principal.objects.filter(name=principal_name)
        if existing_principals:
            return existing_principals[0]

        k8s_service_account = self.v1core.read_namespaced_service_account(principal_name, trust_domain.name)

        # TODO(smbaker): atomicity
        p = Principal(name=principal_name,
                      trust_domain=trust_domain,
                      xos_managed=False,
                      backend_handle=self.obj_to_handle(k8s_service_account))
        p.save()
        return p

    def get_image_from_pod(self, pod):
        """ Given a pod, determine which XOS Image goes with it.
            If the Image doesn't exist, create it.

            The first container's image reference is split into (name, tag) at the last ":". That ":" only counts
            as a tag separator if no "/" follows it, so a registry port such as "registry:5000/foo" is not mistaken
            for a tag. When there is no tag, "master" is assumed.

            Returns None if the pod has no containers.
        """
        containers = pod.spec.containers
        if not containers:
            return None

        # TODO(smbaker): Assumes all containers in a pod use the same image. Valid assumption for now?
        image_ref = containers[0].image
        name, sep, tag = image_ref.rpartition(":")
        if not sep or "/" in tag:
            # No tag present. Is assuming a default necessary?
            name = image_ref
            tag = "master"

        existing_images = Image.objects.filter(name=name, tag=tag, kind="container")
        if existing_images:
            return existing_images[0]

        i = Image(name=name, tag=tag, kind="container", xos_managed=False)
        i.save()
        return i

    def send_notification(self, xos_pod, k8s_pod, status):
        """ Publish a pod event to the "xos.kubernetes.pod-details" Kafka topic.

            Does nothing when no Kafka producer is configured. k8s_pod may be None (e.g. for "deleted" events), in
            which case labels and network interfaces are omitted from the event.
        """
        if not self.kafka_producer:
            return

        event = {"status": status,
                 "name": xos_pod.name,
                 "producer": "k8s-sync"}

        if xos_pod.id:
            event["kubernetesserviceinstance_id"] = xos_pod.id

        if k8s_pod:
            event["labels"] = k8s_pod.metadata.labels

            if k8s_pod.status.pod_ip:
                event["netinterfaces"] = [{"name": "primary",
                                           "addresses": [k8s_pod.status.pod_ip]}]

        # KafkaProducer.send() requires bytes when no value_serializer is configured. json.dumps() emits ASCII by
        # default, so encoding is lossless on both Python 2 and Python 3.
        self.kafka_producer.send("xos.kubernetes.pod-details", json.dumps(event).encode("utf-8"))
        self.kafka_producer.flush()

    def pull_records(self):
        """ Reconcile XOS KubernetesServiceInstances with the pods currently present in Kubernetes.

            - A Kubernetes pod without an XOS object of the same name gets one created, along with its supporting
              TrustDomain, Principal, Slice and Image.
            - Pod IP address changes are copied into XOS.
            - Pending Kafka events (need_event) are sent.
            - XOS objects whose pod no longer exists are deleted, unless they are xos_managed.

            Failures on individual pods are logged and do not stop processing of the remaining pods.
            Raises Exception if there is not exactly one KubernetesService.
        """
        # Read all pods from Kubernetes, store them in k8s_pods_by_name.
        # NOTE(review): keyed by pod name only, so same-named pods in different namespaces collide.
        k8s_pods_by_name = {}
        ret = self.v1core.list_pod_for_all_namespaces(watch=False)
        for item in ret.items:
            k8s_pods_by_name[item.metadata.name] = item

        # Read all pods from XOS, store them in xos_pods_by_name
        xos_pods_by_name = {}
        for existing_pod in KubernetesServiceInstance.objects.all():
            xos_pods_by_name[existing_pod.name] = existing_pod

        kubernetes_services = KubernetesService.objects.all()
        if len(kubernetes_services) == 0:
            raise Exception("There are no Kubernetes Services yet")
        if len(kubernetes_services) > 1:
            # Simplifying assumption -- there is only one Kubernetes Service
            raise Exception("There are too many Kubernetes Services")
        kubernetes_service = kubernetes_services[0]

        # For each k8s pod, see if there is an xos pod. If there is not, then create the xos pod.
        for (k, pod) in k8s_pods_by_name.items():
            try:
                if k not in xos_pods_by_name:
                    trust_domain = self.get_trustdomain_from_pod(pod, owner_service=kubernetes_service)
                    if not trust_domain:
                        log.warning("Unable to determine trust_domain for %s" % k)
                        continue

                    principal = self.get_principal_from_pod(pod, trust_domain)
                    slice = self.get_slice_from_pod(pod, trust_domain=trust_domain, principal=principal)
                    image = self.get_image_from_pod(pod)

                    if not slice:
                        log.warning("Unable to determine slice for %s" % k)
                        continue

                    xos_pod = KubernetesServiceInstance(name=k,
                                                        pod_ip=pod.status.pod_ip,
                                                        owner=kubernetes_service,
                                                        slice=slice,
                                                        image=image,
                                                        backend_handle=self.obj_to_handle(pod),
                                                        xos_managed=False,
                                                        need_event=True)
                    xos_pod.save()
                    xos_pods_by_name[k] = xos_pod
                    log.info("Created XOS POD %s" % xos_pod.name)

                xos_pod = xos_pods_by_name[k]

                # Check to see if the ip address has changed. This can happen for pods that are managed by XOS. The IP
                # isn't available immediately when XOS creates a pod, but shows up a bit later. So handle that case
                # here.
                if (pod.status.pod_ip is not None) and (xos_pod.pod_ip != pod.status.pod_ip):
                    xos_pod.pod_ip = pod.status.pod_ip
                    xos_pod.need_event = True  # Trigger a new kafka event
                    xos_pod.save(update_fields=["pod_ip", "need_event"])
                    log.info("Updated XOS POD %s" % xos_pod.name)

                # Check to see if we haven't sent the Kafka event yet. It's possible Kafka could be down. If
                # so, then we'll try to send the event again later.
                if xos_pod.need_event and self.kafka_producer:
                    # Once any event has been sent for this pod, later events are updates. Checking only for
                    # "created" would report a second IP change as "created" again.
                    if xos_pod.last_event_sent in ("created", "updated"):
                        event_kind = "updated"
                    else:
                        event_kind = "created"

                    self.send_notification(xos_pod, pod, event_kind)

                    xos_pod.need_event = False
                    xos_pod.last_event_sent = event_kind
                    xos_pod.save(update_fields=["need_event", "last_event_sent"])

            except Exception:
                log.exception("Failed to process k8s pod", k=k, pod=pod)

        # For each xos pod, see if there is no k8s pod. If that's the case, then the pod must have been deleted.
        for (k, xos_pod) in xos_pods_by_name.items():
            try:
                if k not in k8s_pods_by_name:
                    if xos_pod.xos_managed:
                        # Should we do something so it gets re-created by the syncstep?
                        pass
                    else:
                        self.send_notification(xos_pod, None, "deleted")
                        xos_pod.delete()
                        log.info("Deleted XOS POD %s" % k)
            except Exception:
                log.exception("Failed to process xos pod", k=k, xos_pod=xos_pod)
328 log.exception("Failed to process xos pod", k=k, xos_pod=xos_pod)