blob: 1cac7846ba912c899aa603609783c8ef9500a847 [file] [log] [blame]
rdudyalab086cf32016-08-11 00:07:45 -04001import logging
2import logging.handlers
3import logging.config
4import ConfigParser
5import socket
6import msgpack
7from kombu.connection import BrokerConnection
8from kombu.messaging import Exchange, Queue, Consumer, Producer
9import six
10import uuid
11import datetime
12from oslo_utils import netutils
13from oslo_utils import timeutils
14from oslo_utils import units
15
16
17
18#logging.config.fileConfig('udpagent.conf', disable_existing_loggers=False)
19class UdpService():
20 def __init__(self):
21 config = ConfigParser.ConfigParser()
22 config.read('udpagent.conf')
23 self.udp_address = config.get('udpservice','udp_address')
24 self.udp_port = int(config.get('udpservice','udp_port'))
25 self.rabbit_user = config.get('udpservice','rabbit_userid')
26 self.rabbit_password = config.get('udpservice','rabbit_password')
27 self.rabbit_host = config.get('udpservice','rabbit_hosts')
28 self.acord_control_exchange = config.get('udpservice','acord_control_exchange')
29 logging.config.fileConfig('udpagent.conf', disable_existing_loggers=False)
30 def printconfig(self):
31 logging.debug("udp_address:%s",self.udp_address)
32 logging.debug("udp_port:%s",self.udp_port)
33 logging.debug("rabbit_user:%s",self.rabbit_user)
34 logging.debug("rabbit_password:%s",self.rabbit_password)
35 logging.debug("rabbit_hosts:%s",self.rabbit_host)
36 logging.debug("cord_control_exchange:%s",self.acord_control_exchange)
37
38 def convert_sample_to_event_data(self,msg):
39 event_data = {'event_type': 'infra','message_id':six.text_type(uuid.uuid4()),'publisher_id': 'cpe_publisher_id','timestamp':datetime.datetime.now().isoformat(),'priority':'INFO','payload':msg}
40 return event_data
41
rdudyala996d70b2016-10-13 17:40:55 +000042 def errback(self, exc, interval):
43 logging.error('Error: %r', exc, exc_info=1)
44 logging.info('Retry in %s seconds.', interval)
45
rdudyalab086cf32016-08-11 00:07:45 -040046 def setup_rabbit_mq_channel(self):
47 service_exchange = Exchange(self.acord_control_exchange, "topic", durable=False)
48 # connections/channels
49 connection = BrokerConnection(self.rabbit_host, self.rabbit_user, self.rabbit_password)
50 logging.info("Connection to RabbitMQ server successful")
51 channel = connection.channel()
52 # produce
53 self.producer = Producer(channel, exchange=service_exchange, routing_key='notifications.info')
rdudyala996d70b2016-10-13 17:40:55 +000054 self.publish = connection.ensure(self.producer, self.producer.publish, errback=self.errback, max_retries=3)
55
rdudyalab086cf32016-08-11 00:07:45 -040056
57 def start_udp(self):
58 address_family = socket.AF_INET
59 if netutils.is_valid_ipv6(self.udp_address):
60 address_family = socket.AF_INET6
61 udp = socket.socket(address_family, socket.SOCK_DGRAM)
62 udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
63 udp.bind((self.udp_address,
64 self.udp_port))
65
66 self.setup_rabbit_mq_channel()
67 self.udp_run = True
68 while self.udp_run:
69 # NOTE(jd) Arbitrary limit of 64K because that ought to be
70 # enough for anybody.
71 data, source = udp.recvfrom(64 * units.Ki)
72 try:
73 sample = msgpack.loads(data, encoding='utf-8')
74 except Exception:
75 logging.warning("UDP: Cannot decode data sent by %s", source)
76 else:
77 try:
78 if sample.has_key("event_type"):
rdudyala996d70b2016-10-13 17:40:55 +000079 #logging.debug("recevied event :%s",sample)
80 logging.debug("recevied event :%s",sample['event_type'])
81 #self.producer.publish(sample)
82 self.publish(sample)
rdudyalab086cf32016-08-11 00:07:45 -040083 else:
rdudyala996d70b2016-10-13 17:40:55 +000084 #logging.debug("recevied Sample :%s",sample)
85 logging.debug("recevied Sample :%s",sample['counter_name'])
rdudyalab086cf32016-08-11 00:07:45 -040086 msg = self.convert_sample_to_event_data(sample)
rdudyala996d70b2016-10-13 17:40:55 +000087 #self.producer.publish(msg)
88 self.publish(msg)
rdudyalab086cf32016-08-11 00:07:45 -040089 except Exception:
90 logging.exception("UDP: Unable to publish msg")
91
92
93def main():
94 try:
95 udpservice=UdpService()
96 udpservice.printconfig()
97 udpservice.start_udp()
98
99 except Exception as e:
100 logging.exception("* Error in starting udpagent:%s",e.__str__())
101
102
103
104if __name__ == "__main__":
105 main()