rdudyala | b086cf3 | 2016-08-11 00:07:45 -0400 | [diff] [blame] | 1 | import logging |
| 2 | import logging.handlers |
| 3 | import logging.config |
| 4 | import ConfigParser |
| 5 | import socket |
| 6 | import msgpack |
| 7 | from kombu.connection import BrokerConnection |
| 8 | from kombu.messaging import Exchange, Queue, Consumer, Producer |
| 9 | import six |
| 10 | import uuid |
| 11 | import datetime |
| 12 | from oslo_utils import netutils |
| 13 | from oslo_utils import timeutils |
| 14 | from oslo_utils import units |
| 15 | |
| 16 | |
| 17 | |
| 18 | #logging.config.fileConfig('udpagent.conf', disable_existing_loggers=False) |
class UdpService(object):
    """Relay msgpack-encoded UDP datagrams to a RabbitMQ topic exchange.

    Settings come from the ``[udpservice]`` section of ``udpagent.conf``
    (a relative path, so it is resolved against the current working
    directory).  The same file is also handed to ``logging.config`` to set
    up logging.
    """

    def __init__(self):
        """Read the service settings and configure the logging module.

        Errors raised by ``ConfigParser`` (missing section/option, a
        non-integer ``udp_port``) or by ``logging.config.fileConfig`` are
        not handled here and propagate to the caller.
        """
        config = ConfigParser.ConfigParser()
        config.read('udpagent.conf')
        self.udp_address = config.get('udpservice', 'udp_address')
        self.udp_port = int(config.get('udpservice', 'udp_port'))
        self.rabbit_user = config.get('udpservice', 'rabbit_userid')
        self.rabbit_password = config.get('udpservice', 'rabbit_password')
        self.rabbit_host = config.get('udpservice', 'rabbit_hosts')
        self.acord_control_exchange = config.get('udpservice', 'acord_control_exchange')
        logging.config.fileConfig('udpagent.conf', disable_existing_loggers=False)

    def printconfig(self):
        """Log the loaded settings at DEBUG level.

        The RabbitMQ password is replaced by a fixed mask so the credential
        never ends up in log files.
        """
        logging.debug("udp_address:%s", self.udp_address)
        logging.debug("udp_port:%s", self.udp_port)
        logging.debug("rabbit_user:%s", self.rabbit_user)
        # Never write the real secret to the log; only show that it is set.
        logging.debug("rabbit_password:%s", "********")
        logging.debug("rabbit_hosts:%s", self.rabbit_host)
        logging.debug("cord_control_exchange:%s", self.acord_control_exchange)

    def convert_sample_to_event_data(self, msg):
        """Wrap *msg* in an ``infra`` event envelope and return the dict.

        The envelope gets a fresh random ``message_id`` and a ``timestamp``
        taken from ``datetime.datetime.now()`` (naive local time); *msg* is
        stored unchanged under ``payload``.
        """
        event_data = {
            'event_type': 'infra',
            'message_id': six.text_type(uuid.uuid4()),
            'publisher_id': 'cpe_publisher_id',
            'timestamp': datetime.datetime.now().isoformat(),
            'priority': 'INFO',
            'payload': msg,
        }
        return event_data

    def errback(self, exc, interval):
        """Log a failed publish attempt and the delay before the retry.

        Passed as ``errback`` to ``connection.ensure`` in
        ``setup_rabbit_mq_channel``; *exc* is the error and *interval* the
        number of seconds until the next attempt.
        """
        logging.error('Error: %r', exc, exc_info=1)
        logging.info('Retry in %s seconds.', interval)

    def setup_rabbit_mq_channel(self):
        """Create the producer and the retrying ``self.publish`` callable.

        Sets ``self.producer`` (bound to the configured topic exchange with
        routing key ``notifications.info``) and ``self.publish``, a wrapper
        around ``self.producer.publish`` that retries up to 3 times and
        reports each failure through ``errback``.
        """
        service_exchange = Exchange(self.acord_control_exchange, "topic", durable=False)
        # connections/channels
        connection = BrokerConnection(self.rabbit_host, self.rabbit_user, self.rabbit_password)
        # kombu connects lazily: the broker is first contacted when the
        # channel is requested, so success is only reported after that call.
        channel = connection.channel()
        logging.info("Connection to RabbitMQ server successful")
        # produce
        self.producer = Producer(channel, exchange=service_exchange, routing_key='notifications.info')
        self.publish = connection.ensure(self.producer, self.producer.publish, errback=self.errback, max_retries=3)

    def start_udp(self):
        """Bind the UDP socket and forward every received datagram.

        Datagrams that already carry an ``event_type`` key are published
        as-is; anything else is treated as a sample and wrapped with
        ``convert_sample_to_event_data`` first.  Undecodable datagrams and
        publish failures are logged and skipped.  The loop runs while
        ``self.udp_run`` is true (it is only re-checked between datagrams);
        the socket is closed when the loop ends or an error propagates.
        """
        address_family = socket.AF_INET
        if netutils.is_valid_ipv6(self.udp_address):
            address_family = socket.AF_INET6
        udp = socket.socket(address_family, socket.SOCK_DGRAM)
        try:
            udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            udp.bind((self.udp_address, self.udp_port))

            self.setup_rabbit_mq_channel()
            self.udp_run = True
            while self.udp_run:
                # NOTE(jd) Arbitrary limit of 64K because that ought to be
                # enough for anybody.
                data, source = udp.recvfrom(64 * units.Ki)
                try:
                    # NOTE(review): newer msgpack releases replaced the
                    # ``encoding`` argument with ``raw=False`` - confirm
                    # against the msgpack version this agent is deployed with.
                    sample = msgpack.loads(data, encoding='utf-8')
                except Exception:
                    logging.warning("UDP: Cannot decode data sent by %s", source)
                    continue

                try:
                    # ``in`` replaces the Python-2-only ``dict.has_key``.
                    if "event_type" in sample:
                        logging.debug("recevied event :%s", sample['event_type'])
                        self.publish(sample)
                    else:
                        logging.debug("recevied Sample :%s", sample['counter_name'])
                        msg = self.convert_sample_to_event_data(sample)
                        self.publish(msg)
                except Exception:
                    logging.exception("UDP: Unable to publish msg")
        finally:
            # Release the port even when the loop is left through an error.
            udp.close()
| 91 | |
| 92 | |
def main():
    """Create the UDP relay service, log its settings and run it.

    Any ``Exception`` raised while constructing or running the service is
    logged together with its traceback and then swallowed, so this function
    returns normally in that case.
    """
    try:
        service = UdpService()
        service.printconfig()
        service.start_udp()
    except Exception as err:
        logging.exception("* Error in starting udpagent:%s", str(err))
| 101 | |
| 102 | |
| 103 | |
# Run the agent only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()