blob: 1cb5400a5f9139dc6a2f68c92db0432fb242dadc [file] [log] [blame]
Shad Ansari060a6fc2019-03-14 10:03:55 -07001#!/usr/bin/env python
2#
Shad Ansari2b7a9c82019-03-22 11:40:35 -07003# Copyright 2019 the original author or authors.
Shad Ansari060a6fc2019-03-14 10:03:55 -07004#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
Shad Ansari060a6fc2019-03-14 10:03:55 -070018import sys
19import getopt
Shad Ansari060a6fc2019-03-14 10:03:55 -070020import logging
Shad Ansari2b7a9c82019-03-22 11:40:35 -070021from structlog import get_logger
22from confluent_kafka import Consumer, KafkaError
Shad Ansari33b3a242019-04-22 14:52:07 -070023from voltha.registry import registry
Shad Ansari2b7a9c82019-03-22 11:40:35 -070024
25log = get_logger()
26
27
28class KConsumer(object):
Shad Ansarie969afc2019-04-05 15:16:41 -070029 def __init__(self, callback, *topics):
Shad Ansari33b3a242019-04-22 14:52:07 -070030 kafka_proxy = registry('kafka_proxy')
Shad Ansari2b7a9c82019-03-22 11:40:35 -070031 if kafka_proxy and not kafka_proxy.is_faulty():
Shad Ansari2b7a9c82019-03-22 11:40:35 -070032 self.kafka_endpoint = kafka_proxy.kafka_endpoint
Shad Ansariecb0dca2019-04-03 15:53:03 -070033 log.debug('kafka-proxy-available', endpoint=self.kafka_endpoint)
Shad Ansari2b7a9c82019-03-22 11:40:35 -070034 else:
Shad Ansari995ca632019-04-08 19:43:46 -070035 log.error('kafka-proxy-unavailable')
Shad Ansari2b7a9c82019-03-22 11:40:35 -070036
37 conf = {'bootstrap.servers': self.kafka_endpoint,
Shad Ansariecb0dca2019-04-03 15:53:03 -070038 'group.id': "mygroup"}
Shad Ansari2b7a9c82019-03-22 11:40:35 -070039
40 logger = logging.getLogger('openolt-kafka-consumer')
41 logger.setLevel(logging.DEBUG)
42 handler = logging.StreamHandler()
43 handler.setFormatter(logging.Formatter(
44 '%(asctime)-15s %(levelname)-8s %(message)s'))
45 logger.addHandler(handler)
46
47 # Create Consumer instance
48 # Hint: try debug='fetch' to generate some log messages
49 # self._c = Consumer(conf, logger=logger, debug='fetch')
50 log.debug('creating kafka consumer', conf=conf)
51 self._c = Consumer(conf, logger=logger)
52
53 # Subscribe to topics
54 log.debug('subscribe to topics', topics=topics)
Shad Ansariecb0dca2019-04-03 15:53:03 -070055 self.topics = list(topics)
56 self._c.subscribe(self.topics)
Shad Ansari2b7a9c82019-03-22 11:40:35 -070057
Shad Ansari2b7a9c82019-03-22 11:40:35 -070058 # Read messages from Kafka and hand it to to callback
59 try:
60 while True:
Shad Ansariecb0dca2019-04-03 15:53:03 -070061 log.debug('polling kafka for messages', topics=self.topics)
Shad Ansari2b7a9c82019-03-22 11:40:35 -070062 msg = self._c.poll(timeout=1.0)
63 if msg is None:
64 continue
65 elif not msg.error():
Shad Ansariecb0dca2019-04-03 15:53:03 -070066 log.debug('got a kafka message', topic=msg.topic())
Shad Ansarie969afc2019-04-05 15:16:41 -070067 callback(msg.topic(), msg.value())
Shad Ansari2b7a9c82019-03-22 11:40:35 -070068 elif msg.error().code() == KafkaError._PARTITION_EOF:
69 pass
70 else:
71 log.error('Error occured: {0}'.format(msg.error().str()))
72
73 except KeyboardInterrupt:
74 pass
75
76 finally:
77 # Close down consumer to commit final offsets.
78 self._c.close()
79
Shad Ansari060a6fc2019-03-14 10:03:55 -070080
81def print_usage_and_exit(program_name):
Shad Ansari2b7a9c82019-03-22 11:40:35 -070082 sys.stderr.write(
83 'Usage: %s <bootstrap-brokers> <group> <topic1> <topic2> ..\n'
84 % program_name)
Shad Ansari060a6fc2019-03-14 10:03:55 -070085 sys.exit(1)
86
87
88if __name__ == '__main__':
Shad Ansari2b7a9c82019-03-22 11:40:35 -070089 """
90 Usage:
91 python openolt_kafka_consumer.py $(kubectl get pod -o wide \
Shad Ansariecb0dca2019-04-03 15:53:03 -070092 | grep cord-kafka-0 | awk '{print $6}'):9092 \
93 mygroup openolt.ind.olt openolt.ind.pkt
Shad Ansari2b7a9c82019-03-22 11:40:35 -070094 """
Shad Ansari060a6fc2019-03-14 10:03:55 -070095 optlist, argv = getopt.getopt(sys.argv[1:], 'T:')
96 if len(argv) < 3:
97 print_usage_and_exit(sys.argv[0])
98
99 broker = argv[0]
100 group = argv[1]
101 topics = argv[2:]
102 # Consumer configuration
103 # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
Shad Ansari060a6fc2019-03-14 10:03:55 -0700104 conf = {'bootstrap.servers': broker,
Shad Ansariecb0dca2019-04-03 15:53:03 -0700105 'group.id': group}
Shad Ansari060a6fc2019-03-14 10:03:55 -0700106
107 logger = logging.getLogger('openolt-kafka-consumer')
108 logger.setLevel(logging.DEBUG)
109 handler = logging.StreamHandler()
Shad Ansari2b7a9c82019-03-22 11:40:35 -0700110 handler.setFormatter(logging.Formatter(
111 '%(asctime)-15s %(levelname)-8s %(message)s'))
Shad Ansari060a6fc2019-03-14 10:03:55 -0700112 logger.addHandler(handler)
113
114 # Create Consumer instance
115 # Hint: try debug='fetch' to generate some log messages
116 # c = Consumer(conf, logger=logger, debug='fetch')
117 c = Consumer(conf, logger=logger)
118
Shad Ansari060a6fc2019-03-14 10:03:55 -0700119 # Subscribe to topics
Shad Ansariecb0dca2019-04-03 15:53:03 -0700120 c.subscribe(topics)
Shad Ansari060a6fc2019-03-14 10:03:55 -0700121
122 # Read messages from Kafka, print to stdout
123 try:
124 while True:
125 msg = c.poll(timeout=1.0)
126 if msg is None:
127 continue
128 elif not msg.error():
Shad Ansariecb0dca2019-04-03 15:53:03 -0700129 print('got a kafka message, topic: {0}'.format(msg.topic()))
Shad Ansari060a6fc2019-03-14 10:03:55 -0700130 print(msg.value())
131 elif msg.error().code() == KafkaError._PARTITION_EOF:
132 # print('End of partition reached {0}/{1}'
133 # .format(msg.topic(), msg.partition()))
134 pass
135 else:
136 print('Error occured: {0}'.format(msg.error().str()))
137
138 except KeyboardInterrupt:
139 pass
140
141 finally:
142 # Close down consumer to commit final offsets.
143 c.close()