This update filters out invalid voltha instances that still appear in
the Consul member list before their sessions time out and they are
removed from that list.
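
Per swarm-assigned voltha instance number, only the member with the
latest start timestamp is kept. A minimal standalone sketch of the
dedup rule, assuming member ids of the form
'<stack>_<service>.<number>.<task>_<start_timestamp>' (the ids below
are made up):

    import re

    members = ['compose_voltha.1.aaa_1508450000',
               'compose_voltha.1.bbb_1508460000']
    latest = {}
    for m in members:
        number = re.match(r'^.*\.([0-9]+)\..*$', m).group(1)
        started = re.match(r'^.*\..*\..*_([0-9]+)$', m).group(1)
        if number not in latest or latest[number][0] < started:
            latest[number] = (started, m)
    # keeps 'compose_voltha.1.bbb_1508460000' for instance '1'
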
Change-Id: I6d5b386ec183b289b3cca1f455ab2940566c1280
diff --git a/voltha/leader.py b/voltha/leader.py
index a576a0b..0dda65a 100644
--- a/voltha/leader.py
+++ b/voltha/leader.py
@@ -44,6 +44,8 @@
     ID_EXTRACTOR = '^(%s)([^/]+)$'
     ASSIGNMENT_EXTRACTOR = '^%s(?P<member_id>[^/]+)/(?P<work_id>[^/]+)$'
     CORE_STORE_KEY_EXTRACTOR = '^%s(?P<core_store_id>[^/]+)/root$'
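+    # CORE_NUMBER_EXTRACTOR pulls the swarm-assigned instance number out of
+    # a member id; START_TIMESTAMP_EXTRACTOR pulls the instance start time
+    # that follows the final underscore of the member id.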
+    CORE_NUMBER_EXTRACTOR = '^.*\.([0-9]+)\..*$'
+    START_TIMESTAMP_EXTRACTOR = '^.*\..*\..*_([0-9]+)$'
     # Public methods:
@@ -73,6 +75,10 @@
         self.assignment_match = re.compile(
             self.ASSIGNMENT_EXTRACTOR % self.coord.assignment_prefix).match
+        self.core_match = re.compile(self.CORE_NUMBER_EXTRACTOR).match
+        self.timestamp_match = re.compile(
+            self.START_TIMESTAMP_EXTRACTOR).match
+
     @inlineCallbacks
     def start(self):
         log.debug('starting')
@@ -155,6 +161,47 @@
         except Exception, e:
             log.exception('error-update-store', e=e)
+    def _sanitize_member_list(self, members):
+        # Remove duplicates from the member list, keyed on the voltha
+        # instance number embedded in the member id; when two entries share
+        # an instance number, keep the one with the latest start timestamp,
+        # also taken from the member id. This is meaningful only in a swarm
+        # cluster. In a non-cluster environment the member id is formatted
+        # differently; in that case the extraction below raises an exception
+        # and the member list is returned as is.
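+        # For illustration (the exact id format depends on the swarm
+        # deployment), a member id such as 'compose_voltha.2.xyz_1508458632'
+        # would yield voltha number '2' and start timestamp '1508458632'.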
+
+        try:
+            unique_members = {}
+            update_occurred = False
+            log.info('members', members=members)
+            for member in members:
+                log.info('member', member=member)
+                # Extract the swarm-assigned instance number and the start
+                # timestamp from the member id
+                voltha_number = self.core_match(member['id']).group(1)
+                timestamp = self.timestamp_match(member['id']).group(1)
+                if voltha_number not in unique_members:
+                    unique_members[voltha_number] = {'id': member['id'],
+                                                     'timestamp': timestamp,
+                                                     'host': member['host']}
+                else:
+                    # A duplicate voltha instance number was found; keep
+                    # only the entry with the latest start timestamp
+                    update_occurred = True
+                    if unique_members[voltha_number]['timestamp'] < timestamp:
+                        unique_members[voltha_number] = {'id': member['id'],
+                                                         'timestamp': timestamp,
+                                                         'host': member['host']}
+
+            if update_occurred:
+                updated_members = []
+                for _, unique_member in unique_members.iteritems():
+                    updated_members.append({'host': unique_member['host'],
+                                            'id': unique_member['id']})
+                return updated_members
+            else:
+                return members
+        except Exception:
+            # The member id did not match the expected swarm format; leave
+            # the member list unchanged
+            return members
+
     @inlineCallbacks
     def _track_members(self, index):
         previous_index = index
@@ -183,17 +230,23 @@
                         'host': loads(e['Value'])['host_address']}
                        for e in results if 'Session' in e]
-            log.info('active-members', active_members=members)
+            if members:
+                updated_members = self._sanitize_member_list(members)
+            else:
+                updated_members = None
+
+            log.info('active-members', active_members=members,
+                     sanitized_members=updated_members)
             # Check if the two sets are the same
-            if members != self.members:
+            if updated_members != self.members:
                 # update the current set of config
                 yield self._update_core_store_references()
                 log.info('membership-changed',
                          prev_members=self.members,
-                         curr_members=members,
+                         curr_members=updated_members,
                          core_store_mapping=self.core_store_assignment)
-                self.members = members
+                self.members = updated_members
                 self._restart_core_store_reassignment_soak_timer()
             else:
                 log.debug('no-membership-change', index=index)
diff --git a/voltha/worker.py b/voltha/worker.py
index c62c5fa..3967cce 100644
--- a/voltha/worker.py
+++ b/voltha/worker.py
@@ -160,7 +160,7 @@
             if self.peers_map is None or self.peers_map != new_map:
                 self.coord.publish_peers_map_change(new_map)
                 self.peers_map = new_map
-                log.debug('peer-mapping-changed', mapping=new_map)
+                log.info('peer-mapping-changed', mapping=new_map)
         except Exception, e:
             log.exception('peer-track-error', e=e)