blob: bac4c33f773ab5e176670f583a999fa3851e994d [file] [log] [blame]
#
# Copyright 2017 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
OMCI Managed Entity Message support base class
"""
from voltha.extensions.omci.omci import *
# abbreviations
OP = EntityOperations
AA = AttributeAccess
class MEFrame(object):
"""Base class to help simplify Frame Creation"""
def __init__(self, entity_class, entity_id, data):
assert issubclass(entity_class, EntityClass), \
"'{}' must be a subclass of MEFrame".format(entity_class)
self.check_type(entity_id, int)
if not 0 <= entity_id <= 0xFFFF:
raise ValueError('entity_id should be 0..65535')
self._class = entity_class
self._entity_id = entity_id
self.data = data
def __str__(self):
return '{}: Entity_ID: {}, Data: {}'.\
format(self.entity_class_name, self._entity_id, self.data)
@property
def entity_class(self):
"""
The Entity Class for this ME
:return: (EntityClass) Entity class
"""
return self._class
@property
def entity_class_name(self):
return self._class.__class__.__name__
@property
def entity_id(self):
"""
The Entity ID for this ME frame
:return: (int) Entity ID (0..0xFFFF)
"""
return self._entity_id
@staticmethod
def check_type(param, types):
if not isinstance(param, types):
raise TypeError("param '{}' should be a {}".format(param, types))
def _check_operation(self, operation):
allowed = self.entity_class.mandatory_operations | self.entity_class.optional_operations
assert operation in allowed, "{} not allowed for '{}'".format(str(operation).split('.')[1],
self.entity_class_name)
def _check_attributes(self, attributes, access):
for attribute in attributes:
index = self.entity_class.attribute_name_to_index_map.get(attribute)
# Bad attribute name (invalid or spelling error)?
assert index is not None, "Attribute '{}' is not valid for '{}'".format(attribute,
self.entity_class_name)
# Invalid access?
# TODO: Add read-only access to EntityClass _access set. Currently it is protected
assert access in self.entity_class.attributes[index]._access,\
"No '{} access for attribute '{}".format(str(access).split('.')[1], attribute)
@staticmethod
def _attr_to_data(attributes):
"""
Convert an object into the 'data' set or dictionary for get/set/create/delete
requests.
This method takes a 'string', 'list', or 'set' for get requests and
converts it to a 'set' of attributes.
For create/set requests a dictionary of attribute/value pairs is required
:param attributes: (basestring, list, set, dict) attributes. For gets
a string, list, set, or dict can be provided. For create/set
operations, a dictionary should be provided. For delete
the attributes may be None since they are ignored.
:return: (set, dict) set for get/deletes, dict for create/set
"""
if isinstance(attributes, basestring):
# data = [str(attributes)]
data = set()
data.add(str(attributes))
elif isinstance(attributes, list):
assert all(isinstance(attr, basestring) for attr in attributes),\
'attribute list must be strings'
data = {str(attr) for attr in attributes}
assert len(data) == len(attributes), 'Attributes were not unique'
elif isinstance(attributes, set):
assert all(isinstance(attr, basestring) for attr in attributes),\
'attribute set must be strings'
data = {str(attr) for attr in attributes}
elif isinstance(attributes, (dict, type(None))):
data = attributes
else:
raise TypeError("Unsupported attributes type '{}'".format(type(attributes)))
return data
def create(self):
"""
Create a Create request frame for this ME
:return: (OmciFrame) OMCI Frame
"""
assert hasattr(self.entity_class, 'class_id'), 'class_id required for Create actions'
assert hasattr(self, 'entity_id'), 'entity_id required for Create actions'
assert hasattr(self, 'data'), 'data required for Create actions'
data = getattr(self, 'data')
MEFrame.check_type(data, dict)
assert len(data) > 0, 'No attributes supplied'
self._check_operation(OP.Create)
self._check_attributes(data.keys(), AA.Writable)
return OmciFrame(
transaction_id=None,
message_type=OmciCreate.message_id,
omci_message=OmciCreate(
entity_class=getattr(self.entity_class, 'class_id'),
entity_id=getattr(self, 'entity_id'),
data=data
))
def delete(self):
"""
Create a Delete request frame for this ME
:return: (OmciFrame) OMCI Frame
"""
self._check_operation(OP.Delete)
return OmciFrame(
transaction_id=None,
message_type=OmciGet.message_id,
omci_message=OmciGet(
entity_class=getattr(self.entity_class, 'class_id'),
entity_id=getattr(self, 'entity_id')
))
def set(self):
"""
Create a Set request frame for this ME
:return: (OmciFrame) OMCI Frame
"""
assert hasattr(self, 'data'), 'data required for Set actions'
data = getattr(self, 'data')
MEFrame.check_type(data, dict)
assert len(data) > 0, 'No attributes supplied'
self._check_operation(OP.Set)
self._check_attributes(data.keys(), AA.Writable)
return OmciFrame(
transaction_id=None,
message_type=OmciSet.message_id,
omci_message=OmciSet(
entity_class=getattr(self.entity_class, 'class_id'),
entity_id=getattr(self, 'entity_id'),
attributes_mask=self.entity_class.mask_for(*data.keys()),
data=data
))
def get(self):
"""
Create a Get request frame for this ME
:return: (OmciFrame) OMCI Frame
"""
assert hasattr(self, 'data'), 'data required for Get actions'
data = getattr(self, 'data')
MEFrame.check_type(data, (list, set, dict))
assert len(data) > 0, 'No attributes supplied'
mask_set = data.keys() if isinstance(data, dict) else data
self._check_operation(OP.Get)
self._check_attributes(mask_set, AA.Readable)
return OmciFrame(
transaction_id=None,
message_type=OmciGet.message_id,
omci_message=OmciGet(
entity_class=getattr(self.entity_class, 'class_id'),
entity_id=getattr(self, 'entity_id'),
attributes_mask=self.entity_class.mask_for(*mask_set)
))