[SEBA-249]
Update kubernetes-synchronizer to use confluent_kafka
Change-Id: I3bf944e95a86a5e6d797a75871956d1b4a0a74a7
diff --git a/Dockerfile.synchronizer b/Dockerfile.synchronizer
index 23a0f61..743a3c5 100644
--- a/Dockerfile.synchronizer
+++ b/Dockerfile.synchronizer
@@ -15,7 +15,7 @@
# docker build -t xosproject/kubernetes-synchronizer:candidate -f Dockerfile.synchronizer .
# xosproject/kubernetes-synchronizer
-FROM xosproject/xos-synchronizer-base:2.1.0
+FROM xosproject/xos-synchronizer-base:2.1.5
COPY xos/synchronizer /opt/xos/synchronizers/kubernetes
COPY VERSION /opt/xos/synchronizers/kubernetes/
diff --git a/VERSION b/VERSION
index 6d7de6e..4abbf0b 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-1.0.2
+1.0.3-dev0
diff --git a/xos/synchronizer/kubernetes-synchronizer.py b/xos/synchronizer/kubernetes-synchronizer.py
index 34cb34d..563f499 100755
--- a/xos/synchronizer/kubernetes-synchronizer.py
+++ b/xos/synchronizer/kubernetes-synchronizer.py
@@ -37,9 +37,14 @@
else:
Config.init(base_config_file, 'synchronizer-config-schema.yaml')
+from xoskafka import XOSKafkaProducer
+
# prevent logging noise from k8s API calls
logging.getLogger("kubernetes.client.rest").setLevel(logging.WARNING)
+# init kafka producer connection
+XOSKafkaProducer.init()
+
synchronizer_path = os.path.join(os.path.dirname(
os.path.realpath(__file__)), "../../synchronizers/new_base")
sys.path.append(synchronizer_path)
diff --git a/xos/synchronizer/pull_steps/pull_pods.py b/xos/synchronizer/pull_steps/pull_pods.py
index 0fe0fc0..65b657f 100644
--- a/xos/synchronizer/pull_steps/pull_pods.py
+++ b/xos/synchronizer/pull_steps/pull_pods.py
@@ -26,9 +26,11 @@
from xosconfig import Config
from multistructlog import create_logger
+from xoskafka import XOSKafkaProducer
log = create_logger(Config().get('logging'))
+
class KubernetesServiceInstancePullStep(PullStep):
"""
KubernetesServiceInstancePullStep
@@ -41,13 +43,6 @@
def __init__(self):
super(KubernetesServiceInstancePullStep, self).__init__(observed_model=KubernetesServiceInstance)
- self.kafka_producer = None
- if Config.get("event_bus.endpoint"):
- try:
- self.init_kafka_producer()
- except:
- log.exception("Failed to initialize Kafka producer")
-
self.init_kubernetes_client()
def init_kubernetes_client(self):
@@ -57,21 +52,6 @@
self.v1apps = kubernetes_client.AppsV1Api()
self.v1batch = kubernetes_client.BatchV1Api()
- def init_kafka_producer(self):
- from kafka import KafkaProducer
- eventbus_kind = Config.get("event_bus.kind")
- eventbus_endpoint = Config.get("event_bus.endpoint")
-
- if not eventbus_kind:
- log.error("Eventbus kind is not configured in synchronizer config file.")
- return
-
- if eventbus_kind not in ["kafka"]:
- log.error("Eventbus kind is set to a technology we do not implement.", eventbus_kind=eventbus_kind)
- return
-
- self.kafka_producer = KafkaProducer(bootstrap_servers = [eventbus_endpoint])
-
def obj_to_handle(self, obj):
""" Convert a Kubernetes resource into a handle that we can use to uniquely identify the object within
Kubernetes.
@@ -215,8 +195,6 @@
return None
def send_notification(self, xos_pod, k8s_pod, status):
- if not self.kafka_producer:
- return
event = {"status": status,
"name": xos_pod.name,
@@ -232,8 +210,12 @@
event["netinterfaces"] = [{"name": "primary",
"addresses": [k8s_pod.status.pod_ip]}]
- self.kafka_producer.send("xos.kubernetes.pod-details", json.dumps(event))
- self.kafka_producer.flush()
+ topic = "xos.kubernetes.pod-details"
+ key = xos_pod.name
+ value = json.dumps(event, default=lambda o: repr(o))
+
+ XOSKafkaProducer.produce(topic, key, value)
+
def pull_records(self):
# Read all pods from Kubernetes, store them in k8s_pods_by_name
@@ -298,7 +280,7 @@
# Check to see if we haven't sent the Kafka event yet. It's possible Kafka could be down. If
# so, then we'll try to send the event again later.
- if (xos_pod.need_event) and (self.kafka_producer):
+ if (xos_pod.need_event):
if xos_pod.last_event_sent == "created":
event_kind = "updated"
else:
diff --git a/xos/synchronizer/tests/test_pull_pods.py b/xos/synchronizer/tests/test_pull_pods.py
index 01ba354..ff5c8f3 100644
--- a/xos/synchronizer/tests/test_pull_pods.py
+++ b/xos/synchronizer/tests/test_pull_pods.py
@@ -30,6 +30,15 @@
self.unittest_setup = setup_sync_unit_test(os.path.abspath(os.path.dirname(os.path.realpath(__file__))),
globals(),
[("kubernetes-service", "kubernetes.proto")] )
+ self.mockxoskafka = MagicMock()
+
+ modules = {
+ 'xoskafka': self.mockxoskafka,
+ 'xoskafka.XOSKafkaProducer': self.mockxoskafka.XOSKafkaProducer,
+ }
+
+ self.module_patcher = patch.dict('sys.modules', modules)
+ self.module_patcher.start()
sys.path.append(os.path.join(os.path.abspath(os.path.dirname(os.path.realpath(__file__))), "../pull_steps"))
@@ -43,6 +52,7 @@
def tearDown(self):
sys.path = self.unittest_setup["sys_path_save"]
+ self.module_patcher.stop()
def test_read_obj_kind(self):
with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client):
@@ -222,7 +232,7 @@
return pod
def test_pull_records_new_pod(self):
- """ A pod is found in k8s that does not exist in XOS. A new KubernetesServiceInstance sohuld be created
+ """ A pod is found in k8s that does not exist in XOS. A new KubernetesServiceInstance should be created
"""
with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client), \
patch.object(self.pull_step_class, "get_trustdomain_from_pod") as get_trustdomain, \
@@ -250,7 +260,7 @@
pull_step.pull_records()
- self.assertEqual(ksi_save.call_count, 1)
+ self.assertEqual(ksi_save.call_count, 2)
saved_ksi = ksi_save.call_args[0][0]
self.assertEqual(saved_ksi.name, "my-pod")
@@ -261,7 +271,7 @@
self.assertEqual(saved_ksi.xos_managed, False)
def test_pull_records_missing_pod(self):
- """ A pod is found in k8s that does not exist in XOS. A new KubernetesServiceInstance sohuld be created
+ """ A pod is found in k8s that does not exist in XOS. A new KubernetesServiceInstance should be created
"""
with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client), \
patch.object(KubernetesService.objects, "get_items") as service_objects, \
@@ -281,7 +291,7 @@
deleted_ksi = ksi_delete.call_args[0][0]
def test_pull_records_new_pod_kafka_event(self):
- """ A pod is found in k8s that does not exist in XOS. A new KubernetesServiceInstance sohuld be created
+ """ A pod is found in k8s that does not exist in XOS. A new KubernetesServiceInstance should be created
"""
with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client), \
patch.object(self.pull_step_class, "get_trustdomain_from_pod") as get_trustdomain, \
@@ -306,7 +316,7 @@
pod.status.pod_ip = "1.2.3.4"
pull_step = self.pull_step_class()
- pull_step.kafka_producer = "foo"
+
pull_step.v1core.list_pod_for_all_namespaces.return_value = MagicMock(items=[pod])
pull_step.pull_records()
@@ -330,7 +340,7 @@
self.assertEqual(send_notification.call_args[0][3], "created")
def test_pull_records_existing_pod_kafka_event(self):
- """ A pod is found in k8s that does not exist in XOS. A new KubernetesServiceInstance sohuld be created
+ """ A pod is found in k8s that does not exist in XOS. A new KubernetesServiceInstance should be created
"""
with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client), \
patch.object(self.pull_step_class, "get_trustdomain_from_pod") as get_trustdomain, \
@@ -365,7 +375,7 @@
si_objects.return_value = [xos_pod]
pull_step = self.pull_step_class()
- pull_step.kafka_producer = "foo"
+
pull_step.v1core.list_pod_for_all_namespaces.return_value = MagicMock(items=[pod])
pull_step.pull_records()
@@ -390,8 +400,10 @@
def test_send_notification_created(self):
with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client):
+
pull_step = self.pull_step_class()
- pull_step.kafka_producer = MagicMock()
+
+ from xoskafka import XOSKafkaProducer
pod = self.make_pod("my-pod", self.trust_domain, self.principal, self.image)
pod.status.pod_ip = "1.2.3.4"
@@ -404,13 +416,16 @@
xos_managed=False,
need_event=False,
last_event_sent="created")
+
pull_step.send_notification(xos_pod, pod, "created")
- self.assertEqual(pull_step.kafka_producer.send.call_count, 1)
- topic = pull_step.kafka_producer.send.call_args[0][0]
- event = json.loads(pull_step.kafka_producer.send.call_args[0][1])
+ self.assertEqual(XOSKafkaProducer.produce.call_count, 1)
+ topic = XOSKafkaProducer.produce.call_args[0][0]
+ key = XOSKafkaProducer.produce.call_args[0][1]
+ event = json.loads(XOSKafkaProducer.produce.call_args[0][2])
self.assertEqual(topic, "xos.kubernetes.pod-details")
+ self.assertEqual(key, "my-pod")
self.assertEqual(event["name"], "my-pod")
self.assertEqual(event["status"], "created")
@@ -421,7 +436,8 @@
def test_send_notification_deleted(self):
with patch.object(self.pull_step_class, "init_kubernetes_client", new=fake_init_kubernetes_client):
pull_step = self.pull_step_class()
- pull_step.kafka_producer = MagicMock()
+
+ from xoskafka import XOSKafkaProducer
xos_pod = KubernetesServiceInstance(name="my-pod",
pod_ip="",
@@ -433,11 +449,13 @@
last_event_sent="created")
pull_step.send_notification(xos_pod, None, "deleted")
- self.assertEqual(pull_step.kafka_producer.send.call_count, 1)
- topic = pull_step.kafka_producer.send.call_args[0][0]
- event = json.loads(pull_step.kafka_producer.send.call_args[0][1])
+ self.assertEqual(XOSKafkaProducer.produce.call_count, 1)
+ topic = XOSKafkaProducer.produce.call_args[0][0]
+ key = XOSKafkaProducer.produce.call_args[0][1]
+ event = json.loads(XOSKafkaProducer.produce.call_args[0][2])
self.assertEqual(topic, "xos.kubernetes.pod-details")
+ self.assertEqual(key, "my-pod")
self.assertEqual(event["name"], "my-pod")
self.assertEqual(event["status"], "deleted")