Enabling monitoring for Exampleservice as a reference for other services
Change-Id: I67801d07170dea267a22179e4f8a7fb1312b6521
diff --git a/xos/synchronizer/monitoring_agent/__init__.py b/xos/synchronizer/monitoring_agent/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/xos/synchronizer/monitoring_agent/__init__.py
diff --git a/xos/synchronizer/monitoring_agent/exampleservice_stats.py b/xos/synchronizer/monitoring_agent/exampleservice_stats.py
new file mode 100644
index 0000000..64843fd
--- /dev/null
+++ b/xos/synchronizer/monitoring_agent/exampleservice_stats.py
@@ -0,0 +1,71 @@
+#!/usr/bin/env python
+
+# Author: Mike Adolphs, 2009
+# Blog: http://www.matejunkie.com/
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License only!
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+
+import sys
+import urllib
+
+def retrieve_status_page():
+ statusPage = "http://localhost/server-status?auto"
+ try:
+ retrPage = urllib.urlretrieve(statusPage, '/tmp/server-status.log')
+ return True
+ except:
+ return False
+
+def parse_status_page():
+ """Main parsing function to put the server-status file's content into
+ a dictionary."""
+
+ file = open('/tmp/server-status.log', 'r')
+ line = file.readline()
+ dictStatus = {}
+ counter = 1
+
+ while line:
+ line = line.strip()
+ if "Total Accesses:" in line:
+ key = "total.accesses"
+ val = {'val':int(line.strip("Total Accesses:")), 'unit':'accesses', 'metric_type':'gauge'}
+ elif "Total kBytes:" in line:
+ key = "total.kBytes"
+ val = {'val':float(line.strip("Total kBytes:")), 'unit':'kBytes', 'metric_type':'gauge'}
+ elif "Uptime:" in line:
+ key = "uptime"
+ val = {'val':int(line.strip("Uptime:")), 'unit':'seconds', 'metric_type':'gauge'}
+ elif "ReqPerSec:" in line:
+ key = "reqpersec"
+ val = {'val':float(line.strip("ReqPerSec:")), 'unit':'rate', 'metric_type':'gauge'}
+ elif "BytesPerSec:" in line:
+ key = "bytespersec"
+ val = {'val':float(line.strip("BytesPerSec:")), 'unit':'rate', 'metric_type':'gauge'}
+ elif "BytesPerReq:" in line:
+ key = "bytesperreq"
+ val = {'val':float(line.strip("BytesPerReq:")), 'unit':'rate', 'metric_type':'gauge'}
+ elif "BusyWorkers:" in line:
+ key = "busyworkers"
+ val = {'val':int(line.strip("BusyWorkers:")), 'unit':'workers', 'metric_type':'gauge'}
+ elif "IdleWorkers:" in line:
+ key = "idleworkers"
+ val = {'val':int(line.strip("IdleWorkers:")), 'unit':'workers', 'metric_type':'gauge'}
+
+ dictStatus[key] = val
+ counter = counter + 1
+ line = file.readline()
+
+ return dictStatus
+
diff --git a/xos/synchronizer/monitoring_agent/monitoring_agent.conf b/xos/synchronizer/monitoring_agent/monitoring_agent.conf
new file mode 100755
index 0000000..2a53b03
--- /dev/null
+++ b/xos/synchronizer/monitoring_agent/monitoring_agent.conf
@@ -0,0 +1,21 @@
+# Logging configuration for the Exampleservice monitoring agent.
+# monitoring_agent.py loads this file with logging.config.fileConfig().
+[loggers]
+keys=root
+
+[handlers]
+keys=logfile
+
+[formatters]
+keys=logfileformatter
+
+[logger_root]
+level=DEBUG
+handlers=logfile
+
+[formatter_logfileformatter]
+# The value is deliberately unquoted: it is handed to the formatter as
+# written, so surrounding quotes would be emitted in every log line.
+format=%(asctime)s %(filename)s %(levelname)s %(message)s
+
+[handler_logfile]
+class=handlers.RotatingFileHandler
+level=NOTSET
+# args are (filename, mode, maxBytes, backupCount). The file name is
+# relative, so the log is created in the agent's working directory.
+args=('monitoring_agent.log','a',10000000,5)
+formatter=logfileformatter
diff --git a/xos/synchronizer/monitoring_agent/monitoring_agent.py b/xos/synchronizer/monitoring_agent/monitoring_agent.py
new file mode 100644
index 0000000..1839ed5
--- /dev/null
+++ b/xos/synchronizer/monitoring_agent/monitoring_agent.py
@@ -0,0 +1,111 @@
+#!/usr/bin/python
+from flask import request, Request, jsonify
+from flask import Flask
+from flask import make_response
+from kombu.connection import BrokerConnection
+from kombu.messaging import Exchange, Queue, Consumer, Producer
+import logging
+import logging.handlers
+import logging.config
+import exampleservice_stats as stats
+import threading
+import subprocess
+import six
+import uuid
+import datetime
+from urlparse import urlparse
+# Flask application that serves the start/stop endpoints defined below.
+app = Flask(__name__)
+
+# Checked at the top of periodic_publish(); publishing continues only while
+# this is True. Set by the start endpoint and cleared by the stop endpoint.
+start_publish = False
+# Values sent as 'tenant_id' / 'user_id' in the payload of every published
+# event.
+# NOTE(review): these identifiers and the RabbitMQ credentials below are
+# hard-coded in source - confirm they are placeholders and consider loading
+# them from configuration instead.
+keystone_tenant_id='3a397e70f64e4e40b69b6266c634d9d0'
+keystone_user_id='1e3ce043029547f1a61c1996d1a531a2'
+# RabbitMQ connection defaults; user, password and host are overwritten by
+# the start endpoint from the 'target' URL in the request.
+rabbit_user='openstack'
+rabbit_password='80608318c273f348a7c3'
+rabbit_host='10.11.10.1'
+# Name of the topic exchange the events are published to.
+rabbit_exchange='cord'
+# Base publisher id; setup_rabbit_mq_channel() appends the local hostname.
+publisher_id='exampleservice_publisher'
+
+@app.route('/monitoring/agent/exampleservice/start',methods=['POST'])
+def exampleservice_start_monitoring_agent():
+ global start_publish, rabbit_user, rabbit_password, rabbit_host, rabbit_exchange
+ try:
+ # To do validation of user inputs for all the functions
+ target = request.json['target']
+ logging.debug("target:%s",target)
+ keystone_user_id = request.json['keystone_user_id']
+ keystone_tenant_id = request.json['keystone_tenant_id']
+ url = urlparse(target)
+ rabbit_user = url.username
+ rabbit_password = url.password
+ rabbit_host = url.hostname
+
+ setup_rabbit_mq_channel()
+
+ start_publish = True
+ periodic_publish()
+
+ logging.info("Exampleservice monitoring is enabled")
+ return "Exampleservice monitoring is enabled"
+ except Exception as e:
+ return e.__str__()
+
+@app.route('/monitoring/agent/exampleservice/stop',methods=['POST'])
+def openstack_stop():
+ global start_publish
+ start_publish = False
+ logging.info ("Exampleservice monitoring is stopped")
+ return "Exampleservice monitoring is stopped"
+
+
+producer = None
+def setup_rabbit_mq_channel():
+ global producer
+ global rabbit_user, rabbit_password, rabbit_host, rabbit_exchange,publisher_id
+ service_exchange = Exchange(rabbit_exchange, "topic", durable=False)
+ # connections/channels
+ connection = BrokerConnection(rabbit_host, rabbit_user, rabbit_password)
+ logging.info('Connection to RabbitMQ server successful')
+ channel = connection.channel()
+ # produce
+ producer = Producer(channel, exchange=service_exchange, routing_key='notifications.info')
+ p = subprocess.Popen('hostname', shell=True, stdout=subprocess.PIPE)
+ (hostname, error) = p.communicate()
+ publisher_id = publisher_id + '_on_' + hostname
+ logging.info('publisher_id=%s',publisher_id)
+
+def publish_exampleservice_stats(example_stats):
+ global producer
+ global keystone_tenant_id, keystone_user_id, publisher_id
+
+ for k,v in example_stats.iteritems():
+ msg = {'event_type': 'cord.'+k,
+ 'message_id':six.text_type(uuid.uuid4()),
+ 'publisher_id': publisher_id,
+ 'timestamp':datetime.datetime.now().isoformat(),
+ 'priority':'INFO',
+ 'payload': {'name':k,
+ 'unit':v['unit'],
+ 'result':v['val'],
+ 'type':v['metric_type'],
+ 'resource_id':'exampleservice',
+ 'user_id':keystone_user_id,
+ 'tenant_id':keystone_tenant_id
+ }
+ }
+ producer.publish(msg)
+ logging.debug('Publishing exampleservice event: %s', msg)
+
+def periodic_publish():
+ global start_publish
+ if not start_publish:
+ return
+ stats.retrieve_status_page()
+ resParse = stats.parse_status_page()
+ logging.debug ("publish:%(data)s" % {'data':resParse})
+ publish_exampleservice_stats(resParse)
+ threading.Timer(5, periodic_publish).start()
+
+if __name__ == "__main__":
+ logging.config.fileConfig('monitoring_agent.conf', disable_existing_loggers=False)
+ logging.info ("Exampleservice monitoring is listening on port 5004")
+ app.run(host="0.0.0.0",port=5004,debug=False)