khenaidoo | ac63710 | 2019-01-14 15:44:34 -0500 | [diff] [blame^] | 1 | /** |
| 2 | * Copyright 2016 Confluent Inc. |
| 3 | * |
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
| 7 | * |
| 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | * |
| 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
| 15 | */ |
| 16 | |
// Package kafka provides high-level Apache Kafka producers and consumers
// using bindings on top of the librdkafka C library.
| 19 | // |
| 20 | // |
| 21 | // High-level Consumer |
| 22 | // |
| 23 | // * Decide if you want to read messages and events from the `.Events()` channel |
| 24 | // (set `"go.events.channel.enable": true`) or by calling `.Poll()`. |
| 25 | // |
| 26 | // * Create a Consumer with `kafka.NewConsumer()` providing at |
| 27 | // least the `bootstrap.servers` and `group.id` configuration properties. |
| 28 | // |
// * Call `.Subscribe()` (or `.SubscribeTopics()` to subscribe to multiple topics)
| 30 | // to join the group with the specified subscription set. |
| 31 | // Subscriptions are atomic, calling `.Subscribe*()` again will leave |
| 32 | // the group and rejoin with the new set of topics. |
| 33 | // |
| 34 | // * Start reading events and messages from either the `.Events` channel |
| 35 | // or by calling `.Poll()`. |
| 36 | // |
| 37 | // * When the group has rebalanced each client member is assigned a |
| 38 | // (sub-)set of topic+partitions. |
| 39 | // By default the consumer will start fetching messages for its assigned |
| 40 | // partitions at this point, but your application may enable rebalance |
// events to get an insight into what the assigned partitions were
| 42 | // as well as set the initial offsets. To do this you need to pass |
| 43 | // `"go.application.rebalance.enable": true` to the `NewConsumer()` call |
| 44 | // mentioned above. You will (eventually) see a `kafka.AssignedPartitions` event |
| 45 | // with the assigned partition set. You can optionally modify the initial |
| 46 | // offsets (they'll default to stored offsets and if there are no previously stored |
| 47 | // offsets it will fall back to `"default.topic.config": ConfigMap{"auto.offset.reset": ..}` |
| 48 | // which defaults to the `latest` message) and then call `.Assign(partitions)` |
| 49 | // to start consuming. If you don't need to modify the initial offsets you will |
| 50 | // not need to call `.Assign()`, the client will do so automatically for you if |
// you don't.
| 52 | // |
| 53 | // * As messages are fetched they will be made available on either the |
| 54 | // `.Events` channel or by calling `.Poll()`, look for event type `*kafka.Message`. |
| 55 | // |
| 56 | // * Handle messages, events and errors to your liking. |
| 57 | // |
| 58 | // * When you are done consuming call `.Close()` to commit final offsets |
| 59 | // and leave the consumer group. |
| 60 | // |
| 61 | // |
| 62 | // |
| 63 | // Producer |
| 64 | // |
| 65 | // * Create a Producer with `kafka.NewProducer()` providing at least |
| 66 | // the `bootstrap.servers` configuration properties. |
| 67 | // |
| 68 | // * Messages may now be produced either by sending a `*kafka.Message` |
| 69 | // on the `.ProduceChannel` or by calling `.Produce()`. |
| 70 | // |
| 71 | // * Producing is an asynchronous operation so the client notifies the application |
| 72 | // of per-message produce success or failure through something called delivery reports. |
| 73 | // Delivery reports are by default emitted on the `.Events()` channel as `*kafka.Message` |
| 74 | // and you should check `msg.TopicPartition.Error` for `nil` to find out if the message |
// was successfully delivered or not.
| 76 | // It is also possible to direct delivery reports to alternate channels |
| 77 | // by providing a non-nil `chan Event` channel to `.Produce()`. |
| 78 | // If no delivery reports are wanted they can be completely disabled by |
| 79 | // setting configuration property `"go.delivery.reports": false`. |
| 80 | // |
| 81 | // * When you are done producing messages you will need to make sure all messages |
| 82 | // are indeed delivered to the broker (or failed), remember that this is |
| 83 | // an asynchronous client so some of your messages may be lingering in internal |
// channels or transmission queues.
| 85 | // To do this you can either keep track of the messages you've produced |
| 86 | // and wait for their corresponding delivery reports, or call the convenience |
| 87 | // function `.Flush()` that will block until all message deliveries are done |
| 88 | // or the provided timeout elapses. |
| 89 | // |
| 90 | // * Finally call `.Close()` to decommission the producer. |
| 91 | // |
| 92 | // |
| 93 | // Events |
| 94 | // |
| 95 | // Apart from emitting messages and delivery reports the client also communicates |
| 96 | // with the application through a number of different event types. |
| 97 | // An application may choose to handle or ignore these events. |
| 98 | // |
| 99 | // Consumer events |
| 100 | // |
| 101 | // * `*kafka.Message` - a fetched message. |
| 102 | // |
| 103 | // * `AssignedPartitions` - The assigned partition set for this client following a rebalance. |
| 104 | // Requires `go.application.rebalance.enable` |
| 105 | // |
// * `RevokedPartitions` - The counterpart to `AssignedPartitions` following a rebalance.
// `AssignedPartitions` and `RevokedPartitions` are symmetrical.
| 108 | // Requires `go.application.rebalance.enable` |
| 109 | // |
| 110 | // * `PartitionEOF` - Consumer has reached the end of a partition. |
| 111 | // NOTE: The consumer will keep trying to fetch new messages for the partition. |
| 112 | // |
| 113 | // * `OffsetsCommitted` - Offset commit results (when `enable.auto.commit` is enabled). |
| 114 | // |
| 115 | // |
| 116 | // Producer events |
| 117 | // |
| 118 | // * `*kafka.Message` - delivery report for produced message. |
| 119 | // Check `.TopicPartition.Error` for delivery result. |
| 120 | // |
| 121 | // |
| 122 | // Generic events for both Consumer and Producer |
| 123 | // |
| 124 | // * `KafkaError` - client (error codes are prefixed with _) or broker error. |
| 125 | // These errors are normally just informational since the |
| 126 | // client will try its best to automatically recover (eventually). |
| 127 | // |
| 128 | // |
| 129 | // Hint: If your application registers a signal notification |
// (signal.Notify) make sure the signals channel is buffered to avoid
| 131 | // possible complications with blocking Poll() calls. |
| 132 | // |
| 133 | // Note: The Confluent Kafka Go client is safe for concurrent use. |
| 134 | package kafka |
| 135 | |
| 136 | import ( |
| 137 | "fmt" |
| 138 | "unsafe" |
| 139 | ) |
| 140 | |
| 141 | /* |
| 142 | #include <stdlib.h> |
| 143 | #include <string.h> |
| 144 | #include <librdkafka/rdkafka.h> |
| 145 | |
| 146 | static rd_kafka_topic_partition_t *_c_rdkafka_topic_partition_list_entry(rd_kafka_topic_partition_list_t *rktparlist, int idx) { |
| 147 | return idx < rktparlist->cnt ? &rktparlist->elems[idx] : NULL; |
| 148 | } |
| 149 | */ |
| 150 | import "C" |
| 151 | |
| 152 | // PartitionAny represents any partition (for partitioning), |
| 153 | // or unspecified value (for all other cases) |
| 154 | const PartitionAny = int32(C.RD_KAFKA_PARTITION_UA) |
| 155 | |
| 156 | // TopicPartition is a generic placeholder for a Topic+Partition and optionally Offset. |
| 157 | type TopicPartition struct { |
| 158 | Topic *string |
| 159 | Partition int32 |
| 160 | Offset Offset |
| 161 | Error error |
| 162 | } |
| 163 | |
| 164 | func (p TopicPartition) String() string { |
| 165 | topic := "<null>" |
| 166 | if p.Topic != nil { |
| 167 | topic = *p.Topic |
| 168 | } |
| 169 | if p.Error != nil { |
| 170 | return fmt.Sprintf("%s[%d]@%s(%s)", |
| 171 | topic, p.Partition, p.Offset, p.Error) |
| 172 | } |
| 173 | return fmt.Sprintf("%s[%d]@%s", |
| 174 | topic, p.Partition, p.Offset) |
| 175 | } |
| 176 | |
| 177 | // TopicPartitions is a slice of TopicPartitions that also implements |
| 178 | // the sort interface |
| 179 | type TopicPartitions []TopicPartition |
| 180 | |
| 181 | func (tps TopicPartitions) Len() int { |
| 182 | return len(tps) |
| 183 | } |
| 184 | |
| 185 | func (tps TopicPartitions) Less(i, j int) bool { |
| 186 | if *tps[i].Topic < *tps[j].Topic { |
| 187 | return true |
| 188 | } else if *tps[i].Topic > *tps[j].Topic { |
| 189 | return false |
| 190 | } |
| 191 | return tps[i].Partition < tps[j].Partition |
| 192 | } |
| 193 | |
| 194 | func (tps TopicPartitions) Swap(i, j int) { |
| 195 | tps[i], tps[j] = tps[j], tps[i] |
| 196 | } |
| 197 | |
| 198 | // new_cparts_from_TopicPartitions creates a new C rd_kafka_topic_partition_list_t |
| 199 | // from a TopicPartition array. |
| 200 | func newCPartsFromTopicPartitions(partitions []TopicPartition) (cparts *C.rd_kafka_topic_partition_list_t) { |
| 201 | cparts = C.rd_kafka_topic_partition_list_new(C.int(len(partitions))) |
| 202 | for _, part := range partitions { |
| 203 | ctopic := C.CString(*part.Topic) |
| 204 | defer C.free(unsafe.Pointer(ctopic)) |
| 205 | rktpar := C.rd_kafka_topic_partition_list_add(cparts, ctopic, C.int32_t(part.Partition)) |
| 206 | rktpar.offset = C.int64_t(part.Offset) |
| 207 | } |
| 208 | |
| 209 | return cparts |
| 210 | } |
| 211 | |
| 212 | func setupTopicPartitionFromCrktpar(partition *TopicPartition, crktpar *C.rd_kafka_topic_partition_t) { |
| 213 | |
| 214 | topic := C.GoString(crktpar.topic) |
| 215 | partition.Topic = &topic |
| 216 | partition.Partition = int32(crktpar.partition) |
| 217 | partition.Offset = Offset(crktpar.offset) |
| 218 | if crktpar.err != C.RD_KAFKA_RESP_ERR_NO_ERROR { |
| 219 | partition.Error = newError(crktpar.err) |
| 220 | } |
| 221 | } |
| 222 | |
| 223 | func newTopicPartitionsFromCparts(cparts *C.rd_kafka_topic_partition_list_t) (partitions []TopicPartition) { |
| 224 | |
| 225 | partcnt := int(cparts.cnt) |
| 226 | |
| 227 | partitions = make([]TopicPartition, partcnt) |
| 228 | for i := 0; i < partcnt; i++ { |
| 229 | crktpar := C._c_rdkafka_topic_partition_list_entry(cparts, C.int(i)) |
| 230 | setupTopicPartitionFromCrktpar(&partitions[i], crktpar) |
| 231 | } |
| 232 | |
| 233 | return partitions |
| 234 | } |
| 235 | |
| 236 | // LibraryVersion returns the underlying librdkafka library version as a |
| 237 | // (version_int, version_str) tuple. |
| 238 | func LibraryVersion() (int, string) { |
| 239 | ver := (int)(C.rd_kafka_version()) |
| 240 | verstr := C.GoString(C.rd_kafka_version_str()) |
| 241 | return ver, verstr |
| 242 | } |