Fixes for monitoring service VM components (Kafka, pub-sub, etc.)
Change-Id: Iddde297d828b64f96628e0b3b23d509c06969648
diff --git a/xos/synchronizer/ceilometer/ceilometer_pub_sub/pub_sub.conf b/xos/synchronizer/ceilometer/ceilometer_pub_sub/pub_sub.conf
index 6998903..eae2c1c 100644
--- a/xos/synchronizer/ceilometer/ceilometer_pub_sub/pub_sub.conf
+++ b/xos/synchronizer/ceilometer/ceilometer_pub_sub/pub_sub.conf
@@ -5,7 +5,7 @@
#backupcount = 5
[WEB_SERVER]
-webserver_host = localhost
+webserver_host = 0.0.0.0
webserver_port = 4455
[CLIENT]
@@ -38,7 +38,7 @@
[handler_logfile]
class=handlers.RotatingFileHandler
-level=NOTSET
+level=INFO
args=('pub_sub.log','a',1000000,5)
formatter=logfileformatter
diff --git a/xos/synchronizer/ceilometer/ceilometer_service_custom_image/kafka-installer/install_zookeeper_kafka.sh b/xos/synchronizer/ceilometer/ceilometer_service_custom_image/kafka-installer/install_zookeeper_kafka.sh
index f21a353..be950cd 100755
--- a/xos/synchronizer/ceilometer/ceilometer_service_custom_image/kafka-installer/install_zookeeper_kafka.sh
+++ b/xos/synchronizer/ceilometer/ceilometer_service_custom_image/kafka-installer/install_zookeeper_kafka.sh
@@ -3,6 +3,7 @@
BASE_KAFKA_VERSION=0.9.0.0
KAFKA_VERSION=2.11-0.9.0.0
export CONF_BASE=$PWD
+export SERVICE_HOST=$(hostname)
echo $CONF_BASE
sudo sed -i "s/.*127.0.0.1.*/127.0.0.1 localhost $(hostname)/" /etc/hosts
@@ -123,7 +124,8 @@
if [[ ${SERVICE_HOST} ]]; then
- sudo sed -i "s/host\.name=127\.0\.0\.1/host.name=${SERVICE_HOST}/g" /etc/kafka/server.properties
+ sudo sed -i "s/host\.name=127\.0\.0\.1/host.name=0.0.0.0/g" /etc/kafka/server.properties
+ sudo sed -i "s/^#advertised\.host\.name=.*$/advertised.host.name=${SERVICE_HOST}/g" /etc/kafka/server.properties
sudo sed -i "s/zookeeper\.connect=127\.0\.0\.1:2181/zookeeper.connect=${SERVICE_HOST}:2181/g" /etc/kafka/server.properties
fi
diff --git a/xos/synchronizer/manifest b/xos/synchronizer/manifest
index d261657..a77816a 100644
--- a/xos/synchronizer/manifest
+++ b/xos/synchronizer/manifest
@@ -12,6 +12,8 @@
templates/ceilometer_proxy_server.py
templates/start_ceilometer_proxy
templates/update-keystone-endpoints.py.j2
+templates/ceilometer_pipeline.yaml.j2
+templates/kafka_broker.py
ceilometer/udp_proxy/udpagent.py
templates/udpagent.conf.j2
ceilometer/ceilometer-plugins/network/ext_services/openstack_infra/notifications.py
diff --git a/xos/synchronizer/steps/sync_ceilometerservice.yaml b/xos/synchronizer/steps/sync_ceilometerservice.yaml
index d0c101b..8ef9c96 100644
--- a/xos/synchronizer/steps/sync_ceilometerservice.yaml
+++ b/xos/synchronizer/steps/sync_ceilometerservice.yaml
@@ -4,27 +4,49 @@
gather_facts: False
connection: ssh
user: ubuntu
- sudo: yes
vars:
host_name: {{ instance_hostname }}
host_private_ip: {{ private_ip }}
host_nat_ip: {{ nat_ip }}
+ ceilometer_enable_pub_sub: {{ ceilometer_enable_pub_sub }}
tasks:
- name: Fix /etc/hosts
+ become: yes
lineinfile:
dest=/etc/hosts
regexp="127.0.0.1 localhost"
line="127.0.0.1 localhost {{ instance_hostname }}"
- name : Adding Rabbitmq user
+ become: yes
shell : rabbitmqctl add_vhost /;rabbitmqctl add_user openstack "password";rabbitmqctl set_permissions openstack ".*" ".*" ".*"
- name: Coping keystone endpoint script
+ become: yes
template: src=/opt/xos/synchronizers/monitoring/templates/update-keystone-endpoints.py.j2 dest=/usr/local/bin/update-keystone-endpoints.py owner=root group=root mode=0777
- name: Changing keystone public endpoint in mysql
+ become: yes
shell: update-keystone-endpoints.py --username root --password password --host localhost --endpoint {{ private_ip }} --endpoint-type public
- name: Changing keystone admin endpoint in mysql
+ become: yes
shell: update-keystone-endpoints.py --username root --password password --host localhost --endpoint {{ private_ip }} --endpoint-type admin
+
+ - name: Enabling/Disabling kafka publishing
+ become: yes
+ # Render the pipeline (kafka publisher toggled by ceilometer_enable_pub_sub).
+ # Root-owned config file: readable by everyone, writable only by root.
+ template: src=/opt/xos/synchronizers/monitoring/templates/ceilometer_pipeline.yaml.j2 dest=/etc/ceilometer/pipeline.yaml owner=root group=root mode=0644
+ notify:
+ - restart ceilometer-agent-notification service
+
+ - name: Apply ceilometer kafka publisher patch until monitoring service is migrated to Newton release or later
+ become: yes
+ # Overwrites the packaged ceilometer kafka publisher module. A module in
+ # dist-packages must not be world-writable or executable, hence 0644.
+ template: src=/opt/xos/synchronizers/monitoring/templates/kafka_broker.py dest=/usr/lib/python2.7/dist-packages/ceilometer/publisher/kafka_broker.py owner=root group=root mode=0644
+ notify:
+ - restart ceilometer-agent-notification service
+
+ handlers:
+ # Restarts the ceilometer notification agent. Notified by the pipeline.yaml
+ # and kafka_broker.py template tasks in this play.
+ - name: restart ceilometer-agent-notification service
+ become: yes
+ service: name=ceilometer-agent-notification state=restarted
diff --git a/xos/synchronizer/steps/sync_monitoringchannel.py b/xos/synchronizer/steps/sync_monitoringchannel.py
index d892d0d..07db238 100644
--- a/xos/synchronizer/steps/sync_monitoringchannel.py
+++ b/xos/synchronizer/steps/sync_monitoringchannel.py
@@ -157,7 +157,7 @@
jump_hostname = fields["hostname"]
#Get the tunnel detsination
- remote_host = o.private_ip
+ remote_host = o.nat_ip
remote_port = o.ceilometer_port
#FIXME: For now, trying to setup the tunnel on the local port same as the remote port
local_port = remote_port
diff --git a/xos/synchronizer/templates/ceilometer_pipeline.yaml.j2 b/xos/synchronizer/templates/ceilometer_pipeline.yaml.j2
new file mode 100644
index 0000000..6760773
--- /dev/null
+++ b/xos/synchronizer/templates/ceilometer_pipeline.yaml.j2
@@ -0,0 +1,107 @@
+---
+# Ceilometer pipeline definition. The sync_ceilometerservice playbook renders
+# this template to /etc/ceilometer/pipeline.yaml.
+# Every sink always publishes to direct://; the kafka:// publisher on
+# localhost:9092 (topic "ceilometer") is rendered only when
+# ceilometer_enable_pub_sub is truthy.
+#
+# Sources: which meters are selected and which sinks receive them.
+sources:
+ - name: meter_source
+ interval: 600
+ meters:
+ - "*"
+ sinks:
+ - meter_sink
+ - name: cpu_source
+ interval: 600
+ meters:
+ - "cpu"
+ sinks:
+ - cpu_sink
+ - cpu_delta_sink
+ - name: disk_source
+ interval: 600
+ meters:
+ - "disk.read.bytes"
+ - "disk.read.requests"
+ - "disk.write.bytes"
+ - "disk.write.requests"
+ - "disk.device.read.bytes"
+ - "disk.device.read.requests"
+ - "disk.device.write.bytes"
+ - "disk.device.write.requests"
+ sinks:
+ - disk_sink
+ - name: network_source
+ interval: 600
+ meters:
+ - "network.incoming.bytes"
+ - "network.incoming.packets"
+ - "network.outgoing.bytes"
+ - "network.outgoing.packets"
+ sinks:
+ - network_sink
+# Sinks: optional transformers followed by the publishers for each stream.
+sinks:
+ - name: meter_sink
+ transformers:
+ publishers:
+ - direct://
+{% if ceilometer_enable_pub_sub %}
+ - kafka://localhost:9092?topic=ceilometer
+{% endif %}
+ - name: cpu_sink
+ transformers:
+ - name: "rate_of_change"
+ parameters:
+ target:
+ name: "cpu_util"
+ unit: "%"
+ type: "gauge"
+ scale: "100.0 / (10**9 * (resource_metadata.cpu_number or 1))"
+ publishers:
+ - direct://
+{% if ceilometer_enable_pub_sub %}
+ - kafka://localhost:9092?topic=ceilometer
+{% endif %}
+ - name: cpu_delta_sink
+ transformers:
+ - name: "delta"
+ parameters:
+ target:
+ name: "cpu.delta"
+ growth_only: True
+ publishers:
+ - direct://
+{% if ceilometer_enable_pub_sub %}
+ - kafka://localhost:9092?topic=ceilometer
+{% endif %}
+ - name: disk_sink
+ transformers:
+ - name: "rate_of_change"
+ parameters:
+ source:
+ map_from:
+ name: "(disk\\.device|disk)\\.(read|write)\\.(bytes|requests)"
+ unit: "(B|request)"
+ target:
+ map_to:
+ name: "\\1.\\2.\\3.rate"
+ unit: "\\1/s"
+ type: "gauge"
+ publishers:
+ - direct://
+{% if ceilometer_enable_pub_sub %}
+ - kafka://localhost:9092?topic=ceilometer
+{% endif %}
+ - name: network_sink
+ transformers:
+ - name: "rate_of_change"
+ parameters:
+ source:
+ map_from:
+ name: "network\\.(incoming|outgoing)\\.(bytes|packets)"
+ unit: "(B|packet)"
+ target:
+ map_to:
+ name: "network.\\1.\\2.rate"
+ unit: "\\1/s"
+ type: "gauge"
+ publishers:
+ - direct://
+{% if ceilometer_enable_pub_sub %}
+ - kafka://localhost:9092?topic=ceilometer
+{% endif %}
diff --git a/xos/synchronizer/templates/kafka_broker.py b/xos/synchronizer/templates/kafka_broker.py
new file mode 100644
index 0000000..1687025
--- /dev/null
+++ b/xos/synchronizer/templates/kafka_broker.py
@@ -0,0 +1,101 @@
+#
+# Copyright 2015 Cisco Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import kafka
+from oslo_log import log
+from oslo_serialization import jsonutils
+from oslo_utils import netutils
+from six.moves.urllib import parse as urlparse
+
+from ceilometer.i18n import _LE
+from ceilometer.publisher import messaging
+
+LOG = log.getLogger(__name__)
+
+
+class KafkaBrokerPublisher(messaging.MessagingPublisher):
+ """Publish metering data to a Kafka broker.
+
+ The ip address and port number of the kafka broker should be configured in
+ the ceilometer pipeline configuration file. If an ip address is not
+ specified, this kafka publisher will not publish any meters.
+
+ To enable this publisher, add the following section to the
+ /etc/ceilometer/pipeline.yaml file or simply add it to an existing
+ pipeline::
+
+ meter:
+ - name: meter_kafka
+ interval: 600
+ counters:
+ - "*"
+ transformers:
+ sinks:
+ - kafka_sink
+ sinks:
+ - name: kafka_sink
+ transformers:
+ publishers:
+ - kafka://[kafka_broker_ip]:[kafka_broker_port]?topic=[topic]
+
+ The topic name and the broker's port are both optional. If the topic
+ parameter is missing, this publisher publishes metering data under the
+ topic 'ceilometer'. If the port number is not specified, 9092 is used as
+ the broker's port.
+ This publisher has transmit options such as queue, drop, and retry. These
+ options are specified using the policy field of the URL query. When the
+ queue option is selected, the local queue length can be set using the
+ max_queue_length field as well. When a transfer fails under the retry
+ option, the data is resent as many times as specified in the max_retry
+ field. If max_retry is not specified, the default number of retries is 100.
+ """
+
+ def __init__(self, parsed_url):
+ """Read broker host, port, topic and max_retry from the publisher URL.
+
+ :param parsed_url: parsed publisher URL; its ``netloc`` supplies
+ host[:port] and its ``query`` may carry ``topic`` and ``max_retry``.
+ """
+ super(KafkaBrokerPublisher, self).__init__(parsed_url)
+ options = urlparse.parse_qs(parsed_url.query)
+
+ # The producer is created lazily by _ensure_connection().
+ self._producer = None
+ self._host, self._port = netutils.parse_host_port(
+ parsed_url.netloc, default_port=9092)
+ # parse_qs yields a list per key; the last occurrence is used.
+ self._topic = options.get('topic', ['ceilometer'])[-1]
+ self.max_retry = int(options.get('max_retry', [100])[-1])
+
+ def _ensure_connection(self):
+ """Create the KafkaProducer on first use; do nothing if one exists.
+
+ :raises messaging.DeliveryFailure: if the producer cannot be created.
+ """
+ if self._producer:
+ return
+
+ try:
+ self._producer = kafka.KafkaProducer(
+ bootstrap_servers=["%s:%s" % (self._host, self._port)])
+ except kafka.errors.KafkaError as e:
+ LOG.exception(_LE("Failed to connect to Kafka service: %s"), e)
+ raise messaging.DeliveryFailure('Kafka Client is not available, '
+ 'please restart Kafka client')
+ # NOTE(review): this handler repeats the KafkaError one verbatim; if
+ # KafkaError derives from Exception (presumably it does), a single
+ # handler would be enough -- confirm before collapsing.
+ except Exception as e:
+ LOG.exception(_LE("Failed to connect to Kafka service: %s"), e)
+ raise messaging.DeliveryFailure('Kafka Client is not available, '
+ 'please restart Kafka client')
+
+ def _send(self, context, event_type, data):
+ """Publish each item of ``data`` to the topic as a JSON string.
+
+ ``context`` and ``event_type`` are not used here. Any exception raised
+ while sending is handed to messaging.raise_delivery_failure().
+ """
+ self._ensure_connection()
+ # TODO(sileht): don't split the payload into multiple network
+ # message ... but how to do that without breaking consuming
+ # application...
+ try:
+ for d in data:
+ # NOTE(review): kafka-python documents send() as asynchronous, so
+ # broker-side delivery errors may not be raised here -- confirm.
+ self._producer.send(self._topic, jsonutils.dumps(d))
+ LOG.info("Kafka publish")
+ except Exception as e:
+ messaging.raise_delivery_failure(e)