[VOL-2193] Create mocks for Kafka Client and Etcd

This commit consists of:
1) A kafka client mock that implements the kafka client interface
under voltha-lib-go/pkg/kafka/client.go
2) An embedded Etcd server that runs in-process and represents an
Etcd server.

Change-Id: I52a36132568e08c596bb4136918bebcb654a3b99
diff --git a/vendor/go.etcd.io/etcd/mvcc/watchable_store.go b/vendor/go.etcd.io/etcd/mvcc/watchable_store.go
new file mode 100644
index 0000000..3cf491d
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/watchable_store.go
@@ -0,0 +1,552 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mvcc
+
+import (
+	"sync"
+	"time"
+
+	"go.etcd.io/etcd/lease"
+	"go.etcd.io/etcd/mvcc/backend"
+	"go.etcd.io/etcd/mvcc/mvccpb"
+	"go.uber.org/zap"
+)
+
+// non-const so modifiable by tests
+var (
+	// chanBufLen is the length of the buffered chan
+	// for sending out watched events.
+	// TODO: find a good buf value. 1024 is just a random one that
+	// seems to be reasonable.
+	chanBufLen = 1024
+
+	// maxWatchersPerSync is the number of watchers to sync in a single batch
+	maxWatchersPerSync = 512
+)
+
+type watchable interface {
+	watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc)
+	progress(w *watcher)
+	rev() int64
+}
+
+type watchableStore struct {
+	*store
+
+	// mu protects watcher groups and batches. It should never be locked
+	// before locking store.mu to avoid deadlock.
+	mu sync.RWMutex
+
+	// victims are watcher batches that were blocked on the watch channel
+	victims []watcherBatch
+	victimc chan struct{}
+
+	// contains all unsynced watchers that needs to sync with events that have happened
+	unsynced watcherGroup
+
+	// contains all synced watchers that are in sync with the progress of the store.
+	// The key of the map is the key that the watcher watches on.
+	synced watcherGroup
+
+	stopc chan struct{}
+	wg    sync.WaitGroup
+}
+
+// cancelFunc updates unsynced and synced maps when running
+// cancel operations.
+type cancelFunc func()
+
+func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) ConsistentWatchableKV {
+	return newWatchableStore(lg, b, le, ig, cfg)
+}
+
+func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) *watchableStore {
+	s := &watchableStore{
+		store:    NewStore(lg, b, le, ig, cfg),
+		victimc:  make(chan struct{}, 1),
+		unsynced: newWatcherGroup(),
+		synced:   newWatcherGroup(),
+		stopc:    make(chan struct{}),
+	}
+	s.store.ReadView = &readView{s}
+	s.store.WriteView = &writeView{s}
+	if s.le != nil {
+		// use this store as the deleter so revokes trigger watch events
+		s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() })
+	}
+	s.wg.Add(2)
+	go s.syncWatchersLoop()
+	go s.syncVictimsLoop()
+	return s
+}
+
+func (s *watchableStore) Close() error {
+	close(s.stopc)
+	s.wg.Wait()
+	return s.store.Close()
+}
+
+func (s *watchableStore) NewWatchStream() WatchStream {
+	watchStreamGauge.Inc()
+	return &watchStream{
+		watchable: s,
+		ch:        make(chan WatchResponse, chanBufLen),
+		cancels:   make(map[WatchID]cancelFunc),
+		watchers:  make(map[WatchID]*watcher),
+	}
+}
+
+func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {
+	wa := &watcher{
+		key:    key,
+		end:    end,
+		minRev: startRev,
+		id:     id,
+		ch:     ch,
+		fcs:    fcs,
+	}
+
+	s.mu.Lock()
+	s.revMu.RLock()
+	synced := startRev > s.store.currentRev || startRev == 0
+	if synced {
+		wa.minRev = s.store.currentRev + 1
+		if startRev > wa.minRev {
+			wa.minRev = startRev
+		}
+	}
+	if synced {
+		s.synced.add(wa)
+	} else {
+		slowWatcherGauge.Inc()
+		s.unsynced.add(wa)
+	}
+	s.revMu.RUnlock()
+	s.mu.Unlock()
+
+	watcherGauge.Inc()
+
+	return wa, func() { s.cancelWatcher(wa) }
+}
+
+// cancelWatcher removes references of the watcher from the watchableStore
+func (s *watchableStore) cancelWatcher(wa *watcher) {
+	for {
+		s.mu.Lock()
+		if s.unsynced.delete(wa) {
+			slowWatcherGauge.Dec()
+			break
+		} else if s.synced.delete(wa) {
+			break
+		} else if wa.compacted {
+			break
+		} else if wa.ch == nil {
+			// already canceled (e.g., cancel/close race)
+			break
+		}
+
+		if !wa.victim {
+			panic("watcher not victim but not in watch groups")
+		}
+
+		var victimBatch watcherBatch
+		for _, wb := range s.victims {
+			if wb[wa] != nil {
+				victimBatch = wb
+				break
+			}
+		}
+		if victimBatch != nil {
+			slowWatcherGauge.Dec()
+			delete(victimBatch, wa)
+			break
+		}
+
+		// victim being processed so not accessible; retry
+		s.mu.Unlock()
+		time.Sleep(time.Millisecond)
+	}
+
+	watcherGauge.Dec()
+	wa.ch = nil
+	s.mu.Unlock()
+}
+
+func (s *watchableStore) Restore(b backend.Backend) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	err := s.store.Restore(b)
+	if err != nil {
+		return err
+	}
+
+	for wa := range s.synced.watchers {
+		wa.restore = true
+		s.unsynced.add(wa)
+	}
+	s.synced = newWatcherGroup()
+	return nil
+}
+
+// syncWatchersLoop syncs the watcher in the unsynced map every 100ms.
+func (s *watchableStore) syncWatchersLoop() {
+	defer s.wg.Done()
+
+	for {
+		s.mu.RLock()
+		st := time.Now()
+		lastUnsyncedWatchers := s.unsynced.size()
+		s.mu.RUnlock()
+
+		unsyncedWatchers := 0
+		if lastUnsyncedWatchers > 0 {
+			unsyncedWatchers = s.syncWatchers()
+		}
+		syncDuration := time.Since(st)
+
+		waitDuration := 100 * time.Millisecond
+		// more work pending?
+		if unsyncedWatchers != 0 && lastUnsyncedWatchers > unsyncedWatchers {
+			// be fair to other store operations by yielding time taken
+			waitDuration = syncDuration
+		}
+
+		select {
+		case <-time.After(waitDuration):
+		case <-s.stopc:
+			return
+		}
+	}
+}
+
+// syncVictimsLoop tries to write precomputed watcher responses to
+// watchers that had a blocked watcher channel
+func (s *watchableStore) syncVictimsLoop() {
+	defer s.wg.Done()
+
+	for {
+		for s.moveVictims() != 0 {
+			// try to update all victim watchers
+		}
+		s.mu.RLock()
+		isEmpty := len(s.victims) == 0
+		s.mu.RUnlock()
+
+		var tickc <-chan time.Time
+		if !isEmpty {
+			tickc = time.After(10 * time.Millisecond)
+		}
+
+		select {
+		case <-tickc:
+		case <-s.victimc:
+		case <-s.stopc:
+			return
+		}
+	}
+}
+
+// moveVictims tries to update watches with already pending event data
+func (s *watchableStore) moveVictims() (moved int) {
+	s.mu.Lock()
+	victims := s.victims
+	s.victims = nil
+	s.mu.Unlock()
+
+	var newVictim watcherBatch
+	for _, wb := range victims {
+		// try to send responses again
+		for w, eb := range wb {
+			// watcher has observed the store up to, but not including, w.minRev
+			rev := w.minRev - 1
+			if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
+				pendingEventsGauge.Add(float64(len(eb.evs)))
+			} else {
+				if newVictim == nil {
+					newVictim = make(watcherBatch)
+				}
+				newVictim[w] = eb
+				continue
+			}
+			moved++
+		}
+
+		// assign completed victim watchers to unsync/sync
+		s.mu.Lock()
+		s.store.revMu.RLock()
+		curRev := s.store.currentRev
+		for w, eb := range wb {
+			if newVictim != nil && newVictim[w] != nil {
+				// couldn't send watch response; stays victim
+				continue
+			}
+			w.victim = false
+			if eb.moreRev != 0 {
+				w.minRev = eb.moreRev
+			}
+			if w.minRev <= curRev {
+				s.unsynced.add(w)
+			} else {
+				slowWatcherGauge.Dec()
+				s.synced.add(w)
+			}
+		}
+		s.store.revMu.RUnlock()
+		s.mu.Unlock()
+	}
+
+	if len(newVictim) > 0 {
+		s.mu.Lock()
+		s.victims = append(s.victims, newVictim)
+		s.mu.Unlock()
+	}
+
+	return moved
+}
+
+// syncWatchers syncs unsynced watchers by:
+//	1. choose a set of watchers from the unsynced watcher group
+//	2. iterate over the set to get the minimum revision and remove compacted watchers
+//	3. use minimum revision to get all key-value pairs and send those events to watchers
+//	4. remove synced watchers in set from unsynced group and move to synced group
+func (s *watchableStore) syncWatchers() int {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	if s.unsynced.size() == 0 {
+		return 0
+	}
+
+	s.store.revMu.RLock()
+	defer s.store.revMu.RUnlock()
+
+	// in order to find key-value pairs from unsynced watchers, we need to
+	// find min revision index, and these revisions can be used to
+	// query the backend store of key-value pairs
+	curRev := s.store.currentRev
+	compactionRev := s.store.compactMainRev
+
+	wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
+	minBytes, maxBytes := newRevBytes(), newRevBytes()
+	revToBytes(revision{main: minRev}, minBytes)
+	revToBytes(revision{main: curRev + 1}, maxBytes)
+
+	// UnsafeRange returns keys and values. And in boltdb, keys are revisions.
+	// values are actual key-value pairs in backend.
+	tx := s.store.b.ReadTx()
+	tx.RLock()
+	revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
+	var evs []mvccpb.Event
+	if s.store != nil && s.store.lg != nil {
+		evs = kvsToEvents(s.store.lg, wg, revs, vs)
+	} else {
+		// TODO: remove this in v3.5
+		evs = kvsToEvents(nil, wg, revs, vs)
+	}
+	tx.RUnlock()
+
+	var victims watcherBatch
+	wb := newWatcherBatch(wg, evs)
+	for w := range wg.watchers {
+		w.minRev = curRev + 1
+
+		eb, ok := wb[w]
+		if !ok {
+			// bring un-notified watcher to synced
+			s.synced.add(w)
+			s.unsynced.delete(w)
+			continue
+		}
+
+		if eb.moreRev != 0 {
+			w.minRev = eb.moreRev
+		}
+
+		if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}) {
+			pendingEventsGauge.Add(float64(len(eb.evs)))
+		} else {
+			if victims == nil {
+				victims = make(watcherBatch)
+			}
+			w.victim = true
+		}
+
+		if w.victim {
+			victims[w] = eb
+		} else {
+			if eb.moreRev != 0 {
+				// stay unsynced; more to read
+				continue
+			}
+			s.synced.add(w)
+		}
+		s.unsynced.delete(w)
+	}
+	s.addVictim(victims)
+
+	vsz := 0
+	for _, v := range s.victims {
+		vsz += len(v)
+	}
+	slowWatcherGauge.Set(float64(s.unsynced.size() + vsz))
+
+	return s.unsynced.size()
+}
+
+// kvsToEvents gets all events for the watchers from all key-value pairs
+func kvsToEvents(lg *zap.Logger, wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event) {
+	for i, v := range vals {
+		var kv mvccpb.KeyValue
+		if err := kv.Unmarshal(v); err != nil {
+			if lg != nil {
+				lg.Panic("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
+			} else {
+				plog.Panicf("cannot unmarshal event: %v", err)
+			}
+		}
+
+		if !wg.contains(string(kv.Key)) {
+			continue
+		}
+
+		ty := mvccpb.PUT
+		if isTombstone(revs[i]) {
+			ty = mvccpb.DELETE
+			// patch in mod revision so watchers won't skip
+			kv.ModRevision = bytesToRev(revs[i]).main
+		}
+		evs = append(evs, mvccpb.Event{Kv: &kv, Type: ty})
+	}
+	return evs
+}
+
+// notify notifies the fact that given event at the given rev just happened to
+// watchers that watch on the key of the event.
+func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
+	var victim watcherBatch
+	for w, eb := range newWatcherBatch(&s.synced, evs) {
+		if eb.revs != 1 {
+			if s.store != nil && s.store.lg != nil {
+				s.store.lg.Panic(
+					"unexpected multiple revisions in watch notification",
+					zap.Int("number-of-revisions", eb.revs),
+				)
+			} else {
+				plog.Panicf("unexpected multiple revisions in notification")
+			}
+		}
+		if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
+			pendingEventsGauge.Add(float64(len(eb.evs)))
+		} else {
+			// move slow watcher to victims
+			w.minRev = rev + 1
+			if victim == nil {
+				victim = make(watcherBatch)
+			}
+			w.victim = true
+			victim[w] = eb
+			s.synced.delete(w)
+			slowWatcherGauge.Inc()
+		}
+	}
+	s.addVictim(victim)
+}
+
+func (s *watchableStore) addVictim(victim watcherBatch) {
+	if victim == nil {
+		return
+	}
+	s.victims = append(s.victims, victim)
+	select {
+	case s.victimc <- struct{}{}:
+	default:
+	}
+}
+
+func (s *watchableStore) rev() int64 { return s.store.Rev() }
+
+func (s *watchableStore) progress(w *watcher) {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+
+	if _, ok := s.synced.watchers[w]; ok {
+		w.send(WatchResponse{WatchID: w.id, Revision: s.rev()})
+		// If the ch is full, this watcher is receiving events.
+		// We do not need to send progress at all.
+	}
+}
+
+type watcher struct {
+	// the watcher key
+	key []byte
+	// end indicates the end of the range to watch.
+	// If end is set, the watcher is on a range.
+	end []byte
+
+	// victim is set when ch is blocked and undergoing victim processing
+	victim bool
+
+	// compacted is set when the watcher is removed because of compaction
+	compacted bool
+
+	// restore is true when the watcher is being restored from leader snapshot
+	// which means that this watcher has just been moved from "synced" to "unsynced"
+	// watcher group, possibly with a future revision when it was first added
+	// to the synced watcher
+	// "unsynced" watcher revision must always be <= current revision,
+	// except when the watcher were to be moved from "synced" watcher group
+	restore bool
+
+	// minRev is the minimum revision update the watcher will accept
+	minRev int64
+	id     WatchID
+
+	fcs []FilterFunc
+	// a chan to send out the watch response.
+	// The chan might be shared with other watchers.
+	ch chan<- WatchResponse
+}
+
+func (w *watcher) send(wr WatchResponse) bool {
+	progressEvent := len(wr.Events) == 0
+
+	if len(w.fcs) != 0 {
+		ne := make([]mvccpb.Event, 0, len(wr.Events))
+		for i := range wr.Events {
+			filtered := false
+			for _, filter := range w.fcs {
+				if filter(wr.Events[i]) {
+					filtered = true
+					break
+				}
+			}
+			if !filtered {
+				ne = append(ne, wr.Events[i])
+			}
+		}
+		wr.Events = ne
+	}
+
+	// if all events are filtered out, we should send nothing.
+	if !progressEvent && len(wr.Events) == 0 {
+		return true
+	}
+	select {
+	case w.ch <- wr:
+		return true
+	default:
+		return false
+	}
+}