[VOL-1825] Implementation of the reconcile feature in the Core

This commit implements the reconcile feature in the Core as
well as the necessary changes in the Go libraries.  Changes were
also made in the simulated OLT and ONU adapters to react to a
reconcile request following an adapter restart.  These changes
were tested in a 1-node deployment (1 core pair) and a 3-node
deployment (3 core pairs with single adapter instances).  A test
was also run against openolt to verify that the reconcile request
does not cause a panic in either the Core or Openolt, since the
reconcile feature is not implemented in Openolt.
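
For context, the Core-side fan-out follows a simple channel-per-request
pattern: one goroutine per owned device issues the reconcile request
and reports its outcome on a dedicated channel, and the caller waits
for every response or a timeout.  The self-contained sketch below only
illustrates that pattern; the names (sendReconcile, reconcileAll) are
illustrative and are not part of the Core API.

    package main

    import (
        "errors"
        "fmt"
        "time"
    )

    // sendReconcile stands in for the adapter-proxy call; it reports the
    // outcome of a single reconcile request on its own channel.
    func sendReconcile(deviceId string) chan error {
        ch := make(chan error, 1)
        go func() {
            // A real implementation would issue the Reconcile_device RPC here.
            if deviceId == "" {
                ch <- errors.New("empty device id")
                return
            }
            ch <- nil
        }()
        return ch
    }

    // reconcileAll fans out one request per owned device and waits for
    // all of them to complete, aborting after the given timeout.
    func reconcileAll(deviceIds []string, timeout time.Duration) error {
        channels := make([]chan error, 0, len(deviceIds))
        for _, id := range deviceIds {
            channels = append(channels, sendReconcile(id))
        }
        deadline := time.After(timeout)
        for _, ch := range channels {
            select {
            case err := <-ch:
                if err != nil {
                    return err
                }
            case <-deadline:
                return errors.New("reconcile timed out")
            }
        }
        return nil
    }

    func main() {
        fmt.Println(reconcileAll([]string{"olt-1", "onu-1", "onu-2"}, 5*time.Second))
    }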

Change-Id: I178d619fbcfcc6caa0ee3abfd70fc03c384c5fea
diff --git a/rw_core/core/adapter_manager.go b/rw_core/core/adapter_manager.go
index ac856df..0ce1828 100644
--- a/rw_core/core/adapter_manager.go
+++ b/rw_core/core/adapter_manager.go
@@ -93,13 +93,14 @@
 	clusterDataProxy            *model.Proxy
 	adapterProxy                *model.Proxy
 	deviceTypeProxy             *model.Proxy
+	deviceMgr                   *DeviceManager
 	coreInstanceId              string
 	exitChannel                 chan int
 	lockAdaptersMap             sync.RWMutex
 	lockdDeviceTypeToAdapterMap sync.RWMutex
 }
 
-func newAdapterManager(cdProxy *model.Proxy, coreInstanceId string) *AdapterManager {
+func newAdapterManager(cdProxy *model.Proxy, coreInstanceId string, deviceMgr *DeviceManager) *AdapterManager {
 	var adapterMgr AdapterManager
 	adapterMgr.exitChannel = make(chan int, 1)
 	adapterMgr.coreInstanceId = coreInstanceId
@@ -108,6 +109,7 @@
 	adapterMgr.deviceTypeToAdapterMap = make(map[string]string)
 	adapterMgr.lockAdaptersMap = sync.RWMutex{}
 	adapterMgr.lockdDeviceTypeToAdapterMap = sync.RWMutex{}
+	adapterMgr.deviceMgr = deviceMgr
 	return &adapterMgr
 }
 
@@ -169,7 +171,13 @@
 }
 
 //updateAdaptersAndDevicetypesInMemory loads the existing set of adapters and device types in memory
-func (aMgr *AdapterManager) updateAdaptersAndDevicetypesInMemory() {
+func (aMgr *AdapterManager) updateAdaptersAndDevicetypesInMemory(adapter *voltha.Adapter) {
+	if aMgr.getAdapter(adapter.Id) != nil {
+		//	Already registered - Adapter may have restarted.  Trigger the reconcile process for that adapter
+		go aMgr.deviceMgr.adapterRestarted(adapter)
+		return
+	}
+
 	// Update the adapters
 	if adaptersIf := aMgr.clusterDataProxy.List(context.Background(), "/adapters", 0, false, ""); adaptersIf != nil {
 		for _, adapterIf := range adaptersIf.([]interface{}) {
@@ -306,7 +314,8 @@
 	log.Debugw("registerAdapter", log.Fields{"adapter": adapter, "deviceTypes": deviceTypes.Items})
 
 	if aMgr.getAdapter(adapter.Id) != nil {
-		//	Already registered
+		//	Already registered - Adapter may have restarted.  Trigger the reconcile process for that adapter
+		go aMgr.deviceMgr.adapterRestarted(adapter)
 		return &voltha.CoreInstance{InstanceId: aMgr.coreInstanceId}
 	}
 	// Save the adapter and the device types
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index 816c179..41f71a6 100755
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -239,9 +239,19 @@
 	return nil, nil
 }
 
-func (ap *AdapterProxy) ReconcileDevice(device *voltha.Device) error {
-	log.Debug("ReconcileDevice")
-	return nil
+func (ap *AdapterProxy) ReconcileDevice(ctx context.Context, device *voltha.Device) error {
+	log.Debugw("ReconcileDevice", log.Fields{"deviceId": device.Id})
+	rpc := "Reconcile_device"
+	toTopic := ap.getAdapterTopic(device.Adapter)
+	args := []*kafka.KVArg{
+		{Key: "device", Value: device},
+	}
+	// Use a device specific topic as we are the only core handling requests for this device
+	replyToTopic := ap.getCoreTopic()
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	log.Debugw("ReconcileDevice-response", log.Fields{"deviceid": device.Id, "success": success})
+
+	return unPackResponse(rpc, device.Id, success, result)
 }
 
 func (ap *AdapterProxy) AbandonDevice(device voltha.Device) error {
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index 1a00db8..33d14a9 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -148,7 +148,7 @@
 		if txn, err := rhp.acquireRequest(transactionID.Val); err != nil {
 			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
 			// Update our adapters in memory
-			go rhp.adapterMgr.updateAdaptersAndDevicetypesInMemory()
+			go rhp.adapterMgr.updateAdaptersAndDevicetypesInMemory(adapter)
 			// returning nil, nil instructs the callee to ignore this request
 			return nil, nil
 		} else {
@@ -1135,3 +1135,48 @@
 	//}
 	return new(empty.Empty), nil
 }
+
+func (rhp *AdapterRequestHandlerProxy) ReconcileChildDevices(args []*ic.Argument) (*empty.Empty, error) {
+	if len(args) < 2 {
+		log.Warnw("invalid-number-of-args", log.Fields{"args": args})
+		err := errors.New("invalid-number-of-args")
+		return nil, err
+	}
+	parentDeviceId := &voltha.ID{}
+	transactionID := &ic.StrType{}
+	for _, arg := range args {
+		switch arg.Key {
+		case "parent_device_id":
+			if err := ptypes.UnmarshalAny(arg.Value, parentDeviceId); err != nil {
+				log.Warnw("cannot-unmarshal-device-id", log.Fields{"error": err})
+				return nil, err
+			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
+		}
+	}
+	log.Debugw("ReconcileChildDevices", log.Fields{"deviceId": parentDeviceId.Id, "transactionID": transactionID.Val})
+
+	// Try to grab the transaction as this core may be competing with another Core
+	if rhp.competeForTransaction() {
+		if txn, err := rhp.takeRequestOwnership(transactionID.Val, parentDeviceId.Id); err != nil {
+			log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
+			// returning nil, nil instructs the caller to ignore this request
+			return nil, nil
+		} else {
+			defer txn.Close()
+		}
+	}
+
+	if rhp.TestMode { // Execute only for test cases
+		return nil, nil
+	}
+
+	// Run it in its own routine
+	go rhp.deviceMgr.reconcileChildDevices(parentDeviceId.Id)
+
+	return new(empty.Empty), nil
+}
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index d82119c..fca8cce 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -82,8 +82,9 @@
 		log.Fatal("Failure-starting-kafkaMessagingProxy")
 	}
 	log.Debugw("values", log.Fields{"kmp": core.kmp})
-	core.adapterMgr = newAdapterManager(core.clusterDataProxy, core.instanceId)
 	core.deviceMgr = newDeviceManager(core)
+	core.adapterMgr = newAdapterManager(core.clusterDataProxy, core.instanceId, core.deviceMgr)
+	core.deviceMgr.adapterMgr = core.adapterMgr
 	core.logicalDeviceMgr = newLogicalDeviceManager(core, core.deviceMgr, core.kmp, core.clusterDataProxy, core.config.DefaultCoreTimeout)
 
 	if err := core.registerAdapterRequestHandlers(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.clusterDataProxy, core.localDataProxy); err != nil {
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index 43d21f2..d053ac4 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -526,6 +526,105 @@
 	sendResponse(ctx, ch, res)
 }
 
+// isOkToReconcile validates whether a device is in the correct status to be reconciled
+func isOkToReconcile(device *voltha.Device) bool {
+	if device == nil {
+		return false
+	}
+	return device.AdminState != voltha.AdminState_PREPROVISIONED && device.AdminState != voltha.AdminState_DELETED
+}
+
+// adapterRestarted is invoked whenever an adapter is restarted
+func (dMgr *DeviceManager) adapterRestarted(adapter *voltha.Adapter) error {
+	log.Debugw("adapter-restarted", log.Fields{"adapter": adapter.Id})
+
+	// Let's reconcile only the devices managed by this Core
+	rootDeviceIds := dMgr.core.deviceOwnership.GetAllDeviceIdsOwnedByMe()
+	if len(rootDeviceIds) == 0 {
+		log.Debugw("nothing-to-reconcile", log.Fields{"adapterId": adapter.Id})
+		return nil
+	}
+
+	chnlsList := make([]chan interface{}, 0)
+	for _, rootDeviceId := range rootDeviceIds {
+		if rootDevice, _ := dMgr.getDeviceFromModel(rootDeviceId); rootDevice != nil {
+			if rootDevice.Adapter == adapter.Id {
+				if isOkToReconcile(rootDevice) {
+					log.Debugw("reconciling-root-device", log.Fields{"rootId": rootDevice.Id})
+					chnlsList = dMgr.sendReconcileDeviceRequest(rootDevice, chnlsList)
+				} else {
+					log.Debugw("not-reconciling-root-device", log.Fields{"rootId": rootDevice.Id, "state": rootDevice.AdminState})
+				}
+			} else { // Should we be reconciling the root's children instead?
+			childManagedByAdapter:
+				for _, port := range rootDevice.Ports {
+					for _, peer := range port.Peers {
+						if childDevice, _ := dMgr.getDeviceFromModel(peer.DeviceId); childDevice != nil {
+							if childDevice.Adapter == adapter.Id {
+								if isOkToReconcile(childDevice) {
+									log.Debugw("reconciling-child-device", log.Fields{"childId": childDevice.Id})
+									chnlsList = dMgr.sendReconcileDeviceRequest(childDevice, chnlsList)
+								} else {
+									log.Debugw("not-reconciling-child-device", log.Fields{"childId": childDevice.Id, "state": childDevice.AdminState})
+								}
+							} else {
+								// All child devices under a parent device are typically managed by the same adapter type,
+								// so we only need to check whether the first child device we retrieve is managed by that adapter.
+								break childManagedByAdapter
+							}
+						}
+					}
+				}
+			}
+		}
+	}
+	if len(chnlsList) > 0 {
+		// Wait for completion
+		if res := utils.WaitForNilOrErrorResponses(dMgr.defaultTimeout, chnlsList...); res != nil {
+			return status.Errorf(codes.Aborted, "errors-%s", res)
+		}
+	} else {
+		log.Debugw("no-managed-device-to-reconcile", log.Fields{"adapterId": adapter.Id})
+	}
+	return nil
+}
+
+func (dMgr *DeviceManager) sendReconcileDeviceRequest(device *voltha.Device, chnlsList []chan interface{}) []chan interface{} {
+	// Send a reconcile request to the adapter.  Since this Core may not be managing this device, there is
+	// no point in creating a device agent before sending the request.  We therefore bypass the device agent
+	// and send the request directly to the adapter via the adapter_proxy.
+	ch := make(chan interface{})
+	chnlsList = append(chnlsList, ch)
+	go func(device *voltha.Device) {
+		if err := dMgr.adapterProxy.ReconcileDevice(context.Background(), device); err != nil {
+			log.Errorw("reconcile-request-failed", log.Fields{"deviceId": device.Id, "error": err})
+			ch <- status.Errorf(codes.Internal, "device: %s", device.Id)
+			return
+		}
+		ch <- nil
+	}(device)
+
+	return chnlsList
+}
+
+func (dMgr *DeviceManager) reconcileChildDevices(parentDeviceId string) error {
+	if parentDevice, _ := dMgr.getDeviceFromModel(parentDeviceId); parentDevice != nil {
+		chnlsList := make([]chan interface{}, 0)
+		for _, port := range parentDevice.Ports {
+			for _, peer := range port.Peers {
+				if childDevice, _ := dMgr.getDeviceFromModel(peer.DeviceId); childDevice != nil {
+					chnlsList = dMgr.sendReconcileDeviceRequest(childDevice, chnlsList)
+				}
+			}
+		}
+		// Wait for completion
+		if res := utils.WaitForNilOrErrorResponses(dMgr.defaultTimeout, chnlsList...); res != nil {
+			return status.Errorf(codes.Aborted, "errors-%s", res)
+		}
+	}
+	return nil
+}
+
 func (dMgr *DeviceManager) updateDevice(device *voltha.Device) error {
 	log.Debugw("updateDevice", log.Fields{"deviceid": device.Id, "device": device})
 	if agent := dMgr.getDeviceAgent(device.Id); agent != nil {
diff --git a/rw_core/core/device_ownership.go b/rw_core/core/device_ownership.go
index 6efbdb8..4b30188 100644
--- a/rw_core/core/device_ownership.go
+++ b/rw_core/core/device_ownership.go
@@ -173,6 +173,19 @@
 	return status.Error(codes.NotFound, fmt.Sprintf("id-inexistent-%s", id))
 }
 
+// GetAllDeviceIdsOwnedByMe returns all the device ids (root device ids) that are owned by this Core
+func (da *DeviceOwnership) GetAllDeviceIdsOwnedByMe() []string {
+	deviceIds := []string{}
+	da.deviceMapLock.Lock()
+	defer da.deviceMapLock.Unlock()
+	for _, ownership := range da.deviceMap {
+		if ownership.owned {
+			deviceIds = append(deviceIds, ownership.id)
+		}
+	}
+	return deviceIds
+}
+
 // OwnedByMe returns whether this Core instance actively owns this device.   This function will automatically
 // trigger the process to monitor the device and update the device ownership regularly.
 func (da *DeviceOwnership) OwnedByMe(id interface{}) bool {
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 70349d8..941cb3b 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -80,13 +80,8 @@
 	log.Infow("starting-logical_device-agent", log.Fields{"logicaldeviceId": agent.logicalDeviceId, "loadFromdB": loadFromdB})
 	var ld *voltha.LogicalDevice
 	if !loadFromdB {
-		//Build the logical device based on information retrieved from the device adapter
-		var switchCap *ic.SwitchCapability
 		var err error
-		if switchCap, err = agent.deviceMgr.getSwitchCapability(ctx, agent.rootDeviceId); err != nil {
-			log.Errorw("error-creating-logical-device", log.Fields{"error": err})
-			return err
-		}
+		// First create and store the logical device
 		ld = &voltha.LogicalDevice{Id: agent.logicalDeviceId, RootDeviceId: agent.rootDeviceId}
 
 		// Create the datapath ID (uint64) using the logical device ID (based on the MAC Address)
@@ -96,9 +91,6 @@
 			return err
 		}
 		ld.DatapathId = datapathID
-		ld.Desc = (proto.Clone(switchCap.Desc)).(*ofp.OfpDesc)
-		log.Debugw("Switch-capability", log.Fields{"Desc": ld.Desc, "fromAd": switchCap.Desc})
-		ld.SwitchFeatures = (proto.Clone(switchCap.SwitchFeatures)).(*ofp.OfpSwitchFeatures)
 		ld.Flows = &ofp.Flows{Items: nil}
 		ld.FlowGroups = &ofp.FlowGroups{Items: nil}
 
@@ -111,6 +103,25 @@
 		}
 		agent.lockLogicalDevice.Unlock()
 
+		//Retrieve the switch capability from the device adapter
+		var switchCap *ic.SwitchCapability
+		if switchCap, err = agent.deviceMgr.getSwitchCapability(ctx, agent.rootDeviceId); err != nil {
+			log.Errorw("error-creating-logical-device", log.Fields{"error": err})
+			return err
+		}
+
+		// Save the switch capability against the stored logical device
+		agent.lockLogicalDevice.Lock()
+		if ld, err = agent.getLogicalDeviceWithoutLock(); err != nil {
+			log.Warnw("failed-to-load-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
+			agent.lockLogicalDevice.Unlock()
+			return err
+		}
+		clonedLd := (proto.Clone(ld)).(*voltha.LogicalDevice)
+		clonedLd.Desc = (proto.Clone(switchCap.Desc)).(*ofp.OfpDesc)
+		log.Debugw("Switch-capability", log.Fields{"Desc": clonedLd.Desc})
+		clonedLd.SwitchFeatures = (proto.Clone(switchCap.SwitchFeatures)).(*ofp.OfpSwitchFeatures)
+		if err = agent.updateLogicalDeviceWithoutLock(clonedLd); err != nil {
+			log.Errorw("failed-to-update-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceId, "error": err})
+			agent.lockLogicalDevice.Unlock()
+			return err
+		}
+		agent.lockLogicalDevice.Unlock()
+
 		// TODO:  Set the logical ports in a separate call once the port update issue is fixed.
 		go agent.setupLogicalPorts(ctx)
 
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index b871cd4..a097736 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -134,23 +134,12 @@
 	return result, nil
 }
 
+//listLogicalDevices returns the list of all logical devices
 func (ldMgr *LogicalDeviceManager) listLogicalDevices() (*voltha.LogicalDevices, error) {
 	log.Debug("ListAllLogicalDevices")
 	result := &voltha.LogicalDevices{}
 	if logicalDevices := ldMgr.clusterDataProxy.List(context.Background(), "/logical_devices", 0, false, ""); logicalDevices != nil {
 		for _, logicalDevice := range logicalDevices.([]interface{}) {
-			if agent := ldMgr.getLogicalDeviceAgent(logicalDevice.(*voltha.LogicalDevice).Id); agent == nil {
-				agent = newLogicalDeviceAgent(
-					logicalDevice.(*voltha.LogicalDevice).Id,
-					logicalDevice.(*voltha.LogicalDevice).RootDeviceId,
-					ldMgr,
-					ldMgr.deviceMgr,
-					ldMgr.clusterDataProxy,
-					ldMgr.defaultTimeout,
-				)
-				ldMgr.addLogicalDeviceAgentToMap(agent)
-				go agent.start(nil, true)
-			}
 			result.Items = append(result.Items, logicalDevice.(*voltha.LogicalDevice))
 		}
 	}