[VOL-4291] Rw-core updates for gRPC migration

Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/vendor/github.com/coreos/etcd/mvcc/watcher_group.go b/vendor/github.com/coreos/etcd/mvcc/watcher_group.go
new file mode 100644
index 0000000..b569d04
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/mvcc/watcher_group.go
@@ -0,0 +1,293 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mvcc
+
+import (
+	"fmt"
+	"math"
+
+	"github.com/coreos/etcd/mvcc/mvccpb"
+	"github.com/coreos/etcd/pkg/adt"
+)
+
+var (
+	// watchBatchMaxRevs is the maximum distinct revisions that
+	// may be sent to an unsynced watcher at a time. Declared as
+	// var instead of const for testing purposes.
+	watchBatchMaxRevs = 1000
+)
+
+type eventBatch struct {
+	// evs is a batch of revision-ordered events
+	evs []mvccpb.Event
+	// revs is the minimum unique revisions observed for this batch
+	revs int
+	// moreRev is first revision with more events following this batch
+	moreRev int64
+}
+
+func (eb *eventBatch) add(ev mvccpb.Event) {
+	if eb.revs > watchBatchMaxRevs {
+		// maxed out batch size
+		return
+	}
+
+	if len(eb.evs) == 0 {
+		// base case
+		eb.revs = 1
+		eb.evs = append(eb.evs, ev)
+		return
+	}
+
+	// revision accounting
+	ebRev := eb.evs[len(eb.evs)-1].Kv.ModRevision
+	evRev := ev.Kv.ModRevision
+	if evRev > ebRev {
+		eb.revs++
+		if eb.revs > watchBatchMaxRevs {
+			eb.moreRev = evRev
+			return
+		}
+	}
+
+	eb.evs = append(eb.evs, ev)
+}
+
+type watcherBatch map[*watcher]*eventBatch
+
+func (wb watcherBatch) add(w *watcher, ev mvccpb.Event) {
+	eb := wb[w]
+	if eb == nil {
+		eb = &eventBatch{}
+		wb[w] = eb
+	}
+	eb.add(ev)
+}
+
+// newWatcherBatch maps watchers to their matched events. It enables quick
+// events look up by watcher.
+func newWatcherBatch(wg *watcherGroup, evs []mvccpb.Event) watcherBatch {
+	if len(wg.watchers) == 0 {
+		return nil
+	}
+
+	wb := make(watcherBatch)
+	for _, ev := range evs {
+		for w := range wg.watcherSetByKey(string(ev.Kv.Key)) {
+			if ev.Kv.ModRevision >= w.minRev {
+				// don't double notify
+				wb.add(w, ev)
+			}
+		}
+	}
+	return wb
+}
+
+type watcherSet map[*watcher]struct{}
+
+func (w watcherSet) add(wa *watcher) {
+	if _, ok := w[wa]; ok {
+		panic("add watcher twice!")
+	}
+	w[wa] = struct{}{}
+}
+
+func (w watcherSet) union(ws watcherSet) {
+	for wa := range ws {
+		w.add(wa)
+	}
+}
+
+func (w watcherSet) delete(wa *watcher) {
+	if _, ok := w[wa]; !ok {
+		panic("removing missing watcher!")
+	}
+	delete(w, wa)
+}
+
+type watcherSetByKey map[string]watcherSet
+
+func (w watcherSetByKey) add(wa *watcher) {
+	set := w[string(wa.key)]
+	if set == nil {
+		set = make(watcherSet)
+		w[string(wa.key)] = set
+	}
+	set.add(wa)
+}
+
+func (w watcherSetByKey) delete(wa *watcher) bool {
+	k := string(wa.key)
+	if v, ok := w[k]; ok {
+		if _, ok := v[wa]; ok {
+			delete(v, wa)
+			if len(v) == 0 {
+				// remove the set; nothing left
+				delete(w, k)
+			}
+			return true
+		}
+	}
+	return false
+}
+
+// watcherGroup is a collection of watchers organized by their ranges
+type watcherGroup struct {
+	// keyWatchers has the watchers that watch on a single key
+	keyWatchers watcherSetByKey
+	// ranges has the watchers that watch a range; it is sorted by interval
+	ranges adt.IntervalTree
+	// watchers is the set of all watchers
+	watchers watcherSet
+}
+
+func newWatcherGroup() watcherGroup {
+	return watcherGroup{
+		keyWatchers: make(watcherSetByKey),
+		ranges:      adt.NewIntervalTree(),
+		watchers:    make(watcherSet),
+	}
+}
+
+// add puts a watcher in the group.
+func (wg *watcherGroup) add(wa *watcher) {
+	wg.watchers.add(wa)
+	if wa.end == nil {
+		wg.keyWatchers.add(wa)
+		return
+	}
+
+	// interval already registered?
+	ivl := adt.NewStringAffineInterval(string(wa.key), string(wa.end))
+	if iv := wg.ranges.Find(ivl); iv != nil {
+		iv.Val.(watcherSet).add(wa)
+		return
+	}
+
+	// not registered, put in interval tree
+	ws := make(watcherSet)
+	ws.add(wa)
+	wg.ranges.Insert(ivl, ws)
+}
+
+// contains is whether the given key has a watcher in the group.
+func (wg *watcherGroup) contains(key string) bool {
+	_, ok := wg.keyWatchers[key]
+	return ok || wg.ranges.Intersects(adt.NewStringAffinePoint(key))
+}
+
+// size gives the number of unique watchers in the group.
+func (wg *watcherGroup) size() int { return len(wg.watchers) }
+
+// delete removes a watcher from the group.
+func (wg *watcherGroup) delete(wa *watcher) bool {
+	if _, ok := wg.watchers[wa]; !ok {
+		return false
+	}
+	wg.watchers.delete(wa)
+	if wa.end == nil {
+		wg.keyWatchers.delete(wa)
+		return true
+	}
+
+	ivl := adt.NewStringAffineInterval(string(wa.key), string(wa.end))
+	iv := wg.ranges.Find(ivl)
+	if iv == nil {
+		return false
+	}
+
+	ws := iv.Val.(watcherSet)
+	delete(ws, wa)
+	if len(ws) == 0 {
+		// remove interval missing watchers
+		if ok := wg.ranges.Delete(ivl); !ok {
+			panic("could not remove watcher from interval tree")
+		}
+	}
+
+	return true
+}
+
+// choose selects watchers from the watcher group to update
+func (wg *watcherGroup) choose(maxWatchers int, curRev, compactRev int64) (*watcherGroup, int64) {
+	if len(wg.watchers) < maxWatchers {
+		return wg, wg.chooseAll(curRev, compactRev)
+	}
+	ret := newWatcherGroup()
+	for w := range wg.watchers {
+		if maxWatchers <= 0 {
+			break
+		}
+		maxWatchers--
+		ret.add(w)
+	}
+	return &ret, ret.chooseAll(curRev, compactRev)
+}
+
+func (wg *watcherGroup) chooseAll(curRev, compactRev int64) int64 {
+	minRev := int64(math.MaxInt64)
+	for w := range wg.watchers {
+		if w.minRev > curRev {
+			// after network partition, possibly choosing future revision watcher from restore operation
+			// with watch key "proxy-namespace__lostleader" and revision "math.MaxInt64 - 2"
+			// do not panic when such watcher had been moved from "synced" watcher during restore operation
+			if !w.restore {
+				panic(fmt.Errorf("watcher minimum revision %d should not exceed current revision %d", w.minRev, curRev))
+			}
+
+			// mark 'restore' done, since it's chosen
+			w.restore = false
+		}
+		if w.minRev < compactRev {
+			select {
+			case w.ch <- WatchResponse{WatchID: w.id, CompactRevision: compactRev}:
+				w.compacted = true
+				wg.delete(w)
+			default:
+				// retry next time
+			}
+			continue
+		}
+		if minRev > w.minRev {
+			minRev = w.minRev
+		}
+	}
+	return minRev
+}
+
+// watcherSetByKey gets the set of watchers that receive events on the given key.
+func (wg *watcherGroup) watcherSetByKey(key string) watcherSet {
+	wkeys := wg.keyWatchers[key]
+	wranges := wg.ranges.Stab(adt.NewStringAffinePoint(key))
+
+	// zero-copy cases
+	switch {
+	case len(wranges) == 0:
+		// no need to merge ranges or copy; reuse single-key set
+		return wkeys
+	case len(wranges) == 0 && len(wkeys) == 0:
+		return nil
+	case len(wranges) == 1 && len(wkeys) == 0:
+		return wranges[0].Val.(watcherSet)
+	}
+
+	// copy case
+	ret := make(watcherSet)
+	ret.union(wg.keyWatchers[key])
+	for _, item := range wranges {
+		ret.union(item.Val.(watcherSet))
+	}
+	return ret
+}