#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# This is a very simple implementation of a dashboard creation service that
# listens to the Kafka bus on the voltha.kpis topic looking for performance
# monitoring metrics for olts. If a new olt appears on the bus the service will
# create a dashboard for it for both packet and byte stats, creating one row per
# port/stream and, in each row, one panel for packet stats and one for byte
# stats.
#
# TODO: Capture all of the metadata for existing dashboards from Grafana. We're
# only capturing the device and device id from the title, which is good enough
# for now.
# TODO: Leverage Grafana to act as a template builder, simplifying the
# specification of a template without having to resort to a separate API for
# the dashd service. The basic premise is that a dashboard with any name except
# voltha.template is created for any device. Once happy with the dashboard it's
# renamed voltha.template and this automatically triggers the creation of a
# new template to use for all dashboards. All existing dashboards are
# immediately deleted and new ones are created using the template. The template
# is renamed voltha.template.active and can be deleted at this point. This has
# been started.

#
# Metadata format.
# The metadata for each device from which relevant metrics are received is
# stored in a dash_meta dictionary structure as follows.
#
# {<device_id1>: {
#     device: <device_type>,
#     slug: <grafana_slug>,
#     timer: <timer_val>,
#     created: <creation_status>,
#     ports: {
#         <port_id>: [
#             <metric1>,
#             <metric2>,
#             ...,
#             <metricN>
#         ]
#     }
#  },
#  ...
#  <device_idN>: {
#  }
# }
#

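# As an illustration only (all values below are hypothetical), a populated
# dash_meta entry for a single simulated olt with one port reporting two
# metrics might look like:
#
# {'0001941bd45e71d8': {
#     'device': 'simulated_olt',
#     'slug': 'simulated-olt-0001941bd45e71d8',
#     'timer': 600,
#     'created': True,
#     'ports': {
#         'nni': ['rx_bytes', 'tx_bytes']
#     }
# }}
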
from structlog import get_logger
from argparse import ArgumentParser

from afkak.client import KafkaClient
from afkak.common import (
    KafkaUnavailableError,
    OFFSET_LATEST)
from afkak.consumer import Consumer
from twisted.internet import reactor
from twisted.internet.defer import DeferredList, inlineCallbacks
from twisted.python.failure import Failure
from twisted.internet.task import LoopingCall

from common.utils.consulhelpers import get_endpoint_from_consul
import requests
import json
import re
import sys
import time
import logging  # needed by main() for logging.basicConfig
from dashd.dash_template import DashTemplate

log = get_logger()


class DashDaemon(object):
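    """
    Listen on the Kafka bus for VOLTHA KPI messages and create a Grafana
    dashboard for each olt device seen. A per-device countdown timer ages out
    dashboards for devices that stop reporting (actual deletion is still a
    FIXME below).
    """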
    def __init__(self, consul_endpoint, kafka_endpoint, grafana_url, topic="voltha.heartbeat"):
        #logging.basicConfig(
        # format='%(asctime)s:%(name)s:' +
        # '%(levelname)s:%(process)d:%(message)s',
        # level=logging.INFO
        #)
        self.dash_meta = {}
        self.timer_resolution = 10
        self.timer_duration = 600
        self.topic = topic
        self.dash_template = DashTemplate(grafana_url)
        self.grafana_url = grafana_url
        self.kafka_endpoint = kafka_endpoint
        self.consul_endpoint = consul_endpoint

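        # A kafka endpoint of the form '@<service-name>' is resolved through
        # Consul to a concrete host:port. Retry for a while so a slow-starting
        # Consul doesn't immediately kill the daemon.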
        if kafka_endpoint.startswith('@'):
            retrys = 10
            while True:
                try:
                    self.kafka_endpoint = get_endpoint_from_consul(
                        self.consul_endpoint, kafka_endpoint[1:])
                    break
                except:
                    log.error("unable-to-communicate-with-consul")
                    self.stop()
                retrys -= 1
                if retrys == 0:
                    log.error("unable-to-communicate-with-consul")
                    self.stop()
                time.sleep(10)

        self.on_start_callback = None

        self._client = KafkaClient(self.kafka_endpoint)
        self._consumer_list = []  # List of consumers
        # List of deferred returned from consumers' start() methods
        self._consumer_d_list = []

    def set_on_start_callback(self, on_start_callback):
        # This function is currently unused; it's kept for future requirements.
        self.on_start_callback = on_start_callback
        return self

    @inlineCallbacks
    def start(self):
        partitions = []
        try:
            while not partitions:
                yield self._client.load_metadata_for_topics(self.topic)
                #self._client.load_metadata_for_topics(self.topic)
                e = self._client.metadata_error_for_topic(self.topic)
                if e:
                    log.warning('no-metadata-for-topic', error=e,
                                topic=self.topic)
                else:
                    partitions = self._client.topic_partitions[self.topic]
                    break
                time.sleep(20)
        except KafkaUnavailableError:
            log.error("unable-to-communicate-with-Kafka-brokers")
            self.stop()

        def _note_consumer_stopped(result, consumer):
            log.info('consumer-stopped', consumer=consumer,
                     result=result)

        for partition in partitions:
            c = Consumer(self._client, self.topic, partition,
                         self.msg_processor)
            self._consumer_list.append(c)
            log.info('consumer-started', topic=self.topic, partition=partition)
            d = c.start(OFFSET_LATEST)
            d.addBoth(_note_consumer_stopped, c)
            self._consumer_d_list.append(d)

        # Now read the list of existing dashboards from Grafana and create the
        # dictionary of dashboard timers. If we've crashed there will be
        # dashboards there. Just add them and if they're no longer valid
        # they'll be deleted. If they are valid then they'll persist.
        #print("Starting main loop")
        try:
            retrys = 10
            while True:
                r = requests.get(self.grafana_url + "/datasources")
                if r.status_code == requests.codes.ok:
                    break
                else:
                    retrys -= 1
                    if retrys == 0:
                        log.error("unable-to-communicate-with-grafana")
                        self.stop()
                    time.sleep(10)
            j = r.json()
            data_source = False
            for i in j:
                if i["name"] == "Voltha Stats":
                    data_source = True
                    break
            if not data_source:
                r = requests.post(self.grafana_url + "/datasources",
                                  data={"name": "Voltha Stats",
                                        "type": "graphite",
                                        "access": "proxy",
                                        "url": "http://localhost:81"})
                log.info('data-source-added', status=r.status_code, text=r.text)

            retrys = 10
            while True:
                r = requests.get(self.grafana_url + "/search?")
                if r.status_code == requests.codes.ok:
                    break
                else:
                    retrys -= 1
                    if retrys == 0:
                        log.error("unable-to-communicate-with-grafana")
                        self.stop()
                    time.sleep(10)
            j = r.json()
            for i in j:
                # Look for dashboards that have a title of *olt.[[:hexdigit:]].
                # These will be the ones of interest. Others should just be left
                # alone.
                #print(i['title'])
                match = re.search(r'(.*olt)\.([0-9a-zA-Z]+)', i['title'])
                if match and match.lastindex > 0:
                    #print(match.group(1), match.group(2))
                    self.dash_meta[match.group(2)] = {}
                    self.dash_meta[match.group(2)]['timer'] = self.timer_duration  # 10 min
                    self.dash_meta[match.group(2)]['device'] = match.group(1)
                    self.dash_meta[match.group(2)]['created'] = False
                    self.dash_meta[match.group(2)]['ports'] = {}
                    # TODO: We should really capture all of the chart data
                    # including the rows, panels, and data points being logged.
                    # This is good enough for now though to determine if
                    # there's already a dashboard for a given device.


            def countdown_processor():
                # Called every X (timer_resolution) seconds to count down each of the
                # dash timers. If a timer reaches 0 the corresponding
                # dashboard is removed.
                #log.info("Counting down.")
                try:
                    for dashboard in self.dash_meta.keys():
                        #print("Counting down %s." %dashboard)
                        # Issue a log if the counter decrement is somewhat relevant
                        if (self.dash_meta[dashboard]['timer'] % 100 == 0 and
                                self.dash_meta[dashboard]['timer'] != self.timer_duration):
                            log.info("counting-down", dashboard=dashboard,
                                     timer=self.dash_meta[dashboard]['timer'])
                        self.dash_meta[dashboard]['timer'] -= self.timer_resolution
                        if self.dash_meta[dashboard]['timer'] <= 0:
                            # Delete the dashboard here
                            log.info("FIXME:-Should-delete-the-dashboard-here",
                                     dashboard=dashboard)
                except:
                    e = sys.exc_info()
                    log.error("error", error=e)

            # Start the dashboard countdown processor
            log.info("starting-countdown-processor")
            lc = LoopingCall(countdown_processor)
            lc.start(self.timer_resolution)

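            # Periodically check for a user-supplied template dashboard in
            # Grafana (see the module header TODO); when exactly one is found
            # it replaces the built-in template used for new dashboards.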
            @inlineCallbacks
            def template_checker():
                try:
                    # Called every so often (timer_resolution seconds because it's
                    # convenient) to check if a template dashboard has been defined
                    # in Grafana. If it has been, replace the built in template
                    # with the one provided.
                    r = requests.get(self.grafana_url + "/search?query=template")
                    db = r.json()
                    if len(db) == 1:
                        # Apply the template
                        yield self.dash_template.apply_template(db[0])
                    elif len(db) != 0:
                        # This is an error, log it.
                        log.warning("More-than-one-template-provided-ignoring")
                except:
                    e = sys.exc_info()
                    log.error("error", error=e)

            log.info("starting-template-checker")
            lc = LoopingCall(template_checker)
            lc.start(self.timer_resolution)

        except:
            e = sys.exc_info()
            log.error("error", error=e)

    def stop(self):
        log.info("\n")
        log.info('end-of-execution-stopping-consumers')
        # Ask each of our consumers to stop. When a consumer fully stops, it
        # fires the deferred returned from its start() method. We saved all
        # those deferreds away (above, in start()) in self._consumer_d_list,
        # so now we'll use a DeferredList to wait for all of them...
        for consumer in self._consumer_list:
            consumer.stop()
        dl = DeferredList(self._consumer_d_list)

        # Once the consumers are all stopped, then close our client
        def _stop_client(result):
            if isinstance(result, Failure):
                log.error('error', result=result)
            else:
                log.info('all-consumers-stopped', client=self._client)
            self._client.close()
            return result

        dl.addBoth(_stop_client)

        # And once the client is shutdown, stop the reactor
        def _stop_reactor(result):
            reactor.stop()
            return result

        dl.addBoth(_stop_reactor)

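    # The KPI messages consumed below carry a 'prefixes' dict keyed by
    # 'voltha.<device_type>.<device_id>.<port>'. A minimal sketch of one such
    # message body (device id, port and metric names are hypothetical):
    #
    # {
    #     "prefixes": {
    #         "voltha.simulated_olt.0001941bd45e71d8.nni": {
    #             "metrics": {"rx_bytes": 1234, "tx_bytes": 5678}
    #         }
    #     }
    # }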
    def check_for_dashboard(self, msg):
        need_dash = {}
        done = {}
        # Extract the ids for all olt(s) in the message and do one of two
        # things. If a dashboard exists, reset its meta-data timer; if it
        # doesn't exist add it to the dict of needed dashboards.
        metrics = json.loads(getattr(msg.message, 'value'))['prefixes']
        for key in metrics.keys():
            match = re.search(r'voltha\.(.*olt)\.([0-9a-zA-Z]+)\.(.*)', key)
            if match and match.lastindex > 1:
                if match.group(2) in self.dash_meta and match.group(2) not in done:
                    # Issue a log if the reset is somewhat relevant.
                    if self.dash_meta[match.group(2)]['timer'] < \
                            self.timer_duration - self.timer_resolution:
                        log.info("reset-timer", device=match.group(2))
                        #print("reset timer for: %s" %match.group(2))
                    # Update the delete countdown timer
                    self.dash_meta[match.group(2)]['timer'] = self.timer_duration
                    done[match.group(2)] = True
                else:
                    # No dashboard exists yet, remember that one is needed.
                    need_dash[key] = metrics[key]
        return need_dash

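    # Dashboards are created from the template with a title of
    # '<device_type>.<device_id>' (e.g., hypothetically,
    # 'simulated_olt.0001941bd45e71d8'), which is the same form the startup
    # scan in start() looks for.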
    def create_dashboards(self, createList):
        dataIds = "ABCDEFGHIJKLMNOP"
        for dash in createList:
            #log.info("creating a dashboard for: %s" % self.dash_meta[dash])
            # Create one row per "interface".
            # Create one panel per metric type; for the time being it's one
            # panel for byte stats and one panel for packet stats.
            newDash = json.loads(self.dash_template.dashBoard)
            newDash['dashboard']['title'] = self.dash_meta[dash]['device'] + \
                '.' + dash
            # The port is the main grouping attribute
            for port in self.dash_meta[dash]['ports']:
                # Add in the rows for the port specified by the template
                for row in self.dash_template.rows:
                    r = json.loads(self.dash_template.dashRow)
                    r['title'] = re.sub(r'%port%', port, row['title'])
                    p = {}
                    # Add the panels to the row per the template
                    panelId = 1
                    for panel in self.dash_template.panels:
                        p = json.loads(self.dash_template.dashPanel)
                        p['id'] = panelId
                        panelId += 1
                        p['title'] = re.sub(r'%port%', port.upper(), panel['title'])
                        t = {}
                        dataId = 0
                        # Add the targets to the panel
                        for dpoint in sorted(self.dash_meta[dash]['ports'][port]):
                            if dpoint in panel:
                                t['refId'] = dataIds[dataId]
                                db = re.sub(r'%port%', port, panel[dpoint])
                                db = re.sub(r'%device%',
                                            self.dash_meta[dash]['device'], db)
                                db = re.sub(r'%deviceId%', dash, db)
                                t['target'] = db
                                p['targets'].append(t.copy())
                                dataId += 1
                        r['panels'].append(p.copy())
                    newDash['dashboard']['rows'].append(r.copy())
            #print("NEW DASHBOARD: ", json.dumps(newDash))
            #print(r.json())
            r = requests.post(self.grafana_url + "/dashboards/db",
                              json=newDash)
            self.dash_meta[dash]['slug'] = r.json()['slug']
            self.dash_meta[dash]['created'] = True
            log.info("created-dashboard", slug=self.dash_meta[dash]['slug'])

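    # Kafka consumer callback: afkak calls this with the Consumer instance and
    # a list of fetched messages whenever new KPI records arrive on the topic.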
    def msg_processor(self, consumer, msglist):
        try:
            createList = []
            for msg in msglist:
                # Reset the timer for existing dashboards and get back a dict
                # of dashboards to create, if any.
                need_dash = self.check_for_dashboard(msg)
                # Now populate the meta data for all missing dashboards
                for key in need_dash.keys():
                    match = re.search(r'voltha\.(.*olt)\.([0-9a-zA-Z]+)\.(.*)', key)
                    if match and match.lastindex > 2:
                        if match.group(2) in self.dash_meta:
                            # The entry will have been created when the first
                            # port in the record was encountered so just
                            # populate the metrics and port info.
                            # TODO: The keys below are the names of the metrics
                            # that are in the Kafka record. This auto-discovery
                            # is fine if all that's needed are raw metrics. If
                            # metrics are "cooked" by a downstream process and
                            # subsequently fed to graphite/carbon without being
                            # re-posted to Kafka, discovery becomes impossible.
                            # In those cases and in cases where finer grain
                            # control of what's displayed is required, a config
                            # file would be necessary.
                            self.dash_meta[match.group(2)]['ports'][match.group(3)] = \
                                need_dash[key]['metrics'].keys()
                        else:
                            # Not there, create a meta-data record for the
                            # device and add this port.
                            #print("Adding meta data for", match.group(1),
                            #      match.group(2))
                            createList.append(match.group(2))
                            self.dash_meta[match.group(2)] = {}
                            self.dash_meta[match.group(2)]['timer'] = 600
                            self.dash_meta[match.group(2)]['device'] = match.group(1)
                            self.dash_meta[match.group(2)]['created'] = False
                            self.dash_meta[match.group(2)]['ports'] = {}
                            #print("Adding port", match.group(3), "to", match.group(1),
                            #      match.group(2))
                            self.dash_meta[match.group(2)]['ports'][match.group(3)] = \
                                need_dash[key]['metrics'].keys()
            # Now go ahead and create the dashboards using the meta data that
            # was just populated for them.
            if len(createList) != 0:  # Create any missing dashboards.
                self.create_dashboards(createList)
        except:
            e = sys.exc_info()
            log.error("error", error=e)

def parse_options():
    parser = ArgumentParser("Manage Grafana Dashboards")
    parser.add_argument("-c", "--consul",
                        help="consul ip and port",
                        default='10.100.198.220:8500')

    parser.add_argument("-t", "--topic",
                        help="topic to listen on",
                        default="voltha.kpis")

    parser.add_argument("-g", "--grafana_url",
                        help="grafana api url",
                        default="http://admin:admin@localhost:8882/api")

    parser.add_argument("-k", "--kafka",
                        help="kafka bus",
                        default=None)

    parser.add_argument("-s", "--host",
                        help="docker host ip",
                        default=None)

    return parser.parse_args()

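# Example invocation (illustrative; the script file name and the '@kafka'
# Consul service name are assumptions, the other values are the defaults
# defined in parse_options above):
#
#   python dashd_impl.py -c 10.100.198.220:8500 -k @kafka \
#       -g http://admin:admin@localhost:8882/api -t voltha.kpis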
def main():
    logging.basicConfig(
        format='%(asctime)s:%(name)s:' +
               '%(levelname)s:%(process)d:%(message)s',
        level=logging.INFO
    )

    args = parse_options()

    dashd = DashDaemon(args.consul, args.kafka, args.grafana_url, args.topic)
    reactor.callWhenRunning(dashd.start)
    reactor.run()
    log.info("completed!")


if __name__ == "__main__":
    main()