[VOL-1890] Flow decomposition fails after a core switchover

Cherry-picked into the master branch from voltha-2.1

Change-Id: I84c3a83b5b9115d6ec334af29634486ba7148634
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index f417b05..f5b017c 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -33,19 +33,21 @@
 )
 
 type DeviceManager struct {
-	deviceAgents      sync.Map
-	rootDevices       map[string]bool
-	lockRootDeviceMap sync.RWMutex
-	core              *Core
-	adapterProxy      *AdapterProxy
-	adapterMgr        *AdapterManager
-	logicalDeviceMgr  *LogicalDeviceManager
-	kafkaICProxy      *kafka.InterContainerProxy
-	stateTransitions  *TransitionMap
-	clusterDataProxy  *model.Proxy
-	coreInstanceId    string
-	exitChannel       chan int
-	defaultTimeout    int64
+	deviceAgents            sync.Map
+	rootDevices             map[string]bool
+	lockRootDeviceMap       sync.RWMutex
+	core                    *Core
+	adapterProxy            *AdapterProxy
+	adapterMgr              *AdapterManager
+	logicalDeviceMgr        *LogicalDeviceManager
+	kafkaICProxy            *kafka.InterContainerProxy
+	stateTransitions        *TransitionMap
+	clusterDataProxy        *model.Proxy
+	coreInstanceId          string
+	exitChannel             chan int
+	defaultTimeout          int64
+	devicesLoadingLock      sync.RWMutex
+	deviceLoadingInProgress map[string][]chan int
 }
 
 func newDeviceManager(core *Core) *DeviceManager {
@@ -60,6 +62,8 @@
 	deviceMgr.adapterMgr = core.adapterMgr
 	deviceMgr.lockRootDeviceMap = sync.RWMutex{}
 	deviceMgr.defaultTimeout = core.config.DefaultCoreTimeout
+	deviceMgr.devicesLoadingLock = sync.RWMutex{}
+	deviceMgr.deviceLoadingInProgress = make(map[string][]chan int)
 	return &deviceMgr
 }
 
@@ -98,7 +102,7 @@
 
 }
 
