VOL-714: OpenOMCI Performance Monitoring Intervals State Machine implementation
Patch to remove import of OmciEvents event bus. Should not be part of this commit
Change-Id: I8784280479051ff81f940fa2edb73cde2ff2e4cb
diff --git a/voltha/extensions/omci/me_frame.py b/voltha/extensions/omci/me_frame.py
index 743ee1a..c713bc2 100644
--- a/voltha/extensions/omci/me_frame.py
+++ b/voltha/extensions/omci/me_frame.py
@@ -314,3 +314,27 @@
                 attributes_mask=self.entity_class.mask_for(*mask_set),
                 command_sequence_number=data.values()[0]
             ))
+
+    def synchronize_time(self, time=None):
+        """
+        Create a Synchronize Time request frame for this ME
+        :param time: (DateTime) Time to set to. If none, use UTC
+        :return: (OmciFrame) OMCI Frame
+        """
+        from datetime import datetime
+        self._check_operation(OP.SynchronizeTime)
+        dt = time or datetime.utcnow()
+
+        return OmciFrame(
+            transaction_id=None,
+            message_type=OmciSynchronizeTime.message_id,
+            omci_message=OmciSynchronizeTime(
+                entity_class=getattr(self.entity_class, 'class_id'),
+                entity_id=getattr(self, 'entity_id'),
+                year=dt.year,
+                month=dt.month,
+                day=dt.day,
+                hour=dt.hour,
+                minute=dt.minute,
+                second=dt.second,
+            ))
diff --git a/voltha/extensions/omci/omci_cc.py b/voltha/extensions/omci/omci_cc.py
index c873ef3..6cc05c4 100644
--- a/voltha/extensions/omci/omci_cc.py
+++ b/voltha/extensions/omci/omci_cc.py
@@ -123,7 +123,7 @@
:return: (str) Topic string
"""
assert event in OmciCCRxEvents, \
- 'Event {} is not an OMCI-CC Rx Event'.format(event.Name)
+ 'Event {} is not an OMCI-CC Rx Event'.format(event.name)
return 'omci-rx:{}:{}'.format(device_id, event.name)
diff --git a/voltha/extensions/omci/omci_entities.py b/voltha/extensions/omci/omci_entities.py
index 66c4bda..64ebab9 100644
--- a/voltha/extensions/omci/omci_entities.py
+++ b/voltha/extensions/omci/omci_entities.py
@@ -31,7 +31,7 @@
class EntityClassAttribute(object):
def __init__(self, fld, access=set(), optional=False, range_check=None,
- avc=False, deprecated=False):
+ avc=False, tca=False, counter=False, deprecated=False):
"""
Initialize an Attribute for a Managed Entity Class
@@ -40,6 +40,9 @@
:param optional: (boolean) If true, attribute is option, else mandatory
:param range_check: (callable) None, Lambda, or Function to validate value
:param avc: (boolean) If true, an AVC notification can occur for the attribute
+ :param tca: (boolean) If true, a threshold crossing alert alarm notification can occur
+ for the attribute
+ :param counter: (boolean) If true, this attribute is a PM counter
:param deprecated: (boolean) If true, this attribute is deprecated and
only 'read' operations (if-any) performed.
"""
@@ -48,6 +51,8 @@
self._optional = optional
self._range_check = range_check
self._avc = avc
+ self._tca = tca
+ self._counter = counter
self._deprecated = deprecated
@property
@@ -63,6 +68,10 @@
return self._optional
@property
+ def is_counter(self):
+ return self._counter
+
+ @property
def range_check(self):
return self._range_check
@@ -1108,6 +1117,249 @@
mandatory_operations = {OP.Set, OP.Get, OP.GetNext}
notifications = {OP.AttributeValueChange}
+
+class EthernetPMMonitoringHistoryData(EntityClass):
+ class_id = 24
+ attributes = [
+ ECA(ShortField("managed_entity_id", None), {AA.R, AA.SBC}),
+ ECA(ByteField("interval_end_time", None), {AA.R}),
+ ECA(ShortField("threshold_data_1_2_id", None), {AA.R, AA.W, AA.SBC}),
+ ECA(IntField("fcs_errors", None), {AA.R}, tca=True, counter=True),
+ ECA(IntField("excessive_collision_counter", None), {AA.R}, tca=True, counter=True),
+ ECA(IntField("late_collision_counter", None), {AA.R}, tca=True, counter=True),
+ ECA(IntField("frames_too_long", None), {AA.R}, tca=True, counter=True),
+ ECA(IntField("buffer_overflows_on_rx", None), {AA.R}, tca=True, counter=True),
+ ECA(IntField("buffer_overflows_on_tx", None), {AA.R}, tca=True, counter=True),
+ ECA(IntField("single_collision_frame_counter", None), {AA.R}, tca=True, counter=True),
+ ECA(IntField("multiple_collisions_frame_counter", None), {AA.R}, tca=True, counter=True),
+ ECA(IntField("sqe_counter", None), {AA.R}, tca=True, counter=True),
+ ECA(IntField("deferred_tx_counter", None), {AA.R}, tca=True, counter=True),
+ ECA(IntField("internal_mac_tx_error_counter", None), {AA.R}, tca=True, counter=True),
+ ECA(IntField("carrier_sense_error_counter", None), {AA.R}, tca=True, counter=True),
+ ECA(IntField("alignment_error_counter", None), {AA.R}, tca=True, counter=True),
+ ECA(IntField("internal_mac_rx_error_counter", None), {AA.R}, tca=True, counter=True)
+ ]
+ mandatory_operations = {OP.Create, OP.Delete, OP.Get, OP.Set, OP.GetCurrentData}
+ notifications = {OP.AlarmNotification}
+
+
+class FecPerformanceMonitoringHistoryData(EntityClass):
+ class_id = 312
+ attributes = [
+ ECA(ShortField("managed_entity_id", None), {AA.R, AA.SBC}),
+ ECA(ByteField("interval_end_time", None), {AA.R}),
+ ECA(ShortField("threshold_data_1_2_id", None), {AA.R, AA.W, AA.SBC}),
+ ECA(IntField("corrected_bytes", None), {AA.R}, tca=True, counter=True),
+ ECA(IntField("corrected_code_words", None), {AA.R}, tca=True, counter=True),
+ ECA(IntField("uncorrectable_code_words", None), {AA.R}, tca=True, counter=True),
+ ECA(IntField("total_code_words", None), {AA.R}, counter=True),
+ ECA(ShortField("fec_seconds", None), {AA.R}, tca=True, counter=True)
+ ]
+ mandatory_operations = {OP.Create, OP.Delete, OP.Get, OP.Set, OP.GetCurrentData}
+ notifications = {OP.AlarmNotification}
+
+
+class EthernetFrameDownstreamPerformanceMonitoringHistoryData(EntityClass):
+ class_id = 321
+ attributes = [
+ ECA(ShortField("managed_entity_id", None), {AA.R, AA.SBC}),
+ ECA(ByteField("interval_end_time", None), {AA.R}),
+ ECA(ShortField("threshold_data_1_2_id", None), {AA.R, AA.W, AA.SBC}),
+ ECA(IntField("drop_events", None), {AA.R}, tca=True, counter=True),
+ ECA(IntField("octets", None), {AA.R}, counter=True),
+ ECA(IntField("packets", None), {AA.R}, counter=True),
+ ECA(IntField("broadcast_packets", None), {AA.R}, counter=True),
+ ECA(IntField("multicast_packets", None), {AA.R}, counter=True),
+ ECA(IntField("crc_errored_packets", None), {AA.R}, tca=True, counter=True),
+ ECA(IntField("undersize_packets", None), {AA.R}, tca=True, counter=True),
+ ECA(IntField("oversize_packets", None), {AA.R}, tca=True, counter=True),
+ ECA(IntField("64_octets", None), {AA.R}, counter=True),
+ ECA(IntField("65_to_127_octets", None), {AA.R}, counter=True),
+ ECA(IntField("128_to_255_octets", None), {AA.R}, counter=True),
+ ECA(IntField("256_to_511_octets", None), {AA.R}, counter=True),
+ ECA(IntField("512_to_1023_octets", None), {AA.R}, counter=True),
+ ECA(IntField("1024_to_1518_octets", None), {AA.R}, counter=True)
+ ]
+ mandatory_operations = {OP.Create, OP.Delete, OP.Get, OP.Set, OP.GetCurrentData}
+ notifications = {OP.AlarmNotification}
+
+
+class EthernetFrameUpstreamPerformanceMonitoringHistoryData(EntityClass):
+ class_id = 322
+ attributes = [
+ ECA(ShortField("managed_entity_id", None), {AA.R, AA.SBC}),
+ ECA(ByteField("interval_end_time", None), {AA.R}),
+ ECA(ShortField("threshold_data_1_2_id", None), {AA.R, AA.W, AA.SBC}),
+ ECA(IntField("drop_events", None), {AA.R}, tca=True, counter=True),
+ ECA(IntField("octets", None), {AA.R}, counter=True),
+ ECA(IntField("packets", None), {AA.R}, counter=True),
+ ECA(IntField("broadcast_packets", None), {AA.R}, counter=True),
+ ECA(IntField("multicast_packets", None), {AA.R}, counter=True),
+ ECA(IntField("crc_errored_packets", None), {AA.R}, tca=True, counter=True),
+ ECA(IntField("undersize_packets", None), {AA.R}, tca=True, counter=True),
+ ECA(IntField("oversize_packets", None), {AA.R}, tca=True, counter=True),
+ ECA(IntField("64_octets", None), {AA.R}, counter=True),
+ ECA(IntField("65_to_127_octets", None), {AA.R}, counter=True),
+ ECA(IntField("128_to_255_octets", None), {AA.R}, counter=True),
+ ECA(IntField("256_to_511_octets", None), {AA.R}, counter=True),
+ ECA(IntField("512_to_1023_octets", None), {AA.R}, counter=True),
+ ECA(IntField("1024_to_1518_octets", None), {AA.R}, counter=True)
+ ]
+ mandatory_operations = {OP.Create, OP.Delete, OP.Get, OP.Set, OP.GetCurrentData}
+ notifications = {OP.AlarmNotification}
+
+
+class EthernetFrameExtendedPerformanceMonitoring(EntityClass):
+ class_id = 334
+
+ attributes = [
+ ECA(ShortField("managed_entity_id", None), {AA.R, AA.SBC}),
+ ECA(ByteField("interval_end_time", None), {AA.R}),
+ # 2-octet field -> Threshold data 1/2 ID
+ # 2-octet field -> Parent ME Class
+ # 2-octet field -> Parent ME Instance
+ # 2-octet field -> Accumulation disable
+ # 2-octet field -> TCA Disable
+ # 2-octet field -> Control fields bitmap
+ # 2-octet field -> TCI
+ # 2-octet field -> Reserved
+ ECA(FieldListField("control_block", None, ShortField('', 0),
+ count_from=lambda _: 8), {AA.R, AA.W, AA.SBC}),
+ ECA(IntField("drop_events", None), {AA.R}, tca=True, counter=True),
+ ECA(IntField("octets", None), {AA.R}, counter=True),
+ ECA(IntField("packets", None), {AA.R}, counter=True),
+ ECA(IntField("broadcast_packets", None), {AA.R}, counter=True),
+ ECA(IntField("multicast_packets", None), {AA.R}, counter=True),
+ ECA(IntField("crc_errored_packets", None), {AA.R}, tca=True, counter=True),
+ ECA(IntField("undersize_packets", None), {AA.R}, tca=True, counter=True),
+ ECA(IntField("oversize_packets", None), {AA.R}, tca=True, counter=True),
+ ECA(IntField("64_octets", None), {AA.R}, counter=True),
+ ECA(IntField("65_to_127_octets", None), {AA.R}, counter=True),
+ ECA(IntField("128_to_255_octets", None), {AA.R}, counter=True),
+ ECA(IntField("256_to_511_octets", None), {AA.R}, counter=True),
+ ECA(IntField("512_to_1023_octets", None), {AA.R}, counter=True),
+ ECA(IntField("1024_to_1518_octets", None), {AA.R}, counter=True)
+ ]
+ mandatory_operations = {OP.Create, OP.Delete, OP.Get, OP.Set}
+ optional_operations = {OP.GetCurrentData}
+ notifications = {OP.AlarmNotification}
+
+
+class EthernetFrameExtendedPerformanceMonitoring64Bit(EntityClass):
+ class_id = 426
+
+ attributes = [
+ ECA(ShortField("managed_entity_id", None), {AA.R, AA.SBC}),
+ ECA(ByteField("interval_end_time", None), {AA.R}),
+ # 2-octet field -> Threshold data 1/2 ID
+ # 2-octet field -> Parent ME Class
+ # 2-octet field -> Parent ME Instance
+ # 2-octet field -> Accumulation disable
+ # 2-octet field -> TCA Disable
+ # 2-octet field -> Control fields bitmap
+ # 2-octet field -> TCI
+ # 2-octet field -> Reserved
+ ECA(FieldListField("control_block", None, ShortField('', 0),
+ count_from=lambda _: 8), {AA.R, AA.W, AA.SBC}),
+ ECA(LongField("drop_events", None), {AA.R}, tca=True, counter=True),
+ ECA(LongField("octets", None), {AA.R}, counter=True),
+ ECA(LongField("packets", None), {AA.R}, counter=True),
+ ECA(LongField("broadcast_packets", None), {AA.R}, counter=True),
+ ECA(LongField("multicast_packets", None), {AA.R}, counter=True),
+ ECA(LongField("crc_errored_packets", None), {AA.R}, tca=True, counter=True),
+ ECA(LongField("undersize_packets", None), {AA.R}, tca=True, counter=True),
+ ECA(LongField("oversize_packets", None), {AA.R}, tca=True, counter=True),
+ ECA(LongField("64_octets", None), {AA.R}, counter=True),
+ ECA(LongField("65_to_127_octets", None), {AA.R}, counter=True),
+ ECA(LongField("128_to_255_octets", None), {AA.R}, counter=True),
+ ECA(LongField("256_to_511_octets", None), {AA.R}, counter=True),
+ ECA(LongField("512_to_1023_octets", None), {AA.R}, counter=True),
+ ECA(LongField("1024_to_1518_octets", None), {AA.R}, counter=True)
+ ]
+ mandatory_operations = {OP.Create, OP.Delete, OP.Get, OP.Set}
+ optional_operations = {OP.GetCurrentData}
+ notifications = {OP.AlarmNotification}
+
+
+class GemPortNetworkCtpMonitoringHistoryData(EntityClass):
+ class_id = 341
+ attributes = [
+ ECA(ShortField("managed_entity_id", None), {AA.R, AA.SBC}),
+ ECA(ByteField("interval_end_time", None), {AA.R}),
+ ECA(ShortField("threshold_data_1_2_id", None), {AA.R, AA.W, AA.SBC}),
+ ECA(IntField("transmitted_gem_frames", None), {AA.R}, counter=True),
+ ECA(IntField("received_gem_frames", None), {AA.R}, counter=True),
+ ECA(LongField("received_payload_bytes", None), {AA.R}, counter=True),
+ ECA(LongField("transmitted_payload_bytes", None), {AA.R}, counter=True),
+ ECA(IntField("encryption_key_errors", None), {AA.R}, tca=True, counter=True)
+ ]
+ mandatory_operations = {OP.Create, OP.Delete, OP.Get, OP.Set, OP.GetCurrentData}
+ notifications = {OP.AlarmNotification}
+
+
+class XgPonTcPerformanceMonitoringHistoryData(EntityClass):
+ class_id = 344
+ attributes = [
+ ECA(ShortField("managed_entity_id", None), {AA.R, AA.SBC}),
+ ECA(ByteField("interval_end_time", None), {AA.R}),
+ ECA(ShortField("threshold_data_1_2_id", None), {AA.R, AA.W, AA.SBC}),
+ ECA(IntField("psbd_hec_error_count", None), {AA.R}, tca=True, counter=True),
+ ECA(IntField("xgtc_hec_error_count", None), {AA.R}, tca=True, counter=True),
+ ECA(IntField("unknown_profile_count", None), {AA.R}, tca=True, counter=True),
+ ECA(IntField("transmitted_xgem_frames", None), {AA.R}, counter=True),
+ ECA(IntField("fragment_xgem_frames", None), {AA.R}, counter=True),
+ ECA(IntField("xgem_hec_lost_words_count", None), {AA.R}, tca=True, counter=True),
+ ECA(IntField("xgem_key_errors", None), {AA.R}, tca=True, counter=True),
+ ECA(IntField("xgem_hec_error_count", None), {AA.R}, tca=True, counter=True)
+ ]
+ mandatory_operations = {OP.Create, OP.Delete, OP.Get, OP.Set}
+ optional_operations = {OP.GetCurrentData}
+ notifications = {OP.AlarmNotification}
+
+
+class XgPonDownstreamPerformanceMonitoringHistoryData(EntityClass):
+ class_id = 345
+ attributes = [
+ ECA(ShortField("managed_entity_id", None), {AA.R, AA.SBC}),
+ ECA(ByteField("interval_end_time", None), {AA.R},),
+ ECA(ShortField("threshold_data_1_2_id", None), {AA.R, AA.W, AA.SBC}),
+ ECA(IntField("ploam_mic_error_count", None), {AA.R}, tca=True, counter=True),
+ ECA(IntField("downstream_ploam_messages_count", None), {AA.R}, counter=True),
+ ECA(IntField("profile_messages_received", None), {AA.R}, counter=True),
+ ECA(IntField("ranging_time_messages_received", None), {AA.R}, counter=True),
+ ECA(IntField("deactivate_onu_id_messages_received", None), {AA.R}, counter=True),
+ ECA(IntField("disable_serial_number_messages_received", None), {AA.R}, counter=True),
+        ECA(IntField("request_registration_messages_received", None), {AA.R}, counter=True),
+ ECA(IntField("assign_alloc_id_messages_received", None), {AA.R}, counter=True),
+ ECA(IntField("key_control_messages_received", None), {AA.R}, counter=True),
+ ECA(IntField("sleep_allow_messages_received", None), {AA.R}, counter=True),
+ ECA(IntField("baseline_omci_messages_received_count", None), {AA.R}, counter=True),
+ ECA(IntField("extended_omci_messages_received_count", None), {AA.R}, counter=True),
+ ECA(IntField("assign_onu_id_messages_received", None), {AA.R}, counter=True),
+ ECA(IntField("omci_mic_error_count", None), {AA.R}, tca=True, counter=True),
+ ]
+ mandatory_operations = {OP.Create, OP.Delete, OP.Get, OP.Set}
+ optional_operations = {OP.GetCurrentData}
+ notifications = {OP.AlarmNotification}
+
+
+class XgPonUpstreamPerformanceMonitoringHistoryData(EntityClass):
+ class_id = 346
+ attributes = [
+ ECA(ShortField("managed_entity_id", None), {AA.R, AA.SBC}),
+ ECA(ByteField("interval_end_time", None), {AA.R}),
+ ECA(ShortField("threshold_data_1_2_id", None), {AA.R, AA.W, AA.SBC}),
+ ECA(IntField("upstream_ploam_message_count", None), {AA.R}, counter=True),
+ ECA(IntField("serial_number_onu_message_count", None), {AA.R}, counter=True),
+ ECA(IntField("registration_message_count", None), {AA.R}, counter=True),
+ ECA(IntField("key_report_message_count", None), {AA.R}, counter=True),
+ ECA(IntField("acknowledge_message_count", None), {AA.R}, counter=True),
+ ECA(IntField("sleep_request_message_count", None), {AA.R}, counter=True),
+ ]
+ mandatory_operations = {OP.Create, OP.Delete, OP.Get, OP.Set}
+ optional_operations = {OP.GetCurrentData}
+
+
# entity class lookup table from entity_class values
entity_classes_name_map = dict(
inspect.getmembers(sys.modules[__name__],
diff --git a/voltha/extensions/omci/omci_frame.py b/voltha/extensions/omci/omci_frame.py
index 1f06e5f..98faf42 100644
--- a/voltha/extensions/omci/omci_frame.py
+++ b/voltha/extensions/omci/omci_frame.py
@@ -25,7 +25,9 @@
OmciMibUploadNext, OmciMibUploadResponse, OmciMibUpload, \
OmciGetAllAlarmsNextResponse, OmciAttributeValueChange, \
OmciTestResult, OmciAlarmNotification, \
- OmciReboot, OmciRebootResponse, OmciGetNext, OmciGetNextResponse
+ OmciReboot, OmciRebootResponse, OmciGetNext, OmciGetNextResponse, \
+ OmciSynchronizeTime, OmciSynchronizeTimeResponse, OmciGetCurrentData, \
+ OmciGetCurrentDataResponse
from voltha.extensions.omci.omci_messages import OmciCreateResponse
@@ -121,6 +123,20 @@
PacketField("omci_message", None, OmciGetNextResponse), align=36),
lambda pkt: pkt.message_type == OmciGetNextResponse.message_id),
+ ConditionalField(FixedLenField(
+ PacketField("omci_message", None, OmciSynchronizeTime), align=36),
+ lambda pkt: pkt.message_type == OmciSynchronizeTime.message_id),
+ ConditionalField(FixedLenField(
+ PacketField("omci_message", None, OmciSynchronizeTimeResponse), align=36),
+ lambda pkt: pkt.message_type == OmciSynchronizeTimeResponse.message_id),
+
+ ConditionalField(FixedLenField(
+ PacketField("omci_message", None, OmciGetCurrentData), align=36),
+ lambda pkt: pkt.message_type == OmciGetCurrentData.message_id),
+ ConditionalField(FixedLenField(
+ PacketField("omci_message", None, OmciGetCurrentDataResponse), align=36),
+ lambda pkt: pkt.message_type == OmciGetCurrentDataResponse.message_id),
+
# TODO add entries for remaining OMCI message types
IntField("omci_trailer", 0x00000028)
diff --git a/voltha/extensions/omci/omci_me.py b/voltha/extensions/omci/omci_me.py
index 6af02fb..cf55a44 100644
--- a/voltha/extensions/omci/omci_me.py
+++ b/voltha/extensions/omci/omci_me.py
@@ -731,3 +731,186 @@
raise NotImplemented('Unknown request')
super(OmciFrame, self).__init__(Omci, 0, data)
+
+
+class EthernetPMMonitoringHistoryDataFrame(MEFrame):
+ """
+ This managed entity collects some of the performance monitoring data for a physical
+ Ethernet interface
+ """
+ def __init__(self, entity_id, attributes):
+ """
+ :param entity_id: (int) This attribute uniquely identifies each instance of
+ this managed entity. Through an identical ID, this
+ managed entity is implicitly linked to an instance
+ of the physical path termination point Ethernet UNI
+
+ :param attributes: (basestring, list, set, dict) attributes. For gets
+ a string, list, or set can be provided. For set
+ operations, a dictionary should be provided, for
+ deletes None may be specified.
+ """
+ super(EthernetPMMonitoringHistoryDataFrame, self).__init__(
+ EthernetPMMonitoringHistoryData,
+ entity_id,
+ MEFrame._attr_to_data(attributes))
+
+
+class FecPerformanceMonitoringHistoryDataFrame(MEFrame):
+ """
+ This managed entity collects performance monitoring data associated with PON
+ downstream FEC counters.
+ """
+ def __init__(self, entity_id, attributes):
+ """
+ :param entity_id: (int) This attribute uniquely identifies each instance of
+ this managed entity. Through an identical ID, this
+ managed entity is implicitly linked to an instance of
+ the ANI-G
+
+ :param attributes: (basestring, list, set, dict) attributes. For gets
+ a string, list, or set can be provided. For set
+ operations, a dictionary should be provided, for
+ deletes None may be specified.
+ """
+ super(FecPerformanceMonitoringHistoryDataFrame, self).__init__(
+ FecPerformanceMonitoringHistoryData,
+ entity_id,
+ MEFrame._attr_to_data(attributes))
+
+
+class EthernetFrameDownstreamPerformanceMonitoringHistoryDataFrame(MEFrame):
+ """
+ This managed entity collects performance monitoring data associated with downstream
+ Ethernet frame delivery. It is based on the Etherstats group of [IETF RFC 2819].
+ """
+ def __init__(self, entity_id, attributes):
+ """
+ :param entity_id: (int) This attribute uniquely identifies each instance of
+ this managed entity. Through an identical ID, this
+ managed entity is implicitly linked to an instance of
+ a MAC bridge port configuration data
+
+ :param attributes: (basestring, list, set, dict) attributes. For gets
+ a string, list, or set can be provided. For set
+ operations, a dictionary should be provided, for
+ deletes None may be specified.
+ """
+ super(EthernetFrameDownstreamPerformanceMonitoringHistoryDataFrame, self).__init__(
+ EthernetFrameDownstreamPerformanceMonitoringHistoryData,
+ entity_id,
+ MEFrame._attr_to_data(attributes))
+
+
+class EthernetFrameUpstreamPerformanceMonitoringHistoryDataFrame(MEFrame):
+ """
+ This managed entity collects performance monitoring data associated with upstream
+ Ethernet frame delivery. It is based on the Etherstats group of [IETF RFC 2819].
+ """
+ def __init__(self, entity_id, attributes):
+ """
+ :param entity_id: (int) This attribute uniquely identifies each instance of
+ this managed entity. Through an identical ID, this
+ managed entity is implicitly linked to an instance of
+ a MAC bridge port configuration data
+
+ :param attributes: (basestring, list, set, dict) attributes. For gets
+ a string, list, or set can be provided. For set
+ operations, a dictionary should be provided, for
+ deletes None may be specified.
+ """
+ super(EthernetFrameUpstreamPerformanceMonitoringHistoryDataFrame, self).__init__(
+ EthernetFrameUpstreamPerformanceMonitoringHistoryData,
+ entity_id,
+ MEFrame._attr_to_data(attributes))
+
+
+class GemPortNetworkCtpMonitoringHistoryDataFrame(MEFrame):
+ """
+ This managed entity collects GEM frame performance monitoring data associated
+ with a GEM port network CTP
+ """
+ def __init__(self, entity_id, attributes):
+ """
+ :param entity_id: (int) This attribute uniquely identifies each instance of
+ this managed entity. Through an identical ID, this
+ managed entity is implicitly linked to an instance
+ of the GEM port network CTP.
+
+ :param attributes: (basestring, list, set, dict) attributes. For gets
+ a string, list, or set can be provided. For set
+ operations, a dictionary should be provided, for
+ deletes None may be specified.
+ """
+ super(GemPortNetworkCtpMonitoringHistoryDataFrame, self).__init__(
+ GemPortNetworkCtpMonitoringHistoryData,
+ entity_id,
+ MEFrame._attr_to_data(attributes))
+
+
+class XgPonTcPerformanceMonitoringHistoryDataFrame(MEFrame):
+ """
+ This managed entity collects performance monitoring data associated with
+ the XG-PON transmission convergence layer, as defined in [ITU-T G.987.3]
+ """
+ def __init__(self, entity_id, attributes):
+ """
+ :param entity_id: (int) This attribute uniquely identifies each instance of
+ this managed entity. Through an identical ID, this
+ managed entity is implicitly linked to an instance of
+ the ANI-G.
+
+ :param attributes: (basestring, list, set, dict) attributes. For gets
+ a string, list, or set can be provided. For set
+ operations, a dictionary should be provided, for
+ deletes None may be specified.
+ """
+ super(XgPonTcPerformanceMonitoringHistoryDataFrame, self).__init__(
+ XgPonTcPerformanceMonitoringHistoryData, entity_id,
+ MEFrame._attr_to_data(attributes))
+
+
+class XgPonDownstreamPerformanceMonitoringHistoryDataFrame(MEFrame):
+ """
+ This managed entity collects performance monitoring data associated with
+    the XG-PON transmission convergence layer, as defined in [ITU-T G.987.3]
+ """
+ def __init__(self, entity_id, attributes):
+ """
+ :param entity_id: (int) This attribute uniquely identifies each instance of
+ this managed entity. Through an identical ID, this
+ managed entity is implicitly linked to an instance of
+ the ANI-G.
+
+ :param attributes: (basestring, list, set, dict) attributes. For gets
+ a string, list, or set can be provided. For set
+ operations, a dictionary should be provided, for
+ deletes None may be specified.
+ """
+ super(XgPonDownstreamPerformanceMonitoringHistoryDataFrame, self).__init__(
+ XgPonDownstreamPerformanceMonitoringHistoryData,
+ entity_id,
+ MEFrame._attr_to_data(attributes))
+
+
+class XgPonUpstreamPerformanceMonitoringHistoryDataFrame(MEFrame):
+ """
+ This managed entity collects performance monitoring data associated with
+ the XG-PON transmission convergence layer, as defined in [ITU-T G.987.3]
+ """
+ def __init__(self, entity_id, attributes):
+ """
+        :param entity_id: (int) This attribute uniquely identifies each instance of
+ this managed entity. Through an identical ID, this
+ managed entity is implicitly linked to an instance of
+ the ANI-G.
+
+ :param attributes: (basestring, list, set, dict) attributes. For gets
+ a string, list, or set can be provided. For set
+ operations, a dictionary should be provided, for
+ deletes None may be specified.
+ """
+ super(XgPonUpstreamPerformanceMonitoringHistoryDataFrame, self).__init__(
+ XgPonUpstreamPerformanceMonitoringHistoryData,
+ entity_id,
+ MEFrame._attr_to_data(attributes))
diff --git a/voltha/extensions/omci/omci_messages.py b/voltha/extensions/omci/omci_messages.py
index 57d2be9..2e11136 100644
--- a/voltha/extensions/omci/omci_messages.py
+++ b/voltha/extensions/omci/omci_messages.py
@@ -375,3 +375,54 @@
lambda pkt: pkt.success_code == 0)
]
+
+class OmciSynchronizeTime(OmciMessage):
+ name = "OmciSynchronizeTime"
+ message_id = 0x58
+ fields_desc = [
+ ShortField("entity_class", 256), # OntG
+ ShortField("entity_id", 0),
+ ShortField("year", 0), # eg) 2018
+ ByteField("month", 0), # 1..12
+ ByteField("day", 0), # 1..31
+ ByteField("hour", 0), # 0..23
+ ByteField("minute", 0), # 0..59
+ ByteField("second", 0) # 0..59
+ ]
+
+
+class OmciSynchronizeTimeResponse(OmciMessage):
+ name = "OmciSynchronizeTimeResponse"
+ message_id = 0x38
+ fields_desc = [
+ ShortField("entity_class", 256), # OntG
+ ShortField("entity_id", 0),
+ ByteField("success_code", 0),
+ ConditionalField(ShortField("success_info", None),
+ lambda pkt: pkt.success_code == 0)
+ ]
+
+
+class OmciGetCurrentData(OmciMessage):
+ name = "OmciGetCurrentData"
+ message_id = 0x5C
+ fields_desc = [
+ ShortField("entity_class", None),
+ ShortField("entity_id", 0),
+ ShortField("attributes_mask", None),
+ ]
+
+
+class OmciGetCurrentDataResponse(OmciMessage):
+ name = "OmciGetCurrentDataResponse"
+ message_id = 0x3C
+ fields_desc = [
+ ShortField("entity_class", None),
+ ShortField("entity_id", 0),
+ ByteField("success_code", 0),
+ ShortField("attributes_mask", None),
+ ShortField("unsupported_attributes_mask", None),
+ ShortField("failed_attributes_mask", None),
+ ConditionalField(
+ OmciMaskedData("data"), lambda pkt: pkt.success_code == 0)
+ ]
diff --git a/voltha/extensions/omci/onu_configuration.py b/voltha/extensions/omci/onu_configuration.py
index fbd5018..6010294 100644
--- a/voltha/extensions/omci/onu_configuration.py
+++ b/voltha/extensions/omci/onu_configuration.py
@@ -138,6 +138,17 @@
return ontg[ATTRIBUTES_KEY].get('version')
@property
+ def serial_number(self):
+ """
+ The serial number is unique for each ONU
+ """
+ ontg = self._get_capability('_ont_g', OntG.class_id, 0)
+ if ontg is None or ATTRIBUTES_KEY not in ontg:
+ return None
+
+ return ontg[ATTRIBUTES_KEY].get('serial_number')
+
+ @property
def traffic_management_option(self):
"""
This attribute identifies the upstream traffic management function
diff --git a/voltha/extensions/omci/onu_device_entry.py b/voltha/extensions/omci/onu_device_entry.py
index 6a5497d..88d75d4 100644
--- a/voltha/extensions/omci/onu_device_entry.py
+++ b/voltha/extensions/omci/onu_device_entry.py
@@ -88,8 +88,12 @@
self._capabilities_sm = capabilities_info['state-machine'](self._omci_agent,
device_id,
capabilities_info['tasks'])
+ # ONU Performance Monitoring Intervals state machine
+ interval_info = support_classes.get('performance-intervals')
+ self._pm_intervals_sm = interval_info['state-machine'](self._omci_agent, device_id,
+ interval_info['tasks'])
except Exception as e:
- self.log.exception('mib-sync-create-failed', e=e)
+ self.log.exception('state-machine-create-failed', e=e)
raise
# Put state machines in the order you wish to start them
@@ -100,7 +104,7 @@
self._capabilities_sm,
]
self._on_sync_state_machines = [ # Run after first in_sync event
-
+ self._pm_intervals_sm
]
self._custom_me_map = custom_me_map
self._me_map = omci_entities.entity_id_to_class_map.copy()
@@ -122,7 +126,7 @@
:return: (str) Topic string
"""
assert event in OnuDeviceEvents, \
- 'Event {} is not an ONU Device Event'.format(event.Name)
+ 'Event {} is not an ONU Device Event'.format(event.name)
return 'omci-device:{}:{}'.format(device_id, event.name)
@property
@@ -152,6 +156,13 @@
return self._capabilities_sm
@property
+ def pm_intervals_state_machine(self):
+ """
+ Reference to the OpenOMCI PM Intervals state machine for this ONU
+ """
+ return self._pm_intervals_sm
+
+ @property
def active(self):
"""
Is the ONU device currently active/running
@@ -278,7 +289,7 @@
def start_state_machines(machines):
for sm in machines:
self._state_machines.append(sm)
- sm.start()
+ reactor.callLater(0, sm.start)
self._deferred = reactor.callLater(0, start_state_machines,
self._on_sync_state_machines)
diff --git a/voltha/extensions/omci/openomci_agent.py b/voltha/extensions/omci/openomci_agent.py
index c78d8e5..0e21b29 100644
--- a/voltha/extensions/omci/openomci_agent.py
+++ b/voltha/extensions/omci/openomci_agent.py
@@ -20,9 +20,14 @@
from voltha.extensions.omci.tasks.mib_upload import MibUploadTask
from voltha.extensions.omci.tasks.get_mds_task import GetMdsTask
from voltha.extensions.omci.tasks.mib_resync_task import MibResyncTask
+from voltha.extensions.omci.tasks.sync_time_task import SyncTimeTask
+from voltha.extensions.omci.tasks.interval_data_task import IntervalDataTask
from voltha.extensions.omci.onu_device_entry import OnuDeviceEntry
from voltha.extensions.omci.state_machines.omci_onu_capabilities import OnuOmciCapabilities
from voltha.extensions.omci.tasks.onu_capabilities_task import OnuCapabilitiesTask
+from voltha.extensions.omci.state_machines.performance_intervals import PerformanceIntervals
+from voltha.extensions.omci.tasks.omci_create_pm_task import OmciCreatePMRequest
+from voltha.extensions.omci.tasks.omci_delete_pm_task import OmciDeletePMRequest
OpenOmciAgentDefaults = {
'mib-synchronizer': {
@@ -38,13 +43,22 @@
}
},
'omci-capabilities': {
- 'state-machine': OnuOmciCapabilities, # Implements OMCI capabilities state mach9ine
+ 'state-machine': OnuOmciCapabilities, # Implements OMCI capabilities state machine
'tasks': {
'get-capabilities': OnuCapabilitiesTask # Get supported ME and Commands
}
+ },
+ 'performance-intervals': {
+ 'state-machine': PerformanceIntervals, # Implements PM Intervals State machine
+ 'tasks': {
+ 'sync-time': SyncTimeTask,
+ 'collect-data': IntervalDataTask,
+ 'create-pm': OmciCreatePMRequest,
+ 'delete-pm': OmciDeletePMRequest,
+ },
}
# 'alarm-syncronizer': {
- # 'state-machine': AlarmSynchronizer, # Implements the MIB synchronization state machine
+ # 'state-machine': AlarmSynchronizer, # Implements the Alarm sync state machine
# 'database': AlarmDb, # For any State storage needs
# 'tasks': {
# 'task-1': needToWrite,
@@ -133,7 +147,6 @@
self._started = False
# ONUs OMCI shutdown
-
for device in self._devices.itervalues():
device.stop()
diff --git a/voltha/extensions/omci/state_machines/performance_intervals.py b/voltha/extensions/omci/state_machines/performance_intervals.py
new file mode 100644
index 0000000..b3c656c
--- /dev/null
+++ b/voltha/extensions/omci/state_machines/performance_intervals.py
@@ -0,0 +1,778 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import structlog
+from transitions import Machine
+from datetime import datetime, timedelta
+from random import uniform, shuffle
+from twisted.internet import reactor
+from common.utils.indexpool import IndexPool
+from voltha.extensions.omci.omci_defs import EntityOperations, ReasonCodes
+from voltha.extensions.omci.omci_cc import OmciCCRxEvents, OMCI_CC, TX_REQUEST_KEY, \
+ RX_RESPONSE_KEY
+from voltha.extensions.omci.database.mib_db_api import ATTRIBUTES_KEY
+from voltha.extensions.omci.omci_entities import MacBridgePortConfigurationData
+from voltha.extensions.omci.omci_entities import EthernetPMMonitoringHistoryData, \
+ FecPerformanceMonitoringHistoryData, \
+ XgPonTcPerformanceMonitoringHistoryData, \
+ XgPonDownstreamPerformanceMonitoringHistoryData, \
+ XgPonUpstreamPerformanceMonitoringHistoryData, \
+ EthernetFrameUpstreamPerformanceMonitoringHistoryData, \
+ EthernetFrameDownstreamPerformanceMonitoringHistoryData, \
+ EthernetFrameExtendedPerformanceMonitoring, \
+ EthernetFrameExtendedPerformanceMonitoring64Bit
+
+
+RxEvent = OmciCCRxEvents
+OP = EntityOperations
+RC = ReasonCodes
+
+
class PerformanceIntervals(object):
    """
    OpenOMCI ONU Performance Monitoring Intervals State machine

    This state machine focuses on L2 Internet Data Service and Classical
    PM (for the v2.0 release).
    """
    # Every state referenced by DEFAULT_TRANSITIONS must appear here: the
    # transitions 'Machine' rejects a transition whose destination is not a
    # registered state. 'delete_pm_me' and 'rebooted' are used as destinations
    # below and were previously missing from this list.
    DEFAULT_STATES = ['disabled', 'starting', 'synchronize_time', 'idle', 'create_pm_me',
                      'delete_pm_me', 'collect_data', 'threshold_exceeded', 'rebooted']

    DEFAULT_TRANSITIONS = [
        {'trigger': 'start', 'source': 'disabled', 'dest': 'starting'},
        {'trigger': 'tick', 'source': 'starting', 'dest': 'synchronize_time'},

        {'trigger': 'success', 'source': 'synchronize_time', 'dest': 'idle'},
        {'trigger': 'failure', 'source': 'synchronize_time', 'dest': 'synchronize_time'},

        {'trigger': 'tick', 'source': 'idle', 'dest': 'collect_data'},
        {'trigger': 'add_me', 'source': 'idle', 'dest': 'create_pm_me'},
        {'trigger': 'delete_me', 'source': 'idle', 'dest': 'delete_pm_me'},

        {'trigger': 'success', 'source': 'create_pm_me', 'dest': 'idle'},
        {'trigger': 'failure', 'source': 'create_pm_me', 'dest': 'idle'},

        {'trigger': 'success', 'source': 'delete_pm_me', 'dest': 'idle'},
        {'trigger': 'failure', 'source': 'delete_pm_me', 'dest': 'idle'},

        {'trigger': 'success', 'source': 'collect_data', 'dest': 'idle'},
        {'trigger': 'failure', 'source': 'collect_data', 'dest': 'idle'},

        # TODO: Add rebooted event transitions to disabled or synchronize_time
        # TODO: Need to capture Threshold Crossing Alarms appropriately

        # Do wildcard 'stop' trigger last so it covers all previous states
        {'trigger': 'stop', 'source': '*', 'dest': 'disabled'},
        {'trigger': 'reboot', 'source': '*', 'dest': 'rebooted'},
    ]
    DEFAULT_RETRY = 10          # Seconds to delay after task failure/timeout/poll
    DEFAULT_TICK_DELAY = 5      # Seconds between checks for collection tick
    DEFAULT_INTERVAL_SKEW = 10 * 60     # Seconds to skew past interval boundary
    DEFAULT_COLLECT_ATTEMPTS = 3        # Maximum number of collection fetch attempts
