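Reformat Kafka consumer log messages as structured events

Replace printf-style log messages in kafka/kafka-consumer.py with short
hyphenated event names, passing the associated values (error, topic,
consumer, result, client, msg) as keyword arguments instead of
interpolating them into the message text.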

Change-Id: I853d3db2324c6cf96ccd5c7d08fe295299e38688
diff --git a/kafka/kafka-consumer.py b/kafka/kafka-consumer.py
index 47d788e..7ed354d 100644
--- a/kafka/kafka-consumer.py
+++ b/kafka/kafka-consumer.py
@@ -39,16 +39,17 @@
                 yield self._client.load_metadata_for_topics(self.topic)
                 e = self._client.metadata_error_for_topic(self.topic)
                 if e:
-                    log.warning("Error: %r getting metadata for topic: %s",
-                                e, self.topic)
+                    log.warning('no-metadata-for-topic', error=e,
+                                topic=self.topic)
                 else:
                     partitions = self._client.topic_partitions[self.topic]
         except KafkaUnavailableError:
-            log.error("Unable to communicate with any Kafka brokers")
+            log.error("unable-to-communicate-with-Kafka-brokers")
             self.stop()
 
         def _note_consumer_stopped(result, consumer):
-            log.info("Consumer: %r stopped with result: %r", consumer, result)
+            log.info('consumer-stopped', consumer=consumer,
+                     result=result)
 
         for partition in partitions:
             c = Consumer(self._client, self.topic, partition,
@@ -63,7 +64,7 @@
 
     def stop(self):
         log.info("\n")
-        log.info("Time is up, stopping consumers...")
+        log.info('end-of-execution-stopping-consumers')
         # Ask each of our consumers to stop. When a consumer fully stops, it
         # fires the deferred returned from its start() method. We saved all
         # those deferreds away (above, in start()) in self._consumer_d_list,
@@ -75,10 +76,9 @@
         # Once the consumers are all stopped, then close our client
         def _stop_client(result):
             if isinstance(result, Failure):
-                log.error("Error stopping consumers: %r", result)
+                log.error('error', result=result)
             else:
-                log.info("All consumers stopped. Stopping client: %r",
-                         self._client)
+                log.info('all-consumers-stopped', client=self._client)
             self._client.close()
             return result
 
@@ -93,7 +93,7 @@
 
     def msg_processor(self, consumer, msglist):
         for msg in msglist:
-            log.info("proc: msg: %r", msg)
+            log.info('proc', msg=msg)
 
 def parse_options():
     parser = ArgumentParser("Consume kafka messages")
@@ -123,7 +123,7 @@
                                        int(args.runtime))
     reactor.callWhenRunning(consumer_example.start)
     reactor.run()
-    log.info("All Done!")
+    log.info("completed!")
 
 
 if __name__ == "__main__":