[VOL-4022] RW-Core Changes For ONU SW Upgrade
New Download/Activate/Retrieve APIs
Change-Id: I2d8a0ec7d8967fd76a261a108f743e75f84c98e9
diff --git a/rw_core/core/device/agent_image.go b/rw_core/core/device/agent_image.go
index 9816927..3781540 100644
--- a/rw_core/core/device/agent_image.go
+++ b/rw_core/core/device/agent_image.go
@@ -18,6 +18,8 @@
import (
"context"
+
+ "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
"github.com/opencord/voltha-protos/v4/go/common"
"github.com/gogo/protobuf/proto"
@@ -387,3 +389,185 @@
}
}
+
+// downloadImageToDevice forwards an image download request for this device to
+// its adapter and returns the adapter's DeviceImageResponse.
+//
+// The agent's request queue is waited on first; RequestComplete is called as
+// soon as the request has been handed to the adapter (or rejected), i.e. before
+// the adapter's reply is awaited. A device whose Root flag is set is rejected
+// with codes.FailedPrecondition.
+//
+// NOTE(review): the reply is awaited on the caller's ctx, whereas
+// getImageStatus and activateImageOnDevice await it on the timeout-bound
+// adapter context; confirm which of the two is intended.
+func (agent *Agent) downloadImageToDevice(ctx context.Context, request *voltha.DeviceImageDownloadRequest) (*voltha.DeviceImageResponse, error) {
+	queueErr := agent.requestQueue.WaitForGreenLight(ctx)
+	if queueErr != nil {
+		return nil, queueErr
+	}
+
+	logger.Debugw(ctx, "download-image-to-device", log.Fields{"device-id": agent.deviceID})
+
+	// Root devices are refused before anything is sent to the adapter.
+	if isRoot := agent.device.Root; isRoot {
+		agent.requestQueue.RequestComplete()
+		return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, is an OLT. Image update "+
+			"not supported by VOLTHA. Use Device Manager or other means", agent.deviceID)
+	}
+
+	device := agent.cloneDeviceWithoutLock()
+
+	// The adapter call runs on a context rooted at context.Background(), passed
+	// through log.WithSpanFromContext and coreutils.WithRPCMetadataFromContext
+	// together with ctx, and bounded by agent.defaultTimeout.
+	spanCtx := log.WithSpanFromContext(context.Background(), ctx)
+	adapterCtx, cancel := context.WithTimeout(spanCtx, agent.defaultTimeout)
+	defer cancel()
+	adapterCtx = coreutils.WithRPCMetadataFromContext(adapterCtx, ctx)
+
+	respCh, err := agent.adapterProxy.DownloadImageToOnuDevice(adapterCtx, device, request)
+	agent.requestQueue.RequestComplete()
+	if err != nil {
+		return nil, err
+	}
+
+	return agent.getDeviceImageResponseFromAdapter(ctx, respCh)
+}
+
+// getImageStatus sends an image status request for this device to its adapter
+// (adapterProxy.GetOnuImageStatus) and returns the adapter's
+// DeviceImageResponse.
+//
+// The agent's request queue is waited on first; RequestComplete is called once
+// the request has been handed to the adapter, before the reply is awaited. The
+// reply is awaited on the adapter context, which expires after
+// agent.defaultTimeout.
+func (agent *Agent) getImageStatus(ctx context.Context, request *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) {
+	queueErr := agent.requestQueue.WaitForGreenLight(ctx)
+	if queueErr != nil {
+		return nil, queueErr
+	}
+
+	device := agent.cloneDeviceWithoutLock()
+
+	// The adapter call runs on a context rooted at context.Background(), passed
+	// through log.WithSpanFromContext and coreutils.WithRPCMetadataFromContext
+	// together with ctx, and bounded by agent.defaultTimeout.
+	spanCtx := log.WithSpanFromContext(context.Background(), ctx)
+	adapterCtx, cancel := context.WithTimeout(spanCtx, agent.defaultTimeout)
+	defer cancel()
+	adapterCtx = coreutils.WithRPCMetadataFromContext(adapterCtx, ctx)
+	logger.Debugw(ctx, "get-image-status", log.Fields{"device-id": agent.deviceID})
+
+	respCh, err := agent.adapterProxy.GetOnuImageStatus(adapterCtx, device, request)
+	agent.requestQueue.RequestComplete()
+	if err != nil {
+		return nil, err
+	}
+
+	return agent.getDeviceImageResponseFromAdapter(adapterCtx, respCh)
+}
+
+// activateImageOnDevice sends an image activation request for this device to
+// its adapter (adapterProxy.ActivateOnuImage) and returns the adapter's
+// DeviceImageResponse.
+//
+// The agent's request queue is waited on first; RequestComplete is called once
+// the request has been handed to the adapter, before the reply is awaited. The
+// reply is awaited on the adapter context, which expires after
+// agent.defaultTimeout.
+func (agent *Agent) activateImageOnDevice(ctx context.Context, request *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) {
+	queueErr := agent.requestQueue.WaitForGreenLight(ctx)
+	if queueErr != nil {
+		return nil, queueErr
+	}
+
+	device := agent.cloneDeviceWithoutLock()
+
+	// The adapter call runs on a context rooted at context.Background(), passed
+	// through log.WithSpanFromContext and coreutils.WithRPCMetadataFromContext
+	// together with ctx, and bounded by agent.defaultTimeout.
+	spanCtx := log.WithSpanFromContext(context.Background(), ctx)
+	adapterCtx, cancel := context.WithTimeout(spanCtx, agent.defaultTimeout)
+	defer cancel()
+	adapterCtx = coreutils.WithRPCMetadataFromContext(adapterCtx, ctx)
+	logger.Debugw(ctx, "activate-image-on-device", log.Fields{"device-id": agent.deviceID})
+
+	respCh, err := agent.adapterProxy.ActivateOnuImage(adapterCtx, device, request)
+	agent.requestQueue.RequestComplete()
+	if err != nil {
+		return nil, err
+	}
+
+	return agent.getDeviceImageResponseFromAdapter(adapterCtx, respCh)
+}
+
+// abortImageUpgradeToDevice sends an abort-image-upgrade request for this
+// device to its adapter (adapterProxy.AbortImageUpgrade) and returns the
+// adapter's DeviceImageResponse.
+//
+// The agent's request queue is waited on first; RequestComplete is called once
+// the request has been handed to the adapter, before the reply is awaited.
+//
+// NOTE(review): the reply is awaited on the caller's ctx, whereas
+// getImageStatus and activateImageOnDevice await it on the timeout-bound
+// adapter context; confirm which of the two is intended.
+func (agent *Agent) abortImageUpgradeToDevice(ctx context.Context, request *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) {
+	queueErr := agent.requestQueue.WaitForGreenLight(ctx)
+	if queueErr != nil {
+		return nil, queueErr
+	}
+
+	device := agent.cloneDeviceWithoutLock()
+
+	// The adapter call runs on a context rooted at context.Background(), passed
+	// through log.WithSpanFromContext and coreutils.WithRPCMetadataFromContext
+	// together with ctx, and bounded by agent.defaultTimeout.
+	spanCtx := log.WithSpanFromContext(context.Background(), ctx)
+	adapterCtx, cancel := context.WithTimeout(spanCtx, agent.defaultTimeout)
+	defer cancel()
+	adapterCtx = coreutils.WithRPCMetadataFromContext(adapterCtx, ctx)
+	logger.Debugw(ctx, "abort-image-on-device", log.Fields{"device-id": agent.deviceID})
+
+	respCh, err := agent.adapterProxy.AbortImageUpgrade(adapterCtx, device, request)
+	agent.requestQueue.RequestComplete()
+	if err != nil {
+		return nil, err
+	}
+
+	return agent.getDeviceImageResponseFromAdapter(ctx, respCh)
+}
+
+// commitImage sends an image commit request for this device to its adapter
+// (adapterProxy.CommitImage) and returns the adapter's DeviceImageResponse.
+//
+// The agent's request queue is waited on first; RequestComplete is called once
+// the request has been handed to the adapter, before the reply is awaited.
+//
+// NOTE(review): the reply is awaited on the caller's ctx, whereas
+// getImageStatus and activateImageOnDevice await it on the timeout-bound
+// adapter context; confirm which of the two is intended.
+func (agent *Agent) commitImage(ctx context.Context, request *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) {
+	queueErr := agent.requestQueue.WaitForGreenLight(ctx)
+	if queueErr != nil {
+		return nil, queueErr
+	}
+
+	device := agent.cloneDeviceWithoutLock()
+
+	// The adapter call runs on a context rooted at context.Background(), passed
+	// through log.WithSpanFromContext and coreutils.WithRPCMetadataFromContext
+	// together with ctx, and bounded by agent.defaultTimeout.
+	spanCtx := log.WithSpanFromContext(context.Background(), ctx)
+	adapterCtx, cancel := context.WithTimeout(spanCtx, agent.defaultTimeout)
+	defer cancel()
+	adapterCtx = coreutils.WithRPCMetadataFromContext(adapterCtx, ctx)
+	logger.Debugw(ctx, "commit-image-on-device", log.Fields{"device-id": agent.deviceID})
+
+	respCh, err := agent.adapterProxy.CommitImage(adapterCtx, device, request)
+	agent.requestQueue.RequestComplete()
+	if err != nil {
+		return nil, err
+	}
+
+	return agent.getDeviceImageResponseFromAdapter(ctx, respCh)
+}
+
+// getOnuImages requests the ONU images of this device from its adapter
+// (adapterProxy.GetOnuImages) and returns the decoded voltha.OnuImages reply.
+//
+// The agent's request queue is waited on first; RequestComplete is called once
+// the request has been handed to the adapter, before the reply is awaited.
+//
+// Errors:
+//   - the error from the request queue or from the adapter proxy, unchanged;
+//   - codes.Aborted when the response channel is closed without a reply;
+//   - codes.Internal when the adapter replies with an error or the reply
+//     cannot be unmarshalled into voltha.OnuImages;
+//   - ctx.Err() when the caller's ctx is done before a reply arrives.
+func (agent *Agent) getOnuImages(ctx context.Context, id *common.ID) (*voltha.OnuImages, error) {
+	queueErr := agent.requestQueue.WaitForGreenLight(ctx)
+	if queueErr != nil {
+		return nil, queueErr
+	}
+
+	device := agent.cloneDeviceWithoutLock()
+
+	// The adapter call runs on a context rooted at context.Background(), passed
+	// through log.WithSpanFromContext and coreutils.WithRPCMetadataFromContext
+	// together with ctx, and bounded by agent.defaultTimeout.
+	spanCtx := log.WithSpanFromContext(context.Background(), ctx)
+	adapterCtx, cancel := context.WithTimeout(spanCtx, agent.defaultTimeout)
+	defer cancel()
+	adapterCtx = coreutils.WithRPCMetadataFromContext(adapterCtx, ctx)
+	logger.Debugw(ctx, "get-onu-images", log.Fields{"device-id": agent.deviceID})
+
+	// Hand the request to the adapter.
+	respCh, err := agent.adapterProxy.GetOnuImages(adapterCtx, device, id)
+	agent.requestQueue.RequestComplete()
+	if err != nil {
+		return nil, err
+	}
+
+	// Block until the adapter replies or the caller's ctx is done.
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case reply, open := <-respCh:
+		switch {
+		case !open:
+			return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
+		case reply.Err != nil:
+			// Adapter-side failure is surfaced as an Internal error.
+			return nil, status.Errorf(codes.Internal, "%s", reply.Err.Error())
+		}
+
+		images := &voltha.OnuImages{}
+		if unmarshalErr := ptypes.UnmarshalAny(reply.Reply, images); unmarshalErr != nil {
+			return nil, status.Errorf(codes.Internal, "%s", unmarshalErr.Error())
+		}
+		return images, nil
+	}
+}
+
+// getDeviceImageResponseFromAdapter blocks until a reply arrives on ch or ctx
+// is done, and decodes the reply into a voltha.DeviceImageResponse.
+//
+// Errors:
+//   - codes.Aborted when ch is closed without a reply;
+//   - codes.Internal when the adapter replies with an error, the reply cannot
+//     be unmarshalled, or the response has no (or a nil first) device image
+//     state;
+//   - ctx.Err() when ctx is done before a reply arrives.
+func (agent *Agent) getDeviceImageResponseFromAdapter(ctx context.Context, ch chan *kafka.RpcResponse) (*voltha.DeviceImageResponse, error) {
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case reply, open := <-ch:
+		switch {
+		case !open:
+			return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
+		case reply.Err != nil:
+			// Adapter-side failure is surfaced as an Internal error.
+			return nil, status.Errorf(codes.Internal, "%s", reply.Err.Error())
+		}
+
+		imageResp := &voltha.DeviceImageResponse{}
+		if unmarshalErr := ptypes.UnmarshalAny(reply.Reply, imageResp); unmarshalErr != nil {
+			return nil, status.Errorf(codes.Internal, "%s", unmarshalErr.Error())
+		}
+
+		// A usable response carries at least one non-nil image state up front.
+		states := imageResp.DeviceImageStates
+		if len(states) > 0 && states[0] != nil {
+			return imageResp, nil
+		}
+		return nil, status.Errorf(codes.Internal, "invalid response from adapter")
+	}
+}