[SEBA-232]

Publish XOS internal events on Kafka as well as Redis

Switch to confluent_kafka, create XOSKafkaProducer wrapper lib
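
A minimal usage sketch of the new wrapper, matching its use in
core_main.py and xosbase_header.py below (assumes Config has already
been initialized with 'logging' and 'kafka_bootstrap_servers'; the key
and value shown are illustrative only):

    from xoskafka import XOSKafkaProducer

    XOSKafkaProducer.init()  # creates one shared Producer for the process
    XOSKafkaProducer.produce("xos.gui_events", "SomeModel", '{"pk": 1}')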

Remove nonfunctional test for connection failure

Change-Id: I4d3057fcc0b5b56022ef3f853dbe0323ef071af7
diff --git a/VERSION b/VERSION
index 7c7e096..eca07e4 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.1.1-dev
+2.1.2
diff --git a/containers/chameleon/Dockerfile.chameleon b/containers/chameleon/Dockerfile.chameleon
index 5fc21f3..8cdb226 100644
--- a/containers/chameleon/Dockerfile.chameleon
+++ b/containers/chameleon/Dockerfile.chameleon
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 # xosproject/chameleon
-FROM xosproject/xos-base:2.0.0
+FROM xosproject/xos-base:2.1.2
 
 # xos-base already has protoc and dependencies installed
 
diff --git a/containers/xos/Dockerfile.client b/containers/xos/Dockerfile.client
index 8ae9b17..caac8fa 100644
--- a/containers/xos/Dockerfile.client
+++ b/containers/xos/Dockerfile.client
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 # xosproject/xos-client
-FROM xosproject/xos-libraries:2.1.0
+FROM xosproject/xos-libraries:2.1.2
 
 # Install XOS client
 COPY xos/xos_client /tmp/xos_client
diff --git a/containers/xos/Dockerfile.libraries b/containers/xos/Dockerfile.libraries
index cff084e..7e14091 100644
--- a/containers/xos/Dockerfile.libraries
+++ b/containers/xos/Dockerfile.libraries
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-FROM xosproject/xos-base:2.1.0
+FROM xosproject/xos-base:2.1.2
 
 # Add libraries
 COPY lib /opt/xos/lib
@@ -22,6 +22,7 @@
 RUN cd /opt/xos/lib/xos-util && python setup.py install \
  && cd /opt/xos/lib/xos-config && python setup.py install \
  && cd /opt/xos/lib/xos-genx && python setup.py install \
+ && cd /opt/xos/lib/xos-kafka && python setup.py install \
  && cd /opt/xos/lib/kafkaloghandler && python setup.py install
 
 # Label image
diff --git a/containers/xos/Dockerfile.synchronizer-base b/containers/xos/Dockerfile.synchronizer-base
index 350a8d8..86f58d2 100644
--- a/containers/xos/Dockerfile.synchronizer-base
+++ b/containers/xos/Dockerfile.synchronizer-base
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 # xosproject/xos-synchronizer-base
-FROM xosproject/xos-client:2.1.0
+FROM xosproject/xos-client:2.1.2
 
 COPY xos/synchronizers/new_base /opt/xos/synchronizers/new_base
 COPY xos/xos/logger.py /opt/xos/xos/logger.py
diff --git a/containers/xos/Dockerfile.xos-core b/containers/xos/Dockerfile.xos-core
index 28d07dc..a875d9c 100644
--- a/containers/xos/Dockerfile.xos-core
+++ b/containers/xos/Dockerfile.xos-core
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 # xosproject/xos-core
-FROM xosproject/xos-libraries:2.1.0
+FROM xosproject/xos-libraries:2.1.2
 
 # Install XOS
 ADD xos /opt/xos
diff --git a/containers/xos/pip_requested.txt b/containers/xos/pip_requested.txt
index 6adb9dc..cd6a85b 100644
--- a/containers/xos/pip_requested.txt
+++ b/containers/xos/pip_requested.txt
@@ -6,6 +6,7 @@
 Werkzeug==0.14.1
 ansible==2.5.0
 astunparse==1.5.0
+confluent-kafka==0.11.5
 django-extensions==2.0.6
 django-timezones==0.2
 djangorestframework==3.7.7
diff --git a/containers/xos/pip_requirements.txt b/containers/xos/pip_requirements.txt
index 41f3715..97ed1dd 100644
--- a/containers/xos/pip_requirements.txt
+++ b/containers/xos/pip_requirements.txt
@@ -25,6 +25,7 @@
 cliff==2.11.0
 cmd2==0.8.2
 colorama==0.3.9
+confluent-kafka==0.11.5
 constantly==15.1.0
 contextlib2==0.5.5
 cryptography==2.2.2
diff --git a/lib/kafkaloghandler/kafkaloghandler/kafkaloghandler.py b/lib/kafkaloghandler/kafkaloghandler/kafkaloghandler.py
index d7e5352..b5a2a06 100644
--- a/lib/kafkaloghandler/kafkaloghandler/kafkaloghandler.py
+++ b/lib/kafkaloghandler/kafkaloghandler/kafkaloghandler.py
@@ -17,7 +17,7 @@
 # kafkaloghandler - logging handler that sends to Kafka
 
 import json
-import kafka
+import confluent_kafka
 import logging
 import sys
 import time
@@ -36,33 +36,30 @@
 
         logging.Handler.__init__(self)
 
-        try:
-            self.producer = kafka.KafkaProducer(
-                bootstrap_servers=bootstrap_servers,
-
-                # Replace unserializable items with repr version.
-                # Otherwise, the entire log message is discarded if
-                # it contains any unserializable fields
-                value_serializer=lambda val: json.dumps(
-                    val,
-                    separators=(',', ':'),
-                    default=lambda o: repr(o),
-                    )
-                )
-
-        except kafka.errors.KafkaError, e:
-            print "Kafka Error: %s" % e
-            # die if there's an error
-            sys.exit(1)
-
+        self.bootstrap_servers = bootstrap_servers
         self.topic = topic
         self.key = key
         self.flatten = flatten
         self.blacklist = blacklist
         self.timeout = timeout
+        self.producer = None
+
+    def _connect(self):
+
+        try:
+            producer_config = {
+                'bootstrap.servers': ','.join(self.bootstrap_servers),
+            }
+
+            self.producer = confluent_kafka.Producer(**producer_config)
+
+        except confluent_kafka.KafkaException, e:
+            print "Kafka Error: %s" % e
+            # die if there's an error
+            sys.exit(1)
 
     def _flatten(self, ns, toflatten, maxdepth):
-        """ flatten dicts creating a key_subkey_subsubkey_... hierarchy """
+        """ flatten dicts creating a key.subkey.subsubkey... hierarchy """
 
         # avoid recursively flattening forever
         if maxdepth < 1:
@@ -72,7 +69,7 @@
 
         for k, v in toflatten.iteritems():
 
-            prefix = "%s_%s" % (ns, k)
+            prefix = "%s.%s" % (ns, k)
 
             if isinstance(v, dict):
                 flattened.update(self._flatten(prefix, v, maxdepth-1))
@@ -97,13 +94,30 @@
 
             recvars[k] = v
 
-        self.producer.send(self.topic, key=self.key, value=recvars)
+        # Replace unserializable items with repr version.
+        # Otherwise, the log message may be discarded if it contains any
+        # unserializable fields
+        json_recvars = json.dumps(
+            recvars,
+            separators=(',', ':'),
+            default=lambda o: repr(o),
+            )
+
+        if self.producer is None:
+            self._connect()
+
+        try:
+            self.producer.produce(self.topic, json_recvars, self.key)
+
+        except confluent_kafka.KafkaException, e:
+            print "Kafka Error: %s" % e
+            # currently don't do anything on failure...
+            pass
 
     def flush(self):
-        self.producer.flush(self.timeout)
 
-    def close(self):
-        self.producer.close(self.timeout)
+        if self.producer:
+            self.producer.flush(self.timeout)
 
 
 if __name__ == '__main__':
diff --git a/lib/kafkaloghandler/setup.py b/lib/kafkaloghandler/setup.py
index 9dbcbb2..cba89ff 100644
--- a/lib/kafkaloghandler/setup.py
+++ b/lib/kafkaloghandler/setup.py
@@ -38,7 +38,7 @@
     packages=['kafkaloghandler'],
     license='Apache v2',
     install_requires=[
-        'kafka>=1.3.5',
+        'confluent-kafka>=0.11.5',
         ],
     include_package_data=True,
     zip_safe=False,
diff --git a/lib/xos-config/xosconfig/default.py b/lib/xos-config/xosconfig/default.py
index 36767e3..2c4e6d9 100644
--- a/lib/xos-config/xosconfig/default.py
+++ b/lib/xos-config/xosconfig/default.py
@@ -1,4 +1,3 @@
-
 # Copyright 2017-present Open Networking Foundation
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
@@ -13,7 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
 DEFAULT_VALUES = {
     'xos_dir': '/opt/xos',
 
@@ -62,5 +60,6 @@
     },
     'node_key': '/opt/cord_profile/node_key',
     'config_dir': '/etc/xos/sync',
-    'backoff_disabled': True
+    'backoff_disabled': True,
+    'kafka_bootstrap_servers': ['cord-kafka:9092'],
 }
diff --git a/lib/xos-config/xosconfig/synchronizer-config-schema.yaml b/lib/xos-config/xosconfig/synchronizer-config-schema.yaml
index 37876ad..a9ae095 100644
--- a/lib/xos-config/xosconfig/synchronizer-config-schema.yaml
+++ b/lib/xos-config/xosconfig/synchronizer-config-schema.yaml
@@ -1,4 +1,3 @@
-
 # Copyright 2017-present Open Networking Foundation
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
@@ -13,7 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
 map:
   name:
     type: str
@@ -78,6 +76,10 @@
       kind:
         type: str
         required: False
+  kafka_bootstrap_servers:
+    type: seq
+    sequence:
+      - type: str
   event_bus:
     type: map
     required: False
diff --git a/lib/xos-config/xosconfig/xos-config-schema.yaml b/lib/xos-config/xosconfig/xos-config-schema.yaml
index d897e76..0705630 100644
--- a/lib/xos-config/xosconfig/xos-config-schema.yaml
+++ b/lib/xos-config/xosconfig/xos-config-schema.yaml
@@ -40,3 +40,7 @@
     type: any
   xos_dir:
     type: str
+  kafka_bootstrap_servers:
+    type: seq
+    sequence:
+      - type: str
diff --git a/lib/xos-kafka/MANIFEST.in b/lib/xos-kafka/MANIFEST.in
new file mode 100644
index 0000000..9561fb1
--- /dev/null
+++ b/lib/xos-kafka/MANIFEST.in
@@ -0,0 +1 @@
+include README.rst
diff --git a/lib/xos-kafka/README.rst b/lib/xos-kafka/README.rst
new file mode 100644
index 0000000..c4e5937
--- /dev/null
+++ b/lib/xos-kafka/README.rst
@@ -0,0 +1,4 @@
+XOSKafka
+========
+
+Wraps the Kafka client library, handling errors and sharing a single producer connection
diff --git a/lib/xos-kafka/setup.py b/lib/xos-kafka/setup.py
new file mode 100644
index 0000000..4486f19
--- /dev/null
+++ b/lib/xos-kafka/setup.py
@@ -0,0 +1,45 @@
+#!/usr/bin/env python
+
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from xosutil.autoversion_setup import setup_with_auto_version
+from xosutil.version import __version__
+
+
+def readme():
+    with open('README.rst') as f:
+        return f.read()
+
+
+setup_with_auto_version(
+    name='xoskafka',
+    version=__version__,
+    description='Wrapper around kafka for XOS',
+    long_description=readme(),
+    classifiers=[
+        'License :: OSI Approved :: Apache Software License',
+    ],
+    author='Zack Williams',
+    author_email='zdw@opennetworking.org',
+    packages=['xoskafka'],
+    license='Apache v2',
+    install_requires=[
+        'confluent-kafka>=0.11.5',
+        'xosconfig>=2.1.0',
+        'multistructlog>=1.5',
+        ],
+    include_package_data=True,
+    zip_safe=False,
+    )
diff --git a/lib/xos-kafka/xoskafka/__init__.py b/lib/xos-kafka/xoskafka/__init__.py
new file mode 100644
index 0000000..69f5a32
--- /dev/null
+++ b/lib/xos-kafka/xoskafka/__init__.py
@@ -0,0 +1,15 @@
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from .xoskafkaproducer import XOSKafkaProducer
diff --git a/lib/xos-kafka/xoskafka/xoskafkaproducer.py b/lib/xos-kafka/xoskafka/xoskafkaproducer.py
new file mode 100644
index 0000000..6781311
--- /dev/null
+++ b/lib/xos-kafka/xoskafka/xoskafkaproducer.py
@@ -0,0 +1,75 @@
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+""" XOSKafkaProducer """
+
+import confluent_kafka
+
+from xosconfig import Config
+from multistructlog import create_logger
+log = create_logger(Config().get('logging'))
+
+kafka_producer = None
+
+
+class XOSKafkaProducer:
+    """ XOSKafkaProducer
+        Wrapper to share Kafka Producer connection
+    """
+
+    @staticmethod
+    def init():
+
+        global kafka_producer
+
+        if kafka_producer:
+            raise Exception('XOSKafkaProducer already initialized')
+
+        else:
+            log.info('Connecting to Kafka with bootstrap servers: %s' %
+                     Config.get('kafka_bootstrap_servers'))
+
+            try:
+                producer_config = {
+                    'bootstrap.servers':
+                        ','.join(Config.get('kafka_bootstrap_servers')),
+                }
+
+                kafka_producer = confluent_kafka.Producer(**producer_config)
+
+                log.info('Connected to Kafka: %s' % kafka_producer)
+
+            except confluent_kafka.KafkaException, e:
+                log.exception("Kafka Error: %s" % e)
+
+    @classmethod
+    def produce(cls, topic, key, value):
+
+        try:
+            kafka_producer.produce(
+                topic,
+                value,
+                key,
+                callback=cls._kafka_delivery_callback
+                )
+
+        except confluent_kafka.KafkaException, err:
+            log.exception("Kafka Error: %s" % err)
+
+    @staticmethod
+    def _kafka_delivery_callback(err, msg):
+        if err:
+            log.error('Message failed delivery: %s' % err)
+        else:
+            log.trace('Message delivered', message=msg)
diff --git a/scripts/setup_venv.sh b/scripts/setup_venv.sh
index 99717e1..feab72a 100755
--- a/scripts/setup_venv.sh
+++ b/scripts/setup_venv.sh
@@ -46,11 +46,16 @@
 echo "xos-config Installed"
 popd
 
-pushd "$XOS_DIR//lib/xos-genx"
+pushd "$XOS_DIR/lib/xos-genx"
 python setup.py install
 echo "xos-genx Installed"
 popd
 
+pushd "$XOS_DIR/lib/xos-kafka"
+python setup.py install
+echo "xos-kafka Installed"
+popd
+
 pushd "$XOS_DIR/xos/xos_client"
 make
 echo "xos-client Installed"
diff --git a/scripts/xos_dev_reqs.txt b/scripts/xos_dev_reqs.txt
index 1680477..5f1f9c3 100644
--- a/scripts/xos_dev_reqs.txt
+++ b/scripts/xos_dev_reqs.txt
@@ -4,10 +4,10 @@
 Twisted==16.6.0
 astunparse==1.5.0
 colorama==0.3.9
+confluent-kafka==0.11.5
 grpcio-tools==1.12.0
 grpcio==1.12.0
 ipaddress==1.0.19
-kafka==1.3.5
 multistructlog==1.5
 netaddr==0.7.19
 networkx==1.11
diff --git a/unittest.cfg b/unittest.cfg
index f37d783..65eba41 100644
--- a/unittest.cfg
+++ b/unittest.cfg
@@ -5,3 +5,9 @@
                  xos-config
                  coreapi
 
+[coverage]
+always-on = True
+coverage = xos
+           lib
+coverage-report = term
+coverage-report = xml
diff --git a/xos/core/models/user.py b/xos/core/models/user.py
index 4bd0e0f..63510c7 100644
--- a/xos/core/models/user.py
+++ b/xos/core/models/user.py
@@ -39,9 +39,6 @@
 # currently generate the User models.
 import security
 
-import redis
-from redis import ConnectionError
-
 # Create your models here.
 
 
@@ -309,7 +306,7 @@
 
         super(User, self).save(*args, **kwargs)
 
-        self.push_redis_event()
+        self.push_messagebus_event()
 
         self._initial = self._dict
 
diff --git a/xos/core/models/xosbase.py b/xos/core/models/xosbase.py
index ab02987..8cd1d4f 100644
--- a/xos/core/models/xosbase.py
+++ b/xos/core/models/xosbase.py
@@ -50,7 +50,7 @@
         if (purge):
             pk = self.pk
             super(XOSBase, self).delete(*args, **kwds)
-            self.push_redis_event(deleted=True, pk=pk)
+            self.push_messagebus_event(deleted=True, pk=pk)
         else:
             if (not self.write_protect ):
                 self.deleted = True
@@ -205,7 +205,7 @@
             self.verify_live_keys(update_fields = update_fields)
             super(XOSBase, self).save(*args, **kwargs)
 
-        self.push_redis_event()
+        self.push_messagebus_event()
 
         self._initial = self._dict
 
diff --git a/xos/core/models/xosbase_header.py b/xos/core/models/xosbase_header.py
index 0e3a5bb..da40d16 100644
--- a/xos/core/models/xosbase_header.py
+++ b/xos/core/models/xosbase_header.py
@@ -1,4 +1,3 @@
-
 # Copyright 2017-present Open Networking Foundation
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
@@ -13,7 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
 import datetime
 import time
 import calendar
@@ -39,6 +37,8 @@
 import redis
 from redis import ConnectionError
 
+from xoskafka import XOSKafkaProducer
+
 from xosconfig import Config
 from multistructlog import create_logger
 
@@ -260,8 +260,8 @@
                 return
         raise Exception("Field value %s is not in %s" % (field, str(choices)))
 
-    def serialize_for_redis(self):
-        """ Serialize the object for posting to redis.
+    def serialize_for_messagebus(self):
+        """ Serialize the object for posting to messagebus.
 
             The API serializes ForeignKey fields by naming them <name>_id
             whereas model_to_dict leaves them with the original name. Modify
@@ -282,12 +282,16 @@
 
         return fields
 
+    def push_messagebus_event(self, deleted=False, pk=None):
+        self.push_kafka_event(deleted, pk)
+        self.push_redis_event(deleted, pk)
+
     def push_redis_event(self, deleted=False, pk=None):
         # Transmit update via Redis
         try:
             r = redis.Redis("redis")
 
-            model = self.serialize_for_redis()
+            model = self.serialize_for_messagebus()
             bases = inspect.getmro(self.__class__)
             class_names = ",".join([x.__name__ for x in bases])
 
@@ -308,11 +312,41 @@
 
             payload = json.dumps(json_dict, default=json_handler)
             r.publish(self.__class__.__name__, payload)
+
         except ConnectionError:
             # Redis not running.
             log.error('Connection to Redis failed')
             pass
 
+    def push_kafka_event(self, deleted=False, pk=None):
+        # Transmit update via kafka
+
+        model = self.serialize_for_messagebus()
+        bases = inspect.getmro(self.__class__)
+        class_names = ",".join([x.__name__ for x in bases])
+
+        model['class_names'] = class_names
+
+        if not pk:
+            pk = self.pk
+
+        json_dict = {
+            'pk': pk,
+            'changed_fields': self.changed_fields,
+            'object': model
+        }
+
+        if deleted:
+            json_dict['deleted'] = True
+            json_dict['object']['id'] = pk
+
+        topic = "xos.gui_events"
+        key = self.__class__.__name__
+        json_value = json.dumps(json_dict, default=json_handler)
+
+        XOSKafkaProducer.produce(topic, key, json_value)
+
+
 class AttributeMixin(object):
     # helper for extracting things from a json-encoded
     # service_specific_attribute
@@ -372,4 +406,3 @@
         self.dest=dest
         self.via=via
         self.into=into
-
diff --git a/xos/coreapi/core_main.py b/xos/coreapi/core_main.py
index 2de425a..c4553c1 100644
--- a/xos/coreapi/core_main.py
+++ b/xos/coreapi/core_main.py
@@ -26,6 +26,10 @@
 
 log = create_logger(Config().get('logging'))
 
+# create a single kafka producer connection for the core
+from xoskafka import XOSKafkaProducer
+XOSKafkaProducer.init()
+
 def parse_args():
     parser = argparse.ArgumentParser()
     parser.add_argument("--model_status", dest="model_status", type=int, default=0, help="status of model prep")
diff --git a/xos/synchronizers/new_base/event_engine.py b/xos/synchronizers/new_base/event_engine.py
index 49f8954..04aca29 100644
--- a/xos/synchronizers/new_base/event_engine.py
+++ b/xos/synchronizers/new_base/event_engine.py
@@ -1,4 +1,3 @@
-
 # Copyright 2017-present Open Networking Foundation
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
@@ -13,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import confluent_kafka
 import imp
 import inspect
 import os
@@ -20,15 +20,15 @@
 import time
 from xosconfig import Config
 from multistructlog import create_logger
-from kafka.errors import NoBrokersAvailable
 
 log = create_logger(Config().get('logging'))
 
+
 class XOSKafkaThread(threading.Thread):
-    """ XOSKafkaTrhead
+    """ XOSKafkaThread
 
         A Thread for servicing Kafka events. There is one event_step associated with one XOSKafkaThread. A
-        KafkaConsumer is launched to listen on the topics specified by the thread. The thread's process_event()
+        Consumer is launched to listen on the topics specified by the thread. The thread's process_event()
         function is called for each event.
     """
 
@@ -40,8 +40,11 @@
         self.daemon = True
 
     def create_kafka_consumer(self):
-        from kafka import KafkaConsumer
-        return KafkaConsumer(bootstrap_servers=self.bootstrap_servers)
+        consumer_config = {
+            'bootstrap.servers': ','.join(self.bootstrap_servers),
+        }
+
+        return confluent_kafka.Consumer(**consumer_config)
 
     def run(self):
         if (not self.step.topics) and (not self.step.pattern):
@@ -55,27 +58,28 @@
             try:
                 self.consumer = self.create_kafka_consumer()
                 if self.step.topics:
-                    self.consumer.subscribe(topics=self.step.topics)
+                    self.consumer.subscribe(self.step.topics)
                 elif self.step.pattern:
-                    self.consumer.subscribe(pattern=self.step.pattern)
+                    self.consumer.subscribe(self.step.pattern)
 
                 log.info("Waiting for events",
                          topic=self.step.topics,
                          pattern=self.step.pattern,
                          step=self.step.__name__)
 
-                for msg in self.consumer:
+                for msg in self.consumer.poll():
                     log.info("Processing event", msg=msg, step=self.step.__name__)
                     try:
                         self.step(log=log).process_event(msg)
                     except:
                         log.exception("Exception in event step", msg=msg, step=self.step.__name__)
-            except NoBrokersAvailable:
-                log.warning("No brokers available on %s" % self.bootstrap_servers)
+
+            except confluent_kafka.KafkaException, e:
+                log.warning("No brokers available on %s, %s" % (self.bootstrap_servers, e))
                 time.sleep(20)
-            except:
+            except Exception, e:
                 # Maybe Kafka has not started yet. Log the exception and try again in a second.
-                log.exception("Exception in kafka loop")
+                log.exception("Exception in kafka loop: %s" % e)
                 time.sleep(1)
 
 class XOSEventEngine:
diff --git a/xos/synchronizers/new_base/tests/test_event_engine.py b/xos/synchronizers/new_base/tests/test_event_engine.py
index 4e1c984..54d64ee 100644
--- a/xos/synchronizers/new_base/tests/test_event_engine.py
+++ b/xos/synchronizers/new_base/tests/test_event_engine.py
@@ -15,7 +15,6 @@
 import functools
 import unittest
 from mock import patch, PropertyMock, ANY
-from kafka.errors import NoBrokersAvailable
 
 import os, sys
 import time
@@ -33,21 +32,15 @@
     def __init__(self, values=["sampleevent"]):
         self.values = values
 
-    def subscribe(self, topics=None, pattern=None):
+    def subscribe(self, topics):
         pass
 
-    def __iter__(self):
+    def poll(self):
         for x in self.values:
             yield x
         # block forever
         time.sleep(1000)
 
-class MockKafkaError:
-    NoBrokersAvailable = Exception
-
-class MockKafka:
-    error = MockKafkaError
-
 class TestEventEngine(unittest.TestCase):
     def setUp(self):
         global XOSKafkaThread, Config, event_engine_log
@@ -58,7 +51,7 @@
         sys.path.append(os.path.join(xos_dir, 'synchronizers', 'new_base'))
         sys.path.append(os.path.join(xos_dir, 'synchronizers', 'new_base', 'tests', 'event_steps'))
 
-        config =     os.path.join(test_path, "test_config.yaml")
+        config = os.path.join(test_path, "test_config.yaml")
         from xosconfig import Config
         Config.clear()
         Config.init(config, 'synchronizer-config-schema.yaml')
@@ -97,7 +90,7 @@
             time.sleep(0.1)
 
             # We should have subscribed to the fake consumer
-            fake_subscribe.assert_called_with(topics=["sometopic"])
+            fake_subscribe.assert_called_with(["sometopic"])
 
             # The fake consumer will have returned one event, and that event will have been passed to our step
             process_event.assert_called_with("sampleevent")
@@ -123,20 +116,11 @@
             time.sleep(0.1)
 
             # We should have subscribed to the fake consumer
-            fake_subscribe.assert_called_with(pattern="somepattern")
+            fake_subscribe.assert_called_with("somepattern")
 
             # The fake consumer will have returned one event, and that event will have been passed to our step
             process_event.assert_called_with("sampleevent")
 
-    def _test_start_no_bus(self):
-        self.event_engine.load_event_step_modules(self.event_steps_dir)
-        with patch.object(XOSKafkaThread, "create_kafka_consumer") as create_kafka_consumer, \
-                patch.object(event_engine_log, "warning") as log_warning:
-
-                create_kafka_consumer.side_effect = NoBrokersAvailable()
-                self.event_engine.start()
-
-        log_warning.assert_called()
 
     def test_start_bad_tech(self):
         """ Set an unknown Technology in the event_step. XOSEventEngine.start() should print an error message and