blob: a9f0603a17ab7d9396c9fd04dddf77d0ecdbee13 [file] [log] [blame]
William Kurkianbde6fc92018-07-13 17:19:58 -04001/*
2 * Copyright 2018- Cisco
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka;
17import org.apache.kafka.clients.consumer.*;
18import org.apache.kafka.clients.consumer.Consumer;
19import org.apache.kafka.common.serialization.LongDeserializer;
20import org.apache.kafka.common.serialization.StringDeserializer;
21import org.apache.kafka.common.KafkaException;
22import java.util.Collections;
23import java.util.Properties;
24import org.slf4j.Logger;
25import org.slf4j.LoggerFactory;
26import org.slf4j.Marker;
27import org.slf4j.MarkerFactory;
28
29import javax.xml.ws.http.HTTPException;
30
31import java.util.concurrent.TimeUnit;
32
33import ves.*;
34import config.Config;
35
36public class VolthaKafkaConsumer {
37
38 private final Logger logger = LoggerFactory.getLogger("VolthaKafkaConsumer");
39 private final String dataMarkerText = "DATA";
40 private final Marker dataMarker = MarkerFactory.getMarker(dataMarkerText);
41
42 private KafkaConsumer<Long, String> consumer;
43
44 public VolthaKafkaConsumer() {
45 logger.debug("VolthaKafkaConsumer constructor called");
46 initVesAgent();
47 consumer = createConsumer();
48 }
49
50 private void initVesAgent() {
51 VesAgent.initVes();
52 }
53
54 private KafkaConsumer<Long, String> createConsumer() {
55 logger.debug("Creating Kafka Consumer");
56
57 String kafkaAddress = Config.getKafkaAddress() + ":" + Config.getKafkaPort();
58 final Properties props = new Properties();
59 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
60 kafkaAddress);
61 props.put(ConsumerConfig.GROUP_ID_CONFIG,
62 "KafkaExampleConsumer");
63 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
64 LongDeserializer.class.getName());
65 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
66 StringDeserializer.class.getName());
67 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
68 false);
69
70 // Create the consumer using props.
71 final KafkaConsumer<Long, String> consumer =
72 new KafkaConsumer<>(props);
73
74 // Subscribe to the topic.
75 consumer.subscribe(Collections.singletonList(Config.getKafkaTopic()));
76 return consumer;
77 }
78
79 public void runConsumer() throws InterruptedException {
80
81 logger.debug("Starting Consumer");
82
83 while (true) {
84 ConsumerRecords<Long, String> consumerRecords;
85 try {
86 consumerRecords = consumer.poll(100000);
87 } catch (KafkaException e) {
88 logger.debug("Error with Kafka connection. Retrying in 15 seconds.");
89 this.consumer = createConsumer();
90 TimeUnit.SECONDS.sleep(15);
91 continue;
92 }
93 logger.info("{} Records retrieved from poll.", consumerRecords.count());
94
95 boolean commit = true;
96 try {
97 consumerRecords.forEach(record -> {
98 logger.info(dataMarker, "Consumer Record:({}, {}, {}, {})\n",
99 record.key(), record.value(),
100 record.partition(), record.offset());
101 logger.info("Attempting to send data to VES");
102 boolean success = VesAgent.sendToVES(record.value());
103 if (!success) {
104 throw new HTTPException(0);
105 } else {
106 logger.info("Sent Ves Message");
107 }
108 });
109 } catch (HTTPException e) {
110 logger.info("Ves message failed. Going back to polling.");
111 commit = false;
112 }
113 if (commit) {
114 consumer.commitAsync();
115 }
116 }
117 //consumer.close();
118 //logger.debug("DONE");
119 }
120
121}