[SEBA-257]
Specify the kafka group ID and default topic config
Change-Id: I199e7f6f118fca7707bfb783659417a13b3b7e27
diff --git a/VERSION b/VERSION
index eca07e4..ac2cdeb 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.1.2
+2.1.3
diff --git a/containers/chameleon/Dockerfile.chameleon b/containers/chameleon/Dockerfile.chameleon
index 8cdb226..116fb2d 100644
--- a/containers/chameleon/Dockerfile.chameleon
+++ b/containers/chameleon/Dockerfile.chameleon
@@ -13,7 +13,7 @@
# limitations under the License.
# xosproject/chameleon
-FROM xosproject/xos-base:2.1.2
+FROM xosproject/xos-base:2.1.3
# xos-base already has protoc and dependencies installed
diff --git a/containers/xos/Dockerfile.client b/containers/xos/Dockerfile.client
index caac8fa..9210f39 100644
--- a/containers/xos/Dockerfile.client
+++ b/containers/xos/Dockerfile.client
@@ -13,7 +13,7 @@
# limitations under the License.
# xosproject/xos-client
-FROM xosproject/xos-libraries:2.1.2
+FROM xosproject/xos-libraries:2.1.3
# Install XOS client
COPY xos/xos_client /tmp/xos_client
diff --git a/containers/xos/Dockerfile.libraries b/containers/xos/Dockerfile.libraries
index 7e14091..8ead8ef 100644
--- a/containers/xos/Dockerfile.libraries
+++ b/containers/xos/Dockerfile.libraries
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-FROM xosproject/xos-base:2.1.2
+FROM xosproject/xos-base:2.1.3
# Add libraries
COPY lib /opt/xos/lib
diff --git a/containers/xos/Dockerfile.synchronizer-base b/containers/xos/Dockerfile.synchronizer-base
index 86f58d2..3f63f83 100644
--- a/containers/xos/Dockerfile.synchronizer-base
+++ b/containers/xos/Dockerfile.synchronizer-base
@@ -13,7 +13,7 @@
# limitations under the License.
# xosproject/xos-synchronizer-base
-FROM xosproject/xos-client:2.1.2
+FROM xosproject/xos-client:2.1.3
COPY xos/synchronizers/new_base /opt/xos/synchronizers/new_base
COPY xos/xos/logger.py /opt/xos/xos/logger.py
diff --git a/containers/xos/Dockerfile.xos-core b/containers/xos/Dockerfile.xos-core
index a875d9c..753f14f 100644
--- a/containers/xos/Dockerfile.xos-core
+++ b/containers/xos/Dockerfile.xos-core
@@ -13,7 +13,7 @@
# limitations under the License.
# xosproject/xos-core
-FROM xosproject/xos-libraries:2.1.2
+FROM xosproject/xos-libraries:2.1.3
# Install XOS
ADD xos /opt/xos
diff --git a/xos/synchronizers/new_base/event_engine.py b/xos/synchronizers/new_base/event_engine.py
index 04aca29..8f43e2f 100644
--- a/xos/synchronizers/new_base/event_engine.py
+++ b/xos/synchronizers/new_base/event_engine.py
@@ -1,3 +1,4 @@
+
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -24,6 +25,21 @@
log = create_logger(Config().get('logging'))
+class XOSKafkaMessage():
+
+ def __init__(self, consumer_msg):
+
+ self.topic = consumer_msg.topic()
+ self.key = consumer_msg.key()
+ self.value = consumer_msg.value()
+
+ self.timestamp = None
+ (ts_type, ts_val) = consumer_msg.timestamp()
+
+ if ts_type is not confluent_kafka.TIMESTAMP_NOT_AVAILABLE:
+ self.timestamp = ts_val
+
+
class XOSKafkaThread(threading.Thread):
""" XOSKafkaThread
@@ -40,8 +56,11 @@
self.daemon = True
def create_kafka_consumer(self):
+ # use the service name as the group id
consumer_config = {
+ 'group.id': Config().get('name'),
'bootstrap.servers': ','.join(self.bootstrap_servers),
+ 'default.topic.config': {'auto.offset.reset': 'smallest'},
}
return confluent_kafka.Consumer(**consumer_config)
@@ -54,33 +73,59 @@
raise Exception("Both topics and pattern are defined for step %s. Choose one." %
self.step.__name__)
+ log.info("Waiting for events",
+ topic=self.step.topics,
+ pattern=self.step.pattern,
+ step=self.step.__name__)
+
while True:
try:
- self.consumer = self.create_kafka_consumer()
- if self.step.topics:
- self.consumer.subscribe(self.step.topics)
- elif self.step.pattern:
- self.consumer.subscribe(self.step.pattern)
+ # setup consumer or loop on failure
+ if self.consumer is None:
+ self.consumer = self.create_kafka_consumer()
- log.info("Waiting for events",
- topic=self.step.topics,
- pattern=self.step.pattern,
- step=self.step.__name__)
+ if self.step.topics:
+ self.consumer.subscribe(self.step.topics)
- for msg in self.consumer.poll():
- log.info("Processing event", msg=msg, step=self.step.__name__)
- try:
- self.step(log=log).process_event(msg)
- except:
- log.exception("Exception in event step", msg=msg, step=self.step.__name__)
+ elif self.step.pattern:
+ self.consumer.subscribe(self.step.pattern)
except confluent_kafka.KafkaError._ALL_BROKERS_DOWN, e:
log.warning("No brokers available on %s, %s" % (self.bootstrap_servers, e))
time.sleep(20)
+ continue
+
except confluent_kafka.KafkaError, e:
# Maybe Kafka has not started yet. Log the exception and try again in a second.
log.exception("Exception in kafka loop: %s" % e)
time.sleep(1)
+ continue
+
+ # wait until we get a message, if no message, loop again
+ msg = self.consumer.poll(timeout=1.0)
+
+ if msg is None:
+ continue
+
+ if msg.error():
+ if msg.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:
+ log.debug("Reached end of kafka topic %s, partition: %s, offset: %d" %
+ (msg.topic(), msg.partition(), msg.offset()))
+ else:
+ log.exception("Error in kafka message: %s" % msg.error())
+
+ else:
+ # wrap parsing the event in a class
+ event_msg = XOSKafkaMessage(msg)
+
+ log.info("Processing event", event_msg=event_msg, step=self.step.__name__)
+
+ try:
+ self.step(log=log).process_event(event_msg)
+
+ except:
+ log.exception("Exception in event step", event_msg=event_msg, step=self.step.__name__)
+
class XOSEventEngine:
""" XOSEventEngine
diff --git a/xos/synchronizers/new_base/tests/test_event_engine.py b/xos/synchronizers/new_base/tests/test_event_engine.py
index 54d64ee..5919721 100644
--- a/xos/synchronizers/new_base/tests/test_event_engine.py
+++ b/xos/synchronizers/new_base/tests/test_event_engine.py
@@ -12,8 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import confluent_kafka
import functools
import unittest
+
from mock import patch, PropertyMock, ANY
import os, sys
@@ -28,19 +30,57 @@
else:
return orig(key)
+
class FakeKafkaConsumer():
- def __init__(self, values=["sampleevent"]):
+ def __init__(self, values=[]):
self.values = values
def subscribe(self, topics):
pass
- def poll(self):
- for x in self.values:
- yield x
+ def poll(self, timeout=1.0):
+ if self.values:
+ return FakeKafkaMessage(self.values.pop())
# block forever
time.sleep(1000)
+
+class FakeKafkaMessage():
+ ''' Works like Message in confluent_kafka
+ https://docs.confluent.io/current/clients/confluent-kafka-python/#message
+ '''
+
+ def __init__(self, timestamp=None, topic='faketopic',
+ key='fakekey', value='fakevalue', error=False):
+
+ if timestamp is None:
+ self.fake_ts_type = confluent_kafka.TIMESTAMP_NOT_AVAILABLE
+ self.fake_timestamp = None
+ else:
+ self.fake_ts_type = confluent_kafka.TIMESTAMP_CREATE_TIME
+ self.fake_timestamp = timestamp
+
+ self.fake_topic = topic
+ self.fake_key = key
+ self.fake_value = value
+ self.fake_error = error
+
+ def error(self):
+ return self.fake_error
+
+ def timestamp(self):
+ return (self.fake_ts_type, self.fake_timestamp)
+
+ def topic(self):
+ return self.fake_topic
+
+ def key(self):
+ return self.fake_key
+
+ def value(self):
+ return self.fake_value
+
+
class TestEventEngine(unittest.TestCase):
def setUp(self):
global XOSKafkaThread, Config, event_engine_log
@@ -81,7 +121,8 @@
with patch.object(XOSKafkaThread, "create_kafka_consumer") as create_kafka_consumer, \
patch.object(FakeKafkaConsumer, "subscribe") as fake_subscribe, \
patch.object(self.event_engine.event_steps[0], "process_event") as process_event:
- create_kafka_consumer.return_value = FakeKafkaConsumer()
+
+ create_kafka_consumer.return_value = FakeKafkaConsumer(values=["sampleevent"])
self.event_engine.start()
self.assertEqual(len(self.event_engine.threads), 1)
@@ -90,10 +131,11 @@
time.sleep(0.1)
# We should have subscribed to the fake consumer
- fake_subscribe.assert_called_with(["sometopic"])
+ fake_subscribe.assert_called_once()
- # The fake consumer will have returned one event, and that event will have been passed to our step
- process_event.assert_called_with("sampleevent")
+ # The fake consumer will have returned one event
+ process_event.assert_called_once()
+
def test_start_with_pattern(self):
self.event_engine.load_event_step_modules(self.event_steps_dir)
@@ -107,7 +149,7 @@
pattern.return_value = "somepattern"
topics.return_value = []
- create_kafka_consumer.return_value = FakeKafkaConsumer()
+ create_kafka_consumer.return_value = FakeKafkaConsumer(values=["sampleevent"])
self.event_engine.start()
self.assertEqual(len(self.event_engine.threads), 1)
@@ -118,8 +160,8 @@
# We should have subscribed to the fake consumer
fake_subscribe.assert_called_with("somepattern")
- # The fake consumer will have returned one event, and that event will have been passed to our step
- process_event.assert_called_with("sampleevent")
+ # The fake consumer will have returned one event
+ process_event.assert_called_once()
def test_start_bad_tech(self):