WIP [VOL-2811] - Incorporate preliminary onu-adapter-go code into opencord repo
- reason "discovery-mibsync-complete" reached (via full MibUpload only, received data won't be stored yet)
- first review comments of patchset #4 considered
(please see our inline comments in Gerrit for more details on the current state)
- no refactoring done yet
Change-Id: Iac47817f8ce4bd28dd8132f530b0570d57ae99b8
Signed-off-by: Holger Hildebrandt <holger.hildebrandt@adtran.com>
diff --git a/vendor/github.com/bsm/sarama-cluster/offsets.go b/vendor/github.com/bsm/sarama-cluster/offsets.go
new file mode 100644
index 0000000..4223ac5
--- /dev/null
+++ b/vendor/github.com/bsm/sarama-cluster/offsets.go
@@ -0,0 +1,69 @@
+package cluster
+
+import (
+ "sync"
+
+ "github.com/Shopify/sarama"
+)
+
+// OffsetStash accumulates one offset (with its metadata) per topic/partition so that
+// they can be marked as processed in bulk; mu guards access to the offsets map.
+type OffsetStash struct {
+ offsets map[topicPartition]offsetInfo
+ mu sync.Mutex
+}
+
+// NewOffsetStash returns an empty OffsetStash whose offsets map is already allocated.
+func NewOffsetStash() *OffsetStash {
+ return &OffsetStash{offsets: make(map[topicPartition]offsetInfo)}
+}
+
+// MarkOffset stashes the topic, partition and offset of msg, with metadata, via MarkPartitionOffset.
+func (s *OffsetStash) MarkOffset(msg *sarama.ConsumerMessage, metadata string) {
+ s.MarkPartitionOffset(msg.Topic, msg.Partition, msg.Offset, metadata)
+}
+
+// MarkPartitionOffset stores offset and metadata for topic/partition if offset >= the stashed one (0 when none is stashed).
+func (s *OffsetStash) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ key := topicPartition{Topic: topic, Partition: partition}
+ if info := s.offsets[key]; offset >= info.Offset {
+ info.Offset = offset
+ info.Metadata = metadata
+ s.offsets[key] = info
+ }
+}
+
+// ResetPartitionOffset stores offset and metadata for topic/partition if offset <= the stashed one, i.e. unlike
+// MarkPartitionOffset it accepts earlier offsets. NOTE(review): an absent key reads as offset 0, so a positive offset is then ignored - confirm intended.
+func (s *OffsetStash) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ key := topicPartition{Topic: topic, Partition: partition}
+ if info := s.offsets[key]; offset <= info.Offset {
+ info.Offset = offset
+ info.Metadata = metadata
+ s.offsets[key] = info
+ }
+}
+
+// ResetOffset stashes the topic, partition and offset of msg, with metadata, via ResetPartitionOffset.
+// See ResetPartitionOffset for how this differs from marking an offset.
+func (s *OffsetStash) ResetOffset(msg *sarama.ConsumerMessage, metadata string) {
+ s.ResetPartitionOffset(msg.Topic, msg.Partition, msg.Offset, metadata)
+}
+
+// Offsets returns a newly built map from each stashed topic-partition's String() form to its stashed offset.
+func (s *OffsetStash) Offsets() map[string]int64 {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ res := make(map[string]int64, len(s.offsets))
+ for tp, info := range s.offsets {
+ res[tp.String()] = info.Offset
+ }
+ return res
+}