+
    def __init__(self, agent, device_id, tasks,
                 states=DEFAULT_STATES,
                 transitions=DEFAULT_TRANSITIONS,
                 initial_state='disabled',
                 timeout_delay=DEFAULT_RETRY,
                 tick_delay=DEFAULT_TICK_DELAY,
                 interval_skew=DEFAULT_INTERVAL_SKEW,
                 collect_attempts=DEFAULT_COLLECT_ATTEMPTS):
        """
        Class initialization

        :param agent: (OpenOmciAgent) Agent
        :param device_id: (str) ONU Device ID
        :param tasks: (dict) Tasks to run. Required keys: 'sync-time',
                      'collect-data', 'create-pm', 'delete-pm'
        :param states: (list) List of valid states
        :param transitions: (dict) Dictionary of triggers and state changes
        :param initial_state: (str) Initial state machine state
        :param timeout_delay: (int/float) Number of seconds after a timeout to pause
        :param tick_delay: (int/float) Collection poll check delay while idle
        :param interval_skew: (int/float) Seconds to randomly skew the next interval
                              collection to spread out requests for PM intervals
        :param collect_attempts: (int) Max requests for a single PM interval before fail
        """
        self.log = structlog.get_logger(device_id=device_id)

        self._agent = agent
        self._device_id = device_id
        self._device = None                      # Resolved on entry to 'starting'
        self._timeout_delay = timeout_delay
        self._tick_delay = tick_delay
        self._interval_skew = interval_skew
        self._collect_attempts = collect_attempts

        # Task classes; instantiated on demand by the state-entry handlers
        self._sync_time_task = tasks['sync-time']
        self._get_interval_task = tasks['collect-data']
        self._create_pm_task = tasks['create-pm']
        self._delete_pm_task = tasks['delete-pm']

        self._omci_cc_subscriptions = {               # RxEvent.enum -> Subscription Object
            RxEvent.MIB_Reset: None,
            RxEvent.Create: None,
            RxEvent.Delete: None
        }
        self._omci_cc_sub_mapping = {
            RxEvent.MIB_Reset: self.on_mib_reset_response,
            RxEvent.Create: self.on_create_response,
            RxEvent.Delete: self.on_delete_response,
        }
        # MEs whose dynamic create/delete implies a PM ME create/delete
        self._me_watch_list = {
            MacBridgePortConfigurationData.class_id: {
                'create-delete': self.add_remove_enet_frame_pm,
                'instances': dict()  # BP entity_id -> (PM class_id, PM entity_id)
            }
        }
        self._deferred = None               # State-transition trigger deferred
        self._task_deferred = None          # Deferred of the queued task
        self._current_task = None           # Task currently queued/running
        self._add_me_deferred = None        # Pending 'add_me' trigger
        self._delete_me_deferred = None     # Pending 'delete_me' trigger
        self._next_interval = None          # (datetime) next collection time
        self._enet_entity_id = IndexPool(1024, 1)   # IDs for extended Ethernet PM MEs

        # (Class ID, Instance ID) -> Collect attempts remaining
        self._pm_me_collect_retries = dict()
        self._add_pm_me = dict()        # (pm cid, pm eid) -> (me cid, me eid, upstream)
        self._del_pm_me = set()

        # Statistics and attributes
        # TODO: add any others if it will support problem diagnosis

        # Set up state machine to manage states
        self.machine = Machine(model=self, states=states,
                               transitions=transitions,
                               initial=initial_state,
                               queued=True,
                               name='{}-{}'.format(self.__class__.__name__,
                                                   device_id))
