Reformat kafka and helper logs to structured key/value events

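Convert format-string log calls in the consul/docker helpers, the kafka
consumer example and KafkaProxy to structured event logging: a short,
hyphenated event name plus key=value context instead of a message with
interpolated values. Error paths use log.exception() so the traceback is
recorded together with the structured fields.

A minimal sketch of the target pattern, assuming the structlog-style
logger these modules already obtain; the get_logger() import and the
helper names below are illustrative placeholders, not code from this
tree:

    from structlog import get_logger

    log = get_logger()

    def lookup_service(consul_endpoint, service_name):
        # before: context interpolated into the message string
        # log.debug('Retrieving endpoint {} from consul {}'.format(
        #     service_name, consul_endpoint))

        # after: fixed, greppable event name; context as key/value pairs
        log.debug('getting-service-endpoint', consul=consul_endpoint,
                  service=service_name)

    def inspect_container(container_id):
        try:
            raise RuntimeError('inspect failed')  # stand-in for the docker call
        except Exception as e:
            # log.exception() keeps the traceback alongside the event fields
            log.exception('failed-inspect-container', id=container_id, e=e)
            raise

Hyphenated event names keep log lines grep-able and stable, while the
variable data lives in separate fields that downstream log consumers can
index.
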
Change-Id: I853d3db2324c6cf96ccd5c7d08fe295299e38688
diff --git a/common/utils/consulhelpers.py b/common/utils/consulhelpers.py
index 1d2b88f..681aaf4 100644
--- a/common/utils/consulhelpers.py
+++ b/common/utils/consulhelpers.py
@@ -28,8 +28,9 @@
 def get_endpoint_from_consul(consul_endpoint, service_name):
     """Look up, from consul, the service name specified by service-name
     """
-    log.debug('Retrieving endpoint {} from consul {}'.format(service_name,
-                                                             consul_endpoint))
+    log.debug('getting-service-endpoint', consul=consul_endpoint,
+              service=service_name)
+
     host = consul_endpoint.split(':')[0].strip()
     port = int(consul_endpoint.split(':')[1].strip())
 
diff --git a/common/utils/dockerhelpers.py b/common/utils/dockerhelpers.py
index 9379e2d..89acab6 100644
--- a/common/utils/dockerhelpers.py
+++ b/common/utils/dockerhelpers.py
@@ -80,7 +80,7 @@
         docker_cli = Client(base_url=docker_socket)
         info = docker_cli.inspect_container(id)
     except Exception, e:
-        log.debug('failed: {}'.format(e.message))
+        log.exception('failed-inspect-container', id=id, e=e)
         raise
 
     return info
@@ -94,7 +94,7 @@
         host_config = docker_cli.create_host_config(binds=binds,
                                                     port_bindings=port_bindings)
     except Exception, e:
-        log.exception('failed host config creation', volumes, ports, e=e)
+        log.exception('failed-host-config-creation', volumes=volumes, ports=ports, e=e)
         raise
 
     return host_config
@@ -103,8 +103,9 @@
     try:
         docker_cli = Client(base_url=docker_socket)
         docker_cli.connect_container_to_network(container, net_id, links=links)
-    except:
-        log.exception('Failed to connect container {} to network {}'.format(container, net_id))
+    except Exception, e:
+        log.exception('failed-connection-to-network',
+                      container=container, net_id=net_id, e=e)
         raise
 
 def create_networking_config(name, links):
@@ -120,7 +121,7 @@
             name : docker_cli.create_endpoint_config(links=links)
         })
     except Exception, e:
-        log.exception('failed network creation', name, e=e)
+        log.exception('failed-network-creation', name=name, e=e)
         raise
 
     return networking_config
@@ -165,7 +166,7 @@
     def __init__(self, threads=1):
         self.client = CustomClient(base_url=docker_socket)
         self.events = self.client.events(decode=True)
-        log.info("Starting event processor with {} threads".format(threads))
+        log.info("starting", threads=threads)
         self.exec_service = ThreadPoolExecutor(max_workers=threads)
 
     def stop_listening(self):
@@ -200,7 +201,7 @@
             for k in ['time', 'Time']:
                 if k in event:
                     event[k] = datetime.fromtimestamp(event[k])
-            log.debug('docker event: {}'.format(event))
+            log.debug('docker-event', event=event)
 
             data = {}
             i = get_id(event)
@@ -212,14 +213,14 @@
                         data = self.client.inspect_image(i)
                     data[i] = data
                 except errors.NotFound:
-                    log.debug('No data for container {}'.format(i))
+                    log.debug('no-data-for-container', container=i)
 
             status = get_status(event)
             if status in handlers:
                 self.exec_service.submit(handlers[get_status(event)], event,
                                          data, handlers['podder_config'])
             else:
-                log.debug("No handler for {}; skipping...".format(status))
+                log.debug('no-handler-for-status', status=status)
 
 class CustomGenerator(object):
     """
diff --git a/kafka/kafka-consumer.py b/kafka/kafka-consumer.py
index 47d788e..7ed354d 100644
--- a/kafka/kafka-consumer.py
+++ b/kafka/kafka-consumer.py
@@ -39,16 +39,17 @@
                 yield self._client.load_metadata_for_topics(self.topic)
                 e = self._client.metadata_error_for_topic(self.topic)
                 if e:
-                    log.warning("Error: %r getting metadata for topic: %s",
-                                e, self.topic)
+                    log.warning('no-metadata-for-topic', error=e,
+                                topic=self.topic)
                 else:
                     partitions = self._client.topic_partitions[self.topic]
         except KafkaUnavailableError:
-            log.error("Unable to communicate with any Kafka brokers")
+            log.error("unable-to-communicate-with-Kafka-brokers")
             self.stop()
 
         def _note_consumer_stopped(result, consumer):
-            log.info("Consumer: %r stopped with result: %r", consumer, result)
+            log.info('consumer-stopped', consumer=consumer,
+                     result=result)
 
         for partition in partitions:
             c = Consumer(self._client, self.topic, partition,
@@ -63,7 +64,7 @@
 
     def stop(self):
         log.info("\n")
-        log.info("Time is up, stopping consumers...")
+        log.info('end-of-execution-stopping-consumers')
         # Ask each of our consumers to stop. When a consumer fully stops, it
         # fires the deferred returned from its start() method. We saved all
         # those deferreds away (above, in start()) in self._consumer_d_list,
@@ -75,10 +76,9 @@
         # Once the consumers are all stopped, then close our client
         def _stop_client(result):
             if isinstance(result, Failure):
-                log.error("Error stopping consumers: %r", result)
+                log.error('error-stopping-consumers', result=result)
             else:
-                log.info("All consumers stopped. Stopping client: %r",
-                         self._client)
+                log.info('all-consumers-stopped', client=self._client)
             self._client.close()
             return result
 
@@ -93,7 +93,7 @@
 
     def msg_processor(self, consumer, msglist):
         for msg in msglist:
-            log.info("proc: msg: %r", msg)
+            log.info('proc', msg=msg)
 
 def parse_options():
     parser = ArgumentParser("Consume kafka messages")
@@ -123,7 +123,7 @@
                                        int(args.runtime))
     reactor.callWhenRunning(consumer_example.start)
     reactor.run()
-    log.info("All Done!")
+    log.info("completed!")
 
 
 if __name__ == "__main__":
diff --git a/voltha/northbound/kafka/kafka_proxy.py b/voltha/northbound/kafka/kafka_proxy.py
index 5b9e161..4ee3e66 100644
--- a/voltha/northbound/kafka/kafka_proxy.py
+++ b/voltha/northbound/kafka/kafka_proxy.py
@@ -48,9 +48,7 @@
         if KafkaProxy._kafka_instance:
             raise Exception('Singleton exist for :{}'.format(KafkaProxy))
 
-        log.info('KafkaProxy init with kafka endpoint:{}'.format(
-            kafka_endpoint))
-
+        log.debug('initializing', endpoint=kafka_endpoint)
         self.ack_timeout = ack_timeout
         self.max_req_attempts = max_req_attempts
         self.consul_endpoint = consul_endpoint
@@ -59,6 +57,7 @@
         self.kclient = None
         self.kproducer = None
         self.event_bus_publisher = None
+        log.debug('initialized', endpoint=kafka_endpoint)
 
     @inlineCallbacks
     def start(self):
@@ -83,12 +82,11 @@
             try:
                 _k_endpoint = get_endpoint_from_consul(self.consul_endpoint,
                                                        self.kafka_endpoint[1:])
-                log.info(
-                    'Found kafka service at {}'.format(_k_endpoint))
+                log.debug('found-kafka-service', endpoint=_k_endpoint)
 
             except Exception as e:
-                log.error('Failure to locate a kafka service from '
-                               'consul {}:'.format(repr(e)))
+                log.exception('no-kafka-service-in-consul', e=e)
+
                 self.kproducer = None
                 self.kclient = None
                 return