Added id code and removed from name

Added functionality to the consumer to send KPI events to ONAP

Change-Id: I0849e3b9188b1fa53210c341f1fc6e6228c5de9b
Signed-off-by: William Kurkian <wkurkian@cisco.com>
diff --git a/src/main/java/controller/Application.java b/src/main/java/controller/Application.java
index d7a0d0e..23566dd 100644
--- a/src/main/java/controller/Application.java
+++ b/src/main/java/controller/Application.java
@@ -20,6 +20,7 @@
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
 import kafka.VolthaKafkaConsumer;
+import kafka.KafkaConsumerType;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,23 +41,40 @@
 
     public static void main(String[] args) {
         Config.loadProperties("/opt/ves-agent/config.properties");
-        KafkaThread kafka = new KafkaThread();
-        kafka.start();
-        SpringApplication.run(Application.class, args);
+        KafkaAlarmsThread kafkaAlarms = new KafkaAlarmsThread();
+        kafkaAlarms.start();
+        KafkaKpisThread kafkaKpis = new KafkaKpisThread();
+        kafkaKpis.start();
+        //SpringApplication.run(Application.class, args);
     }
 
 }
-class KafkaThread extends Thread {
+class KafkaAlarmsThread extends Thread {
 
-    private final static Logger logger = LoggerFactory.getLogger("KafkaThread");
+    private final static Logger logger = LoggerFactory.getLogger("KafkaAlarmsThread");
 
     public void run() {
-        logger.debug("Start Kafka Consumer Thread");
+        logger.debug("Start Kafka Alarms Consumer Thread");
         try {
-            VolthaKafkaConsumer consumer = new VolthaKafkaConsumer();
+            VolthaKafkaConsumer consumer = new VolthaKafkaConsumer(KafkaConsumerType.ALARMS);
             consumer.runConsumer();
-        } catch (InterruptedException e) {
-            logger.error(e.getMessage());
+        } catch (Exception e) {
+            logger.error("Error in Kafka Alarm thread", e);
+        }
+
+    }
+}
+class KafkaKpisThread extends Thread {
+
+    private final static Logger logger = LoggerFactory.getLogger("KafkaKpisThread");
+
+    public void run() {
+        logger.debug("Start Kafka KPIs Consumer Thread");
+        try {
+            VolthaKafkaConsumer consumer = new VolthaKafkaConsumer(KafkaConsumerType.KPIS);
+            consumer.runConsumer();
+        } catch (Exception e) {
+            logger.error("Error Kafka, KPI thread", e);
         }
 
     }