blob: b1cc71d0f52397fc34c65d0063dd7a98b1d49096 [file] [log] [blame]
Scott Baker82b2b082018-04-16 16:02:14 -07001# Copyright 2017-present Open Networking Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
"""
    pull_pods.py

    Implements a syncstep to pull information about pods from Kubernetes.
"""
20
21from synchronizers.new_base.pullstep import PullStep
Scott Baker3fd18e52018-04-17 09:18:21 -070022from synchronizers.new_base.modelaccessor import KubernetesServiceInstance, KubernetesService, Slice, Principal, \
23 TrustDomain, Site, Image
Scott Baker82b2b082018-04-16 16:02:14 -070024
25from xosconfig import Config
26from multistructlog import create_logger
27
28from kubernetes import client as kubernetes_client, config as kubernetes_config
29
# Module-level logger, created from the 'logging' entry of Config().
log = create_logger(Config().get('logging'))
31
class KubernetesServiceInstancePullStep(PullStep):
    """
        KubernetesServiceInstancePullStep

        Pull pod-related information from Kubernetes. Each pod we find is used to create a KubernetesServiceInstance
        if one does not already exist. Additional support objects (Slices, TrustDomains, Principals, Images) may be
        created as necessary to fill the required dependencies of the KubernetesServiceInstance.
    """

    # Maps a Kubernetes controller kind to (name of the API client attribute on self, name of its read method).
    # Kinds that are not listed here cannot be resolved by read_obj_kind().
    _KIND_READERS = {
        "ReplicaSet": ("v1apps", "read_namespaced_replica_set"),
        "StatefulSet": ("v1apps", "read_namespaced_stateful_set"),
        "DaemonSet": ("v1apps", "read_namespaced_daemon_set"),
        "Deployment": ("v1apps", "read_namespaced_deployment"),
        "Job": ("v1batch", "read_namespaced_job"),
    }

    # Tag recorded for an Image when the container's image reference carries no explicit tag.
    _DEFAULT_IMAGE_TAG = "master"

    def __init__(self):
        """ Register KubernetesServiceInstance as the observed model and build the Kubernetes API clients.

            The in-cluster Kubernetes configuration is loaded before the clients are constructed.
        """
        super(KubernetesServiceInstancePullStep, self).__init__(observed_model=KubernetesServiceInstance)

        kubernetes_config.load_incluster_config()
        self.v1core = kubernetes_client.CoreV1Api()
        self.v1apps = kubernetes_client.AppsV1Api()
        self.v1batch = kubernetes_client.BatchV1Api()

    def obj_to_handle(self, obj):
        """ Convert a Kubernetes resource into a handle that we can use to uniquely identify the object within
            Kubernetes.

            The handle is the resource's metadata.self_link.
        """
        # NOTE(review): self_link is reportedly deprecated/unpopulated on newer Kubernetes releases -- confirm
        # against the cluster versions this synchronizer must support.
        return obj.metadata.self_link

    def read_obj_kind(self, kind, name, trust_domain):
        """ Given an object kind and name, read it from Kubernetes.

            The namespace used for the read is trust_domain.name.

            Returns the resource for the supported kinds (ReplicaSet, StatefulSet, DaemonSet, Deployment, Job),
            or None if the kind is not one we know how to read.
        """
        reader = self._KIND_READERS.get(kind)
        if reader is None:
            return None

        # Resolve the client and method lazily so that only the API that is actually needed is touched.
        (api_attr, method_name) = reader
        read_method = getattr(getattr(self, api_attr), method_name)
        return read_method(name, trust_domain.name)

    def get_controller_from_obj(self, obj, trust_domain, depth=0):
        """ Given an object, search for its controller. Strategy is to walk backward until we find some object that
            is marked as a controller, but does not have any owners.

            This seems adequate to cover the case where ReplicaSet is owned by a Deployment, and we want to skip over
            the ReplicaSet and return the Deployment.

            depth is the number of owner links followed so far; callers outside of the recursion leave it at 0.

            Returns the controller resource, or None if no controller could be found.
        """
        owner_references = obj.metadata.owner_references
        if not owner_references:
            # If depth is zero, then we're still looking at the object itself, not a controller.
            return None if depth == 0 else obj

        for owner_reference in owner_references:
            # Only follow owners that are flagged as the managing controller.
            if not getattr(owner_reference, "controller", False):
                continue

            owner = self.read_obj_kind(owner_reference.kind, owner_reference.name, trust_domain)
            if not owner:
                log.warning("failed to fetch owner", owner_reference=owner_reference)
                continue

            controller = self.get_controller_from_obj(owner, trust_domain, depth + 1)
            if controller:
                return controller

        return None

    def get_slice_from_pod(self, pod, trust_domain, principal):
        """ Given a pod, determine which XOS Slice goes with it.
            If the Slice doesn't exist, create it.

            The slice name is the controller's "xos_slice_name" label if present, otherwise the controller's name.

            Returns the Slice, or None if the pod has no controller.
        """
        controller = self.get_controller_from_obj(pod, trust_domain)
        if not controller:
            return None

        # The labels attribute may exist but hold None when the controller is unlabeled, so normalize it to an
        # empty dict before looking anything up.
        labels = getattr(controller.metadata, "labels", None) or {}

        # If someone has labeled the controller with an xos slice name, use it.
        slice_name = labels.get("xos_slice_name", controller.metadata.name)

        # NOTE(review): the lookup is by name only; confirm whether controllers with the same name in different
        # namespaces are expected to share a Slice.
        existing_slices = Slice.objects.filter(name=slice_name)
        if existing_slices:
            return existing_slices[0]

        # TODO(smbaker): atomicity
        new_slice = Slice(name=slice_name,
                          site=Site.objects.first(),
                          trust_domain=trust_domain,
                          principal=principal,
                          backend_handle=self.obj_to_handle(controller),
                          controller_kind=controller.kind,
                          xos_managed=False)
        new_slice.save()
        return new_slice

    def get_trustdomain_from_pod(self, pod, owner_service):
        """ Given a pod, determine which XOS TrustDomain goes with it.
            If the TrustDomain doesn't exist, create it.

            The TrustDomain is named after the pod's namespace and is owned by owner_service.
        """
        namespace = pod.metadata.namespace

        existing_trustdomains = TrustDomain.objects.filter(name=namespace)
        if existing_trustdomains:
            return existing_trustdomains[0]

        # Read the namespace from Kubernetes so that we can record its handle.
        k8s_trust_domain = self.v1core.read_namespace(namespace)

        # TODO(smbaker): atomicity
        new_trust_domain = TrustDomain(name=namespace,
                                       xos_managed=False,
                                       owner=owner_service,
                                       backend_handle=self.obj_to_handle(k8s_trust_domain))
        new_trust_domain.save()
        return new_trust_domain

    def get_principal_from_pod(self, pod, trust_domain):
        """ Given a pod, determine which XOS Principal goes with it.
            If the Principal doesn't exist, create it.

            The Principal is named after the pod's service account.

            Returns the Principal, or None if the pod has no service account.
        """
        principal_name = getattr(pod.spec, "service_account", None)
        if not principal_name:
            return None

        # NOTE(review): the lookup is by name only, so same-named service accounts in different namespaces would
        # resolve to one Principal -- confirm whether that is intended.
        existing_principals = Principal.objects.filter(name=principal_name)
        if existing_principals:
            return existing_principals[0]

        # Read the service account from Kubernetes so that we can record its handle.
        k8s_service_account = self.v1core.read_namespaced_service_account(principal_name, trust_domain.name)

        # TODO(smbaker): atomicity
        new_principal = Principal(name=principal_name,
                                  trust_domain=trust_domain,
                                  xos_managed=False,
                                  backend_handle=self.obj_to_handle(k8s_service_account))
        new_principal.save()
        return new_principal

    @staticmethod
    def _split_image_reference(image_ref, default_tag):
        """ Split a container image reference into (name, tag).

            Only a colon in the last path component separates the tag, so a registry port such as
            "host:5000/img" is kept as part of the name. When there is no tag, default_tag is returned.
        """
        last_component = image_ref.rsplit("/", 1)[-1]
        if ":" in last_component:
            (name, tag) = image_ref.rsplit(":", 1)
            return (name, tag)
        return (image_ref, default_tag)

    def get_image_from_pod(self, pod):
        """ Given a pod, determine which XOS Image goes with it.
            If the Image doesn't exist, create it.

            Returns the Image, or None if the pod has no containers.
        """
        containers = pod.spec.containers
        if not containers:
            return None

        # TODO(smbaker): Assumes all containers in a pod use the same image. Valid assumption for now?
        container = containers[0]

        # Is assuming a default tag necessary?
        (name, tag) = self._split_image_reference(container.image, self._DEFAULT_IMAGE_TAG)

        existing_images = Image.objects.filter(name=name, tag=tag, kind="container")
        if existing_images:
            return existing_images[0]

        new_image = Image(name=name, tag=tag, kind="container", xos_managed=False)
        new_image.save()
        return new_image

    def _create_xos_pod(self, name, pod, kubernetes_service):
        """ Create and save the KubernetesServiceInstance for a Kubernetes pod that XOS does not know about yet.

            Returns the new KubernetesServiceInstance, or None (after logging a warning) if the pod's trust domain
            or slice could not be determined.
        """
        trust_domain = self.get_trustdomain_from_pod(pod, owner_service=kubernetes_service)
        if not trust_domain:
            log.warning("Unable to determine trust_domain for %s" % name)
            return None

        principal = self.get_principal_from_pod(pod, trust_domain)
        xos_slice = self.get_slice_from_pod(pod, trust_domain=trust_domain, principal=principal)
        image = self.get_image_from_pod(pod)

        if not xos_slice:
            log.warning("Unable to determine slice for %s" % name)
            return None

        xos_pod = KubernetesServiceInstance(name=name,
                                            pod_ip=pod.status.pod_ip,
                                            owner=kubernetes_service,
                                            slice=xos_slice,
                                            image=image,
                                            backend_handle=self.obj_to_handle(pod),
                                            xos_managed=False)
        xos_pod.save()
        log.info("Created XOS POD %s" % xos_pod.name)
        return xos_pod

    def pull_records(self):
        """ Reconcile the KubernetesServiceInstances in XOS with the pods that exist in Kubernetes.

            Pods found in Kubernetes but not in XOS are created in XOS, pod IP changes are copied into XOS, and
            XOS pods that are not xos_managed and no longer exist in Kubernetes are deleted.

            Raises Exception if there is not exactly one KubernetesService. A failure while processing one
            Kubernetes pod is logged and does not stop the remaining pods from being processed.
        """
        # Read all pods from Kubernetes, store them in k8s_pods_by_name
        # NOTE(review): pods are keyed by name alone although they are listed across all namespaces; same-named
        # pods in different namespaces would shadow each other -- confirm whether that can occur.
        ret = self.v1core.list_pod_for_all_namespaces(watch=False)
        k8s_pods_by_name = {}
        for item in ret.items:
            k8s_pods_by_name[item.metadata.name] = item

        # Read all pods from XOS, store them in xos_pods_by_name
        xos_pods_by_name = {}
        for existing_pod in KubernetesServiceInstance.objects.all():
            xos_pods_by_name[existing_pod.name] = existing_pod

        kubernetes_services = KubernetesService.objects.all()
        if len(kubernetes_services) == 0:
            raise Exception("There are no Kubernetes Services yet")
        if len(kubernetes_services) > 1:
            # Simplifying assumption -- there is only one Kubernetes Service
            raise Exception("There are too many Kubernetes Services")
        kubernetes_service = kubernetes_services[0]

        # For each k8s pod, see if there is an xos pod. If there is not, then create the xos pod.
        for (k, pod) in k8s_pods_by_name.items():
            try:
                if k not in xos_pods_by_name:
                    created_pod = self._create_xos_pod(k, pod, kubernetes_service)
                    if created_pod is None:
                        continue
                    xos_pods_by_name[k] = created_pod

                # Check to see if the ip address has changed. This can happen for pods that are managed by XOS.
                # The IP isn't available immediately when XOS creates a pod, but shows up a bit later. So handle
                # that case here.
                xos_pod = xos_pods_by_name[k]
                if (pod.status.pod_ip is not None) and (xos_pod.pod_ip != pod.status.pod_ip):
                    xos_pod.pod_ip = pod.status.pod_ip
                    xos_pod.save(update_fields=["pod_ip"])
            except Exception:
                # Catch Exception rather than everything, so that KeyboardInterrupt and SystemExit still
                # propagate and the process can be stopped while pulling.
                log.exception("Failed to process k8s pod", k=k, pod=pod)

        # For each xos pod, see if there is no k8s pod. If that's the case, then the pod must have been deleted.
        for (k, xos_pod) in xos_pods_by_name.items():
            if k in k8s_pods_by_name:
                continue

            if xos_pod.xos_managed:
                # Should we do something so it gets re-created by the syncstep?
                pass
            else:
                xos_pod.delete()
                log.info("Deleted XOS POD %s" % k)