[VOL-1605] Update disable/reenable device management logic

This is the initial commit of updating the device management
logic around disable and reenable of a device.

Change-Id: If6d40a0055e5e1ab61503b9ae9c5a4070ec53f35
diff --git a/adapters/common/core_proxy.go b/adapters/common/core_proxy.go
index 11a22ab..5bbd176 100644
--- a/adapters/common/core_proxy.go
+++ b/adapters/common/core_proxy.go
@@ -152,6 +152,32 @@
 	return unPackResponse(rpc, deviceId, success, result)
 }
 
+func (ap *CoreProxy) PortsStateUpdate(ctx context.Context, deviceId string, operStatus voltha.OperStatus_OperStatus) error {
+	log.Debugw("PortsStateUpdate", log.Fields{"deviceId": deviceId})
+	rpc := "PortsStateUpdate"
+	// Use a device specific topic to send the request.  The adapter handling the device creates a device
+	// specific topic
+	toTopic := ap.getCoreTopic(deviceId)
+	args := make([]*kafka.KVArg, 2)
+	id := &voltha.ID{Id: deviceId}
+	oStatus := &ic.IntType{Val: int64(operStatus)}
+
+	args[0] = &kafka.KVArg{
+		Key:   "device_id",
+		Value: id,
+	}
+	args[1] = &kafka.KVArg{
+		Key:   "oper_status",
+		Value: oStatus,
+	}
+
+	// Use a device specific topic as we are the only adaptercore handling requests for this device
+	replyToTopic := ap.getAdapterTopic()
+	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+	log.Debugw("PortsStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
+	return unPackResponse(rpc, deviceId, success, result)
+}
+
 func (ap *CoreProxy) DeviceStateUpdate(ctx context.Context, deviceId string,
 	connStatus voltha.ConnectStatus_ConnectStatus, operStatus voltha.OperStatus_OperStatus) error {
 	log.Debugw("DeviceStateUpdate", log.Fields{"deviceId": deviceId})
diff --git a/adapters/common/request_handler.go b/adapters/common/request_handler.go
index dd8df8c..8b582b8 100644
--- a/adapters/common/request_handler.go
+++ b/adapters/common/request_handler.go
@@ -107,10 +107,78 @@
 }
 
 func (rhp *RequestHandlerProxy) Disable_device(args []*ic.Argument) (*empty.Empty, error) {
+	if len(args) < 3 {
+		log.Warn("invalid-number-of-args", log.Fields{"args": args})
+		err := errors.New("invalid-number-of-args")
+		return nil, err
+	}
+
+	device := &voltha.Device{}
+	transactionID := &ic.StrType{}
+	fromTopic := &ic.StrType{}
+	for _, arg := range args {
+		switch arg.Key {
+		case "device":
+			if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
+				log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+				return nil, err
+			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
+		case kafka.FromTopic:
+			if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
+				log.Warnw("cannot-unmarshal-from-topic", log.Fields{"error": err})
+				return nil, err
+			}
+		}
+	}
+	//Update the core reference for that device
+	rhp.coreProxy.UpdateCoreReference(device.Id, fromTopic.Val)
+	//Invoke the Disable_device API on the adapter
+	if err := rhp.adapter.Disable_device(device); err != nil {
+		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+	}
 	return new(empty.Empty), nil
 }
 
 func (rhp *RequestHandlerProxy) Reenable_device(args []*ic.Argument) (*empty.Empty, error) {
+	if len(args) < 3 {
+		log.Warn("invalid-number-of-args", log.Fields{"args": args})
+		err := errors.New("invalid-number-of-args")
+		return nil, err
+	}
+
+	device := &voltha.Device{}
+	transactionID := &ic.StrType{}
+	fromTopic := &ic.StrType{}
+	for _, arg := range args {
+		switch arg.Key {
+		case "device":
+			if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
+				log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+				return nil, err
+			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
+		case kafka.FromTopic:
+			if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
+				log.Warnw("cannot-unmarshal-from-topic", log.Fields{"error": err})
+				return nil, err
+			}
+		}
+	}
+	//Update the core reference for that device
+	rhp.coreProxy.UpdateCoreReference(device.Id, fromTopic.Val)
+	//Invoke the Reenable_device API on the adapter
+	if err := rhp.adapter.Reenable_device(device); err != nil {
+		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+	}
 	return new(empty.Empty), nil
 }
 
