[VOL-1386]  This commit add "dep" as the package management tool
for voltha-go.

Change-Id: I52bc4911dd00a441756ec7c30f46d45091f3f90e
diff --git a/vendor/gopkg.in/Shopify/sarama.v1/produce_set.go b/vendor/gopkg.in/Shopify/sarama.v1/produce_set.go
new file mode 100644
index 0000000..4b42d9c
--- /dev/null
+++ b/vendor/gopkg.in/Shopify/sarama.v1/produce_set.go
@@ -0,0 +1,258 @@
+package sarama
+
+import (
+	"encoding/binary"
+	"errors"
+	"time"
+)
+
+type partitionSet struct {
+	msgs          []*ProducerMessage
+	recordsToSend Records
+	bufferBytes   int
+}
+
+type produceSet struct {
+	parent *asyncProducer
+	msgs   map[string]map[int32]*partitionSet
+
+	bufferBytes int
+	bufferCount int
+}
+
+func newProduceSet(parent *asyncProducer) *produceSet {
+	return &produceSet{
+		msgs:   make(map[string]map[int32]*partitionSet),
+		parent: parent,
+	}
+}
+
+func (ps *produceSet) add(msg *ProducerMessage) error {
+	var err error
+	var key, val []byte
+
+	if msg.Key != nil {
+		if key, err = msg.Key.Encode(); err != nil {
+			return err
+		}
+	}
+
+	if msg.Value != nil {
+		if val, err = msg.Value.Encode(); err != nil {
+			return err
+		}
+	}
+
+	timestamp := msg.Timestamp
+	if msg.Timestamp.IsZero() {
+		timestamp = time.Now()
+	}
+
+	partitions := ps.msgs[msg.Topic]
+	if partitions == nil {
+		partitions = make(map[int32]*partitionSet)
+		ps.msgs[msg.Topic] = partitions
+	}
+
+	var size int
+
+	set := partitions[msg.Partition]
+	if set == nil {
+		if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
+			batch := &RecordBatch{
+				FirstTimestamp:   timestamp,
+				Version:          2,
+				Codec:            ps.parent.conf.Producer.Compression,
+				CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
+				ProducerID:       ps.parent.txnmgr.producerID,
+				ProducerEpoch:    ps.parent.txnmgr.producerEpoch,
+			}
+			if ps.parent.conf.Producer.Idempotent {
+				batch.FirstSequence = msg.sequenceNumber
+			}
+			set = &partitionSet{recordsToSend: newDefaultRecords(batch)}
+			size = recordBatchOverhead
+		} else {
+			set = &partitionSet{recordsToSend: newLegacyRecords(new(MessageSet))}
+		}
+		partitions[msg.Partition] = set
+	}
+	set.msgs = append(set.msgs, msg)
+
+	if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
+		if ps.parent.conf.Producer.Idempotent && msg.sequenceNumber < set.recordsToSend.RecordBatch.FirstSequence {
+			return errors.New("Assertion failed: Message out of sequence added to a batch")
+		}
+		// We are being conservative here to avoid having to prep encode the record
+		size += maximumRecordOverhead
+		rec := &Record{
+			Key:            key,
+			Value:          val,
+			TimestampDelta: timestamp.Sub(set.recordsToSend.RecordBatch.FirstTimestamp),
+		}
+		size += len(key) + len(val)
+		if len(msg.Headers) > 0 {
+			rec.Headers = make([]*RecordHeader, len(msg.Headers))
+			for i := range msg.Headers {
+				rec.Headers[i] = &msg.Headers[i]
+				size += len(rec.Headers[i].Key) + len(rec.Headers[i].Value) + 2*binary.MaxVarintLen32
+			}
+		}
+		set.recordsToSend.RecordBatch.addRecord(rec)
+	} else {
+		msgToSend := &Message{Codec: CompressionNone, Key: key, Value: val}
+		if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
+			msgToSend.Timestamp = timestamp
+			msgToSend.Version = 1
+		}
+		set.recordsToSend.MsgSet.addMessage(msgToSend)
+		size = producerMessageOverhead + len(key) + len(val)
+	}
+
+	set.bufferBytes += size
+	ps.bufferBytes += size
+	ps.bufferCount++
+
+	return nil
+}
+
+func (ps *produceSet) buildRequest() *ProduceRequest {
+	req := &ProduceRequest{
+		RequiredAcks: ps.parent.conf.Producer.RequiredAcks,
+		Timeout:      int32(ps.parent.conf.Producer.Timeout / time.Millisecond),
+	}
+	if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
+		req.Version = 2
+	}
+	if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
+		req.Version = 3
+	}
+
+	for topic, partitionSets := range ps.msgs {
+		for partition, set := range partitionSets {
+			if req.Version >= 3 {
+				// If the API version we're hitting is 3 or greater, we need to calculate
+				// offsets for each record in the batch relative to FirstOffset.
+				// Additionally, we must set LastOffsetDelta to the value of the last offset
+				// in the batch. Since the OffsetDelta of the first record is 0, we know that the
+				// final record of any batch will have an offset of (# of records in batch) - 1.
+				// (See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets
+				//  under the RecordBatch section for details.)
+				rb := set.recordsToSend.RecordBatch
+				if len(rb.Records) > 0 {
+					rb.LastOffsetDelta = int32(len(rb.Records) - 1)
+					for i, record := range rb.Records {
+						record.OffsetDelta = int64(i)
+					}
+				}
+				req.AddBatch(topic, partition, rb)
+				continue
+			}
+			if ps.parent.conf.Producer.Compression == CompressionNone {
+				req.AddSet(topic, partition, set.recordsToSend.MsgSet)
+			} else {
+				// When compression is enabled, the entire set for each partition is compressed
+				// and sent as the payload of a single fake "message" with the appropriate codec
+				// set and no key. When the server sees a message with a compression codec, it
+				// decompresses the payload and treats the result as its message set.
+
+				if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
+					// If our version is 0.10 or later, assign relative offsets
+					// to the inner messages. This lets the broker avoid
+					// recompressing the message set.
+					// (See https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Move+to+relative+offsets+in+compressed+message+sets
+					// for details on relative offsets.)
+					for i, msg := range set.recordsToSend.MsgSet.Messages {
+						msg.Offset = int64(i)
+					}
+				}
+				payload, err := encode(set.recordsToSend.MsgSet, ps.parent.conf.MetricRegistry)
+				if err != nil {
+					Logger.Println(err) // if this happens, it's basically our fault.
+					panic(err)
+				}
+				compMsg := &Message{
+					Codec:            ps.parent.conf.Producer.Compression,
+					CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
+					Key:              nil,
+					Value:            payload,
+					Set:              set.recordsToSend.MsgSet, // Provide the underlying message set for accurate metrics
+				}
+				if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
+					compMsg.Version = 1
+					compMsg.Timestamp = set.recordsToSend.MsgSet.Messages[0].Msg.Timestamp
+				}
+				req.AddMessage(topic, partition, compMsg)
+			}
+		}
+	}
+
+	return req
+}
+
+func (ps *produceSet) eachPartition(cb func(topic string, partition int32, pSet *partitionSet)) {
+	for topic, partitionSet := range ps.msgs {
+		for partition, set := range partitionSet {
+			cb(topic, partition, set)
+		}
+	}
+}
+
+func (ps *produceSet) dropPartition(topic string, partition int32) []*ProducerMessage {
+	if ps.msgs[topic] == nil {
+		return nil
+	}
+	set := ps.msgs[topic][partition]
+	if set == nil {
+		return nil
+	}
+	ps.bufferBytes -= set.bufferBytes
+	ps.bufferCount -= len(set.msgs)
+	delete(ps.msgs[topic], partition)
+	return set.msgs
+}
+
+func (ps *produceSet) wouldOverflow(msg *ProducerMessage) bool {
+	version := 1
+	if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
+		version = 2
+	}
+
+	switch {
+	// Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety.
+	case ps.bufferBytes+msg.byteSize(version) >= int(MaxRequestSize-(10*1024)):
+		return true
+	// Would we overflow the size-limit of a message-batch for this partition?
+	case ps.msgs[msg.Topic] != nil && ps.msgs[msg.Topic][msg.Partition] != nil &&
+		ps.msgs[msg.Topic][msg.Partition].bufferBytes+msg.byteSize(version) >= ps.parent.conf.Producer.MaxMessageBytes:
+		return true
+	// Would we overflow simply in number of messages?
+	case ps.parent.conf.Producer.Flush.MaxMessages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.MaxMessages:
+		return true
+	default:
+		return false
+	}
+}
+
+func (ps *produceSet) readyToFlush() bool {
+	switch {
+	// If we don't have any messages, nothing else matters
+	case ps.empty():
+		return false
+	// If all three config values are 0, we always flush as-fast-as-possible
+	case ps.parent.conf.Producer.Flush.Frequency == 0 && ps.parent.conf.Producer.Flush.Bytes == 0 && ps.parent.conf.Producer.Flush.Messages == 0:
+		return true
+	// If we've passed the message trigger-point
+	case ps.parent.conf.Producer.Flush.Messages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.Messages:
+		return true
+	// If we've passed the byte trigger-point
+	case ps.parent.conf.Producer.Flush.Bytes > 0 && ps.bufferBytes >= ps.parent.conf.Producer.Flush.Bytes:
+		return true
+	default:
+		return false
+	}
+}
+
+func (ps *produceSet) empty() bool {
+	return ps.bufferCount == 0
+}