[SEBA-232]
Publish XOS internal events on kafka as well as redis
Switch to confluent_kafka, create XOSKafkaProducer wrapper lib
Remove nonfunctional test for connection failure
Change-Id: I4d3057fcc0b5b56022ef3f853dbe0323ef071af7
diff --git a/lib/kafkaloghandler/kafkaloghandler/kafkaloghandler.py b/lib/kafkaloghandler/kafkaloghandler/kafkaloghandler.py
index d7e5352..b5a2a06 100644
--- a/lib/kafkaloghandler/kafkaloghandler/kafkaloghandler.py
+++ b/lib/kafkaloghandler/kafkaloghandler/kafkaloghandler.py
@@ -17,7 +17,7 @@
# kafkaloghandler - logging handler that sends to Kafka
import json
-import kafka
+import confluent_kafka
import logging
import sys
import time
@@ -36,33 +36,30 @@
logging.Handler.__init__(self)
- try:
- self.producer = kafka.KafkaProducer(
- bootstrap_servers=bootstrap_servers,
-
- # Replace unserializable items with repr version.
- # Otherwise, the entire log message is discarded if
- # it contains any unserializable fields
- value_serializer=lambda val: json.dumps(
- val,
- separators=(',', ':'),
- default=lambda o: repr(o),
- )
- )
-
- except kafka.errors.KafkaError, e:
- print "Kafka Error: %s" % e
- # die if there's an error
- sys.exit(1)
-
+ self.bootstrap_servers = bootstrap_servers
self.topic = topic
self.key = key
self.flatten = flatten
self.blacklist = blacklist
self.timeout = timeout
+ self.producer = None
+
+    def _connect(self):
+        """ Create self.producer; print the error and exit(1) on failure """
+        try:
+            producer_config = {
+                'bootstrap.servers': ','.join(self.bootstrap_servers),
+            }
+            self.producer = confluent_kafka.Producer(**producer_config)
+
+        # Producer() raises KafkaException; KafkaError is not an exception
+        except confluent_kafka.KafkaException, e:
+            print "Kafka Error: %s" % e
+            # die if there's an error
+            sys.exit(1)
def _flatten(self, ns, toflatten, maxdepth):
- """ flatten dicts creating a key_subkey_subsubkey_... hierarchy """
+ """ flatten dicts creating a key.subkey.subsubkey... hierarchy """
# avoid recursivly flattening forever
if maxdepth < 1:
@@ -72,7 +69,7 @@
for k, v in toflatten.iteritems():
- prefix = "%s_%s" % (ns, k)
+ prefix = "%s.%s" % (ns, k)
if isinstance(v, dict):
flattened.update(self._flatten(prefix, v, maxdepth-1))
@@ -97,13 +94,30 @@
recvars[k] = v
- self.producer.send(self.topic, key=self.key, value=recvars)
+ # Replace unserializable items with repr version.
+ # Otherwise, the log message may be discarded if it contains any
+ # unserializable fields
+ json_recvars = json.dumps(
+ recvars,
+ separators=(',', ':'),
+ default=lambda o: repr(o),
+ )
+
+ if self.producer is None:
+ self._connect()
+
+ try:
+ self.producer.produce(self.topic, json_recvars, self.key)
+
+ except confluent_kafka.KafkaError, e:
+ print "Kafka Error: %s" % e
+ # currently don't do anything on failure...
+ pass
def flush(self):
- self.producer.flush(self.timeout)
- def close(self):
- self.producer.close(self.timeout)
+ if self.producer:
+ self.producer.flush(self.timeout)
if __name__ == '__main__':
diff --git a/lib/kafkaloghandler/setup.py b/lib/kafkaloghandler/setup.py
index 9dbcbb2..cba89ff 100644
--- a/lib/kafkaloghandler/setup.py
+++ b/lib/kafkaloghandler/setup.py
@@ -38,7 +38,7 @@
packages=['kafkaloghandler'],
license='Apache v2',
install_requires=[
- 'kafka>=1.3.5',
+ 'confluent-kafka>=0.11.5',
],
include_package_data=True,
zip_safe=False,
diff --git a/lib/xos-config/xosconfig/default.py b/lib/xos-config/xosconfig/default.py
index 36767e3..2c4e6d9 100644
--- a/lib/xos-config/xosconfig/default.py
+++ b/lib/xos-config/xosconfig/default.py
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -13,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-
DEFAULT_VALUES = {
'xos_dir': '/opt/xos',
@@ -62,5 +60,6 @@
},
'node_key': '/opt/cord_profile/node_key',
'config_dir': '/etc/xos/sync',
- 'backoff_disabled': True
+ 'backoff_disabled': True,
+ 'kafka_bootstrap_servers': ['cord-kafka:9092'],
}
diff --git a/lib/xos-config/xosconfig/synchronizer-config-schema.yaml b/lib/xos-config/xosconfig/synchronizer-config-schema.yaml
index 37876ad..a9ae095 100644
--- a/lib/xos-config/xosconfig/synchronizer-config-schema.yaml
+++ b/lib/xos-config/xosconfig/synchronizer-config-schema.yaml
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -13,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-
map:
name:
type: str
@@ -78,6 +76,10 @@
kind:
type: str
required: False
+ kafka_bootstrap_servers:
+ type: seq
+ sequence:
+ - type: str
event_bus:
type: map
required: False
diff --git a/lib/xos-config/xosconfig/xos-config-schema.yaml b/lib/xos-config/xosconfig/xos-config-schema.yaml
index d897e76..0705630 100644
--- a/lib/xos-config/xosconfig/xos-config-schema.yaml
+++ b/lib/xos-config/xosconfig/xos-config-schema.yaml
@@ -40,3 +40,7 @@
type: any
xos_dir:
type: str
+ kafka_bootstrap_servers:
+ type: seq
+ sequence:
+ - type: str
diff --git a/lib/xos-kafka/MANIFEST.in b/lib/xos-kafka/MANIFEST.in
new file mode 100644
index 0000000..9561fb1
--- /dev/null
+++ b/lib/xos-kafka/MANIFEST.in
@@ -0,0 +1 @@
+include README.rst
diff --git a/lib/xos-kafka/README.rst b/lib/xos-kafka/README.rst
new file mode 100644
index 0000000..c4e5937
--- /dev/null
+++ b/lib/xos-kafka/README.rst
@@ -0,0 +1,4 @@
+XOSKafka
+========
+
+Wraps the Kafka client library, handling errors and sharing a single producer connection
diff --git a/lib/xos-kafka/setup.py b/lib/xos-kafka/setup.py
new file mode 100644
index 0000000..4486f19
--- /dev/null
+++ b/lib/xos-kafka/setup.py
@@ -0,0 +1,45 @@
+#!/usr/bin/env python
+
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from xosutil.autoversion_setup import setup_with_auto_version
+from xosutil.version import __version__
+
+
+def readme():
+ with open('README.rst') as f:
+ return f.read()
+
+
+setup_with_auto_version(
+ name='xoskafka',
+ version=__version__,
+ description='Wrapper around kafka for XOS',
+ long_description=readme(),
+ classifiers=[
+ 'License :: OSI Approved :: Apache Software License',
+ ],
+ author='Zack Williams',
+ author_email='zdw@opennetworking.org',
+ packages=['xoskafka'],
+ license='Apache v2',
+ install_requires=[
+ 'confluent-kafka>=0.11.5',
+ 'xosconfig>=2.1.0',
+ 'multistructlog>=1.5',
+ ],
+ include_package_data=True,
+ zip_safe=False,
+ )
diff --git a/lib/xos-kafka/xoskafka/__init__.py b/lib/xos-kafka/xoskafka/__init__.py
new file mode 100644
index 0000000..69f5a32
--- /dev/null
+++ b/lib/xos-kafka/xoskafka/__init__.py
@@ -0,0 +1,15 @@
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from .xoskafkaproducer import XOSKafkaProducer
diff --git a/lib/xos-kafka/xoskafka/xoskafkaproducer.py b/lib/xos-kafka/xoskafka/xoskafkaproducer.py
new file mode 100644
index 0000000..6781311
--- /dev/null
+++ b/lib/xos-kafka/xoskafka/xoskafkaproducer.py
@@ -0,0 +1,75 @@
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+""" XOSKafkaProducer """
+
+import confluent_kafka
+
+from xosconfig import Config
+from multistructlog import create_logger
+log = create_logger(Config().get('logging'))
+
+kafka_producer = None
+
+
+class XOSKafkaProducer:
+    """ XOSKafkaProducer
+    Wrapper to share one Kafka Producer connection (module-level global)
+    """
+
+    @staticmethod
+    def init():
+        """ Create the shared producer; raises if already initialized """
+        global kafka_producer
+
+        if kafka_producer:
+            raise Exception('XOSKafkaProducer already initialized')
+
+        servers = Config.get('kafka_bootstrap_servers')
+        log.info('Connecting to Kafka with bootstrap servers: %s' % servers)
+
+        try:
+            # KafkaException is what gets raised; KafkaError is only its payload
+            kafka_producer = confluent_kafka.Producer(
+                **{'bootstrap.servers': ','.join(servers)})
+
+            log.info('Connected to Kafka: %s' % kafka_producer)
+
+        except confluent_kafka.KafkaException, e:
+            log.exception("Kafka Error: %s" % e)
+
+    @classmethod
+    def produce(cls, topic, key, value):
+        """ Queue value on topic; KafkaException is logged, not raised """
+        try:
+            kafka_producer.produce(
+                topic,
+                value,
+                key,
+                callback=cls._kafka_delivery_callback
+            )
+
+            # serve delivery callbacks queued by earlier produce() calls
+            kafka_producer.poll(0)
+
+        except confluent_kafka.KafkaException, err:
+            log.exception("Kafka Error: %s" % err)
+
+    @staticmethod
+    def _kafka_delivery_callback(err, msg):
+        """ Delivery report hook: log the failure or trace the success """
+        if err:
+            log.error('Message failed delivery: %s' % err)
+        else:
+            log.trace('Message delivered', message=msg)