[VOL-1512] Set device ownership

This commit consists of the following:
1) Set device ownership per Core in a Core-pair such that only 1
Core actively process a device (i.e. handles all the requests for
that device) while the other Core in the pair passively watch for
updates on that device and will take over in case the owner Core
failed to process the transaction.
2) Cleanup the lock mechanisms to ensure we use a read lock when
needed instead of just a lock.
3) Update logical port additions such that ports are added only when
the device is enabled.
4) Update the port Ids for the logical ports.
5) Update some sarama client configs for performance - this is an
ongoing tune up.
6) Update the adapter request handler in the Core to send back an
ACK immediately to the adapter request instead of processing the
request fully and then sending an ACK.  This reduces the latency
over kafka and therefore reduces the likelihood of timeouts.

Change-Id: I9149bf3ba6fbad38e3a29c76ea8dba2f9f731d29
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index 3d9487f..f450ca2 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -25,6 +25,7 @@
 	"github.com/opencord/voltha-go/kafka"
 	ic "github.com/opencord/voltha-go/protos/inter_container"
 	"github.com/opencord/voltha-go/protos/voltha"
+	"github.com/opencord/voltha-go/rw_core/utils"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 )
@@ -65,11 +66,11 @@
 	if len(maxTimeout) > 0 {
 		timeout = maxTimeout[0]
 	}
-	log.Debugw("transaction-timeout", log.Fields{"timeout": timeout})
 	txn := NewKVTransaction(transactionId)
 	if txn == nil {
 		return nil, errors.New("fail-to-create-transaction")
 	} else if txn.Acquired(timeout) {
+		log.Debugw("acquired-request", log.Fields{"xtrnsId": transactionId})
 		return txn, nil
 	} else {
 		return nil, errors.New("failed-to-seize-request")
@@ -82,20 +83,23 @@
 	if len(maxTimeout) > 0 {
 		timeout = maxTimeout[0]
 	}
-	log.Debugw("transaction-timeout", log.Fields{"timeout": timeout})
 	txn := NewKVTransaction(transactionId)
 	if txn == nil {
 		return nil, errors.New("fail-to-create-transaction")
 	}
 
-	if rhp.core.deviceOwnership.OwnedByMe(devId) {
+	if rhp.core.deviceOwnership.OwnedByMe(&utils.DeviceID{Id:devId}) {
+		log.Debugw("owned-by-me", log.Fields{"Id": devId})
 		if txn.Acquired(timeout) {
+			log.Debugw("processing-request", log.Fields{"Id": devId})
 			return txn, nil
 		} else {
 			return nil, errors.New("failed-to-seize-request")
 		}
 	} else {
+		log.Debugw("not-owned-by-me", log.Fields{"Id": devId})
 		if txn.Monitor(timeout) {
+			log.Debugw("timeout-processing-request", log.Fields{"Id": devId})
 			return txn, nil
 		} else {
 			return nil, errors.New("device-not-owned")
@@ -270,9 +274,10 @@
 	if updatedDevice, err := rhp.mergeDeviceInfoFromAdapter(device); err != nil {
 		return nil, status.Errorf(codes.Internal, "%s", err.Error())
 	} else {
-		if err := rhp.deviceMgr.updateDevice(updatedDevice); err != nil {
-			return nil, err
-		}
+		go rhp.deviceMgr.updateDevice(updatedDevice)
+		//if err := rhp.deviceMgr.updateDevice(updatedDevice); err != nil {
+		//	return nil, err
+		//}
 	}
 
 	return new(empty.Empty), nil
@@ -365,9 +370,8 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		if txn, err := rhp.acquireRequest(transactionID.Val); err != nil {
+		if txn, err := rhp.takeRequestOwnership(transactionID.Val, proxyAddress.DeviceId); err != nil {
 			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
-			// returning nil, nil instructs the callee to ignore this request
 			return nil, nil
 		} else {
 			defer txn.Close()
@@ -599,10 +603,13 @@
 		return nil, nil
 	}
 	// When the enum is not set (i.e. -1), Go still convert to the Enum type with the value being -1
-	if err := rhp.deviceMgr.updateDeviceStatus(deviceId.Id, voltha.OperStatus_OperStatus(operStatus.Val),
-		voltha.ConnectStatus_ConnectStatus(connStatus.Val)); err != nil {
-		return nil, err
-	}
+	go rhp.deviceMgr.updateDeviceStatus(deviceId.Id, voltha.OperStatus_OperStatus(operStatus.Val),
+		voltha.ConnectStatus_ConnectStatus(connStatus.Val))
+
+	//if err := rhp.deviceMgr.updateDeviceStatus(deviceId.Id, voltha.OperStatus_OperStatus(operStatus.Val),
+	//	voltha.ConnectStatus_ConnectStatus(connStatus.Val)); err != nil {
+	//	return nil, err
+	//}
 	return new(empty.Empty), nil
 }
 
@@ -659,10 +666,13 @@
 	}
 
 	// When the enum is not set (i.e. -1), Go still convert to the Enum type with the value being -1
-	if err := rhp.deviceMgr.updateChildrenStatus(deviceId.Id, voltha.OperStatus_OperStatus(operStatus.Val),
-		voltha.ConnectStatus_ConnectStatus(connStatus.Val)); err != nil {
-		return nil, err
-	}
+	go rhp.deviceMgr.updateChildrenStatus(deviceId.Id, voltha.OperStatus_OperStatus(operStatus.Val),
+		voltha.ConnectStatus_ConnectStatus(connStatus.Val))
+
+	//if err := rhp.deviceMgr.updateChildrenStatus(deviceId.Id, voltha.OperStatus_OperStatus(operStatus.Val),
+	//	voltha.ConnectStatus_ConnectStatus(connStatus.Val)); err != nil {
+	//	return nil, err
+	//}
 	return new(empty.Empty), nil
 }
 
@@ -723,10 +733,14 @@
 	if rhp.TestMode { // Execute only for test cases
 		return nil, nil
 	}
-	if err := rhp.deviceMgr.updatePortState(deviceId.Id, voltha.Port_PortType(portType.Val), uint32(portNo.Val),
-		voltha.OperStatus_OperStatus(operStatus.Val)); err != nil {
-		return nil, err
-	}
+
+	go rhp.deviceMgr.updatePortState(deviceId.Id, voltha.Port_PortType(portType.Val), uint32(portNo.Val),
+		voltha.OperStatus_OperStatus(operStatus.Val))
+
+	//if err := rhp.deviceMgr.updatePortState(deviceId.Id, voltha.Port_PortType(portType.Val), uint32(portNo.Val),
+	//	voltha.OperStatus_OperStatus(operStatus.Val)); err != nil {
+	//	return nil, err
+	//}
 	return new(empty.Empty), nil
 }
 
@@ -774,9 +788,10 @@
 	if rhp.TestMode { // Execute only for test cases
 		return nil, nil
 	}
-	if err := rhp.deviceMgr.addPort(deviceId.Id, port); err != nil {
-		return nil, err
-	}
+	go rhp.deviceMgr.addPort(deviceId.Id, port)
+	//if err := rhp.deviceMgr.addPort(deviceId.Id, port); err != nil {
+	//	return nil, err
+	//}
 
 	return new(empty.Empty), nil
 }