+
+ def _cancel_deferred(self):
+ d1, self._deferred = self._deferred, None
+ d2, self._task_deferred = self._task_deferred, None
+ d3, self._add_me_deferred = self._add_me_deferred, None
+ d4, self._delete_me_deferred = self._delete_me_deferred, None
+
+ for d in [d1, d2, d3, d4]:
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def _cancel_tasks(self):
+ task, self._current_task = self._current_task, None
+ if task is not None:
+ task.stop()
+
+ def __str__(self):
+ return 'PerformanceIntervals: Device ID: {}, State:{}'.format(self._device_id,
+ self.state)
+
    def delete(self):
        """
        Cleanup any state information

        Issues the wildcard 'stop' trigger, transitioning to 'disabled' where
        subscriptions, deferreds and tasks are torn down (see on_enter_disabled).
        """
        self.stop()
+
    @property
    def device_id(self):
        """(str) ONU Device ID this state machine manages."""
        return self._device_id
+
+ def _me_is_supported(self, class_id):
+ """
+ Check to see if ONU supports this ME
+
+ :param class_id: (int) ME Class ID
+ :return: (bool) If ME is supported
+ """
+ #
+ supported = self._device.omci_capabilities.supported_managed_entities
+ return class_id in supported if supported is not None else False
+
+ def add_pm_me(self, pm_class_id, pm_entity_id, cid=0, eid=0, upstream=False):
+ """
+ Add a new Performance Monitoring ME.
+
+ The ME ID will be added to an internal list and will be added the next
+ time the idle state is reached. An 'add_pm_me' trigger will be raised in
+ case already in the Idle state.
+
+ :param pm_class_id: (int) ME Class ID (1..0xFFFE)
+ :param pm_entity_id: (int) Instancec ID (1..0xFFFE)
+ :param cid: (int) Class ID of entity monitored, may be None
+ :param eid: (int) Instance ID of entity monitored, may be None
+ :param upstream: (bool): Flag indicating if PM is for upstream traffic
+ """
+ if not isinstance(pm_class_id, int):
+ raise TypeError('PM ME Instance ID is an integer')
+ if not 0 < pm_class_id < 0xFFFF:
+ raise ValueError('PM ME Instance ID must be 1..65534')
+
+ # Check to see if ONU supports this ME
+ if not self._me_is_supported(pm_class_id):
+ self.log.warn('unsupported-PM-me', class_id=pm_class_id)
+ return
+
+ key = (pm_class_id, pm_entity_id)
+ entry = (cid, eid, upstream)
+
+ if key not in self._pm_me_collect_retries and key not in self._add_pm_me:
+ self._add_pm_me[key] = entry
+
+ if self._add_me_deferred is None:
+ self._add_me_deferred = reactor.callLater(0, self.add_me)
+
+ if (pm_class_id, pm_entity_id) in self._del_pm_me:
+ self._del_pm_me.remove((pm_class_id, pm_entity_id))
+
+ def delete_pm_me(self, class_id, entity_id):
+ """
+ Remove a new Performance Monitoring ME.
+
+ The ME ID will be added to an internal list and will be removed the next
+ time the idle state is reached. An 'delete_pm_me' trigger will be raised in
+ case already in the Idle state.
+
+ :param class_id: (int) ME Class ID (1..0xFFFE)
+ :param entity_id: (int) Instance ID (1..0xFFFE)
+ """
+ if not isinstance(class_id, int):
+ raise TypeError('PM ME Class ID is an integer')
+ if not 0 < class_id < 0xFFFF:
+ raise ValueError('PM ME Class ID must be 1..65534')
+
+ # Check to see if ONU supports this ME
+ if not self._me_is_supported(class_id):
+ self.log.warn('unsupported-PM-me', class_id=class_id)
+ return
+
+ key = (class_id, entity_id)
+
+ if key in self._pm_me_collect_retries and key not in self._del_pm_me:
+ self._del_pm_me.add(key)
+
+ if self._delete_me_deferred is None:
+ self._delete_me_deferred = reactor.callLater(0, self.delete_me)
+
+ self._add_pm_me.pop(key)
+
+ def on_enter_disabled(self):
+ """
+ State machine is being stopped
+ """
+ self._cancel_deferred()
+ self._cancel_tasks()
+ self._next_interval = None
+
+ # Drop OMCI ME Response subscriptions
+ for event, sub in self._omci_cc_subscriptions.iteritems():
+ if sub is not None:
+ self._omci_cc_subscriptions[event] = None
+ self._device.omci_cc.event_bus.unsubscribe(sub)
+
+ # Manually remove ani ANI/PON and UNI PM interval MEs
+ config = self._device.configuration
+
+ for pon in config.ani_g_entities:
+ entity_id = pon['entity-id']
+ self.delete_pm_me(FecPerformanceMonitoringHistoryData.class_id, entity_id)
+ self.delete_pm_me(FecPerformanceMonitoringHistoryData.class_id, entity_id)
+ self.delete_pm_me(XgPonTcPerformanceMonitoringHistoryData.class_id, entity_id)
+ self.delete_pm_me(XgPonDownstreamPerformanceMonitoringHistoryData.class_id, entity_id)
+ self.delete_pm_me(XgPonUpstreamPerformanceMonitoringHistoryData.class_id, entity_id)
+
+ for uni in config.uni_g_entities:
+ entity_id = uni['entity-id']
+ self.delete_pm_me(EthernetPMMonitoringHistoryData.class_id, entity_id)
+
+ def on_enter_starting(self):
+ """ Add the PON/ANI and UNI PM intervals"""
+ self._device = self._agent.get_device(self._device_id)
+ self._cancel_deferred()
+
+ # Set up OMCI ME Response subscriptions
+ try:
+ for event, sub in self._omci_cc_sub_mapping.iteritems():
+ if self._omci_cc_subscriptions[event] is None:
+ self._omci_cc_subscriptions[event] = \
+ self._device.omci_cc.event_bus.subscribe(
+ topic=OMCI_CC.event_bus_topic(self._device_id, event),
+ callback=sub)
+
+ except Exception as e:
+ self.log.exception('omci-cc-subscription-setup', e=e)
+
+ try:
+ # Manually start some ANI/PON and UNI PM interval MEs
+ config = self._device.configuration
+ anis = config.ani_g_entities
+ unis = config.uni_g_entities
+
+ if anis is not None:
+ for entity_id in anis.iterkeys():
+ self.add_pm_me(FecPerformanceMonitoringHistoryData.class_id,
+ entity_id)
+ self.add_pm_me(FecPerformanceMonitoringHistoryData.class_id
+ , entity_id)
+ self.add_pm_me(XgPonTcPerformanceMonitoringHistoryData.class_id,
+ entity_id)
+ self.add_pm_me(XgPonDownstreamPerformanceMonitoringHistoryData.class_id,
+ entity_id)
+ self.add_pm_me(XgPonUpstreamPerformanceMonitoringHistoryData.class_id,
+ entity_id)
+
+ if unis is not None:
+ for entity_id in config.uni_g_entities.iterkeys():
+ self.add_pm_me(EthernetPMMonitoringHistoryData.class_id, entity_id)
+
+ # Look for existing instances of dynamically created ME's that have PM
+ # associated with them and add them now
+ for class_id in self._me_watch_list.iterkeys():
+ instances = {k: v for k, v in
+ self._device.query_mib(class_id=class_id).items()
+ if isinstance(k, int)}
+
+ for entity_id, data in instances.items():
+ method = self._me_watch_list[class_id]['create-delete']
+ cid, eid = method(None, class_id, entity_id,
+ add=True, attributes=data[ATTRIBUTES_KEY])
+ if cid > 0:
+ # BP entity_id -> (PM class_id, PM entity_id)
+ instances = self._me_watch_list[class_id]['instances']
+ instances[entity_id] = (cid, eid)
+
+ except Exception as e:
+ self.log.exception('pm-me-setup', e=e)
+
+ # Got to synchronize_time state
+ self._deferred = reactor.callLater(0, self.tick)
+
+ def on_enter_synchronize_time(self):
+ """
+ State machine has just transitioned to the synchronize_time state
+ """
+ self._cancel_deferred()
+
+ def success(results):
+ self.log.debug('sync-time-success: {}'.format(results))
+ self._current_task = None
+ self._deferred = reactor.callLater(0, self.success)
+ # Calculate next interval time
+ self._next_interval = self.get_next_interval
+
+ def failure(reason):
+ self.log.info('sync-time-failure', reason=reason)
+ self._current_task = None
+ self._deferred = reactor.callLater(self._timeout_delay, self.failure)
+
+ # Schedule a task to set the ONU time
+ self._current_task = self._sync_time_task(self._agent, self._device_id)
+ self._task_deferred = self._device.task_runner.queue_task(self._current_task)
+ self._task_deferred.addCallbacks(success, failure)
+
+ def on_enter_idle(self):
+ """
+ State machine has just transitioned to the idle state
+
+ In this state, any added PM MEs that need to be created will be.
+ TODO: some non-interval PM stats (if there are any) are collected here
+ """
+ self._cancel_deferred()
+
+ if len(self._del_pm_me) and self._delete_me_deferred is None:
+ self._delete_me_deferred = reactor.callLater(0, self.delete_me)
+
+ elif len(self._add_pm_me) and self._add_me_deferred is None:
+ self._add_me_deferred = reactor.callLater(0, self.add_me)
+
+ else:
+ # TODO: Compute a better mechanism than just polling here, perhaps based on
+ # the next time to fetch data for 'any' interval
+ self._deferred = reactor.callLater(self._tick_delay, self.tick)
+
    def on_enter_create_pm_me(self):
        """
        State machine has just transitioned to the create_pm_me state

        Queues a create-PM task for all pending adds. On success the MEs join
        the collection table with a fresh retry budget; on failure they are
        requeued for another attempt from the idle state.
        """
        self._cancel_deferred()
        self._cancel_tasks()
        # Claim the pending adds atomically; the failure path puts them back
        mes, self._add_pm_me = self._add_pm_me, dict()

        def success(results):
            self.log.debug('create-me-success: {}'.format(results))

            # Check if already here. The create request could have received
            # an already-exists status code which we consider successful
            for pm, me in mes.items():
                self._pm_me_collect_retries[pm] = self.pm_collected(pm)

            self._current_task = None
            # 'success' trigger -> back to idle
            self._deferred = reactor.callLater(0, self.success)

        def failure(reason):
            self.log.info('create-me-failure', reason=reason)
            self._current_task = None
            # NOTE(review): this may clobber an entry queued while the task
            # ran -- confirm acceptable.
            for pm, me in mes.items():
                self._add_pm_me[pm] = me

            # 'failure' trigger -> back to idle after a pause
            self._deferred = reactor.callLater(self._timeout_delay, self.failure)

        self._current_task = self._create_pm_task(self._agent, self._device_id, mes)
        self._task_deferred = self._device.task_runner.queue_task(self._current_task)
        self._task_deferred.addCallbacks(success, failure)
