VOL-2226 Reconcile the device agent when taking over a device from a failed core.
Remove lastData from the agent struct and use deviceId/deviceType instead.

Change-Id: I5321a4cf29c61a965f52cfada708604391947a1b
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 97c0b2d..e01d419 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -40,7 +40,6 @@
 	parentId         string
 	deviceType       string
 	isRootdevice     bool
-	lastData         *voltha.Device
 	adapterProxy     *AdapterProxy
 	adapterMgr       *AdapterManager
 	deviceMgr        *DeviceManager
@@ -51,28 +50,19 @@
 	defaultTimeout   int64
 }
 
-//newDeviceAgent creates a new device agent along as creating a unique ID for the device and set the device state to
-//preprovisioning
+//newDeviceAgent creates a new device agent. The device will be initialized when start() is called.
 func newDeviceAgent(ap *AdapterProxy, device *voltha.Device, deviceMgr *DeviceManager, cdProxy *model.Proxy, timeout int64) *DeviceAgent {
 	var agent DeviceAgent
 	agent.adapterProxy = ap
-	cloned := (proto.Clone(device)).(*voltha.Device)
-	if cloned.Id == "" {
-		cloned.Id = CreateDeviceId()
-		cloned.AdminState = voltha.AdminState_PREPROVISIONED
-		cloned.FlowGroups = &ofp.FlowGroups{Items: nil}
-		cloned.Flows = &ofp.Flows{Items: nil}
+	if device.Id == "" {
+		agent.deviceId = CreateDeviceId()
+	} else {
+		agent.deviceId = device.Id
 	}
-	if !device.GetRoot() && device.ProxyAddress != nil {
-		// Set the default vlan ID to the one specified by the parent adapter.  It can be
-		// overwritten by the child adapter during a device update request
-		cloned.Vlan = device.ProxyAddress.ChannelId
-	}
+
 	agent.isRootdevice = device.Root
-	agent.deviceId = cloned.Id
 	agent.parentId = device.ParentId
-	agent.deviceType = cloned.Type
-	agent.lastData = cloned
+	agent.deviceType = device.Type
 	agent.deviceMgr = deviceMgr
 	agent.adapterMgr = deviceMgr.adapterMgr
 	agent.exitChannel = make(chan int, 1)
@@ -82,28 +72,51 @@
 	return &agent
 }
 
-// start save the device to the data model and registers for callbacks on that device if loadFromdB is false.  Otherwise,
-// it will load the data from the dB and setup teh necessary callbacks and proxies.
-func (agent *DeviceAgent) start(ctx context.Context, loadFromdB bool) error {
+// start saves the device to the data model and registers for callbacks on that device if
+// deviceToCreate != nil.  Otherwise, it loads the data from the dB and sets up the necessary
+// callbacks and proxies.
+// Returns the device that was started.
+func (agent *DeviceAgent) start(ctx context.Context, deviceToCreate *voltha.Device) (*voltha.Device, error) {
+	var device *voltha.Device
+
 	agent.lockDevice.Lock()
 	defer agent.lockDevice.Unlock()
 	log.Debugw("starting-device-agent", log.Fields{"deviceId": agent.deviceId})
-	if loadFromdB {
-		if device := agent.clusterDataProxy.Get(ctx, "/devices/"+agent.deviceId, 1, false, ""); device != nil {
-			if d, ok := device.(*voltha.Device); ok {
-				agent.lastData = proto.Clone(d).(*voltha.Device)
-				agent.deviceType = agent.lastData.Adapter
+	if deviceToCreate == nil {
+		// Load the existing device
+		if loadedDevice := agent.clusterDataProxy.Get(ctx, "/devices/"+agent.deviceId, 1, true, ""); loadedDevice != nil {
+			var ok bool
+			if device, ok = loadedDevice.(*voltha.Device); ok {
+				agent.deviceType = device.Adapter
+			} else {
+				log.Errorw("failed-to-convert-device", log.Fields{"deviceId": agent.deviceId})
+				return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
 			}
 		} else {
 			log.Errorw("failed-to-load-device", log.Fields{"deviceId": agent.deviceId})
-			return status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
+			return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
 		}
 		log.Debugw("device-loaded-from-dB", log.Fields{"deviceId": agent.deviceId})
 	} else {
+		// Create a new device
+		// Assumption is that AdminState, FlowGroups, and Flows are uninitialized since this
+		// is a new device, so populate them here before passing the device to clusterDataProxy.AddWithId.
+		// agent.deviceId will also have been set during newDeviceAgent().
+		device = (proto.Clone(deviceToCreate)).(*voltha.Device)
+		device.Id = agent.deviceId
+		device.AdminState = voltha.AdminState_PREPROVISIONED
+		device.FlowGroups = &ofp.FlowGroups{Items: nil}
+		device.Flows = &ofp.Flows{Items: nil}
+		if !deviceToCreate.GetRoot() && deviceToCreate.ProxyAddress != nil {
+			// Set the default vlan ID to the one specified by the parent adapter.  It can be
+			// overwritten by the child adapter during a device update request
+			device.Vlan = deviceToCreate.ProxyAddress.ChannelId
+		}
+
 		// Add the initial device to the local model
-		if added := agent.clusterDataProxy.AddWithID(ctx, "/devices", agent.deviceId, agent.lastData, ""); added == nil {
+		if added := agent.clusterDataProxy.AddWithID(ctx, "/devices", agent.deviceId, device, ""); added == nil {
 			log.Errorw("failed-to-add-device", log.Fields{"deviceId": agent.deviceId})
-			return status.Errorf(codes.Aborted, "failed-adding-device-%s", agent.deviceId)
+			return nil, status.Errorf(codes.Aborted, "failed-adding-device-%s", agent.deviceId)
 		}
 	}
 
@@ -111,7 +124,7 @@
 	agent.deviceProxy.RegisterCallback(model.POST_UPDATE, agent.processUpdate)
 
 	log.Debugw("device-agent-started", log.Fields{"deviceId": agent.deviceId})
-	return nil
+	return device, nil
 }
 
 // stop stops the device agent.  Not much to do for now
@@ -128,6 +141,20 @@
 
 }
 
+// Load the most recent state from the KVStore for the device.
+func (agent *DeviceAgent) reconcileWithKVStore() {
+	agent.lockDevice.Lock()
+	defer agent.lockDevice.Unlock()
+	log.Debug("reconciling-device-agent-devicetype")
+	// TODO: context timeout
+	if device := agent.clusterDataProxy.Get(context.Background(), "/devices/"+agent.deviceId, 1, true, ""); device != nil {
+		if d, ok := device.(*voltha.Device); ok {
+			agent.deviceType = d.Adapter
+			log.Debugw("reconciled-device-agent-devicetype", log.Fields{"Id": agent.deviceId, "type": agent.deviceType})
+		}
+	}
+}
+
 // GetDevice retrieves the latest device information from the data model
 func (agent *DeviceAgent) getDevice() (*voltha.Device, error) {
 	agent.lockDevice.RLock()
@@ -199,12 +226,12 @@
 		// Adopt the device if it was in preprovision state.  In all other cases, try to reenable it.
 		if previousAdminState == voltha.AdminState_PREPROVISIONED {
 			if err := agent.adapterProxy.AdoptDevice(ctx, device); err != nil {
-				log.Debugw("adoptDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
+				log.Debugw("adoptDevice-error", log.Fields{"id": agent.deviceId, "error": err})
 				return err
 			}
 		} else {
 			if err := agent.adapterProxy.ReEnableDevice(ctx, device); err != nil {
-				log.Debugw("renableDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
+				log.Debugw("renableDevice-error", log.Fields{"id": agent.deviceId, "error": err})
 				return err
 			}
 		}
@@ -221,7 +248,7 @@
 
 func (agent *DeviceAgent) sendBulkFlowsToAdapters(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata, ch chan interface{}) {
 	if err := agent.adapterProxy.UpdateFlowsBulk(device, flows, groups, flowMetadata); err != nil {
-		log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
+		log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.deviceId, "error": err})
 		ch <- err
 	}
 	ch <- nil
@@ -229,7 +256,7 @@
 
 func (agent *DeviceAgent) sendIncrementalFlowsToAdapters(device *voltha.Device, flows *ofp.FlowChanges, groups *ofp.FlowGroupChanges, flowMetadata *voltha.FlowMetadata, ch chan interface{}) {
 	if err := agent.adapterProxy.UpdateFlowsIncremental(device, flows, groups, flowMetadata); err != nil {
-		log.Debugw("update-flow-incremental-error", log.Fields{"id": agent.lastData.Id, "error": err})
+		log.Debugw("update-flow-incremental-error", log.Fields{"id": agent.deviceId, "error": err})
 		ch <- err
 	}
 	ch <- nil
@@ -559,7 +586,7 @@
 		}
 
 		if err := agent.adapterProxy.DisableDevice(ctx, device); err != nil {
-			log.Debugw("disableDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
+			log.Debugw("disableDevice-error", log.Fields{"id": agent.deviceId, "error": err})
 			return err
 		}
 	}
@@ -597,7 +624,7 @@
 		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
 	} else {
 		if err := agent.adapterProxy.RebootDevice(ctx, device); err != nil {
-			log.Debugw("rebootDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
+			log.Debugw("rebootDevice-error", log.Fields{"id": agent.deviceId, "error": err})
 			return err
 		}
 	}
@@ -625,7 +652,7 @@
 		if device.AdminState != voltha.AdminState_PREPROVISIONED {
 			// Send the request to an Adapter only if the device is not in poreporovision state and wait for a response
 			if err := agent.adapterProxy.DeleteDevice(ctx, device); err != nil {
-				log.Debugw("deleteDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
+				log.Debugw("deleteDevice-error", log.Fields{"id": agent.deviceId, "error": err})
 				return err
 			}
 		}
@@ -679,7 +706,7 @@
 		}
 		// Send the request to the adapter
 		if err := agent.adapterProxy.UpdatePmConfigs(ctx, cloned, pmConfigs); err != nil {
-			log.Errorw("update-pm-configs-error", log.Fields{"id": agent.lastData.Id, "error": err})
+			log.Errorw("update-pm-configs-error", log.Fields{"id": agent.deviceId, "error": err})
 			return err
 		}
 		return nil
@@ -747,7 +774,7 @@
 		}
 		// Send the request to the adapter
 		if err := agent.adapterProxy.DownloadImage(ctx, cloned, clonedImg); err != nil {
-			log.Debugw("downloadImage-error", log.Fields{"id": agent.lastData.Id, "error": err, "image": img.Name})
+			log.Debugw("downloadImage-error", log.Fields{"id": agent.deviceId, "error": err, "image": img.Name})
 			return nil, err
 		}
 	}
@@ -793,7 +820,7 @@
 			}
 			// Send the request to teh adapter
 			if err := agent.adapterProxy.CancelImageDownload(ctx, device, img); err != nil {
-				log.Debugw("cancelImageDownload-error", log.Fields{"id": agent.lastData.Id, "error": err, "image": img.Name})
+				log.Debugw("cancelImageDownload-error", log.Fields{"id": agent.deviceId, "error": err, "image": img.Name})
 				return nil, err
 			}
 		}
@@ -831,7 +858,7 @@
 		}
 
 		if err := agent.adapterProxy.ActivateImageUpdate(ctx, device, img); err != nil {
-			log.Debugw("activateImage-error", log.Fields{"id": agent.lastData.Id, "error": err, "image": img.Name})
+			log.Debugw("activateImage-error", log.Fields{"id": agent.deviceId, "error": err, "image": img.Name})
 			return nil, err
 		}
 		// The status of the AdminState will be changed following the update_download_status response from the adapter
@@ -869,7 +896,7 @@
 		}
 
 		if err := agent.adapterProxy.RevertImageUpdate(ctx, device, img); err != nil {
-			log.Debugw("revertImage-error", log.Fields{"id": agent.lastData.Id, "error": err, "image": img.Name})
+			log.Debugw("revertImage-error", log.Fields{"id": agent.deviceId, "error": err, "image": img.Name})
 			return nil, err
 		}
 	}
