Added id code and removed from name
Added funtionality to consumer and send kpi events to onap
Change-Id: I0849e3b9188b1fa53210c341f1fc6e6228c5de9b
Signed-off-by: William Kurkian <wkurkian@cisco.com>
diff --git a/src/main/java/kafka/VolthaKafkaConsumer.java b/src/main/java/kafka/VolthaKafkaConsumer.java
index dc431bb..11deb81 100644
--- a/src/main/java/kafka/VolthaKafkaConsumer.java
+++ b/src/main/java/kafka/VolthaKafkaConsumer.java
@@ -12,6 +12,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
+*
*/
package kafka;
import org.apache.kafka.clients.consumer.*;
@@ -38,7 +39,6 @@
import ves.*;
import config.Config;
-
public class VolthaKafkaConsumer {
private final Logger logger = LoggerFactory.getLogger("VolthaKafkaConsumer");
@@ -47,13 +47,17 @@
private KafkaConsumer<Long, String> consumer;
- public VolthaKafkaConsumer() {
+ private KafkaConsumerType type;
+
+ public VolthaKafkaConsumer(KafkaConsumerType type) {
logger.debug("VolthaKafkaConsumer constructor called");
initVesAgent();
+ this.type = type;
try {
consumer = createConsumer();
- } catch (KafkaException e) {
- logger.error("Error with Kafka connection. Retrying in 15 seconds.");
+ } catch (Exception e) {
+ logger.error("Error with Kafka: ", e);
+ logger.error("Retrying in 15 seconds.");
//Don't try to resolve it here. Try again in the thread loo, in case this is a temporal issue
}
}
@@ -70,26 +74,31 @@
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG,
- "KafkaExampleConsumer");
+ "VesAgent");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
false);
-
// Create the consumer using props.
final KafkaConsumer<Long, String> consumer =
new KafkaConsumer<>(props);
-
// Subscribe to the topic.
- consumer.subscribe(Collections.singletonList(Config.getKafkaTopic()));
+ switch (type) {
+ case ALARMS:
+ consumer.subscribe(Collections.singletonList(Config.getKafkaAlarmsTopic()));
+ break;
+ case KPIS:
+ consumer.subscribe(Collections.singletonList(Config.getKafkaKpisTopic()));
+ break;
+ }
return consumer;
}
public void runConsumer() throws InterruptedException {
- logger.debug("Starting Consumer");
+ logger.debug("Starting Kafka Consumer");
while (true) {
ConsumerRecords<Long, String> consumerRecords;
@@ -98,8 +107,9 @@
this.consumer = createConsumer();
}
consumerRecords = consumer.poll(20000);
- } catch (KafkaException e) {
- logger.error("Error with Kafka connection. Retrying in 15 seconds.");
+ } catch (Exception e) {
+ logger.error("Error with kafka: ", e);
+ logger.error("Retrying in 15 seconds.");
consumer = null;
TimeUnit.SECONDS.sleep(15);
continue;
@@ -114,7 +124,7 @@
record.key(), record.value(),
record.partition(), record.offset());
logger.info("Attempting to send data to VES");
- boolean success = VesAgent.sendToVES(record.value());
+ boolean success = VesAgent.sendToVES(type, record.value());
if (!success) {
throw new HTTPException(0);
} else {