[VOL-1512] Set device ownership

This commit consists of the following:
1) Set device ownership per Core in a Core-pair such that only one
Core actively processes a device (i.e. handles all the requests for
that device) while the other Core in the pair passively watches for
updates on that device and takes over if the owner Core fails to
process the transaction.
2) Clean up the locking mechanisms to ensure a read lock is used
where sufficient instead of an exclusive lock.
3) Update logical port additions such that ports are added only when
the device is enabled.
4) Update the port IDs for the logical ports.
5) Tune some sarama client configurations for performance; this
tuning is ongoing.
6) Update the adapter request handler in the Core to send an ACK to
the adapter immediately upon receiving a request, instead of fully
processing the request before sending the ACK. This reduces latency
over Kafka and therefore the likelihood of timeouts.

Change-Id: I9149bf3ba6fbad38e3a29c76ea8dba2f9f731d29
diff --git a/rw_core/core/device_ownership.go b/rw_core/core/device_ownership.go
index f229383..92262ab 100644
--- a/rw_core/core/device_ownership.go
+++ b/rw_core/core/device_ownership.go
@@ -20,12 +20,18 @@
 	"fmt"
 	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/db/kvstore"
+	"github.com/opencord/voltha-go/protos/voltha"
+	"github.com/opencord/voltha-go/rw_core/utils"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 	"sync"
 	"time"
 )
 
+func init() {
+	log.AddPackage(log.JSON, log.WarnLevel, nil)
+}
+
 type ownership struct {
 	id    string
 	owned bool
@@ -38,19 +44,27 @@
 	kvClient           kvstore.Client
 	reservationTimeout int64 // Duration in seconds
 	ownershipPrefix    string
+	deviceMgr          *DeviceManager
+	logicalDeviceMgr   *LogicalDeviceManager
 	deviceMap          map[string]*ownership
 	deviceMapLock      *sync.RWMutex
+	deviceToKeyMap     map[string]string
+	deviceToKeyMapLock *sync.RWMutex
 }
 
