diff --git a/adapters/common/utils.go b/adapters/common/utils.go
index 98468b0..810a3d0 100644
--- a/adapters/common/utils.go
+++ b/adapters/common/utils.go
@@ -45,3 +45,29 @@
 		rand.Intn(128),
 	)
 }
+
+const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
+const (
+	letterIdxBits = 6                    // 6 bits to represent a letter index
+	letterIdxMask = 1<<letterIdxBits - 1 // All 1-bits, as many as letterIdxBits
+	letterIdxMax  = 63 / letterIdxBits   // # of letter indices fitting in 63 bits
+)
+
+var src = rand.NewSource(time.Now().UnixNano())
+
+func GetRandomString(n int) string {
+	b := make([]byte, n)
+	// A src.Int63() generates 63 random bits, enough for letterIdxMax characters!
+	for i, cache, remain := n-1, src.Int63(), letterIdxMax; i >= 0; {
+		if remain == 0 {
+			cache, remain = src.Int63(), letterIdxMax
+		}
+		if idx := int(cache & letterIdxMask); idx < len(letterBytes) {
+			b[i] = letterBytes[idx]
+			i--
+		}
+		cache >>= letterIdxBits
+		remain--
+	}
+	return string(b)
+}
\ No newline at end of file
diff --git a/compose/rw_core_concurrency_test.yml b/compose/rw_core_concurrency_test.yml
new file mode 100644
index 0000000..8f6f35e
--- /dev/null
+++ b/compose/rw_core_concurrency_test.yml
@@ -0,0 +1,71 @@
+---
+# Copyright 2019 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '2'
+services:
+  rw_core1:
+    image: voltha-rw-core
+    entrypoint:
+        - /app/rw_core
+        - -kv_store_type=etcd
+        - -kv_store_host=${DOCKER_HOST_IP}
+        - -kv_store_port=2379
+        - -grpc_port=50057
+        - -banner=true
+        - -kafka_adapter_host=${DOCKER_HOST_IP}
+        - -kafka_adapter_port=9092
+        - -kafka_cluster_host=${DOCKER_HOST_IP}
+        - -kafka_cluster_port=9092
+        - -rw_core_topic=rwcore
+        - -kv_store_data_prefix=service/voltha
+        - -in_competing_mode=true
+        - -timeout_long_request=3000
+        - -timeout_request=300
+        - -log_level=0
+    ports:
+      - 50057:50057
+    volumes:
+    - "/var/run/docker.sock:/tmp/docker.sock"
+    networks:
+    - default
+
+  rw_core2:
+    image: voltha-rw-core
+    entrypoint:
+      - /app/rw_core
+      - -kv_store_type=etcd
+      - -kv_store_host=${DOCKER_HOST_IP}
+      - -kv_store_port=2379
+      - -grpc_port=50057
+      - -banner=true
+      - -kafka_adapter_host=${DOCKER_HOST_IP}
+      - -kafka_adapter_port=9092
+      - -kafka_cluster_host=${DOCKER_HOST_IP}
+      - -kafka_cluster_port=9092
+      - -rw_core_topic=rwcore
+      - -kv_store_data_prefix=service/voltha
+      - -in_competing_mode=true
+      - -timeout_long_request=3000
+      - -timeout_request=300
+      - -log_level=0
+    ports:
+      - 50058:50057
+    volumes:
+      - "/var/run/docker.sock:/tmp/docker.sock"
+    networks:
+      - default
+networks:
+  default:
+    driver: bridge
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index 05d0af5..4359f7d 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -41,6 +41,10 @@
 	DefaultRequestTimeout = 3000 // 3000 milliseconds - to handle a wider latency range
 )
 
+const (
+	TransactionKey = "transactionID"
+)
+
 // requestHandlerChannel represents an interface associated with a channel.  Whenever, an event is
 // obtained from that channel, this interface is invoked.   This is used to handle
 // async requests into the Core via the kafka messaging bus
@@ -592,6 +596,25 @@
 	return
 }
 
