Added id code and removed from name
Added funtionality to consumer and send kpi events to onap
Change-Id: I0849e3b9188b1fa53210c341f1fc6e6228c5de9b
Signed-off-by: William Kurkian <wkurkian@cisco.com>
diff --git a/src/main/java/kafka/KafkaConsumerType.java b/src/main/java/kafka/KafkaConsumerType.java
new file mode 100644
index 0000000..13cd249
--- /dev/null
+++ b/src/main/java/kafka/KafkaConsumerType.java
@@ -0,0 +1,22 @@
+/*
+* Copyright 2018- Cisco
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package kafka;
+
+public enum KafkaConsumerType {
+ ALARMS,
+ KPIS
+}
diff --git a/src/main/java/kafka/VolthaKafkaConsumer.java b/src/main/java/kafka/VolthaKafkaConsumer.java
index dc431bb..11deb81 100644
--- a/src/main/java/kafka/VolthaKafkaConsumer.java
+++ b/src/main/java/kafka/VolthaKafkaConsumer.java
@@ -12,6 +12,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
+*
*/
package kafka;
import org.apache.kafka.clients.consumer.*;
@@ -38,7 +39,6 @@
import ves.*;
import config.Config;
-
public class VolthaKafkaConsumer {
private final Logger logger = LoggerFactory.getLogger("VolthaKafkaConsumer");
@@ -47,13 +47,17 @@
private KafkaConsumer<Long, String> consumer;
- public VolthaKafkaConsumer() {
+ private KafkaConsumerType type;
+
+ public VolthaKafkaConsumer(KafkaConsumerType type) {
logger.debug("VolthaKafkaConsumer constructor called");
initVesAgent();
+ this.type = type;
try {
consumer = createConsumer();
- } catch (KafkaException e) {
- logger.error("Error with Kafka connection. Retrying in 15 seconds.");
+ } catch (Exception e) {
+ logger.error("Error with Kafka: ", e);
+ logger.error("Retrying in 15 seconds.");
//Don't try to resolve it here. Try again in the thread loo, in case this is a temporal issue
}
}
@@ -70,26 +74,31 @@
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG,
- "KafkaExampleConsumer");
+ "VesAgent");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
false);
-
// Create the consumer using props.
final KafkaConsumer<Long, String> consumer =
new KafkaConsumer<>(props);
-
// Subscribe to the topic.
- consumer.subscribe(Collections.singletonList(Config.getKafkaTopic()));
+ switch (type) {
+ case ALARMS:
+ consumer.subscribe(Collections.singletonList(Config.getKafkaAlarmsTopic()));
+ break;
+ case KPIS:
+ consumer.subscribe(Collections.singletonList(Config.getKafkaKpisTopic()));
+ break;
+ }
return consumer;
}
public void runConsumer() throws InterruptedException {
- logger.debug("Starting Consumer");
+ logger.debug("Starting Kafka Consumer");
while (true) {
ConsumerRecords<Long, String> consumerRecords;
@@ -98,8 +107,9 @@
this.consumer = createConsumer();
}
consumerRecords = consumer.poll(20000);
- } catch (KafkaException e) {
- logger.error("Error with Kafka connection. Retrying in 15 seconds.");
+ } catch (Exception e) {
+ logger.error("Error with kafka: ", e);
+ logger.error("Retrying in 15 seconds.");
consumer = null;
TimeUnit.SECONDS.sleep(15);
continue;
@@ -114,7 +124,7 @@
record.key(), record.value(),
record.partition(), record.offset());
logger.info("Attempting to send data to VES");
- boolean success = VesAgent.sendToVES(record.value());
+ boolean success = VesAgent.sendToVES(type, record.value());
if (!success) {
throw new HTTPException(0);
} else {