blob: 0016f8fbecdba96cb86279900ed0253b8aeaa383 [file] [log] [blame]
Scott Bakereee8dd82019-09-24 12:52:34 -07001package sarama
2
3import (
4 "math/rand"
5 "sort"
6 "sync"
7 "time"
8)
9
10// Client is a generic Kafka client. It manages connections to one or more Kafka brokers.
11// You MUST call Close() on a client to avoid leaks, it will not be garbage-collected
12// automatically when it passes out of scope. It is safe to share a client amongst many
13// users, however Kafka will process requests from a single client strictly in serial,
14// so it is generally more efficient to use the default one client per producer/consumer.
15type Client interface {
16 // Config returns the Config struct of the client. This struct should not be
17 // altered after it has been created.
18 Config() *Config
19
20 // Controller returns the cluster controller broker. Requires Kafka 0.10 or higher.
21 Controller() (*Broker, error)
22
23 // Brokers returns the current set of active brokers as retrieved from cluster metadata.
24 Brokers() []*Broker
25
26 // Topics returns the set of available topics as retrieved from cluster metadata.
27 Topics() ([]string, error)
28
29 // Partitions returns the sorted list of all partition IDs for the given topic.
30 Partitions(topic string) ([]int32, error)
31
32 // WritablePartitions returns the sorted list of all writable partition IDs for
33 // the given topic, where "writable" means "having a valid leader accepting
34 // writes".
35 WritablePartitions(topic string) ([]int32, error)
36
37 // Leader returns the broker object that is the leader of the current
38 // topic/partition, as determined by querying the cluster metadata.
39 Leader(topic string, partitionID int32) (*Broker, error)
40
41 // Replicas returns the set of all replica IDs for the given partition.
42 Replicas(topic string, partitionID int32) ([]int32, error)
43
44 // InSyncReplicas returns the set of all in-sync replica IDs for the given
45 // partition. In-sync replicas are replicas which are fully caught up with
46 // the partition leader.
47 InSyncReplicas(topic string, partitionID int32) ([]int32, error)
48
49 // RefreshMetadata takes a list of topics and queries the cluster to refresh the
50 // available metadata for those topics. If no topics are provided, it will refresh
51 // metadata for all topics.
52 RefreshMetadata(topics ...string) error
53
54 // GetOffset queries the cluster to get the most recent available offset at the
55 // given time (in milliseconds) on the topic/partition combination.
56 // Time should be OffsetOldest for the earliest available offset,
57 // OffsetNewest for the offset of the message that will be produced next, or a time.
58 GetOffset(topic string, partitionID int32, time int64) (int64, error)
59
60 // Coordinator returns the coordinating broker for a consumer group. It will
61 // return a locally cached value if it's available. You can call
62 // RefreshCoordinator to update the cached value. This function only works on
63 // Kafka 0.8.2 and higher.
64 Coordinator(consumerGroup string) (*Broker, error)
65
66 // RefreshCoordinator retrieves the coordinator for a consumer group and stores it
67 // in local cache. This function only works on Kafka 0.8.2 and higher.
68 RefreshCoordinator(consumerGroup string) error
69
70 // InitProducerID retrieves information required for Idempotent Producer
71 InitProducerID() (*InitProducerIDResponse, error)
72
73 // Close shuts down all broker connections managed by this client. It is required
74 // to call this function before a client object passes out of scope, as it will
75 // otherwise leak memory. You must close any Producers or Consumers using a client
76 // before you close the client.
77 Close() error
78
79 // Closed returns true if the client has already had Close called on it
80 Closed() bool
81}
82
// OffsetNewest stands for the log head offset, i.e. the offset that will be
// assigned to the next message produced to the partition. Pass it to a
// client's GetOffset method to obtain that offset, or to ConsumePartition to
// start consuming new messages only.
const OffsetNewest int64 = -1

