[VOL-4022] RW-Core Changes For ONU SW Upgrade
New Download/Activate/Retrieve APIs

Change-Id: I2d8a0ec7d8967fd76a261a108f743e75f84c98e9
diff --git a/rw_core/core/device/agent_image.go b/rw_core/core/device/agent_image.go
index 9816927..3781540 100644
--- a/rw_core/core/device/agent_image.go
+++ b/rw_core/core/device/agent_image.go
@@ -18,6 +18,8 @@
 
 import (
 	"context"
+
+	"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
 	"github.com/opencord/voltha-protos/v4/go/common"
 
 	"github.com/gogo/protobuf/proto"
@@ -387,3 +389,185 @@
 	}
 
 }
+
+// downloadImageToDevice asks the adapter to download an image to this device and returns the
+// adapter's response. A root device is rejected with a FailedPrecondition status.
+func (agent *Agent) downloadImageToDevice(ctx context.Context, request *voltha.DeviceImageDownloadRequest) (*voltha.DeviceImageResponse, error) {
+	if waitErr := agent.requestQueue.WaitForGreenLight(ctx); waitErr != nil {
+		return nil, waitErr
+	}
+	logger.Debugw(ctx, "download-image-to-device", log.Fields{"device-id": agent.deviceID})
+
+	if agent.device.Root {
+		agent.requestQueue.RequestComplete()
+		return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, is an OLT. Image update "+
+			"not supported by VOLTHA. Use Device Manager or other means", agent.deviceID)
+	}
+	device := agent.cloneDeviceWithoutLock()
+
+	// The adapter call runs on a Background-derived context bounded by agent.defaultTimeout.
+	spanCtx := log.WithSpanFromContext(context.Background(), ctx)
+	rpcCtx, cancel := context.WithTimeout(spanCtx, agent.defaultTimeout)
+	defer cancel()
+	rpcCtx = coreutils.WithRPCMetadataFromContext(rpcCtx, ctx)
+	respCh, err := agent.adapterProxy.DownloadImageToOnuDevice(rpcCtx, device, request)
+	agent.requestQueue.RequestComplete()
+	if err != nil {
+		return nil, err
+	}
+	return agent.getDeviceImageResponseFromAdapter(ctx, respCh)
+}
+
+// getImageStatus sends an image status request for this device to the adapter and returns the
+// adapter's response.
+func (agent *Agent) getImageStatus(ctx context.Context, request *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) {
+	if waitErr := agent.requestQueue.WaitForGreenLight(ctx); waitErr != nil {
+		return nil, waitErr
+	}
+	device := agent.cloneDeviceWithoutLock()
+
+	// The adapter call and the wait for its reply both use this context, bounded by agent.defaultTimeout.
+	spanCtx := log.WithSpanFromContext(context.Background(), ctx)
+	rpcCtx, cancel := context.WithTimeout(spanCtx, agent.defaultTimeout)
+	defer cancel()
+	rpcCtx = coreutils.WithRPCMetadataFromContext(rpcCtx, ctx)
+	logger.Debugw(ctx, "get-image-status", log.Fields{"device-id": agent.deviceID})
+	respCh, err := agent.adapterProxy.GetOnuImageStatus(rpcCtx, device, request)
+	agent.requestQueue.RequestComplete()
+	if err != nil {
+		return nil, err
+	}
+	return agent.getDeviceImageResponseFromAdapter(rpcCtx, respCh)
+}
+
+// activateImageOnDevice sends an image activation request for this device to the adapter and
+// returns the adapter's response.
+func (agent *Agent) activateImageOnDevice(ctx context.Context, request *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) {
+	if waitErr := agent.requestQueue.WaitForGreenLight(ctx); waitErr != nil {
+		return nil, waitErr
+	}
+	device := agent.cloneDeviceWithoutLock()
+
+	// The adapter call and the wait for its reply both use this context, bounded by agent.defaultTimeout.
+	spanCtx := log.WithSpanFromContext(context.Background(), ctx)
+	rpcCtx, cancel := context.WithTimeout(spanCtx, agent.defaultTimeout)
+	defer cancel()
+	rpcCtx = coreutils.WithRPCMetadataFromContext(rpcCtx, ctx)
+	logger.Debugw(ctx, "activate-image-on-device", log.Fields{"device-id": agent.deviceID})
+	respCh, err := agent.adapterProxy.ActivateOnuImage(rpcCtx, device, request)
+	agent.requestQueue.RequestComplete()
+	if err != nil {
+		return nil, err
+	}
+	return agent.getDeviceImageResponseFromAdapter(rpcCtx, respCh)
+}
+
+// abortImageUpgradeToDevice sends an abort-image-upgrade request for this device to the adapter
+// and returns the adapter's response.
+func (agent *Agent) abortImageUpgradeToDevice(ctx context.Context, request *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) {
+	if waitErr := agent.requestQueue.WaitForGreenLight(ctx); waitErr != nil {
+		return nil, waitErr
+	}
+	device := agent.cloneDeviceWithoutLock()
+
+	// The adapter call runs on a Background-derived context bounded by agent.defaultTimeout.
+	spanCtx := log.WithSpanFromContext(context.Background(), ctx)
+	rpcCtx, cancel := context.WithTimeout(spanCtx, agent.defaultTimeout)
+	defer cancel()
+	rpcCtx = coreutils.WithRPCMetadataFromContext(rpcCtx, ctx)
+	logger.Debugw(ctx, "abort-image-on-device", log.Fields{"device-id": agent.deviceID})
+	respCh, err := agent.adapterProxy.AbortImageUpgrade(rpcCtx, device, request)
+	agent.requestQueue.RequestComplete()
+	if err != nil {
+		return nil, err
+	}
+	return agent.getDeviceImageResponseFromAdapter(ctx, respCh)
+}
+
+// commitImage sends an image commit request for this device to the adapter and returns the
+// adapter's response.
+func (agent *Agent) commitImage(ctx context.Context, request *voltha.DeviceImageRequest) (*voltha.DeviceImageResponse, error) {
+	if waitErr := agent.requestQueue.WaitForGreenLight(ctx); waitErr != nil {
+		return nil, waitErr
+	}
+	device := agent.cloneDeviceWithoutLock()
+
+	// The adapter call runs on a Background-derived context bounded by agent.defaultTimeout.
+	spanCtx := log.WithSpanFromContext(context.Background(), ctx)
+	rpcCtx, cancel := context.WithTimeout(spanCtx, agent.defaultTimeout)
+	defer cancel()
+	rpcCtx = coreutils.WithRPCMetadataFromContext(rpcCtx, ctx)
+	logger.Debugw(ctx, "commit-image-on-device", log.Fields{"device-id": agent.deviceID})
+	respCh, err := agent.adapterProxy.CommitImage(rpcCtx, device, request)
+	agent.requestQueue.RequestComplete()
+	if err != nil {
+		return nil, err
+	}
+	return agent.getDeviceImageResponseFromAdapter(ctx, respCh)
+}
+
+// getOnuImages sends a get-images request for the given id to the adapter and decodes the
+// adapter's reply into an OnuImages message.
+func (agent *Agent) getOnuImages(ctx context.Context, id *common.ID) (*voltha.OnuImages, error) {
+	if waitErr := agent.requestQueue.WaitForGreenLight(ctx); waitErr != nil {
+		return nil, waitErr
+	}
+	device := agent.cloneDeviceWithoutLock()
+
+	spanCtx := log.WithSpanFromContext(context.Background(), ctx)
+	rpcCtx, cancel := context.WithTimeout(spanCtx, agent.defaultTimeout)
+	defer cancel()
+	rpcCtx = coreutils.WithRPCMetadataFromContext(rpcCtx, ctx)
+	logger.Debugw(ctx, "get-onu-images", log.Fields{"device-id": agent.deviceID})
+
+	// Hand the request to the adapter; the queued request is marked complete whether or not it failed.
+	respCh, err := agent.adapterProxy.GetOnuImages(rpcCtx, device, id)
+	agent.requestQueue.RequestComplete()
+	if err != nil {
+		return nil, err
+	}
+
+	// Block until the adapter replies or the caller's context is done.
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case reply, chOpen := <-respCh:
+		if !chOpen {
+			return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
+		}
+		if reply.Err != nil {
+			return nil, status.Errorf(codes.Internal, "%s", reply.Err.Error())
+		}
+		images := &voltha.OnuImages{}
+		if err := ptypes.UnmarshalAny(reply.Reply, images); err != nil {
+			return nil, status.Errorf(codes.Internal, "%s", err.Error())
+		}
+		return images, nil
+	}
+}
+
+// getDeviceImageResponseFromAdapter waits on ch for the adapter's reply, or for ctx to be done,
+// and decodes the reply into a DeviceImageResponse.
+func (agent *Agent) getDeviceImageResponseFromAdapter(ctx context.Context, ch chan *kafka.RpcResponse) (*voltha.DeviceImageResponse, error) {
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case reply, chOpen := <-ch:
+		if !chOpen {
+			return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
+		}
+		if reply.Err != nil {
+			return nil, status.Errorf(codes.Internal, "%s", reply.Err.Error())
+		}
+		imageResp := &voltha.DeviceImageResponse{}
+		if err := ptypes.UnmarshalAny(reply.Reply, imageResp); err != nil {
+			return nil, status.Errorf(codes.Internal, "%s", err.Error())
+		}
+		// An empty state list or a nil first entry is treated as an invalid adapter reply.
+		states := imageResp.DeviceImageStates
+		if len(states) == 0 || states[0] == nil {
+			return nil, status.Errorf(codes.Internal, "invalid response from adapter")
+		}
+		return imageResp, nil
+	}
+}