@@ -885,7 +912,7 @@
 		return nil, status.Errorf(codes.NotFound, "%s", agent.deviceId)
 	} else {
 		if resp, err := agent.adapterProxy.GetImageDownloadStatus(ctx, device, img); err != nil {
-			log.Debugw("getImageDownloadStatus-error", log.Fields{"id": agent.lastData.Id, "error": err, "image": img.Name})
+			log.Debugw("getImageDownloadStatus-error", log.Fields{"id": agent.deviceId, "error": err, "image": img.Name})
 			return nil, err
 		} else {
 			return resp, nil
@@ -1004,10 +1031,15 @@
 }
 
 func (agent *DeviceAgent) packetOut(outPort uint32, packet *ofp.OfpPacketOut) error {
+	// If deviceType=="" then we must have taken ownership of this device.
+	// Fixes VOL-2226 where a core would take ownership and have stale data
+	if agent.deviceType == "" {
+		agent.reconcileWithKVStore()
+	}
 	//	Send packet to adapter
 	if err := agent.adapterProxy.packetOut(agent.deviceType, agent.deviceId, outPort, packet); err != nil {
 		log.Debugw("packet-out-error", log.Fields{
-			"id":     agent.lastData.Id,
+			"id":     agent.deviceId,
 			"error":  err,
 			"packet": hex.EncodeToString(packet.Data),
 		})
@@ -1335,7 +1367,7 @@
 	} else {
 		// First send the request to an Adapter and wait for a response
 		if err := agent.adapterProxy.SimulateAlarm(ctx, device, simulatereq); err != nil {
-			log.Debugw("simulateAlarm-error", log.Fields{"id": agent.lastData.Id, "error": err})
+			log.Debugw("simulateAlarm-error", log.Fields{"id": agent.deviceId, "error": err})
 			return err
 		}
 	}
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index 2a03dd4..56db2b1 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -159,9 +159,14 @@
 	// Create and start a device agent for that device
 	agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
 	dMgr.addDeviceAgentToMap(agent)
-	agent.start(ctx, false)
+	device, err := agent.start(ctx, device)
+	if err != nil {
+		log.Errorf("Failed to start device")
+		sendResponse(ctx, ch, errors.New("Failed to start device"))
+		return
+	}
 
-	sendResponse(ctx, ch, agent.lastData)
+	sendResponse(ctx, ch, device)
 }
 
 func (dMgr *DeviceManager) enableDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
@@ -376,7 +381,7 @@
 			if !dMgr.IsDeviceInCache(device.(*voltha.Device).Id) {
 				log.Debugw("loading-device-from-Model", log.Fields{"id": device.(*voltha.Device).Id})
 				agent := newDeviceAgent(dMgr.adapterProxy, device.(*voltha.Device), dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
-				if err := agent.start(nil, true); err != nil {
+				if _, err := agent.start(nil, nil); err != nil {
 					log.Warnw("failure-starting-agent", log.Fields{"deviceId": device.(*voltha.Device).Id})
 					agent.stop(nil)
 				} else {
@@ -435,7 +440,7 @@
 			if device, err = dMgr.getDeviceFromModel(deviceId); err == nil {
 				log.Debugw("loading-device", log.Fields{"deviceId": deviceId})
 				agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
-				if err = agent.start(nil, true); err != nil {
+				if _, err = agent.start(nil, nil); err != nil {
 					log.Warnw("Failure loading device", log.Fields{"deviceId": deviceId, "error": err})
 					agent.stop(nil)
 				} else {
@@ -949,7 +954,11 @@
 	// Create and start a device agent for that device
 	agent := newDeviceAgent(dMgr.adapterProxy, childDevice, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
 	dMgr.addDeviceAgentToMap(agent)
-	agent.start(nil, false)
+	childDevice, err = agent.start(nil, childDevice)
+	if err != nil {
+		log.Error("error-starting-child")
+		return nil, err
+	}
 
 	// Since this Core has handled this request then it therefore owns this child device.  Set the
 	// ownership of this device to this Core
@@ -963,7 +972,7 @@
 	// Publish on the messaging bus that we have discovered new devices
 	go dMgr.kafkaICProxy.DeviceDiscovered(agent.deviceId, deviceType, parentDeviceId, dMgr.coreInstanceId)
 
-	return agent.lastData, nil
+	return childDevice, nil
 }
 
 func (dMgr *DeviceManager) processTransition(previous *voltha.Device, current *voltha.Device) error {