VOL-1556 Add alarm simulation to voltha-go core

Change-Id: I23dcd720909a3e23cb203fd1ae32eada5fc4e34e
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
old mode 100644
new mode 100755
index 4ed1a71..541b502
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -29,10 +29,10 @@
 )
 
 type AdapterProxy struct {
-	TestMode     bool
+	TestMode              bool
 	deviceTopicRegistered bool
-	coreTopic *kafka.Topic
-	kafkaICProxy *kafka.InterContainerProxy
+	coreTopic             *kafka.Topic
+	kafkaICProxy          *kafka.InterContainerProxy
 }
 
 func NewAdapterProxy(kafkaProxy *kafka.InterContainerProxy) *AdapterProxy {
@@ -62,18 +62,17 @@
 	ap.coreTopic = coreTopic
 }
 
-func (ap *AdapterProxy) getCoreTopic() kafka.Topic{
+func (ap *AdapterProxy) getCoreTopic() kafka.Topic {
 	if ap.coreTopic != nil {
 		return *ap.coreTopic
 	}
-	return kafka.Topic{Name:ap.kafkaICProxy.DefaultTopic.Name}
+	return kafka.Topic{Name: ap.kafkaICProxy.DefaultTopic.Name}
 }
 
-func (ap *AdapterProxy) getAdapterTopic(adapterName string) kafka.Topic{
+func (ap *AdapterProxy) getAdapterTopic(adapterName string) kafka.Topic {
 	return kafka.Topic{Name: adapterName}
 }
 
-
 func (ap *AdapterProxy) AdoptDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("AdoptDevice", log.Fields{"device": device})
 	rpc := "adopt_device"
@@ -484,3 +483,25 @@
 	log.Debug("UnSuppressAlarm")
 	return nil
 }
+
+func (ap *AdapterProxy) SimulateAlarm(ctx context.Context, device *voltha.Device, simulatereq *voltha.SimulateAlarmRequest) error {
+	log.Debugw("SimulateAlarm", log.Fields{"id": simulatereq.Id})
+	rpc := "simulate_alarm"
+	toTopic := ap.getAdapterTopic(device.Adapter)
+	args := make([]*kafka.KVArg, 2)
+	args[0] = &kafka.KVArg{
+		Key:   "device",
+		Value: device,
+	}
+	args[1] = &kafka.KVArg{
+		Key:   "request",
+		Value: simulatereq,
+	}
+
+	// Use a device topic for the response as we are the only core handling requests for this device
+	replyToTopic := ap.getCoreTopic()
+	ap.deviceTopicRegistered = true
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	log.Debugw("SimulateAlarm-response", log.Fields{"replyTopic": replyToTopic, "deviceid": device.Id, "success": success})
+	return unPackResponse(rpc, device.Id, success, result)
+}
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
old mode 100644
new mode 100755
index d00375f..9704fff
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -21,10 +21,10 @@
 	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/db/model"
+	fu "github.com/opencord/voltha-go/rw_core/utils"
 	ic "github.com/opencord/voltha-protos/go/inter_container"
 	ofp "github.com/opencord/voltha-protos/go/openflow_13"
 	"github.com/opencord/voltha-protos/go/voltha"
-	fu "github.com/opencord/voltha-go/rw_core/utils"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 	"reflect"
@@ -36,7 +36,7 @@
 	deviceType       string
 	lastData         *voltha.Device
 	adapterProxy     *AdapterProxy
-	adapterMgr *AdapterManager
+	adapterMgr       *AdapterManager
 	deviceMgr        *DeviceManager
 	clusterDataProxy *model.Proxy
 	deviceProxy      *model.Proxy
@@ -410,7 +410,7 @@
 		}
 	}
 	return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
-	}
+}
 
 func (agent *DeviceAgent) activateImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
 	agent.lockDevice.Lock()
@@ -448,8 +448,8 @@
 		// The status of the AdminState will be changed following the update_download_status response from the adapter
 		// The image name will also be removed from the device list
 	}
-	return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil}
-
+	return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
+}
 
 func (agent *DeviceAgent) revertImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
 	agent.lockDevice.Lock()
@@ -484,8 +484,7 @@
 		}
 	}
 	return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
-	}
-
+}
 
 func (agent *DeviceAgent) getImageDownloadStatus(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
 	agent.lockDevice.Lock()
@@ -504,7 +503,7 @@
 	}
 }
 
-func (agent *DeviceAgent) updateImageDownload(img *voltha.ImageDownload) error{
+func (agent *DeviceAgent) updateImageDownload(img *voltha.ImageDownload) error {
 	agent.lockDevice.Lock()
 	defer agent.lockDevice.Unlock()
 	log.Debugw("updateImageDownload", log.Fields{"id": agent.deviceId})
@@ -526,7 +525,7 @@
 		// Set the Admin state to enabled if required
 		if (img.DownloadState != voltha.ImageDownload_DOWNLOAD_REQUESTED &&
 			img.DownloadState != voltha.ImageDownload_DOWNLOAD_STARTED) ||
-			(img.ImageState != voltha.ImageDownload_IMAGE_ACTIVATING){
+			(img.ImageState != voltha.ImageDownload_IMAGE_ACTIVATING) {
 			cloned.AdminState = voltha.AdminState_ENABLED
 		}
 
@@ -562,7 +561,7 @@
 	if device, err := agent.getDeviceWithoutLock(); err != nil {
 		return nil, status.Errorf(codes.NotFound, "%s", agent.deviceId)
 	} else {
-		return &voltha.ImageDownloads{Items:device.ImageDownloads}, nil
+		return &voltha.ImageDownloads{Items: device.ImageDownloads}, nil
 	}
 }
 
@@ -1016,3 +1015,20 @@
 	}
 	return
 }
+
+func (agent *DeviceAgent) simulateAlarm(ctx context.Context, simulatereq *voltha.SimulateAlarmRequest) error {
+	agent.lockDevice.Lock()
+	defer agent.lockDevice.Unlock()
+	log.Debugw("simulateAlarm", log.Fields{"id": agent.deviceId})
+	// Get the most up to date the device info
+	if device, err := agent.getDeviceWithoutLock(); err != nil {
+		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+	} else {
+		// First send the request to an Adapter and wait for a response
+		if err := agent.adapterProxy.SimulateAlarm(ctx, device, simulatereq); err != nil {
+			log.Debugw("simulateAlarm-error", log.Fields{"id": agent.lastData.Id, "error": err})
+			return err
+		}
+	}
+	return nil
+}
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
old mode 100644
new mode 100755
index f37244f..f9da623
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -496,7 +496,7 @@
 		}
 		// Notify the logical device manager to setup a logical port if needed
 		if port.Type == voltha.Port_ETHERNET_NNI || port.Type == voltha.Port_ETHERNET_UNI {
-			if device , err := dMgr.GetDevice(deviceId); err == nil {
+			if device, err := dMgr.GetDevice(deviceId); err == nil {
 				go dMgr.logicalDeviceMgr.addLogicalPort(device, port)
 			} else {
 				log.Errorw("failed-to-retrieve-device", log.Fields{"deviceId": deviceId})
@@ -992,3 +992,16 @@
 	}
 	return nil
 }
+
+func (dMgr *DeviceManager) simulateAlarm(ctx context.Context, simulatereq *voltha.SimulateAlarmRequest, ch chan interface{}) {
+	log.Debugw("simulateAlarm", log.Fields{"id": simulatereq.Id, "Indicator": simulatereq.Indicator, "IntfId": simulatereq.IntfId,
+		"PortTypeName": simulatereq.PortTypeName, "OnuDeviceId": simulatereq.OnuDeviceId, "InverseBitErrorRate": simulatereq.InverseBitErrorRate,
+		"Drift": simulatereq.Drift, "NewEqd": simulatereq.NewEqd, "OnuSerialNumber": simulatereq.OnuSerialNumber, "Operation": simulatereq.Operation})
+	var res interface{}
+	if agent := dMgr.getDeviceAgent(simulatereq.Id); agent != nil {
+		res = agent.simulateAlarm(ctx, simulatereq)
+		log.Debugw("SimulateAlarm-result", log.Fields{"result": res})
+	}
+	//TODO CLI always get successful response
+	sendResponse(ctx, ch, res)
+}
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
old mode 100644
new mode 100755
index 7ed756c..6532b6e
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -835,13 +835,29 @@
 	return nil, nil
 }
 
-//@TODO useless stub, what should this actually do?
 func (handler *APIHandler) SimulateAlarm(
 	ctx context.Context,
 	in *voltha.SimulateAlarmRequest,
 ) (*common.OperationResp, error) {
-	log.Debug("SimulateAlarm-stub")
-	return nil, nil
+	log.Debugw("SimulateAlarm-request", log.Fields{"id": in.Id})
+	successResp := &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS}
+	if isTestMode(ctx) {
+		return successResp, nil
+	}
+
+	if handler.competeForTransaction() {
+		if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id:in.Id}, handler.longRunningRequestTimeout); err != nil {
+			failedresponse := &common.OperationResp{Code:voltha.OperationResp_OPERATION_FAILURE}
+			return failedresponse, err
+		} else {
+			defer txn.Close()
+		}
+	}
+
+	ch := make(chan interface{})
+	defer close(ch)
+	go handler.deviceMgr.simulateAlarm(ctx, in, ch)
+	return successResp, nil
 }
 
 //@TODO useless stub, what should this actually do?