SEBA-169 Push kafka events when Kubernetes pods are created, updated, or destroyed

Change-Id: Idd711140c919ba2c232c7f08f0dc23507cfdd973
diff --git a/xos/synchronizer/kubernetes_config.yaml b/xos/synchronizer/config.yaml
similarity index 91%
rename from xos/synchronizer/kubernetes_config.yaml
rename to xos/synchronizer/config.yaml
index 764daf6..0eaab8b 100644
--- a/xos/synchronizer/kubernetes_config.yaml
+++ b/xos/synchronizer/config.yaml
@@ -15,10 +15,6 @@
 
 
 name: kubernetes
-accessor:
-  username: "admin@opencord.org"
-  password: "letmein"
-  endpoint: xos-core:50051
 required_models:
   - KubernetesService
   - KubernetesServiceInstance
diff --git a/xos/synchronizer/kubernetes-synchronizer.py b/xos/synchronizer/kubernetes-synchronizer.py
old mode 100644
new mode 100755
index 8287263..34cb34d
--- a/xos/synchronizer/kubernetes-synchronizer.py
+++ b/xos/synchronizer/kubernetes-synchronizer.py
@@ -29,8 +29,13 @@
 import sys
 from xosconfig import Config
 
-config_file = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + '/kubernetes_config.yaml')
-Config.init(config_file, 'synchronizer-config-schema.yaml')
+base_config_file = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + '/config.yaml')
+mounted_config_file = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + '/mounted_config.yaml')
+
+if os.path.isfile(mounted_config_file):
+    Config.init(base_config_file, 'synchronizer-config-schema.yaml', mounted_config_file)
+else:
+    Config.init(base_config_file, 'synchronizer-config-schema.yaml')
 
 # prevent logging noise from k8s API calls
 logging.getLogger("kubernetes.client.rest").setLevel(logging.WARNING)
diff --git a/xos/synchronizer/models/kubernetes.xproto b/xos/synchronizer/models/kubernetes.xproto
index 40229f7..405f1c2 100644
--- a/xos/synchronizer/models/kubernetes.xproto
+++ b/xos/synchronizer/models/kubernetes.xproto
@@ -9,6 +9,8 @@
 message KubernetesServiceInstance (ComputeServiceInstance){
      option verbose_name = "Kubernetes Service Instance";
      optional string pod_ip = 1 [max_length=32, db_index = False, null = True, blank = True, help_text = "IP address of pod"];
+     required bool need_event = 2 [default = False, blank = True, help_text = "True if a kafka event needs to be sent by the pull step"];
+     optional string last_event_sent = 3 [max_length=32, db_index = False, null = True, blank = True, choices = "(('created', 'CREATED'), ('updated', 'UPDATED'), ('deleted', 'DELETED'))", help_text = "Type of last event sent"];
 }
 
 message KubernetesData (XOSBase) {
diff --git a/xos/synchronizer/pull_steps/pull_pods.py b/xos/synchronizer/pull_steps/pull_pods.py
index 3cbbbce..047fb10 100644
--- a/xos/synchronizer/pull_steps/pull_pods.py
+++ b/xos/synchronizer/pull_steps/pull_pods.py
@@ -18,6 +18,8 @@
     Implements a syncstep to pull information about pods form Kubernetes.
 """
 
+import json
+
 from synchronizers.new_base.pullstep import PullStep
 from synchronizers.new_base.modelaccessor import KubernetesServiceInstance, KubernetesService, Slice, Principal, \
                                                  TrustDomain, Site, Image
@@ -38,6 +40,14 @@
 
     def __init__(self):
         super(KubernetesServiceInstancePullStep, self).__init__(observed_model=KubernetesServiceInstance)
+
+        self.kafka_producer = None  # stays None when no event bus endpoint is configured or setup fails
+        if Config.get("event_bus.endpoint"):
+            try:
+                self.init_kafka_producer()
+            except Exception:  # best effort: log and run without events; SystemExit/KeyboardInterrupt still propagate
+                log.exception("Failed to initialize Kafka producer")
+
         self.init_kubernetes_client()
 
     def init_kubernetes_client(self):
@@ -47,6 +57,21 @@
         self.v1apps = kubernetes_client.AppsV1Api()
         self.v1batch = kubernetes_client.BatchV1Api()
 
+    def init_kafka_producer(self):
+        """ Set self.kafka_producer from the event_bus.kind / event_bus.endpoint config values.
+            Logs an error and leaves the producer untouched if kind is unset or not "kafka".
+        """
+        from kafka import KafkaProducer
+        bus_kind = Config.get("event_bus.kind")
+        bus_endpoint = Config.get("event_bus.endpoint")
+
+        if not bus_kind:
+            log.error("Eventbus kind is not configured in synchronizer config file.")
+        elif bus_kind != "kafka":
+            log.error("Eventbus kind is set to a technology we do not implement.", eventbus_kind=bus_kind)
+        else:
+            self.kafka_producer = KafkaProducer(bootstrap_servers = [bus_endpoint])
+
     def obj_to_handle(self, obj):
         """ Convert a Kubernetes resource into a handle that we can use to uniquely identify the object within
             Kubernetes.
@@ -189,6 +214,26 @@
         else:
             return None
 
+    def send_notification(self, xos_pod, k8s_pod, status):
+        """ Publish a JSON event describing xos_pod on the "xos.kubernetes.pod-details" topic.
+            Does nothing when no Kafka producer is set. k8s_pod may be None (no labels/addresses are sent).
+        """
+        if not self.kafka_producer:
+            return
+
+        payload = {"status": status, "name": xos_pod.name}
+        if xos_pod.id:
+            payload["kubernetesserviceinstance_id"] = xos_pod.id
+
+        if k8s_pod:
+            payload["labels"] = k8s_pod.metadata.labels
+            pod_ip = k8s_pod.status.pod_ip
+            if pod_ip:
+                payload["netinterfaces"] = [{"name": "primary", "addresses": [pod_ip]}]
+
+        self.kafka_producer.send("xos.kubernetes.pod-details", json.dumps(payload))
+        self.kafka_producer.flush()
+
     def pull_records(self):
         # Read all pods from Kubernetes, store them in k8s_pods_by_name
         k8s_pods_by_name = {}
@@ -233,27 +278,50 @@
                                                         slice = slice,
                                                         image = image,
                                                         backend_handle = self.obj_to_handle(pod),
-                                                        xos_managed = False)
+                                                        xos_managed = False,
+                                                        need_event = True)
                     xos_pod.save()
                     xos_pods_by_name[k] = xos_pod
                     log.info("Created XOS POD %s" % xos_pod.name)
 