@@ -827,9 +842,10 @@
 		return nil, nil
 	}
 
-	if err := rhp.deviceMgr.updatePmConfigs(pmConfigs.Id, pmConfigs); err != nil {
-		return nil, err
-	}
+	go rhp.deviceMgr.updatePmConfigs(pmConfigs.Id, pmConfigs)
+	//if err := rhp.deviceMgr.updatePmConfigs(pmConfigs.Id, pmConfigs); err != nil {
+	//	return nil, err
+	//}
 
 	return new(empty.Empty), nil
 }
@@ -877,9 +893,10 @@
 	if rhp.TestMode { // Execute only for test cases
 		return nil, nil
 	}
-	if err := rhp.deviceMgr.PacketIn(deviceId.Id, uint32(portNo.Val), transactionID.Val, packet.Payload); err != nil {
-		return nil, err
-	}
+	go rhp.deviceMgr.PacketIn(deviceId.Id, uint32(portNo.Val), transactionID.Val, packet.Payload)
+	//if err := rhp.deviceMgr.PacketIn(deviceId.Id, uint32(portNo.Val), transactionID.Val, packet.Payload); err != nil {
+	//	return nil, err
+	//}
 	return new(empty.Empty), nil
 }
 
@@ -928,8 +945,9 @@
 	if rhp.TestMode { // Execute only for test cases
 		return nil, nil
 	}
-	if err := rhp.deviceMgr.updateImageDownload(deviceId.Id, img); err != nil {
-		return nil, err
-	}
+	go rhp.deviceMgr.updateImageDownload(deviceId.Id, img)
+	//if err := rhp.deviceMgr.updateImageDownload(deviceId.Id, img); err != nil {
+	//	return nil, err
+	//}
 	return new(empty.Empty), nil
 }