package kafka
/**
* Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import (
	"fmt"
	"math"
	"time"
	"unsafe"
)
/*
#include <stdlib.h>
#include <librdkafka/rdkafka.h>
static rd_kafka_topic_partition_t *_c_rdkafka_topic_partition_list_entry(rd_kafka_topic_partition_list_t *rktparlist, int idx) {
	return idx < rktparlist->cnt ? &rktparlist->elems[idx] : NULL;
}
*/
import "C"
// RebalanceCb provides a per-Subscribe*() rebalance event callback.
// The passed Event will be either AssignedPartitions or RevokedPartitions.
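//
// A minimal callback sketch; the Assign/Unassign calls mirror the
// go.application.rebalance.enable contract described at NewConsumer():
//
//	cb := func(c *kafka.Consumer, ev kafka.Event) error {
//		switch e := ev.(type) {
//		case kafka.AssignedPartitions:
//			return c.Assign(e.Partitions)
//		case kafka.RevokedPartitions:
//			return c.Unassign()
//		}
//		return nil
//	}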
type RebalanceCb func(*Consumer, Event) error
// Consumer implements a High-level Apache Kafka Consumer instance
type Consumer struct {
	events             chan Event
	handle             handle
	eventsChanEnable   bool
	readerTermChan     chan bool
	rebalanceCb        RebalanceCb
	appReassigned      bool
	appRebalanceEnable bool // config setting
}
// String returns a human readable name for a Consumer instance
func (c *Consumer) String() string {
	return c.handle.String()
}
// gethandle implements the Handle interface
func (c *Consumer) gethandle() *handle {
	return &c.handle
}
// Subscribe to a single topic.
// This replaces the current subscription.
func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error {
	return c.SubscribeTopics([]string{topic}, rebalanceCb)
}
// SubscribeTopics subscribes to the provided list of topics.
// This replaces the current subscription.
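//
// A minimal sketch (topic names are placeholders):
//
//	err := c.SubscribeTopics([]string{"events", "logs"}, nil)
//	if err != nil {
//		// handle subscription error
//	}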
func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) (err error) {
	ctopics := C.rd_kafka_topic_partition_list_new(C.int(len(topics)))
	defer C.rd_kafka_topic_partition_list_destroy(ctopics)

	for _, topic := range topics {
		ctopic := C.CString(topic)
		defer C.free(unsafe.Pointer(ctopic))
		C.rd_kafka_topic_partition_list_add(ctopics, ctopic, C.RD_KAFKA_PARTITION_UA)
	}

	e := C.rd_kafka_subscribe(c.handle.rk, ctopics)
	if e != C.RD_KAFKA_RESP_ERR_NO_ERROR {
		return newError(e)
	}

	c.rebalanceCb = rebalanceCb
	c.handle.currAppRebalanceEnable = c.rebalanceCb != nil || c.appRebalanceEnable

	return nil
}
// Unsubscribe from the current subscription, if any.
func (c *Consumer) Unsubscribe() (err error) {
	C.rd_kafka_unsubscribe(c.handle.rk)
	return nil
}
// Assign an atomic set of partitions to consume.
// This replaces the current assignment.
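//
// A sketch assigning two partitions of a hypothetical topic, one from the
// beginning and one at an absolute offset:
//
//	topic := "mytopic"
//	err := c.Assign([]kafka.TopicPartition{
//		{Topic: &topic, Partition: 0, Offset: kafka.OffsetBeginning},
//		{Topic: &topic, Partition: 1, Offset: 1234},
//	})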
func (c *Consumer) Assign(partitions []TopicPartition) (err error) {
	c.appReassigned = true

	cparts := newCPartsFromTopicPartitions(partitions)
	defer C.rd_kafka_topic_partition_list_destroy(cparts)

	e := C.rd_kafka_assign(c.handle.rk, cparts)
	if e != C.RD_KAFKA_RESP_ERR_NO_ERROR {
		return newError(e)
	}

	return nil
}
// Unassign the current set of partitions to consume.
func (c *Consumer) Unassign() (err error) {
	c.appReassigned = true

	e := C.rd_kafka_assign(c.handle.rk, nil)
	if e != C.RD_KAFKA_RESP_ERR_NO_ERROR {
		return newError(e)
	}

	return nil
}
// commit the specified offsets.
// If offsets is nil the currently assigned partitions' offsets are committed.
// This is a blocking call; the caller will need to wrap it in a goroutine to
// get asynchronous or fire-and-forget behaviour.
func (c *Consumer) commit(offsets []TopicPartition) (committedOffsets []TopicPartition, err error) {
	var rkqu *C.rd_kafka_queue_t

	rkqu = C.rd_kafka_queue_new(c.handle.rk)
	defer C.rd_kafka_queue_destroy(rkqu)

	var coffsets *C.rd_kafka_topic_partition_list_t
	if offsets != nil {
		coffsets = newCPartsFromTopicPartitions(offsets)
		defer C.rd_kafka_topic_partition_list_destroy(coffsets)
	}

	cErr := C.rd_kafka_commit_queue(c.handle.rk, coffsets, rkqu, nil, nil)
	if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
		return nil, newError(cErr)
	}

	rkev := C.rd_kafka_queue_poll(rkqu, C.int(-1))
	if rkev == nil {
		// shouldn't happen
		return nil, newError(C.RD_KAFKA_RESP_ERR__DESTROY)
	}
	defer C.rd_kafka_event_destroy(rkev)

	if C.rd_kafka_event_type(rkev) != C.RD_KAFKA_EVENT_OFFSET_COMMIT {
		panic(fmt.Sprintf("Expected OFFSET_COMMIT, got %s",
			C.GoString(C.rd_kafka_event_name(rkev))))
	}

	cErr = C.rd_kafka_event_error(rkev)
	if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
		return nil, newErrorFromCString(cErr, C.rd_kafka_event_error_string(rkev))
	}

	cRetoffsets := C.rd_kafka_event_topic_partition_list(rkev)
	if cRetoffsets == nil {
		// no offsets, no error
		return nil, nil
	}
	committedOffsets = newTopicPartitionsFromCparts(cRetoffsets)

	return committedOffsets, nil
}
// Commit offsets for currently assigned partitions.
// This is a blocking call.
// Returns the committed offsets on success.
func (c *Consumer) Commit() ([]TopicPartition, error) {
	return c.commit(nil)
}
// CommitMessage commits offset based on the provided message.
// This is a blocking call.
// Returns the committed offsets on success.
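//
// A sketch of an at-least-once consume loop; process() is a hypothetical
// application handler and the timeout is arbitrary:
//
//	for {
//		msg, err := c.ReadMessage(100 * time.Millisecond)
//		if err != nil {
//			continue // e.g. timeout; inspect the error in real code
//		}
//		process(msg)
//		c.CommitMessage(msg)
//	}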
func (c *Consumer) CommitMessage(m *Message) ([]TopicPartition, error) {
	if m.TopicPartition.Error != nil {
		return nil, Error{ErrInvalidArg, "Can't commit errored message"}
	}
	offsets := []TopicPartition{m.TopicPartition}
	offsets[0].Offset++
	return c.commit(offsets)
}
// CommitOffsets commits the provided list of offsets.
// This is a blocking call.
// Returns the committed offsets on success.
func (c *Consumer) CommitOffsets(offsets []TopicPartition) ([]TopicPartition, error) {
	return c.commit(offsets)
}
// StoreOffsets stores the provided list of offsets that will be committed
// to the offset store according to `auto.commit.interval.ms` or manual
// offset-less Commit().
//
// Returns the stored offsets on success. If at least one offset couldn't be stored,
// an error and a list of offsets is returned. Each offset can be checked for
// specific errors via its `.Error` member.
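//
// A sketch, assuming `enable.auto.offset.store` was set to false in the
// configuration and `msg` is a previously consumed *kafka.Message:
//
//	tp := msg.TopicPartition
//	tp.Offset++ // store the offset of the next message to consume
//	_, err := c.StoreOffsets([]kafka.TopicPartition{tp})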
func (c *Consumer) StoreOffsets(offsets []TopicPartition) (storedOffsets []TopicPartition, err error) {
	coffsets := newCPartsFromTopicPartitions(offsets)
	defer C.rd_kafka_topic_partition_list_destroy(coffsets)

	cErr := C.rd_kafka_offsets_store(c.handle.rk, coffsets)

	// coffsets might be annotated with an error
	storedOffsets = newTopicPartitionsFromCparts(coffsets)

	if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
		return storedOffsets, newError(cErr)
	}

	return storedOffsets, nil
}
// Seek seeks the given topic partitions using the offset from the TopicPartition.
//
// If timeoutMs is not 0 the call will wait this long for the
// seek to be performed. If the timeout is reached the internal state
// will be unknown and this function returns ErrTimedOut.
// If timeoutMs is 0 the seek is initiated but the call returns
// immediately without any error reporting (i.e., asynchronously).
//
// Seek() may only be used for partitions already being consumed
// (through Assign() or implicitly through a self-rebalanced Subscribe()).
// To set the starting offset it is preferred to use Assign() and provide
// a starting offset for each partition.
//
// Returns an error on failure or nil otherwise.
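//
// A sketch rewinding one already-assigned partition to its beginning
// (topic name and the 1000 ms timeout are placeholders):
//
//	topic := "mytopic"
//	err := c.Seek(kafka.TopicPartition{
//		Topic:     &topic,
//		Partition: 0,
//		Offset:    kafka.OffsetBeginning,
//	}, 1000)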
func (c *Consumer) Seek(partition TopicPartition, timeoutMs int) error {
	rkt := c.handle.getRkt(*partition.Topic)
	cErr := C.rd_kafka_seek(rkt,
		C.int32_t(partition.Partition),
		C.int64_t(partition.Offset),
		C.int(timeoutMs))
	if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
		return newError(cErr)
	}
	return nil
}
// Poll the consumer for messages or events.
//
// Will block for at most timeoutMs milliseconds.
//
// The following callbacks may be triggered:
//
//	Subscribe()'s rebalanceCb
//
// Returns nil on timeout, else an Event.
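//
// A typical poll loop sketch (the 100 ms timeout is an arbitrary choice):
//
//	for {
//		switch e := c.Poll(100).(type) {
//		case *kafka.Message:
//			fmt.Printf("Message on %s: %s\n", e.TopicPartition, e.Value)
//		case kafka.Error:
//			fmt.Printf("Error: %v\n", e)
//		case nil:
//			// Poll timed out, nothing to do
//		default:
//			// ignore other event types
//		}
//	}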
func (c *Consumer) Poll(timeoutMs int) (event Event) {
	ev, _ := c.handle.eventPoll(nil, timeoutMs, 1, nil)
	return ev
}
// Events returns the Events channel (if enabled)
func (c *Consumer) Events() chan Event {
	return c.events
}
// ReadMessage polls the consumer for a message.
//
// This is a convenience API that wraps Poll() and only returns
// messages or errors. All other event types are discarded.
//
// The call will block for at most `timeout` waiting for
// a new message or error. `timeout` may be set to -1 for
// indefinite wait.
//
// Timeout is returned as (nil, err) where
// err.(kafka.Error).Code() == kafka.ErrTimedOut.
//
// Messages are returned as (msg, nil),
// while general errors are returned as (nil, err),
// and partition-specific errors are returned as (msg, err) where
// msg.TopicPartition provides partition-specific information (such as topic, partition and offset).
//
// All other event types, such as PartitionEOF, AssignedPartitions, etc, are silently discarded.
//
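// A minimal sketch (the 5 s timeout is arbitrary; -1 waits indefinitely):
//
//	msg, err := c.ReadMessage(5 * time.Second)
//	if err != nil {
//		// e.g. err.(kafka.Error).Code() == kafka.ErrTimedOut
//	} else {
//		fmt.Printf("%s: %s\n", msg.TopicPartition, msg.Value)
//	}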
func (c *Consumer) ReadMessage(timeout time.Duration) (*Message, error) {
	var absTimeout time.Time
	var timeoutMs int

	if timeout > 0 {
		absTimeout = time.Now().Add(timeout)
		timeoutMs = (int)(timeout.Seconds() * 1000.0)
	} else {
		timeoutMs = (int)(timeout)
	}

	for {
		ev := c.Poll(timeoutMs)

		switch e := ev.(type) {
		case *Message:
			if e.TopicPartition.Error != nil {
				return e, e.TopicPartition.Error
			}
			return e, nil
		case Error:
			return nil, e
		default:
			// Ignore other event types
		}

		if timeout > 0 {
			// Calculate remaining time
			timeoutMs = int(math.Max(0.0, absTimeout.Sub(time.Now()).Seconds()*1000.0))
		}

		if timeoutMs == 0 && ev == nil {
			return nil, newError(C.RD_KAFKA_RESP_ERR__TIMED_OUT)
		}
	}
}
// Close Consumer instance.
// The object is no longer usable after this call.
func (c *Consumer) Close() (err error) {
	if c.eventsChanEnable {
		// Wait for consumerReader() to terminate (by closing readerTermChan)
		close(c.readerTermChan)
		c.handle.waitTerminated(1)
		close(c.events)
	}

	C.rd_kafka_queue_destroy(c.handle.rkq)
	c.handle.rkq = nil

	e := C.rd_kafka_consumer_close(c.handle.rk)
	if e != C.RD_KAFKA_RESP_ERR_NO_ERROR {
		return newError(e)
	}

	c.handle.cleanup()

	C.rd_kafka_destroy(c.handle.rk)

	return nil
}
// NewConsumer creates a new high-level Consumer instance.
//
// Supported special configuration properties:
//
//	go.application.rebalance.enable (bool, false) - Forward rebalancing responsibility to application
//	    via the Events() channel. If set to true the app must handle the AssignedPartitions and
//	    RevokedPartitions events and call Assign() and Unassign() respectively.
//	go.events.channel.enable (bool, false) - Enable the Events() channel. Messages and events will be
//	    pushed on the Events() channel and the Poll() interface will be disabled. (Experimental)
//	go.events.channel.size (int, 1000) - Events() channel size.
//
// WARNING: Due to the buffering nature of channels (and queues in general) the
// use of the events channel risks receiving outdated events and
// messages. Minimizing go.events.channel.size reduces the risk
// and number of outdated events and messages but does not eliminate
// the factor completely. With a channel size of 1 at most one
// event or message may be outdated.
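//
// A construction sketch (the broker address, group id and topic name are
// placeholders to adapt):
//
//	c, err := kafka.NewConsumer(&kafka.ConfigMap{
//		"bootstrap.servers": "localhost:9092",
//		"group.id":          "mygroup",
//	})
//	if err != nil {
//		// handle construction error
//	}
//	defer c.Close()
//	err = c.Subscribe("mytopic", nil)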
func NewConsumer(conf *ConfigMap) (*Consumer, error) {
	err := versionCheck()
	if err != nil {
		return nil, err
	}

	// before we do anything with the configuration, create a copy such that
	// the original is not mutated.
	confCopy := conf.clone()

	groupid, _ := confCopy.get("group.id", nil)
	if groupid == nil {
		// without a group.id the underlying cgrp subsystem in librdkafka won't get started
		// and without it there is no way to consume assigned partitions.
		// So for now require the group.id, this might change in the future.
		return nil, newErrorFromString(ErrInvalidArg, "Required property group.id not set")
	}

	c := &Consumer{}

	v, err := confCopy.extract("go.application.rebalance.enable", false)
	if err != nil {
		return nil, err
	}
	c.appRebalanceEnable = v.(bool)

	v, err = confCopy.extract("go.events.channel.enable", false)
	if err != nil {
		return nil, err
	}
	c.eventsChanEnable = v.(bool)

	v, err = confCopy.extract("go.events.channel.size", 1000)
	if err != nil {
		return nil, err
	}
	eventsChanSize := v.(int)

	cConf, err := confCopy.convert()
	if err != nil {
		return nil, err
	}

	cErrstr := (*C.char)(C.malloc(C.size_t(256)))
	defer C.free(unsafe.Pointer(cErrstr))

	C.rd_kafka_conf_set_events(cConf, C.RD_KAFKA_EVENT_REBALANCE|C.RD_KAFKA_EVENT_OFFSET_COMMIT|C.RD_KAFKA_EVENT_STATS|C.RD_KAFKA_EVENT_ERROR)

	c.handle.rk = C.rd_kafka_new(C.RD_KAFKA_CONSUMER, cConf, cErrstr, 256)
	if c.handle.rk == nil {
		return nil, newErrorFromCString(C.RD_KAFKA_RESP_ERR__INVALID_ARG, cErrstr)
	}

	C.rd_kafka_poll_set_consumer(c.handle.rk)

	c.handle.c = c
	c.handle.setup()
	c.handle.rkq = C.rd_kafka_queue_get_consumer(c.handle.rk)
	if c.handle.rkq == nil {
		// no cgrp (no group.id configured), revert to main queue.
		c.handle.rkq = C.rd_kafka_queue_get_main(c.handle.rk)
	}

	if c.eventsChanEnable {
		c.events = make(chan Event, eventsChanSize)
		c.readerTermChan = make(chan bool)

		/* Start rdkafka consumer queue reader -> events writer goroutine */
		go consumerReader(c, c.readerTermChan)
	}

	return c, nil
}
// rebalance calls the application's rebalance callback, if any.
// Returns true if the underlying assignment was updated, else false.
func (c *Consumer) rebalance(ev Event) bool {
	c.appReassigned = false

	if c.rebalanceCb != nil {
		c.rebalanceCb(c, ev)
	}

	return c.appReassigned
}
// consumerReader reads messages and events from the librdkafka consumer queue
// and posts them on the consumer channel.
// Runs until termChan closes.
func consumerReader(c *Consumer, termChan chan bool) {
out:
	for {
		select {
		case <-termChan:
			break out
		default:
			_, term := c.handle.eventPoll(c.events, 100, 1000, termChan)
			if term {
				break out
			}
		}
	}

	c.handle.terminatedChan <- "consumerReader"
}
// GetMetadata queries broker for cluster and topic metadata.
// If topic is non-nil only information about that topic is returned, else if
// allTopics is false only information about locally used topics is returned,
// else information about all topics is returned.
// GetMetadata is equivalent to listTopics, describeTopics and describeCluster in the Java API.
func (c *Consumer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error) {
	return getMetadata(c, topic, allTopics, timeoutMs)
}
// QueryWatermarkOffsets returns the broker's low and high offsets for the given topic
// and partition.
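//
// A sketch computing the message count of one partition (topic name and
// the 5000 ms timeout are placeholders):
//
//	low, high, err := c.QueryWatermarkOffsets("mytopic", 0, 5000)
//	if err == nil {
//		fmt.Printf("partition holds %d messages\n", high-low)
//	}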
func (c *Consumer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error) {
	return queryWatermarkOffsets(c, topic, partition, timeoutMs)
}
// OffsetsForTimes looks up offsets by timestamp for the given partitions.
//
// The returned offset for each partition is the earliest offset whose
// timestamp is greater than or equal to the given timestamp in the
// corresponding partition.
//
// The timestamps to query are represented as `.Offset` in the `times`
// argument and the looked up offsets are represented as `.Offset` in the returned
// `offsets` list.
//
// The function will block for at most timeoutMs milliseconds.
//
// Duplicate Topic+Partitions are not supported.
// Per-partition errors may be returned in the `.Error` field.
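//
// A sketch resolving the offsets in effect one hour ago; note that the
// millisecond query timestamp travels in the Offset field (topic name and
// timeout are placeholders):
//
//	topic := "mytopic"
//	ts := time.Now().Add(-time.Hour).UnixNano() / int64(time.Millisecond)
//	offsets, err := c.OffsetsForTimes([]kafka.TopicPartition{
//		{Topic: &topic, Partition: 0, Offset: kafka.Offset(ts)},
//	}, 5000)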
func (c *Consumer) OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error) {
	return offsetsForTimes(c, times, timeoutMs)
}
// Subscription returns the current subscription as set by Subscribe()
func (c *Consumer) Subscription() (topics []string, err error) {
	var cTopics *C.rd_kafka_topic_partition_list_t

	cErr := C.rd_kafka_subscription(c.handle.rk, &cTopics)
	if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
		return nil, newError(cErr)
	}
	defer C.rd_kafka_topic_partition_list_destroy(cTopics)

	topicCnt := int(cTopics.cnt)
	topics = make([]string, topicCnt)
	for i := 0; i < topicCnt; i++ {
		crktpar := C._c_rdkafka_topic_partition_list_entry(cTopics,
			C.int(i))
		topics[i] = C.GoString(crktpar.topic)
	}

	return topics, nil
}
// Assignment returns the current partition assignments
func (c *Consumer) Assignment() (partitions []TopicPartition, err error) {
	var cParts *C.rd_kafka_topic_partition_list_t

	cErr := C.rd_kafka_assignment(c.handle.rk, &cParts)
	if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
		return nil, newError(cErr)
	}
	defer C.rd_kafka_topic_partition_list_destroy(cParts)

	partitions = newTopicPartitionsFromCparts(cParts)

	return partitions, nil
}
// Committed retrieves committed offsets for the given set of partitions
func (c *Consumer) Committed(partitions []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error) {
	cparts := newCPartsFromTopicPartitions(partitions)
	defer C.rd_kafka_topic_partition_list_destroy(cparts)
	cerr := C.rd_kafka_committed(c.handle.rk, cparts, C.int(timeoutMs))
	if cerr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
		return nil, newError(cerr)
	}

	return newTopicPartitionsFromCparts(cparts), nil
}
// Pause consumption for the provided list of partitions
//
// Note that messages already enqueued on the consumer's Events() channel
// (if `go.events.channel.enable` has been set) will NOT be purged by
// this call; set `go.events.channel.size` accordingly.
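//
// A sketch pausing and later resuming the full current assignment:
//
//	parts, err := c.Assignment()
//	if err == nil {
//		c.Pause(parts)
//		// ... later ...
//		c.Resume(parts)
//	}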
func (c *Consumer) Pause(partitions []TopicPartition) (err error) {
	cparts := newCPartsFromTopicPartitions(partitions)
	defer C.rd_kafka_topic_partition_list_destroy(cparts)
	cerr := C.rd_kafka_pause_partitions(c.handle.rk, cparts)
	if cerr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
		return newError(cerr)
	}
	return nil
}
// Resume consumption for the provided list of partitions
func (c *Consumer) Resume(partitions []TopicPartition) (err error) {
	cparts := newCPartsFromTopicPartitions(partitions)
	defer C.rd_kafka_topic_partition_list_destroy(cparts)
	cerr := C.rd_kafka_resume_partitions(c.handle.rk, cparts)
	if cerr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
		return newError(cerr)
	}
	return nil
}