[VOL-1512] Set device ownership

This commit consists of the following:
1) Set device ownership per Core in a Core-pair such that only 1
Core actively processes a device (i.e. handles all the requests for
that device) while the other Core in the pair passively watches for
updates on that device and takes over in case the owner Core
fails to process the transaction.
2) Clean up the lock mechanisms to ensure we use a read lock when
only read access is needed instead of always taking the exclusive lock.
3) Update logical port additions such that ports are added only when
the device is enabled.
4) Update the port Ids for the logical ports.
5) Update some sarama client configs for performance - this is an
ongoing tune-up.
6) Update the adapter request handler in the Core to immediately send
an ACK back for the adapter request instead of fully processing the
request and then sending the ACK.  This reduces the latency over
Kafka and therefore reduces the likelihood of timeouts.

Change-Id: I9149bf3ba6fbad38e3a29c76ea8dba2f9f731d29
diff --git a/rw_core/core/adapter_manager.go b/rw_core/core/adapter_manager.go
index 0be4e12..ec3f0db 100644
--- a/rw_core/core/adapter_manager.go
+++ b/rw_core/core/adapter_manager.go
@@ -28,15 +28,14 @@
 )
 
 const (
-	SENTINEL_ADAPTER_ID = "adapter_sentinel"
+	SENTINEL_ADAPTER_ID    = "adapter_sentinel"
 	SENTINEL_DEVICETYPE_ID = "device_type_sentinel"
-
 )
 
 type AdapterAgent struct {
-	adapter *voltha.Adapter
+	adapter     *voltha.Adapter
 	deviceTypes map[string]*voltha.DeviceType
-	lock sync.RWMutex
+	lock        sync.RWMutex
 }
 
 func newAdapterAgent(adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) *AdapterAgent {
@@ -77,14 +76,14 @@
 }
 
 func (aa *AdapterAgent) updateAdapter(adapter *voltha.Adapter) {
-	aa.lock.RLock()
-	defer aa.lock.RUnlock()
+	aa.lock.Lock()
+	defer aa.lock.Unlock()
 	aa.adapter = adapter
 }
 
-func (aa *AdapterAgent) updateDeviceType(deviceType *voltha.DeviceType)  {
-	aa.lock.RLock()
-	defer aa.lock.RUnlock()
+func (aa *AdapterAgent) updateDeviceType(deviceType *voltha.DeviceType) {
+	aa.lock.Lock()
+	defer aa.lock.Unlock()
 	aa.deviceTypes[deviceType.Id] = deviceType
 }
 
@@ -112,7 +111,7 @@
 	return &adapterMgr
 }
 
