This commit consists of adding two new northbound APIs to the Core
to be used mostly by the Affinity Router: ListDeviceIds retrieves the
ids of devices present in a Core memory; ReconcileDevices ia an API
the Affinity router can use to push a list of device ids to a Core
for the latter to load and reconcile the devices in memory (used
mostly of a core restart).
Change-Id: I0d292054e09a099ad8be7669fbc3fe3ba15a5579
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index c42a64e..484ff35 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -60,15 +60,15 @@
// Do not call NewBackend constructor; it creates its own KV client
// Commented the backend for now until the issue between the model and the KV store
// is resolved.
- //backend := model.Backend{
- // MsgClient: kvClient,
- // StoreType: cf.KVStoreType,
- // Host: cf.KVStoreHost,
- // Port: cf.KVStorePort,
- // Timeout: cf.KVStoreTimeout,
- // PathPrefix: "service/voltha"}
- core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, nil)
- core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, nil)
+ backend := model.Backend{
+ Client: kvClient,
+ StoreType: cf.KVStoreType,
+ Host: cf.KVStoreHost,
+ Port: cf.KVStorePort,
+ Timeout: cf.KVStoreTimeout,
+ PathPrefix: "service/voltha"}
+ core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, &backend)
+ core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, &backend)
core.clusterDataProxy = core.clusterDataRoot.CreateProxy("/", false)
core.localDataProxy = core.localDataRoot.CreateProxy("/", false)
return &core
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 9e8710b..13e0adf 100644
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -285,18 +285,15 @@
agent.lockDevice.Unlock()
return status.Errorf(codes.FailedPrecondition, "deviceId:%s, expected-admin-state:%s", agent.deviceId, voltha.AdminState_DISABLED)
}
- // Send the request to an Adapter and wait for a response
- if err := agent.adapterProxy.DeleteDevice(ctx, device); err != nil {
- log.Debugw("deleteDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
- agent.lockDevice.Unlock()
- return err
+ if device.AdminState != voltha.AdminState_PREPROVISIONED {
+ // Send the request to an Adapter and wait for a response
+ if err := agent.adapterProxy.DeleteDevice(ctx, device); err != nil {
+ log.Debugw("deleteDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
+ agent.lockDevice.Unlock()
+ return err
+ }
}
- // Set the device Admin state to DELETED in order to trigger the callback to delete
- // child devices, if any
- // Received an Ack (no error found above). Now update the device in the model to the expected state
- cloned := proto.Clone(device).(*voltha.Device)
- cloned.AdminState = voltha.AdminState_DELETED
- if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ if removed := agent.clusterDataProxy.Remove("/devices/"+agent.deviceId, ""); removed == nil {
agent.lockDevice.Unlock()
return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
}
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index e50c035..3f27e10 100644
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -104,6 +104,16 @@
return nil
}
+func (dMgr *DeviceManager) listDeviceIdsFromMap() *voltha.IDs {
+ dMgr.lockDeviceAgentsMap.Lock()
+ defer dMgr.lockDeviceAgentsMap.Unlock()
+ result := &voltha.IDs{Items: make([]*voltha.ID, 0)}
+ for key, _ := range dMgr.deviceAgents {
+ result.Items = append(result.Items, &voltha.ID{Id: key})
+ }
+ return result
+}
+
func (dMgr *DeviceManager) createDevice(ctx context.Context, device *voltha.Device, ch chan interface{}) {
log.Debugw("createDevice", log.Fields{"device": device, "aproxy": dMgr.adapterProxy})
@@ -202,19 +212,46 @@
return result, nil
}
-//func (dMgr *DeviceManager) ListDevices() (*voltha.Devices, error) {
-// log.Debug("ListDevices")
-// result := &voltha.Devices{}
-// dMgr.lockDeviceAgentsMap.Lock()
-// defer dMgr.lockDeviceAgentsMap.Unlock()
-// for _, agent := range dMgr.deviceAgents {
-// if device, err := agent.getDevice(); err == nil {
-// //cloned := proto.Clone(device).(*voltha.Device)
-// result.Items = append(result.Items, device)
-// }
-// }
-// return result, nil
-//}
+// ListDeviceIds retrieves the latest device IDs information from the data model (memory data only)
+func (dMgr *DeviceManager) ListDeviceIds() (*voltha.IDs, error) {
+ log.Debug("ListDeviceIDs")
+ // Report only device IDs that are in the device agent map
+ return dMgr.listDeviceIdsFromMap(), nil
+}
+
+//ReconcileDevices is a request to a voltha core to managed a list of devices based on their IDs
+func (dMgr *DeviceManager) ReconcileDevices(ctx context.Context, ids *voltha.IDs, ch chan interface{}) {
+ log.Debug("ReconcileDevices")
+ var res interface{}
+ if ids != nil {
+ toReconcile := len(ids.Items)
+ reconciled := 0
+ for _, id := range ids.Items {
+ // Act on the device only if its not present in the agent map
+ if agent := dMgr.getDeviceAgent(id.Id); agent == nil {
+ // Device Id not in memory
+ log.Debugw("reconciling-device", log.Fields{"id": id.Id})
+ // Load device from model
+ if device := dMgr.clusterDataProxy.Get("/devices/"+id.Id, 0, false, ""); device != nil {
+ agent = newDeviceAgent(dMgr.adapterProxy, device.(*voltha.Device), dMgr, dMgr.clusterDataProxy)
+ dMgr.addDeviceAgentToMap(agent)
+ agent.start(nil)
+ reconciled += 1
+ } else {
+ log.Warnw("device-inexistent", log.Fields{"id": id.Id})
+ }
+ } else {
+ reconciled += 1
+ }
+ }
+ if toReconcile != reconciled {
+ res = status.Errorf(codes.DataLoss, "less-device-reconciled:%d/%d", reconciled, toReconcile)
+ }
+ } else {
+ res = status.Errorf(codes.InvalidArgument, "empty-list-of-ids")
+ }
+ sendResponse(ctx, ch, res)
+}
func (dMgr *DeviceManager) updateDevice(device *voltha.Device) error {
log.Debugw("updateDevice", log.Fields{"deviceid": device.Id, "device": device})
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index 9b7fac7..fa6d0ca 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -168,6 +168,29 @@
return handler.deviceMgr.ListDevices()
}
+// ListDeviceIds returns the list of device ids managed by a voltha core
+func (handler *APIHandler) ListDeviceIds(ctx context.Context, empty *empty.Empty) (*voltha.IDs, error) {
+ log.Debug("ListDeviceIDs")
+ if isTestMode(ctx) {
+ out := &voltha.IDs{Items: make([]*voltha.ID, 0)}
+ return out, nil
+ }
+ return handler.deviceMgr.ListDeviceIds()
+}
+
+//ReconcileDevices is a request to a voltha core to managed a list of devices based on their IDs
+func (handler *APIHandler) ReconcileDevices(ctx context.Context, ids *voltha.IDs) (*empty.Empty, error) {
+ log.Debug("ReconcileDevices")
+ if isTestMode(ctx) {
+ out := new(empty.Empty)
+ return out, nil
+ }
+ ch := make(chan interface{})
+ defer close(ch)
+ go handler.deviceMgr.ReconcileDevices(ctx, ids, ch)
+ return waitForNilResponseOnSuccess(ctx, ch)
+}
+
// GetLogicalDevice must be implemented in the read-only containers - should it also be implemented here?
func (handler *APIHandler) GetLogicalDevice(ctx context.Context, id *voltha.ID) (*voltha.LogicalDevice, error) {
log.Debugw("GetLogicalDevice-request", log.Fields{"id": id})