[VOL-1462] Sync data between two voltha cores in the same pair

This commit consists of the following updates:
1) Background data syncing between two cores after a transaction
is completed by one core.
2) Add transaction management to southbound APIs (adapter facing).
This is enabled for adapter registration only for now.
3) Fix an issue with flow decomposition
4) Add the rough-in to allow a packet to be sent to an OFAgent
with a transaction ID.  Two cores can therefore send the same
packet and let the OFAgent discard the duplicate.  The work in
OFAgent remains.
5) Cleanups

Change-Id: Ibe9d75edb66cfd6a0954bdfeb16a7e7c8a3c53b6
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index 98cc688..5fede68 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -22,6 +22,7 @@
 	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/db/model"
+	"github.com/opencord/voltha-go/kafka"
 	ic "github.com/opencord/voltha-go/protos/inter_container"
 	"github.com/opencord/voltha-go/protos/voltha"
 	"google.golang.org/grpc/codes"
@@ -36,9 +37,14 @@
 	adapterMgr *AdapterManager
 	localDataProxy   *model.Proxy
 	clusterDataProxy *model.Proxy
+	defaultRequestTimeout int64
+	longRunningRequestTimeout int64
+	coreInCompetingMode bool
 }
 
-func NewAdapterRequestHandlerProxy(coreInstanceId string, dMgr *DeviceManager, ldMgr *LogicalDeviceManager, aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy) *AdapterRequestHandlerProxy {
+func NewAdapterRequestHandlerProxy(coreInstanceId string, dMgr *DeviceManager, ldMgr *LogicalDeviceManager,
+	aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy, incompetingMode bool, longRunningRequestTimeout int64,
+	defaultRequestTimeout int64) *AdapterRequestHandlerProxy {
 	var proxy AdapterRequestHandlerProxy
 	proxy.coreInstanceId = coreInstanceId
 	proxy.deviceMgr = dMgr
@@ -46,17 +52,43 @@
 	proxy.clusterDataProxy = cdProxy
 	proxy.localDataProxy = ldProxy
 	proxy.adapterMgr = aMgr
+	proxy.coreInCompetingMode = incompetingMode
+	proxy.defaultRequestTimeout = defaultRequestTimeout
+	proxy.longRunningRequestTimeout = longRunningRequestTimeout
 	return &proxy
 }
 
+func (rhp *AdapterRequestHandlerProxy) acquireTransaction(transactionId string, maxTimeout ...int64) (*KVTransaction, error) {
+	timeout := rhp.defaultRequestTimeout
+	if len(maxTimeout) > 0 {
+		timeout = maxTimeout[0]
+	}
+	log.Debugw("transaction-timeout", log.Fields{"timeout": timeout})
+	txn := NewKVTransaction(transactionId)
+	if txn == nil {
+		return nil,  errors.New("fail-to-create-transaction")
+	} else if txn.Acquired(timeout) {
+		return txn, nil
+	} else {
+		return nil, errors.New("failed-to-seize-request")
+	}
+}
+
+// competeForTransaction is a helper function to determine whether every request needs to compete with another
+// Core to execute the request
+func (rhp *AdapterRequestHandlerProxy) competeForTransaction() bool {
+	return rhp.coreInCompetingMode
+}
+
 func (rhp *AdapterRequestHandlerProxy) Register(args []*ic.Argument) (*voltha.CoreInstance, error) {
-	if len(args) != 2 {
+	if len(args) != 3 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
 	}
 	adapter := &voltha.Adapter{}
 	deviceTypes := &voltha.DeviceTypes{}
+	transactionID := &ic.StrType{}
 	for _, arg := range args {
 		switch arg.Key {
 		case "adapter":
@@ -69,9 +101,27 @@
 				log.Warnw("cannot-unmarshal-device-types", log.Fields{"error": err})
 				return nil, err
 			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
 		}
 	}
-	log.Debugw("Register", log.Fields{"Adapter": *adapter, "DeviceTypes": deviceTypes, "coreId": rhp.coreInstanceId})
+	log.Debugw("Register", log.Fields{"Adapter": *adapter, "DeviceTypes": deviceTypes, "transactionID": transactionID.Val, "coreId": rhp.coreInstanceId})
+
+	// Try to grab the transaction as this core may be competing with another Core
+	if rhp.competeForTransaction() {
+		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+			// Update our adapters in memory
+			go rhp.adapterMgr.updateAdaptersAndDevicetypesInMemory()
+			// returning nil, nil instructs the caller to ignore this request
+			return nil, nil
+		} else {
+			defer txn.Close()
+		}
+	}
 
 	if rhp.TestMode { // Execute only for test cases
 		return &voltha.CoreInstance{InstanceId: "CoreInstance"}, nil
@@ -80,17 +130,40 @@
 }
 
 func (rhp *AdapterRequestHandlerProxy) GetDevice(args []*ic.Argument) (*voltha.Device, error) {
-	if len(args) != 1 {
+	if len(args) != 2 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
 	}
+
 	pID := &voltha.ID{}
-	if err := ptypes.UnmarshalAny(args[0].Value, pID); err != nil {
-		log.Warnw("cannot-unmarshal-ID", log.Fields{"error": err})
-		return nil, err
+	transactionID := &ic.StrType{}
+	for _, arg := range args {
+		switch arg.Key {
+		case "device_id":
+			if err := ptypes.UnmarshalAny(arg.Value, pID); err != nil {
+				log.Warnw("cannot-unmarshal-ID", log.Fields{"error": err})
+				return nil, err
+			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
+		}
 	}
-	log.Debugw("GetDevice", log.Fields{"deviceId": pID.Id})
+	log.Debugw("GetDevice", log.Fields{"deviceId": pID.Id, "transactionID": transactionID.Val})
+
+	// Try to grab the transaction as this core may be competing with another Core
+	if rhp.competeForTransaction() {
+		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+			// returning nil, nil instructs the caller to ignore this request
+			return nil, nil
+		} else {
+			defer txn.Close()
+		}
+	}
 
 	if rhp.TestMode { // Execute only for test cases
 		return &voltha.Device{Id: pID.Id}, nil
@@ -125,17 +198,40 @@
 }
 
 func (rhp *AdapterRequestHandlerProxy) DeviceUpdate(args []*ic.Argument) (*empty.Empty, error) {
-	if len(args) != 1 {
+	if len(args) != 2 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
 	}
+
 	device := &voltha.Device{}
-	if err := ptypes.UnmarshalAny(args[0].Value, device); err != nil {
-		log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
-		return nil, err
+	transactionID := &ic.StrType{}
+	for _, arg := range args {
+		switch arg.Key {
+		case "device":
+			if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
+				log.Warnw("cannot-unmarshal-ID", log.Fields{"error": err})
+				return nil, err
+			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
+		}
 	}
-	log.Debugw("DeviceUpdate", log.Fields{"deviceId": device.Id})
+	log.Debugw("DeviceUpdate", log.Fields{"deviceId": device.Id, "transactionID": transactionID.Val})
+
+	// Try to grab the transaction as this core may be competing with another Core
+	if rhp.competeForTransaction() {
+		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+			// returning nil, nil instructs the caller to ignore this request
+			return nil, nil
+		} else {
+			defer txn.Close()
+		}
+	}
 
 	if rhp.TestMode { // Execute only for test cases
 		return new(empty.Empty), nil
@@ -154,32 +250,56 @@
 }
 
 func (rhp *AdapterRequestHandlerProxy) GetChildDevice(args []*ic.Argument) (*voltha.Device, error) {
-	if len(args) < 1 {
+	if len(args) < 2 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
 	}
+
 	pID := &voltha.ID{}
-	if err := ptypes.UnmarshalAny(args[0].Value, pID); err != nil {
-		log.Warnw("cannot-unmarshal-ID", log.Fields{"error": err})
-		return nil, err
+	transactionID := &ic.StrType{}
+	for _, arg := range args {
+		switch arg.Key {
+		case "device_id":
+			if err := ptypes.UnmarshalAny(arg.Value, pID); err != nil {
+				log.Warnw("cannot-unmarshal-ID", log.Fields{"error": err})
+				return nil, err
+			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
+		}
 	}
-	log.Debugw("GetChildDevice", log.Fields{"deviceId": pID.Id})
+	log.Debugw("GetChildDevice", log.Fields{"deviceId": pID.Id, "transactionID": transactionID.Val})
+
+	// Try to grab the transaction as this core may be competing with another Core
+	if rhp.competeForTransaction() {
+		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+			// returning nil, nil instructs the caller to ignore this request
+			return nil, nil
+		} else {
+			defer txn.Close()
+		}
+	}
 
 	if rhp.TestMode { // Execute only for test cases
 		return &voltha.Device{Id: pID.Id}, nil
 	}
-	return nil, nil
+	return rhp.deviceMgr.GetDevice(pID.Id)
 }
 
 func (rhp *AdapterRequestHandlerProxy) GetPorts(args []*ic.Argument) (*voltha.Ports, error) {
-	if len(args) != 2 {
+	if len(args) != 3 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
 	}
 	deviceId := &voltha.ID{}
 	pt := &ic.IntType{}
+	transactionID := &ic.StrType{}
 	for _, arg := range args {
 		switch arg.Key {
 		case "device_id":
@@ -192,9 +312,14 @@
 				log.Warnw("cannot-unmarshal-porttype", log.Fields{"error": err})
 				return nil, err
 			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
 		}
 	}
-	log.Debugw("GetPorts", log.Fields{"deviceID": deviceId.Id, "portype": pt.Val})
+	log.Debugw("GetPorts", log.Fields{"deviceID": deviceId.Id, "portype": pt.Val, "transactionID": transactionID.Val})
 	if rhp.TestMode { // Execute only for test cases
 		aPort := &voltha.Port{Label: "test_port"}
 		allPorts := &voltha.Ports{}
@@ -204,31 +329,54 @@
 	return rhp.deviceMgr.getPorts(nil, deviceId.Id, voltha.Port_PortType(pt.Val))
 }
 
-func (rhp *AdapterRequestHandlerProxy) GetChildDevices(args []*ic.Argument) (*voltha.Device, error) {
-	if len(args) != 1 {
+func (rhp *AdapterRequestHandlerProxy) GetChildDevices(args []*ic.Argument) (*voltha.Devices, error) {
+	if len(args) != 2 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
 	}
+
 	pID := &voltha.ID{}
-	if err := ptypes.UnmarshalAny(args[0].Value, pID); err != nil {
-		log.Warnw("cannot-unmarshal-ID", log.Fields{"error": err})
-		return nil, err
+	transactionID := &ic.StrType{}
+	for _, arg := range args {
+		switch arg.Key {
+		case "device_id":
+			if err := ptypes.UnmarshalAny(arg.Value, pID); err != nil {
+				log.Warnw("cannot-unmarshal-ID", log.Fields{"error": err})
+				return nil, err
+			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
+		}
 	}
-	log.Debugw("GetChildDevice", log.Fields{"deviceId": pID.Id})
+	log.Debugw("GetChildDevices", log.Fields{"deviceId": pID.Id, "transactionID": transactionID.Val})
+
+	// Try to grab the transaction as this core may be competing with another Core
+	if rhp.competeForTransaction() {
+		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+			// returning nil, nil instructs the caller to ignore this request
+			return nil, nil
+		} else {
+			defer txn.Close()
+		}
+	}
 
 	if rhp.TestMode { // Execute only for test cases
-		return &voltha.Device{Id: pID.Id}, nil
+		return &voltha.Devices{Items:nil}, nil
 	}
-	//TODO: Complete
-	return nil, nil
+
+	return rhp.deviceMgr.getAllChildDevices(pID.Id)
 }
 
 // ChildDeviceDetected is invoked when a child device is detected.  The following
 // parameters are expected:
 // {parent_device_id, parent_port_no, child_device_type, proxy_address, admin_state, **kw)
 func (rhp *AdapterRequestHandlerProxy) ChildDeviceDetected(args []*ic.Argument) (*empty.Empty, error) {
-	if len(args) < 4 {
+	if len(args) < 5 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
@@ -238,6 +386,7 @@
 	portNo := &ic.IntType{}
 	dt := &ic.StrType{}
 	chnlId := &ic.IntType{}
+	transactionID := &ic.StrType{}
 	for _, arg := range args {
 		switch arg.Key {
 		case "parent_device_id":
@@ -260,10 +409,26 @@
 				log.Warnw("cannot-unmarshal-channel-id", log.Fields{"error": err})
 				return nil, err
 			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
 		}
 	}
 	log.Debugw("ChildDeviceDetected", log.Fields{"parentDeviceId": pID.Id, "parentPortNo": portNo.Val,
-		"deviceType": dt.Val, "channelId": chnlId.Val})
+		"deviceType": dt.Val, "channelId": chnlId.Val, "transactionID": transactionID.Val})
+
+	// Try to grab the transaction as this core may be competing with another Core
+	if rhp.competeForTransaction() {
+		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+			// returning nil, nil instructs the caller to ignore this request
+			return nil, nil
+		} else {
+			defer txn.Close()
+		}
+	}
 
 	if rhp.TestMode { // Execute only for test cases
 		return nil, nil
@@ -275,7 +440,7 @@
 }
 
 func (rhp *AdapterRequestHandlerProxy) DeviceStateUpdate(args []*ic.Argument) (*empty.Empty, error) {
-	if len(args) < 2 {
+	if len(args) < 3 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
@@ -283,6 +448,7 @@
 	deviceId := &voltha.ID{}
 	operStatus := &ic.IntType{}
 	connStatus := &ic.IntType{}
+	transactionID := &ic.StrType{}
 	for _, arg := range args {
 		switch arg.Key {
 		case "device_id":
@@ -300,9 +466,27 @@
 				log.Warnw("cannot-unmarshal-connStatus", log.Fields{"error": err})
 				return nil, err
 			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
 		}
 	}
-	log.Debugw("DeviceStateUpdate", log.Fields{"deviceId": deviceId.Id, "oper-status": operStatus, "conn-status": connStatus})
+	log.Debugw("DeviceStateUpdate", log.Fields{"deviceId": deviceId.Id, "oper-status": operStatus,
+		"conn-status": connStatus, "transactionID": transactionID.Val})
+
+	// Try to grab the transaction as this core may be competing with another Core
+	if rhp.competeForTransaction() {
+		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+			// returning nil, nil instructs the caller to ignore this request
+			return nil, nil
+		} else {
+			defer txn.Close()
+		}
+	}
+
 	if rhp.TestMode { // Execute only for test cases
 		return nil, nil
 	}
@@ -312,7 +496,7 @@
 }
 
 func (rhp *AdapterRequestHandlerProxy) ChildrenStateUpdate(args []*ic.Argument) (*empty.Empty, error) {
-	if len(args) < 2 {
+	if len(args) < 3 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
@@ -320,6 +504,7 @@
 	deviceId := &voltha.ID{}
 	operStatus := &ic.IntType{}
 	connStatus := &ic.IntType{}
+	transactionID := &ic.StrType{}
 	for _, arg := range args {
 		switch arg.Key {
 		case "device_id":
@@ -337,9 +522,27 @@
 				log.Warnw("cannot-unmarshal-connStatus", log.Fields{"error": err})
 				return nil, err
 			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
 		}
 	}
-	log.Debugw("ChildrenStateUpdate", log.Fields{"deviceId": deviceId.Id, "oper-status": operStatus, "conn-status": connStatus})
+	log.Debugw("ChildrenStateUpdate", log.Fields{"deviceId": deviceId.Id, "oper-status": operStatus,
+		"conn-status": connStatus, "transactionID": transactionID.Val})
+
+	// Try to grab the transaction as this core may be competing with another Core
+	if rhp.competeForTransaction() {
+		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+			// returning nil, nil instructs the caller to ignore this request
+			return nil, nil
+		} else {
+			defer txn.Close()
+		}
+	}
+
 	if rhp.TestMode { // Execute only for test cases
 		return nil, nil
 	}
@@ -350,7 +553,7 @@
 }
 
 func (rhp *AdapterRequestHandlerProxy) PortStateUpdate(args []*ic.Argument) (*empty.Empty, error) {
-	if len(args) < 2 {
+	if len(args) < 3 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
@@ -359,6 +562,7 @@
 	portType := &ic.IntType{}
 	portNo := &ic.IntType{}
 	operStatus := &ic.IntType{}
+	transactionID := &ic.StrType{}
 	for _, arg := range args {
 		switch arg.Key {
 		case "device_id":
@@ -381,10 +585,27 @@
 				log.Warnw("cannot-unmarshal-portno", log.Fields{"error": err})
 				return nil, err
 			}
-
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
 		}
 	}
-	log.Debugw("PortStateUpdate", log.Fields{"deviceId": deviceId.Id, "operStatus": operStatus, "portType": portType, "portNo": portNo})
+	log.Debugw("PortStateUpdate", log.Fields{"deviceId": deviceId.Id, "operStatus": operStatus,
+		"portType": portType, "portNo": portNo, "transactionID": transactionID.Val})
+
+	// Try to grab the transaction as this core may be competing with another Core
+	if rhp.competeForTransaction() {
+		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+			// returning nil, nil instructs the caller to ignore this request
+			return nil, nil
+		} else {
+			defer txn.Close()
+		}
+	}
+
 	if rhp.TestMode { // Execute only for test cases
 		return nil, nil
 	}
@@ -393,13 +614,14 @@
 }
 
 func (rhp *AdapterRequestHandlerProxy) PortCreated(args []*ic.Argument) (*empty.Empty, error) {
-	if len(args) != 2 {
+	if len(args) != 3 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
 	}
 	deviceId := &voltha.ID{}
 	port := &voltha.Port{}
+	transactionID := &ic.StrType{}
 	for _, arg := range args {
 		switch arg.Key {
 		case "device_id":
@@ -412,9 +634,25 @@
 				log.Warnw("cannot-unmarshal-port", log.Fields{"error": err})
 				return nil, err
 			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
 		}
 	}
-	log.Debugw("PortCreated", log.Fields{"deviceId": deviceId.Id, "port": port})
+	log.Debugw("PortCreated", log.Fields{"deviceId": deviceId.Id, "port": port, "transactionID": transactionID.Val})
+
+	// Try to grab the transaction as this core may be competing with another Core
+	if rhp.competeForTransaction() {
+		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+			// returning nil, nil instructs the caller to ignore this request
+			return nil, nil
+		} else {
+			defer txn.Close()
+		}
+	}
 
 	if rhp.TestMode { // Execute only for test cases
 		return nil, nil
@@ -426,13 +664,14 @@
 }
 
 func (rhp *AdapterRequestHandlerProxy) DevicePMConfigUpdate(args []*ic.Argument) (*empty.Empty, error) {
-	if len(args) != 2 {
+	if len(args) != 3 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
 	}
 	pmConfigs := &voltha.PmConfigs{}
 	init := &ic.BoolType{}
+	transactionID := &ic.StrType{}
 	for _, arg := range args {
 		switch arg.Key {
 		case "device_pm_config":
@@ -445,10 +684,26 @@
 				log.Warnw("cannot-unmarshal-boolean", log.Fields{"error": err})
 				return nil, err
 			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
 		}
 	}
 	log.Debugw("DevicePMConfigUpdate", log.Fields{"deviceId": pmConfigs.Id, "configs": pmConfigs,
-		"init": init})
+		"init": init,  "transactionID": transactionID.Val})
+
+	// Try to grab the transaction as this core may be competing with another Core
+	if rhp.competeForTransaction() {
+		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+			// returning nil, nil instructs the caller to ignore this request
+			return nil, nil
+		} else {
+			defer txn.Close()
+		}
+	}
 
 	if rhp.TestMode { // Execute only for test cases
 		return nil, nil
@@ -461,7 +716,7 @@
 }
 
 func (rhp *AdapterRequestHandlerProxy) PacketIn(args []*ic.Argument) (*empty.Empty, error) {
-	if len(args) < 3 {
+	if len(args) < 4 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
@@ -469,6 +724,7 @@
 	deviceId := &voltha.ID{}
 	portNo := &ic.IntType{}
 	packet := &ic.Packet{}
+	transactionID := &ic.StrType{}
 	for _, arg := range args {
 		switch arg.Key {
 		case "device_id":
@@ -486,14 +742,23 @@
 				log.Warnw("cannot-unmarshal-packet", log.Fields{"error": err})
 				return nil, err
 			}
-
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
 		}
 	}
-	log.Debugw("PacketIn", log.Fields{"deviceId": deviceId.Id, "port": portNo.Val, "packet": packet})
+	log.Debugw("PacketIn", log.Fields{"deviceId": deviceId.Id, "port": portNo.Val, "packet": packet,
+		"transactionID": transactionID.Val})
+
+	// For performance reasons, we do not compete for packet-in.  We process it and send the packet in.  Later in the
+	// processing flow the duplicate packet will be discarded.
+
 	if rhp.TestMode { // Execute only for test cases
 		return nil, nil
 	}
-	go rhp.deviceMgr.PacketIn(deviceId.Id, uint32(portNo.Val), packet.Payload)
+	go rhp.deviceMgr.PacketIn(deviceId.Id, uint32(portNo.Val), transactionID.Val, packet.Payload)
 	return new(empty.Empty), nil
 }
 
@@ -506,6 +771,7 @@
 	}
 	deviceId := &voltha.ID{}
 	img := &voltha.ImageDownload{}
+	transactionID := &ic.StrType{}
 	for _, arg := range args {
 		switch arg.Key {
 		case "device_id":
@@ -518,9 +784,27 @@
 				log.Warnw("cannot-unmarshal-imgaeDownload", log.Fields{"error": err})
 				return nil, err
 			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
 		}
 	}
-	log.Debugw("UpdateImageDownload", log.Fields{"deviceId": deviceId.Id, "image-download": img})
+	log.Debugw("UpdateImageDownload", log.Fields{"deviceId": deviceId.Id, "image-download": img,
+		"transactionID": transactionID.Val})
+
+	// Try to grab the transaction as this core may be competing with another Core
+	if rhp.competeForTransaction() {
+		if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+			// returning nil, nil instructs the caller to ignore this request
+			return nil, nil
+		} else {
+			defer txn.Close()
+		}
+	}
+
 	if rhp.TestMode { // Execute only for test cases
 		return nil, nil
 	}