| #!/usr/bin/env python |
| # -*- coding: utf-8 -*- |
| # |
| # Copyright 2017 the original author or authors. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| # This is a very simple implementation of a dashboard creation service that |
| # listens to the Kafka bus on the voltha.kpis topic looking for performance |
| # monitoring metrics for olts. If a new olt appears on the bus the service will |
| # create a dashboard for it for both packet and byte stats creating one row per |
| # port/stream and in each row one panel for packet stats and one for byte |
| # stats. |
| # |
| # TODO: Capture all of the metadata for existing dashboards from Grafana. We're |
| # only capturing the device and device id from the title which is good enough |
| # for now. |
| # TODO: Leverage Grafana to act as a template builder simplifying the |
| # specification of a template without having to resort to a separate API for |
| # the dashd service. The basic premise is a dashboard with any name except |
| # voltha.template is created for any device. Once happy with the dashboard it's |
| # renamed voltha.template and this will automatically trigger the creation of a |
| # new template to use for all dashboards. All existing dashboards are |
| # immediately deleted and new ones are created using the template. The template |
| # is renamed voltha.template.active and can be deleted at this point. This has |
| # been started. |
| |
| # |
| # Metadata format. |
| # The metadata for each device from which relevant metrics are received is |
| # stored in a dash_meta dictionary structure as follows. |
| # |
| # {<device_id1>: { |
| # device:<device_type>, |
| # slug:<grafana_slug>, |
| # timer: <timer_val> |
| # created: <creation_status> |
| # ports: { |
| # <port_id>:[ |
| # <metric1>, |
| # <metric2>, |
| # ..., |
| # <metricN> |
| # ] |
| # } |
| # }, |
| # ... |
| # <device_idN>: { |
| # } |
| # } |
| # |
| |
| from structlog import get_logger |
| from argparse import ArgumentParser |
| |
| from afkak.client import KafkaClient |
| from afkak.common import ( |
| KafkaUnavailableError, |
| OFFSET_LATEST) |
| from afkak.consumer import Consumer |
| from twisted.internet import reactor |
| from twisted.internet.defer import DeferredList, inlineCallbacks |
| from twisted.python.failure import Failure |
| from twisted.internet.task import LoopingCall |
| |
| from common.utils.consulhelpers import get_endpoint_from_consul |
| import requests |
| import json |
| import re |
| import sys |
| import time |
| from dashd.dash_template import DashTemplate |
| |
| log = get_logger() |
| |
| |
class DashDaemon(object):
    """Create Grafana dashboards for OLT devices seen on a Kafka KPI topic.

    The daemon consumes every partition of ``topic``, extracts device and
    port names from the metric prefixes of each message and posts one
    dashboard per previously unseen device to the Grafana HTTP API.

    Per-device state lives in ``self.dash_meta``::

        {<device_id>: {'device': <device_type>,
                       'timer': <seconds left before expiry>,
                       'created': <True once posted to Grafana>,
                       'slug': <grafana slug, set after creation>,
                       'ports': {<port_id>: [<metric>, ...]}}}
    """

    # Metric prefix layout on the bus:
    #   voltha.<device type ending in "olt">.<device id>.<port>
    _KPI_KEY_RE = re.compile(r'voltha\.(.*olt)\.([0-9a-zA-Z]+)\.(.*)')
    # Dashboard titles written by create_dashboards():
    #   <device type>.<device id>
    _DASH_TITLE_RE = re.compile(r'(.*olt)\.([0-9a-zA-Z]+)')

    def __init__(self, consul_endpoint, kafka_endpoint, grafana_url,
                 topic="voltha.heartbeat"):
        """Resolve the Kafka endpoint and create the Kafka client.

        :param consul_endpoint: consul address handed to
            ``get_endpoint_from_consul`` when ``kafka_endpoint`` starts
            with ``@``.
        :param kafka_endpoint: Kafka endpoint, or ``@<name>`` to have
            ``<name>`` looked up through consul.
        :param grafana_url: base URL of the Grafana HTTP API.
        :param topic: Kafka topic to consume.
        :raises ValueError: if ``kafka_endpoint`` is empty or ``None``.
        :raises Exception: whatever the consul lookup raised on its tenth
            consecutive failure.
        """
        if not kafka_endpoint:
            # Previously surfaced as an AttributeError on None.startswith.
            raise ValueError(
                "kafka_endpoint is required (an endpoint or '@<consul name>')")

        self.dash_meta = {}
        self.timer_resolution = 10   # seconds between countdown ticks
        self.timer_duration = 600    # seconds a dashboard stays fresh
        self.topic = topic
        self.dash_template = DashTemplate(grafana_url)
        self.grafana_url = grafana_url
        self.kafka_endpoint = kafka_endpoint
        self.consul_endpoint = consul_endpoint

        if kafka_endpoint.startswith('@'):
            retries = 10
            while True:
                try:
                    self.kafka_endpoint = get_endpoint_from_consul(
                        self.consul_endpoint, kafka_endpoint[1:])
                    break
                except Exception:
                    log.error("unable-to-communicate-with-consul")
                    retries -= 1
                    if retries == 0:
                        # stop() is not usable here: the Kafka client and
                        # consumer lists do not exist yet. Fail loudly.
                        raise
                    time.sleep(10)

        self.on_start_callback = None

        self._client = KafkaClient(self.kafka_endpoint)
        self._consumer_list = []  # List of consumers
        # List of deferred returned from consumers' start() methods
        self._consumer_d_list = []

    def set_on_start_callback(self, on_start_callback):
        """Store ``on_start_callback`` and return ``self`` for chaining.

        This function is currently unused, future requirements.
        """
        self.on_start_callback = on_start_callback
        return self

    def _new_meta(self, device):
        """Return a fresh ``dash_meta`` record for a device of type ``device``."""
        return {
            'timer': self.timer_duration,
            'device': device,
            'created': False,
            'ports': {},
        }

    def _grafana_get(self, path, retries=10, delay=10):
        """GET ``grafana_url + path`` until Grafana answers with 200.

        Connection errors and non-OK status codes both count as a failed
        attempt. Returns the response, or ``None`` once ``retries``
        attempts have failed.

        NOTE: ``requests`` and ``time.sleep`` block the calling thread.
        """
        while True:
            try:
                response = requests.get(self.grafana_url + path)
                if response.status_code == requests.codes.ok:
                    return response
            except requests.exceptions.RequestException as e:
                log.warning("grafana-request-failed", path=path, error=e)
            retries -= 1
            if retries <= 0:
                log.error("unable-to-communicate-with-grafana")
                return None
            time.sleep(delay)

    def _ensure_data_source(self):
        """Create the "Voltha Stats" data source if Grafana lacks it.

        Returns ``False`` when Grafana could not be reached.
        """
        response = self._grafana_get("/datasources")
        if response is None:
            return False
        if not any(ds["name"] == "Voltha Stats" for ds in response.json()):
            response = requests.post(
                self.grafana_url + "/datasources",
                data={"name": "Voltha Stats", "type": "graphite",
                      "access": "proxy", "url": "http://localhost:81"})
            log.info('data-source-added', status=response.status_code,
                     text=response.text)
        return True

    def _load_existing_dashboards(self):
        """Seed ``dash_meta`` from dashboards already present in Grafana.

        If we've crashed there will be dashboards there. Just add them and
        if they're no longer valid their timers will run out; if they are
        valid incoming metrics keep resetting the timers.

        Returns ``False`` when Grafana could not be reached.
        """
        response = self._grafana_get("/search?")
        if response is None:
            return False
        for entry in response.json():
            # Only dashboards titled *olt.<id> are of interest. Others
            # should just be left alone.
            match = self._DASH_TITLE_RE.search(entry['title'])
            if match:
                device, device_id = match.groups()
                # TODO: We should really capture all of the chart data
                # including the rows, panels, and data points being logged.
                # This is good enough for now though to determine if
                # there's already a dashboard for a given device.
                self.dash_meta[device_id] = self._new_meta(device)
        return True

    def _countdown_processor(self):
        """Count every dashboard timer down by ``timer_resolution``.

        Run from a LoopingCall every ``timer_resolution`` seconds. All
        errors are logged and swallowed so the loop keeps running.
        """
        try:
            for dashboard, meta in self.dash_meta.items():
                # Issue a log if the counter decrement is somewhat relevant
                if (meta['timer'] % 100 == 0 and
                        meta['timer'] != self.timer_duration):
                    log.info("counting-down", dashboard=dashboard,
                             timer=meta['timer'])
                meta['timer'] -= self.timer_resolution
                if meta['timer'] <= 0:
                    # Delete the dashboard here
                    log.info("FIXME:-Should-delete-the-dashboard-here",
                             dashboard=dashboard)
        except Exception:
            log.error("error", error=sys.exc_info())

    @inlineCallbacks
    def _template_checker(self):
        """Apply a user supplied template dashboard if exactly one exists.

        Run from a LoopingCall every ``timer_resolution`` seconds (because
        it's convenient). Searches Grafana for "template"; a single hit is
        handed to ``dash_template.apply_template``, several hits are
        reported and ignored. All errors are logged and swallowed.
        """
        try:
            response = requests.get(
                self.grafana_url + "/search?query=template")
            found = response.json()
            if len(found) == 1:
                # Apply the template
                yield self.dash_template.apply_template(found[0])
            elif len(found) != 0:
                # This is an error, log it.
                log.warning("More-than-one-template-provided-ignoring")
        except Exception:
            log.error("error", error=sys.exc_info())

    @inlineCallbacks
    def start(self):
        """Start one consumer per partition, then the periodic tasks.

        Waits for Kafka to report metadata for ``self.topic``, starts a
        consumer on every partition at the latest offset, makes sure the
        Grafana data source exists, loads existing dashboards and finally
        starts the countdown and template-check loops. Calls ``stop()``
        and returns early when Kafka or Grafana cannot be reached.
        """
        partitions = []
        try:
            while not partitions:
                yield self._client.load_metadata_for_topics(self.topic)
                e = self._client.metadata_error_for_topic(self.topic)
                if not e:
                    partitions = self._client.topic_partitions[self.topic]
                    break
                log.warning('no-metadata-for-topic', error=e,
                            topic=self.topic)
                # NOTE: this sleep blocks the reactor thread; no consumers
                # have been started at this point.
                time.sleep(20)
        except KafkaUnavailableError:
            log.error("unable-to-communicate-with-Kafka-brokers")
            self.stop()
            # Shutdown has been requested; skip the rest of the start-up.
            return

        def _note_consumer_stopped(result, consumer):
            log.info('consumer-stopped', consumer=consumer,
                     result=result)

        for partition in partitions:
            c = Consumer(self._client, self.topic, partition,
                         self.msg_processor)
            self._consumer_list.append(c)
            log.info('consumer-started', topic=self.topic,
                     partition=partition)
            d = c.start(OFFSET_LATEST)
            d.addBoth(_note_consumer_stopped, c)
            self._consumer_d_list.append(d)

        try:
            if not (self._ensure_data_source() and
                    self._load_existing_dashboards()):
                # Retries exhausted: shut down instead of retrying forever.
                self.stop()
                return

            # Start the dashboard countdown processor
            log.info("starting-countdown-processor")
            LoopingCall(self._countdown_processor).start(
                self.timer_resolution)

            log.info("starting-template-checker")
            LoopingCall(self._template_checker).start(self.timer_resolution)
        except Exception:
            log.error("error", error=sys.exc_info())

    def stop(self):
        """Stop all consumers, close the Kafka client and stop the reactor."""
        log.info("\n")
        log.info('end-of-execution-stopping-consumers')
        # Ask each of our consumers to stop. When a consumer fully stops, it
        # fires the deferred returned from its start() method. We saved all
        # those deferreds away (above, in start()) in self._consumer_d_list,
        # so now we'll use a DeferredList to wait for all of them...
        for consumer in self._consumer_list:
            consumer.stop()
        dl = DeferredList(self._consumer_d_list)

        # Once the consumers are all stopped, then close our client
        def _stop_client(result):
            if isinstance(result, Failure):
                log.error('error', result=result)
            else:
                log.info('all-consumers-stopped', client=self._client)
            self._client.close()
            return result

        dl.addBoth(_stop_client)

        # And once the client is shutdown, stop the reactor
        def _stop_reactor(result):
            # reactor.stop() raises if the reactor is not running.
            if reactor.running:
                reactor.stop()
            return result

        dl.addBoth(_stop_reactor)

    def check_for_dashboard(self, msg):
        """Reset timers of known devices; return prefixes still to process.

        :param msg: consumed message; ``msg.message.value`` must be a JSON
            document with a ``prefixes`` object keyed by metric prefix.
        :returns: dict mapping prefix -> prefix data for every matching
            prefix except the first one seen per already known device.
            Later prefixes of a known device are returned as well, which
            lets ``msg_processor`` refresh that device's port metrics.
        """
        need_dash = {}
        done = set()
        metrics = json.loads(msg.message.value)['prefixes']
        for key, value in metrics.items():
            match = self._KPI_KEY_RE.search(key)
            if not match:
                continue
            device_id = match.group(2)
            if device_id in self.dash_meta and device_id not in done:
                meta = self.dash_meta[device_id]
                # Issue a log if the reset is somewhat relevant. This must
                # be tested before the timer is overwritten, otherwise the
                # comparison can never be true.
                if meta['timer'] < \
                        self.timer_duration - self.timer_resolution:
                    log.info("reset-timer", device=device_id)
                # Update the delete countdown timer
                meta['timer'] = self.timer_duration
                done.add(device_id)
            else:
                # No dashboard exists (or the timer was already reset).
                need_dash[key] = value
        return need_dash

    def create_dashboards(self, createList):
        """Build and post one Grafana dashboard per device id in createList.

        Each port of the device gets the rows of the template and every
        row gets the template's panels; a panel receives one target per
        port metric the panel defines. ``%port%``, ``%device%`` and
        ``%deviceId%`` in template strings are substituted. A dashboard
        Grafana rejects is logged and skipped so the remaining ones are
        still created.

        :param createList: iterable of device ids present in ``dash_meta``.
        """
        ref_ids = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"  # one refId per target
        for dash in createList:
            meta = self.dash_meta[dash]
            device = meta['device']
            new_dash = json.loads(self.dash_template.dashBoard)
            new_dash['dashboard']['title'] = device + '.' + dash
            # The port is the main grouping attribute
            for port in meta['ports']:
                # Add in the rows for the port specified by the template
                for row in self.dash_template.rows:
                    dash_row = json.loads(self.dash_template.dashRow)
                    # str.replace keeps names literal; as a re.sub
                    # replacement a backslash would be interpreted.
                    dash_row['title'] = row['title'].replace('%port%', port)
                    # Add the panels to the row per the template
                    for panel_id, panel in enumerate(
                            self.dash_template.panels, 1):
                        dash_panel = json.loads(self.dash_template.dashPanel)
                        dash_panel['id'] = panel_id
                        dash_panel['title'] = panel['title'].replace(
                            '%port%', port.upper())
                        data_points = [dp for dp in sorted(meta['ports'][port])
                                       if dp in panel]
                        if len(data_points) > len(ref_ids):
                            log.warning("too-many-targets-for-panel",
                                        device=dash, port=port,
                                        dropped=len(data_points) - len(ref_ids))
                        # Add the targets to the panel
                        for ref_id, dpoint in zip(ref_ids, data_points):
                            target = panel[dpoint].replace('%port%', port)
                            target = target.replace('%device%', device)
                            target = target.replace('%deviceId%', dash)
                            dash_panel['targets'].append(
                                {'refId': ref_id, 'target': target})
                        dash_row['panels'].append(dash_panel)
                    new_dash['dashboard']['rows'].append(dash_row)
            response = requests.post(self.grafana_url + "/dashboards/db",
                                     json=new_dash)
            if response.status_code != requests.codes.ok:
                log.error("dashboard-creation-failed", device=dash,
                          status=response.status_code, text=response.text)
                continue
            meta['slug'] = response.json()['slug']
            meta['created'] = True
            log.info("created-dashboard", slug=meta['slug'])

    def msg_processor(self, consumer, msglist):
        """Consumer callback: record port metrics and create dashboards.

        :param consumer: the consumer delivering the messages (unused).
        :param msglist: iterable of messages accepted by
            ``check_for_dashboard``.

        All errors are logged and swallowed.
        """
        try:
            createList = []
            for msg in msglist:
                # Reset the timer for existing dashboards and get back a
                # dict of prefixes that still need processing, if any.
                need_dash = self.check_for_dashboard(msg)
                # Now populate the meta data for all missing dashboards
                for key, value in need_dash.items():
                    match = self._KPI_KEY_RE.search(key)
                    if not match:
                        continue
                    device, device_id, port = match.groups()
                    if device_id not in self.dash_meta:
                        # Not there, create a meta-data record for the
                        # device; it is created when the first port in the
                        # record is encountered.
                        createList.append(device_id)
                        self.dash_meta[device_id] = self._new_meta(device)
                    # TODO: The keys below are the names of the metrics
                    # that are in the Kafka record. This auto-discovery
                    # is fine if all that's needed are raw metrics. If
                    # metrics are "cooked" by a downstream process and
                    # subsequently fed to graphite/carbon without being
                    # re-posted to Kafka, discovery becomes impossible.
                    # In those cases and in cases where finer grain
                    # control of what's displayed is required, a config
                    # file would be necessary.
                    self.dash_meta[device_id]['ports'][port] = \
                        list(value['metrics'].keys())
            # Now go ahead and create the dashboards using the meta data
            # that was just populated for them.
            if createList:
                self.create_dashboards(createList)
        except Exception:
            log.error("error", error=sys.exc_info())
