This update provides the main implementation of the image download
feature within the Core. Only minimal testing was done with the CLI,
as the adapters have not yet implemented this feature.

Change-Id: I771340876d9aa1f368642cd44a433ced3df52673
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index b26b161..d21cfdb 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -266,29 +266,125 @@
 	return nil, nil
 }
 
-func (ap *AdapterProxy) DownloadImage(device voltha.Device, download voltha.ImageDownload) error {
-	log.Debug("DownloadImage")
-	return nil
+// DownloadImage sends the "download_image" RPC for the given device and image download
+// request through ap.kafkaICProxy, and returns the error (if any) that unPackResponse
+// produces from the reply.
+func (ap *AdapterProxy) DownloadImage(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
+	log.Debugw("DownloadImage", log.Fields{"deviceId": device.Id, "image": download.Name})
+	const rpc = "download_image"
+	kvArgs := []*kafka.KVArg{
+		{Key: "device", Value: device},
+		{Key: "request", Value: download},
+	}
+	// The request is addressed to the sub-topic built from the device's type and id.
+	adapterTopic := kafka.CreateSubTopic(device.Type, device.Id)
+	// The reply comes back on a device specific topic, as we are the only core handling
+	// requests for this device.
+	replyTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	ok, reply := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &adapterTopic, &replyTopic, true, kvArgs...)
+	log.Debugw("DownloadImage-response", log.Fields{"deviceId": device.Id, "success": ok})
+
+	return unPackResponse(rpc, device.Id, ok, reply)
 }
 
-func (ap *AdapterProxy) GetImageDownloadStatus(device voltha.Device, download voltha.ImageDownload) error {
-	log.Debug("GetImageDownloadStatus")
-	return nil
+// GetImageDownloadStatus sends the "get_image_download_status" RPC for the given device and
+// image download, and returns the ImageDownload unmarshalled from a successful reply. A failure
+// reply is returned as a codes.Internal status error carrying the reason from its ic.Error.
+func (ap *AdapterProxy) GetImageDownloadStatus(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+	log.Debugw("GetImageDownloadStatus", log.Fields{"deviceId": device.Id, "image": download.Name})
+	rpc := "get_image_download_status"
+	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+	args := []*kafka.KVArg{
+		{Key: "device", Value: device},
+		{Key: "request", Value: download},
+	}
+	// Use a device specific topic as we are the only core handling requests for this device
+	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+	log.Debugw("GetImageDownloadStatus-response", log.Fields{"deviceId": device.Id, "success": success})
+
+	if !success {
+		// A failure reply is expected to carry an ic.Error describing what went wrong.
+		icErr := &ic.Error{}
+		if err := ptypes.UnmarshalAny(result, icErr); err != nil {
+			log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+			return nil, err
+		}
+		// Log the reported reason: the unmarshal error is always nil once we get here.
+		log.Debugw("GetImageDownloadStatus-return", log.Fields{"deviceId": device.Id, "success": success, "error": icErr.Reason})
+		return nil, status.Errorf(codes.Internal, "%s", icErr.Reason)
+	}
+
+	// A successful reply is expected to carry the ImageDownload with the current status.
+	unpackResult := &voltha.ImageDownload{}
+	if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
+		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+		return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
+	}
+	return unpackResult, nil
 }
 
-func (ap *AdapterProxy) CancelImageDownload(device voltha.Device, download voltha.ImageDownload) error {
-	log.Debug("CancelImageDownload")
-	return nil
+// CancelImageDownload sends the "cancel_image_download" RPC for the given device and image
+// download request through ap.kafkaICProxy, and returns the error (if any) that
+// unPackResponse produces from the reply.
+func (ap *AdapterProxy) CancelImageDownload(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
+	log.Debugw("CancelImageDownload", log.Fields{"deviceId": device.Id, "image": download.Name})
+	const rpc = "cancel_image_download"
+	kvArgs := []*kafka.KVArg{
+		{Key: "device", Value: device},
+		{Key: "request", Value: download},
+	}
+	// The request is addressed to the sub-topic built from the device's type and id.
+	adapterTopic := kafka.CreateSubTopic(device.Type, device.Id)
+	// The reply comes back on a device specific topic, as we are the only core handling
+	// requests for this device.
+	replyTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	ok, reply := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &adapterTopic, &replyTopic, true, kvArgs...)
+	log.Debugw("CancelImageDownload-response", log.Fields{"deviceId": device.Id, "success": ok})
+
+	return unPackResponse(rpc, device.Id, ok, reply)
 }
 
-func (ap *AdapterProxy) ActivateImageUpdate(device voltha.Device, download voltha.ImageDownload) error {
-	log.Debug("ActivateImageUpdate")
-	return nil
+// ActivateImageUpdate sends the "activate_image_update" RPC for the given device and image
+// download request through ap.kafkaICProxy, and returns the error (if any) that
+// unPackResponse produces from the reply.
+func (ap *AdapterProxy) ActivateImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
+	log.Debugw("ActivateImageUpdate", log.Fields{"deviceId": device.Id, "image": download.Name})
+	const rpc = "activate_image_update"
+	kvArgs := []*kafka.KVArg{
+		{Key: "device", Value: device},
+		{Key: "request", Value: download},
+	}
+	// The request is addressed to the sub-topic built from the device's type and id.
+	adapterTopic := kafka.CreateSubTopic(device.Type, device.Id)
+	// The reply comes back on a device specific topic, as we are the only core handling
+	// requests for this device.
+	replyTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	ok, reply := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &adapterTopic, &replyTopic, true, kvArgs...)
+	log.Debugw("ActivateImageUpdate-response", log.Fields{"deviceId": device.Id, "success": ok})
+
+	return unPackResponse(rpc, device.Id, ok, reply)
 }
 
-func (ap *AdapterProxy) RevertImageUpdate(device voltha.Device, download voltha.ImageDownload) error {
-	log.Debug("RevertImageUpdate")
-	return nil
+// RevertImageUpdate sends the "revert_image_update" RPC for the given device and image
+// download request through ap.kafkaICProxy, and returns the error (if any) that
+// unPackResponse produces from the reply.
+func (ap *AdapterProxy) RevertImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
+	log.Debugw("RevertImageUpdate", log.Fields{"deviceId": device.Id, "image": download.Name})
+	const rpc = "revert_image_update"
+	kvArgs := []*kafka.KVArg{
+		{Key: "device", Value: device},
+		{Key: "request", Value: download},
+	}
+	// The request is addressed to the sub-topic built from the device's type and id.
+	adapterTopic := kafka.CreateSubTopic(device.Type, device.Id)
+	// The reply comes back on a device specific topic, as we are the only core handling
+	// requests for this device.
+	replyTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	ok, reply := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &adapterTopic, &replyTopic, true, kvArgs...)
+	log.Debugw("RevertImageUpdate-response", log.Fields{"deviceId": device.Id, "success": ok})
+
+	return unPackResponse(rpc, device.Id, ok, reply)
 }
 
 func (ap *AdapterProxy) SelfTestDevice(device voltha.Device) (*voltha.SelfTestResponse, error) {