VOL-1460 Updated core to use voltha-protos instead of local protos
Moved protos to python directory in order to maintain functionality of containers built there.
Added capability to do local builds of protos
Added instructions on running dep ensure for getting protos.
Updated github.com/golang/protobuf version to v1.3.1
Change-Id: Ia6ef55f07f0d5dcb5b750d7c37b21b71db85bfc4
diff --git a/vendor/github.com/Shopify/sarama/.travis.yml b/vendor/github.com/Shopify/sarama/.travis.yml
index eb54a0d..b7f7874 100644
--- a/vendor/github.com/Shopify/sarama/.travis.yml
+++ b/vendor/github.com/Shopify/sarama/.travis.yml
@@ -11,9 +11,9 @@
- KAFKA_HOSTNAME=localhost
- DEBUG=true
matrix:
- - KAFKA_VERSION=1.1.1
- - KAFKA_VERSION=2.0.1
- - KAFKA_VERSION=2.1.0
+ - KAFKA_VERSION=1.1.1 KAFKA_SCALA_VERSION=2.11
+ - KAFKA_VERSION=2.0.1 KAFKA_SCALA_VERSION=2.12
+ - KAFKA_VERSION=2.1.0 KAFKA_SCALA_VERSION=2.12
before_install:
- export REPOSITORY_ROOT=${TRAVIS_BUILD_DIR}
diff --git a/vendor/github.com/Shopify/sarama/CHANGELOG.md b/vendor/github.com/Shopify/sarama/CHANGELOG.md
index afc8511..0e69c2d 100644
--- a/vendor/github.com/Shopify/sarama/CHANGELOG.md
+++ b/vendor/github.com/Shopify/sarama/CHANGELOG.md
@@ -1,5 +1,41 @@
# Changelog
+#### Version 1.21.0 (2019-02-24)
+
+New Features:
+- Add CreateAclRequest, DescribeAclRequest, DeleteAclRequest
+ ([1236](https://github.com/Shopify/sarama/pull/1236)).
+- Add DescribeTopic, DescribeConsumerGroup, ListConsumerGroups, ListConsumerGroupOffsets admin requests
+ ([1178](https://github.com/Shopify/sarama/pull/1178)).
+- Implement SASL/OAUTHBEARER
+ ([1240](https://github.com/Shopify/sarama/pull/1240)).
+
+Improvements:
+- Add Go mod support
+ ([1282](https://github.com/Shopify/sarama/pull/1282)).
+- Add error codes 73—76
+ ([1239](https://github.com/Shopify/sarama/pull/1239)).
+- Add retry backoff function
+ ([1160](https://github.com/Shopify/sarama/pull/1160)).
+- Maintain metadata in the producer even when retries are disabled
+ ([1189](https://github.com/Shopify/sarama/pull/1189)).
+- Include ReplicaAssignment in ListTopics
+ ([1274](https://github.com/Shopify/sarama/pull/1274)).
+- Add producer performance tool
+ ([1222](https://github.com/Shopify/sarama/pull/1222)).
+- Add support LogAppend timestamps
+ ([1258](https://github.com/Shopify/sarama/pull/1258)).
+
+Bug Fixes:
+- Fix potential deadlock when a heartbeat request fails
+ ([1286](https://github.com/Shopify/sarama/pull/1286)).
+- Fix consuming compacted topic
+ ([1227](https://github.com/Shopify/sarama/pull/1227)).
+- Set correct Kafka version for DescribeConfigsRequest v1
+ ([1277](https://github.com/Shopify/sarama/pull/1277)).
+- Update kafka test version
+ ([1273](https://github.com/Shopify/sarama/pull/1273)).
+
#### Version 1.20.1 (2019-01-10)
New Features:
diff --git a/vendor/github.com/Shopify/sarama/Makefile b/vendor/github.com/Shopify/sarama/Makefile
index 8fcf219..09f743c 100644
--- a/vendor/github.com/Shopify/sarama/Makefile
+++ b/vendor/github.com/Shopify/sarama/Makefile
@@ -1,3 +1,5 @@
+export GO111MODULE=on
+
default: fmt vet errcheck test
# Taken from https://github.com/codecov/example-go#caveat-multiple-files
diff --git a/vendor/github.com/Shopify/sarama/acl_bindings.go b/vendor/github.com/Shopify/sarama/acl_bindings.go
index 5151735..b91c282 100644
--- a/vendor/github.com/Shopify/sarama/acl_bindings.go
+++ b/vendor/github.com/Shopify/sarama/acl_bindings.go
@@ -1,17 +1,26 @@
package sarama
type Resource struct {
- ResourceType AclResourceType
- ResourceName string
+ ResourceType AclResourceType
+ ResourceName string
+ ResoucePatternType AclResourcePatternType
}
-func (r *Resource) encode(pe packetEncoder) error {
+func (r *Resource) encode(pe packetEncoder, version int16) error {
pe.putInt8(int8(r.ResourceType))
if err := pe.putString(r.ResourceName); err != nil {
return err
}
+ if version == 1 {
+ if r.ResoucePatternType == AclPatternUnknown {
+ Logger.Print("Cannot encode an unknown resource pattern type, using Literal instead")
+ r.ResoucePatternType = AclPatternLiteral
+ }
+ pe.putInt8(int8(r.ResoucePatternType))
+ }
+
return nil
}
@@ -25,6 +34,13 @@
if r.ResourceName, err = pd.getString(); err != nil {
return err
}
+ if version == 1 {
+ pattern, err := pd.getInt8()
+ if err != nil {
+ return err
+ }
+ r.ResoucePatternType = AclResourcePatternType(pattern)
+ }
return nil
}
@@ -80,8 +96,8 @@
Acls []*Acl
}
-func (r *ResourceAcls) encode(pe packetEncoder) error {
- if err := r.Resource.encode(pe); err != nil {
+func (r *ResourceAcls) encode(pe packetEncoder, version int16) error {
+ if err := r.Resource.encode(pe, version); err != nil {
return err
}
diff --git a/vendor/github.com/Shopify/sarama/acl_create_request.go b/vendor/github.com/Shopify/sarama/acl_create_request.go
index 0b6ecbe..d3d5ad8 100644
--- a/vendor/github.com/Shopify/sarama/acl_create_request.go
+++ b/vendor/github.com/Shopify/sarama/acl_create_request.go
@@ -1,6 +1,7 @@
package sarama
type CreateAclsRequest struct {
+ Version int16
AclCreations []*AclCreation
}
@@ -10,7 +11,7 @@
}
for _, aclCreation := range c.AclCreations {
- if err := aclCreation.encode(pe); err != nil {
+ if err := aclCreation.encode(pe, c.Version); err != nil {
return err
}
}
@@ -19,6 +20,7 @@
}
func (c *CreateAclsRequest) decode(pd packetDecoder, version int16) (err error) {
+ c.Version = version
n, err := pd.getArrayLength()
if err != nil {
return err
@@ -41,11 +43,16 @@
}
func (d *CreateAclsRequest) version() int16 {
- return 0
+ return d.Version
}
func (d *CreateAclsRequest) requiredVersion() KafkaVersion {
- return V0_11_0_0
+ switch d.Version {
+ case 1:
+ return V2_0_0_0
+ default:
+ return V0_11_0_0
+ }
}
type AclCreation struct {
@@ -53,8 +60,8 @@
Acl
}
-func (a *AclCreation) encode(pe packetEncoder) error {
- if err := a.Resource.encode(pe); err != nil {
+func (a *AclCreation) encode(pe packetEncoder, version int16) error {
+ if err := a.Resource.encode(pe, version); err != nil {
return err
}
if err := a.Acl.encode(pe); err != nil {
diff --git a/vendor/github.com/Shopify/sarama/acl_delete_request.go b/vendor/github.com/Shopify/sarama/acl_delete_request.go
index 4133dce..5e94ad7 100644
--- a/vendor/github.com/Shopify/sarama/acl_delete_request.go
+++ b/vendor/github.com/Shopify/sarama/acl_delete_request.go
@@ -1,6 +1,7 @@
package sarama
type DeleteAclsRequest struct {
+ Version int
Filters []*AclFilter
}
@@ -10,6 +11,7 @@
}
for _, filter := range d.Filters {
+ filter.Version = d.Version
if err := filter.encode(pe); err != nil {
return err
}
@@ -19,6 +21,7 @@
}
func (d *DeleteAclsRequest) decode(pd packetDecoder, version int16) (err error) {
+ d.Version = int(version)
n, err := pd.getArrayLength()
if err != nil {
return err
@@ -27,6 +30,7 @@
d.Filters = make([]*AclFilter, n)
for i := 0; i < n; i++ {
d.Filters[i] = new(AclFilter)
+ d.Filters[i].Version = int(version)
if err := d.Filters[i].decode(pd, version); err != nil {
return err
}
@@ -40,9 +44,14 @@
}
func (d *DeleteAclsRequest) version() int16 {
- return 0
+ return int16(d.Version)
}
func (d *DeleteAclsRequest) requiredVersion() KafkaVersion {
- return V0_11_0_0
+ switch d.Version {
+ case 1:
+ return V2_0_0_0
+ default:
+ return V0_11_0_0
+ }
}
diff --git a/vendor/github.com/Shopify/sarama/acl_delete_response.go b/vendor/github.com/Shopify/sarama/acl_delete_response.go
index b5e1c45..a885fe5 100644
--- a/vendor/github.com/Shopify/sarama/acl_delete_response.go
+++ b/vendor/github.com/Shopify/sarama/acl_delete_response.go
@@ -3,6 +3,7 @@
import "time"
type DeleteAclsResponse struct {
+ Version int16
ThrottleTime time.Duration
FilterResponses []*FilterResponse
}
@@ -15,7 +16,7 @@
}
for _, filterResponse := range a.FilterResponses {
- if err := filterResponse.encode(pe); err != nil {
+ if err := filterResponse.encode(pe, a.Version); err != nil {
return err
}
}
@@ -51,7 +52,7 @@
}
func (d *DeleteAclsResponse) version() int16 {
- return 0
+ return int16(d.Version)
}
func (d *DeleteAclsResponse) requiredVersion() KafkaVersion {
@@ -64,7 +65,7 @@
MatchingAcls []*MatchingAcl
}
-func (f *FilterResponse) encode(pe packetEncoder) error {
+func (f *FilterResponse) encode(pe packetEncoder, version int16) error {
pe.putInt16(int16(f.Err))
if err := pe.putNullableString(f.ErrMsg); err != nil {
return err
@@ -74,7 +75,7 @@
return err
}
for _, matchingAcl := range f.MatchingAcls {
- if err := matchingAcl.encode(pe); err != nil {
+ if err := matchingAcl.encode(pe, version); err != nil {
return err
}
}
@@ -115,13 +116,13 @@
Acl
}
-func (m *MatchingAcl) encode(pe packetEncoder) error {
+func (m *MatchingAcl) encode(pe packetEncoder, version int16) error {
pe.putInt16(int16(m.Err))
if err := pe.putNullableString(m.ErrMsg); err != nil {
return err
}
- if err := m.Resource.encode(pe); err != nil {
+ if err := m.Resource.encode(pe, version); err != nil {
return err
}
diff --git a/vendor/github.com/Shopify/sarama/acl_describe_request.go b/vendor/github.com/Shopify/sarama/acl_describe_request.go
index 02a5a1f..3c95320 100644
--- a/vendor/github.com/Shopify/sarama/acl_describe_request.go
+++ b/vendor/github.com/Shopify/sarama/acl_describe_request.go
@@ -1,14 +1,18 @@
package sarama
type DescribeAclsRequest struct {
+ Version int
AclFilter
}
func (d *DescribeAclsRequest) encode(pe packetEncoder) error {
+ d.AclFilter.Version = d.Version
return d.AclFilter.encode(pe)
}
func (d *DescribeAclsRequest) decode(pd packetDecoder, version int16) (err error) {
+ d.Version = int(version)
+ d.AclFilter.Version = int(version)
return d.AclFilter.decode(pd, version)
}
@@ -17,9 +21,14 @@
}
func (d *DescribeAclsRequest) version() int16 {
- return 0
+ return int16(d.Version)
}
func (d *DescribeAclsRequest) requiredVersion() KafkaVersion {
- return V0_11_0_0
+ switch d.Version {
+ case 1:
+ return V2_0_0_0
+ default:
+ return V0_11_0_0
+ }
}
diff --git a/vendor/github.com/Shopify/sarama/acl_describe_response.go b/vendor/github.com/Shopify/sarama/acl_describe_response.go
index 5bc9497..db3a88c 100644
--- a/vendor/github.com/Shopify/sarama/acl_describe_response.go
+++ b/vendor/github.com/Shopify/sarama/acl_describe_response.go
@@ -3,6 +3,7 @@
import "time"
type DescribeAclsResponse struct {
+ Version int16
ThrottleTime time.Duration
Err KError
ErrMsg *string
@@ -22,7 +23,7 @@
}
for _, resourceAcl := range d.ResourceAcls {
- if err := resourceAcl.encode(pe); err != nil {
+ if err := resourceAcl.encode(pe, d.Version); err != nil {
return err
}
}
@@ -72,9 +73,14 @@
}
func (d *DescribeAclsResponse) version() int16 {
- return 0
+ return int16(d.Version)
}
func (d *DescribeAclsResponse) requiredVersion() KafkaVersion {
- return V0_11_0_0
+ switch d.Version {
+ case 1:
+ return V2_0_0_0
+ default:
+ return V0_11_0_0
+ }
}
diff --git a/vendor/github.com/Shopify/sarama/acl_filter.go b/vendor/github.com/Shopify/sarama/acl_filter.go
index 9706354..fad5558 100644
--- a/vendor/github.com/Shopify/sarama/acl_filter.go
+++ b/vendor/github.com/Shopify/sarama/acl_filter.go
@@ -1,12 +1,14 @@
package sarama
type AclFilter struct {
- ResourceType AclResourceType
- ResourceName *string
- Principal *string
- Host *string
- Operation AclOperation
- PermissionType AclPermissionType
+ Version int
+ ResourceType AclResourceType
+ ResourceName *string
+ ResourcePatternTypeFilter AclResourcePatternType
+ Principal *string
+ Host *string
+ Operation AclOperation
+ PermissionType AclPermissionType
}
func (a *AclFilter) encode(pe packetEncoder) error {
@@ -14,6 +16,11 @@
if err := pe.putNullableString(a.ResourceName); err != nil {
return err
}
+
+ if a.Version == 1 {
+ pe.putInt8(int8(a.ResourcePatternTypeFilter))
+ }
+
if err := pe.putNullableString(a.Principal); err != nil {
return err
}
@@ -37,6 +44,16 @@
return err
}
+ if a.Version == 1 {
+ pattern, err := pd.getInt8()
+
+ if err != nil {
+ return err
+ }
+
+ a.ResourcePatternTypeFilter = AclResourcePatternType(pattern)
+ }
+
if a.Principal, err = pd.getNullableString(); err != nil {
return err
}
diff --git a/vendor/github.com/Shopify/sarama/acl_types.go b/vendor/github.com/Shopify/sarama/acl_types.go
index 19da6f2..72b7985 100644
--- a/vendor/github.com/Shopify/sarama/acl_types.go
+++ b/vendor/github.com/Shopify/sarama/acl_types.go
@@ -40,3 +40,15 @@
AclResourceCluster AclResourceType = 4
AclResourceTransactionalID AclResourceType = 5
)
+
+type AclResourcePatternType int
+
+// ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/PatternType.java
+
+const (
+ AclPatternUnknown AclResourcePatternType = iota
+ AclPatternAny
+ AclPatternMatch
+ AclPatternLiteral
+ AclPatternPrefixed
+)
diff --git a/vendor/github.com/Shopify/sarama/admin.go b/vendor/github.com/Shopify/sarama/admin.go
index 5272575..18b055a 100644
--- a/vendor/github.com/Shopify/sarama/admin.go
+++ b/vendor/github.com/Shopify/sarama/admin.go
@@ -1,6 +1,10 @@
package sarama
-import "errors"
+import (
+ "errors"
+ "math/rand"
+ "sync"
+)
// ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics,
// brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0.
@@ -13,6 +17,12 @@
// may not return information about the new topic.The validateOnly option is supported from version 0.10.2.0.
CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error
+ // List the topics available in the cluster with the default options.
+ ListTopics() (map[string]TopicDetail, error)
+
+ // Describe some topics in the cluster
+ DescribeTopics(topics []string) (metadata []*TopicMetadata, err error)
+
// Delete a topic. It may take several seconds after the DeleteTopic to returns success
// and for all the brokers to become aware that the topics are gone.
// During this time, listTopics may continue to return information about the deleted topic.
@@ -65,6 +75,18 @@
// This operation is supported by brokers with version 0.11.0.0 or higher.
DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)
+ // List the consumer groups available in the cluster.
+ ListConsumerGroups() (map[string]string, error)
+
+ // Describe the given consumer group
+ DescribeConsumerGroups(groups []string) ([]*GroupDescription, error)
+
+ // List the consumer group offsets available in the cluster.
+ ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)
+
+ // Get information about the nodes in the cluster
+ DescribeCluster() (brokers []*Broker, controllerID int32, err error)
+
// Close shuts down the admin and closes underlying client.
Close() error
}
@@ -150,6 +172,127 @@
return nil
}
+func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) {
+ controller, err := ca.Controller()
+ if err != nil {
+ return nil, err
+ }
+
+ request := &MetadataRequest{
+ Topics: topics,
+ AllowAutoTopicCreation: false,
+ }
+
+ if ca.conf.Version.IsAtLeast(V0_11_0_0) {
+ request.Version = 4
+ }
+
+ response, err := controller.GetMetadata(request)
+ if err != nil {
+ return nil, err
+ }
+ return response.Topics, nil
+}
+
+func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32, err error) {
+ controller, err := ca.Controller()
+ if err != nil {
+ return nil, int32(0), err
+ }
+
+ request := &MetadataRequest{
+ Topics: []string{},
+ }
+
+ response, err := controller.GetMetadata(request)
+ if err != nil {
+ return nil, int32(0), err
+ }
+
+ return response.Brokers, response.ControllerID, nil
+}
+
+func (ca *clusterAdmin) findAnyBroker() (*Broker, error) {
+ brokers := ca.client.Brokers()
+ if len(brokers) > 0 {
+ index := rand.Intn(len(brokers))
+ return brokers[index], nil
+ }
+ return nil, errors.New("no available broker")
+}
+
+func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) {
+ // In order to build TopicDetails we need to first get the list of all
+ // topics using a MetadataRequest and then get their configs using a
+ // DescribeConfigsRequest request. To avoid sending many requests to the
+ // broker, we use a single DescribeConfigsRequest.
+
+ // Send the all-topic MetadataRequest
+ b, err := ca.findAnyBroker()
+ if err != nil {
+ return nil, err
+ }
+ _ = b.Open(ca.client.Config())
+
+ metadataReq := &MetadataRequest{}
+ metadataResp, err := b.GetMetadata(metadataReq)
+ if err != nil {
+ return nil, err
+ }
+
+ topicsDetailsMap := make(map[string]TopicDetail)
+
+ var describeConfigsResources []*ConfigResource
+
+ for _, topic := range metadataResp.Topics {
+ topicDetails := TopicDetail{
+ NumPartitions: int32(len(topic.Partitions)),
+ }
+ if len(topic.Partitions) > 0 {
+ topicDetails.ReplicaAssignment = map[int32][]int32{}
+ for _, partition := range topic.Partitions {
+ topicDetails.ReplicaAssignment[partition.ID] = partition.Replicas
+ }
+ topicDetails.ReplicationFactor = int16(len(topic.Partitions[0].Replicas))
+ }
+ topicsDetailsMap[topic.Name] = topicDetails
+
+ // we populate the resources we want to describe from the MetadataResponse
+ topicResource := ConfigResource{
+ Type: TopicResource,
+ Name: topic.Name,
+ }
+ describeConfigsResources = append(describeConfigsResources, &topicResource)
+ }
+
+ // Send the DescribeConfigsRequest
+ describeConfigsReq := &DescribeConfigsRequest{
+ Resources: describeConfigsResources,
+ }
+ describeConfigsResp, err := b.DescribeConfigs(describeConfigsReq)
+ if err != nil {
+ return nil, err
+ }
+
+ for _, resource := range describeConfigsResp.Resources {
+ topicDetails := topicsDetailsMap[resource.Name]
+ topicDetails.ConfigEntries = make(map[string]*string)
+
+ for _, entry := range resource.Configs {
+ // only include non-default non-sensitive config
+ // (don't actually think topic config will ever be sensitive)
+ if entry.Default || entry.Sensitive {
+ continue
+ }
+ topicDetails.ConfigEntries[entry.Name] = &entry.Value
+ }
+
+ topicsDetailsMap[resource.Name] = topicDetails
+ }
+
+ return topicsDetailsMap, nil
+}
+
func (ca *clusterAdmin) DeleteTopic(topic string) error {
if topic == "" {
@@ -380,3 +523,92 @@
}
return mAcls, nil
}
+
+func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*GroupDescription, err error) {
+ groupsPerBroker := make(map[*Broker][]string)
+
+ for _, group := range groups {
+ controller, err := ca.client.Coordinator(group)
+ if err != nil {
+ return nil, err
+ }
+ groupsPerBroker[controller] = append(groupsPerBroker[controller], group)
+
+ }
+
+ for broker, brokerGroups := range groupsPerBroker {
+ response, err := broker.DescribeGroups(&DescribeGroupsRequest{
+ Groups: brokerGroups,
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ result = append(result, response.Groups...)
+ }
+ return result, nil
+}
+
+func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err error) {
+ allGroups = make(map[string]string)
+
+ // Query brokers in parallel, since we have to query *all* brokers
+ brokers := ca.client.Brokers()
+ groupMaps := make(chan map[string]string, len(brokers))
+ errors := make(chan error, len(brokers))
+ wg := sync.WaitGroup{}
+
+ for _, b := range brokers {
+ wg.Add(1)
+ go func(b *Broker, conf *Config) {
+ defer wg.Done()
+ _ = b.Open(conf) // Ensure that broker is opened
+
+ response, err := b.ListGroups(&ListGroupsRequest{})
+ if err != nil {
+ errors <- err
+ return
+ }
+
+ groups := make(map[string]string)
+ for group, typ := range response.Groups {
+ groups[group] = typ
+ }
+
+ groupMaps <- groups
+
+ }(b, ca.conf)
+ }
+
+ wg.Wait()
+ close(groupMaps)
+ close(errors)
+
+ for groupMap := range groupMaps {
+ for group, protocolType := range groupMap {
+ allGroups[group] = protocolType
+ }
+ }
+
+ // Intentionally return only the first error for simplicity
+ err = <-errors
+ return
+}
+
+func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) {
+ coordinator, err := ca.client.Coordinator(group)
+ if err != nil {
+ return nil, err
+ }
+
+ request := &OffsetFetchRequest{
+ ConsumerGroup: group,
+ partitions: topicPartitions,
+ }
+
+ if ca.conf.Version.IsAtLeast(V0_8_2_2) {
+ request.Version = 1
+ }
+
+ return coordinator.FetchOffset(request)
+}
diff --git a/vendor/github.com/Shopify/sarama/async_producer.go b/vendor/github.com/Shopify/sarama/async_producer.go
index 89a0c70..5174a35 100644
--- a/vendor/github.com/Shopify/sarama/async_producer.go
+++ b/vendor/github.com/Shopify/sarama/async_producer.go
@@ -483,6 +483,19 @@
return input
}
+func (pp *partitionProducer) backoff(retries int) {
+ var backoff time.Duration
+ if pp.parent.conf.Producer.Retry.BackoffFunc != nil {
+ maxRetries := pp.parent.conf.Producer.Retry.Max
+ backoff = pp.parent.conf.Producer.Retry.BackoffFunc(retries, maxRetries)
+ } else {
+ backoff = pp.parent.conf.Producer.Retry.Backoff
+ }
+ if backoff > 0 {
+ time.Sleep(backoff)
+ }
+}
+
func (pp *partitionProducer) dispatch() {
// try to prefetch the leader; if this doesn't work, we'll do a proper call to `updateLeader`
// on the first message
@@ -493,11 +506,31 @@
pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
}
+ defer func() {
+ if pp.brokerProducer != nil {
+ pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
+ }
+ }()
+
for msg := range pp.input {
+
+ if pp.brokerProducer != nil && pp.brokerProducer.abandoned != nil {
+ select {
+ case <-pp.brokerProducer.abandoned:
+ // a message on the abandoned channel means that our current broker selection is out of date
+ Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
+ pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
+ pp.brokerProducer = nil
+ time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
+ default:
+ // producer connection is still open.
+ }
+ }
+
if msg.retries > pp.highWatermark {
// a new, higher, retry level; handle it and then back off
pp.newHighWatermark(msg.retries)
- time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
+ pp.backoff(msg.retries)
} else if pp.highWatermark > 0 {
// we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
if msg.retries < pp.highWatermark {
@@ -525,7 +558,7 @@
if pp.brokerProducer == nil {
if err := pp.updateLeader(); err != nil {
pp.parent.returnError(msg, err)
- time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
+ pp.backoff(msg.retries)
continue
}
Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
@@ -533,10 +566,6 @@
pp.brokerProducer.input <- msg
}
-
- if pp.brokerProducer != nil {
- pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
- }
}
func (pp *partitionProducer) newHighWatermark(hwm int) {
@@ -637,6 +666,10 @@
close(responses)
})
+ if p.conf.Producer.Retry.Max <= 0 {
+ bp.abandoned = make(chan struct{})
+ }
+
return bp
}
@@ -655,6 +688,7 @@
input chan *ProducerMessage
output chan<- *produceSet
responses <-chan *brokerProducerResponse
+ abandoned chan struct{}
buffer *produceSet
timer <-chan time.Time
@@ -829,9 +863,17 @@
// Retriable errors
case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
- retryTopics = append(retryTopics, topic)
+ if bp.parent.conf.Producer.Retry.Max <= 0 {
+ bp.parent.abandonBrokerConnection(bp.broker)
+ bp.parent.returnErrors(pSet.msgs, block.Err)
+ } else {
+ retryTopics = append(retryTopics, topic)
+ }
// Other non-retriable errors
default:
+ if bp.parent.conf.Producer.Retry.Max <= 0 {
+ bp.parent.abandonBrokerConnection(bp.broker)
+ }
bp.parent.returnErrors(pSet.msgs, block.Err)
}
})
@@ -1048,5 +1090,10 @@
p.brokerLock.Lock()
defer p.brokerLock.Unlock()
+ bc, ok := p.brokers[broker]
+ if ok && bc.abandoned != nil {
+ close(bc.abandoned)
+ }
+
delete(p.brokers, broker)
}
diff --git a/vendor/github.com/Shopify/sarama/broker.go b/vendor/github.com/Shopify/sarama/broker.go
index 6a33b80..9129089 100644
--- a/vendor/github.com/Shopify/sarama/broker.go
+++ b/vendor/github.com/Shopify/sarama/broker.go
@@ -6,7 +6,9 @@
"fmt"
"io"
"net"
+ "sort"
"strconv"
+ "strings"
"sync"
"sync/atomic"
"time"
@@ -46,6 +48,50 @@
brokerResponseSize metrics.Histogram
}
+// SASLMechanism specifies the SASL mechanism the client uses to authenticate with the broker
+type SASLMechanism string
+
+const (
+ // SASLTypeOAuth represents the SASL/OAUTHBEARER mechanism (Kafka 2.0.0+)
+ SASLTypeOAuth = "OAUTHBEARER"
+ // SASLTypePlaintext represents the SASL/PLAIN mechanism
+ SASLTypePlaintext = "PLAIN"
+ // SASLHandshakeV0 is v0 of the Kafka SASL handshake protocol. Client and
+ // server negotiate SASL auth using opaque packets.
+ SASLHandshakeV0 = int16(0)
+ // SASLHandshakeV1 is v1 of the Kafka SASL handshake protocol. Client and
+ // server negotiate SASL by wrapping tokens with Kafka protocol headers.
+ SASLHandshakeV1 = int16(1)
+ // SASLExtKeyAuth is the reserved extension key name sent as part of the
+ // SASL/OAUTHBEARER intial client response
+ SASLExtKeyAuth = "auth"
+)
+
+// AccessToken contains an access token used to authenticate a
+// SASL/OAUTHBEARER client along with associated metadata.
+type AccessToken struct {
+ // Token is the access token payload.
+ Token string
+ // Extensions is a optional map of arbitrary key-value pairs that can be
+ // sent with the SASL/OAUTHBEARER initial client response. These values are
+ // ignored by the SASL server if they are unexpected. This feature is only
+ // supported by Kafka >= 2.1.0.
+ Extensions map[string]string
+}
+
+// AccessTokenProvider is the interface that encapsulates how implementors
+// can generate access tokens for Kafka broker authentication.
+type AccessTokenProvider interface {
+ // Token returns an access token. The implementation should ensure token
+ // reuse so that multiple calls at connect time do not create multiple
+ // tokens. The implementation should also periodically refresh the token in
+ // order to guarantee that each call returns an unexpired token. This
+ // method should not block indefinitely--a timeout error should be returned
+ // after a short period of inactivity so that the broker connection logic
+ // can log debugging information and retry.
+ Token() (*AccessToken, error)
+}
+
type responsePromise struct {
requestTime time.Time
correlationID int32
@@ -125,7 +171,9 @@
}
if conf.Net.SASL.Enable {
- b.connErr = b.sendAndReceiveSASLPlainAuth()
+
+ b.connErr = b.authenticateViaSASL()
+
if b.connErr != nil {
err = b.conn.Close()
if err == nil {
@@ -744,8 +792,16 @@
close(b.done)
}
-func (b *Broker) sendAndReceiveSASLPlainHandshake() error {
- rb := &SaslHandshakeRequest{"PLAIN"}
+func (b *Broker) authenticateViaSASL() error {
+ if b.conf.Net.SASL.Mechanism == SASLTypeOAuth {
+ return b.sendAndReceiveSASLOAuth(b.conf.Net.SASL.TokenProvider)
+ }
+ return b.sendAndReceiveSASLPlainAuth()
+}
+
+func (b *Broker) sendAndReceiveSASLHandshake(saslType string, version int16) error {
+ rb := &SaslHandshakeRequest{Mechanism: saslType, Version: version}
+
req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
buf, err := encode(req, b.conf.MetricRegistry)
if err != nil {
@@ -814,7 +870,7 @@
// of responding to bad credentials but thats how its being done today.
func (b *Broker) sendAndReceiveSASLPlainAuth() error {
if b.conf.Net.SASL.Handshake {
- handshakeErr := b.sendAndReceiveSASLPlainHandshake()
+ handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, SASLHandshakeV0)
if handshakeErr != nil {
Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
return handshakeErr
@@ -853,6 +909,157 @@
return nil
}
+// sendAndReceiveSASLOAuth performs the authentication flow as described by KIP-255
+// https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75968876
+func (b *Broker) sendAndReceiveSASLOAuth(provider AccessTokenProvider) error {
+
+ if err := b.sendAndReceiveSASLHandshake(SASLTypeOAuth, SASLHandshakeV1); err != nil {
+ return err
+ }
+
+ token, err := provider.Token()
+
+ if err != nil {
+ return err
+ }
+
+ requestTime := time.Now()
+
+ correlationID := b.correlationID
+
+ bytesWritten, err := b.sendSASLOAuthBearerClientResponse(token, correlationID)
+
+ if err != nil {
+ return err
+ }
+
+ b.updateOutgoingCommunicationMetrics(bytesWritten)
+
+ b.correlationID++
+
+ bytesRead, err := b.receiveSASLOAuthBearerServerResponse(correlationID)
+
+ if err != nil {
+ return err
+ }
+
+ requestLatency := time.Since(requestTime)
+ b.updateIncomingCommunicationMetrics(bytesRead, requestLatency)
+
+ return nil
+}
+
+// Build SASL/OAUTHBEARER initial client response as described by RFC-7628
+// https://tools.ietf.org/html/rfc7628
+func buildClientInitialResponse(token *AccessToken) ([]byte, error) {
+
+ var ext string
+
+ if token.Extensions != nil && len(token.Extensions) > 0 {
+ if _, ok := token.Extensions[SASLExtKeyAuth]; ok {
+ return []byte{}, fmt.Errorf("The extension `%s` is invalid", SASLExtKeyAuth)
+ }
+ ext = "\x01" + mapToString(token.Extensions, "=", "\x01")
+ }
+
+ resp := []byte(fmt.Sprintf("n,,\x01auth=Bearer %s%s\x01\x01", token.Token, ext))
+
+ return resp, nil
+}
+
+// mapToString returns a list of key-value pairs ordered by key.
+// keyValSep separates the key from the value. elemSep separates each pair.
+func mapToString(extensions map[string]string, keyValSep string, elemSep string) string {
+
+ buf := make([]string, 0, len(extensions))
+
+ for k, v := range extensions {
+ buf = append(buf, k+keyValSep+v)
+ }
+
+ sort.Strings(buf)
+
+ return strings.Join(buf, elemSep)
+}
+
+func (b *Broker) sendSASLOAuthBearerClientResponse(token *AccessToken, correlationID int32) (int, error) {
+
+ initialResp, err := buildClientInitialResponse(token)
+
+ if err != nil {
+ return 0, err
+ }
+
+ rb := &SaslAuthenticateRequest{initialResp}
+
+ req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
+
+ buf, err := encode(req, b.conf.MetricRegistry)
+
+ if err != nil {
+ return 0, err
+ }
+
+ if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil {
+ return 0, err
+ }
+
+ return b.conn.Write(buf)
+}
+
+func (b *Broker) receiveSASLOAuthBearerServerResponse(correlationID int32) (int, error) {
+
+ buf := make([]byte, 8)
+
+ bytesRead, err := io.ReadFull(b.conn, buf)
+
+ if err != nil {
+ return bytesRead, err
+ }
+
+ header := responseHeader{}
+
+ err = decode(buf, &header)
+
+ if err != nil {
+ return bytesRead, err
+ }
+
+ if header.correlationID != correlationID {
+ return bytesRead, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
+ }
+
+ buf = make([]byte, header.length-4)
+
+ c, err := io.ReadFull(b.conn, buf)
+
+ bytesRead += c
+
+ if err != nil {
+ return bytesRead, err
+ }
+
+ res := &SaslAuthenticateResponse{}
+
+ if err := versionedDecode(buf, res, 0); err != nil {
+ return bytesRead, err
+ }
+
+ if err != nil {
+ return bytesRead, err
+ }
+
+ if res.Err != ErrNoError {
+ return bytesRead, res.Err
+ }
+
+ if len(res.SaslAuthBytes) > 0 {
+ Logger.Printf("Received SASL auth response: %s", res.SaslAuthBytes)
+ }
+
+ return bytesRead, nil
+}
+
func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
b.updateRequestLatencyMetrics(requestLatency)
b.responseRate.Mark(1)
diff --git a/vendor/github.com/Shopify/sarama/client.go b/vendor/github.com/Shopify/sarama/client.go
index 79be5ce..0016f8f 100644
--- a/vendor/github.com/Shopify/sarama/client.go
+++ b/vendor/github.com/Shopify/sarama/client.go
@@ -710,8 +710,11 @@
func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int) error {
retry := func(err error) error {
if attemptsRemaining > 0 {
+ backoff := client.computeBackoff(attemptsRemaining)
Logger.Printf("client/metadata retrying after %dms... (%d attempts remaining)\n", client.conf.Metadata.Retry.Backoff/time.Millisecond, attemptsRemaining)
- time.Sleep(client.conf.Metadata.Retry.Backoff)
+ if backoff > 0 {
+ time.Sleep(backoff)
+ }
return client.tryRefreshMetadata(topics, attemptsRemaining-1)
}
return err
@@ -839,11 +842,22 @@
return client.brokers[client.controllerID]
}
+// computeBackoff returns the delay to use before the next metadata retry.
+// If Metadata.Retry.BackoffFunc is set it is called with
+// (Metadata.Retry.Max - attemptsRemaining, Metadata.Retry.Max); otherwise the
+// fixed Metadata.Retry.Backoff is returned.
+func (client *client) computeBackoff(attemptsRemaining int) time.Duration {
+	backoffFunc := client.conf.Metadata.Retry.BackoffFunc
+	if backoffFunc == nil {
+		return client.conf.Metadata.Retry.Backoff
+	}
+	limit := client.conf.Metadata.Retry.Max
+	return backoffFunc(limit-attemptsRemaining, limit)
+}
+
func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemaining int) (*FindCoordinatorResponse, error) {
retry := func(err error) (*FindCoordinatorResponse, error) {
if attemptsRemaining > 0 {
- Logger.Printf("client/coordinator retrying after %dms... (%d attempts remaining)\n", client.conf.Metadata.Retry.Backoff/time.Millisecond, attemptsRemaining)
- time.Sleep(client.conf.Metadata.Retry.Backoff)
+ backoff := client.computeBackoff(attemptsRemaining)
+ Logger.Printf("client/coordinator retrying after %dms... (%d attempts remaining)\n", backoff/time.Millisecond, attemptsRemaining)
+ time.Sleep(backoff)
return client.getConsumerMetadata(consumerGroup, attemptsRemaining-1)
}
return nil, err
diff --git a/vendor/github.com/Shopify/sarama/config.go b/vendor/github.com/Shopify/sarama/config.go
index ed20522..9495b7f 100644
--- a/vendor/github.com/Shopify/sarama/config.go
+++ b/vendor/github.com/Shopify/sarama/config.go
@@ -54,6 +54,9 @@
// Whether or not to use SASL authentication when connecting to the broker
// (defaults to false).
Enable bool
+ // SASLMechanism is the name of the enabled SASL mechanism.
+ // Possible values: OAUTHBEARER, PLAIN (defaults to PLAIN).
+ Mechanism SASLMechanism
// Whether or not to send the Kafka SASL handshake first if enabled
// (defaults to true). You should only set this to false if you're using
// a non-Kafka SASL proxy.
@@ -61,6 +64,11 @@
//username and password for SASL/PLAIN authentication
User string
Password string
+ // TokenProvider is a user-defined callback for generating
+ // access tokens for SASL/OAUTHBEARER auth. See the
+ // AccessTokenProvider interface docs for proper implementation
+ // guidelines.
+ TokenProvider AccessTokenProvider
}
// KeepAlive specifies the keep-alive period for an active network connection.
@@ -84,6 +92,10 @@
// How long to wait for leader election to occur before retrying
// (default 250ms). Similar to the JVM's `retry.backoff.ms`.
Backoff time.Duration
+ // Called to compute backoff time dynamically. Useful for implementing
+ // more sophisticated backoff strategies. This takes precedence over
+ // `Backoff` if set.
+ BackoffFunc func(retries, maxRetries int) time.Duration
}
// How frequently to refresh the cluster metadata in the background.
// Defaults to 10 minutes. Set to 0 to disable. Similar to
@@ -171,6 +183,10 @@
// (default 100ms). Similar to the `retry.backoff.ms` setting of the
// JVM producer.
Backoff time.Duration
+ // Called to compute backoff time dynamically. Useful for implementing
+ // more sophisticated backoff strategies. This takes precedence over
+ // `Backoff` if set.
+ BackoffFunc func(retries, maxRetries int) time.Duration
}
}
@@ -229,6 +245,10 @@
// How long to wait after a failing to read from a partition before
// trying again (default 2s).
Backoff time.Duration
+ // Called to compute backoff time dynamically. Useful for implementing
+ // more sophisticated backoff strategies. This takes precedence over
+ // `Backoff` if set.
+ BackoffFunc func(retries int) time.Duration
}
// Fetch is the namespace for controlling how many bytes are retrieved by any
@@ -454,10 +474,25 @@
return ConfigurationError("Net.WriteTimeout must be > 0")
case c.Net.KeepAlive < 0:
return ConfigurationError("Net.KeepAlive must be >= 0")
- case c.Net.SASL.Enable == true && c.Net.SASL.User == "":
- return ConfigurationError("Net.SASL.User must not be empty when SASL is enabled")
- case c.Net.SASL.Enable == true && c.Net.SASL.Password == "":
- return ConfigurationError("Net.SASL.Password must not be empty when SASL is enabled")
+ case c.Net.SASL.Enable:
+ // For backwards compatibility, empty mechanism value defaults to PLAIN
+ isSASLPlain := len(c.Net.SASL.Mechanism) == 0 || c.Net.SASL.Mechanism == SASLTypePlaintext
+ if isSASLPlain {
+ if c.Net.SASL.User == "" {
+ return ConfigurationError("Net.SASL.User must not be empty when SASL is enabled")
+ }
+ if c.Net.SASL.Password == "" {
+ return ConfigurationError("Net.SASL.Password must not be empty when SASL is enabled")
+ }
+ } else if c.Net.SASL.Mechanism == SASLTypeOAuth {
+ if c.Net.SASL.TokenProvider == nil {
+ return ConfigurationError("An AccessTokenProvider instance must be provided to Net.SASL.User.TokenProvider")
+ }
+ } else {
+ msg := fmt.Sprintf("The SASL mechanism configuration is invalid. Possible values are `%s` and `%s`",
+ SASLTypeOAuth, SASLTypePlaintext)
+ return ConfigurationError(msg)
+ }
}
// validate the Admin values
diff --git a/vendor/github.com/Shopify/sarama/consumer.go b/vendor/github.com/Shopify/sarama/consumer.go
index 33d9d14..ce72ff1 100644
--- a/vendor/github.com/Shopify/sarama/consumer.go
+++ b/vendor/github.com/Shopify/sarama/consumer.go
@@ -314,6 +314,8 @@
fetchSize int32
offset int64
+
+ retries int32
}
var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing
@@ -332,12 +334,21 @@
}
}
+// computeBackoff returns the delay to wait before the partition consumer
+// retries. If Consumer.Retry.BackoffFunc is set, the retries counter is
+// atomically incremented and its new value passed to that function;
+// otherwise the fixed Consumer.Retry.Backoff is returned and the counter is
+// left untouched.
+func (child *partitionConsumer) computeBackoff() time.Duration {
+	backoffFunc := child.conf.Consumer.Retry.BackoffFunc
+	if backoffFunc == nil {
+		return child.conf.Consumer.Retry.Backoff
+	}
+	attempt := atomic.AddInt32(&child.retries, 1)
+	return backoffFunc(int(attempt))
+}
+
func (child *partitionConsumer) dispatcher() {
for range child.trigger {
select {
case <-child.dying:
close(child.trigger)
- case <-time.After(child.conf.Consumer.Retry.Backoff):
+ case <-time.After(child.computeBackoff()):
if child.broker != nil {
child.consumer.unrefBrokerConsumer(child.broker)
child.broker = nil
@@ -451,6 +462,10 @@
for response := range child.feeder {
msgs, child.responseResult = child.parseResponse(response)
+ if child.responseResult == nil {
+ atomic.StoreInt32(&child.retries, 0)
+ }
+
for i, msg := range msgs {
messageSelect:
select {
@@ -487,9 +502,13 @@
for _, msgBlock := range msgSet.Messages {
for _, msg := range msgBlock.Messages() {
offset := msg.Offset
+ timestamp := msg.Msg.Timestamp
if msg.Msg.Version >= 1 {
baseOffset := msgBlock.Offset - msgBlock.Messages()[len(msgBlock.Messages())-1].Offset
offset += baseOffset
+ if msg.Msg.LogAppendTime {
+ timestamp = msgBlock.Msg.Timestamp
+ }
}
if offset < child.offset {
continue
@@ -500,14 +519,14 @@
Key: msg.Msg.Key,
Value: msg.Msg.Value,
Offset: offset,
- Timestamp: msg.Msg.Timestamp,
+ Timestamp: timestamp,
BlockTimestamp: msgBlock.Msg.Timestamp,
})
child.offset = offset + 1
}
}
if len(messages) == 0 {
- return nil, ErrIncompleteResponse
+ child.offset++
}
return messages, nil
}
@@ -519,19 +538,23 @@
if offset < child.offset {
continue
}
+ timestamp := batch.FirstTimestamp.Add(rec.TimestampDelta)
+ if batch.LogAppendTime {
+ timestamp = batch.MaxTimestamp
+ }
messages = append(messages, &ConsumerMessage{
Topic: child.topic,
Partition: child.partition,
Key: rec.Key,
Value: rec.Value,
Offset: offset,
- Timestamp: batch.FirstTimestamp.Add(rec.TimestampDelta),
+ Timestamp: timestamp,
Headers: rec.Headers,
})
child.offset = offset + 1
}
if len(messages) == 0 {
- child.offset += 1
+ child.offset++
}
return messages, nil
}
@@ -787,6 +810,9 @@
MinBytes: bc.consumer.conf.Consumer.Fetch.Min,
MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),
}
+ if bc.consumer.conf.Version.IsAtLeast(V0_9_0_0) {
+ request.Version = 1
+ }
if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) {
request.Version = 2
}
diff --git a/vendor/github.com/Shopify/sarama/consumer_group.go b/vendor/github.com/Shopify/sarama/consumer_group.go
index bb6a2c2..8c8babc 100644
--- a/vendor/github.com/Shopify/sarama/consumer_group.go
+++ b/vendor/github.com/Shopify/sarama/consumer_group.go
@@ -657,6 +657,12 @@
resp, err := s.parent.heartbeatRequest(coordinator, s.memberID, s.generationID)
if err != nil {
_ = coordinator.Close()
+
+ if retries <= 0 {
+ s.parent.handleError(err, "", -1)
+ return
+ }
+
retries--
continue
}
diff --git a/vendor/github.com/Shopify/sarama/describe_configs_request.go b/vendor/github.com/Shopify/sarama/describe_configs_request.go
index 416a4fe..ccb587b 100644
--- a/vendor/github.com/Shopify/sarama/describe_configs_request.go
+++ b/vendor/github.com/Shopify/sarama/describe_configs_request.go
@@ -103,7 +103,7 @@
func (r *DescribeConfigsRequest) requiredVersion() KafkaVersion {
switch r.Version {
case 1:
- return V1_0_0_0
+ return V1_1_0_0
case 2:
return V2_0_0_0
default:
diff --git a/vendor/github.com/Shopify/sarama/errors.go b/vendor/github.com/Shopify/sarama/errors.go
index c11421d..87a4c61 100644
--- a/vendor/github.com/Shopify/sarama/errors.go
+++ b/vendor/github.com/Shopify/sarama/errors.go
@@ -157,6 +157,10 @@
ErrFetchSessionIDNotFound KError = 70
ErrInvalidFetchSessionEpoch KError = 71
ErrListenerNotFound KError = 72
+ ErrTopicDeletionDisabled KError = 73
+ ErrFencedLeaderEpoch KError = 74
+ ErrUnknownLeaderEpoch KError = 75
+ ErrUnsupportedCompressionType KError = 76
)
func (err KError) Error() string {
@@ -311,6 +315,14 @@
return "kafka server: The fetch session epoch is invalid."
case ErrListenerNotFound:
return "kafka server: There is no listener on the leader broker that matches the listener on which metadata request was processed."
+ case ErrTopicDeletionDisabled:
+ return "kafka server: Topic deletion is disabled."
+ case ErrFencedLeaderEpoch:
+ return "kafka server: The leader epoch in the request is older than the epoch on the broker."
+ case ErrUnknownLeaderEpoch:
+ return "kafka server: The leader epoch in the request is newer than the epoch on the broker."
+ case ErrUnsupportedCompressionType:
+ return "kafka server: The requesting client does not support the compression type of given partition."
}
return fmt.Sprintf("Unknown error, how did this happen? Error code = %d", err)
diff --git a/vendor/github.com/Shopify/sarama/fetch_response.go b/vendor/github.com/Shopify/sarama/fetch_response.go
index 90acfc2..9df99c1 100644
--- a/vendor/github.com/Shopify/sarama/fetch_response.go
+++ b/vendor/github.com/Shopify/sarama/fetch_response.go
@@ -186,9 +186,11 @@
}
type FetchResponse struct {
- Blocks map[string]map[int32]*FetchResponseBlock
- ThrottleTime time.Duration
- Version int16 // v1 requires 0.9+, v2 requires 0.10+
+ Blocks map[string]map[int32]*FetchResponseBlock
+ ThrottleTime time.Duration
+ Version int16 // v1 requires 0.9+, v2 requires 0.10+
+ LogAppendTime bool
+ Timestamp time.Time
}
func (r *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
@@ -355,10 +357,13 @@
return kb, vb
}
-func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
+func (r *FetchResponse) AddMessageWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time, version int8) {
frb := r.getOrCreateBlock(topic, partition)
kb, vb := encodeKV(key, value)
- msg := &Message{Key: kb, Value: vb}
+ if r.LogAppendTime {
+ timestamp = r.Timestamp
+ }
+ msg := &Message{Key: kb, Value: vb, LogAppendTime: r.LogAppendTime, Timestamp: timestamp, Version: version}
msgBlock := &MessageBlock{Msg: msg, Offset: offset}
if len(frb.RecordsSet) == 0 {
records := newLegacyRecords(&MessageSet{})
@@ -368,18 +373,26 @@
set.Messages = append(set.Messages, msgBlock)
}
-func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) {
+func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time) {
frb := r.getOrCreateBlock(topic, partition)
kb, vb := encodeKV(key, value)
- rec := &Record{Key: kb, Value: vb, OffsetDelta: offset}
if len(frb.RecordsSet) == 0 {
- records := newDefaultRecords(&RecordBatch{Version: 2})
+ records := newDefaultRecords(&RecordBatch{Version: 2, LogAppendTime: r.LogAppendTime, FirstTimestamp: timestamp, MaxTimestamp: r.Timestamp})
frb.RecordsSet = []*Records{&records}
}
batch := frb.RecordsSet[0].RecordBatch
+ rec := &Record{Key: kb, Value: vb, OffsetDelta: offset, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
batch.addRecord(rec)
}
+// AddMessage is a convenience wrapper around AddMessageWithTimestamp that
+// passes the zero time.Time as the timestamp and 0 as the message version.
+func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
+ r.AddMessageWithTimestamp(topic, partition, key, value, offset, time.Time{}, 0)
+}
+
+// AddRecord is a convenience wrapper around AddRecordWithTimestamp that
+// passes the zero time.Time as the record timestamp.
+func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) {
+ r.AddRecordWithTimestamp(topic, partition, key, value, offset, time.Time{})
+}
+
func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32) {
frb := r.getOrCreateBlock(topic, partition)
if len(frb.RecordsSet) == 0 {
diff --git a/vendor/github.com/Shopify/sarama/message.go b/vendor/github.com/Shopify/sarama/message.go
index 51d3309..f64c79b 100644
--- a/vendor/github.com/Shopify/sarama/message.go
+++ b/vendor/github.com/Shopify/sarama/message.go
@@ -5,12 +5,15 @@
"time"
)
-// CompressionCodec represents the various compression codecs recognized by Kafka in messages.
-type CompressionCodec int8
-
// The lowest 3 bits contain the compression codec used for the message
const compressionCodecMask int8 = 0x07
+// Bit 3 set for "LogAppend" timestamps
+const timestampTypeMask = 0x08
+
+// CompressionCodec represents the various compression codecs recognized by Kafka in messages.
+type CompressionCodec int8
+
const (
CompressionNone CompressionCodec = 0
CompressionGZIP CompressionCodec = 1
@@ -36,6 +39,7 @@
type Message struct {
Codec CompressionCodec // codec used to compress the message contents
CompressionLevel int // compression level
+ LogAppendTime bool // the used timestamp is LogAppendTime
Key []byte // the message key, may be nil
Value []byte // the message contents
Set *MessageSet // the message set a message might wrap
@@ -52,6 +56,9 @@
pe.putInt8(m.Version)
attributes := int8(m.Codec) & compressionCodecMask
+ if m.LogAppendTime {
+ attributes |= timestampTypeMask
+ }
pe.putInt8(attributes)
if m.Version >= 1 {
@@ -108,6 +115,7 @@
return err
}
m.Codec = CompressionCodec(attribute & compressionCodecMask)
+ m.LogAppendTime = attribute&timestampTypeMask == timestampTypeMask
if m.Version == 1 {
if err := (Timestamp{&m.Timestamp}).decode(pd); err != nil {
@@ -149,7 +157,7 @@
return pd.pop()
}
-// decodes a message set from a previousy encoded bulk-message
+// decodes a message set from a previously encoded bulk-message
func (m *Message) decodeSet() (err error) {
pd := realDecoder{raw: m.Value}
m.Set = &MessageSet{}
diff --git a/vendor/github.com/Shopify/sarama/mockresponses.go b/vendor/github.com/Shopify/sarama/mockresponses.go
index fe55200..348c223 100644
--- a/vendor/github.com/Shopify/sarama/mockresponses.go
+++ b/vendor/github.com/Shopify/sarama/mockresponses.go
@@ -66,6 +66,69 @@
return res
}
+// MockListGroupsResponse is a `ListGroupsResponse` builder.
+type MockListGroupsResponse struct {
+ // groups maps a group ID to its protocol type (see AddGroup).
+ groups map[string]string
+ t TestReporter
+}
+
+// NewMockListGroupsResponse returns a builder with an empty group map.
+func NewMockListGroupsResponse(t TestReporter) *MockListGroupsResponse {
+ return &MockListGroupsResponse{
+ groups: make(map[string]string),
+ t: t,
+ }
+}
+
+// For returns a ListGroupsResponse carrying every group added so far.
+// reqBody must be a *ListGroupsRequest; its contents are otherwise unused.
+func (m *MockListGroupsResponse) For(reqBody versionedDecoder) encoder {
+ // The unchecked type assertion panics if an unexpected request type arrives.
+ request := reqBody.(*ListGroupsRequest)
+ _ = request
+ response := &ListGroupsResponse{
+ Groups: m.groups,
+ }
+ return response
+}
+
+// AddGroup registers groupID with the given protocol type and returns the
+// builder so calls can be chained.
+func (m *MockListGroupsResponse) AddGroup(groupID, protocolType string) *MockListGroupsResponse {
+ m.groups[groupID] = protocolType
+ return m
+}
+
+// MockDescribeGroupsResponse is a `DescribeGroupsResponse` builder.
+type MockDescribeGroupsResponse struct {
+ // groups maps a group ID to the description returned for it.
+ groups map[string]*GroupDescription
+ t TestReporter
+}
+
+// NewMockDescribeGroupsResponse returns a builder with no group descriptions.
+func NewMockDescribeGroupsResponse(t TestReporter) *MockDescribeGroupsResponse {
+ return &MockDescribeGroupsResponse{
+ t: t,
+ groups: make(map[string]*GroupDescription),
+ }
+}
+
+// AddGroupDescription stores description under groupID and returns the
+// builder so calls can be chained.
+func (m *MockDescribeGroupsResponse) AddGroupDescription(groupID string, description *GroupDescription) *MockDescribeGroupsResponse {
+ m.groups[groupID] = description
+ return m
+}
+
+// For builds a DescribeGroupsResponse with one entry per group named in the
+// request, in request order: the stored description when one was added,
+// otherwise a placeholder whose State is "Dead". reqBody must be a
+// *DescribeGroupsRequest.
+func (m *MockDescribeGroupsResponse) For(reqBody versionedDecoder) encoder {
+ request := reqBody.(*DescribeGroupsRequest)
+
+ response := &DescribeGroupsResponse{}
+ for _, requestedGroup := range request.Groups {
+ if group, ok := m.groups[requestedGroup]; ok {
+ response.Groups = append(response.Groups, group)
+ } else {
+ // Mimic real kafka - if a group doesn't exist, return
+ // an entry with state "Dead"
+ response.Groups = append(response.Groups, &GroupDescription{
+ GroupId: requestedGroup,
+ State: "Dead",
+ })
+ }
+ }
+
+ return response
+}
+
// MockMetadataResponse is a `MetadataResponse` builder.
type MockMetadataResponse struct {
controllerID int32
@@ -111,17 +174,25 @@
for addr, brokerID := range mmr.brokers {
metadataResponse.AddBroker(addr, brokerID)
}
+
+ // Generate set of replicas
+ replicas := []int32{}
+
+ for _, brokerID := range mmr.brokers {
+ replicas = append(replicas, brokerID)
+ }
+
if len(metadataRequest.Topics) == 0 {
for topic, partitions := range mmr.leaders {
for partition, brokerID := range partitions {
- metadataResponse.AddTopicPartition(topic, partition, brokerID, nil, nil, ErrNoError)
+ metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, ErrNoError)
}
}
return metadataResponse
}
for _, topic := range metadataRequest.Topics {
for partition, brokerID := range mmr.leaders[topic] {
- metadataResponse.AddTopicPartition(topic, partition, brokerID, nil, nil, ErrNoError)
+ metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, ErrNoError)
}
}
return metadataResponse
@@ -631,16 +702,32 @@
req := reqBody.(*DescribeConfigsRequest)
res := &DescribeConfigsResponse{}
- var configEntries []*ConfigEntry
- configEntries = append(configEntries, &ConfigEntry{Name: "my_topic",
- Value: "my_topic",
- ReadOnly: true,
- Default: true,
- Sensitive: false,
- })
-
for _, r := range req.Resources {
- res.Resources = append(res.Resources, &ResourceResponse{Name: r.Name, Configs: configEntries})
+ var configEntries []*ConfigEntry
+ switch r.Type {
+ case TopicResource:
+ configEntries = append(configEntries,
+ &ConfigEntry{Name: "max.message.bytes",
+ Value: "1000000",
+ ReadOnly: false,
+ Default: true,
+ Sensitive: false,
+ }, &ConfigEntry{Name: "retention.ms",
+ Value: "5000",
+ ReadOnly: false,
+ Default: false,
+ Sensitive: false,
+ }, &ConfigEntry{Name: "password",
+ Value: "12345",
+ ReadOnly: false,
+ Default: false,
+ Sensitive: true,
+ })
+ res.Resources = append(res.Resources, &ResourceResponse{
+ Name: r.Name,
+ Configs: configEntries,
+ })
+ }
}
return res
}
@@ -706,10 +793,64 @@
return res
}
+// MockSaslAuthenticateResponse is a `SaslAuthenticateResponse` builder.
+type MockSaslAuthenticateResponse struct {
+ t TestReporter
+ // kerror is copied into the response's Err field.
+ kerror KError
+ // saslAuthBytes is copied into the response's SaslAuthBytes field.
+ saslAuthBytes []byte
+}
+
+// NewMockSaslAuthenticateResponse returns a builder with a zero-valued error
+// code and no auth bytes.
+func NewMockSaslAuthenticateResponse(t TestReporter) *MockSaslAuthenticateResponse {
+ return &MockSaslAuthenticateResponse{t: t}
+}
+
+// For returns a SaslAuthenticateResponse populated with the configured error
+// and auth bytes. reqBody is not inspected.
+func (msar *MockSaslAuthenticateResponse) For(reqBody versionedDecoder) encoder {
+ res := &SaslAuthenticateResponse{}
+ res.Err = msar.kerror
+ res.SaslAuthBytes = msar.saslAuthBytes
+ return res
+}
+
+// SetError sets the error code returned in responses and returns the builder
+// so calls can be chained.
+func (msar *MockSaslAuthenticateResponse) SetError(kerror KError) *MockSaslAuthenticateResponse {
+ msar.kerror = kerror
+ return msar
+}
+
+// SetAuthBytes sets the auth bytes returned in responses and returns the
+// builder so calls can be chained. The slice is stored without copying.
+func (msar *MockSaslAuthenticateResponse) SetAuthBytes(saslAuthBytes []byte) *MockSaslAuthenticateResponse {
+ msar.saslAuthBytes = saslAuthBytes
+ return msar
+}
+
type MockDeleteAclsResponse struct {
t TestReporter
}
+// MockSaslHandshakeResponse is a `SaslHandshakeResponse` builder.
+type MockSaslHandshakeResponse struct {
+ // enabledMechanisms is copied into the response's EnabledMechanisms field.
+ enabledMechanisms []string
+ // kerror is copied into the response's Err field.
+ kerror KError
+ t TestReporter
+}
+
+// NewMockSaslHandshakeResponse returns a builder with a zero-valued error
+// code and no enabled mechanisms.
+func NewMockSaslHandshakeResponse(t TestReporter) *MockSaslHandshakeResponse {
+ return &MockSaslHandshakeResponse{t: t}
+}
+
+// For returns a SaslHandshakeResponse populated with the configured error and
+// enabled mechanisms. reqBody is not inspected.
+func (mshr *MockSaslHandshakeResponse) For(reqBody versionedDecoder) encoder {
+ res := &SaslHandshakeResponse{}
+ res.Err = mshr.kerror
+ res.EnabledMechanisms = mshr.enabledMechanisms
+ return res
+}
+
+// SetError sets the error code returned in responses and returns the builder
+// so calls can be chained.
+func (mshr *MockSaslHandshakeResponse) SetError(kerror KError) *MockSaslHandshakeResponse {
+ mshr.kerror = kerror
+ return mshr
+}
+
+// SetEnabledMechanisms sets the mechanism list returned in responses and
+// returns the builder so calls can be chained. The slice is stored without
+// copying.
+func (mshr *MockSaslHandshakeResponse) SetEnabledMechanisms(enabledMechanisms []string) *MockSaslHandshakeResponse {
+ mshr.enabledMechanisms = enabledMechanisms
+ return mshr
+}
+
func NewMockDeleteAclsResponse(t TestReporter) *MockDeleteAclsResponse {
return &MockDeleteAclsResponse{t: t}
}
diff --git a/vendor/github.com/Shopify/sarama/offset_manager.go b/vendor/github.com/Shopify/sarama/offset_manager.go
index 8ea857f..2432f7b 100644
--- a/vendor/github.com/Shopify/sarama/offset_manager.go
+++ b/vendor/github.com/Shopify/sarama/offset_manager.go
@@ -120,6 +120,14 @@
return nil
}
+// computeBackoff returns the delay to wait before retrying an initial offset
+// fetch. If Metadata.Retry.BackoffFunc is set it is called with
+// (retries, Metadata.Retry.Max); otherwise the fixed Metadata.Retry.Backoff
+// is returned.
+//
+// NOTE(review): fetchInitialOffset forwards its own retries argument here and
+// recurses with retries-1, so the value appears to count down, whereas
+// client.computeBackoff passes Max - attemptsRemaining. Confirm which
+// convention BackoffFunc implementations expect.
+func (om *offsetManager) computeBackoff(retries int) time.Duration {
+	backoffFunc := om.conf.Metadata.Retry.BackoffFunc
+	if backoffFunc == nil {
+		return om.conf.Metadata.Retry.Backoff
+	}
+	return backoffFunc(retries, om.conf.Metadata.Retry.Max)
+}
+
func (om *offsetManager) fetchInitialOffset(topic string, partition int32, retries int) (int64, string, error) {
broker, err := om.coordinator()
if err != nil {
@@ -161,10 +169,11 @@
if retries <= 0 {
return 0, "", block.Err
}
+ backoff := om.computeBackoff(retries)
select {
case <-om.closing:
return 0, "", block.Err
- case <-time.After(om.conf.Metadata.Retry.Backoff):
+ case <-time.After(backoff):
}
return om.fetchInitialOffset(topic, partition, retries-1)
default:
diff --git a/vendor/github.com/Shopify/sarama/record_batch.go b/vendor/github.com/Shopify/sarama/record_batch.go
index e0f183f..a36f7e6 100644
--- a/vendor/github.com/Shopify/sarama/record_batch.go
+++ b/vendor/github.com/Shopify/sarama/record_batch.go
@@ -36,6 +36,7 @@
Codec CompressionCodec
CompressionLevel int
Control bool
+ LogAppendTime bool
LastOffsetDelta int32
FirstTimestamp time.Time
MaxTimestamp time.Time
@@ -120,6 +121,7 @@
}
b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
b.Control = attributes&controlMask == controlMask
+ b.LogAppendTime = attributes&timestampTypeMask == timestampTypeMask
if b.LastOffsetDelta, err = pd.getInt32(); err != nil {
return err
@@ -200,6 +202,9 @@
if b.Control {
attr |= controlMask
}
+ if b.LogAppendTime {
+ attr |= timestampTypeMask
+ }
return attr
}
diff --git a/vendor/github.com/Shopify/sarama/request.go b/vendor/github.com/Shopify/sarama/request.go
index 4d211a1..f7eea59 100644
--- a/vendor/github.com/Shopify/sarama/request.go
+++ b/vendor/github.com/Shopify/sarama/request.go
@@ -140,6 +140,8 @@
return &DescribeConfigsRequest{}
case 33:
return &AlterConfigsRequest{}
+ case 36:
+ return &SaslAuthenticateRequest{}
case 37:
return &CreatePartitionsRequest{}
case 42:
diff --git a/vendor/github.com/Shopify/sarama/sasl_handshake_request.go b/vendor/github.com/Shopify/sarama/sasl_handshake_request.go
index fbbc894..fe5ba05 100644
--- a/vendor/github.com/Shopify/sarama/sasl_handshake_request.go
+++ b/vendor/github.com/Shopify/sarama/sasl_handshake_request.go
@@ -2,6 +2,7 @@
type SaslHandshakeRequest struct {
Mechanism string
+ Version int16
}
func (r *SaslHandshakeRequest) encode(pe packetEncoder) error {
@@ -25,7 +26,7 @@
}
func (r *SaslHandshakeRequest) version() int16 {
- return 0
+ return r.Version
}
func (r *SaslHandshakeRequest) requiredVersion() KafkaVersion {