[VOL-2235] Mocks and interfaces for rw-core
This update consists of mocks that are used by the rw-core
during unit testing. It also includes interfaces used for unit
tests.
Change-Id: I20ca1455c358113c3aa897acc6355e0ddbc614b7
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/v2v3/watcher.go b/vendor/go.etcd.io/etcd/etcdserver/api/v2v3/watcher.go
new file mode 100644
index 0000000..e8a3557
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/v2v3/watcher.go
@@ -0,0 +1,140 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package v2v3
+
+import (
+ "context"
+ "strings"
+
+ "go.etcd.io/etcd/clientv3"
+ "go.etcd.io/etcd/etcdserver/api/v2error"
+ "go.etcd.io/etcd/etcdserver/api/v2store"
+)
+
+func (s *v2v3Store) Watch(prefix string, recursive, stream bool, sinceIndex uint64) (v2store.Watcher, error) {
+ ctx, cancel := context.WithCancel(s.ctx)
+ wch := s.c.Watch(
+ ctx,
+ // TODO: very pricey; use a single store-wide watch in future
+ s.pfx,
+ clientv3.WithPrefix(),
+ clientv3.WithRev(int64(sinceIndex)),
+ clientv3.WithCreatedNotify(),
+ clientv3.WithPrevKV())
+ resp, ok := <-wch
+ if err := resp.Err(); err != nil || !ok {
+ cancel()
+ return nil, v2error.NewError(v2error.EcodeRaftInternal, prefix, 0)
+ }
+
+ evc, donec := make(chan *v2store.Event), make(chan struct{})
+ go func() {
+ defer func() {
+ close(evc)
+ close(donec)
+ }()
+ for resp := range wch {
+ for _, ev := range s.mkV2Events(resp) {
+ k := ev.Node.Key
+ if recursive {
+ if !strings.HasPrefix(k, prefix) {
+ continue
+ }
+ // accept events on hidden keys given in prefix
+ k = strings.Replace(k, prefix, "/", 1)
+ // ignore hidden keys deeper than prefix
+ if strings.Contains(k, "/_") {
+ continue
+ }
+ }
+ if !recursive && k != prefix {
+ continue
+ }
+ select {
+ case evc <- ev:
+ case <-ctx.Done():
+ return
+ }
+ if !stream {
+ return
+ }
+ }
+ }
+ }()
+
+ return &v2v3Watcher{
+ startRev: resp.Header.Revision,
+ evc: evc,
+ donec: donec,
+ cancel: cancel,
+ }, nil
+}
+
+func (s *v2v3Store) mkV2Events(wr clientv3.WatchResponse) (evs []*v2store.Event) {
+ ak := s.mkActionKey()
+ for _, rev := range mkRevs(wr) {
+ var act, key *clientv3.Event
+ for _, ev := range rev {
+ if string(ev.Kv.Key) == ak {
+ act = ev
+ } else if key != nil && len(key.Kv.Key) < len(ev.Kv.Key) {
+ // use longest key to ignore intermediate new
+ // directories from Create.
+ key = ev
+ } else if key == nil {
+ key = ev
+ }
+ }
+ v2ev := &v2store.Event{
+ Action: string(act.Kv.Value),
+ Node: s.mkV2Node(key.Kv),
+ PrevNode: s.mkV2Node(key.PrevKv),
+ EtcdIndex: mkV2Rev(wr.Header.Revision),
+ }
+ evs = append(evs, v2ev)
+ }
+ return evs
+}
+
+func mkRevs(wr clientv3.WatchResponse) (revs [][]*clientv3.Event) {
+ var curRev []*clientv3.Event
+ for _, ev := range wr.Events {
+ if curRev != nil && ev.Kv.ModRevision != curRev[0].Kv.ModRevision {
+ revs = append(revs, curRev)
+ curRev = nil
+ }
+ curRev = append(curRev, ev)
+ }
+ if curRev != nil {
+ revs = append(revs, curRev)
+ }
+ return revs
+}
+
+type v2v3Watcher struct {
+ startRev int64
+ evc chan *v2store.Event
+ donec chan struct{}
+ cancel context.CancelFunc
+}
+
+func (w *v2v3Watcher) StartIndex() uint64 { return mkV2Rev(w.startRev) }
+
+func (w *v2v3Watcher) Remove() {
+ w.cancel()
+ <-w.donec
+}
+
+func (w *v2v3Watcher) EventChan() chan *v2store.Event { return w.evc }