VOL-281: OfAgent integration in swarm cluster
- Automatic connect/reconnect of the agent to a voltha instance
- Agent survives and cleans up after a voltha disconnect
- Added new compose files to start ofagent and onos in swarm cluster

Amendments:
- Fixed logs to have uniform format
- Removed instructions to start/stop ONOS service in cluster scripts
- Added missing change in local handler to exit streaming RPC calls
  after ofagent termination
- Renamed references from voltha to vcore where necessary

Change-Id: Icb4611d92be35b48e557e6b12f7d2074282175ea
diff --git a/ofagent/connection_mgr.py b/ofagent/connection_mgr.py
index 8a4081c..0d75be0 100644
--- a/ofagent/connection_mgr.py
+++ b/ofagent/connection_mgr.py
@@ -28,34 +28,38 @@
 from grpc._channel import _Rendezvous
 from ofagent.protos import third_party
 from protos import voltha_pb2
+from protos.voltha_pb2 import OfAgentSubscriber
 from grpc_client import GrpcClient
 
 from agent import Agent
 from google.protobuf.empty_pb2 import Empty
+from common.utils.dockerhelpers import get_my_containers_name
 
 
 log = get_logger()
 # _ = third_party
 
 class ConnectionManager(object):
-
-    def __init__(self, consul_endpoint, voltha_endpoint, controller_endpoints,
-                 voltha_retry_interval=0.5, devices_refresh_interval=5):
+    def __init__(self, consul_endpoint, vcore_endpoint, controller_endpoints,
+                 vcore_retry_interval=0.5, devices_refresh_interval=5,
+                 subscription_refresh_interval=5):
 
         log.info('init-connection-manager')
-        log.info('list-of-controllers',controller_endpoints=controller_endpoints)
+        log.info('list-of-controllers', controller_endpoints=controller_endpoints)
         self.controller_endpoints = controller_endpoints
         self.consul_endpoint = consul_endpoint
-        self.voltha_endpoint = voltha_endpoint
+        self.vcore_endpoint = vcore_endpoint
 
         self.channel = None
-        self.grpc_client = None  # single, shared gRPC client to Voltha
+        self.grpc_client = None  # single, shared gRPC client to vcore
 
         self.agent_map = {}  # (datapath_id, controller_endpoint) -> Agent()
         self.device_id_to_datapath_id_map = {}
 
-        self.voltha_retry_interval = voltha_retry_interval
+        self.vcore_retry_interval = vcore_retry_interval
         self.devices_refresh_interval = devices_refresh_interval
+        self.subscription_refresh_interval = subscription_refresh_interval
+        self.subscription = None
 
         self.running = False
 
@@ -68,11 +72,8 @@
 
         self.running = True
 
-        # Get voltha grpc endpoint
-        self.channel = self.get_grpc_channel_with_voltha()
-
-        # Create shared gRPC API object
-        self.grpc_client = GrpcClient(self, self.channel).start()
+        # Start monitoring the vcore grpc channel
+        reactor.callInThread(self.monitor_vcore_grpc_channel)
 
         # Start monitoring logical devices and manage agents accordingly
         reactor.callLater(0, self.monitor_logical_devices)
@@ -87,8 +88,9 @@
         for agent in self.agent_map.itervalues():
             agent.stop()
         self.running = False
-        self.grpc_client.stop()
-        del self.channel
+
+        self._reset_grpc_attributes()
+
         log.info('stopped')
 
     def resolve_endpoint(self, endpoint):
@@ -98,52 +100,121 @@
                 ip_port_endpoint = get_endpoint_from_consul(
                     self.consul_endpoint, endpoint[1:])
                 log.info(
-                    'Found endpoint {} service at {}'.format(endpoint,
-                                                             ip_port_endpoint))
+                    '{}-service-endpoint-found'.format(endpoint), address=ip_port_endpoint)
             except Exception as e:
