SEBA-169 Push Kafka events when Kubernetes pods are created, updated, or destroyed
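
When a Kubernetes pod is created, updated, or deleted, the pull step now
publishes a JSON event to the "xos.kubernetes.pod-details" Kafka topic.
For a pod with an assigned IP the payload looks roughly like this
(values are illustrative):

    {"status": "created",
     "name": "my-pod",
     "kubernetesserviceinstance_id": 42,
     "labels": {"app": "demo"},
     "netinterfaces": [{"name": "primary", "addresses": ["10.1.2.3"]}]}

Downstream services can consume these events with kafka-python; a
minimal sketch, assuming a broker at localhost:9092 (the address is an
assumption, not part of this change):

    import json
    from kafka import KafkaConsumer

    # Subscribe to the topic the pull step publishes to.
    consumer = KafkaConsumer("xos.kubernetes.pod-details",
                             bootstrap_servers=["localhost:9092"])
    for msg in consumer:
        event = json.loads(msg.value)
        # status is "created", "updated", or "deleted".
        print(event["status"], event["name"])
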
Change-Id: Idd711140c919ba2c232c7f08f0dc23507cfdd973
diff --git a/xos/synchronizer/pull_steps/pull_pods.py b/xos/synchronizer/pull_steps/pull_pods.py
index 3cbbbce..047fb10 100644
--- a/xos/synchronizer/pull_steps/pull_pods.py
+++ b/xos/synchronizer/pull_steps/pull_pods.py
@@ -18,6 +18,8 @@
Implements a syncstep to pull information about pods from Kubernetes.
"""
+import json
+
from synchronizers.new_base.pullstep import PullStep
from synchronizers.new_base.modelaccessor import KubernetesServiceInstance, KubernetesService, Slice, Principal, \
TrustDomain, Site, Image
@@ -38,6 +40,14 @@
def __init__(self):
super(KubernetesServiceInstancePullStep, self).__init__(observed_model=KubernetesServiceInstance)
+
+ self.kafka_producer = None
+ if Config.get("event_bus.endpoint"):
+ try:
+ self.init_kafka_producer()
+ except Exception:
+ log.exception("Failed to initialize Kafka producer")
+
self.init_kubernetes_client()
def init_kubernetes_client(self):
@@ -47,6 +57,21 @@
self.v1apps = kubernetes_client.AppsV1Api()
self.v1batch = kubernetes_client.BatchV1Api()
+ def init_kafka_producer(self):
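+ # The synchronizer config is expected to supply event_bus.kind ("kafka")
+ # and event_bus.endpoint (the Kafka bootstrap server to publish to).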
+ from kafka import KafkaProducer
+ eventbus_kind = Config.get("event_bus.kind")
+ eventbus_endpoint = Config.get("event_bus.endpoint")
+
+ if not eventbus_kind:
+ log.error("Eventbus kind is not configured in synchronizer config file.")
+ return
+
+ if eventbus_kind not in ["kafka"]:
+ log.error("Eventbus kind is set to a technology we do not implement.", eventbus_kind=eventbus_kind)
+ return
+
+ self.kafka_producer = KafkaProducer(bootstrap_servers = [eventbus_endpoint])
+
def obj_to_handle(self, obj):
""" Convert a Kubernetes resource into a handle that we can use to uniquely identify the object within
Kubernetes.
@@ -189,6 +214,26 @@
else:
return None
+ def send_notification(self, xos_pod, k8s_pod, status):
+ if not self.kafka_producer:
+ return
+
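+ # Build the event payload; status is one of "created", "updated", or "deleted".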
+ event = {"status": status,
+ "name": xos_pod.name}
+
+ if xos_pod.id:
+ event["kubernetesserviceinstance_id"] = xos_pod.id
+
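+ # For "deleted" events k8s_pod is None, so labels and addresses are omitted.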
+ if k8s_pod:
+ event["labels"] = k8s_pod.metadata.labels
+
+ if k8s_pod.status.pod_ip:
+ event["netinterfaces"] = [{"name": "primary",
+ "addresses": [k8s_pod.status.pod_ip]}]
+
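+ # flush() blocks until the send completes, so the event is not left sitting in the producer's buffer.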
+ self.kafka_producer.send("xos.kubernetes.pod-details", json.dumps(event))
+ self.kafka_producer.flush()
+
def pull_records(self):
# Read all pods from Kubernetes, store them in k8s_pods_by_name
k8s_pods_by_name = {}
@@ -233,27 +278,50 @@
slice = slice,
image = image,
backend_handle = self.obj_to_handle(pod),
- xos_managed = False)
+ xos_managed = False,
+ need_event = True)
xos_pod.save()
xos_pods_by_name[k] = xos_pod
log.info("Created XOS POD %s" % xos_pod.name)
+ xos_pod = xos_pods_by_name[k]
+
# Check to see if the ip address has changed. This can happen for pods that are managed by XOS. The IP
# isn't available immediately when XOS creates a pod, but shows up a bit later. So handle that case
# here.
- xos_pod = xos_pods_by_name[k]
if (pod.status.pod_ip is not None) and (xos_pod.pod_ip != pod.status.pod_ip):
xos_pod.pod_ip = pod.status.pod_ip
- xos_pod.save(update_fields = ["pod_ip"])
+ xos_pod.need_event = True # Trigger a new kafka event
+ xos_pod.save(update_fields = ["pod_ip", "need_event"])
+ log.info("Updated XOS POD %s" % xos_pod.name)
+
+ # Check to see if we haven't sent the Kafka event yet. It's possible Kafka could be down. If
+ # so, then we'll try to send the event again later.
+ if (xos_pod.need_event) and (self.kafka_producer):
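+ # Once a "created" event has gone out for this pod, any later change is reported as "updated".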
+ if xos_pod.last_event_sent == "created":
+ event_kind = "updated"
+ else:
+ event_kind = "created"
+
+ self.send_notification(xos_pod, pod, event_kind)
+
+ xos_pod.need_event = False
+ xos_pod.last_event_sent = event_kind
+ xos_pod.save(update_fields=["need_event", "last_event_sent"])
+
except:
log.exception("Failed to process k8s pod", k=k, pod=pod)
# For each xos pod, see if there is no k8s pod. If that's the case, then the pod must have been deleted.
for (k,xos_pod) in xos_pods_by_name.items():
- if (not k in k8s_pods_by_name):
- if (xos_pod.xos_managed):
- # Should we do something so it gets re-created by the syncstep?
- pass
- else:
- xos_pod.delete()
- log.info("Deleted XOS POD %s" % k)
+ try:
+ if k not in k8s_pods_by_name:
+ if (xos_pod.xos_managed):
+ # Should we do something so it gets re-created by the syncstep?
+ pass
+ else:
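+ # Notify listeners that the pod is gone before deleting our record of it.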
+ self.send_notification(xos_pod, None, "deleted")
+ xos_pod.delete()
+ log.info("Deleted XOS POD %s" % k)
+ except Exception:
+ log.exception("Failed to process xos pod", k=k, xos_pod=xos_pod)