khenaidoo | ac63710 | 2019-01-14 15:44:34 -0500 | [diff] [blame^] | 1 | package kafka |
| 2 | |
| 3 | /** |
| 4 | * Copyright 2016 Confluent Inc. |
| 5 | * |
| 6 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 7 | * you may not use this file except in compliance with the License. |
| 8 | * You may obtain a copy of the License at |
| 9 | * |
| 10 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | * |
| 12 | * Unless required by applicable law or agreed to in writing, software |
| 13 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 14 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 15 | * See the License for the specific language governing permissions and |
| 16 | * limitations under the License. |
| 17 | */ |
| 18 | |
| 19 | import ( |
| 20 | "fmt" |
| 21 | "math" |
| 22 | "time" |
| 23 | "unsafe" |
| 24 | ) |
| 25 | |
| 26 | /* |
| 27 | #include <stdlib.h> |
| 28 | #include <librdkafka/rdkafka.h> |
| 29 | |
| 30 | |
| 31 | static rd_kafka_topic_partition_t *_c_rdkafka_topic_partition_list_entry(rd_kafka_topic_partition_list_t *rktparlist, int idx) { |
| 32 | return idx < rktparlist->cnt ? &rktparlist->elems[idx] : NULL; |
| 33 | } |
| 34 | */ |
| 35 | import "C" |
| 36 | |
| 37 | // RebalanceCb provides a per-Subscribe*() rebalance event callback. |
| 38 | // The passed Event will be either AssignedPartitions or RevokedPartitions |
| 39 | type RebalanceCb func(*Consumer, Event) error |
| 40 | |
| 41 | // Consumer implements a High-level Apache Kafka Consumer instance |
| 42 | type Consumer struct { |
| 43 | events chan Event |
| 44 | handle handle |
| 45 | eventsChanEnable bool |
| 46 | readerTermChan chan bool |
| 47 | rebalanceCb RebalanceCb |
| 48 | appReassigned bool |
| 49 | appRebalanceEnable bool // config setting |
| 50 | } |
| 51 | |
| 52 | // Strings returns a human readable name for a Consumer instance |
| 53 | func (c *Consumer) String() string { |
| 54 | return c.handle.String() |
| 55 | } |
| 56 | |
| 57 | // getHandle implements the Handle interface |
| 58 | func (c *Consumer) gethandle() *handle { |
| 59 | return &c.handle |
| 60 | } |
| 61 | |
| 62 | // Subscribe to a single topic |
| 63 | // This replaces the current subscription |
| 64 | func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error { |
| 65 | return c.SubscribeTopics([]string{topic}, rebalanceCb) |
| 66 | } |
| 67 | |
| 68 | // SubscribeTopics subscribes to the provided list of topics. |
| 69 | // This replaces the current subscription. |
| 70 | func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) (err error) { |
| 71 | ctopics := C.rd_kafka_topic_partition_list_new(C.int(len(topics))) |
| 72 | defer C.rd_kafka_topic_partition_list_destroy(ctopics) |
| 73 | |
| 74 | for _, topic := range topics { |
| 75 | ctopic := C.CString(topic) |
| 76 | defer C.free(unsafe.Pointer(ctopic)) |
| 77 | C.rd_kafka_topic_partition_list_add(ctopics, ctopic, C.RD_KAFKA_PARTITION_UA) |
| 78 | } |
| 79 | |
| 80 | e := C.rd_kafka_subscribe(c.handle.rk, ctopics) |
| 81 | if e != C.RD_KAFKA_RESP_ERR_NO_ERROR { |
| 82 | return newError(e) |
| 83 | } |
| 84 | |
| 85 | c.rebalanceCb = rebalanceCb |
| 86 | c.handle.currAppRebalanceEnable = c.rebalanceCb != nil || c.appRebalanceEnable |
| 87 | |
| 88 | return nil |
| 89 | } |
| 90 | |
| 91 | // Unsubscribe from the current subscription, if any. |
| 92 | func (c *Consumer) Unsubscribe() (err error) { |
| 93 | C.rd_kafka_unsubscribe(c.handle.rk) |
| 94 | return nil |
| 95 | } |
| 96 | |
| 97 | // Assign an atomic set of partitions to consume. |
| 98 | // This replaces the current assignment. |
| 99 | func (c *Consumer) Assign(partitions []TopicPartition) (err error) { |
| 100 | c.appReassigned = true |
| 101 | |
| 102 | cparts := newCPartsFromTopicPartitions(partitions) |
| 103 | defer C.rd_kafka_topic_partition_list_destroy(cparts) |
| 104 | |
| 105 | e := C.rd_kafka_assign(c.handle.rk, cparts) |
| 106 | if e != C.RD_KAFKA_RESP_ERR_NO_ERROR { |
| 107 | return newError(e) |
| 108 | } |
| 109 | |
| 110 | return nil |
| 111 | } |
| 112 | |
| 113 | // Unassign the current set of partitions to consume. |
| 114 | func (c *Consumer) Unassign() (err error) { |
| 115 | c.appReassigned = true |
| 116 | |
| 117 | e := C.rd_kafka_assign(c.handle.rk, nil) |
| 118 | if e != C.RD_KAFKA_RESP_ERR_NO_ERROR { |
| 119 | return newError(e) |
| 120 | } |
| 121 | |
| 122 | return nil |
| 123 | } |
| 124 | |
| 125 | // commit offsets for specified offsets. |
| 126 | // If offsets is nil the currently assigned partitions' offsets are committed. |
| 127 | // This is a blocking call, caller will need to wrap in go-routine to |
| 128 | // get async or throw-away behaviour. |
| 129 | func (c *Consumer) commit(offsets []TopicPartition) (committedOffsets []TopicPartition, err error) { |
| 130 | var rkqu *C.rd_kafka_queue_t |
| 131 | |
| 132 | rkqu = C.rd_kafka_queue_new(c.handle.rk) |
| 133 | defer C.rd_kafka_queue_destroy(rkqu) |
| 134 | |
| 135 | var coffsets *C.rd_kafka_topic_partition_list_t |
| 136 | if offsets != nil { |
| 137 | coffsets = newCPartsFromTopicPartitions(offsets) |
| 138 | defer C.rd_kafka_topic_partition_list_destroy(coffsets) |
| 139 | } |
| 140 | |
| 141 | cErr := C.rd_kafka_commit_queue(c.handle.rk, coffsets, rkqu, nil, nil) |
| 142 | if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR { |
| 143 | return nil, newError(cErr) |
| 144 | } |
| 145 | |
| 146 | rkev := C.rd_kafka_queue_poll(rkqu, C.int(-1)) |
| 147 | if rkev == nil { |
| 148 | // shouldn't happen |
| 149 | return nil, newError(C.RD_KAFKA_RESP_ERR__DESTROY) |
| 150 | } |
| 151 | defer C.rd_kafka_event_destroy(rkev) |
| 152 | |
| 153 | if C.rd_kafka_event_type(rkev) != C.RD_KAFKA_EVENT_OFFSET_COMMIT { |
| 154 | panic(fmt.Sprintf("Expected OFFSET_COMMIT, got %s", |
| 155 | C.GoString(C.rd_kafka_event_name(rkev)))) |
| 156 | } |
| 157 | |
| 158 | cErr = C.rd_kafka_event_error(rkev) |
| 159 | if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR { |
| 160 | return nil, newErrorFromCString(cErr, C.rd_kafka_event_error_string(rkev)) |
| 161 | } |
| 162 | |
| 163 | cRetoffsets := C.rd_kafka_event_topic_partition_list(rkev) |
| 164 | if cRetoffsets == nil { |
| 165 | // no offsets, no error |
| 166 | return nil, nil |
| 167 | } |
| 168 | committedOffsets = newTopicPartitionsFromCparts(cRetoffsets) |
| 169 | |
| 170 | return committedOffsets, nil |
| 171 | } |
| 172 | |
| 173 | // Commit offsets for currently assigned partitions |
| 174 | // This is a blocking call. |
| 175 | // Returns the committed offsets on success. |
| 176 | func (c *Consumer) Commit() ([]TopicPartition, error) { |
| 177 | return c.commit(nil) |
| 178 | } |
| 179 | |
| 180 | // CommitMessage commits offset based on the provided message. |
| 181 | // This is a blocking call. |
| 182 | // Returns the committed offsets on success. |
| 183 | func (c *Consumer) CommitMessage(m *Message) ([]TopicPartition, error) { |
| 184 | if m.TopicPartition.Error != nil { |
| 185 | return nil, Error{ErrInvalidArg, "Can't commit errored message"} |
| 186 | } |
| 187 | offsets := []TopicPartition{m.TopicPartition} |
| 188 | offsets[0].Offset++ |
| 189 | return c.commit(offsets) |
| 190 | } |
| 191 | |
| 192 | // CommitOffsets commits the provided list of offsets |
| 193 | // This is a blocking call. |
| 194 | // Returns the committed offsets on success. |
| 195 | func (c *Consumer) CommitOffsets(offsets []TopicPartition) ([]TopicPartition, error) { |
| 196 | return c.commit(offsets) |
| 197 | } |
| 198 | |
| 199 | // StoreOffsets stores the provided list of offsets that will be committed |
| 200 | // to the offset store according to `auto.commit.interval.ms` or manual |
| 201 | // offset-less Commit(). |
| 202 | // |
| 203 | // Returns the stored offsets on success. If at least one offset couldn't be stored, |
| 204 | // an error and a list of offsets is returned. Each offset can be checked for |
| 205 | // specific errors via its `.Error` member. |
| 206 | func (c *Consumer) StoreOffsets(offsets []TopicPartition) (storedOffsets []TopicPartition, err error) { |
| 207 | coffsets := newCPartsFromTopicPartitions(offsets) |
| 208 | defer C.rd_kafka_topic_partition_list_destroy(coffsets) |
| 209 | |
| 210 | cErr := C.rd_kafka_offsets_store(c.handle.rk, coffsets) |
| 211 | |
| 212 | // coffsets might be annotated with an error |
| 213 | storedOffsets = newTopicPartitionsFromCparts(coffsets) |
| 214 | |
| 215 | if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR { |
| 216 | return storedOffsets, newError(cErr) |
| 217 | } |
| 218 | |
| 219 | return storedOffsets, nil |
| 220 | } |
| 221 | |
| 222 | // Seek seeks the given topic partitions using the offset from the TopicPartition. |
| 223 | // |
| 224 | // If timeoutMs is not 0 the call will wait this long for the |
| 225 | // seek to be performed. If the timeout is reached the internal state |
| 226 | // will be unknown and this function returns ErrTimedOut. |
| 227 | // If timeoutMs is 0 it will initiate the seek but return |
| 228 | // immediately without any error reporting (e.g., async). |
| 229 | // |
| 230 | // Seek() may only be used for partitions already being consumed |
| 231 | // (through Assign() or implicitly through a self-rebalanced Subscribe()). |
| 232 | // To set the starting offset it is preferred to use Assign() and provide |
| 233 | // a starting offset for each partition. |
| 234 | // |
| 235 | // Returns an error on failure or nil otherwise. |
| 236 | func (c *Consumer) Seek(partition TopicPartition, timeoutMs int) error { |
| 237 | rkt := c.handle.getRkt(*partition.Topic) |
| 238 | cErr := C.rd_kafka_seek(rkt, |
| 239 | C.int32_t(partition.Partition), |
| 240 | C.int64_t(partition.Offset), |
| 241 | C.int(timeoutMs)) |
| 242 | if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR { |
| 243 | return newError(cErr) |
| 244 | } |
| 245 | return nil |
| 246 | } |
| 247 | |
| 248 | // Poll the consumer for messages or events. |
| 249 | // |
| 250 | // Will block for at most timeoutMs milliseconds |
| 251 | // |
| 252 | // The following callbacks may be triggered: |
| 253 | // Subscribe()'s rebalanceCb |
| 254 | // |
| 255 | // Returns nil on timeout, else an Event |
| 256 | func (c *Consumer) Poll(timeoutMs int) (event Event) { |
| 257 | ev, _ := c.handle.eventPoll(nil, timeoutMs, 1, nil) |
| 258 | return ev |
| 259 | } |
| 260 | |
| 261 | // Events returns the Events channel (if enabled) |
| 262 | func (c *Consumer) Events() chan Event { |
| 263 | return c.events |
| 264 | } |
| 265 | |
| 266 | // ReadMessage polls the consumer for a message. |
| 267 | // |
| 268 | // This is a conveniance API that wraps Poll() and only returns |
| 269 | // messages or errors. All other event types are discarded. |
| 270 | // |
| 271 | // The call will block for at most `timeout` waiting for |
| 272 | // a new message or error. `timeout` may be set to -1 for |
| 273 | // indefinite wait. |
| 274 | // |
| 275 | // Timeout is returned as (nil, err) where err is `kafka.(Error).Code == Kafka.ErrTimedOut`. |
| 276 | // |
| 277 | // Messages are returned as (msg, nil), |
| 278 | // while general errors are returned as (nil, err), |
| 279 | // and partition-specific errors are returned as (msg, err) where |
| 280 | // msg.TopicPartition provides partition-specific information (such as topic, partition and offset). |
| 281 | // |
| 282 | // All other event types, such as PartitionEOF, AssignedPartitions, etc, are silently discarded. |
| 283 | // |
| 284 | func (c *Consumer) ReadMessage(timeout time.Duration) (*Message, error) { |
| 285 | |
| 286 | var absTimeout time.Time |
| 287 | var timeoutMs int |
| 288 | |
| 289 | if timeout > 0 { |
| 290 | absTimeout = time.Now().Add(timeout) |
| 291 | timeoutMs = (int)(timeout.Seconds() * 1000.0) |
| 292 | } else { |
| 293 | timeoutMs = (int)(timeout) |
| 294 | } |
| 295 | |
| 296 | for { |
| 297 | ev := c.Poll(timeoutMs) |
| 298 | |
| 299 | switch e := ev.(type) { |
| 300 | case *Message: |
| 301 | if e.TopicPartition.Error != nil { |
| 302 | return e, e.TopicPartition.Error |
| 303 | } |
| 304 | return e, nil |
| 305 | case Error: |
| 306 | return nil, e |
| 307 | default: |
| 308 | // Ignore other event types |
| 309 | } |
| 310 | |
| 311 | if timeout > 0 { |
| 312 | // Calculate remaining time |
| 313 | timeoutMs = int(math.Max(0.0, absTimeout.Sub(time.Now()).Seconds()*1000.0)) |
| 314 | } |
| 315 | |
| 316 | if timeoutMs == 0 && ev == nil { |
| 317 | return nil, newError(C.RD_KAFKA_RESP_ERR__TIMED_OUT) |
| 318 | } |
| 319 | |
| 320 | } |
| 321 | |
| 322 | } |
| 323 | |
| 324 | // Close Consumer instance. |
| 325 | // The object is no longer usable after this call. |
| 326 | func (c *Consumer) Close() (err error) { |
| 327 | |
| 328 | if c.eventsChanEnable { |
| 329 | // Wait for consumerReader() to terminate (by closing readerTermChan) |
| 330 | close(c.readerTermChan) |
| 331 | c.handle.waitTerminated(1) |
| 332 | close(c.events) |
| 333 | } |
| 334 | |
| 335 | C.rd_kafka_queue_destroy(c.handle.rkq) |
| 336 | c.handle.rkq = nil |
| 337 | |
| 338 | e := C.rd_kafka_consumer_close(c.handle.rk) |
| 339 | if e != C.RD_KAFKA_RESP_ERR_NO_ERROR { |
| 340 | return newError(e) |
| 341 | } |
| 342 | |
| 343 | c.handle.cleanup() |
| 344 | |
| 345 | C.rd_kafka_destroy(c.handle.rk) |
| 346 | |
| 347 | return nil |
| 348 | } |
| 349 | |
| 350 | // NewConsumer creates a new high-level Consumer instance. |
| 351 | // |
| 352 | // Supported special configuration properties: |
| 353 | // go.application.rebalance.enable (bool, false) - Forward rebalancing responsibility to application via the Events() channel. |
| 354 | // If set to true the app must handle the AssignedPartitions and |
| 355 | // RevokedPartitions events and call Assign() and Unassign() |
| 356 | // respectively. |
| 357 | // go.events.channel.enable (bool, false) - Enable the Events() channel. Messages and events will be pushed on the Events() channel and the Poll() interface will be disabled. (Experimental) |
| 358 | // go.events.channel.size (int, 1000) - Events() channel size |
| 359 | // |
| 360 | // WARNING: Due to the buffering nature of channels (and queues in general) the |
| 361 | // use of the events channel risks receiving outdated events and |
| 362 | // messages. Minimizing go.events.channel.size reduces the risk |
| 363 | // and number of outdated events and messages but does not eliminate |
| 364 | // the factor completely. With a channel size of 1 at most one |
| 365 | // event or message may be outdated. |
| 366 | func NewConsumer(conf *ConfigMap) (*Consumer, error) { |
| 367 | |
| 368 | err := versionCheck() |
| 369 | if err != nil { |
| 370 | return nil, err |
| 371 | } |
| 372 | |
| 373 | // before we do anything with the configuration, create a copy such that |
| 374 | // the original is not mutated. |
| 375 | confCopy := conf.clone() |
| 376 | |
| 377 | groupid, _ := confCopy.get("group.id", nil) |
| 378 | if groupid == nil { |
| 379 | // without a group.id the underlying cgrp subsystem in librdkafka wont get started |
| 380 | // and without it there is no way to consume assigned partitions. |
| 381 | // So for now require the group.id, this might change in the future. |
| 382 | return nil, newErrorFromString(ErrInvalidArg, "Required property group.id not set") |
| 383 | } |
| 384 | |
| 385 | c := &Consumer{} |
| 386 | |
| 387 | v, err := confCopy.extract("go.application.rebalance.enable", false) |
| 388 | if err != nil { |
| 389 | return nil, err |
| 390 | } |
| 391 | c.appRebalanceEnable = v.(bool) |
| 392 | |
| 393 | v, err = confCopy.extract("go.events.channel.enable", false) |
| 394 | if err != nil { |
| 395 | return nil, err |
| 396 | } |
| 397 | c.eventsChanEnable = v.(bool) |
| 398 | |
| 399 | v, err = confCopy.extract("go.events.channel.size", 1000) |
| 400 | if err != nil { |
| 401 | return nil, err |
| 402 | } |
| 403 | eventsChanSize := v.(int) |
| 404 | |
| 405 | cConf, err := confCopy.convert() |
| 406 | if err != nil { |
| 407 | return nil, err |
| 408 | } |
| 409 | cErrstr := (*C.char)(C.malloc(C.size_t(256))) |
| 410 | defer C.free(unsafe.Pointer(cErrstr)) |
| 411 | |
| 412 | C.rd_kafka_conf_set_events(cConf, C.RD_KAFKA_EVENT_REBALANCE|C.RD_KAFKA_EVENT_OFFSET_COMMIT|C.RD_KAFKA_EVENT_STATS|C.RD_KAFKA_EVENT_ERROR) |
| 413 | |
| 414 | c.handle.rk = C.rd_kafka_new(C.RD_KAFKA_CONSUMER, cConf, cErrstr, 256) |
| 415 | if c.handle.rk == nil { |
| 416 | return nil, newErrorFromCString(C.RD_KAFKA_RESP_ERR__INVALID_ARG, cErrstr) |
| 417 | } |
| 418 | |
| 419 | C.rd_kafka_poll_set_consumer(c.handle.rk) |
| 420 | |
| 421 | c.handle.c = c |
| 422 | c.handle.setup() |
| 423 | c.handle.rkq = C.rd_kafka_queue_get_consumer(c.handle.rk) |
| 424 | if c.handle.rkq == nil { |
| 425 | // no cgrp (no group.id configured), revert to main queue. |
| 426 | c.handle.rkq = C.rd_kafka_queue_get_main(c.handle.rk) |
| 427 | } |
| 428 | |
| 429 | if c.eventsChanEnable { |
| 430 | c.events = make(chan Event, eventsChanSize) |
| 431 | c.readerTermChan = make(chan bool) |
| 432 | |
| 433 | /* Start rdkafka consumer queue reader -> events writer goroutine */ |
| 434 | go consumerReader(c, c.readerTermChan) |
| 435 | } |
| 436 | |
| 437 | return c, nil |
| 438 | } |
| 439 | |
| 440 | // rebalance calls the application's rebalance callback, if any. |
| 441 | // Returns true if the underlying assignment was updated, else false. |
| 442 | func (c *Consumer) rebalance(ev Event) bool { |
| 443 | c.appReassigned = false |
| 444 | |
| 445 | if c.rebalanceCb != nil { |
| 446 | c.rebalanceCb(c, ev) |
| 447 | } |
| 448 | |
| 449 | return c.appReassigned |
| 450 | } |
| 451 | |
| 452 | // consumerReader reads messages and events from the librdkafka consumer queue |
| 453 | // and posts them on the consumer channel. |
| 454 | // Runs until termChan closes |
| 455 | func consumerReader(c *Consumer, termChan chan bool) { |
| 456 | |
| 457 | out: |
| 458 | for true { |
| 459 | select { |
| 460 | case _ = <-termChan: |
| 461 | break out |
| 462 | default: |
| 463 | _, term := c.handle.eventPoll(c.events, 100, 1000, termChan) |
| 464 | if term { |
| 465 | break out |
| 466 | } |
| 467 | |
| 468 | } |
| 469 | } |
| 470 | |
| 471 | c.handle.terminatedChan <- "consumerReader" |
| 472 | return |
| 473 | |
| 474 | } |
| 475 | |
| 476 | // GetMetadata queries broker for cluster and topic metadata. |
| 477 | // If topic is non-nil only information about that topic is returned, else if |
| 478 | // allTopics is false only information about locally used topics is returned, |
| 479 | // else information about all topics is returned. |
| 480 | // GetMetadata is equivalent to listTopics, describeTopics and describeCluster in the Java API. |
| 481 | func (c *Consumer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error) { |
| 482 | return getMetadata(c, topic, allTopics, timeoutMs) |
| 483 | } |
| 484 | |
| 485 | // QueryWatermarkOffsets returns the broker's low and high offsets for the given topic |
| 486 | // and partition. |
| 487 | func (c *Consumer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error) { |
| 488 | return queryWatermarkOffsets(c, topic, partition, timeoutMs) |
| 489 | } |
| 490 | |
| 491 | // OffsetsForTimes looks up offsets by timestamp for the given partitions. |
| 492 | // |
| 493 | // The returned offset for each partition is the earliest offset whose |
| 494 | // timestamp is greater than or equal to the given timestamp in the |
| 495 | // corresponding partition. |
| 496 | // |
| 497 | // The timestamps to query are represented as `.Offset` in the `times` |
| 498 | // argument and the looked up offsets are represented as `.Offset` in the returned |
| 499 | // `offsets` list. |
| 500 | // |
| 501 | // The function will block for at most timeoutMs milliseconds. |
| 502 | // |
| 503 | // Duplicate Topic+Partitions are not supported. |
| 504 | // Per-partition errors may be returned in the `.Error` field. |
| 505 | func (c *Consumer) OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error) { |
| 506 | return offsetsForTimes(c, times, timeoutMs) |
| 507 | } |
| 508 | |
| 509 | // Subscription returns the current subscription as set by Subscribe() |
| 510 | func (c *Consumer) Subscription() (topics []string, err error) { |
| 511 | var cTopics *C.rd_kafka_topic_partition_list_t |
| 512 | |
| 513 | cErr := C.rd_kafka_subscription(c.handle.rk, &cTopics) |
| 514 | if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR { |
| 515 | return nil, newError(cErr) |
| 516 | } |
| 517 | defer C.rd_kafka_topic_partition_list_destroy(cTopics) |
| 518 | |
| 519 | topicCnt := int(cTopics.cnt) |
| 520 | topics = make([]string, topicCnt) |
| 521 | for i := 0; i < topicCnt; i++ { |
| 522 | crktpar := C._c_rdkafka_topic_partition_list_entry(cTopics, |
| 523 | C.int(i)) |
| 524 | topics[i] = C.GoString(crktpar.topic) |
| 525 | } |
| 526 | |
| 527 | return topics, nil |
| 528 | } |
| 529 | |
| 530 | // Assignment returns the current partition assignments |
| 531 | func (c *Consumer) Assignment() (partitions []TopicPartition, err error) { |
| 532 | var cParts *C.rd_kafka_topic_partition_list_t |
| 533 | |
| 534 | cErr := C.rd_kafka_assignment(c.handle.rk, &cParts) |
| 535 | if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR { |
| 536 | return nil, newError(cErr) |
| 537 | } |
| 538 | defer C.rd_kafka_topic_partition_list_destroy(cParts) |
| 539 | |
| 540 | partitions = newTopicPartitionsFromCparts(cParts) |
| 541 | |
| 542 | return partitions, nil |
| 543 | } |
| 544 | |
| 545 | // Committed retrieves committed offsets for the given set of partitions |
| 546 | func (c *Consumer) Committed(partitions []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error) { |
| 547 | cparts := newCPartsFromTopicPartitions(partitions) |
| 548 | defer C.rd_kafka_topic_partition_list_destroy(cparts) |
| 549 | cerr := C.rd_kafka_committed(c.handle.rk, cparts, C.int(timeoutMs)) |
| 550 | if cerr != C.RD_KAFKA_RESP_ERR_NO_ERROR { |
| 551 | return nil, newError(cerr) |
| 552 | } |
| 553 | |
| 554 | return newTopicPartitionsFromCparts(cparts), nil |
| 555 | } |
| 556 | |
| 557 | // Pause consumption for the provided list of partitions |
| 558 | // |
| 559 | // Note that messages already enqueued on the consumer's Event channel |
| 560 | // (if `go.events.channel.enable` has been set) will NOT be purged by |
| 561 | // this call, set `go.events.channel.size` accordingly. |
| 562 | func (c *Consumer) Pause(partitions []TopicPartition) (err error) { |
| 563 | cparts := newCPartsFromTopicPartitions(partitions) |
| 564 | defer C.rd_kafka_topic_partition_list_destroy(cparts) |
| 565 | cerr := C.rd_kafka_pause_partitions(c.handle.rk, cparts) |
| 566 | if cerr != C.RD_KAFKA_RESP_ERR_NO_ERROR { |
| 567 | return newError(cerr) |
| 568 | } |
| 569 | return nil |
| 570 | } |
| 571 | |
| 572 | // Resume consumption for the provided list of partitions |
| 573 | func (c *Consumer) Resume(partitions []TopicPartition) (err error) { |
| 574 | cparts := newCPartsFromTopicPartitions(partitions) |
| 575 | defer C.rd_kafka_topic_partition_list_destroy(cparts) |
| 576 | cerr := C.rd_kafka_resume_partitions(c.handle.rk, cparts) |
| 577 | if cerr != C.RD_KAFKA_RESP_ERR_NO_ERROR { |
| 578 | return newError(cerr) |
| 579 | } |
| 580 | return nil |
| 581 | } |