+                xos_pod = xos_pods_by_name[k]
+
                 # Check to see if the ip address has changed. This can happen for pods that are managed by XOS. The IP
                 # isn't available immediately when XOS creates a pod, but shows up a bit later. So handle that case
                 # here.
-                xos_pod = xos_pods_by_name[k]
                 if (pod.status.pod_ip is not None) and (xos_pod.pod_ip != pod.status.pod_ip):
                     xos_pod.pod_ip = pod.status.pod_ip
-                    xos_pod.save(update_fields = ["pod_ip"])
+                    xos_pod.need_event = True # Trigger a new kafka event
+                    xos_pod.save(update_fields = ["pod_ip", "need_event"])
+                    log.info("Updated XOS POD %s" % xos_pod.name)
+
+                # Check to see if we haven't sent the Kafka event yet. It's possible Kafka could be down. If
+                # so, then we'll try to send the event again later.
+                if (xos_pod.need_event) and (self.kafka_producer):
+                    # A pod that was already announced gets "updated"; only one with no prior event is "created".
+                    if xos_pod.last_event_sent in ("created", "updated"):
+                        event_kind = "updated"
+                    else:
+                        event_kind = "created"
+                    self.send_notification(xos_pod, pod, event_kind)
+                    # Reached only if send_notification did not raise, so a failed send is retried on a later pull.
+                    xos_pod.need_event = False
+                    xos_pod.last_event_sent = event_kind
+                    xos_pod.save(update_fields=["need_event", "last_event_sent"])
+
             except:
                 log.exception("Failed to process k8s pod", k=k, pod=pod)
 
         # For each xos pod, see if there is no k8s pod. If that's the case, then the pud must have been deleted.
         for (k,xos_pod) in xos_pods_by_name.items():
-            if (not k in k8s_pods_by_name):
-                if (xos_pod.xos_managed):
-                    # Should we do something so it gets re-created by the syncstep?
-                    pass
-                else:
-                    xos_pod.delete()
-                    log.info("Deleted XOS POD %s" % k)
+            try:
+                if (not k in k8s_pods_by_name):
+                    if (xos_pod.xos_managed):
+                        # Should we do something so it gets re-created by the syncstep?
+                        pass
+                    else:
+                        self.send_notification(xos_pod, None, "deleted")
+                        xos_pod.delete()
+                        log.info("Deleted XOS POD %s" % k)
+            except:
+                log.exception("Failed to process xos pod", k=k, xos_pod=xos_pod)
diff --git a/xos/synchronizer/tests/test_pull_pods.py b/xos/synchronizer/tests/test_pull_pods.py
index 672687f..c0b8b0b 100644
--- a/xos/synchronizer/tests/test_pull_pods.py
+++ b/xos/synchronizer/tests/test_pull_pods.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import json
 import os, sys
 import unittest
 from mock import patch, PropertyMock, ANY, MagicMock
