blob: 11deb811134fd8b9d53475b6a2b4ecff624c7772 [file] [log] [blame]
/*
 * Copyright 2018- Cisco
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
William Kurkianbde6fc92018-07-13 17:19:58 -040017package kafka;
18import org.apache.kafka.clients.consumer.*;
19import org.apache.kafka.clients.consumer.Consumer;
20import org.apache.kafka.common.serialization.LongDeserializer;
21import org.apache.kafka.common.serialization.StringDeserializer;
22import org.apache.kafka.common.KafkaException;
23import java.util.Collections;
24import java.util.Properties;
25import org.slf4j.Logger;
26import org.slf4j.LoggerFactory;
27import org.slf4j.Marker;
28import org.slf4j.MarkerFactory;
29
William Kurkian1bedb412018-07-19 12:55:41 -040030import java.time.Instant;
31import java.time.Duration;
32
33import com.google.gson.JsonSyntaxException;
34
William Kurkianbde6fc92018-07-13 17:19:58 -040035import javax.xml.ws.http.HTTPException;
36
37import java.util.concurrent.TimeUnit;
38
39import ves.*;
40import config.Config;
41
42public class VolthaKafkaConsumer {
43
44 private final Logger logger = LoggerFactory.getLogger("VolthaKafkaConsumer");
45 private final String dataMarkerText = "DATA";
46 private final Marker dataMarker = MarkerFactory.getMarker(dataMarkerText);
47
48 private KafkaConsumer<Long, String> consumer;
49
William Kurkian18ec3442018-09-10 16:27:37 -040050 private KafkaConsumerType type;
51
52 public VolthaKafkaConsumer(KafkaConsumerType type) {
William Kurkian1bedb412018-07-19 12:55:41 -040053 logger.debug("VolthaKafkaConsumer constructor called");
54 initVesAgent();
William Kurkian18ec3442018-09-10 16:27:37 -040055 this.type = type;
William Kurkian1bedb412018-07-19 12:55:41 -040056 try {
57 consumer = createConsumer();
William Kurkian18ec3442018-09-10 16:27:37 -040058 } catch (Exception e) {
59 logger.error("Error with Kafka: ", e);
60 logger.error("Retrying in 15 seconds.");
William Kurkian1bedb412018-07-19 12:55:41 -040061 //Don't try to resolve it here. Try again in the thread loo, in case this is a temporal issue
62 }
William Kurkianbde6fc92018-07-13 17:19:58 -040063 }
64
65 private void initVesAgent() {
William Kurkian1bedb412018-07-19 12:55:41 -040066 VesAgent.initVes();
William Kurkianbde6fc92018-07-13 17:19:58 -040067 }
68
69 private KafkaConsumer<Long, String> createConsumer() {
William Kurkian1bedb412018-07-19 12:55:41 -040070 logger.debug("Creating Kafka Consumer");
William Kurkianbde6fc92018-07-13 17:19:58 -040071
William Kurkian1bedb412018-07-19 12:55:41 -040072 String kafkaAddress = Config.getKafkaAddress() + ":" + Config.getKafkaPort();
73 final Properties props = new Properties();
74 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
75 kafkaAddress);
76 props.put(ConsumerConfig.GROUP_ID_CONFIG,
William Kurkian18ec3442018-09-10 16:27:37 -040077 "VesAgent");
William Kurkian1bedb412018-07-19 12:55:41 -040078 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
79 LongDeserializer.class.getName());
80 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
81 StringDeserializer.class.getName());
82 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
83 false);
William Kurkian1bedb412018-07-19 12:55:41 -040084 // Create the consumer using props.
85 final KafkaConsumer<Long, String> consumer =
86 new KafkaConsumer<>(props);
William Kurkian1bedb412018-07-19 12:55:41 -040087 // Subscribe to the topic.
William Kurkian18ec3442018-09-10 16:27:37 -040088 switch (type) {
89 case ALARMS:
90 consumer.subscribe(Collections.singletonList(Config.getKafkaAlarmsTopic()));
91 break;
92 case KPIS:
93 consumer.subscribe(Collections.singletonList(Config.getKafkaKpisTopic()));
94 break;
95 }
William Kurkian1bedb412018-07-19 12:55:41 -040096 return consumer;
97 }
William Kurkianbde6fc92018-07-13 17:19:58 -040098
William Kurkian1bedb412018-07-19 12:55:41 -040099 public void runConsumer() throws InterruptedException {
William Kurkianbde6fc92018-07-13 17:19:58 -0400100
William Kurkian18ec3442018-09-10 16:27:37 -0400101 logger.debug("Starting Kafka Consumer");
William Kurkianbde6fc92018-07-13 17:19:58 -0400102
William Kurkian1bedb412018-07-19 12:55:41 -0400103 while (true) {
104 ConsumerRecords<Long, String> consumerRecords;
105 try {
106 if (consumer == null) {
107 this.consumer = createConsumer();
108 }
109 consumerRecords = consumer.poll(20000);
William Kurkian18ec3442018-09-10 16:27:37 -0400110 } catch (Exception e) {
111 logger.error("Error with kafka: ", e);
112 logger.error("Retrying in 15 seconds.");
William Kurkian1bedb412018-07-19 12:55:41 -0400113 consumer = null;
114 TimeUnit.SECONDS.sleep(15);
115 continue;
116 }
117 logger.info("{} Records retrieved from poll.", consumerRecords.count());
William Kurkianbde6fc92018-07-13 17:19:58 -0400118
William Kurkian1bedb412018-07-19 12:55:41 -0400119 boolean commit = true;
120 try {
121 consumerRecords.forEach(record -> {
122 Instant start = Instant.now();
123 logger.info(dataMarker, "Consumer Record:({}, {}, {}, {})\n",
124 record.key(), record.value(),
125 record.partition(), record.offset());
126 logger.info("Attempting to send data to VES");
William Kurkian18ec3442018-09-10 16:27:37 -0400127 boolean success = VesAgent.sendToVES(type, record.value());
William Kurkian1bedb412018-07-19 12:55:41 -0400128 if (!success) {
129 throw new HTTPException(0);
130 } else {
131 Instant finish = Instant.now();
132 logger.info("Sent Ves Message. Took " + Duration.between(start, finish).toMillis() + " Milliseconds.");
133 }
134 });
135 } catch (HTTPException e) {
136 logger.info("Ves message failed. Going back to polling.");
137 commit = false;
138 } catch (JsonSyntaxException e) {
139 logger.error("Json Syntax Exception: ", e);
140 }
141 if (commit) {
142 consumer.commitAsync();
143 }
144 }
145 //consumer.close();
146 //logger.debug("DONE");
William Kurkianbde6fc92018-07-13 17:19:58 -0400147 }
148
149}