[VOL-4291] Rw-core updates for gRPC migration
Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/rw_core/core/device/agent_port.go b/rw_core/core/device/agent_port.go
index f9bf7de..e7bb371 100644
--- a/rw_core/core/device/agent_port.go
+++ b/rw_core/core/device/agent_port.go
@@ -19,13 +19,14 @@
import (
"context"
"fmt"
- "github.com/opencord/voltha-protos/v4/go/common"
+
+ "github.com/opencord/voltha-protos/v5/go/common"
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/rw_core/core/device/port"
coreutils "github.com/opencord/voltha-go/rw_core/utils"
- "github.com/opencord/voltha-lib-go/v5/pkg/log"
- "github.com/opencord/voltha-protos/v4/go/voltha"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@@ -149,9 +150,10 @@
func (agent *Agent) addPort(ctx context.Context, port *voltha.Port) error {
logger.Debugw(ctx, "addPort", log.Fields{"device-id": agent.deviceID})
var desc string
+ var err error
operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
- defer agent.logDeviceUpdate(ctx, "addPort", nil, nil, operStatus, &desc)
+ defer func() { agent.logDeviceUpdate(ctx, nil, nil, operStatus, err, desc) }()
port.AdminState = voltha.AdminState_ENABLED
@@ -171,6 +173,7 @@
if oldPort.Label != "" || oldPort.Type != voltha.Port_PON_OLT {
logger.Debugw(ctx, "port-already-exists", log.Fields{"port": port})
desc = fmt.Sprintf("port already exists, port : %s", port)
+ operStatus.Code = common.OperationResp_OPERATION_SUCCESS
return nil
}
@@ -237,97 +240,118 @@
func (agent *Agent) disablePort(ctx context.Context, portID uint32) error {
logger.Debugw(ctx, "disable-port", log.Fields{"device-id": agent.deviceID, "port-no": portID})
+ var err error
var desc string
operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
-
- defer agent.logDeviceUpdate(ctx, "disablePort", nil, nil, operStatus, &desc)
+ defer func() { agent.logDeviceUpdate(ctx, nil, nil, operStatus, err, desc) }()
portHandle, have := agent.portLoader.Lock(portID)
if !have {
- desc = fmt.Sprintf("Invalid argument portID: %v", portID)
- return status.Errorf(codes.InvalidArgument, "%v", portID)
+ err = status.Errorf(codes.InvalidArgument, "%v", portID)
+ return err
}
defer portHandle.Unlock()
oldPort := portHandle.GetReadOnly()
if oldPort.Type != voltha.Port_PON_OLT {
- desc = fmt.Sprintf("Disabling of Port Type %v unimplemented", oldPort.Type)
- return status.Errorf(codes.InvalidArgument, "Disabling of Port Type %v unimplemented", oldPort.Type)
+ err = status.Errorf(codes.Unimplemented, "disabling of Port Type %v unimplemented", oldPort.Type)
+ return err
}
newPort := *oldPort
newPort.AdminState = voltha.AdminState_DISABLED
if err := portHandle.Update(ctx, &newPort); err != nil {
- desc = err.Error()
return err
}
//send request to adapter
device, err := agent.getDeviceReadOnly(ctx)
if err != nil {
- desc = err.Error()
return err
}
- subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
- subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
- ch, err := agent.adapterProxy.DisablePort(ctx, device, &newPort)
+ // Send the request to the adapter
+ client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
if err != nil {
- desc = err.Error()
- cancel()
+ logger.Errorw(ctx, "grpc-client-nil",
+ log.Fields{
+ "error": err,
+ "device-id": agent.deviceID,
+ "device-type": agent.deviceType,
+ "adapter-endpoint": device.AdapterEndpoint,
+ })
return err
}
+ subCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout)
operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
- go agent.waitForAdapterResponseAndLogDeviceUpdate(subCtx, cancel, "disablePort", ch, agent.onSuccess, agent.onFailure, nil)
+ go func() {
+ defer cancel()
+ _, err := client.DisablePort(subCtx, &newPort)
+ if err == nil {
+ agent.onSuccess(subCtx, nil, nil, true)
+ } else {
+ agent.onFailure(subCtx, err, nil, nil, true)
+ }
+ }()
return nil
}
func (agent *Agent) enablePort(ctx context.Context, portID uint32) error {
logger.Debugw(ctx, "enable-port", log.Fields{"device-id": agent.deviceID, "port-no": portID})
+ var err error
var desc string
operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
-
- defer agent.logDeviceUpdate(ctx, "enablePort", nil, nil, operStatus, &desc)
+ defer func() { agent.logDeviceUpdate(ctx, nil, nil, operStatus, err, desc) }()
portHandle, have := agent.portLoader.Lock(portID)
if !have {
- desc = fmt.Sprintf("Invalid Argument portID: %v", portID)
- return status.Errorf(codes.InvalidArgument, "%v", portID)
+ err = status.Errorf(codes.InvalidArgument, "%v", portID)
+ return err
}
defer portHandle.Unlock()
oldPort := portHandle.GetReadOnly()
if oldPort.Type != voltha.Port_PON_OLT {
- desc = fmt.Sprintf("Enabling of Port Type %v unimplemented", oldPort.Type)
- return status.Errorf(codes.InvalidArgument, "Enabling of Port Type %v unimplemented", oldPort.Type)
+ err = status.Errorf(codes.Unimplemented, "enabling of Port Type %v unimplemented", oldPort.Type)
+ return err
}
newPort := *oldPort
newPort.AdminState = voltha.AdminState_ENABLED
- if err := portHandle.Update(ctx, &newPort); err != nil {
- desc = err.Error()
+ if err = portHandle.Update(ctx, &newPort); err != nil {
return err
}
//send request to adapter
device, err := agent.getDeviceReadOnly(ctx)
if err != nil {
- desc = err.Error()
return err
}
- subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
- subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
- ch, err := agent.adapterProxy.EnablePort(ctx, device, &newPort)
+ client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
if err != nil {
- desc = err.Error()
- cancel()
+ logger.Errorw(ctx, "grpc-client-nil",
+ log.Fields{
+ "error": err,
+ "device-id": agent.deviceID,
+ "device-type": agent.deviceType,
+ "adapter-endpoint": device.AdapterEndpoint,
+ })
return err
}
+ subCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout)
operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
- go agent.waitForAdapterResponseAndLogDeviceUpdate(subCtx, cancel, "enablePort", ch, agent.onSuccess, agent.onFailure, nil)
+ go func() {
+ defer cancel()
+ _, err := client.EnablePort(subCtx, &newPort)
+ if err == nil {
+ agent.onSuccess(subCtx, nil, nil, true)
+ } else {
+ agent.onFailure(subCtx, err, nil, nil, true)
+ }
+ }()
return nil
}