[VOL-4290] Voltha go library updates for gRPC migration
Change-Id: I1aa2774beb6b7ed7419bc45aeb53fcae8a8ecda0
diff --git a/vendor/github.com/Shopify/sarama/offset_manager.go b/vendor/github.com/Shopify/sarama/offset_manager.go
index 923972f..4f480a0 100644
--- a/vendor/github.com/Shopify/sarama/offset_manager.go
+++ b/vendor/github.com/Shopify/sarama/offset_manager.go
@@ -19,6 +19,10 @@
// will otherwise leak memory. You must call this after all the
// PartitionOffsetManagers are closed.
Close() error
+
+ // Commit commits the offsets. This method can be used if AutoCommit.Enable is
+ // set to false.
+ Commit()
}
type offsetManager struct {
@@ -58,7 +62,6 @@
client: client,
conf: conf,
group: group,
- ticker: time.NewTicker(conf.Consumer.Offsets.CommitInterval),
poms: make(map[string]map[int32]*partitionOffsetManager),
memberID: memberID,
@@ -67,7 +70,10 @@
closing: make(chan none),
closed: make(chan none),
}
- go withRecover(om.mainLoop)
+ if conf.Consumer.Offsets.AutoCommit.Enable {
+ om.ticker = time.NewTicker(conf.Consumer.Offsets.AutoCommit.Interval)
+ go withRecover(om.mainLoop)
+ }
return om, nil
}
@@ -99,16 +105,20 @@
om.closeOnce.Do(func() {
// exit the mainLoop
close(om.closing)
- <-om.closed
+ if om.conf.Consumer.Offsets.AutoCommit.Enable {
+ <-om.closed
+ }
// mark all POMs as closed
om.asyncClosePOMs()
// flush one last time
- for attempt := 0; attempt <= om.conf.Consumer.Offsets.Retry.Max; attempt++ {
- om.flushToBroker()
- if om.releasePOMs(false) == 0 {
- break
+ if om.conf.Consumer.Offsets.AutoCommit.Enable {
+ for attempt := 0; attempt <= om.conf.Consumer.Offsets.Retry.Max; attempt++ {
+ om.flushToBroker()
+ if om.releasePOMs(false) == 0 {
+ break
+ }
}
}
@@ -225,14 +235,18 @@
for {
select {
case <-om.ticker.C:
- om.flushToBroker()
- om.releasePOMs(false)
+ om.Commit()
case <-om.closing:
return
}
}
}
+func (om *offsetManager) Commit() { // Commit performs one commit pass on demand; mainLoop invokes it on every ticker tick, and that loop is only started when AutoCommit.Enable is true, so callers with auto-commit disabled call it themselves.
+ om.flushToBroker() // builds the commit request and sends it; NOTE(review): appears to be a no-op when there is nothing to commit (constructRequest returns nil) — confirm
+ om.releasePOMs(false) // returns a count that Close compares against zero; the count is ignored here. NOTE(review): presumably `false` means a non-forced release — confirm against releasePOMs
+}
+
func (om *offsetManager) flushToBroker() {
req := om.constructRequest()
if req == nil {
@@ -275,7 +289,6 @@
ConsumerID: om.memberID,
ConsumerGroupGeneration: om.generation,
}
-
}
om.pomsLock.RLock()