-func NewDeviceOwnership(id string, kvClient kvstore.Client, ownershipPrefix string, reservationTimeout int64) *DeviceOwnership {
+func NewDeviceOwnership(id string, kvClient kvstore.Client, deviceMgr *DeviceManager, logicalDeviceMgr *LogicalDeviceManager, ownershipPrefix string, reservationTimeout int64) *DeviceOwnership {
 	var deviceOwnership DeviceOwnership
 	deviceOwnership.instanceId = id
 	deviceOwnership.exitChannel = make(chan int, 1)
 	deviceOwnership.kvClient = kvClient
+	deviceOwnership.deviceMgr = deviceMgr
+	deviceOwnership.logicalDeviceMgr = logicalDeviceMgr
 	deviceOwnership.ownershipPrefix = ownershipPrefix
 	deviceOwnership.reservationTimeout = reservationTimeout
 	deviceOwnership.deviceMap = make(map[string]*ownership)
 	deviceOwnership.deviceMapLock = &sync.RWMutex{}
+	deviceOwnership.deviceToKeyMap = make(map[string]string)
+	deviceOwnership.deviceToKeyMapLock = &sync.RWMutex{}
 	return &deviceOwnership
 }
 
@@ -63,14 +77,18 @@
 	log.Info("stopping-deviceOwnership")
 	da.exitChannel <- 1
 	// Need to flush all device reservations
+	da.abandonAllDevices()
 	log.Info("deviceOwnership-stopped")
 }
 
 func (da *DeviceOwnership) tryToReserveKey(id string) bool {
 	var currOwner string
-	// Try to reserve the key
+	//Try to reserve the key
 	kvKey := fmt.Sprintf("%s_%s", da.ownershipPrefix, id)
 	value, err := da.kvClient.Reserve(kvKey, da.instanceId, da.reservationTimeout)
+	if err != nil {
+		log.Errorw("error", log.Fields{"error": err, "id": id, "instanceId": da.instanceId})
+	}
 	if value != nil {
 		if currOwner, err = kvstore.ToString(value); err != nil {
 			log.Error("unexpected-owner-type")
@@ -80,55 +98,61 @@
 	return false
 }
 
-func (da *DeviceOwnership) startOwnershipMonitoring(id string, chnl chan int) {
-	var op string
+func (da *DeviceOwnership) renewReservation(id string) bool {
+	// Try to reserve the key
+	kvKey := fmt.Sprintf("%s_%s", da.ownershipPrefix, id)
+	if err := da.kvClient.RenewReservation(kvKey); err != nil {
+		log.Errorw("reservation-renewal-error", log.Fields{"error": err, "instance": da.instanceId})
+		return false
+	}
+	return true
+}
 
-startloop:
+func (da *DeviceOwnership) MonitorOwnership(id string, chnl chan int) {
+	op := "starting"
+	exit := false
+	ticker := time.NewTicker(time.Duration(da.reservationTimeout) / 3 * time.Second)
 	for {
-		da.deviceMapLock.RLock()
-		val, exist := da.deviceMap[id]
-		da.deviceMapLock.RUnlock()
-		if exist && val.owned {
+		select {
+		case <-da.exitChannel:
+			log.Infow("closing-monitoring", log.Fields{"Id": id})
+			exit = true
+		case <-ticker.C:
+			log.Debugw(fmt.Sprintf("%s-reservation", op), log.Fields{"Id": id})
+		case <-chnl:
+			log.Infow("closing-device-monitoring", log.Fields{"Id": id})
+			exit = true
+		}
+		if exit {
+			ticker.Stop()
+			break
+		}
+		deviceOwned, ownedByMe := da.getOwnership(id)
+		if deviceOwned && ownedByMe {
 			// Device owned; renew reservation
 			op = "renew"
-			kvKey := fmt.Sprintf("%s_%s", da.ownershipPrefix, id)
-			if err := da.kvClient.RenewReservation(kvKey); err != nil {
-				log.Errorw("reservation-renewal-error", log.Fields{"error": err})
+			if da.renewReservation(id) {
+				log.Debugw("reservation-renewed", log.Fields{"id": id, "instanceId": da.instanceId})
+			} else {
+				log.Debugw("reservation-not-renewed", log.Fields{"id": id, "instanceId": da.instanceId})
 			}
 		} else {
-			// Device not owned; try to seize ownership
+			// Device not owned or not owned by me; try to seize ownership
 			op = "retry"
 			if err := da.setOwnership(id, da.tryToReserveKey(id)); err != nil {
 				log.Errorw("unexpected-error", log.Fields{"error": err})
 			}
 		}
-		select {
-		case <-da.exitChannel:
-			log.Infow("closing-monitoring", log.Fields{"Id": id})
-			break startloop
-		case <-time.After(time.Duration(da.reservationTimeout) / 3 * time.Second):
-			msg := fmt.Sprintf("%s-reservation", op)
-			log.Infow(msg, log.Fields{"Id": id})
-		case <-chnl:
-			log.Infow("closing-device-monitoring", log.Fields{"Id": id})
-			break startloop
-		}
 	}
 }
 
-func (da *DeviceOwnership) getOwnership(id string) bool {
+func (da *DeviceOwnership) getOwnership(id string) (bool, bool) {
 	da.deviceMapLock.RLock()
 	defer da.deviceMapLock.RUnlock()
 	if val, exist := da.deviceMap[id]; exist {
-		return val.owned
+		return true, val.owned
 	}
-	log.Debugw("setting-up-new-ownership", log.Fields{"Id": id})
-	// Not owned by me or maybe anybody else.  Try to reserve it
-	reservedByMe := da.tryToReserveKey(id)
-	myChnl := make(chan int)
-	da.deviceMap[id] = &ownership{id: id, owned: reservedByMe, chnl: myChnl}
-	go da.startOwnershipMonitoring(id, myChnl)
-	return reservedByMe
+	return false, false
 }
 
 func (da *DeviceOwnership) setOwnership(id string, owner bool) error {
@@ -146,8 +170,26 @@
 
 // OwnedByMe returns where this Core instance active owns this device.   This function will automatically
 // trigger the process to monitor the device and update the device ownership regularly.
-func (da *DeviceOwnership) OwnedByMe(id string) bool {
-	return da.getOwnership(id)
+func (da *DeviceOwnership) OwnedByMe(id interface{}) bool {
+	// Retrieve the ownership key based on the id
+	var ownershipKey string
+	var err error
+	if ownershipKey, err = da.getOwnershipKey(id); err != nil {
+		log.Warnw("no-ownershipkey", log.Fields{"error": err})
+		return false
+	}
+
+	deviceOwned, ownedByMe := da.getOwnership(ownershipKey)
+	if deviceOwned {
+		return ownedByMe
+	}
+	// Not owned by me or maybe anybody else.  Try to reserve it
+	reservedByMe := da.tryToReserveKey(ownershipKey)
+	myChnl := make(chan int)
+	da.deviceMap[ownershipKey] = &ownership{id: ownershipKey, owned: reservedByMe, chnl: myChnl}
+	log.Debugw("set-new-ownership", log.Fields{"Id": ownershipKey, "owned": reservedByMe})
+	go da.MonitorOwnership(ownershipKey, myChnl)
+	return reservedByMe
 }
 
 //AbandonDevice must be invoked whenever a device is deleted from the Core
@@ -163,3 +205,73 @@
 	}
 	return status.Error(codes.NotFound, fmt.Sprintf("id-inexistent-%s", id))
 }
+
+//abandonAllDevices must be invoked whenever a device is deleted from the Core
+func (da *DeviceOwnership) abandonAllDevices() {
+	da.deviceMapLock.Lock()
+	defer da.deviceMapLock.Unlock()
+	for _, val := range da.deviceMap {
+		close(val.chnl)
+	}
+}
+
+func (da *DeviceOwnership) getDeviceKey(id string) (string, error) {
+	da.deviceToKeyMapLock.RLock()
+	defer da.deviceToKeyMapLock.RUnlock()
+	if val, exist := da.deviceToKeyMap[id]; exist {
+		return val, nil
+	}
+	return "", status.Error(codes.NotFound, fmt.Sprintf("not-present-%s", id))
+}
+
+func (da *DeviceOwnership) updateDeviceKey(id string, key string) error {
+	da.deviceToKeyMapLock.Lock()
+	defer da.deviceToKeyMapLock.Unlock()
+	if _, exist := da.deviceToKeyMap[id]; exist {
+		return status.Error(codes.AlreadyExists, fmt.Sprintf("already-present-%s", id))
+	}
+	da.deviceToKeyMap[id] = key
+	return nil
+}
+
+func (da *DeviceOwnership) getOwnershipKey(id interface{}) (string, error) {
+	if id == nil {
+		return "", status.Error(codes.InvalidArgument, "nil-id")
+	}
+	var device *voltha.Device
+	var lDevice *voltha.LogicalDevice
+	// The id can either be a device Id or a logical device id.
+	if dId, ok := id.(*utils.DeviceID); ok {
+		// Use cache if present
+		if val, err := da.getDeviceKey(dId.Id); err == nil {
+			return val, nil
+		}
+		if device, _ = da.deviceMgr.GetDevice(dId.Id); device == nil {
+			return "", status.Error(codes.NotFound, fmt.Sprintf("id-absent-%s", dId))
+		}
+		if device.Root {
+			if err := da.updateDeviceKey(dId.Id, device.Id); err != nil {
+				log.Warnw("Error-updating-cache", log.Fields{"id": dId.Id, "key": device.Id, "error": err})
+			}
+			return device.Id, nil
+		} else {
+			if err := da.updateDeviceKey(dId.Id, device.ParentId); err != nil {
+				log.Warnw("Error-updating-cache", log.Fields{"id": dId.Id, "key": device.ParentId, "error": err})
+			}
+			return device.ParentId, nil
+		}
+	} else if ldId, ok := id.(*utils.LogicalDeviceID); ok {
+		// Use cache if present
+		if val, err := da.getDeviceKey(ldId.Id); err == nil {
+			return val, nil
+		}
+		if lDevice, _ = da.logicalDeviceMgr.getLogicalDevice(ldId.Id); lDevice == nil {
+			return "", status.Error(codes.NotFound, fmt.Sprintf("id-absent-%s", ldId))
+		}
+		if err := da.updateDeviceKey(ldId.Id, lDevice.RootDeviceId); err != nil {
+			log.Warnw("Error-updating-cache", log.Fields{"id": ldId.Id, "key": lDevice.RootDeviceId, "error": err})
+		}
+		return lDevice.RootDeviceId, nil
+	}
+	return "", status.Error(codes.NotFound, fmt.Sprintf("id-%s", id))
+}