blob: 8a89edb1b7fd7491f84968df787386390e62b70c [file] [log] [blame]
package kafka
/**
* Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import (
"fmt"
"os"
"unsafe"
)
/*
#include <stdlib.h>
#include <librdkafka/rdkafka.h>
#include "glue_rdkafka.h"
#ifdef RD_KAFKA_V_HEADERS
void chdrs_to_tmphdrs (rd_kafka_headers_t *chdrs, tmphdr_t *tmphdrs) {
size_t i = 0;
const char *name;
const void *val;
size_t size;
while (!rd_kafka_header_get_all(chdrs, i,
&tmphdrs[i].key,
&tmphdrs[i].val,
(size_t *)&tmphdrs[i].size))
i++;
}
#endif
rd_kafka_event_t *_rk_queue_poll (rd_kafka_queue_t *rkq, int timeoutMs,
rd_kafka_event_type_t *evtype,
fetched_c_msg_t *fcMsg,
rd_kafka_event_t *prev_rkev) {
rd_kafka_event_t *rkev;
if (prev_rkev)
rd_kafka_event_destroy(prev_rkev);
rkev = rd_kafka_queue_poll(rkq, timeoutMs);
*evtype = rd_kafka_event_type(rkev);
if (*evtype == RD_KAFKA_EVENT_FETCH) {
#ifdef RD_KAFKA_V_HEADERS
rd_kafka_headers_t *hdrs;
#endif
fcMsg->msg = (rd_kafka_message_t *)rd_kafka_event_message_next(rkev);
fcMsg->ts = rd_kafka_message_timestamp(fcMsg->msg, &fcMsg->tstype);
#ifdef RD_KAFKA_V_HEADERS
if (!rd_kafka_message_headers(fcMsg->msg, &hdrs)) {
fcMsg->tmphdrsCnt = rd_kafka_header_cnt(hdrs);
fcMsg->tmphdrs = malloc(sizeof(*fcMsg->tmphdrs) * fcMsg->tmphdrsCnt);
chdrs_to_tmphdrs(hdrs, fcMsg->tmphdrs);
} else {
#else
if (1) {
#endif
fcMsg->tmphdrs = NULL;
fcMsg->tmphdrsCnt = 0;
}
}
return rkev;
}
*/
import "C"
// Event generic interface
type Event interface {
// String returns a human-readable representation of the event
String() string
}
// Specific event types
// Stats statistics event
type Stats struct {
statsJSON string
}
func (e Stats) String() string {
return e.statsJSON
}
// AssignedPartitions consumer group rebalance event: assigned partition set
type AssignedPartitions struct {
Partitions []TopicPartition
}
func (e AssignedPartitions) String() string {
return fmt.Sprintf("AssignedPartitions: %v", e.Partitions)
}
// RevokedPartitions consumer group rebalance event: revoked partition set
type RevokedPartitions struct {
Partitions []TopicPartition
}
func (e RevokedPartitions) String() string {
return fmt.Sprintf("RevokedPartitions: %v", e.Partitions)
}
// PartitionEOF consumer reached end of partition
type PartitionEOF TopicPartition
func (p PartitionEOF) String() string {
return fmt.Sprintf("EOF at %s", TopicPartition(p))
}
// OffsetsCommitted reports committed offsets
type OffsetsCommitted struct {
Error error
Offsets []TopicPartition
}
func (o OffsetsCommitted) String() string {
return fmt.Sprintf("OffsetsCommitted (%v, %v)", o.Error, o.Offsets)
}
// eventPoll polls an event from the handler's C rd_kafka_queue_t,
// translates it into an Event type and then sends on `channel` if non-nil, else returns the Event.
// term_chan is an optional channel to monitor along with producing to channel
// to indicate that `channel` is being terminated.
// returns (event Event, terminate Bool) tuple, where Terminate indicates
// if termChan received a termination event.
func (h *handle) eventPoll(channel chan Event, timeoutMs int, maxEvents int, termChan chan bool) (Event, bool) {
var prevRkev *C.rd_kafka_event_t
term := false
var retval Event
if channel == nil {
maxEvents = 1
}
out:
for evcnt := 0; evcnt < maxEvents; evcnt++ {
var evtype C.rd_kafka_event_type_t
var fcMsg C.fetched_c_msg_t
rkev := C._rk_queue_poll(h.rkq, C.int(timeoutMs), &evtype, &fcMsg, prevRkev)
prevRkev = rkev
timeoutMs = 0
retval = nil
switch evtype {
case C.RD_KAFKA_EVENT_FETCH:
// Consumer fetch event, new message.
// Extracted into temporary fcMsg for optimization
retval = h.newMessageFromFcMsg(&fcMsg)
case C.RD_KAFKA_EVENT_REBALANCE:
// Consumer rebalance event
// If the app provided a RebalanceCb to Subscribe*() or
// has go.application.rebalance.enable=true we create an event
// and forward it to the application thru the RebalanceCb or the
// Events channel respectively.
// Since librdkafka requires the rebalance event to be "acked" by
// the application to synchronize state we keep track of if the
// application performed Assign() or Unassign(), but this only works for
// the non-channel case. For the channel case we assume the application
// calls Assign() / Unassign().
// Failure to do so will "hang" the consumer, e.g., it wont start consuming
// and it wont close cleanly, so this error case should be visible
// immediately to the application developer.
appReassigned := false
if C.rd_kafka_event_error(rkev) == C.RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS {
if h.currAppRebalanceEnable {
// Application must perform Assign() call
var ev AssignedPartitions
ev.Partitions = newTopicPartitionsFromCparts(C.rd_kafka_event_topic_partition_list(rkev))
if channel != nil || h.c.rebalanceCb == nil {
retval = ev
appReassigned = true
} else {
appReassigned = h.c.rebalance(ev)
}
}
if !appReassigned {
C.rd_kafka_assign(h.rk, C.rd_kafka_event_topic_partition_list(rkev))
}
} else {
if h.currAppRebalanceEnable {
// Application must perform Unassign() call
var ev RevokedPartitions
ev.Partitions = newTopicPartitionsFromCparts(C.rd_kafka_event_topic_partition_list(rkev))
if channel != nil || h.c.rebalanceCb == nil {
retval = ev
appReassigned = true
} else {
appReassigned = h.c.rebalance(ev)
}
}
if !appReassigned {
C.rd_kafka_assign(h.rk, nil)
}
}
case C.RD_KAFKA_EVENT_ERROR:
// Error event
cErr := C.rd_kafka_event_error(rkev)
switch cErr {
case C.RD_KAFKA_RESP_ERR__PARTITION_EOF:
crktpar := C.rd_kafka_event_topic_partition(rkev)
if crktpar == nil {
break
}
defer C.rd_kafka_topic_partition_destroy(crktpar)
var peof PartitionEOF
setupTopicPartitionFromCrktpar((*TopicPartition)(&peof), crktpar)
retval = peof
default:
retval = newErrorFromCString(cErr, C.rd_kafka_event_error_string(rkev))
}
case C.RD_KAFKA_EVENT_STATS:
retval = &Stats{C.GoString(C.rd_kafka_event_stats(rkev))}
case C.RD_KAFKA_EVENT_DR:
// Producer Delivery Report event
// Each such event contains delivery reports for all
// messages in the produced batch.
// Forward delivery reports to per-message's response channel
// or to the global Producer.Events channel, or none.
rkmessages := make([]*C.rd_kafka_message_t, int(C.rd_kafka_event_message_count(rkev)))
cnt := int(C.rd_kafka_event_message_array(rkev, (**C.rd_kafka_message_t)(unsafe.Pointer(&rkmessages[0])), C.size_t(len(rkmessages))))
for _, rkmessage := range rkmessages[:cnt] {
msg := h.newMessageFromC(rkmessage)
var ch *chan Event
if rkmessage._private != nil {
// Find cgoif by id
cg, found := h.cgoGet((int)((uintptr)(rkmessage._private)))
if found {
cdr := cg.(cgoDr)
if cdr.deliveryChan != nil {
ch = &cdr.deliveryChan
}
msg.Opaque = cdr.opaque
}
}
if ch == nil && h.fwdDr {
ch = &channel
}
if ch != nil {
select {
case *ch <- msg:
case <-termChan:
break out
}
} else {
retval = msg
break out
}
}
case C.RD_KAFKA_EVENT_OFFSET_COMMIT:
// Offsets committed
cErr := C.rd_kafka_event_error(rkev)
coffsets := C.rd_kafka_event_topic_partition_list(rkev)
var offsets []TopicPartition
if coffsets != nil {
offsets = newTopicPartitionsFromCparts(coffsets)
}
if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
retval = OffsetsCommitted{newErrorFromCString(cErr, C.rd_kafka_event_error_string(rkev)), offsets}
} else {
retval = OffsetsCommitted{nil, offsets}
}
case C.RD_KAFKA_EVENT_NONE:
// poll timed out: no events available
break out
default:
if rkev != nil {
fmt.Fprintf(os.Stderr, "Ignored event %s\n",
C.GoString(C.rd_kafka_event_name(rkev)))
}
}
if retval != nil {
if channel != nil {
select {
case channel <- retval:
case <-termChan:
retval = nil
term = true
break out
}
} else {
break out
}
}
}
if prevRkev != nil {
C.rd_kafka_event_destroy(prevRkev)
}
return retval, term
}