[VOL-2193] Create mocks for Kafka Client and Etcd
This commit consists of:
1) A kafka client mock that implements the kafka client interface
under voltha-lib-go/pkg/kafka/client.go
2) An embedded Etcd server that runs in-process and represents an
Etcd server.
Change-Id: I52a36132568e08c596bb4136918bebcb654a3b99
diff --git a/vendor/go.etcd.io/etcd/mvcc/backend/backend.go b/vendor/go.etcd.io/etcd/mvcc/backend/backend.go
new file mode 100644
index 0000000..bffd749
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/backend/backend.go
@@ -0,0 +1,570 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package backend
+
+import (
+ "fmt"
+ "hash/crc32"
+ "io"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/coreos/pkg/capnslog"
+ humanize "github.com/dustin/go-humanize"
+ bolt "go.etcd.io/bbolt"
+ "go.uber.org/zap"
+)
+
+var (
+ defaultBatchLimit = 10000
+ defaultBatchInterval = 100 * time.Millisecond
+
+ defragLimit = 10000
+
+ // initialMmapSize is the initial size of the mmapped region. Setting this larger than
+ // the potential max db size can prevent writer from blocking reader.
+ // This only works for linux.
+ initialMmapSize = uint64(10 * 1024 * 1024 * 1024)
+
+ plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "mvcc/backend")
+
+ // minSnapshotWarningTimeout is the minimum threshold to trigger a long running snapshot warning.
+ minSnapshotWarningTimeout = 30 * time.Second
+)
+
+type Backend interface {
+ // ReadTx returns a read transaction. It is replaced by ConcurrentReadTx in the main data path, see #10523.
+ ReadTx() ReadTx
+ BatchTx() BatchTx
+ // ConcurrentReadTx returns a non-blocking read transaction.
+ ConcurrentReadTx() ReadTx
+
+ Snapshot() Snapshot
+ Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
+ // Size returns the current size of the backend physically allocated.
+ // The backend can hold DB space that is not utilized at the moment,
+ // since it can conduct pre-allocation or spare unused space for recycling.
+ // Use SizeInUse() instead for the actual DB size.
+ Size() int64
+ // SizeInUse returns the current size of the backend logically in use.
+ // Since the backend can manage free space in a non-byte unit such as
+ // number of pages, the returned value can be not exactly accurate in bytes.
+ SizeInUse() int64
+ // OpenReadTxN returns the number of currently open read transactions in the backend.
+ OpenReadTxN() int64
+ Defrag() error
+ ForceCommit()
+ Close() error
+}
+
+type Snapshot interface {
+ // Size gets the size of the snapshot.
+ Size() int64
+ // WriteTo writes the snapshot into the given writer.
+ WriteTo(w io.Writer) (n int64, err error)
+ // Close closes the snapshot.
+ Close() error
+}
+
+type backend struct {
+ // size and commits are used with atomic operations so they must be
+ // 64-bit aligned, otherwise 32-bit tests will crash
+
+ // size is the number of bytes allocated in the backend
+ size int64
+ // sizeInUse is the number of bytes actually used in the backend
+ sizeInUse int64
+ // commits counts number of commits since start
+ commits int64
+ // openReadTxN is the number of currently open read transactions in the backend
+ openReadTxN int64
+
+ mu sync.RWMutex
+ db *bolt.DB
+
+ batchInterval time.Duration
+ batchLimit int
+ batchTx *batchTxBuffered
+
+ readTx *readTx
+
+ stopc chan struct{}
+ donec chan struct{}
+
+ lg *zap.Logger
+}
+
+type BackendConfig struct {
+ // Path is the file path to the backend file.
+ Path string
+ // BatchInterval is the maximum time before flushing the BatchTx.
+ BatchInterval time.Duration
+ // BatchLimit is the maximum puts before flushing the BatchTx.
+ BatchLimit int
+ // BackendFreelistType is the backend boltdb's freelist type.
+ BackendFreelistType bolt.FreelistType
+ // MmapSize is the number of bytes to mmap for the backend.
+ MmapSize uint64
+ // Logger logs backend-side operations.
+ Logger *zap.Logger
+}
+
+func DefaultBackendConfig() BackendConfig {
+ return BackendConfig{
+ BatchInterval: defaultBatchInterval,
+ BatchLimit: defaultBatchLimit,
+ MmapSize: initialMmapSize,
+ }
+}
+
+func New(bcfg BackendConfig) Backend {
+ return newBackend(bcfg)
+}
+
+func NewDefaultBackend(path string) Backend {
+ bcfg := DefaultBackendConfig()
+ bcfg.Path = path
+ return newBackend(bcfg)
+}
+
+func newBackend(bcfg BackendConfig) *backend {
+ bopts := &bolt.Options{}
+ if boltOpenOptions != nil {
+ *bopts = *boltOpenOptions
+ }
+ bopts.InitialMmapSize = bcfg.mmapSize()
+ bopts.FreelistType = bcfg.BackendFreelistType
+
+ db, err := bolt.Open(bcfg.Path, 0600, bopts)
+ if err != nil {
+ if bcfg.Logger != nil {
+ bcfg.Logger.Panic("failed to open database", zap.String("path", bcfg.Path), zap.Error(err))
+ } else {
+ plog.Panicf("cannot open database at %s (%v)", bcfg.Path, err)
+ }
+ }
+
+ // In future, may want to make buffering optional for low-concurrency systems
+ // or dynamically swap between buffered/non-buffered depending on workload.
+ b := &backend{
+ db: db,
+
+ batchInterval: bcfg.BatchInterval,
+ batchLimit: bcfg.BatchLimit,
+
+ readTx: &readTx{
+ buf: txReadBuffer{
+ txBuffer: txBuffer{make(map[string]*bucketBuffer)},
+ },
+ buckets: make(map[string]*bolt.Bucket),
+ txWg: new(sync.WaitGroup),
+ },
+
+ stopc: make(chan struct{}),
+ donec: make(chan struct{}),
+
+ lg: bcfg.Logger,
+ }
+ b.batchTx = newBatchTxBuffered(b)
+ go b.run()
+ return b
+}
+
+// BatchTx returns the current batch tx in coalescer. The tx can be used for read and
+// write operations. The write result can be retrieved within the same tx immediately.
+// The write result is isolated with other txs until the current one get committed.
+func (b *backend) BatchTx() BatchTx {
+ return b.batchTx
+}
+
+func (b *backend) ReadTx() ReadTx { return b.readTx }
+
+// ConcurrentReadTx creates and returns a new ReadTx, which:
+// A) creates and keeps a copy of backend.readTx.txReadBuffer,
+// B) references the boltdb read Tx (and its bucket cache) of current batch interval.
+func (b *backend) ConcurrentReadTx() ReadTx {
+ b.readTx.RLock()
+ defer b.readTx.RUnlock()
+ // prevent boltdb read Tx from been rolled back until store read Tx is done. Needs to be called when holding readTx.RLock().
+ b.readTx.txWg.Add(1)
+ // TODO: might want to copy the read buffer lazily - create copy when A) end of a write transaction B) end of a batch interval.
+ return &concurrentReadTx{
+ buf: b.readTx.buf.unsafeCopy(),
+ tx: b.readTx.tx,
+ txMu: &b.readTx.txMu,
+ buckets: b.readTx.buckets,
+ txWg: b.readTx.txWg,
+ }
+}
+
+// ForceCommit forces the current batching tx to commit.
+func (b *backend) ForceCommit() {
+ b.batchTx.Commit()
+}
+
+func (b *backend) Snapshot() Snapshot {
+ b.batchTx.Commit()
+
+ b.mu.RLock()
+ defer b.mu.RUnlock()
+ tx, err := b.db.Begin(false)
+ if err != nil {
+ if b.lg != nil {
+ b.lg.Fatal("failed to begin tx", zap.Error(err))
+ } else {
+ plog.Fatalf("cannot begin tx (%s)", err)
+ }
+ }
+
+ stopc, donec := make(chan struct{}), make(chan struct{})
+ dbBytes := tx.Size()
+ go func() {
+ defer close(donec)
+ // sendRateBytes is based on transferring snapshot data over a 1 gigabit/s connection
+ // assuming a min tcp throughput of 100MB/s.
+ var sendRateBytes int64 = 100 * 1024 * 1014
+ warningTimeout := time.Duration(int64((float64(dbBytes) / float64(sendRateBytes)) * float64(time.Second)))
+ if warningTimeout < minSnapshotWarningTimeout {
+ warningTimeout = minSnapshotWarningTimeout
+ }
+ start := time.Now()
+ ticker := time.NewTicker(warningTimeout)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ticker.C:
+ if b.lg != nil {
+ b.lg.Warn(
+ "snapshotting taking too long to transfer",
+ zap.Duration("taking", time.Since(start)),
+ zap.Int64("bytes", dbBytes),
+ zap.String("size", humanize.Bytes(uint64(dbBytes))),
+ )
+ } else {
+ plog.Warningf("snapshotting is taking more than %v seconds to finish transferring %v MB [started at %v]", time.Since(start).Seconds(), float64(dbBytes)/float64(1024*1014), start)
+ }
+
+ case <-stopc:
+ snapshotTransferSec.Observe(time.Since(start).Seconds())
+ return
+ }
+ }
+ }()
+
+ return &snapshot{tx, stopc, donec}
+}
+
+type IgnoreKey struct {
+ Bucket string
+ Key string
+}
+
+func (b *backend) Hash(ignores map[IgnoreKey]struct{}) (uint32, error) {
+ h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
+
+ b.mu.RLock()
+ defer b.mu.RUnlock()
+ err := b.db.View(func(tx *bolt.Tx) error {
+ c := tx.Cursor()
+ for next, _ := c.First(); next != nil; next, _ = c.Next() {
+ b := tx.Bucket(next)
+ if b == nil {
+ return fmt.Errorf("cannot get hash of bucket %s", string(next))
+ }
+ h.Write(next)
+ b.ForEach(func(k, v []byte) error {
+ bk := IgnoreKey{Bucket: string(next), Key: string(k)}
+ if _, ok := ignores[bk]; !ok {
+ h.Write(k)
+ h.Write(v)
+ }
+ return nil
+ })
+ }
+ return nil
+ })
+
+ if err != nil {
+ return 0, err
+ }
+
+ return h.Sum32(), nil
+}
+
+func (b *backend) Size() int64 {
+ return atomic.LoadInt64(&b.size)
+}
+
+func (b *backend) SizeInUse() int64 {
+ return atomic.LoadInt64(&b.sizeInUse)
+}
+
+func (b *backend) run() {
+ defer close(b.donec)
+ t := time.NewTimer(b.batchInterval)
+ defer t.Stop()
+ for {
+ select {
+ case <-t.C:
+ case <-b.stopc:
+ b.batchTx.CommitAndStop()
+ return
+ }
+ if b.batchTx.safePending() != 0 {
+ b.batchTx.Commit()
+ }
+ t.Reset(b.batchInterval)
+ }
+}
+
+func (b *backend) Close() error {
+ close(b.stopc)
+ <-b.donec
+ return b.db.Close()
+}
+
+// Commits returns total number of commits since start
+func (b *backend) Commits() int64 {
+ return atomic.LoadInt64(&b.commits)
+}
+
+func (b *backend) Defrag() error {
+ return b.defrag()
+}
+
+func (b *backend) defrag() error {
+ now := time.Now()
+
+ // TODO: make this non-blocking?
+ // lock batchTx to ensure nobody is using previous tx, and then
+ // close previous ongoing tx.
+ b.batchTx.Lock()
+ defer b.batchTx.Unlock()
+
+ // lock database after lock tx to avoid deadlock.
+ b.mu.Lock()
+ defer b.mu.Unlock()
+
+ // block concurrent read requests while resetting tx
+ b.readTx.Lock()
+ defer b.readTx.Unlock()
+
+ b.batchTx.unsafeCommit(true)
+
+ b.batchTx.tx = nil
+
+ tmpdb, err := bolt.Open(b.db.Path()+".tmp", 0600, boltOpenOptions)
+ if err != nil {
+ return err
+ }
+
+ dbp := b.db.Path()
+ tdbp := tmpdb.Path()
+ size1, sizeInUse1 := b.Size(), b.SizeInUse()
+ if b.lg != nil {
+ b.lg.Info(
+ "defragmenting",
+ zap.String("path", dbp),
+ zap.Int64("current-db-size-bytes", size1),
+ zap.String("current-db-size", humanize.Bytes(uint64(size1))),
+ zap.Int64("current-db-size-in-use-bytes", sizeInUse1),
+ zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse1))),
+ )
+ }
+
+ err = defragdb(b.db, tmpdb, defragLimit)
+ if err != nil {
+ tmpdb.Close()
+ os.RemoveAll(tmpdb.Path())
+ return err
+ }
+
+ err = b.db.Close()
+ if err != nil {
+ if b.lg != nil {
+ b.lg.Fatal("failed to close database", zap.Error(err))
+ } else {
+ plog.Fatalf("cannot close database (%s)", err)
+ }
+ }
+ err = tmpdb.Close()
+ if err != nil {
+ if b.lg != nil {
+ b.lg.Fatal("failed to close tmp database", zap.Error(err))
+ } else {
+ plog.Fatalf("cannot close database (%s)", err)
+ }
+ }
+ err = os.Rename(tdbp, dbp)
+ if err != nil {
+ if b.lg != nil {
+ b.lg.Fatal("failed to rename tmp database", zap.Error(err))
+ } else {
+ plog.Fatalf("cannot rename database (%s)", err)
+ }
+ }
+
+ b.db, err = bolt.Open(dbp, 0600, boltOpenOptions)
+ if err != nil {
+ if b.lg != nil {
+ b.lg.Fatal("failed to open database", zap.String("path", dbp), zap.Error(err))
+ } else {
+ plog.Panicf("cannot open database at %s (%v)", dbp, err)
+ }
+ }
+ b.batchTx.tx = b.unsafeBegin(true)
+
+ b.readTx.reset()
+ b.readTx.tx = b.unsafeBegin(false)
+
+ size := b.readTx.tx.Size()
+ db := b.readTx.tx.DB()
+ atomic.StoreInt64(&b.size, size)
+ atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))
+
+ took := time.Since(now)
+ defragSec.Observe(took.Seconds())
+
+ size2, sizeInUse2 := b.Size(), b.SizeInUse()
+ if b.lg != nil {
+ b.lg.Info(
+ "defragmented",
+ zap.String("path", dbp),
+ zap.Int64("current-db-size-bytes-diff", size2-size1),
+ zap.Int64("current-db-size-bytes", size2),
+ zap.String("current-db-size", humanize.Bytes(uint64(size2))),
+ zap.Int64("current-db-size-in-use-bytes-diff", sizeInUse2-sizeInUse1),
+ zap.Int64("current-db-size-in-use-bytes", sizeInUse2),
+ zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse2))),
+ zap.Duration("took", took),
+ )
+ }
+ return nil
+}
+
+func defragdb(odb, tmpdb *bolt.DB, limit int) error {
+ // open a tx on tmpdb for writes
+ tmptx, err := tmpdb.Begin(true)
+ if err != nil {
+ return err
+ }
+
+ // open a tx on old db for read
+ tx, err := odb.Begin(false)
+ if err != nil {
+ return err
+ }
+ defer tx.Rollback()
+
+ c := tx.Cursor()
+
+ count := 0
+ for next, _ := c.First(); next != nil; next, _ = c.Next() {
+ b := tx.Bucket(next)
+ if b == nil {
+ return fmt.Errorf("backend: cannot defrag bucket %s", string(next))
+ }
+
+ tmpb, berr := tmptx.CreateBucketIfNotExists(next)
+ if berr != nil {
+ return berr
+ }
+ tmpb.FillPercent = 0.9 // for seq write in for each
+
+ b.ForEach(func(k, v []byte) error {
+ count++
+ if count > limit {
+ err = tmptx.Commit()
+ if err != nil {
+ return err
+ }
+ tmptx, err = tmpdb.Begin(true)
+ if err != nil {
+ return err
+ }
+ tmpb = tmptx.Bucket(next)
+ tmpb.FillPercent = 0.9 // for seq write in for each
+
+ count = 0
+ }
+ return tmpb.Put(k, v)
+ })
+ }
+
+ return tmptx.Commit()
+}
+
+func (b *backend) begin(write bool) *bolt.Tx {
+ b.mu.RLock()
+ tx := b.unsafeBegin(write)
+ b.mu.RUnlock()
+
+ size := tx.Size()
+ db := tx.DB()
+ stats := db.Stats()
+ atomic.StoreInt64(&b.size, size)
+ atomic.StoreInt64(&b.sizeInUse, size-(int64(stats.FreePageN)*int64(db.Info().PageSize)))
+ atomic.StoreInt64(&b.openReadTxN, int64(stats.OpenTxN))
+
+ return tx
+}
+
+func (b *backend) unsafeBegin(write bool) *bolt.Tx {
+ tx, err := b.db.Begin(write)
+ if err != nil {
+ if b.lg != nil {
+ b.lg.Fatal("failed to begin tx", zap.Error(err))
+ } else {
+ plog.Fatalf("cannot begin tx (%s)", err)
+ }
+ }
+ return tx
+}
+
+func (b *backend) OpenReadTxN() int64 {
+ return atomic.LoadInt64(&b.openReadTxN)
+}
+
+// NewTmpBackend creates a backend implementation for testing.
+func NewTmpBackend(batchInterval time.Duration, batchLimit int) (*backend, string) {
+ dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test")
+ if err != nil {
+ panic(err)
+ }
+ tmpPath := filepath.Join(dir, "database")
+ bcfg := DefaultBackendConfig()
+ bcfg.Path, bcfg.BatchInterval, bcfg.BatchLimit = tmpPath, batchInterval, batchLimit
+ return newBackend(bcfg), tmpPath
+}
+
+func NewDefaultTmpBackend() (*backend, string) {
+ return NewTmpBackend(defaultBatchInterval, defaultBatchLimit)
+}
+
+type snapshot struct {
+ *bolt.Tx
+ stopc chan struct{}
+ donec chan struct{}
+}
+
+func (s *snapshot) Close() error {
+ close(s.stopc)
+ <-s.donec
+ return s.Tx.Rollback()
+}
diff --git a/vendor/go.etcd.io/etcd/mvcc/backend/batch_tx.go b/vendor/go.etcd.io/etcd/mvcc/backend/batch_tx.go
new file mode 100644
index 0000000..d5c8a88
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/backend/batch_tx.go
@@ -0,0 +1,339 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package backend
+
+import (
+ "bytes"
+ "math"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ bolt "go.etcd.io/bbolt"
+ "go.uber.org/zap"
+)
+
+type BatchTx interface {
+ ReadTx
+ UnsafeCreateBucket(name []byte)
+ UnsafePut(bucketName []byte, key []byte, value []byte)
+ UnsafeSeqPut(bucketName []byte, key []byte, value []byte)
+ UnsafeDelete(bucketName []byte, key []byte)
+ // Commit commits a previous tx and begins a new writable one.
+ Commit()
+ // CommitAndStop commits the previous tx and does not create a new one.
+ CommitAndStop()
+}
+
+type batchTx struct {
+ sync.Mutex
+ tx *bolt.Tx
+ backend *backend
+
+ pending int
+}
+
+func (t *batchTx) Lock() {
+ t.Mutex.Lock()
+}
+
+func (t *batchTx) Unlock() {
+ if t.pending >= t.backend.batchLimit {
+ t.commit(false)
+ }
+ t.Mutex.Unlock()
+}
+
+// BatchTx interface embeds ReadTx interface. But RLock() and RUnlock() do not
+// have appropriate semantics in BatchTx interface. Therefore should not be called.
+// TODO: might want to decouple ReadTx and BatchTx
+
+func (t *batchTx) RLock() {
+ panic("unexpected RLock")
+}
+
+func (t *batchTx) RUnlock() {
+ panic("unexpected RUnlock")
+}
+
+func (t *batchTx) UnsafeCreateBucket(name []byte) {
+ _, err := t.tx.CreateBucket(name)
+ if err != nil && err != bolt.ErrBucketExists {
+ if t.backend.lg != nil {
+ t.backend.lg.Fatal(
+ "failed to create a bucket",
+ zap.String("bucket-name", string(name)),
+ zap.Error(err),
+ )
+ } else {
+ plog.Fatalf("cannot create bucket %s (%v)", name, err)
+ }
+ }
+ t.pending++
+}
+
+// UnsafePut must be called holding the lock on the tx.
+func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
+ t.unsafePut(bucketName, key, value, false)
+}
+
+// UnsafeSeqPut must be called holding the lock on the tx.
+func (t *batchTx) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) {
+ t.unsafePut(bucketName, key, value, true)
+}
+
+func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq bool) {
+ bucket := t.tx.Bucket(bucketName)
+ if bucket == nil {
+ if t.backend.lg != nil {
+ t.backend.lg.Fatal(
+ "failed to find a bucket",
+ zap.String("bucket-name", string(bucketName)),
+ )
+ } else {
+ plog.Fatalf("bucket %s does not exist", bucketName)
+ }
+ }
+ if seq {
+ // it is useful to increase fill percent when the workloads are mostly append-only.
+ // this can delay the page split and reduce space usage.
+ bucket.FillPercent = 0.9
+ }
+ if err := bucket.Put(key, value); err != nil {
+ if t.backend.lg != nil {
+ t.backend.lg.Fatal(
+ "failed to write to a bucket",
+ zap.String("bucket-name", string(bucketName)),
+ zap.Error(err),
+ )
+ } else {
+ plog.Fatalf("cannot put key into bucket (%v)", err)
+ }
+ }
+ t.pending++
+}
+
+// UnsafeRange must be called holding the lock on the tx.
+func (t *batchTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
+ bucket := t.tx.Bucket(bucketName)
+ if bucket == nil {
+ if t.backend.lg != nil {
+ t.backend.lg.Fatal(
+ "failed to find a bucket",
+ zap.String("bucket-name", string(bucketName)),
+ )
+ } else {
+ plog.Fatalf("bucket %s does not exist", bucketName)
+ }
+ }
+ return unsafeRange(bucket.Cursor(), key, endKey, limit)
+}
+
+func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
+ if limit <= 0 {
+ limit = math.MaxInt64
+ }
+ var isMatch func(b []byte) bool
+ if len(endKey) > 0 {
+ isMatch = func(b []byte) bool { return bytes.Compare(b, endKey) < 0 }
+ } else {
+ isMatch = func(b []byte) bool { return bytes.Equal(b, key) }
+ limit = 1
+ }
+
+ for ck, cv := c.Seek(key); ck != nil && isMatch(ck); ck, cv = c.Next() {
+ vs = append(vs, cv)
+ keys = append(keys, ck)
+ if limit == int64(len(keys)) {
+ break
+ }
+ }
+ return keys, vs
+}
+
+// UnsafeDelete must be called holding the lock on the tx.
+func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
+ bucket := t.tx.Bucket(bucketName)
+ if bucket == nil {
+ if t.backend.lg != nil {
+ t.backend.lg.Fatal(
+ "failed to find a bucket",
+ zap.String("bucket-name", string(bucketName)),
+ )
+ } else {
+ plog.Fatalf("bucket %s does not exist", bucketName)
+ }
+ }
+ err := bucket.Delete(key)
+ if err != nil {
+ if t.backend.lg != nil {
+ t.backend.lg.Fatal(
+ "failed to delete a key",
+ zap.String("bucket-name", string(bucketName)),
+ zap.Error(err),
+ )
+ } else {
+ plog.Fatalf("cannot delete key from bucket (%v)", err)
+ }
+ }
+ t.pending++
+}
+
+// UnsafeForEach must be called holding the lock on the tx.
+func (t *batchTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
+ return unsafeForEach(t.tx, bucketName, visitor)
+}
+
+func unsafeForEach(tx *bolt.Tx, bucket []byte, visitor func(k, v []byte) error) error {
+ if b := tx.Bucket(bucket); b != nil {
+ return b.ForEach(visitor)
+ }
+ return nil
+}
+
+// Commit commits a previous tx and begins a new writable one.
+func (t *batchTx) Commit() {
+ t.Lock()
+ t.commit(false)
+ t.Unlock()
+}
+
+// CommitAndStop commits the previous tx and does not create a new one.
+func (t *batchTx) CommitAndStop() {
+ t.Lock()
+ t.commit(true)
+ t.Unlock()
+}
+
+func (t *batchTx) safePending() int {
+ t.Mutex.Lock()
+ defer t.Mutex.Unlock()
+ return t.pending
+}
+
+func (t *batchTx) commit(stop bool) {
+ // commit the last tx
+ if t.tx != nil {
+ if t.pending == 0 && !stop {
+ return
+ }
+
+ start := time.Now()
+
+ // gofail: var beforeCommit struct{}
+ err := t.tx.Commit()
+ // gofail: var afterCommit struct{}
+
+ rebalanceSec.Observe(t.tx.Stats().RebalanceTime.Seconds())
+ spillSec.Observe(t.tx.Stats().SpillTime.Seconds())
+ writeSec.Observe(t.tx.Stats().WriteTime.Seconds())
+ commitSec.Observe(time.Since(start).Seconds())
+ atomic.AddInt64(&t.backend.commits, 1)
+
+ t.pending = 0
+ if err != nil {
+ if t.backend.lg != nil {
+ t.backend.lg.Fatal("failed to commit tx", zap.Error(err))
+ } else {
+ plog.Fatalf("cannot commit tx (%s)", err)
+ }
+ }
+ }
+ if !stop {
+ t.tx = t.backend.begin(true)
+ }
+}
+
+type batchTxBuffered struct {
+ batchTx
+ buf txWriteBuffer
+}
+
+func newBatchTxBuffered(backend *backend) *batchTxBuffered {
+ tx := &batchTxBuffered{
+ batchTx: batchTx{backend: backend},
+ buf: txWriteBuffer{
+ txBuffer: txBuffer{make(map[string]*bucketBuffer)},
+ seq: true,
+ },
+ }
+ tx.Commit()
+ return tx
+}
+
+func (t *batchTxBuffered) Unlock() {
+ if t.pending != 0 {
+ t.backend.readTx.Lock() // blocks txReadBuffer for writing.
+ t.buf.writeback(&t.backend.readTx.buf)
+ t.backend.readTx.Unlock()
+ if t.pending >= t.backend.batchLimit {
+ t.commit(false)
+ }
+ }
+ t.batchTx.Unlock()
+}
+
+func (t *batchTxBuffered) Commit() {
+ t.Lock()
+ t.commit(false)
+ t.Unlock()
+}
+
+func (t *batchTxBuffered) CommitAndStop() {
+ t.Lock()
+ t.commit(true)
+ t.Unlock()
+}
+
+func (t *batchTxBuffered) commit(stop bool) {
+ // all read txs must be closed to acquire boltdb commit rwlock
+ t.backend.readTx.Lock()
+ t.unsafeCommit(stop)
+ t.backend.readTx.Unlock()
+}
+
+func (t *batchTxBuffered) unsafeCommit(stop bool) {
+ if t.backend.readTx.tx != nil {
+ // wait all store read transactions using the current boltdb tx to finish,
+ // then close the boltdb tx
+ go func(tx *bolt.Tx, wg *sync.WaitGroup) {
+ wg.Wait()
+ if err := tx.Rollback(); err != nil {
+ if t.backend.lg != nil {
+ t.backend.lg.Fatal("failed to rollback tx", zap.Error(err))
+ } else {
+ plog.Fatalf("cannot rollback tx (%s)", err)
+ }
+ }
+ }(t.backend.readTx.tx, t.backend.readTx.txWg)
+ t.backend.readTx.reset()
+ }
+
+ t.batchTx.commit(stop)
+
+ if !stop {
+ t.backend.readTx.tx = t.backend.begin(false)
+ }
+}
+
+func (t *batchTxBuffered) UnsafePut(bucketName []byte, key []byte, value []byte) {
+ t.batchTx.UnsafePut(bucketName, key, value)
+ t.buf.put(bucketName, key, value)
+}
+
+func (t *batchTxBuffered) UnsafeSeqPut(bucketName []byte, key []byte, value []byte) {
+ t.batchTx.UnsafeSeqPut(bucketName, key, value)
+ t.buf.putSeq(bucketName, key, value)
+}
diff --git a/vendor/go.etcd.io/etcd/mvcc/backend/config_default.go b/vendor/go.etcd.io/etcd/mvcc/backend/config_default.go
new file mode 100644
index 0000000..f15f030
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/backend/config_default.go
@@ -0,0 +1,23 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build !linux,!windows
+
+package backend
+
+import bolt "go.etcd.io/bbolt"
+
+var boltOpenOptions *bolt.Options
+
+func (bcfg *BackendConfig) mmapSize() int { return int(bcfg.MmapSize) }
diff --git a/vendor/go.etcd.io/etcd/mvcc/backend/config_linux.go b/vendor/go.etcd.io/etcd/mvcc/backend/config_linux.go
new file mode 100644
index 0000000..f712671
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/backend/config_linux.go
@@ -0,0 +1,34 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package backend
+
+import (
+ "syscall"
+
+ bolt "go.etcd.io/bbolt"
+)
+
+// syscall.MAP_POPULATE on linux 2.6.23+ does sequential read-ahead
+// which can speed up entire-database read with boltdb. We want to
+// enable MAP_POPULATE for faster key-value store recovery in storage
+// package. If your kernel version is lower than 2.6.23
+// (https://github.com/torvalds/linux/releases/tag/v2.6.23), mmap might
+// silently ignore this flag. Please update your kernel to prevent this.
+var boltOpenOptions = &bolt.Options{
+ MmapFlags: syscall.MAP_POPULATE,
+ NoFreelistSync: true,
+}
+
+func (bcfg *BackendConfig) mmapSize() int { return int(bcfg.MmapSize) }
diff --git a/vendor/go.etcd.io/etcd/mvcc/backend/config_windows.go b/vendor/go.etcd.io/etcd/mvcc/backend/config_windows.go
new file mode 100644
index 0000000..c650059
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/backend/config_windows.go
@@ -0,0 +1,26 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build windows
+
+package backend
+
+import bolt "go.etcd.io/bbolt"
+
+var boltOpenOptions *bolt.Options = nil
+
+// setting mmap size != 0 on windows will allocate the entire
+// mmap size for the file, instead of growing it. So, force 0.
+
+func (bcfg *BackendConfig) mmapSize() int { return 0 }
diff --git a/vendor/go.etcd.io/etcd/mvcc/backend/doc.go b/vendor/go.etcd.io/etcd/mvcc/backend/doc.go
new file mode 100644
index 0000000..9cc42fa
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/backend/doc.go
@@ -0,0 +1,16 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package backend defines a standard interface for etcd's backend MVCC storage.
+package backend
diff --git a/vendor/go.etcd.io/etcd/mvcc/backend/metrics.go b/vendor/go.etcd.io/etcd/mvcc/backend/metrics.go
new file mode 100644
index 0000000..d9641af
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/backend/metrics.go
@@ -0,0 +1,95 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package backend
+
+import "github.com/prometheus/client_golang/prometheus"
+
+var (
+ commitSec = prometheus.NewHistogram(prometheus.HistogramOpts{
+ Namespace: "etcd",
+ Subsystem: "disk",
+ Name: "backend_commit_duration_seconds",
+ Help: "The latency distributions of commit called by backend.",
+
+ // lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2
+ // highest bucket start of 0.001 sec * 2^13 == 8.192 sec
+ Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
+ })
+
+ rebalanceSec = prometheus.NewHistogram(prometheus.HistogramOpts{
+ Namespace: "etcd_debugging",
+ Subsystem: "disk",
+ Name: "backend_commit_rebalance_duration_seconds",
+ Help: "The latency distributions of commit.rebalance called by bboltdb backend.",
+
+ // lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2
+ // highest bucket start of 0.001 sec * 2^13 == 8.192 sec
+ Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
+ })
+
+ spillSec = prometheus.NewHistogram(prometheus.HistogramOpts{
+ Namespace: "etcd_debugging",
+ Subsystem: "disk",
+ Name: "backend_commit_spill_duration_seconds",
+ Help: "The latency distributions of commit.spill called by bboltdb backend.",
+
+ // lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2
+ // highest bucket start of 0.001 sec * 2^13 == 8.192 sec
+ Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
+ })
+
+ writeSec = prometheus.NewHistogram(prometheus.HistogramOpts{
+ Namespace: "etcd_debugging",
+ Subsystem: "disk",
+ Name: "backend_commit_write_duration_seconds",
+ Help: "The latency distributions of commit.write called by bboltdb backend.",
+
+ // lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2
+ // highest bucket start of 0.001 sec * 2^13 == 8.192 sec
+ Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
+ })
+
+ defragSec = prometheus.NewHistogram(prometheus.HistogramOpts{
+ Namespace: "etcd",
+ Subsystem: "disk",
+ Name: "backend_defrag_duration_seconds",
+ Help: "The latency distribution of backend defragmentation.",
+
+ // 100 MB usually takes 1 sec, so start with 10 MB of 100 ms
+ // lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
+ // highest bucket start of 0.1 sec * 2^12 == 409.6 sec
+ Buckets: prometheus.ExponentialBuckets(.1, 2, 13),
+ })
+
+ snapshotTransferSec = prometheus.NewHistogram(prometheus.HistogramOpts{
+ Namespace: "etcd",
+ Subsystem: "disk",
+ Name: "backend_snapshot_duration_seconds",
+ Help: "The latency distribution of backend snapshots.",
+
+ // lowest bucket start of upper bound 0.01 sec (10 ms) with factor 2
+ // highest bucket start of 0.01 sec * 2^16 == 655.36 sec
+ Buckets: prometheus.ExponentialBuckets(.01, 2, 17),
+ })
+)
+
+func init() {
+ prometheus.MustRegister(commitSec)
+ prometheus.MustRegister(rebalanceSec)
+ prometheus.MustRegister(spillSec)
+ prometheus.MustRegister(writeSec)
+ prometheus.MustRegister(defragSec)
+ prometheus.MustRegister(snapshotTransferSec)
+}
diff --git a/vendor/go.etcd.io/etcd/mvcc/backend/read_tx.go b/vendor/go.etcd.io/etcd/mvcc/backend/read_tx.go
new file mode 100644
index 0000000..91fe72e
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/backend/read_tx.go
@@ -0,0 +1,210 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package backend
+
+import (
+ "bytes"
+ "math"
+ "sync"
+
+ bolt "go.etcd.io/bbolt"
+)
+
+// safeRangeBucket is a hack to avoid inadvertently reading duplicate keys;
+// overwrites on a bucket should only fetch with limit=1, but safeRangeBucket
+// is known to never overwrite any key so range is safe.
+var safeRangeBucket = []byte("key")
+
+type ReadTx interface {
+ Lock()
+ Unlock()
+ RLock()
+ RUnlock()
+
+ UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
+ UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error
+}
+
+type readTx struct {
+ // mu protects accesses to the txReadBuffer
+ mu sync.RWMutex
+ buf txReadBuffer
+
+ // TODO: group and encapsulate {txMu, tx, buckets, txWg}, as they share the same lifecycle.
+ // txMu protects accesses to buckets and tx on Range requests.
+ txMu sync.RWMutex
+ tx *bolt.Tx
+ buckets map[string]*bolt.Bucket
+ // txWg protects tx from being rolled back at the end of a batch interval until all reads using this tx are done.
+ txWg *sync.WaitGroup
+}
+
+func (rt *readTx) Lock() { rt.mu.Lock() }
+func (rt *readTx) Unlock() { rt.mu.Unlock() }
+func (rt *readTx) RLock() { rt.mu.RLock() }
+func (rt *readTx) RUnlock() { rt.mu.RUnlock() }
+
+func (rt *readTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
+ if endKey == nil {
+ // forbid duplicates for single keys
+ limit = 1
+ }
+ if limit <= 0 {
+ limit = math.MaxInt64
+ }
+ if limit > 1 && !bytes.Equal(bucketName, safeRangeBucket) {
+ panic("do not use unsafeRange on non-keys bucket")
+ }
+ keys, vals := rt.buf.Range(bucketName, key, endKey, limit)
+ if int64(len(keys)) == limit {
+ return keys, vals
+ }
+
+ // find/cache bucket
+ bn := string(bucketName)
+ rt.txMu.RLock()
+ bucket, ok := rt.buckets[bn]
+ rt.txMu.RUnlock()
+ if !ok {
+ rt.txMu.Lock()
+ bucket = rt.tx.Bucket(bucketName)
+ rt.buckets[bn] = bucket
+ rt.txMu.Unlock()
+ }
+
+ // ignore missing bucket since may have been created in this batch
+ if bucket == nil {
+ return keys, vals
+ }
+ rt.txMu.Lock()
+ c := bucket.Cursor()
+ rt.txMu.Unlock()
+
+ k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
+ return append(k2, keys...), append(v2, vals...)
+}
+
+func (rt *readTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
+ dups := make(map[string]struct{})
+ getDups := func(k, v []byte) error {
+ dups[string(k)] = struct{}{}
+ return nil
+ }
+ visitNoDup := func(k, v []byte) error {
+ if _, ok := dups[string(k)]; ok {
+ return nil
+ }
+ return visitor(k, v)
+ }
+ if err := rt.buf.ForEach(bucketName, getDups); err != nil {
+ return err
+ }
+ rt.txMu.Lock()
+ err := unsafeForEach(rt.tx, bucketName, visitNoDup)
+ rt.txMu.Unlock()
+ if err != nil {
+ return err
+ }
+ return rt.buf.ForEach(bucketName, visitor)
+}
+
+func (rt *readTx) reset() {
+ rt.buf.reset()
+ rt.buckets = make(map[string]*bolt.Bucket)
+ rt.tx = nil
+ rt.txWg = new(sync.WaitGroup)
+}
+
+// TODO: create a base type for readTx and concurrentReadTx to avoid duplicated function implementation?
+type concurrentReadTx struct {
+ buf txReadBuffer
+ txMu *sync.RWMutex
+ tx *bolt.Tx
+ buckets map[string]*bolt.Bucket
+ txWg *sync.WaitGroup
+}
+
+func (rt *concurrentReadTx) Lock() {}
+func (rt *concurrentReadTx) Unlock() {}
+
+// RLock is no-op. concurrentReadTx does not need to be locked after it is created.
+func (rt *concurrentReadTx) RLock() {}
+
+// RUnlock signals the end of concurrentReadTx.
+func (rt *concurrentReadTx) RUnlock() { rt.txWg.Done() }
+
+func (rt *concurrentReadTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
+ dups := make(map[string]struct{})
+ getDups := func(k, v []byte) error {
+ dups[string(k)] = struct{}{}
+ return nil
+ }
+ visitNoDup := func(k, v []byte) error {
+ if _, ok := dups[string(k)]; ok {
+ return nil
+ }
+ return visitor(k, v)
+ }
+ if err := rt.buf.ForEach(bucketName, getDups); err != nil {
+ return err
+ }
+ rt.txMu.Lock()
+ err := unsafeForEach(rt.tx, bucketName, visitNoDup)
+ rt.txMu.Unlock()
+ if err != nil {
+ return err
+ }
+ return rt.buf.ForEach(bucketName, visitor)
+}
+
+func (rt *concurrentReadTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
+ if endKey == nil {
+ // forbid duplicates for single keys
+ limit = 1
+ }
+ if limit <= 0 {
+ limit = math.MaxInt64
+ }
+ if limit > 1 && !bytes.Equal(bucketName, safeRangeBucket) {
+ panic("do not use unsafeRange on non-keys bucket")
+ }
+ keys, vals := rt.buf.Range(bucketName, key, endKey, limit)
+ if int64(len(keys)) == limit {
+ return keys, vals
+ }
+
+ // find/cache bucket
+ bn := string(bucketName)
+ rt.txMu.RLock()
+ bucket, ok := rt.buckets[bn]
+ rt.txMu.RUnlock()
+ if !ok {
+ rt.txMu.Lock()
+ bucket = rt.tx.Bucket(bucketName)
+ rt.buckets[bn] = bucket
+ rt.txMu.Unlock()
+ }
+
+ // ignore missing bucket since may have been created in this batch
+ if bucket == nil {
+ return keys, vals
+ }
+ rt.txMu.Lock()
+ c := bucket.Cursor()
+ rt.txMu.Unlock()
+
+ k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
+ return append(k2, keys...), append(v2, vals...)
+}
diff --git a/vendor/go.etcd.io/etcd/mvcc/backend/tx_buffer.go b/vendor/go.etcd.io/etcd/mvcc/backend/tx_buffer.go
new file mode 100644
index 0000000..d734638
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/backend/tx_buffer.go
@@ -0,0 +1,203 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package backend
+
+import (
+ "bytes"
+ "sort"
+)
+
+// txBuffer handles functionality shared between txWriteBuffer and txReadBuffer.
+type txBuffer struct {
+ buckets map[string]*bucketBuffer
+}
+
+func (txb *txBuffer) reset() {
+ for k, v := range txb.buckets {
+ if v.used == 0 {
+ // demote
+ delete(txb.buckets, k)
+ }
+ v.used = 0
+ }
+}
+
+// txWriteBuffer buffers writes of pending updates that have not yet committed.
+type txWriteBuffer struct {
+ txBuffer
+ seq bool
+}
+
+func (txw *txWriteBuffer) put(bucket, k, v []byte) {
+ txw.seq = false
+ txw.putSeq(bucket, k, v)
+}
+
+func (txw *txWriteBuffer) putSeq(bucket, k, v []byte) {
+ b, ok := txw.buckets[string(bucket)]
+ if !ok {
+ b = newBucketBuffer()
+ txw.buckets[string(bucket)] = b
+ }
+ b.add(k, v)
+}
+
+func (txw *txWriteBuffer) writeback(txr *txReadBuffer) {
+ for k, wb := range txw.buckets {
+ rb, ok := txr.buckets[k]
+ if !ok {
+ delete(txw.buckets, k)
+ txr.buckets[k] = wb
+ continue
+ }
+ if !txw.seq && wb.used > 1 {
+ // assume no duplicate keys
+ sort.Sort(wb)
+ }
+ rb.merge(wb)
+ }
+ txw.reset()
+}
+
+// txReadBuffer accesses buffered updates.
+type txReadBuffer struct{ txBuffer }
+
+func (txr *txReadBuffer) Range(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
+ if b := txr.buckets[string(bucketName)]; b != nil {
+ return b.Range(key, endKey, limit)
+ }
+ return nil, nil
+}
+
+func (txr *txReadBuffer) ForEach(bucketName []byte, visitor func(k, v []byte) error) error {
+ if b := txr.buckets[string(bucketName)]; b != nil {
+ return b.ForEach(visitor)
+ }
+ return nil
+}
+
+// unsafeCopy returns a copy of txReadBuffer, caller should acquire backend.readTx.RLock()
+func (txr *txReadBuffer) unsafeCopy() txReadBuffer {
+ txrCopy := txReadBuffer{
+ txBuffer: txBuffer{
+ buckets: make(map[string]*bucketBuffer, len(txr.txBuffer.buckets)),
+ },
+ }
+ for bucketName, bucket := range txr.txBuffer.buckets {
+ txrCopy.txBuffer.buckets[bucketName] = bucket.Copy()
+ }
+ return txrCopy
+}
+
+type kv struct {
+ key []byte
+ val []byte
+}
+
+// bucketBuffer buffers key-value pairs that are pending commit.
+type bucketBuffer struct {
+ buf []kv
+ // used tracks number of elements in use so buf can be reused without reallocation.
+ used int
+}
+
+func newBucketBuffer() *bucketBuffer {
+ return &bucketBuffer{buf: make([]kv, 512), used: 0}
+}
+
+func (bb *bucketBuffer) Range(key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) {
+ f := func(i int) bool { return bytes.Compare(bb.buf[i].key, key) >= 0 }
+ idx := sort.Search(bb.used, f)
+ if idx < 0 {
+ return nil, nil
+ }
+ if len(endKey) == 0 {
+ if bytes.Equal(key, bb.buf[idx].key) {
+ keys = append(keys, bb.buf[idx].key)
+ vals = append(vals, bb.buf[idx].val)
+ }
+ return keys, vals
+ }
+ if bytes.Compare(endKey, bb.buf[idx].key) <= 0 {
+ return nil, nil
+ }
+ for i := idx; i < bb.used && int64(len(keys)) < limit; i++ {
+ if bytes.Compare(endKey, bb.buf[i].key) <= 0 {
+ break
+ }
+ keys = append(keys, bb.buf[i].key)
+ vals = append(vals, bb.buf[i].val)
+ }
+ return keys, vals
+}
+
+func (bb *bucketBuffer) ForEach(visitor func(k, v []byte) error) error {
+ for i := 0; i < bb.used; i++ {
+ if err := visitor(bb.buf[i].key, bb.buf[i].val); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (bb *bucketBuffer) add(k, v []byte) {
+ bb.buf[bb.used].key, bb.buf[bb.used].val = k, v
+ bb.used++
+ if bb.used == len(bb.buf) {
+ buf := make([]kv, (3*len(bb.buf))/2)
+ copy(buf, bb.buf)
+ bb.buf = buf
+ }
+}
+
+// merge merges data from bb into bbsrc.
+func (bb *bucketBuffer) merge(bbsrc *bucketBuffer) {
+ for i := 0; i < bbsrc.used; i++ {
+ bb.add(bbsrc.buf[i].key, bbsrc.buf[i].val)
+ }
+ if bb.used == bbsrc.used {
+ return
+ }
+ if bytes.Compare(bb.buf[(bb.used-bbsrc.used)-1].key, bbsrc.buf[0].key) < 0 {
+ return
+ }
+
+ sort.Stable(bb)
+
+ // remove duplicates, using only newest update
+ widx := 0
+ for ridx := 1; ridx < bb.used; ridx++ {
+ if !bytes.Equal(bb.buf[ridx].key, bb.buf[widx].key) {
+ widx++
+ }
+ bb.buf[widx] = bb.buf[ridx]
+ }
+ bb.used = widx + 1
+}
+
+func (bb *bucketBuffer) Len() int { return bb.used }
+func (bb *bucketBuffer) Less(i, j int) bool {
+ return bytes.Compare(bb.buf[i].key, bb.buf[j].key) < 0
+}
+func (bb *bucketBuffer) Swap(i, j int) { bb.buf[i], bb.buf[j] = bb.buf[j], bb.buf[i] }
+
+func (bb *bucketBuffer) Copy() *bucketBuffer {
+ bbCopy := bucketBuffer{
+ buf: make([]kv, len(bb.buf)),
+ used: bb.used,
+ }
+ copy(bbCopy.buf, bb.buf)
+ return &bbCopy
+}
diff --git a/vendor/go.etcd.io/etcd/mvcc/doc.go b/vendor/go.etcd.io/etcd/mvcc/doc.go
new file mode 100644
index 0000000..ad5be03
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/doc.go
@@ -0,0 +1,16 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package mvcc defines etcd's stable MVCC storage.
+package mvcc
diff --git a/vendor/go.etcd.io/etcd/mvcc/index.go b/vendor/go.etcd.io/etcd/mvcc/index.go
new file mode 100644
index 0000000..f8cc6df
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/index.go
@@ -0,0 +1,258 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mvcc
+
+import (
+ "sort"
+ "sync"
+
+ "github.com/google/btree"
+ "go.uber.org/zap"
+)
+
+type index interface {
+ Get(key []byte, atRev int64) (rev, created revision, ver int64, err error)
+ Range(key, end []byte, atRev int64) ([][]byte, []revision)
+ Revisions(key, end []byte, atRev int64) []revision
+ Put(key []byte, rev revision)
+ Tombstone(key []byte, rev revision) error
+ RangeSince(key, end []byte, rev int64) []revision
+ Compact(rev int64) map[revision]struct{}
+ Keep(rev int64) map[revision]struct{}
+ Equal(b index) bool
+
+ Insert(ki *keyIndex)
+ KeyIndex(ki *keyIndex) *keyIndex
+}
+
+type treeIndex struct {
+ sync.RWMutex
+ tree *btree.BTree
+ lg *zap.Logger
+}
+
+func newTreeIndex(lg *zap.Logger) index {
+ return &treeIndex{
+ tree: btree.New(32),
+ lg: lg,
+ }
+}
+
+func (ti *treeIndex) Put(key []byte, rev revision) {
+ keyi := &keyIndex{key: key}
+
+ ti.Lock()
+ defer ti.Unlock()
+ item := ti.tree.Get(keyi)
+ if item == nil {
+ keyi.put(ti.lg, rev.main, rev.sub)
+ ti.tree.ReplaceOrInsert(keyi)
+ return
+ }
+ okeyi := item.(*keyIndex)
+ okeyi.put(ti.lg, rev.main, rev.sub)
+}
+
+func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) {
+ keyi := &keyIndex{key: key}
+ ti.RLock()
+ defer ti.RUnlock()
+ if keyi = ti.keyIndex(keyi); keyi == nil {
+ return revision{}, revision{}, 0, ErrRevisionNotFound
+ }
+ return keyi.get(ti.lg, atRev)
+}
+
+func (ti *treeIndex) KeyIndex(keyi *keyIndex) *keyIndex {
+ ti.RLock()
+ defer ti.RUnlock()
+ return ti.keyIndex(keyi)
+}
+
+func (ti *treeIndex) keyIndex(keyi *keyIndex) *keyIndex {
+ if item := ti.tree.Get(keyi); item != nil {
+ return item.(*keyIndex)
+ }
+ return nil
+}
+
+func (ti *treeIndex) visit(key, end []byte, f func(ki *keyIndex)) {
+ keyi, endi := &keyIndex{key: key}, &keyIndex{key: end}
+
+ ti.RLock()
+ defer ti.RUnlock()
+
+ ti.tree.AscendGreaterOrEqual(keyi, func(item btree.Item) bool {
+ if len(endi.key) > 0 && !item.Less(endi) {
+ return false
+ }
+ f(item.(*keyIndex))
+ return true
+ })
+}
+
+func (ti *treeIndex) Revisions(key, end []byte, atRev int64) (revs []revision) {
+ if end == nil {
+ rev, _, _, err := ti.Get(key, atRev)
+ if err != nil {
+ return nil
+ }
+ return []revision{rev}
+ }
+ ti.visit(key, end, func(ki *keyIndex) {
+ if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
+ revs = append(revs, rev)
+ }
+ })
+ return revs
+}
+
+func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []revision) {
+ if end == nil {
+ rev, _, _, err := ti.Get(key, atRev)
+ if err != nil {
+ return nil, nil
+ }
+ return [][]byte{key}, []revision{rev}
+ }
+ ti.visit(key, end, func(ki *keyIndex) {
+ if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
+ revs = append(revs, rev)
+ keys = append(keys, ki.key)
+ }
+ })
+ return keys, revs
+}
+
+func (ti *treeIndex) Tombstone(key []byte, rev revision) error {
+ keyi := &keyIndex{key: key}
+
+ ti.Lock()
+ defer ti.Unlock()
+ item := ti.tree.Get(keyi)
+ if item == nil {
+ return ErrRevisionNotFound
+ }
+
+ ki := item.(*keyIndex)
+ return ki.tombstone(ti.lg, rev.main, rev.sub)
+}
+
+// RangeSince returns all revisions from key(including) to end(excluding)
+// at or after the given rev. The returned slice is sorted in the order
+// of revision.
+func (ti *treeIndex) RangeSince(key, end []byte, rev int64) []revision {
+ keyi := &keyIndex{key: key}
+
+ ti.RLock()
+ defer ti.RUnlock()
+
+ if end == nil {
+ item := ti.tree.Get(keyi)
+ if item == nil {
+ return nil
+ }
+ keyi = item.(*keyIndex)
+ return keyi.since(ti.lg, rev)
+ }
+
+ endi := &keyIndex{key: end}
+ var revs []revision
+ ti.tree.AscendGreaterOrEqual(keyi, func(item btree.Item) bool {
+ if len(endi.key) > 0 && !item.Less(endi) {
+ return false
+ }
+ curKeyi := item.(*keyIndex)
+ revs = append(revs, curKeyi.since(ti.lg, rev)...)
+ return true
+ })
+ sort.Sort(revisions(revs))
+
+ return revs
+}
+
+func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
+ available := make(map[revision]struct{})
+ if ti.lg != nil {
+ ti.lg.Info("compact tree index", zap.Int64("revision", rev))
+ } else {
+ plog.Printf("store.index: compact %d", rev)
+ }
+ ti.Lock()
+ clone := ti.tree.Clone()
+ ti.Unlock()
+
+ clone.Ascend(func(item btree.Item) bool {
+ keyi := item.(*keyIndex)
+ //Lock is needed here to prevent modification to the keyIndex while
+ //compaction is going on or revision added to empty before deletion
+ ti.Lock()
+ keyi.compact(ti.lg, rev, available)
+ if keyi.isEmpty() {
+ item := ti.tree.Delete(keyi)
+ if item == nil {
+ if ti.lg != nil {
+ ti.lg.Panic("failed to delete during compaction")
+ } else {
+ plog.Panic("store.index: unexpected delete failure during compaction")
+ }
+ }
+ }
+ ti.Unlock()
+ return true
+ })
+ return available
+}
+
+// Keep finds all revisions to be kept for a Compaction at the given rev.
+func (ti *treeIndex) Keep(rev int64) map[revision]struct{} {
+ available := make(map[revision]struct{})
+ ti.RLock()
+ defer ti.RUnlock()
+ ti.tree.Ascend(func(i btree.Item) bool {
+ keyi := i.(*keyIndex)
+ keyi.keep(rev, available)
+ return true
+ })
+ return available
+}
+
+func (ti *treeIndex) Equal(bi index) bool {
+ b := bi.(*treeIndex)
+
+ if ti.tree.Len() != b.tree.Len() {
+ return false
+ }
+
+ equal := true
+
+ ti.tree.Ascend(func(item btree.Item) bool {
+ aki := item.(*keyIndex)
+ bki := b.tree.Get(item).(*keyIndex)
+ if !aki.equal(bki) {
+ equal = false
+ return false
+ }
+ return true
+ })
+
+ return equal
+}
+
+func (ti *treeIndex) Insert(ki *keyIndex) {
+ ti.Lock()
+ defer ti.Unlock()
+ ti.tree.ReplaceOrInsert(ki)
+}
diff --git a/vendor/go.etcd.io/etcd/mvcc/key_index.go b/vendor/go.etcd.io/etcd/mvcc/key_index.go
new file mode 100644
index 0000000..cf77cb4
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/key_index.go
@@ -0,0 +1,402 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mvcc
+
+import (
+ "bytes"
+ "errors"
+ "fmt"
+
+ "github.com/google/btree"
+ "go.uber.org/zap"
+)
+
+var (
+ ErrRevisionNotFound = errors.New("mvcc: revision not found")
+)
+
+// keyIndex stores the revisions of a key in the backend.
+// Each keyIndex has at least one key generation.
+// Each generation might have several key versions.
+// Tombstone on a key appends an tombstone version at the end
+// of the current generation and creates a new empty generation.
+// Each version of a key has an index pointing to the backend.
+//
+// For example: put(1.0);put(2.0);tombstone(3.0);put(4.0);tombstone(5.0) on key "foo"
+// generate a keyIndex:
+// key: "foo"
+// rev: 5
+// generations:
+// {empty}
+// {4.0, 5.0(t)}
+// {1.0, 2.0, 3.0(t)}
+//
+// Compact a keyIndex removes the versions with smaller or equal to
+// rev except the largest one. If the generation becomes empty
+// during compaction, it will be removed. if all the generations get
+// removed, the keyIndex should be removed.
+//
+// For example:
+// compact(2) on the previous example
+// generations:
+// {empty}
+// {4.0, 5.0(t)}
+// {2.0, 3.0(t)}
+//
+// compact(4)
+// generations:
+// {empty}
+// {4.0, 5.0(t)}
+//
+// compact(5):
+// generations:
+// {empty} -> key SHOULD be removed.
+//
+// compact(6):
+// generations:
+// {empty} -> key SHOULD be removed.
+type keyIndex struct {
+ key []byte
+ modified revision // the main rev of the last modification
+ generations []generation
+}
+
+// put puts a revision to the keyIndex.
+func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64) {
+ rev := revision{main: main, sub: sub}
+
+ if !rev.GreaterThan(ki.modified) {
+ if lg != nil {
+ lg.Panic(
+ "'put' with an unexpected smaller revision",
+ zap.Int64("given-revision-main", rev.main),
+ zap.Int64("given-revision-sub", rev.sub),
+ zap.Int64("modified-revision-main", ki.modified.main),
+ zap.Int64("modified-revision-sub", ki.modified.sub),
+ )
+ } else {
+ plog.Panicf("store.keyindex: put with unexpected smaller revision [%v / %v]", rev, ki.modified)
+ }
+ }
+ if len(ki.generations) == 0 {
+ ki.generations = append(ki.generations, generation{})
+ }
+ g := &ki.generations[len(ki.generations)-1]
+ if len(g.revs) == 0 { // create a new key
+ keysGauge.Inc()
+ g.created = rev
+ }
+ g.revs = append(g.revs, rev)
+ g.ver++
+ ki.modified = rev
+}
+
+func (ki *keyIndex) restore(lg *zap.Logger, created, modified revision, ver int64) {
+ if len(ki.generations) != 0 {
+ if lg != nil {
+ lg.Panic(
+ "'restore' got an unexpected non-empty generations",
+ zap.Int("generations-size", len(ki.generations)),
+ )
+ } else {
+ plog.Panicf("store.keyindex: cannot restore non-empty keyIndex")
+ }
+ }
+
+ ki.modified = modified
+ g := generation{created: created, ver: ver, revs: []revision{modified}}
+ ki.generations = append(ki.generations, g)
+ keysGauge.Inc()
+}
+
+// tombstone puts a revision, pointing to a tombstone, to the keyIndex.
+// It also creates a new empty generation in the keyIndex.
+// It returns ErrRevisionNotFound when tombstone on an empty generation.
+func (ki *keyIndex) tombstone(lg *zap.Logger, main int64, sub int64) error {
+ if ki.isEmpty() {
+ if lg != nil {
+ lg.Panic(
+ "'tombstone' got an unexpected empty keyIndex",
+ zap.String("key", string(ki.key)),
+ )
+ } else {
+ plog.Panicf("store.keyindex: unexpected tombstone on empty keyIndex %s", string(ki.key))
+ }
+ }
+ if ki.generations[len(ki.generations)-1].isEmpty() {
+ return ErrRevisionNotFound
+ }
+ ki.put(lg, main, sub)
+ ki.generations = append(ki.generations, generation{})
+ keysGauge.Dec()
+ return nil
+}
+
+// get gets the modified, created revision and version of the key that satisfies the given atRev.
+// Rev must be higher than or equal to the given atRev.
+func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) {
+ if ki.isEmpty() {
+ if lg != nil {
+ lg.Panic(
+ "'get' got an unexpected empty keyIndex",
+ zap.String("key", string(ki.key)),
+ )
+ } else {
+ plog.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key))
+ }
+ }
+ g := ki.findGeneration(atRev)
+ if g.isEmpty() {
+ return revision{}, revision{}, 0, ErrRevisionNotFound
+ }
+
+ n := g.walk(func(rev revision) bool { return rev.main > atRev })
+ if n != -1 {
+ return g.revs[n], g.created, g.ver - int64(len(g.revs)-n-1), nil
+ }
+
+ return revision{}, revision{}, 0, ErrRevisionNotFound
+}
+
+// since returns revisions since the given rev. Only the revision with the
+// largest sub revision will be returned if multiple revisions have the same
+// main revision.
+func (ki *keyIndex) since(lg *zap.Logger, rev int64) []revision {
+ if ki.isEmpty() {
+ if lg != nil {
+ lg.Panic(
+ "'since' got an unexpected empty keyIndex",
+ zap.String("key", string(ki.key)),
+ )
+ } else {
+ plog.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key))
+ }
+ }
+ since := revision{rev, 0}
+ var gi int
+ // find the generations to start checking
+ for gi = len(ki.generations) - 1; gi > 0; gi-- {
+ g := ki.generations[gi]
+ if g.isEmpty() {
+ continue
+ }
+ if since.GreaterThan(g.created) {
+ break
+ }
+ }
+
+ var revs []revision
+ var last int64
+ for ; gi < len(ki.generations); gi++ {
+ for _, r := range ki.generations[gi].revs {
+ if since.GreaterThan(r) {
+ continue
+ }
+ if r.main == last {
+ // replace the revision with a new one that has higher sub value,
+ // because the original one should not be seen by external
+ revs[len(revs)-1] = r
+ continue
+ }
+ revs = append(revs, r)
+ last = r.main
+ }
+ }
+ return revs
+}
+
+// compact compacts a keyIndex by removing the versions with smaller or equal
+// revision than the given atRev except the largest one (If the largest one is
+// a tombstone, it will not be kept).
+// If a generation becomes empty during compaction, it will be removed.
+func (ki *keyIndex) compact(lg *zap.Logger, atRev int64, available map[revision]struct{}) {
+ if ki.isEmpty() {
+ if lg != nil {
+ lg.Panic(
+ "'compact' got an unexpected empty keyIndex",
+ zap.String("key", string(ki.key)),
+ )
+ } else {
+ plog.Panicf("store.keyindex: unexpected compact on empty keyIndex %s", string(ki.key))
+ }
+ }
+
+ genIdx, revIndex := ki.doCompact(atRev, available)
+
+ g := &ki.generations[genIdx]
+ if !g.isEmpty() {
+ // remove the previous contents.
+ if revIndex != -1 {
+ g.revs = g.revs[revIndex:]
+ }
+ // remove any tombstone
+ if len(g.revs) == 1 && genIdx != len(ki.generations)-1 {
+ delete(available, g.revs[0])
+ genIdx++
+ }
+ }
+
+ // remove the previous generations.
+ ki.generations = ki.generations[genIdx:]
+}
+
+// keep finds the revision to be kept if compact is called at given atRev.
+func (ki *keyIndex) keep(atRev int64, available map[revision]struct{}) {
+ if ki.isEmpty() {
+ return
+ }
+
+ genIdx, revIndex := ki.doCompact(atRev, available)
+ g := &ki.generations[genIdx]
+ if !g.isEmpty() {
+ // remove any tombstone
+ if revIndex == len(g.revs)-1 && genIdx != len(ki.generations)-1 {
+ delete(available, g.revs[revIndex])
+ }
+ }
+}
+
+func (ki *keyIndex) doCompact(atRev int64, available map[revision]struct{}) (genIdx int, revIndex int) {
+ // walk until reaching the first revision smaller or equal to "atRev",
+ // and add the revision to the available map
+ f := func(rev revision) bool {
+ if rev.main <= atRev {
+ available[rev] = struct{}{}
+ return false
+ }
+ return true
+ }
+
+ genIdx, g := 0, &ki.generations[0]
+ // find first generation includes atRev or created after atRev
+ for genIdx < len(ki.generations)-1 {
+ if tomb := g.revs[len(g.revs)-1].main; tomb > atRev {
+ break
+ }
+ genIdx++
+ g = &ki.generations[genIdx]
+ }
+
+ revIndex = g.walk(f)
+
+ return genIdx, revIndex
+}
+
+func (ki *keyIndex) isEmpty() bool {
+ return len(ki.generations) == 1 && ki.generations[0].isEmpty()
+}
+
+// findGeneration finds out the generation of the keyIndex that the
+// given rev belongs to. If the given rev is at the gap of two generations,
+// which means that the key does not exist at the given rev, it returns nil.
+func (ki *keyIndex) findGeneration(rev int64) *generation {
+ lastg := len(ki.generations) - 1
+ cg := lastg
+
+ for cg >= 0 {
+ if len(ki.generations[cg].revs) == 0 {
+ cg--
+ continue
+ }
+ g := ki.generations[cg]
+ if cg != lastg {
+ if tomb := g.revs[len(g.revs)-1].main; tomb <= rev {
+ return nil
+ }
+ }
+ if g.revs[0].main <= rev {
+ return &ki.generations[cg]
+ }
+ cg--
+ }
+ return nil
+}
+
+func (ki *keyIndex) Less(b btree.Item) bool {
+ return bytes.Compare(ki.key, b.(*keyIndex).key) == -1
+}
+
+func (ki *keyIndex) equal(b *keyIndex) bool {
+ if !bytes.Equal(ki.key, b.key) {
+ return false
+ }
+ if ki.modified != b.modified {
+ return false
+ }
+ if len(ki.generations) != len(b.generations) {
+ return false
+ }
+ for i := range ki.generations {
+ ag, bg := ki.generations[i], b.generations[i]
+ if !ag.equal(bg) {
+ return false
+ }
+ }
+ return true
+}
+
+func (ki *keyIndex) String() string {
+ var s string
+ for _, g := range ki.generations {
+ s += g.String()
+ }
+ return s
+}
+
+// generation contains multiple revisions of a key.
+type generation struct {
+ ver int64
+ created revision // when the generation is created (put in first revision).
+ revs []revision
+}
+
+func (g *generation) isEmpty() bool { return g == nil || len(g.revs) == 0 }
+
+// walk walks through the revisions in the generation in descending order.
+// It passes the revision to the given function.
+// walk returns until: 1. it finishes walking all pairs 2. the function returns false.
+// walk returns the position at where it stopped. If it stopped after
+// finishing walking, -1 will be returned.
+func (g *generation) walk(f func(rev revision) bool) int {
+ l := len(g.revs)
+ for i := range g.revs {
+ ok := f(g.revs[l-i-1])
+ if !ok {
+ return l - i - 1
+ }
+ }
+ return -1
+}
+
+func (g *generation) String() string {
+ return fmt.Sprintf("g: created[%d] ver[%d], revs %#v\n", g.created, g.ver, g.revs)
+}
+
+func (g generation) equal(b generation) bool {
+ if g.ver != b.ver {
+ return false
+ }
+ if len(g.revs) != len(b.revs) {
+ return false
+ }
+
+ for i := range g.revs {
+ ar, br := g.revs[i], b.revs[i]
+ if ar != br {
+ return false
+ }
+ }
+ return true
+}
diff --git a/vendor/go.etcd.io/etcd/mvcc/kv.go b/vendor/go.etcd.io/etcd/mvcc/kv.go
new file mode 100644
index 0000000..8e898a5
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/kv.go
@@ -0,0 +1,149 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mvcc
+
+import (
+ "go.etcd.io/etcd/lease"
+ "go.etcd.io/etcd/mvcc/backend"
+ "go.etcd.io/etcd/mvcc/mvccpb"
+)
+
+type RangeOptions struct {
+ Limit int64
+ Rev int64
+ Count bool
+}
+
+type RangeResult struct {
+ KVs []mvccpb.KeyValue
+ Rev int64
+ Count int
+}
+
+type ReadView interface {
+ // FirstRev returns the first KV revision at the time of opening the txn.
+ // After a compaction, the first revision increases to the compaction
+ // revision.
+ FirstRev() int64
+
+ // Rev returns the revision of the KV at the time of opening the txn.
+ Rev() int64
+
+ // Range gets the keys in the range at rangeRev.
+ // The returned rev is the current revision of the KV when the operation is executed.
+ // If rangeRev <=0, range gets the keys at currentRev.
+ // If `end` is nil, the request returns the key.
+ // If `end` is not nil and not empty, it gets the keys in range [key, range_end).
+ // If `end` is not nil and empty, it gets the keys greater than or equal to key.
+ // Limit limits the number of keys returned.
+ // If the required rev is compacted, ErrCompacted will be returned.
+ Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error)
+}
+
+// TxnRead represents a read-only transaction with operations that will not
+// block other read transactions.
+type TxnRead interface {
+ ReadView
+ // End marks the transaction is complete and ready to commit.
+ End()
+}
+
+type WriteView interface {
+ // DeleteRange deletes the given range from the store.
+ // A deleteRange increases the rev of the store if any key in the range exists.
+ // The number of key deleted will be returned.
+ // The returned rev is the current revision of the KV when the operation is executed.
+ // It also generates one event for each key delete in the event history.
+ // if the `end` is nil, deleteRange deletes the key.
+ // if the `end` is not nil, deleteRange deletes the keys in range [key, range_end).
+ DeleteRange(key, end []byte) (n, rev int64)
+
+ // Put puts the given key, value into the store. Put also takes additional argument lease to
+ // attach a lease to a key-value pair as meta-data. KV implementation does not validate the lease
+ // id.
+ // A put also increases the rev of the store, and generates one event in the event history.
+ // The returned rev is the current revision of the KV when the operation is executed.
+ Put(key, value []byte, lease lease.LeaseID) (rev int64)
+}
+
+// TxnWrite represents a transaction that can modify the store.
+type TxnWrite interface {
+ TxnRead
+ WriteView
+ // Changes gets the changes made since opening the write txn.
+ Changes() []mvccpb.KeyValue
+}
+
+// txnReadWrite coerces a read txn to a write, panicking on any write operation.
+type txnReadWrite struct{ TxnRead }
+
+func (trw *txnReadWrite) DeleteRange(key, end []byte) (n, rev int64) { panic("unexpected DeleteRange") }
+func (trw *txnReadWrite) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
+ panic("unexpected Put")
+}
+func (trw *txnReadWrite) Changes() []mvccpb.KeyValue { return nil }
+
+func NewReadOnlyTxnWrite(txn TxnRead) TxnWrite { return &txnReadWrite{txn} }
+
+type KV interface {
+ ReadView
+ WriteView
+
+ // Read creates a read transaction.
+ Read() TxnRead
+
+ // Write creates a write transaction.
+ Write() TxnWrite
+
+ // Hash computes the hash of the KV's backend.
+ Hash() (hash uint32, revision int64, err error)
+
+ // HashByRev computes the hash of all MVCC revisions up to a given revision.
+ HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error)
+
+ // Compact frees all superseded keys with revisions less than rev.
+ Compact(rev int64) (<-chan struct{}, error)
+
+ // Commit commits outstanding txns into the underlying backend.
+ Commit()
+
+ // Restore restores the KV store from a backend.
+ Restore(b backend.Backend) error
+ Close() error
+}
+
+// WatchableKV is a KV that can be watched.
+type WatchableKV interface {
+ KV
+ Watchable
+}
+
+// Watchable is the interface that wraps the NewWatchStream function.
+type Watchable interface {
+ // NewWatchStream returns a WatchStream that can be used to
+ // watch events happened or happening on the KV.
+ NewWatchStream() WatchStream
+}
+
+// ConsistentWatchableKV is a WatchableKV that understands the consistency
+// algorithm and consistent index.
+// If the consistent index of executing entry is not larger than the
+// consistent index of ConsistentWatchableKV, all operations in
+// this entry are skipped and return empty response.
+type ConsistentWatchableKV interface {
+ WatchableKV
+ // ConsistentIndex returns the current consistent index of the KV.
+ ConsistentIndex() uint64
+}
diff --git a/vendor/go.etcd.io/etcd/mvcc/kv_view.go b/vendor/go.etcd.io/etcd/mvcc/kv_view.go
new file mode 100644
index 0000000..bd2e777
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/kv_view.go
@@ -0,0 +1,51 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mvcc
+
+import "go.etcd.io/etcd/lease"
+
+type readView struct{ kv KV }
+
+func (rv *readView) FirstRev() int64 {
+ tr := rv.kv.Read()
+ defer tr.End()
+ return tr.FirstRev()
+}
+
+func (rv *readView) Rev() int64 {
+ tr := rv.kv.Read()
+ defer tr.End()
+ return tr.Rev()
+}
+
+func (rv *readView) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
+ tr := rv.kv.Read()
+ defer tr.End()
+ return tr.Range(key, end, ro)
+}
+
+type writeView struct{ kv KV }
+
+func (wv *writeView) DeleteRange(key, end []byte) (n, rev int64) {
+ tw := wv.kv.Write()
+ defer tw.End()
+ return tw.DeleteRange(key, end)
+}
+
+func (wv *writeView) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
+ tw := wv.kv.Write()
+ defer tw.End()
+ return tw.Put(key, value, lease)
+}
diff --git a/vendor/go.etcd.io/etcd/mvcc/kvstore.go b/vendor/go.etcd.io/etcd/mvcc/kvstore.go
new file mode 100644
index 0000000..e367ebb
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/kvstore.go
@@ -0,0 +1,620 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mvcc
+
+import (
+ "context"
+ "encoding/binary"
+ "errors"
+ "fmt"
+ "hash/crc32"
+ "math"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "go.etcd.io/etcd/lease"
+ "go.etcd.io/etcd/mvcc/backend"
+ "go.etcd.io/etcd/mvcc/mvccpb"
+ "go.etcd.io/etcd/pkg/schedule"
+
+ "github.com/coreos/pkg/capnslog"
+ "go.uber.org/zap"
+)
+
+var (
+ keyBucketName = []byte("key")
+ metaBucketName = []byte("meta")
+
+ consistentIndexKeyName = []byte("consistent_index")
+ scheduledCompactKeyName = []byte("scheduledCompactRev")
+ finishedCompactKeyName = []byte("finishedCompactRev")
+
+ ErrCompacted = errors.New("mvcc: required revision has been compacted")
+ ErrFutureRev = errors.New("mvcc: required revision is a future revision")
+ ErrCanceled = errors.New("mvcc: watcher is canceled")
+ ErrClosed = errors.New("mvcc: closed")
+
+ plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "mvcc")
+)
+
+const (
+ // markedRevBytesLen is the byte length of marked revision.
+ // The first `revBytesLen` bytes represents a normal revision. The last
+ // one byte is the mark.
+ markedRevBytesLen = revBytesLen + 1
+ markBytePosition = markedRevBytesLen - 1
+ markTombstone byte = 't'
+)
+
+var restoreChunkKeys = 10000 // non-const for testing
+var defaultCompactBatchLimit = 1000
+
+// ConsistentIndexGetter is an interface that wraps the Get method.
+// Consistent index is the offset of an entry in a consistent replicated log.
+type ConsistentIndexGetter interface {
+ // ConsistentIndex returns the consistent index of current executing entry.
+ ConsistentIndex() uint64
+}
+
+type StoreConfig struct {
+ CompactionBatchLimit int
+}
+
+type store struct {
+ ReadView
+ WriteView
+
+ // consistentIndex caches the "consistent_index" key's value. Accessed
+ // through atomics so must be 64-bit aligned.
+ consistentIndex uint64
+
+ cfg StoreConfig
+
+ // mu read locks for txns and write locks for non-txn store changes.
+ mu sync.RWMutex
+
+ ig ConsistentIndexGetter
+
+ b backend.Backend
+ kvindex index
+
+ le lease.Lessor
+
+ // revMuLock protects currentRev and compactMainRev.
+ // Locked at end of write txn and released after write txn unlock lock.
+ // Locked before locking read txn and released after locking.
+ revMu sync.RWMutex
+ // currentRev is the revision of the last completed transaction.
+ currentRev int64
+ // compactMainRev is the main revision of the last compaction.
+ compactMainRev int64
+
+ // bytesBuf8 is a byte slice of length 8
+ // to avoid a repetitive allocation in saveIndex.
+ bytesBuf8 []byte
+
+ fifoSched schedule.Scheduler
+
+ stopc chan struct{}
+
+ lg *zap.Logger
+}
+
+// NewStore returns a new store. It is useful to create a store inside
+// mvcc pkg. It should only be used for testing externally.
+func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) *store {
+ if cfg.CompactionBatchLimit == 0 {
+ cfg.CompactionBatchLimit = defaultCompactBatchLimit
+ }
+ s := &store{
+ cfg: cfg,
+ b: b,
+ ig: ig,
+ kvindex: newTreeIndex(lg),
+
+ le: le,
+
+ currentRev: 1,
+ compactMainRev: -1,
+
+ bytesBuf8: make([]byte, 8),
+ fifoSched: schedule.NewFIFOScheduler(),
+
+ stopc: make(chan struct{}),
+
+ lg: lg,
+ }
+ s.ReadView = &readView{s}
+ s.WriteView = &writeView{s}
+ if s.le != nil {
+ s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() })
+ }
+
+ tx := s.b.BatchTx()
+ tx.Lock()
+ tx.UnsafeCreateBucket(keyBucketName)
+ tx.UnsafeCreateBucket(metaBucketName)
+ tx.Unlock()
+ s.b.ForceCommit()
+
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if err := s.restore(); err != nil {
+ // TODO: return the error instead of panic here?
+ panic("failed to recover store from backend")
+ }
+
+ return s
+}
+
+func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
+ if ctx == nil || ctx.Err() != nil {
+ s.mu.Lock()
+ select {
+ case <-s.stopc:
+ default:
+ f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
+ s.fifoSched.Schedule(f)
+ }
+ s.mu.Unlock()
+ return
+ }
+ close(ch)
+}
+
+func (s *store) Hash() (hash uint32, revision int64, err error) {
+ // TODO: hash and revision could be inconsistent, one possible fix is to add s.revMu.RLock() at the beginning of function, which is costly
+ start := time.Now()
+
+ s.b.ForceCommit()
+ h, err := s.b.Hash(DefaultIgnores)
+
+ hashSec.Observe(time.Since(start).Seconds())
+ return h, s.currentRev, err
+}
+
+func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev int64, err error) {
+ start := time.Now()
+
+ s.mu.RLock()
+ s.revMu.RLock()
+ compactRev, currentRev = s.compactMainRev, s.currentRev
+ s.revMu.RUnlock()
+
+ if rev > 0 && rev <= compactRev {
+ s.mu.RUnlock()
+ return 0, 0, compactRev, ErrCompacted
+ } else if rev > 0 && rev > currentRev {
+ s.mu.RUnlock()
+ return 0, currentRev, 0, ErrFutureRev
+ }
+
+ if rev == 0 {
+ rev = currentRev
+ }
+ keep := s.kvindex.Keep(rev)
+
+ tx := s.b.ReadTx()
+ tx.RLock()
+ defer tx.RUnlock()
+ s.mu.RUnlock()
+
+ upper := revision{main: rev + 1}
+ lower := revision{main: compactRev + 1}
+ h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
+
+ h.Write(keyBucketName)
+ err = tx.UnsafeForEach(keyBucketName, func(k, v []byte) error {
+ kr := bytesToRev(k)
+ if !upper.GreaterThan(kr) {
+ return nil
+ }
+ // skip revisions that are scheduled for deletion
+ // due to compacting; don't skip if there isn't one.
+ if lower.GreaterThan(kr) && len(keep) > 0 {
+ if _, ok := keep[kr]; !ok {
+ return nil
+ }
+ }
+ h.Write(k)
+ h.Write(v)
+ return nil
+ })
+ hash = h.Sum32()
+
+ hashRevSec.Observe(time.Since(start).Seconds())
+ return hash, currentRev, compactRev, err
+}
+
+func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) {
+ s.revMu.Lock()
+ if rev <= s.compactMainRev {
+ ch := make(chan struct{})
+ f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
+ s.fifoSched.Schedule(f)
+ s.revMu.Unlock()
+ return ch, ErrCompacted
+ }
+ if rev > s.currentRev {
+ s.revMu.Unlock()
+ return nil, ErrFutureRev
+ }
+
+ s.compactMainRev = rev
+
+ rbytes := newRevBytes()
+ revToBytes(revision{main: rev}, rbytes)
+
+ tx := s.b.BatchTx()
+ tx.Lock()
+ tx.UnsafePut(metaBucketName, scheduledCompactKeyName, rbytes)
+ tx.Unlock()
+ // ensure that desired compaction is persisted
+ s.b.ForceCommit()
+
+ s.revMu.Unlock()
+
+ return nil, nil
+}
+
+func (s *store) compact(rev int64) (<-chan struct{}, error) {
+ start := time.Now()
+ keep := s.kvindex.Compact(rev)
+ ch := make(chan struct{})
+ var j = func(ctx context.Context) {
+ if ctx.Err() != nil {
+ s.compactBarrier(ctx, ch)
+ return
+ }
+ if !s.scheduleCompaction(rev, keep) {
+ s.compactBarrier(nil, ch)
+ return
+ }
+ close(ch)
+ }
+
+ s.fifoSched.Schedule(j)
+
+ indexCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))
+ return ch, nil
+}
+
+func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) {
+ ch, err := s.updateCompactRev(rev)
+ if nil != err {
+ return ch, err
+ }
+
+ return s.compact(rev)
+}
+
+func (s *store) Compact(rev int64) (<-chan struct{}, error) {
+ s.mu.Lock()
+
+ ch, err := s.updateCompactRev(rev)
+
+ if err != nil {
+ s.mu.Unlock()
+ return ch, err
+ }
+ s.mu.Unlock()
+
+ return s.compact(rev)
+}
+
+// DefaultIgnores is a map of keys to ignore in hash checking.
+var DefaultIgnores map[backend.IgnoreKey]struct{}
+
+func init() {
+ DefaultIgnores = map[backend.IgnoreKey]struct{}{
+ // consistent index might be changed due to v2 internal sync, which
+ // is not controllable by the user.
+ {Bucket: string(metaBucketName), Key: string(consistentIndexKeyName)}: {},
+ }
+}
+
+func (s *store) Commit() {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ tx := s.b.BatchTx()
+ tx.Lock()
+ s.saveIndex(tx)
+ tx.Unlock()
+ s.b.ForceCommit()
+}
+
+func (s *store) Restore(b backend.Backend) error {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ close(s.stopc)
+ s.fifoSched.Stop()
+
+ atomic.StoreUint64(&s.consistentIndex, 0)
+ s.b = b
+ s.kvindex = newTreeIndex(s.lg)
+ s.currentRev = 1
+ s.compactMainRev = -1
+ s.fifoSched = schedule.NewFIFOScheduler()
+ s.stopc = make(chan struct{})
+
+ return s.restore()
+}
+
+func (s *store) restore() error {
+ s.setupMetricsReporter()
+
+ min, max := newRevBytes(), newRevBytes()
+ revToBytes(revision{main: 1}, min)
+ revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)
+
+ keyToLease := make(map[string]lease.LeaseID)
+
+ // restore index
+ tx := s.b.BatchTx()
+ tx.Lock()
+
+ _, finishedCompactBytes := tx.UnsafeRange(metaBucketName, finishedCompactKeyName, nil, 0)
+ if len(finishedCompactBytes) != 0 {
+ s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main
+
+ if s.lg != nil {
+ s.lg.Info(
+ "restored last compact revision",
+ zap.String("meta-bucket-name", string(metaBucketName)),
+ zap.String("meta-bucket-name-key", string(finishedCompactKeyName)),
+ zap.Int64("restored-compact-revision", s.compactMainRev),
+ )
+ } else {
+ plog.Printf("restore compact to %d", s.compactMainRev)
+ }
+ }
+ _, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
+ scheduledCompact := int64(0)
+ if len(scheduledCompactBytes) != 0 {
+ scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main
+ }
+
+ // index keys concurrently as they're loaded in from tx
+ keysGauge.Set(0)
+ rkvc, revc := restoreIntoIndex(s.lg, s.kvindex)
+ for {
+ keys, vals := tx.UnsafeRange(keyBucketName, min, max, int64(restoreChunkKeys))
+ if len(keys) == 0 {
+ break
+ }
+ // rkvc blocks if the total pending keys exceeds the restore
+ // chunk size to keep keys from consuming too much memory.
+ restoreChunk(s.lg, rkvc, keys, vals, keyToLease)
+ if len(keys) < restoreChunkKeys {
+ // partial set implies final set
+ break
+ }
+ // next set begins after where this one ended
+ newMin := bytesToRev(keys[len(keys)-1][:revBytesLen])
+ newMin.sub++
+ revToBytes(newMin, min)
+ }
+ close(rkvc)
+ s.currentRev = <-revc
+
+ // keys in the range [compacted revision -N, compaction] might all be deleted due to compaction.
+ // the correct revision should be set to compaction revision in the case, not the largest revision
+ // we have seen.
+ if s.currentRev < s.compactMainRev {
+ s.currentRev = s.compactMainRev
+ }
+ if scheduledCompact <= s.compactMainRev {
+ scheduledCompact = 0
+ }
+
+ for key, lid := range keyToLease {
+ if s.le == nil {
+ panic("no lessor to attach lease")
+ }
+ err := s.le.Attach(lid, []lease.LeaseItem{{Key: key}})
+ if err != nil {
+ if s.lg != nil {
+ s.lg.Warn(
+ "failed to attach a lease",
+ zap.String("lease-id", fmt.Sprintf("%016x", lid)),
+ zap.Error(err),
+ )
+ } else {
+ plog.Errorf("unexpected Attach error: %v", err)
+ }
+ }
+ }
+
+ tx.Unlock()
+
+ if scheduledCompact != 0 {
+ s.compactLockfree(scheduledCompact)
+
+ if s.lg != nil {
+ s.lg.Info(
+ "resume scheduled compaction",
+ zap.String("meta-bucket-name", string(metaBucketName)),
+ zap.String("meta-bucket-name-key", string(scheduledCompactKeyName)),
+ zap.Int64("scheduled-compact-revision", scheduledCompact),
+ )
+ } else {
+ plog.Printf("resume scheduled compaction at %d", scheduledCompact)
+ }
+ }
+
+ return nil
+}
+
+type revKeyValue struct {
+ key []byte
+ kv mvccpb.KeyValue
+ kstr string
+}
+
+func restoreIntoIndex(lg *zap.Logger, idx index) (chan<- revKeyValue, <-chan int64) {
+ rkvc, revc := make(chan revKeyValue, restoreChunkKeys), make(chan int64, 1)
+ go func() {
+ currentRev := int64(1)
+ defer func() { revc <- currentRev }()
+ // restore the tree index from streaming the unordered index.
+ kiCache := make(map[string]*keyIndex, restoreChunkKeys)
+ for rkv := range rkvc {
+ ki, ok := kiCache[rkv.kstr]
+ // purge kiCache if many keys but still missing in the cache
+ if !ok && len(kiCache) >= restoreChunkKeys {
+ i := 10
+ for k := range kiCache {
+ delete(kiCache, k)
+ if i--; i == 0 {
+ break
+ }
+ }
+ }
+ // cache miss, fetch from tree index if there
+ if !ok {
+ ki = &keyIndex{key: rkv.kv.Key}
+ if idxKey := idx.KeyIndex(ki); idxKey != nil {
+ kiCache[rkv.kstr], ki = idxKey, idxKey
+ ok = true
+ }
+ }
+ rev := bytesToRev(rkv.key)
+ currentRev = rev.main
+ if ok {
+ if isTombstone(rkv.key) {
+ ki.tombstone(lg, rev.main, rev.sub)
+ continue
+ }
+ ki.put(lg, rev.main, rev.sub)
+ } else if !isTombstone(rkv.key) {
+ ki.restore(lg, revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version)
+ idx.Insert(ki)
+ kiCache[rkv.kstr] = ki
+ }
+ }
+ }()
+ return rkvc, revc
+}
+
+func restoreChunk(lg *zap.Logger, kvc chan<- revKeyValue, keys, vals [][]byte, keyToLease map[string]lease.LeaseID) {
+ for i, key := range keys {
+ rkv := revKeyValue{key: key}
+ if err := rkv.kv.Unmarshal(vals[i]); err != nil {
+ if lg != nil {
+ lg.Fatal("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
+ } else {
+ plog.Fatalf("cannot unmarshal event: %v", err)
+ }
+ }
+ rkv.kstr = string(rkv.kv.Key)
+ if isTombstone(key) {
+ delete(keyToLease, rkv.kstr)
+ } else if lid := lease.LeaseID(rkv.kv.Lease); lid != lease.NoLease {
+ keyToLease[rkv.kstr] = lid
+ } else {
+ delete(keyToLease, rkv.kstr)
+ }
+ kvc <- rkv
+ }
+}
+
+func (s *store) Close() error {
+ close(s.stopc)
+ s.fifoSched.Stop()
+ return nil
+}
+
+func (s *store) saveIndex(tx backend.BatchTx) {
+ if s.ig == nil {
+ return
+ }
+ bs := s.bytesBuf8
+ ci := s.ig.ConsistentIndex()
+ binary.BigEndian.PutUint64(bs, ci)
+ // put the index into the underlying backend
+ // tx has been locked in TxnBegin, so there is no need to lock it again
+ tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
+ atomic.StoreUint64(&s.consistentIndex, ci)
+}
+
+func (s *store) ConsistentIndex() uint64 {
+ if ci := atomic.LoadUint64(&s.consistentIndex); ci > 0 {
+ return ci
+ }
+ tx := s.b.BatchTx()
+ tx.Lock()
+ defer tx.Unlock()
+ _, vs := tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0)
+ if len(vs) == 0 {
+ return 0
+ }
+ v := binary.BigEndian.Uint64(vs[0])
+ atomic.StoreUint64(&s.consistentIndex, v)
+ return v
+}
+
+func (s *store) setupMetricsReporter() {
+ b := s.b
+ reportDbTotalSizeInBytesMu.Lock()
+ reportDbTotalSizeInBytes = func() float64 { return float64(b.Size()) }
+ reportDbTotalSizeInBytesMu.Unlock()
+ reportDbTotalSizeInBytesDebugMu.Lock()
+ reportDbTotalSizeInBytesDebug = func() float64 { return float64(b.Size()) }
+ reportDbTotalSizeInBytesDebugMu.Unlock()
+ reportDbTotalSizeInUseInBytesMu.Lock()
+ reportDbTotalSizeInUseInBytes = func() float64 { return float64(b.SizeInUse()) }
+ reportDbTotalSizeInUseInBytesMu.Unlock()
+ reportDbOpenReadTxNMu.Lock()
+ reportDbOpenReadTxN = func() float64 { return float64(b.OpenReadTxN()) }
+ reportDbOpenReadTxNMu.Unlock()
+ reportCurrentRevMu.Lock()
+ reportCurrentRev = func() float64 {
+ s.revMu.RLock()
+ defer s.revMu.RUnlock()
+ return float64(s.currentRev)
+ }
+ reportCurrentRevMu.Unlock()
+ reportCompactRevMu.Lock()
+ reportCompactRev = func() float64 {
+ s.revMu.RLock()
+ defer s.revMu.RUnlock()
+ return float64(s.compactMainRev)
+ }
+ reportCompactRevMu.Unlock()
+}
+
+// appendMarkTombstone appends tombstone mark to normal revision bytes.
+func appendMarkTombstone(lg *zap.Logger, b []byte) []byte {
+ if len(b) != revBytesLen {
+ if lg != nil {
+ lg.Panic(
+ "cannot append tombstone mark to non-normal revision bytes",
+ zap.Int("expected-revision-bytes-size", revBytesLen),
+ zap.Int("given-revision-bytes-size", len(b)),
+ )
+ } else {
+ plog.Panicf("cannot append mark to non normal revision bytes")
+ }
+ }
+ return append(b, markTombstone)
+}
+
+// isTombstone checks whether the revision bytes is a tombstone.
+func isTombstone(b []byte) bool {
+ return len(b) == markedRevBytesLen && b[markBytePosition] == markTombstone
+}
diff --git a/vendor/go.etcd.io/etcd/mvcc/kvstore_compaction.go b/vendor/go.etcd.io/etcd/mvcc/kvstore_compaction.go
new file mode 100644
index 0000000..2adb498
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/kvstore_compaction.go
@@ -0,0 +1,79 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mvcc
+
+import (
+ "encoding/binary"
+ "time"
+
+ "go.uber.org/zap"
+)
+
+func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struct{}) bool {
+ totalStart := time.Now()
+ defer func() { dbCompactionTotalMs.Observe(float64(time.Since(totalStart) / time.Millisecond)) }()
+ keyCompactions := 0
+ defer func() { dbCompactionKeysCounter.Add(float64(keyCompactions)) }()
+
+ end := make([]byte, 8)
+ binary.BigEndian.PutUint64(end, uint64(compactMainRev+1))
+
+ last := make([]byte, 8+1+8)
+ for {
+ var rev revision
+
+ start := time.Now()
+
+ tx := s.b.BatchTx()
+ tx.Lock()
+ keys, _ := tx.UnsafeRange(keyBucketName, last, end, int64(s.cfg.CompactionBatchLimit))
+ for _, key := range keys {
+ rev = bytesToRev(key)
+ if _, ok := keep[rev]; !ok {
+ tx.UnsafeDelete(keyBucketName, key)
+ }
+ }
+
+ if len(keys) < s.cfg.CompactionBatchLimit {
+ rbytes := make([]byte, 8+1+8)
+ revToBytes(revision{main: compactMainRev}, rbytes)
+ tx.UnsafePut(metaBucketName, finishedCompactKeyName, rbytes)
+ tx.Unlock()
+ if s.lg != nil {
+ s.lg.Info(
+ "finished scheduled compaction",
+ zap.Int64("compact-revision", compactMainRev),
+ zap.Duration("took", time.Since(totalStart)),
+ )
+ } else {
+ plog.Infof("finished scheduled compaction at %d (took %v)", compactMainRev, time.Since(totalStart))
+ }
+ return true
+ }
+
+ // update last
+ revToBytes(revision{main: rev.main, sub: rev.sub + 1}, last)
+ tx.Unlock()
+ // Immediately commit the compaction deletes instead of letting them accumulate in the write buffer
+ s.b.ForceCommit()
+ dbCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))
+
+ select {
+ case <-time.After(10 * time.Millisecond):
+ case <-s.stopc:
+ return false
+ }
+ }
+}
diff --git a/vendor/go.etcd.io/etcd/mvcc/kvstore_txn.go b/vendor/go.etcd.io/etcd/mvcc/kvstore_txn.go
new file mode 100644
index 0000000..9698254
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/kvstore_txn.go
@@ -0,0 +1,313 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mvcc
+
+import (
+ "go.etcd.io/etcd/lease"
+ "go.etcd.io/etcd/mvcc/backend"
+ "go.etcd.io/etcd/mvcc/mvccpb"
+ "go.uber.org/zap"
+)
+
+type storeTxnRead struct {
+ s *store
+ tx backend.ReadTx
+
+ firstRev int64
+ rev int64
+}
+
+func (s *store) Read() TxnRead {
+ s.mu.RLock()
+ s.revMu.RLock()
+ // backend holds b.readTx.RLock() only when creating the concurrentReadTx. After
+ // ConcurrentReadTx is created, it will not block write transaction.
+ tx := s.b.ConcurrentReadTx()
+ tx.RLock() // RLock is no-op. concurrentReadTx does not need to be locked after it is created.
+ firstRev, rev := s.compactMainRev, s.currentRev
+ s.revMu.RUnlock()
+ return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev})
+}
+
+func (tr *storeTxnRead) FirstRev() int64 { return tr.firstRev }
+func (tr *storeTxnRead) Rev() int64 { return tr.rev }
+
+func (tr *storeTxnRead) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
+ return tr.rangeKeys(key, end, tr.Rev(), ro)
+}
+
+func (tr *storeTxnRead) End() {
+ tr.tx.RUnlock() // RUnlock signals the end of concurrentReadTx.
+ tr.s.mu.RUnlock()
+}
+
+type storeTxnWrite struct {
+ storeTxnRead
+ tx backend.BatchTx
+ // beginRev is the revision where the txn begins; it will write to the next revision.
+ beginRev int64
+ changes []mvccpb.KeyValue
+}
+
+func (s *store) Write() TxnWrite {
+ s.mu.RLock()
+ tx := s.b.BatchTx()
+ tx.Lock()
+ tw := &storeTxnWrite{
+ storeTxnRead: storeTxnRead{s, tx, 0, 0},
+ tx: tx,
+ beginRev: s.currentRev,
+ changes: make([]mvccpb.KeyValue, 0, 4),
+ }
+ return newMetricsTxnWrite(tw)
+}
+
+func (tw *storeTxnWrite) Rev() int64 { return tw.beginRev }
+
+func (tw *storeTxnWrite) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
+ rev := tw.beginRev
+ if len(tw.changes) > 0 {
+ rev++
+ }
+ return tw.rangeKeys(key, end, rev, ro)
+}
+
+func (tw *storeTxnWrite) DeleteRange(key, end []byte) (int64, int64) {
+ if n := tw.deleteRange(key, end); n != 0 || len(tw.changes) > 0 {
+ return n, tw.beginRev + 1
+ }
+ return 0, tw.beginRev
+}
+
+func (tw *storeTxnWrite) Put(key, value []byte, lease lease.LeaseID) int64 {
+ tw.put(key, value, lease)
+ return tw.beginRev + 1
+}
+
+func (tw *storeTxnWrite) End() {
+ // only update index if the txn modifies the mvcc state.
+ if len(tw.changes) != 0 {
+ tw.s.saveIndex(tw.tx)
+ // hold revMu lock to prevent new read txns from opening until writeback.
+ tw.s.revMu.Lock()
+ tw.s.currentRev++
+ }
+ tw.tx.Unlock()
+ if len(tw.changes) != 0 {
+ tw.s.revMu.Unlock()
+ }
+ tw.s.mu.RUnlock()
+}
+
+func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
+ rev := ro.Rev
+ if rev > curRev {
+ return &RangeResult{KVs: nil, Count: -1, Rev: curRev}, ErrFutureRev
+ }
+ if rev <= 0 {
+ rev = curRev
+ }
+ if rev < tr.s.compactMainRev {
+ return &RangeResult{KVs: nil, Count: -1, Rev: 0}, ErrCompacted
+ }
+
+ revpairs := tr.s.kvindex.Revisions(key, end, rev)
+ if len(revpairs) == 0 {
+ return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil
+ }
+ if ro.Count {
+ return &RangeResult{KVs: nil, Count: len(revpairs), Rev: curRev}, nil
+ }
+
+ limit := int(ro.Limit)
+ if limit <= 0 || limit > len(revpairs) {
+ limit = len(revpairs)
+ }
+
+ kvs := make([]mvccpb.KeyValue, limit)
+ revBytes := newRevBytes()
+ for i, revpair := range revpairs[:len(kvs)] {
+ revToBytes(revpair, revBytes)
+ _, vs := tr.tx.UnsafeRange(keyBucketName, revBytes, nil, 0)
+ if len(vs) != 1 {
+ if tr.s.lg != nil {
+ tr.s.lg.Fatal(
+ "range failed to find revision pair",
+ zap.Int64("revision-main", revpair.main),
+ zap.Int64("revision-sub", revpair.sub),
+ )
+ } else {
+ plog.Fatalf("range cannot find rev (%d,%d)", revpair.main, revpair.sub)
+ }
+ }
+ if err := kvs[i].Unmarshal(vs[0]); err != nil {
+ if tr.s.lg != nil {
+ tr.s.lg.Fatal(
+ "failed to unmarshal mvccpb.KeyValue",
+ zap.Error(err),
+ )
+ } else {
+ plog.Fatalf("cannot unmarshal event: %v", err)
+ }
+ }
+ }
+ return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil
+}
+
+func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
+ rev := tw.beginRev + 1
+ c := rev
+ oldLease := lease.NoLease
+
+ // if the key exists before, use its previous created and
+ // get its previous leaseID
+ _, created, ver, err := tw.s.kvindex.Get(key, rev)
+ if err == nil {
+ c = created.main
+ oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)})
+ }
+
+ ibytes := newRevBytes()
+ idxRev := revision{main: rev, sub: int64(len(tw.changes))}
+ revToBytes(idxRev, ibytes)
+
+ ver = ver + 1
+ kv := mvccpb.KeyValue{
+ Key: key,
+ Value: value,
+ CreateRevision: c,
+ ModRevision: rev,
+ Version: ver,
+ Lease: int64(leaseID),
+ }
+
+ d, err := kv.Marshal()
+ if err != nil {
+ if tw.storeTxnRead.s.lg != nil {
+ tw.storeTxnRead.s.lg.Fatal(
+ "failed to marshal mvccpb.KeyValue",
+ zap.Error(err),
+ )
+ } else {
+ plog.Fatalf("cannot marshal event: %v", err)
+ }
+ }
+
+ tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
+ tw.s.kvindex.Put(key, idxRev)
+ tw.changes = append(tw.changes, kv)
+
+ if oldLease != lease.NoLease {
+ if tw.s.le == nil {
+ panic("no lessor to detach lease")
+ }
+ err = tw.s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
+ if err != nil {
+ if tw.storeTxnRead.s.lg != nil {
+ tw.storeTxnRead.s.lg.Fatal(
+ "failed to detach old lease from a key",
+ zap.Error(err),
+ )
+ } else {
+ plog.Errorf("unexpected error from lease detach: %v", err)
+ }
+ }
+ }
+ if leaseID != lease.NoLease {
+ if tw.s.le == nil {
+ panic("no lessor to attach lease")
+ }
+ err = tw.s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})
+ if err != nil {
+ panic("unexpected error from lease Attach")
+ }
+ }
+}
+
+func (tw *storeTxnWrite) deleteRange(key, end []byte) int64 {
+ rrev := tw.beginRev
+ if len(tw.changes) > 0 {
+ rrev++
+ }
+ keys, _ := tw.s.kvindex.Range(key, end, rrev)
+ if len(keys) == 0 {
+ return 0
+ }
+ for _, key := range keys {
+ tw.delete(key)
+ }
+ return int64(len(keys))
+}
+
+func (tw *storeTxnWrite) delete(key []byte) {
+ ibytes := newRevBytes()
+ idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}
+ revToBytes(idxRev, ibytes)
+
+ if tw.storeTxnRead.s != nil && tw.storeTxnRead.s.lg != nil {
+ ibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes)
+ } else {
+ // TODO: remove this in v3.5
+ ibytes = appendMarkTombstone(nil, ibytes)
+ }
+
+ kv := mvccpb.KeyValue{Key: key}
+
+ d, err := kv.Marshal()
+ if err != nil {
+ if tw.storeTxnRead.s.lg != nil {
+ tw.storeTxnRead.s.lg.Fatal(
+ "failed to marshal mvccpb.KeyValue",
+ zap.Error(err),
+ )
+ } else {
+ plog.Fatalf("cannot marshal event: %v", err)
+ }
+ }
+
+ tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
+ err = tw.s.kvindex.Tombstone(key, idxRev)
+ if err != nil {
+ if tw.storeTxnRead.s.lg != nil {
+ tw.storeTxnRead.s.lg.Fatal(
+ "failed to tombstone an existing key",
+ zap.String("key", string(key)),
+ zap.Error(err),
+ )
+ } else {
+ plog.Fatalf("cannot tombstone an existing key (%s): %v", string(key), err)
+ }
+ }
+ tw.changes = append(tw.changes, kv)
+
+ item := lease.LeaseItem{Key: string(key)}
+ leaseID := tw.s.le.GetLease(item)
+
+ if leaseID != lease.NoLease {
+ err = tw.s.le.Detach(leaseID, []lease.LeaseItem{item})
+ if err != nil {
+ if tw.storeTxnRead.s.lg != nil {
+ tw.storeTxnRead.s.lg.Fatal(
+ "failed to detach old lease from a key",
+ zap.Error(err),
+ )
+ } else {
+ plog.Errorf("cannot detach %v", err)
+ }
+ }
+ }
+}
+
+func (tw *storeTxnWrite) Changes() []mvccpb.KeyValue { return tw.changes }
diff --git a/vendor/go.etcd.io/etcd/mvcc/metrics.go b/vendor/go.etcd.io/etcd/mvcc/metrics.go
new file mode 100644
index 0000000..7526ee4
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/metrics.go
@@ -0,0 +1,336 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mvcc
+
+import (
+ "sync"
+
+ "github.com/prometheus/client_golang/prometheus"
+)
+
+var (
+ rangeCounter = prometheus.NewCounter(
+ prometheus.CounterOpts{
+ Namespace: "etcd",
+ Subsystem: "mvcc",
+ Name: "range_total",
+ Help: "Total number of ranges seen by this member.",
+ })
+ rangeCounterDebug = prometheus.NewCounter(
+ prometheus.CounterOpts{
+ Namespace: "etcd_debugging",
+ Subsystem: "mvcc",
+ Name: "range_total",
+ Help: "Total number of ranges seen by this member.",
+ })
+
+ putCounter = prometheus.NewCounter(
+ prometheus.CounterOpts{
+ Namespace: "etcd",
+ Subsystem: "mvcc",
+ Name: "put_total",
+ Help: "Total number of puts seen by this member.",
+ })
+ // TODO: remove in 3.5 release
+ putCounterDebug = prometheus.NewCounter(
+ prometheus.CounterOpts{
+ Namespace: "etcd_debugging",
+ Subsystem: "mvcc",
+ Name: "put_total",
+ Help: "Total number of puts seen by this member.",
+ })
+
+ deleteCounter = prometheus.NewCounter(
+ prometheus.CounterOpts{
+ Namespace: "etcd",
+ Subsystem: "mvcc",
+ Name: "delete_total",
+ Help: "Total number of deletes seen by this member.",
+ })
+ // TODO: remove in 3.5 release
+ deleteCounterDebug = prometheus.NewCounter(
+ prometheus.CounterOpts{
+ Namespace: "etcd_debugging",
+ Subsystem: "mvcc",
+ Name: "delete_total",
+ Help: "Total number of deletes seen by this member.",
+ })
+
+ txnCounter = prometheus.NewCounter(
+ prometheus.CounterOpts{
+ Namespace: "etcd",
+ Subsystem: "mvcc",
+ Name: "txn_total",
+ Help: "Total number of txns seen by this member.",
+ })
+ txnCounterDebug = prometheus.NewCounter(
+ prometheus.CounterOpts{
+ Namespace: "etcd_debugging",
+ Subsystem: "mvcc",
+ Name: "txn_total",
+ Help: "Total number of txns seen by this member.",
+ })
+
+ keysGauge = prometheus.NewGauge(
+ prometheus.GaugeOpts{
+ Namespace: "etcd_debugging",
+ Subsystem: "mvcc",
+ Name: "keys_total",
+ Help: "Total number of keys.",
+ })
+
+ watchStreamGauge = prometheus.NewGauge(
+ prometheus.GaugeOpts{
+ Namespace: "etcd_debugging",
+ Subsystem: "mvcc",
+ Name: "watch_stream_total",
+ Help: "Total number of watch streams.",
+ })
+
+ watcherGauge = prometheus.NewGauge(
+ prometheus.GaugeOpts{
+ Namespace: "etcd_debugging",
+ Subsystem: "mvcc",
+ Name: "watcher_total",
+ Help: "Total number of watchers.",
+ })
+
+ slowWatcherGauge = prometheus.NewGauge(
+ prometheus.GaugeOpts{
+ Namespace: "etcd_debugging",
+ Subsystem: "mvcc",
+ Name: "slow_watcher_total",
+ Help: "Total number of unsynced slow watchers.",
+ })
+
+ totalEventsCounter = prometheus.NewCounter(
+ prometheus.CounterOpts{
+ Namespace: "etcd_debugging",
+ Subsystem: "mvcc",
+ Name: "events_total",
+ Help: "Total number of events sent by this member.",
+ })
+
+ pendingEventsGauge = prometheus.NewGauge(
+ prometheus.GaugeOpts{
+ Namespace: "etcd_debugging",
+ Subsystem: "mvcc",
+ Name: "pending_events_total",
+ Help: "Total number of pending events to be sent.",
+ })
+
+ indexCompactionPauseMs = prometheus.NewHistogram(
+ prometheus.HistogramOpts{
+ Namespace: "etcd_debugging",
+ Subsystem: "mvcc",
+ Name: "index_compaction_pause_duration_milliseconds",
+ Help: "Bucketed histogram of index compaction pause duration.",
+
+ // lowest bucket start of upper bound 0.5 ms with factor 2
+ // highest bucket start of 0.5 ms * 2^13 == 4.096 sec
+ Buckets: prometheus.ExponentialBuckets(0.5, 2, 14),
+ })
+
+ dbCompactionPauseMs = prometheus.NewHistogram(
+ prometheus.HistogramOpts{
+ Namespace: "etcd_debugging",
+ Subsystem: "mvcc",
+ Name: "db_compaction_pause_duration_milliseconds",
+ Help: "Bucketed histogram of db compaction pause duration.",
+
+ // lowest bucket start of upper bound 1 ms with factor 2
+ // highest bucket start of 1 ms * 2^12 == 4.096 sec
+ Buckets: prometheus.ExponentialBuckets(1, 2, 13),
+ })
+
+ dbCompactionTotalMs = prometheus.NewHistogram(
+ prometheus.HistogramOpts{
+ Namespace: "etcd_debugging",
+ Subsystem: "mvcc",
+ Name: "db_compaction_total_duration_milliseconds",
+ Help: "Bucketed histogram of db compaction total duration.",
+
+ // lowest bucket start of upper bound 100 ms with factor 2
+ // highest bucket start of 100 ms * 2^13 == 8.192 sec
+ Buckets: prometheus.ExponentialBuckets(100, 2, 14),
+ })
+
+ dbCompactionKeysCounter = prometheus.NewCounter(
+ prometheus.CounterOpts{
+ Namespace: "etcd_debugging",
+ Subsystem: "mvcc",
+ Name: "db_compaction_keys_total",
+ Help: "Total number of db keys compacted.",
+ })
+
+ dbTotalSize = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+ Namespace: "etcd",
+ Subsystem: "mvcc",
+ Name: "db_total_size_in_bytes",
+ Help: "Total size of the underlying database physically allocated in bytes.",
+ },
+ func() float64 {
+ reportDbTotalSizeInBytesMu.RLock()
+ defer reportDbTotalSizeInBytesMu.RUnlock()
+ return reportDbTotalSizeInBytes()
+ },
+ )
+ // overridden by mvcc initialization
+ reportDbTotalSizeInBytesMu sync.RWMutex
+ reportDbTotalSizeInBytes = func() float64 { return 0 }
+
+ // TODO: remove this in v3.5
+ dbTotalSizeDebug = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+ Namespace: "etcd_debugging",
+ Subsystem: "mvcc",
+ Name: "db_total_size_in_bytes",
+ Help: "Total size of the underlying database physically allocated in bytes.",
+ },
+ func() float64 {
+ reportDbTotalSizeInBytesDebugMu.RLock()
+ defer reportDbTotalSizeInBytesDebugMu.RUnlock()
+ return reportDbTotalSizeInBytesDebug()
+ },
+ )
+ // overridden by mvcc initialization
+ reportDbTotalSizeInBytesDebugMu sync.RWMutex
+ reportDbTotalSizeInBytesDebug = func() float64 { return 0 }
+
+ dbTotalSizeInUse = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+ Namespace: "etcd",
+ Subsystem: "mvcc",
+ Name: "db_total_size_in_use_in_bytes",
+ Help: "Total size of the underlying database logically in use in bytes.",
+ },
+ func() float64 {
+ reportDbTotalSizeInUseInBytesMu.RLock()
+ defer reportDbTotalSizeInUseInBytesMu.RUnlock()
+ return reportDbTotalSizeInUseInBytes()
+ },
+ )
+ // overridden by mvcc initialization
+ reportDbTotalSizeInUseInBytesMu sync.RWMutex
+ reportDbTotalSizeInUseInBytes = func() float64 { return 0 }
+
+ dbOpenReadTxN = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+ Namespace: "etcd",
+ Subsystem: "mvcc",
+ Name: "db_open_read_transactions",
+ Help: "The number of currently open read transactions",
+ },
+
+ func() float64 {
+ reportDbOpenReadTxNMu.RLock()
+ defer reportDbOpenReadTxNMu.RUnlock()
+ return reportDbOpenReadTxN()
+ },
+ )
+ // overridden by mvcc initialization
+ reportDbOpenReadTxNMu sync.RWMutex
+ reportDbOpenReadTxN = func() float64 { return 0 }
+
+ hashSec = prometheus.NewHistogram(prometheus.HistogramOpts{
+ Namespace: "etcd",
+ Subsystem: "mvcc",
+ Name: "hash_duration_seconds",
+ Help: "The latency distribution of storage hash operation.",
+
+ // 100 MB usually takes 100 ms, so start with 10 MB of 10 ms
+ // lowest bucket start of upper bound 0.01 sec (10 ms) with factor 2
+ // highest bucket start of 0.01 sec * 2^14 == 163.84 sec
+ Buckets: prometheus.ExponentialBuckets(.01, 2, 15),
+ })
+
+ hashRevSec = prometheus.NewHistogram(prometheus.HistogramOpts{
+ Namespace: "etcd",
+ Subsystem: "mvcc",
+ Name: "hash_rev_duration_seconds",
+ Help: "The latency distribution of storage hash by revision operation.",
+
+ // 100 MB usually takes 100 ms, so start with 10 MB of 10 ms
+ // lowest bucket start of upper bound 0.01 sec (10 ms) with factor 2
+ // highest bucket start of 0.01 sec * 2^14 == 163.84 sec
+ Buckets: prometheus.ExponentialBuckets(.01, 2, 15),
+ })
+
+ currentRev = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+ Namespace: "etcd_debugging",
+ Subsystem: "mvcc",
+ Name: "current_revision",
+ Help: "The current revision of store.",
+ },
+ func() float64 {
+ reportCurrentRevMu.RLock()
+ defer reportCurrentRevMu.RUnlock()
+ return reportCurrentRev()
+ },
+ )
+ // overridden by mvcc initialization
+ reportCurrentRevMu sync.RWMutex
+ reportCurrentRev = func() float64 { return 0 }
+
+ compactRev = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+ Namespace: "etcd_debugging",
+ Subsystem: "mvcc",
+ Name: "compact_revision",
+ Help: "The revision of the last compaction in store.",
+ },
+ func() float64 {
+ reportCompactRevMu.RLock()
+ defer reportCompactRevMu.RUnlock()
+ return reportCompactRev()
+ },
+ )
+ // overridden by mvcc initialization
+ reportCompactRevMu sync.RWMutex
+ reportCompactRev = func() float64 { return 0 }
+)
+
+func init() {
+ prometheus.MustRegister(rangeCounter)
+ prometheus.MustRegister(rangeCounterDebug)
+ prometheus.MustRegister(putCounter)
+ prometheus.MustRegister(putCounterDebug)
+ prometheus.MustRegister(deleteCounter)
+ prometheus.MustRegister(deleteCounterDebug)
+ prometheus.MustRegister(txnCounter)
+ prometheus.MustRegister(txnCounterDebug)
+ prometheus.MustRegister(keysGauge)
+ prometheus.MustRegister(watchStreamGauge)
+ prometheus.MustRegister(watcherGauge)
+ prometheus.MustRegister(slowWatcherGauge)
+ prometheus.MustRegister(totalEventsCounter)
+ prometheus.MustRegister(pendingEventsGauge)
+ prometheus.MustRegister(indexCompactionPauseMs)
+ prometheus.MustRegister(dbCompactionPauseMs)
+ prometheus.MustRegister(dbCompactionTotalMs)
+ prometheus.MustRegister(dbCompactionKeysCounter)
+ prometheus.MustRegister(dbTotalSize)
+ prometheus.MustRegister(dbTotalSizeDebug)
+ prometheus.MustRegister(dbTotalSizeInUse)
+ prometheus.MustRegister(dbOpenReadTxN)
+ prometheus.MustRegister(hashSec)
+ prometheus.MustRegister(hashRevSec)
+ prometheus.MustRegister(currentRev)
+ prometheus.MustRegister(compactRev)
+}
+
+// ReportEventReceived reports that an event is received.
+// This function should be called when the external systems received an
+// event from mvcc.Watcher.
+func ReportEventReceived(n int) {
+ pendingEventsGauge.Sub(float64(n))
+ totalEventsCounter.Add(float64(n))
+}
diff --git a/vendor/go.etcd.io/etcd/mvcc/metrics_txn.go b/vendor/go.etcd.io/etcd/mvcc/metrics_txn.go
new file mode 100644
index 0000000..64b629c
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/metrics_txn.go
@@ -0,0 +1,67 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mvcc
+
+import "go.etcd.io/etcd/lease"
+
+type metricsTxnWrite struct {
+ TxnWrite
+ ranges uint
+ puts uint
+ deletes uint
+}
+
+func newMetricsTxnRead(tr TxnRead) TxnRead {
+ return &metricsTxnWrite{&txnReadWrite{tr}, 0, 0, 0}
+}
+
+func newMetricsTxnWrite(tw TxnWrite) TxnWrite {
+ return &metricsTxnWrite{tw, 0, 0, 0}
+}
+
+func (tw *metricsTxnWrite) Range(key, end []byte, ro RangeOptions) (*RangeResult, error) {
+ tw.ranges++
+ return tw.TxnWrite.Range(key, end, ro)
+}
+
+func (tw *metricsTxnWrite) DeleteRange(key, end []byte) (n, rev int64) {
+ tw.deletes++
+ return tw.TxnWrite.DeleteRange(key, end)
+}
+
+func (tw *metricsTxnWrite) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
+ tw.puts++
+ return tw.TxnWrite.Put(key, value, lease)
+}
+
+func (tw *metricsTxnWrite) End() {
+ defer tw.TxnWrite.End()
+ if sum := tw.ranges + tw.puts + tw.deletes; sum > 1 {
+ txnCounter.Inc()
+ txnCounterDebug.Inc() // TODO: remove in 3.5 release
+ }
+
+ ranges := float64(tw.ranges)
+ rangeCounter.Add(ranges)
+ rangeCounterDebug.Add(ranges) // TODO: remove in 3.5 release
+
+ puts := float64(tw.puts)
+ putCounter.Add(puts)
+ putCounterDebug.Add(puts) // TODO: remove in 3.5 release
+
+ deletes := float64(tw.deletes)
+ deleteCounter.Add(deletes)
+ deleteCounterDebug.Add(deletes) // TODO: remove in 3.5 release
+}
diff --git a/vendor/go.etcd.io/etcd/mvcc/revision.go b/vendor/go.etcd.io/etcd/mvcc/revision.go
new file mode 100644
index 0000000..d621386
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/revision.go
@@ -0,0 +1,67 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mvcc
+
+import "encoding/binary"
+
+// revBytesLen is the byte length of a normal revision.
+// First 8 bytes is the revision.main in big-endian format. The 9th byte
+// is a '_'. The last 8 bytes is the revision.sub in big-endian format.
+const revBytesLen = 8 + 1 + 8
+
+// A revision indicates modification of the key-value space.
+// The set of changes that share same main revision changes the key-value space atomically.
+type revision struct {
+ // main is the main revision of a set of changes that happen atomically.
+ main int64
+
+ // sub is the sub revision of a change in a set of changes that happen
+ // atomically. Each change has different increasing sub revision in that
+ // set.
+ sub int64
+}
+
+func (a revision) GreaterThan(b revision) bool {
+ if a.main > b.main {
+ return true
+ }
+ if a.main < b.main {
+ return false
+ }
+ return a.sub > b.sub
+}
+
+func newRevBytes() []byte {
+ return make([]byte, revBytesLen, markedRevBytesLen)
+}
+
+func revToBytes(rev revision, bytes []byte) {
+ binary.BigEndian.PutUint64(bytes, uint64(rev.main))
+ bytes[8] = '_'
+ binary.BigEndian.PutUint64(bytes[9:], uint64(rev.sub))
+}
+
+func bytesToRev(bytes []byte) revision {
+ return revision{
+ main: int64(binary.BigEndian.Uint64(bytes[0:8])),
+ sub: int64(binary.BigEndian.Uint64(bytes[9:])),
+ }
+}
+
+type revisions []revision
+
+func (a revisions) Len() int { return len(a) }
+func (a revisions) Less(i, j int) bool { return a[j].GreaterThan(a[i]) }
+func (a revisions) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
diff --git a/vendor/go.etcd.io/etcd/mvcc/util.go b/vendor/go.etcd.io/etcd/mvcc/util.go
new file mode 100644
index 0000000..032621a
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/util.go
@@ -0,0 +1,57 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mvcc
+
+import (
+ "encoding/binary"
+ "fmt"
+
+ "go.etcd.io/etcd/mvcc/backend"
+ "go.etcd.io/etcd/mvcc/mvccpb"
+)
+
+func UpdateConsistentIndex(be backend.Backend, index uint64) {
+ tx := be.BatchTx()
+ tx.Lock()
+ defer tx.Unlock()
+
+ var oldi uint64
+ _, vs := tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0)
+ if len(vs) != 0 {
+ oldi = binary.BigEndian.Uint64(vs[0])
+ }
+
+ if index <= oldi {
+ return
+ }
+
+ bs := make([]byte, 8)
+ binary.BigEndian.PutUint64(bs, index)
+ tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
+}
+
+func WriteKV(be backend.Backend, kv mvccpb.KeyValue) {
+ ibytes := newRevBytes()
+ revToBytes(revision{main: kv.ModRevision}, ibytes)
+
+ d, err := kv.Marshal()
+ if err != nil {
+ panic(fmt.Errorf("cannot marshal event: %v", err))
+ }
+
+ be.BatchTx().Lock()
+ be.BatchTx().UnsafePut(keyBucketName, ibytes, d)
+ be.BatchTx().Unlock()
+}
diff --git a/vendor/go.etcd.io/etcd/mvcc/watchable_store.go b/vendor/go.etcd.io/etcd/mvcc/watchable_store.go
new file mode 100644
index 0000000..3cf491d
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/watchable_store.go
@@ -0,0 +1,552 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mvcc
+
+import (
+ "sync"
+ "time"
+
+ "go.etcd.io/etcd/lease"
+ "go.etcd.io/etcd/mvcc/backend"
+ "go.etcd.io/etcd/mvcc/mvccpb"
+ "go.uber.org/zap"
+)
+
+// non-const so modifiable by tests
+var (
+ // chanBufLen is the length of the buffered chan
+ // for sending out watched events.
+ // TODO: find a good buf value. 1024 is just a random one that
+ // seems to be reasonable.
+ chanBufLen = 1024
+
+ // maxWatchersPerSync is the number of watchers to sync in a single batch
+ maxWatchersPerSync = 512
+)
+
+type watchable interface {
+ watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc)
+ progress(w *watcher)
+ rev() int64
+}
+
+type watchableStore struct {
+ *store
+
+ // mu protects watcher groups and batches. It should never be locked
+ // before locking store.mu to avoid deadlock.
+ mu sync.RWMutex
+
+ // victims are watcher batches that were blocked on the watch channel
+ victims []watcherBatch
+ victimc chan struct{}
+
+ // contains all unsynced watchers that needs to sync with events that have happened
+ unsynced watcherGroup
+
+ // contains all synced watchers that are in sync with the progress of the store.
+ // The key of the map is the key that the watcher watches on.
+ synced watcherGroup
+
+ stopc chan struct{}
+ wg sync.WaitGroup
+}
+
+// cancelFunc updates unsynced and synced maps when running
+// cancel operations.
+type cancelFunc func()
+
+func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) ConsistentWatchableKV {
+ return newWatchableStore(lg, b, le, ig, cfg)
+}
+
+func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) *watchableStore {
+ s := &watchableStore{
+ store: NewStore(lg, b, le, ig, cfg),
+ victimc: make(chan struct{}, 1),
+ unsynced: newWatcherGroup(),
+ synced: newWatcherGroup(),
+ stopc: make(chan struct{}),
+ }
+ s.store.ReadView = &readView{s}
+ s.store.WriteView = &writeView{s}
+ if s.le != nil {
+ // use this store as the deleter so revokes trigger watch events
+ s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() })
+ }
+ s.wg.Add(2)
+ go s.syncWatchersLoop()
+ go s.syncVictimsLoop()
+ return s
+}
+
+func (s *watchableStore) Close() error {
+ close(s.stopc)
+ s.wg.Wait()
+ return s.store.Close()
+}
+
+func (s *watchableStore) NewWatchStream() WatchStream {
+ watchStreamGauge.Inc()
+ return &watchStream{
+ watchable: s,
+ ch: make(chan WatchResponse, chanBufLen),
+ cancels: make(map[WatchID]cancelFunc),
+ watchers: make(map[WatchID]*watcher),
+ }
+}
+
+func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {
+ wa := &watcher{
+ key: key,
+ end: end,
+ minRev: startRev,
+ id: id,
+ ch: ch,
+ fcs: fcs,
+ }
+
+ s.mu.Lock()
+ s.revMu.RLock()
+ synced := startRev > s.store.currentRev || startRev == 0
+ if synced {
+ wa.minRev = s.store.currentRev + 1
+ if startRev > wa.minRev {
+ wa.minRev = startRev
+ }
+ }
+ if synced {
+ s.synced.add(wa)
+ } else {
+ slowWatcherGauge.Inc()
+ s.unsynced.add(wa)
+ }
+ s.revMu.RUnlock()
+ s.mu.Unlock()
+
+ watcherGauge.Inc()
+
+ return wa, func() { s.cancelWatcher(wa) }
+}
+
+// cancelWatcher removes references of the watcher from the watchableStore
+func (s *watchableStore) cancelWatcher(wa *watcher) {
+ for {
+ s.mu.Lock()
+ if s.unsynced.delete(wa) {
+ slowWatcherGauge.Dec()
+ break
+ } else if s.synced.delete(wa) {
+ break
+ } else if wa.compacted {
+ break
+ } else if wa.ch == nil {
+ // already canceled (e.g., cancel/close race)
+ break
+ }
+
+ if !wa.victim {
+ panic("watcher not victim but not in watch groups")
+ }
+
+ var victimBatch watcherBatch
+ for _, wb := range s.victims {
+ if wb[wa] != nil {
+ victimBatch = wb
+ break
+ }
+ }
+ if victimBatch != nil {
+ slowWatcherGauge.Dec()
+ delete(victimBatch, wa)
+ break
+ }
+
+ // victim being processed so not accessible; retry
+ s.mu.Unlock()
+ time.Sleep(time.Millisecond)
+ }
+
+ watcherGauge.Dec()
+ wa.ch = nil
+ s.mu.Unlock()
+}
+
+func (s *watchableStore) Restore(b backend.Backend) error {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ err := s.store.Restore(b)
+ if err != nil {
+ return err
+ }
+
+ for wa := range s.synced.watchers {
+ wa.restore = true
+ s.unsynced.add(wa)
+ }
+ s.synced = newWatcherGroup()
+ return nil
+}
+
+// syncWatchersLoop syncs the watcher in the unsynced map every 100ms.
+func (s *watchableStore) syncWatchersLoop() {
+ defer s.wg.Done()
+
+ for {
+ s.mu.RLock()
+ st := time.Now()
+ lastUnsyncedWatchers := s.unsynced.size()
+ s.mu.RUnlock()
+
+ unsyncedWatchers := 0
+ if lastUnsyncedWatchers > 0 {
+ unsyncedWatchers = s.syncWatchers()
+ }
+ syncDuration := time.Since(st)
+
+ waitDuration := 100 * time.Millisecond
+ // more work pending?
+ if unsyncedWatchers != 0 && lastUnsyncedWatchers > unsyncedWatchers {
+ // be fair to other store operations by yielding time taken
+ waitDuration = syncDuration
+ }
+
+ select {
+ case <-time.After(waitDuration):
+ case <-s.stopc:
+ return
+ }
+ }
+}
+
+// syncVictimsLoop tries to write precomputed watcher responses to
+// watchers that had a blocked watcher channel
+func (s *watchableStore) syncVictimsLoop() {
+ defer s.wg.Done()
+
+ for {
+ for s.moveVictims() != 0 {
+ // try to update all victim watchers
+ }
+ s.mu.RLock()
+ isEmpty := len(s.victims) == 0
+ s.mu.RUnlock()
+
+ var tickc <-chan time.Time
+ if !isEmpty {
+ tickc = time.After(10 * time.Millisecond)
+ }
+
+ select {
+ case <-tickc:
+ case <-s.victimc:
+ case <-s.stopc:
+ return
+ }
+ }
+}
+
+// moveVictims tries to update watches with already pending event data
+func (s *watchableStore) moveVictims() (moved int) {
+ s.mu.Lock()
+ victims := s.victims
+ s.victims = nil
+ s.mu.Unlock()
+
+ var newVictim watcherBatch
+ for _, wb := range victims {
+ // try to send responses again
+ for w, eb := range wb {
+ // watcher has observed the store up to, but not including, w.minRev
+ rev := w.minRev - 1
+ if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
+ pendingEventsGauge.Add(float64(len(eb.evs)))
+ } else {
+ if newVictim == nil {
+ newVictim = make(watcherBatch)
+ }
+ newVictim[w] = eb
+ continue
+ }
+ moved++
+ }
+
+ // assign completed victim watchers to unsync/sync
+ s.mu.Lock()
+ s.store.revMu.RLock()
+ curRev := s.store.currentRev
+ for w, eb := range wb {
+ if newVictim != nil && newVictim[w] != nil {
+ // couldn't send watch response; stays victim
+ continue
+ }
+ w.victim = false
+ if eb.moreRev != 0 {
+ w.minRev = eb.moreRev
+ }
+ if w.minRev <= curRev {
+ s.unsynced.add(w)
+ } else {
+ slowWatcherGauge.Dec()
+ s.synced.add(w)
+ }
+ }
+ s.store.revMu.RUnlock()
+ s.mu.Unlock()
+ }
+
+ if len(newVictim) > 0 {
+ s.mu.Lock()
+ s.victims = append(s.victims, newVictim)
+ s.mu.Unlock()
+ }
+
+ return moved
+}
+
+// syncWatchers syncs unsynced watchers by:
+// 1. choose a set of watchers from the unsynced watcher group
+// 2. iterate over the set to get the minimum revision and remove compacted watchers
+// 3. use minimum revision to get all key-value pairs and send those events to watchers
+// 4. remove synced watchers in set from unsynced group and move to synced group
+func (s *watchableStore) syncWatchers() int {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ if s.unsynced.size() == 0 {
+ return 0
+ }
+
+ s.store.revMu.RLock()
+ defer s.store.revMu.RUnlock()
+
+ // in order to find key-value pairs from unsynced watchers, we need to
+ // find min revision index, and these revisions can be used to
+ // query the backend store of key-value pairs
+ curRev := s.store.currentRev
+ compactionRev := s.store.compactMainRev
+
+ wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
+ minBytes, maxBytes := newRevBytes(), newRevBytes()
+ revToBytes(revision{main: minRev}, minBytes)
+ revToBytes(revision{main: curRev + 1}, maxBytes)
+
+ // UnsafeRange returns keys and values. And in boltdb, keys are revisions.
+ // values are actual key-value pairs in backend.
+ tx := s.store.b.ReadTx()
+ tx.RLock()
+ revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
+ var evs []mvccpb.Event
+ if s.store != nil && s.store.lg != nil {
+ evs = kvsToEvents(s.store.lg, wg, revs, vs)
+ } else {
+ // TODO: remove this in v3.5
+ evs = kvsToEvents(nil, wg, revs, vs)
+ }
+ tx.RUnlock()
+
+ var victims watcherBatch
+ wb := newWatcherBatch(wg, evs)
+ for w := range wg.watchers {
+ w.minRev = curRev + 1
+
+ eb, ok := wb[w]
+ if !ok {
+ // bring un-notified watcher to synced
+ s.synced.add(w)
+ s.unsynced.delete(w)
+ continue
+ }
+
+ if eb.moreRev != 0 {
+ w.minRev = eb.moreRev
+ }
+
+ if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}) {
+ pendingEventsGauge.Add(float64(len(eb.evs)))
+ } else {
+ if victims == nil {
+ victims = make(watcherBatch)
+ }
+ w.victim = true
+ }
+
+ if w.victim {
+ victims[w] = eb
+ } else {
+ if eb.moreRev != 0 {
+ // stay unsynced; more to read
+ continue
+ }
+ s.synced.add(w)
+ }
+ s.unsynced.delete(w)
+ }
+ s.addVictim(victims)
+
+ vsz := 0
+ for _, v := range s.victims {
+ vsz += len(v)
+ }
+ slowWatcherGauge.Set(float64(s.unsynced.size() + vsz))
+
+ return s.unsynced.size()
+}
+
+// kvsToEvents gets all events for the watchers from all key-value pairs
+func kvsToEvents(lg *zap.Logger, wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event) {
+ for i, v := range vals {
+ var kv mvccpb.KeyValue
+ if err := kv.Unmarshal(v); err != nil {
+ if lg != nil {
+ lg.Panic("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
+ } else {
+ plog.Panicf("cannot unmarshal event: %v", err)
+ }
+ }
+
+ if !wg.contains(string(kv.Key)) {
+ continue
+ }
+
+ ty := mvccpb.PUT
+ if isTombstone(revs[i]) {
+ ty = mvccpb.DELETE
+ // patch in mod revision so watchers won't skip
+ kv.ModRevision = bytesToRev(revs[i]).main
+ }
+ evs = append(evs, mvccpb.Event{Kv: &kv, Type: ty})
+ }
+ return evs
+}
+
+// notify notifies the fact that given event at the given rev just happened to
+// watchers that watch on the key of the event.
+func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
+ var victim watcherBatch
+ for w, eb := range newWatcherBatch(&s.synced, evs) {
+ if eb.revs != 1 {
+ if s.store != nil && s.store.lg != nil {
+ s.store.lg.Panic(
+ "unexpected multiple revisions in watch notification",
+ zap.Int("number-of-revisions", eb.revs),
+ )
+ } else {
+ plog.Panicf("unexpected multiple revisions in notification")
+ }
+ }
+ if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
+ pendingEventsGauge.Add(float64(len(eb.evs)))
+ } else {
+ // move slow watcher to victims
+ w.minRev = rev + 1
+ if victim == nil {
+ victim = make(watcherBatch)
+ }
+ w.victim = true
+ victim[w] = eb
+ s.synced.delete(w)
+ slowWatcherGauge.Inc()
+ }
+ }
+ s.addVictim(victim)
+}
+
+func (s *watchableStore) addVictim(victim watcherBatch) {
+ if victim == nil {
+ return
+ }
+ s.victims = append(s.victims, victim)
+ select {
+ case s.victimc <- struct{}{}:
+ default:
+ }
+}
+
+func (s *watchableStore) rev() int64 { return s.store.Rev() }
+
+func (s *watchableStore) progress(w *watcher) {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+
+ if _, ok := s.synced.watchers[w]; ok {
+ w.send(WatchResponse{WatchID: w.id, Revision: s.rev()})
+ // If the ch is full, this watcher is receiving events.
+ // We do not need to send progress at all.
+ }
+}
+
+type watcher struct {
+ // the watcher key
+ key []byte
+ // end indicates the end of the range to watch.
+ // If end is set, the watcher is on a range.
+ end []byte
+
+ // victim is set when ch is blocked and undergoing victim processing
+ victim bool
+
+ // compacted is set when the watcher is removed because of compaction
+ compacted bool
+
+ // restore is true when the watcher is being restored from leader snapshot
+ // which means that this watcher has just been moved from "synced" to "unsynced"
+ // watcher group, possibly with a future revision when it was first added
+ // to the synced watcher
+ // "unsynced" watcher revision must always be <= current revision,
+ // except when the watcher were to be moved from "synced" watcher group
+ restore bool
+
+ // minRev is the minimum revision update the watcher will accept
+ minRev int64
+ id WatchID
+
+ fcs []FilterFunc
+ // a chan to send out the watch response.
+ // The chan might be shared with other watchers.
+ ch chan<- WatchResponse
+}
+
+func (w *watcher) send(wr WatchResponse) bool {
+ progressEvent := len(wr.Events) == 0
+
+ if len(w.fcs) != 0 {
+ ne := make([]mvccpb.Event, 0, len(wr.Events))
+ for i := range wr.Events {
+ filtered := false
+ for _, filter := range w.fcs {
+ if filter(wr.Events[i]) {
+ filtered = true
+ break
+ }
+ }
+ if !filtered {
+ ne = append(ne, wr.Events[i])
+ }
+ }
+ wr.Events = ne
+ }
+
+ // if all events are filtered out, we should send nothing.
+ if !progressEvent && len(wr.Events) == 0 {
+ return true
+ }
+ select {
+ case w.ch <- wr:
+ return true
+ default:
+ return false
+ }
+}
diff --git a/vendor/go.etcd.io/etcd/mvcc/watchable_store_txn.go b/vendor/go.etcd.io/etcd/mvcc/watchable_store_txn.go
new file mode 100644
index 0000000..3bcfa4d
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/watchable_store_txn.go
@@ -0,0 +1,51 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mvcc
+
+import "go.etcd.io/etcd/mvcc/mvccpb"
+
+func (tw *watchableStoreTxnWrite) End() {
+ changes := tw.Changes()
+ if len(changes) == 0 {
+ tw.TxnWrite.End()
+ return
+ }
+
+ rev := tw.Rev() + 1
+ evs := make([]mvccpb.Event, len(changes))
+ for i, change := range changes {
+ evs[i].Kv = &changes[i]
+ if change.CreateRevision == 0 {
+ evs[i].Type = mvccpb.DELETE
+ evs[i].Kv.ModRevision = rev
+ } else {
+ evs[i].Type = mvccpb.PUT
+ }
+ }
+
+ // end write txn under watchable store lock so the updates are visible
+ // when asynchronous event posting checks the current store revision
+ tw.s.mu.Lock()
+ tw.s.notify(rev, evs)
+ tw.TxnWrite.End()
+ tw.s.mu.Unlock()
+}
+
+type watchableStoreTxnWrite struct {
+ TxnWrite
+ s *watchableStore
+}
+
+func (s *watchableStore) Write() TxnWrite { return &watchableStoreTxnWrite{s.store.Write(), s} }
diff --git a/vendor/go.etcd.io/etcd/mvcc/watcher.go b/vendor/go.etcd.io/etcd/mvcc/watcher.go
new file mode 100644
index 0000000..2846d62
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/watcher.go
@@ -0,0 +1,193 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mvcc
+
+import (
+ "bytes"
+ "errors"
+ "sync"
+
+ "go.etcd.io/etcd/mvcc/mvccpb"
+)
+
+// AutoWatchID is the watcher ID passed in WatchStream.Watch when no
+// user-provided ID is available. If pass, an ID will automatically be assigned.
+const AutoWatchID WatchID = 0
+
+var (
+ ErrWatcherNotExist = errors.New("mvcc: watcher does not exist")
+ ErrEmptyWatcherRange = errors.New("mvcc: watcher range is empty")
+ ErrWatcherDuplicateID = errors.New("mvcc: duplicate watch ID provided on the WatchStream")
+)
+
+type WatchID int64
+
+// FilterFunc returns true if the given event should be filtered out.
+type FilterFunc func(e mvccpb.Event) bool
+
+type WatchStream interface {
+ // Watch creates a watcher. The watcher watches the events happening or
+ // happened on the given key or range [key, end) from the given startRev.
+ //
+ // The whole event history can be watched unless compacted.
+ // If "startRev" <=0, watch observes events after currentRev.
+ //
+ // The returned "id" is the ID of this watcher. It appears as WatchID
+ // in events that are sent to the created watcher through stream channel.
+ // The watch ID is used when it's not equal to AutoWatchID. Otherwise,
+ // an auto-generated watch ID is returned.
+ Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error)
+
+ // Chan returns a chan. All watch response will be sent to the returned chan.
+ Chan() <-chan WatchResponse
+
+ // RequestProgress requests the progress of the watcher with given ID. The response
+ // will only be sent if the watcher is currently synced.
+ // The responses will be sent through the WatchRespone Chan attached
+ // with this stream to ensure correct ordering.
+ // The responses contains no events. The revision in the response is the progress
+ // of the watchers since the watcher is currently synced.
+ RequestProgress(id WatchID)
+
+ // Cancel cancels a watcher by giving its ID. If watcher does not exist, an error will be
+ // returned.
+ Cancel(id WatchID) error
+
+ // Close closes Chan and release all related resources.
+ Close()
+
+ // Rev returns the current revision of the KV the stream watches on.
+ Rev() int64
+}
+
+type WatchResponse struct {
+ // WatchID is the WatchID of the watcher this response sent to.
+ WatchID WatchID
+
+ // Events contains all the events that needs to send.
+ Events []mvccpb.Event
+
+ // Revision is the revision of the KV when the watchResponse is created.
+ // For a normal response, the revision should be the same as the last
+ // modified revision inside Events. For a delayed response to a unsynced
+ // watcher, the revision is greater than the last modified revision
+ // inside Events.
+ Revision int64
+
+ // CompactRevision is set when the watcher is cancelled due to compaction.
+ CompactRevision int64
+}
+
+// watchStream contains a collection of watchers that share
+// one streaming chan to send out watched events and other control events.
+type watchStream struct {
+ watchable watchable
+ ch chan WatchResponse
+
+ mu sync.Mutex // guards fields below it
+ // nextID is the ID pre-allocated for next new watcher in this stream
+ nextID WatchID
+ closed bool
+ cancels map[WatchID]cancelFunc
+ watchers map[WatchID]*watcher
+}
+
+// Watch creates a new watcher in the stream and returns its WatchID.
+func (ws *watchStream) Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error) {
+ // prevent wrong range where key >= end lexicographically
+ // watch request with 'WithFromKey' has empty-byte range end
+ if len(end) != 0 && bytes.Compare(key, end) != -1 {
+ return -1, ErrEmptyWatcherRange
+ }
+
+ ws.mu.Lock()
+ defer ws.mu.Unlock()
+ if ws.closed {
+ return -1, ErrEmptyWatcherRange
+ }
+
+ if id == AutoWatchID {
+ for ws.watchers[ws.nextID] != nil {
+ ws.nextID++
+ }
+ id = ws.nextID
+ ws.nextID++
+ } else if _, ok := ws.watchers[id]; ok {
+ return -1, ErrWatcherDuplicateID
+ }
+
+ w, c := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs...)
+
+ ws.cancels[id] = c
+ ws.watchers[id] = w
+ return id, nil
+}
+
+func (ws *watchStream) Chan() <-chan WatchResponse {
+ return ws.ch
+}
+
+func (ws *watchStream) Cancel(id WatchID) error {
+ ws.mu.Lock()
+ cancel, ok := ws.cancels[id]
+ w := ws.watchers[id]
+ ok = ok && !ws.closed
+ ws.mu.Unlock()
+
+ if !ok {
+ return ErrWatcherNotExist
+ }
+ cancel()
+
+ ws.mu.Lock()
+ // The watch isn't removed until cancel so that if Close() is called,
+ // it will wait for the cancel. Otherwise, Close() could close the
+ // watch channel while the store is still posting events.
+ if ww := ws.watchers[id]; ww == w {
+ delete(ws.cancels, id)
+ delete(ws.watchers, id)
+ }
+ ws.mu.Unlock()
+
+ return nil
+}
+
+func (ws *watchStream) Close() {
+ ws.mu.Lock()
+ defer ws.mu.Unlock()
+
+ for _, cancel := range ws.cancels {
+ cancel()
+ }
+ ws.closed = true
+ close(ws.ch)
+ watchStreamGauge.Dec()
+}
+
+func (ws *watchStream) Rev() int64 {
+ ws.mu.Lock()
+ defer ws.mu.Unlock()
+ return ws.watchable.rev()
+}
+
+func (ws *watchStream) RequestProgress(id WatchID) {
+ ws.mu.Lock()
+ w, ok := ws.watchers[id]
+ ws.mu.Unlock()
+ if !ok {
+ return
+ }
+ ws.watchable.progress(w)
+}
diff --git a/vendor/go.etcd.io/etcd/mvcc/watcher_group.go b/vendor/go.etcd.io/etcd/mvcc/watcher_group.go
new file mode 100644
index 0000000..151f0de
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/watcher_group.go
@@ -0,0 +1,293 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mvcc
+
+import (
+ "fmt"
+ "math"
+
+ "go.etcd.io/etcd/mvcc/mvccpb"
+ "go.etcd.io/etcd/pkg/adt"
+)
+
+var (
+ // watchBatchMaxRevs is the maximum distinct revisions that
+ // may be sent to an unsynced watcher at a time. Declared as
+ // var instead of const for testing purposes.
+ watchBatchMaxRevs = 1000
+)
+
+type eventBatch struct {
+ // evs is a batch of revision-ordered events
+ evs []mvccpb.Event
+ // revs is the minimum unique revisions observed for this batch
+ revs int
+ // moreRev is first revision with more events following this batch
+ moreRev int64
+}
+
+func (eb *eventBatch) add(ev mvccpb.Event) {
+ if eb.revs > watchBatchMaxRevs {
+ // maxed out batch size
+ return
+ }
+
+ if len(eb.evs) == 0 {
+ // base case
+ eb.revs = 1
+ eb.evs = append(eb.evs, ev)
+ return
+ }
+
+ // revision accounting
+ ebRev := eb.evs[len(eb.evs)-1].Kv.ModRevision
+ evRev := ev.Kv.ModRevision
+ if evRev > ebRev {
+ eb.revs++
+ if eb.revs > watchBatchMaxRevs {
+ eb.moreRev = evRev
+ return
+ }
+ }
+
+ eb.evs = append(eb.evs, ev)
+}
+
+type watcherBatch map[*watcher]*eventBatch
+
+func (wb watcherBatch) add(w *watcher, ev mvccpb.Event) {
+ eb := wb[w]
+ if eb == nil {
+ eb = &eventBatch{}
+ wb[w] = eb
+ }
+ eb.add(ev)
+}
+
+// newWatcherBatch maps watchers to their matched events. It enables quick
+// events look up by watcher.
+func newWatcherBatch(wg *watcherGroup, evs []mvccpb.Event) watcherBatch {
+ if len(wg.watchers) == 0 {
+ return nil
+ }
+
+ wb := make(watcherBatch)
+ for _, ev := range evs {
+ for w := range wg.watcherSetByKey(string(ev.Kv.Key)) {
+ if ev.Kv.ModRevision >= w.minRev {
+ // don't double notify
+ wb.add(w, ev)
+ }
+ }
+ }
+ return wb
+}
+
+type watcherSet map[*watcher]struct{}
+
+func (w watcherSet) add(wa *watcher) {
+ if _, ok := w[wa]; ok {
+ panic("add watcher twice!")
+ }
+ w[wa] = struct{}{}
+}
+
+func (w watcherSet) union(ws watcherSet) {
+ for wa := range ws {
+ w.add(wa)
+ }
+}
+
+func (w watcherSet) delete(wa *watcher) {
+ if _, ok := w[wa]; !ok {
+ panic("removing missing watcher!")
+ }
+ delete(w, wa)
+}
+
+type watcherSetByKey map[string]watcherSet
+
+func (w watcherSetByKey) add(wa *watcher) {
+ set := w[string(wa.key)]
+ if set == nil {
+ set = make(watcherSet)
+ w[string(wa.key)] = set
+ }
+ set.add(wa)
+}
+
+func (w watcherSetByKey) delete(wa *watcher) bool {
+ k := string(wa.key)
+ if v, ok := w[k]; ok {
+ if _, ok := v[wa]; ok {
+ delete(v, wa)
+ if len(v) == 0 {
+ // remove the set; nothing left
+ delete(w, k)
+ }
+ return true
+ }
+ }
+ return false
+}
+
+// watcherGroup is a collection of watchers organized by their ranges
+type watcherGroup struct {
+ // keyWatchers has the watchers that watch on a single key
+ keyWatchers watcherSetByKey
+ // ranges has the watchers that watch a range; it is sorted by interval
+ ranges adt.IntervalTree
+ // watchers is the set of all watchers
+ watchers watcherSet
+}
+
+func newWatcherGroup() watcherGroup {
+ return watcherGroup{
+ keyWatchers: make(watcherSetByKey),
+ ranges: adt.NewIntervalTree(),
+ watchers: make(watcherSet),
+ }
+}
+
+// add puts a watcher in the group.
+func (wg *watcherGroup) add(wa *watcher) {
+ wg.watchers.add(wa)
+ if wa.end == nil {
+ wg.keyWatchers.add(wa)
+ return
+ }
+
+ // interval already registered?
+ ivl := adt.NewStringAffineInterval(string(wa.key), string(wa.end))
+ if iv := wg.ranges.Find(ivl); iv != nil {
+ iv.Val.(watcherSet).add(wa)
+ return
+ }
+
+ // not registered, put in interval tree
+ ws := make(watcherSet)
+ ws.add(wa)
+ wg.ranges.Insert(ivl, ws)
+}
+
+// contains is whether the given key has a watcher in the group.
+func (wg *watcherGroup) contains(key string) bool {
+ _, ok := wg.keyWatchers[key]
+ return ok || wg.ranges.Intersects(adt.NewStringAffinePoint(key))
+}
+
+// size gives the number of unique watchers in the group.
+func (wg *watcherGroup) size() int { return len(wg.watchers) }
+
+// delete removes a watcher from the group.
+func (wg *watcherGroup) delete(wa *watcher) bool {
+ if _, ok := wg.watchers[wa]; !ok {
+ return false
+ }
+ wg.watchers.delete(wa)
+ if wa.end == nil {
+ wg.keyWatchers.delete(wa)
+ return true
+ }
+
+ ivl := adt.NewStringAffineInterval(string(wa.key), string(wa.end))
+ iv := wg.ranges.Find(ivl)
+ if iv == nil {
+ return false
+ }
+
+ ws := iv.Val.(watcherSet)
+ delete(ws, wa)
+ if len(ws) == 0 {
+ // remove interval missing watchers
+ if ok := wg.ranges.Delete(ivl); !ok {
+ panic("could not remove watcher from interval tree")
+ }
+ }
+
+ return true
+}
+
+// choose selects watchers from the watcher group to update
+func (wg *watcherGroup) choose(maxWatchers int, curRev, compactRev int64) (*watcherGroup, int64) {
+ if len(wg.watchers) < maxWatchers {
+ return wg, wg.chooseAll(curRev, compactRev)
+ }
+ ret := newWatcherGroup()
+ for w := range wg.watchers {
+ if maxWatchers <= 0 {
+ break
+ }
+ maxWatchers--
+ ret.add(w)
+ }
+ return &ret, ret.chooseAll(curRev, compactRev)
+}
+
+func (wg *watcherGroup) chooseAll(curRev, compactRev int64) int64 {
+ minRev := int64(math.MaxInt64)
+ for w := range wg.watchers {
+ if w.minRev > curRev {
+ // after network partition, possibly choosing future revision watcher from restore operation
+ // with watch key "proxy-namespace__lostleader" and revision "math.MaxInt64 - 2"
+ // do not panic when such watcher had been moved from "synced" watcher during restore operation
+ if !w.restore {
+ panic(fmt.Errorf("watcher minimum revision %d should not exceed current revision %d", w.minRev, curRev))
+ }
+
+ // mark 'restore' done, since it's chosen
+ w.restore = false
+ }
+ if w.minRev < compactRev {
+ select {
+ case w.ch <- WatchResponse{WatchID: w.id, CompactRevision: compactRev}:
+ w.compacted = true
+ wg.delete(w)
+ default:
+ // retry next time
+ }
+ continue
+ }
+ if minRev > w.minRev {
+ minRev = w.minRev
+ }
+ }
+ return minRev
+}
+
+// watcherSetByKey gets the set of watchers that receive events on the given key.
+func (wg *watcherGroup) watcherSetByKey(key string) watcherSet {
+ wkeys := wg.keyWatchers[key]
+ wranges := wg.ranges.Stab(adt.NewStringAffinePoint(key))
+
+ // zero-copy cases
+ switch {
+ case len(wranges) == 0:
+ // no need to merge ranges or copy; reuse single-key set
+ return wkeys
+ case len(wranges) == 0 && len(wkeys) == 0:
+ return nil
+ case len(wranges) == 1 && len(wkeys) == 0:
+ return wranges[0].Val.(watcherSet)
+ }
+
+ // copy case
+ ret := make(watcherSet)
+ ret.union(wg.keyWatchers[key])
+ for _, item := range wranges {
+ ret.union(item.Val.(watcherSet))
+ }
+ return ret
+}