[VOL-2193] Create mocks for Kafka Client and Etcd
This commit consists of:
1) A kafka client mock that implements the kafka client interface
defined in voltha-lib-go/pkg/kafka/client.go
2) An embedded Etcd server that runs in-process and stands in for a
real Etcd server.
Change-Id: I52a36132568e08c596bb4136918bebcb654a3b99
diff --git a/vendor/go.etcd.io/etcd/mvcc/backend/batch_tx.go b/vendor/go.etcd.io/etcd/mvcc/backend/batch_tx.go
new file mode 100644
index 0000000..d5c8a88
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/backend/batch_tx.go
@@ -0,0 +1,339 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package backend
+
+import (
+ "bytes"
+ "math"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ bolt "go.etcd.io/bbolt"
+ "go.uber.org/zap"
+)
+
+type BatchTx interface { // BatchTx extends ReadTx with bucket creation, puts, deletes and commits.
+	ReadTx
+	UnsafeCreateBucket(name []byte)
+	UnsafePut(bucketName, key, value []byte)
+	UnsafeSeqPut(bucketName, key, value []byte)
+	UnsafeDelete(bucketName, key []byte)
+	// Commit flushes the current tx and then opens a new writable one.
+	Commit()
+	// CommitAndStop flushes the current tx without opening a replacement.
+	CommitAndStop()
+}
+
+type batchTx struct { // batchTx wraps one bolt transaction behind a mutex and counts uncommitted operations.
+ sync.Mutex // embedded; the Lock/Unlock methods of batchTx wrap it
+ tx *bolt.Tx // current bolt tx; commit() replaces it via backend.begin(true) unless stopping
+ backend *backend // owner; read in this file for batchLimit, lg, commits, readTx and begin()
+ // pending counts Unsafe* mutations since the last commit; commit() resets it to 0.
+ pending int
+}
+
+func (t *batchTx) Lock() { // Lock acquires the embedded mutex; pair it with Unlock.
+ t.Mutex.Lock()
+}
+
+func (t *batchTx) Unlock() { // Unlock commits first once pending reaches the batch limit, then releases the mutex.
+	if limit := t.backend.batchLimit; limit <= t.pending {
+		t.commit(false)
+	}
+	t.Mutex.Unlock()
+}
+
+// The BatchTx interface embeds the ReadTx interface, but RLock() and RUnlock() do
+// not have appropriate semantics for a BatchTx and therefore must not be called.
+// TODO: might want to decouple ReadTx and BatchTx
+
+func (t *batchTx) RLock() { // RLock is not supported on a batchTx; it always panics.
+ panic("unexpected RLock")
+}
+
+func (t *batchTx) RUnlock() { // RUnlock is not supported on a batchTx; it always panics.
+ panic("unexpected RUnlock")
+}
+
+// UnsafeCreateBucket creates the named bucket in the current tx. A bucket that
+// already exists is not an error; any other failure is logged as fatal through
+// the zap logger when one is configured, otherwise through plog. The call
+// always counts as one pending operation.
+func (t *batchTx) UnsafeCreateBucket(name []byte) {
+	if _, err := t.tx.CreateBucket(name); err != nil && err != bolt.ErrBucketExists {
+		if lg := t.backend.lg; lg != nil {
+			lg.Fatal("failed to create a bucket", zap.String("bucket-name", string(name)), zap.Error(err))
+		} else {
+			plog.Fatalf("cannot create bucket %s (%v)", name, err)
+		}
+	}
+	// counted even when the bucket already existed
+	t.pending++
+}
+
+// UnsafePut writes key/value into the named bucket with seq=false; it must be called holding the lock on the tx.
+func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
+ t.unsafePut(bucketName, key, value, false)
+}
+
+// UnsafeSeqPut writes key/value with seq=true (unsafePut then raises the bucket fill percent); it must be called holding the lock on the tx.
+func (t *batchTx) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) {
+ t.unsafePut(bucketName, key, value, true)
+}
+
+// unsafePut stores value under key in the named bucket of the current tx and
+// counts it as one pending operation. seq marks a mostly append-only workload.
+// A missing bucket or a failed write is logged as fatal (zap logger if set,
+// plog otherwise).
+func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq bool) {
+	b := t.tx.Bucket(bucketName)
+	if b == nil {
+		if lg := t.backend.lg; lg != nil {
+			lg.Fatal("failed to find a bucket", zap.String("bucket-name", string(bucketName)))
+		} else {
+			plog.Fatalf("bucket %s does not exist", bucketName)
+		}
+	}
+
+	if seq {
+		// it is useful to increase fill percent when the workloads are mostly append-only.
+		// this can delay the page split and reduce space usage.
+		b.FillPercent = 0.9
+	}
+
+	err := b.Put(key, value)
+	if err != nil {
+		if lg := t.backend.lg; lg != nil {
+			lg.Fatal("failed to write to a bucket", zap.String("bucket-name", string(bucketName)), zap.Error(err))
+		} else {
+			plog.Fatalf("cannot put key into bucket (%v)", err)
+		}
+	}
+	t.pending++
+}
+
+// UnsafeRange looks up the named bucket in the current tx and returns the keys
+// and values selected by unsafeRange over its cursor. A missing bucket is
+// logged as fatal. It must be called holding the lock on the tx.
+func (t *batchTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
+	b := t.tx.Bucket(bucketName)
+	if b == nil {
+		if lg := t.backend.lg; lg != nil {
+			lg.Fatal("failed to find a bucket", zap.String("bucket-name", string(bucketName)))
+		} else {
+			plog.Fatalf("bucket %s does not exist", bucketName)
+		}
+	}
+	cur := b.Cursor()
+	return unsafeRange(cur, key, endKey, limit)
+}
+
+func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
+ if limit <= 0 {
+ limit = math.MaxInt64
+ }
+ var isMatch func(b []byte) bool
+ if len(endKey) > 0 {
+ isMatch = func(b []byte) bool { return bytes.Compare(b, endKey) < 0 }
+ } else {
+ isMatch = func(b []byte) bool { return bytes.Equal(b, key) }
+ limit = 1
+ }
+
+ for ck, cv := c.Seek(key); ck != nil && isMatch(ck); ck, cv = c.Next() {
+ vs = append(vs, cv)
+ keys = append(keys, ck)
+ if limit == int64(len(keys)) {
+ break
+ }
+ }
+ return keys, vs
+}
+
+// UnsafeDelete removes key from the named bucket of the current tx and counts
+// it as one pending operation. A missing bucket or a failed delete is logged
+// as fatal (zap logger if set, plog otherwise).
+// UnsafeDelete must be called holding the lock on the tx.
+func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
+	// resolve the target bucket
+	b := t.tx.Bucket(bucketName)
+	if b == nil {
+		if lg := t.backend.lg; lg != nil {
+			lg.Fatal("failed to find a bucket", zap.String("bucket-name", string(bucketName)))
+		} else {
+			plog.Fatalf("bucket %s does not exist", bucketName)
+		}
+	}
+
+	// remove the key
+	if err := b.Delete(key); err != nil {
+		if lg := t.backend.lg; lg != nil {
+			lg.Fatal("failed to delete a key", zap.String("bucket-name", string(bucketName)), zap.Error(err))
+		} else {
+			plog.Fatalf("cannot delete key from bucket (%v)", err)
+		}
+	}
+
+	// account for the mutation so Unlock can decide when to commit
+	t.pending++
+}
+
+// UnsafeForEach runs visitor over the named bucket of the current tx via unsafeForEach; it must be called holding the lock on the tx.
+func (t *batchTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
+ return unsafeForEach(t.tx, bucketName, visitor)
+}
+
+func unsafeForEach(tx *bolt.Tx, bucket []byte, visitor func(k, v []byte) error) error { // unsafeForEach hands visitor to the bucket's ForEach and returns its result.
+ if b := tx.Bucket(bucket); b != nil {
+ return b.ForEach(visitor)
+ }
+ return nil // a missing bucket is not reported as an error
+}
+
+// Commit commits a previous tx and begins a new writable one; it takes the tx lock itself.
+func (t *batchTx) Commit() {
+ t.Lock()
+ t.commit(false)
+ t.Unlock() // Unlock re-checks the batch limit before releasing the mutex
+}
+
+// CommitAndStop commits the previous tx and does not create a new one; it takes the tx lock itself.
+func (t *batchTx) CommitAndStop() {
+ t.Lock()
+ t.commit(true) // stop=true: commit() skips backend.begin
+ t.Unlock()
+}
+
+func (t *batchTx) safePending() int { // safePending returns pending, read while holding the mutex.
+ t.Mutex.Lock()
+ defer t.Mutex.Unlock() // raw mutex unlock: bypasses the batch-limit commit done by t.Unlock
+ return t.pending
+}
+
+// commit flushes the current bolt tx (if any), records timing metrics, bumps the backend commit counter and,
+// unless stop is set, begins a new writable tx. A commit error is logged as fatal. Visible callers hold the tx lock.
+func (t *batchTx) commit(stop bool) {
+	if cur := t.tx; cur != nil {
+		if !stop && t.pending == 0 { // nothing to flush and not stopping: keep the open tx
+			return
+		}
+		began := time.Now()
+
+		// gofail: var beforeCommit struct{}
+		err := cur.Commit()
+		// gofail: var afterCommit struct{}
+
+		stats := cur.Stats()
+		rebalanceSec.Observe(stats.RebalanceTime.Seconds())
+		spillSec.Observe(stats.SpillTime.Seconds())
+		writeSec.Observe(stats.WriteTime.Seconds())
+		commitSec.Observe(time.Since(began).Seconds())
+		atomic.AddInt64(&t.backend.commits, 1)
+		t.pending = 0
+		if err != nil {
+			if lg := t.backend.lg; lg != nil {
+				lg.Fatal("failed to commit tx", zap.Error(err))
+			} else {
+				plog.Fatalf("cannot commit tx (%s)", err)
+			}
+		}
+	}
+	if !stop {
+		t.tx = t.backend.begin(true)
+	}
+}
+
+type batchTxBuffered struct { // batchTxBuffered is a batchTx that also mirrors its puts into a write buffer.
+ batchTx
+ buf txWriteBuffer // filled by UnsafePut/UnsafeSeqPut; written back to backend.readTx.buf in Unlock
+}
+
+// newBatchTxBuffered returns a batchTxBuffered bound to backend with an empty write buffer (seq=true).
+func newBatchTxBuffered(backend *backend) *batchTxBuffered {
+	tx := &batchTxBuffered{batchTx: batchTx{backend: backend}}
+	tx.buf = txWriteBuffer{
+		txBuffer: txBuffer{make(map[string]*bucketBuffer)},
+		seq:      true,
+	}
+	// tx.tx is still nil here, so this Commit has nothing to flush and only begins the initial transactions
+	tx.Commit()
+	return tx
+}
+
+func (t *batchTxBuffered) Unlock() { // Unlock writes buffered puts back to the read buffer, commits at the batch limit, then unlocks.
+ if t.pending != 0 {
+ t.backend.readTx.Lock() // blocks txReadBuffer for writing.
+ t.buf.writeback(&t.backend.readTx.buf)
+ t.backend.readTx.Unlock()
+ if t.pending >= t.backend.batchLimit {
+ t.commit(false) // batchTxBuffered.commit, which locks readTx itself
+ }
+ }
+ t.batchTx.Unlock() // embedded Unlock re-checks the batch limit and releases the mutex
+}
+
+func (t *batchTxBuffered) Commit() { // Commit runs the buffered commit(false) between Lock and the buffered Unlock.
+ t.Lock()
+ t.commit(false)
+ t.Unlock()
+}
+
+func (t *batchTxBuffered) CommitAndStop() { // CommitAndStop runs the buffered commit(true), so no new transactions are begun.
+ t.Lock()
+ t.commit(true)
+ t.Unlock()
+}
+
+func (t *batchTxBuffered) commit(stop bool) { // commit runs unsafeCommit while holding readTx's lock.
+ // all read txs must be closed to acquire boltdb commit rwlock
+ t.backend.readTx.Lock()
+ t.unsafeCommit(stop)
+ t.backend.readTx.Unlock()
+}
+
+func (t *batchTxBuffered) unsafeCommit(stop bool) { // retires the old read tx, commits the write tx, reopens the read tx; commit calls it with readTx locked
+	if t.backend.readTx.tx != nil {
+		// wait all store read transactions using the current boltdb tx to finish,
+		// then close the boltdb tx
+		go func(old *bolt.Tx, readers *sync.WaitGroup) {
+			readers.Wait()
+			if err := old.Rollback(); err != nil {
+				if lg := t.backend.lg; lg != nil {
+					lg.Fatal("failed to rollback tx", zap.Error(err))
+				} else {
+					plog.Fatalf("cannot rollback tx (%s)", err)
+				}
+			}
+		}(t.backend.readTx.tx, t.backend.readTx.txWg)
+		t.backend.readTx.reset()
+	}
+	// flush the write tx through the embedded batchTx
+	t.batchTx.commit(stop)
+	// give readTx a new tx unless stop is set
+	if !stop {
+		t.backend.readTx.tx = t.backend.begin(false)
+	}
+}
+
+func (t *batchTxBuffered) UnsafePut(bucketName []byte, key []byte, value []byte) { // UnsafePut writes to the bolt tx, then records the same put in the write buffer.
+ t.batchTx.UnsafePut(bucketName, key, value)
+ t.buf.put(bucketName, key, value)
+}
+
+func (t *batchTxBuffered) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) { // UnsafeSeqPut writes to the bolt tx, then records the same put in the write buffer via putSeq.
+ t.batchTx.UnsafeSeqPut(bucketName, key, value)
+ t.buf.putSeq(bucketName, key, value)
+}