blob: 539c1b352aaf590373a479cf2ee5e5723f0ba847 [file] [log] [blame]
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# (C) Copyright Broadcom Corporation 2016
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
#
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from broadviewpublisherbase import BroadViewPublisherBase
from kombu.connection import BrokerConnection
from kombu.messaging import Exchange, Queue, Consumer, Producer
import six
import uuid
import datetime
from broadview_collector.serializers.bst_to_ceilometer import BSTToCeilometer
from broadview_collector.serializers.pt_to_ceilometer import PTToCeilometer
import json
import ConfigParser
import sys
try:
from oslo_log import log
except:
import logging as log
LOG = log.getLogger(__name__)
class BroadViewPublisher(BroadViewPublisherBase):
def readConfig(self):
try:
bvcfg = ConfigParser.ConfigParser()
bvcfg.read("/etc/broadviewcollector.conf")
self.rabbit_user = bvcfg.get("ceilometer", "rabbit_user")
self.rabbit_password = bvcfg.get("ceilometer", "rabbit_password")
self.rabbit_host = bvcfg.get("ceilometer", "rabbit_host")
self.rabbit_exchange = bvcfg.get("ceilometer", "rabbit_exchange")
except:
LOG.info("BroadViewPublisher: unable to read configuration")
def errback(self, exc, interval):
LOG.error('Error: %r', exc, exc_info=1)
LOG.info('Retry in %s seconds.', interval)
def setup_rabbit_mq_channel(self):
ceilometer_exchange = Exchange(self.rabbit_exchange, "topic", durable=False)
# connections/channels
connection = BrokerConnection(self.rabbit_host, self.rabbit_user, self.rabbit_password)
LOG.info("BroadViewPublisher: Connection to RabbitMQ server successful")
channel = connection.channel()
# produce
self._producer = Producer(channel, exchange=ceilometer_exchange, routing_key='notifications.info')
self._publish = connection.ensure(self._producer, self._producer.publish, errback=self.errback, max_retries=3)
def __init__(self):
self.rabbit_user = 'openstack'
self.rabbit_password = 'password'
self.rabbit_host = '1.2.3.4'
self.rabbit_exchange = 'broadview_service'
self._producer = None
self._publish = None
self.readConfig()
LOG.info("BroadViewPublisher: Ceilometer Publisher Initialized")
def publish(self, host, data):
code = 500
# get a producer if needed
if not self._producer:
self.setup_rabbit_mq_channel()
if self._producer:
code = 200
if self.isBST(data):
success, sdata = BSTToCeilometer().serialize(host, data)
elif self.isPT(data):
self._topic = "broadview-pt"
success, sdata = PTToCeilometer().serialize(host, data)
else:
success = False
if success:
sdata = json.loads(sdata)
if success:
for x in sdata:
try:
#self._producer.publish(x)
self._publish(x)
LOG.debug("BroadViewPublisher: Sent data to ceilometer exchange")
except Exception as e:
LOG.info('exception: {}'.format(e))
LOG.info('unable to send to ceilometer exchange {}: {}'.format(self.rabbit_exchange, sys.exc_info()[0]))
else:
code = 500
return code
def __repr__(self):
return "BroadView Ceilometer Publisher {} {}".format(self.rabbit_host, self.rabbit_exchange)