[VOL-1386] This commit adds "dep" as the package management tool
for voltha-go.
Change-Id: I52bc4911dd00a441756ec7c30f46d45091f3f90e
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/LICENSE b/vendor/github.com/confluentinc/confluent-kafka-go/LICENSE
new file mode 100644
index 0000000..e06d208
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/LICENSE
@@ -0,0 +1,202 @@
+Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "{}"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright {yyyy} {name of copyright owner}
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/.gitignore b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/.gitignore
new file mode 100644
index 0000000..b1a2111
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/.gitignore
@@ -0,0 +1,2 @@
+testconf.json
+go_rdkafka_generr/go_rdkafka_generr
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/00version.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/00version.go
new file mode 100644
index 0000000..188dc03
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/00version.go
@@ -0,0 +1,58 @@
+package kafka
+
+/**
+ * Copyright 2016-2018 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import (
+ "fmt"
+)
+
+/*
+#include <librdkafka/rdkafka.h>
+
+// Minimum required librdkafka version. This is checked both at
+// build time and at runtime.
+// Make sure to keep the MIN_RD_KAFKA_VERSION, MIN_VER_ERRSTR and #error
+// defines and strings in sync.
+//
+
+#define MIN_RD_KAFKA_VERSION 0x0000b0500
+
+#ifdef __APPLE__
+#define MIN_VER_ERRSTR "confluent-kafka-go requires librdkafka v0.11.5 or later. Install the latest version of librdkafka from Homebrew by running `brew install librdkafka` or `brew upgrade librdkafka`"
+#else
+#define MIN_VER_ERRSTR "confluent-kafka-go requires librdkafka v0.11.5 or later. Install the latest version of librdkafka from the Confluent repositories, see http://docs.confluent.io/current/installation.html"
+#endif
+
+#if RD_KAFKA_VERSION < MIN_RD_KAFKA_VERSION
+#ifdef __APPLE__
+#error "confluent-kafka-go requires librdkafka v0.11.5 or later. Install the latest version of librdkafka from Homebrew by running `brew install librdkafka` or `brew upgrade librdkafka`"
+#else
+#error "confluent-kafka-go requires librdkafka v0.11.5 or later. Install the latest version of librdkafka from the Confluent repositories, see http://docs.confluent.io/current/installation.html"
+#endif
+#endif
+*/
+import "C"
+
+func versionCheck() error {
+ ver, verstr := LibraryVersion()
+ if ver < C.MIN_RD_KAFKA_VERSION {
+ return newErrorFromString(ErrNotImplemented,
+ fmt.Sprintf("%s: librdkafka version %s (0x%x) detected",
+ C.MIN_VER_ERRSTR, verstr, ver))
+ }
+ return nil
+}
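+
+// A minimal sketch of checking the linked librdkafka version from
+// application code (illustrative only; assumes this package is imported
+// as "kafka"):
+//
+//     ver, verstr := kafka.LibraryVersion()
+//     fmt.Printf("librdkafka %s (0x%x)\n", verstr, ver)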
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/README.md b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/README.md
new file mode 100644
index 0000000..6df4d54
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/README.md
@@ -0,0 +1,69 @@
+# Information for confluent-kafka-go developers
+
+Whenever librdkafka error codes are updated, make sure to run generate before building:
+
+```
+ $ (cd go_rdkafka_generr && go install) && go generate
+ $ go build
+```
+
+## Testing
+
+Some of the tests in this directory, in particular the benchmark and
+integration tests, require an existing Kafka cluster and a testconf.json
+configuration file that provides the tests with bootstrap brokers, a topic
+name, etc.
+
+The format of testconf.json is a JSON object:
+```
+{
+ "Brokers": "<bootstrap-brokers>",
+ "Topic": "<test-topic-name>"
+}
+```
+
+See testconf-example.json for an example and full set of available options.
+
+
+To run the unit tests:
+```
+$ go test
+```
+
+To run benchmark tests:
+```
+$ go test -bench .
+```
+
+To measure code coverage:
+```
+$ go test -coverprofile=coverage.out -bench=.
+$ go tool cover -func=coverage.out
+```
+
+## Build tags (static linking)
+
+Different build types are supported through Go build tags (`-tags ..`);
+these tags must be specified on the **application** build command.
+
+ * `static` - Build with librdkafka linked statically (but librdkafka
+ dependencies linked dynamically).
+ * `static_all` - Build with all libraries linked statically.
+ * neither - Build with librdkafka (and its dependencies) linked dynamically.
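+
+For example, to build an application with librdkafka linked statically
+(a sketch; adjust to your own application):
+```
+$ go build -tags static
+```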
+
+## Generating HTML documentation
+
+To generate one-page HTML documentation run the mk/doc-gen.py script from the
+top-level directory. This script requires the beautifulsoup4 Python package.
+
+```
+$ source .../your/virtualenv/bin/activate
+$ pip install beautifulsoup4
+...
+$ mk/doc-gen.py > kafka.html
+```
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/adminapi.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/adminapi.go
new file mode 100644
index 0000000..c2ba76c
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/adminapi.go
@@ -0,0 +1,942 @@
+/**
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka
+
+import (
+ "context"
+ "fmt"
+ "strings"
+ "time"
+ "unsafe"
+)
+
+/*
+#include <librdkafka/rdkafka.h>
+#include <stdlib.h>
+
+static const rd_kafka_topic_result_t *
+topic_result_by_idx (const rd_kafka_topic_result_t **topics, size_t cnt, size_t idx) {
+ if (idx >= cnt)
+ return NULL;
+ return topics[idx];
+}
+
+static const rd_kafka_ConfigResource_t *
+ConfigResource_by_idx (const rd_kafka_ConfigResource_t **res, size_t cnt, size_t idx) {
+ if (idx >= cnt)
+ return NULL;
+ return res[idx];
+}
+
+static const rd_kafka_ConfigEntry_t *
+ConfigEntry_by_idx (const rd_kafka_ConfigEntry_t **entries, size_t cnt, size_t idx) {
+ if (idx >= cnt)
+ return NULL;
+ return entries[idx];
+}
+*/
+import "C"
+
+// AdminClient provides the Kafka Admin API. It is either created standalone
+// (NewAdminClient) or derived from an existing Producer or Consumer.
+type AdminClient struct {
+ handle *handle
+ isDerived bool // Derived from existing client handle
+}
+
+func durationToMilliseconds(t time.Duration) int {
+ if t > 0 {
+ return (int)(t.Seconds() * 1000.0)
+ }
+ return (int)(t)
+}
+
+// TopicResult provides per-topic operation result (error) information.
+type TopicResult struct {
+ // Topic name
+ Topic string
+ // Error, if any, of result. Check with `Error.Code() != ErrNoError`.
+ Error Error
+}
+
+// String returns a human-readable representation of a TopicResult.
+func (t TopicResult) String() string {
+ if t.Error.code == 0 {
+ return t.Topic
+ }
+ return fmt.Sprintf("%s (%s)", t.Topic, t.Error.str)
+}
+
+// TopicSpecification holds parameters for creating a new topic.
+// TopicSpecification is analogous to NewTopic in the Java Topic Admin API.
+type TopicSpecification struct {
+ // Topic name to create.
+ Topic string
+ // Number of partitions in topic.
+ NumPartitions int
+ // Default replication factor for the topic's partitions, or zero
+ // if an explicit ReplicaAssignment is set.
+ ReplicationFactor int
+ // (Optional) Explicit replica assignment. The outer array is
+ // indexed by the partition number, while the inner per-partition array
+ // contains the replica broker ids. The first broker in each
+ // broker id list will be the preferred replica.
+ ReplicaAssignment [][]int32
+ // Topic configuration.
+ Config map[string]string
+}
+
+// PartitionsSpecification holds parameters for creating additional partitions for a topic.
+// PartitionsSpecification is analogous to NewPartitions in the Java Topic Admin API.
+type PartitionsSpecification struct {
+ // Topic to create more partitions for.
+ Topic string
+ // New partition count for topic, must be higher than current partition count.
+ IncreaseTo int
+ // (Optional) Explicit replica assignment. The outer array is
+ // indexed by the new partition index (i.e., 0 for the first added
+ // partition), while the inner per-partition array
+ // contains the replica broker ids. The first broker in each
+ // broker id list will be the preferred replica.
+ ReplicaAssignment [][]int32
+}
+
+// ResourceType represents an Apache Kafka resource type
+type ResourceType int
+
+const (
+ // ResourceUnknown - Unknown
+ ResourceUnknown = ResourceType(C.RD_KAFKA_RESOURCE_UNKNOWN)
+ // ResourceAny - match any resource type (DescribeConfigs)
+ ResourceAny = ResourceType(C.RD_KAFKA_RESOURCE_ANY)
+ // ResourceTopic - Topic
+ ResourceTopic = ResourceType(C.RD_KAFKA_RESOURCE_TOPIC)
+ // ResourceGroup - Group
+ ResourceGroup = ResourceType(C.RD_KAFKA_RESOURCE_GROUP)
+ // ResourceBroker - Broker
+ ResourceBroker = ResourceType(C.RD_KAFKA_RESOURCE_BROKER)
+)
+
+// String returns the human-readable representation of a ResourceType
+func (t ResourceType) String() string {
+ return C.GoString(C.rd_kafka_ResourceType_name(C.rd_kafka_ResourceType_t(t)))
+}
+
+// ResourceTypeFromString translates a resource type name/string to
+// a ResourceType value.
+func ResourceTypeFromString(typeString string) (ResourceType, error) {
+ switch strings.ToUpper(typeString) {
+ case "ANY":
+ return ResourceAny, nil
+ case "TOPIC":
+ return ResourceTopic, nil
+ case "GROUP":
+ return ResourceGroup, nil
+ case "BROKER":
+ return ResourceBroker, nil
+ default:
+ return ResourceUnknown, newGoError(ErrInvalidArg)
+ }
+}
+
+// ConfigSource represents an Apache Kafka config source
+type ConfigSource int
+
+const (
+ // ConfigSourceUnknown is the default value
+ ConfigSourceUnknown = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_UNKNOWN_CONFIG)
+ // ConfigSourceDynamicTopic is dynamic topic config that is configured for a specific topic
+ ConfigSourceDynamicTopic = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG)
+ // ConfigSourceDynamicBroker is dynamic broker config that is configured for a specific broker
+ ConfigSourceDynamicBroker = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DYNAMIC_BROKER_CONFIG)
+ // ConfigSourceDynamicDefaultBroker is dynamic broker config that is configured as default for all brokers in the cluster
+ ConfigSourceDynamicDefaultBroker = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG)
+ // ConfigSourceStaticBroker is static broker config provided as broker properties at startup (e.g. from server.properties file)
+ ConfigSourceStaticBroker = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_STATIC_BROKER_CONFIG)
+ // ConfigSourceDefault is built-in default configuration for configs that have a default value
+ ConfigSourceDefault = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG)
+)
+
+// String returns the human-readable representation of a ConfigSource type
+func (t ConfigSource) String() string {
+ return C.GoString(C.rd_kafka_ConfigSource_name(C.rd_kafka_ConfigSource_t(t)))
+}
+
+// ConfigResource holds parameters for altering an Apache Kafka configuration resource
+type ConfigResource struct {
+ // Type of resource to set.
+ Type ResourceType
+ // Name of resource to set.
+ Name string
+ // Config entries to set.
+ // Configuration updates are atomic, any configuration property not provided
+ // here will be reverted (by the broker) to its default value.
+ // Use DescribeConfigs to retrieve the list of current configuration entry values.
+ Config []ConfigEntry
+}
+
+// String returns a human-readable representation of a ConfigResource
+func (c ConfigResource) String() string {
+ return fmt.Sprintf("Resource(%s, %s)", c.Type, c.Name)
+}
+
+// AlterOperation specifies the operation to perform on the ConfigEntry.
+// Currently only AlterOperationSet is supported.
+type AlterOperation int
+
+const (
+ // AlterOperationSet sets/overwrites the configuration setting.
+ AlterOperationSet = iota
+)
+
+// String returns the human-readable representation of an AlterOperation
+func (o AlterOperation) String() string {
+ switch o {
+ case AlterOperationSet:
+ return "Set"
+ default:
+ return fmt.Sprintf("Unknown%d?", int(o))
+ }
+}
+
+// ConfigEntry holds parameters for altering a resource's configuration.
+type ConfigEntry struct {
+ // Name of configuration entry, e.g., topic configuration property name.
+ Name string
+ // Value of configuration entry.
+ Value string
+ // Operation to perform on the entry.
+ Operation AlterOperation
+}
+
+// StringMapToConfigEntries creates a new list of ConfigEntry objects from the
+// provided string map. The given AlterOperation is set on each created entry.
+func StringMapToConfigEntries(stringMap map[string]string, operation AlterOperation) []ConfigEntry {
+ var ceList []ConfigEntry
+
+ for k, v := range stringMap {
+ ceList = append(ceList, ConfigEntry{Name: k, Value: v, Operation: operation})
+ }
+
+ return ceList
+}
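+
+// A minimal usage sketch (illustrative only; the config key and value are
+// hypothetical):
+//
+//     entries := StringMapToConfigEntries(
+//         map[string]string{"retention.ms": "86400000"},
+//         AlterOperationSet)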
+
+// String returns a human-readable representation of a ConfigEntry.
+func (c ConfigEntry) String() string {
+ return fmt.Sprintf("%v %s=\"%s\"", c.Operation, c.Name, c.Value)
+}
+
+// ConfigEntryResult contains the result of a single configuration entry from a
+// DescribeConfigs request.
+type ConfigEntryResult struct {
+ // Name of configuration entry, e.g., topic configuration property name.
+ Name string
+ // Value of configuration entry.
+ Value string
+ // Source indicates the configuration source.
+ Source ConfigSource
+ // IsReadOnly indicates whether the configuration entry can be altered.
+ IsReadOnly bool
+ // IsSensitive indicates whether the configuration entry contains sensitive information, in which case the value will be unset.
+ IsSensitive bool
+ // IsSynonym indicates whether the configuration entry is a synonym for another configuration property.
+ IsSynonym bool
+ // Synonyms contains a map of configuration entries that are synonyms to this configuration entry.
+ Synonyms map[string]ConfigEntryResult
+}
+
+// String returns a human-readable representation of a ConfigEntryResult.
+func (c ConfigEntryResult) String() string {
+ return fmt.Sprintf("%s=\"%s\"", c.Name, c.Value)
+}
+
+// configEntryResultFromC creates a ConfigEntryResult from a C ConfigEntry.
+func configEntryResultFromC(cEntry *C.rd_kafka_ConfigEntry_t) (entry ConfigEntryResult) {
+ entry.Name = C.GoString(C.rd_kafka_ConfigEntry_name(cEntry))
+ cValue := C.rd_kafka_ConfigEntry_value(cEntry)
+ if cValue != nil {
+ entry.Value = C.GoString(cValue)
+ }
+ entry.Source = ConfigSource(C.rd_kafka_ConfigEntry_source(cEntry))
+ entry.IsReadOnly = cint2bool(C.rd_kafka_ConfigEntry_is_read_only(cEntry))
+ entry.IsSensitive = cint2bool(C.rd_kafka_ConfigEntry_is_sensitive(cEntry))
+ entry.IsSynonym = cint2bool(C.rd_kafka_ConfigEntry_is_synonym(cEntry))
+
+ var cSynCnt C.size_t
+ cSyns := C.rd_kafka_ConfigEntry_synonyms(cEntry, &cSynCnt)
+ if cSynCnt > 0 {
+ entry.Synonyms = make(map[string]ConfigEntryResult)
+ }
+
+ for si := 0; si < int(cSynCnt); si++ {
+ cSyn := C.ConfigEntry_by_idx(cSyns, cSynCnt, C.size_t(si))
+ Syn := configEntryResultFromC(cSyn)
+ entry.Synonyms[Syn.Name] = Syn
+ }
+
+ return entry
+}
+
+// ConfigResourceResult provides the result for a resource from an AlterConfigs
+// or DescribeConfigs request.
+type ConfigResourceResult struct {
+ // Type of returned result resource.
+ Type ResourceType
+ // Name of returned result resource.
+ Name string
+ // Error, if any, of returned result resource.
+ Error Error
+ // Config entries, if any, of returned result resource.
+ Config map[string]ConfigEntryResult
+}
+
+// String returns a human-readable representation of a ConfigResourceResult.
+func (c ConfigResourceResult) String() string {
+ if c.Error.Code() != 0 {
+ return fmt.Sprintf("ResourceResult(%s, %s, \"%v\")", c.Type, c.Name, c.Error)
+
+ }
+ return fmt.Sprintf("ResourceResult(%s, %s, %d config(s))", c.Type, c.Name, len(c.Config))
+}
+
+// waitResult waits for a result event on cQueue or for the ctx to be
+// cancelled, whichever happens first.
+// The returned result event is checked for errors; if an error is set it is
+// returned and the event is destroyed.
+func (a *AdminClient) waitResult(ctx context.Context, cQueue *C.rd_kafka_queue_t, cEventType C.rd_kafka_event_type_t) (rkev *C.rd_kafka_event_t, err error) {
+
+ resultChan := make(chan *C.rd_kafka_event_t)
+ closeChan := make(chan bool) // never written to, just closed
+
+ go func() {
+ for {
+ select {
+ case _, ok := <-closeChan:
+ if !ok {
+ // Context cancelled/timed out
+ close(resultChan)
+ return
+ }
+
+ default:
+ // Wait for result event for at most 50ms
+ // to avoid blocking for too long if
+ // context is cancelled.
+ rkev := C.rd_kafka_queue_poll(cQueue, 50)
+ if rkev != nil {
+ resultChan <- rkev
+ close(resultChan)
+ return
+ }
+ }
+ }
+ }()
+
+ select {
+ case rkev = <-resultChan:
+ // Result type check
+ if cEventType != C.rd_kafka_event_type(rkev) {
+ err = newErrorFromString(ErrInvalidType,
+ fmt.Sprintf("Expected %d result event, not %d", (int)(cEventType), (int)(C.rd_kafka_event_type(rkev))))
+ C.rd_kafka_event_destroy(rkev)
+ return nil, err
+ }
+
+ // Generic error handling
+ cErr := C.rd_kafka_event_error(rkev)
+ if cErr != 0 {
+ err = newErrorFromCString(cErr, C.rd_kafka_event_error_string(rkev))
+ C.rd_kafka_event_destroy(rkev)
+ return nil, err
+ }
+ close(closeChan)
+ return rkev, nil
+ case <-ctx.Done():
+ // signal close to go-routine
+ close(closeChan)
+ // wait for close from go-routine to make sure it is done
+ // using cQueue before we return.
+ rkev, ok := <-resultChan
+ if ok {
+ // throw away result since context was cancelled
+ C.rd_kafka_event_destroy(rkev)
+ }
+ return nil, ctx.Err()
+ }
+}
+
+// cToTopicResults converts a C topic_result_t array to a Go TopicResult slice.
+func (a *AdminClient) cToTopicResults(cTopicRes **C.rd_kafka_topic_result_t, cCnt C.size_t) (result []TopicResult, err error) {
+
+ result = make([]TopicResult, int(cCnt))
+
+ for i := 0; i < int(cCnt); i++ {
+ cTopic := C.topic_result_by_idx(cTopicRes, cCnt, C.size_t(i))
+ result[i].Topic = C.GoString(C.rd_kafka_topic_result_name(cTopic))
+ result[i].Error = newErrorFromCString(
+ C.rd_kafka_topic_result_error(cTopic),
+ C.rd_kafka_topic_result_error_string(cTopic))
+ }
+
+ return result, nil
+}
+
+// cConfigResourceToResult converts a C ConfigResource result array to a Go
+// ConfigResourceResult slice.
+func (a *AdminClient) cConfigResourceToResult(cRes **C.rd_kafka_ConfigResource_t, cCnt C.size_t) (result []ConfigResourceResult, err error) {
+
+ result = make([]ConfigResourceResult, int(cCnt))
+
+ for i := 0; i < int(cCnt); i++ {
+ cRes := C.ConfigResource_by_idx(cRes, cCnt, C.size_t(i))
+ result[i].Type = ResourceType(C.rd_kafka_ConfigResource_type(cRes))
+ result[i].Name = C.GoString(C.rd_kafka_ConfigResource_name(cRes))
+ result[i].Error = newErrorFromCString(
+ C.rd_kafka_ConfigResource_error(cRes),
+ C.rd_kafka_ConfigResource_error_string(cRes))
+ var cConfigCnt C.size_t
+ cConfigs := C.rd_kafka_ConfigResource_configs(cRes, &cConfigCnt)
+ if cConfigCnt > 0 {
+ result[i].Config = make(map[string]ConfigEntryResult)
+ }
+ for ci := 0; ci < int(cConfigCnt); ci++ {
+ cEntry := C.ConfigEntry_by_idx(cConfigs, cConfigCnt, C.size_t(ci))
+ entry := configEntryResultFromC(cEntry)
+ result[i].Config[entry.Name] = entry
+ }
+ }
+
+ return result, nil
+}
+
+// CreateTopics creates topics in the cluster.
+//
+// The list of TopicSpecification objects defines the per-topic partition count, replicas, etc.
+//
+// Topic creation is non-atomic and may succeed for some topics but fail for
+// others; make sure to check the result for topic-specific errors.
+//
+// Note: TopicSpecification is analogous to NewTopic in the Java Topic Admin API.
+func (a *AdminClient) CreateTopics(ctx context.Context, topics []TopicSpecification, options ...CreateTopicsAdminOption) (result []TopicResult, err error) {
+ cTopics := make([]*C.rd_kafka_NewTopic_t, len(topics))
+
+ cErrstrSize := C.size_t(512)
+ cErrstr := (*C.char)(C.malloc(cErrstrSize))
+ defer C.free(unsafe.Pointer(cErrstr))
+
+ // Convert Go TopicSpecifications to C TopicSpecifications
+ for i, topic := range topics {
+
+ var cReplicationFactor C.int
+ if topic.ReplicationFactor == 0 {
+ cReplicationFactor = -1
+ } else {
+ cReplicationFactor = C.int(topic.ReplicationFactor)
+ }
+ if topic.ReplicaAssignment != nil {
+ if cReplicationFactor != -1 {
+ return nil, newErrorFromString(ErrInvalidArg,
+ "TopicSpecification.ReplicationFactor and TopicSpecification.ReplicaAssignment are mutually exclusive")
+ }
+
+ if len(topic.ReplicaAssignment) != topic.NumPartitions {
+ return nil, newErrorFromString(ErrInvalidArg,
+ "TopicSpecification.ReplicaAssignment must contain exactly TopicSpecification.NumPartitions partitions")
+ }
+
+ } else if cReplicationFactor == -1 {
+ return nil, newErrorFromString(ErrInvalidArg,
+ "TopicSpecification.ReplicationFactor or TopicSpecification.ReplicaAssignment must be specified")
+ }
+
+ cTopics[i] = C.rd_kafka_NewTopic_new(
+ C.CString(topic.Topic),
+ C.int(topic.NumPartitions),
+ cReplicationFactor,
+ cErrstr, cErrstrSize)
+ if cTopics[i] == nil {
+ return nil, newErrorFromString(ErrInvalidArg,
+ fmt.Sprintf("Topic %s: %s", topic.Topic, C.GoString(cErrstr)))
+ }
+
+ defer C.rd_kafka_NewTopic_destroy(cTopics[i])
+
+ for p, replicas := range topic.ReplicaAssignment {
+ cReplicas := make([]C.int32_t, len(replicas))
+ for ri, replica := range replicas {
+ cReplicas[ri] = C.int32_t(replica)
+ }
+ cErr := C.rd_kafka_NewTopic_set_replica_assignment(
+ cTopics[i], C.int32_t(p),
+ (*C.int32_t)(&cReplicas[0]), C.size_t(len(cReplicas)),
+ cErrstr, cErrstrSize)
+ if cErr != 0 {
+ return nil, newCErrorFromString(cErr,
+ fmt.Sprintf("Failed to set replica assignment for topic %s partition %d: %s", topic.Topic, p, C.GoString(cErrstr)))
+ }
+ }
+
+ for key, value := range topic.Config {
+ cErr := C.rd_kafka_NewTopic_set_config(
+ cTopics[i],
+ C.CString(key), C.CString(value))
+ if cErr != 0 {
+ return nil, newCErrorFromString(cErr,
+ fmt.Sprintf("Failed to set config %s=%s for topic %s", key, value, topic.Topic))
+ }
+ }
+ }
+
+ // Convert Go AdminOptions (if any) to C AdminOptions
+ genericOptions := make([]AdminOption, len(options))
+ for i := range options {
+ genericOptions[i] = options[i]
+ }
+ cOptions, err := adminOptionsSetup(a.handle, C.RD_KAFKA_ADMIN_OP_CREATETOPICS, genericOptions)
+ if err != nil {
+ return nil, err
+ }
+ defer C.rd_kafka_AdminOptions_destroy(cOptions)
+
+ // Create temporary queue for async operation
+ cQueue := C.rd_kafka_queue_new(a.handle.rk)
+ defer C.rd_kafka_queue_destroy(cQueue)
+
+ // Asynchronous call
+ C.rd_kafka_CreateTopics(
+ a.handle.rk,
+ (**C.rd_kafka_NewTopic_t)(&cTopics[0]),
+ C.size_t(len(cTopics)),
+ cOptions,
+ cQueue)
+
+ // Wait for result, error or context timeout
+ rkev, err := a.waitResult(ctx, cQueue, C.RD_KAFKA_EVENT_CREATETOPICS_RESULT)
+ if err != nil {
+ return nil, err
+ }
+ defer C.rd_kafka_event_destroy(rkev)
+
+ cRes := C.rd_kafka_event_CreateTopics_result(rkev)
+
+ // Convert result from C to Go
+ var cCnt C.size_t
+ cTopicRes := C.rd_kafka_CreateTopics_result_topics(cRes, &cCnt)
+
+ return a.cToTopicResults(cTopicRes, cCnt)
+}
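+
+// A minimal usage sketch (illustrative only; assumes an AdminClient "a",
+// a context "ctx" and a hypothetical topic name):
+//
+//     results, err := a.CreateTopics(ctx,
+//         []TopicSpecification{{
+//             Topic:             "my-topic",
+//             NumPartitions:     3,
+//             ReplicationFactor: 1}},
+//         SetAdminOperationTimeout(30*time.Second))
+//     if err == nil {
+//         for _, res := range results {
+//             // check res.Error for per-topic errors
+//         }
+//     }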
+
+// DeleteTopics deletes a batch of topics.
+//
+// This operation is not transactional and may succeed for a subset of topics
+// while failing for others.
+// It may take several seconds after the DeleteTopics result returns success for
+// all the brokers to become aware that the topics are gone. During this time,
+// topic metadata and configuration may continue to return information about deleted topics.
+//
+// Requires broker version >= 0.10.1.0
+func (a *AdminClient) DeleteTopics(ctx context.Context, topics []string, options ...DeleteTopicsAdminOption) (result []TopicResult, err error) {
+ cTopics := make([]*C.rd_kafka_DeleteTopic_t, len(topics))
+
+ cErrstrSize := C.size_t(512)
+ cErrstr := (*C.char)(C.malloc(cErrstrSize))
+ defer C.free(unsafe.Pointer(cErrstr))
+
+ // Convert Go DeleteTopics to C DeleteTopics
+ for i, topic := range topics {
+ cTopics[i] = C.rd_kafka_DeleteTopic_new(C.CString(topic))
+ if cTopics[i] == nil {
+ return nil, newErrorFromString(ErrInvalidArg,
+ fmt.Sprintf("Invalid arguments for topic %s", topic))
+ }
+
+ defer C.rd_kafka_DeleteTopic_destroy(cTopics[i])
+ }
+
+ // Convert Go AdminOptions (if any) to C AdminOptions
+ genericOptions := make([]AdminOption, len(options))
+ for i := range options {
+ genericOptions[i] = options[i]
+ }
+ cOptions, err := adminOptionsSetup(a.handle, C.RD_KAFKA_ADMIN_OP_DELETETOPICS, genericOptions)
+ if err != nil {
+ return nil, err
+ }
+ defer C.rd_kafka_AdminOptions_destroy(cOptions)
+
+ // Create temporary queue for async operation
+ cQueue := C.rd_kafka_queue_new(a.handle.rk)
+ defer C.rd_kafka_queue_destroy(cQueue)
+
+ // Asynchronous call
+ C.rd_kafka_DeleteTopics(
+ a.handle.rk,
+ (**C.rd_kafka_DeleteTopic_t)(&cTopics[0]),
+ C.size_t(len(cTopics)),
+ cOptions,
+ cQueue)
+
+ // Wait for result, error or context timeout
+ rkev, err := a.waitResult(ctx, cQueue, C.RD_KAFKA_EVENT_DELETETOPICS_RESULT)
+ if err != nil {
+ return nil, err
+ }
+ defer C.rd_kafka_event_destroy(rkev)
+
+ cRes := C.rd_kafka_event_DeleteTopics_result(rkev)
+
+ // Convert result from C to Go
+ var cCnt C.size_t
+ cTopicRes := C.rd_kafka_DeleteTopics_result_topics(cRes, &cCnt)
+
+ return a.cToTopicResults(cTopicRes, cCnt)
+}
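+
+// A minimal usage sketch (illustrative only; hypothetical topic name,
+// "a" and "ctx" assumed as above):
+//
+//     results, err := a.DeleteTopics(ctx, []string{"my-topic"},
+//         SetAdminOperationTimeout(30*time.Second))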
+
+// CreatePartitions creates additional partitions for topics.
+func (a *AdminClient) CreatePartitions(ctx context.Context, partitions []PartitionsSpecification, options ...CreatePartitionsAdminOption) (result []TopicResult, err error) {
+ cParts := make([]*C.rd_kafka_NewPartitions_t, len(partitions))
+
+ cErrstrSize := C.size_t(512)
+ cErrstr := (*C.char)(C.malloc(cErrstrSize))
+ defer C.free(unsafe.Pointer(cErrstr))
+
+ // Convert Go PartitionsSpecification to C NewPartitions
+ for i, part := range partitions {
+ cParts[i] = C.rd_kafka_NewPartitions_new(C.CString(part.Topic), C.size_t(part.IncreaseTo), cErrstr, cErrstrSize)
+ if cParts[i] == nil {
+ return nil, newErrorFromString(ErrInvalidArg,
+ fmt.Sprintf("Topic %s: %s", part.Topic, C.GoString(cErrstr)))
+ }
+
+ defer C.rd_kafka_NewPartitions_destroy(cParts[i])
+
+ for pidx, replicas := range part.ReplicaAssignment {
+ cReplicas := make([]C.int32_t, len(replicas))
+ for ri, replica := range replicas {
+ cReplicas[ri] = C.int32_t(replica)
+ }
+ cErr := C.rd_kafka_NewPartitions_set_replica_assignment(
+ cParts[i], C.int32_t(pidx),
+ (*C.int32_t)(&cReplicas[0]), C.size_t(len(cReplicas)),
+ cErrstr, cErrstrSize)
+ if cErr != 0 {
+ return nil, newCErrorFromString(cErr,
+ fmt.Sprintf("Failed to set replica assignment for topic %s new partition index %d: %s", part.Topic, pidx, C.GoString(cErrstr)))
+ }
+ }
+
+ }
+
+ // Convert Go AdminOptions (if any) to C AdminOptions
+ genericOptions := make([]AdminOption, len(options))
+ for i := range options {
+ genericOptions[i] = options[i]
+ }
+ cOptions, err := adminOptionsSetup(a.handle, C.RD_KAFKA_ADMIN_OP_CREATEPARTITIONS, genericOptions)
+ if err != nil {
+ return nil, err
+ }
+ defer C.rd_kafka_AdminOptions_destroy(cOptions)
+
+ // Create temporary queue for async operation
+ cQueue := C.rd_kafka_queue_new(a.handle.rk)
+ defer C.rd_kafka_queue_destroy(cQueue)
+
+ // Asynchronous call
+ C.rd_kafka_CreatePartitions(
+ a.handle.rk,
+ (**C.rd_kafka_NewPartitions_t)(&cParts[0]),
+ C.size_t(len(cParts)),
+ cOptions,
+ cQueue)
+
+ // Wait for result, error or context timeout
+ rkev, err := a.waitResult(ctx, cQueue, C.RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT)
+ if err != nil {
+ return nil, err
+ }
+ defer C.rd_kafka_event_destroy(rkev)
+
+ cRes := C.rd_kafka_event_CreatePartitions_result(rkev)
+
+ // Convert result from C to Go
+ var cCnt C.size_t
+ cTopicRes := C.rd_kafka_CreatePartitions_result_topics(cRes, &cCnt)
+
+ return a.cToTopicResults(cTopicRes, cCnt)
+}
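+
+// A minimal usage sketch (illustrative only; hypothetical values):
+//
+//     results, err := a.CreatePartitions(ctx,
+//         []PartitionsSpecification{{Topic: "my-topic", IncreaseTo: 6}})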
+
+// AlterConfigs alters/updates cluster resource configuration.
+//
+// Updates are not transactional so they may succeed for a subset
+// of the provided resources while others fail.
+// The configuration for a particular resource is updated atomically,
+// replacing values using the provided ConfigEntrys and reverting
+// unspecified ConfigEntrys to their default values.
+//
+// Requires broker version >=0.11.0.0
+//
+// AlterConfigs will replace all existing configuration for
+// the provided resources with the new configuration given,
+// reverting all other configuration to their default values.
+//
+// Multiple resources and resource types may be set, but at most one
+// resource of type ResourceBroker is allowed per call since these
+// resource requests must be sent to the broker specified in the resource.
+func (a *AdminClient) AlterConfigs(ctx context.Context, resources []ConfigResource, options ...AlterConfigsAdminOption) (result []ConfigResourceResult, err error) {
+ cRes := make([]*C.rd_kafka_ConfigResource_t, len(resources))
+
+ cErrstrSize := C.size_t(512)
+ cErrstr := (*C.char)(C.malloc(cErrstrSize))
+ defer C.free(unsafe.Pointer(cErrstr))
+
+ // Convert Go ConfigResources to C ConfigResources
+ for i, res := range resources {
+ cRes[i] = C.rd_kafka_ConfigResource_new(
+ C.rd_kafka_ResourceType_t(res.Type), C.CString(res.Name))
+ if cRes[i] == nil {
+ return nil, newErrorFromString(ErrInvalidArg,
+ fmt.Sprintf("Invalid arguments for resource %v", res))
+ }
+
+ defer C.rd_kafka_ConfigResource_destroy(cRes[i])
+
+ for _, entry := range res.Config {
+ var cErr C.rd_kafka_resp_err_t
+ switch entry.Operation {
+ case AlterOperationSet:
+ cErr = C.rd_kafka_ConfigResource_set_config(
+ cRes[i], C.CString(entry.Name), C.CString(entry.Value))
+ default:
+ panic(fmt.Sprintf("Invalid ConfigEntry.Operation: %v", entry.Operation))
+ }
+
+ if cErr != 0 {
+ return nil,
+ newCErrorFromString(cErr,
+ fmt.Sprintf("Failed to add configuration %s: %s",
+ entry, C.GoString(C.rd_kafka_err2str(cErr))))
+ }
+ }
+ }
+
+ // Convert Go AdminOptions (if any) to C AdminOptions
+ genericOptions := make([]AdminOption, len(options))
+ for i := range options {
+ genericOptions[i] = options[i]
+ }
+ cOptions, err := adminOptionsSetup(a.handle, C.RD_KAFKA_ADMIN_OP_ALTERCONFIGS, genericOptions)
+ if err != nil {
+ return nil, err
+ }
+ defer C.rd_kafka_AdminOptions_destroy(cOptions)
+
+ // Create temporary queue for async operation
+ cQueue := C.rd_kafka_queue_new(a.handle.rk)
+ defer C.rd_kafka_queue_destroy(cQueue)
+
+ // Asynchronous call
+ C.rd_kafka_AlterConfigs(
+ a.handle.rk,
+ (**C.rd_kafka_ConfigResource_t)(&cRes[0]),
+ C.size_t(len(cRes)),
+ cOptions,
+ cQueue)
+
+ // Wait for result, error or context timeout
+ rkev, err := a.waitResult(ctx, cQueue, C.RD_KAFKA_EVENT_ALTERCONFIGS_RESULT)
+ if err != nil {
+ return nil, err
+ }
+ defer C.rd_kafka_event_destroy(rkev)
+
+ cResult := C.rd_kafka_event_AlterConfigs_result(rkev)
+
+ // Convert results from C to Go
+ var cCnt C.size_t
+ cResults := C.rd_kafka_AlterConfigs_result_resources(cResult, &cCnt)
+
+ return a.cConfigResourceToResult(cResults, cCnt)
+}
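+
+// A minimal usage sketch (illustrative only; hypothetical topic and value).
+// Note that any config entry not listed here is reverted to its default:
+//
+//     results, err := a.AlterConfigs(ctx,
+//         []ConfigResource{{
+//             Type: ResourceTopic,
+//             Name: "my-topic",
+//             Config: StringMapToConfigEntries(
+//                 map[string]string{"retention.ms": "3600000"},
+//                 AlterOperationSet)}})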
+
+// DescribeConfigs retrieves configuration for cluster resources.
+//
+// The returned configuration includes default values; use
+// ConfigEntryResult.Source to distinguish
+// default values from manually configured settings.
+//
+// The value of config entries where .IsSensitive is true
+// will always be unset to avoid disclosing sensitive
+// information, such as security settings.
+//
+// Configuration entries where .IsReadOnly is true can't be modified
+// (with AlterConfigs).
+//
+// Synonym configuration entries are returned if the broker supports
+// it (broker version >= 1.1.0). See .Synonyms.
+//
+// Requires broker version >=0.11.0.0
+//
+// Multiple resources and resource types may be requested, but at most
+// one resource of type ResourceBroker is allowed per call
+// since these resource requests must be sent to the broker specified
+// in the resource.
+func (a *AdminClient) DescribeConfigs(ctx context.Context, resources []ConfigResource, options ...DescribeConfigsAdminOption) (result []ConfigResourceResult, err error) {
+ cRes := make([]*C.rd_kafka_ConfigResource_t, len(resources))
+
+ cErrstrSize := C.size_t(512)
+ cErrstr := (*C.char)(C.malloc(cErrstrSize))
+ defer C.free(unsafe.Pointer(cErrstr))
+
+ // Convert Go ConfigResources to C ConfigResources
+ for i, res := range resources {
+ cRes[i] = C.rd_kafka_ConfigResource_new(
+ C.rd_kafka_ResourceType_t(res.Type), C.CString(res.Name))
+ if cRes[i] == nil {
+ return nil, newErrorFromString(ErrInvalidArg,
+ fmt.Sprintf("Invalid arguments for resource %v", res))
+ }
+
+ defer C.rd_kafka_ConfigResource_destroy(cRes[i])
+ }
+
+ // Convert Go AdminOptions (if any) to C AdminOptions
+ genericOptions := make([]AdminOption, len(options))
+ for i := range options {
+ genericOptions[i] = options[i]
+ }
+ cOptions, err := adminOptionsSetup(a.handle, C.RD_KAFKA_ADMIN_OP_DESCRIBECONFIGS, genericOptions)
+ if err != nil {
+ return nil, err
+ }
+ defer C.rd_kafka_AdminOptions_destroy(cOptions)
+
+ // Create temporary queue for async operation
+ cQueue := C.rd_kafka_queue_new(a.handle.rk)
+ defer C.rd_kafka_queue_destroy(cQueue)
+
+ // Asynchronous call
+ C.rd_kafka_DescribeConfigs(
+ a.handle.rk,
+ (**C.rd_kafka_ConfigResource_t)(&cRes[0]),
+ C.size_t(len(cRes)),
+ cOptions,
+ cQueue)
+
+ // Wait for result, error or context timeout
+ rkev, err := a.waitResult(ctx, cQueue, C.RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT)
+ if err != nil {
+ return nil, err
+ }
+ defer C.rd_kafka_event_destroy(rkev)
+
+ cResult := C.rd_kafka_event_DescribeConfigs_result(rkev)
+
+ // Convert results from C to Go
+ var cCnt C.size_t
+ cResults := C.rd_kafka_DescribeConfigs_result_resources(cResult, &cCnt)
+
+ return a.cConfigResourceToResult(cResults, cCnt)
+}
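+
+// A minimal usage sketch (illustrative only; hypothetical topic name):
+//
+//     results, err := a.DescribeConfigs(ctx,
+//         []ConfigResource{{Type: ResourceTopic, Name: "my-topic"}})
+//     if err == nil {
+//         for _, entry := range results[0].Config {
+//             // entry.Source indicates where the value came from
+//         }
+//     }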
+
+// GetMetadata queries the broker for cluster and topic metadata.
+// If topic is non-nil, only information about that topic is returned; else if
+// allTopics is false, only information about locally used topics is returned;
+// else information about all topics is returned.
+// GetMetadata is equivalent to listTopics, describeTopics and describeCluster in the Java API.
+func (a *AdminClient) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error) {
+ return getMetadata(a, topic, allTopics, timeoutMs)
+}
+
+// String returns a human-readable name for an AdminClient instance
+func (a *AdminClient) String() string {
+ return fmt.Sprintf("admin-%s", a.handle.String())
+}
+
+// gethandle implements the Handle interface
+func (a *AdminClient) gethandle() *handle {
+ return a.handle
+}
+
+// Close an AdminClient instance.
+func (a *AdminClient) Close() {
+ if a.isDerived {
+ // Derived AdminClient needs no cleanup.
+ a.handle = &handle{}
+ return
+ }
+
+ a.handle.cleanup()
+
+ C.rd_kafka_destroy(a.handle.rk)
+}
+
+// NewAdminClient creates a new AdminClient instance with a new underlying client instance
+func NewAdminClient(conf *ConfigMap) (*AdminClient, error) {
+
+ err := versionCheck()
+ if err != nil {
+ return nil, err
+ }
+
+ a := &AdminClient{}
+ a.handle = &handle{}
+
+ // Convert ConfigMap to librdkafka conf_t
+ cConf, err := conf.convert()
+ if err != nil {
+ return nil, err
+ }
+
+ cErrstr := (*C.char)(C.malloc(C.size_t(256)))
+ defer C.free(unsafe.Pointer(cErrstr))
+
+ // Create librdkafka producer instance. The Producer is somewhat cheaper than
+ // the consumer, but any instance type can be used for Admin APIs.
+ a.handle.rk = C.rd_kafka_new(C.RD_KAFKA_PRODUCER, cConf, cErrstr, 256)
+ if a.handle.rk == nil {
+ return nil, newErrorFromCString(C.RD_KAFKA_RESP_ERR__INVALID_ARG, cErrstr)
+ }
+
+ a.isDerived = false
+ a.handle.setup()
+
+ return a, nil
+}
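+
+// A minimal usage sketch (illustrative only; assumes a broker on
+// localhost:9092):
+//
+//     a, err := NewAdminClient(&ConfigMap{"bootstrap.servers": "localhost:9092"})
+//     if err != nil {
+//         // handle error
+//     }
+//     defer a.Close()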
+
+// NewAdminClientFromProducer derives a new AdminClient from an existing Producer instance.
+// The AdminClient will use the same configuration and connections as the parent instance.
+func NewAdminClientFromProducer(p *Producer) (a *AdminClient, err error) {
+ if p.handle.rk == nil {
+ return nil, newErrorFromString(ErrInvalidArg, "Can't derive AdminClient from closed producer")
+ }
+
+ a = &AdminClient{}
+ a.handle = &p.handle
+ a.isDerived = true
+ return a, nil
+}
+
+// NewAdminClientFromConsumer derives a new AdminClient from an existing Consumer instance.
+// The AdminClient will use the same configuration and connections as the parent instance.
+func NewAdminClientFromConsumer(c *Consumer) (a *AdminClient, err error) {
+ if c.handle.rk == nil {
+ return nil, newErrorFromString(ErrInvalidArg, "Can't derive AdminClient from closed consumer")
+ }
+
+ a = &AdminClient{}
+ a.handle = &c.handle
+ a.isDerived = true
+ return a, nil
+}
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/adminoptions.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/adminoptions.go
new file mode 100644
index 0000000..19a8be0
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/adminoptions.go
@@ -0,0 +1,264 @@
+/**
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka
+
+import (
+ "fmt"
+ "time"
+ "unsafe"
+)
+
+/*
+#include <librdkafka/rdkafka.h>
+#include <stdlib.h>
+*/
+import "C"
+
+// AdminOptionOperationTimeout sets the broker's operation timeout, such as the
+// timeout for CreateTopics to complete the creation of topics on the controller
+// before returning a result to the application.
+//
+// CreateTopics, DeleteTopics, CreatePartitions:
+// a value of 0 will return immediately after triggering the operation,
+// while a value > 0 will wait this long for the operation to propagate
+// in the cluster.
+//
+// Default: 0 (return immediately).
+//
+// Valid for CreateTopics, DeleteTopics, CreatePartitions.
+type AdminOptionOperationTimeout struct {
+ isSet bool
+ val time.Duration
+}
+
+func (ao AdminOptionOperationTimeout) supportsCreateTopics() {
+}
+func (ao AdminOptionOperationTimeout) supportsDeleteTopics() {
+}
+func (ao AdminOptionOperationTimeout) supportsCreatePartitions() {
+}
+
+func (ao AdminOptionOperationTimeout) apply(cOptions *C.rd_kafka_AdminOptions_t) error {
+ if !ao.isSet {
+ return nil
+ }
+
+ cErrstrSize := C.size_t(512)
+ cErrstr := (*C.char)(C.malloc(cErrstrSize))
+ defer C.free(unsafe.Pointer(cErrstr))
+
+ cErr := C.rd_kafka_AdminOptions_set_operation_timeout(
+ cOptions, C.int(durationToMilliseconds(ao.val)),
+ cErrstr, cErrstrSize)
+ if cErr != 0 {
+ C.rd_kafka_AdminOptions_destroy(cOptions)
+ return newCErrorFromString(cErr,
+ fmt.Sprintf("Failed to set operation timeout: %s", C.GoString(cErrstr)))
+
+ }
+
+ return nil
+}
+
+// SetAdminOperationTimeout sets the broker's operation timeout, such as the
+// timeout for CreateTopics to complete the creation of topics on the controller
+// before returning a result to the application.
+//
+// CreateTopics, DeleteTopics, CreatePartitions:
+// a value of 0 will return immediately after triggering the operation,
+// while a value > 0 will wait this long for the operation to propagate
+// in the cluster.
+//
+// Default: 0 (return immediately).
+//
+// Valid for CreateTopics, DeleteTopics, CreatePartitions.
+func SetAdminOperationTimeout(t time.Duration) (ao AdminOptionOperationTimeout) {
+ ao.isSet = true
+ ao.val = t
+ return ao
+}
+
+// AdminOptionRequestTimeout sets the overall request timeout, including broker
+// lookup, request transmission, operation time on broker, and response.
+//
+// Default: `socket.timeout.ms`.
+//
+// Valid for all Admin API methods.
+type AdminOptionRequestTimeout struct {
+ isSet bool
+ val time.Duration
+}
+
+func (ao AdminOptionRequestTimeout) supportsCreateTopics() {
+}
+func (ao AdminOptionRequestTimeout) supportsDeleteTopics() {
+}
+func (ao AdminOptionRequestTimeout) supportsCreatePartitions() {
+}
+func (ao AdminOptionRequestTimeout) supportsAlterConfigs() {
+}
+func (ao AdminOptionRequestTimeout) supportsDescribeConfigs() {
+}
+
+func (ao AdminOptionRequestTimeout) apply(cOptions *C.rd_kafka_AdminOptions_t) error {
+ if !ao.isSet {
+ return nil
+ }
+
+ cErrstrSize := C.size_t(512)
+ cErrstr := (*C.char)(C.malloc(cErrstrSize))
+ defer C.free(unsafe.Pointer(cErrstr))
+
+ cErr := C.rd_kafka_AdminOptions_set_request_timeout(
+ cOptions, C.int(durationToMilliseconds(ao.val)),
+ cErrstr, cErrstrSize)
+ if cErr != 0 {
+ C.rd_kafka_AdminOptions_destroy(cOptions)
+ return newCErrorFromString(cErr,
+ fmt.Sprintf("%s", C.GoString(cErrstr)))
+
+ }
+
+ return nil
+}
+
+// SetAdminRequestTimeout sets the overall request timeout, including broker
+// lookup, request transmission, operation time on broker, and response.
+//
+// Default: `socket.timeout.ms`.
+//
+// Valid for all Admin API methods.
+func SetAdminRequestTimeout(t time.Duration) (ao AdminOptionRequestTimeout) {
+ ao.isSet = true
+ ao.val = t
+ return ao
+}
+
+// AdminOptionValidateOnly tells the broker to only validate the request,
+// without performing the requested operation (create topics, etc).
+//
+// Default: false.
+//
+// Valid for CreateTopics, CreatePartitions, AlterConfigs
+type AdminOptionValidateOnly struct {
+ isSet bool
+ val bool
+}
+
+func (ao AdminOptionValidateOnly) supportsCreateTopics() {
+}
+func (ao AdminOptionValidateOnly) supportsCreatePartitions() {
+}
+func (ao AdminOptionValidateOnly) supportsAlterConfigs() {
+}
+
+func (ao AdminOptionValidateOnly) apply(cOptions *C.rd_kafka_AdminOptions_t) error {
+ if !ao.isSet {
+ return nil
+ }
+
+ cErrstrSize := C.size_t(512)
+ cErrstr := (*C.char)(C.malloc(cErrstrSize))
+ defer C.free(unsafe.Pointer(cErrstr))
+
+ cErr := C.rd_kafka_AdminOptions_set_validate_only(
+ cOptions, bool2cint(ao.val),
+ cErrstr, cErrstrSize)
+ if cErr != 0 {
+ C.rd_kafka_AdminOptions_destroy(cOptions)
+ return newCErrorFromString(cErr,
+ fmt.Sprintf("%s", C.GoString(cErrstr)))
+
+ }
+
+ return nil
+}
+
+// SetAdminValidateOnly tells the broker to only validate the request,
+// without performing the requested operation (create topics, etc).
+//
+// Default: false.
+//
+// Valid for CreateTopics, CreatePartitions, AlterConfigs
+func SetAdminValidateOnly(validateOnly bool) (ao AdminOptionValidateOnly) {
+ ao.isSet = true
+ ao.val = validateOnly
+ return ao
+}
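+
+// A minimal usage sketch: a dry-run CreateTopics call that only validates
+// the request on the broker (illustrative only; "a", "ctx" and "topics"
+// are assumed to exist):
+//
+//     results, err := a.CreateTopics(ctx, topics, SetAdminValidateOnly(true))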
+
+// CreateTopicsAdminOption - see setters.
+//
+// See SetAdminRequestTimeout, SetAdminOperationTimeout, SetAdminValidateOnly.
+type CreateTopicsAdminOption interface {
+ supportsCreateTopics()
+ apply(cOptions *C.rd_kafka_AdminOptions_t) error
+}
+
+// DeleteTopicsAdminOption - see setters.
+//
+// See SetAdminRequestTimeout, SetAdminOperationTimeout.
+type DeleteTopicsAdminOption interface {
+ supportsDeleteTopics()
+ apply(cOptions *C.rd_kafka_AdminOptions_t) error
+}
+
+// CreatePartitionsAdminOption - see setters.
+//
+// See SetAdminRequestTimeout, SetAdminOperationTimeout, SetAdminValidateOnly.
+type CreatePartitionsAdminOption interface {
+ supportsCreatePartitions()
+ apply(cOptions *C.rd_kafka_AdminOptions_t) error
+}
+
+// AlterConfigsAdminOption - see setters.
+//
+// See SetAdminRequestTimeout, SetAdminValidateOnly.
+type AlterConfigsAdminOption interface {
+ supportsAlterConfigs()
+ apply(cOptions *C.rd_kafka_AdminOptions_t) error
+}
+
+// DescribeConfigsAdminOption - see setters.
+//
+// See SetAdminRequestTimeout.
+type DescribeConfigsAdminOption interface {
+ supportsDescribeConfigs()
+ apply(cOptions *C.rd_kafka_AdminOptions_t) error
+}
+
+// AdminOption is a generic type not to be used directly.
+//
+// See CreateTopicsAdminOption et.al.
+type AdminOption interface {
+ apply(cOptions *C.rd_kafka_AdminOptions_t) error
+}
+
+func adminOptionsSetup(h *handle, opType C.rd_kafka_admin_op_t, options []AdminOption) (*C.rd_kafka_AdminOptions_t, error) {
+
+ cOptions := C.rd_kafka_AdminOptions_new(h.rk, opType)
+ for _, opt := range options {
+ if opt == nil {
+ continue
+ }
+ err := opt.apply(cOptions)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ return cOptions, nil
+}
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/api.html b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/api.html
new file mode 100644
index 0000000..05c8fed
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/api.html
@@ -0,0 +1,1632 @@
+<!DOCTYPE html>
+<html>
+ <head>
+ <meta content="text/html; charset=utf-8" http-equiv="Content-Type">
+ <meta content="width=device-width, initial-scale=1" name="viewport">
+ <meta content="#375EAB" name="theme-color">
+ <title>
+ kafka - The Go Programming Language
+ </title>
+ <link href="http://golang.org/lib/godoc/style.css" rel="stylesheet" type="text/css">
+ <link href="http://golang.org/lib/godoc/jquery.treeview.css" rel="stylesheet">
+ <script type="text/javascript">
+ window.initFuncs = [];
+ </script>
+ </head>
+ <body>
+ <div id="lowframe" style="position: fixed; bottom: 0; left: 0; height: 0; width: 100%; border-top: thin solid grey; background-color: white; overflow: auto;">
+ ...
+ </div>
+ <!-- #lowframe -->
+ <div class="wide" id="page">
+ <div class="container">
+ <h1>
+ Package kafka
+ </h1>
+ <div id="nav">
+ </div>
+ <!--
+ Copyright 2009 The Go Authors. All rights reserved.
+ Use of this source code is governed by a BSD-style
+ license that can be found in the LICENSE file.
+-->
+ <!--
+ Note: Static (i.e., not template-generated) href and id
+ attributes start with "pkg-" to make it impossible for
+ them to conflict with generated attributes (some of which
+ correspond to Go identifiers).
+-->
+ <script type="text/javascript">
+ document.ANALYSIS_DATA = null;
+ document.CALLGRAPH = null;
+ </script>
+ <div id="short-nav">
+ <dl>
+ <dd>
+ <code>
+ import "github.com/confluentinc/confluent-kafka-go/kafka"
+ </code>
+ </dd>
+ </dl>
+ <dl>
+ <dd>
+ <a class="overviewLink" href="#pkg-overview">
+ Overview
+ </a>
+ </dd>
+ <dd>
+ <a class="indexLink" href="#pkg-index">
+ Index
+ </a>
+ </dd>
+ <dd>
+ </dd>
+ </dl>
+ </div>
+ <!-- The package's Name is printed as title by the top-level template -->
+ <div class="toggleVisible" id="pkg-overview">
+ <div class="collapsed">
+ <h2 class="toggleButton" title="Click to show Overview section">
+            Overview ▹
+ </h2>
+ </div>
+ <div class="expanded">
+ <h2 class="toggleButton" title="Click to hide Overview section">
+            Overview ▾
+ </h2>
+ <p>
+            Package kafka provides a high-level Apache Kafka producer and consumer
+using bindings on top of the librdkafka C library.
+ </p>
+ <h3 id="hdr-High_level_Consumer">
+ High-level Consumer
+ </h3>
+ <p>
+ * Decide if you want to read messages and events from the `.Events()` channel
+(set `"go.events.channel.enable": true`) or by calling `.Poll()`.
+ </p>
+ <p>
+ * Create a Consumer with `kafka.NewConsumer()` providing at
+least the `bootstrap.servers` and `group.id` configuration properties.
+ </p>
+ <p>
+            * Call `.Subscribe()` (or `.SubscribeTopics()` to subscribe to multiple topics)
+to join the group with the specified subscription set.
+Subscriptions are atomic; calling `.Subscribe*()` again will leave
+the group and rejoin with the new set of topics.
+ </p>
+ <p>
+ * Start reading events and messages from either the `.Events` channel
+or by calling `.Poll()`.
+ </p>
+ <p>
+            * When the group has rebalanced, each client member is assigned a
+(sub-)set of topic+partitions.
+By default the consumer will start fetching messages for its assigned
+partitions at this point, but your application may enable rebalance
+events to get insight into what the assigned partitions were,
+as well as to set the initial offsets. To do this you need to pass
+`"go.application.rebalance.enable": true` to the `NewConsumer()` call
+mentioned above. You will (eventually) see a `kafka.AssignedPartitions` event
+with the assigned partition set. You can optionally modify the initial
+offsets (they default to stored offsets, and if there are no previously stored
+offsets they fall back to `"default.topic.config": ConfigMap{"auto.offset.reset": ..}`,
+which defaults to the `latest` message) and then call `.Assign(partitions)`
+to start consuming. If you don't need to modify the initial offsets you do
+not need to call `.Assign()`; the client will do so automatically for you if
+you don't.
+ </p>
+ <p>
+ * As messages are fetched they will be made available on either the
+`.Events` channel or by calling `.Poll()`, look for event type `*kafka.Message`.
+ </p>
+ <p>
+ * Handle messages, events and errors to your liking.
+ </p>
+ <p>
+ * When you are done consuming call `.Close()` to commit final offsets
+and leave the consumer group.
+ </p>
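+          <p>
+            A minimal poll-based consumer sketch tying the steps above together
+            (the broker address, group id and topic name are placeholders):
+          </p>
+          <pre>package main
+
+import (
+	"fmt"
+	"os"
+
+	"github.com/confluentinc/confluent-kafka-go/kafka"
+)
+
+func main() {
+	c, err := kafka.NewConsumer(&amp;kafka.ConfigMap{
+		"bootstrap.servers": "localhost:9092", // placeholder broker
+		"group.id":          "mygroup",        // placeholder group
+	})
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "failed to create consumer: %v\n", err)
+		os.Exit(1)
+	}
+	defer c.Close() // commits final offsets and leaves the group
+
+	if err = c.SubscribeTopics([]string{"mytopic"}, nil); err != nil {
+		fmt.Fprintf(os.Stderr, "subscribe failed: %v\n", err)
+		os.Exit(1)
+	}
+
+	for {
+		switch e := c.Poll(100).(type) {
+		case *kafka.Message:
+			fmt.Printf("message on %s: %s\n", e.TopicPartition, string(e.Value))
+		case kafka.Error:
+			// Normally informational; the client recovers automatically.
+			fmt.Fprintf(os.Stderr, "error: %v\n", e)
+		case nil:
+			// Poll timed out; nothing to handle.
+		}
+	}
+}</pre>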
+ <h3 id="hdr-Producer">
+ Producer
+ </h3>
+ <p>
+ * Create a Producer with `kafka.NewProducer()` providing at least
+the `bootstrap.servers` configuration properties.
+ </p>
+ <p>
+ * Messages may now be produced either by sending a `*kafka.Message`
+on the `.ProduceChannel` or by calling `.Produce()`.
+ </p>
+ <p>
+            * Producing is an asynchronous operation, so the client notifies the application
+of per-message produce success or failure through delivery reports.
+Delivery reports are by default emitted on the `.Events()` channel as `*kafka.Message`,
+and you should check `msg.TopicPartition.Error` for `nil` to find out whether the message
+was successfully delivered.
+It is also possible to direct delivery reports to alternate channels
+by providing a non-nil `chan Event` channel to `.Produce()`.
+If no delivery reports are wanted they can be completely disabled by
+setting configuration property `"go.delivery.reports": false`.
+ </p>
+ <p>
+            * When you are done producing messages you will need to make sure all messages
+are indeed delivered to the broker (or have failed); remember that this is
+an asynchronous client, so some of your messages may be lingering in internal
+channels or transmission queues.
+To do this you can either keep track of the messages you've produced
+and wait for their corresponding delivery reports, or call the convenience
+function `.Flush()` that will block until all message deliveries are done
+or the provided timeout elapses.
+ </p>
+ <p>
+ * Finally call `.Close()` to decommission the producer.
+ </p>
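+          <p>
+            A minimal producer sketch following the steps above (the broker
+            address and topic name are placeholders):
+          </p>
+          <pre>package main
+
+import (
+	"fmt"
+	"os"
+
+	"github.com/confluentinc/confluent-kafka-go/kafka"
+)
+
+func main() {
+	p, err := kafka.NewProducer(&amp;kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "failed to create producer: %v\n", err)
+		os.Exit(1)
+	}
+	defer p.Close()
+
+	// Drain delivery reports from the Events() channel.
+	go func() {
+		for ev := range p.Events() {
+			if m, ok := ev.(*kafka.Message); ok &amp;&amp; m.TopicPartition.Error != nil {
+				fmt.Fprintf(os.Stderr, "delivery failed: %v\n", m.TopicPartition.Error)
+			}
+		}
+	}()
+
+	topic := "mytopic" // placeholder
+	p.Produce(&amp;kafka.Message{
+		TopicPartition: kafka.TopicPartition{Topic: &amp;topic, Partition: kafka.PartitionAny},
+		Value:          []byte("hello"),
+	}, nil)
+
+	// Block until outstanding deliveries are done (or 15s elapse).
+	p.Flush(15 * 1000)
+}</pre>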
+ <h3 id="hdr-Events">
+ Events
+ </h3>
+ <p>
+ Apart from emitting messages and delivery reports the client also communicates
+with the application through a number of different event types.
+An application may choose to handle or ignore these events.
+ </p>
+ <h3 id="hdr-Consumer_events">
+ Consumer events
+ </h3>
+ <p>
+ * `*kafka.Message` - a fetched message.
+ </p>
+ <p>
+ * `AssignedPartitions` - The assigned partition set for this client following a rebalance.
+Requires `go.application.rebalance.enable`
+ </p>
+ <p>
+            * `RevokedPartitions` - The counterpart to `AssignedPartitions` following a rebalance.
+`AssignedPartitions` and `RevokedPartitions` are symmetrical.
+Requires `go.application.rebalance.enable`
+ </p>
+ <p>
+ * `PartitionEOF` - Consumer has reached the end of a partition.
+NOTE: The consumer will keep trying to fetch new messages for the partition.
+ </p>
+ <p>
+ * `OffsetsCommitted` - Offset commit results (when `enable.auto.commit` is enabled).
+ </p>
+ <h3 id="hdr-Producer_events">
+ Producer events
+ </h3>
+ <p>
+ * `*kafka.Message` - delivery report for produced message.
+Check `.TopicPartition.Error` for delivery result.
+ </p>
+ <h3 id="hdr-Generic_events_for_both_Consumer_and_Producer">
+ Generic events for both Consumer and Producer
+ </h3>
+ <p>
+ * `KafkaError` - client (error codes are prefixed with _) or broker error.
+These errors are normally just informational since the
+client will try its best to automatically recover (eventually).
+ </p>
+ <p>
+            Hint: If your application registers a signal notification
+(signal.Notify), make sure the signals channel is buffered to avoid
+possible complications with blocking Poll() calls.
+ </p>
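+          <p>
+            A sketch of the buffered-signal pattern (it assumes a consumer `c`
+            created as in the consumer sketch above, plus imports of os,
+            os/signal and syscall):
+          </p>
+          <pre>sigchan := make(chan os.Signal, 1) // buffered, per the hint above
+signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
+
+run := true
+for run {
+	select {
+	case sig := &lt;-sigchan:
+		fmt.Printf("caught signal %v: terminating\n", sig)
+		run = false
+	default:
+		ev := c.Poll(100)
+		_ = ev // handle events as in the consumer sketch above
+	}
+}</pre>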
+ </div>
+ </div>
+ <div class="toggleVisible" id="pkg-index">
+ <div class="collapsed">
+ <h2 class="toggleButton" title="Click to show Index section">
+            Index ▹
+ </h2>
+ </div>
+ <div class="expanded">
+ <h2 class="toggleButton" title="Click to hide Index section">
+            Index ▾
+ </h2>
+ <!-- Table of contents for API; must be named manual-nav to turn off auto nav. -->
+ <div id="manual-nav">
+ <dl>
+ <dd>
+ <a href="#pkg-constants">
+ Constants
+ </a>
+ </dd>
+ <dd>
+ <a href="#LibraryVersion">
+ func LibraryVersion() (int, string)
+ </a>
+ </dd>
+ <dd>
+ <a href="#AssignedPartitions">
+ type AssignedPartitions
+ </a>
+ </dd>
+ <dd>
+ <a href="#AssignedPartitions.String">
+ func (e AssignedPartitions) String() string
+ </a>
+ </dd>
+ <dd>
+ <a href="#BrokerMetadata">
+ type BrokerMetadata
+ </a>
+ </dd>
+ <dd>
+ <a href="#ConfigMap">
+ type ConfigMap
+ </a>
+ </dd>
+ <dd>
+ <a href="#ConfigMap.Set">
+ func (m ConfigMap) Set(kv string) error
+ </a>
+ </dd>
+ <dd>
+ <a href="#ConfigMap.SetKey">
+ func (m ConfigMap) SetKey(key string, value ConfigValue) error
+ </a>
+ </dd>
+ <dd>
+ <a href="#ConfigValue">
+ type ConfigValue
+ </a>
+ </dd>
+ <dd>
+ <a href="#Consumer">
+ type Consumer
+ </a>
+ </dd>
+ <dd>
+ <a href="#NewConsumer">
+ func NewConsumer(conf *ConfigMap) (*Consumer, error)
+ </a>
+ </dd>
+ <dd>
+ <a href="#Consumer.Assign">
+ func (c *Consumer) Assign(partitions []TopicPartition) (err error)
+ </a>
+ </dd>
+ <dd>
+ <a href="#Consumer.Close">
+ func (c *Consumer) Close() (err error)
+ </a>
+ </dd>
+ <dd>
+ <a href="#Consumer.Commit">
+ func (c *Consumer) Commit() ([]TopicPartition, error)
+ </a>
+ </dd>
+ <dd>
+ <a href="#Consumer.CommitMessage">
+ func (c *Consumer) CommitMessage(m *Message) ([]TopicPartition, error)
+ </a>
+ </dd>
+ <dd>
+ <a href="#Consumer.CommitOffsets">
+ func (c *Consumer) CommitOffsets(offsets []TopicPartition) ([]TopicPartition, error)
+ </a>
+ </dd>
+ <dd>
+ <a href="#Consumer.Events">
+ func (c *Consumer) Events() chan Event
+ </a>
+ </dd>
+ <dd>
+ <a href="#Consumer.GetMetadata">
+ func (c *Consumer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)
+ </a>
+ </dd>
+ <dd>
+ <a href="#Consumer.Poll">
+ func (c *Consumer) Poll(timeoutMs int) (event Event)
+ </a>
+ </dd>
+ <dd>
+ <a href="#Consumer.QueryWatermarkOffsets">
+ func (c *Consumer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
+ </a>
+ </dd>
+ <dd>
+ <a href="#Consumer.String">
+ func (c *Consumer) String() string
+ </a>
+ </dd>
+ <dd>
+ <a href="#Consumer.Subscribe">
+ func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error
+ </a>
+ </dd>
+ <dd>
+ <a href="#Consumer.SubscribeTopics">
+ func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) (err error)
+ </a>
+ </dd>
+ <dd>
+ <a href="#Consumer.Unassign">
+ func (c *Consumer) Unassign() (err error)
+ </a>
+ </dd>
+ <dd>
+ <a href="#Consumer.Unsubscribe">
+ func (c *Consumer) Unsubscribe() (err error)
+ </a>
+ </dd>
+ <dd>
+ <a href="#Error">
+ type Error
+ </a>
+ </dd>
+ <dd>
+ <a href="#Error.Code">
+ func (e Error) Code() ErrorCode
+ </a>
+ </dd>
+ <dd>
+ <a href="#Error.Error">
+ func (e Error) Error() string
+ </a>
+ </dd>
+ <dd>
+ <a href="#Error.String">
+ func (e Error) String() string
+ </a>
+ </dd>
+ <dd>
+ <a href="#ErrorCode">
+ type ErrorCode
+ </a>
+ </dd>
+ <dd>
+ <a href="#ErrorCode.String">
+ func (c ErrorCode) String() string
+ </a>
+ </dd>
+ <dd>
+ <a href="#Event">
+ type Event
+ </a>
+ </dd>
+ <dd>
+ <a href="#Handle">
+ type Handle
+ </a>
+ </dd>
+ <dd>
+ <a href="#Message">
+ type Message
+ </a>
+ </dd>
+ <dd>
+ <a href="#Message.String">
+ func (m *Message) String() string
+ </a>
+ </dd>
+ <dd>
+ <a href="#Metadata">
+ type Metadata
+ </a>
+ </dd>
+ <dd>
+ <a href="#Offset">
+ type Offset
+ </a>
+ </dd>
+ <dd>
+ <a href="#NewOffset">
+ func NewOffset(offset interface{}) (Offset, error)
+ </a>
+ </dd>
+ <dd>
+ <a href="#OffsetTail">
+ func OffsetTail(relativeOffset Offset) Offset
+ </a>
+ </dd>
+ <dd>
+ <a href="#Offset.Set">
+ func (o Offset) Set(offset interface{}) error
+ </a>
+ </dd>
+ <dd>
+ <a href="#Offset.String">
+ func (o Offset) String() string
+ </a>
+ </dd>
+ <dd>
+ <a href="#OffsetsCommitted">
+ type OffsetsCommitted
+ </a>
+ </dd>
+ <dd>
+ <a href="#OffsetsCommitted.String">
+ func (o OffsetsCommitted) String() string
+ </a>
+ </dd>
+ <dd>
+ <a href="#PartitionEOF">
+ type PartitionEOF
+ </a>
+ </dd>
+ <dd>
+ <a href="#PartitionEOF.String">
+ func (p PartitionEOF) String() string
+ </a>
+ </dd>
+ <dd>
+ <a href="#PartitionMetadata">
+ type PartitionMetadata
+ </a>
+ </dd>
+ <dd>
+ <a href="#Producer">
+ type Producer
+ </a>
+ </dd>
+ <dd>
+ <a href="#NewProducer">
+ func NewProducer(conf *ConfigMap) (*Producer, error)
+ </a>
+ </dd>
+ <dd>
+ <a href="#Producer.Close">
+ func (p *Producer) Close()
+ </a>
+ </dd>
+ <dd>
+ <a href="#Producer.Events">
+ func (p *Producer) Events() chan Event
+ </a>
+ </dd>
+ <dd>
+ <a href="#Producer.Flush">
+ func (p *Producer) Flush(timeoutMs int) int
+ </a>
+ </dd>
+ <dd>
+ <a href="#Producer.GetMetadata">
+ func (p *Producer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)
+ </a>
+ </dd>
+ <dd>
+ <a href="#Producer.Len">
+ func (p *Producer) Len() int
+ </a>
+ </dd>
+ <dd>
+ <a href="#Producer.Produce">
+ func (p *Producer) Produce(msg *Message, deliveryChan chan Event) error
+ </a>
+ </dd>
+ <dd>
+ <a href="#Producer.ProduceChannel">
+ func (p *Producer) ProduceChannel() chan *Message
+ </a>
+ </dd>
+ <dd>
+ <a href="#Producer.QueryWatermarkOffsets">
+ func (p *Producer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
+ </a>
+ </dd>
+ <dd>
+ <a href="#Producer.String">
+ func (p *Producer) String() string
+ </a>
+ </dd>
+ <dd>
+ <a href="#RebalanceCb">
+ type RebalanceCb
+ </a>
+ </dd>
+ <dd>
+ <a href="#RevokedPartitions">
+ type RevokedPartitions
+ </a>
+ </dd>
+ <dd>
+ <a href="#RevokedPartitions.String">
+ func (e RevokedPartitions) String() string
+ </a>
+ </dd>
+ <dd>
+ <a href="#TimestampType">
+ type TimestampType
+ </a>
+ </dd>
+ <dd>
+ <a href="#TimestampType.String">
+ func (t TimestampType) String() string
+ </a>
+ </dd>
+ <dd>
+ <a href="#TopicMetadata">
+ type TopicMetadata
+ </a>
+ </dd>
+ <dd>
+ <a href="#TopicPartition">
+ type TopicPartition
+ </a>
+ </dd>
+ <dd>
+ <a href="#TopicPartition.String">
+ func (p TopicPartition) String() string
+ </a>
+ </dd>
+ </dl>
+ </div>
+ <!-- #manual-nav -->
+ <h4>
+ Package files
+ </h4>
+ <p>
+ <span style="font-size:90%">
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/build_dynamic.go">
+ build_dynamic.go
+ </a>
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/config.go">
+ config.go
+ </a>
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go">
+ consumer.go
+ </a>
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/error.go">
+ error.go
+ </a>
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/event.go">
+ event.go
+ </a>
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/generated_errors.go">
+ generated_errors.go
+ </a>
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/handle.go">
+ handle.go
+ </a>
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/kafka.go">
+ kafka.go
+ </a>
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/message.go">
+ message.go
+ </a>
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/metadata.go">
+ metadata.go
+ </a>
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/misc.go">
+ misc.go
+ </a>
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/producer.go">
+ producer.go
+ </a>
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/testhelpers.go">
+ testhelpers.go
+ </a>
+ </span>
+ </p>
+ </div>
+ <!-- .expanded -->
+ </div>
+ <!-- #pkg-index -->
+ <div class="toggle" id="pkg-callgraph" style="display: none">
+ <div class="collapsed">
+ <h2 class="toggleButton" title="Click to show Internal Call Graph section">
+          Internal call graph ▹
+ </h2>
+ </div>
+ <!-- .expanded -->
+ <div class="expanded">
+ <h2 class="toggleButton" title="Click to hide Internal Call Graph section">
+          Internal call graph ▾
+ </h2>
+ <p>
+ In the call graph viewer below, each node
+ is a function belonging to this package
+ and its children are the functions it
+ calls—perhaps dynamically.
+ </p>
+ <p>
+ The root nodes are the entry points of the
+ package: functions that may be called from
+ outside the package.
+ There may be non-exported or anonymous
+ functions among them if they are called
+ dynamically from another package.
+ </p>
+ <p>
+ Click a node to visit that function's source code.
+ From there you can visit its callers by
+ clicking its declaring
+ <code>
+ func
+ </code>
+ token.
+ </p>
+ <p>
+ Functions may be omitted if they were
+ determined to be unreachable in the
+ particular programs or tests that were
+ analyzed.
+ </p>
+ <!-- Zero means show all package entry points. -->
+ <ul class="treeview" id="callgraph-0" style="margin-left: 0.5in">
+ </ul>
+ </div>
+ </div>
+ <!-- #pkg-callgraph -->
+ <h2 id="pkg-constants">
+ Constants
+ </h2>
+ <pre>const (
+ <span class="comment">// TimestampNotAvailable indicates no timestamp was set, or not available due to lacking broker support</span>
+ <span id="TimestampNotAvailable">TimestampNotAvailable</span> = <a href="#TimestampType">TimestampType</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_TIMESTAMP_NOT_AVAILABLE">RD_KAFKA_TIMESTAMP_NOT_AVAILABLE</a>)
+ <span class="comment">// TimestampCreateTime indicates timestamp set by producer (source time)</span>
+ <span id="TimestampCreateTime">TimestampCreateTime</span> = <a href="#TimestampType">TimestampType</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_TIMESTAMP_CREATE_TIME">RD_KAFKA_TIMESTAMP_CREATE_TIME</a>)
+ <span class="comment">// TimestampLogAppendTime indicates timestamp set by broker (store time)</span>
+ <span id="TimestampLogAppendTime">TimestampLogAppendTime</span> = <a href="#TimestampType">TimestampType</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME">RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME</a>)
+)</pre>
+ <pre>const <span id="OffsetBeginning">OffsetBeginning</span> = <a href="#Offset">Offset</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_OFFSET_BEGINNING">RD_KAFKA_OFFSET_BEGINNING</a>)</pre>
+ <p>
+ Earliest offset (logical)
+ </p>
+ <pre>const <span id="OffsetEnd">OffsetEnd</span> = <a href="#Offset">Offset</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_OFFSET_END">RD_KAFKA_OFFSET_END</a>)</pre>
+ <p>
+ Latest offset (logical)
+ </p>
+ <pre>const <span id="OffsetInvalid">OffsetInvalid</span> = <a href="#Offset">Offset</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_OFFSET_INVALID">RD_KAFKA_OFFSET_INVALID</a>)</pre>
+ <p>
+ Invalid/unspecified offset
+ </p>
+ <pre>const <span id="OffsetStored">OffsetStored</span> = <a href="#Offset">Offset</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_OFFSET_STORED">RD_KAFKA_OFFSET_STORED</a>)</pre>
+ <p>
+ Use stored offset
+ </p>
+ <pre>const <span id="PartitionAny">PartitionAny</span> = <a href="http://golang.org/pkg/builtin/#int32">int32</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_PARTITION_UA">RD_KAFKA_PARTITION_UA</a>)</pre>
+ <p>
+ Any partition (for partitioning), or unspecified value (for all other cases)
+ </p>
+ <h2 id="LibraryVersion">
+ func
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/kafka.go?s=10095:10130#L292">
+ LibraryVersion
+ </a>
+ </h2>
+ <pre>func LibraryVersion() (<a href="http://golang.org/pkg/builtin/#int">int</a>, <a href="http://golang.org/pkg/builtin/#string">string</a>)</pre>
+ <p>
+ LibraryVersion returns the underlying librdkafka library version as a
+(version_int, version_str) tuple.
+ </p>
+ <h2 id="AssignedPartitions">
+ type
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/event.go?s=1621:1684#L49">
+ AssignedPartitions
+ </a>
+ </h2>
+ <pre>type AssignedPartitions struct {
+ Partitions []<a href="#TopicPartition">TopicPartition</a>
+}</pre>
+ <p>
+ AssignedPartitions consumer group rebalance event: assigned partition set
+ </p>
+ <h3 id="AssignedPartitions.String">
+ func (AssignedPartitions)
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/event.go?s=1686:1729#L53">
+ String
+ </a>
+ </h3>
+ <pre>func (e <a href="#AssignedPartitions">AssignedPartitions</a>) String() <a href="http://golang.org/pkg/builtin/#string">string</a></pre>
+ <h2 id="BrokerMetadata">
+ type
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/metadata.go?s=1266:1331#L37">
+ BrokerMetadata
+ </a>
+ </h2>
+ <pre>type BrokerMetadata struct {
+ ID <a href="http://golang.org/pkg/builtin/#int32">int32</a>
+ Host <a href="http://golang.org/pkg/builtin/#string">string</a>
+ Port <a href="http://golang.org/pkg/builtin/#int">int</a>
+}</pre>
+ <p>
+ BrokerMetadata contains per-broker metadata
+ </p>
+ <h2 id="ConfigMap">
+ type
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/config.go?s=1172:1209#L31">
+ ConfigMap
+ </a>
+ </h2>
+ <pre>type ConfigMap map[<a href="http://golang.org/pkg/builtin/#string">string</a>]<a href="#ConfigValue">ConfigValue</a></pre>
+ <p>
+          ConfigMap is a map containing standard librdkafka configuration properties as documented in:
+ <a href="https://github.com/edenhill/librdkafka/tree/master/CONFIGURATION.md">
+ https://github.com/edenhill/librdkafka/tree/master/CONFIGURATION.md
+ </a>
+ </p>
+ <p>
+ The special property "default.topic.config" (optional) is a ConfigMap containing default topic
+configuration properties.
+ </p>
+ <h3 id="ConfigMap.Set">
+ func (ConfigMap)
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/config.go?s=1813:1852#L52">
+ Set
+ </a>
+ </h3>
+ <pre>func (m <a href="#ConfigMap">ConfigMap</a>) Set(kv <a href="http://golang.org/pkg/builtin/#string">string</a>) <a href="http://golang.org/pkg/builtin/#error">error</a></pre>
+ <p>
+ Set implements flag.Set (command line argument parser) as a convenience
+for `-X key=value` config.
+ </p>
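+        <p>
+          For example, a value given to a `-X` flag can be applied directly (a
+          sketch; assumes fmt and os are imported):
+        </p>
+        <pre>conf := kafka.ConfigMap{}
+if err := conf.Set("socket.timeout.ms=10000"); err != nil {
+	fmt.Fprintln(os.Stderr, err) // not a key=value pair, or an invalid value
+}</pre>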
+ <h3 id="ConfigMap.SetKey">
+ func (ConfigMap)
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/config.go?s=1370:1432#L36">
+ SetKey
+ </a>
+ </h3>
+ <pre>func (m <a href="#ConfigMap">ConfigMap</a>) SetKey(key <a href="http://golang.org/pkg/builtin/#string">string</a>, value <a href="#ConfigValue">ConfigValue</a>) <a href="http://golang.org/pkg/builtin/#error">error</a></pre>
+ <p>
+ SetKey sets configuration property key to value.
+For user convenience, a key prefixed with {topic}. will be
+set on the "default.topic.config" sub-map.
+ </p>
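+        <p>
+          A sketch of the `{topic}.` convenience prefix (errors ignored for
+          brevity):
+        </p>
+        <pre>conf := kafka.ConfigMap{}
+_ = conf.SetKey("bootstrap.servers", "localhost:9092") // top-level property
+// Placed on the "default.topic.config" sub-map:
+_ = conf.SetKey("{topic}.auto.offset.reset", "earliest")</pre>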
+ <h2 id="ConfigValue">
+ type
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/config.go?s=846:874#L24">
+ ConfigValue
+ </a>
+ </h2>
+ <pre>type ConfigValue interface{}</pre>
+ <p>
+ ConfigValue supports the following types:
+ </p>
+ <pre>bool, int, string, any type with the standard String() interface
+</pre>
+ <h2 id="Consumer">
+ type
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=968:1205#L25">
+ Consumer
+ </a>
+ </h2>
+ <pre>type Consumer struct {
+ <span class="comment">// contains filtered or unexported fields</span>
+}</pre>
+ <p>
+ Consumer implements a High-level Apache Kafka Consumer instance
+ </p>
+ <h3 id="NewConsumer">
+ func
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=7696:7748#L242">
+ NewConsumer
+ </a>
+ </h3>
+ <pre>func NewConsumer(conf *<a href="#ConfigMap">ConfigMap</a>) (*<a href="#Consumer">Consumer</a>, <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
+ <p>
+ NewConsumer creates a new high-level Consumer instance.
+ </p>
+ <p>
+ Supported special configuration properties:
+ </p>
+ <pre>go.application.rebalance.enable (bool, false) - Forward rebalancing responsibility to application via the Events() channel.
+ If set to true the app must handle the AssignedPartitions and
+ RevokedPartitions events and call Assign() and Unassign()
+ respectively.
+go.events.channel.enable (bool, false) - Enable the Events() channel. Messages and events will be pushed on the Events() channel and the Poll() interface will be disabled. (Experimental)
+go.events.channel.size (int, 1000) - Events() channel size
+</pre>
+ <p>
+ WARNING: Due to the buffering nature of channels (and queues in general) the
+use of the events channel risks receiving outdated events and
+messages. Minimizing go.events.channel.size reduces the risk
+and number of outdated events and messages but does not eliminate
+the factor completely. With a channel size of 1 at most one
+event or message may be outdated.
+ </p>
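+        <p>
+          A sketch of channel-based consumption using the special properties
+          above (broker and group are placeholders):
+        </p>
+        <pre>c, err := kafka.NewConsumer(&amp;kafka.ConfigMap{
+	"bootstrap.servers":        "localhost:9092",
+	"group.id":                 "mygroup",
+	"go.events.channel.enable": true,
+	"go.events.channel.size":   1, // small size limits outdated events (see warning above)
+})
+if err != nil {
+	// handle the error
+}
+for ev := range c.Events() {
+	_ = ev // handle *kafka.Message and other Event types
+}</pre>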
+ <h3 id="Consumer.Assign">
+ func (*Consumer)
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=2641:2707#L82">
+ Assign
+ </a>
+ </h3>
+ <pre>func (c *<a href="#Consumer">Consumer</a>) Assign(partitions []<a href="#TopicPartition">TopicPartition</a>) (err <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
+ <p>
+ Assign an atomic set of partitions to consume.
+This replaces the current assignment.
+ </p>
+ <h3 id="Consumer.Close">
+ func (*Consumer)
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=6121:6159#L202">
+ Close
+ </a>
+ </h3>
+ <pre>func (c *<a href="#Consumer">Consumer</a>) Close() (err <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
+ <p>
+ Close Consumer instance.
+The object is no longer usable after this call.
+ </p>
+ <h3 id="Consumer.Commit">
+ func (*Consumer)
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=4853:4906#L159">
+ Commit
+ </a>
+ </h3>
+ <pre>func (c *<a href="#Consumer">Consumer</a>) Commit() ([]<a href="#TopicPartition">TopicPartition</a>, <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
+ <p>
+          Commit offsets for currently assigned partitions.
+This is a blocking call.
+Returns the committed offsets on success.
+ </p>
+ <h3 id="Consumer.CommitMessage">
+ func (*Consumer)
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=5070:5140#L166">
+ CommitMessage
+ </a>
+ </h3>
+ <pre>func (c *<a href="#Consumer">Consumer</a>) CommitMessage(m *<a href="#Message">Message</a>) ([]<a href="#TopicPartition">TopicPartition</a>, <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
+ <p>
+ CommitMessage commits offset based on the provided message.
+This is a blocking call.
+Returns the committed offsets on success.
+ </p>
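+        <p>
+          A sketch of at-least-once processing with manual commits (it assumes
+          `"enable.auto.commit": false` in the consumer configuration and a
+          hypothetical, application-defined process function):
+        </p>
+        <pre>if msg, ok := c.Poll(100).(*kafka.Message); ok {
+	process(msg) // hypothetical application logic
+	if _, err := c.CommitMessage(msg); err != nil {
+		fmt.Fprintf(os.Stderr, "commit failed: %v\n", err)
+	}
+}</pre>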
+ <h3 id="Consumer.CommitOffsets">
+ func (*Consumer)
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=5473:5557#L178">
+ CommitOffsets
+ </a>
+ </h3>
+ <pre>func (c *<a href="#Consumer">Consumer</a>) CommitOffsets(offsets []<a href="#TopicPartition">TopicPartition</a>) ([]<a href="#TopicPartition">TopicPartition</a>, <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
+ <p>
+          CommitOffsets commits the provided list of offsets.
+This is a blocking call.
+Returns the committed offsets on success.
+ </p>
+ <h3 id="Consumer.Events">
+ func (*Consumer)
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=5981:6019#L196">
+ Events
+ </a>
+ </h3>
+ <pre>func (c *<a href="#Consumer">Consumer</a>) Events() chan <a href="#Event">Event</a></pre>
+ <p>
+ Events returns the Events channel (if enabled)
+ </p>
+ <h3 id="Consumer.GetMetadata">
+ func (*Consumer)
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=10490:10585#L347">
+ GetMetadata
+ </a>
+ </h3>
+ <pre>func (c *<a href="#Consumer">Consumer</a>) GetMetadata(topic *<a href="http://golang.org/pkg/builtin/#string">string</a>, allTopics <a href="http://golang.org/pkg/builtin/#bool">bool</a>, timeoutMs <a href="http://golang.org/pkg/builtin/#int">int</a>) (*<a href="#Metadata">Metadata</a>, <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
+ <p>
+          GetMetadata queries the broker for cluster and topic metadata.
+If topic is non-nil, only information about that topic is returned; otherwise, if
+allTopics is false, only information about locally used topics is returned,
+else information about all topics is returned.
+ </p>
+ <h3 id="Consumer.Poll">
+ func (*Consumer)
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=5809:5861#L190">
+ Poll
+ </a>
+ </h3>
+ <pre>func (c *<a href="#Consumer">Consumer</a>) Poll(timeoutMs <a href="http://golang.org/pkg/builtin/#int">int</a>) (event <a href="#Event">Event</a>)</pre>
+ <p>
+ Poll the consumer for messages or events.
+ </p>
+ <h3 id="hdr-Will_block_for_at_most_timeoutMs_milliseconds">
+ Will block for at most timeoutMs milliseconds
+ </h3>
+ <p>
+ The following callbacks may be triggered:
+ </p>
+ <pre>Subscribe()'s rebalanceCb
+</pre>
+ <p>
+ Returns nil on timeout, else an Event
+ </p>
+ <h3 id="Consumer.QueryWatermarkOffsets">
+ func (*Consumer)
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=10748:10863#L353">
+ QueryWatermarkOffsets
+ </a>
+ </h3>
+ <pre>func (c *<a href="#Consumer">Consumer</a>) QueryWatermarkOffsets(topic <a href="http://golang.org/pkg/builtin/#string">string</a>, partition <a href="http://golang.org/pkg/builtin/#int32">int32</a>, timeoutMs <a href="http://golang.org/pkg/builtin/#int">int</a>) (low, high <a href="http://golang.org/pkg/builtin/#int64">int64</a>, err <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
+ <p>
+ QueryWatermarkOffsets returns the broker's low and high offsets for the given topic
+and partition.
+ </p>
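+        <p>
+          For example (a sketch; the topic name and 5000 ms timeout are
+          placeholders):
+        </p>
+        <pre>low, high, err := c.QueryWatermarkOffsets("mytopic", 0, 5000)
+if err == nil {
+	fmt.Printf("partition 0 spans offsets [%d, %d): %d messages\n", low, high, high-low)
+}</pre>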
+ <h3 id="Consumer.String">
+ func (*Consumer)
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=1272:1306#L36">
+ String
+ </a>
+ </h3>
+ <pre>func (c *<a href="#Consumer">Consumer</a>) String() <a href="http://golang.org/pkg/builtin/#string">string</a></pre>
+ <p>
+          String returns a human-readable name for a Consumer instance
+ </p>
+ <h3 id="Consumer.Subscribe">
+ func (*Consumer)
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=1518:1591#L47">
+ Subscribe
+ </a>
+ </h3>
+ <pre>func (c *<a href="#Consumer">Consumer</a>) Subscribe(topic <a href="http://golang.org/pkg/builtin/#string">string</a>, rebalanceCb <a href="#RebalanceCb">RebalanceCb</a>) <a href="http://golang.org/pkg/builtin/#error">error</a></pre>
+ <p>
+          Subscribe to a single topic.
+This replaces the current subscription.
+ </p>
+ <h3 id="Consumer.SubscribeTopics">
+ func (*Consumer)
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=1758:1846#L53">
+ SubscribeTopics
+ </a>
+ </h3>
+ <pre>func (c *<a href="#Consumer">Consumer</a>) SubscribeTopics(topics []<a href="http://golang.org/pkg/builtin/#string">string</a>, rebalanceCb <a href="#RebalanceCb">RebalanceCb</a>) (err <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
+ <p>
+ SubscribeTopics subscribes to the provided list of topics.
+This replaces the current subscription.
+ </p>
+ <h3 id="Consumer.Unassign">
+ func (*Consumer)
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=3022:3063#L97">
+ Unassign
+ </a>
+ </h3>
+ <pre>func (c *<a href="#Consumer">Consumer</a>) Unassign() (err <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
+ <p>
+ Unassign the current set of partitions to consume.
+ </p>
+ <h3 id="Consumer.Unsubscribe">
+ func (*Consumer)
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=2451:2495#L75">
+ Unsubscribe
+ </a>
+ </h3>
+ <pre>func (c *<a href="#Consumer">Consumer</a>) Unsubscribe() (err <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
+ <p>
+ Unsubscribe from the current subscription, if any.
+ </p>
+ <h2 id="Error">
+ type
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/error.go?s=862:912#L19">
+ Error
+ </a>
+ </h2>
+ <pre>type Error struct {
+ <span class="comment">// contains filtered or unexported fields</span>
+}</pre>
+ <p>
+ Error provides a Kafka-specific error container
+ </p>
+ <h3 id="Error.Code">
+ func (Error)
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/error.go?s=1649:1680#L57">
+ Code
+ </a>
+ </h3>
+ <pre>func (e <a href="#Error">Error</a>) Code() <a href="#ErrorCode">ErrorCode</a></pre>
+ <p>
+ Code returns the ErrorCode of an Error
+ </p>
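+        <p>
+          A sketch of branching on the code of a returned error (ErrTimedOut is
+          among the constants listed above):
+        </p>
+        <pre>if kerr, ok := err.(kafka.Error); ok &amp;&amp; kerr.Code() == kafka.ErrTimedOut {
+	// a local timeout; usually safe to retry
+}</pre>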
+ <h3 id="Error.Error">
+ func (Error)
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/error.go?s=1392:1421#L44">
+ Error
+ </a>
+ </h3>
+ <pre>func (e <a href="#Error">Error</a>) Error() <a href="http://golang.org/pkg/builtin/#string">string</a></pre>
+ <p>
+          Error returns a human-readable representation of an Error.
+Same as Error.String().
+ </p>
+ <h3 id="Error.String">
+ func (Error)
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/error.go?s=1508:1538#L49">
+ String
+ </a>
+ </h3>
+ <pre>func (e <a href="#Error">Error</a>) String() <a href="http://golang.org/pkg/builtin/#string">string</a></pre>
+ <p>
+ String returns a human readable representation of an Error
+ </p>
+ <h2 id="ErrorCode">
+ type
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/generated_errors.go?s=328:346#L1">
+ ErrorCode
+ </a>
+ </h2>
+ <pre>type ErrorCode <a href="http://golang.org/pkg/builtin/#int">int</a></pre>
+ <p>
+ ErrorCode is the integer representation of local and broker error codes
+ </p>
+ <pre>const (
+ <span class="comment">// ErrBadMsg Local: Bad message format</span>
+ <span id="ErrBadMsg">ErrBadMsg</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__BAD_MSG">RD_KAFKA_RESP_ERR__BAD_MSG</a>)
+ <span class="comment">// ErrBadCompression Local: Invalid compressed data</span>
+ <span id="ErrBadCompression">ErrBadCompression</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__BAD_COMPRESSION">RD_KAFKA_RESP_ERR__BAD_COMPRESSION</a>)
+ <span class="comment">// ErrDestroy Local: Broker handle destroyed</span>
+ <span id="ErrDestroy">ErrDestroy</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__DESTROY">RD_KAFKA_RESP_ERR__DESTROY</a>)
+ <span class="comment">// ErrFail Local: Communication failure with broker</span>
+ <span id="ErrFail">ErrFail</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__FAIL">RD_KAFKA_RESP_ERR__FAIL</a>)
+ <span class="comment">// ErrTransport Local: Broker transport failure</span>
+ <span id="ErrTransport">ErrTransport</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__TRANSPORT">RD_KAFKA_RESP_ERR__TRANSPORT</a>)
+ <span class="comment">// ErrCritSysResource Local: Critical system resource failure</span>
+ <span id="ErrCritSysResource">ErrCritSysResource</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE">RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE</a>)
+ <span class="comment">// ErrResolve Local: Host resolution failure</span>
+ <span id="ErrResolve">ErrResolve</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__RESOLVE">RD_KAFKA_RESP_ERR__RESOLVE</a>)
+ <span class="comment">// ErrMsgTimedOut Local: Message timed out</span>
+ <span id="ErrMsgTimedOut">ErrMsgTimedOut</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__MSG_TIMED_OUT">RD_KAFKA_RESP_ERR__MSG_TIMED_OUT</a>)
+ <span class="comment">// ErrPartitionEOF Broker: No more messages</span>
+ <span id="ErrPartitionEOF">ErrPartitionEOF</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__PARTITION_EOF">RD_KAFKA_RESP_ERR__PARTITION_EOF</a>)
+ <span class="comment">// ErrUnknownPartition Local: Unknown partition</span>
+ <span id="ErrUnknownPartition">ErrUnknownPartition</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION">RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION</a>)
+ <span class="comment">// ErrFs Local: File or filesystem error</span>
+ <span id="ErrFs">ErrFs</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__FS">RD_KAFKA_RESP_ERR__FS</a>)
+ <span class="comment">// ErrUnknownTopic Local: Unknown topic</span>
+ <span id="ErrUnknownTopic">ErrUnknownTopic</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC">RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC</a>)
+ <span class="comment">// ErrAllBrokersDown Local: All broker connections are down</span>
+ <span id="ErrAllBrokersDown">ErrAllBrokersDown</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN">RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN</a>)
+ <span class="comment">// ErrInvalidArg Local: Invalid argument or configuration</span>
+ <span id="ErrInvalidArg">ErrInvalidArg</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__INVALID_ARG">RD_KAFKA_RESP_ERR__INVALID_ARG</a>)
+ <span class="comment">// ErrTimedOut Local: Timed out</span>
+ <span id="ErrTimedOut">ErrTimedOut</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__TIMED_OUT">RD_KAFKA_RESP_ERR__TIMED_OUT</a>)
+ <span class="comment">// ErrQueueFull Local: Queue full</span>
+ <span id="ErrQueueFull">ErrQueueFull</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__QUEUE_FULL">RD_KAFKA_RESP_ERR__QUEUE_FULL</a>)
+ <span class="comment">// ErrIsrInsuff Local: ISR count insufficient</span>
+ <span id="ErrIsrInsuff">ErrIsrInsuff</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__ISR_INSUFF">RD_KAFKA_RESP_ERR__ISR_INSUFF</a>)
+ <span class="comment">// ErrNodeUpdate Local: Broker node update</span>
+ <span id="ErrNodeUpdate">ErrNodeUpdate</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__NODE_UPDATE">RD_KAFKA_RESP_ERR__NODE_UPDATE</a>)
+ <span class="comment">// ErrSsl Local: SSL error</span>
+ <span id="ErrSsl">ErrSsl</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__SSL">RD_KAFKA_RESP_ERR__SSL</a>)
+ <span class="comment">// ErrWaitCoord Local: Waiting for coordinator</span>
+ <span id="ErrWaitCoord">ErrWaitCoord</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__WAIT_COORD">RD_KAFKA_RESP_ERR__WAIT_COORD</a>)
+ <span class="comment">// ErrUnknownGroup Local: Unknown group</span>
+ <span id="ErrUnknownGroup">ErrUnknownGroup</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__UNKNOWN_GROUP">RD_KAFKA_RESP_ERR__UNKNOWN_GROUP</a>)
+ <span class="comment">// ErrInProgress Local: Operation in progress</span>
+ <span id="ErrInProgress">ErrInProgress</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__IN_PROGRESS">RD_KAFKA_RESP_ERR__IN_PROGRESS</a>)
+ <span class="comment">// ErrPrevInProgress Local: Previous operation in progress</span>
+ <span id="ErrPrevInProgress">ErrPrevInProgress</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS">RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS</a>)
+ <span class="comment">// ErrExistingSubscription Local: Existing subscription</span>
+ <span id="ErrExistingSubscription">ErrExistingSubscription</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION">RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION</a>)
+ <span class="comment">// ErrAssignPartitions Local: Assign partitions</span>
+ <span id="ErrAssignPartitions">ErrAssignPartitions</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS">RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS</a>)
+ <span class="comment">// ErrRevokePartitions Local: Revoke partitions</span>
+ <span id="ErrRevokePartitions">ErrRevokePartitions</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS">RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS</a>)
+ <span class="comment">// ErrConflict Local: Conflicting use</span>
+ <span id="ErrConflict">ErrConflict</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__CONFLICT">RD_KAFKA_RESP_ERR__CONFLICT</a>)
+ <span class="comment">// ErrState Local: Erroneous state</span>
+ <span id="ErrState">ErrState</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__STATE">RD_KAFKA_RESP_ERR__STATE</a>)
+ <span class="comment">// ErrUnknownProtocol Local: Unknown protocol</span>
+ <span id="ErrUnknownProtocol">ErrUnknownProtocol</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL">RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL</a>)
+ <span class="comment">// ErrNotImplemented Local: Not implemented</span>
+ <span id="ErrNotImplemented">ErrNotImplemented</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED">RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED</a>)
+ <span class="comment">// ErrAuthentication Local: Authentication failure</span>
+ <span id="ErrAuthentication">ErrAuthentication</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__AUTHENTICATION">RD_KAFKA_RESP_ERR__AUTHENTICATION</a>)
+ <span class="comment">// ErrNoOffset Local: No offset stored</span>
+ <span id="ErrNoOffset">ErrNoOffset</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__NO_OFFSET">RD_KAFKA_RESP_ERR__NO_OFFSET</a>)
+ <span class="comment">// ErrOutdated Local: Outdated</span>
+ <span id="ErrOutdated">ErrOutdated</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__OUTDATED">RD_KAFKA_RESP_ERR__OUTDATED</a>)
+ <span class="comment">// ErrTimedOutQueue Local: Timed out in queue</span>
+ <span id="ErrTimedOutQueue">ErrTimedOutQueue</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE">RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE</a>)
+ <span class="comment">// ErrUnknown Unknown broker error</span>
+ <span id="ErrUnknown">ErrUnknown</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_UNKNOWN">RD_KAFKA_RESP_ERR_UNKNOWN</a>)
+ <span class="comment">// ErrNoError Success</span>
+ <span id="ErrNoError">ErrNoError</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_NO_ERROR">RD_KAFKA_RESP_ERR_NO_ERROR</a>)
+ <span class="comment">// ErrOffsetOutOfRange Broker: Offset out of range</span>
+ <span id="ErrOffsetOutOfRange">ErrOffsetOutOfRange</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE">RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE</a>)
+ <span class="comment">// ErrInvalidMsg Broker: Invalid message</span>
+ <span id="ErrInvalidMsg">ErrInvalidMsg</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_INVALID_MSG">RD_KAFKA_RESP_ERR_INVALID_MSG</a>)
+ <span class="comment">// ErrUnknownTopicOrPart Broker: Unknown topic or partition</span>
+ <span id="ErrUnknownTopicOrPart">ErrUnknownTopicOrPart</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART">RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART</a>)
+ <span class="comment">// ErrInvalidMsgSize Broker: Invalid message size</span>
+ <span id="ErrInvalidMsgSize">ErrInvalidMsgSize</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE">RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE</a>)
+ <span class="comment">// ErrLeaderNotAvailable Broker: Leader not available</span>
+ <span id="ErrLeaderNotAvailable">ErrLeaderNotAvailable</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE">RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE</a>)
+ <span class="comment">// ErrNotLeaderForPartition Broker: Not leader for partition</span>
+ <span id="ErrNotLeaderForPartition">ErrNotLeaderForPartition</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION">RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION</a>)
+ <span class="comment">// ErrRequestTimedOut Broker: Request timed out</span>
+ <span id="ErrRequestTimedOut">ErrRequestTimedOut</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT">RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT</a>)
+ <span class="comment">// ErrBrokerNotAvailable Broker: Broker not available</span>
+ <span id="ErrBrokerNotAvailable">ErrBrokerNotAvailable</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE">RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE</a>)
+ <span class="comment">// ErrReplicaNotAvailable Broker: Replica not available</span>
+ <span id="ErrReplicaNotAvailable">ErrReplicaNotAvailable</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE">RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE</a>)
+ <span class="comment">// ErrMsgSizeTooLarge Broker: Message size too large</span>
+ <span id="ErrMsgSizeTooLarge">ErrMsgSizeTooLarge</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE">RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE</a>)
+ <span class="comment">// ErrStaleCtrlEpoch Broker: StaleControllerEpochCode</span>
+ <span id="ErrStaleCtrlEpoch">ErrStaleCtrlEpoch</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_STALE_CTRL_EPOCH">RD_KAFKA_RESP_ERR_STALE_CTRL_EPOCH</a>)
+ <span class="comment">// ErrOffsetMetadataTooLarge Broker: Offset metadata string too large</span>
+ <span id="ErrOffsetMetadataTooLarge">ErrOffsetMetadataTooLarge</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE">RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE</a>)
+ <span class="comment">// ErrNetworkException Broker: Broker disconnected before response received</span>
+ <span id="ErrNetworkException">ErrNetworkException</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_NETWORK_EXCEPTION">RD_KAFKA_RESP_ERR_NETWORK_EXCEPTION</a>)
+ <span class="comment">// ErrGroupLoadInProgress Broker: Group coordinator load in progress</span>
+ <span id="ErrGroupLoadInProgress">ErrGroupLoadInProgress</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS">RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS</a>)
+ <span class="comment">// ErrGroupCoordinatorNotAvailable Broker: Group coordinator not available</span>
+ <span id="ErrGroupCoordinatorNotAvailable">ErrGroupCoordinatorNotAvailable</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE">RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE</a>)
+ <span class="comment">// ErrNotCoordinatorForGroup Broker: Not coordinator for group</span>
+ <span id="ErrNotCoordinatorForGroup">ErrNotCoordinatorForGroup</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP">RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP</a>)
+ <span class="comment">// ErrTopicException Broker: Invalid topic</span>
+ <span id="ErrTopicException">ErrTopicException</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION">RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION</a>)
+ <span class="comment">// ErrRecordListTooLarge Broker: Message batch larger than configured server segment size</span>
+ <span id="ErrRecordListTooLarge">ErrRecordListTooLarge</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE">RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE</a>)
+ <span class="comment">// ErrNotEnoughReplicas Broker: Not enough in-sync replicas</span>
+ <span id="ErrNotEnoughReplicas">ErrNotEnoughReplicas</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS">RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS</a>)
+ <span class="comment">// ErrNotEnoughReplicasAfterAppend Broker: Message(s) written to insufficient number of in-sync replicas</span>
+ <span id="ErrNotEnoughReplicasAfterAppend">ErrNotEnoughReplicasAfterAppend</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND">RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND</a>)
+ <span class="comment">// ErrInvalidRequiredAcks Broker: Invalid required acks value</span>
+ <span id="ErrInvalidRequiredAcks">ErrInvalidRequiredAcks</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS">RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS</a>)
+ <span class="comment">// ErrIllegalGeneration Broker: Specified group generation id is not valid</span>
+ <span id="ErrIllegalGeneration">ErrIllegalGeneration</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION">RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION</a>)
+ <span class="comment">// ErrInconsistentGroupProtocol Broker: Inconsistent group protocol</span>
+ <span id="ErrInconsistentGroupProtocol">ErrInconsistentGroupProtocol</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_INCONSISTENT_GROUP_PROTOCOL">RD_KAFKA_RESP_ERR_INCONSISTENT_GROUP_PROTOCOL</a>)
+ <span class="comment">// ErrInvalidGroupID Broker: Invalid group.id</span>
+ <span id="ErrInvalidGroupID">ErrInvalidGroupID</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_INVALID_GROUP_ID">RD_KAFKA_RESP_ERR_INVALID_GROUP_ID</a>)
+ <span class="comment">// ErrUnknownMemberID Broker: Unknown member</span>
+ <span id="ErrUnknownMemberID">ErrUnknownMemberID</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID">RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID</a>)
+ <span class="comment">// ErrInvalidSessionTimeout Broker: Invalid session timeout</span>
+ <span id="ErrInvalidSessionTimeout">ErrInvalidSessionTimeout</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT">RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT</a>)
+ <span class="comment">// ErrRebalanceInProgress Broker: Group rebalance in progress</span>
+ <span id="ErrRebalanceInProgress">ErrRebalanceInProgress</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS">RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS</a>)
+ <span class="comment">// ErrInvalidCommitOffsetSize Broker: Commit offset data size is not valid</span>
+ <span id="ErrInvalidCommitOffsetSize">ErrInvalidCommitOffsetSize</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE">RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE</a>)
+ <span class="comment">// ErrTopicAuthorizationFailed Broker: Topic authorization failed</span>
+ <span id="ErrTopicAuthorizationFailed">ErrTopicAuthorizationFailed</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED">RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED</a>)
+ <span class="comment">// ErrGroupAuthorizationFailed Broker: Group authorization failed</span>
+ <span id="ErrGroupAuthorizationFailed">ErrGroupAuthorizationFailed</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED">RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED</a>)
+ <span class="comment">// ErrClusterAuthorizationFailed Broker: Cluster authorization failed</span>
+ <span id="ErrClusterAuthorizationFailed">ErrClusterAuthorizationFailed</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED">RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED</a>)
+ <span class="comment">// ErrInvalidTimestamp Broker: Invalid timestamp</span>
+ <span id="ErrInvalidTimestamp">ErrInvalidTimestamp</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_INVALID_TIMESTAMP">RD_KAFKA_RESP_ERR_INVALID_TIMESTAMP</a>)
+ <span class="comment">// ErrUnsupportedSaslMechanism Broker: Unsupported SASL mechanism</span>
+ <span id="ErrUnsupportedSaslMechanism">ErrUnsupportedSaslMechanism</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_UNSUPPORTED_SASL_MECHANISM">RD_KAFKA_RESP_ERR_UNSUPPORTED_SASL_MECHANISM</a>)
+ <span class="comment">// ErrIllegalSaslState Broker: Request not valid in current SASL state</span>
+ <span id="ErrIllegalSaslState">ErrIllegalSaslState</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_ILLEGAL_SASL_STATE">RD_KAFKA_RESP_ERR_ILLEGAL_SASL_STATE</a>)
+ <span class="comment">// ErrUnsupportedVersion Broker: API version not supported</span>
+ <span id="ErrUnsupportedVersion">ErrUnsupportedVersion</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION">RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION</a>)
+)</pre>
+ <h3 id="ErrorCode.String">
+ func (ErrorCode)
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/generated_errors.go?s=415:449#L4">
+ String
+ </a>
+ </h3>
+ <pre>func (c <a href="#ErrorCode">ErrorCode</a>) String() <a href="http://golang.org/pkg/builtin/#string">string</a></pre>
+ <p>
+ String returns a human readable representation of an error code
+ </p>
+ <h2 id="Event">
+ type
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/event.go?s=1412:1517#L41">
+ Event
+ </a>
+ </h2>
+ <pre>type Event interface {
+ <span class="comment">// String returns a human-readable representation of the event</span>
+ String() <a href="http://golang.org/pkg/builtin/#string">string</a>
+}</pre>
+ <p>
+ Event generic interface
+ </p>
+ <h2 id="Handle">
+ type
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/handle.go?s=822:868#L23">
+ Handle
+ </a>
+ </h2>
+ <pre>type Handle interface {
+ <span class="comment">// contains filtered or unexported methods</span>
+}</pre>
+ <p>
+ Handle represents a generic client handle containing common parts for
+both Producer and Consumer.
+ </p>
+ <h2 id="Message">
+ type
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/message.go?s=2465:2649#L76">
+ Message
+ </a>
+ </h2>
+ <pre>type Message struct {
+ TopicPartition <a href="#TopicPartition">TopicPartition</a>
+ Value []<a href="http://golang.org/pkg/builtin/#byte">byte</a>
+ Key []<a href="http://golang.org/pkg/builtin/#byte">byte</a>
+ Timestamp <a href="http://golang.org/pkg/time/">time</a>.<a href="http://golang.org/pkg/time/#Time">Time</a>
+ TimestampType <a href="#TimestampType">TimestampType</a>
+ Opaque interface{}
+}</pre>
+ <p>
+ Message represents a Kafka message
+ </p>
+ <h3 id="Message.String">
+ func (*Message)
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/message.go?s=2755:2788#L87">
+ String
+ </a>
+ </h3>
+ <pre>func (m *<a href="#Message">Message</a>) String() <a href="http://golang.org/pkg/builtin/#string">string</a></pre>
+ <p>
+ String returns a human readable representation of a Message.
+Key and payload are not represented.
+ </p>
+ <h2 id="Metadata">
+ type
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/metadata.go?s=1723:1842#L60">
+ Metadata
+ </a>
+ </h2>
+ <pre>type Metadata struct {
+ Brokers []<a href="#BrokerMetadata">BrokerMetadata</a>
+ Topics map[<a href="http://golang.org/pkg/builtin/#string">string</a>]<a href="#TopicMetadata">TopicMetadata</a>
+
+ OriginatingBroker <a href="#BrokerMetadata">BrokerMetadata</a>
+}</pre>
+ <p>
+ Metadata contains broker and topic metadata for all (matching) topics
+ </p>
+ <h2 id="Offset">
+ type
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/kafka.go?s=6428:6445#L149">
+ Offset
+ </a>
+ </h2>
+ <pre>type Offset <a href="http://golang.org/pkg/builtin/#int64">int64</a></pre>
+ <p>
+ Offset type (int64) with support for canonical names
+ </p>
+ <h3 id="NewOffset">
+ func
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/kafka.go?s=7384:7434#L192">
+ NewOffset
+ </a>
+ </h3>
+ <pre>func NewOffset(offset interface{}) (<a href="#Offset">Offset</a>, <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
+ <p>
+ NewOffset creates a new Offset using the provided logical string, or an
+absolute int64 offset value.
+Logical offsets: "beginning", "earliest", "end", "latest", "unset", "invalid", "stored"
+ </p>
+ <h3 id="OffsetTail">
+ func
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/kafka.go?s=8170:8215#L231">
+ OffsetTail
+ </a>
+ </h3>
+ <pre>func OffsetTail(relativeOffset <a href="#Offset">Offset</a>) <a href="#Offset">Offset</a></pre>
+ <p>
+ OffsetTail returns the logical offset relativeOffset from current end of partition
+ </p>
+ <h3 id="Offset.Set">
+ func (Offset)
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/kafka.go?s=7064:7109#L179">
+ Set
+ </a>
+ </h3>
+ <pre>func (o <a href="#Offset">Offset</a>) Set(offset interface{}) <a href="http://golang.org/pkg/builtin/#error">error</a></pre>
+ <p>
+ Set offset value, see NewOffset()
+ </p>
+ <h3 id="Offset.String">
+ func (Offset)
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/kafka.go?s=6776:6807#L163">
+ String
+ </a>
+ </h3>
+ <pre>func (o <a href="#Offset">Offset</a>) String() <a href="http://golang.org/pkg/builtin/#string">string</a></pre>
+ <h2 id="OffsetsCommitted">
+ type
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/event.go?s=2266:2339#L74">
+ OffsetsCommitted
+ </a>
+ </h2>
+ <pre>type OffsetsCommitted struct {
+ Error <a href="http://golang.org/pkg/builtin/#error">error</a>
+ Offsets []<a href="#TopicPartition">TopicPartition</a>
+}</pre>
+ <p>
+ OffsetsCommitted reports committed offsets
+ </p>
+ <h3 id="OffsetsCommitted.String">
+ func (OffsetsCommitted)
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/event.go?s=2341:2382#L79">
+ String
+ </a>
+ </h3>
+ <pre>func (o <a href="#OffsetsCommitted">OffsetsCommitted</a>) String() <a href="http://golang.org/pkg/builtin/#string">string</a></pre>
+ <h2 id="PartitionEOF">
+ type
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/event.go?s=2091:2123#L67">
+ PartitionEOF
+ </a>
+ </h2>
+ <pre>type PartitionEOF <a href="#TopicPartition">TopicPartition</a></pre>
+ <p>
+ PartitionEOF consumer reached end of partition
+ </p>
+ <h3 id="PartitionEOF.String">
+ func (PartitionEOF)
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/event.go?s=2125:2162#L69">
+ String
+ </a>
+ </h3>
+ <pre>func (p <a href="#PartitionEOF">PartitionEOF</a>) String() <a href="http://golang.org/pkg/builtin/#string">string</a></pre>
+ <h2 id="PartitionMetadata">
+ type
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/metadata.go?s=1386:1503#L44">
+ PartitionMetadata
+ </a>
+ </h2>
+ <pre>type PartitionMetadata struct {
+ ID <a href="http://golang.org/pkg/builtin/#int32">int32</a>
+ Error <a href="#Error">Error</a>
+ Leader <a href="http://golang.org/pkg/builtin/#int32">int32</a>
+ Replicas []<a href="http://golang.org/pkg/builtin/#int32">int32</a>
+ Isrs []<a href="http://golang.org/pkg/builtin/#int32">int32</a>
+}</pre>
+ <p>
+ PartitionMetadata contains per-partition metadata
+ </p>
+ <h2 id="Producer">
+ type
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/producer.go?s=1101:1270#L30">
+ Producer
+ </a>
+ </h2>
+ <pre>type Producer struct {
+ <span class="comment">// contains filtered or unexported fields</span>
+}</pre>
+ <p>
+ Producer implements a High-level Apache Kafka Producer instance
+ </p>
+ <h3 id="NewProducer">
+ func
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/producer.go?s=6249:6301#L203">
+ NewProducer
+ </a>
+ </h3>
+ <pre>func NewProducer(conf *<a href="#ConfigMap">ConfigMap</a>) (*<a href="#Producer">Producer</a>, <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
+ <p>
+ NewProducer creates a new high-level Producer instance.
+ </p>
+ <p>
+ conf is a *ConfigMap with standard librdkafka configuration properties, see here:
+ </p>
+ <p>
+ Supported special configuration properties:
+ </p>
+ <pre>go.batch.producer (bool, false) - Enable batch producer (experimental for increased performance).
+ These batches do not relate to Kafka message batches in any way.
+go.delivery.reports (bool, true) - Forward per-message delivery reports to the
+ Events() channel.
+go.produce.channel.size (int, 1000000) - ProduceChannel() buffer size (in number of messages)
+</pre>
+ <h3 id="Producer.Close">
+ func (*Producer)
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/producer.go?s=5283:5309#L174">
+ Close
+ </a>
+ </h3>
+ <pre>func (p *<a href="#Producer">Producer</a>) Close()</pre>
+ <p>
+ Close a Producer instance.
+The Producer object or its channels are no longer usable after this call.
+ </p>
+ <h3 id="Producer.Events">
+ func (*Producer)
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/producer.go?s=4035:4073#L134">
+ Events
+ </a>
+ </h3>
+ <pre>func (p *<a href="#Producer">Producer</a>) Events() chan <a href="#Event">Event</a></pre>
+ <p>
+ Events returns the Events channel (read)
+ </p>
+ <h3 id="Producer.Flush">
+ func (*Producer)
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/producer.go?s=4779:4822#L154">
+ Flush
+ </a>
+ </h3>
+ <pre>func (p *<a href="#Producer">Producer</a>) Flush(timeoutMs <a href="http://golang.org/pkg/builtin/#int">int</a>) <a href="http://golang.org/pkg/builtin/#int">int</a></pre>
+ <p>
+ Flush and wait for outstanding messages and requests to complete delivery.
+Includes messages on ProduceChannel.
+Runs until the value reaches zero or timeoutMs elapses.
+Returns the number of outstanding events still un-flushed.
+ </p>
+ <h3 id="Producer.GetMetadata">
+ func (*Producer)
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/producer.go?s=9852:9947#L354">
+ GetMetadata
+ </a>
+ </h3>
+ <pre>func (p *<a href="#Producer">Producer</a>) GetMetadata(topic *<a href="http://golang.org/pkg/builtin/#string">string</a>, allTopics <a href="http://golang.org/pkg/builtin/#bool">bool</a>, timeoutMs <a href="http://golang.org/pkg/builtin/#int">int</a>) (*<a href="#Metadata">Metadata</a>, <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
+ <p>
+ GetMetadata queries broker for cluster and topic metadata.
+If topic is non-nil only information about that topic is returned, else if
+allTopics is false only information about locally used topics is returned,
+else information about all topics is returned.
+ </p>
+ <h3 id="Producer.Len">
+ func (*Producer)
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/producer.go?s=4429:4457#L146">
+ Len
+ </a>
+ </h3>
+ <pre>func (p *<a href="#Producer">Producer</a>) Len() <a href="http://golang.org/pkg/builtin/#int">int</a></pre>
+ <p>
+ Len returns the number of messages and requests waiting to be transmitted to the broker
+as well as delivery reports queued for the application.
+Includes messages on ProduceChannel.
+ </p>
+ <h3 id="Producer.Produce">
+ func (*Producer)
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/producer.go?s=3205:3276#L109">
+ Produce
+ </a>
+ </h3>
+ <pre>func (p *<a href="#Producer">Producer</a>) Produce(msg *<a href="#Message">Message</a>, deliveryChan chan <a href="#Event">Event</a>) <a href="http://golang.org/pkg/builtin/#error">error</a></pre>
+ <p>
+ Produce single message.
+This is an asynchronous call that enqueues the message on the internal
+transmit queue, thus returning immediately.
+The delivery report will be sent on the provided deliveryChan if specified,
+or on the Producer object's Events() channel if not.
+Returns an error if message could not be enqueued.
+ </p>
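+ <p>
+ A minimal produce sketch (illustrative only; the topic variable and error handling
+are assumptions, not part of this page):
+ </p>
+ <pre>topic := "myTopic"
+err := p.Produce(&amp;Message{
+    TopicPartition: TopicPartition{Topic: &amp;topic, Partition: PartitionAny},
+    Value:          []byte("hello"),
+}, nil)</pre>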
+ <h3 id="Producer.ProduceChannel">
+ func (*Producer)
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/producer.go?s=4159:4208#L139">
+ ProduceChannel
+ </a>
+ </h3>
+ <pre>func (p *<a href="#Producer">Producer</a>) ProduceChannel() chan *<a href="#Message">Message</a></pre>
+ <p>
+ ProduceChannel returns the produce *Message channel (write)
+ </p>
+ <h3 id="Producer.QueryWatermarkOffsets">
+ func (*Producer)
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/producer.go?s=10110:10225#L360">
+ QueryWatermarkOffsets
+ </a>
+ </h3>
+ <pre>func (p *<a href="#Producer">Producer</a>) QueryWatermarkOffsets(topic <a href="http://golang.org/pkg/builtin/#string">string</a>, partition <a href="http://golang.org/pkg/builtin/#int32">int32</a>, timeoutMs <a href="http://golang.org/pkg/builtin/#int">int</a>) (low, high <a href="http://golang.org/pkg/builtin/#int64">int64</a>, err <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
+ <p>
+ QueryWatermarkOffsets returns the broker's low and high offsets for the given topic
+and partition.
+ </p>
+ <h3 id="Producer.String">
+ func (*Producer)
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/producer.go?s=1336:1370#L40">
+ String
+ </a>
+ </h3>
+ <pre>func (p *<a href="#Producer">Producer</a>) String() <a href="http://golang.org/pkg/builtin/#string">string</a></pre>
+ <p>
+ String returns a human readable name for a Producer instance
+ </p>
+ <h2 id="RebalanceCb">
+ type
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=854:899#L22">
+ RebalanceCb
+ </a>
+ </h2>
+ <pre>type RebalanceCb func(*<a href="#Consumer">Consumer</a>, <a href="#Event">Event</a>) <a href="http://golang.org/pkg/builtin/#error">error</a></pre>
+ <p>
+ RebalanceCb provides a per-Subscribe*() rebalance event callback.
+The passed Event will be either AssignedPartitions or RevokedPartitions
+ </p>
+ <h2 id="RevokedPartitions">
+ type
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/event.go?s=1870:1932#L58">
+ RevokedPartitions
+ </a>
+ </h2>
+ <pre>type RevokedPartitions struct {
+ Partitions []<a href="#TopicPartition">TopicPartition</a>
+}</pre>
+ <p>
+ RevokedPartitions consumer group rebalance event: revoked partition set
+ </p>
+ <h3 id="RevokedPartitions.String">
+ func (RevokedPartitions)
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/event.go?s=1934:1976#L62">
+ String
+ </a>
+ </h3>
+ <pre>func (e <a href="#RevokedPartitions">RevokedPartitions</a>) String() <a href="http://golang.org/pkg/builtin/#string">string</a></pre>
+ <h2 id="TimestampType">
+ type
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/message.go?s=1671:1693#L51">
+ TimestampType
+ </a>
+ </h2>
+ <pre>type TimestampType <a href="http://golang.org/pkg/builtin/#int">int</a></pre>
+ <p>
+ TimestampType is the Message timestamp type or source
+ </p>
+ <h3 id="TimestampType.String">
+ func (TimestampType)
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/message.go?s=2187:2225#L62">
+ String
+ </a>
+ </h3>
+ <pre>func (t <a href="#TimestampType">TimestampType</a>) String() <a href="http://golang.org/pkg/builtin/#string">string</a></pre>
+ <h2 id="TopicMetadata">
+ type
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/metadata.go?s=1550:1648#L53">
+ TopicMetadata
+ </a>
+ </h2>
+ <pre>type TopicMetadata struct {
+ Topic <a href="http://golang.org/pkg/builtin/#string">string</a>
+ Partitions []<a href="#PartitionMetadata">PartitionMetadata</a>
+ Error <a href="#Error">Error</a>
+}</pre>
+ <p>
+ TopicMetadata contains per-topic metadata
+ </p>
+ <h2 id="TopicPartition">
+ type
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/kafka.go?s=8377:8478#L236">
+ TopicPartition
+ </a>
+ </h2>
+ <pre>type TopicPartition struct {
+ Topic *<a href="http://golang.org/pkg/builtin/#string">string</a>
+ Partition <a href="http://golang.org/pkg/builtin/#int32">int32</a>
+ Offset <a href="#Offset">Offset</a>
+ Error <a href="http://golang.org/pkg/builtin/#error">error</a>
+}</pre>
+ <p>
+ TopicPartition is a generic placeholder for a Topic+Partition and optionally Offset.
+ </p>
+ <h3 id="TopicPartition.String">
+ func (TopicPartition)
+ <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/kafka.go?s=8480:8519#L243">
+ String
+ </a>
+ </h3>
+ <pre>func (p <a href="#TopicPartition">TopicPartition</a>) String() <a href="http://golang.org/pkg/builtin/#string">string</a></pre>
+ <div id="footer">
+ Build version go1.6.
+ <br>
+ Except as
+ <a href="https://developers.google.com/site-policies#restrictions">
+ noted
+ </a>
+ ,
+the content of this page is licensed under the
+Creative Commons Attribution 3.0 License,
+and code is licensed under a
+ <a href="http://golang.org/LICENSE">
+ BSD license
+ </a>
+ .
+ <br>
+ <a href="http://golang.org/doc/tos.html">
+ Terms of Service
+ </a>
+ |
+ <a href="http://www.google.com/intl/en/policies/privacy/">
+ Privacy Policy
+ </a>
+ </div>
+ </div>
+ <!-- .container -->
+ </div>
+ <!-- #page -->
+ <!-- TODO(adonovan): load these from <head> using "defer" attribute? -->
+ <script src="http://golang.org/lib/godoc/jquery.js" type="text/javascript">
+ </script>
+ <script src="http://golang.org/lib/godoc/jquery.treeview.js" type="text/javascript">
+ </script>
+ <script src="http://golang.org/lib/godoc/jquery.treeview.edit.js" type="text/javascript">
+ </script>
+ <script src="http://golang.org/lib/godoc/godocs.js" type="text/javascript">
+ </script>
+ </body>
+</html>
+
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/build_dynamic.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/build_dynamic.go
new file mode 100644
index 0000000..c14c1f6
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/build_dynamic.go
@@ -0,0 +1,7 @@
+// +build !static
+// +build !static_all
+
+package kafka
+
+// #cgo pkg-config: rdkafka
+import "C"
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/build_static.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/build_static.go
new file mode 100644
index 0000000..3c8799c
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/build_static.go
@@ -0,0 +1,7 @@
+// +build static
+// +build !static_all
+
+package kafka
+
+// #cgo pkg-config: rdkafka-static
+import "C"
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/build_static_all.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/build_static_all.go
new file mode 100644
index 0000000..8afb8c9
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/build_static_all.go
@@ -0,0 +1,8 @@
+// +build !static
+// +build static_all
+
+package kafka
+
+// #cgo pkg-config: rdkafka-static
+// #cgo LDFLAGS: -static
+import "C"
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/config.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/config.go
new file mode 100644
index 0000000..5866a47
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/config.go
@@ -0,0 +1,243 @@
+package kafka
+
+/**
+ * Copyright 2016 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import (
+ "fmt"
+ "reflect"
+ "strings"
+ "unsafe"
+)
+
+/*
+#include <stdlib.h>
+#include <librdkafka/rdkafka.h>
+*/
+import "C"
+
+// ConfigValue supports the following types:
+// bool, int, string, any type with the standard String() interface
+type ConfigValue interface{}
+
+// ConfigMap is a map containing standard librdkafka configuration properties as documented in:
+// https://github.com/edenhill/librdkafka/tree/master/CONFIGURATION.md
+//
+// The special property "default.topic.config" (optional) is a ConfigMap containing default topic
+// configuration properties.
+type ConfigMap map[string]ConfigValue
+
+// SetKey sets configuration property key to value.
+// For user convenience a key prefixed with {topic}. will be
+// set on the "default.topic.config" sub-map.
+func (m ConfigMap) SetKey(key string, value ConfigValue) error {
+ if strings.HasPrefix(key, "{topic}.") {
+ _, found := m["default.topic.config"]
+ if !found {
+ m["default.topic.config"] = ConfigMap{}
+ }
+ m["default.topic.config"].(ConfigMap)[strings.TrimPrefix(key, "{topic}.")] = value
+ } else {
+ m[key] = value
+ }
+
+ return nil
+}
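+
+// Usage sketch (illustrative only; conf and the chosen keys are assumptions):
+//
+//   conf := ConfigMap{}
+//   _ = conf.SetKey("bootstrap.servers", "localhost:9092")
+//   _ = conf.SetKey("{topic}.acks", "all") // stored under "default.topic.config"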
+
+// Set implements flag.Set (command line argument parser) as a convenience
+// for `-X key=value` config.
+func (m ConfigMap) Set(kv string) error {
+ i := strings.Index(kv, "=")
+ if i == -1 {
+ return Error{ErrInvalidArg, "Expected key=value"}
+ }
+
+ k := kv[:i]
+ v := kv[i+1:]
+
+ return m.SetKey(k, v)
+}
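+
+// Sketch of the key=value form this parses (assumed invocation, e.g. from a -X flag):
+//
+//   conf := ConfigMap{}
+//   _ = conf.Set("compression.codec=snappy") // same as SetKey("compression.codec", "snappy")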
+
+func value2string(v ConfigValue) (ret string, errstr string) {
+
+ switch x := v.(type) {
+ case bool:
+ if x {
+ ret = "true"
+ } else {
+ ret = "false"
+ }
+ case int:
+ ret = fmt.Sprintf("%d", x)
+ case string:
+ ret = x
+ case fmt.Stringer:
+ ret = x.String()
+ default:
+ return "", fmt.Sprintf("Invalid value type %T", v)
+ }
+
+ return ret, ""
+}
+
+// rdkAnyconf abstracts rd_kafka_conf_t and rd_kafka_topic_conf_t
+// into a common interface.
+type rdkAnyconf interface {
+ set(cKey *C.char, cVal *C.char, cErrstr *C.char, errstrSize int) C.rd_kafka_conf_res_t
+}
+
+func anyconfSet(anyconf rdkAnyconf, key string, val ConfigValue) (err error) {
+ value, errstr := value2string(val)
+ if errstr != "" {
+ return Error{ErrInvalidArg, fmt.Sprintf("%s for key %s (expected string,bool,int,ConfigMap)", errstr, key)}
+ }
+ cKey := C.CString(key)
+ defer C.free(unsafe.Pointer(cKey))
+ cVal := C.CString(value)
+ defer C.free(unsafe.Pointer(cVal))
+ cErrstr := (*C.char)(C.malloc(C.size_t(128)))
+ defer C.free(unsafe.Pointer(cErrstr))
+
+ if anyconf.set(cKey, cVal, cErrstr, 128) != C.RD_KAFKA_CONF_OK {
+ // cKey and cVal are released by the defers above; previously they
+ // were freed only on this error path and leaked on success.
+ return newErrorFromCString(C.RD_KAFKA_RESP_ERR__INVALID_ARG, cErrstr)
+ }
+
+ return nil
+}
+
+// we need these typedefs to work around a crash in golint
+// when parsing the set() methods below
+type rdkConf C.rd_kafka_conf_t
+type rdkTopicConf C.rd_kafka_topic_conf_t
+
+func (cConf *rdkConf) set(cKey *C.char, cVal *C.char, cErrstr *C.char, errstrSize int) C.rd_kafka_conf_res_t {
+ return C.rd_kafka_conf_set((*C.rd_kafka_conf_t)(cConf), cKey, cVal, cErrstr, C.size_t(errstrSize))
+}
+
+func (ctopicConf *rdkTopicConf) set(cKey *C.char, cVal *C.char, cErrstr *C.char, errstrSize int) C.rd_kafka_conf_res_t {
+ return C.rd_kafka_topic_conf_set((*C.rd_kafka_topic_conf_t)(ctopicConf), cKey, cVal, cErrstr, C.size_t(errstrSize))
+}
+
+func configConvertAnyconf(m ConfigMap, anyconf rdkAnyconf) (err error) {
+ // set plugins first, any plugin-specific configuration depends on
+ // the plugin to have already been set
+ pluginPaths, ok := m["plugin.library.paths"]
+ if ok {
+ err = anyconfSet(anyconf, "plugin.library.paths", pluginPaths)
+ if err != nil {
+ return err
+ }
+ }
+ for k, v := range m {
+ if k == "plugin.library.paths" {
+ continue
+ }
+ switch v.(type) {
+ case ConfigMap:
+ /* Special sub-ConfigMap, only used for default.topic.config */
+
+ if k != "default.topic.config" {
+ return Error{ErrInvalidArg, fmt.Sprintf("Invalid type for key %s", k)}
+ }
+
+ var cTopicConf = C.rd_kafka_topic_conf_new()
+
+ err = configConvertAnyconf(v.(ConfigMap),
+ (*rdkTopicConf)(cTopicConf))
+ if err != nil {
+ C.rd_kafka_topic_conf_destroy(cTopicConf)
+ return err
+ }
+
+ C.rd_kafka_conf_set_default_topic_conf(
+ (*C.rd_kafka_conf_t)(anyconf.(*rdkConf)),
+ (*C.rd_kafka_topic_conf_t)((*rdkTopicConf)(cTopicConf)))
+
+ default:
+ err = anyconfSet(anyconf, k, v)
+ if err != nil {
+ return err
+ }
+ }
+ }
+
+ return nil
+}
+
+// convert ConfigMap to C rd_kafka_conf_t *
+func (m ConfigMap) convert() (cConf *C.rd_kafka_conf_t, err error) {
+ cConf = C.rd_kafka_conf_new()
+
+ err = configConvertAnyconf(m, (*rdkConf)(cConf))
+ if err != nil {
+ C.rd_kafka_conf_destroy(cConf)
+ return nil, err
+ }
+ return cConf, nil
+}
+
+// get finds key in the configmap and returns its value.
+// If the key is not found defval is returned.
+// If the key is found but the type is mismatched an error is returned.
+func (m ConfigMap) get(key string, defval ConfigValue) (ConfigValue, error) {
+ if strings.HasPrefix(key, "{topic}.") {
+ defconfCv, found := m["default.topic.config"]
+ if !found {
+ return defval, nil
+ }
+ return defconfCv.(ConfigMap).get(strings.TrimPrefix(key, "{topic}."), defval)
+ }
+
+ v, ok := m[key]
+ if !ok {
+ return defval, nil
+ }
+
+ if defval != nil && reflect.TypeOf(defval) != reflect.TypeOf(v) {
+ return nil, Error{ErrInvalidArg, fmt.Sprintf("%s expects type %T, not %T", key, defval, v)}
+ }
+
+ return v, nil
+}
+
+// extract performs a get() and if found deletes the key.
+func (m ConfigMap) extract(key string, defval ConfigValue) (ConfigValue, error) {
+
+ v, err := m.get(key, defval)
+ if err != nil {
+ return nil, err
+ }
+
+ delete(m, key)
+
+ return v, nil
+}
+
+func (m ConfigMap) clone() ConfigMap {
+ m2 := make(ConfigMap)
+ for k, v := range m {
+ m2[k] = v
+ }
+ return m2
+}
+
+// Get finds the given key in the ConfigMap and returns its value.
+// If the key is not found `defval` is returned.
+// If the key is found but the type does not match that of `defval` (unless nil)
+// an ErrInvalidArg error is returned.
+func (m ConfigMap) Get(key string, defval ConfigValue) (ConfigValue, error) {
+ return m.get(key, defval)
+}
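+
+// Type-checked retrieval sketch (illustrative; conf is an assumed ConfigMap):
+//
+//   v, err := conf.Get("go.events.channel.size", 1000)
+//   if err == nil {
+//       size := v.(int) // safe: Get enforces the type of defval
+//       _ = size
+//   }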
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go
new file mode 100644
index 0000000..5c42ece
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go
@@ -0,0 +1,581 @@
+package kafka
+
+/**
+ * Copyright 2016 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import (
+ "fmt"
+ "math"
+ "time"
+ "unsafe"
+)
+
+/*
+#include <stdlib.h>
+#include <librdkafka/rdkafka.h>
+
+
+static rd_kafka_topic_partition_t *_c_rdkafka_topic_partition_list_entry(rd_kafka_topic_partition_list_t *rktparlist, int idx) {
+ return idx < rktparlist->cnt ? &rktparlist->elems[idx] : NULL;
+}
+*/
+import "C"
+
+// RebalanceCb provides a per-Subscribe*() rebalance event callback.
+// The passed Event will be either AssignedPartitions or RevokedPartitions
+type RebalanceCb func(*Consumer, Event) error
+
+// Consumer implements a High-level Apache Kafka Consumer instance
+type Consumer struct {
+ events chan Event
+ handle handle
+ eventsChanEnable bool
+ readerTermChan chan bool
+ rebalanceCb RebalanceCb
+ appReassigned bool
+ appRebalanceEnable bool // config setting
+}
+
+// String returns a human readable name for a Consumer instance
+func (c *Consumer) String() string {
+ return c.handle.String()
+}
+
+// gethandle implements the Handle interface
+func (c *Consumer) gethandle() *handle {
+ return &c.handle
+}
+
+// Subscribe to a single topic
+// This replaces the current subscription
+func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error {
+ return c.SubscribeTopics([]string{topic}, rebalanceCb)
+}
+
+// SubscribeTopics subscribes to the provided list of topics.
+// This replaces the current subscription.
+func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) (err error) {
+ ctopics := C.rd_kafka_topic_partition_list_new(C.int(len(topics)))
+ defer C.rd_kafka_topic_partition_list_destroy(ctopics)
+
+ for _, topic := range topics {
+ ctopic := C.CString(topic)
+ defer C.free(unsafe.Pointer(ctopic))
+ C.rd_kafka_topic_partition_list_add(ctopics, ctopic, C.RD_KAFKA_PARTITION_UA)
+ }
+
+ e := C.rd_kafka_subscribe(c.handle.rk, ctopics)
+ if e != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+ return newError(e)
+ }
+
+ c.rebalanceCb = rebalanceCb
+ c.handle.currAppRebalanceEnable = c.rebalanceCb != nil || c.appRebalanceEnable
+
+ return nil
+}
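+
+// Subscription sketch (illustrative only; c and the callback body are assumptions):
+//
+//   err := c.SubscribeTopics([]string{"topicA", "topicB"},
+//       func(c *Consumer, ev Event) error {
+//           fmt.Printf("rebalance: %v\n", ev)
+//           return nil
+//       })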
+
+// Unsubscribe from the current subscription, if any.
+func (c *Consumer) Unsubscribe() (err error) {
+ C.rd_kafka_unsubscribe(c.handle.rk)
+ return nil
+}
+
+// Assign an atomic set of partitions to consume.
+// This replaces the current assignment.
+func (c *Consumer) Assign(partitions []TopicPartition) (err error) {
+ c.appReassigned = true
+
+ cparts := newCPartsFromTopicPartitions(partitions)
+ defer C.rd_kafka_topic_partition_list_destroy(cparts)
+
+ e := C.rd_kafka_assign(c.handle.rk, cparts)
+ if e != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+ return newError(e)
+ }
+
+ return nil
+}
+
+// Unassign the current set of partitions to consume.
+func (c *Consumer) Unassign() (err error) {
+ c.appReassigned = true
+
+ e := C.rd_kafka_assign(c.handle.rk, nil)
+ if e != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+ return newError(e)
+ }
+
+ return nil
+}
+
+// commit commits the specified offsets.
+// If offsets is nil the currently assigned partitions' offsets are committed.
+// This is a blocking call; callers that want async or fire-and-forget
+// behaviour will need to wrap it in a goroutine.
+func (c *Consumer) commit(offsets []TopicPartition) (committedOffsets []TopicPartition, err error) {
+ var rkqu *C.rd_kafka_queue_t
+
+ rkqu = C.rd_kafka_queue_new(c.handle.rk)
+ defer C.rd_kafka_queue_destroy(rkqu)
+
+ var coffsets *C.rd_kafka_topic_partition_list_t
+ if offsets != nil {
+ coffsets = newCPartsFromTopicPartitions(offsets)
+ defer C.rd_kafka_topic_partition_list_destroy(coffsets)
+ }
+
+ cErr := C.rd_kafka_commit_queue(c.handle.rk, coffsets, rkqu, nil, nil)
+ if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+ return nil, newError(cErr)
+ }
+
+ rkev := C.rd_kafka_queue_poll(rkqu, C.int(-1))
+ if rkev == nil {
+ // shouldn't happen
+ return nil, newError(C.RD_KAFKA_RESP_ERR__DESTROY)
+ }
+ defer C.rd_kafka_event_destroy(rkev)
+
+ if C.rd_kafka_event_type(rkev) != C.RD_KAFKA_EVENT_OFFSET_COMMIT {
+ panic(fmt.Sprintf("Expected OFFSET_COMMIT, got %s",
+ C.GoString(C.rd_kafka_event_name(rkev))))
+ }
+
+ cErr = C.rd_kafka_event_error(rkev)
+ if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+ return nil, newErrorFromCString(cErr, C.rd_kafka_event_error_string(rkev))
+ }
+
+ cRetoffsets := C.rd_kafka_event_topic_partition_list(rkev)
+ if cRetoffsets == nil {
+ // no offsets, no error
+ return nil, nil
+ }
+ committedOffsets = newTopicPartitionsFromCparts(cRetoffsets)
+
+ return committedOffsets, nil
+}
+
+// Commit offsets for currently assigned partitions
+// This is a blocking call.
+// Returns the committed offsets on success.
+func (c *Consumer) Commit() ([]TopicPartition, error) {
+ return c.commit(nil)
+}
+
+// CommitMessage commits offset based on the provided message.
+// This is a blocking call.
+// Returns the committed offsets on success.
+func (c *Consumer) CommitMessage(m *Message) ([]TopicPartition, error) {
+ if m.TopicPartition.Error != nil {
+ return nil, Error{ErrInvalidArg, "Can't commit errored message"}
+ }
+ offsets := []TopicPartition{m.TopicPartition}
+ offsets[0].Offset++
+ return c.commit(offsets)
+}
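+
+// Poll-then-commit sketch (illustrative; assumes a consumer c with
+// enable.auto.commit=false):
+//
+//   msg, err := c.ReadMessage(-1)
+//   if err == nil {
+//       _, _ = c.CommitMessage(msg)
+//   }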
+
+// CommitOffsets commits the provided list of offsets
+// This is a blocking call.
+// Returns the committed offsets on success.
+func (c *Consumer) CommitOffsets(offsets []TopicPartition) ([]TopicPartition, error) {
+ return c.commit(offsets)
+}
+
+// StoreOffsets stores the provided list of offsets that will be committed
+// to the offset store according to `auto.commit.interval.ms` or manual
+// offset-less Commit().
+//
+// Returns the stored offsets on success. If at least one offset couldn't be stored,
+// an error and a list of offsets is returned. Each offset can be checked for
+// specific errors via its `.Error` member.
+func (c *Consumer) StoreOffsets(offsets []TopicPartition) (storedOffsets []TopicPartition, err error) {
+ coffsets := newCPartsFromTopicPartitions(offsets)
+ defer C.rd_kafka_topic_partition_list_destroy(coffsets)
+
+ cErr := C.rd_kafka_offsets_store(c.handle.rk, coffsets)
+
+ // coffsets might be annotated with an error
+ storedOffsets = newTopicPartitionsFromCparts(coffsets)
+
+ if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+ return storedOffsets, newError(cErr)
+ }
+
+ return storedOffsets, nil
+}
+
+// Seek seeks the given topic partitions using the offset from the TopicPartition.
+//
+// If timeoutMs is not 0 the call will wait this long for the
+// seek to be performed. If the timeout is reached the internal state
+// will be unknown and this function returns ErrTimedOut.
+// If timeoutMs is 0 it will initiate the seek but return
+// immediately without any error reporting (e.g., async).
+//
+// Seek() may only be used for partitions already being consumed
+// (through Assign() or implicitly through a self-rebalanced Subscribe()).
+// To set the starting offset it is preferred to use Assign() and provide
+// a starting offset for each partition.
+//
+// Returns an error on failure or nil otherwise.
+func (c *Consumer) Seek(partition TopicPartition, timeoutMs int) error {
+ rkt := c.handle.getRkt(*partition.Topic)
+ cErr := C.rd_kafka_seek(rkt,
+ C.int32_t(partition.Partition),
+ C.int64_t(partition.Offset),
+ C.int(timeoutMs))
+ if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+ return newError(cErr)
+ }
+ return nil
+}
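+
+// Seek sketch (illustrative; topic is an assumed variable and the partition
+// must already be assigned):
+//
+//   tp := TopicPartition{Topic: &topic, Partition: 0, Offset: 1234}
+//   err := c.Seek(tp, 0) // timeoutMs 0: initiate the seek and return immediately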
+
+// Poll the consumer for messages or events.
+//
+// Will block for at most timeoutMs milliseconds
+//
+// The following callbacks may be triggered:
+// Subscribe()'s rebalanceCb
+//
+// Returns nil on timeout, else an Event
+func (c *Consumer) Poll(timeoutMs int) (event Event) {
+ ev, _ := c.handle.eventPoll(nil, timeoutMs, 1, nil)
+ return ev
+}
+
+// Events returns the Events channel (if enabled)
+func (c *Consumer) Events() chan Event {
+ return c.events
+}
+
+// ReadMessage polls the consumer for a message.
+//
+// This is a convenience API that wraps Poll() and only returns
+// messages or errors. All other event types are discarded.
+//
+// The call will block for at most `timeout` waiting for
+// a new message or error. `timeout` may be set to -1 for
+// indefinite wait.
+//
+// Timeout is returned as (nil, err) where err satisfies `err.(kafka.Error).Code() == kafka.ErrTimedOut`.
+//
+// Messages are returned as (msg, nil),
+// while general errors are returned as (nil, err),
+// and partition-specific errors are returned as (msg, err) where
+// msg.TopicPartition provides partition-specific information (such as topic, partition and offset).
+//
+// All other event types, such as PartitionEOF, AssignedPartitions, etc, are silently discarded.
+//
+func (c *Consumer) ReadMessage(timeout time.Duration) (*Message, error) {
+
+ var absTimeout time.Time
+ var timeoutMs int
+
+ if timeout > 0 {
+ absTimeout = time.Now().Add(timeout)
+ timeoutMs = (int)(timeout.Seconds() * 1000.0)
+ } else {
+ timeoutMs = (int)(timeout)
+ }
+
+ for {
+ ev := c.Poll(timeoutMs)
+
+ switch e := ev.(type) {
+ case *Message:
+ if e.TopicPartition.Error != nil {
+ return e, e.TopicPartition.Error
+ }
+ return e, nil
+ case Error:
+ return nil, e
+ default:
+ // Ignore other event types
+ }
+
+ if timeout > 0 {
+ // Calculate remaining time
+ timeoutMs = int(math.Max(0.0, absTimeout.Sub(time.Now()).Seconds()*1000.0))
+ }
+
+ if timeoutMs == 0 && ev == nil {
+ return nil, newError(C.RD_KAFKA_RESP_ERR__TIMED_OUT)
+ }
+
+ }
+
+}
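+
+// Timeout-handling sketch for ReadMessage (illustrative only):
+//
+//   msg, err := c.ReadMessage(100 * time.Millisecond)
+//   if err != nil && err.(Error).Code() == ErrTimedOut {
+//       // no message arrived within the timeout
+//   }
+//   _ = msg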
+
+// Close Consumer instance.
+// The object is no longer usable after this call.
+func (c *Consumer) Close() (err error) {
+
+ if c.eventsChanEnable {
+ // Wait for consumerReader() to terminate (by closing readerTermChan)
+ close(c.readerTermChan)
+ c.handle.waitTerminated(1)
+ close(c.events)
+ }
+
+ C.rd_kafka_queue_destroy(c.handle.rkq)
+ c.handle.rkq = nil
+
+ e := C.rd_kafka_consumer_close(c.handle.rk)
+ if e != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+ return newError(e)
+ }
+
+ c.handle.cleanup()
+
+ C.rd_kafka_destroy(c.handle.rk)
+
+ return nil
+}
+
+// NewConsumer creates a new high-level Consumer instance.
+//
+// Supported special configuration properties:
+// go.application.rebalance.enable (bool, false) - Forward rebalancing responsibility to application via the Events() channel.
+// If set to true the app must handle the AssignedPartitions and
+// RevokedPartitions events and call Assign() and Unassign()
+// respectively.
+// go.events.channel.enable (bool, false) - Enable the Events() channel. Messages and events will be pushed on the Events() channel and the Poll() interface will be disabled. (Experimental)
+// go.events.channel.size (int, 1000) - Events() channel size
+//
+// WARNING: Due to the buffering nature of channels (and queues in general) the
+// use of the events channel risks receiving outdated events and
+// messages. Minimizing go.events.channel.size reduces the risk
+// and number of outdated events and messages but does not eliminate
+// the factor completely. With a channel size of 1 at most one
+// event or message may be outdated.
+func NewConsumer(conf *ConfigMap) (*Consumer, error) {
+
+ err := versionCheck()
+ if err != nil {
+ return nil, err
+ }
+
+ // before we do anything with the configuration, create a copy such that
+ // the original is not mutated.
+ confCopy := conf.clone()
+
+ groupid, _ := confCopy.get("group.id", nil)
+ if groupid == nil {
+ // without a group.id the underlying cgrp subsystem in librdkafka won't get started,
+ // and without it there is no way to consume assigned partitions.
+ // So for now require group.id; this might change in the future.
+ return nil, newErrorFromString(ErrInvalidArg, "Required property group.id not set")
+ }
+
+ c := &Consumer{}
+
+ v, err := confCopy.extract("go.application.rebalance.enable", false)
+ if err != nil {
+ return nil, err
+ }
+ c.appRebalanceEnable = v.(bool)
+
+ v, err = confCopy.extract("go.events.channel.enable", false)
+ if err != nil {
+ return nil, err
+ }
+ c.eventsChanEnable = v.(bool)
+
+ v, err = confCopy.extract("go.events.channel.size", 1000)
+ if err != nil {
+ return nil, err
+ }
+ eventsChanSize := v.(int)
+
+ cConf, err := confCopy.convert()
+ if err != nil {
+ return nil, err
+ }
+ cErrstr := (*C.char)(C.malloc(C.size_t(256)))
+ defer C.free(unsafe.Pointer(cErrstr))
+
+ C.rd_kafka_conf_set_events(cConf, C.RD_KAFKA_EVENT_REBALANCE|C.RD_KAFKA_EVENT_OFFSET_COMMIT|C.RD_KAFKA_EVENT_STATS|C.RD_KAFKA_EVENT_ERROR)
+
+ c.handle.rk = C.rd_kafka_new(C.RD_KAFKA_CONSUMER, cConf, cErrstr, 256)
+ if c.handle.rk == nil {
+ return nil, newErrorFromCString(C.RD_KAFKA_RESP_ERR__INVALID_ARG, cErrstr)
+ }
+
+ C.rd_kafka_poll_set_consumer(c.handle.rk)
+
+ c.handle.c = c
+ c.handle.setup()
+ c.handle.rkq = C.rd_kafka_queue_get_consumer(c.handle.rk)
+ if c.handle.rkq == nil {
+ // no cgrp (no group.id configured), revert to main queue.
+ c.handle.rkq = C.rd_kafka_queue_get_main(c.handle.rk)
+ }
+
+ if c.eventsChanEnable {
+ c.events = make(chan Event, eventsChanSize)
+ c.readerTermChan = make(chan bool)
+
+ /* Start rdkafka consumer queue reader -> events writer goroutine */
+ go consumerReader(c, c.readerTermChan)
+ }
+
+ return c, nil
+}
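+
+// Construction sketch (illustrative; broker address and group name are assumptions):
+//
+//   c, err := NewConsumer(&ConfigMap{
+//       "bootstrap.servers": "localhost:9092",
+//       "group.id":          "myGroup",
+//   })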
+
+// rebalance calls the application's rebalance callback, if any.
+// Returns true if the underlying assignment was updated, else false.
+func (c *Consumer) rebalance(ev Event) bool {
+ c.appReassigned = false
+
+ if c.rebalanceCb != nil {
+ c.rebalanceCb(c, ev)
+ }
+
+ return c.appReassigned
+}
+
+// consumerReader reads messages and events from the librdkafka consumer queue
+// and posts them on the consumer channel.
+// Runs until termChan closes
+func consumerReader(c *Consumer, termChan chan bool) {
+
+out:
+ for {
+ select {
+ case <-termChan:
+ break out
+ default:
+ _, term := c.handle.eventPoll(c.events, 100, 1000, termChan)
+ if term {
+ break out
+ }
+
+ }
+ }
+
+ c.handle.terminatedChan <- "consumerReader"
+ return
+
+}
+
+// GetMetadata queries broker for cluster and topic metadata.
+// If topic is non-nil only information about that topic is returned, else if
+// allTopics is false only information about locally used topics is returned,
+// else information about all topics is returned.
+// GetMetadata is equivalent to listTopics, describeTopics and describeCluster in the Java API.
+func (c *Consumer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error) {
+ return getMetadata(c, topic, allTopics, timeoutMs)
+}
+
+// QueryWatermarkOffsets returns the broker's low and high offsets for the given topic
+// and partition.
+func (c *Consumer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error) {
+ return queryWatermarkOffsets(c, topic, partition, timeoutMs)
+}
+
+// OffsetsForTimes looks up offsets by timestamp for the given partitions.
+//
+// The returned offset for each partition is the earliest offset whose
+// timestamp is greater than or equal to the given timestamp in the
+// corresponding partition.
+//
+// The timestamps to query are represented as `.Offset` in the `times`
+// argument and the looked up offsets are represented as `.Offset` in the returned
+// `offsets` list.
+//
+// The function will block for at most timeoutMs milliseconds.
+//
+// Duplicate Topic+Partitions are not supported.
+// Per-partition errors may be returned in the `.Error` field.
+func (c *Consumer) OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error) {
+ return offsetsForTimes(c, times, timeoutMs)
+}
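+
+// Lookup sketch (illustrative; topic and the millisecond timestamp tsMs are assumptions):
+//
+//   times := []TopicPartition{{Topic: &topic, Partition: 0, Offset: Offset(tsMs)}}
+//   offsets, err := c.OffsetsForTimes(times, 5000)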
+
+// Subscription returns the current subscription as set by Subscribe()
+func (c *Consumer) Subscription() (topics []string, err error) {
+ var cTopics *C.rd_kafka_topic_partition_list_t
+
+ cErr := C.rd_kafka_subscription(c.handle.rk, &cTopics)
+ if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+ return nil, newError(cErr)
+ }
+ defer C.rd_kafka_topic_partition_list_destroy(cTopics)
+
+ topicCnt := int(cTopics.cnt)
+ topics = make([]string, topicCnt)
+ for i := 0; i < topicCnt; i++ {
+ crktpar := C._c_rdkafka_topic_partition_list_entry(cTopics,
+ C.int(i))
+ topics[i] = C.GoString(crktpar.topic)
+ }
+
+ return topics, nil
+}
+
+// Assignment returns the current partition assignments
+func (c *Consumer) Assignment() (partitions []TopicPartition, err error) {
+ var cParts *C.rd_kafka_topic_partition_list_t
+
+ cErr := C.rd_kafka_assignment(c.handle.rk, &cParts)
+ if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+ return nil, newError(cErr)
+ }
+ defer C.rd_kafka_topic_partition_list_destroy(cParts)
+
+ partitions = newTopicPartitionsFromCparts(cParts)
+
+ return partitions, nil
+}
+
+// Committed retrieves committed offsets for the given set of partitions
+func (c *Consumer) Committed(partitions []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error) {
+ cparts := newCPartsFromTopicPartitions(partitions)
+ defer C.rd_kafka_topic_partition_list_destroy(cparts)
+ cerr := C.rd_kafka_committed(c.handle.rk, cparts, C.int(timeoutMs))
+ if cerr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+ return nil, newError(cerr)
+ }
+
+ return newTopicPartitionsFromCparts(cparts), nil
+}
+
+// Pause consumption for the provided list of partitions
+//
+// Note that messages already enqueued on the consumer's Event channel
+// (if `go.events.channel.enable` has been set) will NOT be purged by
+// this call, set `go.events.channel.size` accordingly.
+func (c *Consumer) Pause(partitions []TopicPartition) (err error) {
+ cparts := newCPartsFromTopicPartitions(partitions)
+ defer C.rd_kafka_topic_partition_list_destroy(cparts)
+ cerr := C.rd_kafka_pause_partitions(c.handle.rk, cparts)
+ if cerr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+ return newError(cerr)
+ }
+ return nil
+}
+
+// Resume consumption for the provided list of partitions
+func (c *Consumer) Resume(partitions []TopicPartition) (err error) {
+ cparts := newCPartsFromTopicPartitions(partitions)
+ defer C.rd_kafka_topic_partition_list_destroy(cparts)
+ cerr := C.rd_kafka_resume_partitions(c.handle.rk, cparts)
+ if cerr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+ return newError(cerr)
+ }
+ return nil
+}
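+
+// Pause/Resume sketch (illustrative; pauses everything currently assigned):
+//
+//   parts, _ := c.Assignment()
+//   _ = c.Pause(parts)
+//   // ... later ...
+//   _ = c.Resume(parts)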
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/error.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/error.go
new file mode 100644
index 0000000..70a435f
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/error.go
@@ -0,0 +1,77 @@
+package kafka
+
+/**
+ * Copyright 2016 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Automatically generate error codes from librdkafka
+// See README for instructions
+//go:generate $GOPATH/bin/go_rdkafka_generr generated_errors.go
+
+/*
+#include <librdkafka/rdkafka.h>
+*/
+import "C"
+
+// Error provides a Kafka-specific error container
+type Error struct {
+ code ErrorCode
+ str string
+}
+
+func newError(code C.rd_kafka_resp_err_t) (err Error) {
+ return Error{ErrorCode(code), ""}
+}
+
+func newGoError(code ErrorCode) (err Error) {
+ return Error{code, ""}
+}
+
+func newErrorFromString(code ErrorCode, str string) (err Error) {
+ return Error{code, str}
+}
+
+func newErrorFromCString(code C.rd_kafka_resp_err_t, cstr *C.char) (err Error) {
+ var str string
+ if cstr != nil {
+ str = C.GoString(cstr)
+ } else {
+ str = ""
+ }
+ return Error{ErrorCode(code), str}
+}
+
+func newCErrorFromString(code C.rd_kafka_resp_err_t, str string) (err Error) {
+ return newErrorFromString(ErrorCode(code), str)
+}
+
+// Error returns a human readable representation of an Error
+// Same as Error.String()
+func (e Error) Error() string {
+ return e.String()
+}
+
+// String returns a human readable representation of an Error
+func (e Error) String() string {
+ if len(e.str) > 0 {
+ return e.str
+ }
+ return e.code.String()
+}
+
+// Code returns the ErrorCode of an Error
+func (e Error) Code() ErrorCode {
+ return e.code
+}
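+
+// Error-inspection sketch (illustrative; err is any error returned by this package):
+//
+//   if kerr, ok := err.(Error); ok && kerr.Code() == ErrAllBrokersDown {
+//       // all broker connections are down; back off and retry
+//   }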
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/event.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/event.go
new file mode 100644
index 0000000..8a89edb
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/event.go
@@ -0,0 +1,330 @@
+package kafka
+
+/**
+ * Copyright 2016 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import (
+ "fmt"
+ "os"
+ "unsafe"
+)
+
+/*
+#include <stdlib.h>
+#include <librdkafka/rdkafka.h>
+#include "glue_rdkafka.h"
+
+
+#ifdef RD_KAFKA_V_HEADERS
+void chdrs_to_tmphdrs (rd_kafka_headers_t *chdrs, tmphdr_t *tmphdrs) {
+ size_t i = 0;
+ const char *name;
+ const void *val;
+ size_t size;
+
+ while (!rd_kafka_header_get_all(chdrs, i,
+ &tmphdrs[i].key,
+ &tmphdrs[i].val,
+ (size_t *)&tmphdrs[i].size))
+ i++;
+}
+#endif
+
+rd_kafka_event_t *_rk_queue_poll (rd_kafka_queue_t *rkq, int timeoutMs,
+ rd_kafka_event_type_t *evtype,
+ fetched_c_msg_t *fcMsg,
+ rd_kafka_event_t *prev_rkev) {
+ rd_kafka_event_t *rkev;
+
+ if (prev_rkev)
+ rd_kafka_event_destroy(prev_rkev);
+
+ rkev = rd_kafka_queue_poll(rkq, timeoutMs);
+ *evtype = rd_kafka_event_type(rkev);
+
+ if (*evtype == RD_KAFKA_EVENT_FETCH) {
+#ifdef RD_KAFKA_V_HEADERS
+ rd_kafka_headers_t *hdrs;
+#endif
+
+ fcMsg->msg = (rd_kafka_message_t *)rd_kafka_event_message_next(rkev);
+ fcMsg->ts = rd_kafka_message_timestamp(fcMsg->msg, &fcMsg->tstype);
+
+#ifdef RD_KAFKA_V_HEADERS
+ if (!rd_kafka_message_headers(fcMsg->msg, &hdrs)) {
+ fcMsg->tmphdrsCnt = rd_kafka_header_cnt(hdrs);
+ fcMsg->tmphdrs = malloc(sizeof(*fcMsg->tmphdrs) * fcMsg->tmphdrsCnt);
+ chdrs_to_tmphdrs(hdrs, fcMsg->tmphdrs);
+ } else {
+#else
+ if (1) {
+#endif
+ fcMsg->tmphdrs = NULL;
+ fcMsg->tmphdrsCnt = 0;
+ }
+ }
+ return rkev;
+}
+*/
+import "C"
+
+// Event generic interface
+type Event interface {
+ // String returns a human-readable representation of the event
+ String() string
+}
+
+// Specific event types
+
+// Stats statistics event
+type Stats struct {
+ statsJSON string
+}
+
+func (e Stats) String() string {
+ return e.statsJSON
+}
+
+// AssignedPartitions consumer group rebalance event: assigned partition set
+type AssignedPartitions struct {
+ Partitions []TopicPartition
+}
+
+func (e AssignedPartitions) String() string {
+ return fmt.Sprintf("AssignedPartitions: %v", e.Partitions)
+}
+
+// RevokedPartitions consumer group rebalance event: revoked partition set
+type RevokedPartitions struct {
+ Partitions []TopicPartition
+}
+
+func (e RevokedPartitions) String() string {
+ return fmt.Sprintf("RevokedPartitions: %v", e.Partitions)
+}
+
+// PartitionEOF consumer reached end of partition
+type PartitionEOF TopicPartition
+
+func (p PartitionEOF) String() string {
+ return fmt.Sprintf("EOF at %s", TopicPartition(p))
+}
+
+// OffsetsCommitted reports committed offsets
+type OffsetsCommitted struct {
+ Error error
+ Offsets []TopicPartition
+}
+
+func (o OffsetsCommitted) String() string {
+ return fmt.Sprintf("OffsetsCommitted (%v, %v)", o.Error, o.Offsets)
+}
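+
+// Event-loop sketch over a Consumer's Events() channel (illustrative only; assumes
+// go.events.channel.enable=true and go.application.rebalance.enable=true):
+//
+//   for ev := range c.Events() {
+//       switch e := ev.(type) {
+//       case AssignedPartitions:
+//           _ = c.Assign(e.Partitions)
+//       case RevokedPartitions:
+//           _ = c.Unassign()
+//       case *Message:
+//           fmt.Println(e.TopicPartition)
+//       case Error:
+//           fmt.Println(e)
+//       }
+//   }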
+
+// eventPoll polls an event from the handler's C rd_kafka_queue_t,
+// translates it into an Event type and then sends on `channel` if non-nil, else returns the Event.
+// termChan is an optional channel to monitor along with producing to channel
+// to indicate that `channel` is being terminated.
+// Returns an (event Event, terminate bool) tuple, where terminate indicates
+// whether termChan received a termination event.
+func (h *handle) eventPoll(channel chan Event, timeoutMs int, maxEvents int, termChan chan bool) (Event, bool) {
+
+ var prevRkev *C.rd_kafka_event_t
+ term := false
+
+ var retval Event
+
+ if channel == nil {
+ maxEvents = 1
+ }
+out:
+ for evcnt := 0; evcnt < maxEvents; evcnt++ {
+ var evtype C.rd_kafka_event_type_t
+ var fcMsg C.fetched_c_msg_t
+ rkev := C._rk_queue_poll(h.rkq, C.int(timeoutMs), &evtype, &fcMsg, prevRkev)
+ prevRkev = rkev
+ timeoutMs = 0
+
+ retval = nil
+
+ switch evtype {
+ case C.RD_KAFKA_EVENT_FETCH:
+ // Consumer fetch event, new message.
+ // Extracted into temporary fcMsg for optimization
+ retval = h.newMessageFromFcMsg(&fcMsg)
+
+ case C.RD_KAFKA_EVENT_REBALANCE:
+ // Consumer rebalance event
+ // If the app provided a RebalanceCb to Subscribe*() or
+ // has go.application.rebalance.enable=true we create an event
+ // and forward it to the application thru the RebalanceCb or the
+ // Events channel respectively.
+ // Since librdkafka requires the rebalance event to be "acked" by
+ // the application to synchronize state we keep track of if the
+ // application performed Assign() or Unassign(), but this only works for
+ // the non-channel case. For the channel case we assume the application
+ // calls Assign() / Unassign().
+ // Failure to do so will "hang" the consumer, e.g., it won't start consuming
+ // and it won't close cleanly, so this error case should be visible
+ // immediately to the application developer.
+ appReassigned := false
+ if C.rd_kafka_event_error(rkev) == C.RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS {
+ if h.currAppRebalanceEnable {
+ // Application must perform Assign() call
+ var ev AssignedPartitions
+ ev.Partitions = newTopicPartitionsFromCparts(C.rd_kafka_event_topic_partition_list(rkev))
+ if channel != nil || h.c.rebalanceCb == nil {
+ retval = ev
+ appReassigned = true
+ } else {
+ appReassigned = h.c.rebalance(ev)
+ }
+ }
+
+ if !appReassigned {
+ C.rd_kafka_assign(h.rk, C.rd_kafka_event_topic_partition_list(rkev))
+ }
+ } else {
+ if h.currAppRebalanceEnable {
+ // Application must perform Unassign() call
+ var ev RevokedPartitions
+ ev.Partitions = newTopicPartitionsFromCparts(C.rd_kafka_event_topic_partition_list(rkev))
+ if channel != nil || h.c.rebalanceCb == nil {
+ retval = ev
+ appReassigned = true
+ } else {
+ appReassigned = h.c.rebalance(ev)
+ }
+ }
+
+ if !appReassigned {
+ C.rd_kafka_assign(h.rk, nil)
+ }
+ }
+
+ case C.RD_KAFKA_EVENT_ERROR:
+ // Error event
+ cErr := C.rd_kafka_event_error(rkev)
+ switch cErr {
+ case C.RD_KAFKA_RESP_ERR__PARTITION_EOF:
+ crktpar := C.rd_kafka_event_topic_partition(rkev)
+ if crktpar == nil {
+ break
+ }
+
+ defer C.rd_kafka_topic_partition_destroy(crktpar)
+ var peof PartitionEOF
+ setupTopicPartitionFromCrktpar((*TopicPartition)(&peof), crktpar)
+
+ retval = peof
+ default:
+ retval = newErrorFromCString(cErr, C.rd_kafka_event_error_string(rkev))
+ }
+
+ case C.RD_KAFKA_EVENT_STATS:
+ retval = &Stats{C.GoString(C.rd_kafka_event_stats(rkev))}
+
+ case C.RD_KAFKA_EVENT_DR:
+ // Producer Delivery Report event
+ // Each such event contains delivery reports for all
+ // messages in the produced batch.
+ // Forward delivery reports to per-message's response channel
+ // or to the global Producer.Events channel, or none.
+ rkmessages := make([]*C.rd_kafka_message_t, int(C.rd_kafka_event_message_count(rkev)))
+
+ cnt := int(C.rd_kafka_event_message_array(rkev, (**C.rd_kafka_message_t)(unsafe.Pointer(&rkmessages[0])), C.size_t(len(rkmessages))))
+
+ for _, rkmessage := range rkmessages[:cnt] {
+ msg := h.newMessageFromC(rkmessage)
+ var ch *chan Event
+
+ if rkmessage._private != nil {
+ // Find cgoif by id
+ cg, found := h.cgoGet((int)((uintptr)(rkmessage._private)))
+ if found {
+ cdr := cg.(cgoDr)
+
+ if cdr.deliveryChan != nil {
+ ch = &cdr.deliveryChan
+ }
+ msg.Opaque = cdr.opaque
+ }
+ }
+
+ if ch == nil && h.fwdDr {
+ ch = &channel
+ }
+
+ if ch != nil {
+ select {
+ case *ch <- msg:
+ case <-termChan:
+ break out
+ }
+
+ } else {
+ retval = msg
+ break out
+ }
+ }
+
+ case C.RD_KAFKA_EVENT_OFFSET_COMMIT:
+ // Offsets committed
+ cErr := C.rd_kafka_event_error(rkev)
+ coffsets := C.rd_kafka_event_topic_partition_list(rkev)
+ var offsets []TopicPartition
+ if coffsets != nil {
+ offsets = newTopicPartitionsFromCparts(coffsets)
+ }
+
+ if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+ retval = OffsetsCommitted{newErrorFromCString(cErr, C.rd_kafka_event_error_string(rkev)), offsets}
+ } else {
+ retval = OffsetsCommitted{nil, offsets}
+ }
+
+ case C.RD_KAFKA_EVENT_NONE:
+ // poll timed out: no events available
+ break out
+
+ default:
+ if rkev != nil {
+ fmt.Fprintf(os.Stderr, "Ignored event %s\n",
+ C.GoString(C.rd_kafka_event_name(rkev)))
+ }
+
+ }
+
+ if retval != nil {
+ if channel != nil {
+ select {
+ case channel <- retval:
+ case <-termChan:
+ retval = nil
+ term = true
+ break out
+ }
+ } else {
+ break out
+ }
+ }
+ }
+
+ if prevRkev != nil {
+ C.rd_kafka_event_destroy(prevRkev)
+ }
+
+ return retval, term
+}
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/generated_errors.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/generated_errors.go
new file mode 100644
index 0000000..b9f68d8
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/generated_errors.go
@@ -0,0 +1,225 @@
+package kafka
+// Copyright 2016 Confluent Inc.
+// AUTOMATICALLY GENERATED BY /home/maglun/gocode/bin/go_rdkafka_generr ON 2018-10-11 09:26:58.938371378 +0200 CEST m=+0.001256618 USING librdkafka 0.11.5
+
+/*
+#include <librdkafka/rdkafka.h>
+*/
+import "C"
+
+// ErrorCode is the integer representation of local and broker error codes
+type ErrorCode int
+
+// String returns a human readable representation of an error code
+func (c ErrorCode) String() string {
+ return C.GoString(C.rd_kafka_err2str(C.rd_kafka_resp_err_t(c)))
+}
+
+const (
+ // ErrBadMsg Local: Bad message format
+ ErrBadMsg ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__BAD_MSG)
+ // ErrBadCompression Local: Invalid compressed data
+ ErrBadCompression ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__BAD_COMPRESSION)
+ // ErrDestroy Local: Broker handle destroyed
+ ErrDestroy ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__DESTROY)
+ // ErrFail Local: Communication failure with broker
+ ErrFail ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__FAIL)
+ // ErrTransport Local: Broker transport failure
+ ErrTransport ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__TRANSPORT)
+ // ErrCritSysResource Local: Critical system resource failure
+ ErrCritSysResource ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE)
+ // ErrResolve Local: Host resolution failure
+ ErrResolve ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__RESOLVE)
+ // ErrMsgTimedOut Local: Message timed out
+ ErrMsgTimedOut ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__MSG_TIMED_OUT)
+ // ErrPartitionEOF Broker: No more messages
+ ErrPartitionEOF ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__PARTITION_EOF)
+ // ErrUnknownPartition Local: Unknown partition
+ ErrUnknownPartition ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION)
+ // ErrFs Local: File or filesystem error
+ ErrFs ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__FS)
+ // ErrUnknownTopic Local: Unknown topic
+ ErrUnknownTopic ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
+ // ErrAllBrokersDown Local: All broker connections are down
+ ErrAllBrokersDown ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN)
+ // ErrInvalidArg Local: Invalid argument or configuration
+ ErrInvalidArg ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__INVALID_ARG)
+ // ErrTimedOut Local: Timed out
+ ErrTimedOut ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__TIMED_OUT)
+ // ErrQueueFull Local: Queue full
+ ErrQueueFull ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__QUEUE_FULL)
+ // ErrIsrInsuff Local: ISR count insufficient
+ ErrIsrInsuff ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__ISR_INSUFF)
+ // ErrNodeUpdate Local: Broker node update
+ ErrNodeUpdate ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NODE_UPDATE)
+ // ErrSsl Local: SSL error
+ ErrSsl ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__SSL)
+ // ErrWaitCoord Local: Waiting for coordinator
+ ErrWaitCoord ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__WAIT_COORD)
+ // ErrUnknownGroup Local: Unknown group
+ ErrUnknownGroup ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNKNOWN_GROUP)
+ // ErrInProgress Local: Operation in progress
+ ErrInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__IN_PROGRESS)
+ // ErrPrevInProgress Local: Previous operation in progress
+ ErrPrevInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS)
+ // ErrExistingSubscription Local: Existing subscription
+ ErrExistingSubscription ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION)
+ // ErrAssignPartitions Local: Assign partitions
+ ErrAssignPartitions ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
+ // ErrRevokePartitions Local: Revoke partitions
+ ErrRevokePartitions ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS)
+ // ErrConflict Local: Conflicting use
+ ErrConflict ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__CONFLICT)
+ // ErrState Local: Erroneous state
+ ErrState ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__STATE)
+ // ErrUnknownProtocol Local: Unknown protocol
+ ErrUnknownProtocol ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL)
+ // ErrNotImplemented Local: Not implemented
+ ErrNotImplemented ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED)
+ // ErrAuthentication Local: Authentication failure
+ ErrAuthentication ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__AUTHENTICATION)
+ // ErrNoOffset Local: No offset stored
+ ErrNoOffset ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NO_OFFSET)
+ // ErrOutdated Local: Outdated
+ ErrOutdated ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__OUTDATED)
+ // ErrTimedOutQueue Local: Timed out in queue
+ ErrTimedOutQueue ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE)
+ // ErrUnsupportedFeature Local: Required feature not supported by broker
+ ErrUnsupportedFeature ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE)
+ // ErrWaitCache Local: Awaiting cache update
+ ErrWaitCache ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__WAIT_CACHE)
+ // ErrIntr Local: Operation interrupted
+ ErrIntr ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__INTR)
+ // ErrKeySerialization Local: Key serialization error
+ ErrKeySerialization ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__KEY_SERIALIZATION)
+ // ErrValueSerialization Local: Value serialization error
+ ErrValueSerialization ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__VALUE_SERIALIZATION)
+ // ErrKeyDeserialization Local: Key deserialization error
+ ErrKeyDeserialization ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__KEY_DESERIALIZATION)
+ // ErrValueDeserialization Local: Value deserialization error
+ ErrValueDeserialization ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__VALUE_DESERIALIZATION)
+ // ErrPartial Local: Partial response
+ ErrPartial ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__PARTIAL)
+ // ErrReadOnly Local: Read-only object
+ ErrReadOnly ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__READ_ONLY)
+ // ErrNoent Local: No such entry
+ ErrNoent ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NOENT)
+ // ErrUnderflow Local: Read underflow
+ ErrUnderflow ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNDERFLOW)
+ // ErrInvalidType Local: Invalid type
+ ErrInvalidType ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__INVALID_TYPE)
+ // ErrUnknown Unknown broker error
+ ErrUnknown ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNKNOWN)
+ // ErrNoError Success
+ ErrNoError ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NO_ERROR)
+ // ErrOffsetOutOfRange Broker: Offset out of range
+ ErrOffsetOutOfRange ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE)
+ // ErrInvalidMsg Broker: Invalid message
+ ErrInvalidMsg ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_MSG)
+ // ErrUnknownTopicOrPart Broker: Unknown topic or partition
+ ErrUnknownTopicOrPart ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART)
+ // ErrInvalidMsgSize Broker: Invalid message size
+ ErrInvalidMsgSize ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE)
+ // ErrLeaderNotAvailable Broker: Leader not available
+ ErrLeaderNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE)
+ // ErrNotLeaderForPartition Broker: Not leader for partition
+ ErrNotLeaderForPartition ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION)
+ // ErrRequestTimedOut Broker: Request timed out
+ ErrRequestTimedOut ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT)
+ // ErrBrokerNotAvailable Broker: Broker not available
+ ErrBrokerNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE)
+ // ErrReplicaNotAvailable Broker: Replica not available
+ ErrReplicaNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE)
+ // ErrMsgSizeTooLarge Broker: Message size too large
+ ErrMsgSizeTooLarge ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE)
+ // ErrStaleCtrlEpoch Broker: StaleControllerEpochCode
+ ErrStaleCtrlEpoch ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_STALE_CTRL_EPOCH)
+ // ErrOffsetMetadataTooLarge Broker: Offset metadata string too large
+ ErrOffsetMetadataTooLarge ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE)
+ // ErrNetworkException Broker: Broker disconnected before response received
+ ErrNetworkException ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NETWORK_EXCEPTION)
+ // ErrGroupLoadInProgress Broker: Group coordinator load in progress
+ ErrGroupLoadInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS)
+ // ErrGroupCoordinatorNotAvailable Broker: Group coordinator not available
+ ErrGroupCoordinatorNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE)
+ // ErrNotCoordinatorForGroup Broker: Not coordinator for group
+ ErrNotCoordinatorForGroup ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP)
+ // ErrTopicException Broker: Invalid topic
+ ErrTopicException ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION)
+ // ErrRecordListTooLarge Broker: Message batch larger than configured server segment size
+ ErrRecordListTooLarge ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE)
+ // ErrNotEnoughReplicas Broker: Not enough in-sync replicas
+ ErrNotEnoughReplicas ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS)
+ // ErrNotEnoughReplicasAfterAppend Broker: Message(s) written to insufficient number of in-sync replicas
+ ErrNotEnoughReplicasAfterAppend ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND)
+ // ErrInvalidRequiredAcks Broker: Invalid required acks value
+ ErrInvalidRequiredAcks ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS)
+ // ErrIllegalGeneration Broker: Specified group generation id is not valid
+ ErrIllegalGeneration ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION)
+ // ErrInconsistentGroupProtocol Broker: Inconsistent group protocol
+ ErrInconsistentGroupProtocol ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INCONSISTENT_GROUP_PROTOCOL)
+ // ErrInvalidGroupID Broker: Invalid group.id
+ ErrInvalidGroupID ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_GROUP_ID)
+ // ErrUnknownMemberID Broker: Unknown member
+ ErrUnknownMemberID ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID)
+ // ErrInvalidSessionTimeout Broker: Invalid session timeout
+ ErrInvalidSessionTimeout ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT)
+ // ErrRebalanceInProgress Broker: Group rebalance in progress
+ ErrRebalanceInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS)
+ // ErrInvalidCommitOffsetSize Broker: Commit offset data size is not valid
+ ErrInvalidCommitOffsetSize ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE)
+ // ErrTopicAuthorizationFailed Broker: Topic authorization failed
+ ErrTopicAuthorizationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED)
+ // ErrGroupAuthorizationFailed Broker: Group authorization failed
+ ErrGroupAuthorizationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED)
+ // ErrClusterAuthorizationFailed Broker: Cluster authorization failed
+ ErrClusterAuthorizationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED)
+ // ErrInvalidTimestamp Broker: Invalid timestamp
+ ErrInvalidTimestamp ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_TIMESTAMP)
+ // ErrUnsupportedSaslMechanism Broker: Unsupported SASL mechanism
+ ErrUnsupportedSaslMechanism ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNSUPPORTED_SASL_MECHANISM)
+ // ErrIllegalSaslState Broker: Request not valid in current SASL state
+ ErrIllegalSaslState ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_ILLEGAL_SASL_STATE)
+ // ErrUnsupportedVersion Broker: API version not supported
+ ErrUnsupportedVersion ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION)
+ // ErrTopicAlreadyExists Broker: Topic already exists
+ ErrTopicAlreadyExists ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS)
+ // ErrInvalidPartitions Broker: Invalid number of partitions
+ ErrInvalidPartitions ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_PARTITIONS)
+ // ErrInvalidReplicationFactor Broker: Invalid replication factor
+ ErrInvalidReplicationFactor ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_REPLICATION_FACTOR)
+ // ErrInvalidReplicaAssignment Broker: Invalid replica assignment
+ ErrInvalidReplicaAssignment ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_REPLICA_ASSIGNMENT)
+ // ErrInvalidConfig Broker: Configuration is invalid
+ ErrInvalidConfig ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_CONFIG)
+ // ErrNotController Broker: Not controller for cluster
+ ErrNotController ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_CONTROLLER)
+ // ErrInvalidRequest Broker: Invalid request
+ ErrInvalidRequest ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_REQUEST)
+ // ErrUnsupportedForMessageFormat Broker: Message format on broker does not support request
+ ErrUnsupportedForMessageFormat ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT)
+	// ErrPolicyViolation Broker: Isolation policy violation
+ ErrPolicyViolation ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_POLICY_VIOLATION)
+ // ErrOutOfOrderSequenceNumber Broker: Broker received an out of order sequence number
+ ErrOutOfOrderSequenceNumber ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER)
+ // ErrDuplicateSequenceNumber Broker: Broker received a duplicate sequence number
+ ErrDuplicateSequenceNumber ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER)
+ // ErrInvalidProducerEpoch Broker: Producer attempted an operation with an old epoch
+ ErrInvalidProducerEpoch ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH)
+ // ErrInvalidTxnState Broker: Producer attempted a transactional operation in an invalid state
+ ErrInvalidTxnState ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_TXN_STATE)
+ // ErrInvalidProducerIDMapping Broker: Producer attempted to use a producer id which is not currently assigned to its transactional id
+ ErrInvalidProducerIDMapping ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING)
+ // ErrInvalidTransactionTimeout Broker: Transaction timeout is larger than the maximum value allowed by the broker's max.transaction.timeout.ms
+ ErrInvalidTransactionTimeout ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_TRANSACTION_TIMEOUT)
+ // ErrConcurrentTransactions Broker: Producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing
+ ErrConcurrentTransactions ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS)
+ // ErrTransactionCoordinatorFenced Broker: Indicates that the transaction coordinator sending a WriteTxnMarker is no longer the current coordinator for a given producer
+ ErrTransactionCoordinatorFenced ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TRANSACTION_COORDINATOR_FENCED)
+ // ErrTransactionalIDAuthorizationFailed Broker: Transactional Id authorization failed
+ ErrTransactionalIDAuthorizationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED)
+ // ErrSecurityDisabled Broker: Security features are disabled
+ ErrSecurityDisabled ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_SECURITY_DISABLED)
+ // ErrOperationNotAttempted Broker: Operation not attempted
+ ErrOperationNotAttempted ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_OPERATION_NOT_ATTEMPTED)
+)
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/glue_rdkafka.h b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/glue_rdkafka.h
new file mode 100644
index 0000000..adcef9a
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/glue_rdkafka.h
@@ -0,0 +1,46 @@
+/**
+ * Copyright 2016 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+
+/**
+ * Glue between Go, Cgo and librdkafka
+ */
+
+
+/**
+ * Temporary C to Go header representation
+ */
+typedef struct tmphdr_s {
+ const char *key;
+ const void *val; // producer: malloc()ed by Go code if size > 0
+ // consumer: owned by librdkafka
+ ssize_t size;
+} tmphdr_t;
+
+
+
+/**
+ * Represents a fetched C message, with all extra fields extracted
+ * to struct fields.
+ */
+typedef struct fetched_c_msg {
+ rd_kafka_message_t *msg;
+ rd_kafka_timestamp_type_t tstype;
+ int64_t ts;
+ tmphdr_t *tmphdrs;
+ size_t tmphdrsCnt;
+} fetched_c_msg_t;
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/handle.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/handle.go
new file mode 100644
index 0000000..c09e64d
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/handle.go
@@ -0,0 +1,207 @@
+package kafka
+
+/**
+ * Copyright 2016 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import (
+ "fmt"
+ "sync"
+ "unsafe"
+)
+
+/*
+#include <librdkafka/rdkafka.h>
+#include <stdlib.h>
+*/
+import "C"
+
+// Handle represents a generic client handle containing common parts for
+// both Producer and Consumer.
+type Handle interface {
+ gethandle() *handle
+}
+
+// Common instance handle for both Producer and Consumer
+type handle struct {
+ rk *C.rd_kafka_t
+ rkq *C.rd_kafka_queue_t
+
+ // Termination of background go-routines
+ terminatedChan chan string // string is go-routine name
+
+ // Topic <-> rkt caches
+ rktCacheLock sync.Mutex
+ // topic name -> rkt cache
+ rktCache map[string]*C.rd_kafka_topic_t
+ // rkt -> topic name cache
+ rktNameCache map[*C.rd_kafka_topic_t]string
+
+ //
+ // cgo map
+ // Maps C callbacks based on cgoid back to its Go object
+ cgoLock sync.Mutex
+ cgoidNext uintptr
+ cgomap map[int]cgoif
+
+ //
+ // producer
+ //
+ p *Producer
+
+ // Forward delivery reports on Producer.Events channel
+ fwdDr bool
+
+ //
+ // consumer
+ //
+ c *Consumer
+
+ // Forward rebalancing ack responsibility to application (current setting)
+ currAppRebalanceEnable bool
+}
+
+func (h *handle) String() string {
+ return C.GoString(C.rd_kafka_name(h.rk))
+}
+
+func (h *handle) setup() {
+ h.rktCache = make(map[string]*C.rd_kafka_topic_t)
+ h.rktNameCache = make(map[*C.rd_kafka_topic_t]string)
+ h.cgomap = make(map[int]cgoif)
+ h.terminatedChan = make(chan string, 10)
+}
+
+func (h *handle) cleanup() {
+ for _, crkt := range h.rktCache {
+ C.rd_kafka_topic_destroy(crkt)
+ }
+
+ if h.rkq != nil {
+ C.rd_kafka_queue_destroy(h.rkq)
+ }
+}
+
+// waitTerminated waits for termination of background go-routines.
+// termCnt is the number of goroutines expected to signal termination completion
+// on h.terminatedChan
+func (h *handle) waitTerminated(termCnt int) {
+ // Wait for termCnt termination-done events from goroutines
+ for ; termCnt > 0; termCnt-- {
+ _ = <-h.terminatedChan
+ }
+}
+
+// getRkt0 finds or creates and returns a C topic_t object from the local cache.
+func (h *handle) getRkt0(topic string, ctopic *C.char, doLock bool) (crkt *C.rd_kafka_topic_t) {
+ if doLock {
+ h.rktCacheLock.Lock()
+ defer h.rktCacheLock.Unlock()
+ }
+ crkt, ok := h.rktCache[topic]
+ if ok {
+ return crkt
+ }
+
+ if ctopic == nil {
+ ctopic = C.CString(topic)
+ defer C.free(unsafe.Pointer(ctopic))
+ }
+
+ crkt = C.rd_kafka_topic_new(h.rk, ctopic, nil)
+ if crkt == nil {
+ panic(fmt.Sprintf("Unable to create new C topic \"%s\": %s",
+ topic, C.GoString(C.rd_kafka_err2str(C.rd_kafka_last_error()))))
+ }
+
+ h.rktCache[topic] = crkt
+ h.rktNameCache[crkt] = topic
+
+ return crkt
+}
+
+// getRkt finds or creates and returns a C topic_t object from the local cache.
+func (h *handle) getRkt(topic string) (crkt *C.rd_kafka_topic_t) {
+ return h.getRkt0(topic, nil, true)
+}
+
+// getTopicNameFromRkt returns the topic name for a C topic_t object, preferably
+// using the local cache to avoid a cgo call.
+func (h *handle) getTopicNameFromRkt(crkt *C.rd_kafka_topic_t) (topic string) {
+ h.rktCacheLock.Lock()
+ defer h.rktCacheLock.Unlock()
+
+ topic, ok := h.rktNameCache[crkt]
+ if ok {
+ return topic
+ }
+
+ // we need our own copy/refcount of the crkt
+ ctopic := C.rd_kafka_topic_name(crkt)
+ topic = C.GoString(ctopic)
+
+	crkt = h.getRkt0(topic, ctopic, false /* don't lock */)
+
+ return topic
+}
+
+// cgoif is a generic interface for holding Go state passed as opaque
+// value to the C code.
+// Since pointers to complex Go types cannot be passed to C we instead create
+// a cgoif object, generate a unique id that is added to the cgomap,
+// and then pass that id to the C code. When the C code callback is called we
+// use the id to look up the cgoif object in the cgomap.
+type cgoif interface{}
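+
+// An illustrative round trip (a sketch; `ch` and `userData` are placeholders,
+// cgoDr is the delivery-report container defined below):
+//
+//   id := h.cgoPut(cgoDr{deliveryChan: ch, opaque: userData})
+//   // ... id travels through C as an opaque value ...
+//   if cg, found := h.cgoGet(id); found {
+//       dr := cg.(cgoDr)
+//       _ = dr.deliveryChan
+//   }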
+
+// delivery report cgoif container
+type cgoDr struct {
+ deliveryChan chan Event
+ opaque interface{}
+}
+
+// cgoPut adds object cg to the handle's cgo map and returns a
+// unique id for the added entry.
+// Thread-safe.
+// FIXME: the uniqueness of the id is questionable over time.
+func (h *handle) cgoPut(cg cgoif) (cgoid int) {
+ h.cgoLock.Lock()
+ defer h.cgoLock.Unlock()
+
+ h.cgoidNext++
+ if h.cgoidNext == 0 {
+ h.cgoidNext++
+ }
+ cgoid = (int)(h.cgoidNext)
+ h.cgomap[cgoid] = cg
+ return cgoid
+}
+
+// cgoGet looks up cgoid in the cgo map, deletes the reference from the map
+// and returns the object, if found. Else returns nil, false.
+// Thread-safe.
+func (h *handle) cgoGet(cgoid int) (cg cgoif, found bool) {
+ if cgoid == 0 {
+ return nil, false
+ }
+
+ h.cgoLock.Lock()
+ defer h.cgoLock.Unlock()
+ cg, found = h.cgomap[cgoid]
+ if found {
+ delete(h.cgomap, cgoid)
+ }
+
+ return cg, found
+}
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/header.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/header.go
new file mode 100644
index 0000000..67d6202
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/header.go
@@ -0,0 +1,67 @@
+package kafka
+
+/**
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import (
+ "fmt"
+ "strconv"
+)
+
+/*
+#include <string.h>
+#include <librdkafka/rdkafka.h>
+#include "glue_rdkafka.h"
+*/
+import "C"
+
+// Header represents a single Kafka message header.
+//
+// Message headers are made up of a list of Header elements, retaining their original insert
+// order and allowing for duplicate Keys.
+//
+// Key is a human-readable string identifying the header.
+// Value is the key's binary value. Kafka does not put any restrictions on the
+// format of the Value, but it should be kept relatively compact.
+// The value may be a byte array, empty, or nil.
+//
+// NOTE: Message headers are not available on producer delivery report messages.
+type Header struct {
+ Key string // Header name (utf-8 string)
+ Value []byte // Header value (nil, empty, or binary)
+}
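+
+// An illustrative Header (a sketch; key and value are placeholders) that
+// String() below renders as `trace-id="abc123"`:
+//
+//   hdr := kafka.Header{Key: "trace-id", Value: []byte("abc123")}
+//   fmt.Println(hdr)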
+
+// String returns the Header Key and data in a human-readable, possibly truncated form
+// suitable for displaying to the user.
+func (h Header) String() string {
+ if h.Value == nil {
+ return fmt.Sprintf("%s=nil", h.Key)
+ }
+
+ valueLen := len(h.Value)
+ if valueLen == 0 {
+ return fmt.Sprintf("%s=<empty>", h.Key)
+ }
+
+ truncSize := valueLen
+ trunc := ""
+ if valueLen > 50+15 {
+ truncSize = 50
+ trunc = fmt.Sprintf("(%d more bytes)", valueLen-truncSize)
+ }
+
+ return fmt.Sprintf("%s=%s%s", h.Key, strconv.Quote(string(h.Value[:truncSize])), trunc)
+}
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/kafka.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/kafka.go
new file mode 100644
index 0000000..4883ee2
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/kafka.go
@@ -0,0 +1,242 @@
+/**
+ * Copyright 2016 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package kafka provides high-level Apache Kafka producers and consumers
+// using bindings on top of the librdkafka C library.
+//
+//
+// High-level Consumer
+//
+// * Decide if you want to read messages and events from the `.Events()` channel
+// (set `"go.events.channel.enable": true`) or by calling `.Poll()`.
+//
+// * Create a Consumer with `kafka.NewConsumer()` providing at
+// least the `bootstrap.servers` and `group.id` configuration properties.
+//
+// * Call `.Subscribe()` (or `.SubscribeTopics()` to subscribe to multiple topics)
+// to join the group with the specified subscription set.
+// Subscriptions are atomic; calling `.Subscribe*()` again will leave
+// the group and rejoin with the new set of topics.
+//
+// * Start reading events and messages from either the `.Events` channel
+// or by calling `.Poll()`.
+//
+// * When the group has rebalanced each client member is assigned a
+// (sub-)set of topic+partitions.
+// By default the consumer will start fetching messages for its assigned
+// partitions at this point, but your application may enable rebalance
+// events to get an insight into what the assigned partitions were
+// as well as set the initial offsets. To do this you need to pass
+// `"go.application.rebalance.enable": true` to the `NewConsumer()` call
+// mentioned above. You will (eventually) see a `kafka.AssignedPartitions` event
+// with the assigned partition set. You can optionally modify the initial
+// offsets (they'll default to stored offsets and if there are no previously stored
+// offsets it will fall back to `"default.topic.config": ConfigMap{"auto.offset.reset": ..}`
+// which defaults to the `latest` message) and then call `.Assign(partitions)`
+// to start consuming. If you don't need to modify the initial offsets you will
+// not need to call `.Assign()`; the client will do so automatically for you if
+// you don't.
+//
+// * As messages are fetched they will be made available on either the
+// `.Events` channel or by calling `.Poll()`, look for event type `*kafka.Message`.
+//
+// * Handle messages, events and errors to your liking.
+//
+// * When you are done consuming call `.Close()` to commit final offsets
+// and leave the consumer group; a minimal sketch of this flow follows below.
+//
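+// A minimal poll-based consumer sketch tying these steps together
+// (illustrative only; the broker address, group id and topic are placeholders):
+//
+//   c, err := kafka.NewConsumer(&kafka.ConfigMap{
+//       "bootstrap.servers": "localhost:9092",
+//       "group.id":          "example-group"})
+//   if err != nil {
+//       panic(err)
+//   }
+//   defer c.Close()
+//
+//   c.SubscribeTopics([]string{"example-topic"}, nil)
+//   for {
+//       switch e := c.Poll(100).(type) {
+//       case *kafka.Message:
+//           fmt.Printf("message on %s: %s\n", e.TopicPartition, string(e.Value))
+//       case kafka.Error:
+//           fmt.Printf("error: %v\n", e)
+//       }
+//   }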
+//
+//
+// Producer
+//
+// * Create a Producer with `kafka.NewProducer()` providing at least
+// the `bootstrap.servers` configuration properties.
+//
+// * Messages may now be produced either by sending a `*kafka.Message`
+// on the `.ProduceChannel` or by calling `.Produce()`.
+//
+// * Producing is an asynchronous operation so the client notifies the application
+// of per-message produce success or failure through something called delivery reports.
+// Delivery reports are by default emitted on the `.Events()` channel as `*kafka.Message`
+// and you should check `msg.TopicPartition.Error` for `nil` to find out if the message
+// was successfully delivered or not.
+// It is also possible to direct delivery reports to alternate channels
+// by providing a non-nil `chan Event` channel to `.Produce()`.
+// If no delivery reports are wanted they can be completely disabled by
+// setting configuration property `"go.delivery.reports": false`.
+//
+// * When you are done producing messages you will need to make sure all messages
+// are indeed delivered to the broker (or have failed); remember that this is
+// an asynchronous client so some of your messages may be lingering in internal
+// channels or transmission queues.
+// To do this you can either keep track of the messages you've produced
+// and wait for their corresponding delivery reports, or call the convenience
+// function `.Flush()` that will block until all message deliveries are done
+// or the provided timeout elapses.
+//
+// * Finally call `.Close()` to decommission the producer; a sketch of this
+// flow follows below.
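+//
+// A minimal producer sketch along these lines (illustrative only; the broker
+// address and topic are placeholders):
+//
+//   p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
+//   if err != nil {
+//       panic(err)
+//   }
+//   defer p.Close()
+//
+//   topic := "example-topic"
+//   p.Produce(&kafka.Message{
+//       TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
+//       Value:          []byte("hello")}, nil)
+//
+//   // Wait up to 15s for outstanding delivery reports before shutting down.
+//   p.Flush(15 * 1000)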
+//
+//
+// Events
+//
+// Apart from emitting messages and delivery reports the client also communicates
+// with the application through a number of different event types.
+// An application may choose to handle or ignore these events.
+//
+// Consumer events
+//
+// * `*kafka.Message` - a fetched message.
+//
+// * `AssignedPartitions` - The assigned partition set for this client following a rebalance.
+// Requires `go.application.rebalance.enable`
+//
+// * `RevokedPartitions` - The counterpart to `AssignedPartitions` following a rebalance.
+// `AssignedPartitions` and `RevokedPartitions` are symmetrical.
+// Requires `go.application.rebalance.enable`
+//
+// * `PartitionEOF` - Consumer has reached the end of a partition.
+// NOTE: The consumer will keep trying to fetch new messages for the partition.
+//
+// * `OffsetsCommitted` - Offset commit results (when `enable.auto.commit` is enabled).
+//
+//
+// Producer events
+//
+// * `*kafka.Message` - delivery report for produced message.
+// Check `.TopicPartition.Error` for delivery result.
+//
+//
+// Generic events for both Consumer and Producer
+//
+// * `KafkaError` - client (error codes are prefixed with _) or broker error.
+// These errors are normally just informational since the
+// client will try its best to automatically recover (eventually).
+//
+//
+// Hint: If your application registers a signal notification
+// (signal.Notify) make sure the signals channel is buffered to avoid
+// possible complications with blocking Poll() calls.
+//
+// Note: The Confluent Kafka Go client is safe for concurrent use.
+package kafka
+
+import (
+ "fmt"
+ "unsafe"
+)
+
+/*
+#include <stdlib.h>
+#include <string.h>
+#include <librdkafka/rdkafka.h>
+
+static rd_kafka_topic_partition_t *_c_rdkafka_topic_partition_list_entry(rd_kafka_topic_partition_list_t *rktparlist, int idx) {
+ return idx < rktparlist->cnt ? &rktparlist->elems[idx] : NULL;
+}
+*/
+import "C"
+
+// PartitionAny represents any partition (for partitioning),
+// or unspecified value (for all other cases)
+const PartitionAny = int32(C.RD_KAFKA_PARTITION_UA)
+
+// TopicPartition is a generic placeholder for a Topic+Partition and optionally Offset.
+type TopicPartition struct {
+ Topic *string
+ Partition int32
+ Offset Offset
+ Error error
+}
+
+func (p TopicPartition) String() string {
+ topic := "<null>"
+ if p.Topic != nil {
+ topic = *p.Topic
+ }
+ if p.Error != nil {
+ return fmt.Sprintf("%s[%d]@%s(%s)",
+ topic, p.Partition, p.Offset, p.Error)
+ }
+ return fmt.Sprintf("%s[%d]@%s",
+ topic, p.Partition, p.Offset)
+}
+
+// TopicPartitions is a slice of TopicPartition that also implements
+// the sort interface (see the usage sketch below)
+type TopicPartitions []TopicPartition
+
+func (tps TopicPartitions) Len() int {
+ return len(tps)
+}
+
+func (tps TopicPartitions) Less(i, j int) bool {
+ if *tps[i].Topic < *tps[j].Topic {
+ return true
+ } else if *tps[i].Topic > *tps[j].Topic {
+ return false
+ }
+ return tps[i].Partition < tps[j].Partition
+}
+
+func (tps TopicPartitions) Swap(i, j int) {
+ tps[i], tps[j] = tps[j], tps[i]
+}
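+
+// A TopicPartitions slice can be ordered with the standard library (a sketch;
+// assumes `import "sort"`):
+//
+//   sort.Sort(kafka.TopicPartitions(partitions)) // by topic, then partition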
+
+// newCPartsFromTopicPartitions creates a new C rd_kafka_topic_partition_list_t
+// from a TopicPartition array.
+func newCPartsFromTopicPartitions(partitions []TopicPartition) (cparts *C.rd_kafka_topic_partition_list_t) {
+ cparts = C.rd_kafka_topic_partition_list_new(C.int(len(partitions)))
+ for _, part := range partitions {
+ ctopic := C.CString(*part.Topic)
+ defer C.free(unsafe.Pointer(ctopic))
+ rktpar := C.rd_kafka_topic_partition_list_add(cparts, ctopic, C.int32_t(part.Partition))
+ rktpar.offset = C.int64_t(part.Offset)
+ }
+
+ return cparts
+}
+
+func setupTopicPartitionFromCrktpar(partition *TopicPartition, crktpar *C.rd_kafka_topic_partition_t) {
+
+ topic := C.GoString(crktpar.topic)
+ partition.Topic = &topic
+ partition.Partition = int32(crktpar.partition)
+ partition.Offset = Offset(crktpar.offset)
+ if crktpar.err != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+ partition.Error = newError(crktpar.err)
+ }
+}
+
+func newTopicPartitionsFromCparts(cparts *C.rd_kafka_topic_partition_list_t) (partitions []TopicPartition) {
+
+ partcnt := int(cparts.cnt)
+
+ partitions = make([]TopicPartition, partcnt)
+ for i := 0; i < partcnt; i++ {
+ crktpar := C._c_rdkafka_topic_partition_list_entry(cparts, C.int(i))
+ setupTopicPartitionFromCrktpar(&partitions[i], crktpar)
+ }
+
+ return partitions
+}
+
+// LibraryVersion returns the underlying librdkafka library version as a
+// (version_int, version_str) tuple.
+func LibraryVersion() (int, string) {
+ ver := (int)(C.rd_kafka_version())
+ verstr := C.GoString(C.rd_kafka_version_str())
+ return ver, verstr
+}
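+
+// An illustrative call (a sketch):
+//
+//   ver, verstr := kafka.LibraryVersion()
+//   fmt.Printf("librdkafka %s (0x%x)\n", verstr, ver)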
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/message.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/message.go
new file mode 100644
index 0000000..3472d1c
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/message.go
@@ -0,0 +1,207 @@
+package kafka
+
+/**
+ * Copyright 2016 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import (
+ "fmt"
+ "time"
+ "unsafe"
+)
+
+/*
+#include <string.h>
+#include <stdlib.h>
+#include <librdkafka/rdkafka.h>
+#include "glue_rdkafka.h"
+
+void setup_rkmessage (rd_kafka_message_t *rkmessage,
+ rd_kafka_topic_t *rkt, int32_t partition,
+ const void *payload, size_t len,
+ void *key, size_t keyLen, void *opaque) {
+ rkmessage->rkt = rkt;
+ rkmessage->partition = partition;
+ rkmessage->payload = (void *)payload;
+ rkmessage->len = len;
+ rkmessage->key = (void *)key;
+ rkmessage->key_len = keyLen;
+ rkmessage->_private = opaque;
+}
+*/
+import "C"
+
+// TimestampType is the Message timestamp type or source
+type TimestampType int
+
+const (
+ // TimestampNotAvailable indicates no timestamp was set, or not available due to lacking broker support
+ TimestampNotAvailable = TimestampType(C.RD_KAFKA_TIMESTAMP_NOT_AVAILABLE)
+ // TimestampCreateTime indicates timestamp set by producer (source time)
+ TimestampCreateTime = TimestampType(C.RD_KAFKA_TIMESTAMP_CREATE_TIME)
+	// TimestampLogAppendTime indicates timestamp set by broker (store time)
+ TimestampLogAppendTime = TimestampType(C.RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME)
+)
+
+func (t TimestampType) String() string {
+ switch t {
+ case TimestampCreateTime:
+ return "CreateTime"
+ case TimestampLogAppendTime:
+ return "LogAppendTime"
+ case TimestampNotAvailable:
+ fallthrough
+ default:
+ return "NotAvailable"
+ }
+}
+
+// Message represents a Kafka message
+type Message struct {
+ TopicPartition TopicPartition
+ Value []byte
+ Key []byte
+ Timestamp time.Time
+ TimestampType TimestampType
+ Opaque interface{}
+ Headers []Header
+}
+
+// String returns a human readable representation of a Message.
+// Key and payload are not represented.
+func (m *Message) String() string {
+ var topic string
+ if m.TopicPartition.Topic != nil {
+ topic = *m.TopicPartition.Topic
+ } else {
+ topic = ""
+ }
+ return fmt.Sprintf("%s[%d]@%s", topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
+}
+
+func (h *handle) getRktFromMessage(msg *Message) (crkt *C.rd_kafka_topic_t) {
+ if msg.TopicPartition.Topic == nil {
+ return nil
+ }
+
+ return h.getRkt(*msg.TopicPartition.Topic)
+}
+
+func (h *handle) newMessageFromFcMsg(fcMsg *C.fetched_c_msg_t) (msg *Message) {
+ msg = &Message{}
+
+ if fcMsg.ts != -1 {
+ ts := int64(fcMsg.ts)
+ msg.TimestampType = TimestampType(fcMsg.tstype)
+ msg.Timestamp = time.Unix(ts/1000, (ts%1000)*1000000)
+ }
+
+ if fcMsg.tmphdrsCnt > 0 {
+ msg.Headers = make([]Header, fcMsg.tmphdrsCnt)
+ for n := range msg.Headers {
+ tmphdr := (*[1 << 30]C.tmphdr_t)(unsafe.Pointer(fcMsg.tmphdrs))[n]
+ msg.Headers[n].Key = C.GoString(tmphdr.key)
+ if tmphdr.val != nil {
+ msg.Headers[n].Value = C.GoBytes(unsafe.Pointer(tmphdr.val), C.int(tmphdr.size))
+ } else {
+ msg.Headers[n].Value = nil
+ }
+ }
+ C.free(unsafe.Pointer(fcMsg.tmphdrs))
+ }
+
+ h.setupMessageFromC(msg, fcMsg.msg)
+
+ return msg
+}
+
+// setupMessageFromC sets up a message object from a C rd_kafka_message_t
+func (h *handle) setupMessageFromC(msg *Message, cmsg *C.rd_kafka_message_t) {
+ if cmsg.rkt != nil {
+ topic := h.getTopicNameFromRkt(cmsg.rkt)
+ msg.TopicPartition.Topic = &topic
+ }
+ msg.TopicPartition.Partition = int32(cmsg.partition)
+ if cmsg.payload != nil {
+ msg.Value = C.GoBytes(unsafe.Pointer(cmsg.payload), C.int(cmsg.len))
+ }
+ if cmsg.key != nil {
+ msg.Key = C.GoBytes(unsafe.Pointer(cmsg.key), C.int(cmsg.key_len))
+ }
+ msg.TopicPartition.Offset = Offset(cmsg.offset)
+ if cmsg.err != 0 {
+ msg.TopicPartition.Error = newError(cmsg.err)
+ }
+}
+
+// newMessageFromC creates a new message object from a C rd_kafka_message_t
+// NOTE: For use with Producer: does not set message timestamp fields.
+func (h *handle) newMessageFromC(cmsg *C.rd_kafka_message_t) (msg *Message) {
+ msg = &Message{}
+
+ h.setupMessageFromC(msg, cmsg)
+
+ return msg
+}
+
+// messageToC sets up cmsg as a clone of msg
+func (h *handle) messageToC(msg *Message, cmsg *C.rd_kafka_message_t) {
+ var valp unsafe.Pointer
+ var keyp unsafe.Pointer
+
+ // to circumvent Cgo constraints we need to allocate C heap memory
+ // for both Value and Key (one allocation back to back)
+ // and copy the bytes from Value and Key to the C memory.
+ // We later tell librdkafka (in produce()) to free the
+ // C memory pointer when it is done.
+ var payload unsafe.Pointer
+
+ valueLen := 0
+ keyLen := 0
+ if msg.Value != nil {
+ valueLen = len(msg.Value)
+ }
+ if msg.Key != nil {
+ keyLen = len(msg.Key)
+ }
+
+ allocLen := valueLen + keyLen
+ if allocLen > 0 {
+ payload = C.malloc(C.size_t(allocLen))
+ if valueLen > 0 {
+ copy((*[1 << 30]byte)(payload)[0:valueLen], msg.Value)
+ valp = payload
+ }
+ if keyLen > 0 {
+ copy((*[1 << 30]byte)(payload)[valueLen:allocLen], msg.Key)
+			keyp = unsafe.Pointer(&((*[1 << 30]byte)(payload)[valueLen]))
+ }
+ }
+
+ cmsg.rkt = h.getRktFromMessage(msg)
+ cmsg.partition = C.int32_t(msg.TopicPartition.Partition)
+ cmsg.payload = valp
+ cmsg.len = C.size_t(valueLen)
+ cmsg.key = keyp
+ cmsg.key_len = C.size_t(keyLen)
+ cmsg._private = nil
+}
+
+// used for testing messageToC performance
+func (h *handle) messageToCDummy(msg *Message) {
+ var cmsg C.rd_kafka_message_t
+ h.messageToC(msg, &cmsg)
+}
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/metadata.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/metadata.go
new file mode 100644
index 0000000..061147d
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/metadata.go
@@ -0,0 +1,158 @@
+/**
+ * Copyright 2016 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka
+
+import (
+ "unsafe"
+)
+
+/*
+#include <stdlib.h>
+#include <librdkafka/rdkafka.h>
+
+struct rd_kafka_metadata_broker *_getMetadata_broker_element(struct rd_kafka_metadata *m, int i) {
+ return &m->brokers[i];
+}
+
+struct rd_kafka_metadata_topic *_getMetadata_topic_element(struct rd_kafka_metadata *m, int i) {
+ return &m->topics[i];
+}
+
+struct rd_kafka_metadata_partition *_getMetadata_partition_element(struct rd_kafka_metadata *m, int topic_idx, int partition_idx) {
+ return &m->topics[topic_idx].partitions[partition_idx];
+}
+
+int32_t _get_int32_element (int32_t *arr, int i) {
+ return arr[i];
+}
+
+*/
+import "C"
+
+// BrokerMetadata contains per-broker metadata
+type BrokerMetadata struct {
+ ID int32
+ Host string
+ Port int
+}
+
+// PartitionMetadata contains per-partition metadata
+type PartitionMetadata struct {
+ ID int32
+ Error Error
+ Leader int32
+ Replicas []int32
+ Isrs []int32
+}
+
+// TopicMetadata contains per-topic metadata
+type TopicMetadata struct {
+ Topic string
+ Partitions []PartitionMetadata
+ Error Error
+}
+
+// Metadata contains broker and topic metadata for all (matching) topics
+type Metadata struct {
+ Brokers []BrokerMetadata
+ Topics map[string]TopicMetadata
+
+ OriginatingBroker BrokerMetadata
+}
+
+// getMetadata queries broker for cluster and topic metadata.
+// If topic is non-nil, only information about that topic is returned; else if
+// allTopics is false, only information about locally used topics is returned;
+// else information about all topics is returned.
+func getMetadata(H Handle, topic *string, allTopics bool, timeoutMs int) (*Metadata, error) {
+ h := H.gethandle()
+
+ var rkt *C.rd_kafka_topic_t
+ if topic != nil {
+ rkt = h.getRkt(*topic)
+ }
+
+ var cMd *C.struct_rd_kafka_metadata
+ cErr := C.rd_kafka_metadata(h.rk, bool2cint(allTopics),
+ rkt, &cMd, C.int(timeoutMs))
+ if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+ return nil, newError(cErr)
+ }
+
+ m := Metadata{}
+ defer C.rd_kafka_metadata_destroy(cMd)
+
+ m.Brokers = make([]BrokerMetadata, cMd.broker_cnt)
+ for i := 0; i < int(cMd.broker_cnt); i++ {
+ b := C._getMetadata_broker_element(cMd, C.int(i))
+ m.Brokers[i] = BrokerMetadata{int32(b.id), C.GoString(b.host),
+ int(b.port)}
+ }
+
+ m.Topics = make(map[string]TopicMetadata, int(cMd.topic_cnt))
+ for i := 0; i < int(cMd.topic_cnt); i++ {
+ t := C._getMetadata_topic_element(cMd, C.int(i))
+
+ thisTopic := C.GoString(t.topic)
+ m.Topics[thisTopic] = TopicMetadata{Topic: thisTopic,
+ Error: newError(t.err),
+ Partitions: make([]PartitionMetadata, int(t.partition_cnt))}
+
+ for j := 0; j < int(t.partition_cnt); j++ {
+ p := C._getMetadata_partition_element(cMd, C.int(i), C.int(j))
+ m.Topics[thisTopic].Partitions[j] = PartitionMetadata{
+ ID: int32(p.id),
+ Error: newError(p.err),
+ Leader: int32(p.leader)}
+ m.Topics[thisTopic].Partitions[j].Replicas = make([]int32, int(p.replica_cnt))
+ for ir := 0; ir < int(p.replica_cnt); ir++ {
+ m.Topics[thisTopic].Partitions[j].Replicas[ir] = int32(C._get_int32_element(p.replicas, C.int(ir)))
+ }
+
+ m.Topics[thisTopic].Partitions[j].Isrs = make([]int32, int(p.isr_cnt))
+ for ii := 0; ii < int(p.isr_cnt); ii++ {
+ m.Topics[thisTopic].Partitions[j].Isrs[ii] = int32(C._get_int32_element(p.isrs, C.int(ii)))
+ }
+ }
+ }
+
+ m.OriginatingBroker = BrokerMetadata{int32(cMd.orig_broker_id),
+ C.GoString(cMd.orig_broker_name), 0}
+
+ return &m, nil
+}
+
+// queryWatermarkOffsets returns the broker's low and high offsets for the given topic
+// and partition.
+func queryWatermarkOffsets(H Handle, topic string, partition int32, timeoutMs int) (low, high int64, err error) {
+ h := H.gethandle()
+
+ ctopic := C.CString(topic)
+ defer C.free(unsafe.Pointer(ctopic))
+
+ var cLow, cHigh C.int64_t
+
+ e := C.rd_kafka_query_watermark_offsets(h.rk, ctopic, C.int32_t(partition),
+ &cLow, &cHigh, C.int(timeoutMs))
+ if e != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+ return 0, 0, newError(e)
+ }
+
+ low = int64(cLow)
+ high = int64(cHigh)
+ return low, high, nil
+}
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/misc.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/misc.go
new file mode 100644
index 0000000..6d602ce
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/misc.go
@@ -0,0 +1,35 @@
+/**
+ * Copyright 2016 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka
+
+import "C"
+
+// bool2cint converts a bool to a C.int (1 or 0)
+func bool2cint(b bool) C.int {
+ if b {
+ return 1
+ }
+ return 0
+}
+
+// cint2bool converts a C.int to a bool
+func cint2bool(v C.int) bool {
+ if v == 0 {
+ return false
+ }
+ return true
+}
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/offset.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/offset.go
new file mode 100644
index 0000000..5dd7fd2
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/offset.go
@@ -0,0 +1,144 @@
+/**
+ * Copyright 2017 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka
+
+import (
+ "fmt"
+ "strconv"
+)
+
+/*
+#include <stdlib.h>
+#include <librdkafka/rdkafka.h>
+
+static int64_t _c_rdkafka_offset_tail(int64_t rel) {
+ return RD_KAFKA_OFFSET_TAIL(rel);
+}
+*/
+import "C"
+
+// Offset type (int64) with support for canonical names
+type Offset int64
+
+// OffsetBeginning represents the earliest offset (logical)
+const OffsetBeginning = Offset(C.RD_KAFKA_OFFSET_BEGINNING)
+
+// OffsetEnd represents the latest offset (logical)
+const OffsetEnd = Offset(C.RD_KAFKA_OFFSET_END)
+
+// OffsetInvalid represents an invalid/unspecified offset
+const OffsetInvalid = Offset(C.RD_KAFKA_OFFSET_INVALID)
+
+// OffsetStored represents a stored offset
+const OffsetStored = Offset(C.RD_KAFKA_OFFSET_STORED)
+
+func (o Offset) String() string {
+ switch o {
+ case OffsetBeginning:
+ return "beginning"
+ case OffsetEnd:
+ return "end"
+ case OffsetInvalid:
+ return "unset"
+ case OffsetStored:
+ return "stored"
+ default:
+ return fmt.Sprintf("%d", int64(o))
+ }
+}
+
+// Set sets the offset value; see NewOffset() for accepted values
+func (o *Offset) Set(offset interface{}) error {
+ n, err := NewOffset(offset)
+
+ if err == nil {
+ *o = n
+ }
+
+ return err
+}
+
+// NewOffset creates a new Offset using the provided logical string, or an
+// absolute int64 offset value.
+// Logical offsets: "beginning", "earliest", "end", "latest", "unset", "invalid", "stored"
+func NewOffset(offset interface{}) (Offset, error) {
+
+ switch v := offset.(type) {
+ case string:
+ switch v {
+ case "beginning":
+ fallthrough
+ case "earliest":
+ return Offset(OffsetBeginning), nil
+
+ case "end":
+ fallthrough
+ case "latest":
+ return Offset(OffsetEnd), nil
+
+ case "unset":
+ fallthrough
+ case "invalid":
+ return Offset(OffsetInvalid), nil
+
+ case "stored":
+ return Offset(OffsetStored), nil
+
+ default:
+ off, err := strconv.Atoi(v)
+ return Offset(off), err
+ }
+
+ case int:
+ return Offset((int64)(v)), nil
+ case int64:
+ return Offset(v), nil
+ default:
+ return OffsetInvalid, newErrorFromString(ErrInvalidArg,
+			fmt.Sprintf("Invalid offset type: %T", v))
+ }
+}
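+
+// Illustrative NewOffset calls (a sketch):
+//
+//   off, _ := kafka.NewOffset("earliest") // OffsetBeginning
+//   off, _ = kafka.NewOffset(int64(1234)) // absolute offset 1234
+//   _ = off.Set("latest")                 // OffsetEnd, via Set()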
+
+// OffsetTail returns the logical offset that is relativeOffset from the current end of the partition
+func OffsetTail(relativeOffset Offset) Offset {
+ return Offset(C._c_rdkafka_offset_tail(C.int64_t(relativeOffset)))
+}
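+
+// For example (a sketch; `topic` is a placeholder string variable), to start
+// at the 10th-from-last message of a partition:
+//
+//   tp := kafka.TopicPartition{Topic: &topic, Partition: 0, Offset: kafka.OffsetTail(10)}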
+
+// offsetsForTimes looks up offsets by timestamp for the given partitions.
+//
+// The returned offset for each partition is the earliest offset whose
+// timestamp is greater than or equal to the given timestamp in the
+// corresponding partition.
+//
+// The timestamps to query are represented as `.Offset` in the `times`
+// argument and the looked up offsets are represented as `.Offset` in the returned
+// `offsets` list.
+//
+// The function will block for at most timeoutMs milliseconds.
+//
+// Duplicate Topic+Partitions are not supported.
+// Per-partition errors may be returned in the `.Error` field.
+func offsetsForTimes(H Handle, times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error) {
+ cparts := newCPartsFromTopicPartitions(times)
+ defer C.rd_kafka_topic_partition_list_destroy(cparts)
+ cerr := C.rd_kafka_offsets_for_times(H.gethandle().rk, cparts, C.int(timeoutMs))
+ if cerr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+ return nil, newError(cerr)
+ }
+
+ return newTopicPartitionsFromCparts(cparts), nil
+}
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/producer.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/producer.go
new file mode 100644
index 0000000..7eac912
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/producer.go
@@ -0,0 +1,583 @@
+/**
+ * Copyright 2016 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka
+
+import (
+ "fmt"
+ "math"
+ "time"
+ "unsafe"
+)
+
+/*
+#include <stdlib.h>
+#include <librdkafka/rdkafka.h>
+#include "glue_rdkafka.h"
+
+
+#ifdef RD_KAFKA_V_HEADERS
+// Convert tmphdrs to chdrs (created by this function).
+// If tmphdr.size == -1: value is considered Null
+// tmphdr.size == 0: value is considered empty (ignored)
+// tmphdr.size > 0: value is considered non-empty
+//
+// WARNING: The header keys and values will be freed by this function.
+void tmphdrs_to_chdrs (tmphdr_t *tmphdrs, size_t tmphdrsCnt,
+ rd_kafka_headers_t **chdrs) {
+ size_t i;
+
+ *chdrs = rd_kafka_headers_new(tmphdrsCnt);
+
+ for (i = 0 ; i < tmphdrsCnt ; i++) {
+ rd_kafka_header_add(*chdrs,
+ tmphdrs[i].key, -1,
+ tmphdrs[i].size == -1 ? NULL :
+ (tmphdrs[i].size == 0 ? "" : tmphdrs[i].val),
+ tmphdrs[i].size == -1 ? 0 : tmphdrs[i].size);
+ if (tmphdrs[i].size > 0)
+ free((void *)tmphdrs[i].val);
+ free((void *)tmphdrs[i].key);
+ }
+}
+
+#else
+void free_tmphdrs (tmphdr_t *tmphdrs, size_t tmphdrsCnt) {
+ size_t i;
+ for (i = 0 ; i < tmphdrsCnt ; i++) {
+ if (tmphdrs[i].size > 0)
+ free((void *)tmphdrs[i].val);
+ free((void *)tmphdrs[i].key);
+ }
+}
+#endif
+
+
+rd_kafka_resp_err_t do_produce (rd_kafka_t *rk,
+ rd_kafka_topic_t *rkt, int32_t partition,
+ int msgflags,
+ int valIsNull, void *val, size_t val_len,
+ int keyIsNull, void *key, size_t key_len,
+ int64_t timestamp,
+ tmphdr_t *tmphdrs, size_t tmphdrsCnt,
+ uintptr_t cgoid) {
+ void *valp = valIsNull ? NULL : val;
+ void *keyp = keyIsNull ? NULL : key;
+#ifdef RD_KAFKA_V_TIMESTAMP
+rd_kafka_resp_err_t err;
+#ifdef RD_KAFKA_V_HEADERS
+ rd_kafka_headers_t *hdrs = NULL;
+#endif
+#endif
+
+
+ if (tmphdrsCnt > 0) {
+#ifdef RD_KAFKA_V_HEADERS
+ tmphdrs_to_chdrs(tmphdrs, tmphdrsCnt, &hdrs);
+#else
+ free_tmphdrs(tmphdrs, tmphdrsCnt);
+ return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;
+#endif
+ }
+
+
+#ifdef RD_KAFKA_V_TIMESTAMP
+ err = rd_kafka_producev(rk,
+ RD_KAFKA_V_RKT(rkt),
+ RD_KAFKA_V_PARTITION(partition),
+ RD_KAFKA_V_MSGFLAGS(msgflags),
+ RD_KAFKA_V_VALUE(valp, val_len),
+ RD_KAFKA_V_KEY(keyp, key_len),
+ RD_KAFKA_V_TIMESTAMP(timestamp),
+#ifdef RD_KAFKA_V_HEADERS
+ RD_KAFKA_V_HEADERS(hdrs),
+#endif
+ RD_KAFKA_V_OPAQUE((void *)cgoid),
+ RD_KAFKA_V_END);
+#ifdef RD_KAFKA_V_HEADERS
+ if (err && hdrs)
+ rd_kafka_headers_destroy(hdrs);
+#endif
+ return err;
+#else
+ if (timestamp)
+ return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;
+ if (rd_kafka_produce(rkt, partition, msgflags,
+ valp, val_len,
+ keyp, key_len,
+ (void *)cgoid) == -1)
+ return rd_kafka_last_error();
+ else
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+#endif
+}
+*/
+import "C"
+
+// Producer implements a High-level Apache Kafka Producer instance
+type Producer struct {
+ events chan Event
+ produceChannel chan *Message
+ handle handle
+
+ // Terminates the poller() goroutine
+ pollerTermChan chan bool
+}
+
+// String returns a human readable name for a Producer instance
+func (p *Producer) String() string {
+ return p.handle.String()
+}
+
+// gethandle implements the Handle interface
+func (p *Producer) gethandle() *handle {
+ return &p.handle
+}
+
+func (p *Producer) produce(msg *Message, msgFlags int, deliveryChan chan Event) error {
+ if msg == nil || msg.TopicPartition.Topic == nil || len(*msg.TopicPartition.Topic) == 0 {
+ return newErrorFromString(ErrInvalidArg, "")
+ }
+
+ crkt := p.handle.getRkt(*msg.TopicPartition.Topic)
+
+ // Three problems:
+ // 1) There's a difference between an empty Value or Key (length 0, proper pointer) and
+ // a null Value or Key (length 0, null pointer).
+ // 2) we need to be able to send a null Value or Key, but the unsafe.Pointer(&slice[0])
+ // dereference can't be performed on a nil slice.
+ // 3) cgo's pointer checking requires the unsafe.Pointer(slice..) call to be made
+ // in the call to the C function.
+ //
+ // Solution:
+ // Keep track of whether the Value or Key were nil (1), but let the valp and keyp pointers
+ // point to a 1-byte slice (but the length to send is still 0) so that the dereference (2)
+ // works.
+ // Then perform the unsafe.Pointer() on the valp and keyp pointers (which now either point
+ // to the original msg.Value and msg.Key or to the 1-byte slices) in the call to C (3).
+ //
+ var valp []byte
+ var keyp []byte
+ oneByte := []byte{0}
+ var valIsNull C.int
+ var keyIsNull C.int
+ var valLen int
+ var keyLen int
+
+ if msg.Value == nil {
+ valIsNull = 1
+ valLen = 0
+ valp = oneByte
+ } else {
+ valLen = len(msg.Value)
+ if valLen > 0 {
+ valp = msg.Value
+ } else {
+ valp = oneByte
+ }
+ }
+
+ if msg.Key == nil {
+ keyIsNull = 1
+ keyLen = 0
+ keyp = oneByte
+ } else {
+ keyLen = len(msg.Key)
+ if keyLen > 0 {
+ keyp = msg.Key
+ } else {
+ keyp = oneByte
+ }
+ }
+
+ var cgoid int
+
+ // Per-message state that needs to be retained through the C code:
+ // delivery channel (if specified)
+ // message opaque (if specified)
+ // Since these can't be passed as opaque pointers to the C code,
+ // due to cgo constraints, we add them to a per-producer map for lookup
+ // when the C code triggers the callbacks or events.
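+ // (The cgoGet() call in the error path below removes the entry again,
+ // so a failed produce does not leak map entries.)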
+ if deliveryChan != nil || msg.Opaque != nil {
+ cgoid = p.handle.cgoPut(cgoDr{deliveryChan: deliveryChan, opaque: msg.Opaque})
+ }
+
+ var timestamp int64
+ if !msg.Timestamp.IsZero() {
+ timestamp = msg.Timestamp.UnixNano() / 1000000
+ }
+
+ // Convert headers to C-friendly tmphdrs
+ var tmphdrs []C.tmphdr_t
+ tmphdrsCnt := len(msg.Headers)
+
+ if tmphdrsCnt > 0 {
+ tmphdrs = make([]C.tmphdr_t, tmphdrsCnt)
+
+ for n, hdr := range msg.Headers {
+ // Make a copy of the key
+ // to avoid runtime panic with
+ // foreign Go pointers in cgo.
+ tmphdrs[n].key = C.CString(hdr.Key)
+ if hdr.Value != nil {
+ tmphdrs[n].size = C.ssize_t(len(hdr.Value))
+ if tmphdrs[n].size > 0 {
+ // Make a copy of the value
+ // to avoid runtime panic with
+ // foreign Go pointers in cgo.
+ tmphdrs[n].val = C.CBytes(hdr.Value)
+ }
+ } else {
+ // null value
+ tmphdrs[n].size = C.ssize_t(-1)
+ }
+ }
+ } else {
+ // no headers, need a dummy tmphdrs of size 1 to avoid index
+ // out of bounds panic in do_produce() call below.
+ // tmphdrsCnt will be 0.
+ tmphdrs = []C.tmphdr_t{{nil, nil, 0}}
+ }
+
+ cErr := C.do_produce(p.handle.rk, crkt,
+ C.int32_t(msg.TopicPartition.Partition),
+ C.int(msgFlags)|C.RD_KAFKA_MSG_F_COPY,
+ valIsNull, unsafe.Pointer(&valp[0]), C.size_t(valLen),
+ keyIsNull, unsafe.Pointer(&keyp[0]), C.size_t(keyLen),
+ C.int64_t(timestamp),
+ (*C.tmphdr_t)(unsafe.Pointer(&tmphdrs[0])), C.size_t(tmphdrsCnt),
+ (C.uintptr_t)(cgoid))
+ if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+ if cgoid != 0 {
+ p.handle.cgoGet(cgoid)
+ }
+ return newError(cErr)
+ }
+
+ return nil
+}
+
+// Produce produces a single message.
+// This is an asynchronous call that enqueues the message on the internal
+// transmit queue, thus returning immediately.
+// The delivery report will be sent on the provided deliveryChan if specified,
+// or on the Producer object's Events() channel if not.
+// msg.Timestamp requires librdkafka >= 0.9.4 (else returns ErrNotImplemented),
+// api.version.request=true, and broker >= 0.10.0.0.
+// msg.Headers requires librdkafka >= 0.11.4 (else returns ErrNotImplemented),
+// api.version.request=true, and broker >= 0.11.0.0.
+// Returns an error if the message could not be enqueued.
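+//
+// A minimal usage sketch (the topic name and payload are illustrative):
+//
+//     topic := "myTopic"
+//     err := p.Produce(&Message{
+//             TopicPartition: TopicPartition{Topic: &topic, Partition: PartitionAny},
+//             Value:          []byte("hello"),
+//     }, nil)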
+func (p *Producer) Produce(msg *Message, deliveryChan chan Event) error {
+ return p.produce(msg, 0, deliveryChan)
+}
+
+// produceBatch produces a batch of messages.
+// These batches do not relate to the message batches sent to the broker; those
+// are collected on the fly internally in librdkafka.
+// WARNING: This is an experimental API.
+// NOTE: timestamps and headers are not supported with this API.
+func (p *Producer) produceBatch(topic string, msgs []*Message, msgFlags int) error {
+ crkt := p.handle.getRkt(topic)
+
+ cmsgs := make([]C.rd_kafka_message_t, len(msgs))
+ for i, m := range msgs {
+ p.handle.messageToC(m, &cmsgs[i])
+ }
+ r := C.rd_kafka_produce_batch(crkt, C.RD_KAFKA_PARTITION_UA, C.int(msgFlags)|C.RD_KAFKA_MSG_F_FREE,
+ (*C.rd_kafka_message_t)(&cmsgs[0]), C.int(len(msgs)))
+ if r == -1 {
+ return newError(C.rd_kafka_last_error())
+ }
+
+ return nil
+}
+
+// Events returns the Events channel (read)
+func (p *Producer) Events() chan Event {
+ return p.events
+}
+
+// ProduceChannel returns the produce *Message channel (write)
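+//
+// Writing a *Message to this channel is an alternative to calling Produce();
+// delivery reports are emitted on the Events() channel. A sketch (topic and
+// payload are illustrative):
+//
+//     p.ProduceChannel() <- &Message{
+//             TopicPartition: TopicPartition{Topic: &topic, Partition: PartitionAny},
+//             Value:          []byte("hello"),
+//     }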
+func (p *Producer) ProduceChannel() chan *Message {
+ return p.produceChannel
+}
+
+// Len returns the number of messages and requests waiting to be transmitted to the broker
+// as well as delivery reports queued for the application.
+// Includes messages on ProduceChannel.
+func (p *Producer) Len() int {
+ return len(p.produceChannel) + len(p.events) + int(C.rd_kafka_outq_len(p.handle.rk))
+}
+
+// Flush and wait for outstanding messages and requests to complete delivery.
+// Includes messages on ProduceChannel.
+// Runs until the value reaches zero or timeoutMs elapses.
+// Returns the number of outstanding events still un-flushed.
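+//
+// A typical shutdown sequence (the 15s timeout is illustrative):
+//
+//     remaining := p.Flush(15 * 1000)
+//     // remaining > 0 means some messages were not delivered in time
+//     p.Close()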
+func (p *Producer) Flush(timeoutMs int) int {
+ termChan := make(chan bool) // unused stand-in termChan
+
+ d := time.Duration(timeoutMs) * time.Millisecond
+ tEnd := time.Now().Add(d)
+ for p.Len() > 0 {
+ remain := tEnd.Sub(time.Now()).Seconds()
+ if remain <= 0.0 {
+ return p.Len()
+ }
+
+ p.handle.eventPoll(p.events,
+ int(math.Min(100, remain*1000)), 1000, termChan)
+ }
+
+ return 0
+}
+
+// Close a Producer instance.
+// The Producer object or its channels are no longer usable after this call.
+func (p *Producer) Close() {
+ // Wait for poller() (signaled by closing pollerTermChan)
+ // and channel_producer() (signaled by closing ProduceChannel)
+ close(p.pollerTermChan)
+ close(p.produceChannel)
+ p.handle.waitTerminated(2)
+
+ close(p.events)
+
+ p.handle.cleanup()
+
+ C.rd_kafka_destroy(p.handle.rk)
+}
+
+// NewProducer creates a new high-level Producer instance.
+//
+// conf is a *ConfigMap with standard librdkafka configuration properties
+// (see librdkafka's CONFIGURATION.md for the full list).
+//
+// Supported special configuration properties:
+// go.batch.producer (bool, false) - EXPERIMENTAL: Enable batch producer (for increased performance).
+// These batches do not relate to Kafka message batches in any way.
+// Note: timestamps and headers are not supported with this interface.
+// go.delivery.reports (bool, true) - Forward per-message delivery reports to the
+// Events() channel.
+// go.events.channel.size (int, 1000000) - Events() channel size
+// go.produce.channel.size (int, 1000000) - ProduceChannel() buffer size (in number of messages)
+//
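+// A minimal creation sketch ("mybroker" is a placeholder broker address):
+//
+//     p, err := NewProducer(&ConfigMap{"bootstrap.servers": "mybroker"})
+//     if err != nil {
+//             // handle configuration or creation error
+//     }
+//     defer p.Close()
+//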
+func NewProducer(conf *ConfigMap) (*Producer, error) {
+ err := versionCheck()
+ if err != nil {
+ return nil, err
+ }
+
+ p := &Producer{}
+
+ // before we do anything with the configuration, create a copy such that
+ // the original is not mutated.
+ confCopy := conf.clone()
+
+ v, err := confCopy.extract("go.batch.producer", false)
+ if err != nil {
+ return nil, err
+ }
+ batchProducer := v.(bool)
+
+ v, err = confCopy.extract("go.delivery.reports", true)
+ if err != nil {
+ return nil, err
+ }
+ p.handle.fwdDr = v.(bool)
+
+ v, err = confCopy.extract("go.events.channel.size", 1000000)
+ if err != nil {
+ return nil, err
+ }
+ eventsChanSize := v.(int)
+
+ v, err = confCopy.extract("go.produce.channel.size", 1000000)
+ if err != nil {
+ return nil, err
+ }
+ produceChannelSize := v.(int)
+
+ if int(C.rd_kafka_version()) < 0x01000000 {
+ // produce.offset.report is no longer used in librdkafka >= v1.0.0
+ v, _ = confCopy.extract("{topic}.produce.offset.report", nil)
+ if v == nil {
+ // Enable offset reporting by default, unless overridden.
+ confCopy.SetKey("{topic}.produce.offset.report", true)
+ }
+ }
+
+ // Convert ConfigMap to librdkafka conf_t
+ cConf, err := confCopy.convert()
+ if err != nil {
+ return nil, err
+ }
+
+ cErrstr := (*C.char)(C.malloc(C.size_t(256)))
+ defer C.free(unsafe.Pointer(cErrstr))
+
+ C.rd_kafka_conf_set_events(cConf, C.RD_KAFKA_EVENT_DR|C.RD_KAFKA_EVENT_STATS|C.RD_KAFKA_EVENT_ERROR)
+
+ // Create librdkafka producer instance
+ p.handle.rk = C.rd_kafka_new(C.RD_KAFKA_PRODUCER, cConf, cErrstr, 256)
+ if p.handle.rk == nil {
+ return nil, newErrorFromCString(C.RD_KAFKA_RESP_ERR__INVALID_ARG, cErrstr)
+ }
+
+ p.handle.p = p
+ p.handle.setup()
+ p.handle.rkq = C.rd_kafka_queue_get_main(p.handle.rk)
+ p.events = make(chan Event, eventsChanSize)
+ p.produceChannel = make(chan *Message, produceChannelSize)
+ p.pollerTermChan = make(chan bool)
+
+ go poller(p, p.pollerTermChan)
+
+ // Start either the batch or the non-batch channel producer; only one may run.
+ if batchProducer {
+ go channelBatchProducer(p)
+ } else {
+ go channelProducer(p)
+ }
+
+ return p, nil
+}
+
+// channelProducer serves the ProduceChannel channel
+func channelProducer(p *Producer) {
+ for m := range p.produceChannel {
+ err := p.produce(m, C.RD_KAFKA_MSG_F_BLOCK, nil)
+ if err != nil {
+ m.TopicPartition.Error = err
+ p.events <- m
+ }
+ }
+
+ p.handle.terminatedChan <- "channelProducer"
+}
+
+// channelBatchProducer serves the ProduceChannel channel and attempts to
+// improve cgo performance by using the produceBatch() interface.
+func channelBatchProducer(p *Producer) {
+ var buffered = make(map[string][]*Message)
+ bufferedCnt := 0
+ const batchSize int = 1000000
+ totMsgCnt := 0
+ totBatchCnt := 0
+
+ for m := range p.produceChannel {
+ buffered[*m.TopicPartition.Topic] = append(buffered[*m.TopicPartition.Topic], m)
+ bufferedCnt++
+
+ loop2:
+ for {
+ select {
+ case m, ok := <-p.produceChannel:
+ if !ok {
+ break loop2
+ }
+ if m == nil {
+ panic("nil message received on ProduceChannel")
+ }
+ if m.TopicPartition.Topic == nil {
+ panic(fmt.Sprintf("message without Topic received on ProduceChannel: %v", m))
+ }
+ buffered[*m.TopicPartition.Topic] = append(buffered[*m.TopicPartition.Topic], m)
+ bufferedCnt++
+ if bufferedCnt >= batchSize {
+ break loop2
+ }
+ default:
+ break loop2
+ }
+ }
+
+ totBatchCnt++
+ totMsgCnt += bufferedCnt
+
+ for topic, buffered2 := range buffered {
+ err := p.produceBatch(topic, buffered2, C.RD_KAFKA_MSG_F_BLOCK)
+ if err != nil {
+ for _, m = range buffered2 {
+ m.TopicPartition.Error = err
+ p.events <- m
+ }
+ }
+ }
+
+ buffered = make(map[string][]*Message)
+ bufferedCnt = 0
+ }
+ p.handle.terminatedChan <- "channelBatchProducer"
+}
+
+// poller polls the rd_kafka_t handle for events until signalled for termination
+func poller(p *Producer, termChan chan bool) {
+out:
+ for {
+ select {
+ case <-termChan:
+ break out
+
+ default:
+ _, term := p.handle.eventPoll(p.events, 100, 1000, termChan)
+ if term {
+ break out
+ }
+ }
+ }
+
+ p.handle.terminatedChan <- "poller"
+}
+
+// GetMetadata queries broker for cluster and topic metadata.
+// If topic is non-nil only information about that topic is returned, else if
+// allTopics is false only information about locally used topics is returned,
+// else information about all topics is returned.
+// GetMetadata is equivalent to listTopics, describeTopics and describeCluster in the Java API.
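+//
+// A usage sketch (the 5s timeout is illustrative; a nil topic with
+// allTopics=true requests metadata for all topics in the cluster):
+//
+//     md, err := p.GetMetadata(nil, true, 5000)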
+func (p *Producer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error) {
+ return getMetadata(p, topic, allTopics, timeoutMs)
+}
+
+// QueryWatermarkOffsets returns the broker's low and high offsets for the given topic
+// and partition.
+func (p *Producer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error) {
+ return queryWatermarkOffsets(p, topic, partition, timeoutMs)
+}
+
+// OffsetsForTimes looks up offsets by timestamp for the given partitions.
+//
+// The returned offset for each partition is the earliest offset whose
+// timestamp is greater than or equal to the given timestamp in the
+// corresponding partition.
+//
+// The timestamps to query are represented as `.Offset` in the `times`
+// argument and the looked up offsets are represented as `.Offset` in the returned
+// `offsets` list.
+//
+// The function will block for at most timeoutMs milliseconds.
+//
+// Duplicate Topic+Partitions are not supported.
+// Per-partition errors may be returned in the `.Error` field.
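+//
+// A usage sketch (topic, partition and timestamp are illustrative; the
+// millisecond timestamp to look up is passed in the Offset field):
+//
+//     times := []TopicPartition{{Topic: &topic, Partition: 0, Offset: Offset(1514764800000)}}
+//     offsets, err := p.OffsetsForTimes(times, 5000)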
+func (p *Producer) OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error) {
+ return offsetsForTimes(p, times, timeoutMs)
+}
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/testconf-example.json b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/testconf-example.json
new file mode 100644
index 0000000..7024a9c
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/testconf-example.json
@@ -0,0 +1,8 @@
+{
+ "Brokers": "mybroker or $BROKERS env",
+ "Topic": "test",
+ "GroupID": "testgroup",
+ "PerfMsgCount": 1000000,
+ "PerfMsgSize": 100,
+ "Config": ["api.version.request=true"]
+}
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/testhelpers.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/testhelpers.go
new file mode 100644
index 0000000..22e8e1a
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/testhelpers.go
@@ -0,0 +1,179 @@
+package kafka
+
+/**
+ * Copyright 2016 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import (
+ "encoding/json"
+ "fmt"
+ "os"
+ "time"
+)
+
+/*
+#include <librdkafka/rdkafka.h>
+*/
+import "C"
+
+var testconf struct {
+ Brokers string
+ Topic string
+ GroupID string
+ PerfMsgCount int
+ PerfMsgSize int
+ Config []string
+ conf ConfigMap
+}
+
+// testconfRead reads the test suite config file testconf.json, which must
+// contain at least the Brokers and Topic string properties.
+// It returns true if the testconf was found and usable, false if the file
+// does not exist, and panics if the file format is wrong.
+func testconfRead() bool {
+ cf, err := os.Open("testconf.json")
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "%% testconf.json not found - ignoring test\n")
+ return false
+ }
+
+ // Default values
+ testconf.PerfMsgCount = 2000000
+ testconf.PerfMsgSize = 100
+ testconf.GroupID = "testgroup"
+
+ jp := json.NewDecoder(cf)
+ err = jp.Decode(&testconf)
+ if err != nil {
+ panic(fmt.Sprintf("Failed to parse testconf: %s", err))
+ }
+
+ cf.Close()
+
+ if len(testconf.Brokers) > 0 && testconf.Brokers[0] == '$' {
+ // Read broker list from environment variable
+ testconf.Brokers = os.Getenv(testconf.Brokers[1:])
+ }
+
+ if testconf.Brokers == "" || testconf.Topic == "" {
+ panic("Missing Brokers or Topic in testconf.json")
+ }
+
+ return true
+}
+
+// updateFromTestconf updates an existing ConfigMap with key=value pairs from testconf.Config
+func (cm *ConfigMap) updateFromTestconf() error {
+ if testconf.Config == nil {
+ return nil
+ }
+
+ // Translate "key=value" pairs in Config to ConfigMap
+ for _, s := range testconf.Config {
+ err := cm.Set(s)
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+// getMessageCountInTopic returns the number of messages available in all partitions of a topic.
+// WARNING: This uses watermark offsets so it will be incorrect for compacted topics.
+func getMessageCountInTopic(topic string) (int, error) {
+
+ // Create consumer
+ config := &ConfigMap{"bootstrap.servers": testconf.Brokers,
+ "group.id": testconf.GroupID}
+ if err := config.updateFromTestconf(); err != nil {
+ return 0, err
+ }
+
+ c, err := NewConsumer(config)
+ if err != nil {
+ return 0, err
+ }
+ defer c.Close()
+
+ // Get metadata for the topic to find out the number of partitions.
+ metadata, err := c.GetMetadata(&topic, false, 5*1000)
+ if err != nil {
+ return 0, err
+ }
+
+ t, ok := metadata.Topics[topic]
+ if !ok {
+ return 0, newError(C.RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
+ }
+
+ cnt := 0
+ for _, p := range t.Partitions {
+ low, high, err := c.QueryWatermarkOffsets(topic, p.ID, 5*1000)
+ if err != nil {
+ continue
+ }
+ cnt += int(high - low)
+ }
+
+ return cnt, nil
+}
+
+// getBrokerList returns a list of brokers (ids) in the cluster
+func getBrokerList(H Handle) (brokers []int32, err error) {
+ md, err := getMetadata(H, nil, true, 15*1000)
+ if err != nil {
+ return nil, err
+ }
+
+ brokers = make([]int32, len(md.Brokers))
+ for i, mdBroker := range md.Brokers {
+ brokers[i] = mdBroker.ID
+ }
+
+ return brokers, nil
+}
+
+// waitTopicInMetadata waits for the given topic to show up in metadata
+func waitTopicInMetadata(H Handle, topic string, timeoutMs int) error {
+ d := time.Duration(timeoutMs) * time.Millisecond
+ tEnd := time.Now().Add(d)
+
+ for {
+ remain := tEnd.Sub(time.Now()).Seconds()
+ if remain < 0.0 {
+ return newErrorFromString(ErrTimedOut,
+ fmt.Sprintf("Timed out waiting for topic %s to appear in metadata", topic))
+ }
+
+ md, err := getMetadata(H, nil, true, int(remain*1000))
+ if err != nil {
+ return err
+ }
+
+ for _, t := range md.Topics {
+ if t.Topic != topic {
+ continue
+ }
+ if t.Error.Code() != ErrNoError || len(t.Partitions) < 1 {
+ continue
+ }
+ // Proper topic found in metadata
+ return nil
+ }
+
+ time.Sleep(500 * time.Millisecond)
+ }
+}