[VOL-2235] Mocks and interfaces for rw-core
This update consists of mocks that are used by the rw-core
during unit testing. It also includes interfaces used for unit
tests.
Change-Id: I20ca1455c358113c3aa897acc6355e0ddbc614b7
diff --git a/vendor/go.etcd.io/etcd/mvcc/backend/read_tx.go b/vendor/go.etcd.io/etcd/mvcc/backend/read_tx.go
new file mode 100644
index 0000000..91fe72e
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/backend/read_tx.go
@@ -0,0 +1,210 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package backend
+
+import (
+ "bytes"
+ "math"
+ "sync"
+
+ bolt "go.etcd.io/bbolt"
+)
+
+// safeRangeBucket is a hack to avoid inadvertently reading duplicate keys;
+// overwrites on a bucket should only fetch with limit=1, but safeRangeBucket
+// is known to never overwrite any key so range is safe.
+var safeRangeBucket = []byte("key")
+
+type ReadTx interface {
+ Lock()
+ Unlock()
+ RLock()
+ RUnlock()
+
+ UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
+ UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error
+}
+
+type readTx struct {
+ // mu protects accesses to the txReadBuffer
+ mu sync.RWMutex
+ buf txReadBuffer
+
+ // TODO: group and encapsulate {txMu, tx, buckets, txWg}, as they share the same lifecycle.
+ // txMu protects accesses to buckets and tx on Range requests.
+ txMu sync.RWMutex
+ tx *bolt.Tx
+ buckets map[string]*bolt.Bucket
+ // txWg protects tx from being rolled back at the end of a batch interval until all reads using this tx are done.
+ txWg *sync.WaitGroup
+}
+
+func (rt *readTx) Lock() { rt.mu.Lock() }
+func (rt *readTx) Unlock() { rt.mu.Unlock() }
+func (rt *readTx) RLock() { rt.mu.RLock() }
+func (rt *readTx) RUnlock() { rt.mu.RUnlock() }
+
+func (rt *readTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
+ if endKey == nil {
+ // forbid duplicates for single keys
+ limit = 1
+ }
+ if limit <= 0 {
+ limit = math.MaxInt64
+ }
+ if limit > 1 && !bytes.Equal(bucketName, safeRangeBucket) {
+ panic("do not use unsafeRange on non-keys bucket")
+ }
+ keys, vals := rt.buf.Range(bucketName, key, endKey, limit)
+ if int64(len(keys)) == limit {
+ return keys, vals
+ }
+
+ // find/cache bucket
+ bn := string(bucketName)
+ rt.txMu.RLock()
+ bucket, ok := rt.buckets[bn]
+ rt.txMu.RUnlock()
+ if !ok {
+ rt.txMu.Lock()
+ bucket = rt.tx.Bucket(bucketName)
+ rt.buckets[bn] = bucket
+ rt.txMu.Unlock()
+ }
+
+ // ignore missing bucket since may have been created in this batch
+ if bucket == nil {
+ return keys, vals
+ }
+ rt.txMu.Lock()
+ c := bucket.Cursor()
+ rt.txMu.Unlock()
+
+ k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
+ return append(k2, keys...), append(v2, vals...)
+}
+
+func (rt *readTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
+ dups := make(map[string]struct{})
+ getDups := func(k, v []byte) error {
+ dups[string(k)] = struct{}{}
+ return nil
+ }
+ visitNoDup := func(k, v []byte) error {
+ if _, ok := dups[string(k)]; ok {
+ return nil
+ }
+ return visitor(k, v)
+ }
+ if err := rt.buf.ForEach(bucketName, getDups); err != nil {
+ return err
+ }
+ rt.txMu.Lock()
+ err := unsafeForEach(rt.tx, bucketName, visitNoDup)
+ rt.txMu.Unlock()
+ if err != nil {
+ return err
+ }
+ return rt.buf.ForEach(bucketName, visitor)
+}
+
+func (rt *readTx) reset() {
+ rt.buf.reset()
+ rt.buckets = make(map[string]*bolt.Bucket)
+ rt.tx = nil
+ rt.txWg = new(sync.WaitGroup)
+}
+
+// TODO: create a base type for readTx and concurrentReadTx to avoid duplicated function implementation?
+type concurrentReadTx struct {
+ buf txReadBuffer
+ txMu *sync.RWMutex
+ tx *bolt.Tx
+ buckets map[string]*bolt.Bucket
+ txWg *sync.WaitGroup
+}
+
+func (rt *concurrentReadTx) Lock() {}
+func (rt *concurrentReadTx) Unlock() {}
+
+// RLock is no-op. concurrentReadTx does not need to be locked after it is created.
+func (rt *concurrentReadTx) RLock() {}
+
+// RUnlock signals the end of concurrentReadTx.
+func (rt *concurrentReadTx) RUnlock() { rt.txWg.Done() }
+
+func (rt *concurrentReadTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
+ dups := make(map[string]struct{})
+ getDups := func(k, v []byte) error {
+ dups[string(k)] = struct{}{}
+ return nil
+ }
+ visitNoDup := func(k, v []byte) error {
+ if _, ok := dups[string(k)]; ok {
+ return nil
+ }
+ return visitor(k, v)
+ }
+ if err := rt.buf.ForEach(bucketName, getDups); err != nil {
+ return err
+ }
+ rt.txMu.Lock()
+ err := unsafeForEach(rt.tx, bucketName, visitNoDup)
+ rt.txMu.Unlock()
+ if err != nil {
+ return err
+ }
+ return rt.buf.ForEach(bucketName, visitor)
+}
+
+func (rt *concurrentReadTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
+ if endKey == nil {
+ // forbid duplicates for single keys
+ limit = 1
+ }
+ if limit <= 0 {
+ limit = math.MaxInt64
+ }
+ if limit > 1 && !bytes.Equal(bucketName, safeRangeBucket) {
+ panic("do not use unsafeRange on non-keys bucket")
+ }
+ keys, vals := rt.buf.Range(bucketName, key, endKey, limit)
+ if int64(len(keys)) == limit {
+ return keys, vals
+ }
+
+ // find/cache bucket
+ bn := string(bucketName)
+ rt.txMu.RLock()
+ bucket, ok := rt.buckets[bn]
+ rt.txMu.RUnlock()
+ if !ok {
+ rt.txMu.Lock()
+ bucket = rt.tx.Bucket(bucketName)
+ rt.buckets[bn] = bucket
+ rt.txMu.Unlock()
+ }
+
+ // ignore missing bucket since may have been created in this batch
+ if bucket == nil {
+ return keys, vals
+ }
+ rt.txMu.Lock()
+ c := bucket.Cursor()
+ rt.txMu.Unlock()
+
+ k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
+ return append(k2, keys...), append(v2, vals...)
+}