[VOL-1386]  This commit adds "dep" as the package management tool
for voltha-go.

Change-Id: I52bc4911dd00a441756ec7c30f46d45091f3f90e
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/LICENSE b/vendor/github.com/confluentinc/confluent-kafka-go/LICENSE
new file mode 100644
index 0000000..e06d208
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/LICENSE
@@ -0,0 +1,202 @@
+Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "{}"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright {yyyy} {name of copyright owner}
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/.gitignore b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/.gitignore
new file mode 100644
index 0000000..b1a2111
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/.gitignore
@@ -0,0 +1,2 @@
+testconf.json
+go_rdkafka_generr/go_rdkafka_generr
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/00version.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/00version.go
new file mode 100644
index 0000000..188dc03
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/00version.go
@@ -0,0 +1,58 @@
+package kafka
+
+/**
+ * Copyright 2016-2018 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import (
+	"fmt"
+)
+
+/*
+#include <librdkafka/rdkafka.h>
+
+//Minimum required librdkafka version. This is checked both during
+//build-time and runtime.
+//Make sure to keep the MIN_RD_KAFKA_VERSION, MIN_VER_ERRSTR and #error
+//defines and strings in sync.
+//
+
+#define MIN_RD_KAFKA_VERSION 0x0000b0500
+
+#ifdef __APPLE__
+#define MIN_VER_ERRSTR "confluent-kafka-go requires librdkafka v0.11.5 or later. Install the latest version of librdkafka from Homebrew by running `brew install librdkafka` or `brew upgrade librdkafka`"
+#else
+#define MIN_VER_ERRSTR "confluent-kafka-go requires librdkafka v0.11.5 or later. Install the latest version of librdkafka from the Confluent repositories, see http://docs.confluent.io/current/installation.html"
+#endif
+
+#if RD_KAFKA_VERSION < MIN_RD_KAFKA_VERSION
+#ifdef __APPLE__
+#error "confluent-kafka-go requires librdkafka v0.11.5 or later. Install the latest version of librdkafka from Homebrew by running `brew install librdkafka` or `brew upgrade librdkafka`"
+#else
+#error "confluent-kafka-go requires librdkafka v0.11.5 or later. Install the latest version of librdkafka from the Confluent repositories, see http://docs.confluent.io/current/installation.html"
+#endif
+#endif
+*/
+import "C"
+
+func versionCheck() error { // compares the version reported by LibraryVersion() against MIN_RD_KAFKA_VERSION
+	ver, verstr := LibraryVersion()
+	if ver < C.MIN_RD_KAFKA_VERSION { // reported version is below the minimum defined in the cgo preamble
+		return newErrorFromString(ErrNotImplemented,
+			fmt.Sprintf("%s: librdkafka version %s (0x%x) detected",
+				C.MIN_VER_ERRSTR, verstr, ver))
+	}
+	return nil
+}
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/README.md b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/README.md
new file mode 100644
index 0000000..6df4d54
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/README.md
@@ -0,0 +1,69 @@
+# Information for confluent-kafka-go developers
+
+Whenever librdkafka error codes are updated, make sure to run `go generate` before building:
+
+```
+  $ (cd go_rdkafka_generr && go install) && go generate
+  $ go build
+```
+
+
+
+
+## Testing
+
+Some of the tests included in this directory, the benchmark and integration tests in particular,
+require an existing Kafka cluster and a testconf.json configuration file to
+provide tests with bootstrap brokers, topic name, etc.
+
+The format of testconf.json is a JSON object:
+```
+{
+  "Brokers": "<bootstrap-brokers>",
+  "Topic": "<test-topic-name>"
+}
+```
+
+See testconf-example.json for an example and full set of available options.
+
+
+To run unit-tests:
+```
+$ go test
+```
+
+To run benchmark tests:
+```
+$ go test -bench .
+```
+
+To generate a code coverage report:
+```
+$ go test -coverprofile=coverage.out -bench=.
+$ go tool cover -func=coverage.out
+```
+
+## Build tags (static linking)
+
+
+Different build types are supported through Go build tags (`-tags ..`),
+these tags should be specified on the **application** build command.
+
+ * `static` - Build with librdkafka linked statically (but librdkafka
+              dependencies linked dynamically).
+ * `static_all` - Build with all libraries linked statically.
+ * neither - Build with librdkafka (and its dependencies) linked dynamically.
+
+
+
+## Generating HTML documentation
+
+To generate one-page HTML documentation run the mk/doc-gen.py script from the
+top-level directory. This script requires the beautifulsoup4 Python package.
+
+```
+$ source .../your/virtualenv/bin/activate
+$ pip install beautifulsoup4
+...
+$ mk/doc-gen.py > kafka.html
+```
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/adminapi.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/adminapi.go
new file mode 100644
index 0000000..c2ba76c
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/adminapi.go
@@ -0,0 +1,942 @@
+/**
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka
+
+import (
+	"context"
+	"fmt"
+	"strings"
+	"time"
+	"unsafe"
+)
+
+/*
+#include <librdkafka/rdkafka.h>
+#include <stdlib.h>
+
+static const rd_kafka_topic_result_t *
+topic_result_by_idx (const rd_kafka_topic_result_t **topics, size_t cnt, size_t idx) {
+    if (idx >= cnt)
+      return NULL;
+    return topics[idx];
+}
+
+static const rd_kafka_ConfigResource_t *
+ConfigResource_by_idx (const rd_kafka_ConfigResource_t **res, size_t cnt, size_t idx) {
+    if (idx >= cnt)
+      return NULL;
+    return res[idx];
+}
+
+static const rd_kafka_ConfigEntry_t *
+ConfigEntry_by_idx (const rd_kafka_ConfigEntry_t **entries, size_t cnt, size_t idx) {
+    if (idx >= cnt)
+      return NULL;
+    return entries[idx];
+}
+*/
+import "C"
+
+// AdminClient wraps a client handle for admin operations; isDerived records whether it was derived from an existing Producer or Consumer.
+type AdminClient struct {
+	handle    *handle
+	isDerived bool // true when derived from an existing client handle
+}
+
+func durationToMilliseconds(t time.Duration) int { // converts a positive duration to whole milliseconds
+	if t > 0 {
+		return (int)(t.Seconds() * 1000.0) // fractional milliseconds are truncated by the int conversion
+	}
+	return (int)(t) // zero/negative values pass through unconverted (presumably sentinel timeouts such as -1; confirm with callers)
+}
+
+// TopicResult provides per-topic operation result (error) information.
+type TopicResult struct {
+	// Topic name
+	Topic string
+	// Error, if any, of result. Check with `Error.Code() != ErrNoError`.
+	Error Error
+}
+
+// String returns the topic name, followed by the error string in parentheses when the error code is non-zero.
+func (t TopicResult) String() string {
+	if t.Error.code == 0 {
+		return t.Topic
+	}
+	return fmt.Sprintf("%s (%s)", t.Topic, t.Error.str)
+}
+
+// TopicSpecification holds parameters for creating a new topic.
+// TopicSpecification is analogous to NewTopic in the Java Topic Admin API.
+type TopicSpecification struct {
+	// Topic name to create.
+	Topic string
+	// Number of partitions in topic.
+	NumPartitions int
+	// Default replication factor for the topic's partitions, or zero
+	// if an explicit ReplicaAssignment is set.
+	ReplicationFactor int
+	// (Optional) Explicit replica assignment. The outer array is
+	// indexed by the partition number, while the inner per-partition array
+	// contains the replica broker ids. The first broker in each
+	// broker id list will be the preferred replica.
+	ReplicaAssignment [][]int32
+	// Topic configuration.
+	Config map[string]string
+}
+
+// PartitionsSpecification holds parameters for creating additional partitions for a topic.
+// PartitionsSpecification is analogous to NewPartitions in the Java Topic Admin API.
+type PartitionsSpecification struct {
+	// Topic to create more partitions for.
+	Topic string
+	// New partition count for topic, must be higher than current partition count.
+	IncreaseTo int
+	// (Optional) Explicit replica assignment. The outer array is
+	// indexed by the new partition index (i.e., 0 for the first added
+	// partition), while the inner per-partition array
+	// contains the replica broker ids. The first broker in each
+	// broker id list will be the preferred replica.
+	ReplicaAssignment [][]int32
+}
+
+// ResourceType represents an Apache Kafka resource type
+type ResourceType int
+
+const (
+	// ResourceUnknown - Unknown
+	ResourceUnknown = ResourceType(C.RD_KAFKA_RESOURCE_UNKNOWN)
+	// ResourceAny - match any resource type (DescribeConfigs)
+	ResourceAny = ResourceType(C.RD_KAFKA_RESOURCE_ANY)
+	// ResourceTopic - Topic
+	ResourceTopic = ResourceType(C.RD_KAFKA_RESOURCE_TOPIC)
+	// ResourceGroup - Group
+	ResourceGroup = ResourceType(C.RD_KAFKA_RESOURCE_GROUP)
+	// ResourceBroker - Broker
+	ResourceBroker = ResourceType(C.RD_KAFKA_RESOURCE_BROKER)
+)
+
+// String returns the human-readable representation of a ResourceType
+func (t ResourceType) String() string {
+	return C.GoString(C.rd_kafka_ResourceType_name(C.rd_kafka_ResourceType_t(t)))
+}
+
+// ResourceTypeFromString translates a resource type name (matched case-insensitively)
+// to a ResourceType; unrecognized names yield ResourceUnknown and an ErrInvalidArg error.
+func ResourceTypeFromString(typeString string) (ResourceType, error) {
+	switch strings.ToUpper(typeString) {
+	case "ANY":
+		return ResourceAny, nil
+	case "TOPIC":
+		return ResourceTopic, nil
+	case "GROUP":
+		return ResourceGroup, nil
+	case "BROKER":
+		return ResourceBroker, nil
+	default:
+		return ResourceUnknown, newGoError(ErrInvalidArg)
+	}
+}
+
+// ConfigSource represents an Apache Kafka config source
+type ConfigSource int
+
+const (
+	// ConfigSourceUnknown is the default value
+	ConfigSourceUnknown = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_UNKNOWN_CONFIG)
+	// ConfigSourceDynamicTopic is dynamic topic config that is configured for a specific topic
+	ConfigSourceDynamicTopic = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG)
+	// ConfigSourceDynamicBroker is dynamic broker config that is configured for a specific broker
+	ConfigSourceDynamicBroker = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DYNAMIC_BROKER_CONFIG)
+	// ConfigSourceDynamicDefaultBroker is dynamic broker config that is configured as default for all brokers in the cluster
+	ConfigSourceDynamicDefaultBroker = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG)
+	// ConfigSourceStaticBroker is static broker config provided as broker properties at startup (e.g. from server.properties file)
+	ConfigSourceStaticBroker = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_STATIC_BROKER_CONFIG)
+	// ConfigSourceDefault is built-in default configuration for configs that have a default value
+	ConfigSourceDefault = ConfigSource(C.RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG)
+)
+
+// String returns the human-readable representation of a ConfigSource type
+func (t ConfigSource) String() string {
+	return C.GoString(C.rd_kafka_ConfigSource_name(C.rd_kafka_ConfigSource_t(t)))
+}
+
+// ConfigResource holds parameters for altering an Apache Kafka configuration resource
+type ConfigResource struct {
+	// Type of resource to set.
+	Type ResourceType
+	// Name of resource to set.
+	Name string
+	// Config entries to set.
+	// Configuration updates are atomic, any configuration property not provided
+	// here will be reverted (by the broker) to its default value.
+	// Use DescribeConfigs to retrieve the list of current configuration entry values.
+	Config []ConfigEntry
+}
+
+// String returns a human-readable representation of a ConfigResource
+func (c ConfigResource) String() string {
+	return fmt.Sprintf("Resource(%s, %s)", c.Type, c.Name)
+}
+
+// AlterOperation specifies the operation to perform on the ConfigEntry.
+// Currently the only defined operation is AlterOperationSet.
+type AlterOperation int
+
+const (
+	// AlterOperationSet sets/overwrites the configuration setting.
+	AlterOperationSet = iota // NOTE(review): declared without a type, so this is an untyped constant (value 0), not an AlterOperation; confirm intended
+)
+
+// String returns the human-readable representation of an AlterOperation
+func (o AlterOperation) String() string {
+	switch o {
+	case AlterOperationSet:
+		return "Set"
+	default:
+		return fmt.Sprintf("Unknown%d?", int(o))
+	}
+}
+
+// ConfigEntry holds parameters for altering a resource's configuration.
+type ConfigEntry struct {
+	// Name of configuration entry, e.g., topic configuration property name.
+	Name string
+	// Value of configuration entry.
+	Value string
+	// Operation to perform on the entry.
+	Operation AlterOperation
+}
+
+// StringMapToConfigEntries creates a new slice of ConfigEntry objects from the
+// provided string map (in map iteration order). The given operation is set on each created entry.
+func StringMapToConfigEntries(stringMap map[string]string, operation AlterOperation) []ConfigEntry {
+	var ceList []ConfigEntry // stays nil when stringMap has no entries
+
+	for k, v := range stringMap {
+		ceList = append(ceList, ConfigEntry{Name: k, Value: v, Operation: operation})
+	}
+
+	return ceList
+}
+
+// String returns a human-readable representation of a ConfigEntry.
+func (c ConfigEntry) String() string {
+	return fmt.Sprintf("%v %s=\"%s\"", c.Operation, c.Name, c.Value)
+}
+
+// ConfigEntryResult contains the result of a single configuration entry from a
+// DescribeConfigs request.
+type ConfigEntryResult struct {
+	// Name of configuration entry, e.g., topic configuration property name.
+	Name string
+	// Value of configuration entry.
+	Value string
+	// Source indicates the configuration source.
+	Source ConfigSource
+	// IsReadOnly indicates whether the configuration entry can be altered.
+	IsReadOnly bool
+	// IsSensitive indicates whether the configuration entry contains sensitive information, in which case the value will be unset.
+	IsSensitive bool
+	// IsSynonym indicates whether the configuration entry is a synonym for another configuration property.
+	IsSynonym bool
+	// Synonyms contains a map of configuration entries that are synonyms to this configuration entry.
+	Synonyms map[string]ConfigEntryResult
+}
+
+// String returns a human-readable representation of a ConfigEntryResult.
+func (c ConfigEntryResult) String() string {
+	return fmt.Sprintf("%s=\"%s\"", c.Name, c.Value)
+}
+
+// configEntryResultFromC builds a ConfigEntryResult from a C ConfigEntry, recursively converting its synonyms.
+func configEntryResultFromC(cEntry *C.rd_kafka_ConfigEntry_t) (entry ConfigEntryResult) {
+	entry.Name = C.GoString(C.rd_kafka_ConfigEntry_name(cEntry))
+	cValue := C.rd_kafka_ConfigEntry_value(cEntry)
+	if cValue != nil { // a NULL value leaves entry.Value as the empty string
+		entry.Value = C.GoString(cValue)
+	}
+	entry.Source = ConfigSource(C.rd_kafka_ConfigEntry_source(cEntry))
+	entry.IsReadOnly = cint2bool(C.rd_kafka_ConfigEntry_is_read_only(cEntry))
+	entry.IsSensitive = cint2bool(C.rd_kafka_ConfigEntry_is_sensitive(cEntry))
+	entry.IsSynonym = cint2bool(C.rd_kafka_ConfigEntry_is_synonym(cEntry))
+
+	var cSynCnt C.size_t
+	cSyns := C.rd_kafka_ConfigEntry_synonyms(cEntry, &cSynCnt)
+	if cSynCnt > 0 { // the Synonyms map is only allocated when there is at least one synonym
+		entry.Synonyms = make(map[string]ConfigEntryResult)
+	}
+
+	for si := 0; si < int(cSynCnt); si++ {
+		cSyn := C.ConfigEntry_by_idx(cSyns, cSynCnt, C.size_t(si))
+		Syn := configEntryResultFromC(cSyn)
+		entry.Synonyms[Syn.Name] = Syn
+	}
+
+	return entry
+}
+
+// ConfigResourceResult provides the result for a resource from an AlterConfigs or
+// DescribeConfigs request.
+type ConfigResourceResult struct {
+	// Type of returned result resource.
+	Type ResourceType
+	// Name of returned result resource.
+	Name string
+	// Error, if any, of returned result resource.
+	Error Error
+	// Config entries, if any, of returned result resource, keyed by entry name.
+	Config map[string]ConfigEntryResult
+}
+
+// String returns a human-readable representation: the error when its code is non-zero, otherwise the config entry count.
+func (c ConfigResourceResult) String() string {
+	if c.Error.Code() != 0 {
+		return fmt.Sprintf("ResourceResult(%s, %s, \"%v\")", c.Type, c.Name, c.Error)
+
+	}
+	return fmt.Sprintf("ResourceResult(%s, %s, %d config(s))", c.Type, c.Name, len(c.Config))
+}
+
+// waitResult waits for a result event on cQueue or for ctx to be cancelled, whichever happens
+// first. The event's type and error are checked: on a type mismatch or a set error the event
+// is destroyed and an error is returned; otherwise the event itself is returned to the caller.
+func (a *AdminClient) waitResult(ctx context.Context, cQueue *C.rd_kafka_queue_t, cEventType C.rd_kafka_event_type_t) (rkev *C.rd_kafka_event_t, err error) {
+
+	resultChan := make(chan *C.rd_kafka_event_t)
+	closeChan := make(chan bool) // never written to, just closed
+
+	go func() {
+		for {
+			select {
+			case _, ok := <-closeChan:
+				if !ok {
+					// closeChan was closed: context cancelled/timed out
+					close(resultChan)
+					return
+				}
+
+			default:
+				// Wait for result event for at most 50ms
+				// to avoid blocking for too long if
+				// context is cancelled.
+				rkev := C.rd_kafka_queue_poll(cQueue, 50)
+				if rkev != nil {
+					resultChan <- rkev
+					close(resultChan)
+					return
+				}
+			}
+		}
+	}()
+
+	select {
+	case rkev = <-resultChan:
+		// Result type check
+		if cEventType != C.rd_kafka_event_type(rkev) {
+			err = newErrorFromString(ErrInvalidType,
+				fmt.Sprintf("Expected %d result event, not %d", (int)(cEventType), (int)(C.rd_kafka_event_type(rkev))))
+			C.rd_kafka_event_destroy(rkev)
+			return nil, err
+		}
+
+		// Generic error handling
+		cErr := C.rd_kafka_event_error(rkev)
+		if cErr != 0 {
+			err = newErrorFromCString(cErr, C.rd_kafka_event_error_string(rkev))
+			C.rd_kafka_event_destroy(rkev)
+			return nil, err
+		}
+		close(closeChan)
+		return rkev, nil
+	case <-ctx.Done():
+		// signal close to go-routine
+		close(closeChan)
+		// wait for close from go-routine to make sure it is done
+		// using cQueue before we return.
+		rkev, ok := <-resultChan
+		if ok {
+			// throw away result since context was cancelled
+			C.rd_kafka_event_destroy(rkev)
+		}
+		return nil, ctx.Err()
+	}
+}
+
+// cToTopicResults converts a C topic_result_t array of cCnt elements to a Go TopicResult slice; err is always nil.
+func (a *AdminClient) cToTopicResults(cTopicRes **C.rd_kafka_topic_result_t, cCnt C.size_t) (result []TopicResult, err error) {
+
+	result = make([]TopicResult, int(cCnt))
+
+	for i := 0; i < int(cCnt); i++ {
+		cTopic := C.topic_result_by_idx(cTopicRes, cCnt, C.size_t(i))
+		result[i].Topic = C.GoString(C.rd_kafka_topic_result_name(cTopic))
+		result[i].Error = newErrorFromCString(
+			C.rd_kafka_topic_result_error(cTopic),
+			C.rd_kafka_topic_result_error_string(cTopic))
+	}
+
+	return result, nil
+}
+
+// cConfigResourceToResult converts a C ConfigResource result array of cCnt elements to a Go ConfigResourceResult slice; err is always nil.
+func (a *AdminClient) cConfigResourceToResult(cRes **C.rd_kafka_ConfigResource_t, cCnt C.size_t) (result []ConfigResourceResult, err error) {
+
+	result = make([]ConfigResourceResult, int(cCnt))
+
+	for i := 0; i < int(cCnt); i++ {
+		cRes := C.ConfigResource_by_idx(cRes, cCnt, C.size_t(i)) // shadows the cRes array parameter with its i-th element
+		result[i].Type = ResourceType(C.rd_kafka_ConfigResource_type(cRes))
+		result[i].Name = C.GoString(C.rd_kafka_ConfigResource_name(cRes))
+		result[i].Error = newErrorFromCString(
+			C.rd_kafka_ConfigResource_error(cRes),
+			C.rd_kafka_ConfigResource_error_string(cRes))
+		var cConfigCnt C.size_t
+		cConfigs := C.rd_kafka_ConfigResource_configs(cRes, &cConfigCnt)
+		if cConfigCnt > 0 { // the Config map is left nil when the resource has no entries
+			result[i].Config = make(map[string]ConfigEntryResult)
+		}
+		for ci := 0; ci < int(cConfigCnt); ci++ {
+			cEntry := C.ConfigEntry_by_idx(cConfigs, cConfigCnt, C.size_t(ci))
+			entry := configEntryResultFromC(cEntry)
+			result[i].Config[entry.Name] = entry
+		}
+	}
+
+	return result, nil
+}
+
+// CreateTopics creates topics in cluster.
+//
+// The list of TopicSpecification objects define the per-topic partition count, replicas, etc.
+//
+// Topic creation is non-atomic and may succeed for some topics but fail for others,
+// make sure to check the result for topic-specific errors.
+//
+// An ErrInvalidArg error is returned, before any request is issued, when
+// topics is empty, when a specification sets both or neither of
+// ReplicationFactor and ReplicaAssignment, when ReplicaAssignment does not
+// contain exactly NumPartitions entries, or when a partition's replica list
+// is empty.
+//
+// Note: TopicSpecification is analogous to NewTopic in the Java Topic Admin API.
+func (a *AdminClient) CreateTopics(ctx context.Context, topics []TopicSpecification, options ...CreateTopicsAdminOption) (result []TopicResult, err error) {
+	// &cTopics[0] is taken for the C call below and would panic on an empty slice.
+	if len(topics) == 0 {
+		return nil, newErrorFromString(ErrInvalidArg,
+			"Expected non-empty slice of TopicSpecifications")
+	}
+
+	cTopics := make([]*C.rd_kafka_NewTopic_t, len(topics))
+
+	cErrstrSize := C.size_t(512)
+	cErrstr := (*C.char)(C.malloc(cErrstrSize))
+	defer C.free(unsafe.Pointer(cErrstr))
+
+	// Convert Go TopicSpecifications to C TopicSpecifications
+	for i, topic := range topics {
+
+		// A zero (unset) Go ReplicationFactor is passed to C as -1.
+		var cReplicationFactor C.int
+		if topic.ReplicationFactor == 0 {
+			cReplicationFactor = -1
+		} else {
+			cReplicationFactor = C.int(topic.ReplicationFactor)
+		}
+		if topic.ReplicaAssignment != nil {
+			if cReplicationFactor != -1 {
+				return nil, newErrorFromString(ErrInvalidArg,
+					"TopicSpecification.ReplicationFactor and TopicSpecification.ReplicaAssignment are mutually exclusive")
+			}
+
+			if len(topic.ReplicaAssignment) != topic.NumPartitions {
+				return nil, newErrorFromString(ErrInvalidArg,
+					"TopicSpecification.ReplicaAssignment must contain exactly TopicSpecification.NumPartitions partitions")
+			}
+
+		} else if cReplicationFactor == -1 {
+			return nil, newErrorFromString(ErrInvalidArg,
+				"TopicSpecification.ReplicationFactor or TopicSpecification.ReplicaAssignment must be specified")
+		}
+
+		// C.CString allocates with malloc; release the copy when we return.
+		cTopicName := C.CString(topic.Topic)
+		defer C.free(unsafe.Pointer(cTopicName))
+
+		cTopics[i] = C.rd_kafka_NewTopic_new(
+			cTopicName,
+			C.int(topic.NumPartitions),
+			cReplicationFactor,
+			cErrstr, cErrstrSize)
+		if cTopics[i] == nil {
+			return nil, newErrorFromString(ErrInvalidArg,
+				fmt.Sprintf("Topic %s: %s", topic.Topic, C.GoString(cErrstr)))
+		}
+
+		// The C objects must outlive the loop (they are used by the call
+		// below), so they are destroyed on function return.
+		defer C.rd_kafka_NewTopic_destroy(cTopics[i])
+
+		for p, replicas := range topic.ReplicaAssignment {
+			// &cReplicas[0] below would panic on an empty replica list.
+			if len(replicas) == 0 {
+				return nil, newErrorFromString(ErrInvalidArg,
+					fmt.Sprintf("Topic %s: replica assignment for partition %d must not be empty", topic.Topic, p))
+			}
+
+			cReplicas := make([]C.int32_t, len(replicas))
+			for ri, replica := range replicas {
+				cReplicas[ri] = C.int32_t(replica)
+			}
+			cErr := C.rd_kafka_NewTopic_set_replica_assignment(
+				cTopics[i], C.int32_t(p),
+				(*C.int32_t)(&cReplicas[0]), C.size_t(len(cReplicas)),
+				cErrstr, cErrstrSize)
+			if cErr != 0 {
+				return nil, newCErrorFromString(cErr,
+					fmt.Sprintf("Failed to set replica assignment for topic %s partition %d: %s", topic.Topic, p, C.GoString(cErrstr)))
+			}
+		}
+
+		for key, value := range topic.Config {
+			cKey := C.CString(key)
+			defer C.free(unsafe.Pointer(cKey))
+			cValue := C.CString(value)
+			defer C.free(unsafe.Pointer(cValue))
+
+			cErr := C.rd_kafka_NewTopic_set_config(cTopics[i], cKey, cValue)
+			if cErr != 0 {
+				return nil, newCErrorFromString(cErr,
+					fmt.Sprintf("Failed to set config %s=%s for topic %s", key, value, topic.Topic))
+			}
+		}
+	}
+
+	// Convert Go AdminOptions (if any) to C AdminOptions
+	genericOptions := make([]AdminOption, len(options))
+	for i := range options {
+		genericOptions[i] = options[i]
+	}
+	cOptions, err := adminOptionsSetup(a.handle, C.RD_KAFKA_ADMIN_OP_CREATETOPICS, genericOptions)
+	if err != nil {
+		return nil, err
+	}
+	defer C.rd_kafka_AdminOptions_destroy(cOptions)
+
+	// Create temporary queue for async operation
+	cQueue := C.rd_kafka_queue_new(a.handle.rk)
+	defer C.rd_kafka_queue_destroy(cQueue)
+
+	// Asynchronous call
+	C.rd_kafka_CreateTopics(
+		a.handle.rk,
+		(**C.rd_kafka_NewTopic_t)(&cTopics[0]),
+		C.size_t(len(cTopics)),
+		cOptions,
+		cQueue)
+
+	// Wait for result, error or context timeout
+	rkev, err := a.waitResult(ctx, cQueue, C.RD_KAFKA_EVENT_CREATETOPICS_RESULT)
+	if err != nil {
+		return nil, err
+	}
+	defer C.rd_kafka_event_destroy(rkev)
+
+	cRes := C.rd_kafka_event_CreateTopics_result(rkev)
+
+	// Convert result from C to Go
+	var cCnt C.size_t
+	cTopicRes := C.rd_kafka_CreateTopics_result_topics(cRes, &cCnt)
+
+	return a.cToTopicResults(cTopicRes, cCnt)
+}
+
+// DeleteTopics deletes a batch of topics.
+//
+// This operation is not transactional and may succeed for a subset of topics while
+// failing others.
+// It may take several seconds after the DeleteTopics result returns success for
+// all the brokers to become aware that the topics are gone. During this time,
+// topic metadata and configuration may continue to return information about deleted topics.
+//
+// An ErrInvalidArg error is returned when topics is empty or when a C
+// DeleteTopic object cannot be created for one of the names.
+//
+// Requires broker version >= 0.10.1.0
+func (a *AdminClient) DeleteTopics(ctx context.Context, topics []string, options ...DeleteTopicsAdminOption) (result []TopicResult, err error) {
+	// &cTopics[0] is taken for the C call below and would panic on an empty slice.
+	if len(topics) == 0 {
+		return nil, newErrorFromString(ErrInvalidArg,
+			"Expected non-empty slice of topics")
+	}
+
+	cTopics := make([]*C.rd_kafka_DeleteTopic_t, len(topics))
+
+	// Convert Go DeleteTopics to C DeleteTopics
+	for i, topic := range topics {
+		// C.CString allocates with malloc; release the copy when we return.
+		cTopicName := C.CString(topic)
+		defer C.free(unsafe.Pointer(cTopicName))
+
+		cTopics[i] = C.rd_kafka_DeleteTopic_new(cTopicName)
+		if cTopics[i] == nil {
+			return nil, newErrorFromString(ErrInvalidArg,
+				fmt.Sprintf("Invalid arguments for topic %s", topic))
+		}
+
+		// Destroyed on function return: the objects are used by the call below.
+		defer C.rd_kafka_DeleteTopic_destroy(cTopics[i])
+	}
+
+	// Convert Go AdminOptions (if any) to C AdminOptions
+	genericOptions := make([]AdminOption, len(options))
+	for i := range options {
+		genericOptions[i] = options[i]
+	}
+	cOptions, err := adminOptionsSetup(a.handle, C.RD_KAFKA_ADMIN_OP_DELETETOPICS, genericOptions)
+	if err != nil {
+		return nil, err
+	}
+	defer C.rd_kafka_AdminOptions_destroy(cOptions)
+
+	// Create temporary queue for async operation
+	cQueue := C.rd_kafka_queue_new(a.handle.rk)
+	defer C.rd_kafka_queue_destroy(cQueue)
+
+	// Asynchronous call
+	C.rd_kafka_DeleteTopics(
+		a.handle.rk,
+		(**C.rd_kafka_DeleteTopic_t)(&cTopics[0]),
+		C.size_t(len(cTopics)),
+		cOptions,
+		cQueue)
+
+	// Wait for result, error or context timeout
+	rkev, err := a.waitResult(ctx, cQueue, C.RD_KAFKA_EVENT_DELETETOPICS_RESULT)
+	if err != nil {
+		return nil, err
+	}
+	defer C.rd_kafka_event_destroy(rkev)
+
+	cRes := C.rd_kafka_event_DeleteTopics_result(rkev)
+
+	// Convert result from C to Go
+	var cCnt C.size_t
+	cTopicRes := C.rd_kafka_DeleteTopics_result_topics(cRes, &cCnt)
+
+	return a.cToTopicResults(cTopicRes, cCnt)
+}
+
+// CreatePartitions creates additional partitions for topics.
+//
+// Each PartitionsSpecification names a topic, the partition count to
+// increase it to (IncreaseTo) and, optionally, a replica assignment per new
+// partition index.
+//
+// An ErrInvalidArg error is returned when partitions is empty, when a C
+// NewPartitions object cannot be created for a specification, or when the
+// replica list of a new partition is empty.
+// The per-topic outcome is reported in the returned TopicResult slice.
+func (a *AdminClient) CreatePartitions(ctx context.Context, partitions []PartitionsSpecification, options ...CreatePartitionsAdminOption) (result []TopicResult, err error) {
+	// &cParts[0] is taken for the C call below and would panic on an empty slice.
+	if len(partitions) == 0 {
+		return nil, newErrorFromString(ErrInvalidArg,
+			"Expected non-empty slice of PartitionsSpecifications")
+	}
+
+	cParts := make([]*C.rd_kafka_NewPartitions_t, len(partitions))
+
+	cErrstrSize := C.size_t(512)
+	cErrstr := (*C.char)(C.malloc(cErrstrSize))
+	defer C.free(unsafe.Pointer(cErrstr))
+
+	// Convert Go PartitionsSpecification to C NewPartitions
+	for i, part := range partitions {
+		// C.CString allocates with malloc; release the copy when we return.
+		cTopicName := C.CString(part.Topic)
+		defer C.free(unsafe.Pointer(cTopicName))
+
+		cParts[i] = C.rd_kafka_NewPartitions_new(cTopicName, C.size_t(part.IncreaseTo), cErrstr, cErrstrSize)
+		if cParts[i] == nil {
+			return nil, newErrorFromString(ErrInvalidArg,
+				fmt.Sprintf("Topic %s: %s", part.Topic, C.GoString(cErrstr)))
+		}
+
+		// Destroyed on function return: the objects are used by the call below.
+		defer C.rd_kafka_NewPartitions_destroy(cParts[i])
+
+		for pidx, replicas := range part.ReplicaAssignment {
+			// &cReplicas[0] below would panic on an empty replica list.
+			if len(replicas) == 0 {
+				return nil, newErrorFromString(ErrInvalidArg,
+					fmt.Sprintf("Topic %s: replica assignment for new partition index %d must not be empty", part.Topic, pidx))
+			}
+
+			cReplicas := make([]C.int32_t, len(replicas))
+			for ri, replica := range replicas {
+				cReplicas[ri] = C.int32_t(replica)
+			}
+			cErr := C.rd_kafka_NewPartitions_set_replica_assignment(
+				cParts[i], C.int32_t(pidx),
+				(*C.int32_t)(&cReplicas[0]), C.size_t(len(cReplicas)),
+				cErrstr, cErrstrSize)
+			if cErr != 0 {
+				return nil, newCErrorFromString(cErr,
+					fmt.Sprintf("Failed to set replica assignment for topic %s new partition index %d: %s", part.Topic, pidx, C.GoString(cErrstr)))
+			}
+		}
+
+	}
+
+	// Convert Go AdminOptions (if any) to C AdminOptions
+	genericOptions := make([]AdminOption, len(options))
+	for i := range options {
+		genericOptions[i] = options[i]
+	}
+	cOptions, err := adminOptionsSetup(a.handle, C.RD_KAFKA_ADMIN_OP_CREATEPARTITIONS, genericOptions)
+	if err != nil {
+		return nil, err
+	}
+	defer C.rd_kafka_AdminOptions_destroy(cOptions)
+
+	// Create temporary queue for async operation
+	cQueue := C.rd_kafka_queue_new(a.handle.rk)
+	defer C.rd_kafka_queue_destroy(cQueue)
+
+	// Asynchronous call
+	C.rd_kafka_CreatePartitions(
+		a.handle.rk,
+		(**C.rd_kafka_NewPartitions_t)(&cParts[0]),
+		C.size_t(len(cParts)),
+		cOptions,
+		cQueue)
+
+	// Wait for result, error or context timeout
+	rkev, err := a.waitResult(ctx, cQueue, C.RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT)
+	if err != nil {
+		return nil, err
+	}
+	defer C.rd_kafka_event_destroy(rkev)
+
+	cRes := C.rd_kafka_event_CreatePartitions_result(rkev)
+
+	// Convert result from C to Go
+	var cCnt C.size_t
+	cTopicRes := C.rd_kafka_CreatePartitions_result_topics(cRes, &cCnt)
+
+	return a.cToTopicResults(cTopicRes, cCnt)
+}
+
+// AlterConfigs alters/updates cluster resource configuration.
+//
+// Updates are not transactional so they may succeed for a subset
+// of the provided resources while others fail.
+// The configuration for a particular resource is updated atomically,
+// replacing values using the provided ConfigEntrys and reverting
+// unspecified ConfigEntrys to their default values.
+//
+// Requires broker version >=0.11.0.0
+//
+// AlterConfigs will replace all existing configuration for
+// the provided resources with the new configuration given,
+// reverting all other configuration to their default values.
+//
+// Multiple resources and resource types may be set, but at most one
+// resource of type ResourceBroker is allowed per call since these
+// resource requests must be sent to the broker specified in the resource.
+//
+// An ErrInvalidArg error is returned when resources is empty, when a C
+// ConfigResource cannot be created for a resource, or when a ConfigEntry
+// carries an Operation other than AlterOperationSet.
+func (a *AdminClient) AlterConfigs(ctx context.Context, resources []ConfigResource, options ...AlterConfigsAdminOption) (result []ConfigResourceResult, err error) {
+	// &cRes[0] is taken for the C call below and would panic on an empty slice.
+	if len(resources) == 0 {
+		return nil, newErrorFromString(ErrInvalidArg,
+			"Expected non-empty slice of ConfigResources")
+	}
+
+	cRes := make([]*C.rd_kafka_ConfigResource_t, len(resources))
+
+	// Convert Go ConfigResources to C ConfigResources
+	for i, res := range resources {
+		// C.CString allocates with malloc; release the copy when we return.
+		cResName := C.CString(res.Name)
+		defer C.free(unsafe.Pointer(cResName))
+
+		cRes[i] = C.rd_kafka_ConfigResource_new(
+			C.rd_kafka_ResourceType_t(res.Type), cResName)
+		if cRes[i] == nil {
+			return nil, newErrorFromString(ErrInvalidArg,
+				fmt.Sprintf("Invalid arguments for resource %v", res))
+		}
+
+		// Destroyed on function return: the objects are used by the call below.
+		defer C.rd_kafka_ConfigResource_destroy(cRes[i])
+
+		for _, entry := range res.Config {
+			var cErr C.rd_kafka_resp_err_t
+			switch entry.Operation {
+			case AlterOperationSet:
+				cName := C.CString(entry.Name)
+				defer C.free(unsafe.Pointer(cName))
+				cValue := C.CString(entry.Value)
+				defer C.free(unsafe.Pointer(cValue))
+
+				cErr = C.rd_kafka_ConfigResource_set_config(
+					cRes[i], cName, cValue)
+			default:
+				// Bad caller input is reported as an error instead of
+				// crashing the process with a panic.
+				return nil, newErrorFromString(ErrInvalidArg,
+					fmt.Sprintf("Invalid ConfigEntry.Operation: %v", entry.Operation))
+			}
+
+			if cErr != 0 {
+				return nil,
+					newCErrorFromString(cErr,
+						fmt.Sprintf("Failed to add configuration %s: %s",
+							entry, C.GoString(C.rd_kafka_err2str(cErr))))
+			}
+		}
+	}
+
+	// Convert Go AdminOptions (if any) to C AdminOptions
+	genericOptions := make([]AdminOption, len(options))
+	for i := range options {
+		genericOptions[i] = options[i]
+	}
+	cOptions, err := adminOptionsSetup(a.handle, C.RD_KAFKA_ADMIN_OP_ALTERCONFIGS, genericOptions)
+	if err != nil {
+		return nil, err
+	}
+	defer C.rd_kafka_AdminOptions_destroy(cOptions)
+
+	// Create temporary queue for async operation
+	cQueue := C.rd_kafka_queue_new(a.handle.rk)
+	defer C.rd_kafka_queue_destroy(cQueue)
+
+	// Asynchronous call
+	C.rd_kafka_AlterConfigs(
+		a.handle.rk,
+		(**C.rd_kafka_ConfigResource_t)(&cRes[0]),
+		C.size_t(len(cRes)),
+		cOptions,
+		cQueue)
+
+	// Wait for result, error or context timeout
+	rkev, err := a.waitResult(ctx, cQueue, C.RD_KAFKA_EVENT_ALTERCONFIGS_RESULT)
+	if err != nil {
+		return nil, err
+	}
+	defer C.rd_kafka_event_destroy(rkev)
+
+	cResult := C.rd_kafka_event_AlterConfigs_result(rkev)
+
+	// Convert results from C to Go
+	var cCnt C.size_t
+	cResults := C.rd_kafka_AlterConfigs_result_resources(cResult, &cCnt)
+
+	return a.cConfigResourceToResult(cResults, cCnt)
+}
+
+// DescribeConfigs retrieves configuration for cluster resources.
+//
+// The returned configuration includes default values, use
+// ConfigEntryResult.IsDefault or ConfigEntryResult.Source to distinguish
+// default values from manually configured settings.
+//
+// The value of config entries where .IsSensitive is true
+// will always be nil to avoid disclosing sensitive
+// information, such as security settings.
+//
+// Configuration entries where .IsReadOnly is true can't be modified
+// (with AlterConfigs).
+//
+// Synonym configuration entries are returned if the broker supports
+// it (broker version >= 1.1.0). See .Synonyms.
+//
+// Requires broker version >=0.11.0.0
+//
+// Multiple resources and resource types may be requested, but at most
+// one resource of type ResourceBroker is allowed per call
+// since these resource requests must be sent to the broker specified
+// in the resource.
+//
+// An ErrInvalidArg error is returned when resources is empty or when a C
+// ConfigResource cannot be created for a resource.
+func (a *AdminClient) DescribeConfigs(ctx context.Context, resources []ConfigResource, options ...DescribeConfigsAdminOption) (result []ConfigResourceResult, err error) {
+	// &cRes[0] is taken for the C call below and would panic on an empty slice.
+	if len(resources) == 0 {
+		return nil, newErrorFromString(ErrInvalidArg,
+			"Expected non-empty slice of ConfigResources")
+	}
+
+	cRes := make([]*C.rd_kafka_ConfigResource_t, len(resources))
+
+	// Convert Go ConfigResources to C ConfigResources
+	for i, res := range resources {
+		// C.CString allocates with malloc; release the copy when we return.
+		cResName := C.CString(res.Name)
+		defer C.free(unsafe.Pointer(cResName))
+
+		cRes[i] = C.rd_kafka_ConfigResource_new(
+			C.rd_kafka_ResourceType_t(res.Type), cResName)
+		if cRes[i] == nil {
+			return nil, newErrorFromString(ErrInvalidArg,
+				fmt.Sprintf("Invalid arguments for resource %v", res))
+		}
+
+		// Destroyed on function return: the objects are used by the call below.
+		defer C.rd_kafka_ConfigResource_destroy(cRes[i])
+	}
+
+	// Convert Go AdminOptions (if any) to C AdminOptions
+	genericOptions := make([]AdminOption, len(options))
+	for i := range options {
+		genericOptions[i] = options[i]
+	}
+	cOptions, err := adminOptionsSetup(a.handle, C.RD_KAFKA_ADMIN_OP_DESCRIBECONFIGS, genericOptions)
+	if err != nil {
+		return nil, err
+	}
+	defer C.rd_kafka_AdminOptions_destroy(cOptions)
+
+	// Create temporary queue for async operation
+	cQueue := C.rd_kafka_queue_new(a.handle.rk)
+	defer C.rd_kafka_queue_destroy(cQueue)
+
+	// Asynchronous call
+	C.rd_kafka_DescribeConfigs(
+		a.handle.rk,
+		(**C.rd_kafka_ConfigResource_t)(&cRes[0]),
+		C.size_t(len(cRes)),
+		cOptions,
+		cQueue)
+
+	// Wait for result, error or context timeout
+	rkev, err := a.waitResult(ctx, cQueue, C.RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT)
+	if err != nil {
+		return nil, err
+	}
+	defer C.rd_kafka_event_destroy(rkev)
+
+	cResult := C.rd_kafka_event_DescribeConfigs_result(rkev)
+
+	// Convert results from C to Go
+	var cCnt C.size_t
+	cResults := C.rd_kafka_DescribeConfigs_result_resources(cResult, &cCnt)
+
+	return a.cConfigResourceToResult(cResults, cCnt)
+}
+
+// GetMetadata queries broker for cluster and topic metadata.
+// If topic is non-nil only information about that topic is returned, else if
+// allTopics is false only information about locally used topics is returned,
+// else information about all topics is returned.
+// All arguments, including timeoutMs, are forwarded unchanged to the
+// package-level getMetadata helper, which this method delegates to.
+// GetMetadata is equivalent to listTopics, describeTopics and describeCluster in the Java API.
+func (a *AdminClient) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error) {
+	return getMetadata(a, topic, allTopics, timeoutMs)
+}
+
+// String returns a human readable name for an AdminClient instance:
+// the underlying handle's name prefixed with "admin-".
+func (a *AdminClient) String() string {
+	return "admin-" + a.handle.String()
+}
+
+// gethandle implements the Handle interface by returning the handle this
+// AdminClient operates on.
+func (a *AdminClient) gethandle() *handle {
+	return a.handle
+}
+
+// Close an AdminClient instance.
+//
+// A client that owns its handle cleans the handle up and destroys the
+// underlying librdkafka instance. A derived client only detaches itself by
+// pointing at a fresh, empty handle.
+func (a *AdminClient) Close() {
+	if !a.isDerived {
+		a.handle.cleanup()
+		C.rd_kafka_destroy(a.handle.rk)
+		return
+	}
+
+	// Derived AdminClient needs no cleanup.
+	a.handle = &handle{}
+}
+
+// NewAdminClient creats a new AdminClient instance with a new underlying client instance
+func NewAdminClient(conf *ConfigMap) (*AdminClient, error) {
+
+	err := versionCheck()
+	if err != nil {
+		return nil, err
+	}
+
+	a := &AdminClient{}
+	a.handle = &handle{}
+
+	// Convert ConfigMap to librdkafka conf_t
+	cConf, err := conf.convert()
+	if err != nil {
+		return nil, err
+	}
+
+	cErrstr := (*C.char)(C.malloc(C.size_t(256)))
+	defer C.free(unsafe.Pointer(cErrstr))
+
+	// Create librdkafka producer instance. The Producer is somewhat cheaper than
+	// the consumer, but any instance type can be used for Admin APIs.
+	a.handle.rk = C.rd_kafka_new(C.RD_KAFKA_PRODUCER, cConf, cErrstr, 256)
+	if a.handle.rk == nil {
+		return nil, newErrorFromCString(C.RD_KAFKA_RESP_ERR__INVALID_ARG, cErrstr)
+	}
+
+	a.isDerived = false
+	a.handle.setup()
+
+	return a, nil
+}
+
+// NewAdminClientFromProducer derives a new AdminClient from an existing Producer instance.
+// The AdminClient will use the same configuration and connections as the parent instance.
+// A producer without an underlying client instance (closed) yields ErrInvalidArg.
+func NewAdminClientFromProducer(p *Producer) (a *AdminClient, err error) {
+	if p.handle.rk != nil {
+		// Share the producer's handle rather than creating a new one.
+		return &AdminClient{handle: &p.handle, isDerived: true}, nil
+	}
+
+	return nil, newErrorFromString(ErrInvalidArg, "Can't derive AdminClient from closed producer")
+}
+
+// NewAdminClientFromConsumer derives a new AdminClient from an existing Consumer instance.
+// The AdminClient will use the same configuration and connections as the parent instance.
+// A consumer without an underlying client instance (closed) yields ErrInvalidArg.
+func NewAdminClientFromConsumer(c *Consumer) (a *AdminClient, err error) {
+	if c.handle.rk != nil {
+		// Share the consumer's handle rather than creating a new one.
+		return &AdminClient{handle: &c.handle, isDerived: true}, nil
+	}
+
+	return nil, newErrorFromString(ErrInvalidArg, "Can't derive AdminClient from closed consumer")
+}
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/adminoptions.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/adminoptions.go
new file mode 100644
index 0000000..19a8be0
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/adminoptions.go
@@ -0,0 +1,264 @@
+/**
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka
+
+import (
+	"fmt"
+	"time"
+	"unsafe"
+)
+
+/*
+#include <librdkafka/rdkafka.h>
+#include <stdlib.h>
+*/
+import "C"
+
+// AdminOptionOperationTimeout sets the broker's operation timeout, such as the
+// timeout for CreateTopics to complete the creation of topics on the controller
+// before returning a result to the application.
+//
+// CreateTopics, DeleteTopics, CreatePartitions:
+// a value 0 will return immediately after triggering topic
+// creation, while > 0 will wait this long for topic creation to propagate
+// in cluster.
+//
+// Default: 0 (return immediately).
+//
+// Valid for CreateTopics, DeleteTopics, CreatePartitions.
+//
+// Build values with SetAdminOperationTimeout; the zero value is an unset
+// option that apply ignores.
+type AdminOptionOperationTimeout struct {
+	// isSet is true once the option has been given a value; apply does
+	// nothing while it is false. val is the timeout, which apply converts
+	// to milliseconds before handing it to librdkafka.
+	isSet bool
+	val   time.Duration
+}
+
+// supportsCreateTopics is the marker method that makes the option usable as
+// a CreateTopicsAdminOption.
+func (ao AdminOptionOperationTimeout) supportsCreateTopics() {
+}
+
+// supportsDeleteTopics is the marker method that makes the option usable as
+// a DeleteTopicsAdminOption.
+func (ao AdminOptionOperationTimeout) supportsDeleteTopics() {
+}
+
+// supportsCreatePartitions is the marker method that makes the option usable
+// as a CreatePartitionsAdminOption.
+func (ao AdminOptionOperationTimeout) supportsCreatePartitions() {
+}
+
+func (ao AdminOptionOperationTimeout) apply(cOptions *C.rd_kafka_AdminOptions_t) error {
+	if !ao.isSet {
+		return nil
+	}
+
+	cErrstrSize := C.size_t(512)
+	cErrstr := (*C.char)(C.malloc(cErrstrSize))
+	defer C.free(unsafe.Pointer(cErrstr))
+
+	cErr := C.rd_kafka_AdminOptions_set_operation_timeout(
+		cOptions, C.int(durationToMilliseconds(ao.val)),
+		cErrstr, cErrstrSize)
+	if cErr != 0 {
+		C.rd_kafka_AdminOptions_destroy(cOptions)
+		return newCErrorFromString(cErr,
+			fmt.Sprintf("Failed to set operation timeout: %s", C.GoString(cErrstr)))
+
+	}
+
+	return nil
+}
+
+// SetAdminOperationTimeout sets the broker's operation timeout, such as the
+// timeout for CreateTopics to complete the creation of topics on the controller
+// before returning a result to the application.
+//
+// CreateTopics, DeleteTopics, CreatePartitions:
+// a value 0 will return immediately after triggering topic
+// creation, while > 0 will wait this long for topic creation to propagate
+// in cluster.
+//
+// Default: 0 (return immediately).
+//
+// Valid for CreateTopics, DeleteTopics, CreatePartitions.
+func SetAdminOperationTimeout(t time.Duration) (ao AdminOptionOperationTimeout) {
+	return AdminOptionOperationTimeout{isSet: true, val: t}
+}
+
+// AdminOptionRequestTimeout sets the overall request timeout, including broker
+// lookup, request transmission, operation time on broker, and response.
+//
+// Default: `socket.timeout.ms`.
+//
+// Valid for all Admin API methods.
+//
+// Build values with SetAdminRequestTimeout; the zero value is an unset
+// option that apply ignores.
+type AdminOptionRequestTimeout struct {
+	// isSet is true once the option has been given a value; apply does
+	// nothing while it is false. val is the timeout, which apply converts
+	// to milliseconds before handing it to librdkafka.
+	isSet bool
+	val   time.Duration
+}
+
+// supportsCreateTopics is the marker method that makes the option usable as
+// a CreateTopicsAdminOption.
+func (ao AdminOptionRequestTimeout) supportsCreateTopics() {
+}
+
+// supportsDeleteTopics is the marker method that makes the option usable as
+// a DeleteTopicsAdminOption.
+func (ao AdminOptionRequestTimeout) supportsDeleteTopics() {
+}
+
+// supportsCreatePartitions is the marker method that makes the option usable
+// as a CreatePartitionsAdminOption.
+func (ao AdminOptionRequestTimeout) supportsCreatePartitions() {
+}
+
+// supportsAlterConfigs is the marker method that makes the option usable as
+// an AlterConfigsAdminOption.
+func (ao AdminOptionRequestTimeout) supportsAlterConfigs() {
+}
+
+// supportsDescribeConfigs is the marker method that makes the option usable
+// as a DescribeConfigsAdminOption.
+func (ao AdminOptionRequestTimeout) supportsDescribeConfigs() {
+}
+
+// apply sets the request timeout, in milliseconds, on cOptions.
+//
+// An unset option is a no-op. When librdkafka rejects the value, cOptions is
+// destroyed here, so the caller must not use or destroy it again, and the
+// returned error carries librdkafka's error code and error string.
+func (ao AdminOptionRequestTimeout) apply(cOptions *C.rd_kafka_AdminOptions_t) error {
+	if !ao.isSet {
+		return nil
+	}
+
+	cErrstrSize := C.size_t(512)
+	cErrstr := (*C.char)(C.malloc(cErrstrSize))
+	defer C.free(unsafe.Pointer(cErrstr))
+
+	cErr := C.rd_kafka_AdminOptions_set_request_timeout(
+		cOptions, C.int(durationToMilliseconds(ao.val)),
+		cErrstr, cErrstrSize)
+	if cErr != 0 {
+		C.rd_kafka_AdminOptions_destroy(cOptions)
+		// The C error string already is the complete message.
+		return newCErrorFromString(cErr, C.GoString(cErrstr))
+	}
+
+	return nil
+}
+
+// SetAdminRequestTimeout sets the overall request timeout, including broker
+// lookup, request transmission, operation time on broker, and response.
+//
+// Default: `socket.timeout.ms`.
+//
+// Valid for all Admin API methods.
+func SetAdminRequestTimeout(t time.Duration) (ao AdminOptionRequestTimeout) {
+	return AdminOptionRequestTimeout{isSet: true, val: t}
+}
+
+// AdminOptionValidateOnly tells the broker to only validate the request,
+// without performing the requested operation (create topics, etc).
+//
+// Default: false.
+//
+// Valid for CreateTopics, CreatePartitions, AlterConfigs
+//
+// Build values with SetAdminValidateOnly; the zero value is an unset
+// option that apply ignores.
+type AdminOptionValidateOnly struct {
+	// isSet is true once the option has been given a value; apply does
+	// nothing while it is false. val is the validate-only flag passed to
+	// librdkafka by apply.
+	isSet bool
+	val   bool
+}
+
+// supportsCreateTopics is the marker method that makes the option usable as
+// a CreateTopicsAdminOption.
+func (ao AdminOptionValidateOnly) supportsCreateTopics() {
+}
+
+// supportsCreatePartitions is the marker method that makes the option usable
+// as a CreatePartitionsAdminOption.
+func (ao AdminOptionValidateOnly) supportsCreatePartitions() {
+}
+
+// supportsAlterConfigs is the marker method that makes the option usable as
+// an AlterConfigsAdminOption.
+func (ao AdminOptionValidateOnly) supportsAlterConfigs() {
+}
+
+// apply sets the validate-only flag on cOptions.
+//
+// An unset option is a no-op. When librdkafka rejects the value, cOptions is
+// destroyed here, so the caller must not use or destroy it again, and the
+// returned error carries librdkafka's error code and error string.
+func (ao AdminOptionValidateOnly) apply(cOptions *C.rd_kafka_AdminOptions_t) error {
+	if !ao.isSet {
+		return nil
+	}
+
+	cErrstrSize := C.size_t(512)
+	cErrstr := (*C.char)(C.malloc(cErrstrSize))
+	defer C.free(unsafe.Pointer(cErrstr))
+
+	cErr := C.rd_kafka_AdminOptions_set_validate_only(
+		cOptions, bool2cint(ao.val),
+		cErrstr, cErrstrSize)
+	if cErr != 0 {
+		C.rd_kafka_AdminOptions_destroy(cOptions)
+		// The C error string already is the complete message.
+		return newCErrorFromString(cErr, C.GoString(cErrstr))
+	}
+
+	return nil
+}
+
+// SetAdminValidateOnly tells the broker to only validate the request,
+// without performing the requested operation (create topics, etc).
+//
+// Default: false.
+//
+// Valid for CreateTopics, CreatePartitions, AlterConfigs
+func SetAdminValidateOnly(validateOnly bool) (ao AdminOptionValidateOnly) {
+	return AdminOptionValidateOnly{isSet: true, val: validateOnly}
+}
+
+// CreateTopicsAdminOption is the option type accepted by AdminClient.CreateTopics.
+//
+// supportsCreateTopics is a marker method that restricts which options
+// qualify; apply writes the option into the C AdminOptions object.
+//
+// See SetAdminRequestTimeout, SetAdminOperationTimeout, SetAdminValidateOnly.
+type CreateTopicsAdminOption interface {
+	supportsCreateTopics()
+	apply(cOptions *C.rd_kafka_AdminOptions_t) error
+}
+
+// DeleteTopicsAdminOption is the option type accepted by AdminClient.DeleteTopics.
+//
+// supportsDeleteTopics is a marker method that restricts which options
+// qualify; apply writes the option into the C AdminOptions object.
+//
+// See SetAdminRequestTimeout, SetAdminOperationTimeout.
+type DeleteTopicsAdminOption interface {
+	supportsDeleteTopics()
+	apply(cOptions *C.rd_kafka_AdminOptions_t) error
+}
+
+// CreatePartitionsAdminOption is the option type accepted by AdminClient.CreatePartitions.
+//
+// supportsCreatePartitions is a marker method that restricts which options
+// qualify; apply writes the option into the C AdminOptions object.
+//
+// See SetAdminRequestTimeout, SetAdminOperationTimeout, SetAdminValidateOnly.
+type CreatePartitionsAdminOption interface {
+	supportsCreatePartitions()
+	apply(cOptions *C.rd_kafka_AdminOptions_t) error
+}
+
+// AlterConfigsAdminOption is the option type accepted by AdminClient.AlterConfigs.
+//
+// supportsAlterConfigs is a marker method that restricts which options
+// qualify; apply writes the option into the C AdminOptions object.
+//
+// See SetAdminRequestTimeout, SetAdminValidateOnly, SetAdminIncremental.
+//
+// NOTE(review): no SetAdminIncremental setter is visible next to the other
+// setters in this file - confirm it exists or drop the reference.
+type AlterConfigsAdminOption interface {
+	supportsAlterConfigs()
+	apply(cOptions *C.rd_kafka_AdminOptions_t) error
+}
+
+// DescribeConfigsAdminOption is the option type accepted by AdminClient.DescribeConfigs.
+//
+// supportsDescribeConfigs is a marker method that restricts which options
+// qualify; apply writes the option into the C AdminOptions object.
+//
+// See SetAdminRequestTimeout.
+type DescribeConfigsAdminOption interface {
+	supportsDescribeConfigs()
+	apply(cOptions *C.rd_kafka_AdminOptions_t) error
+}
+
+// AdminOption is a generic type not to be used directly.
+//
+// It is the common denominator of the per-operation option interfaces: the
+// Admin API methods convert their typed options to []AdminOption before
+// passing them to adminOptionsSetup, which only needs apply.
+//
+// See CreateTopicsAdminOption et.al.
+type AdminOption interface {
+	apply(cOptions *C.rd_kafka_AdminOptions_t) error
+}
+
+// adminOptionsSetup creates a C AdminOptions object for operation opType on
+// h's client instance and applies every non-nil option to it.
+//
+// On success the caller owns the returned object and must destroy it. When an
+// option fails, its error is returned with a nil object; the option
+// implementations in this file destroy the object themselves on failure, so
+// it is not destroyed again here.
+func adminOptionsSetup(h *handle, opType C.rd_kafka_admin_op_t, options []AdminOption) (*C.rd_kafka_AdminOptions_t, error) {
+	cOptions := C.rd_kafka_AdminOptions_new(h.rk, opType)
+
+	for _, option := range options {
+		if option != nil {
+			if err := option.apply(cOptions); err != nil {
+				return nil, err
+			}
+		}
+	}
+
+	return cOptions, nil
+}
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/api.html b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/api.html
new file mode 100644
index 0000000..05c8fed
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/api.html
@@ -0,0 +1,1632 @@
+<!DOCTYPE html>
+<html>
+ <head>
+  <meta content="text/html; charset=utf-8" http-equiv="Content-Type">
+   <meta content="width=device-width, initial-scale=1" name="viewport">
+    <meta content="#375EAB" name="theme-color">
+     <title>
+      kafka - The Go Programming Language
+     </title>
+     <link href="http://golang.org/lib/godoc/style.css" rel="stylesheet" type="text/css">
+      <link href="http://golang.org/lib/godoc/jquery.treeview.css" rel="stylesheet">
+       <script type="text/javascript">
+        window.initFuncs = [];
+       </script>
+      </link>
+     </link>
+    </meta>
+   </meta>
+  </meta>
+ </head>
+ <body>
+  <div id="lowframe" style="position: fixed; bottom: 0; left: 0; height: 0; width: 100%; border-top: thin solid grey; background-color: white; overflow: auto;">
+   ...
+  </div>
+  <!-- #lowframe -->
+  <div class="wide" id="page">
+   <div class="container">
+    <h1>
+     Package kafka
+    </h1>
+    <div id="nav">
+    </div>
+    <!--
+	Copyright 2009 The Go Authors. All rights reserved.
+	Use of this source code is governed by a BSD-style
+	license that can be found in the LICENSE file.
+-->
+    <!--
+	Note: Static (i.e., not template-generated) href and id
+	attributes start with "pkg-" to make it impossible for
+	them to conflict with generated attributes (some of which
+	correspond to Go identifiers).
+-->
+    <script type="text/javascript">
+     document.ANALYSIS_DATA = null;
+	document.CALLGRAPH = null;
+    </script>
+    <div id="short-nav">
+     <dl>
+      <dd>
+       <code>
+        import "github.com/confluentinc/confluent-kafka-go/kafka"
+       </code>
+      </dd>
+     </dl>
+     <dl>
+      <dd>
+       <a class="overviewLink" href="#pkg-overview">
+        Overview
+       </a>
+      </dd>
+      <dd>
+       <a class="indexLink" href="#pkg-index">
+        Index
+       </a>
+      </dd>
+      <dd>
+      </dd>
+     </dl>
+    </div>
+    <!-- The package's Name is printed as title by the top-level template -->
+    <div class="toggleVisible" id="pkg-overview">
+     <div class="collapsed">
+      <h2 class="toggleButton" title="Click to show Overview section">
+       Overview ▹
+      </h2>
+     </div>
+     <div class="expanded">
+      <h2 class="toggleButton" title="Click to hide Overview section">
+       Overview ▾
+      </h2>
+      <p>
+       Package kafka provides high-level Apache Kafka producer and consumers
+using bindings on-top of the librdkafka C library.
+      </p>
+      <h3 id="hdr-High_level_Consumer">
+       High-level Consumer
+      </h3>
+      <p>
+       * Decide if you want to read messages and events from the `.Events()` channel
+(set `"go.events.channel.enable": true`) or by calling `.Poll()`.
+      </p>
+      <p>
+       * Create a Consumer with `kafka.NewConsumer()` providing at
+least the `bootstrap.servers` and `group.id` configuration properties.
+      </p>
+      <p>
+       * Call `.Subscribe()` (or `.SubscribeTopics()` to subscribe to multiple topics)
+to join the group with the specified subscription set.
+Subscriptions are atomic, calling `.Subscribe*()` again will leave
+the group and rejoin with the new set of topics.
+      </p>
+      <p>
+       * Start reading events and messages from either the `.Events` channel
+or by calling `.Poll()`.
+      </p>
+      <p>
+       * When the group has rebalanced each client member is assigned a
+(sub-)set of topic+partitions.
+By default the consumer will start fetching messages for its assigned
+partitions at this point, but your application may enable rebalance
+events to get an insight into what the assigned partitions were
+as well as set the initial offsets. To do this you need to pass
+`"go.application.rebalance.enable": true` to the `NewConsumer()` call
+mentioned above. You will (eventually) see a `kafka.AssignedPartitions` event
+with the assigned partition set. You can optionally modify the initial
+offsets (they'll default to stored offsets and if there are no previously stored
+offsets it will fall back to `"default.topic.config": ConfigMap{"auto.offset.reset": ..}`
+which defaults to the `latest` message) and then call `.Assign(partitions)`
+to start consuming. If you don't need to modify the initial offsets you will
+not need to call `.Assign()`, the client will do so automatically for you if
+you don't.
+      </p>
+      <p>
+       * As messages are fetched they will be made available on either the
+`.Events` channel or by calling `.Poll()`, look for event type `*kafka.Message`.
+      </p>
+      <p>
+       * Handle messages, events and errors to your liking.
+      </p>
+      <p>
+       * When you are done consuming call `.Close()` to commit final offsets
+and leave the consumer group.
+      </p>
+      <h3 id="hdr-Producer">
+       Producer
+      </h3>
+      <p>
+       * Create a Producer with `kafka.NewProducer()` providing at least
+the `bootstrap.servers` configuration properties.
+      </p>
+      <p>
+       * Messages may now be produced either by sending a `*kafka.Message`
+on the `.ProduceChannel` or by calling `.Produce()`.
+      </p>
+      <p>
+       * Producing is an asynchronous operation so the client notifies the application
+of per-message produce success or failure through something called delivery reports.
+Delivery reports are by default emitted on the `.Events()` channel as `*kafka.Message`
+and you should check `msg.TopicPartition.Error` for `nil` to find out if the message
+was successfully delivered or not.
+It is also possible to direct delivery reports to alternate channels
+by providing a non-nil `chan Event` channel to `.Produce()`.
+If no delivery reports are wanted they can be completely disabled by
+setting configuration property `"go.delivery.reports": false`.
+      </p>
+      <p>
+       * When you are done producing messages you will need to make sure all messages
+are indeed delivered to the broker (or failed), remember that this is
+an asynchronous client so some of your messages may be lingering in internal
+channels or transmission queues.
+To do this you can either keep track of the messages you've produced
+and wait for their corresponding delivery reports, or call the convenience
+function `.Flush()` that will block until all message deliveries are done
+or the provided timeout elapses.
+      </p>
+      <p>
+       * Finally call `.Close()` to decommission the producer.
+      </p>
+      <h3 id="hdr-Events">
+       Events
+      </h3>
+      <p>
+       Apart from emitting messages and delivery reports the client also communicates
+with the application through a number of different event types.
+An application may choose to handle or ignore these events.
+      </p>
+      <h3 id="hdr-Consumer_events">
+       Consumer events
+      </h3>
+      <p>
+       * `*kafka.Message` - a fetched message.
+      </p>
+      <p>
+       * `AssignedPartitions` - The assigned partition set for this client following a rebalance.
+Requires `go.application.rebalance.enable`
+      </p>
+      <p>
+       * `RevokedPartitions` - The counterpart to `AssignedPartitions` following a rebalance.
+`AssignedPartitions` and `RevokedPartitions` are symmetrical.
+Requires `go.application.rebalance.enable`
+      </p>
+      <p>
+       * `PartitionEOF` - Consumer has reached the end of a partition.
+NOTE: The consumer will keep trying to fetch new messages for the partition.
+      </p>
+      <p>
+       * `OffsetsCommitted` - Offset commit results (when `enable.auto.commit` is enabled).
+      </p>
+      <h3 id="hdr-Producer_events">
+       Producer events
+      </h3>
+      <p>
+       * `*kafka.Message` - delivery report for produced message.
+Check `.TopicPartition.Error` for delivery result.
+      </p>
+      <h3 id="hdr-Generic_events_for_both_Consumer_and_Producer">
+       Generic events for both Consumer and Producer
+      </h3>
+      <p>
+       * `KafkaError` - client (error codes are prefixed with _) or broker error.
+These errors are normally just informational since the
+client will try its best to automatically recover (eventually).
+      </p>
+      <p>
+       Hint: If your application registers a signal notification
+(signal.Notify) make sure the signals channel is buffered to avoid
+possible complications with blocking Poll() calls.
+      </p>
+     </div>
+    </div>
+    <div class="toggleVisible" id="pkg-index">
+     <div class="collapsed">
+      <h2 class="toggleButton" title="Click to show Index section">
+       Index ▹
+      </h2>
+     </div>
+     <div class="expanded">
+      <h2 class="toggleButton" title="Click to hide Index section">
+       Index ▾
+      </h2>
+      <!-- Table of contents for API; must be named manual-nav to turn off auto nav. -->
+      <div id="manual-nav">
+       <dl>
+        <dd>
+         <a href="#pkg-constants">
+          Constants
+         </a>
+        </dd>
+        <dd>
+         <a href="#LibraryVersion">
+          func LibraryVersion() (int, string)
+         </a>
+        </dd>
+        <dd>
+         <a href="#AssignedPartitions">
+          type AssignedPartitions
+         </a>
+        </dd>
+        <dd>
+         <a href="#AssignedPartitions.String">
+          func (e AssignedPartitions) String() string
+         </a>
+        </dd>
+        <dd>
+         <a href="#BrokerMetadata">
+          type BrokerMetadata
+         </a>
+        </dd>
+        <dd>
+         <a href="#ConfigMap">
+          type ConfigMap
+         </a>
+        </dd>
+        <dd>
+         <a href="#ConfigMap.Set">
+          func (m ConfigMap) Set(kv string) error
+         </a>
+        </dd>
+        <dd>
+         <a href="#ConfigMap.SetKey">
+          func (m ConfigMap) SetKey(key string, value ConfigValue) error
+         </a>
+        </dd>
+        <dd>
+         <a href="#ConfigValue">
+          type ConfigValue
+         </a>
+        </dd>
+        <dd>
+         <a href="#Consumer">
+          type Consumer
+         </a>
+        </dd>
+        <dd>
+         <a href="#NewConsumer">
+          func NewConsumer(conf *ConfigMap) (*Consumer, error)
+         </a>
+        </dd>
+        <dd>
+         <a href="#Consumer.Assign">
+          func (c *Consumer) Assign(partitions []TopicPartition) (err error)
+         </a>
+        </dd>
+        <dd>
+         <a href="#Consumer.Close">
+          func (c *Consumer) Close() (err error)
+         </a>
+        </dd>
+        <dd>
+         <a href="#Consumer.Commit">
+          func (c *Consumer) Commit() ([]TopicPartition, error)
+         </a>
+        </dd>
+        <dd>
+         <a href="#Consumer.CommitMessage">
+          func (c *Consumer) CommitMessage(m *Message) ([]TopicPartition, error)
+         </a>
+        </dd>
+        <dd>
+         <a href="#Consumer.CommitOffsets">
+          func (c *Consumer) CommitOffsets(offsets []TopicPartition) ([]TopicPartition, error)
+         </a>
+        </dd>
+        <dd>
+         <a href="#Consumer.Events">
+          func (c *Consumer) Events() chan Event
+         </a>
+        </dd>
+        <dd>
+         <a href="#Consumer.GetMetadata">
+          func (c *Consumer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)
+         </a>
+        </dd>
+        <dd>
+         <a href="#Consumer.Poll">
+          func (c *Consumer) Poll(timeoutMs int) (event Event)
+         </a>
+        </dd>
+        <dd>
+         <a href="#Consumer.QueryWatermarkOffsets">
+          func (c *Consumer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
+         </a>
+        </dd>
+        <dd>
+         <a href="#Consumer.String">
+          func (c *Consumer) String() string
+         </a>
+        </dd>
+        <dd>
+         <a href="#Consumer.Subscribe">
+          func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error
+         </a>
+        </dd>
+        <dd>
+         <a href="#Consumer.SubscribeTopics">
+          func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) (err error)
+         </a>
+        </dd>
+        <dd>
+         <a href="#Consumer.Unassign">
+          func (c *Consumer) Unassign() (err error)
+         </a>
+        </dd>
+        <dd>
+         <a href="#Consumer.Unsubscribe">
+          func (c *Consumer) Unsubscribe() (err error)
+         </a>
+        </dd>
+        <dd>
+         <a href="#Error">
+          type Error
+         </a>
+        </dd>
+        <dd>
+         <a href="#Error.Code">
+          func (e Error) Code() ErrorCode
+         </a>
+        </dd>
+        <dd>
+         <a href="#Error.Error">
+          func (e Error) Error() string
+         </a>
+        </dd>
+        <dd>
+         <a href="#Error.String">
+          func (e Error) String() string
+         </a>
+        </dd>
+        <dd>
+         <a href="#ErrorCode">
+          type ErrorCode
+         </a>
+        </dd>
+        <dd>
+         <a href="#ErrorCode.String">
+          func (c ErrorCode) String() string
+         </a>
+        </dd>
+        <dd>
+         <a href="#Event">
+          type Event
+         </a>
+        </dd>
+        <dd>
+         <a href="#Handle">
+          type Handle
+         </a>
+        </dd>
+        <dd>
+         <a href="#Message">
+          type Message
+         </a>
+        </dd>
+        <dd>
+         <a href="#Message.String">
+          func (m *Message) String() string
+         </a>
+        </dd>
+        <dd>
+         <a href="#Metadata">
+          type Metadata
+         </a>
+        </dd>
+        <dd>
+         <a href="#Offset">
+          type Offset
+         </a>
+        </dd>
+        <dd>
+         <a href="#NewOffset">
+          func NewOffset(offset interface{}) (Offset, error)
+         </a>
+        </dd>
+        <dd>
+         <a href="#OffsetTail">
+          func OffsetTail(relativeOffset Offset) Offset
+         </a>
+        </dd>
+        <dd>
+         <a href="#Offset.Set">
+          func (o Offset) Set(offset interface{}) error
+         </a>
+        </dd>
+        <dd>
+         <a href="#Offset.String">
+          func (o Offset) String() string
+         </a>
+        </dd>
+        <dd>
+         <a href="#OffsetsCommitted">
+          type OffsetsCommitted
+         </a>
+        </dd>
+        <dd>
+         <a href="#OffsetsCommitted.String">
+          func (o OffsetsCommitted) String() string
+         </a>
+        </dd>
+        <dd>
+         <a href="#PartitionEOF">
+          type PartitionEOF
+         </a>
+        </dd>
+        <dd>
+         <a href="#PartitionEOF.String">
+          func (p PartitionEOF) String() string
+         </a>
+        </dd>
+        <dd>
+         <a href="#PartitionMetadata">
+          type PartitionMetadata
+         </a>
+        </dd>
+        <dd>
+         <a href="#Producer">
+          type Producer
+         </a>
+        </dd>
+        <dd>
+         <a href="#NewProducer">
+          func NewProducer(conf *ConfigMap) (*Producer, error)
+         </a>
+        </dd>
+        <dd>
+         <a href="#Producer.Close">
+          func (p *Producer) Close()
+         </a>
+        </dd>
+        <dd>
+         <a href="#Producer.Events">
+          func (p *Producer) Events() chan Event
+         </a>
+        </dd>
+        <dd>
+         <a href="#Producer.Flush">
+          func (p *Producer) Flush(timeoutMs int) int
+         </a>
+        </dd>
+        <dd>
+         <a href="#Producer.GetMetadata">
+          func (p *Producer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)
+         </a>
+        </dd>
+        <dd>
+         <a href="#Producer.Len">
+          func (p *Producer) Len() int
+         </a>
+        </dd>
+        <dd>
+         <a href="#Producer.Produce">
+          func (p *Producer) Produce(msg *Message, deliveryChan chan Event) error
+         </a>
+        </dd>
+        <dd>
+         <a href="#Producer.ProduceChannel">
+          func (p *Producer) ProduceChannel() chan *Message
+         </a>
+        </dd>
+        <dd>
+         <a href="#Producer.QueryWatermarkOffsets">
+          func (p *Producer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
+         </a>
+        </dd>
+        <dd>
+         <a href="#Producer.String">
+          func (p *Producer) String() string
+         </a>
+        </dd>
+        <dd>
+         <a href="#RebalanceCb">
+          type RebalanceCb
+         </a>
+        </dd>
+        <dd>
+         <a href="#RevokedPartitions">
+          type RevokedPartitions
+         </a>
+        </dd>
+        <dd>
+         <a href="#RevokedPartitions.String">
+          func (e RevokedPartitions) String() string
+         </a>
+        </dd>
+        <dd>
+         <a href="#TimestampType">
+          type TimestampType
+         </a>
+        </dd>
+        <dd>
+         <a href="#TimestampType.String">
+          func (t TimestampType) String() string
+         </a>
+        </dd>
+        <dd>
+         <a href="#TopicMetadata">
+          type TopicMetadata
+         </a>
+        </dd>
+        <dd>
+         <a href="#TopicPartition">
+          type TopicPartition
+         </a>
+        </dd>
+        <dd>
+         <a href="#TopicPartition.String">
+          func (p TopicPartition) String() string
+         </a>
+        </dd>
+       </dl>
+      </div>
+      <!-- #manual-nav -->
+      <h4>
+       Package files
+      </h4>
+      <p>
+       <span style="font-size:90%">
+        <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/build_dynamic.go">
+         build_dynamic.go
+        </a>
+        <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/config.go">
+         config.go
+        </a>
+        <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go">
+         consumer.go
+        </a>
+        <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/error.go">
+         error.go
+        </a>
+        <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/event.go">
+         event.go
+        </a>
+        <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/generated_errors.go">
+         generated_errors.go
+        </a>
+        <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/handle.go">
+         handle.go
+        </a>
+        <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/kafka.go">
+         kafka.go
+        </a>
+        <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/message.go">
+         message.go
+        </a>
+        <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/metadata.go">
+         metadata.go
+        </a>
+        <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/misc.go">
+         misc.go
+        </a>
+        <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/producer.go">
+         producer.go
+        </a>
+        <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/testhelpers.go">
+         testhelpers.go
+        </a>
+       </span>
+      </p>
+     </div>
+     <!-- .expanded -->
+    </div>
+    <!-- #pkg-index -->
+    <div class="toggle" id="pkg-callgraph" style="display: none">
+     <div class="collapsed">
+      <h2 class="toggleButton" title="Click to show Internal Call Graph section">
+       Internal call graph ▹
+      </h2>
+     </div>
+     <!-- .expanded -->
+     <div class="expanded">
+      <h2 class="toggleButton" title="Click to hide Internal Call Graph section">
+       Internal call graph ▾
+      </h2>
+      <p>
+       In the call graph viewer below, each node
+			  is a function belonging to this package
+			  and its children are the functions it
+			  calls—perhaps dynamically.
+      </p>
+      <p>
+       The root nodes are the entry points of the
+			  package: functions that may be called from
+			  outside the package.
+			  There may be non-exported or anonymous
+			  functions among them if they are called
+			  dynamically from another package.
+      </p>
+      <p>
+       Click a node to visit that function's source code.
+			  From there you can visit its callers by
+			  clicking its declaring
+       <code>
+        func
+       </code>
+       token.
+      </p>
+      <p>
+       Functions may be omitted if they were
+			  determined to be unreachable in the
+			  particular programs or tests that were
+			  analyzed.
+      </p>
+      <!-- Zero means show all package entry points. -->
+      <ul class="treeview" id="callgraph-0" style="margin-left: 0.5in">
+      </ul>
+     </div>
+    </div>
+    <!-- #pkg-callgraph -->
+    <h2 id="pkg-constants">
+     Constants
+    </h2>
+    <pre>const (
+    <span class="comment">// TimestampNotAvailable indicates no timestamp was set, or not available due to lacking broker support</span>
+    <span id="TimestampNotAvailable">TimestampNotAvailable</span> = <a href="#TimestampType">TimestampType</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_TIMESTAMP_NOT_AVAILABLE">RD_KAFKA_TIMESTAMP_NOT_AVAILABLE</a>)
+    <span class="comment">// TimestampCreateTime indicates timestamp set by producer (source time)</span>
+    <span id="TimestampCreateTime">TimestampCreateTime</span> = <a href="#TimestampType">TimestampType</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_TIMESTAMP_CREATE_TIME">RD_KAFKA_TIMESTAMP_CREATE_TIME</a>)
+    <span class="comment">// TimestampLogAppendTime indicates timestamp set by broker (store time)</span>
+    <span id="TimestampLogAppendTime">TimestampLogAppendTime</span> = <a href="#TimestampType">TimestampType</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME">RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME</a>)
+)</pre>
+    <pre>const <span id="OffsetBeginning">OffsetBeginning</span> = <a href="#Offset">Offset</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_OFFSET_BEGINNING">RD_KAFKA_OFFSET_BEGINNING</a>)</pre>
+    <p>
+     Earliest offset (logical)
+    </p>
+    <pre>const <span id="OffsetEnd">OffsetEnd</span> = <a href="#Offset">Offset</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_OFFSET_END">RD_KAFKA_OFFSET_END</a>)</pre>
+    <p>
+     Latest offset (logical)
+    </p>
+    <pre>const <span id="OffsetInvalid">OffsetInvalid</span> = <a href="#Offset">Offset</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_OFFSET_INVALID">RD_KAFKA_OFFSET_INVALID</a>)</pre>
+    <p>
+     Invalid/unspecified offset
+    </p>
+    <pre>const <span id="OffsetStored">OffsetStored</span> = <a href="#Offset">Offset</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_OFFSET_STORED">RD_KAFKA_OFFSET_STORED</a>)</pre>
+    <p>
+     Use stored offset
+    </p>
+    <pre>const <span id="PartitionAny">PartitionAny</span> = <a href="http://golang.org/pkg/builtin/#int32">int32</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_PARTITION_UA">RD_KAFKA_PARTITION_UA</a>)</pre>
+    <p>
+     Any partition (for partitioning), or unspecified value (for all other cases)
+    </p>
+    <h2 id="LibraryVersion">
+     func
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/kafka.go?s=10095:10130#L292">
+      LibraryVersion
+     </a>
+    </h2>
+    <pre>func LibraryVersion() (<a href="http://golang.org/pkg/builtin/#int">int</a>, <a href="http://golang.org/pkg/builtin/#string">string</a>)</pre>
+    <p>
+     LibraryVersion returns the underlying librdkafka library version as a
+(version_int, version_str) tuple.
+    </p>
+    <h2 id="AssignedPartitions">
+     type
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/event.go?s=1621:1684#L49">
+      AssignedPartitions
+     </a>
+    </h2>
+    <pre>type AssignedPartitions struct {
+    Partitions []<a href="#TopicPartition">TopicPartition</a>
+}</pre>
+    <p>
+     AssignedPartitions consumer group rebalance event: assigned partition set
+    </p>
+    <h3 id="AssignedPartitions.String">
+     func (AssignedPartitions)
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/event.go?s=1686:1729#L53">
+      String
+     </a>
+    </h3>
+    <pre>func (e <a href="#AssignedPartitions">AssignedPartitions</a>) String() <a href="http://golang.org/pkg/builtin/#string">string</a></pre>
+    <h2 id="BrokerMetadata">
+     type
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/metadata.go?s=1266:1331#L37">
+      BrokerMetadata
+     </a>
+    </h2>
+    <pre>type BrokerMetadata struct {
+    ID   <a href="http://golang.org/pkg/builtin/#int32">int32</a>
+    Host <a href="http://golang.org/pkg/builtin/#string">string</a>
+    Port <a href="http://golang.org/pkg/builtin/#int">int</a>
+}</pre>
+    <p>
+     BrokerMetadata contains per-broker metadata
+    </p>
+    <h2 id="ConfigMap">
+     type
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/config.go?s=1172:1209#L31">
+      ConfigMap
+     </a>
+    </h2>
+    <pre>type ConfigMap map[<a href="http://golang.org/pkg/builtin/#string">string</a>]<a href="#ConfigValue">ConfigValue</a></pre>
+    <p>
+     ConfigMap is a map containing standard librdkafka configuration properties as documented in:
+     <a href="https://github.com/edenhill/librdkafka/tree/master/CONFIGURATION.md">
+      https://github.com/edenhill/librdkafka/tree/master/CONFIGURATION.md
+     </a>
+    </p>
+    <p>
+     The special property "default.topic.config" (optional) is a ConfigMap containing default topic
+configuration properties.
+    </p>
+    <h3 id="ConfigMap.Set">
+     func (ConfigMap)
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/config.go?s=1813:1852#L52">
+      Set
+     </a>
+    </h3>
+    <pre>func (m <a href="#ConfigMap">ConfigMap</a>) Set(kv <a href="http://golang.org/pkg/builtin/#string">string</a>) <a href="http://golang.org/pkg/builtin/#error">error</a></pre>
+    <p>
+     Set implements flag.Set (command line argument parser) as a convenience
+for `-X key=value` config.
+    </p>
+    <h3 id="ConfigMap.SetKey">
+     func (ConfigMap)
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/config.go?s=1370:1432#L36">
+      SetKey
+     </a>
+    </h3>
+    <pre>func (m <a href="#ConfigMap">ConfigMap</a>) SetKey(key <a href="http://golang.org/pkg/builtin/#string">string</a>, value <a href="#ConfigValue">ConfigValue</a>) <a href="http://golang.org/pkg/builtin/#error">error</a></pre>
+    <p>
+     SetKey sets configuration property key to value.
+For user convenience a key prefixed with {topic}. will be
+set on the "default.topic.config" sub-map.
+    </p>
+    <h2 id="ConfigValue">
+     type
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/config.go?s=846:874#L24">
+      ConfigValue
+     </a>
+    </h2>
+    <pre>type ConfigValue interface{}</pre>
+    <p>
+     ConfigValue supports the following types:
+    </p>
+    <pre>bool, int, string, any type with the standard String() interface
+</pre>
+    <h2 id="Consumer">
+     type
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=968:1205#L25">
+      Consumer
+     </a>
+    </h2>
+    <pre>type Consumer struct {
+    <span class="comment">// contains filtered or unexported fields</span>
+}</pre>
+    <p>
+     Consumer implements a High-level Apache Kafka Consumer instance
+    </p>
+    <h3 id="NewConsumer">
+     func
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=7696:7748#L242">
+      NewConsumer
+     </a>
+    </h3>
+    <pre>func NewConsumer(conf *<a href="#ConfigMap">ConfigMap</a>) (*<a href="#Consumer">Consumer</a>, <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
+    <p>
+     NewConsumer creates a new high-level Consumer instance.
+    </p>
+    <p>
+     Supported special configuration properties:
+    </p>
+    <pre>go.application.rebalance.enable (bool, false) - Forward rebalancing responsibility to application via the Events() channel.
+                                     If set to true the app must handle the AssignedPartitions and
+                                     RevokedPartitions events and call Assign() and Unassign()
+                                     respectively.
+go.events.channel.enable (bool, false) - Enable the Events() channel. Messages and events will be pushed on the Events() channel and the Poll() interface will be disabled. (Experimental)
+go.events.channel.size (int, 1000) - Events() channel size
+</pre>
+    <p>
+     WARNING: Due to the buffering nature of channels (and queues in general) the
+use of the events channel risks receiving outdated events and
+messages. Minimizing go.events.channel.size reduces the risk
+and number of outdated events and messages but does not eliminate
+the factor completely. With a channel size of 1 at most one
+event or message may be outdated.
+    </p>
+    <h3 id="Consumer.Assign">
+     func (*Consumer)
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=2641:2707#L82">
+      Assign
+     </a>
+    </h3>
+    <pre>func (c *<a href="#Consumer">Consumer</a>) Assign(partitions []<a href="#TopicPartition">TopicPartition</a>) (err <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
+    <p>
+     Assign an atomic set of partitions to consume.
+This replaces the current assignment.
+    </p>
+    <h3 id="Consumer.Close">
+     func (*Consumer)
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=6121:6159#L202">
+      Close
+     </a>
+    </h3>
+    <pre>func (c *<a href="#Consumer">Consumer</a>) Close() (err <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
+    <p>
+     Close Consumer instance.
+The object is no longer usable after this call.
+    </p>
+    <h3 id="Consumer.Commit">
+     func (*Consumer)
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=4853:4906#L159">
+      Commit
+     </a>
+    </h3>
+    <pre>func (c *<a href="#Consumer">Consumer</a>) Commit() ([]<a href="#TopicPartition">TopicPartition</a>, <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
+    <p>
+     Commit offsets for currently assigned partitions.
+This is a blocking call.
+Returns the committed offsets on success.
+    </p>
+    <h3 id="Consumer.CommitMessage">
+     func (*Consumer)
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=5070:5140#L166">
+      CommitMessage
+     </a>
+    </h3>
+    <pre>func (c *<a href="#Consumer">Consumer</a>) CommitMessage(m *<a href="#Message">Message</a>) ([]<a href="#TopicPartition">TopicPartition</a>, <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
+    <p>
+     CommitMessage commits offset based on the provided message.
+This is a blocking call.
+Returns the committed offsets on success.
+    </p>
+    <h3 id="Consumer.CommitOffsets">
+     func (*Consumer)
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=5473:5557#L178">
+      CommitOffsets
+     </a>
+    </h3>
+    <pre>func (c *<a href="#Consumer">Consumer</a>) CommitOffsets(offsets []<a href="#TopicPartition">TopicPartition</a>) ([]<a href="#TopicPartition">TopicPartition</a>, <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
+    <p>
+     CommitOffsets commits the provided list of offsets.
+This is a blocking call.
+Returns the committed offsets on success.
+    </p>
+    <h3 id="Consumer.Events">
+     func (*Consumer)
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=5981:6019#L196">
+      Events
+     </a>
+    </h3>
+    <pre>func (c *<a href="#Consumer">Consumer</a>) Events() chan <a href="#Event">Event</a></pre>
+    <p>
+     Events returns the Events channel (if enabled)
+    </p>
+    <h3 id="Consumer.GetMetadata">
+     func (*Consumer)
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=10490:10585#L347">
+      GetMetadata
+     </a>
+    </h3>
+    <pre>func (c *<a href="#Consumer">Consumer</a>) GetMetadata(topic *<a href="http://golang.org/pkg/builtin/#string">string</a>, allTopics <a href="http://golang.org/pkg/builtin/#bool">bool</a>, timeoutMs <a href="http://golang.org/pkg/builtin/#int">int</a>) (*<a href="#Metadata">Metadata</a>, <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
+    <p>
+     GetMetadata queries broker for cluster and topic metadata.
+If topic is non-nil only information about that topic is returned, else if
+allTopics is false only information about locally used topics is returned,
+else information about all topics is returned.
+    </p>
+    <h3 id="Consumer.Poll">
+     func (*Consumer)
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=5809:5861#L190">
+      Poll
+     </a>
+    </h3>
+    <pre>func (c *<a href="#Consumer">Consumer</a>) Poll(timeoutMs <a href="http://golang.org/pkg/builtin/#int">int</a>) (event <a href="#Event">Event</a>)</pre>
+    <p>
+     Poll the consumer for messages or events.
+    </p>
+    <h3 id="hdr-Will_block_for_at_most_timeoutMs_milliseconds">
+     Will block for at most timeoutMs milliseconds
+    </h3>
+    <p>
+     The following callbacks may be triggered:
+    </p>
+    <pre>Subscribe()'s rebalanceCb
+</pre>
+    <p>
+     Returns nil on timeout, else an Event
+    </p>
+    <h3 id="Consumer.QueryWatermarkOffsets">
+     func (*Consumer)
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=10748:10863#L353">
+      QueryWatermarkOffsets
+     </a>
+    </h3>
+    <pre>func (c *<a href="#Consumer">Consumer</a>) QueryWatermarkOffsets(topic <a href="http://golang.org/pkg/builtin/#string">string</a>, partition <a href="http://golang.org/pkg/builtin/#int32">int32</a>, timeoutMs <a href="http://golang.org/pkg/builtin/#int">int</a>) (low, high <a href="http://golang.org/pkg/builtin/#int64">int64</a>, err <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
+    <p>
+     QueryWatermarkOffsets returns the broker's low and high offsets for the given topic
+and partition.
+    </p>
+    <h3 id="Consumer.String">
+     func (*Consumer)
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=1272:1306#L36">
+      String
+     </a>
+    </h3>
+    <pre>func (c *<a href="#Consumer">Consumer</a>) String() <a href="http://golang.org/pkg/builtin/#string">string</a></pre>
+    <p>
+     String returns a human readable name for a Consumer instance
+    </p>
+    <h3 id="Consumer.Subscribe">
+     func (*Consumer)
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=1518:1591#L47">
+      Subscribe
+     </a>
+    </h3>
+    <pre>func (c *<a href="#Consumer">Consumer</a>) Subscribe(topic <a href="http://golang.org/pkg/builtin/#string">string</a>, rebalanceCb <a href="#RebalanceCb">RebalanceCb</a>) <a href="http://golang.org/pkg/builtin/#error">error</a></pre>
+    <p>
+     Subscribe to a single topic.
+This replaces the current subscription.
+    </p>
+    <h3 id="Consumer.SubscribeTopics">
+     func (*Consumer)
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=1758:1846#L53">
+      SubscribeTopics
+     </a>
+    </h3>
+    <pre>func (c *<a href="#Consumer">Consumer</a>) SubscribeTopics(topics []<a href="http://golang.org/pkg/builtin/#string">string</a>, rebalanceCb <a href="#RebalanceCb">RebalanceCb</a>) (err <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
+    <p>
+     SubscribeTopics subscribes to the provided list of topics.
+This replaces the current subscription.
+    </p>
+    <h3 id="Consumer.Unassign">
+     func (*Consumer)
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=3022:3063#L97">
+      Unassign
+     </a>
+    </h3>
+    <pre>func (c *<a href="#Consumer">Consumer</a>) Unassign() (err <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
+    <p>
+     Unassign the current set of partitions to consume.
+    </p>
+    <h3 id="Consumer.Unsubscribe">
+     func (*Consumer)
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=2451:2495#L75">
+      Unsubscribe
+     </a>
+    </h3>
+    <pre>func (c *<a href="#Consumer">Consumer</a>) Unsubscribe() (err <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
+    <p>
+     Unsubscribe from the current subscription, if any.
+    </p>
+    <h2 id="Error">
+     type
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/error.go?s=862:912#L19">
+      Error
+     </a>
+    </h2>
+    <pre>type Error struct {
+    <span class="comment">// contains filtered or unexported fields</span>
+}</pre>
+    <p>
+     Error provides a Kafka-specific error container
+    </p>
+    <h3 id="Error.Code">
+     func (Error)
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/error.go?s=1649:1680#L57">
+      Code
+     </a>
+    </h3>
+    <pre>func (e <a href="#Error">Error</a>) Code() <a href="#ErrorCode">ErrorCode</a></pre>
+    <p>
+     Code returns the ErrorCode of an Error
+    </p>
+    <h3 id="Error.Error">
+     func (Error)
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/error.go?s=1392:1421#L44">
+      Error
+     </a>
+    </h3>
+    <pre>func (e <a href="#Error">Error</a>) Error() <a href="http://golang.org/pkg/builtin/#string">string</a></pre>
+    <p>
+     Error returns a human readable representation of an Error.
+Same as Error.String().
+    </p>
+    <h3 id="Error.String">
+     func (Error)
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/error.go?s=1508:1538#L49">
+      String
+     </a>
+    </h3>
+    <pre>func (e <a href="#Error">Error</a>) String() <a href="http://golang.org/pkg/builtin/#string">string</a></pre>
+    <p>
+     String returns a human readable representation of an Error
+    </p>
+    <h2 id="ErrorCode">
+     type
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/generated_errors.go?s=328:346#L1">
+      ErrorCode
+     </a>
+    </h2>
+    <pre>type ErrorCode <a href="http://golang.org/pkg/builtin/#int">int</a></pre>
+    <p>
+     ErrorCode is the integer representation of local and broker error codes
+    </p>
+    <pre>const (
+    <span class="comment">// ErrBadMsg Local: Bad message format</span>
+    <span id="ErrBadMsg">ErrBadMsg</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__BAD_MSG">RD_KAFKA_RESP_ERR__BAD_MSG</a>)
+    <span class="comment">// ErrBadCompression Local: Invalid compressed data</span>
+    <span id="ErrBadCompression">ErrBadCompression</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__BAD_COMPRESSION">RD_KAFKA_RESP_ERR__BAD_COMPRESSION</a>)
+    <span class="comment">// ErrDestroy Local: Broker handle destroyed</span>
+    <span id="ErrDestroy">ErrDestroy</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__DESTROY">RD_KAFKA_RESP_ERR__DESTROY</a>)
+    <span class="comment">// ErrFail Local: Communication failure with broker</span>
+    <span id="ErrFail">ErrFail</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__FAIL">RD_KAFKA_RESP_ERR__FAIL</a>)
+    <span class="comment">// ErrTransport Local: Broker transport failure</span>
+    <span id="ErrTransport">ErrTransport</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__TRANSPORT">RD_KAFKA_RESP_ERR__TRANSPORT</a>)
+    <span class="comment">// ErrCritSysResource Local: Critical system resource failure</span>
+    <span id="ErrCritSysResource">ErrCritSysResource</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE">RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE</a>)
+    <span class="comment">// ErrResolve Local: Host resolution failure</span>
+    <span id="ErrResolve">ErrResolve</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__RESOLVE">RD_KAFKA_RESP_ERR__RESOLVE</a>)
+    <span class="comment">// ErrMsgTimedOut Local: Message timed out</span>
+    <span id="ErrMsgTimedOut">ErrMsgTimedOut</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__MSG_TIMED_OUT">RD_KAFKA_RESP_ERR__MSG_TIMED_OUT</a>)
+    <span class="comment">// ErrPartitionEOF Broker: No more messages</span>
+    <span id="ErrPartitionEOF">ErrPartitionEOF</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__PARTITION_EOF">RD_KAFKA_RESP_ERR__PARTITION_EOF</a>)
+    <span class="comment">// ErrUnknownPartition Local: Unknown partition</span>
+    <span id="ErrUnknownPartition">ErrUnknownPartition</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION">RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION</a>)
+    <span class="comment">// ErrFs Local: File or filesystem error</span>
+    <span id="ErrFs">ErrFs</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__FS">RD_KAFKA_RESP_ERR__FS</a>)
+    <span class="comment">// ErrUnknownTopic Local: Unknown topic</span>
+    <span id="ErrUnknownTopic">ErrUnknownTopic</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC">RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC</a>)
+    <span class="comment">// ErrAllBrokersDown Local: All broker connections are down</span>
+    <span id="ErrAllBrokersDown">ErrAllBrokersDown</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN">RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN</a>)
+    <span class="comment">// ErrInvalidArg Local: Invalid argument or configuration</span>
+    <span id="ErrInvalidArg">ErrInvalidArg</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__INVALID_ARG">RD_KAFKA_RESP_ERR__INVALID_ARG</a>)
+    <span class="comment">// ErrTimedOut Local: Timed out</span>
+    <span id="ErrTimedOut">ErrTimedOut</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__TIMED_OUT">RD_KAFKA_RESP_ERR__TIMED_OUT</a>)
+    <span class="comment">// ErrQueueFull Local: Queue full</span>
+    <span id="ErrQueueFull">ErrQueueFull</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__QUEUE_FULL">RD_KAFKA_RESP_ERR__QUEUE_FULL</a>)
+    <span class="comment">// ErrIsrInsuff Local: ISR count insufficient</span>
+    <span id="ErrIsrInsuff">ErrIsrInsuff</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__ISR_INSUFF">RD_KAFKA_RESP_ERR__ISR_INSUFF</a>)
+    <span class="comment">// ErrNodeUpdate Local: Broker node update</span>
+    <span id="ErrNodeUpdate">ErrNodeUpdate</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__NODE_UPDATE">RD_KAFKA_RESP_ERR__NODE_UPDATE</a>)
+    <span class="comment">// ErrSsl Local: SSL error</span>
+    <span id="ErrSsl">ErrSsl</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__SSL">RD_KAFKA_RESP_ERR__SSL</a>)
+    <span class="comment">// ErrWaitCoord Local: Waiting for coordinator</span>
+    <span id="ErrWaitCoord">ErrWaitCoord</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__WAIT_COORD">RD_KAFKA_RESP_ERR__WAIT_COORD</a>)
+    <span class="comment">// ErrUnknownGroup Local: Unknown group</span>
+    <span id="ErrUnknownGroup">ErrUnknownGroup</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__UNKNOWN_GROUP">RD_KAFKA_RESP_ERR__UNKNOWN_GROUP</a>)
+    <span class="comment">// ErrInProgress Local: Operation in progress</span>
+    <span id="ErrInProgress">ErrInProgress</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__IN_PROGRESS">RD_KAFKA_RESP_ERR__IN_PROGRESS</a>)
+    <span class="comment">// ErrPrevInProgress Local: Previous operation in progress</span>
+    <span id="ErrPrevInProgress">ErrPrevInProgress</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS">RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS</a>)
+    <span class="comment">// ErrExistingSubscription Local: Existing subscription</span>
+    <span id="ErrExistingSubscription">ErrExistingSubscription</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION">RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION</a>)
+    <span class="comment">// ErrAssignPartitions Local: Assign partitions</span>
+    <span id="ErrAssignPartitions">ErrAssignPartitions</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS">RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS</a>)
+    <span class="comment">// ErrRevokePartitions Local: Revoke partitions</span>
+    <span id="ErrRevokePartitions">ErrRevokePartitions</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS">RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS</a>)
+    <span class="comment">// ErrConflict Local: Conflicting use</span>
+    <span id="ErrConflict">ErrConflict</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__CONFLICT">RD_KAFKA_RESP_ERR__CONFLICT</a>)
+    <span class="comment">// ErrState Local: Erroneous state</span>
+    <span id="ErrState">ErrState</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__STATE">RD_KAFKA_RESP_ERR__STATE</a>)
+    <span class="comment">// ErrUnknownProtocol Local: Unknown protocol</span>
+    <span id="ErrUnknownProtocol">ErrUnknownProtocol</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL">RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL</a>)
+    <span class="comment">// ErrNotImplemented Local: Not implemented</span>
+    <span id="ErrNotImplemented">ErrNotImplemented</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED">RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED</a>)
+    <span class="comment">// ErrAuthentication Local: Authentication failure</span>
+    <span id="ErrAuthentication">ErrAuthentication</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__AUTHENTICATION">RD_KAFKA_RESP_ERR__AUTHENTICATION</a>)
+    <span class="comment">// ErrNoOffset Local: No offset stored</span>
+    <span id="ErrNoOffset">ErrNoOffset</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__NO_OFFSET">RD_KAFKA_RESP_ERR__NO_OFFSET</a>)
+    <span class="comment">// ErrOutdated Local: Outdated</span>
+    <span id="ErrOutdated">ErrOutdated</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__OUTDATED">RD_KAFKA_RESP_ERR__OUTDATED</a>)
+    <span class="comment">// ErrTimedOutQueue Local: Timed out in queue</span>
+    <span id="ErrTimedOutQueue">ErrTimedOutQueue</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE">RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE</a>)
+    <span class="comment">// ErrUnknown Unknown broker error</span>
+    <span id="ErrUnknown">ErrUnknown</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_UNKNOWN">RD_KAFKA_RESP_ERR_UNKNOWN</a>)
+    <span class="comment">// ErrNoError Success</span>
+    <span id="ErrNoError">ErrNoError</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_NO_ERROR">RD_KAFKA_RESP_ERR_NO_ERROR</a>)
+    <span class="comment">// ErrOffsetOutOfRange Broker: Offset out of range</span>
+    <span id="ErrOffsetOutOfRange">ErrOffsetOutOfRange</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE">RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE</a>)
+    <span class="comment">// ErrInvalidMsg Broker: Invalid message</span>
+    <span id="ErrInvalidMsg">ErrInvalidMsg</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_INVALID_MSG">RD_KAFKA_RESP_ERR_INVALID_MSG</a>)
+    <span class="comment">// ErrUnknownTopicOrPart Broker: Unknown topic or partition</span>
+    <span id="ErrUnknownTopicOrPart">ErrUnknownTopicOrPart</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART">RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART</a>)
+    <span class="comment">// ErrInvalidMsgSize Broker: Invalid message size</span>
+    <span id="ErrInvalidMsgSize">ErrInvalidMsgSize</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE">RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE</a>)
+    <span class="comment">// ErrLeaderNotAvailable Broker: Leader not available</span>
+    <span id="ErrLeaderNotAvailable">ErrLeaderNotAvailable</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE">RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE</a>)
+    <span class="comment">// ErrNotLeaderForPartition Broker: Not leader for partition</span>
+    <span id="ErrNotLeaderForPartition">ErrNotLeaderForPartition</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION">RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION</a>)
+    <span class="comment">// ErrRequestTimedOut Broker: Request timed out</span>
+    <span id="ErrRequestTimedOut">ErrRequestTimedOut</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT">RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT</a>)
+    <span class="comment">// ErrBrokerNotAvailable Broker: Broker not available</span>
+    <span id="ErrBrokerNotAvailable">ErrBrokerNotAvailable</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE">RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE</a>)
+    <span class="comment">// ErrReplicaNotAvailable Broker: Replica not available</span>
+    <span id="ErrReplicaNotAvailable">ErrReplicaNotAvailable</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE">RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE</a>)
+    <span class="comment">// ErrMsgSizeTooLarge Broker: Message size too large</span>
+    <span id="ErrMsgSizeTooLarge">ErrMsgSizeTooLarge</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE">RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE</a>)
+    <span class="comment">// ErrStaleCtrlEpoch Broker: StaleControllerEpochCode</span>
+    <span id="ErrStaleCtrlEpoch">ErrStaleCtrlEpoch</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_STALE_CTRL_EPOCH">RD_KAFKA_RESP_ERR_STALE_CTRL_EPOCH</a>)
+    <span class="comment">// ErrOffsetMetadataTooLarge Broker: Offset metadata string too large</span>
+    <span id="ErrOffsetMetadataTooLarge">ErrOffsetMetadataTooLarge</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE">RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE</a>)
+    <span class="comment">// ErrNetworkException Broker: Broker disconnected before response received</span>
+    <span id="ErrNetworkException">ErrNetworkException</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_NETWORK_EXCEPTION">RD_KAFKA_RESP_ERR_NETWORK_EXCEPTION</a>)
+    <span class="comment">// ErrGroupLoadInProgress Broker: Group coordinator load in progress</span>
+    <span id="ErrGroupLoadInProgress">ErrGroupLoadInProgress</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS">RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS</a>)
+    <span class="comment">// ErrGroupCoordinatorNotAvailable Broker: Group coordinator not available</span>
+    <span id="ErrGroupCoordinatorNotAvailable">ErrGroupCoordinatorNotAvailable</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE">RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE</a>)
+    <span class="comment">// ErrNotCoordinatorForGroup Broker: Not coordinator for group</span>
+    <span id="ErrNotCoordinatorForGroup">ErrNotCoordinatorForGroup</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP">RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP</a>)
+    <span class="comment">// ErrTopicException Broker: Invalid topic</span>
+    <span id="ErrTopicException">ErrTopicException</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION">RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION</a>)
+    <span class="comment">// ErrRecordListTooLarge Broker: Message batch larger than configured server segment size</span>
+    <span id="ErrRecordListTooLarge">ErrRecordListTooLarge</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE">RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE</a>)
+    <span class="comment">// ErrNotEnoughReplicas Broker: Not enough in-sync replicas</span>
+    <span id="ErrNotEnoughReplicas">ErrNotEnoughReplicas</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS">RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS</a>)
+    <span class="comment">// ErrNotEnoughReplicasAfterAppend Broker: Message(s) written to insufficient number of in-sync replicas</span>
+    <span id="ErrNotEnoughReplicasAfterAppend">ErrNotEnoughReplicasAfterAppend</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND">RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND</a>)
+    <span class="comment">// ErrInvalidRequiredAcks Broker: Invalid required acks value</span>
+    <span id="ErrInvalidRequiredAcks">ErrInvalidRequiredAcks</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS">RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS</a>)
+    <span class="comment">// ErrIllegalGeneration Broker: Specified group generation id is not valid</span>
+    <span id="ErrIllegalGeneration">ErrIllegalGeneration</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION">RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION</a>)
+    <span class="comment">// ErrInconsistentGroupProtocol Broker: Inconsistent group protocol</span>
+    <span id="ErrInconsistentGroupProtocol">ErrInconsistentGroupProtocol</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_INCONSISTENT_GROUP_PROTOCOL">RD_KAFKA_RESP_ERR_INCONSISTENT_GROUP_PROTOCOL</a>)
+    <span class="comment">// ErrInvalidGroupID Broker: Invalid group.id</span>
+    <span id="ErrInvalidGroupID">ErrInvalidGroupID</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_INVALID_GROUP_ID">RD_KAFKA_RESP_ERR_INVALID_GROUP_ID</a>)
+    <span class="comment">// ErrUnknownMemberID Broker: Unknown member</span>
+    <span id="ErrUnknownMemberID">ErrUnknownMemberID</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID">RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID</a>)
+    <span class="comment">// ErrInvalidSessionTimeout Broker: Invalid session timeout</span>
+    <span id="ErrInvalidSessionTimeout">ErrInvalidSessionTimeout</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT">RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT</a>)
+    <span class="comment">// ErrRebalanceInProgress Broker: Group rebalance in progress</span>
+    <span id="ErrRebalanceInProgress">ErrRebalanceInProgress</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS">RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS</a>)
+    <span class="comment">// ErrInvalidCommitOffsetSize Broker: Commit offset data size is not valid</span>
+    <span id="ErrInvalidCommitOffsetSize">ErrInvalidCommitOffsetSize</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE">RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE</a>)
+    <span class="comment">// ErrTopicAuthorizationFailed Broker: Topic authorization failed</span>
+    <span id="ErrTopicAuthorizationFailed">ErrTopicAuthorizationFailed</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED">RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED</a>)
+    <span class="comment">// ErrGroupAuthorizationFailed Broker: Group authorization failed</span>
+    <span id="ErrGroupAuthorizationFailed">ErrGroupAuthorizationFailed</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED">RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED</a>)
+    <span class="comment">// ErrClusterAuthorizationFailed Broker: Cluster authorization failed</span>
+    <span id="ErrClusterAuthorizationFailed">ErrClusterAuthorizationFailed</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED">RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED</a>)
+    <span class="comment">// ErrInvalidTimestamp Broker: Invalid timestamp</span>
+    <span id="ErrInvalidTimestamp">ErrInvalidTimestamp</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_INVALID_TIMESTAMP">RD_KAFKA_RESP_ERR_INVALID_TIMESTAMP</a>)
+    <span class="comment">// ErrUnsupportedSaslMechanism Broker: Unsupported SASL mechanism</span>
+    <span id="ErrUnsupportedSaslMechanism">ErrUnsupportedSaslMechanism</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_UNSUPPORTED_SASL_MECHANISM">RD_KAFKA_RESP_ERR_UNSUPPORTED_SASL_MECHANISM</a>)
+    <span class="comment">// ErrIllegalSaslState Broker: Request not valid in current SASL state</span>
+    <span id="ErrIllegalSaslState">ErrIllegalSaslState</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_ILLEGAL_SASL_STATE">RD_KAFKA_RESP_ERR_ILLEGAL_SASL_STATE</a>)
+    <span class="comment">// ErrUnsupportedVersion Broker: API version not supported</span>
+    <span id="ErrUnsupportedVersion">ErrUnsupportedVersion</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION">RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION</a>)
+)</pre>
+    <h3 id="ErrorCode.String">
+     func (ErrorCode)
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/generated_errors.go?s=415:449#L4">
+      String
+     </a>
+    </h3>
+    <pre>func (c <a href="#ErrorCode">ErrorCode</a>) String() <a href="http://golang.org/pkg/builtin/#string">string</a></pre>
+    <p>
+     String returns a human readable representation of an error code
+    </p>
+    <h2 id="Event">
+     type
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/event.go?s=1412:1517#L41">
+      Event
+     </a>
+    </h2>
+    <pre>type Event interface {
+    <span class="comment">// String returns a human-readable representation of the event</span>
+    String() <a href="http://golang.org/pkg/builtin/#string">string</a>
+}</pre>
+    <p>
+     Event generic interface
+    </p>
+    <h2 id="Handle">
+     type
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/handle.go?s=822:868#L23">
+      Handle
+     </a>
+    </h2>
+    <pre>type Handle interface {
+    <span class="comment">// contains filtered or unexported methods</span>
+}</pre>
+    <p>
+     Handle represents a generic client handle containing common parts for
+both Producer and Consumer.
+    </p>
+    <h2 id="Message">
+     type
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/message.go?s=2465:2649#L76">
+      Message
+     </a>
+    </h2>
+    <pre>type Message struct {
+    TopicPartition <a href="#TopicPartition">TopicPartition</a>
+    Value          []<a href="http://golang.org/pkg/builtin/#byte">byte</a>
+    Key            []<a href="http://golang.org/pkg/builtin/#byte">byte</a>
+    Timestamp      <a href="http://golang.org/pkg/time/">time</a>.<a href="http://golang.org/pkg/time/#Time">Time</a>
+    TimestampType  <a href="#TimestampType">TimestampType</a>
+    Opaque         interface{}
+}</pre>
+    <p>
+     Message represents a Kafka message
+    </p>
+    <h3 id="Message.String">
+     func (*Message)
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/message.go?s=2755:2788#L87">
+      String
+     </a>
+    </h3>
+    <pre>func (m *<a href="#Message">Message</a>) String() <a href="http://golang.org/pkg/builtin/#string">string</a></pre>
+    <p>
+     String returns a human readable representation of a Message.
+Key and payload are not represented.
+    </p>
+    <h2 id="Metadata">
+     type
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/metadata.go?s=1723:1842#L60">
+      Metadata
+     </a>
+    </h2>
+    <pre>type Metadata struct {
+    Brokers []<a href="#BrokerMetadata">BrokerMetadata</a>
+    Topics  map[<a href="http://golang.org/pkg/builtin/#string">string</a>]<a href="#TopicMetadata">TopicMetadata</a>
+
+    OriginatingBroker <a href="#BrokerMetadata">BrokerMetadata</a>
+}</pre>
+    <p>
+     Metadata contains broker and topic metadata for all (matching) topics
+    </p>
+    <h2 id="Offset">
+     type
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/kafka.go?s=6428:6445#L149">
+      Offset
+     </a>
+    </h2>
+    <pre>type Offset <a href="http://golang.org/pkg/builtin/#int64">int64</a></pre>
+    <p>
+     Offset type (int64) with support for canonical names
+    </p>
+    <h3 id="NewOffset">
+     func
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/kafka.go?s=7384:7434#L192">
+      NewOffset
+     </a>
+    </h3>
+    <pre>func NewOffset(offset interface{}) (<a href="#Offset">Offset</a>, <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
+    <p>
+     NewOffset creates a new Offset using the provided logical string, or an
+absolute int64 offset value.
+Logical offsets: "beginning", "earliest", "end", "latest", "unset", "invalid", "stored"
+    </p>
+    <h3 id="OffsetTail">
+     func
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/kafka.go?s=8170:8215#L231">
+      OffsetTail
+     </a>
+    </h3>
+    <pre>func OffsetTail(relativeOffset <a href="#Offset">Offset</a>) <a href="#Offset">Offset</a></pre>
+    <p>
+     OffsetTail returns the logical offset relativeOffset from the current end of the partition
+    </p>
+    <h3 id="Offset.Set">
+     func (Offset)
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/kafka.go?s=7064:7109#L179">
+      Set
+     </a>
+    </h3>
+    <pre>func (o <a href="#Offset">Offset</a>) Set(offset interface{}) <a href="http://golang.org/pkg/builtin/#error">error</a></pre>
+    <p>
+     Set offset value, see NewOffset()
+    </p>
+    <h3 id="Offset.String">
+     func (Offset)
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/kafka.go?s=6776:6807#L163">
+      String
+     </a>
+    </h3>
+    <pre>func (o <a href="#Offset">Offset</a>) String() <a href="http://golang.org/pkg/builtin/#string">string</a></pre>
+    <h2 id="OffsetsCommitted">
+     type
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/event.go?s=2266:2339#L74">
+      OffsetsCommitted
+     </a>
+    </h2>
+    <pre>type OffsetsCommitted struct {
+    Error   <a href="http://golang.org/pkg/builtin/#error">error</a>
+    Offsets []<a href="#TopicPartition">TopicPartition</a>
+}</pre>
+    <p>
+     OffsetsCommitted reports committed offsets
+    </p>
+    <h3 id="OffsetsCommitted.String">
+     func (OffsetsCommitted)
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/event.go?s=2341:2382#L79">
+      String
+     </a>
+    </h3>
+    <pre>func (o <a href="#OffsetsCommitted">OffsetsCommitted</a>) String() <a href="http://golang.org/pkg/builtin/#string">string</a></pre>
+    <h2 id="PartitionEOF">
+     type
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/event.go?s=2091:2123#L67">
+      PartitionEOF
+     </a>
+    </h2>
+    <pre>type PartitionEOF <a href="#TopicPartition">TopicPartition</a></pre>
+    <p>
+     PartitionEOF consumer reached end of partition
+    </p>
+    <h3 id="PartitionEOF.String">
+     func (PartitionEOF)
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/event.go?s=2125:2162#L69">
+      String
+     </a>
+    </h3>
+    <pre>func (p <a href="#PartitionEOF">PartitionEOF</a>) String() <a href="http://golang.org/pkg/builtin/#string">string</a></pre>
+    <h2 id="PartitionMetadata">
+     type
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/metadata.go?s=1386:1503#L44">
+      PartitionMetadata
+     </a>
+    </h2>
+    <pre>type PartitionMetadata struct {
+    ID       <a href="http://golang.org/pkg/builtin/#int32">int32</a>
+    Error    <a href="#Error">Error</a>
+    Leader   <a href="http://golang.org/pkg/builtin/#int32">int32</a>
+    Replicas []<a href="http://golang.org/pkg/builtin/#int32">int32</a>
+    Isrs     []<a href="http://golang.org/pkg/builtin/#int32">int32</a>
+}</pre>
+    <p>
+     PartitionMetadata contains per-partition metadata
+    </p>
+    <h2 id="Producer">
+     type
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/producer.go?s=1101:1270#L30">
+      Producer
+     </a>
+    </h2>
+    <pre>type Producer struct {
+    <span class="comment">// contains filtered or unexported fields</span>
+}</pre>
+    <p>
+     Producer implements a High-level Apache Kafka Producer instance
+    </p>
+    <h3 id="NewProducer">
+     func
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/producer.go?s=6249:6301#L203">
+      NewProducer
+     </a>
+    </h3>
+    <pre>func NewProducer(conf *<a href="#ConfigMap">ConfigMap</a>) (*<a href="#Producer">Producer</a>, <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
+    <p>
+     NewProducer creates a new high-level Producer instance.
+    </p>
+    <p>
+     conf is a *ConfigMap with standard librdkafka configuration properties, see here: https://github.com/edenhill/librdkafka/tree/master/CONFIGURATION.md
+    </p>
+    <p>
+     Supported special configuration properties:
+    </p>
+    <pre>go.batch.producer (bool, false) - Enable batch producer (experimental for increased performance).
+                                  These batches do not relate to Kafka message batches in any way.
+go.delivery.reports (bool, true) - Forward per-message delivery reports to the
+                                   Events() channel.
+go.produce.channel.size (int, 1000000) - ProduceChannel() buffer size (in number of messages)
+</pre>
+    <h3 id="Producer.Close">
+     func (*Producer)
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/producer.go?s=5283:5309#L174">
+      Close
+     </a>
+    </h3>
+    <pre>func (p *<a href="#Producer">Producer</a>) Close()</pre>
+    <p>
+     Close a Producer instance.
+The Producer object or its channels are no longer usable after this call.
+    </p>
+    <h3 id="Producer.Events">
+     func (*Producer)
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/producer.go?s=4035:4073#L134">
+      Events
+     </a>
+    </h3>
+    <pre>func (p *<a href="#Producer">Producer</a>) Events() chan <a href="#Event">Event</a></pre>
+    <p>
+     Events returns the Events channel (read)
+    </p>
+    <h3 id="Producer.Flush">
+     func (*Producer)
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/producer.go?s=4779:4822#L154">
+      Flush
+     </a>
+    </h3>
+    <pre>func (p *<a href="#Producer">Producer</a>) Flush(timeoutMs <a href="http://golang.org/pkg/builtin/#int">int</a>) <a href="http://golang.org/pkg/builtin/#int">int</a></pre>
+    <p>
+     Flush and wait for outstanding messages and requests to complete delivery.
+Includes messages on ProduceChannel.
+Runs until value reaches zero or on timeoutMs.
+Returns the number of outstanding events still un-flushed.
+    </p>
+    <h3 id="Producer.GetMetadata">
+     func (*Producer)
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/producer.go?s=9852:9947#L354">
+      GetMetadata
+     </a>
+    </h3>
+    <pre>func (p *<a href="#Producer">Producer</a>) GetMetadata(topic *<a href="http://golang.org/pkg/builtin/#string">string</a>, allTopics <a href="http://golang.org/pkg/builtin/#bool">bool</a>, timeoutMs <a href="http://golang.org/pkg/builtin/#int">int</a>) (*<a href="#Metadata">Metadata</a>, <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
+    <p>
+     GetMetadata queries broker for cluster and topic metadata.
+If topic is non-nil only information about that topic is returned, else if
+allTopics is false only information about locally used topics is returned,
+else information about all topics is returned.
+    </p>
+    <h3 id="Producer.Len">
+     func (*Producer)
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/producer.go?s=4429:4457#L146">
+      Len
+     </a>
+    </h3>
+    <pre>func (p *<a href="#Producer">Producer</a>) Len() <a href="http://golang.org/pkg/builtin/#int">int</a></pre>
+    <p>
+     Len returns the number of messages and requests waiting to be transmitted to the broker
+as well as delivery reports queued for the application.
+Includes messages on ProduceChannel.
+    </p>
+    <h3 id="Producer.Produce">
+     func (*Producer)
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/producer.go?s=3205:3276#L109">
+      Produce
+     </a>
+    </h3>
+    <pre>func (p *<a href="#Producer">Producer</a>) Produce(msg *<a href="#Message">Message</a>, deliveryChan chan <a href="#Event">Event</a>) <a href="http://golang.org/pkg/builtin/#error">error</a></pre>
+    <p>
+     Produce single message.
+This is an asynchronous call that enqueues the message on the internal
+transmit queue, thus returning immediately.
+The delivery report will be sent on the provided deliveryChan if specified,
+or on the Producer object's Events() channel if not.
+Returns an error if message could not be enqueued.
+    </p>
+    <h3 id="Producer.ProduceChannel">
+     func (*Producer)
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/producer.go?s=4159:4208#L139">
+      ProduceChannel
+     </a>
+    </h3>
+    <pre>func (p *<a href="#Producer">Producer</a>) ProduceChannel() chan *<a href="#Message">Message</a></pre>
+    <p>
+     ProduceChannel returns the produce *Message channel (write)
+    </p>
+    <h3 id="Producer.QueryWatermarkOffsets">
+     func (*Producer)
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/producer.go?s=10110:10225#L360">
+      QueryWatermarkOffsets
+     </a>
+    </h3>
+    <pre>func (p *<a href="#Producer">Producer</a>) QueryWatermarkOffsets(topic <a href="http://golang.org/pkg/builtin/#string">string</a>, partition <a href="http://golang.org/pkg/builtin/#int32">int32</a>, timeoutMs <a href="http://golang.org/pkg/builtin/#int">int</a>) (low, high <a href="http://golang.org/pkg/builtin/#int64">int64</a>, err <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
+    <p>
+     QueryWatermarkOffsets returns the broker's low and high offsets for the given topic
+and partition.
+    </p>
+    <h3 id="Producer.String">
+     func (*Producer)
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/producer.go?s=1336:1370#L40">
+      String
+     </a>
+    </h3>
+    <pre>func (p *<a href="#Producer">Producer</a>) String() <a href="http://golang.org/pkg/builtin/#string">string</a></pre>
+    <p>
+     String returns a human readable name for a Producer instance
+    </p>
+    <h2 id="RebalanceCb">
+     type
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=854:899#L22">
+      RebalanceCb
+     </a>
+    </h2>
+    <pre>type RebalanceCb func(*<a href="#Consumer">Consumer</a>, <a href="#Event">Event</a>) <a href="http://golang.org/pkg/builtin/#error">error</a></pre>
+    <p>
+     RebalanceCb provides a per-Subscribe*() rebalance event callback.
+The passed Event will be either AssignedPartitions or RevokedPartitions
+    </p>
+    <h2 id="RevokedPartitions">
+     type
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/event.go?s=1870:1932#L58">
+      RevokedPartitions
+     </a>
+    </h2>
+    <pre>type RevokedPartitions struct {
+    Partitions []<a href="#TopicPartition">TopicPartition</a>
+}</pre>
+    <p>
+     RevokedPartitions consumer group rebalance event: revoked partition set
+    </p>
+    <h3 id="RevokedPartitions.String">
+     func (RevokedPartitions)
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/event.go?s=1934:1976#L62">
+      String
+     </a>
+    </h3>
+    <pre>func (e <a href="#RevokedPartitions">RevokedPartitions</a>) String() <a href="http://golang.org/pkg/builtin/#string">string</a></pre>
+    <h2 id="TimestampType">
+     type
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/message.go?s=1671:1693#L51">
+      TimestampType
+     </a>
+    </h2>
+    <pre>type TimestampType <a href="http://golang.org/pkg/builtin/#int">int</a></pre>
+    <p>
+     TimestampType is the Message timestamp type or source
+    </p>
+    <h3 id="TimestampType.String">
+     func (TimestampType)
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/message.go?s=2187:2225#L62">
+      String
+     </a>
+    </h3>
+    <pre>func (t <a href="#TimestampType">TimestampType</a>) String() <a href="http://golang.org/pkg/builtin/#string">string</a></pre>
+    <h2 id="TopicMetadata">
+     type
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/metadata.go?s=1550:1648#L53">
+      TopicMetadata
+     </a>
+    </h2>
+    <pre>type TopicMetadata struct {
+    Topic      <a href="http://golang.org/pkg/builtin/#string">string</a>
+    Partitions []<a href="#PartitionMetadata">PartitionMetadata</a>
+    Error      <a href="#Error">Error</a>
+}</pre>
+    <p>
+     TopicMetadata contains per-topic metadata
+    </p>
+    <h2 id="TopicPartition">
+     type
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/kafka.go?s=8377:8478#L236">
+      TopicPartition
+     </a>
+    </h2>
+    <pre>type TopicPartition struct {
+    Topic     *<a href="http://golang.org/pkg/builtin/#string">string</a>
+    Partition <a href="http://golang.org/pkg/builtin/#int32">int32</a>
+    Offset    <a href="#Offset">Offset</a>
+    Error     <a href="http://golang.org/pkg/builtin/#error">error</a>
+}</pre>
+    <p>
+     TopicPartition is a generic placeholder for a Topic+Partition and optionally Offset.
+    </p>
+    <h3 id="TopicPartition.String">
+     func (TopicPartition)
+     <a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/kafka.go?s=8480:8519#L243">
+      String
+     </a>
+    </h3>
+    <pre>func (p <a href="#TopicPartition">TopicPartition</a>) String() <a href="http://golang.org/pkg/builtin/#string">string</a></pre>
+    <div id="footer">
+     Build version go1.6.
+     <br>
+      Except as
+      <a href="https://developers.google.com/site-policies#restrictions">
+       noted
+      </a>
+      ,
+the content of this page is licensed under the
+Creative Commons Attribution 3.0 License,
+and code is licensed under a
+      <a href="http://golang.org/LICENSE">
+       BSD license
+      </a>
+      .
+      <br>
+       <a href="http://golang.org/doc/tos.html">
+        Terms of Service
+       </a>
+       |
+       <a href="http://www.google.com/intl/en/policies/privacy/">
+        Privacy Policy
+       </a>
+      </br>
+     </br>
+    </div>
+   </div>
+   <!-- .container -->
+  </div>
+  <!-- #page -->
+  <!-- TODO(adonovan): load these from <head> using "defer" attribute? -->
+  <script src="http://golang.org/lib/godoc/jquery.js" type="text/javascript">
+  </script>
+  <script src="http://golang.org/lib/godoc/jquery.treeview.js" type="text/javascript">
+  </script>
+  <script src="http://golang.org/lib/godoc/jquery.treeview.edit.js" type="text/javascript">
+  </script>
+  <script src="http://golang.org/lib/godoc/godocs.js" type="text/javascript">
+  </script>
+ </body>
+</html>
+
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/build_dynamic.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/build_dynamic.go
new file mode 100644
index 0000000..c14c1f6
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/build_dynamic.go
@@ -0,0 +1,7 @@
+// +build !static
+// +build !static_all
+
+package kafka
+
+// #cgo pkg-config: rdkafka
+import "C"
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/build_static.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/build_static.go
new file mode 100644
index 0000000..3c8799c
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/build_static.go
@@ -0,0 +1,7 @@
+// +build static
+// +build !static_all
+
+package kafka
+
+// #cgo pkg-config: rdkafka-static
+import "C"
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/build_static_all.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/build_static_all.go
new file mode 100644
index 0000000..8afb8c9
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/build_static_all.go
@@ -0,0 +1,8 @@
+// +build !static
+// +build static_all
+
+package kafka
+
+// #cgo pkg-config: rdkafka-static
+// #cgo LDFLAGS: -static
+import "C"
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/config.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/config.go
new file mode 100644
index 0000000..5866a47
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/config.go
@@ -0,0 +1,243 @@
+package kafka
+
+/**
+ * Copyright 2016 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import (
+	"fmt"
+	"reflect"
+	"strings"
+	"unsafe"
+)
+
+/*
+#include <stdlib.h>
+#include <librdkafka/rdkafka.h>
+*/
+import "C"
+
+// ConfigValue is the value of a configuration property. Supported types are
+// bool, int, string, and any type with the standard String() interface.
+type ConfigValue interface{}
+
+// ConfigMap is a map containing standard librdkafka configuration properties as documented in:
+// https://github.com/edenhill/librdkafka/tree/master/CONFIGURATION.md
+//
+// The special property "default.topic.config" (optional) is a ConfigMap containing default topic
+// configuration properties.
+type ConfigMap map[string]ConfigValue
+
+// SetKey sets configuration property key to value. A key prefixed with {topic}. is set on the
+// "default.topic.config" sub-map; ErrInvalidArg is returned if that entry is not a ConfigMap.
+func (m ConfigMap) SetKey(key string, value ConfigValue) error {
+	if !strings.HasPrefix(key, "{topic}.") {
+		m[key] = value
+		return nil
+	}
+	if _, found := m["default.topic.config"]; !found {
+		m["default.topic.config"] = ConfigMap{}
+	}
+	if topicConf, ok := m["default.topic.config"].(ConfigMap); ok {
+		topicConf[strings.TrimPrefix(key, "{topic}.")] = value
+		return nil
+	}
+	return Error{ErrInvalidArg, "default.topic.config must be a ConfigMap"}
+}
+
+// Set stores a single "key=value" pair, split at the first '=', through SetKey.
+// It mirrors flag.Set (command line argument parser) as a convenience
+// for `-X key=value` config. Input without '=' yields an ErrInvalidArg error.
+func (m ConfigMap) Set(kv string) error {
+	pair := strings.SplitN(kv, "=", 2)
+	if len(pair) != 2 {
+		return Error{ErrInvalidArg, "Expected key=value"}
+	}
+
+	name, val := pair[0], pair[1]
+
+	return m.SetKey(name, val)
+}
+
+// value2string renders a ConfigValue in the string form passed on to the
+// C configuration setters. bool, int, string and fmt.Stringer values are
+// supported; for any other type ret is empty and errstr names the
+// offending type.
+func value2string(v ConfigValue) (ret string, errstr string) {
+	switch val := v.(type) {
+	case string:
+		return val, ""
+	case bool:
+		if val {
+			return "true", ""
+		}
+		return "false", ""
+	case int:
+		return fmt.Sprintf("%d", val), ""
+	case fmt.Stringer:
+		return val.String(), ""
+	}
+
+	return "", fmt.Sprintf("Invalid value type %T", v)
+}
+
+// rdkAnyconf abstracts rd_kafka_conf_t and rd_kafka_topic_conf_t into a common
+// interface: set applies one property and reports librdkafka's result code.
+type rdkAnyconf interface {
+	set(cKey *C.char, cVal *C.char, cErrstr *C.char, errstrSize int) C.rd_kafka_conf_res_t
+}
+
+// anyconfSet sets one property on anyconf; unsupported value types and values rejected by
+// librdkafka yield an error. The temporary C strings are released on every return path.
+func anyconfSet(anyconf rdkAnyconf, key string, val ConfigValue) (err error) {
+	value, errstr := value2string(val)
+	if errstr != "" {
+		return Error{ErrInvalidArg, fmt.Sprintf("%s for key %s (expected string,bool,int,ConfigMap)", errstr, key)}
+	}
+	cKey := C.CString(key)
+	defer C.free(unsafe.Pointer(cKey))
+	cVal := C.CString(value)
+	defer C.free(unsafe.Pointer(cVal))
+	cErrstr := (*C.char)(C.malloc(C.size_t(128)))
+	defer C.free(unsafe.Pointer(cErrstr))
+	if anyconf.set(cKey, cVal, cErrstr, 128) != C.RD_KAFKA_CONF_OK {
+		return newErrorFromCString(C.RD_KAFKA_RESP_ERR__INVALID_ARG, cErrstr)
+	}
+	return nil
+}
+
+// rdkConf and rdkTopicConf implement rdkAnyconf for the two C config structs; the typedefs
+// are needed to work around a crash in golint when parsing the set() methods below.
+type rdkConf C.rd_kafka_conf_t
+type rdkTopicConf C.rd_kafka_topic_conf_t
+
+func (cConf *rdkConf) set(cKey *C.char, cVal *C.char, cErrstr *C.char, errstrSize int) C.rd_kafka_conf_res_t {
+	return C.rd_kafka_conf_set((*C.rd_kafka_conf_t)(cConf), cKey, cVal, cErrstr, C.size_t(errstrSize))
+}
+
+func (ctopicConf *rdkTopicConf) set(cKey *C.char, cVal *C.char, cErrstr *C.char, errstrSize int) C.rd_kafka_conf_res_t {
+	return C.rd_kafka_topic_conf_set((*C.rd_kafka_topic_conf_t)(ctopicConf), cKey, cVal, cErrstr, C.size_t(errstrSize))
+}
+
+func configConvertAnyconf(m ConfigMap, anyconf rdkAnyconf) (err error) {
+	// set plugins first, any plugin-specific configuration depends on
+	// the plugin to have already been set
+	pluginPaths, ok := m["plugin.library.paths"]
+	if ok {
+		err = anyconfSet(anyconf, "plugin.library.paths", pluginPaths)
+		if err != nil {
+			return err
+		}
+	}
+	for k, v := range m {
+		if k == "plugin.library.paths" {
+			continue
+		}
+		switch v.(type) {
+		case ConfigMap:
+			/* Special sub-ConfigMap, only used for default.topic.config */
+
+			if k != "default.topic.config" {
+				return Error{ErrInvalidArg, fmt.Sprintf("Invalid type for key %s", k)}
+			}
+
+			var cTopicConf = C.rd_kafka_topic_conf_new()
+
+			err = configConvertAnyconf(v.(ConfigMap),
+				(*rdkTopicConf)(cTopicConf))
+			if err != nil {
+				C.rd_kafka_topic_conf_destroy(cTopicConf)
+				return err
+			}
+
+			C.rd_kafka_conf_set_default_topic_conf(
+				(*C.rd_kafka_conf_t)(anyconf.(*rdkConf)),
+				(*C.rd_kafka_topic_conf_t)((*rdkTopicConf)(cTopicConf)))
+
+		default:
+			err = anyconfSet(anyconf, k, v)
+			if err != nil {
+				return err
+			}
+		}
+	}
+
+	return nil
+}
+
+// convert builds a new C rd_kafka_conf_t populated from the ConfigMap.
+// If any property cannot be applied the C object is destroyed again and
+// (nil, err) is returned.
+func (m ConfigMap) convert() (cConf *C.rd_kafka_conf_t, err error) {
+	conf := C.rd_kafka_conf_new()
+	if convErr := configConvertAnyconf(m, (*rdkConf)(conf)); convErr != nil {
+		C.rd_kafka_conf_destroy(conf)
+		return nil, convErr
+	}
+	return conf, nil
+}
+
+// get finds key in the configmap and returns its value; a key prefixed with {topic}. is
+// looked up in the "default.topic.config" sub-map. If the key is not found defval is returned.
+// A type mismatch with a non-nil defval, or a non-ConfigMap sub-map, yields ErrInvalidArg.
+func (m ConfigMap) get(key string, defval ConfigValue) (ConfigValue, error) {
+	if strings.HasPrefix(key, "{topic}.") {
+		defconfCv, found := m["default.topic.config"]
+		if !found {
+			return defval, nil
+		}
+		if defconf, ok := defconfCv.(ConfigMap); ok {
+			return defconf.get(strings.TrimPrefix(key, "{topic}."), defval)
+		}
+		return nil, Error{ErrInvalidArg, "default.topic.config must be a ConfigMap"}
+	}
+	v, ok := m[key]
+	if !ok {
+		return defval, nil
+	}
+	if defval != nil && reflect.TypeOf(defval) != reflect.TypeOf(v) {
+		return nil, Error{ErrInvalidArg, fmt.Sprintf("%s expects type %T, not %T", key, defval, v)}
+	}
+	return v, nil
+}
+
+// extract looks key up via get() and, unless the lookup failed, removes key
+// from m before returning the value (defval when the key was absent).
+// On a lookup error m is left untouched and (nil, err) is returned.
+func (m ConfigMap) extract(key string, defval ConfigValue) (ConfigValue, error) {
+	val, getErr := m.get(key, defval)
+	if getErr == nil {
+		delete(m, key)
+		return val, nil
+	}
+
+	return nil, getErr
+}
+
+func (m ConfigMap) clone() ConfigMap {
+	dup := ConfigMap{} // shallow copy: nested values are shared with m
+	for key := range m {
+		dup[key] = m[key]
+	}
+	return dup
+}
+
+// Get finds the given key in the ConfigMap and returns its value.
+// A key prefixed with {topic}. is looked up in the "default.topic.config" sub-map.
+// If the key is not found `defval` is returned. If it is found but its type does not
+// match that of `defval` (unless nil) an ErrInvalidArg error is returned.
+func (m ConfigMap) Get(key string, defval ConfigValue) (ConfigValue, error) {
+	return m.get(key, defval)
+}
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go
new file mode 100644
index 0000000..5c42ece
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go
@@ -0,0 +1,581 @@
+package kafka
+
+/**
+ * Copyright 2016 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import (
+	"fmt"
+	"math"
+	"time"
+	"unsafe"
+)
+
+/*
+#include <stdlib.h>
+#include <librdkafka/rdkafka.h>
+
+// Returns the list element at idx, or NULL when idx >= rktparlist->cnt (a negative idx is not checked).
+static rd_kafka_topic_partition_t *_c_rdkafka_topic_partition_list_entry(rd_kafka_topic_partition_list_t *rktparlist, int idx) {
+   return idx < rktparlist->cnt ? &rktparlist->elems[idx] : NULL;
+}
+*/
+import "C"
+
+// RebalanceCb is a per-Subscribe*() rebalance event callback; it receives the Consumer and
+// the rebalance Event, which will be either AssignedPartitions or RevokedPartitions.
+type RebalanceCb func(*Consumer, Event) error
+
+// Consumer implements a High-level Apache Kafka Consumer instance
+type Consumer struct {
+	events             chan Event  // returned by Events(); closed by Close() when eventsChanEnable is set
+	handle             handle      // holds the C client handle (rk) and queue (rkq) used by the methods
+	eventsChanEnable   bool        // makes Close() stop the channel reader and close events
+	readerTermChan     chan bool   // closed by Close() to terminate consumerReader()
+	rebalanceCb        RebalanceCb // callback stored by the latest successful SubscribeTopics()
+	appReassigned      bool        // set to true by Assign() and Unassign()
+	appRebalanceEnable bool        // config setting
+}
+
+// String returns a human readable name for a Consumer instance
+func (c *Consumer) String() string {
+	return c.handle.String()
+}
+
+// gethandle implements the Handle interface by returning a pointer to the embedded handle
+func (c *Consumer) gethandle() *handle {
+	return &c.handle
+}
+
+// Subscribe subscribes to a single topic by calling SubscribeTopics with a one-element list.
+// This replaces the current subscription.
+func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error {
+	return c.SubscribeTopics([]string{topic}, rebalanceCb)
+}
+
+// SubscribeTopics subscribes to the provided list of topics.
+// This replaces the current subscription.
+// On success rebalanceCb is stored as the consumer's rebalance callback;
+// on failure the librdkafka error is returned and the callback is unchanged.
+func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) (err error) {
+	clist := C.rd_kafka_topic_partition_list_new(C.int(len(topics)))
+	defer C.rd_kafka_topic_partition_list_destroy(clist)
+
+	for i := range topics {
+		cname := C.CString(topics[i])
+		defer C.free(unsafe.Pointer(cname))
+		C.rd_kafka_topic_partition_list_add(clist, cname, C.RD_KAFKA_PARTITION_UA)
+	}
+
+	if cErr := C.rd_kafka_subscribe(c.handle.rk, clist); cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+		return newError(cErr)
+	}
+
+	c.rebalanceCb = rebalanceCb
+	c.handle.currAppRebalanceEnable = rebalanceCb != nil || c.appRebalanceEnable
+	return nil
+}
+
+// Unsubscribe from the current subscription, if any. Always returns nil: the C call's result is discarded.
+func (c *Consumer) Unsubscribe() (err error) {
+	C.rd_kafka_unsubscribe(c.handle.rk)
+	return nil
+}
+
+// Assign an atomic set of partitions to consume.
+// This replaces the current assignment. The consumer is flagged as
+// application-reassigned before the call, so the flag is set even on error.
+func (c *Consumer) Assign(partitions []TopicPartition) (err error) {
+	c.appReassigned = true
+
+	clist := newCPartsFromTopicPartitions(partitions)
+	defer C.rd_kafka_topic_partition_list_destroy(clist)
+
+	if cErr := C.rd_kafka_assign(c.handle.rk, clist); cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+		return newError(cErr)
+	}
+
+	return nil
+}
+
+// Unassign the current set of partitions to consume.
+// Implemented as an assignment of a nil partition list.
+func (c *Consumer) Unassign() (err error) {
+	c.appReassigned = true
+
+	if cErr := C.rd_kafka_assign(c.handle.rk, nil); cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+		return newError(cErr)
+	}
+
+	return nil
+}
+
+// commit commits the given offsets and waits for the result.
+// If offsets is nil the currently assigned partitions' offsets are committed.
+// This is a blocking call, caller will need to wrap in go-routine to
+// get async or throw-away behaviour.
+// Returns the committed offsets as reported by the commit result event, or
+// (nil, nil) when the event carries no partition list. Panics if the event
+// polled from the temporary queue is not an OFFSET_COMMIT event.
+func (c *Consumer) commit(offsets []TopicPartition) (committedOffsets []TopicPartition, err error) {
+	// Temporary queue on which the commit result event is delivered.
+	replyQ := C.rd_kafka_queue_new(c.handle.rk)
+	defer C.rd_kafka_queue_destroy(replyQ)
+
+	var coffsets *C.rd_kafka_topic_partition_list_t // stays nil when offsets is nil
+	if offsets != nil {
+		coffsets = newCPartsFromTopicPartitions(offsets)
+		defer C.rd_kafka_topic_partition_list_destroy(coffsets)
+	}
+
+	if cErr := C.rd_kafka_commit_queue(c.handle.rk, coffsets, replyQ, nil, nil); cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+		return nil, newError(cErr)
+	}
+
+	// Poll the temporary queue for the result event (timeout -1).
+	rkev := C.rd_kafka_queue_poll(replyQ, C.int(-1))
+	if rkev == nil {
+		// shouldn't happen
+		return nil, newError(C.RD_KAFKA_RESP_ERR__DESTROY)
+	}
+	defer C.rd_kafka_event_destroy(rkev)
+
+	if C.rd_kafka_event_type(rkev) != C.RD_KAFKA_EVENT_OFFSET_COMMIT {
+		panic(fmt.Sprintf("Expected OFFSET_COMMIT, got %s",
+			C.GoString(C.rd_kafka_event_name(rkev))))
+	}
+
+	if cErr := C.rd_kafka_event_error(rkev); cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+		return nil, newErrorFromCString(cErr, C.rd_kafka_event_error_string(rkev))
+	}
+
+	cRetoffsets := C.rd_kafka_event_topic_partition_list(rkev)
+	if cRetoffsets == nil {
+		// no offsets, no error
+		return nil, nil
+	}
+
+	return newTopicPartitionsFromCparts(cRetoffsets), nil
+}
+
+// Commit offsets for currently assigned partitions (calls commit with a nil offset list).
+// This is a blocking call.
+// Returns the committed offsets on success.
+func (c *Consumer) Commit() ([]TopicPartition, error) {
+	return c.commit(nil)
+}
+
+// CommitMessage commits the offset following the provided message (its offset plus one).
+// This is a blocking call. A message carrying a partition error is rejected with ErrInvalidArg.
+// Returns the committed offsets on success.
+func (c *Consumer) CommitMessage(m *Message) ([]TopicPartition, error) {
+	if m.TopicPartition.Error != nil {
+		return nil, Error{ErrInvalidArg, "Can't commit errored message"}
+	}
+	next := m.TopicPartition
+	next.Offset++
+	return c.commit([]TopicPartition{next})
+}
+
+// CommitOffsets commits the provided list of offsets.
+// This is a blocking call. A nil list commits the currently assigned partitions' offsets.
+// Returns the committed offsets on success.
+func (c *Consumer) CommitOffsets(offsets []TopicPartition) ([]TopicPartition, error) {
+	return c.commit(offsets)
+}
+
+// StoreOffsets stores the provided list of offsets that will be committed
+// to the offset store according to `auto.commit.interval.ms` or manual
+// offset-less Commit().
+//
+// Returns the stored offsets on success. If at least one offset couldn't be stored,
+// an error and a list of offsets is returned. Each offset can be checked for
+// specific errors via its `.Error` member.
+func (c *Consumer) StoreOffsets(offsets []TopicPartition) (storedOffsets []TopicPartition, err error) {
+	clist := newCPartsFromTopicPartitions(offsets)
+	defer C.rd_kafka_topic_partition_list_destroy(clist)
+
+	rc := C.rd_kafka_offsets_store(c.handle.rk, clist)
+
+	// Convert back in both outcomes: the C list may now carry per-partition errors.
+	storedOffsets = newTopicPartitionsFromCparts(clist)
+
+	if rc == C.RD_KAFKA_RESP_ERR_NO_ERROR {
+		return storedOffsets, nil
+	}
+
+	return storedOffsets, newError(rc)
+}
+
+// Seek seeks the given topic partition using the offset from the TopicPartition.
+//
+// If timeoutMs is not 0 the call will wait this long for the
+// seek to be performed. If the timeout is reached the internal state
+// will be unknown and this function returns ErrTimedOut.
+// If timeoutMs is 0 it will initiate the seek but return
+// immediately without any error reporting (i.e., async).
+//
+// Seek() may only be used for partitions already being consumed
+// (through Assign() or implicitly through a self-rebalanced Subscribe()).
+// To set the starting offset it is preferred to use Assign() and provide
+// a starting offset for each partition.
+//
+// Returns an error on failure (ErrInvalidArg if partition.Topic is nil) or nil otherwise.
+func (c *Consumer) Seek(partition TopicPartition, timeoutMs int) error {
+	if partition.Topic == nil {
+		return Error{ErrInvalidArg, "Seek requires a non-nil Topic"}
+	}
+	rkt := c.handle.getRkt(*partition.Topic)
+	cErr := C.rd_kafka_seek(rkt, C.int32_t(partition.Partition), C.int64_t(partition.Offset), C.int(timeoutMs))
+	if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+		return newError(cErr)
+	}
+	return nil
+}
+
+// Poll the consumer for messages or events.
+//
+// Will block for at most timeoutMs milliseconds.
+//
+// The following callbacks may be triggered:
+//   Subscribe()'s rebalanceCb
+//
+// Returns nil on timeout, else an Event. The second result of eventPoll is discarded.
+func (c *Consumer) Poll(timeoutMs int) (event Event) {
+	ev, _ := c.handle.eventPoll(nil, timeoutMs, 1, nil)
+	return ev
+}
+
+// Events returns the Events channel (if enabled, see go.events.channel.enable in NewConsumer)
+func (c *Consumer) Events() chan Event {
+	return c.events
+}
+
+// ReadMessage polls the consumer for a message.
+//
+// This is a convenience API that wraps Poll() and only returns
+// messages or errors. All other event types are discarded.
+//
+// The call will block for at most `timeout` waiting for
+// a new message or error. `timeout` may be set to -1 for
+// indefinite wait.
+//
+// Timeout is returned as (nil, err) where err is a kafka.Error whose code is kafka.ErrTimedOut.
+//
+// Messages are returned as (msg, nil),
+// while general errors are returned as (nil, err),
+// and partition-specific errors are returned as (msg, err) where
+// msg.TopicPartition provides partition-specific information (such as topic, partition and offset).
+//
+// All other event types, such as PartitionEOF, AssignedPartitions, etc, are silently discarded.
+//
+func (c *Consumer) ReadMessage(timeout time.Duration) (*Message, error) {
+	// A positive timeout is tracked as an absolute deadline so that
+	// discarded events do not extend the total wait. Any other value
+	// (such as the documented -1) is handed to Poll() unchanged as its
+	// timeoutMs argument.
+	var deadline time.Time
+	timeoutMs := (int)(timeout)
+	if timeout > 0 {
+		deadline = time.Now().Add(timeout)
+		timeoutMs = (int)(timeout.Seconds() * 1000.0)
+	}
+
+	for {
+		ev := c.Poll(timeoutMs)
+
+		if msg, isMsg := ev.(*Message); isMsg {
+			// The partition error (nil for a good message) rides along
+			// with the message itself.
+			return msg, msg.TopicPartition.Error
+		}
+
+		if kerr, isErr := ev.(Error); isErr {
+			return nil, kerr
+		}
+
+		// Any other event type is ignored.
+		if timeout > 0 {
+			// Calculate remaining time
+			remainingMs := deadline.Sub(time.Now()).Seconds() * 1000.0
+			timeoutMs = int(math.Max(0.0, remainingMs))
+		}
+
+		// Give up only when nothing was polled and no wait time is left;
+		// after a discarded event the loop polls again.
+		if ev == nil && timeoutMs == 0 {
+			return nil, newError(C.RD_KAFKA_RESP_ERR__TIMED_OUT)
+		}
+	}
+}
+
+// Close shuts the Consumer instance down and releases its librdkafka handles.
+// The object is no longer usable after this call.
+func (c *Consumer) Close() (err error) {
+	// NOTE(review): if rd_kafka_consumer_close fails below, cleanup() and rd_kafka_destroy() are skipped.
+	if c.eventsChanEnable {
+		// Stop consumerReader() by closing readerTermChan, wait for it, then close the Events() channel
+		close(c.readerTermChan)
+		c.handle.waitTerminated(1)
+		close(c.events)
+	}
+
+	C.rd_kafka_queue_destroy(c.handle.rkq) // the queue obtained in NewConsumer
+	c.handle.rkq = nil
+
+	e := C.rd_kafka_consumer_close(c.handle.rk)
+	if e != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+		return newError(e)
+	}
+
+	c.handle.cleanup()
+
+	C.rd_kafka_destroy(c.handle.rk)
+
+	return nil
+}
+
+// NewConsumer creates a new high-level Consumer instance from a copy of conf; the group.id property is required.
+//
+// Supported special configuration properties:
+//   go.application.rebalance.enable (bool, false) - Forward rebalancing responsibility to application via the Events() channel.
+//                                        If set to true the app must handle the AssignedPartitions and
+//                                        RevokedPartitions events and call Assign() and Unassign()
+//                                        respectively.
+//   go.events.channel.enable (bool, false) - Enable the Events() channel. Messages and events will be pushed on the Events() channel and the Poll() interface will be disabled. (Experimental)
+//   go.events.channel.size (int, 1000) - Events() channel size
+//
+// WARNING: Due to the buffering nature of channels (and queues in general) the
+// use of the events channel risks receiving outdated events and
+// messages. Minimizing go.events.channel.size reduces the risk
+// and number of outdated events and messages but does not eliminate
+// the factor completely. With a channel size of 1 at most one
+// event or message may be outdated.
+func NewConsumer(conf *ConfigMap) (*Consumer, error) {
+	err := versionCheck()
+	if err != nil {
+		return nil, err
+	}
+
+	// Work on a copy of the configuration so that the caller's ConfigMap is not mutated.
+	confCopy := conf.clone()
+
+	groupid, _ := confCopy.get("group.id", nil)
+	if groupid == nil {
+		// without a group.id the underlying cgrp subsystem in librdkafka wont get started
+		// and without it there is no way to consume assigned partitions.
+		// So for now require the group.id, this might change in the future.
+		return nil, newErrorFromString(ErrInvalidArg, "Required property group.id not set")
+	}
+
+	c := &Consumer{}
+
+	v, err := confCopy.extract("go.application.rebalance.enable", false)
+	if err != nil {
+		return nil, err
+	}
+	c.appRebalanceEnable = v.(bool)
+
+	v, err = confCopy.extract("go.events.channel.enable", false)
+	if err != nil {
+		return nil, err
+	}
+	c.eventsChanEnable = v.(bool)
+
+	v, err = confCopy.extract("go.events.channel.size", 1000)
+	if err != nil {
+		return nil, err
+	}
+	eventsChanSize := v.(int)
+
+	cConf, err := confCopy.convert()
+	if err != nil {
+		return nil, err
+	}
+	cErrstr := (*C.char)(C.malloc(C.size_t(256)))
+	defer C.free(unsafe.Pointer(cErrstr))
+
+	C.rd_kafka_conf_set_events(cConf, C.RD_KAFKA_EVENT_REBALANCE|C.RD_KAFKA_EVENT_OFFSET_COMMIT|C.RD_KAFKA_EVENT_STATS|C.RD_KAFKA_EVENT_ERROR)
+
+	c.handle.rk = C.rd_kafka_new(C.RD_KAFKA_CONSUMER, cConf, cErrstr, 256)
+	if c.handle.rk == nil {
+		// rd_kafka_new only takes ownership of cConf on success, so free it here.
+		C.rd_kafka_conf_destroy(cConf)
+		return nil, newErrorFromCString(C.RD_KAFKA_RESP_ERR__INVALID_ARG, cErrstr)
+	}
+
+	C.rd_kafka_poll_set_consumer(c.handle.rk)
+
+	c.handle.c = c
+	c.handle.setup()
+	c.handle.rkq = C.rd_kafka_queue_get_consumer(c.handle.rk)
+	if c.handle.rkq == nil {
+		// no cgrp (no group.id configured), revert to main queue.
+		c.handle.rkq = C.rd_kafka_queue_get_main(c.handle.rk)
+	}
+
+	if c.eventsChanEnable {
+		c.events = make(chan Event, eventsChanSize)
+		c.readerTermChan = make(chan bool)
+
+		/* Start rdkafka consumer queue reader -> events writer goroutine */
+		go consumerReader(c, c.readerTermChan)
+	}
+
+	return c, nil
+}
+
+// rebalance hands ev to the rebalance callback registered by the application,
+// when there is one, and reports whether appReassigned was set as a result.
+func (c *Consumer) rebalance(ev Event) bool {
+	// Clear the flag first so that a value left over from an earlier
+	// rebalance is never reported.
+	c.appReassigned = false
+	if cb := c.rebalanceCb; cb != nil {
+		cb(c, ev)
+	}
+	return c.appReassigned
+}
+
+// consumerReader is the goroutine body that moves messages and events from
+// the librdkafka consumer queue onto the consumer's Events() channel.
+//
+// Every round forwards at most 1000 events and waits up to 100 ms for the
+// first of them. The loop stops as soon as termChan yields a value or is
+// closed; "consumerReader" is then posted on the handle's terminatedChan.
+func consumerReader(c *Consumer, termChan chan bool) {
+	for running := true; running; {
+		select {
+		case <-termChan:
+			// Termination requested between two polling rounds.
+			running = false
+		default:
+			// eventPoll keeps watching termChan while it is blocked
+			// sending on c.events, and reports that through term.
+			_, term := c.handle.eventPoll(c.events, 100, 1000, termChan)
+			running = !term
+		}
+	}
+
+	// Announce termination; presumably what waitTerminated() in Close() waits for.
+	c.handle.terminatedChan <- "consumerReader"
+}
+
+// GetMetadata queries the broker for cluster and topic metadata.
+// If topic is non-nil only information about that topic is returned, else if
+// allTopics is false only information about locally used topics is returned,
+// else information about all topics is returned.
+// GetMetadata is equivalent to listTopics, describeTopics and describeCluster in the Java API.
+func (c *Consumer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error) {
+	return getMetadata(c, topic, allTopics, timeoutMs) // package helper; all arguments are forwarded unchanged
+}
+
+// QueryWatermarkOffsets returns the broker's low and high offsets for the given topic
+// and partition. The work is delegated to the package helper queryWatermarkOffsets, which also receives timeoutMs.
+func (c *Consumer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error) {
+	return queryWatermarkOffsets(c, topic, partition, timeoutMs)
+}
+
+// OffsetsForTimes looks up offsets by timestamp for the given partitions.
+//
+// The returned offset for each partition is the earliest offset whose
+// timestamp is greater than or equal to the given timestamp in the
+// corresponding partition.
+//
+// The timestamps to query are represented as `.Offset` in the `times`
+// argument and the looked up offsets are represented as `.Offset` in the returned
+// `offsets` list.
+//
+// The function will block for at most timeoutMs milliseconds.
+//
+// Duplicate Topic+Partitions are not supported.
+// Per-partition errors may be returned in the `.Error` field.
+func (c *Consumer) OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error) {
+	return offsetsForTimes(c, times, timeoutMs) // package helper; all arguments are forwarded unchanged
+}
+
+// Subscription returns the topic names of the current subscription, as set
+// by Subscribe(), or an Error when rd_kafka_subscription reports a failure.
+func (c *Consumer) Subscription() (topics []string, err error) {
+	var list *C.rd_kafka_topic_partition_list_t
+	if code := C.rd_kafka_subscription(c.handle.rk, &list); code != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+		return nil, newError(code)
+	}
+	// Freed on return; the topic names are copied out of it below.
+	defer C.rd_kafka_topic_partition_list_destroy(list)
+
+	n := int(list.cnt)
+	topics = make([]string, 0, n)
+	for idx := 0; idx < n; idx++ {
+		entry := C._c_rdkafka_topic_partition_list_entry(list, C.int(idx))
+		// C.GoString copies the name into Go-managed memory.
+		topics = append(topics, C.GoString(entry.topic))
+	}
+
+	return topics, nil
+}
+
+// Assignment returns the current partition assignments, or an Error when
+// rd_kafka_assignment reports a failure.
+func (c *Consumer) Assignment() (partitions []TopicPartition, err error) {
+	var list *C.rd_kafka_topic_partition_list_t
+	if code := C.rd_kafka_assignment(c.handle.rk, &list); code != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+		return nil, newError(code)
+	}
+	// The C list is released only after the conversion below has run.
+	defer C.rd_kafka_topic_partition_list_destroy(list)
+
+	// Translate the C list into Go TopicPartition values.
+	partitions = newTopicPartitionsFromCparts(list)
+	return partitions, nil
+}
+
+// Committed retrieves committed offsets for the given set of partitions;
+// timeoutMs is handed to rd_kafka_committed as its timeout argument.
+func (c *Consumer) Committed(partitions []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error) {
+	list := newCPartsFromTopicPartitions(partitions)
+	defer C.rd_kafka_topic_partition_list_destroy(list)
+
+	if code := C.rd_kafka_committed(c.handle.rk, list, C.int(timeoutMs)); code != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+		return nil, newError(code)
+	}
+	return newTopicPartitionsFromCparts(list), nil
+}
+
+// Pause suspends consumption for the provided list of partitions.
+//
+// Note that messages already enqueued on the consumer's Event channel
+// (if `go.events.channel.enable` has been set) will NOT be purged by
+// this call, set `go.events.channel.size` accordingly.
+func (c *Consumer) Pause(partitions []TopicPartition) (err error) {
+	list := newCPartsFromTopicPartitions(partitions)
+	defer C.rd_kafka_topic_partition_list_destroy(list)
+
+	if code := C.rd_kafka_pause_partitions(c.handle.rk, list); code != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+		err = newError(code)
+	}
+	return err
+}
+
+// Resume restarts consumption for the provided list of partitions, returning an Error if librdkafka reports one.
+func (c *Consumer) Resume(partitions []TopicPartition) (err error) {
+	list := newCPartsFromTopicPartitions(partitions)
+	defer C.rd_kafka_topic_partition_list_destroy(list)
+
+	if code := C.rd_kafka_resume_partitions(c.handle.rk, list); code != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+		err = newError(code)
+	}
+	return err
+}
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/error.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/error.go
new file mode 100644
index 0000000..70a435f
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/error.go
@@ -0,0 +1,77 @@
+package kafka
+
+/**
+ * Copyright 2016 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Automatically generate error codes from librdkafka
+// See README for instructions
+//go:generate $GOPATH/bin/go_rdkafka_generr generated_errors.go
+
+/*
+#include <librdkafka/rdkafka.h>
+*/
+import "C"
+
+// Error provides a Kafka-specific error container: an error code plus an optional message.
+type Error struct {
+	code ErrorCode // returned by Code(); its text is used by String() when str is empty
+	str  string    // optional message; when non-empty it is what String() returns
+}
+
+func newError(code C.rd_kafka_resp_err_t) (err Error) {
+	return Error{code: ErrorCode(code)} // no message: String() falls back to the code's text
+}
+
+func newGoError(code ErrorCode) (err Error) {
+	return Error{code: code} // same as newError, but for a code that is already a Go ErrorCode
+}
+
+func newErrorFromString(code ErrorCode, str string) (err Error) {
+	return Error{code: code, str: str} // str becomes the text returned by String() when non-empty
+}
+
+func newErrorFromCString(code C.rd_kafka_resp_err_t, cstr *C.char) (err Error) {
+	var str string
+	if cstr != nil {
+		str = C.GoString(cstr)
+	} else {
+		str = ""
+	}
+	return Error{ErrorCode(code), str}
+}
+
+func newCErrorFromString(code C.rd_kafka_resp_err_t, str string) (err Error) {
+	return Error{code: ErrorCode(code), str: str} // C error code paired with a Go message
+}
+
+// Error returns a human readable representation of an Error, which makes Error usable as a Go error value.
+// Same as Error.String()
+func (e Error) Error() string {
+	return e.String()
+}
+
+// String returns a human readable representation of an Error: its message if one was set, else the code's text.
+func (e Error) String() string {
+	if e.str == "" {
+		return e.code.String()
+	}
+	return e.str
+}
+
+// Code returns the ErrorCode the Error was constructed with.
+func (e Error) Code() ErrorCode {
+	return e.code
+}
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/event.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/event.go
new file mode 100644
index 0000000..8a89edb
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/event.go
@@ -0,0 +1,330 @@
+package kafka
+
+/**
+ * Copyright 2016 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import (
+	"fmt"
+	"os"
+	"unsafe"
+)
+
+/*
+#include <stdlib.h>
+#include <librdkafka/rdkafka.h>
+#include "glue_rdkafka.h"
+
+
+#ifdef RD_KAFKA_V_HEADERS
+// Copy every header in chdrs into the caller-allocated tmphdrs array.
+// Iteration stops at the first index rd_kafka_header_get_all() rejects,
+// so tmphdrs must have room for all headers held by chdrs.
+void chdrs_to_tmphdrs (rd_kafka_headers_t *chdrs, tmphdr_t *tmphdrs) {
+   size_t i = 0;
+
+   while (!rd_kafka_header_get_all(chdrs, i,
+                                   &tmphdrs[i].key,
+                                   &tmphdrs[i].val,
+                                   (size_t *)&tmphdrs[i].size))
+     i++;
+}
+#endif
+
+rd_kafka_event_t *_rk_queue_poll (rd_kafka_queue_t *rkq, int timeoutMs,
+                                  rd_kafka_event_type_t *evtype,
+                                  fetched_c_msg_t *fcMsg,
+                                  rd_kafka_event_t *prev_rkev) {
+    rd_kafka_event_t *rkev;
+    // Destroy the event handed back by the previous call, if there was one.
+    if (prev_rkev)
+      rd_kafka_event_destroy(prev_rkev);
+    // Poll the queue once and report the type of the result through *evtype.
+    rkev = rd_kafka_queue_poll(rkq, timeoutMs);
+    *evtype = rd_kafka_event_type(rkev);
+    // Fetch events: pre-extract message, timestamp and headers into *fcMsg.
+    if (*evtype == RD_KAFKA_EVENT_FETCH) {
+#ifdef RD_KAFKA_V_HEADERS
+        rd_kafka_headers_t *hdrs;
+#endif
+
+        fcMsg->msg = (rd_kafka_message_t *)rd_kafka_event_message_next(rkev);
+        fcMsg->ts = rd_kafka_message_timestamp(fcMsg->msg, &fcMsg->tstype);
+
+#ifdef RD_KAFKA_V_HEADERS
+        if (!rd_kafka_message_headers(fcMsg->msg, &hdrs)) {
+           fcMsg->tmphdrsCnt = rd_kafka_header_cnt(hdrs);
+           fcMsg->tmphdrs = malloc(sizeof(*fcMsg->tmphdrs) * fcMsg->tmphdrsCnt); // NOTE(review): result is not checked
+           chdrs_to_tmphdrs(hdrs, fcMsg->tmphdrs);
+        } else {
+#else
+        if (1) {
+#endif
+           fcMsg->tmphdrs = NULL; // no headers retrieved, or header support compiled out
+           fcMsg->tmphdrsCnt = 0;
+        }
+    }
+    return rkev; // handed back so that the caller can pass it in as prev_rkev next time
+}
+*/
+import "C"
+
+// Event is the generic interface of the values returned by Poll() and carried by the Events() channel.
+type Event interface {
+	// String returns a human-readable representation of the event
+	String() string
+}
+
+// Specific event types
+
+// Stats is the statistics event; eventPoll fills it with the string obtained from rd_kafka_event_stats().
+type Stats struct {
+	statsJSON string
+}
+
+func (e Stats) String() string {
+	return e.statsJSON // the stored statistics string, returned unmodified
+}
+
+// AssignedPartitions is the consumer group rebalance event carrying the assigned partition set.
+type AssignedPartitions struct {
+	Partitions []TopicPartition // partitions listed by the rebalance event
+}
+
+func (ap AssignedPartitions) String() string {
+	return "AssignedPartitions: " + fmt.Sprint(ap.Partitions) // Sprint applies the default %v format
+}
+
+// RevokedPartitions is the consumer group rebalance event carrying the revoked partition set.
+type RevokedPartitions struct {
+	Partitions []TopicPartition // partitions listed by the rebalance event
+}
+
+func (rp RevokedPartitions) String() string {
+	return "RevokedPartitions: " + fmt.Sprint(rp.Partitions) // Sprint applies the default %v format
+}
+
+// PartitionEOF is the event reporting that the consumer reached the end of a partition.
+type PartitionEOF TopicPartition
+
+func (p PartitionEOF) String() string {
+	return fmt.Sprintf("EOF at %s", TopicPartition(p)) // converted so that %s does not recurse into this method
+}
+
+// OffsetsCommitted reports committed offsets, as built by eventPoll from an offset commit event.
+type OffsetsCommitted struct {
+	Error   error            // nil when the commit event carried no error
+	Offsets []TopicPartition // partition list of the commit event; nil if the event had none
+}
+
+func (o OffsetsCommitted) String() string {
+	return fmt.Sprintf("OffsetsCommitted (%v, %v)", o.Error, o.Offsets)
+}
+
+// eventPoll polls up to maxEvents events from the handle's C rd_kafka_queue_t and translates each
+// into an Event, which is sent on `channel` if non-nil, else returned (maxEvents is then forced to 1).
+// Only the first poll waits, for at most timeoutMs milliseconds; the following polls do not block.
+// termChan is an optional channel to monitor along with producing to channel
+// to indicate that `channel` is being terminated.
+// Returns an (event, terminate) tuple; terminate is set when termChan fired while an Event was being sent on `channel`.
+func (h *handle) eventPoll(channel chan Event, timeoutMs int, maxEvents int, termChan chan bool) (Event, bool) {
+
+	var prevRkev *C.rd_kafka_event_t
+	term := false
+
+	var retval Event
+
+	if channel == nil {
+		maxEvents = 1
+	}
+out:
+	for evcnt := 0; evcnt < maxEvents; evcnt++ {
+		var evtype C.rd_kafka_event_type_t
+		var fcMsg C.fetched_c_msg_t
+		rkev := C._rk_queue_poll(h.rkq, C.int(timeoutMs), &evtype, &fcMsg, prevRkev)
+		prevRkev = rkev
+		timeoutMs = 0
+
+		retval = nil
+
+		switch evtype {
+		case C.RD_KAFKA_EVENT_FETCH:
+			// Consumer fetch event, new message.
+			// Extracted into temporary fcMsg for optimization
+			retval = h.newMessageFromFcMsg(&fcMsg)
+
+		case C.RD_KAFKA_EVENT_REBALANCE:
+			// Consumer rebalance event
+			// If the app provided a RebalanceCb to Subscribe*() or
+			// has go.application.rebalance.enable=true we create an event
+			// and forward it to the application thru the RebalanceCb or the
+			// Events channel respectively.
+			// Since librdkafka requires the rebalance event to be "acked" by
+			// the application to synchronize state we keep track of if the
+			// application performed Assign() or Unassign(), but this only works for
+			// the non-channel case. For the channel case we assume the application
+			// calls Assign() / Unassign().
+			// Failure to do so will "hang" the consumer, e.g., it wont start consuming
+			// and it wont close cleanly, so this error case should be visible
+			// immediately to the application developer.
+			appReassigned := false
+			if C.rd_kafka_event_error(rkev) == C.RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS {
+				if h.currAppRebalanceEnable {
+					// Application must perform Assign() call
+					var ev AssignedPartitions
+					ev.Partitions = newTopicPartitionsFromCparts(C.rd_kafka_event_topic_partition_list(rkev))
+					if channel != nil || h.c.rebalanceCb == nil {
+						retval = ev
+						appReassigned = true
+					} else {
+						appReassigned = h.c.rebalance(ev)
+					}
+				}
+
+				if !appReassigned {
+					C.rd_kafka_assign(h.rk, C.rd_kafka_event_topic_partition_list(rkev))
+				}
+			} else {
+				if h.currAppRebalanceEnable {
+					// Application must perform Unassign() call
+					var ev RevokedPartitions
+					ev.Partitions = newTopicPartitionsFromCparts(C.rd_kafka_event_topic_partition_list(rkev))
+					if channel != nil || h.c.rebalanceCb == nil {
+						retval = ev
+						appReassigned = true
+					} else {
+						appReassigned = h.c.rebalance(ev)
+					}
+				}
+
+				if !appReassigned {
+					C.rd_kafka_assign(h.rk, nil)
+				}
+			}
+
+		case C.RD_KAFKA_EVENT_ERROR:
+			// Error event
+			cErr := C.rd_kafka_event_error(rkev)
+			switch cErr {
+			case C.RD_KAFKA_RESP_ERR__PARTITION_EOF:
+				crktpar := C.rd_kafka_event_topic_partition(rkev)
+				if crktpar == nil {
+					break
+				}
+
+				var peof PartitionEOF
+				setupTopicPartitionFromCrktpar((*TopicPartition)(&peof), crktpar)
+				// Free right away: a defer here would pile up once per loop iteration until return.
+				C.rd_kafka_topic_partition_destroy(crktpar)
+				retval = peof
+			default:
+				retval = newErrorFromCString(cErr, C.rd_kafka_event_error_string(rkev))
+			}
+
+		case C.RD_KAFKA_EVENT_STATS:
+			retval = &Stats{C.GoString(C.rd_kafka_event_stats(rkev))}
+
+		case C.RD_KAFKA_EVENT_DR:
+			// Producer Delivery Report event
+			// Each such event contains delivery reports for all
+			// messages in the produced batch.
+			// Forward delivery reports to per-message's response channel
+			// or to the global Producer.Events channel, or none.
+			rkmessages := make([]*C.rd_kafka_message_t, int(C.rd_kafka_event_message_count(rkev)))
+			if len(rkmessages) == 0 {
+				break // nothing to forward; taking &rkmessages[0] below would panic
+			}
+			cnt := int(C.rd_kafka_event_message_array(rkev, (**C.rd_kafka_message_t)(unsafe.Pointer(&rkmessages[0])), C.size_t(len(rkmessages))))
+			for _, rkmessage := range rkmessages[:cnt] {
+				msg := h.newMessageFromC(rkmessage)
+				var ch *chan Event
+
+				if rkmessage._private != nil {
+					// Find cgoif by id
+					cg, found := h.cgoGet((int)((uintptr)(rkmessage._private)))
+					if found {
+						cdr := cg.(cgoDr)
+
+						if cdr.deliveryChan != nil {
+							ch = &cdr.deliveryChan
+						}
+						msg.Opaque = cdr.opaque
+					}
+				}
+
+				if ch == nil && h.fwdDr {
+					ch = &channel
+				}
+
+				if ch != nil {
+					select {
+					case *ch <- msg:
+					case <-termChan:
+						break out
+					}
+				} else {
+					retval = msg
+					break out
+				}
+			}
+
+		case C.RD_KAFKA_EVENT_OFFSET_COMMIT:
+			// Offsets committed
+			cErr := C.rd_kafka_event_error(rkev)
+			coffsets := C.rd_kafka_event_topic_partition_list(rkev)
+			var offsets []TopicPartition
+			if coffsets != nil {
+				offsets = newTopicPartitionsFromCparts(coffsets)
+			}
+
+			if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+				retval = OffsetsCommitted{newErrorFromCString(cErr, C.rd_kafka_event_error_string(rkev)), offsets}
+			} else {
+				retval = OffsetsCommitted{nil, offsets}
+			}
+
+		case C.RD_KAFKA_EVENT_NONE:
+			// poll timed out: no events available
+			break out
+
+		default:
+			if rkev != nil {
+				fmt.Fprintf(os.Stderr, "Ignored event %s\n",
+					C.GoString(C.rd_kafka_event_name(rkev)))
+			}
+
+		}
+
+		if retval != nil {
+			if channel != nil {
+				select {
+				case channel <- retval:
+				case <-termChan:
+					retval = nil
+					term = true
+					break out
+				}
+			} else {
+				break out
+			}
+		}
+	}
+
+	if prevRkev != nil {
+		C.rd_kafka_event_destroy(prevRkev)
+	}
+
+	return retval, term
+}
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/generated_errors.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/generated_errors.go
new file mode 100644
index 0000000..b9f68d8
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/generated_errors.go
@@ -0,0 +1,225 @@
+package kafka
+// Copyright 2016 Confluent Inc.
+// AUTOMATICALLY GENERATED BY /home/maglun/gocode/bin/go_rdkafka_generr ON 2018-10-11 09:26:58.938371378 +0200 CEST m=+0.001256618 USING librdkafka 0.11.5
+
+/*
+#include <librdkafka/rdkafka.h>
+*/
+import "C"
+
+// ErrorCode is the integer representation of local and broker error codes
+type ErrorCode int
+
+// String returns a human readable representation of an error code, as produced by librdkafka's rd_kafka_err2str()
+func (c ErrorCode) String() string {
+      return C.GoString(C.rd_kafka_err2str(C.rd_kafka_resp_err_t(c)))
+}
+
+const (
+    // ErrBadMsg Local: Bad message format
+    ErrBadMsg ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__BAD_MSG)
+    // ErrBadCompression Local: Invalid compressed data
+    ErrBadCompression ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__BAD_COMPRESSION)
+    // ErrDestroy Local: Broker handle destroyed
+    ErrDestroy ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__DESTROY)
+    // ErrFail Local: Communication failure with broker
+    ErrFail ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__FAIL)
+    // ErrTransport Local: Broker transport failure
+    ErrTransport ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__TRANSPORT)
+    // ErrCritSysResource Local: Critical system resource failure
+    ErrCritSysResource ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE)
+    // ErrResolve Local: Host resolution failure
+    ErrResolve ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__RESOLVE)
+    // ErrMsgTimedOut Local: Message timed out
+    ErrMsgTimedOut ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__MSG_TIMED_OUT)
+    // ErrPartitionEOF Broker: No more messages
+    ErrPartitionEOF ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__PARTITION_EOF)
+    // ErrUnknownPartition Local: Unknown partition
+    ErrUnknownPartition ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION)
+    // ErrFs Local: File or filesystem error
+    ErrFs ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__FS)
+    // ErrUnknownTopic Local: Unknown topic
+    ErrUnknownTopic ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
+    // ErrAllBrokersDown Local: All broker connections are down
+    ErrAllBrokersDown ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN)
+    // ErrInvalidArg Local: Invalid argument or configuration
+    ErrInvalidArg ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__INVALID_ARG)
+    // ErrTimedOut Local: Timed out
+    ErrTimedOut ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__TIMED_OUT)
+    // ErrQueueFull Local: Queue full
+    ErrQueueFull ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__QUEUE_FULL)
+    // ErrIsrInsuff Local: ISR count insufficient
+    ErrIsrInsuff ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__ISR_INSUFF)
+    // ErrNodeUpdate Local: Broker node update
+    ErrNodeUpdate ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NODE_UPDATE)
+    // ErrSsl Local: SSL error
+    ErrSsl ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__SSL)
+    // ErrWaitCoord Local: Waiting for coordinator
+    ErrWaitCoord ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__WAIT_COORD)
+    // ErrUnknownGroup Local: Unknown group
+    ErrUnknownGroup ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNKNOWN_GROUP)
+    // ErrInProgress Local: Operation in progress
+    ErrInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__IN_PROGRESS)
+    // ErrPrevInProgress Local: Previous operation in progress
+    ErrPrevInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS)
+    // ErrExistingSubscription Local: Existing subscription
+    ErrExistingSubscription ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION)
+    // ErrAssignPartitions Local: Assign partitions
+    ErrAssignPartitions ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
+    // ErrRevokePartitions Local: Revoke partitions
+    ErrRevokePartitions ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS)
+    // ErrConflict Local: Conflicting use
+    ErrConflict ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__CONFLICT)
+    // ErrState Local: Erroneous state
+    ErrState ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__STATE)
+    // ErrUnknownProtocol Local: Unknown protocol
+    ErrUnknownProtocol ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL)
+    // ErrNotImplemented Local: Not implemented
+    ErrNotImplemented ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED)
+    // ErrAuthentication Local: Authentication failure
+    ErrAuthentication ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__AUTHENTICATION)
+    // ErrNoOffset Local: No offset stored
+    ErrNoOffset ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NO_OFFSET)
+    // ErrOutdated Local: Outdated
+    ErrOutdated ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__OUTDATED)
+    // ErrTimedOutQueue Local: Timed out in queue
+    ErrTimedOutQueue ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE)
+    // ErrUnsupportedFeature Local: Required feature not supported by broker
+    ErrUnsupportedFeature ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE)
+    // ErrWaitCache Local: Awaiting cache update
+    ErrWaitCache ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__WAIT_CACHE)
+    // ErrIntr Local: Operation interrupted
+    ErrIntr ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__INTR)
+    // ErrKeySerialization Local: Key serialization error
+    ErrKeySerialization ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__KEY_SERIALIZATION)
+    // ErrValueSerialization Local: Value serialization error
+    ErrValueSerialization ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__VALUE_SERIALIZATION)
+    // ErrKeyDeserialization Local: Key deserialization error
+    ErrKeyDeserialization ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__KEY_DESERIALIZATION)
+    // ErrValueDeserialization Local: Value deserialization error
+    ErrValueDeserialization ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__VALUE_DESERIALIZATION)
+    // ErrPartial Local: Partial response
+    ErrPartial ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__PARTIAL)
+    // ErrReadOnly Local: Read-only object
+    ErrReadOnly ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__READ_ONLY)
+    // ErrNoent Local: No such entry
+    ErrNoent ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__NOENT)
+    // ErrUnderflow Local: Read underflow
+    ErrUnderflow ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__UNDERFLOW)
+    // ErrInvalidType Local: Invalid type
+    ErrInvalidType ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR__INVALID_TYPE)
+    // ErrUnknown Unknown broker error
+    ErrUnknown ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNKNOWN)
+    // ErrNoError Success
+    ErrNoError ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NO_ERROR)
+    // ErrOffsetOutOfRange Broker: Offset out of range
+    ErrOffsetOutOfRange ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE)
+    // ErrInvalidMsg Broker: Invalid message
+    ErrInvalidMsg ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_MSG)
+    // ErrUnknownTopicOrPart Broker: Unknown topic or partition
+    ErrUnknownTopicOrPart ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART)
+    // ErrInvalidMsgSize Broker: Invalid message size
+    ErrInvalidMsgSize ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE)
+    // ErrLeaderNotAvailable Broker: Leader not available
+    ErrLeaderNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE)
+    // ErrNotLeaderForPartition Broker: Not leader for partition
+    ErrNotLeaderForPartition ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION)
+    // ErrRequestTimedOut Broker: Request timed out
+    ErrRequestTimedOut ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT)
+    // ErrBrokerNotAvailable Broker: Broker not available
+    ErrBrokerNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE)
+    // ErrReplicaNotAvailable Broker: Replica not available
+    ErrReplicaNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE)
+    // ErrMsgSizeTooLarge Broker: Message size too large
+    ErrMsgSizeTooLarge ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE)
+    // ErrStaleCtrlEpoch Broker: StaleControllerEpochCode
+    ErrStaleCtrlEpoch ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_STALE_CTRL_EPOCH)
+    // ErrOffsetMetadataTooLarge Broker: Offset metadata string too large
+    ErrOffsetMetadataTooLarge ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE)
+    // ErrNetworkException Broker: Broker disconnected before response received
+    ErrNetworkException ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NETWORK_EXCEPTION)
+    // ErrGroupLoadInProgress Broker: Group coordinator load in progress
+    ErrGroupLoadInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS)
+    // ErrGroupCoordinatorNotAvailable Broker: Group coordinator not available
+    ErrGroupCoordinatorNotAvailable ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE)
+    // ErrNotCoordinatorForGroup Broker: Not coordinator for group
+    ErrNotCoordinatorForGroup ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP)
+    // ErrTopicException Broker: Invalid topic
+    ErrTopicException ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION)
+    // ErrRecordListTooLarge Broker: Message batch larger than configured server segment size
+    ErrRecordListTooLarge ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE)
+    // ErrNotEnoughReplicas Broker: Not enough in-sync replicas
+    ErrNotEnoughReplicas ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS)
+    // ErrNotEnoughReplicasAfterAppend Broker: Message(s) written to insufficient number of in-sync replicas
+    ErrNotEnoughReplicasAfterAppend ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND)
+    // ErrInvalidRequiredAcks Broker: Invalid required acks value
+    ErrInvalidRequiredAcks ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS)
+    // ErrIllegalGeneration Broker: Specified group generation id is not valid
+    ErrIllegalGeneration ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION)
+    // ErrInconsistentGroupProtocol Broker: Inconsistent group protocol
+    ErrInconsistentGroupProtocol ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INCONSISTENT_GROUP_PROTOCOL)
+    // ErrInvalidGroupID Broker: Invalid group.id
+    ErrInvalidGroupID ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_GROUP_ID)
+    // ErrUnknownMemberID Broker: Unknown member
+    ErrUnknownMemberID ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID)
+    // ErrInvalidSessionTimeout Broker: Invalid session timeout
+    ErrInvalidSessionTimeout ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT)
+    // ErrRebalanceInProgress Broker: Group rebalance in progress
+    ErrRebalanceInProgress ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS)
+    // ErrInvalidCommitOffsetSize Broker: Commit offset data size is not valid
+    ErrInvalidCommitOffsetSize ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE)
+    // ErrTopicAuthorizationFailed Broker: Topic authorization failed
+    ErrTopicAuthorizationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED)
+    // ErrGroupAuthorizationFailed Broker: Group authorization failed
+    ErrGroupAuthorizationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED)
+    // ErrClusterAuthorizationFailed Broker: Cluster authorization failed
+    ErrClusterAuthorizationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED)
+    // ErrInvalidTimestamp Broker: Invalid timestamp
+    ErrInvalidTimestamp ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_TIMESTAMP)
+    // ErrUnsupportedSaslMechanism Broker: Unsupported SASL mechanism
+    ErrUnsupportedSaslMechanism ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNSUPPORTED_SASL_MECHANISM)
+    // ErrIllegalSaslState Broker: Request not valid in current SASL state
+    ErrIllegalSaslState ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_ILLEGAL_SASL_STATE)
+    // ErrUnsupportedVersion Broker: API version not supported
+    ErrUnsupportedVersion ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION)
+    // ErrTopicAlreadyExists Broker: Topic already exists
+    ErrTopicAlreadyExists ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS)
+    // ErrInvalidPartitions Broker: Invalid number of partitions
+    ErrInvalidPartitions ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_PARTITIONS)
+    // ErrInvalidReplicationFactor Broker: Invalid replication factor
+    ErrInvalidReplicationFactor ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_REPLICATION_FACTOR)
+    // ErrInvalidReplicaAssignment Broker: Invalid replica assignment
+    ErrInvalidReplicaAssignment ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_REPLICA_ASSIGNMENT)
+    // ErrInvalidConfig Broker: Configuration is invalid
+    ErrInvalidConfig ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_CONFIG)
+    // ErrNotController Broker: Not controller for cluster
+    ErrNotController ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_NOT_CONTROLLER)
+    // ErrInvalidRequest Broker: Invalid request
+    ErrInvalidRequest ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_REQUEST)
+    // ErrUnsupportedForMessageFormat Broker: Message format on broker does not support request
+    ErrUnsupportedForMessageFormat ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT)
+    // ErrPolicyViolation Broker: Isolation policy violation
+    ErrPolicyViolation ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_POLICY_VIOLATION)
+    // ErrOutOfOrderSequenceNumber Broker: Broker received an out of order sequence number
+    ErrOutOfOrderSequenceNumber ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER)
+    // ErrDuplicateSequenceNumber Broker: Broker received a duplicate sequence number
+    ErrDuplicateSequenceNumber ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER)
+    // ErrInvalidProducerEpoch Broker: Producer attempted an operation with an old epoch
+    ErrInvalidProducerEpoch ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH)
+    // ErrInvalidTxnState Broker: Producer attempted a transactional operation in an invalid state
+    ErrInvalidTxnState ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_TXN_STATE)
+    // ErrInvalidProducerIDMapping Broker: Producer attempted to use a producer id which is not currently assigned to its transactional id
+    ErrInvalidProducerIDMapping ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING)
+    // ErrInvalidTransactionTimeout Broker: Transaction timeout is larger than the maximum value allowed by the broker's max.transaction.timeout.ms
+    ErrInvalidTransactionTimeout ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_INVALID_TRANSACTION_TIMEOUT)
+    // ErrConcurrentTransactions Broker: Producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing
+    ErrConcurrentTransactions ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS)
+    // ErrTransactionCoordinatorFenced Broker: Indicates that the transaction coordinator sending a WriteTxnMarker is no longer the current coordinator for a given producer
+    ErrTransactionCoordinatorFenced ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TRANSACTION_COORDINATOR_FENCED)
+    // ErrTransactionalIDAuthorizationFailed Broker: Transactional Id authorization failed
+    ErrTransactionalIDAuthorizationFailed ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED)
+    // ErrSecurityDisabled Broker: Security features are disabled
+    ErrSecurityDisabled ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_SECURITY_DISABLED)
+    // ErrOperationNotAttempted Broker: Operation not attempted
+    ErrOperationNotAttempted ErrorCode = ErrorCode(C.RD_KAFKA_RESP_ERR_OPERATION_NOT_ATTEMPTED)
+)
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/glue_rdkafka.h b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/glue_rdkafka.h
new file mode 100644
index 0000000..adcef9a
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/glue_rdkafka.h
@@ -0,0 +1,46 @@
+/**
+ * Copyright 2016 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+
+/**
+ * Glue between Go, Cgo and librdkafka
+ */
+
+
+/**
+ * Temporary C to Go header representation
+ */
+typedef struct tmphdr_s {
+  const char *key;   // header name (read with C.GoString by the Go code)
+  const void *val;   // producer: malloc()ed by Go code if size > 0
+                     // consumer: owned by librdkafka
+  ssize_t     size;  // length of val in bytes
+} tmphdr_t;
+
+
+
+/**
+ * Represents a fetched C message, with all extra fields extracted
+ * to struct fields.
+ */
+typedef struct fetched_c_msg {
+  rd_kafka_message_t *msg;
+  rd_kafka_timestamp_type_t tstype;
+  int64_t ts;            // milliseconds; -1 is treated as "not set" by the Go code
+  tmphdr_t *tmphdrs;     // array of tmphdrsCnt headers, free()d by the Go code
+  size_t    tmphdrsCnt;
+} fetched_c_msg_t;
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/handle.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/handle.go
new file mode 100644
index 0000000..c09e64d
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/handle.go
@@ -0,0 +1,207 @@
+package kafka
+
+/**
+ * Copyright 2016 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import (
+	"fmt"
+	"sync"
+	"unsafe"
+)
+
+/*
+#include <librdkafka/rdkafka.h>
+#include <stdlib.h>
+*/
+import "C"
+
+// Handle represents a generic client handle containing common parts for
+// both Producer and Consumer.
+type Handle interface {
+	gethandle() *handle
+}
+
+// Common instance handle for both Producer and Consumer
+type handle struct {
+	rk  *C.rd_kafka_t
+	rkq *C.rd_kafka_queue_t
+
+	// Termination of background go-routines
+	terminatedChan chan string // string is go-routine name
+
+	// Topic <-> rkt caches
+	rktCacheLock sync.Mutex
+	// topic name -> rkt cache
+	rktCache map[string]*C.rd_kafka_topic_t
+	// rkt -> topic name cache
+	rktNameCache map[*C.rd_kafka_topic_t]string
+
+	//
+	// cgo map
+	// Maps C callbacks based on cgoid back to its Go object
+	cgoLock   sync.Mutex
+	cgoidNext uintptr
+	cgomap    map[int]cgoif
+
+	//
+	// producer
+	//
+	p *Producer
+
+	// Forward delivery reports on Producer.Events channel
+	fwdDr bool
+
+	//
+	// consumer
+	//
+	c *Consumer
+
+	// Forward rebalancing ack responsibility to application (current setting)
+	currAppRebalanceEnable bool
+}
+
+func (h *handle) String() string {
+	return C.GoString(C.rd_kafka_name(h.rk))
+}
+
+func (h *handle) setup() {
+	h.rktCache = make(map[string]*C.rd_kafka_topic_t)
+	h.rktNameCache = make(map[*C.rd_kafka_topic_t]string)
+	h.cgomap = make(map[int]cgoif)
+	h.terminatedChan = make(chan string, 10)
+}
+
+func (h *handle) cleanup() {
+	for _, crkt := range h.rktCache {
+		C.rd_kafka_topic_destroy(crkt)
+	}
+
+	if h.rkq != nil {
+		C.rd_kafka_queue_destroy(h.rkq)
+	}
+}
+
+// waitTerminated blocks until the background go-routines have terminated.
+// termCnt is the number of goroutines expected to signal termination completion
+// on h.terminatedChan
+func (h *handle) waitTerminated(termCnt int) {
+	// Consume one termination-done event per expected goroutine
+	for remaining := termCnt; remaining > 0; remaining-- {
+		<-h.terminatedChan
+	}
+}
+
+// getRkt0 finds or creates and returns a C topic_t object from the local cache.
+func (h *handle) getRkt0(topic string, ctopic *C.char, doLock bool) (crkt *C.rd_kafka_topic_t) {
+	if doLock {
+		h.rktCacheLock.Lock()
+		defer h.rktCacheLock.Unlock()
+	}
+	crkt, ok := h.rktCache[topic]
+	if ok {
+		return crkt
+	}
+
+	if ctopic == nil {
+		ctopic = C.CString(topic)
+		defer C.free(unsafe.Pointer(ctopic))
+	}
+
+	crkt = C.rd_kafka_topic_new(h.rk, ctopic, nil)
+	if crkt == nil {
+		panic(fmt.Sprintf("Unable to create new C topic \"%s\": %s",
+			topic, C.GoString(C.rd_kafka_err2str(C.rd_kafka_last_error()))))
+	}
+
+	h.rktCache[topic] = crkt
+	h.rktNameCache[crkt] = topic
+
+	return crkt
+}
+
+// getRkt finds or creates and returns a C topic_t object from the local cache.
+func (h *handle) getRkt(topic string) (crkt *C.rd_kafka_topic_t) {
+	return h.getRkt0(topic, nil, true)
+}
+
+// getTopicNameFromRkt returns the topic name for a C topic_t object, preferably
+// using the local cache to avoid a cgo call.
+func (h *handle) getTopicNameFromRkt(crkt *C.rd_kafka_topic_t) (topic string) {
+	h.rktCacheLock.Lock()
+	defer h.rktCacheLock.Unlock()
+
+	if name, cached := h.rktNameCache[crkt]; cached {
+		return name
+	}
+
+	// Cache miss: ask librdkafka for the name of this topic object.
+	ctopic := C.rd_kafka_topic_name(crkt)
+	topic = C.GoString(ctopic)
+
+	// Populate the caches through getRkt0. The cache lock is already held
+	// here, so getRkt0 must not lock again. Its result is not needed.
+	h.getRkt0(topic, ctopic, false /* dont lock */)
+	return topic
+}
+
+// cgoif is a generic interface for holding Go state passed as opaque
+// value to the C code.
+// Since pointers to complex Go types cannot be passed to C we instead create
+// a cgoif object, generate a unique id that is added to the cgomap,
+// and then pass that id to the C code. When the C code callback is called we
+// use the id to look up the cgoif object in the cgomap.
+type cgoif interface{}
+
+// delivery report cgoif container
+type cgoDr struct {
+	deliveryChan chan Event
+	opaque       interface{}
+}
+
+// cgoPut adds object cg to the handle's cgo map and returns a
+// unique id for the added entry.
+// Thread-safe.
+// FIXME: ids are not unique over time: the counter wraps and reuses old ids.
+func (h *handle) cgoPut(cg cgoif) (cgoid int) {
+	h.cgoLock.Lock()
+	defer h.cgoLock.Unlock()
+
+	h.cgoidNext++
+	if h.cgoidNext == 0 { // counter wrapped: skip 0, which cgoGet reports as not found
+		h.cgoidNext++
+	}
+	cgoid = (int)(h.cgoidNext)
+	h.cgomap[cgoid] = cg // an entry still stored under a reused id is overwritten
+	return cgoid
+}
+
+// cgoGet looks up cgoid in the cgo map, deletes the reference from the map
+// and returns the object, if found. Else returns nil, false.
+// Thread-safe.
+func (h *handle) cgoGet(cgoid int) (cg cgoif, found bool) {
+	if cgoid == 0 {
+		return nil, false
+	}
+
+	h.cgoLock.Lock()
+	defer h.cgoLock.Unlock()
+	cg, found = h.cgomap[cgoid]
+	if found {
+		delete(h.cgomap, cgoid)
+	}
+
+	return cg, found
+}
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/header.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/header.go
new file mode 100644
index 0000000..67d6202
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/header.go
@@ -0,0 +1,67 @@
+package kafka
+
+/**
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import (
+	"fmt"
+	"strconv"
+)
+
+/*
+#include <string.h>
+#include <librdkafka/rdkafka.h>
+#include "glue_rdkafka.h"
+*/
+import "C"
+
+// Header represents a single Kafka message header.
+//
+// Message headers are made up of a list of Header elements, retaining their original insert
+// order and allowing for duplicate Keys.
+//
+// Key is a human readable string identifying the header.
+// Value is the key's binary value, Kafka does not put any restrictions on the format
+// of the Value but it should be made relatively compact.
+// The value may be a byte array, empty, or nil.
+//
+// NOTE: Message headers are not available on producer delivery report messages.
+type Header struct {
+	Key   string // Header name (utf-8 string)
+	Value []byte // Header value (nil, empty, or binary)
+}
+
+// String returns the Header Key and Value in a human readable, possibly
+// truncated, form suitable for displaying to the user.
+func (h Header) String() string {
+	const maxShown, slack = 50, 15
+
+	n := len(h.Value)
+	switch {
+	case h.Value == nil:
+		return fmt.Sprintf("%s=nil", h.Key)
+	case n == 0:
+		return fmt.Sprintf("%s=<empty>", h.Key)
+	case n <= maxShown+slack:
+		// Short enough to be shown in full.
+		return fmt.Sprintf("%s=%s", h.Key, strconv.Quote(string(h.Value)))
+	}
+
+	// Long value: show the leading bytes and report how many were left out.
+	shown := strconv.Quote(string(h.Value[:maxShown]))
+	omitted := fmt.Sprintf("(%d more bytes)", n-maxShown)
+	return fmt.Sprintf("%s=%s%s", h.Key, shown, omitted)
+}
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/kafka.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/kafka.go
new file mode 100644
index 0000000..4883ee2
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/kafka.go
@@ -0,0 +1,242 @@
+/**
+ * Copyright 2016 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package kafka provides high-level Apache Kafka producer and consumers
+// using bindings on-top of the librdkafka C library.
+//
+//
+// High-level Consumer
+//
+// * Decide if you want to read messages and events from the `.Events()` channel
+// (set `"go.events.channel.enable": true`) or by calling `.Poll()`.
+//
+// * Create a Consumer with `kafka.NewConsumer()` providing at
+// least the `bootstrap.servers` and `group.id` configuration properties.
+//
+// * Call `.Subscribe()` (or `.SubscribeTopics()` to subscribe to multiple topics)
+// to join the group with the specified subscription set.
+// Subscriptions are atomic, calling `.Subscribe*()` again will leave
+// the group and rejoin with the new set of topics.
+//
+// * Start reading events and messages from either the `.Events` channel
+// or by calling `.Poll()`.
+//
+// * When the group has rebalanced each client member is assigned a
+// (sub-)set of topic+partitions.
+// By default the consumer will start fetching messages for its assigned
+// partitions at this point, but your application may enable rebalance
+// events to get an insight into what the assigned partitions were
+// as well as set the initial offsets. To do this you need to pass
+// `"go.application.rebalance.enable": true` to the `NewConsumer()` call
+// mentioned above. You will (eventually) see a `kafka.AssignedPartitions` event
+// with the assigned partition set. You can optionally modify the initial
+// offsets (they'll default to stored offsets and if there are no previously stored
+// offsets it will fall back to `"default.topic.config": ConfigMap{"auto.offset.reset": ..}`
+// which defaults to the `latest` message) and then call `.Assign(partitions)`
+// to start consuming. If you don't need to modify the initial offsets you will
+// not need to call `.Assign()`, the client will do so automatically for you if
+// you don't.
+//
+// * As messages are fetched they will be made available on either the
+// `.Events` channel or by calling `.Poll()`, look for event type `*kafka.Message`.
+//
+// * Handle messages, events and errors to your liking.
+//
+// * When you are done consuming call `.Close()` to commit final offsets
+// and leave the consumer group.
+//
+//
+//
+// Producer
+//
+// * Create a Producer with `kafka.NewProducer()` providing at least
+// the `bootstrap.servers` configuration properties.
+//
+// * Messages may now be produced either by sending a `*kafka.Message`
+// on the `.ProduceChannel` or by calling `.Produce()`.
+//
+// * Producing is an asynchronous operation so the client notifies the application
+// of per-message produce success or failure through something called delivery reports.
+// Delivery reports are by default emitted on the `.Events()` channel as `*kafka.Message`
+// and you should check `msg.TopicPartition.Error` for `nil` to find out if the message
+// was successfully delivered or not.
+// It is also possible to direct delivery reports to alternate channels
+// by providing a non-nil `chan Event` channel to `.Produce()`.
+// If no delivery reports are wanted they can be completely disabled by
+// setting configuration property `"go.delivery.reports": false`.
+//
+// * When you are done producing messages you will need to make sure all messages
+// are indeed delivered to the broker (or failed), remember that this is
+// an asynchronous client so some of your messages may be lingering in internal
+// channels or transmission queues.
+// To do this you can either keep track of the messages you've produced
+// and wait for their corresponding delivery reports, or call the convenience
+// function `.Flush()` that will block until all message deliveries are done
+// or the provided timeout elapses.
+//
+// * Finally call `.Close()` to decommission the producer.
+//
+//
+// Events
+//
+// Apart from emitting messages and delivery reports the client also communicates
+// with the application through a number of different event types.
+// An application may choose to handle or ignore these events.
+//
+// Consumer events
+//
+// * `*kafka.Message` - a fetched message.
+//
+// * `AssignedPartitions` - The assigned partition set for this client following a rebalance.
+// Requires `go.application.rebalance.enable`
+//
+// * `RevokedPartitions` - The counter part to `AssignedPartitions` following a rebalance.
+// `AssignedPartitions` and `RevokedPartitions` are symmetrical.
+// Requires `go.application.rebalance.enable`
+//
+// * `PartitionEOF` - Consumer has reached the end of a partition.
+// NOTE: The consumer will keep trying to fetch new messages for the partition.
+//
+// * `OffsetsCommitted` - Offset commit results (when `enable.auto.commit` is enabled).
+//
+//
+// Producer events
+//
+// * `*kafka.Message` - delivery report for produced message.
+// Check `.TopicPartition.Error` for delivery result.
+//
+//
+// Generic events for both Consumer and Producer
+//
+// * `KafkaError` - client (error codes are prefixed with _) or broker error.
+// These errors are normally just informational since the
+// client will try its best to automatically recover (eventually).
+//
+//
+// Hint: If your application registers a signal notification
+// (signal.Notify) make sure the signals channel is buffered to avoid
+// possible complications with blocking Poll() calls.
+//
+// Note: The Confluent Kafka Go client is safe for concurrent use.
+package kafka
+
+import (
+	"fmt"
+	"unsafe"
+)
+
+/*
+#include <stdlib.h>
+#include <string.h>
+#include <librdkafka/rdkafka.h>
+
+static rd_kafka_topic_partition_t *_c_rdkafka_topic_partition_list_entry(rd_kafka_topic_partition_list_t *rktparlist, int idx) {
+   return idx < rktparlist->cnt ? &rktparlist->elems[idx] : NULL;
+}
+*/
+import "C"
+
+// PartitionAny represents any partition (for partitioning),
+// or unspecified value (for all other cases)
+const PartitionAny = int32(C.RD_KAFKA_PARTITION_UA)
+
+// TopicPartition is a generic placeholder for a Topic+Partition and optionally Offset.
+type TopicPartition struct {
+	Topic     *string
+	Partition int32
+	Offset    Offset
+	Error     error
+}
+
+func (p TopicPartition) String() string {
+	topic := "<null>"
+	if p.Topic != nil {
+		topic = *p.Topic
+	}
+	if p.Error != nil {
+		return fmt.Sprintf("%s[%d]@%s(%s)",
+			topic, p.Partition, p.Offset, p.Error)
+	}
+	return fmt.Sprintf("%s[%d]@%s",
+		topic, p.Partition, p.Offset)
+}
+
+// TopicPartitions is a slice of TopicPartitions that also implements
+// the sort interface
+type TopicPartitions []TopicPartition
+
+func (tps TopicPartitions) Len() int {
+	return len(tps)
+}
+
+func (tps TopicPartitions) Less(i, j int) bool {
+	if *tps[i].Topic < *tps[j].Topic {
+		return true
+	} else if *tps[i].Topic > *tps[j].Topic {
+		return false
+	}
+	return tps[i].Partition < tps[j].Partition
+}
+
+func (tps TopicPartitions) Swap(i, j int) {
+	tps[i], tps[j] = tps[j], tps[i]
+}
+
+// newCPartsFromTopicPartitions creates a new C rd_kafka_topic_partition_list_t
+// from a TopicPartition array. Every element must have a non-nil Topic.
+func newCPartsFromTopicPartitions(partitions []TopicPartition) (cparts *C.rd_kafka_topic_partition_list_t) {
+	cparts = C.rd_kafka_topic_partition_list_new(C.int(len(partitions)))
+	for _, part := range partitions {
+		ctopic := C.CString(*part.Topic)
+		rktpar := C.rd_kafka_topic_partition_list_add(cparts, ctopic, C.int32_t(part.Partition))
+		C.free(unsafe.Pointer(ctopic)) // the list outlives ctopic, so free it per iteration
+		rktpar.offset = C.int64_t(part.Offset)
+	}
+
+	return cparts
+}
+
+// setupTopicPartitionFromCrktpar fills in partition from the C topic partition crktpar.
+func setupTopicPartitionFromCrktpar(partition *TopicPartition, crktpar *C.rd_kafka_topic_partition_t) {
+	topic := C.GoString(crktpar.topic)
+	partition.Topic = &topic
+	partition.Partition = int32(crktpar.partition)
+	partition.Offset = Offset(crktpar.offset)
+	if crktpar.err != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+		partition.Error = newError(crktpar.err) // Error is left untouched when there is no error
+	}
+}
+
+// newTopicPartitionsFromCparts converts a C topic partition list to a TopicPartition slice.
+func newTopicPartitionsFromCparts(cparts *C.rd_kafka_topic_partition_list_t) (partitions []TopicPartition) {
+	partcnt := int(cparts.cnt)
+
+	partitions = make([]TopicPartition, partcnt)
+	for i := 0; i < partcnt; i++ {
+		crktpar := C._c_rdkafka_topic_partition_list_entry(cparts, C.int(i))
+		setupTopicPartitionFromCrktpar(&partitions[i], crktpar)
+	}
+
+	return partitions
+}
+
+// LibraryVersion reports the version of the underlying librdkafka library as a
+// (version_int, version_str) tuple.
+func LibraryVersion() (int, string) {
+	// Integer form first, then the string form.
+	verInt := int(C.rd_kafka_version())
+	return verInt, C.GoString(C.rd_kafka_version_str())
+}
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/message.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/message.go
new file mode 100644
index 0000000..3472d1c
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/message.go
@@ -0,0 +1,207 @@
+package kafka
+
+/**
+ * Copyright 2016 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import (
+	"fmt"
+	"time"
+	"unsafe"
+)
+
+/*
+#include <string.h>
+#include <stdlib.h>
+#include <librdkafka/rdkafka.h>
+#include "glue_rdkafka.h"
+
+void setup_rkmessage (rd_kafka_message_t *rkmessage,
+                      rd_kafka_topic_t *rkt, int32_t partition,
+                      const void *payload, size_t len,
+                      void *key, size_t keyLen, void *opaque) {
+     rkmessage->rkt       = rkt;
+     rkmessage->partition = partition;
+     rkmessage->payload   = (void *)payload;
+     rkmessage->len       = len;
+     rkmessage->key       = (void *)key;
+     rkmessage->key_len   = keyLen;
+     rkmessage->_private  = opaque;
+}
+*/
+import "C"
+
+// TimestampType is the Message timestamp type or source
+//
+type TimestampType int
+
+const (
+	// TimestampNotAvailable indicates no timestamp was set, or not available due to lacking broker support
+	TimestampNotAvailable = TimestampType(C.RD_KAFKA_TIMESTAMP_NOT_AVAILABLE)
+	// TimestampCreateTime indicates timestamp set by producer (source time)
+	TimestampCreateTime = TimestampType(C.RD_KAFKA_TIMESTAMP_CREATE_TIME)
+	// TimestampLogAppendTime indicates timestamp set by broker (store time)
+	TimestampLogAppendTime = TimestampType(C.RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME)
+)
+
+// String returns the name of the timestamp type ("NotAvailable" for unknown values).
+func (t TimestampType) String() string {
+	switch t {
+	case TimestampCreateTime:
+		return "CreateTime"
+	case TimestampLogAppendTime:
+		return "LogAppendTime"
+	case TimestampNotAvailable:
+		return "NotAvailable"
+	}
+	return "NotAvailable"
+}
+
+// Message represents a Kafka message
+type Message struct {
+	TopicPartition TopicPartition
+	Value          []byte
+	Key            []byte
+	Timestamp      time.Time
+	TimestampType  TimestampType
+	Opaque         interface{}
+	Headers        []Header
+}
+
+// String returns a human readable representation of a Message.
+// Key and payload are not represented.
+func (m *Message) String() string {
+	// A nil Topic is rendered as the empty string.
+	topic := ""
+	if tp := m.TopicPartition.Topic; tp != nil {
+		topic = *tp
+	}
+	part := m.TopicPartition
+	return fmt.Sprintf("%s[%d]@%s", topic, part.Partition, part.Offset)
+}
+
+func (h *handle) getRktFromMessage(msg *Message) (crkt *C.rd_kafka_topic_t) {
+	if msg.TopicPartition.Topic == nil {
+		return nil
+	}
+
+	return h.getRkt(*msg.TopicPartition.Topic)
+}
+
+// newMessageFromFcMsg creates a new message object from a fetched C message:
+// timestamp and headers come from fcMsg, the rest from the wrapped C message.
+func (h *handle) newMessageFromFcMsg(fcMsg *C.fetched_c_msg_t) (msg *Message) {
+	msg = &Message{}
+
+	if millis := int64(fcMsg.ts); millis != -1 {
+		msg.TimestampType = TimestampType(fcMsg.tstype)
+		msg.Timestamp = time.Unix(millis/1000, (millis%1000)*1000000)
+	}
+
+	if hdrCnt := fcMsg.tmphdrsCnt; hdrCnt > 0 {
+		msg.Headers = make([]Header, hdrCnt)
+		for i := range msg.Headers {
+			chdr := (*[1 << 30]C.tmphdr_t)(unsafe.Pointer(fcMsg.tmphdrs))[i]
+			msg.Headers[i].Key = C.GoString(chdr.key)
+			// A NULL value leaves Header.Value at its zero value, nil.
+			if chdr.val != nil {
+				msg.Headers[i].Value = C.GoBytes(unsafe.Pointer(chdr.val), C.int(chdr.size))
+			}
+		}
+		C.free(unsafe.Pointer(fcMsg.tmphdrs))
+	}
+
+	h.setupMessageFromC(msg, fcMsg.msg)
+
+	return msg
+}
+
+// setupMessageFromC sets up a message object from a C rd_kafka_message_t
+func (h *handle) setupMessageFromC(msg *Message, cmsg *C.rd_kafka_message_t) {
+	if cmsg.rkt != nil {
+		topic := h.getTopicNameFromRkt(cmsg.rkt)
+		msg.TopicPartition.Topic = &topic
+	}
+	msg.TopicPartition.Partition = int32(cmsg.partition)
+	if cmsg.payload != nil {
+		msg.Value = C.GoBytes(unsafe.Pointer(cmsg.payload), C.int(cmsg.len))
+	}
+	if cmsg.key != nil {
+		msg.Key = C.GoBytes(unsafe.Pointer(cmsg.key), C.int(cmsg.key_len))
+	}
+	msg.TopicPartition.Offset = Offset(cmsg.offset)
+	if cmsg.err != 0 {
+		msg.TopicPartition.Error = newError(cmsg.err)
+	}
+}
+
+// newMessageFromC creates a new message object from a C rd_kafka_message_t
+// NOTE: For use with Producer: does not set message timestamp fields.
+func (h *handle) newMessageFromC(cmsg *C.rd_kafka_message_t) (msg *Message) {
+	msg = &Message{}
+
+	h.setupMessageFromC(msg, cmsg)
+
+	return msg
+}
+
+// messageToC sets up cmsg as a clone of msg, copying Value and Key into
+// a single C heap allocation (Value first, Key directly after it).
+func (h *handle) messageToC(msg *Message, cmsg *C.rd_kafka_message_t) {
+	var valp unsafe.Pointer
+	var keyp unsafe.Pointer
+
+	// to circumvent Cgo constraints we need to allocate C heap memory
+	// for both Value and Key (one allocation back to back)
+	// and copy the bytes from Value and Key to the C memory.
+	// We later tell librdkafka (in produce()) to free the
+	// C memory pointer when it is done.
+	var payload unsafe.Pointer
+
+	// len() of a nil slice is 0, so no explicit nil checks are needed.
+	valueLen := len(msg.Value)
+	keyLen := len(msg.Key)
+
+	allocLen := valueLen + keyLen
+	if allocLen > 0 {
+		payload = C.malloc(C.size_t(allocLen))
+		// View the C allocation as a bounded Go byte array so copy() can be used.
+		// The bound must not exceed 1 << 30: a larger array type does not fit
+		// the address space of 32-bit platforms and fails to compile there.
+		buf := (*[1 << 30]byte)(payload)
+		if valueLen > 0 {
+			copy(buf[0:valueLen], msg.Value)
+			valp = payload
+		}
+		if keyLen > 0 {
+			copy(buf[valueLen:allocLen], msg.Key)
+			keyp = unsafe.Pointer(&buf[valueLen])
+		}
+	}
+
+	cmsg.rkt = h.getRktFromMessage(msg)
+	cmsg.partition = C.int32_t(msg.TopicPartition.Partition)
+	cmsg.payload = valp
+	cmsg.len = C.size_t(valueLen)
+	cmsg.key = keyp
+	cmsg.key_len = C.size_t(keyLen)
+	cmsg._private = nil
+}
+
+// used for testing messageToC performance
+func (h *handle) messageToCDummy(msg *Message) {
+	var cmsg C.rd_kafka_message_t
+	h.messageToC(msg, &cmsg)
+}
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/metadata.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/metadata.go
new file mode 100644
index 0000000..061147d
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/metadata.go
@@ -0,0 +1,158 @@
+/**
+ * Copyright 2016 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka
+
+import (
+	"unsafe"
+)
+
+/*
+#include <stdlib.h>
+#include <librdkafka/rdkafka.h>
+
+struct rd_kafka_metadata_broker *_getMetadata_broker_element(struct rd_kafka_metadata *m, int i) {
+  return &m->brokers[i];
+}
+
+struct rd_kafka_metadata_topic *_getMetadata_topic_element(struct rd_kafka_metadata *m, int i) {
+  return &m->topics[i];
+}
+
+struct rd_kafka_metadata_partition *_getMetadata_partition_element(struct rd_kafka_metadata *m, int topic_idx, int partition_idx) {
+  return &m->topics[topic_idx].partitions[partition_idx];
+}
+
+int32_t _get_int32_element (int32_t *arr, int i) {
+  return arr[i];
+}
+
+*/
+import "C"
+
+// BrokerMetadata contains per-broker metadata
+type BrokerMetadata struct {
+	ID   int32  // broker id as reported by librdkafka
+	Host string // broker host name
+	Port int    // broker port
+}
+
+// PartitionMetadata contains per-partition metadata
+type PartitionMetadata struct {
+	ID       int32   // partition id
+	Error    Error   // per-partition error from the metadata response
+	Leader   int32   // copied from the C "leader" field (presumably the leader's broker id)
+	Replicas []int32 // copied from the C "replicas" array
+	Isrs     []int32 // copied from the C "isrs" array (presumably in-sync replicas)
+}
+
+// TopicMetadata contains per-topic metadata
+type TopicMetadata struct {
+	Topic      string              // topic name
+	Partitions []PartitionMetadata // one entry per partition in the response
+	Error      Error               // per-topic error from the metadata response
+}
+
+// Metadata contains broker and topic metadata for all (matching) topics
+type Metadata struct {
+	Brokers []BrokerMetadata         // one entry per broker in the response
+	Topics  map[string]TopicMetadata // keyed by topic name
+
+	OriginatingBroker BrokerMetadata // from orig_broker_id/orig_broker_name; getMetadata leaves Port 0
+}
+
+// getMetadata queries broker for cluster and topic metadata.
+// If topic is non-nil only information about that topic is returned, else if
+// allTopics is false only information about locally used topics is returned,
+// else information about all topics is returned.
+func getMetadata(H Handle, topic *string, allTopics bool, timeoutMs int) (*Metadata, error) {
+	h := H.gethandle()
+
+	var rkt *C.rd_kafka_topic_t
+	if topic != nil {
+		rkt = h.getRkt(*topic)
+	}
+
+	var cMd *C.struct_rd_kafka_metadata
+	cErr := C.rd_kafka_metadata(h.rk, bool2cint(allTopics),
+		rkt, &cMd, C.int(timeoutMs))
+	if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+		return nil, newError(cErr)
+	}
+	// Everything below copies out of cMd, so release it only on return.
+	defer C.rd_kafka_metadata_destroy(cMd)
+
+	m := Metadata{}
+	m.Brokers = make([]BrokerMetadata, cMd.broker_cnt)
+	for i := range m.Brokers {
+		b := C._getMetadata_broker_element(cMd, C.int(i))
+		m.Brokers[i] = BrokerMetadata{int32(b.id), C.GoString(b.host),
+			int(b.port)}
+	}
+
+	m.Topics = make(map[string]TopicMetadata, int(cMd.topic_cnt))
+	for i := 0; i < int(cMd.topic_cnt); i++ {
+		t := C._getMetadata_topic_element(cMd, C.int(i))
+
+		// Build the partition list in a local slice and store the topic
+		// entry once, instead of re-hashing the topic name in m.Topics
+		// for every partition, replica and ISR element.
+		partitions := make([]PartitionMetadata, int(t.partition_cnt))
+		for j := range partitions {
+			p := C._getMetadata_partition_element(cMd, C.int(i), C.int(j))
+			replicas := make([]int32, int(p.replica_cnt))
+			for ir := range replicas {
+				replicas[ir] = int32(C._get_int32_element(p.replicas, C.int(ir)))
+			}
+			isrs := make([]int32, int(p.isr_cnt))
+			for ii := range isrs {
+				isrs[ii] = int32(C._get_int32_element(p.isrs, C.int(ii)))
+			}
+			partitions[j] = PartitionMetadata{ID: int32(p.id), Error: newError(p.err),
+				Leader: int32(p.leader), Replicas: replicas, Isrs: isrs}
+		}
+
+		thisTopic := C.GoString(t.topic)
+		m.Topics[thisTopic] = TopicMetadata{Topic: thisTopic,
+			Error: newError(t.err), Partitions: partitions}
+	}
+
+	m.OriginatingBroker = BrokerMetadata{int32(cMd.orig_broker_id),
+		C.GoString(cMd.orig_broker_name), 0}
+
+	return &m, nil
+}
+
+// queryWatermarkOffsets returns the broker's low and high offsets for the given topic
+// and partition, as reported by rd_kafka_query_watermark_offsets().
+// timeoutMs is handed to librdkafka unchanged.
+func queryWatermarkOffsets(H Handle, topic string, partition int32, timeoutMs int) (low, high int64, err error) {
+	h := H.gethandle()
+
+	// librdkafka wants a C string; release the copy when we are done.
+	cTopicName := C.CString(topic)
+	defer C.free(unsafe.Pointer(cTopicName))
+
+	var lo, hi C.int64_t
+	cErr := C.rd_kafka_query_watermark_offsets(h.rk, cTopicName, C.int32_t(partition),
+		&lo, &hi, C.int(timeoutMs))
+	if cErr == C.RD_KAFKA_RESP_ERR_NO_ERROR {
+		return int64(lo), int64(hi), nil
+	}
+
+	// Failure: report zero offsets together with the librdkafka error.
+	return 0, 0, newError(cErr)
+}
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/misc.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/misc.go
new file mode 100644
index 0000000..6d602ce
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/misc.go
@@ -0,0 +1,35 @@
+/**
+ * Copyright 2016 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka
+
+import "C"
+
+// bool2cint converts a bool to a C.int (1 or 0)
+func bool2cint(b bool) C.int {
+	if !b {
+		return 0
+	}
+	return 1
+}
+
+// cint2bool converts a C.int to a bool.
+// Zero maps to false; every other value, including negative ones,
+// maps to true.
+func cint2bool(v C.int) bool {
+	isSet := v != 0
+	return isSet
+}
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/offset.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/offset.go
new file mode 100644
index 0000000..5dd7fd2
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/offset.go
@@ -0,0 +1,144 @@
+/**
+ * Copyright 2017 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka
+
+import (
+	"fmt"
+	"strconv"
+)
+
+/*
+#include <stdlib.h>
+#include <librdkafka/rdkafka.h>
+
+static int64_t _c_rdkafka_offset_tail(int64_t rel) {
+   return RD_KAFKA_OFFSET_TAIL(rel);
+}
+*/
+import "C"
+
+// Offset type (int64) with support for canonical names
+type Offset int64
+
+// OffsetBeginning represents the earliest offset (logical)
+const OffsetBeginning = Offset(C.RD_KAFKA_OFFSET_BEGINNING)
+
+// OffsetEnd represents the latest offset (logical)
+const OffsetEnd = Offset(C.RD_KAFKA_OFFSET_END)
+
+// OffsetInvalid represents an invalid/unspecified offset
+const OffsetInvalid = Offset(C.RD_KAFKA_OFFSET_INVALID)
+
+// OffsetStored represents a stored offset
+const OffsetStored = Offset(C.RD_KAFKA_OFFSET_STORED)
+
+// String implements fmt.Stringer: logical offsets print by name, others in decimal.
+func (o Offset) String() string {
+	switch o {
+	case OffsetBeginning:
+		return "beginning"
+	case OffsetEnd:
+		return "end"
+	case OffsetInvalid:
+		return "unset"
+	case OffsetStored:
+		return "stored"
+	}
+	return strconv.FormatInt(int64(o), 10)
+}
+
+// Set parses offset with NewOffset() and stores the result in o.
+// On a parse error o is left unchanged and the error is returned.
+func (o *Offset) Set(offset interface{}) error {
+	parsed, err := NewOffset(offset)
+	if err != nil {
+		return err
+	}
+	*o = parsed
+	return nil
+}
+
+// NewOffset creates a new Offset using the provided logical string, or an
+// absolute int64 offset value.
+// Logical offsets: "beginning", "earliest", "end", "latest", "unset", "invalid", "stored"
+// Any other string is parsed as a base-10 integer and the parse error, if
+// any, is returned as-is. Types other than string, int and int64 yield
+// OffsetInvalid and an ErrInvalidArg error naming the offending type.
+func NewOffset(offset interface{}) (Offset, error) {
+	switch v := offset.(type) {
+	case string:
+		switch v {
+		case "beginning", "earliest":
+			return OffsetBeginning, nil
+
+		case "end", "latest":
+			return OffsetEnd, nil
+
+		case "unset", "invalid":
+			return OffsetInvalid, nil
+
+		case "stored":
+			return OffsetStored, nil
+
+		default:
+			// ParseInt with bitSize 64 rather than Atoi: Atoi is limited to
+			// the platform int and rejects large offsets on 32-bit builds.
+			off, err := strconv.ParseInt(v, 10, 64)
+			return Offset(off), err
+		}
+
+	case int:
+		return Offset(v), nil
+	case int64:
+		return Offset(v), nil
+	default:
+		// %T prints the dynamic type; the previous %t is the boolean verb
+		// and rendered as a "%!t(...)" formatting error.
+		return OffsetInvalid, newErrorFromString(ErrInvalidArg,
+			fmt.Sprintf("Invalid offset type: %T", v))
+	}
+}
+
+// OffsetTail returns the logical offset relativeOffset from current end of partition (computed by librdkafka's RD_KAFKA_OFFSET_TAIL macro)
+func OffsetTail(relativeOffset Offset) Offset {
+	return Offset(C._c_rdkafka_offset_tail(C.int64_t(relativeOffset)))
+}
+
+// offsetsForTimes looks up offsets by timestamp for the given partitions.
+//
+// Each element of `times` names a partition and carries the timestamp to
+// query in its `.Offset` field. The returned `offsets` list carries the
+// looked up offset in `.Offset`: the earliest offset whose timestamp is
+// greater than or equal to the given timestamp in the corresponding
+// partition.
+//
+// The function will block for at most timeoutMs milliseconds.
+//
+// Duplicate Topic+Partitions are not supported.
+// Per-partition errors may be returned in the `.Error` field.
+// A request-level failure returns a nil list and the librdkafka error.
+func offsetsForTimes(H Handle, times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error) {
+	// cList carries the timestamps in and the resolved offsets back out.
+	cList := newCPartsFromTopicPartitions(times)
+	defer C.rd_kafka_topic_partition_list_destroy(cList)
+
+	rk := H.gethandle().rk
+	if rc := C.rd_kafka_offsets_for_times(rk, cList, C.int(timeoutMs)); rc != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+		return nil, newError(rc)
+	}
+	return newTopicPartitionsFromCparts(cList), nil
+}
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/producer.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/producer.go
new file mode 100644
index 0000000..7eac912
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/producer.go
@@ -0,0 +1,583 @@
+/**
+ * Copyright 2016 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka
+
+import (
+	"fmt"
+	"math"
+	"time"
+	"unsafe"
+)
+
+/*
+#include <stdlib.h>
+#include <librdkafka/rdkafka.h>
+#include "glue_rdkafka.h"
+
+
+#ifdef RD_KAFKA_V_HEADERS
+// Convert tmphdrs to chdrs (created by this function).
+// If tmphdr.size == -1: value is considered Null
+//    tmphdr.size == 0:  value is considered empty (ignored)
+//    tmphdr.size > 0:   value is considered non-empty
+//
+// WARNING: The header keys and values will be freed by this function.
+void tmphdrs_to_chdrs (tmphdr_t *tmphdrs, size_t tmphdrsCnt,
+                       rd_kafka_headers_t **chdrs) {
+   size_t i;
+
+   *chdrs = rd_kafka_headers_new(tmphdrsCnt);
+
+   for (i = 0 ; i < tmphdrsCnt ; i++) {
+      rd_kafka_header_add(*chdrs,
+                          tmphdrs[i].key, -1,
+                          tmphdrs[i].size == -1 ? NULL :
+                          (tmphdrs[i].size == 0 ? "" : tmphdrs[i].val),
+                          tmphdrs[i].size == -1 ? 0 : tmphdrs[i].size);
+      if (tmphdrs[i].size > 0)
+         free((void *)tmphdrs[i].val);
+      free((void *)tmphdrs[i].key);
+   }
+}
+
+#else
+void free_tmphdrs (tmphdr_t *tmphdrs, size_t tmphdrsCnt) {
+   size_t i;
+   for (i = 0 ; i < tmphdrsCnt ; i++) {
+      if (tmphdrs[i].size > 0)
+         free((void *)tmphdrs[i].val);
+      free((void *)tmphdrs[i].key);
+   }
+}
+#endif
+
+
+rd_kafka_resp_err_t do_produce (rd_kafka_t *rk,
+          rd_kafka_topic_t *rkt, int32_t partition,
+          int msgflags,
+          int valIsNull, void *val, size_t val_len,
+          int keyIsNull, void *key, size_t key_len,
+          int64_t timestamp,
+          tmphdr_t *tmphdrs, size_t tmphdrsCnt,
+          uintptr_t cgoid) {
+  void *valp = valIsNull ? NULL : val;
+  void *keyp = keyIsNull ? NULL : key;
+#ifdef RD_KAFKA_V_TIMESTAMP
+rd_kafka_resp_err_t err;
+#ifdef RD_KAFKA_V_HEADERS
+  rd_kafka_headers_t *hdrs = NULL;
+#endif
+#endif
+
+
+  if (tmphdrsCnt > 0) {
+#ifdef RD_KAFKA_V_HEADERS
+     tmphdrs_to_chdrs(tmphdrs, tmphdrsCnt, &hdrs);
+#else
+     free_tmphdrs(tmphdrs, tmphdrsCnt);
+     return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;
+#endif
+  }
+
+
+#ifdef RD_KAFKA_V_TIMESTAMP
+  err = rd_kafka_producev(rk,
+        RD_KAFKA_V_RKT(rkt),
+        RD_KAFKA_V_PARTITION(partition),
+        RD_KAFKA_V_MSGFLAGS(msgflags),
+        RD_KAFKA_V_VALUE(valp, val_len),
+        RD_KAFKA_V_KEY(keyp, key_len),
+        RD_KAFKA_V_TIMESTAMP(timestamp),
+#ifdef RD_KAFKA_V_HEADERS
+        RD_KAFKA_V_HEADERS(hdrs),
+#endif
+        RD_KAFKA_V_OPAQUE((void *)cgoid),
+        RD_KAFKA_V_END);
+#ifdef RD_KAFKA_V_HEADERS
+  if (err && hdrs)
+    rd_kafka_headers_destroy(hdrs);
+#endif
+  return err;
+#else
+  if (timestamp)
+      return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;
+  if (rd_kafka_produce(rkt, partition, msgflags,
+                       valp, val_len,
+                       keyp, key_len,
+                       (void *)cgoid) == -1)
+      return rd_kafka_last_error();
+  else
+      return RD_KAFKA_RESP_ERR_NO_ERROR;
+#endif
+}
+*/
+import "C"
+
+// Producer implements a High-level Apache Kafka Producer instance
+type Producer struct {
+	events         chan Event    // returned by Events(); created in NewProducer
+	produceChannel chan *Message // returned by ProduceChannel(); served by channelProducer or channelBatchProducer
+	handle         handle        // underlying librdkafka handle state (rk, rkq, ...)
+
+	// Terminates the poller() goroutine
+	pollerTermChan chan bool
+}
+
+// String returns a human readable name for a Producer instance (as provided by its handle)
+func (p *Producer) String() string {
+	return p.handle.String()
+}
+
+// gethandle implements the Handle interface
+func (p *Producer) gethandle() *handle {
+	return &p.handle
+}
+
+// produce enqueues msg on the librdkafka producer via do_produce().
+// msgFlags are OR:ed with RD_KAFKA_MSG_F_COPY. If deliveryChan or msg.Opaque
+// is set they are registered with cgoPut() and the resulting id is passed
+// to C as the message opaque. Returns ErrInvalidArg for a nil msg or a
+// missing/empty topic, or the librdkafka error if the enqueue failed.
+func (p *Producer) produce(msg *Message, msgFlags int, deliveryChan chan Event) error {
+	if msg == nil || msg.TopicPartition.Topic == nil || len(*msg.TopicPartition.Topic) == 0 {
+		return newErrorFromString(ErrInvalidArg, "")
+	}
+
+	crkt := p.handle.getRkt(*msg.TopicPartition.Topic)
+
+	// Three problems:
+	//  1) An empty Value or Key (length 0, proper pointer) differs from a
+	//     null Value or Key (length 0, null pointer).
+	//  2) unsafe.Pointer(&slice[0]) can't be evaluated on a nil/empty slice.
+	//  3) cgo's pointer checking requires the unsafe.Pointer(slice..) call to
+	//     be made in the call to the C function.
+	//
+	// Solution: remember whether Value/Key were nil (1), point valp/keyp at a
+	// 1-byte dummy slice when there is nothing to send (2), and take the
+	// unsafe.Pointer() of valp/keyp in the do_produce() call itself (3).
+	var valp []byte
+	var keyp []byte
+	oneByte := []byte{0}
+	var valIsNull C.int
+	var keyIsNull C.int
+	var valLen int
+	var keyLen int
+
+	if msg.Value == nil {
+		valIsNull = 1
+		valLen = 0
+		valp = oneByte
+	} else {
+		valLen = len(msg.Value)
+		if valLen > 0 {
+			valp = msg.Value
+		} else {
+			valp = oneByte
+		}
+	}
+
+	if msg.Key == nil {
+		keyIsNull = 1
+		keyLen = 0
+		keyp = oneByte
+	} else {
+		keyLen = len(msg.Key)
+		if keyLen > 0 {
+			keyp = msg.Key
+		} else {
+			keyp = oneByte
+		}
+	}
+
+	var cgoid int
+
+	// Per-message state that needs to be retained through the C code:
+	//   delivery channel (if specified)
+	//   message opaque   (if specified)
+	// Since these can't be passed as opaque pointers to the C code,
+	// due to cgo constraints, we add them to a per-producer map for lookup
+	// when the C code triggers the callbacks or events.
+	if deliveryChan != nil || msg.Opaque != nil {
+		cgoid = p.handle.cgoPut(cgoDr{deliveryChan: deliveryChan, opaque: msg.Opaque})
+	}
+
+	var timestamp int64
+	if !msg.Timestamp.IsZero() {
+		timestamp = msg.Timestamp.UnixNano() / 1000000
+	}
+
+	// Convert headers to C-friendly tmphdrs
+	var tmphdrs []C.tmphdr_t
+	tmphdrsCnt := len(msg.Headers)
+
+	if tmphdrsCnt > 0 {
+		tmphdrs = make([]C.tmphdr_t, tmphdrsCnt)
+
+		for n, hdr := range msg.Headers {
+			// Make a copy of the key
+			// to avoid runtime panic with
+			// foreign Go pointers in cgo.
+			tmphdrs[n].key = C.CString(hdr.Key)
+			if hdr.Value != nil {
+				tmphdrs[n].size = C.ssize_t(len(hdr.Value))
+				if tmphdrs[n].size > 0 {
+					// Make a copy of the value
+					// to avoid runtime panic with
+					// foreign Go pointers in cgo.
+					tmphdrs[n].val = C.CBytes(hdr.Value)
+				}
+			} else {
+				// null value
+				tmphdrs[n].size = C.ssize_t(-1)
+			}
+		}
+	} else {
+		// no headers, need a dummy tmphdrs of size 1 to avoid index
+		// out of bounds panic in do_produce() call below.
+		// tmphdrsCnt will be 0.
+		tmphdrs = []C.tmphdr_t{{nil, nil, 0}}
+	}
+
+	cErr := C.do_produce(p.handle.rk, crkt,
+		C.int32_t(msg.TopicPartition.Partition),
+		C.int(msgFlags)|C.RD_KAFKA_MSG_F_COPY,
+		valIsNull, unsafe.Pointer(&valp[0]), C.size_t(valLen),
+		keyIsNull, unsafe.Pointer(&keyp[0]), C.size_t(keyLen),
+		C.int64_t(timestamp),
+		(*C.tmphdr_t)(unsafe.Pointer(&tmphdrs[0])), C.size_t(tmphdrsCnt),
+		(C.uintptr_t)(cgoid))
+	if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+		if cgoid != 0 {
+			p.handle.cgoGet(cgoid)
+		}
+		return newError(cErr)
+	}
+
+	return nil
+}
+
+// Produce enqueues a single message (calls produce() with no extra message flags).
+// This is an asynchronous call that enqueues the message on the internal
+// transmit queue, thus returning immediately.
+// The delivery report will be sent on the provided deliveryChan if specified,
+// or on the Producer object's Events() channel if not.
+// msg.Timestamp requires librdkafka >= 0.9.4 (else returns ErrNotImplemented),
+// api.version.request=true, and broker >= 0.10.0.0.
+// msg.Headers requires librdkafka >= 0.11.4 (else returns ErrNotImplemented),
+// api.version.request=true, and broker >= 0.11.0.0.
+// Returns an error if message could not be enqueued.
+func (p *Producer) Produce(msg *Message, deliveryChan chan Event) error {
+	return p.produce(msg, 0, deliveryChan)
+}
+
+// produceBatch produces a batch of messages to topic; an empty msgs is a no-op.
+// These batches do not relate to the message batches sent to the broker, the latter
+// are collected on the fly internally in librdkafka.
+// WARNING: This is an experimental API; timestamps and headers are not supported.
+func (p *Producer) produceBatch(topic string, msgs []*Message, msgFlags int) error {
+	if len(msgs) == 0 {
+		return nil // nothing to enqueue; &cmsgs[0] below would panic on an empty slice
+	}
+	crkt := p.handle.getRkt(topic)
+	cmsgs := make([]C.rd_kafka_message_t, len(msgs))
+	for i, m := range msgs {
+		p.handle.messageToC(m, &cmsgs[i])
+	}
+	r := C.rd_kafka_produce_batch(crkt, C.RD_KAFKA_PARTITION_UA, C.int(msgFlags)|C.RD_KAFKA_MSG_F_FREE,
+		(*C.rd_kafka_message_t)(&cmsgs[0]), C.int(len(msgs)))
+	if r == -1 {
+		return newError(C.rd_kafka_last_error())
+	}
+	return nil
+}
+
+// Events returns the Events channel (read); Close() closes this channel
+func (p *Producer) Events() chan Event {
+	return p.events
+}
+
+// ProduceChannel returns the produce *Message channel (write); Close() closes this channel
+func (p *Producer) ProduceChannel() chan *Message {
+	return p.produceChannel
+}
+
+// Len returns the sum of messages waiting on ProduceChannel, events waiting on
+// Events() and librdkafka's out queue length (rd_kafka_outq_len).
+func (p *Producer) Len() int {
+	queued := len(p.produceChannel) + len(p.events)
+	return queued + int(C.rd_kafka_outq_len(p.handle.rk))
+}
+
+// Flush and wait for outstanding messages and requests to complete delivery.
+// Includes messages on ProduceChannel.
+// Runs until value reaches zero or on timeoutMs.
+// Returns the number of outstanding events still un-flushed.
+func (p *Producer) Flush(timeoutMs int) int {
+	termChan := make(chan bool) // unused stand-in termChan
+
+	// Build the deadline arithmetically instead of formatting and re-parsing
+	// a duration string (which also meant discarding a parse error).
+	tEnd := time.Now().Add(time.Duration(timeoutMs) * time.Millisecond)
+	for p.Len() > 0 {
+		remain := tEnd.Sub(time.Now()).Seconds()
+		if remain <= 0.0 {
+			return p.Len()
+		}
+		p.handle.eventPoll(p.events,
+			int(math.Min(100, remain*1000)), 1000, termChan)
+	}
+
+	return 0
+}
+
+// Close a Producer instance.
+// The Producer object or its channels are no longer usable after this call.
+func (p *Producer) Close() {
+	// Wait for poller() (signaled by closing pollerTermChan) and for
+	// channelProducer()/channelBatchProducer() (signaled by closing ProduceChannel)
+	close(p.pollerTermChan)
+	close(p.produceChannel)
+	p.handle.waitTerminated(2) // two goroutines report on terminatedChan
+
+	close(p.events)
+
+	p.handle.cleanup()
+
+	C.rd_kafka_destroy(p.handle.rk)
+}
+
+// NewProducer creates a new high-level Producer instance.
+//
+// conf is a *ConfigMap with standard librdkafka configuration properties
+// (see librdkafka's CONFIGURATION.md). conf itself is not mutated: the
+// special properties below are extracted from a copy.
+//
+// Supported special configuration properties:
+//   go.batch.producer (bool, false) - EXPERIMENTAL: Enable batch producer (for increased performance).
+//                                     These batches do not relate to Kafka message batches in any way.
+//                                     Note: timestamps and headers are not supported with this interface.
+//   go.delivery.reports (bool, true) - Forward per-message delivery reports to the
+//                                      Events() channel.
+//   go.events.channel.size (int, 1000000) - Events() channel size
+//   go.produce.channel.size (int, 1000000) - ProduceChannel() buffer size (in number of messages)
+//
+func NewProducer(conf *ConfigMap) (*Producer, error) {
+	err := versionCheck()
+	if err != nil {
+		return nil, err
+	}
+
+	p := &Producer{}
+
+	// before we do anything with the configuration, create a copy such that
+	// the original is not mutated.
+	confCopy := conf.clone()
+
+	v, err := confCopy.extract("go.batch.producer", false)
+	if err != nil {
+		return nil, err
+	}
+	batchProducer := v.(bool)
+
+	v, err = confCopy.extract("go.delivery.reports", true)
+	if err != nil {
+		return nil, err
+	}
+	p.handle.fwdDr = v.(bool)
+
+	v, err = confCopy.extract("go.events.channel.size", 1000000)
+	if err != nil {
+		return nil, err
+	}
+	eventsChanSize := v.(int)
+
+	v, err = confCopy.extract("go.produce.channel.size", 1000000)
+	if err != nil {
+		return nil, err
+	}
+	produceChannelSize := v.(int)
+
+	if int(C.rd_kafka_version()) < 0x01000000 {
+		// produce.offset.report is no longer used in librdkafka >= v1.0.0
+		v, _ = confCopy.extract("{topic}.produce.offset.report", nil)
+		if v == nil {
+			// Enable offset reporting by default, unless overridden.
+			confCopy.SetKey("{topic}.produce.offset.report", true)
+		}
+	}
+
+	// Convert ConfigMap to librdkafka conf_t
+	cConf, err := confCopy.convert()
+	if err != nil {
+		return nil, err
+	}
+
+	cErrstr := (*C.char)(C.malloc(C.size_t(256)))
+	defer C.free(unsafe.Pointer(cErrstr))
+
+	C.rd_kafka_conf_set_events(cConf, C.RD_KAFKA_EVENT_DR|C.RD_KAFKA_EVENT_STATS|C.RD_KAFKA_EVENT_ERROR)
+
+	// Create librdkafka producer instance
+	p.handle.rk = C.rd_kafka_new(C.RD_KAFKA_PRODUCER, cConf, cErrstr, 256)
+	if p.handle.rk == nil {
+		// rd_kafka_new only takes ownership of cConf on success, so it
+		// must be destroyed here to avoid leaking it.
+		C.rd_kafka_conf_destroy(cConf)
+		return nil, newErrorFromCString(C.RD_KAFKA_RESP_ERR__INVALID_ARG, cErrstr)
+	}
+
+	p.handle.p = p
+	p.handle.setup()
+	p.handle.rkq = C.rd_kafka_queue_get_main(p.handle.rk)
+	p.events = make(chan Event, eventsChanSize)
+	p.produceChannel = make(chan *Message, produceChannelSize)
+	p.pollerTermChan = make(chan bool)
+
+	go poller(p, p.pollerTermChan)
+
+	// non-batch or batch producer, only one must be used
+	if batchProducer {
+		go channelBatchProducer(p)
+	} else {
+		go channelProducer(p)
+	}
+
+	return p, nil
+}
+
+// channelProducer serves the ProduceChannel channel until it is closed.
+// Messages that cannot be enqueued are returned on the events channel with
+// TopicPartition.Error set. Reports on terminatedChan when done.
+func channelProducer(p *Producer) {
+	for msg := range p.produceChannel {
+		if err := p.produce(msg, C.RD_KAFKA_MSG_F_BLOCK, nil); err != nil {
+			msg.TopicPartition.Error = err
+			p.events <- msg
+		}
+	}
+
+	p.handle.terminatedChan <- "channelProducer"
+}
+
+// channelBatchProducer serves the ProduceChannel channel and attempts to
+// improve cgo performance by using the produceBatch() interface.
+// It blocks for one message, drains whatever else is immediately available
+// (up to batchSize messages in total), and hands the messages to produceBatch()
+// grouped by topic. Messages of a failed group are sent to the events channel
+// with TopicPartition.Error set. Panics on a nil message or one without Topic.
+func channelBatchProducer(p *Producer) {
+	const batchSize int = 1000000
+	buffered := make(map[string][]*Message)
+	bufferedCnt := 0
+
+	// enqueue validates m and adds it to the pending batch of its topic. The
+	// first message of a round gets the same checks as the drained ones.
+	enqueue := func(m *Message) {
+		if m == nil {
+			panic("nil message received on ProduceChannel")
+		}
+		if m.TopicPartition.Topic == nil {
+			panic(fmt.Sprintf("message without Topic received on ProduceChannel: %v", m))
+		}
+		buffered[*m.TopicPartition.Topic] = append(buffered[*m.TopicPartition.Topic], m)
+		bufferedCnt++
+	}
+
+	for m := range p.produceChannel {
+		enqueue(m)
+	drain:
+		for bufferedCnt < batchSize {
+			select {
+			case m, ok := <-p.produceChannel:
+				if !ok {
+					break drain
+				}
+				enqueue(m)
+			default:
+				break drain
+			}
+		}
+
+		for topic, msgs := range buffered {
+			err := p.produceBatch(topic, msgs, C.RD_KAFKA_MSG_F_BLOCK)
+			if err != nil {
+				for _, failed := range msgs {
+					failed.TopicPartition.Error = err
+					p.events <- failed
+				}
+			}
+		}
+
+		buffered = make(map[string][]*Message)
+		bufferedCnt = 0
+	}
+	p.handle.terminatedChan <- "channelBatchProducer"
+}
+
+// poller polls the rd_kafka_t handle for events until signalled for termination,
+// either by termChan or by eventPoll reporting termination itself, and then
+// announces its exit on terminatedChan.
+func poller(p *Producer, termChan chan bool) {
+	for running := true; running; {
+		select {
+		case <-termChan:
+			running = false
+
+		default:
+			// 100 appears to be the poll timeout in ms (cf. Flush); 1000 is
+			// presumably the max events per call - confirm in eventPoll.
+			if _, term := p.handle.eventPoll(p.events, 100, 1000, termChan); term {
+				running = false
+			}
+		}
+	}
+
+	p.handle.terminatedChan <- "poller"
+}
+
+// GetMetadata queries broker for cluster and topic metadata (thin wrapper around getMetadata).
+// If topic is non-nil only information about that topic is returned, else if
+// allTopics is false only information about locally used topics is returned,
+// else information about all topics is returned.
+// GetMetadata is equivalent to listTopics, describeTopics and describeCluster in the Java API.
+func (p *Producer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error) {
+	return getMetadata(p, topic, allTopics, timeoutMs)
+}
+
+// QueryWatermarkOffsets returns the broker's low and high offsets for the given topic
+// and partition (thin wrapper around queryWatermarkOffsets).
+func (p *Producer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error) {
+	return queryWatermarkOffsets(p, topic, partition, timeoutMs)
+}
+
+// OffsetsForTimes looks up offsets by timestamp for the given partitions
+// (thin wrapper around offsetsForTimes).
+// The returned offset for each partition is the earliest offset whose
+// timestamp is greater than or equal to the given timestamp in the
+// corresponding partition.
+//
+// The timestamps to query are represented as `.Offset` in the `times`
+// argument and the looked up offsets are represented as `.Offset` in the returned
+// `offsets` list.
+//
+// The function will block for at most timeoutMs milliseconds.
+//
+// Duplicate Topic+Partitions are not supported.
+// Per-partition errors may be returned in the `.Error` field.
+func (p *Producer) OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error) {
+	return offsetsForTimes(p, times, timeoutMs)
+}
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/testconf-example.json b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/testconf-example.json
new file mode 100644
index 0000000..7024a9c
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/testconf-example.json
@@ -0,0 +1,8 @@
+{
+    "Brokers": "mybroker or $BROKERS env",
+    "Topic": "test",
+    "GroupID": "testgroup",
+    "PerfMsgCount": 1000000,
+    "PerfMsgSize": 100,
+    "Config": ["api.version.request=true"]
+}
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/testhelpers.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/testhelpers.go
new file mode 100644
index 0000000..22e8e1a
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/testhelpers.go
@@ -0,0 +1,179 @@
+package kafka
+
+/**
+ * Copyright 2016 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import (
+	"encoding/json"
+	"fmt"
+	"os"
+	"time"
+)
+
+/*
+#include <librdkafka/rdkafka.h>
+*/
+import "C"
+
+var testconf struct { // test suite settings, populated by testconfRead
+	Brokers      string    // broker list, or "$NAME" to read it from environment variable NAME; must end up non-empty
+	Topic        string    // must be non-empty; testconfRead panics otherwise
+	GroupID      string    // defaults to "testgroup"
+	PerfMsgCount int       // defaults to 2000000
+	PerfMsgSize  int       // defaults to 100
+	Config       []string  // "key=value" strings that updateFromTestconf passes to ConfigMap.Set
+	conf         ConfigMap // unexported, so encoding/json never fills it; NOTE(review): where it is set is not visible here
+}
+
+// testconfRead loads testconf.json from the current directory into testconf.
+// GroupID, PerfMsgCount and PerfMsgSize have defaults the file may override;
+// a Brokers value starting with '$' names an environment variable to read.
+//
+// Returns false if the file cannot be opened, true once loaded. Panics if the
+// file cannot be decoded or if Brokers or Topic ends up empty.
+func testconfRead() bool {
+	cf, err := os.Open("testconf.json")
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "%% testconf.json not found - ignoring test\n")
+		return false
+	}
+	defer cf.Close() // also releases the file if a panic below unwinds
+
+	// Default values
+	testconf.PerfMsgCount = 2000000
+	testconf.PerfMsgSize = 100
+	testconf.GroupID = "testgroup"
+
+	if err = json.NewDecoder(cf).Decode(&testconf); err != nil {
+		panic(fmt.Sprintf("Failed to parse testconf: %s", err))
+	}
+
+	// Check the length first: indexing an empty Brokers would die with an
+	// index-out-of-range panic rather than the descriptive one below.
+	if len(testconf.Brokers) > 0 && testconf.Brokers[0] == '$' {
+		testconf.Brokers = os.Getenv(testconf.Brokers[1:])
+	}
+
+	if testconf.Brokers == "" || testconf.Topic == "" {
+		panic("Missing Brokers or Topic in testconf.json")
+	}
+
+	return true
+}
+
+// updateFromTestconf applies the entries of testconf.Config to the existing
+// ConfigMap cm by handing each one, in slice order, to cm.Set.
+//
+// The entries are expected to be "key=value" strings. The first error that
+// cm.Set reports ends the loop and is returned unchanged; nil is returned
+// when every entry was accepted or when testconf.Config holds no entries.
+func (cm *ConfigMap) updateFromTestconf() error {
+	// A nil testconf.Config needs no special case: ranging over a nil slice
+	// simply performs zero iterations.
+	for i := range testconf.Config {
+		if err := cm.Set(testconf.Config[i]); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// getMessageCountInTopic returns the number of messages available in all
+// partitions of topic, i.e. the sum of (high - low) watermark offsets queried
+// through a temporary consumer. Partitions whose query fails are not counted.
+// WARNING: This uses watermark offsets so it will be incorrect for compacted topics.
+func getMessageCountInTopic(topic string) (int, error) {
+	config := &ConfigMap{"bootstrap.servers": testconf.Brokers, "group.id": testconf.GroupID}
+	if err := config.updateFromTestconf(); err != nil {
+		return 0, err // a bad testconf.Config entry used to be silently ignored
+	}
+
+	c, err := NewConsumer(config)
+	if err != nil {
+		return 0, err
+	}
+	defer c.Close() // the consumer was previously never closed
+
+	// get metadata for the topic to find out number of partitions
+	metadata, err := c.GetMetadata(&topic, false, 5*1000)
+	if err != nil {
+		return 0, err
+	}
+
+	t, ok := metadata.Topics[topic]
+	if !ok {
+		return 0, newError(C.RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
+	}
+
+	cnt := 0
+	for _, p := range t.Partitions {
+		low, high, err := c.QueryWatermarkOffsets(topic, p.ID, 5*1000)
+		if err != nil {
+			continue
+		}
+		cnt += int(high - low)
+	}
+	return cnt, nil
+}
+
+// getBrokerList returns the ids of all brokers reported in the metadata that
+// getMetadata fetches through H; a metadata error is returned with a nil list.
+func getBrokerList(H Handle) (brokers []int32, err error) {
+	md, err := getMetadata(H, nil, true, 15*1000)
+	if err != nil {
+		return nil, err
+	}
+
+	brokers = make([]int32, 0, len(md.Brokers))
+	for _, b := range md.Brokers {
+		brokers = append(brokers, b.ID)
+	}
+	return brokers, nil
+}
+
+// waitTopicInMetadata polls the metadata of all topics through H until topic
+// is listed without an error and with at least one partition, pausing 500ms
+// between attempts. It returns nil once the topic is found, an error with
+// code ErrTimedOut when timeoutMs milliseconds elapse first, or the error of
+// a failed metadata request.
+func waitTopicInMetadata(H Handle, topic string, timeoutMs int) error {
+	tEnd := time.Now().Add(time.Duration(timeoutMs) * time.Millisecond)
+
+	for {
+		remain := tEnd.Sub(time.Now())
+		if remain < 0 {
+			return newErrorFromString(ErrTimedOut,
+				fmt.Sprintf("Timed out waiting for topic %s to appear in metadata", topic))
+		}
+
+		// Each request may only use the time left until the overall deadline.
+		md, err := getMetadata(H, nil, true, int(remain/time.Millisecond))
+		if err != nil {
+			return err
+		}
+
+		for _, t := range md.Topics {
+			if t.Topic == topic && t.Error.Code() == ErrNoError && len(t.Partitions) > 0 {
+				return nil // proper topic found in metadata
+			}
+		}
+
+		// time.Sleep takes a Duration counted in nanoseconds, so the previous
+		// bare 500*1000 paused for only 500 microseconds.
+		time.Sleep(500 * time.Millisecond)
+	}
+}