// OffsetOldest stands for the oldest offset still available on the broker for
// a partition. Pass it to a client's GetOffset method to obtain that offset,
// or to ConsumePartition to start consuming from the oldest offset that is
// still available on the broker.
const OffsetOldest int64 = -2
95
96type client struct {
97 conf *Config
98 closer, closed chan none // for shutting down background metadata updater
99
100 // the broker addresses given to us through the constructor are not guaranteed to be returned in
101 // the cluster metadata (I *think* it only returns brokers who are currently leading partitions?)
102 // so we store them separately
103 seedBrokers []*Broker
104 deadSeeds []*Broker
105
106 controllerID int32 // cluster controller broker id
107 brokers map[int32]*Broker // maps broker ids to brokers
108 metadata map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata
109 metadataTopics map[string]none // topics that need to collect metadata
110 coordinators map[string]int32 // Maps consumer group names to coordinating broker IDs
111
112 // If the number of partitions is large, we can get some churn calling cachedPartitions,
113 // so the result is cached. It is important to update this value whenever metadata is changed
114 cachedPartitionsResults map[string][maxPartitionIndex][]int32
115
116 lock sync.RWMutex // protects access to the maps that hold cluster state.
117}
118
119// NewClient creates a new Client. It connects to one of the given broker addresses
120// and uses that broker to automatically fetch metadata on the rest of the kafka cluster. If metadata cannot
121// be retrieved from any of the given broker addresses, the client is not created.
122func NewClient(addrs []string, conf *Config) (Client, error) {
123 Logger.Println("Initializing new client")
124
125 if conf == nil {
126 conf = NewConfig()
127 }
128
129 if err := conf.Validate(); err != nil {
130 return nil, err
131 }
132
133 if len(addrs) < 1 {
134 return nil, ConfigurationError("You must provide at least one broker address")
135 }
136
137 client := &client{
138 conf: conf,
139 closer: make(chan none),
140 closed: make(chan none),
141 brokers: make(map[int32]*Broker),
142 metadata: make(map[string]map[int32]*PartitionMetadata),
143 metadataTopics: make(map[string]none),
144 cachedPartitionsResults: make(map[string][maxPartitionIndex][]int32),
145 coordinators: make(map[string]int32),
146 }
147
148 random := rand.New(rand.NewSource(time.Now().UnixNano()))
149 for _, index := range random.Perm(len(addrs)) {
150 client.seedBrokers = append(client.seedBrokers, NewBroker(addrs[index]))
151 }
152
153 if conf.Metadata.Full {
154 // do an initial fetch of all cluster metadata by specifying an empty list of topics
155 err := client.RefreshMetadata()
156 switch err {
157 case nil:
158 break
159 case ErrLeaderNotAvailable, ErrReplicaNotAvailable, ErrTopicAuthorizationFailed, ErrClusterAuthorizationFailed:
160 // indicates that maybe part of the cluster is down, but is not fatal to creating the client
161 Logger.Println(err)
162 default:
163 close(client.closed) // we haven't started the background updater yet, so we have to do this manually
164 _ = client.Close()
165 return nil, err
166 }
167 }
168 go withRecover(client.backgroundMetadataUpdater)
169
170 Logger.Println("Successfully initialized new client")
171
172 return client, nil
173}
174
175func (client *client) Config() *Config {
176 return client.conf
177}
178
179func (client *client) Brokers() []*Broker {
180 client.lock.RLock()
181 defer client.lock.RUnlock()
182 brokers := make([]*Broker, 0, len(client.brokers))
183 for _, broker := range client.brokers {
184 brokers = append(brokers, broker)
185 }
186 return brokers
187}
188
189func (client *client) InitProducerID() (*InitProducerIDResponse, error) {
190 var err error
191 for broker := client.any(); broker != nil; broker = client.any() {
192
193 req := &InitProducerIDRequest{}
194
195 response, err := broker.InitProducerID(req)
196 switch err.(type) {
197 case nil:
198 return response, nil
199 default:
200 // some error, remove that broker and try again
201 Logger.Printf("Client got error from broker %d when issuing InitProducerID : %v\n", broker.ID(), err)
202 _ = broker.Close()
203 client.deregisterBroker(broker)
204 }
205 }
206 return nil, err
207}
208
209func (client *client) Close() error {
210 if client.Closed() {
211 // Chances are this is being called from a defer() and the error will go unobserved
212 // so we go ahead and log the event in this case.
213 Logger.Printf("Close() called on already closed client")
214 return ErrClosedClient
215 }
216
217 // shutdown and wait for the background thread before we take the lock, to avoid races
218 close(client.closer)
219 <-client.closed
220
221 client.lock.Lock()
222 defer client.lock.Unlock()
223 Logger.Println("Closing Client")
224
225 for _, broker := range client.brokers {
226 safeAsyncClose(broker)
227 }
228
229 for _, broker := range client.seedBrokers {
230 safeAsyncClose(broker)
231 }
232
233 client.brokers = nil
234 client.metadata = nil
235 client.metadataTopics = nil
236
237 return nil
238}
239
240func (client *client) Closed() bool {
241 return client.brokers == nil
242}
243
244func (client *client) Topics() ([]string, error) {
245 if client.Closed() {
246 return nil, ErrClosedClient
247 }
248
249 client.lock.RLock()
250 defer client.lock.RUnlock()
251
252 ret := make([]string, 0, len(client.metadata))
253 for topic := range client.metadata {
254 ret = append(ret, topic)
255 }
256
257 return ret, nil
258}
259
260func (client *client) MetadataTopics() ([]string, error) {
261 if client.Closed() {
262 return nil, ErrClosedClient
263 }
264
265 client.lock.RLock()
266 defer client.lock.RUnlock()
267
268 ret := make([]string, 0, len(client.metadataTopics))
269 for topic := range client.metadataTopics {
270 ret = append(ret, topic)
271 }
272
273 return ret, nil
274}
275
276func (client *client) Partitions(topic string) ([]int32, error) {
277 if client.Closed() {
278 return nil, ErrClosedClient
279 }
280
281 partitions := client.cachedPartitions(topic, allPartitions)
282
283 if len(partitions) == 0 {
284 err := client.RefreshMetadata(topic)
285 if err != nil {
286 return nil, err
287 }
288 partitions = client.cachedPartitions(topic, allPartitions)
289 }
290
291 if partitions == nil {
292 return nil, ErrUnknownTopicOrPartition
293 }
294
295 return partitions, nil
296}
297
298func (client *client) WritablePartitions(topic string) ([]int32, error) {
299 if client.Closed() {
300 return nil, ErrClosedClient
301 }
302
303 partitions := client.cachedPartitions(topic, writablePartitions)
304
305 // len==0 catches when it's nil (no such topic) and the odd case when every single
306 // partition is undergoing leader election simultaneously. Callers have to be able to handle
307 // this function returning an empty slice (which is a valid return value) but catching it
308 // here the first time (note we *don't* catch it below where we return ErrUnknownTopicOrPartition) triggers
309 // a metadata refresh as a nicety so callers can just try again and don't have to manually
310 // trigger a refresh (otherwise they'd just keep getting a stale cached copy).
311 if len(partitions) == 0 {
312 err := client.RefreshMetadata(topic)
313 if err != nil {
314 return nil, err
315 }
316 partitions = client.cachedPartitions(topic, writablePartitions)
317 }
318
319 if partitions == nil {
320 return nil, ErrUnknownTopicOrPartition
321 }
322
323 return partitions, nil
324}
325
326func (client *client) Replicas(topic string, partitionID int32) ([]int32, error) {
327 if client.Closed() {
328 return nil, ErrClosedClient
329 }
330
331 metadata := client.cachedMetadata(topic, partitionID)
332
333 if metadata == nil {
334 err := client.RefreshMetadata(topic)
335 if err != nil {
336 return nil, err
337 }
338 metadata = client.cachedMetadata(topic, partitionID)
339 }
340
341 if metadata == nil {
342 return nil, ErrUnknownTopicOrPartition
343 }
344
345 if metadata.Err == ErrReplicaNotAvailable {
346 return dupInt32Slice(metadata.Replicas), metadata.Err
347 }
348 return dupInt32Slice(metadata.Replicas), nil
349}
350
351func (client *client) InSyncReplicas(topic string, partitionID int32) ([]int32, error) {
352 if client.Closed() {
353 return nil, ErrClosedClient
354 }
355
356 metadata := client.cachedMetadata(topic, partitionID)
357
358 if metadata == nil {
359 err := client.RefreshMetadata(topic)
360 if err != nil {
361 return nil, err
362 }
363 metadata = client.cachedMetadata(topic, partitionID)
364 }
365
366 if metadata == nil {
367 return nil, ErrUnknownTopicOrPartition
368 }
369
370 if metadata.Err == ErrReplicaNotAvailable {
371 return dupInt32Slice(metadata.Isr), metadata.Err
372 }
373 return dupInt32Slice(metadata.Isr), nil
374}
375
376func (client *client) Leader(topic string, partitionID int32) (*Broker, error) {
377 if client.Closed() {
378 return nil, ErrClosedClient
379 }
380
381 leader, err := client.cachedLeader(topic, partitionID)
382
383 if leader == nil {
384 err = client.RefreshMetadata(topic)
385 if err != nil {
386 return nil, err
387 }
388 leader, err = client.cachedLeader(topic, partitionID)
389 }
390
391 return leader, err
392}
393
394func (client *client) RefreshMetadata(topics ...string) error {
395 if client.Closed() {
396 return ErrClosedClient
397 }
398
399 // Prior to 0.8.2, Kafka will throw exceptions on an empty topic and not return a proper
400 // error. This handles the case by returning an error instead of sending it
401 // off to Kafka. See: https://github.com/Shopify/sarama/pull/38#issuecomment-26362310
402 for _, topic := range topics {
403 if len(topic) == 0 {
404 return ErrInvalidTopic // this is the error that 0.8.2 and later correctly return
405 }
406 }
407
408 return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max)
409}
410
411func (client *client) GetOffset(topic string, partitionID int32, time int64) (int64, error) {
412 if client.Closed() {
413 return -1, ErrClosedClient
414 }
415
416 offset, err := client.getOffset(topic, partitionID, time)
417
418 if err != nil {
419 if err := client.RefreshMetadata(topic); err != nil {
420 return -1, err
421 }
422 return client.getOffset(topic, partitionID, time)
423 }
424
425 return offset, err
426}
427
428func (client *client) Controller() (*Broker, error) {
429 if client.Closed() {
430 return nil, ErrClosedClient
431 }
432
433 if !client.conf.Version.IsAtLeast(V0_10_0_0) {
434 return nil, ErrUnsupportedVersion
435 }
436
437 controller := client.cachedController()
438 if controller == nil {
439 if err := client.refreshMetadata(); err != nil {
440 return nil, err
441 }
442 controller = client.cachedController()
443 }
444
445 if controller == nil {
446 return nil, ErrControllerNotAvailable
447 }
448
449 _ = controller.Open(client.conf)
450 return controller, nil
451}
452
453func (client *client) Coordinator(consumerGroup string) (*Broker, error) {
454 if client.Closed() {
455 return nil, ErrClosedClient
456 }
457
458 coordinator := client.cachedCoordinator(consumerGroup)
459
460 if coordinator == nil {
461 if err := client.RefreshCoordinator(consumerGroup); err != nil {
462 return nil, err
463 }
464 coordinator = client.cachedCoordinator(consumerGroup)
465 }
466
467 if coordinator == nil {
468 return nil, ErrConsumerCoordinatorNotAvailable
469 }
470
471 _ = coordinator.Open(client.conf)
472 return coordinator, nil
473}
474
475func (client *client) RefreshCoordinator(consumerGroup string) error {
476 if client.Closed() {
477 return ErrClosedClient
478 }
479
480 response, err := client.getConsumerMetadata(consumerGroup, client.conf.Metadata.Retry.Max)
481 if err != nil {
482 return err
483 }
484
485 client.lock.Lock()
486 defer client.lock.Unlock()
487 client.registerBroker(response.Coordinator)
488 client.coordinators[consumerGroup] = response.Coordinator.ID()
489 return nil
490}
491
492// private broker management helpers
493
494// registerBroker makes sure a broker received by a Metadata or Coordinator request is registered
495// in the brokers map. It returns the broker that is registered, which may be the provided broker,
496// or a previously registered Broker instance. You must hold the write lock before calling this function.
497func (client *client) registerBroker(broker *Broker) {
498 if client.brokers[broker.ID()] == nil {
499 client.brokers[broker.ID()] = broker
500 Logger.Printf("client/brokers registered new broker #%d at %s", broker.ID(), broker.Addr())
501 } else if broker.Addr() != client.brokers[broker.ID()].Addr() {
502 safeAsyncClose(client.brokers[broker.ID()])
503 client.brokers[broker.ID()] = broker
504 Logger.Printf("client/brokers replaced registered broker #%d with %s", broker.ID(), broker.Addr())
505 }
506}
507
508// deregisterBroker removes a broker from the seedsBroker list, and if it's
509// not the seedbroker, removes it from brokers map completely.
510func (client *client) deregisterBroker(broker *Broker) {
511 client.lock.Lock()
512 defer client.lock.Unlock()
513
514 if len(client.seedBrokers) > 0 && broker == client.seedBrokers[0] {
515 client.deadSeeds = append(client.deadSeeds, broker)
516 client.seedBrokers = client.seedBrokers[1:]
517 } else {
518 // we do this so that our loop in `tryRefreshMetadata` doesn't go on forever,
519 // but we really shouldn't have to; once that loop is made better this case can be
520 // removed, and the function generally can be renamed from `deregisterBroker` to
521 // `nextSeedBroker` or something
522 Logger.Printf("client/brokers deregistered broker #%d at %s", broker.ID(), broker.Addr())
523 delete(client.brokers, broker.ID())
524 }
525}
526
527func (client *client) resurrectDeadBrokers() {
528 client.lock.Lock()
529 defer client.lock.Unlock()
530
531 Logger.Printf("client/brokers resurrecting %d dead seed brokers", len(client.deadSeeds))
532 client.seedBrokers = append(client.seedBrokers, client.deadSeeds...)
533 client.deadSeeds = nil
534}
535
536func (client *client) any() *Broker {
537 client.lock.RLock()
538 defer client.lock.RUnlock()
539
540 if len(client.seedBrokers) > 0 {
541 _ = client.seedBrokers[0].Open(client.conf)
542 return client.seedBrokers[0]
543 }
544
545 // not guaranteed to be random *or* deterministic
546 for _, broker := range client.brokers {
547 _ = broker.Open(client.conf)
548 return broker
549 }
550
551 return nil
552}
553
554// private caching/lazy metadata helpers
555
// partitionType selects one of the partition lists kept in the partition
// cache; its values double as indices into the per-topic cache array.
type partitionType int

