Sergio Slobodrian | f39aaf8 | 2017-02-28 16:10:16 -0500 | [diff] [blame] | 1 | #!/usr/bin/env python |
| 2 | # -*- coding: utf-8 -*- |
| 3 | # |
| 4 | # Copyright 2017 the original author or authors. |
| 5 | # |
| 6 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 7 | # you may not use this file except in compliance with the License. |
| 8 | # You may obtain a copy of the License at |
| 9 | # |
| 10 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | # |
| 12 | # Unless required by applicable law or agreed to in writing, software |
| 13 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 14 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 15 | # See the License for the specific language governing permissions and |
| 16 | # limitations under the License. |
| 17 | # |
| 18 | |
| 19 | # This is a very simple implementation of a dashboard creation service that |
| 20 | # listens to the Kafka bus on the voltha.kpis topic looking for performance |
| 21 | # monitoring metrics for olts. If a new olt appears on the bus the service will |
| 22 | # create a dashboard for it for both packet and byte stats creating one row per |
| 23 | # port/stream and in each row one panel for packet stats and one for byte |
| 24 | # stats. |
| 25 | # |
| 26 | # TODO: Capture all of the metadata for existing dashboards from Grafana. We're |
| 27 | # only capturing the device and device id from the title which is good enough |
| 28 | # for now. |
| 29 | # TODO: Leverage Grafana to act as a template builder simplifying the |
| 30 | # specification of a template without having to resort to a separate API for |
| 31 | # the dashd service. The basic premise is a dashboard with any name except |
| 32 | # voltha.template is created for any device. Once happy with the dashboard it's |
| 33 | # renamed voltha.template and this will automatically trigger the creation of a |
| 34 | # new template to use for all dashboards. All existing dashboards are |
| 35 | # immediately deleted and new ones are created using the template. The template |
| 36 | # is renamed voltha.template.active and can be deleted at this point. This has |
| 37 | # been started. |
| 38 | |
| 39 | # |
| 40 | # Metadata format. |
| 41 | # The metadata for each device from which relevant metrics are received is |
| 42 | # stored in a dash_meta dictionary structure as follows. |
| 43 | # |
| 44 | # {<device_id1>: { |
| 45 | # device:<device_type>, |
| 46 | # slug:<grafana_slug>, |
| 47 | # timer: <timer_val> |
| 48 | # created: <creation_status> |
| 49 | # ports: { |
| 50 | # <port_id>:[ |
| 51 | # <metric1>, |
| 52 | # <metric2>, |
| 53 | # ..., |
| 54 | # <metricN> |
| 55 | # ] |
| 56 | # } |
| 57 | # }, |
| 58 | # ... |
| 59 | # <device_idN>: { |
| 60 | # } |
| 61 | # } |
| 62 | # |
| 63 | |
| 64 | from structlog import get_logger |
| 65 | from argparse import ArgumentParser |
| 66 | |
| 67 | from afkak.client import KafkaClient |
| 68 | from afkak.common import ( |
| 69 | KafkaUnavailableError, |
| 70 | OFFSET_LATEST) |
| 71 | from afkak.consumer import Consumer |
| 72 | from twisted.internet import reactor |
| 73 | from twisted.internet.defer import DeferredList, inlineCallbacks |
| 74 | from twisted.python.failure import Failure |
| 75 | from twisted.internet.task import LoopingCall |
| 76 | |
| 77 | from common.utils.consulhelpers import get_endpoint_from_consul |
| 78 | import requests |
| 79 | import json |
| 80 | import re |
| 81 | import sys |
Sergio Slobodrian | 61999e5 | 2017-04-03 19:09:11 -0400 | [diff] [blame] | 82 | import time |
Sergio Slobodrian | f39aaf8 | 2017-02-28 16:10:16 -0500 | [diff] [blame] | 83 | from dashd.dash_template import DashTemplate |
| 84 | |
| 85 | log = get_logger() |
| 86 | |
| 87 | |
class DashDaemon(object):
    """Create Grafana dashboards for OLT devices seen on a Kafka topic.

    The daemon consumes KPI messages from Kafka, keeps per-device metadata in
    ``self.dash_meta`` (keyed by device id, each entry holding ``device``,
    ``timer``, ``created``, ``ports`` and, once created, ``slug``) and posts a
    new dashboard to Grafana the first time a device id is seen.
    """

    # voltha.<device type ending in "olt">.<device id>.<port>
    _METRIC_KEY_RE = re.compile(r'voltha\.(.*olt)\.([0-9a-zA-Z]+)\.(.*)')
    # Dashboard titles of the form <device type ending in "olt">.<device id>
    _TITLE_RE = re.compile(r'(.*olt)\.([0-9a-zA-Z]+)')

    def __init__(self, consul_endpoint, grafana_url, topic="voltha.heartbeat"):
        """Look up the Kafka endpoint in Consul and build the Kafka client.

        :param consul_endpoint: Consul address passed to
            ``get_endpoint_from_consul``.
        :param grafana_url: base URL of the Grafana HTTP API.
        :param topic: Kafka topic to consume.
        :raises Exception: whatever ``get_endpoint_from_consul`` raised on the
            last of 10 attempts (10 seconds apart).
        """
        self.dash_meta = {}
        self.timer_resolution = 10
        self.timer_duration = 600
        self.topic = topic
        self.dash_template = DashTemplate(grafana_url)
        self.grafana_url = grafana_url
        self.kafka_endpoint = None
        self.consul_endpoint = consul_endpoint
        self.on_start_callback = None

        # Defined before any fallible work so that stop() is always safe to
        # call on a constructed instance.
        self._client = None
        self._consumer_list = []  # List of consumers
        # List of deferred returned from consumers' start() methods
        self._consumer_d_list = []

        retries = 10
        while True:
            try:
                self.kafka_endpoint = get_endpoint_from_consul(
                    self.consul_endpoint, 'kafka')
                break
            except Exception:
                log.error("unable-to-communicate-with-consul")
                retries -= 1
                if retries == 0:
                    # The reactor is not running while the object is being
                    # constructed, so stop() cannot be used here; surface the
                    # failure to the caller instead of looping forever.
                    raise
            time.sleep(10)

        self._client = KafkaClient(self.kafka_endpoint)

    def set_on_start_callback(self, on_start_callback):
        """Store *on_start_callback* and return ``self`` (currently unused)."""
        self.on_start_callback = on_start_callback
        return self

    @inlineCallbacks
    def start(self):
        """Start one consumer per topic partition and the periodic tasks.

        Waits for topic metadata, starts the consumers, makes sure the
        "Voltha Stats" data source exists in Grafana, seeds ``dash_meta``
        from the dashboards already present there and finally schedules the
        countdown and template checks every ``timer_resolution`` seconds.
        Calls ``stop()`` and returns early when Kafka or Grafana cannot be
        reached.
        """
        # NOTE(review): time.sleep() and the requests calls below block the
        # reactor thread; this is unchanged from the original design.
        partitions = []
        try:
            while not partitions:
                yield self._client.load_metadata_for_topics(self.topic)
                e = self._client.metadata_error_for_topic(self.topic)
                if not e:
                    partitions = self._client.topic_partitions[self.topic]
                    break
                log.warning('no-metadata-for-topic', error=e,
                            topic=self.topic)
                time.sleep(20)
        except KafkaUnavailableError:
            log.error("unable-to-communicate-with-Kafka-brokers")
            self.stop()
            return

        def _note_consumer_stopped(result, consumer):
            log.info('consumer-stopped', consumer=consumer,
                     result=result)

        for partition in partitions:
            c = Consumer(self._client, self.topic, partition,
                         self.msg_processor)
            self._consumer_list.append(c)
            log.info('consumer-started', topic=self.topic, partition=partition)
            d = c.start(OFFSET_LATEST)
            d.addBoth(_note_consumer_stopped, c)
            self._consumer_d_list.append(d)

        # Now read the list of existing dashboards from Grafana and create the
        # dictionary of dashboard timers. If we've crashed there will be
        # dashboards there. Just add them and if they're no longer valid
        # they'll be deleted. If they are valid then they'll persist.
        try:
            if not (self._ensure_data_source() and
                    self._load_existing_dashboards()):
                # Grafana never answered; give up instead of retrying forever.
                self.stop()
                return

            log.info("starting-countdown-processor")
            LoopingCall(self._countdown_processor).start(self.timer_resolution)

            log.info("starting-template-checker")
            LoopingCall(self._template_checker).start(self.timer_resolution)
        except Exception:
            log.error("error", error=sys.exc_info())

    def _grafana_get(self, path, retries=10, delay=10):
        """GET ``grafana_url + path`` until it answers with HTTP 200.

        A non-200 status or a ``requests`` transport error counts as a failed
        attempt. Returns the response, or ``None`` (after logging) once
        *retries* attempts spaced *delay* seconds apart have all failed.
        """
        for attempt in range(retries):
            try:
                r = requests.get(self.grafana_url + path)
                if r.status_code == requests.codes.ok:
                    return r
            except requests.exceptions.RequestException:
                pass  # counted as a failed attempt, retried below
            if attempt + 1 < retries:
                time.sleep(delay)
        log.error("unable-to-communicate-with-grafana")
        return None

    def _ensure_data_source(self):
        """Create the "Voltha Stats" data source in Grafana if it is missing.

        Returns ``False`` when Grafana could not be queried, ``True``
        otherwise.
        """
        r = self._grafana_get("/datasources")
        if r is None:
            return False
        if not any(ds["name"] == "Voltha Stats" for ds in r.json()):
            r = requests.post(self.grafana_url + "/datasources",
                              data={"name": "Voltha Stats",
                                    "type": "graphite",
                                    "access": "proxy",
                                    "url": "http://localhost:81"})
            log.info('data-source-added', status=r.status_code, text=r.text)
        return True

    def _load_existing_dashboards(self):
        """Seed ``dash_meta`` from dashboards already stored in Grafana.

        Only dashboards whose title looks like ``<...olt>.<id>`` are of
        interest; others are left alone. Returns ``False`` when Grafana could
        not be queried, ``True`` otherwise.
        """
        r = self._grafana_get("/search?")
        if r is None:
            return False
        for entry in r.json():
            match = self._TITLE_RE.search(entry['title'])
            if match:
                # TODO: We should really capture all of the chart data
                # including the rows, panels, and data points being logged.
                # This is good enough for now though to determine if
                # there's already a dashboard for a given device.
                self.dash_meta[match.group(2)] = {
                    'timer': self.timer_duration,  # 10 min
                    'device': match.group(1),
                    'created': False,
                    'ports': {},
                }
        return True

    def _countdown_processor(self):
        """Decrement every dashboard timer by ``timer_resolution``.

        Called every ``timer_resolution`` seconds. Expired dashboards are
        only logged for now; deletion is not implemented yet.
        """
        try:
            for dashboard in list(self.dash_meta):
                meta = self.dash_meta[dashboard]
                # Issue a log if the counter decrement is somewhat relevant
                if (meta['timer'] % 100 == 0 and
                        meta['timer'] != self.timer_duration):
                    log.info("counting-down", dashboard=dashboard,
                             timer=meta['timer'])
                meta['timer'] -= self.timer_resolution
                if meta['timer'] <= 0:
                    log.info("FIXME:-Should-delete-the-dashboard-here",
                             dashboard=dashboard)
        except Exception:
            log.error("error", error=sys.exc_info())

    @inlineCallbacks
    def _template_checker(self):
        """Look for a template dashboard in Grafana and apply it.

        Called every ``timer_resolution`` seconds. When the search for
        "template" returns exactly one dashboard it is handed to
        ``self.dash_template.apply_template``; more than one is logged and
        ignored.
        """
        try:
            r = requests.get(self.grafana_url + "/search?query=template")
            db = r.json()
            if len(db) == 1:
                # Apply the template
                yield self.dash_template.apply_template(db[0])
            elif len(db) != 0:
                # This is an error, log it.
                log.warning("More-than-one-template-provided-ignoring")
        except Exception:
            log.error("error", error=sys.exc_info())

    def stop(self):
        """Stop all consumers, close the Kafka client and stop the reactor."""
        log.info("\n")
        log.info('end-of-execution-stopping-consumers')
        # Ask each of our consumers to stop. When a consumer fully stops, it
        # fires the deferred returned from its start() method. We saved all
        # those deferreds away (above, in start()) in self._consumer_d_list,
        # so now we'll use a DeferredList to wait for all of them...
        for consumer in self._consumer_list:
            consumer.stop()
        dl = DeferredList(self._consumer_d_list)

        # Once the consumers are all stopped, then close our client
        def _stop_client(result):
            if isinstance(result, Failure):
                log.error('error', result=result)
            else:
                log.info('all-consumers-stopped', client=self._client)
            self._client.close()
            return result

        dl.addBoth(_stop_client)

        # And once the client is shutdown, stop the reactor
        def _stop_reactor(result):
            reactor.stop()
            return result

        dl.addBoth(_stop_reactor)

    def check_for_dashboard(self, msg):
        """Reset timers for known devices and return the remaining entries.

        :param msg: Kafka message whose ``msg.message.value`` is a JSON
            document with a ``prefixes`` mapping keyed by
            ``voltha.<device>.<device id>.<port>``.
        :returns: dict holding the ``prefixes`` entries that did not trigger
            a timer reset: every entry of a device not yet in ``dash_meta``
            plus, for a known device, every entry after the first one seen
            in this message.
        """
        need_dash = {}
        done = set()
        metrics = json.loads(msg.message.value)['prefixes']
        for key in metrics:
            match = self._METRIC_KEY_RE.search(key)
            if not match:
                continue
            device_id = match.group(2)
            if device_id in self.dash_meta and device_id not in done:
                meta = self.dash_meta[device_id]
                # Issue a log if the reset is somewhat relevant. This has to
                # be tested before the timer is overwritten, otherwise the
                # condition can never hold.
                if meta['timer'] < self.timer_duration - self.timer_resolution:
                    log.info("reset-timer", device=device_id)
                # Update the delete countdown timer
                meta['timer'] = self.timer_duration
                done.add(device_id)
            else:
                need_dash[key] = metrics[key]
        return need_dash

    def create_dashboards(self, createList):
        """Build and post one Grafana dashboard per device id in *createList*.

        For every port of a device, one row is created per template row and
        one panel per template panel; a panel gets a target for each of the
        port's metrics that the template panel defines. On success the
        returned slug is stored in ``dash_meta`` and the entry is marked
        created; a rejected dashboard is logged and skipped.
        """
        # NOTE(review): limits a panel to 16 targets; a 17th raises
        # IndexError (unchanged from the original).
        dataIds = "ABCDEFGHIJKLMNOP"
        for dash in createList:
            meta = self.dash_meta[dash]
            newDash = json.loads(self.dash_template.dashBoard)
            newDash['dashboard']['title'] = meta['device'] + '.' + dash
            # The port is the main grouping attribute
            for port in meta['ports']:
                # Add in the rows for the port specified by the template
                for row in self.dash_template.rows:
                    r = json.loads(self.dash_template.dashRow)
                    # Plain replace() so that port/device ids are inserted
                    # literally rather than parsed as regex replacement
                    # templates.
                    r['title'] = row['title'].replace('%port%', port)
                    # Add the panels to the row per the template
                    # NOTE(review): ids restart at 1 for every row, as in
                    # the original - confirm Grafana accepts repeated ids.
                    panelId = 1
                    for panel in self.dash_template.panels:
                        p = json.loads(self.dash_template.dashPanel)
                        p['id'] = panelId
                        panelId += 1
                        p['title'] = panel['title'].replace('%port%',
                                                            port.upper())
                        dataId = 0
                        # Add the targets to the panel
                        for dpoint in sorted(meta['ports'][port]):
                            if dpoint in panel:
                                target = panel[dpoint].replace('%port%', port)
                                target = target.replace('%device%',
                                                        meta['device'])
                                target = target.replace('%deviceId%', dash)
                                p['targets'].append({
                                    'refId': dataIds[dataId],
                                    'target': target,
                                })
                                dataId += 1
                        r['panels'].append(p)
                    newDash['dashboard']['rows'].append(r)
            resp = requests.post(self.grafana_url + "/dashboards/db",
                                 json=newDash)
            if resp.status_code != requests.codes.ok:
                # Keep going so one rejected dashboard does not prevent the
                # remaining ones from being created.
                log.error("dashboard-creation-failed", device=dash,
                          status=resp.status_code, text=resp.text)
                continue
            meta['slug'] = resp.json()['slug']
            meta['created'] = True
            log.info("created-dashboard", slug=meta['slug'])

    def msg_processor(self, consumer, msglist):
        """Consumer callback: record new devices/ports and create dashboards.

        :param consumer: the consumer delivering the messages (unused).
        :param msglist: iterable of Kafka messages to process.

        Any exception is logged and swallowed so that it does not propagate
        into the consumer.
        """
        try:
            createList = []
            for msg in msglist:
                # Reset the timer for existing dashboards and get back a dict
                # of dashboards to create if any.
                need_dash = self.check_for_dashboard(msg)
                # Now populate the meta data for all missing dashboards
                for key in need_dash:
                    match = self._METRIC_KEY_RE.search(key)
                    if not match:
                        continue
                    device, device_id, port = match.groups()
                    if device_id not in self.dash_meta:
                        # Not there, create a meta-data record for the
                        # device; the port is added below.
                        createList.append(device_id)
                        self.dash_meta[device_id] = {
                            'timer': self.timer_duration,
                            'device': device,
                            'created': False,
                            'ports': {},
                        }
                    # TODO: The keys below are the names of the metrics
                    # that are in the Kafka record. This auto-discovery
                    # is fine if all that's needed are raw metrics. If
                    # metrics are "cooked" by a downstream process and
                    # subsequently fed to graphite/carbon without being
                    # re-posted to Kafka, discovery becomes impossible.
                    # In those cases and in cases where finer grain
                    # control of what's displayed is required, a config
                    # file would be necessary.
                    self.dash_meta[device_id]['ports'][port] = \
                        list(need_dash[key]['metrics'])
            # Now go ahead and create the dashboards using the meta data that
            # was just populated for them.
            if createList:
                self.create_dashboards(createList)
        except Exception:
            log.error("error", error=sys.exc_info())
