[VOL-1605] Update disable/re-enable device management logic
This is the initial commit reworking the device management
logic for disabling and re-enabling a device.
Change-Id: If6d40a0055e5e1ab61503b9ae9c5a4070ec53f35
diff --git a/adapters/common/core_proxy.go b/adapters/common/core_proxy.go
index 11a22ab..5bbd176 100644
--- a/adapters/common/core_proxy.go
+++ b/adapters/common/core_proxy.go
@@ -152,6 +152,32 @@
return unPackResponse(rpc, deviceId, success, result)
}
+func (ap *CoreProxy) PortsStateUpdate(ctx context.Context, deviceId string, operStatus voltha.OperStatus_OperStatus) error {
+ log.Debugw("PortsStateUpdate", log.Fields{"deviceId": deviceId})
+ rpc := "PortsStateUpdate"
+ // PortsStateUpdate sends the "PortsStateUpdate" RPC for deviceId, with the requested operational
+ // status, to the core topic that getCoreTopic resolves for this device.
+ toTopic := ap.getCoreTopic(deviceId)
+ args := make([]*kafka.KVArg, 2)
+ id := &voltha.ID{Id: deviceId}
+ oStatus := &ic.IntType{Val: int64(operStatus)}
+ // Both values travel as key/value arguments: the device ID and the status as an int64.
+ args[0] = &kafka.KVArg{
+ Key: "device_id",
+ Value: id,
+ }
+ args[1] = &kafka.KVArg{
+ Key: "oper_status",
+ Value: oStatus,
+ }
+ // NOTE(review): ctx is not forwarded; InvokeRPC below receives a nil context - confirm this is intended.
+ // The reply topic comes from getAdapterTopic(), which takes no device ID.
+ replyToTopic := ap.getAdapterTopic()
+ success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+ log.Debugw("PortsStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
+ return unPackResponse(rpc, deviceId, success, result)
+}
+
func (ap *CoreProxy) DeviceStateUpdate(ctx context.Context, deviceId string,
connStatus voltha.ConnectStatus_ConnectStatus, operStatus voltha.OperStatus_OperStatus) error {
log.Debugw("DeviceStateUpdate", log.Fields{"deviceId": deviceId})
diff --git a/adapters/common/request_handler.go b/adapters/common/request_handler.go
index dd8df8c..8b582b8 100644
--- a/adapters/common/request_handler.go
+++ b/adapters/common/request_handler.go
@@ -107,10 +107,78 @@
}
func (rhp *RequestHandlerProxy) Disable_device(args []*ic.Argument) (*empty.Empty, error) {
+ if len(args) < 3 {
+ log.Warn("invalid-number-of-args", log.Fields{"args": args})
+ err := errors.New("invalid-number-of-args")
+ return nil, err
+ }
+
+ device := &voltha.Device{}
+ transactionID := &ic.StrType{}
+ fromTopic := &ic.StrType{}
+ for _, arg := range args {
+ switch arg.Key {
+ case "device":
+ if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
+ log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ return nil, err
+ }
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
+ case kafka.FromTopic:
+ if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
+ log.Warnw("cannot-unmarshal-from-topic", log.Fields{"error": err})
+ return nil, err
+ }
+ }
+ }
+ //Update the core reference for that device
+ rhp.coreProxy.UpdateCoreReference(device.Id, fromTopic.Val)
+ //Invoke the Disable_device API on the adapter
+ if err := rhp.adapter.Disable_device(device); err != nil {
+ return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+ }
return new(empty.Empty), nil
}
func (rhp *RequestHandlerProxy) Reenable_device(args []*ic.Argument) (*empty.Empty, error) {
+ if len(args) < 3 {
+ log.Warn("invalid-number-of-args", log.Fields{"args": args})
+ err := errors.New("invalid-number-of-args")
+ return nil, err
+ }
+
+ device := &voltha.Device{}
+ transactionID := &ic.StrType{}
+ fromTopic := &ic.StrType{}
+ for _, arg := range args {
+ switch arg.Key {
+ case "device":
+ if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
+ log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ return nil, err
+ }
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
+ case kafka.FromTopic:
+ if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
+ log.Warnw("cannot-unmarshal-from-topic", log.Fields{"error": err})
+ return nil, err
+ }
+ }
+ }
+ //Update the core reference for that device
+ rhp.coreProxy.UpdateCoreReference(device.Id, fromTopic.Val)
+ //Invoke the Reenable_device API on the adapter
+ if err := rhp.adapter.Reenable_device(device); err != nil {
+ return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+ }
return new(empty.Empty), nil
}
diff --git a/adapters/simulated_olt/adaptercore/device_handler.go b/adapters/simulated_olt/adaptercore/device_handler.go
index 9aaeea4..07f31ff 100644
--- a/adapters/simulated_olt/adaptercore/device_handler.go
+++ b/adapters/simulated_olt/adaptercore/device_handler.go
@@ -200,3 +200,44 @@
log.Debugw("Process_inter_adapter_message", log.Fields{"msgId": msg.Header.Id})
return nil
}
+
+func (dh *DeviceHandler) DisableDevice(device *voltha.Device) {
+ cloned := proto.Clone(device).(*voltha.Device) // work on a copy; the caller's device is not mutated here
+ // DisableDevice first asks the core to move this device's ports to OperStatus_UNKNOWN; on failure it logs and stops.
+ if err := dh.coreProxy.PortsStateUpdate(nil, cloned.Id, voltha.OperStatus_UNKNOWN); err != nil {
+ log.Errorw("updating-ports-failed", log.Fields{"deviceId": device.Id, "error": err})
+ return
+ }
+ // Errors are only logged: this function has no return value.
+ // Mark the copy UNREACHABLE/UNKNOWN and keep it as the handler's current device.
+ cloned.ConnectStatus = voltha.ConnectStatus_UNREACHABLE
+ cloned.OperStatus = voltha.OperStatus_UNKNOWN
+ dh.device = cloned
+ // Push the new connection and operational status to the core.
+ if err := dh.coreProxy.DeviceStateUpdate(nil, cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
+ log.Errorw("device-state-update-failed", log.Fields{"deviceId": device.Id, "error": err})
+ return
+ }
+ log.Debugw("DisableDevice-end", log.Fields{"deviceId": device.Id})
+}
+
+func (dh *DeviceHandler) ReEnableDevice(device *voltha.Device) {
+ // ReEnableDevice sets the device's ports to ACTIVE via the core, then reports the device as REACHABLE/ACTIVE.
+ cloned := proto.Clone(device).(*voltha.Device) // work on a copy; the caller's device is not mutated here
+ // Ask the core to move this device's ports to OperStatus_ACTIVE; on failure log and stop.
+ if err := dh.coreProxy.PortsStateUpdate(nil, cloned.Id, voltha.OperStatus_ACTIVE); err != nil {
+ log.Errorw("updating-ports-failed", log.Fields{"deviceId": device.Id, "error": err})
+ return
+ }
+ // Errors are only logged: this function has no return value.
+ // Mark the copy REACHABLE/ACTIVE and keep it as the handler's current device.
+ cloned.ConnectStatus = voltha.ConnectStatus_REACHABLE
+ cloned.OperStatus = voltha.OperStatus_ACTIVE
+ dh.device = cloned
+ // Push the new connection and operational status to the core.
+ if err := dh.coreProxy.DeviceStateUpdate(nil, cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
+ log.Errorw("device-state-update-failed", log.Fields{"deviceId": device.Id, "error": err})
+ return
+ }
+ log.Debugw("ReEnableDevice-end", log.Fields{"deviceId": device.Id})
+}
diff --git a/adapters/simulated_olt/adaptercore/simulated_olt.go b/adapters/simulated_olt/adaptercore/simulated_olt.go
index 029f032..fee4dc8 100644
--- a/adapters/simulated_olt/adaptercore/simulated_olt.go
+++ b/adapters/simulated_olt/adaptercore/simulated_olt.go
@@ -165,11 +165,29 @@
}
func (so *SimulatedOLT) Disable_device(device *voltha.Device) error {
- return errors.New("UnImplemented")
+ if device == nil { // reject a nil device before device.Id is read
+ log.Warn("device-is-nil")
+ return errors.New("nil-device")
+ }
+ log.Infow("disable-device", log.Fields{"deviceId": device.Id})
+ var handler *DeviceHandler // the disable is delegated to the handler getDeviceHandler returns for this ID
+ if handler = so.getDeviceHandler(device.Id); handler != nil {
+ go handler.DisableDevice(device) // runs in its own goroutine; this call does not wait for it
+ }
+ return nil // NOTE(review): nil is returned even when no handler was found for device.Id - confirm intended
}
func (so *SimulatedOLT) Reenable_device(device *voltha.Device) error {
- return errors.New("UnImplemented")
+ if device == nil { // reject a nil device before device.Id is read
+ log.Warn("device-is-nil")
+ return errors.New("nil-device")
+ }
+ log.Infow("reenable-device", log.Fields{"deviceId": device.Id})
+ var handler *DeviceHandler // the re-enable is delegated to the handler getDeviceHandler returns for this ID
+ if handler = so.getDeviceHandler(device.Id); handler != nil {
+ go handler.ReEnableDevice(device) // runs in its own goroutine; this call does not wait for it
+ }
+ return nil // NOTE(review): nil is returned even when no handler was found for device.Id - confirm intended
}
func (so *SimulatedOLT) Reboot_device(device *voltha.Device) error {
diff --git a/adapters/simulated_onu/adaptercore/device_handler.go b/adapters/simulated_onu/adaptercore/device_handler.go
index ec00426..a845427 100644
--- a/adapters/simulated_onu/adaptercore/device_handler.go
+++ b/adapters/simulated_onu/adaptercore/device_handler.go
@@ -182,3 +182,44 @@
log.Debugw("Process_inter_adapter_message", log.Fields{"msgId": msg.Header.Id})
return nil
}
+
+func (dh *DeviceHandler) DisableDevice(device *voltha.Device) {
+ cloned := proto.Clone(device).(*voltha.Device) // operate on a copy so the passed-in device stays untouched
+ // DisableDevice: request OperStatus_UNKNOWN for this device's ports from the core; log and return if that fails.
+ if err := dh.coreProxy.PortsStateUpdate(nil, cloned.Id, voltha.OperStatus_UNKNOWN); err != nil {
+ log.Errorw("updating-ports-failed", log.Fields{"deviceId": device.Id, "error": err})
+ return
+ }
+ // Failures are logged only; nothing is returned to the caller.
+ // Set the copy to UNREACHABLE/UNKNOWN and store it as the handler's device.
+ cloned.ConnectStatus = voltha.ConnectStatus_UNREACHABLE
+ cloned.OperStatus = voltha.OperStatus_UNKNOWN
+ dh.device = cloned
+ // Report the updated connection and operational status to the core.
+ if err := dh.coreProxy.DeviceStateUpdate(nil, cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
+ log.Errorw("device-state-update-failed", log.Fields{"deviceId": device.Id, "error": err})
+ return
+ }
+ log.Debugw("DisableDevice-end", log.Fields{"deviceId": device.Id})
+}
+
+func (dh *DeviceHandler) ReEnableDevice(device *voltha.Device) {
+ // ReEnableDevice requests ACTIVE ports from the core, then reports the device as REACHABLE/ACTIVE.
+ cloned := proto.Clone(device).(*voltha.Device) // operate on a copy so the passed-in device stays untouched
+ // Request OperStatus_ACTIVE for this device's ports from the core; log and return if that fails.
+ if err := dh.coreProxy.PortsStateUpdate(nil, cloned.Id, voltha.OperStatus_ACTIVE); err != nil {
+ log.Errorw("updating-ports-failed", log.Fields{"deviceId": device.Id, "error": err})
+ return
+ }
+ // Failures are logged only; nothing is returned to the caller.
+ // Set the copy to REACHABLE/ACTIVE and store it as the handler's device.
+ cloned.ConnectStatus = voltha.ConnectStatus_REACHABLE
+ cloned.OperStatus = voltha.OperStatus_ACTIVE
+ dh.device = cloned
+ // Report the updated connection and operational status to the core.
+ if err := dh.coreProxy.DeviceStateUpdate(nil, cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
+ log.Errorw("device-state-update-failed", log.Fields{"deviceId": device.Id, "error": err})
+ return
+ }
+ log.Debugw("ReEnableDevice-end", log.Fields{"deviceId": device.Id})
+}
diff --git a/adapters/simulated_onu/adaptercore/simulated_onu.go b/adapters/simulated_onu/adaptercore/simulated_onu.go
index f0b92d8..a74b377 100644
--- a/adapters/simulated_onu/adaptercore/simulated_onu.go
+++ b/adapters/simulated_onu/adaptercore/simulated_onu.go
@@ -157,11 +157,29 @@
}
func (so *SimulatedONU) Disable_device(device *voltha.Device) error {
- return errors.New("UnImplemented")
+ if device == nil { // a nil device is rejected before device.Id is read
+ log.Warn("device-is-nil")
+ return errors.New("nil-device")
+ }
+ log.Infow("disable-device", log.Fields{"deviceId": device.Id})
+ var handler *DeviceHandler // the disable is handed to the handler getDeviceHandler returns for this ID
+ if handler = so.getDeviceHandler(device.Id); handler != nil {
+ go handler.DisableDevice(device) // started as a goroutine; its outcome is not returned from here
+ }
+ return nil // NOTE(review): nil is returned even when no handler was found for device.Id - confirm intended
}
func (so *SimulatedONU) Reenable_device(device *voltha.Device) error {
- return errors.New("UnImplemented")
+ if device == nil { // a nil device is rejected before device.Id is read
+ log.Warn("device-is-nil")
+ return errors.New("nil-device")
+ }
+ log.Infow("reenable-device", log.Fields{"deviceId": device.Id})
+ var handler *DeviceHandler // the re-enable is handed to the handler getDeviceHandler returns for this ID
+ if handler = so.getDeviceHandler(device.Id); handler != nil {
+ go handler.ReEnableDevice(device) // started as a goroutine; its outcome is not returned from here
+ }
+ return nil // NOTE(review): nil is returned even when no handler was found for device.Id - confirm intended
}
func (so *SimulatedONU) Reboot_device(device *voltha.Device) error {