SEBA-902 single-olt tests

- Pin protoc-gen-go to 1.3.2 to resolve a compatibility issue
- Run go mod tidy / go mod vendor on importer
- Add Go Module support to demotest

Change-Id: Ifde824fc9a6317b0adc1e12bea54ee1f9b788906
diff --git a/vendor/github.com/Shopify/sarama/.golangci.yml b/vendor/github.com/Shopify/sarama/.golangci.yml
new file mode 100644
index 0000000..561c2c2
--- /dev/null
+++ b/vendor/github.com/Shopify/sarama/.golangci.yml
@@ -0,0 +1,74 @@
+run:
+ timeout: 5m
+ deadline: 10m
+
+linters-settings:
+ govet:
+ check-shadowing: false
+ golint:
+ min-confidence: 0
+ gocyclo:
+ min-complexity: 99
+ maligned:
+ suggest-new: true
+ dupl:
+ threshold: 100
+ goconst:
+ min-len: 2
+ min-occurrences: 3
+ misspell:
+ locale: US
+ goimports:
+ local-prefixes: github.com/Shopify/sarama
+ gocritic:
+ enabled-tags:
+ - diagnostic
+ - experimental
+ - opinionated
+ - performance
+ - style
+ disabled-checks:
+ - wrapperFunc
+ - ifElseChain
+ funlen:
+ lines: 300
+ statements: 300
+
+linters:
+ disable-all: true
+ enable:
+ - bodyclose
+ # - deadcode
+ - depguard
+ - dogsled
+ # - dupl
+ # - errcheck
+ - funlen
+ # - gocritic
+ - gocyclo
+ - gofmt
+ # - goimports
+ # - golint
+ # - gosec
+ # - gosimple
+ - govet
+ # - ineffassign
+ # - interfacer
+ # - misspell
+ # - nakedret
+ # - scopelint
+ # - staticcheck
+ # - structcheck
+ # - stylecheck
+ - typecheck
+ # - unconvert
+ # - unused
+ # - varcheck
+ - whitespace
+ # - goconst
+ # - gochecknoinits
+
+issues:
+ exclude:
+ - consider giving a name to these results
+ - include an explanation for nolint directive
diff --git a/vendor/github.com/Shopify/sarama/.travis.yml b/vendor/github.com/Shopify/sarama/.travis.yml
deleted file mode 100644
index cace313..0000000
--- a/vendor/github.com/Shopify/sarama/.travis.yml
+++ /dev/null
@@ -1,37 +0,0 @@
-dist: xenial
-language: go
-go:
-- 1.12.x
-- 1.13.x
-
-env:
- global:
- - KAFKA_PEERS=localhost:9091,localhost:9092,localhost:9093,localhost:9094,localhost:9095
- - TOXIPROXY_ADDR=http://localhost:8474
- - KAFKA_INSTALL_ROOT=/home/travis/kafka
- - KAFKA_HOSTNAME=localhost
- - DEBUG=true
- matrix:
- - KAFKA_VERSION=2.2.1 KAFKA_SCALA_VERSION=2.12
- - KAFKA_VERSION=2.3.0 KAFKA_SCALA_VERSION=2.12
-
-before_install:
-- export REPOSITORY_ROOT=${TRAVIS_BUILD_DIR}
-- vagrant/install_cluster.sh
-- vagrant/boot_cluster.sh
-- vagrant/create_topics.sh
-- vagrant/run_java_producer.sh
-
-install: make install_dependencies
-
-script:
-- make test
-- make vet
-- make errcheck
-- if [[ "$TRAVIS_GO_VERSION" == 1.13* ]]; then make fmt; fi
-
-after_success:
-- go tool cover -func coverage.txt
-- bash <(curl -s https://codecov.io/bash)
-
-after_script: vagrant/halt_cluster.sh
diff --git a/vendor/github.com/Shopify/sarama/CHANGELOG.md b/vendor/github.com/Shopify/sarama/CHANGELOG.md
index 844f481..2bebeb1 100644
--- a/vendor/github.com/Shopify/sarama/CHANGELOG.md
+++ b/vendor/github.com/Shopify/sarama/CHANGELOG.md
@@ -1,5 +1,50 @@
# Changelog
+#### Unreleased
+
+#### Version 1.26.1 (2020-02-04)
+
+Improvements:
+- Add requests-in-flight metric ([1539](https://github.com/Shopify/sarama/pull/1539))
+- Fix misleading example for cluster admin ([1595](https://github.com/Shopify/sarama/pull/1595))
+- Replace Travis with GitHub Actions, linters housekeeping ([1573](https://github.com/Shopify/sarama/pull/1573))
+- Allow BalanceStrategy to provide custom assignment data ([1592](https://github.com/Shopify/sarama/pull/1592))
+
+Bug Fixes:
+- Adds back Consumer.Offsets.CommitInterval to fix API ([1590](https://github.com/Shopify/sarama/pull/1590))
+- Fix error message s/CommitInterval/AutoCommit.Interval ([1589](https://github.com/Shopify/sarama/pull/1589))
+
+#### Version 1.26.0 (2020-01-24)
+
+New Features:
+- Enable zstd compression
+ ([1574](https://github.com/Shopify/sarama/pull/1574),
+ [1582](https://github.com/Shopify/sarama/pull/1582))
+- Support headers in tools kafka-console-producer
+ ([1549](https://github.com/Shopify/sarama/pull/1549))
+
+Improvements:
+- Add SASL AuthIdentity to SASL frames (authzid)
+ ([1585](https://github.com/Shopify/sarama/pull/1585)).
+
+Bug Fixes:
+- Sending messages with ZStd compression enabled fails in multiple ways
+ ([1252](https://github.com/Shopify/sarama/issues/1252)).
+- Use the broker for any admin on BrokerConfig
+ ([1571](https://github.com/Shopify/sarama/pull/1571)).
+- Set DescribeConfigRequest Version field
+ ([1576](https://github.com/Shopify/sarama/pull/1576)).
+- ConsumerGroup flooding logs with client/metadata update req
+ ([1578](https://github.com/Shopify/sarama/pull/1578)).
+- MetadataRequest version in DescribeCluster
+ ([1580](https://github.com/Shopify/sarama/pull/1580)).
+- Fix deadlock in consumer group handleError
+ ([1581](https://github.com/Shopify/sarama/pull/1581))
+- Fill in the Fetch{Request,Response} protocol
+ ([1582](https://github.com/Shopify/sarama/pull/1582)).
+- Retry topic request on ControllerNotAvailable
+ ([1586](https://github.com/Shopify/sarama/pull/1586)).
+
#### Version 1.25.0 (2020-01-13)
New Features:
diff --git a/vendor/github.com/Shopify/sarama/Makefile b/vendor/github.com/Shopify/sarama/Makefile
index 9c8329e..c3b431a 100644
--- a/vendor/github.com/Shopify/sarama/Makefile
+++ b/vendor/github.com/Shopify/sarama/Makefile
@@ -1,56 +1,27 @@
-export GO111MODULE=on
+default: fmt get update test lint
-default: fmt vet errcheck test lint
+GO := GO111MODULE=on GOPRIVATE=github.com/linkedin GOSUMDB=off go
+GOBUILD := CGO_ENABLED=0 $(GO) build $(BUILD_FLAG)
+GOTEST := $(GO) test -gcflags='-l' -p 3 -v -race -timeout 6m -coverprofile=profile.out -covermode=atomic
-# Taken from https://github.com/codecov/example-go#caveat-multiple-files
-.PHONY: test
-test:
- echo "mode: atomic" > coverage.txt
- for d in `go list ./...`; do \
- go test -p 1 -v -timeout 6m -race -coverprofile=profile.out -covermode=atomic $$d || exit 1; \
- if [ -f profile.out ]; then \
- tail +2 profile.out >> coverage.txt; \
- rm profile.out; \
- fi \
- done
+FILES := $(shell find . -name '*.go' -type f -not -name '*.pb.go' -not -name '*_generated.go' -not -name '*_test.go')
+TESTS := $(shell find . -name '*.go' -type f -not -name '*.pb.go' -not -name '*_generated.go' -name '*_test.go')
-GOLINT := $(shell command -v golint)
-
-.PHONY: lint
-lint:
-ifndef GOLINT
- go get golang.org/x/lint/golint
-endif
- go list ./... | xargs golint
-
-.PHONY: vet
-vet:
- go vet ./...
-
-ERRCHECK := $(shell command -v errcheck)
-# See https://github.com/kisielk/errcheck/pull/141 for details on ignorepkg
-.PHONY: errcheck
-errcheck:
-ifndef ERRCHECK
- go get github.com/kisielk/errcheck
-endif
- errcheck -ignorepkg fmt github.com/Shopify/sarama/...
-
-.PHONY: fmt
-fmt:
- @if [ -n "$$(go fmt ./...)" ]; then echo 'Please run go fmt on your code.' && exit 1; fi
-
-.PHONY : install_dependencies
-install_dependencies: get
-
-.PHONY: get
get:
- go get -v ./...
+ $(GO) get ./...
+ $(GO) mod verify
+ $(GO) mod tidy
-.PHONY: clean
-clean:
- go clean ./...
+update:
+ $(GO) get -u -v all
+ $(GO) mod verify
+ $(GO) mod tidy
-.PHONY: tidy
-tidy:
- go mod tidy -v
+fmt:
+ gofmt -s -l -w $(FILES) $(TESTS)
+
+lint:
+ golangci-lint run
+
+test:
+ $(GOTEST) ./...
diff --git a/vendor/github.com/Shopify/sarama/Vagrantfile b/vendor/github.com/Shopify/sarama/Vagrantfile
index f4b848a..07d7ffb 100644
--- a/vendor/github.com/Shopify/sarama/Vagrantfile
+++ b/vendor/github.com/Shopify/sarama/Vagrantfile
@@ -1,14 +1,8 @@
-# -*- mode: ruby -*-
-# vi: set ft=ruby :
-
-# Vagrantfile API/syntax version. Don't touch unless you know what you're doing!
-VAGRANTFILE_API_VERSION = "2"
-
# We have 5 * 192MB ZK processes and 5 * 320MB Kafka processes => 2560MB
MEMORY = 3072
-Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
- config.vm.box = "ubuntu/trusty64"
+Vagrant.configure("2") do |config|
+ config.vm.box = "ubuntu/bionic64"
config.vm.provision :shell, path: "vagrant/provision.sh"
diff --git a/vendor/github.com/Shopify/sarama/admin.go b/vendor/github.com/Shopify/sarama/admin.go
index 6c9b1e9..dd63484 100644
--- a/vendor/github.com/Shopify/sarama/admin.go
+++ b/vendor/github.com/Shopify/sarama/admin.go
@@ -2,8 +2,11 @@
import (
"errors"
+ "fmt"
"math/rand"
+ "strconv"
"sync"
+ "time"
)
// ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics,
@@ -132,8 +135,45 @@
return ca.client.Controller()
}
-func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
+func (ca *clusterAdmin) refreshController() (*Broker, error) {
+ return ca.client.RefreshController()
+}
+// isErrNoController returns `true` if the given error type unwraps to an
+// `ErrNotController` response from Kafka
+func isErrNoController(err error) bool {
+ switch e := err.(type) {
+ case *TopicError:
+ return e.Err == ErrNotController
+ case *TopicPartitionError:
+ return e.Err == ErrNotController
+ case KError:
+ return e == ErrNotController
+ }
+ return false
+}
+
+// retryOnError will repeatedly call the given (error-returning) func in the
+// case that its response is non-nil and retriable (as determined by the
+// provided retriable func) up to the maximum number of tries permitted by
+// the admin client configuration
+func (ca *clusterAdmin) retryOnError(retriable func(error) bool, fn func() error) error {
+ var err error
+ for attempt := 0; attempt < ca.conf.Admin.Retry.Max; attempt++ {
+ err = fn()
+ if err == nil || !retriable(err) {
+ return err
+ }
+ Logger.Printf(
+ "admin/request retrying after %dms... (%d attempts remaining)\n",
+ ca.conf.Admin.Retry.Backoff/time.Millisecond, ca.conf.Admin.Retry.Max-attempt)
+ time.Sleep(ca.conf.Admin.Retry.Backoff)
+ continue
+ }
+ return err
+}
+
+func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
if topic == "" {
return ErrInvalidTopic
}
@@ -158,26 +198,31 @@
request.Version = 2
}
- b, err := ca.Controller()
- if err != nil {
- return err
- }
+ return ca.retryOnError(isErrNoController, func() error {
+ b, err := ca.Controller()
+ if err != nil {
+ return err
+ }
- rsp, err := b.CreateTopics(request)
- if err != nil {
- return err
- }
+ rsp, err := b.CreateTopics(request)
+ if err != nil {
+ return err
+ }
- topicErr, ok := rsp.TopicErrors[topic]
- if !ok {
- return ErrIncompleteResponse
- }
+ topicErr, ok := rsp.TopicErrors[topic]
+ if !ok {
+ return ErrIncompleteResponse
+ }
- if topicErr.Err != ErrNoError {
- return topicErr
- }
+ if topicErr.Err != ErrNoError {
+ if topicErr.Err == ErrNotController {
+ _, _ = ca.refreshController()
+ }
+ return topicErr
+ }
- return nil
+ return nil
+ })
}
func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) {
@@ -214,7 +259,7 @@
Topics: []string{},
}
- if ca.conf.Version.IsAtLeast(V0_11_0_0) {
+ if ca.conf.Version.IsAtLeast(V0_10_0_0) {
request.Version = 1
}
@@ -226,6 +271,16 @@
return response.Brokers, response.ControllerID, nil
}
+func (ca *clusterAdmin) findBroker(id int32) (*Broker, error) {
+ brokers := ca.client.Brokers()
+ for _, b := range brokers {
+ if b.ID() == id {
+ return b, nil
+ }
+ }
+ return nil, fmt.Errorf("could not find broker id %d", id)
+}
+
func (ca *clusterAdmin) findAnyBroker() (*Broker, error) {
brokers := ca.client.Brokers()
if len(brokers) > 0 {
@@ -308,7 +363,6 @@
}
func (ca *clusterAdmin) DeleteTopic(topic string) error {
-
if topic == "" {
return ErrInvalidTopic
}
@@ -322,25 +376,31 @@
request.Version = 1
}
- b, err := ca.Controller()
- if err != nil {
- return err
- }
+ return ca.retryOnError(isErrNoController, func() error {
+ b, err := ca.Controller()
+ if err != nil {
+ return err
+ }
- rsp, err := b.DeleteTopics(request)
- if err != nil {
- return err
- }
+ rsp, err := b.DeleteTopics(request)
+ if err != nil {
+ return err
+ }
- topicErr, ok := rsp.TopicErrorCodes[topic]
- if !ok {
- return ErrIncompleteResponse
- }
+ topicErr, ok := rsp.TopicErrorCodes[topic]
+ if !ok {
+ return ErrIncompleteResponse
+ }
- if topicErr != ErrNoError {
- return topicErr
- }
- return nil
+ if topicErr != ErrNoError {
+ if topicErr == ErrNotController {
+ _, _ = ca.refreshController()
+ }
+ return topicErr
+ }
+
+ return nil
+ })
}
func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
@@ -356,30 +416,34 @@
Timeout: ca.conf.Admin.Timeout,
}
- b, err := ca.Controller()
- if err != nil {
- return err
- }
+ return ca.retryOnError(isErrNoController, func() error {
+ b, err := ca.Controller()
+ if err != nil {
+ return err
+ }
- rsp, err := b.CreatePartitions(request)
- if err != nil {
- return err
- }
+ rsp, err := b.CreatePartitions(request)
+ if err != nil {
+ return err
+ }
- topicErr, ok := rsp.TopicPartitionErrors[topic]
- if !ok {
- return ErrIncompleteResponse
- }
+ topicErr, ok := rsp.TopicPartitionErrors[topic]
+ if !ok {
+ return ErrIncompleteResponse
+ }
- if topicErr.Err != ErrNoError {
- return topicErr
- }
+ if topicErr.Err != ErrNoError {
+ if topicErr.Err == ErrNotController {
+ _, _ = ca.refreshController()
+ }
+ return topicErr
+ }
- return nil
+ return nil
+ })
}
func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
-
if topic == "" {
return ErrInvalidTopic
}
@@ -432,8 +496,14 @@
return nil
}
-func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
+// Returns a bool indicating whether the resource request needs to go to a
+// specific broker
+func dependsOnSpecificNode(resource ConfigResource) bool {
+ return (resource.Type == BrokerResource && resource.Name != "") ||
+ resource.Type == BrokerLoggerResource
+}
+func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
var entries []ConfigEntry
var resources []*ConfigResource
resources = append(resources, &resource)
@@ -442,11 +512,31 @@
Resources: resources,
}
- b, err := ca.Controller()
+ if ca.conf.Version.IsAtLeast(V1_1_0_0) {
+ request.Version = 1
+ }
+
+ if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+ request.Version = 2
+ }
+
+ var (
+ b *Broker
+ err error
+ )
+
+ // DescribeConfig of broker/broker logger must be sent to the broker in question
+ if dependsOnSpecificNode(resource) {
+ id, _ := strconv.Atoi(resource.Name)
+ b, err = ca.findBroker(int32(id))
+ } else {
+ b, err = ca.findAnyBroker()
+ }
if err != nil {
return nil, err
}
+ _ = b.Open(ca.client.Config())
rsp, err := b.DescribeConfigs(request)
if err != nil {
return nil, err
@@ -466,7 +556,6 @@
}
func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error {
-
var resources []*AlterConfigsResource
resources = append(resources, &AlterConfigsResource{
Type: resourceType,
@@ -479,11 +568,23 @@
ValidateOnly: validateOnly,
}
- b, err := ca.Controller()
+ var (
+ b *Broker
+ err error
+ )
+
+ // AlterConfig of broker/broker logger must be sent to the broker in question
+ if dependsOnSpecificNode(ConfigResource{Name: name, Type: resourceType}) {
+ id, _ := strconv.Atoi(name)
+ b, err = ca.findBroker(int32(id))
+ } else {
+ b, err = ca.findAnyBroker()
+ }
if err != nil {
return err
}
+ _ = b.Open(ca.client.Config())
rsp, err := b.AlterConfigs(request)
if err != nil {
return err
@@ -518,7 +619,6 @@
}
func (ca *clusterAdmin) ListAcls(filter AclFilter) ([]ResourceAcls, error) {
-
request := &DescribeAclsRequest{AclFilter: filter}
if ca.conf.Version.IsAtLeast(V2_0_0_0) {
@@ -566,7 +666,6 @@
for _, mACL := range fr.MatchingAcls {
mAcls = append(mAcls, *mACL)
}
-
}
return mAcls, nil
}
@@ -580,7 +679,6 @@
return nil, err
}
groupsPerBroker[controller] = append(groupsPerBroker[controller], group)
-
}
for broker, brokerGroups := range groupsPerBroker {
@@ -623,7 +721,6 @@
}
groupMaps <- groups
-
}(b, ca.conf)
}
diff --git a/vendor/github.com/Shopify/sarama/balance_strategy.go b/vendor/github.com/Shopify/sarama/balance_strategy.go
index 67c4d96..56da276 100644
--- a/vendor/github.com/Shopify/sarama/balance_strategy.go
+++ b/vendor/github.com/Shopify/sarama/balance_strategy.go
@@ -47,6 +47,10 @@
// Plan accepts a map of `memberID -> metadata` and a map of `topic -> partitions`
// and returns a distribution plan.
Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error)
+
+ // AssignmentData returns the serialized assignment data for the specified
+ // memberID
+ AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error)
}
// --------------------------------------------------------------------
@@ -132,6 +136,11 @@
return plan, nil
}
+// AssignmentData simple strategies do not require any shared assignment data
+func (s *balanceStrategy) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
+ return nil, nil
+}
+
type balanceStrategySortable struct {
topic string
memberIDs []string
@@ -268,6 +277,15 @@
return plan, nil
}
+// AssignmentData serializes the set of topics currently assigned to the
+// specified member as part of the supplied balance plan
+func (s *stickyBalanceStrategy) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
+ return encode(&StickyAssignorUserDataV1{
+ Topics: topics,
+ Generation: generationID,
+ }, nil)
+}
+
func strsContains(s []string, value string) bool {
for _, entry := range s {
if entry == value {
diff --git a/vendor/github.com/Shopify/sarama/broker.go b/vendor/github.com/Shopify/sarama/broker.go
index 8146749..d27ebd2 100644
--- a/vendor/github.com/Shopify/sarama/broker.go
+++ b/vendor/github.com/Shopify/sarama/broker.go
@@ -40,6 +40,7 @@
outgoingByteRate metrics.Meter
responseRate metrics.Meter
responseSize metrics.Histogram
+ requestsInFlight metrics.Counter
brokerIncomingByteRate metrics.Meter
brokerRequestRate metrics.Meter
brokerRequestSize metrics.Histogram
@@ -47,6 +48,7 @@
brokerOutgoingByteRate metrics.Meter
brokerResponseRate metrics.Meter
brokerResponseSize metrics.Histogram
+ brokerRequestsInFlight metrics.Counter
kerberosAuthenticator GSSAPIKerberosAuth
}
@@ -182,6 +184,7 @@
b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
+ b.requestsInFlight = metrics.GetOrRegisterCounter("requests-in-flight", conf.MetricRegistry)
// Do not gather metrics for seeded broker (only used during bootstrap) because they share
// the same id (-1) and are already exposed through the global metrics above
if b.id >= 0 {
@@ -189,7 +192,6 @@
}
if conf.Net.SASL.Enable {
-
b.connErr = b.authenticateViaSASL()
if b.connErr != nil {
@@ -713,16 +715,19 @@
}
requestTime := time.Now()
+ // Will be decremented in responseReceiver (except error or request with NoResponse)
+ b.addRequestInFlightMetrics(1)
bytes, err := b.write(buf)
b.updateOutgoingCommunicationMetrics(bytes)
if err != nil {
+ b.addRequestInFlightMetrics(-1)
return nil, err
}
b.correlationID++
if !promiseResponse {
// Record request latency without the response
- b.updateRequestLatencyMetrics(time.Since(requestTime))
+ b.updateRequestLatencyAndInFlightMetrics(time.Since(requestTime))
return nil, nil
}
@@ -817,6 +822,9 @@
for response := range b.responses {
if dead != nil {
+ // This was previously incremented in send() and
+ // we are not calling updateIncomingCommunicationMetrics()
+ b.addRequestInFlightMetrics(-1)
response.errors <- dead
continue
}
@@ -892,9 +900,12 @@
}
requestTime := time.Now()
+ // Will be decremented in updateIncomingCommunicationMetrics (except error)
+ b.addRequestInFlightMetrics(1)
bytes, err := b.write(buf)
b.updateOutgoingCommunicationMetrics(bytes)
if err != nil {
+ b.addRequestInFlightMetrics(-1)
Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
return err
}
@@ -903,6 +914,7 @@
header := make([]byte, 8) // response header
_, err = b.readFull(header)
if err != nil {
+ b.addRequestInFlightMetrics(-1)
Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
return err
}
@@ -911,6 +923,7 @@
payload := make([]byte, length-4)
n, err := b.readFull(payload)
if err != nil {
+ b.addRequestInFlightMetrics(-1)
Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
return err
}
@@ -961,7 +974,6 @@
// default to V0 to allow for backward compatability when SASL is enabled
// but not the handshake
if b.conf.Net.SASL.Handshake {
-
handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, b.conf.Net.SASL.Version)
if handshakeErr != nil {
Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
@@ -977,16 +989,18 @@
// sendAndReceiveV0SASLPlainAuth flows the v0 sasl auth NOT wrapped in the kafka protocol
func (b *Broker) sendAndReceiveV0SASLPlainAuth() error {
-
- length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
+ length := len(b.conf.Net.SASL.AuthIdentity) + 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
authBytes := make([]byte, length+4) //4 byte length header + auth data
binary.BigEndian.PutUint32(authBytes, uint32(length))
- copy(authBytes[4:], []byte("\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))
+ copy(authBytes[4:], []byte(b.conf.Net.SASL.AuthIdentity+"\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))
requestTime := time.Now()
+ // Will be decremented in updateIncomingCommunicationMetrics (except error)
+ b.addRequestInFlightMetrics(1)
bytesWritten, err := b.write(authBytes)
b.updateOutgoingCommunicationMetrics(bytesWritten)
if err != nil {
+ b.addRequestInFlightMetrics(-1)
Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
return err
}
@@ -1011,11 +1025,13 @@
requestTime := time.Now()
+ // Will be decremented in updateIncomingCommunicationMetrics (except error)
+ b.addRequestInFlightMetrics(1)
bytesWritten, err := b.sendSASLPlainAuthClientResponse(correlationID)
-
b.updateOutgoingCommunicationMetrics(bytesWritten)
if err != nil {
+ b.addRequestInFlightMetrics(-1)
Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
return err
}
@@ -1068,16 +1084,18 @@
// if the broker responds with a challenge, in which case the token is
// rejected.
func (b *Broker) sendClientMessage(message []byte) (bool, error) {
-
requestTime := time.Now()
+ // Will be decremented in updateIncomingCommunicationMetrics (except error)
+ b.addRequestInFlightMetrics(1)
correlationID := b.correlationID
bytesWritten, err := b.sendSASLOAuthBearerClientMessage(message, correlationID)
+ b.updateOutgoingCommunicationMetrics(bytesWritten)
if err != nil {
+ b.addRequestInFlightMetrics(-1)
return false, err
}
- b.updateOutgoingCommunicationMetrics(bytesWritten)
b.correlationID++
res := &SaslAuthenticateResponse{}
@@ -1108,22 +1126,25 @@
msg, err := scramClient.Step("")
if err != nil {
return fmt.Errorf("failed to advance the SCRAM exchange: %s", err.Error())
-
}
for !scramClient.Done() {
requestTime := time.Now()
+ // Will be decremented in updateIncomingCommunicationMetrics (except error)
+ b.addRequestInFlightMetrics(1)
correlationID := b.correlationID
bytesWritten, err := b.sendSaslAuthenticateRequest(correlationID, []byte(msg))
+ b.updateOutgoingCommunicationMetrics(bytesWritten)
if err != nil {
+ b.addRequestInFlightMetrics(-1)
Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
return err
}
- b.updateOutgoingCommunicationMetrics(bytesWritten)
b.correlationID++
challenge, err := b.receiveSaslAuthenticateResponse(correlationID)
if err != nil {
+ b.addRequestInFlightMetrics(-1)
Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
return err
}
@@ -1216,7 +1237,7 @@
}
func (b *Broker) sendSASLPlainAuthClientResponse(correlationID int32) (int, error) {
- authBytes := []byte("\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password)
+ authBytes := []byte(b.conf.Net.SASL.AuthIdentity + "\x00" + b.conf.Net.SASL.User + "\x00" + b.conf.Net.SASL.Password)
rb := &SaslAuthenticateRequest{authBytes}
req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
buf, err := encode(req, b.conf.MetricRegistry)
@@ -1228,7 +1249,6 @@
}
func (b *Broker) sendSASLOAuthBearerClientMessage(initialResp []byte, correlationID int32) (int, error) {
-
rb := &SaslAuthenticateRequest{initialResp}
req := &request{correlationID: correlationID, clientID: b.conf.ClientID, body: rb}
@@ -1277,7 +1297,7 @@
}
func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
- b.updateRequestLatencyMetrics(requestLatency)
+ b.updateRequestLatencyAndInFlightMetrics(requestLatency)
b.responseRate.Mark(1)
if b.brokerResponseRate != nil {
@@ -1296,7 +1316,7 @@
}
}
-func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) {
+func (b *Broker) updateRequestLatencyAndInFlightMetrics(requestLatency time.Duration) {
requestLatencyInMs := int64(requestLatency / time.Millisecond)
b.requestLatency.Update(requestLatencyInMs)
@@ -1304,6 +1324,14 @@
b.brokerRequestLatency.Update(requestLatencyInMs)
}
+ b.addRequestInFlightMetrics(-1)
+}
+
+func (b *Broker) addRequestInFlightMetrics(i int64) {
+ b.requestsInFlight.Inc(i)
+ if b.brokerRequestsInFlight != nil {
+ b.brokerRequestsInFlight.Inc(i)
+ }
}
func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
@@ -1322,7 +1350,6 @@
if b.brokerRequestSize != nil {
b.brokerRequestSize.Update(requestSize)
}
-
}
func (b *Broker) registerMetrics() {
@@ -1333,6 +1360,7 @@
b.brokerOutgoingByteRate = b.registerMeter("outgoing-byte-rate")
b.brokerResponseRate = b.registerMeter("response-rate")
b.brokerResponseSize = b.registerHistogram("response-size")
+ b.brokerRequestsInFlight = b.registerCounter("requests-in-flight")
}
func (b *Broker) unregisterMetrics() {
@@ -1352,3 +1380,9 @@
b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
return getOrRegisterHistogram(nameForBroker, b.conf.MetricRegistry)
}
+
+func (b *Broker) registerCounter(name string) metrics.Counter {
+ nameForBroker := getMetricNameForBroker(name, b)
+ b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
+ return metrics.GetOrRegisterCounter(nameForBroker, b.conf.MetricRegistry)
+}
diff --git a/vendor/github.com/Shopify/sarama/client.go b/vendor/github.com/Shopify/sarama/client.go
index e5b3557..057a57a 100644
--- a/vendor/github.com/Shopify/sarama/client.go
+++ b/vendor/github.com/Shopify/sarama/client.go
@@ -17,9 +17,15 @@
// altered after it has been created.
Config() *Config
- // Controller returns the cluster controller broker. Requires Kafka 0.10 or higher.
+ // Controller returns the cluster controller broker. It will return a
+ // locally cached value if it's available. You can call RefreshController
+ // to update the cached value. Requires Kafka 0.10 or higher.
Controller() (*Broker, error)
+ // RefreshController retrieves the cluster controller from fresh metadata
+ // and stores it in the local cache. Requires Kafka 0.10 or higher.
+ RefreshController() (*Broker, error)
+
// Brokers returns the current set of active brokers as retrieved from cluster metadata.
Brokers() []*Broker
@@ -193,7 +199,6 @@
func (client *client) InitProducerID() (*InitProducerIDResponse, error) {
var err error
for broker := client.any(); broker != nil; broker = client.any() {
-
req := &InitProducerIDRequest{}
response, err := broker.InitProducerID(req)
@@ -487,6 +492,35 @@
return controller, nil
}
+// deregisterController removes the cached controllerID
+func (client *client) deregisterController() {
+ client.lock.Lock()
+ defer client.lock.Unlock()
+ delete(client.brokers, client.controllerID)
+}
+
+// RefreshController retrieves the cluster controller from fresh metadata
+// and stores it in the local cache. Requires Kafka 0.10 or higher.
+func (client *client) RefreshController() (*Broker, error) {
+ if client.Closed() {
+ return nil, ErrClosedClient
+ }
+
+ client.deregisterController()
+
+ if err := client.refreshMetadata(); err != nil {
+ return nil, err
+ }
+
+ controller := client.cachedController()
+ if controller == nil {
+ return nil, ErrControllerNotAvailable
+ }
+
+ _ = controller.Open(client.conf)
+ return controller, nil
+}
+
func (client *client) Coordinator(consumerGroup string) (*Broker, error) {
if client.Closed() {
return nil, ErrClosedClient
diff --git a/vendor/github.com/Shopify/sarama/config.go b/vendor/github.com/Shopify/sarama/config.go
index 69c7161..7495200 100644
--- a/vendor/github.com/Shopify/sarama/config.go
+++ b/vendor/github.com/Shopify/sarama/config.go
@@ -21,6 +21,13 @@
type Config struct {
// Admin is the namespace for ClusterAdmin properties used by the administrative Kafka client.
Admin struct {
+ Retry struct {
+ // The total number of times to retry sending (retriable) admin requests (default 5).
+ // Similar to the `retries` setting of the JVM AdminClientConfig.
+ Max int
+ // Backoff time between retries of a failed request (default 100ms)
+ Backoff time.Duration
+ }
// The maximum duration the administrative Kafka client will wait for ClusterAdmin operations,
// including topics, brokers, configurations and ACLs (defaults to 3 seconds).
Timeout time.Duration
@@ -65,8 +72,15 @@
// (defaults to true). You should only set this to false if you're using
// a non-Kafka SASL proxy.
Handshake bool
- //username and password for SASL/PLAIN or SASL/SCRAM authentication
- User string
+ // AuthIdentity is an (optional) authorization identity (authzid) to
+ // use for SASL/PLAIN authentication (if different from User) when
+ // an authenticated user is permitted to act as the presented
+ // alternative user. See RFC4616 for details.
+ AuthIdentity string
+ // User is the authentication identity (authcid) to present for
+ // SASL/PLAIN or SASL/SCRAM authentication
+ User string
+ // Password for SASL/PLAIN authentication
Password string
// authz id used for SASL/SCRAM authentication
SCRAMAuthzID string
@@ -338,6 +352,11 @@
// offsets. This currently requires the manual use of an OffsetManager
// but will eventually be automated.
Offsets struct {
+ // Deprecated: CommitInterval exists for historical compatibility
+ // and should not be used. Please use Consumer.Offsets.AutoCommit
+ CommitInterval time.Duration
+
+ // AutoCommit specifies configuration for commit messages automatically.
AutoCommit struct {
// Whether or not to auto-commit updated offsets back to the broker.
// (default enabled).
@@ -401,6 +420,8 @@
func NewConfig() *Config {
c := &Config{}
+ c.Admin.Retry.Max = 5
+ c.Admin.Retry.Backoff = 100 * time.Millisecond
c.Admin.Timeout = 3 * time.Second
c.Net.MaxOpenRequests = 5
@@ -629,6 +650,10 @@
}
}
+ if c.Producer.Compression == CompressionZSTD && !c.Version.IsAtLeast(V2_1_0_0) {
+ return ConfigurationError("zstd compression requires Version >= V2_1_0_0")
+ }
+
if c.Producer.Idempotent {
if !c.Version.IsAtLeast(V0_11_0_0) {
return ConfigurationError("Idempotent producer requires Version >= V0_11_0_0")
@@ -659,7 +684,7 @@
case c.Consumer.Retry.Backoff < 0:
return ConfigurationError("Consumer.Retry.Backoff must be >= 0")
case c.Consumer.Offsets.AutoCommit.Interval <= 0:
- return ConfigurationError("Consumer.Offsets.CommitInterval must be > 0")
+ return ConfigurationError("Consumer.Offsets.AutoCommit.Interval must be > 0")
case c.Consumer.Offsets.Initial != OffsetOldest && c.Consumer.Offsets.Initial != OffsetNewest:
return ConfigurationError("Consumer.Offsets.Initial must be OffsetOldest or OffsetNewest")
case c.Consumer.Offsets.Retry.Max < 0:
@@ -668,6 +693,11 @@
return ConfigurationError("Consumer.IsolationLevel must be ReadUncommitted or ReadCommitted")
}
+ if c.Consumer.Offsets.CommitInterval != 0 {
+ Logger.Println("Deprecation warning: Consumer.Offsets.CommitInterval exists for historical compatibility" +
+ " and should not be used. Please use Consumer.Offsets.AutoCommit, the current value will be ignored")
+ }
+
// validate IsolationLevel
if c.Consumer.IsolationLevel == ReadCommitted && !c.Version.IsAtLeast(V0_11_0_0) {
return ConfigurationError("ReadCommitted requires Version >= V0_11_0_0")
diff --git a/vendor/github.com/Shopify/sarama/config_resource_type.go b/vendor/github.com/Shopify/sarama/config_resource_type.go
index 5399d75..bef1053 100644
--- a/vendor/github.com/Shopify/sarama/config_resource_type.go
+++ b/vendor/github.com/Shopify/sarama/config_resource_type.go
@@ -1,22 +1,18 @@
package sarama
-//ConfigResourceType is a type for config resource
+// ConfigResourceType is a type for resources that have configs.
type ConfigResourceType int8
-// Taken from :
-// https://cwiki.apache.org/confluence/display/KAFKA/KIP-133%3A+Describe+and+Alter+Configs+Admin+APIs#KIP-133:DescribeandAlterConfigsAdminAPIs-WireFormattypes
+// Taken from:
+// https://github.com/apache/kafka/blob/ed7c071e07f1f90e4c2895582f61ca090ced3c42/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java#L32-L55
const (
- //UnknownResource constant type
- UnknownResource ConfigResourceType = iota
- //AnyResource constant type
- AnyResource
- //TopicResource constant type
- TopicResource
- //GroupResource constant type
- GroupResource
- //ClusterResource constant type
- ClusterResource
- //BrokerResource constant type
- BrokerResource
+ // UnknownResource constant type
+ UnknownResource ConfigResourceType = 0
+ // TopicResource constant type
+ TopicResource ConfigResourceType = 2
+ // BrokerResource constant type
+ BrokerResource ConfigResourceType = 4
+ // BrokerLoggerResource constant type
+ BrokerLoggerResource ConfigResourceType = 8
)
diff --git a/vendor/github.com/Shopify/sarama/consumer.go b/vendor/github.com/Shopify/sarama/consumer.go
index 72c4d7c..ff44ade 100644
--- a/vendor/github.com/Shopify/sarama/consumer.go
+++ b/vendor/github.com/Shopify/sarama/consumer.go
@@ -888,6 +888,10 @@
request.Isolation = bc.consumer.conf.Consumer.IsolationLevel
}
+ if bc.consumer.conf.Version.IsAtLeast(V2_1_0_0) {
+ request.Version = 10
+ }
+
for child := range bc.subscriptions {
request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
}
diff --git a/vendor/github.com/Shopify/sarama/consumer_group.go b/vendor/github.com/Shopify/sarama/consumer_group.go
index b974dd9..951f64b 100644
--- a/vendor/github.com/Shopify/sarama/consumer_group.go
+++ b/vendor/github.com/Shopify/sarama/consumer_group.go
@@ -120,9 +120,6 @@
c.closeOnce.Do(func() {
close(c.closed)
- c.lock.Lock()
- defer c.lock.Unlock()
-
// leave group
if e := c.leave(); e != nil {
err = e
@@ -175,6 +172,7 @@
// loop check topic partition numbers changed
// will trigger rebalance when any topic partitions number had changed
+ // avoid starting more than one loopCheckPartitionNumbers goroutine if Consume is called again
go c.loopCheckPartitionNumbers(topics, sess)
// Wait for session exit signal
@@ -333,20 +331,14 @@
MemberId: c.memberID,
GenerationId: generationID,
}
+ strategy := c.config.Consumer.Group.Rebalance.Strategy
for memberID, topics := range plan {
assignment := &ConsumerGroupMemberAssignment{Topics: topics}
-
- // Include topic assignments in group-assignment userdata for each consumer-group member
- if c.config.Consumer.Group.Rebalance.Strategy.Name() == StickyBalanceStrategyName {
- userDataBytes, err := encode(&StickyAssignorUserDataV1{
- Topics: topics,
- Generation: generationID,
- }, nil)
- if err != nil {
- return nil, err
- }
- assignment.UserData = userDataBytes
+ userDataBytes, err := strategy.AssignmentData(memberID, topics, generationID)
+ if err != nil {
+ return nil, err
}
+ assignment.UserData = userDataBytes
if err := req.AddGroupAssignmentMember(memberID, assignment); err != nil {
return nil, err
}
@@ -384,8 +376,10 @@
return strategy.Plan(members, topics)
}
-// Leaves the cluster, called by Close, protected by lock.
+// Leaves the cluster, called by Close.
func (c *consumerGroup) leave() error {
+ c.lock.Lock()
+ defer c.lock.Unlock()
if c.memberID == "" {
return nil
}
@@ -430,9 +424,6 @@
return
}
- c.lock.Lock()
- defer c.lock.Unlock()
-
select {
case <-c.closed:
//consumer is closed
@@ -448,7 +439,7 @@
}
func (c *consumerGroup) loopCheckPartitionNumbers(topics []string, session *consumerGroupSession) {
- pause := time.NewTicker(c.config.Consumer.Group.Heartbeat.Interval * 2)
+ pause := time.NewTicker(c.config.Metadata.RefreshFrequency)
defer session.cancel()
defer pause.Stop()
var oldTopicToPartitionNum map[string]int
@@ -468,6 +459,10 @@
}
select {
case <-pause.C:
+ case <-session.ctx.Done():
+ Logger.Printf("loop check partition number coroutine will exit, topics %s", topics)
+ // exit if the session was closed elsewhere
+ return
case <-c.closed:
return
}
@@ -475,10 +470,6 @@
}
func (c *consumerGroup) topicToPartitionNumbers(topics []string) (map[string]int, error) {
- if err := c.client.RefreshMetadata(topics...); err != nil {
- Logger.Printf("Consumer Group refresh metadata failed %v", err)
- return nil, err
- }
topicToPartitionNum := make(map[string]int, len(topics))
for _, topic := range topics {
if partitionNum, err := c.client.Partitions(topic); err != nil {
diff --git a/vendor/github.com/Shopify/sarama/describe_configs_response.go b/vendor/github.com/Shopify/sarama/describe_configs_response.go
index 5737232..a18eeba 100644
--- a/vendor/github.com/Shopify/sarama/describe_configs_response.go
+++ b/vendor/github.com/Shopify/sarama/describe_configs_response.go
@@ -277,7 +277,6 @@
}
r.Synonyms[i] = s
}
-
}
return nil
}
diff --git a/vendor/github.com/Shopify/sarama/dev.yml b/vendor/github.com/Shopify/sarama/dev.yml
index 4c030de..6f6807e 100644
--- a/vendor/github.com/Shopify/sarama/dev.yml
+++ b/vendor/github.com/Shopify/sarama/dev.yml
@@ -2,7 +2,7 @@
up:
- go:
- version: '1.13.4'
+ version: '1.13.7'
commands:
test:
diff --git a/vendor/github.com/Shopify/sarama/fetch_request.go b/vendor/github.com/Shopify/sarama/fetch_request.go
index 4db9ddd..836e6de 100644
--- a/vendor/github.com/Shopify/sarama/fetch_request.go
+++ b/vendor/github.com/Shopify/sarama/fetch_request.go
@@ -1,20 +1,41 @@
package sarama
type fetchRequestBlock struct {
- fetchOffset int64
- maxBytes int32
+ Version int16
+ currentLeaderEpoch int32
+ fetchOffset int64
+ logStartOffset int64
+ maxBytes int32
}
-func (b *fetchRequestBlock) encode(pe packetEncoder) error {
+func (b *fetchRequestBlock) encode(pe packetEncoder, version int16) error {
+ b.Version = version
+ if b.Version >= 9 {
+ pe.putInt32(b.currentLeaderEpoch)
+ }
pe.putInt64(b.fetchOffset)
+ if b.Version >= 5 {
+ pe.putInt64(b.logStartOffset)
+ }
pe.putInt32(b.maxBytes)
return nil
}
-func (b *fetchRequestBlock) decode(pd packetDecoder) (err error) {
+func (b *fetchRequestBlock) decode(pd packetDecoder, version int16) (err error) {
+ b.Version = version
+ if b.Version >= 9 {
+ if b.currentLeaderEpoch, err = pd.getInt32(); err != nil {
+ return err
+ }
+ }
if b.fetchOffset, err = pd.getInt64(); err != nil {
return err
}
+ if b.Version >= 5 {
+ if b.logStartOffset, err = pd.getInt64(); err != nil {
+ return err
+ }
+ }
if b.maxBytes, err = pd.getInt32(); err != nil {
return err
}
@@ -25,12 +46,15 @@
// https://issues.apache.org/jira/browse/KAFKA-2063 for a discussion of the issues leading up to that. The KIP is at
// https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes
type FetchRequest struct {
- MaxWaitTime int32
- MinBytes int32
- MaxBytes int32
- Version int16
- Isolation IsolationLevel
- blocks map[string]map[int32]*fetchRequestBlock
+ MaxWaitTime int32
+ MinBytes int32
+ MaxBytes int32
+ Version int16
+ Isolation IsolationLevel
+ SessionID int32
+ SessionEpoch int32
+ blocks map[string]map[int32]*fetchRequestBlock
+ forgotten map[string][]int32
}
type IsolationLevel int8
@@ -50,6 +74,10 @@
if r.Version >= 4 {
pe.putInt8(int8(r.Isolation))
}
+ if r.Version >= 7 {
+ pe.putInt32(r.SessionID)
+ pe.putInt32(r.SessionEpoch)
+ }
err = pe.putArrayLength(len(r.blocks))
if err != nil {
return err
@@ -65,17 +93,38 @@
}
for partition, block := range blocks {
pe.putInt32(partition)
- err = block.encode(pe)
+ err = block.encode(pe, r.Version)
if err != nil {
return err
}
}
}
+ if r.Version >= 7 {
+ err = pe.putArrayLength(len(r.forgotten))
+ if err != nil {
+ return err
+ }
+ for topic, partitions := range r.forgotten {
+ err = pe.putString(topic)
+ if err != nil {
+ return err
+ }
+ err = pe.putArrayLength(len(partitions))
+ if err != nil {
+ return err
+ }
+ for _, partition := range partitions {
+ pe.putInt32(partition)
+ }
+ }
+ }
+
return nil
}
func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
r.Version = version
+
if _, err = pd.getInt32(); err != nil {
return err
}
@@ -97,6 +146,16 @@
}
r.Isolation = IsolationLevel(isolation)
}
+ if r.Version >= 7 {
+ r.SessionID, err = pd.getInt32()
+ if err != nil {
+ return err
+ }
+ r.SessionEpoch, err = pd.getInt32()
+ if err != nil {
+ return err
+ }
+ }
topicCount, err := pd.getArrayLength()
if err != nil {
return err
@@ -121,12 +180,43 @@
return err
}
fetchBlock := &fetchRequestBlock{}
- if err = fetchBlock.decode(pd); err != nil {
+ if err = fetchBlock.decode(pd, r.Version); err != nil {
return err
}
r.blocks[topic][partition] = fetchBlock
}
}
+
+ if r.Version >= 7 {
+ forgottenCount, err := pd.getArrayLength()
+ if err != nil {
+ return err
+ }
+ if forgottenCount == 0 {
+ return nil
+ }
+ r.forgotten = make(map[string][]int32)
+ for i := 0; i < forgottenCount; i++ {
+ topic, err := pd.getString()
+ if err != nil {
+ return err
+ }
+ partitionCount, err := pd.getArrayLength()
+ if err != nil {
+ return err
+ }
+ r.forgotten[topic] = make([]int32, partitionCount)
+
+ for j := 0; j < partitionCount; j++ {
+ partition, err := pd.getInt32()
+ if err != nil {
+ return err
+ }
+ r.forgotten[topic][j] = partition
+ }
+ }
+ }
+
return nil
}
@@ -140,16 +230,28 @@
func (r *FetchRequest) requiredVersion() KafkaVersion {
switch r.Version {
+ case 0:
+ return MinVersion
case 1:
return V0_9_0_0
case 2:
return V0_10_0_0
case 3:
return V0_10_1_0
- case 4:
+ case 4, 5:
return V0_11_0_0
+ case 6:
+ return V1_0_0_0
+ case 7:
+ return V1_1_0_0
+ case 8:
+ return V2_0_0_0
+ case 9, 10:
+ return V2_1_0_0
+ case 11:
+ return V2_3_0_0
default:
- return MinVersion
+ return MaxVersion
}
}
@@ -158,13 +260,21 @@
r.blocks = make(map[string]map[int32]*fetchRequestBlock)
}
+ if r.Version >= 7 && r.forgotten == nil {
+ r.forgotten = make(map[string][]int32)
+ }
+
if r.blocks[topic] == nil {
r.blocks[topic] = make(map[int32]*fetchRequestBlock)
}
tmp := new(fetchRequestBlock)
+ tmp.Version = r.Version
tmp.maxBytes = maxBytes
tmp.fetchOffset = fetchOffset
+ if r.Version >= 9 {
+ tmp.currentLeaderEpoch = int32(-1)
+ }
r.blocks[topic][partitionID] = tmp
}
diff --git a/vendor/github.com/Shopify/sarama/fetch_response.go b/vendor/github.com/Shopify/sarama/fetch_response.go
index 3afc187..26936d9 100644
--- a/vendor/github.com/Shopify/sarama/fetch_response.go
+++ b/vendor/github.com/Shopify/sarama/fetch_response.go
@@ -33,6 +33,7 @@
Err KError
HighWaterMarkOffset int64
LastStableOffset int64
+ LogStartOffset int64
AbortedTransactions []*AbortedTransaction
Records *Records // deprecated: use FetchResponseBlock.RecordsSet
RecordsSet []*Records
@@ -57,6 +58,13 @@
return err
}
+ if version >= 5 {
+ b.LogStartOffset, err = pd.getInt64()
+ if err != nil {
+ return err
+ }
+ }
+
numTransact, err := pd.getArrayLength()
if err != nil {
return err
@@ -166,6 +174,10 @@
if version >= 4 {
pe.putInt64(b.LastStableOffset)
+ if version >= 5 {
+ pe.putInt64(b.LogStartOffset)
+ }
+
if err = pe.putArrayLength(len(b.AbortedTransactions)); err != nil {
return err
}
@@ -200,7 +212,9 @@
type FetchResponse struct {
Blocks map[string]map[int32]*FetchResponseBlock
ThrottleTime time.Duration
- Version int16 // v1 requires 0.9+, v2 requires 0.10+
+ ErrorCode int16
+ SessionID int32
+ Version int16
LogAppendTime bool
Timestamp time.Time
}
@@ -216,6 +230,17 @@
r.ThrottleTime = time.Duration(throttle) * time.Millisecond
}
+ if r.Version >= 7 {
+ r.ErrorCode, err = pd.getInt16()
+ if err != nil {
+ return err
+ }
+ r.SessionID, err = pd.getInt32()
+ if err != nil {
+ return err
+ }
+ }
+
numTopics, err := pd.getArrayLength()
if err != nil {
return err
@@ -258,6 +283,11 @@
pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
}
+ if r.Version >= 7 {
+ pe.putInt16(r.ErrorCode)
+ pe.putInt32(r.SessionID)
+ }
+
err = pe.putArrayLength(len(r.Blocks))
if err != nil {
return err
@@ -281,7 +311,6 @@
return err
}
}
-
}
return nil
}
@@ -296,16 +325,28 @@
func (r *FetchResponse) requiredVersion() KafkaVersion {
switch r.Version {
+ case 0:
+ return MinVersion
case 1:
return V0_9_0_0
case 2:
return V0_10_0_0
case 3:
return V0_10_1_0
- case 4:
+ case 4, 5:
return V0_11_0_0
+ case 6:
+ return V1_0_0_0
+ case 7:
+ return V1_1_0_0
+ case 8:
+ return V2_0_0_0
+ case 9, 10:
+ return V2_1_0_0
+ case 11:
+ return V2_3_0_0
default:
- return MinVersion
+ return MaxVersion
}
}
diff --git a/vendor/github.com/Shopify/sarama/go.mod b/vendor/github.com/Shopify/sarama/go.mod
index 8ba2c91..1dca1cc 100644
--- a/vendor/github.com/Shopify/sarama/go.mod
+++ b/vendor/github.com/Shopify/sarama/go.mod
@@ -5,25 +5,30 @@
require (
github.com/Shopify/toxiproxy v2.1.4+incompatible
github.com/davecgh/go-spew v1.1.1
- github.com/eapache/go-resiliency v1.1.0
+ github.com/eapache/go-resiliency v1.2.0
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21
github.com/eapache/queue v1.1.0
github.com/fortytw2/leaktest v1.3.0
- github.com/frankban/quicktest v1.4.1 // indirect
+ github.com/frankban/quicktest v1.7.2 // indirect
github.com/golang/snappy v0.0.1 // indirect
- github.com/hashicorp/go-uuid v1.0.1 // indirect
- github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03 // indirect
- github.com/klauspost/compress v1.9.7
- github.com/pierrec/lz4 v2.2.6+incompatible
- github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a
- github.com/stretchr/testify v1.3.0
+ github.com/google/go-cmp v0.4.0 // indirect
+ github.com/hashicorp/go-uuid v1.0.2 // indirect
+ github.com/jcmturner/gofork v1.0.0 // indirect
+ github.com/klauspost/compress v1.9.8
+ github.com/kr/pretty v0.2.0 // indirect
+ github.com/pierrec/lz4 v2.4.1+incompatible
+ github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563
+ github.com/stretchr/testify v1.4.0
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
github.com/xdg/stringprep v1.0.0 // indirect
- golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5 // indirect
- golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3
+ golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72 // indirect
+ golang.org/x/net v0.0.0-20200202094626-16171245cfb2
+ golang.org/x/text v0.3.2 // indirect
+ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/jcmturner/aescts.v1 v1.0.1 // indirect
gopkg.in/jcmturner/dnsutils.v1 v1.0.1 // indirect
gopkg.in/jcmturner/goidentity.v3 v3.0.0 // indirect
- gopkg.in/jcmturner/gokrb5.v7 v7.2.3
+ gopkg.in/jcmturner/gokrb5.v7 v7.5.0
gopkg.in/jcmturner/rpc.v1 v1.1.0 // indirect
+ gopkg.in/yaml.v2 v2.2.8 // indirect
)
diff --git a/vendor/github.com/Shopify/sarama/go.sum b/vendor/github.com/Shopify/sarama/go.sum
index 7f61258..06ec328 100644
--- a/vendor/github.com/Shopify/sarama/go.sum
+++ b/vendor/github.com/Shopify/sarama/go.sum
@@ -1,69 +1,81 @@
-github.com/DataDog/zstd v1.4.0 h1:vhoV+DUHnRZdKW1i5UMjAk2G4JY8wN4ayRfYDNdEhwo=
-github.com/DataDog/zstd v1.4.0/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
-github.com/eapache/go-resiliency v1.1.0 h1:1NtRmCAqadE2FN4ZcN6g90TP3uk8cg9rn9eNK2197aU=
-github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
+github.com/eapache/go-resiliency v1.2.0 h1:v7g92e/KSN71Rq7vSThKaWIq68fL4YHvWyiUKorFR1Q=
+github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
-github.com/frankban/quicktest v1.4.1 h1:Wv2VwvNn73pAdFIVUQRXYDFp31lXKbqblIXo/Q5GPSg=
-github.com/frankban/quicktest v1.4.1/go.mod h1:36zfPVQyHxymz4cH7wlDmVwDrJuljRB60qkgn7rorfQ=
+github.com/frankban/quicktest v1.7.2 h1:2QxQoC1TS09S7fhCPsrvqYdvP1H5M1P1ih5ABm3BTYk=
+github.com/frankban/quicktest v1.7.2/go.mod h1:jaStnuzAqU1AJdCO0l53JDCJrVDKcS03DbaAcR7Ks/o=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
-github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY=
-github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
-github.com/hashicorp/go-uuid v1.0.1 h1:fv1ep09latC32wFoVwnqcnKJGnMSdBanPczbHAYm1BE=
-github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
-github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03 h1:FUwcHNlEqkqLjLBdCp5PRlCFijNjvcYANOZXzCfXwCM=
-github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
-github.com/klauspost/compress v1.8.1 h1:oygt2ychZFHOB6M9gUgajzgKrwRgHbGC77NwA4COVgI=
-github.com/klauspost/compress v1.8.1/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
-github.com/klauspost/compress v1.8.2 h1:Bx0qjetmNjdFXASH02NSAREKpiaDwkO1DRZ3dV2KCcs=
-github.com/klauspost/compress v1.8.2/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
-github.com/klauspost/compress v1.9.7 h1:hYW1gP94JUmAhBtJ+LNz5My+gBobDxPR1iVuKug26aA=
-github.com/klauspost/compress v1.9.7/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
+github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
+github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE=
+github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
+github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8=
+github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
+github.com/klauspost/compress v1.9.8 h1:VMAMUUOh+gaxKTMk+zqbjsSjsIcUcL/LF4o63i82QyA=
+github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
+github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs=
+github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
-github.com/pierrec/lz4 v2.2.6+incompatible h1:6aCX4/YZ9v8q69hTyiR7dNLnTA3fgtKHVVW5BCd5Znw=
-github.com/pierrec/lz4 v2.2.6+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
+github.com/pierrec/lz4 v2.4.1+incompatible h1:mFe7ttWaflA46Mhqh+jUfjp2qTbPYxLB2/OyBppH9dg=
+github.com/pierrec/lz4 v2.4.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
-github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
-github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
+github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563 h1:dY6ETXrvDG7Sa4vE8ZQG4yqWg6UnOcbqTAahkV813vQ=
+github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
-github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
-github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
+github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
-golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5 h1:bselrhR0Or1vomJZC8ZIjWtbDmn9OYFLX5Ik9alpJpE=
-golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE=
+golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72 h1:+ELyKg6m8UBf0nPFSqD0mi7zUfwPyXo23HNjMnXPz7w=
+golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI=
+golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
-golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
+golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
+gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/jcmturner/aescts.v1 v1.0.1 h1:cVVZBK2b1zY26haWB4vbBiZrfFQnfbTVrE3xZq6hrEw=
gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo=
gopkg.in/jcmturner/dnsutils.v1 v1.0.1 h1:cIuC1OLRGZrld+16ZJvvZxVJeKPsvd5eUIvxfoN5hSM=
gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q=
gopkg.in/jcmturner/goidentity.v3 v3.0.0 h1:1duIyWiTaYvVx3YX2CYtpJbUFd7/UuPYCfgXtQ3VTbI=
gopkg.in/jcmturner/goidentity.v3 v3.0.0/go.mod h1:oG2kH0IvSYNIu80dVAyu/yoefjq1mNfM5bm88whjWx4=
-gopkg.in/jcmturner/gokrb5.v7 v7.2.3 h1:hHMV/yKPwMnJhPuPx7pH2Uw/3Qyf+thJYlisUc44010=
-gopkg.in/jcmturner/gokrb5.v7 v7.2.3/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM=
+gopkg.in/jcmturner/gokrb5.v7 v7.5.0 h1:a9tsXlIDD9SKxotJMK3niV7rPZAJeX2aD/0yg3qlIrg=
+gopkg.in/jcmturner/gokrb5.v7 v7.5.0/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM=
gopkg.in/jcmturner/rpc.v1 v1.1.0 h1:QHIUxTX1ISuAv9dD2wJ9HWQVuWDX/Zc0PfeC2tjc4rU=
gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8=
+gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
+gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
+gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
diff --git a/vendor/github.com/Shopify/sarama/gssapi_kerberos.go b/vendor/github.com/Shopify/sarama/gssapi_kerberos.go
index 57f3ecb..32ca93d 100644
--- a/vendor/github.com/Shopify/sarama/gssapi_kerberos.go
+++ b/vendor/github.com/Shopify/sarama/gssapi_kerberos.go
@@ -200,7 +200,6 @@
/* This does the handshake for authorization */
func (krbAuth *GSSAPIKerberosAuth) Authorize(broker *Broker) error {
-
kerberosClient, err := krbAuth.NewKerberosClientFunc(krbAuth.Config)
if err != nil {
Logger.Printf("Kerberos client error: %s", err)
diff --git a/vendor/github.com/Shopify/sarama/message.go b/vendor/github.com/Shopify/sarama/message.go
index 7c54748..e48566b 100644
--- a/vendor/github.com/Shopify/sarama/message.go
+++ b/vendor/github.com/Shopify/sarama/message.go
@@ -85,7 +85,6 @@
payload = m.compressedCache
m.compressedCache = nil
} else if m.Value != nil {
-
payload, err = compress(m.Codec, m.CompressionLevel, m.Value)
if err != nil {
return err
diff --git a/vendor/github.com/Shopify/sarama/metadata_response.go b/vendor/github.com/Shopify/sarama/metadata_response.go
index b2d532e..916992d 100644
--- a/vendor/github.com/Shopify/sarama/metadata_response.go
+++ b/vendor/github.com/Shopify/sarama/metadata_response.go
@@ -318,5 +318,4 @@
pmatch.Isr = isr
pmatch.OfflineReplicas = offline
pmatch.Err = err
-
}
diff --git a/vendor/github.com/Shopify/sarama/mockbroker.go b/vendor/github.com/Shopify/sarama/mockbroker.go
index 4ed46a6..cd1a850 100644
--- a/vendor/github.com/Shopify/sarama/mockbroker.go
+++ b/vendor/github.com/Shopify/sarama/mockbroker.go
@@ -235,7 +235,6 @@
var bytesWritten int
var bytesRead int
for {
-
buffer, err := b.readToBytes(conn)
if err != nil {
Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(buffer))
@@ -245,7 +244,6 @@
bytesWritten = 0
if !b.isGSSAPI(buffer) {
-
req, br, err := decodeRequest(bytes.NewReader(buffer))
bytesRead = br
if err != nil {
@@ -294,7 +292,6 @@
break
}
bytesWritten = len(resHeader) + len(encodedRes)
-
} else {
// GSSAPI is not part of kafka protocol, but is supported for authentication proposes.
// Don't support history for this kind of request as is only used for test GSSAPI authentication mechanism
@@ -317,7 +314,6 @@
b.notifier(bytesRead, bytesWritten)
}
b.lock.Unlock()
-
}
Logger.Printf("*** mockbroker/%d/%d: connection closed, err=%v", b.BrokerID(), idx, err)
}
diff --git a/vendor/github.com/Shopify/sarama/mockresponses.go b/vendor/github.com/Shopify/sarama/mockresponses.go
index 7dcc93e..984e5aa 100644
--- a/vendor/github.com/Shopify/sarama/mockresponses.go
+++ b/vendor/github.com/Shopify/sarama/mockresponses.go
@@ -731,29 +731,78 @@
func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoder {
req := reqBody.(*DescribeConfigsRequest)
- res := &DescribeConfigsResponse{}
+ res := &DescribeConfigsResponse{
+ Version: req.Version,
+ }
+
+ includeSynonyms := (req.Version > 0)
for _, r := range req.Resources {
var configEntries []*ConfigEntry
switch r.Type {
- case TopicResource:
+ case BrokerResource:
configEntries = append(configEntries,
- &ConfigEntry{Name: "max.message.bytes",
- Value: "1000000",
- ReadOnly: false,
- Default: true,
- Sensitive: false,
- }, &ConfigEntry{Name: "retention.ms",
- Value: "5000",
- ReadOnly: false,
- Default: false,
- Sensitive: false,
- }, &ConfigEntry{Name: "password",
- Value: "12345",
- ReadOnly: false,
- Default: false,
- Sensitive: true,
- })
+ &ConfigEntry{
+ Name: "min.insync.replicas",
+ Value: "2",
+ ReadOnly: false,
+ Default: false,
+ },
+ )
+ res.Resources = append(res.Resources, &ResourceResponse{
+ Name: r.Name,
+ Configs: configEntries,
+ })
+ case BrokerLoggerResource:
+ configEntries = append(configEntries,
+ &ConfigEntry{
+ Name: "kafka.controller.KafkaController",
+ Value: "DEBUG",
+ ReadOnly: false,
+ Default: false,
+ },
+ )
+ res.Resources = append(res.Resources, &ResourceResponse{
+ Name: r.Name,
+ Configs: configEntries,
+ })
+ case TopicResource:
+ maxMessageBytes := &ConfigEntry{Name: "max.message.bytes",
+ Value: "1000000",
+ ReadOnly: false,
+ Default: true,
+ Sensitive: false,
+ }
+ if includeSynonyms {
+ maxMessageBytes.Synonyms = []*ConfigSynonym{
+ {
+ ConfigName: "max.message.bytes",
+ ConfigValue: "500000",
+ },
+ }
+ }
+ retentionMs := &ConfigEntry{Name: "retention.ms",
+ Value: "5000",
+ ReadOnly: false,
+ Default: false,
+ Sensitive: false,
+ }
+ if includeSynonyms {
+ retentionMs.Synonyms = []*ConfigSynonym{
+ {
+ ConfigName: "log.retention.ms",
+ ConfigValue: "2500",
+ },
+ }
+ }
+ password := &ConfigEntry{Name: "password",
+ Value: "12345",
+ ReadOnly: false,
+ Default: false,
+ Sensitive: true,
+ }
+ configEntries = append(
+ configEntries, maxMessageBytes, retentionMs, password)
res.Resources = append(res.Resources, &ResourceResponse{
Name: r.Name,
Configs: configEntries,
@@ -777,7 +826,7 @@
for _, r := range req.Resources {
res.Resources = append(res.Resources, &AlterConfigsResourceResponse{Name: r.Name,
- Type: TopicResource,
+ Type: r.Type,
ErrorMsg: "",
})
}
diff --git a/vendor/github.com/Shopify/sarama/offset_manager.go b/vendor/github.com/Shopify/sarama/offset_manager.go
index e40f429..1940872 100644
--- a/vendor/github.com/Shopify/sarama/offset_manager.go
+++ b/vendor/github.com/Shopify/sarama/offset_manager.go
@@ -280,7 +280,6 @@
ConsumerID: om.memberID,
ConsumerGroupGeneration: om.generation,
}
-
}
om.pomsLock.RLock()
diff --git a/vendor/github.com/Shopify/sarama/produce_request.go b/vendor/github.com/Shopify/sarama/produce_request.go
index 0c755d0..178972a 100644
--- a/vendor/github.com/Shopify/sarama/produce_request.go
+++ b/vendor/github.com/Shopify/sarama/produce_request.go
@@ -214,6 +214,8 @@
return V0_10_0_0
case 3:
return V0_11_0_0
+ case 7:
+ return V2_1_0_0
default:
return MinVersion
}
diff --git a/vendor/github.com/Shopify/sarama/produce_response.go b/vendor/github.com/Shopify/sarama/produce_response.go
index 4c5cd35..e4f19a7 100644
--- a/vendor/github.com/Shopify/sarama/produce_response.go
+++ b/vendor/github.com/Shopify/sarama/produce_response.go
@@ -5,11 +5,27 @@
"time"
)
+// Protocol, http://kafka.apache.org/protocol.html
+// v1
+// v2 = v3 = v4
+// v5 = v6 = v7
+// Produce Response (Version: 7) => [responses] throttle_time_ms
+// responses => topic [partition_responses]
+// topic => STRING
+// partition_responses => partition error_code base_offset log_append_time log_start_offset
+// partition => INT32
+// error_code => INT16
+// base_offset => INT64
+// log_append_time => INT64
+// log_start_offset => INT64
+// throttle_time_ms => INT32
+
+// ProduceResponseBlock corresponds to partition_responses in the protocol
type ProduceResponseBlock struct {
- Err KError
- Offset int64
- // only provided if Version >= 2 and the broker is configured with `LogAppendTime`
- Timestamp time.Time
+ Err KError // v0, error_code
+ Offset int64 // v0, base_offset
+ Timestamp time.Time // v2, log_append_time, only provided if the broker is configured with `LogAppendTime`
+ StartOffset int64 // v5, log_start_offset
}
func (b *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err error) {
@@ -32,6 +48,13 @@
}
}
+ if version >= 5 {
+ b.StartOffset, err = pd.getInt64()
+ if err != nil {
+ return err
+ }
+ }
+
return nil
}
@@ -49,13 +72,17 @@
pe.putInt64(timestamp)
}
+ if version >= 5 {
+ pe.putInt64(b.StartOffset)
+ }
+
return nil
}
type ProduceResponse struct {
- Blocks map[string]map[int32]*ProduceResponseBlock
+ Blocks map[string]map[int32]*ProduceResponseBlock // v0, responses
Version int16
- ThrottleTime time.Duration // only provided if Version >= 1
+ ThrottleTime time.Duration // v1, throttle_time_ms
}
func (r *ProduceResponse) decode(pd packetDecoder, version int16) (err error) {
@@ -129,6 +156,7 @@
}
}
}
+
if r.Version >= 1 {
pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
}
@@ -143,19 +171,6 @@
return r.Version
}
-func (r *ProduceResponse) requiredVersion() KafkaVersion {
- switch r.Version {
- case 1:
- return V0_9_0_0
- case 2:
- return V0_10_0_0
- case 3:
- return V0_11_0_0
- default:
- return MinVersion
- }
-}
-
func (r *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock {
if r.Blocks == nil {
return nil
diff --git a/vendor/github.com/Shopify/sarama/produce_set.go b/vendor/github.com/Shopify/sarama/produce_set.go
index b684aa4..36c43c6 100644
--- a/vendor/github.com/Shopify/sarama/produce_set.go
+++ b/vendor/github.com/Shopify/sarama/produce_set.go
@@ -129,6 +129,10 @@
req.Version = 3
}
+ if ps.parent.conf.Producer.Compression == CompressionZSTD && ps.parent.conf.Version.IsAtLeast(V2_1_0_0) {
+ req.Version = 7
+ }
+
for topic, partitionSets := range ps.msgs {
for partition, set := range partitionSets {
if req.Version >= 3 {
diff --git a/vendor/github.com/Shopify/sarama/request.go b/vendor/github.com/Shopify/sarama/request.go
index 97437d6..6e4ad87 100644
--- a/vendor/github.com/Shopify/sarama/request.go
+++ b/vendor/github.com/Shopify/sarama/request.go
@@ -105,7 +105,7 @@
case 0:
return &ProduceRequest{}
case 1:
- return &FetchRequest{}
+ return &FetchRequest{Version: version}
case 2:
return &OffsetRequest{Version: version}
case 3:
diff --git a/vendor/github.com/Shopify/sarama/sarama.go b/vendor/github.com/Shopify/sarama/sarama.go
index 1e0277a..48f362d 100644
--- a/vendor/github.com/Shopify/sarama/sarama.go
+++ b/vendor/github.com/Shopify/sarama/sarama.go
@@ -39,6 +39,10 @@
| response-rate-for-broker-<broker-id> | meter | Responses/second received from a given broker |
| response-size | histogram | Distribution of the response size in bytes for all brokers |
| response-size-for-broker-<broker-id> | histogram | Distribution of the response size in bytes for a given broker |
+ | requests-in-flight | counter | The current number of in-flight requests awaiting a response |
+ | | | for all brokers |
+ | requests-in-flight-for-broker-<broker-id> | counter | The current number of in-flight requests awaiting a response |
+ | | | for a given broker |
+----------------------------------------------+------------+---------------------------------------------------------------+
Note that we do not gather specific metrics for seed brokers but they are part of the "all brokers" metrics.