VOL-2868 Model simplification/removal.
Reduced the model to its most commonly used functions. (Removed unused logic & test cases.)
Reworked the remaining functions to be more intuitive to use and to follow Go conventions more closely.
Change-Id: Ibbb267ff37e039b73489b4379aa2654208614d5b
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 4c2b9f6..69391fb 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -19,6 +19,7 @@
import (
"context"
"encoding/hex"
+ "errors"
"fmt"
"github.com/golang/protobuf/ptypes"
"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
@@ -100,21 +101,17 @@
var device *voltha.Device
if deviceToCreate == nil {
// Load the existing device
- loadedDevice, err := agent.clusterDataProxy.Get(ctx, "/devices/"+agent.deviceID, 1, true, "")
+ device := &voltha.Device{}
+ have, err := agent.clusterDataProxy.Get(ctx, "devices/"+agent.deviceID, device)
if err != nil {
return nil, err
+ } else if !have {
+ return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceID)
}
- if loadedDevice != nil {
- var ok bool
- if device, ok = loadedDevice.(*voltha.Device); ok {
- agent.deviceType = device.Adapter
- agent.device = proto.Clone(device).(*voltha.Device)
- } else {
- return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceID)
- }
- } else {
- return nil, status.Errorf(codes.NotFound, "device-%s-loading-failed", agent.deviceID)
- }
+
+ agent.deviceType = device.Adapter
+ agent.device = proto.Clone(device).(*voltha.Device)
+
logger.Infow("device-loaded-from-dB", log.Fields{"device-id": agent.deviceID})
} else {
// Create a new device
@@ -133,12 +130,8 @@
}
// Add the initial device to the local model
- added, err := agent.clusterDataProxy.AddWithID(ctx, "/devices", agent.deviceID, device, "")
- if err != nil {
- return nil, err
- }
- if added == nil {
- return nil, status.Errorf(codes.Aborted, "failed-adding-device-%s", agent.deviceID)
+ if err := agent.clusterDataProxy.AddWithID(ctx, "devices", agent.deviceID, device); err != nil {
+ return nil, status.Errorf(codes.Aborted, "failed-adding-device-%s: %s", agent.deviceID, err)
}
agent.device = device
}
@@ -163,13 +156,9 @@
logger.Infow("stopping-device-agent", log.Fields{"deviceId": agent.deviceID, "parentId": agent.parentID})
// Remove the device from the KV store
- removed, err := agent.clusterDataProxy.Remove(ctx, "/devices/"+agent.deviceID, "")
- if err != nil {
+ if err := agent.clusterDataProxy.Remove(ctx, "devices/"+agent.deviceID); err != nil {
return err
}
- if removed == nil {
- logger.Debugw("device-already-removed", log.Fields{"device-id": agent.deviceID})
- }
close(agent.exitChannel)
@@ -189,18 +178,17 @@
defer agent.requestQueue.RequestComplete()
logger.Debug("reconciling-device-agent-devicetype")
// TODO: context timeout
- device, err := agent.clusterDataProxy.Get(ctx, "/devices/"+agent.deviceID, 1, true, "")
- if err != nil {
+ device := &voltha.Device{}
+ if have, err := agent.clusterDataProxy.Get(ctx, "devices/"+agent.deviceID, device); err != nil {
logger.Errorw("kv-get-failed", log.Fields{"device-id": agent.deviceID, "error": err})
return
+ } else if !have {
+ return // not found in kv
}
- if device != nil {
- if d, ok := device.(*voltha.Device); ok {
- agent.deviceType = d.Adapter
- agent.device = proto.Clone(d).(*voltha.Device)
- logger.Debugw("reconciled-device-agent-devicetype", log.Fields{"device-id": agent.deviceID, "type": agent.deviceType})
- }
- }
+
+ agent.deviceType = device.Adapter
+ agent.device = device
+ logger.Debugw("reconciled-device-agent-devicetype", log.Fields{"device-id": agent.deviceID, "type": agent.deviceType})
}
// onSuccess is a common callback for scenarios where we receive a nil response following a request to an adapter
@@ -1527,13 +1515,13 @@
//This is an update operation to model without Lock.This function must never be invoked by another function unless the latter holds a lock on the device.
// It is an internal helper function.
func (agent *DeviceAgent) updateDeviceInStoreWithoutLock(ctx context.Context, device *voltha.Device, strict bool, txid string) error {
- updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
- afterUpdate, err := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceID, device, strict, txid)
- if err != nil {
- return status.Errorf(codes.Internal, "failed-update-device:%s: %s", agent.deviceID, err)
+ if agent.stopped {
+ return errors.New("device agent stopped")
}
- if afterUpdate == nil {
- return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceID)
+
+ updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
+ if err := agent.clusterDataProxy.Update(updateCtx, "devices/"+agent.deviceID, device); err != nil {
+ return status.Errorf(codes.Internal, "failed-update-device:%s: %s", agent.deviceID, err)
}
logger.Debugw("updated-device-in-store", log.Fields{"deviceId: ": agent.deviceID})