# blob: cfe250c8c11efe3519035befb28ff7c8ab24fab3 [file] [log] [blame]
#
# Copyright 2018 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import arrow
from task import Task
from twisted.internet.task import LoopingCall
from twisted.internet.defer import failure, inlineCallbacks, TimeoutError
from voltha.extensions.omci.omci_defs import ReasonCodes, EntityOperations
from voltha.extensions.omci.omci_me import MEFrame
from common.event_bus import EventBusClient
from voltha.protos.events_pb2 import KpiEvent2, KpiEventType, KpiEvent
from voltha.protos.events_pb2 import MetricInformation, MetricMetaData
# Short module-level aliases for the imported ReasonCodes and
# EntityOperations names.
RC = ReasonCodes
OP = EntityOperations
class TestFailure(Exception):
    """Raised when an OMCI Test request is answered with a non-zero
    (failure) success code."""
class OmciTestRequest(Task):
    """
    OpenOMCI task that issues an OMCI Test request against an ME instance.

    The request is sent for the first ANI-G entity found in the device
    configuration.  When the request is accepted (success code of 0), the
    task subscribes to the 'omci-rx:<device_id>:Test_Result' event bus topic
    and republishes every message received there as a KPI slice event.

    Failures (timeout, non-zero success code, missing ANI-G entity, ...) are
    reported through the Task deferred's errback.
    """
    task_priority = 128
    name = "ONU OMCI Test Task"

    MAX_TABLE_SIZE = 16 * 1024    # Keep get-next logic reasonable
    OPTICAL_GROUP_NAME = 'PON_Optical'
    # Expressed in 1/10ths of a second (start_collector() divides by 10),
    # so 60 * 10 is one minute.
    DEFAULT_COLLECTION_FREQUENCY = 60 * 10
    DEFAULT_FREQUENCY_KEY = 'default-collection-frequency'

    def __init__(self, omci_agent, device_id, entity_class, serial_number,
                 logical_device_id,
                 exclusive=True, allow_failure=False, **kwargs):
        """
        Class initialization

        :param omci_agent: (OmciAdapterAgent) OMCI Adapter agent
        :param device_id: (str) ONU Device ID
        :param entity_class: (EntityClass) ME Class to test
        :param serial_number: ONU serial number, reported in the metadata of
                              published test results
        :param logical_device_id: Logical device ID, reported in the metadata
                                  of published test results
        :param exclusive: (bool) True if this Task exclusively owns the
                          OMCI-CC while running. Default: True
        :param allow_failure: (bool) Stored on the task; not consulted by any
                              code in this class.
        :param kwargs: Optional settings. 'default-collection-frequency'
                       overrides DEFAULT_COLLECTION_FREQUENCY (1/10ths of a
                       second between test requests).
        """
        super(OmciTestRequest, self).__init__(OmciTestRequest.name,
                                              omci_agent,
                                              device_id,
                                              priority=OmciTestRequest.task_priority,
                                              exclusive=exclusive)
        self._device = omci_agent.get_device(device_id)
        self._entity_class = entity_class
        # Resolved from the device configuration on each test run; set here
        # so the entity_id property is safe to read before the first run.
        self._entity_id = None
        self._allow_failure = allow_failure
        self._failed_or_unknown_attributes = set()
        self._results = None
        self._local_deferred = None
        self.device_id = device_id
        self.event_bus = EventBusClient()
        self.lc = None
        # Event bus subscription handle; None until the first subscription.
        self.msg = None
        self.default_freq = \
            kwargs.get(OmciTestRequest.DEFAULT_FREQUENCY_KEY,
                       OmciTestRequest.DEFAULT_COLLECTION_FREQUENCY)
        self.serial_number = serial_number
        self.logical_device_id = logical_device_id

    def cancel_deferred(self):
        """
        Cancel the base Task deferred and any outstanding local deferred.

        :return: None
        """
        super(OmciTestRequest, self).cancel_deferred()

        d, self._local_deferred = self._local_deferred, None
        try:
            if d is not None and not d.called:
                d.cancel()
        except Exception as e:
            # Cancellation is best-effort: never let it propagate, but do not
            # swallow SystemExit/KeyboardInterrupt as a bare except would.
            self.log.debug('cancel-deferred-failed', e=e)

    @property
    def me_class(self):
        """The OMCI Managed Entity Class associated with this request"""
        return self._entity_class

    @property
    def entity_id(self):
        """
        The ME Entity ID associated with this request, or None if no test
        has been attempted yet.
        """
        return self._entity_id

    @property
    def success_code(self):
        """
        Return the OMCI success/reason code of the most recent Test
        response, or None if no response has been received yet.
        """
        if self._results is None:
            return None
        # Same field name that perform_test_omci() checks on the response
        return self._results.fields['omci_message'].fields['success_code']

    def start_collector(self, callback=None):
        """
        Start the periodic test loop if the configured frequency is > 0.

        The LoopingCall is created on the first invocation only; a callback
        supplied on a later invocation is ignored.  Calling this while the
        loop is already running is a no-op.

        :param callback: (callable) Function to call on every period.
                         Defaults to perform_test_omci.
        """
        self.log.info("starting-pm-collection", device_name=self.name)
        if callback is None:
            callback = self.perform_test_omci

        if self.lc is None:
            self.lc = LoopingCall(callback)

        # LoopingCall.start() refuses to start a loop that is already
        # running, so only start it when it is idle.
        if self.default_freq > 0 and not self.lc.running:
            # Frequency is in 1/10ths of a second.  Float division keeps
            # sub-10 values from truncating to a zero interval on Python 2.
            self.lc.start(interval=self.default_freq / 10.0)

    def submit_kpis(self, kpi_event):
        """
        Publish a KPI event on the 'kpis' event bus topic.

        Any error (including a wrong event type) is logged and suppressed.

        :param kpi_event: (KpiEvent or KpiEvent2) event to publish
        :return: None
        """
        try:
            # Explicit check instead of assert, which is stripped under -O
            if not isinstance(kpi_event, (KpiEvent, KpiEvent2)):
                raise TypeError('kpi_event must be a KpiEvent or KpiEvent2')
            self.event_bus.publish('kpis', kpi_event)

        except Exception:
            self.log.exception('failed-kpi-submission',
                               type=type(kpi_event))

    def publish_metrics(self, data, event_name, onu_device_id):
        """
        Wrap a test result in a KpiEvent2 slice event and submit it.

        :param data: (dict) test result values, keyed by field name
        :param event_name: Event name placed in the metadata context
        :param onu_device_id: ONU device id placed in the metadata
        :return: None
        """
        # One timestamp for both the metric metadata and the enclosing event
        now = arrow.utcnow().float_timestamp
        metric_data = MetricInformation(
            metadata=MetricMetaData(title=OmciTestRequest.OPTICAL_GROUP_NAME,
                                    ts=now,
                                    logical_device_id=self.logical_device_id,
                                    serial_no=self.serial_number,
                                    device_id=onu_device_id,
                                    context={
                                        'events': event_name
                                    }),
            metrics=data)
        self.log.info('Publish-Test-Result')
        kpi_event = KpiEvent2(
            type=KpiEventType.slice,
            ts=now,
            slice_data=[metric_data])

        self.submit_kpis(kpi_event)

    def process_messages(self, topic, msg):
        """
        Event bus callback: convert a received test result and publish it.

        :param topic: Topic the message arrived on.  The last two
                      ':'-separated components are taken as the ONU device
                      id and the event name, in that order.
        :param msg: (dict) Message whose 'rx-response' entry is the received
                    frame; every field of its 'omci_message' layer is
                    published as a long integer.
        :return: None
        """
        topic_parts = topic.split(':')
        event_name = topic_parts[-1]
        onu_device_id = topic_parts[-2]
        omci_fields = msg['rx-response'].fields['omci_message'].fields
        result_frame = {key: long(value)
                        for key, value in omci_fields.iteritems()}
        self.publish_metrics(result_frame, event_name, onu_device_id)

    def read_from_event_bus(self):
        """
        Subscribe to the test result topic for this device.

        The subscription is made once; later calls are no-ops so that the
        periodic test loop does not accumulate duplicate subscriptions (and
        therefore duplicate KPI publications).

        :return: None
        """
        if self.msg is not None:
            return
        topic = 'omci-rx:{}:{}'.format(self.device_id, 'Test_Result')
        self.msg = self.event_bus.subscribe(topic, self.process_messages)

    @inlineCallbacks
    def perform_test_omci(self):
        """
        Send an OMCI Test request for the first ANI-G entity of the device.

        On a success code of 0 the task subscribes to the test result topic.
        Any failure is passed to the Task deferred's errback; failures other
        than a timeout are logged first.
        """
        try:
            # Resolved inside the try block so that a missing or empty ANI-G
            # table is reported like any other failure instead of escaping
            # to the caller.
            ani_g_entities = self._device.configuration.ani_g_entities
            entity_ids = list(ani_g_entities.keys()) if ani_g_entities else []
            if not entity_ids:
                raise TestFailure('Test Failure: no ANI-G entities available')
            self._entity_id = entity_ids[0]

            self.log.info('perform-test', entity_class=self._entity_class,
                          entity_id=self._entity_id)

            frame = MEFrame(self._entity_class, self._entity_id, []).test()
            self.strobe_watchdog()
            result = yield self._device.omci_cc.send(frame)
            # Keep the response so the success_code property can report it
            self._results = result

            status = result.fields['omci_message'].fields['success_code']
            if status:
                raise TestFailure('Test Failure: {}'.format(status))
            self.read_from_event_bus()

        except TimeoutError as e:
            self.deferred.errback(failure.Failure(e))

        except Exception as e:
            self.log.exception('perform-test', e=e, class_id=self._entity_class,
                               entity_id=self._entity_id)
            self.deferred.errback(failure.Failure(e))