[SEBA-257]

Specify the Kafka consumer group ID and default topic config

Change-Id: I199e7f6f118fca7707bfb783659417a13b3b7e27
diff --git a/VERSION b/VERSION
index eca07e4..ac2cdeb 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.1.2
+2.1.3
diff --git a/containers/chameleon/Dockerfile.chameleon b/containers/chameleon/Dockerfile.chameleon
index 8cdb226..116fb2d 100644
--- a/containers/chameleon/Dockerfile.chameleon
+++ b/containers/chameleon/Dockerfile.chameleon
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 # xosproject/chameleon
-FROM xosproject/xos-base:2.1.2
+FROM xosproject/xos-base:2.1.3
 
 # xos-base already has protoc and dependencies installed
 
diff --git a/containers/xos/Dockerfile.client b/containers/xos/Dockerfile.client
index caac8fa..9210f39 100644
--- a/containers/xos/Dockerfile.client
+++ b/containers/xos/Dockerfile.client
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 # xosproject/xos-client
-FROM xosproject/xos-libraries:2.1.2
+FROM xosproject/xos-libraries:2.1.3
 
 # Install XOS client
 COPY xos/xos_client /tmp/xos_client
diff --git a/containers/xos/Dockerfile.libraries b/containers/xos/Dockerfile.libraries
index 7e14091..8ead8ef 100644
--- a/containers/xos/Dockerfile.libraries
+++ b/containers/xos/Dockerfile.libraries
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-FROM xosproject/xos-base:2.1.2
+FROM xosproject/xos-base:2.1.3
 
 # Add libraries
 COPY lib /opt/xos/lib
diff --git a/containers/xos/Dockerfile.synchronizer-base b/containers/xos/Dockerfile.synchronizer-base
index 86f58d2..3f63f83 100644
--- a/containers/xos/Dockerfile.synchronizer-base
+++ b/containers/xos/Dockerfile.synchronizer-base
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 # xosproject/xos-synchronizer-base
-FROM xosproject/xos-client:2.1.2
+FROM xosproject/xos-client:2.1.3
 
 COPY xos/synchronizers/new_base /opt/xos/synchronizers/new_base
 COPY xos/xos/logger.py /opt/xos/xos/logger.py
diff --git a/containers/xos/Dockerfile.xos-core b/containers/xos/Dockerfile.xos-core
index a875d9c..753f14f 100644
--- a/containers/xos/Dockerfile.xos-core
+++ b/containers/xos/Dockerfile.xos-core
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 # xosproject/xos-core
-FROM xosproject/xos-libraries:2.1.2
+FROM xosproject/xos-libraries:2.1.3
 
 # Install XOS
 ADD xos /opt/xos
diff --git a/xos/synchronizers/new_base/event_engine.py b/xos/synchronizers/new_base/event_engine.py
index 04aca29..8f43e2f 100644
--- a/xos/synchronizers/new_base/event_engine.py
+++ b/xos/synchronizers/new_base/event_engine.py
@@ -1,3 +1,4 @@
+
 # Copyright 2017-present Open Networking Foundation
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
@@ -24,6 +25,21 @@
 log = create_logger(Config().get('logging'))
 
 
+class XOSKafkaMessage():
+
+    def __init__(self, consumer_msg):
+
+        self.topic = consumer_msg.topic()
+        self.key = consumer_msg.key()
+        self.value = consumer_msg.value()
+
+        self.timestamp = None
+        (ts_type, ts_val) = consumer_msg.timestamp()
+
+        if ts_type is not confluent_kafka.TIMESTAMP_NOT_AVAILABLE:
+            self.timestamp = ts_val
+
+
 class XOSKafkaThread(threading.Thread):
     """ XOSKafkaThread
 
@@ -40,8 +56,11 @@
         self.daemon = True
 
     def create_kafka_consumer(self):
+        # use the service name as the group id
         consumer_config = {
+            'group.id': Config().get('name'),
             'bootstrap.servers': ','.join(self.bootstrap_servers),
+            'default.topic.config': {'auto.offset.reset': 'smallest'},
         }
 
         return confluent_kafka.Consumer(**consumer_config)
@@ -54,33 +73,59 @@
             raise Exception("Both topics and pattern are defined for step %s. Choose one." %
                             self.step.__name__)
 
+        log.info("Waiting for events",
+                 topic=self.step.topics,
+                 pattern=self.step.pattern,
+                 step=self.step.__name__)
+
         while True:
             try:
-                self.consumer = self.create_kafka_consumer()
-                if self.step.topics:
-                    self.consumer.subscribe(self.step.topics)
-                elif self.step.pattern:
-                    self.consumer.subscribe(self.step.pattern)
+                # setup consumer or loop on failure
+                if self.consumer is None:
+                    self.consumer = self.create_kafka_consumer()
 
-                log.info("Waiting for events",
-                         topic=self.step.topics,
-                         pattern=self.step.pattern,
-                         step=self.step.__name__)
+                    if self.step.topics:
+                        self.consumer.subscribe(self.step.topics)
 
-                for msg in self.consumer.poll():
-                    log.info("Processing event", msg=msg, step=self.step.__name__)
-                    try:
-                        self.step(log=log).process_event(msg)
-                    except:
-                        log.exception("Exception in event step", msg=msg, step=self.step.__name__)
+                    elif self.step.pattern:
+                        self.consumer.subscribe(self.step.pattern)
 
             except confluent_kafka.KafkaError._ALL_BROKERS_DOWN, e:
                 log.warning("No brokers available on %s, %s" % (self.bootstrap_servers, e))
                 time.sleep(20)
+                continue
+
             except confluent_kafka.KafkaError, e:
                 # Maybe Kafka has not started yet. Log the exception and try again in a second.
                 log.exception("Exception in kafka loop: %s" % e)
                 time.sleep(1)
+                continue
+
+            # wait until we get a message, if no message, loop again
+            msg = self.consumer.poll(timeout=1.0)
+
+            if msg is None:
+                continue
+
+            if msg.error():
+                if msg.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:
+                    log.debug("Reached end of kafka topic %s, partition: %s, offset: %d" %
+                              (msg.topic(), msg.partition(), msg.offset()))
+                else:
+                    log.exception("Error in kafka message: %s" % msg.error())
+
+            else:
+                # wrap parsing the event in a class
+                event_msg = XOSKafkaMessage(msg)
+
+                log.info("Processing event", event_msg=event_msg, step=self.step.__name__)
+
+                try:
+                    self.step(log=log).process_event(event_msg)
+
+                except:
+                    log.exception("Exception in event step", event_msg=event_msg, step=self.step.__name__)
+
 
 class XOSEventEngine:
     """ XOSEventEngine
