[VOL-2193] Create mocks for Kafka Client and Etcd
This commit consists of:
1) A kafka client mock that implements the kafka client interface
under voltha-lib-go/pkg/kafka/client.go
2) An embedded Etcd server that runs in-process and stands in for a
real Etcd server.
Change-Id: I52a36132568e08c596bb4136918bebcb654a3b99
diff --git a/vendor/go.etcd.io/etcd/mvcc/kvstore_compaction.go b/vendor/go.etcd.io/etcd/mvcc/kvstore_compaction.go
new file mode 100644
index 0000000..2adb498
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/kvstore_compaction.go
@@ -0,0 +1,79 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mvcc
+
+import (
+ "encoding/binary"
+ "time"
+
+ "go.uber.org/zap"
+)
+
+// scheduleCompaction scans keyBucketName in batches of s.cfg.CompactionBatchLimit keys,
+// bounded by compactMainRev+1, and deletes every returned revision absent from keep.
+// It returns true after the final batch, or false if s.stopc fires between batches.
+func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struct{}) bool {
+	totalStart := time.Now()
+	defer func() { dbCompactionTotalMs.Observe(float64(time.Since(totalStart) / time.Millisecond)) }()
+	keyCompactions := 0
+	defer func() { dbCompactionKeysCounter.Add(float64(keyCompactions)) }()
+
+	end := make([]byte, 8)
+	binary.BigEndian.PutUint64(end, uint64(compactMainRev+1))
+	last := make([]byte, 8+1+8)
+	for {
+		var rev revision
+		start := time.Now()
+		tx := s.b.BatchTx()
+		tx.Lock()
+		keys, _ := tx.UnsafeRange(keyBucketName, last, end, int64(s.cfg.CompactionBatchLimit))
+		for _, key := range keys {
+			rev = bytesToRev(key)
+			if _, ok := keep[rev]; !ok {
+				tx.UnsafeDelete(keyBucketName, key)
+				keyCompactions++ // counted so the deferred dbCompactionKeysCounter.Add reports real deletions
+			}
+		}
+
+		if len(keys) < s.cfg.CompactionBatchLimit {
+			// Short batch: treat the scan as complete and record compactMainRev under finishedCompactKeyName.
+			rbytes := make([]byte, 8+1+8)
+			revToBytes(revision{main: compactMainRev}, rbytes)
+			tx.UnsafePut(metaBucketName, finishedCompactKeyName, rbytes)
+			tx.Unlock()
+			if s.lg != nil {
+				s.lg.Info(
+					"finished scheduled compaction",
+					zap.Int64("compact-revision", compactMainRev),
+					zap.Duration("took", time.Since(totalStart)),
+				)
+			} else {
+				plog.Infof("finished scheduled compaction at %d (took %v)", compactMainRev, time.Since(totalStart))
+			}
+			return true
+		}
+		// Resume the next batch just after the last revision seen in this one.
+		revToBytes(revision{main: rev.main, sub: rev.sub + 1}, last)
+		tx.Unlock()
+		// Immediately commit the compaction deletes instead of letting them accumulate in the write buffer
+		s.b.ForceCommit()
+		dbCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))
+		select {
+		case <-time.After(10 * time.Millisecond):
+		case <-s.stopc:
+			return false
+		}
+	}
+}