blob: 05c8fed8c27e9df744b69d18d11ec70099e6481e [file] [log] [blame]
<!DOCTYPE html>
<html>
<head>
<meta content="text/html; charset=utf-8" http-equiv="Content-Type">
<meta content="width=device-width, initial-scale=1" name="viewport">
<meta content="#375EAB" name="theme-color">
<title>
kafka - The Go Programming Language
</title>
<link href="http://golang.org/lib/godoc/style.css" rel="stylesheet" type="text/css">
<link href="http://golang.org/lib/godoc/jquery.treeview.css" rel="stylesheet">
<script type="text/javascript">
window.initFuncs = [];
</script>
</link>
</link>
</meta>
</meta>
</meta>
</head>
<body>
<div id="lowframe" style="position: fixed; bottom: 0; left: 0; height: 0; width: 100%; border-top: thin solid grey; background-color: white; overflow: auto;">
...
</div>
<!-- #lowframe -->
<div class="wide" id="page">
<div class="container">
<h1>
Package kafka
</h1>
<div id="nav">
</div>
<!--
Copyright 2009 The Go Authors. All rights reserved.
Use of this source code is governed by a BSD-style
license that can be found in the LICENSE file.
-->
<!--
Note: Static (i.e., not template-generated) href and id
attributes start with "pkg-" to make it impossible for
them to conflict with generated attributes (some of which
correspond to Go identifiers).
-->
<script type="text/javascript">
document.ANALYSIS_DATA = null;
document.CALLGRAPH = null;
</script>
<div id="short-nav">
<dl>
<dd>
<code>
import "github.com/confluentinc/confluent-kafka-go/kafka"
</code>
</dd>
</dl>
<dl>
<dd>
<a class="overviewLink" href="#pkg-overview">
Overview
</a>
</dd>
<dd>
<a class="indexLink" href="#pkg-index">
Index
</a>
</dd>
<dd>
</dd>
</dl>
</div>
<!-- The package's Name is printed as title by the top-level template -->
<div class="toggleVisible" id="pkg-overview">
<div class="collapsed">
<h2 class="toggleButton" title="Click to show Overview section">
Overview ▹
</h2>
</div>
<div class="expanded">
<h2 class="toggleButton" title="Click to hide Overview section">
Overview ▾
</h2>
<p>
Package kafka provides high-level Apache Kafka producer and consumers
using bindings on-top of the librdkafka C library.
</p>
<h3 id="hdr-High_level_Consumer">
High-level Consumer
</h3>
<p>
* Decide if you want to read messages and events from the `.Events()` channel
(set `"go.events.channel.enable": true`) or by calling `.Poll()`.
</p>
<p>
* Create a Consumer with `kafka.NewConsumer()` providing at
least the `bootstrap.servers` and `group.id` configuration properties.
</p>
<p>
* Call `.Subscribe()` or (`.SubscribeTopics()` to subscribe to multiple topics)
to join the group with the specified subscription set.
Subscriptions are atomic, calling `.Subscribe*()` again will leave
the group and rejoin with the new set of topics.
</p>
<p>
* Start reading events and messages from either the `.Events` channel
or by calling `.Poll()`.
</p>
<p>
* When the group has rebalanced each client member is assigned a
(sub-)set of topic+partitions.
By default the consumer will start fetching messages for its assigned
partitions at this point, but your application may enable rebalance
events to get an insight into what the assigned partitions where
as well as set the initial offsets. To do this you need to pass
`"go.application.rebalance.enable": true` to the `NewConsumer()` call
mentioned above. You will (eventually) see a `kafka.AssignedPartitions` event
with the assigned partition set. You can optionally modify the initial
offsets (they'll default to stored offsets and if there are no previously stored
offsets it will fall back to `"default.topic.config": ConfigMap{"auto.offset.reset": ..}`
which defaults to the `latest` message) and then call `.Assign(partitions)`
to start consuming. If you don't need to modify the initial offsets you will
not need to call `.Assign()`, the client will do so automatically for you if
you dont.
</p>
<p>
* As messages are fetched they will be made available on either the
`.Events` channel or by calling `.Poll()`, look for event type `*kafka.Message`.
</p>
<p>
* Handle messages, events and errors to your liking.
</p>
<p>
* When you are done consuming call `.Close()` to commit final offsets
and leave the consumer group.
</p>
<h3 id="hdr-Producer">
Producer
</h3>
<p>
* Create a Producer with `kafka.NewProducer()` providing at least
the `bootstrap.servers` configuration properties.
</p>
<p>
* Messages may now be produced either by sending a `*kafka.Message`
on the `.ProduceChannel` or by calling `.Produce()`.
</p>
<p>
* Producing is an asynchronous operation so the client notifies the application
of per-message produce success or failure through something called delivery reports.
Delivery reports are by default emitted on the `.Events()` channel as `*kafka.Message`
and you should check `msg.TopicPartition.Error` for `nil` to find out if the message
was succesfully delivered or not.
It is also possible to direct delivery reports to alternate channels
by providing a non-nil `chan Event` channel to `.Produce()`.
If no delivery reports are wanted they can be completely disabled by
setting configuration property `"go.delivery.reports": false`.
</p>
<p>
* When you are done producing messages you will need to make sure all messages
are indeed delivered to the broker (or failed), remember that this is
an asynchronous client so some of your messages may be lingering in internal
channels or tranmission queues.
To do this you can either keep track of the messages you've produced
and wait for their corresponding delivery reports, or call the convenience
function `.Flush()` that will block until all message deliveries are done
or the provided timeout elapses.
</p>
<p>
* Finally call `.Close()` to decommission the producer.
</p>
<h3 id="hdr-Events">
Events
</h3>
<p>
Apart from emitting messages and delivery reports the client also communicates
with the application through a number of different event types.
An application may choose to handle or ignore these events.
</p>
<h3 id="hdr-Consumer_events">
Consumer events
</h3>
<p>
* `*kafka.Message` - a fetched message.
</p>
<p>
* `AssignedPartitions` - The assigned partition set for this client following a rebalance.
Requires `go.application.rebalance.enable`
</p>
<p>
* `RevokedPartitions` - The counter part to `AssignedPartitions` following a rebalance.
`AssignedPartitions` and `RevokedPartitions` are symetrical.
Requires `go.application.rebalance.enable`
</p>
<p>
* `PartitionEOF` - Consumer has reached the end of a partition.
NOTE: The consumer will keep trying to fetch new messages for the partition.
</p>
<p>
* `OffsetsCommitted` - Offset commit results (when `enable.auto.commit` is enabled).
</p>
<h3 id="hdr-Producer_events">
Producer events
</h3>
<p>
* `*kafka.Message` - delivery report for produced message.
Check `.TopicPartition.Error` for delivery result.
</p>
<h3 id="hdr-Generic_events_for_both_Consumer_and_Producer">
Generic events for both Consumer and Producer
</h3>
<p>
* `KafkaError` - client (error codes are prefixed with _) or broker error.
These errors are normally just informational since the
client will try its best to automatically recover (eventually).
</p>
<p>
Hint: If your application registers a signal notification
(signal.Notify) makes sure the signals channel is buffered to avoid
possible complications with blocking Poll() calls.
</p>
</div>
</div>
<div class="toggleVisible" id="pkg-index">
<div class="collapsed">
<h2 class="toggleButton" title="Click to show Index section">
Index ▹
</h2>
</div>
<div class="expanded">
<h2 class="toggleButton" title="Click to hide Index section">
Index ▾
</h2>
<!-- Table of contents for API; must be named manual-nav to turn off auto nav. -->
<div id="manual-nav">
<dl>
<dd>
<a href="#pkg-constants">
Constants
</a>
</dd>
<dd>
<a href="#LibraryVersion">
func LibraryVersion() (int, string)
</a>
</dd>
<dd>
<a href="#AssignedPartitions">
type AssignedPartitions
</a>
</dd>
<dd>
<a href="#AssignedPartitions.String">
func (e AssignedPartitions) String() string
</a>
</dd>
<dd>
<a href="#BrokerMetadata">
type BrokerMetadata
</a>
</dd>
<dd>
<a href="#ConfigMap">
type ConfigMap
</a>
</dd>
<dd>
<a href="#ConfigMap.Set">
func (m ConfigMap) Set(kv string) error
</a>
</dd>
<dd>
<a href="#ConfigMap.SetKey">
func (m ConfigMap) SetKey(key string, value ConfigValue) error
</a>
</dd>
<dd>
<a href="#ConfigValue">
type ConfigValue
</a>
</dd>
<dd>
<a href="#Consumer">
type Consumer
</a>
</dd>
<dd>
<a href="#NewConsumer">
func NewConsumer(conf *ConfigMap) (*Consumer, error)
</a>
</dd>
<dd>
<a href="#Consumer.Assign">
func (c *Consumer) Assign(partitions []TopicPartition) (err error)
</a>
</dd>
<dd>
<a href="#Consumer.Close">
func (c *Consumer) Close() (err error)
</a>
</dd>
<dd>
<a href="#Consumer.Commit">
func (c *Consumer) Commit() ([]TopicPartition, error)
</a>
</dd>
<dd>
<a href="#Consumer.CommitMessage">
func (c *Consumer) CommitMessage(m *Message) ([]TopicPartition, error)
</a>
</dd>
<dd>
<a href="#Consumer.CommitOffsets">
func (c *Consumer) CommitOffsets(offsets []TopicPartition) ([]TopicPartition, error)
</a>
</dd>
<dd>
<a href="#Consumer.Events">
func (c *Consumer) Events() chan Event
</a>
</dd>
<dd>
<a href="#Consumer.GetMetadata">
func (c *Consumer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)
</a>
</dd>
<dd>
<a href="#Consumer.Poll">
func (c *Consumer) Poll(timeoutMs int) (event Event)
</a>
</dd>
<dd>
<a href="#Consumer.QueryWatermarkOffsets">
func (c *Consumer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
</a>
</dd>
<dd>
<a href="#Consumer.String">
func (c *Consumer) String() string
</a>
</dd>
<dd>
<a href="#Consumer.Subscribe">
func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error
</a>
</dd>
<dd>
<a href="#Consumer.SubscribeTopics">
func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) (err error)
</a>
</dd>
<dd>
<a href="#Consumer.Unassign">
func (c *Consumer) Unassign() (err error)
</a>
</dd>
<dd>
<a href="#Consumer.Unsubscribe">
func (c *Consumer) Unsubscribe() (err error)
</a>
</dd>
<dd>
<a href="#Error">
type Error
</a>
</dd>
<dd>
<a href="#Error.Code">
func (e Error) Code() ErrorCode
</a>
</dd>
<dd>
<a href="#Error.Error">
func (e Error) Error() string
</a>
</dd>
<dd>
<a href="#Error.String">
func (e Error) String() string
</a>
</dd>
<dd>
<a href="#ErrorCode">
type ErrorCode
</a>
</dd>
<dd>
<a href="#ErrorCode.String">
func (c ErrorCode) String() string
</a>
</dd>
<dd>
<a href="#Event">
type Event
</a>
</dd>
<dd>
<a href="#Handle">
type Handle
</a>
</dd>
<dd>
<a href="#Message">
type Message
</a>
</dd>
<dd>
<a href="#Message.String">
func (m *Message) String() string
</a>
</dd>
<dd>
<a href="#Metadata">
type Metadata
</a>
</dd>
<dd>
<a href="#Offset">
type Offset
</a>
</dd>
<dd>
<a href="#NewOffset">
func NewOffset(offset interface{}) (Offset, error)
</a>
</dd>
<dd>
<a href="#OffsetTail">
func OffsetTail(relativeOffset Offset) Offset
</a>
</dd>
<dd>
<a href="#Offset.Set">
func (o Offset) Set(offset interface{}) error
</a>
</dd>
<dd>
<a href="#Offset.String">
func (o Offset) String() string
</a>
</dd>
<dd>
<a href="#OffsetsCommitted">
type OffsetsCommitted
</a>
</dd>
<dd>
<a href="#OffsetsCommitted.String">
func (o OffsetsCommitted) String() string
</a>
</dd>
<dd>
<a href="#PartitionEOF">
type PartitionEOF
</a>
</dd>
<dd>
<a href="#PartitionEOF.String">
func (p PartitionEOF) String() string
</a>
</dd>
<dd>
<a href="#PartitionMetadata">
type PartitionMetadata
</a>
</dd>
<dd>
<a href="#Producer">
type Producer
</a>
</dd>
<dd>
<a href="#NewProducer">
func NewProducer(conf *ConfigMap) (*Producer, error)
</a>
</dd>
<dd>
<a href="#Producer.Close">
func (p *Producer) Close()
</a>
</dd>
<dd>
<a href="#Producer.Events">
func (p *Producer) Events() chan Event
</a>
</dd>
<dd>
<a href="#Producer.Flush">
func (p *Producer) Flush(timeoutMs int) int
</a>
</dd>
<dd>
<a href="#Producer.GetMetadata">
func (p *Producer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)
</a>
</dd>
<dd>
<a href="#Producer.Len">
func (p *Producer) Len() int
</a>
</dd>
<dd>
<a href="#Producer.Produce">
func (p *Producer) Produce(msg *Message, deliveryChan chan Event) error
</a>
</dd>
<dd>
<a href="#Producer.ProduceChannel">
func (p *Producer) ProduceChannel() chan *Message
</a>
</dd>
<dd>
<a href="#Producer.QueryWatermarkOffsets">
func (p *Producer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
</a>
</dd>
<dd>
<a href="#Producer.String">
func (p *Producer) String() string
</a>
</dd>
<dd>
<a href="#RebalanceCb">
type RebalanceCb
</a>
</dd>
<dd>
<a href="#RevokedPartitions">
type RevokedPartitions
</a>
</dd>
<dd>
<a href="#RevokedPartitions.String">
func (e RevokedPartitions) String() string
</a>
</dd>
<dd>
<a href="#TimestampType">
type TimestampType
</a>
</dd>
<dd>
<a href="#TimestampType.String">
func (t TimestampType) String() string
</a>
</dd>
<dd>
<a href="#TopicMetadata">
type TopicMetadata
</a>
</dd>
<dd>
<a href="#TopicPartition">
type TopicPartition
</a>
</dd>
<dd>
<a href="#TopicPartition.String">
func (p TopicPartition) String() string
</a>
</dd>
</dl>
</div>
<!-- #manual-nav -->
<h4>
Package files
</h4>
<p>
<span style="font-size:90%">
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/build_dynamic.go">
build_dynamic.go
</a>
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/config.go">
config.go
</a>
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go">
consumer.go
</a>
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/error.go">
error.go
</a>
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/event.go">
event.go
</a>
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/generated_errors.go">
generated_errors.go
</a>
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/handle.go">
handle.go
</a>
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/kafka.go">
kafka.go
</a>
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/message.go">
message.go
</a>
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/metadata.go">
metadata.go
</a>
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/misc.go">
misc.go
</a>
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/producer.go">
producer.go
</a>
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/testhelpers.go">
testhelpers.go
</a>
</span>
</p>
</div>
<!-- .expanded -->
</div>
<!-- #pkg-index -->
<div class="toggle" id="pkg-callgraph" style="display: none">
<div class="collapsed">
<h2 class="toggleButton" title="Click to show Internal Call Graph section">
Internal call graph ▹
</h2>
</div>
<!-- .expanded -->
<div class="expanded">
<h2 class="toggleButton" title="Click to hide Internal Call Graph section">
Internal call graph ▾
</h2>
<p>
In the call graph viewer below, each node
is a function belonging to this package
and its children are the functions it
calls—perhaps dynamically.
</p>
<p>
The root nodes are the entry points of the
package: functions that may be called from
outside the package.
There may be non-exported or anonymous
functions among them if they are called
dynamically from another package.
</p>
<p>
Click a node to visit that function's source code.
From there you can visit its callers by
clicking its declaring
<code>
func
</code>
token.
</p>
<p>
Functions may be omitted if they were
determined to be unreachable in the
particular programs or tests that were
analyzed.
</p>
<!-- Zero means show all package entry points. -->
<ul class="treeview" id="callgraph-0" style="margin-left: 0.5in">
</ul>
</div>
</div>
<!-- #pkg-callgraph -->
<h2 id="pkg-constants">
Constants
</h2>
<pre>const (
<span class="comment">// TimestampNotAvailable indicates no timestamp was set, or not available due to lacking broker support</span>
<span id="TimestampNotAvailable">TimestampNotAvailable</span> = <a href="#TimestampType">TimestampType</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_TIMESTAMP_NOT_AVAILABLE">RD_KAFKA_TIMESTAMP_NOT_AVAILABLE</a>)
<span class="comment">// TimestampCreateTime indicates timestamp set by producer (source time)</span>
<span id="TimestampCreateTime">TimestampCreateTime</span> = <a href="#TimestampType">TimestampType</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_TIMESTAMP_CREATE_TIME">RD_KAFKA_TIMESTAMP_CREATE_TIME</a>)
<span class="comment">// TimestampLogAppendTime indicates timestamp set set by broker (store time)</span>
<span id="TimestampLogAppendTime">TimestampLogAppendTime</span> = <a href="#TimestampType">TimestampType</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME">RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME</a>)
)</pre>
<pre>const <span id="OffsetBeginning">OffsetBeginning</span> = <a href="#Offset">Offset</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_OFFSET_BEGINNING">RD_KAFKA_OFFSET_BEGINNING</a>)</pre>
<p>
Earliest offset (logical)
</p>
<pre>const <span id="OffsetEnd">OffsetEnd</span> = <a href="#Offset">Offset</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_OFFSET_END">RD_KAFKA_OFFSET_END</a>)</pre>
<p>
Latest offset (logical)
</p>
<pre>const <span id="OffsetInvalid">OffsetInvalid</span> = <a href="#Offset">Offset</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_OFFSET_INVALID">RD_KAFKA_OFFSET_INVALID</a>)</pre>
<p>
Invalid/unspecified offset
</p>
<pre>const <span id="OffsetStored">OffsetStored</span> = <a href="#Offset">Offset</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_OFFSET_STORED">RD_KAFKA_OFFSET_STORED</a>)</pre>
<p>
Use stored offset
</p>
<pre>const <span id="PartitionAny">PartitionAny</span> = <a href="http://golang.org/pkg/builtin/#int32">int32</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_PARTITION_UA">RD_KAFKA_PARTITION_UA</a>)</pre>
<p>
Any partition (for partitioning), or unspecified value (for all other cases)
</p>
<h2 id="LibraryVersion">
func
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/kafka.go?s=10095:10130#L292">
LibraryVersion
</a>
</h2>
<pre>func LibraryVersion() (<a href="http://golang.org/pkg/builtin/#int">int</a>, <a href="http://golang.org/pkg/builtin/#string">string</a>)</pre>
<p>
LibraryVersion returns the underlying librdkafka library version as a
(version_int, version_str) tuple.
</p>
<h2 id="AssignedPartitions">
type
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/event.go?s=1621:1684#L49">
AssignedPartitions
</a>
</h2>
<pre>type AssignedPartitions struct {
Partitions []<a href="#TopicPartition">TopicPartition</a>
}</pre>
<p>
AssignedPartitions consumer group rebalance event: assigned partition set
</p>
<h3 id="AssignedPartitions.String">
func (AssignedPartitions)
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/event.go?s=1686:1729#L53">
String
</a>
</h3>
<pre>func (e <a href="#AssignedPartitions">AssignedPartitions</a>) String() <a href="http://golang.org/pkg/builtin/#string">string</a></pre>
<h2 id="BrokerMetadata">
type
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/metadata.go?s=1266:1331#L37">
BrokerMetadata
</a>
</h2>
<pre>type BrokerMetadata struct {
ID <a href="http://golang.org/pkg/builtin/#int32">int32</a>
Host <a href="http://golang.org/pkg/builtin/#string">string</a>
Port <a href="http://golang.org/pkg/builtin/#int">int</a>
}</pre>
<p>
BrokerMetadata contains per-broker metadata
</p>
<h2 id="ConfigMap">
type
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/config.go?s=1172:1209#L31">
ConfigMap
</a>
</h2>
<pre>type ConfigMap map[<a href="http://golang.org/pkg/builtin/#string">string</a>]<a href="#ConfigValue">ConfigValue</a></pre>
<p>
ConfigMap is a map contaning standard librdkafka configuration properties as documented in:
<a href="https://github.com/edenhill/librdkafka/tree/master/CONFIGURATION.md">
https://github.com/edenhill/librdkafka/tree/master/CONFIGURATION.md
</a>
</p>
<p>
The special property "default.topic.config" (optional) is a ConfigMap containing default topic
configuration properties.
</p>
<h3 id="ConfigMap.Set">
func (ConfigMap)
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/config.go?s=1813:1852#L52">
Set
</a>
</h3>
<pre>func (m <a href="#ConfigMap">ConfigMap</a>) Set(kv <a href="http://golang.org/pkg/builtin/#string">string</a>) <a href="http://golang.org/pkg/builtin/#error">error</a></pre>
<p>
Set implements flag.Set (command line argument parser) as a convenience
for `-X key=value` config.
</p>
<h3 id="ConfigMap.SetKey">
func (ConfigMap)
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/config.go?s=1370:1432#L36">
SetKey
</a>
</h3>
<pre>func (m <a href="#ConfigMap">ConfigMap</a>) SetKey(key <a href="http://golang.org/pkg/builtin/#string">string</a>, value <a href="#ConfigValue">ConfigValue</a>) <a href="http://golang.org/pkg/builtin/#error">error</a></pre>
<p>
SetKey sets configuration property key to value.
For user convenience a key prefixed with {topic}. will be
set on the "default.topic.config" sub-map.
</p>
<h2 id="ConfigValue">
type
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/config.go?s=846:874#L24">
ConfigValue
</a>
</h2>
<pre>type ConfigValue interface{}</pre>
<p>
ConfigValue supports the following types:
</p>
<pre>bool, int, string, any type with the standard String() interface
</pre>
<h2 id="Consumer">
type
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=968:1205#L25">
Consumer
</a>
</h2>
<pre>type Consumer struct {
<span class="comment">// contains filtered or unexported fields</span>
}</pre>
<p>
Consumer implements a High-level Apache Kafka Consumer instance
</p>
<h3 id="NewConsumer">
func
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=7696:7748#L242">
NewConsumer
</a>
</h3>
<pre>func NewConsumer(conf *<a href="#ConfigMap">ConfigMap</a>) (*<a href="#Consumer">Consumer</a>, <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
<p>
NewConsumer creates a new high-level Consumer instance.
</p>
<p>
Supported special configuration properties:
</p>
<pre>go.application.rebalance.enable (bool, false) - Forward rebalancing responsibility to application via the Events() channel.
If set to true the app must handle the AssignedPartitions and
RevokedPartitions events and call Assign() and Unassign()
respectively.
go.events.channel.enable (bool, false) - Enable the Events() channel. Messages and events will be pushed on the Events() channel and the Poll() interface will be disabled. (Experimental)
go.events.channel.size (int, 1000) - Events() channel size
</pre>
<p>
WARNING: Due to the buffering nature of channels (and queues in general) the
use of the events channel risks receiving outdated events and
messages. Minimizing go.events.channel.size reduces the risk
and number of outdated events and messages but does not eliminate
the factor completely. With a channel size of 1 at most one
event or message may be outdated.
</p>
<h3 id="Consumer.Assign">
func (*Consumer)
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=2641:2707#L82">
Assign
</a>
</h3>
<pre>func (c *<a href="#Consumer">Consumer</a>) Assign(partitions []<a href="#TopicPartition">TopicPartition</a>) (err <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
<p>
Assign an atomic set of partitions to consume.
This replaces the current assignment.
</p>
<h3 id="Consumer.Close">
func (*Consumer)
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=6121:6159#L202">
Close
</a>
</h3>
<pre>func (c *<a href="#Consumer">Consumer</a>) Close() (err <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
<p>
Close Consumer instance.
The object is no longer usable after this call.
</p>
<h3 id="Consumer.Commit">
func (*Consumer)
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=4853:4906#L159">
Commit
</a>
</h3>
<pre>func (c *<a href="#Consumer">Consumer</a>) Commit() ([]<a href="#TopicPartition">TopicPartition</a>, <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
<p>
Commit offsets for currently assigned partitions
This is a blocking call.
Returns the committed offsets on success.
</p>
<h3 id="Consumer.CommitMessage">
func (*Consumer)
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=5070:5140#L166">
CommitMessage
</a>
</h3>
<pre>func (c *<a href="#Consumer">Consumer</a>) CommitMessage(m *<a href="#Message">Message</a>) ([]<a href="#TopicPartition">TopicPartition</a>, <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
<p>
CommitMessage commits offset based on the provided message.
This is a blocking call.
Returns the committed offsets on success.
</p>
<h3 id="Consumer.CommitOffsets">
func (*Consumer)
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=5473:5557#L178">
CommitOffsets
</a>
</h3>
<pre>func (c *<a href="#Consumer">Consumer</a>) CommitOffsets(offsets []<a href="#TopicPartition">TopicPartition</a>) ([]<a href="#TopicPartition">TopicPartition</a>, <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
<p>
CommitOffsets commits the provided list of offsets
This is a blocking call.
Returns the committed offsets on success.
</p>
<h3 id="Consumer.Events">
func (*Consumer)
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=5981:6019#L196">
Events
</a>
</h3>
<pre>func (c *<a href="#Consumer">Consumer</a>) Events() chan <a href="#Event">Event</a></pre>
<p>
Events returns the Events channel (if enabled)
</p>
<h3 id="Consumer.GetMetadata">
func (*Consumer)
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=10490:10585#L347">
GetMetadata
</a>
</h3>
<pre>func (c *<a href="#Consumer">Consumer</a>) GetMetadata(topic *<a href="http://golang.org/pkg/builtin/#string">string</a>, allTopics <a href="http://golang.org/pkg/builtin/#bool">bool</a>, timeoutMs <a href="http://golang.org/pkg/builtin/#int">int</a>) (*<a href="#Metadata">Metadata</a>, <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
<p>
GetMetadata queries broker for cluster and topic metadata.
If topic is non-nil only information about that topic is returned, else if
allTopics is false only information about locally used topics is returned,
else information about all topics is returned.
</p>
<h3 id="Consumer.Poll">
func (*Consumer)
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=5809:5861#L190">
Poll
</a>
</h3>
<pre>func (c *<a href="#Consumer">Consumer</a>) Poll(timeoutMs <a href="http://golang.org/pkg/builtin/#int">int</a>) (event <a href="#Event">Event</a>)</pre>
<p>
Poll the consumer for messages or events.
</p>
<h3 id="hdr-Will_block_for_at_most_timeoutMs_milliseconds">
Will block for at most timeoutMs milliseconds
</h3>
<p>
The following callbacks may be triggered:
</p>
<pre>Subscribe()'s rebalanceCb
</pre>
<p>
Returns nil on timeout, else an Event
</p>
<h3 id="Consumer.QueryWatermarkOffsets">
func (*Consumer)
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=10748:10863#L353">
QueryWatermarkOffsets
</a>
</h3>
<pre>func (c *<a href="#Consumer">Consumer</a>) QueryWatermarkOffsets(topic <a href="http://golang.org/pkg/builtin/#string">string</a>, partition <a href="http://golang.org/pkg/builtin/#int32">int32</a>, timeoutMs <a href="http://golang.org/pkg/builtin/#int">int</a>) (low, high <a href="http://golang.org/pkg/builtin/#int64">int64</a>, err <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
<p>
QueryWatermarkOffsets returns the broker's low and high offsets for the given topic
and partition.
</p>
<h3 id="Consumer.String">
func (*Consumer)
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=1272:1306#L36">
String
</a>
</h3>
<pre>func (c *<a href="#Consumer">Consumer</a>) String() <a href="http://golang.org/pkg/builtin/#string">string</a></pre>
<p>
Strings returns a human readable name for a Consumer instance
</p>
<h3 id="Consumer.Subscribe">
func (*Consumer)
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=1518:1591#L47">
Subscribe
</a>
</h3>
<pre>func (c *<a href="#Consumer">Consumer</a>) Subscribe(topic <a href="http://golang.org/pkg/builtin/#string">string</a>, rebalanceCb <a href="#RebalanceCb">RebalanceCb</a>) <a href="http://golang.org/pkg/builtin/#error">error</a></pre>
<p>
Subscribe to a single topic
This replaces the current subscription
</p>
<h3 id="Consumer.SubscribeTopics">
func (*Consumer)
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=1758:1846#L53">
SubscribeTopics
</a>
</h3>
<pre>func (c *<a href="#Consumer">Consumer</a>) SubscribeTopics(topics []<a href="http://golang.org/pkg/builtin/#string">string</a>, rebalanceCb <a href="#RebalanceCb">RebalanceCb</a>) (err <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
<p>
SubscribeTopics subscribes to the provided list of topics.
This replaces the current subscription.
</p>
<h3 id="Consumer.Unassign">
func (*Consumer)
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=3022:3063#L97">
Unassign
</a>
</h3>
<pre>func (c *<a href="#Consumer">Consumer</a>) Unassign() (err <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
<p>
Unassign the current set of partitions to consume.
</p>
<h3 id="Consumer.Unsubscribe">
func (*Consumer)
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=2451:2495#L75">
Unsubscribe
</a>
</h3>
<pre>func (c *<a href="#Consumer">Consumer</a>) Unsubscribe() (err <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
<p>
Unsubscribe from the current subscription, if any.
</p>
<h2 id="Error">
type
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/error.go?s=862:912#L19">
Error
</a>
</h2>
<pre>type Error struct {
<span class="comment">// contains filtered or unexported fields</span>
}</pre>
<p>
Error provides a Kafka-specific error container
</p>
<h3 id="Error.Code">
func (Error)
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/error.go?s=1649:1680#L57">
Code
</a>
</h3>
<pre>func (e <a href="#Error">Error</a>) Code() <a href="#ErrorCode">ErrorCode</a></pre>
<p>
Code returns the ErrorCode of an Error
</p>
<h3 id="Error.Error">
func (Error)
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/error.go?s=1392:1421#L44">
Error
</a>
</h3>
<pre>func (e <a href="#Error">Error</a>) Error() <a href="http://golang.org/pkg/builtin/#string">string</a></pre>
<p>
Error returns a human readable representation of an Error
Same as Error.String()
</p>
<h3 id="Error.String">
func (Error)
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/error.go?s=1508:1538#L49">
String
</a>
</h3>
<pre>func (e <a href="#Error">Error</a>) String() <a href="http://golang.org/pkg/builtin/#string">string</a></pre>
<p>
String returns a human readable representation of an Error
</p>
<h2 id="ErrorCode">
type
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/generated_errors.go?s=328:346#L1">
ErrorCode
</a>
</h2>
<pre>type ErrorCode <a href="http://golang.org/pkg/builtin/#int">int</a></pre>
<p>
ErrorCode is the integer representation of local and broker error codes
</p>
<pre>const (
<span class="comment">// ErrBadMsg Local: Bad message format</span>
<span id="ErrBadMsg">ErrBadMsg</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__BAD_MSG">RD_KAFKA_RESP_ERR__BAD_MSG</a>)
<span class="comment">// ErrBadCompression Local: Invalid compressed data</span>
<span id="ErrBadCompression">ErrBadCompression</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__BAD_COMPRESSION">RD_KAFKA_RESP_ERR__BAD_COMPRESSION</a>)
<span class="comment">// ErrDestroy Local: Broker handle destroyed</span>
<span id="ErrDestroy">ErrDestroy</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__DESTROY">RD_KAFKA_RESP_ERR__DESTROY</a>)
<span class="comment">// ErrFail Local: Communication failure with broker</span>
<span id="ErrFail">ErrFail</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__FAIL">RD_KAFKA_RESP_ERR__FAIL</a>)
<span class="comment">// ErrTransport Local: Broker transport failure</span>
<span id="ErrTransport">ErrTransport</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__TRANSPORT">RD_KAFKA_RESP_ERR__TRANSPORT</a>)
<span class="comment">// ErrCritSysResource Local: Critical system resource failure</span>
<span id="ErrCritSysResource">ErrCritSysResource</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE">RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE</a>)
<span class="comment">// ErrResolve Local: Host resolution failure</span>
<span id="ErrResolve">ErrResolve</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__RESOLVE">RD_KAFKA_RESP_ERR__RESOLVE</a>)
<span class="comment">// ErrMsgTimedOut Local: Message timed out</span>
<span id="ErrMsgTimedOut">ErrMsgTimedOut</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__MSG_TIMED_OUT">RD_KAFKA_RESP_ERR__MSG_TIMED_OUT</a>)
<span class="comment">// ErrPartitionEOF Broker: No more messages</span>
<span id="ErrPartitionEOF">ErrPartitionEOF</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__PARTITION_EOF">RD_KAFKA_RESP_ERR__PARTITION_EOF</a>)
<span class="comment">// ErrUnknownPartition Local: Unknown partition</span>
<span id="ErrUnknownPartition">ErrUnknownPartition</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION">RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION</a>)
<span class="comment">// ErrFs Local: File or filesystem error</span>
<span id="ErrFs">ErrFs</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__FS">RD_KAFKA_RESP_ERR__FS</a>)
<span class="comment">// ErrUnknownTopic Local: Unknown topic</span>
<span id="ErrUnknownTopic">ErrUnknownTopic</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC">RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC</a>)
<span class="comment">// ErrAllBrokersDown Local: All broker connections are down</span>
<span id="ErrAllBrokersDown">ErrAllBrokersDown</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN">RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN</a>)
<span class="comment">// ErrInvalidArg Local: Invalid argument or configuration</span>
<span id="ErrInvalidArg">ErrInvalidArg</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__INVALID_ARG">RD_KAFKA_RESP_ERR__INVALID_ARG</a>)
<span class="comment">// ErrTimedOut Local: Timed out</span>
<span id="ErrTimedOut">ErrTimedOut</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__TIMED_OUT">RD_KAFKA_RESP_ERR__TIMED_OUT</a>)
<span class="comment">// ErrQueueFull Local: Queue full</span>
<span id="ErrQueueFull">ErrQueueFull</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__QUEUE_FULL">RD_KAFKA_RESP_ERR__QUEUE_FULL</a>)
<span class="comment">// ErrIsrInsuff Local: ISR count insufficient</span>
<span id="ErrIsrInsuff">ErrIsrInsuff</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__ISR_INSUFF">RD_KAFKA_RESP_ERR__ISR_INSUFF</a>)
<span class="comment">// ErrNodeUpdate Local: Broker node update</span>
<span id="ErrNodeUpdate">ErrNodeUpdate</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__NODE_UPDATE">RD_KAFKA_RESP_ERR__NODE_UPDATE</a>)
<span class="comment">// ErrSsl Local: SSL error</span>
<span id="ErrSsl">ErrSsl</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__SSL">RD_KAFKA_RESP_ERR__SSL</a>)
<span class="comment">// ErrWaitCoord Local: Waiting for coordinator</span>
<span id="ErrWaitCoord">ErrWaitCoord</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__WAIT_COORD">RD_KAFKA_RESP_ERR__WAIT_COORD</a>)
<span class="comment">// ErrUnknownGroup Local: Unknown group</span>
<span id="ErrUnknownGroup">ErrUnknownGroup</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__UNKNOWN_GROUP">RD_KAFKA_RESP_ERR__UNKNOWN_GROUP</a>)
<span class="comment">// ErrInProgress Local: Operation in progress</span>
<span id="ErrInProgress">ErrInProgress</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__IN_PROGRESS">RD_KAFKA_RESP_ERR__IN_PROGRESS</a>)
<span class="comment">// ErrPrevInProgress Local: Previous operation in progress</span>
<span id="ErrPrevInProgress">ErrPrevInProgress</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS">RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS</a>)
<span class="comment">// ErrExistingSubscription Local: Existing subscription</span>
<span id="ErrExistingSubscription">ErrExistingSubscription</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION">RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION</a>)
<span class="comment">// ErrAssignPartitions Local: Assign partitions</span>
<span id="ErrAssignPartitions">ErrAssignPartitions</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS">RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS</a>)
<span class="comment">// ErrRevokePartitions Local: Revoke partitions</span>
<span id="ErrRevokePartitions">ErrRevokePartitions</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS">RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS</a>)
<span class="comment">// ErrConflict Local: Conflicting use</span>
<span id="ErrConflict">ErrConflict</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__CONFLICT">RD_KAFKA_RESP_ERR__CONFLICT</a>)
<span class="comment">// ErrState Local: Erroneous state</span>
<span id="ErrState">ErrState</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__STATE">RD_KAFKA_RESP_ERR__STATE</a>)
<span class="comment">// ErrUnknownProtocol Local: Unknown protocol</span>
<span id="ErrUnknownProtocol">ErrUnknownProtocol</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL">RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL</a>)
<span class="comment">// ErrNotImplemented Local: Not implemented</span>
<span id="ErrNotImplemented">ErrNotImplemented</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED">RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED</a>)
<span class="comment">// ErrAuthentication Local: Authentication failure</span>
<span id="ErrAuthentication">ErrAuthentication</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__AUTHENTICATION">RD_KAFKA_RESP_ERR__AUTHENTICATION</a>)
<span class="comment">// ErrNoOffset Local: No offset stored</span>
<span id="ErrNoOffset">ErrNoOffset</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__NO_OFFSET">RD_KAFKA_RESP_ERR__NO_OFFSET</a>)
<span class="comment">// ErrOutdated Local: Outdated</span>
<span id="ErrOutdated">ErrOutdated</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__OUTDATED">RD_KAFKA_RESP_ERR__OUTDATED</a>)
<span class="comment">// ErrTimedOutQueue Local: Timed out in queue</span>
<span id="ErrTimedOutQueue">ErrTimedOutQueue</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE">RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE</a>)
<span class="comment">// ErrUnknown Unknown broker error</span>
<span id="ErrUnknown">ErrUnknown</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_UNKNOWN">RD_KAFKA_RESP_ERR_UNKNOWN</a>)
<span class="comment">// ErrNoError Success</span>
<span id="ErrNoError">ErrNoError</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_NO_ERROR">RD_KAFKA_RESP_ERR_NO_ERROR</a>)
<span class="comment">// ErrOffsetOutOfRange Broker: Offset out of range</span>
<span id="ErrOffsetOutOfRange">ErrOffsetOutOfRange</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE">RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE</a>)
<span class="comment">// ErrInvalidMsg Broker: Invalid message</span>
<span id="ErrInvalidMsg">ErrInvalidMsg</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_INVALID_MSG">RD_KAFKA_RESP_ERR_INVALID_MSG</a>)
<span class="comment">// ErrUnknownTopicOrPart Broker: Unknown topic or partition</span>
<span id="ErrUnknownTopicOrPart">ErrUnknownTopicOrPart</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART">RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART</a>)
<span class="comment">// ErrInvalidMsgSize Broker: Invalid message size</span>
<span id="ErrInvalidMsgSize">ErrInvalidMsgSize</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE">RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE</a>)
<span class="comment">// ErrLeaderNotAvailable Broker: Leader not available</span>
<span id="ErrLeaderNotAvailable">ErrLeaderNotAvailable</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE">RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE</a>)
<span class="comment">// ErrNotLeaderForPartition Broker: Not leader for partition</span>
<span id="ErrNotLeaderForPartition">ErrNotLeaderForPartition</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION">RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION</a>)
<span class="comment">// ErrRequestTimedOut Broker: Request timed out</span>
<span id="ErrRequestTimedOut">ErrRequestTimedOut</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT">RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT</a>)
<span class="comment">// ErrBrokerNotAvailable Broker: Broker not available</span>
<span id="ErrBrokerNotAvailable">ErrBrokerNotAvailable</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE">RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE</a>)
<span class="comment">// ErrReplicaNotAvailable Broker: Replica not available</span>
<span id="ErrReplicaNotAvailable">ErrReplicaNotAvailable</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE">RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE</a>)
<span class="comment">// ErrMsgSizeTooLarge Broker: Message size too large</span>
<span id="ErrMsgSizeTooLarge">ErrMsgSizeTooLarge</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE">RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE</a>)
<span class="comment">// ErrStaleCtrlEpoch Broker: StaleControllerEpochCode</span>
<span id="ErrStaleCtrlEpoch">ErrStaleCtrlEpoch</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_STALE_CTRL_EPOCH">RD_KAFKA_RESP_ERR_STALE_CTRL_EPOCH</a>)
<span class="comment">// ErrOffsetMetadataTooLarge Broker: Offset metadata string too large</span>
<span id="ErrOffsetMetadataTooLarge">ErrOffsetMetadataTooLarge</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE">RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE</a>)
<span class="comment">// ErrNetworkException Broker: Broker disconnected before response received</span>
<span id="ErrNetworkException">ErrNetworkException</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_NETWORK_EXCEPTION">RD_KAFKA_RESP_ERR_NETWORK_EXCEPTION</a>)
<span class="comment">// ErrGroupLoadInProgress Broker: Group coordinator load in progress</span>
<span id="ErrGroupLoadInProgress">ErrGroupLoadInProgress</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS">RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS</a>)
<span class="comment">// ErrGroupCoordinatorNotAvailable Broker: Group coordinator not available</span>
<span id="ErrGroupCoordinatorNotAvailable">ErrGroupCoordinatorNotAvailable</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE">RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE</a>)
<span class="comment">// ErrNotCoordinatorForGroup Broker: Not coordinator for group</span>
<span id="ErrNotCoordinatorForGroup">ErrNotCoordinatorForGroup</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP">RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP</a>)
<span class="comment">// ErrTopicException Broker: Invalid topic</span>
<span id="ErrTopicException">ErrTopicException</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION">RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION</a>)
<span class="comment">// ErrRecordListTooLarge Broker: Message batch larger than configured server segment size</span>
<span id="ErrRecordListTooLarge">ErrRecordListTooLarge</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE">RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE</a>)
<span class="comment">// ErrNotEnoughReplicas Broker: Not enough in-sync replicas</span>
<span id="ErrNotEnoughReplicas">ErrNotEnoughReplicas</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS">RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS</a>)
<span class="comment">// ErrNotEnoughReplicasAfterAppend Broker: Message(s) written to insufficient number of in-sync replicas</span>
<span id="ErrNotEnoughReplicasAfterAppend">ErrNotEnoughReplicasAfterAppend</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND">RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND</a>)
<span class="comment">// ErrInvalidRequiredAcks Broker: Invalid required acks value</span>
<span id="ErrInvalidRequiredAcks">ErrInvalidRequiredAcks</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS">RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS</a>)
<span class="comment">// ErrIllegalGeneration Broker: Specified group generation id is not valid</span>
<span id="ErrIllegalGeneration">ErrIllegalGeneration</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION">RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION</a>)
<span class="comment">// ErrInconsistentGroupProtocol Broker: Inconsistent group protocol</span>
<span id="ErrInconsistentGroupProtocol">ErrInconsistentGroupProtocol</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_INCONSISTENT_GROUP_PROTOCOL">RD_KAFKA_RESP_ERR_INCONSISTENT_GROUP_PROTOCOL</a>)
<span class="comment">// ErrInvalidGroupID Broker: Invalid group.id</span>
<span id="ErrInvalidGroupID">ErrInvalidGroupID</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_INVALID_GROUP_ID">RD_KAFKA_RESP_ERR_INVALID_GROUP_ID</a>)
<span class="comment">// ErrUnknownMemberID Broker: Unknown member</span>
<span id="ErrUnknownMemberID">ErrUnknownMemberID</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID">RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID</a>)
<span class="comment">// ErrInvalidSessionTimeout Broker: Invalid session timeout</span>
<span id="ErrInvalidSessionTimeout">ErrInvalidSessionTimeout</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT">RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT</a>)
<span class="comment">// ErrRebalanceInProgress Broker: Group rebalance in progress</span>
<span id="ErrRebalanceInProgress">ErrRebalanceInProgress</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS">RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS</a>)
<span class="comment">// ErrInvalidCommitOffsetSize Broker: Commit offset data size is not valid</span>
<span id="ErrInvalidCommitOffsetSize">ErrInvalidCommitOffsetSize</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE">RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE</a>)
<span class="comment">// ErrTopicAuthorizationFailed Broker: Topic authorization failed</span>
<span id="ErrTopicAuthorizationFailed">ErrTopicAuthorizationFailed</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED">RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED</a>)
<span class="comment">// ErrGroupAuthorizationFailed Broker: Group authorization failed</span>
<span id="ErrGroupAuthorizationFailed">ErrGroupAuthorizationFailed</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED">RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED</a>)
<span class="comment">// ErrClusterAuthorizationFailed Broker: Cluster authorization failed</span>
<span id="ErrClusterAuthorizationFailed">ErrClusterAuthorizationFailed</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED">RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED</a>)
<span class="comment">// ErrInvalidTimestamp Broker: Invalid timestamp</span>
<span id="ErrInvalidTimestamp">ErrInvalidTimestamp</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_INVALID_TIMESTAMP">RD_KAFKA_RESP_ERR_INVALID_TIMESTAMP</a>)
<span class="comment">// ErrUnsupportedSaslMechanism Broker: Unsupported SASL mechanism</span>
<span id="ErrUnsupportedSaslMechanism">ErrUnsupportedSaslMechanism</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_UNSUPPORTED_SASL_MECHANISM">RD_KAFKA_RESP_ERR_UNSUPPORTED_SASL_MECHANISM</a>)
<span class="comment">// ErrIllegalSaslState Broker: Request not valid in current SASL state</span>
<span id="ErrIllegalSaslState">ErrIllegalSaslState</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_ILLEGAL_SASL_STATE">RD_KAFKA_RESP_ERR_ILLEGAL_SASL_STATE</a>)
<span class="comment">// ErrUnsupportedVersion Broker: API version not supported</span>
<span id="ErrUnsupportedVersion">ErrUnsupportedVersion</span> <a href="#ErrorCode">ErrorCode</a> = <a href="#ErrorCode">ErrorCode</a>(<a href="http://golang.org/pkg/C/">C</a>.<a href="http://golang.org/pkg/C/#RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION">RD_KAFKA_RESP_ERR_UNSUPPORTED_VERSION</a>)
)</pre>
<h3 id="ErrorCode.String">
func (ErrorCode)
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/generated_errors.go?s=415:449#L4">
String
</a>
</h3>
<pre>func (c <a href="#ErrorCode">ErrorCode</a>) String() <a href="http://golang.org/pkg/builtin/#string">string</a></pre>
<p>
String returns a human readable representation of an error code
</p>
<h2 id="Event">
type
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/event.go?s=1412:1517#L41">
Event
</a>
</h2>
<pre>type Event interface {
<span class="comment">// String returns a human-readable representation of the event</span>
String() <a href="http://golang.org/pkg/builtin/#string">string</a>
}</pre>
<p>
Event generic interface
</p>
<h2 id="Handle">
type
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/handle.go?s=822:868#L23">
Handle
</a>
</h2>
<pre>type Handle interface {
<span class="comment">// contains filtered or unexported methods</span>
}</pre>
<p>
Handle represents a generic client handle containing common parts for
both Producer and Consumer.
</p>
<h2 id="Message">
type
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/message.go?s=2465:2649#L76">
Message
</a>
</h2>
<pre>type Message struct {
TopicPartition <a href="#TopicPartition">TopicPartition</a>
Value []<a href="http://golang.org/pkg/builtin/#byte">byte</a>
Key []<a href="http://golang.org/pkg/builtin/#byte">byte</a>
Timestamp <a href="http://golang.org/pkg/time/">time</a>.<a href="http://golang.org/pkg/time/#Time">Time</a>
TimestampType <a href="#TimestampType">TimestampType</a>
Opaque interface{}
}</pre>
<p>
Message represents a Kafka message
</p>
<h3 id="Message.String">
func (*Message)
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/message.go?s=2755:2788#L87">
String
</a>
</h3>
<pre>func (m *<a href="#Message">Message</a>) String() <a href="http://golang.org/pkg/builtin/#string">string</a></pre>
<p>
String returns a human readable representation of a Message.
Key and payload are not represented.
</p>
<h2 id="Metadata">
type
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/metadata.go?s=1723:1842#L60">
Metadata
</a>
</h2>
<pre>type Metadata struct {
Brokers []<a href="#BrokerMetadata">BrokerMetadata</a>
Topics map[<a href="http://golang.org/pkg/builtin/#string">string</a>]<a href="#TopicMetadata">TopicMetadata</a>
OriginatingBroker <a href="#BrokerMetadata">BrokerMetadata</a>
}</pre>
<p>
Metadata contains broker and topic metadata for all (matching) topics
</p>
<h2 id="Offset">
type
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/kafka.go?s=6428:6445#L149">
Offset
</a>
</h2>
<pre>type Offset <a href="http://golang.org/pkg/builtin/#int64">int64</a></pre>
<p>
Offset type (int64) with support for canonical names
</p>
<h3 id="NewOffset">
func
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/kafka.go?s=7384:7434#L192">
NewOffset
</a>
</h3>
<pre>func NewOffset(offset interface{}) (<a href="#Offset">Offset</a>, <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
<p>
NewOffset creates a new Offset using the provided logical string, or an
absolute int64 offset value.
Logical offsets: "beginning", "earliest", "end", "latest", "unset", "invalid", "stored"
</p>
<h3 id="OffsetTail">
func
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/kafka.go?s=8170:8215#L231">
OffsetTail
</a>
</h3>
<pre>func OffsetTail(relativeOffset <a href="#Offset">Offset</a>) <a href="#Offset">Offset</a></pre>
<p>
OffsetTail returns the logical offset relativeOffset from current end of partition
</p>
<h3 id="Offset.Set">
func (Offset)
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/kafka.go?s=7064:7109#L179">
Set
</a>
</h3>
<pre>func (o <a href="#Offset">Offset</a>) Set(offset interface{}) <a href="http://golang.org/pkg/builtin/#error">error</a></pre>
<p>
Set offset value, see NewOffset()
</p>
<h3 id="Offset.String">
func (Offset)
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/kafka.go?s=6776:6807#L163">
String
</a>
</h3>
<pre>func (o <a href="#Offset">Offset</a>) String() <a href="http://golang.org/pkg/builtin/#string">string</a></pre>
<h2 id="OffsetsCommitted">
type
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/event.go?s=2266:2339#L74">
OffsetsCommitted
</a>
</h2>
<pre>type OffsetsCommitted struct {
Error <a href="http://golang.org/pkg/builtin/#error">error</a>
Offsets []<a href="#TopicPartition">TopicPartition</a>
}</pre>
<p>
OffsetsCommitted reports committed offsets
</p>
<h3 id="OffsetsCommitted.String">
func (OffsetsCommitted)
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/event.go?s=2341:2382#L79">
String
</a>
</h3>
<pre>func (o <a href="#OffsetsCommitted">OffsetsCommitted</a>) String() <a href="http://golang.org/pkg/builtin/#string">string</a></pre>
<h2 id="PartitionEOF">
type
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/event.go?s=2091:2123#L67">
PartitionEOF
</a>
</h2>
<pre>type PartitionEOF <a href="#TopicPartition">TopicPartition</a></pre>
<p>
PartitionEOF consumer reached end of partition
</p>
<h3 id="PartitionEOF.String">
func (PartitionEOF)
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/event.go?s=2125:2162#L69">
String
</a>
</h3>
<pre>func (p <a href="#PartitionEOF">PartitionEOF</a>) String() <a href="http://golang.org/pkg/builtin/#string">string</a></pre>
<h2 id="PartitionMetadata">
type
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/metadata.go?s=1386:1503#L44">
PartitionMetadata
</a>
</h2>
<pre>type PartitionMetadata struct {
ID <a href="http://golang.org/pkg/builtin/#int32">int32</a>
Error <a href="#Error">Error</a>
Leader <a href="http://golang.org/pkg/builtin/#int32">int32</a>
Replicas []<a href="http://golang.org/pkg/builtin/#int32">int32</a>
Isrs []<a href="http://golang.org/pkg/builtin/#int32">int32</a>
}</pre>
<p>
PartitionMetadata contains per-partition metadata
</p>
<h2 id="Producer">
type
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/producer.go?s=1101:1270#L30">
Producer
</a>
</h2>
<pre>type Producer struct {
<span class="comment">// contains filtered or unexported fields</span>
}</pre>
<p>
Producer implements a High-level Apache Kafka Producer instance
</p>
<h3 id="NewProducer">
func
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/producer.go?s=6249:6301#L203">
NewProducer
</a>
</h3>
<pre>func NewProducer(conf *<a href="#ConfigMap">ConfigMap</a>) (*<a href="#Producer">Producer</a>, <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
<p>
NewProducer creates a new high-level Producer instance.
</p>
<p>
conf is a *ConfigMap with standard librdkafka configuration properties, see here:
</p>
<p>
Supported special configuration properties:
</p>
<pre>go.batch.producer (bool, false) - Enable batch producer (experimental for increased performance).
These batches do not relate to Kafka message batches in any way.
go.delivery.reports (bool, true) - Forward per-message delivery reports to the
Events() channel.
go.produce.channel.size (int, 1000000) - ProduceChannel() buffer size (in number of messages)
</pre>
<h3 id="Producer.Close">
func (*Producer)
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/producer.go?s=5283:5309#L174">
Close
</a>
</h3>
<pre>func (p *<a href="#Producer">Producer</a>) Close()</pre>
<p>
Close a Producer instance.
The Producer object or its channels are no longer usable after this call.
</p>
<h3 id="Producer.Events">
func (*Producer)
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/producer.go?s=4035:4073#L134">
Events
</a>
</h3>
<pre>func (p *<a href="#Producer">Producer</a>) Events() chan <a href="#Event">Event</a></pre>
<p>
Events returns the Events channel (read)
</p>
<h3 id="Producer.Flush">
func (*Producer)
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/producer.go?s=4779:4822#L154">
Flush
</a>
</h3>
<pre>func (p *<a href="#Producer">Producer</a>) Flush(timeoutMs <a href="http://golang.org/pkg/builtin/#int">int</a>) <a href="http://golang.org/pkg/builtin/#int">int</a></pre>
<p>
Flush and wait for outstanding messages and requests to complete delivery.
Includes messages on ProduceChannel.
Runs until value reaches zero or on timeoutMs.
Returns the number of outstanding events still un-flushed.
</p>
<h3 id="Producer.GetMetadata">
func (*Producer)
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/producer.go?s=9852:9947#L354">
GetMetadata
</a>
</h3>
<pre>func (p *<a href="#Producer">Producer</a>) GetMetadata(topic *<a href="http://golang.org/pkg/builtin/#string">string</a>, allTopics <a href="http://golang.org/pkg/builtin/#bool">bool</a>, timeoutMs <a href="http://golang.org/pkg/builtin/#int">int</a>) (*<a href="#Metadata">Metadata</a>, <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
<p>
GetMetadata queries broker for cluster and topic metadata.
If topic is non-nil only information about that topic is returned, else if
allTopics is false only information about locally used topics is returned,
else information about all topics is returned.
</p>
<h3 id="Producer.Len">
func (*Producer)
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/producer.go?s=4429:4457#L146">
Len
</a>
</h3>
<pre>func (p *<a href="#Producer">Producer</a>) Len() <a href="http://golang.org/pkg/builtin/#int">int</a></pre>
<p>
Len returns the number of messages and requests waiting to be transmitted to the broker
as well as delivery reports queued for the application.
Includes messages on ProduceChannel.
</p>
<h3 id="Producer.Produce">
func (*Producer)
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/producer.go?s=3205:3276#L109">
Produce
</a>
</h3>
<pre>func (p *<a href="#Producer">Producer</a>) Produce(msg *<a href="#Message">Message</a>, deliveryChan chan <a href="#Event">Event</a>) <a href="http://golang.org/pkg/builtin/#error">error</a></pre>
<p>
Produce single message.
This is an asynchronous call that enqueues the message on the internal
transmit queue, thus returning immediately.
The delivery report will be sent on the provided deliveryChan if specified,
or on the Producer object's Events() channel if not.
Returns an error if message could not be enqueued.
</p>
<h3 id="Producer.ProduceChannel">
func (*Producer)
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/producer.go?s=4159:4208#L139">
ProduceChannel
</a>
</h3>
<pre>func (p *<a href="#Producer">Producer</a>) ProduceChannel() chan *<a href="#Message">Message</a></pre>
<p>
ProduceChannel returns the produce *Message channel (write)
</p>
<h3 id="Producer.QueryWatermarkOffsets">
func (*Producer)
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/producer.go?s=10110:10225#L360">
QueryWatermarkOffsets
</a>
</h3>
<pre>func (p *<a href="#Producer">Producer</a>) QueryWatermarkOffsets(topic <a href="http://golang.org/pkg/builtin/#string">string</a>, partition <a href="http://golang.org/pkg/builtin/#int32">int32</a>, timeoutMs <a href="http://golang.org/pkg/builtin/#int">int</a>) (low, high <a href="http://golang.org/pkg/builtin/#int64">int64</a>, err <a href="http://golang.org/pkg/builtin/#error">error</a>)</pre>
<p>
QueryWatermarkOffsets returns the broker's low and high offsets for the given topic
and partition.
</p>
<h3 id="Producer.String">
func (*Producer)
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/producer.go?s=1336:1370#L40">
String
</a>
</h3>
<pre>func (p *<a href="#Producer">Producer</a>) String() <a href="http://golang.org/pkg/builtin/#string">string</a></pre>
<p>
String returns a human readable name for a Producer instance
</p>
<h2 id="RebalanceCb">
type
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/consumer.go?s=854:899#L22">
RebalanceCb
</a>
</h2>
<pre>type RebalanceCb func(*<a href="#Consumer">Consumer</a>, <a href="#Event">Event</a>) <a href="http://golang.org/pkg/builtin/#error">error</a></pre>
<p>
RebalanceCb provides a per-Subscribe*() rebalance event callback.
The passed Event will be either AssignedPartitions or RevokedPartitions
</p>
<h2 id="RevokedPartitions">
type
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/event.go?s=1870:1932#L58">
RevokedPartitions
</a>
</h2>
<pre>type RevokedPartitions struct {
Partitions []<a href="#TopicPartition">TopicPartition</a>
}</pre>
<p>
RevokedPartitions consumer group rebalance event: revoked partition set
</p>
<h3 id="RevokedPartitions.String">
func (RevokedPartitions)
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/event.go?s=1934:1976#L62">
String
</a>
</h3>
<pre>func (e <a href="#RevokedPartitions">RevokedPartitions</a>) String() <a href="http://golang.org/pkg/builtin/#string">string</a></pre>
<h2 id="TimestampType">
type
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/message.go?s=1671:1693#L51">
TimestampType
</a>
</h2>
<pre>type TimestampType <a href="http://golang.org/pkg/builtin/#int">int</a></pre>
<p>
TimestampType is a the Message timestamp type or source
</p>
<h3 id="TimestampType.String">
func (TimestampType)
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/message.go?s=2187:2225#L62">
String
</a>
</h3>
<pre>func (t <a href="#TimestampType">TimestampType</a>) String() <a href="http://golang.org/pkg/builtin/#string">string</a></pre>
<h2 id="TopicMetadata">
type
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/metadata.go?s=1550:1648#L53">
TopicMetadata
</a>
</h2>
<pre>type TopicMetadata struct {
Topic <a href="http://golang.org/pkg/builtin/#string">string</a>
Partitions []<a href="#PartitionMetadata">PartitionMetadata</a>
Error <a href="#Error">Error</a>
}</pre>
<p>
TopicMetadata contains per-topic metadata
</p>
<h2 id="TopicPartition">
type
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/kafka.go?s=8377:8478#L236">
TopicPartition
</a>
</h2>
<pre>type TopicPartition struct {
Topic *<a href="http://golang.org/pkg/builtin/#string">string</a>
Partition <a href="http://golang.org/pkg/builtin/#int32">int32</a>
Offset <a href="#Offset">Offset</a>
Error <a href="http://golang.org/pkg/builtin/#error">error</a>
}</pre>
<p>
TopicPartition is a generic placeholder for a Topic+Partition and optionally Offset.
</p>
<h3 id="TopicPartition.String">
func (TopicPartition)
<a href="http://golang.org/src/github.com/confluentinc/confluent-kafka-go/kafka/kafka.go?s=8480:8519#L243">
String
</a>
</h3>
<pre>func (p <a href="#TopicPartition">TopicPartition</a>) String() <a href="http://golang.org/pkg/builtin/#string">string</a></pre>
<div id="footer">
Build version go1.6.
<br>
Except as
<a href="https://developers.google.com/site-policies#restrictions">
noted
</a>
,
the content of this page is licensed under the
Creative Commons Attribution 3.0 License,
and code is licensed under a
<a href="http://golang.org/LICENSE">
BSD license
</a>
.
<br>
<a href="http://golang.org/doc/tos.html">
Terms of Service
</a>
|
<a href="http://www.google.com/intl/en/policies/privacy/">
Privacy Policy
</a>
</br>
</br>
</div>
</div>
<!-- .container -->
</div>
<!-- #page -->
<!-- TODO(adonovan): load these from <head> using "defer" attribute? -->
<script src="http://golang.org/lib/godoc/jquery.js" type="text/javascript">
</script>
<script src="http://golang.org/lib/godoc/jquery.treeview.js" type="text/javascript">
</script>
<script src="http://golang.org/lib/godoc/jquery.treeview.edit.js" type="text/javascript">
</script>
<script src="http://golang.org/lib/godoc/godocs.js" type="text/javascript">
</script>
</body>
</html>