blob: d41c1d05baffffb4a8786b765f9cb3222f7ed2a7 [file] [log] [blame]
#
# Copyright 2018 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from task import Task
from datetime import datetime
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, TimeoutError, failure
from voltha.extensions.omci.omci_defs import ReasonCodes
from voltha.extensions.omci.omci_frame import OmciFrame, OmciGet
class IntervalDataTaskFailure(Exception):
pass
class IntervalDataTask(Task):
"""
OpenOMCI Performance Interval Get Request
"""
task_priority = Task.DEFAULT_PRIORITY
name = "Interval Data Task"
max_payload = 29
def __init__(self, omci_agent, device_id, class_id, entity_id,
max_get_response_payload=max_payload,
parent_class_id=None,
parent_entity_id=None,
upstream=None):
"""
Class initialization
:param omci_agent: (OmciAdapterAgent) OMCI Adapter agent
:param device_id: (str) ONU Device ID
:param class_id: (int) ME Class ID
:param entity_id: (int) ME entity ID
:param max_get_response_payload: (int) Maximum number of octets in a
single GET response frame
"""
super(IntervalDataTask, self).__init__(IntervalDataTask.name,
omci_agent,
device_id,
priority=IntervalDataTask.task_priority,
exclusive=False)
self._local_deferred = None
self._class_id = class_id
self._entity_id = entity_id
self._parent_class_id = parent_class_id
self._parent_entity_id = parent_entity_id
self._upstream = upstream
me_map = self.omci_agent.get_device(self.device_id).me_map
if self._class_id not in me_map:
msg = "The requested ME Class () does not exist in the ONU's ME Map".format(self._class_id)
self.log.warn('unknown-pm-me', msg=msg)
raise IntervalDataTaskFailure(msg)
self._entity = me_map[self._class_id]
self._counter_attributes = self.get_counter_attributes_names_and_size()
self._max_payload = max_get_response_payload
def cancel_deferred(self):
super(IntervalDataTask, self).cancel_deferred()
d, self._local_deferred = self._local_deferred, None
try:
if d is not None and not d.called:
d.cancel()
except:
pass
def start(self):
"""
Start the tasks
"""
super(IntervalDataTask, self).start()
self._local_deferred = reactor.callLater(0, self.perform_get_interval)
def stop(self):
"""
Shutdown the tasks
"""
self.log.debug('stopping')
self.cancel_deferred()
super(IntervalDataTask, self).stop()
def get_counter_attributes_names_and_size(self):
"""
Get all of the counter attributes names and the amount of storage they take
:return: (dict) Attribute name -> length
"""
return {name: self._entity.attributes[attr_index].field.sz
for name, attr_index in self._entity.attribute_name_to_index_map.items()
if self._entity.attributes[attr_index].is_counter}
@inlineCallbacks
def perform_get_interval(self):
"""
Sync the time
"""
self.log.info('perform-get-interval', class_id=self._class_id,
entity_id=self._entity_id)
device = self.omci_agent.get_device(self.device_id)
attr_names = self._counter_attributes.keys()
final_results = {
'class_id': self._class_id,
'entity_id': self._entity_id,
'me_name': self._entity.__name__, # Mostly for debugging...
'interval_utc_time': None,
'parent_class_id': self._parent_class_id,
'parent_entity_id': self._parent_entity_id,
'upstream': self._upstream
# Counters added here as they are retrieved
}
last_end_time = None
while len(attr_names) > 0:
# Get as many attributes that will fit. Always include the 1 octet
# Interval End Time Attribute and 2 octets for the Entity ID
remaining_payload = self._max_payload - 3
attributes = list()
for name in attr_names:
if self._counter_attributes[name] > remaining_payload:
break
attributes.append(name)
remaining_payload -= self._counter_attributes[name]
attr_names = attr_names[len(attributes):]
attributes.append('interval_end_time')
frame = OmciFrame(
transaction_id=None,
message_type=OmciGet.message_id,
omci_message=OmciGet(
entity_class=self._class_id,
entity_id=self._entity_id,
attributes_mask=self._entity.mask_for(*attributes)
)
)
self.log.debug('interval-get-request', class_id=self._class_id,
entity_id=self._entity_id)
try:
self.strobe_watchdog()
results = yield device.omci_cc.send(frame)
omci_msg = results.fields['omci_message'].fields
status = omci_msg['success_code']
end_time = omci_msg['data'].get('interval_end_time')
self.log.debug('interval-get-results', class_id=self._class_id,
entity_id=self._entity_id, status=status,
end_time=end_time)
if status != ReasonCodes.Success:
raise IntervalDataTaskFailure('Unexpected Response Status: {}, Class ID: {}'.
format(status, self._class_id))
if last_end_time is None:
last_end_time = end_time
elif end_time != last_end_time:
msg = 'Interval End Time Changed during retrieval from {} to {}'\
.format(last_end_time, end_time)
self.log.info('interval-roll-over', msg=msg, class_id=self._class_id)
raise IntervalDataTaskFailure(msg)
final_results['interval_utc_time'] = datetime.utcnow()
for attribute in attributes:
final_results[attribute] = omci_msg['data'].get(attribute)
except TimeoutError as e:
self.log.warn('interval-get-timeout', e=e, class_id=self._class_id,
entity_id=self._entity_id, attributes=attributes)
self.deferred.errback(failure.Failure(e))
except Exception as e:
self.log.exception('interval-get-failure', e=e, class_id=self._class_id)
self.deferred.errback(failure.Failure(e))
# Successful if here
self.deferred.callback(final_results)