[VOL-2235] Mocks and interfaces for rw-core

This update consists of mocks that are used by the rw-core
during unit testing.  It also includes interfaces used for unit
tests.

Change-Id: I20ca1455c358113c3aa897acc6355e0ddbc614b7
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/v2store/watcher_hub.go b/vendor/go.etcd.io/etcd/etcdserver/api/v2store/watcher_hub.go
new file mode 100644
index 0000000..a452e7e
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/v2store/watcher_hub.go
@@ -0,0 +1,200 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package v2store
+
+import (
+	"container/list"
+	"path"
+	"strings"
+	"sync"
+	"sync/atomic"
+
+	"go.etcd.io/etcd/etcdserver/api/v2error"
+)
+
+// A watcherHub contains all subscribed watchers
+// watchers is a map with watched path as key and watcher as value
+// EventHistory keeps the old events for watcherHub. It is used to help
+// watcher to get a continuous event history. Or a watcher might miss the
+// event happens between the end of the first watch command and the start
+// of the second command.
+type watcherHub struct {
+	// count must be the first element to keep 64-bit alignment for atomic
+	// access
+
+	count int64 // current number of watchers.
+
+	mutex        sync.Mutex
+	watchers     map[string]*list.List
+	EventHistory *EventHistory
+}
+
+// newWatchHub creates a watcherHub. The capacity determines how many events we will
+// keep in the eventHistory.
+// Typically, we only need to keep a small size of history[smaller than 20K].
+// Ideally, it should smaller than 20K/s[max throughput] * 2 * 50ms[RTT] = 2000
+func newWatchHub(capacity int) *watcherHub {
+	return &watcherHub{
+		watchers:     make(map[string]*list.List),
+		EventHistory: newEventHistory(capacity),
+	}
+}
+
+// Watch function returns a Watcher.
+// If recursive is true, the first change after index under key will be sent to the event channel of the watcher.
+// If recursive is false, the first change after index at key will be sent to the event channel of the watcher.
+// If index is zero, watch will start from the current index + 1.
+func (wh *watcherHub) watch(key string, recursive, stream bool, index, storeIndex uint64) (Watcher, *v2error.Error) {
+	reportWatchRequest()
+	event, err := wh.EventHistory.scan(key, recursive, index)
+
+	if err != nil {
+		err.Index = storeIndex
+		return nil, err
+	}
+
+	w := &watcher{
+		eventChan:  make(chan *Event, 100), // use a buffered channel
+		recursive:  recursive,
+		stream:     stream,
+		sinceIndex: index,
+		startIndex: storeIndex,
+		hub:        wh,
+	}
+
+	wh.mutex.Lock()
+	defer wh.mutex.Unlock()
+	// If the event exists in the known history, append the EtcdIndex and return immediately
+	if event != nil {
+		ne := event.Clone()
+		ne.EtcdIndex = storeIndex
+		w.eventChan <- ne
+		return w, nil
+	}
+
+	l, ok := wh.watchers[key]
+
+	var elem *list.Element
+
+	if ok { // add the new watcher to the back of the list
+		elem = l.PushBack(w)
+	} else { // create a new list and add the new watcher
+		l = list.New()
+		elem = l.PushBack(w)
+		wh.watchers[key] = l
+	}
+
+	w.remove = func() {
+		if w.removed { // avoid removing it twice
+			return
+		}
+		w.removed = true
+		l.Remove(elem)
+		atomic.AddInt64(&wh.count, -1)
+		reportWatcherRemoved()
+		if l.Len() == 0 {
+			delete(wh.watchers, key)
+		}
+	}
+
+	atomic.AddInt64(&wh.count, 1)
+	reportWatcherAdded()
+
+	return w, nil
+}
+
+func (wh *watcherHub) add(e *Event) {
+	wh.EventHistory.addEvent(e)
+}
+
+// notify function accepts an event and notify to the watchers.
+func (wh *watcherHub) notify(e *Event) {
+	e = wh.EventHistory.addEvent(e) // add event into the eventHistory
+
+	segments := strings.Split(e.Node.Key, "/")
+
+	currPath := "/"
+
+	// walk through all the segments of the path and notify the watchers
+	// if the path is "/foo/bar", it will notify watchers with path "/",
+	// "/foo" and "/foo/bar"
+
+	for _, segment := range segments {
+		currPath = path.Join(currPath, segment)
+		// notify the watchers who interests in the changes of current path
+		wh.notifyWatchers(e, currPath, false)
+	}
+}
+
+func (wh *watcherHub) notifyWatchers(e *Event, nodePath string, deleted bool) {
+	wh.mutex.Lock()
+	defer wh.mutex.Unlock()
+
+	l, ok := wh.watchers[nodePath]
+	if ok {
+		curr := l.Front()
+
+		for curr != nil {
+			next := curr.Next() // save reference to the next one in the list
+
+			w, _ := curr.Value.(*watcher)
+
+			originalPath := e.Node.Key == nodePath
+			if (originalPath || !isHidden(nodePath, e.Node.Key)) && w.notify(e, originalPath, deleted) {
+				if !w.stream { // do not remove the stream watcher
+					// if we successfully notify a watcher
+					// we need to remove the watcher from the list
+					// and decrease the counter
+					w.removed = true
+					l.Remove(curr)
+					atomic.AddInt64(&wh.count, -1)
+					reportWatcherRemoved()
+				}
+			}
+
+			curr = next // update current to the next element in the list
+		}
+
+		if l.Len() == 0 {
+			// if we have notified all watcher in the list
+			// we can delete the list
+			delete(wh.watchers, nodePath)
+		}
+	}
+}
+
+// clone function clones the watcherHub and return the cloned one.
+// only clone the static content. do not clone the current watchers.
+func (wh *watcherHub) clone() *watcherHub {
+	clonedHistory := wh.EventHistory.clone()
+
+	return &watcherHub{
+		EventHistory: clonedHistory,
+	}
+}
+
+// isHidden checks to see if key path is considered hidden to watch path i.e. the
+// last element is hidden or it's within a hidden directory
+func isHidden(watchPath, keyPath string) bool {
+	// When deleting a directory, watchPath might be deeper than the actual keyPath
+	// For example, when deleting /foo we also need to notify watchers on /foo/bar.
+	if len(watchPath) > len(keyPath) {
+		return false
+	}
+	// if watch path is just a "/", after path will start without "/"
+	// add a "/" to deal with the special case when watchPath is "/"
+	afterPath := path.Clean("/" + keyPath[len(watchPath):])
+	return strings.Contains(afterPath, "/_")
+}