[VOL-2193] Create mocks for Kafka Client and Etcd

This commit consists of:
1) A kafka client mock that implements the kafka client interface
under voltha-lib-go/pkg/kafka/client.go
2) An embedded Etcd server that runs in-process and represents an
Etcd server.

Change-Id: I52a36132568e08c596bb4136918bebcb654a3b99
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/v3compactor/compactor.go b/vendor/go.etcd.io/etcd/etcdserver/api/v3compactor/compactor.go
new file mode 100644
index 0000000..73a9684
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/v3compactor/compactor.go
@@ -0,0 +1,75 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package v3compactor
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
+
+	"github.com/coreos/pkg/capnslog"
+	"github.com/jonboulle/clockwork"
+	"go.uber.org/zap"
+)
+
+var (
+	plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "compactor")
+)
+
+const (
+	ModePeriodic = "periodic"
+	ModeRevision = "revision"
+)
+
+// Compactor purges old log from the storage periodically.
+type Compactor interface {
+	// Run starts the main loop of the compactor in background.
+	// Use Stop() to halt the loop and release the resource.
+	Run()
+	// Stop halts the main loop of the compactor.
+	Stop()
+	// Pause temporally suspend the compactor not to run compaction. Resume() to unpose.
+	Pause()
+	// Resume restarts the compactor suspended by Pause().
+	Resume()
+}
+
+type Compactable interface {
+	Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error)
+}
+
+type RevGetter interface {
+	Rev() int64
+}
+
+// New returns a new Compactor based on given "mode".
+func New(
+	lg *zap.Logger,
+	mode string,
+	retention time.Duration,
+	rg RevGetter,
+	c Compactable,
+) (Compactor, error) {
+	switch mode {
+	case ModePeriodic:
+		return newPeriodic(lg, clockwork.NewRealClock(), retention, rg, c), nil
+	case ModeRevision:
+		return newRevision(lg, clockwork.NewRealClock(), int64(retention), rg, c), nil
+	default:
+		return nil, fmt.Errorf("unsupported compaction mode %s", mode)
+	}
+}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/v3compactor/doc.go b/vendor/go.etcd.io/etcd/etcdserver/api/v3compactor/doc.go
new file mode 100644
index 0000000..bb28046
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/v3compactor/doc.go
@@ -0,0 +1,16 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package v3compactor implements automated policies for compacting etcd's mvcc storage.
+package v3compactor
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/v3compactor/periodic.go b/vendor/go.etcd.io/etcd/etcdserver/api/v3compactor/periodic.go
new file mode 100644
index 0000000..ab64cb7
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/v3compactor/periodic.go
@@ -0,0 +1,217 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package v3compactor
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
+	"go.etcd.io/etcd/mvcc"
+
+	"github.com/jonboulle/clockwork"
+	"go.uber.org/zap"
+)
+
+// Periodic compacts the log by purging revisions older than
+// the configured retention time.
+type Periodic struct {
+	lg     *zap.Logger
+	clock  clockwork.Clock
+	period time.Duration
+
+	rg RevGetter
+	c  Compactable
+
+	revs   []int64
+	ctx    context.Context
+	cancel context.CancelFunc
+
+	// mu protects paused
+	mu     sync.RWMutex
+	paused bool
+}
+
+// newPeriodic creates a new instance of Periodic compactor that purges
+// the log older than h Duration.
+func newPeriodic(lg *zap.Logger, clock clockwork.Clock, h time.Duration, rg RevGetter, c Compactable) *Periodic {
+	pc := &Periodic{
+		lg:     lg,
+		clock:  clock,
+		period: h,
+		rg:     rg,
+		c:      c,
+		revs:   make([]int64, 0),
+	}
+	pc.ctx, pc.cancel = context.WithCancel(context.Background())
+	return pc
+}
+
+/*
+Compaction period 1-hour:
+  1. compute compaction period, which is 1-hour
+  2. record revisions for every 1/10 of 1-hour (6-minute)
+  3. keep recording revisions with no compaction for first 1-hour
+  4. do compact with revs[0]
+	- success? contiue on for-loop and move sliding window; revs = revs[1:]
+	- failure? update revs, and retry after 1/10 of 1-hour (6-minute)
+
+Compaction period 24-hour:
+  1. compute compaction period, which is 1-hour
+  2. record revisions for every 1/10 of 1-hour (6-minute)
+  3. keep recording revisions with no compaction for first 24-hour
+  4. do compact with revs[0]
+	- success? contiue on for-loop and move sliding window; revs = revs[1:]
+	- failure? update revs, and retry after 1/10 of 1-hour (6-minute)
+
+Compaction period 59-min:
+  1. compute compaction period, which is 59-min
+  2. record revisions for every 1/10 of 59-min (5.9-min)
+  3. keep recording revisions with no compaction for first 59-min
+  4. do compact with revs[0]
+	- success? contiue on for-loop and move sliding window; revs = revs[1:]
+	- failure? update revs, and retry after 1/10 of 59-min (5.9-min)
+
+Compaction period 5-sec:
+  1. compute compaction period, which is 5-sec
+  2. record revisions for every 1/10 of 5-sec (0.5-sec)
+  3. keep recording revisions with no compaction for first 5-sec
+  4. do compact with revs[0]
+	- success? contiue on for-loop and move sliding window; revs = revs[1:]
+	- failure? update revs, and retry after 1/10 of 5-sec (0.5-sec)
+*/
+
+// Run runs periodic compactor.
+func (pc *Periodic) Run() {
+	compactInterval := pc.getCompactInterval()
+	retryInterval := pc.getRetryInterval()
+	retentions := pc.getRetentions()
+
+	go func() {
+		lastSuccess := pc.clock.Now()
+		baseInterval := pc.period
+		for {
+			pc.revs = append(pc.revs, pc.rg.Rev())
+			if len(pc.revs) > retentions {
+				pc.revs = pc.revs[1:] // pc.revs[0] is always the rev at pc.period ago
+			}
+
+			select {
+			case <-pc.ctx.Done():
+				return
+			case <-pc.clock.After(retryInterval):
+				pc.mu.Lock()
+				p := pc.paused
+				pc.mu.Unlock()
+				if p {
+					continue
+				}
+			}
+
+			if pc.clock.Now().Sub(lastSuccess) < baseInterval {
+				continue
+			}
+
+			// wait up to initial given period
+			if baseInterval == pc.period {
+				baseInterval = compactInterval
+			}
+			rev := pc.revs[0]
+
+			if pc.lg != nil {
+				pc.lg.Info(
+					"starting auto periodic compaction",
+					zap.Int64("revision", rev),
+					zap.Duration("compact-period", pc.period),
+				)
+			} else {
+				plog.Noticef("Starting auto-compaction at revision %d (retention: %v)", rev, pc.period)
+			}
+			_, err := pc.c.Compact(pc.ctx, &pb.CompactionRequest{Revision: rev})
+			if err == nil || err == mvcc.ErrCompacted {
+				if pc.lg != nil {
+					pc.lg.Info(
+						"completed auto periodic compaction",
+						zap.Int64("revision", rev),
+						zap.Duration("compact-period", pc.period),
+						zap.Duration("took", time.Since(lastSuccess)),
+					)
+				} else {
+					plog.Noticef("Finished auto-compaction at revision %d", rev)
+				}
+				lastSuccess = pc.clock.Now()
+			} else {
+				if pc.lg != nil {
+					pc.lg.Warn(
+						"failed auto periodic compaction",
+						zap.Int64("revision", rev),
+						zap.Duration("compact-period", pc.period),
+						zap.Duration("retry-interval", retryInterval),
+						zap.Error(err),
+					)
+				} else {
+					plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err)
+					plog.Noticef("Retry after %v", retryInterval)
+				}
+			}
+		}
+	}()
+}
+
+// if given compaction period x is <1-hour, compact every x duration.
+// (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='10m', then compact every 10-minute)
+// if given compaction period x is >1-hour, compact every hour.
+// (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='2h', then compact every 1-hour)
+func (pc *Periodic) getCompactInterval() time.Duration {
+	itv := pc.period
+	if itv > time.Hour {
+		itv = time.Hour
+	}
+	return itv
+}
+
+func (pc *Periodic) getRetentions() int {
+	return int(pc.period/pc.getRetryInterval()) + 1
+}
+
+const retryDivisor = 10
+
+func (pc *Periodic) getRetryInterval() time.Duration {
+	itv := pc.period
+	if itv > time.Hour {
+		itv = time.Hour
+	}
+	return itv / retryDivisor
+}
+
+// Stop stops periodic compactor.
+func (pc *Periodic) Stop() {
+	pc.cancel()
+}
+
+// Pause pauses periodic compactor.
+func (pc *Periodic) Pause() {
+	pc.mu.Lock()
+	pc.paused = true
+	pc.mu.Unlock()
+}
+
+// Resume resumes periodic compactor.
+func (pc *Periodic) Resume() {
+	pc.mu.Lock()
+	pc.paused = false
+	pc.mu.Unlock()
+}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/v3compactor/revision.go b/vendor/go.etcd.io/etcd/etcdserver/api/v3compactor/revision.go
new file mode 100644
index 0000000..cf8ac43
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/v3compactor/revision.go
@@ -0,0 +1,143 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package v3compactor
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
+	"go.etcd.io/etcd/mvcc"
+
+	"github.com/jonboulle/clockwork"
+	"go.uber.org/zap"
+)
+
+// Revision compacts the log by purging revisions older than
+// the configured reivison number. Compaction happens every 5 minutes.
+type Revision struct {
+	lg *zap.Logger
+
+	clock     clockwork.Clock
+	retention int64
+
+	rg RevGetter
+	c  Compactable
+
+	ctx    context.Context
+	cancel context.CancelFunc
+
+	mu     sync.Mutex
+	paused bool
+}
+
+// newRevision creates a new instance of Revisonal compactor that purges
+// the log older than retention revisions from the current revision.
+func newRevision(lg *zap.Logger, clock clockwork.Clock, retention int64, rg RevGetter, c Compactable) *Revision {
+	rc := &Revision{
+		lg:        lg,
+		clock:     clock,
+		retention: retention,
+		rg:        rg,
+		c:         c,
+	}
+	rc.ctx, rc.cancel = context.WithCancel(context.Background())
+	return rc
+}
+
+const revInterval = 5 * time.Minute
+
+// Run runs revision-based compactor.
+func (rc *Revision) Run() {
+	prev := int64(0)
+	go func() {
+		for {
+			select {
+			case <-rc.ctx.Done():
+				return
+			case <-rc.clock.After(revInterval):
+				rc.mu.Lock()
+				p := rc.paused
+				rc.mu.Unlock()
+				if p {
+					continue
+				}
+			}
+
+			rev := rc.rg.Rev() - rc.retention
+			if rev <= 0 || rev == prev {
+				continue
+			}
+
+			now := time.Now()
+			if rc.lg != nil {
+				rc.lg.Info(
+					"starting auto revision compaction",
+					zap.Int64("revision", rev),
+					zap.Int64("revision-compaction-retention", rc.retention),
+				)
+			} else {
+				plog.Noticef("Starting auto-compaction at revision %d (retention: %d revisions)", rev, rc.retention)
+			}
+			_, err := rc.c.Compact(rc.ctx, &pb.CompactionRequest{Revision: rev})
+			if err == nil || err == mvcc.ErrCompacted {
+				prev = rev
+				if rc.lg != nil {
+					rc.lg.Info(
+						"completed auto revision compaction",
+						zap.Int64("revision", rev),
+						zap.Int64("revision-compaction-retention", rc.retention),
+						zap.Duration("took", time.Since(now)),
+					)
+				} else {
+					plog.Noticef("Finished auto-compaction at revision %d", rev)
+				}
+			} else {
+				if rc.lg != nil {
+					rc.lg.Warn(
+						"failed auto revision compaction",
+						zap.Int64("revision", rev),
+						zap.Int64("revision-compaction-retention", rc.retention),
+						zap.Duration("retry-interval", revInterval),
+						zap.Error(err),
+					)
+				} else {
+					plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err)
+					plog.Noticef("Retry after %v", revInterval)
+				}
+			}
+		}
+	}()
+}
+
+// Stop stops revision-based compactor.
+func (rc *Revision) Stop() {
+	rc.cancel()
+}
+
+// Pause pauses revision-based compactor.
+func (rc *Revision) Pause() {
+	rc.mu.Lock()
+	rc.paused = true
+	rc.mu.Unlock()
+}
+
+// Resume resumes revision-based compactor.
+func (rc *Revision) Resume() {
+	rc.mu.Lock()
+	rc.paused = false
+	rc.mu.Unlock()
+}