[VOL-1036] Initial implementation of device lifecycle management

Change-Id: I5aa58fdcbcd852f6f5eef35d48f25f76e20c0418
diff --git a/adapters/iadapter.py b/adapters/iadapter.py
index c6ea3ca..3388d5a 100644
--- a/adapters/iadapter.py
+++ b/adapters/iadapter.py
@@ -83,12 +83,16 @@
         raise NotImplementedError()
 
     def get_ofp_device_info(self, device):
-        log.debug('get_ofp_device_info', device_id=device.id)
-        return self.devices_handlers[device.id].get_ofp_device_info(device)
+        log.debug('get_ofp_device_info_start', device_id=device.id)
+        ofp_device_info = self.devices_handlers[device.id].get_ofp_device_info(device)
+        log.debug('get_ofp_device_info_ends', device_id=device.id)
+        return ofp_device_info
 
     def get_ofp_port_info(self, device, port_no):
-        log.debug('get_ofp_port_info', device_id=device.id, port_no=port_no)
-        return self.devices_handlers[device.id].get_ofp_port_info(device, port_no)
+        log.debug('get_ofp_port_info_start', device_id=device.id, port_no=port_no)
+        ofp_port_info = self.devices_handlers[device.id].get_ofp_port_info(device, port_no)
+        log.debug('get_ofp_port_info_ends', device_id=device.id, port_no=port_no)
+        return ofp_port_info
 
     def adopt_device(self, device):
         log.debug('adopt_device', device_id=device.id)
@@ -105,7 +109,8 @@
 
     def disable_device(self, device):
         log.info('disable-device', device_id=device.id)
-        reactor.callLater(0, self.devices_handlers[device.id].disable)
+        reactor.callLater(1, self.devices_handlers[device.id].disable)
+        log.debug('disable_device_done', device_id=device.id)
         return device
 
     def reenable_device(self, device):
diff --git a/adapters/kafka/adapter_request_facade.py b/adapters/kafka/adapter_request_facade.py
index 74ed934..2517a31 100644
--- a/adapters/kafka/adapter_request_facade.py
+++ b/adapters/kafka/adapter_request_facade.py
@@ -82,17 +82,17 @@
         d = Device()
         if device:
             device.Unpack(d)
-            return (True, self.adapter.adopt_device(d))
+            return True, self.adapter.adopt_device(d)
         else:
-            return (False, d)
+            return False, d
 
     def get_ofp_device_info(self, device):
         d = Device()
         if device:
             device.Unpack(d)
-            return (True, self.adapter.get_ofp_device_info(d))
+            return True, self.adapter.get_ofp_device_info(d)
         else:
-            return (False, d)
+            return False, d
 
     def get_ofp_port_info(self, device, port_no):
         d = Device()
@@ -104,7 +104,7 @@
         p = IntType()
         port_no.Unpack(p)
 
-        return (True, self.adapter.get_ofp_port_info(d, p.val))
+        return True, self.adapter.get_ofp_port_info(d, p.val)
 
 
     def reconcile_device(self, device):
@@ -114,10 +114,20 @@
         return self.adapter.abandon_device(device)
 
     def disable_device(self, device):
-        return self.adapter.disable_device(device)
+        d = Device()
+        if device:
+            device.Unpack(d)
+            return True, self.adapter.disable_device(d)
+        else:
+            return False, d
 
     def reenable_device(self, device):
-        return self.adapter.reenable_device(device)
+        d = Device()
+        if device:
+            device.Unpack(d)
+            return True, self.adapter.reenable_device(d)
+        else:
+            return False, d
 
     def reboot_device(self, device):
         d = Device()
diff --git a/adapters/kafka/core_proxy.py b/adapters/kafka/core_proxy.py
index bcc4239..32a4c9d 100644
--- a/adapters/kafka/core_proxy.py
+++ b/adapters/kafka/core_proxy.py
@@ -32,18 +32,18 @@
 from adapters.common.utils.id_generation import create_cluster_logical_device_ids
 from adapters.interface import IAdapterInterface
 from adapters.protos import third_party
-from adapters.protos.device_pb2 import Device, Port, PmConfigs
+from adapters.protos.device_pb2 import Device, Port, Ports, PmConfigs
 from adapters.protos.events_pb2 import AlarmEvent, AlarmEventType, \
     AlarmEventSeverity, AlarmEventState, AlarmEventCategory
 from adapters.protos.events_pb2 import KpiEvent
 from adapters.protos.voltha_pb2 import DeviceGroup, LogicalDevice, \
-    LogicalPort, AdminState, OperStatus, AlarmFilterRuleKey, CoreInstance
+    LogicalPort, AlarmFilterRuleKey, CoreInstance
 from adapters.common.utils.registry import registry, IComponent
 from adapters.common.utils.id_generation import create_cluster_device_id
 import re
 from adapters.interface import ICoreSouthBoundInterface
 from adapters.protos.core_adapter_pb2 import StrType, BoolType, IntType
-from adapters.protos.common_pb2 import ID
+from adapters.protos.common_pb2 import ID, ConnectStatus, OperStatus
 from google.protobuf.message import Message
 from adapters.common.utils.deferred_utils import DeferredWithTimeout, TimeOutError
 
@@ -159,8 +159,17 @@
     # def add_device(self, device):
     #     raise NotImplementedError()
 
+    @wrap_request(Ports)
+    @inlineCallbacks
     def get_ports(self, device_id, port_type):
-        raise NotImplementedError()
+        id = ID()
+        id.id = device_id
+        p_type = IntType()
+        p_type.val = port_type
+        res = yield self.invoke(rpc="GetPorts",
+                                device_id=id,
+                                port_type=p_type)
+        returnValue(res)
 
     def get_child_devices(self, parent_device_id):
         raise NotImplementedError()
@@ -231,20 +240,18 @@
     def device_state_update(self, device_id,
                                    oper_status=None,
                                    connect_status=None):
-
         id = ID()
         id.id = device_id
         o_status = IntType()
-        if oper_status:
+        if oper_status or oper_status==OperStatus.UNKNOWN:
             o_status.val = oper_status
         else:
             o_status.val = -1
         c_status = IntType()
-        if connect_status:
+        if connect_status or connect_status==ConnectStatus.UNKNOWN:
             c_status.val = connect_status
         else:
             c_status.val = -1
-        a_status = IntType()
 
         res = yield self.invoke(rpc="DeviceStateUpdate",
                                 device_id=id,
@@ -254,34 +261,52 @@
 
     @wrap_request(None)
     @inlineCallbacks
+    def port_state_update(self,
+                          device_id,
+                          port_type,
+                          port_no,
+                          oper_status):
+        id = ID()
+        id.id = device_id
+        pt = IntType()
+        pt.val = port_type
+        pNo = IntType()
+        pNo.val = port_no
+        o_status = IntType()
+        o_status.val = oper_status
+
+        res = yield self.invoke(rpc="PortStateUpdate",
+                                device_id=id,
+                                port_type=pt,
+                                port_no=pNo,
+                                oper_status=o_status)
+        returnValue(res)
+
+
+
+    @wrap_request(None)
+    @inlineCallbacks
     def child_devices_state_update(self, parent_device_id,
                                    oper_status=None,
-                                   connect_status=None,
-                                   admin_state=None):
+                                   connect_status=None):
 
         id = ID()
         id.id = parent_device_id
         o_status = IntType()
-        if oper_status:
+        if oper_status or oper_status==OperStatus.UNKNOWN:
             o_status.val = oper_status
         else:
             o_status.val = -1
         c_status = IntType()
-        if connect_status:
+        if connect_status or connect_status==ConnectStatus.UNKNOWN:
             c_status.val = connect_status
         else:
             c_status.val = -1
-        a_status = IntType()
-        if admin_state:
-            a_status.val = admin_state
-        else:
-            a_status.val = -1
 
         res = yield self.invoke(rpc="child_devices_state_update",
                                 parent_device_id=id,
                                 oper_status=o_status,
-                                connect_status=c_status,
-                                admin_state=a_status)
+                                connect_status=c_status)
         returnValue(res)
 
 
diff --git a/adapters/ponsim_olt/ponsim_olt.py b/adapters/ponsim_olt/ponsim_olt.py
index 5e096b4..95bafaa 100644
--- a/adapters/ponsim_olt/ponsim_olt.py
+++ b/adapters/ponsim_olt/ponsim_olt.py
@@ -25,7 +25,7 @@
 import structlog
 from scapy.layers.l2 import Ether, Dot1Q
 from twisted.internet import reactor
-from twisted.internet.defer import inlineCallbacks
+from twisted.internet.defer import inlineCallbacks, returnValue
 from grpc._channel import _Rendezvous
 
 from adapters.common.frameio.frameio import BpfProgramFilter, hexify
@@ -245,11 +245,10 @@
 
             self.log.info('grpc-channel-closed')
 
+    @inlineCallbacks
     def _get_nni_port(self):
-        ports = self.adapter_agent.get_ports(self.device_id, Port.ETHERNET_NNI)
-        if ports:
-            # For now, we use on one NNI port
-            return ports[0]
+        ports = yield self.adapter_agent.get_ports(self.device_id, Port.ETHERNET_NNI)
+        returnValue(ports)
 
     @inlineCallbacks
     def activate(self, device):
@@ -272,7 +271,8 @@
             device.vendor = 'ponsim'
             device.model = 'n/a'
             device.serial_number = device.host_and_port
-            device.connect_status = ConnectStatus.REACHABLE
+            device.mac_address = "AA:BB:CC:DD:EE:FF"
+            # device.connect_status = ConnectStatus.REACHABLE
             yield self.adapter_agent.device_update(device)
 
             # Now set the initial PM configuration for this device
@@ -288,7 +288,7 @@
                 port_no=info.nni_port,
                 label='NNI facing Ethernet port',
                 type=Port.ETHERNET_NNI,
-                admin_state=AdminState.ENABLED,
+                # admin_state=AdminState.ENABLED,
                 oper_status=OperStatus.ACTIVE
             )
             self.nni_port = nni_port
@@ -297,10 +297,11 @@
                 port_no=1,
                 label='PON port',
                 type=Port.PON_OLT,
-                admin_state=AdminState.ENABLED,
+                # admin_state=AdminState.ENABLED,
                 oper_status=OperStatus.ACTIVE
             ))
-            yield self.adapter_agent.device_state_update(device.id, oper_status=OperStatus.ACTIVE)
+
+            yield self.adapter_agent.device_state_update(device.id, connect_status=ConnectStatus.REACHABLE, oper_status=OperStatus.ACTIVE)
 
             # register ONUS
             self.log.info('onu-found', onus=info.onus, len=len(info.onus))
@@ -319,7 +320,7 @@
 
             # TODO
             # Start collecting stats from the device after a brief pause
-            # self.start_kpi_collection(device.id)
+            self.start_kpi_collection(device.id)
         except Exception as e:
             log.exception("Exception-activating", e=e)
 
@@ -577,138 +578,69 @@
         log.info('self-test-device', device=device.id)
         raise NotImplementedError()
 
+
+    @inlineCallbacks
     def disable(self):
         self.log.info('disabling', device_id=self.device_id)
 
         self.stop_kpi_collection()
 
-        # Get the latest device reference
-        device = self.adapter_agent.get_device(self.device_id)
-
-        # Update the operational status to UNKNOWN
-        device.oper_status = OperStatus.UNKNOWN
-        device.connect_status = ConnectStatus.UNREACHABLE
-        self.adapter_agent.device_update(device)
-
-        # Remove the logical device
-        logical_device = self.adapter_agent.get_logical_device(
-            self.logical_device_id)
-        self.adapter_agent.delete_logical_device(logical_device)
-
-        # Disable all child devices first
-        self.adapter_agent.update_child_devices_state(self.device_id,
-                                                      admin_state=AdminState.DISABLED)
-
-        # Remove the peer references from this device
-        self.adapter_agent.delete_all_peer_references(self.device_id)
-
-        # Set all ports to disabled
-        self.adapter_agent.disable_all_ports(self.device_id)
+        # Update the operational status to UNKNOWN and connection status to UNREACHABLE
+        yield self.adapter_agent.device_state_update(self.device_id, oper_status=OperStatus.UNKNOWN, connect_status=ConnectStatus.UNREACHABLE)
 
         self.close_channel()
         self.log.info('disabled-grpc-channel')
 
-        #  Update the logice device mapping
-        if self.logical_device_id in \
-                self.adapter.logical_device_id_to_root_device_id:
-            del self.adapter.logical_device_id_to_root_device_id[
-                self.logical_device_id]
-
         # TODO:
         # 1) Remove all flows from the device
         # 2) Remove the device from ponsim
 
-        self.log.info('disabled', device_id=device.id)
+        self.log.info('disabled', device_id=self.device_id)
 
+    @inlineCallbacks
     def reenable(self):
         self.log.info('re-enabling', device_id=self.device_id)
 
-        # Get the latest device reference
-        device = self.adapter_agent.get_device(self.device_id)
-
         # Set the ofp_port_no and nni_port in case we bypassed the reconcile
         # process if the device was in DISABLED state on voltha restart
         if not self.ofp_port_no and not self.nni_port:
-            stub = ponsim_pb2.PonSimStub(self.get_channel())
+            yield self.get_channel()
+            stub = ponsim_pb2.PonSimStub(self.channel)
             info = stub.GetDeviceInfo(Empty())
             log.info('got-info', info=info)
             self.ofp_port_no = info.nni_port
-            self.nni_port = self._get_nni_port()
+            ports = yield self._get_nni_port()
+            # For ponsim, we are using only 1 NNI port
+            if ports.items:
+                self.nni_port = ports.items[0]
 
-        # Update the connect status to REACHABLE
-        device.connect_status = ConnectStatus.REACHABLE
-        self.adapter_agent.device_update(device)
+        # Update the state of the NNI port
+        yield self.adapter_agent.port_state_update(self.device_id,
+                                                   port_type=Port.ETHERNET_NNI,
+                                                   port_no=self.ofp_port_no,
+                                                   oper_status=OperStatus.ACTIVE)
 
