Initial commit of PassiveTest
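
Add monitoring_stats_notifier.py, which reads xagg telemetry CSV files
and publishes ceilometer-style samples to a RabbitMQ exchange.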

Change-Id: Idcd9a0c72df5eae6b4eedc544e473ebc9763ccdb
diff --git a/xos/synchronizer/monitoring_stats_notifier.py b/xos/synchronizer/monitoring_stats_notifier.py
new file mode 100644
index 0000000..bbfc0db
--- /dev/null
+++ b/xos/synchronizer/monitoring_stats_notifier.py
@@ -0,0 +1,111 @@
+import six, uuid, csv, datetime, threading, socket, shutil, argparse, glob, os, copy, time, sys
+from kombu.connection import BrokerConnection
+from kombu.messaging import Exchange, Producer
+
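+# xagg writes telemetry CSV files into XAGG_CSV_DIR; files that have been
+# published are moved into XAGG_CSV_PROCESSED_DIR so they are not re-read.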
+XAGG_CSV_DIR="/xsight/var/opt/xagg/tmp"
+XAGG_CSV_PROCESSED_DIR="/xsight/var/opt/xagg/tmp/processed"
+
+class RabbitMQ:
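+    """Thin wrapper around a kombu Producer bound to a topic exchange."""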
+    def __init__(self, rabbit_host, rabbit_user, rabbit_password, exchange_name):
+        exchange = Exchange(exchange_name, "topic", durable=False)
+        connection = BrokerConnection(rabbit_host, rabbit_user, rabbit_password)
+        channel = connection.channel()
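+        # "notifications.info" is the routing key used for INFO-priority OpenStack notifications.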
+        self.producer = Producer(channel, exchange=exchange, routing_key="notifications.info")
+
+    def publish(self, stats):
+        self.producer.publish(stats)
+
+
+class CeilometerStats:
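+    """Builds ceilometer-style notification messages tagged with a keystone user and tenant."""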
+    def __init__(self, keystone_user_id, keystone_tenant_id):
+        self.message_template = {'publisher_id': "monitoring_on_"+socket.gethostname(),
+                                 'priority':'INFO'}
+        self.keystone_user_id = keystone_user_id
+        self.keystone_tenant_id = keystone_tenant_id
+
+    def _get_stat_template(self):
+        retval = copy.copy(self.message_template)
+        retval['message_id'] = six.text_type(uuid.uuid4())
+        retval['timestamp'] = datetime.datetime.now().isoformat()
+        retval['payload'] = {'user_id':self.keystone_user_id,'project_id':self.keystone_tenant_id}
+        return retval
+
+    def get_stat(self, name, event_type, stats=None):
+        retval = self._get_stat_template()
+        retval['event_type'] = event_type
+        retval['payload']['resource_id'] = name
+        # six.iteritems keeps this working on both Python 2 and 3;
+        # stats defaults to None to avoid a shared mutable default argument.
+        for k, v in six.iteritems(stats or {}):
+            retval['payload'][k] = v
+        return retval
+
+
+class XaggStatsReader:
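+    """Turns rows from an xagg CSV export into ceilometer samples, one per metric per src/dst IP pair."""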
+    XAGG_COLUMNS=[
+        {"name":"dn_thruput_min","unit":"kb/s","type":"gauge"},
+        {"name":"dn_thruput_max","unit":"kb/s","type":"gauge"},
+        {"name":"dn_thruput_avg","unit":"kb/s","type":"gauge"},
+        {"name":"up_thruput_min","unit":"kb/s","type":"gauge"},
+        {"name":"up_thruput_max","unit":"kb/s","type":"gauge"},
+        {"name":"up_thruput_avg","unit":"kb/s","type":"gauge"},
+        {"name":"up_byte","unit":"B","type":"cumulative"},
+        {"name":"dn_byte","unit":"B","type":"cumulative"},
+        {"name":"up_pkt","unit":"packet","type":"cumulative"},
+        {"name":"dn_pkt","unit":"packet","type":"cumulative"},
+        {"name":"tcp_rtt","unit":"ms","type":"gauge"},
+        {"name":"tcp_dn_retrans","unit":"packet","type":"gauge"},
+        {"name":"tcp_up_retrans","unit":"packet","type":"gauge"},
+        {"name":"tcp_attempt","unit":"attempt","type":"gauge"},
+        {"name":"tcp_success","unit":"attempt","type":"gauge"}
+    ]
+    CSV_FILE_COLUMNS=["user_src_ip","user_dst_ip","enb_id","customer_group","technology",
+                      "handset","os","apn","service_category","service_type","service_name",
+                      "application_name","app_attempt","app_success","app_response_time",
+                      "dn_byte","dn_thruput_min","dn_thruput_max","dn_thruput_avg","up_byte",
+                      "up_thruput_min","up_thruput_max","up_thruput_avg","tcp_dn_retrans",
+                      "tcp_up_retrans","dn_pkt","up_pkt","tcp_rtt","tcp_attempt","tcp_success"]
+    def __init__(self, ceilometer_stats):
+        self.stats = ceilometer_stats
+
+    def get_stats(self, csvfile):
+        retval = []
+        with open(csvfile) as fp:
+            # Skip comment lines starting with '#'; DictReader maps the
+            # remaining rows onto CSV_FILE_COLUMNS.
+            reader = csv.DictReader((row for row in fp if not row.startswith('#')),
+                                    fieldnames=self.CSV_FILE_COLUMNS)
+            for row in reader:
+                name = row["user_src_ip"] + "_" + row["user_dst_ip"]
+                for column in self.XAGG_COLUMNS:
+                    # Copy the column descriptor so the class-level XAGG_COLUMNS
+                    # entries are not mutated from row to row.
+                    stat = dict(column, volume=row[column["name"]])
+                    retval.append(self.stats.get_stat(name, "passivetest.stats", stat))
+        return retval
+
+def periodic_publish(rabbit_mq,xagg_stats_reader):
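+    """Publish every sample found in XAGG_CSV_DIR, move each processed file
+    into XAGG_CSV_PROCESSED_DIR (which is assumed to exist), and reschedule
+    itself to run again in 60 seconds."""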
+    for stats_file in glob.glob(XAGG_CSV_DIR+"/*.csv"):
+        if not os.path.isdir(stats_file):
+            stats = xagg_stats_reader.get_stats(stats_file)
+            for stat in stats:
+                rabbit_mq.publish(stat)
+            shutil.move(stats_file,XAGG_CSV_PROCESSED_DIR)
+
+    # Publish every minute
+    threading.Timer(60, periodic_publish, args=(rabbit_mq, xagg_stats_reader)).start()
+
+def main():
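+    """Parse command-line arguments and start the publisher, retrying setup every minute on failure."""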
+    parser = argparse.ArgumentParser(description='Process xagg telemetry and send to ceilometer/monitoring service.')
+    for arg in ["keystone-tenant-id","keystone-user-id","rabbit-host","rabbit-user","rabbit-password","rabbit-exchange-name"]:
+        parser.add_argument("--"+arg,required=True)
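+    # argparse exposes these as args.keystone_tenant_id, args.rabbit_host, etc. (dashes become underscores).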
+
+    args = parser.parse_args()
+
+    while True:
+        try:
+            rabbit_mq = RabbitMQ(args.rabbit_host, args.rabbit_user, args.rabbit_password, args.rabbit_exchange_name)
+            ceilometer_stats = CeilometerStats(args.keystone_user_id, args.keystone_tenant_id)
+            xagg_stats_reader = XaggStatsReader(ceilometer_stats)
+            # periodic_publish reschedules itself on a Timer thread; stop retrying
+            # once it has started, otherwise a new publisher and timer chain would
+            # be created every minute.
+            periodic_publish(rabbit_mq, xagg_stats_reader)
+            break
+        except Exception as e:
+            print(e)
+            sys.stdout.flush()
+            print("Trying again in one minute...")
+            time.sleep(60)
+
+if __name__ == "__main__":
+    sys.exit(main())