[VOL-4291] Rw-core updates for gRPC migration
Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/rw_core/core/device/agent_image.go b/rw_core/core/device/agent_image.go
index e71af8a..0fda20e 100644
--- a/rw_core/core/device/agent_image.go
+++ b/rw_core/core/device/agent_image.go
@@ -18,20 +18,28 @@
import (
"context"
+ "errors"
+ "time"
- "github.com/opencord/voltha-lib-go/v5/pkg/kafka"
- "github.com/opencord/voltha-protos/v4/go/common"
+ ic "github.com/opencord/voltha-protos/v5/go/inter_container"
+
+ "github.com/opencord/voltha-protos/v5/go/common"
"github.com/gogo/protobuf/proto"
- "github.com/golang/protobuf/ptypes"
coreutils "github.com/opencord/voltha-go/rw_core/utils"
- "github.com/opencord/voltha-lib-go/v5/pkg/log"
- "github.com/opencord/voltha-protos/v4/go/voltha"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func (agent *Agent) downloadImage(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
+ var err error
+ var desc string
+ var prevAdminState, currAdminState common.AdminState_Types
+ operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+ defer func() { agent.logDeviceUpdate(ctx, &prevAdminState, &currAdminState, operStatus, err, desc) }()
+
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
@@ -39,23 +47,26 @@
if agent.device.Root {
agent.requestQueue.RequestComplete()
- return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, is an OLT. Image update "+
+ err = status.Errorf(codes.FailedPrecondition, "device-id:%s, is an OLT. Image update "+
"not supported by VOLTHA. Use Device Manager or other means", agent.deviceID)
+ return nil, err
}
device := agent.cloneDeviceWithoutLock()
if !agent.proceedWithRequest(device) {
agent.requestQueue.RequestComplete()
- return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed.",
- agent.deviceID)
+ err = status.Errorf(codes.FailedPrecondition, "%s", "cannot complete operation as device deletion is in progress or reconciling is in progress/failed")
+ return nil, err
}
if device.ImageDownloads != nil {
for _, image := range device.ImageDownloads {
if image.DownloadState == voltha.ImageDownload_DOWNLOAD_REQUESTED {
- return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, already downloading image:%s",
+ err = status.Errorf(codes.FailedPrecondition, "device-id:%s, already downloading image:%s",
agent.deviceID, image.Name)
+ agent.requestQueue.RequestComplete()
+ return nil, err
}
}
}
@@ -71,22 +82,39 @@
cloned.ImageDownloads[index] = clonedImg
}
- cloned.AdminState = voltha.AdminState_DOWNLOADING_IMAGE
- if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
- return nil, err
- }
-
// Send the request to the adapter
- subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
- subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
-
- ch, err := agent.adapterProxy.DownloadImage(subCtx, cloned, clonedImg)
+ client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
if err != nil {
- cancel()
+ logger.Errorw(ctx, "grpc-client-nil",
+ log.Fields{
+ "error": err,
+ "device-id": agent.deviceID,
+ "device-type": agent.deviceType,
+ "adapter-endpoint": device.AdapterEndpoint,
+ })
+ agent.requestQueue.RequestComplete()
return nil, err
}
- go agent.waitForAdapterResponse(subCtx, cancel, "downloadImage", ch, agent.onImageSuccess, agent.onImageFailure)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.rpcTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+ operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+ go func() {
+ defer cancel()
+ response, err := client.DownloadImage(subCtx, &ic.ImageDownloadMessage{
+ Device: cloned,
+ Image: clonedImg,
+ })
+ if err == nil {
+ agent.onImageSuccess(subCtx, response)
+ } else {
+ agent.onImageFailure(subCtx, err)
+ }
+ }()
+ cloned.AdminState = voltha.AdminState_DOWNLOADING_IMAGE
+ if err = agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
+ return nil, err
+ }
return &common.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
}
@@ -102,7 +130,14 @@
}
func (agent *Agent) cancelImageDownload(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+
+ var err error
+ var desc string
+ var prevAdminState, currAdminState common.AdminState_Types
+ operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+ defer func() { agent.logDeviceUpdate(ctx, &prevAdminState, &currAdminState, operStatus, err, desc) }()
+
+ if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
logger.Debugw(ctx, "cancel-image-download", log.Fields{"device-id": agent.deviceID})
@@ -111,8 +146,8 @@
if !agent.proceedWithRequest(cloned) {
agent.requestQueue.RequestComplete()
- return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed.",
- agent.deviceID)
+ err = status.Errorf(codes.FailedPrecondition, "%s", "cannot complete operation as device deletion is in progress or reconciling is in progress/failed")
+ return nil, err
}
// Update image download state
@@ -121,33 +156,59 @@
agent.requestQueue.RequestComplete()
return nil, err
}
-
+ prevAdminState = cloned.AdminState
cloned.ImageDownloads[index].DownloadState = voltha.ImageDownload_DOWNLOAD_CANCELLED
if cloned.AdminState != voltha.AdminState_DOWNLOADING_IMAGE {
+ err = status.Errorf(codes.Aborted, "device not in image download state %s", cloned.Id)
agent.requestQueue.RequestComplete()
- } else {
- // Set the device to Enabled
- cloned.AdminState = voltha.AdminState_ENABLED
- if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
- return nil, err
- }
- subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
- subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
-
- ch, err := agent.adapterProxy.CancelImageDownload(subCtx, cloned, img)
- if err != nil {
- cancel()
- return nil, err
- }
- go agent.waitForAdapterResponse(subCtx, cancel, "cancelImageDownload", ch, agent.onImageSuccess,
- agent.onImageFailure)
+ return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_FAILURE}, err
}
+ client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
+ if err != nil {
+ logger.Errorw(ctx, "grpc-client-nil",
+ log.Fields{
+ "error": err,
+ "device-id": agent.deviceID,
+ "device-type": agent.deviceType,
+ "adapter-endpoint": cloned.AdapterEndpoint,
+ })
+ agent.requestQueue.RequestComplete()
+ return nil, err
+ }
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.rpcTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+ operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+ go func() {
+ defer cancel()
+ response, err := client.CancelImageDownload(subCtx, &ic.ImageDownloadMessage{
+ Device: cloned,
+ Image: img,
+ })
+ if err == nil {
+ agent.onImageSuccess(subCtx, response)
+ } else {
+ agent.onImageFailure(subCtx, err)
+ }
+ }()
+
+ // Set the device to Enabled
+ cloned.AdminState = voltha.AdminState_ENABLED
+ if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
+ return nil, err
+ }
+ currAdminState = cloned.AdminState
+
return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
}
func (agent *Agent) activateImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ var err error
+ var desc string
+ operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+ defer func() { agent.logDeviceUpdate(ctx, nil, nil, operStatus, err, desc) }()
+
+ if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
logger.Debugw(ctx, "activate-image", log.Fields{"device-id": agent.deviceID})
@@ -155,9 +216,9 @@
cloned := agent.cloneDeviceWithoutLock()
if !agent.proceedWithRequest(cloned) {
+ err = status.Errorf(codes.FailedPrecondition, "%s", "cannot complete operation as device deletion is in progress or reconciling is in progress/failed")
agent.requestQueue.RequestComplete()
- return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, Cannot complete operation as device deletion is in progress or reconcile is in progress/failed.",
- agent.deviceID)
+ return nil, err
}
// Update image download state
@@ -169,72 +230,125 @@
if image.DownloadState != voltha.ImageDownload_DOWNLOAD_SUCCEEDED {
agent.requestQueue.RequestComplete()
- return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, device-has-not-downloaded-image:%s", agent.deviceID, img.Name)
+ err = status.Errorf(codes.FailedPrecondition, "device-id:%s, device-has-not-downloaded-image:%s", agent.deviceID, img.Name)
+ return nil, err
}
//TODO does this need to be removed ?
if cloned.AdminState == voltha.AdminState_DOWNLOADING_IMAGE {
agent.requestQueue.RequestComplete()
- return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, device-in-downloading-state:%s", agent.deviceID, img.Name)
+ err = status.Errorf(codes.FailedPrecondition, "device-id:%s, device-in-downloading-state:%s", agent.deviceID, img.Name)
+ return nil, err
}
- // Save the image
+ // Update the image
cloned.ImageDownloads[index].ImageState = voltha.ImageDownload_IMAGE_ACTIVATING
-
cloned.AdminState = voltha.AdminState_DOWNLOADING_IMAGE
+
+ // Send the request to the adapter
+ client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
+ if err != nil {
+ logger.Errorw(ctx, "grpc-client-nil",
+ log.Fields{
+ "error": err,
+ "device-id": agent.deviceID,
+ "device-type": agent.deviceType,
+ "adapter-endpoint": cloned.AdapterEndpoint,
+ })
+ return nil, err
+ }
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.rpcTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+ operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+ go func() {
+ defer cancel()
+ response, err := client.ActivateImageUpdate(subCtx, &ic.ImageDownloadMessage{
+ Device: cloned,
+ Image: img,
+ })
+ if err == nil {
+ agent.onImageSuccess(subCtx, response)
+ } else {
+ agent.onImageFailure(subCtx, err)
+ }
+ }()
+
+ // Save the image
if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
return nil, err
}
- subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
- subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
-
- ch, err := agent.adapterProxy.ActivateImageUpdate(subCtx, cloned, img)
- if err != nil {
- cancel()
- return nil, err
- }
- go agent.waitForAdapterResponse(subCtx, cancel, "activateImageUpdate", ch, agent.onImageSuccess, agent.onFailure)
-
// The status of the AdminState will be changed following the update_download_status response from the adapter
// The image name will also be removed from the device list
return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
}
func (agent *Agent) revertImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ var err error
+ var desc string
+ operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+ defer func() { agent.logDeviceUpdate(ctx, nil, nil, operStatus, err, desc) }()
+
+ if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
logger.Debugw(ctx, "revert-image", log.Fields{"device-id": agent.deviceID})
- // Update image download state
cloned := agent.cloneDeviceWithoutLock()
+ if !agent.proceedWithRequest(cloned) {
+ agent.requestQueue.RequestComplete()
+ err = status.Errorf(codes.FailedPrecondition, "%s", "cannot complete operation as device deletion is in progress or reconciling is in progress/failed")
+ return nil, err
+ }
+
+ // Update image download state
_, index, err := getImage(img, cloned)
if err != nil {
agent.requestQueue.RequestComplete()
- return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, image-not-registered:%s", agent.deviceID, img.Name)
+ err = status.Errorf(codes.FailedPrecondition, "deviceId:%s, image-not-registered:%s", agent.deviceID, img.Name)
+ return nil, err
}
if cloned.AdminState != voltha.AdminState_ENABLED {
agent.requestQueue.RequestComplete()
- return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, device-not-enabled-state:%s", agent.deviceID, img.Name)
+ err = status.Errorf(codes.FailedPrecondition, "deviceId:%s, device-not-enabled-state:%s", agent.deviceID, img.Name)
+ return nil, err
}
cloned.ImageDownloads[index].ImageState = voltha.ImageDownload_IMAGE_REVERTING
+ // Send the request to the adapter
+ client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
+ if err != nil {
+ logger.Errorw(ctx, "grpc-client-nil",
+ log.Fields{
+ "error": err,
+ "device-id": agent.deviceID,
+ "device-type": agent.deviceType,
+ "adapter-endpoint": cloned.AdapterEndpoint,
+ })
+ agent.requestQueue.RequestComplete()
+ return nil, err
+ }
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.rpcTimeout)
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+ operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
+ go func() {
+ defer cancel()
+ _, err := client.RevertImageUpdate(subCtx, &ic.ImageDownloadMessage{
+ Device: cloned,
+ Image: img,
+ })
+ if err == nil {
+ agent.onSuccess(subCtx, nil, nil, true)
+ } else {
+ agent.onFailure(subCtx, err, nil, nil, true)
+ }
+ }()
+ // Save data
if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
return nil, err
}
- subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
- subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
-
- ch, err := agent.adapterProxy.RevertImageUpdate(subCtx, cloned, img)
- if err != nil {
- cancel()
- return nil, err
- }
- go agent.waitForAdapterResponse(subCtx, cancel, "revertImageUpdate", ch, agent.onSuccess, agent.onFailure)
-
return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
}
@@ -244,40 +358,49 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
+ defer agent.requestQueue.RequestComplete()
+
device := agent.getDeviceReadOnlyWithoutLock()
- ch, err := agent.adapterProxy.GetImageDownloadStatus(ctx, device, img)
- agent.requestQueue.RequestComplete()
+ if !agent.proceedWithRequest(device) {
+ return nil, status.Errorf(codes.FailedPrecondition, "%s", "cannot complete operation as device deletion is in progress or reconciling is in progress/failed")
+ }
+
+ // Send the request to the adapter
+ client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
if err != nil {
+ logger.Errorw(ctx, "grpc-client-nil",
+ log.Fields{
+ "error": err,
+ "device-id": agent.deviceID,
+ "device-type": agent.deviceType,
+ "adapter-endpoint": device.AdapterEndpoint,
+ })
return nil, err
}
- // Wait for the adapter response
- rpcResponse, ok := <-ch
- if !ok {
- return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
- }
- if rpcResponse.Err != nil {
- return nil, rpcResponse.Err
- }
- // Successful response
- imgDownload := &voltha.ImageDownload{}
- if err := ptypes.UnmarshalAny(rpcResponse.Reply, imgDownload); err != nil {
- return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
- }
- return imgDownload, nil
+ return client.GetImageDownloadStatus(ctx, &ic.ImageDownloadMessage{
+ Device: device,
+ Image: img,
+ })
}
func (agent *Agent) updateImageDownload(ctx context.Context, img *voltha.ImageDownload) error {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ var err error
+ var desc string
+ operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+ defer func() { agent.logDeviceUpdate(ctx, nil, nil, operStatus, err, desc) }()
+
+ if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
+
logger.Debugw(ctx, "updating-image-download", log.Fields{"device-id": agent.deviceID, "img": img})
cloned := agent.cloneDeviceWithoutLock()
if !agent.proceedWithRequest(cloned) {
agent.requestQueue.RequestComplete()
- return status.Errorf(codes.FailedPrecondition, "deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed.",
- agent.deviceID)
+ err = status.Errorf(codes.FailedPrecondition, "%s", "cannot complete operation as device deletion is in progress or reconciling is in progress/failed")
+ return err
}
// Update the image as well as remove it if the download was cancelled
@@ -326,14 +449,25 @@
}
// onImageFailure brings back the device to Enabled state and sets the image to image download_failed.
-func (agent *Agent) onImageFailure(ctx context.Context, rpc string, response interface{}, reqArgs ...interface{}) {
+func (agent *Agent) onImageFailure(ctx context.Context, imgErr error) {
// original context has failed due to timeout , let's open a new one
- subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.internalTimeout)
subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
defer cancel()
+ rpc := coreutils.GetRPCMetadataFromContext(subCtx)
+
+ defer func() {
+ eCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
+ rpce := agent.deviceMgr.NewRPCEvent(eCtx, agent.deviceID, imgErr.Error(), nil)
+ go agent.deviceMgr.SendRPCEvent(eCtx, "RPC_ERROR_RAISE_EVENT", rpce,
+ voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
+ operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+ desc := "adapter-response"
+ agent.logDeviceUpdate(ctx, nil, nil, operStatus, imgErr, desc)
+ }()
if err := agent.requestQueue.WaitForGreenLight(subCtx); err != nil {
- logger.Errorw(subCtx, "can't obtain lock", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": err, "args": reqArgs})
+ logger.Errorw(subCtx, "can't obtain lock", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": err})
return
}
@@ -345,8 +479,8 @@
log.Fields{"rpc": rpc, "device-id": agent.deviceID})
return
}
- if res, ok := response.(error); ok {
- logger.Errorw(subCtx, "rpc-failed", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": res, "args": reqArgs})
+ if imgErr != nil {
+ logger.Errorw(subCtx, "rpc-failed", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": imgErr})
cloned := agent.cloneDeviceWithoutLock()
//TODO base this on IMAGE ID when created
var imageFailed *voltha.ImageDownload
@@ -362,7 +496,7 @@
}
if imageFailed == nil {
- logger.Errorw(subCtx, "can't find image", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "args": reqArgs})
+ logger.Errorw(subCtx, "can't find image", log.Fields{"rpc": rpc, "device-id": agent.deviceID})
return
}
@@ -376,19 +510,24 @@
cloned.AdminState = voltha.AdminState_ENABLED
if err := agent.updateDeviceAndReleaseLock(subCtx, cloned); err != nil {
logger.Errorw(subCtx, "failed-enable-device-after-image-failure",
- log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": res, "args": reqArgs})
+ log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": err})
}
} else {
- logger.Errorw(subCtx, "rpc-failed-invalid-error", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "args": reqArgs})
+ logger.Errorw(subCtx, "rpc-failed-invalid-error", log.Fields{"rpc": rpc, "device-id": agent.deviceID})
return
}
- // TODO: Post failure message onto kafka
}
// onImageSuccess brings back the device to Enabled state and sets the image to image download_failed.
-func (agent *Agent) onImageSuccess(ctx context.Context, rpc string, response interface{}, reqArgs ...interface{}) {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- logger.Errorw(ctx, "cannot-obtain-lock", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": err, "args": reqArgs})
+func (agent *Agent) onImageSuccess(ctx context.Context, response interface{}) {
+ rpc := coreutils.GetRPCMetadataFromContext(ctx)
+
+ var err error
+ var desc string
+ operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+ defer func() { agent.logDeviceUpdate(ctx, nil, nil, operStatus, err, desc) }()
+
+ if err = agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return
}
@@ -396,12 +535,10 @@
if !agent.proceedWithRequest(cloned) {
agent.requestQueue.RequestComplete()
- logger.Errorw(ctx, "Cannot complete operation as Device deletion is in progress or reconciling is in progress/failed.",
- log.Fields{"rpc": rpc, "device-id": agent.deviceID})
+ err = status.Errorf(codes.FailedPrecondition, "%s", "cannot complete operation as device deletion is in progress or reconciling is in progress/failed")
return
}
- logger.Infow(ctx, "rpc-successful", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "response": response, "args": reqArgs})
-
+ logger.Infow(ctx, "rpc-successful", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "response": response})
//TODO base this on IMAGE ID when created
var imageSucceeded *voltha.ImageDownload
var index int
@@ -416,7 +553,7 @@
}
if imageSucceeded == nil {
- logger.Errorw(ctx, "can't find image", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "args": reqArgs})
+ err = errors.New("can't find image")
return
}
//update image state on success
@@ -427,11 +564,14 @@
}
//Enabled is the only state we can go back to.
cloned.AdminState = voltha.AdminState_ENABLED
- if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
+ if err = agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
logger.Errorw(ctx, "failed-enable-device-after-image-download-success",
- log.Fields{"rpc": rpc, "device-id": agent.deviceID, "response": response, "args": reqArgs})
+ log.Fields{"rpc": rpc, "device-id": agent.deviceID, "response": response})
}
-
+ // Update operation status
+ if err == nil {
+ operStatus.Code = common.OperationResp_OPERATION_SUCCESS
+ }
}
func (agent *Agent) downloadImageToDevice(ctx context.Context, request *voltha.DeviceImageDownloadRequest) (*voltha.DeviceImageResponse, error) {
@@ -447,19 +587,30 @@
}
cloned := agent.cloneDeviceWithoutLock()
+ if !agent.proceedWithRequest(cloned) {
+ agent.requestQueue.RequestComplete()
+ return nil, status.Errorf(codes.FailedPrecondition, "%s", "cannot complete operation as device deletion is in progress or reconciling is in progress/failed")
+ }
// Send the request to the adapter
- subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
+ if err != nil {
+ logger.Errorw(ctx, "grpc-client-nil",
+ log.Fields{
+ "error": err,
+ "device-id": agent.deviceID,
+ "device-type": agent.deviceType,
+ "adapter-endpoint": cloned.AdapterEndpoint,
+ })
+ agent.requestQueue.RequestComplete()
+ return nil, err
+ }
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.rpcTimeout)
defer cancel()
subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
- ch, err := agent.adapterProxy.DownloadImageToOnuDevice(subCtx, cloned, request)
agent.requestQueue.RequestComplete()
- if err != nil {
- return nil, err
- }
-
- return agent.getDeviceImageResponseFromAdapter(ctx, ch)
+ return client.DownloadOnuImage(subCtx, request)
}
func (agent *Agent) getImageStatus(ctx context.Context, request *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) {
@@ -467,21 +618,33 @@
return nil, err
}
- cloned := agent.cloneDeviceWithoutLock()
-
- // Send the request to the adapter
- subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
- defer cancel()
- subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
logger.Debugw(ctx, "get-image-status", log.Fields{"device-id": agent.deviceID})
- ch, err := agent.adapterProxy.GetOnuImageStatus(subCtx, cloned, request)
- agent.requestQueue.RequestComplete()
+ cloned := agent.cloneDeviceWithoutLock()
+ if !agent.proceedWithRequest(cloned) {
+ agent.requestQueue.RequestComplete()
+ return nil, status.Errorf(codes.FailedPrecondition, "%s", "cannot complete operation as device deletion is in progress or reconciling is in progress/failed")
+ }
+
+ // Send the request to the adapter
+ client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
if err != nil {
+ logger.Errorw(ctx, "grpc-client-nil",
+ log.Fields{
+ "error": err,
+ "device-id": agent.deviceID,
+ "device-type": agent.deviceType,
+ "adapter-endpoint": cloned.AdapterEndpoint,
+ })
+ agent.requestQueue.RequestComplete()
return nil, err
}
- return agent.getDeviceImageResponseFromAdapter(subCtx, ch)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.rpcTimeout)
+ defer cancel()
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
+ return client.GetOnuImageStatus(subCtx, request)
}
func (agent *Agent) activateImageOnDevice(ctx context.Context, request *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) {
@@ -489,21 +652,35 @@
return nil, err
}
- cloned := agent.cloneDeviceWithoutLock()
-
- // Send the request to the adapter
- subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
- defer cancel()
- subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
logger.Debugw(ctx, "activate-image-on-device", log.Fields{"device-id": agent.deviceID})
- ch, err := agent.adapterProxy.ActivateOnuImage(subCtx, cloned, request)
- agent.requestQueue.RequestComplete()
+ cloned := agent.cloneDeviceWithoutLock()
+
+ if !agent.proceedWithRequest(cloned) {
+ agent.requestQueue.RequestComplete()
+ return nil, status.Errorf(codes.FailedPrecondition, "%s", "cannot complete operation as device deletion is in progress or reconciling is in progress/failed")
+ }
+
+ // Send the request to the adapter
+ client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
if err != nil {
+ logger.Errorw(ctx, "grpc-client-nil",
+ log.Fields{
+ "error": err,
+ "device-id": agent.deviceID,
+ "device-type": agent.deviceType,
+ "adapter-endpoint": cloned.AdapterEndpoint,
+ })
+ agent.requestQueue.RequestComplete()
return nil, err
}
- return agent.getDeviceImageResponseFromAdapter(subCtx, ch)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.rpcTimeout)
+ defer cancel()
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
+ agent.requestQueue.RequestComplete()
+ return client.ActivateOnuImage(subCtx, request)
}
func (agent *Agent) abortImageUpgradeToDevice(ctx context.Context, request *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) {
@@ -511,21 +688,35 @@
return nil, err
}
- cloned := agent.cloneDeviceWithoutLock()
-
- // Send the request to the adapter
- subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
- defer cancel()
- subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
logger.Debugw(ctx, "abort-image-on-device", log.Fields{"device-id": agent.deviceID})
- ch, err := agent.adapterProxy.AbortImageUpgrade(subCtx, cloned, request)
- agent.requestQueue.RequestComplete()
- if err != nil {
- return nil, err
+ cloned := agent.cloneDeviceWithoutLock()
+
+ if !agent.proceedWithRequest(cloned) {
+ agent.requestQueue.RequestComplete()
+ return nil, status.Errorf(codes.FailedPrecondition, "%s", "cannot complete operation as device deletion is in progress or reconciling is in progress/failed")
}
- return agent.getDeviceImageResponseFromAdapter(ctx, ch)
+ // Send the request to the adapter
+ client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
+ if err != nil {
+ logger.Errorw(ctx, "grpc-client-nil",
+ log.Fields{
+ "error": err,
+ "device-id": agent.deviceID,
+ "device-type": agent.deviceType,
+ "adapter-endpoint": cloned.AdapterEndpoint,
+ })
+ agent.requestQueue.RequestComplete()
+ return nil, err
+ }
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.rpcTimeout)
+ defer cancel()
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
+ agent.requestQueue.RequestComplete()
+
+ return client.AbortOnuImageUpgrade(subCtx, request)
}
func (agent *Agent) commitImage(ctx context.Context, request *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) {
@@ -533,21 +724,33 @@
return nil, err
}
- cloned := agent.cloneDeviceWithoutLock()
-
- // Send the request to the adapter
- subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
- defer cancel()
- subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
logger.Debugw(ctx, "commit-image-on-device", log.Fields{"device-id": agent.deviceID})
- ch, err := agent.adapterProxy.CommitImage(subCtx, cloned, request)
- agent.requestQueue.RequestComplete()
- if err != nil {
- return nil, err
+ cloned := agent.cloneDeviceWithoutLock()
+
+ if !agent.proceedWithRequest(cloned) {
+ agent.requestQueue.RequestComplete()
+ return nil, status.Errorf(codes.FailedPrecondition, "%s", "cannot complete operation as device deletion is in progress or reconciling is in progress/failed")
}
- return agent.getDeviceImageResponseFromAdapter(ctx, ch)
+ // Send the request to the adapter
+ client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
+ if err != nil {
+ logger.Errorw(ctx, "grpc-client-nil",
+ log.Fields{
+ "error": err,
+ "device-id": agent.deviceID,
+ "device-type": agent.deviceType,
+ "adapter-endpoint": cloned.AdapterEndpoint,
+ })
+ agent.requestQueue.RequestComplete()
+ return nil, err
+ }
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.rpcTimeout)
+ defer cancel()
+ subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
+
+ return client.CommitOnuImage(subCtx, request)
}
func (agent *Agent) getOnuImages(ctx context.Context, id *common.ID) (*voltha.OnuImages, error) {
@@ -555,63 +758,29 @@
return nil, err
}
- cloned := agent.cloneDeviceWithoutLock()
-
- subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
- defer cancel()
- subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
logger.Debug(ctx, "get-onu-images")
+ cloned := agent.cloneDeviceWithoutLock()
+
+ if !agent.proceedWithRequest(cloned) {
+ agent.requestQueue.RequestComplete()
+ return nil, status.Errorf(codes.FailedPrecondition, "%s", "cannot complete operation as device deletion is in progress or reconciling is in progress/failed")
+ }
+
// Send the request to the adapter
- ch, err := agent.adapterProxy.GetOnuImages(subCtx, cloned, id)
- agent.requestQueue.RequestComplete()
+ client, err := agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
if err != nil {
+ logger.Errorw(ctx, "grpc-client-nil",
+ log.Fields{
+ "error": err,
+ "device-id": agent.deviceID,
+ "device-type": agent.deviceType,
+ "adapter-endpoint": cloned.AdapterEndpoint,
+ })
+ agent.requestQueue.RequestComplete()
return nil, err
}
- //wait for adapter response
- select {
- case rpcResponse, ok := <-ch:
- if !ok {
- return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
- } else if rpcResponse.Err != nil {
- // return error
- return nil, status.Errorf(codes.Internal, "%s", rpcResponse.Err.Error())
- } else {
- resp := &voltha.OnuImages{}
- if err := ptypes.UnmarshalAny(rpcResponse.Reply, resp); err != nil {
- return nil, status.Errorf(codes.Internal, "%s", err.Error())
- }
-
- return resp, nil
- }
- case <-ctx.Done():
- return nil, ctx.Err()
- }
-}
-
-func (agent *Agent) getDeviceImageResponseFromAdapter(ctx context.Context, ch chan *kafka.RpcResponse) (*voltha.DeviceImageResponse, error) {
- //wait for adapter response
- select {
- case rpcResponse, ok := <-ch:
- if !ok {
- return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
- } else if rpcResponse.Err != nil {
- // return error
- return nil, status.Errorf(codes.Internal, "%s", rpcResponse.Err.Error())
- } else {
- resp := &voltha.DeviceImageResponse{}
- if err := ptypes.UnmarshalAny(rpcResponse.Reply, resp); err != nil {
- return nil, status.Errorf(codes.Internal, "%s", err.Error())
- }
-
- if len(resp.DeviceImageStates) == 0 || resp.DeviceImageStates[0] == nil {
- return nil, status.Errorf(codes.Internal, "invalid response from adapter")
- }
-
- return resp, nil
- }
- case <-ctx.Done():
- return nil, ctx.Err()
- }
+ agent.requestQueue.RequestComplete()
+ return client.GetOnuImages(ctx, id)
}