| |
| # Copyright 2017-present Open Networking Foundation |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import json |
| import os, sys |
| import unittest |
| from mock import patch, PropertyMock, ANY, MagicMock |
| from unit_test_common import setup_sync_unit_test |
| |
| def fake_init_kubernetes_client(self): |
| self.v1core = MagicMock() |
| self.v1apps = MagicMock() |
| self.v1batch = MagicMock() |
| |
| class TestPullPods(unittest.TestCase): |
| |
| def setUp(self): |
| self.unittest_setup = setup_sync_unit_test(os.path.abspath(os.path.dirname(os.path.realpath(__file__))), |
| globals(), |
| [("kubernetes-service", "kubernetes.proto")] ) |
| self.mockxoskafka = MagicMock() |
| |
| modules = { |
| 'xoskafka': self.mockxoskafka, |
| 'xoskafka.XOSKafkaProducer': self.mockxoskafka.XOSKafkaProducer, |
| } |
| |
| self.module_patcher = patch.dict('sys.modules', modules) |
| self.module_patcher.start() |
| |
| sys.path.append(os.path.join(os.path.abspath(os.path.dirname(os.path.realpath(__file__))), "../pull_steps")) |
| |
| from pull_pods import KubernetesServiceInstancePullStep |
| self.pull_step_class = KubernetesServiceInstancePullStep |
| |
| self.service = KubernetesService() |
| self.trust_domain = TrustDomain(name="test-trust", owner=self.service) |
| self.principal = Principal(name="test-principal", trust_domain = self.trust_domain) |
| self.image = Image(name="test-image", tag="1.1", kind="container") |
| |
| def tearDown(self): |
| sys.path = self.unittest_setup["sys_path_save"] |
| self.module_patcher.stop() |
| |
| def test_read_obj_kind(self): |
| with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client): |
| pull_step = self.pull_step_class() |
| pull_step.v1apps.read_namespaced_replica_set.return_value = ["my_replica_set"] |
| pull_step.v1apps.read_namespaced_stateful_set.return_value = ["my_stateful_set"] |
| pull_step.v1apps.read_namespaced_daemon_set.return_value = ["my_daemon_set"] |
| pull_step.v1apps.read_namespaced_deployment.return_value = ["my_deployment"] |
| pull_step.v1batch.read_namespaced_job.return_value = ["my_job"] |
| |
| obj = pull_step.read_obj_kind("ReplicaSet", "foo", self.trust_domain) |
| self.assertEqual(obj, ["my_replica_set"]) |
| |
| obj = pull_step.read_obj_kind("StatefulSet", "foo", self.trust_domain) |
| self.assertEqual(obj, ["my_stateful_set"]) |
| |
| obj = pull_step.read_obj_kind("DaemonSet", "foo", self.trust_domain) |
| self.assertEqual(obj, ["my_daemon_set"]) |
| |
| obj = pull_step.read_obj_kind("Deployment", "foo", self.trust_domain) |
| self.assertEqual(obj, ["my_deployment"]) |
| |
| obj = pull_step.read_obj_kind("Job", "foo", self.trust_domain) |
| self.assertEqual(obj, ["my_job"]) |
| |
| def test_get_controller_from_obj(self): |
| """ Setup an owner_reference chain: leaf --> StatefulSet --> Deployment. Calling get_controller_from_obj() |
| on the leaf should return the deployment. |
| """ |
| with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client): |
| leaf_obj = MagicMock() |
| leaf_obj.metadata.owner_references= [MagicMock(controller=True, name="my_stateful_set", kind="StatefulSet")] |
| |
| ss_obj = MagicMock() |
| ss_obj.metadata.owner_references= [MagicMock(controller=True, name="my_deployment", kind="Deployment")] |
| |
| dep_obj = MagicMock() |
| dep_obj.metadata.owner_references = [] |
| |
| pull_step = self.pull_step_class() |
| pull_step.v1apps.read_namespaced_stateful_set.return_value = ss_obj |
| pull_step.v1apps.read_namespaced_deployment.return_value = dep_obj |
| |
| controller = pull_step.get_controller_from_obj(leaf_obj, self.trust_domain) |
| self.assertEqual(controller, dep_obj) |
| |
| def test_get_slice_from_pod_exists(self): |
| with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client),\ |
| patch.object(self.pull_step_class, "get_controller_from_obj") as get_controller_from_obj, \ |
| patch.object(Slice.objects, "get_items") as slice_objects: |
| pull_step = self.pull_step_class() |
| |
| myslice = Slice(name="myslice") |
| |
| dep_obj = MagicMock() |
| dep_obj.metadata.name = myslice.name |
| get_controller_from_obj.return_value = dep_obj |
| |
| slice_objects.return_value = [myslice] |
| |
| pod = MagicMock() |
| |
| slice = pull_step.get_slice_from_pod(pod, self.trust_domain, self.principal) |
| self.assertEqual(slice, myslice) |
| |
| def test_get_slice_from_pod_noexist(self): |
| """ Call get_slice_from_pod() where not pre-existing slice is present. A new slice will be created, named |
| after the pod's controller. |
| """ |
| with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client),\ |
| patch.object(self.pull_step_class, "get_controller_from_obj") as get_controller_from_obj, \ |
| patch.object(Site.objects, "get_items") as site_objects: |
| pull_step = self.pull_step_class() |
| |
| site_objects.return_value=[Site(name="mysite")] |
| |
| dep_obj = MagicMock() |
| dep_obj.metadata.name = "my_other_slice" |
| get_controller_from_obj.return_value = dep_obj |
| |
| pod = MagicMock() |
| |
| slice = pull_step.get_slice_from_pod(pod, self.trust_domain, self.principal) |
| self.assertEqual(slice.name, "my_other_slice") |
| self.assertEqual(slice.trust_domain, self.trust_domain) |
| self.assertEqual(slice.principal, self.principal) |
| self.assertEqual(slice.xos_managed, False) |
| |
| def test_get_trustdomain_from_pod_exists(self): |
| with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client), \ |
| patch.object(TrustDomain.objects, "get_items") as trustdomain_objects: |
| pull_step = self.pull_step_class() |
| |
| pod = MagicMock() |
| pod.metadata.namespace = self.trust_domain.name |
| |
| trustdomain_objects.return_value = [self.trust_domain] |
| |
| trustdomain = pull_step.get_trustdomain_from_pod(pod, owner_service=self.service) |
| self.assertEqual(trustdomain, self.trust_domain) |
| |
| def test_get_trustdomain_from_pod_noexist(self): |
| with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client): |
| pull_step = self.pull_step_class() |
| |
| pod = MagicMock() |
| pod.metadata.namespace = "new-trust" |
| |
| trustdomain = pull_step.get_trustdomain_from_pod(pod, owner_service=self.service) |
| self.assertEqual(trustdomain.name, "new-trust") |
| self.assertEqual(trustdomain.owner, self.service) |
| |
| def test_get_principal_from_pod_exists(self): |
| with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client), \ |
| patch.object(Principal.objects, "get_items") as principal_objects: |
| pull_step = self.pull_step_class() |
| |
| pod = MagicMock() |
| pod.spec.service_account = self.principal.name |
| |
| principal_objects.return_value = [self.principal] |
| |
| principal = pull_step.get_principal_from_pod(pod, trust_domain=self.trust_domain) |
| self.assertEqual(principal, self.principal) |
| |
| def test_get_principal_from_pod_noexist(self): |
| with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client): |
| pull_step = self.pull_step_class() |
| |
| pod = MagicMock() |
| pod.spec.service_account = "new-principal" |
| |
| principal = pull_step.get_principal_from_pod(pod, trust_domain=self.trust_domain) |
| self.assertEqual(principal.name, "new-principal") |
| self.assertEqual(principal.trust_domain, self.trust_domain) |
| |
| def test_get_image_from_pod_exists(self): |
| with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client), \ |
| patch.object(Image.objects, "get_items") as image_objects: |
| pull_step = self.pull_step_class() |
| |
| container = MagicMock() |
| container.image = "%s:%s" % (self.image.name, self.image.tag) |
| |
| pod = MagicMock() |
| pod.spec.containers = [container] |
| |
| image_objects.return_value = [self.image] |
| |
| image = pull_step.get_image_from_pod(pod) |
| self.assertEqual(image, self.image) |
| |
| def test_get_image_from_pod_noexist(self): |
| with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client): |
| pull_step = self.pull_step_class() |
| |
| container = MagicMock() |
| container.image = "new-image:2.3" \ |
| |
| pod = MagicMock() |
| pod.spec.containers = [container] |
| |
| image = pull_step.get_image_from_pod(pod) |
| self.assertEqual(image.name, "new-image") |
| self.assertEqual(image.tag, "2.3") |
| self.assertEqual(image.kind, "container") |
| |
| def make_pod(self, name, trust_domain, principal, image): |
| container = MagicMock() |
| container.image = "%s:%s" % (image.name, image.tag) |
| |
| pod = MagicMock() |
| pod.metadata.name = name |
| pod.metadata.namespace = trust_domain.name |
| pod.spec.service_account = principal.name |
| |
| return pod |
| |
| def test_pull_records_new_pod(self): |
| """ A pod is found in k8s that does not exist in XOS. A new KubernetesServiceInstance should be created |
| """ |
| with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client), \ |
| patch.object(self.pull_step_class, "get_trustdomain_from_pod") as get_trustdomain, \ |
| patch.object(self.pull_step_class, "get_principal_from_pod") as get_principal, \ |
| patch.object(self.pull_step_class, "get_slice_from_pod") as get_slice, \ |
| patch.object(self.pull_step_class, "get_image_from_pod") as get_image, \ |
| patch.object(KubernetesService.objects, "get_items") as service_objects, \ |
| patch.object(KubernetesServiceInstance.objects, "get_items") as si_objects, \ |
| patch.object(KubernetesServiceInstance, "save", autospec=True) as ksi_save: |
| |
| service_objects.return_value = [self.service] |
| |
| slice = Slice(name="myslice") |
| |
| get_trustdomain.return_value = self.trust_domain |
| get_principal.return_value = self.principal |
| get_slice.return_value = slice |
| get_image.return_value = self.image |
| |
| pod = self.make_pod("my-pod", self.trust_domain, self.principal, self.image) |
| pod.status.pod_ip = "1.2.3.4" |
| |
| pull_step = self.pull_step_class() |
| pull_step.v1core.list_pod_for_all_namespaces.return_value = MagicMock(items=[pod]) |
| |
| pull_step.pull_records() |
| |
| self.assertEqual(ksi_save.call_count, 2) |
| saved_ksi = ksi_save.call_args[0][0] |
| |
| self.assertEqual(saved_ksi.name, "my-pod") |
| self.assertEqual(saved_ksi.pod_ip, "1.2.3.4") |
| self.assertEqual(saved_ksi.owner, self.service) |
| self.assertEqual(saved_ksi.slice, slice) |
| self.assertEqual(saved_ksi.image, self.image) |
| self.assertEqual(saved_ksi.xos_managed, False) |
| |
| def test_pull_records_missing_pod(self): |
| """ A pod is found in k8s that does not exist in XOS. A new KubernetesServiceInstance should be created |
| """ |
| with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client), \ |
| patch.object(KubernetesService.objects, "get_items") as service_objects, \ |
| patch.object(KubernetesServiceInstance.objects, "get_items") as si_objects, \ |
| patch.object(KubernetesServiceInstance, "delete", autospec=True) as ksi_delete: |
| service_objects.return_value = [self.service] |
| |
| si = KubernetesServiceInstance(name="my-pod", owner=self.service, xos_managed=False) |
| si_objects.return_value = [si] |
| |
| pull_step = self.pull_step_class() |
| pull_step.v1core.list_pod_for_all_namespaces.return_value = MagicMock(items=[]) |
| |
| pull_step.pull_records() |
| |
| self.assertEqual(ksi_delete.call_count, 1) |
| deleted_ksi = ksi_delete.call_args[0][0] |
| |
| def test_pull_records_new_pod_kafka_event(self): |
| """ A pod is found in k8s that does not exist in XOS. A new KubernetesServiceInstance should be created |
| """ |
| with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client), \ |
| patch.object(self.pull_step_class, "get_trustdomain_from_pod") as get_trustdomain, \ |
| patch.object(self.pull_step_class, "get_principal_from_pod") as get_principal, \ |
| patch.object(self.pull_step_class, "get_slice_from_pod") as get_slice, \ |
| patch.object(self.pull_step_class, "get_image_from_pod") as get_image, \ |
| patch.object(self.pull_step_class, "send_notification", autospec=True) as send_notification, \ |
| patch.object(KubernetesService.objects, "get_items") as service_objects, \ |
| patch.object(KubernetesServiceInstance.objects, "get_items") as si_objects, \ |
| patch.object(KubernetesServiceInstance, "save", autospec=True) as ksi_save: |
| |
| service_objects.return_value = [self.service] |
| |
| slice = Slice(name="myslice") |
| |
| get_trustdomain.return_value = self.trust_domain |
| get_principal.return_value = self.principal |
| get_slice.return_value = slice |
| get_image.return_value = self.image |
| |
| pod = self.make_pod("my-pod", self.trust_domain, self.principal, self.image) |
| pod.status.pod_ip = "1.2.3.4" |
| |
| pull_step = self.pull_step_class() |
| |
| pull_step.v1core.list_pod_for_all_namespaces.return_value = MagicMock(items=[pod]) |
| |
| pull_step.pull_records() |
| |
| self.assertEqual(ksi_save.call_count, 2) |
| |
| # Inspect the last KubernetesServiceInstance that was saved. There's no way to inspect the first one saved |
| # if there are multiple calls, as the sync step will cause the object to be updated. |
| saved_ksi = ksi_save.call_args[0][0] |
| self.assertEqual(saved_ksi.name, "my-pod") |
| self.assertEqual(saved_ksi.pod_ip, "1.2.3.4") |
| self.assertEqual(saved_ksi.owner, self.service) |
| self.assertEqual(saved_ksi.slice, slice) |
| self.assertEqual(saved_ksi.image, self.image) |
| self.assertEqual(saved_ksi.xos_managed, False) |
| self.assertEqual(saved_ksi.need_event, False) |
| |
| self.assertEqual(send_notification.call_count, 1) |
| self.assertEqual(send_notification.call_args[0][1], saved_ksi) |
| self.assertEqual(send_notification.call_args[0][2], pod) |
| self.assertEqual(send_notification.call_args[0][3], "created") |
| |
| def test_pull_records_existing_pod_kafka_event(self): |
| """ A pod is found in k8s that does not exist in XOS. A new KubernetesServiceInstance should be created |
| """ |
| with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client), \ |
| patch.object(self.pull_step_class, "get_trustdomain_from_pod") as get_trustdomain, \ |
| patch.object(self.pull_step_class, "get_principal_from_pod") as get_principal, \ |
| patch.object(self.pull_step_class, "get_slice_from_pod") as get_slice, \ |
| patch.object(self.pull_step_class, "get_image_from_pod") as get_image, \ |
| patch.object(self.pull_step_class, "send_notification", autospec=True) as send_notification, \ |
| patch.object(KubernetesService.objects, "get_items") as service_objects, \ |
| patch.object(KubernetesServiceInstance.objects, "get_items") as si_objects, \ |
| patch.object(KubernetesServiceInstance, "save", autospec=True) as ksi_save: |
| |
| service_objects.return_value = [self.service] |
| |
| slice = Slice(name="myslice") |
| |
| get_trustdomain.return_value = self.trust_domain |
| get_principal.return_value = self.principal |
| get_slice.return_value = slice |
| get_image.return_value = self.image |
| |
| pod = self.make_pod("my-pod", self.trust_domain, self.principal, self.image) |
| pod.status.pod_ip = "1.2.3.4" |
| |
| xos_pod = KubernetesServiceInstance(name="my-pod", |
| pod_ip="", |
| owner=self.service, |
| slice=slice, |
| image=self.image, |
| xos_managed=False, |
| need_event=False, |
| last_event_sent="created") |
| si_objects.return_value = [xos_pod] |
| |
| pull_step = self.pull_step_class() |
| |
| pull_step.v1core.list_pod_for_all_namespaces.return_value = MagicMock(items=[pod]) |
| |
| pull_step.pull_records() |
| |
| self.assertEqual(ksi_save.call_count, 2) |
| |
| # Inspect the last KubernetesServiceInstance that was saved. There's no way to inspect the first one saved |
| # if there are multiple calls, as the sync step will cause the object to be updated. |
| saved_ksi = ksi_save.call_args[0][0] |
| self.assertEqual(saved_ksi.name, "my-pod") |
| self.assertEqual(saved_ksi.pod_ip, "1.2.3.4") |
| self.assertEqual(saved_ksi.owner, self.service) |
| self.assertEqual(saved_ksi.slice, slice) |
| self.assertEqual(saved_ksi.image, self.image) |
| self.assertEqual(saved_ksi.xos_managed, False) |
| self.assertEqual(saved_ksi.need_event, False) |
| |
| self.assertEqual(send_notification.call_count, 1) |
| self.assertEqual(send_notification.call_args[0][1], saved_ksi) |
| self.assertEqual(send_notification.call_args[0][2], pod) |
| self.assertEqual(send_notification.call_args[0][3], "updated") |
| |
| def test_send_notification_created(self): |
| with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client): |
| |
| pull_step = self.pull_step_class() |
| |
| from xoskafka import XOSKafkaProducer |
| |
| pod = self.make_pod("my-pod", self.trust_domain, self.principal, self.image) |
| pod.status.pod_ip = "1.2.3.4" |
| pod.metadata.labels = {"foo": "bar"} |
| xos_pod = KubernetesServiceInstance(name="my-pod", |
| pod_ip="", |
| owner=self.service, |
| slice=slice, |
| image=self.image, |
| xos_managed=False, |
| need_event=False, |
| last_event_sent="created") |
| |
| pull_step.send_notification(xos_pod, pod, "created") |
| |
| self.assertEqual(XOSKafkaProducer.produce.call_count, 1) |
| topic = XOSKafkaProducer.produce.call_args[0][0] |
| key = XOSKafkaProducer.produce.call_args[0][1] |
| event = json.loads(XOSKafkaProducer.produce.call_args[0][2]) |
| |
| self.assertEqual(topic, "xos.kubernetes.pod-details") |
| self.assertEqual(key, "my-pod") |
| |
| self.assertEqual(event["name"], "my-pod") |
| self.assertEqual(event["status"], "created") |
| self.assertEqual(event["producer"], "k8s-sync") |
| self.assertEqual(event["labels"], {"foo": "bar"}) |
| self.assertEqual(event["netinterfaces"], [{"name": "primary", "addresses": ["1.2.3.4"]}]) |
| |
| def test_send_notification_deleted(self): |
| with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client): |
| pull_step = self.pull_step_class() |
| |
| from xoskafka import XOSKafkaProducer |
| |
| xos_pod = KubernetesServiceInstance(name="my-pod", |
| pod_ip="", |
| owner=self.service, |
| slice=slice, |
| image=self.image, |
| xos_managed=False, |
| need_event=False, |
| last_event_sent="created") |
| pull_step.send_notification(xos_pod, None, "deleted") |
| |
| self.assertEqual(XOSKafkaProducer.produce.call_count, 1) |
| topic = XOSKafkaProducer.produce.call_args[0][0] |
| key = XOSKafkaProducer.produce.call_args[0][1] |
| event = json.loads(XOSKafkaProducer.produce.call_args[0][2]) |
| |
| self.assertEqual(topic, "xos.kubernetes.pod-details") |
| self.assertEqual(key, "my-pod") |
| |
| self.assertEqual(event["name"], "my-pod") |
| self.assertEqual(event["status"], "deleted") |
| self.assertEqual(event["producer"], "k8s-sync") |
| |
| if __name__ == '__main__': |
| unittest.main() |