[VOL-1386]  This commit adds "dep" as the package management tool
for voltha-go.

Change-Id: I52bc4911dd00a441756ec7c30f46d45091f3f90e
diff --git a/vendor/github.com/confluentinc/confluent-kafka-go/kafka/producer.go b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/producer.go
new file mode 100644
index 0000000..7eac912
--- /dev/null
+++ b/vendor/github.com/confluentinc/confluent-kafka-go/kafka/producer.go
@@ -0,0 +1,583 @@
+/**
+ * Copyright 2016 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka
+
+import (
+	"fmt"
+	"math"
+	"time"
+	"unsafe"
+)
+
+/*
+#include <stdlib.h>
+#include <librdkafka/rdkafka.h>
+#include "glue_rdkafka.h"
+
+
+#ifdef RD_KAFKA_V_HEADERS
+// Convert tmphdrs to chdrs (created by this function).
+// If tmphdr.size == -1: value is considered Null
+//    tmphdr.size == 0:  value is considered empty (ignored)
+//    tmphdr.size > 0:   value is considered non-empty
+//
+// WARNING: The header keys and values will be freed by this function.
+void tmphdrs_to_chdrs (tmphdr_t *tmphdrs, size_t tmphdrsCnt,
+                       rd_kafka_headers_t **chdrs) {
+   size_t i;
+
+   *chdrs = rd_kafka_headers_new(tmphdrsCnt);
+
+   for (i = 0 ; i < tmphdrsCnt ; i++) {
+      rd_kafka_header_add(*chdrs,
+                          tmphdrs[i].key, -1,
+                          tmphdrs[i].size == -1 ? NULL :
+                          (tmphdrs[i].size == 0 ? "" : tmphdrs[i].val),
+                          tmphdrs[i].size == -1 ? 0 : tmphdrs[i].size);
+      if (tmphdrs[i].size > 0)
+         free((void *)tmphdrs[i].val);
+      free((void *)tmphdrs[i].key);
+   }
+}
+
+#else
+void free_tmphdrs (tmphdr_t *tmphdrs, size_t tmphdrsCnt) {
+   size_t i;
+   for (i = 0 ; i < tmphdrsCnt ; i++) {
+      if (tmphdrs[i].size > 0)
+         free((void *)tmphdrs[i].val);
+      free((void *)tmphdrs[i].key);
+   }
+}
+#endif
+
+
+rd_kafka_resp_err_t do_produce (rd_kafka_t *rk,
+          rd_kafka_topic_t *rkt, int32_t partition,
+          int msgflags,
+          int valIsNull, void *val, size_t val_len,
+          int keyIsNull, void *key, size_t key_len,
+          int64_t timestamp,
+          tmphdr_t *tmphdrs, size_t tmphdrsCnt,
+          uintptr_t cgoid) {
+  void *valp = valIsNull ? NULL : val;
+  void *keyp = keyIsNull ? NULL : key;
+#ifdef RD_KAFKA_V_TIMESTAMP
+  rd_kafka_resp_err_t err;
+#ifdef RD_KAFKA_V_HEADERS
+  rd_kafka_headers_t *hdrs = NULL;
+#endif
+#endif
+
+
+  if (tmphdrsCnt > 0) {
+#ifdef RD_KAFKA_V_HEADERS
+     tmphdrs_to_chdrs(tmphdrs, tmphdrsCnt, &hdrs);
+#else
+     free_tmphdrs(tmphdrs, tmphdrsCnt);
+     return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;
+#endif
+  }
+
+
+#ifdef RD_KAFKA_V_TIMESTAMP
+  err = rd_kafka_producev(rk,
+        RD_KAFKA_V_RKT(rkt),
+        RD_KAFKA_V_PARTITION(partition),
+        RD_KAFKA_V_MSGFLAGS(msgflags),
+        RD_KAFKA_V_VALUE(valp, val_len),
+        RD_KAFKA_V_KEY(keyp, key_len),
+        RD_KAFKA_V_TIMESTAMP(timestamp),
+#ifdef RD_KAFKA_V_HEADERS
+        RD_KAFKA_V_HEADERS(hdrs),
+#endif
+        RD_KAFKA_V_OPAQUE((void *)cgoid),
+        RD_KAFKA_V_END);
+#ifdef RD_KAFKA_V_HEADERS
+  if (err && hdrs)
+    rd_kafka_headers_destroy(hdrs);
+#endif
+  return err;
+#else
+  if (timestamp)
+      return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED;
+  if (rd_kafka_produce(rkt, partition, msgflags,
+                       valp, val_len,
+                       keyp, key_len,
+                       (void *)cgoid) == -1)
+      return rd_kafka_last_error();
+  else
+      return RD_KAFKA_RESP_ERR_NO_ERROR;
+#endif
+}
+*/
+import "C"
+
+// Producer implements a high-level Apache Kafka Producer instance
+type Producer struct {
+	events         chan Event
+	produceChannel chan *Message
+	handle         handle
+
+	// Terminates the poller() goroutine
+	pollerTermChan chan bool
+}
+
+// String returns a human-readable name for a Producer instance
+func (p *Producer) String() string {
+	return p.handle.String()
+}
+
+// gethandle implements the Handle interface
+func (p *Producer) gethandle() *handle {
+	return &p.handle
+}
+
+func (p *Producer) produce(msg *Message, msgFlags int, deliveryChan chan Event) error {
+	if msg == nil || msg.TopicPartition.Topic == nil || len(*msg.TopicPartition.Topic) == 0 {
+		return newErrorFromString(ErrInvalidArg, "")
+	}
+
+	crkt := p.handle.getRkt(*msg.TopicPartition.Topic)
+
+	// Three problems:
+	//  1) There's a difference between an empty Value or Key (length 0, proper pointer) and
+	//     a null Value or Key (length 0, null pointer).
+	//  2) we need to be able to send a null Value or Key, but the unsafe.Pointer(&slice[0])
+	//     dereference can't be performed on a nil slice.
+	//  3) cgo's pointer checking requires the unsafe.Pointer(slice..) call to be made
+	//     in the call to the C function.
+	//
+	// Solution:
+	//  Keep track of whether the Value or Key were nil (1), but let the valp and keyp pointers
+	//  point to a 1-byte slice (but the length to send is still 0) so that the dereference (2)
+	//  works.
+	//  Then perform the unsafe.Pointer() on the valp and keyp pointers (which now either point
+	//  to the original msg.Value and msg.Key or to the 1-byte slices) in the call to C (3).
+	//
+	var valp []byte
+	var keyp []byte
+	oneByte := []byte{0}
+	var valIsNull C.int
+	var keyIsNull C.int
+	var valLen int
+	var keyLen int
+
+	if msg.Value == nil {
+		valIsNull = 1
+		valLen = 0
+		valp = oneByte
+	} else {
+		valLen = len(msg.Value)
+		if valLen > 0 {
+			valp = msg.Value
+		} else {
+			valp = oneByte
+		}
+	}
+
+	if msg.Key == nil {
+		keyIsNull = 1
+		keyLen = 0
+		keyp = oneByte
+	} else {
+		keyLen = len(msg.Key)
+		if keyLen > 0 {
+			keyp = msg.Key
+		} else {
+			keyp = oneByte
+		}
+	}
+
+	var cgoid int
+
+	// Per-message state that needs to be retained through the C code:
+	//   delivery channel (if specified)
+	//   message opaque   (if specified)
+	// Since these can't be passed as opaque pointers to the C code,
+	// due to cgo constraints, we add them to a per-producer map for lookup
+	// when the C code triggers the callbacks or events.
+	if deliveryChan != nil || msg.Opaque != nil {
+		cgoid = p.handle.cgoPut(cgoDr{deliveryChan: deliveryChan, opaque: msg.Opaque})
+	}
+
+	var timestamp int64
+	if !msg.Timestamp.IsZero() {
+		timestamp = msg.Timestamp.UnixNano() / 1000000
+	}
+
+	// Convert headers to C-friendly tmphdrs
+	var tmphdrs []C.tmphdr_t
+	tmphdrsCnt := len(msg.Headers)
+
+	if tmphdrsCnt > 0 {
+		tmphdrs = make([]C.tmphdr_t, tmphdrsCnt)
+
+		for n, hdr := range msg.Headers {
+			// Make a copy of the key
+			// to avoid runtime panic with
+			// foreign Go pointers in cgo.
+			tmphdrs[n].key = C.CString(hdr.Key)
+			if hdr.Value != nil {
+				tmphdrs[n].size = C.ssize_t(len(hdr.Value))
+				if tmphdrs[n].size > 0 {
+					// Make a copy of the value
+					// to avoid runtime panic with
+					// foreign Go pointers in cgo.
+					tmphdrs[n].val = C.CBytes(hdr.Value)
+				}
+			} else {
+				// null value
+				tmphdrs[n].size = C.ssize_t(-1)
+			}
+		}
+	} else {
+		// no headers, need a dummy tmphdrs of size 1 to avoid index
+		// out of bounds panic in do_produce() call below.
+		// tmphdrsCnt will be 0.
+		tmphdrs = []C.tmphdr_t{{nil, nil, 0}}
+	}
+
+	cErr := C.do_produce(p.handle.rk, crkt,
+		C.int32_t(msg.TopicPartition.Partition),
+		C.int(msgFlags)|C.RD_KAFKA_MSG_F_COPY,
+		valIsNull, unsafe.Pointer(&valp[0]), C.size_t(valLen),
+		keyIsNull, unsafe.Pointer(&keyp[0]), C.size_t(keyLen),
+		C.int64_t(timestamp),
+		(*C.tmphdr_t)(unsafe.Pointer(&tmphdrs[0])), C.size_t(tmphdrsCnt),
+		(C.uintptr_t)(cgoid))
+	if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
+		if cgoid != 0 {
+			p.handle.cgoGet(cgoid)
+		}
+		return newError(cErr)
+	}
+
+	return nil
+}
+
+// Produce produces a single message.
+// This is an asynchronous call that enqueues the message on the internal
+// transmit queue, thus returning immediately.
+// The delivery report will be sent on the provided deliveryChan if specified,
+// or on the Producer object's Events() channel if not.
+// msg.Timestamp requires librdkafka >= 0.9.4 (else returns ErrNotImplemented),
+// api.version.request=true, and broker >= 0.10.0.0.
+// msg.Headers requires librdkafka >= 0.11.4 (else returns ErrNotImplemented),
+// api.version.request=true, and broker >= 0.11.0.0.
+// Returns an error if the message could not be enqueued.
+func (p *Producer) Produce(msg *Message, deliveryChan chan Event) error {
+	return p.produce(msg, 0, deliveryChan)
+}
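+
+// A minimal usage sketch for Produce, as seen from client code (producer
+// setup elided; the topic name is illustrative):
+//
+//   deliveryChan := make(chan kafka.Event)
+//   topic := "myTopic"
+//   err := p.Produce(&kafka.Message{
+//           TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
+//           Value:          []byte("hello kafka"),
+//   }, deliveryChan)
+//   if err == nil {
+//           // Wait for the delivery report for this one message.
+//           m := (<-deliveryChan).(*kafka.Message)
+//           if m.TopicPartition.Error != nil {
+//                   // delivery failed
+//           }
+//   }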
+
+// produceBatch produces a batch of messages.
+// These batches do not relate to the message batches sent to the broker; the
+// latter are collected on the fly internally in librdkafka.
+// WARNING: This is an experimental API.
+// NOTE: timestamps and headers are not supported with this API.
+func (p *Producer) produceBatch(topic string, msgs []*Message, msgFlags int) error {
+	crkt := p.handle.getRkt(topic)
+
+	cmsgs := make([]C.rd_kafka_message_t, len(msgs))
+	for i, m := range msgs {
+		p.handle.messageToC(m, &cmsgs[i])
+	}
+	r := C.rd_kafka_produce_batch(crkt, C.RD_KAFKA_PARTITION_UA, C.int(msgFlags)|C.RD_KAFKA_MSG_F_FREE,
+		(*C.rd_kafka_message_t)(&cmsgs[0]), C.int(len(msgs)))
+	if r == -1 {
+		return newError(C.rd_kafka_last_error())
+	}
+
+	return nil
+}
+
+// Events returns the Events channel (read)
+func (p *Producer) Events() chan Event {
+	return p.events
+}
+
+// ProduceChannel returns the produce *Message channel (write)
+func (p *Producer) ProduceChannel() chan *Message {
+	return p.produceChannel
+}
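+
+// A minimal sketch of channel-based producing (topic name illustrative); the
+// Events() channel should be drained for delivery reports, e.g. in a goroutine:
+//
+//   go func() {
+//           for e := range p.Events() {
+//                   if m, ok := e.(*kafka.Message); ok && m.TopicPartition.Error != nil {
+//                           // delivery failed
+//                   }
+//           }
+//   }()
+//   topic := "myTopic"
+//   p.ProduceChannel() <- &kafka.Message{
+//           TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
+//           Value:          []byte("via channel"),
+//   }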
+
+// Len returns the number of messages and requests waiting to be transmitted to the broker
+// as well as delivery reports queued for the application.
+// Includes messages on ProduceChannel.
+func (p *Producer) Len() int {
+	return len(p.produceChannel) + len(p.events) + int(C.rd_kafka_outq_len(p.handle.rk))
+}
+
+// Flush waits for outstanding messages and requests to complete delivery,
+// including messages on ProduceChannel.
+// Runs until the outstanding count reaches zero or timeoutMs elapses,
+// whichever comes first.
+// Returns the number of outstanding events still unflushed.
+func (p *Producer) Flush(timeoutMs int) int {
+	termChan := make(chan bool) // unused stand-in termChan
+
+	d := time.Duration(timeoutMs) * time.Millisecond
+	tEnd := time.Now().Add(d)
+	for p.Len() > 0 {
+		remain := tEnd.Sub(time.Now()).Seconds()
+		if remain <= 0.0 {
+			return p.Len()
+		}
+
+		p.handle.eventPoll(p.events,
+			int(math.Min(100, remain*1000)), 1000, termChan)
+	}
+
+	return 0
+}
+
+// Close closes a Producer instance.
+// The Producer object and its channels are no longer usable after this call.
+func (p *Producer) Close() {
+	// Wait for poller() (signaled by closing pollerTermChan)
+	// and channel_producer() (signaled by closing ProduceChannel)
+	close(p.pollerTermChan)
+	close(p.produceChannel)
+	p.handle.waitTerminated(2)
+
+	close(p.events)
+
+	p.handle.cleanup()
+
+	C.rd_kafka_destroy(p.handle.rk)
+}
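+
+// A minimal shutdown sketch (timeout value illustrative): flush what can be
+// delivered within the deadline, then close the producer:
+//
+//   if remaining := p.Flush(15 * 1000); remaining > 0 {
+//           // some messages/events were not handled within 15s
+//   }
+//   p.Close()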
+
+// NewProducer creates a new high-level Producer instance.
+//
+// conf is a *ConfigMap with standard librdkafka configuration properties
+// (see the librdkafka CONFIGURATION.md documentation for the full list).
+//
+// Supported special configuration properties:
+//   go.batch.producer (bool, false) - EXPERIMENTAL: Enable batch producer (for increased performance).
+//                                     These batches do not relate to Kafka message batches in any way.
+//                                     Note: timestamps and headers are not supported with this interface.
+//   go.delivery.reports (bool, true) - Forward per-message delivery reports to the
+//                                      Events() channel.
+//   go.events.channel.size (int, 1000000) - Events() channel size
+//   go.produce.channel.size (int, 1000000) - ProduceChannel() buffer size (in number of messages)
+//
+func NewProducer(conf *ConfigMap) (*Producer, error) {
+
+	err := versionCheck()
+	if err != nil {
+		return nil, err
+	}
+
+	p := &Producer{}
+
+	// before we do anything with the configuration, create a copy such that
+	// the original is not mutated.
+	confCopy := conf.clone()
+
+	v, err := confCopy.extract("go.batch.producer", false)
+	if err != nil {
+		return nil, err
+	}
+	batchProducer := v.(bool)
+
+	v, err = confCopy.extract("go.delivery.reports", true)
+	if err != nil {
+		return nil, err
+	}
+	p.handle.fwdDr = v.(bool)
+
+	v, err = confCopy.extract("go.events.channel.size", 1000000)
+	if err != nil {
+		return nil, err
+	}
+	eventsChanSize := v.(int)
+
+	v, err = confCopy.extract("go.produce.channel.size", 1000000)
+	if err != nil {
+		return nil, err
+	}
+	produceChannelSize := v.(int)
+
+	if int(C.rd_kafka_version()) < 0x01000000 {
+		// produce.offset.report is no longer used in librdkafka >= v1.0.0
+		v, _ = confCopy.extract("{topic}.produce.offset.report", nil)
+		if v == nil {
+			// Enable offset reporting by default, unless overridden.
+			confCopy.SetKey("{topic}.produce.offset.report", true)
+		}
+	}
+
+	// Convert ConfigMap to librdkafka conf_t
+	cConf, err := confCopy.convert()
+	if err != nil {
+		return nil, err
+	}
+
+	cErrstr := (*C.char)(C.malloc(C.size_t(256)))
+	defer C.free(unsafe.Pointer(cErrstr))
+
+	C.rd_kafka_conf_set_events(cConf, C.RD_KAFKA_EVENT_DR|C.RD_KAFKA_EVENT_STATS|C.RD_KAFKA_EVENT_ERROR)
+
+	// Create librdkafka producer instance
+	p.handle.rk = C.rd_kafka_new(C.RD_KAFKA_PRODUCER, cConf, cErrstr, 256)
+	if p.handle.rk == nil {
+		return nil, newErrorFromCString(C.RD_KAFKA_RESP_ERR__INVALID_ARG, cErrstr)
+	}
+
+	p.handle.p = p
+	p.handle.setup()
+	p.handle.rkq = C.rd_kafka_queue_get_main(p.handle.rk)
+	p.events = make(chan Event, eventsChanSize)
+	p.produceChannel = make(chan *Message, produceChannelSize)
+	p.pollerTermChan = make(chan bool)
+
+	go poller(p, p.pollerTermChan)
+
+	// Start either the batch or the non-batch channel producer; only one may run.
+	if batchProducer {
+		go channelBatchProducer(p)
+	} else {
+		go channelProducer(p)
+	}
+
+	return p, nil
+}
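+
+// A minimal construction sketch (broker address illustrative; the go.*
+// properties are the special ones documented above):
+//
+//   p, err := kafka.NewProducer(&kafka.ConfigMap{
+//           "bootstrap.servers":       "localhost:9092",
+//           "go.delivery.reports":     true,
+//           "go.produce.channel.size": 10000,
+//   })
+//   if err != nil {
+//           // handle configuration/creation error
+//   }
+//   defer p.Close()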
+
+// channelProducer serves the ProduceChannel channel
+func channelProducer(p *Producer) {
+
+	for m := range p.produceChannel {
+		err := p.produce(m, C.RD_KAFKA_MSG_F_BLOCK, nil)
+		if err != nil {
+			m.TopicPartition.Error = err
+			p.events <- m
+		}
+	}
+
+	p.handle.terminatedChan <- "channelProducer"
+}
+
+// channelBatchProducer serves the ProduceChannel channel and attempts to
+// improve cgo performance by using the produceBatch() interface.
+func channelBatchProducer(p *Producer) {
+	var buffered = make(map[string][]*Message)
+	bufferedCnt := 0
+	const batchSize int = 1000000
+	totMsgCnt := 0
+	totBatchCnt := 0
+
+	for m := range p.produceChannel {
+		buffered[*m.TopicPartition.Topic] = append(buffered[*m.TopicPartition.Topic], m)
+		bufferedCnt++
+
+	loop2:
+		for {
+			select {
+			case m, ok := <-p.produceChannel:
+				if !ok {
+					break loop2
+				}
+				if m == nil {
+					panic("nil message received on ProduceChannel")
+				}
+				if m.TopicPartition.Topic == nil {
+					panic(fmt.Sprintf("message without Topic received on ProduceChannel: %v", m))
+				}
+				buffered[*m.TopicPartition.Topic] = append(buffered[*m.TopicPartition.Topic], m)
+				bufferedCnt++
+				if bufferedCnt >= batchSize {
+					break loop2
+				}
+			default:
+				break loop2
+			}
+		}
+
+		totBatchCnt++
+		totMsgCnt += bufferedCnt
+
+		for topic, buffered2 := range buffered {
+			err := p.produceBatch(topic, buffered2, C.RD_KAFKA_MSG_F_BLOCK)
+			if err != nil {
+				for _, m := range buffered2 {
+					m.TopicPartition.Error = err
+					p.events <- m
+				}
+			}
+		}
+
+		buffered = make(map[string][]*Message)
+		bufferedCnt = 0
+	}
+	p.handle.terminatedChan <- "channelBatchProducer"
+}
+
+// poller polls the rd_kafka_t handle for events until signalled for termination
+func poller(p *Producer, termChan chan bool) {
+out:
+	for {
+		select {
+		case <-termChan:
+			break out
+
+		default:
+			_, term := p.handle.eventPoll(p.events, 100, 1000, termChan)
+			if term {
+				break out
+			}
+		}
+	}
+
+	p.handle.terminatedChan <- "poller"
+
+}
+
+// GetMetadata queries the broker for cluster and topic metadata.
+// If topic is non-nil, only information about that topic is returned; else if
+// allTopics is false, only information about locally used topics is returned;
+// else information about all topics is returned.
+// GetMetadata is equivalent to listTopics, describeTopics and describeCluster
+// in the Java API.
+func (p *Producer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error) {
+	return getMetadata(p, topic, allTopics, timeoutMs)
+}
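+
+// A minimal sketch (timeout illustrative): fetch metadata for all topics:
+//
+//   md, err := p.GetMetadata(nil, true, 5000)
+//   if err == nil {
+//           for _, t := range md.Topics {
+//                   fmt.Println(t.Topic, len(t.Partitions))
+//           }
+//   }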
+
+// QueryWatermarkOffsets returns the broker's low and high offsets for the given topic
+// and partition.
+func (p *Producer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error) {
+	return queryWatermarkOffsets(p, topic, partition, timeoutMs)
+}
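+
+// A minimal sketch (topic, partition and timeout illustrative):
+//
+//   low, high, err := p.QueryWatermarkOffsets("myTopic", 0, 5000)
+//   if err == nil {
+//           fmt.Printf("partition 0 holds offsets %d..%d\n", low, high)
+//   }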
+
+// OffsetsForTimes looks up offsets by timestamp for the given partitions.
+//
+// The returned offset for each partition is the earliest offset whose
+// timestamp is greater than or equal to the given timestamp in the
+// corresponding partition.
+//
+// The timestamps to query are represented as `.Offset` in the `times`
+// argument and the looked up offsets are represented as `.Offset` in the returned
+// `offsets` list.
+//
+// The function will block for at most timeoutMs milliseconds.
+//
+// Duplicate Topic+Partitions are not supported.
+// Per-partition errors may be returned in the `.Error` field.
+func (p *Producer) OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error) {
+	return offsetsForTimes(p, times, timeoutMs)
+}
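+
+// A minimal sketch (topic illustrative): find the first offsets at or after a
+// point one hour ago; note the query timestamp is carried in the Offset field,
+// in milliseconds since the Unix epoch:
+//
+//   topic := "myTopic"
+//   tsMs := time.Now().Add(-time.Hour).UnixNano() / 1000000
+//   times := []kafka.TopicPartition{{Topic: &topic, Partition: 0, Offset: kafka.Offset(tsMs)}}
+//   offsets, err := p.OffsetsForTimes(times, 5000)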