blob: d251dddb6b6a596d1d1798229adf20176ee2c805 [file] [log] [blame]
Matteo Scandoloeb0d11c2017-08-08 13:05:26 -07001
2# Copyright 2017-present Open Networking Foundation
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16
rdudyalab086cf32016-08-11 00:07:45 -040017import logging
18import logging.handlers
19import logging.config
20import ConfigParser
21import socket
22import msgpack
23from kombu.connection import BrokerConnection
24from kombu.messaging import Exchange, Queue, Consumer, Producer
25import six
26import uuid
27import datetime
28from oslo_utils import netutils
29from oslo_utils import timeutils
30from oslo_utils import units
31
32
33
34#logging.config.fileConfig('udpagent.conf', disable_existing_loggers=False)
class UdpService():
    """Relay msgpack-encoded UDP datagrams to a RabbitMQ topic exchange.

    All settings come from the ``[udpservice]`` section of ``udpagent.conf``
    (resolved relative to the current working directory); the same file is
    also used to configure the ``logging`` module.
    """

    def __init__(self):
        """Read the agent settings and configure logging from udpagent.conf."""
        config = ConfigParser.ConfigParser()
        config.read('udpagent.conf')
        self.udp_address = config.get('udpservice', 'udp_address')
        self.udp_port = int(config.get('udpservice', 'udp_port'))
        self.rabbit_user = config.get('udpservice', 'rabbit_userid')
        self.rabbit_password = config.get('udpservice', 'rabbit_password')
        self.rabbit_host = config.get('udpservice', 'rabbit_hosts')
        self.acord_control_exchange = config.get(
            'udpservice', 'acord_control_exchange')
        logging.config.fileConfig(
            'udpagent.conf', disable_existing_loggers=False)

    def printconfig(self):
        """Log the loaded settings at DEBUG level (password masked)."""
        logging.debug("udp_address:%s", self.udp_address)
        logging.debug("udp_port:%s", self.udp_port)
        logging.debug("rabbit_user:%s", self.rabbit_user)
        # The broker credential must never reach the log in clear text.
        logging.debug("rabbit_password:%s",
                      "********" if self.rabbit_password else "")
        logging.debug("rabbit_hosts:%s", self.rabbit_host)
        logging.debug("cord_control_exchange:%s", self.acord_control_exchange)

    def convert_sample_to_event_data(self, msg):
        """Wrap *msg* in an 'infra' event envelope.

        Returns a dict carrying a fresh UUID as ``message_id``, the current
        local time in ISO format as ``timestamp`` and *msg* as ``payload``.
        """
        return {
            'event_type': 'infra',
            'message_id': six.text_type(uuid.uuid4()),
            'publisher_id': 'cpe_publisher_id',
            'timestamp': datetime.datetime.now().isoformat(),
            'priority': 'INFO',
            'payload': msg,
        }

    def errback(self, exc, interval):
        """Log a failed publish attempt; passed to ``connection.ensure``."""
        logging.error('Error: %r', exc, exc_info=1)
        logging.info('Retry in %s seconds.', interval)

    def setup_rabbit_mq_channel(self):
        """Create the producer and the retrying ``self.publish`` callable."""
        service_exchange = Exchange(
            self.acord_control_exchange, "topic", durable=False)
        # connections/channels
        connection = BrokerConnection(
            self.rabbit_host, self.rabbit_user, self.rabbit_password)
        channel = connection.channel()
        # Report success only once the channel request has returned; merely
        # constructing the connection object does not show the broker is up.
        logging.info("Connection to RabbitMQ server successful")
        # produce
        self.producer = Producer(
            channel, exchange=service_exchange,
            routing_key='notifications.info')
        self.publish = connection.ensure(
            self.producer, self.producer.publish,
            errback=self.errback, max_retries=3)

    def _publish_sample(self, sample):
        """Publish one decoded datagram; failures are logged, not raised.

        Payloads that already contain ``event_type`` are forwarded unchanged;
        anything else is treated as a meter sample and wrapped with
        ``convert_sample_to_event_data`` first.
        """
        try:
            # ``in`` instead of ``dict.has_key``: the latter does not exist on
            # Python 3 and its AttributeError was silently swallowed below.
            if "event_type" in sample:
                logging.debug("recevied event :%s", sample['event_type'])
                self.publish(sample)
            else:
                logging.debug("recevied Sample :%s", sample['counter_name'])
                msg = self.convert_sample_to_event_data(sample)
                self.publish(msg)
        except Exception:
            logging.exception("UDP: Unable to publish msg")

    def start_udp(self):
        """Bind the UDP socket and forward datagrams until ``udp_run`` is false.

        The socket is closed when the loop ends or an exception escapes.
        """
        address_family = socket.AF_INET
        if netutils.is_valid_ipv6(self.udp_address):
            address_family = socket.AF_INET6
        udp = socket.socket(address_family, socket.SOCK_DGRAM)
        try:
            udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            udp.bind((self.udp_address, self.udp_port))

            self.setup_rabbit_mq_channel()
            self.udp_run = True
            while self.udp_run:
                # NOTE(jd) Arbitrary limit of 64K because that ought to be
                # enough for anybody.
                data, source = udp.recvfrom(64 * units.Ki)
                try:
                    # NOTE(review): newer msgpack releases dropped the
                    # ``encoding`` keyword in favour of ``raw=False`` --
                    # confirm against the deployed msgpack version.
                    sample = msgpack.loads(data, encoding='utf-8')
                except Exception:
                    logging.warning(
                        "UDP: Cannot decode data sent by %s", source)
                    continue
                self._publish_sample(sample)
        finally:
            # Release the port even if setup or the receive loop failed.
            udp.close()
107
108
def main():
    """Create the UDP agent, log its configuration and run the receive loop.

    Any exception raised along the way is logged with its traceback instead
    of being propagated to the caller.
    """
    try:
        service = UdpService()
        service.printconfig()
        service.start_udp()
    except Exception as err:
        logging.exception("* Error in starting udpagent:%s", str(err))
117
118
119
# Script entry point: start the agent only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()