+func (kp *InterContainerProxy) addTransactionId(transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
+	arg := &KVArg{
+		Key:   TransactionKey,
+		Value: &ic.StrType{Val: transactionId},
+	}
+
+	var marshalledArg *any.Any
+	var err error
+	if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: transactionId}); err != nil {
+		log.Warnw("cannot-add-transactionId", log.Fields{"error": err})
+		return currentArgs
+	}
+	protoArg := &ic.Argument{
+		Key:   arg.Key,
+		Value: marshalledArg,
+	}
+	return append(currentArgs, protoArg)
+}
+
 func (kp *InterContainerProxy) handleRequest(msg *ic.InterContainerMessage, targetInterface interface{}) {
 
 	// First extract the header to know whether this is a request - responses are handled by a different handler
@@ -607,6 +630,9 @@
 		} else {
 			log.Debugw("received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
 			// let the callee unpack the arguments as its the only one that knows the real proto type
+			// Augment the requestBody with the message Id as it will be used in scenarios where cores
+			// are set in pairs and competing
+			requestBody.Args = kp.addTransactionId(msg.Header.Id, requestBody.Args)
 			out, err = CallFuncByName(targetInterface, requestBody.Rpc, requestBody.Args)
 			if err != nil {
 				log.Warn(err)
@@ -623,7 +649,6 @@
 				returnedValues = make([]interface{}, 1)
 				returnedValues[0] = returnError
 			} else {
-				//log.Debugw("returned-api-response", log.Fields{"len": len(out), "err": err})
 				returnedValues = make([]interface{}, 0)
 				// Check for errors first
 				lastIndex := len(out) - 1
@@ -635,6 +660,8 @@
 						returnError = &ic.Error{Reason: "incorrect-error-returns"}
 						returnedValues = append(returnedValues, returnError)
 					}
+				} else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil()  {
+					return // Ignore case - when core is in competing mode
 				} else { // Non-error case
 					success = true
 					for idx, val := range out {
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
index 55c68a3..698fefa 100644
--- a/kafka/sarama_client.go
+++ b/kafka/sarama_client.go
@@ -459,7 +459,7 @@
 	// TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
 	select {
 	case ok := <-sc.producer.Successes():
-		log.Debugw("message-sent", log.Fields{"status": ok})
+		log.Debugw("message-sent", log.Fields{"status": ok.Topic})
 	case notOk := <-sc.producer.Errors():
 		log.Debugw("error-sending", log.Fields{"status": notOk})
 		return notOk
@@ -786,7 +786,7 @@
 				// Channel closed
 				break startloop
 			}
-			log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
+			log.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
 			msgBody := msg.Value
 			icm := &ic.InterContainerMessage{}
 			if err := proto.Unmarshal(msgBody, icm); err != nil {
diff --git a/rw_core/core/adapter_manager.go b/rw_core/core/adapter_manager.go
index d4d3c69..0be4e12 100644
--- a/rw_core/core/adapter_manager.go
+++ b/rw_core/core/adapter_manager.go
@@ -139,7 +139,7 @@
 //loadAdaptersAndDevicetypesInMemory loads the existing set of adapters and device types in memory
 func (aMgr *AdapterManager) loadAdaptersAndDevicetypesInMemory() {
 	// Load the adapters
-	if adaptersIf := aMgr.clusterDataProxy.Get("/adapters", 0, false, ""); adaptersIf != nil {
+	if adaptersIf := aMgr.clusterDataProxy.List("/adapters", 0, false, ""); adaptersIf != nil {
 		for _, adapterIf := range adaptersIf.([]interface{}) {
 			if adapter, ok := adapterIf.(*voltha.Adapter); ok {
 				log.Debugw("found-existing-adapter", log.Fields{"adapterId": adapter.Id})
@@ -153,7 +153,7 @@
 	}
 
 	// Load the device types
-	if deviceTypesIf := aMgr.clusterDataProxy.Get("/device_types", 0, false, ""); deviceTypesIf != nil {
+	if deviceTypesIf := aMgr.clusterDataProxy.List("/device_types", 0, false, ""); deviceTypesIf != nil {
 		dTypes := &voltha.DeviceTypes{Items:[]*voltha.DeviceType{}}
 		for _, deviceTypeIf := range deviceTypesIf.([]interface{}) {
 			if dType, ok := deviceTypeIf.(*voltha.DeviceType); ok {
@@ -169,6 +169,31 @@
 	}
 }
 
+
+//updateAdaptersAndDevicetypesInMemory loads the existing set of adapters and device types in memory
+func (aMgr *AdapterManager) updateAdaptersAndDevicetypesInMemory() {
+	// Update the adapters
+	if adaptersIf := aMgr.clusterDataProxy.List("/adapters", 0, false, ""); adaptersIf != nil {
+		for _, adapterIf := range adaptersIf.([]interface{}) {
+			if adapter, ok := adapterIf.(*voltha.Adapter); ok {
+				log.Debugw("found-existing-adapter", log.Fields{"adapterId": adapter.Id})
+				aMgr.updateAdapter(adapter)
+			}
+		}
+	}
+	// Update the device types
+	if deviceTypesIf := aMgr.clusterDataProxy.List("/device_types", 0, false, ""); deviceTypesIf != nil {
+		dTypes := &voltha.DeviceTypes{Items: []*voltha.DeviceType{}}
+		for _, deviceTypeIf := range deviceTypesIf.([]interface{}) {
+			if dType, ok := deviceTypeIf.(*voltha.DeviceType); ok {
+				log.Debugw("found-existing-device-types", log.Fields{"deviceTypes": dTypes})
+				aMgr.updateDeviceType(dType)
+			}
+		}
+	}
+}
+
+
 func (aMgr *AdapterManager) addAdapter(adapter *voltha.Adapter, saveToDb bool) {
 	aMgr.lockAdaptersMap.Lock()
 	defer aMgr.lockAdaptersMap.Unlock()
@@ -232,7 +257,9 @@
 	defer aMgr.lockAdaptersMap.Unlock()
 	for _, adapterAgent := range aMgr.adapterAgents {
 		if a := adapterAgent.getAdapter(); a != nil {
-			result.Items = append(result.Items, (proto.Clone(a)).(*voltha.Adapter))
+			if a.Id != SENTINEL_ADAPTER_ID { // don't report the sentinel
+				result.Items = append(result.Items, (proto.Clone(a)).(*voltha.Adapter))
+			}
 		}
 	}
 	return result, nil
@@ -270,7 +297,7 @@
 	defer aMgr.lockAdaptersMap.Unlock()
 	aMgr.lockdDeviceTypeToAdapterMap.Lock()
 	defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
-	if adapterAgent, exist := aMgr.adapterAgents[deviceType.Adapter]; !exist {
+	if adapterAgent, exist := aMgr.adapterAgents[deviceType.Adapter]; exist {
 		adapterAgent.updateDeviceType(deviceType)
 	} else {
 		aMgr.adapterAgents[deviceType.Adapter] = newAdapterAgent(&voltha.Adapter{Id: deviceType.Adapter},
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index 98cc688..5fede68 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -22,6 +22,7 @@
 	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/db/model"
+	"github.com/opencord/voltha-go/kafka"
 	ic "github.com/opencord/voltha-go/protos/inter_container"
 	"github.com/opencord/voltha-go/protos/voltha"
 	"google.golang.org/grpc/codes"
@@ -36,9 +37,14 @@
 	adapterMgr *AdapterManager
 	localDataProxy   *model.Proxy
 	clusterDataProxy *model.Proxy
+	defaultRequestTimeout int64
+	longRunningRequestTimeout int64
+	coreInCompetingMode bool
 }
 
-func NewAdapterRequestHandlerProxy(coreInstanceId string, dMgr *DeviceManager, ldMgr *LogicalDeviceManager, aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy) *AdapterRequestHandlerProxy {
+func NewAdapterRequestHandlerProxy(coreInstanceId string, dMgr *DeviceManager, ldMgr *LogicalDeviceManager,
+	aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy, incompetingMode bool, longRunningRequestTimeout int64,
+	defaultRequestTimeout int64) *AdapterRequestHandlerProxy {
 	var proxy AdapterRequestHandlerProxy
 	proxy.coreInstanceId = coreInstanceId
 	proxy.deviceMgr = dMgr
@@ -46,17 +52,43 @@
 	proxy.clusterDataProxy = cdProxy
 	proxy.localDataProxy = ldProxy
 	proxy.adapterMgr = aMgr
+	proxy.coreInCompetingMode = incompetingMode
+	proxy.defaultRequestTimeout = defaultRequestTimeout
+	proxy.longRunningRequestTimeout = longRunningRequestTimeout
 	return &proxy
 }
 
+func (rhp *AdapterRequestHandlerProxy) acquireTransaction(transactionId string, maxTimeout ...int64) (*KVTransaction, error) {
+	timeout := rhp.defaultRequestTimeout
+	if len(maxTimeout) > 0 {
+		timeout = maxTimeout[0]
+	}
+	log.Debugw("transaction-timeout", log.Fields{"timeout": timeout})
+	txn := NewKVTransaction(transactionId)
+	if txn == nil {
+		return nil,  errors.New("fail-to-create-transaction")
+	} else if txn.Acquired(timeout) {
+		return txn, nil
+	} else {
+		return nil, errors.New("failed-to-seize-request")
+	}
+}
+
+// competeForTransaction is a helper function to determine whether every request needs to compete with another
+// Core to execute the request
+func (rhp *AdapterRequestHandlerProxy) competeForTransaction() bool {
+	return rhp.coreInCompetingMode
+}
+
 func (rhp *AdapterRequestHandlerProxy) Register(args []*ic.Argument) (*voltha.CoreInstance, error) {
-	if len(args) != 2 {
+	if len(args) != 3 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
 	}
 	adapter := &voltha.Adapter{}
 	deviceTypes := &voltha.DeviceTypes{}
+	transactionID := &ic.StrType{}
 	for _, arg := range args {
 		switch arg.Key {
 		case "adapter":
@@ -69,9 +101,27 @@
 				log.Warnw("cannot-unmarshal-device-types", log.Fields{"error": err})
 				return nil, err
 			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
 		}
 	}
-	log.Debugw("Register", log.Fields{"Adapter": *adapter, "DeviceTypes": deviceTypes, "coreId": rhp.coreInstanceId})
+	log.Debugw("Register", log.Fields{"Adapter": *adapter, "DeviceTypes": deviceTypes, "transactionID": transactionID.Val, "coreId": rhp.coreInstanceId})
+
+	// Try to grab the transaction as this core may be competing with another Core
+	if rhp.competeForTransaction() {
+		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+			// Update our adapters in memory
+			go rhp.adapterMgr.updateAdaptersAndDevicetypesInMemory()
+			// returning nil, nil instructs the callee to ignore this request
+			return nil, nil
+		} else {
+			defer txn.Close()
+		}
+	}
 
 	if rhp.TestMode { // Execute only for test cases
 		return &voltha.CoreInstance{InstanceId: "CoreInstance"}, nil
@@ -80,17 +130,40 @@
 }
 
 func (rhp *AdapterRequestHandlerProxy) GetDevice(args []*ic.Argument) (*voltha.Device, error) {
-	if len(args) != 1 {
+	if len(args) != 2 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
 	}
+
 	pID := &voltha.ID{}
-	if err := ptypes.UnmarshalAny(args[0].Value, pID); err != nil {
-		log.Warnw("cannot-unmarshal-ID", log.Fields{"error": err})
-		return nil, err
+	transactionID := &ic.StrType{}
+	for _, arg := range args {
+		switch arg.Key {
+		case "device_id":
+			if err := ptypes.UnmarshalAny(arg.Value, pID); err != nil {
+				log.Warnw("cannot-unmarshal-ID", log.Fields{"error": err})
+				return nil, err
+			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
+		}
 	}
-	log.Debugw("GetDevice", log.Fields{"deviceId": pID.Id})
+	log.Debugw("GetDevice", log.Fields{"deviceId": pID.Id, "transactionID": transactionID.Val})
+
+	// Try to grab the transaction as this core may be competing with another Core
+	if rhp.competeForTransaction() {
+		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+			// returning nil, nil instructs the callee to ignore this request
+			return nil, nil
+		} else {
+			defer txn.Close()
+		}
+	}
 
 	if rhp.TestMode { // Execute only for test cases
 		return &voltha.Device{Id: pID.Id}, nil
@@ -125,17 +198,40 @@
 }
 
 func (rhp *AdapterRequestHandlerProxy) DeviceUpdate(args []*ic.Argument) (*empty.Empty, error) {
-	if len(args) != 1 {
+	if len(args) != 2 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
 	}
+
 	device := &voltha.Device{}
-	if err := ptypes.UnmarshalAny(args[0].Value, device); err != nil {
-		log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
-		return nil, err
+	transactionID := &ic.StrType{}
+	for _, arg := range args {
+		switch arg.Key {
+		case "device":
+			if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
+				log.Warnw("cannot-unmarshal-ID", log.Fields{"error": err})
+				return nil, err
+			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
+		}
 	}
-	log.Debugw("DeviceUpdate", log.Fields{"deviceId": device.Id})
+	log.Debugw("DeviceUpdate", log.Fields{"deviceId": device.Id, "transactionID": transactionID.Val})
+
+	// Try to grab the transaction as this core may be competing with another Core
+	if rhp.competeForTransaction() {
+		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+			// returning nil, nil instructs the callee to ignore this request
+			return nil, nil
+		} else {
+			defer txn.Close()
+		}
+	}
 
 	if rhp.TestMode { // Execute only for test cases
 		return new(empty.Empty), nil
@@ -154,32 +250,56 @@
 }
 
 func (rhp *AdapterRequestHandlerProxy) GetChildDevice(args []*ic.Argument) (*voltha.Device, error) {
-	if len(args) < 1 {
+	if len(args) < 2 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
 	}
+
 	pID := &voltha.ID{}
-	if err := ptypes.UnmarshalAny(args[0].Value, pID); err != nil {
-		log.Warnw("cannot-unmarshal-ID", log.Fields{"error": err})
-		return nil, err
+	transactionID := &ic.StrType{}
+	for _, arg := range args {
+		switch arg.Key {
+		case "device_id":
+			if err := ptypes.UnmarshalAny(arg.Value, pID); err != nil {
+				log.Warnw("cannot-unmarshal-ID", log.Fields{"error": err})
+				return nil, err
+			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
+		}
 	}
-	log.Debugw("GetChildDevice", log.Fields{"deviceId": pID.Id})
+	log.Debugw("GetChildDevice", log.Fields{"deviceId": pID.Id, "transactionID": transactionID.Val})
+
+	// Try to grab the transaction as this core may be competing with another Core
+	if rhp.competeForTransaction() {
+		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+			// returning nil, nil instructs the callee to ignore this request
+			return nil, nil
+		} else {
+			defer txn.Close()
+		}
+	}
 
 	if rhp.TestMode { // Execute only for test cases
 		return &voltha.Device{Id: pID.Id}, nil
 	}
-	return nil, nil
+	return rhp.deviceMgr.GetDevice(pID.Id)
 }
 
 func (rhp *AdapterRequestHandlerProxy) GetPorts(args []*ic.Argument) (*voltha.Ports, error) {
-	if len(args) != 2 {
+	if len(args) != 3 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
 	}
 	deviceId := &voltha.ID{}
 	pt := &ic.IntType{}
+	transactionID := &ic.StrType{}
 	for _, arg := range args {
 		switch arg.Key {
 		case "device_id":
@@ -192,9 +312,14 @@
 				log.Warnw("cannot-unmarshal-porttype", log.Fields{"error": err})
 				return nil, err
 			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
 		}
 	}
-	log.Debugw("GetPorts", log.Fields{"deviceID": deviceId.Id, "portype": pt.Val})
+	log.Debugw("GetPorts", log.Fields{"deviceID": deviceId.Id, "portype": pt.Val, "transactionID": transactionID.Val})
 	if rhp.TestMode { // Execute only for test cases
 		aPort := &voltha.Port{Label: "test_port"}
 		allPorts := &voltha.Ports{}
@@ -204,31 +329,54 @@
 	return rhp.deviceMgr.getPorts(nil, deviceId.Id, voltha.Port_PortType(pt.Val))
 }
 
-func (rhp *AdapterRequestHandlerProxy) GetChildDevices(args []*ic.Argument) (*voltha.Device, error) {
-	if len(args) != 1 {
+func (rhp *AdapterRequestHandlerProxy) GetChildDevices(args []*ic.Argument) (*voltha.Devices, error) {
+	if len(args) != 2 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
 	}
+
 	pID := &voltha.ID{}
-	if err := ptypes.UnmarshalAny(args[0].Value, pID); err != nil {
-		log.Warnw("cannot-unmarshal-ID", log.Fields{"error": err})
-		return nil, err
+	transactionID := &ic.StrType{}
+	for _, arg := range args {
+		switch arg.Key {
+		case "device_id":
+			if err := ptypes.UnmarshalAny(arg.Value, pID); err != nil {
+				log.Warnw("cannot-unmarshal-ID", log.Fields{"error": err})
+				return nil, err
+			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
+		}
 	}
-	log.Debugw("GetChildDevice", log.Fields{"deviceId": pID.Id})
+	log.Debugw("GetChildDevices", log.Fields{"deviceId": pID.Id, "transactionID": transactionID.Val})
+
+	// Try to grab the transaction as this core may be competing with another Core
+	if rhp.competeForTransaction() {
+		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+			// returning nil, nil instructs the callee to ignore this request
+			return nil, nil
+		} else {
+			defer txn.Close()
+		}
+	}
 
 	if rhp.TestMode { // Execute only for test cases
-		return &voltha.Device{Id: pID.Id}, nil
+		return &voltha.Devices{Items:nil}, nil
 	}
-	//TODO: Complete
-	return nil, nil
+
+	return rhp.deviceMgr.getAllChildDevices(pID.Id)
 }
 
 // ChildDeviceDetected is invoked when a child device is detected.  The following
 // parameters are expected:
 // {parent_device_id, parent_port_no, child_device_type, proxy_address, admin_state, **kw)
 func (rhp *AdapterRequestHandlerProxy) ChildDeviceDetected(args []*ic.Argument) (*empty.Empty, error) {
-	if len(args) < 4 {
+	if len(args) < 5 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
@@ -238,6 +386,7 @@
 	portNo := &ic.IntType{}
 	dt := &ic.StrType{}
 	chnlId := &ic.IntType{}
+	transactionID := &ic.StrType{}
 	for _, arg := range args {
 		switch arg.Key {
 		case "parent_device_id":
@@ -260,10 +409,26 @@
 				log.Warnw("cannot-unmarshal-channel-id", log.Fields{"error": err})
 				return nil, err
 			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
 		}
 	}
 	log.Debugw("ChildDeviceDetected", log.Fields{"parentDeviceId": pID.Id, "parentPortNo": portNo.Val,
-		"deviceType": dt.Val, "channelId": chnlId.Val})
+		"deviceType": dt.Val, "channelId": chnlId.Val, "transactionID": transactionID.Val})
+
+	// Try to grab the transaction as this core may be competing with another Core
+	if rhp.competeForTransaction() {
+		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+			// returning nil, nil instructs the callee to ignore this request
+			return nil, nil
+		} else {
+			defer txn.Close()
+		}
+	}
 
 	if rhp.TestMode { // Execute only for test cases
 		return nil, nil
@@ -275,7 +440,7 @@
 }
 
 func (rhp *AdapterRequestHandlerProxy) DeviceStateUpdate(args []*ic.Argument) (*empty.Empty, error) {
-	if len(args) < 2 {
+	if len(args) < 3 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
@@ -283,6 +448,7 @@
 	deviceId := &voltha.ID{}
 	operStatus := &ic.IntType{}
 	connStatus := &ic.IntType{}
+	transactionID := &ic.StrType{}
 	for _, arg := range args {
 		switch arg.Key {
 		case "device_id":
@@ -300,9 +466,27 @@
 				log.Warnw("cannot-unmarshal-connStatus", log.Fields{"error": err})
 				return nil, err
 			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
 		}
 	}
-	log.Debugw("DeviceStateUpdate", log.Fields{"deviceId": deviceId.Id, "oper-status": operStatus, "conn-status": connStatus})
+	log.Debugw("DeviceStateUpdate", log.Fields{"deviceId": deviceId.Id, "oper-status": operStatus,
+		"conn-status": connStatus, "transactionID": transactionID.Val})
+
+	// Try to grab the transaction as this core may be competing with another Core
+	if rhp.competeForTransaction() {
+		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+			// returning nil, nil instructs the callee to ignore this request
+			return nil, nil
+		} else {
+			defer txn.Close()
+		}
+	}
+
 	if rhp.TestMode { // Execute only for test cases
 		return nil, nil
 	}
@@ -312,7 +496,7 @@
 }
 
 func (rhp *AdapterRequestHandlerProxy) ChildrenStateUpdate(args []*ic.Argument) (*empty.Empty, error) {
-	if len(args) < 2 {
+	if len(args) < 3 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
@@ -320,6 +504,7 @@
 	deviceId := &voltha.ID{}
 	operStatus := &ic.IntType{}
 	connStatus := &ic.IntType{}
+	transactionID := &ic.StrType{}
 	for _, arg := range args {
 		switch arg.Key {
 		case "device_id":
@@ -337,9 +522,27 @@
 				log.Warnw("cannot-unmarshal-connStatus", log.Fields{"error": err})
 				return nil, err
 			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
 		}
 	}
-	log.Debugw("ChildrenStateUpdate", log.Fields{"deviceId": deviceId.Id, "oper-status": operStatus, "conn-status": connStatus})
+	log.Debugw("ChildrenStateUpdate", log.Fields{"deviceId": deviceId.Id, "oper-status": operStatus,
+		"conn-status": connStatus, "transactionID": transactionID.Val})
+
+	// Try to grab the transaction as this core may be competing with another Core
+	if rhp.competeForTransaction() {
+		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+			// returning nil, nil instructs the callee to ignore this request
+			return nil, nil
+		} else {
+			defer txn.Close()
+		}
+	}
+
 	if rhp.TestMode { // Execute only for test cases
 		return nil, nil
 	}
@@ -350,7 +553,7 @@
 }
 
 func (rhp *AdapterRequestHandlerProxy) PortStateUpdate(args []*ic.Argument) (*empty.Empty, error) {
-	if len(args) < 2 {
+	if len(args) < 3 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
@@ -359,6 +562,7 @@
 	portType := &ic.IntType{}
 	portNo := &ic.IntType{}
 	operStatus := &ic.IntType{}
+	transactionID := &ic.StrType{}
 	for _, arg := range args {
 		switch arg.Key {
 		case "device_id":
@@ -381,10 +585,27 @@
 				log.Warnw("cannot-unmarshal-portno", log.Fields{"error": err})
 				return nil, err
 			}
-
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
 		}
 	}
-	log.Debugw("PortStateUpdate", log.Fields{"deviceId": deviceId.Id, "operStatus": operStatus, "portType": portType, "portNo": portNo})
+	log.Debugw("PortStateUpdate", log.Fields{"deviceId": deviceId.Id, "operStatus": operStatus,
+		"portType": portType, "portNo": portNo, "transactionID": transactionID.Val})
+
+	// Try to grab the transaction as this core may be competing with another Core
+	if rhp.competeForTransaction() {
+		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+			// returning nil, nil instructs the callee to ignore this request
+			return nil, nil
+		} else {
+			defer txn.Close()
+		}
+	}
+
 	if rhp.TestMode { // Execute only for test cases
 		return nil, nil
 	}
@@ -393,13 +614,14 @@
 }
 
 func (rhp *AdapterRequestHandlerProxy) PortCreated(args []*ic.Argument) (*empty.Empty, error) {
-	if len(args) != 2 {
+	if len(args) != 3 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
 	}
 	deviceId := &voltha.ID{}
 	port := &voltha.Port{}
+	transactionID := &ic.StrType{}
 	for _, arg := range args {
 		switch arg.Key {
 		case "device_id":
@@ -412,9 +634,25 @@
 				log.Warnw("cannot-unmarshal-port", log.Fields{"error": err})
 				return nil, err
 			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
 		}
 	}
-	log.Debugw("PortCreated", log.Fields{"deviceId": deviceId.Id, "port": port})
+	log.Debugw("PortCreated", log.Fields{"deviceId": deviceId.Id, "port": port, "transactionID": transactionID.Val})
+
+	// Try to grab the transaction as this core may be competing with another Core
+	if rhp.competeForTransaction() {
+		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+			// returning nil, nil instructs the callee to ignore this request
+			return nil, nil
+		} else {
+			defer txn.Close()
+		}
+	}
 
 	if rhp.TestMode { // Execute only for test cases
 		return nil, nil
@@ -426,13 +664,14 @@
 }
 
 func (rhp *AdapterRequestHandlerProxy) DevicePMConfigUpdate(args []*ic.Argument) (*empty.Empty, error) {
-	if len(args) != 2 {
+	if len(args) != 3 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
 	}
 	pmConfigs := &voltha.PmConfigs{}
 	init := &ic.BoolType{}
+	transactionID := &ic.StrType{}
 	for _, arg := range args {
 		switch arg.Key {
 		case "device_pm_config":
@@ -445,10 +684,26 @@
 				log.Warnw("cannot-unmarshal-boolean", log.Fields{"error": err})
 				return nil, err
 			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
 		}
 	}
 	log.Debugw("DevicePMConfigUpdate", log.Fields{"deviceId": pmConfigs.Id, "configs": pmConfigs,
-		"init": init})
+		"init": init,  "transactionID": transactionID.Val})
+
+	// Try to grab the transaction as this core may be competing with another Core
+	if rhp.competeForTransaction() {
+		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+			// returning nil, nil instructs the callee to ignore this request
+			return nil, nil
+		} else {
+			defer txn.Close()
+		}
+	}
 
 	if rhp.TestMode { // Execute only for test cases
 		return nil, nil
@@ -461,7 +716,7 @@
 }
 
 func (rhp *AdapterRequestHandlerProxy) PacketIn(args []*ic.Argument) (*empty.Empty, error) {
-	if len(args) < 3 {
+	if len(args) < 4 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
@@ -469,6 +724,7 @@
 	deviceId := &voltha.ID{}
 	portNo := &ic.IntType{}
 	packet := &ic.Packet{}
+	transactionID := &ic.StrType{}
 	for _, arg := range args {
 		switch arg.Key {
 		case "device_id":
@@ -486,14 +742,23 @@
 				log.Warnw("cannot-unmarshal-packet", log.Fields{"error": err})
 				return nil, err
 			}
-
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
 		}
 	}
-	log.Debugw("PacketIn", log.Fields{"deviceId": deviceId.Id, "port": portNo.Val, "packet": packet})
+	log.Debugw("PacketIn", log.Fields{"deviceId": deviceId.Id, "port": portNo.Val, "packet": packet,
+		"transactionID": transactionID.Val})
+
+	// For performance reason, we do not compete for packet-in.  We process it and send the packet in.  later in the
+	// processing flow the duplicate packet will be discarded
+
 	if rhp.TestMode { // Execute only for test cases
 		return nil, nil
 	}
-	go rhp.deviceMgr.PacketIn(deviceId.Id, uint32(portNo.Val), packet.Payload)
+	go rhp.deviceMgr.PacketIn(deviceId.Id, uint32(portNo.Val), transactionID.Val, packet.Payload)
 	return new(empty.Empty), nil
 }
 
@@ -506,6 +771,7 @@
 	}
 	deviceId := &voltha.ID{}
 	img := &voltha.ImageDownload{}
+	transactionID := &ic.StrType{}
 	for _, arg := range args {
 		switch arg.Key {
 		case "device_id":
@@ -518,9 +784,27 @@
 				log.Warnw("cannot-unmarshal-imgaeDownload", log.Fields{"error": err})
 				return nil, err
 			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
 		}
 	}
-	log.Debugw("UpdateImageDownload", log.Fields{"deviceId": deviceId.Id, "image-download": img})
+	log.Debugw("UpdateImageDownload", log.Fields{"deviceId": deviceId.Id, "image-download": img,
+		"transactionID": transactionID.Val})
+
+	// Try to grab the transaction as this core may be competing with another Core
+	if rhp.competeForTransaction() {
+		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+			// returning nil, nil instructs the callee to ignore this request
+			return nil, nil
+		} else {
+			defer txn.Close()
+		}
+	}
+
 	if rhp.TestMode { // Execute only for test cases
 		return nil, nil
 	}
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index c81141b..838235d 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -154,9 +154,11 @@
 	return nil
 }
 
-func (core *Core) registerAdapterRequestHandler(ctx context.Context, coreInstanceId string, dMgr *DeviceManager, ldMgr *LogicalDeviceManager,
-	aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy) error {
-	requestProxy := NewAdapterRequestHandlerProxy(coreInstanceId, dMgr, ldMgr, aMgr, cdProxy, ldProxy)
+func (core *Core) registerAdapterRequestHandler(ctx context.Context, coreInstanceId string, dMgr *DeviceManager,
+	ldMgr *LogicalDeviceManager, aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy,
+	) error {
+	requestProxy := NewAdapterRequestHandlerProxy(coreInstanceId, dMgr, ldMgr, aMgr, cdProxy, ldProxy,
+		core.config.InCompetingMode, core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
 	core.kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: core.config.CoreTopic}, requestProxy)
 
 	log.Info("request-handlers")
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 8bf8664..116e2bb 100644
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -54,10 +54,10 @@
 	cloned := (proto.Clone(device)).(*voltha.Device)
 	if cloned.Id == "" {
 		cloned.Id = CreateDeviceId()
+		cloned.AdminState = voltha.AdminState_PREPROVISIONED
+		cloned.FlowGroups = &ofp.FlowGroups{Items: nil}
+		cloned.Flows = &ofp.Flows{Items: nil}
 	}
-	cloned.AdminState = voltha.AdminState_PREPROVISIONED
-	cloned.FlowGroups = &ofp.FlowGroups{Items: nil}
-	cloned.Flows = &ofp.Flows{Items: nil}
 	if !device.GetRoot() && device.ProxyAddress != nil {
 		// Set the default vlan ID to the one specified by the parent adapter.  It can be
 		// overwritten by the child adapter during a device update request
@@ -74,15 +74,29 @@
 	return &agent
 }
 
-// start save the device to the data model and registers for callbacks on that device
-func (agent *DeviceAgent) start(ctx context.Context) {
+// start save the device to the data model and registers for callbacks on that device if loadFromdB is false.  Otherwise,
+// it will load the data from the dB and setup teh necessary callbacks and proxies.
+func (agent *DeviceAgent) start(ctx context.Context, loadFromdB bool) error {
 	agent.lockDevice.Lock()
 	defer agent.lockDevice.Unlock()
-	log.Debugw("starting-device-agent", log.Fields{"device": agent.lastData})
-	// Add the initial device to the local model
-	if added := agent.clusterDataProxy.AddWithID("/devices", agent.deviceId, agent.lastData, ""); added == nil {
-		log.Errorw("failed-to-add-device", log.Fields{"deviceId": agent.deviceId})
+	log.Debugw("starting-device-agent", log.Fields{"deviceId": agent.deviceId})
+	if loadFromdB {
+		if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
+			if d, ok := device.(*voltha.Device); ok {
+				agent.lastData = proto.Clone(d).(*voltha.Device)
+			}
+		} else {
+			log.Errorw("failed-to-load-device", log.Fields{"deviceId": agent.deviceId})
+			return status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
+		}
+		log.Debugw("device-loaded-from-dB", log.Fields{"device": agent.lastData})
+	} else {
+		// Add the initial device to the local model
+		if added := agent.clusterDataProxy.AddWithID("/devices", agent.deviceId, agent.lastData, ""); added == nil {
+			log.Errorw("failed-to-add-device", log.Fields{"deviceId": agent.deviceId})
+		}
 	}
+
 	agent.deviceProxy = agent.clusterDataProxy.Root.CreateProxy("/devices/"+agent.deviceId, false)
 	agent.deviceProxy.RegisterCallback(model.POST_UPDATE, agent.processUpdate)
 
@@ -97,6 +111,7 @@
 	agent.groupProxy.RegisterCallback(model.POST_UPDATE, agent.groupTableUpdated)
 
 	log.Debug("device-agent-started")
+	return nil
 }
 
 // stop stops the device agent.  Not much to do for now
@@ -112,7 +127,7 @@
 func (agent *DeviceAgent) getDevice() (*voltha.Device, error) {
 	agent.lockDevice.Lock()
 	defer agent.lockDevice.Unlock()
-	if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
+	if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 0, false, ""); device != nil {
 		if d, ok := device.(*voltha.Device); ok {
 			cloned := proto.Clone(d).(*voltha.Device)
 			return cloned, nil
@@ -124,7 +139,7 @@
 // getDeviceWithoutLock is a helper function to be used ONLY by any device agent function AFTER it has acquired the device lock.
 // This function is meant so that we do not have duplicate code all over the device agent functions
 func (agent *DeviceAgent) getDeviceWithoutLock() (*voltha.Device, error) {
-	if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
+	if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 0, false, ""); device != nil {
 		if d, ok := device.(*voltha.Device); ok {
 			cloned := proto.Clone(d).(*voltha.Device)
 			return cloned, nil
@@ -808,7 +823,7 @@
 }
 
 //flowTableUpdated is the callback after flows have been updated in the model to push them
-//to the adapterAgents
+//to the adapters
 func (agent *DeviceAgent) flowTableUpdated(args ...interface{}) interface{} {
 	log.Debugw("flowTableUpdated-callback", log.Fields{"argsLen": len(args)})
 
@@ -842,7 +857,7 @@
 	}
 	groups := device.FlowGroups
 
-	// Send update to adapterAgents
+	// Send update to adapters
 	dType := agent.adapterMgr.getDeviceType(device.Type)
 	if !dType.AcceptsAddRemoveFlowUpdates {
 		if err := agent.adapterProxy.UpdateFlowsBulk(device, latestData, groups); err != nil {
@@ -882,7 +897,7 @@
 }
 
 //groupTableUpdated is the callback after group table has been updated in the model to push them
-//to the adapterAgents
+//to the adapters
 func (agent *DeviceAgent) groupTableUpdated(args ...interface{}) interface{} {
 	log.Debugw("groupTableUpdated-callback", log.Fields{"argsLen": len(args)})
 
@@ -916,11 +931,9 @@
 	}
 	flows := device.Flows
 
-	// Send update to adapterAgents
-	// TODO: Check whether the device supports incremental flow changes
-	// Assume false for test
-	acceptsAddRemoveFlowUpdates := false
-	if !acceptsAddRemoveFlowUpdates {
+	// Send update to adapters
+	dType := agent.adapterMgr.getDeviceType(device.Type)
+	if !dType.AcceptsAddRemoveFlowUpdates {
 		if err := agent.adapterProxy.UpdateFlowsBulk(device, flows, latestData); err != nil {
 			log.Debugw("update-flows-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
 			return err
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index cd662ac..88b0c7d 100644
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -34,7 +34,7 @@
 type DeviceManager struct {
 	deviceAgents        map[string]*DeviceAgent
 	adapterProxy        *AdapterProxy
-	adapterMgr *AdapterManager
+	adapterMgr          *AdapterManager
 	logicalDeviceMgr    *LogicalDeviceManager
 	kafkaICProxy        *kafka.InterContainerProxy
 	stateTransitions    *TransitionMap
@@ -52,7 +52,7 @@
 	deviceMgr.kafkaICProxy = kafkaICProxy
 	deviceMgr.coreInstanceId = coreInstanceId
 	deviceMgr.clusterDataProxy = cdProxy
-	deviceMgr.adapterMgr= adapterMgr
+	deviceMgr.adapterMgr = adapterMgr
 	deviceMgr.lockDeviceAgentsMap = sync.RWMutex{}
 	return &deviceMgr
 }
@@ -96,16 +96,27 @@
 	delete(dMgr.deviceAgents, agent.deviceId)
 }
 
+// getDeviceAgent returns the agent managing the device.  If the device is not in memory, it will loads it, if it exists
 func (dMgr *DeviceManager) getDeviceAgent(deviceId string) *DeviceAgent {
-	// TODO If the device is not in memory it needs to be loaded first
 	dMgr.lockDeviceAgentsMap.Lock()
-	defer dMgr.lockDeviceAgentsMap.Unlock()
 	if agent, ok := dMgr.deviceAgents[deviceId]; ok {
+		dMgr.lockDeviceAgentsMap.Unlock()
 		return agent
+	} else {
+		//	Try to load into memory - loading will also create the device agent
+		dMgr.lockDeviceAgentsMap.Unlock()
+		if err := dMgr.load(deviceId); err == nil {
+			dMgr.lockDeviceAgentsMap.Lock()
+			defer dMgr.lockDeviceAgentsMap.Unlock()
+			if agent, ok = dMgr.deviceAgents[deviceId]; ok {
+				return agent
+			}
+		}
 	}
 	return nil
 }
 
+// listDeviceIdsFromMap returns the list of device IDs that are in memory
 func (dMgr *DeviceManager) listDeviceIdsFromMap() *voltha.IDs {
 	dMgr.lockDeviceAgentsMap.Lock()
 	defer dMgr.lockDeviceAgentsMap.Unlock()
@@ -122,7 +133,7 @@
 	// Create and start a device agent for that device
 	agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy)
 	dMgr.addDeviceAgentToMap(agent)
-	agent.start(ctx)
+	agent.start(ctx, false)
 
 	sendResponse(ctx, ch, agent.lastData)
 }
@@ -133,8 +144,6 @@
 	if agent := dMgr.getDeviceAgent(id.Id); agent != nil {
 		res = agent.enableDevice(ctx)
 		log.Debugw("EnableDevice-result", log.Fields{"result": res})
-	} else {
-		res = status.Errorf(codes.NotFound, "%s", id.Id)
 	}
 
 	sendResponse(ctx, ch, res)
@@ -181,6 +190,7 @@
 	sendResponse(ctx, ch, res)
 }
 
+// GetDevice will returns a device, either from memory or from the dB, if present
 func (dMgr *DeviceManager) GetDevice(id string) (*voltha.Device, error) {
 	log.Debugw("GetDevice", log.Fields{"deviceid": id})
 	if agent := dMgr.getDeviceAgent(id); agent != nil {
@@ -189,6 +199,13 @@
 	return nil, status.Errorf(codes.NotFound, "%s", id)
 }
 
+func (dMgr *DeviceManager) IsDeviceInCache(id string) bool {
+	dMgr.lockDeviceAgentsMap.Lock()
+	defer dMgr.lockDeviceAgentsMap.Unlock()
+	_, exist := dMgr.deviceAgents[id]
+	return exist
+}
+
 func (dMgr *DeviceManager) IsRootDevice(id string) (bool, error) {
 	device, err := dMgr.GetDevice(id)
 	if err != nil {
@@ -203,10 +220,15 @@
 	result := &voltha.Devices{}
 	if devices := dMgr.clusterDataProxy.List("/devices", 0, false, ""); devices != nil {
 		for _, device := range devices.([]interface{}) {
-			if agent := dMgr.getDeviceAgent(device.(*voltha.Device).Id); agent == nil {
-				agent = newDeviceAgent(dMgr.adapterProxy, device.(*voltha.Device), dMgr, dMgr.clusterDataProxy)
-				dMgr.addDeviceAgentToMap(agent)
-				agent.start(nil)
+			// If device is not in memory then set it up
+			if !dMgr.IsDeviceInCache(device.(*voltha.Device).Id) {
+				agent := newDeviceAgent(dMgr.adapterProxy, device.(*voltha.Device), dMgr, dMgr.clusterDataProxy)
+				if err := agent.start(nil, true); err != nil {
+					log.Warnw("failure-starting-agent", log.Fields{"deviceId": device.(*voltha.Device).Id})
+					agent.stop(nil)
+				} else {
+					dMgr.addDeviceAgentToMap(agent)
+				}
 			}
 			result.Items = append(result.Items, device.(*voltha.Device))
 		}
@@ -214,6 +236,97 @@
 	return result, nil
 }
 
+// loadDevice loads the deviceId in memory, if not present
+func (dMgr *DeviceManager) loadDevice(deviceId string) (*DeviceAgent, error) {
+	log.Debugw("loading-device", log.Fields{"deviceId": deviceId})
+	// Sanity check
+	if deviceId == "" {
+		return nil, status.Error(codes.InvalidArgument, "deviceId empty")
+	}
+	if !dMgr.IsDeviceInCache(deviceId) {
+		agent := newDeviceAgent(dMgr.adapterProxy, &voltha.Device{Id: deviceId}, dMgr, dMgr.clusterDataProxy)
+		if err := agent.start(nil, true); err != nil {
+			agent.stop(nil)
+			return nil, err
+		}
+		dMgr.addDeviceAgentToMap(agent)
+	}
+	if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
+		return agent, nil
+	}
+	return nil, status.Error(codes.NotFound, deviceId)  // This should nto happen
+}
+
+// loadRootDeviceParentAndChildren loads the children and parents of a root device in memory
+func (dMgr *DeviceManager) loadRootDeviceParentAndChildren(device *voltha.Device) error {
+	log.Debugw("loading-parent-and-children", log.Fields{"deviceId": device.Id})
+	if device.Root {
+		// Scenario A
+		if device.ParentId != "" {
+			//	 Load logical device if needed.
+			if err := dMgr.logicalDeviceMgr.load(device.ParentId); err != nil {
+				log.Warnw("failure-loading-logical-device", log.Fields{"lDeviceId": device.ParentId})
+			}
+		} else {
+			log.Debugw("no-parent-to-load", log.Fields{"deviceId": device.Id})
+		}
+		//	Load all child devices, if needed
+		if childDeviceIds, err := dMgr.getAllChildDeviceIds(device); err == nil {
+			for _, childDeviceId := range childDeviceIds {
+				if _, err := dMgr.loadDevice(childDeviceId); err != nil {
+					log.Warnw("failure-loading-device", log.Fields{"deviceId": childDeviceId})
+					return err
+				}
+			}
+			log.Debugw("loaded-children", log.Fields{"deviceId": device.Id, "numChildren": len(childDeviceIds)})
+		} else {
+			log.Debugw("no-child-to-load", log.Fields{"deviceId": device.Id})
+		}
+	}
+	return nil
+}
+
+// load loads the deviceId in memory, if not present, and also loads its accompanying parents and children.  Loading
+// in memory is for improved performance.  It is not imperative that a device needs to be in memory when a request
+// acting on the device is received by the core. In such a scenario, the Core will load the device in memory first
+// and the proceed with the request.
+func (dMgr *DeviceManager) load(deviceId string) error {
+	log.Debug("load...")
+	// First load the device - this may fail in case the device was deleted intentionally by the other core
+	var dAgent *DeviceAgent
+	var err error
+	if dAgent, err = dMgr.loadDevice(deviceId); err != nil {
+		log.Warnw("failure-loading-device", log.Fields{"deviceId": deviceId})
+		return err
+	}
+	// Get the loaded device details
+	var device *voltha.Device
+	if device, err = dAgent.getDevice(); err != nil {
+		return err
+	}
+
+	// If the device is in Pre-provisioning or deleted state stop here
+	if device.AdminState == voltha.AdminState_PREPROVISIONED || device.AdminState == voltha.AdminState_DELETED {
+		return nil
+	}
+
+	// Now we face two scenarios
+	if device.Root {
+		// Load all children as well as the parent of this device (logical_device)
+		if err := dMgr.loadRootDeviceParentAndChildren(device); err != nil {
+			log.Warnw("failure-loading-device-parent-and-children", log.Fields{"deviceId": deviceId})
+			return err
+		}
+		log.Debugw("successfully-loaded-parent-and-children", log.Fields{"deviceId": deviceId})
+	} else {
+		//	Scenario B - use the parentId of that device (root device) to trigger the loading
+		if device.ParentId != "" {
+			return dMgr.load(device.ParentId)
+		}
+	}
+	return nil
+}
+
 // ListDeviceIds retrieves the latest device IDs information from the data model (memory data only)
 func (dMgr *DeviceManager) ListDeviceIds() (*voltha.IDs, error) {
 	log.Debug("ListDeviceIDs")
@@ -230,17 +343,17 @@
 		reconciled := 0
 		for _, id := range ids.Items {
 			//	 Act on the device only if its not present in the agent map
-			if agent := dMgr.getDeviceAgent(id.Id); agent == nil {
+			if !dMgr.IsDeviceInCache(id.Id) {
 				//	Device Id not in memory
 				log.Debugw("reconciling-device", log.Fields{"id": id.Id})
-				// Load device from model
-				if device := dMgr.clusterDataProxy.Get("/devices/"+id.Id, 0, false, ""); device != nil {
-					agent = newDeviceAgent(dMgr.adapterProxy, device.(*voltha.Device), dMgr, dMgr.clusterDataProxy)
-					dMgr.addDeviceAgentToMap(agent)
-					agent.start(nil)
-					reconciled += 1
+				// Load device from dB
+				agent := newDeviceAgent(dMgr.adapterProxy, &voltha.Device{Id:id.Id}, dMgr, dMgr.clusterDataProxy)
+				if err := agent.start(nil, true); err != nil {
+					log.Warnw("failure-loading-device", log.Fields{"deviceId": id.Id})
+					agent.stop(nil)
 				} else {
-					log.Warnw("device-inexistent", log.Fields{"id": id.Id})
+					dMgr.addDeviceAgentToMap(agent)
+					reconciled += 1
 				}
 			} else {
 				reconciled += 1
@@ -393,7 +506,7 @@
 	// Create and start a device agent for that device
 	agent := newDeviceAgent(dMgr.adapterProxy, childDevice, dMgr, dMgr.clusterDataProxy)
 	dMgr.addDeviceAgentToMap(agent)
-	agent.start(nil)
+	agent.start(nil, false)
 
 	// Activate the child device
 	if agent := dMgr.getDeviceAgent(agent.deviceId); agent != nil {
@@ -430,7 +543,7 @@
 	return status.Errorf(codes.NotFound, "%s", deviceId)
 }
 
-func (dMgr *DeviceManager) PacketIn(deviceId string, port uint32, packet []byte) error {
+func (dMgr *DeviceManager) PacketIn(deviceId string, port uint32, transactionId string, packet []byte) error {
 	log.Debugw("PacketIn", log.Fields{"deviceId": deviceId, "port": port})
 	// Get the logical device Id based on the deviceId
 	var device *voltha.Device
@@ -444,7 +557,7 @@
 		return status.Errorf(codes.FailedPrecondition, "%s", deviceId)
 	}
 
-	if err := dMgr.logicalDeviceMgr.packetIn(device.ParentId, port, packet); err != nil {
+	if err := dMgr.logicalDeviceMgr.packetIn(device.ParentId, port, transactionId, packet); err != nil {
 		return err
 	}
 	return nil
@@ -576,6 +689,23 @@
 	return childDeviceIds, nil
 }
 
+//getAllChildDevices is a helper method to get all the child device IDs from the device passed as parameter
+func (dMgr *DeviceManager) getAllChildDevices(parentDeviceId string) (*voltha.Devices, error) {
+	log.Debugw("getAllChildDevices", log.Fields{"parentDeviceId": parentDeviceId})
+	if parentDevice, err := dMgr.GetDevice(parentDeviceId); err == nil {
+		childDevices := make([]*voltha.Device, 0)
+		if childDeviceIds, er := dMgr.getAllChildDeviceIds(parentDevice); er == nil {
+			for _, deviceId := range childDeviceIds {
+				if d, e := dMgr.GetDevice(deviceId); e == nil && d != nil {
+					childDevices = append(childDevices, d)
+				}
+			}
+		}
+		return &voltha.Devices{Items: childDevices}, nil
+	}
+	return nil, status.Errorf(codes.NotFound, "%s", parentDeviceId)
+}
+
 func (dMgr *DeviceManager) addUNILogicalPort(cDevice *voltha.Device) error {
 	log.Info("addUNILogicalPort")
 	if err := dMgr.logicalDeviceMgr.addUNILogicalPort(nil, cDevice); err != nil {
@@ -585,7 +715,6 @@
 	return nil
 }
 
-
 func (dMgr *DeviceManager) downloadImage(ctx context.Context, img *voltha.ImageDownload, ch chan interface{}) {
 	log.Debugw("downloadImage", log.Fields{"deviceid": img.Id, "imageName": img.Name})
 	var res interface{}
@@ -661,7 +790,6 @@
 	sendResponse(ctx, ch, res)
 }
 
-
 func (dMgr *DeviceManager) updateImageDownload(deviceId string, img *voltha.ImageDownload) error {
 	log.Debugw("updateImageDownload", log.Fields{"deviceid": img.Id, "imageName": img.Name})
 	if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
@@ -691,7 +819,6 @@
 	return nil, status.Errorf(codes.NotFound, "%s", deviceId)
 }
 
-
 func (dMgr *DeviceManager) activateDevice(cDevice *voltha.Device) error {
 	log.Info("activateDevice")
 	return nil
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index 877fcb4..f72d615 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -35,8 +35,6 @@
 //TODO:  Move this Tag into the proto file
 const OF_CONTROLLER_TAG= "voltha_backend_name"
 
-//const MAX_RESPONSE_TIME = int64(500) // milliseconds
-
 const (
 	IMAGE_DOWNLOAD = iota
 	CANCEL_IMAGE_DOWNLOAD     = iota
@@ -44,6 +42,15 @@
 	REVERT_IMAGE = iota
 )
 
+
+type deviceID struct {
+	id string
+}
+
+type logicalDeviceID struct {
+	id string
+}
+
 type APIHandler struct {
 	deviceMgr        *DeviceManager
 	logicalDeviceMgr *LogicalDeviceManager
@@ -118,7 +125,7 @@
 	return handler.coreInCompetingMode
 }
 
-func (handler *APIHandler) acquireTransaction(ctx context.Context, maxTimeout ...int64) (*KVTransaction, error) {
+func (handler *APIHandler) acquireTransaction(ctx context.Context, id interface{}, maxTimeout ...int64) (*KVTransaction, error) {
 	timeout := handler.defaultRequestTimeout
 	if len(maxTimeout) > 0 {
 		timeout = maxTimeout[0]
@@ -130,6 +137,20 @@
 	} else if txn.Acquired(timeout) {
 		return txn, nil
 	} else {
+		if id != nil {
+			// The id can either be a device Id or a logical device id.
+			if dId, ok := id.(*deviceID); ok {
+				// Since this core has not processed this request, let's load the device, along with its extended
+				// family (parents and children) in memory.   This will keep this core in-sync with its paired core as
+				// much as possible. The watch feature in the core model will ensure that the contents of those objects in
+				// memory are in sync.
+				time.Sleep(2 * time.Second)
+				go handler.deviceMgr.load(dId.id)
+			} else if ldId, ok := id.(*logicalDeviceID); ok {
+				// This will load the logical device along with its children and grandchildren
+				go handler.logicalDeviceMgr.load(ldId.id)
+			}
+		}
 		return nil, errors.New("failed-to-seize-request")
 	}
 }
@@ -173,7 +194,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireTransaction(ctx); err != nil {
+		if txn, err := handler.acquireTransaction(ctx, &logicalDeviceID{id:id.Id}); err != nil {
 			return new(empty.Empty), err
 		} else {
 			defer txn.Close()
@@ -194,7 +215,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireTransaction(ctx); err != nil {
+		if txn, err := handler.acquireTransaction(ctx, &logicalDeviceID{id:id.Id}); err != nil {
 			return new(empty.Empty), err
 		} else {
 			defer txn.Close()
@@ -216,7 +237,7 @@
 
 	if handler.competeForTransaction() {
 		if !isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
-			if txn, err := handler.acquireTransaction(ctx); err != nil {
+			if txn, err := handler.acquireTransaction(ctx, &logicalDeviceID{id:flow.Id}); err != nil {
 				return new(empty.Empty), err
 			} else {
 				defer txn.Close()
@@ -239,7 +260,7 @@
 
 	if handler.competeForTransaction() {
 		if !isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
-			if txn, err := handler.acquireTransaction(ctx); err != nil {
+			if txn, err := handler.acquireTransaction(ctx, &logicalDeviceID{id:flow.Id}); err != nil {
 				return new(empty.Empty), err
 			} else {
 				defer txn.Close()
@@ -324,7 +345,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireTransaction(ctx); err != nil {
+		if txn, err := handler.acquireTransaction(ctx, nil); err != nil {
 			return &voltha.Device{}, err
 		} else {
 			defer txn.Close()
@@ -361,7 +382,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireTransaction(ctx, handler.longRunningRequestTimeout); err != nil {
+		if txn, err := handler.acquireTransaction(ctx, &deviceID{id:id.Id}, handler.longRunningRequestTimeout); err != nil {
 			return new(empty.Empty), err
 		} else {
 			defer txn.Close()
@@ -382,7 +403,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireTransaction(ctx); err != nil {
+		if txn, err := handler.acquireTransaction(ctx, &deviceID{id:id.Id}); err != nil {
 			return new(empty.Empty), err
 		} else {
 			defer txn.Close()
@@ -403,7 +424,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireTransaction(ctx); err != nil {
+		if txn, err := handler.acquireTransaction(ctx, &deviceID{id:id.Id}); err != nil {
 			return new(empty.Empty), err
 		} else {
 			defer txn.Close()
@@ -424,7 +445,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireTransaction(ctx); err != nil {
+		if txn, err := handler.acquireTransaction(ctx, nil); err != nil {
 			return new(empty.Empty), err
 		} else {
 			defer txn.Close()
@@ -446,7 +467,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireTransaction(ctx); err != nil {
+		if txn, err := handler.acquireTransaction(ctx, &deviceID{id:img.Id}); err != nil {
 			return &common.OperationResp{}, err
 		} else {
 			defer txn.Close()
@@ -537,7 +558,7 @@
 	failedresponse := &voltha.ImageDownload{DownloadState: voltha.ImageDownload_DOWNLOAD_UNKNOWN}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireTransaction(ctx); err != nil {
+		if txn, err := handler.acquireTransaction(ctx, &deviceID{id:img.Id}); err != nil {
 			return failedresponse, err
 		} else {
 			defer txn.Close()
@@ -670,7 +691,8 @@
 	return nil
 }
 
-func (handler *APIHandler) sendPacketIn(deviceId string, packet *openflow_13.OfpPacketIn) {
+func (handler *APIHandler) sendPacketIn(deviceId string, transationId string, packet *openflow_13.OfpPacketIn) {
+	// TODO: Augment the OF PacketIn to include the transactionId
 	packetIn := openflow_13.PacketIn{Id: deviceId, PacketIn: packet}
 	log.Debugw("sendPacketIn", log.Fields{"packetIn": packetIn})
 	// Enqueue the packet
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 7875da7..bda249f 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -66,63 +66,75 @@
 }
 
 // start creates the logical device and add it to the data model
-func (agent *LogicalDeviceAgent) start(ctx context.Context) error {
-	log.Infow("starting-logical_device-agent", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
-	//Build the logical device based on information retrieved from the device adapter
-	var switchCap *ic.SwitchCapability
-	var err error
-	if switchCap, err = agent.deviceMgr.getSwitchCapability(ctx, agent.rootDeviceId); err != nil {
-		log.Errorw("error-creating-logical-device", log.Fields{"error": err})
-		return err
-	}
-
-	ld := &voltha.LogicalDevice{Id: agent.logicalDeviceId, RootDeviceId: agent.rootDeviceId}
-
-	// Create the datapath ID (uint64) using the logical device ID (based on the MAC Address)
-	var datapathID uint64
-	if datapathID, err = CreateDataPathId(agent.logicalDeviceId); err != nil {
-		log.Errorw("error-creating-datapath-id", log.Fields{"error": err})
-		return err
-	}
-	ld.DatapathId = datapathID
-	ld.Desc = (proto.Clone(switchCap.Desc)).(*ofp.OfpDesc)
-	ld.SwitchFeatures = (proto.Clone(switchCap.SwitchFeatures)).(*ofp.OfpSwitchFeatures)
-	ld.Flows = &ofp.Flows{Items: nil}
-	ld.FlowGroups = &ofp.FlowGroups{Items: nil}
-
-	//Add logical ports to the logical device based on the number of NNI ports discovered
-	//First get the default port capability - TODO:  each NNI port may have different capabilities,
-	//hence. may need to extract the port by the NNI port id defined by the adapter during device
-	//creation
-	var nniPorts *voltha.Ports
-	if nniPorts, err = agent.deviceMgr.getPorts(ctx, agent.rootDeviceId, voltha.Port_ETHERNET_NNI); err != nil {
-		log.Errorw("error-creating-logical-port", log.Fields{"error": err})
-	}
-	var portCap *ic.PortCapability
-	for _, port := range nniPorts.Items {
-		log.Infow("!!!!!!!NNI PORTS", log.Fields{"NNI": port})
-		if portCap, err = agent.deviceMgr.getPortCapability(ctx, agent.rootDeviceId, port.PortNo); err != nil {
+func (agent *LogicalDeviceAgent) start(ctx context.Context, loadFromdB bool) error {
+	log.Infow("starting-logical_device-agent", log.Fields{"logicaldeviceId": agent.logicalDeviceId, "loadFromdB": loadFromdB})
+	var ld *voltha.LogicalDevice
+	if !loadFromdB {
+		//Build the logical device based on information retrieved from the device adapter
+		var switchCap *ic.SwitchCapability
+		var err error
+		if switchCap, err = agent.deviceMgr.getSwitchCapability(ctx, agent.rootDeviceId); err != nil {
 			log.Errorw("error-creating-logical-device", log.Fields{"error": err})
 			return err
 		}
-		portCap.Port.RootPort = true
-		lp := (proto.Clone(portCap.Port)).(*voltha.LogicalPort)
-		lp.DeviceId = agent.rootDeviceId
-		lp.Id = fmt.Sprintf("nni-%d", port.PortNo)
-		lp.OfpPort.PortNo = port.PortNo
-		lp.OfpPort.Name = lp.Id
-		lp.DevicePortNo = port.PortNo
-		ld.Ports = append(ld.Ports, lp)
+
+		ld = &voltha.LogicalDevice{Id: agent.logicalDeviceId, RootDeviceId: agent.rootDeviceId}
+
+		// Create the datapath ID (uint64) using the logical device ID (based on the MAC Address)
+		var datapathID uint64
+		if datapathID, err = CreateDataPathId(agent.logicalDeviceId); err != nil {
+			log.Errorw("error-creating-datapath-id", log.Fields{"error": err})
+			return err
+		}
+		ld.DatapathId = datapathID
+		ld.Desc = (proto.Clone(switchCap.Desc)).(*ofp.OfpDesc)
+		ld.SwitchFeatures = (proto.Clone(switchCap.SwitchFeatures)).(*ofp.OfpSwitchFeatures)
+		ld.Flows = &ofp.Flows{Items: nil}
+		ld.FlowGroups = &ofp.FlowGroups{Items: nil}
+
+		//Add logical ports to the logical device based on the number of NNI ports discovered
+		//First get the default port capability - TODO:  each NNI port may have different capabilities,
+		//hence. may need to extract the port by the NNI port id defined by the adapter during device
+		//creation
+		var nniPorts *voltha.Ports
+		if nniPorts, err = agent.deviceMgr.getPorts(ctx, agent.rootDeviceId, voltha.Port_ETHERNET_NNI); err != nil {
+			log.Errorw("error-creating-logical-port", log.Fields{"error": err})
+		}
+		var portCap *ic.PortCapability
+		for _, port := range nniPorts.Items {
+			log.Infow("!!!!!!!NNI PORTS", log.Fields{"NNI": port})
+			if portCap, err = agent.deviceMgr.getPortCapability(ctx, agent.rootDeviceId, port.PortNo); err != nil {
+				log.Errorw("error-creating-logical-device", log.Fields{"error": err})
+				return err
+			}
+			portCap.Port.RootPort = true
+			lp := (proto.Clone(portCap.Port)).(*voltha.LogicalPort)
+			lp.DeviceId = agent.rootDeviceId
+			lp.Id = fmt.Sprintf("nni-%d", port.PortNo)
+			lp.OfpPort.PortNo = port.PortNo
+			lp.OfpPort.Name = lp.Id
+			lp.DevicePortNo = port.PortNo
+			ld.Ports = append(ld.Ports, lp)
+		}
+		agent.lockLogicalDevice.Lock()
+		//defer agent.lockLogicalDevice.Unlock()
+		// Save the logical device
+		if added := agent.clusterDataProxy.AddWithID("/logical_devices", ld.Id, ld, ""); added == nil {
+			log.Errorw("failed-to-add-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
+		} else {
+			log.Debugw("logicaldevice-created", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
+		}
+		agent.lockLogicalDevice.Unlock()
+	} else {
+		//	load from dB - the logical may not exist at this time.  On error, just return and the calling function
+		// will destroy this agent.
+		var err error
+		if ld, err = agent.GetLogicalDevice(); err != nil {
+			log.Warnw("failed-to-load-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
+			return err
+		}
 	}
 	agent.lockLogicalDevice.Lock()
-	//defer agent.lockLogicalDevice.Unlock()
-	// Save the logical device
-	if added := agent.clusterDataProxy.AddWithID("/logical_devices", ld.Id, ld, ""); added == nil {
-		log.Errorw("failed-to-add-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
-	} else {
-		log.Debugw("logicaldevice-created", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
-	}
-
 	agent.flowProxy = agent.clusterDataProxy.Root.CreateProxy(
 		fmt.Sprintf("/logical_devices/%s/flows", agent.logicalDeviceId),
 		false)
@@ -1033,9 +1045,9 @@
 	}
 }
 
-func (agent *LogicalDeviceAgent) packetIn(port uint32, packet []byte) {
-	log.Debugw("packet-in", log.Fields{"port": port, "packet": packet})
+func (agent *LogicalDeviceAgent) packetIn(port uint32, transactionId string, packet []byte) {
+	log.Debugw("packet-in", log.Fields{"port": port, "packet": packet, "transactionId": transactionId})
 	packetIn := fd.MkPacketIn(port, packet)
-	agent.ldeviceMgr.grpcNbiHdlr.sendPacketIn(agent.logicalDeviceId, packetIn)
+	agent.ldeviceMgr.grpcNbiHdlr.sendPacketIn(agent.logicalDeviceId, transactionId, packetIn)
 	log.Debugw("sending-packet-in", log.Fields{"packet-in": packetIn})
 }
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index 18bc30a..b4dc7ea 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -113,7 +113,7 @@
 func (ldMgr *LogicalDeviceManager) listLogicalDevices() (*voltha.LogicalDevices, error) {
 	log.Debug("ListAllLogicalDevices")
 	result := &voltha.LogicalDevices{}
-	if logicalDevices := ldMgr.clusterDataProxy.Get("/logical_devices", 0, false, ""); logicalDevices != nil {
+	if logicalDevices := ldMgr.clusterDataProxy.List("/logical_devices", 0, false, ""); logicalDevices != nil {
 		for _, logicalDevice := range logicalDevices.([]interface{}) {
 			if agent := ldMgr.getLogicalDeviceAgent(logicalDevice.(*voltha.LogicalDevice).Id); agent == nil {
 				agent = newLogicalDeviceAgent(
@@ -124,7 +124,7 @@
 					ldMgr.clusterDataProxy,
 				)
 				ldMgr.addLogicalDeviceAgentToMap(agent)
-				go agent.start(nil)
+				go agent.start(nil, true)
 			}
 			result.Items = append(result.Items, logicalDevice.(*voltha.LogicalDevice))
 		}
@@ -166,12 +166,32 @@
 
 	agent := newLogicalDeviceAgent(id, device.Id, ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy)
 	ldMgr.addLogicalDeviceAgentToMap(agent)
-	go agent.start(ctx)
+	go agent.start(ctx, false)
 
 	log.Debug("creating-logical-device-ends")
 	return &id, nil
 }
 
+// load loads a logical device manager in memory
+func (ldMgr *LogicalDeviceManager) load(lDeviceId string) error {
+	log.Debugw("loading-logical-device", log.Fields{"lDeviceId": lDeviceId})
+	// To prevent a race condition, let's hold the logical device agent map lock.  This will prevent a loading and
+	// a create logical device callback from occurring at the same time.
+	ldMgr.lockLogicalDeviceAgentsMap.Lock()
+	defer ldMgr.lockLogicalDeviceAgentsMap.Unlock()
+	if ldAgent, _ := ldMgr.logicalDeviceAgents[lDeviceId]; ldAgent == nil {
+		// Logical device not in memory - create a temp logical device Agent and let it load from memory
+		agent := newLogicalDeviceAgent(lDeviceId, "", ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy)
+		if err := agent.start(nil, true); err != nil {
+			agent.stop(nil)
+			return err
+		}
+		ldMgr.logicalDeviceAgents[agent.logicalDeviceId] = agent
+	}
+	// TODO: load the child device
+	return nil
+}
+
 func (ldMgr *LogicalDeviceManager) deleteLogicalDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("deleting-logical-device", log.Fields{"deviceId": device.Id})
 	// Sanity check
@@ -365,10 +385,10 @@
 	}
 }
 
-func (ldMgr *LogicalDeviceManager) packetIn(logicalDeviceId string, port uint32, packet []byte) error {
+func (ldMgr *LogicalDeviceManager) packetIn(logicalDeviceId string, port uint32, transactionId string, packet []byte) error {
 	log.Debugw("packetIn", log.Fields{"logicalDeviceId": logicalDeviceId, "port": port})
 	if agent := ldMgr.getLogicalDeviceAgent(logicalDeviceId); agent != nil {
-		agent.packetIn(port, packet)
+		agent.packetIn(port, transactionId, packet)
 	} else {
 		log.Error("logical-device-not-exist", log.Fields{"logicalDeviceId": logicalDeviceId})
 	}
diff --git a/rw_core/flow_decomposition/flow_decomposer.go b/rw_core/flow_decomposition/flow_decomposer.go
index a6b90aa..f702633 100644
--- a/rw_core/flow_decomposition/flow_decomposer.go
+++ b/rw_core/flow_decomposition/flow_decomposer.go
@@ -497,7 +497,7 @@
 		return 0
 	}
 	if md <= 0xffffffff {
-		log.Warnw("onos-upgrade-suggested", log.Fields{"Metadata_ofp": md, "message": "Legacy MetaData detected form OltPipeline"})
+		log.Debugw("onos-upgrade-suggested", log.Fields{"Metadata_ofp": md, "message": "Legacy MetaData detected form OltPipeline"})
 		return md
 	}
 	return md & 0xffffffff
@@ -512,7 +512,7 @@
 		return 0
 	}
 	if md <= 0xffffffff {
-		log.Warnw("onos-upgrade-suggested", log.Fields{"Metadata_ofp": md, "message": "Legacy MetaData detected form OltPipeline"})
+		log.Debugw("onos-upgrade-suggested", log.Fields{"Metadata_ofp": md, "message": "Legacy MetaData detected form OltPipeline"})
 		return md
 	}
 	return (md >> 32) & 0xffffffff
@@ -1256,20 +1256,17 @@
 		return deviceRules
 	}
 
-	var ingressDevice *voltha.Device
-	var err error
-	if ingressDevice, err = fd.deviceMgr.GetDevice(route[0].DeviceID); err != nil {
-		log.Errorw("ingress-device-not-found", log.Fields{"deviceId": route[0].DeviceID})
-		return deviceRules
-	}
-
-	isDownstream := ingressDevice.Root
-	isUpstream := !isDownstream
-
 	// Process controller bound flow
 	if outPortNo != 0 && (outPortNo&0x7fffffff) == uint32(ofp.OfpPortNo_OFPP_CONTROLLER) {
 		deviceRules = fd.processControllerBoundFlow(agent, route, inPortNo, outPortNo, flow)
 	} else {
+		var ingressDevice *voltha.Device
+		var err error
+		if ingressDevice, err = fd.deviceMgr.GetDevice(route[0].DeviceID); err != nil {
+			log.Errorw("ingress-device-not-found", log.Fields{"deviceId": route[0].DeviceID, "flow": flow})
+			return deviceRules
+		}
+		isUpstream := !ingressDevice.Root
 		if isUpstream {
 			deviceRules = fd.processUpstreamNonControllerBoundFlow(agent, route, inPortNo, outPortNo, flow)
 		} else if HasNextTable(flow) {
diff --git a/tests/core/grpc_nbi_api_handler_client_test.go b/tests/core/api/grpc_nbi_api_handler_client_test.go
similarity index 99%
rename from tests/core/grpc_nbi_api_handler_client_test.go
rename to tests/core/api/grpc_nbi_api_handler_client_test.go
index 420bf39..ef0defa 100644
--- a/tests/core/grpc_nbi_api_handler_client_test.go
+++ b/tests/core/api/grpc_nbi_api_handler_client_test.go
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package core
+package api
 
 import (
 	"context"
diff --git a/tests/core/concurrency/core_concurrency_test.go b/tests/core/concurrency/core_concurrency_test.go
new file mode 100644
index 0000000..d781ffc
--- /dev/null
+++ b/tests/core/concurrency/core_concurrency_test.go
@@ -0,0 +1,379 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package concurrency
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"github.com/golang/protobuf/ptypes/empty"
+	"github.com/google/uuid"
+	com "github.com/opencord/voltha-go/adapters/common"
+	"github.com/opencord/voltha-go/common/log"
+	"github.com/opencord/voltha-go/protos/common"
+	"github.com/opencord/voltha-go/protos/voltha"
+	"github.com/stretchr/testify/assert"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/metadata"
+	"os"
+	"os/exec"
+	"strings"
+	"testing"
+)
+
+var conns []*grpc.ClientConn
+var stubs []voltha.VolthaServiceClient
+var volthaSerialNumberKey string
+var grpcPorts []int
+
+/*
+ This series of tests are executed with two RW_Cores
+*/
+
+var devices map[string]*voltha.Device
+
+func setup() {
+	var err error
+
+	if _, err = log.AddPackage(log.JSON, log.WarnLevel, log.Fields{"instanceId": "testing"}); err != nil {
+		log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
+	}
+	log.UpdateAllLoggers(log.Fields{"instanceId": "testing"})
+	log.SetAllLogLevel(log.ErrorLevel)
+
+	grpcPorts = []int{50057, 50058}
+	stubs = make([]voltha.VolthaServiceClient, 0)
+	conns = make([]*grpc.ClientConn, 0)
+
+	volthaSerialNumberKey = "voltha_serial_number"
+	devices = make(map[string]*voltha.Device)
+}
+
+func connectToCore(port int) (voltha.VolthaServiceClient, error)  {
+	grpcHostIP := os.Getenv("DOCKER_HOST_IP")
+	grpcHost := fmt.Sprintf("%s:%d", grpcHostIP, port)
+	conn, err := grpc.Dial(grpcHost, grpc.WithInsecure())
+	if err != nil {
+		log.Fatalf("did not connect: %s", err)
+		return nil, errors.New("failure-to-connect")
+	}
+	conns = append(conns, conn)
+	return voltha.NewVolthaServiceClient(conn), nil
+}
+
+func setupGrpcConnection() []voltha.VolthaServiceClient {
+	// We have 2 concurrent cores.  Connect to them
+	for _, port := range grpcPorts {
+		if client, err := connectToCore(port); err == nil {
+			stubs = append(stubs, client)
+			log.Infow("connected", log.Fields{"port": port})
+		}
+	}
+	return stubs
+}
+
+func clearAllDevices(clearMap bool) {
+	for key, _ := range devices {
+		ctx := context.Background()
+		response, err := stubs[1].DeleteDevice(ctx, &voltha.ID{Id: key})
+		log.Infow("response", log.Fields{"res": response, "error": err})
+		if clearMap {
+			delete(devices, key)
+		}
+	}
+}
+
+// Verify if all ids are present in the global list of devices
+func hasAllIds(ids *voltha.IDs) bool {
+	if ids == nil && len(devices) == 0 {
+		return true
+	}
+	if ids == nil {
+		return false
+	}
+	for _, id := range ids.Items {
+		if _, exist := devices[id.Id]; !exist {
+			return false
+		}
+	}
+	return true
+}
+
+func startKafka() {
+	fmt.Println("Starting Kafka and Etcd ...")
+	command := "docker-compose"
+	cmd := exec.Command(command, "-f", "../../../compose/docker-compose-zk-kafka-test.yml", "up", "-d")
+	if err := cmd.Run(); err != nil {
+		log.Fatal(err)
+	}
+}
+
+func startEtcd() {
+	fmt.Println("Starting Etcd ...")
+	command := "docker-compose"
+	cmd := exec.Command(command, "-f", "../../../compose/docker-compose-etcd.yml", "up", "-d")
+	if err := cmd.Run(); err != nil {
+		log.Fatal(err)
+	}
+}
+
+func stopKafka() {
+	fmt.Println("Stopping Kafka and Etcd ...")
+	command := "docker-compose"
+	cmd := exec.Command(command, "-f", "../../../compose/docker-compose-zk-kafka-test.yml", "down")
+	if err := cmd.Run(); err != nil {
+		// ignore error - as this is mostly due network being left behind as its being used by other
+		// containers
+		log.Warn(err)
+	}
+}
+
+func stopEtcd() {
+	fmt.Println("Stopping Etcd ...")
+	command := "docker-compose"
+	cmd := exec.Command(command, "-f", "../../../compose/docker-compose-etcd.yml", "down")
+	if err := cmd.Run(); err != nil {
+		// ignore error - as this is mostly due network being left behind as its being used by other
+		// containers
+		log.Warn(err)
+	}
+}
+
+func startCores() {
+	fmt.Println("Starting voltha cores ...")
+	command := "docker-compose"
+	cmd := exec.Command(command, "-f", "../../../compose/rw_core_concurrency_test.yml", "up", "-d")
+	if err := cmd.Run(); err != nil {
+		log.Fatal(err)
+	}
+}
+
+func stopCores() {
+	fmt.Println("Stopping voltha cores ...")
+	command := "docker-compose"
+	cmd := exec.Command(command, "-f", "../../../compose/rw_core_concurrency_test.yml", "down")
+	if err := cmd.Run(); err != nil {
+		// ignore error - as this is mostly due network being left behind as its being used by other
+		// containers
+		log.Warn(err)
+	}
+}
+
+func startSimulatedOLTAndONUAdapters() {
+	fmt.Println("Starting simulated OLT and ONU adapters ...")
+	command := "docker-compose"
+	cmd := exec.Command(command, "-f", "../../../compose/adapters-simulated.yml", "up", "-d")
+	if err := cmd.Run(); err != nil {
+		log.Fatal(err)
+	}
+}
+
+func stopSimulatedOLTAndONUAdapters() {
+	fmt.Println("Stopping simulated OLT and ONU adapters ...")
+	command := "docker-compose"
+	cmd := exec.Command(command, "-f", "../../../compose/adapters-simulated.yml", "down")
+	if err := cmd.Run(); err != nil {
+		// ignore error - as this is mostly due network being left behind as its being used by other
+		// containers
+		log.Warn(err)
+	}
+}
+
+
+func sendCreateDeviceRequest(ctx context.Context, stub voltha.VolthaServiceClient, device *voltha.Device, ch chan interface{} ) {
+	fmt.Println("Sending  create device ...")
+	if response, err := stub.CreateDevice(ctx, device); err != nil {
+		ch <- err
+	} else {
+		ch <- response
+	}
+}
+
+func sendEnableDeviceRequest(ctx context.Context, stub voltha.VolthaServiceClient, deviceId string, ch chan interface{} ) {
+	fmt.Println("Sending enable device ...")
+	if response, err := stub.EnableDevice(ctx, &common.ID{Id:deviceId}); err != nil {
+		ch <- err
+	} else {
+		ch <- response
+	}
+}
+
+//// createPonsimDevice sends two requests to each core and waits for both responses
+//func createPonsimDevice(stubs []voltha.VolthaServiceClient) (*voltha.Device, error) {
+//	ui := uuid.New()
+//	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, ui.String()))
+//	//preprovision_olt -t ponsim_olt -H 172.20.0.11:50060
+//	device := &voltha.Device{Type: "ponsim_olt"}
+//	device.Address = &voltha.Device_HostAndPort{HostAndPort:"172.20.0.11:50060"}
+//	ch := make(chan interface{})
+//	defer close(ch)
+//	requestNum := 0
+//	for _, stub := range stubs {
+//		go sendCreateDeviceRequest(ctx, stub, device, ch)
+//		requestNum += 1
+//	}
+//	fmt.Println("Waiting for create device response ...")
+//	receivedResponse := 0
+//	var err error
+//	var returnedDevice *voltha.Device
+//	select {
+//	case res, ok := <-ch:
+//		receivedResponse += 1
+//		if !ok {
+//		} else if er, ok := res.(error); ok {
+//			err = er
+//		} else if d, ok := res.(*voltha.Device); ok {
+//			returnedDevice = d
+//		}
+//		if receivedResponse == requestNum {
+//			break
+//		}
+//	}
+//	if returnedDevice != nil {
+//		return returnedDevice, nil
+//	}
+//	return nil, err
+//}
+
+// createDevice sends two requests to each core and waits for both responses
+func createDevice(stubs []voltha.VolthaServiceClient) (*voltha.Device, error) {
+	ui := uuid.New()
+	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, ui.String()))
+	randomMacAddress := strings.ToUpper(com.GetRandomMacAddress())
+	device := &voltha.Device{Type: "simulated_olt", MacAddress:randomMacAddress}
+	ch := make(chan interface{})
+	defer close(ch)
+	requestNum := 0
+	for _, stub := range stubs {
+		go sendCreateDeviceRequest(ctx, stub, device, ch)
+		requestNum += 1
+	}
+	fmt.Println("Waiting for create device response ...")
+	receivedResponse := 0
+	var err error
+	var returnedDevice *voltha.Device
+	select {
+	case res, ok := <-ch:
+		receivedResponse += 1
+		if !ok {
+		} else if er, ok := res.(error); ok {
+			err = er
+		} else if d, ok := res.(*voltha.Device); ok {
+			returnedDevice = d
+		}
+		if receivedResponse == requestNum {
+			break
+		}
+	}
+	if returnedDevice != nil {
+		return returnedDevice, nil
+	}
+	return nil, err
+}
+
+// enableDevices sends two requests to each core for each device and waits for both responses before sending another
+// enable request for a different device.
+func enableAllDevices(stubs []voltha.VolthaServiceClient) error {
+	for deviceId, val := range devices {
+		if val.AdminState == voltha.AdminState_PREPROVISIONED {
+			ui := uuid.New()
+			ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(volthaSerialNumberKey, ui.String()))
+			ch := make(chan interface{})
+			defer close(ch)
+			requestNum := 0
+			for _, stub := range stubs {
+				go sendEnableDeviceRequest(ctx, stub, deviceId, ch)
+				requestNum +=1
+			}
+			receivedResponse := 0
+			var err error
+			fmt.Println("Waiting for enable device response ...")
+			validResponseReceived := false
+			select {
+			case res, ok := <-ch:
+				receivedResponse += 1
+				if !ok {
+				} else if er, ok := res.(error); ok {
+					err = er
+				} else if _ , ok := res.(*empty.Empty); ok {
+					validResponseReceived = true
+				}
+				if receivedResponse == requestNum {
+					break
+				}
+			}
+			if validResponseReceived {
+				return nil
+			}
+			return err
+		}
+	}
+	return nil
+}
+
+
+func TestConcurrentRequests(t *testing.T) {
+	fmt.Println("Testing Concurrent requests ...")
+
+	////0. Start kafka and Ectd
+	//startKafka()
+	//startEtcd()
+	//
+	////1. Start the core
+	//startCores()
+	//
+	////2. Start the simulated adapters
+	//startSimulatedOLTAndONUAdapters()
+	//
+	//// Wait until the core and adapters sync up
+	//time.Sleep(10 * time.Second)
+
+	stubs = setupGrpcConnection()
+
+	//3.  Create the devices
+	response, err := createDevice(stubs)
+	log.Infow("response", log.Fields{"res": response, "error": err})
+	assert.Nil(t, err)
+	devices[response.Id] = response
+
+	//4. Enable all the devices
+	err = enableAllDevices(stubs)
+	assert.Nil(t, err)
+
+	////5. Store simulated adapters
+	//stopSimulatedOLTAndONUAdapters()
+	//
+	////6. Store the core
+	//stopCores()
+	//
+	////7. Stop Kafka and Etcd
+	//stopKafka()
+	//stopEtcd()
+}
+
+
+func shutdown() {
+	for _, conn := range conns {
+		conn.Close()
+	}
+}
+
+func TestMain(m *testing.M) {
+	setup()
+	code := m.Run()
+	shutdown()
+	os.Exit(code)
+}
