[VOL-4292] OpenOLT Adapter changes for gRPC migration
Change-Id: I5af2125f2c2f53ffc78c474a94314bba408f8bae
diff --git a/vendor/github.com/Shopify/sarama/client.go b/vendor/github.com/Shopify/sarama/client.go
index c4c54b2..c0918ba 100644
--- a/vendor/github.com/Shopify/sarama/client.go
+++ b/vendor/github.com/Shopify/sarama/client.go
@@ -17,12 +17,21 @@
// altered after it has been created.
Config() *Config
- // Controller returns the cluster controller broker. Requires Kafka 0.10 or higher.
+ // Controller returns the cluster controller broker. It will return a
+ // locally cached value if it's available. You can call RefreshController
+ // to update the cached value. Requires Kafka 0.10 or higher.
Controller() (*Broker, error)
+ // RefreshController retrieves the cluster controller from fresh metadata
+ // and stores it in the local cache. Requires Kafka 0.10 or higher.
+ RefreshController() (*Broker, error)
+
// Brokers returns the current set of active brokers as retrieved from cluster metadata.
Brokers() []*Broker
+ // Broker returns the active Broker, if available, for the given broker ID.
+ Broker(brokerID int32) (*Broker, error)
+
// Topics returns the set of available topics as retrieved from cluster metadata.
Topics() ([]string, error)
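
A usage sketch of the new Broker accessor (not part of this patch; the seed address and broker ID are placeholders):

    package main

    import (
        "log"

        "github.com/Shopify/sarama"
    )

    func main() {
        client, err := sarama.NewClient([]string{"localhost:9092"}, sarama.NewConfig())
        if err != nil {
            log.Fatal(err)
        }
        defer client.Close()

        // Look up a broker by ID from the cached metadata; the new method
        // returns ErrBrokerNotFound when the ID is not in the cache.
        broker, err := client.Broker(0)
        if err != nil {
            log.Fatal(err)
        }
        log.Println("broker 0 is at", broker.Addr())
    }
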
@@ -50,6 +59,11 @@
// partition. Offline replicas are replicas hosted on brokers that are currently offline.
OfflineReplicas(topic string, partitionID int32) ([]int32, error)
+ // RefreshBrokers takes a list of addresses to be used as seed brokers.
+ // Existing broker connections are closed and the updated list of seed brokers
+ // will be used for the next metadata fetch.
+ RefreshBrokers(addrs []string) error
+
// RefreshMetadata takes a list of topics and queries the cluster to refresh the
// available metadata for those topics. If no topics are provided, it will refresh
// metadata for all topics.
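
A sketch of the RefreshBrokers contract described above: existing connections are closed and the new seed list takes effect on the next metadata fetch. The helper name and the forced refresh are assumptions, not part of the patch:

    package example

    import "github.com/Shopify/sarama"

    // repointClient swaps the client's seed brokers at runtime, e.g. after
    // a DNS or VIP change, then forces a fetch against the new seeds.
    func repointClient(client sarama.Client, seeds []string) error {
        if err := client.RefreshBrokers(seeds); err != nil { // closes existing connections
            return err
        }
        return client.RefreshMetadata() // next fetch uses the new seed list
    }
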
@@ -149,10 +163,7 @@
coordinators: make(map[string]int32),
}
- random := rand.New(rand.NewSource(time.Now().UnixNano()))
- for _, index := range random.Perm(len(addrs)) {
- client.seedBrokers = append(client.seedBrokers, NewBroker(addrs[index]))
- }
+ client.randomizeSeedBrokers(addrs)
if conf.Metadata.Full {
// do an initial fetch of all cluster metadata by specifying an empty list of topics
@@ -190,10 +201,20 @@
return brokers
}
+func (client *client) Broker(brokerID int32) (*Broker, error) {
+ client.lock.RLock()
+ defer client.lock.RUnlock()
+ broker, ok := client.brokers[brokerID]
+ if !ok {
+ return nil, ErrBrokerNotFound
+ }
+ _ = broker.Open(client.conf)
+ return broker, nil
+}
+
func (client *client) InitProducerID() (*InitProducerIDResponse, error) {
var err error
for broker := client.any(); broker != nil; broker = client.any() {
-
req := &InitProducerIDRequest{}
response, err := broker.InitProducerID(req)
@@ -242,6 +263,9 @@
}
func (client *client) Closed() bool {
+ client.lock.RLock()
+ defer client.lock.RUnlock()
+
return client.brokers == nil
}
@@ -421,6 +445,27 @@
return leader, err
}
+func (client *client) RefreshBrokers(addrs []string) error {
+ if client.Closed() {
+ return ErrClosedClient
+ }
+
+ client.lock.Lock()
+ defer client.lock.Unlock()
+
+ for _, broker := range client.brokers {
+ _ = broker.Close()
+ delete(client.brokers, broker.ID())
+ }
+
+ client.seedBrokers = nil
+ client.deadSeeds = nil
+
+ client.randomizeSeedBrokers(addrs)
+
+ return nil
+}
+
func (client *client) RefreshMetadata(topics ...string) error {
if client.Closed() {
return ErrClosedClient
@@ -430,7 +475,7 @@
// error. This handles the case by returning an error instead of sending it
// off to Kafka. See: https://github.com/Shopify/sarama/pull/38#issuecomment-26362310
for _, topic := range topics {
- if len(topic) == 0 {
+ if topic == "" {
return ErrInvalidTopic // this is the error that 0.8.2 and later correctly return
}
}
@@ -448,7 +493,6 @@
}
offset, err := client.getOffset(topic, partitionID, time)
-
if err != nil {
if err := client.RefreshMetadata(topic); err != nil {
return -1, err
@@ -484,6 +528,35 @@
return controller, nil
}
+ // deregisterController removes the cached controller broker from the broker map
+ // so that a fresh one can be fetched.
+func (client *client) deregisterController() {
+ client.lock.Lock()
+ defer client.lock.Unlock()
+ delete(client.brokers, client.controllerID)
+}
+
+// RefreshController retrieves the cluster controller from fresh metadata
+// and stores it in the local cache. Requires Kafka 0.10 or higher.
+func (client *client) RefreshController() (*Broker, error) {
+ if client.Closed() {
+ return nil, ErrClosedClient
+ }
+
+ client.deregisterController()
+
+ if err := client.refreshMetadata(); err != nil {
+ return nil, err
+ }
+
+ controller := client.cachedController()
+ if controller == nil {
+ return nil, ErrControllerNotAvailable
+ }
+
+ _ = controller.Open(client.conf)
+ return controller, nil
+}
+
func (client *client) Coordinator(consumerGroup string) (*Broker, error) {
if client.Closed() {
return nil, ErrClosedClient
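
One way a caller might pair the cached Controller with the new RefreshController: trust the cache first and re-fetch only when the cached broker looks stale. The fallback shape is an assumption, not something this patch prescribes:

    package example

    import "github.com/Shopify/sarama"

    // controllerWithRefresh returns the cached controller, falling back to a
    // fresh metadata lookup when the cached broker is missing or disconnected.
    func controllerWithRefresh(client sarama.Client) (*sarama.Broker, error) {
        broker, err := client.Controller() // locally cached value
        if err == nil {
            if ok, _ := broker.Connected(); ok {
                return broker, nil
            }
        }
        return client.RefreshController() // re-fetch from fresh metadata
    }
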
@@ -525,10 +598,46 @@
// private broker management helpers
+func (client *client) randomizeSeedBrokers(addrs []string) {
+ random := rand.New(rand.NewSource(time.Now().UnixNano()))
+ for _, index := range random.Perm(len(addrs)) {
+ client.seedBrokers = append(client.seedBrokers, NewBroker(addrs[index]))
+ }
+}
+
+func (client *client) updateBroker(brokers []*Broker) {
+ currentBroker := make(map[int32]*Broker, len(brokers))
+
+ for _, broker := range brokers {
+ currentBroker[broker.ID()] = broker
+ if client.brokers[broker.ID()] == nil { // add new broker
+ client.brokers[broker.ID()] = broker
+ Logger.Printf("client/brokers registered new broker #%d at %s", broker.ID(), broker.Addr())
+ } else if broker.Addr() != client.brokers[broker.ID()].Addr() { // replace broker with new address
+ safeAsyncClose(client.brokers[broker.ID()])
+ client.brokers[broker.ID()] = broker
+ Logger.Printf("client/brokers replaced registered broker #%d with %s", broker.ID(), broker.Addr())
+ }
+ }
+
+ for id, broker := range client.brokers {
+ if _, exist := currentBroker[id]; !exist { // remove old broker
+ safeAsyncClose(broker)
+ delete(client.brokers, id)
+ Logger.Printf("client/broker remove invalid broker #%d with %s", broker.ID(), broker.Addr())
+ }
+ }
+}
+
// registerBroker makes sure a broker received by a Metadata or Coordinator request is registered
// in the brokers map. It returns the broker that is registered, which may be the provided broker,
// or a previously registered Broker instance. You must hold the write lock before calling this function.
func (client *client) registerBroker(broker *Broker) {
+ if client.brokers == nil {
+ Logger.Printf("cannot register broker #%d at %s, client already closed", broker.ID(), broker.Addr())
+ return
+ }
+
if client.brokers[broker.ID()] == nil {
client.brokers[broker.ID()] = broker
Logger.Printf("client/brokers registered new broker #%d at %s", broker.ID(), broker.Addr())
@@ -722,7 +831,7 @@
}
func (client *client) refreshMetadata() error {
- topics := []string{}
+ var topics []string
if !client.conf.Metadata.Full {
if specificTopics, err := client.MetadataTopics(); err != nil {
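
The "var topics []string" change trades an empty slice literal for a nil slice; both have length zero and both support append, so behaviour is unchanged and the literal's allocation is avoided. A standalone illustration:

    package main

    import "fmt"

    func main() {
        var a []string  // nil slice, no allocation
        b := []string{} // empty but non-nil slice
        fmt.Println(len(a), len(b), a == nil, b == nil) // 0 0 true false
        a = append(a, "t1")                             // append works on a nil slice
        fmt.Println(a)                                  // [t1]
    }
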
@@ -756,7 +865,7 @@
Logger.Println("client/metadata skipping last retries as we would go past the metadata timeout")
return err
}
- Logger.Printf("client/metadata retrying after %dms... (%d attempts remaining)\n", client.conf.Metadata.Retry.Backoff/time.Millisecond, attemptsRemaining)
+ Logger.Printf("client/metadata retrying after %dms... (%d attempts remaining)\n", backoff/time.Millisecond, attemptsRemaining)
if backoff > 0 {
time.Sleep(backoff)
}
@@ -782,7 +891,7 @@
req.Version = 1
}
response, err := broker.GetMetadata(req)
- switch err.(type) {
+ switch err := err.(type) {
case nil:
allKnownMetaData := len(topics) == 0
// valid response, use it
@@ -799,10 +908,15 @@
case KError:
// if this is a SASL auth error, return it, as it _should_ be non-retryable for all brokers
- if err.(KError) == ErrSASLAuthenticationFailed {
+ if err == ErrSASLAuthenticationFailed {
Logger.Println("client/metadata failed SASL authentication")
return err
}
+
+ if err == ErrTopicAuthorizationFailed {
+ Logger.Println("client is not authorized to access this topic. The topics were: ", topics)
+ return err
+ }
// else remove that broker and try again
Logger.Printf("client/metadata got error from broker %d while fetching metadata: %v\n", broker.ID(), err)
_ = broker.Close()
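
With topic authorization failures now returned instead of retried against other brokers, callers can tell an ACL or credentials problem apart from a transient metadata error. A hedged handling sketch (the wrapper is illustrative; the error values are sarama's):

    package example

    import (
        "fmt"

        "github.com/Shopify/sarama"
    )

    // refreshOrExplain wraps RefreshMetadata and annotates the two
    // authorization errors that the client does not treat as retryable.
    func refreshOrExplain(client sarama.Client, topics ...string) error {
        err := client.RefreshMetadata(topics...)
        switch err {
        case sarama.ErrSASLAuthenticationFailed:
            return fmt.Errorf("check SASL credentials: %w", err)
        case sarama.ErrTopicAuthorizationFailed:
            return fmt.Errorf("missing ACLs for topics %v: %w", topics, err)
        default:
            return err // nil, or an error the caller may retry
        }
    }
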
@@ -817,7 +931,7 @@
}
if broker != nil {
- Logger.Println("client/metadata not fetching metadata from broker %s as we would go past the metadata timeout\n", broker.addr)
+ Logger.Printf("client/metadata not fetching metadata from broker %s as we would go past the metadata timeout\n", broker.addr)
return retry(ErrOutOfBrokers)
}
@@ -828,16 +942,19 @@
// if no fatal error, returns a list of topics that need retrying due to ErrLeaderNotAvailable
func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bool) (retry bool, err error) {
+ if client.Closed() {
+ return
+ }
+
client.lock.Lock()
defer client.lock.Unlock()
// For all the brokers we received:
// - if it is a new ID, save it
// - if it is an existing ID, but the address we have is stale, discard the old one and save it
+ // - if a broker no longer exists in the response, remove the old one
// - otherwise ignore it, replacing our existing one would just bounce the connection
- for _, broker := range data.Brokers {
- client.registerBroker(broker)
- }
+ client.updateBroker(data.Brokers)
client.controllerID = data.ControllerID
@@ -935,7 +1052,6 @@
request.CoordinatorType = CoordinatorGroup
response, err := broker.FindCoordinator(request)
-
if err != nil {
Logger.Printf("client/coordinator request to broker %s failed: %s\n", broker.Addr(), err)
@@ -966,6 +1082,10 @@
}
return retry(ErrConsumerCoordinatorNotAvailable)
+ case ErrGroupAuthorizationFailed:
+ Logger.Printf("client was not authorized to access group %s while attempting to find coordinator", consumerGroup)
+ return retry(ErrGroupAuthorizationFailed)
+
default:
return nil, response.Err
}
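
The new coordinator case retries an unauthorized group within the normal retry budget (ACL changes can lag) and then surfaces ErrGroupAuthorizationFailed instead of the generic coordinator error. A caller-side sketch; the helper name is a placeholder:

    package example

    import (
        "fmt"

        "github.com/Shopify/sarama"
    )

    // coordinatorOrACLHint looks up a group's coordinator and annotates the
    // authorization error that Coordinator can now return once retries are spent.
    func coordinatorOrACLHint(client sarama.Client, group string) (*sarama.Broker, error) {
        broker, err := client.Coordinator(group)
        if err == sarama.ErrGroupAuthorizationFailed {
            return nil, fmt.Errorf("missing ACL on group %q: %w", group, err)
        }
        return broker, err
    }
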