blob: 5a9596df5d8f6ad4004ded6b48a8e8839888afe7 [file] [log] [blame]
khenaidooffe076b2019-01-15 16:08:08 -05001// Copyright 2017 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package namespace
16
17import (
18 "context"
19 "sync"
20
21 "github.com/coreos/etcd/clientv3"
22)
23
24type watcherPrefix struct {
25 clientv3.Watcher
26 pfx string
27
28 wg sync.WaitGroup
29 stopc chan struct{}
30 stopOnce sync.Once
31}
32
33// NewWatcher wraps a Watcher instance so that all Watch requests
34// are prefixed with a given string and all Watch responses have
35// the prefix removed.
36func NewWatcher(w clientv3.Watcher, prefix string) clientv3.Watcher {
37 return &watcherPrefix{Watcher: w, pfx: prefix, stopc: make(chan struct{})}
38}
39
40func (w *watcherPrefix) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan {
41 // since OpOption is opaque, determine range for prefixing through an OpGet
42 op := clientv3.OpGet(key, opts...)
43 end := op.RangeBytes()
44 pfxBegin, pfxEnd := prefixInterval(w.pfx, []byte(key), end)
45 if pfxEnd != nil {
46 opts = append(opts, clientv3.WithRange(string(pfxEnd)))
47 }
48
49 wch := w.Watcher.Watch(ctx, string(pfxBegin), opts...)
50
51 // translate watch events from prefixed to unprefixed
52 pfxWch := make(chan clientv3.WatchResponse)
53 w.wg.Add(1)
54 go func() {
55 defer func() {
56 close(pfxWch)
57 w.wg.Done()
58 }()
59 for wr := range wch {
60 for i := range wr.Events {
61 wr.Events[i].Kv.Key = wr.Events[i].Kv.Key[len(w.pfx):]
62 if wr.Events[i].PrevKv != nil {
63 wr.Events[i].PrevKv.Key = wr.Events[i].Kv.Key
64 }
65 }
66 select {
67 case pfxWch <- wr:
68 case <-ctx.Done():
69 return
70 case <-w.stopc:
71 return
72 }
73 }
74 }()
75 return pfxWch
76}
77
78func (w *watcherPrefix) Close() error {
79 err := w.Watcher.Close()
80 w.stopOnce.Do(func() { close(w.stopc) })
81 w.wg.Wait()
82 return err
83}