blob: b65c7bc5eb7e8c66f4c4fb2c1bddac78a13192ad [file] [log] [blame]
khenaidooffe076b2019-01-15 16:08:08 -05001// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package mvcc
16
17import (
18 "fmt"
19 "math"
20
21 "github.com/coreos/etcd/mvcc/mvccpb"
22 "github.com/coreos/etcd/pkg/adt"
23)
24
// watchBatchMaxRevs caps how many distinct revisions a single eventBatch may
// carry to an unsynced watcher in one go. It is a variable rather than a
// constant only so that tests can lower it.
var watchBatchMaxRevs = 1000
31
32type eventBatch struct {
33 // evs is a batch of revision-ordered events
34 evs []mvccpb.Event
35 // revs is the minimum unique revisions observed for this batch
36 revs int
37 // moreRev is first revision with more events following this batch
38 moreRev int64
39}
40
41func (eb *eventBatch) add(ev mvccpb.Event) {
42 if eb.revs > watchBatchMaxRevs {
43 // maxed out batch size
44 return
45 }
46
47 if len(eb.evs) == 0 {
48 // base case
49 eb.revs = 1
50 eb.evs = append(eb.evs, ev)
51 return
52 }
53
54 // revision accounting
55 ebRev := eb.evs[len(eb.evs)-1].Kv.ModRevision
56 evRev := ev.Kv.ModRevision
57 if evRev > ebRev {
58 eb.revs++
59 if eb.revs > watchBatchMaxRevs {
60 eb.moreRev = evRev
61 return
62 }
63 }
64
65 eb.evs = append(eb.evs, ev)
66}
67
68type watcherBatch map[*watcher]*eventBatch
69
70func (wb watcherBatch) add(w *watcher, ev mvccpb.Event) {
71 eb := wb[w]
72 if eb == nil {
73 eb = &eventBatch{}
74 wb[w] = eb
75 }
76 eb.add(ev)
77}
78
79// newWatcherBatch maps watchers to their matched events. It enables quick
80// events look up by watcher.
81func newWatcherBatch(wg *watcherGroup, evs []mvccpb.Event) watcherBatch {
82 if len(wg.watchers) == 0 {
83 return nil
84 }
85
86 wb := make(watcherBatch)
87 for _, ev := range evs {
88 for w := range wg.watcherSetByKey(string(ev.Kv.Key)) {
89 if ev.Kv.ModRevision >= w.minRev {
90 // don't double notify
91 wb.add(w, ev)
92 }
93 }
94 }
95 return wb
96}
97
98type watcherSet map[*watcher]struct{}
99
100func (w watcherSet) add(wa *watcher) {
101 if _, ok := w[wa]; ok {
102 panic("add watcher twice!")
103 }
104 w[wa] = struct{}{}
105}
106
107func (w watcherSet) union(ws watcherSet) {
108 for wa := range ws {
109 w.add(wa)
110 }
111}
112
113func (w watcherSet) delete(wa *watcher) {
114 if _, ok := w[wa]; !ok {
115 panic("removing missing watcher!")
116 }
117 delete(w, wa)
118}
119
120type watcherSetByKey map[string]watcherSet
121
122func (w watcherSetByKey) add(wa *watcher) {
123 set := w[string(wa.key)]
124 if set == nil {
125 set = make(watcherSet)
126 w[string(wa.key)] = set
127 }
128 set.add(wa)
129}
130
131func (w watcherSetByKey) delete(wa *watcher) bool {
132 k := string(wa.key)
133 if v, ok := w[k]; ok {
134 if _, ok := v[wa]; ok {
135 delete(v, wa)
136 if len(v) == 0 {
137 // remove the set; nothing left
138 delete(w, k)
139 }
140 return true
141 }
142 }
143 return false
144}
145
146// watcherGroup is a collection of watchers organized by their ranges
147type watcherGroup struct {
148 // keyWatchers has the watchers that watch on a single key
149 keyWatchers watcherSetByKey
150 // ranges has the watchers that watch a range; it is sorted by interval
151 ranges adt.IntervalTree
152 // watchers is the set of all watchers
153 watchers watcherSet
154}
155
156func newWatcherGroup() watcherGroup {
157 return watcherGroup{
158 keyWatchers: make(watcherSetByKey),
159 watchers: make(watcherSet),
160 }
161}
162
163// add puts a watcher in the group.
164func (wg *watcherGroup) add(wa *watcher) {
165 wg.watchers.add(wa)
166 if wa.end == nil {
167 wg.keyWatchers.add(wa)
168 return
169 }
170
171 // interval already registered?
172 ivl := adt.NewStringAffineInterval(string(wa.key), string(wa.end))
173 if iv := wg.ranges.Find(ivl); iv != nil {
174 iv.Val.(watcherSet).add(wa)
175 return
176 }
177
178 // not registered, put in interval tree
179 ws := make(watcherSet)
180 ws.add(wa)
181 wg.ranges.Insert(ivl, ws)
182}
183
184// contains is whether the given key has a watcher in the group.
185func (wg *watcherGroup) contains(key string) bool {
186 _, ok := wg.keyWatchers[key]
187 return ok || wg.ranges.Intersects(adt.NewStringAffinePoint(key))
188}
189
190// size gives the number of unique watchers in the group.
191func (wg *watcherGroup) size() int { return len(wg.watchers) }
192
193// delete removes a watcher from the group.
194func (wg *watcherGroup) delete(wa *watcher) bool {
195 if _, ok := wg.watchers[wa]; !ok {
196 return false
197 }
198 wg.watchers.delete(wa)
199 if wa.end == nil {
200 wg.keyWatchers.delete(wa)
201 return true
202 }
203
204 ivl := adt.NewStringAffineInterval(string(wa.key), string(wa.end))
205 iv := wg.ranges.Find(ivl)
206 if iv == nil {
207 return false
208 }
209
210 ws := iv.Val.(watcherSet)
211 delete(ws, wa)
212 if len(ws) == 0 {
213 // remove interval missing watchers
214 if ok := wg.ranges.Delete(ivl); !ok {
215 panic("could not remove watcher from interval tree")
216 }
217 }
218
219 return true
220}
221
222// choose selects watchers from the watcher group to update
223func (wg *watcherGroup) choose(maxWatchers int, curRev, compactRev int64) (*watcherGroup, int64) {
224 if len(wg.watchers) < maxWatchers {
225 return wg, wg.chooseAll(curRev, compactRev)
226 }
227 ret := newWatcherGroup()
228 for w := range wg.watchers {
229 if maxWatchers <= 0 {
230 break
231 }
232 maxWatchers--
233 ret.add(w)
234 }
235 return &ret, ret.chooseAll(curRev, compactRev)
236}
237
238func (wg *watcherGroup) chooseAll(curRev, compactRev int64) int64 {
239 minRev := int64(math.MaxInt64)
240 for w := range wg.watchers {
241 if w.minRev > curRev {
242 // after network partition, possibly choosing future revision watcher from restore operation
243 // with watch key "proxy-namespace__lostleader" and revision "math.MaxInt64 - 2"
244 // do not panic when such watcher had been moved from "synced" watcher during restore operation
245 if !w.restore {
246 panic(fmt.Errorf("watcher minimum revision %d should not exceed current revision %d", w.minRev, curRev))
247 }
248
249 // mark 'restore' done, since it's chosen
250 w.restore = false
251 }
252 if w.minRev < compactRev {
253 select {
254 case w.ch <- WatchResponse{WatchID: w.id, CompactRevision: compactRev}:
255 w.compacted = true
256 wg.delete(w)
257 default:
258 // retry next time
259 }
260 continue
261 }
262 if minRev > w.minRev {
263 minRev = w.minRev
264 }
265 }
266 return minRev
267}
268
269// watcherSetByKey gets the set of watchers that receive events on the given key.
270func (wg *watcherGroup) watcherSetByKey(key string) watcherSet {
271 wkeys := wg.keyWatchers[key]
272 wranges := wg.ranges.Stab(adt.NewStringAffinePoint(key))
273
274 // zero-copy cases
275 switch {
276 case len(wranges) == 0:
277 // no need to merge ranges or copy; reuse single-key set
278 return wkeys
279 case len(wranges) == 0 && len(wkeys) == 0:
280 return nil
281 case len(wranges) == 1 && len(wkeys) == 0:
282 return wranges[0].Val.(watcherSet)
283 }
284
285 // copy case
286 ret := make(watcherSet)
287 ret.union(wg.keyWatchers[key])
288 for _, item := range wranges {
289 ret.union(item.Val.(watcherSet))
290 }
291 return ret
292}