[SEBA-154]
Add a KafkaLogHandler to ship logs via kafka
Change-Id: I8b1d6cd8600d4b46752c2754198de0c1c66e398c
diff --git a/containers/xos/Dockerfile.libraries b/containers/xos/Dockerfile.libraries
index 23ec024..ef51df4 100644
--- a/containers/xos/Dockerfile.libraries
+++ b/containers/xos/Dockerfile.libraries
@@ -1,4 +1,3 @@
-
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -13,22 +12,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-
FROM xosproject/xos-base:master
# Add libraries
COPY lib /opt/xos/lib
-
COPY VERSION /opt/xos
-# Install the config module
-# and the xosgenx library
-RUN cd /opt/xos/lib/xos-util; \
- python setup.py install; \
- cd /opt/xos/lib/xos-config/; \
- python setup.py install; \
- cd /opt/xos/lib/xos-genx/; \
- python setup.py install
+# Install python modules included with XOS
+RUN cd /opt/xos/lib/xos-util && python setup.py install \
+ && cd /opt/xos/lib/xos-config && python setup.py install \
+ && cd /opt/xos/lib/xos-genx && python setup.py install \
+ && cd /opt/xos/lib/kafkaloghandler && python setup.py install
# Label image
ARG org_label_schema_schema_version=1.0
@@ -45,4 +39,4 @@
org.label-schema.vcs-url=$org_label_schema_vcs_url \
org.label-schema.vcs-ref=$org_label_schema_vcs_ref \
org.label-schema.build-date=$org_label_schema_build_date \
- org.opencord.vcs-commit-date=$org_opencord_vcs_commit_date
\ No newline at end of file
+ org.opencord.vcs-commit-date=$org_opencord_vcs_commit_date
diff --git a/lib/kafkaloghandler/MANIFEST.in b/lib/kafkaloghandler/MANIFEST.in
new file mode 100644
index 0000000..9561fb1
--- /dev/null
+++ b/lib/kafkaloghandler/MANIFEST.in
@@ -0,0 +1 @@
+include README.rst
diff --git a/lib/kafkaloghandler/README.rst b/lib/kafkaloghandler/README.rst
new file mode 100644
index 0000000..8dd8fd0
--- /dev/null
+++ b/lib/kafkaloghandler/README.rst
@@ -0,0 +1,4 @@
+KafkaLogHandler
+===============
+
+Provides a logging handler that sends messages to a Kafka message bus.
diff --git a/lib/kafkaloghandler/kafkaloghandler/__init__.py b/lib/kafkaloghandler/kafkaloghandler/__init__.py
new file mode 100644
index 0000000..4a82628
--- /dev/null
+++ b/lib/kafkaloghandler/kafkaloghandler/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/lib/kafkaloghandler/kafkaloghandler/kafkaloghandler.py b/lib/kafkaloghandler/kafkaloghandler/kafkaloghandler.py
new file mode 100644
index 0000000..d7e5352
--- /dev/null
+++ b/lib/kafkaloghandler/kafkaloghandler/kafkaloghandler.py
@@ -0,0 +1,136 @@
+#!/usr/bin/env python
+
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# kafkaloghandler - logging handler that sends to Kafka
+
+import json
+import kafka
+import logging
+import sys
+import time
+
+
+class KafkaLogHandler(logging.Handler):
+
+ def __init__(self,
+ bootstrap_servers=["localhost:9092"],
+ key="kh", # kafka key
+ topic="kafkaloghandler", # kafka topic
+ timeout=10.0, # kafka connection timeout
+ flatten=3, # maximum depth of dict flattening
+ blacklist=["_logger"], # keys excluded from messages
+ ):
+
+ logging.Handler.__init__(self)
+
+ try:
+ self.producer = kafka.KafkaProducer(
+ bootstrap_servers=bootstrap_servers,
+
+ # Replace unserializable items with repr version.
+ # Otherwise, the entire log message is discarded if
+ # it contains any unserializable fields
+ value_serializer=lambda val: json.dumps(
+ val,
+ separators=(',', ':'),
+ default=lambda o: repr(o),
+ )
+ )
+
+ except kafka.errors.KafkaError, e:
+ print "Kafka Error: %s" % e
+ # die if there's an error
+ sys.exit(1)
+
+ self.topic = topic
+ self.key = key
+ self.flatten = flatten
+ self.blacklist = blacklist
+ self.timeout = timeout
+
+ def _flatten(self, ns, toflatten, maxdepth):
+ """ flatten dicts creating a key_subkey_subsubkey_... hierarchy """
+
+ # avoid recursivly flattening forever
+ if maxdepth < 1:
+ return toflatten
+
+ flattened = {}
+
+ for k, v in toflatten.iteritems():
+
+ prefix = "%s_%s" % (ns, k)
+
+ if isinstance(v, dict):
+ flattened.update(self._flatten(prefix, v, maxdepth-1))
+ else:
+ flattened[prefix] = v
+
+ return flattened
+
+ def emit(self, record):
+
+ recvars = {}
+
+ for k, v in vars(record).iteritems():
+ # skip items in blacklist
+ if k in self.blacklist:
+ continue
+
+ # flatten any sub-dicts down
+ if self.flatten and isinstance(v, dict):
+ recvars.update(self._flatten(k, v, self.flatten))
+ continue
+
+ recvars[k] = v
+
+ self.producer.send(self.topic, key=self.key, value=recvars)
+
+ def flush(self):
+ self.producer.flush(self.timeout)
+
+ def close(self):
+ self.producer.close(self.timeout)
+
+
+if __name__ == '__main__':
+
+ logger = logging.getLogger(__name__)
+ logger.handlers = []
+
+ logger.setLevel(logging.INFO)
+
+ kh = KafkaLogHandler(
+ bootstrap_servers=["test-kafka:9092"],
+ topic="testtopic",
+ )
+
+ logger.addHandler(kh)
+
+ logger.error('Error message')
+
+ extra_data = {
+ "key1": "value1",
+ "key2": "value2",
+ }
+
+ logger.info('Info message with extra data', extra=extra_data)
+
+ index = 0
+ while True:
+ logger.info('Info message - loop count: %s' % index)
+ index += 1
+ time.sleep(10)
diff --git a/lib/kafkaloghandler/multistructlogtest.py b/lib/kafkaloghandler/multistructlogtest.py
new file mode 100644
index 0000000..c65165f
--- /dev/null
+++ b/lib/kafkaloghandler/multistructlogtest.py
@@ -0,0 +1,57 @@
+#!/usr/bin/env python
+
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import multistructlog
+import time
+
+# config of logging
+logconfig = {
+ "version": 1,
+ "handlers": {
+ "console": {
+ "class": "logging.StreamHandler"
+ },
+ "kafka": {
+ "class": "kafkaloghandler.kafkaloghandler.KafkaLogHandler",
+ "bootstrap_servers": ["test-kafka:9092"],
+ "topic": "testtopic"
+ },
+ },
+
+ "loggers": {
+ "multistructlog": {
+ "handlers": ["console", "kafka"],
+ "level": "DEBUG"
+ }
+ },
+}
+
+logger = multistructlog.create_logger(logconfig)
+
+logger.error('Test error message')
+
+extra_data = {
+ "key1": "value1",
+ "key2": "value2",
+}
+
+logger.info('Test info message with extra data', extra=extra_data)
+
+index = 0
+while True:
+ logger.info('Info message - loop count: %s' % index)
+ index += 1
+ time.sleep(10)
diff --git a/lib/kafkaloghandler/setup.py b/lib/kafkaloghandler/setup.py
new file mode 100644
index 0000000..9dbcbb2
--- /dev/null
+++ b/lib/kafkaloghandler/setup.py
@@ -0,0 +1,45 @@
+#!/usr/bin/env python
+
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from xosutil.autoversion_setup import setup_with_auto_version
+from xosutil.version import __version__
+
+
+def readme():
+ with open('README.rst') as f:
+ return f.read()
+
+
+setup_with_auto_version(
+ name='kafkaloghandler',
+ version=__version__,
+ description='Kafka Logging Handler',
+ long_description=readme(),
+ classifiers=[
+ 'Topic :: System :: Logging',
+ 'Topic :: Internet :: Log Analysis',
+ 'License :: OSI Approved :: Apache Software License',
+ ],
+ author='Zack Williams',
+ author_email='zdw@opennetworking.org',
+ packages=['kafkaloghandler'],
+ license='Apache v2',
+ install_requires=[
+ 'kafka>=1.3.5',
+ ],
+ include_package_data=True,
+ zip_safe=False,
+ )