diff --git a/config/config.properties b/config/config.properties
index ce035c3..999b4b1 100644
--- a/config/config.properties
+++ b/config/config.properties
@@ -1,9 +1,11 @@
 onap_ves_address=onap
 onap_ves_port=30235
 
-kafka_address=kafka.voltha.svc.cluster.local
+kafka_address=cord-kafka
 kafka_port=9092
-kafka_topic=voltha.alarms
+
+kafka_alarms_topic=voltha.alarms
+kafka_kpis_topic=voltha.kpis
 
 co_id=001
 pod_id=001
diff --git a/src/main/java/config/Config.java b/src/main/java/config/Config.java
index ec27cfa..cce3045 100644
--- a/src/main/java/config/Config.java
+++ b/src/main/java/config/Config.java
@@ -60,10 +60,14 @@
 
     public static String getKafkaPort() {
         return get("kafka_port");
+    }
+
+    public static String getKafkaAlarmsTopic() {
+        return get("kafka_alarms_topic");
     }
 
-    public static String getKafkaTopic() {
-        return get("kafka_topic");
+    public static String getKafkaKpisTopic() {
+        return get("kafka_kpis_topic");
     }
 
     public static String getCoId() {
@@ -73,4 +77,4 @@
     public static String getPodId() {
         return get("pod_id");
     }
-}
+}
diff --git a/src/main/java/controller/Application.java b/src/main/java/controller/Application.java
index d7a0d0e..23566dd 100644
--- a/src/main/java/controller/Application.java
+++ b/src/main/java/controller/Application.java
@@ -20,6 +20,7 @@
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
 import kafka.VolthaKafkaConsumer;
+import kafka.KafkaConsumerType;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,23 +41,40 @@
 
     public static void main(String[] args) {
         Config.loadProperties("/opt/ves-agent/config.properties");
-        KafkaThread kafka = new KafkaThread();
-        kafka.start();
-        SpringApplication.run(Application.class, args);
+        KafkaAlarmsThread kafkaAlarms = new KafkaAlarmsThread();
+        kafkaAlarms.start();
+        KafkaKpisThread kafkaKpis = new KafkaKpisThread();
+        kafkaKpis.start();
+        //SpringApplication.run(Application.class, args);
     }
 
 }
-class KafkaThread extends Thread {
+class KafkaAlarmsThread extends Thread {
 
-    private final static Logger logger = LoggerFactory.getLogger("KafkaThread");
+    private final static Logger logger = LoggerFactory.getLogger("KafkaAlarmsThread");
 
     public void run() {
-        logger.debug("Start Kafka Consumer Thread");
+        logger.debug("Start Kafka Alarms Consumer Thread");
         try {
-            VolthaKafkaConsumer consumer = new VolthaKafkaConsumer();
+            VolthaKafkaConsumer consumer = new VolthaKafkaConsumer(KafkaConsumerType.ALARMS);
             consumer.runConsumer();
-        } catch (InterruptedException e) {
-            logger.error(e.getMessage());
+        } catch (Exception e) {
+            logger.error("Error in Kafka Alarm thread", e);
+        }
+
+    }
+}
+class KafkaKpisThread extends Thread {
+
+    private final static Logger logger = LoggerFactory.getLogger("KafkaKpisThread");
+
+    public void run() {
+        logger.debug("Start Kafka KPIs Consumer Thread");
+        try {
+            VolthaKafkaConsumer consumer = new VolthaKafkaConsumer(KafkaConsumerType.KPIS);
+            consumer.runConsumer();
+        } catch (Exception e) {
+            logger.error("Error Kafka, KPI thread", e);
         }
 
     }
diff --git a/src/main/java/kafka/KafkaConsumerType.java b/src/main/java/kafka/KafkaConsumerType.java
new file mode 100644
index 0000000..13cd249
--- /dev/null
+++ b/src/main/java/kafka/KafkaConsumerType.java
@@ -0,0 +1,22 @@
+/*
+* Copyright 2018- Cisco
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package kafka;
+
+public enum KafkaConsumerType {
+    ALARMS,
+    KPIS
+}
diff --git a/src/main/java/kafka/VolthaKafkaConsumer.java b/src/main/java/kafka/VolthaKafkaConsumer.java
index dc431bb..11deb81 100644
--- a/src/main/java/kafka/VolthaKafkaConsumer.java
+++ b/src/main/java/kafka/VolthaKafkaConsumer.java
@@ -12,6 +12,7 @@
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
+*
 */
 package kafka;
 import org.apache.kafka.clients.consumer.*;
@@ -38,7 +39,6 @@
 import ves.*;
 import config.Config;
 
-
 public class VolthaKafkaConsumer {
 
     private final Logger logger = LoggerFactory.getLogger("VolthaKafkaConsumer");
@@ -47,13 +47,17 @@
 
     private KafkaConsumer<Long, String> consumer;
 
-    public VolthaKafkaConsumer() {
+    private KafkaConsumerType type;
+
+    public VolthaKafkaConsumer(KafkaConsumerType type) {
         logger.debug("VolthaKafkaConsumer constructor called");
         initVesAgent();
+        this.type = type;
         try {
             consumer = createConsumer();
-        } catch (KafkaException e) {
-            logger.error("Error with Kafka connection. Retrying in 15 seconds.");
+        } catch (Exception e) {
+            logger.error("Error with Kafka: ", e);
+            logger.error("Retrying in 15 seconds.");
             //Don't try to resolve it here. Try again in the thread loo, in case this is a temporal issue
         }
     }
@@ -70,26 +74,31 @@
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
         kafkaAddress);
         props.put(ConsumerConfig.GROUP_ID_CONFIG,
-        "KafkaExampleConsumer");
+        "VesAgent");
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
         LongDeserializer.class.getName());
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
         StringDeserializer.class.getName());
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
         false);