-func (aMgr *AdapterManager) start(ctx context.Context)  {
+func (aMgr *AdapterManager) start(ctx context.Context) {
 	log.Info("starting-adapter-manager")
 
 	// Load the existing adapterAgents and device types - this will also ensure the correct paths have been
@@ -154,7 +153,7 @@
 
 	// Load the device types
 	if deviceTypesIf := aMgr.clusterDataProxy.List("/device_types", 0, false, ""); deviceTypesIf != nil {
-		dTypes := &voltha.DeviceTypes{Items:[]*voltha.DeviceType{}}
+		dTypes := &voltha.DeviceTypes{Items: []*voltha.DeviceType{}}
 		for _, deviceTypeIf := range deviceTypesIf.([]interface{}) {
 			if dType, ok := deviceTypeIf.(*voltha.DeviceType); ok {
 				log.Debugw("found-existing-device-types", log.Fields{"deviceTypes": dTypes})
@@ -165,11 +164,10 @@
 	} else {
 		log.Debug("no-existing-device-type-found")
 		//	No device types data.   In order to have a proxy setup for that path let's create a fake device type
-		aMgr.addDeviceTypes(&voltha.DeviceTypes{Items:[]*voltha.DeviceType{&voltha.DeviceType{Id:SENTINEL_DEVICETYPE_ID, Adapter:SENTINEL_ADAPTER_ID}}}, true)
+		aMgr.addDeviceTypes(&voltha.DeviceTypes{Items: []*voltha.DeviceType{&voltha.DeviceType{Id: SENTINEL_DEVICETYPE_ID, Adapter: SENTINEL_ADAPTER_ID}}}, true)
 	}
 }
 
-
 //updateAdaptersAndDevicetypesInMemory loads the existing set of adapters and device types in memory
 func (aMgr *AdapterManager) updateAdaptersAndDevicetypesInMemory() {
 	// Update the adapters
@@ -193,7 +191,6 @@
 	}
 }
 
-
 func (aMgr *AdapterManager) addAdapter(adapter *voltha.Adapter, saveToDb bool) {
 	aMgr.lockAdaptersMap.Lock()
 	defer aMgr.lockAdaptersMap.Unlock()
@@ -215,7 +212,6 @@
 	}
 }
 
-
 func (aMgr *AdapterManager) addDeviceTypes(deviceTypes *voltha.DeviceTypes, saveToDb bool) {
 	if deviceTypes == nil {
 		return
@@ -231,7 +227,7 @@
 			adapterAgent.updateDeviceType(clonedDType)
 		} else {
 			log.Debugw("adapter-not-exist", log.Fields{"deviceTypes": deviceTypes, "adapterId": clonedDType.Adapter})
-			aMgr.adapterAgents[clonedDType.Adapter] = newAdapterAgent(&voltha.Adapter{Id:clonedDType.Adapter}, deviceTypes)
+			aMgr.adapterAgents[clonedDType.Adapter] = newAdapterAgent(&voltha.Adapter{Id: clonedDType.Adapter}, deviceTypes)
 		}
 		aMgr.deviceTypeToAdapterMap[clonedDType.Id] = clonedDType.Adapter
 	}
@@ -252,9 +248,9 @@
 }
 
 func (aMgr *AdapterManager) listAdapters(ctx context.Context) (*voltha.Adapters, error) {
-	result := &voltha.Adapters{Items:[]*voltha.Adapter{}}
-	aMgr.lockAdaptersMap.Lock()
-	defer aMgr.lockAdaptersMap.Unlock()
+	result := &voltha.Adapters{Items: []*voltha.Adapter{}}
+	aMgr.lockAdaptersMap.RLock()
+	defer aMgr.lockAdaptersMap.RUnlock()
 	for _, adapterAgent := range aMgr.adapterAgents {
 		if a := adapterAgent.getAdapter(); a != nil {
 			if a.Id != SENTINEL_ADAPTER_ID { // don't report the sentinel
@@ -272,8 +268,8 @@
 }
 
 func (aMgr *AdapterManager) getAdapter(adapterId string) *voltha.Adapter {
-	aMgr.lockAdaptersMap.Lock()
-	defer aMgr.lockAdaptersMap.Unlock()
+	aMgr.lockAdaptersMap.RLock()
+	defer aMgr.lockAdaptersMap.RUnlock()
 	if adapterAgent, ok := aMgr.adapterAgents[adapterId]; ok {
 		return adapterAgent.getAdapter()
 	}
@@ -281,7 +277,7 @@
 }
 
 //updateAdapter updates an adapter if it exist.  Otherwise, it creates it.
-func (aMgr *AdapterManager) updateAdapter(adapter *voltha.Adapter)  {
+func (aMgr *AdapterManager) updateAdapter(adapter *voltha.Adapter) {
 	aMgr.lockAdaptersMap.Lock()
 	defer aMgr.lockAdaptersMap.Unlock()
 	if adapterAgent, ok := aMgr.adapterAgents[adapter.Id]; ok {
@@ -292,7 +288,7 @@
 }
 
 //updateDeviceType updates an adapter if it exist.  Otherwise, it creates it.
-func (aMgr *AdapterManager) updateDeviceType(deviceType *voltha.DeviceType)  {
+func (aMgr *AdapterManager) updateDeviceType(deviceType *voltha.DeviceType) {
 	aMgr.lockAdaptersMap.Lock()
 	defer aMgr.lockAdaptersMap.Unlock()
 	aMgr.lockdDeviceTypeToAdapterMap.Lock()
@@ -301,17 +297,17 @@
 		adapterAgent.updateDeviceType(deviceType)
 	} else {
 		aMgr.adapterAgents[deviceType.Adapter] = newAdapterAgent(&voltha.Adapter{Id: deviceType.Adapter},
-														&voltha.DeviceTypes{Items:[]*voltha.DeviceType{deviceType}})
+			&voltha.DeviceTypes{Items: []*voltha.DeviceType{deviceType}})
 	}
 	aMgr.deviceTypeToAdapterMap[deviceType.Id] = deviceType.Adapter
 }
 
-func (aMgr *AdapterManager) registerAdapter(adapter *voltha.Adapter, deviceTypes  *voltha.DeviceTypes) *voltha.CoreInstance {
+func (aMgr *AdapterManager) registerAdapter(adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) *voltha.CoreInstance {
 	log.Debugw("registerAdapter", log.Fields{"adapter": adapter, "deviceTypes": deviceTypes.Items})
 
 	if aMgr.getAdapter(adapter.Id) != nil {
 		//	Already registered
-		return &voltha.CoreInstance{InstanceId:aMgr.coreInstanceId}
+		return &voltha.CoreInstance{InstanceId: aMgr.coreInstanceId}
 	}
 	// Save the adapter and the device types
 	aMgr.addAdapter(adapter, true)
@@ -319,7 +315,7 @@
 
 	log.Debugw("adapter-registered", log.Fields{"adapter": adapter.Id})
 
-	return &voltha.CoreInstance{InstanceId:aMgr.coreInstanceId}
+	return &voltha.CoreInstance{InstanceId: aMgr.coreInstanceId}
 }
 
 //getAdapterName returns the name of the device adapter that service this device type
@@ -333,7 +329,7 @@
 }
 
 // getDeviceType returns the device type proto definition given the name of the device type
-func (aMgr *AdapterManager) getDeviceType(deviceType string)  *voltha.DeviceType {
+func (aMgr *AdapterManager) getDeviceType(deviceType string) *voltha.DeviceType {
 	aMgr.lockdDeviceTypeToAdapterMap.Lock()
 	defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
 	if adapterId, exist := aMgr.deviceTypeToAdapterMap[deviceType]; exist {
@@ -408,4 +404,4 @@
 		}
 	}
 	return nil
-}
\ No newline at end of file
+}