-func (dMgr *DeviceManager) deleteDeviceAgentToMap(agent *DeviceAgent) {
+func (dMgr *DeviceManager) deleteDeviceAgentFromMap(agent *DeviceAgent) {
 	dMgr.deviceAgents.Delete(agent.deviceId)
 	dMgr.lockRootDeviceMap.Lock()
 	defer dMgr.lockRootDeviceMap.Unlock()
@@ -220,7 +224,7 @@
 		}
 		if agent := dMgr.getDeviceAgent(id); agent != nil {
 			agent.stop(nil)
-			dMgr.deleteDeviceAgentToMap(agent)
+			dMgr.deleteDeviceAgentFromMap(agent)
 			// Abandon the device ownership
 			dMgr.core.deviceOwnership.AbandonDevice(id)
 		}
@@ -390,28 +394,52 @@
 
 // loadDevice loads the deviceId in memory, if not present
 func (dMgr *DeviceManager) loadDevice(deviceId string) (*DeviceAgent, error) {
-	log.Debugw("loading-device", log.Fields{"deviceId": deviceId})
-	// Sanity check
 	if deviceId == "" {
 		return nil, status.Error(codes.InvalidArgument, "deviceId empty")
 	}
-	if !dMgr.IsDeviceInCache(deviceId) {
-		// Proceed with the loading only if the device exist in the Model (could have been deleted)
-		if device, err := dMgr.getDeviceFromModel(deviceId); err == nil {
-			agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
-			if err := agent.start(nil, true); err != nil {
-				agent.stop(nil)
-				return nil, err
+	var err error
+	var device *voltha.Device
+	dMgr.devicesLoadingLock.Lock()
+	if _, exist := dMgr.deviceLoadingInProgress[deviceId]; !exist {
+		if !dMgr.IsDeviceInCache(deviceId) {
+			dMgr.deviceLoadingInProgress[deviceId] = []chan int{make(chan int, 1)}
+			dMgr.devicesLoadingLock.Unlock()
+			// Proceed with the loading only if the device exist in the Model (could have been deleted)
+			if device, err = dMgr.getDeviceFromModel(deviceId); err == nil {
+				log.Debugw("loading-device", log.Fields{"deviceId": deviceId})
+				agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
+				if err = agent.start(nil, true); err != nil {
+					log.Warnw("Failure loading device", log.Fields{"deviceId": deviceId, "error": err})
+					agent.stop(nil)
+				} else {
+					dMgr.addDeviceAgentToMap(agent)
+				}
+			} else {
+				log.Debugw("Device not in model", log.Fields{"deviceId": deviceId})
 			}
-			dMgr.addDeviceAgentToMap(agent)
+			// announce completion of task to any number of waiting channels
+			dMgr.devicesLoadingLock.Lock()
+			if v, ok := dMgr.deviceLoadingInProgress[deviceId]; ok {
+				for _, ch := range v {
+					close(ch)
+				}
+				delete(dMgr.deviceLoadingInProgress, deviceId)
+			}
+			dMgr.devicesLoadingLock.Unlock()
 		} else {
-			return nil, status.Error(codes.NotFound, deviceId)
+			dMgr.devicesLoadingLock.Unlock()
 		}
+	} else {
+		ch := make(chan int, 1)
+		dMgr.deviceLoadingInProgress[deviceId] = append(dMgr.deviceLoadingInProgress[deviceId], ch)
+		dMgr.devicesLoadingLock.Unlock()
+		//	Wait for the channel to be closed, implying the process loading this device is done.
+		<-ch
 	}
-	if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
-		return agent, nil
+	if agent, ok := dMgr.deviceAgents.Load(deviceId); ok {
+		return agent.(*DeviceAgent), nil
 	}
-	return nil, status.Error(codes.NotFound, deviceId) // This should not happen
+	return nil, status.Errorf(codes.Aborted, "Error loading device %s", deviceId)
 }
 
 // loadRootDeviceParentAndChildren loads the children and parents of a root device in memory
@@ -431,7 +459,7 @@
 		if childDeviceIds, err := dMgr.getAllChildDeviceIds(device); err == nil {
 			for _, childDeviceId := range childDeviceIds {
 				if _, err := dMgr.loadDevice(childDeviceId); err != nil {
-					log.Warnw("failure-loading-device", log.Fields{"deviceId": childDeviceId})
+					log.Warnw("failure-loading-device", log.Fields{"deviceId": childDeviceId, "error": err})
 					return err
 				}
 			}
@@ -490,35 +518,24 @@
 	return dMgr.listDeviceIdsFromMap(), nil
 }
 
-//ReconcileDevices is a request to a voltha core to managed a list of devices based on their IDs
+//ReconcileDevices is a request to a voltha core to update its list of managed devices.  This will
+//trigger loading the devices along with their children and parent in memory
 func (dMgr *DeviceManager) ReconcileDevices(ctx context.Context, ids *voltha.IDs, ch chan interface{}) {
-	log.Debug("ReconcileDevices")
+	log.Debugw("ReconcileDevices", log.Fields{"numDevices": len(ids.Items)})
 	var res interface{}
-	if ids != nil {
+	if ids != nil && len(ids.Items) != 0 {
 		toReconcile := len(ids.Items)
 		reconciled := 0
+		var err error
 		for _, id := range ids.Items {
-			//	 Act on the device only if its not present in the agent map
-			if !dMgr.IsDeviceInCache(id.Id) {
-				//	Device Id not in memory
-				log.Debugw("reconciling-device", log.Fields{"id": id.Id})
-				// Proceed with the loading only if the device exist in the Model (could have been deleted)
-				if device, err := dMgr.getDeviceFromModel(id.Id); err == nil {
-					agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
-					if err := agent.start(nil, true); err != nil {
-						log.Warnw("failure-loading-device", log.Fields{"deviceId": id.Id})
-						agent.stop(nil)
-					} else {
-						dMgr.addDeviceAgentToMap(agent)
-						reconciled += 1
-					}
-				} else {
-					reconciled += 1
-				}
+			if err = dMgr.load(id.Id); err != nil {
+				log.Warnw("failure-reconciling-device", log.Fields{"deviceId": id.Id, "error": err})
+			} else {
+				reconciled += 1
 			}
 		}
 		if toReconcile != reconciled {
-			res = status.Errorf(codes.DataLoss, "less-device-reconciled:%d/%d", reconciled, toReconcile)
+			res = status.Errorf(codes.DataLoss, "less-device-reconciled-than-requested:%d/%d", reconciled, toReconcile)
 		}
 	} else {
 		res = status.Errorf(codes.InvalidArgument, "empty-list-of-ids")
@@ -1137,7 +1154,7 @@
 				allChildDeleted = false
 			} else {
 				agent.stop(nil)
-				dMgr.deleteDeviceAgentToMap(agent)
+				dMgr.deleteDeviceAgentFromMap(agent)
 			}
 		}
 	}
@@ -1369,12 +1386,12 @@
 	}
 }
 
-func (dMgr *DeviceManager) GetParentDeviceId(deviceId string) *string {
+func (dMgr *DeviceManager) GetParentDeviceId(deviceId string) string {
 	if device, _ := dMgr.GetDevice(deviceId); device != nil {
 		log.Infow("GetParentDeviceId", log.Fields{"deviceId": device.Id, "parentId": device.ParentId})
-		return &device.ParentId
+		return device.ParentId
 	}
-	return nil
+	return ""
 }
 
 func (dMgr *DeviceManager) simulateAlarm(ctx context.Context, simulatereq *voltha.SimulateAlarmRequest, ch chan interface{}) {