SEBA-749 Added on demand api code for omci test action

Change-Id: I1a52dc5ec78ac61001e434c8c98f0400e034dc50
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index e3c362e..d0a2e3f 100755
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -342,3 +342,14 @@
 	replyToTopic := ap.getCoreTopic()
 	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, pDeviceID, args...)
 }
+
+func (ap *AdapterProxy) startOmciTest(ctx context.Context, device *voltha.Device, omcitestrequest *voltha.OmciTestRequest) (chan *kafka.RpcResponse, error) {
+	log.Debugw("Omci_test_Request_adapter_proxy", log.Fields{"device": device, "omciTestRequest": omcitestrequest})
+	rpc := "start_omci_test"
+	toTopic := ap.getAdapterTopic(device.Adapter)
+	// Use a device specific topic as we are the only core handling requests for this device
+	replyToTopic := ap.getCoreTopic()
+	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id,
+		&kafka.KVArg{Key: "device", Value: device},
+		&kafka.KVArg{Key: "omcitestrequest", Value: omcitestrequest})
+}
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 837e884..5f946ba 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -1703,3 +1703,41 @@
 	go agent.waitForAdapterResponse(subCtx, cancel, "childDeviceLost", ch, agent.onSuccess, agent.onFailure)
 	return nil
 }
+
+func (agent *DeviceAgent) startOmciTest(ctx context.Context, omcitestrequest *voltha.OmciTestRequest) (*voltha.TestResponse, error) {
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return nil, err
+	}
+
+	device := agent.getDeviceWithoutLock()
+	adapterName, err := agent.adapterMgr.getAdapterName(device.Type)
+	if err != nil {
+		agent.requestQueue.RequestComplete()
+		return nil, err
+	}
+
+	// Send request to the adapter
+	device.Adapter = adapterName
+	ch, err := agent.adapterProxy.startOmciTest(ctx, device, omcitestrequest)
+	agent.requestQueue.RequestComplete()
+	if err != nil {
+		return nil, err
+	}
+
+	// Wait for the adapter response
+	rpcResponse, ok := <-ch
+	if !ok {
+		return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
+	}
+	if rpcResponse.Err != nil {
+		return nil, rpcResponse.Err
+	}
+
+	// Unmarshal and return the response
+	testResp := &voltha.TestResponse{}
+	if err := ptypes.UnmarshalAny(rpcResponse.Reply, testResp); err != nil {
+		return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
+	}
+	log.Debugw("Omci_test_Request-Success-device-agent", log.Fields{"testResp": testResp})
+	return testResp, nil
+}
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index 0d77429..0f68432 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -1534,3 +1534,16 @@
 	}
 	return status.Errorf(codes.NotFound, "%s", curr.Id)
 }
+
+func (dMgr *DeviceManager) startOmciTest(ctx context.Context, omcitestrequest *voltha.OmciTestRequest) (*voltha.TestResponse, error) {
+	log.Debugw("Omci_test_Request", log.Fields{"device-id": omcitestrequest.Id, "uuid": omcitestrequest.Uuid})
+	if agent := dMgr.getDeviceAgent(ctx, omcitestrequest.Id); agent != nil {
+		res, err := agent.startOmciTest(ctx, omcitestrequest)
+		if err != nil {
+			return nil, err
+		}
+		log.Debugw("Omci_test_Response_result-device-magnager", log.Fields{"result": res})
+		return res, nil
+	}
+	return nil, status.Errorf(codes.NotFound, "%s", omcitestrequest.Id)
+}
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index 0196f35..74cea5e 100755
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -1215,3 +1215,8 @@
 	go handler.deviceMgr.disablePort(ctx, port, ch)
 	return waitForNilResponseOnSuccess(ctx, ch)
 }
+
+func (handler *APIHandler) StartOmciTestAction(ctx context.Context, omcitestrequest *voltha.OmciTestRequest) (*voltha.TestResponse, error) {
+	log.Debugw("Omci_test_Request", log.Fields{"id": omcitestrequest.Id, "uuid": omcitestrequest.Uuid})
+	return handler.deviceMgr.startOmciTest(ctx, omcitestrequest)
+}