/**
 * Copyright 2016 Confluent Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// Package kafka provides a high-level Apache Kafka producer and consumer
// using bindings on top of the librdkafka C library.
//
//
// High-level Consumer
//
// * Decide whether you want to read messages and events from the `.Events()` channel
// (set `"go.events.channel.enable": true`) or by calling `.Poll()`.
//
// * Create a Consumer with `kafka.NewConsumer()`, providing at
// least the `bootstrap.servers` and `group.id` configuration properties.
//
// * Call `.Subscribe()` (or `.SubscribeTopics()` to subscribe to multiple topics)
// to join the group with the specified subscription set.
// Subscriptions are atomic; calling `.Subscribe*()` again will leave
// the group and rejoin with the new set of topics.
//
// * Start reading events and messages from either the `.Events` channel
// or by calling `.Poll()`.
//
// * When the group has rebalanced, each client member is assigned a
// (sub-)set of topic+partitions.
// By default the consumer will start fetching messages for its assigned
// partitions at this point, but your application may enable rebalance
// events to get an insight into what the assigned partitions were,
// as well as to set the initial offsets. To do this you need to pass
// `"go.application.rebalance.enable": true` to the `NewConsumer()` call
// mentioned above. You will (eventually) see a `kafka.AssignedPartitions` event
// with the assigned partition set. You can optionally modify the initial
// offsets (they default to stored offsets; if there are no previously stored
// offsets, they fall back to `"default.topic.config": ConfigMap{"auto.offset.reset": ..}`,
// which defaults to the `latest` message) and then call `.Assign(partitions)`
// to start consuming. If you don't need to modify the initial offsets you
// do not need to call `.Assign()`; the client will do so automatically
// for you if you don't.
//
// * As messages are fetched they will be made available on either the
// `.Events` channel or by calling `.Poll()`; look for event type `*kafka.Message`.
//
// * Handle messages, events and errors to your liking.
//
// * When you are done consuming, call `.Close()` to commit final offsets
// and leave the consumer group. A minimal poll-based sketch of these steps
// follows this list.
//
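// The following is a minimal poll-based consumer sketch; the broker address,
// group id and topic name are placeholders:
//
//    c, err := kafka.NewConsumer(&kafka.ConfigMap{
//        "bootstrap.servers": "localhost:9092",
//        "group.id":          "example-group",
//    })
//    if err != nil {
//        panic(err)
//    }
//
//    if err = c.SubscribeTopics([]string{"example-topic"}, nil); err != nil {
//        panic(err)
//    }
//
//    run := true
//    for run {
//        switch e := c.Poll(100).(type) {
//        case *kafka.Message:
//            fmt.Printf("message on %s: %s\n", e.TopicPartition, string(e.Value))
//        case kafka.Error:
//            fmt.Printf("error: %v\n", e)
//            run = false
//        }
//    }
//
//    c.Close() // commit final offsets and leave the group
//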
//
//
// Producer
//
// * Create a Producer with `kafka.NewProducer()` providing at least
// the `bootstrap.servers` configuration property.
//
// * Messages may now be produced either by sending a `*kafka.Message`
// on the `.ProduceChannel` or by calling `.Produce()`.
//
// * Producing is an asynchronous operation, so the client notifies the application
// of per-message produce success or failure through delivery reports.
// Delivery reports are by default emitted on the `.Events()` channel as `*kafka.Message`,
// and you should check `msg.TopicPartition.Error` for `nil` to find out if the message
// was successfully delivered or not.
// It is also possible to direct delivery reports to alternate channels
// by providing a non-nil `chan Event` channel to `.Produce()`.
// If no delivery reports are wanted they can be completely disabled by
// setting the configuration property `"go.delivery.reports": false`.
//
// * When you are done producing messages you will need to make sure all messages
// are indeed delivered to the broker (or failed); remember that this is
// an asynchronous client, so some of your messages may be lingering in internal
// channels or transmission queues.
// To do this you can either keep track of the messages you've produced
// and wait for their corresponding delivery reports, or call the convenience
// function `.Flush()`, which will block until all message deliveries are done
// or the provided timeout elapses.
//
// * Finally, call `.Close()` to decommission the producer. A minimal sketch
// of this flow follows this list.
//
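// The following is a minimal asynchronous-produce sketch with delivery-report
// handling and a final flush; the broker address and topic name are placeholders:
//
//    p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
//    if err != nil {
//        panic(err)
//    }
//    defer p.Close()
//
//    topic := "example-topic"
//    err = p.Produce(&kafka.Message{
//        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
//        Value:          []byte("hello"),
//    }, nil) // nil: emit the delivery report on the default .Events() channel
//    if err != nil {
//        panic(err)
//    }
//
//    // Wait for the delivery report.
//    if m, ok := (<-p.Events()).(*kafka.Message); ok && m.TopicPartition.Error != nil {
//        fmt.Printf("delivery failed: %v\n", m.TopicPartition.Error)
//    }
//
//    p.Flush(15 * 1000) // block until outstanding deliveries finish, or 15s elapse
//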
//
// Events
//
// Apart from emitting messages and delivery reports, the client also communicates
// with the application through a number of different event types.
// An application may choose to handle or ignore these events.
//
// Consumer events
//
// * `*kafka.Message` - a fetched message.
//
// * `AssignedPartitions` - The assigned partition set for this client following a rebalance.
// Requires `go.application.rebalance.enable`.
//
// * `RevokedPartitions` - The counterpart to `AssignedPartitions` following a rebalance.
// `AssignedPartitions` and `RevokedPartitions` are symmetrical.
// Requires `go.application.rebalance.enable`.
//
// * `PartitionEOF` - Consumer has reached the end of a partition.
// NOTE: The consumer will keep trying to fetch new messages for the partition.
//
// * `OffsetsCommitted` - Offset commit results (when `enable.auto.commit` is enabled).
//
//
// Producer events
//
// * `*kafka.Message` - delivery report for a produced message.
// Check `.TopicPartition.Error` for the delivery result.
//
//
// Generic events for both Consumer and Producer
//
// * `KafkaError` - client (error codes are prefixed with _) or broker error.
// These errors are normally just informational, since the
// client will try its best to automatically recover (eventually).
//
//
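// A sketch of dispatching on these event types from a poll loop, assuming a
// consumer `c` created with `"go.application.rebalance.enable": true`:
//
//    switch e := c.Poll(100).(type) {
//    case kafka.AssignedPartitions:
//        // Optionally adjust offsets in e.Partitions before assigning.
//        c.Assign(e.Partitions)
//    case kafka.RevokedPartitions:
//        c.Unassign()
//    case kafka.PartitionEOF:
//        fmt.Printf("reached end of partition: %v\n", e)
//    case kafka.Error:
//        fmt.Printf("informational error: %v\n", e)
//    }
//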
// Hint: If your application registers a signal notification
// (signal.Notify), make sure the signals channel is buffered to avoid
// possible complications with blocking Poll() calls.
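//
// For example (a minimal sketch; uses the standard os, os/signal and syscall packages):
//
//    sigs := make(chan os.Signal, 1) // buffered, as recommended above
//    signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)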
//
// Note: The Confluent Kafka Go client is safe for concurrent use.
package kafka

import (
    "fmt"
    "unsafe"
)

/*
#include <stdlib.h>
#include <string.h>
#include <librdkafka/rdkafka.h>

static rd_kafka_topic_partition_t *_c_rdkafka_topic_partition_list_entry(rd_kafka_topic_partition_list_t *rktparlist, int idx) {
   return idx < rktparlist->cnt ? &rktparlist->elems[idx] : NULL;
}
*/
import "C"

// PartitionAny represents any partition (for partitioning),
// or an unspecified value (for all other cases).
const PartitionAny = int32(C.RD_KAFKA_PARTITION_UA)

// TopicPartition is a generic placeholder for a Topic+Partition and optionally Offset.
type TopicPartition struct {
    Topic     *string
    Partition int32
    Offset    Offset
    Error     error
}

func (p TopicPartition) String() string {
    topic := "<null>"
    if p.Topic != nil {
        topic = *p.Topic
    }
    if p.Error != nil {
        return fmt.Sprintf("%s[%d]@%s(%s)",
            topic, p.Partition, p.Offset, p.Error)
    }
    return fmt.Sprintf("%s[%d]@%s",
        topic, p.Partition, p.Offset)
}

// TopicPartitions is a slice of TopicPartition that also implements
// the sort interface.
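//
// For example, to order a slice by topic and then partition (a usage sketch;
// `parts` is an assumed []TopicPartition, sorted via the standard sort package):
//
//    sort.Sort(TopicPartitions(parts))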
type TopicPartitions []TopicPartition

func (tps TopicPartitions) Len() int {
    return len(tps)
}

func (tps TopicPartitions) Less(i, j int) bool {
    if *tps[i].Topic < *tps[j].Topic {
        return true
    } else if *tps[i].Topic > *tps[j].Topic {
        return false
    }
    return tps[i].Partition < tps[j].Partition
}

func (tps TopicPartitions) Swap(i, j int) {
    tps[i], tps[j] = tps[j], tps[i]
}

// newCPartsFromTopicPartitions creates a new C rd_kafka_topic_partition_list_t
// from a TopicPartition slice.
func newCPartsFromTopicPartitions(partitions []TopicPartition) (cparts *C.rd_kafka_topic_partition_list_t) {
    cparts = C.rd_kafka_topic_partition_list_new(C.int(len(partitions)))
    for _, part := range partitions {
        ctopic := C.CString(*part.Topic)
        defer C.free(unsafe.Pointer(ctopic)) // freed when this function returns
        rktpar := C.rd_kafka_topic_partition_list_add(cparts, ctopic, C.int32_t(part.Partition))
        rktpar.offset = C.int64_t(part.Offset)
    }

    return cparts
}

// setupTopicPartitionFromCrktpar populates a TopicPartition from a C
// rd_kafka_topic_partition_t, mapping any per-partition error.
func setupTopicPartitionFromCrktpar(partition *TopicPartition, crktpar *C.rd_kafka_topic_partition_t) {

    topic := C.GoString(crktpar.topic)
    partition.Topic = &topic
    partition.Partition = int32(crktpar.partition)
    partition.Offset = Offset(crktpar.offset)
    if crktpar.err != C.RD_KAFKA_RESP_ERR_NO_ERROR {
        partition.Error = newError(crktpar.err)
    }
}

// newTopicPartitionsFromCparts converts a C rd_kafka_topic_partition_list_t
// to a TopicPartition slice.
func newTopicPartitionsFromCparts(cparts *C.rd_kafka_topic_partition_list_t) (partitions []TopicPartition) {

    partcnt := int(cparts.cnt)

    partitions = make([]TopicPartition, partcnt)
    for i := 0; i < partcnt; i++ {
        crktpar := C._c_rdkafka_topic_partition_list_entry(cparts, C.int(i))
        setupTopicPartitionFromCrktpar(&partitions[i], crktpar)
    }

    return partitions
}

// LibraryVersion returns the underlying librdkafka library version as a
// (version_int, version_str) tuple.
func LibraryVersion() (int, string) {
    ver := (int)(C.rd_kafka_version())
    verstr := C.GoString(C.rd_kafka_version_str())
    return ver, verstr
}