diff --git a/config/config.properties b/config/config.properties
index bde07af..2c4a4c5 100644
--- a/config/config.properties
+++ b/config/config.properties
@@ -1,4 +1,4 @@
-onap_ves_address=onap2
+onap_ves_address=onap
 onap_ves_port=30235
 
 kafka_address=kafka.voltha.svc.cluster.local
diff --git a/libs/evel_javalib2-1.1.0.jar b/libs/evel_javalib2-1.1.0.jar
index 8d21f35..2d9da20 100644
--- a/libs/evel_javalib2-1.1.0.jar
+++ b/libs/evel_javalib2-1.1.0.jar
Binary files differ
diff --git a/pom.xml b/pom.xml
index c492835..0e2c36f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,19 +1,19 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
- ~ Copyright 2018- Cisco
- ~
- ~ Licensed under the Apache License, Version 2.0 (the "License");
- ~ you may not use this file except in compliance with the License.
- ~ You may obtain a copy of the License at
- ~
- ~    http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
+~ Copyright 2018- Cisco
+~
+~ Licensed under the Apache License, Version 2.0 (the "License");
+~ you may not use this file except in compliance with the License.
+~ You may obtain a copy of the License at
+~
+~    http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
@@ -29,7 +29,7 @@
 
     <properties>
         <java.version>1.8</java.version>
-    	<docker.image.prefix>osam</docker.image.prefix>
+        <docker.image.prefix>osam</docker.image.prefix>
     </properties>
 
     <dependencies>
@@ -42,16 +42,22 @@
             <artifactId>spring-boot-starter-test</artifactId>
             <scope>test</scope>
         </dependency>
-	<dependency>
-    	    <groupId>org.apache.kafka</groupId>
-    	    <artifactId>kafka-clients</artifactId>
-    	    <version>1.1.0</version>
-	</dependency>
-	<dependency>
-        	<groupId>org.onap.vnfsdk.ves-agent</groupId>
-        	<artifactId>evel_javalib2</artifactId>
-        	<version>1.1.0</version>
-    	</dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>1.1.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.onap.vnfsdk.ves-agent</groupId>
+            <artifactId>evel_javalib2</artifactId>
+            <version>1.1.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+            <version>2.8.5</version>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
 
 
@@ -60,19 +66,19 @@
             <plugin>
                 <groupId>org.springframework.boot</groupId>
                 <artifactId>spring-boot-maven-plugin</artifactId>
-	    </plugin>
-	    <plugin>
-                        <groupId>com.spotify</groupId>
-                        <artifactId>dockerfile-maven-plugin</artifactId>
-                        <version>1.3.6</version>
-                        <configuration>
-                                <repository>${docker.image.prefix}/${project.artifactId}</repository>
-                                <buildArgs>
-                                        <JAR_FILE>target/${project.build.finalName}.jar</JAR_FILE>
-					<PROPERTIES_FILE>config/config.properties</PROPERTIES_FILE>
-                                </buildArgs>
-                        </configuration>
-                </plugin>
+            </plugin>
+            <plugin>
+                <groupId>com.spotify</groupId>
+                <artifactId>dockerfile-maven-plugin</artifactId>
+                <version>1.3.6</version>
+                <configuration>
+                    <repository>${docker.image.prefix}/${project.artifactId}</repository>
+                    <buildArgs>
+                        <JAR_FILE>target/${project.build.finalName}.jar</JAR_FILE>
+                        <PROPERTIES_FILE>config/config.properties</PROPERTIES_FILE>
+                    </buildArgs>
+                </configuration>
+            </plugin>
         </plugins>
     </build>
 </project>
