Added id code and removed from name
Added functionality to consumer and send KPI events to ONAP
Change-Id: I0849e3b9188b1fa53210c341f1fc6e6228c5de9b
Signed-off-by: William Kurkian <wkurkian@cisco.com>
diff --git a/src/main/java/controller/Application.java b/src/main/java/controller/Application.java
index d7a0d0e..23566dd 100644
--- a/src/main/java/controller/Application.java
+++ b/src/main/java/controller/Application.java
@@ -20,6 +20,7 @@
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import kafka.VolthaKafkaConsumer;
+import kafka.KafkaConsumerType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,23 +41,40 @@
public static void main(String[] args) {
Config.loadProperties("/opt/ves-agent/config.properties");
- KafkaThread kafka = new KafkaThread();
- kafka.start();
- SpringApplication.run(Application.class, args);
+ KafkaAlarmsThread kafkaAlarms = new KafkaAlarmsThread();
+ kafkaAlarms.start();
+ KafkaKpisThread kafkaKpis = new KafkaKpisThread();
+ kafkaKpis.start();
+ //SpringApplication.run(Application.class, args);
}
}
-class KafkaThread extends Thread {
+class KafkaAlarmsThread extends Thread {
- private final static Logger logger = LoggerFactory.getLogger("KafkaThread");
+ private final static Logger logger = LoggerFactory.getLogger("KafkaAlarmsThread");
public void run() {
- logger.debug("Start Kafka Consumer Thread");
+ logger.debug("Start Kafka Alarms Consumer Thread");
try {
- VolthaKafkaConsumer consumer = new VolthaKafkaConsumer();
+ VolthaKafkaConsumer consumer = new VolthaKafkaConsumer(KafkaConsumerType.ALARMS);
consumer.runConsumer();
- } catch (InterruptedException e) {
- logger.error(e.getMessage());
+ } catch (Exception e) {
+ logger.error("Error in Kafka Alarm thread", e);
+ }
+
+ }
+}
+class KafkaKpisThread extends Thread {
+
+ private final static Logger logger = LoggerFactory.getLogger("KafkaKpisThread");
+
+ public void run() {
+ logger.debug("Start Kafka KPIs Consumer Thread");
+ try {
+ VolthaKafkaConsumer consumer = new VolthaKafkaConsumer(KafkaConsumerType.KPIS);
+ consumer.runConsumer();
+ } catch (Exception e) {
+ logger.error("Error Kafka, KPI thread", e);
}
}