[SEBA-232]

Publish XOS internal events on kafka as well as redis

Switch to confluent_kafka, create XOSKafkaProducer wrapper lib

Remove nonfunctional test for connection failure

Change-Id: I4d3057fcc0b5b56022ef3f853dbe0323ef071af7
diff --git a/lib/xos-kafka/MANIFEST.in b/lib/xos-kafka/MANIFEST.in
new file mode 100644
index 0000000..9561fb1
--- /dev/null
+++ b/lib/xos-kafka/MANIFEST.in
@@ -0,0 +1 @@
+include README.rst
diff --git a/lib/xos-kafka/README.rst b/lib/xos-kafka/README.rst
new file mode 100644
index 0000000..c4e5937
--- /dev/null
+++ b/lib/xos-kafka/README.rst
@@ -0,0 +1,4 @@
+XOSKafka
+========
+
+Wraps Kafka libraries, handling errors and aggregating connection to producer
diff --git a/lib/xos-kafka/setup.py b/lib/xos-kafka/setup.py
new file mode 100644
index 0000000..4486f19
--- /dev/null
+++ b/lib/xos-kafka/setup.py
@@ -0,0 +1,45 @@
+#!/usr/bin/env python
+
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from xosutil.autoversion_setup import setup_with_auto_version
+from xosutil.version import __version__
+
+
+def readme():
+    with open('README.rst') as f:
+        return f.read()
+
+
+setup_with_auto_version(
+    name='xoskafka',
+    version=__version__,
+    description='Wrapper around kafka for XOS',
+    long_description=readme(),
+    classifiers=[
+        'License :: OSI Approved :: Apache Software License',
+    ],
+    author='Zack Williams',
+    author_email='zdw@opennetworking.org',
+    packages=['xoskafka'],
+    license='Apache v2',
+    install_requires=[
+        'confluent-kafka>=0.11.5',
+        'xosconfig>=2.1.0',
+        'multistructlog>=1.5',
+        ],
+    include_package_data=True,
+    zip_safe=False,
+    )
diff --git a/lib/xos-kafka/xoskafka/__init__.py b/lib/xos-kafka/xoskafka/__init__.py
new file mode 100644
index 0000000..69f5a32
--- /dev/null
+++ b/lib/xos-kafka/xoskafka/__init__.py
@@ -0,0 +1,15 @@
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from .xoskafkaproducer import XOSKafkaProducer
diff --git a/lib/xos-kafka/xoskafka/xoskafkaproducer.py b/lib/xos-kafka/xoskafka/xoskafkaproducer.py
new file mode 100644
index 0000000..6781311
--- /dev/null
+++ b/lib/xos-kafka/xoskafka/xoskafkaproducer.py
@@ -0,0 +1,75 @@
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+""" XOSKafkaProducer """
+
+import confluent_kafka
+
+from xosconfig import Config
+from multistructlog import create_logger
+log = create_logger(Config().get('logging'))
+
+kafka_producer = None
+
+
+class XOSKafkaProducer:
+    """ XOSKafkaProducer
+        Wrapper to share Kafka Producer connection
+    """
+
+    @staticmethod
+    def init():
+
+        global kafka_producer
+
+        if kafka_producer:
+            raise Exception('XOSKafkaProducer already initialized')
+
+        else:
+            log.info('Connecting to Kafka with bootstrap servers: %s' %
+                     Config.get('kafka_bootstrap_servers'))
+
+            try:
+                producer_config = {
+                    'bootstrap.servers':
+                        ','.join(Config.get('kafka_bootstrap_servers')),
+                }
+
+                kafka_producer = confluent_kafka.Producer(**producer_config)
+
+                log.info('Connected to Kafka: %s' % kafka_producer)
+
+            except confluent_kafka.KafkaError, e:
+                log.exception("Kafka Error: %s" % e)
+
+    @classmethod
+    def produce(cls, topic, key, value):
+
+        try:
+            kafka_producer.produce(
+                topic,
+                value,
+                key,
+                callback=cls._kafka_delivery_callback
+                )
+
+        except confluent_kafka.KafkaError, err:
+            log.exception("Kafka Error", err)
+
+    @staticmethod
+    def _kafka_delivery_callback(err, msg):
+        if err:
+            log.error('Message failed delivery: %s' % err)
+        else:
+            log.trace('Message delivered', message=msg)