| 424 | |
def parse_options(argv=None):
    """Parse the command line options of the dashboard daemon.

    :param argv: list of argument strings to parse; ``None`` (the default)
        makes argparse read ``sys.argv[1:]``.
    :returns: the ``argparse.Namespace`` with ``consul``, ``topic``,
        ``grafana_url``, ``kafka`` and ``host`` attributes.
    """
    # The title is a description; passed positionally it would be taken as
    # the program name shown in the usage line.
    parser = ArgumentParser(description="Manage Grafana Dashboards")
    parser.add_argument("-c", "--consul",
                        help="consul ip and port",
                        default='10.100.198.220:8500')

    parser.add_argument("-t", "--topic",
                        help="topic to listen from",
                        default="voltha.kpis")

    parser.add_argument("-g", "--grafana_url",
                        help="graphana api url",
                        default="http://admin:admin@localhost:8882/api")

    parser.add_argument("-k", "--kafka",
                        help="kafka bus",
                        default=None)

    parser.add_argument("-s", "--host",
                        help="docker host ip",
                        default=None)

    return parser.parse_args(argv)
| 448 | |
def main():
    """Configure logging, build the daemon and run the Twisted reactor.

    Blocks in ``reactor.run()`` until the reactor is stopped.
    """
    # Imported here because the module's import block does not bring in
    # `logging`; without it the basicConfig call raises NameError.
    import logging

    logging.basicConfig(
        format='%(asctime)s:%(name)s:' +
               '%(levelname)s:%(process)d:%(message)s',
        level=logging.INFO
    )

    args = parse_options()

    dashd = DashDaemon(args.consul, args.grafana_url, args.topic)
    # Defer start() until the reactor is up so its deferreds can fire.
    reactor.callWhenRunning(dashd.start)
    reactor.run()
    log.info("completed!")
| 462 | |
| 463 | |
# Run the daemon only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()