| |
def parse_options():
    """Parse the command line and return the resulting namespace.

    Recognised options (short, long, default):
      -c/--consul       '10.100.198.220:8500'
      -t/--topic        'voltha.kpis'
      -g/--grafana_url  'http://admin:admin@localhost:8882/api'
      -k/--kafka        None
      -s/--host         None
    """
    # (short flag, long flag, help text, default value)
    option_table = (
        ("-c", "--consul", "consul ip and port", '10.100.198.220:8500'),
        ("-t", "--topic", "topic to listen from", "voltha.kpis"),
        ("-g", "--grafana_url", "graphana api url",
         "http://admin:admin@localhost:8882/api"),
        ("-k", "--kafka", "kafka bus", None),
        ("-s", "--host", "docker host ip", None),
    )

    cli = ArgumentParser("Manage Grafana Dashboards")
    for short_flag, long_flag, description, fallback in option_table:
        cli.add_argument(short_flag, long_flag,
                         help=description, default=fallback)
    return cli.parse_args()
| |
def main():
    """Configure logging, build the daemon and run the Twisted reactor.

    Command line options are read with ``parse_options``; the daemon's
    ``start`` is scheduled to run once the reactor is up. Returns after the
    reactor has stopped.
    """
    # ``logging`` is not among this file's top-level imports, so without
    # this import the basicConfig call below raises NameError.
    import logging

    logging.basicConfig(
        format='%(asctime)s:%(name)s:' +
               '%(levelname)s:%(process)d:%(message)s',
        level=logging.INFO
    )

    args = parse_options()

    dashd = DashDaemon(args.consul, args.kafka, args.grafana_url, args.topic)
    reactor.callWhenRunning(dashd.start)
    reactor.run()
    log.info("completed!")


# Run the daemon only when executed as a script, not when imported.
if __name__ == "__main__":
    main()