Adding CORD specific ceilometer changes to monitoring repository
- ceilometer custom notification plugins for ONOS, vSG, vOLT and Infra layers
- ceilometer publish/subscribe module
- ceilometer dynamic pipeline config module
- ceilometer UDP proxy
- ceilometer Custom Image(ceilometer -v2 -v3 versions,kafka_installer,startup scripts)
Change-Id: Ie2ab8ce89cdadbd1fb4dc54ee15e46f8cc8c4c18
diff --git a/xos/synchronizer/ceilometer/ceilometer-plugins/network/ext_services/__init__.py b/xos/synchronizer/ceilometer/ceilometer-plugins/network/ext_services/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/xos/synchronizer/ceilometer/ceilometer-plugins/network/ext_services/__init__.py
diff --git a/xos/synchronizer/ceilometer/ceilometer-plugins/network/ext_services/openstack_infra/__init__.py b/xos/synchronizer/ceilometer/ceilometer-plugins/network/ext_services/openstack_infra/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/xos/synchronizer/ceilometer/ceilometer-plugins/network/ext_services/openstack_infra/__init__.py
diff --git a/xos/synchronizer/ceilometer/ceilometer-plugins/network/ext_services/openstack_infra/notifications.py b/xos/synchronizer/ceilometer/ceilometer-plugins/network/ext_services/openstack_infra/notifications.py
new file mode 100644
index 0000000..807672d
--- /dev/null
+++ b/xos/synchronizer/ceilometer/ceilometer-plugins/network/ext_services/openstack_infra/notifications.py
@@ -0,0 +1,67 @@
+#
+# Copyright 2012 New Dream Network, LLC (DreamHost)
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""Handler for producing network counter messages from Neutron notification
+ events.
+
+"""
+
+import oslo_messaging
+from oslo_config import cfg
+
+from ceilometer.agent import plugin_base
+from oslo_log import log
+from ceilometer import sample
+
+OPTS = [
+ cfg.StrOpt('openstack_infra_service_control_exchange',
+ default='openstack_infra',
+ help="Exchange name for INFRA notifications."),
+]
+
+cfg.CONF.register_opts(OPTS)
+
+LOG = log.getLogger(__name__)
+
+
+class OPENSTACK_INFRANotificationBase(plugin_base.NotificationBase):
+
+ resource_name = None
+
+ def get_targets(self,conf):
+ """Return a sequence of oslo.messaging.Target
+ This sequence is defining the exchange and topics to be connected for
+ this plugin.
+ """
+ LOG.info("get_targets for OPENSTACK INFRA Notification Listener")
+ return [oslo_messaging.Target(topic=topic,
+ exchange=conf.openstack_infra_service_control_exchange)
+ for topic in self.get_notification_topics(conf)]
+
+class OPENSTACK_INFRANotification(OPENSTACK_INFRANotificationBase):
+
+ resource_name = None
+ event_types = ['infra$']
+
+ def process_notification(self, message):
+ LOG.info('Received OPENSTACK INFRA notification: resource_id =%(resource_id)s' % {'resource_id': message['payload']['resource_id']})
+ yield sample.Sample.from_notification(
+ name=message['payload']['counter_name'],
+ type=message['payload']['counter_type'],
+ unit=message['payload']['counter_unit'],
+ volume=message['payload']['counter_volume'],
+ user_id=message['payload']['user_id'],
+ project_id=message['payload']['project_id'],
+ resource_id=message['payload']['resource_id'],
+ message=message)
diff --git a/xos/synchronizer/ceilometer/ceilometer-plugins/network/ext_services/vcpe/__init__.py b/xos/synchronizer/ceilometer/ceilometer-plugins/network/ext_services/vcpe/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/xos/synchronizer/ceilometer/ceilometer-plugins/network/ext_services/vcpe/__init__.py
diff --git a/xos/synchronizer/ceilometer/ceilometer-plugins/network/ext_services/vcpe/notifications.py b/xos/synchronizer/ceilometer/ceilometer-plugins/network/ext_services/vcpe/notifications.py
new file mode 100644
index 0000000..06a2eb7
--- /dev/null
+++ b/xos/synchronizer/ceilometer/ceilometer-plugins/network/ext_services/vcpe/notifications.py
@@ -0,0 +1,272 @@
+#
+# Copyright 2012 New Dream Network, LLC (DreamHost)
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""Handler for producing network counter messages from Neutron notification
+ events.
+
+"""
+
+import oslo_messaging
+from oslo_config import cfg
+
+from ceilometer.agent import plugin_base
+from oslo_log import log
+from ceilometer import sample
+
+OPTS = [
+ cfg.StrOpt('vsgservice_control_exchange',
+ default='vcpeservice',
+ help="Exchange name for VCPE notifications."),
+]
+
+cfg.CONF.register_opts(OPTS)
+
+LOG = log.getLogger(__name__)
+
+
+class VCPENotificationBase(plugin_base.NotificationBase):
+
+ resource_name = None
+
+ def get_targets(self,conf):
+ """Return a sequence of oslo.messaging.Target
+
+ This sequence is defining the exchange and topics to be connected for
+ this plugin.
+ """
+ LOG.info("SRIKANTH: get_targets for VCPE Notification Listener")
+ return [oslo_messaging.Target(topic=topic,
+ exchange=conf.vsgservice_control_exchange)
+ for topic in self.get_notification_topics(conf)]
+
+class VCPENotification(VCPENotificationBase):
+
+ resource_name = None
+ event_types = ['vcpe$']
+
+ def process_notification(self, message):
+ LOG.info('SRIKANTH: Received VCPE notification: vcpe_id=%(vcpe_id)s' % {'vcpe_id': message['payload']['vcpe_id']})
+ yield sample.Sample.from_notification(
+ name='vsg',
+ type=sample.TYPE_GAUGE,
+ unit='vsg',
+ volume=1,
+ user_id=message['payload']['user_id'],
+ project_id=message['payload']['tenant_id'],
+ resource_id=message['payload']['vcpe_id'],
+ message=message)
+
+class VCPEComputeStatistics(VCPENotificationBase):
+
+ resource_name = None
+ event_types = ['vcpe.compute.stats']
+
+ def process_notification(self, message):
+ LOG.info('SRIKANTH: Received VCPE event vcpe.compute.stats')
+ if message['payload']:
+ if 'cpu_util' in message['payload']:
+ yield sample.Sample.from_notification(
+ name='cpu_util',
+ type=sample.TYPE_GAUGE,
+ unit='%',
+ volume=float(message['payload']['cpu_util']),
+ user_id=message['payload']['user_id'],
+ project_id=message['payload']['tenant_id'],
+ resource_id=message['payload']['vcpe_id'],
+ message=message)
+ if 'memory' in message['payload']:
+ yield sample.Sample.from_notification(
+ name='memory',
+ type=sample.TYPE_GAUGE,
+ unit='MB',
+ volume=float(message['payload']['memory']),
+ user_id=message['payload']['user_id'],
+ project_id=message['payload']['tenant_id'],
+ resource_id=message['payload']['vcpe_id'],
+ message=message)
+ if 'memory_usage' in message['payload']:
+ yield sample.Sample.from_notification(
+ name='memory.usage',
+ type=sample.TYPE_GAUGE,
+ unit='MB',
+ volume=float(message['payload']['memory_usage']),
+ user_id=message['payload']['user_id'],
+ project_id=message['payload']['tenant_id'],
+ resource_id=message['payload']['vcpe_id'],
+ message=message)
+ if 'network_stats' in message['payload']:
+ for intf in message['payload']['network_stats']:
+ resource_id = message['payload']['vcpe_id'] + '-' + intf['intf']
+ if 'rx_bytes' in intf:
+ yield sample.Sample.from_notification(
+ name='network.incoming.bytes',
+ type=sample.TYPE_CUMULATIVE,
+ unit='B',
+ volume=float(intf['rx_bytes']),
+ user_id=message['payload']['user_id'],
+ project_id=message['payload']['tenant_id'],
+ resource_id=resource_id,
+ message=message)
+ if 'tx_bytes' in intf:
+ yield sample.Sample.from_notification(
+ name='network.outgoing.bytes',
+ type=sample.TYPE_CUMULATIVE,
+ unit='B',
+ volume=float(intf['tx_bytes']),
+ user_id=message['payload']['user_id'],
+ project_id=message['payload']['tenant_id'],
+ resource_id=resource_id,
+ message=message)
+ if 'rx_packets' in intf:
+ yield sample.Sample.from_notification(
+ name='network.incoming.packets',
+ type=sample.TYPE_CUMULATIVE,
+ unit='packet',
+ volume=float(intf['rx_packets']),
+ user_id=message['payload']['user_id'],
+ project_id=message['payload']['tenant_id'],
+ resource_id=resource_id,
+ message=message)
+ if 'tx_packets' in intf:
+ yield sample.Sample.from_notification(
+ name='network.outgoing.packets',
+ type=sample.TYPE_CUMULATIVE,
+ unit='packet',
+ volume=float(intf['tx_packets']),
+ user_id=message['payload']['user_id'],
+ project_id=message['payload']['tenant_id'],
+ resource_id=resource_id,
+ message=message)
+
+class VCPEDNSCacheSize(VCPENotificationBase):
+
+ resource_name = None
+ event_types = ['vcpe.dns.cache.size']
+
+ def process_notification(self, message):
+ LOG.info('SRIKANTH: Received VCPE cache.size notification')
+ yield sample.Sample.from_notification(
+ name='vsg.dns.cache.size',
+ type=sample.TYPE_GAUGE,
+ unit='entries',
+ volume=float(message['payload']['cache_size']),
+ user_id=message['payload']['user_id'],
+ project_id=message['payload']['tenant_id'],
+ resource_id=message['payload']['vcpe_id'],
+ message=message)
+
+class VCPEDNSTotalInsertedEntries(VCPENotificationBase):
+
+ resource_name = None
+ event_types = ['vcpe.dns.total_instered_entries']
+
+ def process_notification(self, message):
+ LOG.info('SRIKANTH: Received VCPE total_instered_entries notification')
+ yield sample.Sample.from_notification(
+ name='vsg.dns.total_instered_entries',
+ type=sample.TYPE_CUMULATIVE,
+ unit='entries',
+ volume=float(message['payload']['total_instered_entries']),
+ user_id=message['payload']['user_id'],
+ project_id=message['payload']['tenant_id'],
+ resource_id=message['payload']['vcpe_id'],
+ message=message)
+
+class VCPEDNSReplacedUnexpiredEntries(VCPENotificationBase):
+
+ resource_name = None
+ event_types = ['vcpe.dns.replaced_unexpired_entries']
+
+ def process_notification(self, message):
+ LOG.info('SRIKANTH: Received VCPE replaced_unexpired_entries notification')
+ yield sample.Sample.from_notification(
+ name='vsg.dns.replaced_unexpired_entries',
+ type=sample.TYPE_CUMULATIVE,
+ unit='entries',
+ volume=float(message['payload']['replaced_unexpired_entries']),
+ user_id=message['payload']['user_id'],
+ project_id=message['payload']['tenant_id'],
+ resource_id=message['payload']['vcpe_id'],
+ message=message)
+
+class VCPEDNSQueriesForwarded(VCPENotificationBase):
+
+ resource_name = None
+ event_types = ['vcpe.dns.queries_forwarded']
+
+ def process_notification(self, message):
+ LOG.info('SRIKANTH: Received VCPE queries_forwarded notification')
+ yield sample.Sample.from_notification(
+ name='vsg.dns.queries_forwarded',
+ type=sample.TYPE_CUMULATIVE,
+ unit='queries',
+ volume=float(message['payload']['queries_forwarded']),
+ user_id=message['payload']['user_id'],
+ project_id=message['payload']['tenant_id'],
+ resource_id=message['payload']['vcpe_id'],
+ message=message)
+
+class VCPEDNSQueriesAnsweredLocally(VCPENotificationBase):
+
+ resource_name = None
+ event_types = ['vcpe.dns.queries_answered_locally']
+
+ def process_notification(self, message):
+ LOG.info('SRIKANTH: Received VCPE queries_answered_locally notification')
+ yield sample.Sample.from_notification(
+ name='vsg.dns.queries_answered_locally',
+ type=sample.TYPE_CUMULATIVE,
+ unit='queries',
+ volume=float(message['payload']['queries_answered_locally']),
+ user_id=message['payload']['user_id'],
+ project_id=message['payload']['tenant_id'],
+ resource_id=message['payload']['vcpe_id'],
+ message=message)
+
+class VCPEDNSServerQueriesSent(VCPENotificationBase):
+
+ resource_name = None
+ event_types = ['vcpe.dns.server.queries_sent']
+
+ def process_notification(self, message):
+ LOG.info('SRIKANTH: Received VCPE server.queries_sent notification')
+ resource_id = message['payload']['vcpe_id'] + '-' + message['payload']['upstream_server']
+ yield sample.Sample.from_notification(
+ name='vsg.dns.server.queries_sent',
+ type=sample.TYPE_CUMULATIVE,
+ unit='queries',
+ volume=float(message['payload']['queries_sent']),
+ user_id=message['payload']['user_id'],
+ project_id=message['payload']['tenant_id'],
+ resource_id=resource_id,
+ message=message)
+
+class VCPEDNSServerQueriesFailed(VCPENotificationBase):
+
+ resource_name = None
+ event_types = ['vcpe.dns.server.queries_failed']
+
+ def process_notification(self, message):
+ LOG.info('SRIKANTH: Received VCPE server.queries_failed notification')
+ resource_id = message['payload']['vcpe_id'] + '-' + message['payload']['upstream_server']
+ yield sample.Sample.from_notification(
+ name='vsg.dns.server.queries_failed',
+ type=sample.TYPE_CUMULATIVE,
+ unit='queries',
+ volume=float(message['payload']['queries_failed']),
+ user_id=message['payload']['user_id'],
+ project_id=message['payload']['tenant_id'],
+ resource_id=resource_id,
+ message=message)
+
diff --git a/xos/synchronizer/ceilometer/ceilometer-plugins/network/ext_services/volt/__init__.py b/xos/synchronizer/ceilometer/ceilometer-plugins/network/ext_services/volt/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/xos/synchronizer/ceilometer/ceilometer-plugins/network/ext_services/volt/__init__.py
diff --git a/xos/synchronizer/ceilometer/ceilometer-plugins/network/ext_services/volt/notifications.py b/xos/synchronizer/ceilometer/ceilometer-plugins/network/ext_services/volt/notifications.py
new file mode 100644
index 0000000..a1f3173
--- /dev/null
+++ b/xos/synchronizer/ceilometer/ceilometer-plugins/network/ext_services/volt/notifications.py
@@ -0,0 +1,86 @@
+#
+# Copyright 2012 New Dream Network, LLC (DreamHost)
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""Handler for producing network counter messages from Neutron notification
+ events.
+
+"""
+
+import oslo_messaging
+from oslo_config import cfg
+
+from ceilometer.agent import plugin_base
+from oslo_log import log
+from ceilometer import sample
+
+OPTS = [
+ cfg.StrOpt('voltservice_control_exchange',
+ default='voltlistener',
+ help="Exchange name for VOLT notifications."),
+]
+
+cfg.CONF.register_opts(OPTS)
+
+LOG = log.getLogger(__name__)
+
+
+class VOLTNotificationBase(plugin_base.NotificationBase):
+
+ resource_name = None
+
+ def get_targets(self,conf):
+ """Return a sequence of oslo.messaging.Target
+
+ This sequence is defining the exchange and topics to be connected for
+ this plugin.
+ """
+ LOG.info("SRIKANTH: get_targets for VOLT Notification Listener")
+ return [oslo_messaging.Target(topic=topic,
+ exchange=conf.voltservice_control_exchange)
+ for topic in self.get_notification_topics(conf)]
+
+class VOLTDeviceNotification(VOLTNotificationBase):
+ resource_name = 'volt.device'
+ event_types = ['volt.device','volt.device.disconnect']
+
+ def process_notification(self, message):
+ LOG.info('SRIKANTH: Received VOLT notification')
+ yield sample.Sample.from_notification(
+ name=message['event_type'],
+ type=sample.TYPE_GAUGE,
+ unit='olt',
+ volume=1,
+ user_id=message['payload']['user_id'],
+ project_id=message['payload']['project_id'],
+ resource_id=message['payload']['id'],
+ message=message)
+
+class VOLTDeviceSubscriberNotification(VOLTNotificationBase):
+ resource_name = 'volt.device.subscriber'
+ event_types = ['volt.device.subscriber','volt.device.subscriber.unregister']
+
+ def process_notification(self, message):
+ LOG.info('SRIKANTH: Received VOLT notification')
+ resource_id = message['payload']['id'] + '-' + message['payload']['subscriber_id']
+ yield sample.Sample.from_notification(
+ name=message['event_type'],
+ type=sample.TYPE_GAUGE,
+ unit='subscriber',
+ volume=1,
+ user_id=message['payload']['user_id'],
+ project_id=message['payload']['project_id'],
+ resource_id=resource_id,
+ message=message)
+
+
diff --git a/xos/synchronizer/ceilometer/ceilometer-plugins/network/statistics/__init__.py b/xos/synchronizer/ceilometer/ceilometer-plugins/network/statistics/__init__.py
new file mode 100644
index 0000000..9c53c36
--- /dev/null
+++ b/xos/synchronizer/ceilometer/ceilometer-plugins/network/statistics/__init__.py
@@ -0,0 +1,101 @@
+#
+# Copyright 2014 NEC Corporation. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import abc
+
+from oslo_utils import netutils
+import six
+from six.moves.urllib import parse as urlparse
+from stevedore import driver as _driver
+
+from ceilometer.agent import plugin_base
+from ceilometer import sample
+
+
+@six.add_metaclass(abc.ABCMeta)
+class _Base(plugin_base.PollsterBase):
+
+ NAMESPACE = 'network.statistics.drivers'
+ drivers = {}
+
+ @property
+ def default_discovery(self):
+ # this signifies that the pollster gets its resources from
+ # elsewhere, in this case they're manually listed in the
+ # pipeline configuration
+ return None
+
+ @abc.abstractproperty
+ def meter_name(self):
+ """Return a Meter Name."""
+
+ @abc.abstractproperty
+ def meter_type(self):
+ """Return a Meter Type."""
+
+ @abc.abstractproperty
+ def meter_unit(self):
+ """Return a Meter Unit."""
+
+ @staticmethod
+ def _parse_my_resource(resource):
+
+ parse_url = netutils.urlsplit(resource)
+
+ params = urlparse.parse_qs(parse_url.query)
+ parts = urlparse.ParseResult(parse_url.scheme,
+ parse_url.netloc,
+ parse_url.path,
+ None,
+ None,
+ None)
+ return parts, params
+
+ @staticmethod
+ def get_driver(scheme):
+ if scheme not in _Base.drivers:
+ _Base.drivers[scheme] = _driver.DriverManager(_Base.NAMESPACE,
+ scheme).driver()
+ return _Base.drivers[scheme]
+
+ def get_samples(self, manager, cache, resources):
+ resources = resources or []
+ for resource in resources:
+ parse_url, params = self._parse_my_resource(resource)
+ ext = self.get_driver(parse_url.scheme)
+ sample_data = ext.get_sample_data(self.meter_name,
+ parse_url,
+ params,
+ cache)
+
+ for data in sample_data or []:
+ if data is None:
+ continue
+ if not isinstance(data, list):
+ data = [data]
+ for (volume, resource_id,
+ resource_metadata, timestamp) in data:
+
+ yield sample.Sample(
+ name=self.meter_name,
+ type=self.meter_type,
+ unit=self.meter_unit,
+ volume=volume,
+ user_id=None,
+ project_id='default_admin_tenant',
+ resource_id=resource_id,
+ timestamp=timestamp,
+ resource_metadata=resource_metadata
+ )
diff --git a/xos/synchronizer/ceilometer/ceilometer-plugins/network/statistics/onos/__init__.py b/xos/synchronizer/ceilometer/ceilometer-plugins/network/statistics/onos/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/xos/synchronizer/ceilometer/ceilometer-plugins/network/statistics/onos/__init__.py
diff --git a/xos/synchronizer/ceilometer/ceilometer-plugins/network/statistics/onos/client.py b/xos/synchronizer/ceilometer/ceilometer-plugins/network/statistics/onos/client.py
new file mode 100644
index 0000000..46b6285
--- /dev/null
+++ b/xos/synchronizer/ceilometer/ceilometer-plugins/network/statistics/onos/client.py
@@ -0,0 +1,250 @@
+#
+# Copyright 2013 NEC Corporation. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import abc
+
+from oslo_config import cfg
+import requests
+from requests import auth
+import six
+import json
+
+from ceilometer.i18n import _
+from ceilometer.openstack.common import log
+
+
+CONF = cfg.CONF
+CONF.import_opt('http_timeout', 'ceilometer.service')
+
+
+LOG = log.getLogger(__name__)
+
+
+@six.add_metaclass(abc.ABCMeta)
+class _Base(object):
+ """Base class of ONOS REST APIs Clients."""
+
+ @abc.abstractproperty
+ def base_url(self):
+ """Returns base url for each REST API."""
+
+ def __init__(self, client):
+ self.client = client
+
+ def request(self, path, container_name):
+ return self.client.request(self.base_url + path, container_name)
+
+
+class ONOSRESTAPIFailed(Exception):
+ pass
+
+
+class ONOSRESTAPIClient(_Base):
+ """ONOS Statistics REST API Client
+
+ Base URL:
+ {endpoint}/onos/v1
+ """
+
+ base_url = '/onos/v1'
+
+ def get_devices(self, container_name):
+ """Get device informations
+
+ URL:
+ {Base URL}/devices
+ """
+ output = '{ "devices":[ \
+ { \
+ "id":"of:0000000000000001", \
+ "type":"SWITCH", \
+ "available":true, \
+ "role":"MASTER", \
+ "mfr":"Stanford University, Ericsson Research and CPqD Research", \
+ "hw":"OpenFlow 1.3 Reference Userspace Switch", \
+ "sw":"Apr 6 2015 16:10:53", \
+ "serial":"1", \
+ "chassisId":"1", \
+ "annotations":{"protocol":"OF_13","channelId":"192.168.10.50:39306"} \
+ }]}'
+
+ return self.request('/devices', container_name)
+ #LOG.info("SRIKANTH: Returning dummy ONOS devices output")
+ #outputJson = json.loads(output)
+ #return outputJson
+
+ def get_flow_statistics(self, container_name):
+ """Get flow statistics
+
+ URL:
+ {Base URL}/flows
+ """
+ output = '{"flows":[ \
+ { \
+ "deviceId":"of:0000000000000001", \
+ "id":"3377699721451393", \
+ "tableId":2, \
+ "appId":12, \
+ "groupId":0, \
+ "priority":100, \
+ "timeout":0, \
+ "isPermanent":true, \
+ "state":"PENDING_ADD", \
+ "life":0, \
+ "packets":0, \
+ "bytes":0, \
+ "lastSeen":1439355470576, \
+ "treatment":{"instructions":[],"deferred":[]}, \
+ "selector":{"criteria":[]} \
+ }]}'
+ return self.request('/flows', container_name)
+ #LOG.info("SRIKANTH: Returning dummy ONOS flow statistics output")
+ #outputJson = json.loads(output)
+ #return outputJson
+
+ def get_port_statistics(self, container_name):
+ """Get port statistics
+
+ URL:
+ {Base URL}/portstats
+ """
+ output = '{ "portstats": [ \
+ { \
+ "deviceId":"of:0000000000000001", \
+ "id":"3", \
+ "receivePackets": "182", \
+ "sentPackets": "173", \
+ "receiveBytes": "12740", \
+ "sentBytes": "12110", \
+ "receiveDrops": "740", \
+ "sentDrops": "110", \
+ "receiveErrors": "740", \
+ "sentErrors": "110", \
+ "receiveFrameError": "740", \
+ "receiveOverRunError": "740", \
+ "receiveCrcError": "740", \
+ "collisionCount": "110" \
+ }]}'
+ #TODO Add Portstats REST API to ONOS
+ return self.request('/statistics/ports', container_name)
+ #LOG.info("SRIKANTH: Returning dummy ONOS port statistics output")
+ #outputJson = json.loads(output)
+ #return outputJson
+
+ def get_table_statistics(self, container_name):
+ """Get table statistics
+
+ URL:
+ {Base URL}/table
+ """
+ output = '{ \
+ "tableStatistics": [ \
+ { \
+ "deviceId":"of:0000000000000001", \
+ "id":"4", \
+ "activeCount": "11", \
+ "lookupCount": "816", \
+ "matchedCount": "220", \
+ "maximumEntries": "1000" \
+ } \
+ ] \
+ }'
+ #TODO Add table statistics REST API to ONOS
+ return self.request('/statistics/flows/tables', container_name)
+ #LOG.info("SRIKANTH: Returning dummy ONOS table statistics output")
+ #outputJson = json.loads(output)
+ #return outputJson
+
+class Client(object):
+
+ def __init__(self, endpoint, params):
+ self.rest_client = ONOSRESTAPIClient(self)
+
+ self._endpoint = endpoint
+
+ self._req_params = self._get_req_params(params)
+
+ @staticmethod
+ def _get_req_params(params):
+ req_params = {
+ 'headers': {
+ 'Accept': 'application/json'
+ },
+ 'timeout': CONF.http_timeout,
+ }
+
+ auth_way = params.get('auth')
+ if auth_way in ['basic', 'digest']:
+ user = params.get('user')
+ password = params.get('password')
+
+ if auth_way == 'basic':
+ auth_class = auth.HTTPBasicAuth
+ else:
+ auth_class = auth.HTTPDigestAuth
+
+ req_params['auth'] = auth_class(user, password)
+ return req_params
+
+ def _log_req(self, url):
+
+ curl_command = ['REQ: curl -i -X GET ', '"%s" ' % (url)]
+
+ if 'auth' in self._req_params:
+ auth_class = self._req_params['auth']
+ if isinstance(auth_class, auth.HTTPBasicAuth):
+ curl_command.append('--basic ')
+ else:
+ curl_command.append('--digest ')
+
+ curl_command.append('--user "%s":"%s" ' % (auth_class.username,
+ auth_class.password))
+
+ for name, value in six.iteritems(self._req_params['headers']):
+ curl_command.append('-H "%s: %s" ' % (name, value))
+
+ LOG.debug(''.join(curl_command))
+
+ @staticmethod
+ def _log_res(resp):
+
+ dump = ['RES: \n', 'HTTP %.1f %s %s\n' % (resp.raw.version,
+ resp.status_code,
+ resp.reason)]
+ dump.extend('%s: %s\n' % (k, v)
+ for k, v in six.iteritems(resp.headers))
+ dump.append('\n')
+ if resp.content:
+ dump.extend([resp.content, '\n'])
+
+ LOG.debug(''.join(dump))
+
+ def _http_request(self, url):
+ if CONF.debug:
+ self._log_req(url)
+ resp = requests.get(url, **self._req_params)
+ if CONF.debug:
+ self._log_res(resp)
+ if resp.status_code / 100 != 2:
+ raise ONOSRESTAPIFailed(
+ _('ONOS API returned %(status)s %(reason)s') %
+ {'status': resp.status_code, 'reason': resp.reason})
+
+ return resp.json()
+
+ def request(self, path, container_name):
+
+ url = self._endpoint + path % {'container_name': container_name}
+ return self._http_request(url)
diff --git a/xos/synchronizer/ceilometer/ceilometer-plugins/network/statistics/onos/driver.py b/xos/synchronizer/ceilometer/ceilometer-plugins/network/statistics/onos/driver.py
new file mode 100644
index 0000000..810275f
--- /dev/null
+++ b/xos/synchronizer/ceilometer/ceilometer-plugins/network/statistics/onos/driver.py
@@ -0,0 +1,359 @@
+#
+# Copyright 2013 NEC Corporation. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+from oslo_utils import timeutils
+import six
+from six import moves
+from six.moves.urllib import parse as urlparse
+
+from ceilometer.i18n import _
+from ceilometer.network.statistics import driver
+from ceilometer.network.statistics.onos import client
+from ceilometer.openstack.common import log
+from ceilometer import utils
+
+
+LOG = log.getLogger(__name__)
+
+
+def _get_properties(properties, prefix='properties'):
+ resource_meta = {}
+ if properties is not None:
+ for k, v in six.iteritems(properties):
+ value = v['value']
+ key = prefix + '_' + k
+ if 'name' in v:
+ key += '_' + v['name']
+ resource_meta[key] = value
+ return resource_meta
+
+
+def _get_int_sample(key, statistic, resource_id, resource_meta):
+ if key not in statistic:
+ return None
+ return int(statistic[key]), resource_id, resource_meta
+
+
+class ONOSDriver(driver.Driver):
+ """Driver of network info collector from ONOS.
+
+ This driver uses resources in "pipeline.yaml".
+ Resource requires below conditions:
+
+ * resource is url
+ * scheme is "onos"
+
+ This driver can be configured via query parameters.
+ Supported parameters:
+
+ * scheme:
+ The scheme of request url to ONOS REST API endpoint.
+ (default http)
+ * auth:
+ Auth strategy of http.
+ This parameter can be set basic and digest.(default None)
+ * user:
+ This is username that is used by auth.(default None)
+ * password:
+ This is password that is used by auth.(default None)
+ * container_name:
+ Name of container of ONOS.(default "default")
+ This parameter allows multi vaues.
+
+ e.g.::
+
+ onos://127.0.0.1:8181/onos/v1?auth=basic&user=admin&password=admin&scheme=http
+
+ In this case, the driver send request to below URLs:
+
+ http://127.0.0.1:8181/onos/v1/flows
+ """
+ @staticmethod
+ def _prepare_cache(endpoint, params, cache):
+
+ if 'network.statistics.onos' in cache:
+ return cache['network.statistics.onos']
+
+ data = {}
+
+ container_names = params.get('container_name', ['default'])
+
+ onos_params = {}
+ if 'auth' in params:
+ onos_params['auth'] = params['auth'][0]
+ if 'user' in params:
+ onos_params['user'] = params['user'][0]
+ if 'password' in params:
+ onos_params['password'] = params['password'][0]
+ cs = client.Client(endpoint, onos_params)
+
+ for container_name in container_names:
+ try:
+ container_data = {}
+
+ # get flow statistics
+ container_data['flow'] = cs.rest_client.get_flow_statistics(
+ container_name)
+
+ # get port statistics
+ container_data['port'] = cs.rest_client.get_port_statistics(
+ container_name)
+
+ # get table statistics
+ container_data['table'] = cs.rest_client.get_table_statistics(
+ container_name)
+
+ # get topology
+ #container_data['topology'] = cs.topology.get_topology(
+ # container_name)
+
+ # get switch informations
+ container_data['switch'] = cs.rest_client.get_devices(
+ container_name)
+
+ container_data['timestamp'] = timeutils.isotime()
+
+ data[container_name] = container_data
+ except Exception:
+ LOG.exception(_('Request failed to connect to ONOS'
+ ' with NorthBound REST API'))
+
+ cache['network.statistics.onos'] = data
+
+ return data
+
+ def get_sample_data(self, meter_name, parse_url, params, cache):
+
+ extractor = self._get_extractor(meter_name)
+ if extractor is None:
+ # The way to getting meter is not implemented in this driver or
+ # ONOS REST API has not api to getting meter.
+ return None
+
+ iter = self._get_iter(meter_name)
+ if iter is None:
+ # The way to getting meter is not implemented in this driver or
+ # ONOS REST API has not api to getting meter.
+ return None
+
+ parts = urlparse.ParseResult(params.get('scheme', ['http'])[0],
+ parse_url.netloc,
+ parse_url.path,
+ None,
+ None,
+ None)
+ endpoint = urlparse.urlunparse(parts)
+
+ data = self._prepare_cache(endpoint, params, cache)
+
+ samples = []
+ for name, value in six.iteritems(data):
+ timestamp = value['timestamp']
+ for sample in iter(extractor, value):
+ if sample is not None:
+ # set controller name and container name
+ # to resource_metadata
+ sample[2]['controller'] = 'ONOS'
+ sample[2]['container'] = name
+
+ samples.append(sample + (timestamp, ))
+
+ return samples
+
+ def _get_iter(self, meter_name):
+ if meter_name == 'switch':
+ return self._iter_switch
+ elif meter_name.startswith('switch.flow'):
+ return self._iter_flow
+ elif meter_name.startswith('switch.table'):
+ return self._iter_table
+ elif meter_name.startswith('switch.port'):
+ return self._iter_port
+
+ def _get_extractor(self, meter_name):
+ method_name = '_' + meter_name.replace('.', '_')
+ return getattr(self, method_name, None)
+
+ @staticmethod
+ def _iter_switch(extractor, data):
+ for switch in data['switch']['devices']:
+ yield extractor(switch, switch['id'], {})
+
+ @staticmethod
+ def _switch(statistic, resource_id, resource_meta):
+
+ for key in ['mfr','hw','sw','available']:
+ resource_meta[key] = statistic[key]
+ for key in ['protocol','channelId']:
+ resource_meta[key] = statistic['annotations'][key]
+
+ return 1, resource_id, resource_meta
+
+ @staticmethod
+ def _iter_port(extractor, data):
+ for statistic in data['port']['statistics']:
+ for port_statistic in statistic['ports']:
+ resource_meta = {'port': port_statistic['port']}
+ yield extractor(port_statistic, statistic['device'],
+ resource_meta, data)
+
+ @staticmethod
+ def _switch_port(statistic, resource_id, resource_meta, data):
+ return 1, resource_id, resource_meta
+
+ @staticmethod
+ def _switch_port_receive_packets(statistic, resource_id,
+ resource_meta, data):
+ return _get_int_sample('packetsReceived', statistic, resource_id,
+ resource_meta)
+
+ @staticmethod
+ def _switch_port_transmit_packets(statistic, resource_id,
+ resource_meta, data):
+ return _get_int_sample('packetsSent', statistic, resource_id,
+ resource_meta)
+
+ @staticmethod
+ def _switch_port_receive_bytes(statistic, resource_id,
+ resource_meta, data):
+ return _get_int_sample('bytesReceived', statistic, resource_id,
+ resource_meta)
+
+ @staticmethod
+ def _switch_port_transmit_bytes(statistic, resource_id,
+ resource_meta, data):
+ return _get_int_sample('bytesSent', statistic, resource_id,
+ resource_meta)
+
+ @staticmethod
+ def _switch_port_receive_drops(statistic, resource_id,
+ resource_meta, data):
+ return _get_int_sample('packetsRxDropped', statistic, resource_id,
+ resource_meta)
+
+ @staticmethod
+ def _switch_port_transmit_drops(statistic, resource_id,
+ resource_meta, data):
+ return _get_int_sample('packetsTxDropped', statistic, resource_id,
+ resource_meta)
+
+ @staticmethod
+ def _switch_port_receive_errors(statistic, resource_id,
+ resource_meta, data):
+ return _get_int_sample('packetsRxErrors', statistic, resource_id,
+ resource_meta)
+
+ @staticmethod
+ def _switch_port_transmit_errors(statistic, resource_id,
+ resource_meta, data):
+ return _get_int_sample('packetsTxErrors', statistic, resource_id,
+ resource_meta)
+
+ @staticmethod
+ def _switch_port_receive_frame_error(statistic, resource_id,
+ resource_meta, data):
+ #return _get_int_sample('receiveFrameError', statistic, resource_id,
+ # resource_meta)
+ return 0, resource_id, resource_meta
+
+ @staticmethod
+ def _switch_port_receive_overrun_error(statistic, resource_id,
+ resource_meta, data):
+ #return _get_int_sample('receiveOverRunError', statistic, resource_id,
+ # resource_meta)
+ return 0, resource_id, resource_meta
+
+ @staticmethod
+ def _switch_port_receive_crc_error(statistic, resource_id,
+ resource_meta, data):
+ #return _get_int_sample('receiveCrcError', statistic, resource_id,
+ # resource_meta)
+ return 0, resource_id, resource_meta
+
+ @staticmethod
+ def _switch_port_collision_count(statistic, resource_id,
+ resource_meta, data):
+ #return _get_int_sample('collisionCount', statistic, resource_id,
+ # resource_meta)
+ return 0, resource_id, resource_meta
+
+ @staticmethod
+ def _iter_table(extractor, data):
+ for statistic in data['table']['statistics']:
+ for table_statistic in statistic['table']:
+ resource_meta = {'table_id': table_statistic['tableId']}
+ yield extractor(table_statistic,
+ statistic['device'],
+ resource_meta)
+
+ @staticmethod
+ def _switch_table(statistic, resource_id, resource_meta):
+ return 1, resource_id, resource_meta
+
+ @staticmethod
+ def _switch_table_active_entries(statistic, resource_id,
+ resource_meta):
+ return _get_int_sample('activeEntries', statistic, resource_id,
+ resource_meta)
+
+ @staticmethod
+ def _switch_table_lookup_packets(statistic, resource_id,
+ resource_meta):
+ return _get_int_sample('packetsLookedUp', statistic, resource_id,
+ resource_meta)
+
+ @staticmethod
+ def _switch_table_matched_packets(statistic, resource_id,
+ resource_meta):
+ return _get_int_sample('packetsMathced', statistic, resource_id,
+ resource_meta)
+
+ @staticmethod
+ def _iter_flow(extractor, data):
+ for flow_statistic in data['flow']['flows']:
+ resource_meta = {'flow_id': flow_statistic['id'],
+ 'table_id': flow_statistic['tableId'],
+ 'priority': flow_statistic['priority'],
+ 'state': flow_statistic['state']}
+ yield extractor(flow_statistic,
+ flow_statistic['deviceId'],
+ resource_meta)
+
+ @staticmethod
+ def _switch_flow(statistic, resource_id, resource_meta):
+ return 1, resource_id, resource_meta
+
+ @staticmethod
+ def _switch_flow_duration_seconds(statistic, resource_id,
+ resource_meta):
+ if 'life' not in statistic:
+ return None
+ return int(statistic['life']/1000), resource_id, resource_meta
+
+ @staticmethod
+ def _switch_flow_duration_nanoseconds(statistic, resource_id,
+ resource_meta):
+ if 'life' not in statistic:
+ return None
+ return int(statistic['life']*1000), resource_id, resource_meta
+
+ @staticmethod
+ def _switch_flow_packets(statistic, resource_id, resource_meta):
+ return _get_int_sample('packets', statistic, resource_id,
+ resource_meta)
+
+ @staticmethod
+ def _switch_flow_bytes(statistic, resource_id, resource_meta):
+ return _get_int_sample('bytes', statistic, resource_id,
+ resource_meta)