Adding delay if kafka bus is not found
Change-Id: Ia0e0a1ef2a56bd1905a5d1aee1571d8064a5a137
diff --git a/xos/synchronizers/new_base/event_engine.py b/xos/synchronizers/new_base/event_engine.py
index 60cdcbb..ad4a13b 100644
--- a/xos/synchronizers/new_base/event_engine.py
+++ b/xos/synchronizers/new_base/event_engine.py
@@ -20,6 +20,7 @@
import time
from xosconfig import Config
from multistructlog import create_logger
+from kafka.errors import NoBrokersAvailable
log = create_logger(Config().get('logging'))
@@ -69,6 +70,9 @@
self.step(log=log).process_event(msg)
except:
log.exception("Exception in event step", msg=msg, step=self.step.__name__)
+ except NoBrokersAvailable:
+ log.warning("No brokers available on %s" % self.bootstrap_servers)
+ time.sleep(20)
except:
# Maybe Kafka has not started yet. Log the exception and try again in a second.
log.exception("Exception in kafka loop")
diff --git a/xos/synchronizers/new_base/tests/test_event_engine.py b/xos/synchronizers/new_base/tests/test_event_engine.py
index 1424643..4e1c984 100644
--- a/xos/synchronizers/new_base/tests/test_event_engine.py
+++ b/xos/synchronizers/new_base/tests/test_event_engine.py
@@ -15,8 +15,7 @@
import functools
import unittest
from mock import patch, PropertyMock, ANY
-import mock
-import pdb
+from kafka.errors import NoBrokersAvailable
import os, sys
import time
@@ -43,6 +42,12 @@
# block forever
time.sleep(1000)
+class MockKafkaError:
+ NoBrokersAvailable = Exception
+
+class MockKafka:
+ error = MockKafkaError
+
class TestEventEngine(unittest.TestCase):
def setUp(self):
global XOSKafkaThread, Config, event_engine_log
@@ -53,7 +58,7 @@
sys.path.append(os.path.join(xos_dir, 'synchronizers', 'new_base'))
sys.path.append(os.path.join(xos_dir, 'synchronizers', 'new_base', 'tests', 'event_steps'))
- config = os.path.join(test_path, "test_config.yaml")
+ config = os.path.join(test_path, "test_config.yaml")
from xosconfig import Config
Config.clear()
Config.init(config, 'synchronizer-config-schema.yaml')
@@ -123,6 +128,16 @@
# The fake consumer will have returned one event, and that event will have been passed to our step
process_event.assert_called_with("sampleevent")
+ def _test_start_no_bus(self):
+ self.event_engine.load_event_step_modules(self.event_steps_dir)
+ with patch.object(XOSKafkaThread, "create_kafka_consumer") as create_kafka_consumer, \
+ patch.object(event_engine_log, "warning") as log_warning:
+
+ create_kafka_consumer.side_effect = NoBrokersAvailable()
+ self.event_engine.start()
+
+ log_warning.assert_called()
+
def test_start_bad_tech(self):
""" Set an unknown Technology in the event_step. XOSEventEngine.start() should print an error message and
not create any threads.