diff --git a/xos/synchronizers/new_base/tests/test_event_engine.py b/xos/synchronizers/new_base/tests/test_event_engine.py
index 54d64ee..5919721 100644
--- a/xos/synchronizers/new_base/tests/test_event_engine.py
+++ b/xos/synchronizers/new_base/tests/test_event_engine.py
@@ -12,8 +12,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import confluent_kafka
 import functools
 import unittest
+
 from mock import patch, PropertyMock, ANY
 
 import os, sys
@@ -28,19 +30,57 @@
     else:
         return orig(key)
 
+
 class FakeKafkaConsumer():
-    def __init__(self, values=["sampleevent"]):
+    def __init__(self, values=[]):
         self.values = values
 
     def subscribe(self, topics):
         pass
 
-    def poll(self):
-        for x in self.values:
-            yield x
+    def poll(self, timeout=1.0):
+        if self.values:
+            return FakeKafkaMessage(self.values.pop())
         # block forever
         time.sleep(1000)
 
+
+class FakeKafkaMessage():
+    ''' Works like Message in confluent_kafka
+        https://docs.confluent.io/current/clients/confluent-kafka-python/#message
+    '''
+
+    def __init__(self, timestamp=None, topic='faketopic',
+                 key='fakekey', value='fakevalue', error=False):
+
+        if timestamp is None:
+            self.fake_ts_type = confluent_kafka.TIMESTAMP_NOT_AVAILABLE
+            self.fake_timestamp = None
+        else:
+            self.fake_ts_type = confluent_kafka.TIMESTAMP_CREATE_TIME
+            self.fake_timestamp = timestamp
+
+        self.fake_topic = topic
+        self.fake_key = key
+        self.fake_value = value
+        self.fake_error = error
+
+    def error(self):
+        return self.fake_error
+
+    def timestamp(self):
+        return (self.fake_ts_type, self.fake_timestamp)
+
+    def topic(self):
+        return self.fake_topic
+
+    def key(self):
+        return self.fake_key
+
+    def value(self):
+        return self.fake_value
+
+
 class TestEventEngine(unittest.TestCase):
     def setUp(self):
         global XOSKafkaThread, Config, event_engine_log
@@ -81,7 +121,8 @@
         with patch.object(XOSKafkaThread, "create_kafka_consumer") as create_kafka_consumer, \
              patch.object(FakeKafkaConsumer, "subscribe") as fake_subscribe, \
              patch.object(self.event_engine.event_steps[0], "process_event") as process_event:
-            create_kafka_consumer.return_value = FakeKafkaConsumer()
+
+            create_kafka_consumer.return_value = FakeKafkaConsumer(values=["sampleevent"])
             self.event_engine.start()
 
             self.assertEqual(len(self.event_engine.threads), 1)
@@ -90,10 +131,11 @@
             time.sleep(0.1)
 
             # We should have subscribed to the fake consumer
-            fake_subscribe.assert_called_with(["sometopic"])
+            fake_subscribe.assert_called_once()
 
-            # The fake consumer will have returned one event, and that event will have been passed to our step
-            process_event.assert_called_with("sampleevent")
+            # The fake consumer will have returned one event
+            process_event.assert_called_once()
+
 
     def test_start_with_pattern(self):
         self.event_engine.load_event_step_modules(self.event_steps_dir)
@@ -107,7 +149,7 @@
             pattern.return_value = "somepattern"
             topics.return_value = []
 
-            create_kafka_consumer.return_value = FakeKafkaConsumer()
+            create_kafka_consumer.return_value = FakeKafkaConsumer(values=["sampleevent"])
             self.event_engine.start()
 
             self.assertEqual(len(self.event_engine.threads), 1)
@@ -118,8 +160,8 @@
             # We should have subscribed to the fake consumer
             fake_subscribe.assert_called_with("somepattern")
 
-            # The fake consumer will have returned one event, and that event will have been passed to our step
-            process_event.assert_called_with("sampleevent")
+            # The fake consumer will have returned one event
+            process_event.assert_called_once()
 
 
     def test_start_bad_tech(self):