diff --git a/adapters/simulated_olt/adaptercore/device_handler.go b/adapters/simulated_olt/adaptercore/device_handler.go
index 9aaeea4..07f31ff 100644
--- a/adapters/simulated_olt/adaptercore/device_handler.go
+++ b/adapters/simulated_olt/adaptercore/device_handler.go
@@ -200,3 +200,44 @@
 	log.Debugw("Process_inter_adapter_message", log.Fields{"msgId": msg.Header.Id})
 	return nil
 }
+
+func (dh *DeviceHandler) DisableDevice(device *voltha.Device) {
+	cloned := proto.Clone(device).(*voltha.Device)
+	// Update the all ports state on that device to disable
+	if err := dh.coreProxy.PortsStateUpdate(nil, cloned.Id, voltha.OperStatus_UNKNOWN); err != nil {
+		log.Errorw("updating-ports-failed", log.Fields{"deviceId": device.Id, "error": err})
+		return
+	}
+
+	//Update the device state
+	cloned.ConnectStatus = voltha.ConnectStatus_UNREACHABLE
+	cloned.OperStatus = voltha.OperStatus_UNKNOWN
+	dh.device = cloned
+
+	if err := dh.coreProxy.DeviceStateUpdate(nil, cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
+		log.Errorw("device-state-update-failed", log.Fields{"deviceId": device.Id, "error": err})
+		return
+	}
+	log.Debugw("DisableDevice-end", log.Fields{"deviceId": device.Id})
+}
+
+func (dh *DeviceHandler) ReEnableDevice(device *voltha.Device) {
+
+	cloned := proto.Clone(device).(*voltha.Device)
+	// Update the all ports state on that device to enable
+	if err := dh.coreProxy.PortsStateUpdate(nil, cloned.Id, voltha.OperStatus_ACTIVE); err != nil {
+		log.Errorw("updating-ports-failed", log.Fields{"deviceId": device.Id, "error": err})
+		return
+	}
+
+	//Update the device state
+	cloned.ConnectStatus = voltha.ConnectStatus_REACHABLE
+	cloned.OperStatus = voltha.OperStatus_ACTIVE
+	dh.device = cloned
+
+	if err := dh.coreProxy.DeviceStateUpdate(nil, cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
+		log.Errorw("device-state-update-failed", log.Fields{"deviceId": device.Id, "error": err})
+		return
+	}
+	log.Debugw("ReEnableDevice-end", log.Fields{"deviceId": device.Id})
+}
diff --git a/adapters/simulated_olt/adaptercore/simulated_olt.go b/adapters/simulated_olt/adaptercore/simulated_olt.go
index 029f032..fee4dc8 100644
--- a/adapters/simulated_olt/adaptercore/simulated_olt.go
+++ b/adapters/simulated_olt/adaptercore/simulated_olt.go
@@ -165,11 +165,29 @@
 }
 
 func (so *SimulatedOLT) Disable_device(device *voltha.Device) error {
-	return errors.New("UnImplemented")
+	if device == nil {
+		log.Warn("device-is-nil")
+		return errors.New("nil-device")
+	}
+	log.Infow("disable-device", log.Fields{"deviceId": device.Id})
+	var handler *DeviceHandler
+	if handler = so.getDeviceHandler(device.Id); handler != nil {
+		go handler.DisableDevice(device)
+	}
+	return nil
 }
 
 func (so *SimulatedOLT) Reenable_device(device *voltha.Device) error {
-	return errors.New("UnImplemented")
+	if device == nil {
+		log.Warn("device-is-nil")
+		return errors.New("nil-device")
+	}
+	log.Infow("reenable-device", log.Fields{"deviceId": device.Id})
+	var handler *DeviceHandler
+	if handler = so.getDeviceHandler(device.Id); handler != nil {
+		go handler.ReEnableDevice(device)
+	}
+	return nil
 }
 
 func (so *SimulatedOLT) Reboot_device(device *voltha.Device) error {
diff --git a/adapters/simulated_onu/adaptercore/device_handler.go b/adapters/simulated_onu/adaptercore/device_handler.go
index ec00426..a845427 100644
--- a/adapters/simulated_onu/adaptercore/device_handler.go
+++ b/adapters/simulated_onu/adaptercore/device_handler.go
@@ -182,3 +182,44 @@
 	log.Debugw("Process_inter_adapter_message", log.Fields{"msgId": msg.Header.Id})
 	return nil
 }
+
+func (dh *DeviceHandler) DisableDevice(device *voltha.Device) {
+	cloned := proto.Clone(device).(*voltha.Device)
+	// Update the all ports state on that device to disable
+	if err := dh.coreProxy.PortsStateUpdate(nil, cloned.Id, voltha.OperStatus_UNKNOWN); err != nil {
+		log.Errorw("updating-ports-failed", log.Fields{"deviceId": device.Id, "error": err})
+		return
+	}
+
+	//Update the device state
+	cloned.ConnectStatus = voltha.ConnectStatus_UNREACHABLE
+	cloned.OperStatus = voltha.OperStatus_UNKNOWN
+	dh.device = cloned
+
+	if err := dh.coreProxy.DeviceStateUpdate(nil, cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
+		log.Errorw("device-state-update-failed", log.Fields{"deviceId": device.Id, "error": err})
+		return
+	}
+	log.Debugw("DisableDevice-end", log.Fields{"deviceId": device.Id})
+}
+
+func (dh *DeviceHandler) ReEnableDevice(device *voltha.Device) {
+
+	cloned := proto.Clone(device).(*voltha.Device)
+	// Update the all ports state on that device to enable
+	if err := dh.coreProxy.PortsStateUpdate(nil, cloned.Id, voltha.OperStatus_ACTIVE); err != nil {
+		log.Errorw("updating-ports-failed", log.Fields{"deviceId": device.Id, "error": err})
+		return
+	}
+
+	//Update the device state
+	cloned.ConnectStatus = voltha.ConnectStatus_REACHABLE
+	cloned.OperStatus = voltha.OperStatus_ACTIVE
+	dh.device = cloned
+
+	if err := dh.coreProxy.DeviceStateUpdate(nil, cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
+		log.Errorw("device-state-update-failed", log.Fields{"deviceId": device.Id, "error": err})
+		return
+	}
+	log.Debugw("ReEnableDevice-end", log.Fields{"deviceId": device.Id})
+}
diff --git a/adapters/simulated_onu/adaptercore/simulated_onu.go b/adapters/simulated_onu/adaptercore/simulated_onu.go
index f0b92d8..a74b377 100644
--- a/adapters/simulated_onu/adaptercore/simulated_onu.go
+++ b/adapters/simulated_onu/adaptercore/simulated_onu.go
@@ -157,11 +157,29 @@
 }
 
 func (so *SimulatedONU) Disable_device(device *voltha.Device) error {
-	return errors.New("UnImplemented")
+	if device == nil {
+		log.Warn("device-is-nil")
+		return errors.New("nil-device")
+	}
+	log.Infow("disable-device", log.Fields{"deviceId": device.Id})
+	var handler *DeviceHandler
+	if handler = so.getDeviceHandler(device.Id); handler != nil {
+		go handler.DisableDevice(device)
+	}
+	return nil
 }
 
 func (so *SimulatedONU) Reenable_device(device *voltha.Device) error {
-	return errors.New("UnImplemented")
+	if device == nil {
+		log.Warn("device-is-nil")
+		return errors.New("nil-device")
+	}
+	log.Infow("reenable-device", log.Fields{"deviceId": device.Id})
+	var handler *DeviceHandler
+	if handler = so.getDeviceHandler(device.Id); handler != nil {
+		go handler.ReEnableDevice(device)
+	}
+	return nil
 }
 
 func (so *SimulatedONU) Reboot_device(device *voltha.Device) error {