VOL-1556 Add alarm simulation to voltha-go core
Change-Id: I23dcd720909a3e23cb203fd1ae32eada5fc4e34e
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
old mode 100644
new mode 100755
index 4ed1a71..541b502
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -29,10 +29,10 @@
)
type AdapterProxy struct {
- TestMode bool
+ TestMode bool
deviceTopicRegistered bool
- coreTopic *kafka.Topic
- kafkaICProxy *kafka.InterContainerProxy
+ coreTopic *kafka.Topic
+ kafkaICProxy *kafka.InterContainerProxy
}
func NewAdapterProxy(kafkaProxy *kafka.InterContainerProxy) *AdapterProxy {
@@ -62,18 +62,17 @@
ap.coreTopic = coreTopic
}
-func (ap *AdapterProxy) getCoreTopic() kafka.Topic{
+func (ap *AdapterProxy) getCoreTopic() kafka.Topic {
if ap.coreTopic != nil {
return *ap.coreTopic
}
- return kafka.Topic{Name:ap.kafkaICProxy.DefaultTopic.Name}
+ return kafka.Topic{Name: ap.kafkaICProxy.DefaultTopic.Name}
}
-func (ap *AdapterProxy) getAdapterTopic(adapterName string) kafka.Topic{
+func (ap *AdapterProxy) getAdapterTopic(adapterName string) kafka.Topic {
return kafka.Topic{Name: adapterName}
}
-
func (ap *AdapterProxy) AdoptDevice(ctx context.Context, device *voltha.Device) error {
log.Debugw("AdoptDevice", log.Fields{"device": device})
rpc := "adopt_device"
@@ -484,3 +483,25 @@
log.Debug("UnSuppressAlarm")
return nil
}
+
+func (ap *AdapterProxy) SimulateAlarm(ctx context.Context, device *voltha.Device, simulatereq *voltha.SimulateAlarmRequest) error {
+ log.Debugw("SimulateAlarm", log.Fields{"id": simulatereq.Id})
+ rpc := "simulate_alarm"
+ toTopic := ap.getAdapterTopic(device.Adapter)
+ args := make([]*kafka.KVArg, 2)
+ args[0] = &kafka.KVArg{
+ Key: "device",
+ Value: device,
+ }
+ args[1] = &kafka.KVArg{
+ Key: "request",
+ Value: simulatereq,
+ }
+
+ // Use a device topic for the response as we are the only core handling requests for this device
+ replyToTopic := ap.getCoreTopic()
+ ap.deviceTopicRegistered = true
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+ log.Debugw("SimulateAlarm-response", log.Fields{"replyTopic": replyToTopic, "deviceid": device.Id, "success": success})
+ return unPackResponse(rpc, device.Id, success, result)
+}
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
old mode 100644
new mode 100755
index d00375f..9704fff
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -21,10 +21,10 @@
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/model"
+ fu "github.com/opencord/voltha-go/rw_core/utils"
ic "github.com/opencord/voltha-protos/go/inter_container"
ofp "github.com/opencord/voltha-protos/go/openflow_13"
"github.com/opencord/voltha-protos/go/voltha"
- fu "github.com/opencord/voltha-go/rw_core/utils"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"reflect"
@@ -36,7 +36,7 @@
deviceType string
lastData *voltha.Device
adapterProxy *AdapterProxy
- adapterMgr *AdapterManager
+ adapterMgr *AdapterManager
deviceMgr *DeviceManager
clusterDataProxy *model.Proxy
deviceProxy *model.Proxy
@@ -410,7 +410,7 @@
}
}
return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
- }
+}
func (agent *DeviceAgent) activateImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
agent.lockDevice.Lock()
@@ -448,8 +448,8 @@
// The status of the AdminState will be changed following the update_download_status response from the adapter
// The image name will also be removed from the device list
}
- return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil}
-
+ return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
+}
func (agent *DeviceAgent) revertImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
agent.lockDevice.Lock()
@@ -484,8 +484,7 @@
}
}
return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
- }
-
+}
func (agent *DeviceAgent) getImageDownloadStatus(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
agent.lockDevice.Lock()
@@ -504,7 +503,7 @@
}
}
-func (agent *DeviceAgent) updateImageDownload(img *voltha.ImageDownload) error{
+func (agent *DeviceAgent) updateImageDownload(img *voltha.ImageDownload) error {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
log.Debugw("updateImageDownload", log.Fields{"id": agent.deviceId})
@@ -526,7 +525,7 @@
// Set the Admin state to enabled if required
if (img.DownloadState != voltha.ImageDownload_DOWNLOAD_REQUESTED &&
img.DownloadState != voltha.ImageDownload_DOWNLOAD_STARTED) ||
- (img.ImageState != voltha.ImageDownload_IMAGE_ACTIVATING){
+ (img.ImageState != voltha.ImageDownload_IMAGE_ACTIVATING) {
cloned.AdminState = voltha.AdminState_ENABLED
}
@@ -562,7 +561,7 @@
if device, err := agent.getDeviceWithoutLock(); err != nil {
return nil, status.Errorf(codes.NotFound, "%s", agent.deviceId)
} else {
- return &voltha.ImageDownloads{Items:device.ImageDownloads}, nil
+ return &voltha.ImageDownloads{Items: device.ImageDownloads}, nil
}
}
@@ -1016,3 +1015,20 @@
}
return
}
+
+func (agent *DeviceAgent) simulateAlarm(ctx context.Context, simulatereq *voltha.SimulateAlarmRequest) error {
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
+ log.Debugw("simulateAlarm", log.Fields{"id": agent.deviceId})
+ // Get the most up to date the device info
+ if device, err := agent.getDeviceWithoutLock(); err != nil {
+ return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+ } else {
+ // First send the request to an Adapter and wait for a response
+ if err := agent.adapterProxy.SimulateAlarm(ctx, device, simulatereq); err != nil {
+ log.Debugw("simulateAlarm-error", log.Fields{"id": agent.lastData.Id, "error": err})
+ return err
+ }
+ }
+ return nil
+}
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
old mode 100644
new mode 100755
index f37244f..f9da623
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -496,7 +496,7 @@
}
// Notify the logical device manager to setup a logical port if needed
if port.Type == voltha.Port_ETHERNET_NNI || port.Type == voltha.Port_ETHERNET_UNI {
- if device , err := dMgr.GetDevice(deviceId); err == nil {
+ if device, err := dMgr.GetDevice(deviceId); err == nil {
go dMgr.logicalDeviceMgr.addLogicalPort(device, port)
} else {
log.Errorw("failed-to-retrieve-device", log.Fields{"deviceId": deviceId})
@@ -992,3 +992,16 @@
}
return nil
}
+
+func (dMgr *DeviceManager) simulateAlarm(ctx context.Context, simulatereq *voltha.SimulateAlarmRequest, ch chan interface{}) {
+ log.Debugw("simulateAlarm", log.Fields{"id": simulatereq.Id, "Indicator": simulatereq.Indicator, "IntfId": simulatereq.IntfId,
+ "PortTypeName": simulatereq.PortTypeName, "OnuDeviceId": simulatereq.OnuDeviceId, "InverseBitErrorRate": simulatereq.InverseBitErrorRate,
+ "Drift": simulatereq.Drift, "NewEqd": simulatereq.NewEqd, "OnuSerialNumber": simulatereq.OnuSerialNumber, "Operation": simulatereq.Operation})
+ var res interface{}
+ if agent := dMgr.getDeviceAgent(simulatereq.Id); agent != nil {
+ res = agent.simulateAlarm(ctx, simulatereq)
+ log.Debugw("SimulateAlarm-result", log.Fields{"result": res})
+ }
+ //TODO CLI always get successful response
+ sendResponse(ctx, ch, res)
+}
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
old mode 100644
new mode 100755
index 7ed756c..6532b6e
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -835,13 +835,29 @@
return nil, nil
}
-//@TODO useless stub, what should this actually do?
func (handler *APIHandler) SimulateAlarm(
ctx context.Context,
in *voltha.SimulateAlarmRequest,
) (*common.OperationResp, error) {
- log.Debug("SimulateAlarm-stub")
- return nil, nil
+ log.Debugw("SimulateAlarm-request", log.Fields{"id": in.Id})
+ successResp := &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS}
+ if isTestMode(ctx) {
+ return successResp, nil
+ }
+
+ if handler.competeForTransaction() {
+ if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id:in.Id}, handler.longRunningRequestTimeout); err != nil {
+ failedresponse := &common.OperationResp{Code:voltha.OperationResp_OPERATION_FAILURE}
+ return failedresponse, err
+ } else {
+ defer txn.Close()
+ }
+ }
+
+ ch := make(chan interface{})
+ defer close(ch)
+ go handler.deviceMgr.simulateAlarm(ctx, in, ch)
+ return successResp, nil
}
//@TODO useless stub, what should this actually do?