khenaidoo | ffe076b | 2019-01-15 16:08:08 -0500 | [diff] [blame^] | 1 | // Copyright 2017 The etcd Authors |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | package namespace |
| 16 | |
| 17 | import ( |
| 18 | "context" |
| 19 | "sync" |
| 20 | |
| 21 | "github.com/coreos/etcd/clientv3" |
| 22 | ) |
| 23 | |
| 24 | type watcherPrefix struct { |
| 25 | clientv3.Watcher |
| 26 | pfx string |
| 27 | |
| 28 | wg sync.WaitGroup |
| 29 | stopc chan struct{} |
| 30 | stopOnce sync.Once |
| 31 | } |
| 32 | |
| 33 | // NewWatcher wraps a Watcher instance so that all Watch requests |
| 34 | // are prefixed with a given string and all Watch responses have |
| 35 | // the prefix removed. |
| 36 | func NewWatcher(w clientv3.Watcher, prefix string) clientv3.Watcher { |
| 37 | return &watcherPrefix{Watcher: w, pfx: prefix, stopc: make(chan struct{})} |
| 38 | } |
| 39 | |
| 40 | func (w *watcherPrefix) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan { |
| 41 | // since OpOption is opaque, determine range for prefixing through an OpGet |
| 42 | op := clientv3.OpGet(key, opts...) |
| 43 | end := op.RangeBytes() |
| 44 | pfxBegin, pfxEnd := prefixInterval(w.pfx, []byte(key), end) |
| 45 | if pfxEnd != nil { |
| 46 | opts = append(opts, clientv3.WithRange(string(pfxEnd))) |
| 47 | } |
| 48 | |
| 49 | wch := w.Watcher.Watch(ctx, string(pfxBegin), opts...) |
| 50 | |
| 51 | // translate watch events from prefixed to unprefixed |
| 52 | pfxWch := make(chan clientv3.WatchResponse) |
| 53 | w.wg.Add(1) |
| 54 | go func() { |
| 55 | defer func() { |
| 56 | close(pfxWch) |
| 57 | w.wg.Done() |
| 58 | }() |
| 59 | for wr := range wch { |
| 60 | for i := range wr.Events { |
| 61 | wr.Events[i].Kv.Key = wr.Events[i].Kv.Key[len(w.pfx):] |
| 62 | if wr.Events[i].PrevKv != nil { |
| 63 | wr.Events[i].PrevKv.Key = wr.Events[i].Kv.Key |
| 64 | } |
| 65 | } |
| 66 | select { |
| 67 | case pfxWch <- wr: |
| 68 | case <-ctx.Done(): |
| 69 | return |
| 70 | case <-w.stopc: |
| 71 | return |
| 72 | } |
| 73 | } |
| 74 | }() |
| 75 | return pfxWch |
| 76 | } |
| 77 | |
| 78 | func (w *watcherPrefix) Close() error { |
| 79 | err := w.Watcher.Close() |
| 80 | w.stopOnce.Do(func() { close(w.stopc) }) |
| 81 | w.wg.Wait() |
| 82 | return err |
| 83 | } |