[VOL-2193] Create mocks for Kafka Client and Etcd

This commit consists of:
1) A kafka client mock that implements the kafka client interface
under voltha-lib-go/pkg/kafka/client.go
2) An embedded Etcd server that runs in-process and represents an
Etcd server.

Change-Id: I52a36132568e08c596bb4136918bebcb654a3b99
diff --git a/vendor/go.etcd.io/etcd/etcdserver/config.go b/vendor/go.etcd.io/etcd/etcdserver/config.go
new file mode 100644
index 0000000..88cd721
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/config.go
@@ -0,0 +1,307 @@
+// Copyright 2015 The etcd Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//     http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package etcdserver
+import (
+	"context"
+	"fmt"
+	"path/filepath"
+	"sort"
+	"strings"
+	"time"
+	"go.etcd.io/etcd/pkg/netutil"
+	"go.etcd.io/etcd/pkg/transport"
+	"go.etcd.io/etcd/pkg/types"
+	bolt "go.etcd.io/bbolt"
+	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
+// ServerConfig holds the configuration of etcd as taken from the command line or discovery.
+type ServerConfig struct {
+	Name           string
+	DiscoveryURL   string
+	DiscoveryProxy string
+	ClientURLs     types.URLs
+	PeerURLs       types.URLs
+	DataDir        string
+	// DedicatedWALDir config will make the etcd to write the WAL to the WALDir
+	// rather than the dataDir/member/wal.
+	DedicatedWALDir string
+	SnapshotCount uint64
+	// SnapshotCatchUpEntries is the number of entries for a slow follower
+	// to catch-up after compacting the raft storage entries.
+	// We expect the follower has a millisecond level latency with the leader.
+	// The max throughput is around 10K. Keep a 5K entries is enough for helping
+	// follower to catch up.
+	// WARNING: only change this for tests. Always use "DefaultSnapshotCatchUpEntries"
+	SnapshotCatchUpEntries uint64
+	MaxSnapFiles uint
+	MaxWALFiles  uint
+	// BackendBatchInterval is the maximum time before commit the backend transaction.
+	BackendBatchInterval time.Duration
+	// BackendBatchLimit is the maximum operations before commit the backend transaction.
+	BackendBatchLimit int
+	// BackendFreelistType is the type of the backend boltdb freelist.
+	BackendFreelistType bolt.FreelistType
+	InitialPeerURLsMap  types.URLsMap
+	InitialClusterToken string
+	NewCluster          bool
+	PeerTLSInfo         transport.TLSInfo
+	CORS map[string]struct{}
+	// HostWhitelist lists acceptable hostnames from client requests.
+	// If server is insecure (no TLS), server only accepts requests
+	// whose Host header value exists in this white list.
+	HostWhitelist map[string]struct{}
+	TickMs        uint
+	ElectionTicks int
+	// InitialElectionTickAdvance is true, then local member fast-forwards
+	// election ticks to speed up "initial" leader election trigger. This
+	// benefits the case of larger election ticks. For instance, cross
+	// datacenter deployment may require longer election timeout of 10-second.
+	// If true, local node does not need wait up to 10-second. Instead,
+	// forwards its election ticks to 8-second, and have only 2-second left
+	// before leader election.
+	//
+	// Major assumptions are that:
+	//  - cluster has no active leader thus advancing ticks enables faster
+	//    leader election, or
+	//  - cluster already has an established leader, and rejoining follower
+	//    is likely to receive heartbeats from the leader after tick advance
+	//    and before election timeout.
+	//
+	// However, when network from leader to rejoining follower is congested,
+	// and the follower does not receive leader heartbeat within left election
+	// ticks, disruptive election has to happen thus affecting cluster
+	// availabilities.
+	//
+	// Disabling this would slow down initial bootstrap process for cross
+	// datacenter deployments. Make your own tradeoffs by configuring
+	// --initial-election-tick-advance at the cost of slow initial bootstrap.
+	//
+	// If single-node, it advances ticks regardless.
+	//
+	// See https://github.com/etcd-io/etcd/issues/9333 for more detail.
+	InitialElectionTickAdvance bool
+	BootstrapTimeout time.Duration
+	AutoCompactionRetention time.Duration
+	AutoCompactionMode      string
+	CompactionBatchLimit    int
+	QuotaBackendBytes       int64
+	MaxTxnOps               uint
+	// MaxRequestBytes is the maximum request size to send over raft.
+	MaxRequestBytes uint
+	StrictReconfigCheck bool
+	// ClientCertAuthEnabled is true when cert has been signed by the client CA.
+	ClientCertAuthEnabled bool
+	AuthToken  string
+	BcryptCost uint
+	// InitialCorruptCheck is true to check data corruption on boot
+	// before serving any peer/client traffic.
+	InitialCorruptCheck bool
+	CorruptCheckTime    time.Duration
+	// PreVote is true to enable Raft Pre-Vote.
+	PreVote bool
+	// Logger logs server-side operations.
+	// If not nil, it disables "capnslog" and uses the given logger.
+	Logger *zap.Logger
+	// LoggerConfig is server logger configuration for Raft logger.
+	// Must be either: "LoggerConfig != nil" or "LoggerCore != nil && LoggerWriteSyncer != nil".
+	LoggerConfig *zap.Config
+	// LoggerCore is "zapcore.Core" for raft logger.
+	// Must be either: "LoggerConfig != nil" or "LoggerCore != nil && LoggerWriteSyncer != nil".
+	LoggerCore        zapcore.Core
+	LoggerWriteSyncer zapcore.WriteSyncer
+	Debug bool
+	ForceNewCluster bool
+	// EnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
+	EnableLeaseCheckpoint bool
+	// LeaseCheckpointInterval time.Duration is the wait duration between lease checkpoints.
+	LeaseCheckpointInterval time.Duration
+	EnableGRPCGateway bool
+// VerifyBootstrap sanity-checks the initial config for bootstrap case
+// and returns an error for things that should never happen.
+func (c *ServerConfig) VerifyBootstrap() error {
+	if err := c.hasLocalMember(); err != nil {
+		return err
+	}
+	if err := c.advertiseMatchesCluster(); err != nil {
+		return err
+	}
+	if checkDuplicateURL(c.InitialPeerURLsMap) {
+		return fmt.Errorf("initial cluster %s has duplicate url", c.InitialPeerURLsMap)
+	}
+	if c.InitialPeerURLsMap.String() == "" && c.DiscoveryURL == "" {
+		return fmt.Errorf("initial cluster unset and no discovery URL found")
+	}
+	return nil
+// VerifyJoinExisting sanity-checks the initial config for join existing cluster
+// case and returns an error for things that should never happen.
+func (c *ServerConfig) VerifyJoinExisting() error {
+	// The member has announced its peer urls to the cluster before starting; no need to
+	// set the configuration again.
+	if err := c.hasLocalMember(); err != nil {
+		return err
+	}
+	if checkDuplicateURL(c.InitialPeerURLsMap) {
+		return fmt.Errorf("initial cluster %s has duplicate url", c.InitialPeerURLsMap)
+	}
+	if c.DiscoveryURL != "" {
+		return fmt.Errorf("discovery URL should not be set when joining existing initial cluster")
+	}
+	return nil
+// hasLocalMember checks that the cluster at least contains the local server.
+func (c *ServerConfig) hasLocalMember() error {
+	if urls := c.InitialPeerURLsMap[c.Name]; urls == nil {
+		return fmt.Errorf("couldn't find local name %q in the initial cluster configuration", c.Name)
+	}
+	return nil
+// advertiseMatchesCluster confirms peer URLs match those in the cluster peer list.
+func (c *ServerConfig) advertiseMatchesCluster() error {
+	urls, apurls := c.InitialPeerURLsMap[c.Name], c.PeerURLs.StringSlice()
+	urls.Sort()
+	sort.Strings(apurls)
+	ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
+	defer cancel()
+	ok, err := netutil.URLStringsEqual(ctx, c.Logger, apurls, urls.StringSlice())
+	if ok {
+		return nil
+	}
+	initMap, apMap := make(map[string]struct{}), make(map[string]struct{})
+	for _, url := range c.PeerURLs {
+		apMap[url.String()] = struct{}{}
+	}
+	for _, url := range c.InitialPeerURLsMap[c.Name] {
+		initMap[url.String()] = struct{}{}
+	}
+	missing := []string{}
+	for url := range initMap {
+		if _, ok := apMap[url]; !ok {
+			missing = append(missing, url)
+		}
+	}
+	if len(missing) > 0 {
+		for i := range missing {
+			missing[i] = c.Name + "=" + missing[i]
+		}
+		mstr := strings.Join(missing, ",")
+		apStr := strings.Join(apurls, ",")
+		return fmt.Errorf("--initial-cluster has %s but missing from --initial-advertise-peer-urls=%s (%v)", mstr, apStr, err)
+	}
+	for url := range apMap {
+		if _, ok := initMap[url]; !ok {
+			missing = append(missing, url)
+		}
+	}
+	if len(missing) > 0 {
+		mstr := strings.Join(missing, ",")
+		umap := types.URLsMap(map[string]types.URLs{c.Name: c.PeerURLs})
+		return fmt.Errorf("--initial-advertise-peer-urls has %s but missing from --initial-cluster=%s", mstr, umap.String())
+	}
+	// resolved URLs from "--initial-advertise-peer-urls" and "--initial-cluster" did not match or failed
+	apStr := strings.Join(apurls, ",")
+	umap := types.URLsMap(map[string]types.URLs{c.Name: c.PeerURLs})
+	return fmt.Errorf("failed to resolve %s to match --initial-cluster=%s (%v)", apStr, umap.String(), err)
+func (c *ServerConfig) MemberDir() string { return filepath.Join(c.DataDir, "member") }
+func (c *ServerConfig) WALDir() string {
+	if c.DedicatedWALDir != "" {
+		return c.DedicatedWALDir
+	}
+	return filepath.Join(c.MemberDir(), "wal")
+func (c *ServerConfig) SnapDir() string { return filepath.Join(c.MemberDir(), "snap") }
+func (c *ServerConfig) ShouldDiscover() bool { return c.DiscoveryURL != "" }
+// ReqTimeout returns timeout for request to finish.
+func (c *ServerConfig) ReqTimeout() time.Duration {
+	// 5s for queue waiting, computation and disk IO delay
+	// + 2 * election timeout for possible leader election
+	return 5*time.Second + 2*time.Duration(c.ElectionTicks*int(c.TickMs))*time.Millisecond
+func (c *ServerConfig) electionTimeout() time.Duration {
+	return time.Duration(c.ElectionTicks*int(c.TickMs)) * time.Millisecond
+func (c *ServerConfig) peerDialTimeout() time.Duration {
+	// 1s for queue wait and election timeout
+	return time.Second + time.Duration(c.ElectionTicks*int(c.TickMs))*time.Millisecond
+func checkDuplicateURL(urlsmap types.URLsMap) bool {
+	um := make(map[string]bool)
+	for _, urls := range urlsmap {
+		for _, url := range urls {
+			u := url.String()
+			if um[u] {
+				return true
+			}
+			um[u] = true
+		}
+	}
+	return false
+func (c *ServerConfig) bootstrapTimeout() time.Duration {
+	if c.BootstrapTimeout != 0 {
+		return c.BootstrapTimeout
+	}
+	return time.Second
+func (c *ServerConfig) backendPath() string { return filepath.Join(c.SnapDir(), "db") }