const (
	// allPartitions selects the list of every partition of a topic.
	allPartitions partitionType = iota
	// writablePartitions selects the list of writable partitions of a topic.
	writablePartitions

	// If you add any more types, update the partition cache in update()

	// maxPartitionIndex is the number of partition types and the length of
	// the per-topic cache array. Ensure this is the last partition type value.
	maxPartitionIndex
)
566
567func (client *client) cachedMetadata(topic string, partitionID int32) *PartitionMetadata {
568 client.lock.RLock()
569 defer client.lock.RUnlock()
570
571 partitions := client.metadata[topic]
572 if partitions != nil {
573 return partitions[partitionID]
574 }
575
576 return nil
577}
578
579func (client *client) cachedPartitions(topic string, partitionSet partitionType) []int32 {
580 client.lock.RLock()
581 defer client.lock.RUnlock()
582
583 partitions, exists := client.cachedPartitionsResults[topic]
584
585 if !exists {
586 return nil
587 }
588 return partitions[partitionSet]
589}
590
591func (client *client) setPartitionCache(topic string, partitionSet partitionType) []int32 {
592 partitions := client.metadata[topic]
593
594 if partitions == nil {
595 return nil
596 }
597
598 ret := make([]int32, 0, len(partitions))
599 for _, partition := range partitions {
600 if partitionSet == writablePartitions && partition.Err == ErrLeaderNotAvailable {
601 continue
602 }
603 ret = append(ret, partition.ID)
604 }
605
606 sort.Sort(int32Slice(ret))
607 return ret
608}
609
610func (client *client) cachedLeader(topic string, partitionID int32) (*Broker, error) {
611 client.lock.RLock()
612 defer client.lock.RUnlock()
613
614 partitions := client.metadata[topic]
615 if partitions != nil {
616 metadata, ok := partitions[partitionID]
617 if ok {
618 if metadata.Err == ErrLeaderNotAvailable {
619 return nil, ErrLeaderNotAvailable
620 }
621 b := client.brokers[metadata.Leader]
622 if b == nil {
623 return nil, ErrLeaderNotAvailable
624 }
625 _ = b.Open(client.conf)
626 return b, nil
627 }
628 }
629
630 return nil, ErrUnknownTopicOrPartition
631}
632
633func (client *client) getOffset(topic string, partitionID int32, time int64) (int64, error) {
634 broker, err := client.Leader(topic, partitionID)
635 if err != nil {
636 return -1, err
637 }
638
639 request := &OffsetRequest{}
640 if client.conf.Version.IsAtLeast(V0_10_1_0) {
641 request.Version = 1
642 }
643 request.AddBlock(topic, partitionID, time, 1)
644
645 response, err := broker.GetAvailableOffsets(request)
646 if err != nil {
647 _ = broker.Close()
648 return -1, err
649 }
650
651 block := response.GetBlock(topic, partitionID)
652 if block == nil {
653 _ = broker.Close()
654 return -1, ErrIncompleteResponse
655 }
656 if block.Err != ErrNoError {
657 return -1, block.Err
658 }
659 if len(block.Offsets) != 1 {
660 return -1, ErrOffsetOutOfRange
661 }
662
663 return block.Offsets[0], nil
664}
665
666// core metadata update logic
667
668func (client *client) backgroundMetadataUpdater() {
669 defer close(client.closed)
670
671 if client.conf.Metadata.RefreshFrequency == time.Duration(0) {
672 return
673 }
674
675 ticker := time.NewTicker(client.conf.Metadata.RefreshFrequency)
676 defer ticker.Stop()
677
678 for {
679 select {
680 case <-ticker.C:
681 if err := client.refreshMetadata(); err != nil {
682 Logger.Println("Client background metadata update:", err)
683 }
684 case <-client.closer:
685 return
686 }
687 }
688}
689
690func (client *client) refreshMetadata() error {
691 topics := []string{}
692
693 if !client.conf.Metadata.Full {
694 if specificTopics, err := client.MetadataTopics(); err != nil {
695 return err
696 } else if len(specificTopics) == 0 {
697 return ErrNoTopicsToUpdateMetadata
698 } else {
699 topics = specificTopics
700 }
701 }
702
703 if err := client.RefreshMetadata(topics...); err != nil {
704 return err
705 }
706
707 return nil
708}
709
710func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int) error {
711 retry := func(err error) error {
712 if attemptsRemaining > 0 {
713 backoff := client.computeBackoff(attemptsRemaining)
714 Logger.Printf("client/metadata retrying after %dms... (%d attempts remaining)\n", client.conf.Metadata.Retry.Backoff/time.Millisecond, attemptsRemaining)
715 if backoff > 0 {
716 time.Sleep(backoff)
717 }
718 return client.tryRefreshMetadata(topics, attemptsRemaining-1)
719 }
720 return err
721 }
722
723 for broker := client.any(); broker != nil; broker = client.any() {
724 if len(topics) > 0 {
725 Logger.Printf("client/metadata fetching metadata for %v from broker %s\n", topics, broker.addr)
726 } else {
727 Logger.Printf("client/metadata fetching metadata for all topics from broker %s\n", broker.addr)
728 }
729
730 req := &MetadataRequest{Topics: topics}
731 if client.conf.Version.IsAtLeast(V0_10_0_0) {
732 req.Version = 1
733 }
734 response, err := broker.GetMetadata(req)
735
736 switch err.(type) {
737 case nil:
738 allKnownMetaData := len(topics) == 0
739 // valid response, use it
740 shouldRetry, err := client.updateMetadata(response, allKnownMetaData)
741 if shouldRetry {
742 Logger.Println("client/metadata found some partitions to be leaderless")
743 return retry(err) // note: err can be nil
744 }
745 return err
746
747 case PacketEncodingError:
748 // didn't even send, return the error
749 return err
750 default:
751 // some other error, remove that broker and try again
752 Logger.Printf("client/metadata got error from broker %d while fetching metadata: %v\n", broker.ID(), err)
753 _ = broker.Close()
754 client.deregisterBroker(broker)
755 }
756 }
757
758 Logger.Println("client/metadata no available broker to send metadata request to")
759 client.resurrectDeadBrokers()
760 return retry(ErrOutOfBrokers)
761}
762
763// if no fatal error, returns a list of topics that need retrying due to ErrLeaderNotAvailable
764func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bool) (retry bool, err error) {
765 client.lock.Lock()
766 defer client.lock.Unlock()
767
768 // For all the brokers we received:
769 // - if it is a new ID, save it
770 // - if it is an existing ID, but the address we have is stale, discard the old one and save it
771 // - otherwise ignore it, replacing our existing one would just bounce the connection
772 for _, broker := range data.Brokers {
773 client.registerBroker(broker)
774 }
775
776 client.controllerID = data.ControllerID
777
778 if allKnownMetaData {
779 client.metadata = make(map[string]map[int32]*PartitionMetadata)
780 client.metadataTopics = make(map[string]none)
781 client.cachedPartitionsResults = make(map[string][maxPartitionIndex][]int32)
782 }
783 for _, topic := range data.Topics {
784 // topics must be added firstly to `metadataTopics` to guarantee that all
785 // requested topics must be recorded to keep them trackable for periodically
786 // metadata refresh.
787 if _, exists := client.metadataTopics[topic.Name]; !exists {
788 client.metadataTopics[topic.Name] = none{}
789 }
790 delete(client.metadata, topic.Name)
791 delete(client.cachedPartitionsResults, topic.Name)
792
793 switch topic.Err {
794 case ErrNoError:
795 break
796 case ErrInvalidTopic, ErrTopicAuthorizationFailed: // don't retry, don't store partial results
797 err = topic.Err
798 continue
799 case ErrUnknownTopicOrPartition: // retry, do not store partial partition results
800 err = topic.Err
801 retry = true
802 continue
803 case ErrLeaderNotAvailable: // retry, but store partial partition results
804 retry = true
805 break
806 default: // don't retry, don't store partial results
807 Logger.Printf("Unexpected topic-level metadata error: %s", topic.Err)
808 err = topic.Err
809 continue
810 }
811
812 client.metadata[topic.Name] = make(map[int32]*PartitionMetadata, len(topic.Partitions))
813 for _, partition := range topic.Partitions {
814 client.metadata[topic.Name][partition.ID] = partition
815 if partition.Err == ErrLeaderNotAvailable {
816 retry = true
817 }
818 }
819
820 var partitionCache [maxPartitionIndex][]int32
821 partitionCache[allPartitions] = client.setPartitionCache(topic.Name, allPartitions)
822 partitionCache[writablePartitions] = client.setPartitionCache(topic.Name, writablePartitions)
823 client.cachedPartitionsResults[topic.Name] = partitionCache
824 }
825
826 return
827}
828
829func (client *client) cachedCoordinator(consumerGroup string) *Broker {
830 client.lock.RLock()
831 defer client.lock.RUnlock()
832 if coordinatorID, ok := client.coordinators[consumerGroup]; ok {
833 return client.brokers[coordinatorID]
834 }
835 return nil
836}
837
838func (client *client) cachedController() *Broker {
839 client.lock.RLock()
840 defer client.lock.RUnlock()
841
842 return client.brokers[client.controllerID]
843}
844
845func (client *client) computeBackoff(attemptsRemaining int) time.Duration {
846 if client.conf.Metadata.Retry.BackoffFunc != nil {
847 maxRetries := client.conf.Metadata.Retry.Max
848 retries := maxRetries - attemptsRemaining
849 return client.conf.Metadata.Retry.BackoffFunc(retries, maxRetries)
850 } else {
851 return client.conf.Metadata.Retry.Backoff
852 }
853}
854
855func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemaining int) (*FindCoordinatorResponse, error) {
856 retry := func(err error) (*FindCoordinatorResponse, error) {
857 if attemptsRemaining > 0 {
858 backoff := client.computeBackoff(attemptsRemaining)
859 Logger.Printf("client/coordinator retrying after %dms... (%d attempts remaining)\n", backoff/time.Millisecond, attemptsRemaining)
860 time.Sleep(backoff)
861 return client.getConsumerMetadata(consumerGroup, attemptsRemaining-1)
862 }
863 return nil, err
864 }
865
866 for broker := client.any(); broker != nil; broker = client.any() {
867 Logger.Printf("client/coordinator requesting coordinator for consumergroup %s from %s\n", consumerGroup, broker.Addr())
868
869 request := new(FindCoordinatorRequest)
870 request.CoordinatorKey = consumerGroup
871 request.CoordinatorType = CoordinatorGroup
872
873 response, err := broker.FindCoordinator(request)
874
875 if err != nil {
876 Logger.Printf("client/coordinator request to broker %s failed: %s\n", broker.Addr(), err)
877
878 switch err.(type) {
879 case PacketEncodingError:
880 return nil, err
881 default:
882 _ = broker.Close()
883 client.deregisterBroker(broker)
884 continue
885 }
886 }
887
888 switch response.Err {
889 case ErrNoError:
890 Logger.Printf("client/coordinator coordinator for consumergroup %s is #%d (%s)\n", consumerGroup, response.Coordinator.ID(), response.Coordinator.Addr())
891 return response, nil
892
893 case ErrConsumerCoordinatorNotAvailable:
894 Logger.Printf("client/coordinator coordinator for consumer group %s is not available\n", consumerGroup)
895
896 // This is very ugly, but this scenario will only happen once per cluster.
897 // The __consumer_offsets topic only has to be created one time.
898 // The number of partitions not configurable, but partition 0 should always exist.
899 if _, err := client.Leader("__consumer_offsets", 0); err != nil {
900 Logger.Printf("client/coordinator the __consumer_offsets topic is not initialized completely yet. Waiting 2 seconds...\n")
901 time.Sleep(2 * time.Second)
902 }
903
904 return retry(ErrConsumerCoordinatorNotAvailable)
905 default:
906 return nil, response.Err
907 }
908 }
909
910 Logger.Println("client/coordinator no available broker to send consumer metadata request to")
911 client.resurrectDeadBrokers()
912 return retry(ErrOutOfBrokers)
913}