Added id code and removed from name

Added functionality to consumer and send KPI events to ONAP

Change-Id: I0849e3b9188b1fa53210c341f1fc6e6228c5de9b
Signed-off-by: William Kurkian <wkurkian@cisco.com>
diff --git a/src/main/java/kafka/VolthaKafkaConsumer.java b/src/main/java/kafka/VolthaKafkaConsumer.java
index dc431bb..11deb81 100644
--- a/src/main/java/kafka/VolthaKafkaConsumer.java
+++ b/src/main/java/kafka/VolthaKafkaConsumer.java
@@ -12,6 +12,7 @@
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
+*
 */
 package kafka;
 import org.apache.kafka.clients.consumer.*;
@@ -38,7 +39,6 @@
 import ves.*;
 import config.Config;
 
-
 public class VolthaKafkaConsumer {
 
     private final Logger logger = LoggerFactory.getLogger("VolthaKafkaConsumer");
@@ -47,13 +47,17 @@
 
     private KafkaConsumer<Long, String> consumer;
 
-    public VolthaKafkaConsumer() {
+    private KafkaConsumerType type;
+
+    public VolthaKafkaConsumer(KafkaConsumerType type) {
         logger.debug("VolthaKafkaConsumer constructor called");
         initVesAgent();
+        this.type = type;
         try {
             consumer = createConsumer();
-        } catch (KafkaException e) {
-            logger.error("Error with Kafka connection. Retrying in 15 seconds.");
+        } catch (Exception e) {
+            logger.error("Error with Kafka: ", e);
+            logger.error("Retrying in 15 seconds.");
             //Don't try to resolve it here. Try again in the thread loo, in case this is a temporal issue
         }
     }
@@ -70,26 +74,31 @@
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
         kafkaAddress);
         props.put(ConsumerConfig.GROUP_ID_CONFIG,
-        "KafkaExampleConsumer");
+        "VesAgent");
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
         LongDeserializer.class.getName());
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
         StringDeserializer.class.getName());
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
         false);
-
         // Create the consumer using props.
         final KafkaConsumer<Long, String> consumer =
         new KafkaConsumer<>(props);
-
         // Subscribe to the topic.
-        consumer.subscribe(Collections.singletonList(Config.getKafkaTopic()));
+        switch (type) {
+            case ALARMS:
+                consumer.subscribe(Collections.singletonList(Config.getKafkaAlarmsTopic()));
+                break;
+            case KPIS:
+                consumer.subscribe(Collections.singletonList(Config.getKafkaKpisTopic()));
+                break;
+        }
         return consumer;
     }
 
     public void runConsumer() throws InterruptedException {
 
-        logger.debug("Starting Consumer");
+        logger.debug("Starting Kafka Consumer");
 
         while (true) {
             ConsumerRecords<Long, String> consumerRecords;
@@ -98,8 +107,9 @@
                     this.consumer = createConsumer();
                 }
                 consumerRecords = consumer.poll(20000);
-            } catch (KafkaException e) {
-                logger.error("Error with Kafka connection. Retrying in 15 seconds.");
+            } catch (Exception e) {
+                logger.error("Error with kafka: ", e);
+                logger.error("Retrying in 15 seconds.");
                 consumer = null;
                 TimeUnit.SECONDS.sleep(15);
                 continue;
@@ -114,7 +124,7 @@
                     record.key(), record.value(),
                     record.partition(), record.offset());
                     logger.info("Attempting to send data to VES");
-                    boolean success = VesAgent.sendToVES(record.value());
+                    boolean success = VesAgent.sendToVES(type, record.value());
                     if (!success) {
                         throw new HTTPException(0);
                     } else {