diff --git a/src/main/java/kafka/VolthaKafkaConsumer.java b/src/main/java/kafka/VolthaKafkaConsumer.java
index a9f0603..dc431bb 100644
--- a/src/main/java/kafka/VolthaKafkaConsumer.java
+++ b/src/main/java/kafka/VolthaKafkaConsumer.java
@@ -1,18 +1,18 @@
 /*
- * Copyright 2018- Cisco
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Copyright 2018- Cisco
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
 package kafka;
 import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -26,6 +26,11 @@
 import org.slf4j.Marker;
 import org.slf4j.MarkerFactory;
 
+import java.time.Instant;
+import java.time.Duration;
+
+import com.google.gson.JsonSyntaxException;
+
 import javax.xml.ws.http.HTTPException;
 
 import java.util.concurrent.TimeUnit;
@@ -33,6 +38,7 @@
 import ves.*;
 import config.Config;
 
+
 public class VolthaKafkaConsumer {
 
     private final Logger logger = LoggerFactory.getLogger("VolthaKafkaConsumer");
@@ -42,80 +48,92 @@
     private KafkaConsumer<Long, String> consumer;
 
     public VolthaKafkaConsumer() {
-      logger.debug("VolthaKafkaConsumer constructor called");
-      initVesAgent();
-      consumer = createConsumer();
+        logger.debug("VolthaKafkaConsumer constructor called");
+        initVesAgent();
+        try {
+            consumer = createConsumer();
+        } catch (KafkaException e) {
+            logger.error("Error with Kafka connection. Retrying in 15 seconds.");
+            // Don't try to resolve it here. Try again in the thread loop, in case this is a transient issue.
+        }
     }
 
     private void initVesAgent() {
-      VesAgent.initVes();
+        VesAgent.initVes();
     }
 
     private KafkaConsumer<Long, String> createConsumer() {
-      logger.debug("Creating Kafka Consumer");
+        logger.debug("Creating Kafka Consumer");
 
-      String kafkaAddress = Config.getKafkaAddress() + ":" + Config.getKafkaPort();
-      final Properties props = new Properties();
-      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
-                                  kafkaAddress);
-      props.put(ConsumerConfig.GROUP_ID_CONFIG,
-                                  "KafkaExampleConsumer");
-      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-              LongDeserializer.class.getName());
-      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-              StringDeserializer.class.getName());
-      props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
-              false);
+        String kafkaAddress = Config.getKafkaAddress() + ":" + Config.getKafkaPort();
+        final Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+        kafkaAddress);
+        props.put(ConsumerConfig.GROUP_ID_CONFIG,
+        "KafkaExampleConsumer");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+        LongDeserializer.class.getName());
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+        StringDeserializer.class.getName());
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+        false);
 
-      // Create the consumer using props.
-      final KafkaConsumer<Long, String> consumer =
-                                  new KafkaConsumer<>(props);
+        // Create the consumer using props.
+        final KafkaConsumer<Long, String> consumer =
+        new KafkaConsumer<>(props);
 
-      // Subscribe to the topic.
-      consumer.subscribe(Collections.singletonList(Config.getKafkaTopic()));
-      return consumer;
-  }
+        // Subscribe to the topic.
+        consumer.subscribe(Collections.singletonList(Config.getKafkaTopic()));
+        return consumer;
+    }
 
-  public void runConsumer() throws InterruptedException {
+    public void runConsumer() throws InterruptedException {
 
-      logger.debug("Starting Consumer");
+        logger.debug("Starting Consumer");
 
-      while (true) {
-	  ConsumerRecords<Long, String> consumerRecords;
- 	  try {
-	  	consumerRecords = consumer.poll(100000);
-	  } catch (KafkaException e) {
-		logger.debug("Error with Kafka connection. Retrying in 15 seconds.");
-		this.consumer = createConsumer();
-		TimeUnit.SECONDS.sleep(15);
-		continue;
-	  }
-          logger.info("{} Records retrieved from poll.", consumerRecords.count());
+        while (true) {
+            ConsumerRecords<Long, String> consumerRecords;
+            try {
+                if (consumer == null) {
+                    this.consumer = createConsumer();
+                }
+                consumerRecords = consumer.poll(20000);
+            } catch (KafkaException e) {
+                logger.error("Error with Kafka connection. Retrying in 15 seconds.");
+                consumer = null;
+                TimeUnit.SECONDS.sleep(15);
+                continue;
+            }
+            logger.info("{} Records retrieved from poll.", consumerRecords.count());
 
-          boolean commit = true;
-          try {
-            consumerRecords.forEach(record -> {
-              logger.info(dataMarker, "Consumer Record:({}, {}, {}, {})\n",
-                      record.key(), record.value(),
-                      record.partition(), record.offset());
-              logger.info("Attempting to send data to VES");
-              boolean success = VesAgent.sendToVES(record.value());
-              if (!success) {
-                throw new HTTPException(0);
-              } else {
-                logger.info("Sent Ves Message");
-              }
-            });
-          } catch (HTTPException e) {
-            logger.info("Ves message failed. Going back to polling.");
-            commit = false;
-          }
-          if (commit) {
-            consumer.commitAsync();
-          }
-      }
-      //consumer.close();
-      //logger.debug("DONE");
+            boolean commit = true;
+            try {
+                consumerRecords.forEach(record -> {
+                    Instant start = Instant.now();
+                    logger.info(dataMarker, "Consumer Record:({}, {}, {}, {})\n",
+                    record.key(), record.value(),
+                    record.partition(), record.offset());
+                    logger.info("Attempting to send data to VES");
+                    boolean success = VesAgent.sendToVES(record.value());
+                    if (!success) {
+                        throw new HTTPException(0);
+                    } else {
+                        Instant finish = Instant.now();
+                        logger.info("Sent Ves Message. Took " + Duration.between(start, finish).toMillis() + " Milliseconds.");
+                    }
+                });
+            } catch (HTTPException e) {
+                logger.info("Ves message failed. Going back to polling.");
+                commit = false;
+            } catch (JsonSyntaxException e) {
+                logger.error("Json Syntax Exception: ", e);
+            }
+            if (commit) {
+                consumer.commitAsync();
+            }
+        }
+        //consumer.close();
+        //logger.debug("DONE");
     }
 
 }
