VOL-1212: Deprecated alarm, avc, and test result message queues
Change-Id: Icb4da4fd23484b2375f6e776fa665bea7f3d5dad
diff --git a/voltha/extensions/omci/omci_cc.py b/voltha/extensions/omci/omci_cc.py
index 90b4b2a..326c8f8 100644
--- a/voltha/extensions/omci/omci_cc.py
+++ b/voltha/extensions/omci/omci_cc.py
@@ -29,10 +29,6 @@
from binascii import hexlify
-_MAX_INCOMING_ALARM_MESSAGES = 256
-_MAX_INCOMING_AVC_MESSAGES = 256
-_MAX_INCOMING_TEST_RESULT_MESSAGES = 64
-
DEFAULT_OMCI_TIMEOUT = 3 # Seconds
MAX_OMCI_REQUEST_AGE = 60 # Seconds
MAX_OMCI_TX_ID = 0xFFFF # 2 Octets max
@@ -77,10 +73,7 @@
OmciGetAllAlarmsNextResponse.message_id: RxEvent.Get_ALARM_Get_Next
}
- def __init__(self, adapter_agent, device_id, me_map=None,
- alarm_queue_limit=_MAX_INCOMING_ALARM_MESSAGES,
- avc_queue_limit=_MAX_INCOMING_ALARM_MESSAGES,
- test_results_queue_limit=_MAX_INCOMING_TEST_RESULT_MESSAGES):
+ def __init__(self, adapter_agent, device_id, me_map=None):
self.log = structlog.get_logger(device_id=device_id)
self._adapter_agent = adapter_agent
self._device_id = device_id
@@ -88,9 +81,6 @@
self._tx_tid = 1
self._enabled = False
self._requests = dict() # Tx ID -> (timestamp, deferred, tx_frame, timeout)
- self._alarm_queue = DeferredQueue(size=alarm_queue_limit)
- self._avc_queue = DeferredQueue(size=avc_queue_limit)
- self._test_results_queue = DeferredQueue(size=test_results_queue_limit)
self._me_map = me_map
# Statistics
@@ -98,8 +88,6 @@
self._rx_frames = 0
self._rx_unknown_tid = 0 # Rx OMCI with no Tx TID match
self._rx_onu_frames = 0 # Autonomously generated ONU frames
- self._rx_alarm_overflow = 0 # Autonomously generated ONU alarms rx overflow
- self._rx_avc_overflow = 0 # Autonomously generated ONU AVC rx overflow
self._rx_onu_discards = 0 # Autonomously generated ONU unknown message types
self._rx_timeouts = 0
self._rx_unknown_me = 0 # Number of managed entities Rx without a decode definition
@@ -172,14 +160,6 @@
return self._rx_onu_frames
@property
- def rx_alarm_overflow(self):
- return self._rx_alarm_overflow # Alarm ONU autonomous overflows
-
- @property
- def rx_avc_overflow(self):
- return self._rx_avc_overflow # Attribute Value change autonomous overflows
-
- @property
def rx_onu_discards(self):
return self._rx_onu_discards # Autonomously generated ONU frames discarded (unknown message types)
@@ -208,48 +188,6 @@
avg = self._reply_sum / self._rx_frames if self._rx_frames > 0 else 0.0
return int(round(avg * 1000.0)) # Milliseconds
- @property
- def get_alarm_message(self):
- """
- Attempt to retrieve and remove an ONU Alarm Message from the ONU
- autonomous message queue.
-
- TODO: We may want to deprecate this, see TODO comment around line 399 in
- the _request_success() method below
-
- :return: a Deferred which fires with the next Alarm Frame available in
- the queue.
- """
- return self._alarm_queue.get()
-
- @property
- def get_avc_message(self):
- """
- Attempt to retrieve and remove an ONU Attribute Value Change (AVC)
- Message from the ONU autonomous message queue.
-
- TODO: We may want to deprecate this, see TODO comment around line 399 in
- the _request_success() method below
-
- :return: a Deferred which fires with the next AVC Frame available in
- the queue.
- """
- return self._avc_queue.get()
-
- @property
- def get_test_results(self):
- """
- Attempt to retrieve and remove an ONU Test Results Message from the
- ONU autonomous message queue.
-
- TODO: We may want to deprecate this, see TODO comment around line 399 in
- the _request_success() method below
-
- :return: a Deferred which fires with the next Test Results Frame is
- available in the queue.
- """
- return self._test_results_queue.get()
-
def _start(self):
"""
Start the OMCI Communications Channel
@@ -268,20 +206,12 @@
self.flush()
self._proxy_address = None
- # TODO: What is best way to clean up any outstanding futures for these queues
- self._alarm_queue = None
- self._avc_queue = None
- self._test_results_queue = None
-
def _receive_onu_message(self, rx_frame):
""" Autonomously generated ONU frame Rx handler"""
- from twisted.internet.defer import QueueOverflow
self.log.debug('rx-onu-frame', frame_type=type(rx_frame),
frame=hexify(str(rx_frame)))
- # TODO: Signal, via defer if Alarm Overflow or just an event?
msg_type = rx_frame.fields['message_type']
-
self._rx_onu_frames += 1
msg = {TX_REQUEST_KEY: None,
@@ -290,34 +220,16 @@
if msg_type == EntityOperations.AlarmNotification.value:
topic = OMCI_CC.event_bus_topic(self._device_id, RxEvent.Alarm_Notification)
reactor.callLater(0, self.event_bus.publish, topic, msg)
- try:
- self._alarm_queue.put((rx_frame, arrow.utcnow().float_timestamp))
-
- except QueueOverflow:
- self._rx_alarm_overflow += 1
- self.log.warn('onu-rx-alarm-overflow', cnt=self._rx_alarm_overflow)
elif msg_type == EntityOperations.AttributeValueChange.value:
topic = OMCI_CC.event_bus_topic(self._device_id, RxEvent.AVC_Notification)
reactor.callLater(0, self.event_bus.publish, topic, msg)
- try:
- self._alarm_queue.put((rx_frame, arrow.utcnow().float_timestamp))
-
- except QueueOverflow:
- self._rx_avc_overflow += 1
- self.log.warn('onu-rx-avc-overflow', cnt=self._rx_avc_overflow)
elif msg_type == EntityOperations.TestResult.value:
topic = OMCI_CC.event_bus_topic(self._device_id, RxEvent.Test_Result)
reactor.callLater(0, self.event_bus.publish, topic, msg)
- try:
- self._test_results_queue.put((rx_frame, arrow.utcnow().float_timestamp))
-
- except QueueOverflow:
- self.log.warn('onu-rx-test-results-overflow')
else:
- # TODO: Need to add test results message support
self.log.warn('onu-unsupported-autonomous-message', type=msg_type)
self._rx_onu_discards += 1
@@ -326,6 +238,7 @@
Receive an OMCI message from the proxy channel to the OLT.
Call this from your ONU Adapter on a new OMCI Rx on the proxy channel
+ :param msg: (str) OMCI binary message (used as input to Scapy packet decoder)
"""
if self.enabled:
try:
@@ -502,14 +415,6 @@
self._requests = dict()
- if max_age == 0:
- # Flush autonomous messages (Alarms & AVCs)
- while self._alarm_queue.pending:
- _ = yield self._alarm_queue.get()
-
- while self._avc_queue.pending:
- _ = yield self._avc_queue.get()
-
def _get_tx_tid(self):
"""
Get the next Transaction ID for a tx. Note TID=0 is reserved