Added id code and removed from name

Added funtionality to consumer and send kpi events to onap

Change-Id: I0849e3b9188b1fa53210c341f1fc6e6228c5de9b
Signed-off-by: William Kurkian <wkurkian@cisco.com>
diff --git a/config/config.properties b/config/config.properties
index ce035c3..999b4b1 100644
--- a/config/config.properties
+++ b/config/config.properties
@@ -1,9 +1,11 @@
 onap_ves_address=onap
 onap_ves_port=30235
 
-kafka_address=kafka.voltha.svc.cluster.local
+kafka_address=cord-kafka
 kafka_port=9092
-kafka_topic=voltha.alarms
+
+kafka_alarms_topic=voltha.alarms
+kafka_kpis_topic=voltha.kpis
 
 co_id=001
 pod_id=001
diff --git a/src/main/java/config/Config.java b/src/main/java/config/Config.java
index ec27cfa..cce3045 100644
--- a/src/main/java/config/Config.java
+++ b/src/main/java/config/Config.java
@@ -60,10 +60,14 @@
 

     public static String getKafkaPort() {

         return get("kafka_port");

+    }
+
+    public static String getKafkaAlarmsTopic() {

+        return get("kafka_alarms_topic");

     }

 

-    public static String getKafkaTopic() {

-        return get("kafka_topic");

+    public static String getKafkaKpisTopic() {

+        return get("kafka_kpis_topic");

     }

 

     public static String getCoId() {

@@ -73,4 +77,4 @@
     public static String getPodId() {

         return get("pod_id");

     }

-}

+}
diff --git a/src/main/java/controller/Application.java b/src/main/java/controller/Application.java
index d7a0d0e..23566dd 100644
--- a/src/main/java/controller/Application.java
+++ b/src/main/java/controller/Application.java
@@ -20,6 +20,7 @@
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
 import kafka.VolthaKafkaConsumer;
+import kafka.KafkaConsumerType;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,23 +41,40 @@
 
     public static void main(String[] args) {
         Config.loadProperties("/opt/ves-agent/config.properties");
-        KafkaThread kafka = new KafkaThread();
-        kafka.start();
-        SpringApplication.run(Application.class, args);
+        KafkaAlarmsThread kafkaAlarms = new KafkaAlarmsThread();
+        kafkaAlarms.start();
+        KafkaKpisThread kafkaKpis = new KafkaKpisThread();
+        kafkaKpis.start();
+        //SpringApplication.run(Application.class, args);
     }
 
 }
