[SEBA-154]

Add a KafkaLogHandler to ship logs via kafka

Change-Id: I8b1d6cd8600d4b46752c2754198de0c1c66e398c
diff --git a/containers/xos/Dockerfile.libraries b/containers/xos/Dockerfile.libraries
index 23ec024..ef51df4 100644
--- a/containers/xos/Dockerfile.libraries
+++ b/containers/xos/Dockerfile.libraries
@@ -1,4 +1,3 @@
-
 # Copyright 2017-present Open Networking Foundation
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
@@ -13,22 +12,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
 FROM xosproject/xos-base:master
 
 # Add libraries
 COPY lib /opt/xos/lib
-
 COPY VERSION /opt/xos
 
-# Install the config module
-# and the xosgenx library
-RUN cd /opt/xos/lib/xos-util; \
-        python setup.py install; \
-        cd /opt/xos/lib/xos-config/; \
-	python setup.py install; \
-	cd /opt/xos/lib/xos-genx/; \
-	python setup.py install
+# Install python modules included with XOS
+RUN cd /opt/xos/lib/xos-util && python setup.py install \
+ && cd /opt/xos/lib/xos-config && python setup.py install \
+ && cd /opt/xos/lib/xos-genx && python setup.py install \
+ && cd /opt/xos/lib/kafkaloghandler && python setup.py install
 
 # Label image
 ARG org_label_schema_schema_version=1.0
@@ -45,4 +39,4 @@
       org.label-schema.vcs-url=$org_label_schema_vcs_url \
       org.label-schema.vcs-ref=$org_label_schema_vcs_ref \
       org.label-schema.build-date=$org_label_schema_build_date \
