[VOL-1890] Flow decomposition fails after a core switch over

This commit fixes the issue reported in VOL-1890 by doing
the following:
1) Update the device graph if the device graph was built
with an older logical device topology (this can happen in
the standby core as currently the model does not trigger
a core callback after it updates its data in memory following
a watch event from the KV store - this is a TODO item).
2) Update flows in a logical device by updating the flows
directly instead of going through the data model proxy.
3) Reconcile the list of devices and logical devices in the
core memory after a restart upon receiving a reconcile request
from the api server.

After first review:
1) Update the code as per the comments
2) Change the device loading channel so that two simultaneous
routines invoking the device loading for the same device
return the same result (i.e. if it's a success then both
should return success, and conversely in the failure case)

After second review:
Change the sync map controlling the device in loading state
to a regular map.

Change-Id: I4331ac2a8f87a7de272e919a31dfe4bbaaf327a0
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index 71843ff..235aca5 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -30,15 +30,17 @@
 )
 
 type LogicalDeviceManager struct {
-	logicalDeviceAgents sync.Map
-	core                *Core
-	deviceMgr           *DeviceManager
-	grpcNbiHdlr         *APIHandler
-	adapterProxy        *AdapterProxy
-	kafkaICProxy        *kafka.InterContainerProxy
-	clusterDataProxy    *model.Proxy
-	exitChannel         chan int
-	defaultTimeout      int64
+	logicalDeviceAgents            sync.Map
+	core                           *Core
+	deviceMgr                      *DeviceManager
+	grpcNbiHdlr                    *APIHandler
+	adapterProxy                   *AdapterProxy
+	kafkaICProxy                   *kafka.InterContainerProxy
+	clusterDataProxy               *model.Proxy
+	exitChannel                    chan int
+	defaultTimeout                 int64
+	logicalDevicesLoadingLock      sync.RWMutex
+	logicalDeviceLoadingInProgress map[string][]chan int
 }
 
 func newLogicalDeviceManager(core *Core, deviceMgr *DeviceManager, kafkaICProxy *kafka.InterContainerProxy, cdProxy *model.Proxy, timeout int64) *LogicalDeviceManager {
@@ -49,6 +51,8 @@
 	logicalDeviceMgr.kafkaICProxy = kafkaICProxy
 	logicalDeviceMgr.clusterDataProxy = cdProxy
 	logicalDeviceMgr.defaultTimeout = timeout
+	logicalDeviceMgr.logicalDevicesLoadingLock = sync.RWMutex{}
+	logicalDeviceMgr.logicalDeviceLoadingInProgress = make(map[string][]chan int)
 	return &logicalDeviceMgr
 }
 
@@ -205,23 +209,49 @@
 
 // load loads a logical device manager in memory
 func (ldMgr *LogicalDeviceManager) load(lDeviceId string) error {
-	log.Debugw("loading-logical-device", log.Fields{"lDeviceId": lDeviceId})
-	// To prevent a race condition, let's hold the logical device agent map lock.  This will prevent a loading and
-	// a create logical device callback from occurring at the same time.
-	if ldAgent, _ := ldMgr.logicalDeviceAgents.Load(lDeviceId); ldAgent == nil {
-		// Proceed with the loading only if the logical device exist in the Model (could have been deleted)
-		if _, err := ldMgr.getLogicalDeviceFromModel(lDeviceId); err == nil {
-			// Create a temp logical device Agent and let it load from memory
-			agent := newLogicalDeviceAgent(lDeviceId, "", ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy, ldMgr.defaultTimeout)
-			if err := agent.start(nil, true); err != nil {
-				agent.stop(nil)
-				return err
-			}
-			ldMgr.logicalDeviceAgents.Store(agent.logicalDeviceId, agent)
-		}
+	if lDeviceId == "" {
+		return nil
 	}
-	// TODO: load the child device
-	return nil
+	// Add a lock to prevent two concurrent calls from loading the same device twice
+	ldMgr.logicalDevicesLoadingLock.Lock()
+	if _, exist := ldMgr.logicalDeviceLoadingInProgress[lDeviceId]; !exist {
+		if ldAgent, _ := ldMgr.logicalDeviceAgents.Load(lDeviceId); ldAgent == nil {
+			ldMgr.logicalDeviceLoadingInProgress[lDeviceId] = []chan int{make(chan int, 1)}
+			ldMgr.logicalDevicesLoadingLock.Unlock()
+			if _, err := ldMgr.getLogicalDeviceFromModel(lDeviceId); err == nil {
+				log.Debugw("loading-logical-device", log.Fields{"lDeviceId": lDeviceId})
+				agent := newLogicalDeviceAgent(lDeviceId, "", ldMgr, ldMgr.deviceMgr, ldMgr.clusterDataProxy, ldMgr.defaultTimeout)
+				if err := agent.start(nil, true); err != nil {
+					agent.stop(nil)
+				} else {
+					ldMgr.logicalDeviceAgents.Store(agent.logicalDeviceId, agent)
+				}
+			} else {
+				log.Debugw("logicalDevice not in model", log.Fields{"lDeviceId": lDeviceId})
+			}
+			// announce completion of task to any number of waiting channels
+			ldMgr.logicalDevicesLoadingLock.Lock()
+			if v, ok := ldMgr.logicalDeviceLoadingInProgress[lDeviceId]; ok {
+				for _, ch := range v {
+					close(ch)
+				}
+				delete(ldMgr.logicalDeviceLoadingInProgress, lDeviceId)
+			}
+			ldMgr.logicalDevicesLoadingLock.Unlock()
+		} else {
+			ldMgr.logicalDevicesLoadingLock.Unlock()
+		}
+	} else {
+		ch := make(chan int, 1)
+		ldMgr.logicalDeviceLoadingInProgress[lDeviceId] = append(ldMgr.logicalDeviceLoadingInProgress[lDeviceId], ch)
+		ldMgr.logicalDevicesLoadingLock.Unlock()
+		//	Wait for the channel to be closed, implying the process loading this device is done.
+		<-ch
+	}
+	if _, exist := ldMgr.logicalDeviceAgents.Load(lDeviceId); exist {
+		return nil
+	}
+	return status.Errorf(codes.Aborted, "Error loading logical device %s", lDeviceId)
 }
 
 func (ldMgr *LogicalDeviceManager) deleteLogicalDevice(ctx context.Context, device *voltha.Device) error {
@@ -416,11 +446,11 @@
 
 	log.Debugw("setupUNILogicalPorts", log.Fields{"logDeviceId": logDeviceId, "parentId": parentId})
 
-	if parentId == "" || logDeviceId == nil || *logDeviceId == "" {
+	if parentId == "" || logDeviceId == "" {
 		return errors.New("device-in-invalid-state")
 	}
 
-	if agent := ldMgr.getLogicalDeviceAgent(*logDeviceId); agent != nil {
+	if agent := ldMgr.getLogicalDeviceAgent(logDeviceId); agent != nil {
 		if err := agent.setupUNILogicalPorts(ctx, childDevice); err != nil {
 			return err
 		}