blob: a452e7e951f617b346c81dc0094784084dec9afd [file] [log] [blame]
khenaidooab1f7bd2019-11-14 14:00:27 -05001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package v2store
16
17import (
18 "container/list"
19 "path"
20 "strings"
21 "sync"
22 "sync/atomic"
23
24 "go.etcd.io/etcd/etcdserver/api/v2error"
25)
26
27// A watcherHub contains all subscribed watchers
28// watchers is a map with watched path as key and watcher as value
29// EventHistory keeps the old events for watcherHub. It is used to help
30// watcher to get a continuous event history. Or a watcher might miss the
31// event happens between the end of the first watch command and the start
32// of the second command.
33type watcherHub struct {
34 // count must be the first element to keep 64-bit alignment for atomic
35 // access
36
37 count int64 // current number of watchers.
38
39 mutex sync.Mutex
40 watchers map[string]*list.List
41 EventHistory *EventHistory
42}
43
44// newWatchHub creates a watcherHub. The capacity determines how many events we will
45// keep in the eventHistory.
46// Typically, we only need to keep a small size of history[smaller than 20K].
47// Ideally, it should smaller than 20K/s[max throughput] * 2 * 50ms[RTT] = 2000
48func newWatchHub(capacity int) *watcherHub {
49 return &watcherHub{
50 watchers: make(map[string]*list.List),
51 EventHistory: newEventHistory(capacity),
52 }
53}
54
55// Watch function returns a Watcher.
56// If recursive is true, the first change after index under key will be sent to the event channel of the watcher.
57// If recursive is false, the first change after index at key will be sent to the event channel of the watcher.
58// If index is zero, watch will start from the current index + 1.
59func (wh *watcherHub) watch(key string, recursive, stream bool, index, storeIndex uint64) (Watcher, *v2error.Error) {
60 reportWatchRequest()
61 event, err := wh.EventHistory.scan(key, recursive, index)
62
63 if err != nil {
64 err.Index = storeIndex
65 return nil, err
66 }
67
68 w := &watcher{
69 eventChan: make(chan *Event, 100), // use a buffered channel
70 recursive: recursive,
71 stream: stream,
72 sinceIndex: index,
73 startIndex: storeIndex,
74 hub: wh,
75 }
76
77 wh.mutex.Lock()
78 defer wh.mutex.Unlock()
79 // If the event exists in the known history, append the EtcdIndex and return immediately
80 if event != nil {
81 ne := event.Clone()
82 ne.EtcdIndex = storeIndex
83 w.eventChan <- ne
84 return w, nil
85 }
86
87 l, ok := wh.watchers[key]
88
89 var elem *list.Element
90
91 if ok { // add the new watcher to the back of the list
92 elem = l.PushBack(w)
93 } else { // create a new list and add the new watcher
94 l = list.New()
95 elem = l.PushBack(w)
96 wh.watchers[key] = l
97 }
98
99 w.remove = func() {
100 if w.removed { // avoid removing it twice
101 return
102 }
103 w.removed = true
104 l.Remove(elem)
105 atomic.AddInt64(&wh.count, -1)
106 reportWatcherRemoved()
107 if l.Len() == 0 {
108 delete(wh.watchers, key)
109 }
110 }
111
112 atomic.AddInt64(&wh.count, 1)
113 reportWatcherAdded()
114
115 return w, nil
116}
117
118func (wh *watcherHub) add(e *Event) {
119 wh.EventHistory.addEvent(e)
120}
121
122// notify function accepts an event and notify to the watchers.
123func (wh *watcherHub) notify(e *Event) {
124 e = wh.EventHistory.addEvent(e) // add event into the eventHistory
125
126 segments := strings.Split(e.Node.Key, "/")
127
128 currPath := "/"
129
130 // walk through all the segments of the path and notify the watchers
131 // if the path is "/foo/bar", it will notify watchers with path "/",
132 // "/foo" and "/foo/bar"
133
134 for _, segment := range segments {
135 currPath = path.Join(currPath, segment)
136 // notify the watchers who interests in the changes of current path
137 wh.notifyWatchers(e, currPath, false)
138 }
139}
140
141func (wh *watcherHub) notifyWatchers(e *Event, nodePath string, deleted bool) {
142 wh.mutex.Lock()
143 defer wh.mutex.Unlock()
144
145 l, ok := wh.watchers[nodePath]
146 if ok {
147 curr := l.Front()
148
149 for curr != nil {
150 next := curr.Next() // save reference to the next one in the list
151
152 w, _ := curr.Value.(*watcher)
153
154 originalPath := e.Node.Key == nodePath
155 if (originalPath || !isHidden(nodePath, e.Node.Key)) && w.notify(e, originalPath, deleted) {
156 if !w.stream { // do not remove the stream watcher
157 // if we successfully notify a watcher
158 // we need to remove the watcher from the list
159 // and decrease the counter
160 w.removed = true
161 l.Remove(curr)
162 atomic.AddInt64(&wh.count, -1)
163 reportWatcherRemoved()
164 }
165 }
166
167 curr = next // update current to the next element in the list
168 }
169
170 if l.Len() == 0 {
171 // if we have notified all watcher in the list
172 // we can delete the list
173 delete(wh.watchers, nodePath)
174 }
175 }
176}
177
178// clone function clones the watcherHub and return the cloned one.
179// only clone the static content. do not clone the current watchers.
180func (wh *watcherHub) clone() *watcherHub {
181 clonedHistory := wh.EventHistory.clone()
182
183 return &watcherHub{
184 EventHistory: clonedHistory,
185 }
186}
187
188// isHidden checks to see if key path is considered hidden to watch path i.e. the
189// last element is hidden or it's within a hidden directory
190func isHidden(watchPath, keyPath string) bool {
191 // When deleting a directory, watchPath might be deeper than the actual keyPath
192 // For example, when deleting /foo we also need to notify watchers on /foo/bar.
193 if len(watchPath) > len(keyPath) {
194 return false
195 }
196 // if watch path is just a "/", after path will start without "/"
197 // add a "/" to deal with the special case when watchPath is "/"
198 afterPath := path.Clean("/" + keyPath[len(watchPath):])
199 return strings.Contains(afterPath, "/_")
200}