blob: fe3de12205a2f0a2a7940c5dfe75e59753f31024 [file] [log] [blame]
Matteo Scandoloeb0d11c2017-08-08 13:05:26 -07001
2# Copyright 2017-present Open Networking Foundation
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16
Scott Baker31acc652016-06-23 15:47:56 -070017#!/usr/bin/python
18import socket,thread
19import sys
20import fnmatch
21import operator
22import logging
23import ConfigParser
24from urlparse import urlparse
25from sflow_sub_records import *
26
27from flask import request, Request, jsonify
28from flask import Flask
29from flask import make_response
30app = Flask(__name__)
31
32COMPARATORS = {
33 'gt': operator.gt,
34 'lt': operator.lt,
35 'ge': operator.ge,
36 'le': operator.le,
37 'eq': operator.eq,
38 'ne': operator.ne,
39}
40
41LEVELS = {'DEBUG': logging.DEBUG,
42 'INFO': logging.INFO,
43 'WARNING': logging.WARNING,
44 'ERROR': logging.ERROR,
45 'CRITICAL': logging.CRITICAL}
46
47_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
48
49@app.route('/subscribe',methods=['POST'])
50def subscribe():
51 logging.debug(" SUB data:%s",request.data)
52 target = request.data
53 parse_target=urlparse(target)
54 if not parse_target.netloc:
55 err_str = "Error:Invalid target format"
56 logging.error("* Invalid target format")
57 return err_str
58
59 status = ""
60 if parse_target.scheme == "udp" :
61 host=parse_target.hostname
62 port=parse_target.port
63 scheme = parse_target.scheme
64 app_ip = host
65 app_port = port
66
67 if host == None or port == None :
68 err_str = "* Error: Invalid IP Address format"
69 logging.error("* Invalid IP Address format")
70 return err_str
71
72 subscrip_obj=sflow_sub_record(scheme,None,app_ip,app_port,None,None)
73 status = add_sflow_sub_record(subscrip_obj)
74 print_sflow_sub_records()
75
76 if parse_target.scheme == "kafka" :
77 pass
78 if parse_target.scheme == "file" :
79 pass
80 return status
81
82@app.route('/unsubscribe',methods=['POST'])
83def unsubscribe():
84 try :
85 target = request.data
86 parse_target=urlparse(target)
87 if not parse_target.netloc:
88 err_str = "Error:Invalid target format"
89 logging.error("* Invalid target format")
90 return err_str
91
92 status = ""
93 if parse_target.scheme == "udp" :
94 host=parse_target.hostname
95 port=parse_target.port
96 scheme = parse_target.scheme
97 app_ip = host
98 app_port = port
99
100 delete_sflow_sub_record(app_ip, app_port)
101 except Exception as e:
102 logging.error("* %s",e.__str__())
103 return e.__str__()
104 return "UnSubscrition is sucessful! \n"
105
106@app.errorhandler(404)
107def not_found(error):
108 return make_response(jsonify({'error': 'Not found'}), 404)
109
110def sflow_recv(host,port):
111 udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
112 udp.bind((host, port))
113 logging.info("Started sflow receive thread on %s:%s",host, str(port))
114
115 while True:
116 data, source = udp.recvfrom(64000)
117 for obj in sflow_sub_database:
118 target_host = obj.ipaddress
119 target_port = int(obj.portno)
120 try:
121 logging.debug("Replicating the sFlow data to:%s:%s",target_host, str(target_port))
122 udp.sendto(data,(target_host,target_port))
123 except Exception:
124 logging.error ("Unable to send sFlow data to target %s:%s ",target_host,str(target_port))
125 logging.warn("Exiting sflow receive thread")
126
127
128def initialize(host,port):
129 thread.start_new(sflow_recv,(host,port,))
130
131if __name__ == "__main__":
132
133 try:
134 config = ConfigParser.ConfigParser()
135 config.read('sflow_pub_sub.conf')
136 webserver_host = config.get('WEB_SERVER','webserver_host')
137 webserver_port = int (config.get('WEB_SERVER','webserver_port'))
138 sflow_listening_ip_addr = config.get('SFLOW','listening_ip_addr')
139 sflow_listening_port = int (config.get('SFLOW','listening_port'))
140
141 log_level = config.get('LOGGING','level')
142 log_file = config.get('LOGGING','filename')
143
144 level = LEVELS.get(log_level, logging.NOTSET)
145 logging.basicConfig(filename=log_file,format='%(asctime)s %(levelname)s %(message)s',\
146 datefmt=_DEFAULT_LOG_DATE_FORMAT,level=level)
147 except Exception as e:
148 print("* Error in config file:",e.__str__())
149 logging.error("* Error in confing file:%s",e.__str__())
150 else:
151 initialize(sflow_listening_ip_addr,sflow_listening_port)
152 app.run(host=webserver_host,port=webserver_port,debug=False)