blob: 140ed94a89eac175ee4dcdd2d3663c52ffe62217 [file] [log] [blame]
Scott Baker82b2b082018-04-16 16:02:14 -07001# Copyright 2017-present Open Networking Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""
16 pull_pods.py
17
    Implements a syncstep to pull information about pods from Kubernetes.
19"""
20
Scott Baker987748d2018-08-09 16:14:11 -070021import json
22
Scott Bakera30fae72019-02-01 16:14:43 -080023from xossynchronizer.pull_steps.pullstep import PullStep
24from xossynchronizer.modelaccessor import KubernetesServiceInstance, KubernetesService, Slice, Principal, \
Scott Baker3fd18e52018-04-17 09:18:21 -070025 TrustDomain, Site, Image
Scott Baker82b2b082018-04-16 16:02:14 -070026
27from xosconfig import Config
28from multistructlog import create_logger
Zack Williams3dc97602018-09-13 22:33:26 -070029from xoskafka import XOSKafkaProducer
Scott Baker8b725f52019-06-11 16:14:39 -070030from helpers import debug_once
Scott Baker82b2b082018-04-16 16:02:14 -070031
# Module-wide structured logger, configured from the 'logging' section of the XOS config.
_logging_config = Config().get('logging')
log = create_logger(_logging_config)
33
Zack Williams3dc97602018-09-13 22:33:26 -070034
class KubernetesServiceInstancePullStep(PullStep):
    """
    KubernetesServiceInstancePullStep

    Pull pod-related information from Kubernetes. Each pod we find is used to create a KubernetesServiceInstance
    if one does not already exist. Additional support objects (Slices, TrustDomains, Principals, Images) may be
    created as necessary to fill the required dependencies of the KubernetesServiceInstance.
    """

    def __init__(self, *args, **kwargs):
        super(KubernetesServiceInstancePullStep, self).__init__(*args, observed_model=KubernetesServiceInstance,
                                                                **kwargs)

        self.init_kubernetes_client()

    def init_kubernetes_client(self):
        """ Create the Kubernetes API clients (core, apps, batch) used by this step.

            Loads the in-cluster Kubernetes configuration. The kubernetes package is imported inside this
            method rather than at module level.
        """
        from kubernetes import client as kubernetes_client, config as kubernetes_config
        kubernetes_config.load_incluster_config()
        self.v1core = kubernetes_client.CoreV1Api()
        self.v1apps = kubernetes_client.AppsV1Api()
        self.v1batch = kubernetes_client.BatchV1Api()

    def obj_to_handle(self, obj):
        """ Convert a Kubernetes resource into a handle that we can use to uniquely identify the object within
            Kubernetes.
        """
        # NOTE(review): this relies on metadata.self_link. Newer Kubernetes API servers no longer populate
        # selfLink, in which case this returns None -- confirm against the target cluster version.
        return obj.metadata.self_link

    def read_obj_kind(self, kind, name, trust_domain):
        """ Given an object kind and name, read it from Kubernetes.

        Args:
            kind: Kubernetes kind of the object ("ReplicaSet", "StatefulSet", "DaemonSet", "Deployment" or "Job").
            name: Name of the object.
            trust_domain: TrustDomain whose name is used as the Kubernetes namespace.

        Returns:
            The Kubernetes resource, or None if `kind` is not one of the kinds listed above.
        """
        readers = {
            "ReplicaSet": self.v1apps.read_namespaced_replica_set,
            "StatefulSet": self.v1apps.read_namespaced_stateful_set,
            "DaemonSet": self.v1apps.read_namespaced_daemon_set,
            "Deployment": self.v1apps.read_namespaced_deployment,
            "Job": self.v1batch.read_namespaced_job,
        }
        reader = readers.get(kind)
        if reader is None:
            return None
        return reader(name, trust_domain.name)

    def get_controller_from_obj(self, pod_name, obj, trust_domain, depth=0):
        """ Given an object, search for its controller. Strategy is to walk backward until we find some object that
            is marked as a controller, but does not have any owners.

            This seems adequate to cover the case where ReplicaSet is owned by a Deployment, and we want to skip over
            the ReplicaSet and return the Deployment.

        Args:
            pod_name: Name of the pod being resolved; used only for log messages.
            obj: Kubernetes object whose controller chain is walked.
            trust_domain: TrustDomain whose name is used as the namespace when reading owners.
            depth: Recursion depth; 0 means `obj` is the starting object itself.

        Returns:
            The top-most controller object, or None if none could be resolved.
        """
        owner_references = obj.metadata.owner_references
        if not owner_references:
            if depth == 0:
                # If depth is zero, then we're still looking at the object, not a controller.
                return None
            return obj

        for owner_reference in owner_references:
            if not getattr(owner_reference, "controller", False):
                continue
            owner = self.read_obj_kind(owner_reference.kind, owner_reference.name, trust_domain)
            if not owner:
                # Failed to fetch the owner, probably because the owner's kind is something we do not understand. An
                # example is the etcd-cluster pod, which is owned by a deployment of kind "EtcdCluster".
                debug_once("Pod %s: Failed to fetch owner" % pod_name, owner_reference=owner_reference)
                continue
            controller = self.get_controller_from_obj(pod_name, owner, trust_domain, depth + 1)
            if controller:
                return controller

        return None

    def get_slice_from_pod(self, pod_name, pod, trust_domain, principal):
        """ Given a pod, determine which XOS Slice goes with it.
            If the Slice doesn't exist, create it.

            The slice is named after the pod's controller, unless the controller carries an "xos_slice_name"
            label, in which case that label's value is used.

        Returns:
            The matching or newly created Slice, or None if the pod has no resolvable controller.
        """
        controller = self.get_controller_from_obj(pod_name, pod, trust_domain)
        if not controller:
            return None

        slice_name = controller.metadata.name
        labels = getattr(controller.metadata, "labels", None)
        if labels and "xos_slice_name" in labels:
            # Someone has labeled the controller with an xos slice name. Use it.
            slice_name = labels["xos_slice_name"]

        existing_slices = Slice.objects.filter(name=slice_name)
        if existing_slices:
            return existing_slices[0]

        # TODO(smbaker): atomicity
        new_slice = Slice(name=slice_name, site=Site.objects.first(),
                          trust_domain=trust_domain,
                          principal=principal,
                          backend_handle=self.obj_to_handle(controller),
                          controller_kind=controller.kind,
                          xos_managed=False)
        new_slice.save()
        return new_slice

    def get_trustdomain_from_pod(self, pod, owner_service):
        """ Given a pod, determine which XOS TrustDomain goes with it.
            If the TrustDomain doesn't exist, create it (named after the pod's namespace, owned by owner_service).
        """
        namespace = pod.metadata.namespace
        existing_trustdomains = TrustDomain.objects.filter(name=namespace)
        if existing_trustdomains:
            return existing_trustdomains[0]

        k8s_trust_domain = self.v1core.read_namespace(namespace)

        # TODO(smbaker): atomicity
        trust_domain = TrustDomain(name=namespace,
                                   xos_managed=False,
                                   owner=owner_service,
                                   backend_handle=self.obj_to_handle(k8s_trust_domain))
        trust_domain.save()
        return trust_domain

    def get_principal_from_pod(self, pod, trust_domain):
        """ Given a pod, determine which XOS Principal goes with it.
            If the Principal doesn't exist, create it from the pod's service account.

        Returns:
            The matching or newly created Principal, or None if the pod has no service account.
        """
        principal_name = getattr(pod.spec, "service_account", None)
        if not principal_name:
            return None

        # NOTE(review): the lookup matches on name only, so a service account name that exists in several
        # namespaces (e.g. "default") resolves to the first matching Principal regardless of trust_domain.
        # Consider also filtering on trust_domain once the Principal model's uniqueness rules are confirmed.
        existing_principals = Principal.objects.filter(name=principal_name)
        if existing_principals:
            return existing_principals[0]

        k8s_service_account = self.v1core.read_namespaced_service_account(principal_name, trust_domain.name)

        # TODO(smbaker): atomicity
        principal = Principal(name=principal_name,
                              trust_domain=trust_domain,
                              xos_managed=False,
                              backend_handle=self.obj_to_handle(k8s_service_account))
        principal.save()
        return principal

    def get_image_from_pod(self, pod):
        """ Given a pod, determine which XOS Image goes with it.
            If the Image doesn't exist, create it.

            The image reference of the pod's first container is split into name and tag at the last colon.
            A reference without a tag -- including one whose colon belongs to a registry port, such as
            "registry:5000/app" -- keeps the whole reference as its name and gets the tag "master".

        Returns:
            The matching or newly created Image, or None if the pod has no containers.
        """
        containers = pod.spec.containers
        if not containers:
            return None

        # TODO(smbaker): Assumes all containers in a pod use the same image. Valid assumption for now?
        image_ref = containers[0].image
        name, sep, tag = image_ref.rpartition(":")
        if not sep or "/" in tag:
            # No tag: either there is no colon at all, or the last colon is part of a "host:port/" prefix.
            # Is assuming a default necessary?
            name = image_ref
            tag = "master"
        # NOTE(review): digest references ("app@sha256:...") are split at the digest's colon, yielding the
        # name "app@sha256"; confirm whether digests need dedicated handling.

        # FIXME image.name is unique, but tag may differ. Update validation in the Image model so that the combination of name and tag is unique
        existing_images = Image.objects.filter(name=name, tag=tag, kind="container")
        if existing_images:
            return existing_images[0]

        image = Image(name=name, tag=tag, kind="container", xos_managed=False)
        image.save()
        return image

    def send_notification(self, xos_pod, k8s_pod, status):
        """ Publish a pod-details event for `xos_pod` on the "xos.kubernetes.pod-details" Kafka topic.

        Args:
            xos_pod: The KubernetesServiceInstance the event describes; its name is used as the Kafka key.
            k8s_pod: The corresponding Kubernetes pod, or None (as for deletions). When given, its labels and,
                if set, its pod IP are included in the event.
            status: Event kind, e.g. "created", "updated" or "deleted".
        """
        event = {"status": status,
                 "name": xos_pod.name,
                 "producer": "k8s-sync"}

        if xos_pod.id:
            event["kubernetesserviceinstance_id"] = xos_pod.id

        if k8s_pod:
            event["labels"] = k8s_pod.metadata.labels

            if k8s_pod.status.pod_ip:
                event["netinterfaces"] = [{"name": "primary",
                                           "addresses": [k8s_pod.status.pod_ip]}]

        topic = "xos.kubernetes.pod-details"
        key = xos_pod.name
        # Anything json cannot serialize natively (e.g. Kubernetes client objects) is emitted via repr().
        value = json.dumps(event, default=repr)

        XOSKafkaProducer.produce(topic, key, value)

    def _get_kubernetes_service(self):
        """ Return the single KubernetesService; raise Exception if there are none or more than one. """
        kubernetes_services = KubernetesService.objects.all()
        if len(kubernetes_services) == 0:
            raise Exception("There are no Kubernetes Services yet")
        if len(kubernetes_services) > 1:
            # Simplifying assumption -- there is only one Kubernetes Service
            raise Exception("There are too many Kubernetes Services")
        return kubernetes_services[0]

    def _create_xos_pod(self, pod_name, pod, kubernetes_service):
        """ Create and save a KubernetesServiceInstance for a Kubernetes pod that XOS does not know about yet.

        Returns:
            The new KubernetesServiceInstance, or None if the pod should be ignored (no trust domain or slice).
        """
        trust_domain = self.get_trustdomain_from_pod(pod, owner_service=kubernetes_service)
        if not trust_domain:
            # All kubernetes pods should belong to a namespace. If we can't find the namespace, then
            # something is very wrong in K8s.
            log.warning("Unable to determine trust_domain for pod %s. Ignoring." % pod_name)
            return None

        principal = self.get_principal_from_pod(pod, trust_domain)
        xos_slice = self.get_slice_from_pod(pod_name, pod, trust_domain=trust_domain, principal=principal)
        if not xos_slice:
            # We could get here if the pod doesn't have a controller, or if the controller is of a kind
            # that we don't understand (such as the Etcd controller). If so, the pod is not something we
            # are interested in.
            debug_once("Pod %s: Unable to determine slice. Ignoring." % pod_name)
            return None

        # Resolve the image only once we know the pod will be tracked, since get_image_from_pod may create
        # and save a new Image record.
        image = self.get_image_from_pod(pod)

        xos_pod = KubernetesServiceInstance(name=pod_name,
                                            pod_ip=pod.status.pod_ip,
                                            owner=kubernetes_service,
                                            slice=xos_slice,
                                            image=image,
                                            backend_handle=self.obj_to_handle(pod),
                                            xos_managed=False,
                                            need_event=True)
        xos_pod.save()
        log.info("Created XOS POD %s" % xos_pod.name)
        return xos_pod

    def _sync_xos_pod(self, xos_pod, pod):
        """ Bring an existing KubernetesServiceInstance up to date with its Kubernetes pod and send any pending event. """
        # Check to see if the ip address has changed. This can happen for pods that are managed by XOS. The IP
        # isn't available immediately when XOS creates a pod, but shows up a bit later. So handle that case
        # here.
        if (pod.status.pod_ip is not None) and (xos_pod.pod_ip != pod.status.pod_ip):
            xos_pod.pod_ip = pod.status.pod_ip
            xos_pod.need_event = True  # Trigger a new kafka event
            xos_pod.save(update_fields=["pod_ip", "need_event"])
            log.info("Updated XOS POD %s" % xos_pod.name)

        # Check to see if we haven't sent the Kafka event yet. It's possible Kafka could be down. If
        # so, then we'll try to send the event again later.
        if xos_pod.need_event:
            event_kind = "updated" if xos_pod.last_event_sent == "created" else "created"

            self.send_notification(xos_pod, pod, event_kind)

            xos_pod.need_event = False
            xos_pod.last_event_sent = event_kind
            xos_pod.save(update_fields=["need_event", "last_event_sent"])

    def pull_records(self):
        """ Reconcile XOS KubernetesServiceInstances with the pods currently present in Kubernetes.

            Pods found in Kubernetes but not in XOS are created in XOS (together with any supporting objects).
            Pods found in XOS but no longer in Kubernetes are deleted from XOS, unless they are XOS-managed.
            Kafka events are sent for creations, IP changes and deletions. A failure while processing one pod
            is logged and does not stop processing of the others.

        Raises:
            Exception: if there is not exactly one KubernetesService.
        """
        # Read all pods from Kubernetes, keyed by pod name.
        # NOTE(review): pod names are only unique within a namespace; same-named pods in different namespaces
        # collide here and only the last one listed is kept.
        ret = self.v1core.list_pod_for_all_namespaces(watch=False)
        k8s_pods_by_name = {item.metadata.name: item for item in ret.items}

        # Read all pods from XOS, keyed by name.
        xos_pods_by_name = {pod.name: pod for pod in KubernetesServiceInstance.objects.all()}

        kubernetes_service = self._get_kubernetes_service()

        # For each k8s pod, see if there is an xos pod. If there is not, then create the xos pod.
        for (k, pod) in k8s_pods_by_name.items():
            try:
                if k not in xos_pods_by_name:
                    xos_pod = self._create_xos_pod(k, pod, kubernetes_service)
                    if xos_pod is None:
                        continue
                    xos_pods_by_name[k] = xos_pod

                self._sync_xos_pod(xos_pods_by_name[k], pod)
            except Exception:
                log.exception("Failed to process k8s pod", k=k, pod=pod)

        # For each xos pod, see if there is no k8s pod. If that's the case, then the pod must have been deleted.
        for (k, xos_pod) in xos_pods_by_name.items():
            try:
                if k in k8s_pods_by_name:
                    continue
                if xos_pod.xos_managed:
                    # Should we do something so it gets re-created by the syncstep?
                    continue
                self.send_notification(xos_pod, None, "deleted")
                xos_pod.delete()
                log.info("Deleted XOS POD %s" % k)
            except Exception:
                log.exception("Failed to process xos pod", k=k, xos_pod=xos_pod)