Shad Ansari | 060a6fc | 2019-03-14 10:03:55 -0700 | [diff] [blame] | 1 | #!/usr/bin/env python |
| 2 | # |
Shad Ansari | 2b7a9c8 | 2019-03-22 11:40:35 -0700 | [diff] [blame] | 3 | # Copyright 2019 the original author or authors. |
Shad Ansari | 060a6fc | 2019-03-14 10:03:55 -0700 | [diff] [blame] | 4 | # |
| 5 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 6 | # you may not use this file except in compliance with the License. |
| 7 | # You may obtain a copy of the License at |
| 8 | # |
| 9 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | # |
| 11 | # Unless required by applicable law or agreed to in writing, software |
| 12 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 13 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 14 | # See the License for the specific language governing permissions and |
| 15 | # limitations under the License. |
| 16 | # |
| 17 | |
Shad Ansari | 060a6fc | 2019-03-14 10:03:55 -0700 | [diff] [blame] | 18 | import sys |
| 19 | import getopt |
Shad Ansari | 060a6fc | 2019-03-14 10:03:55 -0700 | [diff] [blame] | 20 | import logging |
Shad Ansari | 2b7a9c8 | 2019-03-22 11:40:35 -0700 | [diff] [blame] | 21 | from structlog import get_logger |
| 22 | from confluent_kafka import Consumer, KafkaError |
Shad Ansari | 33b3a24 | 2019-04-22 14:52:07 -0700 | [diff] [blame] | 23 | from voltha.registry import registry |
Shad Ansari | 2b7a9c8 | 2019-03-22 11:40:35 -0700 | [diff] [blame] | 24 | |
| 25 | log = get_logger() |
| 26 | |
| 27 | |
class KConsumer(object):
    """Blocking Kafka consumer that forwards every message to a callback.

    Instantiating the class looks up the 'kafka_proxy' entry in the registry
    to obtain the broker endpoint, subscribes to the requested topics and
    then polls in a loop.  The constructor only returns once the loop is
    interrupted (KeyboardInterrupt), after closing the consumer.
    """

    def __init__(self, callback, *topics):
        """Connect to Kafka, subscribe to ``topics`` and consume messages.

        Args:
            callback: callable invoked as ``callback(topic, value)`` for
                every message that was received without error.
            *topics: names of the Kafka topics to subscribe to.

        Raises:
            RuntimeError: if the kafka proxy is not registered or reports
                itself as faulty, so no broker endpoint is available.
        """
        kafka_proxy = registry('kafka_proxy')
        if not kafka_proxy or kafka_proxy.is_faulty():
            # Without a healthy proxy there is no endpoint to connect to.
            # Fail here with a clear error instead of tripping over the
            # never-assigned self.kafka_endpoint a few lines further down.
            log.error('kafka-proxy-unavailable')
            raise RuntimeError('kafka-proxy-unavailable')

        self.kafka_endpoint = kafka_proxy.kafka_endpoint
        log.debug('kafka-proxy-available', endpoint=self.kafka_endpoint)

        conf = {'bootstrap.servers': self.kafka_endpoint,
                'group.id': "mygroup"}

        logger = logging.getLogger('openolt-kafka-consumer')
        logger.setLevel(logging.DEBUG)
        if not logger.handlers:
            # Named loggers are shared process-wide; attach the stream
            # handler only once so that creating several consumers does
            # not duplicate every log line.
            handler = logging.StreamHandler()
            handler.setFormatter(logging.Formatter(
                '%(asctime)-15s %(levelname)-8s %(message)s'))
            logger.addHandler(handler)

        # Create Consumer instance
        # Hint: try debug='fetch' to generate some log messages
        # self._c = Consumer(conf, logger=logger, debug='fetch')
        log.debug('creating kafka consumer', conf=conf)
        self._c = Consumer(conf, logger=logger)

        self.topics = list(topics)
        try:
            # Subscribing happens inside the try block so the consumer is
            # closed even when the subscription itself fails.
            log.debug('subscribe to topics', topics=topics)
            self._c.subscribe(self.topics)

            # Read messages from Kafka and hand them to the callback
            self._poll_forever(callback)
        except KeyboardInterrupt:
            pass
        finally:
            # Close down consumer to commit final offsets.
            self._c.close()

    def _poll_forever(self, callback):
        """Poll Kafka endlessly, passing each good message to ``callback``."""
        while True:
            log.debug('polling kafka for messages', topics=self.topics)
            msg = self._c.poll(timeout=1.0)
            if msg is None:
                # Poll timed out without a message; try again.
                continue
            elif not msg.error():
                log.debug('got a kafka message', topic=msg.topic())
                callback(msg.topic(), msg.value())
            elif msg.error().code() == KafkaError._PARTITION_EOF:
                # Reaching the end of a partition is not a failure.
                pass
            else:
                log.error('Error occured: {0}'.format(msg.error().str()))
Shad Ansari | 060a6fc | 2019-03-14 10:03:55 -0700 | [diff] [blame] | 80 | |
def print_usage_and_exit(program_name):
    """Write the command-line usage to stderr and exit with status 1.

    Args:
        program_name: name of the script, substituted into the usage line.
    """
    usage = ('Usage: %s <bootstrap-brokers> <group> <topic1> <topic2> ..\n'
             % program_name)
    sys.stderr.write(usage)
    sys.exit(1)
| 86 | |
| 87 | |
if __name__ == '__main__':
    """
    Usage:
        python openolt_kafka_consumer.py $(kubectl get pod -o wide \
        | grep cord-kafka-0 | awk '{print $6}'):9092 \
        mygroup openolt.ind.olt openolt.ind.pkt
    """
    # Stand-alone mode: subscribe to the topics given on the command line
    # and echo every received message to stdout until interrupted.
    optlist, argv = getopt.getopt(sys.argv[1:], 'T:')
    if len(argv) < 3:
        print_usage_and_exit(sys.argv[0])

    # Positional arguments: broker address, consumer group, then topics.
    broker, group = argv[0], argv[1]
    topics = argv[2:]

    # Consumer configuration
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    conf = {
        'bootstrap.servers': broker,
        'group.id': group,
    }

    # Logger handed to the Kafka client; it writes through a stream handler.
    log_format = '%(asctime)-15s %(levelname)-8s %(message)s'
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter(log_format))
    logger = logging.getLogger('openolt-kafka-consumer')
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)

    # Create Consumer instance
    # Hint: try debug='fetch' to generate some log messages
    # consumer = Consumer(conf, logger=logger, debug='fetch')
    consumer = Consumer(conf, logger=logger)

    # Subscribe to topics
    consumer.subscribe(topics)

    # Read messages from Kafka, print to stdout
    try:
        while True:
            message = consumer.poll(timeout=1.0)
            if message is None:
                # Nothing arrived within the timeout; poll again.
                continue

            error = message.error()
            if not error:
                print('got a kafka message, topic: {0}'.format(
                    message.topic()))
                print(message.value())
                continue

            # End-of-partition events are expected and silently skipped;
            # anything else is reported.
            if error.code() != KafkaError._PARTITION_EOF:
                print('Error occured: {0}'.format(error.str()))

    except KeyboardInterrupt:
        pass

    finally:
        # Close down consumer to commit final offsets.
        consumer.close()