+
    def on_enter_delete_pm_me(self):
        """
        State machine has just transitioned to the delete_pm_me state

        Queues a delete-PM task for all pending deletes. On success the MEs
        leave the collection table; on failure they are requeued for another
        attempt from the idle state.
        """
        self._cancel_deferred()
        self._cancel_tasks()

        # Claim the pending deletes atomically; the failure path puts them back
        mes, self._del_pm_me = self._del_pm_me, set()

        def success(results):
            self.log.debug('delete-me-success: {}'.format(results))
            self._current_task = None
            for me in mes:
                self._pm_me_collect_retries.pop(me)

            # 'success' trigger -> back to idle
            self._deferred = reactor.callLater(0, self.success)

        def failure(reason):
            self.log.info('delete-me-failure', reason=reason)
            self._current_task = None
            for me in mes:
                self._del_pm_me.add(me)

            # 'failure' trigger -> back to idle after a pause
            self._deferred = reactor.callLater(self._timeout_delay, self.failure)

        self._current_task = self._delete_pm_task(self._agent, self._device_id, mes)
        self._task_deferred = self._device.task_runner.queue_task(self._current_task)
        self._task_deferred.addCallbacks(success, failure)
+
    def on_enter_collect_data(self):
        """
        State machine has just transitioned to the collect_data state

        Picks one PM ME with fetch attempts remaining (in random order to
        spread the load) and queues an interval-data task for it. When every
        ME is up to date, the next interval time is computed and the per-ME
        retry budgets are reset.
        """
        if self._next_interval is not None and self._next_interval > datetime.utcnow():
            # Not ready for next interval, transition back to idle and we should get
            # called again after a short delay
            reactor.callLater(0, self.success)
            return

        self._cancel_deferred()
        self._cancel_tasks()
        # NOTE: Python 2 keys() returns a list, which shuffle() requires
        keys = self._pm_me_collect_retries.keys()
        shuffle(keys)

        for key in keys:
            class_id = key[0]
            entity_id = key[1]

            # Collect the data ?
            if self._pm_me_collect_retries[key] > 0:
                def success(results):
                    self.log.info('collect-success', results=results)
                    self._current_task = None
                    self._pm_me_collect_retries[key] = 0
                    self._deferred = reactor.callLater(0, self.success)
                    return results

                def failure(reason):
                    self.log.info('collect-failure', reason=reason)
                    self._current_task = None
                    self._pm_me_collect_retries[key] -= 1
                    self._deferred = reactor.callLater(self._timeout_delay, self.failure)
                    return reason  # Halt callback processing

                # start the task -- only one per entry into this state; the
                # closures above capture 'key' safely because of the 'return'
                self.log.info('collect-task-start')
                self._current_task = self._get_interval_task(self._agent, self._device_id,
                                                             class_id, entity_id)
                self._task_deferred = self._device.task_runner.queue_task(self._current_task)
                self._task_deferred.addCallbacks(success, failure)
                self._task_deferred.addCallback(self.publish_data)
                return

        # Here if all intervals have been collected (we are up to date)
        self._next_interval = self.get_next_interval
        self.log.info('collect-calculate-next', next=self._next_interval)

        # Reset the retry budget for the next interval's collection pass
        self._pm_me_collect_retries = dict.fromkeys(self._pm_me_collect_retries, self._collect_attempts)
        reactor.callLater(0, self.success)