-
         // Create the consumer using props.
         final KafkaConsumer<Long, String> consumer =
         new KafkaConsumer<>(props);
-
         // Subscribe to the topic.
-        consumer.subscribe(Collections.singletonList(Config.getKafkaTopic()));
+        switch (type) {
+            case ALARMS:
+                consumer.subscribe(Collections.singletonList(Config.getKafkaAlarmsTopic()));
+                break;
+            case KPIS:
+                consumer.subscribe(Collections.singletonList(Config.getKafkaKpisTopic()));
+                break;
+        }
         return consumer;
     }
 
     public void runConsumer() throws InterruptedException {
 
-        logger.debug("Starting Consumer");
+        logger.debug("Starting Kafka Consumer");
 
         while (true) {
             ConsumerRecords<Long, String> consumerRecords;
@@ -98,8 +107,9 @@
                     this.consumer = createConsumer();
                 }
                 consumerRecords = consumer.poll(20000);
-            } catch (KafkaException e) {
-                logger.error("Error with Kafka connection. Retrying in 15 seconds.");
+            } catch (Exception e) {
+                logger.error("Error with kafka: ", e);
+                logger.error("Retrying in 15 seconds.");
                 consumer = null;
                 TimeUnit.SECONDS.sleep(15);
                 continue;
@@ -114,7 +124,7 @@
                     record.key(), record.value(),
                     record.partition(), record.offset());
                     logger.info("Attempting to send data to VES");
