# blob: 672687f7453f9ff1919bbc42e02425c237e6993c
# Last change: Scott Baker, commit 13e953c, 2018-05-17 09:19:15 -0700
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
15
16import os, sys
17import unittest
18from mock import patch, PropertyMock, ANY, MagicMock
19from unit_test_common import setup_sync_unit_test
20
def fake_init_kubernetes_client(self):
    """ Stand-in for the pull step's init_kubernetes_client() method.

        Attaches a fresh MagicMock for each of the three client handles (v1core, v1apps, v1batch) to `self`,
        so that tests can program return values on them without a real Kubernetes API connection.
    """
    self.v1core = MagicMock()
    self.v1apps = MagicMock()
    self.v1batch = MagicMock()
25
class TestPullPods(unittest.TestCase):
    """ Unit tests for KubernetesServiceInstancePullStep (imported from pull_steps/pull_pods.py).

        Every test replaces the step's init_kubernetes_client() with fake_init_kubernetes_client(), so the
        v1core / v1apps / v1batch handles are MagicMocks and no Kubernetes API is contacted.

        The model classes used below (KubernetesService, TrustDomain, Principal, Image, Slice, Site,
        KubernetesServiceInstance) are not imported explicitly; presumably setup_sync_unit_test() injects them
        into the globals() dict that is handed to it in setUp() -- confirm against unit_test_common.
    """

    def setUp(self):
        """ Prepare the sync-step test environment and a small set of model fixtures. """
        test_dir = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))

        self.unittest_setup = setup_sync_unit_test(test_dir,
                                                   globals(),
                                                   [("kubernetes-service", "kubernetes.proto")])

        # Make the pull step module importable; tearDown() restores sys.path.
        sys.path.append(os.path.join(test_dir, "../pull_steps"))

        from pull_pods import KubernetesServiceInstancePullStep
        self.pull_step_class = KubernetesServiceInstancePullStep

        self.service = KubernetesService()
        self.trust_domain = TrustDomain(name="test-trust", owner=self.service)
        self.principal = Principal(name="test-principal", trust_domain=self.trust_domain)
        self.image = Image(name="test-image", tag="1.1", kind="container")

    def tearDown(self):
        """ Restore the sys.path that was saved by setup_sync_unit_test(). """
        sys.path = self.unittest_setup["sys_path_save"]

    def make_pod(self, name, trust_domain, principal, image):
        """ Build a MagicMock pod.

            The pod's metadata name is `name`, its namespace is the trust domain's name, its service account
            is the principal's name, and it has a single container whose image string is "<image.name>:<image.tag>".
        """
        container = MagicMock()
        container.image = "%s:%s" % (image.name, image.tag)

        pod = MagicMock()
        pod.metadata.name = name
        pod.metadata.namespace = trust_domain.name
        pod.spec.service_account = principal.name
        # The container used to be built and then discarded; attach it so the pod really carries the image.
        pod.spec.containers = [container]

        return pod

    def test_read_obj_kind(self):
        """ read_obj_kind() should return whatever the matching read_namespaced_<kind>() client call returns. """
        with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client):
            pull_step = self.pull_step_class()
            pull_step.v1apps.read_namespaced_replica_set.return_value = ["my_replica_set"]
            pull_step.v1apps.read_namespaced_stateful_set.return_value = ["my_stateful_set"]
            pull_step.v1apps.read_namespaced_daemon_set.return_value = ["my_daemon_set"]
            pull_step.v1apps.read_namespaced_deployment.return_value = ["my_deployment"]
            pull_step.v1batch.read_namespaced_job.return_value = ["my_job"]

            expected_by_kind = [("ReplicaSet", ["my_replica_set"]),
                                ("StatefulSet", ["my_stateful_set"]),
                                ("DaemonSet", ["my_daemon_set"]),
                                ("Deployment", ["my_deployment"]),
                                ("Job", ["my_job"])]

            for (kind, expected) in expected_by_kind:
                obj = pull_step.read_obj_kind(kind, "foo", self.trust_domain)
                self.assertEqual(obj, expected)

    def test_get_controller_from_obj(self):
        """ Setup an owner_reference chain: leaf --> StatefulSet --> Deployment. Calling get_controller_from_obj()
            on the leaf should return the deployment.
        """
        with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client):
            # NOTE: MagicMock(name=...) names the mock itself rather than creating a `.name` attribute, so the
            # attribute has to be assigned after construction.
            ss_ref = MagicMock(controller=True, kind="StatefulSet")
            ss_ref.name = "my_stateful_set"

            dep_ref = MagicMock(controller=True, kind="Deployment")
            dep_ref.name = "my_deployment"

            leaf_obj = MagicMock()
            leaf_obj.metadata.owner_references = [ss_ref]

            ss_obj = MagicMock()
            ss_obj.metadata.owner_references = [dep_ref]

            # The deployment has no owner, which makes it the end of the chain.
            dep_obj = MagicMock()
            dep_obj.metadata.owner_references = []

            pull_step = self.pull_step_class()
            pull_step.v1apps.read_namespaced_stateful_set.return_value = ss_obj
            pull_step.v1apps.read_namespaced_deployment.return_value = dep_obj

            controller = pull_step.get_controller_from_obj(leaf_obj, self.trust_domain)
            self.assertEqual(controller, dep_obj)

    def test_get_slice_from_pod_exists(self):
        """ Call get_slice_from_pod() where a slice named after the pod's controller already exists. The existing
            slice should be returned.
        """
        with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client), \
                patch.object(self.pull_step_class, "get_controller_from_obj") as get_controller_from_obj, \
                patch.object(Slice.objects, "get_items") as slice_objects:
            pull_step = self.pull_step_class()

            myslice = Slice(name="myslice")
            slice_objects.return_value = [myslice]

            dep_obj = MagicMock()
            dep_obj.metadata.name = myslice.name
            get_controller_from_obj.return_value = dep_obj

            pod = MagicMock()

            found_slice = pull_step.get_slice_from_pod(pod, self.trust_domain, self.principal)
            self.assertEqual(found_slice, myslice)

    def test_get_slice_from_pod_noexist(self):
        """ Call get_slice_from_pod() where no pre-existing slice is present. A new slice will be created, named
            after the pod's controller.
        """
        with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client), \
                patch.object(self.pull_step_class, "get_controller_from_obj") as get_controller_from_obj, \
                patch.object(Site.objects, "get_items") as site_objects:
            pull_step = self.pull_step_class()

            site_objects.return_value = [Site(name="mysite")]

            dep_obj = MagicMock()
            dep_obj.metadata.name = "my_other_slice"
            get_controller_from_obj.return_value = dep_obj

            pod = MagicMock()

            new_slice = pull_step.get_slice_from_pod(pod, self.trust_domain, self.principal)
            self.assertEqual(new_slice.name, "my_other_slice")
            self.assertEqual(new_slice.trust_domain, self.trust_domain)
            self.assertEqual(new_slice.principal, self.principal)
            self.assertEqual(new_slice.xos_managed, False)

    def test_get_trustdomain_from_pod_exists(self):
        """ A pod whose namespace matches an existing TrustDomain's name should yield that TrustDomain. """
        with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client), \
                patch.object(TrustDomain.objects, "get_items") as trustdomain_objects:
            pull_step = self.pull_step_class()

            trustdomain_objects.return_value = [self.trust_domain]

            pod = MagicMock()
            pod.metadata.namespace = self.trust_domain.name

            trustdomain = pull_step.get_trustdomain_from_pod(pod, owner_service=self.service)
            self.assertEqual(trustdomain, self.trust_domain)

    def test_get_trustdomain_from_pod_noexist(self):
        """ A pod in an unknown namespace should yield a TrustDomain named after the namespace and owned by the
            given service.
        """
        with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client):
            pull_step = self.pull_step_class()

            pod = MagicMock()
            pod.metadata.namespace = "new-trust"

            trustdomain = pull_step.get_trustdomain_from_pod(pod, owner_service=self.service)
            self.assertEqual(trustdomain.name, "new-trust")
            self.assertEqual(trustdomain.owner, self.service)

    def test_get_principal_from_pod_exists(self):
        """ A pod whose service account matches an existing Principal's name should yield that Principal. """
        with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client), \
                patch.object(Principal.objects, "get_items") as principal_objects:
            pull_step = self.pull_step_class()

            principal_objects.return_value = [self.principal]

            pod = MagicMock()
            pod.spec.service_account = self.principal.name

            principal = pull_step.get_principal_from_pod(pod, trust_domain=self.trust_domain)
            self.assertEqual(principal, self.principal)

    def test_get_principal_from_pod_noexist(self):
        """ A pod with an unknown service account should yield a Principal named after the service account and
            attached to the given trust domain.
        """
        with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client):
            pull_step = self.pull_step_class()

            pod = MagicMock()
            pod.spec.service_account = "new-principal"

            principal = pull_step.get_principal_from_pod(pod, trust_domain=self.trust_domain)
            self.assertEqual(principal.name, "new-principal")
            self.assertEqual(principal.trust_domain, self.trust_domain)

    def test_get_image_from_pod_exists(self):
        """ A pod whose container image string is "<name>:<tag>" of an existing Image should yield that Image. """
        with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client), \
                patch.object(Image.objects, "get_items") as image_objects:
            pull_step = self.pull_step_class()

            image_objects.return_value = [self.image]

            container = MagicMock()
            container.image = "%s:%s" % (self.image.name, self.image.tag)

            pod = MagicMock()
            pod.spec.containers = [container]

            image = pull_step.get_image_from_pod(pod)
            self.assertEqual(image, self.image)

    def test_get_image_from_pod_noexist(self):
        """ A pod whose container image is unknown should yield an Image with the name and tag taken from the
            image string, and kind "container".
        """
        with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client):
            pull_step = self.pull_step_class()

            container = MagicMock()
            container.image = "new-image:2.3"

            pod = MagicMock()
            pod.spec.containers = [container]

            image = pull_step.get_image_from_pod(pod)
            self.assertEqual(image.name, "new-image")
            self.assertEqual(image.tag, "2.3")
            self.assertEqual(image.kind, "container")

    def test_pull_records_new_pod(self):
        """ A pod is found in k8s that does not exist in XOS. A new KubernetesServiceInstance should be created
        """
        # KubernetesServiceInstance.objects.get_items is patched without being programmed; only the patch itself
        # is needed here, so no name is bound for it.
        with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client), \
                patch.object(self.pull_step_class, "get_trustdomain_from_pod") as get_trustdomain, \
                patch.object(self.pull_step_class, "get_principal_from_pod") as get_principal, \
                patch.object(self.pull_step_class, "get_slice_from_pod") as get_slice, \
                patch.object(self.pull_step_class, "get_image_from_pod") as get_image, \
                patch.object(KubernetesService.objects, "get_items") as service_objects, \
                patch.object(KubernetesServiceInstance.objects, "get_items"), \
                patch.object(KubernetesServiceInstance, "save", autospec=True) as ksi_save:

            service_objects.return_value = [self.service]

            pod_slice = Slice(name="myslice")

            get_trustdomain.return_value = self.trust_domain
            get_principal.return_value = self.principal
            get_slice.return_value = pod_slice
            get_image.return_value = self.image

            pod = self.make_pod("my-pod", self.trust_domain, self.principal, self.image)
            pod.status.pod_ip = "1.2.3.4"

            pull_step = self.pull_step_class()
            pull_step.v1core.list_pod_for_all_namespaces.return_value = MagicMock(items=[pod])

            pull_step.pull_records()

            self.assertEqual(ksi_save.call_count, 1)
            # save() is patched with autospec=True, so the first positional argument is the instance being saved.
            saved_ksi = ksi_save.call_args[0][0]

            self.assertEqual(saved_ksi.name, "my-pod")
            self.assertEqual(saved_ksi.pod_ip, "1.2.3.4")
            self.assertEqual(saved_ksi.owner, self.service)
            self.assertEqual(saved_ksi.slice, pod_slice)
            self.assertEqual(saved_ksi.image, self.image)
            self.assertEqual(saved_ksi.xos_managed, False)

    def test_pull_records_missing_pod(self):
        """ A KubernetesServiceInstance exists in XOS with xos_managed=False, but k8s reports no pods. The
            KubernetesServiceInstance should be deleted.
        """
        with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client), \
                patch.object(KubernetesService.objects, "get_items") as service_objects, \
                patch.object(KubernetesServiceInstance.objects, "get_items") as si_objects, \
                patch.object(KubernetesServiceInstance, "delete", autospec=True) as ksi_delete:
            service_objects.return_value = [self.service]

            si = KubernetesServiceInstance(name="my-pod", owner=self.service, xos_managed=False)
            si_objects.return_value = [si]

            pull_step = self.pull_step_class()
            pull_step.v1core.list_pod_for_all_namespaces.return_value = MagicMock(items=[])

            pull_step.pull_records()

            self.assertEqual(ksi_delete.call_count, 1)
            # delete() is patched with autospec=True, so the first positional argument is the deleted instance.
            deleted_ksi = ksi_delete.call_args[0][0]
            self.assertEqual(deleted_ksi.name, "my-pod")
281
# Run the test cases in this module when it is executed as a script.
if __name__ == '__main__':
    unittest.main()