VOL-1832: Preload tx audit and enqueue recv due to slow send
Change-Id: I64a2f45f04fb42fbae4456c6b9d4ef8cfcbc12d1
diff --git a/VERSION b/VERSION
index ee1372d..7179039 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.2.2
+0.2.3
diff --git a/pyvoltha/adapters/extensions/omci/omci_cc.py b/pyvoltha/adapters/extensions/omci/omci_cc.py
index e7f306b..e86504b 100644
--- a/pyvoltha/adapters/extensions/omci/omci_cc.py
+++ b/pyvoltha/adapters/extensions/omci/omci_cc.py
@@ -355,12 +355,15 @@
omci_entities.entity_id_to_class_map = saved_me_map # Always restore it.
rx_tid = rx_frame.fields['transaction_id']
+ self.log.debug('Received message for rx_tid', rx_tid = rx_tid)
if rx_tid == 0:
+ self.log.debug('Receive ONU message', rx_tid=0)
return self._receive_onu_message(rx_frame)
# Previously unreachable if this is the very first round-trip Rx or we
# have been running consecutive errors
if self._rx_frames == 0 or self._consecutive_errors != 0:
+ self.log.debug('Consecutive errors for rx', err = self._consecutive_errors)
self.reactor.callLater(0, self._publish_connectivity_event, True)
self._rx_frames += 1
@@ -376,6 +379,9 @@
if last_tx_tuple is None or \
last_tx_tuple[OMCI_CC.REQUEST_FRAME].fields.get('transaction_id') != rx_tid:
# Possible late Rx on a message that timed-out
+ if last_tx_tuple:
+ self.log.debug('Unknown message', rx_tid=rx_tid,
+ tx_id=last_tx_tuple[OMCI_CC.REQUEST_FRAME].fields.get('transaction_id'))
self._rx_unknown_tid += 1
self._rx_late += 1
return
@@ -389,6 +395,7 @@
# Late arrival already serviced by a timeout?
if d.called:
self._rx_late += 1
+ self.log.debug('Serviced by timeout. Late arrival', rx_late = self._rx_late)
return
except Exception as e:
@@ -398,6 +405,8 @@
return
# Publish Rx event to listeners in a different task
+ self.log.debug('Publish rx event', rx_tid = rx_tid,
+ tx_tid = tx_frame.fields['transaction_id'])
reactor.callLater(0, self._publish_rx_frame, tx_frame, rx_frame)
# begin success callback chain (will cancel timeout and queue next Tx message)
@@ -616,6 +625,8 @@
self._send_next_request(high_priority)
+ self.log.debug('inter-adapter-recv-omci', tid=rx_tid)
+
# Return rx_frame (to next item in callback list)
return rx_frame
@@ -691,6 +702,7 @@
if not self.enabled or self._proxy_address is None:
# TODO custom exceptions throughout this code would be helpful
self._tx_errors += 1
+ self.log.error("cannot-send-omci-msg", tx_errors=self._tx_errors, omci_cc_enabled=self._enabled, proxy_address=self._proxy_address)
return fail(result=failure.Failure(Exception('OMCI is not enabled')))
timeout = float(timeout)
@@ -804,14 +816,34 @@
try:
self._rx_response[index] = None
+ # NOTE: We preload the tx audit fields and enqueue ourselves for receive
+ # before sending, because the response could arrive before the yield on the send wakes up
+ self._tx_frames += 1
+
+ # Note: the 'd' deferred in the queued request we just got will
+ # already have its success callback queued (callLater -> 0) with a
+ # result of "queued". Here we need to time it out internally so
+ # we can call cleanup appropriately. G.988 mentions that most ONUs
+ # will process a request in < 1 second.
+ dc_timeout = timeout if timeout > 0 else 1.0
+
+ # Timeout on internal deferred to support internal retries if requested
+ dc = self.reactor.callLater(dc_timeout, self._request_timeout, tx_tid, high_priority)
+
+ # (timestamp, defer, frame, timeout, retry, delayedCall)
+ self._tx_request[index] = (ts, d, frame, timeout, retry, dc)
+
+ if timeout > 0:
+ d.addCallbacks(self._request_success, self._request_failure,
+ callbackArgs=(high_priority,),
+ errbackArgs=(tx_tid, high_priority))
+
omci_msg = InterAdapterOmciMessage(
message=hexify(str(frame)),
proxy_address=self._proxy_address,
connect_status=self._device.connect_status)
- self.log.debug('inter-adapter-send-omci', omci_msg=omci_msg,
- connect_status=self._device.connect_status,
- channel_id=self._proxy_address.channel_id)
+ self.log.debug('inter-adapter-send-omci', tid=tx_tid, omci_msg=omci_msg.message)
yield self._adapter_proxy.send_inter_adapter_message(
msg=omci_msg,
@@ -821,30 +853,10 @@
to_device_id=self._device_id,
proxy_device_id=self._proxy_address.device_id
)
-
+ self.log.debug('done-inter-adapter-send-message', tx_tid=tx_tid)
finally:
omci_entities.entity_id_to_class_map = saved_me_map
- self._tx_frames += 1
-
- # Note: the 'd' deferred in the queued request we just got will
- # already have its success callback queued (callLater -> 0) with a
- # result of "queued". Here we need time it out internally so
- # we can call cleanup appropriately. G.988 mentions that most ONUs
- # will process an request in < 1 second.
- dc_timeout = timeout if timeout > 0 else 1.0
-
- # Timeout on internal deferred to support internal retries if requested
- dc = self.reactor.callLater(dc_timeout, self._request_timeout, tx_tid, high_priority)
-
- # (timestamp, defer, frame, timeout, retry, delayedCall)
- self._tx_request[index] = (ts, d, frame, timeout, retry, dc)
-
- if timeout > 0:
- d.addCallbacks(self._request_success, self._request_failure,
- callbackArgs=(high_priority,),
- errbackArgs=(tx_tid, high_priority))
-
except IndexError:
pass # Nothing pending in this queue
diff --git a/pyvoltha/adapters/extensions/omci/onu_device_entry.py b/pyvoltha/adapters/extensions/omci/onu_device_entry.py
index dab0195..52de1f4 100644
--- a/pyvoltha/adapters/extensions/omci/onu_device_entry.py
+++ b/pyvoltha/adapters/extensions/omci/onu_device_entry.py
@@ -359,7 +359,7 @@
self._state_machines.append(sm)
sm.start()
- self._deferred = reactor.callLater(0, start_state_machines,
+ self._deferred = reactor.callLater(1, start_state_machines,
self._on_start_state_machines)
# Notify any event listeners
self._publish_device_status_event()
diff --git a/pyvoltha/adapters/extensions/omci/tasks/omci_get_request.py b/pyvoltha/adapters/extensions/omci/tasks/omci_get_request.py
index 70c01e9..7950944 100644
--- a/pyvoltha/adapters/extensions/omci/tasks/omci_get_request.py
+++ b/pyvoltha/adapters/extensions/omci/tasks/omci_get_request.py
@@ -274,7 +274,10 @@
elif status != RC.Success.value:
raise GetException('Get failed with status code: {}'.format(status))
- assert attr_mask == get_omci['attributes_mask'], 'wrong attribute'
+ # assert attr_mask == get_omci['attributes_mask'], 'wrong attribute'
+ if attr_mask != get_omci['attributes_mask']:
+ self.log.debug('attr mask does not match expected mask', attr_mask=attr_mask,
+ expected_mask = get_omci['attributes_mask'])
results_omci['attributes_mask'] |= attr_mask
if results_omci.get('data') is None: