blob: 8a89edb1b7fd7491f84968df787386390e62b70c [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package kafka
2
3/**
4 * Copyright 2016 Confluent Inc.
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19import (
20 "fmt"
21 "os"
22 "unsafe"
23)
24
25/*
26#include <stdlib.h>
27#include <librdkafka/rdkafka.h>
28#include "glue_rdkafka.h"
29
30
31#ifdef RD_KAFKA_V_HEADERS
32void chdrs_to_tmphdrs (rd_kafka_headers_t *chdrs, tmphdr_t *tmphdrs) {
33 size_t i = 0;
34 const char *name;
35 const void *val;
36 size_t size;
37
38 while (!rd_kafka_header_get_all(chdrs, i,
39 &tmphdrs[i].key,
40 &tmphdrs[i].val,
41 (size_t *)&tmphdrs[i].size))
42 i++;
43}
44#endif
45
46rd_kafka_event_t *_rk_queue_poll (rd_kafka_queue_t *rkq, int timeoutMs,
47 rd_kafka_event_type_t *evtype,
48 fetched_c_msg_t *fcMsg,
49 rd_kafka_event_t *prev_rkev) {
50 rd_kafka_event_t *rkev;
51
52 if (prev_rkev)
53 rd_kafka_event_destroy(prev_rkev);
54
55 rkev = rd_kafka_queue_poll(rkq, timeoutMs);
56 *evtype = rd_kafka_event_type(rkev);
57
58 if (*evtype == RD_KAFKA_EVENT_FETCH) {
59#ifdef RD_KAFKA_V_HEADERS
60 rd_kafka_headers_t *hdrs;
61#endif
62
63 fcMsg->msg = (rd_kafka_message_t *)rd_kafka_event_message_next(rkev);
64 fcMsg->ts = rd_kafka_message_timestamp(fcMsg->msg, &fcMsg->tstype);
65
66#ifdef RD_KAFKA_V_HEADERS
67 if (!rd_kafka_message_headers(fcMsg->msg, &hdrs)) {
68 fcMsg->tmphdrsCnt = rd_kafka_header_cnt(hdrs);
69 fcMsg->tmphdrs = malloc(sizeof(*fcMsg->tmphdrs) * fcMsg->tmphdrsCnt);
70 chdrs_to_tmphdrs(hdrs, fcMsg->tmphdrs);
71 } else {
72#else
73 if (1) {
74#endif
75 fcMsg->tmphdrs = NULL;
76 fcMsg->tmphdrsCnt = 0;
77 }
78 }
79 return rkev;
80}
81*/
82import "C"
83
84// Event generic interface
85type Event interface {
86 // String returns a human-readable representation of the event
87 String() string
88}
89
90// Specific event types
91
92// Stats statistics event
93type Stats struct {
94 statsJSON string
95}
96
97func (e Stats) String() string {
98 return e.statsJSON
99}
100
101// AssignedPartitions consumer group rebalance event: assigned partition set
102type AssignedPartitions struct {
103 Partitions []TopicPartition
104}
105
106func (e AssignedPartitions) String() string {
107 return fmt.Sprintf("AssignedPartitions: %v", e.Partitions)
108}
109
110// RevokedPartitions consumer group rebalance event: revoked partition set
111type RevokedPartitions struct {
112 Partitions []TopicPartition
113}
114
115func (e RevokedPartitions) String() string {
116 return fmt.Sprintf("RevokedPartitions: %v", e.Partitions)
117}
118
119// PartitionEOF consumer reached end of partition
120type PartitionEOF TopicPartition
121
122func (p PartitionEOF) String() string {
123 return fmt.Sprintf("EOF at %s", TopicPartition(p))
124}
125
126// OffsetsCommitted reports committed offsets
127type OffsetsCommitted struct {
128 Error error
129 Offsets []TopicPartition
130}
131
132func (o OffsetsCommitted) String() string {
133 return fmt.Sprintf("OffsetsCommitted (%v, %v)", o.Error, o.Offsets)
134}
135
136// eventPoll polls an event from the handler's C rd_kafka_queue_t,
137// translates it into an Event type and then sends on `channel` if non-nil, else returns the Event.
138// term_chan is an optional channel to monitor along with producing to channel
139// to indicate that `channel` is being terminated.
140// returns (event Event, terminate Bool) tuple, where Terminate indicates
141// if termChan received a termination event.
142func (h *handle) eventPoll(channel chan Event, timeoutMs int, maxEvents int, termChan chan bool) (Event, bool) {
143
144 var prevRkev *C.rd_kafka_event_t
145 term := false
146
147 var retval Event
148
149 if channel == nil {
150 maxEvents = 1
151 }
152out:
153 for evcnt := 0; evcnt < maxEvents; evcnt++ {
154 var evtype C.rd_kafka_event_type_t
155 var fcMsg C.fetched_c_msg_t
156 rkev := C._rk_queue_poll(h.rkq, C.int(timeoutMs), &evtype, &fcMsg, prevRkev)
157 prevRkev = rkev
158 timeoutMs = 0
159
160 retval = nil
161
162 switch evtype {
163 case C.RD_KAFKA_EVENT_FETCH:
164 // Consumer fetch event, new message.
165 // Extracted into temporary fcMsg for optimization
166 retval = h.newMessageFromFcMsg(&fcMsg)
167
168 case C.RD_KAFKA_EVENT_REBALANCE:
169 // Consumer rebalance event
170 // If the app provided a RebalanceCb to Subscribe*() or
171 // has go.application.rebalance.enable=true we create an event
172 // and forward it to the application thru the RebalanceCb or the
173 // Events channel respectively.
174 // Since librdkafka requires the rebalance event to be "acked" by
175 // the application to synchronize state we keep track of if the
176 // application performed Assign() or Unassign(), but this only works for
177 // the non-channel case. For the channel case we assume the application
178 // calls Assign() / Unassign().
179 // Failure to do so will "hang" the consumer, e.g., it wont start consuming
180 // and it wont close cleanly, so this error case should be visible
181 // immediately to the application developer.
182 appReassigned := false
183 if C.rd_kafka_event_error(rkev) == C.RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS {
184 if h.currAppRebalanceEnable {
185 // Application must perform Assign() call
186 var ev AssignedPartitions
187 ev.Partitions = newTopicPartitionsFromCparts(C.rd_kafka_event_topic_partition_list(rkev))
188 if channel != nil || h.c.rebalanceCb == nil {
189 retval = ev
190 appReassigned = true
191 } else {
192 appReassigned = h.c.rebalance(ev)
193 }
194 }
195
196 if !appReassigned {
197 C.rd_kafka_assign(h.rk, C.rd_kafka_event_topic_partition_list(rkev))
198 }
199 } else {
200 if h.currAppRebalanceEnable {
201 // Application must perform Unassign() call
202 var ev RevokedPartitions
203 ev.Partitions = newTopicPartitionsFromCparts(C.rd_kafka_event_topic_partition_list(rkev))
204 if channel != nil || h.c.rebalanceCb == nil {
205 retval = ev
206 appReassigned = true
207 } else {
208 appReassigned = h.c.rebalance(ev)
209 }
210 }
211
212 if !appReassigned {
213 C.rd_kafka_assign(h.rk, nil)
214 }
215 }
216
217 case C.RD_KAFKA_EVENT_ERROR:
218 // Error event
219 cErr := C.rd_kafka_event_error(rkev)
220 switch cErr {
221 case C.RD_KAFKA_RESP_ERR__PARTITION_EOF:
222 crktpar := C.rd_kafka_event_topic_partition(rkev)
223 if crktpar == nil {
224 break
225 }
226
227 defer C.rd_kafka_topic_partition_destroy(crktpar)
228 var peof PartitionEOF
229 setupTopicPartitionFromCrktpar((*TopicPartition)(&peof), crktpar)
230
231 retval = peof
232 default:
233 retval = newErrorFromCString(cErr, C.rd_kafka_event_error_string(rkev))
234 }
235
236 case C.RD_KAFKA_EVENT_STATS:
237 retval = &Stats{C.GoString(C.rd_kafka_event_stats(rkev))}
238
239 case C.RD_KAFKA_EVENT_DR:
240 // Producer Delivery Report event
241 // Each such event contains delivery reports for all
242 // messages in the produced batch.
243 // Forward delivery reports to per-message's response channel
244 // or to the global Producer.Events channel, or none.
245 rkmessages := make([]*C.rd_kafka_message_t, int(C.rd_kafka_event_message_count(rkev)))
246
247 cnt := int(C.rd_kafka_event_message_array(rkev, (**C.rd_kafka_message_t)(unsafe.Pointer(&rkmessages[0])), C.size_t(len(rkmessages))))
248
249 for _, rkmessage := range rkmessages[:cnt] {
250 msg := h.newMessageFromC(rkmessage)
251 var ch *chan Event
252
253 if rkmessage._private != nil {
254 // Find cgoif by id
255 cg, found := h.cgoGet((int)((uintptr)(rkmessage._private)))
256 if found {
257 cdr := cg.(cgoDr)
258
259 if cdr.deliveryChan != nil {
260 ch = &cdr.deliveryChan
261 }
262 msg.Opaque = cdr.opaque
263 }
264 }
265
266 if ch == nil && h.fwdDr {
267 ch = &channel
268 }
269
270 if ch != nil {
271 select {
272 case *ch <- msg:
273 case <-termChan:
274 break out
275 }
276
277 } else {
278 retval = msg
279 break out
280 }
281 }
282
283 case C.RD_KAFKA_EVENT_OFFSET_COMMIT:
284 // Offsets committed
285 cErr := C.rd_kafka_event_error(rkev)
286 coffsets := C.rd_kafka_event_topic_partition_list(rkev)
287 var offsets []TopicPartition
288 if coffsets != nil {
289 offsets = newTopicPartitionsFromCparts(coffsets)
290 }
291
292 if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
293 retval = OffsetsCommitted{newErrorFromCString(cErr, C.rd_kafka_event_error_string(rkev)), offsets}
294 } else {
295 retval = OffsetsCommitted{nil, offsets}
296 }
297
298 case C.RD_KAFKA_EVENT_NONE:
299 // poll timed out: no events available
300 break out
301
302 default:
303 if rkev != nil {
304 fmt.Fprintf(os.Stderr, "Ignored event %s\n",
305 C.GoString(C.rd_kafka_event_name(rkev)))
306 }
307
308 }
309
310 if retval != nil {
311 if channel != nil {
312 select {
313 case channel <- retval:
314 case <-termChan:
315 retval = nil
316 term = true
317 break out
318 }
319 } else {
320 break out
321 }
322 }
323 }
324
325 if prevRkev != nil {
326 C.rd_kafka_event_destroy(prevRkev)
327 }
328
329 return retval, term
330}