blob: 36562b34ee7f8d6dcc8b9753f39eaec504cb2304 [file] [log] [blame]
Scott Baker82b2b082018-04-16 16:02:14 -07001# Copyright 2017-present Open Networking Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""
16 pull_pods.py
17
    Implements a syncstep to pull information about pods from Kubernetes.
19"""
20
Scott Baker987748d2018-08-09 16:14:11 -070021import json
22
Scott Baker82b2b082018-04-16 16:02:14 -070023from synchronizers.new_base.pullstep import PullStep
Scott Baker3fd18e52018-04-17 09:18:21 -070024from synchronizers.new_base.modelaccessor import KubernetesServiceInstance, KubernetesService, Slice, Principal, \
25 TrustDomain, Site, Image
Scott Baker82b2b082018-04-16 16:02:14 -070026
27from xosconfig import Config
28from multistructlog import create_logger
Zack Williams3dc97602018-09-13 22:33:26 -070029from xoskafka import XOSKafkaProducer
Scott Baker82b2b082018-04-16 16:02:14 -070030
# Module-level structured logger, configured from the XOS "logging" config section.
log = create_logger(Config().get('logging'))
32
Zack Williams3dc97602018-09-13 22:33:26 -070033
class KubernetesServiceInstancePullStep(PullStep):
    """
    KubernetesServiceInstancePullStep

    Pull pod-related information from Kubernetes. Each pod we find is used to create a KubernetesServiceInstance
    if one does not already exist. Additional support objects (Slices, TrustDomains, Principals) may be created
    as necessary to fill the required dependencies of the KubernetesServiceInstance.
    """

    def __init__(self):
        super(KubernetesServiceInstancePullStep, self).__init__(observed_model=KubernetesServiceInstance)

        self.init_kubernetes_client()

    def init_kubernetes_client(self):
        """ Create the Kubernetes API client objects used by this pull step.

            NOTE(review): the kubernetes import is local to this method, presumably so the
            module can be imported on hosts without the kubernetes client library installed
            -- confirm before hoisting it to the top of the file.
        """
        from kubernetes import client as kubernetes_client, config as kubernetes_config
        kubernetes_config.load_incluster_config()
        self.v1core = kubernetes_client.CoreV1Api()
        self.v1apps = kubernetes_client.AppsV1Api()
        self.v1batch = kubernetes_client.BatchV1Api()

    def obj_to_handle(self, obj):
        """ Convert a Kubernetes resource into a handle that we can use to uniquely identify the object within
            Kubernetes.
        """
        return obj.metadata.self_link

    def read_obj_kind(self, kind, name, trust_domain):
        """ Given an object kind and name, read it from Kubernetes.

            Returns the Kubernetes resource, or None if `kind` is not a kind we know how to read.
        """
        if kind == "ReplicaSet":
            resource = self.v1apps.read_namespaced_replica_set(name, trust_domain.name)
        elif kind == "StatefulSet":
            resource = self.v1apps.read_namespaced_stateful_set(name, trust_domain.name)
        elif kind == "DaemonSet":
            resource = self.v1apps.read_namespaced_daemon_set(name, trust_domain.name)
        elif kind == "Deployment":
            resource = self.v1apps.read_namespaced_deployment(name, trust_domain.name)
        elif kind == "Job":
            resource = self.v1batch.read_namespaced_job(name, trust_domain.name)
        else:
            resource = None
        return resource

    def get_controller_from_obj(self, obj, trust_domain, depth=0):
        """ Given an object, search for its controller. Strategy is to walk backward until we find some object that
            is marked as a controller, but does not have any owners.

            This seems adequate to cover the case where a ReplicaSet is owned by a Deployment, and we want to skip
            over the ReplicaSet and return the Deployment.
        """

        owner_references = obj.metadata.owner_references
        if not owner_references:
            if (depth == 0):
                # If depth is zero, then we're still looking at the object, not a controller.
                return None
            return obj

        for owner_reference in owner_references:
            if not getattr(owner_reference, "controller", False):
                continue
            owner = self.read_obj_kind(owner_reference.kind, owner_reference.name, trust_domain)
            if not owner:
                # Failed to fetch the owner, probably because the owner's kind is something we do not understand. An
                # example is the etcd-cluster pod, which is owned by a deployment of kind "EtcdCluster".
                log.debug("failed to fetch owner", owner_reference=owner_reference)
                continue
            controller = self.get_controller_from_obj(owner, trust_domain, depth+1)
            if controller:
                return controller

        return None

    def get_slice_from_pod(self, pod, trust_domain, principal):
        """ Given a pod, determine which XOS Slice goes with it
            If the Slice doesn't exist, create it.
        """
        controller = self.get_controller_from_obj(pod, trust_domain)
        if not controller:
            return None

        slice_name = controller.metadata.name
        if hasattr(controller.metadata, "labels") and controller.metadata.labels is not None:
            if "xos_slice_name" in controller.metadata.labels:
                # Someone has labeled the controller with an xos slice name. Use it.
                slice_name = controller.metadata.labels["xos_slice_name"]

        existing_slices = Slice.objects.filter(name=slice_name)
        if not existing_slices:
            # TODO(smbaker): atomicity
            s = Slice(name=slice_name, site=Site.objects.first(),
                      trust_domain=trust_domain,
                      principal=principal,
                      backend_handle=self.obj_to_handle(controller),
                      controller_kind=controller.kind,
                      xos_managed=False)
            s.save()
            return s
        else:
            return existing_slices[0]

    def get_trustdomain_from_pod(self, pod, owner_service):
        """ Given a pod, determine which XOS TrustDomain goes with it
            If the TrustDomain doesn't exist, create it.
        """
        existing_trustdomains = TrustDomain.objects.filter(name=pod.metadata.namespace)
        if not existing_trustdomains:
            k8s_trust_domain = self.v1core.read_namespace(pod.metadata.namespace)

            # TODO(smbaker): atomicity
            t = TrustDomain(name=pod.metadata.namespace,
                            xos_managed=False,
                            owner=owner_service,
                            backend_handle=self.obj_to_handle(k8s_trust_domain))
            t.save()
            return t
        else:
            return existing_trustdomains[0]

    def get_principal_from_pod(self, pod, trust_domain):
        """ Given a pod, determine which XOS Principal goes with it
            If the Principal doesn't exist, create it.

            Returns None for pods that carry no service account.
        """
        principal_name = getattr(pod.spec, "service_account", None)
        if not principal_name:
            return None
        existing_principals = Principal.objects.filter(name=principal_name)
        if not existing_principals:
            k8s_service_account = self.v1core.read_namespaced_service_account(principal_name, trust_domain.name)

            # TODO(smbaker): atomicity
            p = Principal(name=principal_name,
                          trust_domain=trust_domain,
                          xos_managed=False,
                          backend_handle=self.obj_to_handle(k8s_service_account))
            p.save()
            return p
        else:
            return existing_principals[0]

    def get_image_from_pod(self, pod):
        """ Given a pod, determine which XOS Image goes with it
            If the Image doesn't exist, create it.
        """
        containers = pod.spec.containers
        if containers:
            # TODO(smbaker): Assumes all containers in a pod use the same image. Valid assumption for now?
            container = containers[0]
            # Split "repo/name:tag" into name and tag. A colon only denotes a tag if it occurs
            # after the last "/"; otherwise it belongs to a registry host:port, as in
            # "registry.example.com:5000/image", which has no tag at all.
            (name, sep, tag) = container.image.rpartition(":")
            if (not sep) or ("/" in tag):
                # No tag present. Is assuming a default necessary?
                name = container.image
                tag = "master"

            existing_images = Image.objects.filter(name=name, tag=tag, kind="container")
            if not existing_images:
                i = Image(name=name, tag=tag, kind="container", xos_managed=False)
                i.save()
                return i
            else:
                return existing_images[0]
        else:
            return None

    def send_notification(self, xos_pod, k8s_pod, status):
        """ Publish a pod-details event to Kafka.

            xos_pod: the KubernetesServiceInstance the event concerns
            k8s_pod: the corresponding Kubernetes pod, or None (e.g. for deletions)
            status: event kind, one of "created", "updated" or "deleted"
        """

        event = {"status": status,
                 "name": xos_pod.name,
                 "producer": "k8s-sync"}

        if xos_pod.id:
            event["kubernetesserviceinstance_id"] = xos_pod.id

        if k8s_pod:
            event["labels"] = k8s_pod.metadata.labels

            if k8s_pod.status.pod_ip:
                event["netinterfaces"] = [{"name": "primary",
                                           "addresses": [k8s_pod.status.pod_ip]}]

        topic = "xos.kubernetes.pod-details"
        key = xos_pod.name
        # default=repr serializes any value json can't handle (e.g. label objects) as its
        # repr() string instead of raising TypeError.
        value = json.dumps(event, default=lambda o: repr(o))

        XOSKafkaProducer.produce(topic, key, value)

    def pull_records(self):
        """ Reconcile Kubernetes pods with XOS KubernetesServiceInstances.

            For every Kubernetes pod not yet known to XOS, create a KubernetesServiceInstance
            together with any supporting TrustDomain/Principal/Slice/Image objects. Update IP
            addresses that have changed, send pending Kafka notifications, and delete
            non-xos-managed XOS pods whose Kubernetes counterpart has disappeared.
        """
        # Read all pods from Kubernetes, store them in k8s_pods_by_name
        k8s_pods_by_name = {}
        ret = self.v1core.list_pod_for_all_namespaces(watch=False)
        for item in ret.items:
            k8s_pods_by_name[item.metadata.name] = item

        # Read all pods from XOS, store them in xos_pods_by_name
        xos_pods_by_name = {}
        existing_pods = KubernetesServiceInstance.objects.all()
        for pod in existing_pods:
            xos_pods_by_name[pod.name] = pod

        kubernetes_services = KubernetesService.objects.all()
        if len(kubernetes_services) == 0:
            raise Exception("There are no Kubernetes Services yet")
        if len(kubernetes_services) > 1:
            # Simplifying assumption -- there is only one Kubernetes Service
            raise Exception("There are too many Kubernetes Services")
        kubernetes_service = kubernetes_services[0]

        # For each k8s pod, see if there is an xos pod. If there is not, then create the xos pod.
        for (k, pod) in k8s_pods_by_name.items():
            try:
                if k not in xos_pods_by_name:
                    trust_domain = self.get_trustdomain_from_pod(pod, owner_service=kubernetes_service)
                    if not trust_domain:
                        # All kubernetes pods should belong to a namespace. If we can't find the namespace, then
                        # something is very wrong in K8s.
                        log.warning("Unable to determine trust_domain for pod %s. Ignoring." % k)
                        continue

                    principal = self.get_principal_from_pod(pod, trust_domain)
                    slice = self.get_slice_from_pod(pod, trust_domain=trust_domain, principal=principal)
                    image = self.get_image_from_pod(pod)

                    if not slice:
                        # We could get here if the pod doesn't have a controller, or if the controller is of a kind
                        # that we don't understand (such as the Etcd controller). If so, the pod is not something we
                        # are interested in.
                        log.debug("Unable to determine slice for pod %s. Ignoring." % k)
                        continue

                    xos_pod = KubernetesServiceInstance(name=k,
                                                        pod_ip=pod.status.pod_ip,
                                                        owner=kubernetes_service,
                                                        slice=slice,
                                                        image=image,
                                                        backend_handle=self.obj_to_handle(pod),
                                                        xos_managed=False,
                                                        need_event=True)
                    xos_pod.save()
                    xos_pods_by_name[k] = xos_pod
                    log.info("Created XOS POD %s" % xos_pod.name)

                xos_pod = xos_pods_by_name[k]

                # Check to see if the ip address has changed. This can happen for pods that are managed by XOS. The IP
                # isn't available immediately when XOS creates a pod, but shows up a bit later. So handle that case
                # here.
                if (pod.status.pod_ip is not None) and (xos_pod.pod_ip != pod.status.pod_ip):
                    xos_pod.pod_ip = pod.status.pod_ip
                    xos_pod.need_event = True  # Trigger a new kafka event
                    xos_pod.save(update_fields=["pod_ip", "need_event"])
                    log.info("Updated XOS POD %s" % xos_pod.name)

                # Check to see if we haven't sent the Kafka event yet. It's possible Kafka could be down. If
                # so, then we'll try to send the event again later.
                if (xos_pod.need_event):
                    if xos_pod.last_event_sent == "created":
                        event_kind = "updated"
                    else:
                        event_kind = "created"

                    self.send_notification(xos_pod, pod, event_kind)

                    xos_pod.need_event = False
                    xos_pod.last_event_sent = event_kind
                    xos_pod.save(update_fields=["need_event", "last_event_sent"])

            except Exception:
                # Narrowed from a bare `except:` so that KeyboardInterrupt/SystemExit still
                # propagate; one bad pod must not abort the whole pull.
                log.exception("Failed to process k8s pod", k=k, pod=pod)

        # For each xos pod, see if there is no k8s pod. If that's the case, then the pod must have been deleted.
        for (k, xos_pod) in xos_pods_by_name.items():
            try:
                if k not in k8s_pods_by_name:
                    if (xos_pod.xos_managed):
                        # Should we do something so it gets re-created by the syncstep?
                        pass
                    else:
                        self.send_notification(xos_pod, None, "deleted")
                        xos_pod.delete()
                        log.info("Deleted XOS POD %s" % k)
            except Exception:
                # Narrowed from a bare `except:` -- see note above in the creation loop.
                log.exception("Failed to process xos pod", k=k, xos_pod=xos_pod)