-        # Set all ports to enabled
-        self.adapter_agent.enable_all_ports(self.device_id)
+        # Update the state of the PON port
+        yield self.adapter_agent.port_state_update(self.device_id,
+                                                   port_type=Port.PON_OLT,
+                                                   port_no=1,
+                                                   oper_status=OperStatus.ACTIVE)
 
-        ld = LogicalDevice(
-            # not setting id and datapth_id will let the adapter agent pick id
-            desc=ofp_desc(
-                hw_desc='simulated pon',
-                sw_desc='simulated pon',
-                serial_num=uuid4().hex,
-                dp_desc='n/a'
-            ),
-            switch_features=ofp_switch_features(
-                n_buffers=256,  # TODO fake for now
-                n_tables=2,  # TODO ditto
-                capabilities=(  # TODO and ditto
-                        OFPC_FLOW_STATS
-                        | OFPC_TABLE_STATS
-                        | OFPC_PORT_STATS
-                        | OFPC_GROUP_STATS
-                )
-            ),
-            root_device_id=device.id
-        )
-        mac_address = "AA:BB:CC:DD:EE:FF"
-        ld_initialized = self.adapter_agent.create_logical_device(ld,
-                                                                  dpid=mac_address)
-        cap = OFPPF_1GB_FD | OFPPF_FIBER
-        self.adapter_agent.add_logical_port(ld_initialized.id, LogicalPort(
-            id='nni',
-            ofp_port=ofp_port(
-                port_no=self.ofp_port_no,
-                hw_addr=mac_str_to_tuple(
-                    '00:00:00:00:00:%02x' % self.ofp_port_no),
-                name='nni',
-                config=0,
-                state=OFPPS_LIVE,
-                curr=cap,
-                advertised=cap,
-                peer=cap,
-                curr_speed=OFPPF_1GB_FD,
-                max_speed=OFPPF_1GB_FD
-            ),
-            device_id=device.id,
-            device_port_no=self.nni_port.port_no,
-            root_port=True
-        ))
+        # Set the operational state of the device to ACTIVE and connect status to REACHABLE
+        yield self.adapter_agent.device_state_update(self.device_id,
+                                                     connect_status=ConnectStatus.REACHABLE,
+                                                     oper_status=OperStatus.ACTIVE)
 
-        device = self.adapter_agent.get_device(device.id)
-        device.parent_id = ld_initialized.id
-        device.oper_status = OperStatus.ACTIVE
-        self.adapter_agent.device_update(device)
-        self.logical_device_id = ld_initialized.id
+        # TODO: establish frame grpc-stream
+        # yield reactor.callInThread(self.rcv_grpc)
 
-        # Reenable all child devices
-        self.adapter_agent.update_child_devices_state(device.id,
-                                                      admin_state=AdminState.ENABLED)
+        self.start_kpi_collection(self.device_id)
 
-        # establish frame grpc-stream
-        reactor.callInThread(self.rcv_grpc)
-
-        self.start_kpi_collection(device.id)
-
-        self.log.info('re-enabled', device_id=device.id)
+        self.log.info('re-enabled', device_id=self.device_id)
 
     def delete(self):
         self.log.info('deleting', device_id=self.device_id)
 
-        # Remove all child devices
-        self.adapter_agent.delete_all_child_devices(self.device_id)
-
         self.close_channel()
         self.log.info('disabled-grpc-channel')
 
@@ -725,7 +657,7 @@
             try:
                 # Step 1: gather metrics from device
                 port_metrics = \
-                    self.pm_metrics.collect_port_metrics(self.get_channel())
+                    self.pm_metrics.collect_port_metrics(self.channel)
 
                 # Step 2: prepare the KpiEvent for submission
                 # we can time-stamp them here (or could use time derived from OLT
diff --git a/adapters/ponsim_onu/ponsim_onu.py b/adapters/ponsim_onu/ponsim_onu.py
index 7e35c7f..d1b2e27 100644
--- a/adapters/ponsim_onu/ponsim_onu.py
+++ b/adapters/ponsim_onu/ponsim_onu.py
@@ -20,7 +20,7 @@
 
 import sys
 import structlog
-from twisted.internet.defer import DeferredQueue, inlineCallbacks
+from twisted.internet.defer import DeferredQueue, inlineCallbacks, returnValue
 from adapters.common.utils.asleep import asleep
 
 from adapters.iadapter import OnuAdapter
@@ -92,10 +92,10 @@
         device.root = False
         device.vendor = 'ponsim'
         device.model = 'n/a'
-        device.connect_status = ConnectStatus.REACHABLE
+        # device.connect_status = ConnectStatus.REACHABLE
         yield self.adapter_agent.device_update(device)
 
-        # register physical ports
+    # register physical ports
         self.uni_port = Port(
             port_no=2,
             label='UNI facing Ethernet port',
@@ -119,7 +119,7 @@
         self.adapter_agent.port_created(device.id, self.uni_port)
         self.adapter_agent.port_created(device.id, self.pon_port)
 
-        yield self.adapter_agent.device_state_update(device.id, oper_status=OperStatus.ACTIVE)
+        yield self.adapter_agent.device_state_update(device.id, connect_status=ConnectStatus.REACHABLE, oper_status=OperStatus.ACTIVE)
 
 
     def get_ofp_port_info(self, device, port_no):
@@ -147,17 +147,27 @@
             )
         )
 
-    def _get_uni_port(self):
-        ports = self.adapter_agent.get_ports(self.device_id, Port.ETHERNET_UNI)
-        if ports:
-            # For now, we use on one uni port
-            return ports[0]
+    # def _get_uni_port(self):
+    #     ports = self.adapter_agent.get_ports(self.device_id, Port.ETHERNET_UNI)
+    #     if ports:
+    #         # For now, we use on one uni port
+    #         return ports[0]
 
+    @inlineCallbacks
+    def _get_uni_port(self):
+        ports = yield self.adapter_agent.get_ports(self.device_id, Port.ETHERNET_UNI)
+        returnValue(ports)
+
+    @inlineCallbacks
     def _get_pon_port(self):
-        ports = self.adapter_agent.get_ports(self.device_id, Port.PON_ONU)
-        if ports:
-            # For now, we use on one uni port
-            return ports[0]
+        ports = yield self.adapter_agent.get_ports(self.device_id, Port.PON_ONU)
+        returnValue(ports)
+
+    # def _get_pon_port(self):
+    #     ports = self.adapter_agent.get_ports(self.device_id, Port.PON_ONU)
+    #     if ports:
+    #         # For now, we use on one uni port
+    #         return ports[0]
 
     def reconcile(self, device):
         self.log.info('reconciling-ONU-device-starts')
@@ -250,114 +260,104 @@
         log.info('self-test-device', device=device.id)
         raise NotImplementedError()
 
+
+    @inlineCallbacks
     def disable(self):
         self.log.info('disabling', device_id=self.device_id)
 
-        # Get the latest device reference
-        device = self.adapter_agent.get_device(self.device_id)
-
-        # Disable all ports on that device
-        self.adapter_agent.disable_all_ports(self.device_id)
-
         # Update the device operational status to UNKNOWN
-        device.oper_status = OperStatus.UNKNOWN
-        device.connect_status = ConnectStatus.UNREACHABLE
-        self.adapter_agent.update_device(device)
-
-        # Remove the uni logical port from the OLT, if still present
-        parent_device = self.adapter_agent.get_device(device.parent_id)
-        assert parent_device
-        logical_device_id = parent_device.parent_id
-        assert logical_device_id
-        port_no = device.proxy_address.channel_id
-        port_id = 'uni-{}'.format(port_no)
-        try:
-            port = self.adapter_agent.get_logical_port(logical_device_id,
-                                                       port_id)
-            self.adapter_agent.delete_logical_port(logical_device_id, port)
-        except KeyError:
-            self.log.info('logical-port-not-found', device_id=self.device_id,
-                          portid=port_id)
-
-        # Remove pon port from parent
-        self.pon_port = self._get_pon_port()
-        self.adapter_agent.delete_port_reference_from_parent(self.device_id,
-                                                             self.pon_port)
-
-        # Just updating the port status may be an option as well
-        # port.ofp_port.config = OFPPC_NO_RECV
-        # yield self.adapter_agent.update_logical_port(logical_device_id,
-        #                                             port)
-        # Unregister for proxied message
-        self.adapter_agent.unregister_for_proxied_messages(
-            device.proxy_address)
+        yield self.adapter_agent.device_state_update(self.device_id, oper_status=OperStatus.UNKNOWN, connect_status=ConnectStatus.UNREACHABLE)
 
         # TODO:
         # 1) Remove all flows from the device
         # 2) Remove the device from ponsim
 
-        self.log.info('disabled', device_id=device.id)
+        self.log.info('disabled', device_id=self.device_id)
 
+    @inlineCallbacks
     def reenable(self):
         self.log.info('re-enabling', device_id=self.device_id)
         try:
             # Get the latest device reference
-            device = self.adapter_agent.get_device(self.device_id)
+            # device = self.adapter_agent.get_device(self.device_id)
 
             # First we verify that we got parent reference and proxy info
-            assert device.parent_id
-            assert device.proxy_address.device_id
-            assert device.proxy_address.channel_id
+            # assert device.parent_id
+            # assert device.proxy_address.device_id
+            # assert device.proxy_address.channel_id
 
             # Re-register for proxied messages right away
-            self.proxy_address = device.proxy_address
-            self.adapter_agent.register_for_proxied_messages(
-                device.proxy_address)
+            # self.proxy_address = device.proxy_address
+            # self.adapter_agent.register_for_proxied_messages(
+            #     device.proxy_address)
 
-            # Re-enable the ports on that device
-            self.adapter_agent.enable_all_ports(self.device_id)
+            # Refresh the port reference - we only use one port for now
+            ports = yield self._get_uni_port()
+            self.log.info('re-enabling-uni-ports', ports=ports)
+            if ports.items:
+                self.uni_port = ports.items[0]
 
-            # Refresh the port reference
-            self.uni_port = self._get_uni_port()
-            self.pon_port = self._get_pon_port()
+            ports = yield self._get_pon_port()
+            self.log.info('re-enabling-pon-ports', ports=ports)
+            if ports.items:
+                self.pon_port = ports.items[0]
+
+            # Update the state of the UNI port
+            yield self.adapter_agent.port_state_update(self.device_id,
+                                                   port_type=Port.ETHERNET_UNI,
+                                                   port_no=self.uni_port.port_no,
+                                                   oper_status=OperStatus.ACTIVE)
+
+            # Update the state of the PON port
+            yield self.adapter_agent.port_state_update(self.device_id,
+                                                   port_type=Port.PON_ONU,
+                                                   port_no=self.pon_port.port_no,
+                                                   oper_status=OperStatus.ACTIVE)
+
+
+            # # Re-enable the ports on that device
+            # self.adapter_agent.enable_all_ports(self.device_id)
+
 
             # Add the pon port reference to the parent
-            self.adapter_agent.add_port_reference_to_parent(device.id,
-                                                            self.pon_port)
+            # self.adapter_agent.add_port_reference_to_parent(device.id,
+            #                                                 self.pon_port)
 
             # Update the connect status to REACHABLE
-            device.connect_status = ConnectStatus.REACHABLE
-            self.adapter_agent.update_device(device)
+            # device.connect_status = ConnectStatus.REACHABLE
+            # self.adapter_agent.update_device(device)
 
             # re-add uni port to logical device
-            parent_device = self.adapter_agent.get_device(device.parent_id)
-            logical_device_id = parent_device.parent_id
-            assert logical_device_id
-            port_no = device.proxy_address.channel_id
-            cap = OFPPF_1GB_FD | OFPPF_FIBER
-            self.adapter_agent.add_logical_port(logical_device_id, LogicalPort(
-                id='uni-{}'.format(port_no),
-                ofp_port=ofp_port(
-                    port_no=port_no,
-                    hw_addr=mac_str_to_tuple('00:00:00:00:00:%02x' % port_no),
-                    name='uni-{}'.format(port_no),
-                    config=0,
-                    state=OFPPS_LIVE,
-                    curr=cap,
-                    advertised=cap,
-                    peer=cap,
-                    curr_speed=OFPPF_1GB_FD,
-                    max_speed=OFPPF_1GB_FD
-                ),
-                device_id=device.id,
-                device_port_no=self.uni_port.port_no
-            ))
+            # parent_device = self.adapter_agent.get_device(device.parent_id)
+            # logical_device_id = parent_device.parent_id
+            # assert logical_device_id
+            # port_no = device.proxy_address.channel_id
+            # cap = OFPPF_1GB_FD | OFPPF_FIBER
+            # self.adapter_agent.add_logical_port(logical_device_id, LogicalPort(
+            #     id='uni-{}'.format(port_no),
+            #     ofp_port=ofp_port(
+            #         port_no=port_no,
+            #         hw_addr=mac_str_to_tuple('00:00:00:00:00:%02x' % port_no),
+            #         name='uni-{}'.format(port_no),
+            #         config=0,
+            #         state=OFPPS_LIVE,
+            #         curr=cap,
+            #         advertised=cap,
+            #         peer=cap,
+            #         curr_speed=OFPPF_1GB_FD,
+            #         max_speed=OFPPF_1GB_FD
+            #     ),
+            #     device_id=device.id,
+            #     device_port_no=self.uni_port.port_no
+            # ))
 
-            device = self.adapter_agent.get_device(device.id)
-            device.oper_status = OperStatus.ACTIVE
-            self.adapter_agent.update_device(device)
+            # device = self.adapter_agent.get_device(device.id)
+            # device.oper_status = OperStatus.ACTIVE
+            # self.adapter_agent.update_device(device)
 
-            self.log.info('re-enabled', device_id=device.id)
+            yield self.adapter_agent.device_state_update(self.device_id, oper_status=OperStatus.ACTIVE, connect_status=ConnectStatus.REACHABLE)
+
+            self.log.info('re-enabled', device_id=self.device_id)
         except Exception, e:
             self.log.exception('error-reenabling', e=e)
 
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index 619bd17..78a8a5a 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -49,7 +49,7 @@
 	DefaultReturnErrors      = true
 	DefaultConsumerMaxwait   = 50
 	DefaultMaxProcessingTime = 100
