[VOL-1462] Sync data between two voltha cores in the same pair

This commit consists of the following updates:
1) Background data syncing between two cores after a transaction
is completed by one core.
2) Add transaction management to southbound APIs (adapter facing).
This is enabled got adapter registration only for now.
3) Fix an issue with flow decomposition
4) Add the rough-in to allow a packet to be send to an OFAgent
with a transaction ID.  Two cores can therefore send the same
packet and let the OFAgent discard the duplicate.  The work in
OFAgent remains.
5) Cleanups

Change-Id: Ibe9d75edb66cfd6a0954bdfeb16a7e7c8a3c53b6
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index cd662ac..88b0c7d 100644
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -34,7 +34,7 @@
 type DeviceManager struct {
 	deviceAgents        map[string]*DeviceAgent
 	adapterProxy        *AdapterProxy
-	adapterMgr *AdapterManager
+	adapterMgr          *AdapterManager
 	logicalDeviceMgr    *LogicalDeviceManager
 	kafkaICProxy        *kafka.InterContainerProxy
 	stateTransitions    *TransitionMap
@@ -52,7 +52,7 @@
 	deviceMgr.kafkaICProxy = kafkaICProxy
 	deviceMgr.coreInstanceId = coreInstanceId
 	deviceMgr.clusterDataProxy = cdProxy
-	deviceMgr.adapterMgr= adapterMgr
+	deviceMgr.adapterMgr = adapterMgr
 	deviceMgr.lockDeviceAgentsMap = sync.RWMutex{}
 	return &deviceMgr
 }
@@ -96,16 +96,27 @@
 	delete(dMgr.deviceAgents, agent.deviceId)
 }
 
+// getDeviceAgent returns the agent managing the device.  If the device is not in memory, it will loads it, if it exists
 func (dMgr *DeviceManager) getDeviceAgent(deviceId string) *DeviceAgent {
-	// TODO If the device is not in memory it needs to be loaded first
 	dMgr.lockDeviceAgentsMap.Lock()
-	defer dMgr.lockDeviceAgentsMap.Unlock()
 	if agent, ok := dMgr.deviceAgents[deviceId]; ok {
+		dMgr.lockDeviceAgentsMap.Unlock()
 		return agent
+	} else {
+		//	Try to load into memory - loading will also create the device agent
+		dMgr.lockDeviceAgentsMap.Unlock()
+		if err := dMgr.load(deviceId); err == nil {
+			dMgr.lockDeviceAgentsMap.Lock()
+			defer dMgr.lockDeviceAgentsMap.Unlock()
+			if agent, ok = dMgr.deviceAgents[deviceId]; ok {
+				return agent
+			}
+		}
 	}
 	return nil
 }
 
+// listDeviceIdsFromMap returns the list of device IDs that are in memory
 func (dMgr *DeviceManager) listDeviceIdsFromMap() *voltha.IDs {
 	dMgr.lockDeviceAgentsMap.Lock()
 	defer dMgr.lockDeviceAgentsMap.Unlock()
@@ -122,7 +133,7 @@
 	// Create and start a device agent for that device
 	agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy)
 	dMgr.addDeviceAgentToMap(agent)
-	agent.start(ctx)
+	agent.start(ctx, false)
 
 	sendResponse(ctx, ch, agent.lastData)
 }
@@ -133,8 +144,6 @@
 	if agent := dMgr.getDeviceAgent(id.Id); agent != nil {
 		res = agent.enableDevice(ctx)
 		log.Debugw("EnableDevice-result", log.Fields{"result": res})
-	} else {
-		res = status.Errorf(codes.NotFound, "%s", id.Id)
 	}
 
 	sendResponse(ctx, ch, res)
@@ -181,6 +190,7 @@
 	sendResponse(ctx, ch, res)
 }
 
