blob: 0fca0a30eabddcdbf0cc741982b46462ea7e1f0a [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package sarama
2
3import (
4 "errors"
5 "fmt"
6)
7
8// ErrOutOfBrokers is the error returned when the client has run out of brokers to talk to because all of them errored
9// or otherwise failed to respond.
10var ErrOutOfBrokers = errors.New("kafka: client has run out of available brokers to talk to (Is your cluster reachable?)")
11
khenaidood948f772021-08-11 17:49:24 -040012// ErrBrokerNotFound is the error returned when there's no broker found for the requested ID.
13var ErrBrokerNotFound = errors.New("kafka: broker for ID is not found")
14
khenaidooac637102019-01-14 15:44:34 -050015// ErrClosedClient is the error returned when a method is called on a client that has been closed.
16var ErrClosedClient = errors.New("kafka: tried to use a client that was closed")
17
18// ErrIncompleteResponse is the error returned when the server returns a syntactically valid response, but it does
19// not contain the expected information.
20var ErrIncompleteResponse = errors.New("kafka: response did not contain all the expected topic/partition blocks")
21
22// ErrInvalidPartition is the error returned when a partitioner returns an invalid partition index
23// (meaning one outside of the range [0...numPartitions-1]).
24var ErrInvalidPartition = errors.New("kafka: partitioner returned an invalid partition index")
25
26// ErrAlreadyConnected is the error returned when calling Open() on a Broker that is already connected or connecting.
27var ErrAlreadyConnected = errors.New("kafka: broker connection already initiated")
28
29// ErrNotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.
30var ErrNotConnected = errors.New("kafka: broker not connected")
31
32// ErrInsufficientData is returned when decoding and the packet is truncated. This can be expected
33// when requesting messages, since as an optimization the server is allowed to return a partial message at the end
34// of the message set.
35var ErrInsufficientData = errors.New("kafka: insufficient data to decode packet, more bytes expected")
36
37// ErrShuttingDown is returned when a producer receives a message during shutdown.
38var ErrShuttingDown = errors.New("kafka: message received by producer in process of shutting down")
39
40// ErrMessageTooLarge is returned when the next message to consume is larger than the configured Consumer.Fetch.Max
41var ErrMessageTooLarge = errors.New("kafka: message is larger than Consumer.Fetch.Max")
42
43// ErrConsumerOffsetNotAdvanced is returned when a partition consumer didn't advance its offset after parsing
44// a RecordBatch.
45var ErrConsumerOffsetNotAdvanced = errors.New("kafka: consumer offset was not advanced after a RecordBatch")
46
47// ErrControllerNotAvailable is returned when server didn't give correct controller id. May be kafka server's version
48// is lower than 0.10.0.0.
49var ErrControllerNotAvailable = errors.New("kafka: controller is not available")
50
51// ErrNoTopicsToUpdateMetadata is returned when Meta.Full is set to false but no specific topics were found to update
52// the metadata.
53var ErrNoTopicsToUpdateMetadata = errors.New("kafka: no specific topics to update metadata")
54
khenaidood948f772021-08-11 17:49:24 -040055// ErrUnknownScramMechanism is returned when user tries to AlterUserScramCredentials with unknown SCRAM mechanism
56var ErrUnknownScramMechanism = errors.New("kafka: unknown SCRAM mechanism provided")
57
khenaidooac637102019-01-14 15:44:34 -050058// PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example,
59// if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.
60type PacketEncodingError struct {
61 Info string
62}
63
64func (err PacketEncodingError) Error() string {
65 return fmt.Sprintf("kafka: error encoding packet: %s", err.Info)
66}
67
68// PacketDecodingError is returned when there was an error (other than truncated data) decoding the Kafka broker's response.
69// This can be a bad CRC or length field, or any other invalid value.
70type PacketDecodingError struct {
71 Info string
72}
73
74func (err PacketDecodingError) Error() string {
75 return fmt.Sprintf("kafka: error decoding packet: %s", err.Info)
76}
77
78// ConfigurationError is the type of error returned from a constructor (e.g. NewClient, or NewConsumer)
79// when the specified configuration is invalid.
80type ConfigurationError string
81
82func (err ConfigurationError) Error() string {
83 return "kafka: invalid configuration (" + string(err) + ")"
84}
85
86// KError is the type of error that can be returned directly by the Kafka broker.
87// See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes
88type KError int16
89
Scott Baker8461e152019-10-01 14:44:30 -070090// MultiError is used to contain multi error.
91type MultiError struct {
92 Errors *[]error
93}
94
95func (mErr MultiError) Error() string {
khenaidood948f772021-08-11 17:49:24 -040096 errString := ""
Scott Baker8461e152019-10-01 14:44:30 -070097 for _, err := range *mErr.Errors {
98 errString += err.Error() + ","
99 }
100 return errString
101}
102
khenaidood948f772021-08-11 17:49:24 -0400103func (mErr MultiError) PrettyError() string {
104 errString := ""
105 for _, err := range *mErr.Errors {
106 errString += err.Error() + "\n"
107 }
108 return errString
109}
110
Scott Baker8461e152019-10-01 14:44:30 -0700111// ErrDeleteRecords is the type of error returned when fail to delete the required records
112type ErrDeleteRecords struct {
113 MultiError
114}
115
116func (err ErrDeleteRecords) Error() string {
117 return "kafka server: failed to delete records " + err.MultiError.Error()
118}
119
khenaidood948f772021-08-11 17:49:24 -0400120type ErrReassignPartitions struct {
121 MultiError
122}
123
124func (err ErrReassignPartitions) Error() string {
125 return fmt.Sprintf("failed to reassign partitions for topic: \n%s", err.MultiError.PrettyError())
126}
127
khenaidooac637102019-01-14 15:44:34 -0500128// Numeric error codes returned by the Kafka server.
129const (
130 ErrNoError KError = 0
131 ErrUnknown KError = -1
132 ErrOffsetOutOfRange KError = 1
133 ErrInvalidMessage KError = 2
134 ErrUnknownTopicOrPartition KError = 3
135 ErrInvalidMessageSize KError = 4
136 ErrLeaderNotAvailable KError = 5
137 ErrNotLeaderForPartition KError = 6
138 ErrRequestTimedOut KError = 7
139 ErrBrokerNotAvailable KError = 8
140 ErrReplicaNotAvailable KError = 9
141 ErrMessageSizeTooLarge KError = 10
142 ErrStaleControllerEpochCode KError = 11
143 ErrOffsetMetadataTooLarge KError = 12
144 ErrNetworkException KError = 13
145 ErrOffsetsLoadInProgress KError = 14
146 ErrConsumerCoordinatorNotAvailable KError = 15
147 ErrNotCoordinatorForConsumer KError = 16
148 ErrInvalidTopic KError = 17
149 ErrMessageSetSizeTooLarge KError = 18
150 ErrNotEnoughReplicas KError = 19
151 ErrNotEnoughReplicasAfterAppend KError = 20
152 ErrInvalidRequiredAcks KError = 21
153 ErrIllegalGeneration KError = 22
154 ErrInconsistentGroupProtocol KError = 23
155 ErrInvalidGroupId KError = 24
156 ErrUnknownMemberId KError = 25
157 ErrInvalidSessionTimeout KError = 26
158 ErrRebalanceInProgress KError = 27
159 ErrInvalidCommitOffsetSize KError = 28
160 ErrTopicAuthorizationFailed KError = 29
161 ErrGroupAuthorizationFailed KError = 30
162 ErrClusterAuthorizationFailed KError = 31
163 ErrInvalidTimestamp KError = 32
164 ErrUnsupportedSASLMechanism KError = 33
165 ErrIllegalSASLState KError = 34
166 ErrUnsupportedVersion KError = 35
167 ErrTopicAlreadyExists KError = 36
168 ErrInvalidPartitions KError = 37
169 ErrInvalidReplicationFactor KError = 38
170 ErrInvalidReplicaAssignment KError = 39
171 ErrInvalidConfig KError = 40
172 ErrNotController KError = 41
173 ErrInvalidRequest KError = 42
174 ErrUnsupportedForMessageFormat KError = 43
175 ErrPolicyViolation KError = 44
176 ErrOutOfOrderSequenceNumber KError = 45
177 ErrDuplicateSequenceNumber KError = 46
178 ErrInvalidProducerEpoch KError = 47
179 ErrInvalidTxnState KError = 48
180 ErrInvalidProducerIDMapping KError = 49
181 ErrInvalidTransactionTimeout KError = 50
182 ErrConcurrentTransactions KError = 51
183 ErrTransactionCoordinatorFenced KError = 52
184 ErrTransactionalIDAuthorizationFailed KError = 53
185 ErrSecurityDisabled KError = 54
186 ErrOperationNotAttempted KError = 55
187 ErrKafkaStorageError KError = 56
188 ErrLogDirNotFound KError = 57
189 ErrSASLAuthenticationFailed KError = 58
190 ErrUnknownProducerID KError = 59
191 ErrReassignmentInProgress KError = 60
192 ErrDelegationTokenAuthDisabled KError = 61
193 ErrDelegationTokenNotFound KError = 62
194 ErrDelegationTokenOwnerMismatch KError = 63
195 ErrDelegationTokenRequestNotAllowed KError = 64
196 ErrDelegationTokenAuthorizationFailed KError = 65
197 ErrDelegationTokenExpired KError = 66
198 ErrInvalidPrincipalType KError = 67
199 ErrNonEmptyGroup KError = 68
200 ErrGroupIDNotFound KError = 69
201 ErrFetchSessionIDNotFound KError = 70
202 ErrInvalidFetchSessionEpoch KError = 71
203 ErrListenerNotFound KError = 72
William Kurkiandaa6bb22019-03-07 12:26:28 -0500204 ErrTopicDeletionDisabled KError = 73
205 ErrFencedLeaderEpoch KError = 74
206 ErrUnknownLeaderEpoch KError = 75
207 ErrUnsupportedCompressionType KError = 76
Scott Baker8461e152019-10-01 14:44:30 -0700208 ErrStaleBrokerEpoch KError = 77
209 ErrOffsetNotAvailable KError = 78
210 ErrMemberIdRequired KError = 79
211 ErrPreferredLeaderNotAvailable KError = 80
212 ErrGroupMaxSizeReached KError = 81
khenaidood948f772021-08-11 17:49:24 -0400213 ErrFencedInstancedId KError = 82
214 ErrEligibleLeadersNotAvailable KError = 83
215 ErrElectionNotNeeded KError = 84
216 ErrNoReassignmentInProgress KError = 85
217 ErrGroupSubscribedToTopic KError = 86
218 ErrInvalidRecord KError = 87
219 ErrUnstableOffsetCommit KError = 88
khenaidooac637102019-01-14 15:44:34 -0500220)
221
222func (err KError) Error() string {
223 // Error messages stolen/adapted from
224 // https://kafka.apache.org/protocol#protocol_error_codes
225 switch err {
226 case ErrNoError:
227 return "kafka server: Not an error, why are you printing me?"
228 case ErrUnknown:
229 return "kafka server: Unexpected (unknown?) server error."
230 case ErrOffsetOutOfRange:
231 return "kafka server: The requested offset is outside the range of offsets maintained by the server for the given topic/partition."
232 case ErrInvalidMessage:
233 return "kafka server: Message contents does not match its CRC."
234 case ErrUnknownTopicOrPartition:
235 return "kafka server: Request was for a topic or partition that does not exist on this broker."
236 case ErrInvalidMessageSize:
237 return "kafka server: The message has a negative size."
238 case ErrLeaderNotAvailable:
239 return "kafka server: In the middle of a leadership election, there is currently no leader for this partition and hence it is unavailable for writes."
240 case ErrNotLeaderForPartition:
241 return "kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date."
242 case ErrRequestTimedOut:
243 return "kafka server: Request exceeded the user-specified time limit in the request."
244 case ErrBrokerNotAvailable:
245 return "kafka server: Broker not available. Not a client facing error, we should never receive this!!!"
246 case ErrReplicaNotAvailable:
247 return "kafka server: Replica information not available, one or more brokers are down."
248 case ErrMessageSizeTooLarge:
249 return "kafka server: Message was too large, server rejected it to avoid allocation error."
250 case ErrStaleControllerEpochCode:
251 return "kafka server: StaleControllerEpochCode (internal error code for broker-to-broker communication)."
252 case ErrOffsetMetadataTooLarge:
253 return "kafka server: Specified a string larger than the configured maximum for offset metadata."
254 case ErrNetworkException:
255 return "kafka server: The server disconnected before a response was received."
256 case ErrOffsetsLoadInProgress:
257 return "kafka server: The broker is still loading offsets after a leader change for that offset's topic partition."
258 case ErrConsumerCoordinatorNotAvailable:
259 return "kafka server: Offset's topic has not yet been created."
260 case ErrNotCoordinatorForConsumer:
261 return "kafka server: Request was for a consumer group that is not coordinated by this broker."
262 case ErrInvalidTopic:
263 return "kafka server: The request attempted to perform an operation on an invalid topic."
264 case ErrMessageSetSizeTooLarge:
265 return "kafka server: The request included message batch larger than the configured segment size on the server."
266 case ErrNotEnoughReplicas:
267 return "kafka server: Messages are rejected since there are fewer in-sync replicas than required."
268 case ErrNotEnoughReplicasAfterAppend:
269 return "kafka server: Messages are written to the log, but to fewer in-sync replicas than required."
270 case ErrInvalidRequiredAcks:
271 return "kafka server: The number of required acks is invalid (should be either -1, 0, or 1)."
272 case ErrIllegalGeneration:
273 return "kafka server: The provided generation id is not the current generation."
274 case ErrInconsistentGroupProtocol:
275 return "kafka server: The provider group protocol type is incompatible with the other members."
276 case ErrInvalidGroupId:
277 return "kafka server: The provided group id was empty."
278 case ErrUnknownMemberId:
279 return "kafka server: The provided member is not known in the current generation."
280 case ErrInvalidSessionTimeout:
281 return "kafka server: The provided session timeout is outside the allowed range."
282 case ErrRebalanceInProgress:
283 return "kafka server: A rebalance for the group is in progress. Please re-join the group."
284 case ErrInvalidCommitOffsetSize:
285 return "kafka server: The provided commit metadata was too large."
286 case ErrTopicAuthorizationFailed:
287 return "kafka server: The client is not authorized to access this topic."
288 case ErrGroupAuthorizationFailed:
289 return "kafka server: The client is not authorized to access this group."
290 case ErrClusterAuthorizationFailed:
291 return "kafka server: The client is not authorized to send this request type."
292 case ErrInvalidTimestamp:
293 return "kafka server: The timestamp of the message is out of acceptable range."
294 case ErrUnsupportedSASLMechanism:
295 return "kafka server: The broker does not support the requested SASL mechanism."
296 case ErrIllegalSASLState:
297 return "kafka server: Request is not valid given the current SASL state."
298 case ErrUnsupportedVersion:
299 return "kafka server: The version of API is not supported."
300 case ErrTopicAlreadyExists:
301 return "kafka server: Topic with this name already exists."
302 case ErrInvalidPartitions:
303 return "kafka server: Number of partitions is invalid."
304 case ErrInvalidReplicationFactor:
305 return "kafka server: Replication-factor is invalid."
306 case ErrInvalidReplicaAssignment:
307 return "kafka server: Replica assignment is invalid."
308 case ErrInvalidConfig:
309 return "kafka server: Configuration is invalid."
310 case ErrNotController:
311 return "kafka server: This is not the correct controller for this cluster."
312 case ErrInvalidRequest:
313 return "kafka server: This most likely occurs because of a request being malformed by the client library or the message was sent to an incompatible broker. See the broker logs for more details."
314 case ErrUnsupportedForMessageFormat:
315 return "kafka server: The requested operation is not supported by the message format version."
316 case ErrPolicyViolation:
317 return "kafka server: Request parameters do not satisfy the configured policy."
318 case ErrOutOfOrderSequenceNumber:
319 return "kafka server: The broker received an out of order sequence number."
320 case ErrDuplicateSequenceNumber:
321 return "kafka server: The broker received a duplicate sequence number."
322 case ErrInvalidProducerEpoch:
323 return "kafka server: Producer attempted an operation with an old epoch."
324 case ErrInvalidTxnState:
325 return "kafka server: The producer attempted a transactional operation in an invalid state."
326 case ErrInvalidProducerIDMapping:
327 return "kafka server: The producer attempted to use a producer id which is not currently assigned to its transactional id."
328 case ErrInvalidTransactionTimeout:
329 return "kafka server: The transaction timeout is larger than the maximum value allowed by the broker (as configured by max.transaction.timeout.ms)."
330 case ErrConcurrentTransactions:
331 return "kafka server: The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing."
332 case ErrTransactionCoordinatorFenced:
333 return "kafka server: The transaction coordinator sending a WriteTxnMarker is no longer the current coordinator for a given producer."
334 case ErrTransactionalIDAuthorizationFailed:
335 return "kafka server: Transactional ID authorization failed."
336 case ErrSecurityDisabled:
337 return "kafka server: Security features are disabled."
338 case ErrOperationNotAttempted:
339 return "kafka server: The broker did not attempt to execute this operation."
340 case ErrKafkaStorageError:
341 return "kafka server: Disk error when trying to access log file on the disk."
342 case ErrLogDirNotFound:
343 return "kafka server: The specified log directory is not found in the broker config."
344 case ErrSASLAuthenticationFailed:
345 return "kafka server: SASL Authentication failed."
346 case ErrUnknownProducerID:
347 return "kafka server: The broker could not locate the producer metadata associated with the Producer ID."
348 case ErrReassignmentInProgress:
349 return "kafka server: A partition reassignment is in progress."
350 case ErrDelegationTokenAuthDisabled:
351 return "kafka server: Delegation Token feature is not enabled."
352 case ErrDelegationTokenNotFound:
353 return "kafka server: Delegation Token is not found on server."
354 case ErrDelegationTokenOwnerMismatch:
355 return "kafka server: Specified Principal is not valid Owner/Renewer."
356 case ErrDelegationTokenRequestNotAllowed:
357 return "kafka server: Delegation Token requests are not allowed on PLAINTEXT/1-way SSL channels and on delegation token authenticated channels."
358 case ErrDelegationTokenAuthorizationFailed:
359 return "kafka server: Delegation Token authorization failed."
360 case ErrDelegationTokenExpired:
361 return "kafka server: Delegation Token is expired."
362 case ErrInvalidPrincipalType:
363 return "kafka server: Supplied principalType is not supported."
364 case ErrNonEmptyGroup:
365 return "kafka server: The group is not empty."
366 case ErrGroupIDNotFound:
367 return "kafka server: The group id does not exist."
368 case ErrFetchSessionIDNotFound:
369 return "kafka server: The fetch session ID was not found."
370 case ErrInvalidFetchSessionEpoch:
371 return "kafka server: The fetch session epoch is invalid."
372 case ErrListenerNotFound:
373 return "kafka server: There is no listener on the leader broker that matches the listener on which metadata request was processed."
William Kurkiandaa6bb22019-03-07 12:26:28 -0500374 case ErrTopicDeletionDisabled:
375 return "kafka server: Topic deletion is disabled."
376 case ErrFencedLeaderEpoch:
377 return "kafka server: The leader epoch in the request is older than the epoch on the broker."
378 case ErrUnknownLeaderEpoch:
379 return "kafka server: The leader epoch in the request is newer than the epoch on the broker."
380 case ErrUnsupportedCompressionType:
381 return "kafka server: The requesting client does not support the compression type of given partition."
Scott Baker8461e152019-10-01 14:44:30 -0700382 case ErrStaleBrokerEpoch:
383 return "kafka server: Broker epoch has changed"
384 case ErrOffsetNotAvailable:
385 return "kafka server: The leader high watermark has not caught up from a recent leader election so the offsets cannot be guaranteed to be monotonically increasing"
386 case ErrMemberIdRequired:
387 return "kafka server: The group member needs to have a valid member id before actually entering a consumer group"
388 case ErrPreferredLeaderNotAvailable:
389 return "kafka server: The preferred leader was not available"
390 case ErrGroupMaxSizeReached:
391 return "kafka server: Consumer group The consumer group has reached its max size. already has the configured maximum number of members."
khenaidood948f772021-08-11 17:49:24 -0400392 case ErrFencedInstancedId:
393 return "kafka server: The broker rejected this static consumer since another consumer with the same group.instance.id has registered with a different member.id."
394 case ErrEligibleLeadersNotAvailable:
395 return "kafka server: Eligible topic partition leaders are not available."
396 case ErrElectionNotNeeded:
397 return "kafka server: Leader election not needed for topic partition."
398 case ErrNoReassignmentInProgress:
399 return "kafka server: No partition reassignment is in progress."
400 case ErrGroupSubscribedToTopic:
401 return "kafka server: Deleting offsets of a topic is forbidden while the consumer group is actively subscribed to it."
402 case ErrInvalidRecord:
403 return "kafka server: This record has failed the validation on broker and hence will be rejected."
404 case ErrUnstableOffsetCommit:
405 return "kafka server: There are unstable offsets that need to be cleared."
khenaidooac637102019-01-14 15:44:34 -0500406 }
407
408 return fmt.Sprintf("Unknown error, how did this happen? Error code = %d", err)
409}