-	DefaultRequestTimeout    = 200 // 200 milliseconds - to handle a wider latency range
+	DefaultRequestTimeout    = 500 // 500 milliseconds - to handle a wider latency range
 )
 
 type consumerChannels struct {
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index a0e25f3..bccb227 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -18,6 +18,7 @@
 import (
 	"context"
 	"github.com/golang/protobuf/ptypes"
+	a "github.com/golang/protobuf/ptypes/any"
 	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/kafka"
 	ca "github.com/opencord/voltha-go/protos/core_adapter"
@@ -37,28 +38,47 @@
 	return &proxy
 }
 
+func unPackResponse(rpc string, deviceId string, success bool, response *a.Any) error {
+	if success {
+		return nil
+	} else {
+		unpackResult := &ca.Error{}
+		var err error
+		if err = ptypes.UnmarshalAny(response, unpackResult); err != nil {
+			log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+		}
+		log.Debugw("response", log.Fields{"rpc": rpc, "deviceId": deviceId, "success": success, "error": err})
+		// TODO:  Need to get the real error code
+		return status.Errorf(codes.Canceled, "%s", unpackResult.Reason)
+	}
+}
+
 func (ap *AdapterProxy) AdoptDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("AdoptDevice", log.Fields{"device": device})
+	rpc := "adopt_device"
 	topic := kafka.Topic{Name: device.Type}
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
 	}
-	success, result := ap.kafkaProxy.InvokeRPC(ctx, "adopt_device", &topic, true, args...)
-	log.Debugw("AdoptDevice-response", log.Fields{"deviceid": device.Id, "success": success, "result": result})
-	if success {
-		return nil
-	} else {
-		unpackResult := &ca.Error{}
-		var err error
-		if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
-			log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
-		}
-		log.Debugw("AdoptDevice-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
-		// TODO:  Need to get the real error code
-		return status.Errorf(codes.Canceled, "%s", unpackResult.Reason)
+	success, result := ap.kafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
+	log.Debugw("AdoptDevice-response", log.Fields{"deviceid": device.Id, "success": success})
+	return unPackResponse(rpc, device.Id, success, result)
+}
+
+func (ap *AdapterProxy) DisableDevice(ctx context.Context, device *voltha.Device) error {
+	log.Debugw("DisableDevice", log.Fields{"deviceId": device.Id})
+	rpc := "disable_device"
+	topic := kafka.Topic{Name: device.Type}
+	args := make([]*kafka.KVArg, 1)
+	args[0] = &kafka.KVArg{
+		Key:   "device",
+		Value: device,
 	}
+	success, result := ap.kafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	log.Debugw("DisableDevice-response", log.Fields{"deviceId": device.Id, "success": success})
+	return unPackResponse(rpc, device.Id, success, result)
 }
 
 func (ap *AdapterProxy) AdapterDescriptor() (*voltha.Adapter, error) {
@@ -76,7 +96,7 @@
 	return nil, nil
 }
 
-func (ap *AdapterProxy) ReconcileDevice(device voltha.Device) error {
+func (ap *AdapterProxy) ReconcileDevice(device *voltha.Device) error {
 	log.Debug("ReconcileDevice")
 	return nil
 }
@@ -86,22 +106,26 @@
 	return nil
 }
 
-func (ap *AdapterProxy) DisableDevice(device voltha.Device) error {
-	log.Debug("DisableDevice")
-	return nil
+func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) error {
+	log.Debugw("ReEnableDevice", log.Fields{"deviceId": device.Id})
+	rpc := "reenable_device"
+	topic := kafka.Topic{Name: device.Type}
+	args := make([]*kafka.KVArg, 1)
+	args[0] = &kafka.KVArg{
+		Key:   "device",
+		Value: device,
+	}
+	success, result := ap.kafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
+	log.Debugw("ReEnableDevice-response", log.Fields{"deviceid": device.Id, "success": success})
+	return unPackResponse(rpc, device.Id, success, result)
 }
 
-func (ap *AdapterProxy) ReEnableDevice(device voltha.Device) error {
-	log.Debug("ReEnableDevice")
-	return nil
-}
-
-func (ap *AdapterProxy) RebootDevice(device voltha.Device) error {
+func (ap *AdapterProxy) RebootDevice(device *voltha.Device) error {
 	log.Debug("RebootDevice")
 	return nil
 }
 
-func (ap *AdapterProxy) DeleteDevice(device voltha.Device) error {
+func (ap *AdapterProxy) DeleteDevice(device *voltha.Device) error {
 	log.Debug("DeleteDevice")
 	return nil
 }
@@ -172,7 +196,7 @@
 }
 
 func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (*ca.SwitchCapability, error) {
-	log.Debugw("GetOfpDeviceInfo", log.Fields{"device": device})
+	log.Debugw("GetOfpDeviceInfo", log.Fields{"deviceId": device.Id})
 	topic := kafka.Topic{Name: device.Type}
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
@@ -180,7 +204,7 @@
 		Value: device,
 	}
 	success, result := ap.kafkaProxy.InvokeRPC(ctx, "get_ofp_device_info", &topic, true, args...)
-	log.Debugw("GetOfpDeviceInfo-response", log.Fields{"device": device, "success": success, "result": result})
+	log.Debugw("GetOfpDeviceInfo-response", log.Fields{"deviceId": device.Id, "success": success, "result": result})
 	if success {
 		unpackResult := &ca.SwitchCapability{}
 		if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
@@ -201,7 +225,7 @@
 }
 
 func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (*ca.PortCapability, error) {
-	log.Debug("GetOfpPortInfo", log.Fields{"device": device})
+	log.Debugw("GetOfpPortInfo", log.Fields{"deviceId": device.Id})
 	topic := kafka.Topic{Name: device.Type}
 	args := make([]*kafka.KVArg, 2)
 	args[0] = &kafka.KVArg{
@@ -215,7 +239,7 @@
 	}
 
 	success, result := ap.kafkaProxy.InvokeRPC(ctx, "get_ofp_port_info", &topic, true, args...)
-	log.Debugw("GetOfpPortInfo-response", log.Fields{"deviceid": device.Id, "device": device, "success": success, "result": result})
+	log.Debugw("GetOfpPortInfo-response", log.Fields{"deviceid": device.Id, "success": success})
 	if success {
 		unpackResult := &ca.PortCapability{}
 		if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index bfc4ee4..7ae9f1a 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -25,6 +25,7 @@
 	"github.com/opencord/voltha-go/protos/voltha"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
+	"reflect"
 )
 
 type AdapterRequestHandlerProxy struct {
@@ -89,6 +90,24 @@
 	}
 }
 
+// updatePartialDeviceData updates a subset of a device that an Adapter can update.
+// TODO:  May need a specific proto to handle only a subset of a device that can be changed by an adapter
+func (rhp *AdapterRequestHandlerProxy) mergeDeviceInfoFromAdapter(device *voltha.Device) (*voltha.Device, error) {
+	//		First retrieve the most up to date device info
+	var currentDevice *voltha.Device
+	var err error
+	if currentDevice, err = rhp.deviceMgr.getDevice(device.Id); err != nil {
+		return nil, err
+	}
+	cloned := reflect.ValueOf(currentDevice).Elem().Interface().(voltha.Device)
+	cloned.Root = device.Root
+	cloned.Vendor = device.Vendor
+	cloned.Model = device.Model
+	cloned.SerialNumber = device.SerialNumber
+	cloned.MacAddress = device.MacAddress
+	return &cloned, nil
+}
+
 func (rhp *AdapterRequestHandlerProxy) DeviceUpdate(args []*ca.Argument) (*empty.Empty, error) {
 	if len(args) != 1 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
@@ -100,15 +119,21 @@
 		log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
 		return nil, err
 	}
-	log.Debugw("DeviceUpdate", log.Fields{"device": device})
+	log.Debugw("DeviceUpdate", log.Fields{"deviceId": device.Id})
 
 	if rhp.TestMode { // Execute only for test cases
 		return new(empty.Empty), nil
 	}
-	if err := rhp.deviceMgr.updateDevice(device); err != nil {
-		log.Debugw("DeviceUpdate-error", log.Fields{"device": device, "error": err})
+
+	//Merge the adapter device info (only the ones an adapter can change) with the latest device data
+	if updatedDevice, err := rhp.mergeDeviceInfoFromAdapter(device); err != nil {
 		return nil, status.Errorf(codes.Internal, "%s", err.Error())
+	} else {
+		// An adapter request needs an Ack without having to wait for the update to be
+		// completed.  We therefore run the update in its own routine.
+		go rhp.deviceMgr.updateDevice(updatedDevice)
 	}
+
 	return new(empty.Empty), nil
 }
 
@@ -137,28 +162,30 @@
 		err := errors.New("invalid-number-of-args")
 		return nil, err
 	}
-	pID := &voltha.ID{}
-	if err := ptypes.UnmarshalAny(args[0].Value, pID); err != nil {
-		log.Warnw("cannot-unmarshal-ID", log.Fields{"error": err})
-		return nil, err
-	}
-	// Porttype is an enum sent as an integer proto
+	deviceId := &voltha.ID{}
 	pt := &ca.IntType{}
-	if err := ptypes.UnmarshalAny(args[1].Value, pt); err != nil {
-		log.Warnw("cannot-unmarshal-porttype", log.Fields{"error": err})
-		return nil, err
+	for _, arg := range args {
+		switch arg.Key {
+		case "device_id":
+			if err := ptypes.UnmarshalAny(arg.Value, deviceId); err != nil {
+				log.Warnw("cannot-unmarshal-device-id", log.Fields{"error": err})
+				return nil, err
+			}
+		case "port_type":
+			if err := ptypes.UnmarshalAny(arg.Value, pt); err != nil {
+				log.Warnw("cannot-unmarshal-porttype", log.Fields{"error": err})
+				return nil, err
+			}
+		}
 	}
-
-	log.Debugw("GetPorts", log.Fields{"deviceID": pID.Id, "portype": pt.Val})
-
+	log.Debugw("GetPorts", log.Fields{"deviceID": deviceId.Id, "portype": pt.Val})
 	if rhp.TestMode { // Execute only for test cases
 		aPort := &voltha.Port{Label: "test_port"}
 		allPorts := &voltha.Ports{}
 		allPorts.Items = append(allPorts.Items, aPort)
 		return allPorts, nil
 	}
-	return nil, nil
-
+	return rhp.deviceMgr.getPorts(nil, deviceId.Id, voltha.Port_PortType(pt.Val))
 }
 
 func (rhp *AdapterRequestHandlerProxy) GetChildDevices(args []*ca.Argument) (*voltha.Device, error) {
@@ -254,29 +281,69 @@
 				log.Warnw("cannot-unmarshal-operStatus", log.Fields{"error": err})
 				return nil, err
 			}
-			if operStatus.Val == -1 {
-				operStatus = nil
-			}
+			//if operStatus.Val == -1 {
+			//	operStatus = nil
+			//}
 		case "connect_status":
 			if err := ptypes.UnmarshalAny(arg.Value, connStatus); err != nil {
 				log.Warnw("cannot-unmarshal-connStatus", log.Fields{"error": err})
 				return nil, err
 			}
-			if connStatus.Val == -1 {
-				connStatus = nil
-			}
+			//if connStatus.Val == -1 {
+			//	connStatus = nil
+			//}
 		}
 	}
-
 	log.Debugw("DeviceStateUpdate", log.Fields{"deviceId": deviceId.Id, "oper-status": operStatus, "conn-status": connStatus})
-
 	if rhp.TestMode { // Execute only for test cases
 		return nil, nil
 	}
-	if err := rhp.deviceMgr.updateDeviceState(deviceId.Id, operStatus, connStatus); err != nil {
-		log.Debugw("DeviceUpdate-error", log.Fields{"deviceId": deviceId.Id, "error": err})
-		return nil, status.Errorf(codes.Internal, "%s", err.Error())
+
+	// When the enum is not set (i.e. -1), Go still convert to the Enum type with the value being -1
+	go rhp.deviceMgr.updateDeviceStatus(deviceId.Id, voltha.OperStatus_OperStatus(operStatus.Val), voltha.ConnectStatus_ConnectStatus(connStatus.Val))
+	return new(empty.Empty), nil
+}
+
+func (rhp *AdapterRequestHandlerProxy) PortStateUpdate(args []*ca.Argument) (*empty.Empty, error) {
+	if len(args) < 2 {
+		log.Warn("invalid-number-of-args", log.Fields{"args": args})
+		err := errors.New("invalid-number-of-args")
+		return nil, err
 	}
+	deviceId := &voltha.ID{}
+	portType := &ca.IntType{}
+	portNo := &ca.IntType{}
+	operStatus := &ca.IntType{}
+	for _, arg := range args {
+		switch arg.Key {
+		case "device_id":
+			if err := ptypes.UnmarshalAny(arg.Value, deviceId); err != nil {
+				log.Warnw("cannot-unmarshal-device-id", log.Fields{"error": err})
+				return nil, err
+			}
+		case "oper_status":
+			if err := ptypes.UnmarshalAny(arg.Value, operStatus); err != nil {
+				log.Warnw("cannot-unmarshal-operStatus", log.Fields{"error": err})
+				return nil, err
+			}
+		case "port_type":
+			if err := ptypes.UnmarshalAny(arg.Value, portType); err != nil {
+				log.Warnw("cannot-unmarshal-porttype", log.Fields{"error": err})
+				return nil, err
+			}
+		case "port_no":
+			if err := ptypes.UnmarshalAny(arg.Value, portNo); err != nil {
+				log.Warnw("cannot-unmarshal-portno", log.Fields{"error": err})
+				return nil, err
+			}
+
+		}
+	}
+	log.Debugw("PortStateUpdate", log.Fields{"deviceId": deviceId.Id, "operStatus": operStatus, "portType": portType, "portNo": portNo})
+	if rhp.TestMode { // Execute only for test cases
+		return nil, nil
+	}
+	go rhp.deviceMgr.updatePortState(deviceId.Id, voltha.Port_PortType(portType.Val), uint32(portNo.Val), voltha.OperStatus_OperStatus(operStatus.Val))
 	return new(empty.Empty), nil
 }
 