-class KafkaThread extends Thread {
+class KafkaAlarmsThread extends Thread {
 
-    private final static Logger logger = LoggerFactory.getLogger("KafkaThread");
+    private final static Logger logger = LoggerFactory.getLogger("KafkaAlarmsThread");
 
     public void run() {
-        logger.debug("Start Kafka Consumer Thread");
+        logger.debug("Start Kafka Alarms Consumer Thread");
         try {
-            VolthaKafkaConsumer consumer = new VolthaKafkaConsumer();
+            VolthaKafkaConsumer consumer = new VolthaKafkaConsumer(KafkaConsumerType.ALARMS);
             consumer.runConsumer();
-        } catch (InterruptedException e) {
-            logger.error(e.getMessage());
+        } catch (Exception e) {
+            logger.error("Error in Kafka Alarm thread", e);
+        }
+
+    }
+}
+class KafkaKpisThread extends Thread {
+
+    private final static Logger logger = LoggerFactory.getLogger("KafkaKpisThread");
+
+    public void run() {
+        logger.debug("Start Kafka KPIs Consumer Thread");
+        try {
+            VolthaKafkaConsumer consumer = new VolthaKafkaConsumer(KafkaConsumerType.KPIS);
+            consumer.runConsumer();
+        } catch (Exception e) {
+            logger.error("Error Kafka, KPI thread", e);
         }
 
     }
diff --git a/src/main/java/kafka/KafkaConsumerType.java b/src/main/java/kafka/KafkaConsumerType.java
new file mode 100644
index 0000000..13cd249
--- /dev/null
+++ b/src/main/java/kafka/KafkaConsumerType.java
@@ -0,0 +1,22 @@
+/*

+* Copyright 2018- Cisco

+*

+* Licensed under the Apache License, Version 2.0 (the "License");

+* you may not use this file except in compliance with the License.

+* You may obtain a copy of the License at

+*

+*     http://www.apache.org/licenses/LICENSE-2.0

+*

+* Unless required by applicable law or agreed to in writing, software

+* distributed under the License is distributed on an "AS IS" BASIS,

+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+* See the License for the specific language governing permissions and

+* limitations under the License.

+*/

+

+package kafka;

+

+public enum KafkaConsumerType {

+    ALARMS,

+    KPIS

+}

diff --git a/src/main/java/kafka/VolthaKafkaConsumer.java b/src/main/java/kafka/VolthaKafkaConsumer.java
index dc431bb..11deb81 100644
--- a/src/main/java/kafka/VolthaKafkaConsumer.java
+++ b/src/main/java/kafka/VolthaKafkaConsumer.java
@@ -12,6 +12,7 @@
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
+*
 */
 package kafka;
 import org.apache.kafka.clients.consumer.*;
@@ -38,7 +39,6 @@
 import ves.*;
 import config.Config;
 
-
 public class VolthaKafkaConsumer {
 
     private final Logger logger = LoggerFactory.getLogger("VolthaKafkaConsumer");
@@ -47,13 +47,17 @@
 
     private KafkaConsumer<Long, String> consumer;
 
-    public VolthaKafkaConsumer() {
+    private KafkaConsumerType type;
+
+    public VolthaKafkaConsumer(KafkaConsumerType type) {
         logger.debug("VolthaKafkaConsumer constructor called");
         initVesAgent();
+        this.type = type;
         try {
             consumer = createConsumer();
-        } catch (KafkaException e) {
-            logger.error("Error with Kafka connection. Retrying in 15 seconds.");
+        } catch (Exception e) {
+            logger.error("Error with Kafka: ", e);
+            logger.error("Retrying in 15 seconds.");
             //Don't try to resolve it here. Try again in the thread loo, in case this is a temporal issue
         }
     }
@@ -70,26 +74,31 @@
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
         kafkaAddress);
         props.put(ConsumerConfig.GROUP_ID_CONFIG,
-        "KafkaExampleConsumer");
+        "VesAgent");
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
         LongDeserializer.class.getName());
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
         StringDeserializer.class.getName());
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
         false);
-
         // Create the consumer using props.
         final KafkaConsumer<Long, String> consumer =
         new KafkaConsumer<>(props);
-
         // Subscribe to the topic.
-        consumer.subscribe(Collections.singletonList(Config.getKafkaTopic()));
+        switch (type) {
+            case ALARMS:
+                consumer.subscribe(Collections.singletonList(Config.getKafkaAlarmsTopic()));
+                break;
+            case KPIS:
+                consumer.subscribe(Collections.singletonList(Config.getKafkaKpisTopic()));
+                break;
+        }
         return consumer;
     }
 
     public void runConsumer() throws InterruptedException {
 
-        logger.debug("Starting Consumer");
+        logger.debug("Starting Kafka Consumer");
 
         while (true) {
             ConsumerRecords<Long, String> consumerRecords;
@@ -98,8 +107,9 @@
                     this.consumer = createConsumer();
                 }
                 consumerRecords = consumer.poll(20000);
-            } catch (KafkaException e) {
-                logger.error("Error with Kafka connection. Retrying in 15 seconds.");
+            } catch (Exception e) {
+                logger.error("Error with kafka: ", e);
+                logger.error("Retrying in 15 seconds.");
                 consumer = null;
                 TimeUnit.SECONDS.sleep(15);
                 continue;
@@ -114,7 +124,7 @@
                     record.key(), record.value(),
                     record.partition(), record.offset());
                     logger.info("Attempting to send data to VES");
