This update addresses the following:
1) Fix an issue with peer grpc connection after a vcore instance
is recycled.
2) Set the default peer grpc retry to 0 for improved API performance in a
cluster
3) Adjust the logs level in the global_handler and local_handler.
This basically removes unnecessary and performance impacting
info logs and set some logs to warning instead of info.
4) Add a test to detect whether the cluster is in a transient state.
During that time, the coordinator leader waits until all previously
running voltha instances are registered in Consul before upading the
core assignments. This means that to a previously running vcore will
remain in the assignment list during the transient period, hence some
API requests targetted that vcore will fail during that time. Once
all expected voltha instances are registered the coordinator leader
will update the core assignment key, hence subsequent API requests
should pass.
Change-Id: I918b6af8f9158444d9e612c490972493b9609344
diff --git a/voltha/coordinator.py b/voltha/coordinator.py
index a1777a1..0c2d746 100644
--- a/voltha/coordinator.py
+++ b/voltha/coordinator.py
@@ -271,7 +271,10 @@
try:
log.info('membership-record-before')
(_, record) = yield self._retry('GET',
- self.membership_record_key)
+ self.membership_record_key,
+ wait='5s',
+ index=0
+ )
log.info('membership-record-after', record=record)
if record is None or \
'Session' not in record or \
@@ -377,6 +380,8 @@
def _start_leader_tracking(self):
reactor.callLater(0, self._leadership_tracking_loop)
+
+
@inlineCallbacks
def _leadership_tracking_loop(self):
try:
@@ -518,16 +523,25 @@
self._clear_backoff()
break
except ConsulException, e:
- log.exception('consul-not-up', consul=self.consul,
- session=self.consul.Session, e=e)
+ log.exception('consul-not-up',
+ operation=operation,
+ args=args,
+ session=self.consul.Session,
+ e=e)
yield self._backoff('consul-not-up')
except ConnectionError, e:
log.exception('cannot-connect-to-consul',
- consul=self.consul, e=e)
+ operation=operation,
+ args=args,
+ session=self.consul.Session,
+ e=e)
yield self._backoff('cannot-connect-to-consul')
except StaleMembershipEntryException, e:
log.exception('stale-membership-record-in-the-way',
- consul=self.consul, e=e)
+ operation=operation,
+ args=args,
+ session=self.consul.Session,
+ e=e)
yield self._backoff('stale-membership-record-in-the-way')
except Exception, e:
if not self.shutting_down:
diff --git a/voltha/core/dispatcher.py b/voltha/core/dispatcher.py
index 3b7a217..be253b8 100644
--- a/voltha/core/dispatcher.py
+++ b/voltha/core/dispatcher.py
@@ -88,7 +88,7 @@
:param context: grpc context
:return: the response of that dispatching request
"""
- log.debug('start',
+ log.info('start',
_method_name=method_name,
id=id,
request=request)
@@ -173,12 +173,16 @@
request,
context)
# Then get peers results
+ log.info('maps', peers=self.peers_map, grpc=self.grpc_conn_map)
current_responses = [result]
for core_id in self.peers_map:
if core_id == self.core_store_id:
continue # already processed
- if self.peers_map[core_id] and self.grpc_conn_map[core_id]:
+ # As a safeguard, check whether the core_id is in the grpc map
+ if core_id not in self.grpc_conn_map:
+ log.warn('no-grpc-peer-connection', core=core_id)
+ elif self.peers_map[core_id] and self.grpc_conn_map[core_id]:
res = yield self._dispatch_to_peer(core_id,
method_name,
request,
@@ -248,7 +252,14 @@
try:
log.info('grpc-channel-refresh', to_open=to_open,
to_close=to_close)
- # First open the connection
+
+ # Close the unused connection
+ for id in to_close:
+ if self.grpc_conn_map[id]:
+ # clear connection
+ self._disconnect_from_peer(id)
+
+ # Open the new connections
for id, host in to_open.iteritems():
if id in self.grpc_conn_map and self.grpc_conn_map[id]:
# clear connection
@@ -257,11 +268,6 @@
self.grpc_conn_map[id] = \
yield self._connect_to_peer(host, self.grpc_port)
- # Close the unused connection
- for id in to_close:
- if self.grpc_conn_map[id]:
- # clear connection
- self._disconnect_from_peer(id)
except Exception, e:
log.exception('exception', e=e)
@@ -312,13 +318,14 @@
method_name,
request,
context,
- retry=1):
+ retry=0):
"""
Invoke a gRPC call to the remote server and return the response.
:param core_id: The voltha instance where this request needs to be sent
:param method_name: The method name inside the service stub
:param request: The request protobuf message
:param context: grprc context
+ :param retry: on failure, the number of times to retry.
:return: The response as a protobuf message
"""
log.debug('peer-dispatch',
@@ -347,7 +354,7 @@
code = e.code()
if code == grpc.StatusCode.UNAVAILABLE:
# Try to reconnect
- status = self._reconnect_to_peer(core_id)
+ status = yield self._reconnect_to_peer(core_id)
if status and retry > 0:
response = yield self._dispatch_to_peer(core_id,
method_name,
diff --git a/voltha/core/global_handler.py b/voltha/core/global_handler.py
index f626cc8..56604a5 100644
--- a/voltha/core/global_handler.py
+++ b/voltha/core/global_handler.py
@@ -140,14 +140,14 @@
Empty(),
context,
core_id=core_id)
- log.info('grpc-response', response=response)
+ log.debug('grpc-response', response=response)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Voltha Instance error')
context.set_code(response.error_code)
returnValue(VolthaInstance())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -158,7 +158,7 @@
Empty(),
context,
broadcast=True)
- log.info('grpc-response', response=response)
+ log.debug('grpc-response', response=response)
returnValue(response)
@twisted_async
@@ -170,15 +170,15 @@
request,
context,
id=request.id)
- log.info('grpc-response', response=response)
+ log.debug('grpc-response', response=response)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details(
'Logical device \'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(LogicalDevice())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -190,15 +190,15 @@
request,
context,
id=request.id)
- log.info('grpc-response', response=response)
+ log.debug('grpc-response', response=response)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details(
'Logical device ports \'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(Ports())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -209,15 +209,15 @@
request,
context,
id=request.id)
- log.info('grpc-response', response=response)
+ log.debug('grpc-response', response=response)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details(
'Logical device \'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(Flows())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -229,15 +229,15 @@
request,
context,
id=request.id)
- log.info('grpc-response', response=response)
+ log.debug('grpc-response', response=response)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details(
'Logical device \'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(Empty())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -249,15 +249,15 @@
request,
context,
id=request.id)
- log.info('grpc-response', response=response)
+ log.debug('grpc-response', response=response)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details(
'Logical device \'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(FlowGroups())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -269,15 +269,15 @@
request,
context,
id=request.id)
- log.info('grpc-response', response=response)
+ log.debug('grpc-response', response=response)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details(
'Logical device \'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(Empty())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -288,7 +288,7 @@
Empty(),
context,
broadcast=True)
- log.info('grpc-response', response=response)
+ log.debug('grpc-response', response=response)
returnValue(response)
@twisted_async
@@ -299,7 +299,7 @@
Empty(),
context,
broadcast=True)
- log.info('grpc-response', response=response)
+ log.debug('grpc-response', response=response)
returnValue(response)
@twisted_async
@@ -310,14 +310,14 @@
request,
context,
id=request.id)
- log.info('grpc-response', response=response)
+ log.debug('grpc-response', response=response)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Device \'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(Device())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -327,14 +327,14 @@
response = yield self.dispatcher.dispatch('CreateDevice',
request,
context)
- log.info('grpc-response', response=response)
+ log.debug('grpc-response', response=response)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Create device error')
context.set_code(response.error_code)
returnValue(Device())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -345,14 +345,14 @@
request,
context,
id=request.id)
- log.info('grpc-response', response=response)
+ log.debug('grpc-response', response=response)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Device \'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(Device())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -363,14 +363,14 @@
request,
context,
id=request.id)
- log.info('grpc-response', response=response)
+ log.debug('grpc-response', response=response)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Device \'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(Device())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -381,14 +381,14 @@
request,
context,
id=request.id)
- log.info('grpc-response', response=response)
+ log.debug('grpc-response', response=response)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Device \'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(Device())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -399,14 +399,14 @@
request,
context,
id=request.id)
- log.info('grpc-response', response=response)
+ log.debug('grpc-response', response=response)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Device \'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(Empty())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(Empty())
@twisted_async
@@ -417,14 +417,14 @@
request,
context,
id=request.id)
- log.info('grpc-response', response=response)
+ log.debug('grpc-response', response=response)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Device \'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(Ports())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -435,14 +435,14 @@
request,
context,
id=request.id)
- log.info('grpc-response', response=response)
+ log.debug('grpc-response', response=response)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Device \'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(PmConfigs())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -453,14 +453,14 @@
request,
context,
id=request.id)
- log.info('grpc-response', response=response)
+ log.debug('grpc-response', response=response)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Device \'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(Empty())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -471,14 +471,14 @@
request,
context,
id=request.id)
- log.info('grpc-response', response=response)
+ log.debug('grpc-response', response=response)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Device \'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(Flows())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -489,14 +489,14 @@
request,
context,
id=request.id)
- log.info('grpc-response', response=response)
+ log.debug('grpc-response', response=response)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Device \'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(FlowGroups())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -509,14 +509,14 @@
response = yield self.dispatcher.dispatch('ListDeviceTypes',
request,
context)
- log.info('grpc-response', response=response)
+ log.debug('grpc-response', response=response)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Device types error')
context.set_code(response.error_code)
returnValue(DeviceTypes())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -529,15 +529,15 @@
response = yield self.dispatcher.dispatch('GetDeviceType',
request,
context)
- log.info('grpc-response', response=response)
+ log.debug('grpc-response', response=response)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Device type \'{}\' error'.format(
request.id))
context.set_code(response.error_code)
returnValue(DeviceType())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -548,7 +548,7 @@
Empty(),
context,
broadcast=True)
- log.info('grpc-response', response=response)
+ log.debug('grpc-response', response=response)
returnValue(response)
@twisted_async
@@ -561,12 +561,12 @@
id=request.id)
log.info('grpc-response', response=response)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Device group\'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(DeviceGroup())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
# bbf_fiber rpcs start
@@ -580,12 +580,12 @@
context,
broadcast=True)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Channelgroup error')
context.set_code(response.error_code)
returnValue(Empty())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -612,12 +612,12 @@
id=request.id,
broadcast=True)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Channelgroup\'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(fb.ChannelgroupConfig())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -638,12 +638,12 @@
id=request.id,
broadcast=True)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Channelgroup\'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(fb.ChannelgroupConfig())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -656,12 +656,12 @@
context,
broadcast=True)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Channelpartition error')
context.set_code(response.error_code)
returnValue(Empty())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -682,13 +682,13 @@
id=request.id,
broadcast=True)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Channelpartition\'{}\' error'.format(
request.id))
context.set_code(response.error_code)
returnValue(fb.ChannelpartitionConfig())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -709,13 +709,13 @@
id=request.id,
broadcast=True)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Channelpartition\'{}\' error'.format(
request.id))
context.set_code(response.error_code)
returnValue(fb.ChannelpartitionConfig())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -736,13 +736,13 @@
id=request.id,
broadcast=True)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Channelpartition\'{}\' error'.format(
request.id))
context.set_code(response.error_code)
returnValue(fb.ChannelpartitionConfig())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -755,12 +755,12 @@
context,
broadcast=True)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Channelpair error')
context.set_code(response.error_code)
returnValue(Empty())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -781,12 +781,12 @@
id=request.id,
broadcast=True)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Channelpair\'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(fb.ChannelpairConfig())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -807,12 +807,12 @@
id=request.id,
broadcast=True)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Channelpair\'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(fb.ChannelpairConfig())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -833,12 +833,12 @@
id=request.id,
broadcast=True)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Channelpair\'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(fb.ChannelpairConfig())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -852,13 +852,13 @@
id=request.id)
log.info('grpc-response', response=response)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Channeltermination \'{}\' error'.format(
request.id))
context.set_code(response.error_code)
returnValue(fb.ChannelterminationConfig())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -872,13 +872,13 @@
id=request.id)
log.info('grpc-response', response=response)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Channeltermination \'{}\' error'.format(
request.id))
context.set_code(response.error_code)
returnValue(fb.ChannelterminationConfig())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -892,13 +892,13 @@
id=request.id)
log.info('grpc-response', response=response)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Channeltermination \'{}\' error'.format(
request.id))
context.set_code(response.error_code)
returnValue(fb.ChannelterminationConfig())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -912,13 +912,13 @@
id=request.id)
log.info('grpc-response', response=response)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Channeltermination \'{}\' error'.format(
request.id))
context.set_code(response.error_code)
returnValue(fb.ChannelterminationConfig())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -931,12 +931,12 @@
context,
broadcast=True)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Ontani error')
context.set_code(response.error_code)
returnValue(Empty())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -957,12 +957,12 @@
id=request.id,
broadcast=True)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Ontani \'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(fb.OntaniConfig())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -983,12 +983,12 @@
id=request.id,
broadcast=True)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Ontani \'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(fb.OntaniConfig())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -1009,12 +1009,12 @@
id=request.id,
broadcast=True)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Ontani \'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(fb.OntaniConfig())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -1027,12 +1027,12 @@
context,
broadcast=True)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('VOntani error')
context.set_code(response.error_code)
returnValue(Empty())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -1053,12 +1053,12 @@
id=request.id,
broadcast=True)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('VOntani \'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(fb.VOntaniConfig())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -1079,12 +1079,12 @@
id=request.id,
broadcast=True)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('VOntani \'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(fb.VOntaniConfig())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -1105,12 +1105,12 @@
id=request.id,
broadcast=True)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('VOntani \'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(fb.VOntaniConfig())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -1123,12 +1123,12 @@
context,
broadcast=True)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('VEnet error')
context.set_code(response.error_code)
returnValue(Empty())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -1149,12 +1149,12 @@
id=request.id,
broadcast=True)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('VEnet \'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(fb.VEnetConfig())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -1175,12 +1175,12 @@
id=request.id,
broadcast=True)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('VEnet \'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(fb.VEnetConfig())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -1201,12 +1201,12 @@
id=request.id,
broadcast=True)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('VEnet \'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(fb.VEnetConfig())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -1345,12 +1345,12 @@
context,
broadcast=True)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('{}\' error' .format(type(request).__name__))
context.set_code(response.error_code)
returnValue(Empty())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
def manage_global_xpon_object(self, request, context, method_name):
@@ -1370,13 +1370,13 @@
id=request.id,
broadcast=True)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('{}\'{}\' error'.format(type(request).__name__,
request.id))
context.set_code(response.error_code)
returnValue(_xpon_object_type())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
# bbf_fiber rpcs end
@@ -1405,14 +1405,14 @@
context,
id=request.id,
broadcast=True)
- log.info('grpc-response', response=response)
+ log.debug('grpc-response', response=response)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Create alarm error')
context.set_code(response.error_code)
returnValue(AlarmFilter())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -1423,14 +1423,14 @@
request,
context,
id=request.id)
- log.info('grpc-response', response=response)
+ log.debug('grpc-response', response=response)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Alarm filter\'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(AlarmFilter())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -1442,14 +1442,14 @@
context,
id=request.id,
broadcast=True)
- log.info('grpc-response', response=response)
+ log.debug('grpc-response', response=response)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Alarm filter\'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(AlarmFilter())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -1461,14 +1461,14 @@
context,
id=request.id,
broadcast=True)
- log.info('grpc-response', response=response)
+ log.debug('grpc-response', response=response)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Alarm filter\'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(Empty())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(Empty())
@twisted_async
@@ -1479,14 +1479,14 @@
Empty(),
context,
broadcast=True)
- log.info('grpc-response', response=response)
+ log.debug('grpc-response', response=response)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Alarm filters error')
context.set_code(response.error_code)
returnValue(AlarmFilter())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -1497,14 +1497,14 @@
request,
context,
id=request.id)
- log.info('grpc-response', response=response)
+ log.debug('grpc-response', response=response)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Device \'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(Images())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -1515,14 +1515,14 @@
request,
context,
id=request.id)
- log.info('grpc-response', response=response)
+ log.debug('grpc-response', response=response)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Device \'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(SelfTestResponse())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -1534,17 +1534,17 @@
request,
context,
id=request.id)
- log.info('grpc-response', response=response)
+ log.debug('grpc-response', response=response)
except Exception as e:
log.exception('grpc-exception', e=e)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Device \'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(OperationResp(code=OperationResp.OPERATION_FAILURE))
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -1556,17 +1556,17 @@
request,
context,
id=request.id)
- log.info('grpc-response', response=response)
+ log.debug('grpc-response', response=response)
except Exception as e:
log.exception('grpc-exception', e=e)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Device \'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(ImageDownloads())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -1578,17 +1578,17 @@
request,
context,
id=request.id)
- log.info('grpc-response', response=response)
+ log.debug('grpc-response', response=response)
except Exception as e:
log.exception('grpc-exception', e=e)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Device \'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(ImageDownload())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -1600,17 +1600,17 @@
request,
context,
id=request.id)
- log.info('grpc-response', response=response)
+ log.debug('grpc-response', response=response)
except Exception as e:
log.exception('grpc-exception', e=e)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Device \'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(ImageDownloads())
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@@ -1623,17 +1623,17 @@
request,
context,
id=request.id)
- log.info('grpc-response', response=response)
+ log.debug('grpc-response', response=response)
except Exception as e:
log.exception('grpc-exception', e=e)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Device \'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(OperationResp(code=OperationResp.OPERATION_FAILURE))
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -1645,17 +1645,17 @@
request,
context,
id=request.id)
- log.info('grpc-response', response=response)
+ log.debug('grpc-response', response=response)
except Exception as e:
log.exception('grpc-exception', e=e)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Device \'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(OperationResp(code=OperationResp.OPERATION_FAILURE))
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
@twisted_async
@@ -1667,15 +1667,15 @@
request,
context,
id=request.id)
- log.info('grpc-response', response=response)
+ log.debug('grpc-response', response=response)
except Exception as e:
log.exception('grpc-exception', e=e)
if isinstance(response, DispatchError):
- log.info('grpc-error-response', error=response.error_code)
+ log.warn('grpc-error-response', error=response.error_code)
context.set_details('Device \'{}\' error'.format(request.id))
context.set_code(response.error_code)
returnValue(OperationResp(code=OperationResp.OPERATION_FAILURE))
else:
- log.info('grpc-success-response', response=response)
+ log.debug('grpc-success-response', response=response)
returnValue(response)
diff --git a/voltha/core/local_handler.py b/voltha/core/local_handler.py
index 77d5204..f87830d 100644
--- a/voltha/core/local_handler.py
+++ b/voltha/core/local_handler.py
@@ -652,7 +652,7 @@
pm_configs = self.root.get(
'/devices/{}/pm_configs'.format(request.id))
pm_configs.id = request.id
- log.info('device-for-pms', pm_configs=pm_configs)
+ log.debug('device-for-pms', pm_configs=pm_configs)
return pm_configs
except KeyError:
context.set_details(
@@ -662,7 +662,7 @@
@twisted_async
def UpdateDevicePmConfigs(self, request, context):
- log.info('grpc-request', request=request)
+ log.debug('grpc-request', request=request)
if '/' in request.id:
context.set_details(
diff --git a/voltha/leader.py b/voltha/leader.py
index 0dda65a..3bfbc6c 100644
--- a/voltha/leader.py
+++ b/voltha/leader.py
@@ -20,7 +20,7 @@
from structlog import get_logger
from twisted.internet import reactor
from twisted.internet.base import DelayedCall
-from twisted.internet.defer import inlineCallbacks, DeferredList
+from twisted.internet.defer import inlineCallbacks, DeferredList, returnValue
from simplejson import dumps, loads
from common.utils.asleep import asleep
@@ -42,10 +42,10 @@
"""
ID_EXTRACTOR = '^(%s)([^/]+)$'
- ASSIGNMENT_EXTRACTOR = '^%s(?P<member_id>[^/]+)/(?P<work_id>[^/]+)$'
CORE_STORE_KEY_EXTRACTOR = '^%s(?P<core_store_id>[^/]+)/root$'
CORE_NUMBER_EXTRACTOR = '^.*\.([0-9]+)\..*$'
START_TIMESTAMP_EXTRACTOR = '^.*\..*\..*_([0-9]+)$'
+ ASSIGNMENT_ID_EXTRACTOR = '^(%s)([^/]+)/core_store$'
# Public methods:
@@ -72,12 +72,11 @@
self.core_data_id_match = re.compile(
self.CORE_STORE_KEY_EXTRACTOR % self.coord.core_store_prefix).match
- self.assignment_match = re.compile(
- self.ASSIGNMENT_EXTRACTOR % self.coord.assignment_prefix).match
-
self.core_match = re.compile(self.CORE_NUMBER_EXTRACTOR).match
self.timestamp_match = re.compile(self.START_TIMESTAMP_EXTRACTOR ).match
+ self.assignment_id_match = re.compile(
+ self.ASSIGNMENT_ID_EXTRACTOR % self.coord.assignment_prefix).match
@inlineCallbacks
def start(self):
@@ -189,7 +188,7 @@
unique_members[voltha_number] = {'id': member['id'],
'timestamp': timestamp,
'host': member['host']}
- update_occurred = True
+ update_occurred = True
if update_occurred:
updated_members = []
@@ -203,14 +202,59 @@
return members
@inlineCallbacks
+ def _is_temporal_state(self, members):
+ try:
+ # First get the current core assignments
+ (_, results) = yield self.coord.kv_get(
+ self.coord.assignment_prefix,
+ recurse=True)
+
+ log.debug('core-assignments', assignment=results)
+ if results:
+ old_assignment = [
+ {'id': self.assignment_id_match(e['Key']).group(2),
+ 'core': e['Value']}
+ for e in results]
+
+ # If there are no curr_assignments then we are starting the
+ # system. In this case we should keep processing
+ if len(old_assignment) == 0:
+ returnValue(False)
+
+ # Tackle the simplest scenario - #members >= #old_assignment
+ if len(members) >= len(old_assignment):
+ returnValue(False)
+
+ # Everything else is a temporal state
+ log.info('temporal-state-detected', members=members,
+ old_assignments=old_assignment)
+
+ returnValue(True)
+ else:
+ returnValue(False)
+ except Exception as e:
+ log.exception('temporal-state-error', e=e)
+ returnValue(True)
+
+ @inlineCallbacks
def _track_members(self, index):
previous_index = index
try:
- # Put a wait of 5 seconds to wait for a change of membership,
- # if any. Without it, if all consul nodes go down then we will
- # never get out of this watch.
(index, results) = yield self.coord.kv_get(
- self.coord.membership_prefix, wait='5s', index=index, recurse=True)
+ self.coord.membership_prefix,
+ wait='10s',
+ index=index,
+ recurse=True)
+
+ if not results:
+ log.info('no-data-yet', index=index)
+ return
+
+ # Check whether we are still the leader - a new regime may be in
+ # place by the time we see a membership update
+ if self.halted:
+ log.info('I am no longer the leader')
+ return
# This can happen if consul went down and came back with no data
if not results:
@@ -238,8 +282,14 @@
log.info('active-members', active_members=members,
sanitized_members=updated_members)
- # Check if the two sets are the same
- if updated_members != self.members:
+ # Check if we are in a temporal state. If true wait for the
+ # next membership changes
+ temporal_state = yield self._is_temporal_state(updated_members)
+ if temporal_state:
+ log.info('temporal-state-detected')
+ pass # Wait for next member list change
+ elif updated_members != self.members:
+ # if the two sets are the same
# update the current set of config
yield self._update_core_store_references()
log.info('membership-changed',
@@ -260,7 +310,7 @@
# to prevent flood
finally:
if not self.halted:
- reactor.callLater(0, self._track_members, index)
+ reactor.callLater(1, self._track_members, index)
def _restart_reassignment_soak_timer(self):
diff --git a/voltha/voltha.yml b/voltha/voltha.yml
index 8b5b1ee..88c0b9f 100644
--- a/voltha/voltha.yml
+++ b/voltha/voltha.yml
@@ -63,9 +63,9 @@
membership_watch_relatch_delay: 0.1
membership_maintenance_loop_delay: 5
tracking_loop_delay: 1
- session_time_to_live: 300
- session_renewal_loop_delay: 30
- session_renewal_timeout: 60
+ session_time_to_live: 60
+ session_renewal_loop_delay: 10
+ session_renewal_timeout: 10
worker:
time_to_let_leader_update: 5
diff --git a/voltha/worker.py b/voltha/worker.py
index 3967cce..d180cd1 100644
--- a/voltha/worker.py
+++ b/voltha/worker.py
@@ -147,13 +147,15 @@
@inlineCallbacks
def _track_my_peers(self, index):
try:
+ prev_index = index
if self.mycore_store_id:
# Wait for updates to the store assigment key
(index, mappings) = yield self.coord.kv_get(
self.coord.core_store_assignment_key,
+ wait='10s',
index=index,
recurse=True)
- if mappings:
+ if mappings and index != prev_index:
new_map = loads(mappings[0]['Value'])
# Remove my id from my peers list
new_map.pop(self.mycore_store_id)
@@ -161,6 +163,9 @@
self.coord.publish_peers_map_change(new_map)
self.peers_map = new_map
log.info('peer-mapping-changed', mapping=new_map)
+ else:
+ log.debug('no-mapping-change', mappings=mappings,
+ index=index, prev_index=prev_index)
except Exception, e:
log.exception('peer-track-error', e=e)
@@ -172,7 +177,7 @@
finally:
if not self.halted:
# Wait longer if we have not received a core id yet
- reactor.callLater(0 if self.mycore_store_id else 5,
+ reactor.callLater(1 if self.mycore_store_id else 5,
self._track_my_peers, index)
def _stash_and_restart_soak_timer(self, candidate_workload):