[VOL-2235] Mocks and interfaces for rw-core
This update consists of mocks that are used by the rw-core
during unit testing. It also includes interfaces used for unit
tests.
Change-Id: I20ca1455c358113c3aa897acc6355e0ddbc614b7
diff --git a/vendor/go.etcd.io/etcd/mvcc/watcher_group.go b/vendor/go.etcd.io/etcd/mvcc/watcher_group.go
new file mode 100644
index 0000000..151f0de
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/mvcc/watcher_group.go
@@ -0,0 +1,293 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mvcc
+
+import (
+ "fmt"
+ "math"
+
+ "go.etcd.io/etcd/mvcc/mvccpb"
+ "go.etcd.io/etcd/pkg/adt"
+)
+
+var (
+ // watchBatchMaxRevs is the maximum distinct revisions that
+ // may be sent to an unsynced watcher at a time. Declared as
+ // var instead of const for testing purposes.
+ watchBatchMaxRevs = 1000
+)
+
+// eventBatch accumulates the events destined for a single watcher, bounded
+// by watchBatchMaxRevs distinct revisions.
+type eventBatch struct {
+	// evs holds the batched events; add assumes they arrive ordered by
+	// revision.
+	evs []mvccpb.Event
+	// revs counts the distinct revisions seen so far; add bumps it each time
+	// an event carries a higher ModRevision than the last stored event.
+	revs int
+	// moreRev is the revision of the first event that add turned away
+	// because the batch was full. It stays zero until that happens.
+	moreRev int64
+}
+
+// add appends ev to the batch unless the batch has already exceeded
+// watchBatchMaxRevs distinct revisions. When ev is the event that pushes the
+// batch over the limit, it is not stored; its revision is recorded in
+// moreRev instead.
+func (eb *eventBatch) add(ev mvccpb.Event) {
+	switch {
+	case eb.revs > watchBatchMaxRevs:
+		// The batch is full; silently drop everything that follows.
+		return
+	case len(eb.evs) == 0:
+		// The first event always starts the batch at one revision.
+		eb.revs = 1
+		eb.evs = append(eb.evs, ev)
+		return
+	}
+
+	// Count a new revision only when the incoming event is newer than the
+	// most recently stored one.
+	lastRev := eb.evs[len(eb.evs)-1].Kv.ModRevision
+	if newRev := ev.Kv.ModRevision; newRev > lastRev {
+		eb.revs++
+		if eb.revs > watchBatchMaxRevs {
+			// Remember where the remaining events begin.
+			eb.moreRev = newRev
+			return
+		}
+	}
+
+	eb.evs = append(eb.evs, ev)
+}
+
+// watcherBatch associates each watcher with the batch of events queued for it.
+type watcherBatch map[*watcher]*eventBatch
+
+// add queues ev for watcher w, creating the watcher's eventBatch first if
+// it has none yet.
+func (wb watcherBatch) add(w *watcher, ev mvccpb.Event) {
+	if wb[w] == nil {
+		wb[w] = &eventBatch{}
+	}
+	wb[w].add(ev)
+}
+
+// newWatcherBatch builds a watcherBatch that maps every watcher in wg to the
+// events in evs that match it, which gives fast per-watcher event lookup.
+// It returns nil when the group has no watchers at all.
+func newWatcherBatch(wg *watcherGroup, evs []mvccpb.Event) watcherBatch {
+	if len(wg.watchers) == 0 {
+		return nil
+	}
+
+	batches := make(watcherBatch)
+	for _, ev := range evs {
+		matched := wg.watcherSetByKey(string(ev.Kv.Key))
+		for w := range matched {
+			if ev.Kv.ModRevision < w.minRev {
+				// The event is older than the watcher's minimum revision;
+				// skip it so the watcher is not notified twice.
+				continue
+			}
+			batches.add(w, ev)
+		}
+	}
+	return batches
+}
+
+// watcherSet is a set of watchers keyed by pointer identity.
+type watcherSet map[*watcher]struct{}
+
+// add inserts wa into the set. It panics if wa is already a member.
+func (w watcherSet) add(wa *watcher) {
+	_, dup := w[wa]
+	if dup {
+		panic("add watcher twice!")
+	}
+	w[wa] = struct{}{}
+}
+
+// union inserts every member of ws into w. Because it relies on add, it
+// panics if the two sets share a watcher.
+func (w watcherSet) union(ws watcherSet) {
+	for member := range ws {
+		w.add(member)
+	}
+}
+
+// delete removes wa from the set. It panics if wa is not a member.
+func (w watcherSet) delete(wa *watcher) {
+	if _, present := w[wa]; present {
+		delete(w, wa)
+		return
+	}
+	panic("removing missing watcher!")
+}
+
+// watcherSetByKey indexes watcher sets by the exact key they watch.
+type watcherSetByKey map[string]watcherSet
+
+// add registers wa under its key, creating the per-key set on first use.
+// It panics (via watcherSet.add) if wa is already registered under that key.
+func (w watcherSetByKey) add(wa *watcher) {
+	key := string(wa.key)
+	bucket := w[key]
+	if bucket == nil {
+		bucket = make(watcherSet)
+		w[key] = bucket
+	}
+	bucket.add(wa)
+}
+
+// delete unregisters wa from its key and reports whether it was present.
+// A per-key set that becomes empty is removed from the map.
+func (w watcherSetByKey) delete(wa *watcher) bool {
+	key := string(wa.key)
+
+	bucket, found := w[key]
+	if !found {
+		return false
+	}
+	if _, member := bucket[wa]; !member {
+		return false
+	}
+
+	delete(bucket, wa)
+	if len(bucket) == 0 {
+		// Nothing is left watching this key; drop the empty set.
+		delete(w, key)
+	}
+	return true
+}
+
+// watcherGroup holds a collection of watchers, indexed both by exact key and
+// by key range.
+type watcherGroup struct {
+	// keyWatchers indexes the watchers that watch exactly one key (those
+	// whose end is nil).
+	keyWatchers watcherSetByKey
+	// ranges indexes the watchers that watch a key range; each interval in
+	// the tree carries the watcherSet registered on that interval.
+	ranges adt.IntervalTree
+	// watchers contains every watcher in the group, single-key and range
+	// alike.
+	watchers watcherSet
+}
+
+func newWatcherGroup() watcherGroup {
+ return watcherGroup{
+ keyWatchers: make(watcherSetByKey),
+ ranges: adt.NewIntervalTree(),
+ watchers: make(watcherSet),
+ }
+}
+
+// add registers wa with the group. A watcher whose end is nil is indexed by
+// its single key; any other watcher is indexed by the interval [key, end).
+// It panics (via watcherSet.add) if wa is already in the group.
+func (wg *watcherGroup) add(wa *watcher) {
+	wg.watchers.add(wa)
+
+	if wa.end != nil {
+		ivl := adt.NewStringAffineInterval(string(wa.key), string(wa.end))
+		existing := wg.ranges.Find(ivl)
+		if existing == nil {
+			// First watcher on this interval: create its set and put it
+			// into the interval tree.
+			fresh := make(watcherSet)
+			fresh.add(wa)
+			wg.ranges.Insert(ivl, fresh)
+			return
+		}
+		// The interval is already registered; join its existing set.
+		existing.Val.(watcherSet).add(wa)
+		return
+	}
+
+	wg.keyWatchers.add(wa)
+}
+
+// contains reports whether any watcher in the group watches key, either as
+// an exact single-key watcher or through a range that intersects key.
+func (wg *watcherGroup) contains(key string) bool {
+	if _, exact := wg.keyWatchers[key]; exact {
+		return true
+	}
+	return wg.ranges.Intersects(adt.NewStringAffinePoint(key))
+}
+
+// size reports how many unique watchers are registered in the group.
+func (wg *watcherGroup) size() int {
+	return len(wg.watchers)
+}
+
+// delete unregisters wa from the group. It returns false when wa is not in
+// the group, or when wa is a range watcher whose interval cannot be found in
+// the interval tree; otherwise it returns true. It panics if an interval
+// left without watchers cannot be removed from the tree.
+func (wg *watcherGroup) delete(wa *watcher) bool {
+	if _, known := wg.watchers[wa]; !known {
+		return false
+	}
+	wg.watchers.delete(wa)
+
+	// Single-key watcher: only the per-key index needs updating.
+	if wa.end == nil {
+		wg.keyWatchers.delete(wa)
+		return true
+	}
+
+	// Range watcher: locate the set stored on its interval.
+	ivl := adt.NewStringAffineInterval(string(wa.key), string(wa.end))
+	node := wg.ranges.Find(ivl)
+	if node == nil {
+		return false
+	}
+
+	members := node.Val.(watcherSet)
+	delete(members, wa)
+	if len(members) > 0 {
+		return true
+	}
+
+	// No watcher is left on this interval, so drop it from the tree.
+	if !wg.ranges.Delete(ivl) {
+		panic("could not remove watcher from interval tree")
+	}
+	return true
+}
+
+// choose selects watchers from the group to update. When the group holds
+// fewer than maxWatchers watchers, the group itself is returned; otherwise a
+// new group containing at most maxWatchers of them (in map iteration order)
+// is built. In both cases chooseAll is run on the returned group and its
+// result is the second return value.
+func (wg *watcherGroup) choose(maxWatchers int, curRev, compactRev int64) (*watcherGroup, int64) {
+	if len(wg.watchers) < maxWatchers {
+		return wg, wg.chooseAll(curRev, compactRev)
+	}
+
+	picked := newWatcherGroup()
+	remaining := maxWatchers
+	for w := range wg.watchers {
+		if remaining <= 0 {
+			break
+		}
+		remaining--
+		picked.add(w)
+	}
+
+	minRev := picked.chooseAll(curRev, compactRev)
+	return &picked, minRev
+}
+
+// chooseAll walks every watcher in the group and returns the smallest minRev
+// among the watchers that are not behind compactRev (math.MaxInt64 if there
+// are none). A watcher whose minRev is below compactRev is sent a compaction
+// response without blocking; if the send succeeds, the watcher is marked
+// compacted and removed from the group, otherwise it is left for a later
+// call. It panics when a watcher's minRev exceeds curRev and the watcher is
+// not flagged as restored.
+func (wg *watcherGroup) chooseAll(curRev, compactRev int64) int64 {
+	lowest := int64(math.MaxInt64)
+	for w := range wg.watchers {
+		if w.minRev > curRev {
+			// After a network partition, a restore operation may hand us a
+			// watcher on a future revision (e.g. watch key
+			// "proxy-namespace__lostleader" at revision "math.MaxInt64 - 2").
+			// Such a watcher was moved out of the "synced" group during the
+			// restore, so it must not trigger a panic.
+			if !w.restore {
+				panic(fmt.Errorf("watcher minimum revision %d should not exceed current revision %d", w.minRev, curRev))
+			}
+
+			// The watcher has now been chosen, so the restore is done.
+			w.restore = false
+		}
+
+		if w.minRev >= compactRev {
+			// Not compacted: this watcher takes part in the minimum.
+			if w.minRev < lowest {
+				lowest = w.minRev
+			}
+			continue
+		}
+
+		// The watcher is behind the compaction point; notify it without
+		// blocking.
+		select {
+		case w.ch <- WatchResponse{WatchID: w.id, CompactRevision: compactRev}:
+			w.compacted = true
+			wg.delete(w)
+		default:
+			// The channel is not ready; retry next time.
+		}
+	}
+	return lowest
+}
+
+// watcherSetByKey returns the set of watchers that receive events on key:
+// the single-key watchers registered on exactly that key, plus the watchers
+// of every range that ranges.Stab reports for key.
+//
+// When only one source contributes watchers, that source's own set is
+// returned without copying, so mutating the result would mutate the group's
+// index. The result is nil when the key has no entry in keyWatchers and no
+// range is reported for it.
+func (wg *watcherGroup) watcherSetByKey(key string) watcherSet {
+	wkeys := wg.keyWatchers[key]
+	wranges := wg.ranges.Stab(adt.NewStringAffinePoint(key))
+
+	// zero-copy cases
+	switch {
+	case len(wranges) == 0:
+		// No range covers the key, so there is nothing to merge or copy;
+		// reuse the single-key set. If the key has no single-key watchers
+		// either, wkeys is the map's zero value (nil), which is why no
+		// separate "both empty" case is needed here.
+		return wkeys
+	case len(wranges) == 1 && len(wkeys) == 0:
+		// Exactly one range and no single-key watchers; reuse that range's set.
+		return wranges[0].Val.(watcherSet)
+	}
+
+	// copy case: merge the single-key set and every matching range set into
+	// a fresh set so that the group's own sets are left untouched.
+	ret := make(watcherSet)
+	ret.union(wkeys)
+	for _, item := range wranges {
+		ret.union(item.Val.(watcherSet))
+	}
+	return ret
+}