blob: 3cbbbce60857ab0cd75818017d1f563215d95170 [file] [log] [blame]
Scott Baker82b2b082018-04-16 16:02:14 -07001# Copyright 2017-present Open Networking Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""
16 pull_pods.py
17
    Implements a syncstep to pull information about pods from Kubernetes.
19"""
20
21from synchronizers.new_base.pullstep import PullStep
Scott Baker3fd18e52018-04-17 09:18:21 -070022from synchronizers.new_base.modelaccessor import KubernetesServiceInstance, KubernetesService, Slice, Principal, \
23 TrustDomain, Site, Image
Scott Baker82b2b082018-04-16 16:02:14 -070024
25from xosconfig import Config
26from multistructlog import create_logger
27
# Module-wide logger, built by multistructlog from the 'logging' entry of the xosconfig Config.
# Call sites below pass structured key/value context (e.g. log.warning("...", owner_reference=...)).
log = create_logger(Config().get('logging'))
29
class KubernetesServiceInstancePullStep(PullStep):
    """
        KubernetesServiceInstancePullStep

        Pull pod-related information from Kubernetes. Each pod we find is used to create a KubernetesServiceInstance
        if one does not already exist. Additional support objects (Slices, TrustDomains, Principals, Images) may be
        created as necessary to fill the required dependencies of the KubernetesServiceInstance.
    """

    def __init__(self):
        super(KubernetesServiceInstancePullStep, self).__init__(observed_model=KubernetesServiceInstance)
        self.init_kubernetes_client()

    def init_kubernetes_client(self):
        """ Create the Kubernetes API clients used by this step.

            Configuration comes from kubernetes_config.load_incluster_config(), i.e. the credentials that are
            available when the synchronizer runs inside the cluster.
        """
        # Imported lazily so that merely importing this module does not require the kubernetes package.
        from kubernetes import client as kubernetes_client, config as kubernetes_config
        kubernetes_config.load_incluster_config()
        self.v1core = kubernetes_client.CoreV1Api()
        self.v1apps = kubernetes_client.AppsV1Api()
        self.v1batch = kubernetes_client.BatchV1Api()

    def obj_to_handle(self, obj):
        """ Convert a Kubernetes resource into a handle that we can use to uniquely identify the object within
            Kubernetes.

            :param obj: any Kubernetes resource returned by the client.
            :return: the resource's metadata.self_link.
        """
        # NOTE(review): newer Kubernetes releases stop populating selfLink; confirm against the target cluster.
        return obj.metadata.self_link

    def read_obj_kind(self, kind, name, trust_domain):
        """ Given an object kind and name, read it from Kubernetes.

            :param kind: Kubernetes kind, one of "ReplicaSet", "StatefulSet", "DaemonSet", "Deployment", "Job".
            :param name: name of the object.
            :param trust_domain: XOS TrustDomain whose name is used as the Kubernetes namespace.
            :return: the Kubernetes resource, or None if `kind` is not one of the supported kinds.
        """
        readers = {
            "ReplicaSet": self.v1apps.read_namespaced_replica_set,
            "StatefulSet": self.v1apps.read_namespaced_stateful_set,
            "DaemonSet": self.v1apps.read_namespaced_daemon_set,
            "Deployment": self.v1apps.read_namespaced_deployment,
            "Job": self.v1batch.read_namespaced_job,
        }
        reader = readers.get(kind)
        if reader is None:
            return None
        return reader(name, trust_domain.name)

    def get_controller_from_obj(self, obj, trust_domain, depth=0):
        """ Given an object, search for its controller. The strategy is to walk backward along owner references
            that are marked as controllers until we reach an object that has no owners of its own.

            This covers the case where a ReplicaSet is owned by a Deployment: we skip over the ReplicaSet and
            return the Deployment.

            :param obj: Kubernetes object to start from (e.g. a pod).
            :param trust_domain: XOS TrustDomain naming the namespace in which owners are read.
            :param depth: recursion depth; 0 means `obj` is the starting object rather than an owner.
            :return: the top-most controller object, or None if no controller could be found or read.
        """
        owner_references = obj.metadata.owner_references
        if not owner_references:
            if depth == 0:
                # If depth is zero, then we're still looking at the object, not a controller.
                return None
            return obj

        for owner_reference in owner_references:
            if not getattr(owner_reference, "controller", False):
                continue
            owner = self.read_obj_kind(owner_reference.kind, owner_reference.name, trust_domain)
            if not owner:
                log.warning("failed to fetch owner", owner_reference=owner_reference)
                continue
            controller = self.get_controller_from_obj(owner, trust_domain, depth + 1)
            if controller:
                return controller

        return None

    def get_slice_from_pod(self, pod, trust_domain, principal):
        """ Given a pod, determine which XOS Slice goes with it.
            If the Slice doesn't exist, create it.

            The slice is named after the pod's controller, unless the controller carries an "xos_slice_name"
            label, in which case the label value is used.

            :return: the Slice, or None if the pod has no controller.
        """
        controller = self.get_controller_from_obj(pod, trust_domain)
        if not controller:
            return None

        slice_name = controller.metadata.name
        # The Kubernetes client always defines `labels`, but leaves it as None when the object has no labels,
        # so a hasattr() check is not sufficient.
        labels = getattr(controller.metadata, "labels", None) or {}
        if "xos_slice_name" in labels:
            # Someone has labeled the controller with an xos slice name. Use it.
            slice_name = labels["xos_slice_name"]

        existing_slices = Slice.objects.filter(name=slice_name)
        if existing_slices:
            return existing_slices[0]

        # TODO(smbaker): atomicity
        s = Slice(name=slice_name, site=Site.objects.first(),
                  trust_domain=trust_domain,
                  principal=principal,
                  backend_handle=self.obj_to_handle(controller),
                  controller_kind=controller.kind,
                  xos_managed=False)
        s.save()
        return s

    def get_trustdomain_from_pod(self, pod, owner_service):
        """ Given a pod, determine which XOS TrustDomain goes with it.
            If the TrustDomain doesn't exist, create it.

            The TrustDomain is named after the pod's Kubernetes namespace.

            :param owner_service: service set as the owner of a newly created TrustDomain.
        """
        existing_trustdomains = TrustDomain.objects.filter(name=pod.metadata.namespace)
        if existing_trustdomains:
            return existing_trustdomains[0]

        k8s_trust_domain = self.v1core.read_namespace(pod.metadata.namespace)

        # TODO(smbaker): atomicity
        t = TrustDomain(name=pod.metadata.namespace,
                        xos_managed=False,
                        owner=owner_service,
                        backend_handle=self.obj_to_handle(k8s_trust_domain))
        t.save()
        return t

    def get_principal_from_pod(self, pod, trust_domain):
        """ Given a pod, determine which XOS Principal goes with it.
            If the Principal doesn't exist, create it.

            The Principal corresponds to the pod's service account within `trust_domain`. Service-account names
            (e.g. "default") repeat across namespaces, so the lookup is scoped to the trust domain.

            :return: the Principal, or None if the pod has no service account.
        """
        principal_name = getattr(pod.spec, "service_account", None)
        if not principal_name:
            return None
        existing_principals = Principal.objects.filter(name=principal_name, trust_domain_id=trust_domain.id)
        if existing_principals:
            return existing_principals[0]

        k8s_service_account = self.v1core.read_namespaced_service_account(principal_name, trust_domain.name)

        # TODO(smbaker): atomicity
        p = Principal(name=principal_name,
                      trust_domain=trust_domain,
                      xos_managed=False,
                      backend_handle=self.obj_to_handle(k8s_service_account))
        p.save()
        return p

    @staticmethod
    def _split_image_reference(image):
        """ Split a container image reference into (name, tag).

            Only a ':' after the last '/' introduces a tag, so a registry port is kept as part of the name:
            "localhost:5000/foo:1.0" -> ("localhost:5000/foo", "1.0") and "localhost:5000/foo" -> ("localhost:5000/foo",
            "master"). References without a tag get the default tag "master".
        """
        name, sep, tag = image.rpartition(":")
        if not sep or "/" in tag:
            # No ':' at all, or the last ':' belongs to a registry host:port rather than a tag.
            # Is assuming a default necessary?
            return image, "master"
        return name, tag

    def get_image_from_pod(self, pod):
        """ Given a pod, determine which XOS Image goes with it.
            If the Image doesn't exist, create it.

            :return: the Image for the pod's first container, or None if the pod has no containers.
        """
        containers = pod.spec.containers
        if not containers:
            return None

        # TODO(smbaker): Assumes all containers in a pod use the same image. Valid assumption for now?
        container = containers[0]
        (name, tag) = self._split_image_reference(container.image)

        existing_images = Image.objects.filter(name=name, tag=tag, kind="container")
        if existing_images:
            return existing_images[0]

        i = Image(name=name, tag=tag, kind="container", xos_managed=False)
        i.save()
        return i

    def pull_records(self):
        """ Reconcile XOS KubernetesServiceInstances with the pods currently present in Kubernetes.

            For every Kubernetes pod without a matching XOS object, create one (plus any supporting TrustDomain,
            Principal, Slice and Image). Refresh pod IPs that have changed. Delete XOS objects whose pod no longer
            exists, unless they are xos_managed.

            :raises Exception: if there is not exactly one KubernetesService in XOS.
        """
        # Read all pods from Kubernetes, keyed by pod name.
        # NOTE(review): pods are keyed by name only, so same-named pods in different namespaces collide.
        ret = self.v1core.list_pod_for_all_namespaces(watch=False)
        k8s_pods_by_name = {item.metadata.name: item for item in ret.items}

        # Read all pods from XOS, keyed by name.
        xos_pods_by_name = {pod.name: pod for pod in KubernetesServiceInstance.objects.all()}

        kubernetes_services = KubernetesService.objects.all()
        if len(kubernetes_services) == 0:
            raise Exception("There are no Kubernetes Services yet")
        if len(kubernetes_services) > 1:
            # Simplifying assumption -- there is only one Kubernetes Service
            raise Exception("There are too many Kubernetes Services")
        kubernetes_service = kubernetes_services[0]

        # For each k8s pod, see if there is an xos pod. If there is not, then create the xos pod.
        for (k, pod) in k8s_pods_by_name.items():
            try:
                if k not in xos_pods_by_name:
                    trust_domain = self.get_trustdomain_from_pod(pod, owner_service=kubernetes_service)
                    if not trust_domain:
                        log.warning("Unable to determine trust_domain for %s" % k)
                        continue

                    principal = self.get_principal_from_pod(pod, trust_domain)
                    xos_slice = self.get_slice_from_pod(pod, trust_domain=trust_domain, principal=principal)
                    image = self.get_image_from_pod(pod)

                    if not xos_slice:
                        log.warning("Unable to determine slice for %s" % k)
                        continue

                    xos_pod = KubernetesServiceInstance(name=k,
                                                        pod_ip=pod.status.pod_ip,
                                                        owner=kubernetes_service,
                                                        slice=xos_slice,
                                                        image=image,
                                                        backend_handle=self.obj_to_handle(pod),
                                                        xos_managed=False)
                    xos_pod.save()
                    xos_pods_by_name[k] = xos_pod
                    log.info("Created XOS POD %s" % xos_pod.name)

                # Check to see if the ip address has changed. This can happen for pods that are managed by XOS. The IP
                # isn't available immediately when XOS creates a pod, but shows up a bit later. So handle that case
                # here.
                xos_pod = xos_pods_by_name[k]
                if (pod.status.pod_ip is not None) and (xos_pod.pod_ip != pod.status.pod_ip):
                    xos_pod.pod_ip = pod.status.pod_ip
                    xos_pod.save(update_fields=["pod_ip"])
            except Exception:
                # Best effort per pod: one bad pod must not stop the others from being processed.
                log.exception("Failed to process k8s pod", k=k, pod=pod)

        # For each xos pod, see if there is no k8s pod. If that's the case, then the pod must have been deleted.
        for (k, xos_pod) in xos_pods_by_name.items():
            if k not in k8s_pods_by_name:
                if xos_pod.xos_managed:
                    # Should we do something so it gets re-created by the syncstep?
                    pass
                else:
                    xos_pod.delete()
                    log.info("Deleted XOS POD %s" % k)
259 log.info("Deleted XOS POD %s" % k)