+
+ def on_enter_threshold_exceeded(self):
+ """
+ State machine has just transitioned to the threshold_exceeded state
+ """
+ pass # TODO: Not sure if we want this state. Need to get alarm synchronizer working first
+
+ @property
+ def get_next_interval(self):
+ """
+ Determine the time for the next interval collection for all of this
+ ONUs PM Intervals. Earliest fetch time is at least 1 minute into the
+ next interval.
+
+ :return: (datetime) UTC time to get the next interval
+ """
+ now = datetime.utcnow()
+
+ # Get delta seconds to at least 1 minute into next interval
+ next_delta_secs = (16 - (now.minute % 15)) * 60
+ next_interval = now + timedelta(seconds=next_delta_secs)
+
+ # NOTE: For debugging, uncomment next section to perform collection
+ # right after initial code startup/mib-sync
+ # if self._next_interval is None:
+ # return now # Do it now (just for debugging purposes)
+
+ # Skew the next time up to the maximum specified
+ # TODO: May want to skew in a shorter range and select the minute
+ # based off some device property value to make collection a
+ # little more predictable on a per-ONU basis.
+ return next_interval + timedelta(seconds=uniform(0, self._interval_skew))
+
+ def pm_collected(self, key):
+ """
+ Query database and determine if PM data needs to be collected for this ME
+ """
+ class_id = key[0]
+ entity_id = key[1]
+
+ return self._collect_attempts # TODO: Implement persistent storage
+
+ def publish_data(self, results):
+ """
+ Publish the PM interval results on the appropriate bus. The results are
+ a dictionary with the following format.
+
+ 'class-id': (int) ME Class ID,
+ 'entity-id': (int) ME Entity ID,
+ 'me-name': (str) ME Class name, # Mostly for debugging...
+ 'interval-end-time': None,
+ 'interval-utc-time': (DateTime) UTC time when retrieved from ONU,
+
+ Counters added here as they are retrieved with the format of
+ 'counter-attribute-name': value (int)
+
+ :param results: (dict) PM results
+ """
+ self.log.debug('collect-publish', results=results)
+ pass # TODO: Publish it here
+ pass # TODO: Save off last time interval fetched to persistent storage
+
    def on_mib_reset_response(self, _topic, msg):
        """
        Called upon receipt of a MIB Reset Response for this ONU

        A successful MIB reset wipes dynamically created MEs on the ONU, so the
        PM MEs tracking watched MEs are torn down from the watch list here.

        :param _topic: (str) OMCI-RX topic
        :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
        """
        self.log.debug('on-mib-reset-response', state=self.state)
        try:
            response = msg[RX_RESPONSE_KEY]
            omci_msg = response.fields['omci_message'].fields
            status = omci_msg['success_code']

            if status == RC.Success:
                for class_id in self._me_watch_list.iterkeys():
                    # BP entity_id -> (PM class_id, PM entity_id)
                    instances = self._me_watch_list[class_id]['instances']
                    for _, me_pair in instances.items():
                        # Schedule delete of the PM ME pair (add=False path)
                        self._me_watch_list[class_id]['create-delete'](None, me_pair[0],
                                                                       me_pair[1], add=False)
                    self._me_watch_list[class_id]['instances'] = dict()

        except KeyError:
            pass  # NOP - response/message without the expected keys
+
    def on_create_response(self, _topic, msg):
        """
        Called upon receipt of a Create Response for this ONU.

        If a watched ME (MAC Bridge Port Configuration Data) was created,
        schedule creation of the PM ME(s) that monitor it and remember the
        pairing in the watch list.

        :param _topic: (str) OMCI-RX topic
        :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
        """
        self.log.debug('on-create-response', state=self.state)

        def valid_request(stat, c_id, e_id):
            # NOTE(review): this tests the Delete subscription rather than the
            # Create one -- presumably just an "subscriptions are active"
            # guard; confirm intent.
            return self._omci_cc_subscriptions[RxEvent.Delete] is not None\
                and stat in (RC.Success, RC.InstanceExists) \
                and c_id in self._me_watch_list.keys() \
                and e_id not in self._me_watch_list[c_id]['instances']

        response = msg[RX_RESPONSE_KEY]
        omci = response.fields['omci_message'].fields
        class_id = omci['entity_class']
        entity_id = omci['entity_id']
        status = omci['success_code']

        if valid_request(status, class_id, entity_id):
            request = msg[TX_REQUEST_KEY]
            method = self._me_watch_list[class_id]['create-delete']
            cid, eid = method(request, class_id, entity_id, add=True)

            if cid > 0:
                # BP entity_id -> (PM class_id, PM entity_id)
                instances = self._me_watch_list[class_id]['instances']
                instances[entity_id] = (cid, eid)
+
    def on_delete_response(self, _topic, msg):
        """
        Called upon receipt of a Delete Response for this ONU

        If a watched ME was deleted, schedule deletion of the PM ME(s) that
        were monitoring it and forget the pairing in the watch list.

        :param _topic: (str) OMCI-RX topic
        :param msg: (dict) Dictionary with 'rx-response' and 'tx-request' (if any)
        """
        self.log.debug('on-delete-response', state=self.state)

        def valid_request(stat, cid, eid):
            return self._omci_cc_subscriptions[RxEvent.Delete] is not None\
                and stat in (RC.Success, RC.UnknownInstance) \
                and cid in self._me_watch_list.keys() \
                and eid in self._me_watch_list[cid]['instances']

        response = msg[RX_RESPONSE_KEY]
        omci = response.fields['omci_message'].fields
        class_id = omci['entity_class']
        entity_id = omci['entity_id']
        status = omci['success_code']

        if valid_request(status, class_id, entity_id):
            request = msg[TX_REQUEST_KEY]
            method = self._me_watch_list[class_id]['create-delete']

            method(request, class_id, entity_id, add=False)
            # BP entity_id -> (PM class_id, PM entity_id)
            instances = self._me_watch_list[class_id]['instances']
            del instances[entity_id]
