[VOL-2193] Create mocks for Kafka Client and Etcd

This commit consists of:
1) A kafka client mock that implements the kafka client interface
under voltha-lib-go/pkg/kafka/client.go
2) An embedded Etcd server that runs in-process and represents an
Etcd server.

Change-Id: I52a36132568e08c596bb4136918bebcb654a3b99
diff --git a/vendor/go.etcd.io/etcd/mvcc/backend/batch_tx.go b/vendor/go.etcd.io/etcd/mvcc/backend/batch_tx.go
new file mode 100644
index 0000000..d5c8a88
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/backend/batch_tx.go
@@ -0,0 +1,339 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package backend
+
+import (
+	"bytes"
+	"math"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	bolt "go.etcd.io/bbolt"
+	"go.uber.org/zap"
+)
+
+type BatchTx interface {
+	ReadTx
+	UnsafeCreateBucket(name []byte)
+	UnsafePut(bucketName []byte, key []byte, value []byte)
+	UnsafeSeqPut(bucketName []byte, key []byte, value []byte)
+	UnsafeDelete(bucketName []byte, key []byte)
+	// Commit commits a previous tx and begins a new writable one.
+	Commit()
+	// CommitAndStop commits the previous tx and does not create a new one.
+	CommitAndStop()
+}
+
+type batchTx struct {
+	sync.Mutex
+	tx      *bolt.Tx
+	backend *backend
+
+	pending int
+}
+
+func (t *batchTx) Lock() {
+	t.Mutex.Lock()
+}
+
+func (t *batchTx) Unlock() {
+	if t.pending >= t.backend.batchLimit {
+		t.commit(false)
+	}
+	t.Mutex.Unlock()
+}
+
+// BatchTx interface embeds ReadTx interface. But RLock() and RUnlock() do not
+// have appropriate semantics in BatchTx interface. Therefore should not be called.
+// TODO: might want to decouple ReadTx and BatchTx
+
+func (t *batchTx) RLock() {
+	panic("unexpected RLock")
+}
+
+func (t *batchTx) RUnlock() {
+	panic("unexpected RUnlock")
+}
+
+func (t *batchTx) UnsafeCreateBucket(name []byte) {
+	_, err := t.tx.CreateBucket(name)
+	if err != nil && err != bolt.ErrBucketExists {
+		if t.backend.lg != nil {
+			t.backend.lg.Fatal(
+				"failed to create a bucket",
+				zap.String("bucket-name", string(name)),
+				zap.Error(err),
+			)
+		} else {
+			plog.Fatalf("cannot create bucket %s (%v)", name, err)
+		}
+	}
+	t.pending++
+}
+
+// UnsafePut must be called holding the lock on the tx.
+func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
+	t.unsafePut(bucketName, key, value, false)
+}
+
+// UnsafeSeqPut must be called holding the lock on the tx.
+func (t *batchTx) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) {
+	t.unsafePut(bucketName, key, value, true)
+}
+
+func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq bool) {
+	bucket := t.tx.Bucket(bucketName)
+	if bucket == nil {
+		if t.backend.lg != nil {
+			t.backend.lg.Fatal(
+				"failed to find a bucket",
+				zap.String("bucket-name", string(bucketName)),
+			)
+		} else {
+			plog.Fatalf("bucket %s does not exist", bucketName)
+		}
+	}
+	if seq {
+		// it is useful to increase fill percent when the workloads are mostly append-only.
+		// this can delay the page split and reduce space usage.
+		bucket.FillPercent = 0.9
+	}
+	if err := bucket.Put(key, value); err != nil {
+		if t.backend.lg != nil {
+			t.backend.lg.Fatal(
+				"failed to write to a bucket",
+				zap.String("bucket-name", string(bucketName)),
+				zap.Error(err),
+			)
+		} else {
+			plog.Fatalf("cannot put key into bucket (%v)", err)
+		}
+	}
+	t.pending++
+}
+
+// UnsafeRange must be called holding the lock on the tx.
+func (t *batchTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
+	bucket := t.tx.Bucket(bucketName)
+	if bucket == nil {
+		if t.backend.lg != nil {
+			t.backend.lg.Fatal(
+				"failed to find a bucket",
+				zap.String("bucket-name", string(bucketName)),
+			)
+		} else {
+			plog.Fatalf("bucket %s does not exist", bucketName)
+		}
+	}
+	return unsafeRange(bucket.Cursor(), key, endKey, limit)
+}
+
+func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
+	if limit <= 0 {
+		limit = math.MaxInt64
+	}
+	var isMatch func(b []byte) bool
+	if len(endKey) > 0 {
+		isMatch = func(b []byte) bool { return bytes.Compare(b, endKey) < 0 }
+	} else {
+		isMatch = func(b []byte) bool { return bytes.Equal(b, key) }
+		limit = 1
+	}
+
+	for ck, cv := c.Seek(key); ck != nil && isMatch(ck); ck, cv = c.Next() {
+		vs = append(vs, cv)
+		keys = append(keys, ck)
+		if limit == int64(len(keys)) {
+			break
+		}
+	}
+	return keys, vs
+}
+
+// UnsafeDelete must be called holding the lock on the tx.
+func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
+	bucket := t.tx.Bucket(bucketName)
+	if bucket == nil {
+		if t.backend.lg != nil {
+			t.backend.lg.Fatal(
+				"failed to find a bucket",
+				zap.String("bucket-name", string(bucketName)),
+			)
+		} else {
+			plog.Fatalf("bucket %s does not exist", bucketName)
+		}
+	}
+	err := bucket.Delete(key)
+	if err != nil {
+		if t.backend.lg != nil {
+			t.backend.lg.Fatal(
+				"failed to delete a key",
+				zap.String("bucket-name", string(bucketName)),
+				zap.Error(err),
+			)
+		} else {
+			plog.Fatalf("cannot delete key from bucket (%v)", err)
+		}
+	}
+	t.pending++
+}
+
+// UnsafeForEach must be called holding the lock on the tx.
+func (t *batchTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
+	return unsafeForEach(t.tx, bucketName, visitor)
+}
+
+func unsafeForEach(tx *bolt.Tx, bucket []byte, visitor func(k, v []byte) error) error {
+	if b := tx.Bucket(bucket); b != nil {
+		return b.ForEach(visitor)
+	}
+	return nil
+}
+
+// Commit commits a previous tx and begins a new writable one.
+func (t *batchTx) Commit() {
+	t.Lock()
+	t.commit(false)
+	t.Unlock()
+}
+
+// CommitAndStop commits the previous tx and does not create a new one.
+func (t *batchTx) CommitAndStop() {
+	t.Lock()
+	t.commit(true)
+	t.Unlock()
+}
+
+func (t *batchTx) safePending() int {
+	t.Mutex.Lock()
+	defer t.Mutex.Unlock()
+	return t.pending
+}
+
+func (t *batchTx) commit(stop bool) {
+	// commit the last tx
+	if t.tx != nil {
+		if t.pending == 0 && !stop {
+			return
+		}
+
+		start := time.Now()
+
+		// gofail: var beforeCommit struct{}
+		err := t.tx.Commit()
+		// gofail: var afterCommit struct{}
+
+		rebalanceSec.Observe(t.tx.Stats().RebalanceTime.Seconds())
+		spillSec.Observe(t.tx.Stats().SpillTime.Seconds())
+		writeSec.Observe(t.tx.Stats().WriteTime.Seconds())
+		commitSec.Observe(time.Since(start).Seconds())
+		atomic.AddInt64(&t.backend.commits, 1)
+
+		t.pending = 0
+		if err != nil {
+			if t.backend.lg != nil {
+				t.backend.lg.Fatal("failed to commit tx", zap.Error(err))
+			} else {
+				plog.Fatalf("cannot commit tx (%s)", err)
+			}
+		}
+	}
+	if !stop {
+		t.tx = t.backend.begin(true)
+	}
+}
+
+type batchTxBuffered struct {
+	batchTx
+	buf txWriteBuffer
+}
+
+func newBatchTxBuffered(backend *backend) *batchTxBuffered {
+	tx := &batchTxBuffered{
+		batchTx: batchTx{backend: backend},
+		buf: txWriteBuffer{
+			txBuffer: txBuffer{make(map[string]*bucketBuffer)},
+			seq:      true,
+		},
+	}
+	tx.Commit()
+	return tx
+}
+
+func (t *batchTxBuffered) Unlock() {
+	if t.pending != 0 {
+		t.backend.readTx.Lock() // blocks txReadBuffer for writing.
+		t.buf.writeback(&t.backend.readTx.buf)
+		t.backend.readTx.Unlock()
+		if t.pending >= t.backend.batchLimit {
+			t.commit(false)
+		}
+	}
+	t.batchTx.Unlock()
+}
+
+func (t *batchTxBuffered) Commit() {
+	t.Lock()
+	t.commit(false)
+	t.Unlock()
+}
+
+func (t *batchTxBuffered) CommitAndStop() {
+	t.Lock()
+	t.commit(true)
+	t.Unlock()
+}
+
+func (t *batchTxBuffered) commit(stop bool) {
+	// all read txs must be closed to acquire boltdb commit rwlock
+	t.backend.readTx.Lock()
+	t.unsafeCommit(stop)
+	t.backend.readTx.Unlock()
+}
+
+func (t *batchTxBuffered) unsafeCommit(stop bool) {
+	if t.backend.readTx.tx != nil {
+		// wait all store read transactions using the current boltdb tx to finish,
+		// then close the boltdb tx
+		go func(tx *bolt.Tx, wg *sync.WaitGroup) {
+			wg.Wait()
+			if err := tx.Rollback(); err != nil {
+				if t.backend.lg != nil {
+					t.backend.lg.Fatal("failed to rollback tx", zap.Error(err))
+				} else {
+					plog.Fatalf("cannot rollback tx (%s)", err)
+				}
+			}
+		}(t.backend.readTx.tx, t.backend.readTx.txWg)
+		t.backend.readTx.reset()
+	}
+
+	t.batchTx.commit(stop)
+
+	if !stop {
+		t.backend.readTx.tx = t.backend.begin(false)
+	}
+}
+
+func (t *batchTxBuffered) UnsafePut(bucketName []byte, key []byte, value []byte) {
+	t.batchTx.UnsafePut(bucketName, key, value)
+	t.buf.put(bucketName, key, value)
+}
+
+func (t *batchTxBuffered) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) {
+	t.batchTx.UnsafeSeqPut(bucketName, key, value)
+	t.buf.putSeq(bucketName, key, value)
+}