VOL-2112 move to voltha-lib-go
Change-Id: I3435b8acb982deeab6b6ac28e798d7722ad01d0a
diff --git a/vendor/github.com/Shopify/sarama/.travis.yml b/vendor/github.com/Shopify/sarama/.travis.yml
index b7f7874..4331fa1 100644
--- a/vendor/github.com/Shopify/sarama/.travis.yml
+++ b/vendor/github.com/Shopify/sarama/.travis.yml
@@ -1,7 +1,8 @@
+dist: xenial
language: go
go:
-- 1.10.x
- 1.11.x
+- 1.12.x
env:
global:
@@ -11,15 +12,16 @@
- KAFKA_HOSTNAME=localhost
- DEBUG=true
matrix:
- - KAFKA_VERSION=1.1.1 KAFKA_SCALA_VERSION=2.11
- - KAFKA_VERSION=2.0.1 KAFKA_SCALA_VERSION=2.12
- - KAFKA_VERSION=2.1.0 KAFKA_SCALA_VERSION=2.12
+ - KAFKA_VERSION=2.1.1 KAFKA_SCALA_VERSION=2.12
+ - KAFKA_VERSION=2.2.1 KAFKA_SCALA_VERSION=2.12
+ - KAFKA_VERSION=2.3.0 KAFKA_SCALA_VERSION=2.12
before_install:
- export REPOSITORY_ROOT=${TRAVIS_BUILD_DIR}
- vagrant/install_cluster.sh
- vagrant/boot_cluster.sh
- vagrant/create_topics.sh
+- vagrant/run_java_producer.sh
install: make install_dependencies
@@ -27,7 +29,7 @@
- make test
- make vet
- make errcheck
-- if [[ "$TRAVIS_GO_VERSION" == 1.11* ]]; then make fmt; fi
+- if [[ "$TRAVIS_GO_VERSION" == 1.12* ]]; then make fmt; fi
after_success:
- bash <(curl -s https://codecov.io/bash)
diff --git a/vendor/github.com/Shopify/sarama/CHANGELOG.md b/vendor/github.com/Shopify/sarama/CHANGELOG.md
index 0e69c2d..02bd0ff 100644
--- a/vendor/github.com/Shopify/sarama/CHANGELOG.md
+++ b/vendor/github.com/Shopify/sarama/CHANGELOG.md
@@ -1,5 +1,109 @@
# Changelog
+#### Version 1.23.1 (2019-07-22)
+
+Bug Fixes:
+- Fix fetch delete bug record
+ ([1425](https://github.com/Shopify/sarama/pull/1425)).
+- Handle SASL/OAUTHBEARER token rejection
+ ([1428](https://github.com/Shopify/sarama/pull/1428)).
+
+#### Version 1.23.0 (2019-07-02)
+
+New Features:
+- Add support for Kafka 2.3.0
+ ([1418](https://github.com/Shopify/sarama/pull/1418)).
+- Add support for ListConsumerGroupOffsets v2
+ ([1374](https://github.com/Shopify/sarama/pull/1374)).
+- Add support for DeleteConsumerGroup
+ ([1417](https://github.com/Shopify/sarama/pull/1417)).
+- Add support for SASLVersion configuration
+ ([1410](https://github.com/Shopify/sarama/pull/1410)).
+- Add kerberos support
+ ([1366](https://github.com/Shopify/sarama/pull/1366)).
+
+Improvements:
+- Improve sasl_scram_client example
+ ([1406](https://github.com/Shopify/sarama/pull/1406)).
+- Fix shutdown and race-condition in consumer-group example
+ ([1404](https://github.com/Shopify/sarama/pull/1404)).
+- Add support for error codes 77—81
+ ([1397](https://github.com/Shopify/sarama/pull/1397)).
+- Pool internal objects allocated per message
+ ([1385](https://github.com/Shopify/sarama/pull/1385)).
+- Reduce packet decoder allocations
+ ([1373](https://github.com/Shopify/sarama/pull/1373)).
+- Support timeout when fetching metadata
+ ([1359](https://github.com/Shopify/sarama/pull/1359)).
+
+Bug Fixes:
+- Fix fetch size integer overflow
+ ([1376](https://github.com/Shopify/sarama/pull/1376)).
+- Handle and log throttled FetchResponses
+ ([1383](https://github.com/Shopify/sarama/pull/1383)).
+- Refactor misspelled word Resouce to Resource
+ ([1368](https://github.com/Shopify/sarama/pull/1368)).
+
+#### Version 1.22.1 (2019-04-29)
+
+Improvements:
+- Use zstd 1.3.8
+ ([1350](https://github.com/Shopify/sarama/pull/1350)).
+- Add support for SaslHandshakeRequest v1
+ ([1354](https://github.com/Shopify/sarama/pull/1354)).
+
+Bug Fixes:
+- Fix V5 MetadataRequest nullable topics array
+ ([1353](https://github.com/Shopify/sarama/pull/1353)).
+- Use a different SCRAM client for each broker connection
+ ([1349](https://github.com/Shopify/sarama/pull/1349)).
+- Fix AllowAutoTopicCreation for MetadataRequest greater than v3
+ ([1344](https://github.com/Shopify/sarama/pull/1344)).
+
+#### Version 1.22.0 (2019-04-09)
+
+New Features:
+- Add Offline Replicas Operation to Client
+ ([1318](https://github.com/Shopify/sarama/pull/1318)).
+- Allow using proxy when connecting to broker
+ ([1326](https://github.com/Shopify/sarama/pull/1326)).
+- Implement ReadCommitted
+ ([1307](https://github.com/Shopify/sarama/pull/1307)).
+- Add support for Kafka 2.2.0
+ ([1331](https://github.com/Shopify/sarama/pull/1331)).
+- Add SASL SCRAM-SHA-512 and SCRAM-SHA-256 mechanismes
+ ([1331](https://github.com/Shopify/sarama/pull/1295)).
+
+Improvements:
+- Unregister all broker metrics on broker stop
+ ([1232](https://github.com/Shopify/sarama/pull/1232)).
+- Add SCRAM authentication example
+ ([1303](https://github.com/Shopify/sarama/pull/1303)).
+- Add consumergroup examples
+ ([1304](https://github.com/Shopify/sarama/pull/1304)).
+- Expose consumer batch size metric
+ ([1296](https://github.com/Shopify/sarama/pull/1296)).
+- Add TLS options to console producer and consumer
+ ([1300](https://github.com/Shopify/sarama/pull/1300)).
+- Reduce client close bookkeeping
+ ([1297](https://github.com/Shopify/sarama/pull/1297)).
+- Satisfy error interface in create responses
+ ([1154](https://github.com/Shopify/sarama/pull/1154)).
+- Please lint gods
+ ([1346](https://github.com/Shopify/sarama/pull/1346)).
+
+Bug Fixes:
+- Fix multi consumer group instance crash
+ ([1338](https://github.com/Shopify/sarama/pull/1338)).
+- Update lz4 to latest version
+ ([1347](https://github.com/Shopify/sarama/pull/1347)).
+- Retry ErrNotCoordinatorForConsumer in new consumergroup session
+ ([1231](https://github.com/Shopify/sarama/pull/1231)).
+- Fix cleanup error handler
+ ([1332](https://github.com/Shopify/sarama/pull/1332)).
+- Fix rate condition in PartitionConsumer
+ ([1156](https://github.com/Shopify/sarama/pull/1156)).
+
#### Version 1.21.0 (2019-02-24)
New Features:
diff --git a/vendor/github.com/Shopify/sarama/Makefile b/vendor/github.com/Shopify/sarama/Makefile
index 09f743c..360b220 100644
--- a/vendor/github.com/Shopify/sarama/Makefile
+++ b/vendor/github.com/Shopify/sarama/Makefile
@@ -1,11 +1,12 @@
export GO111MODULE=on
-default: fmt vet errcheck test
+default: fmt vet errcheck test lint
# Taken from https://github.com/codecov/example-go#caveat-multiple-files
+.PHONY: test
test:
echo "" > coverage.txt
- for d in `go list ./... | grep -v vendor`; do \
+ for d in `go list ./...`; do \
go test -p 1 -v -timeout 240s -race -coverprofile=profile.out -covermode=atomic $$d || exit 1; \
if [ -f profile.out ]; then \
cat profile.out >> coverage.txt; \
@@ -13,20 +14,39 @@
fi \
done
+GOLINT := $(shell command -v golint)
+
+.PHONY: lint
+lint:
+ifndef GOLINT
+ go get golang.org/x/lint/golint
+endif
+ go list ./... | xargs golint
+
+.PHONY: vet
vet:
go vet ./...
+ERRCHECK := $(shell command -v errcheck)
# See https://github.com/kisielk/errcheck/pull/141 for details on ignorepkg
+.PHONY: errcheck
errcheck:
+ifndef ERRCHECK
+ go get github.com/kisielk/errcheck
+endif
errcheck -ignorepkg fmt github.com/Shopify/sarama/...
+.PHONY: fmt
fmt:
@if [ -n "$$(go fmt ./...)" ]; then echo 'Please run go fmt on your code.' && exit 1; fi
-install_dependencies: install_errcheck get
+.PHONY : install_dependencies
+install_dependencies: get
-install_errcheck:
- go get github.com/kisielk/errcheck
-
+.PHONY: get
get:
- go get -t
+ go get -t -v ./...
+
+.PHONY: clean
+clean:
+ go clean ./...
diff --git a/vendor/github.com/Shopify/sarama/README.md b/vendor/github.com/Shopify/sarama/README.md
index f241b89..4cd736b 100644
--- a/vendor/github.com/Shopify/sarama/README.md
+++ b/vendor/github.com/Shopify/sarama/README.md
@@ -21,7 +21,7 @@
Sarama provides a "2 releases + 2 months" compatibility guarantee: we support
the two latest stable releases of Kafka and Go, and we provide a two month
grace period for older releases. This means we currently officially support
-Go 1.8 through 1.11, and Kafka 1.0 through 2.0, although older releases are
+Go 1.11 through 1.12, and Kafka 2.0 through 2.3, although older releases are
still likely to work.
Sarama follows semantic versioning and provides API stability via the gopkg.in service.
diff --git a/vendor/github.com/Shopify/sarama/acl_bindings.go b/vendor/github.com/Shopify/sarama/acl_bindings.go
index b91c282..50b689d 100644
--- a/vendor/github.com/Shopify/sarama/acl_bindings.go
+++ b/vendor/github.com/Shopify/sarama/acl_bindings.go
@@ -1,9 +1,10 @@
package sarama
+//Resource holds information about acl resource type
type Resource struct {
- ResourceType AclResourceType
- ResourceName string
- ResoucePatternType AclResourcePatternType
+ ResourceType AclResourceType
+ ResourceName string
+ ResourcePatternType AclResourcePatternType
}
func (r *Resource) encode(pe packetEncoder, version int16) error {
@@ -14,11 +15,11 @@
}
if version == 1 {
- if r.ResoucePatternType == AclPatternUnknown {
+ if r.ResourcePatternType == AclPatternUnknown {
Logger.Print("Cannot encode an unknown resource pattern type, using Literal instead")
- r.ResoucePatternType = AclPatternLiteral
+ r.ResourcePatternType = AclPatternLiteral
}
- pe.putInt8(int8(r.ResoucePatternType))
+ pe.putInt8(int8(r.ResourcePatternType))
}
return nil
@@ -39,12 +40,13 @@
if err != nil {
return err
}
- r.ResoucePatternType = AclResourcePatternType(pattern)
+ r.ResourcePatternType = AclResourcePatternType(pattern)
}
return nil
}
+//Acl holds information about acl type
type Acl struct {
Principal string
Host string
@@ -91,6 +93,7 @@
return nil
}
+//ResourceAcls is an acl resource type
type ResourceAcls struct {
Resource
Acls []*Acl
diff --git a/vendor/github.com/Shopify/sarama/acl_create_request.go b/vendor/github.com/Shopify/sarama/acl_create_request.go
index d3d5ad8..da1cdef 100644
--- a/vendor/github.com/Shopify/sarama/acl_create_request.go
+++ b/vendor/github.com/Shopify/sarama/acl_create_request.go
@@ -1,5 +1,6 @@
package sarama
+//CreateAclsRequest is an acl creation request
type CreateAclsRequest struct {
Version int16
AclCreations []*AclCreation
@@ -38,16 +39,16 @@
return nil
}
-func (d *CreateAclsRequest) key() int16 {
+func (c *CreateAclsRequest) key() int16 {
return 30
}
-func (d *CreateAclsRequest) version() int16 {
- return d.Version
+func (c *CreateAclsRequest) version() int16 {
+ return c.Version
}
-func (d *CreateAclsRequest) requiredVersion() KafkaVersion {
- switch d.Version {
+func (c *CreateAclsRequest) requiredVersion() KafkaVersion {
+ switch c.Version {
case 1:
return V2_0_0_0
default:
@@ -55,6 +56,7 @@
}
}
+//AclCreation is a wrapper around Resource and Acl type
type AclCreation struct {
Resource
Acl
diff --git a/vendor/github.com/Shopify/sarama/acl_create_response.go b/vendor/github.com/Shopify/sarama/acl_create_response.go
index 8a56f35..f5a5e9a 100644
--- a/vendor/github.com/Shopify/sarama/acl_create_response.go
+++ b/vendor/github.com/Shopify/sarama/acl_create_response.go
@@ -2,6 +2,7 @@
import "time"
+//CreateAclsResponse is a an acl reponse creation type
type CreateAclsResponse struct {
ThrottleTime time.Duration
AclCreationResponses []*AclCreationResponse
@@ -46,18 +47,19 @@
return nil
}
-func (d *CreateAclsResponse) key() int16 {
+func (c *CreateAclsResponse) key() int16 {
return 30
}
-func (d *CreateAclsResponse) version() int16 {
+func (c *CreateAclsResponse) version() int16 {
return 0
}
-func (d *CreateAclsResponse) requiredVersion() KafkaVersion {
+func (c *CreateAclsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}
+//AclCreationResponse is an acl creation response type
type AclCreationResponse struct {
Err KError
ErrMsg *string
diff --git a/vendor/github.com/Shopify/sarama/acl_delete_request.go b/vendor/github.com/Shopify/sarama/acl_delete_request.go
index 5e94ad7..15908ea 100644
--- a/vendor/github.com/Shopify/sarama/acl_delete_request.go
+++ b/vendor/github.com/Shopify/sarama/acl_delete_request.go
@@ -1,5 +1,6 @@
package sarama
+//DeleteAclsRequest is a delete acl request
type DeleteAclsRequest struct {
Version int
Filters []*AclFilter
diff --git a/vendor/github.com/Shopify/sarama/acl_delete_response.go b/vendor/github.com/Shopify/sarama/acl_delete_response.go
index a885fe5..6529565 100644
--- a/vendor/github.com/Shopify/sarama/acl_delete_response.go
+++ b/vendor/github.com/Shopify/sarama/acl_delete_response.go
@@ -2,21 +2,22 @@
import "time"
+//DeleteAclsResponse is a delete acl response
type DeleteAclsResponse struct {
Version int16
ThrottleTime time.Duration
FilterResponses []*FilterResponse
}
-func (a *DeleteAclsResponse) encode(pe packetEncoder) error {
- pe.putInt32(int32(a.ThrottleTime / time.Millisecond))
+func (d *DeleteAclsResponse) encode(pe packetEncoder) error {
+ pe.putInt32(int32(d.ThrottleTime / time.Millisecond))
- if err := pe.putArrayLength(len(a.FilterResponses)); err != nil {
+ if err := pe.putArrayLength(len(d.FilterResponses)); err != nil {
return err
}
- for _, filterResponse := range a.FilterResponses {
- if err := filterResponse.encode(pe, a.Version); err != nil {
+ for _, filterResponse := range d.FilterResponses {
+ if err := filterResponse.encode(pe, d.Version); err != nil {
return err
}
}
@@ -24,22 +25,22 @@
return nil
}
-func (a *DeleteAclsResponse) decode(pd packetDecoder, version int16) (err error) {
+func (d *DeleteAclsResponse) decode(pd packetDecoder, version int16) (err error) {
throttleTime, err := pd.getInt32()
if err != nil {
return err
}
- a.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
+ d.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
n, err := pd.getArrayLength()
if err != nil {
return err
}
- a.FilterResponses = make([]*FilterResponse, n)
+ d.FilterResponses = make([]*FilterResponse, n)
for i := 0; i < n; i++ {
- a.FilterResponses[i] = new(FilterResponse)
- if err := a.FilterResponses[i].decode(pd, version); err != nil {
+ d.FilterResponses[i] = new(FilterResponse)
+ if err := d.FilterResponses[i].decode(pd, version); err != nil {
return err
}
}
@@ -59,6 +60,7 @@
return V0_11_0_0
}
+//FilterResponse is a filter response type
type FilterResponse struct {
Err KError
ErrMsg *string
@@ -109,6 +111,7 @@
return nil
}
+//MatchingAcl is a matching acl type
type MatchingAcl struct {
Err KError
ErrMsg *string
diff --git a/vendor/github.com/Shopify/sarama/acl_describe_request.go b/vendor/github.com/Shopify/sarama/acl_describe_request.go
index 3c95320..5222d46 100644
--- a/vendor/github.com/Shopify/sarama/acl_describe_request.go
+++ b/vendor/github.com/Shopify/sarama/acl_describe_request.go
@@ -1,5 +1,6 @@
package sarama
+//DescribeAclsRequest is a secribe acl request type
type DescribeAclsRequest struct {
Version int
AclFilter
diff --git a/vendor/github.com/Shopify/sarama/acl_describe_response.go b/vendor/github.com/Shopify/sarama/acl_describe_response.go
index db3a88c..12126e5 100644
--- a/vendor/github.com/Shopify/sarama/acl_describe_response.go
+++ b/vendor/github.com/Shopify/sarama/acl_describe_response.go
@@ -2,6 +2,7 @@
import "time"
+//DescribeAclsResponse is a describe acl response type
type DescribeAclsResponse struct {
Version int16
ThrottleTime time.Duration
diff --git a/vendor/github.com/Shopify/sarama/acl_types.go b/vendor/github.com/Shopify/sarama/acl_types.go
index 72b7985..c10ad7b 100644
--- a/vendor/github.com/Shopify/sarama/acl_types.go
+++ b/vendor/github.com/Shopify/sarama/acl_types.go
@@ -1,50 +1,51 @@
package sarama
-type AclOperation int
+type (
+ AclOperation int
+
+ AclPermissionType int
+
+ AclResourceType int
+
+ AclResourcePatternType int
+)
// ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java
const (
- AclOperationUnknown AclOperation = 0
- AclOperationAny AclOperation = 1
- AclOperationAll AclOperation = 2
- AclOperationRead AclOperation = 3
- AclOperationWrite AclOperation = 4
- AclOperationCreate AclOperation = 5
- AclOperationDelete AclOperation = 6
- AclOperationAlter AclOperation = 7
- AclOperationDescribe AclOperation = 8
- AclOperationClusterAction AclOperation = 9
- AclOperationDescribeConfigs AclOperation = 10
- AclOperationAlterConfigs AclOperation = 11
- AclOperationIdempotentWrite AclOperation = 12
+ AclOperationUnknown AclOperation = iota
+ AclOperationAny
+ AclOperationAll
+ AclOperationRead
+ AclOperationWrite
+ AclOperationCreate
+ AclOperationDelete
+ AclOperationAlter
+ AclOperationDescribe
+ AclOperationClusterAction
+ AclOperationDescribeConfigs
+ AclOperationAlterConfigs
+ AclOperationIdempotentWrite
)
-type AclPermissionType int
-
// ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java
const (
- AclPermissionUnknown AclPermissionType = 0
- AclPermissionAny AclPermissionType = 1
- AclPermissionDeny AclPermissionType = 2
- AclPermissionAllow AclPermissionType = 3
+ AclPermissionUnknown AclPermissionType = iota
+ AclPermissionAny
+ AclPermissionDeny
+ AclPermissionAllow
)
-type AclResourceType int
-
// ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java
const (
- AclResourceUnknown AclResourceType = 0
- AclResourceAny AclResourceType = 1
- AclResourceTopic AclResourceType = 2
- AclResourceGroup AclResourceType = 3
- AclResourceCluster AclResourceType = 4
- AclResourceTransactionalID AclResourceType = 5
+ AclResourceUnknown AclResourceType = iota
+ AclResourceAny
+ AclResourceTopic
+ AclResourceGroup
+ AclResourceCluster
+ AclResourceTransactionalID
)
-type AclResourcePatternType int
-
// ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/PatternType.java
-
const (
AclPatternUnknown AclResourcePatternType = iota
AclPatternAny
diff --git a/vendor/github.com/Shopify/sarama/add_offsets_to_txn_request.go b/vendor/github.com/Shopify/sarama/add_offsets_to_txn_request.go
index 6da166c..fc227ab 100644
--- a/vendor/github.com/Shopify/sarama/add_offsets_to_txn_request.go
+++ b/vendor/github.com/Shopify/sarama/add_offsets_to_txn_request.go
@@ -1,5 +1,6 @@
package sarama
+//AddOffsetsToTxnRequest adds offsets to a transaction request
type AddOffsetsToTxnRequest struct {
TransactionalID string
ProducerID int64
diff --git a/vendor/github.com/Shopify/sarama/add_offsets_to_txn_response.go b/vendor/github.com/Shopify/sarama/add_offsets_to_txn_response.go
index 3a46151..c88c1f8 100644
--- a/vendor/github.com/Shopify/sarama/add_offsets_to_txn_response.go
+++ b/vendor/github.com/Shopify/sarama/add_offsets_to_txn_response.go
@@ -4,6 +4,7 @@
"time"
)
+//AddOffsetsToTxnResponse is a response type for adding offsets to txns
type AddOffsetsToTxnResponse struct {
ThrottleTime time.Duration
Err KError
diff --git a/vendor/github.com/Shopify/sarama/add_partitions_to_txn_request.go b/vendor/github.com/Shopify/sarama/add_partitions_to_txn_request.go
index a8a5922..8d4b42e 100644
--- a/vendor/github.com/Shopify/sarama/add_partitions_to_txn_request.go
+++ b/vendor/github.com/Shopify/sarama/add_partitions_to_txn_request.go
@@ -1,5 +1,6 @@
package sarama
+//AddPartitionsToTxnRequest is a add paartition request
type AddPartitionsToTxnRequest struct {
TransactionalID string
ProducerID int64
diff --git a/vendor/github.com/Shopify/sarama/add_partitions_to_txn_response.go b/vendor/github.com/Shopify/sarama/add_partitions_to_txn_response.go
index 581c556..eb4f23e 100644
--- a/vendor/github.com/Shopify/sarama/add_partitions_to_txn_response.go
+++ b/vendor/github.com/Shopify/sarama/add_partitions_to_txn_response.go
@@ -4,6 +4,7 @@
"time"
)
+//AddPartitionsToTxnResponse is a partition errors to transaction type
type AddPartitionsToTxnResponse struct {
ThrottleTime time.Duration
Errors map[string][]*PartitionError
@@ -82,6 +83,7 @@
return V0_11_0_0
}
+//PartitionError is a partition error type
type PartitionError struct {
Partition int32
Err KError
diff --git a/vendor/github.com/Shopify/sarama/admin.go b/vendor/github.com/Shopify/sarama/admin.go
index 18b055a..1db6a0e 100644
--- a/vendor/github.com/Shopify/sarama/admin.go
+++ b/vendor/github.com/Shopify/sarama/admin.go
@@ -20,7 +20,7 @@
// List the topics available in the cluster with the default options.
ListTopics() (map[string]TopicDetail, error)
- // Describe some topics in the cluster
+ // Describe some topics in the cluster.
DescribeTopics(topics []string) (metadata []*TopicMetadata, err error)
// Delete a topic. It may take several seconds after the DeleteTopic to returns success
@@ -78,12 +78,15 @@
// List the consumer groups available in the cluster.
ListConsumerGroups() (map[string]string, error)
- // Describe the given consumer group
+ // Describe the given consumer groups.
DescribeConsumerGroups(groups []string) ([]*GroupDescription, error)
// List the consumer group offsets available in the cluster.
ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)
+ // Delete a consumer group.
+ DeleteConsumerGroup(group string) error
+
// Get information about the nodes in the cluster
DescribeCluster() (brokers []*Broker, controllerID int32, err error)
@@ -131,7 +134,7 @@
}
if detail == nil {
- return errors.New("You must specify topic details")
+ return errors.New("you must specify topic details")
}
topicDetails := make(map[string]*TopicDetail)
@@ -166,7 +169,7 @@
}
if topicErr.Err != ErrNoError {
- return topicErr.Err
+ return topicErr
}
return nil
@@ -183,7 +186,9 @@
AllowAutoTopicCreation: false,
}
- if ca.conf.Version.IsAtLeast(V0_11_0_0) {
+ if ca.conf.Version.IsAtLeast(V1_0_0_0) {
+ request.Version = 5
+ } else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
request.Version = 4
}
@@ -358,7 +363,7 @@
}
if topicErr.Err != ErrNoError {
- return topicErr.Err
+ return topicErr
}
return nil
@@ -369,29 +374,50 @@
if topic == "" {
return ErrInvalidTopic
}
-
- topics := make(map[string]*DeleteRecordsRequestTopic)
- topics[topic] = &DeleteRecordsRequestTopic{PartitionOffsets: partitionOffsets}
- request := &DeleteRecordsRequest{
- Topics: topics,
- Timeout: ca.conf.Admin.Timeout,
+ partitionPerBroker := make(map[*Broker][]int32)
+ for partition := range partitionOffsets {
+ broker, err := ca.client.Leader(topic, partition)
+ if err != nil {
+ return err
+ }
+ if _, ok := partitionPerBroker[broker]; ok {
+ partitionPerBroker[broker] = append(partitionPerBroker[broker], partition)
+ } else {
+ partitionPerBroker[broker] = []int32{partition}
+ }
}
+ errs := make([]error, 0)
+ for broker, partitions := range partitionPerBroker {
+ topics := make(map[string]*DeleteRecordsRequestTopic)
+ recordsToDelete := make(map[int32]int64)
+ for _, p := range partitions {
+ recordsToDelete[p] = partitionOffsets[p]
+ }
+ topics[topic] = &DeleteRecordsRequestTopic{PartitionOffsets: recordsToDelete}
+ request := &DeleteRecordsRequest{
+ Topics: topics,
+ Timeout: ca.conf.Admin.Timeout,
+ }
- b, err := ca.Controller()
- if err != nil {
- return err
+ rsp, err := broker.DeleteRecords(request)
+ if err != nil {
+ errs = append(errs, err)
+ } else {
+ deleteRecordsResponseTopic, ok := rsp.Topics[topic]
+ if !ok {
+ errs = append(errs, ErrIncompleteResponse)
+ } else {
+ for _, deleteRecordsResponsePartition := range deleteRecordsResponseTopic.Partitions {
+ if deleteRecordsResponsePartition.Err != ErrNoError {
+ errs = append(errs, errors.New(deleteRecordsResponsePartition.Err.Error()))
+ }
+ }
+ }
+ }
}
-
- rsp, err := b.DeleteRecords(request)
- if err != nil {
- return err
+ if len(errs) > 0 {
+ return ErrDeleteRecords{MultiError{&errs}}
}
-
- _, ok := rsp.Topics[topic]
- if !ok {
- return ErrIncompleteResponse
- }
-
//todo since we are dealing with couple of partitions it would be good if we return slice of errors
//for each partition instead of one error
return nil
@@ -606,9 +632,38 @@
partitions: topicPartitions,
}
- if ca.conf.Version.IsAtLeast(V0_8_2_2) {
+ if ca.conf.Version.IsAtLeast(V0_10_2_0) {
+ request.Version = 2
+ } else if ca.conf.Version.IsAtLeast(V0_8_2_2) {
request.Version = 1
}
return coordinator.FetchOffset(request)
}
+
+func (ca *clusterAdmin) DeleteConsumerGroup(group string) error {
+ coordinator, err := ca.client.Coordinator(group)
+ if err != nil {
+ return err
+ }
+
+ request := &DeleteGroupsRequest{
+ Groups: []string{group},
+ }
+
+ resp, err := coordinator.DeleteGroups(request)
+ if err != nil {
+ return err
+ }
+
+ groupErr, ok := resp.GroupErrorCodes[group]
+ if !ok {
+ return ErrIncompleteResponse
+ }
+
+ if groupErr != ErrNoError {
+ return groupErr
+ }
+
+ return nil
+}
diff --git a/vendor/github.com/Shopify/sarama/alter_configs_request.go b/vendor/github.com/Shopify/sarama/alter_configs_request.go
index 48c44ea..26c275b 100644
--- a/vendor/github.com/Shopify/sarama/alter_configs_request.go
+++ b/vendor/github.com/Shopify/sarama/alter_configs_request.go
@@ -1,45 +1,47 @@
package sarama
+//AlterConfigsRequest is an alter config request type
type AlterConfigsRequest struct {
Resources []*AlterConfigsResource
ValidateOnly bool
}
+//AlterConfigsResource is an alter config resource type
type AlterConfigsResource struct {
Type ConfigResourceType
Name string
ConfigEntries map[string]*string
}
-func (acr *AlterConfigsRequest) encode(pe packetEncoder) error {
- if err := pe.putArrayLength(len(acr.Resources)); err != nil {
+func (a *AlterConfigsRequest) encode(pe packetEncoder) error {
+ if err := pe.putArrayLength(len(a.Resources)); err != nil {
return err
}
- for _, r := range acr.Resources {
+ for _, r := range a.Resources {
if err := r.encode(pe); err != nil {
return err
}
}
- pe.putBool(acr.ValidateOnly)
+ pe.putBool(a.ValidateOnly)
return nil
}
-func (acr *AlterConfigsRequest) decode(pd packetDecoder, version int16) error {
+func (a *AlterConfigsRequest) decode(pd packetDecoder, version int16) error {
resourceCount, err := pd.getArrayLength()
if err != nil {
return err
}
- acr.Resources = make([]*AlterConfigsResource, resourceCount)
- for i := range acr.Resources {
+ a.Resources = make([]*AlterConfigsResource, resourceCount)
+ for i := range a.Resources {
r := &AlterConfigsResource{}
err = r.decode(pd, version)
if err != nil {
return err
}
- acr.Resources[i] = r
+ a.Resources[i] = r
}
validateOnly, err := pd.getBool()
@@ -47,22 +49,22 @@
return err
}
- acr.ValidateOnly = validateOnly
+ a.ValidateOnly = validateOnly
return nil
}
-func (ac *AlterConfigsResource) encode(pe packetEncoder) error {
- pe.putInt8(int8(ac.Type))
+func (a *AlterConfigsResource) encode(pe packetEncoder) error {
+ pe.putInt8(int8(a.Type))
- if err := pe.putString(ac.Name); err != nil {
+ if err := pe.putString(a.Name); err != nil {
return err
}
- if err := pe.putArrayLength(len(ac.ConfigEntries)); err != nil {
+ if err := pe.putArrayLength(len(a.ConfigEntries)); err != nil {
return err
}
- for configKey, configValue := range ac.ConfigEntries {
+ for configKey, configValue := range a.ConfigEntries {
if err := pe.putString(configKey); err != nil {
return err
}
@@ -74,18 +76,18 @@
return nil
}
-func (ac *AlterConfigsResource) decode(pd packetDecoder, version int16) error {
+func (a *AlterConfigsResource) decode(pd packetDecoder, version int16) error {
t, err := pd.getInt8()
if err != nil {
return err
}
- ac.Type = ConfigResourceType(t)
+ a.Type = ConfigResourceType(t)
name, err := pd.getString()
if err != nil {
return err
}
- ac.Name = name
+ a.Name = name
n, err := pd.getArrayLength()
if err != nil {
@@ -93,13 +95,13 @@
}
if n > 0 {
- ac.ConfigEntries = make(map[string]*string, n)
+ a.ConfigEntries = make(map[string]*string, n)
for i := 0; i < n; i++ {
configKey, err := pd.getString()
if err != nil {
return err
}
- if ac.ConfigEntries[configKey], err = pd.getNullableString(); err != nil {
+ if a.ConfigEntries[configKey], err = pd.getNullableString(); err != nil {
return err
}
}
@@ -107,14 +109,14 @@
return err
}
-func (acr *AlterConfigsRequest) key() int16 {
+func (a *AlterConfigsRequest) key() int16 {
return 33
}
-func (acr *AlterConfigsRequest) version() int16 {
+func (a *AlterConfigsRequest) version() int16 {
return 0
}
-func (acr *AlterConfigsRequest) requiredVersion() KafkaVersion {
+func (a *AlterConfigsRequest) requiredVersion() KafkaVersion {
return V0_11_0_0
}
diff --git a/vendor/github.com/Shopify/sarama/alter_configs_response.go b/vendor/github.com/Shopify/sarama/alter_configs_response.go
index 29b09e1..3893663 100644
--- a/vendor/github.com/Shopify/sarama/alter_configs_response.go
+++ b/vendor/github.com/Shopify/sarama/alter_configs_response.go
@@ -2,11 +2,13 @@
import "time"
+//AlterConfigsResponse is a reponse type for alter config
type AlterConfigsResponse struct {
ThrottleTime time.Duration
Resources []*AlterConfigsResourceResponse
}
+//AlterConfigsResourceResponse is a reponse type for alter config resource
type AlterConfigsResourceResponse struct {
ErrorCode int16
ErrorMsg string
@@ -14,21 +16,21 @@
Name string
}
-func (ct *AlterConfigsResponse) encode(pe packetEncoder) error {
- pe.putInt32(int32(ct.ThrottleTime / time.Millisecond))
+func (a *AlterConfigsResponse) encode(pe packetEncoder) error {
+ pe.putInt32(int32(a.ThrottleTime / time.Millisecond))
- if err := pe.putArrayLength(len(ct.Resources)); err != nil {
+ if err := pe.putArrayLength(len(a.Resources)); err != nil {
return err
}
- for i := range ct.Resources {
- pe.putInt16(ct.Resources[i].ErrorCode)
- err := pe.putString(ct.Resources[i].ErrorMsg)
+ for i := range a.Resources {
+ pe.putInt16(a.Resources[i].ErrorCode)
+ err := pe.putString(a.Resources[i].ErrorMsg)
if err != nil {
return nil
}
- pe.putInt8(int8(ct.Resources[i].Type))
- err = pe.putString(ct.Resources[i].Name)
+ pe.putInt8(int8(a.Resources[i].Type))
+ err = pe.putString(a.Resources[i].Name)
if err != nil {
return nil
}
@@ -37,59 +39,59 @@
return nil
}
-func (acr *AlterConfigsResponse) decode(pd packetDecoder, version int16) error {
+func (a *AlterConfigsResponse) decode(pd packetDecoder, version int16) error {
throttleTime, err := pd.getInt32()
if err != nil {
return err
}
- acr.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
+ a.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
responseCount, err := pd.getArrayLength()
if err != nil {
return err
}
- acr.Resources = make([]*AlterConfigsResourceResponse, responseCount)
+ a.Resources = make([]*AlterConfigsResourceResponse, responseCount)
- for i := range acr.Resources {
- acr.Resources[i] = new(AlterConfigsResourceResponse)
+ for i := range a.Resources {
+ a.Resources[i] = new(AlterConfigsResourceResponse)
errCode, err := pd.getInt16()
if err != nil {
return err
}
- acr.Resources[i].ErrorCode = errCode
+ a.Resources[i].ErrorCode = errCode
e, err := pd.getString()
if err != nil {
return err
}
- acr.Resources[i].ErrorMsg = e
+ a.Resources[i].ErrorMsg = e
t, err := pd.getInt8()
if err != nil {
return err
}
- acr.Resources[i].Type = ConfigResourceType(t)
+ a.Resources[i].Type = ConfigResourceType(t)
name, err := pd.getString()
if err != nil {
return err
}
- acr.Resources[i].Name = name
+ a.Resources[i].Name = name
}
return nil
}
-func (r *AlterConfigsResponse) key() int16 {
+func (a *AlterConfigsResponse) key() int16 {
return 32
}
-func (r *AlterConfigsResponse) version() int16 {
+func (a *AlterConfigsResponse) version() int16 {
return 0
}
-func (r *AlterConfigsResponse) requiredVersion() KafkaVersion {
+func (a *AlterConfigsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}
diff --git a/vendor/github.com/Shopify/sarama/api_versions_request.go b/vendor/github.com/Shopify/sarama/api_versions_request.go
index ab65f01..b33167c 100644
--- a/vendor/github.com/Shopify/sarama/api_versions_request.go
+++ b/vendor/github.com/Shopify/sarama/api_versions_request.go
@@ -1,24 +1,25 @@
package sarama
+//ApiVersionsRequest ...
type ApiVersionsRequest struct {
}
-func (r *ApiVersionsRequest) encode(pe packetEncoder) error {
+func (a *ApiVersionsRequest) encode(pe packetEncoder) error {
return nil
}
-func (r *ApiVersionsRequest) decode(pd packetDecoder, version int16) (err error) {
+func (a *ApiVersionsRequest) decode(pd packetDecoder, version int16) (err error) {
return nil
}
-func (r *ApiVersionsRequest) key() int16 {
+func (a *ApiVersionsRequest) key() int16 {
return 18
}
-func (r *ApiVersionsRequest) version() int16 {
+func (a *ApiVersionsRequest) version() int16 {
return 0
}
-func (r *ApiVersionsRequest) requiredVersion() KafkaVersion {
+func (a *ApiVersionsRequest) requiredVersion() KafkaVersion {
return V0_10_0_0
}
diff --git a/vendor/github.com/Shopify/sarama/api_versions_response.go b/vendor/github.com/Shopify/sarama/api_versions_response.go
index 23bc326..bb1f0b3 100644
--- a/vendor/github.com/Shopify/sarama/api_versions_response.go
+++ b/vendor/github.com/Shopify/sarama/api_versions_response.go
@@ -1,5 +1,6 @@
package sarama
+//ApiVersionsResponseBlock is an api version reponse block type
type ApiVersionsResponseBlock struct {
ApiKey int16
MinVersion int16
@@ -31,6 +32,7 @@
return nil
}
+//ApiVersionsResponse is an api version response type
type ApiVersionsResponse struct {
Err KError
ApiVersions []*ApiVersionsResponseBlock
diff --git a/vendor/github.com/Shopify/sarama/async_producer.go b/vendor/github.com/Shopify/sarama/async_producer.go
index 5174a35..11e0849 100644
--- a/vendor/github.com/Shopify/sarama/async_producer.go
+++ b/vendor/github.com/Shopify/sarama/async_producer.go
@@ -92,9 +92,8 @@
}
type asyncProducer struct {
- client Client
- conf *Config
- ownClient bool
+ client Client
+ conf *Config
errors chan *ProducerError
input, successes, retries chan *ProducerMessage
@@ -113,18 +112,19 @@
if err != nil {
return nil, err
}
-
- p, err := NewAsyncProducerFromClient(client)
- if err != nil {
- return nil, err
- }
- p.(*asyncProducer).ownClient = true
- return p, nil
+ return newAsyncProducer(client)
}
// NewAsyncProducerFromClient creates a new Producer using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this producer.
func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
+ // For clients passed in by the client, ensure we don't
+ // call Close() on it.
+ cli := &nopCloserClient{client}
+ return newAsyncProducer(cli)
+}
+
+func newAsyncProducer(client Client) (AsyncProducer, error) {
// Check that we are not dealing with a closed Client before processing any other arguments
if client.Closed() {
return nil, ErrClosedClient
@@ -191,10 +191,17 @@
// Partition is the partition that the message was sent to. This is only
// guaranteed to be defined if the message was successfully delivered.
Partition int32
- // Timestamp is the timestamp assigned to the message by the broker. This
- // is only guaranteed to be defined if the message was successfully
- // delivered, RequiredAcks is not NoResponse, and the Kafka broker is at
- // least version 0.10.0.
+ // Timestamp can vary in behaviour depending on broker configuration, being
+ // in either one of the CreateTime or LogAppendTime modes (default CreateTime),
+ // and requiring version at least 0.10.0.
+ //
+ // When configured to CreateTime, the timestamp is specified by the producer
+ // either by explicitly setting this field, or when the message is added
+ // to a produce set.
+ //
+ // When configured to LogAppendTime, the timestamp assigned to the message
+ // by the broker. This is only guaranteed to be defined if the message was
+ // successfully delivered and RequiredAcks is not NoResponse.
Timestamp time.Time
retries int
@@ -999,11 +1006,9 @@
p.inFlight.Wait()
- if p.ownClient {
- err := p.client.Close()
- if err != nil {
- Logger.Println("producer/shutdown failed to close the embedded client:", err)
- }
+ err := p.client.Close()
+ if err != nil {
+ Logger.Println("producer/shutdown failed to close the embedded client:", err)
}
close(p.input)
diff --git a/vendor/github.com/Shopify/sarama/balance_strategy.go b/vendor/github.com/Shopify/sarama/balance_strategy.go
index e78988d..2fce17f 100644
--- a/vendor/github.com/Shopify/sarama/balance_strategy.go
+++ b/vendor/github.com/Shopify/sarama/balance_strategy.go
@@ -24,7 +24,7 @@
// --------------------------------------------------------------------
// BalanceStrategy is used to balance topics and partitions
-// across memebers of a consumer group
+// across members of a consumer group
type BalanceStrategy interface {
// Name uniquely identifies the strategy.
Name() string
@@ -78,7 +78,7 @@
// Name implements BalanceStrategy.
func (s *balanceStrategy) Name() string { return s.name }
-// Balance implements BalanceStrategy.
+// Plan implements BalanceStrategy.
func (s *balanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
// Build members by topic map
mbt := make(map[string][]string)
diff --git a/vendor/github.com/Shopify/sarama/broker.go b/vendor/github.com/Shopify/sarama/broker.go
index 9129089..9c3e5a0 100644
--- a/vendor/github.com/Shopify/sarama/broker.go
+++ b/vendor/github.com/Shopify/sarama/broker.go
@@ -13,24 +13,25 @@
"sync/atomic"
"time"
- "github.com/rcrowley/go-metrics"
+ metrics "github.com/rcrowley/go-metrics"
)
// Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
type Broker struct {
- id int32
- addr string
+ conf *Config
rack *string
- conf *Config
+ id int32
+ addr string
correlationID int32
conn net.Conn
connErr error
lock sync.Mutex
opened int32
+ responses chan responsePromise
+ done chan bool
- responses chan responsePromise
- done chan bool
+ registeredMetrics []string
incomingByteRate metrics.Meter
requestRate metrics.Meter
@@ -46,6 +47,8 @@
brokerOutgoingByteRate metrics.Meter
brokerResponseRate metrics.Meter
brokerResponseSize metrics.Histogram
+
+ kerberosAuthenticator GSSAPIKerberosAuth
}
// SASLMechanism specifies the SASL mechanism the client uses to authenticate with the broker
@@ -56,6 +59,11 @@
SASLTypeOAuth = "OAUTHBEARER"
// SASLTypePlaintext represents the SASL/PLAIN mechanism
SASLTypePlaintext = "PLAIN"
+ // SASLTypeSCRAMSHA256 represents the SCRAM-SHA-256 mechanism.
+ SASLTypeSCRAMSHA256 = "SCRAM-SHA-256"
+ // SASLTypeSCRAMSHA512 represents the SCRAM-SHA-512 mechanism.
+ SASLTypeSCRAMSHA512 = "SCRAM-SHA-512"
+ SASLTypeGSSAPI = "GSSAPI"
// SASLHandshakeV0 is v0 of the Kafka SASL handshake protocol. Client and
// server negotiate SASL auth using opaque packets.
SASLHandshakeV0 = int16(0)
@@ -92,6 +100,20 @@
Token() (*AccessToken, error)
}
+// SCRAMClient is a an interface to a SCRAM
+// client implementation.
+type SCRAMClient interface {
+ // Begin prepares the client for the SCRAM exchange
+ // with the server with a user name and a password
+ Begin(userName, password, authzID string) error
+ // Step steps client through the SCRAM exchange. It is
+ // called repeatedly until it errors or `Done` returns true.
+ Step(challenge string) (response string, err error)
+ // Done should return true when the SCRAM conversation
+ // is over.
+ Done() bool
+}
+
type responsePromise struct {
requestTime time.Time
correlationID int32
@@ -137,6 +159,8 @@
if conf.Net.TLS.Enable {
b.conn, b.connErr = tls.DialWithDialer(&dialer, "tcp", b.addr, conf.Net.TLS.Config)
+ } else if conf.Net.Proxy.Enable {
+ b.conn, b.connErr = conf.Net.Proxy.Dialer.Dial("tcp", b.addr)
} else {
b.conn, b.connErr = dialer.Dial("tcp", b.addr)
}
@@ -161,13 +185,7 @@
// Do not gather metrics for seeded broker (only used during bootstrap) because they share
// the same id (-1) and are already exposed through the global metrics above
if b.id >= 0 {
- b.brokerIncomingByteRate = getOrRegisterBrokerMeter("incoming-byte-rate", b, conf.MetricRegistry)
- b.brokerRequestRate = getOrRegisterBrokerMeter("request-rate", b, conf.MetricRegistry)
- b.brokerRequestSize = getOrRegisterBrokerHistogram("request-size", b, conf.MetricRegistry)
- b.brokerRequestLatency = getOrRegisterBrokerHistogram("request-latency-in-ms", b, conf.MetricRegistry)
- b.brokerOutgoingByteRate = getOrRegisterBrokerMeter("outgoing-byte-rate", b, conf.MetricRegistry)
- b.brokerResponseRate = getOrRegisterBrokerMeter("response-rate", b, conf.MetricRegistry)
- b.brokerResponseSize = getOrRegisterBrokerHistogram("response-size", b, conf.MetricRegistry)
+ b.registerMetrics()
}
if conf.Net.SASL.Enable {
@@ -210,6 +228,7 @@
return b.conn != nil, b.connErr
}
+//Close closes the broker resources
func (b *Broker) Close() error {
b.lock.Lock()
defer b.lock.Unlock()
@@ -228,12 +247,7 @@
b.done = nil
b.responses = nil
- if b.id >= 0 {
- b.conf.MetricRegistry.Unregister(getMetricNameForBroker("incoming-byte-rate", b))
- b.conf.MetricRegistry.Unregister(getMetricNameForBroker("request-rate", b))
- b.conf.MetricRegistry.Unregister(getMetricNameForBroker("outgoing-byte-rate", b))
- b.conf.MetricRegistry.Unregister(getMetricNameForBroker("response-rate", b))
- }
+ b.unregisterMetrics()
if err == nil {
Logger.Printf("Closed connection to broker %s\n", b.addr)
@@ -267,6 +281,7 @@
return *b.rack
}
+//GetMetadata send a metadata request and returns a metadata response or error
func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
response := new(MetadataResponse)
@@ -279,6 +294,7 @@
return response, nil
}
+//GetConsumerMetadata send a consumer metadata request and returns a consumer metadata response or error
func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
response := new(ConsumerMetadataResponse)
@@ -291,6 +307,7 @@
return response, nil
}
+//FindCoordinator sends a find coordinate request and returns a response or error
func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
response := new(FindCoordinatorResponse)
@@ -303,6 +320,7 @@
return response, nil
}
+//GetAvailableOffsets return an offset response or error
func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
response := new(OffsetResponse)
@@ -315,9 +333,12 @@
return response, nil
}
+//Produce returns a produce response or error
func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
- var response *ProduceResponse
- var err error
+ var (
+ response *ProduceResponse
+ err error
+ )
if request.RequiredAcks == NoResponse {
err = b.sendAndReceive(request, nil)
@@ -333,11 +354,11 @@
return response, nil
}
+//Fetch returns a FetchResponse or error
func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
response := new(FetchResponse)
err := b.sendAndReceive(request, response)
-
if err != nil {
return nil, err
}
@@ -345,11 +366,11 @@
return response, nil
}
+//CommitOffset return an Offset commit reponse or error
func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
response := new(OffsetCommitResponse)
err := b.sendAndReceive(request, response)
-
if err != nil {
return nil, err
}
@@ -357,11 +378,11 @@
return response, nil
}
+//FetchOffset returns an offset fetch response or error
func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
response := new(OffsetFetchResponse)
err := b.sendAndReceive(request, response)
-
if err != nil {
return nil, err
}
@@ -369,6 +390,7 @@
return response, nil
}
+//JoinGroup returns a join group response or error
func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) {
response := new(JoinGroupResponse)
@@ -380,6 +402,7 @@
return response, nil
}
+//SyncGroup returns a sync group response or error
func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) {
response := new(SyncGroupResponse)
@@ -391,6 +414,7 @@
return response, nil
}
+//LeaveGroup return a leave group response or error
func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) {
response := new(LeaveGroupResponse)
@@ -402,6 +426,7 @@
return response, nil
}
+//Heartbeat returns a heartbeat response or error
func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) {
response := new(HeartbeatResponse)
@@ -413,6 +438,7 @@
return response, nil
}
+//ListGroups return a list group response or error
func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) {
response := new(ListGroupsResponse)
@@ -424,6 +450,7 @@
return response, nil
}
+//DescribeGroups return describe group response or error
func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) {
response := new(DescribeGroupsResponse)
@@ -435,6 +462,7 @@
return response, nil
}
+//ApiVersions return api version response or error
func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) {
response := new(ApiVersionsResponse)
@@ -446,6 +474,7 @@
return response, nil
}
+//CreateTopics send a create topic request and returns create topic response
func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
response := new(CreateTopicsResponse)
@@ -457,6 +486,7 @@
return response, nil
}
+//DeleteTopics sends a delete topic request and returns delete topic response
func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) {
response := new(DeleteTopicsResponse)
@@ -468,6 +498,8 @@
return response, nil
}
+//CreatePartitions sends a create partition request and returns create
+//partitions response or error
func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
response := new(CreatePartitionsResponse)
@@ -479,6 +511,8 @@
return response, nil
}
+//DeleteRecords send a request to delete records and return delete record
+//response or error
func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) {
response := new(DeleteRecordsResponse)
@@ -490,6 +524,7 @@
return response, nil
}
+//DescribeAcls sends a describe acl request and returns a response or error
func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) {
response := new(DescribeAclsResponse)
@@ -501,6 +536,7 @@
return response, nil
}
+//CreateAcls sends a create acl request and returns a response or error
func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) {
response := new(CreateAclsResponse)
@@ -512,6 +548,7 @@
return response, nil
}
+//DeleteAcls sends a delete acl request and returns a response or error
func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) {
response := new(DeleteAclsResponse)
@@ -523,6 +560,7 @@
return response, nil
}
+//InitProducerID sends an init producer request and returns a response or error
func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error) {
response := new(InitProducerIDResponse)
@@ -534,6 +572,8 @@
return response, nil
}
+//AddPartitionsToTxn send a request to add partition to txn and returns
+//a response or error
func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) {
response := new(AddPartitionsToTxnResponse)
@@ -545,6 +585,8 @@
return response, nil
}
+//AddOffsetsToTxn sends a request to add offsets to txn and returns a response
+//or error
func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) {
response := new(AddOffsetsToTxnResponse)
@@ -556,6 +598,7 @@
return response, nil
}
+//EndTxn sends a request to end txn and returns a response or error
func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) {
response := new(EndTxnResponse)
@@ -567,6 +610,8 @@
return response, nil
}
+//TxnOffsetCommit sends a request to commit transaction offsets and returns
+//a response or error
func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) {
response := new(TxnOffsetCommitResponse)
@@ -578,6 +623,8 @@
return response, nil
}
+//DescribeConfigs sends a request to describe config and returns a response or
+//error
func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) {
response := new(DescribeConfigsResponse)
@@ -589,6 +636,7 @@
return response, nil
}
+//AlterConfigs sends a request to alter config and return a response or error
func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) {
response := new(AlterConfigsResponse)
@@ -600,6 +648,7 @@
return response, nil
}
+//DeleteGroups sends a request to delete groups and returns a response or error
func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) {
response := new(DeleteGroupsResponse)
@@ -638,7 +687,7 @@
requestTime := time.Now()
bytes, err := b.conn.Write(buf)
- b.updateOutgoingCommunicationMetrics(bytes)
+ b.updateOutgoingCommunicationMetrics(bytes) //TODO: should it be after error check
if err != nil {
return nil, err
}
@@ -658,7 +707,6 @@
func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error {
promise, err := b.send(req, res != nil)
-
if err != nil {
return err
}
@@ -707,11 +755,11 @@
}
func (b *Broker) encode(pe packetEncoder, version int16) (err error) {
-
host, portstr, err := net.SplitHostPort(b.addr)
if err != nil {
return err
}
+
port, err := strconv.Atoi(portstr)
if err != nil {
return err
@@ -739,6 +787,7 @@
func (b *Broker) responseReceiver() {
var dead error
header := make([]byte, 8)
+
for response := range b.responses {
if dead != nil {
response.errors <- dead
@@ -793,14 +842,28 @@
}
func (b *Broker) authenticateViaSASL() error {
- if b.conf.Net.SASL.Mechanism == SASLTypeOAuth {
+ switch b.conf.Net.SASL.Mechanism {
+ case SASLTypeOAuth:
return b.sendAndReceiveSASLOAuth(b.conf.Net.SASL.TokenProvider)
+ case SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512:
+ return b.sendAndReceiveSASLSCRAMv1()
+ case SASLTypeGSSAPI:
+ return b.sendAndReceiveKerberos()
+ default:
+ return b.sendAndReceiveSASLPlainAuth()
}
- return b.sendAndReceiveSASLPlainAuth()
}
-func (b *Broker) sendAndReceiveSASLHandshake(saslType string, version int16) error {
- rb := &SaslHandshakeRequest{Mechanism: saslType, Version: version}
+func (b *Broker) sendAndReceiveKerberos() error {
+ b.kerberosAuthenticator.Config = &b.conf.Net.SASL.GSSAPI
+ if b.kerberosAuthenticator.NewKerberosClientFunc == nil {
+ b.kerberosAuthenticator.NewKerberosClientFunc = NewKerberosClient
+ }
+ return b.kerberosAuthenticator.Authorize(b)
+}
+
+func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int16) error {
+ rb := &SaslHandshakeRequest{Mechanism: string(saslType), Version: version}
req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
buf, err := encode(req, b.conf.MetricRegistry)
@@ -828,6 +891,7 @@
Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
return err
}
+
length := binary.BigEndian.Uint32(header[:4])
payload := make([]byte, length-4)
n, err := io.ReadFull(b.conn, payload)
@@ -835,23 +899,29 @@
Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
return err
}
+
b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))
res := &SaslHandshakeResponse{}
+
err = versionedDecode(payload, res, 0)
if err != nil {
Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
return err
}
+
if res.Err != ErrNoError {
Logger.Printf("Invalid SASL Mechanism : %s\n", res.Err.Error())
return res.Err
}
- Logger.Print("Successful SASL handshake")
+
+ Logger.Print("Successful SASL handshake. Available mechanisms: ", res.EnabledMechanisms)
return nil
}
-// Kafka 0.10.0 plans to support SASL Plain and Kerberos as per PR #812 (KIP-43)/(JIRA KAFKA-3149)
-// Some hosted kafka services such as IBM Message Hub already offer SASL/PLAIN auth with Kafka 0.9
+// Kafka 0.10.x supported SASL PLAIN/Kerberos via KAFKA-3149 (KIP-43).
+// Kafka 1.x.x onward added a SaslAuthenticate request/response message which
+// wraps the SASL flow in the Kafka protocol, which allows for returning
+// meaningful errors on authentication failure.
//
// In SASL Plain, Kafka expects the auth header to be in the following format
// Message format (from https://tools.ietf.org/html/rfc4616):
@@ -865,17 +935,34 @@
// SAFE = UTF1 / UTF2 / UTF3 / UTF4
// ;; any UTF-8 encoded Unicode character except NUL
//
+// With SASL v0 handshake and auth then:
// When credentials are valid, Kafka returns a 4 byte array of null characters.
-// When credentials are invalid, Kafka closes the connection. This does not seem to be the ideal way
-// of responding to bad credentials but thats how its being done today.
+// When credentials are invalid, Kafka closes the connection.
+//
+// With SASL v1 handshake and auth then:
+// When credentials are invalid, Kafka replies with a SaslAuthenticate response
+// containing an error code and message detailing the authentication failure.
func (b *Broker) sendAndReceiveSASLPlainAuth() error {
+ // default to V0 to allow for backward compatability when SASL is enabled
+ // but not the handshake
if b.conf.Net.SASL.Handshake {
- handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, SASLHandshakeV0)
+
+ handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, b.conf.Net.SASL.Version)
if handshakeErr != nil {
Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
return handshakeErr
}
}
+
+ if b.conf.Net.SASL.Version == SASLHandshakeV1 {
+ return b.sendAndReceiveV1SASLPlainAuth()
+ }
+ return b.sendAndReceiveV0SASLPlainAuth()
+}
+
+// sendAndReceiveV0SASLPlainAuth flows the v0 sasl auth NOT wrapped in the kafka protocol
+func (b *Broker) sendAndReceiveV0SASLPlainAuth() error {
+
length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
authBytes := make([]byte, length+4) //4 byte length header + auth data
binary.BigEndian.PutUint32(authBytes, uint32(length))
@@ -909,55 +996,197 @@
return nil
}
+// sendAndReceiveV1SASLPlainAuth flows the v1 sasl authentication using the kafka protocol
+func (b *Broker) sendAndReceiveV1SASLPlainAuth() error {
+ correlationID := b.correlationID
+
+ requestTime := time.Now()
+
+ bytesWritten, err := b.sendSASLPlainAuthClientResponse(correlationID)
+
+ b.updateOutgoingCommunicationMetrics(bytesWritten)
+
+ if err != nil {
+ Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
+ return err
+ }
+
+ b.correlationID++
+
+ bytesRead, err := b.receiveSASLServerResponse(&SaslAuthenticateResponse{}, correlationID)
+ b.updateIncomingCommunicationMetrics(bytesRead, time.Since(requestTime))
+
+ // With v1 sasl we get an error message set in the response we can return
+ if err != nil {
+ Logger.Printf("Error returned from broker during SASL flow %s: %s\n", b.addr, err.Error())
+ return err
+ }
+
+ return nil
+}
+
// sendAndReceiveSASLOAuth performs the authentication flow as described by KIP-255
// https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75968876
func (b *Broker) sendAndReceiveSASLOAuth(provider AccessTokenProvider) error {
-
if err := b.sendAndReceiveSASLHandshake(SASLTypeOAuth, SASLHandshakeV1); err != nil {
return err
}
token, err := provider.Token()
-
if err != nil {
return err
}
- requestTime := time.Now()
-
- correlationID := b.correlationID
-
- bytesWritten, err := b.sendSASLOAuthBearerClientResponse(token, correlationID)
-
+ message, err := buildClientFirstMessage(token)
if err != nil {
return err
}
+ challenged, err := b.sendClientMessage(message)
+ if err != nil {
+ return err
+ }
+
+ if challenged {
+ // Abort the token exchange. The broker returns the failure code.
+ _, err = b.sendClientMessage([]byte(`\x01`))
+ }
+
+ return err
+}
+
+// sendClientMessage sends a SASL/OAUTHBEARER client message and returns true
+// if the broker responds with a challenge, in which case the token is
+// rejected.
+func (b *Broker) sendClientMessage(message []byte) (bool, error) {
+
+ requestTime := time.Now()
+ correlationID := b.correlationID
+
+ bytesWritten, err := b.sendSASLOAuthBearerClientMessage(message, correlationID)
+ if err != nil {
+ return false, err
+ }
+
b.updateOutgoingCommunicationMetrics(bytesWritten)
-
b.correlationID++
- bytesRead, err := b.receiveSASLOAuthBearerServerResponse(correlationID)
-
- if err != nil {
- return err
- }
+ res := &SaslAuthenticateResponse{}
+ bytesRead, err := b.receiveSASLServerResponse(res, correlationID)
requestLatency := time.Since(requestTime)
b.updateIncomingCommunicationMetrics(bytesRead, requestLatency)
+ isChallenge := len(res.SaslAuthBytes) > 0
+
+ if isChallenge && err != nil {
+ Logger.Printf("Broker rejected authentication token: %s", res.SaslAuthBytes)
+ }
+
+ return isChallenge, err
+}
+
+func (b *Broker) sendAndReceiveSASLSCRAMv1() error {
+ if err := b.sendAndReceiveSASLHandshake(b.conf.Net.SASL.Mechanism, SASLHandshakeV1); err != nil {
+ return err
+ }
+
+ scramClient := b.conf.Net.SASL.SCRAMClientGeneratorFunc()
+ if err := scramClient.Begin(b.conf.Net.SASL.User, b.conf.Net.SASL.Password, b.conf.Net.SASL.SCRAMAuthzID); err != nil {
+ return fmt.Errorf("failed to start SCRAM exchange with the server: %s", err.Error())
+ }
+
+ msg, err := scramClient.Step("")
+ if err != nil {
+ return fmt.Errorf("failed to advance the SCRAM exchange: %s", err.Error())
+
+ }
+
+ for !scramClient.Done() {
+ requestTime := time.Now()
+ correlationID := b.correlationID
+ bytesWritten, err := b.sendSaslAuthenticateRequest(correlationID, []byte(msg))
+ if err != nil {
+ Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
+ return err
+ }
+
+ b.updateOutgoingCommunicationMetrics(bytesWritten)
+ b.correlationID++
+ challenge, err := b.receiveSaslAuthenticateResponse(correlationID)
+ if err != nil {
+ Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
+ return err
+ }
+
+ b.updateIncomingCommunicationMetrics(len(challenge), time.Since(requestTime))
+ msg, err = scramClient.Step(string(challenge))
+ if err != nil {
+ Logger.Println("SASL authentication failed", err)
+ return err
+ }
+ }
+
+ Logger.Println("SASL authentication succeeded")
return nil
}
+func (b *Broker) sendSaslAuthenticateRequest(correlationID int32, msg []byte) (int, error) {
+ rb := &SaslAuthenticateRequest{msg}
+ req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
+ buf, err := encode(req, b.conf.MetricRegistry)
+ if err != nil {
+ return 0, err
+ }
+
+ if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil {
+ return 0, err
+ }
+
+ return b.conn.Write(buf)
+}
+
+func (b *Broker) receiveSaslAuthenticateResponse(correlationID int32) ([]byte, error) {
+ buf := make([]byte, responseLengthSize+correlationIDSize)
+ _, err := io.ReadFull(b.conn, buf)
+ if err != nil {
+ return nil, err
+ }
+
+ header := responseHeader{}
+ err = decode(buf, &header)
+ if err != nil {
+ return nil, err
+ }
+
+ if header.correlationID != correlationID {
+ return nil, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
+ }
+
+ buf = make([]byte, header.length-correlationIDSize)
+ _, err = io.ReadFull(b.conn, buf)
+ if err != nil {
+ return nil, err
+ }
+
+ res := &SaslAuthenticateResponse{}
+ if err := versionedDecode(buf, res, 0); err != nil {
+ return nil, err
+ }
+ if res.Err != ErrNoError {
+ return nil, res.Err
+ }
+ return res.SaslAuthBytes, nil
+}
+
// Build SASL/OAUTHBEARER initial client response as described by RFC-7628
// https://tools.ietf.org/html/rfc7628
-func buildClientInitialResponse(token *AccessToken) ([]byte, error) {
-
+func buildClientFirstMessage(token *AccessToken) ([]byte, error) {
var ext string
if token.Extensions != nil && len(token.Extensions) > 0 {
if _, ok := token.Extensions[SASLExtKeyAuth]; ok {
- return []byte{}, fmt.Errorf("The extension `%s` is invalid", SASLExtKeyAuth)
+ return []byte{}, fmt.Errorf("the extension `%s` is invalid", SASLExtKeyAuth)
}
ext = "\x01" + mapToString(token.Extensions, "=", "\x01")
}
@@ -970,7 +1199,6 @@
// mapToString returns a list of key-value pairs ordered by key.
// keyValSep separates the key from the value. elemSep separates each pair.
func mapToString(extensions map[string]string, keyValSep string, elemSep string) string {
-
buf := make([]string, 0, len(extensions))
for k, v := range extensions {
@@ -982,20 +1210,30 @@
return strings.Join(buf, elemSep)
}
-func (b *Broker) sendSASLOAuthBearerClientResponse(token *AccessToken, correlationID int32) (int, error) {
-
- initialResp, err := buildClientInitialResponse(token)
-
+func (b *Broker) sendSASLPlainAuthClientResponse(correlationID int32) (int, error) {
+ authBytes := []byte("\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password)
+ rb := &SaslAuthenticateRequest{authBytes}
+ req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
+ buf, err := encode(req, b.conf.MetricRegistry)
if err != nil {
return 0, err
}
+ err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
+ if err != nil {
+ Logger.Printf("Failed to set write deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error())
+ return 0, err
+ }
+ return b.conn.Write(buf)
+}
+
+func (b *Broker) sendSASLOAuthBearerClientMessage(initialResp []byte, correlationID int32) (int, error) {
+
rb := &SaslAuthenticateRequest{initialResp}
req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
buf, err := encode(req, b.conf.MetricRegistry)
-
if err != nil {
return 0, err
}
@@ -1007,12 +1245,11 @@
return b.conn.Write(buf)
}
-func (b *Broker) receiveSASLOAuthBearerServerResponse(correlationID int32) (int, error) {
+func (b *Broker) receiveSASLServerResponse(res *SaslAuthenticateResponse, correlationID int32) (int, error) {
- buf := make([]byte, 8)
+ buf := make([]byte, responseLengthSize+correlationIDSize)
bytesRead, err := io.ReadFull(b.conn, buf)
-
if err != nil {
return bytesRead, err
}
@@ -1020,7 +1257,6 @@
header := responseHeader{}
err = decode(buf, &header)
-
if err != nil {
return bytesRead, err
}
@@ -1029,48 +1265,39 @@
return bytesRead, fmt.Errorf("correlation ID didn't match, wanted %d, got %d", b.correlationID, header.correlationID)
}
- buf = make([]byte, header.length-4)
+ buf = make([]byte, header.length-correlationIDSize)
c, err := io.ReadFull(b.conn, buf)
-
bytesRead += c
-
if err != nil {
return bytesRead, err
}
- res := &SaslAuthenticateResponse{}
-
if err := versionedDecode(buf, res, 0); err != nil {
return bytesRead, err
}
- if err != nil {
- return bytesRead, err
- }
-
if res.Err != ErrNoError {
return bytesRead, res.Err
}
- if len(res.SaslAuthBytes) > 0 {
- Logger.Printf("Received SASL auth response: %s", res.SaslAuthBytes)
- }
-
return bytesRead, nil
}
func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
b.updateRequestLatencyMetrics(requestLatency)
b.responseRate.Mark(1)
+
if b.brokerResponseRate != nil {
b.brokerResponseRate.Mark(1)
}
+
responseSize := int64(bytes)
b.incomingByteRate.Mark(responseSize)
if b.brokerIncomingByteRate != nil {
b.brokerIncomingByteRate.Mark(responseSize)
}
+
b.responseSize.Update(responseSize)
if b.brokerResponseSize != nil {
b.brokerResponseSize.Update(responseSize)
@@ -1080,9 +1307,11 @@
func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) {
requestLatencyInMs := int64(requestLatency / time.Millisecond)
b.requestLatency.Update(requestLatencyInMs)
+
if b.brokerRequestLatency != nil {
b.brokerRequestLatency.Update(requestLatencyInMs)
}
+
}
func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
@@ -1090,13 +1319,44 @@
if b.brokerRequestRate != nil {
b.brokerRequestRate.Mark(1)
}
+
requestSize := int64(bytes)
b.outgoingByteRate.Mark(requestSize)
if b.brokerOutgoingByteRate != nil {
b.brokerOutgoingByteRate.Mark(requestSize)
}
+
b.requestSize.Update(requestSize)
if b.brokerRequestSize != nil {
b.brokerRequestSize.Update(requestSize)
}
+
+}
+
+func (b *Broker) registerMetrics() {
+ b.brokerIncomingByteRate = b.registerMeter("incoming-byte-rate")
+ b.brokerRequestRate = b.registerMeter("request-rate")
+ b.brokerRequestSize = b.registerHistogram("request-size")
+ b.brokerRequestLatency = b.registerHistogram("request-latency-in-ms")
+ b.brokerOutgoingByteRate = b.registerMeter("outgoing-byte-rate")
+ b.brokerResponseRate = b.registerMeter("response-rate")
+ b.brokerResponseSize = b.registerHistogram("response-size")
+}
+
+func (b *Broker) unregisterMetrics() {
+ for _, name := range b.registeredMetrics {
+ b.conf.MetricRegistry.Unregister(name)
+ }
+}
+
+func (b *Broker) registerMeter(name string) metrics.Meter {
+ nameForBroker := getMetricNameForBroker(name, b)
+ b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
+ return metrics.GetOrRegisterMeter(nameForBroker, b.conf.MetricRegistry)
+}
+
+func (b *Broker) registerHistogram(name string) metrics.Histogram {
+ nameForBroker := getMetricNameForBroker(name, b)
+ b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
+ return getOrRegisterHistogram(nameForBroker, b.conf.MetricRegistry)
}
diff --git a/vendor/github.com/Shopify/sarama/client.go b/vendor/github.com/Shopify/sarama/client.go
index 0016f8f..c4c54b2 100644
--- a/vendor/github.com/Shopify/sarama/client.go
+++ b/vendor/github.com/Shopify/sarama/client.go
@@ -46,6 +46,10 @@
// the partition leader.
InSyncReplicas(topic string, partitionID int32) ([]int32, error)
+ // OfflineReplicas returns the set of all offline replica IDs for the given
+ // partition. Offline replicas are replicas which are offline
+ OfflineReplicas(topic string, partitionID int32) ([]int32, error)
+
// RefreshMetadata takes a list of topics and queries the cluster to refresh the
// available metadata for those topics. If no topics are provided, it will refresh
// metadata for all topics.
@@ -288,7 +292,8 @@
partitions = client.cachedPartitions(topic, allPartitions)
}
- if partitions == nil {
+ // no partitions found after refresh metadata
+ if len(partitions) == 0 {
return nil, ErrUnknownTopicOrPartition
}
@@ -373,6 +378,31 @@
return dupInt32Slice(metadata.Isr), nil
}
+func (client *client) OfflineReplicas(topic string, partitionID int32) ([]int32, error) {
+ if client.Closed() {
+ return nil, ErrClosedClient
+ }
+
+ metadata := client.cachedMetadata(topic, partitionID)
+
+ if metadata == nil {
+ err := client.RefreshMetadata(topic)
+ if err != nil {
+ return nil, err
+ }
+ metadata = client.cachedMetadata(topic, partitionID)
+ }
+
+ if metadata == nil {
+ return nil, ErrUnknownTopicOrPartition
+ }
+
+ if metadata.Err == ErrReplicaNotAvailable {
+ return dupInt32Slice(metadata.OfflineReplicas), metadata.Err
+ }
+ return dupInt32Slice(metadata.OfflineReplicas), nil
+}
+
func (client *client) Leader(topic string, partitionID int32) (*Broker, error) {
if client.Closed() {
return nil, ErrClosedClient
@@ -405,7 +435,11 @@
}
}
- return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max)
+ deadline := time.Time{}
+ if client.conf.Metadata.Timeout > 0 {
+ deadline = time.Now().Add(client.conf.Metadata.Timeout)
+ }
+ return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max, deadline)
}
func (client *client) GetOffset(topic string, partitionID int32, time int64) (int64, error) {
@@ -707,32 +741,47 @@
return nil
}
-func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int) error {
+func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, deadline time.Time) error {
+ pastDeadline := func(backoff time.Duration) bool {
+ if !deadline.IsZero() && time.Now().Add(backoff).After(deadline) {
+ // we are past the deadline
+ return true
+ }
+ return false
+ }
retry := func(err error) error {
if attemptsRemaining > 0 {
backoff := client.computeBackoff(attemptsRemaining)
+ if pastDeadline(backoff) {
+ Logger.Println("client/metadata skipping last retries as we would go past the metadata timeout")
+ return err
+ }
Logger.Printf("client/metadata retrying after %dms... (%d attempts remaining)\n", client.conf.Metadata.Retry.Backoff/time.Millisecond, attemptsRemaining)
if backoff > 0 {
time.Sleep(backoff)
}
- return client.tryRefreshMetadata(topics, attemptsRemaining-1)
+ return client.tryRefreshMetadata(topics, attemptsRemaining-1, deadline)
}
return err
}
- for broker := client.any(); broker != nil; broker = client.any() {
+ broker := client.any()
+ for ; broker != nil && !pastDeadline(0); broker = client.any() {
+ allowAutoTopicCreation := true
if len(topics) > 0 {
Logger.Printf("client/metadata fetching metadata for %v from broker %s\n", topics, broker.addr)
} else {
+ allowAutoTopicCreation = false
Logger.Printf("client/metadata fetching metadata for all topics from broker %s\n", broker.addr)
}
- req := &MetadataRequest{Topics: topics}
- if client.conf.Version.IsAtLeast(V0_10_0_0) {
+ req := &MetadataRequest{Topics: topics, AllowAutoTopicCreation: allowAutoTopicCreation}
+ if client.conf.Version.IsAtLeast(V1_0_0_0) {
+ req.Version = 5
+ } else if client.conf.Version.IsAtLeast(V0_10_0_0) {
req.Version = 1
}
response, err := broker.GetMetadata(req)
-
switch err.(type) {
case nil:
allKnownMetaData := len(topics) == 0
@@ -747,6 +796,18 @@
case PacketEncodingError:
// didn't even send, return the error
return err
+
+ case KError:
+ // if SASL auth error return as this _should_ be a non retryable err for all brokers
+ if err.(KError) == ErrSASLAuthenticationFailed {
+ Logger.Println("client/metadata failed SASL authentication")
+ return err
+ }
+ // else remove that broker and try again
+ Logger.Printf("client/metadata got error from broker %d while fetching metadata: %v\n", broker.ID(), err)
+ _ = broker.Close()
+ client.deregisterBroker(broker)
+
default:
// some other error, remove that broker and try again
Logger.Printf("client/metadata got error from broker %d while fetching metadata: %v\n", broker.ID(), err)
@@ -755,6 +816,11 @@
}
}
+ if broker != nil {
+ Logger.Println("client/metadata not fetching metadata from broker %s as we would go past the metadata timeout\n", broker.addr)
+ return retry(ErrOutOfBrokers)
+ }
+
Logger.Println("client/metadata no available broker to send metadata request to")
client.resurrectDeadBrokers()
return retry(ErrOutOfBrokers)
@@ -792,7 +858,7 @@
switch topic.Err {
case ErrNoError:
- break
+ // no-op
case ErrInvalidTopic, ErrTopicAuthorizationFailed: // don't retry, don't store partial results
err = topic.Err
continue
@@ -802,7 +868,6 @@
continue
case ErrLeaderNotAvailable: // retry, but store partial partition results
retry = true
- break
default: // don't retry, don't store partial results
Logger.Printf("Unexpected topic-level metadata error: %s", topic.Err)
err = topic.Err
@@ -847,9 +912,8 @@
maxRetries := client.conf.Metadata.Retry.Max
retries := maxRetries - attemptsRemaining
return client.conf.Metadata.Retry.BackoffFunc(retries, maxRetries)
- } else {
- return client.conf.Metadata.Retry.Backoff
}
+ return client.conf.Metadata.Retry.Backoff
}
func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemaining int) (*FindCoordinatorResponse, error) {
@@ -911,3 +975,18 @@
client.resurrectDeadBrokers()
return retry(ErrOutOfBrokers)
}
+
+// nopCloserClient embeds an existing Client, but disables
+// the Close method (yet all other methods pass
+// through unchanged). This is for use in larger structs
+// where it is undesirable to close the client that was
+// passed in by the caller.
+type nopCloserClient struct {
+ Client
+}
+
+// Close intercepts and purposely does not call the underlying
+// client's Close() method.
+func (ncc *nopCloserClient) Close() error {
+ return nil
+}
diff --git a/vendor/github.com/Shopify/sarama/config.go b/vendor/github.com/Shopify/sarama/config.go
index 9495b7f..e2e6513 100644
--- a/vendor/github.com/Shopify/sarama/config.go
+++ b/vendor/github.com/Shopify/sarama/config.go
@@ -10,6 +10,7 @@
"time"
"github.com/rcrowley/go-metrics"
+ "golang.org/x/net/proxy"
)
const defaultClientID = "sarama"
@@ -57,18 +58,28 @@
// SASLMechanism is the name of the enabled SASL mechanism.
// Possible values: OAUTHBEARER, PLAIN (defaults to PLAIN).
Mechanism SASLMechanism
+ // Version is the SASL Protocol Version to use
+ // Kafka > 1.x should use V1, except on Azure EventHub which use V0
+ Version int16
// Whether or not to send the Kafka SASL handshake first if enabled
// (defaults to true). You should only set this to false if you're using
// a non-Kafka SASL proxy.
Handshake bool
- //username and password for SASL/PLAIN authentication
+ //username and password for SASL/PLAIN or SASL/SCRAM authentication
User string
Password string
+ // authz id used for SASL/SCRAM authentication
+ SCRAMAuthzID string
+ // SCRAMClientGeneratorFunc is a generator of a user provided implementation of a SCRAM
+ // client used to perform the SCRAM exchange with the server.
+ SCRAMClientGeneratorFunc func() SCRAMClient
// TokenProvider is a user-defined callback for generating
// access tokens for SASL/OAUTHBEARER auth. See the
// AccessTokenProvider interface docs for proper implementation
// guidelines.
TokenProvider AccessTokenProvider
+
+ GSSAPI GSSAPIConfig
}
// KeepAlive specifies the keep-alive period for an active network connection.
@@ -80,6 +91,14 @@
// network being dialed.
// If nil, a local address is automatically chosen.
LocalAddr net.Addr
+
+ Proxy struct {
+ // Whether or not to use proxy when connecting to the broker
+ // (defaults to false).
+ Enable bool
+ // The proxy dialer to use enabled (defaults to nil).
+ Dialer proxy.Dialer
+ }
}
// Metadata is the namespace for metadata management properties used by the
@@ -107,6 +126,13 @@
// and usually more convenient, but can take up a substantial amount of
// memory if you have many topics and partitions. Defaults to true.
Full bool
+
+ // How long to wait for a successful metadata response.
+ // Disabled by default which means a metadata request against an unreachable
+ // cluster (all brokers are unreachable or unresponsive) can take up to
+ // `Net.[Dial|Read]Timeout * BrokerCount * (Metadata.Retry.Max + 1) + Metadata.Retry.Backoff * Metadata.Retry.Max`
+ // to fail.
+ Timeout time.Duration
}
// Producer is the namespace for configuration related to producing messages,
@@ -333,6 +359,11 @@
Max int
}
}
+
+ // IsolationLevel support 2 mode:
+ // - use `ReadUncommitted` (default) to consume and return all messages in message channel
+ // - use `ReadCommitted` to hide messages that are part of an aborted transaction
+ IsolationLevel IsolationLevel
}
// A user-provided string sent with every request to the brokers for logging,
@@ -370,6 +401,7 @@
c.Net.ReadTimeout = 30 * time.Second
c.Net.WriteTimeout = 30 * time.Second
c.Net.SASL.Handshake = true
+ c.Net.SASL.Version = SASLHandshakeV0
c.Metadata.Retry.Max = 3
c.Metadata.Retry.Backoff = 250 * time.Millisecond
@@ -414,10 +446,10 @@
// ConfigurationError if the specified values don't make sense.
func (c *Config) Validate() error {
// some configuration values should be warned on but not fail completely, do those first
- if c.Net.TLS.Enable == false && c.Net.TLS.Config != nil {
+ if !c.Net.TLS.Enable && c.Net.TLS.Config != nil {
Logger.Println("Net.TLS is disabled but a non-nil configuration was provided.")
}
- if c.Net.SASL.Enable == false {
+ if !c.Net.SASL.Enable {
if c.Net.SASL.User != "" {
Logger.Println("Net.SASL is disabled but a non-empty username was provided.")
}
@@ -475,22 +507,62 @@
case c.Net.KeepAlive < 0:
return ConfigurationError("Net.KeepAlive must be >= 0")
case c.Net.SASL.Enable:
- // For backwards compatibility, empty mechanism value defaults to PLAIN
- isSASLPlain := len(c.Net.SASL.Mechanism) == 0 || c.Net.SASL.Mechanism == SASLTypePlaintext
- if isSASLPlain {
+ if c.Net.SASL.Mechanism == "" {
+ c.Net.SASL.Mechanism = SASLTypePlaintext
+ }
+
+ switch c.Net.SASL.Mechanism {
+ case SASLTypePlaintext:
if c.Net.SASL.User == "" {
return ConfigurationError("Net.SASL.User must not be empty when SASL is enabled")
}
if c.Net.SASL.Password == "" {
return ConfigurationError("Net.SASL.Password must not be empty when SASL is enabled")
}
- } else if c.Net.SASL.Mechanism == SASLTypeOAuth {
+ case SASLTypeOAuth:
if c.Net.SASL.TokenProvider == nil {
- return ConfigurationError("An AccessTokenProvider instance must be provided to Net.SASL.User.TokenProvider")
+ return ConfigurationError("An AccessTokenProvider instance must be provided to Net.SASL.TokenProvider")
}
- } else {
- msg := fmt.Sprintf("The SASL mechanism configuration is invalid. Possible values are `%s` and `%s`",
- SASLTypeOAuth, SASLTypePlaintext)
+ case SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512:
+ if c.Net.SASL.User == "" {
+ return ConfigurationError("Net.SASL.User must not be empty when SASL is enabled")
+ }
+ if c.Net.SASL.Password == "" {
+ return ConfigurationError("Net.SASL.Password must not be empty when SASL is enabled")
+ }
+ if c.Net.SASL.SCRAMClientGeneratorFunc == nil {
+ return ConfigurationError("A SCRAMClientGeneratorFunc function must be provided to Net.SASL.SCRAMClientGeneratorFunc")
+ }
+ case SASLTypeGSSAPI:
+ if c.Net.SASL.GSSAPI.ServiceName == "" {
+ return ConfigurationError("Net.SASL.GSSAPI.ServiceName must not be empty when GSS-API mechanism is used")
+ }
+
+ if c.Net.SASL.GSSAPI.AuthType == KRB5_USER_AUTH {
+ if c.Net.SASL.GSSAPI.Password == "" {
+ return ConfigurationError("Net.SASL.GSSAPI.Password must not be empty when GSS-API " +
+ "mechanism is used and Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH")
+ }
+ } else if c.Net.SASL.GSSAPI.AuthType == KRB5_KEYTAB_AUTH {
+ if c.Net.SASL.GSSAPI.KeyTabPath == "" {
+ return ConfigurationError("Net.SASL.GSSAPI.KeyTabPath must not be empty when GSS-API mechanism is used" +
+ " and Net.SASL.GSSAPI.AuthType = KRB5_KEYTAB_AUTH")
+ }
+ } else {
+ return ConfigurationError("Net.SASL.GSSAPI.AuthType is invalid. Possible values are KRB5_USER_AUTH and KRB5_KEYTAB_AUTH")
+ }
+ if c.Net.SASL.GSSAPI.KerberosConfigPath == "" {
+ return ConfigurationError("Net.SASL.GSSAPI.KerberosConfigPath must not be empty when GSS-API mechanism is used")
+ }
+ if c.Net.SASL.GSSAPI.Username == "" {
+ return ConfigurationError("Net.SASL.GSSAPI.Username must not be empty when GSS-API mechanism is used")
+ }
+ if c.Net.SASL.GSSAPI.Realm == "" {
+ return ConfigurationError("Net.SASL.GSSAPI.Realm must not be empty when GSS-API mechanism is used")
+ }
+ default:
+ msg := fmt.Sprintf("The SASL mechanism configuration is invalid. Possible values are `%s`, `%s`, `%s`, `%s` and `%s`",
+ SASLTypeOAuth, SASLTypePlaintext, SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512, SASLTypeGSSAPI)
return ConfigurationError(msg)
}
}
@@ -584,6 +656,13 @@
return ConfigurationError("Consumer.Offsets.Initial must be OffsetOldest or OffsetNewest")
case c.Consumer.Offsets.Retry.Max < 0:
return ConfigurationError("Consumer.Offsets.Retry.Max must be >= 0")
+ case c.Consumer.IsolationLevel != ReadUncommitted && c.Consumer.IsolationLevel != ReadCommitted:
+ return ConfigurationError("Consumer.IsolationLevel must be ReadUncommitted or ReadCommitted")
+ }
+
+ // validate IsolationLevel
+ if c.Consumer.IsolationLevel == ReadCommitted && !c.Version.IsAtLeast(V0_11_0_0) {
+ return ConfigurationError("ReadCommitted requires Version >= V0_11_0_0")
}
// validate the Consumer Group values
diff --git a/vendor/github.com/Shopify/sarama/config_resource_type.go b/vendor/github.com/Shopify/sarama/config_resource_type.go
index 848cc9c..5399d75 100644
--- a/vendor/github.com/Shopify/sarama/config_resource_type.go
+++ b/vendor/github.com/Shopify/sarama/config_resource_type.go
@@ -1,15 +1,22 @@
package sarama
+//ConfigResourceType is a type for config resource
type ConfigResourceType int8
// Taken from :
// https://cwiki.apache.org/confluence/display/KAFKA/KIP-133%3A+Describe+and+Alter+Configs+Admin+APIs#KIP-133:DescribeandAlterConfigsAdminAPIs-WireFormattypes
const (
- UnknownResource ConfigResourceType = 0
- AnyResource ConfigResourceType = 1
- TopicResource ConfigResourceType = 2
- GroupResource ConfigResourceType = 3
- ClusterResource ConfigResourceType = 4
- BrokerResource ConfigResourceType = 5
+ //UnknownResource constant type
+ UnknownResource ConfigResourceType = iota
+ //AnyResource constant type
+ AnyResource
+ //TopicResource constant type
+ TopicResource
+ //GroupResource constant type
+ GroupResource
+ //ClusterResource constant type
+ ClusterResource
+ //BrokerResource constant type
+ BrokerResource
)
diff --git a/vendor/github.com/Shopify/sarama/consumer.go b/vendor/github.com/Shopify/sarama/consumer.go
index ce72ff1..72c4d7c 100644
--- a/vendor/github.com/Shopify/sarama/consumer.go
+++ b/vendor/github.com/Shopify/sarama/consumer.go
@@ -3,20 +3,24 @@
import (
"errors"
"fmt"
+ "math"
"sync"
"sync/atomic"
"time"
+
+ "github.com/rcrowley/go-metrics"
)
// ConsumerMessage encapsulates a Kafka message returned by the consumer.
type ConsumerMessage struct {
- Key, Value []byte
- Topic string
- Partition int32
- Offset int64
+ Headers []*RecordHeader // only set if kafka is version 0.11+
Timestamp time.Time // only set if kafka is version 0.10+, inner message timestamp
BlockTimestamp time.Time // only set if kafka is version 0.10+, outer (compressed) block timestamp
- Headers []*RecordHeader // only set if kafka is version 0.11+
+
+ Key, Value []byte
+ Topic string
+ Partition int32
+ Offset int64
}
// ConsumerError is what is provided to the user when an error occurs.
@@ -43,13 +47,7 @@
// Consumer manages PartitionConsumers which process Kafka messages from brokers. You MUST call Close()
// on a consumer to avoid leaks, it will not be garbage-collected automatically when it passes out of
// scope.
-//
-// Sarama's Consumer type does not currently support automatic consumer-group rebalancing and offset tracking.
-// For Zookeeper-based tracking (Kafka 0.8.2 and earlier), the https://github.com/wvanbergen/kafka library
-// builds on Sarama to add this support. For Kafka-based tracking (Kafka 0.9 and later), the
-// https://github.com/bsm/sarama-cluster library builds on Sarama to add this support.
type Consumer interface {
-
// Topics returns the set of available topics as retrieved from the cluster
// metadata. This method is the same as Client.Topics(), and is provided for
// convenience.
@@ -75,13 +73,11 @@
}
type consumer struct {
- client Client
- conf *Config
- ownClient bool
-
- lock sync.Mutex
+ conf *Config
children map[string]map[int32]*partitionConsumer
brokerConsumers map[*Broker]*brokerConsumer
+ client Client
+ lock sync.Mutex
}
// NewConsumer creates a new consumer using the given broker addresses and configuration.
@@ -90,18 +86,19 @@
if err != nil {
return nil, err
}
-
- c, err := NewConsumerFromClient(client)
- if err != nil {
- return nil, err
- }
- c.(*consumer).ownClient = true
- return c, nil
+ return newConsumer(client)
}
// NewConsumerFromClient creates a new consumer using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this consumer.
func NewConsumerFromClient(client Client) (Consumer, error) {
+ // For clients passed in by the client, ensure we don't
+ // call Close() on it.
+ cli := &nopCloserClient{client}
+ return newConsumer(cli)
+}
+
+func newConsumer(client Client) (Consumer, error) {
// Check that we are not dealing with a closed Client before processing any other arguments
if client.Closed() {
return nil, ErrClosedClient
@@ -118,10 +115,7 @@
}
func (c *consumer) Close() error {
- if c.ownClient {
- return c.client.Close()
- }
- return nil
+ return c.client.Close()
}
func (c *consumer) Topics() ([]string, error) {
@@ -261,12 +255,11 @@
// or a separate goroutine. Check out the Consumer examples to see implementations of these different approaches.
//
// To terminate such a for/range loop while the loop is executing, call AsyncClose. This will kick off the process of
-// consumer tear-down & return imediately. Continue to loop, servicing the Messages channel until the teardown process
+// consumer tear-down & return immediately. Continue to loop, servicing the Messages channel until the teardown process
// AsyncClose initiated closes it (thus terminating the for/range loop). If you've already ceased reading Messages, call
// Close; this will signal the PartitionConsumer's goroutines to begin shutting down (just like AsyncClose), but will
// also drain the Messages channel, harvest all errors & return them once cleanup has completed.
type PartitionConsumer interface {
-
// AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately, after which you
// should continue to service the 'Messages' and 'Errors' channels until they are empty. It is required to call this
// function, or Close before a consumer object passes out of scope, as it will otherwise leak memory. You must call
@@ -298,24 +291,22 @@
type partitionConsumer struct {
highWaterMarkOffset int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG
- consumer *consumer
- conf *Config
- topic string
- partition int32
+ consumer *consumer
+ conf *Config
broker *brokerConsumer
messages chan *ConsumerMessage
errors chan *ConsumerError
feeder chan *FetchResponse
trigger, dying chan none
- responseResult error
closeOnce sync.Once
-
- fetchSize int32
- offset int64
-
- retries int32
+ topic string
+ partition int32
+ responseResult error
+ fetchSize int32
+ offset int64
+ retries int32
}
var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing
@@ -338,9 +329,8 @@
if child.conf.Consumer.Retry.BackoffFunc != nil {
retries := atomic.AddInt32(&child.retries, 1)
return child.conf.Consumer.Retry.BackoffFunc(int(retries))
- } else {
- return child.conf.Consumer.Retry.Backoff
}
+ return child.conf.Consumer.Retry.Backoff
}
func (child *partitionConsumer) dispatcher() {
@@ -432,12 +422,6 @@
func (child *partitionConsumer) Close() error {
child.AsyncClose()
- go withRecover(func() {
- for range child.messages {
- // drain
- }
- })
-
var errors ConsumerErrors
for err := range child.errors {
errors = append(errors, err)
@@ -469,14 +453,22 @@
for i, msg := range msgs {
messageSelect:
select {
+ case <-child.dying:
+ child.broker.acks.Done()
+ continue feederLoop
case child.messages <- msg:
firstAttempt = true
case <-expiryTicker.C:
if !firstAttempt {
child.responseResult = errTimedOut
child.broker.acks.Done()
+ remainingLoop:
for _, msg = range msgs[i:] {
- child.messages <- msg
+ select {
+ case child.messages <- msg:
+ case <-child.dying:
+ break remainingLoop
+ }
}
child.broker.input <- child
continue feederLoop
@@ -532,7 +524,8 @@
}
func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMessage, error) {
- var messages []*ConsumerMessage
+ messages := make([]*ConsumerMessage, 0, len(batch.Records))
+
for _, rec := range batch.Records {
offset := batch.FirstOffset + rec.OffsetDelta
if offset < child.offset {
@@ -560,6 +553,23 @@
}
func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) {
+ var (
+ metricRegistry = child.conf.MetricRegistry
+ consumerBatchSizeMetric metrics.Histogram
+ )
+
+ if metricRegistry != nil {
+ consumerBatchSizeMetric = getOrRegisterHistogram("consumer-batch-size", metricRegistry)
+ }
+
+ // If request was throttled and empty we log and return without error
+ if response.ThrottleTime != time.Duration(0) && len(response.Blocks) == 0 {
+ Logger.Printf(
+ "consumer/broker/%d FetchResponse throttled %v\n",
+ child.broker.broker.ID(), response.ThrottleTime)
+ return nil, nil
+ }
+
block := response.GetBlock(child.topic, child.partition)
if block == nil {
return nil, ErrIncompleteResponse
@@ -573,6 +583,9 @@
if err != nil {
return nil, err
}
+
+ consumerBatchSizeMetric.Update(int64(nRecs))
+
if nRecs == 0 {
partialTrailingMessage, err := block.isPartial()
if err != nil {
@@ -587,6 +600,10 @@
child.offset++ // skip this one so we can keep processing future messages
} else {
child.fetchSize *= 2
+ // check int32 overflow
+ if child.fetchSize < 0 {
+ child.fetchSize = math.MaxInt32
+ }
if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize > child.conf.Consumer.Fetch.Max {
child.fetchSize = child.conf.Consumer.Fetch.Max
}
@@ -600,6 +617,12 @@
child.fetchSize = child.conf.Consumer.Fetch.Default
atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)
+ // abortedProducerIDs contains producerID which message should be ignored as uncommitted
+ // - producerID are added when the partitionConsumer iterate over the offset at which an aborted transaction begins (abortedTransaction.FirstOffset)
+ // - producerID are removed when partitionConsumer iterate over an aborted controlRecord, meaning the aborted transaction for this producer is over
+ abortedProducerIDs := make(map[int64]struct{}, len(block.AbortedTransactions))
+ abortedTransactions := block.getAbortedTransactions()
+
messages := []*ConsumerMessage{}
for _, records := range block.RecordsSet {
switch records.recordsType {
@@ -611,13 +634,55 @@
messages = append(messages, messageSetMessages...)
case defaultRecords:
+ // Consume remaining abortedTransaction up to last offset of current batch
+ for _, txn := range abortedTransactions {
+ if txn.FirstOffset > records.RecordBatch.LastOffset() {
+ break
+ }
+ abortedProducerIDs[txn.ProducerID] = struct{}{}
+ // Pop abortedTransactions so that we never add it again
+ abortedTransactions = abortedTransactions[1:]
+ }
+
recordBatchMessages, err := child.parseRecords(records.RecordBatch)
if err != nil {
return nil, err
}
- if control, err := records.isControl(); err != nil || control {
+
+ // Parse and commit offset but do not expose messages that are:
+ // - control records
+ // - part of an aborted transaction when set to `ReadCommitted`
+
+ // control record
+ isControl, err := records.isControl()
+ if err != nil {
+ // I don't know why there is this continue in case of error to begin with
+ // Safe bet is to ignore control messages if ReadUncommitted
+ // and block on them in case of error and ReadCommitted
+ if child.conf.Consumer.IsolationLevel == ReadCommitted {
+ return nil, err
+ }
continue
}
+ if isControl {
+ controlRecord, err := records.getControlRecord()
+ if err != nil {
+ return nil, err
+ }
+
+ if controlRecord.Type == ControlRecordAbort {
+ delete(abortedProducerIDs, records.RecordBatch.ProducerID)
+ }
+ continue
+ }
+
+ // filter aborted transactions
+ if child.conf.Consumer.IsolationLevel == ReadCommitted {
+ _, isAborted := abortedProducerIDs[records.RecordBatch.ProducerID]
+ if records.RecordBatch.IsTransactional && isAborted {
+ continue
+ }
+ }
messages = append(messages, recordBatchMessages...)
default:
@@ -628,15 +693,13 @@
return messages, nil
}
-// brokerConsumer
-
type brokerConsumer struct {
consumer *consumer
broker *Broker
input chan *partitionConsumer
newSubscriptions chan []*partitionConsumer
- wait chan none
subscriptions map[*partitionConsumer]none
+ wait chan none
acks sync.WaitGroup
refs int
}
@@ -658,14 +721,14 @@
return bc
}
+// The subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer
+// goroutine is in the middle of a network request) and batches it up. The main worker goroutine picks
+// up a batch of new subscriptions between every network request by reading from `newSubscriptions`, so we give
+// it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions is available,
+// so the main goroutine can block waiting for work if it has none.
func (bc *brokerConsumer) subscriptionManager() {
var buffer []*partitionConsumer
- // The subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer
- // goroutine is in the middle of a network request) and batches it up. The main worker goroutine picks
- // up a batch of new subscriptions between every network request by reading from `newSubscriptions`, so we give
- // it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions is available,
- // so the main goroutine can block waiting for work if it has none.
for {
if len(buffer) > 0 {
select {
@@ -698,10 +761,10 @@
close(bc.newSubscriptions)
}
+//subscriptionConsumer ensures we will get nil right away if no new subscriptions is available
func (bc *brokerConsumer) subscriptionConsumer() {
<-bc.wait // wait for our first piece of work
- // the subscriptionConsumer ensures we will get nil right away if no new subscriptions is available
for newSubscriptions := range bc.newSubscriptions {
bc.updateSubscriptions(newSubscriptions)
@@ -742,20 +805,20 @@
close(child.trigger)
delete(bc.subscriptions, child)
default:
- break
+ // no-op
}
}
}
+//handleResponses handles the response codes left for us by our subscriptions, and abandons ones that have been closed
func (bc *brokerConsumer) handleResponses() {
- // handles the response codes left for us by our subscriptions, and abandons ones that have been closed
for child := range bc.subscriptions {
result := child.responseResult
child.responseResult = nil
switch result {
case nil:
- break
+ // no-op
case errTimedOut:
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n",
bc.broker.ID(), child.topic, child.partition)
@@ -822,7 +885,7 @@
}
if bc.consumer.conf.Version.IsAtLeast(V0_11_0_0) {
request.Version = 4
- request.Isolation = ReadUncommitted // We don't support yet transactions.
+ request.Isolation = bc.consumer.conf.Consumer.IsolationLevel
}
for child := range bc.subscriptions {
diff --git a/vendor/github.com/Shopify/sarama/consumer_group.go b/vendor/github.com/Shopify/sarama/consumer_group.go
index 8c8babc..8de9513 100644
--- a/vendor/github.com/Shopify/sarama/consumer_group.go
+++ b/vendor/github.com/Shopify/sarama/consumer_group.go
@@ -52,8 +52,7 @@
}
type consumerGroup struct {
- client Client
- ownClient bool
+ client Client
config *Config
consumer Consumer
@@ -73,20 +72,24 @@
return nil, err
}
- c, err := NewConsumerGroupFromClient(groupID, client)
+ c, err := newConsumerGroup(groupID, client)
if err != nil {
_ = client.Close()
- return nil, err
}
-
- c.(*consumerGroup).ownClient = true
- return c, nil
+ return c, err
}
// NewConsumerGroupFromClient creates a new consumer group using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this consumer.
// PLEASE NOTE: consumer groups can only re-use but not share clients.
func NewConsumerGroupFromClient(groupID string, client Client) (ConsumerGroup, error) {
+ // For clients passed in by the client, ensure we don't
+ // call Close() on it.
+ cli := &nopCloserClient{client}
+ return newConsumerGroup(groupID, cli)
+}
+
+func newConsumerGroup(groupID string, client Client) (ConsumerGroup, error) {
config := client.Config()
if !config.Version.IsAtLeast(V0_10_2_0) {
return nil, ConfigurationError("consumer groups require Version to be >= V0_10_2_0")
@@ -131,10 +134,8 @@
err = e
}
- if c.ownClient {
- if e := c.client.Close(); e != nil {
- err = e
- }
+ if e := c.client.Close(); e != nil {
+ err = e
}
})
return
@@ -162,14 +163,8 @@
return err
}
- // Get coordinator
- coordinator, err := c.client.Coordinator(c.groupID)
- if err != nil {
- return err
- }
-
// Init session
- sess, err := c.newSession(ctx, coordinator, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
+ sess, err := c.newSession(ctx, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
if err == ErrClosedClient {
return ErrClosedConsumerGroup
} else if err != nil {
@@ -183,7 +178,33 @@
return sess.release(true)
}
-func (c *consumerGroup) newSession(ctx context.Context, coordinator *Broker, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
+func (c *consumerGroup) retryNewSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int, refreshCoordinator bool) (*consumerGroupSession, error) {
+ select {
+ case <-c.closed:
+ return nil, ErrClosedConsumerGroup
+ case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
+ }
+
+ if refreshCoordinator {
+ err := c.client.RefreshCoordinator(c.groupID)
+ if err != nil {
+ return c.retryNewSession(ctx, topics, handler, retries, true)
+ }
+ }
+
+ return c.newSession(ctx, topics, handler, retries-1)
+}
+
+func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
+ coordinator, err := c.client.Coordinator(c.groupID)
+ if err != nil {
+ if retries <= 0 {
+ return nil, err
+ }
+
+ return c.retryNewSession(ctx, topics, handler, retries, true)
+ }
+
// Join consumer group
join, err := c.joinGroupRequest(coordinator, topics)
if err != nil {
@@ -195,19 +216,19 @@
c.memberID = join.MemberId
case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
c.memberID = ""
- return c.newSession(ctx, coordinator, topics, handler, retries)
+ return c.newSession(ctx, topics, handler, retries)
+ case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
+ if retries <= 0 {
+ return nil, join.Err
+ }
+
+ return c.retryNewSession(ctx, topics, handler, retries, true)
case ErrRebalanceInProgress: // retry after backoff
if retries <= 0 {
return nil, join.Err
}
- select {
- case <-c.closed:
- return nil, ErrClosedConsumerGroup
- case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
- }
-
- return c.newSession(ctx, coordinator, topics, handler, retries-1)
+ return c.retryNewSession(ctx, topics, handler, retries, false)
default:
return nil, join.Err
}
@@ -236,19 +257,19 @@
case ErrNoError:
case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
c.memberID = ""
- return c.newSession(ctx, coordinator, topics, handler, retries)
+ return c.newSession(ctx, topics, handler, retries)
+ case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
+ if retries <= 0 {
+ return nil, sync.Err
+ }
+
+ return c.retryNewSession(ctx, topics, handler, retries, true)
case ErrRebalanceInProgress: // retry after backoff
if retries <= 0 {
return nil, sync.Err
}
- select {
- case <-c.closed:
- return nil, ErrClosedConsumerGroup
- case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
- }
-
- return c.newSession(ctx, coordinator, topics, handler, retries-1)
+ return c.retryNewSession(ctx, topics, handler, retries, false)
default:
return nil, sync.Err
}
@@ -613,7 +634,7 @@
s.releaseOnce.Do(func() {
if withCleanup {
if e := s.handler.Cleanup(s); e != nil {
- s.parent.handleError(err, "", -1)
+ s.parent.handleError(e, "", -1)
err = e
}
}
diff --git a/vendor/github.com/Shopify/sarama/control_record.go b/vendor/github.com/Shopify/sarama/control_record.go
new file mode 100644
index 0000000..9b75ab5
--- /dev/null
+++ b/vendor/github.com/Shopify/sarama/control_record.go
@@ -0,0 +1,72 @@
+package sarama
+
+//ControlRecordType ...
+type ControlRecordType int
+
+const (
+ //ControlRecordAbort is a control record for abort
+ ControlRecordAbort ControlRecordType = iota
+ //ControlRecordCommit is a control record for commit
+ ControlRecordCommit
+ //ControlRecordUnknown is a control record of unknown type
+ ControlRecordUnknown
+)
+
+// Control records are returned as a record by fetchRequest
+// However unlike "normal" records, they mean nothing application wise.
+// They only serve internal logic for supporting transactions.
+type ControlRecord struct {
+ Version int16
+ CoordinatorEpoch int32
+ Type ControlRecordType
+}
+
+func (cr *ControlRecord) decode(key, value packetDecoder) error {
+ var err error
+ cr.Version, err = value.getInt16()
+ if err != nil {
+ return err
+ }
+
+ cr.CoordinatorEpoch, err = value.getInt32()
+ if err != nil {
+ return err
+ }
+
+ // There a version for the value part AND the key part. And I have no idea if they are supposed to match or not
+ // Either way, all these version can only be 0 for now
+ cr.Version, err = key.getInt16()
+ if err != nil {
+ return err
+ }
+
+ recordType, err := key.getInt16()
+ if err != nil {
+ return err
+ }
+
+ switch recordType {
+ case 0:
+ cr.Type = ControlRecordAbort
+ case 1:
+ cr.Type = ControlRecordCommit
+ default:
+ // from JAVA implementation:
+ // UNKNOWN is used to indicate a control type which the client is not aware of and should be ignored
+ cr.Type = ControlRecordUnknown
+ }
+ return nil
+}
+
+func (cr *ControlRecord) encode(key, value packetEncoder) {
+ value.putInt16(cr.Version)
+ value.putInt32(cr.CoordinatorEpoch)
+ key.putInt16(cr.Version)
+
+ switch cr.Type {
+ case ControlRecordAbort:
+ key.putInt16(0)
+ case ControlRecordCommit:
+ key.putInt16(1)
+ }
+}
diff --git a/vendor/github.com/Shopify/sarama/crc32_field.go b/vendor/github.com/Shopify/sarama/crc32_field.go
index 1f14443..38189a3 100644
--- a/vendor/github.com/Shopify/sarama/crc32_field.go
+++ b/vendor/github.com/Shopify/sarama/crc32_field.go
@@ -4,6 +4,7 @@
"encoding/binary"
"fmt"
"hash/crc32"
+ "sync"
)
type crcPolynomial int8
@@ -13,6 +14,22 @@
crcCastagnoli
)
+var crc32FieldPool = sync.Pool{}
+
+func acquireCrc32Field(polynomial crcPolynomial) *crc32Field {
+ val := crc32FieldPool.Get()
+ if val != nil {
+ c := val.(*crc32Field)
+ c.polynomial = polynomial
+ return c
+ }
+ return newCRC32Field(polynomial)
+}
+
+func releaseCrc32Field(c *crc32Field) {
+ crc32FieldPool.Put(c)
+}
+
var castagnoliTable = crc32.MakeTable(crc32.Castagnoli)
// crc32Field implements the pushEncoder and pushDecoder interfaces for calculating CRC32s.
diff --git a/vendor/github.com/Shopify/sarama/create_partitions_response.go b/vendor/github.com/Shopify/sarama/create_partitions_response.go
index abd621c..bb18204 100644
--- a/vendor/github.com/Shopify/sarama/create_partitions_response.go
+++ b/vendor/github.com/Shopify/sarama/create_partitions_response.go
@@ -1,6 +1,9 @@
package sarama
-import "time"
+import (
+ "fmt"
+ "time"
+)
type CreatePartitionsResponse struct {
ThrottleTime time.Duration
@@ -69,6 +72,14 @@
ErrMsg *string
}
+func (t *TopicPartitionError) Error() string {
+ text := t.Err.Error()
+ if t.ErrMsg != nil {
+ text = fmt.Sprintf("%s - %s", text, *t.ErrMsg)
+ }
+ return text
+}
+
func (t *TopicPartitionError) encode(pe packetEncoder) error {
pe.putInt16(int16(t.Err))
diff --git a/vendor/github.com/Shopify/sarama/create_topics_response.go b/vendor/github.com/Shopify/sarama/create_topics_response.go
index 66207e0..a493e02 100644
--- a/vendor/github.com/Shopify/sarama/create_topics_response.go
+++ b/vendor/github.com/Shopify/sarama/create_topics_response.go
@@ -1,6 +1,9 @@
package sarama
-import "time"
+import (
+ "fmt"
+ "time"
+)
type CreateTopicsResponse struct {
Version int16
@@ -83,6 +86,14 @@
ErrMsg *string
}
+func (t *TopicError) Error() string {
+ text := t.Err.Error()
+ if t.ErrMsg != nil {
+ text = fmt.Sprintf("%s - %s", text, *t.ErrMsg)
+ }
+ return text
+}
+
func (t *TopicError) encode(pe packetEncoder, version int16) error {
pe.putInt16(int16(t.Err))
diff --git a/vendor/github.com/Shopify/sarama/describe_configs_response.go b/vendor/github.com/Shopify/sarama/describe_configs_response.go
index 63fb6ea..5737232 100644
--- a/vendor/github.com/Shopify/sarama/describe_configs_response.go
+++ b/vendor/github.com/Shopify/sarama/describe_configs_response.go
@@ -26,12 +26,12 @@
}
const (
- SourceUnknown ConfigSource = 0
- SourceTopic ConfigSource = 1
- SourceDynamicBroker ConfigSource = 2
- SourceDynamicDefaultBroker ConfigSource = 3
- SourceStaticBroker ConfigSource = 4
- SourceDefault ConfigSource = 5
+ SourceUnknown ConfigSource = iota
+ SourceTopic
+ SourceDynamicBroker
+ SourceDynamicDefaultBroker
+ SourceStaticBroker
+ SourceDefault
)
type DescribeConfigsResponse struct {
diff --git a/vendor/github.com/Shopify/sarama/dev.yml b/vendor/github.com/Shopify/sarama/dev.yml
index 97eed3a..3f4d569 100644
--- a/vendor/github.com/Shopify/sarama/dev.yml
+++ b/vendor/github.com/Shopify/sarama/dev.yml
@@ -2,7 +2,7 @@
up:
- go:
- version: '1.11'
+ version: '1.12'
commands:
test:
diff --git a/vendor/github.com/Shopify/sarama/errors.go b/vendor/github.com/Shopify/sarama/errors.go
index 87a4c61..c6a8be7 100644
--- a/vendor/github.com/Shopify/sarama/errors.go
+++ b/vendor/github.com/Shopify/sarama/errors.go
@@ -81,6 +81,28 @@
// See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes
type KError int16
+// MultiError is used to contain multi error.
+type MultiError struct {
+ Errors *[]error
+}
+
+func (mErr MultiError) Error() string {
+ var errString = ""
+ for _, err := range *mErr.Errors {
+ errString += err.Error() + ","
+ }
+ return errString
+}
+
+// ErrDeleteRecords is the type of error returned when fail to delete the required records
+type ErrDeleteRecords struct {
+ MultiError
+}
+
+func (err ErrDeleteRecords) Error() string {
+ return "kafka server: failed to delete records " + err.MultiError.Error()
+}
+
// Numeric error codes returned by the Kafka server.
const (
ErrNoError KError = 0
@@ -161,6 +183,11 @@
ErrFencedLeaderEpoch KError = 74
ErrUnknownLeaderEpoch KError = 75
ErrUnsupportedCompressionType KError = 76
+ ErrStaleBrokerEpoch KError = 77
+ ErrOffsetNotAvailable KError = 78
+ ErrMemberIdRequired KError = 79
+ ErrPreferredLeaderNotAvailable KError = 80
+ ErrGroupMaxSizeReached KError = 81
)
func (err KError) Error() string {
@@ -323,6 +350,16 @@
return "kafka server: The leader epoch in the request is newer than the epoch on the broker."
case ErrUnsupportedCompressionType:
return "kafka server: The requesting client does not support the compression type of given partition."
+ case ErrStaleBrokerEpoch:
+ return "kafka server: Broker epoch has changed"
+ case ErrOffsetNotAvailable:
+ return "kafka server: The leader high watermark has not caught up from a recent leader election so the offsets cannot be guaranteed to be monotonically increasing"
+ case ErrMemberIdRequired:
+ return "kafka server: The group member needs to have a valid member id before actually entering a consumer group"
+ case ErrPreferredLeaderNotAvailable:
+ return "kafka server: The preferred leader was not available"
+ case ErrGroupMaxSizeReached:
+ return "kafka server: Consumer group The consumer group has reached its max size. already has the configured maximum number of members."
}
return fmt.Sprintf("Unknown error, how did this happen? Error code = %d", err)
diff --git a/vendor/github.com/Shopify/sarama/fetch_request.go b/vendor/github.com/Shopify/sarama/fetch_request.go
index 462ab8a..4db9ddd 100644
--- a/vendor/github.com/Shopify/sarama/fetch_request.go
+++ b/vendor/github.com/Shopify/sarama/fetch_request.go
@@ -36,8 +36,8 @@
type IsolationLevel int8
const (
- ReadUncommitted IsolationLevel = 0
- ReadCommitted IsolationLevel = 1
+ ReadUncommitted IsolationLevel = iota
+ ReadCommitted
)
func (r *FetchRequest) encode(pe packetEncoder) (err error) {
diff --git a/vendor/github.com/Shopify/sarama/fetch_response.go b/vendor/github.com/Shopify/sarama/fetch_response.go
index 9df99c1..3afc187 100644
--- a/vendor/github.com/Shopify/sarama/fetch_response.go
+++ b/vendor/github.com/Shopify/sarama/fetch_response.go
@@ -1,6 +1,7 @@
package sarama
import (
+ "sort"
"time"
)
@@ -185,6 +186,17 @@
return pe.pop()
}
+func (b *FetchResponseBlock) getAbortedTransactions() []*AbortedTransaction {
+ // I can't find any doc that guarantee the field `fetchResponse.AbortedTransactions` is ordered
+ // plus Java implementation use a PriorityQueue based on `FirstOffset`. I guess we have to order it ourself
+ at := b.AbortedTransactions
+ sort.Slice(
+ at,
+ func(i, j int) bool { return at[i].FirstOffset < at[j].FirstOffset },
+ )
+ return at
+}
+
type FetchResponse struct {
Blocks map[string]map[int32]*FetchResponseBlock
ThrottleTime time.Duration
@@ -385,6 +397,65 @@
batch.addRecord(rec)
}
+// AddRecordBatchWithTimestamp is similar to AddRecordWithTimestamp
+// But instead of appending 1 record to a batch, it append a new batch containing 1 record to the fetchResponse
+// Since transaction are handled on batch level (the whole batch is either committed or aborted), use this to test transactions
+func (r *FetchResponse) AddRecordBatchWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool, timestamp time.Time) {
+ frb := r.getOrCreateBlock(topic, partition)
+ kb, vb := encodeKV(key, value)
+
+ records := newDefaultRecords(&RecordBatch{Version: 2, LogAppendTime: r.LogAppendTime, FirstTimestamp: timestamp, MaxTimestamp: r.Timestamp})
+ batch := &RecordBatch{
+ Version: 2,
+ LogAppendTime: r.LogAppendTime,
+ FirstTimestamp: timestamp,
+ MaxTimestamp: r.Timestamp,
+ FirstOffset: offset,
+ LastOffsetDelta: 0,
+ ProducerID: producerID,
+ IsTransactional: isTransactional,
+ }
+ rec := &Record{Key: kb, Value: vb, OffsetDelta: 0, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
+ batch.addRecord(rec)
+ records.RecordBatch = batch
+
+ frb.RecordsSet = append(frb.RecordsSet, &records)
+}
+
+func (r *FetchResponse) AddControlRecordWithTimestamp(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType, timestamp time.Time) {
+ frb := r.getOrCreateBlock(topic, partition)
+
+ // batch
+ batch := &RecordBatch{
+ Version: 2,
+ LogAppendTime: r.LogAppendTime,
+ FirstTimestamp: timestamp,
+ MaxTimestamp: r.Timestamp,
+ FirstOffset: offset,
+ LastOffsetDelta: 0,
+ ProducerID: producerID,
+ IsTransactional: true,
+ Control: true,
+ }
+
+ // records
+ records := newDefaultRecords(nil)
+ records.RecordBatch = batch
+
+ // record
+ crAbort := ControlRecord{
+ Version: 0,
+ Type: recordType,
+ }
+ crKey := &realEncoder{raw: make([]byte, 4)}
+ crValue := &realEncoder{raw: make([]byte, 6)}
+ crAbort.encode(crKey, crValue)
+ rec := &Record{Key: ByteEncoder(crKey.raw), Value: ByteEncoder(crValue.raw), OffsetDelta: 0, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
+ batch.addRecord(rec)
+
+ frb.RecordsSet = append(frb.RecordsSet, &records)
+}
+
func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
r.AddMessageWithTimestamp(topic, partition, key, value, offset, time.Time{}, 0)
}
@@ -393,6 +464,15 @@
r.AddRecordWithTimestamp(topic, partition, key, value, offset, time.Time{})
}
+func (r *FetchResponse) AddRecordBatch(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool) {
+ r.AddRecordBatchWithTimestamp(topic, partition, key, value, offset, producerID, isTransactional, time.Time{})
+}
+
+func (r *FetchResponse) AddControlRecord(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType) {
+ // define controlRecord key and value
+ r.AddControlRecordWithTimestamp(topic, partition, offset, producerID, recordType, time.Time{})
+}
+
func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32) {
frb := r.getOrCreateBlock(topic, partition)
if len(frb.RecordsSet) == 0 {
diff --git a/vendor/github.com/Shopify/sarama/find_coordinator_request.go b/vendor/github.com/Shopify/sarama/find_coordinator_request.go
index 0ab5cb5..ff2ad20 100644
--- a/vendor/github.com/Shopify/sarama/find_coordinator_request.go
+++ b/vendor/github.com/Shopify/sarama/find_coordinator_request.go
@@ -3,8 +3,8 @@
type CoordinatorType int8
const (
- CoordinatorGroup CoordinatorType = 0
- CoordinatorTransaction CoordinatorType = 1
+ CoordinatorGroup CoordinatorType = iota
+ CoordinatorTransaction
)
type FindCoordinatorRequest struct {
diff --git a/vendor/github.com/Shopify/sarama/go.mod b/vendor/github.com/Shopify/sarama/go.mod
index 3715129..8c45155 100644
--- a/vendor/github.com/Shopify/sarama/go.mod
+++ b/vendor/github.com/Shopify/sarama/go.mod
@@ -1,13 +1,24 @@
module github.com/Shopify/sarama
require (
- github.com/DataDog/zstd v1.3.5
+ github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798
github.com/Shopify/toxiproxy v2.1.4+incompatible
github.com/davecgh/go-spew v1.1.1
github.com/eapache/go-resiliency v1.1.0
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21
github.com/eapache/queue v1.1.0
- github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
- github.com/pierrec/lz4 v2.0.5+incompatible
+ github.com/golang/snappy v0.0.1 // indirect
+ github.com/hashicorp/go-uuid v1.0.1 // indirect
+ github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03
+ github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a
+ github.com/stretchr/testify v1.3.0
+ github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
+ github.com/xdg/stringprep v1.0.0 // indirect
+ golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5 // indirect
+ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3
+ gopkg.in/jcmturner/aescts.v1 v1.0.1 // indirect
+ gopkg.in/jcmturner/dnsutils.v1 v1.0.1 // indirect
+ gopkg.in/jcmturner/gokrb5.v7 v7.2.3
+ gopkg.in/jcmturner/rpc.v1 v1.1.0 // indirect
)
diff --git a/vendor/github.com/Shopify/sarama/go.sum b/vendor/github.com/Shopify/sarama/go.sum
index 58e2e91..4dbc6d2 100644
--- a/vendor/github.com/Shopify/sarama/go.sum
+++ b/vendor/github.com/Shopify/sarama/go.sum
@@ -1,7 +1,8 @@
-github.com/DataDog/zstd v1.3.5 h1:DtpNbljikUepEPD16hD4LvIcmhnhdLTiW/5pHgbmp14=
-github.com/DataDog/zstd v1.3.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
+github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798 h1:2T/jmrHeTezcCM58lvEQXs0UpQJCo5SoGAcg+mbSTIg=
+github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eapache/go-resiliency v1.1.0 h1:1NtRmCAqadE2FN4ZcN6g90TP3uk8cg9rn9eNK2197aU=
@@ -10,9 +11,41 @@
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
-github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
-github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
-github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
-github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
+github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
+github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/hashicorp/go-uuid v1.0.1 h1:fv1ep09latC32wFoVwnqcnKJGnMSdBanPczbHAYm1BE=
+github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
+github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03 h1:FUwcHNlEqkqLjLBdCp5PRlCFijNjvcYANOZXzCfXwCM=
+github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
+github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41 h1:GeinFsrjWz97fAxVUEd748aV0cYL+I6k44gFJTCVvpU=
+github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
+github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
+github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
+github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
+github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5 h1:bselrhR0Or1vomJZC8ZIjWtbDmn9OYFLX5Ik9alpJpE=
+golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+gopkg.in/jcmturner/aescts.v1 v1.0.1 h1:cVVZBK2b1zY26haWB4vbBiZrfFQnfbTVrE3xZq6hrEw=
+gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo=
+gopkg.in/jcmturner/dnsutils.v1 v1.0.1 h1:cIuC1OLRGZrld+16ZJvvZxVJeKPsvd5eUIvxfoN5hSM=
+gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q=
+gopkg.in/jcmturner/gokrb5.v7 v7.2.3 h1:hHMV/yKPwMnJhPuPx7pH2Uw/3Qyf+thJYlisUc44010=
+gopkg.in/jcmturner/gokrb5.v7 v7.2.3/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM=
+gopkg.in/jcmturner/rpc.v1 v1.1.0 h1:QHIUxTX1ISuAv9dD2wJ9HWQVuWDX/Zc0PfeC2tjc4rU=
+gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8=
diff --git a/vendor/github.com/Shopify/sarama/gssapi_kerberos.go b/vendor/github.com/Shopify/sarama/gssapi_kerberos.go
new file mode 100644
index 0000000..49b632d
--- /dev/null
+++ b/vendor/github.com/Shopify/sarama/gssapi_kerberos.go
@@ -0,0 +1,257 @@
+package sarama
+
+import (
+ "encoding/binary"
+ "fmt"
+ "github.com/jcmturner/gofork/encoding/asn1"
+ "gopkg.in/jcmturner/gokrb5.v7/asn1tools"
+ "gopkg.in/jcmturner/gokrb5.v7/gssapi"
+ "gopkg.in/jcmturner/gokrb5.v7/iana/chksumtype"
+ "gopkg.in/jcmturner/gokrb5.v7/iana/keyusage"
+ "gopkg.in/jcmturner/gokrb5.v7/messages"
+ "gopkg.in/jcmturner/gokrb5.v7/types"
+ "io"
+ "strings"
+ "time"
+)
+
+const (
+ TOK_ID_KRB_AP_REQ = 256
+ GSS_API_GENERIC_TAG = 0x60
+ KRB5_USER_AUTH = 1
+ KRB5_KEYTAB_AUTH = 2
+ GSS_API_INITIAL = 1
+ GSS_API_VERIFY = 2
+ GSS_API_FINISH = 3
+)
+
+type GSSAPIConfig struct {
+ AuthType int
+ KeyTabPath string
+ KerberosConfigPath string
+ ServiceName string
+ Username string
+ Password string
+ Realm string
+}
+
+type GSSAPIKerberosAuth struct {
+ Config *GSSAPIConfig
+ ticket messages.Ticket
+ encKey types.EncryptionKey
+ NewKerberosClientFunc func(config *GSSAPIConfig) (KerberosClient, error)
+ step int
+}
+
+type KerberosClient interface {
+ Login() error
+ GetServiceTicket(spn string) (messages.Ticket, types.EncryptionKey, error)
+ Domain() string
+ CName() types.PrincipalName
+ Destroy()
+}
+
+/*
+*
+* Appends length in big endian before payload, and send it to kafka
+*
+ */
+
+func (krbAuth *GSSAPIKerberosAuth) writePackage(broker *Broker, payload []byte) (int, error) {
+ length := len(payload)
+ finalPackage := make([]byte, length+4) //4 byte length header + payload
+ copy(finalPackage[4:], payload)
+ binary.BigEndian.PutUint32(finalPackage, uint32(length))
+ bytes, err := broker.conn.Write(finalPackage)
+ if err != nil {
+ return bytes, err
+ }
+ return bytes, nil
+}
+
+/*
+*
+* Read length (4 bytes) and then read the payload
+*
+ */
+
+func (krbAuth *GSSAPIKerberosAuth) readPackage(broker *Broker) ([]byte, int, error) {
+ bytesRead := 0
+ lengthInBytes := make([]byte, 4)
+ bytes, err := io.ReadFull(broker.conn, lengthInBytes)
+ if err != nil {
+ return nil, bytesRead, err
+ }
+ bytesRead += bytes
+ payloadLength := binary.BigEndian.Uint32(lengthInBytes)
+ payloadBytes := make([]byte, payloadLength) // buffer for read..
+ bytes, err = io.ReadFull(broker.conn, payloadBytes) // read bytes
+ if err != nil {
+ return payloadBytes, bytesRead, err
+ }
+ bytesRead += bytes
+ return payloadBytes, bytesRead, nil
+}
+
+func (krbAuth *GSSAPIKerberosAuth) newAuthenticatorChecksum() []byte {
+ a := make([]byte, 24)
+ flags := []int{gssapi.ContextFlagInteg, gssapi.ContextFlagConf}
+ binary.LittleEndian.PutUint32(a[:4], 16)
+ for _, i := range flags {
+ f := binary.LittleEndian.Uint32(a[20:24])
+ f |= uint32(i)
+ binary.LittleEndian.PutUint32(a[20:24], f)
+ }
+ return a
+}
+
+/*
+*
+* Construct Kerberos AP_REQ package, conforming to RFC-4120
+* https://tools.ietf.org/html/rfc4120#page-84
+*
+ */
+func (krbAuth *GSSAPIKerberosAuth) createKrb5Token(
+ domain string, cname types.PrincipalName,
+ ticket messages.Ticket,
+ sessionKey types.EncryptionKey) ([]byte, error) {
+ auth, err := types.NewAuthenticator(domain, cname)
+ if err != nil {
+ return nil, err
+ }
+ auth.Cksum = types.Checksum{
+ CksumType: chksumtype.GSSAPI,
+ Checksum: krbAuth.newAuthenticatorChecksum(),
+ }
+ APReq, err := messages.NewAPReq(
+ ticket,
+ sessionKey,
+ auth,
+ )
+ if err != nil {
+ return nil, err
+ }
+ aprBytes := make([]byte, 2)
+ binary.BigEndian.PutUint16(aprBytes, TOK_ID_KRB_AP_REQ)
+ tb, err := APReq.Marshal()
+ if err != nil {
+ return nil, err
+ }
+ aprBytes = append(aprBytes, tb...)
+ return aprBytes, nil
+}
+
+/*
+*
+* Append the GSS-API header to the payload, conforming to RFC-2743
+* Section 3.1, Mechanism-Independent Token Format
+*
+* https://tools.ietf.org/html/rfc2743#page-81
+*
+* GSSAPIHeader + <specific mechanism payload>
+*
+ */
+func (krbAuth *GSSAPIKerberosAuth) appendGSSAPIHeader(payload []byte) ([]byte, error) {
+ oidBytes, err := asn1.Marshal(gssapi.OID(gssapi.OIDKRB5))
+ if err != nil {
+ return nil, err
+ }
+ tkoLengthBytes := asn1tools.MarshalLengthBytes(len(oidBytes) + len(payload))
+ GSSHeader := append([]byte{GSS_API_GENERIC_TAG}, tkoLengthBytes...)
+ GSSHeader = append(GSSHeader, oidBytes...)
+ GSSPackage := append(GSSHeader, payload...)
+ return GSSPackage, nil
+}
+
+func (krbAuth *GSSAPIKerberosAuth) initSecContext(bytes []byte, kerberosClient KerberosClient) ([]byte, error) {
+ switch krbAuth.step {
+ case GSS_API_INITIAL:
+ aprBytes, err := krbAuth.createKrb5Token(
+ kerberosClient.Domain(),
+ kerberosClient.CName(),
+ krbAuth.ticket,
+ krbAuth.encKey)
+ if err != nil {
+ return nil, err
+ }
+ krbAuth.step = GSS_API_VERIFY
+ return krbAuth.appendGSSAPIHeader(aprBytes)
+ case GSS_API_VERIFY:
+ wrapTokenReq := gssapi.WrapToken{}
+ if err := wrapTokenReq.Unmarshal(bytes, true); err != nil {
+ return nil, err
+ }
+ // Validate response.
+ isValid, err := wrapTokenReq.Verify(krbAuth.encKey, keyusage.GSSAPI_ACCEPTOR_SEAL)
+ if !isValid {
+ return nil, err
+ }
+
+ wrapTokenResponse, err := gssapi.NewInitiatorWrapToken(wrapTokenReq.Payload, krbAuth.encKey)
+ if err != nil {
+ return nil, err
+ }
+ krbAuth.step = GSS_API_FINISH
+ return wrapTokenResponse.Marshal()
+ }
+ return nil, nil
+}
+
+/* This does the handshake for authorization */
+func (krbAuth *GSSAPIKerberosAuth) Authorize(broker *Broker) error {
+
+ kerberosClient, err := krbAuth.NewKerberosClientFunc(krbAuth.Config)
+ if err != nil {
+ Logger.Printf("Kerberos client error: %s", err)
+ return err
+ }
+
+ err = kerberosClient.Login()
+ if err != nil {
+ Logger.Printf("Kerberos client error: %s", err)
+ return err
+ }
+ // Construct SPN using serviceName and host
+ // SPN format: <SERVICE>/<FQDN>
+
+ host := strings.SplitN(broker.addr, ":", 2)[0] // Strip port part
+ spn := fmt.Sprintf("%s/%s", broker.conf.Net.SASL.GSSAPI.ServiceName, host)
+
+ ticket, encKey, err := kerberosClient.GetServiceTicket(spn)
+
+ if err != nil {
+ Logger.Printf("Error getting Kerberos service ticket : %s", err)
+ return err
+ }
+ krbAuth.ticket = ticket
+ krbAuth.encKey = encKey
+ krbAuth.step = GSS_API_INITIAL
+ var receivedBytes []byte = nil
+ defer kerberosClient.Destroy()
+ for {
+ packBytes, err := krbAuth.initSecContext(receivedBytes, kerberosClient)
+ if err != nil {
+ Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s\n", err)
+ return err
+ }
+ requestTime := time.Now()
+ bytesWritten, err := krbAuth.writePackage(broker, packBytes)
+ if err != nil {
+ Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s\n", err)
+ return err
+ }
+ broker.updateOutgoingCommunicationMetrics(bytesWritten)
+ if krbAuth.step == GSS_API_VERIFY {
+ var bytesRead = 0
+ receivedBytes, bytesRead, err = krbAuth.readPackage(broker)
+ requestLatency := time.Since(requestTime)
+ broker.updateIncomingCommunicationMetrics(bytesRead, requestLatency)
+ if err != nil {
+ Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s\n", err)
+ return err
+ }
+ } else if krbAuth.step == GSS_API_FINISH {
+ return nil
+ }
+ }
+}
diff --git a/vendor/github.com/Shopify/sarama/kerberos_client.go b/vendor/github.com/Shopify/sarama/kerberos_client.go
new file mode 100644
index 0000000..91b998f
--- /dev/null
+++ b/vendor/github.com/Shopify/sarama/kerberos_client.go
@@ -0,0 +1,51 @@
+package sarama
+
+import (
+ krb5client "gopkg.in/jcmturner/gokrb5.v7/client"
+ krb5config "gopkg.in/jcmturner/gokrb5.v7/config"
+ "gopkg.in/jcmturner/gokrb5.v7/keytab"
+ "gopkg.in/jcmturner/gokrb5.v7/types"
+)
+
+type KerberosGoKrb5Client struct {
+ krb5client.Client
+}
+
+func (c *KerberosGoKrb5Client) Domain() string {
+ return c.Credentials.Domain()
+}
+
+func (c *KerberosGoKrb5Client) CName() types.PrincipalName {
+ return c.Credentials.CName()
+}
+
+/*
+*
+* Create kerberos client used to obtain TGT and TGS tokens
+* used gokrb5 library, which is a pure go kerberos client with
+* some GSS-API capabilities, and SPNEGO support. Kafka does not use SPNEGO
+* it uses pure Kerberos 5 solution (RFC-4121 and RFC-4120).
+*
+ */
+func NewKerberosClient(config *GSSAPIConfig) (KerberosClient, error) {
+ cfg, err := krb5config.Load(config.KerberosConfigPath)
+ if err != nil {
+ return nil, err
+ }
+ return createClient(config, cfg)
+}
+
+func createClient(config *GSSAPIConfig, cfg *krb5config.Config) (KerberosClient, error) {
+ var client *krb5client.Client
+ if config.AuthType == KRB5_KEYTAB_AUTH {
+ kt, err := keytab.Load(config.KeyTabPath)
+ if err != nil {
+ return nil, err
+ }
+ client = krb5client.NewClientWithKeytab(config.Username, config.Realm, kt, cfg)
+ } else {
+ client = krb5client.NewClientWithPassword(config.Username,
+ config.Realm, config.Password, cfg)
+ }
+ return &KerberosGoKrb5Client{*client}, nil
+}
diff --git a/vendor/github.com/Shopify/sarama/length_field.go b/vendor/github.com/Shopify/sarama/length_field.go
index da199a7..7d864f6 100644
--- a/vendor/github.com/Shopify/sarama/length_field.go
+++ b/vendor/github.com/Shopify/sarama/length_field.go
@@ -1,6 +1,9 @@
package sarama
-import "encoding/binary"
+import (
+ "encoding/binary"
+ "sync"
+)
// LengthField implements the PushEncoder and PushDecoder interfaces for calculating 4-byte lengths.
type lengthField struct {
@@ -8,6 +11,20 @@
length int32
}
+var lengthFieldPool = sync.Pool{}
+
+func acquireLengthField() *lengthField {
+ val := lengthFieldPool.Get()
+ if val != nil {
+ return val.(*lengthField)
+ }
+ return &lengthField{}
+}
+
+func releaseLengthField(m *lengthField) {
+ lengthFieldPool.Put(m)
+}
+
func (l *lengthField) decode(pd packetDecoder) error {
var err error
l.length, err = pd.getInt32()
diff --git a/vendor/github.com/Shopify/sarama/message.go b/vendor/github.com/Shopify/sarama/message.go
index f64c79b..7c54748 100644
--- a/vendor/github.com/Shopify/sarama/message.go
+++ b/vendor/github.com/Shopify/sarama/message.go
@@ -5,37 +5,44 @@
"time"
)
-// The lowest 3 bits contain the compression codec used for the message
-const compressionCodecMask int8 = 0x07
+const (
+ //CompressionNone no compression
+ CompressionNone CompressionCodec = iota
+ //CompressionGZIP compression using GZIP
+ CompressionGZIP
+ //CompressionSnappy compression using snappy
+ CompressionSnappy
+ //CompressionLZ4 compression using LZ4
+ CompressionLZ4
+ //CompressionZSTD compression using ZSTD
+ CompressionZSTD
-// Bit 3 set for "LogAppend" timestamps
-const timestampTypeMask = 0x08
+ // The lowest 3 bits contain the compression codec used for the message
+ compressionCodecMask int8 = 0x07
+
+ // Bit 3 set for "LogAppend" timestamps
+ timestampTypeMask = 0x08
+
+ // CompressionLevelDefault is the constant to use in CompressionLevel
+ // to have the default compression level for any codec. The value is picked
+ // that we don't use any existing compression levels.
+ CompressionLevelDefault = -1000
+)
// CompressionCodec represents the various compression codecs recognized by Kafka in messages.
type CompressionCodec int8
-const (
- CompressionNone CompressionCodec = 0
- CompressionGZIP CompressionCodec = 1
- CompressionSnappy CompressionCodec = 2
- CompressionLZ4 CompressionCodec = 3
- CompressionZSTD CompressionCodec = 4
-)
-
func (cc CompressionCodec) String() string {
return []string{
"none",
"gzip",
"snappy",
"lz4",
+ "zstd",
}[int(cc)]
}
-// CompressionLevelDefault is the constant to use in CompressionLevel
-// to have the default compression level for any codec. The value is picked
-// that we don't use any existing compression levels.
-const CompressionLevelDefault = -1000
-
+//Message is a kafka message type
type Message struct {
Codec CompressionCodec // codec used to compress the message contents
CompressionLevel int // compression level
@@ -96,7 +103,10 @@
}
func (m *Message) decode(pd packetDecoder) (err error) {
- err = pd.push(newCRC32Field(crcIEEE))
+ crc32Decoder := acquireCrc32Field(crcIEEE)
+ defer releaseCrc32Field(crc32Decoder)
+
+ err = pd.push(crc32Decoder)
if err != nil {
return err
}
diff --git a/vendor/github.com/Shopify/sarama/message_set.go b/vendor/github.com/Shopify/sarama/message_set.go
index 600c7c4..6523ec2 100644
--- a/vendor/github.com/Shopify/sarama/message_set.go
+++ b/vendor/github.com/Shopify/sarama/message_set.go
@@ -29,7 +29,10 @@
return err
}
- if err = pd.push(&lengthField{}); err != nil {
+ lengthDecoder := acquireLengthField()
+ defer releaseLengthField(lengthDecoder)
+
+ if err = pd.push(lengthDecoder); err != nil {
return err
}
diff --git a/vendor/github.com/Shopify/sarama/metadata_request.go b/vendor/github.com/Shopify/sarama/metadata_request.go
index 17dc428..1b590d3 100644
--- a/vendor/github.com/Shopify/sarama/metadata_request.go
+++ b/vendor/github.com/Shopify/sarama/metadata_request.go
@@ -37,15 +37,8 @@
if err != nil {
return err
}
- if size < 0 {
- return nil
- } else {
- topicCount := size
- if topicCount == 0 {
- return nil
- }
-
- r.Topics = make([]string, topicCount)
+ if size > 0 {
+ r.Topics = make([]string, size)
for i := range r.Topics {
topic, err := pd.getString()
if err != nil {
diff --git a/vendor/github.com/Shopify/sarama/metadata_response.go b/vendor/github.com/Shopify/sarama/metadata_response.go
index c402d05..b2d532e 100644
--- a/vendor/github.com/Shopify/sarama/metadata_response.go
+++ b/vendor/github.com/Shopify/sarama/metadata_response.go
@@ -296,7 +296,7 @@
return tmatch
}
-func (r *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, err KError) {
+func (r *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, offline []int32, err KError) {
tmatch := r.AddTopic(topic, ErrNoError)
var pmatch *PartitionMetadata
@@ -316,6 +316,7 @@
pmatch.Leader = brokerID
pmatch.Replicas = replicas
pmatch.Isr = isr
+ pmatch.OfflineReplicas = offline
pmatch.Err = err
}
diff --git a/vendor/github.com/Shopify/sarama/metrics.go b/vendor/github.com/Shopify/sarama/metrics.go
index 4869708..90e5a87 100644
--- a/vendor/github.com/Shopify/sarama/metrics.go
+++ b/vendor/github.com/Shopify/sarama/metrics.go
@@ -28,14 +28,6 @@
return fmt.Sprintf(name+"-for-broker-%d", broker.ID())
}
-func getOrRegisterBrokerMeter(name string, broker *Broker, r metrics.Registry) metrics.Meter {
- return metrics.GetOrRegisterMeter(getMetricNameForBroker(name, broker), r)
-}
-
-func getOrRegisterBrokerHistogram(name string, broker *Broker, r metrics.Registry) metrics.Histogram {
- return getOrRegisterHistogram(getMetricNameForBroker(name, broker), r)
-}
-
func getMetricNameForTopic(name string, topic string) string {
// Convert dot to _ since reporters like Graphite typically use dot to represent hierarchy
// cf. KAFKA-1902 and KAFKA-2337
diff --git a/vendor/github.com/Shopify/sarama/mockbroker.go b/vendor/github.com/Shopify/sarama/mockbroker.go
index 55ef1e2..4ed46a6 100644
--- a/vendor/github.com/Shopify/sarama/mockbroker.go
+++ b/vendor/github.com/Shopify/sarama/mockbroker.go
@@ -18,6 +18,8 @@
expectationTimeout = 500 * time.Millisecond
)
+type GSSApiHandlerFunc func([]byte) []byte
+
type requestHandlerFunc func(req *request) (res encoder)
// RequestNotifierFunc is invoked when a mock broker processes a request successfully
@@ -49,18 +51,19 @@
// It is not necessary to prefix message length or correlation ID to your
// response bytes, the server does that automatically as a convenience.
type MockBroker struct {
- brokerID int32
- port int32
- closing chan none
- stopper chan none
- expectations chan encoder
- listener net.Listener
- t TestReporter
- latency time.Duration
- handler requestHandlerFunc
- notifier RequestNotifierFunc
- history []RequestResponse
- lock sync.Mutex
+ brokerID int32
+ port int32
+ closing chan none
+ stopper chan none
+ expectations chan encoder
+ listener net.Listener
+ t TestReporter
+ latency time.Duration
+ handler requestHandlerFunc
+ notifier RequestNotifierFunc
+ history []RequestResponse
+ lock sync.Mutex
+ gssApiHandler GSSApiHandlerFunc
}
// RequestResponse represents a Request/Response pair processed by MockBroker.
@@ -173,6 +176,43 @@
Logger.Printf("*** mockbroker/%d: listener closed, err=%v", b.BrokerID(), err)
}
+func (b *MockBroker) SetGSSAPIHandler(handler GSSApiHandlerFunc) {
+ b.gssApiHandler = handler
+}
+
+func (b *MockBroker) readToBytes(r io.Reader) ([]byte, error) {
+ var (
+ bytesRead int
+ lengthBytes = make([]byte, 4)
+ )
+
+ if _, err := io.ReadFull(r, lengthBytes); err != nil {
+ return nil, err
+ }
+
+ bytesRead += len(lengthBytes)
+ length := int32(binary.BigEndian.Uint32(lengthBytes))
+
+ if length <= 4 || length > MaxRequestSize {
+ return nil, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", length)}
+ }
+
+ encodedReq := make([]byte, length)
+ if _, err := io.ReadFull(r, encodedReq); err != nil {
+ return nil, err
+ }
+
+ bytesRead += len(encodedReq)
+
+ fullBytes := append(lengthBytes, encodedReq...)
+
+ return fullBytes, nil
+}
+
+func (b *MockBroker) isGSSAPI(buffer []byte) bool {
+ return buffer[4] == 0x60 || bytes.Equal(buffer[4:6], []byte{0x05, 0x04})
+}
+
func (b *MockBroker) handleRequests(conn net.Conn, idx int, wg *sync.WaitGroup) {
defer wg.Done()
defer func() {
@@ -192,59 +232,92 @@
}()
resHeader := make([]byte, 8)
+ var bytesWritten int
+ var bytesRead int
for {
- req, bytesRead, err := decodeRequest(conn)
+
+ buffer, err := b.readToBytes(conn)
if err != nil {
- Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(req))
+ Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(buffer))
b.serverError(err)
break
}
- if b.latency > 0 {
- time.Sleep(b.latency)
- }
+ bytesWritten = 0
+ if !b.isGSSAPI(buffer) {
- b.lock.Lock()
- res := b.handler(req)
- b.history = append(b.history, RequestResponse{req.body, res})
- b.lock.Unlock()
-
- if res == nil {
- Logger.Printf("*** mockbroker/%d/%d: ignored %v", b.brokerID, idx, spew.Sdump(req))
- continue
- }
- Logger.Printf("*** mockbroker/%d/%d: served %v -> %v", b.brokerID, idx, req, res)
-
- encodedRes, err := encode(res, nil)
- if err != nil {
- b.serverError(err)
- break
- }
- if len(encodedRes) == 0 {
- b.lock.Lock()
- if b.notifier != nil {
- b.notifier(bytesRead, 0)
+ req, br, err := decodeRequest(bytes.NewReader(buffer))
+ bytesRead = br
+ if err != nil {
+ Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(req))
+ b.serverError(err)
+ break
}
- b.lock.Unlock()
- continue
- }
- binary.BigEndian.PutUint32(resHeader, uint32(len(encodedRes)+4))
- binary.BigEndian.PutUint32(resHeader[4:], uint32(req.correlationID))
- if _, err = conn.Write(resHeader); err != nil {
- b.serverError(err)
- break
- }
- if _, err = conn.Write(encodedRes); err != nil {
- b.serverError(err)
- break
+ if b.latency > 0 {
+ time.Sleep(b.latency)
+ }
+
+ b.lock.Lock()
+ res := b.handler(req)
+ b.history = append(b.history, RequestResponse{req.body, res})
+ b.lock.Unlock()
+
+ if res == nil {
+ Logger.Printf("*** mockbroker/%d/%d: ignored %v", b.brokerID, idx, spew.Sdump(req))
+ continue
+ }
+ Logger.Printf("*** mockbroker/%d/%d: served %v -> %v", b.brokerID, idx, req, res)
+
+ encodedRes, err := encode(res, nil)
+ if err != nil {
+ b.serverError(err)
+ break
+ }
+ if len(encodedRes) == 0 {
+ b.lock.Lock()
+ if b.notifier != nil {
+ b.notifier(bytesRead, 0)
+ }
+ b.lock.Unlock()
+ continue
+ }
+
+ binary.BigEndian.PutUint32(resHeader, uint32(len(encodedRes)+4))
+ binary.BigEndian.PutUint32(resHeader[4:], uint32(req.correlationID))
+ if _, err = conn.Write(resHeader); err != nil {
+ b.serverError(err)
+ break
+ }
+ if _, err = conn.Write(encodedRes); err != nil {
+ b.serverError(err)
+ break
+ }
+ bytesWritten = len(resHeader) + len(encodedRes)
+
+ } else {
+ // GSSAPI is not part of kafka protocol, but is supported for authentication proposes.
+ // Don't support history for this kind of request as is only used for test GSSAPI authentication mechanism
+ b.lock.Lock()
+ res := b.gssApiHandler(buffer)
+ b.lock.Unlock()
+ if res == nil {
+ Logger.Printf("*** mockbroker/%d/%d: ignored %v", b.brokerID, idx, spew.Sdump(buffer))
+ continue
+ }
+ if _, err = conn.Write(res); err != nil {
+ b.serverError(err)
+ break
+ }
+ bytesWritten = len(res)
}
b.lock.Lock()
if b.notifier != nil {
- b.notifier(bytesRead, len(resHeader)+len(encodedRes))
+ b.notifier(bytesRead, bytesWritten)
}
b.lock.Unlock()
+
}
Logger.Printf("*** mockbroker/%d/%d: connection closed, err=%v", b.BrokerID(), idx, err)
}
diff --git a/vendor/github.com/Shopify/sarama/mockkerberos.go b/vendor/github.com/Shopify/sarama/mockkerberos.go
new file mode 100644
index 0000000..affeb2d
--- /dev/null
+++ b/vendor/github.com/Shopify/sarama/mockkerberos.go
@@ -0,0 +1,123 @@
+package sarama
+
+import (
+ "encoding/binary"
+ "encoding/hex"
+ "gopkg.in/jcmturner/gokrb5.v7/credentials"
+ "gopkg.in/jcmturner/gokrb5.v7/gssapi"
+ "gopkg.in/jcmturner/gokrb5.v7/iana/keyusage"
+ "gopkg.in/jcmturner/gokrb5.v7/messages"
+ "gopkg.in/jcmturner/gokrb5.v7/types"
+)
+
+type KafkaGSSAPIHandler struct {
+ client *MockKerberosClient
+ badResponse bool
+ badKeyChecksum bool
+}
+
+func (h *KafkaGSSAPIHandler) MockKafkaGSSAPI(buffer []byte) []byte {
+ // Default payload used for verify
+ err := h.client.Login() // Mock client construct keys when login
+ if err != nil {
+ return nil
+ }
+ if h.badResponse { // Returns trash
+ return []byte{0x00, 0x00, 0x00, 0x01, 0xAD}
+ }
+
+ var pack = gssapi.WrapToken{
+ Flags: KRB5_USER_AUTH,
+ EC: 12,
+ RRC: 0,
+ SndSeqNum: 3398292281,
+ Payload: []byte{0x11, 0x00}, // 1100
+ }
+ // Compute checksum
+ if h.badKeyChecksum {
+ pack.CheckSum = []byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}
+ } else {
+ err = pack.SetCheckSum(h.client.ASRep.DecryptedEncPart.Key, keyusage.GSSAPI_ACCEPTOR_SEAL)
+ if err != nil {
+ return nil
+ }
+ }
+
+ packBytes, err := pack.Marshal()
+ if err != nil {
+ return nil
+ }
+ lenBytes := len(packBytes)
+ response := make([]byte, lenBytes+4)
+ copy(response[4:], packBytes)
+ binary.BigEndian.PutUint32(response, uint32(lenBytes))
+ return response
+}
+
+type MockKerberosClient struct {
+ asReqBytes string
+ asRepBytes string
+ ASRep messages.ASRep
+ credentials *credentials.Credentials
+ mockError error
+ errorStage string
+}
+
+func (c *MockKerberosClient) Login() error {
+ if c.errorStage == "login" && c.mockError != nil {
+ return c.mockError
+ }
+ c.asRepBytes = "6b8202e9308202e5a003020105a10302010ba22b30293027a103020113a220041e301c301aa003020112a1131b114" +
+ "558414d504c452e434f4d636c69656e74a30d1b0b4558414d504c452e434f4da4133011a003020101a10a30081b06636c69656e7" +
+ "4a5820156618201523082014ea003020105a10d1b0b4558414d504c452e434f4da220301ea003020102a11730151b066b7262746" +
+ "7741b0b4558414d504c452e434f4da382011430820110a003020112a103020101a28201020481ffdb9891175d106818e61008c51" +
+ "d0b3462bca92f3bf9d4cfa82de4c4d7aff9994ec87c573e3a3d54dcb2bb79618c76f2bf4a3d006f90d5bdbd049bc18f48be39203" +
+ "549ca02acaf63f292b12404f9b74c34b83687119d8f56552ccc0c50ebee2a53bb114c1b4619bb1d5d31f0f49b4d40a08a9b4c046" +
+ "2e1398d0b648be1c0e50c552ad16e1d8d8e74263dd0bf0ec591e4797dfd40a9a1be4ae830d03a306e053fd7586fef84ffc5e4a83" +
+ "7c3122bf3e6a40fe87e84019f6283634461b955712b44a5f7386c278bff94ec2c2dc0403247e29c2450e853471ceababf9b8911f" +
+ "997f2e3010b046d2c49eb438afb0f4c210821e80d4ffa4c9521eb895dcd68610b3feaa682012c30820128a003020112a282011f0" +
+ "482011bce73cbce3f1dd17661c412005f0f2257c756fe8e98ff97e6ec24b7bab66e5fd3a3827aeeae4757af0c6e892948122d8b2" +
+ "03c8df48df0ef5d142d0e416d688f11daa0fcd63d96bdd431d02b8e951c664eeff286a2be62383d274a04016d5f0e141da58cb86" +
+ "331de64063062f4f885e8e9ce5b181ca2fdc67897c5995e0ae1ae0c171a64493ff7bd91bc6d89cd4fce1e2b3ea0a10e34b0d5eda" +
+ "aa38ee727b50c5632ed1d2f2b457908e616178d0d80b72af209fb8ac9dbaa1768fa45931392b36b6d8c12400f8ded2efaa0654d0" +
+ "da1db966e8b5aab4706c800f95d559664646041fdb38b411c62fc0fbe0d25083a28562b0e1c8df16e62e9d5626b0addee489835f" +
+ "eedb0f26c05baa596b69b17f47920aa64b29dc77cfcc97ba47885"
+ apRepBytes, err := hex.DecodeString(c.asRepBytes)
+ if err != nil {
+ return err
+ }
+ err = c.ASRep.Unmarshal(apRepBytes)
+ if err != nil {
+ return err
+ }
+ c.credentials = credentials.New("client", "EXAMPLE.COM").WithPassword("qwerty")
+ _, err = c.ASRep.DecryptEncPart(c.credentials)
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+func (c *MockKerberosClient) GetServiceTicket(spn string) (messages.Ticket, types.EncryptionKey, error) {
+ if c.errorStage == "service_ticket" && c.mockError != nil {
+ return messages.Ticket{}, types.EncryptionKey{}, c.mockError
+ }
+ return c.ASRep.Ticket, c.ASRep.DecryptedEncPart.Key, nil
+}
+
+func (c *MockKerberosClient) Domain() string {
+ return "EXAMPLE.COM"
+}
+func (c *MockKerberosClient) CName() types.PrincipalName {
+ var p = types.PrincipalName{
+ NameType: KRB5_USER_AUTH,
+ NameString: []string{
+ "kafka",
+ "kafka",
+ },
+ }
+ return p
+}
+func (c *MockKerberosClient) Destroy() {
+ // Do nothing.
+}
diff --git a/vendor/github.com/Shopify/sarama/mockresponses.go b/vendor/github.com/Shopify/sarama/mockresponses.go
index 348c223..c78f0ac 100644
--- a/vendor/github.com/Shopify/sarama/mockresponses.go
+++ b/vendor/github.com/Shopify/sarama/mockresponses.go
@@ -2,6 +2,7 @@
import (
"fmt"
+ "strings"
)
// TestReporter has methods matching go's testing.T to avoid importing
@@ -177,7 +178,7 @@
// Generate set of replicas
replicas := []int32{}
-
+ offlineReplicas := []int32{}
for _, brokerID := range mmr.brokers {
replicas = append(replicas, brokerID)
}
@@ -185,14 +186,14 @@
if len(metadataRequest.Topics) == 0 {
for topic, partitions := range mmr.leaders {
for partition, brokerID := range partitions {
- metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, ErrNoError)
+ metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, offlineReplicas, ErrNoError)
}
}
return metadataResponse
}
for _, topic := range metadataRequest.Topics {
for partition, brokerID := range mmr.leaders[topic] {
- metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, ErrNoError)
+ metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, offlineReplicas, ErrNoError)
}
}
return metadataResponse
@@ -573,6 +574,7 @@
// MockOffsetFetchResponse is a `OffsetFetchResponse` builder.
type MockOffsetFetchResponse struct {
offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock
+ error KError
t TestReporter
}
@@ -598,15 +600,25 @@
return mr
}
+func (mr *MockOffsetFetchResponse) SetError(kerror KError) *MockOffsetFetchResponse {
+ mr.error = kerror
+ return mr
+}
+
func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoder {
req := reqBody.(*OffsetFetchRequest)
group := req.ConsumerGroup
- res := &OffsetFetchResponse{}
+ res := &OffsetFetchResponse{Version: req.Version}
+
for topic, partitions := range mr.offsets[group] {
for partition, block := range partitions {
res.AddBlock(topic, partition, block)
}
}
+
+ if res.Version >= 2 {
+ res.Err = mr.error
+ }
return res
}
@@ -620,10 +632,20 @@
func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoder {
req := reqBody.(*CreateTopicsRequest)
- res := &CreateTopicsResponse{}
+ res := &CreateTopicsResponse{
+ Version: req.Version,
+ }
res.TopicErrors = make(map[string]*TopicError)
- for topic, _ := range req.TopicDetails {
+ for topic := range req.TopicDetails {
+ if res.Version >= 1 && strings.HasPrefix(topic, "_") {
+ msg := "insufficient permissions to create topic with reserved prefix"
+ res.TopicErrors[topic] = &TopicError{
+ Err: ErrTopicAuthorizationFailed,
+ ErrMsg: &msg,
+ }
+ continue
+ }
res.TopicErrors[topic] = &TopicError{Err: ErrNoError}
}
return res
@@ -661,7 +683,15 @@
res := &CreatePartitionsResponse{}
res.TopicPartitionErrors = make(map[string]*TopicPartitionError)
- for topic, _ := range req.TopicPartitions {
+ for topic := range req.TopicPartitions {
+ if strings.HasPrefix(topic, "_") {
+ msg := "insufficient permissions to create partition on topic with reserved prefix"
+ res.TopicPartitionErrors[topic] = &TopicPartitionError{
+ Err: ErrTopicAuthorizationFailed,
+ ErrMsg: &msg,
+ }
+ continue
+ }
res.TopicPartitionErrors[topic] = &TopicPartitionError{Err: ErrNoError}
}
return res
@@ -682,7 +712,7 @@
for topic, deleteRecordRequestTopic := range req.Topics {
partitions := make(map[int32]*DeleteRecordsResponsePartition)
- for partition, _ := range deleteRecordRequestTopic.PartitionOffsets {
+ for partition := range deleteRecordRequestTopic.PartitionOffsets {
partitions[partition] = &DeleteRecordsResponsePartition{Err: ErrNoError}
}
res.Topics[topic] = &DeleteRecordsResponseTopic{Partitions: partitions}
@@ -866,3 +896,26 @@
}
return res
}
+
+type MockDeleteGroupsResponse struct {
+ deletedGroups []string
+}
+
+func NewMockDeleteGroupsRequest(t TestReporter) *MockDeleteGroupsResponse {
+ return &MockDeleteGroupsResponse{}
+}
+
+func (m *MockDeleteGroupsResponse) SetDeletedGroups(groups []string) *MockDeleteGroupsResponse {
+ m.deletedGroups = groups
+ return m
+}
+
+func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoder {
+ resp := &DeleteGroupsResponse{
+ GroupErrorCodes: map[string]KError{},
+ }
+ for _, group := range m.deletedGroups {
+ resp.GroupErrorCodes[group] = ErrNoError
+ }
+ return resp
+}
diff --git a/vendor/github.com/Shopify/sarama/offset_commit_request.go b/vendor/github.com/Shopify/sarama/offset_commit_request.go
index 1ec583e..5732ed9 100644
--- a/vendor/github.com/Shopify/sarama/offset_commit_request.go
+++ b/vendor/github.com/Shopify/sarama/offset_commit_request.go
@@ -200,11 +200,11 @@
func (r *OffsetCommitRequest) Offset(topic string, partitionID int32) (int64, string, error) {
partitions := r.blocks[topic]
if partitions == nil {
- return 0, "", errors.New("No such offset")
+ return 0, "", errors.New("no such offset")
}
block := partitions[partitionID]
if block == nil {
- return 0, "", errors.New("No such offset")
+ return 0, "", errors.New("no such offset")
}
return block.offset, block.metadata, nil
}
diff --git a/vendor/github.com/Shopify/sarama/offset_manager.go b/vendor/github.com/Shopify/sarama/offset_manager.go
index 2432f7b..923972f 100644
--- a/vendor/github.com/Shopify/sarama/offset_manager.go
+++ b/vendor/github.com/Shopify/sarama/offset_manager.go
@@ -333,7 +333,6 @@
pom.handleError(err)
case ErrOffsetsLoadInProgress:
// nothing wrong but we didn't commit, we'll get it next time round
- break
case ErrUnknownTopicOrPartition:
// let the user know *and* try redispatching - if topic-auto-create is
// enabled, redispatching should trigger a metadata req and create the
@@ -576,6 +575,6 @@
func (pom *partitionOffsetManager) release() {
pom.releaseOnce.Do(func() {
- go close(pom.errors)
+ close(pom.errors)
})
}
diff --git a/vendor/github.com/Shopify/sarama/packet_decoder.go b/vendor/github.com/Shopify/sarama/packet_decoder.go
index 74805cc..9be854c 100644
--- a/vendor/github.com/Shopify/sarama/packet_decoder.go
+++ b/vendor/github.com/Shopify/sarama/packet_decoder.go
@@ -27,6 +27,7 @@
remaining() int
getSubset(length int) (packetDecoder, error)
peek(offset, length int) (packetDecoder, error) // similar to getSubset, but it doesn't advance the offset
+ peekInt8(offset int) (int8, error) // similar to peek, but just one byte
// Stacks, see PushDecoder
push(in pushDecoder) error
diff --git a/vendor/github.com/Shopify/sarama/produce_set.go b/vendor/github.com/Shopify/sarama/produce_set.go
index 4b42d9c..bba0f7e 100644
--- a/vendor/github.com/Shopify/sarama/produce_set.go
+++ b/vendor/github.com/Shopify/sarama/produce_set.go
@@ -81,7 +81,7 @@
if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
if ps.parent.conf.Producer.Idempotent && msg.sequenceNumber < set.recordsToSend.RecordBatch.FirstSequence {
- return errors.New("Assertion failed: Message out of sequence added to a batch")
+ return errors.New("assertion failed: message out of sequence added to a batch")
}
// We are being conservative here to avoid having to prep encode the record
size += maximumRecordOverhead
diff --git a/vendor/github.com/Shopify/sarama/real_decoder.go b/vendor/github.com/Shopify/sarama/real_decoder.go
index 23045e7..085cbb3 100644
--- a/vendor/github.com/Shopify/sarama/real_decoder.go
+++ b/vendor/github.com/Shopify/sarama/real_decoder.go
@@ -290,6 +290,14 @@
return &realDecoder{raw: rd.raw[off : off+length]}, nil
}
+func (rd *realDecoder) peekInt8(offset int) (int8, error) {
+ const byteLen = 1
+ if rd.remaining() < offset+byteLen {
+ return -1, ErrInsufficientData
+ }
+ return int8(rd.raw[rd.off+offset]), nil
+}
+
// stacks
func (rd *realDecoder) push(in pushDecoder) error {
diff --git a/vendor/github.com/Shopify/sarama/record.go b/vendor/github.com/Shopify/sarama/record.go
index cded308..cdccfe3 100644
--- a/vendor/github.com/Shopify/sarama/record.go
+++ b/vendor/github.com/Shopify/sarama/record.go
@@ -6,10 +6,12 @@
)
const (
+ isTransactionalMask = 0x10
controlMask = 0x20
maximumRecordOverhead = 5*binary.MaxVarintLen32 + binary.MaxVarintLen64 + 1
)
+//RecordHeader stores key and value for a record header
type RecordHeader struct {
Key []byte
Value []byte
@@ -33,15 +35,16 @@
return nil
}
+//Record is kafka record type
type Record struct {
+ Headers []*RecordHeader
+
Attributes int8
TimestampDelta time.Duration
OffsetDelta int64
Key []byte
Value []byte
- Headers []*RecordHeader
-
- length varintLengthField
+ length varintLengthField
}
func (r *Record) encode(pe packetEncoder) error {
diff --git a/vendor/github.com/Shopify/sarama/record_batch.go b/vendor/github.com/Shopify/sarama/record_batch.go
index a36f7e6..c653763 100644
--- a/vendor/github.com/Shopify/sarama/record_batch.go
+++ b/vendor/github.com/Shopify/sarama/record_batch.go
@@ -45,11 +45,16 @@
FirstSequence int32
Records []*Record
PartialTrailingRecord bool
+ IsTransactional bool
compressedRecords []byte
recordsLen int // uncompressed records size
}
+func (b *RecordBatch) LastOffset() int64 {
+ return b.FirstOffset + int64(b.LastOffsetDelta)
+}
+
func (b *RecordBatch) encode(pe packetEncoder) error {
if b.Version != 2 {
return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
@@ -111,7 +116,10 @@
return err
}
- if err = pd.push(&crc32Field{polynomial: crcCastagnoli}); err != nil {
+ crc32Decoder := acquireCrc32Field(crcCastagnoli)
+ defer releaseCrc32Field(crc32Decoder)
+
+ if err = pd.push(crc32Decoder); err != nil {
return err
}
@@ -122,6 +130,7 @@
b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
b.Control = attributes&controlMask == controlMask
b.LogAppendTime = attributes×tampTypeMask == timestampTypeMask
+ b.IsTransactional = attributes&isTransactionalMask == isTransactionalMask
if b.LastOffsetDelta, err = pd.getInt32(); err != nil {
return err
@@ -205,6 +214,9 @@
if b.LogAppendTime {
attr |= timestampTypeMask
}
+ if b.IsTransactional {
+ attr |= isTransactionalMask
+ }
return attr
}
diff --git a/vendor/github.com/Shopify/sarama/records.go b/vendor/github.com/Shopify/sarama/records.go
index 192f592..98160c7 100644
--- a/vendor/github.com/Shopify/sarama/records.go
+++ b/vendor/github.com/Shopify/sarama/records.go
@@ -185,10 +185,20 @@
}
func magicValue(pd packetDecoder) (int8, error) {
- dec, err := pd.peek(magicOffset, magicLength)
- if err != nil {
- return 0, err
+ return pd.peekInt8(magicOffset)
+}
+
+func (r *Records) getControlRecord() (ControlRecord, error) {
+ if r.RecordBatch == nil || len(r.RecordBatch.Records) <= 0 {
+ return ControlRecord{}, fmt.Errorf("cannot get control record, record batch is empty")
}
- return dec.getInt8()
+ firstRecord := r.RecordBatch.Records[0]
+ controlRecord := ControlRecord{}
+ err := controlRecord.decode(&realDecoder{raw: firstRecord.Key}, &realDecoder{raw: firstRecord.Value})
+ if err != nil {
+ return ControlRecord{}, err
+ }
+
+ return controlRecord, nil
}
diff --git a/vendor/github.com/Shopify/sarama/request.go b/vendor/github.com/Shopify/sarama/request.go
index f7eea59..5ed8ca4 100644
--- a/vendor/github.com/Shopify/sarama/request.go
+++ b/vendor/github.com/Shopify/sarama/request.go
@@ -20,51 +20,67 @@
body protocolBody
}
-func (r *request) encode(pe packetEncoder) (err error) {
+func (r *request) encode(pe packetEncoder) error {
pe.push(&lengthField{})
pe.putInt16(r.body.key())
pe.putInt16(r.body.version())
pe.putInt32(r.correlationID)
- err = pe.putString(r.clientID)
+
+ err := pe.putString(r.clientID)
if err != nil {
return err
}
+
err = r.body.encode(pe)
if err != nil {
return err
}
+
return pe.pop()
}
func (r *request) decode(pd packetDecoder) (err error) {
- var key int16
- if key, err = pd.getInt16(); err != nil {
+ key, err := pd.getInt16()
+ if err != nil {
return err
}
- var version int16
- if version, err = pd.getInt16(); err != nil {
+
+ version, err := pd.getInt16()
+ if err != nil {
return err
}
- if r.correlationID, err = pd.getInt32(); err != nil {
+
+ r.correlationID, err = pd.getInt32()
+ if err != nil {
return err
}
+
r.clientID, err = pd.getString()
+ if err != nil {
+ return err
+ }
r.body = allocateBody(key, version)
if r.body == nil {
return PacketDecodingError{fmt.Sprintf("unknown request key (%d)", key)}
}
+
return r.body.decode(pd, version)
}
-func decodeRequest(r io.Reader) (req *request, bytesRead int, err error) {
- lengthBytes := make([]byte, 4)
+func decodeRequest(r io.Reader) (*request, int, error) {
+ var (
+ bytesRead int
+ lengthBytes = make([]byte, 4)
+ )
+
if _, err := io.ReadFull(r, lengthBytes); err != nil {
return nil, bytesRead, err
}
- bytesRead += len(lengthBytes)
+ bytesRead += len(lengthBytes)
length := int32(binary.BigEndian.Uint32(lengthBytes))
+
if length <= 4 || length > MaxRequestSize {
return nil, bytesRead, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", length)}
}
@@ -73,12 +89,14 @@
if _, err := io.ReadFull(r, encodedReq); err != nil {
return nil, bytesRead, err
}
+
bytesRead += len(encodedReq)
- req = &request{}
+ req := &request{}
if err := decode(encodedReq, req); err != nil {
return nil, bytesRead, err
}
+
return req, bytesRead, nil
}
diff --git a/vendor/github.com/Shopify/sarama/response_header.go b/vendor/github.com/Shopify/sarama/response_header.go
index f3f4d27..7a75918 100644
--- a/vendor/github.com/Shopify/sarama/response_header.go
+++ b/vendor/github.com/Shopify/sarama/response_header.go
@@ -2,6 +2,9 @@
import "fmt"
+const responseLengthSize = 4
+const correlationIDSize = 4
+
type responseHeader struct {
length int32
correlationID int32
diff --git a/vendor/github.com/Shopify/sarama/sarama.go b/vendor/github.com/Shopify/sarama/sarama.go
index 7d5dc60..1e0277a 100644
--- a/vendor/github.com/Shopify/sarama/sarama.go
+++ b/vendor/github.com/Shopify/sarama/sarama.go
@@ -10,10 +10,7 @@
depend on the configured value of `Producer.RequiredAcks`. There are configurations where a message acknowledged by the
SyncProducer can still sometimes be lost.
-To consume messages, use the Consumer. Note that Sarama's Consumer implementation does not currently support automatic
-consumer-group rebalancing and offset tracking. For Zookeeper-based tracking (Kafka 0.8.2 and earlier), the
-https://github.com/wvanbergen/kafka library builds on Sarama to add this support. For Kafka-based tracking (Kafka 0.9
-and later), the https://github.com/bsm/sarama-cluster library builds on Sarama to add this support.
+To consume messages, use Consumer or Consumer-Group API.
For lower-level needs, the Broker and Request/Response objects permit precise control over each connection
and message sent on the wire; the Client provides higher-level metadata management that is shared between
@@ -61,6 +58,14 @@
| compression-ratio-for-topic-<topic> | histogram | Distribution of the compression ratio times 100 of record batches for a given topic |
+-------------------------------------------+------------+--------------------------------------------------------------------------------------+
+Consumer related metrics:
+
+ +-------------------------------------------+------------+--------------------------------------------------------------------------------------+
+ | Name | Type | Description |
+ +-------------------------------------------+------------+--------------------------------------------------------------------------------------+
+ | consumer-batch-size | histogram | Distribution of the number of messages in a batch |
+ +-------------------------------------------+------------+--------------------------------------------------------------------------------------+
+
*/
package sarama
@@ -69,10 +74,29 @@
"log"
)
-// Logger is the instance of a StdLogger interface that Sarama writes connection
-// management events to. By default it is set to discard all log messages via ioutil.Discard,
-// but you can set it to redirect wherever you want.
-var Logger StdLogger = log.New(ioutil.Discard, "[Sarama] ", log.LstdFlags)
+var (
+ // Logger is the instance of a StdLogger interface that Sarama writes connection
+ // management events to. By default it is set to discard all log messages via ioutil.Discard,
+ // but you can set it to redirect wherever you want.
+ Logger StdLogger = log.New(ioutil.Discard, "[Sarama] ", log.LstdFlags)
+
+ // PanicHandler is called for recovering from panics spawned internally to the library (and thus
+ // not recoverable by the caller's goroutine). Defaults to nil, which means panics are not recovered.
+ PanicHandler func(interface{})
+
+ // MaxRequestSize is the maximum size (in bytes) of any request that Sarama will attempt to send. Trying
+ // to send a request larger than this will result in an PacketEncodingError. The default of 100 MiB is aligned
+ // with Kafka's default `socket.request.max.bytes`, which is the largest request the broker will attempt
+ // to process.
+ MaxRequestSize int32 = 100 * 1024 * 1024
+
+ // MaxResponseSize is the maximum size (in bytes) of any response that Sarama will attempt to parse. If
+ // a broker returns a response message larger than this value, Sarama will return a PacketDecodingError to
+ // protect the client from running out of memory. Please note that brokers do not have any natural limit on
+ // the size of responses they send. In particular, they can send arbitrarily large fetch responses to consumers
+ // (see https://issues.apache.org/jira/browse/KAFKA-2063).
+ MaxResponseSize int32 = 100 * 1024 * 1024
+)
// StdLogger is used to log error messages.
type StdLogger interface {
@@ -80,20 +104,3 @@
Printf(format string, v ...interface{})
Println(v ...interface{})
}
-
-// PanicHandler is called for recovering from panics spawned internally to the library (and thus
-// not recoverable by the caller's goroutine). Defaults to nil, which means panics are not recovered.
-var PanicHandler func(interface{})
-
-// MaxRequestSize is the maximum size (in bytes) of any request that Sarama will attempt to send. Trying
-// to send a request larger than this will result in an PacketEncodingError. The default of 100 MiB is aligned
-// with Kafka's default `socket.request.max.bytes`, which is the largest request the broker will attempt
-// to process.
-var MaxRequestSize int32 = 100 * 1024 * 1024
-
-// MaxResponseSize is the maximum size (in bytes) of any response that Sarama will attempt to parse. If
-// a broker returns a response message larger than this value, Sarama will return a PacketDecodingError to
-// protect the client from running out of memory. Please note that brokers do not have any natural limit on
-// the size of responses they send. In particular, they can send arbitrarily large fetch responses to consumers
-// (see https://issues.apache.org/jira/browse/KAFKA-2063).
-var MaxResponseSize int32 = 100 * 1024 * 1024
diff --git a/vendor/github.com/Shopify/sarama/utils.go b/vendor/github.com/Shopify/sarama/utils.go
index 7dcbf03..7c815cd 100644
--- a/vendor/github.com/Shopify/sarama/utils.go
+++ b/vendor/github.com/Shopify/sarama/utils.go
@@ -159,6 +159,8 @@
V2_0_0_0 = newKafkaVersion(2, 0, 0, 0)
V2_0_1_0 = newKafkaVersion(2, 0, 1, 0)
V2_1_0_0 = newKafkaVersion(2, 1, 0, 0)
+ V2_2_0_0 = newKafkaVersion(2, 2, 0, 0)
+ V2_3_0_0 = newKafkaVersion(2, 3, 0, 0)
SupportedVersions = []KafkaVersion{
V0_8_2_0,
@@ -181,11 +183,14 @@
V2_0_0_0,
V2_0_1_0,
V2_1_0_0,
+ V2_2_0_0,
+ V2_3_0_0,
}
MinVersion = V0_8_2_0
- MaxVersion = V2_1_0_0
+ MaxVersion = V2_3_0_0
)
+//ParseKafkaVersion parses and returns kafka version or error from a string
func ParseKafkaVersion(s string) (KafkaVersion, error) {
if len(s) < 5 {
return MinVersion, fmt.Errorf("invalid version `%s`", s)