[SEBA-249]

Update kubernetes-synchronizer to use confluent_kafka

Change-Id: I3bf944e95a86a5e6d797a75871956d1b4a0a74a7
diff --git a/xos/synchronizer/tests/test_pull_pods.py b/xos/synchronizer/tests/test_pull_pods.py
index 01ba354..ff5c8f3 100644
--- a/xos/synchronizer/tests/test_pull_pods.py
+++ b/xos/synchronizer/tests/test_pull_pods.py
@@ -30,6 +30,15 @@
         self.unittest_setup = setup_sync_unit_test(os.path.abspath(os.path.dirname(os.path.realpath(__file__))),
                                                    globals(),
                                                    [("kubernetes-service", "kubernetes.proto")] )
+        self.mockxoskafka = MagicMock()
+
+        modules = {
+            'xoskafka': self.mockxoskafka,
+            'xoskafka.XOSKafkaProducer': self.mockxoskafka.XOSKafkaProducer,
+        }
+
+        self.module_patcher = patch.dict('sys.modules', modules)
+        self.module_patcher.start()
 
         sys.path.append(os.path.join(os.path.abspath(os.path.dirname(os.path.realpath(__file__))), "../pull_steps"))
 
@@ -43,6 +52,7 @@
 
     def tearDown(self):
         sys.path = self.unittest_setup["sys_path_save"]
+        self.module_patcher.stop()
 
     def test_read_obj_kind(self):
         with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client):
@@ -222,7 +232,7 @@
         return pod
 
     def test_pull_records_new_pod(self):
-        """ A pod is found in k8s that does not exist in XOS. A new KubernetesServiceInstance sohuld be created
+        """ A pod is found in k8s that does not exist in XOS. A new KubernetesServiceInstance should be created
         """
         with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client), \
              patch.object(self.pull_step_class, "get_trustdomain_from_pod") as get_trustdomain, \
@@ -250,7 +260,7 @@
 
             pull_step.pull_records()
 
-            self.assertEqual(ksi_save.call_count, 1)
+            self.assertEqual(ksi_save.call_count, 2)
             saved_ksi = ksi_save.call_args[0][0]
 
             self.assertEqual(saved_ksi.name, "my-pod")
@@ -261,7 +271,7 @@
             self.assertEqual(saved_ksi.xos_managed, False)
 
     def test_pull_records_missing_pod(self):
-        """ A pod is found in k8s that does not exist in XOS. A new KubernetesServiceInstance sohuld be created
+        """ A pod is found in k8s that does not exist in XOS. A new KubernetesServiceInstance should be created
         """
         with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client), \
                 patch.object(KubernetesService.objects, "get_items") as service_objects, \
@@ -281,7 +291,7 @@
             deleted_ksi = ksi_delete.call_args[0][0]
 
     def test_pull_records_new_pod_kafka_event(self):
-        """ A pod is found in k8s that does not exist in XOS. A new KubernetesServiceInstance sohuld be created
+        """ A pod is found in k8s that does not exist in XOS. A new KubernetesServiceInstance should be created
         """
         with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client), \
              patch.object(self.pull_step_class, "get_trustdomain_from_pod") as get_trustdomain, \
@@ -306,7 +316,7 @@
             pod.status.pod_ip = "1.2.3.4"
 
             pull_step = self.pull_step_class()
-            pull_step.kafka_producer = "foo"
+
             pull_step.v1core.list_pod_for_all_namespaces.return_value = MagicMock(items=[pod])
 
             pull_step.pull_records()
@@ -330,7 +340,7 @@
             self.assertEqual(send_notification.call_args[0][3], "created")
 
     def test_pull_records_existing_pod_kafka_event(self):
-        """ A pod is found in k8s that does not exist in XOS. A new KubernetesServiceInstance sohuld be created
+        """ A pod is found in k8s that does not exist in XOS. A new KubernetesServiceInstance should be created
         """
         with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client), \
              patch.object(self.pull_step_class, "get_trustdomain_from_pod") as get_trustdomain, \
@@ -365,7 +375,7 @@
             si_objects.return_value = [xos_pod]
 
             pull_step = self.pull_step_class()
-            pull_step.kafka_producer = "foo"
+
             pull_step.v1core.list_pod_for_all_namespaces.return_value = MagicMock(items=[pod])
 
             pull_step.pull_records()
@@ -390,8 +400,10 @@
 
     def test_send_notification_created(self):
         with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client):
+
             pull_step = self.pull_step_class()
-            pull_step.kafka_producer = MagicMock()
+
+            from xoskafka import XOSKafkaProducer
 
             pod = self.make_pod("my-pod", self.trust_domain, self.principal, self.image)
             pod.status.pod_ip = "1.2.3.4"
@@ -404,13 +416,16 @@
                                                 xos_managed=False,
                                                 need_event=False,
                                                 last_event_sent="created")
+
             pull_step.send_notification(xos_pod, pod, "created")
 
-            self.assertEqual(pull_step.kafka_producer.send.call_count, 1)
-            topic = pull_step.kafka_producer.send.call_args[0][0]
-            event = json.loads(pull_step.kafka_producer.send.call_args[0][1])
+            self.assertEqual(XOSKafkaProducer.produce.call_count, 1)
+            topic = XOSKafkaProducer.produce.call_args[0][0]
+            key = XOSKafkaProducer.produce.call_args[0][1]
+            event = json.loads(XOSKafkaProducer.produce.call_args[0][2])
 
             self.assertEqual(topic, "xos.kubernetes.pod-details")
+            self.assertEqual(key, "my-pod")
 
             self.assertEqual(event["name"], "my-pod")
             self.assertEqual(event["status"], "created")
@@ -421,7 +436,8 @@
     def test_send_notification_deleted(self):
         with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client):
             pull_step = self.pull_step_class()
-            pull_step.kafka_producer = MagicMock()
+
+            from xoskafka import XOSKafkaProducer
 
             xos_pod = KubernetesServiceInstance(name="my-pod",
                                                 pod_ip="",
@@ -433,11 +449,13 @@
                                                 last_event_sent="created")
             pull_step.send_notification(xos_pod, None, "deleted")
 
-            self.assertEqual(pull_step.kafka_producer.send.call_count, 1)
-            topic = pull_step.kafka_producer.send.call_args[0][0]
-            event = json.loads(pull_step.kafka_producer.send.call_args[0][1])
+            self.assertEqual(XOSKafkaProducer.produce.call_count, 1)
+            topic = XOSKafkaProducer.produce.call_args[0][0]
+            key = XOSKafkaProducer.produce.call_args[0][1]
+            event = json.loads(XOSKafkaProducer.produce.call_args[0][2])
 
             self.assertEqual(topic, "xos.kubernetes.pod-details")
+            self.assertEqual(key, "my-pod")
 
             self.assertEqual(event["name"], "my-pod")
             self.assertEqual(event["status"], "deleted")