Protobuf type for KPI slice metric type events

Change-Id: I397a2c480089aaf254070ba22695f3e63f9a8a69
diff --git a/shovel/main.py b/shovel/main.py
index 3854601..5d46810 100755
--- a/shovel/main.py
+++ b/shovel/main.py
@@ -131,16 +131,17 @@
 def _convert(msg):
     """Convert a graphite key value string to pickle."""
 
-    def extract_slice(ts, data):
-        for object_path, metrics in data.iteritems():
-            for metric_name, value in metrics.iteritems():
+    def extract_slice(ts, prefixes):
+        for object_path, metrics in prefixes.iteritems():
+            for metric_name, value in metrics['metrics'].iteritems():
                 path = '.'.join((object_path, metric_name))
                 yield (path, ts, value)
 
     assert isinstance(msg, dict)
     type = msg.get('type')
     if type == 'slice':
-        extractor, kw = extract_slice, dict(ts=msg['ts'], data=msg['data'])
+        extractor, kw = extract_slice, dict(ts=msg['ts'],
+                                            prefixes=msg['prefixes'])
     else:
         raise Exception('Unknown format')
 
diff --git a/voltha/northbound/diagnostics.py b/voltha/northbound/diagnostics.py
index 1f6e889..b53c901 100644
--- a/voltha/northbound/diagnostics.py
+++ b/voltha/northbound/diagnostics.py
@@ -18,6 +18,7 @@
 """
 Voltha internal diagnostics
 """
+
 import arrow
 import gc
 import structlog
@@ -30,6 +31,7 @@
 from zope.interface import implementer
 
 from common.event_bus import EventBusClient
+from voltha.protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs
 from voltha.registry import IComponent, registry
 
 log = structlog.get_logger()
@@ -72,15 +74,17 @@
                 rss /= 1024
             return rss
 
-        data = dict(
-            type='slice',
+        kpi_event = KpiEvent(
+            type=KpiEventType.slice,
             ts=ts,
-            data={
-                'voltha.internal.{}'.format(self.instance_id): {
-                    'deferreds': deferreds(),
-                    'rss-mb': rss_mb(),
-                }
+            prefixes={
+                'voltha.internal.{}'.format(self.instance_id):
+                    MetricValuePairs(metrics={
+                        'deferreds': deferreds(),
+                        'rss-mb': rss_mb(),
+                    })
             }
         )
-        self.event_bus.publish('kpis', dumps(data))
+
+        self.event_bus.publish('kpis', kpi_event)
         log.debug('periodic-check', ts=ts)
diff --git a/voltha/northbound/kafka/event_bus_publisher.py b/voltha/northbound/kafka/event_bus_publisher.py
index 092626d..921a1de 100644
--- a/voltha/northbound/kafka/event_bus_publisher.py
+++ b/voltha/northbound/kafka/event_bus_publisher.py
@@ -21,6 +21,9 @@
 bus toward the external world.
 """
 import structlog
+from google.protobuf.json_format import MessageToDict
+from google.protobuf.message import Message
+from simplejson import dumps
 
 from common.event_bus import EventBusClient
 
@@ -67,5 +70,10 @@
                      event_bus_topic=event_bus_topic)
 
     def forward(self, kafka_topic, msg):
+
+        # convert to JSON string if msg is a protobuf msg
+        if isinstance(msg, Message):
+            msg = dumps(MessageToDict(msg, True, True))
+
         self.kafka_proxy.send_message(kafka_topic, msg)
 
diff --git a/voltha/protos/events.proto b/voltha/protos/events.proto
index 628a3f5..5f95b7f 100644
--- a/voltha/protos/events.proto
+++ b/voltha/protos/events.proto
@@ -13,20 +13,27 @@
     }
 }
 
+
 /*
+ * Struct to convey a dictionary of metric->value pairs. Typically used in
+ * pure shared-timestamp or shared-timestamp + shared object prefix situations.
+ */
+message MetricValuePairs {
+
+    // Metric / value pairs.
+    map<string, float> metrics = 1;
+
+}
+
+
 message KpiEvent {
 
-    KpiEventType type = 1;
+    KpiEventType.KpiEventType type = 1;
 
     // Fields used when for slice:
 
     float ts = 2;  // UTC time-stamp of data in slice mode (seconds since epoc)
 
-    message MetricSamples {
-        map<string, float> metric_samples = 1;
-    }
-
-    map<string, MetricSamples> data = 3;
+    map<string, MetricValuePairs> prefixes = 3;
 
 }
-*/