diff --git a/src/main/java/config/Config.java b/src/main/java/config/Config.java
index abf5d73..3e520e3 100644
--- a/src/main/java/config/Config.java
+++ b/src/main/java/config/Config.java
@@ -1,18 +1,18 @@
 /*
- * Copyright 2018- Cisco
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Copyright 2018- Cisco
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
 package config;
 
 import java.util.Properties;
@@ -23,46 +23,46 @@
 
 public class Config {
 
-  private static Properties properties;
+    private static Properties properties;
 
-  private final static Logger logger = LoggerFactory.getLogger("VolthaKafkaConsumer");
+    private final static Logger logger = LoggerFactory.getLogger("VolthaKafkaConsumer");
 
-  public static void loadProperties(String file) {
-    // create application properties with default
-    try {
-      properties = new Properties();
+    public static void loadProperties(String file) {
+        // create application properties with default
+        try {
+            properties = new Properties();
 
-      // now load properties
-      // from last invocation
-      FileInputStream in = new FileInputStream(file);
-      properties.load(in);
-      in.close();
-    } catch (Exception e) {
-      logger.error(e.getMessage());
+            // now load properties
+            // from last invocation
+            FileInputStream in = new FileInputStream(file);
+            properties.load(in);
+            in.close();
+        } catch (Exception e) {
+            logger.error(e.getMessage());
+        }
     }
-  }
 
-  public static String get(String key) {
-    return (String)properties.get(key);
-  }
+    public static String get(String key) {
+        return (String)properties.get(key);
+    }
 
-  public static String getVesAddress() {
-    return get("onap_ves_address");
-  }
+    public static String getVesAddress() {
+        return get("onap_ves_address");
+    }
 
-  public static String getVesPort() {
-    return get("onap_ves_port");
-  }
+    public static String getVesPort() {
+        return get("onap_ves_port");
+    }
 
-  public static String getKafkaAddress() {
-    return get("kafka_address");
-  }
+    public static String getKafkaAddress() {
+        return get("kafka_address");
+    }
 
-  public static String getKafkaPort() {
-    return get("kafka_port");
-  }
+    public static String getKafkaPort() {
+        return get("kafka_port");
+    }
 
-  public static String getKafkaTopic() {
-    return get("kafka_topic");
-  }
+    public static String getKafkaTopic() {
+        return get("kafka_topic");
+    }
 }
diff --git a/src/main/java/controller/Application.java b/src/main/java/controller/Application.java
index 6ae05b8..d7a0d0e 100644
--- a/src/main/java/controller/Application.java
+++ b/src/main/java/controller/Application.java
@@ -1,18 +1,18 @@
 /*
- * Copyright 2018- Cisco
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Copyright 2018- Cisco
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
 package controller;
 
 import org.springframework.boot.SpringApplication;
@@ -40,7 +40,7 @@
 
     public static void main(String[] args) {
         Config.loadProperties("/opt/ves-agent/config.properties");
-	      KafkaThread kafka = new KafkaThread();
+        KafkaThread kafka = new KafkaThread();
         kafka.start();
         SpringApplication.run(Application.class, args);
     }
@@ -51,13 +51,13 @@
     private final static Logger logger = LoggerFactory.getLogger("KafkaThread");
 
     public void run() {
-      logger.debug("Start Kafka Consumer Thread");
-       try {
-         VolthaKafkaConsumer consumer = new VolthaKafkaConsumer();
-       	 consumer.runConsumer();
-       } catch (InterruptedException e) {
-          logger.error(e.getMessage());
-       }
+        logger.debug("Start Kafka Consumer Thread");
+        try {
+            VolthaKafkaConsumer consumer = new VolthaKafkaConsumer();
+            consumer.runConsumer();
+        } catch (InterruptedException e) {
+            logger.error(e.getMessage());
+        }
 
     }
- }
+}
diff --git a/src/main/java/filter/MarkerFilter.java b/src/main/java/filter/MarkerFilter.java
index 8c20563..9887409 100644
--- a/src/main/java/filter/MarkerFilter.java
+++ b/src/main/java/filter/MarkerFilter.java
@@ -1,18 +1,18 @@
 /*
- * Copyright 2018- Cisco
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Copyright 2018- Cisco
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
 package filter;
 
 import java.util.Arrays;
@@ -31,10 +31,10 @@
 	Marker markerToMatch;
 
 	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see ch.qos.logback.core.filter.Filter#decide(java.lang.Object)
-	 */
+	* (non-Javadoc)
+	*
+	* @see ch.qos.logback.core.filter.Filter#decide(java.lang.Object)
+	*/
 	@Override
 	public FilterReply decide(ILoggingEvent event) {
 		if (!isStarted()) {
@@ -53,10 +53,10 @@
 	}
 
 	/**
-	 * The marker to match in the event.
-	 * 
-	 * @param markerToMatch
-	 */
+	* The marker to match in the event.
+	*
+	* @param markerToMatch
+	*/
 	public void setMarker(String markerStr) {
 		if (markerStr != null) {
 			Marker marker = MarkerFactory.getMarker(markerStr);
@@ -65,10 +65,10 @@
 	}
 
 	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see ch.qos.logback.core.filter.Filter#start()
-	 */
+	* (non-Javadoc)
+	*
+	* @see ch.qos.logback.core.filter.Filter#start()
+	*/
 	@Override
 	public void start() {
 		if (this.markerToMatch != null) {
diff --git a/src/main/java/kafka/VolthaKafkaConsumer.java b/src/main/java/kafka/VolthaKafkaConsumer.java
index a9f0603..dc431bb 100644
--- a/src/main/java/kafka/VolthaKafkaConsumer.java
+++ b/src/main/java/kafka/VolthaKafkaConsumer.java
@@ -1,18 +1,18 @@
 /*
- * Copyright 2018- Cisco
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Copyright 2018- Cisco
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
 package kafka;
 import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -26,6 +26,11 @@
 import org.slf4j.Marker;
 import org.slf4j.MarkerFactory;
 
+import java.time.Instant;
+import java.time.Duration;
+
+import com.google.gson.JsonSyntaxException;
+
 import javax.xml.ws.http.HTTPException;
 
 import java.util.concurrent.TimeUnit;
@@ -33,6 +38,7 @@
 import ves.*;
 import config.Config;
 
+
 public class VolthaKafkaConsumer {
 
     private final Logger logger = LoggerFactory.getLogger("VolthaKafkaConsumer");
@@ -42,80 +48,92 @@
     private KafkaConsumer<Long, String> consumer;
 
     public VolthaKafkaConsumer() {
-      logger.debug("VolthaKafkaConsumer constructor called");
-      initVesAgent();
-      consumer = createConsumer();
+        logger.debug("VolthaKafkaConsumer constructor called");
+        initVesAgent();
+        try {
+            consumer = createConsumer();
+        } catch (KafkaException e) {
+            logger.error("Error with Kafka connection. Retrying in 15 seconds.");
+            //Don't try to resolve it here. Try again in the thread loo, in case this is a temporal issue
+        }
     }
 
     private void initVesAgent() {
-      VesAgent.initVes();
+        VesAgent.initVes();
     }
 
     private KafkaConsumer<Long, String> createConsumer() {
-      logger.debug("Creating Kafka Consumer");
+        logger.debug("Creating Kafka Consumer");
 
-      String kafkaAddress = Config.getKafkaAddress() + ":" + Config.getKafkaPort();
-      final Properties props = new Properties();
-      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
-                                  kafkaAddress);
-      props.put(ConsumerConfig.GROUP_ID_CONFIG,
-                                  "KafkaExampleConsumer");
-      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-              LongDeserializer.class.getName());
-      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-              StringDeserializer.class.getName());
-      props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
-              false);
+        String kafkaAddress = Config.getKafkaAddress() + ":" + Config.getKafkaPort();
+        final Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+        kafkaAddress);
+        props.put(ConsumerConfig.GROUP_ID_CONFIG,
+        "KafkaExampleConsumer");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+        LongDeserializer.class.getName());
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+        StringDeserializer.class.getName());
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+        false);
 
-      // Create the consumer using props.
-      final KafkaConsumer<Long, String> consumer =
-                                  new KafkaConsumer<>(props);
+        // Create the consumer using props.
+        final KafkaConsumer<Long, String> consumer =
+        new KafkaConsumer<>(props);
 
-      // Subscribe to the topic.
-      consumer.subscribe(Collections.singletonList(Config.getKafkaTopic()));
-      return consumer;
-  }
+        // Subscribe to the topic.
+        consumer.subscribe(Collections.singletonList(Config.getKafkaTopic()));
+        return consumer;
+    }
 
-  public void runConsumer() throws InterruptedException {
+    public void runConsumer() throws InterruptedException {
 
-      logger.debug("Starting Consumer");
+        logger.debug("Starting Consumer");
 
-      while (true) {
-	  ConsumerRecords<Long, String> consumerRecords;
- 	  try {
-	  	consumerRecords = consumer.poll(100000);
-	  } catch (KafkaException e) {
-		logger.debug("Error with Kafka connection. Retrying in 15 seconds.");
-		this.consumer = createConsumer();
-		TimeUnit.SECONDS.sleep(15);
-		continue;
-	  }
-          logger.info("{} Records retrieved from poll.", consumerRecords.count());
+        while (true) {
+            ConsumerRecords<Long, String> consumerRecords;
+            try {
+                if (consumer == null) {
+                    this.consumer = createConsumer();
+                }
+                consumerRecords = consumer.poll(20000);
+            } catch (KafkaException e) {
+                logger.error("Error with Kafka connection. Retrying in 15 seconds.");
+                consumer = null;
+                TimeUnit.SECONDS.sleep(15);
+                continue;
+            }
+            logger.info("{} Records retrieved from poll.", consumerRecords.count());
 
-          boolean commit = true;
-          try {
-            consumerRecords.forEach(record -> {
-              logger.info(dataMarker, "Consumer Record:({}, {}, {}, {})\n",
-                      record.key(), record.value(),
-                      record.partition(), record.offset());
-              logger.info("Attempting to send data to VES");
-              boolean success = VesAgent.sendToVES(record.value());
-              if (!success) {
-                throw new HTTPException(0);
-              } else {
-                logger.info("Sent Ves Message");
-              }
-            });
-          } catch (HTTPException e) {
-            logger.info("Ves message failed. Going back to polling.");
-            commit = false;
-          }
-          if (commit) {
-            consumer.commitAsync();
-          }
-      }
-      //consumer.close();
-      //logger.debug("DONE");
+            boolean commit = true;
+            try {
+                consumerRecords.forEach(record -> {
+                    Instant start = Instant.now();
+                    logger.info(dataMarker, "Consumer Record:({}, {}, {}, {})\n",
+                    record.key(), record.value(),
+                    record.partition(), record.offset());
+                    logger.info("Attempting to send data to VES");
+                    boolean success = VesAgent.sendToVES(record.value());
+                    if (!success) {
+                        throw new HTTPException(0);
+                    } else {
+                        Instant finish = Instant.now();
+                        logger.info("Sent Ves Message. Took " + Duration.between(start, finish).toMillis() + " Milliseconds.");
+                    }
+                });
+            } catch (HTTPException e) {
+                logger.info("Ves message failed. Going back to polling.");
+                commit = false;
+            } catch (JsonSyntaxException e) {
+                logger.error("Json Syntax Exception: ", e);
+            }
+            if (commit) {
+                consumer.commitAsync();
+            }
+        }
+        //consumer.close();
+        //logger.debug("DONE");
     }
 
 }
diff --git a/src/main/java/mapper/VesVolthaMapper.java b/src/main/java/mapper/VesVolthaMapper.java
new file mode 100644
index 0000000..a3ee1ab
--- /dev/null
+++ b/src/main/java/mapper/VesVolthaMapper.java
@@ -0,0 +1,36 @@
+/*
+* Copyright 2018- Cisco
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package mapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+public class VesVolthaMapper {
+
+    private final Logger logger = LoggerFactory.getLogger("VesVolthaMapper");
+
+    private Gson gson;
+
+    public VesVolthaMapper() {
+        gson = new GsonBuilder().create();
+    }
+
+    public VesVolthaMessage parseJson(String json) {
+        return gson.fromJson(json, VesVolthaMessage.class);
+    }
+}
diff --git a/src/main/java/mapper/VesVolthaMessage.java b/src/main/java/mapper/VesVolthaMessage.java
new file mode 100644
index 0000000..f0327df
--- /dev/null
+++ b/src/main/java/mapper/VesVolthaMessage.java
@@ -0,0 +1,76 @@
+/*
+* Copyright 2018- Cisco
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package mapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Map;
+
+public class VesVolthaMessage {
+    private String id = "";
+    private String logical_device_id = "";
+    private String raised_ts = "";
+    private String description = "";
+    private String type = "";
+    private String category = "";
+    private String severity = "";
+    private String state = "";
+    private String resource_id = "";
+    private Map<String,String> context;
+
+    public String getId() {
+        return id;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
+    public String getLogicalDeviceId() {
+        return logical_device_id;
+    }
+
+    public String getRaisedTS() {
+        return raised_ts;
+    }
+
+    public String getCategory() {
+        //Passing type instead of category to map bewteen VES and VOLTHA.
+        return category;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public String getSeverity() {
+        return severity;
+    }
+
+    public String getState() {
+        return state;
+    }
+
+    public Map<String,String> getContext() {
+        return context;
+    }
+
+    public String getResourceId() {
+        return resource_id;
+    }
+
+
+}
diff --git a/src/main/java/ves/VesAgent.java b/src/main/java/ves/VesAgent.java
index b0465f7..a9a723d 100644
--- a/src/main/java/ves/VesAgent.java
+++ b/src/main/java/ves/VesAgent.java
@@ -1,18 +1,18 @@
 /*
- * Copyright 2018- Cisco
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Copyright 2018- Cisco
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
 package ves;
 
 import evel_javalibrary.att.com.*;
@@ -32,17 +32,24 @@
 import org.apache.log4j.Level;
 import config.Config;
 
+import mapper.VesVolthaMapper;
+import mapper.VesVolthaMessage;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import com.google.gson.JsonSyntaxException;
 
 public class VesAgent {
 
-  private static final Logger logger = LoggerFactory.getLogger("VesAgent");
+    private static final Logger logger = LoggerFactory.getLogger("VesAgent");
 
-  public static void initVes() {
-    logger.info("Initializing VES Agent");
-    try {
-        AgentMain.evel_initialize("http://"+Config.getVesAddress(),
+    private static VesVolthaMapper mapper;
+
+    public static void initVes() {
+        logger.info("Initializing VES Agent");
+        try {
+            mapper = new VesVolthaMapper();
+            AgentMain.evel_initialize("http://"+Config.getVesAddress(),
                 Integer.parseInt(Config.getVesPort()),
                 //  "http://1.2.3.4", 8080,
                 //"/vendor_event_listener","/example_vnf",
@@ -52,31 +59,62 @@
                 null, null, null,
                 //"/home/gokul/newwk/demo/vnfs/VES5.0/evel/sslcerts2/my-keystore.jks", "changeit", "changeit",
                 Level.TRACE);
-    } catch( Exception e ) {
-        e.printStackTrace();
+        } catch( Exception e ) {
+            e.printStackTrace();
+        }
     }
-  }
 
-  public static boolean sendToVES(String json) {
-
-    EvelFault flt  = new EvelFault("Fault_VOLTHA_failed", "tbd_event_key_unique_to_source",
-            "NIC error", "Hardware failed",
+    public static boolean sendToVES(String json) throws JsonSyntaxException {
+        VesVolthaMessage message = mapper.parseJson(json);
+        String id = message.getId();
+        String ldeviceId = message.getLogicalDeviceId();
+        String ts = message.getRaisedTS();
+        String description = message.getDescription();
+        //Type in Voltha needs to be category in VES
+        String category = message.getType();
+        //Category in VOLTHA needs to be type in VES
+        String type = message.getCategory();
+        String severity = message.getSeverity();
+        String state = message.getState();
+        String resourceId = message.getResourceId();
+        
+        EVEL_SEVERITIES vesSeverity = mapSeverity(severity);
+        EVEL_SOURCE_TYPES vesType = mapType(type);
+        EvelFault flt  = new EvelFault(
+            "Fault_VOLTHA_" + id,
+            ldeviceId + ":" + ts,
+            id,
+            description,
             EvelHeader.PRIORITIES.EVEL_PRIORITY_HIGH,
-            EVEL_SEVERITIES.EVEL_SEVERITY_MAJOR,
-            EVEL_SOURCE_TYPES.EVEL_SOURCE_CARD,
+            vesSeverity,
+            vesType,
             EVEL_VF_STATUSES.EVEL_VF_STATUS_ACTIVE);
-    flt.evel_fault_addl_info_add("voltha", json);
-    //flt.evel_fault_addl_info_add("nicsw", "fail");
-    flt.evel_fault_category_set("Communication");
-    logger.info("Sending fault event");
-    int code = AgentMain.evel_post_event_immediate(flt);
-    logger.info("Fault event http code received: " + code);
-    if(code == 0 || code >= HttpURLConnection.HTTP_BAD_REQUEST )
-    {
-      return false;
-    } else {
-      return true;
-    }
-  }
+        flt.evel_fault_addl_info_add("voltha", json);
+        flt.evel_fault_addl_info_add("state", state);
+        flt.evel_fault_addl_info_add("resourceId", resourceId);
+        flt.evel_fault_category_set(category);
 
+        logger.info("Sending fault event");
+        int code = AgentMain.evel_post_event_immediate(flt);
+        logger.info("Fault event http code received: " + code);
+        if(code == 0 || code >= HttpURLConnection.HTTP_BAD_REQUEST ) {
+            return false;
+        } else {
+            return true;
+        }
+    }
+
+    private static EVEL_SEVERITIES mapSeverity(String severity) {
+        String severityUpper = severity.toUpperCase();
+        switch (severityUpper) {
+            case "INDETERMINATE":
+                return EVEL_SEVERITIES.EVEL_SEVERITY_NORMAL;
+            default:
+                return EVEL_SEVERITIES.valueOf("EVEL_SEVERITY_" + severityUpper);
+        }
+    }
+
+    private static EVEL_SOURCE_TYPES mapType(String type) {
+        return EVEL_SOURCE_TYPES.valueOf("EVEL_SOURCE_" + type.toUpperCase());
+    }
 }
diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml
index 89b8950..e4fcb12 100644
--- a/src/main/resources/logback.xml
+++ b/src/main/resources/logback.xml
@@ -1,17 +1,17 @@
 <!--
- Copyright 2018- Cisco
- 
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- 
-     http://www.apache.org/licenses/LICENSE-2.0
- 
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
+Copyright 2018- Cisco
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
 -->
 <configuration>
 
@@ -24,45 +24,45 @@
 
     <logger name="org.apache.kafka" level="INFO"/>
     <logger name="org.apache.kafka.common.metrics" level="INFO"/>
-    
+
     <appender name="FILE_MAIN" class="ch.qos.logback.core.rolling.RollingFileAppender">
-      <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-        <fileNamePattern>/opt/ves-agent/log-%d{yyyy-MM-dd}.txt</fileNamePattern>
-        <maxHistory>30</maxHistory>
-        <totalSizeCap>5GB</totalSizeCap>
-      </rollingPolicy>
-      <append>true</append>
-      <encoder>
-         <pattern>%-4relative [%thread] %-5level %logger{35} - %msg%n</pattern>
-      </encoder>
-   </appender>
+        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <fileNamePattern>/opt/ves-agent/log-%d{yyyy-MM-dd}.txt</fileNamePattern>
+            <maxHistory>30</maxHistory>
+            <totalSizeCap>5GB</totalSizeCap>
+        </rollingPolicy>
+        <append>true</append>
+        <encoder>
+            <pattern>%-4relative [%thread] %-5level %logger{35} - %msg%n</pattern>
+        </encoder>
+    </appender>
 
-   <appender name="FILE_DATA" class="ch.qos.logback.core.rolling.RollingFileAppender">
-      <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-        <fileNamePattern>/opt/ves-agent/data-%d{yyyy-MM-dd}.txt</fileNamePattern>
-        <maxHistory>30</maxHistory> 
-        <totalSizeCap>1GB</totalSizeCap>
-      </rollingPolicy>
-      <append>true</append>
-      <filter class="filter.MarkerFilter">
-        <marker>DATA</marker>
-        <onMismatch>DENY</onMismatch>
-      </filter>
-      <filter class="filter.MarkerFilter">
-	<marker>DATA</marker>
-	<onMatch>ACCEPT</onMatch>
-      </filter>
-      <encoder>
-         <pattern>%-4relative [%thread] %-5level %logger{35} - %msg%n</pattern>
-      </encoder>
-   </appender>
+    <appender name="FILE_DATA" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <fileNamePattern>/opt/ves-agent/data-%d{yyyy-MM-dd}.txt</fileNamePattern>
+            <maxHistory>30</maxHistory>
+            <totalSizeCap>1GB</totalSizeCap>
+        </rollingPolicy>
+        <append>true</append>
+        <filter class="filter.MarkerFilter">
+            <marker>DATA</marker>
+            <onMismatch>DENY</onMismatch>
+        </filter>
+        <filter class="filter.MarkerFilter">
+            <marker>DATA</marker>
+            <onMatch>ACCEPT</onMatch>
+        </filter>
+        <encoder>
+            <pattern>%-4relative [%thread] %-5level %logger{35} - %msg%n</pattern>
+        </encoder>
+    </appender>
 
-   <logger name="org.springframework" level="WARNING"/>
+    <logger name="org.springframework" level="WARNING"/>
 
-   <root level="debug">
-      <appender-ref ref="FILE_MAIN" />
-      <appender-ref ref="FILE_DATA" />
-      <appender-ref ref="STDOUT" />
-   </root>
+    <root level="debug">
+        <appender-ref ref="FILE_MAIN" />
+        <appender-ref ref="FILE_DATA" />
+        <appender-ref ref="STDOUT" />
+    </root>
 
 </configuration>
diff --git a/ves-agent.yaml b/ves-agent.yaml
index 756f73e..29534de 100644
--- a/ves-agent.yaml
+++ b/ves-agent.yaml
@@ -11,7 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
- */
+
 apiVersion: v1
 kind: Service
 metadata:
@@ -52,8 +52,8 @@
               valueFrom:
                 fieldRef:
                   fieldPath: metadata.namespace
-          
+
           ports:
             - containerPort: 8080
               name: rest-port
-          imagePullPolicy: IfNotPresent
+          imagePullPolicy: Always
