blob: 12767213d9a990f40d0153376bc02d09d769d89a [file] [log] [blame]
Scott Baker31acc652016-06-23 15:47:56 -07001#!/usr/bin/python
2import socket,thread
3import sys
4import fnmatch
5import operator
6import logging
7import ConfigParser
8from urlparse import urlparse
9from sflow_sub_records import *
10
11from flask import request, Request, jsonify
12from flask import Flask
13from flask import make_response
# Flask application object; the @app.route / @app.errorhandler decorators in
# this file register their handlers on it, and app.run() serves it.
app = Flask(__name__)
15
# Short comparator name -> the function of the same name in `operator`.
COMPARATORS = dict(
    (op_name, getattr(operator, op_name))
    for op_name in ('gt', 'lt', 'ge', 'le', 'eq', 'ne')
)

# Log level name (as written in the config file) -> numeric `logging` level.
LEVELS = dict(
    (level_name, getattr(logging, level_name))
    for level_name in ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')
)

# strftime pattern passed as `datefmt` when logging is configured.
_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
32
@app.route('/subscribe',methods=['POST'])
def subscribe():
    """Handle POST /subscribe: register the target URL sent in the request body.

    The body is parsed with urlparse. Only ``udp://host:port`` targets are
    acted on: an sflow_sub_record is built from the scheme, host and port,
    passed to add_sflow_sub_record(), and that call's result is returned as
    the response body. Every other scheme (including the not-yet-implemented
    "kafka" and "file") leaves the subscriptions untouched and returns an
    empty body.

    Validation failures are reported as plain error strings in the response
    body rather than by raising.
    """
    logging.debug(" SUB data:%s",request.data)
    parse_target = urlparse(request.data)
    if not parse_target.netloc:
        logging.error("* Invalid target format")
        return "Error:Invalid target format"

    status = ""
    if parse_target.scheme == "udp":
        host = parse_target.hostname
        try:
            port = parse_target.port
        except ValueError:
            # The .port property raises on a malformed port (e.g. non-numeric);
            # treat that the same as a missing port instead of failing the request.
            port = None

        if host is None or port is None:
            logging.error("* Invalid IP Address format")
            return "* Error: Invalid IP Address format"

        subscrip_obj = sflow_sub_record(parse_target.scheme, None, host, port, None, None)
        status = add_sflow_sub_record(subscrip_obj)
        print_sflow_sub_records()

    # "kafka" and "file" targets are not implemented: fall through with the
    # empty status, exactly like any other unsupported scheme.
    return status
65
@app.route('/unsubscribe',methods=['POST'])
def unsubscribe():
    """Handle POST /unsubscribe: remove the target URL sent in the request body.

    The body is parsed with urlparse. For ``udp://host:port`` targets the
    host and port are passed to delete_sflow_sub_record(). Targets with any
    other scheme are not acted on and still receive the success message.

    Returns an error string when the URL has no netloc, when a udp target
    lacks a host or port, or when any exception is raised while handling the
    request (the exception text is logged and returned); otherwise returns
    the success message.
    """
    try:
        parse_target = urlparse(request.data)
        if not parse_target.netloc:
            logging.error("* Invalid target format")
            return "Error:Invalid target format"

        if parse_target.scheme == "udp":
            app_ip = parse_target.hostname
            # A malformed port makes .port raise ValueError, which is
            # reported by the handler below.
            app_port = parse_target.port

            # Refuse to call the delete helper with a missing host or port
            # and then claim success.
            if app_ip is None or app_port is None:
                logging.error("* Invalid IP Address format")
                return "* Error: Invalid IP Address format"

            delete_sflow_sub_record(app_ip, app_port)
    except Exception as e:
        # Route boundary: report the failure text to the client instead of a 500.
        logging.error("* %s", str(e))
        return str(e)
    return "UnSubscrition is sucessful! \n"
89
@app.errorhandler(404)
def not_found(error):
    """Answer 404 errors with the JSON body {'error': 'Not found'}."""
    payload = jsonify({'error': 'Not found'})
    response = make_response(payload, 404)
    return response
93
def sflow_recv(host,port):
    """Receive sFlow datagrams on (host, port) and replicate them to subscribers.

    Binds a UDP socket, then loops forever: each received datagram is sent
    unchanged, from the same socket, to the (ipaddress, portno) of every
    record in sflow_sub_database. A failure for one target is logged and the
    remaining targets are still served.

    The function only returns by way of an exception (e.g. bind or recvfrom
    failing); in that case the socket is closed and the exit is logged
    before the exception propagates.
    """
    udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
    try:
        udp.bind((host, port))
        logging.info("Started sflow receive thread on %s:%s",host, str(port))

        while True:
            # The sender address is not needed for replication.
            data, _ = udp.recvfrom(64000)
            for obj in sflow_sub_database:
                target_host = obj.ipaddress
                try:
                    # Converted inside the try so one bad record cannot
                    # terminate the whole receive loop.
                    target_port = int(obj.portno)
                    logging.debug("Replicating the sFlow data to:%s:%s",target_host, str(target_port))
                    udp.sendto(data,(target_host,target_port))
                except Exception:
                    logging.error ("Unable to send sFlow data to target %s:%s ",target_host,str(obj.portno))
    finally:
        # Reached only when the loop is broken by an exception: release the
        # socket and record that the receiver has stopped.
        udp.close()
        logging.warning("Exiting sflow receive thread")
110
111
def initialize(host,port):
    """Start sflow_recv(host, port) on a new thread using the `thread` module."""
    receiver_args = (host, port)
    thread.start_new(sflow_recv, receiver_args)
114
if __name__ == "__main__":
    # Script entry point: read sflow_pub_sub.conf, configure logging, start
    # the sFlow receive thread, then serve the Flask app. If reading the
    # configuration or setting up logging fails, the error is reported and
    # nothing is started.

    try:
        config = ConfigParser.ConfigParser()
        config.read('sflow_pub_sub.conf')
        webserver_host = config.get('WEB_SERVER','webserver_host')
        webserver_port = int(config.get('WEB_SERVER','webserver_port'))
        sflow_listening_ip_addr = config.get('SFLOW','listening_ip_addr')
        sflow_listening_port = int(config.get('SFLOW','listening_port'))

        log_level = config.get('LOGGING','level')
        log_file = config.get('LOGGING','filename')

        # Level names not present in LEVELS fall back to logging.NOTSET.
        level = LEVELS.get(log_level, logging.NOTSET)
        logging.basicConfig(filename=log_file,
                            format='%(asctime)s %(levelname)s %(message)s',
                            datefmt=_DEFAULT_LOG_DATE_FORMAT,
                            level=level)
    except Exception as e:
        # Pass print a single pre-formatted string: with Python 2's print
        # statement, print(a, b) would output the repr of a tuple.
        print("* Error in config file: %s" % str(e))
        logging.error("* Error in confing file:%s",e.__str__())
    else:
        initialize(sflow_listening_ip_addr,sflow_listening_port)
        app.run(host=webserver_host,port=webserver_port,debug=False)