-                log.error('Failure to locate {} service from '
-                               'consul {}:'.format(endpoint, repr(e)))
-                log.error('Committing suicide...')
+                log.error('{}-service-endpoint-not-found'.format(endpoint), exception=repr(e))
+                log.error('committing-suicide')
                 # Committing suicide in order to let docker restart ofagent
                 os.system("kill -15 {}".format(os.getpid()))
         if ip_port_endpoint:
             host, port = ip_port_endpoint.split(':', 2)
             return host, int(port)
 
-    def get_grpc_channel_with_voltha(self):
-        log.info('Resolving voltha endpoint {} from consul'.format(
-            self.voltha_endpoint))
-        host, port = self.resolve_endpoint(self.voltha_endpoint)
+    def _reset_grpc_attributes(self):
+        log.debug('start-reset-grpc-attributes')
+
+        if self.grpc_client is not None:
+            self.grpc_client.stop()
+
+        if self.channel is not None:
+            del self.channel
+
+        self.is_alive = False
+        self.channel = None
+        self.subscription = None
+        self.grpc_client = None
+
+        log.debug('stop-reset-grpc-attributes')
+
+    def _assign_grpc_attributes(self):
+        log.debug('start-assign-grpc-attributes')
+
+        host, port = self.resolve_endpoint(self.vcore_endpoint)
+        log.info('revolved-vcore-endpoint', endpoint=self.vcore_endpoint, host=host, port=port)
+
         assert host is not None
         assert port is not None
-        # Create grpc channel to Voltha
-        channel = grpc.insecure_channel('{}:{}'.format(host, port))
-        log.info('Acquired a grpc channel to voltha')
-        return channel
+
+        # Establish a connection to the vcore GRPC server
+        self.channel = grpc.insecure_channel('{}:{}'.format(host, port))
+        self.is_alive = True
+
+        log.debug('stop-assign-grpc-attributes')
+
+    @inlineCallbacks
+    def monitor_vcore_grpc_channel(self):
+        log.debug('start-monitor-vcore-grpc-channel')
+
+        while self.running:
+            try:
+                # If a subscription is not yet assigned then establish new GRPC connection
+                # ... otherwise keep using existing connection details
+                if self.subscription is None:
+                    self._assign_grpc_attributes()
+
+                # Send subscription request to register the current ofagent instance
+                container_name = get_my_containers_name()
+                stub = voltha_pb2.VolthaLocalServiceStub(self.channel)
+                subscription = stub.Subscribe(OfAgentSubscriber(ofagent_id=container_name))
+
+                # If the subscriber id matches the current instance
+                # ... then the subscription has succeeded
+                if subscription is not None and subscription.ofagent_id == container_name:
+                    if self.subscription is None:
+                        # Keep details on the current GRPC session and subscription
+                        log.debug('subscription-with-vcore-successful', subscription=subscription)
+                        self.subscription = subscription
+                        self.grpc_client = GrpcClient(self, self.channel).start()
+
+                    # Sleep a bit in between each subscribe
+                    yield asleep(self.subscription_refresh_interval)
+
+                    # Move on to next subscribe request
+                    continue
+
+                # The subscription did not succeed, reset and move on
+                else:
+                    log.info('subscription-with-vcore-unavailable', subscription=subscription)
+
+            except _Rendezvous, e:
+                log.error('subscription-with-vcore-terminated',exception=e, status=e.code())
+
+            except Exception as e:
+                log.exception('unexpected-subscription-termination-with-vcore', e=e)
+
+            # Reset grpc details
+            # The vcore instance is either not available for subscription
+            # or a failure occurred with the existing communication.
+            self._reset_grpc_attributes()
+
+            # Sleep for a short period and retry
+            yield asleep(self.vcore_retry_interval)
+
+        log.debug('stop-monitor-vcore-grpc-channel')
 
     @inlineCallbacks
     def get_list_of_logical_devices_from_voltha(self):
 