@@ -309,10 +376,14 @@
 		return nil, nil
 	}
 
-	if err := rhp.deviceMgr.addPort(deviceId.Id, port); err != nil {
-		log.Debugw("addport-error", log.Fields{"deviceId": deviceId.Id, "error": err})
-		return nil, status.Errorf(codes.Internal, "%s", err.Error())
-	}
+	// Run port creation in its own go routine
+	go rhp.deviceMgr.addPort(deviceId.Id, port)
+
+	//if err := rhp.deviceMgr.addPort(deviceId.Id, port); err != nil {
+	//	log.Debugw("addport-error", log.Fields{"deviceId": deviceId.Id, "error": err})
+	//	return nil, status.Errorf(codes.Internal, "%s", err.Error())
+	//}
+	// Return an Ack
 	return new(empty.Empty), nil
 }
 
@@ -346,10 +417,14 @@
 		return nil, nil
 	}
 
-	if err := rhp.deviceMgr.updatePmConfigs(pmConfigs.Id, pmConfigs); err != nil {
-		log.Debugw("update-pmconfigs-error", log.Fields{"deviceId": pmConfigs.Id, "error": err})
-		return nil, status.Errorf(codes.Internal, "%s", err.Error())
-	}
+	// Run PM config update in its own go routine
+	go rhp.deviceMgr.updatePmConfigs(pmConfigs.Id, pmConfigs)
+
+	//if err := rhp.deviceMgr.updatePmConfigs(pmConfigs.Id, pmConfigs); err != nil {
+	//	log.Debugw("update-pmconfigs-error", log.Fields{"deviceId": pmConfigs.Id, "error": err})
+	//	return nil, status.Errorf(codes.Internal, "%s", err.Error())
+	//}
+	// Return an Ack
 	return new(empty.Empty), nil
 
 }
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 06f3ca3..480e32f 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -24,7 +24,6 @@
 	"github.com/opencord/voltha-go/protos/voltha"
 	"github.com/opencord/voltha-go/rw_core/config"
 	"google.golang.org/grpc"
-	"reflect"
 )
 
 type Core struct {
@@ -35,8 +34,8 @@
 	grpcNBIAPIHanfler *APIHandler
 	config            *config.RWCoreFlags
 	kmp               *kafka.KafkaMessagingProxy
-	clusterDataRoot   *model.Root
-	localDataRoot     *model.Root
+	clusterDataRoot   model.Root
+	localDataRoot     model.Root
 	clusterDataProxy  *model.Proxy
 	localDataProxy    *model.Proxy
 	exitChannel       chan int
@@ -52,10 +51,10 @@
 	core.exitChannel = make(chan int, 1)
 	core.config = cf
 	// TODO: Setup the KV store
-	core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, nil, reflect.TypeOf(model.NonPersistedRevision{}))
-	core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, nil, reflect.TypeOf(model.NonPersistedRevision{}))
-	core.clusterDataProxy = core.clusterDataRoot.Node.GetProxy("/", false)
-	core.localDataProxy = core.localDataRoot.Node.GetProxy("/", false)
+	core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, nil)
+	core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, nil)
+	core.clusterDataProxy = core.clusterDataRoot.GetProxy("/", false)
+	core.localDataProxy = core.localDataRoot.GetProxy("/", false)
 	return &core
 }
 
@@ -79,7 +78,7 @@
 	log.Info("core-stopped")
 }
 
-//startGRPCService creates the grpc service handler, registers it to the grpc server
+//startGRPCService creates the grpc service handlers, registers it to the grpc server
 // and starts the server
 func (core *Core) startGRPCService(ctx context.Context) {
 	//	create an insecure gserver server
@@ -129,7 +128,7 @@
 	requestProxy := NewAdapterRequestHandlerProxy(dMgr, ldMgr, cdProxy, ldProxy)
 	core.kmp.SubscribeWithTarget(kafka.Topic{Name: core.config.CoreTopic}, requestProxy)
 
-	log.Info("request-handler")
+	log.Info("request-handlers")
 	return nil
 }
 
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 805dd21..d9dacbc 100644
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -17,6 +17,9 @@
 
 import (
 	"context"
+	"reflect"
+	"sync"
+
 	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/db/model"
@@ -24,7 +27,6 @@
 	"github.com/opencord/voltha-go/protos/voltha"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
-	"reflect"
 )
 
 type DeviceAgent struct {
@@ -33,64 +35,160 @@
 	adapterProxy     *AdapterProxy
 	deviceMgr        *DeviceManager
 	clusterDataProxy *model.Proxy
+	deviceProxy      *model.Proxy
 	exitChannel      chan int
+	lockDevice       sync.RWMutex
 }
 
 func newDeviceAgent(ap *AdapterProxy, device *voltha.Device, deviceMgr *DeviceManager, cdProxy *model.Proxy) *DeviceAgent {
 	var agent DeviceAgent
-	device.Id = CreateDeviceId()
-	agent.deviceId = device.Id
 	agent.adapterProxy = ap
-	agent.lastData = device
+	cloned := (proto.Clone(device)).(*voltha.Device)
+	cloned.Id = CreateDeviceId()
+	cloned.AdminState = voltha.AdminState_PREPROVISIONED
+	agent.deviceId = cloned.Id
+	agent.lastData = cloned
 	agent.deviceMgr = deviceMgr
 	agent.exitChannel = make(chan int, 1)
 	agent.clusterDataProxy = cdProxy
+	agent.lockDevice = sync.RWMutex{}
 	return &agent
 }
 
 func (agent *DeviceAgent) start(ctx context.Context) {
+	agent.lockDevice.Lock()
+	defer agent.lockDevice.Unlock()
 	log.Debugw("starting-device-agent", log.Fields{"device": agent.lastData})
 	// Add the initial device to the local model
 	if added := agent.clusterDataProxy.Add("/devices", agent.lastData, ""); added == nil {
 		log.Errorw("failed-to-add-device", log.Fields{"deviceId": agent.deviceId})
 	}
+	agent.deviceProxy = agent.clusterDataProxy.Root.GetProxy("/devices/"+agent.deviceId, false)
+	//agent.deviceProxy = agent.clusterDataProxy.Root.Node.GetProxy("/", false)
+	agent.deviceProxy.RegisterCallback(model.POST_UPDATE, agent.processUpdate, nil)
 	log.Debug("device-agent-started")
 }
 
 func (agent *DeviceAgent) Stop(ctx context.Context) {
+	agent.lockDevice.Lock()
+	defer agent.lockDevice.Unlock()
 	log.Debug("stopping-device-agent")
 	agent.exitChannel <- 1
 	log.Debug("device-agent-stopped")
 }
 
+func (agent *DeviceAgent) getDevice() (*voltha.Device, error) {
+	agent.lockDevice.Lock()
+	defer agent.lockDevice.Unlock()
+	if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
+		if d, ok := device.(*voltha.Device); ok {
+			cloned := proto.Clone(d).(*voltha.Device)
+			return cloned, nil
+		}
+	}
+	return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
+}
+
+//getDeviceWithoutLock is a helper function to be used ONLY by any device agent function AFTER it has acquired the device lock.
+// This function is meant so that we do not have duplicate code all over the device agent functions
+func (agent *DeviceAgent) getDeviceWithoutLock() (*voltha.Device, error) {
+	if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
+		if d, ok := device.(*voltha.Device); ok {
+			cloned := proto.Clone(d).(*voltha.Device)
+			return cloned, nil
+		}
+	}
+	return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
+}
+
 func (agent *DeviceAgent) enableDevice(ctx context.Context) error {
-	log.Debugw("enableDevice", log.Fields{"id": agent.lastData.Id, "device": agent.lastData})
-	// Update the device status
-	if device, err := agent.deviceMgr.getDevice(agent.deviceId); err != nil {
+	agent.lockDevice.Lock()
+	defer agent.lockDevice.Unlock()
+	log.Debugw("enableDevice", log.Fields{"id": agent.deviceId})
+	if device, err := agent.getDeviceWithoutLock(); err != nil {
 		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
 	} else {
-		cloned := reflect.ValueOf(device).Elem().Interface().(voltha.Device)
-		cloned.AdminState = voltha.AdminState_ENABLED
-		cloned.OperStatus = voltha.OperStatus_ACTIVATING
-		if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, &cloned, false, ""); afterUpdate == nil {
-			return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
-		} else {
-			if err := agent.adapterProxy.AdoptDevice(ctx, &cloned); err != nil {
-				log.Debugw("enableDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
+		if device.AdminState == voltha.AdminState_ENABLED {
+			log.Debugw("device-already-enabled", log.Fields{"id": agent.deviceId})
+			//TODO:  Needs customized error message
+			return nil
+		}
+		// Verify whether we need to adopt the device the first time
+		// TODO: A state machine for these state transitions would be better (we just have to handle
+		// a limited set of states now or it may be an overkill)
+		if device.AdminState == voltha.AdminState_PREPROVISIONED {
+			// First send the request to an Adapter and wait for a response
+			if err := agent.adapterProxy.AdoptDevice(ctx, device); err != nil {
+				log.Debugw("adoptDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
 				return err
 			}
-			agent.lastData = &cloned
+		} else {
+			// First send the request to an Adapter and wait for a response
+			if err := agent.adapterProxy.ReEnableDevice(ctx, device); err != nil {
+				log.Debugw("renableDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
+				return err
+			}
+		}
+		// Received an Ack (no error found above).  Now update the device in the model to the expected state
+		cloned := proto.Clone(device).(*voltha.Device)
+		cloned.AdminState = voltha.AdminState_ENABLED
+		cloned.OperStatus = voltha.OperStatus_ACTIVATING
+		if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+			return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
 		}
 	}
 	return nil
 }
 
-func (agent *DeviceAgent) getNNIPorts(ctx context.Context) *voltha.Ports {
-	log.Debugw("getNNIPorts", log.Fields{"id": agent.deviceId})
+func (agent *DeviceAgent) disableDevice(ctx context.Context) error {
+	agent.lockDevice.Lock()
+	//defer agent.lockDevice.Unlock()
+	log.Debugw("disableDevice", log.Fields{"id": agent.deviceId})
+	// Get the most up to date the device info
+	if device, err := agent.getDeviceWithoutLock(); err != nil {
+		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+	} else {
+		if device.AdminState == voltha.AdminState_DISABLED {
+			log.Debugw("device-already-disabled", log.Fields{"id": agent.deviceId})
+			//TODO:  Needs customized error message
+			agent.lockDevice.Unlock()
+			return nil
+		}
+		// First send the request to an Adapter and wait for a response
+		if err := agent.adapterProxy.DisableDevice(ctx, device); err != nil {
+			log.Debugw("disableDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
+			agent.lockDevice.Unlock()
+			return err
+		}
+		// Received an Ack (no error found above).  Now update the device in the model to the expected state
+		cloned := proto.Clone(device).(*voltha.Device)
+		cloned.AdminState = voltha.AdminState_DISABLED
+		// Set the state of all ports on that device to disable
+		for _, port := range cloned.Ports {
+			port.AdminState = voltha.AdminState_DISABLED
+			port.OperStatus = voltha.OperStatus_UNKNOWN
+		}
+		if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+			agent.lockDevice.Unlock()
+			return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
+		}
+		agent.lockDevice.Unlock()
+		//TODO: callback will be invoked to handle this state change
+		//For now force the state transition to happen
+		if err := agent.deviceMgr.processTransition(device, cloned); err != nil {
+			log.Warnw("process-transition-error", log.Fields{"deviceid": device.Id, "error": err})
+			return err
+		}
+	}
+	return nil
+}
+
+func (agent *DeviceAgent) getPorts(ctx context.Context, portType voltha.Port_PortType) *voltha.Ports {
+	log.Debugw("getPorts", log.Fields{"id": agent.deviceId, "portType": portType})
 	ports := &voltha.Ports{}
 	if device, _ := agent.deviceMgr.getDevice(agent.deviceId); device != nil {
 		for _, port := range device.Ports {
-			if port.Type == voltha.Port_ETHERNET_NNI {
+			if port.Type == portType {
 				ports.Items = append(ports.Items, port)
 			}
 		}
@@ -128,15 +226,25 @@
 	}
 }
 
+func (agent *DeviceAgent) processUpdate(args ...interface{}) interface{} {
+	log.Debug("!!!!!!!!!!!!!!!!!!!!!!!!!")
+	log.Debugw("processUpdate", log.Fields{"deviceId": agent.deviceId, "args": args})
+	return nil
+}
+
 func (agent *DeviceAgent) updateDevice(device *voltha.Device) error {
+	agent.lockDevice.Lock()
+	//defer agent.lockDevice.Unlock()
 	log.Debugw("updateDevice", log.Fields{"deviceId": device.Id})
 	// Get the dev info from the model
-	if storedData, err := agent.deviceMgr.getDevice(device.Id); err != nil {
+	if storedData, err := agent.getDeviceWithoutLock(); err != nil {
+		agent.lockDevice.Unlock()
 		return status.Errorf(codes.NotFound, "%s", device.Id)
 	} else {
 		// store the changed data
-		cloned := (proto.Clone(device)).(*voltha.Device)
+		cloned := proto.Clone(device).(*voltha.Device)
 		afterUpdate := agent.clusterDataProxy.Update("/devices/"+device.Id, cloned, false, "")
+		agent.lockDevice.Unlock()
 		if afterUpdate == nil {
 			return status.Errorf(codes.Internal, "%s", device.Id)
 		}
@@ -149,26 +257,77 @@
 	}
 }
 
-func (agent *DeviceAgent) updateDeviceState(operState *core_adapter.IntType, connState *core_adapter.IntType) error {
+func (agent *DeviceAgent) updateDeviceStatus(operStatus voltha.OperStatus_OperStatus, connStatus voltha.ConnectStatus_ConnectStatus) error {
+	agent.lockDevice.Lock()
+	//defer agent.lockDevice.Unlock()
 	// Work only on latest data
-	if storeDevice, err := agent.deviceMgr.getDevice(agent.deviceId); err != nil {
+	if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
+		agent.lockDevice.Unlock()
 		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
 	} else {
 		// clone the device
-		cloned := reflect.ValueOf(storeDevice).Elem().Interface().(voltha.Device)
-		if operState != nil {
-			cloned.OperStatus = voltha.OperStatus_OperStatus(operState.Val)
+		cloned := proto.Clone(storeDevice).(*voltha.Device)
+		// Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
+		if s, ok := voltha.ConnectStatus_ConnectStatus_value[connStatus.String()]; ok {
+			log.Debugw("updateDeviceStatus-conn", log.Fields{"ok": ok, "val": s})
+			cloned.ConnectStatus = connStatus
 		}
-		if connState != nil {
-			cloned.ConnectStatus = voltha.ConnectStatus_ConnectStatus(connState.Val)
+		if s, ok := voltha.OperStatus_OperStatus_value[operStatus.String()]; ok {
+			log.Debugw("updateDeviceStatus-oper", log.Fields{"ok": ok, "val": s})
+			cloned.OperStatus = operStatus
 		}
-		log.Debugw("DeviceStateUpdate-device", log.Fields{"device": cloned})
+		log.Debugw("updateDeviceStatus", log.Fields{"deviceId": cloned.Id, "operStatus": cloned.OperStatus, "connectStatus": cloned.ConnectStatus})
 		// Store the device
-		if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, &cloned, false, ""); afterUpdate == nil {
+		if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+			agent.lockDevice.Unlock()
 			return status.Errorf(codes.Internal, "%s", agent.deviceId)
 		}
+		agent.lockDevice.Unlock()
 		// Perform the state transition
-		if err := agent.deviceMgr.processTransition(storeDevice, &cloned); err != nil {
+		if err := agent.deviceMgr.processTransition(storeDevice, cloned); err != nil {
+			log.Warnw("process-transition-error", log.Fields{"deviceid": agent.deviceId, "error": err})
+			return err
+		}
+		return nil
+	}
+}
+
+func (agent *DeviceAgent) updatePortState(portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_OperStatus) error {
+	agent.lockDevice.Lock()
+	//defer agent.lockDevice.Unlock()
+	// Work only on latest data
+	// TODO: Get list of ports from device directly instead of the entire device
+	if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
+		agent.lockDevice.Unlock()
+		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+	} else {
+		// clone the device
+		cloned := proto.Clone(storeDevice).(*voltha.Device)
+		// Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
+		if _, ok := voltha.Port_PortType_value[portType.String()]; !ok {
+			agent.lockDevice.Unlock()
+			return status.Errorf(codes.InvalidArgument, "%s", portType)
+		}
+		for _, port := range cloned.Ports {
+			if port.Type == portType && port.PortNo == portNo {
+				port.OperStatus = operStatus
+				// Set the admin status to ENABLED if the operational status is ACTIVE
+				// TODO: Set by northbound system?
+				if operStatus == voltha.OperStatus_ACTIVE {
+					port.AdminState = voltha.AdminState_ENABLED
+				}
+				break
+			}
+		}
+		log.Debugw("portStatusUpdate", log.Fields{"deviceId": cloned.Id})
+		// Store the device
+		if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+			agent.lockDevice.Unlock()
+			return status.Errorf(codes.Internal, "%s", agent.deviceId)
+		}
+		agent.lockDevice.Unlock()
+		// Perform the state transition
+		if err := agent.deviceMgr.processTransition(storeDevice, cloned); err != nil {
 			log.Warnw("process-transition-error", log.Fields{"deviceid": agent.deviceId, "error": err})
 			return err
 		}
@@ -177,17 +336,18 @@
 }
 
 func (agent *DeviceAgent) updatePmConfigs(pmConfigs *voltha.PmConfigs) error {
+	agent.lockDevice.Lock()
+	defer agent.lockDevice.Unlock()
 	log.Debug("updatePmConfigs")
 	// Work only on latest data
-	if storeDevice, err := agent.deviceMgr.getDevice(agent.deviceId); err != nil {
+	if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
 		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
 	} else {
 		// clone the device
-		cloned := reflect.ValueOf(storeDevice).Elem().Interface().(voltha.Device)
-		cp := proto.Clone(pmConfigs)
-		cloned.PmConfigs = cp.(*voltha.PmConfigs)
+		cloned := proto.Clone(storeDevice).(*voltha.Device)
+		cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
 		// Store the device
-		afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, &cloned, false, "")
+		afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
 		if afterUpdate == nil {
 			return status.Errorf(codes.Internal, "%s", agent.deviceId)
 		}
@@ -196,21 +356,57 @@
 }
 
 func (agent *DeviceAgent) addPort(port *voltha.Port) error {
-	log.Debug("addPort")
+	agent.lockDevice.Lock()
+	defer agent.lockDevice.Unlock()
+	log.Debugw("addPort", log.Fields{"deviceId": agent.deviceId})
 	// Work only on latest data
-	if storeDevice, err := agent.deviceMgr.getDevice(agent.deviceId); err != nil {
+	if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
 		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
 	} else {
 		// clone the device
-		cloned := reflect.ValueOf(storeDevice).Elem().Interface().(voltha.Device)
+		cloned := proto.Clone(storeDevice).(*voltha.Device)
 		if cloned.Ports == nil {
 			//	First port
+			log.Debugw("addPort-first-port-to-add", log.Fields{"deviceId": agent.deviceId})
 			cloned.Ports = make([]*voltha.Port, 0)
 		}
-		cp := proto.Clone(port)
-		cloned.Ports = append(cloned.Ports, cp.(*voltha.Port))
+		cp := proto.Clone(port).(*voltha.Port)
+		// Set the admin state of the port to ENABLE if the operational state is ACTIVE
+		// TODO: Set by northbound system?
+		if cp.OperStatus == voltha.OperStatus_ACTIVE {
+			cp.AdminState = voltha.AdminState_ENABLED
+		}
+		cloned.Ports = append(cloned.Ports, cp)
 		// Store the device
-		afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, &cloned, false, "")
+		afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
+		if afterUpdate == nil {
+			return status.Errorf(codes.Internal, "%s", agent.deviceId)
+		}
+		return nil
+	}
+}
+
+func (agent *DeviceAgent) addPeerPort(port *voltha.Port_PeerPort) error {
+	agent.lockDevice.Lock()
+	defer agent.lockDevice.Unlock()
+	log.Debug("addPeerPort")
+	// Work only on latest data
+	if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
+		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+	} else {
+		// clone the device
+		cloned := proto.Clone(storeDevice).(*voltha.Device)
+		// Get the peer port on the device based on the port no
+		for _, peerPort := range cloned.Ports {
+			if peerPort.PortNo == port.PortNo { // found port
+				cp := proto.Clone(port).(*voltha.Port_PeerPort)
+				peerPort.Peers = append(peerPort.Peers, cp)
+				log.Debugw("found-peer", log.Fields{"portNo": port.PortNo, "deviceId": agent.deviceId})
+				break
+			}
+		}
+		// Store the device
+		afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
 		if afterUpdate == nil {
 			return status.Errorf(codes.Internal, "%s", agent.deviceId)
 		}
@@ -220,12 +416,14 @@
 
 // TODO: A generic device update by attribute
 func (agent *DeviceAgent) updateDeviceAttribute(name string, value interface{}) {
+	agent.lockDevice.Lock()
+	defer agent.lockDevice.Unlock()
 	if value == nil {
 		return
 	}
 	var storeDevice *voltha.Device
 	var err error
-	if storeDevice, err = agent.deviceMgr.getDevice(agent.deviceId); err != nil {
+	if storeDevice, err = agent.getDeviceWithoutLock(); err != nil {
 		return
 	}
 	updated := false
@@ -247,10 +445,10 @@
 			}
 		}
 	}
-	log.Debugw("update-field-status", log.Fields{"device": storeDevice, "name": name, "updated": updated})
+	log.Debugw("update-field-status", log.Fields{"deviceId": storeDevice.Id, "name": name, "updated": updated})
 	//	Save the data
-	cloned := reflect.ValueOf(storeDevice).Elem().Interface().(voltha.Device)
-	if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, &cloned, false, ""); afterUpdate == nil {
+	cloned := proto.Clone(storeDevice).(*voltha.Device)
+	if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
 		log.Warnw("attribute-update-failed", log.Fields{"attribute": name, "value": value})
 	}
 	return
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index fd18c10..e3dbed2 100644
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -18,6 +18,7 @@
 import (
 	"context"
 	"errors"
+	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/db/model"
 	"github.com/opencord/voltha-go/kafka"
@@ -95,18 +96,18 @@
 }
 
 func (dMgr *DeviceManager) createDevice(ctx context.Context, device *voltha.Device, ch chan interface{}) {
-	log.Debugw("createDevice-start", log.Fields{"device": device, "aproxy": dMgr.adapterProxy})
+	log.Debugw("createDevice", log.Fields{"device": device, "aproxy": dMgr.adapterProxy})
 
 	// Create and start a device agent for that device
 	agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy)
 	dMgr.addDeviceAgentToMap(agent)
 	agent.start(ctx)
 
-	sendResponse(ctx, ch, nil)
+	sendResponse(ctx, ch, agent.lastData)
 }
 
 func (dMgr *DeviceManager) enableDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
-	log.Debugw("enableDevice-start", log.Fields{"deviceid": id})
+	log.Debugw("enableDevice", log.Fields{"deviceid": id})
 
 	var res interface{}
 	if agent := dMgr.getDeviceAgent(id.Id); agent != nil {
@@ -119,33 +120,44 @@
 	sendResponse(ctx, ch, res)
 }
 
-func (dMgr *DeviceManager) getDevice(id string) (*voltha.Device, error) {
-	log.Debugw("getDevice-start", log.Fields{"deviceid": id})
+func (dMgr *DeviceManager) disableDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
+	log.Debugw("disableDevice", log.Fields{"deviceid": id})
 
-	if device := dMgr.clusterDataProxy.Get("/devices/"+id, 1, false, ""); device == nil {
-		return nil, status.Errorf(codes.NotFound, "%s", id)
+	var res interface{}
+	if agent := dMgr.getDeviceAgent(id.Id); agent != nil {
+		res = agent.disableDevice(ctx)
+		log.Debugw("disableDevice-result", log.Fields{"result": res})
 	} else {
-		cloned := reflect.ValueOf(device).Elem().Interface().(voltha.Device)
-		return &cloned, nil
+		res = status.Errorf(codes.NotFound, "%s", id.Id)
 	}
+
+	sendResponse(ctx, ch, res)
+}
+
+func (dMgr *DeviceManager) getDevice(id string) (*voltha.Device, error) {
+	log.Debugw("getDevice", log.Fields{"deviceid": id})
+	if agent := dMgr.getDeviceAgent(id); agent != nil {
+		return agent.getDevice()
+	}
+	return nil, status.Errorf(codes.NotFound, "%s", id)
 }
 
 func (dMgr *DeviceManager) ListDevices() (*voltha.Devices, error) {
-	log.Debug("ListDevices-start")
+	log.Debug("ListDevices")
 	result := &voltha.Devices{}
 	dMgr.lockDeviceAgentsMap.Lock()
 	defer dMgr.lockDeviceAgentsMap.Unlock()
 	for _, agent := range dMgr.deviceAgents {
-		if device := dMgr.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
-			cloned := reflect.ValueOf(device).Elem().Interface().(voltha.Device)
-			result.Items = append(result.Items, &cloned)
+		if device, err := agent.getDevice(); err == nil {
+			cloned := proto.Clone(device).(*voltha.Device)
+			result.Items = append(result.Items, cloned)
 		}
 	}
 	return result, nil
 }
 
 func (dMgr *DeviceManager) updateDevice(device *voltha.Device) error {
-	log.Debugw("updateDevice-start", log.Fields{"deviceid": device.Id, "device": device})
+	log.Debugw("updateDevice", log.Fields{"deviceid": device.Id, "device": device})
 
 	if agent := dMgr.getDeviceAgent(device.Id); agent != nil {
 		return agent.updateDevice(device)
@@ -155,9 +167,23 @@
 
 func (dMgr *DeviceManager) addPort(deviceId string, port *voltha.Port) error {
 	if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
-		return agent.addPort(port)
+		if err := agent.addPort(port); err != nil {
+			return err
+		}
+		//	Setup peer ports
+		meAsPeer := &voltha.Port_PeerPort{DeviceId: deviceId, PortNo: port.PortNo}
+		for _, peerPort := range port.Peers {
+			if agent := dMgr.getDeviceAgent(peerPort.DeviceId); agent != nil {
+				if err := agent.addPeerPort(meAsPeer); err != nil {
+					log.Errorw("failed-to-add-peer", log.Fields{"peer-device-id": peerPort.DeviceId})
+					return err
+				}
+			}
+		}
+		return nil
+	} else {
+		return status.Errorf(codes.NotFound, "%s", deviceId)
 	}
-	return status.Errorf(codes.NotFound, "%s", deviceId)
 }
 
 func (dMgr *DeviceManager) updatePmConfigs(deviceId string, pmConfigs *voltha.PmConfigs) error {
@@ -168,7 +194,7 @@
 }
 
 func (dMgr *DeviceManager) getSwitchCapability(ctx context.Context, deviceId string) (*core_adapter.SwitchCapability, error) {
-	log.Debugw("getSwitchCapability-start", log.Fields{"deviceid": deviceId})
+	log.Debugw("getSwitchCapability", log.Fields{"deviceid": deviceId})
 
 	if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
 		return agent.getSwitchCapability(ctx)
@@ -176,17 +202,18 @@
 	return nil, status.Errorf(codes.NotFound, "%s", deviceId)
 }
 
-func (dMgr *DeviceManager) getNNIPorts(ctx context.Context, deviceId string) (*voltha.Ports, error) {
-	log.Debugw("getNNIPorts-start", log.Fields{"deviceid": deviceId})
+func (dMgr *DeviceManager) getPorts(ctx context.Context, deviceId string, portType voltha.Port_PortType) (*voltha.Ports, error) {
+	log.Debugw("getPorts", log.Fields{"deviceid": deviceId, "portType": portType})
 
 	if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
-		return agent.getNNIPorts(ctx), nil
+		return agent.getPorts(ctx, portType), nil
 	}
 	return nil, status.Errorf(codes.NotFound, "%s", deviceId)
+
 }
 
 func (dMgr *DeviceManager) getPortCapability(ctx context.Context, deviceId string, portNo uint32) (*core_adapter.PortCapability, error) {
-	log.Debugw("getPortCapability-start", log.Fields{"deviceid": deviceId})
+	log.Debugw("getPortCapability", log.Fields{"deviceid": deviceId})
 
 	if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
 		return agent.getPortCapability(ctx, portNo)
@@ -194,20 +221,27 @@
 	return nil, status.Errorf(codes.NotFound, "%s", deviceId)
 }
 
-func (dMgr *DeviceManager) updateDeviceState(deviceId string, operState *core_adapter.IntType, connState *core_adapter.IntType) error {
-	log.Debugw("updateDeviceState-start", log.Fields{"deviceid": deviceId, "operState": operState, "connState": connState})
+func (dMgr *DeviceManager) updateDeviceStatus(deviceId string, operStatus voltha.OperStatus_OperStatus, connStatus voltha.ConnectStatus_ConnectStatus) error {
+	log.Debugw("updateDeviceStatus", log.Fields{"deviceid": deviceId, "operStatus": operStatus, "connStatus": connStatus})
 	if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
-		return agent.updateDeviceState(operState, connState)
+		return agent.updateDeviceStatus(operStatus, connStatus)
+	}
+	return status.Errorf(codes.NotFound, "%s", deviceId)
+}
+
+func (dMgr *DeviceManager) updatePortState(deviceId string, portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_OperStatus) error {
+	log.Debugw("updatePortState", log.Fields{"deviceid": deviceId, "portType": portType, "portNo": portNo, "operStatus": operStatus})
+	if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
+		return agent.updatePortState(portType, portNo, operStatus)
 	}
 	return status.Errorf(codes.NotFound, "%s", deviceId)
 }
 
 func (dMgr *DeviceManager) childDeviceDetected(parentDeviceId string, parentPortNo int64, deviceType string, channelId int64) error {
-	log.Debugw("childDeviceDetected-start", log.Fields{"parentDeviceId": parentDeviceId})
+	log.Debugw("childDeviceDetected", log.Fields{"parentDeviceId": parentDeviceId})
 
 	// Create the ONU device
 	childDevice := &voltha.Device{}
-	childDevice.Id = CreateDeviceId()
 	childDevice.Type = deviceType
 	childDevice.ParentId = parentDeviceId
 	childDevice.ParentPortNo = uint32(parentPortNo)
@@ -220,7 +254,7 @@
 	agent.start(nil)
 
 	// Activate the child device
-	if agent := dMgr.getDeviceAgent(childDevice.Id); agent != nil {
+	if agent := dMgr.getDeviceAgent(agent.deviceId); agent != nil {
 		return agent.enableDevice(nil)
 	}
 
@@ -229,16 +263,25 @@
 
 func (dMgr *DeviceManager) processTransition(previous *voltha.Device, current *voltha.Device) error {
 	// This will be triggered on every update to the device.
-	handler := dMgr.stateTransitions.GetTransitionHandler(previous, current)
-	if handler != nil {
-		log.Debugw("found-handler", log.Fields{"handler": funcName(handler)})
-		return handler(previous, current)
+	handlers := dMgr.stateTransitions.GetTransitionHandler(previous, current)
+	if handlers == nil {
+		log.Debugw("handlers-not-found", log.Fields{"deviceId": current.Id})
+		return nil
 	}
-	log.Debugw("handler-not-found", log.Fields{"deviceId": current.Id})
+	for _, handler := range handlers {
+		log.Debugw("running-handler", log.Fields{"handler": funcName(handler)})
+		if err := handler(current); err != nil {
+			return err
+		}
+	}
+	//if handler != nil {
+	//	log.Debugw("found-handlers", log.Fields{"handlers": funcName(handler)})
+	//	return handler(current)
+	//}
 	return nil
 }
 
-func (dMgr *DeviceManager) createLogicalDevice(pDevice *voltha.Device, cDevice *voltha.Device) error {
+func (dMgr *DeviceManager) createLogicalDevice(cDevice *voltha.Device) error {
 	log.Info("createLogicalDevice")
 	var logicalId *string
 	var err error
@@ -251,7 +294,80 @@
 	return nil
 }
 
-func (dMgr *DeviceManager) addUNILogicalPort(pDevice *voltha.Device, cDevice *voltha.Device) error {
+func (dMgr *DeviceManager) deleteLogicalDevice(cDevice *voltha.Device) error {
+	log.Info("deleteLogicalDevice")
+	var err error
+	if err = dMgr.logicalDeviceMgr.DeleteLogicalDevice(nil, cDevice); err != nil {
+		log.Warnw("deleteLogical-device-error", log.Fields{"deviceId": cDevice.Id})
+		return err
+	}
+	// Remove the logical device Id from the parent device
+	logicalId := ""
+	dMgr.UpdateDeviceAttribute(cDevice.Id, "ParentId", logicalId)
+	return nil
+}
+
+func (dMgr *DeviceManager) deleteLogicalPort(cDevice *voltha.Device) error {
+	log.Info("deleteLogicalPort")
+	var err error
+	if err = dMgr.logicalDeviceMgr.DeleteLogicalPort(nil, cDevice); err != nil {
+		log.Warnw("deleteLogical-port-error", log.Fields{"deviceId": cDevice.Id})
+		return err
+	}
+	//// Remove the logical device Id from the parent device
+	//logicalId := ""
+	//dMgr.UpdateDeviceAttribute(cDevice.Id, "ParentId", logicalId)
+	return nil
+}
+
+func (dMgr *DeviceManager) getParentDevice(childDevice *voltha.Device) *voltha.Device {
+	//	Sanity check
+	if childDevice.Root {
+		// childDevice is the parent device
+		return childDevice
+	}
+	parentDevice, _ := dMgr.getDevice(childDevice.ParentId)
+	return parentDevice
+}
+
+func (dMgr *DeviceManager) disableAllChildDevices(cDevice *voltha.Device) error {
+	log.Debug("disableAllChildDevices")
+	var childDeviceIds []string
+	var err error
+	if childDeviceIds, err = dMgr.getAllChildDeviceIds(cDevice); err != nil {
+		return status.Errorf(codes.NotFound, "%s", cDevice.Id)
+	}
+	if len(childDeviceIds) == 0 {
+		log.Debugw("no-child-device", log.Fields{"deviceId": cDevice.Id})
+	}
+	for _, childDeviceId := range childDeviceIds {
+		if agent := dMgr.getDeviceAgent(childDeviceId); agent != nil {
+			if err = agent.disableDevice(nil); err != nil {
+				log.Errorw("failure-disable-device", log.Fields{"deviceId": childDeviceId, "error": err.Error()})
+			}
+		}
+	}
+	return nil
+}
+
+func (dMgr *DeviceManager) getAllChildDeviceIds(cDevice *voltha.Device) ([]string, error) {
+	log.Info("getAllChildDeviceIds")
+	// Get latest device info
+	var device *voltha.Device
+	var err error
+	if device, err = dMgr.getDevice(cDevice.Id); err != nil {
+		return nil, status.Errorf(codes.NotFound, "%s", cDevice.Id)
+	}
+	childDeviceIds := make([]string, 0)
+	for _, port := range device.Ports {
+		for _, peer := range port.Peers {
+			childDeviceIds = append(childDeviceIds, peer.DeviceId)
+		}
+	}
+	return childDeviceIds, nil
+}
+
+func (dMgr *DeviceManager) addUNILogicalPort(cDevice *voltha.Device) error {
 	log.Info("addUNILogicalPort")
 	if err := dMgr.logicalDeviceMgr.AddUNILogicalPort(nil, cDevice); err != nil {
 		log.Warnw("addUNILogicalPort-error", log.Fields{"device": cDevice, "err": err})
@@ -260,32 +376,32 @@
 	return nil
 }
 
-func (dMgr *DeviceManager) activateDevice(pDevice *voltha.Device, cDevice *voltha.Device) error {
+func (dMgr *DeviceManager) activateDevice(cDevice *voltha.Device) error {
 	log.Info("activateDevice")
 	return nil
 }
 
-func (dMgr *DeviceManager) disableDevice(pDevice *voltha.Device, cDevice *voltha.Device) error {
-	log.Info("disableDevice")
+func (dMgr *DeviceManager) disableDeviceHandler(cDevice *voltha.Device) error {
+	log.Info("disableDevice-donothing")
 	return nil
 }
 
-func (dMgr *DeviceManager) abandonDevice(pDevice *voltha.Device, cDevice *voltha.Device) error {
+func (dMgr *DeviceManager) abandonDevice(cDevice *voltha.Device) error {
 	log.Info("abandonDevice")
 	return nil
 }
 
-func (dMgr *DeviceManager) reEnableDevice(pDevice *voltha.Device, cDevice *voltha.Device) error {
+func (dMgr *DeviceManager) reEnableDevice(cDevice *voltha.Device) error {
 	log.Info("reEnableDevice")
 	return nil
 }
 
-func (dMgr *DeviceManager) noOp(pDevice *voltha.Device, cDevice *voltha.Device) error {
+func (dMgr *DeviceManager) noOp(cDevice *voltha.Device) error {
 	log.Info("noOp")
 	return nil
 }
 
-func (dMgr *DeviceManager) notAllowed(pDevice *voltha.Device, cDevice *voltha.Device) error {
+func (dMgr *DeviceManager) notAllowed(pcDevice *voltha.Device) error {
 	log.Info("notAllowed")
 	return errors.New("Transition-not-allowed")
 }
@@ -304,7 +420,7 @@
 
 func (dMgr *DeviceManager) GetParentDeviceId(deviceId string) *string {
 	if device, _ := dMgr.getDevice(deviceId); device != nil {
-		log.Infow("GetParentDeviceId", log.Fields{"device": device})
+		log.Infow("GetParentDeviceId", log.Fields{"deviceId": device.Id, "parentId": device.ParentId})
 		return &device.ParentId
 	}
 	return nil
diff --git a/rw_core/core/device_state_transitions.go b/rw_core/core/device_state_transitions.go
index 0f0239c..a84f03a 100644
--- a/rw_core/core/device_state_transitions.go
+++ b/rw_core/core/device_state_transitions.go
@@ -34,13 +34,13 @@
 	Operational voltha.OperStatus_OperStatus
 }
 
-type TransitionHandler func(*voltha.Device, *voltha.Device) error
+type TransitionHandler func(*voltha.Device) error
 
 type Transition struct {
 	deviceType    DeviceType
 	previousState DeviceState
 	currentState  DeviceState
-	handler       TransitionHandler
+	handlers      []TransitionHandler
 }
 
 type TransitionMap struct {
@@ -55,88 +55,93 @@
 			deviceType:    any,
 			previousState: DeviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
 			currentState:  DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
-			handler:       dMgr.activateDevice})
+			handlers:      []TransitionHandler{dMgr.activateDevice}})
 	transitionMap.transitions = append(transitionMap.transitions,
 		Transition{
 			deviceType:    any,
 			previousState: DeviceState{Admin: voltha.AdminState_PREPROVISIONED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
 			currentState:  DeviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
-			handler:       dMgr.notAllowed})
+			handlers:      []TransitionHandler{dMgr.notAllowed}})
 	transitionMap.transitions = append(transitionMap.transitions,
 		Transition{
 			deviceType:    any,
 			previousState: DeviceState{Admin: voltha.AdminState_PREPROVISIONED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
 			currentState:  DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
-			handler:       dMgr.activateDevice})
+			handlers:      []TransitionHandler{dMgr.activateDevice}})
 	transitionMap.transitions = append(transitionMap.transitions,
 		Transition{
 			deviceType:    any,
 			previousState: DeviceState{Admin: voltha.AdminState_PREPROVISIONED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
 			currentState:  DeviceState{Admin: voltha.AdminState_DOWNLOADING_IMAGE, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
-			handler:       dMgr.notAllowed})
+			handlers:      []TransitionHandler{dMgr.notAllowed}})
 	transitionMap.transitions = append(transitionMap.transitions,
 		Transition{
 			deviceType:    any,
 			previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
 			currentState:  DeviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
-			handler:       dMgr.notAllowed})
+			handlers:      []TransitionHandler{dMgr.notAllowed}})
 	transitionMap.transitions = append(transitionMap.transitions,
 		Transition{
 			deviceType:    parent,
 			previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVATING},
 			currentState:  DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVE},
-			handler:       dMgr.createLogicalDevice})
+			handlers:      []TransitionHandler{dMgr.createLogicalDevice}})
 	transitionMap.transitions = append(transitionMap.transitions,
 		Transition{
 			deviceType:    child,
 			previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVATING},
 			currentState:  DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_ACTIVE},
-			handler:       dMgr.addUNILogicalPort})
+			handlers:      []TransitionHandler{dMgr.addUNILogicalPort}})
 	transitionMap.transitions = append(transitionMap.transitions,
 		Transition{
-			deviceType:    any,
+			deviceType:    parent,
 			previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
 			currentState:  DeviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
-			handler:       dMgr.disableDevice})
-
+			handlers:      []TransitionHandler{dMgr.deleteLogicalDevice, dMgr.disableAllChildDevices}})
+	transitionMap.transitions = append(transitionMap.transitions,
+		Transition{
+			deviceType:    child,
+			previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+			currentState:  DeviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+			handlers:      []TransitionHandler{dMgr.deleteLogicalPort}})
 	transitionMap.transitions = append(transitionMap.transitions,
 		Transition{
 			deviceType:    any,
 			previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
 			currentState:  DeviceState{Admin: voltha.AdminState_PREPROVISIONED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
-			handler:       dMgr.abandonDevice})
+			handlers:      []TransitionHandler{dMgr.abandonDevice}})
 	transitionMap.transitions = append(transitionMap.transitions,
 		Transition{
 			deviceType:    any,
 			previousState: DeviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
 			currentState:  DeviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
-			handler:       dMgr.notAllowed})
+			handlers:      []TransitionHandler{dMgr.notAllowed}})
 	transitionMap.transitions = append(transitionMap.transitions,
 		Transition{
 			deviceType:    any,
 			previousState: DeviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
 			currentState:  DeviceState{Admin: voltha.AdminState_PREPROVISIONED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
-			handler:       dMgr.abandonDevice})
+			handlers:      []TransitionHandler{dMgr.abandonDevice}})
 	transitionMap.transitions = append(transitionMap.transitions,
 		Transition{
 			deviceType:    any,
 			previousState: DeviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
 			currentState:  DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
-			handler:       dMgr.reEnableDevice})
+			handlers:      []TransitionHandler{dMgr.reEnableDevice}})
 
 	transitionMap.transitions = append(transitionMap.transitions,
 		Transition{
 			deviceType:    any,
 			previousState: DeviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
 			currentState:  DeviceState{Admin: voltha.AdminState_DOWNLOADING_IMAGE, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
-			handler:       dMgr.notAllowed})
+			handlers:      []TransitionHandler{dMgr.notAllowed}})
 
 	transitionMap.transitions = append(transitionMap.transitions,
 		Transition{
 			deviceType:    any,
 			previousState: DeviceState{Admin: voltha.AdminState_DOWNLOADING_IMAGE, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
 			currentState:  DeviceState{Admin: voltha.AdminState_DISABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
-			handler:       dMgr.notAllowed})
+			handlers:      []TransitionHandler{dMgr.notAllowed}})
 
 	return &transitionMap
 }
