blob: ef8bf8df669b5c0bb393b3eb4ada3ca95b02a351 [file] [log] [blame]
kesavand2cde6582020-06-22 04:56:23 -04001package sarama
2
3import (
4 "compress/gzip"
5 "crypto/tls"
6 "fmt"
kesavandc71914f2022-03-25 11:19:03 +05307 "io"
kesavand2cde6582020-06-22 04:56:23 -04008 "net"
9 "regexp"
10 "time"
11
12 "github.com/rcrowley/go-metrics"
13 "golang.org/x/net/proxy"
14)
15
// defaultClientID is the ClientID assigned by NewConfig. Validate logs a
// warning while a Config still carries this value, to nudge applications
// towards an application-specific identifier.
const defaultClientID = "sarama"

// validID matches a non-empty string made up exclusively of ASCII letters,
// digits, '.', '_' and '-'. The \A and \z anchors pin the match to the whole
// input, so a trailing newline or any other character makes it fail.
// NOTE(review): presumably used to validate identifiers such as ClientID; the
// use site is not visible in this part of the file.
var validID = regexp.MustCompile(`\A[A-Za-z0-9._-]+\z`)
19
20// Config is used to pass multiple configuration options to Sarama's constructors.
21type Config struct {
22 // Admin is the namespace for ClusterAdmin properties used by the administrative Kafka client.
23 Admin struct {
kesavandc71914f2022-03-25 11:19:03 +053024 Retry struct {
25 // The total number of times to retry sending (retriable) admin requests (default 5).
26 // Similar to the `retries` setting of the JVM AdminClientConfig.
27 Max int
28 // Backoff time between retries of a failed request (default 100ms)
29 Backoff time.Duration
30 }
kesavand2cde6582020-06-22 04:56:23 -040031 // The maximum duration the administrative Kafka client will wait for ClusterAdmin operations,
32 // including topics, brokers, configurations and ACLs (defaults to 3 seconds).
33 Timeout time.Duration
34 }
35
36 // Net is the namespace for network-level properties used by the Broker, and
37 // shared by the Client/Producer/Consumer.
38 Net struct {
39 // How many outstanding requests a connection is allowed to have before
40 // sending on it blocks (default 5).
kesavandc71914f2022-03-25 11:19:03 +053041 // Throughput can improve but message ordering is not guaranteed if Producer.Idempotent is disabled, see:
42 // https://kafka.apache.org/protocol#protocol_network
43 // https://kafka.apache.org/28/documentation.html#producerconfigs_max.in.flight.requests.per.connection
kesavand2cde6582020-06-22 04:56:23 -040044 MaxOpenRequests int
45
46 // All three of the below configurations are similar to the
47 // `socket.timeout.ms` setting in JVM kafka. All of them default
48 // to 30 seconds.
49 DialTimeout time.Duration // How long to wait for the initial connection.
50 ReadTimeout time.Duration // How long to wait for a response.
51 WriteTimeout time.Duration // How long to wait for a transmit.
52
53 TLS struct {
54 // Whether or not to use TLS when connecting to the broker
55 // (defaults to false).
56 Enable bool
57 // The TLS configuration to use for secure connections if
58 // enabled (defaults to nil).
59 Config *tls.Config
60 }
61
62 // SASL based authentication with broker. While there are multiple SASL authentication methods
63 // the current implementation is limited to plaintext (SASL/PLAIN) authentication
64 SASL struct {
65 // Whether or not to use SASL authentication when connecting to the broker
66 // (defaults to false).
67 Enable bool
68 // SASLMechanism is the name of the enabled SASL mechanism.
69 // Possible values: OAUTHBEARER, PLAIN (defaults to PLAIN).
70 Mechanism SASLMechanism
71 // Version is the SASL Protocol Version to use
72 // Kafka > 1.x should use V1, except on Azure EventHub which use V0
73 Version int16
74 // Whether or not to send the Kafka SASL handshake first if enabled
75 // (defaults to true). You should only set this to false if you're using
76 // a non-Kafka SASL proxy.
77 Handshake bool
kesavandc71914f2022-03-25 11:19:03 +053078 // AuthIdentity is an (optional) authorization identity (authzid) to
79 // use for SASL/PLAIN authentication (if different from User) when
80 // an authenticated user is permitted to act as the presented
81 // alternative user. See RFC4616 for details.
82 AuthIdentity string
83 // User is the authentication identity (authcid) to present for
84 // SASL/PLAIN or SASL/SCRAM authentication
85 User string
86 // Password for SASL/PLAIN authentication
kesavand2cde6582020-06-22 04:56:23 -040087 Password string
88 // authz id used for SASL/SCRAM authentication
89 SCRAMAuthzID string
90 // SCRAMClientGeneratorFunc is a generator of a user provided implementation of a SCRAM
91 // client used to perform the SCRAM exchange with the server.
92 SCRAMClientGeneratorFunc func() SCRAMClient
93 // TokenProvider is a user-defined callback for generating
94 // access tokens for SASL/OAUTHBEARER auth. See the
95 // AccessTokenProvider interface docs for proper implementation
96 // guidelines.
97 TokenProvider AccessTokenProvider
98
99 GSSAPI GSSAPIConfig
100 }
101
kesavandc71914f2022-03-25 11:19:03 +0530102 // KeepAlive specifies the keep-alive period for an active network connection (defaults to 0).
103 // If zero or positive, keep-alives are enabled.
104 // If negative, keep-alives are disabled.
kesavand2cde6582020-06-22 04:56:23 -0400105 KeepAlive time.Duration
106
107 // LocalAddr is the local address to use when dialing an
108 // address. The address must be of a compatible type for the
109 // network being dialed.
110 // If nil, a local address is automatically chosen.
111 LocalAddr net.Addr
112
113 Proxy struct {
114 // Whether or not to use proxy when connecting to the broker
115 // (defaults to false).
116 Enable bool
117 // The proxy dialer to use enabled (defaults to nil).
118 Dialer proxy.Dialer
119 }
120 }
121
122 // Metadata is the namespace for metadata management properties used by the
123 // Client, and shared by the Producer/Consumer.
124 Metadata struct {
125 Retry struct {
126 // The total number of times to retry a metadata request when the
127 // cluster is in the middle of a leader election (default 3).
128 Max int
129 // How long to wait for leader election to occur before retrying
130 // (default 250ms). Similar to the JVM's `retry.backoff.ms`.
131 Backoff time.Duration
132 // Called to compute backoff time dynamically. Useful for implementing
133 // more sophisticated backoff strategies. This takes precedence over
134 // `Backoff` if set.
135 BackoffFunc func(retries, maxRetries int) time.Duration
136 }
137 // How frequently to refresh the cluster metadata in the background.
138 // Defaults to 10 minutes. Set to 0 to disable. Similar to
139 // `topic.metadata.refresh.interval.ms` in the JVM version.
140 RefreshFrequency time.Duration
141
142 // Whether to maintain a full set of metadata for all topics, or just
143 // the minimal set that has been necessary so far. The full set is simpler
144 // and usually more convenient, but can take up a substantial amount of
145 // memory if you have many topics and partitions. Defaults to true.
146 Full bool
147
148 // How long to wait for a successful metadata response.
149 // Disabled by default which means a metadata request against an unreachable
150 // cluster (all brokers are unreachable or unresponsive) can take up to
151 // `Net.[Dial|Read]Timeout * BrokerCount * (Metadata.Retry.Max + 1) + Metadata.Retry.Backoff * Metadata.Retry.Max`
152 // to fail.
153 Timeout time.Duration
kesavandc71914f2022-03-25 11:19:03 +0530154
155 // Whether to allow auto-create topics in metadata refresh. If set to true,
156 // the broker may auto-create topics that we requested which do not already exist,
157 // if it is configured to do so (`auto.create.topics.enable` is true). Defaults to true.
158 AllowAutoTopicCreation bool
kesavand2cde6582020-06-22 04:56:23 -0400159 }
160
161 // Producer is the namespace for configuration related to producing messages,
162 // used by the Producer.
163 Producer struct {
164 // The maximum permitted size of a message (defaults to 1000000). Should be
165 // set equal to or smaller than the broker's `message.max.bytes`.
166 MaxMessageBytes int
167 // The level of acknowledgement reliability needed from the broker (defaults
168 // to WaitForLocal). Equivalent to the `request.required.acks` setting of the
169 // JVM producer.
170 RequiredAcks RequiredAcks
171 // The maximum duration the broker will wait the receipt of the number of
172 // RequiredAcks (defaults to 10 seconds). This is only relevant when
173 // RequiredAcks is set to WaitForAll or a number > 1. Only supports
174 // millisecond resolution, nanoseconds will be truncated. Equivalent to
175 // the JVM producer's `request.timeout.ms` setting.
176 Timeout time.Duration
177 // The type of compression to use on messages (defaults to no compression).
178 // Similar to `compression.codec` setting of the JVM producer.
179 Compression CompressionCodec
180 // The level of compression to use on messages. The meaning depends
181 // on the actual compression type used and defaults to default compression
182 // level for the codec.
183 CompressionLevel int
184 // Generates partitioners for choosing the partition to send messages to
185 // (defaults to hashing the message key). Similar to the `partitioner.class`
186 // setting for the JVM producer.
187 Partitioner PartitionerConstructor
188 // If enabled, the producer will ensure that exactly one copy of each message is
189 // written.
190 Idempotent bool
191
192 // Return specifies what channels will be populated. If they are set to true,
193 // you must read from the respective channels to prevent deadlock. If,
194 // however, this config is used to create a `SyncProducer`, both must be set
195 // to true and you shall not read from the channels since the producer does
196 // this internally.
197 Return struct {
198 // If enabled, successfully delivered messages will be returned on the
199 // Successes channel (default disabled).
200 Successes bool
201
202 // If enabled, messages that failed to deliver will be returned on the
203 // Errors channel, including error (default enabled).
204 Errors bool
205 }
206
207 // The following config options control how often messages are batched up and
208 // sent to the broker. By default, messages are sent as fast as possible, and
209 // all messages received while the current batch is in-flight are placed
210 // into the subsequent batch.
211 Flush struct {
212 // The best-effort number of bytes needed to trigger a flush. Use the
213 // global sarama.MaxRequestSize to set a hard upper limit.
214 Bytes int
215 // The best-effort number of messages needed to trigger a flush. Use
216 // `MaxMessages` to set a hard upper limit.
217 Messages int
218 // The best-effort frequency of flushes. Equivalent to
219 // `queue.buffering.max.ms` setting of JVM producer.
220 Frequency time.Duration
221 // The maximum number of messages the producer will send in a single
222 // broker request. Defaults to 0 for unlimited. Similar to
223 // `queue.buffering.max.messages` in the JVM producer.
224 MaxMessages int
225 }
226
227 Retry struct {
228 // The total number of times to retry sending a message (default 3).
229 // Similar to the `message.send.max.retries` setting of the JVM producer.
230 Max int
231 // How long to wait for the cluster to settle between retries
232 // (default 100ms). Similar to the `retry.backoff.ms` setting of the
233 // JVM producer.
234 Backoff time.Duration
235 // Called to compute backoff time dynamically. Useful for implementing
236 // more sophisticated backoff strategies. This takes precedence over
237 // `Backoff` if set.
238 BackoffFunc func(retries, maxRetries int) time.Duration
239 }
kesavandc71914f2022-03-25 11:19:03 +0530240
241 // Interceptors to be called when the producer dispatcher reads the
242 // message for the first time. Interceptors allows to intercept and
243 // possible mutate the message before they are published to Kafka
244 // cluster. *ProducerMessage modified by the first interceptor's
245 // OnSend() is passed to the second interceptor OnSend(), and so on in
246 // the interceptor chain.
247 Interceptors []ProducerInterceptor
kesavand2cde6582020-06-22 04:56:23 -0400248 }
249
250 // Consumer is the namespace for configuration related to consuming messages,
251 // used by the Consumer.
252 Consumer struct {
253
254 // Group is the namespace for configuring consumer group.
255 Group struct {
256 Session struct {
257 // The timeout used to detect consumer failures when using Kafka's group management facility.
258 // The consumer sends periodic heartbeats to indicate its liveness to the broker.
259 // If no heartbeats are received by the broker before the expiration of this session timeout,
260 // then the broker will remove this consumer from the group and initiate a rebalance.
261 // Note that the value must be in the allowable range as configured in the broker configuration
262 // by `group.min.session.timeout.ms` and `group.max.session.timeout.ms` (default 10s)
263 Timeout time.Duration
264 }
265 Heartbeat struct {
266 // The expected time between heartbeats to the consumer coordinator when using Kafka's group
267 // management facilities. Heartbeats are used to ensure that the consumer's session stays active and
268 // to facilitate rebalancing when new consumers join or leave the group.
269 // The value must be set lower than Consumer.Group.Session.Timeout, but typically should be set no
270 // higher than 1/3 of that value.
271 // It can be adjusted even lower to control the expected time for normal rebalances (default 3s)
272 Interval time.Duration
273 }
274 Rebalance struct {
275 // Strategy for allocating topic partitions to members (default BalanceStrategyRange)
276 Strategy BalanceStrategy
277 // The maximum allowed time for each worker to join the group once a rebalance has begun.
278 // This is basically a limit on the amount of time needed for all tasks to flush any pending
279 // data and commit offsets. If the timeout is exceeded, then the worker will be removed from
280 // the group, which will cause offset commit failures (default 60s).
281 Timeout time.Duration
282
283 Retry struct {
284 // When a new consumer joins a consumer group the set of consumers attempt to "rebalance"
285 // the load to assign partitions to each consumer. If the set of consumers changes while
286 // this assignment is taking place the rebalance will fail and retry. This setting controls
287 // the maximum number of attempts before giving up (default 4).
288 Max int
289 // Backoff time between retries during rebalance (default 2s)
290 Backoff time.Duration
291 }
292 }
293 Member struct {
294 // Custom metadata to include when joining the group. The user data for all joined members
295 // can be retrieved by sending a DescribeGroupRequest to the broker that is the
296 // coordinator for the group.
297 UserData []byte
298 }
299 }
300
301 Retry struct {
302 // How long to wait after a failing to read from a partition before
303 // trying again (default 2s).
304 Backoff time.Duration
305 // Called to compute backoff time dynamically. Useful for implementing
306 // more sophisticated backoff strategies. This takes precedence over
307 // `Backoff` if set.
308 BackoffFunc func(retries int) time.Duration
309 }
310
311 // Fetch is the namespace for controlling how many bytes are retrieved by any
312 // given request.
313 Fetch struct {
314 // The minimum number of message bytes to fetch in a request - the broker
315 // will wait until at least this many are available. The default is 1,
316 // as 0 causes the consumer to spin when no messages are available.
317 // Equivalent to the JVM's `fetch.min.bytes`.
318 Min int32
319 // The default number of message bytes to fetch from the broker in each
320 // request (default 1MB). This should be larger than the majority of
321 // your messages, or else the consumer will spend a lot of time
322 // negotiating sizes and not actually consuming. Similar to the JVM's
323 // `fetch.message.max.bytes`.
324 Default int32
325 // The maximum number of message bytes to fetch from the broker in a
326 // single request. Messages larger than this will return
327 // ErrMessageTooLarge and will not be consumable, so you must be sure
328 // this is at least as large as your largest message. Defaults to 0
329 // (no limit). Similar to the JVM's `fetch.message.max.bytes`. The
330 // global `sarama.MaxResponseSize` still applies.
331 Max int32
332 }
333 // The maximum amount of time the broker will wait for Consumer.Fetch.Min
334 // bytes to become available before it returns fewer than that anyways. The
335 // default is 250ms, since 0 causes the consumer to spin when no events are
336 // available. 100-500ms is a reasonable range for most cases. Kafka only
337 // supports precision up to milliseconds; nanoseconds will be truncated.
338 // Equivalent to the JVM's `fetch.wait.max.ms`.
339 MaxWaitTime time.Duration
340
341 // The maximum amount of time the consumer expects a message takes to
342 // process for the user. If writing to the Messages channel takes longer
343 // than this, that partition will stop fetching more messages until it
344 // can proceed again.
345 // Note that, since the Messages channel is buffered, the actual grace time is
kesavandc71914f2022-03-25 11:19:03 +0530346 // (MaxProcessingTime * ChannelBufferSize). Defaults to 100ms.
kesavand2cde6582020-06-22 04:56:23 -0400347 // If a message is not written to the Messages channel between two ticks
348 // of the expiryTicker then a timeout is detected.
349 // Using a ticker instead of a timer to detect timeouts should typically
350 // result in many fewer calls to Timer functions which may result in a
351 // significant performance improvement if many messages are being sent
352 // and timeouts are infrequent.
353 // The disadvantage of using a ticker instead of a timer is that
354 // timeouts will be less accurate. That is, the effective timeout could
355 // be between `MaxProcessingTime` and `2 * MaxProcessingTime`. For
356 // example, if `MaxProcessingTime` is 100ms then a delay of 180ms
357 // between two messages being sent may not be recognized as a timeout.
358 MaxProcessingTime time.Duration
359
360 // Return specifies what channels will be populated. If they are set to true,
361 // you must read from them to prevent deadlock.
362 Return struct {
363 // If enabled, any errors that occurred while consuming are returned on
364 // the Errors channel (default disabled).
365 Errors bool
366 }
367
368 // Offsets specifies configuration for how and when to commit consumed
369 // offsets. This currently requires the manual use of an OffsetManager
370 // but will eventually be automated.
371 Offsets struct {
kesavandc71914f2022-03-25 11:19:03 +0530372 // Deprecated: CommitInterval exists for historical compatibility
373 // and should not be used. Please use Consumer.Offsets.AutoCommit
kesavand2cde6582020-06-22 04:56:23 -0400374 CommitInterval time.Duration
375
kesavandc71914f2022-03-25 11:19:03 +0530376 // AutoCommit specifies configuration for commit messages automatically.
377 AutoCommit struct {
378 // Whether or not to auto-commit updated offsets back to the broker.
379 // (default enabled).
380 Enable bool
381
382 // How frequently to commit updated offsets. Ineffective unless
383 // auto-commit is enabled (default 1s)
384 Interval time.Duration
385 }
386
kesavand2cde6582020-06-22 04:56:23 -0400387 // The initial offset to use if no offset was previously committed.
388 // Should be OffsetNewest or OffsetOldest. Defaults to OffsetNewest.
389 Initial int64
390
391 // The retention duration for committed offsets. If zero, disabled
392 // (in which case the `offsets.retention.minutes` option on the
393 // broker will be used). Kafka only supports precision up to
394 // milliseconds; nanoseconds will be truncated. Requires Kafka
395 // broker version 0.9.0 or later.
396 // (default is 0: disabled).
397 Retention time.Duration
398
399 Retry struct {
400 // The total number of times to retry failing commit
401 // requests during OffsetManager shutdown (default 3).
402 Max int
403 }
404 }
405
406 // IsolationLevel support 2 mode:
407 // - use `ReadUncommitted` (default) to consume and return all messages in message channel
408 // - use `ReadCommitted` to hide messages that are part of an aborted transaction
409 IsolationLevel IsolationLevel
kesavandc71914f2022-03-25 11:19:03 +0530410
411 // Interceptors to be called just before the record is sent to the
412 // messages channel. Interceptors allows to intercept and possible
413 // mutate the message before they are returned to the client.
414 // *ConsumerMessage modified by the first interceptor's OnConsume() is
415 // passed to the second interceptor OnConsume(), and so on in the
416 // interceptor chain.
417 Interceptors []ConsumerInterceptor
kesavand2cde6582020-06-22 04:56:23 -0400418 }
419
420 // A user-provided string sent with every request to the brokers for logging,
421 // debugging, and auditing purposes. Defaults to "sarama", but you should
422 // probably set it to something specific to your application.
423 ClientID string
kesavandc71914f2022-03-25 11:19:03 +0530424 // A rack identifier for this client. This can be any string value which
425 // indicates where this client is physically located.
426 // It corresponds with the broker config 'broker.rack'
427 RackID string
kesavand2cde6582020-06-22 04:56:23 -0400428 // The number of events to buffer in internal and external channels. This
429 // permits the producer and consumer to continue processing some messages
430 // in the background while user code is working, greatly improving throughput.
431 // Defaults to 256.
432 ChannelBufferSize int
kesavandc71914f2022-03-25 11:19:03 +0530433 // ApiVersionsRequest determines whether Sarama should send an
434 // ApiVersionsRequest message to each broker as part of its initial
435 // connection. This defaults to `true` to match the official Java client
436 // and most 3rdparty ones.
437 ApiVersionsRequest bool
kesavand2cde6582020-06-22 04:56:23 -0400438 // The version of Kafka that Sarama will assume it is running against.
439 // Defaults to the oldest supported stable version. Since Kafka provides
440 // backwards-compatibility, setting it to a version older than you have
441 // will not break anything, although it may prevent you from using the
442 // latest features. Setting it to a version greater than you are actually
443 // running may lead to random breakage.
444 Version KafkaVersion
445 // The registry to define metrics into.
446 // Defaults to a local registry.
447 // If you want to disable metrics gathering, set "metrics.UseNilMetrics" to "true"
448 // prior to starting Sarama.
449 // See Examples on how to use the metrics registry
450 MetricRegistry metrics.Registry
451}
452
453// NewConfig returns a new configuration instance with sane defaults.
454func NewConfig() *Config {
455 c := &Config{}
456
kesavandc71914f2022-03-25 11:19:03 +0530457 c.Admin.Retry.Max = 5
458 c.Admin.Retry.Backoff = 100 * time.Millisecond
kesavand2cde6582020-06-22 04:56:23 -0400459 c.Admin.Timeout = 3 * time.Second
460
461 c.Net.MaxOpenRequests = 5
462 c.Net.DialTimeout = 30 * time.Second
463 c.Net.ReadTimeout = 30 * time.Second
464 c.Net.WriteTimeout = 30 * time.Second
465 c.Net.SASL.Handshake = true
466 c.Net.SASL.Version = SASLHandshakeV0
467
468 c.Metadata.Retry.Max = 3
469 c.Metadata.Retry.Backoff = 250 * time.Millisecond
470 c.Metadata.RefreshFrequency = 10 * time.Minute
471 c.Metadata.Full = true
kesavandc71914f2022-03-25 11:19:03 +0530472 c.Metadata.AllowAutoTopicCreation = true
kesavand2cde6582020-06-22 04:56:23 -0400473
474 c.Producer.MaxMessageBytes = 1000000
475 c.Producer.RequiredAcks = WaitForLocal
476 c.Producer.Timeout = 10 * time.Second
477 c.Producer.Partitioner = NewHashPartitioner
478 c.Producer.Retry.Max = 3
479 c.Producer.Retry.Backoff = 100 * time.Millisecond
480 c.Producer.Return.Errors = true
481 c.Producer.CompressionLevel = CompressionLevelDefault
482
483 c.Consumer.Fetch.Min = 1
484 c.Consumer.Fetch.Default = 1024 * 1024
485 c.Consumer.Retry.Backoff = 2 * time.Second
486 c.Consumer.MaxWaitTime = 250 * time.Millisecond
487 c.Consumer.MaxProcessingTime = 100 * time.Millisecond
488 c.Consumer.Return.Errors = false
kesavandc71914f2022-03-25 11:19:03 +0530489 c.Consumer.Offsets.AutoCommit.Enable = true
490 c.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
kesavand2cde6582020-06-22 04:56:23 -0400491 c.Consumer.Offsets.Initial = OffsetNewest
492 c.Consumer.Offsets.Retry.Max = 3
493
494 c.Consumer.Group.Session.Timeout = 10 * time.Second
495 c.Consumer.Group.Heartbeat.Interval = 3 * time.Second
496 c.Consumer.Group.Rebalance.Strategy = BalanceStrategyRange
497 c.Consumer.Group.Rebalance.Timeout = 60 * time.Second
498 c.Consumer.Group.Rebalance.Retry.Max = 4
499 c.Consumer.Group.Rebalance.Retry.Backoff = 2 * time.Second
500
501 c.ClientID = defaultClientID
502 c.ChannelBufferSize = 256
kesavandc71914f2022-03-25 11:19:03 +0530503 c.ApiVersionsRequest = true
504 c.Version = DefaultVersion
kesavand2cde6582020-06-22 04:56:23 -0400505 c.MetricRegistry = metrics.NewRegistry()
506
507 return c
508}
509
510// Validate checks a Config instance. It will return a
511// ConfigurationError if the specified values don't make sense.
512func (c *Config) Validate() error {
513 // some configuration values should be warned on but not fail completely, do those first
514 if !c.Net.TLS.Enable && c.Net.TLS.Config != nil {
515 Logger.Println("Net.TLS is disabled but a non-nil configuration was provided.")
516 }
517 if !c.Net.SASL.Enable {
518 if c.Net.SASL.User != "" {
519 Logger.Println("Net.SASL is disabled but a non-empty username was provided.")
520 }
521 if c.Net.SASL.Password != "" {
522 Logger.Println("Net.SASL is disabled but a non-empty password was provided.")
523 }
524 }
525 if c.Producer.RequiredAcks > 1 {
526 Logger.Println("Producer.RequiredAcks > 1 is deprecated and will raise an exception with kafka >= 0.8.2.0.")
527 }
528 if c.Producer.MaxMessageBytes >= int(MaxRequestSize) {
529 Logger.Println("Producer.MaxMessageBytes must be smaller than MaxRequestSize; it will be ignored.")
530 }
531 if c.Producer.Flush.Bytes >= int(MaxRequestSize) {
532 Logger.Println("Producer.Flush.Bytes must be smaller than MaxRequestSize; it will be ignored.")
533 }
534 if (c.Producer.Flush.Bytes > 0 || c.Producer.Flush.Messages > 0) && c.Producer.Flush.Frequency == 0 {
535 Logger.Println("Producer.Flush: Bytes or Messages are set, but Frequency is not; messages may not get flushed.")
536 }
537 if c.Producer.Timeout%time.Millisecond != 0 {
538 Logger.Println("Producer.Timeout only supports millisecond resolution; nanoseconds will be truncated.")
539 }
540 if c.Consumer.MaxWaitTime < 100*time.Millisecond {
541 Logger.Println("Consumer.MaxWaitTime is very low, which can cause high CPU and network usage. See documentation for details.")
542 }
543 if c.Consumer.MaxWaitTime%time.Millisecond != 0 {
544 Logger.Println("Consumer.MaxWaitTime only supports millisecond precision; nanoseconds will be truncated.")
545 }
546 if c.Consumer.Offsets.Retention%time.Millisecond != 0 {
547 Logger.Println("Consumer.Offsets.Retention only supports millisecond precision; nanoseconds will be truncated.")
548 }
549 if c.Consumer.Group.Session.Timeout%time.Millisecond != 0 {
550 Logger.Println("Consumer.Group.Session.Timeout only supports millisecond precision; nanoseconds will be truncated.")
551 }
552 if c.Consumer.Group.Heartbeat.Interval%time.Millisecond != 0 {
553 Logger.Println("Consumer.Group.Heartbeat.Interval only supports millisecond precision; nanoseconds will be truncated.")
554 }
555 if c.Consumer.Group.Rebalance.Timeout%time.Millisecond != 0 {
556 Logger.Println("Consumer.Group.Rebalance.Timeout only supports millisecond precision; nanoseconds will be truncated.")
557 }
558 if c.ClientID == defaultClientID {
559 Logger.Println("ClientID is the default of 'sarama', you should consider setting it to something application-specific.")
560 }
561
562 // validate Net values
563 switch {
564 case c.Net.MaxOpenRequests <= 0:
565 return ConfigurationError("Net.MaxOpenRequests must be > 0")
566 case c.Net.DialTimeout <= 0:
567 return ConfigurationError("Net.DialTimeout must be > 0")
568 case c.Net.ReadTimeout <= 0:
569 return ConfigurationError("Net.ReadTimeout must be > 0")
570 case c.Net.WriteTimeout <= 0:
571 return ConfigurationError("Net.WriteTimeout must be > 0")
kesavand2cde6582020-06-22 04:56:23 -0400572 case c.Net.SASL.Enable:
573 if c.Net.SASL.Mechanism == "" {
574 c.Net.SASL.Mechanism = SASLTypePlaintext
575 }
576
577 switch c.Net.SASL.Mechanism {
578 case SASLTypePlaintext:
579 if c.Net.SASL.User == "" {
580 return ConfigurationError("Net.SASL.User must not be empty when SASL is enabled")
581 }
582 if c.Net.SASL.Password == "" {
583 return ConfigurationError("Net.SASL.Password must not be empty when SASL is enabled")
584 }
585 case SASLTypeOAuth:
586 if c.Net.SASL.TokenProvider == nil {
587 return ConfigurationError("An AccessTokenProvider instance must be provided to Net.SASL.TokenProvider")
588 }
589 case SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512:
590 if c.Net.SASL.User == "" {
591 return ConfigurationError("Net.SASL.User must not be empty when SASL is enabled")
592 }
593 if c.Net.SASL.Password == "" {
594 return ConfigurationError("Net.SASL.Password must not be empty when SASL is enabled")
595 }
596 if c.Net.SASL.SCRAMClientGeneratorFunc == nil {
597 return ConfigurationError("A SCRAMClientGeneratorFunc function must be provided to Net.SASL.SCRAMClientGeneratorFunc")
598 }
599 case SASLTypeGSSAPI:
600 if c.Net.SASL.GSSAPI.ServiceName == "" {
601 return ConfigurationError("Net.SASL.GSSAPI.ServiceName must not be empty when GSS-API mechanism is used")
602 }
603
604 if c.Net.SASL.GSSAPI.AuthType == KRB5_USER_AUTH {
605 if c.Net.SASL.GSSAPI.Password == "" {
606 return ConfigurationError("Net.SASL.GSSAPI.Password must not be empty when GSS-API " +
607 "mechanism is used and Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH")
608 }
609 } else if c.Net.SASL.GSSAPI.AuthType == KRB5_KEYTAB_AUTH {
610 if c.Net.SASL.GSSAPI.KeyTabPath == "" {
611 return ConfigurationError("Net.SASL.GSSAPI.KeyTabPath must not be empty when GSS-API mechanism is used" +
612 " and Net.SASL.GSSAPI.AuthType = KRB5_KEYTAB_AUTH")
613 }
614 } else {
615 return ConfigurationError("Net.SASL.GSSAPI.AuthType is invalid. Possible values are KRB5_USER_AUTH and KRB5_KEYTAB_AUTH")
616 }
617 if c.Net.SASL.GSSAPI.KerberosConfigPath == "" {
618 return ConfigurationError("Net.SASL.GSSAPI.KerberosConfigPath must not be empty when GSS-API mechanism is used")
619 }
620 if c.Net.SASL.GSSAPI.Username == "" {
621 return ConfigurationError("Net.SASL.GSSAPI.Username must not be empty when GSS-API mechanism is used")
622 }
623 if c.Net.SASL.GSSAPI.Realm == "" {
624 return ConfigurationError("Net.SASL.GSSAPI.Realm must not be empty when GSS-API mechanism is used")
625 }
626 default:
627 msg := fmt.Sprintf("The SASL mechanism configuration is invalid. Possible values are `%s`, `%s`, `%s`, `%s` and `%s`",
628 SASLTypeOAuth, SASLTypePlaintext, SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512, SASLTypeGSSAPI)
629 return ConfigurationError(msg)
630 }
631 }
632
633 // validate the Admin values
634 switch {
635 case c.Admin.Timeout <= 0:
636 return ConfigurationError("Admin.Timeout must be > 0")
637 }
638
639 // validate the Metadata values
640 switch {
641 case c.Metadata.Retry.Max < 0:
642 return ConfigurationError("Metadata.Retry.Max must be >= 0")
643 case c.Metadata.Retry.Backoff < 0:
644 return ConfigurationError("Metadata.Retry.Backoff must be >= 0")
645 case c.Metadata.RefreshFrequency < 0:
646 return ConfigurationError("Metadata.RefreshFrequency must be >= 0")
647 }
648
649 // validate the Producer values
650 switch {
651 case c.Producer.MaxMessageBytes <= 0:
652 return ConfigurationError("Producer.MaxMessageBytes must be > 0")
653 case c.Producer.RequiredAcks < -1:
654 return ConfigurationError("Producer.RequiredAcks must be >= -1")
655 case c.Producer.Timeout <= 0:
656 return ConfigurationError("Producer.Timeout must be > 0")
657 case c.Producer.Partitioner == nil:
658 return ConfigurationError("Producer.Partitioner must not be nil")
659 case c.Producer.Flush.Bytes < 0:
660 return ConfigurationError("Producer.Flush.Bytes must be >= 0")
661 case c.Producer.Flush.Messages < 0:
662 return ConfigurationError("Producer.Flush.Messages must be >= 0")
663 case c.Producer.Flush.Frequency < 0:
664 return ConfigurationError("Producer.Flush.Frequency must be >= 0")
665 case c.Producer.Flush.MaxMessages < 0:
666 return ConfigurationError("Producer.Flush.MaxMessages must be >= 0")
667 case c.Producer.Flush.MaxMessages > 0 && c.Producer.Flush.MaxMessages < c.Producer.Flush.Messages:
668 return ConfigurationError("Producer.Flush.MaxMessages must be >= Producer.Flush.Messages when set")
669 case c.Producer.Retry.Max < 0:
670 return ConfigurationError("Producer.Retry.Max must be >= 0")
671 case c.Producer.Retry.Backoff < 0:
672 return ConfigurationError("Producer.Retry.Backoff must be >= 0")
673 }
674
675 if c.Producer.Compression == CompressionLZ4 && !c.Version.IsAtLeast(V0_10_0_0) {
676 return ConfigurationError("lz4 compression requires Version >= V0_10_0_0")
677 }
678
679 if c.Producer.Compression == CompressionGZIP {
680 if c.Producer.CompressionLevel != CompressionLevelDefault {
kesavandc71914f2022-03-25 11:19:03 +0530681 if _, err := gzip.NewWriterLevel(io.Discard, c.Producer.CompressionLevel); err != nil {
kesavand2cde6582020-06-22 04:56:23 -0400682 return ConfigurationError(fmt.Sprintf("gzip compression does not work with level %d: %v", c.Producer.CompressionLevel, err))
683 }
684 }
685 }
686
kesavandc71914f2022-03-25 11:19:03 +0530687 if c.Producer.Compression == CompressionZSTD && !c.Version.IsAtLeast(V2_1_0_0) {
688 return ConfigurationError("zstd compression requires Version >= V2_1_0_0")
689 }
690
kesavand2cde6582020-06-22 04:56:23 -0400691 if c.Producer.Idempotent {
692 if !c.Version.IsAtLeast(V0_11_0_0) {
693 return ConfigurationError("Idempotent producer requires Version >= V0_11_0_0")
694 }
695 if c.Producer.Retry.Max == 0 {
696 return ConfigurationError("Idempotent producer requires Producer.Retry.Max >= 1")
697 }
698 if c.Producer.RequiredAcks != WaitForAll {
699 return ConfigurationError("Idempotent producer requires Producer.RequiredAcks to be WaitForAll")
700 }
701 if c.Net.MaxOpenRequests > 1 {
702 return ConfigurationError("Idempotent producer requires Net.MaxOpenRequests to be 1")
703 }
704 }
705
706 // validate the Consumer values
707 switch {
708 case c.Consumer.Fetch.Min <= 0:
709 return ConfigurationError("Consumer.Fetch.Min must be > 0")
710 case c.Consumer.Fetch.Default <= 0:
711 return ConfigurationError("Consumer.Fetch.Default must be > 0")
712 case c.Consumer.Fetch.Max < 0:
713 return ConfigurationError("Consumer.Fetch.Max must be >= 0")
714 case c.Consumer.MaxWaitTime < 1*time.Millisecond:
715 return ConfigurationError("Consumer.MaxWaitTime must be >= 1ms")
716 case c.Consumer.MaxProcessingTime <= 0:
717 return ConfigurationError("Consumer.MaxProcessingTime must be > 0")
718 case c.Consumer.Retry.Backoff < 0:
719 return ConfigurationError("Consumer.Retry.Backoff must be >= 0")
kesavandc71914f2022-03-25 11:19:03 +0530720 case c.Consumer.Offsets.AutoCommit.Interval <= 0:
721 return ConfigurationError("Consumer.Offsets.AutoCommit.Interval must be > 0")
kesavand2cde6582020-06-22 04:56:23 -0400722 case c.Consumer.Offsets.Initial != OffsetOldest && c.Consumer.Offsets.Initial != OffsetNewest:
723 return ConfigurationError("Consumer.Offsets.Initial must be OffsetOldest or OffsetNewest")
724 case c.Consumer.Offsets.Retry.Max < 0:
725 return ConfigurationError("Consumer.Offsets.Retry.Max must be >= 0")
726 case c.Consumer.IsolationLevel != ReadUncommitted && c.Consumer.IsolationLevel != ReadCommitted:
727 return ConfigurationError("Consumer.IsolationLevel must be ReadUncommitted or ReadCommitted")
728 }
729
kesavandc71914f2022-03-25 11:19:03 +0530730 if c.Consumer.Offsets.CommitInterval != 0 {
731 Logger.Println("Deprecation warning: Consumer.Offsets.CommitInterval exists for historical compatibility" +
732 " and should not be used. Please use Consumer.Offsets.AutoCommit, the current value will be ignored")
733 }
734
kesavand2cde6582020-06-22 04:56:23 -0400735 // validate IsolationLevel
736 if c.Consumer.IsolationLevel == ReadCommitted && !c.Version.IsAtLeast(V0_11_0_0) {
737 return ConfigurationError("ReadCommitted requires Version >= V0_11_0_0")
738 }
739
740 // validate the Consumer Group values
741 switch {
742 case c.Consumer.Group.Session.Timeout <= 2*time.Millisecond:
743 return ConfigurationError("Consumer.Group.Session.Timeout must be >= 2ms")
744 case c.Consumer.Group.Heartbeat.Interval < 1*time.Millisecond:
745 return ConfigurationError("Consumer.Group.Heartbeat.Interval must be >= 1ms")
746 case c.Consumer.Group.Heartbeat.Interval >= c.Consumer.Group.Session.Timeout:
747 return ConfigurationError("Consumer.Group.Heartbeat.Interval must be < Consumer.Group.Session.Timeout")
748 case c.Consumer.Group.Rebalance.Strategy == nil:
749 return ConfigurationError("Consumer.Group.Rebalance.Strategy must not be empty")
750 case c.Consumer.Group.Rebalance.Timeout <= time.Millisecond:
751 return ConfigurationError("Consumer.Group.Rebalance.Timeout must be >= 1ms")
752 case c.Consumer.Group.Rebalance.Retry.Max < 0:
753 return ConfigurationError("Consumer.Group.Rebalance.Retry.Max must be >= 0")
754 case c.Consumer.Group.Rebalance.Retry.Backoff < 0:
755 return ConfigurationError("Consumer.Group.Rebalance.Retry.Backoff must be >= 0")
756 }
757
758 // validate misc shared values
759 switch {
760 case c.ChannelBufferSize < 0:
761 return ConfigurationError("ChannelBufferSize must be >= 0")
762 case !validID.MatchString(c.ClientID):
763 return ConfigurationError("ClientID is invalid")
764 }
765
766 return nil
767}
kesavandc71914f2022-03-25 11:19:03 +0530768
769func (c *Config) getDialer() proxy.Dialer {
770 if c.Net.Proxy.Enable {
771 Logger.Printf("using proxy %s", c.Net.Proxy.Dialer)
772 return c.Net.Proxy.Dialer
773 } else {
774 return &net.Dialer{
775 Timeout: c.Net.DialTimeout,
776 KeepAlive: c.Net.KeepAlive,
777 LocalAddr: c.Net.LocalAddr,
778 }
779 }
780}