-                    boolean success = VesAgent.sendToVES(record.value());
+                    boolean success = VesAgent.sendToVES(type, record.value());
                     if (!success) {
                         throw new HTTPException(0);
                     } else {
diff --git a/src/main/java/mapper/VesVolthaMessage.java b/src/main/java/mapper/VesVolthaAlarm.java
similarity index 97%
rename from src/main/java/mapper/VesVolthaMessage.java
rename to src/main/java/mapper/VesVolthaAlarm.java
index f0327df..743b8f1 100644
--- a/src/main/java/mapper/VesVolthaMessage.java
+++ b/src/main/java/mapper/VesVolthaAlarm.java
@@ -19,7 +19,7 @@
 import org.slf4j.LoggerFactory;

 import java.util.Map;

 

-public class VesVolthaMessage {

+public class VesVolthaAlarm {

     private String id = "";

     private String logical_device_id = "";

     private String raised_ts = "";

diff --git a/src/main/java/mapper/VesVolthaKpi.java b/src/main/java/mapper/VesVolthaKpi.java
new file mode 100644
index 0000000..b6bf02e
--- /dev/null
+++ b/src/main/java/mapper/VesVolthaKpi.java
@@ -0,0 +1,53 @@
+/*

+* Copyright 2018- Cisco

+*

+* Licensed under the Apache License, Version 2.0 (the "License");

+* you may not use this file except in compliance with the License.

+* You may obtain a copy of the License at

+*

+*     http://www.apache.org/licenses/LICENSE-2.0

+*

+* Unless required by applicable law or agreed to in writing, software

+* distributed under the License is distributed on an "AS IS" BASIS,

+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+* See the License for the specific language governing permissions and

+* limitations under the License.

+*/

+package mapper;

+

+import org.slf4j.Logger;

+import org.slf4j.LoggerFactory;

+import java.util.Map;

+import com.google.gson.annotations.JsonAdapter;

+

+public class VesVolthaKpi {

+    private String type = "";

+    private String ts = "";

+

+    //@JsonAdapter(StringTypeAdapter.class)

+    private String slice_data;

+

+    public String getType() {

+        return type;

+    }

+

+    public void setType(String type) {

+        this.type = type;

+    }

+

+    public String getTs() {

+        return ts;

+    }

+

+    public void setTs(String ts) {

+        this.ts = ts;

+    }

+

+    public String getSliceData() {

+        return slice_data;

+    }

+

+    public void setSliceData(String slice_data) {

+        this.slice_data = slice_data;

+    }

+}

diff --git a/src/main/java/mapper/VesVolthaMapper.java b/src/main/java/mapper/VesVolthaMapper.java
index a3ee1ab..6d927ca 100644
--- a/src/main/java/mapper/VesVolthaMapper.java
+++ b/src/main/java/mapper/VesVolthaMapper.java
@@ -19,6 +19,7 @@
 import org.slf4j.LoggerFactory;

 import com.google.gson.Gson;

 import com.google.gson.GsonBuilder;

+import com.google.gson.JsonObject;

 

 public class VesVolthaMapper {

 

@@ -30,7 +31,17 @@
         gson = new GsonBuilder().create();

     }

 

-    public VesVolthaMessage parseJson(String json) {

-        return gson.fromJson(json, VesVolthaMessage.class);

+    public VesVolthaAlarm parseAlarm(String json) {

+        return gson.fromJson(json, VesVolthaAlarm.class);

+    }

+

+    public VesVolthaKpi parseKpi(String json) {

+        //return gson.fromJson(json, VesVolthaKpi.class);

+        JsonObject body = gson.fromJson(json, JsonObject.class);

+        VesVolthaKpi kpi = new VesVolthaKpi();

+        kpi.setType(body.get("type").getAsString());

+        kpi.setTs(body.get("ts").getAsString());

+        kpi.setSliceData(body.get("slice_data").toString());

+        return kpi;

     }

 }

diff --git a/src/main/java/ves/VesAgent.java b/src/main/java/ves/VesAgent.java
index bea964d..ac42115 100644
--- a/src/main/java/ves/VesAgent.java
+++ b/src/main/java/ves/VesAgent.java
@@ -33,7 +33,9 @@
 import config.Config;

 

 import mapper.VesVolthaMapper;

-import mapper.VesVolthaMessage;

+import mapper.VesVolthaAlarm;

+import mapper.VesVolthaKpi;

+import kafka.KafkaConsumerType;

 

 import org.slf4j.Logger;

 import org.slf4j.LoggerFactory;

@@ -63,9 +65,31 @@
         }

     }

 

-    public static boolean sendToVES(String json) throws JsonSyntaxException {

-        VesVolthaMessage message = mapper.parseJson(json);

+    public static boolean sendToVES(KafkaConsumerType type, String json) throws JsonSyntaxException {

+        int code = 0;

+

+        switch (type) {

+            case ALARMS:

+                code = sendFault(json);

+                break;

+            case KPIS:

+                code = sendKpi(json);

+                break;

+        }

+

+        if(code == 0 || code >= HttpURLConnection.HTTP_BAD_REQUEST ) {

+            return false;

+        } else {

+            return true;

+        }

+    }

+

+    private static int sendFault(String json) {

+        VesVolthaAlarm message = mapper.parseAlarm(json);

+

         String id = message.getId();

+        String[] idsplit = id.split("\\.");

+        String eventType = idsplit[idsplit.length-1];

         String ldeviceId = message.getLogicalDeviceId();

         String ts = message.getRaisedTS();

         String description = message.getDescription();

@@ -80,7 +104,7 @@
         EVEL_SEVERITIES vesSeverity = mapSeverity(severity);

         EVEL_SOURCE_TYPES vesType = mapType(type);

         EvelFault flt  = new EvelFault(

-            "Fault_VOLTHA_" + id,

+            "Fault_VOLTHA_" + eventType,

             ldeviceId + ":" + ts,

             id,

             description,

@@ -98,11 +122,25 @@
         logger.info("Sending fault event");

         int code = AgentMain.evel_post_event_immediate(flt);

         logger.info("Fault event http code received: " + code);

-        if(code == 0 || code >= HttpURLConnection.HTTP_BAD_REQUEST ) {

-            return false;

-        } else {

-            return true;

-        }

+        return code;

+    }

+

+    private static int sendKpi(String json) {

+        VesVolthaKpi message = mapper.parseKpi(json);

+

+        EvelOther ev = new EvelOther("measurement_VOLTHA_KPI", "vmname_ip");

+        ev.evel_other_field_add("co_id", Config.getCoId());

+        ev.evel_other_field_add("pod_id", Config.getPodId());

+        ev.evel_other_field_add("type", message.getType());

+        ev.evel_other_field_add("ts", message.getTs());

+        ev.evel_other_field_add("slices", message.getSliceData());

+

+        ev.evel_other_field_add("voltha", json);

+

+        logger.info("Sending fault event");

+        int code = AgentMain.evel_post_event_immediate(ev);

+        logger.info("Fault event http code received: " + code);

+        return code;

     }

 

     private static EVEL_SEVERITIES mapSeverity(String severity) {

diff --git a/ves-agent.yaml b/ves-agent.yaml
index 29534de..8acf42b 100644
--- a/ves-agent.yaml
+++ b/ves-agent.yaml
@@ -16,7 +16,7 @@
 kind: Service
 metadata:
   name: ves
-  namespace: voltha
+  namespace: default
   labels:
     name: ves
 spec:
@@ -32,7 +32,7 @@
 kind: Deployment
 metadata:
   name: ves
-  namespace: voltha
+  namespace: default
 spec:
   replicas: 1
   template: