blob: dc431bb8297347f60181834d171ca7edbd706a4c [file] [log] [blame]
William Kurkianbde6fc92018-07-13 17:19:58 -04001/*
William Kurkian1bedb412018-07-19 12:55:41 -04002* Copyright 2018- Cisco
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8* http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15*/
William Kurkianbde6fc92018-07-13 17:19:58 -040016package kafka;
17import org.apache.kafka.clients.consumer.*;
18import org.apache.kafka.clients.consumer.Consumer;
19import org.apache.kafka.common.serialization.LongDeserializer;
20import org.apache.kafka.common.serialization.StringDeserializer;
21import org.apache.kafka.common.KafkaException;
22import java.util.Collections;
23import java.util.Properties;
24import org.slf4j.Logger;
25import org.slf4j.LoggerFactory;
26import org.slf4j.Marker;
27import org.slf4j.MarkerFactory;
28
William Kurkian1bedb412018-07-19 12:55:41 -040029import java.time.Instant;
30import java.time.Duration;
31
32import com.google.gson.JsonSyntaxException;
33
William Kurkianbde6fc92018-07-13 17:19:58 -040034import javax.xml.ws.http.HTTPException;
35
36import java.util.concurrent.TimeUnit;
37
38import ves.*;
39import config.Config;
40
William Kurkian1bedb412018-07-19 12:55:41 -040041
William Kurkianbde6fc92018-07-13 17:19:58 -040042public class VolthaKafkaConsumer {
43
44 private final Logger logger = LoggerFactory.getLogger("VolthaKafkaConsumer");
45 private final String dataMarkerText = "DATA";
46 private final Marker dataMarker = MarkerFactory.getMarker(dataMarkerText);
47
48 private KafkaConsumer<Long, String> consumer;
49
50 public VolthaKafkaConsumer() {
William Kurkian1bedb412018-07-19 12:55:41 -040051 logger.debug("VolthaKafkaConsumer constructor called");
52 initVesAgent();
53 try {
54 consumer = createConsumer();
55 } catch (KafkaException e) {
56 logger.error("Error with Kafka connection. Retrying in 15 seconds.");
57 //Don't try to resolve it here. Try again in the thread loo, in case this is a temporal issue
58 }
William Kurkianbde6fc92018-07-13 17:19:58 -040059 }
60
61 private void initVesAgent() {
William Kurkian1bedb412018-07-19 12:55:41 -040062 VesAgent.initVes();
William Kurkianbde6fc92018-07-13 17:19:58 -040063 }
64
65 private KafkaConsumer<Long, String> createConsumer() {
William Kurkian1bedb412018-07-19 12:55:41 -040066 logger.debug("Creating Kafka Consumer");
William Kurkianbde6fc92018-07-13 17:19:58 -040067
William Kurkian1bedb412018-07-19 12:55:41 -040068 String kafkaAddress = Config.getKafkaAddress() + ":" + Config.getKafkaPort();
69 final Properties props = new Properties();
70 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
71 kafkaAddress);
72 props.put(ConsumerConfig.GROUP_ID_CONFIG,
73 "KafkaExampleConsumer");
74 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
75 LongDeserializer.class.getName());
76 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
77 StringDeserializer.class.getName());
78 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
79 false);
William Kurkianbde6fc92018-07-13 17:19:58 -040080
William Kurkian1bedb412018-07-19 12:55:41 -040081 // Create the consumer using props.
82 final KafkaConsumer<Long, String> consumer =
83 new KafkaConsumer<>(props);
William Kurkianbde6fc92018-07-13 17:19:58 -040084
William Kurkian1bedb412018-07-19 12:55:41 -040085 // Subscribe to the topic.
86 consumer.subscribe(Collections.singletonList(Config.getKafkaTopic()));
87 return consumer;
88 }
William Kurkianbde6fc92018-07-13 17:19:58 -040089
William Kurkian1bedb412018-07-19 12:55:41 -040090 public void runConsumer() throws InterruptedException {
William Kurkianbde6fc92018-07-13 17:19:58 -040091
William Kurkian1bedb412018-07-19 12:55:41 -040092 logger.debug("Starting Consumer");
William Kurkianbde6fc92018-07-13 17:19:58 -040093
William Kurkian1bedb412018-07-19 12:55:41 -040094 while (true) {
95 ConsumerRecords<Long, String> consumerRecords;
96 try {
97 if (consumer == null) {
98 this.consumer = createConsumer();
99 }
100 consumerRecords = consumer.poll(20000);
101 } catch (KafkaException e) {
102 logger.error("Error with Kafka connection. Retrying in 15 seconds.");
103 consumer = null;
104 TimeUnit.SECONDS.sleep(15);
105 continue;
106 }
107 logger.info("{} Records retrieved from poll.", consumerRecords.count());
William Kurkianbde6fc92018-07-13 17:19:58 -0400108
William Kurkian1bedb412018-07-19 12:55:41 -0400109 boolean commit = true;
110 try {
111 consumerRecords.forEach(record -> {
112 Instant start = Instant.now();
113 logger.info(dataMarker, "Consumer Record:({}, {}, {}, {})\n",
114 record.key(), record.value(),
115 record.partition(), record.offset());
116 logger.info("Attempting to send data to VES");
117 boolean success = VesAgent.sendToVES(record.value());
118 if (!success) {
119 throw new HTTPException(0);
120 } else {
121 Instant finish = Instant.now();
122 logger.info("Sent Ves Message. Took " + Duration.between(start, finish).toMillis() + " Milliseconds.");
123 }
124 });
125 } catch (HTTPException e) {
126 logger.info("Ves message failed. Going back to polling.");
127 commit = false;
128 } catch (JsonSyntaxException e) {
129 logger.error("Json Syntax Exception: ", e);
130 }
131 if (commit) {
132 consumer.commitAsync();
133 }
134 }
135 //consumer.close();
136 //logger.debug("DONE");
William Kurkianbde6fc92018-07-13 17:19:58 -0400137 }
138
139}