blob: ba121e69798b56af1fa86e9367fbe6b1cbe4141d [file] [log] [blame]
/*
* Copyright 2018- Cisco
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package kafka;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.KafkaException;
import java.util.Collections;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
import java.time.Instant;
import java.time.Duration;
import com.google.gson.JsonSyntaxException;
import javax.xml.ws.http.HTTPException;
import java.util.concurrent.TimeUnit;
import ves.*;
import config.Config;
public class VolthaKafkaConsumer {
private final Logger logger = LoggerFactory.getLogger("VolthaKafkaConsumer");
private final String dataMarkerText = "DATA";
private final Marker dataMarker = MarkerFactory.getMarker(dataMarkerText);
private KafkaConsumer<Long, String> consumer;
private KafkaConsumerType type;
private VesAgent vesAgent;
public VolthaKafkaConsumer(KafkaConsumerType type) {
logger.debug("VolthaKafkaConsumer constructor called");
this.type = type;
vesAgent = new VesAgent();
try {
consumer = createConsumer();
} catch (Exception e) {
logger.error("Error with Kafka: ", e);
logger.error("Retrying in 15 seconds.");
//Don't try to resolve it here. Try again in the thread loo, in case this is a temporal issue
}
}
private KafkaConsumer<Long, String> createConsumer() {
logger.debug("Creating Kafka Consumer");
String kafkaAddress = Config.getKafkaAddress() + ":" + Config.getKafkaPort();
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG,
"VesAgent");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
false);
// Create the consumer using props.
final KafkaConsumer<Long, String> consumer =
new KafkaConsumer<>(props);
// Subscribe to the topic.
switch (type) {
case ALARMS:
consumer.subscribe(Collections.singletonList(Config.getKafkaAlarmsTopic()));
break;
case KPIS:
consumer.subscribe(Collections.singletonList(Config.getKafkaKpisTopic()));
break;
}
return consumer;
}
public void runConsumer() throws InterruptedException {
logger.debug("Starting Kafka Consumer");
while (true) {
ConsumerRecords<Long, String> consumerRecords;
try {
if (consumer == null) {
this.consumer = createConsumer();
}
consumerRecords = consumer.poll(20000);
} catch (Exception e) {
logger.error("Error with kafka: ", e);
logger.error("Retrying in 15 seconds.");
consumer = null;
TimeUnit.SECONDS.sleep(15);
continue;
}
logger.info("{} Records retrieved from poll.", consumerRecords.count());
boolean commit = true;
try {
consumerRecords.forEach(record -> {
Instant start = Instant.now();
logger.info(dataMarker, "Consumer Record:({}, {}, {}, {})\n",
record.key(), record.value(),
record.partition(), record.offset());
logger.info("Attempting to send data to VES");
boolean success = vesAgent.sendToVES(type, record.value());
if (!success) {
throw new HTTPException(0);
} else {
Instant finish = Instant.now();
logger.info("Sent Ves Message. Took " + Duration.between(start, finish).toMillis() + " Milliseconds.");
}
});
} catch (HTTPException e) {
logger.info("Ves message failed. Going back to polling.");
commit = false;
} catch (JsonSyntaxException e) {
logger.error("Json Syntax Exception: ", e);
}
if (commit) {
consumer.commitAsync();
}
}
//consumer.close();
//logger.debug("DONE");
}
}