blob: 1cac7846ba912c899aa603609783c8ef9500a847 [file] [log] [blame]
import logging
import logging.handlers
import logging.config
import ConfigParser
import socket
import msgpack
from kombu.connection import BrokerConnection
from kombu.messaging import Exchange, Queue, Consumer, Producer
import six
import uuid
import datetime
from oslo_utils import netutils
from oslo_utils import timeutils
from oslo_utils import units
#logging.config.fileConfig('udpagent.conf', disable_existing_loggers=False)
class UdpService():
def __init__(self):
config = ConfigParser.ConfigParser()
config.read('udpagent.conf')
self.udp_address = config.get('udpservice','udp_address')
self.udp_port = int(config.get('udpservice','udp_port'))
self.rabbit_user = config.get('udpservice','rabbit_userid')
self.rabbit_password = config.get('udpservice','rabbit_password')
self.rabbit_host = config.get('udpservice','rabbit_hosts')
self.acord_control_exchange = config.get('udpservice','acord_control_exchange')
logging.config.fileConfig('udpagent.conf', disable_existing_loggers=False)
def printconfig(self):
logging.debug("udp_address:%s",self.udp_address)
logging.debug("udp_port:%s",self.udp_port)
logging.debug("rabbit_user:%s",self.rabbit_user)
logging.debug("rabbit_password:%s",self.rabbit_password)
logging.debug("rabbit_hosts:%s",self.rabbit_host)
logging.debug("cord_control_exchange:%s",self.acord_control_exchange)
def convert_sample_to_event_data(self,msg):
event_data = {'event_type': 'infra','message_id':six.text_type(uuid.uuid4()),'publisher_id': 'cpe_publisher_id','timestamp':datetime.datetime.now().isoformat(),'priority':'INFO','payload':msg}
return event_data
def errback(self, exc, interval):
logging.error('Error: %r', exc, exc_info=1)
logging.info('Retry in %s seconds.', interval)
def setup_rabbit_mq_channel(self):
service_exchange = Exchange(self.acord_control_exchange, "topic", durable=False)
# connections/channels
connection = BrokerConnection(self.rabbit_host, self.rabbit_user, self.rabbit_password)
logging.info("Connection to RabbitMQ server successful")
channel = connection.channel()
# produce
self.producer = Producer(channel, exchange=service_exchange, routing_key='notifications.info')
self.publish = connection.ensure(self.producer, self.producer.publish, errback=self.errback, max_retries=3)
def start_udp(self):
address_family = socket.AF_INET
if netutils.is_valid_ipv6(self.udp_address):
address_family = socket.AF_INET6
udp = socket.socket(address_family, socket.SOCK_DGRAM)
udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
udp.bind((self.udp_address,
self.udp_port))
self.setup_rabbit_mq_channel()
self.udp_run = True
while self.udp_run:
# NOTE(jd) Arbitrary limit of 64K because that ought to be
# enough for anybody.
data, source = udp.recvfrom(64 * units.Ki)
try:
sample = msgpack.loads(data, encoding='utf-8')
except Exception:
logging.warning("UDP: Cannot decode data sent by %s", source)
else:
try:
if sample.has_key("event_type"):
#logging.debug("recevied event :%s",sample)
logging.debug("recevied event :%s",sample['event_type'])
#self.producer.publish(sample)
self.publish(sample)
else:
#logging.debug("recevied Sample :%s",sample)
logging.debug("recevied Sample :%s",sample['counter_name'])
msg = self.convert_sample_to_event_data(sample)
#self.producer.publish(msg)
self.publish(msg)
except Exception:
logging.exception("UDP: Unable to publish msg")
def main():
try:
udpservice=UdpService()
udpservice.printconfig()
udpservice.start_udp()
except Exception as e:
logging.exception("* Error in starting udpagent:%s",e.__str__())
if __name__ == "__main__":
main()