VOL-1212: Deprecated alarm, avc, and test result message queues
Change-Id: Icb4da4fd23484b2375f6e776fa665bea7f3d5dad
diff --git a/tests/utests/voltha/extensions/omci/test_omci_cc.py b/tests/utests/voltha/extensions/omci/test_omci_cc.py
index 210d435..102e017 100644
--- a/tests/utests/voltha/extensions/omci/test_omci_cc.py
+++ b/tests/utests/voltha/extensions/omci/test_omci_cc.py
@@ -122,8 +122,6 @@
'rx_frames': omci_cc.rx_frames,
'rx_unknown_tid': omci_cc.rx_unknown_tid,
'rx_onu_frames': omci_cc.rx_onu_frames,
- 'rx_alarm_overflow': omci_cc.rx_alarm_overflow,
- 'rx_avc_overflow': omci_cc.rx_avc_overflow,
'rx_onu_discards': omci_cc.rx_onu_discards,
'rx_timeouts': omci_cc.rx_timeouts,
'rx_unknown_me': omci_cc.rx_unknown_me,
@@ -155,8 +153,6 @@
self.assertEqual(omci_cc.rx_frames, 0)
self.assertEqual(omci_cc.rx_unknown_tid, 0)
self.assertEqual(omci_cc.rx_onu_frames, 0)
- self.assertEqual(omci_cc.rx_alarm_overflow, 0)
- self.assertEqual(omci_cc.rx_avc_overflow, 0)
self.assertEqual(omci_cc.rx_onu_discards, 0)
self.assertEqual(omci_cc.rx_unknown_me, 0)
self.assertEqual(omci_cc.rx_timeouts, 0)
@@ -234,8 +230,6 @@
# d.addCallback(self._check_stats, snapshot, 'rx_frames', snapshot['rx_frames'] + 1)
# d.addCallback(self._check_stats, snapshot, 'rx_unknown_tid', snapshot['rx_unknown_tid'])
# d.addCallback(self._check_stats, snapshot, 'rx_onu_frames', snapshot['rx_onu_frames'])
- # d.addCallback(self._check_stats, snapshot, 'rx_alarm_overflow', snapshot['rx_alarm_overflow'])
- # d.addCallback(self._check_stats, snapshot, 'rx_avc_overflow', snapshot['rx_avc_overflow'])
# d.addCallback(self._check_stats, snapshot, 'rx_onu_discards', snapshot['rx_onu_discards'])
# d.addCallback(self._check_stats, snapshot, 'rx_unknown_me', snapshot['rx_unknown_me'])
# d.addCallback(self._check_stats, snapshot, 'rx_timeouts', snapshot['rx_timeouts'])
@@ -266,8 +260,6 @@
# d.addCallback(self._check_stats, snapshot, 'rx_frames', snapshot['rx_frames'] + 1)
# d.addCallback(self._check_stats, snapshot, 'rx_unknown_tid', snapshot['rx_unknown_tid'])
# d.addCallback(self._check_stats, snapshot, 'rx_onu_frames', snapshot['rx_onu_frames'])
- # d.addCallback(self._check_stats, snapshot, 'rx_alarm_overflow', snapshot['rx_alarm_overflow'])
- # d.addCallback(self._check_stats, snapshot, 'rx_avc_overflow', snapshot['rx_avc_overflow'])
# d.addCallback(self._check_stats, snapshot, 'rx_onu_discards', snapshot['rx_onu_discards'])
# d.addCallback(self._check_stats, snapshot, 'rx_unknown_me', snapshot['rx_unknown_me'])
# d.addCallback(self._check_stats, snapshot, 'rx_timeouts', snapshot['rx_timeouts'])
@@ -305,8 +297,6 @@
# d.addCallback(self._check_stats, snapshot, 'rx_frames', snapshot['rx_frames'] + 1)
# d.addCallback(self._check_stats, snapshot, 'rx_unknown_tid', snapshot['rx_unknown_tid'])
# d.addCallback(self._check_stats, snapshot, 'rx_onu_frames', snapshot['rx_onu_frames'])
- # d.addCallback(self._check_stats, snapshot, 'rx_alarm_overflow', snapshot['rx_alarm_overflow'])
- # d.addCallback(self._check_stats, snapshot, 'rx_avc_overflow', snapshot['rx_avc_overflow'])
# d.addCallback(self._check_stats, snapshot, 'rx_onu_discards', snapshot['rx_onu_discards'])
# d.addCallback(self._check_stats, snapshot, 'rx_unknown_me', snapshot['rx_unknown_me'])
# d.addCallback(self._check_stats, snapshot, 'rx_timeouts', snapshot['rx_timeouts'])
@@ -337,8 +327,6 @@
# d.addCallback(self._check_stats, snapshot, 'rx_frames', snapshot['rx_frames'] + 1)
# d.addCallback(self._check_stats, snapshot, 'rx_unknown_tid', snapshot['rx_unknown_tid'])
# d.addCallback(self._check_stats, snapshot, 'rx_onu_frames', snapshot['rx_onu_frames'])
- # d.addCallback(self._check_stats, snapshot, 'rx_alarm_overflow', snapshot['rx_alarm_overflow'])
- # d.addCallback(self._check_stats, snapshot, 'rx_avc_overflow', snapshot['rx_avc_overflow'])
# d.addCallback(self._check_stats, snapshot, 'rx_onu_discards', snapshot['rx_onu_discards'])
# d.addCallback(self._check_stats, snapshot, 'rx_unknown_me', snapshot['rx_unknown_me'])
# d.addCallback(self._check_stats, snapshot, 'rx_timeouts', snapshot['rx_timeouts'])
@@ -366,8 +354,6 @@
d.addCallback(self._check_stats, snapshot, 'rx_frames', snapshot['rx_frames'] + 1)
d.addCallback(self._check_stats, snapshot, 'rx_unknown_tid', snapshot['rx_unknown_tid'])
d.addCallback(self._check_stats, snapshot, 'rx_onu_frames', snapshot['rx_onu_frames'])
- d.addCallback(self._check_stats, snapshot, 'rx_alarm_overflow', snapshot['rx_alarm_overflow'])
- d.addCallback(self._check_stats, snapshot, 'rx_avc_overflow', snapshot['rx_avc_overflow'])
d.addCallback(self._check_stats, snapshot, 'rx_onu_discards', snapshot['rx_onu_discards'])
d.addCallback(self._check_stats, snapshot, 'rx_unknown_me', snapshot['rx_unknown_me'])
d.addCallback(self._check_stats, snapshot, 'rx_timeouts', snapshot['rx_timeouts'])
@@ -396,8 +382,6 @@
d.addCallback(self._check_stats, snapshot, 'rx_frames', snapshot['rx_frames'] + 1)
d.addCallback(self._check_stats, snapshot, 'rx_unknown_tid', snapshot['rx_unknown_tid'])
d.addCallback(self._check_stats, snapshot, 'rx_onu_frames', snapshot['rx_onu_frames'])
- d.addCallback(self._check_stats, snapshot, 'rx_alarm_overflow', snapshot['rx_alarm_overflow'])
- d.addCallback(self._check_stats, snapshot, 'rx_avc_overflow', snapshot['rx_avc_overflow'])
d.addCallback(self._check_stats, snapshot, 'rx_onu_discards', snapshot['rx_onu_discards'])
d.addCallback(self._check_stats, snapshot, 'rx_unknown_me', snapshot['rx_unknown_me'])
d.addCallback(self._check_stats, snapshot, 'rx_timeouts', snapshot['rx_timeouts'])
@@ -426,8 +410,6 @@
# d.addCallback(self._check_stats, snapshot, 'rx_frames', snapshot['rx_frames'] + 1)
# d.addCallback(self._check_stats, snapshot, 'rx_unknown_tid', snapshot['rx_unknown_tid'])
# d.addCallback(self._check_stats, snapshot, 'rx_onu_frames', snapshot['rx_onu_frames'])
- # d.addCallback(self._check_stats, snapshot, 'rx_alarm_overflow', snapshot['rx_alarm_overflow'])
- # d.addCallback(self._check_stats, snapshot, 'rx_avc_overflow', snapshot['rx_avc_overflow'])
# d.addCallback(self._check_stats, snapshot, 'rx_onu_discards', snapshot['rx_onu_discards'])
# d.addCallback(self._check_stats, snapshot, 'rx_unknown_me', snapshot['rx_unknown_me'])
# d.addCallback(self._check_stats, snapshot, 'rx_timeouts', snapshot['rx_timeouts'])
@@ -452,8 +434,6 @@
d.addCallback(self._check_stats, snapshot, 'rx_frames', snapshot['rx_frames'] + 1)
d.addCallback(self._check_stats, snapshot, 'rx_unknown_tid', snapshot['rx_unknown_tid'])
d.addCallback(self._check_stats, snapshot, 'rx_onu_frames', snapshot['rx_onu_frames'])
- d.addCallback(self._check_stats, snapshot, 'rx_alarm_overflow', snapshot['rx_alarm_overflow'])
- d.addCallback(self._check_stats, snapshot, 'rx_avc_overflow', snapshot['rx_avc_overflow'])
d.addCallback(self._check_stats, snapshot, 'rx_onu_discards', snapshot['rx_onu_discards'])
d.addCallback(self._check_stats, snapshot, 'rx_unknown_me', snapshot['rx_unknown_me'])
d.addCallback(self._check_stats, snapshot, 'rx_timeouts', snapshot['rx_timeouts'])
@@ -635,6 +615,47 @@
self.assertTrue(True) # TODO: Implement
+ def test_rx_discard_if_disabled(self):
+ # ME without a known decoder
+ self.setup_one_of_each()
+
+ omci_cc = self.onu_handler.omci_cc
+ omci_cc.enabled = False
+ snapshot = self._snapshot_stats()
+
+ msg = '00fc2e0a00020000ff780000e00000010000000c' \
+ '0000000000000000000000000000000000000000' \
+ '00000028105a86ef'
+
+ omci_cc.receive_message(hex2raw(msg))
+
+ # Note: No counter increments
+ self.assertEqual(omci_cc.rx_frames, snapshot['rx_frames'])
+ self.assertEqual(omci_cc.rx_unknown_me, snapshot['rx_unknown_me'])
+ self.assertEqual(omci_cc.rx_unknown_tid, snapshot['rx_unknown_tid'])
+ self.assertEqual(omci_cc.rx_onu_frames, snapshot['rx_onu_frames'])
+ self.assertEqual(omci_cc.rx_unknown_tid, snapshot['rx_unknown_tid'])
+
+ def test_omci_alarm_decode(self):
+ """
+ This test covers an issue discovered in Sept 2018 (JIRA-1213). It was
+ an exception during frame decode.
+ """
+ self.setup_one_of_each()
+
+ omci_cc = self.onu_handler.omci_cc
+ omci_cc.enabled = True
+
+ # Frame from the JIRA issue
+ msg = '0000100a000b0102800000000000000000000000' \
+ '0000000000000000000000000000000000000015' \
+ '000000282d3ae0a6'
+
+ results = omci_cc.receive_message(hex2raw(msg))
+
+ self.assertTrue(True, 'Truth is the truth')
+
+
if __name__ == '__main__':
main()
diff --git a/voltha/extensions/omci/omci_cc.py b/voltha/extensions/omci/omci_cc.py
index 90b4b2a..326c8f8 100644
--- a/voltha/extensions/omci/omci_cc.py
+++ b/voltha/extensions/omci/omci_cc.py
@@ -29,10 +29,6 @@
from binascii import hexlify
-_MAX_INCOMING_ALARM_MESSAGES = 256
-_MAX_INCOMING_AVC_MESSAGES = 256
-_MAX_INCOMING_TEST_RESULT_MESSAGES = 64
-
DEFAULT_OMCI_TIMEOUT = 3 # Seconds
MAX_OMCI_REQUEST_AGE = 60 # Seconds
MAX_OMCI_TX_ID = 0xFFFF # 2 Octets max
@@ -77,10 +73,7 @@
OmciGetAllAlarmsNextResponse.message_id: RxEvent.Get_ALARM_Get_Next
}
- def __init__(self, adapter_agent, device_id, me_map=None,
- alarm_queue_limit=_MAX_INCOMING_ALARM_MESSAGES,
- avc_queue_limit=_MAX_INCOMING_ALARM_MESSAGES,
- test_results_queue_limit=_MAX_INCOMING_TEST_RESULT_MESSAGES):
+ def __init__(self, adapter_agent, device_id, me_map=None):
self.log = structlog.get_logger(device_id=device_id)
self._adapter_agent = adapter_agent
self._device_id = device_id
@@ -88,9 +81,6 @@
self._tx_tid = 1
self._enabled = False
self._requests = dict() # Tx ID -> (timestamp, deferred, tx_frame, timeout)
- self._alarm_queue = DeferredQueue(size=alarm_queue_limit)
- self._avc_queue = DeferredQueue(size=avc_queue_limit)
- self._test_results_queue = DeferredQueue(size=test_results_queue_limit)
self._me_map = me_map
# Statistics
@@ -98,8 +88,6 @@
self._rx_frames = 0
self._rx_unknown_tid = 0 # Rx OMCI with no Tx TID match
self._rx_onu_frames = 0 # Autonomously generated ONU frames
- self._rx_alarm_overflow = 0 # Autonomously generated ONU alarms rx overflow
- self._rx_avc_overflow = 0 # Autonomously generated ONU AVC rx overflow
self._rx_onu_discards = 0 # Autonomously generated ONU unknown message types
self._rx_timeouts = 0
self._rx_unknown_me = 0 # Number of managed entities Rx without a decode definition
@@ -172,14 +160,6 @@
return self._rx_onu_frames
@property
- def rx_alarm_overflow(self):
- return self._rx_alarm_overflow # Alarm ONU autonomous overflows
-
- @property
- def rx_avc_overflow(self):
- return self._rx_avc_overflow # Attribute Value change autonomous overflows
-
- @property
def rx_onu_discards(self):
return self._rx_onu_discards # Attribute Value change autonomous overflows
@@ -208,48 +188,6 @@
avg = self._reply_sum / self._rx_frames if self._rx_frames > 0 else 0.0
return int(round(avg * 1000.0)) # Milliseconds
- @property
- def get_alarm_message(self):
- """
- Attempt to retrieve and remove an ONU Alarm Message from the ONU
- autonomous message queue.
-
- TODO: We may want to deprecate this, see TODO comment around line 399 in
- the _request_success() method below
-
- :return: a Deferred which fires with the next Alarm Frame available in
- the queue.
- """
- return self._alarm_queue.get()
-
- @property
- def get_avc_message(self):
- """
- Attempt to retrieve and remove an ONU Attribute Value Change (AVC)
- Message from the ONU autonomous message queue.
-
- TODO: We may want to deprecate this, see TODO comment around line 399 in
- the _request_success() method below
-
- :return: a Deferred which fires with the next AVC Frame available in
- the queue.
- """
- return self._avc_queue.get()
-
- @property
- def get_test_results(self):
- """
- Attempt to retrieve and remove an ONU Test Results Message from the
- ONU autonomous message queue.
-
- TODO: We may want to deprecate this, see TODO comment around line 399 in
- the _request_success() method below
-
- :return: a Deferred which fires with the next Test Results Frame is
- available in the queue.
- """
- return self._test_results_queue.get()
-
def _start(self):
"""
Start the OMCI Communications Channel
@@ -268,20 +206,12 @@
self.flush()
self._proxy_address = None
- # TODO: What is best way to clean up any outstanding futures for these queues
- self._alarm_queue = None
- self._avc_queue = None
- self._test_results_queue = None
-
def _receive_onu_message(self, rx_frame):
""" Autonomously generated ONU frame Rx handler"""
- from twisted.internet.defer import QueueOverflow
self.log.debug('rx-onu-frame', frame_type=type(rx_frame),
frame=hexify(str(rx_frame)))
- # TODO: Signal, via defer if Alarm Overflow or just an event?
msg_type = rx_frame.fields['message_type']
-
self._rx_onu_frames += 1
msg = {TX_REQUEST_KEY: None,
@@ -290,34 +220,16 @@
if msg_type == EntityOperations.AlarmNotification.value:
topic = OMCI_CC.event_bus_topic(self._device_id, RxEvent.Alarm_Notification)
reactor.callLater(0, self.event_bus.publish, topic, msg)
- try:
- self._alarm_queue.put((rx_frame, arrow.utcnow().float_timestamp))
-
- except QueueOverflow:
- self._rx_alarm_overflow += 1
- self.log.warn('onu-rx-alarm-overflow', cnt=self._rx_alarm_overflow)
elif msg_type == EntityOperations.AttributeValueChange.value:
topic = OMCI_CC.event_bus_topic(self._device_id, RxEvent.AVC_Notification)
reactor.callLater(0, self.event_bus.publish, topic, msg)
- try:
- self._alarm_queue.put((rx_frame, arrow.utcnow().float_timestamp))
-
- except QueueOverflow:
- self._rx_avc_overflow += 1
- self.log.warn('onu-rx-avc-overflow', cnt=self._rx_avc_overflow)
elif msg_type == EntityOperations.TestResult.value:
topic = OMCI_CC.event_bus_topic(self._device_id, RxEvent.Test_Result)
reactor.callLater(0, self.event_bus.publish, topic, msg)
- try:
- self._test_results_queue.put((rx_frame, arrow.utcnow().float_timestamp))
-
- except QueueOverflow:
- self.log.warn('onu-rx-test-results-overflow')
else:
- # TODO: Need to add test results message support
self.log.warn('onu-unsupported-autonomous-message', type=msg_type)
self._rx_onu_discards += 1
@@ -326,6 +238,7 @@
Receive and OMCI message from the proxy channel to the OLT.
Call this from your ONU Adapter on a new OMCI Rx on the proxy channel
+ :param msg: (str) OMCI binary message (used as input to Scapy packet decoder)
"""
if self.enabled:
try:
@@ -502,14 +415,6 @@
self._requests = dict()
- if max_age == 0:
- # Flush autonomous messages (Alarms & AVCs)
- while self._alarm_queue.pending:
- _ = yield self._alarm_queue.get()
-
- while self._avc_queue.pending:
- _ = yield self._avc_queue.get()
-
def _get_tx_tid(self):
"""
Get the next Transaction ID for a tx. Note TID=0 is reserved
diff --git a/voltha/extensions/omci/tasks/omci_get_request.py b/voltha/extensions/omci/tasks/omci_get_request.py
index b199f8f..0db4e5d 100644
--- a/voltha/extensions/omci/tasks/omci_get_request.py
+++ b/voltha/extensions/omci/tasks/omci_get_request.py
@@ -196,7 +196,8 @@
self.deferred.callback(self)
except Exception as e:
- self.log.exception('perform-get', e=e)
+ self.log.exception('perform-get', e=e, class_id=self._entity_class,
+ entity_id=self._entity_id, attributes=self._attributes)
self.deferred.errback(failure.Failure(e))
@inlineCallbacks