[SEBA-249]
Update kubernetes-synchronizer to use confluent_kafka
Change-Id: I3bf944e95a86a5e6d797a75871956d1b4a0a74a7
diff --git a/xos/synchronizer/pull_steps/pull_pods.py b/xos/synchronizer/pull_steps/pull_pods.py
index 0fe0fc0..65b657f 100644
--- a/xos/synchronizer/pull_steps/pull_pods.py
+++ b/xos/synchronizer/pull_steps/pull_pods.py
@@ -26,9 +26,11 @@
from xosconfig import Config
from multistructlog import create_logger
+from xoskafka import XOSKafkaProducer
log = create_logger(Config().get('logging'))
+
class KubernetesServiceInstancePullStep(PullStep):
"""
KubernetesServiceInstancePullStep
@@ -41,13 +43,6 @@
def __init__(self):
super(KubernetesServiceInstancePullStep, self).__init__(observed_model=KubernetesServiceInstance)
- self.kafka_producer = None
- if Config.get("event_bus.endpoint"):
- try:
- self.init_kafka_producer()
- except:
- log.exception("Failed to initialize Kafka producer")
-
self.init_kubernetes_client()
def init_kubernetes_client(self):
@@ -57,21 +52,6 @@
self.v1apps = kubernetes_client.AppsV1Api()
self.v1batch = kubernetes_client.BatchV1Api()
- def init_kafka_producer(self):
- from kafka import KafkaProducer
- eventbus_kind = Config.get("event_bus.kind")
- eventbus_endpoint = Config.get("event_bus.endpoint")
-
- if not eventbus_kind:
- log.error("Eventbus kind is not configured in synchronizer config file.")
- return
-
- if eventbus_kind not in ["kafka"]:
- log.error("Eventbus kind is set to a technology we do not implement.", eventbus_kind=eventbus_kind)
- return
-
- self.kafka_producer = KafkaProducer(bootstrap_servers = [eventbus_endpoint])
-
def obj_to_handle(self, obj):
""" Convert a Kubernetes resource into a handle that we can use to uniquely identify the object within
Kubernetes.
@@ -215,8 +195,6 @@
return None
def send_notification(self, xos_pod, k8s_pod, status):
- if not self.kafka_producer:
- return
event = {"status": status,
"name": xos_pod.name,
@@ -232,8 +210,12 @@
event["netinterfaces"] = [{"name": "primary",
"addresses": [k8s_pod.status.pod_ip]}]
- self.kafka_producer.send("xos.kubernetes.pod-details", json.dumps(event))
- self.kafka_producer.flush()
+ topic = "xos.kubernetes.pod-details"
+ key = xos_pod.name
+ value = json.dumps(event, default=lambda o: repr(o))
+
+ XOSKafkaProducer.produce(topic, key, value)
+
def pull_records(self):
# Read all pods from Kubernetes, store them in k8s_pods_by_name
@@ -298,7 +280,7 @@
# Check to see if we haven't sent the Kafka event yet. It's possible Kafka could be down. If
# so, then we'll try to send the event again later.
- if (xos_pod.need_event) and (self.kafka_producer):
+ if (xos_pod.need_event):
if xos_pod.last_event_sent == "created":
event_kind = "updated"
else: