SEBA-596 #comments Implemented periodic test action for ONT
Change-Id: I4e913b90b3f770eaf9421888f671bd2adc9ec1e3
diff --git a/VERSION b/VERSION
index 6cec60f..7e541ae 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.2.2-dev0
+2.2.2
\ No newline at end of file
diff --git a/pyvoltha/adapters/extensions/omci/me_frame.py b/pyvoltha/adapters/extensions/omci/me_frame.py
index 4f451ee..06ae2bb 100644
--- a/pyvoltha/adapters/extensions/omci/me_frame.py
+++ b/pyvoltha/adapters/extensions/omci/me_frame.py
@@ -475,4 +475,21 @@
entity_class=getattr(self.entity_class, 'class_id'),
entity_id=getattr(self, 'entity_id'),
))
+
+ def test(self):
+ """
+ Create a test request frame for this ME
+ :return: (OmciFrame) OMCI Frame
+ """
+ self._check_operation(OP.Test)
+
+ return OmciFrame(
+ transaction_id=None,
+ message_type=OmciTest.message_id,
+ omci_message=OmciTest(
+ entity_class=getattr(self.entity_class, 'class_id'),
+ entity_id=getattr(self, 'entity_id'),
+ self_test=0x07
+ ))
+
diff --git a/pyvoltha/adapters/extensions/omci/omci_cc.py b/pyvoltha/adapters/extensions/omci/omci_cc.py
index 829274b..4b695a5 100644
--- a/pyvoltha/adapters/extensions/omci/omci_cc.py
+++ b/pyvoltha/adapters/extensions/omci/omci_cc.py
@@ -351,8 +351,11 @@
omci_entities.entity_id_to_class_map = saved_me_map # Always restore it.
rx_tid = rx_frame.fields['transaction_id']
+ msg_type = rx_frame.fields['message_type']
self.log.debug('Received message for rx_tid', rx_tid = rx_tid)
- if rx_tid == 0:
+ # Filter the Test Result frame and route through receive onu
+ # message method.
+ if rx_tid == 0 or msg_type == EntityOperations.TestResult.value:
self.log.debug('Receive ONU message', rx_tid=0)
return self._receive_onu_message(rx_frame)
diff --git a/pyvoltha/adapters/extensions/omci/omci_frame.py b/pyvoltha/adapters/extensions/omci/omci_frame.py
index a8cb733..d9a957f 100644
--- a/pyvoltha/adapters/extensions/omci/omci_frame.py
+++ b/pyvoltha/adapters/extensions/omci/omci_frame.py
@@ -31,7 +31,7 @@
OmciDownloadSection, OmciDownloadSectionLast, OmciDownloadSectionResponse, \
OmciEndSoftwareDownload, OmciEndSoftwareDownloadResponse, \
OmciActivateImage, OmciActivateImageResponse, \
- OmciCommitImage, OmciCommitImageResponse, OmciCreateResponse
+ OmciCommitImage, OmciCommitImageResponse, OmciCreateResponse, OmciTestResponse, OmciTest
class OmciFrame(Packet):
@@ -109,10 +109,6 @@
PacketField("omci_message", None, OmciAttributeValueChange), align=36),
lambda pkt: pkt.message_type == OmciAttributeValueChange.message_id),
ConditionalField(FixedLenField(
- PacketField("omci_message", None, OmciTestResult), align=36),
- lambda pkt: pkt.message_type == OmciTestResult.message_id),
-
- ConditionalField(FixedLenField(
PacketField("omci_message", None, OmciReboot), align=36),
lambda pkt: pkt.message_type == OmciReboot.message_id),
ConditionalField(FixedLenField(
@@ -175,6 +171,16 @@
ConditionalField(FixedLenField(
PacketField("omci_message", None, OmciCommitImageResponse), align=36),
lambda pkt: pkt.message_type == OmciCommitImageResponse.message_id),
+ # Create Frame for Omci Test.
+ ConditionalField(FixedLenField(
+ PacketField("omci_message", None, OmciTest), align=36),
+ lambda pkt: pkt.message_type == OmciTest.message_id),
+ ConditionalField(FixedLenField(
+ PacketField("omci_message", None, OmciTestResponse), align=36),
+ lambda pkt: pkt.message_type == OmciTestResponse.message_id),
+ ConditionalField(FixedLenField(
+ PacketField("omci_message", None, OmciTestResult), align=36),
+ lambda pkt: pkt.message_type == OmciTestResult.message_id),
# TODO add entries for remaining OMCI message types
diff --git a/pyvoltha/adapters/extensions/omci/omci_messages.py b/pyvoltha/adapters/extensions/omci/omci_messages.py
index 018edf6..c8526da 100644
--- a/pyvoltha/adapters/extensions/omci/omci_messages.py
+++ b/pyvoltha/adapters/extensions/omci/omci_messages.py
@@ -336,18 +336,6 @@
OmciMaskedData("data")
]
-
-class OmciTestResult(OmciMessage):
- name = "TestResult"
- message_id = 0x1B
- fields_desc = [
- ShortField("entity_class", None),
- ShortField("entity_id", 0)
- # ME Test specific message contents starts here
- # TODO: Can this be coded easily with scapy?
- ]
-
-
class OmciReboot(OmciMessage):
name = "OmciOnuReboot"
message_id = 0x59
@@ -555,4 +543,36 @@
ShortField("entity_id", None),
ByteField("result", 0) # Activate image unconditionally
]
+
+class OmciTest(OmciMessage):
+ name = "OmciTest"
+ message_id = 0x52
+ fields_desc = [
+ ShortField("entity_class", None),
+ ShortField("entity_id", 0),
+ ShortField('self_test', 0x07)
+ ]
+
+
+class OmciTestResponse(OmciMessage):
+ name = "OmciTesResponse"
+ message_id = 0x32
+ fields_desc = [
+ ShortField("entity_class", None),
+ ShortField("entity_id", 0),
+ ByteField("success_code", None)
+ ]
+
+class OmciTestResult(OmciMessage):
+ name = "TestResult"
+ message_id = 0x1B
+ fields_desc = [
+ ShortField("entity_class", None),
+ ShortField("entity_id", 0),
+ ShortField("power_feed_voltage", 1),
+ ShortField('received_optical_power', 3),
+ ShortField('mean_optical_launch_power', 5),
+ ShortField('laser_bias_current', 9),
+ ShortField('temperature', 12)
+ ]
diff --git a/pyvoltha/adapters/extensions/omci/tasks/omci_test_request.py b/pyvoltha/adapters/extensions/omci/tasks/omci_test_request.py
new file mode 100644
index 0000000..4ee8502
--- /dev/null
+++ b/pyvoltha/adapters/extensions/omci/tasks/omci_test_request.py
@@ -0,0 +1,236 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import arrow
+from task import Task
+from twisted.internet.task import LoopingCall
+from twisted.internet.defer import failure, inlineCallbacks, TimeoutError, \
+ returnValue
+from pyvoltha.adapters.extensions.omci.omci_defs import ReasonCodes, \
+ EntityOperations
+from pyvoltha.adapters.extensions.omci.omci_me import MEFrame
+from pyvoltha.common.event_bus import EventBusClient
+from voltha_protos.events_pb2 import KpiEvent2, KpiEventType, KpiEvent
+from voltha_protos.events_pb2 import MetricInformation, MetricMetaData
+from voltha_protos.events_pb2 import Event, EventType, EventCategory, \
+ EventSubCategory, EventHeader
+from voltha_protos.events_pb2 import Event
+
+RC = ReasonCodes
+OP = EntityOperations
+
+
+class TestFailure(Exception):
+ pass
+
+
+class OmciTestRequest(Task):
+ """
+ OpenOMCI Test an OMCI ME Instance Attributes
+
+ Upon completion, the Task deferred callback is invoked with a reference of
+ this Task object.
+
+ """
+ task_priority = 128
+ name = "ONU OMCI Test Task"
+ MAX_TABLE_SIZE = 16 * 1024 # Keep get-next logic reasonable
+ OPTICAL_GROUP_NAME = 'PON_Optical'
+ DEFAULT_COLLECTION_FREQUENCY = 600 * 10 # 10 minutes
+ DEFAULT_FREQUENCY_KEY = 'default-collection-frequency'
+
+ def __init__(self, core_proxy, omci_agent, device_id, entity_class,
+ serial_number,
+ logical_device_id,
+ exclusive=True, allow_failure=False, **kwargs):
+ """
+ Class initialization
+
+ :param omci_agent: (OmciAdapterAgent) OMCI Adapter agent
+ :param device_id: (str) ONU Device ID
+ :param entity_class: (EntityClass) ME Class to retrieve
+ :param entity_id: (int) ME Class instance ID to retrieve
+ :param attributes: (list or set) Name of attributes to retrieve
+ :param exclusive: (bool) True if this GET request Task exclusively own the
+ OMCI-CC while running. Default: True
+ :param allow_failure: (bool) If true, attempt to get all valid attributes
+ if the original request receives an error
+ code of 9 (Attributes failed or unknown).
+ """
+ super(OmciTestRequest, self).__init__(OmciTestRequest.name,
+ omci_agent,
+ device_id,
+ priority=OmciTestRequest.task_priority,
+ exclusive=exclusive)
+ self._device = omci_agent.get_device(device_id)
+ self._entity_class = entity_class
+ self._allow_failure = allow_failure
+ self._failed_or_unknown_attributes = set()
+ self._results = None
+ self._local_deferred = None
+ self.device_id = device_id
+ self.event_bus = EventBusClient()
+ self.lc = None
+ self.default_freq = \
+ kwargs.get(OmciTestRequest.DEFAULT_FREQUENCY_KEY,
+ OmciTestRequest.DEFAULT_COLLECTION_FREQUENCY)
+ self.serial_number = serial_number
+ self.logical_device_id = logical_device_id
+ self.core_proxy = core_proxy
+ topic = 'omci-rx:{}:{}'.format(self.device_id, 'Test_Result')
+ self.msg = self.event_bus.subscribe(topic, self.process_messages)
+
+ def cancel_deferred(self):
+ """
+
+ :return: None
+ """
+ super(OmciTestRequest, self).cancel_deferred()
+
+ d, self._local_deferred = self._local_deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ @property
+ def me_class(self):
+ """The OMCI Managed Entity Class associated with this request"""
+ return self._entity_class
+
+ @property
+ def entity_id(self):
+ """The ME Entity ID associated with this request"""
+ return self._entity_id
+
+ @property
+ def success_code(self):
+ """
+ Return the OMCI success/reason code for the Get Response.
+ """
+ if self._results is None:
+ return None
+ return self._results.fields['omci_message'].fields['success']
+
+ def start_collector(self, callback=None):
+ """
+ Start the collection loop for an adapter if the frequency > 0
+
+ :param callback: (callable) Function to call to collect PM data
+ """
+ self.log.info("starting-pm-collection", device_name=self.name)
+ if callback is None:
+ callback = self.perform_test_omci
+
+ if self.lc is None:
+ self.lc = LoopingCall(callback)
+
+ if self.default_freq > 0:
+ self.lc.start(interval=self.default_freq / 10)
+
+ def format_id(self, event):
+ return 'voltha.{}.{}.{}'.format(self.core_proxy.listening_topic,
+ self.device_id, event)
+
+ def get_event_header(self, _type, category, sub_category, event, raised_ts):
+ """
+
+ :return: (dict) Event header
+ """
+ return EventHeader(id=self.format_id(event),
+ category=category,
+ sub_category=sub_category,
+ type=_type,
+ type_version="0.1",
+ raised_ts=raised_ts,
+ reported_ts=arrow.utcnow().timestamp
+ )
+
+ def publish_metrics(self, data, event_name, onu_device_id):
+ """
+
+ :param data: actual test result dict
+ :param event_name: Test_result
+ :param onu_device_id: Onu device id
+ :return: None
+ """
+ metric_data = MetricInformation(
+ metadata=MetricMetaData(title=OmciTestRequest.OPTICAL_GROUP_NAME,
+ ts=arrow.utcnow().float_timestamp,
+ logical_device_id=self.logical_device_id,
+ serial_no=self.serial_number,
+ device_id=onu_device_id,
+ context={
+ 'events': event_name
+ }),
+ metrics=data)
+ self.log.info('Publish-Test-Result')
+ event_header = self.get_event_header(EventType.KPI_EVENT2,
+ EventCategory.EQUIPMENT,
+ EventSubCategory.ONU, "KPI_EVENT",
+ arrow.utcnow().timestamp)
+ kpi_event = KpiEvent2(
+ type=KpiEventType.slice,
+ ts=arrow.utcnow().float_timestamp,
+ slice_data=[metric_data])
+ event = Event(header=event_header, kpi_event2=kpi_event)
+ self.core_proxy.submit_event(event)
+
+ def process_messages(self, topic, msg):
+ """
+
+ :param topic: topic name of onu.
+ :param msg: actual test result dict
+ :return: None
+ """
+ result_frame = {}
+ event_name = topic.split(':')[-1]
+ onu_device_id = topic.split(':')[-2]
+ frame = msg['rx-response']
+ for key, value in (frame.fields['omci_message'].fields).iteritems():
+ result_frame[key] = long(value)
+ self.publish_metrics(result_frame, event_name, onu_device_id)
+
+ @inlineCallbacks
+ def perform_test_omci(self):
+ """
+ Perform the initial test request
+ """
+ ani_g_entities = self._device.configuration.ani_g_entities
+ ani_g_entities_ids = ani_g_entities.keys() if ani_g_entities \
+ is not None else None
+ self._entity_id = ani_g_entities_ids[0]
+ self.log.info('perform-test', entity_class=self._entity_class,
+ entity_id=self._entity_id)
+ try:
+ frame = MEFrame(self._entity_class, self._entity_id, []).test()
+ result = yield self._device.omci_cc.send(frame)
+ if not result.fields['omci_message'].fields['success_code']:
+ self.log.info('Self-Test Submitted Successfully',
+ code=result.fields[
+ 'omci_message'].fields['success_code'])
+ else:
+ raise TestFailure('Test Failure: {}'.format(
+ result.fields['omci_message'].fields['success_code']))
+ except TimeoutError as e:
+ self.deferred.errback(failure.Failure(e))
+
+ except Exception as e:
+ self.log.exception('perform-test-Error', e=e,
+ class_id=self._entity_class,
+ entity_id=self._entity_id)
+ self.deferred.errback(failure.Failure(e))