Time-stamp added and renamed heartbeat
Change-Id: I7f38752aa83572bc6b6b93093546e0fdf47763f5
diff --git a/kafka/kafka-consumer.py b/kafka/kafka-consumer.py
index 3f405d8..0b45f6f 100644
--- a/kafka/kafka-consumer.py
+++ b/kafka/kafka-consumer.py
@@ -20,7 +20,7 @@
class ConsumerExample(object):
- def __init__(self, consul_endpoint, topic="heartbeat.voltha", runtime=60):
+ def __init__(self, consul_endpoint, topic="voltha.heartbeat", runtime=60):
self.topic = topic
self.runtime = runtime
self.kafka_endpoint = get_endpoint_from_consul(consul_endpoint,
@@ -105,7 +105,7 @@
parser.add_argument("-t", "--topic",
help="topic to listen from",
- default="heartbeat.voltha")
+ default="voltha.heartbeat")
parser.add_argument("-r", "--runtime",
help="total runtime",
diff --git a/tests/itests/docutests/build_md_test.py b/tests/itests/docutests/build_md_test.py
index d8c888c..f4b43b7 100644
--- a/tests/itests/docutests/build_md_test.py
+++ b/tests/itests/docutests/build_md_test.py
@@ -441,7 +441,7 @@
# Verify kafka client is receiving the messages
print "Verify kafka client is receiving the heartbeat messages ..."
- expected_pattern = ['heartbeat.voltha', 'heartbeat']
+ expected_pattern = ['voltha.heartbeat', 'heartbeat']
cmd = command_defs['kafka_client_run_10_secs']
kafka_client_output = run_long_running_command_with_timeout(cmd,
20)
diff --git a/voltha/main.py b/voltha/main.py
index 2262c94..9653674 100755
--- a/voltha/main.py
+++ b/voltha/main.py
@@ -18,12 +18,14 @@
"""Virtual OLT Hardware Abstraction main entry point"""
import argparse
+import arrow
import os
import time
import yaml
from simplejson import dumps
from twisted.internet.defer import inlineCallbacks
+from twisted.internet.task import LoopingCall
from zope.interface import implementer
from common.event_bus import EventBusClient
@@ -371,7 +373,6 @@
def heartbeat():
self.log.debug(status='up', since=t0s, uptime=time.time() - t0)
- from twisted.internet.task import LoopingCall
lc = LoopingCall(heartbeat)
lc.start(10)
@@ -381,19 +382,21 @@
# For heartbeat we will send a message to a specific "voltha-heartbeat"
# topic. The message is a protocol buf
# message
- message = dumps(dict(
+ message = dict(
type='heartbeat',
voltha_instance=instance_id,
ip=get_my_primary_local_ipv4()
- ))
- topic = "heartbeat.voltha"
+ )
+ topic = "voltha.heartbeat"
- from twisted.internet.task import LoopingCall
+ def send_msg():
+ message['ts'] = arrow.utcnow().timestamp
+ kafka_proxy.send_message(topic, dumps(message))
+
kafka_proxy = get_kafka_proxy()
if kafka_proxy:
- lc = LoopingCall(kafka_proxy.send_message, topic, message)
+ lc = LoopingCall(send_msg)
lc.start(10)
- pass
else:
self.log.error('Kafka proxy has not been created!')
diff --git a/voltha/northbound/diagnostics.py b/voltha/northbound/diagnostics.py
index 55ec07b..1f6e889 100644
--- a/voltha/northbound/diagnostics.py
+++ b/voltha/northbound/diagnostics.py
@@ -23,6 +23,7 @@
import structlog
import resource
+import sys
from simplejson import dumps
from twisted.internet.defer import Deferred
from twisted.internet.task import LoopingCall
@@ -66,7 +67,10 @@
return len(gc.get_referrers(Deferred))
def rss_mb():
- return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss/1024/1024
+ rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss/1024
+ if sys.platform.startswith('darwin'):
+ rss /= 1024
+ return rss
data = dict(
type='slice',