[SEBA-930] update GRPC version to 1.27 and change kafka message producing
Change-Id: I14145a1351eb2523fa54e66381ad97abc5eedf50
diff --git a/vendor/github.com/Shopify/sarama/.travis.yml b/vendor/github.com/Shopify/sarama/.travis.yml
index d609423..cace313 100644
--- a/vendor/github.com/Shopify/sarama/.travis.yml
+++ b/vendor/github.com/Shopify/sarama/.travis.yml
@@ -1,7 +1,6 @@
dist: xenial
language: go
go:
-- 1.11.x
- 1.12.x
- 1.13.x
diff --git a/vendor/github.com/Shopify/sarama/CHANGELOG.md b/vendor/github.com/Shopify/sarama/CHANGELOG.md
index dfa7e75..844f481 100644
--- a/vendor/github.com/Shopify/sarama/CHANGELOG.md
+++ b/vendor/github.com/Shopify/sarama/CHANGELOG.md
@@ -1,5 +1,29 @@
# Changelog
+#### Version 1.25.0 (2020-01-13)
+
+New Features:
+- Support TLS protocol in kafka-producer-performance
+ ([1538](https://github.com/Shopify/sarama/pull/1538)).
+- Add support for kafka 2.4.0
+ ([1552](https://github.com/Shopify/sarama/pull/1552)).
+
+Improvements:
+- Allow the Consumer to disable auto-commit offsets
+ ([1164](https://github.com/Shopify/sarama/pull/1164)).
+- Produce records with consistent timestamps
+ ([1455](https://github.com/Shopify/sarama/pull/1455)).
+
+Bug Fixes:
+- Fix incorrect SetTopicMetadata name mentions
+ ([1534](https://github.com/Shopify/sarama/pull/1534)).
+- Fix client.tryRefreshMetadata Println
+ ([1535](https://github.com/Shopify/sarama/pull/1535)).
+- Fix panic on calling updateMetadata on closed client
+ ([1531](https://github.com/Shopify/sarama/pull/1531)).
+- Fix possible faulty metrics in TestFuncProducing
+ ([1545](https://github.com/Shopify/sarama/pull/1545)).
+
#### Version 1.24.1 (2019-10-31)
New Features:
diff --git a/vendor/github.com/Shopify/sarama/README.md b/vendor/github.com/Shopify/sarama/README.md
index 0206fac..18ad7bf 100644
--- a/vendor/github.com/Shopify/sarama/README.md
+++ b/vendor/github.com/Shopify/sarama/README.md
@@ -20,7 +20,7 @@
Sarama provides a "2 releases + 2 months" compatibility guarantee: we support
the two latest stable releases of Kafka and Go, and we provide a two month
grace period for older releases. This means we currently officially support
-Go 1.11 through 1.13, and Kafka 2.1 through 2.3, although older releases are
+Go 1.12 through 1.13, and Kafka 2.1 through 2.4, although older releases are
still likely to work.
Sarama follows semantic versioning and provides API stability via the gopkg.in service.
diff --git a/vendor/github.com/Shopify/sarama/client.go b/vendor/github.com/Shopify/sarama/client.go
index 040cfe9..e5b3557 100644
--- a/vendor/github.com/Shopify/sarama/client.go
+++ b/vendor/github.com/Shopify/sarama/client.go
@@ -242,6 +242,9 @@
}
func (client *client) Closed() bool {
+ client.lock.RLock()
+ defer client.lock.RUnlock()
+
return client.brokers == nil
}
@@ -529,6 +532,11 @@
// in the brokers map. It returns the broker that is registered, which may be the provided broker,
// or a previously registered Broker instance. You must hold the write lock before calling this function.
func (client *client) registerBroker(broker *Broker) {
+ if client.brokers == nil {
+ Logger.Printf("cannot register broker #%d at %s, client already closed", broker.ID(), broker.Addr())
+ return
+ }
+
if client.brokers[broker.ID()] == nil {
client.brokers[broker.ID()] = broker
Logger.Printf("client/brokers registered new broker #%d at %s", broker.ID(), broker.Addr())
@@ -822,7 +830,7 @@
}
if broker != nil {
- Logger.Println("client/metadata not fetching metadata from broker %s as we would go past the metadata timeout\n", broker.addr)
+ Logger.Printf("client/metadata not fetching metadata from broker %s as we would go past the metadata timeout\n", broker.addr)
return retry(ErrOutOfBrokers)
}
@@ -833,6 +841,10 @@
// if no fatal error, returns a list of topics that need retrying due to ErrLeaderNotAvailable
func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bool) (retry bool, err error) {
+ if client.Closed() {
+ return
+ }
+
client.lock.Lock()
defer client.lock.Unlock()
diff --git a/vendor/github.com/Shopify/sarama/config.go b/vendor/github.com/Shopify/sarama/config.go
index e515e04..69c7161 100644
--- a/vendor/github.com/Shopify/sarama/config.go
+++ b/vendor/github.com/Shopify/sarama/config.go
@@ -338,8 +338,15 @@
// offsets. This currently requires the manual use of an OffsetManager
// but will eventually be automated.
Offsets struct {
- // How frequently to commit updated offsets. Defaults to 1s.
- CommitInterval time.Duration
+ AutoCommit struct {
+ // Whether or not to auto-commit updated offsets back to the broker.
+ // (default enabled).
+ Enable bool
+
+ // How frequently to commit updated offsets. Ineffective unless
+ // auto-commit is enabled (default 1s)
+ Interval time.Duration
+ }
// The initial offset to use if no offset was previously committed.
// Should be OffsetNewest or OffsetOldest. Defaults to OffsetNewest.
@@ -423,7 +430,8 @@
c.Consumer.MaxWaitTime = 250 * time.Millisecond
c.Consumer.MaxProcessingTime = 100 * time.Millisecond
c.Consumer.Return.Errors = false
- c.Consumer.Offsets.CommitInterval = 1 * time.Second
+ c.Consumer.Offsets.AutoCommit.Enable = true
+ c.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
c.Consumer.Offsets.Initial = OffsetNewest
c.Consumer.Offsets.Retry.Max = 3
@@ -650,7 +658,7 @@
return ConfigurationError("Consumer.MaxProcessingTime must be > 0")
case c.Consumer.Retry.Backoff < 0:
return ConfigurationError("Consumer.Retry.Backoff must be >= 0")
- case c.Consumer.Offsets.CommitInterval <= 0:
+ case c.Consumer.Offsets.AutoCommit.Interval <= 0:
return ConfigurationError("Consumer.Offsets.CommitInterval must be > 0")
case c.Consumer.Offsets.Initial != OffsetOldest && c.Consumer.Offsets.Initial != OffsetNewest:
return ConfigurationError("Consumer.Offsets.Initial must be OffsetOldest or OffsetNewest")
diff --git a/vendor/github.com/Shopify/sarama/consumer_group.go b/vendor/github.com/Shopify/sarama/consumer_group.go
index da99e88..b974dd9 100644
--- a/vendor/github.com/Shopify/sarama/consumer_group.go
+++ b/vendor/github.com/Shopify/sarama/consumer_group.go
@@ -417,12 +417,6 @@
}
func (c *consumerGroup) handleError(err error, topic string, partition int32) {
- select {
- case <-c.closed:
- return
- default:
- }
-
if _, ok := err.(*ConsumerError); !ok && topic != "" && partition > -1 {
err = &ConsumerError{
Topic: topic,
@@ -431,13 +425,25 @@
}
}
- if c.config.Consumer.Return.Errors {
- select {
- case c.errors <- err:
- default:
- }
- } else {
+ if !c.config.Consumer.Return.Errors {
Logger.Println(err)
+ return
+ }
+
+ c.lock.Lock()
+ defer c.lock.Unlock()
+
+ select {
+ case <-c.closed:
+ //consumer is closed
+ return
+ default:
+ }
+
+ select {
+ case c.errors <- err:
+ default:
+ // no error listener
}
}
diff --git a/vendor/github.com/Shopify/sarama/dev.yml b/vendor/github.com/Shopify/sarama/dev.yml
index 481f681..4c030de 100644
--- a/vendor/github.com/Shopify/sarama/dev.yml
+++ b/vendor/github.com/Shopify/sarama/dev.yml
@@ -2,7 +2,7 @@
up:
- go:
- version: '1.13.1'
+ version: '1.13.4'
commands:
test:
diff --git a/vendor/github.com/Shopify/sarama/go.mod b/vendor/github.com/Shopify/sarama/go.mod
index 4337c00..8ba2c91 100644
--- a/vendor/github.com/Shopify/sarama/go.mod
+++ b/vendor/github.com/Shopify/sarama/go.mod
@@ -13,7 +13,7 @@
github.com/golang/snappy v0.0.1 // indirect
github.com/hashicorp/go-uuid v1.0.1 // indirect
github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03 // indirect
- github.com/klauspost/compress v1.8.2
+ github.com/klauspost/compress v1.9.7
github.com/pierrec/lz4 v2.2.6+incompatible
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a
github.com/stretchr/testify v1.3.0
diff --git a/vendor/github.com/Shopify/sarama/go.sum b/vendor/github.com/Shopify/sarama/go.sum
index d2f04ee..7f61258 100644
--- a/vendor/github.com/Shopify/sarama/go.sum
+++ b/vendor/github.com/Shopify/sarama/go.sum
@@ -27,6 +27,8 @@
github.com/klauspost/compress v1.8.1/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.8.2 h1:Bx0qjetmNjdFXASH02NSAREKpiaDwkO1DRZ3dV2KCcs=
github.com/klauspost/compress v1.8.2/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
+github.com/klauspost/compress v1.9.7 h1:hYW1gP94JUmAhBtJ+LNz5My+gBobDxPR1iVuKug26aA=
+github.com/klauspost/compress v1.9.7/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
diff --git a/vendor/github.com/Shopify/sarama/offset_manager.go b/vendor/github.com/Shopify/sarama/offset_manager.go
index 923972f..e40f429 100644
--- a/vendor/github.com/Shopify/sarama/offset_manager.go
+++ b/vendor/github.com/Shopify/sarama/offset_manager.go
@@ -58,7 +58,7 @@
client: client,
conf: conf,
group: group,
- ticker: time.NewTicker(conf.Consumer.Offsets.CommitInterval),
+ ticker: time.NewTicker(conf.Consumer.Offsets.AutoCommit.Interval),
poms: make(map[string]map[int32]*partitionOffsetManager),
memberID: memberID,
@@ -233,7 +233,12 @@
}
}
+// flushToBroker is ignored if auto-commit offsets is disabled
func (om *offsetManager) flushToBroker() {
+ if !om.conf.Consumer.Offsets.AutoCommit.Enable {
+ return
+ }
+
req := om.constructRequest()
if req == nil {
return
diff --git a/vendor/github.com/Shopify/sarama/produce_set.go b/vendor/github.com/Shopify/sarama/produce_set.go
index bba0f7e..b684aa4 100644
--- a/vendor/github.com/Shopify/sarama/produce_set.go
+++ b/vendor/github.com/Shopify/sarama/produce_set.go
@@ -44,9 +44,10 @@
}
timestamp := msg.Timestamp
- if msg.Timestamp.IsZero() {
+ if timestamp.IsZero() {
timestamp = time.Now()
}
+ timestamp = timestamp.Truncate(time.Millisecond)
partitions := ps.msgs[msg.Topic]
if partitions == nil {
diff --git a/vendor/github.com/Shopify/sarama/utils.go b/vendor/github.com/Shopify/sarama/utils.go
index 7c815cd..9392793 100644
--- a/vendor/github.com/Shopify/sarama/utils.go
+++ b/vendor/github.com/Shopify/sarama/utils.go
@@ -161,6 +161,7 @@
V2_1_0_0 = newKafkaVersion(2, 1, 0, 0)
V2_2_0_0 = newKafkaVersion(2, 2, 0, 0)
V2_3_0_0 = newKafkaVersion(2, 3, 0, 0)
+ V2_4_0_0 = newKafkaVersion(2, 4, 0, 0)
SupportedVersions = []KafkaVersion{
V0_8_2_0,
@@ -185,9 +186,10 @@
V2_1_0_0,
V2_2_0_0,
V2_3_0_0,
+ V2_4_0_0,
}
MinVersion = V0_8_2_0
- MaxVersion = V2_3_0_0
+ MaxVersion = V2_4_0_0
)
//ParseKafkaVersion parses and returns kafka version or error from a string