#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# This is a very simple implementation of a dashboard creation service. It
# listens to the Kafka bus on the voltha.kpis topic looking for performance
# monitoring metrics for olts. If a new olt appears on the bus the service
# creates a dashboard for it covering both packet and byte stats, with one row
# per port/stream and, in each row, one panel for packet stats and one for
# byte stats.
#
# TODO: Capture all of the metadata for existing dashboards from Grafana. We're
# only capturing the device type and device id from the title, which is good
# enough for now.
# TODO: Leverage Grafana itself as a template builder, simplifying the
# specification of a template without having to resort to a separate API for
# the dashd service. The basic premise is that a dashboard with any name other
# than voltha.template is created for any device. Once the dashboard looks
# right, it is renamed voltha.template, which automatically triggers the
# creation of a new template to be used for all dashboards. All existing
# dashboards are immediately deleted and new ones are created from the
# template. The template is then renamed voltha.template.active and can be
# deleted at that point. This work has been started.

#
# Metadata format.
# The metadata for each device from which relevant metrics are received is
# stored in a dash_meta dictionary structured as follows.
#
# {<device_id1>: {
#     device: <device_type>,
#     slug: <grafana_slug>,
#     timer: <timer_val>,
#     created: <creation_status>,
#     ports: {
#         <port_id>: [
#             <metric1>,
#             <metric2>,
#             ...,
#             <metricN>
#         ]
#     }
#  },
#  ...
#  <device_idN>: {
#  }
# }
#
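# A concrete instance, for illustration only (the device id, slug, and metric
# names below are hypothetical), assuming a simulated OLT reporting two packet
# counters on its "nni" port:
#
# {'000123456789abcd': {
#     'device': 'simulated_olt',
#     'slug': 'simulated-olt-000123456789abcd',
#     'timer': 600,
#     'created': True,
#     'ports': {
#         'nni': ['tx_64_pkts', 'rx_64_pkts']
#     }
# }}
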
from structlog import get_logger
from argparse import ArgumentParser

from afkak.client import KafkaClient
from afkak.common import (
    KafkaUnavailableError,
    OFFSET_LATEST)
from afkak.consumer import Consumer
from twisted.internet import reactor
from twisted.internet.defer import DeferredList, inlineCallbacks
from twisted.python.failure import Failure
from twisted.internet.task import LoopingCall

from common.utils.consulhelpers import get_endpoint_from_consul
import requests
import json
import logging
import re
import sys
from dashd.dash_template import DashTemplate

log = get_logger()


class DashDaemon(object):
    def __init__(self, consul_endpoint, grafana_url, topic="voltha.heartbeat"):
        #logging.basicConfig(
        #    format='%(asctime)s:%(name)s:' +
        #           '%(levelname)s:%(process)d:%(message)s',
        #    level=logging.INFO
        #)
        self.dash_meta = {}
        self.timer_resolution = 10
        self.timer_duration = 600
        self.topic = topic
        self.dash_template = DashTemplate(grafana_url)
        self.grafana_url = grafana_url
        self.kafka_endpoint = get_endpoint_from_consul(consul_endpoint,
                                                       'kafka')
        # print('kafka endpoint: ', self.kafka_endpoint)
        self.on_start_callback = None

        self._client = KafkaClient(self.kafka_endpoint)
        self._consumer_list = []  # List of consumers
        # List of deferreds returned from the consumers' start() methods
        self._consumer_d_list = []

    def set_on_start_callback(self, on_start_callback):
        # This callback is currently unused; it is kept for future requirements.
        self.on_start_callback = on_start_callback
        return self

    @inlineCallbacks
    def start(self):
        partitions = []
        try:
            while not partitions:
                yield self._client.load_metadata_for_topics(self.topic)
                e = self._client.metadata_error_for_topic(self.topic)
                if e:
                    log.warning('no-metadata-for-topic', error=e,
                                topic=self.topic)
                else:
                    partitions = self._client.topic_partitions[self.topic]
        except KafkaUnavailableError:
            log.error("unable-to-communicate-with-Kafka-brokers")
            self.stop()

        def _note_consumer_stopped(result, consumer):
            log.info('consumer-stopped', consumer=consumer,
                     result=result)

        for partition in partitions:
            c = Consumer(self._client, self.topic, partition,
                         self.msg_processor)
            self._consumer_list.append(c)
            log.info('consumer-started', topic=self.topic, partition=partition)
            d = c.start(OFFSET_LATEST)
            d.addBoth(_note_consumer_stopped, c)
            self._consumer_d_list.append(d)

        # Now read the list of existing dashboards from Grafana and create the
        # dictionary of dashboard timers. If we've crashed there will be
        # dashboards there. Just add them and if they're no longer valid
        # they'll be deleted. If they are valid then they'll persist.
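        # For reference, an entry returned by the Grafana search API is
        # expected to look roughly like the following (values are illustrative
        # and the exact fields vary by Grafana version); only the title is
        # used here to recover the device type and device id:
        #   {'id': 1, 'title': 'simulated_olt.000123456789abcd',
        #    'uri': 'db/simulated-olt-000123456789abcd', 'type': 'dash-db'}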
        #print("Starting main loop")
        try:
            r = requests.get(self.grafana_url + "/search?")
            j = r.json()
            for i in j:
                # Look for dashboards whose title is of the form
                # *olt.<device_id>. These will be the ones of interest.
                # Others should just be left alone.
                #print(i['title'])
                match = re.search(r'(.*olt)\.([0-9a-zA-Z]+)', i['title'])
                if match and match.lastindex > 0:
                    #print(match.group(1), match.group(2))
                    self.dash_meta[match.group(2)] = {}
                    self.dash_meta[match.group(2)]['timer'] = self.timer_duration  # 10 min
                    self.dash_meta[match.group(2)]['device'] = match.group(1)
                    self.dash_meta[match.group(2)]['created'] = False
                    self.dash_meta[match.group(2)]['ports'] = {}
                    # TODO: We should really capture all of the chart data
                    # including the rows, panels, and data points being logged.
                    # This is good enough for now though to determine if
                    # there's already a dashboard for a given device.

            def countdown_processor():
                # Called every X (timer_resolution) seconds to count down each
                # of the dash timers. If a timer reaches 0 the corresponding
                # dashboard is removed.
                #log.info("Counting down.")
                try:
                    for dashboard in self.dash_meta.keys():
                        #print("Counting down %s." % dashboard)
                        # Issue a log if the counter decrement is somewhat
                        # relevant
                        if (self.dash_meta[dashboard]['timer'] % 100 == 0 and
                                self.dash_meta[dashboard]['timer'] != self.timer_duration):
                            log.info("counting-down", dashboard=dashboard,
                                     timer=self.dash_meta[dashboard]['timer'])
                        self.dash_meta[dashboard]['timer'] -= self.timer_resolution
                        if self.dash_meta[dashboard]['timer'] <= 0:
                            # Delete the dashboard here
                            log.info("FIXME:-Should-delete-the-dashboard-here",
                                     dashboard=dashboard)
                except:
                    e = sys.exc_info()
                    log.error("error", error=e)

            # Start the dashboard countdown processor
            log.info("starting-countdown-processor")
            lc = LoopingCall(countdown_processor)
            lc.start(self.timer_resolution)

            @inlineCallbacks
            def template_checker():
                try:
                    # Called every so often (timer_resolution seconds, because
                    # it's convenient) to check if a template dashboard has
                    # been defined in Grafana. If it has been, replace the
                    # built-in template with the one provided.
                    r = requests.get(self.grafana_url + "/search?query=template")
                    db = r.json()
                    if len(db) == 1:
                        # Apply the template
                        yield self.dash_template.apply_template(db[0])
                    elif len(db) != 0:
                        # This is an error, log it.
                        log.warning("More-than-one-template-provided-ignoring")
                except:
                    e = sys.exc_info()
                    log.error("error", error=e)

            log.info("starting-template-checker")
            lc = LoopingCall(template_checker)
            lc.start(self.timer_resolution)

        except:
            e = sys.exc_info()
            log.error("error", error=e)

    def stop(self):
        log.info("\n")
        log.info('end-of-execution-stopping-consumers')
        # Ask each of our consumers to stop. When a consumer fully stops, it
        # fires the deferred returned from its start() method. We saved all
        # those deferreds away (above, in start()) in self._consumer_d_list,
        # so now we'll use a DeferredList to wait for all of them...
        for consumer in self._consumer_list:
            consumer.stop()
        dl = DeferredList(self._consumer_d_list)

        # Once the consumers are all stopped, close our client
        def _stop_client(result):
            if isinstance(result, Failure):
                log.error('error', result=result)
            else:
                log.info('all-consumers-stopped', client=self._client)
            self._client.close()
            return result

        dl.addBoth(_stop_client)

        # And once the client is shut down, stop the reactor
        def _stop_reactor(result):
            reactor.stop()
            return result

        dl.addBoth(_stop_reactor)

    def check_for_dashboard(self, msg):
        need_dash = {}
        done = {}
        # Extract the ids of all olt(s) in the message and do one of two
        # things: if a dashboard already exists, reset its metadata timer;
        # if it doesn't exist, add it to the dict of needed dashboards.
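        # Each key in the 'prefixes' dict is expected to follow the pattern
        # voltha.<device_type>.<device_id>.<port>, e.g. (illustrative values
        # only) "voltha.simulated_olt.000123456789abcd.nni". In the regex
        # below, group(1) is the device type, group(2) the device id and
        # group(3) the port.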
        metrics = json.loads(getattr(msg.message, 'value'))['prefixes']
        for key in metrics.keys():
            match = re.search(r'voltha\.(.*olt)\.([0-9a-zA-Z]+)\.(.*)', key)
            if match and match.lastindex > 1:
                if match.group(2) in self.dash_meta and match.group(2) not in done:
                    # Issue a log if the reset is somewhat relevant.
                    if self.dash_meta[match.group(2)]['timer'] < \
                            self.timer_duration - self.timer_resolution:
                        log.info("reset-timer", device=match.group(2))
                        #print("reset timer for: %s" % match.group(2))
                    # Update the delete countdown timer
                    self.dash_meta[match.group(2)]['timer'] = self.timer_duration
                    done[match.group(2)] = True
                else:
                    # No dashboard exists yet,
                    need_dash[key] = metrics[key]
        return need_dash

    def create_dashboards(self, createList):
        dataIds = "ABCDEFGHIJKLMNOP"
        for dash in createList:
            #log.info("creating a dashboard for: %s" % self.dash_meta[dash])
            # Create one row per "interface" and one panel per metric type.
            # For the time being that's one panel for byte stats and one
            # panel for packet stats.
            newDash = json.loads(self.dash_template.dashBoard)
            newDash['dashboard']['title'] = self.dash_meta[dash]['device'] + \
                '.' + dash
            # The port is the main grouping attribute
            for port in self.dash_meta[dash]['ports']:
                # Add in the rows for the port specified by the template
                for row in self.dash_template.rows:
                    r = json.loads(self.dash_template.dashRow)
                    r['title'] = re.sub(r'%port%', port, row['title'])
                    p = {}
                    # Add the panels to the row per the template
                    panelId = 1
                    for panel in self.dash_template.panels:
                        p = json.loads(self.dash_template.dashPanel)
                        p['id'] = panelId
                        panelId += 1
                        p['title'] = re.sub(r'%port%', port.upper(), panel['title'])
                        t = {}
                        dataId = 0
                        # Add the targets to the panel
                        for dpoint in sorted(self.dash_meta[dash]['ports'][port]):
                            if dpoint in panel:
                                t['refId'] = dataIds[dataId]
                                db = re.sub(r'%port%', port, panel[dpoint])
                                db = re.sub(r'%device%',
                                            self.dash_meta[dash]['device'], db)
                                db = re.sub(r'%deviceId%', dash, db)
                                t['target'] = db
                                p['targets'].append(t.copy())
                                dataId += 1
                        r['panels'].append(p.copy())
                    newDash['dashboard']['rows'].append(r.copy())
            #print("NEW DASHBOARD: ", json.dumps(newDash))
            #print(r.json())
            r = \
                requests.post(self.grafana_url + "/dashboards/db",
                              json=newDash)
            self.dash_meta[dash]['slug'] = r.json()['slug']
            self.dash_meta[dash]['created'] = True
            log.info("created-dashboard", slug=self.dash_meta[dash]['slug'])

    def msg_processor(self, consumer, msglist):
        try:
            createList = []
            for msg in msglist:
                # Reset the timer for existing dashboards and get back a dict
                # of dashboards to create, if any.
                need_dash = self.check_for_dashboard(msg)
                # Now populate the meta data for all missing dashboards
                for key in need_dash.keys():
                    match = re.search(r'voltha\.(.*olt)\.([0-9a-zA-Z]+)\.(.*)', key)
                    if match and match.lastindex > 2:
                        if match.group(2) in self.dash_meta:
                            # The entry will have been created when the first
                            # port in the record was encountered so just
                            # populate the metrics and port info.
                            # TODO: The keys below are the names of the metrics
                            # that are in the Kafka record. This auto-discovery
                            # is fine if all that's needed are raw metrics. If
                            # metrics are "cooked" by a downstream process and
                            # subsequently fed to graphite/carbon without being
                            # re-posted to Kafka, discovery becomes impossible.
                            # In those cases, and in cases where finer grained
                            # control of what's displayed is required, a config
                            # file would be necessary.
                            self.dash_meta[match.group(2)]['ports'][match.group(3)] = \
                                need_dash[key]['metrics'].keys()
                        else:
                            # Not there, create a meta-data record for the
                            # device and add this port.
                            #print("Adding meta data for", match.group(1),
                            #      match.group(2))
                            createList.append(match.group(2))
                            self.dash_meta[match.group(2)] = {}
                            self.dash_meta[match.group(2)]['timer'] = self.timer_duration
                            self.dash_meta[match.group(2)]['device'] = match.group(1)
                            self.dash_meta[match.group(2)]['created'] = False
                            self.dash_meta[match.group(2)]['ports'] = {}
                            #print("Adding port", match.group(3), "to", match.group(1),
                            #      match.group(2))
                            self.dash_meta[match.group(2)]['ports'][match.group(3)] = \
                                need_dash[key]['metrics'].keys()
            # Now go ahead and create the dashboards using the meta data that
            # was just populated for them.
            if len(createList) != 0:  # Create any missing dashboards.
                self.create_dashboards(createList)
        except:
            e = sys.exc_info()
            log.error("error", error=e)

def parse_options():
    parser = ArgumentParser(description="Manage Grafana Dashboards")
    parser.add_argument("-c", "--consul",
                        help="consul ip and port",
                        default='10.100.198.220:8500')

    parser.add_argument("-t", "--topic",
                        help="topic to listen on",
                        default="voltha.kpis")

    parser.add_argument("-g", "--grafana_url",
                        help="grafana api url",
                        default="http://admin:admin@localhost:8882/api")

    parser.add_argument("-k", "--kafka",
                        help="kafka bus",
                        default=None)

    parser.add_argument("-s", "--host",
                        help="docker host ip",
                        default=None)

    return parser.parse_args()

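# Example invocation (the module path is an assumption; the flag values mirror
# the defaults above):
#   python -m dashd.dashd_impl -c 10.100.198.220:8500 -t voltha.kpis \
#       -g http://admin:admin@localhost:8882/api
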
def main():
    logging.basicConfig(
        format='%(asctime)s:%(name)s:' +
               '%(levelname)s:%(process)d:%(message)s',
        level=logging.INFO
    )

    args = parse_options()

    dashd = DashDaemon(args.consul, args.grafana_url, args.topic)
    reactor.callWhenRunning(dashd.start)
    reactor.run()
    log.info("completed!")


if __name__ == "__main__":
    main()