This update provides the meat for the image download feature
within the Core. Minimal testing was done with the CLI as the
adapters have not implemented this feature so far.
Change-Id: I771340876d9aa1f368642cd44a433ced3df52673
diff --git a/adapters/common/request_handler.go b/adapters/common/request_handler.go
index ec04618..3f8465c 100644
--- a/adapters/common/request_handler.go
+++ b/adapters/common/request_handler.go
@@ -203,22 +203,22 @@
return new(empty.Empty), nil
}
-func (rhp *RequestHandlerProxy) Download_image(args []*ic.Argument) (*empty.Empty, error) {
- return new(empty.Empty), nil
+func (rhp *RequestHandlerProxy) Download_image(args []*ic.Argument) (*voltha.ImageDownload, error) {
+ return &voltha.ImageDownload{}, nil
}
-func (rhp *RequestHandlerProxy) Get_image_download_status(args []*ic.Argument) (*empty.Empty, error) {
- return new(empty.Empty), nil
+func (rhp *RequestHandlerProxy) Get_image_download_status(args []*ic.Argument) (*voltha.ImageDownload, error) {
+ return &voltha.ImageDownload{}, nil
}
-func (rhp *RequestHandlerProxy) Cancel_image_download(args []*ic.Argument) (*empty.Empty, error) {
- return new(empty.Empty), nil
+func (rhp *RequestHandlerProxy) Cancel_image_download(args []*ic.Argument) (*voltha.ImageDownload, error) {
+ return &voltha.ImageDownload{}, nil
}
-func (rhp *RequestHandlerProxy) Activate_image_update(args []*ic.Argument) (*empty.Empty, error) {
- return new(empty.Empty), nil
+func (rhp *RequestHandlerProxy) Activate_image_update(args []*ic.Argument) (*voltha.ImageDownload, error) {
+ return &voltha.ImageDownload{}, nil
}
-func (rhp *RequestHandlerProxy) Revert_image_update(args []*ic.Argument) (*empty.Empty, error) {
- return new(empty.Empty), nil
+func (rhp *RequestHandlerProxy) Revert_image_update(args []*ic.Argument) (*voltha.ImageDownload, error) {
+ return &voltha.ImageDownload{}, nil
}
diff --git a/adapters/iAdapter.go b/adapters/iAdapter.go
index f8de35e..d0463f1 100644
--- a/adapters/iAdapter.go
+++ b/adapters/iAdapter.go
@@ -44,9 +44,9 @@
Get_ofp_device_info(device *voltha.Device) (*ic.SwitchCapability, error)
Get_ofp_port_info(device *voltha.Device, port_no int64) (*ic.PortCapability, error)
Process_inter_adapter_message(msg *ic.InterAdapterMessage) error
- Download_image(device *voltha.Device, request *voltha.ImageDownload) error
- Get_image_download_status(device *voltha.Device, request *voltha.ImageDownload) error
- Cancel_image_download(device *voltha.Device, request *voltha.ImageDownload) error
- Activate_image_update(device *voltha.Device, request *voltha.ImageDownload) error
- Revert_image_update(device *voltha.Device, request *voltha.ImageDownload) error
+ Download_image(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error)
+ Get_image_download_status(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error)
+ Cancel_image_download(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error)
+ Activate_image_update(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error)
+ Revert_image_update(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error)
}
diff --git a/adapters/simulated_olt/adaptercore/simulated_olt.go b/adapters/simulated_olt/adaptercore/simulated_olt.go
index bb6883a..aa6e248 100644
--- a/adapters/simulated_olt/adaptercore/simulated_olt.go
+++ b/adapters/simulated_olt/adaptercore/simulated_olt.go
@@ -222,22 +222,22 @@
return errors.New("UnImplemented")
}
-func (so *SimulatedOLT) Download_image(device *voltha.Device, request *voltha.ImageDownload) error {
- return errors.New("UnImplemented")
+func (so *SimulatedOLT) Download_image(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ return nil, errors.New("UnImplemented")
}
-func (so *SimulatedOLT) Get_image_download_status(device *voltha.Device, request *voltha.ImageDownload) error {
- return errors.New("UnImplemented")
+func (so *SimulatedOLT) Get_image_download_status(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ return nil, errors.New("UnImplemented")
}
-func (so *SimulatedOLT) Cancel_image_download(device *voltha.Device, request *voltha.ImageDownload) error {
- return errors.New("UnImplemented")
+func (so *SimulatedOLT) Cancel_image_download(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ return nil, errors.New("UnImplemented")
}
-func (so *SimulatedOLT) Activate_image_update(device *voltha.Device, request *voltha.ImageDownload) error {
- return errors.New("UnImplemented")
+func (so *SimulatedOLT) Activate_image_update(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ return nil, errors.New("UnImplemented")
}
-func (so *SimulatedOLT) Revert_image_update(device *voltha.Device, request *voltha.ImageDownload) error {
- return errors.New("UnImplemented")
+func (so *SimulatedOLT) Revert_image_update(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ return nil, errors.New("UnImplemented")
}
diff --git a/adapters/simulated_onu/adaptercore/simulated_onu.go b/adapters/simulated_onu/adaptercore/simulated_onu.go
index 0a8efc6..1aefcad 100644
--- a/adapters/simulated_onu/adaptercore/simulated_onu.go
+++ b/adapters/simulated_onu/adaptercore/simulated_onu.go
@@ -216,22 +216,22 @@
return errors.New("UnImplemented")
}
-func (so *SimulatedONU) Download_image(device *voltha.Device, request *voltha.ImageDownload) error {
- return errors.New("UnImplemented")
+func (so *SimulatedONU) Download_image(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ return nil, errors.New("UnImplemented")
}
-func (so *SimulatedONU) Get_image_download_status(device *voltha.Device, request *voltha.ImageDownload) error {
- return errors.New("UnImplemented")
+func (so *SimulatedONU) Get_image_download_status(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ return nil, errors.New("UnImplemented")
}
-func (so *SimulatedONU) Cancel_image_download(device *voltha.Device, request *voltha.ImageDownload) error {
- return errors.New("UnImplemented")
+func (so *SimulatedONU) Cancel_image_download(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ return nil, errors.New("UnImplemented")
}
-func (so *SimulatedONU) Activate_image_update(device *voltha.Device, request *voltha.ImageDownload) error {
- return errors.New("UnImplemented")
+func (so *SimulatedONU) Activate_image_update(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ return nil, errors.New("UnImplemented")
}
-func (so *SimulatedONU) Revert_image_update(device *voltha.Device, request *voltha.ImageDownload) error {
- return errors.New("UnImplemented")
+func (so *SimulatedONU) Revert_image_update(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ return nil, errors.New("UnImplemented")
}
diff --git a/protos/device.proto b/protos/device.proto
index 16186fa..902a58c 100644
--- a/protos/device.proto
+++ b/protos/device.proto
@@ -110,6 +110,7 @@
DOWNLOAD_STARTED = 3;
DOWNLOAD_FAILED = 4;
DOWNLOAD_UNSUPPORTED = 5;
+ DOWNLOAD_CANCELLED = 6;
}
enum ImageDownloadFailureReason {
@@ -123,9 +124,10 @@
enum ImageActivateState {
IMAGE_UNKNOWN = 0;
IMAGE_INACTIVE = 1;
- IMAGE_ACTIVATE = 2;
+ IMAGE_ACTIVATING = 2;
IMAGE_ACTIVE = 3;
- IMAGE_REVERT = 4;
+ IMAGE_REVERTING = 4;
+ IMAGE_REVERTED = 5;
}
// Device Identifier
diff --git a/python/adapters/kafka/adapter_request_facade.py b/python/adapters/kafka/adapter_request_facade.py
index de8ae0b..e1d65da 100644
--- a/python/adapters/kafka/adapter_request_facade.py
+++ b/python/adapters/kafka/adapter_request_facade.py
@@ -26,7 +26,7 @@
from afkak.consumer import OFFSET_LATEST, OFFSET_EARLIEST
from python.adapters.interface import IAdapterInterface
from python.protos.inter_container_pb2 import IntType, InterAdapterMessage, StrType, Error, ErrorCode
-from python.protos.device_pb2 import Device
+from python.protos.device_pb2 import Device, ImageDownload
from python.protos.openflow_13_pb2 import FlowChanges, FlowGroups, Flows, \
FlowGroupChanges, ofp_packet_out
from python.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
@@ -153,19 +153,85 @@
reason="device-invalid")
def download_image(self, device, request):
- return self.adapter.download_image(device, request)
+ d = Device()
+ if device:
+ device.Unpack(d)
+ else:
+ return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+ reason="device-invalid")
+ img = ImageDownload()
+ if request:
+ request.Unpack(img)
+ else:
+ return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+ reason="port-no-invalid")
+
+ return True, self.adapter.download_image(device, request)
def get_image_download_status(self, device, request):
- return self.adapter.get_image_download_status(device, request)
+ d = Device()
+ if device:
+ device.Unpack(d)
+ else:
+ return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+ reason="device-invalid")
+ img = ImageDownload()
+ if request:
+ request.Unpack(img)
+ else:
+ return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+ reason="port-no-invalid")
+
+ return True, self.adapter.get_image_download_status(device, request)
def cancel_image_download(self, device, request):
- return self.adapter.cancel_image_download(device, request)
+ d = Device()
+ if device:
+ device.Unpack(d)
+ else:
+ return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+ reason="device-invalid")
+ img = ImageDownload()
+ if request:
+ request.Unpack(img)
+ else:
+ return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+ reason="port-no-invalid")
+
+ return True, self.adapter.cancel_image_download(device, request)
def activate_image_update(self, device, request):
- return self.adapter.activate_image_update(device, request)
+ d = Device()
+ if device:
+ device.Unpack(d)
+ else:
+ return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+ reason="device-invalid")
+ img = ImageDownload()
+ if request:
+ request.Unpack(img)
+ else:
+ return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+ reason="port-no-invalid")
+
+ return True, self.adapter.activate_image_update(device, request)
def revert_image_update(self, device, request):
- return self.adapter.revert_image_update(device, request)
+ d = Device()
+ if device:
+ device.Unpack(d)
+ else:
+ return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+ reason="device-invalid")
+ img = ImageDownload()
+ if request:
+ request.Unpack(img)
+ else:
+ return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+ reason="port-no-invalid")
+
+ return True, self.adapter.revert_image_update(device, request)
+
def self_test(self, device):
return self.adapter.self_test_device(device)
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index b26b161..d21cfdb 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -266,29 +266,125 @@
return nil, nil
}
-func (ap *AdapterProxy) DownloadImage(device voltha.Device, download voltha.ImageDownload) error {
- log.Debug("DownloadImage")
- return nil
+func (ap *AdapterProxy) DownloadImage(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
+ log.Debugw("DownloadImage", log.Fields{"deviceId": device.Id, "image": download.Name})
+ rpc := "download_image"
+ toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+ args := make([]*kafka.KVArg, 2)
+ args[0] = &kafka.KVArg{
+ Key: "device",
+ Value: device,
+ }
+ args[1] = &kafka.KVArg{
+ Key: "request",
+ Value: download,
+ }
+ // Use a device specific topic as we are the only core handling requests for this device
+ replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+ log.Debugw("DownloadImage-response", log.Fields{"deviceId": device.Id, "success": success})
+
+ return unPackResponse(rpc, device.Id, success, result)
}
-func (ap *AdapterProxy) GetImageDownloadStatus(device voltha.Device, download voltha.ImageDownload) error {
- log.Debug("GetImageDownloadStatus")
- return nil
+func (ap *AdapterProxy) GetImageDownloadStatus(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ log.Debugw("GetImageDownloadStatus", log.Fields{"deviceId": device.Id, "image": download.Name})
+ rpc := "get_image_download_status"
+ toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+ args := make([]*kafka.KVArg, 2)
+ args[0] = &kafka.KVArg{
+ Key: "device",
+ Value: device,
+ }
+ args[1] = &kafka.KVArg{
+ Key: "request",
+ Value: download,
+ }
+ // Use a device specific topic as we are the only core handling requests for this device
+ replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+ log.Debugw("GetImageDownloadStatus-response", log.Fields{"deviceId": device.Id, "success": success})
+
+ if success {
+ unpackResult := &voltha.ImageDownload{}
+ if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
+ log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
+ }
+ return unpackResult, nil
+ } else {
+ unpackResult := &ic.Error{}
+ var err error
+ if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
+ log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ return nil, err
+ }
+ log.Debugw("GetImageDownloadStatus-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
+ return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
+ }
}
-func (ap *AdapterProxy) CancelImageDownload(device voltha.Device, download voltha.ImageDownload) error {
- log.Debug("CancelImageDownload")
- return nil
+func (ap *AdapterProxy) CancelImageDownload(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
+ log.Debugw("CancelImageDownload", log.Fields{"deviceId": device.Id, "image": download.Name})
+ rpc := "cancel_image_download"
+ toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+ args := make([]*kafka.KVArg, 2)
+ args[0] = &kafka.KVArg{
+ Key: "device",
+ Value: device,
+ }
+ args[1] = &kafka.KVArg{
+ Key: "request",
+ Value: download,
+ }
+ // Use a device specific topic as we are the only core handling requests for this device
+ replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+ log.Debugw("CancelImageDownload-response", log.Fields{"deviceId": device.Id, "success": success})
+
+ return unPackResponse(rpc, device.Id, success, result)
}
-func (ap *AdapterProxy) ActivateImageUpdate(device voltha.Device, download voltha.ImageDownload) error {
- log.Debug("ActivateImageUpdate")
- return nil
+func (ap *AdapterProxy) ActivateImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
+ log.Debugw("ActivateImageUpdate", log.Fields{"deviceId": device.Id, "image": download.Name})
+ rpc := "activate_image_update"
+ toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+ args := make([]*kafka.KVArg, 2)
+ args[0] = &kafka.KVArg{
+ Key: "device",
+ Value: device,
+ }
+ args[1] = &kafka.KVArg{
+ Key: "request",
+ Value: download,
+ }
+ // Use a device specific topic as we are the only core handling requests for this device
+ replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+ log.Debugw("ActivateImageUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
+
+ return unPackResponse(rpc, device.Id, success, result)
}
-func (ap *AdapterProxy) RevertImageUpdate(device voltha.Device, download voltha.ImageDownload) error {
- log.Debug("RevertImageUpdate")
- return nil
+func (ap *AdapterProxy) RevertImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
+ log.Debugw("RevertImageUpdate", log.Fields{"deviceId": device.Id, "image": download.Name})
+ rpc := "revert_image_update"
+ toTopic := kafka.CreateSubTopic(device.Type, device.Id)
+ args := make([]*kafka.KVArg, 2)
+ args[0] = &kafka.KVArg{
+ Key: "device",
+ Value: device,
+ }
+ args[1] = &kafka.KVArg{
+ Key: "request",
+ Value: download,
+ }
+ // Use a device specific topic as we are the only core handling requests for this device
+ replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
+ log.Debugw("RevertImageUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
+
+ return unPackResponse(rpc, device.Id, success, result)
}
func (ap *AdapterProxy) SelfTestDevice(device voltha.Device) (*voltha.SelfTestResponse, error) {
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index d7e1b0a..6893179 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -495,3 +495,34 @@
go rhp.deviceMgr.PacketIn(deviceId.Id, uint32(portNo.Val), packet.Payload)
return new(empty.Empty), nil
}
+
+
+func (rhp *AdapterRequestHandlerProxy) UpdateImageDownload(args []*ic.Argument) (*empty.Empty, error) {
+ if len(args) < 2 {
+ log.Warn("invalid-number-of-args", log.Fields{"args": args})
+ err := errors.New("invalid-number-of-args")
+ return nil, err
+ }
+ deviceId := &voltha.ID{}
+ img := &voltha.ImageDownload{}
+ for _, arg := range args {
+ switch arg.Key {
+ case "device_id":
+ if err := ptypes.UnmarshalAny(arg.Value, deviceId); err != nil {
+ log.Warnw("cannot-unmarshal-device-id", log.Fields{"error": err})
+ return nil, err
+ }
+ case "image_download":
+ if err := ptypes.UnmarshalAny(arg.Value, img); err != nil {
+ log.Warnw("cannot-unmarshal-imgaeDownload", log.Fields{"error": err})
+ return nil, err
+ }
+ }
+ }
+ log.Debugw("UpdateImageDownload", log.Fields{"deviceId": deviceId.Id, "image-download": img})
+ if rhp.TestMode { // Execute only for test cases
+ return nil, nil
+ }
+ go rhp.deviceMgr.updateImageDownload(deviceId.Id, img)
+ return new(empty.Empty), nil
+}
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 3ccf808..b50ca94 100644
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -302,6 +302,241 @@
return nil
}
+func (agent *DeviceAgent) downloadImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
+ log.Debugw("downloadImage", log.Fields{"id": agent.deviceId})
+ // Get the most up to date the device info
+ if device, err := agent.getDeviceWithoutLock(); err != nil {
+ return nil, status.Errorf(codes.NotFound, "%s", agent.deviceId)
+ } else {
+ if device.AdminState != voltha.AdminState_ENABLED {
+ log.Debugw("device-not-enabled", log.Fields{"id": agent.deviceId})
+ return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, expected-admin-state:%s", agent.deviceId, voltha.AdminState_ENABLED)
+ }
+ // Save the image
+ clonedImg := proto.Clone(img).(*voltha.ImageDownload)
+ clonedImg.State = voltha.ImageDownload_DOWNLOAD_REQUESTED
+ cloned := proto.Clone(device).(*voltha.Device)
+ if cloned.ImageDownloads == nil {
+ cloned.ImageDownloads = []*voltha.ImageDownload{clonedImg}
+ } else {
+ cloned.ImageDownloads = append(cloned.ImageDownloads, clonedImg)
+ }
+ cloned.AdminState = voltha.AdminState_DOWNLOADING_IMAGE
+ if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ return nil, status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
+ }
+ // Send the request to the adapter
+ if err := agent.adapterProxy.DownloadImage(ctx, cloned, clonedImg); err != nil {
+ log.Debugw("downloadImage-error", log.Fields{"id": agent.lastData.Id, "error": err, "image": img.Name})
+ return nil, err
+ }
+ }
+ return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
+}
+
+// isImageRegistered is a helper method to figure out if an image is already registered
+func isImageRegistered(img *voltha.ImageDownload, device *voltha.Device) bool {
+ for _, image := range device.ImageDownloads {
+ if image.Id == img.Id && image.Name == img.Name {
+ return true
+ }
+ }
+ return false
+}
+
+func (agent *DeviceAgent) cancelImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
+ log.Debugw("cancelImageDownload", log.Fields{"id": agent.deviceId})
+ // Get the most up to date the device info
+ if device, err := agent.getDeviceWithoutLock(); err != nil {
+ return nil, status.Errorf(codes.NotFound, "%s", agent.deviceId)
+ } else {
+ // Verify whether the Image is in the list of image being downloaded
+ if !isImageRegistered(img, device) {
+ return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, image-not-registered:%s", agent.deviceId, img.Name)
+ }
+
+ // Update image download state
+ cloned := proto.Clone(device).(*voltha.Device)
+ for _, image := range cloned.ImageDownloads {
+ if image.Id == img.Id && image.Name == img.Name {
+ image.State = voltha.ImageDownload_DOWNLOAD_CANCELLED
+ }
+ }
+
+ //If device is in downloading state, send the request to cancel the download
+ if device.AdminState == voltha.AdminState_DOWNLOADING_IMAGE {
+ if err := agent.adapterProxy.CancelImageDownload(ctx, device, img); err != nil {
+ log.Debugw("cancelImageDownload-error", log.Fields{"id": agent.lastData.Id, "error": err, "image": img.Name})
+ return nil, err
+ }
+ // Set the device to Enabled
+ cloned.AdminState = voltha.AdminState_ENABLED
+ if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ return nil, status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
+ }
+ }
+ }
+ return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
+ }
+
+func (agent *DeviceAgent) activateImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
+ log.Debugw("activateImage", log.Fields{"id": agent.deviceId})
+ // Get the most up to date the device info
+ if device, err := agent.getDeviceWithoutLock(); err != nil {
+ return nil, status.Errorf(codes.NotFound, "%s", agent.deviceId)
+ } else {
+ // Verify whether the Image is in the list of image being downloaded
+ if !isImageRegistered(img, device) {
+ return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, image-not-registered:%s", agent.deviceId, img.Name)
+ }
+
+ if device.AdminState == voltha.AdminState_DOWNLOADING_IMAGE {
+ return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, device-in-downloading-state:%s", agent.deviceId, img.Name)
+ }
+ // Update image download state
+ cloned := proto.Clone(device).(*voltha.Device)
+ for _, image := range cloned.ImageDownloads {
+ if image.Id == img.Id && image.Name == img.Name {
+ image.ImageState = voltha.ImageDownload_IMAGE_ACTIVATING
+ }
+ }
+ // Set the device to downloading_image
+ cloned.AdminState = voltha.AdminState_DOWNLOADING_IMAGE
+ if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ return nil, status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
+ }
+
+ if err := agent.adapterProxy.ActivateImageUpdate(ctx, device, img); err != nil {
+ log.Debugw("activateImage-error", log.Fields{"id": agent.lastData.Id, "error": err, "image": img.Name})
+ return nil, err
+ }
+ // The status of the AdminState will be changed following the update_download_status response from the adapter
+ // The image name will also be removed from the device list
+ }
+ return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil}
+
+
+func (agent *DeviceAgent) revertImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
+ log.Debugw("revertImage", log.Fields{"id": agent.deviceId})
+ // Get the most up to date the device info
+ if device, err := agent.getDeviceWithoutLock(); err != nil {
+ return nil, status.Errorf(codes.NotFound, "%s", agent.deviceId)
+ } else {
+ // Verify whether the Image is in the list of image being downloaded
+ if !isImageRegistered(img, device) {
+ return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, image-not-registered:%s", agent.deviceId, img.Name)
+ }
+
+ if device.AdminState != voltha.AdminState_ENABLED {
+ return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, device-not-enabled-state:%s", agent.deviceId, img.Name)
+ }
+ // Update image download state
+ cloned := proto.Clone(device).(*voltha.Device)
+ for _, image := range cloned.ImageDownloads {
+ if image.Id == img.Id && image.Name == img.Name {
+ image.ImageState = voltha.ImageDownload_IMAGE_REVERTING
+ }
+ }
+ if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ return nil, status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
+ }
+
+ if err := agent.adapterProxy.RevertImageUpdate(ctx, device, img); err != nil {
+ log.Debugw("revertImage-error", log.Fields{"id": agent.lastData.Id, "error": err, "image": img.Name})
+ return nil, err
+ }
+ }
+ return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
+ }
+
+
+func (agent *DeviceAgent) getImageDownloadStatus(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
+ log.Debugw("getImageDownloadStatus", log.Fields{"id": agent.deviceId})
+ // Get the most up to date the device info
+ if device, err := agent.getDeviceWithoutLock(); err != nil {
+ return nil, status.Errorf(codes.NotFound, "%s", agent.deviceId)
+ } else {
+ if resp, err := agent.adapterProxy.GetImageDownloadStatus(ctx, device, img); err != nil {
+ log.Debugw("getImageDownloadStatus-error", log.Fields{"id": agent.lastData.Id, "error": err, "image": img.Name})
+ return nil, err
+ } else {
+ return resp, nil
+ }
+ }
+}
+
+func (agent *DeviceAgent) updateImageDownload(img *voltha.ImageDownload) error{
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
+ log.Debugw("updateImageDownload", log.Fields{"id": agent.deviceId})
+ // Get the most up to date the device info
+ if device, err := agent.getDeviceWithoutLock(); err != nil {
+ return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+ } else {
+ // Update the image as well as remove it if the download was cancelled
+ cloned := proto.Clone(device).(*voltha.Device)
+ clonedImages := make([]*voltha.ImageDownload, len(cloned.ImageDownloads))
+ for _, image := range cloned.ImageDownloads {
+ if image.Id == img.Id && image.Name == img.Name {
+ if image.State != voltha.ImageDownload_DOWNLOAD_CANCELLED {
+ clonedImages = append(clonedImages, img)
+ }
+ }
+ }
+ cloned.ImageDownloads = clonedImages
+ // Set the Admin state to enabled if required
+ if (img.State != voltha.ImageDownload_DOWNLOAD_REQUESTED &&
+ img.State != voltha.ImageDownload_DOWNLOAD_STARTED) ||
+ (img.ImageState != voltha.ImageDownload_IMAGE_ACTIVATING){
+ cloned.AdminState = voltha.AdminState_ENABLED
+ }
+
+ if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
+ }
+ }
+ return nil
+}
+
+func (agent *DeviceAgent) getImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
+ log.Debugw("getImageDownload", log.Fields{"id": agent.deviceId})
+ // Get the most up to date the device info
+ if device, err := agent.getDeviceWithoutLock(); err != nil {
+ return nil, status.Errorf(codes.NotFound, "%s", agent.deviceId)
+ } else {
+ for _, image := range device.ImageDownloads {
+ if image.Id == img.Id && image.Name == img.Name {
+ return image, nil
+ }
+ }
+ return nil, status.Errorf(codes.NotFound, "image-not-found:%s", img.Name)
+ }
+}
+
+func (agent *DeviceAgent) listImageDownloads(ctx context.Context, deviceId string) (*voltha.ImageDownloads, error) {
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
+ log.Debugw("listImageDownloads", log.Fields{"id": agent.deviceId})
+ // Get the most up to date the device info
+ if device, err := agent.getDeviceWithoutLock(); err != nil {
+ return nil, status.Errorf(codes.NotFound, "%s", agent.deviceId)
+ } else {
+ return &voltha.ImageDownloads{Items:device.ImageDownloads}, nil
+ }
+}
+
// getPorts retrieves the ports information of the device based on the port type.
func (agent *DeviceAgent) getPorts(ctx context.Context, portType voltha.Port_PortType) *voltha.Ports {
log.Debugw("getPorts", log.Fields{"id": agent.deviceId, "portType": portType})
@@ -380,7 +615,10 @@
return nil
}
// Perform the state transition in it's own go routine
- agent.deviceMgr.processTransition(previous, current)
+ if err := agent.deviceMgr.processTransition(previous, current); err != nil {
+ log.Errorw("failed-process-transition", log.Fields{"deviceId": previous.Id,
+ "previousAdminState": previous.AdminState, "currentAdminState": current.AdminState})
+ }
return nil
}(args...)
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index 3f27e10..76475f9 100644
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -583,6 +583,113 @@
return nil
}
+
+func (dMgr *DeviceManager) downloadImage(ctx context.Context, img *voltha.ImageDownload, ch chan interface{}) {
+ log.Debugw("downloadImage", log.Fields{"deviceid": img.Id, "imageName": img.Name})
+ var res interface{}
+ var err error
+ if agent := dMgr.getDeviceAgent(img.Id); agent != nil {
+ if res, err = agent.downloadImage(ctx, img); err != nil {
+ log.Debugw("downloadImage-failed", log.Fields{"err": err, "imageName": img.Name})
+ res = err
+ }
+ } else {
+ res = status.Errorf(codes.NotFound, "%s", img.Id)
+ }
+ sendResponse(ctx, ch, res)
+}
+
+func (dMgr *DeviceManager) cancelImageDownload(ctx context.Context, img *voltha.ImageDownload, ch chan interface{}) {
+ log.Debugw("cancelImageDownload", log.Fields{"deviceid": img.Id, "imageName": img.Name})
+ var res interface{}
+ var err error
+ if agent := dMgr.getDeviceAgent(img.Id); agent != nil {
+ if res, err = agent.cancelImageDownload(ctx, img); err != nil {
+ log.Debugw("cancelImageDownload-failed", log.Fields{"err": err, "imageName": img.Name})
+ res = err
+ }
+ } else {
+ res = status.Errorf(codes.NotFound, "%s", img.Id)
+ }
+ sendResponse(ctx, ch, res)
+}
+
+func (dMgr *DeviceManager) activateImage(ctx context.Context, img *voltha.ImageDownload, ch chan interface{}) {
+ log.Debugw("activateImage", log.Fields{"deviceid": img.Id, "imageName": img.Name})
+ var res interface{}
+ var err error
+ if agent := dMgr.getDeviceAgent(img.Id); agent != nil {
+ if res, err = agent.activateImage(ctx, img); err != nil {
+ log.Debugw("activateImage-failed", log.Fields{"err": err, "imageName": img.Name})
+ res = err
+ }
+ } else {
+ res = status.Errorf(codes.NotFound, "%s", img.Id)
+ }
+ sendResponse(ctx, ch, res)
+}
+
+func (dMgr *DeviceManager) revertImage(ctx context.Context, img *voltha.ImageDownload, ch chan interface{}) {
+ log.Debugw("revertImage", log.Fields{"deviceid": img.Id, "imageName": img.Name})
+ var res interface{}
+ var err error
+ if agent := dMgr.getDeviceAgent(img.Id); agent != nil {
+ if res, err = agent.revertImage(ctx, img); err != nil {
+ log.Debugw("revertImage-failed", log.Fields{"err": err, "imageName": img.Name})
+ res = err
+ }
+ } else {
+ res = status.Errorf(codes.NotFound, "%s", img.Id)
+ }
+ sendResponse(ctx, ch, res)
+}
+
+func (dMgr *DeviceManager) getImageDownloadStatus(ctx context.Context, img *voltha.ImageDownload, ch chan interface{}) {
+ log.Debugw("getImageDownloadStatus", log.Fields{"deviceid": img.Id, "imageName": img.Name})
+ var res interface{}
+ var err error
+ if agent := dMgr.getDeviceAgent(img.Id); agent != nil {
+ if res, err = agent.getImageDownloadStatus(ctx, img); err != nil {
+ log.Debugw("getImageDownloadStatus-failed", log.Fields{"err": err, "imageName": img.Name})
+ res = err
+ }
+ } else {
+ res = status.Errorf(codes.NotFound, "%s", img.Id)
+ }
+ sendResponse(ctx, ch, res)
+}
+
+
+func (dMgr *DeviceManager) updateImageDownload(deviceId string, img *voltha.ImageDownload) error {
+ log.Debugw("updateImageDownload", log.Fields{"deviceid": img.Id, "imageName": img.Name})
+ if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
+ if err := agent.updateImageDownload(img); err != nil {
+ log.Debugw("updateImageDownload-failed", log.Fields{"err": err, "imageName": img.Name})
+ return err
+ }
+ } else {
+ return status.Errorf(codes.NotFound, "%s", img.Id)
+ }
+ return nil
+}
+
+func (dMgr *DeviceManager) getImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ log.Debugw("getImageDownload", log.Fields{"deviceid": img.Id, "imageName": img.Name})
+ if agent := dMgr.getDeviceAgent(img.Id); agent != nil {
+ return agent.getImageDownload(ctx, img)
+ }
+ return nil, status.Errorf(codes.NotFound, "%s", img.Id)
+}
+
+func (dMgr *DeviceManager) listImageDownloads(ctx context.Context, deviceId string) (*voltha.ImageDownloads, error) {
+ log.Debugw("listImageDownloads", log.Fields{"deviceId": deviceId})
+ if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
+ return agent.listImageDownloads(ctx, deviceId)
+ }
+ return nil, status.Errorf(codes.NotFound, "%s", deviceId)
+}
+
+
func (dMgr *DeviceManager) activateDevice(cDevice *voltha.Device) error {
log.Info("activateDevice")
return nil
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index d11c420..dd777c0 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -37,6 +37,13 @@
const MAX_RESPONSE_TIME = 500 // milliseconds
+const (
+ IMAGE_DOWNLOAD = iota
+ CANCEL_IMAGE_DOWNLOAD = iota
+ ACTIVATE_IMAGE = iota
+ REVERT_IMAGE = iota
+)
+
type APIHandler struct {
deviceMgr *DeviceManager
logicalDeviceMgr *LogicalDeviceManager
@@ -425,6 +432,67 @@
return waitForNilResponseOnSuccess(ctx, ch)
}
+func (handler *APIHandler) acquireTransaction(ctx context.Context) (*KVTransaction, error) {
+ txn, err := handler.createKvTransaction(ctx)
+ if txn == nil {
+ return nil, err
+ } else if txn.Acquired(MAX_RESPONSE_TIME) {
+ return txn, nil
+ } else {
+ txn.Close()
+ return nil, errors.New("failed-to-seize-request")
+ }
+}
+
+// processImageRequest is a helper method to execute an image download request
+func (handler *APIHandler) processImageRequest(ctx context.Context, img *voltha.ImageDownload, requestType int) (*common.OperationResp, error) {
+ log.Debugw("processImageDownload", log.Fields{"img": *img, "requestType": requestType})
+ if isTestMode(ctx) {
+ resp := &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS}
+ return resp, nil
+ }
+
+ if txn, err := handler.acquireTransaction(ctx); err != nil {
+ return &common.OperationResp{}, err
+ } else {
+ defer txn.Close()
+ }
+
+ failedresponse := &common.OperationResp{Code:voltha.OperationResp_OPERATION_FAILURE}
+
+ ch := make(chan interface{})
+ defer close(ch)
+ switch requestType {
+ case IMAGE_DOWNLOAD:
+ go handler.deviceMgr.downloadImage(ctx, img, ch)
+ case CANCEL_IMAGE_DOWNLOAD:
+ go handler.deviceMgr.cancelImageDownload(ctx, img, ch)
+ case ACTIVATE_IMAGE:
+ go handler.deviceMgr.activateImage(ctx, img, ch)
+ case REVERT_IMAGE:
+ go handler.deviceMgr.revertImage(ctx, img, ch)
+ default:
+ log.Warn("invalid-request-type", log.Fields{"requestType": requestType})
+ return failedresponse, status.Errorf(codes.InvalidArgument, "%d", requestType)
+ }
+ select {
+ case res := <-ch:
+ if res != nil {
+ if err, ok := res.(error); ok {
+ return failedresponse, err
+ }
+ if opResp, ok := res.(*common.OperationResp); ok {
+ return opResp, nil
+ }
+ }
+ log.Warnw("download-image-unexpected-return-type", log.Fields{"result": res})
+ return failedresponse, status.Errorf(codes.Internal, "%s", res)
+ case <-ctx.Done():
+ log.Debug("downloadImage-client-timeout")
+ return nil, ctx.Err()
+ }
+}
+
func (handler *APIHandler) DownloadImage(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
log.Debugw("DownloadImage-request", log.Fields{"img": *img})
if isTestMode(ctx) {
@@ -432,36 +500,109 @@
return resp, nil
}
- return nil, errors.New("UnImplemented")
+ return handler.processImageRequest(ctx, img, IMAGE_DOWNLOAD)
}
func (handler *APIHandler) CancelImageDownload(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
- log.Debugw("CancelImageDownload-request", log.Fields{"img": *img})
+ log.Debugw("cancelImageDownload-request", log.Fields{"img": *img})
if isTestMode(ctx) {
resp := &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS}
return resp, nil
}
- return nil, errors.New("UnImplemented")
+ return handler.processImageRequest(ctx, img, CANCEL_IMAGE_DOWNLOAD)
}
func (handler *APIHandler) ActivateImageUpdate(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
- log.Debugw("ActivateImageUpdate-request", log.Fields{"img": *img})
+ log.Debugw("activateImageUpdate-request", log.Fields{"img": *img})
if isTestMode(ctx) {
resp := &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS}
return resp, nil
}
- return nil, errors.New("UnImplemented")
+
+ return handler.processImageRequest(ctx, img, ACTIVATE_IMAGE)
}
func (handler *APIHandler) RevertImageUpdate(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
- log.Debugw("RevertImageUpdate-request", log.Fields{"img": *img})
+ log.Debugw("revertImageUpdate-request", log.Fields{"img": *img})
if isTestMode(ctx) {
resp := &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS}
return resp, nil
}
- return nil, errors.New("UnImplemented")
+
+ return handler.processImageRequest(ctx, img, REVERT_IMAGE)
}
+func (handler *APIHandler) GetImageDownloadStatus(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ log.Debugw("getImageDownloadStatus-request", log.Fields{"img": *img})
+ if isTestMode(ctx) {
+ resp := &voltha.ImageDownload{State: voltha.ImageDownload_DOWNLOAD_SUCCEEDED}
+ return resp, nil
+ }
+
+ failedresponse := &voltha.ImageDownload{State: voltha.ImageDownload_DOWNLOAD_UNKNOWN}
+
+ if txn, err := handler.acquireTransaction(ctx); err != nil {
+ return failedresponse, err
+ } else {
+ defer txn.Close()
+ }
+
+ ch := make(chan interface{})
+ defer close(ch)
+ go handler.deviceMgr.getImageDownloadStatus(ctx, img, ch)
+
+ select {
+ case res := <-ch:
+ if res != nil {
+ if err, ok := res.(error); ok {
+ return failedresponse, err
+ }
+ if downloadResp, ok := res.(*voltha.ImageDownload); ok {
+ return downloadResp, nil
+ }
+ }
+ log.Warnw("download-image-status", log.Fields{"result": res})
+ return failedresponse, status.Errorf(codes.Internal, "%s", res)
+ case <-ctx.Done():
+ log.Debug("downloadImage-client-timeout")
+ return failedresponse, ctx.Err()
+ }
+}
+
+func (handler *APIHandler) GetImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ log.Debugw("GetImageDownload-request", log.Fields{"img": *img})
+ if isTestMode(ctx) {
+ resp := &voltha.ImageDownload{State: voltha.ImageDownload_DOWNLOAD_SUCCEEDED}
+ return resp, nil
+ }
+
+ if download, err := handler.deviceMgr.getImageDownload(ctx, img); err != nil {
+ return &voltha.ImageDownload{State: voltha.ImageDownload_DOWNLOAD_UNKNOWN}, err
+ } else {
+ return download, nil
+ }
+}
+
+func (handler *APIHandler) ListImageDownloads(ctx context.Context, id *voltha.ID) (*voltha.ImageDownloads, error) {
+ log.Debugw("ListImageDownloads-request", log.Fields{"deviceId": id.Id})
+ if isTestMode(ctx) {
+ resp := &voltha.ImageDownloads{Items:[]*voltha.ImageDownload{}}
+ return resp, nil
+ }
+
+ if downloads, err := handler.deviceMgr.listImageDownloads(ctx, id.Id); err != nil {
+ failedResp := &voltha.ImageDownloads{
+ Items:[]*voltha.ImageDownload{
+ &voltha.ImageDownload{State: voltha.ImageDownload_DOWNLOAD_UNKNOWN},
+ },
+ }
+ return failedResp, err
+ } else {
+ return downloads, nil
+ }
+}
+
+
func (handler *APIHandler) UpdateDevicePmConfigs(ctx context.Context, configs *voltha.PmConfigs) (*empty.Empty, error) {
log.Debugw("UpdateDevicePmConfigs-request", log.Fields{"configs": *configs})
if isTestMode(ctx) {
@@ -559,8 +700,7 @@
}
}
}
- log.Debugw("ReceivePacketsIn-request-done", log.Fields{"packetsIn": packetsIn})
- return nil
+ //TODO: FInd an elegant way to get out of the above loop when the Core is stopped
}
func (handler *APIHandler) sendChangeEvent(deviceId string, portStatus *openflow_13.OfpPortStatus) {
@@ -586,8 +726,8 @@
log.Errorw("Failed to send change event", log.Fields{"error": err})
}
}
- return nil
-}
+ // TODO: put the packet in the queue
+ }
func (handler *APIHandler) Subscribe(
ctx context.Context,