VOL-281: OfAgent integration in swarm cluster
- Auto connect/reconnect of agent with a voltha instance
- Survives and cleans up after a voltha disconnect
- Added new compose files to start ofagent and onos in swarm cluster

Amendments:
- Fixed logs to have uniform format
- Removed instructions to start/stop ONOS service in cluster scripts
- Added missing change in local handler to exit streaming rpc calls
  after ofagent termination
- Renamed references from voltha to vcore where necessary

Change-Id: Icb4611d92be35b48e557e6b12f7d2074282175ea
diff --git a/compose/docker-compose-ofagent-swarm.yml b/compose/docker-compose-ofagent-swarm.yml
new file mode 100644
index 0000000..dee18cc
--- /dev/null
+++ b/compose/docker-compose-ofagent-swarm.yml
@@ -0,0 +1,31 @@
+#
+# This Docker stackfile deploys a ofagent container on all swarm instances.
+#
+# The stackfile assumes that overlay network 'voltha_net' has already been
+# created. To deploy the stack, issue the command:
+#
+#     docker stack deploy -c docker-compose-ofagent-swarm.yml ofagent
+#
+version: "3.2"
+services:
+    ofagent:
+        image: cord/ofagent
+        deploy:
+            replicas: 3
+        entrypoint:
+            - /ofagent/ofagent/main.py
+            - -v
+            - --consul=consul:8500
+            - --fluentd=fluentd:24224
+            - --controller=onos:6653
+            - --grpc-endpoint=vcore:50556
+            - --instance-id-is-container-name
+        volumes:
+            - /var/run/docker.sock:/tmp/docker.sock
+        networks:
+            - voltha-net
+
+networks:
+    voltha-net:
+        external:
+            name: voltha_net
diff --git a/compose/docker-compose-onos-swarm.yml b/compose/docker-compose-onos-swarm.yml
new file mode 100644
index 0000000..54d43ed
--- /dev/null
+++ b/compose/docker-compose-onos-swarm.yml
@@ -0,0 +1,54 @@
+#
+# This Docker stackfile deploys a single onos container and freeradius.
+#
+# The stackfile assumes that overlay network 'voltha_net' has already been
+# created. To deploy the stack, issue the command:
+#
+#     docker stack deploy -c docker-compose-onos-swarm.yml onos
+#
+version: "3.2"
+services:
+    freeradius:
+        deploy:
+            replicas: 1
+        image: "marcelmaatkamp/freeradius"
+        ports:
+            - "1812:1812/udp"
+            - "1813:1813"
+            - "18120:18120"
+        volumes:
+            - /cord/incubator/voltha/compose/data/clients.conf:/etc/raddb/clients.conf
+            - /cord/incubator/voltha/compose/data/users:/etc/raddb/users
+        networks:
+            - net
+            - voltha-net
+
+    onos:
+        deploy:
+            # Single instance for now since a cluster 
+            # needs to be configured through onos 
+           replicas: 1
+        image: "cord/onos"
+        ports:
+            - 8101:8101 # ssh
+            - 6653:6653 # OF
+            - 8181:8181 # UI
+        environment:
+            ONOS_APPS: 'drivers,openflow-base'
+        networks:
+            - net
+            - voltha-net
+
+networks:
+    net:
+        driver: overlay
+        driver_opts:
+           encrypted: "true"
+        ipam:
+            driver: default
+            config:
+                - subnet: 172.25.0.0/24
+
+    voltha-net:
+        external:
+            name: voltha_net
diff --git a/install/voltha-swarm-start.sh b/install/voltha-swarm-start.sh
index ad5bdd0..4b40887 100755
--- a/install/voltha-swarm-start.sh
+++ b/install/voltha-swarm-start.sh
@@ -35,6 +35,7 @@
 
 
 docker stack deploy -c ${voltha_base_dir}/compose/docker-compose-voltha-swarm.yml vcore
+docker stack deploy -c ${voltha_base_dir}/compose/docker-compose-ofagent-swarm.yml ofagent
 docker stack deploy -c ${voltha_base_dir}/compose/docker-compose-envoy-swarm.yml voltha
 docker stack deploy -c ${voltha_base_dir}/compose/docker-compose-vcli.yml cli
 docker stack deploy -c ${voltha_base_dir}/compose/docker-compose-chameleon-swarm.yml chameleon
