[SEBA-232]
Publish XOS internal events on kafka as well as redis
Switch to confluent_kafka, create XOSKafkaProducer wrapper lib
Remove nonfunctional test for connection failure
Change-Id: I4d3057fcc0b5b56022ef3f853dbe0323ef071af7
diff --git a/VERSION b/VERSION
index 7c7e096..eca07e4 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.1.1-dev
+2.1.2
diff --git a/containers/chameleon/Dockerfile.chameleon b/containers/chameleon/Dockerfile.chameleon
index 5fc21f3..8cdb226 100644
--- a/containers/chameleon/Dockerfile.chameleon
+++ b/containers/chameleon/Dockerfile.chameleon
@@ -13,7 +13,7 @@
# limitations under the License.
# xosproject/chameleon
-FROM xosproject/xos-base:2.0.0
+FROM xosproject/xos-base:2.1.2
# xos-base already has protoc and dependencies installed
diff --git a/containers/xos/Dockerfile.client b/containers/xos/Dockerfile.client
index 8ae9b17..caac8fa 100644
--- a/containers/xos/Dockerfile.client
+++ b/containers/xos/Dockerfile.client
@@ -13,7 +13,7 @@
# limitations under the License.
# xosproject/xos-client
-FROM xosproject/xos-libraries:2.1.0
+FROM xosproject/xos-libraries:2.1.2
# Install XOS client
COPY xos/xos_client /tmp/xos_client
diff --git a/containers/xos/Dockerfile.libraries b/containers/xos/Dockerfile.libraries
index cff084e..7e14091 100644
--- a/containers/xos/Dockerfile.libraries
+++ b/containers/xos/Dockerfile.libraries
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-FROM xosproject/xos-base:2.1.0
+FROM xosproject/xos-base:2.1.2
# Add libraries
COPY lib /opt/xos/lib
@@ -22,6 +22,7 @@
RUN cd /opt/xos/lib/xos-util && python setup.py install \
&& cd /opt/xos/lib/xos-config && python setup.py install \
&& cd /opt/xos/lib/xos-genx && python setup.py install \
+ && cd /opt/xos/lib/xos-kafka && python setup.py install \
&& cd /opt/xos/lib/kafkaloghandler && python setup.py install
# Label image
diff --git a/containers/xos/Dockerfile.synchronizer-base b/containers/xos/Dockerfile.synchronizer-base
index 350a8d8..86f58d2 100644
--- a/containers/xos/Dockerfile.synchronizer-base
+++ b/containers/xos/Dockerfile.synchronizer-base
@@ -13,7 +13,7 @@
# limitations under the License.
# xosproject/xos-synchronizer-base
-FROM xosproject/xos-client:2.1.0
+FROM xosproject/xos-client:2.1.2
COPY xos/synchronizers/new_base /opt/xos/synchronizers/new_base
COPY xos/xos/logger.py /opt/xos/xos/logger.py
diff --git a/containers/xos/Dockerfile.xos-core b/containers/xos/Dockerfile.xos-core
index 28d07dc..a875d9c 100644
--- a/containers/xos/Dockerfile.xos-core
+++ b/containers/xos/Dockerfile.xos-core
@@ -13,7 +13,7 @@
# limitations under the License.
# xosproject/xos-core
-FROM xosproject/xos-libraries:2.1.0
+FROM xosproject/xos-libraries:2.1.2
# Install XOS
ADD xos /opt/xos
diff --git a/containers/xos/pip_requested.txt b/containers/xos/pip_requested.txt
index 6adb9dc..cd6a85b 100644
--- a/containers/xos/pip_requested.txt
+++ b/containers/xos/pip_requested.txt
@@ -6,6 +6,7 @@
Werkzeug==0.14.1
ansible==2.5.0
astunparse==1.5.0
+confluent-kafka==0.11.5
django-extensions==2.0.6
django-timezones==0.2
djangorestframework==3.7.7
diff --git a/containers/xos/pip_requirements.txt b/containers/xos/pip_requirements.txt
index 41f3715..97ed1dd 100644
--- a/containers/xos/pip_requirements.txt
+++ b/containers/xos/pip_requirements.txt
@@ -25,6 +25,7 @@
cliff==2.11.0
cmd2==0.8.2
colorama==0.3.9
+confluent-kafka==0.11.5
constantly==15.1.0
contextlib2==0.5.5
cryptography==2.2.2
diff --git a/lib/kafkaloghandler/kafkaloghandler/kafkaloghandler.py b/lib/kafkaloghandler/kafkaloghandler/kafkaloghandler.py
index d7e5352..b5a2a06 100644
--- a/lib/kafkaloghandler/kafkaloghandler/kafkaloghandler.py
+++ b/lib/kafkaloghandler/kafkaloghandler/kafkaloghandler.py
@@ -17,7 +17,7 @@
# kafkaloghandler - logging handler that sends to Kafka
import json
-import kafka
+import confluent_kafka
import logging
import sys
import time
@@ -36,33 +36,30 @@
logging.Handler.__init__(self)
- try:
- self.producer = kafka.KafkaProducer(
- bootstrap_servers=bootstrap_servers,
-
- # Replace unserializable items with repr version.
- # Otherwise, the entire log message is discarded if
- # it contains any unserializable fields
- value_serializer=lambda val: json.dumps(
- val,
- separators=(',', ':'),
- default=lambda o: repr(o),
- )
- )
-
- except kafka.errors.KafkaError, e:
- print "Kafka Error: %s" % e
- # die if there's an error
- sys.exit(1)
-
+ self.bootstrap_servers = bootstrap_servers
self.topic = topic
self.key = key
self.flatten = flatten
self.blacklist = blacklist
self.timeout = timeout
+ self.producer = None
+
+ def _connect(self):
+
+ try:
+ producer_config = {
+ 'bootstrap.servers': ','.join(self.bootstrap_servers),
+ }
+
+ self.producer = confluent_kafka.Producer(**producer_config)
+
+ except confluent_kafka.KafkaError, e:
+ print "Kafka Error: %s" % e
+ # die if there's an error
+ sys.exit(1)
def _flatten(self, ns, toflatten, maxdepth):
- """ flatten dicts creating a key_subkey_subsubkey_... hierarchy """
+ """ flatten dicts creating a key.subkey.subsubkey... hierarchy """
# avoid recursivly flattening forever
if maxdepth < 1:
@@ -72,7 +69,7 @@
for k, v in toflatten.iteritems():
- prefix = "%s_%s" % (ns, k)
+ prefix = "%s.%s" % (ns, k)
if isinstance(v, dict):
flattened.update(self._flatten(prefix, v, maxdepth-1))
@@ -97,13 +94,30 @@
recvars[k] = v
- self.producer.send(self.topic, key=self.key, value=recvars)
+ # Replace unserializable items with repr version.
+ # Otherwise, the log message may be discarded if it contains any
+ # unserializable fields
+ json_recvars = json.dumps(
+ recvars,
+ separators=(',', ':'),
+ default=lambda o: repr(o),
+ )
+
+ if self.producer is None:
+ self._connect()
+
+ try:
+ self.producer.produce(self.topic, json_recvars, self.key)
+
+ except confluent_kafka.KafkaError, e:
+ print "Kafka Error: %s" % e
+ # currently don't do anything on failure...
+ pass
def flush(self):
- self.producer.flush(self.timeout)
- def close(self):
- self.producer.close(self.timeout)
+ if self.producer:
+ self.producer.flush(self.timeout)
if __name__ == '__main__':
diff --git a/lib/kafkaloghandler/setup.py b/lib/kafkaloghandler/setup.py
index 9dbcbb2..cba89ff 100644
--- a/lib/kafkaloghandler/setup.py
+++ b/lib/kafkaloghandler/setup.py
@@ -38,7 +38,7 @@
packages=['kafkaloghandler'],
license='Apache v2',
install_requires=[
- 'kafka>=1.3.5',
+ 'confluent-kafka>=0.11.5',
],
include_package_data=True,
zip_safe=False,
diff --git a/lib/xos-config/xosconfig/default.py b/lib/xos-config/xosconfig/default.py
index 36767e3..2c4e6d9 100644
--- a/lib/xos-config/xosconfig/default.py
+++ b/lib/xos-config/xosconfig/default.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -13,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-
DEFAULT_VALUES = {
'xos_dir': '/opt/xos',
@@ -62,5 +60,6 @@
},
'node_key': '/opt/cord_profile/node_key',
'config_dir': '/etc/xos/sync',
- 'backoff_disabled': True
+ 'backoff_disabled': True,
+ 'kafka_bootstrap_servers': ['cord-kafka:9092'],
}
diff --git a/lib/xos-config/xosconfig/synchronizer-config-schema.yaml b/lib/xos-config/xosconfig/synchronizer-config-schema.yaml
index 37876ad..a9ae095 100644
--- a/lib/xos-config/xosconfig/synchronizer-config-schema.yaml
+++ b/lib/xos-config/xosconfig/synchronizer-config-schema.yaml
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -13,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-
map:
name:
type: str
@@ -78,6 +76,10 @@
kind:
type: str
required: False
+ kafka_bootstrap_servers:
+ type: seq
+ sequence:
+ - type: str
event_bus:
type: map
required: False
diff --git a/lib/xos-config/xosconfig/xos-config-schema.yaml b/lib/xos-config/xosconfig/xos-config-schema.yaml
index d897e76..0705630 100644
--- a/lib/xos-config/xosconfig/xos-config-schema.yaml
+++ b/lib/xos-config/xosconfig/xos-config-schema.yaml
@@ -40,3 +40,7 @@
type: any
xos_dir:
type: str
+ kafka_bootstrap_servers:
+ type: seq
+ sequence:
+ - type: str
diff --git a/lib/xos-kafka/MANIFEST.in b/lib/xos-kafka/MANIFEST.in
new file mode 100644
index 0000000..9561fb1
--- /dev/null
+++ b/lib/xos-kafka/MANIFEST.in
@@ -0,0 +1 @@
+include README.rst
diff --git a/lib/xos-kafka/README.rst b/lib/xos-kafka/README.rst
new file mode 100644
index 0000000..c4e5937
--- /dev/null
+++ b/lib/xos-kafka/README.rst
@@ -0,0 +1,4 @@
+XOSKafka
+========
+
+Wraps Kafka libraries, handling errors and aggregating connection to producer
diff --git a/lib/xos-kafka/setup.py b/lib/xos-kafka/setup.py
new file mode 100644
index 0000000..4486f19
--- /dev/null
+++ b/lib/xos-kafka/setup.py
@@ -0,0 +1,45 @@
+#!/usr/bin/env python
+
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from xosutil.autoversion_setup import setup_with_auto_version
+from xosutil.version import __version__
+
+
+def readme():
+ with open('README.rst') as f:
+ return f.read()
+
+
+setup_with_auto_version(
+ name='xoskafka',
+ version=__version__,
+ description='Wrapper around kafka for XOS',
+ long_description=readme(),
+ classifiers=[
+ 'License :: OSI Approved :: Apache Software License',
+ ],
+ author='Zack Williams',
+ author_email='zdw@opennetworking.org',
+ packages=['xoskafka'],
+ license='Apache v2',
+ install_requires=[
+ 'confluent-kafka>=0.11.5',
+ 'xosconfig>=2.1.0',
+ 'multistructlog>=1.5',
+ ],
+ include_package_data=True,
+ zip_safe=False,
+ )
diff --git a/lib/xos-kafka/xoskafka/__init__.py b/lib/xos-kafka/xoskafka/__init__.py
new file mode 100644
index 0000000..69f5a32
--- /dev/null
+++ b/lib/xos-kafka/xoskafka/__init__.py
@@ -0,0 +1,15 @@
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from .xoskafkaproducer import XOSKafkaProducer
diff --git a/lib/xos-kafka/xoskafka/xoskafkaproducer.py b/lib/xos-kafka/xoskafka/xoskafkaproducer.py
new file mode 100644
index 0000000..6781311
--- /dev/null
+++ b/lib/xos-kafka/xoskafka/xoskafkaproducer.py
@@ -0,0 +1,75 @@
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+""" XOSKafkaProducer """
+
+import confluent_kafka
+
+from xosconfig import Config
+from multistructlog import create_logger
+log = create_logger(Config().get('logging'))
+
+kafka_producer = None
+
+
+class XOSKafkaProducer:
+ """ XOSKafkaProducer
+ Wrapper to share Kafka Producer connection
+ """
+
+ @staticmethod
+ def init():
+
+ global kafka_producer
+
+ if kafka_producer:
+ raise Exception('XOSKafkaProducer already initialized')
+
+ else:
+ log.info('Connecting to Kafka with bootstrap servers: %s' %
+ Config.get('kafka_bootstrap_servers'))
+
+ try:
+ producer_config = {
+ 'bootstrap.servers':
+ ','.join(Config.get('kafka_bootstrap_servers')),
+ }
+
+ kafka_producer = confluent_kafka.Producer(**producer_config)
+
+ log.info('Connected to Kafka: %s' % kafka_producer)
+
+ except confluent_kafka.KafkaError, e:
+ log.exception("Kafka Error: %s" % e)
+
+ @classmethod
+ def produce(cls, topic, key, value):
+
+ try:
+ kafka_producer.produce(
+ topic,
+ value,
+ key,
+ callback=cls._kafka_delivery_callback
+ )
+
+ except confluent_kafka.KafkaError, err:
+ log.exception("Kafka Error", err)
+
+ @staticmethod
+ def _kafka_delivery_callback(err, msg):
+ if err:
+ log.error('Message failed delivery: %s' % err)
+ else:
+ log.trace('Message delivered', message=msg)
diff --git a/scripts/setup_venv.sh b/scripts/setup_venv.sh
index 99717e1..feab72a 100755
--- a/scripts/setup_venv.sh
+++ b/scripts/setup_venv.sh
@@ -46,11 +46,16 @@
echo "xos-config Installed"
popd
-pushd "$XOS_DIR//lib/xos-genx"
+pushd "$XOS_DIR/lib/xos-genx"
python setup.py install
echo "xos-genx Installed"
popd
+pushd "$XOS_DIR/lib/xos-kafka"
+python setup.py install
+echo "xos-kafka Installed"
+popd
+
pushd "$XOS_DIR/xos/xos_client"
make
echo "xos-client Installed"
diff --git a/scripts/xos_dev_reqs.txt b/scripts/xos_dev_reqs.txt
index 1680477..5f1f9c3 100644
--- a/scripts/xos_dev_reqs.txt
+++ b/scripts/xos_dev_reqs.txt
@@ -4,10 +4,10 @@
Twisted==16.6.0
astunparse==1.5.0
colorama==0.3.9
+confluent-kafka==0.11.5
grpcio-tools==1.12.0
grpcio==1.12.0
ipaddress==1.0.19
-kafka==1.3.5
multistructlog==1.5
netaddr==0.7.19
networkx==1.11
diff --git a/unittest.cfg b/unittest.cfg
index f37d783..65eba41 100644
--- a/unittest.cfg
+++ b/unittest.cfg
@@ -5,3 +5,9 @@
xos-config
coreapi
+[coverage]
+always-on = True
+coverage = xos
+ lib
+coverage-report = term
+coverage-report = xml
diff --git a/xos/core/models/user.py b/xos/core/models/user.py
index 4bd0e0f..63510c7 100644
--- a/xos/core/models/user.py
+++ b/xos/core/models/user.py
@@ -39,9 +39,6 @@
# currently generate the User models.
import security
-import redis
-from redis import ConnectionError
-
# Create your models here.
@@ -309,7 +306,7 @@
super(User, self).save(*args, **kwargs)
- self.push_redis_event()
+ self.push_messagebus_event()
self._initial = self._dict
diff --git a/xos/core/models/xosbase.py b/xos/core/models/xosbase.py
index ab02987..8cd1d4f 100644
--- a/xos/core/models/xosbase.py
+++ b/xos/core/models/xosbase.py
@@ -50,7 +50,7 @@
if (purge):
pk = self.pk
super(XOSBase, self).delete(*args, **kwds)
- self.push_redis_event(deleted=True, pk=pk)
+ self.push_messagebus_event(deleted=True, pk=pk)
else:
if (not self.write_protect ):
self.deleted = True
@@ -205,7 +205,7 @@
self.verify_live_keys(update_fields = update_fields)
super(XOSBase, self).save(*args, **kwargs)
- self.push_redis_event()
+ self.push_messagebus_event()
self._initial = self._dict
diff --git a/xos/core/models/xosbase_header.py b/xos/core/models/xosbase_header.py
index 0e3a5bb..da40d16 100644
--- a/xos/core/models/xosbase_header.py
+++ b/xos/core/models/xosbase_header.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -13,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-
import datetime
import time
import calendar
@@ -39,6 +37,8 @@
import redis
from redis import ConnectionError
+from xoskafka import XOSKafkaProducer
+
from xosconfig import Config
from multistructlog import create_logger
@@ -260,8 +260,8 @@
return
raise Exception("Field value %s is not in %s" % (field, str(choices)))
- def serialize_for_redis(self):
- """ Serialize the object for posting to redis.
+ def serialize_for_messagebus(self):
+ """ Serialize the object for posting to messagebus.
The API serializes ForeignKey fields by naming them <name>_id
whereas model_to_dict leaves them with the original name. Modify
@@ -282,12 +282,16 @@
return fields
+ def push_messagebus_event(self, deleted=False, pk=None):
+ self.push_kafka_event(deleted, pk)
+ self.push_redis_event(deleted, pk)
+
def push_redis_event(self, deleted=False, pk=None):
# Transmit update via Redis
try:
r = redis.Redis("redis")
- model = self.serialize_for_redis()
+ model = self.serialize_for_messagebus()
bases = inspect.getmro(self.__class__)
class_names = ",".join([x.__name__ for x in bases])
@@ -308,11 +312,41 @@
payload = json.dumps(json_dict, default=json_handler)
r.publish(self.__class__.__name__, payload)
+
except ConnectionError:
# Redis not running.
log.error('Connection to Redis failed')
pass
+ def push_kafka_event(self, deleted=False, pk=None):
+ # Transmit update via kafka
+
+ model = self.serialize_for_messagebus()
+ bases = inspect.getmro(self.__class__)
+ class_names = ",".join([x.__name__ for x in bases])
+
+ model['class_names'] = class_names
+
+ if not pk:
+ pk = self.pk
+
+ json_dict = {
+ 'pk': pk,
+ 'changed_fields': self.changed_fields,
+ 'object': model
+ }
+
+ if deleted:
+ json_dict['deleted'] = True
+ json_dict['object']['id'] = pk
+
+ topic = "xos.gui_events"
+ key = self.__class__.__name__
+ json_value = json.dumps(json_dict, default=json_handler)
+
+ XOSKafkaProducer.produce(topic, key, json_value)
+
+
class AttributeMixin(object):
# helper for extracting things from a json-encoded
# service_specific_attribute
@@ -372,4 +406,3 @@
self.dest=dest
self.via=via
self.into=into
-
diff --git a/xos/coreapi/core_main.py b/xos/coreapi/core_main.py
index 2de425a..c4553c1 100644
--- a/xos/coreapi/core_main.py
+++ b/xos/coreapi/core_main.py
@@ -26,6 +26,10 @@
log = create_logger(Config().get('logging'))
+# create an single kafka producer connection for the core
+from xoskafka import XOSKafkaProducer
+XOSKafkaProducer.init()
+
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--model_status", dest="model_status", type=int, default=0, help="status of model prep")
diff --git a/xos/synchronizers/new_base/event_engine.py b/xos/synchronizers/new_base/event_engine.py
index 49f8954..04aca29 100644
--- a/xos/synchronizers/new_base/event_engine.py
+++ b/xos/synchronizers/new_base/event_engine.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -13,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import confluent_kafka
import imp
import inspect
import os
@@ -20,15 +20,15 @@
import time
from xosconfig import Config
from multistructlog import create_logger
-from kafka.errors import NoBrokersAvailable
log = create_logger(Config().get('logging'))
+
class XOSKafkaThread(threading.Thread):
- """ XOSKafkaTrhead
+ """ XOSKafkaThread
A Thread for servicing Kafka events. There is one event_step associated with one XOSKafkaThread. A
- KafkaConsumer is launched to listen on the topics specified by the thread. The thread's process_event()
+ Consumer is launched to listen on the topics specified by the thread. The thread's process_event()
function is called for each event.
"""
@@ -40,8 +40,11 @@
self.daemon = True
def create_kafka_consumer(self):
- from kafka import KafkaConsumer
- return KafkaConsumer(bootstrap_servers=self.bootstrap_servers)
+ consumer_config = {
+ 'bootstrap.servers': ','.join(self.bootstrap_servers),
+ }
+
+ return confluent_kafka.Consumer(**consumer_config)
def run(self):
if (not self.step.topics) and (not self.step.pattern):
@@ -55,27 +58,28 @@
try:
self.consumer = self.create_kafka_consumer()
if self.step.topics:
- self.consumer.subscribe(topics=self.step.topics)
+ self.consumer.subscribe(self.step.topics)
elif self.step.pattern:
- self.consumer.subscribe(pattern=self.step.pattern)
+ self.consumer.subscribe(self.step.pattern)
log.info("Waiting for events",
topic=self.step.topics,
pattern=self.step.pattern,
step=self.step.__name__)
- for msg in self.consumer:
+ for msg in self.consumer.poll():
log.info("Processing event", msg=msg, step=self.step.__name__)
try:
self.step(log=log).process_event(msg)
except:
log.exception("Exception in event step", msg=msg, step=self.step.__name__)
- except NoBrokersAvailable:
- log.warning("No brokers available on %s" % self.bootstrap_servers)
+
+ except confluent_kafka.KafkaError._ALL_BROKERS_DOWN, e:
+ log.warning("No brokers available on %s, %s" % (self.bootstrap_servers, e))
time.sleep(20)
- except:
+ except confluent_kafka.KafkaError, e:
# Maybe Kafka has not started yet. Log the exception and try again in a second.
- log.exception("Exception in kafka loop")
+ log.exception("Exception in kafka loop: %s" % e)
time.sleep(1)
class XOSEventEngine:
diff --git a/xos/synchronizers/new_base/tests/test_event_engine.py b/xos/synchronizers/new_base/tests/test_event_engine.py
index 4e1c984..54d64ee 100644
--- a/xos/synchronizers/new_base/tests/test_event_engine.py
+++ b/xos/synchronizers/new_base/tests/test_event_engine.py
@@ -15,7 +15,6 @@
import functools
import unittest
from mock import patch, PropertyMock, ANY
-from kafka.errors import NoBrokersAvailable
import os, sys
import time
@@ -33,21 +32,15 @@
def __init__(self, values=["sampleevent"]):
self.values = values
- def subscribe(self, topics=None, pattern=None):
+ def subscribe(self, topics):
pass
- def __iter__(self):
+ def poll(self):
for x in self.values:
yield x
# block forever
time.sleep(1000)
-class MockKafkaError:
- NoBrokersAvailable = Exception
-
-class MockKafka:
- error = MockKafkaError
-
class TestEventEngine(unittest.TestCase):
def setUp(self):
global XOSKafkaThread, Config, event_engine_log
@@ -58,7 +51,7 @@
sys.path.append(os.path.join(xos_dir, 'synchronizers', 'new_base'))
sys.path.append(os.path.join(xos_dir, 'synchronizers', 'new_base', 'tests', 'event_steps'))
- config = os.path.join(test_path, "test_config.yaml")
+ config = os.path.join(test_path, "test_config.yaml")
from xosconfig import Config
Config.clear()
Config.init(config, 'synchronizer-config-schema.yaml')
@@ -97,7 +90,7 @@
time.sleep(0.1)
# We should have subscribed to the fake consumer
- fake_subscribe.assert_called_with(topics=["sometopic"])
+ fake_subscribe.assert_called_with(["sometopic"])
# The fake consumer will have returned one event, and that event will have been passed to our step
process_event.assert_called_with("sampleevent")
@@ -123,20 +116,11 @@
time.sleep(0.1)
# We should have subscribed to the fake consumer
- fake_subscribe.assert_called_with(pattern="somepattern")
+ fake_subscribe.assert_called_with("somepattern")
# The fake consumer will have returned one event, and that event will have been passed to our step
process_event.assert_called_with("sampleevent")
- def _test_start_no_bus(self):
- self.event_engine.load_event_step_modules(self.event_steps_dir)
- with patch.object(XOSKafkaThread, "create_kafka_consumer") as create_kafka_consumer, \
- patch.object(event_engine_log, "warning") as log_warning:
-
- create_kafka_consumer.side_effect = NoBrokersAvailable()
- self.event_engine.start()
-
- log_warning.assert_called()
def test_start_bad_tech(self):
""" Set an unknown Technology in the event_step. XOSEventEngine.start() should print an error message and