VOL-1691 Fix openolt adapter getting stuck while registartion with core
Change-Id: Ide8131f325bc15f1b909e14d7af6ee9bcd6b3b5b
diff --git a/vendor/gopkg.in/Shopify/sarama.v1/async_producer.go b/vendor/gopkg.in/Shopify/sarama.v1/async_producer.go
index 5174a35..11e0849 100644
--- a/vendor/gopkg.in/Shopify/sarama.v1/async_producer.go
+++ b/vendor/gopkg.in/Shopify/sarama.v1/async_producer.go
@@ -92,9 +92,8 @@
}
type asyncProducer struct {
- client Client
- conf *Config
- ownClient bool
+ client Client
+ conf *Config
errors chan *ProducerError
input, successes, retries chan *ProducerMessage
@@ -113,18 +112,19 @@
if err != nil {
return nil, err
}
-
- p, err := NewAsyncProducerFromClient(client)
- if err != nil {
- return nil, err
- }
- p.(*asyncProducer).ownClient = true
- return p, nil
+ return newAsyncProducer(client)
}
// NewAsyncProducerFromClient creates a new Producer using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this producer.
func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
+ // For clients passed in by the client, ensure we don't
+ // call Close() on it.
+ cli := &nopCloserClient{client}
+ return newAsyncProducer(cli)
+}
+
+func newAsyncProducer(client Client) (AsyncProducer, error) {
// Check that we are not dealing with a closed Client before processing any other arguments
if client.Closed() {
return nil, ErrClosedClient
@@ -191,10 +191,17 @@
// Partition is the partition that the message was sent to. This is only
// guaranteed to be defined if the message was successfully delivered.
Partition int32
- // Timestamp is the timestamp assigned to the message by the broker. This
- // is only guaranteed to be defined if the message was successfully
- // delivered, RequiredAcks is not NoResponse, and the Kafka broker is at
- // least version 0.10.0.
+ // Timestamp can vary in behaviour depending on broker configuration, being
+ // in either one of the CreateTime or LogAppendTime modes (default CreateTime),
+ // and requiring version at least 0.10.0.
+ //
+ // When configured to CreateTime, the timestamp is specified by the producer
+ // either by explicitly setting this field, or when the message is added
+ // to a produce set.
+ //
+ // When configured to LogAppendTime, the timestamp assigned to the message
+ // by the broker. This is only guaranteed to be defined if the message was
+ // successfully delivered and RequiredAcks is not NoResponse.
Timestamp time.Time
retries int
@@ -999,11 +1006,9 @@
p.inFlight.Wait()
- if p.ownClient {
- err := p.client.Close()
- if err != nil {
- Logger.Println("producer/shutdown failed to close the embedded client:", err)
- }
+ err := p.client.Close()
+ if err != nil {
+ Logger.Println("producer/shutdown failed to close the embedded client:", err)
}
close(p.input)