[VOL-2193] Create mocks for Kafka Client and Etcd
This commit consists of:
1) A kafka client mock that implements the kafka client interface
under voltha-lib-go/pkg/kafka/client.go
2) An embedded Etcd server that runs in-process and represents an
Etcd server.
Change-Id: I52a36132568e08c596bb4136918bebcb654a3b99
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/v3compactor/compactor.go b/vendor/go.etcd.io/etcd/etcdserver/api/v3compactor/compactor.go
new file mode 100644
index 0000000..73a9684
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/v3compactor/compactor.go
@@ -0,0 +1,75 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package v3compactor
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
+
+ "github.com/coreos/pkg/capnslog"
+ "github.com/jonboulle/clockwork"
+ "go.uber.org/zap"
+)
+
+// plog is the capnslog fallback logger: the compactors in this package write
+// to it whenever they were constructed without a zap logger.
+var plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "compactor")
+
+// ModePeriodic is the mode string for which New builds a time-based compactor.
+const ModePeriodic = "periodic"
+// ModeRevision is the mode string for which New builds a revision-count compactor.
+const ModeRevision = "revision"
+
+// Compactor purges old log from the storage periodically.
+type Compactor interface {
+ // Run starts the main loop of the compactor in background.
+ // Use Stop() to halt the loop and release the resource.
+ Run()
+ // Stop halts the main loop of the compactor.
+ Stop()
+ // Pause temporarily suspends the compactor so that it does not run compaction. Use Resume() to unpause.
+ Pause()
+ // Resume restarts the compactor suspended by Pause().
+ Resume()
+}
+
+// Compactable is the compaction entry point the compactors call from Run.
+type Compactable interface {
+	Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error)
+}
+
+// RevGetter supplies the revision number the compactors sample in Run.
+type RevGetter interface{ Rev() int64 }
+
+// New builds the Compactor selected by mode: ModePeriodic treats retention as a
+// duration, ModeRevision as a revision count; any other mode is an error.
+func New(
+	lg *zap.Logger,
+	mode string,
+	retention time.Duration,
+	rg RevGetter,
+	c Compactable,
+) (Compactor, error) {
+	if mode == ModePeriodic {
+		return newPeriodic(lg, clockwork.NewRealClock(), retention, rg, c), nil
+	}
+	if mode == ModeRevision {
+		return newRevision(lg, clockwork.NewRealClock(), int64(retention), rg, c), nil
+	}
+	return nil, fmt.Errorf("unsupported compaction mode %s", mode)
+}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/v3compactor/doc.go b/vendor/go.etcd.io/etcd/etcdserver/api/v3compactor/doc.go
new file mode 100644
index 0000000..bb28046
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/v3compactor/doc.go
@@ -0,0 +1,16 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package v3compactor implements automated policies for compacting etcd's mvcc storage.
+package v3compactor
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/v3compactor/periodic.go b/vendor/go.etcd.io/etcd/etcdserver/api/v3compactor/periodic.go
new file mode 100644
index 0000000..ab64cb7
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/v3compactor/periodic.go
@@ -0,0 +1,217 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package v3compactor
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
+ "go.etcd.io/etcd/mvcc"
+
+ "github.com/jonboulle/clockwork"
+ "go.uber.org/zap"
+)
+
+// Periodic compacts the log by purging revisions older than
+// the configured retention time.
+type Periodic struct {
+ lg *zap.Logger // optional; when nil, Run logs through the package-level plog instead
+ clock clockwork.Clock // time source for Run's timers and timestamps
+ period time.Duration // configured retention window
+
+ rg RevGetter // sampled once per retry interval in Run
+ c Compactable // receives the compaction requests issued by Run
+
+ revs []int64 // sliding window of sampled revisions; within this file only Run's goroutine touches it after construction
+ ctx context.Context // cancelled by Stop; also passed to Compact
+ cancel context.CancelFunc
+
+ // mu protects paused
+ mu sync.RWMutex
+ paused bool
+}
+
+// newPeriodic returns a Periodic compactor with retention window h; Stop cancels the context created here.
+func newPeriodic(lg *zap.Logger, clock clockwork.Clock, h time.Duration, rg RevGetter, c Compactable) *Periodic {
+	ctx, cancel := context.WithCancel(context.Background())
+	return &Periodic{
+		lg:     lg,
+		clock:  clock,
+		period: h,
+		rg:     rg,
+		c:      c,
+		revs:   make([]int64, 0),
+		ctx:    ctx,
+		cancel: cancel,
+	}
+}
+
+/*
+Compaction period 1-hour:
+ 1. compute compaction period, which is 1-hour
+ 2. record revisions for every 1/10 of 1-hour (6-minute)
+ 3. keep recording revisions with no compaction for first 1-hour
+ 4. do compact with revs[0]
+ - success? continue on for-loop and move sliding window; revs = revs[1:]
+ - failure? update revs, and retry after 1/10 of 1-hour (6-minute)
+
+Compaction period 24-hour:
+ 1. compute compaction interval, which is capped at 1-hour
+ 2. record revisions for every 1/10 of 1-hour (6-minute)
+ 3. keep recording revisions with no compaction for first 24-hour
+ 4. do compact with revs[0]
+ - success? continue on for-loop and move sliding window; revs = revs[1:]
+ - failure? update revs, and retry after 1/10 of 1-hour (6-minute)
+
+Compaction period 59-min:
+ 1. compute compaction period, which is 59-min
+ 2. record revisions for every 1/10 of 59-min (5.9-min)
+ 3. keep recording revisions with no compaction for first 59-min
+ 4. do compact with revs[0]
+ - success? continue on for-loop and move sliding window; revs = revs[1:]
+ - failure? update revs, and retry after 1/10 of 59-min (5.9-min)
+
+Compaction period 5-sec:
+ 1. compute compaction period, which is 5-sec
+ 2. record revisions for every 1/10 of 5-sec (0.5-sec)
+ 3. keep recording revisions with no compaction for first 5-sec
+ 4. do compact with revs[0]
+ - success? continue on for-loop and move sliding window; revs = revs[1:]
+ - failure? update revs, and retry after 1/10 of 5-sec (0.5-sec)
+*/
+
+// Run starts the periodic compactor in a background goroutine; use Stop to end it.
+func (pc *Periodic) Run() {
+	compactInterval := pc.getCompactInterval()
+	retryInterval := pc.getRetryInterval()
+	retentions := pc.getRetentions()
+
+	go func() {
+		lastSuccess := pc.clock.Now()
+		baseInterval := pc.period
+		for {
+			pc.revs = append(pc.revs, pc.rg.Rev())
+			if len(pc.revs) > retentions {
+				pc.revs = pc.revs[1:] // pc.revs[0] is always the rev at pc.period ago
+			}
+
+			select {
+			case <-pc.ctx.Done():
+				return
+			case <-pc.clock.After(retryInterval):
+				pc.mu.Lock()
+				p := pc.paused
+				pc.mu.Unlock()
+				if p {
+					continue
+				}
+			}
+
+			if pc.clock.Now().Sub(lastSuccess) < baseInterval {
+				continue
+			}
+
+			// The first compaction waits the full period; later ones are compactInterval apart.
+			if baseInterval == pc.period {
+				baseInterval = compactInterval
+			}
+			rev := pc.revs[0]
+			if pc.lg != nil {
+				pc.lg.Info(
+					"starting auto periodic compaction",
+					zap.Int64("revision", rev),
+					zap.Duration("compact-period", pc.period),
+				)
+			} else {
+				plog.Noticef("Starting auto-compaction at revision %d (retention: %v)", rev, pc.period)
+			}
+			startTime := pc.clock.Now() // "took" must time this Compact call, not the gap since lastSuccess
+			_, err := pc.c.Compact(pc.ctx, &pb.CompactionRequest{Revision: rev})
+			if err == nil || err == mvcc.ErrCompacted {
+				if pc.lg != nil {
+					pc.lg.Info(
+						"completed auto periodic compaction",
+						zap.Int64("revision", rev),
+						zap.Duration("compact-period", pc.period),
+						zap.Duration("took", pc.clock.Now().Sub(startTime)),
+					)
+				} else {
+					plog.Noticef("Finished auto-compaction at revision %d", rev)
+				}
+				lastSuccess = pc.clock.Now()
+			} else {
+				if pc.lg != nil {
+					pc.lg.Warn(
+						"failed auto periodic compaction",
+						zap.Int64("revision", rev),
+						zap.Duration("compact-period", pc.period),
+						zap.Duration("retry-interval", retryInterval),
+						zap.Error(err),
+					)
+				} else {
+					plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err)
+					plog.Noticef("Retry after %v", retryInterval)
+				}
+			}
+		}
+	}()
+}
+
+// getCompactInterval reports how far apart compactions are once the initial
+// retention period has elapsed: the period itself, capped at one hour.
+// (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='10m'
+// compacts every 10-minute, while --auto-compaction-retention='2h'
+// compacts every 1-hour)
+func (pc *Periodic) getCompactInterval() time.Duration {
+	if pc.period > time.Hour {
+		return time.Hour
+	}
+	return pc.period
+}
+
+// getRetentions is the number of revision samples Run keeps: one per retry
+// interval across the period, plus one. It panics if the retry interval is zero.
+func (pc *Periodic) getRetentions() int { return 1 + int(pc.period/pc.getRetryInterval()) }
+
+const retryDivisor = 10 // retry intervals per (hour-capped) period
+
+// getRetryInterval is Run's sampling/retry tick: the period, capped at one hour, divided by retryDivisor.
+func (pc *Periodic) getRetryInterval() time.Duration {
+	if pc.period > time.Hour {
+		return time.Hour / retryDivisor
+	}
+	return pc.period / retryDivisor
+}
+
+// Stop stops periodic compactor by cancelling the context that Run's goroutine selects on.
+func (pc *Periodic) Stop() {
+ pc.cancel()
+}
+
+// Pause makes Run skip compaction on every tick until Resume is called.
+func (pc *Periodic) Pause() {
+	pc.mu.Lock()
+	defer pc.mu.Unlock()
+	pc.paused = true
+}
+
+// Resume clears the paused flag so that Run compacts again.
+func (pc *Periodic) Resume() {
+	pc.mu.Lock()
+	defer pc.mu.Unlock()
+	pc.paused = false
+}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/v3compactor/revision.go b/vendor/go.etcd.io/etcd/etcdserver/api/v3compactor/revision.go
new file mode 100644
index 0000000..cf8ac43
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/v3compactor/revision.go
@@ -0,0 +1,143 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package v3compactor
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
+ "go.etcd.io/etcd/mvcc"
+
+ "github.com/jonboulle/clockwork"
+ "go.uber.org/zap"
+)
+
+// Revision compacts the log by purging revisions older than
+// the configured revision number. Compaction happens every 5 minutes.
+type Revision struct {
+ lg *zap.Logger // optional; when nil, Run logs through the package-level plog instead
+
+ clock clockwork.Clock // source of the revInterval timer in Run
+ retention int64 // subtracted from the current revision to get the compaction target
+
+ rg RevGetter // queried via Rev() on every unpaused tick
+ c Compactable // receives the compaction requests issued by Run
+
+ ctx context.Context // cancelled by Stop; also passed to Compact
+ cancel context.CancelFunc
+
+ mu sync.Mutex // guards paused
+ paused bool
+}
+
+// newRevision returns a Revision compactor that keeps the latest retention revisions; Stop cancels the context created here.
+func newRevision(lg *zap.Logger, clock clockwork.Clock, retention int64, rg RevGetter, c Compactable) *Revision {
+	ctx, cancel := context.WithCancel(context.Background())
+	return &Revision{
+		lg:        lg,
+		clock:     clock,
+		retention: retention,
+		rg:        rg,
+		c:         c,
+		ctx:       ctx,
+		cancel:    cancel,
+	}
+}
+
+const revInterval = 5 * time.Minute // tick between revision-compaction attempts in Revision.Run; also logged as the retry interval
+
+// Run launches the revision-based compactor in a background goroutine. Every revInterval, unless paused, it
+// compacts at (current revision - retention), skipping targets that are non-positive or equal to the last compacted one.
+func (rc *Revision) Run() {
+	go func() {
+		var lastCompacted int64
+		for {
+			select {
+			case <-rc.ctx.Done():
+				return
+			case <-rc.clock.After(revInterval):
+				rc.mu.Lock()
+				paused := rc.paused
+				rc.mu.Unlock()
+				if paused {
+					continue
+				}
+			}
+
+			target := rc.rg.Rev() - rc.retention
+			if target <= 0 || target == lastCompacted {
+				continue
+			}
+			began := time.Now()
+			if rc.lg == nil {
+				plog.Noticef("Starting auto-compaction at revision %d (retention: %d revisions)", target, rc.retention)
+			} else {
+				rc.lg.Info(
+					"starting auto revision compaction",
+					zap.Int64("revision", target),
+					zap.Int64("revision-compaction-retention", rc.retention),
+				)
+			}
+			_, err := rc.c.Compact(rc.ctx, &pb.CompactionRequest{Revision: target})
+			if err != nil && err != mvcc.ErrCompacted {
+				if rc.lg == nil {
+					plog.Noticef("Failed auto-compaction at revision %d (%v)", target, err)
+					plog.Noticef("Retry after %v", revInterval)
+				} else {
+					rc.lg.Warn(
+						"failed auto revision compaction",
+						zap.Int64("revision", target),
+						zap.Int64("revision-compaction-retention", rc.retention),
+						zap.Duration("retry-interval", revInterval),
+						zap.Error(err),
+					)
+				}
+				continue
+			}
+			lastCompacted = target
+			if rc.lg == nil {
+				plog.Noticef("Finished auto-compaction at revision %d", target)
+			} else {
+				rc.lg.Info(
+					"completed auto revision compaction",
+					zap.Int64("revision", target),
+					zap.Int64("revision-compaction-retention", rc.retention),
+					zap.Duration("took", time.Since(began)),
+				)
+			}
+		}
+	}()
+}
+
+// Stop stops revision-based compactor by cancelling the context that Run's goroutine selects on.
+func (rc *Revision) Stop() {
+ rc.cancel()
+}
+
+// Pause makes Run skip compaction on every tick until Resume is called.
+func (rc *Revision) Pause() {
+	rc.mu.Lock()
+	defer rc.mu.Unlock()
+	rc.paused = true
+}
+
+// Resume clears the paused flag so that Run compacts again.
+func (rc *Revision) Resume() {
+	rc.mu.Lock()
+	defer rc.mu.Unlock()
+	rc.paused = false
+}