Protobuf type for KPI slice metric type events
Change-Id: I397a2c480089aaf254070ba22695f3e63f9a8a69
diff --git a/shovel/main.py b/shovel/main.py
index 3854601..5d46810 100755
--- a/shovel/main.py
+++ b/shovel/main.py
@@ -131,16 +131,17 @@
def _convert(msg):
"""Convert a graphite key value string to pickle."""
- def extract_slice(ts, data):
- for object_path, metrics in data.iteritems():
- for metric_name, value in metrics.iteritems():
+ def extract_slice(ts, prefixes):
+ for object_path, metrics in prefixes.iteritems():
+ for metric_name, value in metrics['metrics'].iteritems():
path = '.'.join((object_path, metric_name))
yield (path, ts, value)
assert isinstance(msg, dict)
type = msg.get('type')
if type == 'slice':
- extractor, kw = extract_slice, dict(ts=msg['ts'], data=msg['data'])
+ extractor, kw = extract_slice, dict(ts=msg['ts'],
+ prefixes=msg['prefixes'])
else:
raise Exception('Unknown format')
diff --git a/voltha/northbound/diagnostics.py b/voltha/northbound/diagnostics.py
index 1f6e889..b53c901 100644
--- a/voltha/northbound/diagnostics.py
+++ b/voltha/northbound/diagnostics.py
@@ -18,6 +18,7 @@
"""
Voltha internal diagnostics
"""
+
import arrow
import gc
import structlog
@@ -30,6 +31,7 @@
from zope.interface import implementer
from common.event_bus import EventBusClient
+from voltha.protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs
from voltha.registry import IComponent, registry
log = structlog.get_logger()
@@ -72,15 +74,17 @@
rss /= 1024
return rss
- data = dict(
- type='slice',
+ kpi_event = KpiEvent(
+ type=KpiEventType.slice,
ts=ts,
- data={
- 'voltha.internal.{}'.format(self.instance_id): {
- 'deferreds': deferreds(),
- 'rss-mb': rss_mb(),
- }
+ prefixes={
+ 'voltha.internal.{}'.format(self.instance_id):
+ MetricValuePairs(metrics={
+ 'deferreds': deferreds(),
+ 'rss-mb': rss_mb(),
+ })
}
)
- self.event_bus.publish('kpis', dumps(data))
+
+ self.event_bus.publish('kpis', kpi_event)
log.debug('periodic-check', ts=ts)
diff --git a/voltha/northbound/kafka/event_bus_publisher.py b/voltha/northbound/kafka/event_bus_publisher.py
index 092626d..921a1de 100644
--- a/voltha/northbound/kafka/event_bus_publisher.py
+++ b/voltha/northbound/kafka/event_bus_publisher.py
@@ -21,6 +21,9 @@
bus toward the external world.
"""
import structlog
+from google.protobuf.json_format import MessageToDict
+from google.protobuf.message import Message
+from simplejson import dumps
from common.event_bus import EventBusClient
@@ -67,5 +70,10 @@
event_bus_topic=event_bus_topic)
def forward(self, kafka_topic, msg):
+
+ # convert to JSON string if msg is a protobuf msg
+ if isinstance(msg, Message):
+ msg = dumps(MessageToDict(msg, True, True))
+
self.kafka_proxy.send_message(kafka_topic, msg)
diff --git a/voltha/protos/events.proto b/voltha/protos/events.proto
index 628a3f5..5f95b7f 100644
--- a/voltha/protos/events.proto
+++ b/voltha/protos/events.proto
@@ -13,20 +13,27 @@
}
}
+
/*
+ * Struct to convey a dictionary of metric->value pairs. Typically used in
+ * pure shared-timestamp or shared-timestamp + shared object prefix situations.
+ */
+message MetricValuePairs {
+
+ // Metric / value pairs.
+ map<string, float> metrics = 1;
+
+}
+
+
message KpiEvent {
- KpiEventType type = 1;
+ KpiEventType.KpiEventType type = 1;
// Fields used when for slice:
float ts = 2; // UTC time-stamp of data in slice mode (seconds since epoc)
- message MetricSamples {
- map<string, float> metric_samples = 1;
- }
-
- map<string, MetricSamples> data = 3;
+ map<string, MetricValuePairs> prefixes = 3;
}
-*/