-        while True:
-            log.info('Retrieve devices from voltha')
+        while self.running:
+            log.info('retrieve-logical-device-list')
             try:
                 stub = voltha_pb2.VolthaLocalServiceStub(self.channel)
                 devices = stub.ListLogicalDevices(Empty()).items
                 for device in devices:
-                    log.info("Devices {} -> {}".format(device.id,
-                                                       device.datapath_id))
+                    log.info("logical-device-entry", id=device.id, datapath_id=device.datapath_id)
+
                 returnValue(devices)
 
             except _Rendezvous, e:
+                log.error('vcore-communication-failure', exception=e, status=e.code())
                 if e.code() == StatusCode.UNAVAILABLE:
                     os.system("kill -15 {}".format(os.getpid()))
 
             except Exception as e:
-                log.error('Failure to retrieve devices from '
-                               'voltha: {}'.format(repr(e)))
+                log.exception('logical-devices-retrieval-failure', exception=e)
 
-            log.info('reconnect', after_delay=self.voltha_retry_interval)
-            yield asleep(self.voltha_retry_interval)
+            log.info('reconnect', after_delay=self.vcore_retry_interval)
+            yield asleep(self.vcore_retry_interval)
 
     def refresh_agent_connections(self, devices):
         """
@@ -185,7 +256,7 @@
         device_id = device.id
         for controller_endpoint in self.controller_endpoints:
             agent = Agent(controller_endpoint, datapath_id,
-                      device_id, self.grpc_client)
+                          device_id, self.grpc_client)
             agent.start()
             self.agent_map[(datapath_id,controller_endpoint)] = agent
             self.device_id_to_datapath_id_map[device_id] = datapath_id
@@ -200,30 +271,45 @@
 
     @inlineCallbacks
     def monitor_logical_devices(self):
-        while True:
+        log.debug('start-monitor-logical-devices')
+
+        while self.running:
+            log.info('monitoring-logical-devices')
+
             # should change to a gRPC streaming call
             # see https://jira.opencord.org/browse/CORD-821
 
-            # get current list from Voltha
-            devices = yield self.get_list_of_logical_devices_from_voltha()
+            try:
+                if self.channel is not None and self.grpc_client is not None:
+                    # get current list from Voltha
+                    devices = yield self.get_list_of_logical_devices_from_voltha()
 
-            # update agent list and mapping tables as needed
-            self.refresh_agent_connections(devices)
+                    # update agent list and mapping tables as needed
+                    self.refresh_agent_connections(devices)
+                else:
+                    log.info('vcore-communication-unavailable')
 
-            # wait before next poll
-            yield asleep(self.devices_refresh_interval)
-            log.info('Monitor connections')
+                # wait before next poll
+                yield asleep(self.devices_refresh_interval)
+
+            except _Rendezvous, e:
+                log.error('vcore-communication-failure', exception=repr(e), status=e.code())
+
+            except Exception as e:
+                log.exception('unexpected-vcore-communication-failure', exception=repr(e))
+
+        log.debug('stop-monitor-logical-devices')
 
     def forward_packet_in(self, device_id, ofp_packet_in):
         datapath_id = self.device_id_to_datapath_id_map.get(device_id, None)
         if datapath_id:
-           for controller_endpoint in self.controller_endpoints:
-               agent = self.agent_map[(datapath_id,controller_endpoint)]
-               agent.forward_packet_in(ofp_packet_in)
+            for controller_endpoint in self.controller_endpoints:
+                agent = self.agent_map[(datapath_id, controller_endpoint)]
+                agent.forward_packet_in(ofp_packet_in)
 
     def forward_change_event(self, device_id, event):
         datapath_id = self.device_id_to_datapath_id_map.get(device_id, None)
         if datapath_id:
-           for controller_endpoint in self.controller_endpoints:
-               agent = self.agent_map[(datapath_id,controller_endpoint)]
-               agent.forward_change_event(event)
+            for controller_endpoint in self.controller_endpoints:
+                agent = self.agent_map[(datapath_id, controller_endpoint)]
+                agent.forward_change_event(event)