+
+ def get_pm_entity_id_for_add(self, pm_cid, eid):
+ """
+ Select the Entity ID to use for a specific PM Class ID. For extended
+ PM ME's, an entity id (>0) is allocated
+
+ :param pm_cid: (int) PM ME Class ID to create/get entry ID for
+ :param eid: (int) Reference class's entity ID. Used as PM entity ID for non-
+ extended PM history PMs
+ :return: (int) Entity ID to use
+ """
+ if pm_cid in (EthernetFrameExtendedPerformanceMonitoring.class_id,
+ EthernetFrameExtendedPerformanceMonitoring64Bit.class_id):
+ return self._enet_entity_id.get_next()
+ return eid
+
+ def release_pm_entity_id(self, pm_cid, eid):
+ if pm_cid in (EthernetFrameExtendedPerformanceMonitoring.class_id,
+ EthernetFrameExtendedPerformanceMonitoring64Bit.class_id):
+ try:
+ self._enet_entity_id.release(eid)
+ except:
+ pass
+
+ def add_remove_enet_frame_pm(self, request, class_id, entity_id,
+ add=True,
+ attributes=None):
+ """
+ Add/remove PM for the dynamic MAC Port configuration data.
+
+ This can be called in a variety of ways:
+
+ o If from an Response event from OMCI_CC, the request will contain
+ the original create/delete request. The class_id and entity_id will
+ be the MAC Data Configuration Data class and instance ID.
+ add = True if create, False if delete
+
+ o If starting up (and the associated ME is already created), the MAC
+ Data Configuration Data class and instance ID, and attributes are
+ provided. request = None and add = True
+
+ o If cleaning up (stopping), the PM ME class_id, entity_id are provided.
+ request = None and add = False
+
+ :return: (int, int) PM ME class_id and entity_id for add/remove was performed.
+ class and entity IDs are non-zero on success
+ """
+ pm_entity_id = 0
+ cid = 0
+ eid = 0
+ upstream = False
+
+ def tp_type_to_pm(tp):
+ # TODO: Support 64-bit extended Monitoring MEs.
+ # This will result in the need to maintain entity IDs of PMs differently
+ upstream_types = [ # EthernetFrameExtendedPerformanceMonitoring64Bit.class_id,
+ EthernetFrameExtendedPerformanceMonitoring.class_id,
+ EthernetFrameUpstreamPerformanceMonitoringHistoryData.class_id], True
+ downstream_types = [ # EthernetFrameExtendedPerformanceMonitoring64Bit.class_id,
+ EthernetFrameExtendedPerformanceMonitoring.class_id,
+ EthernetFrameDownstreamPerformanceMonitoringHistoryData.class_id], False
+ return {
+ 1: downstream_types,
+ 3: upstream_types,
+ 5: downstream_types,
+ 6: downstream_types,
+ }.get(tp, None)
+
+ if request is not None:
+ assert class_id == MacBridgePortConfigurationData.class_id
+
+ # Is this associated with the ANI or the UNI side of the bridge?
+ # For VOLTHA v2.0, only high-speed internet data service is
+ attributes = request.fields['omci_message'].fields['data']
+ pm_class_ids, upstream = tp_type_to_pm(attributes['tp_type'])
+ cid = request.fields['omci_message'].fields['entity_class']
+ eid = request.fields['omci_message'].fields['entity_id']
+ if not add:
+ instances = self._me_watch_list[cid]['instances']
+ _, pm_entity_id = instances.get(eid, (None, None))
+
+ elif add:
+ assert class_id == MacBridgePortConfigurationData.class_id
+ assert isinstance(attributes, dict)
+
+ # Is this associated with the ANI or the UNI side of the bridge?
+ pm_class_ids, upstream = tp_type_to_pm(attributes.get('tp_type'))
+ cid = class_id
+ eid = entity_id
+
+ else:
+ assert class_id in (EthernetFrameUpstreamPerformanceMonitoringHistoryData.class_id,
+ EthernetFrameDownstreamPerformanceMonitoringHistoryData.class_id,
+ EthernetFrameExtendedPerformanceMonitoring.class_id,
+ EthernetFrameExtendedPerformanceMonitoring64Bit.class_id)
+ pm_class_ids = [class_id]
+
+ if pm_class_ids is None:
+ return False # Unable to select a supported ME for this ONU
+
+ if add:
+ for pm_class_id in pm_class_ids:
+ if self._me_is_supported(pm_class_id):
+ pm_entity_id = self.get_pm_entity_id_for_add(pm_class_id, eid)
+ self.add_pm_me(pm_class_id, pm_entity_id, cid=cid, eid=eid,
+ upstream=upstream)
+ return pm_class_id, pm_entity_id
+ else:
+ for pm_class_id in pm_class_ids:
+ if self._me_is_supported(pm_class_id):
+ self.delete_pm_me(pm_class_id, pm_entity_id)
+ self.release_pm_entity_id(pm_class_id, pm_entity_id)
+ return pm_class_id, pm_entity_id
+
+ return 0, 0
diff --git a/voltha/extensions/omci/tasks/interval_data_task.py b/voltha/extensions/omci/tasks/interval_data_task.py
new file mode 100644
index 0000000..cb02187
--- /dev/null
+++ b/voltha/extensions/omci/tasks/interval_data_task.py
@@ -0,0 +1,185 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from task import Task
+from datetime import datetime
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, TimeoutError, failure
+from voltha.extensions.omci.omci_defs import ReasonCodes
+from voltha.extensions.omci.omci_frame import OmciFrame, OmciGet
+
+
class IntervalDataTaskFailure(Exception):
    """Raised when a PM interval GET cannot be performed or completed."""
    pass
+
+
class IntervalDataTask(Task):
    """
    OpenOMCI Performance Interval Get Request

    Retrieves every counter attribute of a single PM history ME instance,
    splitting the GETs so each response fits the maximum payload.
    """
    task_priority = Task.DEFAULT_PRIORITY
    name = "Interval Data Task"
    max_payload = 29    # Default maximum GET response payload in octets
+
+ def __init__(self, omci_agent, device_id, class_id, entity_id,
+ max_get_response_payload=max_payload):
+ """
+ Class initialization
+
+ :param omci_agent: (OmciAdapterAgent) OMCI Adapter agent
+ :param device_id: (str) ONU Device ID
+ :param class_id: (int) ME Class ID
+ :param entity_id: (int) ME entity ID
+ :param max_get_response_payload: (int) Maximum number of octets in a
+ single GET response frame
+ """
+ super(IntervalDataTask, self).__init__(IntervalDataTask.name,
+ omci_agent,
+ device_id,
+ priority=IntervalDataTask.task_priority,
+ exclusive=False)
+ self._local_deferred = None
+ self._class_id = class_id
+ self._entity_id = entity_id
+
+ me_map = self.omci_agent.get_device(self.device_id).me_map
+ if self._class_id not in me_map:
+ msg = "The requested ME Class () does not exist in the ONU's ME Map".format(self._class_id)
+ self.log.warn('unknown-pm-me', msg=msg)
+ raise IntervalDataTaskFailure(msg)
+
+ self._entity = me_map[self._class_id]
+ self._counter_attributes = self.get_counter_attributes_names_and_size()
+ self._max_payload = max_get_response_payload
+
+ def cancel_deferred(self):
+ super(IntervalDataTask, self).cancel_deferred()
+
+ d, self._local_deferred = self._local_deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
    def start(self):
        """
        Start the tasks

        Schedules perform_get_interval() to run on the reactor.
        """
        super(IntervalDataTask, self).start()
        self._local_deferred = reactor.callLater(0, self.perform_get_interval)
+
    def stop(self):
        """
        Shutdown the tasks

        Cancels the local deferred before delegating to the base class.
        """
        self.log.debug('stopping')

        self.cancel_deferred()
        super(IntervalDataTask, self).stop()
