SEBA-595 #comment 1.Implemented Omci Test Action 2.Added test frame to me_frame 3.Pushed Result to kafka bus 4.Added Omci message object for OMCITEST, OMCITESTRESPONSE and OMCITESTRESULT 5. Removed comments from TestResult field of Omci_Message. 6. Added new state for omci test capabilities.
Change-Id: Ib00604983f222a47930b5ebd763e3559bc7eda18
SEBA-595 #comment 1.Removed test state machine flow files. 2.Added PM flow from brcm_handler~
Change-Id: Ib00604983f222a47930b5ebd763e3559bc7eda18
SEBA-595 removed spaces and removed unwanted import
Change-Id: Ib00604983f222a47930b5ebd763e3559bc7eda18
SEBA-595 minor change
Change-Id: Ib00604983f222a47930b5ebd763e3559bc7eda18
diff --git a/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu.py b/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu.py
index 8df0b09..09345ea 100644
--- a/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu.py
+++ b/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu.py
@@ -78,9 +78,7 @@
self.broadcom_omci['mib-synchronizer']['state-machine'] = BrcmMibSynchronizer
self.broadcom_omci['omci-capabilities']['tasks']['get-capabilities'] = BrcmCapabilitiesTask
-
# Defer creation of omci agent to a lazy init that allows subclasses to override support classes
-
# register for adapter messages
self.adapter_agent.register_for_inter_adapter_messages()
@@ -342,4 +340,3 @@
def unsuppress_alarm(self, filter):
raise NotImplementedError()
-
diff --git a/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py b/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
index b87d638..2e66817 100644
--- a/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
+++ b/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
@@ -55,6 +55,8 @@
from voltha.adapters.brcm_openomci_onu.uni_port import *
from voltha.adapters.brcm_openomci_onu.onu_traffic_descriptor import *
from common.tech_profile.tech_profile import TechProfile
+from voltha.extensions.omci.tasks.omci_test_request import OmciTestRequest
+from voltha.extensions.omci.omci_entities import AniG
OP = EntityOperations
RC = ReasonCodes
@@ -250,6 +252,20 @@
# Start collecting stats from the device after a brief pause
reactor.callLater(10, self.pm_metrics.start_collector)
+
+ # Code to Run OMCI Test Action
+
+ kwargs_omci_test_action = {
+ OmciTestRequest.DEFAULT_FREQUENCY_KEY:
+ OmciTestRequest.DEFAULT_COLLECTION_FREQUENCY
+ }
+ device = self.adapter_agent.get_device(self.device_id)
+ serial_number = device.serial_number
+ test_request = OmciTestRequest(
+ self.omci_agent, self.device_id, AniG, serial_number,
+ self.logical_device_id, exclusive=False,
+ **kwargs_omci_test_action)
+ reactor.callLater(10, test_request.start_collector)
self.enabled = True
else:
self.log.info('onu-already-activated')
diff --git a/voltha/extensions/omci/me_frame.py b/voltha/extensions/omci/me_frame.py
index b6a4133..0350eb6 100644
--- a/voltha/extensions/omci/me_frame.py
+++ b/voltha/extensions/omci/me_frame.py
@@ -476,4 +476,19 @@
entity_class=getattr(self.entity_class, 'class_id'),
entity_id=getattr(self, 'entity_id'),
))
-
+
+ def test(self):
+ """
+ Create a test request frame for this ME
+ :return: (OmciFrame) OMCI Frame
+ """
+ self._check_operation(OP.Test)
+
+ return OmciFrame(
+ transaction_id=None,
+ message_type=OmciTest.message_id,
+ omci_message=OmciTest(
+ entity_class=getattr(self.entity_class, 'class_id'),
+ entity_id=getattr(self, 'entity_id'),
+ self_test=0x07
+ ))
diff --git a/voltha/extensions/omci/omci_cc.py b/voltha/extensions/omci/omci_cc.py
index 7d0874c..8ee1a9c 100644
--- a/voltha/extensions/omci/omci_cc.py
+++ b/voltha/extensions/omci/omci_cc.py
@@ -356,7 +356,10 @@
omci_entities.entity_id_to_class_map = saved_me_map # Always restore it.
rx_tid = rx_frame.fields['transaction_id']
- if rx_tid == 0:
+ msg_type = rx_frame.fields['message_type']
+ # Filter the Test Result frame and route through receive onu
+ # message method.
+ if rx_tid == 0 or msg_type == EntityOperations.TestResult.value:
return self._receive_onu_message(rx_frame)
# Previously unreachable if this is the very first round-trip Rx or we
diff --git a/voltha/extensions/omci/omci_frame.py b/voltha/extensions/omci/omci_frame.py
index c0d7d4a..d684003 100644
--- a/voltha/extensions/omci/omci_frame.py
+++ b/voltha/extensions/omci/omci_frame.py
@@ -31,7 +31,7 @@
OmciDownloadSection, OmciDownloadSectionLast, OmciDownloadSectionResponse, \
OmciEndSoftwareDownload, OmciEndSoftwareDownloadResponse, \
OmciActivateImage, OmciActivateImageResponse, \
- OmciCommitImage, OmciCommitImageResponse
+ OmciCommitImage, OmciCommitImageResponse, OmciTest, OmciTestResponse
from voltha.extensions.omci.omci_messages import OmciCreateResponse
@@ -111,10 +111,6 @@
PacketField("omci_message", None, OmciAttributeValueChange), align=36),
lambda pkt: pkt.message_type == OmciAttributeValueChange.message_id),
ConditionalField(FixedLenField(
- PacketField("omci_message", None, OmciTestResult), align=36),
- lambda pkt: pkt.message_type == OmciTestResult.message_id),
-
- ConditionalField(FixedLenField(
PacketField("omci_message", None, OmciReboot), align=36),
lambda pkt: pkt.message_type == OmciReboot.message_id),
ConditionalField(FixedLenField(
@@ -177,6 +173,16 @@
ConditionalField(FixedLenField(
PacketField("omci_message", None, OmciCommitImageResponse), align=36),
lambda pkt: pkt.message_type == OmciCommitImageResponse.message_id),
+ # Create Frame for Omci Test.
+ ConditionalField(FixedLenField(
+ PacketField("omci_message", None, OmciTest), align=36),
+ lambda pkt: pkt.message_type == OmciTest.message_id),
+ ConditionalField(FixedLenField(
+ PacketField("omci_message", None, OmciTestResponse), align=36),
+ lambda pkt: pkt.message_type == OmciTestResponse.message_id),
+ ConditionalField(FixedLenField(
+ PacketField("omci_message", None, OmciTestResult), align=36),
+ lambda pkt: pkt.message_type == OmciTestResult.message_id),
# TODO add entries for remaining OMCI message types
diff --git a/voltha/extensions/omci/omci_messages.py b/voltha/extensions/omci/omci_messages.py
index 70a5704..9299c08 100644
--- a/voltha/extensions/omci/omci_messages.py
+++ b/voltha/extensions/omci/omci_messages.py
@@ -338,17 +338,6 @@
]
-class OmciTestResult(OmciMessage):
- name = "TestResult"
- message_id = 0x1B
- fields_desc = [
- ShortField("entity_class", None),
- ShortField("entity_id", 0)
- # ME Test specific message contents starts here
- # TODO: Can this be coded easily with scapy?
- ]
-
-
class OmciReboot(OmciMessage):
name = "OmciOnuReboot"
message_id = 0x59
@@ -567,3 +556,35 @@
ShortField("entity_id", None),
ByteField("result", 0) # Activate image unconditionally
]
+
+class OmciTest(OmciMessage):
+ name = "OmciTest"
+ message_id = 0x52
+ fields_desc = [
+ ShortField("entity_class", None),
+ ShortField("entity_id", 0),
+ ShortField('self_test', 0x07)
+ ]
+
+
+class OmciTestResponse(OmciMessage):
+ name = "OmciTesResponse"
+ message_id = 0x32
+ fields_desc = [
+ ShortField("entity_class", None),
+ ShortField("entity_id", 0),
+ ByteField("success_code", None)
+ ]
+
+class OmciTestResult(OmciMessage):
+ name = "TestResult"
+ message_id = 0x1B
+ fields_desc = [
+ ShortField("entity_class", None),
+ ShortField("entity_id", 0),
+ ShortField("power_feed_voltage", 1),
+ ShortField('received_optical_power', 3),
+ ShortField('mean_optical_launch_power', 5),
+ ShortField('laser_bias_current', 9),
+ ShortField('temperature', 12)
+ ]
diff --git a/voltha/extensions/omci/openomci_agent.py b/voltha/extensions/omci/openomci_agent.py
index b47fbab..752d7ca 100644
--- a/voltha/extensions/omci/openomci_agent.py
+++ b/voltha/extensions/omci/openomci_agent.py
@@ -55,7 +55,7 @@
'state-machine': OnuOmciCapabilities, # Implements OMCI capabilities state machine
'advertise-events': False, # Advertise events on OpenOMCI event bus
'tasks': {
- 'get-capabilities': OnuCapabilitiesTask # Get supported ME and Commands
+ 'get-capabilities': OnuCapabilitiesTask # Get supported ME and Commands
}
},
'performance-intervals': {
diff --git a/voltha/extensions/omci/tasks/omci_test_request.py b/voltha/extensions/omci/tasks/omci_test_request.py
new file mode 100644
index 0000000..cfe250c
--- /dev/null
+++ b/voltha/extensions/omci/tasks/omci_test_request.py
@@ -0,0 +1,225 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import arrow
+from task import Task
+from twisted.internet.task import LoopingCall
+from twisted.internet.defer import failure, inlineCallbacks, TimeoutError
+from voltha.extensions.omci.omci_defs import ReasonCodes, EntityOperations
+from voltha.extensions.omci.omci_me import MEFrame
+from common.event_bus import EventBusClient
+from voltha.protos.events_pb2 import KpiEvent2, KpiEventType, KpiEvent
+from voltha.protos.events_pb2 import MetricInformation, MetricMetaData
+
+RC = ReasonCodes
+OP = EntityOperations
+
+
+class TestFailure(Exception):
+ pass
+
+
+class OmciTestRequest(Task):
+ """
+ OpenOMCI Test an OMCI ME Instance Attributes
+
+ Upon completion, the Task deferred callback is invoked with a reference of
+ this Task object.
+
+ """
+ task_priority = 128
+ name = "ONU OMCI Test Task"
+ MAX_TABLE_SIZE = 16 * 1024 # Keep get-next logic reasonable
+ OPTICAL_GROUP_NAME = 'PON_Optical'
+ DEFAULT_COLLECTION_FREQUENCY = 60 * 10 # 1 minute
+ DEFAULT_FREQUENCY_KEY = 'default-collection-frequency'
+
+ def __init__(self, omci_agent, device_id, entity_class, serial_number,
+ logical_device_id,
+ exclusive=True, allow_failure=False, **kwargs):
+ """
+ Class initialization
+
+ :param omci_agent: (OmciAdapterAgent) OMCI Adapter agent
+ :param device_id: (str) ONU Device ID
+ :param entity_class: (EntityClass) ME Class to retrieve
+ :param entity_id: (int) ME Class instance ID to retrieve
+ :param attributes: (list or set) Name of attributes to retrieve
+ :param exclusive: (bool) True if this GET request Task exclusively own the
+ OMCI-CC while running. Default: True
+ :param allow_failure: (bool) If true, attempt to get all valid attributes
+ if the original request receives an error
+ code of 9 (Attributes failed or unknown).
+ """
+ super(OmciTestRequest, self).__init__(OmciTestRequest.name,
+ omci_agent,
+ device_id,
+ priority=OmciTestRequest.task_priority,
+ exclusive=exclusive)
+ self._device = omci_agent.get_device(device_id)
+ self._entity_class = entity_class
+ self._allow_failure = allow_failure
+ self._failed_or_unknown_attributes = set()
+ self._results = None
+ self._local_deferred = None
+ self.device_id = device_id
+ self.event_bus = EventBusClient()
+ self.lc = None
+ self.default_freq = self.default_freq = \
+ kwargs.get(OmciTestRequest.DEFAULT_FREQUENCY_KEY,
+ OmciTestRequest.DEFAULT_COLLECTION_FREQUENCY)
+ self.serial_number = serial_number
+ self.logical_device_id = logical_device_id
+
+ def cancel_deferred(self):
+ """
+
+ :return: None
+ """
+ super(OmciTestRequest, self).cancel_deferred()
+
+ d, self._local_deferred = self._local_deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ @property
+ def me_class(self):
+ """The OMCI Managed Entity Class associated with this request"""
+ return self._entity_class
+
+ @property
+ def entity_id(self):
+ """The ME Entity ID associated with this request"""
+ return self._entity_id
+
+ @property
+ def success_code(self):
+ """
+ Return the OMCI success/reason code for the Get Response.
+ """
+ if self._results is None:
+ return None
+ return self._results.fields['omci_message'].fields['success']
+
+ def start_collector(self, callback=None):
+ """
+ Start the collection loop for an adapter if the frequency > 0
+
+ :param callback: (callable) Function to call to collect PM data
+ """
+ self.log.info("starting-pm-collection", device_name=self.name)
+ if callback is None:
+ callback = self.perform_test_omci
+
+ if self.lc is None:
+ self.lc = LoopingCall(callback)
+
+ if self.default_freq > 0:
+ self.lc.start(interval=self.default_freq / 10)
+
+ def submit_kpis(self, kpi_event):
+ """
+
+ :param kpi_event: List of dict.actual event information.
+ :return: None
+ """
+ try:
+ assert isinstance(kpi_event, (KpiEvent, KpiEvent2))
+ self.event_bus.publish('kpis', kpi_event)
+ except Exception as e:
+ self.log.exception('failed-kpi-submission',
+ type=type(kpi_event))
+
+ def publish_metrics(self, data, event_name, onu_device_id):
+ """
+
+ :param data: actual test result dict
+ :param event_name: Test_result
+ :param onu_device_id: Onu device id
+ :return: None
+ """
+ metric_data = MetricInformation(
+ metadata=MetricMetaData(title=OmciTestRequest.OPTICAL_GROUP_NAME,
+ ts=arrow.utcnow().float_timestamp,
+ logical_device_id=self.logical_device_id,
+ serial_no=self.serial_number,
+ device_id=onu_device_id,
+ context={
+ 'events': event_name
+ }),
+ metrics=data)
+ self.log.info('Publish-Test-Result')
+ kpi_event = KpiEvent2(
+ type=KpiEventType.slice,
+ ts=arrow.utcnow().float_timestamp,
+ slice_data=[metric_data])
+
+ self.submit_kpis(kpi_event)
+
+ def process_messages(self, topic, msg):
+ """
+
+ :param topic: topic name of onu.
+ :param msg: actual test result dict
+ :return: None
+ """
+ result_frame = {}
+ event_name = topic.split(':')[-1]
+ onu_device_id = topic.split(':')[-2]
+ frame = msg['rx-response']
+ for key, value in (frame.fields['omci_message'].fields).iteritems():
+ result_frame[key] = long(value)
+ self.publish_metrics(result_frame, event_name, onu_device_id)
+
+ def read_from_event_bus(self):
+ """
+ Get the test action result from event bus.
+ :return: None
+ """
+ topic = 'omci-rx:{}:{}'.format(self.device_id, 'Test_Result')
+ self.msg = self.event_bus.subscribe(topic, self.process_messages)
+
+ @inlineCallbacks
+ def perform_test_omci(self):
+ """
+ Perform the initial test request
+ """
+ ani_g_entities = self._device.configuration.ani_g_entities
+ ani_g_entities_ids = ani_g_entities.keys() if ani_g_entities \
+ is not None else None
+ self._entity_id = ani_g_entities_ids[0]
+
+ self.log.info('perform-test', entity_class=self._entity_class,
+ entity_id=self._entity_id)
+ try:
+ frame = MEFrame(self._entity_class, self._entity_id, []).test()
+ self.strobe_watchdog()
+ result = yield self._device.omci_cc.send(frame)
+ if not result.fields['omci_message'].fields['success_code']:
+ self.read_from_event_bus()
+ else:
+ raise TestFailure('Test Failure: {}'.format(
+ result.fields['omci_message'].fields['success_code']))
+ except TimeoutError as e:
+ self.deferred.errback(failure.Failure(e))
+
+ except Exception as e:
+ self.log.exception('perform-test', e=e, class_id=self._entity_class,
+ entity_id=self._entity_id)
+ self.deferred.errback(failure.Failure(e))