SEBA-169 Push kafka events when Kubernetes pods are created, updated, or destroyed

Change-Id: Idd711140c919ba2c232c7f08f0dc23507cfdd973
diff --git a/xos/synchronizer/tests/test_pull_pods.py b/xos/synchronizer/tests/test_pull_pods.py
index 672687f..c0b8b0b 100644
--- a/xos/synchronizer/tests/test_pull_pods.py
+++ b/xos/synchronizer/tests/test_pull_pods.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import json
 import os, sys
 import unittest
 from mock import patch, PropertyMock, ANY, MagicMock
@@ -279,5 +280,166 @@
             self.assertEqual(ksi_delete.call_count, 1)
             deleted_ksi = ksi_delete.call_args[0][0]
 
+    def test_pull_records_new_pod_kafka_event(self):
+        """ A pod is found in k8s that does not exist in XOS. A new KubernetesServiceInstance sohuld be created
+        """
+        with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client), \
+             patch.object(self.pull_step_class, "get_trustdomain_from_pod") as get_trustdomain, \
+             patch.object(self.pull_step_class, "get_principal_from_pod") as get_principal, \
+             patch.object(self.pull_step_class, "get_slice_from_pod") as get_slice, \
+             patch.object(self.pull_step_class, "get_image_from_pod") as get_image, \
+             patch.object(self.pull_step_class, "send_notification", autospec=True) as send_notification, \
+             patch.object(KubernetesService.objects, "get_items") as service_objects, \
+             patch.object(KubernetesServiceInstance.objects, "get_items") as si_objects, \
+             patch.object(KubernetesServiceInstance, "save", autospec=True) as ksi_save:
+
+            service_objects.return_value = [self.service]
+
+            slice = Slice(name="myslice")
+
+            get_trustdomain.return_value = self.trust_domain
+            get_principal.return_value = self.principal
+            get_slice.return_value = slice
+            get_image.return_value = self.image
+
+            pod = self.make_pod("my-pod", self.trust_domain, self.principal, self.image)
+            pod.status.pod_ip = "1.2.3.4"
+
+            pull_step = self.pull_step_class()
+            pull_step.kafka_producer = "foo"
+            pull_step.v1core.list_pod_for_all_namespaces.return_value = MagicMock(items=[pod])
+
+            pull_step.pull_records()
+
+            self.assertEqual(ksi_save.call_count, 2)
+
+            # Inspect the last KubernetesServiceInstance that was saved. There's no way to inspect the first one saved
+            # if there are multiple calls, as the sync step will cause the object to be updated.
+            saved_ksi = ksi_save.call_args[0][0]
+            self.assertEqual(saved_ksi.name, "my-pod")
+            self.assertEqual(saved_ksi.pod_ip, "1.2.3.4")
+            self.assertEqual(saved_ksi.owner, self.service)
+            self.assertEqual(saved_ksi.slice, slice)
+            self.assertEqual(saved_ksi.image, self.image)
+            self.assertEqual(saved_ksi.xos_managed, False)
+            self.assertEqual(saved_ksi.need_event, False)
+
+            self.assertEqual(send_notification.call_count, 1)
+            self.assertEqual(send_notification.call_args[0][1], saved_ksi)
+            self.assertEqual(send_notification.call_args[0][2], pod)
+            self.assertEqual(send_notification.call_args[0][3], "created")
+
+    def test_pull_records_existing_pod_kafka_event(self):
+        """ A pod is found in k8s that does not exist in XOS. A new KubernetesServiceInstance sohuld be created
+        """
+        with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client), \
+             patch.object(self.pull_step_class, "get_trustdomain_from_pod") as get_trustdomain, \
+             patch.object(self.pull_step_class, "get_principal_from_pod") as get_principal, \
+             patch.object(self.pull_step_class, "get_slice_from_pod") as get_slice, \
+             patch.object(self.pull_step_class, "get_image_from_pod") as get_image, \
+             patch.object(self.pull_step_class, "send_notification", autospec=True) as send_notification, \
+             patch.object(KubernetesService.objects, "get_items") as service_objects, \
+             patch.object(KubernetesServiceInstance.objects, "get_items") as si_objects, \
+             patch.object(KubernetesServiceInstance, "save", autospec=True) as ksi_save:
+
+            service_objects.return_value = [self.service]
+
+            slice = Slice(name="myslice")
+
+            get_trustdomain.return_value = self.trust_domain
+            get_principal.return_value = self.principal
+            get_slice.return_value = slice
+            get_image.return_value = self.image
+
+            pod = self.make_pod("my-pod", self.trust_domain, self.principal, self.image)
+            pod.status.pod_ip = "1.2.3.4"
+
+            xos_pod = KubernetesServiceInstance(name="my-pod",
+                                                pod_ip="",
+                                                owner=self.service,
+                                                slice=slice,
+                                                image=self.image,
+                                                xos_managed=False,
+                                                need_event=False,
+                                                last_event_sent="created")
+            si_objects.return_value = [xos_pod]
+
+            pull_step = self.pull_step_class()
+            pull_step.kafka_producer = "foo"
+            pull_step.v1core.list_pod_for_all_namespaces.return_value = MagicMock(items=[pod])
+
+            pull_step.pull_records()
+
+            self.assertEqual(ksi_save.call_count, 2)
+
+            # Inspect the last KubernetesServiceInstance that was saved. There's no way to inspect the first one saved
+            # if there are multiple calls, as the sync step will cause the object to be updated.
+            saved_ksi = ksi_save.call_args[0][0]
+            self.assertEqual(saved_ksi.name, "my-pod")
+            self.assertEqual(saved_ksi.pod_ip, "1.2.3.4")
+            self.assertEqual(saved_ksi.owner, self.service)
+            self.assertEqual(saved_ksi.slice, slice)
+            self.assertEqual(saved_ksi.image, self.image)
+            self.assertEqual(saved_ksi.xos_managed, False)
+            self.assertEqual(saved_ksi.need_event, False)
+
+            self.assertEqual(send_notification.call_count, 1)
+            self.assertEqual(send_notification.call_args[0][1], saved_ksi)
+            self.assertEqual(send_notification.call_args[0][2], pod)
+            self.assertEqual(send_notification.call_args[0][3], "updated")
+
+    def test_send_notification_created(self):
+        with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client):
+            pull_step = self.pull_step_class()
+            pull_step.kafka_producer = MagicMock()
+
+            pod = self.make_pod("my-pod", self.trust_domain, self.principal, self.image)
+            pod.status.pod_ip = "1.2.3.4"
+            pod.metadata.labels = {"foo": "bar"}
+            xos_pod = KubernetesServiceInstance(name="my-pod",
+                                                pod_ip="",
+                                                owner=self.service,
+                                                slice=slice,
+                                                image=self.image,
+                                                xos_managed=False,
+                                                need_event=False,
+                                                last_event_sent="created")
+            pull_step.send_notification(xos_pod, pod, "created")
+
+            self.assertEqual(pull_step.kafka_producer.send.call_count, 1)
+            topic = pull_step.kafka_producer.send.call_args[0][0]
+            event = json.loads(pull_step.kafka_producer.send.call_args[0][1])
+
+            self.assertEqual(topic, "xos.kubernetes.pod-details")
+
+            self.assertEqual(event["name"], "my-pod")
+            self.assertEqual(event["status"], "created")
+            self.assertEqual(event["labels"], {"foo": "bar"})
+            self.assertEqual(event["netinterfaces"], [{"name": "primary", "addresses": ["1.2.3.4"]}])
+
+    def test_send_notification_deleted(self):
+        with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client):
+            pull_step = self.pull_step_class()
+            pull_step.kafka_producer = MagicMock()
+
+            xos_pod = KubernetesServiceInstance(name="my-pod",
+                                                pod_ip="",
+                                                owner=self.service,
+                                                slice=slice,
+                                                image=self.image,
+                                                xos_managed=False,
+                                                need_event=False,
+                                                last_event_sent="created")
+            pull_step.send_notification(xos_pod, None, "deleted")
+
+            self.assertEqual(pull_step.kafka_producer.send.call_count, 1)
+            topic = pull_step.kafka_producer.send.call_args[0][0]
+            event = json.loads(pull_step.kafka_producer.send.call_args[0][1])
+
+            self.assertEqual(topic, "xos.kubernetes.pod-details")
+
+            self.assertEqual(event["name"], "my-pod")
+            self.assertEqual(event["status"], "deleted")
+
 if __name__ == '__main__':
     unittest.main()