+
+ def get_counter_attributes_names_and_size(self):
+ """
+ Get all of the counter attributes names and the amount of storage they take
+
+ :return: (dict) Attribute name -> length
+ """
+ return {name: self._entity.attributes[attr_index].field.sz
+ for name, attr_index in self._entity.attribute_name_to_index_map.items()
+ if self._entity.attributes[attr_index].is_counter}
+
+ @inlineCallbacks
+ def perform_get_interval(self):
+ """
+ Sync the time
+ """
+ self.log.info('perform-get-interval', class_id=self._class_id,
+ entity_id=self._entity_id)
+
+ device = self.omci_agent.get_device(self.device_id)
+ attr_names = self._counter_attributes.keys()
+
+ final_results = {
+ 'class-id': self._class_id,
+ 'entity-id': self._entity_id,
+ 'me-name': self._entity.__name__, # Mostly for debugging...
+ 'interval-utc-time': None,
+ # Counters added here as they are retrieved
+ }
+ last_end_time = None
+
+ while len(attr_names) > 0:
+ # Get as many attributes that will fit. Always include the 1 octet
+ # Interval End Time Attribute
+
+ remaining_payload = self._max_payload - 1
+ attributes = list()
+ for name in attr_names:
+ if self._counter_attributes[name] > remaining_payload:
+ break
+
+ attributes.append(name)
+ remaining_payload -= self._counter_attributes[name]
+
+ attr_names = attr_names[len(attributes):]
+ attributes.append('interval_end_time')
+
+ frame = OmciFrame(
+ transaction_id=None,
+ message_type=OmciGet.message_id,
+ omci_message=OmciGet(
+ entity_class=self._class_id,
+ entity_id=self._entity_id,
+ attributes_mask=self._entity.mask_for(*attributes)
+ )
+ )
+ try:
+ results = yield device.omci_cc.send(frame)
+
+ omci_msg = results.fields['omci_message'].fields
+ status = omci_msg['success_code']
+ end_time = omci_msg['data'].get('interval_end_time')
+
+ self.log.debug('interval-get-results', class_id=self._class_id,
+ entity_id=self._entity_id, status=status,
+ end_time=end_time)
+
+ if status != ReasonCodes.Success:
+ raise IntervalDataTaskFailure('Unexpected Response Status: {}'.
+ format(status))
+
+ if last_end_time is None:
+ last_end_time = end_time
+
+ elif end_time != last_end_time:
+ msg = 'Interval End Time Changed during retrieval from {} to {}'\
+ .format(last_end_time, end_time)
+ self.log.info('interval-roll-over', msg=msg)
+ raise IntervalDataTaskFailure(msg)
+
+ final_results['interval-utc-time'] = datetime.utcnow()
+ for attribute in attributes:
+ final_results[attribute] = omci_msg['data'].get(attribute)
+
+ except TimeoutError as e:
+ self.log.warn('interval-get-timeout', e=e)
+ self.deferred.errback(failure.Failure(e))
+
+ except Exception as e:
+ self.log.exception('interval-get-failure', e=e)
+ self.deferred.errback(failure.Failure(e))
+
+ # Successful if here
+ self.deferred.callback(final_results)
diff --git a/voltha/extensions/omci/tasks/omci_create_pm_task.py b/voltha/extensions/omci/tasks/omci_create_pm_task.py
new file mode 100644
index 0000000..d2f41ec
--- /dev/null
+++ b/voltha/extensions/omci/tasks/omci_create_pm_task.py
@@ -0,0 +1,144 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from task import Task
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, failure
+from voltha.extensions.omci.omci_defs import ReasonCodes, EntityOperations
+from voltha.extensions.omci.omci_frame import OmciFrame
+from voltha.extensions.omci.omci_messages import OmciCreate
+
+RC = ReasonCodes
+OP = EntityOperations
+
+
+class CreatePMException(Exception):
+ pass
+
+
+class OmciCreatePMRequest(Task):
+ """
+ OpenOMCI routine to create the requested PM Interval MEs
+
+ TODO: Support of thresholding crossing alarms will be in a future VOLTHA release
+ """
+ task_priority = Task.DEFAULT_PRIORITY
+ name = "ONU OMCI Create PM ME Task"
+
+ def __init__(self, omci_agent, device_id, me_dict, exclusive=False):
+ """
+ Class initialization
+
+ :param omci_agent: (OmciAdapterAgent) OMCI Adapter agent
+ :param me_dict: (dict) (pm cid, pm eid) -> (me cid, me eid, upstream)
+        :param exclusive: (bool) True if this Create request Task exclusively owns the
+ OMCI-CC while running. Default: False
+ """
+ super(OmciCreatePMRequest, self).__init__(OmciCreatePMRequest.name,
+ omci_agent,
+ device_id,
+ priority=OmciCreatePMRequest.task_priority,
+ exclusive=exclusive)
+ self._device = omci_agent.get_device(device_id)
+ self._me_dict = me_dict
+ self._local_deferred = None
+
+ def cancel_deferred(self):
+ super(OmciCreatePMRequest, self).cancel_deferred()
+
+ d, self._local_deferred = self._local_deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def stop_if_not_running(self):
+ if not self.running:
+ raise CreatePMException('Create PM ME Task was cancelled')
+
+ def start(self):
+ """ Start task """
+ super(OmciCreatePMRequest, self).start()
+ self._local_deferred = reactor.callLater(0, self.perform_create)
+
+ @inlineCallbacks
+ def perform_create(self):
+ """ Perform the create requests """
+ self.log.info('perform-create')
+
+ try:
+ for pm, me in self._me_dict.items():
+ pm_class_id = pm[0]
+ pm_entity_id = pm[1]
+ me_class_id = me[0]
+ me_entity_id = me[1]
+ upstream = me[2]
+
+ if me_class_id == 0:
+ # Typical/common PM interval format
+ frame = OmciFrame(
+ transaction_id=None, # OMCI-CC will set
+ message_type=OmciCreate.message_id,
+ omci_message=OmciCreate(
+ entity_class=pm_class_id,
+ entity_id=pm_entity_id,
+ data=dict()
+ )
+ )
+ else:
+ # Extended PM interval format
+ bitmap = 0 if upstream else 1 << 2
+
+ data = {'control_block': [
+ 0, # Threshold data 1/2 ID
+ me_class_id, # Parent ME Class
+ me_entity_id, # Parent ME Instance
+ 0, # Accumulation disable
+ 0, # TCA Disable
+ bitmap, # Control fields bitmap
+ 0, # TCI
+ 0 # Reserved
+
+ ]}
+ frame = OmciFrame(
+ transaction_id=None, # OMCI-CC will set
+ message_type=OmciCreate.message_id,
+ omci_message=OmciCreate(
+ entity_class=pm_class_id,
+ entity_id=pm_entity_id,
+ data=data
+ )
+ )
+ results = yield self._device.omci_cc.send(frame)
+ self.stop_if_not_running()
+
+ status = results.fields['omci_message'].fields['success_code']
+ self.log.debug('perform-create-status', status=status)
+
+ # Did it fail
+ if status != RC.Success.value and status != RC.InstanceExists.value:
+ msg = 'ME: {}, entity: {} failed with status {}'.format(pm_class_id,
+ pm_entity_id,
+ status)
+ raise CreatePMException(msg)
+
+ self.log.debug('create-pm-success', class_id=pm_class_id,
+ entity_id=pm_entity_id)
+ self.deferred.callback(self)
+
+ except Exception as e:
+ self.log.exception('perform-create', e=e)
+ self.deferred.errback(failure.Failure(e))
diff --git a/voltha/extensions/omci/tasks/omci_delete_pm_task.py b/voltha/extensions/omci/tasks/omci_delete_pm_task.py
new file mode 100644
index 0000000..c80a575
--- /dev/null
+++ b/voltha/extensions/omci/tasks/omci_delete_pm_task.py
@@ -0,0 +1,112 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from task import Task
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, failure
+from voltha.extensions.omci.omci_defs import ReasonCodes, EntityOperations
+from voltha.extensions.omci.omci_frame import OmciFrame
+from voltha.extensions.omci.omci_messages import OmciDelete
+
+RC = ReasonCodes
+OP = EntityOperations
+
+
+class DeletePMException(Exception):
+ pass
+
+
+class OmciDeletePMRequest(Task):
+ """
+ OpenOMCI routine to delete the requested PM Interval MEs
+ """
+ task_priority = Task.DEFAULT_PRIORITY
+ name = "ONU OMCI Delete PM ME Task"
+
+ def __init__(self, omci_agent, device_id, me_set, exclusive=False):
+ """
+ Class initialization
+
+ :param omci_agent: (OmciAdapterAgent) OMCI Adapter agent
+        :param me_set: (set) Tuples of class_id / entity_id to delete
+        :param exclusive: (bool) True if this Delete request Task exclusively owns the
+ OMCI-CC while running. Default: False
+ """
+ super(OmciDeletePMRequest, self).__init__(OmciDeletePMRequest.name,
+ omci_agent,
+ device_id,
+ priority=OmciDeletePMRequest.task_priority,
+ exclusive=exclusive)
+ self._device = omci_agent.get_device(device_id)
+ self._me_tuples = me_set
+ self._local_deferred = None
+
+ def cancel_deferred(self):
+ super(OmciDeletePMRequest, self).cancel_deferred()
+
+ d, self._local_deferred = self._local_deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def stop_if_not_running(self):
+ if not self.running:
+ raise DeletePMException('Delete PM ME Task was cancelled')
+
+ def start(self):
+ """ Start task """
+ super(OmciDeletePMRequest, self).start()
+ self._local_deferred = reactor.callLater(0, self.perform_delete)
+
+ @inlineCallbacks
+ def perform_delete(self):
+ """ Perform the delete requests """
+ self.log.info('perform-delete')
+
+ try:
+ for me in self._me_tuples:
+ class_id = me[0]
+ entity_id = me[1]
+
+ frame = OmciFrame(
+ transaction_id=None,
+ message_type=OmciDelete.message_id,
+ omci_message=OmciDelete(
+ entity_class=class_id,
+ entity_id=entity_id
+ )
+ )
+ results = yield self._device.omci_cc.send(frame)
+ self.stop_if_not_running()
+
+ status = results.fields['omci_message'].fields['success_code']
+ self.log.info('perform-delete-status', status=status)
+
+                # Did it fail? If the instance does not exist, that is not an error
+ if status != RC.Success.value and status != RC.UnknownInstance.value:
+ msg = 'ME: {}, entity: {} failed with status {}'.format(class_id,
+ entity_id,
+ status)
+ raise DeletePMException(msg)
+
+ self.log.debug('delete-pm-success', class_id=class_id,
+ entity_id=entity_id)
+ self.deferred.callback(self)
+
+ except Exception as e:
+            self.log.exception('perform-delete', e=e)
+ self.deferred.errback(failure.Failure(e))
diff --git a/voltha/extensions/omci/tasks/sync_time_task.py b/voltha/extensions/omci/tasks/sync_time_task.py
new file mode 100644
index 0000000..7686cb4
--- /dev/null
+++ b/voltha/extensions/omci/tasks/sync_time_task.py
@@ -0,0 +1,107 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from task import Task
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, TimeoutError, failure
+from voltha.extensions.omci.omci_me import OntGFrame
+from voltha.extensions.omci.omci_defs import ReasonCodes as RC
+from datetime import datetime
+
+
+class SyncTimeTask(Task):
+ """
+ OpenOMCI - Synchronize the ONU time with server
+ """
+ task_priority = Task.DEFAULT_PRIORITY + 10
+ name = "Sync Time Task"
+
+ def __init__(self, omci_agent, device_id, use_utc=True):
+ """
+ Class initialization
+
+ :param omci_agent: (OmciAdapterAgent) OMCI Adapter agent
+ :param device_id: (str) ONU Device ID
+ :param use_utc: (bool) Use UTC time if True, otherwise local time
+ """
+ super(SyncTimeTask, self).__init__(SyncTimeTask.name,
+ omci_agent,
+ device_id,
+ priority=SyncTimeTask.task_priority,
+ exclusive=False)
+ self._local_deferred = None
+ self._use_utc = use_utc
+
+ def cancel_deferred(self):
+ super(SyncTimeTask, self).cancel_deferred()
+
+ d, self._local_deferred = self._local_deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def start(self):
+ """
+ Start the tasks
+ """
+ super(SyncTimeTask, self).start()
+ self._local_deferred = reactor.callLater(0, self.perform_sync_time)
+
+ def stop(self):
+ """
+ Shutdown the tasks
+ """
+ self.log.debug('stopping')
+
+ self.cancel_deferred()
+ super(SyncTimeTask, self).stop()
+
+ @inlineCallbacks
+ def perform_sync_time(self):
+ """
+ Sync the time
+ """
+ self.log.info('perform-sync-time')
+
+ try:
+ device = self.omci_agent.get_device(self.device_id)
+
+ #########################################
+ # ONT-G (ME #256)
+ dt = datetime.utcnow() if self._use_utc else datetime.now()
+
+ results = yield device.omci_cc.send(OntGFrame().synchronize_time(dt))
+
+ omci_msg = results.fields['omci_message'].fields
+ status = omci_msg['success_code']
+ self.log.debug('sync-time', status=status)
+
+            if status == RC.Success.value:
+                self.log.info('sync-time', success_info=omci_msg['success_info'] & 0x0f)
+
+            assert status == RC.Success.value, 'Unexpected Response Status: {}'.format(status)
+
+ # Successful if here
+ self.deferred.callback(results)
+
+ except TimeoutError as e:
+ self.log.warn('sync-time-timeout', e=e)
+ self.deferred.errback(failure.Failure(e))
+
+ except Exception as e:
+ self.log.exception('sync-time', e=e)
+ self.deferred.errback(failure.Failure(e))