[ 4460 ] Minor fix when getting invalid voltha instance
Initial commit of the Global Forwarder.

Change-Id: I6c619a8589abaeecba00c004a42beaf063f31448
diff --git a/common/utils/id_generation.py b/common/utils/id_generation.py
index 984c19c..eedf434 100644
--- a/common/utils/id_generation.py
+++ b/common/utils/id_generation.py
@@ -17,6 +17,21 @@
 
 from uuid import uuid4
 
+
+BROADCAST_CORE_ID = hex(0xFFFF)[2:]
+
+def get_next_core_id(current_id_in_hex_str):
+    """
+    :param current_id_in_hex_str: hex string of the highest core id assigned
+    so far, without the leading '0x' characters
+    :return: current_id_in_hex_str + 1, as a 4-character hex string
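+    e.g. get_next_core_id('000f') returns '0010'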
+    """
+    if not current_id_in_hex_str:
+        return '0001'
+    else:
+        return format(int(current_id_in_hex_str, 16) + 1, '04x')
+
+
 def create_cluster_logical_device_ids(core_id, switch_id):
     """
     Creates a logical device id and an OpenFlow datapath id that is unique 
@@ -28,9 +43,21 @@
     """
     switch_id = format(switch_id, '012x')
     id = '{}{}'.format(format(int(core_id), '04x'), switch_id)
-    hex_int=int(id,16)
+    hex_int = int(id, 16)
     return id, hex_int
 
+def is_broadcast_core_id(id):
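+    # Ids created by create_cluster_id() carry the broadcast core id ('ffff')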
+    assert id and len(id) == 16
+    return id[:4] == BROADCAST_CORE_ID
+
+def create_cluster_id():
+    """
+    Returns an id that is common across all Voltha instances.  The id
+    is a hex string representing 64 bits: the lower 48 bits are specific to
+    the object while the upper 16 bits hold the broadcast core id.
+    :return: a common id across all Voltha instances
+    """
+    return '{}{}'.format(BROADCAST_CORE_ID, uuid4().hex[:12])
 
 def create_cluster_device_id(core_id):
     """
@@ -42,9 +69,11 @@
     """
     return '{}{}'.format(format(int(core_id), '04x'), uuid4().hex[:12])
 
+
 def get_core_id_from_device_id(device_id):
     # Device id is a string and the first 4 characters represent the core_id
-    assert device_id and device_id.len() == 16
+    assert device_id and len(device_id) == 16
+    # The core id is the first 4 hex characters of the device id
     return device_id[:4]
 
 
@@ -55,9 +84,11 @@
     :param logical_device_id: 
     :return: core_id string
     """
-    assert logical_device_id and logical_device_id.len() == 16
+    assert logical_device_id and len(logical_device_id) == 16
+    # The core id is the first 4 hex characters of the logical device id
     return logical_device_id[:4]
 
+
 def get_core_id_from_datapath_id(datapath_id):
     """
     datapath id is a uint64 where:
@@ -68,6 +99,6 @@
     """
     assert datapath_id
     # Get the hex string and remove the '0x' prefix
-    id_in_hex_str=hex(datapath_id)[2:]
-    assert id_in_hex_str.len() > 12
+    id_in_hex_str = hex(datapath_id)[2:]
+    assert len(id_in_hex_str) > 12
     return id_in_hex_str[:-12]
diff --git a/tests/itests/README.md b/tests/itests/README.md
index 00d4020..658d95e 100644
--- a/tests/itests/README.md
+++ b/tests/itests/README.md
@@ -69,6 +69,8 @@
 ``` 
 Run the test:
 ``` 
+cd /cord/incubator/voltha
+. ./env.sh
 nosetests -s tests/itests/voltha/test_device_state_changes.py
 ```  
 
@@ -93,6 +95,8 @@
 
 Run the test:
 ``` 
+cd /cord/incubator/voltha
+. ./env.sh
 nosetests -s tests/itests/voltha/test_persistence.py
 ```  
 
@@ -108,6 +112,14 @@
 nosetests -s tests/itests/voltha/test_voltha_rest_apis.py
 ```    
 
-* Voltha_alarm_events: TODO
+* **Voltha_alarm_events**: TODO
 
-* Voltha_alarm_filters: TODO
+* **Voltha_alarm_filters**: TODO
+
+* **Dispatcher**:  This test exercises request forwarding via the Global
+handler.
+```
+cd /cord/incubator/voltha
+. ./env.sh
+nosetests -s tests/itests/voltha/test_dispatcher.py
+```  
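+If needed, a single test case can be selected with nose's
+`module:Class.method` syntax, e.g.:
+```
+nosetests -s tests/itests/voltha/test_dispatcher.py:DispatcherTest.test_01_global_rest_apis
+```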
diff --git a/tests/itests/voltha/test_dispatcher.py b/tests/itests/voltha/test_dispatcher.py
new file mode 100644
index 0000000..c369fc0
--- /dev/null
+++ b/tests/itests/voltha/test_dispatcher.py
@@ -0,0 +1,394 @@
+from random import randint
+from time import time, sleep
+
+from google.protobuf.json_format import MessageToDict
+from unittest import main, TestCase
+from voltha.protos.device_pb2 import Device
+from tests.itests.voltha.rest_base import RestBase
+from voltha.core.flow_decomposer import mk_simple_flow_mod, in_port, output
+from voltha.protos import openflow_13_pb2 as ofp
+from common.utils.consulhelpers import get_endpoint_from_consul
+from common.utils.consulhelpers import verify_all_services_healthy
+from tests.itests.docutests.test_utils import \
+    run_command_to_completion_with_raw_stdout, \
+    run_command_to_completion_with_stdout_in_list
+from voltha.protos.voltha_pb2 import AlarmFilter
+
+LOCAL_CONSUL = "localhost:8500"
+DOCKER_COMPOSE_FILE = "compose/docker-compose-system-test.yml"
+
+command_defs = dict(
+    docker_ps="docker ps",
+    docker_compose_start_all="docker-compose -f {} up -d "
+        .format(DOCKER_COMPOSE_FILE),
+    docker_stop_and_remove_all_containers="docker-compose -f {} down"
+        .format(DOCKER_COMPOSE_FILE),
+    docker_compose_start_voltha="docker-compose -f {} up -d voltha "
+        .format(DOCKER_COMPOSE_FILE),
+    docker_compose_stop_voltha="docker-compose -f {} stop voltha"
+        .format(DOCKER_COMPOSE_FILE),
+    docker_compose_remove_voltha="docker-compose -f {} rm -f voltha"
+        .format(DOCKER_COMPOSE_FILE),
+    kafka_topics="kafkacat -b {} -L",
+    kafka_alarms="kafkacat -o end -b {} -C -t voltha.alarms -c 2",
+    kafka_kpis="kafkacat -o end -b {} -C -t voltha.kpis -c 5"
+)
+
+class DispatcherTest(RestBase):
+
+    t0 = [time()]
+
+    def pt(self, msg=''):
+        t1 = time()
+        print '%20.8f ms - %s' % (1000 * (t1 - DispatcherTest.t0[0]),
+                                  msg)
+        DispatcherTest.t0[0] = t1
+
+
+    def wait_till(self, msg, predicate, interval=0.1, timeout=5.0):
+        deadline = time() + timeout
+        while time() < deadline:
+            if predicate():
+                return
+            sleep(interval)
+        self.fail('Timed out while waiting for condition: {}'.format(msg))
+
+
+    def test_01_global_rest_apis(self):
+        # Start the voltha ensemble with a single voltha instance
+        self._stop_and_remove_all_containers()
+        sleep(5)  # A small wait for the system to settle down
+        self.start_all_containers()
+        self.set_rest_endpoint()
+        self.set_kafka_endpoint()
+
+        self._get_root()
+        self._get_schema()
+        self._get_health()
+        self._get_voltha()
+        self._list_voltha_instances()
+        self._get_voltha_instance()
+        olt_id = self._add_olt_device()
+        self._verify_device_preprovisioned_state(olt_id)
+        self._activate_device(olt_id)
+        ldev_id = self._wait_for_logical_device(olt_id)
+        ldevices = self._list_logical_devices()
+        logical_device_id = ldevices['items'][0]['id']
+        self._get_logical_device(logical_device_id)
+        self._list_logical_device_ports(logical_device_id)
+        self._list_and_update_logical_device_flows(logical_device_id)
+        self._list_and_update_logical_device_flow_groups(logical_device_id)
+        devices = self._list_devices()
+        device_id = devices['items'][0]['id']
+        self._get_device(device_id)
+        self._list_device_ports(device_id)
+        self._list_device_flows(device_id)
+        self._list_device_flow_groups(device_id)
+        self._get_images(device_id)
+        self._self_test(device_id)
+        dtypes = self._list_device_types()
+        self._get_device_type(dtypes['items'][0]['id'])
+        alarm_filter = self._create_device_filter(olt_id)
+        self._remove_device_filter(alarm_filter['id'])
+        # TODO: PM APIs test
+
+    def test_02_cross_instances_dispatch(self):
+        """ TODO: So far manual tests done.  Needs to be automated. """
+        pass
+
+    def _stop_and_remove_all_containers(self):
+        # check if there are any running containers first
+        cmd = command_defs['docker_ps']
+        out, err, rc = run_command_to_completion_with_stdout_in_list(cmd)
+        self.assertEqual(rc, 0)
+        if len(out) > 1:  # not counting docker ps header
+            cmd = command_defs['docker_stop_and_remove_all_containers']
+            out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+            self.assertEqual(rc, 0)
+
+    def start_all_containers(self):
+        t0 = time()
+
+        # start all the containers
+        self.pt("Starting all containers ...")
+        cmd = command_defs['docker_compose_start_all']
+        out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
+        self.assertEqual(rc, 0)
+
+        self.pt("Waiting for voltha and chameleon containers to be ready ...")
+        self.wait_till('voltha services HEALTHY',
+                       lambda: verify_all_services_healthy(
+                           LOCAL_CONSUL, service_name='voltha-grpc') == True,
+                       timeout=10)
+        self.wait_till('chameleon services HEALTHY',
+                       lambda: verify_all_services_healthy(
+                           LOCAL_CONSUL, service_name='chameleon-rest') == True,
+                       timeout=10)
+
+        # Chameleon takes some time to compile the protos and make them
+        # available.  So let's wait 10 seconds
+        sleep(10)
+
+    def set_rest_endpoint(self):
+        self.rest_endpoint = get_endpoint_from_consul(LOCAL_CONSUL,
+                                                      'chameleon-rest')
+        self.base_url = 'http://' + self.rest_endpoint
+
+    def set_kafka_endpoint(self):
+        self.kafka_endpoint = get_endpoint_from_consul(LOCAL_CONSUL, 'kafka')
+
+
+    def _get_root(self):
+        res = self.get('/', expected_content_type='text/html')
+        self.assertGreaterEqual(res.find('swagger'), 0)
+
+    def _get_schema(self):
+        res = self.get('/schema')
+        self.assertEqual(set(res.keys()), {'protos', 'yang_from', 'swagger_from'})
+
+    def _get_health(self):
+        res = self.get('/health')
+        self.assertEqual(res['state'], 'HEALTHY')
+
+    # ~~~~~~~~~~~~~~~~~~~~~ TOP LEVEL VOLTHA OPERATIONS ~~~~~~~~~~~~~~~~~~~~~~~
+
+    def _get_voltha(self):
+        res = self.get('/api/v1')
+        self.assertEqual(res['version'], '0.9.0')
+
+    def _list_voltha_instances(self):
+        res = self.get('/api/v1/instances')
+        self.assertEqual(len(res['items']), 1)
+
+    def _get_voltha_instance(self):
+        res = self.get('/api/v1/instances')
+        voltha_id = res['items'][0]
+        res = self.get('/api/v1/instances/{}'.format(voltha_id))
+        self.assertEqual(res['version'], '0.9.0')
+
+    def _add_olt_device(self):
+        device = Device(
+            type='simulated_olt',
+            mac_address='00:00:00:00:00:01'
+        )
+        device = self.post('/api/v1/devices', MessageToDict(device),
+                           expected_code=200)
+        return device['id']
+
+    def _verify_device_preprovisioned_state(self, olt_id):
+        # we also check that so far what we read back is same as what we get
+        # back on create
+        device = self.get('/api/v1/devices/{}'.format(olt_id))
+        self.assertNotEqual(device['id'], '')
+        self.assertEqual(device['adapter'], 'simulated_olt')
+        self.assertEqual(device['admin_state'], 'PREPROVISIONED')
+        self.assertEqual(device['oper_status'], 'UNKNOWN')
+
+    def _activate_device(self, olt_id):
+        path = '/api/v1/devices/{}'.format(olt_id)
+        self.post(path + '/enable', expected_code=200)
+        device = self.get(path)
+        self.assertEqual(device['admin_state'], 'ENABLED')
+
+        self.wait_till(
+            'admin state moves to ACTIVATING or ACTIVE',
+            lambda: self.get(path)['oper_status'] in ('ACTIVATING', 'ACTIVE'),
+            timeout=0.5)
+
+        # eventually, it shall move to active state and by then we shall have
+        # device details filled, connect_state set, and device ports created
+        self.wait_till(
+            'admin state ACTIVE',
+            lambda: self.get(path)['oper_status'] == 'ACTIVE',
+            timeout=0.5)
+        device = self.get(path)
+        images = device['images']
+        image = images['image']
+        image_1 = image[0]
+        version = image_1['version']
+        self.assertNotEqual(version, '')
+        self.assertEqual(device['connect_status'], 'REACHABLE')
+
+        ports = self.get(path + '/ports')['items']
+        self.assertEqual(len(ports), 2)
+
+    def _wait_for_logical_device(self, olt_id):
+        # we shall find the logical device id from the parent_id of the olt
+        # (root) device
+        device = self.get(
+            '/api/v1/devices/{}'.format(olt_id))
+        self.assertNotEqual(device['parent_id'], '')
+        logical_device = self.get(
+            '/api/v1/logical_devices/{}'.format(device['parent_id']))
+
+        # the logical device shall be linked back to the hard device,
+        # its ports too
+        self.assertEqual(logical_device['root_device_id'], device['id'])
+
+        logical_ports = self.get(
+            '/api/v1/logical_devices/{}/ports'.format(
+                logical_device['id'])
+        )['items']
+        self.assertGreaterEqual(len(logical_ports), 1)
+        logical_port = logical_ports[0]
+        self.assertEqual(logical_port['id'], 'nni')
+        self.assertEqual(logical_port['ofp_port']['name'], 'nni')
+        self.assertEqual(logical_port['ofp_port']['port_no'], 129)
+        self.assertEqual(logical_port['device_id'], device['id'])
+        self.assertEqual(logical_port['device_port_no'], 2)
+        return logical_device['id']
+
+    def _list_logical_devices(self):
+        res = self.get('/api/v1/logical_devices')
+        self.assertGreaterEqual(len(res['items']), 1)
+        return res
+
+    def _get_logical_device(self, id):
+        res = self.get('/api/v1/logical_devices/{}'.format(id))
+        self.assertIsNotNone(res['datapath_id'])
+
+    def _list_logical_device_ports(self, id):
+        res = self.get('/api/v1/logical_devices/{}/ports'.format(id))
+        self.assertGreaterEqual(len(res['items']), 1)
+
+    def _list_and_update_logical_device_flows(self, id):
+
+        # retrieve flow list
+        res = self.get('/api/v1/logical_devices/{}/flows'.format(id))
+        len_before = len(res['items'])
+
+        # add some flows
+        req = ofp.FlowTableUpdate(
+            id=id,
+            flow_mod=mk_simple_flow_mod(
+                cookie=randint(1, 10000000000),
+                priority=len_before,
+                match_fields=[
+                    in_port(129)
+                ],
+                actions=[
+                    output(1)
+                ]
+            )
+        )
+        res = self.post('/api/v1/logical_devices/{}/flows'.format(id),
+                        MessageToDict(req, preserving_proto_field_name=True),
+                        expected_code=200)
+        # TODO check some stuff on res
+
+        res = self.get('/api/v1/logical_devices/{}/flows'.format(id))
+        len_after = len(res['items'])
+        self.assertGreater(len_after, len_before)
+
+    def _list_and_update_logical_device_flow_groups(self, id):
+
+        # retrieve flow list
+        res = self.get('/api/v1/logical_devices/{}/flow_groups'.format(id))
+        len_before = len(res['items'])
+
+        # add some flows
+        req = ofp.FlowGroupTableUpdate(
+            id=id,
+            group_mod=ofp.ofp_group_mod(
+                command=ofp.OFPGC_ADD,
+                type=ofp.OFPGT_ALL,
+                group_id=len_before + 1,
+                buckets=[
+                    ofp.ofp_bucket(
+                        actions=[
+                            ofp.ofp_action(
+                                type=ofp.OFPAT_OUTPUT,
+                                output=ofp.ofp_action_output(
+                                    port=1
+                                )
+                            )
+                        ]
+                    )
+                ]
+            )
+        )
+        res = self.post('/api/v1/logical_devices/{}/flow_groups'.format(id),
+                        MessageToDict(req, preserving_proto_field_name=True),
+                        expected_code=200)
+        # TODO check some stuff on res
+
+        res = self.get('/api/v1/logical_devices/{}/flow_groups'.format(id))
+        len_after = len(res['items'])
+        self.assertGreater(len_after, len_before)
+
+    def _list_devices(self):
+        res = self.get('/api/v1/devices')
+        self.assertGreaterEqual(len(res['items']), 2)
+        return res
+
+    def _get_device(self, id):
+        res = self.get('/api/v1/devices/{}'.format(id))
+        # TODO test result
+
+    def _list_device_ports(self, id):
+        res = self.get('/api/v1/devices/{}/ports'.format(id))
+        self.assertGreaterEqual(len(res['items']), 2)
+
+    def _list_device_flows(self, id):
+        # pump some flows into the logical device
+        res = self.get('/api/v1/devices/{}/flows'.format(id))
+        self.assertGreaterEqual(len(res['items']), 1)
+
+    def _list_device_flow_groups(self, id):
+        res = self.get('/api/v1/devices/{}/flow_groups'.format(id))
+        self.assertGreaterEqual(len(res['items']), 0)
+
+    def _list_device_types(self):
+        res = self.get('/api/v1/device_types')
+        self.assertGreaterEqual(len(res['items']), 2)
+        return res
+
+    def _get_device_type(self, dtype):
+        res = self.get('/api/v1/device_types/{}'.format(dtype))
+        self.assertIsNotNone(res)
+        # TODO test the result
+
+    def _list_device_groups(self):
+        pass
+        # res = self.get('/api/v1/device_groups')
+        # self.assertGreaterEqual(len(res['items']), 1)
+
+    def _get_device_group(self):
+        pass
+        # res = self.get('/api/v1/device_groups/1')
+        # # TODO test the result
+
+    def _get_images(self, id):
+        res = self.get('/api/v1/devices/{}/images'.format(id))
+        self.assertIsNotNone(res)
+
+    def _self_test(self, id):
+        res = self.post('/api/v1/devices/{}/self_test'.format(id),
+                        expected_code=200)
+        self.assertIsNotNone(res)
+
+    def _create_device_filter(self, device_id):
+        rules = list()
+        rule = dict()
+
+        # Create a filter with a single rule
+        rule['key'] = 'device_id'
+        rule['value'] = device_id
+        rules.append(rule)
+
+        alarm_filter = AlarmFilter(rules=rules)
+        alarm_filter = self.post('/api/v1/alarm_filters',
+                                 MessageToDict(alarm_filter),
+                                 expected_code=200)
+        self.assertIsNotNone(alarm_filter)
+        return alarm_filter
+
+    def _remove_device_filter(self, alarm_filter_id):
+        path = '/api/v1/alarm_filters/{}'.format(alarm_filter_id)
+        self.delete(path, expected_code=200)
+        alarm_filter = self.get(path, expected_code=404)
+        self.assertIsNone(alarm_filter)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/tests/itests/voltha/test_persistence.py b/tests/itests/voltha/test_persistence.py
index 428d51d..1111c1e 100644
--- a/tests/itests/voltha/test_persistence.py
+++ b/tests/itests/voltha/test_persistence.py
@@ -691,14 +691,14 @@
         rules.append(rule)
 
         alarm_filter = AlarmFilter(rules=rules)
-        alarm_filter = self.post('/api/v1/local/alarm_filters',
+        alarm_filter = self.post('/api/v1/alarm_filters',
                                  MessageToDict(alarm_filter),
                                  expected_code=200)
         self.assertIsNotNone(alarm_filter)
         return alarm_filter
 
     def remove_device_filter(self, alarm_filter_id):
-        path = '/api/v1/local/alarm_filters/{}'.format(alarm_filter_id)
+        path = '/api/v1/alarm_filters/{}'.format(alarm_filter_id)
         self.delete(path, expected_code=200)
         alarm_filter = self.get(path, expected_code=404)
         self.assertIsNone(alarm_filter)
diff --git a/tests/itests/voltha/test_voltha_alarm_filters.py b/tests/itests/voltha/test_voltha_alarm_filters.py
index 88744ea..4b73509 100644
--- a/tests/itests/voltha/test_voltha_alarm_filters.py
+++ b/tests/itests/voltha/test_voltha_alarm_filters.py
@@ -91,7 +91,7 @@
         device = Device(
             type='simulated_olt',
         )
-        device = self.post('/api/v1/local/devices', MessageToDict(device),
+        device = self.post('/api/v1/devices', MessageToDict(device),
                            expected_code=200)
         return device
 
@@ -106,7 +106,7 @@
         rules.append(rule)
 
         alarm_filter = AlarmFilter(rules=rules)
-        alarm_filter = self.post('/api/v1/local/alarm_filters', MessageToDict(alarm_filter),
+        alarm_filter = self.post('/api/v1/alarm_filters', MessageToDict(alarm_filter),
                                  expected_code=200)
 
         return alarm_filter
@@ -114,7 +114,7 @@
     # Active the simulated device.
     # This will trigger the simulation of random alarms
     def activate_device(self, device_id):
-        path = '/api/v1/local/devices/{}'.format(device_id)
+        path = '/api/v1/devices/{}'.format(device_id)
         self.post(path + '/enable', expected_code=200)
         device = self.get(path)
         self.assertEqual(device['admin_state'], 'ENABLED')
diff --git a/voltha/adapters/simulated_olt/simulated_olt.py b/voltha/adapters/simulated_olt/simulated_olt.py
index 4de3f0f..e3860b0 100644
--- a/voltha/adapters/simulated_olt/simulated_olt.py
+++ b/voltha/adapters/simulated_olt/simulated_olt.py
@@ -499,20 +499,23 @@
 
     @inlineCallbacks
     def _simulate_detection_of_onus(self, device_id):
-        for i in xrange(1, 5):
-            log.info('activate-olt-for-onu-{}'.format(i))
-            vlan_id = self._olt_side_onu_activation(i)
-            yield asleep(0.05)
-            self.adapter_agent.child_device_detected(
-                parent_device_id=device_id,
-                parent_port_no=1,
-                child_device_type='simulated_onu',
-                proxy_address=Device.ProxyAddress(
-                    device_id=device_id,
-                    channel_id=vlan_id
-                ),
-                vlan=vlan_id
-            )
+        try:
+            for i in xrange(1, 5):
+                log.info('activate-olt-for-onu-{}'.format(i))
+                vlan_id = self._olt_side_onu_activation(i)
+                yield asleep(0.05)
+                self.adapter_agent.child_device_detected(
+                    parent_device_id=device_id,
+                    parent_port_no=1,
+                    child_device_type='simulated_onu',
+                    proxy_address=Device.ProxyAddress(
+                        device_id=device_id,
+                        channel_id=vlan_id
+                    ),
+                    vlan=vlan_id
+                )
+        except Exception as e:
+            log.exception('error', e=e)
 
     def _olt_side_onu_activation(self, seq):
         """
diff --git a/voltha/coordinator.py b/voltha/coordinator.py
index 20164d4..6eecc11 100644
--- a/voltha/coordinator.py
+++ b/voltha/coordinator.py
@@ -27,6 +27,7 @@
 
 from leader import Leader
 from common.utils.asleep import asleep
+from common.utils.message_queue import MessageQueue
 from voltha.registry import IComponent
 from worker import Worker
 from simplejson import dumps, loads
@@ -75,16 +76,18 @@
             'tracking_loop_delay', 1)
         self.prefix = self.config.get('voltha_kv_prefix', 'service/voltha')
         self.leader_prefix = '/'.join((self.prefix, self.config.get(
-                self.config['leader_key'], 'leader')))
+            self.config['leader_key'], 'leader')))
         self.membership_prefix = '/'.join((self.prefix, self.config.get(
-                self.config['membership_key'], 'members'), ''))
+            self.config['membership_key'], 'members'), ''))
         self.assignment_prefix = '/'.join((self.prefix, self.config.get(
-                self.config['assignment_key'], 'assignments'), ''))
+            self.config['assignment_key'], 'assignments'), ''))
         self.workload_prefix = '/'.join((self.prefix, self.config.get(
-                self.config['workload_key'], 'work'), ''))
+            self.config['workload_key'], 'work'), ''))
         self.core_store_prefix = '/'.join((self.prefix, self.config.get(
-                self.config['core_store_key'], 'data/core')))
-        self.core_storage_suffix='core_store'
+            self.config['core_store_key'], 'data/core')))
+        self.core_store_assignment_key = self.core_store_prefix + \
+                                         '/assignment'
+        self.core_storage_suffix = 'core_store'
 
         self.retries = 0
         self.instance_id = instance_id
@@ -110,6 +113,8 @@
 
         self.wait_for_leader_deferreds = []
 
+        self.peers_mapping_queue = MessageQueue()
+
     def start(self):
         log.debug('starting')
         reactor.callLater(0, self._async_init)
@@ -142,13 +147,18 @@
             self.wait_for_leader_deferreds.append(d)
             return d
 
-
     # Wait for a core data id to be assigned to this voltha instance
     @inlineCallbacks
     def get_core_store_id_and_prefix(self):
         core_store_id = yield self.worker.get_core_store_id()
         returnValue((core_store_id, self.core_store_prefix))
 
+    def recv_peers_map(self):
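+        # Consumers (e.g. the dispatcher) wait here for the next peers-map update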
+        return self.peers_mapping_queue.get()
+
+    def publish_peers_map_change(self, msg):
+        self.peers_mapping_queue.put(msg)
+
     # Proxy methods for consul with retry support
 
     def kv_get(self, *args, **kw):
@@ -249,10 +259,10 @@
                                                     self.membership_record_key,
                                                     index=index)
                 log.debug('membership-record-change-detected',
-                               index=index, record=record)
+                          index=index, record=record)
                 if record is None or \
-                    'Session' not in record or \
-                    record['Session'] != self.session_id:
+                        'Session' not in record or \
+                        record['Session'] != self.session_id:
                     log.debug('remaking-membership-record')
                     yield self._retry(self._do_create_membership_record)
 
@@ -292,7 +302,7 @@
             (index, record) = yield self._retry(self.consul.kv.get,
                                                 self.leader_prefix)
             log.debug('leadership-key',
-                           i_am_leader=result, index=index, record=record)
+                      i_am_leader=result, index=index, record=record)
 
             if record is not None:
                 if result is True:
@@ -314,7 +324,7 @@
                                                      self.leader_prefix,
                                                      index=index)
                 log.debug('leader-key-change',
-                               index=index, updated=updated)
+                          index=index, updated=updated)
                 if updated is None or updated != last:
                     # leadership has changed or vacated (or forcefully
                     # removed), apply now
diff --git a/voltha/core/adapter_agent.py b/voltha/core/adapter_agent.py
index dff4695..9140cc5 100644
--- a/voltha/core/adapter_agent.py
+++ b/voltha/core/adapter_agent.py
@@ -39,6 +39,7 @@
 from voltha.protos.voltha_pb2 import DeviceGroup, LogicalDevice, \
     LogicalPort, AdminState, OperStatus, AlarmFilterRuleKey
 from voltha.registry import registry
+from common.utils.id_generation import create_cluster_device_id
 
 
 @implementer(IAdapterAgent)
@@ -512,7 +513,8 @@
         # we create new ONU device objects and insert them into the config
         # TODO should we auto-enable the freshly created device? Probably.
         device = Device(
-            id=uuid4().hex[:12],
+            id=create_cluster_device_id(self.core.core_store_id),
+            # id=uuid4().hex[:12],
             type=child_device_type,
             parent_id=parent_device_id,
             parent_port_no=parent_port_no,
diff --git a/voltha/core/core.py b/voltha/core/core.py
index 6d82d31..80bc5c1 100644
--- a/voltha/core/core.py
+++ b/voltha/core/core.py
@@ -40,10 +40,18 @@
 
 @implementer(IComponent)
 class VolthaCore(object):
-    def __init__(self, instance_id, core_store_id, version, log_level):
+    def __init__(self,
+                 instance_id,
+                 core_store_id,
+                 grpc_port,
+                 version,
+                 log_level):
         self.instance_id = instance_id
         self.stopped = False
-        self.dispatcher = Dispatcher(self, instance_id)
+        self.dispatcher = Dispatcher(self,
+                                     instance_id,
+                                     core_store_id,
+                                     grpc_port)
         self.core_store_id = core_store_id
         self.global_handler = GlobalHandler(
             dispatcher=self.dispatcher,
diff --git a/voltha/core/dispatcher.py b/voltha/core/dispatcher.py
index e0c6c0a..99357ac 100644
--- a/voltha/core/dispatcher.py
+++ b/voltha/core/dispatcher.py
@@ -20,22 +20,38 @@
 calls are forwarded to the LocalHandler.
 """
 import structlog
-
+from twisted.internet.defer import inlineCallbacks, returnValue
 from voltha.protos.voltha_pb2 import VolthaLocalServiceStub
+from voltha.registry import registry
+from twisted.internet import reactor
+import grpc
+from grpc import StatusCode
+from grpc._channel import _Rendezvous
+from common.utils.id_generation import get_core_id_from_device_id, \
+    is_broadcast_core_id
 
 log = structlog.get_logger()
 
 
-class Dispatcher(object):
+class DispatchError(object):
+    def __init__(self, error_code):
+        self.error_code = error_code
 
-    def __init__(self, core, instance_id):
+
+class Dispatcher(object):
+    def __init__(self, core, instance_id, core_store_id, grpc_port):
         self.core = core
         self.instance_id = instance_id
+        self.core_store_id = core_store_id
+        self.grpc_port = grpc_port
         self.local_handler = None
+        self.peers_map = dict()
+        self.grpc_conn_map = {}
 
     def start(self):
         log.debug('starting')
         self.local_handler = self.core.get_local_handler()
+        reactor.callLater(0, self._start_tracking_peers)
         log.info('started')
         return self
 
@@ -43,22 +59,101 @@
         log.debug('stopping')
         log.info('stopped')
 
-    def dispatch(self, instance_id, stub, method_name, input, context):
-        log.debug('dispatch', instance_id=instance_id, stub=stub,
-                  _method_name=method_name, input=input)
-        # special case if instance_id is us
-        if instance_id == self.instance_id:
-            # for now, we assume it is always the local stub
-            assert stub == VolthaLocalServiceStub
-            method = getattr(self.local_handler, method_name)
-            log.debug('dispatching', method=method)
-            res = method(input, context=context)
-            log.debug('dispatch-success', res=res)
-            return res
+    @inlineCallbacks
+    def dispatch(self,
+                 method_name,
+                 request,
+                 context,
+                 core_id=None,
+                 id=None,
+                 broadcast=False):
+        """
+        Called whenever a global request is received from the NBI.  The
+        request is dispatched as follows:
+        1) to a specific Voltha instance if a core_id is specified
+        2) to the local Voltha instance if the request carries an id whose
+        core id matches the core id of the local Voltha instance
+        3) to a remote Voltha instance if the request carries an id whose
+        core id matches the core id of that Voltha instance
+        4) to all Voltha instances if it is a broadcast request,
+        e.g. getDevices, i.e. broadcast=True.  The id may or may not be
+        None.  In this case the combined results are returned to the
+        global handler
+        5) to the local Voltha instance if id is None and broadcast is False.
+        This occurs for requests where any Voltha instance returns the same
+        output, e.g. getAdapters()
+        :param method_name: rpc name
+        :param request: the input parameters
+        :param context: grpc context
+        :param core_id: the target core id, if already known by the caller
+        :param id: the resource id in the request, if present
+        :param broadcast: True to dispatch the request to all instances
+        :return: the response of the dispatched request
+        """
+        log.debug('start',
+                  _method_name=method_name,
+                  id=id,
+                  request=request)
 
-        else:
-            log.warning('no-real-dispatch-yet')
-            raise KeyError()
+        core_id_from_request_id = None
+        if id:
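+            # The core id is encoded in the leading 4 hex characters of the id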
+            try:
+                core_id_from_request_id = get_core_id_from_device_id(id)
+            except Exception, e:
+                log.warning('invalid-id', request=request, id=id)
+                returnValue(DispatchError(StatusCode.NOT_FOUND))
+
+        try:
+            # Broadcast request if set
+            if broadcast:
+                # broadcast to all instances (including locally)
+                res = yield self._broadcast_request(method_name,
+                                                    request,
+                                                    context)
+                returnValue(res)
+
+            # Local Dispatch
+            elif (core_id and core_id == self.core_store_id) or (not id) or \
+                    (core_id_from_request_id and (
+                        (core_id_from_request_id == self.core_store_id) or
+                        (is_broadcast_core_id(id))
+                        )
+                     ):
+                returnValue(self._local_dispatch(self.core_store_id,
+                                                 method_name,
+                                                 request,
+                                                 context))
+            # Peer Dispatch
+            elif core_id_from_request_id:
+                res = yield self._dispatch_to_peer(core_id_from_request_id,
+                                                   method_name,
+                                                   request,
+                                                   context)
+                returnValue(res)
+            else:
+                log.warning('invalid-request', request=request, id=id,
+                            core_id=core_id, broadcast=broadcast)
+                returnValue(DispatchError(StatusCode.INVALID_ARGUMENT))
+
+        except Exception as e:
+            log.exception('remote-dispatch-exception', e=e)
+            returnValue(DispatchError(StatusCode.UNKNOWN))
+
+    def get_core_id_from_instance_id(self, instance_id):
+        """
+        :param instance_id: instance name
+        :return: core id of that instance
+        """
+        if instance_id == self.instance_id:
+            return self.core_store_id
+        for id, instance in self.peers_map.iteritems():
+            if instance['id'] == instance_id:
+                return id
+
+    def get_cluster_instances(self):
+        result = []
+        result.append(self.instance_id)
+        for id, instance in self.peers_map.iteritems():
+            result.append(instance['id'])
+        return result
 
     def instance_id_by_logical_device_id(self, logical_device_id):
         log.warning('temp-mapping-logical-device-id')
@@ -69,3 +164,203 @@
         log.warning('temp-mapping-logical-device-id')
         # TODO no true dispatchong yet, we blindly map everything to self
         return self.instance_id
+
+    @inlineCallbacks
+    def _broadcast_request(self, method_name, request, context):
+        # First get local result
+        result = self._local_dispatch(self.core_store_id,
+                                      method_name,
+                                      request,
+                                      context)
+        # Then get peers results
+        for core_id in self.peers_map:
+            if self.peers_map[core_id] and self.grpc_conn_map[core_id]:
+                res = yield self._dispatch_to_peer(core_id,
+                                                   method_name,
+                                                   request,
+                                                   context)
+                if isinstance(res, DispatchError):
+                    log.warning('ignoring-peer',
+                                core_id=core_id,
+                                error_code=res.error_code)
+                else:
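+                    # MergeFrom appends the peer's repeated items to the local result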
+                    result.MergeFrom(res)
+        returnValue(result)
+
+    def _local_dispatch(self, core_id, method_name, request, context):
+        log.debug('local-dispatch', core_id=core_id)
+        method = getattr(self.local_handler, method_name)
+        res = method(request, context=context)
+        log.debug('local-dispatch-result', res=res, context=context)
+        return res
+
+    @inlineCallbacks
+    def _start_tracking_peers(self):
+        try:
+            while True:
+                peers_map = yield registry('coordinator').recv_peers_map()
+                log.info('peers-map-changed', peers_map=peers_map)
+                yield self.update_grpc_client_map(peers_map)
+                self.peers_map = peers_map
+        except Exception, e:
+            log.exception('exception', e=e)
+
+    @inlineCallbacks
+    def update_grpc_client_map(self, peers_map):
+        try:
+            # 1. Get the list of connection to open and to close
+            to_open = dict()
+            to_close = set()
+            for id, instance in peers_map.iteritems():
+                # Check for no change
+                if id in self.peers_map and self.peers_map[id] == instance:
+                    continue
+
+                if id not in self.peers_map:
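+                    # Newly discovered peer: open a channel to its host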
+                    if instance:
+                        to_open[id] = instance['host']
+                elif instance:
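+                    # Known peer whose record changed: open a fresh channel
+                    # and close the stale one, if any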
+                    to_open[id] = instance['host']
+                    if self.peers_map[id]:
+                        to_close.add(id)
+                else:
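+                    # Peer no longer has an instance record: close its channel, if any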
+                    if self.peers_map[id]:
+                        to_close.add(id)
+
+            # Close connections that are no longer referenced
+            old_ids = set(self.peers_map.keys()) - set(peers_map.keys())
+            for id in old_ids:
+                if self.peers_map[id]:
+                    to_close.add(id)
+
+            # 2.  Refresh the grpc connections
+            yield self._refresh_grpc_connections(to_open, to_close)
+        except Exception, e:
+            log.exception('exception', e=e)
+
+    @inlineCallbacks
+    def _refresh_grpc_connections(self, to_open, to_close):
+        try:
+            log.info('grpc-channel-refresh', to_open=to_open,
+                     to_close=to_close)
+            # First open the connection
+            for id, host in to_open.iteritems():
+                if id in self.grpc_conn_map and self.grpc_conn_map[id]:
+                    # clear connection
+                    self._disconnect_from_peer(id)
+                if host:
+                    self.grpc_conn_map[id] = \
+                        yield self._connect_to_peer(host, self.grpc_port)
+
+            # Close the unused connection
+            for id in to_close:
+                if self.grpc_conn_map[id]:
+                    # clear connection
+                    self._disconnect_from_peer(id)
+        except Exception, e:
+            log.exception('exception', e=e)
+
+    @inlineCallbacks
+    def _connect_to_peer(self, host, port):
+        try:
+            channel = yield grpc.insecure_channel('{}:{}'.format(host, port))
+            log.info('grpc-channel-created-with-peer', peer=host)
+            returnValue(channel)
+        except Exception, e:
+            log.exception('exception', e=e)
+
+    def _disconnect_from_peer(self, peer_id):
+        try:
+            if self.grpc_conn_map[peer_id]:
+                # Let garbage collection clear the connection - no API exists
+                # to close the channel explicitly
+                # yield self.grpc_conn_map[peer_id].close()
+                self.grpc_conn_map[peer_id] = None
+                log.info('grpc-channel-closed-with-peer', peer_id=peer_id)
+        except Exception, e:
+            log.exception('exception', e=e)
+        finally:
+            self.grpc_conn_map.pop(peer_id, None)
+
+    @inlineCallbacks
+    def _reconnect_to_peer(self, peer_id):
+        try:
+            # First disconnect
+            yield self._disconnect_from_peer(peer_id)
+            # Then reconnect
+            peer_instance = self.peers_map.get(peer_id, None)
+            if peer_instance:
+                self.grpc_conn_map[peer_id] = \
+                    yield self._connect_to_peer(peer_instance['host'],
+                                                self.grpc_port)
+                log.info('reconnected-to-peer', peer_id=peer_id)
+                returnValue(True)
+            else:
+                log.info('peer-unavailable', peer_id=peer_id)
+        except Exception, e:
+            log.exception('exception', e=e)
+        returnValue(False)
+
+    @inlineCallbacks
+    def _dispatch_to_peer(self,
+                          core_id,
+                          method_name,
+                          request,
+                          context,
+                          retry=1):
+        """
+        Invoke a gRPC call to the remote server and return the response.
+        :param core_id:  The voltha instance where this request needs to be sent
+        :param method_name: The method name inside the service stub
+        :param request: The request protobuf message
+        :param context: grprc context
+        :return: The response as a protobuf message
+        """
+        log.debug('peer-dispatch',
+                  core_id=core_id,
+                  _method_name=method_name,
+                  request=request)
+
+        if core_id not in self.peers_map or not self.peers_map[core_id]:
+            log.error('non-existent-core-id', core_id=core_id,
+                      peers_map=self.peers_map)
+            returnValue(DispatchError(StatusCode.NOT_FOUND))
+
+        try:
+            # Always request from the local service when making request to peer
+            stub = VolthaLocalServiceStub
+            method = getattr(stub(self.grpc_conn_map[core_id]), method_name)
+            response, rendezvous = yield method.with_call(request,
+                                                          metadata=context.invocation_metadata())
+            log.debug('peer-response',
+                      core_id=core_id,
+                      response=response,
+                      rendezvous_metadata=rendezvous.trailing_metadata())
+            # TODO:  Should we return the metadata as well
+            returnValue(response)
+        except grpc._channel._Rendezvous, e:
+            code = e.code()
+            if code == grpc.StatusCode.UNAVAILABLE:
+                # Try to reconnect
+                status = yield self._reconnect_to_peer(core_id)
+                if status and retry > 0:
+                    response = yield self._dispatch_to_peer(core_id,
+                                                            method_name,
+                                                            request,
+                                                            context,
+                                                            retry=retry - 1)
+                    returnValue(response)
+            elif code in (
+                    grpc.StatusCode.NOT_FOUND,
+                    grpc.StatusCode.INVALID_ARGUMENT,
+                    grpc.StatusCode.ALREADY_EXISTS,
+                    grpc.StatusCode.UNAUTHENTICATED,
+                    grpc.StatusCode.PERMISSION_DENIED):
+
+                pass  # don't log error, these occur naturally
+
+            else:
+                log.exception('error-invoke', e=e)
+
+            log.warning('error-from-peer', code=code)
+            returnValue(DispatchError(code))
diff --git a/voltha/core/global_handler.py b/voltha/core/global_handler.py
index 6979de3..7c80826 100644
--- a/voltha/core/global_handler.py
+++ b/voltha/core/global_handler.py
@@ -18,20 +18,23 @@
 from twisted.internet.defer import returnValue
 
 from common.utils.grpc_utils import twisted_async
+from common.utils.id_generation import create_cluster_id
 from voltha.core.config.config_root import ConfigRoot
 from voltha.protos.device_pb2 import PmConfigs, Images
 from voltha.protos.voltha_pb2 import \
     add_VolthaGlobalServiceServicer_to_server, VolthaLocalServiceStub, \
     VolthaGlobalServiceServicer, Voltha, VolthaInstances, VolthaInstance, \
-    LogicalDevice, Ports, Flows, FlowGroups, Device, SelfTestResponse
+    LogicalDevice, Ports, Flows, FlowGroups, Device, SelfTestResponse, \
+    VolthaGlobalServiceStub, Devices, DeviceType, DeviceTypes, DeviceGroup, \
+    AlarmFilter, AlarmFilters
 from voltha.registry import registry
 from google.protobuf.empty_pb2 import Empty
+from dispatcher import DispatchError
 
 log = structlog.get_logger()
 
 
 class GlobalHandler(VolthaGlobalServiceServicer):
-
     def __init__(self, dispatcher, instance_id, **init_kw):
         self.dispatcher = dispatcher
         self.instance_id = instance_id
@@ -61,539 +64,585 @@
         return self.root.get('/', depth=1)
 
     @twisted_async
-    @inlineCallbacks
     def ListVolthaInstances(self, request, context):
         log.info('grpc-request', request=request)
-        items = yield registry('coordinator').get_members()
-        returnValue(VolthaInstances(items=items))
+        items = self.dispatcher.get_cluster_instances()
+        return VolthaInstances(items=items)
 
     @twisted_async
+    @inlineCallbacks
     def GetVolthaInstance(self, request, context):
         log.info('grpc-request', request=request)
-        instance_id = request.id
-        try:
-            return self.dispatcher.dispatch(
-                instance_id,
-                VolthaLocalServiceStub,
-                'GetVolthaInstance',
-                Empty(),
-                context)
-        except KeyError:
-            context.set_details(
-                'Voltha instance \'{}\' not found'.format(instance_id))
+        core_id = self.dispatcher.get_core_id_from_instance_id(request.id)
+        if not core_id:
+            log.info('invalid-instance-id', instance=request.id)
+            context.set_details('Voltha Instance error')
             context.set_code(StatusCode.NOT_FOUND)
-            return VolthaInstance()
+            returnValue(VolthaInstance())
+
+        response = yield self.dispatcher.dispatch('GetVolthaInstance',
+                                                  Empty(),
+                                                  context,
+                                                  core_id=core_id)
+        log.info('grpc-response', response=response)
+        if isinstance(response, DispatchError):
+            log.info('grpc-error-response', error=response.error_code)
+            context.set_details('Voltha Instance error')
+            context.set_code(response.error_code)
+            returnValue(VolthaInstance())
+        else:
+            log.info('grpc-success-response', response=response)
+            returnValue(response)
 
     @twisted_async
+    @inlineCallbacks
     def ListLogicalDevices(self, request, context):
-        log.warning('temp-limited-implementation')
-        # TODO dispatching to local instead of collecting all
-        return self.dispatcher.dispatch(
-            self.instance_id,
-            VolthaLocalServiceStub,
-            'ListLogicalDevices',
-            Empty(),
-            context)
+        log.info('grpc-request', request=request)
+        response = yield self.dispatcher.dispatch('ListLogicalDevices',
+                                                  Empty(),
+                                                  context,
+                                                  broadcast=True)
+        log.info('grpc-response', response=response)
+        returnValue(response)
 
     @twisted_async
+    @inlineCallbacks
     def GetLogicalDevice(self, request, context):
         log.info('grpc-request', request=request)
 
-        try:
-            instance_id = self.dispatcher.instance_id_by_logical_device_id(
-                request.id
-            )
-        except KeyError:
+        response = yield self.dispatcher.dispatch('GetLogicalDevice',
+                                                  request,
+                                                  context,
+                                                  id=request.id)
+        log.info('grpc-response', response=response)
+        if isinstance(response, DispatchError):
+            log.info('grpc-error-response', error=response.error_code)
             context.set_details(
-                'Logical device \'{}\' not found'.format(request.id))
-            context.set_code(StatusCode.NOT_FOUND)
-            return LogicalDevice()
-
-        return self.dispatcher.dispatch(
-            instance_id,
-            VolthaLocalServiceStub,
-            'GetLogicalDevice',
-            request,
-            context)
+                'Logical device \'{}\' error'.format(request.id))
+            context.set_code(response.error_code)
+            returnValue(LogicalDevice())
+        else:
+            log.info('grpc-success-response', response=response)
+            returnValue(response)
 
     @twisted_async
+    @inlineCallbacks
     def ListLogicalDevicePorts(self, request, context):
         log.info('grpc-request', request=request)
 
-        try:
-            instance_id = self.dispatcher.instance_id_by_logical_device_id(
-                request.id
-            )
-        except KeyError:
+        response = yield self.dispatcher.dispatch('ListLogicalDevicePorts',
+                                                  request,
+                                                  context,
+                                                  id=request.id)
+        log.info('grpc-response', response=response)
+        if isinstance(response, DispatchError):
+            log.info('grpc-error-response', error=response.error_code)
             context.set_details(
-                'Logical device \'{}\' not found'.format(request.id))
-            context.set_code(StatusCode.NOT_FOUND)
-            return Ports()
-
-        return self.dispatcher.dispatch(
-            instance_id,
-            VolthaLocalServiceStub,
-            'ListLogicalDevicePorts',
-            request,
-            context)
+                'Logical device ports \'{}\' error'.format(request.id))
+            context.set_code(response.error_code)
+            returnValue(Ports())
+        else:
+            log.info('grpc-success-response', response=response)
+            returnValue(response)
 
     @twisted_async
+    @inlineCallbacks
     def ListLogicalDeviceFlows(self, request, context):
         log.info('grpc-request', request=request)
-
-        try:
-            instance_id = self.dispatcher.instance_id_by_logical_device_id(
-                request.id
-            )
-        except KeyError:
+        response = yield self.dispatcher.dispatch('ListLogicalDeviceFlows',
+                                                  request,
+                                                  context,
+                                                  id=request.id)
+        log.info('grpc-response', response=response)
+        if isinstance(response, DispatchError):
+            log.info('grpc-error-response', error=response.error_code)
             context.set_details(
-                'Logical device \'{}\' not found'.format(request.id))
-            context.set_code(StatusCode.NOT_FOUND)
-            return Flows()
-
-        return self.dispatcher.dispatch(
-            instance_id,
-            VolthaLocalServiceStub,
-            'ListLogicalDeviceFlows',
-            request,
-            context)
+                'Logical device \'{}\' error'.format(request.id))
+            context.set_code(response.error_code)
+            returnValue(Flows())
+        else:
+            log.info('grpc-success-response', response=response)
+            returnValue(response)
 
     @twisted_async
+    @inlineCallbacks
     def UpdateLogicalDeviceFlowTable(self, request, context):
         log.info('grpc-request', request=request)
-
-        try:
-            instance_id = self.dispatcher.instance_id_by_logical_device_id(
-                request.id
-            )
-        except KeyError:
-            context.set_details(
-                'Logical device \'{}\' not found'.format(request.id))
-            context.set_code(StatusCode.NOT_FOUND)
-            return Empty()
-
-        return self.dispatcher.dispatch(
-            instance_id,
-            VolthaLocalServiceStub,
+        response = yield self.dispatcher.dispatch(
             'UpdateLogicalDeviceFlowTable',
             request,
-            context)
+            context,
+            id=request.id)
+        log.info('grpc-response', response=response)
+        if isinstance(response, DispatchError):
+            log.info('grpc-error-response', error=response.error_code)
+            context.set_details(
+                'Logical device \'{}\' error'.format(request.id))
+            context.set_code(response.error_code)
+            returnValue(Empty())
+        else:
+            log.info('grpc-success-response', response=response)
+            returnValue(response)
 
     @twisted_async
+    @inlineCallbacks
     def ListLogicalDeviceFlowGroups(self, request, context):
         log.info('grpc-request', request=request)
-
-        try:
-            instance_id = self.dispatcher.instance_id_by_logical_device_id(
-                request.id
-            )
-        except KeyError:
-            context.set_details(
-                'Logical device \'{}\' not found'.format(request.id))
-            context.set_code(StatusCode.NOT_FOUND)
-            return FlowGroups()
-
-        return self.dispatcher.dispatch(
-            instance_id,
-            VolthaLocalServiceStub,
+        response = yield self.dispatcher.dispatch(
             'ListLogicalDeviceFlowGroups',
             request,
-            context)
+            context,
+            id=request.id)
+        log.info('grpc-response', response=response)
+        if isinstance(response, DispatchError):
+            log.info('grpc-error-response', error=response.error_code)
+            context.set_details(
+                'Logical device \'{}\' error'.format(request.id))
+            context.set_code(response.error_code)
+            returnValue(FlowGroups())
+        else:
+            log.info('grpc-success-response', response=response)
+            returnValue(response)
 
     @twisted_async
+    @inlineCallbacks
     def UpdateLogicalDeviceFlowGroupTable(self, request, context):
         log.info('grpc-request', request=request)
-
-        try:
-            instance_id = self.dispatcher.instance_id_by_logical_device_id(
-                request.id
-            )
-        except KeyError:
-            context.set_details(
-                'Logical device \'{}\' not found'.format(request.id))
-            context.set_code(StatusCode.NOT_FOUND)
-            return Empty()
-
-        return self.dispatcher.dispatch(
-            instance_id,
-            VolthaLocalServiceStub,
+        response = yield self.dispatcher.dispatch(
             'UpdateLogicalDeviceFlowGroupTable',
             request,
-            context)
+            context,
+            id=request.id)
+        log.info('grpc-response', response=response)
+        if isinstance(response, DispatchError):
+            log.info('grpc-error-response', error=response.error_code)
+            context.set_details(
+                'Logical device \'{}\' error'.format(request.id))
+            context.set_code(response.error_code)
+            returnValue(Empty())
+        else:
+            log.info('grpc-success-response', response=response)
+            returnValue(response)
 
     @twisted_async
+    @inlineCallbacks
     def ListDevices(self, request, context):
-        log.warning('temp-limited-implementation')
-        # TODO dispatching to local instead of collecting all
-        return self.dispatcher.dispatch(
-            self.instance_id,
-            VolthaLocalServiceStub,
-            'ListDevices',
-            request,
-            context)
+        log.info('grpc-request', request=request)
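+        # broadcast=True is expected to fan the request out to every Voltha
+        # instance and aggregate the per-instance replies into a single
+        # Devices response inside the dispatcher.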
+        response = yield self.dispatcher.dispatch('ListDevices',
+                                                  Empty(),
+                                                  context,
+                                                  broadcast=True)
+        log.info('grpc-response', response=response)
+        returnValue(response)
 
     @twisted_async
+    @inlineCallbacks
     def GetDevice(self, request, context):
         log.info('grpc-request', request=request)
-
-        try:
-            instance_id = self.dispatcher.instance_id_by_device_id(
-                request.id
-            )
-        except KeyError:
-            context.set_details(
-                'Device \'{}\' not found'.format(request.id))
-            context.set_code(StatusCode.NOT_FOUND)
-            return Device()
-
-        return self.dispatcher.dispatch(
-            instance_id,
-            VolthaLocalServiceStub,
-            'GetDevice',
-            request,
-            context)
+        response = yield self.dispatcher.dispatch('GetDevice',
+                                                  request,
+                                                  context,
+                                                  id=request.id)
+        log.info('grpc-response', response=response)
+        if isinstance(response, DispatchError):
+            log.info('grpc-error-response', error=response.error_code)
+            context.set_details('Device \'{}\' error'.format(request.id))
+            context.set_code(response.error_code)
+            returnValue(Device())
+        else:
+            log.info('grpc-success-response', response=response)
+            returnValue(response)
 
     @twisted_async
+    @inlineCallbacks
     def CreateDevice(self, request, context):
         log.info('grpc-request', request=request)
-        # TODO dispatching to local instead of passing it to leader
-        return self.dispatcher.dispatch(
-            self.instance_id,
-            VolthaLocalServiceStub,
-            'CreateDevice',
-            request,
-            context)
+        response = yield self.dispatcher.dispatch('CreateDevice',
+                                                  request,
+                                                  context)
+        log.info('grpc-response', response=response)
+        if isinstance(response, DispatchError):
+            log.info('grpc-error-response', error=response.error_code)
+            context.set_details('Create device error')
+            context.set_code(response.error_code)
+            returnValue(Device())
+        else:
+            log.info('grpc-success-response', response=response)
+            returnValue(response)
 
     @twisted_async
+    @inlineCallbacks
     def EnableDevice(self, request, context):
         log.info('grpc-request', request=request)
-
-        try:
-            instance_id = self.dispatcher.instance_id_by_device_id(
-                request.id
-            )
-        except KeyError:
-            context.set_details(
-                'Device \'{}\' not found'.format(request.id))
-            context.set_code(StatusCode.NOT_FOUND)
-            return Device()
-
-        return self.dispatcher.dispatch(
-            instance_id,
-            VolthaLocalServiceStub,
-            'EnableDevice',
-            request,
-            context)
-
+        response = yield self.dispatcher.dispatch('EnableDevice',
+                                                  request,
+                                                  context,
+                                                  id=request.id)
+        log.info('grpc-response', response=response)
+        if isinstance(response, DispatchError):
+            log.info('grpc-error-response', error=response.error_code)
+            context.set_details('Device \'{}\' error'.format(request.id))
+            context.set_code(response.error_code)
+            returnValue(Device())
+        else:
+            log.info('grpc-success-response', response=response)
+            returnValue(response)
 
     @twisted_async
+    @inlineCallbacks
     def DisableDevice(self, request, context):
         log.info('grpc-request', request=request)
-
-        try:
-            instance_id = self.dispatcher.instance_id_by_device_id(
-                request.id
-            )
-        except KeyError:
-            context.set_details(
-                'Device \'{}\' not found'.format(request.id))
-            context.set_code(StatusCode.NOT_FOUND)
-            return Device()
-
-        return self.dispatcher.dispatch(
-            instance_id,
-            VolthaLocalServiceStub,
-            'DisableDevice',
-            request,
-            context)
+        response = yield self.dispatcher.dispatch('DisableDevice',
+                                                  request,
+                                                  context,
+                                                  id=request.id)
+        log.info('grpc-response', response=response)
+        if isinstance(response, DispatchError):
+            log.info('grpc-error-response', error=response.error_code)
+            context.set_details('Device \'{}\' error'.format(request.id))
+            context.set_code(response.error_code)
+            returnValue(Device())
+        else:
+            log.info('grpc-success-response', response=response)
+            returnValue(response)
 
     @twisted_async
+    @inlineCallbacks
     def RebootDevice(self, request, context):
         log.info('grpc-request', request=request)
-
-        try:
-            instance_id = self.dispatcher.instance_id_by_device_id(
-                request.id
-            )
-        except KeyError:
-            context.set_details(
-                'Device \'{}\' not found'.format(request.id))
-            context.set_code(StatusCode.NOT_FOUND)
-            return Device()
-
-        return self.dispatcher.dispatch(
-            instance_id,
-            VolthaLocalServiceStub,
-            'RebootDevice',
-            request,
-            context)
+        response = yield self.dispatcher.dispatch('RebootDevice',
+                                                  request,
+                                                  context,
+                                                  id=request.id)
+        log.info('grpc-response', response=response)
+        if isinstance(response, DispatchError):
+            log.info('grpc-error-response', error=response.error_code)
+            context.set_details('Device \'{}\' error'.format(request.id))
+            context.set_code(response.error_code)
+            returnValue(Device())
+        else:
+            log.info('grpc-success-response', response=response)
+            returnValue(response)
 
     @twisted_async
+    @inlineCallbacks
     def DeleteDevice(self, request, context):
         log.info('grpc-request', request=request)
-
-        try:
-            instance_id = self.dispatcher.instance_id_by_device_id(
-                request.id
-            )
-        except KeyError:
-            context.set_details(
-                'Device \'{}\' not found'.format(request.id))
-            context.set_code(StatusCode.NOT_FOUND)
-            return Device()
-
-        return self.dispatcher.dispatch(
-            instance_id,
-            VolthaLocalServiceStub,
-            'DeleteDevice',
-            request,
-            context)
+        response = yield self.dispatcher.dispatch('DeleteDevice',
+                                                  request,
+                                                  context,
+                                                  id=request.id)
+        log.info('grpc-response', response=response)
+        if isinstance(response, DispatchError):
+            log.info('grpc-error-response', error=response.error_code)
+            context.set_details('Device \'{}\' error'.format(request.id))
+            context.set_code(response.error_code)
+            returnValue(Empty())
+        else:
+            log.info('grpc-success-response', response=response)
+            returnValue(Empty())
 
     @twisted_async
+    @inlineCallbacks
     def ListDevicePorts(self, request, context):
         log.info('grpc-request', request=request)
-
-        try:
-            instance_id = self.dispatcher.instance_id_by_device_id(
-                request.id
-            )
-        except KeyError:
-            context.set_details(
-                'Device \'{}\' not found'.format(request.id))
-            context.set_code(StatusCode.NOT_FOUND)
-            return Ports()
-
-        return self.dispatcher.dispatch(
-            instance_id,
-            VolthaLocalServiceStub,
-            'ListDevicePorts',
-            request,
-            context)
+        response = yield self.dispatcher.dispatch('ListDevicePorts',
+                                                  request,
+                                                  context,
+                                                  id=request.id)
+        log.info('grpc-response', response=response)
+        if isinstance(response, DispatchError):
+            log.info('grpc-error-response', error=response.error_code)
+            context.set_details('Device \'{}\' error'.format(request.id))
+            context.set_code(response.error_code)
+            returnValue(Ports())
+        else:
+            log.info('grpc-success-response', response=response)
+            returnValue(response)
 
     @twisted_async
+    @inlineCallbacks
     def ListDevicePmConfigs(self, request, context):
         log.info('grpc-request', request=request)
-
-        try:
-            instance_id = self.dispatcher.instance_id_by_device_id(
-                request.id
-            )
-        except KeyError:
-            context.set_details(
-                'Device \'{}\' not found'.format(request.id))
-            context.set_code(StatusCode.NOT_FOUND)
-            return PmConfigs()
-
-        return self.dispatcher.dispatch(
-            instance_id,
-            VolthaLocalServiceStub,
-            'ListDevicePmConfigs',
-            request,
-            context)
+        response = yield self.dispatcher.dispatch('ListDevicePmConfigs',
+                                                  request,
+                                                  context,
+                                                  id=request.id)
+        log.info('grpc-response', response=response)
+        if isinstance(response, DispatchError):
+            log.info('grpc-error-response', error=response.error_code)
+            context.set_details('Device \'{}\' error'.format(request.id))
+            context.set_code(response.error_code)
+            returnValue(PmConfigs())
+        else:
+            log.info('grpc-success-response', response=response)
+            returnValue(response)
 
     @twisted_async
+    @inlineCallbacks
     def UpdateDevicePmConfigs(self, request, context):
         log.info('grpc-request', request=request)
-
-        try:
-            instance_id = self.dispatcher.instance_id_by_device_id(
-                request.id
-            )
-        except KeyError:
-            context.set_details(
-                'Device \'{}\' not found'.format(request.id))
-            context.set_code(StatusCode.NOT_FOUND)
-            return Empty()
-
-        return self.dispatcher.dispatch(
-            instance_id,
-            VolthaLocalServiceStub,
-            'UpdateDevicePmConfigs',
-            request,
-            context)
+        response = yield self.dispatcher.dispatch('UpdateDevicePmConfigs',
+                                                  request,
+                                                  context,
+                                                  id=request.id)
+        log.info('grpc-response', response=response)
+        if isinstance(response, DispatchError):
+            log.info('grpc-error-response', error=response.error_code)
+            context.set_details('Device \'{}\' error'.format(request.id))
+            context.set_code(response.error_code)
+            returnValue(Empty())
+        else:
+            log.info('grpc-success-response', response=response)
+            returnValue(response)
 
     @twisted_async
+    @inlineCallbacks
     def ListDeviceFlows(self, request, context):
         log.info('grpc-request', request=request)
-
-        try:
-            instance_id = self.dispatcher.instance_id_by_device_id(
-                request.id
-            )
-        except KeyError:
-            context.set_details(
-                'Device \'{}\' not found'.format(request.id))
-            context.set_code(StatusCode.NOT_FOUND)
-            return Flows()
-
-        return self.dispatcher.dispatch(
-            instance_id,
-            VolthaLocalServiceStub,
-            'ListDeviceFlows',
-            request,
-            context)
+        response = yield self.dispatcher.dispatch('ListDeviceFlows',
+                                                  request,
+                                                  context,
+                                                  id=request.id)
+        log.info('grpc-response', response=response)
+        if isinstance(response, DispatchError):
+            log.info('grpc-error-response', error=response.error_code)
+            context.set_details('Device \'{}\' error'.format(request.id))
+            context.set_code(response.error_code)
+            returnValue(Flows())
+        else:
+            log.info('grpc-success-response', response=response)
+            returnValue(response)
 
     @twisted_async
+    @inlineCallbacks
     def ListDeviceFlowGroups(self, request, context):
         log.info('grpc-request', request=request)
-
-        try:
-            instance_id = self.dispatcher.instance_id_by_device_id(
-                request.id
-            )
-        except KeyError:
-            context.set_details(
-                'Device \'{}\' not found'.format(request.id))
-            context.set_code(StatusCode.NOT_FOUND)
-            return FlowGroups()
-
-        return self.dispatcher.dispatch(
-            instance_id,
-            VolthaLocalServiceStub,
-            'ListDeviceFlowGroups',
-            request,
-            context)
+        response = yield self.dispatcher.dispatch('ListDeviceFlowGroups',
+                                                  request,
+                                                  context,
+                                                  id=request.id)
+        log.info('grpc-response', response=response)
+        if isinstance(response, DispatchError):
+            log.info('grpc-error-response', error=response.error_code)
+            context.set_details('Device \'{}\' error'.format(request.id))
+            context.set_code(response.error_code)
+            returnValue(FlowGroups())
+        else:
+            log.info('grpc-success-response', response=response)
+            returnValue(response)
 
     @twisted_async
+    @inlineCallbacks
     def ListDeviceTypes(self, request, context):
         log.info('grpc-request', request=request)
         # we always deflect this to the local instance, as we assume
         # they all loaded the same adapters, supporting the same device
         # types
-        return self.dispatcher.dispatch(
-            self.instance_id,
-            VolthaLocalServiceStub,
-            'ListDeviceTypes',
-            request,
-            context)
+        response = yield self.dispatcher.dispatch('ListDeviceTypes',
+                                                  request,
+                                                  context)
+        log.info('grpc-response', response=response)
+        if isinstance(response, DispatchError):
+            log.info('grpc-error-response', error=response.error_code)
+            context.set_details('Device types error')
+            context.set_code(response.error_code)
+            returnValue(DeviceTypes())
+        else:
+            log.info('grpc-success-response', response=response)
+            returnValue(response)
 
     @twisted_async
+    @inlineCallbacks
     def GetDeviceType(self, request, context):
         log.info('grpc-request', request=request)
         # we always deflect this to the local instance, as we assume
         # they all loaded the same adapters, supporting the same device
         # types
-        return self.dispatcher.dispatch(
-            self.instance_id,
-            VolthaLocalServiceStub,
-            'GetDeviceType',
-            request,
-            context)
+        response = yield self.dispatcher.dispatch('GetDeviceType',
+                                                  request,
+                                                  context)
+        log.info('grpc-response', response=response)
+        if isinstance(response, DispatchError):
+            log.info('grpc-error-response', error=response.error_code)
+            context.set_details('Device type \'{}\' error'.format(
+                request.id))
+            context.set_code(response.error_code)
+            returnValue(DeviceType())
+        else:
+            log.info('grpc-success-response', response=response)
+            returnValue(response)
 
     @twisted_async
+    @inlineCallbacks
     def ListDeviceGroups(self, request, context):
-        log.warning('temp-limited-implementation')
-        # TODO dispatching to local instead of collecting all
-        return self.dispatcher.dispatch(
-            self.instance_id,
-            VolthaLocalServiceStub,
-            'ListDeviceGroups',
-            Empty(),
-            context)
+        log.info('grpc-request', request=request)
+        response = yield self.dispatcher.dispatch('ListDeviceGroups',
+                                                  Empty(),
+                                                  context,
+                                                  broadcast=True)
+        log.info('grpc-response', response=response)
+        returnValue(response)
 
     @twisted_async
+    @inlineCallbacks
     def GetDeviceGroup(self, request, context):
-        log.warning('temp-limited-implementation')
-        # TODO dispatching to local instead of collecting all
-        return self.dispatcher.dispatch(
-            self.instance_id,
-            VolthaLocalServiceStub,
-            'GetDeviceGroup',
-            request,
-            context)
-
+        log.info('grpc-request', request=request)
+        response = yield self.dispatcher.dispatch('GetDeviceGroup',
+                                                  request,
+                                                  context,
+                                                  id=request.id)
+        log.info('grpc-response', response=response)
+        if isinstance(response, DispatchError):
+            log.info('grpc-error-response', error=response.error_code)
+            context.set_details(
+                'Device group \'{}\' error'.format(request.id))
+            context.set_code(response.error_code)
+            returnValue(DeviceGroup())
+        else:
+            log.info('grpc-success-response', response=response)
+            returnValue(response)
 
     @twisted_async
+    @inlineCallbacks
     def CreateAlarmFilter(self, request, context):
         log.info('grpc-request', request=request)
-        # TODO dispatching to local instead of passing it to leader
-        return self.dispatcher.dispatch(
-            self.instance_id,
-            VolthaLocalServiceStub,
-            'CreateAlarmFilter',
-            request,
-            context)
+        # Since an AlarmFilter applies to the entire cluster, it is assigned
+        # a global id (using a global core_id) and every Voltha instance
+        # ends up holding the same data.  Because the voltha instances are
+        # managed by docker swarm mode, an instance that goes down is
+        # restarted right away, which reduces the chance of two instances
+        # holding different data.  In a future phase we should adopt a
+        # distinct persistence model for cluster data as opposed to
+        # instance data.
+        try:
+            assert isinstance(request, AlarmFilter)
+            request.id = create_cluster_id()
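+            # Assigning the id here, with a cluster-wide id, lets every
+            # instance that receives the broadcast below store the filter
+            # under the same id.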
+        except AssertionError, e:
+            context.set_details(e.message)
+            context.set_code(StatusCode.INVALID_ARGUMENT)
+            returnValue(AlarmFilter())
+
+        response = yield self.dispatcher.dispatch('CreateAlarmFilter',
+                                                  request,
+                                                  context,
+                                                  id=request.id,
+                                                  broadcast=True)
+        log.info('grpc-response', response=response)
+        if isinstance(response, DispatchError):
+            log.info('grpc-error-response', error=response.error_code)
+            context.set_details('Create alarm filter error')
+            context.set_code(response.error_code)
+            returnValue(AlarmFilter())
+        else:
+            log.info('grpc-success-response', response=response)
+            returnValue(response)
 
     @twisted_async
+    @inlineCallbacks
     def GetAlarmFilter(self, request, context):
-        log.warning('temp-limited-implementation')
-        # TODO dispatching to local instead of collecting all
-        return self.dispatcher.dispatch(
-            self.instance_id,
-            VolthaLocalServiceStub,
-            'GetAlarmFilter',
-            request,
-            context)
+        log.info('grpc-request', request=request)
+        response = yield self.dispatcher.dispatch('GetAlarmFilter',
+                                                  request,
+                                                  context,
+                                                  id=request.id)
+        log.info('grpc-response', response=response)
+        if isinstance(response, DispatchError):
+            log.info('grpc-error-response', error=response.error_code)
+            context.set_details(
+                'Alarm filter \'{}\' error'.format(request.id))
+            context.set_code(response.error_code)
+            returnValue(AlarmFilter())
+        else:
+            log.info('grpc-success-response', response=response)
+            returnValue(response)
 
     @twisted_async
+    @inlineCallbacks
     def UpdateAlarmFilter(self, request, context):
         log.info('grpc-request', request=request)
-        # TODO dispatching to local instead of passing it to leader
-        return self.dispatcher.dispatch(
-            self.instance_id,
-            VolthaLocalServiceStub,
-            'UpdateAlarmFilter',
-            request,
-            context)
+        response = yield self.dispatcher.dispatch('UpdateAlarmFilter',
+                                                  request,
+                                                  context,
+                                                  id=request.id,
+                                                  broadcast=True)
+        log.info('grpc-response', response=response)
+        if isinstance(response, DispatchError):
+            log.info('grpc-error-response', error=response.error_code)
+            context.set_details(
+                'Alarm filter \'{}\' error'.format(request.id))
+            context.set_code(response.error_code)
+            returnValue(AlarmFilter())
+        else:
+            log.info('grpc-success-response', response=response)
+            returnValue(response)
 
     @twisted_async
+    @inlineCallbacks
     def DeleteAlarmFilter(self, request, context):
         log.info('grpc-request', request=request)
-        # TODO dispatching to local instead of passing it to leader
-        return self.dispatcher.dispatch(
-            self.instance_id,
-            VolthaLocalServiceStub,
-            'DeleteAlarmFilter',
-            request,
-            context)
+        response = yield self.dispatcher.dispatch('DeleteAlarmFilter',
+                                                  request,
+                                                  context,
+                                                  id=request.id,
+                                                  broadcast=True)
+        log.info('grpc-response', response=response)
+        if isinstance(response, DispatchError):
+            log.info('grpc-error-response', error=response.error_code)
+            context.set_details(
+                'Alarm filter \'{}\' error'.format(request.id))
+            context.set_code(response.error_code)
+            returnValue(Empty())
+        else:
+            log.info('grpc-success-response', response=response)
+            returnValue(Empty())
 
     @twisted_async
+    @inlineCallbacks
     def ListAlarmFilters(self, request, context):
-        log.warning('temp-limited-implementation')
-        # TODO dispatching to local instead of collecting all
-        return self.dispatcher.dispatch(
-            self.instance_id,
-            VolthaLocalServiceStub,
-            'ListAlarmFilters',
-            Empty(),
-            context)
+        log.info('grpc-request', request=request)
+        response = yield self.dispatcher.dispatch('ListAlarmFilters',
+                                                  Empty(),
+                                                  context,
+                                                  broadcast=True)
+        log.info('grpc-response', response=response)
+        if isinstance(response, DispatchError):
+            log.info('grpc-error-response', error=response.error_code)
+            context.set_details('Alarm filters error')
+            context.set_code(response.error_code)
+            returnValue(AlarmFilters())
+        else:
+            log.info('grpc-success-response', response=response)
+            returnValue(response)
 
     @twisted_async
+    @inlineCallbacks
     def GetImages(self, request, context):
         log.info('grpc-request', request=request)
-
-        try:
-            instance_id = self.dispatcher.instance_id_by_device_id(request.id)
-        except KeyError:
-            context.set_details(
-                'Device \'{}\' not found'.format(request.id))
-            context.set_code(StatusCode.NOT_FOUND)
-            return Images()
-
-        return self.dispatcher.dispatch(
-            instance_id,
-            VolthaLocalServiceStub,
-            'GetImages',
-            request,
-            context)
+        response = yield self.dispatcher.dispatch('GetImages',
+                                                  request,
+                                                  context,
+                                                  id=request.id)
+        log.info('grpc-response', response=response)
+        if isinstance(response, DispatchError):
+            log.info('grpc-error-response', error=response.error_code)
+            context.set_details('Device \'{}\' error'.format(request.id))
+            context.set_code(response.error_code)
+            returnValue(Images())
+        else:
+            log.info('grpc-success-response', response=response)
+            returnValue(response)
 
     @twisted_async
+    @inlineCallbacks
     def SelfTest(self, request, context):
         log.info('grpc-request', request=request)
-
-        try:
-            instance_id = self.dispatcher.instance_id_by_device_id(
-                request.id
-            )
-        except KeyError:
-            context.set_details(
-                'Device \'{}\' not found'.format(request.id))
-            context.set_code(StatusCode.NOT_FOUND)
-            return SelfTestResponse()
-
-        return self.dispatcher.dispatch(
-            instance_id,
-            VolthaLocalServiceStub,
-            'SelfTest',
-            request,
-            context)
+        response = yield self.dispatcher.dispatch('SelfTest',
+                                                  request,
+                                                  context,
+                                                  id=request.id)
+        log.info('grpc-response', response=response)
+        if isinstance(response, DispatchError):
+            log.info('grpc-error-response', error=response.error_code)
+            context.set_details('Device \'{}\' error'.format(request.id))
+            context.set_code(response.error_code)
+            returnValue(SelfTestResponse())
+        else:
+            log.info('grpc-success-response', response=response)
+            returnValue(response)
diff --git a/voltha/core/local_handler.py b/voltha/core/local_handler.py
index 63efcef..bc36a35 100644
--- a/voltha/core/local_handler.py
+++ b/voltha/core/local_handler.py
@@ -625,6 +625,7 @@
 
         try:
             alarm_filter = self.root.get('/alarm_filters/{}'.format(request.id))
+
             return alarm_filter
         except KeyError:
             context.set_details(
@@ -654,16 +655,13 @@
         try:
             assert isinstance(request, AlarmFilter)
             alarm_filter = request
-            assert alarm_filter.id == '', 'Alarm filter to be created cannot have id yet'
-
+            assert alarm_filter.id != '', \
+                'Alarm filter to be created locally must already have an id'
         except AssertionError, e:
             context.set_details(e.message)
             context.set_code(StatusCode.INVALID_ARGUMENT)
             return AlarmFilter()
 
-        # fill additional data
-        alarm_filter.id = uuid4().hex[:12]
-
         # add device to tree
         self.root.add('/alarm_filters', alarm_filter)
 
diff --git a/voltha/leader.py b/voltha/leader.py
index 47ed8bc..ca8d9b4 100644
--- a/voltha/leader.py
+++ b/voltha/leader.py
@@ -24,6 +24,7 @@
 from simplejson import dumps, loads
 
 from common.utils.asleep import asleep
+from common.utils.id_generation import get_next_core_id
 
 log = get_logger()
 
@@ -68,9 +69,6 @@
         self.core_data_id_match = re.compile(
             self.CORE_STORE_KEY_EXTRACTOR % self.coord.core_store_prefix).match
 
-        self.core_store_assignment_key = self.coord.core_store_prefix + \
-                                         '/assignment'
-
         self.assignment_match = re.compile(
             self.ASSIGNMENT_EXTRACTOR % self.coord.assignment_prefix).match
 
@@ -155,21 +153,22 @@
         try:
             # Get the mapping record
             (_, mappings) = yield self.coord.kv_get(
-                self.core_store_assignment_key, recurse=True)
+                self.coord.core_store_assignment_key, recurse=True)
             if mappings:
                 self.core_store_assignment = loads(mappings[0]['Value'])
                 return
             else:  # Key has not been created yet
                 # Create the key with an empty dictionary value
                 value = dict()
-                result = yield self.coord.kv_put(self.core_store_assignment_key,
-                                                 dumps(value))
+                result = yield self.coord.kv_put(
+                    self.coord.core_store_assignment_key,
+                    dumps(value))
                 if not result:
                     raise ConfigMappingException(self.instance_id)
 
                 # Ensure the record was created
                 (_, mappings) = yield self.coord.kv_get(
-                    self.core_store_assignment_key, recurse=True)
+                    self.coord.core_store_assignment_key, recurse=True)
 
                 self.core_store_assignment = loads(mappings[0]['Value'])
 
@@ -205,9 +204,9 @@
                 self.coord.membership_prefix, index=index, recurse=True)
 
             # Only members with valid session are considered active
-            members = [{'id':self.member_id_match(e['Key']).group(2),
+            members = [{'id': self.member_id_match(e['Key']).group(2),
                         'host': loads(e['Value'])['host_address']}
-                            for e in results if 'Session' in e ]
+                       for e in results if 'Session' in e]
 
             log.info('active-members', active_members=members)
 
@@ -257,9 +256,6 @@
     @inlineCallbacks
     def _reassign_core_stores(self):
 
-        def _get_new_str_id(max_val_in_str):
-            return str(int(max_val_in_str) + 1)
-
         def _get_core_data_id_from_instance(instance_name):
             for id, instance in self.core_store_assignment.iteritems():
                 if instance and instance['id'] == instance_name:
@@ -287,7 +283,7 @@
 
             # 2. Update the mapping with the new set
             current_id = max(self.core_store_assignment) \
-                            if self.core_store_assignment else '0'
+                if self.core_store_assignment else '0000'
             for instance in self.members:
                 if instance['id'] not in existing_active_config_members:
                     # Add the member to the config map
@@ -297,21 +293,22 @@
                         updated_mapping[next_id] = instance
                     else:
                         # There are no empty slot, create new ids
-                        current_id = _get_new_str_id(current_id)
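+                        # core ids are 4-character hex strings (hence the
+                        # '0000' default above), so the increment is
+                        # delegated to get_next_core_id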
+                        current_id = get_next_core_id(current_id)
                         updated_mapping[current_id] = instance
 
             self.core_store_assignment = updated_mapping
             log.info('updated-assignment',
-                      core_store_assignment=self.core_store_assignment,
-                      inactive_members=inactive_members)
+                     core_store_assignment=self.core_store_assignment,
+                     inactive_members=inactive_members)
 
             # 3. save the mapping into consul
-            yield self.coord.kv_put(self.core_store_assignment_key,
+            yield self.coord.kv_put(self.coord.core_store_assignment_key,
                                     dumps(self.core_store_assignment))
 
             # 4. Assign the new workload to the newly created members
             curr_members_set = set([m['id'] for m in self.members])
-            new_members = curr_members_set.difference(existing_active_config_members)
+            new_members = curr_members_set.difference(
+                existing_active_config_members)
             for new_member_id in new_members:
                 yield self.coord.kv_put(
                     self.coord.assignment_prefix
@@ -331,88 +328,88 @@
             log.exception('config-reassignment-failure', e=e)
             self._restart_core_store_reassignment_soak_timer()
 
-    # @inlineCallbacks
-    # def _reassign_work(self):
-    #
-    #     log.info('reassign-work')
-    #
-    #     # Plan
-    #     #
-    #     # Step 1: calculate desired assignment from current members and
-    #     #         workload list (e.g., using consistent hashing or any other
-    #     #         algorithm
-    #     # Step 2: collect current assignments from consul
-    #     # Step 3: find the delta between the desired and actual assignments:
-    #     #         these form two lists:
-    #     #         1. new assignments to be made
-    #     #         2. obsolete assignments to be revoked
-    #     #         graceful handling may be desirable when moving existing
-    #     #         assignment from existing member to another member (to make
-    #     #         sure it is abandoned by old member before new takes charge)
-    #     # Step 4: orchestrate the assignment by adding/deleting(/locking)
-    #     #         entries in consul
-    #     #
-    #     # We must make sure while we are working on this, we do not re-enter
-    #     # into same method!
-    #
-    #     try:
-    #
-    #         # Step 1: generate wanted assignment (mapping work to members)
-    #
-    #         ring = HashRing(self.members)
-    #         wanted_assignments = dict()  # member_id -> set(work_id)
-    #         _ = [
-    #             wanted_assignments.setdefault(ring.get_node(work), set())
-    #                 .add(work)
-    #             for work in self.workload
-    #         ]
-    #         for (member, work) in sorted(wanted_assignments.iteritems()):
-    #             log.info('assignment',
-    #                      member=member, work_count=len(work))
-    #
-    #         # Step 2: discover current assignment (from consul)
-    #
-    #         (_, results) = yield self.coord.kv_get(
-    #             self.coord.assignment_prefix, recurse=True)
-    #
-    #         matches = [
-    #             (self.assignment_match(e['Key']), e) for e in results or []]
-    #
-    #         current_assignments = dict()  # member_id -> set(work_id)
-    #         _ = [
-    #             current_assignments.setdefault(
-    #                 m.groupdict()['member_id'], set())
-    #                 .add(m.groupdict()['work_id'])
-    #             for m, e in matches if m is not None
-    #         ]
-    #
-    #         # Step 3: handle revoked assignments first on a per member basis
-    #
-    #         for member_id, current_work in current_assignments.iteritems():
-    #             assert isinstance(current_work, set)
-    #             wanted_work = wanted_assignments.get(member_id, set())
-    #             work_to_revoke = current_work.difference(wanted_work)
-    #
-    #             # revoking work by simply deleting the assignment entry
-    #             # TODO if we want some feedback to see that member abandoned
-    #             # work, we could add a consul-based protocol here
-    #             for work_id in work_to_revoke:
-    #                 yield self.coord.kv_delete(
-    #                     self.coord.assignment_prefix
-    #                     + member_id + '/' + work_id)
-    #
-    #         # Step 4: assign new work as needed
-    #
-    #         for member_id, wanted_work in wanted_assignments.iteritems():
-    #             assert isinstance(wanted_work, set)
-    #             current_work = current_assignments.get(member_id, set())
-    #             work_to_assign = wanted_work.difference(current_work)
-    #
-    #             for work_id in work_to_assign:
-    #                 yield self.coord.kv_put(
-    #                     self.coord.assignment_prefix
-    #                     + member_id + '/' + work_id, '')
-    #
-    #     except Exception, e:
-    #         log.exception('failed-reassignment', e=e)
-    #         self._restart_reassignment_soak_timer()  # try again in a while
diff --git a/voltha/main.py b/voltha/main.py
index ae914d0..787eb36 100755
--- a/voltha/main.py
+++ b/voltha/main.py
@@ -317,6 +317,7 @@
                 VolthaCore(
                     instance_id=self.instance_id,
                     core_store_id = self.core_store_id,
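+                    # the instance's gRPC port is handed to the core,
+                    # presumably so it can be advertised to peer instances
+                    # for forwarding requests between cores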
+                    grpc_port=self.args.grpc_port,
                     version=VERSION,
                     log_level=LogLevel.INFO
                 )
diff --git a/voltha/worker.py b/voltha/worker.py
index aff83b1..bb8932a 100644
--- a/voltha/worker.py
+++ b/voltha/worker.py
@@ -19,6 +19,7 @@
 from twisted.internet import reactor
 from twisted.internet.base import DelayedCall
 from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
+from simplejson import dumps, loads
 
 from common.utils.asleep import asleep
 
@@ -46,6 +47,7 @@
         self.my_workload = set()  # list of work_id's assigned to me
 
         self.assignment_soak_timer = None
+        self.assignment_core_store_soak_timer = None
         self.my_candidate_workload = set()  # we stash here during soaking
 
         self.assignment_match = re.compile(
@@ -55,10 +57,13 @@
 
         self.wait_for_core_store_assignment = Deferred()
 
+        self.peers_map = None
+
     @inlineCallbacks
     def start(self):
         log.debug('starting')
         yield self._start_tracking_my_assignments()
+        yield self._start_tracking_my_peers()
         log.info('started')
         returnValue(self)
 
@@ -79,15 +84,15 @@
             returnValue(val)
 
     # Private methods:
-
     def _start_tracking_my_assignments(self):
         reactor.callLater(0, self._track_my_assignments, 0)
 
+    def _start_tracking_my_peers(self):
+        reactor.callLater(0, self._track_my_peers, 0)
+
     @inlineCallbacks
     def _track_my_assignments(self, index):
-
         try:
-
             # if there is no leader yet, wait for a stable leader
             d = self.coord.wait_for_a_leader()
             if not d.called:
@@ -105,13 +110,13 @@
             if results and not self.mycore_store_id:
                 # We have no store id set yet
                 core_stores = [c['Value'] for c in results if
-                           c['Key'] == self.coord.assignment_prefix +
-                           self.instance_id + '/' +
-                           self.coord.core_storage_suffix]
+                               c['Key'] == self.coord.assignment_prefix +
+                               self.instance_id + '/' +
+                               self.coord.core_storage_suffix]
                 if core_stores:
                     self.mycore_store_id = core_stores[0]
                     log.debug('store-assigned',
-                             mycore_store_id=self.mycore_store_id)
+                              mycore_store_id=self.mycore_store_id)
                     self._stash_and_restart_core_store_soak_timer()
 
             # 2.  Check whether we have been assigned a work item
@@ -130,9 +135,40 @@
             # to prevent flood
 
         finally:
-            if not self.halted:
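+            # once a core store id has been assigned, this loop stops
+            # re-scheduling itself; _track_my_peers below keeps watching
+            # the shared assignment map instead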
+            if not self.halted and not self.mycore_store_id:
                 reactor.callLater(0, self._track_my_assignments, index)
 
+    @inlineCallbacks
+    def _track_my_peers(self, index):
+        try:
+            if self.mycore_store_id:
+                # Wait for updates to the store assignment key
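+                # passing the index to kv_get is meant to make this a
+                # blocking (long-poll) read, so the loop only wakes up when
+                # the assignment key actually changes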
+                (index, mappings) = yield self.coord.kv_get(
+                    self.coord.core_store_assignment_key,
+                    index=index,
+                    recurse=True)
+                if mappings:
+                    new_map = loads(mappings[0]['Value'])
+                    # Remove my id from my peers list
+                    new_map.pop(self.mycore_store_id)
+                    if self.peers_map is None or self.peers_map != new_map:
+                        self.coord.publish_peers_map_change(new_map)
+                        self.peers_map = new_map
+                        log.debug('peer-mapping-changed', mapping=new_map)
+
+        except Exception, e:
+            log.exception('peer-track-error', e=e)
+            yield asleep(
+                self.coord.worker_config.get(
+                    'assignments_track_error_to_avoid_flood', 1))
+            # to prevent flood
+        finally:
+            if not self.halted:
+                # Wait longer if we have not received a core id yet
+                reactor.callLater(0 if self.mycore_store_id else 5,
+                                  self._track_my_peers, index)
+
     def _stash_and_restart_soak_timer(self, candidate_workload):
 
         log.debug('re-start-assignment-soaking')
@@ -151,9 +187,9 @@
         :return: None
         """
         log.debug('my-assignments-changed',
-                 old_count=len(self.my_workload),
-                 new_count=len(self.my_candidate_workload),
-                 workload=self.my_workload)
+                  old_count=len(self.my_workload),
+                  new_count=len(self.my_candidate_workload),
+                  workload=self.my_workload)
         self.my_workload, self.my_candidate_workload = \
             self.my_candidate_workload, None
 
@@ -161,14 +197,14 @@
 
         log.debug('re-start-assignment-config-soaking')
 
-        if self.assignment_soak_timer is not None:
-            if not self.assignment_soak_timer.called:
-                self.assignment_soak_timer.cancel()
+        if self.assignment_core_store_soak_timer is not None:
+            if not self.assignment_core_store_soak_timer.called:
+                self.assignment_core_store_soak_timer.cancel()
 
-        self.assignment_soak_timer = reactor.callLater(
+        self.assignment_core_store_soak_timer = reactor.callLater(
             self.soak_time, self._process_config_assignment)
 
     def _process_config_assignment(self):
         log.debug('process-config-assignment',
-                 mycore_store_id=self.mycore_store_id)
-        self.wait_for_core_store_assignment.callback(self.mycore_store_id)
\ No newline at end of file
+                  mycore_store_id=self.mycore_store_id)
+        self.wait_for_core_store_assignment.callback(self.mycore_store_id)