@@ -279,5 +280,166 @@
             self.assertEqual(ksi_delete.call_count, 1)
             deleted_ksi = ksi_delete.call_args[0][0]
 
+    def test_pull_records_new_pod_kafka_event(self):
+        """ A pod found in k8s but not in XOS should create a KubernetesServiceInstance and send a "created" event.
+        """
+        with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client), \
+             patch.object(self.pull_step_class, "get_trustdomain_from_pod") as get_trustdomain, \
+             patch.object(self.pull_step_class, "get_principal_from_pod") as get_principal, \
+             patch.object(self.pull_step_class, "get_slice_from_pod") as get_slice, \
+             patch.object(self.pull_step_class, "get_image_from_pod") as get_image, \
+             patch.object(self.pull_step_class, "send_notification", autospec=True) as send_notification, \
+             patch.object(KubernetesService.objects, "get_items") as service_objects, \
+             patch.object(KubernetesServiceInstance.objects, "get_items") as si_objects, \
+             patch.object(KubernetesServiceInstance, "save", autospec=True) as ksi_save:
+
+            service_objects.return_value = [self.service]
+
+            slice = Slice(name="myslice")
+
+            get_trustdomain.return_value = self.trust_domain
+            get_principal.return_value = self.principal
+            get_slice.return_value = slice
+            get_image.return_value = self.image
+
+            pod = self.make_pod("my-pod", self.trust_domain, self.principal, self.image)
+            pod.status.pod_ip = "1.2.3.4"
+
+            pull_step = self.pull_step_class()
+            pull_step.kafka_producer = "foo"  # any truthy value enables the event path; send_notification is patched
+            pull_step.v1core.list_pod_for_all_namespaces.return_value = MagicMock(items=[pod])
+
+            pull_step.pull_records()
+
+            self.assertEqual(ksi_save.call_count, 2)
+
+            # Inspect the last KubernetesServiceInstance that was saved. There's no way to inspect the first one saved
+            # if there are multiple calls, as the sync step will cause the object to be updated.
+            saved_ksi = ksi_save.call_args[0][0]
+            self.assertEqual(saved_ksi.name, "my-pod")
+            self.assertEqual(saved_ksi.pod_ip, "1.2.3.4")
+            self.assertEqual(saved_ksi.owner, self.service)
+            self.assertEqual(saved_ksi.slice, slice)
+            self.assertEqual(saved_ksi.image, self.image)
+            self.assertEqual(saved_ksi.xos_managed, False)
+            self.assertEqual(saved_ksi.need_event, False)
+
+            self.assertEqual(send_notification.call_count, 1)
+            self.assertEqual(send_notification.call_args[0][1], saved_ksi)  # index 0 is self (autospec=True)
+            self.assertEqual(send_notification.call_args[0][2], pod)
+            self.assertEqual(send_notification.call_args[0][3], "created")
+
+    def test_pull_records_existing_pod_kafka_event(self):
+        """ An existing XOS pod whose IP changed in k8s should be updated and an "updated" event sent.
+        """
+        with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client), \
+             patch.object(self.pull_step_class, "get_trustdomain_from_pod") as get_trustdomain, \
+             patch.object(self.pull_step_class, "get_principal_from_pod") as get_principal, \
+             patch.object(self.pull_step_class, "get_slice_from_pod") as get_slice, \
+             patch.object(self.pull_step_class, "get_image_from_pod") as get_image, \
+             patch.object(self.pull_step_class, "send_notification", autospec=True) as send_notification, \
+             patch.object(KubernetesService.objects, "get_items") as service_objects, \
+             patch.object(KubernetesServiceInstance.objects, "get_items") as si_objects, \
+             patch.object(KubernetesServiceInstance, "save", autospec=True) as ksi_save:
+
+            service_objects.return_value = [self.service]
+
+            slice = Slice(name="myslice")
+
+            get_trustdomain.return_value = self.trust_domain
+            get_principal.return_value = self.principal
+            get_slice.return_value = slice
+            get_image.return_value = self.image
+
+            pod = self.make_pod("my-pod", self.trust_domain, self.principal, self.image)
+            pod.status.pod_ip = "1.2.3.4"
+
+            xos_pod = KubernetesServiceInstance(name="my-pod",
+                                                pod_ip="",
+                                                owner=self.service,
+                                                slice=slice,
+                                                image=self.image,
+                                                xos_managed=False,
+                                                need_event=False,
+                                                last_event_sent="created")
+            si_objects.return_value = [xos_pod]
+
+            pull_step = self.pull_step_class()
+            pull_step.kafka_producer = "foo"  # any truthy value enables the event path; send_notification is patched
+            pull_step.v1core.list_pod_for_all_namespaces.return_value = MagicMock(items=[pod])
+
+            pull_step.pull_records()
+
+            self.assertEqual(ksi_save.call_count, 2)  # one save for the pod_ip change, one after the event is sent
+
+            # Inspect the last KubernetesServiceInstance that was saved. There's no way to inspect the first one saved
+            # if there are multiple calls, as the sync step will cause the object to be updated.
+            saved_ksi = ksi_save.call_args[0][0]
+            self.assertEqual(saved_ksi.name, "my-pod")
+            self.assertEqual(saved_ksi.pod_ip, "1.2.3.4")
+            self.assertEqual(saved_ksi.owner, self.service)
+            self.assertEqual(saved_ksi.slice, slice)
+            self.assertEqual(saved_ksi.image, self.image)
+            self.assertEqual(saved_ksi.xos_managed, False)
+            self.assertEqual(saved_ksi.need_event, False)
+
+            self.assertEqual(send_notification.call_count, 1)
+            self.assertEqual(send_notification.call_args[0][1], saved_ksi)  # index 0 is self (autospec=True)
+            self.assertEqual(send_notification.call_args[0][2], pod)
+            self.assertEqual(send_notification.call_args[0][3], "updated")
+
+    def test_send_notification_created(self):
+        """ send_notification() publishes the name, status, labels and pod IP of a k8s pod as a JSON event. """
+        with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client):
+            pull_step = self.pull_step_class()
+            pull_step.kafka_producer = MagicMock()
+
+            pod = self.make_pod("my-pod", self.trust_domain, self.principal, self.image)
+            pod.status.pod_ip = "1.2.3.4"
+            pod.metadata.labels = {"foo": "bar"}
+            xos_pod = KubernetesServiceInstance(name="my-pod",
+                                                pod_ip="",
+                                                owner=self.service,
+                                                slice=Slice(name="myslice"),  # a real Slice, not the builtin `slice` type
+                                                image=self.image,
+                                                xos_managed=False,
+                                                need_event=False,
+                                                last_event_sent="created")
+            pull_step.send_notification(xos_pod, pod, "created")
+
+            self.assertEqual(pull_step.kafka_producer.send.call_count, 1)
+            topic = pull_step.kafka_producer.send.call_args[0][0]
+            event = json.loads(pull_step.kafka_producer.send.call_args[0][1])
+
+            self.assertEqual(topic, "xos.kubernetes.pod-details")
+            self.assertEqual(event["name"], "my-pod")
+            self.assertEqual(event["status"], "created")
+            self.assertEqual(event["labels"], {"foo": "bar"})
+            self.assertEqual(event["netinterfaces"], [{"name": "primary", "addresses": ["1.2.3.4"]}])
+
+    def test_send_notification_deleted(self):
+        """ send_notification() with no k8s pod still publishes the pod name and the "deleted" status. """
+        with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client):
+            pull_step = self.pull_step_class()
+            pull_step.kafka_producer = MagicMock()
+
+            xos_pod = KubernetesServiceInstance(name="my-pod",
+                                                pod_ip="",
+                                                owner=self.service,
+                                                slice=Slice(name="myslice"),  # a real Slice, not the builtin `slice` type
+                                                image=self.image,
+                                                xos_managed=False,
+                                                need_event=False,
+                                                last_event_sent="created")
+            pull_step.send_notification(xos_pod, None, "deleted")
+
+            self.assertEqual(pull_step.kafka_producer.send.call_count, 1)
+            topic = pull_step.kafka_producer.send.call_args[0][0]
+            event = json.loads(pull_step.kafka_producer.send.call_args[0][1])
+
+            self.assertEqual(topic, "xos.kubernetes.pod-details")
+            self.assertEqual(event["name"], "my-pod")
+            self.assertEqual(event["status"], "deleted")
+
 if __name__ == '__main__':
     unittest.main()