-                    boolean success = VesAgent.sendToVES(record.value());
+                    boolean success = VesAgent.sendToVES(type, record.value());
                     if (!success) {
                         throw new HTTPException(0);
                     } else {
diff --git a/src/main/java/mapper/VesVolthaMessage.java b/src/main/java/mapper/VesVolthaAlarm.java
similarity index 97%
rename from src/main/java/mapper/VesVolthaMessage.java
rename to src/main/java/mapper/VesVolthaAlarm.java
index f0327df..743b8f1 100644
--- a/src/main/java/mapper/VesVolthaMessage.java
+++ b/src/main/java/mapper/VesVolthaAlarm.java
@@ -19,7 +19,7 @@
 import org.slf4j.LoggerFactory;
 import java.util.Map;
 
-public class VesVolthaMessage {
+public class VesVolthaAlarm {
     private String id = "";
     private String logical_device_id = "";
     private String raised_ts = "";
diff --git a/src/main/java/mapper/VesVolthaKpi.java b/src/main/java/mapper/VesVolthaKpi.java
new file mode 100644
index 0000000..b6bf02e
--- /dev/null
+++ b/src/main/java/mapper/VesVolthaKpi.java
@@ -0,0 +1,53 @@
+/*
+* Copyright 2018- Cisco
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package mapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Map;
+import com.google.gson.annotations.JsonAdapter;
+
+public class VesVolthaKpi {
+    private String type = "";
+    private String ts = "";
+
+    //@JsonAdapter(StringTypeAdapter.class)
+    private String slice_data;
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public String getTs() {
+        return ts;
+    }
+
+    public void setTs(String ts) {
+        this.ts = ts;
+    }
+
+    public String getSliceData() {
+        return slice_data;
+    }
+
+    public void setSliceData(String slice_data) {
+        this.slice_data = slice_data;
+    }
+}
diff --git a/src/main/java/mapper/VesVolthaMapper.java b/src/main/java/mapper/VesVolthaMapper.java
index a3ee1ab..6d927ca 100644
--- a/src/main/java/mapper/VesVolthaMapper.java
+++ b/src/main/java/mapper/VesVolthaMapper.java
@@ -19,6 +19,7 @@
 import org.slf4j.LoggerFactory;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
+import com.google.gson.JsonObject;
 
 public class VesVolthaMapper {
 
@@ -30,7 +31,17 @@
         gson = new GsonBuilder().create();
     }
 
-    public VesVolthaMessage parseJson(String json) {
-        return gson.fromJson(json, VesVolthaMessage.class);
+    public VesVolthaAlarm parseAlarm(String json) {
+        return gson.fromJson(json, VesVolthaAlarm.class);
+    }
+
+    public VesVolthaKpi parseKpi(String json) {
+        //return gson.fromJson(json, VesVolthaKpi.class);
+        JsonObject body = gson.fromJson(json, JsonObject.class);
+        VesVolthaKpi kpi = new VesVolthaKpi();
+        kpi.setType(body.get("type").getAsString());
+        kpi.setTs(body.get("ts").getAsString());
+        kpi.setSliceData(body.get("slice_data").toString());
+        return kpi;
     }
 }
diff --git a/src/main/java/ves/VesAgent.java b/src/main/java/ves/VesAgent.java
index bea964d..ac42115 100644
--- a/src/main/java/ves/VesAgent.java
+++ b/src/main/java/ves/VesAgent.java
@@ -33,7 +33,9 @@
 import config.Config;
 
 import mapper.VesVolthaMapper;
-import mapper.VesVolthaMessage;
+import mapper.VesVolthaAlarm;
+import mapper.VesVolthaKpi;
+import kafka.KafkaConsumerType;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,9 +65,31 @@
         }
     }
 
-    public static boolean sendToVES(String json) throws JsonSyntaxException {
-        VesVolthaMessage message = mapper.parseJson(json);
+    public static boolean sendToVES(KafkaConsumerType type, String json) throws JsonSyntaxException {
+        int code = 0;
+
+        switch (type) {
+            case ALARMS:
+                code = sendFault(json);
+                break;
+            case KPIS:
+                code = sendKpi(json);
+                break;
+        }
+
+        if(code == 0 || code >= HttpURLConnection.HTTP_BAD_REQUEST ) {
+            return false;
+        } else {
+            return true;
+        }
+    }
+
+    private static int sendFault(String json) {
+        VesVolthaAlarm message = mapper.parseAlarm(json);
+
         String id = message.getId();
+        String[] idsplit = id.split("\\.");
+        String eventType = idsplit[idsplit.length-1];
         String ldeviceId = message.getLogicalDeviceId();
         String ts = message.getRaisedTS();
         String description = message.getDescription();
@@ -80,7 +104,7 @@
         EVEL_SEVERITIES vesSeverity = mapSeverity(severity);
         EVEL_SOURCE_TYPES vesType = mapType(type);
         EvelFault flt  = new EvelFault(
-            "Fault_VOLTHA_" + id,
+            "Fault_VOLTHA_" + eventType,
             ldeviceId + ":" + ts,
             id,
             description,
@@ -98,11 +122,25 @@
         logger.info("Sending fault event");
         int code = AgentMain.evel_post_event_immediate(flt);
         logger.info("Fault event http code received: " + code);
-        if(code == 0 || code >= HttpURLConnection.HTTP_BAD_REQUEST ) {
-            return false;
-        } else {
-            return true;
-        }
+        return code;
+    }
+
+    private static int sendKpi(String json) {
+        VesVolthaKpi message = mapper.parseKpi(json);
+
+        EvelOther ev = new EvelOther("measurement_VOLTHA_KPI", "vmname_ip");
+        ev.evel_other_field_add("co_id", Config.getCoId());
+        ev.evel_other_field_add("pod_id", Config.getPodId());
+        ev.evel_other_field_add("type", message.getType());
+        ev.evel_other_field_add("ts", message.getTs());
+        ev.evel_other_field_add("slices", message.getSliceData());
+
+        ev.evel_other_field_add("voltha", json);
+
+        logger.info("Sending fault event");
+        int code = AgentMain.evel_post_event_immediate(ev);
+        logger.info("Fault event http code received: " + code);
+        return code;
     }
 
     private static EVEL_SEVERITIES mapSeverity(String severity) {
diff --git a/ves-agent.yaml b/ves-agent.yaml
index 29534de..8acf42b 100644
--- a/ves-agent.yaml
+++ b/ves-agent.yaml
@@ -16,7 +16,7 @@
 kind: Service
 metadata:
   name: ves
-  namespace: voltha
+  namespace: default
   labels:
     name: ves
 spec:
@@ -32,7 +32,7 @@
 kind: Deployment
 metadata:
   name: ves
-  namespace: voltha
+  namespace: default
 spec:
   replicas: 1
   template:
