| // Copyright 2017 The etcd Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package v2v3 |
| |
| import ( |
| "context" |
| "strings" |
| |
| "go.etcd.io/etcd/clientv3" |
| "go.etcd.io/etcd/etcdserver/api/v2error" |
| "go.etcd.io/etcd/etcdserver/api/v2store" |
| ) |
| |
| func (s *v2v3Store) Watch(prefix string, recursive, stream bool, sinceIndex uint64) (v2store.Watcher, error) { |
| ctx, cancel := context.WithCancel(s.ctx) |
| wch := s.c.Watch( |
| ctx, |
| // TODO: very pricey; use a single store-wide watch in future |
| s.pfx, |
| clientv3.WithPrefix(), |
| clientv3.WithRev(int64(sinceIndex)), |
| clientv3.WithCreatedNotify(), |
| clientv3.WithPrevKV()) |
| resp, ok := <-wch |
| if err := resp.Err(); err != nil || !ok { |
| cancel() |
| return nil, v2error.NewError(v2error.EcodeRaftInternal, prefix, 0) |
| } |
| |
| evc, donec := make(chan *v2store.Event), make(chan struct{}) |
| go func() { |
| defer func() { |
| close(evc) |
| close(donec) |
| }() |
| for resp := range wch { |
| for _, ev := range s.mkV2Events(resp) { |
| k := ev.Node.Key |
| if recursive { |
| if !strings.HasPrefix(k, prefix) { |
| continue |
| } |
| // accept events on hidden keys given in prefix |
| k = strings.Replace(k, prefix, "/", 1) |
| // ignore hidden keys deeper than prefix |
| if strings.Contains(k, "/_") { |
| continue |
| } |
| } |
| if !recursive && k != prefix { |
| continue |
| } |
| select { |
| case evc <- ev: |
| case <-ctx.Done(): |
| return |
| } |
| if !stream { |
| return |
| } |
| } |
| } |
| }() |
| |
| return &v2v3Watcher{ |
| startRev: resp.Header.Revision, |
| evc: evc, |
| donec: donec, |
| cancel: cancel, |
| }, nil |
| } |
| |
| func (s *v2v3Store) mkV2Events(wr clientv3.WatchResponse) (evs []*v2store.Event) { |
| ak := s.mkActionKey() |
| for _, rev := range mkRevs(wr) { |
| var act, key *clientv3.Event |
| for _, ev := range rev { |
| if string(ev.Kv.Key) == ak { |
| act = ev |
| } else if key != nil && len(key.Kv.Key) < len(ev.Kv.Key) { |
| // use longest key to ignore intermediate new |
| // directories from Create. |
| key = ev |
| } else if key == nil { |
| key = ev |
| } |
| } |
| v2ev := &v2store.Event{ |
| Action: string(act.Kv.Value), |
| Node: s.mkV2Node(key.Kv), |
| PrevNode: s.mkV2Node(key.PrevKv), |
| EtcdIndex: mkV2Rev(wr.Header.Revision), |
| } |
| evs = append(evs, v2ev) |
| } |
| return evs |
| } |
| |
| func mkRevs(wr clientv3.WatchResponse) (revs [][]*clientv3.Event) { |
| var curRev []*clientv3.Event |
| for _, ev := range wr.Events { |
| if curRev != nil && ev.Kv.ModRevision != curRev[0].Kv.ModRevision { |
| revs = append(revs, curRev) |
| curRev = nil |
| } |
| curRev = append(curRev, ev) |
| } |
| if curRev != nil { |
| revs = append(revs, curRev) |
| } |
| return revs |
| } |
| |
| type v2v3Watcher struct { |
| startRev int64 |
| evc chan *v2store.Event |
| donec chan struct{} |
| cancel context.CancelFunc |
| } |
| |
| func (w *v2v3Watcher) StartIndex() uint64 { return mkV2Rev(w.startRev) } |
| |
| func (w *v2v3Watcher) Remove() { |
| w.cancel() |
| <-w.donec |
| } |
| |
| func (w *v2v3Watcher) EventChan() chan *v2store.Event { return w.evc } |