Matteo Scandolo | eb0d11c | 2017-08-08 13:05:26 -0700 | [diff] [blame] | 1 | |
| 2 | # Copyright 2017-present Open Networking Foundation |
| 3 | # |
| 4 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | # you may not use this file except in compliance with the License. |
| 6 | # You may obtain a copy of the License at |
| 7 | # |
| 8 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | # |
| 10 | # Unless required by applicable law or agreed to in writing, software |
| 11 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | # See the License for the specific language governing permissions and |
| 14 | # limitations under the License. |
| 15 | |
| 16 | |
rdudyala | b086cf3 | 2016-08-11 00:07:45 -0400 | [diff] [blame] | 17 | import logging |
| 18 | import logging.handlers |
| 19 | import logging.config |
| 20 | import ConfigParser |
| 21 | import socket |
| 22 | import msgpack |
| 23 | from kombu.connection import BrokerConnection |
| 24 | from kombu.messaging import Exchange, Queue, Consumer, Producer |
| 25 | import six |
| 26 | import uuid |
| 27 | import datetime |
| 28 | from oslo_utils import netutils |
| 29 | from oslo_utils import timeutils |
| 30 | from oslo_utils import units |
| 31 | |
| 32 | |
| 33 | |
| 34 | #logging.config.fileConfig('udpagent.conf', disable_existing_loggers=False) |
class UdpService(object):
    """Relay msgpack-encoded UDP datagrams to a RabbitMQ topic exchange.

    Settings are read from the ``[udpservice]`` section of ``udpagent.conf``
    in the current working directory. The same file is also handed to
    ``logging.config.fileConfig`` to configure logging.
    """

    def __init__(self):
        """Load the UDP/RabbitMQ settings and configure logging.

        Raises whatever ``ConfigParser`` raises for a missing section or
        option, and ``ValueError`` if ``udp_port`` is not an integer.
        """
        config = ConfigParser.ConfigParser()
        config.read('udpagent.conf')
        self.udp_address = config.get('udpservice', 'udp_address')
        self.udp_port = int(config.get('udpservice', 'udp_port'))
        self.rabbit_user = config.get('udpservice', 'rabbit_userid')
        self.rabbit_password = config.get('udpservice', 'rabbit_password')
        self.rabbit_host = config.get('udpservice', 'rabbit_hosts')
        self.acord_control_exchange = config.get('udpservice', 'acord_control_exchange')
        logging.config.fileConfig('udpagent.conf', disable_existing_loggers=False)

    def printconfig(self):
        """Log the loaded configuration at DEBUG level.

        The RabbitMQ password is masked so the credential never ends up in
        log files.
        """
        logging.debug("udp_address:%s", self.udp_address)
        logging.debug("udp_port:%s", self.udp_port)
        logging.debug("rabbit_user:%s", self.rabbit_user)
        logging.debug("rabbit_password:%s", "********")
        logging.debug("rabbit_hosts:%s", self.rabbit_host)
        logging.debug("cord_control_exchange:%s", self.acord_control_exchange)

    def convert_sample_to_event_data(self, msg):
        """Wrap ``msg`` in an ``infra`` event envelope and return it.

        The envelope carries a fresh UUID as ``message_id`` and the current
        local time (ISO 8601, no timezone) as ``timestamp``; ``msg`` is
        stored unchanged under ``payload``.
        """
        event_data = {
            'event_type': 'infra',
            'message_id': six.text_type(uuid.uuid4()),
            'publisher_id': 'cpe_publisher_id',
            'timestamp': datetime.datetime.now().isoformat(),
            'priority': 'INFO',
            'payload': msg,
        }
        return event_data

    def errback(self, exc, interval):
        """Log a failed publish attempt and the delay before the next retry.

        Passed as ``errback`` to ``connection.ensure`` in
        ``setup_rabbit_mq_channel``.
        """
        logging.error('Error: %r', exc, exc_info=1)
        logging.info('Retry in %s seconds.', interval)

    def setup_rabbit_mq_channel(self):
        """Create the producer and the retrying ``self.publish`` callable.

        Sets ``self.producer`` (bound to the configured topic exchange with
        routing key ``notifications.info``) and ``self.publish``, which is
        ``producer.publish`` wrapped by ``connection.ensure`` with
        ``max_retries=3``.
        """
        service_exchange = Exchange(self.acord_control_exchange, "topic", durable=False)
        # connections/channels
        connection = BrokerConnection(self.rabbit_host, self.rabbit_user, self.rabbit_password)
        channel = connection.channel()
        # Report success only after the channel has actually been obtained.
        logging.info("Connection to RabbitMQ server successful")
        # produce
        self.producer = Producer(channel, exchange=service_exchange, routing_key='notifications.info')
        self.publish = connection.ensure(self.producer, self.producer.publish,
                                         errback=self.errback, max_retries=3)

    def _handle_datagram(self, data, source):
        """Decode one datagram and publish it; never raises.

        Decoded messages containing an ``event_type`` key are published
        as-is. Anything else is treated as a sample and wrapped with
        ``convert_sample_to_event_data`` first. Decode and publish failures
        are logged and the datagram is dropped.
        """
        try:
            # NOTE(review): the ``encoding`` keyword is not accepted by newer
            # msgpack releases (``raw=False`` is the replacement) - confirm
            # the pinned msgpack version before upgrading.
            sample = msgpack.loads(data, encoding='utf-8')
        except Exception:
            logging.warning("UDP: Cannot decode data sent by %s", source)
            return

        try:
            if "event_type" in sample:
                logging.debug("recevied event :%s", sample['event_type'])
                self.publish(sample)
            else:
                # A sample without 'counter_name' raises KeyError here and is
                # dropped by the handler below (unchanged from the original).
                logging.debug("recevied Sample :%s", sample['counter_name'])
                msg = self.convert_sample_to_event_data(sample)
                self.publish(msg)
        except Exception:
            logging.exception("UDP: Unable to publish msg")

    def start_udp(self):
        """Bind the UDP socket and relay datagrams until ``udp_run`` is false.

        Uses an IPv6 socket when ``udp_address`` is a valid IPv6 address and
        IPv4 otherwise. Blocks in the receive loop; the socket is always
        closed when the loop exits or an error propagates.
        """
        address_family = socket.AF_INET
        if netutils.is_valid_ipv6(self.udp_address):
            address_family = socket.AF_INET6
        udp = socket.socket(address_family, socket.SOCK_DGRAM)
        try:
            udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            udp.bind((self.udp_address, self.udp_port))

            self.setup_rabbit_mq_channel()
            self.udp_run = True
            while self.udp_run:
                # NOTE(jd) Arbitrary limit of 64K because that ought to be
                # enough for anybody.
                data, source = udp.recvfrom(64 * units.Ki)
                self._handle_datagram(data, source)
        finally:
            udp.close()
| 107 | |
| 108 | |
def main():
    """Build the UDP service, log its configuration and run it.

    Any exception raised while constructing or running the service is
    logged with its traceback and swallowed, so this function returns
    normally instead of propagating the error.
    """
    try:
        service = UdpService()
        service.printconfig()
        service.start_udp()
    except Exception as err:
        logging.exception("* Error in starting udpagent:%s", str(err))
| 117 | |
| 118 | |
| 119 | |
# Start the agent only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()