WIP [VOL-2811] - Incorporate preliminary onu-adapter-go code into opencord repo
- reason "discovery-mibsync-complete" reached (via full MibUpload only, received data won't be stored yet)
- first review comments of patchset #4 considered
(please have a look into our inline-comments in Gerrit to know more about the current state)
- no refactoring done yet
Change-Id: Iac47817f8ce4bd28dd8132f530b0570d57ae99b8
Signed-off-by: Holger Hildebrandt <holger.hildebrandt@adtran.com>
@@ -0,0 +1,4 @@
@@ -0,0 +1,18 @@
+sudo: false
+language: go
+ - 1.10.x
+ - 1.9.x
+ - go get -u github.com/golang/dep/cmd/dep
+ - dep ensure
+ - make default test-race
+ apt:
+ packages:
+ - oracle-java8-set-default
@@ -0,0 +1,151 @@
+# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'.
+ name = "github.com/Shopify/sarama"
+ packages = ["."]
+ revision = "35324cf48e33d8260e1c7c18854465a904ade249"
+ version = "v1.17.0"
+ name = "github.com/davecgh/go-spew"
+ packages = ["spew"]
+ revision = "346938d642f2ec3594ed81d874461961cd0faa76"
+ version = "v1.1.0"
+ name = "github.com/eapache/go-resiliency"
+ packages = ["breaker"]
+ revision = "ea41b0fad31007accc7f806884dcdf3da98b79ce"
+ version = "v1.1.0"
+ branch = "master"
+ name = "github.com/eapache/go-xerial-snappy"
+ packages = ["."]
+ revision = "bb955e01b9346ac19dc29eb16586c90ded99a98c"
+ name = "github.com/eapache/queue"
+ packages = ["."]
+ revision = "44cc805cf13205b55f69e14bcb69867d1ae92f98"
+ version = "v1.1.0"
+ branch = "master"
+ name = "github.com/golang/snappy"
+ packages = ["."]
+ revision = "2e65f85255dbc3072edf28d6b5b8efc472979f5a"
+ name = "github.com/onsi/ginkgo"
+ packages = [
+ ".",
+ "config",
+ "extensions/table",
+ "internal/codelocation",
+ "internal/containernode",
+ "internal/failer",
+ "internal/leafnodes",
+ "internal/remote",
+ "internal/spec",
+ "internal/spec_iterator",
+ "internal/specrunner",
+ "internal/suite",
+ "internal/testingtproxy",
+ "internal/writer",
+ "reporters",
+ "reporters/stenographer",
+ "reporters/stenographer/support/go-colorable",
+ "reporters/stenographer/support/go-isatty",
+ "types"
+ ]
+ revision = "fa5fabab2a1bfbd924faf4c067d07ae414e2aedf"
+ version = "v1.5.0"
+ name = "github.com/onsi/gomega"
+ packages = [
+ ".",
+ "format",
+ "internal/assertion",
+ "internal/asyncassertion",
+ "internal/oraclematcher",
+ "internal/testingtsupport",
+ "matchers",
+ "matchers/support/goraph/bipartitegraph",
+ "matchers/support/goraph/edge",
+ "matchers/support/goraph/node",
+ "matchers/support/goraph/util",
+ "types"
+ ]
+ revision = "62bff4df71bdbc266561a0caee19f0594b17c240"
+ version = "v1.4.0"
+ name = "github.com/pierrec/lz4"
+ packages = [
+ ".",
+ "internal/xxh32"
+ ]
+ revision = "6b9367c9ff401dbc54fabce3fb8d972e799b702d"
+ version = "v2.0.2"
+ branch = "master"
+ name = "github.com/rcrowley/go-metrics"
+ packages = ["."]
+ revision = "e2704e165165ec55d062f5919b4b29494e9fa790"
+ branch = "master"
+ name = "golang.org/x/net"
+ packages = [
+ "html",
+ "html/atom",
+ "html/charset"
+ ]
+ revision = "afe8f62b1d6bbd81f31868121a50b06d8188e1f9"
+ branch = "master"
+ name = "golang.org/x/sys"
+ packages = ["unix"]
+ revision = "63fc586f45fe72d95d5240a5d5eb95e6503907d3"
+ name = "golang.org/x/text"
+ packages = [
+ "encoding",
+ "encoding/charmap",
+ "encoding/htmlindex",
+ "encoding/internal",
+ "encoding/internal/identifier",
+ "encoding/japanese",
+ "encoding/korean",
+ "encoding/simplifiedchinese",
+ "encoding/traditionalchinese",
+ "encoding/unicode",
+ "internal/gen",
+ "internal/tag",
+ "internal/utf8internal",
+ "language",
+ "runes",
+ "transform",
+ "unicode/cldr"
+ ]
+ revision = "f21a4dfb5e38f5895301dc265a8def02365cc3d0"
+ version = "v0.3.0"
+ name = "gopkg.in/yaml.v2"
+ packages = ["."]
+ revision = "5420a8b6744d3b0345ab293f6fcba19c978f1183"
+ version = "v2.2.1"
+ analyzer-name = "dep"
+ analyzer-version = 1
+ inputs-digest = "2fa33a2d1ae87e0905ef09332bb4b3fda29179f6bcd48fd3b94070774b9e458b"
+ solver-name = "gps-cdcl"
+ solver-version = 1
@@ -0,0 +1,26 @@
+# Gopkg.toml example
+# Refer to https://github.com/golang/dep/blob/master/docs/Gopkg.toml.md
+# for detailed Gopkg.toml documentation.
+# required = ["github.com/user/thing/cmd/thing"]
+# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"]
+# [[constraint]]
+# name = "github.com/user/project"
+# version = "1.0.0"
+# [[constraint]]
+# name = "github.com/user/project2"
+# branch = "dev"
+# source = "github.com/myfork/project2"
+# [[override]]
+# name = "github.com/x/y"
+# version = "2.4.0"
+ name = "github.com/Shopify/sarama"
+ version = "^1.14.0"
@@ -0,0 +1,22 @@
+(The MIT License)
+Copyright (c) 2017 Black Square Media Ltd
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+'Software'), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
@@ -0,0 +1,35 @@
+KAFKA_SRC= https://archive.apache.org/dist/kafka/$(KAFKA_VERSION)/$(KAFKA_DIR).tgz
+KAFKA_ROOT= testdata/$(KAFKA_DIR)
+PKG=$(shell go list ./... | grep -v vendor)
+default: vet test
+ go vet $(PKG)
+test: testdeps
+ KAFKA_DIR=$(KAFKA_DIR) go test $(PKG) -ginkgo.slowSpecThreshold=60
+test-verbose: testdeps
+ KAFKA_DIR=$(KAFKA_DIR) go test $(PKG) -ginkgo.slowSpecThreshold=60 -v
+test-race: testdeps
+ KAFKA_DIR=$(KAFKA_DIR) go test $(PKG) -ginkgo.slowSpecThreshold=60 -v -race
+testdeps: $(KAFKA_ROOT)
+doc: README.md
+.PHONY: test testdeps vet doc
+# ---------------------------------------------------------------------
+ @mkdir -p $(dir $@)
+ cd $(dir $@) && curl -sSL $(KAFKA_SRC) | tar xz
+README.md: README.md.tpl $(wildcard *.go)
+ becca -package $(subst $(GOPATH)/src/,,$(PWD))
@@ -0,0 +1,151 @@
+# Sarama Cluster
+Cluster extensions for [Sarama](https://github.com/Shopify/sarama), the Go client library for Apache Kafka 0.9 (and later).
+## Documentation
+Documentation and example are available via godoc at http://godoc.org/github.com/bsm/sarama-cluster
+## Examples
+Consumers have two modes of operation. In the default multiplexed mode messages (and errors) of multiple
+topics and partitions are all passed to the single channel:
+package main
+import (
+ "fmt"
+ "log"
+ "os"
+ "os/signal"
+ cluster "github.com/bsm/sarama-cluster"
+func main() {
+ // init (custom) config, enable errors and notifications
+ config := cluster.NewConfig()
+ config.Consumer.Return.Errors = true
+ config.Group.Return.Notifications = true
+ // init consumer
+ brokers := []string{""}
+ topics := []string{"my_topic", "other_topic"}
+ consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
+ if err != nil {
+ panic(err)
+ }
+ defer consumer.Close()
+ // trap SIGINT to trigger a shutdown.
+ signals := make(chan os.Signal, 1)
+ signal.Notify(signals, os.Interrupt)
+ // consume errors
+ go func() {
+ for err := range consumer.Errors() {
+ log.Printf("Error: %s\n", err.Error())
+ }
+ }()
+ // consume notifications
+ go func() {
+ for ntf := range consumer.Notifications() {
+ log.Printf("Rebalanced: %+v\n", ntf)
+ }
+ }()
+ // consume messages, watch signals
+ for {
+ select {
+ case msg, ok := <-consumer.Messages():
+ if ok {
+ fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
+ consumer.MarkOffset(msg, "") // mark message as processed
+ }
+ case <-signals:
+ return
+ }
+ }
+Users who require access to individual partitions can use the partitioned mode which exposes access to partition-level
+package main
+import (
+ "fmt"
+ "log"
+ "os"
+ "os/signal"
+ cluster "github.com/bsm/sarama-cluster"
+func main() {
+ // init (custom) config, set mode to ConsumerModePartitions
+ config := cluster.NewConfig()
+ config.Group.Mode = cluster.ConsumerModePartitions
+ // init consumer
+ brokers := []string{""}
+ topics := []string{"my_topic", "other_topic"}
+ consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
+ if err != nil {
+ panic(err)
+ }
+ defer consumer.Close()
+ // trap SIGINT to trigger a shutdown.
+ signals := make(chan os.Signal, 1)
+ signal.Notify(signals, os.Interrupt)
+ // consume partitions
+ for {
+ select {
+ case part, ok := <-consumer.Partitions():
+ if !ok {
+ return
+ }
+ // start a separate goroutine to consume messages
+ go func(pc cluster.PartitionConsumer) {
+ for msg := range pc.Messages() {
+ fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
+ consumer.MarkOffset(msg, "") // mark message as processed
+ }
+ }(part)
+ case <-signals:
+ return
+ }
+ }
+## Running tests
+You need to install Ginkgo & Gomega to run tests. Please see
+http://onsi.github.io/ginkgo for more details.
+To run tests, call:
+ $ make test
+## Troubleshooting
+### Consumer not receiving any messages?
+By default, sarama's `Config.Consumer.Offsets.Initial` is set to `sarama.OffsetNewest`. This means that in the event that a brand new consumer is created, and it has never committed any offsets to kafka, it will only receive messages starting from the message after the current one that was written.
+If you wish to receive all messages (from the start of all messages in the topic) in the event that a consumer does not have any offsets committed to kafka, you need to set `Config.Consumer.Offsets.Initial` to `sarama.OffsetOldest`.
@@ -0,0 +1,67 @@
+# Sarama Cluster
+Cluster extensions for [Sarama](https://github.com/Shopify/sarama), the Go client library for Apache Kafka 0.9 (and later).
+## Documentation
+Documentation and example are available via godoc at http://godoc.org/github.com/bsm/sarama-cluster
+## Examples
+Consumers have two modes of operation. In the default multiplexed mode messages (and errors) of multiple
+topics and partitions are all passed to the single channel:
+package main
+import (
+ "fmt"
+ "log"
+ "os"
+ "os/signal"
+ cluster "github.com/bsm/sarama-cluster"
+func main() {{ "ExampleConsumer" | code }}
+Users who require access to individual partitions can use the partitioned mode which exposes access to partition-level
+package main
+import (
+ "fmt"
+ "log"
+ "os"
+ "os/signal"
+ cluster "github.com/bsm/sarama-cluster"
+func main() {{ "ExampleConsumer_Partitions" | code }}
+## Running tests
+You need to install Ginkgo & Gomega to run tests. Please see
+http://onsi.github.io/ginkgo for more details.
+To run tests, call:
+ $ make test
+## Troubleshooting
+### Consumer not receiving any messages?
+By default, sarama's `Config.Consumer.Offsets.Initial` is set to `sarama.OffsetNewest`. This means that in the event that a brand new consumer is created, and it has never committed any offsets to kafka, it will only receive messages starting from the message after the current one that was written.
+If you wish to receive all messages (from the start of all messages in the topic) in the event that a consumer does not have any offsets committed to kafka, you need to set `Config.Consumer.Offsets.Initial` to `sarama.OffsetOldest`.
@@ -0,0 +1,170 @@
+package cluster
+import (
+ "math"
+ "sort"
+ "github.com/Shopify/sarama"
+// NotificationType defines the type of notification
+type NotificationType uint8
+// String describes the notification type
+func (t NotificationType) String() string {
+ switch t {
+ case RebalanceStart:
+ return "rebalance start"
+ case RebalanceOK:
+ return "rebalance OK"
+ case RebalanceError:
+ return "rebalance error"
+ }
+ return "unknown"
+const (
+ UnknownNotification NotificationType = iota
+ RebalanceStart
+ RebalanceOK
+ RebalanceError
+// Notification are state events emitted by the consumers on rebalance
+type Notification struct {
+ // Type exposes the notification type
+ Type NotificationType
+ // Claimed contains topic/partitions that were claimed by this rebalance cycle
+ Claimed map[string][]int32
+ // Released contains topic/partitions that were released as part of this rebalance cycle
+ Released map[string][]int32
+ // Current are topic/partitions that are currently claimed to the consumer
+ Current map[string][]int32
+func newNotification(current map[string][]int32) *Notification {
+ return &Notification{
+ Type: RebalanceStart,
+ Current: current,
+ }
+func (n *Notification) success(current map[string][]int32) *Notification {
+ o := &Notification{
+ Type: RebalanceOK,
+ Claimed: make(map[string][]int32),
+ Released: make(map[string][]int32),
+ Current: current,
+ }
+ for topic, partitions := range current {
+ o.Claimed[topic] = int32Slice(partitions).Diff(int32Slice(n.Current[topic]))
+ }
+ for topic, partitions := range n.Current {
+ o.Released[topic] = int32Slice(partitions).Diff(int32Slice(current[topic]))
+ }
+ return o
+// --------------------------------------------------------------------
+type topicInfo struct {
+ Partitions []int32
+ MemberIDs []string
+func (info topicInfo) Perform(s Strategy) map[string][]int32 {
+ if s == StrategyRoundRobin {
+ return info.RoundRobin()
+ }
+ return info.Ranges()
+func (info topicInfo) Ranges() map[string][]int32 {
+ sort.Strings(info.MemberIDs)
+ mlen := len(info.MemberIDs)
+ plen := len(info.Partitions)
+ res := make(map[string][]int32, mlen)
+ for pos, memberID := range info.MemberIDs {
+ n, i := float64(plen)/float64(mlen), float64(pos)
+ min := int(math.Floor(i*n + 0.5))
+ max := int(math.Floor((i+1)*n + 0.5))
+ sub := info.Partitions[min:max]
+ if len(sub) > 0 {
+ res[memberID] = sub
+ }
+ }
+ return res
+func (info topicInfo) RoundRobin() map[string][]int32 {
+ sort.Strings(info.MemberIDs)
+ mlen := len(info.MemberIDs)
+ res := make(map[string][]int32, mlen)
+ for i, pnum := range info.Partitions {
+ memberID := info.MemberIDs[i%mlen]
+ res[memberID] = append(res[memberID], pnum)
+ }
+ return res
+// --------------------------------------------------------------------
+type balancer struct {
+ client sarama.Client
+ topics map[string]topicInfo
+func newBalancerFromMeta(client sarama.Client, members map[string]sarama.ConsumerGroupMemberMetadata) (*balancer, error) {
+ balancer := newBalancer(client)
+ for memberID, meta := range members {
+ for _, topic := range meta.Topics {
+ if err := balancer.Topic(topic, memberID); err != nil {
+ return nil, err
+ }
+ }
+ }
+ return balancer, nil
+func newBalancer(client sarama.Client) *balancer {
+ return &balancer{
+ client: client,
+ topics: make(map[string]topicInfo),
+ }
+func (r *balancer) Topic(name string, memberID string) error {
+ topic, ok := r.topics[name]
+ if !ok {
+ nums, err := r.client.Partitions(name)
+ if err != nil {
+ return err
+ }
+ topic = topicInfo{
+ Partitions: nums,
+ MemberIDs: make([]string, 0, 1),
+ }
+ }
+ topic.MemberIDs = append(topic.MemberIDs, memberID)
+ r.topics[name] = topic
+ return nil
+func (r *balancer) Perform(s Strategy) map[string]map[string][]int32 {
+ res := make(map[string]map[string][]int32, 1)
+ for topic, info := range r.topics {
+ for memberID, partitions := range info.Perform(s) {
+ if _, ok := res[memberID]; !ok {
+ res[memberID] = make(map[string][]int32, 1)
+ }
+ res[memberID][topic] = partitions
+ }
+ }
+ return res
@@ -0,0 +1,50 @@
+package cluster
+import (
+ "errors"
+ "sync/atomic"
+ "github.com/Shopify/sarama"
+var errClientInUse = errors.New("cluster: client is already used by another consumer")
+// Client is a group client
+type Client struct {
+ sarama.Client
+ config Config
+ inUse uint32
+// NewClient creates a new client instance
+func NewClient(addrs []string, config *Config) (*Client, error) {
+ if config == nil {
+ config = NewConfig()
+ }
+ if err := config.Validate(); err != nil {
+ return nil, err
+ }
+ client, err := sarama.NewClient(addrs, &config.Config)
+ if err != nil {
+ return nil, err
+ }
+ return &Client{Client: client, config: *config}, nil
+// ClusterConfig returns the cluster configuration.
+func (c *Client) ClusterConfig() *Config {
+ cfg := c.config
+ return &cfg
+func (c *Client) claim() bool {
+ return atomic.CompareAndSwapUint32(&c.inUse, 0, 1)
+func (c *Client) release() {
+ atomic.CompareAndSwapUint32(&c.inUse, 1, 0)
@@ -0,0 +1,25 @@
+package cluster
+// Strategy for partition to consumer assignement
+type Strategy string
+const (
+ // StrategyRange is the default and assigns partition ranges to consumers.
+ // Example with six partitions and two consumers:
+ // C1: [0, 1, 2]
+ // C2: [3, 4, 5]
+ StrategyRange Strategy = "range"
+ // StrategyRoundRobin assigns partitions by alternating over consumers.
+ // Example with six partitions and two consumers:
+ // C1: [0, 2, 4]
+ // C2: [1, 3, 5]
+ StrategyRoundRobin Strategy = "roundrobin"
+// Error instances are wrappers for internal errors with a context and
+// may be returned through the consumer's Errors() channel
+type Error struct {
+ Ctx string
+ error
@@ -0,0 +1,146 @@
+package cluster
+import (
+ "regexp"
+ "time"
+ "github.com/Shopify/sarama"
+var minVersion = sarama.V0_9_0_0
+type ConsumerMode uint8
+const (
+ ConsumerModeMultiplex ConsumerMode = iota
+ ConsumerModePartitions
+// Config extends sarama.Config with Group specific namespace
+type Config struct {
+ sarama.Config
+ // Group is the namespace for group management properties
+ Group struct {
+ // The strategy to use for the allocation of partitions to consumers (defaults to StrategyRange)
+ PartitionStrategy Strategy
+ // By default, messages and errors from the subscribed topics and partitions are all multiplexed and
+ // made available through the consumer's Messages() and Errors() channels.
+ //
+ // Users who require low-level access can enable ConsumerModePartitions where individual partitions
+ // are exposed on the Partitions() channel. Messages and errors must then be consumed on the partitions
+ // themselves.
+ Mode ConsumerMode
+ Offsets struct {
+ Retry struct {
+ // The numer retries when committing offsets (defaults to 3).
+ Max int
+ }
+ Synchronization struct {
+ // The duration allowed for other clients to commit their offsets before resumption in this client, e.g. during a rebalance
+ // NewConfig sets this to the Consumer.MaxProcessingTime duration of the Sarama configuration
+ DwellTime time.Duration
+ }
+ }
+ Session struct {
+ // The allowed session timeout for registered consumers (defaults to 30s).
+ // Must be within the allowed server range.
+ Timeout time.Duration
+ }
+ Heartbeat struct {
+ // Interval between each heartbeat (defaults to 3s). It should be no more
+ // than 1/3rd of the Group.Session.Timout setting
+ Interval time.Duration
+ }
+ // Return specifies which group channels will be populated. If they are set to true,
+ // you must read from the respective channels to prevent deadlock.
+ Return struct {
+ // If enabled, rebalance notification will be returned on the
+ // Notifications channel (default disabled).
+ Notifications bool
+ }
+ Topics struct {
+ // An additional whitelist of topics to subscribe to.
+ Whitelist *regexp.Regexp
+ // An additional blacklist of topics to avoid. If set, this will precede over
+ // the Whitelist setting.
+ Blacklist *regexp.Regexp
+ }
+ Member struct {
+ // Custom metadata to include when joining the group. The user data for all joined members
+ // can be retrieved by sending a DescribeGroupRequest to the broker that is the
+ // coordinator for the group.
+ UserData []byte
+ }
+ }
+// NewConfig returns a new configuration instance with sane defaults.
+func NewConfig() *Config {
+ c := &Config{
+ Config: *sarama.NewConfig(),
+ }
+ c.Group.PartitionStrategy = StrategyRange
+ c.Group.Offsets.Retry.Max = 3
+ c.Group.Offsets.Synchronization.DwellTime = c.Consumer.MaxProcessingTime
+ c.Group.Session.Timeout = 30 * time.Second
+ c.Group.Heartbeat.Interval = 3 * time.Second
+ c.Config.Version = minVersion
+ return c
+// Validate checks a Config instance. It will return a
+// sarama.ConfigurationError if the specified values don't make sense.
+func (c *Config) Validate() error {
+ if c.Group.Heartbeat.Interval%time.Millisecond != 0 {
+ sarama.Logger.Println("Group.Heartbeat.Interval only supports millisecond precision; nanoseconds will be truncated.")
+ }
+ if c.Group.Session.Timeout%time.Millisecond != 0 {
+ sarama.Logger.Println("Group.Session.Timeout only supports millisecond precision; nanoseconds will be truncated.")
+ }
+ if c.Group.PartitionStrategy != StrategyRange && c.Group.PartitionStrategy != StrategyRoundRobin {
+ sarama.Logger.Println("Group.PartitionStrategy is not supported; range will be assumed.")
+ }
+ if !c.Version.IsAtLeast(minVersion) {
+ sarama.Logger.Println("Version is not supported; 0.9. will be assumed.")
+ c.Version = minVersion
+ }
+ if err := c.Config.Validate(); err != nil {
+ return err
+ }
+ // validate the Group values
+ switch {
+ case c.Group.Offsets.Retry.Max < 0:
+ return sarama.ConfigurationError("Group.Offsets.Retry.Max must be >= 0")
+ case c.Group.Offsets.Synchronization.DwellTime <= 0:
+ return sarama.ConfigurationError("Group.Offsets.Synchronization.DwellTime must be > 0")
+ case c.Group.Offsets.Synchronization.DwellTime > 10*time.Minute:
+ return sarama.ConfigurationError("Group.Offsets.Synchronization.DwellTime must be <= 10m")
+ case c.Group.Heartbeat.Interval <= 0:
+ return sarama.ConfigurationError("Group.Heartbeat.Interval must be > 0")
+ case c.Group.Session.Timeout <= 0:
+ return sarama.ConfigurationError("Group.Session.Timeout must be > 0")
+ case !c.Metadata.Full && c.Group.Topics.Whitelist != nil:
+ return sarama.ConfigurationError("Metadata.Full must be enabled when Group.Topics.Whitelist is used")
+ case !c.Metadata.Full && c.Group.Topics.Blacklist != nil:
+ return sarama.ConfigurationError("Metadata.Full must be enabled when Group.Topics.Blacklist is used")
+ }
+ // ensure offset is correct
+ switch c.Consumer.Offsets.Initial {
+ case sarama.OffsetOldest, sarama.OffsetNewest:
+ default:
+ return sarama.ConfigurationError("Consumer.Offsets.Initial must be either OffsetOldest or OffsetNewest")
+ }
+ return nil
@@ -0,0 +1,919 @@
+package cluster
+import (
+ "sort"
+ "sync"
+ "sync/atomic"
+ "time"
+ "github.com/Shopify/sarama"
+// Consumer is a cluster group consumer
+type Consumer struct {
+ client *Client
+ ownClient bool
+ consumer sarama.Consumer
+ subs *partitionMap
+ consumerID string
+ groupID string
+ memberID string
+ generationID int32
+ membershipMu sync.RWMutex
+ coreTopics []string
+ extraTopics []string
+ dying, dead chan none
+ closeOnce sync.Once
+ consuming int32
+ messages chan *sarama.ConsumerMessage
+ errors chan error
+ partitions chan PartitionConsumer
+ notifications chan *Notification
+ commitMu sync.Mutex
+// NewConsumer initializes a new consumer
+func NewConsumer(addrs []string, groupID string, topics []string, config *Config) (*Consumer, error) {
+ client, err := NewClient(addrs, config)
+ if err != nil {
+ return nil, err
+ }
+ consumer, err := NewConsumerFromClient(client, groupID, topics)
+ if err != nil {
+ return nil, err
+ }
+ consumer.ownClient = true
+ return consumer, nil
+// NewConsumerFromClient initializes a new consumer from an existing client.
+// Please note that clients cannot be shared between consumers (due to Kafka internals),
+// they can only be re-used which requires the user to call Close() on the first consumer
+// before using this method again to initialize another one. Attempts to use a client with
+// more than one consumer at a time will return errors.
+func NewConsumerFromClient(client *Client, groupID string, topics []string) (*Consumer, error) {
+ if !client.claim() {
+ return nil, errClientInUse
+ }
+ consumer, err := sarama.NewConsumerFromClient(client.Client)
+ if err != nil {
+ client.release()
+ return nil, err
+ }
+ sort.Strings(topics)
+ c := &Consumer{
+ client: client,
+ consumer: consumer,
+ subs: newPartitionMap(),
+ groupID: groupID,
+ coreTopics: topics,
+ dying: make(chan none),
+ dead: make(chan none),
+ messages: make(chan *sarama.ConsumerMessage),
+ errors: make(chan error, client.config.ChannelBufferSize),
+ partitions: make(chan PartitionConsumer, 1),
+ notifications: make(chan *Notification),
+ }
+ if err := c.client.RefreshCoordinator(groupID); err != nil {
+ client.release()
+ return nil, err
+ }
+ go c.mainLoop()
+ return c, nil
+// Messages returns the read channel for the messages that are returned by
+// the broker.
+// This channel will only return if Config.Group.Mode option is set to
+// ConsumerModeMultiplex (default).
+func (c *Consumer) Messages() <-chan *sarama.ConsumerMessage { return c.messages }
+// Partitions returns the read channels for individual partitions of this broker.
+// This will channel will only return if Config.Group.Mode option is set to
+// ConsumerModePartitions.
+// The Partitions() channel must be listened to for the life of this consumer;
+// when a rebalance happens old partitions will be closed (naturally come to
+// completion) and new ones will be emitted. The returned channel will only close
+// when the consumer is completely shut down.
+func (c *Consumer) Partitions() <-chan PartitionConsumer { return c.partitions }
+// Errors returns a read channel of errors that occur during offset management, if
+// enabled. By default, errors are logged and not returned over this channel. If
+// you want to implement any custom error handling, set your config's
+// Consumer.Return.Errors setting to true, and read from this channel.
+func (c *Consumer) Errors() <-chan error { return c.errors }
+// Notifications returns a channel of Notifications that occur during consumer
+// rebalancing. Notifications will only be emitted over this channel, if your config's
+// Group.Return.Notifications setting to true.
+func (c *Consumer) Notifications() <-chan *Notification { return c.notifications }
+// HighWaterMarks returns the current high water marks for each topic and partition
+// Consistency between partitions is not guaranteed since high water marks are updated separately.
+func (c *Consumer) HighWaterMarks() map[string]map[int32]int64 { return c.consumer.HighWaterMarks() }
+// MarkOffset marks the provided message as processed, alongside a metadata string
+// that represents the state of the partition consumer at that point in time. The
+// metadata string can be used by another consumer to restore that state, so it
+// can resume consumption.
+// Note: calling MarkOffset does not necessarily commit the offset to the backend
+// store immediately for efficiency reasons, and it may never be committed if
+// your application crashes. This means that you may end up processing the same
+// message twice, and your processing should ideally be idempotent.
+func (c *Consumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string) {
+ if sub := c.subs.Fetch(msg.Topic, msg.Partition); sub != nil {
+ sub.MarkOffset(msg.Offset, metadata)
+ }
+// MarkPartitionOffset marks an offset of the provided topic/partition as processed.
+// See MarkOffset for additional explanation.
+func (c *Consumer) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) {
+ if sub := c.subs.Fetch(topic, partition); sub != nil {
+ sub.MarkOffset(offset, metadata)
+ }
+// MarkOffsets marks stashed offsets as processed.
+// See MarkOffset for additional explanation.
+func (c *Consumer) MarkOffsets(s *OffsetStash) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ for tp, info := range s.offsets {
+ if sub := c.subs.Fetch(tp.Topic, tp.Partition); sub != nil {
+ sub.MarkOffset(info.Offset, info.Metadata)
+ }
+ delete(s.offsets, tp)
+ }
+// ResetOffsets marks the provided message as processed, alongside a metadata string
+// that represents the state of the partition consumer at that point in time. The
+// metadata string can be used by another consumer to restore that state, so it
+// can resume consumption.
+// Difference between ResetOffset and MarkOffset is that it allows to rewind to an earlier offset
+func (c *Consumer) ResetOffset(msg *sarama.ConsumerMessage, metadata string) {
+ if sub := c.subs.Fetch(msg.Topic, msg.Partition); sub != nil {
+ sub.ResetOffset(msg.Offset, metadata)
+ }
+// ResetPartitionOffset marks an offset of the provided topic/partition as processed.
+// See ResetOffset for additional explanation.
+func (c *Consumer) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) {
+ sub := c.subs.Fetch(topic, partition)
+ if sub != nil {
+ sub.ResetOffset(offset, metadata)
+ }
+// ResetOffsets marks stashed offsets as processed.
+// See ResetOffset for additional explanation.
+func (c *Consumer) ResetOffsets(s *OffsetStash) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ for tp, info := range s.offsets {
+ if sub := c.subs.Fetch(tp.Topic, tp.Partition); sub != nil {
+ sub.ResetOffset(info.Offset, info.Metadata)
+ }
+ delete(s.offsets, tp)
+ }
+// Subscriptions returns the consumed topics and partitions
+func (c *Consumer) Subscriptions() map[string][]int32 {
+ return c.subs.Info()
+// CommitOffsets allows to manually commit previously marked offsets. By default there is no
+// need to call this function as the consumer will commit offsets automatically
+// using the Config.Consumer.Offsets.CommitInterval setting.
+// Please be aware that calling this function during an internal rebalance cycle may return
+// broker errors (e.g. sarama.ErrUnknownMemberId or sarama.ErrIllegalGeneration).
+func (c *Consumer) CommitOffsets() error {
+ c.commitMu.Lock()
+ defer c.commitMu.Unlock()
+ memberID, generationID := c.membership()
+ req := &sarama.OffsetCommitRequest{
+ Version: 2,
+ ConsumerGroup: c.groupID,
+ ConsumerGroupGeneration: generationID,
+ ConsumerID: memberID,
+ RetentionTime: -1,
+ }
+ if ns := c.client.config.Consumer.Offsets.Retention; ns != 0 {
+ req.RetentionTime = int64(ns / time.Millisecond)
+ }
+ snap := c.subs.Snapshot()
+ dirty := false
+ for tp, state := range snap {
+ if state.Dirty {
+ dirty = true
+ req.AddBlock(tp.Topic, tp.Partition, state.Info.Offset, 0, state.Info.Metadata)
+ }
+ }
+ if !dirty {
+ return nil
+ }
+ broker, err := c.client.Coordinator(c.groupID)
+ if err != nil {
+ c.closeCoordinator(broker, err)
+ return err
+ }
+ resp, err := broker.CommitOffset(req)
+ if err != nil {
+ c.closeCoordinator(broker, err)
+ return err
+ }
+ for topic, errs := range resp.Errors {
+ for partition, kerr := range errs {
+ if kerr != sarama.ErrNoError {
+ err = kerr
+ } else if state, ok := snap[topicPartition{topic, partition}]; ok {
+ if sub := c.subs.Fetch(topic, partition); sub != nil {
+ sub.markCommitted(state.Info.Offset)
+ }
+ }
+ }
+ }
+ return err
+// Close safely closes the consumer and releases all resources
+func (c *Consumer) Close() (err error) {
+ c.closeOnce.Do(func() {
+ close(c.dying)
+ <-c.dead
+ if e := c.release(); e != nil {
+ err = e
+ }
+ if e := c.consumer.Close(); e != nil {
+ err = e
+ }
+ close(c.messages)
+ close(c.errors)
+ if e := c.leaveGroup(); e != nil {
+ err = e
+ }
+ close(c.partitions)
+ close(c.notifications)
+ // drain
+ for range c.messages {
+ }
+ for range c.errors {
+ }
+ for p := range c.partitions {
+ _ = p.Close()
+ }
+ for range c.notifications {
+ }
+ c.client.release()
+ if c.ownClient {
+ if e := c.client.Close(); e != nil {
+ err = e
+ }
+ }
+ })
+ return
+func (c *Consumer) mainLoop() {
+ defer close(c.dead)
+ defer atomic.StoreInt32(&c.consuming, 0)
+ for {
+ atomic.StoreInt32(&c.consuming, 0)
+ // Check if close was requested
+ select {
+ case <-c.dying:
+ return
+ default:
+ }
+ // Start next consume cycle
+ c.nextTick()
+ }
+func (c *Consumer) nextTick() {
+ // Remember previous subscriptions
+ var notification *Notification
+ if c.client.config.Group.Return.Notifications {
+ notification = newNotification(c.subs.Info())
+ }
+ // Refresh coordinator
+ if err := c.refreshCoordinator(); err != nil {
+ c.rebalanceError(err, nil)
+ return
+ }
+ // Release subscriptions
+ if err := c.release(); err != nil {
+ c.rebalanceError(err, nil)
+ return
+ }
+ // Issue rebalance start notification
+ if c.client.config.Group.Return.Notifications {
+ c.handleNotification(notification)
+ }
+ // Rebalance, fetch new subscriptions
+ subs, err := c.rebalance()
+ if err != nil {
+ c.rebalanceError(err, notification)
+ return
+ }
+ // Coordinate loops, make sure everything is
+ // stopped on exit
+ tomb := newLoopTomb()
+ defer tomb.Close()
+ // Start the heartbeat
+ tomb.Go(c.hbLoop)
+ // Subscribe to topic/partitions
+ if err := c.subscribe(tomb, subs); err != nil {
+ c.rebalanceError(err, notification)
+ return
+ }
+ // Update/issue notification with new claims
+ if c.client.config.Group.Return.Notifications {
+ notification = notification.success(subs)
+ c.handleNotification(notification)
+ }
+ // Start topic watcher loop
+ tomb.Go(c.twLoop)
+ // Start consuming and committing offsets
+ tomb.Go(c.cmLoop)
+ atomic.StoreInt32(&c.consuming, 1)
+ // Wait for signals
+ select {
+ case <-tomb.Dying():
+ case <-c.dying:
+ }
+// heartbeat loop, triggered by the mainLoop
+func (c *Consumer) hbLoop(stopped <-chan none) {
+ ticker := time.NewTicker(c.client.config.Group.Heartbeat.Interval)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ticker.C:
+ switch err := c.heartbeat(); err {
+ case nil, sarama.ErrNoError:
+ case sarama.ErrNotCoordinatorForConsumer, sarama.ErrRebalanceInProgress:
+ return
+ default:
+ c.handleError(&Error{Ctx: "heartbeat", error: err})
+ return
+ }
+ case <-stopped:
+ return
+ case <-c.dying:
+ return
+ }
+ }
+// topic watcher loop, triggered by the mainLoop
+func (c *Consumer) twLoop(stopped <-chan none) {
+ ticker := time.NewTicker(c.client.config.Metadata.RefreshFrequency / 2)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ticker.C:
+ topics, err := c.client.Topics()
+ if err != nil {
+ c.handleError(&Error{Ctx: "topics", error: err})
+ return
+ }
+ for _, topic := range topics {
+ if !c.isKnownCoreTopic(topic) &&
+ !c.isKnownExtraTopic(topic) &&
+ c.isPotentialExtraTopic(topic) {
+ return
+ }
+ }
+ case <-stopped:
+ return
+ case <-c.dying:
+ return
+ }
+ }
+// commit loop, triggered by the mainLoop
+func (c *Consumer) cmLoop(stopped <-chan none) {
+ ticker := time.NewTicker(c.client.config.Consumer.Offsets.CommitInterval)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ticker.C:
+ if err := c.commitOffsetsWithRetry(c.client.config.Group.Offsets.Retry.Max); err != nil {
+ c.handleError(&Error{Ctx: "commit", error: err})
+ return
+ }
+ case <-stopped:
+ return
+ case <-c.dying:
+ return
+ }
+ }
+func (c *Consumer) rebalanceError(err error, n *Notification) {
+ if n != nil {
+ n.Type = RebalanceError
+ c.handleNotification(n)
+ }
+ switch err {
+ case sarama.ErrRebalanceInProgress:
+ default:
+ c.handleError(&Error{Ctx: "rebalance", error: err})
+ }
+ select {
+ case <-c.dying:
+ case <-time.After(c.client.config.Metadata.Retry.Backoff):
+ }
+func (c *Consumer) handleNotification(n *Notification) {
+ if c.client.config.Group.Return.Notifications {
+ select {
+ case c.notifications <- n:
+ case <-c.dying:
+ return
+ }
+ }
+func (c *Consumer) handleError(e *Error) {
+ if c.client.config.Consumer.Return.Errors {
+ select {
+ case c.errors <- e:
+ case <-c.dying:
+ return
+ }
+ } else {
+ sarama.Logger.Printf("%s error: %s\n", e.Ctx, e.Error())
+ }
+// Releases the consumer and commits offsets, called from rebalance() and Close()
+func (c *Consumer) release() (err error) {
+ // Stop all consumers
+ c.subs.Stop()
+ // Clear subscriptions on exit
+ defer c.subs.Clear()
+ // Wait for messages to be processed
+ timeout := time.NewTimer(c.client.config.Group.Offsets.Synchronization.DwellTime)
+ defer timeout.Stop()
+ select {
+ case <-c.dying:
+ case <-timeout.C:
+ }
+ // Commit offsets, continue on errors
+ if e := c.commitOffsetsWithRetry(c.client.config.Group.Offsets.Retry.Max); e != nil {
+ err = e
+ }
+ return
+// --------------------------------------------------------------------
+// Performs a heartbeat, part of the mainLoop()
+func (c *Consumer) heartbeat() error {
+ broker, err := c.client.Coordinator(c.groupID)
+ if err != nil {
+ c.closeCoordinator(broker, err)
+ return err
+ }
+ memberID, generationID := c.membership()
+ resp, err := broker.Heartbeat(&sarama.HeartbeatRequest{
+ GroupId: c.groupID,
+ MemberId: memberID,
+ GenerationId: generationID,
+ })
+ if err != nil {
+ c.closeCoordinator(broker, err)
+ return err
+ }
+ return resp.Err
+// Performs a rebalance, part of the mainLoop()
+func (c *Consumer) rebalance() (map[string][]int32, error) {
+ memberID, _ := c.membership()
+ sarama.Logger.Printf("cluster/consumer %s rebalance\n", memberID)
+ allTopics, err := c.client.Topics()
+ if err != nil {
+ return nil, err
+ }
+ c.extraTopics = c.selectExtraTopics(allTopics)
+ sort.Strings(c.extraTopics)
+ // Re-join consumer group
+ strategy, err := c.joinGroup()
+ switch {
+ case err == sarama.ErrUnknownMemberId:
+ c.membershipMu.Lock()
+ c.memberID = ""
+ c.membershipMu.Unlock()
+ return nil, err
+ case err != nil:
+ return nil, err
+ }
+ // Sync consumer group state, fetch subscriptions
+ subs, err := c.syncGroup(strategy)
+ switch {
+ case err == sarama.ErrRebalanceInProgress:
+ return nil, err
+ case err != nil:
+ _ = c.leaveGroup()
+ return nil, err
+ }
+ return subs, nil
+// Performs the subscription, part of the mainLoop()
+func (c *Consumer) subscribe(tomb *loopTomb, subs map[string][]int32) error {
+ // fetch offsets
+ offsets, err := c.fetchOffsets(subs)
+ if err != nil {
+ _ = c.leaveGroup()
+ return err
+ }
+ // create consumers in parallel
+ var mu sync.Mutex
+ var wg sync.WaitGroup
+ for topic, partitions := range subs {
+ for _, partition := range partitions {
+ wg.Add(1)
+ info := offsets[topic][partition]
+ go func(topic string, partition int32) {
+ if e := c.createConsumer(tomb, topic, partition, info); e != nil {
+ mu.Lock()
+ err = e
+ mu.Unlock()
+ }
+ wg.Done()
+ }(topic, partition)
+ }
+ }
+ wg.Wait()
+ if err != nil {
+ _ = c.release()
+ _ = c.leaveGroup()
+ }
+ return err
+// --------------------------------------------------------------------
+// Send a request to the broker to join group on rebalance()
+func (c *Consumer) joinGroup() (*balancer, error) {
+ memberID, _ := c.membership()
+ req := &sarama.JoinGroupRequest{
+ GroupId: c.groupID,
+ MemberId: memberID,
+ SessionTimeout: int32(c.client.config.Group.Session.Timeout / time.Millisecond),
+ ProtocolType: "consumer",
+ }
+ meta := &sarama.ConsumerGroupMemberMetadata{
+ Version: 1,
+ Topics: append(c.coreTopics, c.extraTopics...),
+ UserData: c.client.config.Group.Member.UserData,
+ }
+ err := req.AddGroupProtocolMetadata(string(StrategyRange), meta)
+ if err != nil {
+ return nil, err
+ }
+ err = req.AddGroupProtocolMetadata(string(StrategyRoundRobin), meta)
+ if err != nil {
+ return nil, err
+ }
+ broker, err := c.client.Coordinator(c.groupID)
+ if err != nil {
+ c.closeCoordinator(broker, err)
+ return nil, err
+ }
+ resp, err := broker.JoinGroup(req)
+ if err != nil {
+ c.closeCoordinator(broker, err)
+ return nil, err
+ } else if resp.Err != sarama.ErrNoError {
+ c.closeCoordinator(broker, resp.Err)
+ return nil, resp.Err
+ }
+ var strategy *balancer
+ if resp.LeaderId == resp.MemberId {
+ members, err := resp.GetMembers()
+ if err != nil {
+ return nil, err
+ }
+ strategy, err = newBalancerFromMeta(c.client, members)
+ if err != nil {
+ return nil, err
+ }
+ }
+ c.membershipMu.Lock()
+ c.memberID = resp.MemberId
+ c.generationID = resp.GenerationId
+ c.membershipMu.Unlock()
+ return strategy, nil
+// Send a request to the broker to sync the group on rebalance().
+// Returns a list of topics and partitions to consume.
+func (c *Consumer) syncGroup(strategy *balancer) (map[string][]int32, error) {
+ memberID, generationID := c.membership()
+ req := &sarama.SyncGroupRequest{
+ GroupId: c.groupID,
+ MemberId: memberID,
+ GenerationId: generationID,
+ }
+ if strategy != nil {
+ for memberID, topics := range strategy.Perform(c.client.config.Group.PartitionStrategy) {
+ if err := req.AddGroupAssignmentMember(memberID, &sarama.ConsumerGroupMemberAssignment{
+ Topics: topics,
+ }); err != nil {
+ return nil, err
+ }
+ }
+ }
+ broker, err := c.client.Coordinator(c.groupID)
+ if err != nil {
+ c.closeCoordinator(broker, err)
+ return nil, err
+ }
+ resp, err := broker.SyncGroup(req)
+ if err != nil {
+ c.closeCoordinator(broker, err)
+ return nil, err
+ } else if resp.Err != sarama.ErrNoError {
+ c.closeCoordinator(broker, resp.Err)
+ return nil, resp.Err
+ }
+ // Return if there is nothing to subscribe to
+ if len(resp.MemberAssignment) == 0 {
+ return nil, nil
+ }
+ // Get assigned subscriptions
+ members, err := resp.GetMemberAssignment()
+ if err != nil {
+ return nil, err
+ }
+ // Sort partitions, for each topic
+ for topic := range members.Topics {
+ sort.Sort(int32Slice(members.Topics[topic]))
+ }
+ return members.Topics, nil
+// Fetches latest committed offsets for all subscriptions
+func (c *Consumer) fetchOffsets(subs map[string][]int32) (map[string]map[int32]offsetInfo, error) {
+ offsets := make(map[string]map[int32]offsetInfo, len(subs))
+ req := &sarama.OffsetFetchRequest{
+ Version: 1,
+ ConsumerGroup: c.groupID,
+ }
+ for topic, partitions := range subs {
+ offsets[topic] = make(map[int32]offsetInfo, len(partitions))
+ for _, partition := range partitions {
+ offsets[topic][partition] = offsetInfo{Offset: -1}
+ req.AddPartition(topic, partition)
+ }
+ }
+ broker, err := c.client.Coordinator(c.groupID)
+ if err != nil {
+ c.closeCoordinator(broker, err)
+ return nil, err
+ }
+ resp, err := broker.FetchOffset(req)
+ if err != nil {
+ c.closeCoordinator(broker, err)
+ return nil, err
+ }
+ for topic, partitions := range subs {
+ for _, partition := range partitions {
+ block := resp.GetBlock(topic, partition)
+ if block == nil {
+ return nil, sarama.ErrIncompleteResponse
+ }
+ if block.Err == sarama.ErrNoError {
+ offsets[topic][partition] = offsetInfo{Offset: block.Offset, Metadata: block.Metadata}
+ } else {
+ return nil, block.Err
+ }
+ }
+ }
+ return offsets, nil
+// Send a request to the broker to leave the group on failes rebalance() and on Close()
+func (c *Consumer) leaveGroup() error {
+ broker, err := c.client.Coordinator(c.groupID)
+ if err != nil {
+ c.closeCoordinator(broker, err)
+ return err
+ }
+ memberID, _ := c.membership()
+ if _, err = broker.LeaveGroup(&sarama.LeaveGroupRequest{
+ GroupId: c.groupID,
+ MemberId: memberID,
+ }); err != nil {
+ c.closeCoordinator(broker, err)
+ }
+ return err
+// --------------------------------------------------------------------
+func (c *Consumer) createConsumer(tomb *loopTomb, topic string, partition int32, info offsetInfo) error {
+ memberID, _ := c.membership()
+ sarama.Logger.Printf("cluster/consumer %s consume %s/%d from %d\n", memberID, topic, partition, info.NextOffset(c.client.config.Consumer.Offsets.Initial))
+ // Create partitionConsumer
+ pc, err := newPartitionConsumer(c.consumer, topic, partition, info, c.client.config.Consumer.Offsets.Initial)
+ if err != nil {
+ return err
+ }
+ // Store in subscriptions
+ c.subs.Store(topic, partition, pc)
+ // Start partition consumer goroutine
+ tomb.Go(func(stopper <-chan none) {
+ if c.client.config.Group.Mode == ConsumerModePartitions {
+ pc.waitFor(stopper, c.errors)
+ } else {
+ pc.multiplex(stopper, c.messages, c.errors)
+ }
+ })
+ if c.client.config.Group.Mode == ConsumerModePartitions {
+ c.partitions <- pc
+ }
+ return nil
+func (c *Consumer) commitOffsetsWithRetry(retries int) error {
+ err := c.CommitOffsets()
+ if err != nil && retries > 0 {
+ return c.commitOffsetsWithRetry(retries - 1)
+ }
+ return err
+func (c *Consumer) closeCoordinator(broker *sarama.Broker, err error) {
+ if broker != nil {
+ _ = broker.Close()
+ }
+ switch err {
+ case sarama.ErrConsumerCoordinatorNotAvailable, sarama.ErrNotCoordinatorForConsumer:
+ _ = c.client.RefreshCoordinator(c.groupID)
+ }
+func (c *Consumer) selectExtraTopics(allTopics []string) []string {
+ extra := allTopics[:0]
+ for _, topic := range allTopics {
+ if !c.isKnownCoreTopic(topic) && c.isPotentialExtraTopic(topic) {
+ extra = append(extra, topic)
+ }
+ }
+ return extra
+func (c *Consumer) isKnownCoreTopic(topic string) bool {
+ pos := sort.SearchStrings(c.coreTopics, topic)
+ return pos < len(c.coreTopics) && c.coreTopics[pos] == topic
+func (c *Consumer) isKnownExtraTopic(topic string) bool {
+ pos := sort.SearchStrings(c.extraTopics, topic)
+ return pos < len(c.extraTopics) && c.extraTopics[pos] == topic
+func (c *Consumer) isPotentialExtraTopic(topic string) bool {
+ rx := c.client.config.Group.Topics
+ if rx.Blacklist != nil && rx.Blacklist.MatchString(topic) {
+ return false
+ }
+ if rx.Whitelist != nil && rx.Whitelist.MatchString(topic) {
+ return true
+ }
+ return false
+func (c *Consumer) refreshCoordinator() error {
+ if err := c.refreshMetadata(); err != nil {
+ return err
+ }
+ return c.client.RefreshCoordinator(c.groupID)
+func (c *Consumer) refreshMetadata() (err error) {
+ if c.client.config.Metadata.Full {
+ err = c.client.RefreshMetadata()
+ } else {
+ var topics []string
+ if topics, err = c.client.Topics(); err == nil && len(topics) != 0 {
+ err = c.client.RefreshMetadata(topics...)
+ }
+ }
+ // maybe we didn't have authorization to describe all topics
+ switch err {
+ case sarama.ErrTopicAuthorizationFailed:
+ err = c.client.RefreshMetadata(c.coreTopics...)
+ }
+ return
+func (c *Consumer) membership() (memberID string, generationID int32) {
+ c.membershipMu.RLock()
+ memberID, generationID = c.memberID, c.generationID
+ c.membershipMu.RUnlock()
+ return
@@ -0,0 +1,8 @@
+Package cluster provides cluster extensions for Sarama, enabing users
+to consume topics across from multiple, balanced nodes.
+It requires Kafka v0.9+ and follows the steps guide, described in:
+package cluster
@@ -0,0 +1,69 @@
+package cluster
+import (
+ "sync"
+ "github.com/Shopify/sarama"
+// OffsetStash allows to accumulate offsets and
+// mark them as processed in a bulk
+type OffsetStash struct {
+ offsets map[topicPartition]offsetInfo
+ mu sync.Mutex
+// NewOffsetStash inits a blank stash
+func NewOffsetStash() *OffsetStash {
+ return &OffsetStash{offsets: make(map[topicPartition]offsetInfo)}
+// MarkOffset stashes the provided message offset
+func (s *OffsetStash) MarkOffset(msg *sarama.ConsumerMessage, metadata string) {
+ s.MarkPartitionOffset(msg.Topic, msg.Partition, msg.Offset, metadata)
+// MarkPartitionOffset stashes the offset for the provided topic/partition combination
+func (s *OffsetStash) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ key := topicPartition{Topic: topic, Partition: partition}
+ if info := s.offsets[key]; offset >= info.Offset {
+ info.Offset = offset
+ info.Metadata = metadata
+ s.offsets[key] = info
+ }
+// ResetPartitionOffset stashes the offset for the provided topic/partition combination.
+// Difference between ResetPartitionOffset and MarkPartitionOffset is that, ResetPartitionOffset supports earlier offsets
+func (s *OffsetStash) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ key := topicPartition{Topic: topic, Partition: partition}
+ if info := s.offsets[key]; offset <= info.Offset {
+ info.Offset = offset
+ info.Metadata = metadata
+ s.offsets[key] = info
+ }
+// ResetOffset stashes the provided message offset
+// See ResetPartitionOffset for explanation
+func (s *OffsetStash) ResetOffset(msg *sarama.ConsumerMessage, metadata string) {
+ s.ResetPartitionOffset(msg.Topic, msg.Partition, msg.Offset, metadata)
+// Offsets returns the latest stashed offsets by topic-partition
+func (s *OffsetStash) Offsets() map[string]int64 {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ res := make(map[string]int64, len(s.offsets))
+ for tp, info := range s.offsets {
+ res[tp.String()] = info.Offset
+ }
+ return res
@@ -0,0 +1,290 @@
+package cluster
+import (
+ "sort"
+ "sync"
+ "time"
+ "github.com/Shopify/sarama"
+// PartitionConsumer allows code to consume individual partitions from the cluster.
+// See docs for Consumer.Partitions() for more on how to implement this.
+type PartitionConsumer interface {
+ sarama.PartitionConsumer
+ // Topic returns the consumed topic name
+ Topic() string
+ // Partition returns the consumed partition
+ Partition() int32
+ // InitialOffset returns the offset used for creating the PartitionConsumer instance.
+ // The returned offset can be a literal offset, or OffsetNewest, or OffsetOldest
+ InitialOffset() int64
+ // MarkOffset marks the offset of a message as preocessed.
+ MarkOffset(offset int64, metadata string)
+ // ResetOffset resets the offset to a previously processed message.
+ ResetOffset(offset int64, metadata string)
+type partitionConsumer struct {
+ sarama.PartitionConsumer
+ state partitionState
+ mu sync.Mutex
+ topic string
+ partition int32
+ initialOffset int64
+ closeOnce sync.Once
+ closeErr error
+ dying, dead chan none
+func newPartitionConsumer(manager sarama.Consumer, topic string, partition int32, info offsetInfo, defaultOffset int64) (*partitionConsumer, error) {
+ offset := info.NextOffset(defaultOffset)
+ pcm, err := manager.ConsumePartition(topic, partition, offset)
+ // Resume from default offset, if requested offset is out-of-range
+ if err == sarama.ErrOffsetOutOfRange {
+ info.Offset = -1
+ offset = defaultOffset
+ pcm, err = manager.ConsumePartition(topic, partition, offset)
+ }
+ if err != nil {
+ return nil, err
+ }
+ return &partitionConsumer{
+ PartitionConsumer: pcm,
+ state: partitionState{Info: info},
+ topic: topic,
+ partition: partition,
+ initialOffset: offset,
+ dying: make(chan none),
+ dead: make(chan none),
+ }, nil
+// Topic implements PartitionConsumer
+func (c *partitionConsumer) Topic() string { return c.topic }
+// Partition implements PartitionConsumer
+func (c *partitionConsumer) Partition() int32 { return c.partition }
+// InitialOffset implements PartitionConsumer
+func (c *partitionConsumer) InitialOffset() int64 { return c.initialOffset }
+// AsyncClose implements PartitionConsumer
+func (c *partitionConsumer) AsyncClose() {
+ c.closeOnce.Do(func() {
+ c.closeErr = c.PartitionConsumer.Close()
+ close(c.dying)
+ })
+// Close implements PartitionConsumer
+func (c *partitionConsumer) Close() error {
+ c.AsyncClose()
+ <-c.dead
+ return c.closeErr
+func (c *partitionConsumer) waitFor(stopper <-chan none, errors chan<- error) {
+ defer close(c.dead)
+ for {
+ select {
+ case err, ok := <-c.Errors():
+ if !ok {
+ return
+ }
+ select {
+ case errors <- err:
+ case <-stopper:
+ return
+ case <-c.dying:
+ return
+ }
+ case <-stopper:
+ return
+ case <-c.dying:
+ return
+ }
+ }
+func (c *partitionConsumer) multiplex(stopper <-chan none, messages chan<- *sarama.ConsumerMessage, errors chan<- error) {
+ defer close(c.dead)
+ for {
+ select {
+ case msg, ok := <-c.Messages():
+ if !ok {
+ return
+ }
+ select {
+ case messages <- msg:
+ case <-stopper:
+ return
+ case <-c.dying:
+ return
+ }
+ case err, ok := <-c.Errors():
+ if !ok {
+ return
+ }
+ select {
+ case errors <- err:
+ case <-stopper:
+ return
+ case <-c.dying:
+ return
+ }
+ case <-stopper:
+ return
+ case <-c.dying:
+ return
+ }
+ }
+func (c *partitionConsumer) getState() partitionState {
+ c.mu.Lock()
+ state := c.state
+ c.mu.Unlock()
+ return state
+func (c *partitionConsumer) markCommitted(offset int64) {
+ c.mu.Lock()
+ if offset == c.state.Info.Offset {
+ c.state.Dirty = false
+ }
+ c.mu.Unlock()
+// MarkOffset implements PartitionConsumer
+func (c *partitionConsumer) MarkOffset(offset int64, metadata string) {
+ c.mu.Lock()
+ if next := offset + 1; next > c.state.Info.Offset {
+ c.state.Info.Offset = next
+ c.state.Info.Metadata = metadata
+ c.state.Dirty = true
+ }
+ c.mu.Unlock()
+// ResetOffset implements PartitionConsumer
+func (c *partitionConsumer) ResetOffset(offset int64, metadata string) {
+ c.mu.Lock()
+ if next := offset + 1; next <= c.state.Info.Offset {
+ c.state.Info.Offset = next
+ c.state.Info.Metadata = metadata
+ c.state.Dirty = true
+ }
+ c.mu.Unlock()
+// --------------------------------------------------------------------
+type partitionState struct {
+ Info offsetInfo
+ Dirty bool
+ LastCommit time.Time
+// --------------------------------------------------------------------
+type partitionMap struct {
+ data map[topicPartition]*partitionConsumer
+ mu sync.RWMutex
+func newPartitionMap() *partitionMap {
+ return &partitionMap{
+ data: make(map[topicPartition]*partitionConsumer),
+ }
+func (m *partitionMap) IsSubscribedTo(topic string) bool {
+ m.mu.RLock()
+ defer m.mu.RUnlock()
+ for tp := range m.data {
+ if tp.Topic == topic {
+ return true
+ }
+ }
+ return false
+func (m *partitionMap) Fetch(topic string, partition int32) *partitionConsumer {
+ m.mu.RLock()
+ pc, _ := m.data[topicPartition{topic, partition}]
+ m.mu.RUnlock()
+ return pc
+func (m *partitionMap) Store(topic string, partition int32, pc *partitionConsumer) {
+ m.mu.Lock()
+ m.data[topicPartition{topic, partition}] = pc
+ m.mu.Unlock()
+func (m *partitionMap) Snapshot() map[topicPartition]partitionState {
+ m.mu.RLock()
+ defer m.mu.RUnlock()
+ snap := make(map[topicPartition]partitionState, len(m.data))
+ for tp, pc := range m.data {
+ snap[tp] = pc.getState()
+ }
+ return snap
+func (m *partitionMap) Stop() {
+ m.mu.RLock()
+ defer m.mu.RUnlock()
+ var wg sync.WaitGroup
+ for tp := range m.data {
+ wg.Add(1)
+ go func(p *partitionConsumer) {
+ _ = p.Close()
+ wg.Done()
+ }(m.data[tp])
+ }
+ wg.Wait()
+func (m *partitionMap) Clear() {
+ m.mu.Lock()
+ for tp := range m.data {
+ delete(m.data, tp)
+ }
+ m.mu.Unlock()
+func (m *partitionMap) Info() map[string][]int32 {
+ info := make(map[string][]int32)
+ m.mu.RLock()
+ for tp := range m.data {
+ info[tp.Topic] = append(info[tp.Topic], tp.Partition)
+ }
+ m.mu.RUnlock()
+ for topic := range info {
+ sort.Sort(int32Slice(info[topic]))
+ }
+ return info
diff --git a/vendor/github.com/bsm/sarama-cluster/util.go b/vendor/github.com/bsm/sarama-cluster/util.go
@@ -0,0 +1,75 @@
+package cluster
+import (
+ "fmt"
+ "sort"
+ "sync"
+type none struct{}
+type topicPartition struct {
+ Topic string
+ Partition int32
+func (tp *topicPartition) String() string {
+ return fmt.Sprintf("%s-%d", tp.Topic, tp.Partition)
+type offsetInfo struct {
+ Offset int64
+ Metadata string
+func (i offsetInfo) NextOffset(fallback int64) int64 {
+ if i.Offset > -1 {
+ return i.Offset
+ }
+ return fallback
+type int32Slice []int32
+func (p int32Slice) Len() int { return len(p) }
+func (p int32Slice) Less(i, j int) bool { return p[i] < p[j] }
+func (p int32Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
+func (p int32Slice) Diff(o int32Slice) (res []int32) {
+ on := len(o)
+ for _, x := range p {
+ n := sort.Search(on, func(i int) bool { return o[i] >= x })
+ if n < on && o[n] == x {
+ continue
+ }
+ res = append(res, x)
+ }
+ return
+// --------------------------------------------------------------------
+type loopTomb struct {
+ c chan none
+ o sync.Once
+ w sync.WaitGroup
+func newLoopTomb() *loopTomb {
+ return &loopTomb{c: make(chan none)}
+func (t *loopTomb) stop() { t.o.Do(func() { close(t.c) }) }
+func (t *loopTomb) Close() { t.stop(); t.w.Wait() }
+func (t *loopTomb) Dying() <-chan none { return t.c }
+func (t *loopTomb) Go(f func(<-chan none)) {
+ t.w.Add(1)
+ go func() {
+ defer t.stop()
+ defer t.w.Done()
+ f(t.c)
+ }()