A set of fixes and features
Some updates to handle errors in certain scenarios.
Formatted code
Initial work to parse json.
Added code to map certain json values from incoming data into parts of the VES format.
Added lines to map type and severity to VES
Completed support for mapping alarm messages. Also added timing functionality for tracking message send times

Change-Id: Iaf9ce372552fce3f744476719f5fd79548d22ef4
Signed-off-by: William Kurkian <wkurkian@cisco.com>
diff --git a/src/main/java/kafka/VolthaKafkaConsumer.java b/src/main/java/kafka/VolthaKafkaConsumer.java
index a9f0603..dc431bb 100644
--- a/src/main/java/kafka/VolthaKafkaConsumer.java
+++ b/src/main/java/kafka/VolthaKafkaConsumer.java
@@ -1,18 +1,18 @@
 /*
- * Copyright 2018- Cisco
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Copyright 2018- Cisco
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
 package kafka;
 import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -26,6 +26,11 @@
 import org.slf4j.Marker;
 import org.slf4j.MarkerFactory;
 
+import java.time.Instant;
+import java.time.Duration;
+
+import com.google.gson.JsonSyntaxException;
+
 import javax.xml.ws.http.HTTPException;
 
 import java.util.concurrent.TimeUnit;
@@ -33,6 +38,7 @@
 import ves.*;
 import config.Config;
 
+
 public class VolthaKafkaConsumer {
 
     private final Logger logger = LoggerFactory.getLogger("VolthaKafkaConsumer");
@@ -42,80 +48,92 @@
     private KafkaConsumer<Long, String> consumer;
 
     public VolthaKafkaConsumer() {
-      logger.debug("VolthaKafkaConsumer constructor called");
-      initVesAgent();
-      consumer = createConsumer();
+        logger.debug("VolthaKafkaConsumer constructor called");
+        initVesAgent();
+        try {
+            consumer = createConsumer();
+        } catch (KafkaException e) {
+            logger.error("Error with Kafka connection. Retrying in 15 seconds.");
+            //Don't try to resolve it here. Try again in the thread loo, in case this is a temporal issue
+        }
     }
 
     private void initVesAgent() {
-      VesAgent.initVes();
+        VesAgent.initVes();
     }
 
     private KafkaConsumer<Long, String> createConsumer() {
-      logger.debug("Creating Kafka Consumer");
+        logger.debug("Creating Kafka Consumer");
 
-      String kafkaAddress = Config.getKafkaAddress() + ":" + Config.getKafkaPort();
-      final Properties props = new Properties();
-      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
-                                  kafkaAddress);
-      props.put(ConsumerConfig.GROUP_ID_CONFIG,
-                                  "KafkaExampleConsumer");
-      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-              LongDeserializer.class.getName());
-      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-              StringDeserializer.class.getName());
-      props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
-              false);
+        String kafkaAddress = Config.getKafkaAddress() + ":" + Config.getKafkaPort();
+        final Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+        kafkaAddress);
+        props.put(ConsumerConfig.GROUP_ID_CONFIG,
+        "KafkaExampleConsumer");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+        LongDeserializer.class.getName());
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+        StringDeserializer.class.getName());
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+        false);
 
-      // Create the consumer using props.
-      final KafkaConsumer<Long, String> consumer =
-                                  new KafkaConsumer<>(props);
+        // Create the consumer using props.
+        final KafkaConsumer<Long, String> consumer =
+        new KafkaConsumer<>(props);
 
-      // Subscribe to the topic.
-      consumer.subscribe(Collections.singletonList(Config.getKafkaTopic()));
-      return consumer;
-  }
+        // Subscribe to the topic.
+        consumer.subscribe(Collections.singletonList(Config.getKafkaTopic()));
+        return consumer;
+    }
 
-  public void runConsumer() throws InterruptedException {
+    public void runConsumer() throws InterruptedException {
 
-      logger.debug("Starting Consumer");
+        logger.debug("Starting Consumer");
 
-      while (true) {
-	  ConsumerRecords<Long, String> consumerRecords;
- 	  try {
-	  	consumerRecords = consumer.poll(100000);
-	  } catch (KafkaException e) {
-		logger.debug("Error with Kafka connection. Retrying in 15 seconds.");
-		this.consumer = createConsumer();
-		TimeUnit.SECONDS.sleep(15);
-		continue;
-	  }
-          logger.info("{} Records retrieved from poll.", consumerRecords.count());
+        while (true) {
+            ConsumerRecords<Long, String> consumerRecords;
+            try {
+                if (consumer == null) {
+                    this.consumer = createConsumer();
+                }
+                consumerRecords = consumer.poll(20000);
+            } catch (KafkaException e) {
+                logger.error("Error with Kafka connection. Retrying in 15 seconds.");
+                consumer = null;
+                TimeUnit.SECONDS.sleep(15);
+                continue;
+            }
+            logger.info("{} Records retrieved from poll.", consumerRecords.count());
 
-          boolean commit = true;
-          try {
-            consumerRecords.forEach(record -> {
-              logger.info(dataMarker, "Consumer Record:({}, {}, {}, {})\n",
-                      record.key(), record.value(),
-                      record.partition(), record.offset());
-              logger.info("Attempting to send data to VES");
-              boolean success = VesAgent.sendToVES(record.value());
-              if (!success) {
-                throw new HTTPException(0);
-              } else {
-                logger.info("Sent Ves Message");
-              }
-            });
-          } catch (HTTPException e) {
-            logger.info("Ves message failed. Going back to polling.");
-            commit = false;
-          }
-          if (commit) {
-            consumer.commitAsync();
-          }
-      }
-      //consumer.close();
-      //logger.debug("DONE");
+            boolean commit = true;
+            try {
+                consumerRecords.forEach(record -> {
+                    Instant start = Instant.now();
+                    logger.info(dataMarker, "Consumer Record:({}, {}, {}, {})\n",
+                    record.key(), record.value(),
+                    record.partition(), record.offset());
+                    logger.info("Attempting to send data to VES");
+                    boolean success = VesAgent.sendToVES(record.value());
+                    if (!success) {
+                        throw new HTTPException(0);
+                    } else {
+                        Instant finish = Instant.now();
+                        logger.info("Sent Ves Message. Took " + Duration.between(start, finish).toMillis() + " Milliseconds.");
+                    }
+                });
+            } catch (HTTPException e) {
+                logger.info("Ves message failed. Going back to polling.");
+                commit = false;
+            } catch (JsonSyntaxException e) {
+                logger.error("Json Syntax Exception: ", e);
+            }
+            if (commit) {
+                consumer.commitAsync();
+            }
+        }
+        //consumer.close();
+        //logger.debug("DONE");
     }
 
 }