+// GetDevice will returns a device, either from memory or from the dB, if present
 func (dMgr *DeviceManager) GetDevice(id string) (*voltha.Device, error) {
 	log.Debugw("GetDevice", log.Fields{"deviceid": id})
 	if agent := dMgr.getDeviceAgent(id); agent != nil {
@@ -189,6 +199,13 @@
 	return nil, status.Errorf(codes.NotFound, "%s", id)
 }
 
+func (dMgr *DeviceManager) IsDeviceInCache(id string) bool {
+	dMgr.lockDeviceAgentsMap.Lock()
+	defer dMgr.lockDeviceAgentsMap.Unlock()
+	_, exist := dMgr.deviceAgents[id]
+	return exist
+}
+
 func (dMgr *DeviceManager) IsRootDevice(id string) (bool, error) {
 	device, err := dMgr.GetDevice(id)
 	if err != nil {
@@ -203,10 +220,15 @@
 	result := &voltha.Devices{}
 	if devices := dMgr.clusterDataProxy.List("/devices", 0, false, ""); devices != nil {
 		for _, device := range devices.([]interface{}) {
-			if agent := dMgr.getDeviceAgent(device.(*voltha.Device).Id); agent == nil {
-				agent = newDeviceAgent(dMgr.adapterProxy, device.(*voltha.Device), dMgr, dMgr.clusterDataProxy)
-				dMgr.addDeviceAgentToMap(agent)
-				agent.start(nil)
+			// If device is not in memory then set it up
+			if !dMgr.IsDeviceInCache(device.(*voltha.Device).Id) {
+				agent := newDeviceAgent(dMgr.adapterProxy, device.(*voltha.Device), dMgr, dMgr.clusterDataProxy)
+				if err := agent.start(nil, true); err != nil {
+					log.Warnw("failure-starting-agent", log.Fields{"deviceId": device.(*voltha.Device).Id})
+					agent.stop(nil)
+				} else {
+					dMgr.addDeviceAgentToMap(agent)
+				}
 			}
 			result.Items = append(result.Items, device.(*voltha.Device))
 		}
@@ -214,6 +236,97 @@
 	return result, nil
 }
 
+// loadDevice loads the deviceId in memory, if not present
+func (dMgr *DeviceManager) loadDevice(deviceId string) (*DeviceAgent, error) {
+	log.Debugw("loading-device", log.Fields{"deviceId": deviceId})
+	// Sanity check
+	if deviceId == "" {
+		return nil, status.Error(codes.InvalidArgument, "deviceId empty")
+	}
+	if !dMgr.IsDeviceInCache(deviceId) {
+		agent := newDeviceAgent(dMgr.adapterProxy, &voltha.Device{Id: deviceId}, dMgr, dMgr.clusterDataProxy)
+		if err := agent.start(nil, true); err != nil {
+			agent.stop(nil)
+			return nil, err
+		}
+		dMgr.addDeviceAgentToMap(agent)
+	}
+	if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
+		return agent, nil
+	}
+	return nil, status.Error(codes.NotFound, deviceId)  // This should nto happen
+}
+
+// loadRootDeviceParentAndChildren loads the children and parents of a root device in memory
+func (dMgr *DeviceManager) loadRootDeviceParentAndChildren(device *voltha.Device) error {
+	log.Debugw("loading-parent-and-children", log.Fields{"deviceId": device.Id})
+	if device.Root {
+		// Scenario A
+		if device.ParentId != "" {
+			//	 Load logical device if needed.
+			if err := dMgr.logicalDeviceMgr.load(device.ParentId); err != nil {
+				log.Warnw("failure-loading-logical-device", log.Fields{"lDeviceId": device.ParentId})
+			}
+		} else {
+			log.Debugw("no-parent-to-load", log.Fields{"deviceId": device.Id})
+		}
+		//	Load all child devices, if needed
+		if childDeviceIds, err := dMgr.getAllChildDeviceIds(device); err == nil {
+			for _, childDeviceId := range childDeviceIds {
+				if _, err := dMgr.loadDevice(childDeviceId); err != nil {
+					log.Warnw("failure-loading-device", log.Fields{"deviceId": childDeviceId})
+					return err
+				}
+			}
+			log.Debugw("loaded-children", log.Fields{"deviceId": device.Id, "numChildren": len(childDeviceIds)})
+		} else {
+			log.Debugw("no-child-to-load", log.Fields{"deviceId": device.Id})
+		}
+	}
+	return nil
+}
+
+// load loads the deviceId in memory, if not present, and also loads its accompanying parents and children.  Loading
+// in memory is for improved performance.  It is not imperative that a device needs to be in memory when a request
+// acting on the device is received by the core. In such a scenario, the Core will load the device in memory first
+// and the proceed with the request.
+func (dMgr *DeviceManager) load(deviceId string) error {
+	log.Debug("load...")
+	// First load the device - this may fail in case the device was deleted intentionally by the other core
+	var dAgent *DeviceAgent
+	var err error
+	if dAgent, err = dMgr.loadDevice(deviceId); err != nil {
+		log.Warnw("failure-loading-device", log.Fields{"deviceId": deviceId})
+		return err
+	}
+	// Get the loaded device details
+	var device *voltha.Device
+	if device, err = dAgent.getDevice(); err != nil {
+		return err
+	}
+
+	// If the device is in Pre-provisioning or deleted state stop here
+	if device.AdminState == voltha.AdminState_PREPROVISIONED || device.AdminState == voltha.AdminState_DELETED {
+		return nil
+	}
+
+	// Now we face two scenarios
+	if device.Root {
+		// Load all children as well as the parent of this device (logical_device)
+		if err := dMgr.loadRootDeviceParentAndChildren(device); err != nil {
+			log.Warnw("failure-loading-device-parent-and-children", log.Fields{"deviceId": deviceId})
+			return err
+		}
+		log.Debugw("successfully-loaded-parent-and-children", log.Fields{"deviceId": deviceId})
+	} else {
+		//	Scenario B - use the parentId of that device (root device) to trigger the loading
+		if device.ParentId != "" {
+			return dMgr.load(device.ParentId)
+		}
+	}
+	return nil
+}
+
 // ListDeviceIds retrieves the latest device IDs information from the data model (memory data only)
 func (dMgr *DeviceManager) ListDeviceIds() (*voltha.IDs, error) {
 	log.Debug("ListDeviceIDs")
@@ -230,17 +343,17 @@
 		reconciled := 0
 		for _, id := range ids.Items {
 			//	 Act on the device only if its not present in the agent map
-			if agent := dMgr.getDeviceAgent(id.Id); agent == nil {
+			if !dMgr.IsDeviceInCache(id.Id) {
 				//	Device Id not in memory
 				log.Debugw("reconciling-device", log.Fields{"id": id.Id})
-				// Load device from model
-				if device := dMgr.clusterDataProxy.Get("/devices/"+id.Id, 0, false, ""); device != nil {
-					agent = newDeviceAgent(dMgr.adapterProxy, device.(*voltha.Device), dMgr, dMgr.clusterDataProxy)
-					dMgr.addDeviceAgentToMap(agent)
-					agent.start(nil)
-					reconciled += 1
+				// Load device from dB
+				agent := newDeviceAgent(dMgr.adapterProxy, &voltha.Device{Id:id.Id}, dMgr, dMgr.clusterDataProxy)
+				if err := agent.start(nil, true); err != nil {
+					log.Warnw("failure-loading-device", log.Fields{"deviceId": id.Id})
+					agent.stop(nil)
 				} else {
-					log.Warnw("device-inexistent", log.Fields{"id": id.Id})
+					dMgr.addDeviceAgentToMap(agent)
+					reconciled += 1
 				}
 			} else {
 				reconciled += 1
@@ -393,7 +506,7 @@
 	// Create and start a device agent for that device
 	agent := newDeviceAgent(dMgr.adapterProxy, childDevice, dMgr, dMgr.clusterDataProxy)
 	dMgr.addDeviceAgentToMap(agent)
-	agent.start(nil)
+	agent.start(nil, false)
 
 	// Activate the child device
 	if agent := dMgr.getDeviceAgent(agent.deviceId); agent != nil {
@@ -430,7 +543,7 @@
 	return status.Errorf(codes.NotFound, "%s", deviceId)
 }
 
-func (dMgr *DeviceManager) PacketIn(deviceId string, port uint32, packet []byte) error {
+func (dMgr *DeviceManager) PacketIn(deviceId string, port uint32, transactionId string, packet []byte) error {
 	log.Debugw("PacketIn", log.Fields{"deviceId": deviceId, "port": port})
 	// Get the logical device Id based on the deviceId
 	var device *voltha.Device
@@ -444,7 +557,7 @@
 		return status.Errorf(codes.FailedPrecondition, "%s", deviceId)
 	}
 
-	if err := dMgr.logicalDeviceMgr.packetIn(device.ParentId, port, packet); err != nil {
+	if err := dMgr.logicalDeviceMgr.packetIn(device.ParentId, port, transactionId, packet); err != nil {
 		return err
 	}
 	return nil
@@ -576,6 +689,23 @@
 	return childDeviceIds, nil
 }
 
+//getAllChildDevices is a helper method to get all the child device IDs from the device passed as parameter
+func (dMgr *DeviceManager) getAllChildDevices(parentDeviceId string) (*voltha.Devices, error) {
+	log.Debugw("getAllChildDevices", log.Fields{"parentDeviceId": parentDeviceId})
+	if parentDevice, err := dMgr.GetDevice(parentDeviceId); err == nil {
+		childDevices := make([]*voltha.Device, 0)
+		if childDeviceIds, er := dMgr.getAllChildDeviceIds(parentDevice); er == nil {
+			for _, deviceId := range childDeviceIds {
+				if d, e := dMgr.GetDevice(deviceId); e == nil && d != nil {
+					childDevices = append(childDevices, d)
+				}
+			}
+		}
+		return &voltha.Devices{Items: childDevices}, nil
+	}
+	return nil, status.Errorf(codes.NotFound, "%s", parentDeviceId)
+}
+
 func (dMgr *DeviceManager) addUNILogicalPort(cDevice *voltha.Device) error {
 	log.Info("addUNILogicalPort")
 	if err := dMgr.logicalDeviceMgr.addUNILogicalPort(nil, cDevice); err != nil {
@@ -585,7 +715,6 @@
 	return nil
 }
 
-
 func (dMgr *DeviceManager) downloadImage(ctx context.Context, img *voltha.ImageDownload, ch chan interface{}) {
 	log.Debugw("downloadImage", log.Fields{"deviceid": img.Id, "imageName": img.Name})
 	var res interface{}
@@ -661,7 +790,6 @@
 	sendResponse(ctx, ch, res)
 }
 
-
 func (dMgr *DeviceManager) updateImageDownload(deviceId string, img *voltha.ImageDownload) error {
 	log.Debugw("updateImageDownload", log.Fields{"deviceid": img.Id, "imageName": img.Name})
 	if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
@@ -691,7 +819,6 @@
 	return nil, status.Errorf(codes.NotFound, "%s", deviceId)
 }
 
-
 func (dMgr *DeviceManager) activateDevice(cDevice *voltha.Device) error {
 	log.Info("activateDevice")
 	return nil