blob: 81826ad73ffab8a8c21fbcdeed0906b9ca089820 [file] [log] [blame]
rdudyalab086cf32016-08-11 00:07:45 -04001import logging
2import logging.handlers
3import logging.config
4import ConfigParser
5import socket
6import msgpack
7from kombu.connection import BrokerConnection
8from kombu.messaging import Exchange, Queue, Consumer, Producer
9import six
10import uuid
11import datetime
12from oslo_utils import netutils
13from oslo_utils import timeutils
14from oslo_utils import units
15
16
17
18#logging.config.fileConfig('udpagent.conf', disable_existing_loggers=False)
19class UdpService():
20 def __init__(self):
21 config = ConfigParser.ConfigParser()
22 config.read('udpagent.conf')
23 self.udp_address = config.get('udpservice','udp_address')
24 self.udp_port = int(config.get('udpservice','udp_port'))
25 self.rabbit_user = config.get('udpservice','rabbit_userid')
26 self.rabbit_password = config.get('udpservice','rabbit_password')
27 self.rabbit_host = config.get('udpservice','rabbit_hosts')
28 self.acord_control_exchange = config.get('udpservice','acord_control_exchange')
29 logging.config.fileConfig('udpagent.conf', disable_existing_loggers=False)
30 def printconfig(self):
31 logging.debug("udp_address:%s",self.udp_address)
32 logging.debug("udp_port:%s",self.udp_port)
33 logging.debug("rabbit_user:%s",self.rabbit_user)
34 logging.debug("rabbit_password:%s",self.rabbit_password)
35 logging.debug("rabbit_hosts:%s",self.rabbit_host)
36 logging.debug("cord_control_exchange:%s",self.acord_control_exchange)
37
38 def convert_sample_to_event_data(self,msg):
39 event_data = {'event_type': 'infra','message_id':six.text_type(uuid.uuid4()),'publisher_id': 'cpe_publisher_id','timestamp':datetime.datetime.now().isoformat(),'priority':'INFO','payload':msg}
40 return event_data
41
42 def setup_rabbit_mq_channel(self):
43 service_exchange = Exchange(self.acord_control_exchange, "topic", durable=False)
44 # connections/channels
45 connection = BrokerConnection(self.rabbit_host, self.rabbit_user, self.rabbit_password)
46 logging.info("Connection to RabbitMQ server successful")
47 channel = connection.channel()
48 # produce
49 self.producer = Producer(channel, exchange=service_exchange, routing_key='notifications.info')
50
51 def start_udp(self):
52 address_family = socket.AF_INET
53 if netutils.is_valid_ipv6(self.udp_address):
54 address_family = socket.AF_INET6
55 udp = socket.socket(address_family, socket.SOCK_DGRAM)
56 udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
57 udp.bind((self.udp_address,
58 self.udp_port))
59
60 self.setup_rabbit_mq_channel()
61 self.udp_run = True
62 while self.udp_run:
63 # NOTE(jd) Arbitrary limit of 64K because that ought to be
64 # enough for anybody.
65 data, source = udp.recvfrom(64 * units.Ki)
66 try:
67 sample = msgpack.loads(data, encoding='utf-8')
68 except Exception:
69 logging.warning("UDP: Cannot decode data sent by %s", source)
70 else:
71 try:
72 if sample.has_key("event_type"):
73 logging.debug("recevied event :%s",sample)
74 self.producer.publish(sample)
75 else:
76 logging.debug("recevied Sample :%s",sample)
77 msg = self.convert_sample_to_event_data(sample)
78 self.producer.publish(msg)
79 except Exception:
80 logging.exception("UDP: Unable to publish msg")
81
82
83def main():
84 try:
85 udpservice=UdpService()
86 udpservice.printconfig()
87 udpservice.start_udp()
88
89 except Exception as e:
90 logging.exception("* Error in starting udpagent:%s",e.__str__())
91
92
93
94if __name__ == "__main__":
95 main()