blob: 22604ef341c765368f5485d81bc86fbe5290939a [file] [log] [blame]
#
# Copyright 2015 Cisco Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import kafka
from oslo_utils import netutils
from six.moves.urllib import parse as urlparse
import logging as LOG
class KafkaBrokerPublisher():
def __init__(self, parsed_url):
self.kafka_client = None
self.kafka_server = None
self.kafka_consumer = None
self.host, self.port = netutils.parse_host_port(
parsed_url.netloc, default_port=9092)
self.local_queue = []
params = urlparse.parse_qs(parsed_url.query)
self.topic = params.get('topic', ['ceilometer'])[-1]
self.policy = params.get('policy', ['default'])[-1]
self.max_queue_length = int(params.get(
'max_queue_length', [1024])[-1])
self.max_retry = int(params.get('max_retry', [100])[-1])
if self.policy in ['default', 'drop', 'queue']:
LOG.info(('Publishing policy set to %s') % self.policy)
else:
LOG.warn(('Publishing policy is unknown (%s) force to default')
% self.policy)
self.policy = 'default'
try:
self._get_client()
self._get_server()
except Exception as e:
LOG.exception("Failed to connect to Kafka service: %s", e)
def _get_client(self):
if not self.kafka_client:
self.kafka_client = kafka.KafkaClient(
"%s:%s" % (self.host, self.port))
self.kafka_producer = kafka.SimpleProducer(self.kafka_client)
def _get_server(self):
if not self.kafka_server:
self.kafka_server = kafka.KafkaClient(
"%s:%s" % (self.host, self.port))
self.kafka_consumer = kafka.KafkaConsumer(self.topic,bootstrap_servers = ["%s:%s" % (self.host, self.port)])
def _send(self, data):
#for d in data:
try:
self.kafka_producer.send_messages(
self.topic, json.dumps(data))
except Exception as e:
LOG.exception(("Failed to send sample data: %s"), e)
raise