VOL-3874: DevicePmConfig has to be updated in the DB by the rw-core only on a success response from the adapter
- also revert to using github.com/golang/protobuf version 1.3.2
Change-Id: I149c48a61a8858169bd0903e6092376796dae212
diff --git a/rw_core/core/device/agent_pm_config.go b/rw_core/core/device/agent_pm_config.go
index d726115..3e13772 100644
--- a/rw_core/core/device/agent_pm_config.go
+++ b/rw_core/core/device/agent_pm_config.go
@@ -18,38 +18,74 @@
import (
"context"
-
+ "fmt"
"github.com/gogo/protobuf/proto"
coreutils "github.com/opencord/voltha-go/rw_core/utils"
"github.com/opencord/voltha-lib-go/v4/pkg/log"
"github.com/opencord/voltha-protos/v4/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
+ "time"
)
func (agent *Agent) updatePmConfigs(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
logger.Debugw(ctx, "update-pm-configs", log.Fields{"device-id": pmConfigs.Id})
cloned := agent.cloneDeviceWithoutLock()
cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
- // Store the device
- if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
- return err
- }
+
// Send the request to the adapter
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ defer cancel()
subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
- ch, err := agent.adapterProxy.UpdatePmConfigs(subCtx, cloned, pmConfigs)
- if err != nil {
- cancel()
- return err
+ ch, pmErr := agent.adapterProxy.UpdatePmConfigs(subCtx, cloned, pmConfigs)
+ if pmErr != nil {
+ return pmErr
}
- go agent.waitForAdapterResponse(subCtx, cancel, "updatePmConfigs", ch, agent.onSuccess, agent.onFailure)
- return nil
+
+ var rpce *voltha.RPCEvent
+ defer func() {
+ if rpce != nil {
+ go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
+ voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
+ }
+ }()
+ // We need to send the response for the PM Config Updates in a synchronous manner to the caller.
+ select {
+ case rpcResponse, ok := <-ch:
+ if !ok {
+ pmErr = fmt.Errorf("response-channel-closed")
+ rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, pmErr.Error(), nil)
+ } else if rpcResponse.Err != nil {
+ rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, rpcResponse.Err.Error(), nil)
+ pmErr = rpcResponse.Err
+ }
+ case <-ctx.Done():
+ rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, ctx.Err().Error(), nil)
+ pmErr = ctx.Err()
+ }
+
+ // In case of no error for PM Config update, commit the new PM Config to DB.
+ if pmErr == nil {
+ // Acquire the lock before updating the device in the DB
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ // The device properties might have changed due to other concurrent transactions on the device, so get the latest copy
+ cloned = agent.cloneDeviceWithoutLock()
+ // commit new pm config
+ cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
+
+ // Store back the device to DB and release lock
+ if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
+ logger.Errorw(ctx, "error-updating-device-context-to-db", log.Fields{"device-id": agent.deviceID})
+ rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
+ return err
+ }
+ }
+
+ return pmErr
}
func (agent *Agent) initPmConfigs(ctx context.Context, pmConfigs *voltha.PmConfigs) error {