[VOL-4293] OpenONU Adapter update for gRPC migration
Change-Id: I05300d3b95b878f44576a99a05f53f52fdc0cda1
diff --git a/vendor/github.com/Shopify/sarama/config.go b/vendor/github.com/Shopify/sarama/config.go
index e2e6513..43e739c 100644
--- a/vendor/github.com/Shopify/sarama/config.go
+++ b/vendor/github.com/Shopify/sarama/config.go
@@ -21,6 +21,13 @@
type Config struct {
// Admin is the namespace for ClusterAdmin properties used by the administrative Kafka client.
Admin struct {
+ Retry struct {
+ // The total number of times to retry sending (retriable) admin requests (default 5).
+ // Similar to the `retries` setting of the JVM AdminClientConfig.
+ Max int
+ // Backoff time between retries of a failed request (default 100ms)
+ Backoff time.Duration
+ }
// The maximum duration the administrative Kafka client will wait for ClusterAdmin operations,
// including topics, brokers, configurations and ACLs (defaults to 3 seconds).
Timeout time.Duration
@@ -65,8 +72,15 @@
// (defaults to true). You should only set this to false if you're using
// a non-Kafka SASL proxy.
Handshake bool
- //username and password for SASL/PLAIN or SASL/SCRAM authentication
- User string
+ // AuthIdentity is an (optional) authorization identity (authzid) to
+ // use for SASL/PLAIN authentication (if different from User) when
+ // an authenticated user is permitted to act as the presented
+ // alternative user. See RFC4616 for details.
+ AuthIdentity string
+ // User is the authentication identity (authcid) to present for
+ // SASL/PLAIN or SASL/SCRAM authentication
+ User string
+ // Password for SASL/PLAIN authentication
Password string
// authz id used for SASL/SCRAM authentication
SCRAMAuthzID string
@@ -82,8 +96,9 @@
GSSAPI GSSAPIConfig
}
- // KeepAlive specifies the keep-alive period for an active network connection.
- // If zero, keep-alives are disabled. (default is 0: disabled).
+ // KeepAlive specifies the keep-alive period for an active network connection (defaults to 0).
+ // If zero or positive, keep-alives are enabled.
+ // If negative, keep-alives are disabled.
KeepAlive time.Duration
// LocalAddr is the local address to use when dialing an
@@ -214,6 +229,14 @@
// `Backoff` if set.
BackoffFunc func(retries, maxRetries int) time.Duration
}
+
+ // Interceptors to be called when the producer dispatcher reads the
+ // message for the first time. Interceptors allows to intercept and
+ // possible mutate the message before they are published to Kafka
+ // cluster. *ProducerMessage modified by the first interceptor's
+ // OnSend() is passed to the second interceptor OnSend(), and so on in
+ // the interceptor chain.
+ Interceptors []ProducerInterceptor
}
// Consumer is the namespace for configuration related to consuming messages,
@@ -312,7 +335,7 @@
// than this, that partition will stop fetching more messages until it
// can proceed again.
// Note that, since the Messages channel is buffered, the actual grace time is
- // (MaxProcessingTime * ChanneBufferSize). Defaults to 100ms.
+ // (MaxProcessingTime * ChannelBufferSize). Defaults to 100ms.
// If a message is not written to the Messages channel between two ticks
// of the expiryTicker then a timeout is detected.
// Using a ticker instead of a timer to detect timeouts should typically
@@ -338,9 +361,21 @@
// offsets. This currently requires the manual use of an OffsetManager
// but will eventually be automated.
Offsets struct {
- // How frequently to commit updated offsets. Defaults to 1s.
+ // Deprecated: CommitInterval exists for historical compatibility
+ // and should not be used. Please use Consumer.Offsets.AutoCommit
CommitInterval time.Duration
+ // AutoCommit specifies configuration for commit messages automatically.
+ AutoCommit struct {
+ // Whether or not to auto-commit updated offsets back to the broker.
+ // (default enabled).
+ Enable bool
+
+ // How frequently to commit updated offsets. Ineffective unless
+ // auto-commit is enabled (default 1s)
+ Interval time.Duration
+ }
+
// The initial offset to use if no offset was previously committed.
// Should be OffsetNewest or OffsetOldest. Defaults to OffsetNewest.
Initial int64
@@ -364,12 +399,24 @@
// - use `ReadUncommitted` (default) to consume and return all messages in message channel
// - use `ReadCommitted` to hide messages that are part of an aborted transaction
IsolationLevel IsolationLevel
+
+ // Interceptors to be called just before the record is sent to the
+ // messages channel. Interceptors allows to intercept and possible
+ // mutate the message before they are returned to the client.
+ // *ConsumerMessage modified by the first interceptor's OnConsume() is
+ // passed to the second interceptor OnConsume(), and so on in the
+ // interceptor chain.
+ Interceptors []ConsumerInterceptor
}
// A user-provided string sent with every request to the brokers for logging,
// debugging, and auditing purposes. Defaults to "sarama", but you should
// probably set it to something specific to your application.
ClientID string
+ // A rack identifier for this client. This can be any string value which
+ // indicates where this client is physically located.
+ // It corresponds with the broker config 'broker.rack'
+ RackID string
// The number of events to buffer in internal and external channels. This
// permits the producer and consumer to continue processing some messages
// in the background while user code is working, greatly improving throughput.
@@ -394,6 +441,8 @@
func NewConfig() *Config {
c := &Config{}
+ c.Admin.Retry.Max = 5
+ c.Admin.Retry.Backoff = 100 * time.Millisecond
c.Admin.Timeout = 3 * time.Second
c.Net.MaxOpenRequests = 5
@@ -423,7 +472,8 @@
c.Consumer.MaxWaitTime = 250 * time.Millisecond
c.Consumer.MaxProcessingTime = 100 * time.Millisecond
c.Consumer.Return.Errors = false
- c.Consumer.Offsets.CommitInterval = 1 * time.Second
+ c.Consumer.Offsets.AutoCommit.Enable = true
+ c.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
c.Consumer.Offsets.Initial = OffsetNewest
c.Consumer.Offsets.Retry.Max = 3
@@ -436,7 +486,7 @@
c.ClientID = defaultClientID
c.ChannelBufferSize = 256
- c.Version = MinVersion
+ c.Version = DefaultVersion
c.MetricRegistry = metrics.NewRegistry()
return c
@@ -504,8 +554,6 @@
return ConfigurationError("Net.ReadTimeout must be > 0")
case c.Net.WriteTimeout <= 0:
return ConfigurationError("Net.WriteTimeout must be > 0")
- case c.Net.KeepAlive < 0:
- return ConfigurationError("Net.KeepAlive must be >= 0")
case c.Net.SASL.Enable:
if c.Net.SASL.Mechanism == "" {
c.Net.SASL.Mechanism = SASLTypePlaintext
@@ -621,6 +669,10 @@
}
}
+ if c.Producer.Compression == CompressionZSTD && !c.Version.IsAtLeast(V2_1_0_0) {
+ return ConfigurationError("zstd compression requires Version >= V2_1_0_0")
+ }
+
if c.Producer.Idempotent {
if !c.Version.IsAtLeast(V0_11_0_0) {
return ConfigurationError("Idempotent producer requires Version >= V0_11_0_0")
@@ -650,8 +702,8 @@
return ConfigurationError("Consumer.MaxProcessingTime must be > 0")
case c.Consumer.Retry.Backoff < 0:
return ConfigurationError("Consumer.Retry.Backoff must be >= 0")
- case c.Consumer.Offsets.CommitInterval <= 0:
- return ConfigurationError("Consumer.Offsets.CommitInterval must be > 0")
+ case c.Consumer.Offsets.AutoCommit.Interval <= 0:
+ return ConfigurationError("Consumer.Offsets.AutoCommit.Interval must be > 0")
case c.Consumer.Offsets.Initial != OffsetOldest && c.Consumer.Offsets.Initial != OffsetNewest:
return ConfigurationError("Consumer.Offsets.Initial must be OffsetOldest or OffsetNewest")
case c.Consumer.Offsets.Retry.Max < 0:
@@ -660,6 +712,11 @@
return ConfigurationError("Consumer.IsolationLevel must be ReadUncommitted or ReadCommitted")
}
+ if c.Consumer.Offsets.CommitInterval != 0 {
+ Logger.Println("Deprecation warning: Consumer.Offsets.CommitInterval exists for historical compatibility" +
+ " and should not be used. Please use Consumer.Offsets.AutoCommit, the current value will be ignored")
+ }
+
// validate IsolationLevel
if c.Consumer.IsolationLevel == ReadCommitted && !c.Version.IsAtLeast(V0_11_0_0) {
return ConfigurationError("ReadCommitted requires Version >= V0_11_0_0")
@@ -693,3 +750,16 @@
return nil
}
+
+func (c *Config) getDialer() proxy.Dialer {
+ if c.Net.Proxy.Enable {
+ Logger.Printf("using proxy %s", c.Net.Proxy.Dialer)
+ return c.Net.Proxy.Dialer
+ } else {
+ return &net.Dialer{
+ Timeout: c.Net.DialTimeout,
+ KeepAlive: c.Net.KeepAlive,
+ LocalAddr: c.Net.LocalAddr,
+ }
+ }
+}