[VOL-2193] Create mocks for Kafka Client and Etcd
This commit consists of:
1) A kafka client mock that implements the kafka client interface
under voltha-lib-go/pkg/kafka/client.go
2) An embedded Etcd server that runs in-process and stands in for a
standalone Etcd server.
Change-Id: I52a36132568e08c596bb4136918bebcb654a3b99
diff --git a/vendor/go.etcd.io/etcd/etcdserver/config.go b/vendor/go.etcd.io/etcd/etcdserver/config.go
new file mode 100644
index 0000000..88cd721
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/config.go
@@ -0,0 +1,307 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package etcdserver
+
+import (
+ "context"
+ "fmt"
+ "path/filepath"
+ "sort"
+ "strings"
+ "time"
+
+ "go.etcd.io/etcd/pkg/netutil"
+ "go.etcd.io/etcd/pkg/transport"
+ "go.etcd.io/etcd/pkg/types"
+
+ bolt "go.etcd.io/bbolt"
+ "go.uber.org/zap"
+ "go.uber.org/zap/zapcore"
+)
+
+// ServerConfig holds the configuration of etcd as taken from the command line or discovery.
+type ServerConfig struct {
+ Name string
+ DiscoveryURL string
+ DiscoveryProxy string
+ ClientURLs types.URLs
+ PeerURLs types.URLs
+ DataDir string
+ // DedicatedWALDir, when set, makes etcd write the WAL to that directory
+ // rather than to dataDir/member/wal.
+ DedicatedWALDir string
+
+ SnapshotCount uint64
+
+ // SnapshotCatchUpEntries is the number of entries for a slow follower
+ // to catch up after compacting the raft storage entries.
+ // We expect the follower to have millisecond-level latency with the leader.
+ // The max throughput is around 10K. Keeping 5K entries is enough to help
+ // a follower catch up.
+ // WARNING: only change this for tests. Always use "DefaultSnapshotCatchUpEntries"
+ SnapshotCatchUpEntries uint64
+
+ MaxSnapFiles uint
+ MaxWALFiles uint
+
+ // BackendBatchInterval is the maximum time before committing the backend transaction.
+ BackendBatchInterval time.Duration
+ // BackendBatchLimit is the maximum number of operations before committing the backend transaction.
+ BackendBatchLimit int
+
+ // BackendFreelistType is the type of the backend boltdb freelist.
+ BackendFreelistType bolt.FreelistType
+
+ InitialPeerURLsMap types.URLsMap
+ InitialClusterToken string
+ NewCluster bool
+ PeerTLSInfo transport.TLSInfo
+
+ CORS map[string]struct{}
+
+ // HostWhitelist lists acceptable hostnames from client requests.
+ // If the server is insecure (no TLS), it only accepts requests
+ // whose Host header value exists in this whitelist.
+ HostWhitelist map[string]struct{}
+
+ TickMs uint
+ ElectionTicks int
+
+ // If InitialElectionTickAdvance is true, the local member fast-forwards
+ // election ticks to speed up "initial" leader election trigger. This
+ // benefits the case of larger election ticks. For instance, cross
+ // datacenter deployment may require longer election timeout of 10-second.
+ // If true, the local node does not need to wait up to 10 seconds. Instead,
+ // it forwards its election ticks to 8 seconds, leaving only 2 seconds
+ // before leader election.
+ //
+ // Major assumptions are that:
+ // - cluster has no active leader thus advancing ticks enables faster
+ // leader election, or
+ // - cluster already has an established leader, and rejoining follower
+ // is likely to receive heartbeats from the leader after tick advance
+ // and before election timeout.
+ //
+ // However, when network from leader to rejoining follower is congested,
+ // and the follower does not receive a leader heartbeat within the remaining
+ // election ticks, a disruptive election has to happen, thus affecting
+ // cluster availability.
+ //
+ // Disabling this would slow down initial bootstrap process for cross
+ // datacenter deployments. Make your own tradeoffs by configuring
+ // --initial-election-tick-advance at the cost of slow initial bootstrap.
+ //
+ // If single-node, it advances ticks regardless.
+ //
+ // See https://github.com/etcd-io/etcd/issues/9333 for more detail.
+ InitialElectionTickAdvance bool
+
+ BootstrapTimeout time.Duration
+
+ AutoCompactionRetention time.Duration
+ AutoCompactionMode string
+ CompactionBatchLimit int
+ QuotaBackendBytes int64
+ MaxTxnOps uint
+
+ // MaxRequestBytes is the maximum request size to send over raft.
+ MaxRequestBytes uint
+
+ StrictReconfigCheck bool
+
+ // ClientCertAuthEnabled is true when cert has been signed by the client CA.
+ ClientCertAuthEnabled bool
+
+ AuthToken string
+ BcryptCost uint
+
+ // InitialCorruptCheck is true to check data corruption on boot
+ // before serving any peer/client traffic.
+ InitialCorruptCheck bool
+ CorruptCheckTime time.Duration
+
+ // PreVote is true to enable Raft Pre-Vote.
+ PreVote bool
+
+ // Logger logs server-side operations.
+ // If not nil, it disables "capnslog" and uses the given logger.
+ Logger *zap.Logger
+
+ // LoggerConfig is server logger configuration for Raft logger.
+ // Must be either: "LoggerConfig != nil" or "LoggerCore != nil && LoggerWriteSyncer != nil".
+ LoggerConfig *zap.Config
+ // LoggerCore is "zapcore.Core" for raft logger.
+ // Must be either: "LoggerConfig != nil" or "LoggerCore != nil && LoggerWriteSyncer != nil".
+ LoggerCore zapcore.Core
+ LoggerWriteSyncer zapcore.WriteSyncer
+
+ Debug bool
+
+ ForceNewCluster bool
+
+ // EnableLeaseCheckpoint enables the primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long-lived leases.
+ EnableLeaseCheckpoint bool
+ // LeaseCheckpointInterval is the wait duration between lease checkpoints.
+ LeaseCheckpointInterval time.Duration
+
+ EnableGRPCGateway bool
+}
+
+// VerifyBootstrap validates the initial configuration for the bootstrap case.
+// It returns the first problem found, or nil when every check passes.
+func (c *ServerConfig) VerifyBootstrap() error {
+	if err := c.hasLocalMember(); err != nil {
+		return err
+	}
+	if err := c.advertiseMatchesCluster(); err != nil {
+		return err
+	}
+	switch {
+	case checkDuplicateURL(c.InitialPeerURLsMap):
+		return fmt.Errorf("initial cluster %s has duplicate url", c.InitialPeerURLsMap)
+	case c.InitialPeerURLsMap.String() == "" && c.DiscoveryURL == "":
+		return fmt.Errorf("initial cluster unset and no discovery URL found")
+	}
+	return nil
+}
+
+// VerifyJoinExisting validates the initial configuration for the case of joining
+// an existing cluster. It returns the first problem found, or nil otherwise.
+func (c *ServerConfig) VerifyJoinExisting() error {
+	// The member announced its peer URLs to the cluster before starting, so
+	// there is no need to set the configuration again.
+	if err := c.hasLocalMember(); err != nil {
+		return err
+	}
+	switch {
+	case checkDuplicateURL(c.InitialPeerURLsMap):
+		return fmt.Errorf("initial cluster %s has duplicate url", c.InitialPeerURLsMap)
+	case c.DiscoveryURL != "":
+		return fmt.Errorf("discovery URL should not be set when joining existing initial cluster")
+	}
+	return nil
+}
+
+// hasLocalMember returns an error unless InitialPeerURLsMap has a non-nil entry for c.Name.
+func (c *ServerConfig) hasLocalMember() error {
+	if c.InitialPeerURLsMap[c.Name] != nil {
+		return nil
+	}
+	return fmt.Errorf("couldn't find local name %q in the initial cluster configuration", c.Name)
+}
+
+// advertiseMatchesCluster confirms that the advertised peer URLs (c.PeerURLs)
+// match the URLs listed for the local member in c.InitialPeerURLsMap. It returns
+// nil when netutil.URLStringsEqual reports a match, else an error describing the mismatch.
+func (c *ServerConfig) advertiseMatchesCluster() error {
+	urls, apurls := c.InitialPeerURLsMap[c.Name], c.PeerURLs.StringSlice()
+	urls.Sort()
+	sort.Strings(apurls)
+	ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
+	defer cancel()
+	ok, err := netutil.URLStringsEqual(ctx, c.Logger, apurls, urls.StringSlice())
+	if ok {
+		return nil
+	}
+
+	// Index both URL sets by string form to work out which side lacks what.
+	initMap, apMap := make(map[string]struct{}), make(map[string]struct{})
+	for _, url := range c.PeerURLs {
+		apMap[url.String()] = struct{}{}
+	}
+	for _, url := range urls {
+		initMap[url.String()] = struct{}{}
+	}
+
+	missing := []string{}
+	for url := range initMap {
+		if _, ok := apMap[url]; !ok {
+			missing = append(missing, c.Name+"="+url)
+		}
+	}
+	if len(missing) > 0 {
+		// Map iteration order is random; sort so the message is reproducible.
+		sort.Strings(missing)
+		mstr, apStr := strings.Join(missing, ","), strings.Join(apurls, ",")
+		return fmt.Errorf("--initial-cluster has %s but missing from --initial-advertise-peer-urls=%s (%v)", mstr, apStr, err)
+	}
+
+	for url := range apMap {
+		if _, ok := initMap[url]; !ok {
+			missing = append(missing, url)
+		}
+	}
+	umap := types.URLsMap(map[string]types.URLs{c.Name: c.PeerURLs})
+	if len(missing) > 0 {
+		sort.Strings(missing) // same reason: keep the error text deterministic
+		return fmt.Errorf("--initial-advertise-peer-urls has %s but missing from --initial-cluster=%s", strings.Join(missing, ","), umap.String())
+	}
+
+	// Neither side is missing a literal URL: the resolved URLs from
+	// "--initial-advertise-peer-urls" and "--initial-cluster" did not match or failed.
+	return fmt.Errorf("failed to resolve %s to match --initial-cluster=%s (%v)", strings.Join(apurls, ","), umap.String(), err)
+}
+// MemberDir returns the "member" directory located directly under DataDir.
+func (c *ServerConfig) MemberDir() string { return filepath.Join(c.DataDir, "member") }
+// WALDir returns DedicatedWALDir when it is set, and the "wal" directory under MemberDir otherwise.
+func (c *ServerConfig) WALDir() string {
+	if c.DedicatedWALDir == "" {
+		return filepath.Join(c.MemberDir(), "wal")
+	}
+	return c.DedicatedWALDir
+}
+// SnapDir returns the "snap" directory located directly under MemberDir.
+func (c *ServerConfig) SnapDir() string { return filepath.Join(c.MemberDir(), "snap") }
+// ShouldDiscover reports whether a discovery URL has been configured (DiscoveryURL is non-empty).
+func (c *ServerConfig) ShouldDiscover() bool { return c.DiscoveryURL != "" }
+
+// ReqTimeout returns how long a request is given to finish.
+func (c *ServerConfig) ReqTimeout() time.Duration {
+	// Budget 5s for queue waiting, computation and disk IO delay, plus two
+	// election timeouts to cover a possible leader election.
+	return 5*time.Second + 2*c.electionTimeout()
+}
+// electionTimeout returns the election timeout: ElectionTicks ticks of TickMs milliseconds each.
+func (c *ServerConfig) electionTimeout() time.Duration {
+	return time.Millisecond * time.Duration(c.ElectionTicks*int(c.TickMs))
+}
+
+// peerDialTimeout returns 1s for queue wait plus one election timeout.
+func (c *ServerConfig) peerDialTimeout() time.Duration {
+	return time.Second + c.electionTimeout()
+}
+// checkDuplicateURL reports whether any URL string occurs more than once across all members of urlsmap.
+func checkDuplicateURL(urlsmap types.URLsMap) bool {
+	seen := make(map[string]struct{})
+	for _, memberURLs := range urlsmap {
+		for _, u := range memberURLs {
+			key := u.String()
+			if _, dup := seen[key]; dup {
+				return true
+			}
+			seen[key] = struct{}{}
+		}
+	}
+	return false
+}
+// bootstrapTimeout returns BootstrapTimeout when it is non-zero, and one second otherwise.
+func (c *ServerConfig) bootstrapTimeout() time.Duration {
+	if c.BootstrapTimeout == 0 {
+		return time.Second
+	}
+	return c.BootstrapTimeout
+}
+// backendPath returns the path of the "db" file located directly under SnapDir.
+func (c *ServerConfig) backendPath() string { return filepath.Join(c.SnapDir(), "db") }