-      org.opencord.vcs-commit-date=$org_opencord_vcs_commit_date
\ No newline at end of file
+      org.opencord.vcs-commit-date=$org_opencord_vcs_commit_date
diff --git a/lib/kafkaloghandler/MANIFEST.in b/lib/kafkaloghandler/MANIFEST.in
new file mode 100644
index 0000000..9561fb1
--- /dev/null
+++ b/lib/kafkaloghandler/MANIFEST.in
@@ -0,0 +1 @@
+include README.rst
diff --git a/lib/kafkaloghandler/README.rst b/lib/kafkaloghandler/README.rst
new file mode 100644
index 0000000..8dd8fd0
--- /dev/null
+++ b/lib/kafkaloghandler/README.rst
@@ -0,0 +1,4 @@
+KafkaLogHandler
+===============
+
+Provides a logging handler that sends messages to a Kafka message bus.
diff --git a/lib/kafkaloghandler/kafkaloghandler/__init__.py b/lib/kafkaloghandler/kafkaloghandler/__init__.py
new file mode 100644
index 0000000..4a82628
--- /dev/null
+++ b/lib/kafkaloghandler/kafkaloghandler/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/lib/kafkaloghandler/kafkaloghandler/kafkaloghandler.py b/lib/kafkaloghandler/kafkaloghandler/kafkaloghandler.py
new file mode 100644
index 0000000..d7e5352
--- /dev/null
+++ b/lib/kafkaloghandler/kafkaloghandler/kafkaloghandler.py
@@ -0,0 +1,136 @@
+#!/usr/bin/env python
+
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# kafkaloghandler - logging handler that sends to Kafka
+
+import json
+import kafka
+import logging
+import sys
+import time
+
+
+class KafkaLogHandler(logging.Handler):
+
+    def __init__(self,
+                 bootstrap_servers=["localhost:9092"],
+                 key="kh",  # kafka key
+                 topic="kafkaloghandler",  # kafka topic
+                 timeout=10.0,  # kafka connection timeout
+                 flatten=3,  # maximum depth of dict flattening
+                 blacklist=["_logger"],  # keys excluded from messages
+                 ):
+
+        logging.Handler.__init__(self)
+
+        try:
+            self.producer = kafka.KafkaProducer(
+                bootstrap_servers=bootstrap_servers,
+
+                # Replace unserializable items with repr version.
+                # Otherwise, the entire log message is discarded if
+                # it contains any unserializable fields
+                value_serializer=lambda val: json.dumps(
+                    val,
+                    separators=(',', ':'),
+                    default=lambda o: repr(o),
+                    )
+                )
+
+        except kafka.errors.KafkaError, e:
+            print "Kafka Error: %s" % e
+            # die if there's an error
+            sys.exit(1)
+
+        self.topic = topic
+        self.key = key
+        self.flatten = flatten
+        self.blacklist = blacklist
+        self.timeout = timeout
+
+    def _flatten(self, ns, toflatten, maxdepth):
+        """ flatten dicts creating a key_subkey_subsubkey_... hierarchy """
+
+        # avoid recursivly flattening forever
+        if maxdepth < 1:
+            return toflatten
+
+        flattened = {}
+
+        for k, v in toflatten.iteritems():
+
+            prefix = "%s_%s" % (ns, k)
+
+            if isinstance(v, dict):
+                flattened.update(self._flatten(prefix, v, maxdepth-1))
+            else:
+                flattened[prefix] = v
+
+        return flattened
+
+    def emit(self, record):
+
+        recvars = {}
+
+        for k, v in vars(record).iteritems():
+            # skip items in blacklist
+            if k in self.blacklist:
+                continue
+
+            # flatten any sub-dicts down
+            if self.flatten and isinstance(v, dict):
+                recvars.update(self._flatten(k, v, self.flatten))
+                continue
+
+            recvars[k] = v
+
+        self.producer.send(self.topic, key=self.key, value=recvars)
+
+    def flush(self):
+        self.producer.flush(self.timeout)
+
+    def close(self):
+        self.producer.close(self.timeout)
+
+
+if __name__ == '__main__':
+
+    logger = logging.getLogger(__name__)
+    logger.handlers = []
+
+    logger.setLevel(logging.INFO)
+
+    kh = KafkaLogHandler(
+            bootstrap_servers=["test-kafka:9092"],
+            topic="testtopic",
+            )
+
+    logger.addHandler(kh)
+
+    logger.error('Error message')
+
+    extra_data = {
+      "key1": "value1",
+      "key2": "value2",
+    }
+
+    logger.info('Info message with extra data', extra=extra_data)
+
+    index = 0
+    while True:
+        logger.info('Info message - loop count: %s' % index)
+        index += 1
+        time.sleep(10)
diff --git a/lib/kafkaloghandler/multistructlogtest.py b/lib/kafkaloghandler/multistructlogtest.py
new file mode 100644
index 0000000..c65165f
--- /dev/null
+++ b/lib/kafkaloghandler/multistructlogtest.py
@@ -0,0 +1,57 @@
+#!/usr/bin/env python
+
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import multistructlog
+import time
+
+# config of logging
+logconfig = {
+    "version": 1,
+    "handlers": {
+        "console": {
+            "class": "logging.StreamHandler"
+            },
+        "kafka": {
+            "class": "kafkaloghandler.kafkaloghandler.KafkaLogHandler",
+            "bootstrap_servers": ["test-kafka:9092"],
+            "topic": "testtopic"
+            },
+        },
+
+    "loggers": {
+        "multistructlog": {
+            "handlers": ["console", "kafka"],
+            "level": "DEBUG"
+        }
+    },
+}
+
+logger = multistructlog.create_logger(logconfig)
+
+logger.error('Test error message')
+
+extra_data = {
+  "key1": "value1",
+  "key2": "value2",
+}
+
+logger.info('Test info message with extra data', extra=extra_data)
+
+index = 0
+while True:
+    logger.info('Info message - loop count: %s' % index)
+    index += 1
+    time.sleep(10)
diff --git a/lib/kafkaloghandler/setup.py b/lib/kafkaloghandler/setup.py
new file mode 100644
index 0000000..9dbcbb2
--- /dev/null
+++ b/lib/kafkaloghandler/setup.py
@@ -0,0 +1,45 @@
+#!/usr/bin/env python
+
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from xosutil.autoversion_setup import setup_with_auto_version
+from xosutil.version import __version__
+
+
+def readme():
+    with open('README.rst') as f:
+        return f.read()
+
+
+setup_with_auto_version(
+    name='kafkaloghandler',
+    version=__version__,
+    description='Kafka Logging Handler',
+    long_description=readme(),
+    classifiers=[
+        'Topic :: System :: Logging',
+        'Topic :: Internet :: Log Analysis',
+        'License :: OSI Approved :: Apache Software License',
+    ],
+    author='Zack Williams',
+    author_email='zdw@opennetworking.org',
+    packages=['kafkaloghandler'],
+    license='Apache v2',
+    install_requires=[
+        'kafka>=1.3.5',
+        ],
+    include_package_data=True,
+    zip_safe=False,
+    )