khenaidoo | ac63710 | 2019-01-14 15:44:34 -0500 | [diff] [blame^] | 1 | /** |
| 2 | * Copyright 2016 Confluent Inc. |
| 3 | * |
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
| 7 | * |
| 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | * |
| 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
| 15 | */ |
| 16 | |
| 17 | package kafka |
| 18 | |
| 19 | import ( |
| 20 | "fmt" |
| 21 | "math" |
| 22 | "time" |
| 23 | "unsafe" |
| 24 | ) |
| 25 | |
| 26 | /* |
| 27 | #include <stdlib.h> |
| 28 | #include <librdkafka/rdkafka.h> |
| 29 | #include "glue_rdkafka.h" |
| 30 | |
| 31 | |
| 32 | #ifdef RD_KAFKA_V_HEADERS |
| 33 | // Convert tmphdrs to chdrs (created by this function). |
| 34 | // If tmphdr.size == -1: value is considered Null |
| 35 | // tmphdr.size == 0: value is considered empty (ignored) |
| 36 | // tmphdr.size > 0: value is considered non-empty |
| 37 | // |
| 38 | // WARNING: The header keys and values will be freed by this function. |
| 39 | void tmphdrs_to_chdrs (tmphdr_t *tmphdrs, size_t tmphdrsCnt, |
| 40 | rd_kafka_headers_t **chdrs) { |
| 41 | size_t i; |
| 42 | |
| 43 | *chdrs = rd_kafka_headers_new(tmphdrsCnt); |
| 44 | |
| 45 | for (i = 0 ; i < tmphdrsCnt ; i++) { |
| 46 | rd_kafka_header_add(*chdrs, |
| 47 | tmphdrs[i].key, -1, |
| 48 | tmphdrs[i].size == -1 ? NULL : |
| 49 | (tmphdrs[i].size == 0 ? "" : tmphdrs[i].val), |
| 50 | tmphdrs[i].size == -1 ? 0 : tmphdrs[i].size); |
| 51 | if (tmphdrs[i].size > 0) |
| 52 | free((void *)tmphdrs[i].val); |
| 53 | free((void *)tmphdrs[i].key); |
| 54 | } |
| 55 | } |
| 56 | |
| 57 | #else |
| 58 | void free_tmphdrs (tmphdr_t *tmphdrs, size_t tmphdrsCnt) { |
| 59 | size_t i; |
| 60 | for (i = 0 ; i < tmphdrsCnt ; i++) { |
| 61 | if (tmphdrs[i].size > 0) |
| 62 | free((void *)tmphdrs[i].val); |
| 63 | free((void *)tmphdrs[i].key); |
| 64 | } |
| 65 | } |
| 66 | #endif |
| 67 | |
| 68 | |
| 69 | rd_kafka_resp_err_t do_produce (rd_kafka_t *rk, |
| 70 | rd_kafka_topic_t *rkt, int32_t partition, |
| 71 | int msgflags, |
| 72 | int valIsNull, void *val, size_t val_len, |
| 73 | int keyIsNull, void *key, size_t key_len, |
| 74 | int64_t timestamp, |
| 75 | tmphdr_t *tmphdrs, size_t tmphdrsCnt, |
| 76 | uintptr_t cgoid) { |
| 77 | void *valp = valIsNull ? NULL : val; |
| 78 | void *keyp = keyIsNull ? NULL : key; |
| 79 | #ifdef RD_KAFKA_V_TIMESTAMP |
| 80 | rd_kafka_resp_err_t err; |
| 81 | #ifdef RD_KAFKA_V_HEADERS |
| 82 | rd_kafka_headers_t *hdrs = NULL; |
| 83 | #endif |
| 84 | #endif |
| 85 | |
| 86 | |
| 87 | if (tmphdrsCnt > 0) { |
| 88 | #ifdef RD_KAFKA_V_HEADERS |
| 89 | tmphdrs_to_chdrs(tmphdrs, tmphdrsCnt, &hdrs); |
| 90 | #else |
| 91 | free_tmphdrs(tmphdrs, tmphdrsCnt); |
| 92 | return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED; |
| 93 | #endif |
| 94 | } |
| 95 | |
| 96 | |
| 97 | #ifdef RD_KAFKA_V_TIMESTAMP |
| 98 | err = rd_kafka_producev(rk, |
| 99 | RD_KAFKA_V_RKT(rkt), |
| 100 | RD_KAFKA_V_PARTITION(partition), |
| 101 | RD_KAFKA_V_MSGFLAGS(msgflags), |
| 102 | RD_KAFKA_V_VALUE(valp, val_len), |
| 103 | RD_KAFKA_V_KEY(keyp, key_len), |
| 104 | RD_KAFKA_V_TIMESTAMP(timestamp), |
| 105 | #ifdef RD_KAFKA_V_HEADERS |
| 106 | RD_KAFKA_V_HEADERS(hdrs), |
| 107 | #endif |
| 108 | RD_KAFKA_V_OPAQUE((void *)cgoid), |
| 109 | RD_KAFKA_V_END); |
| 110 | #ifdef RD_KAFKA_V_HEADERS |
| 111 | if (err && hdrs) |
| 112 | rd_kafka_headers_destroy(hdrs); |
| 113 | #endif |
| 114 | return err; |
| 115 | #else |
| 116 | if (timestamp) |
| 117 | return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED; |
| 118 | if (rd_kafka_produce(rkt, partition, msgflags, |
| 119 | valp, val_len, |
| 120 | keyp, key_len, |
| 121 | (void *)cgoid) == -1) |
| 122 | return rd_kafka_last_error(); |
| 123 | else |
| 124 | return RD_KAFKA_RESP_ERR_NO_ERROR; |
| 125 | #endif |
| 126 | } |
| 127 | */ |
| 128 | import "C" |
| 129 | |
| 130 | // Producer implements a High-level Apache Kafka Producer instance |
| 131 | type Producer struct { |
| 132 | events chan Event |
| 133 | produceChannel chan *Message |
| 134 | handle handle |
| 135 | |
| 136 | // Terminates the poller() goroutine |
| 137 | pollerTermChan chan bool |
| 138 | } |
| 139 | |
| 140 | // String returns a human readable name for a Producer instance |
| 141 | func (p *Producer) String() string { |
| 142 | return p.handle.String() |
| 143 | } |
| 144 | |
| 145 | // get_handle implements the Handle interface |
| 146 | func (p *Producer) gethandle() *handle { |
| 147 | return &p.handle |
| 148 | } |
| 149 | |
| 150 | func (p *Producer) produce(msg *Message, msgFlags int, deliveryChan chan Event) error { |
| 151 | if msg == nil || msg.TopicPartition.Topic == nil || len(*msg.TopicPartition.Topic) == 0 { |
| 152 | return newErrorFromString(ErrInvalidArg, "") |
| 153 | } |
| 154 | |
| 155 | crkt := p.handle.getRkt(*msg.TopicPartition.Topic) |
| 156 | |
| 157 | // Three problems: |
| 158 | // 1) There's a difference between an empty Value or Key (length 0, proper pointer) and |
| 159 | // a null Value or Key (length 0, null pointer). |
| 160 | // 2) we need to be able to send a null Value or Key, but the unsafe.Pointer(&slice[0]) |
| 161 | // dereference can't be performed on a nil slice. |
| 162 | // 3) cgo's pointer checking requires the unsafe.Pointer(slice..) call to be made |
| 163 | // in the call to the C function. |
| 164 | // |
| 165 | // Solution: |
| 166 | // Keep track of whether the Value or Key were nil (1), but let the valp and keyp pointers |
| 167 | // point to a 1-byte slice (but the length to send is still 0) so that the dereference (2) |
| 168 | // works. |
| 169 | // Then perform the unsafe.Pointer() on the valp and keyp pointers (which now either point |
| 170 | // to the original msg.Value and msg.Key or to the 1-byte slices) in the call to C (3). |
| 171 | // |
| 172 | var valp []byte |
| 173 | var keyp []byte |
| 174 | oneByte := []byte{0} |
| 175 | var valIsNull C.int |
| 176 | var keyIsNull C.int |
| 177 | var valLen int |
| 178 | var keyLen int |
| 179 | |
| 180 | if msg.Value == nil { |
| 181 | valIsNull = 1 |
| 182 | valLen = 0 |
| 183 | valp = oneByte |
| 184 | } else { |
| 185 | valLen = len(msg.Value) |
| 186 | if valLen > 0 { |
| 187 | valp = msg.Value |
| 188 | } else { |
| 189 | valp = oneByte |
| 190 | } |
| 191 | } |
| 192 | |
| 193 | if msg.Key == nil { |
| 194 | keyIsNull = 1 |
| 195 | keyLen = 0 |
| 196 | keyp = oneByte |
| 197 | } else { |
| 198 | keyLen = len(msg.Key) |
| 199 | if keyLen > 0 { |
| 200 | keyp = msg.Key |
| 201 | } else { |
| 202 | keyp = oneByte |
| 203 | } |
| 204 | } |
| 205 | |
| 206 | var cgoid int |
| 207 | |
| 208 | // Per-message state that needs to be retained through the C code: |
| 209 | // delivery channel (if specified) |
| 210 | // message opaque (if specified) |
| 211 | // Since these cant be passed as opaque pointers to the C code, |
| 212 | // due to cgo constraints, we add them to a per-producer map for lookup |
| 213 | // when the C code triggers the callbacks or events. |
| 214 | if deliveryChan != nil || msg.Opaque != nil { |
| 215 | cgoid = p.handle.cgoPut(cgoDr{deliveryChan: deliveryChan, opaque: msg.Opaque}) |
| 216 | } |
| 217 | |
| 218 | var timestamp int64 |
| 219 | if !msg.Timestamp.IsZero() { |
| 220 | timestamp = msg.Timestamp.UnixNano() / 1000000 |
| 221 | } |
| 222 | |
| 223 | // Convert headers to C-friendly tmphdrs |
| 224 | var tmphdrs []C.tmphdr_t |
| 225 | tmphdrsCnt := len(msg.Headers) |
| 226 | |
| 227 | if tmphdrsCnt > 0 { |
| 228 | tmphdrs = make([]C.tmphdr_t, tmphdrsCnt) |
| 229 | |
| 230 | for n, hdr := range msg.Headers { |
| 231 | // Make a copy of the key |
| 232 | // to avoid runtime panic with |
| 233 | // foreign Go pointers in cgo. |
| 234 | tmphdrs[n].key = C.CString(hdr.Key) |
| 235 | if hdr.Value != nil { |
| 236 | tmphdrs[n].size = C.ssize_t(len(hdr.Value)) |
| 237 | if tmphdrs[n].size > 0 { |
| 238 | // Make a copy of the value |
| 239 | // to avoid runtime panic with |
| 240 | // foreign Go pointers in cgo. |
| 241 | tmphdrs[n].val = C.CBytes(hdr.Value) |
| 242 | } |
| 243 | } else { |
| 244 | // null value |
| 245 | tmphdrs[n].size = C.ssize_t(-1) |
| 246 | } |
| 247 | } |
| 248 | } else { |
| 249 | // no headers, need a dummy tmphdrs of size 1 to avoid index |
| 250 | // out of bounds panic in do_produce() call below. |
| 251 | // tmphdrsCnt will be 0. |
| 252 | tmphdrs = []C.tmphdr_t{{nil, nil, 0}} |
| 253 | } |
| 254 | |
| 255 | cErr := C.do_produce(p.handle.rk, crkt, |
| 256 | C.int32_t(msg.TopicPartition.Partition), |
| 257 | C.int(msgFlags)|C.RD_KAFKA_MSG_F_COPY, |
| 258 | valIsNull, unsafe.Pointer(&valp[0]), C.size_t(valLen), |
| 259 | keyIsNull, unsafe.Pointer(&keyp[0]), C.size_t(keyLen), |
| 260 | C.int64_t(timestamp), |
| 261 | (*C.tmphdr_t)(unsafe.Pointer(&tmphdrs[0])), C.size_t(tmphdrsCnt), |
| 262 | (C.uintptr_t)(cgoid)) |
| 263 | if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR { |
| 264 | if cgoid != 0 { |
| 265 | p.handle.cgoGet(cgoid) |
| 266 | } |
| 267 | return newError(cErr) |
| 268 | } |
| 269 | |
| 270 | return nil |
| 271 | } |
| 272 | |
| 273 | // Produce single message. |
| 274 | // This is an asynchronous call that enqueues the message on the internal |
| 275 | // transmit queue, thus returning immediately. |
| 276 | // The delivery report will be sent on the provided deliveryChan if specified, |
| 277 | // or on the Producer object's Events() channel if not. |
| 278 | // msg.Timestamp requires librdkafka >= 0.9.4 (else returns ErrNotImplemented), |
| 279 | // api.version.request=true, and broker >= 0.10.0.0. |
| 280 | // msg.Headers requires librdkafka >= 0.11.4 (else returns ErrNotImplemented), |
| 281 | // api.version.request=true, and broker >= 0.11.0.0. |
| 282 | // Returns an error if message could not be enqueued. |
| 283 | func (p *Producer) Produce(msg *Message, deliveryChan chan Event) error { |
| 284 | return p.produce(msg, 0, deliveryChan) |
| 285 | } |
| 286 | |
| 287 | // Produce a batch of messages. |
| 288 | // These batches do not relate to the message batches sent to the broker, the latter |
| 289 | // are collected on the fly internally in librdkafka. |
| 290 | // WARNING: This is an experimental API. |
| 291 | // NOTE: timestamps and headers are not supported with this API. |
| 292 | func (p *Producer) produceBatch(topic string, msgs []*Message, msgFlags int) error { |
| 293 | crkt := p.handle.getRkt(topic) |
| 294 | |
| 295 | cmsgs := make([]C.rd_kafka_message_t, len(msgs)) |
| 296 | for i, m := range msgs { |
| 297 | p.handle.messageToC(m, &cmsgs[i]) |
| 298 | } |
| 299 | r := C.rd_kafka_produce_batch(crkt, C.RD_KAFKA_PARTITION_UA, C.int(msgFlags)|C.RD_KAFKA_MSG_F_FREE, |
| 300 | (*C.rd_kafka_message_t)(&cmsgs[0]), C.int(len(msgs))) |
| 301 | if r == -1 { |
| 302 | return newError(C.rd_kafka_last_error()) |
| 303 | } |
| 304 | |
| 305 | return nil |
| 306 | } |
| 307 | |
| 308 | // Events returns the Events channel (read) |
| 309 | func (p *Producer) Events() chan Event { |
| 310 | return p.events |
| 311 | } |
| 312 | |
| 313 | // ProduceChannel returns the produce *Message channel (write) |
| 314 | func (p *Producer) ProduceChannel() chan *Message { |
| 315 | return p.produceChannel |
| 316 | } |
| 317 | |
| 318 | // Len returns the number of messages and requests waiting to be transmitted to the broker |
| 319 | // as well as delivery reports queued for the application. |
| 320 | // Includes messages on ProduceChannel. |
| 321 | func (p *Producer) Len() int { |
| 322 | return len(p.produceChannel) + len(p.events) + int(C.rd_kafka_outq_len(p.handle.rk)) |
| 323 | } |
| 324 | |
| 325 | // Flush and wait for outstanding messages and requests to complete delivery. |
| 326 | // Includes messages on ProduceChannel. |
| 327 | // Runs until value reaches zero or on timeoutMs. |
| 328 | // Returns the number of outstanding events still un-flushed. |
| 329 | func (p *Producer) Flush(timeoutMs int) int { |
| 330 | termChan := make(chan bool) // unused stand-in termChan |
| 331 | |
| 332 | d, _ := time.ParseDuration(fmt.Sprintf("%dms", timeoutMs)) |
| 333 | tEnd := time.Now().Add(d) |
| 334 | for p.Len() > 0 { |
| 335 | remain := tEnd.Sub(time.Now()).Seconds() |
| 336 | if remain <= 0.0 { |
| 337 | return p.Len() |
| 338 | } |
| 339 | |
| 340 | p.handle.eventPoll(p.events, |
| 341 | int(math.Min(100, remain*1000)), 1000, termChan) |
| 342 | } |
| 343 | |
| 344 | return 0 |
| 345 | } |
| 346 | |
| 347 | // Close a Producer instance. |
| 348 | // The Producer object or its channels are no longer usable after this call. |
| 349 | func (p *Producer) Close() { |
| 350 | // Wait for poller() (signaled by closing pollerTermChan) |
| 351 | // and channel_producer() (signaled by closing ProduceChannel) |
| 352 | close(p.pollerTermChan) |
| 353 | close(p.produceChannel) |
| 354 | p.handle.waitTerminated(2) |
| 355 | |
| 356 | close(p.events) |
| 357 | |
| 358 | p.handle.cleanup() |
| 359 | |
| 360 | C.rd_kafka_destroy(p.handle.rk) |
| 361 | } |
| 362 | |
| 363 | // NewProducer creates a new high-level Producer instance. |
| 364 | // |
| 365 | // conf is a *ConfigMap with standard librdkafka configuration properties, see here: |
| 366 | // |
| 367 | // |
| 368 | // |
| 369 | // |
| 370 | // |
| 371 | // Supported special configuration properties: |
| 372 | // go.batch.producer (bool, false) - EXPERIMENTAL: Enable batch producer (for increased performance). |
| 373 | // These batches do not relate to Kafka message batches in any way. |
| 374 | // Note: timestamps and headers are not supported with this interface. |
| 375 | // go.delivery.reports (bool, true) - Forward per-message delivery reports to the |
| 376 | // Events() channel. |
| 377 | // go.events.channel.size (int, 1000000) - Events() channel size |
| 378 | // go.produce.channel.size (int, 1000000) - ProduceChannel() buffer size (in number of messages) |
| 379 | // |
| 380 | func NewProducer(conf *ConfigMap) (*Producer, error) { |
| 381 | |
| 382 | err := versionCheck() |
| 383 | if err != nil { |
| 384 | return nil, err |
| 385 | } |
| 386 | |
| 387 | p := &Producer{} |
| 388 | |
| 389 | // before we do anything with the configuration, create a copy such that |
| 390 | // the original is not mutated. |
| 391 | confCopy := conf.clone() |
| 392 | |
| 393 | v, err := confCopy.extract("go.batch.producer", false) |
| 394 | if err != nil { |
| 395 | return nil, err |
| 396 | } |
| 397 | batchProducer := v.(bool) |
| 398 | |
| 399 | v, err = confCopy.extract("go.delivery.reports", true) |
| 400 | if err != nil { |
| 401 | return nil, err |
| 402 | } |
| 403 | p.handle.fwdDr = v.(bool) |
| 404 | |
| 405 | v, err = confCopy.extract("go.events.channel.size", 1000000) |
| 406 | if err != nil { |
| 407 | return nil, err |
| 408 | } |
| 409 | eventsChanSize := v.(int) |
| 410 | |
| 411 | v, err = confCopy.extract("go.produce.channel.size", 1000000) |
| 412 | if err != nil { |
| 413 | return nil, err |
| 414 | } |
| 415 | produceChannelSize := v.(int) |
| 416 | |
| 417 | if int(C.rd_kafka_version()) < 0x01000000 { |
| 418 | // produce.offset.report is no longer used in librdkafka >= v1.0.0 |
| 419 | v, _ = confCopy.extract("{topic}.produce.offset.report", nil) |
| 420 | if v == nil { |
| 421 | // Enable offset reporting by default, unless overriden. |
| 422 | confCopy.SetKey("{topic}.produce.offset.report", true) |
| 423 | } |
| 424 | } |
| 425 | |
| 426 | // Convert ConfigMap to librdkafka conf_t |
| 427 | cConf, err := confCopy.convert() |
| 428 | if err != nil { |
| 429 | return nil, err |
| 430 | } |
| 431 | |
| 432 | cErrstr := (*C.char)(C.malloc(C.size_t(256))) |
| 433 | defer C.free(unsafe.Pointer(cErrstr)) |
| 434 | |
| 435 | C.rd_kafka_conf_set_events(cConf, C.RD_KAFKA_EVENT_DR|C.RD_KAFKA_EVENT_STATS|C.RD_KAFKA_EVENT_ERROR) |
| 436 | |
| 437 | // Create librdkafka producer instance |
| 438 | p.handle.rk = C.rd_kafka_new(C.RD_KAFKA_PRODUCER, cConf, cErrstr, 256) |
| 439 | if p.handle.rk == nil { |
| 440 | return nil, newErrorFromCString(C.RD_KAFKA_RESP_ERR__INVALID_ARG, cErrstr) |
| 441 | } |
| 442 | |
| 443 | p.handle.p = p |
| 444 | p.handle.setup() |
| 445 | p.handle.rkq = C.rd_kafka_queue_get_main(p.handle.rk) |
| 446 | p.events = make(chan Event, eventsChanSize) |
| 447 | p.produceChannel = make(chan *Message, produceChannelSize) |
| 448 | p.pollerTermChan = make(chan bool) |
| 449 | |
| 450 | go poller(p, p.pollerTermChan) |
| 451 | |
| 452 | // non-batch or batch producer, only one must be used |
| 453 | if batchProducer { |
| 454 | go channelBatchProducer(p) |
| 455 | } else { |
| 456 | go channelProducer(p) |
| 457 | } |
| 458 | |
| 459 | return p, nil |
| 460 | } |
| 461 | |
| 462 | // channel_producer serves the ProduceChannel channel |
| 463 | func channelProducer(p *Producer) { |
| 464 | |
| 465 | for m := range p.produceChannel { |
| 466 | err := p.produce(m, C.RD_KAFKA_MSG_F_BLOCK, nil) |
| 467 | if err != nil { |
| 468 | m.TopicPartition.Error = err |
| 469 | p.events <- m |
| 470 | } |
| 471 | } |
| 472 | |
| 473 | p.handle.terminatedChan <- "channelProducer" |
| 474 | } |
| 475 | |
| 476 | // channelBatchProducer serves the ProduceChannel channel and attempts to |
| 477 | // improve cgo performance by using the produceBatch() interface. |
| 478 | func channelBatchProducer(p *Producer) { |
| 479 | var buffered = make(map[string][]*Message) |
| 480 | bufferedCnt := 0 |
| 481 | const batchSize int = 1000000 |
| 482 | totMsgCnt := 0 |
| 483 | totBatchCnt := 0 |
| 484 | |
| 485 | for m := range p.produceChannel { |
| 486 | buffered[*m.TopicPartition.Topic] = append(buffered[*m.TopicPartition.Topic], m) |
| 487 | bufferedCnt++ |
| 488 | |
| 489 | loop2: |
| 490 | for true { |
| 491 | select { |
| 492 | case m, ok := <-p.produceChannel: |
| 493 | if !ok { |
| 494 | break loop2 |
| 495 | } |
| 496 | if m == nil { |
| 497 | panic("nil message received on ProduceChannel") |
| 498 | } |
| 499 | if m.TopicPartition.Topic == nil { |
| 500 | panic(fmt.Sprintf("message without Topic received on ProduceChannel: %v", m)) |
| 501 | } |
| 502 | buffered[*m.TopicPartition.Topic] = append(buffered[*m.TopicPartition.Topic], m) |
| 503 | bufferedCnt++ |
| 504 | if bufferedCnt >= batchSize { |
| 505 | break loop2 |
| 506 | } |
| 507 | default: |
| 508 | break loop2 |
| 509 | } |
| 510 | } |
| 511 | |
| 512 | totBatchCnt++ |
| 513 | totMsgCnt += len(buffered) |
| 514 | |
| 515 | for topic, buffered2 := range buffered { |
| 516 | err := p.produceBatch(topic, buffered2, C.RD_KAFKA_MSG_F_BLOCK) |
| 517 | if err != nil { |
| 518 | for _, m = range buffered2 { |
| 519 | m.TopicPartition.Error = err |
| 520 | p.events <- m |
| 521 | } |
| 522 | } |
| 523 | } |
| 524 | |
| 525 | buffered = make(map[string][]*Message) |
| 526 | bufferedCnt = 0 |
| 527 | } |
| 528 | p.handle.terminatedChan <- "channelBatchProducer" |
| 529 | } |
| 530 | |
| 531 | // poller polls the rd_kafka_t handle for events until signalled for termination |
| 532 | func poller(p *Producer, termChan chan bool) { |
| 533 | out: |
| 534 | for true { |
| 535 | select { |
| 536 | case _ = <-termChan: |
| 537 | break out |
| 538 | |
| 539 | default: |
| 540 | _, term := p.handle.eventPoll(p.events, 100, 1000, termChan) |
| 541 | if term { |
| 542 | break out |
| 543 | } |
| 544 | break |
| 545 | } |
| 546 | } |
| 547 | |
| 548 | p.handle.terminatedChan <- "poller" |
| 549 | |
| 550 | } |
| 551 | |
| 552 | // GetMetadata queries broker for cluster and topic metadata. |
| 553 | // If topic is non-nil only information about that topic is returned, else if |
| 554 | // allTopics is false only information about locally used topics is returned, |
| 555 | // else information about all topics is returned. |
| 556 | // GetMetadata is equivalent to listTopics, describeTopics and describeCluster in the Java API. |
| 557 | func (p *Producer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error) { |
| 558 | return getMetadata(p, topic, allTopics, timeoutMs) |
| 559 | } |
| 560 | |
| 561 | // QueryWatermarkOffsets returns the broker's low and high offsets for the given topic |
| 562 | // and partition. |
| 563 | func (p *Producer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error) { |
| 564 | return queryWatermarkOffsets(p, topic, partition, timeoutMs) |
| 565 | } |
| 566 | |
| 567 | // OffsetsForTimes looks up offsets by timestamp for the given partitions. |
| 568 | // |
| 569 | // The returned offset for each partition is the earliest offset whose |
| 570 | // timestamp is greater than or equal to the given timestamp in the |
| 571 | // corresponding partition. |
| 572 | // |
| 573 | // The timestamps to query are represented as `.Offset` in the `times` |
| 574 | // argument and the looked up offsets are represented as `.Offset` in the returned |
| 575 | // `offsets` list. |
| 576 | // |
| 577 | // The function will block for at most timeoutMs milliseconds. |
| 578 | // |
| 579 | // Duplicate Topic+Partitions are not supported. |
| 580 | // Per-partition errors may be returned in the `.Error` field. |
| 581 | func (p *Producer) OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error) { |
| 582 | return offsetsForTimes(p, times, timeoutMs) |
| 583 | } |