diff --git a/install/voltha-swarm-stop.sh b/install/voltha-swarm-stop.sh
old mode 100644
new mode 100755
index 8cdf9db..2633cd9
--- a/install/voltha-swarm-stop.sh
+++ b/install/voltha-swarm-stop.sh
@@ -4,6 +4,7 @@
 docker service rm netconf_netconf
 docker service rm cli_cli
 docker service rm voltha_voltha
+docker service rm ofagent_ofagent 
 docker service rm vcore_vcore
 docker service rm tools
 docker stack rm consul
diff --git a/ofagent/connection_mgr.py b/ofagent/connection_mgr.py
index 8a4081c..0d75be0 100644
--- a/ofagent/connection_mgr.py
+++ b/ofagent/connection_mgr.py
@@ -28,34 +28,38 @@
 from grpc._channel import _Rendezvous
 from ofagent.protos import third_party
 from protos import voltha_pb2
+from protos.voltha_pb2 import OfAgentSubscriber
 from grpc_client import GrpcClient
 
 from agent import Agent
 from google.protobuf.empty_pb2 import Empty
+from common.utils.dockerhelpers import get_my_containers_name
 
 
 log = get_logger()
 # _ = third_party
 
 class ConnectionManager(object):
-
-    def __init__(self, consul_endpoint, voltha_endpoint, controller_endpoints,
-                 voltha_retry_interval=0.5, devices_refresh_interval=5):
+    def __init__(self, consul_endpoint, vcore_endpoint, controller_endpoints,
+                 vcore_retry_interval=0.5, devices_refresh_interval=5,
+                 subscription_refresh_interval=5):
 
         log.info('init-connection-manager')
-        log.info('list-of-controllers',controller_endpoints=controller_endpoints)
+        log.info('list-of-controllers', controller_endpoints=controller_endpoints)
         self.controller_endpoints = controller_endpoints
         self.consul_endpoint = consul_endpoint
-        self.voltha_endpoint = voltha_endpoint
+        self.vcore_endpoint = vcore_endpoint
 
         self.channel = None
-        self.grpc_client = None  # single, shared gRPC client to Voltha
+        self.grpc_client = None  # single, shared gRPC client to vcore
 
         self.agent_map = {}  # (datapath_id, controller_endpoint) -> Agent()
         self.device_id_to_datapath_id_map = {}
 
-        self.voltha_retry_interval = voltha_retry_interval
+        self.vcore_retry_interval = vcore_retry_interval
         self.devices_refresh_interval = devices_refresh_interval
+        self.subscription_refresh_interval = subscription_refresh_interval
+        self.subscription = None
 
         self.running = False
 
@@ -68,11 +72,8 @@
 
         self.running = True
 
-        # Get voltha grpc endpoint
-        self.channel = self.get_grpc_channel_with_voltha()
-
-        # Create shared gRPC API object
-        self.grpc_client = GrpcClient(self, self.channel).start()
+        # Start monitoring the vcore grpc channel
+        reactor.callInThread(self.monitor_vcore_grpc_channel)
 
         # Start monitoring logical devices and manage agents accordingly
         reactor.callLater(0, self.monitor_logical_devices)
@@ -87,8 +88,9 @@
         for agent in self.agent_map.itervalues():
             agent.stop()
         self.running = False
-        self.grpc_client.stop()
-        del self.channel
+
+        self._reset_grpc_attributes()
+
         log.info('stopped')
 
     def resolve_endpoint(self, endpoint):
@@ -98,52 +100,121 @@
                 ip_port_endpoint = get_endpoint_from_consul(
                     self.consul_endpoint, endpoint[1:])
                 log.info(
-                    'Found endpoint {} service at {}'.format(endpoint,
-                                                             ip_port_endpoint))
+                    '{}-service-endpoint-found'.format(endpoint), address=ip_port_endpoint)
             except Exception as e:
-                log.error('Failure to locate {} service from '
-                               'consul {}:'.format(endpoint, repr(e)))
-                log.error('Committing suicide...')
+                log.error('{}-service-endpoint-not-found'.format(endpoint), exception=repr(e))
+                log.error('committing-suicide')
                 # Committing suicide in order to let docker restart ofagent
                 os.system("kill -15 {}".format(os.getpid()))
         if ip_port_endpoint:
             host, port = ip_port_endpoint.split(':', 2)
             return host, int(port)
 
-    def get_grpc_channel_with_voltha(self):
-        log.info('Resolving voltha endpoint {} from consul'.format(
-            self.voltha_endpoint))
-        host, port = self.resolve_endpoint(self.voltha_endpoint)
+    def _reset_grpc_attributes(self):
+        log.debug('start-reset-grpc-attributes')
+
+        if self.grpc_client is not None:
+            self.grpc_client.stop()
+
+        if self.channel is not None:
+            del self.channel
+
+        self.is_alive = False
+        self.channel = None
+        self.subscription = None
+        self.grpc_client = None
+
+        log.debug('stop-reset-grpc-attributes')
+
+    def _assign_grpc_attributes(self):
+        log.debug('start-assign-grpc-attributes')
+
+        host, port = self.resolve_endpoint(self.vcore_endpoint)
+        log.info('revolved-vcore-endpoint', endpoint=self.vcore_endpoint, host=host, port=port)
+
         assert host is not None
         assert port is not None
-        # Create grpc channel to Voltha
-        channel = grpc.insecure_channel('{}:{}'.format(host, port))
-        log.info('Acquired a grpc channel to voltha')
-        return channel
+
+        # Establish a connection to the vcore GRPC server
+        self.channel = grpc.insecure_channel('{}:{}'.format(host, port))
+        self.is_alive = True
+
+        log.debug('stop-assign-grpc-attributes')
+
+    @inlineCallbacks
+    def monitor_vcore_grpc_channel(self):
+        log.debug('start-monitor-vcore-grpc-channel')
+
+        while self.running:
+            try:
+                # If a subscription is not yet assigned then establish new GRPC connection
+                # ... otherwise keep using existing connection details
+                if self.subscription is None:
+                    self._assign_grpc_attributes()
+
+                # Send subscription request to register the current ofagent instance
+                container_name = get_my_containers_name()
+                stub = voltha_pb2.VolthaLocalServiceStub(self.channel)
+                subscription = stub.Subscribe(OfAgentSubscriber(ofagent_id=container_name))
+
+                # If the subscriber id matches the current instance
+                # ... then the subscription has succeeded
+                if subscription is not None and subscription.ofagent_id == container_name:
+                    if self.subscription is None:
+                        # Keep details on the current GRPC session and subscription
+                        log.debug('subscription-with-vcore-successful', subscription=subscription)
+                        self.subscription = subscription
+                        self.grpc_client = GrpcClient(self, self.channel).start()
+
+                    # Sleep a bit in between each subscribe
+                    yield asleep(self.subscription_refresh_interval)
+
+                    # Move on to next subscribe request
+                    continue
+
+                # The subscription did not succeed, reset and move on
+                else:
+                    log.info('subscription-with-vcore-unavailable', subscription=subscription)
+
+            except _Rendezvous, e:
+                log.error('subscription-with-vcore-terminated',exception=e, status=e.code())
+
+            except Exception as e:
+                log.exception('unexpected-subscription-termination-with-vcore', e=e)
+
+            # Reset grpc details
+            # The vcore instance is either not available for subscription
+            # or a failure occurred with the existing communication.
+            self._reset_grpc_attributes()
+
+            # Sleep for a short period and retry
+            yield asleep(self.vcore_retry_interval)
+
+        log.debug('stop-monitor-vcore-grpc-channel')
 
     @inlineCallbacks
     def get_list_of_logical_devices_from_voltha(self):
 
-        while True:
-            log.info('Retrieve devices from voltha')
+        while self.running:
+            log.info('retrieve-logical-device-list')
             try:
                 stub = voltha_pb2.VolthaLocalServiceStub(self.channel)
                 devices = stub.ListLogicalDevices(Empty()).items
                 for device in devices:
-                    log.info("Devices {} -> {}".format(device.id,
-                                                       device.datapath_id))
+                    log.info("logical-device-entry", id=device.id, datapath_id=device.datapath_id)
+
                 returnValue(devices)
 
             except _Rendezvous, e:
+                log.error('vcore-communication-failure', exception=e, status=e.code())
                 if e.code() == StatusCode.UNAVAILABLE:
                     os.system("kill -15 {}".format(os.getpid()))
 
             except Exception as e:
-                log.error('Failure to retrieve devices from '
-                               'voltha: {}'.format(repr(e)))
+                log.exception('logical-devices-retrieval-failure', exception=e)
 
-            log.info('reconnect', after_delay=self.voltha_retry_interval)
-            yield asleep(self.voltha_retry_interval)
+            log.info('reconnect', after_delay=self.vcore_retry_interval)
+            yield asleep(self.vcore_retry_interval)
 
     def refresh_agent_connections(self, devices):
         """
@@ -185,7 +256,7 @@
         device_id = device.id
         for controller_endpoint in self.controller_endpoints:
             agent = Agent(controller_endpoint, datapath_id,
-                      device_id, self.grpc_client)
+                          device_id, self.grpc_client)
             agent.start()
             self.agent_map[(datapath_id,controller_endpoint)] = agent
             self.device_id_to_datapath_id_map[device_id] = datapath_id
@@ -200,30 +271,45 @@
 
     @inlineCallbacks
     def monitor_logical_devices(self):
-        while True:
+        log.debug('start-monitor-logical-devices')
+
+        while self.running:
+            log.info('monitoring-logical-devices')
+
             # should change to a gRPC streaming call
             # see https://jira.opencord.org/browse/CORD-821
 
-            # get current list from Voltha
-            devices = yield self.get_list_of_logical_devices_from_voltha()
+            try:
+                if self.channel is not None and self.grpc_client is not None:
+                    # get current list from Voltha
+                    devices = yield self.get_list_of_logical_devices_from_voltha()
 
-            # update agent list and mapping tables as needed
-            self.refresh_agent_connections(devices)
+                    # update agent list and mapping tables as needed
+                    self.refresh_agent_connections(devices)
+                else:
+                    log.info('vcore-communication-unavailable')
 
-            # wait before next poll
-            yield asleep(self.devices_refresh_interval)
-            log.info('Monitor connections')
+                # wait before next poll
+                yield asleep(self.devices_refresh_interval)
+
+            except _Rendezvous, e:
+                log.error('vcore-communication-failure', exception=repr(e), status=e.code())
+
+            except Exception as e:
+                log.exception('unexpected-vcore-communication-failure', exception=repr(e))
+
+        log.debug('stop-monitor-logical-devices')
 
     def forward_packet_in(self, device_id, ofp_packet_in):
         datapath_id = self.device_id_to_datapath_id_map.get(device_id, None)
         if datapath_id:
-           for controller_endpoint in self.controller_endpoints:
-               agent = self.agent_map[(datapath_id,controller_endpoint)]
-               agent.forward_packet_in(ofp_packet_in)
+            for controller_endpoint in self.controller_endpoints:
+                agent = self.agent_map[(datapath_id, controller_endpoint)]
+                agent.forward_packet_in(ofp_packet_in)
 
     def forward_change_event(self, device_id, event):
         datapath_id = self.device_id_to_datapath_id_map.get(device_id, None)
         if datapath_id:
-           for controller_endpoint in self.controller_endpoints:
-               agent = self.agent_map[(datapath_id,controller_endpoint)]
-               agent.forward_change_event(event)
+            for controller_endpoint in self.controller_endpoints:
+                agent = self.agent_map[(datapath_id, controller_endpoint)]
+                agent.forward_change_event(event)
diff --git a/tests/itests/README.md b/tests/itests/README.md
index 644b4ee..4aa9572 100644
--- a/tests/itests/README.md
+++ b/tests/itests/README.md
@@ -41,6 +41,23 @@
 . ./env.sh
 nosetests -s tests/itests/ofagent/test_ofagent_multicontroller_failover.py
 ```
+* **Ofagent_recovery**: This tests the OFAgent capability
+to recover the connectivity with Voltha after a component failure.
+Also note this test takes a while to run (approximately 6 mins).
+The steps it follows are
+    * Spawns three ONOS controllers and clusters them.
+    * Spawns required Voltha components.
+    * OFagent establishes connection with the three spawned controllers.
+    * Adds simulated OLT and enables it.
+    * Stop/start OFAgent and VOLTHA processes (2 separate tests)
+    * Ensure that the OLT created prior to stopping process is still present
+    * Adds another simulated OLT to ensure connectivity
+
+```
+cd /cord/incubator/voltha
+. ./env.sh
+nosetests -s tests/itests/ofagent/test_ofagent_recovery.py
+```
 * **Frameio**:  This tests the packet send/receive/filter capabilities of the 
 FrameIOManager.   This test needs to run as root.
 ```
diff --git a/tests/itests/ofagent/test_ofagent_recovery.py b/tests/itests/ofagent/test_ofagent_recovery.py
new file mode 100644
index 0000000..38ea471
--- /dev/null
+++ b/tests/itests/ofagent/test_ofagent_recovery.py
@@ -0,0 +1,238 @@
+#!/usr/bin/env python
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import logging
+import os
+import time
+import json
+
+from tests.itests.voltha.rest_base import RestBase
+
+this_dir = os.path.abspath(os.path.dirname(__file__))
+
+from tests.itests.docutests.test_utils import run_command_to_completion_with_raw_stdout
+
+log = logging.getLogger(__name__)
+
+DOCKER_COMPOSE_FILE = "compose/docker-compose-ofagent-test.yml"
+
+command_defs = dict(
+    docker_stop="docker stop {}",
+    docker_start="docker start {}",
+    docker_compose_start_all="docker-compose -f {} up -d "
+        .format(DOCKER_COMPOSE_FILE),
+    docker_compose_stop="docker-compose -f {} stop"
+        .format(DOCKER_COMPOSE_FILE),
+    docker_compose_rm_f="docker-compose -f {} rm -f"
+        .format(DOCKER_COMPOSE_FILE),
+    onos_form_cluster="./tests/itests/ofagent/onos-form-cluster",
+    onos1_ip="docker inspect --format '{{ .NetworkSettings.Networks.compose_default.IPAddress }}' onos1",
+    onos2_ip="docker inspect --format '{{ .NetworkSettings.Networks.compose_default.IPAddress }}' onos2",
+    onos3_ip="docker inspect --format '{{ .NetworkSettings.Networks.compose_default.IPAddress }}' onos3",
+    add_olt='''curl -k -s -X POST -d '{"type": "simulated_olt"}' \
+               https://localhost:8881/api/v1/local/devices''',
+    enable_olt="curl -k -s -X POST https://localhost:8881/api/v1/local/devices/{}/enable",
+    get_onos_devices="curl -u karaf:karaf  http://localhost:8181/onos/v1/devices")
+
+
+class OfagentRecoveryTest(RestBase):
+    def setUp(self):
+        # Run Voltha,OFAgent,3 ONOS and form ONOS cluster.
+        print "Starting all containers ..."
+        cmd = command_defs['docker_compose_start_all']
+        out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+        self.assertEqual(rc, 0)
+        print "Waiting for all containers to be ready ..."
+        time.sleep(60)
+        cmd = command_defs['onos1_ip']
+        out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+        self.assertEqual(rc, 0)
+        onos1_ip = out
+        print "ONOS1 IP is {}".format(onos1_ip)
+        cmd = command_defs['onos2_ip']
+        out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+        self.assertEqual(rc, 0)
+        onos2_ip = out
+        print "ONOS2 IP is {}".format(onos2_ip)
+        cmd = command_defs['onos3_ip']
+        out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+        self.assertEqual(rc, 0)
+        onos3_ip = out
+        print "ONOS3 IP is {}".format(onos3_ip)
+        cmd = command_defs['onos_form_cluster'] + ' {} {} {}'.format(onos1_ip.strip(),
+                                                                     onos2_ip.strip(),
+                                                                     onos3_ip.strip())
+        out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+        self.assertEqual(rc, 0)
+        print "Cluster Output :{} ".format(out)
+
+    def tearDown(self):
+        # Stopping and Removing Voltha,OFAgent,3 ONOS.
+        print "Stopping and removing all containers ..."
+        cmd = command_defs['docker_compose_stop']
+        out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+        self.assertEqual(rc, 0)
+        print "Waiting for all containers to be stopped ..."
+        time.sleep(1)
+        cmd = command_defs['docker_compose_rm_f']
+        out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+        self.assertEqual(rc, 0)
+
+    def add_device(self):
+        print "Adding device"
+
+        cmd = command_defs['add_olt']
+        out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+        self.assertEqual(rc, 0)
+        device = json.loads(out)
+
+        print "Added device - id:{}, type:{}".format(device['id'], device['type'])
+        time.sleep(5)
+
+        return device
+
+    def enable_device(self, device_id):
+        print "Enabling device - id:{}".format(device_id)
+
+        cmd = command_defs['enable_olt'].format(device_id)
+        out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+        self.assertEqual(rc, 0)
+
+        time.sleep(30)
+        print "Enabled device - id:{}".format(device_id)
+
+    def get_device(self, device_id, expected_code=200):
+        print "Getting device - id:{}".format(device_id)
+
+        device = self.get('/api/v1/local/devices/{}'.format(device_id),
+                           expected_code=expected_code)
+
+        if device is not None:
+            print "Got device - id:{}, type:{}".format(device['id'], device['type'])
+        else:
+            print "Unable to get device - id:{}".format(device_id)
+
+        return device
+
+    def get_onos_devices(self):
+        print "Getting ONOS devices ..."
+        cmd = command_defs['get_onos_devices']
+        out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+        self.assertEqual(rc, 0)
+
+        if out is not None:
+            onos_devices = json.loads(out)
+            print "Got ONOS devices"
+        else:
+            onos_devices = None
+            print "Unable to get ONOS devices"
+
+        return onos_devices
+
+    def stop_container(self, container):
+        print "Stopping {} ...".format(container)
+
+        cmd = command_defs['docker_stop'].format(container)
+        out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+        self.assertEqual(rc, 0)
+
+        time.sleep(10)
+        print "Stopped {}".format(container)
+
+    def start_container(self, container):
+        print "Starting {} ...".format(container)
+
+        cmd = command_defs['docker_start'].format(container)
+        out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+        self.assertEqual(rc, 0)
+
+        time.sleep(10)
+        print "Started {}".format(container)
+
+    def test_01_recovery_after_voltha_restart(self):
+        # Add and enable a new OLT device
+        device_1 = self.add_device()
+        self.enable_device(device_1['id'])
+
+        # Verify that the device was propagated in ONOS
+        onos_devices = self.get_onos_devices()
+
+        self.assertEqual(len(onos_devices['devices']), 1)
+
+        # Restart voltha
+        self.stop_container('compose_voltha_1')
+        self.assertEqual(self.get_device(device_1['id'], 503), None)
+        self.start_container('compose_voltha_1')
+
+        # Get the device from VOLTHA after restart
+        device_1_after = self.get_device(device_1['id'])
+        self.assertEqual(device_1_after['id'], device_1['id'])
+
+        # Get the device from ONOS after restart
+        onos_devices = self.get_onos_devices()
+
+        self.assertEqual(len(onos_devices['devices']), 1)
+
+        # Add a new device
+        device_2 = self.add_device()
+        self.enable_device(device_2['id'])
+
+        # Ensure that ONOS has picked up the new device
+        onos_devices = self.get_onos_devices()
+
+        self.assertEqual(len(onos_devices['devices']), 2)
+
+    def test_02_recovery_after_ofagent_restart(self):
+        # Add and enable a new OLT device
+        device_1 = self.add_device()
+        self.enable_device(device_1['id'])
+
+        # Verify that the device was propagated in ONOS
+        onos_devices = self.get_onos_devices()
+
+        self.assertEqual(len(onos_devices['devices']), 1)
+
+        # Restart ofagent
+        self.stop_container('compose_ofagent_1')
+
+        # Try to create a device while ofagent is down
+        # this will succeed from a voltha point of view
+        # but it will not be propagated to ONOS until ofagent is back up
+        device_fail = self.add_device()
+        self.enable_device(device_fail['id'])
+        onos_devices = self.get_onos_devices()
+
+        # Onos should only have 1 device
+        self.assertNotEqual(len(onos_devices['devices']), 2)
+
+        self.start_container('compose_ofagent_1')
+
+        # Get the device from VOLTHA after restart
+        device_1_after = self.get_device(device_1['id'])
+        self.assertEqual(device_1_after['id'], device_1['id'])
+
+        # Get the device from ONOS after restart
+        onos_devices = self.get_onos_devices()
+        self.assertEqual(len(onos_devices['devices']), 2)
+
+        # Add a new device
+        device_2 = self.add_device()
+        self.enable_device(device_2['id'])
+
+        # Ensure that ONOS has picked up the new device
+        onos_devices = self.get_onos_devices()
+
+        self.assertEqual(len(onos_devices['devices']), 3)
diff --git a/voltha/core/local_handler.py b/voltha/core/local_handler.py
index f87830d..c598911 100644
--- a/voltha/core/local_handler.py
+++ b/voltha/core/local_handler.py
@@ -18,8 +18,10 @@
 import structlog
 from google.protobuf.empty_pb2 import Empty
 from grpc import StatusCode
+from grpc._channel import _Rendezvous
 
 from common.utils.grpc_utils import twisted_async
+from twisted.internet import task
 from common.utils.id_generation import create_cluster_device_id
 from voltha.core.config.config_root import ConfigRoot
 from voltha.protos.openflow_13_pb2 import PacketIn, Flows, FlowGroups, \
@@ -29,11 +31,13 @@
     VolthaInstance, Adapters, LogicalDevices, LogicalDevice, Ports, \
     LogicalPorts, Devices, Device, DeviceType, \
     DeviceTypes, DeviceGroups, DeviceGroup, AdminState, OperStatus, ChangeEvent, \
-    AlarmFilter, AlarmFilters, SelfTestResponse
+    AlarmFilter, AlarmFilters, SelfTestResponse, OfAgentSubscriber
 from voltha.protos.device_pb2 import PmConfigs, Images, ImageDownload, ImageDownloads
 from voltha.protos.common_pb2 import OperationResp
+from voltha.protos.bbf_fiber_base_pb2 import AllMulticastDistributionSetData, AllMulticastGemportsConfigData
 from voltha.registry import registry
 from requests.api import request
+from common.utils.asleep import asleep
 
 log = structlog.get_logger()
 
@@ -48,6 +52,14 @@
         self.started_with_existing_data = False
         self.stopped = False
 
+        self.restart_delay = 2
+        self.subscriber = None
+        self.ofagent_heartbeat_count = 0
+        self.ofagent_heartbeat_max_count = 3
+        self.ofagent_heartbeat_delay = 5
+        self.ofagent_heartbeat_lc = None
+        self.ofagent_is_alive = True
+
     def start(self, config_backend=None):
         log.debug('starting')
         if config_backend:
@@ -72,12 +84,19 @@
 
         registry('grpc_server').register(
             add_VolthaLocalServiceServicer_to_server, self)
+
         log.info('started')
         return self
 
     def stop(self):
         log.debug('stopping')
         self.stopped = True
+
+        if self.ofagent_heartbeat_lc is not None:
+            self.ofagent_heartbeat_lc.stop()
+
+        self._ofagent_session_termination()
+
         log.info('stopped')
 
     def get_proxy(self, path, exclusive=False):
@@ -986,6 +1005,7 @@
     # bbf_fiber rpcs end
 
     def StreamPacketsOut(self, request_iterator, context):
+        log.debug('start-stream-packets-out')
 
         @twisted_async
         def forward_packet_out(packet_out):
@@ -995,16 +1015,20 @@
         for request in request_iterator:
             forward_packet_out(packet_out=request)
 
+        log.debug('stop-stream-packets-out')
+
         return Empty()
 
     def ReceivePacketsIn(self, request, context):
-        while 1:
+        log.debug('start-receive-packets-in')
+        while self.ofagent_is_alive:
             try:
                 packet_in = self.core.packet_in_queue.get(timeout=1)
                 yield packet_in
             except QueueEmpty:
                 if self.stopped:
                     break
+        log.debug('stop-receive-packets-in')
 
     def send_packet_in(self, device_id, ofp_packet_in):
         """Must be called on the twisted thread"""
@@ -1012,13 +1036,15 @@
         self.core.packet_in_queue.put(packet_in)
 
     def ReceiveChangeEvents(self, request, context):
-        while 1:
+        log.debug('start-receive-change-events')
+        while self.ofagent_is_alive:
             try:
                 event = self.core.change_event_queue.get(timeout=1)
                 yield event
             except QueueEmpty:
                 if self.stopped:
                     break
+        log.debug('stop-receive-change-events')
 
     def send_port_change_event(self, device_id, port_status):
         """Must be called on the twisted thread"""
@@ -1151,3 +1177,71 @@
                 'Device \'{}\' not found'.format(request.id))
             context.set_code(StatusCode.NOT_FOUND)
             return SelfTestResponse()
+
+    def _ofagent_session_termination(self):
+        log.debug('start-ofagent-session-termination')
+
+        # Stop ofagent heartbeat
+        if self.ofagent_heartbeat_lc is not None:
+            self.ofagent_heartbeat_lc.stop()
+
+        # Reset flags and assignments
+        self.ofagent_is_alive = False
+        self.subscriber = None
+        self.ofagent_heartbeat_count = 0
+
+        # Some local services will stop (packet-in/change-events)
+        # need to re-register them
+        registry('grpc_server').register(
+            add_VolthaLocalServiceServicer_to_server, self)
+
+        log.debug('stop-ofagent-session-termination')
+
+    def _ofagent_session_heartbeat(self):
+        log.debug('start-ofagent-heartbeat')
+        if self.ofagent_heartbeat_count > self.ofagent_heartbeat_max_count:
+            self._ofagent_session_termination()
+        else:
+            self.ofagent_heartbeat_count += 1
+
+        log.debug('stop-ofagent-heartbeat')
+
+    @twisted_async
+    def Subscribe(self, request, context):
+        log.info('grpc-request', request=request)
+
+        # Check if an ofagent subscriber is assigned
+        if self.subscriber is None:
+            log.debug('ofagent-subscriber-request')
+
+            try:
+                # Assign the request as the active subscriber
+                self.subscriber = OfAgentSubscriber(
+                    ofagent_id=request.ofagent_id,
+                    voltha_id=self.instance_id
+                )
+
+                # Start the hearbeat
+                self.ofagent_heartbeat_count = 0
+                self.ofagent_heartbeat_lc = task.LoopingCall(self._ofagent_session_heartbeat)
+                self.ofagent_heartbeat_lc.start(self.ofagent_heartbeat_delay)
+
+                log.debug('ofagent-subscriber-connected', subscriber=self.subscriber)
+
+            except _Rendezvous, e:
+                log.error('ofagent-subscriber-failure', exception=repr(e), status=e.code())
+
+            except Exception as e:
+                log.exception('ofagent-subscriber-unexpected-failure', exception=repr(e))
+
+        elif self.subscriber.ofagent_id == request.ofagent_id:
+            log.debug('ofagent-subscriber-matches-assigned',
+                     current=self.subscriber)
+            # reset counter
+            self.ofagent_heartbeat_count = 0
+
+        else:
+            log.debug('ofagent-subscriber-not-matching-assigned',
+                     current=self.subscriber)
+
+        return self.subscriber
diff --git a/voltha/protos/voltha.proto b/voltha/protos/voltha.proto
index 30e1834..15cef20 100644
--- a/voltha/protos/voltha.proto
+++ b/voltha/protos/voltha.proto
@@ -207,6 +207,14 @@
     SelfTestResult result = 1;
 }
 
+message OfAgentSubscriber {
+    // ID of ofagent instance
+    string ofagent_id = 1;
+
+    // ID of voltha instance to which the ofagent is subscribed
+    string voltha_id = 2;
+}
+
 /*
  * Cluster-wide Voltha APIs
  *
@@ -1612,4 +1620,6 @@
             post: "/api/v1/local/devices/{id}/self_test"
         };
     }
+
+    rpc Subscribe(OfAgentSubscriber) returns (OfAgentSubscriber) {}
 }