/**
 * Copyright 2016 Confluent Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka

import (
    "fmt"
    "math"
    "time"
    "unsafe"
)

/*
#include <stdlib.h>
#include <librdkafka/rdkafka.h>
#include "glue_rdkafka.h"


#ifdef RD_KAFKA_V_HEADERS
// Convert tmphdrs to chdrs (created by this function).
// If tmphdr.size == -1: value is considered Null
//    tmphdr.size == 0:  value is considered empty (ignored)
//    tmphdr.size > 0:   value is considered non-empty
//
// WARNING: The header keys and values will be freed by this function.
void tmphdrs_to_chdrs (tmphdr_t *tmphdrs, size_t tmphdrsCnt,
                       rd_kafka_headers_t **chdrs) {
    size_t i;

    *chdrs = rd_kafka_headers_new(tmphdrsCnt);

    for (i = 0 ; i < tmphdrsCnt ; i++) {
        rd_kafka_header_add(*chdrs,
                            tmphdrs[i].key, -1,
                            tmphdrs[i].size == -1 ? NULL :
                            (tmphdrs[i].size == 0 ? "" : tmphdrs[i].val),
                            tmphdrs[i].size == -1 ? 0 : tmphdrs[i].size);
        if (tmphdrs[i].size > 0)
            free((void *)tmphdrs[i].val);
        free((void *)tmphdrs[i].key);
    }
}

#else
void free_tmphdrs (tmphdr_t *tmphdrs, size_t tmphdrsCnt) {
    size_t i;
    for (i = 0 ; i < tmphdrsCnt ; i++) {
        if (tmphdrs[i].size > 0)
            free((void *)tmphdrs[i].val);
        free((void *)tmphdrs[i].key);
    }
}
#endif


rd_kafka_resp_err_t do_produce (rd_kafka_t *rk,
                                rd_kafka_topic_t *rkt, int32_t partition,
                                int msgflags,
                                int valIsNull, void *val, size_t val_len,
                                int keyIsNull, void *key, size_t key_len,
                                int64_t timestamp,
                                tmphdr_t *tmphdrs, size_t tmphdrsCnt,
                                uintptr_t cgoid) {
    void *valp = valIsNull ? NULL : val;
    void *keyp = keyIsNull ? NULL : key;
#ifdef RD_KAFKA_V_TIMESTAMP
    rd_kafka_resp_err_t err;
#ifdef RD_KAFKA_V_HEADERS
    rd_kafka_headers_t *hdrs = NULL;
#endif
#endif


    if (tmphdrsCnt > 0) {
#ifdef RD_KAFKA_V_HEADERS
        tmphdrs_to_chdrs(tmphdrs, tmphdrsCnt, &hdrs);
#else
        free_tmphdrs(tmphdrs, tmphdrsCnt);
        return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;
#endif
    }


#ifdef RD_KAFKA_V_TIMESTAMP
    err = rd_kafka_producev(rk,
                            RD_KAFKA_V_RKT(rkt),
                            RD_KAFKA_V_PARTITION(partition),
                            RD_KAFKA_V_MSGFLAGS(msgflags),
                            RD_KAFKA_V_VALUE(valp, val_len),
                            RD_KAFKA_V_KEY(keyp, key_len),
                            RD_KAFKA_V_TIMESTAMP(timestamp),
#ifdef RD_KAFKA_V_HEADERS
                            RD_KAFKA_V_HEADERS(hdrs),
#endif
                            RD_KAFKA_V_OPAQUE((void *)cgoid),
                            RD_KAFKA_V_END);
#ifdef RD_KAFKA_V_HEADERS
    if (err && hdrs)
        rd_kafka_headers_destroy(hdrs);
#endif
    return err;
#else
    if (timestamp)
        return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;
    if (rd_kafka_produce(rkt, partition, msgflags,
                         valp, val_len,
                         keyp, key_len,
                         (void *)cgoid) == -1)
        return rd_kafka_last_error();
    else
        return RD_KAFKA_RESP_ERR_NO_ERROR;
#endif
}
*/
import "C"

// Producer implements a High-level Apache Kafka Producer instance
type Producer struct {
    events         chan Event
    produceChannel chan *Message
    handle         handle

    // Terminates the poller() goroutine
    pollerTermChan chan bool
}

// String returns a human readable name for a Producer instance
func (p *Producer) String() string {
    return p.handle.String()
}

// gethandle implements the Handle interface
func (p *Producer) gethandle() *handle {
    return &p.handle
}

func (p *Producer) produce(msg *Message, msgFlags int, deliveryChan chan Event) error {
    if msg == nil || msg.TopicPartition.Topic == nil || len(*msg.TopicPartition.Topic) == 0 {
        return newErrorFromString(ErrInvalidArg, "")
    }

    crkt := p.handle.getRkt(*msg.TopicPartition.Topic)

    // Three problems:
    //  1) There's a difference between an empty Value or Key (length 0, proper pointer) and
    //     a null Value or Key (length 0, null pointer).
    //  2) We need to be able to send a null Value or Key, but the unsafe.Pointer(&slice[0])
    //     dereference can't be performed on a nil slice.
    //  3) cgo's pointer checking requires the unsafe.Pointer(slice..) call to be made
    //     in the call to the C function.
    //
    // Solution:
    //  Keep track of whether the Value or Key were nil (1), but let the valp and keyp pointers
    //  point to a 1-byte slice (but the length to send is still 0) so that the dereference (2)
    //  works.
    //  Then perform the unsafe.Pointer() on the valp and keyp pointers (which now either point
    //  to the original msg.Value and msg.Key or to the 1-byte slices) in the call to C (3).
    //
    var valp []byte
    var keyp []byte
    oneByte := []byte{0}
    var valIsNull C.int
    var keyIsNull C.int
    var valLen int
    var keyLen int

    if msg.Value == nil {
        valIsNull = 1
        valLen = 0
        valp = oneByte
    } else {
        valLen = len(msg.Value)
        if valLen > 0 {
            valp = msg.Value
        } else {
            valp = oneByte
        }
    }

    if msg.Key == nil {
        keyIsNull = 1
        keyLen = 0
        keyp = oneByte
    } else {
        keyLen = len(msg.Key)
        if keyLen > 0 {
            keyp = msg.Key
        } else {
            keyp = oneByte
        }
    }

    var cgoid int

    // Per-message state that needs to be retained through the C code:
    //   delivery channel (if specified)
    //   message opaque   (if specified)
    // Since these can't be passed as opaque pointers to the C code,
    // due to cgo constraints, we add them to a per-producer map for lookup
    // when the C code triggers the callbacks or events.
    if deliveryChan != nil || msg.Opaque != nil {
        cgoid = p.handle.cgoPut(cgoDr{deliveryChan: deliveryChan, opaque: msg.Opaque})
    }

    // librdkafka expects the timestamp in milliseconds since the Unix epoch.
    var timestamp int64
    if !msg.Timestamp.IsZero() {
        timestamp = msg.Timestamp.UnixNano() / 1000000
    }

    // Convert headers to C-friendly tmphdrs
    var tmphdrs []C.tmphdr_t
    tmphdrsCnt := len(msg.Headers)

    if tmphdrsCnt > 0 {
        tmphdrs = make([]C.tmphdr_t, tmphdrsCnt)

        for n, hdr := range msg.Headers {
            // Make a copy of the key
            // to avoid runtime panic with
            // foreign Go pointers in cgo.
            tmphdrs[n].key = C.CString(hdr.Key)
            if hdr.Value != nil {
                tmphdrs[n].size = C.ssize_t(len(hdr.Value))
                if tmphdrs[n].size > 0 {
                    // Make a copy of the value
                    // to avoid runtime panic with
                    // foreign Go pointers in cgo.
                    tmphdrs[n].val = C.CBytes(hdr.Value)
                }
            } else {
                // null value
                tmphdrs[n].size = C.ssize_t(-1)
            }
        }
    } else {
        // no headers, need a dummy tmphdrs of size 1 to avoid index
        // out of bounds panic in do_produce() call below.
        // tmphdrsCnt will be 0.
        tmphdrs = []C.tmphdr_t{{nil, nil, 0}}
    }

    cErr := C.do_produce(p.handle.rk, crkt,
        C.int32_t(msg.TopicPartition.Partition),
        C.int(msgFlags)|C.RD_KAFKA_MSG_F_COPY,
        valIsNull, unsafe.Pointer(&valp[0]), C.size_t(valLen),
        keyIsNull, unsafe.Pointer(&keyp[0]), C.size_t(keyLen),
        C.int64_t(timestamp),
        (*C.tmphdr_t)(unsafe.Pointer(&tmphdrs[0])), C.size_t(tmphdrsCnt),
        (C.uintptr_t)(cgoid))
    if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
        if cgoid != 0 {
            p.handle.cgoGet(cgoid)
        }
        return newError(cErr)
    }

    return nil
}
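
// The 1-byte-slice workaround used in produce() above generalizes beyond this
// file. A minimal standalone sketch (illustrative only, not part of the
// producer's API): unsafe.Pointer(&b[0]) panics on a nil or empty slice, so a
// dummy backing byte is substituted while the length sent stays 0 and a
// separate flag records whether the slice was null.
func exampleNullVsEmpty(b []byte) (ptr unsafe.Pointer, length int, isNull bool) {
    oneByte := []byte{0}
    if b == nil {
        // Null: no data; the flag lets the C side pass NULL.
        return unsafe.Pointer(&oneByte[0]), 0, true
    }
    if len(b) == 0 {
        // Empty: valid (dummy) pointer, zero length, not null.
        return unsafe.Pointer(&oneByte[0]), 0, false
    }
    return unsafe.Pointer(&b[0]), len(b), false
}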

// Produce single message.
// This is an asynchronous call that enqueues the message on the internal
// transmit queue, thus returning immediately.
// The delivery report will be sent on the provided deliveryChan if specified,
// or on the Producer object's Events() channel if not.
// msg.Timestamp requires librdkafka >= 0.9.4 (else returns ErrNotImplemented),
// api.version.request=true, and broker >= 0.10.0.0.
// msg.Headers requires librdkafka >= 0.11.4 (else returns ErrNotImplemented),
// api.version.request=true, and broker >= 0.11.0.0.
// Returns an error if message could not be enqueued.
func (p *Producer) Produce(msg *Message, deliveryChan chan Event) error {
    return p.produce(msg, 0, deliveryChan)
}
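
// A usage sketch for Produce (illustrative only; "my-topic" and the payloads
// are placeholders). It sends one message with a header and waits for the
// delivery report on a dedicated channel; the report is the original *Message
// with TopicPartition.Error set on failure.
func exampleProduce(p *Producer) error {
    topic := "my-topic"
    deliveryChan := make(chan Event, 1)

    err := p.Produce(&Message{
        TopicPartition: TopicPartition{Topic: &topic, Partition: PartitionAny},
        Value:          []byte("payload"),
        Key:            []byte("key"),
        Headers:        []Header{{Key: "trace-id", Value: []byte("abc123")}},
    }, deliveryChan)
    if err != nil {
        return err // enqueue failed (e.g. queue full)
    }

    m := (<-deliveryChan).(*Message)
    return m.TopicPartition.Error
}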

// Produce a batch of messages.
// These batches do not relate to the message batches sent to the broker; the
// latter are collected on the fly internally in librdkafka.
// WARNING: This is an experimental API.
// NOTE: timestamps and headers are not supported with this API.
func (p *Producer) produceBatch(topic string, msgs []*Message, msgFlags int) error {
    crkt := p.handle.getRkt(topic)

    cmsgs := make([]C.rd_kafka_message_t, len(msgs))
    for i, m := range msgs {
        p.handle.messageToC(m, &cmsgs[i])
    }
    r := C.rd_kafka_produce_batch(crkt, C.RD_KAFKA_PARTITION_UA, C.int(msgFlags)|C.RD_KAFKA_MSG_F_FREE,
        (*C.rd_kafka_message_t)(&cmsgs[0]), C.int(len(msgs)))
    if r == -1 {
        return newError(C.rd_kafka_last_error())
    }

    return nil
}

// Events returns the Events channel (read)
func (p *Producer) Events() chan Event {
    return p.events
}

// ProduceChannel returns the produce *Message channel (write)
func (p *Producer) ProduceChannel() chan *Message {
    return p.produceChannel
}
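
// A channel-based producing sketch (illustrative only; the topic name and
// message count are placeholders). Writes go to ProduceChannel() and delivery
// reports are read from Events(), since no per-message delivery channel
// exists on this path. Events() may also carry other event types, so only
// *Message events are counted here.
func exampleChannelProduce(p *Producer) {
    topic := "my-topic"
    for i := 0; i < 10; i++ {
        p.ProduceChannel() <- &Message{
            TopicPartition: TopicPartition{Topic: &topic, Partition: PartitionAny},
            Value:          []byte(fmt.Sprintf("message #%d", i)),
        }
    }

    // Drain the corresponding delivery reports from Events().
    for n := 0; n < 10; {
        if m, ok := (<-p.Events()).(*Message); ok {
            if m.TopicPartition.Error != nil {
                fmt.Println("delivery failed:", m.TopicPartition.Error)
            }
            n++
        }
    }
}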

// Len returns the number of messages and requests waiting to be transmitted to the broker
// as well as delivery reports queued for the application.
// Includes messages on ProduceChannel.
func (p *Producer) Len() int {
    return len(p.produceChannel) + len(p.events) + int(C.rd_kafka_outq_len(p.handle.rk))
}

// Flush and wait for outstanding messages and requests to complete delivery.
// Includes messages on ProduceChannel.
// Runs until the outstanding count reaches zero or timeoutMs elapses.
// Returns the number of outstanding events still un-flushed.
func (p *Producer) Flush(timeoutMs int) int {
    termChan := make(chan bool) // unused stand-in termChan

    d := time.Duration(timeoutMs) * time.Millisecond
    tEnd := time.Now().Add(d)
    for p.Len() > 0 {
        remain := tEnd.Sub(time.Now()).Seconds()
        if remain <= 0.0 {
            return p.Len()
        }

        p.handle.eventPoll(p.events,
            int(math.Min(100, remain*1000)), 1000, termChan)
    }

    return 0
}
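
// A shutdown sketch (illustrative only; the 15-second budget is an arbitrary
// example value): flush pending messages before closing so queued delivery
// reports are not lost.
func exampleShutdown(p *Producer) {
    if remaining := p.Flush(15 * 1000); remaining > 0 {
        fmt.Printf("%d messages still un-flushed, closing anyway\n", remaining)
    }
    p.Close()
}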

// Close a Producer instance.
// The Producer object or its channels are no longer usable after this call.
func (p *Producer) Close() {
    // Wait for poller() (signaled by closing pollerTermChan)
    // and channelProducer() (signaled by closing ProduceChannel)
    close(p.pollerTermChan)
    close(p.produceChannel)
    p.handle.waitTerminated(2)

    close(p.events)

    p.handle.cleanup()

    C.rd_kafka_destroy(p.handle.rk)
}

// NewProducer creates a new high-level Producer instance.
//
// conf is a *ConfigMap with standard librdkafka configuration properties, see here:
//
//
// Supported special configuration properties:
//   go.batch.producer (bool, false) - EXPERIMENTAL: Enable batch producer (for increased performance).
//                                     These batches do not relate to Kafka message batches in any way.
//                                     Note: timestamps and headers are not supported with this interface.
//   go.delivery.reports (bool, true) - Forward per-message delivery reports to the
//                                      Events() channel.
//   go.events.channel.size (int, 1000000) - Events() channel size
//   go.produce.channel.size (int, 1000000) - ProduceChannel() buffer size (in number of messages)
//
func NewProducer(conf *ConfigMap) (*Producer, error) {

    err := versionCheck()
    if err != nil {
        return nil, err
    }

    p := &Producer{}

    // before we do anything with the configuration, create a copy such that
    // the original is not mutated.
    confCopy := conf.clone()

    v, err := confCopy.extract("go.batch.producer", false)
    if err != nil {
        return nil, err
    }
    batchProducer := v.(bool)

    v, err = confCopy.extract("go.delivery.reports", true)
    if err != nil {
        return nil, err
    }
    p.handle.fwdDr = v.(bool)

    v, err = confCopy.extract("go.events.channel.size", 1000000)
    if err != nil {
        return nil, err
    }
    eventsChanSize := v.(int)

    v, err = confCopy.extract("go.produce.channel.size", 1000000)
    if err != nil {
        return nil, err
    }
    produceChannelSize := v.(int)

    if int(C.rd_kafka_version()) < 0x01000000 {
        // produce.offset.report is no longer used in librdkafka >= v1.0.0
        v, _ = confCopy.extract("{topic}.produce.offset.report", nil)
        if v == nil {
            // Enable offset reporting by default, unless overridden.
            confCopy.SetKey("{topic}.produce.offset.report", true)
        }
    }

    // Convert ConfigMap to librdkafka conf_t
    cConf, err := confCopy.convert()
    if err != nil {
        return nil, err
    }

    cErrstr := (*C.char)(C.malloc(C.size_t(256)))
    defer C.free(unsafe.Pointer(cErrstr))

    C.rd_kafka_conf_set_events(cConf, C.RD_KAFKA_EVENT_DR|C.RD_KAFKA_EVENT_STATS|C.RD_KAFKA_EVENT_ERROR)

    // Create librdkafka producer instance
    p.handle.rk = C.rd_kafka_new(C.RD_KAFKA_PRODUCER, cConf, cErrstr, 256)
    if p.handle.rk == nil {
        return nil, newErrorFromCString(C.RD_KAFKA_RESP_ERR__INVALID_ARG, cErrstr)
    }

    p.handle.p = p
    p.handle.setup()
    p.handle.rkq = C.rd_kafka_queue_get_main(p.handle.rk)
    p.events = make(chan Event, eventsChanSize)
    p.produceChannel = make(chan *Message, produceChannelSize)
    p.pollerTermChan = make(chan bool)

    go poller(p, p.pollerTermChan)

    // non-batch or batch producer, only one must be used
    if batchProducer {
        go channelBatchProducer(p)
    } else {
        go channelProducer(p)
    }

    return p, nil
}
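
// A construction sketch (illustrative only; the broker address and channel
// size are placeholder values). Standard librdkafka properties and the go.*
// special properties documented above share the same ConfigMap.
func exampleNewProducer() (*Producer, error) {
    return NewProducer(&ConfigMap{
        "bootstrap.servers":       "localhost:9092", // assumed broker address
        "go.delivery.reports":     true,             // forward DRs to Events()
        "go.produce.channel.size": 10000,            // ProduceChannel() buffer
    })
}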

// channelProducer serves the ProduceChannel channel
func channelProducer(p *Producer) {

    for m := range p.produceChannel {
        err := p.produce(m, C.RD_KAFKA_MSG_F_BLOCK, nil)
        if err != nil {
            m.TopicPartition.Error = err
            p.events <- m
        }
    }

    p.handle.terminatedChan <- "channelProducer"
}

// channelBatchProducer serves the ProduceChannel channel and attempts to
// improve cgo performance by using the produceBatch() interface.
func channelBatchProducer(p *Producer) {
    var buffered = make(map[string][]*Message)
    bufferedCnt := 0
    const batchSize int = 1000000
    totMsgCnt := 0
    totBatchCnt := 0

    for m := range p.produceChannel {
        buffered[*m.TopicPartition.Topic] = append(buffered[*m.TopicPartition.Topic], m)
        bufferedCnt++

    loop2:
        for {
            select {
            case m, ok := <-p.produceChannel:
                if !ok {
                    break loop2
                }
                if m == nil {
                    panic("nil message received on ProduceChannel")
                }
                if m.TopicPartition.Topic == nil {
                    panic(fmt.Sprintf("message without Topic received on ProduceChannel: %v", m))
                }
                buffered[*m.TopicPartition.Topic] = append(buffered[*m.TopicPartition.Topic], m)
                bufferedCnt++
                if bufferedCnt >= batchSize {
                    break loop2
                }
            default:
                break loop2
            }
        }

        totBatchCnt++
        totMsgCnt += bufferedCnt

        for topic, buffered2 := range buffered {
            err := p.produceBatch(topic, buffered2, C.RD_KAFKA_MSG_F_BLOCK)
            if err != nil {
                for _, m = range buffered2 {
                    m.TopicPartition.Error = err
                    p.events <- m
                }
            }
        }

        buffered = make(map[string][]*Message)
        bufferedCnt = 0
    }
    p.handle.terminatedChan <- "channelBatchProducer"
}

// poller polls the rd_kafka_t handle for events until signaled for termination
func poller(p *Producer, termChan chan bool) {
out:
    for {
        select {
        case <-termChan:
            break out

        default:
            _, term := p.handle.eventPoll(p.events, 100, 1000, termChan)
            if term {
                break out
            }
        }
    }

    p.handle.terminatedChan <- "poller"
}

// GetMetadata queries broker for cluster and topic metadata.
// If topic is non-nil only information about that topic is returned, else if
// allTopics is false only information about locally used topics is returned,
// else information about all topics is returned.
// GetMetadata is equivalent to listTopics, describeTopics and describeCluster in the Java API.
func (p *Producer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error) {
    return getMetadata(p, topic, allTopics, timeoutMs)
}
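
// A metadata sketch (illustrative only; the 5000ms timeout is an arbitrary
// example value): fetch metadata for all topics and print the partition
// count per topic.
func exampleGetMetadata(p *Producer) error {
    md, err := p.GetMetadata(nil, true, 5000)
    if err != nil {
        return err
    }
    for name, t := range md.Topics {
        fmt.Printf("%s: %d partitions\n", name, len(t.Partitions))
    }
    return nil
}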

// QueryWatermarkOffsets returns the broker's low and high offsets for the given topic
// and partition.
func (p *Producer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error) {
    return queryWatermarkOffsets(p, topic, partition, timeoutMs)
}

// OffsetsForTimes looks up offsets by timestamp for the given partitions.
//
// The returned offset for each partition is the earliest offset whose
// timestamp is greater than or equal to the given timestamp in the
// corresponding partition.
//
// The timestamps to query are represented as `.Offset` in the `times`
// argument and the looked up offsets are represented as `.Offset` in the returned
// `offsets` list.
//
// The function will block for at most timeoutMs milliseconds.
//
// Duplicate Topic+Partitions are not supported.
// Per-partition errors may be returned in the `.Error` field.
func (p *Producer) OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error) {
    return offsetsForTimes(p, times, timeoutMs)
}
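
// A lookup sketch for OffsetsForTimes (illustrative only; topic, partition
// and timestamp are placeholders). The query timestamp is encoded in the
// Offset field of each requested TopicPartition, in milliseconds since the
// Unix epoch.
func exampleOffsetsForTimes(p *Producer) ([]TopicPartition, error) {
    topic := "my-topic"
    tsMs := time.Now().Add(-time.Hour).UnixNano() / int64(time.Millisecond)
    times := []TopicPartition{
        {Topic: &topic, Partition: 0, Offset: Offset(tsMs)},
    }
    // Each returned .Offset is the earliest offset whose timestamp is >= tsMs.
    return p.OffsetsForTimes(times, 5000)
}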