[VOL-1890] Flow decomposition fails after a core switchover

Cherry-picked into the master branch from voltha-2.1.

Change-Id: I84c3a83b5b9115d6ec334af29634486ba7148634
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index 71843ff..235aca5 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -30,15 +30,17 @@
 )
 
 type LogicalDeviceManager struct {
-	logicalDeviceAgents sync.Map
-	core                *Core
-	deviceMgr           *DeviceManager
-	grpcNbiHdlr         *APIHandler
-	adapterProxy        *AdapterProxy
-	kafkaICProxy        *kafka.InterContainerProxy
-	clusterDataProxy    *model.Proxy
-	exitChannel         chan int
-	defaultTimeout      int64
+	logicalDeviceAgents            sync.Map
+	core                           *Core
+	deviceMgr                      *DeviceManager
+	grpcNbiHdlr                    *APIHandler
+	adapterProxy                   *AdapterProxy
+	kafkaICProxy                   *kafka.InterContainerProxy
+	clusterDataProxy               *model.Proxy
+	exitChannel                    chan int
+	defaultTimeout                 int64
+	logicalDevicesLoadingLock      sync.RWMutex
+	logicalDeviceLoadingInProgress map[string][]chan int
 }
 
 func newLogicalDeviceManager(core *Core, deviceMgr *DeviceManager, kafkaICProxy *kafka.InterContainerProxy, cdProxy *model.Proxy, timeout int64) *LogicalDeviceManager {
@@ -49,6 +51,8 @@
 	logicalDeviceMgr.kafkaICProxy = kafkaICProxy
 	logicalDeviceMgr.clusterDataProxy = cdProxy
 	logicalDeviceMgr.defaultTimeout = timeout
+	logicalDeviceMgr.logicalDevicesLoadingLock = sync.RWMutex{}
+	logicalDeviceMgr.logicalDeviceLoadingInProgress = make(map[string][]chan int)
 	return &logicalDeviceMgr
 }
 
@@ -205,23 +209,49 @@
 
 // load loads a logical device manager in memory
 func (ldMgr *LogicalDeviceManager) load(lDeviceId string) error {
-	log.Debugw("loading-logical-device", log.Fields{"lDeviceId": lDeviceId})
-	// To prevent a race condition, let's hold the logical device agent map lock.  This will prevent a loading and
-	// a create logical device callback from occurring at the same time.
-	if ldAgent, _ := ldMgr.logicalDeviceAgents.Load(lDeviceId); ldAgent == nil {
-		// Proceed with the loading only if the logical device exist in the Model (could have been deleted)
-		if _, err := ldMgr.getLogicalDeviceFromModel(lDeviceId); err == nil {
-			// Create a temp logical device Agent and let it load from memory
-			agent := newLogicalDeviceAgent(lDeviceId, "", ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy, ldMgr.defaultTimeout)
-			if err := agent.start(nil, true); err != nil {
-				agent.stop(nil)
-				return err
-			}
-			ldMgr.logicalDeviceAgents.Store(agent.logicalDeviceId, agent)
-		}
+	if lDeviceId == "" {
+		return nil
 	}
-	// TODO: load the child device
-	return nil
+	// Add a lock to prevent two concurrent calls from loading the same device twice
+	ldMgr.logicalDevicesLoadingLock.Lock()
+	if _, exist := ldMgr.logicalDeviceLoadingInProgress[lDeviceId]; !exist {
+		if ldAgent, _ := ldMgr.logicalDeviceAgents.Load(lDeviceId); ldAgent == nil {
+			ldMgr.logicalDeviceLoadingInProgress[lDeviceId] = []chan int{make(chan int, 1)}
+			ldMgr.logicalDevicesLoadingLock.Unlock()
+			if _, err := ldMgr.getLogicalDeviceFromModel(lDeviceId); err == nil {
+				log.Debugw("loading-logical-device", log.Fields{"lDeviceId": lDeviceId})
+				agent := newLogicalDeviceAgent(lDeviceId, "", ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy, ldMgr.defaultTimeout)
+				if err := agent.start(nil, true); err != nil {
+					agent.stop(nil)
+				} else {
+					ldMgr.logicalDeviceAgents.Store(agent.logicalDeviceId, agent)
+				}
+			} else {
+				log.Debugw("logicalDevice not in model", log.Fields{"lDeviceId": lDeviceId})
+			}
+			// announce completion of task to any number of waiting channels
+			ldMgr.logicalDevicesLoadingLock.Lock()
+			if v, ok := ldMgr.logicalDeviceLoadingInProgress[lDeviceId]; ok {
+				for _, ch := range v {
+					close(ch)
+				}
+				delete(ldMgr.logicalDeviceLoadingInProgress, lDeviceId)
+			}
+			ldMgr.logicalDevicesLoadingLock.Unlock()
+		} else {
+			ldMgr.logicalDevicesLoadingLock.Unlock()
+		}
+	} else {
+		ch := make(chan int, 1)
+		ldMgr.logicalDeviceLoadingInProgress[lDeviceId] = append(ldMgr.logicalDeviceLoadingInProgress[lDeviceId], ch)
+		ldMgr.logicalDevicesLoadingLock.Unlock()
+		//	Wait for the channel to be closed, implying the process loading this device is done.
+		<-ch
+	}
+	if _, exist := ldMgr.logicalDeviceAgents.Load(lDeviceId); exist {
+		return nil
+	}
+	return status.Errorf(codes.Aborted, "Error loading logical device %s", lDeviceId)
 }
 
 func (ldMgr *LogicalDeviceManager) deleteLogicalDevice(ctx context.Context, device *voltha.Device) error {
@@ -416,11 +446,11 @@
 
 	log.Debugw("setupUNILogicalPorts", log.Fields{"logDeviceId": logDeviceId, "parentId": parentId})
 
-	if parentId == "" || logDeviceId == nil || *logDeviceId == "" {
+	if parentId == "" || logDeviceId == "" {
 		return errors.New("device-in-invalid-state")
 	}
 
-	if agent := ldMgr.getLogicalDeviceAgent(*logDeviceId); agent != nil {
+	if agent := ldMgr.getLogicalDeviceAgent(logDeviceId); agent != nil {
 		if err := agent.setupUNILogicalPorts(ctx, childDevice); err != nil {
 			return err
 		}