[VOL-4291] Rw-core updates for gRPC migration
Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/rw_core/core/device/agent_pm_config.go b/rw_core/core/device/agent_pm_config.go
index 176a126..ba1005a 100644
--- a/rw_core/core/device/agent_pm_config.go
+++ b/rw_core/core/device/agent_pm_config.go
@@ -18,79 +18,73 @@
import (
"context"
- "fmt"
+ "time"
+
"github.com/gogo/protobuf/proto"
- coreutils "github.com/opencord/voltha-go/rw_core/utils"
- "github.com/opencord/voltha-lib-go/v5/pkg/log"
- "github.com/opencord/voltha-protos/v4/go/voltha"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ ic "github.com/opencord/voltha-protos/v5/go/inter_container"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
- "time"
)
func (agent *Agent) updatePmConfigs(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
logger.Debugw(ctx, "update-pm-configs", log.Fields{"device-id": pmConfigs.Id})
- cloned := agent.cloneDeviceWithoutLock()
+ var rpce *voltha.RPCEvent
+ defer func() {
+ if rpce != nil {
+ go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
+ voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
+ }
+ }()
+
+ cloned, err := agent.getDeviceReadOnly(ctx)
+ if err != nil {
+ return err
+ }
if !agent.proceedWithRequest(cloned) {
return status.Errorf(codes.FailedPrecondition, "deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed", cloned.Id)
}
- cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
+ // We need to send the response for the PM Config Updates in a synchronous manner to the adapter.
+ client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
+ if err != nil {
+ logger.Errorw(ctx, "grpc-client-nil",
+ log.Fields{
+ "error": err,
+ "device-id": agent.deviceID,
+ "device-type": agent.deviceType,
+ "adapter-endpoint": agent.adapterEndpoint,
+ })
+ return err
+ }
+ _, pmErr := client.UpdatePmConfig(ctx, &ic.PmConfigsInfo{
+ DeviceId: agent.deviceID,
+ PmConfigs: pmConfigs,
+ })
- // Send the request to the adapter
- subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
- defer cancel()
- subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
-
- ch, pmErr := agent.adapterProxy.UpdatePmConfigs(subCtx, cloned, pmConfigs)
if pmErr != nil {
+ rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, pmErr.Error(), nil)
return pmErr
}
-
- var rpce *voltha.RPCEvent
- defer func() {
- if rpce != nil {
- agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
- voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
- }
- }()
- // We need to send the response for the PM Config Updates in a synchronous manner to the caller.
- select {
- case rpcResponse, ok := <-ch:
- if !ok {
- pmErr = fmt.Errorf("response-channel-closed")
- rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, pmErr.Error(), nil)
- } else if rpcResponse.Err != nil {
- rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, rpcResponse.Err.Error(), nil)
- pmErr = rpcResponse.Err
- }
- case <-ctx.Done():
- rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, ctx.Err().Error(), nil)
- pmErr = ctx.Err()
- }
-
// In case of no error for PM Config update, commit the new PM Config to DB.
- if pmErr == nil {
- // acquire lock for update the device to DB
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- // the Device properties might have changed due to other concurrent transactions on the device, so get latest copy
- cloned = agent.cloneDeviceWithoutLock()
- // commit new pm config
- cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
-
- // Store back the device to DB and release lock
- if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
- logger.Errorw(ctx, "error-updating-device-context-to-db", log.Fields{"device-id": agent.deviceID})
- rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
- return err
- }
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
}
+ // the Device properties might have changed due to other concurrent transactions on the device, so get latest copy
+ cloned = agent.cloneDeviceWithoutLock()
+ // commit new pm config
+ cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
- return pmErr
+ // Store back the device to DB and release lock
+ if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
+ logger.Errorw(ctx, "error-updating-device-context-to-db", log.Fields{"device-id": agent.deviceID})
+ rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
+ return err
+ }
+ return nil
}
func (agent *Agent) initPmConfigs(ctx context.Context, pmConfigs *voltha.PmConfigs) error {