[VOL-1349] EPON ONU adapter (package B)

Change-Id: I609ba349c429bc7e87c74b66bb1121841f9caef6
diff --git a/vendor/github.com/bsm/sarama-cluster/partitions.go b/vendor/github.com/bsm/sarama-cluster/partitions.go
new file mode 100644
index 0000000..bfaa587
--- /dev/null
+++ b/vendor/github.com/bsm/sarama-cluster/partitions.go
@@ -0,0 +1,290 @@
+package cluster
+
+import (
+	"sort"
+	"sync"
+	"time"
+
+	"github.com/Shopify/sarama"
+)
+
+// PartitionConsumer allows code to consume individual partitions from the cluster.
+//
+// See the docs for Consumer.Partitions() for more on how to use this interface.
+type PartitionConsumer interface {
+	sarama.PartitionConsumer
+
+	// Topic returns the consumed topic name.
+	Topic() string
+
+	// Partition returns the consumed partition.
+	Partition() int32
+
+	// InitialOffset returns the offset used for creating the PartitionConsumer instance.
+	// The returned offset can be a literal offset, or OffsetNewest, or OffsetOldest.
+	InitialOffset() int64
+  
+	// MarkOffset marks the offset of a message as processed.
+	MarkOffset(offset int64, metadata string)
+
+	// ResetOffset resets the offset to a previously processed message.
+	ResetOffset(offset int64, metadata string)
+}
+
+type partitionConsumer struct {
+	sarama.PartitionConsumer // embedded consumer obtained from ConsumePartition
+
+	state partitionState // offset bookkeeping; read and written under mu
+	mu    sync.Mutex     // guards state
+
+	topic         string
+	partition     int32
+	initialOffset int64 // offset passed to the ConsumePartition call that created the consumer
+
+	closeOnce sync.Once // ensures the body of AsyncClose runs only once
+	closeErr  error     // result of the embedded Close(), returned by Close()
+
+	dying, dead chan none // dying: closed by AsyncClose; dead: closed when waitFor/multiplex returns
+}
+
+// newPartitionConsumer opens a sarama consumer for topic/partition, starting at
+// info.NextOffset(defaultOffset). If sarama reports ErrOffsetOutOfRange, it
+// retries once at defaultOffset. Any remaining error is returned unchanged.
+func newPartitionConsumer(manager sarama.Consumer, topic string, partition int32, info offsetInfo, defaultOffset int64) (*partitionConsumer, error) {
+	start := info.NextOffset(defaultOffset)
+	inner, err := manager.ConsumePartition(topic, partition, start)
+	if err == sarama.ErrOffsetOutOfRange {
+		// Reset the stored offset to -1 and resume from the default offset.
+		info.Offset, start = -1, defaultOffset
+		inner, err = manager.ConsumePartition(topic, partition, start)
+	}
+	if err != nil {
+		return nil, err
+	}
+
+	pc := &partitionConsumer{
+		PartitionConsumer: inner,
+		state:             partitionState{Info: info},
+		topic:             topic,
+		partition:         partition,
+		initialOffset:     start,
+		dying:             make(chan none),
+		dead:              make(chan none),
+	}
+	return pc, nil
+}
+
+// Topic implements PartitionConsumer; it returns the topic passed to newPartitionConsumer.
+func (c *partitionConsumer) Topic() string { return c.topic }
+
+// Partition implements PartitionConsumer; it returns the partition passed to newPartitionConsumer.
+func (c *partitionConsumer) Partition() int32 { return c.partition }
+
+// InitialOffset implements PartitionConsumer; it returns the offset used to create the underlying consumer.
+func (c *partitionConsumer) InitialOffset() int64 { return c.initialOffset }
+
+// AsyncClose implements PartitionConsumer; only the first call acts, and it calls the embedded Close() synchronously.
+func (c *partitionConsumer) AsyncClose() {
+	c.closeOnce.Do(func() {
+		c.closeErr = c.PartitionConsumer.Close() // kept so Close() can return it
+		close(c.dying)                           // tells waitFor/multiplex to return
+	})
+}
+
+// Close implements PartitionConsumer; it triggers AsyncClose and then blocks until c.dead is closed.
+func (c *partitionConsumer) Close() error {
+	c.AsyncClose()
+	<-c.dead // in this file, closed only when waitFor or multiplex returns
+	return c.closeErr
+}
+
+func (c *partitionConsumer) waitFor(stopper <-chan none, errors chan<- error) {
+	defer close(c.dead) // unblocks Close() once this loop has returned
+
+	for {
+		select {
+		case err, ok := <-c.Errors(): // only errors are forwarded; Messages() is not read here
+			if !ok {
+				return // the Errors() channel was closed
+			}
+			select { // forward the error, but give up if shutdown is signalled while the send blocks
+			case errors <- err:
+			case <-stopper:
+				return
+			case <-c.dying:
+				return
+			}
+		case <-stopper:
+			return
+		case <-c.dying:
+			return
+		}
+	}
+}
+
+func (c *partitionConsumer) multiplex(stopper <-chan none, messages chan<- *sarama.ConsumerMessage, errors chan<- error) {
+	defer close(c.dead) // unblocks Close() once this loop has returned
+
+	for {
+		select {
+		case msg, ok := <-c.Messages():
+			if !ok {
+				return // the Messages() channel was closed
+			}
+			select { // forward the message, but give up if shutdown is signalled while the send blocks
+			case messages <- msg:
+			case <-stopper:
+				return
+			case <-c.dying:
+				return
+			}
+		case err, ok := <-c.Errors():
+			if !ok {
+				return // the Errors() channel was closed
+			}
+			select { // same pattern as above, for errors
+			case errors <- err:
+			case <-stopper:
+				return
+			case <-c.dying:
+				return
+			}
+		case <-stopper:
+			return
+		case <-c.dying:
+			return
+		}
+	}
+}
+
+// getState returns a copy of the consumer's state, read while holding c.mu.
+func (c *partitionConsumer) getState() partitionState {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	return c.state
+}
+
+func (c *partitionConsumer) markCommitted(offset int64) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.state.Info.Offset == offset {
+		c.state.Dirty = false // the committed offset is still the current one
+	}
+}
+
+// MarkOffset implements PartitionConsumer. It stores offset+1 together with metadata
+// and flags the state dirty, but only when offset+1 is greater than the stored offset.
+func (c *partitionConsumer) MarkOffset(offset int64, metadata string) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if st := &c.state; offset+1 > st.Info.Offset {
+		st.Info.Offset, st.Info.Metadata = offset+1, metadata
+		st.Dirty = true
+	}
+}
+
+// ResetOffset implements PartitionConsumer. It stores offset+1 together with metadata
+// and flags the state dirty, but only when offset+1 does not exceed the stored offset.
+func (c *partitionConsumer) ResetOffset(offset int64, metadata string) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if st := &c.state; offset+1 <= st.Info.Offset {
+		st.Info.Offset, st.Info.Metadata = offset+1, metadata
+		st.Dirty = true
+	}
+}
+
+// --------------------------------------------------------------------
+
+type partitionState struct {
+	Info       offsetInfo // offset and metadata, updated by MarkOffset/ResetOffset
+	Dirty      bool       // set by MarkOffset/ResetOffset, cleared by markCommitted
+	LastCommit time.Time  // not written in this file; presumably set by the commit logic (confirm)
+}
+
+// --------------------------------------------------------------------
+
+type partitionMap struct {
+	data map[topicPartition]*partitionConsumer // consumers keyed by topic/partition
+	mu   sync.RWMutex                          // guards data
+}
+
+// newPartitionMap returns an empty partitionMap whose data map is already allocated.
+func newPartitionMap() *partitionMap {
+	consumers := make(map[topicPartition]*partitionConsumer)
+	return &partitionMap{data: consumers}
+}
+
+// IsSubscribedTo reports whether the map holds at least one partition of topic.
+func (m *partitionMap) IsSubscribedTo(topic string) bool {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+	for key := range m.data {
+		if key.Topic == topic {
+			return true
+		}
+	}
+	return false
+}
+
+// Fetch returns the consumer stored for topic/partition, or nil when there is none.
+func (m *partitionMap) Fetch(topic string, partition int32) *partitionConsumer {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+	return m.data[topicPartition{Topic: topic, Partition: partition}]
+}
+
+func (m *partitionMap) Store(topic string, partition int32, pc *partitionConsumer) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.data[topicPartition{Topic: topic, Partition: partition}] = pc // replaces any existing entry
+}
+
+// Snapshot returns a copy of every stored consumer's state, keyed by topic/partition.
+func (m *partitionMap) Snapshot() map[topicPartition]partitionState {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+	states := make(map[topicPartition]partitionState, len(m.data))
+	for key, consumer := range m.data {
+		states[key] = consumer.getState()
+	}
+	return states
+}
+
+// Stop closes all stored consumers in parallel, holding the read lock until every Close returns.
+func (m *partitionMap) Stop() {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+	var pending sync.WaitGroup
+	pending.Add(len(m.data))
+	for _, consumer := range m.data {
+		go func(pc *partitionConsumer) {
+			_ = pc.Close() // close errors are discarded here
+			pending.Done()
+		}(consumer)
+	}
+	pending.Wait()
+}
+
+func (m *partitionMap) Clear() {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	for key := range m.data {
+		delete(m.data, key) // deleting entries while ranging over a map is permitted in Go
+	}
+}
+
+// Info returns the stored partition numbers grouped by topic, each list sorted via int32Slice.
+func (m *partitionMap) Info() map[string][]int32 {
+	byTopic := make(map[string][]int32)
+	m.mu.RLock()
+	for key := range m.data {
+		byTopic[key.Topic] = append(byTopic[key.Topic], key.Partition)
+	}
+	m.mu.RUnlock()
+	for _, partitions := range byTopic {
+		sort.Sort(int32Slice(partitions)) // sorts in place; the map entry shares this backing array
+	}
+	return byTopic
+}