Gabe Black | 9062322 | 2017-01-18 19:52:28 +0000 | [diff] [blame] | 1 | import six, uuid, csv, datetime, threading, socket, shutil, argparse, glob, os, copy, pprint, time, sys |
| 2 | from kombu.connection import BrokerConnection |
| 3 | from kombu.messaging import Exchange, Queue, Consumer, Producer |
| 4 | |
| 5 | XAGG_CSV_DIR="/xsight/var/opt/xagg/tmp" |
| 6 | XAGG_CSV_PROCESSED_DIR="/xsight/var/opt/xagg/tmp/processed" |
| 7 | |
| 8 | class RabbitMQ: |
| 9 | def __init__(self, rabbit_host, rabbit_user, rabbit_password, exchange_name): |
| 10 | exchange = Exchange(exchange_name, "topic", durable=False) |
| 11 | connection = BrokerConnection(rabbit_host, rabbit_user, rabbit_password) |
| 12 | channel = connection.channel() |
| 13 | self.producer = Producer(channel, exchange=exchange, routing_key="notifications.info") |
| 14 | |
| 15 | def publish(self, stats): |
| 16 | self.producer.publish(stats) |
| 17 | |
| 18 | |
| 19 | class CeilometerStats: |
| 20 | def __init__(self, keystone_user_id, keystone_tenant_id): |
| 21 | self.message_template = {'publisher_id': "monitoring_on_"+socket.gethostname(), |
| 22 | 'priority':'INFO'} |
| 23 | self.keystone_user_id = keystone_user_id |
| 24 | self.keystone_tenant_id = keystone_tenant_id |
| 25 | |
| 26 | def _get_stat_template(self): |
| 27 | retval = copy.copy(self.message_template) |
| 28 | retval['message_id'] = six.text_type(uuid.uuid4()) |
| 29 | retval['timestamp'] = datetime.datetime.now().isoformat() |
| 30 | retval['payload'] = {'user_id':self.keystone_user_id,'project_id':self.keystone_tenant_id} |
| 31 | return retval |
| 32 | |
| 33 | def get_stat(self,name,event_type,stats={}): |
| 34 | retval = self._get_stat_template() |
| 35 | retval['event_type']=event_type |
| 36 | retval['payload']['resource_id']=name |
| 37 | for k,v in stats.iteritems(): |
| 38 | retval['payload'][k]=v |
| 39 | return retval |
| 40 | |
| 41 | |
| 42 | class XaggStatsReader: |
| 43 | XAGG_COLUMNS=[ |
| 44 | {"name":"dn_thruput_min","unit":"kb/s","type":"gauge"}, |
| 45 | {"name":"dn_thruput_max","unit":"kb/s","type":"gauge"}, |
| 46 | {"name":"dn_thruput_avg","unit":"kb/s","type":"gauge"}, |
| 47 | {"name":"up_thruput_min","unit":"kb/s","type":"gauge"}, |
| 48 | {"name":"up_thruput_max","unit":"kb/s","type":"gauge"}, |
| 49 | {"name":"up_thruput_avg","unit":"kb/s","type":"gauge"}, |
| 50 | {"name":"up_byte","unit":"B","type":"cumulative"}, |
| 51 | {"name":"dn_byte","unit":"B","type":"cumulative"}, |
| 52 | {"name":"up_pkt","unit":"packet","type":"cumulative"}, |
| 53 | {"name":"dn_pkt","unit":"packet","type":"cumulative"}, |
| 54 | {"name":"tcp_rtt","unit":"ms","type":"gauge"}, |
| 55 | {"name":"tcp_dn_retrans","unit":"packet","type":"gauge"}, |
| 56 | {"name":"tcp_up_retrans","unit":"packet","type":"gauge"}, |
| 57 | {"name":"tcp_attempt","unit":"attempt","type":"gauge"}, |
| 58 | {"name":"tcp_success","unit":"attempt","type":"gauge"} |
| 59 | ] |
| 60 | CSV_FILE_COLUMNS=["user_src_ip","user_dst_ip","enb_id","customer_group","technology", |
| 61 | "handset","os","apn","service_category","service_type","service_name", |
| 62 | "application_name","app_attempt","app_success","app_response_time", |
| 63 | "dn_byte","dn_thruput_min","dn_thruput_max","dn_thruput_avg","up_byte", |
| 64 | "up_thruput_min","up_thruput_max","up_thruput_avg","tcp_dn_retrans", |
| 65 | "tcp_up_retrans","dn_pkt","up_pkt","tcp_rtt","tcp_attempt","tcp_success"] |
| 66 | def __init__(self, ceilometer_stats): |
| 67 | self.stats = ceilometer_stats |
| 68 | |
| 69 | def get_stats(self, csvfile): |
| 70 | fp = open(csvfile) |
| 71 | f = csv.DictReader(filter(lambda row: row[0] !='#',fp),fieldnames=self.CSV_FILE_COLUMNS) |
| 72 | retval = [] |
| 73 | for row in f: |
| 74 | name=row["user_src_ip"]+"_"+row["user_dst_ip"] |
| 75 | for stat in self.XAGG_COLUMNS: |
| 76 | stat['volume'] = row[stat["name"]] |
| 77 | retval.append(self.stats.get_stat(name,"passivetest.stats",stat)) |
| 78 | return retval |
| 79 | |
| 80 | def periodic_publish(rabbit_mq,xagg_stats_reader): |
| 81 | for stats_file in glob.glob(XAGG_CSV_DIR+"/*.csv"): |
| 82 | if not os.path.isdir(stats_file): |
| 83 | stats = xagg_stats_reader.get_stats(stats_file) |
| 84 | for stat in stats: |
| 85 | rabbit_mq.publish(stat) |
| 86 | shutil.move(stats_file,XAGG_CSV_PROCESSED_DIR) |
| 87 | |
| 88 | # Publish every minute |
| 89 | threading.Timer(60, periodic_publish, args=(rabbit_mq, xagg_stats_reader)).start() |
| 90 | |
| 91 | def main(): |
| 92 | parser = argparse.ArgumentParser(description='Process xagg telemetry and send to ceilometer/monitoring service.') |
| 93 | for arg in ["keystone-tenant-id","keystone-user-id","rabbit-host","rabbit-user","rabbit-password","rabbit-exchange-name"]: |
| 94 | parser.add_argument("--"+arg,required=True) |
| 95 | |
| 96 | args = parser.parse_args() |
| 97 | |
| 98 | while True: |
| 99 | try: |
| 100 | rabbit_mq = RabbitMQ(args.rabbit_host, args.rabbit_user, args.rabbit_password, args.rabbit_exchange_name) |
| 101 | ceilometer_stats = CeilometerStats(args.keystone_user_id, args.keystone_tenant_id) |
| 102 | xagg_stats_reader = XaggStatsReader(ceilometer_stats) |
| 103 | periodic_publish(rabbit_mq,xagg_stats_reader) |
| 104 | except Exception as e: |
| 105 | print(e) |
| 106 | sys.stdout.flush() |
| 107 | print("Trying again in one minute...") |
| 108 | time.sleep(60) |
| 109 | |
| 110 | if __name__ == "__main__": |
| 111 | exit(main()) |