VOL-2226 reconcile device agent when taking over device from failed core;
remove lastData from agent struct and use deviceId/deviceType instead
Change-Id: I5321a4cf29c61a965f52cfada708604391947a1b
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 97c0b2d..e01d419 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -40,7 +40,6 @@
parentId string
deviceType string
isRootdevice bool
- lastData *voltha.Device
adapterProxy *AdapterProxy
adapterMgr *AdapterManager
deviceMgr *DeviceManager
@@ -51,28 +50,19 @@
defaultTimeout int64
}
-//newDeviceAgent creates a new device agent along as creating a unique ID for the device and set the device state to
-//preprovisioning
+//newDeviceAgent creates a new device agent. The device will be initialized when start() is called.
func newDeviceAgent(ap *AdapterProxy, device *voltha.Device, deviceMgr *DeviceManager, cdProxy *model.Proxy, timeout int64) *DeviceAgent {
var agent DeviceAgent
agent.adapterProxy = ap
- cloned := (proto.Clone(device)).(*voltha.Device)
- if cloned.Id == "" {
- cloned.Id = CreateDeviceId()
- cloned.AdminState = voltha.AdminState_PREPROVISIONED
- cloned.FlowGroups = &ofp.FlowGroups{Items: nil}
- cloned.Flows = &ofp.Flows{Items: nil}
+ if device.Id == "" {
+ agent.deviceId = CreateDeviceId()
+ } else {
+ agent.deviceId = device.Id
}
- if !device.GetRoot() && device.ProxyAddress != nil {
- // Set the default vlan ID to the one specified by the parent adapter. It can be
- // overwritten by the child adapter during a device update request
- cloned.Vlan = device.ProxyAddress.ChannelId
- }
+
agent.isRootdevice = device.Root
- agent.deviceId = cloned.Id
agent.parentId = device.ParentId
- agent.deviceType = cloned.Type
- agent.lastData = cloned
+ agent.deviceType = device.Type
agent.deviceMgr = deviceMgr
agent.adapterMgr = deviceMgr.adapterMgr
agent.exitChannel = make(chan int, 1)
@@ -82,28 +72,51 @@
return &agent
}
-// start save the device to the data model and registers for callbacks on that device if loadFromdB is false. Otherwise,
-// it will load the data from the dB and setup teh necessary callbacks and proxies.
-func (agent *DeviceAgent) start(ctx context.Context, loadFromdB bool) error {
+// start initializes the device agent and returns the device that was started.
+// If deviceToCreate is non-nil, a copy of it is saved to the data model as a new device. Otherwise,
+// the existing device is loaded from the DB. In both cases the agent then registers for callbacks
+// on that device and sets up the necessary proxies.
+func (agent *DeviceAgent) start(ctx context.Context, deviceToCreate *voltha.Device) (*voltha.Device, error) {
+ var device *voltha.Device
+
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
log.Debugw("starting-device-agent", log.Fields{"deviceId": agent.deviceId})
- if loadFromdB {
- if device := agent.clusterDataProxy.Get(ctx, "/devices/"+agent.deviceId, 1, false, ""); device != nil {
- if d, ok := device.(*voltha.Device); ok {
- agent.lastData = proto.Clone(d).(*voltha.Device)
- agent.deviceType = agent.lastData.Adapter
+ if deviceToCreate == nil {
+ // Load the existing device
+ if loadedDevice := agent.clusterDataProxy.Get(ctx, "/devices/"+agent.deviceId, 1, true, ""); loadedDevice != nil {
+ var ok bool
+ if device, ok = loadedDevice.(*voltha.Device); ok {
+ agent.deviceType = device.Adapter
+ } else {
+ log.Errorw("failed-to-convert-device", log.Fields{"deviceId": agent.deviceId})
+ return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
}
} else {
log.Errorw("failed-to-load-device", log.Fields{"deviceId": agent.deviceId})
- return status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
+ return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
}
log.Debugw("device-loaded-from-dB", log.Fields{"deviceId": agent.deviceId})
} else {
+ // Create a new device
+ // Assumption is that AdminState, FlowGroups, and Flows are uninitialized since this
+ // is a new device, so populate them here before passing the device to clusterDataProxy.AddWithID.
+ // agent.deviceId will also have been set during newDeviceAgent().
+ device = (proto.Clone(deviceToCreate)).(*voltha.Device)
+ device.Id = agent.deviceId
+ device.AdminState = voltha.AdminState_PREPROVISIONED
+ device.FlowGroups = &ofp.FlowGroups{Items: nil}
+ device.Flows = &ofp.Flows{Items: nil}
+ if !deviceToCreate.GetRoot() && deviceToCreate.ProxyAddress != nil {
+ // Set the default vlan ID to the one specified by the parent adapter. It can be
+ // overwritten by the child adapter during a device update request
+ device.Vlan = deviceToCreate.ProxyAddress.ChannelId
+ }
+
// Add the initial device to the local model
- if added := agent.clusterDataProxy.AddWithID(ctx, "/devices", agent.deviceId, agent.lastData, ""); added == nil {
+ if added := agent.clusterDataProxy.AddWithID(ctx, "/devices", agent.deviceId, device, ""); added == nil {
log.Errorw("failed-to-add-device", log.Fields{"deviceId": agent.deviceId})
- return status.Errorf(codes.Aborted, "failed-adding-device-%s", agent.deviceId)
+ return nil, status.Errorf(codes.Aborted, "failed-adding-device-%s", agent.deviceId)
}
}
@@ -111,7 +124,7 @@
agent.deviceProxy.RegisterCallback(model.POST_UPDATE, agent.processUpdate)
log.Debugw("device-agent-started", log.Fields{"deviceId": agent.deviceId})
- return nil
+ return device, nil
}
// stop stops the device agent. Not much to do for now
@@ -128,6 +141,20 @@
}
+// reconcileWithKVStore re-reads the device via clusterDataProxy.Get while holding the agent's write lock and refreshes agent.deviceType from the device's Adapter field; it silently does nothing if the device is not found or is not a *voltha.Device.
+func (agent *DeviceAgent) reconcileWithKVStore() {
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
+ log.Debug("reconciling-device-agent-devicetype")
+ // TODO: pass a context with a timeout instead of context.Background()
+ if device := agent.clusterDataProxy.Get(context.Background(), "/devices/"+agent.deviceId, 1, true, ""); device != nil {
+ if d, ok := device.(*voltha.Device); ok {
+ agent.deviceType = d.Adapter
+ log.Debugw("reconciled-device-agent-devicetype", log.Fields{"Id": agent.deviceId, "type": agent.deviceType})
+ }
+ }
+}
+
// GetDevice retrieves the latest device information from the data model
func (agent *DeviceAgent) getDevice() (*voltha.Device, error) {
agent.lockDevice.RLock()
@@ -199,12 +226,12 @@
// Adopt the device if it was in preprovision state. In all other cases, try to reenable it.
if previousAdminState == voltha.AdminState_PREPROVISIONED {
if err := agent.adapterProxy.AdoptDevice(ctx, device); err != nil {
- log.Debugw("adoptDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
+ log.Debugw("adoptDevice-error", log.Fields{"id": agent.deviceId, "error": err})
return err
}
} else {
if err := agent.adapterProxy.ReEnableDevice(ctx, device); err != nil {
- log.Debugw("renableDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
+ log.Debugw("renableDevice-error", log.Fields{"id": agent.deviceId, "error": err})
return err
}
}
@@ -221,7 +248,7 @@
func (agent *DeviceAgent) sendBulkFlowsToAdapters(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata, ch chan interface{}) {
if err := agent.adapterProxy.UpdateFlowsBulk(device, flows, groups, flowMetadata); err != nil {
- log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
+ log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.deviceId, "error": err})
ch <- err
}
ch <- nil
@@ -229,7 +256,7 @@
func (agent *DeviceAgent) sendIncrementalFlowsToAdapters(device *voltha.Device, flows *ofp.FlowChanges, groups *ofp.FlowGroupChanges, flowMetadata *voltha.FlowMetadata, ch chan interface{}) {
if err := agent.adapterProxy.UpdateFlowsIncremental(device, flows, groups, flowMetadata); err != nil {
- log.Debugw("update-flow-incremental-error", log.Fields{"id": agent.lastData.Id, "error": err})
+ log.Debugw("update-flow-incremental-error", log.Fields{"id": agent.deviceId, "error": err})
ch <- err
}
ch <- nil
@@ -559,7 +586,7 @@
}
if err := agent.adapterProxy.DisableDevice(ctx, device); err != nil {
- log.Debugw("disableDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
+ log.Debugw("disableDevice-error", log.Fields{"id": agent.deviceId, "error": err})
return err
}
}
@@ -597,7 +624,7 @@
return status.Errorf(codes.NotFound, "%s", agent.deviceId)
} else {
if err := agent.adapterProxy.RebootDevice(ctx, device); err != nil {
- log.Debugw("rebootDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
+ log.Debugw("rebootDevice-error", log.Fields{"id": agent.deviceId, "error": err})
return err
}
}
@@ -625,7 +652,7 @@
if device.AdminState != voltha.AdminState_PREPROVISIONED {
// Send the request to an Adapter only if the device is not in poreporovision state and wait for a response
if err := agent.adapterProxy.DeleteDevice(ctx, device); err != nil {
- log.Debugw("deleteDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
+ log.Debugw("deleteDevice-error", log.Fields{"id": agent.deviceId, "error": err})
return err
}
}
@@ -679,7 +706,7 @@
}
// Send the request to the adapter
if err := agent.adapterProxy.UpdatePmConfigs(ctx, cloned, pmConfigs); err != nil {
- log.Errorw("update-pm-configs-error", log.Fields{"id": agent.lastData.Id, "error": err})
+ log.Errorw("update-pm-configs-error", log.Fields{"id": agent.deviceId, "error": err})
return err
}
return nil
@@ -747,7 +774,7 @@
}
// Send the request to the adapter
if err := agent.adapterProxy.DownloadImage(ctx, cloned, clonedImg); err != nil {
- log.Debugw("downloadImage-error", log.Fields{"id": agent.lastData.Id, "error": err, "image": img.Name})
+ log.Debugw("downloadImage-error", log.Fields{"id": agent.deviceId, "error": err, "image": img.Name})
return nil, err
}
}
@@ -793,7 +820,7 @@
}
// Send the request to teh adapter
if err := agent.adapterProxy.CancelImageDownload(ctx, device, img); err != nil {
- log.Debugw("cancelImageDownload-error", log.Fields{"id": agent.lastData.Id, "error": err, "image": img.Name})
+ log.Debugw("cancelImageDownload-error", log.Fields{"id": agent.deviceId, "error": err, "image": img.Name})
return nil, err
}
}
@@ -831,7 +858,7 @@
}
if err := agent.adapterProxy.ActivateImageUpdate(ctx, device, img); err != nil {
- log.Debugw("activateImage-error", log.Fields{"id": agent.lastData.Id, "error": err, "image": img.Name})
+ log.Debugw("activateImage-error", log.Fields{"id": agent.deviceId, "error": err, "image": img.Name})
return nil, err
}
// The status of the AdminState will be changed following the update_download_status response from the adapter
@@ -869,7 +896,7 @@
}
if err := agent.adapterProxy.RevertImageUpdate(ctx, device, img); err != nil {
- log.Debugw("revertImage-error", log.Fields{"id": agent.lastData.Id, "error": err, "image": img.Name})
+ log.Debugw("revertImage-error", log.Fields{"id": agent.deviceId, "error": err, "image": img.Name})
return nil, err
}
}
@@ -885,7 +912,7 @@
return nil, status.Errorf(codes.NotFound, "%s", agent.deviceId)
} else {
if resp, err := agent.adapterProxy.GetImageDownloadStatus(ctx, device, img); err != nil {
- log.Debugw("getImageDownloadStatus-error", log.Fields{"id": agent.lastData.Id, "error": err, "image": img.Name})
+ log.Debugw("getImageDownloadStatus-error", log.Fields{"id": agent.deviceId, "error": err, "image": img.Name})
return nil, err
} else {
return resp, nil
@@ -1004,10 +1031,15 @@
}
func (agent *DeviceAgent) packetOut(outPort uint32, packet *ofp.OfpPacketOut) error {
+ // If deviceType=="" then we must have taken ownership of this device.
+ // Fixes VOL-2226 where a core would take ownership and have stale data
+ if agent.deviceType == "" {
+ agent.reconcileWithKVStore()
+ }
// Send packet to adapter
if err := agent.adapterProxy.packetOut(agent.deviceType, agent.deviceId, outPort, packet); err != nil {
log.Debugw("packet-out-error", log.Fields{
- "id": agent.lastData.Id,
+ "id": agent.deviceId,
"error": err,
"packet": hex.EncodeToString(packet.Data),
})
@@ -1335,7 +1367,7 @@
} else {
// First send the request to an Adapter and wait for a response
if err := agent.adapterProxy.SimulateAlarm(ctx, device, simulatereq); err != nil {
- log.Debugw("simulateAlarm-error", log.Fields{"id": agent.lastData.Id, "error": err})
+ log.Debugw("simulateAlarm-error", log.Fields{"id": agent.deviceId, "error": err})
return err
}
}
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index 2a03dd4..56db2b1 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -159,9 +159,14 @@
// Create and start a device agent for that device
agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
dMgr.addDeviceAgentToMap(agent)
- agent.start(ctx, false)
+ device, err := agent.start(ctx, device)
+ if err != nil {
+ log.Errorf("Failed to start device")
+ sendResponse(ctx, ch, errors.New("Failed to start device"))
+ return
+ }
- sendResponse(ctx, ch, agent.lastData)
+ sendResponse(ctx, ch, device)
}
func (dMgr *DeviceManager) enableDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
@@ -376,7 +381,7 @@
if !dMgr.IsDeviceInCache(device.(*voltha.Device).Id) {
log.Debugw("loading-device-from-Model", log.Fields{"id": device.(*voltha.Device).Id})
agent := newDeviceAgent(dMgr.adapterProxy, device.(*voltha.Device), dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
- if err := agent.start(nil, true); err != nil {
+ if _, err := agent.start(nil, nil); err != nil {
log.Warnw("failure-starting-agent", log.Fields{"deviceId": device.(*voltha.Device).Id})
agent.stop(nil)
} else {
@@ -435,7 +440,7 @@
if device, err = dMgr.getDeviceFromModel(deviceId); err == nil {
log.Debugw("loading-device", log.Fields{"deviceId": deviceId})
agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
- if err = agent.start(nil, true); err != nil {
+ if _, err = agent.start(nil, nil); err != nil {
log.Warnw("Failure loading device", log.Fields{"deviceId": deviceId, "error": err})
agent.stop(nil)
} else {
@@ -949,7 +954,11 @@
// Create and start a device agent for that device
agent := newDeviceAgent(dMgr.adapterProxy, childDevice, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
dMgr.addDeviceAgentToMap(agent)
- agent.start(nil, false)
+ childDevice, err = agent.start(nil, childDevice)
+ if err != nil {
+ log.Error("error-starting-child")
+ return nil, err
+ }
// Since this Core has handled this request then it therefore owns this child device. Set the
// ownership of this device to this Core
@@ -963,7 +972,7 @@
// Publish on the messaging bus that we have discovered new devices
go dMgr.kafkaICProxy.DeviceDiscovered(agent.deviceId, deviceType, parentDeviceId, dMgr.coreInstanceId)
- return agent.lastData, nil
+ return childDevice, nil
}
func (dMgr *DeviceManager) processTransition(previous *voltha.Device, current *voltha.Device) error {