@@ -146,38 +151,38 @@
 }
 
 // isMatched matches a state transition.  It returns whether there is a match and if there is whether it is an exact match
-func getHandler(previous *DeviceState, current *DeviceState, transition *Transition) (TransitionHandler, bool) {
+func getHandler(previous *DeviceState, current *DeviceState, transition *Transition) ([]TransitionHandler, bool) {
 
 	// Do we have an exact match?
 	if *previous == transition.previousState && *current == transition.currentState {
-		return transition.handler, true
+		return transition.handlers, true
 	}
 	// If the admin state changed then prioritize it first
 	if previous.Admin != current.Admin {
 		if previous.Admin == transition.previousState.Admin && current.Admin == transition.currentState.Admin {
-			return transition.handler, false
+			return transition.handlers, false
 		}
 	}
 	// If the operational state changed then prioritize it in second position
 	if previous.Operational != current.Operational {
 		if previous.Operational == transition.previousState.Operational && current.Operational == transition.currentState.Operational {
-			return transition.handler, false
+			return transition.handlers, false
 		}
 	}
 	// If the connection state changed then prioritize it in third position
 	if previous.Connection != current.Connection {
 		if previous.Connection == transition.previousState.Connection && current.Connection == transition.currentState.Connection {
-			return transition.handler, false
+			return transition.handlers, false
 		}
 	}
 	return nil, false
 }
 
-func (tMap *TransitionMap) GetTransitionHandler(pDevice *voltha.Device, cDevice *voltha.Device) TransitionHandler {
+func (tMap *TransitionMap) GetTransitionHandler(pDevice *voltha.Device, cDevice *voltha.Device) []TransitionHandler {
 	//1. Get the previous and current set of states
 	pState := getDeviceStates(pDevice)
 	cState := getDeviceStates(cDevice)
-	log.Infow("DeviceType", log.Fields{"device": pDevice})
+	//log.Infow("DeviceType", log.Fields{"device": pDevice})
 	deviceType := parent
 	if !pDevice.Root {
 		log.Info("device is child")
@@ -186,8 +191,8 @@
 	log.Infof("deviceType:%d-deviceId:%s-previous:%v-current:%v", deviceType, pDevice.Id, pState, cState)
 
 	//2. Go over transition array to get the right transition
-	var currentMatch TransitionHandler
-	var tempHandler TransitionHandler
+	var currentMatch []TransitionHandler
+	var tempHandler []TransitionHandler
 	var exactMatch bool
 	var deviceTypeMatch bool
 	for _, aTransition := range tMap.transitions {
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index bd28322..d446438 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -48,19 +48,14 @@
 
 func (handler *APIHandler) UpdateLogLevel(ctx context.Context, logging *voltha.Logging) (*empty.Empty, error) {
 	log.Debugw("UpdateLogLevel-request", log.Fields{"newloglevel": logging.Level, "intval": int(logging.Level)})
-	if isTestMode(ctx) {
-		out := new(empty.Empty)
-		log.SetPackageLogLevel(logging.PackageName, int(logging.Level))
-		return out, nil
-	}
-	return nil, errors.New("Unimplemented")
-
+	out := new(empty.Empty)
+	log.SetPackageLogLevel(logging.PackageName, int(logging.Level))
+	return out, nil
 }
 
 func processEnableDevicePort(ctx context.Context, id *voltha.LogicalPortId, ch chan error) {
 	log.Debugw("processEnableDevicePort", log.Fields{"id": id, "test": common.TestModeKeys_api_test.String()})
 	ch <- status.Errorf(100, "%d-%s", 100, "erreur")
-
 }
 
 func (handler *APIHandler) EnableLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*empty.Empty, error) {
@@ -141,15 +136,17 @@
 	go handler.deviceMgr.createDevice(ctx, device, ch)
 	select {
 	case res := <-ch:
-		if res == nil {
-			return &voltha.Device{Id: device.Id}, nil
-		} else if err, ok := res.(error); ok {
-			return &voltha.Device{Id: device.Id}, err
-		} else {
-			log.Warnw("create-device-unexpected-return-type", log.Fields{"result": res})
-			err = status.Errorf(codes.Internal, "%s", res)
-			return &voltha.Device{Id: device.Id}, err
+		if res != nil {
+			if err, ok := res.(error); ok {
+				return &voltha.Device{}, err
+			}
+			if d, ok := res.(*voltha.Device); ok {
+				return d, nil
+			}
 		}
+		log.Warnw("create-device-unexpected-return-type", log.Fields{"result": res})
+		err := status.Errorf(codes.Internal, "%s", res)
+		return &voltha.Device{}, err
 	case <-ctx.Done():
 		log.Debug("createdevice-client-timeout")
 		return nil, ctx.Err()
@@ -188,6 +185,24 @@
 		out := new(empty.Empty)
 		return out, nil
 	}
+	ch := make(chan interface{})
+	defer close(ch)
+	go handler.deviceMgr.disableDevice(ctx, id, ch)
+	select {
+	case res := <-ch:
+		if res == nil {
+			return new(empty.Empty), nil
+		} else if err, ok := res.(error); ok {
+			return new(empty.Empty), err
+		} else {
+			log.Warnw("disable-device-unexpected-return-type", log.Fields{"result": res})
+			err = status.Errorf(codes.Internal, "%s", res)
+			return new(empty.Empty), err
+		}
+	case <-ctx.Done():
+		log.Debug("enabledevice-client-timeout")
+		return nil, ctx.Err()
+	}
 	return nil, errors.New("Unimplemented")
 }
 
diff --git a/rw_core/core/grpc_nbi_api_handler_client_test.go b/rw_core/core/grpc_nbi_api_handler_client_test.go
index 6a32ee5..21300cc 100644
--- a/rw_core/core/grpc_nbi_api_handler_client_test.go
+++ b/rw_core/core/grpc_nbi_api_handler_client_test.go
@@ -39,7 +39,6 @@
 Prerequite:  These tests require the rw_core to run prior to executing these test cases.
 */
 
-
 func setup() {
 	var err error
 
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 117c869..218478f 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -25,17 +25,18 @@
 	"github.com/opencord/voltha-go/protos/voltha"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
-	"reflect"
+	"sync"
 )
 
 type LogicalDeviceAgent struct {
-	logicalDeviceId  string
-	lastData         *voltha.LogicalDevice
-	rootDeviceId     string
-	deviceMgr        *DeviceManager
-	ldeviceMgr       *LogicalDeviceManager
-	clusterDataProxy *model.Proxy
-	exitChannel      chan int
+	logicalDeviceId   string
+	lastData          *voltha.LogicalDevice
+	rootDeviceId      string
+	deviceMgr         *DeviceManager
+	ldeviceMgr        *LogicalDeviceManager
+	clusterDataProxy  *model.Proxy
+	exitChannel       chan int
+	lockLogicalDevice sync.RWMutex
 }
 
 func NewLogicalDeviceAgent(id string, device *voltha.Device, ldeviceMgr *LogicalDeviceManager, deviceMgr *DeviceManager,
@@ -47,11 +48,12 @@
 	agent.deviceMgr = deviceMgr
 	agent.clusterDataProxy = cdProxy
 	agent.ldeviceMgr = ldeviceMgr
+	agent.lockLogicalDevice = sync.RWMutex{}
 	return &agent
 }
 
 func (agent *LogicalDeviceAgent) Start(ctx context.Context) error {
-	log.Info("starting-logical_device-agent")
+	log.Infow("starting-logical_device-agent", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
 	//Build the logical device based on information retrieved from the device adapter
 	var switchCap *ca.SwitchCapability
 	var err error
@@ -68,7 +70,7 @@
 	//hence. may need to extract the port by the NNI port id defined by the adapter during device
 	//creation
 	var nniPorts *voltha.Ports
-	if nniPorts, err = agent.deviceMgr.getNNIPorts(ctx, agent.rootDeviceId); err != nil {
+	if nniPorts, err = agent.deviceMgr.getPorts(ctx, agent.rootDeviceId, voltha.Port_ETHERNET_NNI); err != nil {
 		log.Errorw("error-creating-logical-port", log.Fields{"error": err})
 	}
 	var portCap *ca.PortCapability
@@ -80,8 +82,11 @@
 		}
 
 		lp := (proto.Clone(portCap.Port)).(*voltha.LogicalPort)
+		lp.DeviceId = agent.rootDeviceId
 		ld.Ports = append(ld.Ports, lp)
 	}
+	agent.lockLogicalDevice.Lock()
+	defer agent.lockLogicalDevice.Unlock()
 	// Save the logical device
 	if added := agent.clusterDataProxy.Add("/logical_devices", ld, ""); added == nil {
 		log.Errorw("failed-to-add-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
@@ -92,8 +97,30 @@
 	return nil
 }
 
+func (agent *LogicalDeviceAgent) getLogicalDevice() (*voltha.LogicalDevice, error) {
+	log.Debug("getLogicalDevice")
+	agent.lockLogicalDevice.Lock()
+	defer agent.lockLogicalDevice.Unlock()
+	logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 1, false, "")
+	if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
+		cloned := proto.Clone(lDevice).(*voltha.LogicalDevice)
+		return cloned, nil
+	}
+	return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
+}
+
+func (agent *LogicalDeviceAgent) getLogicalDeviceWithoutLock() (*voltha.LogicalDevice, error) {
+	log.Debug("getLogicalDeviceWithoutLock")
+	logicalDevice := agent.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 1, false, "")
+	if lDevice, ok := logicalDevice.(*voltha.LogicalDevice); ok {
+		cloned := proto.Clone(lDevice).(*voltha.LogicalDevice)
+		return cloned, nil
+	}
+	return nil, status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceId)
+}
+
 func (agent *LogicalDeviceAgent) addUNILogicalPort(ctx context.Context, childDevice *voltha.Device, portNo uint32) error {
-	log.Info("addUNILogicalPort-start")
+	log.Infow("addUNILogicalPort-start", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 	// Build the logical device based on information retrieved from the device adapter
 	var portCap *ca.PortCapability
 	var err error
@@ -101,29 +128,67 @@
 		log.Errorw("error-creating-logical-port", log.Fields{"error": err})
 		return err
 	}
+	agent.lockLogicalDevice.Lock()
+	defer agent.lockLogicalDevice.Unlock()
 	// Get stored logical device
-	if ldevice, err := agent.ldeviceMgr.getLogicalDevice(agent.logicalDeviceId); err != nil {
+	if ldevice, err := agent.getLogicalDeviceWithoutLock(); err != nil {
 		return status.Error(codes.NotFound, agent.logicalDeviceId)
 	} else {
-		cloned := reflect.ValueOf(ldevice).Elem().Interface().(voltha.LogicalDevice)
-		lp := (proto.Clone(portCap.Port)).(*voltha.LogicalPort)
+		cloned := proto.Clone(ldevice).(*voltha.LogicalDevice)
+		lp := proto.Clone(portCap.Port).(*voltha.LogicalPort)
+		lp.DeviceId = childDevice.Id
 		cloned.Ports = append(cloned.Ports, lp)
-		afterUpdate := agent.clusterDataProxy.Update("/logical_devices/"+agent.logicalDeviceId, &cloned, false, "")
-		if afterUpdate == nil {
-			return status.Errorf(codes.Internal, "failed-add-UNI-port:%s", agent.logicalDeviceId)
-		}
+		return agent.updateLogicalDeviceWithoutLock(cloned)
+	}
+}
+
+//updateLogicalDeviceWithoutLock updates the model with the logical device.  It clones the logicaldevice before saving it
+func (agent *LogicalDeviceAgent) updateLogicalDeviceWithoutLock(logicalDevice *voltha.LogicalDevice) error {
+	cloned := proto.Clone(logicalDevice).(*voltha.LogicalDevice)
+	afterUpdate := agent.clusterDataProxy.Update("/logical_devices/"+agent.logicalDeviceId, cloned, false, "")
+	if afterUpdate == nil {
+		return status.Errorf(codes.Internal, "failed-updating-logical-device:%s", agent.logicalDeviceId)
+	}
+	return nil
+}
+
+// deleteLogicalPort removes the logical port associated with a child device
+func (agent *LogicalDeviceAgent) deleteLogicalPort(device *voltha.Device) error {
+	agent.lockLogicalDevice.Lock()
+	defer agent.lockLogicalDevice.Unlock()
+	// Get the most up to date logical device
+	var logicaldevice *voltha.LogicalDevice
+	if logicaldevice, _ = agent.getLogicalDeviceWithoutLock(); logicaldevice == nil {
+		log.Debugw("no-logical-device", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "deviceId": device.Id})
 		return nil
 	}
+	index := -1
+	for i, logicalPort := range logicaldevice.Ports {
+		if logicalPort.DeviceId == device.Id {
+			index = i
+			break
+		}
+	}
+	if index >= 0 {
+		copy(logicaldevice.Ports[index:], logicaldevice.Ports[index+1:])
+		logicaldevice.Ports[len(logicaldevice.Ports)-1] = nil
+		logicaldevice.Ports = logicaldevice.Ports[:len(logicaldevice.Ports)-1]
+		log.Debugw("logical-port-deleted", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+		return agent.updateLogicalDeviceWithoutLock(logicaldevice)
+	}
+	return nil
 }
 
 func (agent *LogicalDeviceAgent) Stop(ctx context.Context) {
 	log.Info("stopping-logical_device-agent")
+	agent.lockLogicalDevice.Lock()
+	defer agent.lockLogicalDevice.Unlock()
+	//Remove the logical device from the model
+	if removed := agent.clusterDataProxy.Remove("/logical_devices/"+agent.logicalDeviceId, ""); removed == nil {
+		log.Errorw("failed-to-remove-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
+	} else {
+		log.Debugw("logicaldevice-removed", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
+	}
 	agent.exitChannel <- 1
 	log.Info("logical_device-agent-stopped")
 }
-
-func (agent *LogicalDeviceAgent) getLogicalDevice(ctx context.Context) *voltha.LogicalDevice {
-	log.Debug("getLogicalDevice")
-	cp := proto.Clone(agent.lastData)
-	return cp.(*voltha.LogicalDevice)
-}
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index aa22d57..bef078c 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -24,7 +24,6 @@
 	"github.com/opencord/voltha-go/protos/voltha"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
-	"reflect"
 	"strings"
 	"sync"
 )
@@ -78,33 +77,36 @@
 	return nil
 }
 
+func (ldMgr *LogicalDeviceManager) deleteLogicalDeviceAgent(logicalDeviceId string) {
+	ldMgr.lockLogicalDeviceAgentsMap.Lock()
+	defer ldMgr.lockLogicalDeviceAgentsMap.Unlock()
+	delete(ldMgr.logicalDeviceAgents, logicalDeviceId)
+}
+
+// getLogicalDevice provides a cloned most up to date logical device
 func (ldMgr *LogicalDeviceManager) getLogicalDevice(id string) (*voltha.LogicalDevice, error) {
-	log.Debugw("getlogicalDevice-start", log.Fields{"logicaldeviceid": id})
-	logicalDevice := ldMgr.clusterDataProxy.Get("/logical_devices/"+id, 1, false, "")
-	if logicalDevice != nil {
-		cloned := reflect.ValueOf(logicalDevice).Elem().Interface().(voltha.LogicalDevice)
-		return &cloned, nil
+	log.Debugw("getlogicalDevice", log.Fields{"logicaldeviceid": id})
+	if agent := ldMgr.getLogicalDeviceAgent(id); agent != nil {
+		return agent.getLogicalDevice()
 	}
 	return nil, status.Errorf(codes.NotFound, "%s", id)
 }
 
 func (ldMgr *LogicalDeviceManager) listLogicalDevices() (*voltha.LogicalDevices, error) {
-	log.Debug("listLogicalDevices-start")
+	log.Debug("listLogicalDevices")
 	result := &voltha.LogicalDevices{}
 	ldMgr.lockLogicalDeviceAgentsMap.Lock()
 	defer ldMgr.lockLogicalDeviceAgentsMap.Unlock()
 	for _, agent := range ldMgr.logicalDeviceAgents {
-		logicalDevice := ldMgr.clusterDataProxy.Get("/logical_devices/"+agent.logicalDeviceId, 1, false, "")
-		if logicalDevice != nil {
-			cloned := reflect.ValueOf(logicalDevice).Elem().Interface().(voltha.LogicalDevice)
-			result.Items = append(result.Items, &cloned)
+		if lDevice, err := agent.getLogicalDevice(); err == nil {
+			result.Items = append(result.Items, lDevice)
 		}
 	}
 	return result, nil
 }
 
 func (ldMgr *LogicalDeviceManager) CreateLogicalDevice(ctx context.Context, device *voltha.Device) (*string, error) {
-	log.Infow("creating-logical-device-start", log.Fields{"deviceId": device.Id})
+	log.Debugw("creating-logical-device", log.Fields{"deviceId": device.Id})
 	// Sanity check
 	if !device.Root {
 		return nil, errors.New("Device-not-root")
@@ -116,18 +118,74 @@
 	// in the Device model.  May need to be moved out.
 	macAddress := device.MacAddress
 	id := strings.Replace(macAddress, ":", "", -1)
-	log.Debugw("setting-logical-device-id", log.Fields{"logicaldeviceId": id})
+	if id == "" {
+		log.Errorw("mac-address-not-set", log.Fields{"deviceId": device.Id})
+		return nil, errors.New("mac-address-not-set")
+	}
+	log.Debugw("logical-device-id", log.Fields{"logicaldeviceId": id})
 
 	agent := NewLogicalDeviceAgent(id, device, ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy)
 	ldMgr.addLogicalDeviceAgentToMap(agent)
 	go agent.Start(ctx)
 
-	log.Info("creating-logical-device-ends")
+	log.Debug("creating-logical-device-ends")
 	return &id, nil
 }
 
+func (ldMgr *LogicalDeviceManager) DeleteLogicalDevice(ctx context.Context, device *voltha.Device) error {
+	log.Debugw("deleting-logical-device", log.Fields{"deviceId": device.Id})
+	// Sanity check
+	if !device.Root {
+		return errors.New("Device-not-root")
+	}
+	logDeviceId := device.ParentId
+	if agent := ldMgr.getLogicalDeviceAgent(logDeviceId); agent != nil {
+		// Stop the logical device agent
+		agent.Stop(ctx)
+		//Remove the logical device agent from the Map
+		ldMgr.deleteLogicalDeviceAgent(logDeviceId)
+	}
+
+	log.Debug("deleting-logical-device-ends")
+	return nil
+}
+
+func (ldMgr *LogicalDeviceManager) getLogicalDeviceId(device *voltha.Device) (*string, error) {
+	// Device can either be a parent or a child device
+	if device.Root {
+		// Parent device.  The ID of a parent device is the logical device ID
+		return &device.ParentId, nil
+	}
+	// Device is child device
+	//	retrieve parent device using child device ID
+	if parentDevice := ldMgr.deviceMgr.getParentDevice(device); parentDevice != nil {
+		return &parentDevice.ParentId, nil
+	}
+	return nil, status.Errorf(codes.NotFound, "%s", device.Id)
+}
+
+// DeleteLogicalDevice removes the logical port associated with a child device
+func (ldMgr *LogicalDeviceManager) DeleteLogicalPort(ctx context.Context, device *voltha.Device) error {
+	log.Debugw("deleting-logical-port", log.Fields{"deviceId": device.Id})
+	// Sanity check
+	if device.Root {
+		return errors.New("Device-root")
+	}
+	logDeviceId, _ := ldMgr.getLogicalDeviceId(device)
+	if logDeviceId == nil {
+		log.Debugw("no-logical-device-present", log.Fields{"deviceId": device.Id})
+		return nil
+	}
+	if agent := ldMgr.getLogicalDeviceAgent(*logDeviceId); agent != nil {
+		agent.deleteLogicalPort(device)
+	}
+
+	log.Debug("deleting-logical-port-ends")
+	return nil
+}
+
 func (ldMgr *LogicalDeviceManager) AddUNILogicalPort(ctx context.Context, childDevice *voltha.Device) error {
-	log.Infow("AddUNILogicalPort-start", log.Fields{"deviceId": childDevice.Id})
+	log.Debugw("AddUNILogicalPort", log.Fields{"deviceId": childDevice.Id})
 	// Sanity check
 	if childDevice.Root {
 		return errors.New("Device-root")
@@ -137,7 +195,7 @@
 	parentId := childDevice.ParentId
 	logDeviceId := ldMgr.deviceMgr.GetParentDeviceId(parentId)
 
-	log.Infow("AddUNILogicalPort", log.Fields{"logDeviceId": logDeviceId, "parentId": parentId})
+	log.Debugw("AddUNILogicalPort", log.Fields{"logDeviceId": logDeviceId, "parentId": parentId})
 
 	if agent := ldMgr.getLogicalDeviceAgent(*logDeviceId); agent != nil {
 		return agent.addUNILogicalPort(ctx, childDevice, childDevice.ProxyAddress.ChannelId)
diff --git a/rw_core/main.go b/rw_core/main.go
index 25adee6..f53a6ba 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -221,7 +221,7 @@
 	}
 
 	log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
-	log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.DebugLevel)
+	log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.WarnLevel)
 
 	defer log.CleanUp()
 
diff --git a/tests/kafka/kafka_inter_container_messaging_test.go b/tests/kafka/kafka_inter_container_messaging_test.go
index 99ba2c4..b31f2ce 100644
--- a/tests/kafka/kafka_inter_container_messaging_test.go
+++ b/tests/kafka/kafka_inter_container_messaging_test.go
@@ -42,12 +42,12 @@
 	log.SetAllLogLevel(log.ErrorLevel)
 
 	coreKafkaProxy, _ = kk.NewKafkaMessagingProxy(
-		kk.KafkaHost("192.168.0.20"),
+		kk.KafkaHost("192.168.0.17"),
 		kk.KafkaPort(9092),
 		kk.DefaultTopic(&kk.Topic{Name: "Core"}))
 
 	adapterKafkaProxy, _ = kk.NewKafkaMessagingProxy(
-		kk.KafkaHost("192.168.0.20"),
+		kk.KafkaHost("192.168.0.17"),
 		kk.KafkaPort(9092),
 		kk.DefaultTopic(&kk.Topic{Name: "Adapter"}))
 
@@ -186,7 +186,7 @@
 
 func TestGetDevice(t *testing.T) {
 	trnsId := uuid.New().String()
-	protoMsg := &voltha.ID{Id:trnsId}
+	protoMsg := &voltha.ID{Id: trnsId}
 	args := make([]*kk.KVArg, 1)
 	args[0] = &kk.KVArg{
 		Key:   "deviceID",
@@ -212,7 +212,7 @@
 
 func TestGetDeviceTimeout(t *testing.T) {
 	trnsId := uuid.New().String()
-	protoMsg := &voltha.ID{Id:trnsId}
+	protoMsg := &voltha.ID{Id: trnsId}
 	args := make([]*kk.KVArg, 1)
 	args[0] = &kk.KVArg{
 		Key:   "deviceID",
@@ -237,7 +237,7 @@
 
 func TestGetChildDevice(t *testing.T) {
 	trnsId := uuid.New().String()
-	protoMsg := &voltha.ID{Id:trnsId}
+	protoMsg := &voltha.ID{Id: trnsId}
 	args := make([]*kk.KVArg, 1)
 	args[0] = &kk.KVArg{
 		Key:   "deviceID",
@@ -260,7 +260,7 @@
 
 func TestGetChildDevices(t *testing.T) {
 	trnsId := uuid.New().String()
-	protoMsg := &voltha.ID{Id:trnsId}
+	protoMsg := &voltha.ID{Id: trnsId}
 	args := make([]*kk.KVArg, 1)
 	args[0] = &kk.KVArg{
 		Key:   "deviceID",
@@ -283,7 +283,7 @@
 
 func TestGetPorts(t *testing.T) {
 	trnsId := uuid.New().String()
-	protoArg1 := &voltha.ID{Id:trnsId}
+	protoArg1 := &voltha.ID{Id: trnsId}
 	args := make([]*kk.KVArg, 2)
 	args[0] = &kk.KVArg{
 		Key:   "deviceID",
@@ -311,7 +311,7 @@
 
 func TestGetPortsMissingArgs(t *testing.T) {
 	trnsId := uuid.New().String()
-	protoArg1 := &voltha.ID{Id:trnsId}
+	protoArg1 := &voltha.ID{Id: trnsId}
 	args := make([]*kk.KVArg, 1)
 	args[0] = &kk.KVArg{
 		Key:   "deviceID",
@@ -443,6 +443,36 @@
 	assert.NotNil(t, unpackResult)
 }
 
+func TestDeviceStateChange(t *testing.T) {
+	log.SetAllLogLevel(log.DebugLevel)
+	trnsId := uuid.New().String()
+	protoArg1 := &voltha.ID{Id: trnsId}
+	args := make([]*kk.KVArg, 4)
+	args[0] = &kk.KVArg{
+		Key:   "device_id",
+		Value: protoArg1,
+	}
+	protoArg2 := &ca.IntType{Val: 1}
+	args[1] = &kk.KVArg{
+		Key:   "oper_status",
+		Value: protoArg2,
+	}
+	protoArg3 := &ca.IntType{Val: 1}
+	args[2] = &kk.KVArg{
+		Key:   "connect_status",
+		Value: protoArg3,
+	}
+
+	rpc := "DeviceStateUpdate"
+	topic := kk.Topic{Name: "Core"}
+	start := time.Now()
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	elapsed := time.Since(start)
+	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+	assert.Equal(t, status, true)
+	assert.Nil(t, result)
+}
+
 func TestStopKafkaProxy(t *testing.T) {
 	adapterKafkaProxy.Stop()
 	coreKafkaProxy.Stop()