[VOL-2193] Create mocks for Kafka Client and Etcd

This commit consists of:
1) A kafka client mock that implements the kafka client interface
under voltha-lib-go/pkg/kafka/client.go
2) An embedded Etcd server that runs in-process and stands in for a
real Etcd server.

Change-Id: I52a36132568e08c596bb4136918bebcb654a3b99
diff --git a/vendor/go.etcd.io/etcd/mvcc/backend/backend.go b/vendor/go.etcd.io/etcd/mvcc/backend/backend.go
new file mode 100644
index 0000000..bffd749
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/backend/backend.go
@@ -0,0 +1,570 @@
+// Copyright 2015 The etcd Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//     http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package backend
+import (
+	"fmt"
+	"hash/crc32"
+	"io"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"sync"
+	"sync/atomic"
+	"time"
+	"github.com/coreos/pkg/capnslog"
+	humanize "github.com/dustin/go-humanize"
+	bolt "go.etcd.io/bbolt"
+	"go.uber.org/zap"
+// Package-level defaults and shared state for the backend.
+var (
+	// defaultBatchLimit and defaultBatchInterval are the values that
+	// DefaultBackendConfig assigns to BatchLimit and BatchInterval.
+	defaultBatchLimit    = 10000
+	defaultBatchInterval = 100 * time.Millisecond
+	// defragLimit is the limit handed to defragdb: the number of keys copied
+	// into the temporary database before the copying tx is committed.
+	defragLimit = 10000
+	// initialMmapSize is the initial size of the mmapped region. Setting this larger than
+	// the potential max db size can prevent writer from blocking reader.
+	// This only works for linux.
+	initialMmapSize = uint64(10 * 1024 * 1024 * 1024)
+	// plog is the capnslog logger used whenever no zap logger is configured.
+	plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "mvcc/backend")
+	// minSnapshotWarningTimeout is the minimum threshold to trigger a long running snapshot warning.
+	minSnapshotWarningTimeout = 30 * time.Second
+// Backend is the storage interface exposed by this package: it hands out
+// read and batch transactions and offers snapshot, hash, size, defrag and
+// lifecycle operations.
+type Backend interface {
+	// ReadTx returns a read transaction. It is replaced by ConcurrentReadTx in the main data path, see #10523.
+	ReadTx() ReadTx
+	// BatchTx returns the batch transaction used for writes.
+	BatchTx() BatchTx
+	// ConcurrentReadTx returns a non-blocking read transaction.
+	ConcurrentReadTx() ReadTx
+	// Snapshot returns a snapshot of the backend; the caller closes it when done.
+	Snapshot() Snapshot
+	// Hash returns a checksum of the backend contents, skipping the
+	// bucket/key pairs listed in ignores.
+	Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
+	// Size returns the current size of the backend physically allocated.
+	// The backend can hold DB space that is not utilized at the moment,
+	// since it can conduct pre-allocation or spare unused space for recycling.
+	// Use SizeInUse() instead for the actual DB size.
+	Size() int64
+	// SizeInUse returns the current size of the backend logically in use.
+	// Since the backend can manage free space in a non-byte unit such as
+	// number of pages, the returned value can be not exactly accurate in bytes.
+	SizeInUse() int64
+	// OpenReadTxN returns the number of currently open read transactions in the backend.
+	OpenReadTxN() int64
+	// Defrag defragments the backend database.
+	Defrag() error
+	// ForceCommit commits the current batch transaction immediately.
+	ForceCommit()
+	// Close shuts the backend down and closes the underlying database.
+	Close() error
+// Snapshot is a point-in-time view of the backend that can be streamed to a
+// writer. It must be closed once it is no longer needed.
+type Snapshot interface {
+	// Size gets the size of the snapshot.
+	Size() int64
+	// WriteTo writes the snapshot into the given writer.
+	WriteTo(w io.Writer) (n int64, err error)
+	// Close closes the snapshot.
+	Close() error
+// backend is the bolt-backed implementation of Backend.
+type backend struct {
+	// size and commits are used with atomic operations so they must be
+	// 64-bit aligned, otherwise 32-bit tests will crash
+	// size is the number of bytes allocated in the backend
+	size int64
+	// sizeInUse is the number of bytes actually used in the backend
+	sizeInUse int64
+	// commits counts number of commits since start
+	commits int64
+	// openReadTxN is the number of currently open read transactions in the backend
+	openReadTxN int64
+	// mu guards db: begin, Snapshot and Hash take the read lock, while defrag
+	// takes the write lock because it closes and replaces db.
+	mu sync.RWMutex
+	db *bolt.DB
+	// batchInterval is the period of the background commit loop (run) and
+	// batchLimit is the pending-write count at which Unlock commits the
+	// batch tx. batchTx is the single buffered write transaction.
+	batchInterval time.Duration
+	batchLimit    int
+	batchTx       *batchTxBuffered
+	// readTx is the shared read transaction together with its read buffer.
+	readTx *readTx
+	// stopc is closed by Close to stop run; donec is closed by run on exit.
+	stopc chan struct{}
+	donec chan struct{}
+	// lg is the optional zap logger; when nil the package falls back to plog.
+	lg *zap.Logger
+// BackendConfig holds the settings used to open a backend.
+type BackendConfig struct {
+	// Path is the file path to the backend file.
+	Path string
+	// BatchInterval is the maximum time before flushing the BatchTx.
+	BatchInterval time.Duration
+	// BatchLimit is the maximum puts before flushing the BatchTx.
+	BatchLimit int
+	// BackendFreelistType is the backend boltdb's freelist type.
+	BackendFreelistType bolt.FreelistType
+	// MmapSize is the number of bytes to mmap for the backend.
+	MmapSize uint64
+	// Logger logs backend-side operations.
+	Logger *zap.Logger
+// DefaultBackendConfig returns a BackendConfig populated with the package
+// defaults for batch interval, batch limit and mmap size. Path, freelist
+// type and Logger are left at their zero values for the caller to set.
+func DefaultBackendConfig() BackendConfig {
+	return BackendConfig{
+		BatchInterval: defaultBatchInterval,
+		BatchLimit:    defaultBatchLimit,
+		MmapSize:      initialMmapSize,
+	}
+// New opens a backend using the given configuration.
+func New(bcfg BackendConfig) Backend {
+	return newBackend(bcfg)
+// NewDefaultBackend opens a backend at path using DefaultBackendConfig.
+func NewDefaultBackend(path string) Backend {
+	bcfg := DefaultBackendConfig()
+	bcfg.Path = path
+	return newBackend(bcfg)
+// newBackend opens the bolt database described by bcfg and returns a running
+// backend. It panics (through the configured logger) if the database cannot
+// be opened. A background goroutine (run) is started to commit periodically;
+// Close stops it.
+func newBackend(bcfg BackendConfig) *backend {
+	bopts := &bolt.Options{}
+	// Start from a copy of the platform-specific options so the shared
+	// package-level value is never mutated.
+	if boltOpenOptions != nil {
+		*bopts = *boltOpenOptions
+	}
+	bopts.InitialMmapSize = bcfg.mmapSize()
+	bopts.FreelistType = bcfg.BackendFreelistType
+	db, err := bolt.Open(bcfg.Path, 0600, bopts)
+	if err != nil {
+		if bcfg.Logger != nil {
+			bcfg.Logger.Panic("failed to open database", zap.String("path", bcfg.Path), zap.Error(err))
+		} else {
+			plog.Panicf("cannot open database at %s (%v)", bcfg.Path, err)
+		}
+	}
+	// In future, may want to make buffering optional for low-concurrency systems
+	// or dynamically swap between buffered/non-buffered depending on workload.
+	b := &backend{
+		db: db,
+		batchInterval: bcfg.BatchInterval,
+		batchLimit:    bcfg.BatchLimit,
+		readTx: &readTx{
+			buf: txReadBuffer{
+				txBuffer: txBuffer{make(map[string]*bucketBuffer)},
+			},
+			buckets: make(map[string]*bolt.Bucket),
+			txWg:    new(sync.WaitGroup),
+		},
+		stopc: make(chan struct{}),
+		donec: make(chan struct{}),
+		lg: bcfg.Logger,
+	}
+	// newBatchTxBuffered commits once, which opens the first write and read txs.
+	b.batchTx = newBatchTxBuffered(b)
+	go b.run()
+	return b
+// BatchTx returns the current batch tx in coalescer. The tx can be used for read and
+// write operations. The write result can be retrieved within the same tx immediately.
+// The write result is isolated from other txs until the current one gets committed.
+func (b *backend) BatchTx() BatchTx {
+	return b.batchTx
+// ReadTx returns the backend's shared read transaction.
+func (b *backend) ReadTx() ReadTx { return b.readTx }
+// ConcurrentReadTx creates and returns a new ReadTx, which:
+// A) creates and keeps a copy of backend.readTx.txReadBuffer,
+// B) references the boltdb read Tx (and its bucket cache) of current batch interval.
+func (b *backend) ConcurrentReadTx() ReadTx {
+	b.readTx.RLock()
+	defer b.readTx.RUnlock()
+	// prevent boltdb read Tx from being rolled back until store read Tx is done. Needs to be called when holding readTx.RLock().
+	b.readTx.txWg.Add(1)
+	// TODO: might want to copy the read buffer lazily - create copy when A) end of a write transaction B) end of a batch interval.
+	return &concurrentReadTx{
+		buf:     b.readTx.buf.unsafeCopy(),
+		tx:      b.readTx.tx,
+		txMu:    &b.readTx.txMu,
+		buckets: b.readTx.buckets,
+		txWg:    b.readTx.txWg,
+	}
+// ForceCommit forces the current batching tx to commit.
+func (b *backend) ForceCommit() {
+	b.batchTx.Commit()
+// Snapshot commits any pending batched writes and returns a Snapshot backed
+// by a fresh read-only bolt transaction. Failing to begin that transaction
+// is fatal.
+//
+// A helper goroutine is started alongside the snapshot. It logs a warning
+// every warningTimeout while the snapshot is still open and, once stopc is
+// closed, records the elapsed time in the snapshotTransferSec histogram.
+// stopc is closed by the snapshot's Close method, so the caller must close
+// the returned snapshot or the goroutine will never exit.
+func (b *backend) Snapshot() Snapshot {
+	b.batchTx.Commit()
+	b.mu.RLock()
+	defer b.mu.RUnlock()
+	tx, err := b.db.Begin(false)
+	if err != nil {
+		if b.lg != nil {
+			b.lg.Fatal("failed to begin tx", zap.Error(err))
+		} else {
+			plog.Fatalf("cannot begin tx (%s)", err)
+		}
+	}
+	stopc, donec := make(chan struct{}), make(chan struct{})
+	dbBytes := tx.Size()
+	go func() {
+		defer close(donec)
+		// sendRateBytes is based on transferring snapshot data over a 1 gigabit/s connection
+		// assuming a min tcp throughput of 100MB/s.
+		var sendRateBytes int64 = 100 * 1024 * 1024
+		// Expected transfer time at sendRateBytes, never below the minimum threshold.
+		warningTimeout := time.Duration(int64((float64(dbBytes) / float64(sendRateBytes)) * float64(time.Second)))
+		if warningTimeout < minSnapshotWarningTimeout {
+			warningTimeout = minSnapshotWarningTimeout
+		}
+		start := time.Now()
+		ticker := time.NewTicker(warningTimeout)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ticker.C:
+				if b.lg != nil {
+					b.lg.Warn(
+						"snapshotting taking too long to transfer",
+						zap.Duration("taking", time.Since(start)),
+						zap.Int64("bytes", dbBytes),
+						zap.String("size", humanize.Bytes(uint64(dbBytes))),
+					)
+				} else {
+					plog.Warningf("snapshotting is taking more than %v seconds to finish transferring %v MB [started at %v]", time.Since(start).Seconds(), float64(dbBytes)/float64(1024*1024), start)
+				}
+			case <-stopc:
+				snapshotTransferSec.Observe(time.Since(start).Seconds())
+				return
+			}
+		}
+	}()
+	return &snapshot{tx, stopc, donec}
+// IgnoreKey identifies a bucket/key pair that Hash leaves out of the checksum.
+type IgnoreKey struct {
+	Bucket string
+	Key    string
+// Hash computes a CRC32 (Castagnoli) checksum over the database contents.
+// Inside one read-only bolt view it walks every top-level bucket, writing the
+// bucket name followed by each key and value into the hash. Entries whose
+// (bucket, key) pair appears in ignores are skipped. It returns an error if a
+// top-level key cannot be opened as a bucket.
+func (b *backend) Hash(ignores map[IgnoreKey]struct{}) (uint32, error) {
+	h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
+	b.mu.RLock()
+	defer b.mu.RUnlock()
+	err := b.db.View(func(tx *bolt.Tx) error {
+		c := tx.Cursor()
+		for next, _ := c.First(); next != nil; next, _ = c.Next() {
+			// NOTE: this b is the bolt bucket and shadows the backend receiver.
+			b := tx.Bucket(next)
+			if b == nil {
+				return fmt.Errorf("cannot get hash of bucket %s", string(next))
+			}
+			h.Write(next)
+			// The ForEach result is discarded; the callback itself never returns an error.
+			b.ForEach(func(k, v []byte) error {
+				bk := IgnoreKey{Bucket: string(next), Key: string(k)}
+				if _, ok := ignores[bk]; !ok {
+					h.Write(k)
+					h.Write(v)
+				}
+				return nil
+			})
+		}
+		return nil
+	})
+	if err != nil {
+		return 0, err
+	}
+	return h.Sum32(), nil
+// Size returns the most recently recorded database size in bytes; the value
+// is refreshed by begin and defrag.
+func (b *backend) Size() int64 {
+	return atomic.LoadInt64(&b.size)
+// SizeInUse returns the most recently recorded size minus the space held by
+// free pages; the value is refreshed by begin and defrag.
+func (b *backend) SizeInUse() int64 {
+	return atomic.LoadInt64(&b.sizeInUse)
+// run is the background commit loop started by newBackend. Each time the
+// batchInterval timer fires it commits the batch tx if there are pending
+// writes and re-arms the timer. When stopc is closed it calls CommitAndStop
+// and returns; donec is closed on exit so Close can wait for the loop.
+func (b *backend) run() {
+	defer close(b.donec)
+	t := time.NewTimer(b.batchInterval)
+	defer t.Stop()
+	for {
+		select {
+		case <-t.C:
+		case <-b.stopc:
+			b.batchTx.CommitAndStop()
+			return
+		}
+		if b.batchTx.safePending() != 0 {
+			b.batchTx.Commit()
+		}
+		// The timer has already fired here, so Reset can be called directly.
+		t.Reset(b.batchInterval)
+	}
+// Close stops the background commit loop, waits for it to finish its final
+// commit, and then closes the bolt database. Because it closes stopc, it
+// must not be called more than once.
+func (b *backend) Close() error {
+	close(b.stopc)
+	<-b.donec
+	return b.db.Close()
+// Commits returns total number of commits since start
+func (b *backend) Commits() int64 {
+	return atomic.LoadInt64(&b.commits)
+// Defrag defragments the database by delegating to defrag.
+func (b *backend) Defrag() error {
+	return b.defrag()
+// defrag rewrites the database into a fresh file to reclaim free pages.
+//
+// It takes the batchTx lock, the database lock and the readTx lock (in that
+// order), commits and drops the open transactions, copies every bucket into
+// "<db path>.tmp" via defragdb, renames the temp file over the original,
+// reopens the database and restarts the write and read transactions. The
+// size gauges and the defragSec histogram are updated at the end.
+//
+// If the temp database cannot be opened or the copy fails, the error is
+// returned and the transactions are reopened on the untouched original
+// database so the backend remains usable. Failures after the original
+// database has been closed are fatal.
+func (b *backend) defrag() error {
+	now := time.Now()
+	// TODO: make this non-blocking?
+	// lock batchTx to ensure nobody is using previous tx, and then
+	// close previous ongoing tx.
+	b.batchTx.Lock()
+	defer b.batchTx.Unlock()
+	// lock database after lock tx to avoid deadlock.
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	// block concurrent read requests while resetting tx
+	b.readTx.Lock()
+	defer b.readTx.Unlock()
+	b.batchTx.unsafeCommit(true)
+	b.batchTx.tx = nil
+	// restoreTxs reopens the write and read transactions on the current
+	// database, using the same sequence as the success path below. Without
+	// it an early return would leave batchTx.tx nil and no read tx open.
+	restoreTxs := func() {
+		b.batchTx.tx = b.unsafeBegin(true)
+		b.readTx.reset()
+		b.readTx.tx = b.unsafeBegin(false)
+	}
+	tmpdb, err := bolt.Open(b.db.Path()+".tmp", 0600, boltOpenOptions)
+	if err != nil {
+		restoreTxs()
+		return err
+	}
+	dbp := b.db.Path()
+	tdbp := tmpdb.Path()
+	size1, sizeInUse1 := b.Size(), b.SizeInUse()
+	if b.lg != nil {
+		b.lg.Info(
+			"defragmenting",
+			zap.String("path", dbp),
+			zap.Int64("current-db-size-bytes", size1),
+			zap.String("current-db-size", humanize.Bytes(uint64(size1))),
+			zap.Int64("current-db-size-in-use-bytes", sizeInUse1),
+			zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse1))),
+		)
+	}
+	err = defragdb(b.db, tmpdb, defragLimit)
+	if err != nil {
+		tmpdb.Close()
+		os.RemoveAll(tmpdb.Path())
+		// The original database is still open and intact; resume on it.
+		restoreTxs()
+		return err
+	}
+	// From here on the original file is being replaced, so errors are fatal.
+	err = b.db.Close()
+	if err != nil {
+		if b.lg != nil {
+			b.lg.Fatal("failed to close database", zap.Error(err))
+		} else {
+			plog.Fatalf("cannot close database (%s)", err)
+		}
+	}
+	err = tmpdb.Close()
+	if err != nil {
+		if b.lg != nil {
+			b.lg.Fatal("failed to close tmp database", zap.Error(err))
+		} else {
+			plog.Fatalf("cannot close database (%s)", err)
+		}
+	}
+	err = os.Rename(tdbp, dbp)
+	if err != nil {
+		if b.lg != nil {
+			b.lg.Fatal("failed to rename tmp database", zap.Error(err))
+		} else {
+			plog.Fatalf("cannot rename database (%s)", err)
+		}
+	}
+	b.db, err = bolt.Open(dbp, 0600, boltOpenOptions)
+	if err != nil {
+		if b.lg != nil {
+			b.lg.Fatal("failed to open database", zap.String("path", dbp), zap.Error(err))
+		} else {
+			plog.Panicf("cannot open database at %s (%v)", dbp, err)
+		}
+	}
+	b.batchTx.tx = b.unsafeBegin(true)
+	b.readTx.reset()
+	b.readTx.tx = b.unsafeBegin(false)
+	// Refresh the size gauges from the new database.
+	size := b.readTx.tx.Size()
+	db := b.readTx.tx.DB()
+	atomic.StoreInt64(&b.size, size)
+	atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))
+	took := time.Since(now)
+	defragSec.Observe(took.Seconds())
+	size2, sizeInUse2 := b.Size(), b.SizeInUse()
+	if b.lg != nil {
+		b.lg.Info(
+			"defragmented",
+			zap.String("path", dbp),
+			zap.Int64("current-db-size-bytes-diff", size2-size1),
+			zap.Int64("current-db-size-bytes", size2),
+			zap.String("current-db-size", humanize.Bytes(uint64(size2))),
+			zap.Int64("current-db-size-in-use-bytes-diff", sizeInUse2-sizeInUse1),
+			zap.Int64("current-db-size-in-use-bytes", sizeInUse2),
+			zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse2))),
+			zap.Duration("took", took),
+		)
+	}
+	return nil
+// defragdb copies every top-level bucket and all of its key/value pairs from
+// odb into tmpdb. Writes go through a write tx on tmpdb that is committed and
+// reopened each time more than limit keys have been copied, which bounds the
+// size of a single transaction.
+//
+// It returns the first error encountered, including errors raised while
+// iterating a bucket. On any error the write tx on tmpdb is rolled back so
+// it does not keep holding tmpdb's writer lock.
+func defragdb(odb, tmpdb *bolt.DB, limit int) (err error) {
+	// open a tx on tmpdb for writes
+	tmptx, err := tmpdb.Begin(true)
+	if err != nil {
+		return err
+	}
+	// Release the write tx on every failure path. tmptx is nil if reopening
+	// it failed mid-copy; rolling back a tx that a failed Commit already
+	// closed only yields an error, which is ignored.
+	defer func() {
+		if err != nil && tmptx != nil {
+			tmptx.Rollback()
+		}
+	}()
+	// open a tx on old db for read
+	tx, err := odb.Begin(false)
+	if err != nil {
+		return err
+	}
+	defer tx.Rollback()
+	c := tx.Cursor()
+	count := 0
+	for next, _ := c.First(); next != nil; next, _ = c.Next() {
+		b := tx.Bucket(next)
+		if b == nil {
+			return fmt.Errorf("backend: cannot defrag bucket %s", string(next))
+		}
+		tmpb, berr := tmptx.CreateBucketIfNotExists(next)
+		if berr != nil {
+			return berr
+		}
+		tmpb.FillPercent = 0.9 // for seq write in for each
+		// Propagate iteration errors instead of silently dropping them.
+		if err = b.ForEach(func(k, v []byte) error {
+			count++
+			if count > limit {
+				err = tmptx.Commit()
+				if err != nil {
+					return err
+				}
+				tmptx, err = tmpdb.Begin(true)
+				if err != nil {
+					return err
+				}
+				tmpb = tmptx.Bucket(next)
+				tmpb.FillPercent = 0.9 // for seq write in for each
+				count = 0
+			}
+			return tmpb.Put(k, v)
+		}); err != nil {
+			return err
+		}
+	}
+	return tmptx.Commit()
+// begin starts a bolt transaction (writable when write is true) while holding
+// b.mu for reading, then refreshes the size, sizeInUse and openReadTxN gauges
+// from that transaction and the database stats.
+func (b *backend) begin(write bool) *bolt.Tx {
+	b.mu.RLock()
+	tx := b.unsafeBegin(write)
+	b.mu.RUnlock()
+	size := tx.Size()
+	db := tx.DB()
+	stats := db.Stats()
+	atomic.StoreInt64(&b.size, size)
+	// In-use size is the total size minus the space taken by free pages.
+	atomic.StoreInt64(&b.sizeInUse, size-(int64(stats.FreePageN)*int64(db.Info().PageSize)))
+	atomic.StoreInt64(&b.openReadTxN, int64(stats.OpenTxN))
+	return tx
+// unsafeBegin starts a bolt transaction without taking b.mu; the caller is
+// responsible for holding it (begin holds the read lock, defrag the write
+// lock). A failure to begin the transaction is fatal.
+func (b *backend) unsafeBegin(write bool) *bolt.Tx {
+	tx, err := b.db.Begin(write)
+	if err != nil {
+		if b.lg != nil {
+			b.lg.Fatal("failed to begin tx", zap.Error(err))
+		} else {
+			plog.Fatalf("cannot begin tx (%s)", err)
+		}
+	}
+	return tx
+// OpenReadTxN returns the open-transaction count recorded by the most recent
+// call to begin.
+func (b *backend) OpenReadTxN() int64 {
+	return atomic.LoadInt64(&b.openReadTxN)
+// NewTmpBackend creates a backend implementation for testing.
+// The database file is named "database" and lives in a new temporary
+// directory under os.TempDir(); the returned string is the path to that file.
+// It panics if the directory cannot be created. The directory is not removed
+// by this package; cleaning it up is left to the caller.
+func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, string) {
+	dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test")
+	if err != nil {
+		panic(err)
+	}
+	tmpPath := filepath.Join(dir, "database")
+	bcfg := DefaultBackendConfig()
+	bcfg.Path, bcfg.BatchInterval, bcfg.BatchLimit = tmpPath, batchInterval, batchLimit
+	return newBackend(bcfg), tmpPath
+// NewDefaultTmpBackend is NewTmpBackend with the default batch interval and
+// batch limit.
+func NewDefaultTmpBackend() (*backend, string) {
+	return NewTmpBackend(defaultBatchInterval, defaultBatchLimit)
+// snapshot is the Snapshot implementation returned by backend.Snapshot. It
+// embeds the read-only bolt tx and carries the channels used to stop and
+// wait for the goroutine that Snapshot starts.
+type snapshot struct {
+	*bolt.Tx
+	// stopc is closed by Close to stop the monitoring goroutine.
+	stopc chan struct{}
+	// donec is closed by the monitoring goroutine when it exits.
+	donec chan struct{}
+// Close stops the monitoring goroutine, waits for it to exit and rolls back
+// the underlying read tx. Because it closes stopc, it must be called at most
+// once.
+func (s *snapshot) Close() error {
+	close(s.stopc)
+	<-s.donec
+	return s.Tx.Rollback()
diff --git a/vendor/go.etcd.io/etcd/mvcc/backend/batch_tx.go b/vendor/go.etcd.io/etcd/mvcc/backend/batch_tx.go
new file mode 100644
index 0000000..d5c8a88
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/backend/batch_tx.go
@@ -0,0 +1,339 @@
+// Copyright 2015 The etcd Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//     http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package backend
+import (
+	"bytes"
+	"math"
+	"sync"
+	"sync/atomic"
+	"time"
+	bolt "go.etcd.io/bbolt"
+	"go.uber.org/zap"
+// BatchTx is a write transaction that batches updates. It embeds ReadTx for
+// locking and reads, and adds bucket creation, put, delete and commit
+// operations.
+type BatchTx interface {
+	ReadTx
+	// UnsafeCreateBucket creates the named bucket.
+	UnsafeCreateBucket(name []byte)
+	// UnsafePut stores value under key in the named bucket.
+	UnsafePut(bucketName []byte, key []byte, value []byte)
+	// UnsafeSeqPut is UnsafePut tuned for mostly append-only writes.
+	UnsafeSeqPut(bucketName []byte, key []byte, value []byte)
+	// UnsafeDelete removes key from the named bucket.
+	UnsafeDelete(bucketName []byte, key []byte)
+	// Commit commits a previous tx and begins a new writable one.
+	Commit()
+	// CommitAndStop commits the previous tx and does not create a new one.
+	CommitAndStop()
+// batchTx wraps a single bolt write transaction and counts the writes made
+// through it since the last commit.
+type batchTx struct {
+	sync.Mutex
+	// tx is the current bolt write tx; it is nil before the first commit and
+	// after a stopping commit.
+	tx      *bolt.Tx
+	backend *backend
+	// pending is the number of writes made since the last commit.
+	pending int
+// Lock acquires the tx mutex.
+func (t *batchTx) Lock() {
+	t.Mutex.Lock()
+// Unlock first commits the tx when the number of pending writes has reached
+// the backend's batch limit, then releases the mutex.
+func (t *batchTx) Unlock() {
+	if t.pending >= t.backend.batchLimit {
+		t.commit(false)
+	}
+	t.Mutex.Unlock()
+// BatchTx interface embeds ReadTx interface. But RLock() and RUnlock() do not
+// have appropriate semantics in BatchTx interface. Therefore should not be called.
+// TODO: might want to decouple ReadTx and BatchTx
+func (t *batchTx) RLock() {
+	panic("unexpected RLock")
+// RUnlock always panics: read-lock semantics are not defined for a batchTx.
+func (t *batchTx) RUnlock() {
+	panic("unexpected RUnlock")
+// UnsafeCreateBucket creates the named bucket in the current tx. An already
+// existing bucket is not an error; any other failure is fatal. The call is
+// counted as one pending write either way.
+// NOTE(review): presumably must be called holding the lock on the tx, like
+// the other Unsafe* methods — confirm against callers.
+func (t *batchTx) UnsafeCreateBucket(name []byte) {
+	_, err := t.tx.CreateBucket(name)
+	if err != nil && err != bolt.ErrBucketExists {
+		if t.backend.lg != nil {
+			t.backend.lg.Fatal(
+				"failed to create a bucket",
+				zap.String("bucket-name", string(name)),
+				zap.Error(err),
+			)
+		} else {
+			plog.Fatalf("cannot create bucket %s (%v)", name, err)
+		}
+	}
+	t.pending++
+// UnsafePut must be called holding the lock on the tx.
+func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
+	t.unsafePut(bucketName, key, value, false)
+// UnsafeSeqPut must be called holding the lock on the tx.
+func (t *batchTx) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) {
+	t.unsafePut(bucketName, key, value, true)
+// unsafePut writes key/value into the named bucket of the current tx and
+// counts it as one pending write. A missing bucket or a failed write is
+// fatal. When seq is true the bucket's fill percent is raised for
+// append-mostly workloads.
+func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq bool) {
+	bucket := t.tx.Bucket(bucketName)
+	if bucket == nil {
+		if t.backend.lg != nil {
+			t.backend.lg.Fatal(
+				"failed to find a bucket",
+				zap.String("bucket-name", string(bucketName)),
+			)
+		} else {
+			plog.Fatalf("bucket %s does not exist", bucketName)
+		}
+	}
+	if seq {
+		// it is useful to increase fill percent when the workloads are mostly append-only.
+		// this can delay the page split and reduce space usage.
+		bucket.FillPercent = 0.9
+	}
+	if err := bucket.Put(key, value); err != nil {
+		if t.backend.lg != nil {
+			t.backend.lg.Fatal(
+				"failed to write to a bucket",
+				zap.String("bucket-name", string(bucketName)),
+				zap.Error(err),
+			)
+		} else {
+			plog.Fatalf("cannot put key into bucket (%v)", err)
+		}
+	}
+	t.pending++
+// UnsafeRange must be called holding the lock on the tx.
+func (t *batchTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
+	bucket := t.tx.Bucket(bucketName)
+	if bucket == nil {
+		if t.backend.lg != nil {
+			t.backend.lg.Fatal(
+				"failed to find a bucket",
+				zap.String("bucket-name", string(bucketName)),
+			)
+		} else {
+			plog.Fatalf("bucket %s does not exist", bucketName)
+		}
+	}
+	return unsafeRange(bucket.Cursor(), key, endKey, limit)
+func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
+	if limit <= 0 {
+		limit = math.MaxInt64
+	}
+	var isMatch func(b []byte) bool
+	if len(endKey) > 0 {
+		isMatch = func(b []byte) bool { return bytes.Compare(b, endKey) < 0 }
+	} else {
+		isMatch = func(b []byte) bool { return bytes.Equal(b, key) }
+		limit = 1
+	}
+	for ck, cv := c.Seek(key); ck != nil && isMatch(ck); ck, cv = c.Next() {
+		vs = append(vs, cv)
+		keys = append(keys, ck)
+		if limit == int64(len(keys)) {
+			break
+		}
+	}
+	return keys, vs
+// UnsafeDelete must be called holding the lock on the tx.
+func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
+	bucket := t.tx.Bucket(bucketName)
+	if bucket == nil {
+		if t.backend.lg != nil {
+			t.backend.lg.Fatal(
+				"failed to find a bucket",
+				zap.String("bucket-name", string(bucketName)),
+			)
+		} else {
+			plog.Fatalf("bucket %s does not exist", bucketName)
+		}
+	}
+	err := bucket.Delete(key)
+	if err != nil {
+		if t.backend.lg != nil {
+			t.backend.lg.Fatal(
+				"failed to delete a key",
+				zap.String("bucket-name", string(bucketName)),
+				zap.Error(err),
+			)
+		} else {
+			plog.Fatalf("cannot delete key from bucket (%v)", err)
+		}
+	}
+	t.pending++
+// UnsafeForEach must be called holding the lock on the tx.
+func (t *batchTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
+	return unsafeForEach(t.tx, bucketName, visitor)
+// unsafeForEach calls visitor for every key/value pair in the named bucket
+// of tx and returns the result of the iteration. A bucket that does not
+// exist is treated as empty and yields nil.
+func unsafeForEach(tx *bolt.Tx, bucket []byte, visitor func(k, v []byte) error) error {
+	if b := tx.Bucket(bucket); b != nil {
+		return b.ForEach(visitor)
+	}
+	return nil
+// Commit commits a previous tx and begins a new writable one.
+func (t *batchTx) Commit() {
+	t.Lock()
+	t.commit(false)
+	t.Unlock()
+// CommitAndStop commits the previous tx and does not create a new one.
+func (t *batchTx) CommitAndStop() {
+	t.Lock()
+	t.commit(true)
+	t.Unlock()
+// safePending returns the pending-write count while holding the mutex. It
+// uses the embedded Mutex directly rather than Lock/Unlock, so reading the
+// counter never triggers the commit logic in Unlock.
+func (t *batchTx) safePending() int {
+	t.Mutex.Lock()
+	defer t.Mutex.Unlock()
+	return t.pending
+// commit commits the current bolt write tx, if any, records the commit
+// metrics and resets the pending counter; a commit error is fatal. Unless
+// stop is true it then begins a new write tx. When there is a tx with no
+// pending writes and stop is false it returns early and keeps that tx.
+func (t *batchTx) commit(stop bool) {
+	// commit the last tx
+	if t.tx != nil {
+		if t.pending == 0 && !stop {
+			return
+		}
+		start := time.Now()
+		// gofail: var beforeCommit struct{}
+		err := t.tx.Commit()
+		// gofail: var afterCommit struct{}
+		rebalanceSec.Observe(t.tx.Stats().RebalanceTime.Seconds())
+		spillSec.Observe(t.tx.Stats().SpillTime.Seconds())
+		writeSec.Observe(t.tx.Stats().WriteTime.Seconds())
+		commitSec.Observe(time.Since(start).Seconds())
+		atomic.AddInt64(&t.backend.commits, 1)
+		t.pending = 0
+		if err != nil {
+			if t.backend.lg != nil {
+				t.backend.lg.Fatal("failed to commit tx", zap.Error(err))
+			} else {
+				plog.Fatalf("cannot commit tx (%s)", err)
+			}
+		}
+	}
+	if !stop {
+		t.tx = t.backend.begin(true)
+	}
+// batchTxBuffered is a batchTx that also records puts in a write buffer,
+// which Unlock copies into the backend's read buffer.
+type batchTxBuffered struct {
+	batchTx
+	buf txWriteBuffer
+// newBatchTxBuffered returns a buffered batch tx bound to backend with an
+// empty write buffer. It commits once before returning, which opens the
+// initial write tx and the backend's initial read tx.
+func newBatchTxBuffered(backend *backend) *batchTxBuffered {
+	tx := &batchTxBuffered{
+		batchTx: batchTx{backend: backend},
+		buf: txWriteBuffer{
+			txBuffer: txBuffer{make(map[string]*bucketBuffer)},
+			seq:      true,
+		},
+	}
+	tx.Commit()
+	return tx
+// Unlock publishes buffered writes before releasing the lock. When there are
+// pending writes it writes the write buffer back into the backend's read
+// buffer (under the readTx write lock) and commits if the batch limit has
+// been reached. It then releases the lock through batchTx.Unlock.
+func (t *batchTxBuffered) Unlock() {
+	if t.pending != 0 {
+		t.backend.readTx.Lock() // blocks txReadBuffer for writing.
+		t.buf.writeback(&t.backend.readTx.buf)
+		t.backend.readTx.Unlock()
+		if t.pending >= t.backend.batchLimit {
+			t.commit(false)
+		}
+	}
+	t.batchTx.Unlock()
+func (t *batchTxBuffered) Commit() {
+	t.Lock()
+	t.commit(false)
+	t.Unlock()
+func (t *batchTxBuffered) CommitAndStop() {
+	t.Lock()
+	t.commit(true)
+	t.Unlock()
+// commit runs unsafeCommit while holding the backend readTx write lock, so
+// no reader can use the shared read tx while it is being replaced.
+func (t *batchTxBuffered) commit(stop bool) {
+	// all read txs must be closed to acquire boltdb commit rwlock
+	t.backend.readTx.Lock()
+	t.unsafeCommit(stop)
+	t.backend.readTx.Unlock()
+// unsafeCommit retires the current read tx, commits the write tx and, unless
+// stop is true, opens a fresh read tx. The old read tx is rolled back in a
+// separate goroutine once its WaitGroup drains, so readers that still
+// reference it can finish; a rollback error is fatal.
+// It does not lock readTx itself: both callers (commit and defrag) take the
+// readTx write lock first.
+func (t *batchTxBuffered) unsafeCommit(stop bool) {
+	if t.backend.readTx.tx != nil {
+		// wait all store read transactions using the current boltdb tx to finish,
+		// then close the boltdb tx
+		go func(tx *bolt.Tx, wg *sync.WaitGroup) {
+			wg.Wait()
+			if err := tx.Rollback(); err != nil {
+				if t.backend.lg != nil {
+					t.backend.lg.Fatal("failed to rollback tx", zap.Error(err))
+				} else {
+					plog.Fatalf("cannot rollback tx (%s)", err)
+				}
+			}
+		}(t.backend.readTx.tx, t.backend.readTx.txWg)
+		t.backend.readTx.reset()
+	}
+	t.batchTx.commit(stop)
+	if !stop {
+		t.backend.readTx.tx = t.backend.begin(false)
+	}
+// UnsafePut writes the key/value through the underlying batchTx and also
+// records it in the write buffer.
+func (t *batchTxBuffered) UnsafePut(bucketName []byte, key []byte, value []byte) {
+	t.batchTx.UnsafePut(bucketName, key, value)
+	t.buf.put(bucketName, key, value)
+// UnsafeSeqPut writes the key/value through the underlying batchTx's
+// sequential put and also records it in the write buffer via putSeq.
+func (t *batchTxBuffered) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) {
+	t.batchTx.UnsafeSeqPut(bucketName, key, value)
+	t.buf.putSeq(bucketName, key, value)
diff --git a/vendor/go.etcd.io/etcd/mvcc/backend/config_default.go b/vendor/go.etcd.io/etcd/mvcc/backend/config_default.go
new file mode 100644
index 0000000..f15f030
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/backend/config_default.go
@@ -0,0 +1,23 @@
+// Copyright 2016 The etcd Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//     http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// +build !linux,!windows
+package backend
+import bolt "go.etcd.io/bbolt"
+var boltOpenOptions *bolt.Options
+// mmapSize returns the configured MmapSize as an int for bolt's options.
+// NOTE(review): the uint64 -> int conversion would wrap for values above the
+// platform's int range (e.g. the 10 GiB default on a 32-bit build) — confirm.
+func (bcfg *BackendConfig) mmapSize() int { return int(bcfg.MmapSize) }
diff --git a/vendor/go.etcd.io/etcd/mvcc/backend/config_linux.go b/vendor/go.etcd.io/etcd/mvcc/backend/config_linux.go
new file mode 100644
index 0000000..f712671
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/backend/config_linux.go
@@ -0,0 +1,34 @@
+// Copyright 2015 The etcd Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//     http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package backend
+import (
+	"syscall"
+	bolt "go.etcd.io/bbolt"
+// syscall.MAP_POPULATE on linux 2.6.23+ does sequential read-ahead
+// which can speed up entire-database read with boltdb. We want to
+// enable MAP_POPULATE for faster key-value store recovery in storage
+// package. If your kernel version is lower than 2.6.23
+// (https://github.com/torvalds/linux/releases/tag/v2.6.23), mmap might
+// silently ignore this flag. Please update your kernel to prevent this.
+var boltOpenOptions = &bolt.Options{
+	MmapFlags:      syscall.MAP_POPULATE,
+	NoFreelistSync: true,
+// mmapSize returns the configured MmapSize as an int for bolt's options.
+func (bcfg *BackendConfig) mmapSize() int { return int(bcfg.MmapSize) }
diff --git a/vendor/go.etcd.io/etcd/mvcc/backend/config_windows.go b/vendor/go.etcd.io/etcd/mvcc/backend/config_windows.go
new file mode 100644
index 0000000..c650059
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/backend/config_windows.go
@@ -0,0 +1,26 @@
+// Copyright 2017 The etcd Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//     http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// +build windows
+package backend
+import bolt "go.etcd.io/bbolt"
+var boltOpenOptions *bolt.Options = nil
+// setting mmap size != 0 on windows will allocate the entire
+// mmap size for the file, instead of growing it. So, force 0.
+func (bcfg *BackendConfig) mmapSize() int { return 0 }
diff --git a/vendor/go.etcd.io/etcd/mvcc/backend/doc.go b/vendor/go.etcd.io/etcd/mvcc/backend/doc.go
new file mode 100644
index 0000000..9cc42fa
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/backend/doc.go
@@ -0,0 +1,16 @@
+// Copyright 2015 The etcd Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//     http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// Package backend defines a standard interface for etcd's backend MVCC storage.
+package backend
diff --git a/vendor/go.etcd.io/etcd/mvcc/backend/metrics.go b/vendor/go.etcd.io/etcd/mvcc/backend/metrics.go
new file mode 100644
index 0000000..d9641af
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/backend/metrics.go
@@ -0,0 +1,95 @@
+// Copyright 2016 The etcd Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//     http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package backend
+import "github.com/prometheus/client_golang/prometheus"
+var (
+	commitSec = prometheus.NewHistogram(prometheus.HistogramOpts{
+		Namespace: "etcd",
+		Subsystem: "disk",
+		Name:      "backend_commit_duration_seconds",
+		Help:      "The latency distributions of commit called by backend.",
+		// lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2
+		// highest bucket start of 0.001 sec * 2^13 == 8.192 sec
+		Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
+	})
+	rebalanceSec = prometheus.NewHistogram(prometheus.HistogramOpts{
+		Namespace: "etcd_debugging",
+		Subsystem: "disk",
+		Name:      "backend_commit_rebalance_duration_seconds",
+		Help:      "The latency distributions of commit.rebalance called by bboltdb backend.",
+		// lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2
+		// highest bucket start of 0.001 sec * 2^13 == 8.192 sec
+		Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
+	})
+	spillSec = prometheus.NewHistogram(prometheus.HistogramOpts{
+		Namespace: "etcd_debugging",
+		Subsystem: "disk",
+		Name:      "backend_commit_spill_duration_seconds",
+		Help:      "The latency distributions of commit.spill called by bboltdb backend.",
+		// lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2
+		// highest bucket start of 0.001 sec * 2^13 == 8.192 sec
+		Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
+	})
+	writeSec = prometheus.NewHistogram(prometheus.HistogramOpts{
+		Namespace: "etcd_debugging",
+		Subsystem: "disk",
+		Name:      "backend_commit_write_duration_seconds",
+		Help:      "The latency distributions of commit.write called by bboltdb backend.",
+		// lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2
+		// highest bucket start of 0.001 sec * 2^13 == 8.192 sec
+		Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
+	})
+	defragSec = prometheus.NewHistogram(prometheus.HistogramOpts{
+		Namespace: "etcd",
+		Subsystem: "disk",
+		Name:      "backend_defrag_duration_seconds",
+		Help:      "The latency distribution of backend defragmentation.",
+		// 100 MB usually takes 1 sec, so start with 10 MB of 100 ms
+		// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
+		// highest bucket start of 0.1 sec * 2^12 == 409.6 sec
+		Buckets: prometheus.ExponentialBuckets(.1, 2, 13),
+	})
+	snapshotTransferSec = prometheus.NewHistogram(prometheus.HistogramOpts{
+		Namespace: "etcd",
+		Subsystem: "disk",
+		Name:      "backend_snapshot_duration_seconds",
+		Help:      "The latency distribution of backend snapshots.",
+		// lowest bucket start of upper bound 0.01 sec (10 ms) with factor 2
+		// highest bucket start of 0.01 sec * 2^16 == 655.36 sec
+		Buckets: prometheus.ExponentialBuckets(.01, 2, 17),
+	})
+// init registers the backend histograms with the default Prometheus
+// registry. MustRegister panics if a registration fails.
+func init() {
+	prometheus.MustRegister(commitSec)
+	prometheus.MustRegister(rebalanceSec)
+	prometheus.MustRegister(spillSec)
+	prometheus.MustRegister(writeSec)
+	prometheus.MustRegister(defragSec)
+	prometheus.MustRegister(snapshotTransferSec)
diff --git a/vendor/go.etcd.io/etcd/mvcc/backend/read_tx.go b/vendor/go.etcd.io/etcd/mvcc/backend/read_tx.go
new file mode 100644
index 0000000..91fe72e
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/backend/read_tx.go
@@ -0,0 +1,210 @@
+// Copyright 2017 The etcd Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//     http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package backend
+import (
+	"bytes"
+	"math"
+	"sync"
+	bolt "go.etcd.io/bbolt"
+// safeRangeBucket names the only bucket on which UnsafeRange accepts a limit above 1 (any other bucket panics).
+// It is a hack to avoid inadvertently reading duplicate keys: overwrites on a bucket should only fetch
+// with limit=1, but safeRangeBucket is known to never overwrite any key, so a range read on it is safe.
+var safeRangeBucket = []byte("key")
+type ReadTx interface { // ReadTx is the read-side transaction API; readTx and concurrentReadTx in this file provide these methods
+	Lock()
+	Unlock()
+	RLock()
+	RUnlock()
+	UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) // a nil endKey requests a single key; limit <= 0 means no limit
+	UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error                        // calls visitor on the pairs of the bucket and returns the first error it sees
+type readTx struct { // readTx answers reads from the in-memory buffer (buf) first and from the bolt transaction (tx) second
+	// mu protects accesses to the txReadBuffer
+	mu  sync.RWMutex
+	buf txReadBuffer
+	// TODO: group and encapsulate {txMu, tx, buckets, txWg}, as they share the same lifecycle.
+	// txMu protects accesses to buckets and tx on Range requests.
+	txMu    sync.RWMutex
+	tx      *bolt.Tx
+	buckets map[string]*bolt.Bucket // bucket handles already looked up from tx, keyed by bucket name
+	// txWg protects tx from being rolled back at the end of a batch interval until all reads using this tx are done.
+	txWg *sync.WaitGroup
+func (rt *readTx) Lock()    { rt.mu.Lock() }    // Lock takes the write lock on mu
+func (rt *readTx) Unlock()  { rt.mu.Unlock() }  // Unlock releases the write lock on mu
+func (rt *readTx) RLock()   { rt.mu.RLock() }   // RLock takes the read lock on mu
+func (rt *readTx) RUnlock() { rt.mu.RUnlock() } // RUnlock releases the read lock on mu
+func (rt *readTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) { // UnsafeRange reads from the buffer first, then tops up from the bolt tx
+	if endKey == nil {
+		// forbid duplicates for single keys
+		limit = 1
+	}
+	if limit <= 0 {
+		limit = math.MaxInt64 // a non-positive limit means "no limit"
+	}
+	if limit > 1 && !bytes.Equal(bucketName, safeRangeBucket) {
+		panic("do not use unsafeRange on non-keys bucket")
+	}
+	keys, vals := rt.buf.Range(bucketName, key, endKey, limit)
+	if int64(len(keys)) == limit { // the buffer alone satisfied the request
+		return keys, vals
+	}
+	// find/cache bucket
+	bn := string(bucketName)
+	rt.txMu.RLock()
+	bucket, ok := rt.buckets[bn]
+	rt.txMu.RUnlock()
+	if !ok {
+		rt.txMu.Lock()
+		bucket = rt.tx.Bucket(bucketName)
+		rt.buckets[bn] = bucket // the lookup result is cached even when it is nil
+		rt.txMu.Unlock()
+	}
+	// ignore missing bucket since may have been created in this batch
+	if bucket == nil {
+		return keys, vals
+	}
+	rt.txMu.Lock()
+	c := bucket.Cursor()
+	rt.txMu.Unlock()
+	k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys))) // ask the tx only for what the buffer did not supply
+	return append(k2, keys...), append(v2, vals...)               // pairs from the tx come first, buffered pairs after
+func (rt *readTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error { // UnsafeForEach visits tx pairs not shadowed by the buffer, then every buffered pair
+	dups := make(map[string]struct{}) // set of keys present in the buffer
+	getDups := func(k, v []byte) error {
+		dups[string(k)] = struct{}{}
+		return nil
+	}
+	visitNoDup := func(k, v []byte) error {
+		if _, ok := dups[string(k)]; ok { // the buffered pair for this key is visited in the final pass instead
+			return nil
+		}
+		return visitor(k, v)
+	}
+	if err := rt.buf.ForEach(bucketName, getDups); err != nil {
+		return err
+	}
+	rt.txMu.Lock()
+	err := unsafeForEach(rt.tx, bucketName, visitNoDup)
+	rt.txMu.Unlock()
+	if err != nil {
+		return err
+	}
+	return rt.buf.ForEach(bucketName, visitor)
+func (rt *readTx) reset() { // reset clears the buffer and the bucket cache, drops tx and installs a fresh WaitGroup
+	rt.buf.reset()
+	rt.buckets = make(map[string]*bolt.Bucket)
+	rt.tx = nil
+	rt.txWg = new(sync.WaitGroup)
+// TODO: create a base type for readTx and concurrentReadTx to avoid duplicated function implementation?
+type concurrentReadTx struct { // concurrentReadTx has the same fields as readTx minus mu; txMu is held by pointer here
+	buf     txReadBuffer
+	txMu    *sync.RWMutex
+	tx      *bolt.Tx
+	buckets map[string]*bolt.Bucket // bucket handles already looked up from tx, keyed by bucket name
+	txWg    *sync.WaitGroup         // RUnlock calls Done on it once per concurrentReadTx
+func (rt *concurrentReadTx) Lock()   {} // Lock is a no-op
+func (rt *concurrentReadTx) Unlock() {} // Unlock is a no-op
+// RLock is a no-op. concurrentReadTx does not need to be locked after it is created.
+func (rt *concurrentReadTx) RLock() {}
+// RUnlock signals the end of concurrentReadTx by calling Done on txWg.
+func (rt *concurrentReadTx) RUnlock() { rt.txWg.Done() }
+func (rt *concurrentReadTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error { // UnsafeForEach visits tx pairs not shadowed by the buffer, then every buffered pair
+	dups := make(map[string]struct{}) // set of keys present in the buffer
+	getDups := func(k, v []byte) error {
+		dups[string(k)] = struct{}{}
+		return nil
+	}
+	visitNoDup := func(k, v []byte) error {
+		if _, ok := dups[string(k)]; ok { // the buffered pair for this key is visited in the final pass instead
+			return nil
+		}
+		return visitor(k, v)
+	}
+	if err := rt.buf.ForEach(bucketName, getDups); err != nil {
+		return err
+	}
+	rt.txMu.Lock()
+	err := unsafeForEach(rt.tx, bucketName, visitNoDup)
+	rt.txMu.Unlock()
+	if err != nil {
+		return err
+	}
+	return rt.buf.ForEach(bucketName, visitor)
+func (rt *concurrentReadTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) { // UnsafeRange reads from the buffer first, then tops up from the bolt tx
+	if endKey == nil {
+		// forbid duplicates for single keys
+		limit = 1
+	}
+	if limit <= 0 {
+		limit = math.MaxInt64 // a non-positive limit means "no limit"
+	}
+	if limit > 1 && !bytes.Equal(bucketName, safeRangeBucket) {
+		panic("do not use unsafeRange on non-keys bucket")
+	}
+	keys, vals := rt.buf.Range(bucketName, key, endKey, limit)
+	if int64(len(keys)) == limit { // the buffer alone satisfied the request
+		return keys, vals
+	}
+	// find/cache bucket
+	bn := string(bucketName)
+	rt.txMu.RLock()
+	bucket, ok := rt.buckets[bn]
+	rt.txMu.RUnlock()
+	if !ok {
+		rt.txMu.Lock()
+		bucket = rt.tx.Bucket(bucketName)
+		rt.buckets[bn] = bucket // the lookup result is cached even when it is nil
+		rt.txMu.Unlock()
+	}
+	// ignore missing bucket since may have been created in this batch
+	if bucket == nil {
+		return keys, vals
+	}
+	rt.txMu.Lock()
+	c := bucket.Cursor()
+	rt.txMu.Unlock()
+	k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys))) // ask the tx only for what the buffer did not supply
+	return append(k2, keys...), append(v2, vals...)               // pairs from the tx come first, buffered pairs after
diff --git a/vendor/go.etcd.io/etcd/mvcc/backend/tx_buffer.go b/vendor/go.etcd.io/etcd/mvcc/backend/tx_buffer.go
new file mode 100644
index 0000000..d734638
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/backend/tx_buffer.go
@@ -0,0 +1,203 @@
+// Copyright 2017 The etcd Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//     http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package backend
+import (
+	"bytes"
+	"sort"
+// txBuffer handles functionality shared between txWriteBuffer and txReadBuffer.
+type txBuffer struct {
+	buckets map[string]*bucketBuffer // one bucketBuffer per bucket name
+func (txb *txBuffer) reset() { // reset empties every bucket buffer and drops the ones that were already empty
+	for k, v := range txb.buckets {
+		if v.used == 0 {
+			// demote: the bucket holds no entries, so stop tracking it
+			delete(txb.buckets, k)
+		}
+		v.used = 0 // only the element count is cleared; the allocated buf is kept for reuse
+	}
+// txWriteBuffer buffers writes of pending updates that have not yet committed.
+type txWriteBuffer struct {
+	txBuffer
+	seq bool // put clears it; while false, writeback sorts a bucket before merging it
+func (txw *txWriteBuffer) put(bucket, k, v []byte) { // put buffers the pair and clears seq, so writeback will sort before merging
+	txw.seq = false
+	txw.putSeq(bucket, k, v)
+// putSeq appends the pair to the buffer of the named bucket, creating that
+// buffer the first time the bucket is written to. It leaves seq untouched.
+func (txw *txWriteBuffer) putSeq(bucket, k, v []byte) {
+	name := string(bucket)
+	bb, found := txw.buckets[name]
+	if !found {
+		bb = newBucketBuffer()
+		txw.buckets[name] = bb
+	}
+	bb.add(k, v)
+func (txw *txWriteBuffer) writeback(txr *txReadBuffer) { // writeback moves all buffered writes into txr, then resets txw
+	for k, wb := range txw.buckets {
+		rb, ok := txr.buckets[k]
+		if !ok {
+			delete(txw.buckets, k) // txr has no such bucket yet: hand the whole buffer over instead of copying
+			txr.buckets[k] = wb
+			continue
+		}
+		if !txw.seq && wb.used > 1 {
+			// assume no duplicate keys
+			sort.Sort(wb)
+		}
+		rb.merge(wb)
+	}
+	txw.reset()
+// txReadBuffer accesses buffered updates; it adds Range, ForEach and unsafeCopy on top of the embedded txBuffer.
+type txReadBuffer struct{ txBuffer }
+// Range delegates to the buffer of the named bucket. A bucket with no buffer
+// (absent from the map, or stored as nil) yields two nil slices.
+func (txr *txReadBuffer) Range(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
+	bb := txr.buckets[string(bucketName)]
+	if bb == nil {
+		return nil, nil
+	}
+	return bb.Range(key, endKey, limit)
+// ForEach delegates to the buffer of the named bucket. A bucket with no buffer
+// (absent from the map, or stored as nil) is treated as empty and yields nil.
+func (txr *txReadBuffer) ForEach(bucketName []byte, visitor func(k, v []byte) error) error {
+	bb := txr.buckets[string(bucketName)]
+	if bb == nil {
+		return nil
+	}
+	return bb.ForEach(visitor)
+// unsafeCopy returns a copy of txReadBuffer in which every bucket buffer is duplicated; caller should acquire backend.readTx.RLock()
+func (txr *txReadBuffer) unsafeCopy() txReadBuffer {
+	txrCopy := txReadBuffer{
+		txBuffer: txBuffer{
+			buckets: make(map[string]*bucketBuffer, len(txr.txBuffer.buckets)),
+		},
+	}
+	for bucketName, bucket := range txr.txBuffer.buckets {
+		txrCopy.txBuffer.buckets[bucketName] = bucket.Copy() // the copy gets its own kv slice (see bucketBuffer.Copy)
+	}
+	return txrCopy
+type kv struct { // kv is one buffered key-value pair
+	key []byte
+	val []byte
+// bucketBuffer buffers key-value pairs that are pending commit.
+type bucketBuffer struct {
+	buf []kv // only buf[:used] holds live pairs; the rest is spare capacity
+	// used tracks number of elements in use so buf can be reused without reallocation.
+	used int
+func newBucketBuffer() *bucketBuffer { // newBucketBuffer returns an empty buffer with 512 preallocated slots
+	return &bucketBuffer{buf: make([]kv, 512), used: 0}
+// Range returns buffered pairs in key order. With an empty endKey it returns at
+// most the one pair whose key equals key; otherwise it returns up to limit pairs
+// with key <= k < endKey. The binary search relies on buf[:used] being sorted.
+func (bb *bucketBuffer) Range(key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) {
+	f := func(i int) bool { return bytes.Compare(bb.buf[i].key, key) >= 0 }
+	idx := sort.Search(bb.used, f)
+	// sort.Search returns bb.used when no live entry is >= key. Slots at or past
+	// bb.used are spare capacity that can still hold pairs left behind by reset
+	// or by the de-duplication in merge, so they must never be inspected.
+	if idx < 0 || idx >= bb.used {
+		return nil, nil
+	}
+	if len(endKey) == 0 {
+		// single-key lookup: idx is the first entry >= key, so it matches only on equality
+		if bytes.Equal(key, bb.buf[idx].key) {
+			keys = append(keys, bb.buf[idx].key)
+			vals = append(vals, bb.buf[idx].val)
+		}
+		return keys, vals
+	}
+	if bytes.Compare(endKey, bb.buf[idx].key) <= 0 {
+		return nil, nil
+	}
+	for i := idx; i < bb.used && int64(len(keys)) < limit; i++ {
+		if bytes.Compare(endKey, bb.buf[i].key) <= 0 {
+			break
+		}
+		keys = append(keys, bb.buf[i].key)
+		vals = append(vals, bb.buf[i].val)
+	}
+	return keys, vals
+func (bb *bucketBuffer) ForEach(visitor func(k, v []byte) error) error { // ForEach calls visitor on each live pair in slot order and stops at the first error
+	for i := 0; i < bb.used; i++ {
+		if err := visitor(bb.buf[i].key, bb.buf[i].val); err != nil {
+			return err
+		}
+	}
+	return nil
+func (bb *bucketBuffer) add(k, v []byte) { // add stores the pair in the next free slot and grows buf by 1.5x once it is full
+	bb.buf[bb.used].key, bb.buf[bb.used].val = k, v
+	bb.used++
+	if bb.used == len(bb.buf) { // grow right away so the slot written by the next add already exists
+		buf := make([]kv, (3*len(bb.buf))/2)
+		copy(buf, bb.buf)
+		bb.buf = buf
+	}
+// merge merges data from bbsrc into bb, keeping bb sorted and, for duplicate keys, only the newest pair.
+func (bb *bucketBuffer) merge(bbsrc *bucketBuffer) {
+	for i := 0; i < bbsrc.used; i++ {
+		bb.add(bbsrc.buf[i].key, bbsrc.buf[i].val)
+	}
+	if bb.used == bbsrc.used { // bb was empty before the appends, so nothing needs reordering
+		return
+	}
+	if bytes.Compare(bb.buf[(bb.used-bbsrc.used)-1].key, bbsrc.buf[0].key) < 0 { // old last key < first new key: the result is already in order
+		return
+	}
+	sort.Stable(bb) // stable, so among equal keys the pair appended later stays later
+	// remove duplicates, using only newest update
+	widx := 0
+	for ridx := 1; ridx < bb.used; ridx++ {
+		if !bytes.Equal(bb.buf[ridx].key, bb.buf[widx].key) {
+			widx++
+		}
+		bb.buf[widx] = bb.buf[ridx]
+	}
+	bb.used = widx + 1
+func (bb *bucketBuffer) Len() int { return bb.used } // Len, Less and Swap let sort.Sort/sort.Stable order the live pairs by key
+func (bb *bucketBuffer) Less(i, j int) bool { // Less orders pairs by byte-wise key comparison
+	return bytes.Compare(bb.buf[i].key, bb.buf[j].key) < 0
+func (bb *bucketBuffer) Swap(i, j int) { bb.buf[i], bb.buf[j] = bb.buf[j], bb.buf[i] }
+func (bb *bucketBuffer) Copy() *bucketBuffer { // Copy returns a buffer with its own kv slice; the key/val byte slices themselves are shared
+	bbCopy := bucketBuffer{
+		buf:  make([]kv, len(bb.buf)),
+		used: bb.used,
+	}
+	copy(bbCopy.buf, bb.buf)
+	return &bbCopy
diff --git a/vendor/go.etcd.io/etcd/mvcc/doc.go b/vendor/go.etcd.io/etcd/mvcc/doc.go
new file mode 100644
index 0000000..ad5be03
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/doc.go
@@ -0,0 +1,16 @@
+// Copyright 2015 The etcd Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//     http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// Package mvcc defines etcd's stable MVCC storage.
+package mvcc
diff --git a/vendor/go.etcd.io/etcd/mvcc/index.go b/vendor/go.etcd.io/etcd/mvcc/index.go
new file mode 100644
index 0000000..f8cc6df
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/index.go
@@ -0,0 +1,258 @@
+// Copyright 2015 The etcd Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//     http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package mvcc
+import (
+	"sort"
+	"sync"
+	"github.com/google/btree"
+	"go.uber.org/zap"
+type index interface { // index maps keys to their revision history; treeIndex in this file implements it
+	Get(key []byte, atRev int64) (rev, created revision, ver int64, err error)
+	Range(key, end []byte, atRev int64) ([][]byte, []revision)
+	Revisions(key, end []byte, atRev int64) []revision
+	Put(key []byte, rev revision)
+	Tombstone(key []byte, rev revision) error
+	RangeSince(key, end []byte, rev int64) []revision
+	Compact(rev int64) map[revision]struct{}
+	Keep(rev int64) map[revision]struct{}
+	Equal(b index) bool
+	Insert(ki *keyIndex)
+	KeyIndex(ki *keyIndex) *keyIndex
+type treeIndex struct { // treeIndex is an index backed by a btree of *keyIndex items
+	sync.RWMutex // guards tree and the keyIndex items stored in it
+	tree *btree.BTree
+	lg   *zap.Logger // may be nil; code paths then fall back to plog
+func newTreeIndex(lg *zap.Logger) index { // newTreeIndex returns an empty treeIndex built on btree.New(32) that logs through lg
+	return &treeIndex{
+		tree: btree.New(32),
+		lg:   lg,
+	}
+func (ti *treeIndex) Put(key []byte, rev revision) { // Put records rev for key, inserting a new keyIndex when the key is not indexed yet
+	keyi := &keyIndex{key: key}
+	ti.Lock()
+	defer ti.Unlock()
+	item := ti.tree.Get(keyi)
+	if item == nil {
+		keyi.put(ti.lg, rev.main, rev.sub)
+		ti.tree.ReplaceOrInsert(keyi)
+		return
+	}
+	okeyi := item.(*keyIndex) // key already indexed: append to the stored keyIndex
+	okeyi.put(ti.lg, rev.main, rev.sub)
+func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) { // Get looks up key under the read lock and delegates to keyIndex.get
+	keyi := &keyIndex{key: key}
+	ti.RLock()
+	defer ti.RUnlock()
+	if keyi = ti.keyIndex(keyi); keyi == nil { // key is not indexed at all
+		return revision{}, revision{}, 0, ErrRevisionNotFound
+	}
+	return keyi.get(ti.lg, atRev)
+func (ti *treeIndex) KeyIndex(keyi *keyIndex) *keyIndex { // KeyIndex is the read-locked wrapper around keyIndex
+	ti.RLock()
+	defer ti.RUnlock()
+	return ti.keyIndex(keyi)
+// keyIndex returns the stored keyIndex that the tree considers equal to keyi,
+// or nil when there is none. It takes no lock itself.
+func (ti *treeIndex) keyIndex(keyi *keyIndex) *keyIndex {
+	found := ti.tree.Get(keyi)
+	if found == nil {
+		return nil
+	}
+	return found.(*keyIndex)
+func (ti *treeIndex) visit(key, end []byte, f func(ki *keyIndex)) { // visit calls f, under the read lock, on each keyIndex from key upwards
+	keyi, endi := &keyIndex{key: key}, &keyIndex{key: end}
+	ti.RLock()
+	defer ti.RUnlock()
+	ti.tree.AscendGreaterOrEqual(keyi, func(item btree.Item) bool {
+		if len(endi.key) > 0 && !item.Less(endi) { // stop at the first item >= end; an empty end means no upper bound
+			return false
+		}
+		f(item.(*keyIndex))
+		return true
+	})
+func (ti *treeIndex) Revisions(key, end []byte, atRev int64) (revs []revision) { // Revisions returns the revision at atRev of each key in the range that has one
+	if end == nil { // a nil end selects the single key
+		rev, _, _, err := ti.Get(key, atRev)
+		if err != nil {
+			return nil
+		}
+		return []revision{rev}
+	}
+	ti.visit(key, end, func(ki *keyIndex) {
+		if rev, _, _, err := ki.get(ti.lg, atRev); err == nil { // keys without a revision at atRev are skipped
+			revs = append(revs, rev)
+		}
+	})
+	return revs
+func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []revision) { // Range is Revisions plus the matching keys, returned as parallel slices
+	if end == nil { // a nil end selects the single key
+		rev, _, _, err := ti.Get(key, atRev)
+		if err != nil {
+			return nil, nil
+		}
+		return [][]byte{key}, []revision{rev}
+	}
+	ti.visit(key, end, func(ki *keyIndex) {
+		if rev, _, _, err := ki.get(ti.lg, atRev); err == nil { // keys without a revision at atRev are skipped
+			revs = append(revs, rev)
+			keys = append(keys, ki.key)
+		}
+	})
+	return keys, revs
+func (ti *treeIndex) Tombstone(key []byte, rev revision) error { // Tombstone marks key deleted at rev; an unindexed key yields ErrRevisionNotFound
+	keyi := &keyIndex{key: key}
+	ti.Lock()
+	defer ti.Unlock()
+	item := ti.tree.Get(keyi)
+	if item == nil {
+		return ErrRevisionNotFound
+	}
+	ki := item.(*keyIndex)
+	return ki.tombstone(ti.lg, rev.main, rev.sub)
+// RangeSince returns all revisions from key (inclusive) to end (exclusive)
+// at or after the given rev. A nil end selects only key; an empty non-nil end means
+// no upper bound. For a range, the returned slice is sorted in the order of revision.
+func (ti *treeIndex) RangeSince(key, end []byte, rev int64) []revision {
+	keyi := &keyIndex{key: key}
+	ti.RLock()
+	defer ti.RUnlock()
+	if end == nil {
+		item := ti.tree.Get(keyi)
+		if item == nil {
+			return nil
+		}
+		keyi = item.(*keyIndex)
+		return keyi.since(ti.lg, rev)
+	}
+	endi := &keyIndex{key: end}
+	var revs []revision
+	ti.tree.AscendGreaterOrEqual(keyi, func(item btree.Item) bool {
+		if len(endi.key) > 0 && !item.Less(endi) { // stop at the first item >= end
+			return false
+		}
+		curKeyi := item.(*keyIndex)
+		revs = append(revs, curKeyi.since(ti.lg, rev)...)
+		return true
+	})
+	sort.Sort(revisions(revs)) // per-key results were appended in key order; re-order them by revision
+	return revs
+func (ti *treeIndex) Compact(rev int64) map[revision]struct{} { // Compact compacts every keyIndex at rev, deletes the emptied ones and returns the set filled in by keyIndex.compact
+	available := make(map[revision]struct{})
+	if ti.lg != nil {
+		ti.lg.Info("compact tree index", zap.Int64("revision", rev))
+	} else {
+		plog.Printf("store.index: compact %d", rev)
+	}
+	ti.Lock()
+	clone := ti.tree.Clone() // walk a clone so the lock is taken per item rather than for the whole pass
+	ti.Unlock()
+	clone.Ascend(func(item btree.Item) bool {
+		keyi := item.(*keyIndex)
+		// The lock is needed here to prevent modification of the keyIndex while it is being
+		// compacted, and to keep a revision from being added to an emptied keyIndex before its deletion.
+		ti.Lock()
+		keyi.compact(ti.lg, rev, available)
+		if keyi.isEmpty() {
+			item := ti.tree.Delete(keyi)
+			if item == nil { // the emptied keyIndex was expected to still be in the live tree
+				if ti.lg != nil {
+					ti.lg.Panic("failed to delete during compaction")
+				} else {
+					plog.Panic("store.index: unexpected delete failure during compaction")
+				}
+			}
+		}
+		ti.Unlock()
+		return true
+	})
+	return available
+// Keep finds all revisions to be kept for a Compaction at the given rev, without modifying the index.
+func (ti *treeIndex) Keep(rev int64) map[revision]struct{} {
+	available := make(map[revision]struct{})
+	ti.RLock()
+	defer ti.RUnlock()
+	ti.tree.Ascend(func(i btree.Item) bool {
+		keyi := i.(*keyIndex)
+		keyi.keep(rev, available) // each keyIndex adds its kept revision (if any) to available
+		return true
+	})
+	return available
+// Equal reports whether bi holds exactly the same key indexes as ti.
+// It returns false, rather than panicking, when bi is not a *treeIndex or when
+// a key stored in ti is missing from bi. Neither index is locked while comparing.
+func (ti *treeIndex) Equal(bi index) bool {
+	b, ok := bi.(*treeIndex)
+	if !ok {
+		return false
+	}
+	if ti.tree.Len() != b.tree.Len() {
+		return false
+	}
+	equal := true
+	ti.tree.Ascend(func(item btree.Item) bool {
+		aki := item.(*keyIndex)
+		// Equal lengths do not imply equal key sets: Get yields a nil item for a key
+		// that b does not hold, and the checked assertion reports that through found.
+		bki, found := b.tree.Get(item).(*keyIndex)
+		if !found || !aki.equal(bki) {
+			equal = false
+			return false
+		}
+		return true
+	})
+	return equal
+func (ti *treeIndex) Insert(ki *keyIndex) { // Insert stores ki under the write lock, replacing any item with an equal key
+	ti.Lock()
+	defer ti.Unlock()
+	ti.tree.ReplaceOrInsert(ki)
diff --git a/vendor/go.etcd.io/etcd/mvcc/key_index.go b/vendor/go.etcd.io/etcd/mvcc/key_index.go
new file mode 100644
index 0000000..cf77cb4
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/key_index.go
@@ -0,0 +1,402 @@
+// Copyright 2015 The etcd Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//     http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package mvcc
+import (
+	"bytes"
+	"errors"
+	"fmt"
+	"github.com/google/btree"
+	"go.uber.org/zap"
+var (
+	ErrRevisionNotFound = errors.New("mvcc: revision not found") // returned when a key has no revision matching the request
+// keyIndex stores the revisions of a key in the backend.
+// Each keyIndex has at least one key generation.
+// Each generation might have several key versions.
+// Tombstone on a key appends a tombstone version at the end
+// of the current generation and creates a new empty generation.
+// Each version of a key has an index pointing to the backend.
+// For example: put(1.0);put(2.0);tombstone(3.0);put(4.0);tombstone(5.0) on key "foo"
+// generates a keyIndex:
+// key:     "foo"
+// rev: 5
+// generations:
+//    {empty}
+//    {4.0, 5.0(t)}
+//    {1.0, 2.0, 3.0(t)}
+// Compacting a keyIndex removes the versions with a revision smaller than or equal to
+// rev, except the largest one. If a generation becomes empty
+// during compaction, it will be removed. If all the generations get
+// removed, the keyIndex should be removed.
+// For example:
+// compact(2) on the previous example
+// generations:
+//    {empty}
+//    {4.0, 5.0(t)}
+//    {2.0, 3.0(t)}
+// compact(4)
+// generations:
+//    {empty}
+//    {4.0, 5.0(t)}
+// compact(5):
+// generations:
+//    {empty} -> key SHOULD be removed.
+// compact(6):
+// generations:
+//    {empty} -> key SHOULD be removed.
+type keyIndex struct {
+	key         []byte
+	modified    revision // the main rev of the last modification
+	generations []generation // stored oldest first; put and tombstone always act on the last element
+// put appends a revision to the last generation of the keyIndex; it panics unless the revision is greater than ki.modified.
+func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64) {
+	rev := revision{main: main, sub: sub}
+	if !rev.GreaterThan(ki.modified) {
+		if lg != nil {
+			lg.Panic(
+				"'put' with an unexpected smaller revision",
+				zap.Int64("given-revision-main", rev.main),
+				zap.Int64("given-revision-sub", rev.sub),
+				zap.Int64("modified-revision-main", ki.modified.main),
+				zap.Int64("modified-revision-sub", ki.modified.sub),
+			)
+		} else {
+			plog.Panicf("store.keyindex: put with unexpected smaller revision [%v / %v]", rev, ki.modified)
+		}
+	}
+	if len(ki.generations) == 0 { // brand-new keyIndex: start its first generation
+		ki.generations = append(ki.generations, generation{})
+	}
+	g := &ki.generations[len(ki.generations)-1]
+	if len(g.revs) == 0 { // create a new key
+		keysGauge.Inc()
+		g.created = rev
+	}
+	g.revs = append(g.revs, rev)
+	g.ver++
+	ki.modified = rev
+func (ki *keyIndex) restore(lg *zap.Logger, created, modified revision, ver int64) { // restore seeds an empty keyIndex with a single generation holding only the modified revision
+	if len(ki.generations) != 0 {
+		if lg != nil {
+			lg.Panic(
+				"'restore' got an unexpected non-empty generations",
+				zap.Int("generations-size", len(ki.generations)),
+			)
+		} else {
+			plog.Panicf("store.keyindex: cannot restore non-empty keyIndex")
+		}
+	}
+	ki.modified = modified
+	g := generation{created: created, ver: ver, revs: []revision{modified}}
+	ki.generations = append(ki.generations, g)
+	keysGauge.Inc()
+// tombstone puts a revision, pointing to a tombstone, to the keyIndex.
+// It also creates a new empty generation in the keyIndex.
+// It returns ErrRevisionNotFound when the last generation is empty, and panics on an empty keyIndex.
+func (ki *keyIndex) tombstone(lg *zap.Logger, main int64, sub int64) error {
+	if ki.isEmpty() {
+		if lg != nil {
+			lg.Panic(
+				"'tombstone' got an unexpected empty keyIndex",
+				zap.String("key", string(ki.key)),
+			)
+		} else {
+			plog.Panicf("store.keyindex: unexpected tombstone on empty keyIndex %s", string(ki.key))
+		}
+	}
+	if ki.generations[len(ki.generations)-1].isEmpty() { // the key is already deleted in its current generation
+		return ErrRevisionNotFound
+	}
+	ki.put(lg, main, sub)
+	ki.generations = append(ki.generations, generation{})
+	keysGauge.Dec()
+	return nil
+// get gets the modified, created revision and version of the key as of the given atRev.
+// It picks the newest revision with main <= atRev in the matching generation, else returns ErrRevisionNotFound.
+func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) {
+	if ki.isEmpty() {
+		if lg != nil {
+			lg.Panic(
+				"'get' got an unexpected empty keyIndex",
+				zap.String("key", string(ki.key)),
+			)
+		} else {
+			plog.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key))
+		}
+	}
+	g := ki.findGeneration(atRev)
+	if g.isEmpty() { // also true for a nil generation, i.e. the key did not exist at atRev
+		return revision{}, revision{}, 0, ErrRevisionNotFound
+	}
+	n := g.walk(func(rev revision) bool { return rev.main > atRev })
+	if n != -1 {
+		return g.revs[n], g.created, g.ver - int64(len(g.revs)-n-1), nil // version = g.ver minus the number of newer revisions
+	}
+	return revision{}, revision{}, 0, ErrRevisionNotFound
+// since returns revisions since the given rev. Only the revision with the
+// largest sub revision will be returned if multiple revisions have the same
+// main revision. It panics when called on an empty keyIndex.
+func (ki *keyIndex) since(lg *zap.Logger, rev int64) []revision {
+	if ki.isEmpty() {
+		if lg != nil {
+			lg.Panic(
+				"'since' got an unexpected empty keyIndex",
+				zap.String("key", string(ki.key)),
+			)
+		} else {
+			// The message names this method; it previously said "get", copied from keyIndex.get.
+			plog.Panicf("store.keyindex: unexpected since on empty keyIndex %s", string(ki.key))
+		}
+	}
+	since := revision{rev, 0}
+	var gi int
+	// find the generations to start checking: walk back from the newest until
+	// one was created before the requested revision (or the oldest is reached)
+	for gi = len(ki.generations) - 1; gi > 0; gi-- {
+		g := ki.generations[gi]
+		if g.isEmpty() {
+			continue
+		}
+		if since.GreaterThan(g.created) {
+			break
+		}
+	}
+	var revs []revision
+	var last int64
+	for ; gi < len(ki.generations); gi++ {
+		for _, r := range ki.generations[gi].revs {
+			if since.GreaterThan(r) {
+				continue
+			}
+			if r.main == last {
+				// replace the revision with a new one that has higher sub value,
+				// because the original one should not be seen by external
+				revs[len(revs)-1] = r
+				continue
+			}
+			revs = append(revs, r)
+			last = r.main
+		}
+	}
+	return revs
+// compact compacts a keyIndex by removing the versions with smaller or equal
+// revision than the given atRev except the largest one (If the largest one is
+// a tombstone, it will not be kept). Kept revisions are recorded in available.
+// If a generation becomes empty during compaction, it will be removed.
+func (ki *keyIndex) compact(lg *zap.Logger, atRev int64, available map[revision]struct{}) {
+	if ki.isEmpty() {
+		if lg != nil {
+			lg.Panic(
+				"'compact' got an unexpected empty keyIndex",
+				zap.String("key", string(ki.key)),
+			)
+		} else {
+			plog.Panicf("store.keyindex: unexpected compact on empty keyIndex %s", string(ki.key))
+		}
+	}
+	genIdx, revIndex := ki.doCompact(atRev, available)
+	g := &ki.generations[genIdx]
+	if !g.isEmpty() {
+		// remove the previous contents.
+		if revIndex != -1 {
+			g.revs = g.revs[revIndex:]
+		}
+		// remove any tombstone
+		if len(g.revs) == 1 && genIdx != len(ki.generations)-1 { // a lone revision left in a non-last generation is its closing tombstone
+			delete(available, g.revs[0])
+			genIdx++
+		}
+	}
+	// remove the previous generations.
+	ki.generations = ki.generations[genIdx:]
+// keep finds the revision to be kept if compact is called at given atRev; unlike compact it leaves ki unchanged.
+func (ki *keyIndex) keep(atRev int64, available map[revision]struct{}) {
+	if ki.isEmpty() {
+		return
+	}
+	genIdx, revIndex := ki.doCompact(atRev, available)
+	g := &ki.generations[genIdx]
+	if !g.isEmpty() {
+		// remove any tombstone
+		if revIndex == len(g.revs)-1 && genIdx != len(ki.generations)-1 { // the kept revision is the last one of a non-last generation
+			delete(available, g.revs[revIndex])
+		}
+	}
+func (ki *keyIndex) doCompact(atRev int64, available map[revision]struct{}) (genIdx int, revIndex int) { // doCompact locates the generation and revision position for atRev without modifying ki
+	// walk until reaching the first revision smaller or equal to "atRev",
+	// and add the revision to the available map
+	f := func(rev revision) bool {
+		if rev.main <= atRev {
+			available[rev] = struct{}{}
+			return false
+		}
+		return true
+	}
+	genIdx, g := 0, &ki.generations[0]
+	// find first generation includes atRev or created after atRev
+	for genIdx < len(ki.generations)-1 {
+		if tomb := g.revs[len(g.revs)-1].main; tomb > atRev { // this generation ends after atRev, so it is the one to walk
+			break
+		}
+		genIdx++
+		g = &ki.generations[genIdx]
+	}
+	revIndex = g.walk(f) // -1 when no revision in g has main <= atRev
+	return genIdx, revIndex
+func (ki *keyIndex) isEmpty() bool { // isEmpty reports whether ki holds exactly one generation and that generation has no revisions
+	return len(ki.generations) == 1 && ki.generations[0].isEmpty()
+// findGeneration finds out the generation of the keyIndex that the
+// given rev belongs to. If the given rev is at the gap of two generations,
+// which means that the key does not exist at the given rev, it returns nil.
+func (ki *keyIndex) findGeneration(rev int64) *generation {
+	lastg := len(ki.generations) - 1
+	cg := lastg
+	for cg >= 0 { // scan from the newest generation back to the oldest
+		if len(ki.generations[cg].revs) == 0 {
+			cg--
+			continue
+		}
+		g := ki.generations[cg]
+		if cg != lastg {
+			if tomb := g.revs[len(g.revs)-1].main; tomb <= rev { // rev is at or after this generation's final revision
+				return nil
+			}
+		}
+		if g.revs[0].main <= rev {
+			return &ki.generations[cg]
+		}
+		cg--
+	}
+	return nil
+// Less implements btree.Item: key indexes are ordered by byte-wise comparison
+// of their keys. b must be a *keyIndex.
+func (ki *keyIndex) Less(b btree.Item) bool {
+	other := b.(*keyIndex)
+	return bytes.Compare(ki.key, other.key) < 0
+// equal reports whether ki and b have the same key, the same last-modified
+// revision and pairwise-equal generations.
+func (ki *keyIndex) equal(b *keyIndex) bool {
+	// Scalar checks first: key bytes, last modification, generation count.
+	switch {
+	case !bytes.Equal(ki.key, b.key):
+		return false
+	case ki.modified != b.modified:
+		return false
+	case len(ki.generations) != len(b.generations):
+		return false
+	}
+	// Then every generation has to match its counterpart at the same position.
+	for i, ag := range ki.generations {
+		if !ag.equal(b.generations[i]) {
+			return false
+		}
+	}
+	return true
+func (ki *keyIndex) String() string { // String concatenates the String output of every generation, in stored order
+	var s string
+	for _, g := range ki.generations {
+		s += g.String()
+	}
+	return s
+// generation contains multiple revisions of a key.
+type generation struct {
+	ver     int64 // incremented by keyIndex.put for every revision added
+	created revision // when the generation is created (put in first revision).
+	revs    []revision // appended to by keyIndex.put; compact may trim the front
+func (g *generation) isEmpty() bool { return g == nil || len(g.revs) == 0 } // a nil generation counts as empty
+// walk visits the revisions of the generation from the newest (last) to the
+// oldest (first), handing each one to f. It stops as soon as f returns false
+// and returns the position in g.revs of the revision that stopped it. When f
+// accepts every revision, or there are none, it returns -1.
+func (g *generation) walk(f func(rev revision) bool) int {
+	for pos := len(g.revs) - 1; pos >= 0; pos-- {
+		if !f(g.revs[pos]) {
+			return pos
+		}
+	}
+	return -1
+func (g *generation) String() string { // String renders created, ver and revs on one newline-terminated line
+	return fmt.Sprintf("g: created[%d] ver[%d], revs %#v\n", g.created, g.ver, g.revs)
+// equal reports whether g and b have the same version counter and the same
+// revisions in the same order. The created revision is not compared.
+func (g generation) equal(b generation) bool {
+	if g.ver != b.ver || len(g.revs) != len(b.revs) {
+		return false
+	}
+	for i, ar := range g.revs {
+		if ar != b.revs[i] {
+			return false
+		}
+	}
+	return true
diff --git a/vendor/go.etcd.io/etcd/mvcc/kv.go b/vendor/go.etcd.io/etcd/mvcc/kv.go
new file mode 100644
index 0000000..8e898a5
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/kv.go
@@ -0,0 +1,149 @@
+// Copyright 2015 The etcd Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//     http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package mvcc
+import (
+	"go.etcd.io/etcd/lease"
+	"go.etcd.io/etcd/mvcc/backend"
+	"go.etcd.io/etcd/mvcc/mvccpb"
+// RangeOptions carries the optional parameters of a Range call.
+type RangeOptions struct {
+	// Limit caps the number of key-value pairs returned. In
+	// storeTxnRead.rangeKeys a value <= 0 means "no limit".
+	Limit int64
+	// Rev is the revision to read at. In storeTxnRead.rangeKeys a value <= 0
+	// selects the transaction's current revision.
+	Rev   int64
+	// Count, when true, asks only for the number of matching keys; no
+	// key-value pairs are loaded.
+	Count bool
+// RangeResult is the outcome of a Range call.
+type RangeResult struct {
+	// KVs holds the returned key-value pairs; it is nil for count-only
+	// requests, empty matches and errors.
+	KVs   []mvccpb.KeyValue
+	// Rev is the revision the range was evaluated against.
+	// storeTxnRead.rangeKeys sets it to 0 when returning ErrCompacted.
+	Rev   int64
+	// Count is the total number of keys matching the range, which can exceed
+	// len(KVs) when a limit applies. storeTxnRead.rangeKeys sets it to -1 on
+	// error.
+	Count int
+// ReadView is the read-only part of the KV API: revision queries and Range.
+type ReadView interface {
+	// FirstRev returns the first KV revision at the time of opening the txn.
+	// After a compaction, the first revision increases to the compaction
+	// revision.
+	FirstRev() int64
+	// Rev returns the revision of the KV at the time of opening the txn.
+	Rev() int64
+	// Range gets the keys in the range at rangeRev (ro.Rev).
+	// The returned rev is the current revision of the KV when the operation is executed.
+	// If rangeRev <= 0, Range gets the keys at currentRev.
+	// If `end` is nil, the request returns the key.
+	// If `end` is not nil and not empty, it gets the keys in range [key, range_end).
+	// If `end` is not nil and empty, it gets the keys greater than or equal to key.
+	// Limit (ro.Limit) limits the number of keys returned.
+	// If the required rev is compacted, ErrCompacted will be returned.
+	Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error)
+// TxnRead represents a read-only transaction with operations that will not
+// block other read transactions.
+type TxnRead interface {
+	ReadView
+	// End marks the transaction as complete and ready to commit.
+	// Implementations in this package release the locks taken when the
+	// transaction was opened, so every opened transaction must be ended.
+	End()
+// WriteView is the mutating part of the KV API: DeleteRange and Put.
+type WriteView interface {
+	// DeleteRange deletes the given range from the store.
+	// A deleteRange increases the rev of the store if any key in the range exists.
+	// The number of keys deleted will be returned.
+	// The returned rev is the current revision of the KV when the operation is executed.
+	// It also generates one event for each key deleted in the event history.
+	// If `end` is nil, deleteRange deletes the key.
+	// If `end` is not nil, deleteRange deletes the keys in range [key, range_end).
+	DeleteRange(key, end []byte) (n, rev int64)
+	// Put puts the given key, value into the store. Put also takes additional argument lease to
+	// attach a lease to a key-value pair as meta-data. KV implementation does not validate the lease
+	// id.
+	// A put also increases the rev of the store, and generates one event in the event history.
+	// The returned rev is the current revision of the KV when the operation is executed.
+	Put(key, value []byte, lease lease.LeaseID) (rev int64)
+// TxnWrite represents a transaction that can modify the store.
+// It combines the read operations of TxnRead with the mutations of WriteView.
+type TxnWrite interface {
+	TxnRead
+	WriteView
+	// Changes gets the changes made since opening the write txn.
+	Changes() []mvccpb.KeyValue
+// txnReadWrite coerces a read txn to a write, panicking on any write operation.
+// Read operations and End are forwarded to the embedded TxnRead.
+type txnReadWrite struct{ TxnRead }
+// DeleteRange always panics: the wrapped transaction is read-only.
+func (trw *txnReadWrite) DeleteRange(key, end []byte) (n, rev int64) { panic("unexpected DeleteRange") }
+// Put always panics: the wrapped transaction is read-only.
+func (trw *txnReadWrite) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
+	panic("unexpected Put")
+// Changes returns nil because a read-only transaction never records changes.
+func (trw *txnReadWrite) Changes() []mvccpb.KeyValue { return nil }
+// NewReadOnlyTxnWrite wraps txn so it satisfies TxnWrite; calling Put or
+// DeleteRange on the result panics.
+func NewReadOnlyTxnWrite(txn TxnRead) TxnWrite { return &txnReadWrite{txn} }
+// KV is the full key-value store API: one-shot reads and writes, explicit
+// transactions, hashing, compaction and lifecycle management.
+type KV interface {
+	ReadView
+	WriteView
+	// Read creates a read transaction.
+	Read() TxnRead
+	// Write creates a write transaction.
+	Write() TxnWrite
+	// Hash computes the hash of the KV's backend.
+	Hash() (hash uint32, revision int64, err error)
+	// HashByRev computes the hash of all MVCC revisions up to a given revision.
+	HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error)
+	// Compact frees all superseded keys with revisions less than rev.
+	Compact(rev int64) (<-chan struct{}, error)
+	// Commit commits outstanding txns into the underlying backend.
+	Commit()
+	// Restore restores the KV store from a backend.
+	Restore(b backend.Backend) error
+	// Close shuts the KV down.
+	Close() error
+// WatchableKV is a KV that can be watched.
+type WatchableKV interface {
+	KV
+	Watchable
+// Watchable is the interface that wraps the NewWatchStream function.
+type Watchable interface {
+	// NewWatchStream returns a WatchStream that can be used to
+	// watch events that have happened or are happening on the KV.
+	NewWatchStream() WatchStream
+// ConsistentWatchableKV is a WatchableKV that understands the consistency
+// algorithm and consistent index.
+// If the consistent index of the executing entry is not larger than the
+// consistent index of ConsistentWatchableKV, all operations in
+// this entry are skipped and an empty response is returned.
+type ConsistentWatchableKV interface {
+	WatchableKV
+	// ConsistentIndex returns the current consistent index of the KV.
+	ConsistentIndex() uint64
diff --git a/vendor/go.etcd.io/etcd/mvcc/kv_view.go b/vendor/go.etcd.io/etcd/mvcc/kv_view.go
new file mode 100644
index 0000000..bd2e777
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/kv_view.go
@@ -0,0 +1,51 @@
+// Copyright 2017 The etcd Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//     http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package mvcc
+import "go.etcd.io/etcd/lease"
+// readView implements ReadView on top of a KV by wrapping every call in its
+// own read transaction, which is ended before the call returns.
+type readView struct{ kv KV }
+// FirstRev reports the first revision seen by a freshly opened read txn.
+func (rv *readView) FirstRev() int64 {
+	txn := rv.kv.Read()
+	defer txn.End()
+	return txn.FirstRev()
+// Rev reports the revision seen by a freshly opened read txn.
+func (rv *readView) Rev() int64 {
+	txn := rv.kv.Read()
+	defer txn.End()
+	return txn.Rev()
+// Range performs a single range query inside a freshly opened read txn.
+func (rv *readView) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
+	txn := rv.kv.Read()
+	defer txn.End()
+	return txn.Range(key, end, ro)
+// writeView implements WriteView on top of a KV by wrapping every call in its
+// own write transaction, which is ended before the call returns.
+type writeView struct{ kv KV }
+// DeleteRange deletes [key, end) inside a freshly opened write txn.
+func (wv *writeView) DeleteRange(key, end []byte) (n, rev int64) {
+	txn := wv.kv.Write()
+	defer txn.End()
+	return txn.DeleteRange(key, end)
+// Put stores key/value with the given lease inside a freshly opened write txn.
+func (wv *writeView) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
+	txn := wv.kv.Write()
+	defer txn.End()
+	return txn.Put(key, value, lease)
diff --git a/vendor/go.etcd.io/etcd/mvcc/kvstore.go b/vendor/go.etcd.io/etcd/mvcc/kvstore.go
new file mode 100644
index 0000000..e367ebb
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/kvstore.go
@@ -0,0 +1,620 @@
+// Copyright 2015 The etcd Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//     http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package mvcc
+import (
+	"context"
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"hash/crc32"
+	"math"
+	"sync"
+	"sync/atomic"
+	"time"
+	"go.etcd.io/etcd/lease"
+	"go.etcd.io/etcd/mvcc/backend"
+	"go.etcd.io/etcd/mvcc/mvccpb"
+	"go.etcd.io/etcd/pkg/schedule"
+	"github.com/coreos/pkg/capnslog"
+	"go.uber.org/zap"
+var (
+	// Backend bucket names: keyBucketName holds the revisioned key-value
+	// records, metaBucketName holds the store's bookkeeping entries below.
+	keyBucketName  = []byte("key")
+	metaBucketName = []byte("meta")
+	// Keys stored inside the meta bucket.
+	consistentIndexKeyName  = []byte("consistent_index")
+	scheduledCompactKeyName = []byte("scheduledCompactRev")
+	finishedCompactKeyName  = []byte("finishedCompactRev")
+	// Sentinel errors returned by the mvcc package.
+	ErrCompacted = errors.New("mvcc: required revision has been compacted")
+	ErrFutureRev = errors.New("mvcc: required revision is a future revision")
+	ErrCanceled  = errors.New("mvcc: watcher is canceled")
+	ErrClosed    = errors.New("mvcc: closed")
+	// plog is the fallback logger used whenever no *zap.Logger is configured.
+	plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "mvcc")
+const (
+	// markedRevBytesLen is the byte length of marked revision.
+	// The first `revBytesLen` bytes represents a normal revision. The last
+	// one byte is the mark.
+	markedRevBytesLen      = revBytesLen + 1
+	// markBytePosition is the index of the mark byte in a marked revision.
+	markBytePosition       = markedRevBytesLen - 1
+	// markTombstone is the mark byte identifying a tombstone (deletion) record.
+	markTombstone     byte = 't'
+// restoreChunkKeys is the number of keys read from the backend per batch
+// while restoring, and the buffer size of the restore channel.
+var restoreChunkKeys = 10000 // non-const for testing
+// defaultCompactBatchLimit is used by NewStore when
+// StoreConfig.CompactionBatchLimit is zero.
+var defaultCompactBatchLimit = 1000
+// ConsistentIndexGetter is an interface that wraps the ConsistentIndex method.
+// Consistent index is the offset of an entry in a consistent replicated log.
+type ConsistentIndexGetter interface {
+	// ConsistentIndex returns the consistent index of current executing entry.
+	ConsistentIndex() uint64
+// StoreConfig holds the tunable parameters of a store.
+type StoreConfig struct {
+	// CompactionBatchLimit is the maximum number of keys scanned per batch by
+	// scheduleCompaction. NewStore replaces a zero value with
+	// defaultCompactBatchLimit.
+	CompactionBatchLimit int
+// store is the KV implementation backed by a backend.Backend and an
+// in-memory key index.
+type store struct {
+	ReadView
+	WriteView
+	// consistentIndex caches the "consistent_index" key's value. Accessed
+	// through atomics so must be 64-bit aligned.
+	consistentIndex uint64
+	// cfg is the configuration passed to NewStore, with defaults applied.
+	cfg StoreConfig
+	// mu read locks for txns and write locks for non-txn store changes.
+	mu sync.RWMutex
+	// ig supplies the consistent index persisted by saveIndex; it may be nil,
+	// in which case saveIndex is a no-op.
+	ig ConsistentIndexGetter
+	// b is the persistent backend; kvindex is the in-memory index over it.
+	b       backend.Backend
+	kvindex index
+	// le is the lessor used to attach and detach leases; NewStore and restore
+	// check it for nil.
+	le lease.Lessor
+	// revMu protects currentRev and compactMainRev.
+	// Locked at end of write txn and released after write txn unlock lock.
+	// Locked before locking read txn and released after locking.
+	revMu sync.RWMutex
+	// currentRev is the revision of the last completed transaction.
+	currentRev int64
+	// compactMainRev is the main revision of the last compaction.
+	compactMainRev int64
+	// bytesBuf8 is a byte slice of length 8
+	// to avoid a repetitive allocation in saveIndex.
+	bytesBuf8 []byte
+	// fifoSched runs compaction jobs and compaction barriers.
+	fifoSched schedule.Scheduler
+	// stopc is closed by Close and Restore to stop scheduled work.
+	stopc chan struct{}
+	// lg is the structured logger; when nil, code falls back to plog.
+	lg *zap.Logger
+// NewStore returns a new store. It is useful to create a store inside
+// mvcc pkg. It should only be used for testing externally.
+//
+// A zero cfg.CompactionBatchLimit is replaced by defaultCompactBatchLimit.
+// le may be nil; when it is not, the store registers itself as the lessor's
+// range deleter. NewStore creates the key and meta buckets, force-commits
+// them and then restores the in-memory state from b, panicking if that fails.
+func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) *store {
+	if cfg.CompactionBatchLimit == 0 {
+		cfg.CompactionBatchLimit = defaultCompactBatchLimit
+	}
+	s := &store{
+		cfg:     cfg,
+		b:       b,
+		ig:      ig,
+		kvindex: newTreeIndex(lg),
+		le: le,
+		currentRev:     1,
+		compactMainRev: -1,
+		bytesBuf8: make([]byte, 8),
+		fifoSched: schedule.NewFIFOScheduler(),
+		stopc: make(chan struct{}),
+		lg: lg,
+	}
+	// The views route one-shot reads/writes through s's own transactions.
+	s.ReadView = &readView{s}
+	s.WriteView = &writeView{s}
+	if s.le != nil {
+		s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() })
+	}
+	// Make sure both buckets exist before anything is read from them.
+	tx := s.b.BatchTx()
+	tx.Lock()
+	tx.UnsafeCreateBucket(keyBucketName)
+	tx.UnsafeCreateBucket(metaBucketName)
+	tx.Unlock()
+	s.b.ForceCommit()
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if err := s.restore(); err != nil {
+		// TODO: return the error instead of panic here?
+		panic("failed to recover store from backend")
+	}
+	return s
+// compactBarrier closes ch once it runs with a live (non-nil, non-cancelled)
+// context. Otherwise it re-schedules itself on the FIFO scheduler, unless the
+// store has been stopped, in which case ch is left open.
+func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
+	if ctx != nil && ctx.Err() == nil {
+		close(ch)
+		return
+	}
+	s.mu.Lock()
+	select {
+	case <-s.stopc:
+	default:
+		retry := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
+		s.fifoSched.Schedule(retry)
+	}
+	s.mu.Unlock()
+// Hash force-commits the backend and returns its hash (ignoring the keys in
+// DefaultIgnores) together with the store's current revision. The time taken
+// is recorded in hashSec.
+func (s *store) Hash() (hash uint32, revision int64, err error) {
+	// TODO: hash and revision could be inconsistent, one possible fix is to add s.revMu.RLock() at the beginning of function, which is costly
+	began := time.Now()
+	s.b.ForceCommit()
+	sum, err := s.b.Hash(DefaultIgnores)
+	hashSec.Observe(time.Since(began).Seconds())
+	return sum, s.currentRev, err
+// HashByRev computes a CRC32 (Castagnoli) hash over the key bucket name and
+// every key/value record in the key bucket whose revision is at most rev.
+// rev == 0 means the current revision. It returns ErrCompacted when
+// 0 < rev <= compactRev and ErrFutureRev when rev > currentRev. The time taken
+// is recorded in hashRevSec.
+func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev int64, err error) {
+	start := time.Now()
+	s.mu.RLock()
+	// Snapshot both revisions under revMu so they are mutually consistent.
+	s.revMu.RLock()
+	compactRev, currentRev = s.compactMainRev, s.currentRev
+	s.revMu.RUnlock()
+	if rev > 0 && rev <= compactRev {
+		s.mu.RUnlock()
+		return 0, 0, compactRev, ErrCompacted
+	} else if rev > 0 && rev > currentRev {
+		s.mu.RUnlock()
+		return 0, currentRev, 0, ErrFutureRev
+	}
+	if rev == 0 {
+		rev = currentRev
+	}
+	keep := s.kvindex.Keep(rev)
+	// Take the backend read lock before releasing s.mu, so the store lock is
+	// not held for the duration of the scan.
+	tx := s.b.ReadTx()
+	tx.RLock()
+	defer tx.RUnlock()
+	s.mu.RUnlock()
+	upper := revision{main: rev + 1}
+	lower := revision{main: compactRev + 1}
+	h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
+	h.Write(keyBucketName)
+	err = tx.UnsafeForEach(keyBucketName, func(k, v []byte) error {
+		kr := bytesToRev(k)
+		// ignore revisions newer than the requested one
+		if !upper.GreaterThan(kr) {
+			return nil
+		}
+		// skip revisions that are scheduled for deletion
+		// due to compacting; don't skip if there isn't one.
+		if lower.GreaterThan(kr) && len(keep) > 0 {
+			if _, ok := keep[kr]; !ok {
+				return nil
+			}
+		}
+		h.Write(k)
+		h.Write(v)
+		return nil
+	})
+	hash = h.Sum32()
+	hashRevSec.Observe(time.Since(start).Seconds())
+	return hash, currentRev, compactRev, err
+// updateCompactRev validates rev and records it as the new compaction
+// revision, holding revMu throughout.
+//
+// If rev is not newer than the last compaction it schedules a barrier and
+// returns that barrier's channel with ErrCompacted. If rev is beyond the
+// current revision it returns ErrFutureRev. Otherwise it sets compactMainRev,
+// persists rev under scheduledCompactKeyName in the meta bucket,
+// force-commits, and returns (nil, nil).
+func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) {
+	s.revMu.Lock()
+	if rev <= s.compactMainRev {
+		ch := make(chan struct{})
+		f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
+		s.fifoSched.Schedule(f)
+		s.revMu.Unlock()
+		return ch, ErrCompacted
+	}
+	if rev > s.currentRev {
+		s.revMu.Unlock()
+		return nil, ErrFutureRev
+	}
+	s.compactMainRev = rev
+	rbytes := newRevBytes()
+	revToBytes(revision{main: rev}, rbytes)
+	tx := s.b.BatchTx()
+	tx.Lock()
+	tx.UnsafePut(metaBucketName, scheduledCompactKeyName, rbytes)
+	tx.Unlock()
+	// ensure that desired compaction is persisted
+	s.b.ForceCommit()
+	s.revMu.Unlock()
+	return nil, nil
+func (s *store) compact(rev int64) (<-chan struct{}, error) {
+	start := time.Now()
+	keep := s.kvindex.Compact(rev)
+	ch := make(chan struct{})
+	var j = func(ctx context.Context) {
+		if ctx.Err() != nil {
+			s.compactBarrier(ctx, ch)
+			return
+		}
+		if !s.scheduleCompaction(rev, keep) {
+			s.compactBarrier(nil, ch)
+			return
+		}
+		close(ch)
+	}
+	s.fifoSched.Schedule(j)
+	indexCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))
+	return ch, nil
+// compactLockfree is Compact without acquiring s.mu; the caller is expected
+// to hold it already (restore calls it under the lock taken by its callers).
+func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) {
+	if ch, err := s.updateCompactRev(rev); err != nil {
+		return ch, err
+	}
+	return s.compact(rev)
+// Compact records rev as the compaction revision under s.mu and then starts
+// the compaction. Errors from updateCompactRev (ErrCompacted, ErrFutureRev)
+// are returned together with whatever channel it produced.
+func (s *store) Compact(rev int64) (<-chan struct{}, error) {
+	s.mu.Lock()
+	ch, err := s.updateCompactRev(rev)
+	s.mu.Unlock()
+	if err != nil {
+		return ch, err
+	}
+	return s.compact(rev)
+// DefaultIgnores is a map of keys to ignore in hash checking.
+var DefaultIgnores map[backend.IgnoreKey]struct{}
+// init populates DefaultIgnores with the consistent-index meta key.
+func init() {
+	// consistent index might be changed due to v2 internal sync, which
+	// is not controllable by the user.
+	consistentIndexIgnore := backend.IgnoreKey{
+		Bucket: string(metaBucketName),
+		Key:    string(consistentIndexKeyName),
+	}
+	DefaultIgnores = map[backend.IgnoreKey]struct{}{consistentIndexIgnore: {}}
+// Commit writes the consistent index into the batch transaction and forces
+// the backend to commit, all under the store's write lock.
+func (s *store) Commit() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	btx := s.b.BatchTx()
+	btx.Lock()
+	s.saveIndex(btx)
+	btx.Unlock()
+	s.b.ForceCommit()
+// Restore replaces the store's backend with b and rebuilds all in-memory
+// state from it. Under the write lock it stops the current scheduler, resets
+// the cached consistent index, the key index and both revisions to their
+// initial values, creates a fresh scheduler and stop channel, and then runs
+// restore.
+func (s *store) Restore(b backend.Backend) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	// Signal and stop work scheduled against the old backend.
+	close(s.stopc)
+	s.fifoSched.Stop()
+	atomic.StoreUint64(&s.consistentIndex, 0)
+	s.b = b
+	s.kvindex = newTreeIndex(s.lg)
+	s.currentRev = 1
+	s.compactMainRev = -1
+	s.fifoSched = schedule.NewFIFOScheduler()
+	s.stopc = make(chan struct{})
+	return s.restore()
+// restore rebuilds the in-memory state of the store from its backend: the
+// metrics reporters, the compaction revisions stored in the meta bucket, the
+// key index, the current revision and the key-to-lease attachments. If a
+// compaction was scheduled but not finished, it is resumed. As written it
+// always returns nil; unrecoverable conditions panic or log fatally instead.
+// Callers in this file hold s.mu while calling it.
+func (s *store) restore() error {
+	s.setupMetricsReporter()
+	min, max := newRevBytes(), newRevBytes()
+	revToBytes(revision{main: 1}, min)
+	revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)
+	keyToLease := make(map[string]lease.LeaseID)
+	// restore index
+	tx := s.b.BatchTx()
+	tx.Lock()
+	_, finishedCompactBytes := tx.UnsafeRange(metaBucketName, finishedCompactKeyName, nil, 0)
+	if len(finishedCompactBytes) != 0 {
+		s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main
+		if s.lg != nil {
+			s.lg.Info(
+				"restored last compact revision",
+				zap.String("meta-bucket-name", string(metaBucketName)),
+				zap.String("meta-bucket-name-key", string(finishedCompactKeyName)),
+				zap.Int64("restored-compact-revision", s.compactMainRev),
+			)
+		} else {
+			plog.Printf("restore compact to %d", s.compactMainRev)
+		}
+	}
+	_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
+	scheduledCompact := int64(0)
+	if len(scheduledCompactBytes) != 0 {
+		scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main
+	}
+	// index keys concurrently as they're loaded in from tx
+	keysGauge.Set(0)
+	rkvc, revc := restoreIntoIndex(s.lg, s.kvindex)
+	for {
+		keys, vals := tx.UnsafeRange(keyBucketName, min, max, int64(restoreChunkKeys))
+		if len(keys) == 0 {
+			break
+		}
+		// rkvc blocks if the total pending keys exceeds the restore
+		// chunk size to keep keys from consuming too much memory.
+		restoreChunk(s.lg, rkvc, keys, vals, keyToLease)
+		if len(keys) < restoreChunkKeys {
+			// partial set implies final set
+			break
+		}
+		// next set begins after where this one ended
+		newMin := bytesToRev(keys[len(keys)-1][:revBytesLen])
+		newMin.sub++
+		revToBytes(newMin, min)
+	}
+	// Closing rkvc lets the indexing goroutine finish and report the last
+	// revision it saw on revc.
+	close(rkvc)
+	s.currentRev = <-revc
+	// keys in the range [compacted revision -N, compaction] might all be deleted due to compaction.
+	// the correct revision should be set to compaction revision in the case, not the largest revision
+	// we have seen.
+	if s.currentRev < s.compactMainRev {
+		s.currentRev = s.compactMainRev
+	}
+	// A scheduled compaction that is not newer than the finished one has
+	// nothing left to do.
+	if scheduledCompact <= s.compactMainRev {
+		scheduledCompact = 0
+	}
+	for key, lid := range keyToLease {
+		if s.le == nil {
+			panic("no lessor to attach lease")
+		}
+		err := s.le.Attach(lid, []lease.LeaseItem{{Key: key}})
+		if err != nil {
+			if s.lg != nil {
+				s.lg.Warn(
+					"failed to attach a lease",
+					zap.String("lease-id", fmt.Sprintf("%016x", lid)),
+					zap.Error(err),
+				)
+			} else {
+				plog.Errorf("unexpected Attach error: %v", err)
+			}
+		}
+	}
+	tx.Unlock()
+	if scheduledCompact != 0 {
+		s.compactLockfree(scheduledCompact)
+		if s.lg != nil {
+			s.lg.Info(
+				"resume scheduled compaction",
+				zap.String("meta-bucket-name", string(metaBucketName)),
+				zap.String("meta-bucket-name-key", string(scheduledCompactKeyName)),
+				zap.Int64("scheduled-compact-revision", scheduledCompact),
+			)
+		} else {
+			plog.Printf("resume scheduled compaction at %d", scheduledCompact)
+		}
+	}
+	return nil
+// revKeyValue is one backend record handed from restoreChunk to the indexing
+// goroutine started by restoreIntoIndex.
+type revKeyValue struct {
+	// key is the backend key, i.e. the encoded (possibly tombstone-marked)
+	// revision bytes.
+	key  []byte
+	// kv is the key-value record unmarshalled from the backend value.
+	kv   mvccpb.KeyValue
+	// kstr is string(kv.Key), computed once and used as a map key.
+	kstr string
+// restoreIntoIndex starts a goroutine that rebuilds idx from the records sent
+// on the first returned channel. When that channel is closed, the goroutine
+// sends the main revision of the last record it processed (1 if there was
+// none) on the second returned channel and exits.
+func restoreIntoIndex(lg *zap.Logger, idx index) (chan<- revKeyValue, <-chan int64) {
+	rkvc, revc := make(chan revKeyValue, restoreChunkKeys), make(chan int64, 1)
+	go func() {
+		currentRev := int64(1)
+		defer func() { revc <- currentRev }()
+		// restore the tree index from streaming the unordered index.
+		kiCache := make(map[string]*keyIndex, restoreChunkKeys)
+		for rkv := range rkvc {
+			ki, ok := kiCache[rkv.kstr]
+			// purge kiCache if many keys but still missing in the cache
+			if !ok && len(kiCache) >= restoreChunkKeys {
+				// evict up to 10 entries, in map iteration order
+				i := 10
+				for k := range kiCache {
+					delete(kiCache, k)
+					if i--; i == 0 {
+						break
+					}
+				}
+			}
+			// cache miss, fetch from tree index if there
+			if !ok {
+				ki = &keyIndex{key: rkv.kv.Key}
+				if idxKey := idx.KeyIndex(ki); idxKey != nil {
+					kiCache[rkv.kstr], ki = idxKey, idxKey
+					ok = true
+				}
+			}
+			rev := bytesToRev(rkv.key)
+			currentRev = rev.main
+			if ok {
+				// key already indexed: record a tombstone or a new revision
+				if isTombstone(rkv.key) {
+					ki.tombstone(lg, rev.main, rev.sub)
+					continue
+				}
+				ki.put(lg, rev.main, rev.sub)
+			} else if !isTombstone(rkv.key) {
+				// first sighting of the key: a tombstone for an unknown key
+				// is ignored, anything else creates the index entry
+				ki.restore(lg, revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version)
+				idx.Insert(ki)
+				kiCache[rkv.kstr] = ki
+			}
+		}
+	}()
+	return rkvc, revc
+// restoreChunk decodes one batch of backend records (keys[i] paired with
+// vals[i]), keeps keyToLease in step with the latest record seen for every
+// key, and forwards each record on kvc. A record that cannot be unmarshalled
+// is logged fatally.
+func restoreChunk(lg *zap.Logger, kvc chan<- revKeyValue, keys, vals [][]byte, keyToLease map[string]lease.LeaseID) {
+	for i := range keys {
+		item := revKeyValue{key: keys[i]}
+		if err := item.kv.Unmarshal(vals[i]); err != nil {
+			if lg == nil {
+				plog.Fatalf("cannot unmarshal event: %v", err)
+			} else {
+				lg.Fatal("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
+			}
+		}
+		item.kstr = string(item.kv.Key)
+		// Only a live (non-tombstone) record carrying a lease keeps the key
+		// attached; everything else clears any earlier attachment.
+		lid := lease.LeaseID(item.kv.Lease)
+		if !isTombstone(item.key) && lid != lease.NoLease {
+			keyToLease[item.kstr] = lid
+		} else {
+			delete(keyToLease, item.kstr)
+		}
+		kvc <- item
+	}
+// Close closes the stop channel and stops the FIFO scheduler. It always
+// returns nil. Calling it twice panics on the second close of stopc.
+func (s *store) Close() error {
+	close(s.stopc)
+	s.fifoSched.Stop()
+	return nil
+// saveIndex writes the current consistent index into the meta bucket through
+// tx and refreshes the cached copy. It does nothing when the store has no
+// ConsistentIndexGetter. The caller must already hold tx's lock.
+func (s *store) saveIndex(tx backend.BatchTx) {
+	if s.ig == nil {
+		return
+	}
+	idx := s.ig.ConsistentIndex()
+	// reuse the store's scratch buffer instead of allocating 8 bytes per call
+	buf := s.bytesBuf8
+	binary.BigEndian.PutUint64(buf, idx)
+	// put the index into the underlying backend
+	// tx has been locked in TxnBegin, so there is no need to lock it again
+	tx.UnsafePut(metaBucketName, consistentIndexKeyName, buf)
+	atomic.StoreUint64(&s.consistentIndex, idx)
+// ConsistentIndex returns the consistent index of the store. A non-zero
+// cached value is returned directly; otherwise the value is read from the
+// meta bucket, cached and returned. It returns 0 when nothing is stored.
+func (s *store) ConsistentIndex() uint64 {
+	cached := atomic.LoadUint64(&s.consistentIndex)
+	if cached > 0 {
+		return cached
+	}
+	btx := s.b.BatchTx()
+	btx.Lock()
+	defer btx.Unlock()
+	_, vals := btx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0)
+	if len(vals) == 0 {
+		return 0
+	}
+	stored := binary.BigEndian.Uint64(vals[0])
+	atomic.StoreUint64(&s.consistentIndex, stored)
+	return stored
+// setupMetricsReporter points the package-level metric callbacks at this
+// store. Each callback is replaced under its own mutex. The size and
+// open-read-txn callbacks capture the backend in use at call time; the
+// revision callbacks read the store's fields under revMu.
+func (s *store) setupMetricsReporter() {
+	// capture the current backend so later changes to s.b do not affect the
+	// closures installed here
+	b := s.b
+	reportDbTotalSizeInBytesMu.Lock()
+	reportDbTotalSizeInBytes = func() float64 { return float64(b.Size()) }
+	reportDbTotalSizeInBytesMu.Unlock()
+	reportDbTotalSizeInBytesDebugMu.Lock()
+	reportDbTotalSizeInBytesDebug = func() float64 { return float64(b.Size()) }
+	reportDbTotalSizeInBytesDebugMu.Unlock()
+	reportDbTotalSizeInUseInBytesMu.Lock()
+	reportDbTotalSizeInUseInBytes = func() float64 { return float64(b.SizeInUse()) }
+	reportDbTotalSizeInUseInBytesMu.Unlock()
+	reportDbOpenReadTxNMu.Lock()
+	reportDbOpenReadTxN = func() float64 { return float64(b.OpenReadTxN()) }
+	reportDbOpenReadTxNMu.Unlock()
+	reportCurrentRevMu.Lock()
+	reportCurrentRev = func() float64 {
+		s.revMu.RLock()
+		defer s.revMu.RUnlock()
+		return float64(s.currentRev)
+	}
+	reportCurrentRevMu.Unlock()
+	reportCompactRevMu.Lock()
+	reportCompactRev = func() float64 {
+		s.revMu.RLock()
+		defer s.revMu.RUnlock()
+		return float64(s.compactMainRev)
+	}
+	reportCompactRevMu.Unlock()
+// appendMarkTombstone appends the tombstone mark to normal revision bytes.
+// b must be exactly revBytesLen long; any other length is reported through
+// the logger's panic facility (lg when non-nil, plog otherwise).
+func appendMarkTombstone(lg *zap.Logger, b []byte) []byte {
+	if len(b) == revBytesLen {
+		return append(b, markTombstone)
+	}
+	if lg == nil {
+		plog.Panicf("cannot append mark to non normal revision bytes")
+	} else {
+		lg.Panic(
+			"cannot append tombstone mark to non-normal revision bytes",
+			zap.Int("expected-revision-bytes-size", revBytesLen),
+			zap.Int("given-revision-bytes-size", len(b)),
+		)
+	}
+	return append(b, markTombstone)
+// isTombstone reports whether b is a marked revision whose mark byte is the
+// tombstone mark.
+func isTombstone(b []byte) bool {
+	if len(b) != markedRevBytesLen {
+		return false
+	}
+	return b[markBytePosition] == markTombstone
diff --git a/vendor/go.etcd.io/etcd/mvcc/kvstore_compaction.go b/vendor/go.etcd.io/etcd/mvcc/kvstore_compaction.go
new file mode 100644
index 0000000..2adb498
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/kvstore_compaction.go
@@ -0,0 +1,79 @@
+// Copyright 2015 The etcd Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//     http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package mvcc
+import (
+	"encoding/binary"
+	"time"
+	"go.uber.org/zap"
+// scheduleCompaction deletes from the key bucket every record with a main
+// revision up to compactMainRev whose revision is not in keep. It works in
+// batches of at most cfg.CompactionBatchLimit keys, force-committing and
+// pausing 10ms between batches. When the last batch is done it stores
+// compactMainRev under finishedCompactKeyName and returns true. It returns
+// false if the store is stopped between batches. Total time and the number
+// of scanned keys are reported to the compaction metrics on return.
+func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struct{}) bool {
+	totalStart := time.Now()
+	defer func() { dbCompactionTotalMs.Observe(float64(time.Since(totalStart) / time.Millisecond)) }()
+	keyCompactions := 0
+	defer func() { dbCompactionKeysCounter.Add(float64(keyCompactions)) }()
+	// end is the exclusive upper bound of the scan: main revision compactMainRev+1.
+	end := make([]byte, 8)
+	binary.BigEndian.PutUint64(end, uint64(compactMainRev+1))
+	// last is the inclusive lower bound of the next batch; all-zero to begin with.
+	last := make([]byte, 8+1+8)
+	for {
+		var rev revision
+		start := time.Now()
+		tx := s.b.BatchTx()
+		tx.Lock()
+		keys, _ := tx.UnsafeRange(keyBucketName, last, end, int64(s.cfg.CompactionBatchLimit))
+		for _, key := range keys {
+			rev = bytesToRev(key)
+			if _, ok := keep[rev]; !ok {
+				tx.UnsafeDelete(keyBucketName, key)
+			}
+		}
+		// a short batch means the range is exhausted: record completion
+		if len(keys) < s.cfg.CompactionBatchLimit {
+			rbytes := make([]byte, 8+1+8)
+			revToBytes(revision{main: compactMainRev}, rbytes)
+			tx.UnsafePut(metaBucketName, finishedCompactKeyName, rbytes)
+			tx.Unlock()
+			if s.lg != nil {
+				s.lg.Info(
+					"finished scheduled compaction",
+					zap.Int64("compact-revision", compactMainRev),
+					zap.Duration("took", time.Since(totalStart)),
+				)
+			} else {
+				plog.Infof("finished scheduled compaction at %d (took %v)", compactMainRev, time.Since(totalStart))
+			}
+			return true
+		}
+		// update last
+		revToBytes(revision{main: rev.main, sub: rev.sub + 1}, last)
+		tx.Unlock()
+		// Immediately commit the compaction deletes instead of letting them accumulate in the write buffer
+		s.b.ForceCommit()
+		dbCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))
+		// yield to other backend users before the next batch, or stop
+		select {
+		case <-time.After(10 * time.Millisecond):
+		case <-s.stopc:
+			return false
+		}
+	}
diff --git a/vendor/go.etcd.io/etcd/mvcc/kvstore_txn.go b/vendor/go.etcd.io/etcd/mvcc/kvstore_txn.go
new file mode 100644
index 0000000..9698254
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/kvstore_txn.go
@@ -0,0 +1,313 @@
+// Copyright 2017 The etcd Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//     http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package mvcc
+import (
+	"go.etcd.io/etcd/lease"
+	"go.etcd.io/etcd/mvcc/backend"
+	"go.etcd.io/etcd/mvcc/mvccpb"
+	"go.uber.org/zap"
+// storeTxnRead is a read transaction on a store.
+type storeTxnRead struct {
+	// s is the store the transaction belongs to.
+	s  *store
+	// tx is the backend read transaction used to fetch values.
+	tx backend.ReadTx
+	// firstRev and rev are the compaction revision and current revision of
+	// the store captured when the transaction was opened.
+	firstRev int64
+	rev      int64
+// Read opens a read transaction. It takes s.mu for reading (released by the
+// transaction's End), creates a concurrent backend read transaction, and
+// snapshots compactMainRev and currentRev under revMu as the transaction's
+// first and current revisions. The result is wrapped for metrics.
+func (s *store) Read() TxnRead {
+	s.mu.RLock()
+	s.revMu.RLock()
+	// backend holds b.readTx.RLock() only when creating the concurrentReadTx. After
+	// ConcurrentReadTx is created, it will not block write transaction.
+	tx := s.b.ConcurrentReadTx()
+	tx.RLock() // RLock is no-op. concurrentReadTx does not need to be locked after it is created.
+	firstRev, rev := s.compactMainRev, s.currentRev
+	s.revMu.RUnlock()
+	return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev})
+// FirstRev returns the compaction revision captured when the txn was opened.
+func (tr *storeTxnRead) FirstRev() int64 { return tr.firstRev }
+// Rev returns the store revision captured when the txn was opened.
+func (tr *storeTxnRead) Rev() int64      { return tr.rev }
+// Range reads [key, end) at the txn's revision, subject to ro.
+func (tr *storeTxnRead) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
+	return tr.rangeKeys(key, end, tr.Rev(), ro)
+// End finishes the read txn: it releases the backend read transaction and
+// then the store read lock taken when the txn was opened.
+func (tr *storeTxnRead) End() {
+	tr.tx.RUnlock() // RUnlock signals the end of concurrentReadTx.
+	tr.s.mu.RUnlock()
+// storeTxnWrite is a write transaction on a store. It embeds storeTxnRead so
+// reads inside the transaction go through the same batch transaction.
+type storeTxnWrite struct {
+	storeTxnRead
+	// tx is the backend batch transaction, locked for the txn's lifetime.
+	tx backend.BatchTx
+	// beginRev is the revision where the txn begins; it will write to the next revision.
+	beginRev int64
+	// changes accumulates every key-value written or deleted by the txn.
+	changes  []mvccpb.KeyValue
+// Write opens a write transaction. It takes s.mu for reading and locks the
+// backend batch transaction; both stay held until the transaction's End.
+// The transaction starts at the store's current revision. The result is
+// wrapped for metrics.
+func (s *store) Write() TxnWrite {
+	s.mu.RLock()
+	tx := s.b.BatchTx()
+	tx.Lock()
+	tw := &storeTxnWrite{
+		// the embedded read txn shares the batch tx; its revisions are unused
+		storeTxnRead: storeTxnRead{s, tx, 0, 0},
+		tx:           tx,
+		beginRev:     s.currentRev,
+		changes:      make([]mvccpb.KeyValue, 0, 4),
+	}
+	return newMetricsTxnWrite(tw)
+// Rev returns the revision at which the write txn began.
+func (tw *storeTxnWrite) Rev() int64 { return tw.beginRev }
+// Range reads [key, end) inside the write txn. Once the txn has made at
+// least one change, the read is evaluated at beginRev+1 so that it observes
+// the txn's own writes; before that it reads at beginRev.
+func (tw *storeTxnWrite) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
+	readRev := tw.beginRev
+	if len(tw.changes) != 0 {
+		readRev = tw.beginRev + 1
+	}
+	return tw.rangeKeys(key, end, readRev, ro)
+// DeleteRange deletes the keys in [key, end) and returns how many were
+// deleted together with the resulting revision: beginRev+1 if this txn has
+// changed anything (now or earlier), beginRev otherwise.
+func (tw *storeTxnWrite) DeleteRange(key, end []byte) (int64, int64) {
+	deleted := tw.deleteRange(key, end)
+	if deleted == 0 && len(tw.changes) == 0 {
+		return 0, tw.beginRev
+	}
+	return deleted, tw.beginRev + 1
+// Put writes key/value with the given lease and returns the revision the
+// write lands in, which is always beginRev+1.
+func (tw *storeTxnWrite) Put(key, value []byte, lease lease.LeaseID) int64 {
+	tw.put(key, value, lease)
+	return tw.beginRev + 1
+// End finishes the write txn. If the txn changed anything, it saves the
+// consistent index and bumps the store's current revision while holding
+// revMu across the batch-tx unlock. It always releases the batch tx and the
+// store read lock taken by Write.
+func (tw *storeTxnWrite) End() {
+	// only update index if the txn modifies the mvcc state.
+	modified := len(tw.changes) != 0
+	if modified {
+		tw.s.saveIndex(tw.tx)
+		// hold revMu lock to prevent new read txns from opening until writeback.
+		tw.s.revMu.Lock()
+		tw.s.currentRev++
+	}
+	tw.tx.Unlock()
+	if modified {
+		tw.s.revMu.Unlock()
+	}
+	tw.s.mu.RUnlock()
+// rangeKeys resolves [key, end) at revision ro.Rev (curRev when ro.Rev <= 0)
+// through the key index and loads the matching records from the backend.
+//
+// It returns ErrFutureRev when ro.Rev > curRev and ErrCompacted when the
+// effective revision is below the store's compaction revision; in both cases
+// the result has Count -1. With ro.Count set, only the number of matches is
+// returned. Otherwise at most ro.Limit records are loaded (all of them when
+// ro.Limit <= 0), while Count still reports the total number of matches.
+// A revision present in the index but missing from the backend, or a record
+// that cannot be unmarshalled, is logged fatally.
+func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
+	rev := ro.Rev
+	if rev > curRev {
+		return &RangeResult{KVs: nil, Count: -1, Rev: curRev}, ErrFutureRev
+	}
+	if rev <= 0 {
+		rev = curRev
+	}
+	if rev < tr.s.compactMainRev {
+		return &RangeResult{KVs: nil, Count: -1, Rev: 0}, ErrCompacted
+	}
+	revpairs := tr.s.kvindex.Revisions(key, end, rev)
+	if len(revpairs) == 0 {
+		return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil
+	}
+	if ro.Count {
+		return &RangeResult{KVs: nil, Count: len(revpairs), Rev: curRev}, nil
+	}
+	limit := int(ro.Limit)
+	if limit <= 0 || limit > len(revpairs) {
+		limit = len(revpairs)
+	}
+	kvs := make([]mvccpb.KeyValue, limit)
+	// revBytes is reused as the lookup key for every revision
+	revBytes := newRevBytes()
+	for i, revpair := range revpairs[:len(kvs)] {
+		revToBytes(revpair, revBytes)
+		_, vs := tr.tx.UnsafeRange(keyBucketName, revBytes, nil, 0)
+		if len(vs) != 1 {
+			if tr.s.lg != nil {
+				tr.s.lg.Fatal(
+					"range failed to find revision pair",
+					zap.Int64("revision-main", revpair.main),
+					zap.Int64("revision-sub", revpair.sub),
+				)
+			} else {
+				plog.Fatalf("range cannot find rev (%d,%d)", revpair.main, revpair.sub)
+			}
+		}
+		if err := kvs[i].Unmarshal(vs[0]); err != nil {
+			if tr.s.lg != nil {
+				tr.s.lg.Fatal(
+					"failed to unmarshal mvccpb.KeyValue",
+					zap.Error(err),
+				)
+			} else {
+				plog.Fatalf("cannot unmarshal event: %v", err)
+			}
+		}
+	}
+	return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil
+// put writes key/value at revision beginRev+1, using the number of changes
+// already made by the txn as the sub-revision. If the key already exists,
+// its create revision is preserved and its version incremented. The record
+// is appended to the backend, the key index is updated and the change is
+// recorded in tw.changes. A lease previously attached to the key is detached
+// and leaseID, when not NoLease, is attached.
+//
+// The store may have been built without a lessor (NewStore accepts a nil
+// one). In that case no previous lease is looked up, and put panics only if
+// the caller asks for a lease to be attached.
+func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
+	rev := tw.beginRev + 1
+	c := rev
+	oldLease := lease.NoLease
+	// if the key exists before, use its previous created and
+	// get its previous leaseID
+	_, created, ver, err := tw.s.kvindex.Get(key, rev)
+	if err == nil {
+		c = created.main
+		// Calling GetLease on a nil lessor would be a nil-interface
+		// dereference; without a lessor there is no previous lease to find.
+		if tw.s.le != nil {
+			oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)})
+		}
+	}
+	ibytes := newRevBytes()
+	idxRev := revision{main: rev, sub: int64(len(tw.changes))}
+	revToBytes(idxRev, ibytes)
+	ver = ver + 1
+	kv := mvccpb.KeyValue{
+		Key:            key,
+		Value:          value,
+		CreateRevision: c,
+		ModRevision:    rev,
+		Version:        ver,
+		Lease:          int64(leaseID),
+	}
+	d, err := kv.Marshal()
+	if err != nil {
+		if tw.storeTxnRead.s.lg != nil {
+			tw.storeTxnRead.s.lg.Fatal(
+				"failed to marshal mvccpb.KeyValue",
+				zap.Error(err),
+			)
+		} else {
+			plog.Fatalf("cannot marshal event: %v", err)
+		}
+	}
+	tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
+	tw.s.kvindex.Put(key, idxRev)
+	tw.changes = append(tw.changes, kv)
+	if oldLease != lease.NoLease {
+		if tw.s.le == nil {
+			panic("no lessor to detach lease")
+		}
+		err = tw.s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
+		if err != nil {
+			if tw.storeTxnRead.s.lg != nil {
+				tw.storeTxnRead.s.lg.Fatal(
+					"failed to detach old lease from a key",
+					zap.Error(err),
+				)
+			} else {
+				plog.Errorf("unexpected error from lease detach: %v", err)
+			}
+		}
+	}
+	if leaseID != lease.NoLease {
+		if tw.s.le == nil {
+			panic("no lessor to attach lease")
+		}
+		err = tw.s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})
+		if err != nil {
+			panic("unexpected error from lease Attach")
+		}
+	}
+// deleteRange tombstones every key in [key, end) that exists at the txn's
+// read revision (beginRev, or beginRev+1 once the txn has made changes) and
+// returns the number of keys deleted.
+func (tw *storeTxnWrite) deleteRange(key, end []byte) int64 {
+	readRev := tw.beginRev
+	if len(tw.changes) > 0 {
+		readRev++
+	}
+	matched, _ := tw.s.kvindex.Range(key, end, readRev)
+	for i := range matched {
+		tw.delete(matched[i])
+	}
+	return int64(len(matched))
+// delete writes a tombstone record for key at revision beginRev+1, using the
+// number of changes already made by the txn as the sub-revision. It marks the
+// key as deleted in the key index, records the change in tw.changes and
+// detaches any lease attached to the key.
+//
+// The store may have been built without a lessor (NewStore accepts a nil
+// one); in that case there is no lease to detach and that step is skipped.
+func (tw *storeTxnWrite) delete(key []byte) {
+	ibytes := newRevBytes()
+	idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}
+	revToBytes(idxRev, ibytes)
+	if tw.storeTxnRead.s != nil && tw.storeTxnRead.s.lg != nil {
+		ibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes)
+	} else {
+		// TODO: remove this in v3.5
+		ibytes = appendMarkTombstone(nil, ibytes)
+	}
+	// a tombstone record carries only the key
+	kv := mvccpb.KeyValue{Key: key}
+	d, err := kv.Marshal()
+	if err != nil {
+		if tw.storeTxnRead.s.lg != nil {
+			tw.storeTxnRead.s.lg.Fatal(
+				"failed to marshal mvccpb.KeyValue",
+				zap.Error(err),
+			)
+		} else {
+			plog.Fatalf("cannot marshal event: %v", err)
+		}
+	}
+	tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
+	err = tw.s.kvindex.Tombstone(key, idxRev)
+	if err != nil {
+		if tw.storeTxnRead.s.lg != nil {
+			tw.storeTxnRead.s.lg.Fatal(
+				"failed to tombstone an existing key",
+				zap.String("key", string(key)),
+				zap.Error(err),
+			)
+		} else {
+			plog.Fatalf("cannot tombstone an existing key (%s): %v", string(key), err)
+		}
+	}
+	tw.changes = append(tw.changes, kv)
+	// Calling GetLease on a nil lessor would be a nil-interface dereference;
+	// without a lessor no lease can be attached to the key.
+	if tw.s.le == nil {
+		return
+	}
+	item := lease.LeaseItem{Key: string(key)}
+	leaseID := tw.s.le.GetLease(item)
+	if leaseID != lease.NoLease {
+		err = tw.s.le.Detach(leaseID, []lease.LeaseItem{item})
+		if err != nil {
+			if tw.storeTxnRead.s.lg != nil {
+				tw.storeTxnRead.s.lg.Fatal(
+					"failed to detach old lease from a key",
+					zap.Error(err),
+				)
+			} else {
+				plog.Errorf("cannot detach %v", err)
+			}
+		}
+	}
+// Changes returns the key-values written or deleted by the txn so far. The
+// returned slice is the txn's own slice, not a copy.
+func (tw *storeTxnWrite) Changes() []mvccpb.KeyValue { return tw.changes }
diff --git a/vendor/go.etcd.io/etcd/mvcc/metrics.go b/vendor/go.etcd.io/etcd/mvcc/metrics.go
new file mode 100644
index 0000000..7526ee4
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/metrics.go
@@ -0,0 +1,336 @@
+// Copyright 2015 The etcd Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//     http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package mvcc
+import (
+	"sync"
+	"github.com/prometheus/client_golang/prometheus"
+var (
+	rangeCounter = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Namespace: "etcd",
+			Subsystem: "mvcc",
+			Name:      "range_total",
+			Help:      "Total number of ranges seen by this member.",
+		})
+	rangeCounterDebug = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Namespace: "etcd_debugging",
+			Subsystem: "mvcc",
+			Name:      "range_total",
+			Help:      "Total number of ranges seen by this member.",
+		})
+	putCounter = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Namespace: "etcd",
+			Subsystem: "mvcc",
+			Name:      "put_total",
+			Help:      "Total number of puts seen by this member.",
+		})
+	// TODO: remove in 3.5 release
+	putCounterDebug = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Namespace: "etcd_debugging",
+			Subsystem: "mvcc",
+			Name:      "put_total",
+			Help:      "Total number of puts seen by this member.",
+		})
+	deleteCounter = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Namespace: "etcd",
+			Subsystem: "mvcc",
+			Name:      "delete_total",
+			Help:      "Total number of deletes seen by this member.",
+		})
+	// TODO: remove in 3.5 release
+	deleteCounterDebug = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Namespace: "etcd_debugging",
+			Subsystem: "mvcc",
+			Name:      "delete_total",
+			Help:      "Total number of deletes seen by this member.",
+		})
+	txnCounter = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Namespace: "etcd",
+			Subsystem: "mvcc",
+			Name:      "txn_total",
+			Help:      "Total number of txns seen by this member.",
+		})
+	txnCounterDebug = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Namespace: "etcd_debugging",
+			Subsystem: "mvcc",
+			Name:      "txn_total",
+			Help:      "Total number of txns seen by this member.",
+		})
+	keysGauge = prometheus.NewGauge(
+		prometheus.GaugeOpts{
+			Namespace: "etcd_debugging",
+			Subsystem: "mvcc",
+			Name:      "keys_total",
+			Help:      "Total number of keys.",
+		})
+	watchStreamGauge = prometheus.NewGauge(
+		prometheus.GaugeOpts{
+			Namespace: "etcd_debugging",
+			Subsystem: "mvcc",
+			Name:      "watch_stream_total",
+			Help:      "Total number of watch streams.",
+		})
+	watcherGauge = prometheus.NewGauge(
+		prometheus.GaugeOpts{
+			Namespace: "etcd_debugging",
+			Subsystem: "mvcc",
+			Name:      "watcher_total",
+			Help:      "Total number of watchers.",
+		})
+	slowWatcherGauge = prometheus.NewGauge(
+		prometheus.GaugeOpts{
+			Namespace: "etcd_debugging",
+			Subsystem: "mvcc",
+			Name:      "slow_watcher_total",
+			Help:      "Total number of unsynced slow watchers.",
+		})
+	totalEventsCounter = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Namespace: "etcd_debugging",
+			Subsystem: "mvcc",
+			Name:      "events_total",
+			Help:      "Total number of events sent by this member.",
+		})
+	pendingEventsGauge = prometheus.NewGauge(
+		prometheus.GaugeOpts{
+			Namespace: "etcd_debugging",
+			Subsystem: "mvcc",
+			Name:      "pending_events_total",
+			Help:      "Total number of pending events to be sent.",
+		})
+	indexCompactionPauseMs = prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Namespace: "etcd_debugging",
+			Subsystem: "mvcc",
+			Name:      "index_compaction_pause_duration_milliseconds",
+			Help:      "Bucketed histogram of index compaction pause duration.",
+			// lowest bucket start of upper bound 0.5 ms with factor 2
+			// highest bucket start of 0.5 ms * 2^13 == 4.096 sec
+			Buckets: prometheus.ExponentialBuckets(0.5, 2, 14),
+		})
+	dbCompactionPauseMs = prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Namespace: "etcd_debugging",
+			Subsystem: "mvcc",
+			Name:      "db_compaction_pause_duration_milliseconds",
+			Help:      "Bucketed histogram of db compaction pause duration.",
+			// lowest bucket start of upper bound 1 ms with factor 2
+			// highest bucket start of 1 ms * 2^12 == 4.096 sec
+			Buckets: prometheus.ExponentialBuckets(1, 2, 13),
+		})
+	dbCompactionTotalMs = prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Namespace: "etcd_debugging",
+			Subsystem: "mvcc",
+			Name:      "db_compaction_total_duration_milliseconds",
+			Help:      "Bucketed histogram of db compaction total duration.",
+			// lowest bucket start of upper bound 100 ms with factor 2
+			// highest bucket start of 100 ms * 2^13 == 819.2 sec
+			Buckets: prometheus.ExponentialBuckets(100, 2, 14),
+		})
+	dbCompactionKeysCounter = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Namespace: "etcd_debugging",
+			Subsystem: "mvcc",
+			Name:      "db_compaction_keys_total",
+			Help:      "Total number of db keys compacted.",
+		})
+	dbTotalSize = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+		Namespace: "etcd",
+		Subsystem: "mvcc",
+		Name:      "db_total_size_in_bytes",
+		Help:      "Total size of the underlying database physically allocated in bytes.",
+	},
+		func() float64 {
+			reportDbTotalSizeInBytesMu.RLock()
+			defer reportDbTotalSizeInBytesMu.RUnlock()
+			return reportDbTotalSizeInBytes()
+		},
+	)
+	// overridden by mvcc initialization
+	reportDbTotalSizeInBytesMu sync.RWMutex
+	reportDbTotalSizeInBytes   = func() float64 { return 0 }
+	// TODO: remove this in v3.5
+	dbTotalSizeDebug = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+		Namespace: "etcd_debugging",
+		Subsystem: "mvcc",
+		Name:      "db_total_size_in_bytes",
+		Help:      "Total size of the underlying database physically allocated in bytes.",
+	},
+		func() float64 {
+			reportDbTotalSizeInBytesDebugMu.RLock()
+			defer reportDbTotalSizeInBytesDebugMu.RUnlock()
+			return reportDbTotalSizeInBytesDebug()
+		},
+	)
+	// overridden by mvcc initialization
+	reportDbTotalSizeInBytesDebugMu sync.RWMutex
+	reportDbTotalSizeInBytesDebug   = func() float64 { return 0 }
+	dbTotalSizeInUse = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+		Namespace: "etcd",
+		Subsystem: "mvcc",
+		Name:      "db_total_size_in_use_in_bytes",
+		Help:      "Total size of the underlying database logically in use in bytes.",
+	},
+		func() float64 {
+			reportDbTotalSizeInUseInBytesMu.RLock()
+			defer reportDbTotalSizeInUseInBytesMu.RUnlock()
+			return reportDbTotalSizeInUseInBytes()
+		},
+	)
+	// overridden by mvcc initialization
+	reportDbTotalSizeInUseInBytesMu sync.RWMutex
+	reportDbTotalSizeInUseInBytes   = func() float64 { return 0 }
+	dbOpenReadTxN = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+		Namespace: "etcd",
+		Subsystem: "mvcc",
+		Name:      "db_open_read_transactions",
+		Help:      "The number of currently open read transactions",
+	},
+		func() float64 {
+			reportDbOpenReadTxNMu.RLock()
+			defer reportDbOpenReadTxNMu.RUnlock()
+			return reportDbOpenReadTxN()
+		},
+	)
+	// overridden by mvcc initialization
+	reportDbOpenReadTxNMu sync.RWMutex
+	reportDbOpenReadTxN   = func() float64 { return 0 }
+	hashSec = prometheus.NewHistogram(prometheus.HistogramOpts{
+		Namespace: "etcd",
+		Subsystem: "mvcc",
+		Name:      "hash_duration_seconds",
+		Help:      "The latency distribution of storage hash operation.",
+		// 100 MB usually takes 100 ms, so start with 10 MB of 10 ms
+		// lowest bucket start of upper bound 0.01 sec (10 ms) with factor 2
+		// highest bucket start of 0.01 sec * 2^14 == 163.84 sec
+		Buckets: prometheus.ExponentialBuckets(.01, 2, 15),
+	})
+	hashRevSec = prometheus.NewHistogram(prometheus.HistogramOpts{
+		Namespace: "etcd",
+		Subsystem: "mvcc",
+		Name:      "hash_rev_duration_seconds",
+		Help:      "The latency distribution of storage hash by revision operation.",
+		// 100 MB usually takes 100 ms, so start with 10 MB of 10 ms
+		// lowest bucket start of upper bound 0.01 sec (10 ms) with factor 2
+		// highest bucket start of 0.01 sec * 2^14 == 163.84 sec
+		Buckets: prometheus.ExponentialBuckets(.01, 2, 15),
+	})
+	currentRev = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+		Namespace: "etcd_debugging",
+		Subsystem: "mvcc",
+		Name:      "current_revision",
+		Help:      "The current revision of store.",
+	},
+		func() float64 {
+			reportCurrentRevMu.RLock()
+			defer reportCurrentRevMu.RUnlock()
+			return reportCurrentRev()
+		},
+	)
+	// overridden by mvcc initialization
+	reportCurrentRevMu sync.RWMutex
+	reportCurrentRev   = func() float64 { return 0 }
+	compactRev = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+		Namespace: "etcd_debugging",
+		Subsystem: "mvcc",
+		Name:      "compact_revision",
+		Help:      "The revision of the last compaction in store.",
+	},
+		func() float64 {
+			reportCompactRevMu.RLock()
+			defer reportCompactRevMu.RUnlock()
+			return reportCompactRev()
+		},
+	)
+	// overridden by mvcc initialization
+	reportCompactRevMu sync.RWMutex
+	reportCompactRev   = func() float64 { return 0 }
+// init registers every mvcc metric collector declared in this file with the
+// default prometheus registry, in declaration order.
+func init() {
+	prometheus.MustRegister(
+		rangeCounter,
+		rangeCounterDebug,
+		putCounter,
+		putCounterDebug,
+		deleteCounter,
+		deleteCounterDebug,
+		txnCounter,
+		txnCounterDebug,
+		keysGauge,
+		watchStreamGauge,
+		watcherGauge,
+		slowWatcherGauge,
+		totalEventsCounter,
+		pendingEventsGauge,
+		indexCompactionPauseMs,
+		dbCompactionPauseMs,
+		dbCompactionTotalMs,
+		dbCompactionKeysCounter,
+		dbTotalSize,
+		dbTotalSizeDebug,
+		dbTotalSizeInUse,
+		dbOpenReadTxN,
+		hashSec,
+		hashRevSec,
+		currentRev,
+		compactRev,
+	)
+// ReportEventReceived reports that n events were received.
+// Call it when an external system has received events from mvcc.Watcher:
+// the events leave the pending gauge and are added to the total counter.
+func ReportEventReceived(n int) {
+	received := float64(n)
+	pendingEventsGauge.Sub(received)
+	totalEventsCounter.Add(received)
diff --git a/vendor/go.etcd.io/etcd/mvcc/metrics_txn.go b/vendor/go.etcd.io/etcd/mvcc/metrics_txn.go
new file mode 100644
index 0000000..64b629c
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/metrics_txn.go
@@ -0,0 +1,67 @@
+// Copyright 2017 The etcd Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//     http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package mvcc
+import "go.etcd.io/etcd/lease"
+// metricsTxnWrite wraps a TxnWrite and counts the Range, Put and DeleteRange
+// calls made through it so End can report them to the package metrics.
+type metricsTxnWrite struct {
+	TxnWrite
+	ranges  uint // number of Range calls
+	puts    uint // number of Put calls
+	deletes uint // number of DeleteRange calls
+// newMetricsTxnRead wraps the read transaction tr so that its operations are
+// counted; the counters start at zero.
+func newMetricsTxnRead(tr TxnRead) TxnRead {
+	return &metricsTxnWrite{TxnWrite: &txnReadWrite{tr}}
+// newMetricsTxnWrite wraps the write transaction tw so that its operations
+// are counted; the counters start at zero.
+func newMetricsTxnWrite(tw TxnWrite) TxnWrite {
+	return &metricsTxnWrite{TxnWrite: tw}
+// Range counts the call and forwards it to the wrapped transaction.
+func (tw *metricsTxnWrite) Range(key, end []byte, ro RangeOptions) (*RangeResult, error) {
+	tw.ranges += 1
+	res, err := tw.TxnWrite.Range(key, end, ro)
+	return res, err
+// DeleteRange counts the call and forwards it to the wrapped transaction.
+func (tw *metricsTxnWrite) DeleteRange(key, end []byte) (n, rev int64) {
+	tw.deletes += 1
+	n, rev = tw.TxnWrite.DeleteRange(key, end)
+	return n, rev
+// Put counts the call and forwards it to the wrapped transaction.
+func (tw *metricsTxnWrite) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
+	tw.puts += 1
+	rev = tw.TxnWrite.Put(key, value, lease)
+	return rev
+// End publishes the operation counts gathered by this wrapper to the package
+// metrics and then ends the wrapped transaction. The txn counters are only
+// incremented when more than one operation went through the wrapper.
+func (tw *metricsTxnWrite) End() {
+	defer tw.TxnWrite.End()
+	if tw.ranges+tw.puts+tw.deletes > 1 {
+		txnCounter.Inc()
+		txnCounterDebug.Inc() // TODO: remove in 3.5 release
+	}
+	rangeN, putN, deleteN := float64(tw.ranges), float64(tw.puts), float64(tw.deletes)
+	rangeCounter.Add(rangeN)
+	rangeCounterDebug.Add(rangeN) // TODO: remove in 3.5 release
+	putCounter.Add(putN)
+	putCounterDebug.Add(putN) // TODO: remove in 3.5 release
+	deleteCounter.Add(deleteN)
+	deleteCounterDebug.Add(deleteN) // TODO: remove in 3.5 release
diff --git a/vendor/go.etcd.io/etcd/mvcc/revision.go b/vendor/go.etcd.io/etcd/mvcc/revision.go
new file mode 100644
index 0000000..d621386
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/revision.go
@@ -0,0 +1,67 @@
+// Copyright 2015 The etcd Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//     http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package mvcc
+import "encoding/binary"
+// revBytesLen is the byte length of a normal revision.
+// The first 8 bytes are the revision.main in big-endian format. The 9th byte
+// is a '_'. The last 8 bytes are the revision.sub in big-endian format.
+const revBytesLen = 8 + 1 + 8
+// A revision indicates modification of the key-value space.
+// The set of changes that share the same main revision changes the key-value space atomically.
+type revision struct {
+	// main is the main revision of a set of changes that happen atomically.
+	main int64
+	// sub is the sub revision of a change in a set of changes that happen
+	// atomically. Each change has a different, increasing sub revision in that
+	// set.
+	sub int64
+// GreaterThan reports whether a is ordered after b: main revisions are
+// compared first and sub revisions break ties.
+func (a revision) GreaterThan(b revision) bool {
+	if a.main != b.main {
+		return a.main > b.main
+	}
+	return a.sub > b.sub
+// newRevBytes allocates a revision buffer of length revBytesLen whose
+// capacity is markedRevBytesLen.
+func newRevBytes() []byte {
+	buf := make([]byte, revBytesLen, markedRevBytesLen)
+	return buf
+// revToBytes encodes rev into bytes as big-endian main, a '_' separator at
+// index 8, then big-endian sub starting at index 9.
+func revToBytes(rev revision, bytes []byte) {
+	mainPart, subPart := uint64(rev.main), uint64(rev.sub)
+	binary.BigEndian.PutUint64(bytes, mainPart)
+	bytes[8] = '_'
+	binary.BigEndian.PutUint64(bytes[9:], subPart)
+func bytesToRev(bytes []byte) revision {
+	return revision{
+		main: int64(binary.BigEndian.Uint64(bytes[0:8])),
+		sub:  int64(binary.BigEndian.Uint64(bytes[9:])),
+	}
+// revisions is a slice of revision with Len, Less and Swap methods; Less
+// orders elements ascending according to GreaterThan.
+type revisions []revision
+func (a revisions) Len() int           { return len(a) }
+func (a revisions) Less(i, j int) bool { return a[j].GreaterThan(a[i]) }
+func (a revisions) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
diff --git a/vendor/go.etcd.io/etcd/mvcc/util.go b/vendor/go.etcd.io/etcd/mvcc/util.go
new file mode 100644
index 0000000..032621a
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/util.go
@@ -0,0 +1,57 @@
+// Copyright 2016 The etcd Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//     http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package mvcc
+import (
+	"encoding/binary"
+	"fmt"
+	"go.etcd.io/etcd/mvcc/backend"
+	"go.etcd.io/etcd/mvcc/mvccpb"
+// UpdateConsistentIndex writes index under consistentIndexKeyName in the meta
+// bucket of be, but only when index is greater than the value currently
+// stored there (a missing value counts as 0). The batch transaction is locked
+// for the whole read-compare-write sequence.
+func UpdateConsistentIndex(be backend.Backend, index uint64) {
+	tx := be.BatchTx()
+	tx.Lock()
+	defer tx.Unlock()
+	var stored uint64
+	if _, vals := tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0); len(vals) != 0 {
+		stored = binary.BigEndian.Uint64(vals[0])
+	}
+	if index > stored {
+		buf := make([]byte, 8)
+		binary.BigEndian.PutUint64(buf, index)
+		tx.UnsafePut(metaBucketName, consistentIndexKeyName, buf)
+	}
+// WriteKV marshals kv and stores it in the key bucket of be, keyed by the
+// revision bytes built from kv.ModRevision (the sub revision is left at zero).
+// It panics if kv cannot be marshaled.
+func WriteKV(be backend.Backend, kv mvccpb.KeyValue) {
+	ibytes := newRevBytes()
+	revToBytes(revision{main: kv.ModRevision}, ibytes)
+	d, err := kv.Marshal()
+	if err != nil {
+		panic(fmt.Errorf("cannot marshal event: %v", err))
+	}
+	// Fetch the batch transaction once, as UpdateConsistentIndex does, and
+	// release the lock with defer so it is not left held if the put panics.
+	tx := be.BatchTx()
+	tx.Lock()
+	defer tx.Unlock()
+	tx.UnsafePut(keyBucketName, ibytes, d)
diff --git a/vendor/go.etcd.io/etcd/mvcc/watchable_store.go b/vendor/go.etcd.io/etcd/mvcc/watchable_store.go
new file mode 100644
index 0000000..3cf491d
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/watchable_store.go
@@ -0,0 +1,552 @@
+// Copyright 2015 The etcd Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//     http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package mvcc
+import (
+	"sync"
+	"time"
+	"go.etcd.io/etcd/lease"
+	"go.etcd.io/etcd/mvcc/backend"
+	"go.etcd.io/etcd/mvcc/mvccpb"
+	"go.uber.org/zap"
+// non-const so modifiable by tests
+var (
+	// chanBufLen is the length of the buffered chan
+	// for sending out watched events.
+	// TODO: find a good buf value. 1024 is just a random one that
+	// seems to be reasonable.
+	chanBufLen = 1024
+	// maxWatchersPerSync is the number of watchers to sync in a single batch
+	maxWatchersPerSync = 512
+// watchable is the store-side interface a watch stream uses to register
+// watchers, request progress notifications and read the store revision.
+type watchable interface {
+	watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc)
+	progress(w *watcher)
+	rev() int64
+// watchableStore is a store that additionally tracks watchers and delivers
+// events to them.
+type watchableStore struct {
+	*store
+	// mu protects watcher groups and batches. It should never be locked
+	// before locking store.mu to avoid deadlock.
+	mu sync.RWMutex
+	// victims are watcher batches that were blocked on the watch channel
+	victims []watcherBatch
+	victimc chan struct{}
+	// contains all unsynced watchers that need to sync with events that have happened
+	unsynced watcherGroup
+	// contains all synced watchers that are in sync with the progress of the store.
+	// The key of the map is the key that the watcher watches on.
+	synced watcherGroup
+	stopc chan struct{} // closed by Close to stop the background sync loops
+	wg    sync.WaitGroup
+// cancelFunc updates unsynced and synced maps when running
+// cancel operations.
+type cancelFunc func()
+// New creates a watchable store from the given logger, backend, lessor,
+// consistent-index getter and config, and returns it as a ConsistentWatchableKV.
+func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) ConsistentWatchableKV {
+	ws := newWatchableStore(lg, b, le, ig, cfg)
+	return ws
+// newWatchableStore builds a watchableStore around a new store, installs
+// itself as the store's read/write view and, when a lessor is present, as the
+// lessor's range deleter, then starts the two background sync goroutines.
+func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) *watchableStore {
+	ws := &watchableStore{store: NewStore(lg, b, le, ig, cfg)}
+	ws.victimc = make(chan struct{}, 1)
+	ws.unsynced = newWatcherGroup()
+	ws.synced = newWatcherGroup()
+	ws.stopc = make(chan struct{})
+	ws.store.ReadView = &readView{ws}
+	ws.store.WriteView = &writeView{ws}
+	if ws.le != nil {
+		// use this store as the deleter so revokes trigger watch events
+		ws.le.SetRangeDeleter(func() lease.TxnDelete { return ws.Write() })
+	}
+	// one wait-group slot per background loop; Close waits on both
+	ws.wg.Add(2)
+	go ws.syncWatchersLoop()
+	go ws.syncVictimsLoop()
+	return ws
+// Close closes stopc to stop the background sync loops, waits for them to
+// finish, and then closes the underlying store, returning its error.
+func (s *watchableStore) Close() error {
+	close(s.stopc)
+	s.wg.Wait()
+	err := s.store.Close()
+	return err
+// NewWatchStream returns a new watch stream backed by this store and bumps
+// the watch stream gauge. The stream's response channel is buffered with
+// chanBufLen entries.
+func (s *watchableStore) NewWatchStream() WatchStream {
+	watchStreamGauge.Inc()
+	stream := &watchStream{watchable: s}
+	stream.ch = make(chan WatchResponse, chanBufLen)
+	stream.cancels = make(map[WatchID]cancelFunc)
+	stream.watchers = make(map[WatchID]*watcher)
+	return stream
+// watch creates a watcher on key (a range when end is set) that delivers
+// responses on ch, and returns it with a function that cancels it. A watcher
+// whose startRev is 0 or beyond the current revision joins the synced group;
+// any other watcher joins the unsynced group and is counted as slow.
+func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {
+	w := &watcher{key: key, end: end, minRev: startRev, id: id, ch: ch, fcs: fcs}
+	s.mu.Lock()
+	s.revMu.RLock()
+	cur := s.store.currentRev
+	if startRev == 0 || startRev > cur {
+		// nothing to replay: the first revision of interest is the next
+		// one, or startRev when that lies further in the future
+		w.minRev = cur + 1
+		if startRev > w.minRev {
+			w.minRev = startRev
+		}
+		s.synced.add(w)
+	} else {
+		slowWatcherGauge.Inc()
+		s.unsynced.add(w)
+	}
+	s.revMu.RUnlock()
+	s.mu.Unlock()
+	watcherGauge.Inc()
+	return w, func() { s.cancelWatcher(w) }
+// cancelWatcher removes references of the watcher from the watchableStore.
+// It loops until the watcher has been removed from the unsynced group, the
+// synced group or a queued victim batch, or is found to be compacted or
+// already canceled. It panics if the watcher is in no group and is not marked
+// as a victim.
+func (s *watchableStore) cancelWatcher(wa *watcher) {
+	for {
+		// NOTE: every break below leaves s.mu locked; it is released by the
+		// Unlock at the end of the function.
+		s.mu.Lock()
+		if s.unsynced.delete(wa) {
+			slowWatcherGauge.Dec()
+			break
+		} else if s.synced.delete(wa) {
+			break
+		} else if wa.compacted {
+			break
+		} else if wa.ch == nil {
+			// already canceled (e.g., cancel/close race)
+			break
+		}
+		if !wa.victim {
+			panic("watcher not victim but not in watch groups")
+		}
+		// look for the victim batch that currently holds the watcher
+		var victimBatch watcherBatch
+		for _, wb := range s.victims {
+			if wb[wa] != nil {
+				victimBatch = wb
+				break
+			}
+		}
+		if victimBatch != nil {
+			slowWatcherGauge.Dec()
+			delete(victimBatch, wa)
+			break
+		}
+		// victim being processed so not accessible; retry
+		s.mu.Unlock()
+		time.Sleep(time.Millisecond)
+	}
+	watcherGauge.Dec()
+	// a nil channel marks the watcher as canceled for later calls
+	wa.ch = nil
+	s.mu.Unlock()
+// Restore restores the underlying store from backend b and, on success,
+// moves every synced watcher to the unsynced group with its restore flag set,
+// leaving the synced group empty. It returns the store's restore error, if any.
+func (s *watchableStore) Restore(b backend.Backend) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if err := s.store.Restore(b); err != nil {
+		return err
+	}
+	for w := range s.synced.watchers {
+		w.restore = true
+		s.unsynced.add(w)
+	}
+	s.synced = newWatcherGroup()
+	return nil
+// syncWatchersLoop syncs the watchers in the unsynced group until stopc is
+// closed. It normally waits 100ms between rounds; when a round made progress
+// but left watchers unsynced, it waits only as long as that round took.
+func (s *watchableStore) syncWatchersLoop() {
+	defer s.wg.Done()
+	for {
+		s.mu.RLock()
+		began := time.Now()
+		before := s.unsynced.size()
+		s.mu.RUnlock()
+		remaining := 0
+		if before > 0 {
+			remaining = s.syncWatchers()
+		}
+		elapsed := time.Since(began)
+		pause := 100 * time.Millisecond
+		// more work pending?
+		if remaining != 0 && remaining < before {
+			// be fair to other store operations by yielding time taken
+			pause = elapsed
+		}
+		select {
+		case <-time.After(pause):
+		case <-s.stopc:
+			return
+		}
+	}
+// syncVictimsLoop tries to write precomputed watcher responses to
+// watchers that had a blocked watcher channel. It runs until stopc is closed,
+// waking up when victimc is signaled or, while victims remain, every 10ms.
+func (s *watchableStore) syncVictimsLoop() {
+	defer s.wg.Done()
+	for {
+		// keep going for as long as a pass delivers at least one response
+		for s.moveVictims() != 0 {
+		}
+		s.mu.RLock()
+		pending := len(s.victims) != 0
+		s.mu.RUnlock()
+		// retryc stays nil, and so never fires, when no victims remain
+		var retryc <-chan time.Time
+		if pending {
+			retryc = time.After(10 * time.Millisecond)
+		}
+		select {
+		case <-retryc:
+		case <-s.victimc:
+		case <-s.stopc:
+			return
+		}
+	}
+// moveVictims tries to update watches with already pending event data.
+// It takes over the current victim batches, retries sending each watcher's
+// pending response, and returns the number of watchers whose response was
+// delivered. Watchers that still cannot be sent to are collected into a new
+// victim batch; the others are put back into the unsynced or synced group.
+func (s *watchableStore) moveVictims() (moved int) {
+	// detach the victim list under the lock so sends happen without holding it
+	s.mu.Lock()
+	victims := s.victims
+	s.victims = nil
+	s.mu.Unlock()
+	var newVictim watcherBatch
+	for _, wb := range victims {
+		// try to send responses again
+		for w, eb := range wb {
+			// watcher has observed the store up to, but not including, w.minRev
+			rev := w.minRev - 1
+			if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
+				pendingEventsGauge.Add(float64(len(eb.evs)))
+			} else {
+				if newVictim == nil {
+					newVictim = make(watcherBatch)
+				}
+				newVictim[w] = eb
+				continue
+			}
+			moved++
+		}
+		// assign completed victim watchers to unsync/sync
+		s.mu.Lock()
+		s.store.revMu.RLock()
+		curRev := s.store.currentRev
+		for w, eb := range wb {
+			if newVictim != nil && newVictim[w] != nil {
+				// couldn't send watch response; stays victim
+				continue
+			}
+			w.victim = false
+			if eb.moreRev != 0 {
+				w.minRev = eb.moreRev
+			}
+			// a watcher that is not past the current revision still has events to read
+			if w.minRev <= curRev {
+				s.unsynced.add(w)
+			} else {
+				slowWatcherGauge.Dec()
+				s.synced.add(w)
+			}
+		}
+		s.store.revMu.RUnlock()
+		s.mu.Unlock()
+	}
+	// queue the watchers that are still blocked for the next pass
+	if len(newVictim) > 0 {
+		s.mu.Lock()
+		s.victims = append(s.victims, newVictim)
+		s.mu.Unlock()
+	}
+	return moved
+// syncWatchers syncs unsynced watchers by:
+//	1. choose a set of watchers from the unsynced watcher group
+//	2. iterate over the set to get the minimum revision and remove compacted watchers
+//	3. use minimum revision to get all key-value pairs and send those events to watchers
+//	4. remove synced watchers in set from unsynced group and move to synced group
+// Watchers whose channel cannot take the response become victims. The return
+// value is the number of watchers left in the unsynced group.
+func (s *watchableStore) syncWatchers() int {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.unsynced.size() == 0 {
+		return 0
+	}
+	s.store.revMu.RLock()
+	defer s.store.revMu.RUnlock()
+	// in order to find key-value pairs from unsynced watchers, we need to
+	// find min revision index, and these revisions can be used to
+	// query the backend store of key-value pairs
+	curRev := s.store.currentRev
+	compactionRev := s.store.compactMainRev
+	wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
+	// the backend range queried below is bounded by minRev and curRev+1
+	minBytes, maxBytes := newRevBytes(), newRevBytes()
+	revToBytes(revision{main: minRev}, minBytes)
+	revToBytes(revision{main: curRev + 1}, maxBytes)
+	// UnsafeRange returns keys and values. And in boltdb, keys are revisions.
+	// values are actual key-value pairs in backend.
+	tx := s.store.b.ReadTx()
+	tx.RLock()
+	revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
+	var evs []mvccpb.Event
+	if s.store != nil && s.store.lg != nil {
+		evs = kvsToEvents(s.store.lg, wg, revs, vs)
+	} else {
+		// TODO: remove this in v3.5
+		evs = kvsToEvents(nil, wg, revs, vs)
+	}
+	tx.RUnlock()
+	var victims watcherBatch
+	wb := newWatcherBatch(wg, evs)
+	for w := range wg.watchers {
+		// start from "caught up to curRev"; lowered below when the batch
+		// reports more revisions to read
+		w.minRev = curRev + 1
+		eb, ok := wb[w]
+		if !ok {
+			// bring un-notified watcher to synced
+			s.synced.add(w)
+			s.unsynced.delete(w)
+			continue
+		}
+		if eb.moreRev != 0 {
+			w.minRev = eb.moreRev
+		}
+		if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}) {
+			pendingEventsGauge.Add(float64(len(eb.evs)))
+		} else {
+			// the send did not go through: mark the watcher as a victim
+			if victims == nil {
+				victims = make(watcherBatch)
+			}
+			w.victim = true
+		}
+		if w.victim {
+			victims[w] = eb
+		} else {
+			if eb.moreRev != 0 {
+				// stay unsynced; more to read
+				continue
+			}
+			s.synced.add(w)
+		}
+		s.unsynced.delete(w)
+	}
+	s.addVictim(victims)
+	// recompute the slow watcher gauge from the unsynced group plus all victims
+	vsz := 0
+	for _, v := range s.victims {
+		vsz += len(v)
+	}
+	slowWatcherGauge.Set(float64(s.unsynced.size() + vsz))
+	return s.unsynced.size()
+// kvsToEvents gets all events for the watchers from all key-value pairs.
+// Each value is unmarshaled into a KeyValue; pairs whose key is not contained
+// in wg are skipped. A pair whose revision is a tombstone yields a DELETE
+// event, any other pair a PUT event. An unmarshal failure panics through lg,
+// or through plog when lg is nil.
+func kvsToEvents(lg *zap.Logger, wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event) {
+	for i := range vals {
+		var kv mvccpb.KeyValue
+		if err := kv.Unmarshal(vals[i]); err != nil {
+			if lg == nil {
+				plog.Panicf("cannot unmarshal event: %v", err)
+			} else {
+				lg.Panic("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
+			}
+		}
+		if !wg.contains(string(kv.Key)) {
+			continue
+		}
+		ev := mvccpb.Event{Kv: &kv, Type: mvccpb.PUT}
+		if isTombstone(revs[i]) {
+			ev.Type = mvccpb.DELETE
+			// patch in mod revision so watchers won't skip
+			kv.ModRevision = bytesToRev(revs[i]).main
+		}
+		evs = append(evs, ev)
+	}
+	return evs
+// notify tells the synced watchers that watch the keys of evs that those
+// events just happened at revision rev. A watcher whose channel cannot take
+// the response is removed from the synced group and queued as a victim.
+func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
+	var slow watcherBatch
+	for w, eb := range newWatcherBatch(&s.synced, evs) {
+		if eb.revs != 1 {
+			if s.store == nil || s.store.lg == nil {
+				plog.Panicf("unexpected multiple revisions in notification")
+			} else {
+				s.store.lg.Panic(
+					"unexpected multiple revisions in watch notification",
+					zap.Int("number-of-revisions", eb.revs),
+				)
+			}
+		}
+		if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
+			pendingEventsGauge.Add(float64(len(eb.evs)))
+			continue
+		}
+		// move slow watcher to victims
+		w.minRev = rev + 1
+		if slow == nil {
+			slow = make(watcherBatch)
+		}
+		w.victim = true
+		slow[w] = eb
+		s.synced.delete(w)
+		slowWatcherGauge.Inc()
+	}
+	s.addVictim(slow)
+// addVictim appends a non-nil victim batch to s.victims and signals victimc
+// without blocking; a nil batch is ignored.
+func (s *watchableStore) addVictim(victim watcherBatch) {
+	if victim != nil {
+		s.victims = append(s.victims, victim)
+		select {
+		case s.victimc <- struct{}{}:
+		default:
+			// a signal is already pending; nothing more to do
+		}
+	}
+// rev returns the revision reported by the underlying store's Rev method.
+func (s *watchableStore) rev() int64 { return s.store.Rev() }
+// progress sends an event-less response carrying the store revision to w,
+// but only when w is in the synced group.
+func (s *watchableStore) progress(w *watcher) {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+	if _, isSynced := s.synced.watchers[w]; !isSynced {
+		return
+	}
+	// If the ch is full, this watcher is receiving events.
+	// We do not need to send progress at all.
+	w.send(WatchResponse{WatchID: w.id, Revision: s.rev()})
+// watcher describes a single watch on a key or key range and the channel
+// its responses are sent to.
+type watcher struct {
+	// the watcher key
+	key []byte
+	// end indicates the end of the range to watch.
+	// If end is set, the watcher is on a range.
+	end []byte
+	// victim is set when ch is blocked and undergoing victim processing
+	victim bool
+	// compacted is set when the watcher is removed because of compaction
+	compacted bool
+	// restore is true when the watcher is being restored from leader snapshot,
+	// which means that this watcher has just been moved from the "synced" to the
+	// "unsynced" watcher group, possibly with a future revision from when it was
+	// first added to the synced watcher group.
+	// An "unsynced" watcher revision must always be <= current revision,
+	// except when the watcher has been moved from the "synced" watcher group.
+	restore bool
+	// minRev is the minimum revision update the watcher will accept
+	minRev int64
+	id     WatchID
+	// fcs are the filters applied to events before they are sent; an event
+	// is dropped when any filter returns true for it
+	fcs []FilterFunc
+	// a chan to send out the watch response.
+	// The chan might be shared with other watchers.
+	ch chan<- WatchResponse
+// send applies the watcher's filters to wr and tries to deliver it on w.ch
+// without blocking. It returns false only when the channel cannot accept the
+// response. A response that had events but lost all of them to the filters is
+// not sent and counts as delivered.
+func (w *watcher) send(wr WatchResponse) bool {
+	wasProgress := len(wr.Events) == 0
+	if len(w.fcs) != 0 {
+		kept := make([]mvccpb.Event, 0, len(wr.Events))
+	nextEvent:
+		for _, ev := range wr.Events {
+			for _, drop := range w.fcs {
+				if drop(ev) {
+					continue nextEvent
+				}
+			}
+			kept = append(kept, ev)
+		}
+		wr.Events = kept
+	}
+	// if all events are filtered out, we should send nothing.
+	if len(wr.Events) == 0 && !wasProgress {
+		return true
+	}
+	select {
+	case w.ch <- wr:
+		return true
+	default:
+		return false
+	}
diff --git a/vendor/go.etcd.io/etcd/mvcc/watchable_store_txn.go b/vendor/go.etcd.io/etcd/mvcc/watchable_store_txn.go
new file mode 100644
index 0000000..3bcfa4d
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/watchable_store_txn.go
@@ -0,0 +1,51 @@
+// Copyright 2017 The etcd Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//     http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package mvcc
+import "go.etcd.io/etcd/mvcc/mvccpb"
+// End finishes the write transaction. When the transaction produced changes,
+// one event per change is built for revision tw.Rev()+1 and passed to the
+// watchable store's notify before the wrapped transaction is ended.
+func (tw *watchableStoreTxnWrite) End() {
+	changes := tw.Changes()
+	if len(changes) == 0 {
+		// nothing was written, so there is nothing to notify
+		tw.TxnWrite.End()
+		return
+	}
+	rev := tw.Rev() + 1
+	evs := make([]mvccpb.Event, len(changes))
+	for i := range changes {
+		kv, ev := &changes[i], &evs[i]
+		ev.Kv = kv
+		if kv.CreateRevision != 0 {
+			ev.Type = mvccpb.PUT
+			continue
+		}
+		// a zero create revision is treated as a deletion, which is stamped
+		// with the revision being written
+		ev.Type = mvccpb.DELETE
+		kv.ModRevision = rev
+	}
+	// end write txn under watchable store lock so the updates are visible
+	// when asynchronous event posting checks the current store revision
+	tw.s.mu.Lock()
+	tw.s.notify(rev, evs)
+	tw.TxnWrite.End()
+	tw.s.mu.Unlock()
+// watchableStoreTxnWrite wraps a TxnWrite together with the watchableStore
+// it was opened on, so that End can notify that store about the changes.
+type watchableStoreTxnWrite struct {
+	TxnWrite
+	// s is the store whose mutex is taken and whose notify is called in End.
+	s *watchableStore
+// Write opens a write transaction on the underlying store and wraps it so
+// that ending the transaction goes through watchableStoreTxnWrite.End.
+func (s *watchableStore) Write() TxnWrite { return &watchableStoreTxnWrite{TxnWrite: s.store.Write(), s: s} }
diff --git a/vendor/go.etcd.io/etcd/mvcc/watcher.go b/vendor/go.etcd.io/etcd/mvcc/watcher.go
new file mode 100644
index 0000000..2846d62
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/watcher.go
@@ -0,0 +1,193 @@
+// Copyright 2015 The etcd Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//     http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package mvcc
+import (
+	"bytes"
+	"errors"
+	"sync"
+	"go.etcd.io/etcd/mvcc/mvccpb"
+// AutoWatchID is the watcher ID passed in WatchStream.Watch when no
+// user-provided ID is available. If it is passed, an ID will automatically be
+// assigned.
+const AutoWatchID WatchID = 0
+var (
+	// ErrWatcherNotExist is returned by watchStream.Cancel when the given ID
+	// has no registered watcher or the stream is already closed.
+	ErrWatcherNotExist    = errors.New("mvcc: watcher does not exist")
+	// ErrEmptyWatcherRange is returned by watchStream.Watch when key >= end for
+	// a non-empty end, and also when Watch is called on a closed stream.
+	ErrEmptyWatcherRange  = errors.New("mvcc: watcher range is empty")
+	// ErrWatcherDuplicateID is returned by watchStream.Watch when an explicit
+	// ID is already in use on the stream.
+	ErrWatcherDuplicateID = errors.New("mvcc: duplicate watch ID provided on the WatchStream")
+// WatchID identifies a watcher on a WatchStream.
+type WatchID int64
+// FilterFunc returns true if the given event should be filtered out, i.e.
+// not delivered to the watcher the filter is attached to.
+type FilterFunc func(e mvccpb.Event) bool
+// WatchStream is a set of watchers that deliver their responses through one
+// shared channel.
+type WatchStream interface {
+	// Watch creates a watcher. The watcher watches the events happening or
+	// happened on the given key or range [key, end) from the given startRev.
+	//
+	// The whole event history can be watched unless compacted.
+	// If "startRev" <=0, watch observes events after currentRev.
+	//
+	// The returned "id" is the ID of this watcher. It appears as WatchID
+	// in events that are sent to the created watcher through stream channel.
+	// The watch ID is used when it's not equal to AutoWatchID. Otherwise,
+	// an auto-generated watch ID is returned.
+	Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error)
+	// Chan returns a chan. All watch responses will be sent to the returned chan.
+	Chan() <-chan WatchResponse
+	// RequestProgress requests the progress of the watcher with given ID. The response
+	// will only be sent if the watcher is currently synced.
+	// The responses will be sent through the WatchResponse Chan attached
+	// with this stream to ensure correct ordering.
+	// The responses contain no events. The revision in the response is the progress
+	// of the watchers since the watcher is currently synced.
+	RequestProgress(id WatchID)
+	// Cancel cancels a watcher by giving its ID. If watcher does not exist, an error will be
+	// returned.
+	Cancel(id WatchID) error
+	// Close closes Chan and releases all related resources.
+	Close()
+	// Rev returns the current revision of the KV the stream watches on.
+	Rev() int64
+// WatchResponse is the message delivered on a watch stream's channel.
+type WatchResponse struct {
+	// WatchID is the WatchID of the watcher this response is sent to.
+	WatchID WatchID
+	// Events contains all the events that need to be sent.
+	Events []mvccpb.Event
+	// Revision is the revision of the KV when the watchResponse is created.
+	// For a normal response, the revision should be the same as the last
+	// modified revision inside Events. For a delayed response to an unsynced
+	// watcher, the revision is greater than the last modified revision
+	// inside Events.
+	Revision int64
+	// CompactRevision is set when the watcher is cancelled due to compaction.
+	CompactRevision int64
+// watchStream contains a collection of watchers that share
+// one streaming chan to send out watched events and other control events.
+type watchStream struct {
+	// watchable creates the watchers and answers rev and progress requests.
+	watchable watchable
+	// ch is handed to every watcher created on this stream and is the
+	// channel returned by Chan.
+	ch        chan WatchResponse
+	mu sync.Mutex // guards fields below it
+	// nextID is the ID pre-allocated for next new watcher in this stream
+	nextID   WatchID
+	// closed is set by Close; Watch and Cancel refuse to operate afterwards.
+	closed   bool
+	// cancels holds, per watch ID, the cancel function returned when the
+	// watcher was created.
+	cancels  map[WatchID]cancelFunc
+	// watchers holds the watcher registered under each watch ID.
+	watchers map[WatchID]*watcher
+// Watch registers a new watcher on the stream and returns the ID it is known
+// by. Passing AutoWatchID asks the stream to pick the next unused ID; any
+// other ID is used as given and rejected with ErrWatcherDuplicateID if it is
+// already taken. A non-empty end that is not strictly greater than key yields
+// ErrEmptyWatcherRange.
+func (ws *watchStream) Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error) {
+	// An empty end is allowed (a watch request with 'WithFromKey' has an
+	// empty-byte range end); otherwise key must sort strictly before end.
+	if len(end) > 0 && bytes.Compare(key, end) >= 0 {
+		return -1, ErrEmptyWatcherRange
+	}
+	ws.mu.Lock()
+	defer ws.mu.Unlock()
+	if ws.closed {
+		// NOTE(review): a closed stream reports the same error as an empty
+		// range; callers cannot tell the two apart.
+		return -1, ErrEmptyWatcherRange
+	}
+	if id != AutoWatchID {
+		if _, taken := ws.watchers[id]; taken {
+			return -1, ErrWatcherDuplicateID
+		}
+	} else {
+		// advance past IDs that are already in use on this stream
+		for ; ws.watchers[ws.nextID] != nil; ws.nextID++ {
+		}
+		id = ws.nextID
+		ws.nextID++
+	}
+	w, cancel := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs...)
+	ws.watchers[id] = w
+	ws.cancels[id] = cancel
+	return id, nil
+// Chan returns the receive side of the channel shared by all watchers on
+// this stream.
+func (ws *watchStream) Chan() <-chan WatchResponse {
+	return ws.ch
+// Cancel stops the watcher registered under id and removes it from the stream.
+// It returns ErrWatcherNotExist when id has no cancel function registered or
+// the stream has already been closed.
+func (ws *watchStream) Cancel(id WatchID) error {
+	ws.mu.Lock()
+	cancel, ok := ws.cancels[id]
+	w := ws.watchers[id]
+	ok = ok && !ws.closed
+	ws.mu.Unlock()
+	if !ok {
+		return ErrWatcherNotExist
+	}
+	// cancel is invoked with ws.mu released.
+	cancel()
+	ws.mu.Lock()
+	// The watch isn't removed until cancel so that if Close() is called,
+	// it will wait for the cancel. Otherwise, Close() could close the
+	// watch channel while the store is still posting events.
+	// The entries are deleted only if id still maps to the watcher that was
+	// just cancelled, since the maps may have changed while the lock was released.
+	if ww := ws.watchers[id]; ww == w {
+		delete(ws.cancels, id)
+		delete(ws.watchers, id)
+	}
+	ws.mu.Unlock()
+	return nil
+// Close cancels every watcher on the stream, marks the stream closed, closes
+// the response channel and decrements watchStreamGauge, all while holding
+// ws.mu. There is no guard against a second call: close(ws.ch) runs
+// unconditionally.
+func (ws *watchStream) Close() {
+	ws.mu.Lock()
+	defer ws.mu.Unlock()
+	// cancel all watchers before the channel they send on is closed
+	for _, cancel := range ws.cancels {
+		cancel()
+	}
+	ws.closed = true
+	close(ws.ch)
+	watchStreamGauge.Dec()
+// Rev returns the revision reported by the underlying watchable, read while
+// holding ws.mu.
+func (ws *watchStream) Rev() int64 {
+	ws.mu.Lock()
+	defer ws.mu.Unlock()
+	return ws.watchable.rev()
+// RequestProgress asks the underlying watchable to report progress for the
+// watcher registered under id. Unknown IDs are ignored. The lookup is done
+// under ws.mu; the progress call itself runs with the lock released.
+func (ws *watchStream) RequestProgress(id WatchID) {
+	ws.mu.Lock()
+	target, found := ws.watchers[id]
+	ws.mu.Unlock()
+	if found {
+		ws.watchable.progress(target)
+	}
diff --git a/vendor/go.etcd.io/etcd/mvcc/watcher_group.go b/vendor/go.etcd.io/etcd/mvcc/watcher_group.go
new file mode 100644
index 0000000..151f0de
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/watcher_group.go
@@ -0,0 +1,293 @@
+// Copyright 2016 The etcd Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//     http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package mvcc
+import (
+	"fmt"
+	"math"
+	"go.etcd.io/etcd/mvcc/mvccpb"
+	"go.etcd.io/etcd/pkg/adt"
+var (
+	// watchBatchMaxRevs is the maximum distinct revisions that
+	// may be sent to an unsynced watcher at a time. Declared as
+	// var instead of const for testing purposes.
+	// eventBatch.add stops accepting events once a batch would exceed it.
+	watchBatchMaxRevs = 1000
+// eventBatch accumulates events for one watcher, bounded by the number of
+// distinct revisions it may contain (see watchBatchMaxRevs).
+type eventBatch struct {
+	// evs is a batch of revision-ordered events
+	evs []mvccpb.Event
+	// revs is the number of unique revisions observed for this batch
+	revs int
+	// moreRev is the first revision with more events following this batch;
+	// add sets it to the revision of the first event that did not fit.
+	moreRev int64
+// add appends ev to the batch unless doing so would push the number of
+// distinct revisions past watchBatchMaxRevs. The first rejected event's
+// revision is recorded in moreRev; later events are dropped silently.
+// Revisions are compared against the last event already in the batch, so
+// events are expected to arrive in revision order.
+func (eb *eventBatch) add(ev mvccpb.Event) {
+	switch {
+	case eb.revs > watchBatchMaxRevs:
+		// the batch has already overflowed; ignore anything further
+		return
+	case len(eb.evs) == 0:
+		// first event starts the revision count
+		eb.revs = 1
+		eb.evs = append(eb.evs, ev)
+		return
+	}
+	lastRev := eb.evs[len(eb.evs)-1].Kv.ModRevision
+	if ev.Kv.ModRevision > lastRev {
+		// a new revision begins with this event
+		eb.revs++
+		if eb.revs > watchBatchMaxRevs {
+			eb.moreRev = ev.Kv.ModRevision
+			return
+		}
+	}
+	eb.evs = append(eb.evs, ev)
+// watcherBatch maps each watcher to the batch of events collected for it.
+type watcherBatch map[*watcher]*eventBatch
+// add records ev in the batch kept for w, creating that batch on first use.
+func (wb watcherBatch) add(w *watcher, ev mvccpb.Event) {
+	if wb[w] == nil {
+		wb[w] = &eventBatch{}
+	}
+	wb[w].add(ev)
+// newWatcherBatch groups evs by the watchers in wg that should receive them,
+// so that the events for a given watcher can be looked up directly. It
+// returns nil when the group has no watchers.
+func newWatcherBatch(wg *watcherGroup, evs []mvccpb.Event) watcherBatch {
+	if len(wg.watchers) == 0 {
+		return nil
+	}
+	batches := make(watcherBatch)
+	for _, ev := range evs {
+		interested := wg.watcherSetByKey(string(ev.Kv.Key))
+		for w := range interested {
+			if ev.Kv.ModRevision < w.minRev {
+				// below the watcher's minimum revision; don't double notify
+				continue
+			}
+			batches.add(w, ev)
+		}
+	}
+	return batches
+// watcherSet is a set of watchers, keyed by pointer identity.
+type watcherSet map[*watcher]struct{}
+// add inserts wa into the set. Inserting a watcher that is already a member
+// is treated as a programming error and panics.
+func (w watcherSet) add(wa *watcher) {
+	_, dup := w[wa]
+	if dup {
+		panic("add watcher twice!")
+	}
+	w[wa] = struct{}{}
+// union adds every watcher in ws to w. Because it goes through add, it
+// panics if any watcher in ws is already a member of w.
+func (w watcherSet) union(ws watcherSet) {
+	for wa := range ws {
+		w.add(wa)
+	}
+// delete removes wa from the set. Removing a watcher that is not a member
+// is treated as a programming error and panics.
+func (w watcherSet) delete(wa *watcher) {
+	_, present := w[wa]
+	if !present {
+		panic("removing missing watcher!")
+	}
+	delete(w, wa)
+// watcherSetByKey indexes watcher sets by the watchers' key, converted to a
+// string.
+type watcherSetByKey map[string]watcherSet
+// add files wa under its key, creating the per-key set on first use.
+func (w watcherSetByKey) add(wa *watcher) {
+	k := string(wa.key)
+	bucket := w[k]
+	if bucket == nil {
+		bucket = make(watcherSet)
+		w[k] = bucket
+	}
+	bucket.add(wa)
+// delete removes wa from the set filed under its key and reports whether it
+// was found there. A set that becomes empty is dropped from the index.
+func (w watcherSetByKey) delete(wa *watcher) bool {
+	k := string(wa.key)
+	bucket, known := w[k]
+	if !known {
+		return false
+	}
+	if _, member := bucket[wa]; !member {
+		return false
+	}
+	delete(bucket, wa)
+	if len(bucket) == 0 {
+		// remove the set; nothing left
+		delete(w, k)
+	}
+	return true
+// watcherGroup is a collection of watchers organized by their ranges.
+// Every watcher is in watchers, and additionally in exactly one of
+// keyWatchers (when its end is nil) or ranges (otherwise).
+type watcherGroup struct {
+	// keyWatchers has the watchers that watch on a single key
+	keyWatchers watcherSetByKey
+	// ranges has the watchers that watch a range; it is sorted by interval.
+	// Each interval's value is the watcherSet registered on that interval.
+	ranges adt.IntervalTree
+	// watchers is the set of all watchers
+	watchers watcherSet
+func newWatcherGroup() watcherGroup {
+	return watcherGroup{
+		keyWatchers: make(watcherSetByKey),
+		ranges:      adt.NewIntervalTree(),
+		watchers:    make(watcherSet),
+	}
+// add registers wa with the group: always in the set of all watchers, and
+// then either in the single-key index (nil end) or in the interval tree
+// under the interval [key, end).
+func (wg *watcherGroup) add(wa *watcher) {
+	wg.watchers.add(wa)
+	if wa.end == nil {
+		wg.keyWatchers.add(wa)
+		return
+	}
+	ivl := adt.NewStringAffineInterval(string(wa.key), string(wa.end))
+	if iv := wg.ranges.Find(ivl); iv == nil {
+		// first watcher on this interval: insert it with a fresh set
+		fresh := make(watcherSet)
+		fresh.add(wa)
+		wg.ranges.Insert(ivl, fresh)
+	} else {
+		// interval already registered: join its existing set
+		iv.Val.(watcherSet).add(wa)
+	}
+// contains reports whether the group has a single-key watcher on key or a
+// range watcher whose interval intersects it.
+func (wg *watcherGroup) contains(key string) bool {
+	if _, exact := wg.keyWatchers[key]; exact {
+		return true
+	}
+	return wg.ranges.Intersects(adt.NewStringAffinePoint(key))
+// size gives the number of unique watchers in the group, i.e. the length of
+// the set of all watchers.
+func (wg *watcherGroup) size() int { return len(wg.watchers) }
+// delete removes wa from the group and reports whether it was removed.
+// It returns false when wa is not a member. It also returns false when wa is
+// a range watcher whose interval is missing from the tree, even though wa has
+// by then already been taken out of the set of all watchers.
+func (wg *watcherGroup) delete(wa *watcher) bool {
+	if _, member := wg.watchers[wa]; !member {
+		return false
+	}
+	wg.watchers.delete(wa)
+	if wa.end == nil {
+		wg.keyWatchers.delete(wa)
+		return true
+	}
+	ivl := adt.NewStringAffineInterval(string(wa.key), string(wa.end))
+	node := wg.ranges.Find(ivl)
+	if node == nil {
+		return false
+	}
+	set := node.Val.(watcherSet)
+	delete(set, wa)
+	if len(set) > 0 {
+		return true
+	}
+	// no watchers remain on this interval, so drop it from the tree
+	if !wg.ranges.Delete(ivl) {
+		panic("could not remove watcher from interval tree")
+	}
+	return true
+// choose selects watchers from the watcher group to update. When the group
+// holds fewer than maxWatchers watchers the group itself is returned;
+// otherwise a new group of at most maxWatchers watchers, taken in map
+// iteration order, is built. The second result is chooseAll's result for the
+// returned group.
+func (wg *watcherGroup) choose(maxWatchers int, curRev, compactRev int64) (*watcherGroup, int64) {
+	if len(wg.watchers) < maxWatchers {
+		return wg, wg.chooseAll(curRev, compactRev)
+	}
+	picked := newWatcherGroup()
+	remaining := maxWatchers
+	for w := range wg.watchers {
+		if remaining <= 0 {
+			break
+		}
+		remaining--
+		picked.add(w)
+	}
+	return &picked, picked.chooseAll(curRev, compactRev)
+// chooseAll walks every watcher in the group and returns the smallest minRev
+// among the watchers that are not behind compactRev (math.MaxInt64 if there
+// are none). A watcher whose minRev is below compactRev is sent a response
+// carrying CompactRevision; if that non-blocking send succeeds the watcher is
+// marked compacted and deleted from the group, otherwise it is left in place
+// for a later call. It panics if a watcher's minRev exceeds curRev and the
+// watcher is not flagged as restore.
+func (wg *watcherGroup) chooseAll(curRev, compactRev int64) int64 {
+	minRev := int64(math.MaxInt64)
+	for w := range wg.watchers {
+		if w.minRev > curRev {
+			// after network partition, possibly choosing future revision watcher from restore operation
+			// with watch key "proxy-namespace__lostleader" and revision "math.MaxInt64 - 2"
+			// do not panic when such watcher had been moved from "synced" watcher during restore operation
+			if !w.restore {
+				panic(fmt.Errorf("watcher minimum revision %d should not exceed current revision %d", w.minRev, curRev))
+			}
+			// mark 'restore' done, since it's chosen
+			w.restore = false
+		}
+		if w.minRev < compactRev {
+			// the watcher is behind the compaction point: notify it without
+			// blocking, and only drop it once the notification went out
+			select {
+			case w.ch <- WatchResponse{WatchID: w.id, CompactRevision: compactRev}:
+				w.compacted = true
+				wg.delete(w)
+			default:
+				// retry next time
+			}
+			// compacted watchers never contribute to the returned minimum
+			continue
+		}
+		if minRev > w.minRev {
+			minRev = w.minRev
+		}
+	}
+	return minRev
+// watcherSetByKey gets the set of watchers that receive events on the given key:
+// the single-key watchers registered on key plus the watchers of every range
+// that covers it. Where possible an existing set is returned as is, so the
+// result must be treated as read-only by callers. The result is nil when
+// no range covers key and no single-key set is registered for it.
+func (wg *watcherGroup) watcherSetByKey(key string) watcherSet {
+	wkeys := wg.keyWatchers[key]
+	wranges := wg.ranges.Stab(adt.NewStringAffinePoint(key))
+	// zero-copy cases
+	switch {
+	case len(wranges) == 0:
+		// no need to merge ranges or copy; reuse single-key set.
+		// This also covers the case where both are empty: wkeys is then the
+		// (empty) result of the map lookup.
+		return wkeys
+	case len(wranges) == 1 && len(wkeys) == 0:
+		// exactly one covering range and no single-key watchers
+		return wranges[0].Val.(watcherSet)
+	}
+	// copy case
+	ret := make(watcherSet)
+	ret.union(wkeys)
+	for _, item := range wranges {
+		ret.union(item.Val.(watcherSet))
+	}
+	return ret