blob: ba121e69798b56af1fa86e9367fbe6b1cbe4141d [file] [log] [blame]
William Kurkianbde6fc92018-07-13 17:19:58 -04001/*
William Kurkian1bedb412018-07-19 12:55:41 -04002* Copyright 2018- Cisco
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8* http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
William Kurkian18ec3442018-09-10 16:27:37 -040015*
William Kurkian1bedb412018-07-19 12:55:41 -040016*/
William Kurkianbde6fc92018-07-13 17:19:58 -040017package kafka;
18import org.apache.kafka.clients.consumer.*;
19import org.apache.kafka.clients.consumer.Consumer;
20import org.apache.kafka.common.serialization.LongDeserializer;
21import org.apache.kafka.common.serialization.StringDeserializer;
22import org.apache.kafka.common.KafkaException;
23import java.util.Collections;
24import java.util.Properties;
25import org.slf4j.Logger;
26import org.slf4j.LoggerFactory;
27import org.slf4j.Marker;
28import org.slf4j.MarkerFactory;
29
William Kurkian1bedb412018-07-19 12:55:41 -040030import java.time.Instant;
31import java.time.Duration;
32
33import com.google.gson.JsonSyntaxException;
34
William Kurkianbde6fc92018-07-13 17:19:58 -040035import javax.xml.ws.http.HTTPException;
36
37import java.util.concurrent.TimeUnit;
38
39import ves.*;
40import config.Config;
41
42public class VolthaKafkaConsumer {
43
44 private final Logger logger = LoggerFactory.getLogger("VolthaKafkaConsumer");
45 private final String dataMarkerText = "DATA";
46 private final Marker dataMarker = MarkerFactory.getMarker(dataMarkerText);
47
48 private KafkaConsumer<Long, String> consumer;
49
William Kurkian18ec3442018-09-10 16:27:37 -040050 private KafkaConsumerType type;
51
William Kurkian9600b5c2018-09-20 16:05:59 -040052 private VesAgent vesAgent;
53
William Kurkian18ec3442018-09-10 16:27:37 -040054 public VolthaKafkaConsumer(KafkaConsumerType type) {
William Kurkian1bedb412018-07-19 12:55:41 -040055 logger.debug("VolthaKafkaConsumer constructor called");
William Kurkian18ec3442018-09-10 16:27:37 -040056 this.type = type;
William Kurkian9600b5c2018-09-20 16:05:59 -040057 vesAgent = new VesAgent();
William Kurkian1bedb412018-07-19 12:55:41 -040058 try {
59 consumer = createConsumer();
William Kurkian18ec3442018-09-10 16:27:37 -040060 } catch (Exception e) {
61 logger.error("Error with Kafka: ", e);
62 logger.error("Retrying in 15 seconds.");
William Kurkian1bedb412018-07-19 12:55:41 -040063 //Don't try to resolve it here. Try again in the thread loo, in case this is a temporal issue
64 }
William Kurkianbde6fc92018-07-13 17:19:58 -040065 }
66
William Kurkianbde6fc92018-07-13 17:19:58 -040067 private KafkaConsumer<Long, String> createConsumer() {
William Kurkian1bedb412018-07-19 12:55:41 -040068 logger.debug("Creating Kafka Consumer");
William Kurkianbde6fc92018-07-13 17:19:58 -040069
William Kurkian1bedb412018-07-19 12:55:41 -040070 String kafkaAddress = Config.getKafkaAddress() + ":" + Config.getKafkaPort();
71 final Properties props = new Properties();
72 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
73 kafkaAddress);
74 props.put(ConsumerConfig.GROUP_ID_CONFIG,
William Kurkian18ec3442018-09-10 16:27:37 -040075 "VesAgent");
William Kurkian1bedb412018-07-19 12:55:41 -040076 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
77 LongDeserializer.class.getName());
78 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
79 StringDeserializer.class.getName());
80 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
81 false);
William Kurkian1bedb412018-07-19 12:55:41 -040082 // Create the consumer using props.
83 final KafkaConsumer<Long, String> consumer =
84 new KafkaConsumer<>(props);
William Kurkian1bedb412018-07-19 12:55:41 -040085 // Subscribe to the topic.
William Kurkian18ec3442018-09-10 16:27:37 -040086 switch (type) {
87 case ALARMS:
88 consumer.subscribe(Collections.singletonList(Config.getKafkaAlarmsTopic()));
89 break;
90 case KPIS:
91 consumer.subscribe(Collections.singletonList(Config.getKafkaKpisTopic()));
92 break;
93 }
William Kurkian1bedb412018-07-19 12:55:41 -040094 return consumer;
95 }
William Kurkianbde6fc92018-07-13 17:19:58 -040096
William Kurkian1bedb412018-07-19 12:55:41 -040097 public void runConsumer() throws InterruptedException {
William Kurkianbde6fc92018-07-13 17:19:58 -040098
William Kurkian18ec3442018-09-10 16:27:37 -040099 logger.debug("Starting Kafka Consumer");
William Kurkianbde6fc92018-07-13 17:19:58 -0400100
William Kurkian1bedb412018-07-19 12:55:41 -0400101 while (true) {
102 ConsumerRecords<Long, String> consumerRecords;
103 try {
104 if (consumer == null) {
105 this.consumer = createConsumer();
106 }
107 consumerRecords = consumer.poll(20000);
William Kurkian18ec3442018-09-10 16:27:37 -0400108 } catch (Exception e) {
109 logger.error("Error with kafka: ", e);
110 logger.error("Retrying in 15 seconds.");
William Kurkian1bedb412018-07-19 12:55:41 -0400111 consumer = null;
112 TimeUnit.SECONDS.sleep(15);
113 continue;
114 }
115 logger.info("{} Records retrieved from poll.", consumerRecords.count());
William Kurkianbde6fc92018-07-13 17:19:58 -0400116
William Kurkian1bedb412018-07-19 12:55:41 -0400117 boolean commit = true;
118 try {
119 consumerRecords.forEach(record -> {
120 Instant start = Instant.now();
121 logger.info(dataMarker, "Consumer Record:({}, {}, {}, {})\n",
122 record.key(), record.value(),
123 record.partition(), record.offset());
124 logger.info("Attempting to send data to VES");
William Kurkian9600b5c2018-09-20 16:05:59 -0400125 boolean success = vesAgent.sendToVES(type, record.value());
William Kurkian1bedb412018-07-19 12:55:41 -0400126 if (!success) {
127 throw new HTTPException(0);
128 } else {
129 Instant finish = Instant.now();
130 logger.info("Sent Ves Message. Took " + Duration.between(start, finish).toMillis() + " Milliseconds.");
131 }
132 });
133 } catch (HTTPException e) {
134 logger.info("Ves message failed. Going back to polling.");
135 commit = false;
136 } catch (JsonSyntaxException e) {
137 logger.error("Json Syntax Exception: ", e);
138 }
139 if (commit) {
140 consumer.commitAsync();
141 }
142 }
143 //consumer.close();
144 //logger.debug("DONE");
William Kurkianbde6fc92018-07-13 17:19:58 -0400145 }
146
147}