blob: 5c42ece9e75fab4d1c4306f7168cd0cbef9fab5c [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package kafka
2
3/**
4 * Copyright 2016 Confluent Inc.
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19import (
20 "fmt"
21 "math"
22 "time"
23 "unsafe"
24)
25
26/*
27#include <stdlib.h>
28#include <librdkafka/rdkafka.h>
29
30
31static rd_kafka_topic_partition_t *_c_rdkafka_topic_partition_list_entry(rd_kafka_topic_partition_list_t *rktparlist, int idx) {
32 return idx < rktparlist->cnt ? &rktparlist->elems[idx] : NULL;
33}
34*/
35import "C"
36
37// RebalanceCb provides a per-Subscribe*() rebalance event callback.
38// The passed Event will be either AssignedPartitions or RevokedPartitions
39type RebalanceCb func(*Consumer, Event) error
40
41// Consumer implements a High-level Apache Kafka Consumer instance
42type Consumer struct {
43 events chan Event
44 handle handle
45 eventsChanEnable bool
46 readerTermChan chan bool
47 rebalanceCb RebalanceCb
48 appReassigned bool
49 appRebalanceEnable bool // config setting
50}
51
52// Strings returns a human readable name for a Consumer instance
53func (c *Consumer) String() string {
54 return c.handle.String()
55}
56
57// getHandle implements the Handle interface
58func (c *Consumer) gethandle() *handle {
59 return &c.handle
60}
61
62// Subscribe to a single topic
63// This replaces the current subscription
64func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error {
65 return c.SubscribeTopics([]string{topic}, rebalanceCb)
66}
67
68// SubscribeTopics subscribes to the provided list of topics.
69// This replaces the current subscription.
70func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) (err error) {
71 ctopics := C.rd_kafka_topic_partition_list_new(C.int(len(topics)))
72 defer C.rd_kafka_topic_partition_list_destroy(ctopics)
73
74 for _, topic := range topics {
75 ctopic := C.CString(topic)
76 defer C.free(unsafe.Pointer(ctopic))
77 C.rd_kafka_topic_partition_list_add(ctopics, ctopic, C.RD_KAFKA_PARTITION_UA)
78 }
79
80 e := C.rd_kafka_subscribe(c.handle.rk, ctopics)
81 if e != C.RD_KAFKA_RESP_ERR_NO_ERROR {
82 return newError(e)
83 }
84
85 c.rebalanceCb = rebalanceCb
86 c.handle.currAppRebalanceEnable = c.rebalanceCb != nil || c.appRebalanceEnable
87
88 return nil
89}
90
91// Unsubscribe from the current subscription, if any.
92func (c *Consumer) Unsubscribe() (err error) {
93 C.rd_kafka_unsubscribe(c.handle.rk)
94 return nil
95}
96
97// Assign an atomic set of partitions to consume.
98// This replaces the current assignment.
99func (c *Consumer) Assign(partitions []TopicPartition) (err error) {
100 c.appReassigned = true
101
102 cparts := newCPartsFromTopicPartitions(partitions)
103 defer C.rd_kafka_topic_partition_list_destroy(cparts)
104
105 e := C.rd_kafka_assign(c.handle.rk, cparts)
106 if e != C.RD_KAFKA_RESP_ERR_NO_ERROR {
107 return newError(e)
108 }
109
110 return nil
111}
112
113// Unassign the current set of partitions to consume.
114func (c *Consumer) Unassign() (err error) {
115 c.appReassigned = true
116
117 e := C.rd_kafka_assign(c.handle.rk, nil)
118 if e != C.RD_KAFKA_RESP_ERR_NO_ERROR {
119 return newError(e)
120 }
121
122 return nil
123}
124
125// commit offsets for specified offsets.
126// If offsets is nil the currently assigned partitions' offsets are committed.
127// This is a blocking call, caller will need to wrap in go-routine to
128// get async or throw-away behaviour.
129func (c *Consumer) commit(offsets []TopicPartition) (committedOffsets []TopicPartition, err error) {
130 var rkqu *C.rd_kafka_queue_t
131
132 rkqu = C.rd_kafka_queue_new(c.handle.rk)
133 defer C.rd_kafka_queue_destroy(rkqu)
134
135 var coffsets *C.rd_kafka_topic_partition_list_t
136 if offsets != nil {
137 coffsets = newCPartsFromTopicPartitions(offsets)
138 defer C.rd_kafka_topic_partition_list_destroy(coffsets)
139 }
140
141 cErr := C.rd_kafka_commit_queue(c.handle.rk, coffsets, rkqu, nil, nil)
142 if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
143 return nil, newError(cErr)
144 }
145
146 rkev := C.rd_kafka_queue_poll(rkqu, C.int(-1))
147 if rkev == nil {
148 // shouldn't happen
149 return nil, newError(C.RD_KAFKA_RESP_ERR__DESTROY)
150 }
151 defer C.rd_kafka_event_destroy(rkev)
152
153 if C.rd_kafka_event_type(rkev) != C.RD_KAFKA_EVENT_OFFSET_COMMIT {
154 panic(fmt.Sprintf("Expected OFFSET_COMMIT, got %s",
155 C.GoString(C.rd_kafka_event_name(rkev))))
156 }
157
158 cErr = C.rd_kafka_event_error(rkev)
159 if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
160 return nil, newErrorFromCString(cErr, C.rd_kafka_event_error_string(rkev))
161 }
162
163 cRetoffsets := C.rd_kafka_event_topic_partition_list(rkev)
164 if cRetoffsets == nil {
165 // no offsets, no error
166 return nil, nil
167 }
168 committedOffsets = newTopicPartitionsFromCparts(cRetoffsets)
169
170 return committedOffsets, nil
171}
172
173// Commit offsets for currently assigned partitions
174// This is a blocking call.
175// Returns the committed offsets on success.
176func (c *Consumer) Commit() ([]TopicPartition, error) {
177 return c.commit(nil)
178}
179
180// CommitMessage commits offset based on the provided message.
181// This is a blocking call.
182// Returns the committed offsets on success.
183func (c *Consumer) CommitMessage(m *Message) ([]TopicPartition, error) {
184 if m.TopicPartition.Error != nil {
185 return nil, Error{ErrInvalidArg, "Can't commit errored message"}
186 }
187 offsets := []TopicPartition{m.TopicPartition}
188 offsets[0].Offset++
189 return c.commit(offsets)
190}
191
192// CommitOffsets commits the provided list of offsets
193// This is a blocking call.
194// Returns the committed offsets on success.
195func (c *Consumer) CommitOffsets(offsets []TopicPartition) ([]TopicPartition, error) {
196 return c.commit(offsets)
197}
198
199// StoreOffsets stores the provided list of offsets that will be committed
200// to the offset store according to `auto.commit.interval.ms` or manual
201// offset-less Commit().
202//
203// Returns the stored offsets on success. If at least one offset couldn't be stored,
204// an error and a list of offsets is returned. Each offset can be checked for
205// specific errors via its `.Error` member.
206func (c *Consumer) StoreOffsets(offsets []TopicPartition) (storedOffsets []TopicPartition, err error) {
207 coffsets := newCPartsFromTopicPartitions(offsets)
208 defer C.rd_kafka_topic_partition_list_destroy(coffsets)
209
210 cErr := C.rd_kafka_offsets_store(c.handle.rk, coffsets)
211
212 // coffsets might be annotated with an error
213 storedOffsets = newTopicPartitionsFromCparts(coffsets)
214
215 if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
216 return storedOffsets, newError(cErr)
217 }
218
219 return storedOffsets, nil
220}
221
222// Seek seeks the given topic partitions using the offset from the TopicPartition.
223//
224// If timeoutMs is not 0 the call will wait this long for the
225// seek to be performed. If the timeout is reached the internal state
226// will be unknown and this function returns ErrTimedOut.
227// If timeoutMs is 0 it will initiate the seek but return
228// immediately without any error reporting (e.g., async).
229//
230// Seek() may only be used for partitions already being consumed
231// (through Assign() or implicitly through a self-rebalanced Subscribe()).
232// To set the starting offset it is preferred to use Assign() and provide
233// a starting offset for each partition.
234//
235// Returns an error on failure or nil otherwise.
236func (c *Consumer) Seek(partition TopicPartition, timeoutMs int) error {
237 rkt := c.handle.getRkt(*partition.Topic)
238 cErr := C.rd_kafka_seek(rkt,
239 C.int32_t(partition.Partition),
240 C.int64_t(partition.Offset),
241 C.int(timeoutMs))
242 if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
243 return newError(cErr)
244 }
245 return nil
246}
247
248// Poll the consumer for messages or events.
249//
250// Will block for at most timeoutMs milliseconds
251//
252// The following callbacks may be triggered:
253// Subscribe()'s rebalanceCb
254//
255// Returns nil on timeout, else an Event
256func (c *Consumer) Poll(timeoutMs int) (event Event) {
257 ev, _ := c.handle.eventPoll(nil, timeoutMs, 1, nil)
258 return ev
259}
260
261// Events returns the Events channel (if enabled)
262func (c *Consumer) Events() chan Event {
263 return c.events
264}
265
266// ReadMessage polls the consumer for a message.
267//
268// This is a conveniance API that wraps Poll() and only returns
269// messages or errors. All other event types are discarded.
270//
271// The call will block for at most `timeout` waiting for
272// a new message or error. `timeout` may be set to -1 for
273// indefinite wait.
274//
275// Timeout is returned as (nil, err) where err is `kafka.(Error).Code == Kafka.ErrTimedOut`.
276//
277// Messages are returned as (msg, nil),
278// while general errors are returned as (nil, err),
279// and partition-specific errors are returned as (msg, err) where
280// msg.TopicPartition provides partition-specific information (such as topic, partition and offset).
281//
282// All other event types, such as PartitionEOF, AssignedPartitions, etc, are silently discarded.
283//
284func (c *Consumer) ReadMessage(timeout time.Duration) (*Message, error) {
285
286 var absTimeout time.Time
287 var timeoutMs int
288
289 if timeout > 0 {
290 absTimeout = time.Now().Add(timeout)
291 timeoutMs = (int)(timeout.Seconds() * 1000.0)
292 } else {
293 timeoutMs = (int)(timeout)
294 }
295
296 for {
297 ev := c.Poll(timeoutMs)
298
299 switch e := ev.(type) {
300 case *Message:
301 if e.TopicPartition.Error != nil {
302 return e, e.TopicPartition.Error
303 }
304 return e, nil
305 case Error:
306 return nil, e
307 default:
308 // Ignore other event types
309 }
310
311 if timeout > 0 {
312 // Calculate remaining time
313 timeoutMs = int(math.Max(0.0, absTimeout.Sub(time.Now()).Seconds()*1000.0))
314 }
315
316 if timeoutMs == 0 && ev == nil {
317 return nil, newError(C.RD_KAFKA_RESP_ERR__TIMED_OUT)
318 }
319
320 }
321
322}
323
324// Close Consumer instance.
325// The object is no longer usable after this call.
326func (c *Consumer) Close() (err error) {
327
328 if c.eventsChanEnable {
329 // Wait for consumerReader() to terminate (by closing readerTermChan)
330 close(c.readerTermChan)
331 c.handle.waitTerminated(1)
332 close(c.events)
333 }
334
335 C.rd_kafka_queue_destroy(c.handle.rkq)
336 c.handle.rkq = nil
337
338 e := C.rd_kafka_consumer_close(c.handle.rk)
339 if e != C.RD_KAFKA_RESP_ERR_NO_ERROR {
340 return newError(e)
341 }
342
343 c.handle.cleanup()
344
345 C.rd_kafka_destroy(c.handle.rk)
346
347 return nil
348}
349
350// NewConsumer creates a new high-level Consumer instance.
351//
352// Supported special configuration properties:
353// go.application.rebalance.enable (bool, false) - Forward rebalancing responsibility to application via the Events() channel.
354// If set to true the app must handle the AssignedPartitions and
355// RevokedPartitions events and call Assign() and Unassign()
356// respectively.
357// go.events.channel.enable (bool, false) - Enable the Events() channel. Messages and events will be pushed on the Events() channel and the Poll() interface will be disabled. (Experimental)
358// go.events.channel.size (int, 1000) - Events() channel size
359//
360// WARNING: Due to the buffering nature of channels (and queues in general) the
361// use of the events channel risks receiving outdated events and
362// messages. Minimizing go.events.channel.size reduces the risk
363// and number of outdated events and messages but does not eliminate
364// the factor completely. With a channel size of 1 at most one
365// event or message may be outdated.
366func NewConsumer(conf *ConfigMap) (*Consumer, error) {
367
368 err := versionCheck()
369 if err != nil {
370 return nil, err
371 }
372
373 // before we do anything with the configuration, create a copy such that
374 // the original is not mutated.
375 confCopy := conf.clone()
376
377 groupid, _ := confCopy.get("group.id", nil)
378 if groupid == nil {
379 // without a group.id the underlying cgrp subsystem in librdkafka wont get started
380 // and without it there is no way to consume assigned partitions.
381 // So for now require the group.id, this might change in the future.
382 return nil, newErrorFromString(ErrInvalidArg, "Required property group.id not set")
383 }
384
385 c := &Consumer{}
386
387 v, err := confCopy.extract("go.application.rebalance.enable", false)
388 if err != nil {
389 return nil, err
390 }
391 c.appRebalanceEnable = v.(bool)
392
393 v, err = confCopy.extract("go.events.channel.enable", false)
394 if err != nil {
395 return nil, err
396 }
397 c.eventsChanEnable = v.(bool)
398
399 v, err = confCopy.extract("go.events.channel.size", 1000)
400 if err != nil {
401 return nil, err
402 }
403 eventsChanSize := v.(int)
404
405 cConf, err := confCopy.convert()
406 if err != nil {
407 return nil, err
408 }
409 cErrstr := (*C.char)(C.malloc(C.size_t(256)))
410 defer C.free(unsafe.Pointer(cErrstr))
411
412 C.rd_kafka_conf_set_events(cConf, C.RD_KAFKA_EVENT_REBALANCE|C.RD_KAFKA_EVENT_OFFSET_COMMIT|C.RD_KAFKA_EVENT_STATS|C.RD_KAFKA_EVENT_ERROR)
413
414 c.handle.rk = C.rd_kafka_new(C.RD_KAFKA_CONSUMER, cConf, cErrstr, 256)
415 if c.handle.rk == nil {
416 return nil, newErrorFromCString(C.RD_KAFKA_RESP_ERR__INVALID_ARG, cErrstr)
417 }
418
419 C.rd_kafka_poll_set_consumer(c.handle.rk)
420
421 c.handle.c = c
422 c.handle.setup()
423 c.handle.rkq = C.rd_kafka_queue_get_consumer(c.handle.rk)
424 if c.handle.rkq == nil {
425 // no cgrp (no group.id configured), revert to main queue.
426 c.handle.rkq = C.rd_kafka_queue_get_main(c.handle.rk)
427 }
428
429 if c.eventsChanEnable {
430 c.events = make(chan Event, eventsChanSize)
431 c.readerTermChan = make(chan bool)
432
433 /* Start rdkafka consumer queue reader -> events writer goroutine */
434 go consumerReader(c, c.readerTermChan)
435 }
436
437 return c, nil
438}
439
440// rebalance calls the application's rebalance callback, if any.
441// Returns true if the underlying assignment was updated, else false.
442func (c *Consumer) rebalance(ev Event) bool {
443 c.appReassigned = false
444
445 if c.rebalanceCb != nil {
446 c.rebalanceCb(c, ev)
447 }
448
449 return c.appReassigned
450}
451
452// consumerReader reads messages and events from the librdkafka consumer queue
453// and posts them on the consumer channel.
454// Runs until termChan closes
455func consumerReader(c *Consumer, termChan chan bool) {
456
457out:
458 for true {
459 select {
460 case _ = <-termChan:
461 break out
462 default:
463 _, term := c.handle.eventPoll(c.events, 100, 1000, termChan)
464 if term {
465 break out
466 }
467
468 }
469 }
470
471 c.handle.terminatedChan <- "consumerReader"
472 return
473
474}
475
476// GetMetadata queries broker for cluster and topic metadata.
477// If topic is non-nil only information about that topic is returned, else if
478// allTopics is false only information about locally used topics is returned,
479// else information about all topics is returned.
480// GetMetadata is equivalent to listTopics, describeTopics and describeCluster in the Java API.
481func (c *Consumer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error) {
482 return getMetadata(c, topic, allTopics, timeoutMs)
483}
484
485// QueryWatermarkOffsets returns the broker's low and high offsets for the given topic
486// and partition.
487func (c *Consumer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error) {
488 return queryWatermarkOffsets(c, topic, partition, timeoutMs)
489}
490
491// OffsetsForTimes looks up offsets by timestamp for the given partitions.
492//
493// The returned offset for each partition is the earliest offset whose
494// timestamp is greater than or equal to the given timestamp in the
495// corresponding partition.
496//
497// The timestamps to query are represented as `.Offset` in the `times`
498// argument and the looked up offsets are represented as `.Offset` in the returned
499// `offsets` list.
500//
501// The function will block for at most timeoutMs milliseconds.
502//
503// Duplicate Topic+Partitions are not supported.
504// Per-partition errors may be returned in the `.Error` field.
505func (c *Consumer) OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error) {
506 return offsetsForTimes(c, times, timeoutMs)
507}
508
509// Subscription returns the current subscription as set by Subscribe()
510func (c *Consumer) Subscription() (topics []string, err error) {
511 var cTopics *C.rd_kafka_topic_partition_list_t
512
513 cErr := C.rd_kafka_subscription(c.handle.rk, &cTopics)
514 if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
515 return nil, newError(cErr)
516 }
517 defer C.rd_kafka_topic_partition_list_destroy(cTopics)
518
519 topicCnt := int(cTopics.cnt)
520 topics = make([]string, topicCnt)
521 for i := 0; i < topicCnt; i++ {
522 crktpar := C._c_rdkafka_topic_partition_list_entry(cTopics,
523 C.int(i))
524 topics[i] = C.GoString(crktpar.topic)
525 }
526
527 return topics, nil
528}
529
530// Assignment returns the current partition assignments
531func (c *Consumer) Assignment() (partitions []TopicPartition, err error) {
532 var cParts *C.rd_kafka_topic_partition_list_t
533
534 cErr := C.rd_kafka_assignment(c.handle.rk, &cParts)
535 if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
536 return nil, newError(cErr)
537 }
538 defer C.rd_kafka_topic_partition_list_destroy(cParts)
539
540 partitions = newTopicPartitionsFromCparts(cParts)
541
542 return partitions, nil
543}
544
545// Committed retrieves committed offsets for the given set of partitions
546func (c *Consumer) Committed(partitions []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error) {
547 cparts := newCPartsFromTopicPartitions(partitions)
548 defer C.rd_kafka_topic_partition_list_destroy(cparts)
549 cerr := C.rd_kafka_committed(c.handle.rk, cparts, C.int(timeoutMs))
550 if cerr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
551 return nil, newError(cerr)
552 }
553
554 return newTopicPartitionsFromCparts(cparts), nil
555}
556
557// Pause consumption for the provided list of partitions
558//
559// Note that messages already enqueued on the consumer's Event channel
560// (if `go.events.channel.enable` has been set) will NOT be purged by
561// this call, set `go.events.channel.size` accordingly.
562func (c *Consumer) Pause(partitions []TopicPartition) (err error) {
563 cparts := newCPartsFromTopicPartitions(partitions)
564 defer C.rd_kafka_topic_partition_list_destroy(cparts)
565 cerr := C.rd_kafka_pause_partitions(c.handle.rk, cparts)
566 if cerr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
567 return newError(cerr)
568 }
569 return nil
570}
571
572// Resume consumption for the provided list of partitions
573func (c *Consumer) Resume(partitions []TopicPartition) (err error) {
574 cparts := newCPartsFromTopicPartitions(partitions)
575 defer C.rd_kafka_topic_partition_list_destroy(cparts)
576 cerr := C.rd_kafka_resume_partitions(c.handle.rk, cparts)
577 if cerr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
578 return newError(cerr)
579 }
580 return nil
581}