blob: 151f0de718517d105cef3df96c8d87f469e879b8 [file] [log] [blame]
khenaidooab1f7bd2019-11-14 14:00:27 -05001// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package mvcc
16
17import (
18 "fmt"
19 "math"
20
21 "go.etcd.io/etcd/mvcc/mvccpb"
22 "go.etcd.io/etcd/pkg/adt"
23)
24
// watchBatchMaxRevs caps how many distinct revisions may be sent to an
// unsynced watcher in a single batch. It is a variable rather than a
// constant so that tests can override it.
var watchBatchMaxRevs = 1000
31
32type eventBatch struct {
33 // evs is a batch of revision-ordered events
34 evs []mvccpb.Event
35 // revs is the minimum unique revisions observed for this batch
36 revs int
37 // moreRev is first revision with more events following this batch
38 moreRev int64
39}
40
41func (eb *eventBatch) add(ev mvccpb.Event) {
42 if eb.revs > watchBatchMaxRevs {
43 // maxed out batch size
44 return
45 }
46
47 if len(eb.evs) == 0 {
48 // base case
49 eb.revs = 1
50 eb.evs = append(eb.evs, ev)
51 return
52 }
53
54 // revision accounting
55 ebRev := eb.evs[len(eb.evs)-1].Kv.ModRevision
56 evRev := ev.Kv.ModRevision
57 if evRev > ebRev {
58 eb.revs++
59 if eb.revs > watchBatchMaxRevs {
60 eb.moreRev = evRev
61 return
62 }
63 }
64
65 eb.evs = append(eb.evs, ev)
66}
67
68type watcherBatch map[*watcher]*eventBatch
69
70func (wb watcherBatch) add(w *watcher, ev mvccpb.Event) {
71 eb := wb[w]
72 if eb == nil {
73 eb = &eventBatch{}
74 wb[w] = eb
75 }
76 eb.add(ev)
77}
78
79// newWatcherBatch maps watchers to their matched events. It enables quick
80// events look up by watcher.
81func newWatcherBatch(wg *watcherGroup, evs []mvccpb.Event) watcherBatch {
82 if len(wg.watchers) == 0 {
83 return nil
84 }
85
86 wb := make(watcherBatch)
87 for _, ev := range evs {
88 for w := range wg.watcherSetByKey(string(ev.Kv.Key)) {
89 if ev.Kv.ModRevision >= w.minRev {
90 // don't double notify
91 wb.add(w, ev)
92 }
93 }
94 }
95 return wb
96}
97
98type watcherSet map[*watcher]struct{}
99
100func (w watcherSet) add(wa *watcher) {
101 if _, ok := w[wa]; ok {
102 panic("add watcher twice!")
103 }
104 w[wa] = struct{}{}
105}
106
107func (w watcherSet) union(ws watcherSet) {
108 for wa := range ws {
109 w.add(wa)
110 }
111}
112
113func (w watcherSet) delete(wa *watcher) {
114 if _, ok := w[wa]; !ok {
115 panic("removing missing watcher!")
116 }
117 delete(w, wa)
118}
119
120type watcherSetByKey map[string]watcherSet
121
122func (w watcherSetByKey) add(wa *watcher) {
123 set := w[string(wa.key)]
124 if set == nil {
125 set = make(watcherSet)
126 w[string(wa.key)] = set
127 }
128 set.add(wa)
129}
130
131func (w watcherSetByKey) delete(wa *watcher) bool {
132 k := string(wa.key)
133 if v, ok := w[k]; ok {
134 if _, ok := v[wa]; ok {
135 delete(v, wa)
136 if len(v) == 0 {
137 // remove the set; nothing left
138 delete(w, k)
139 }
140 return true
141 }
142 }
143 return false
144}
145
146// watcherGroup is a collection of watchers organized by their ranges
147type watcherGroup struct {
148 // keyWatchers has the watchers that watch on a single key
149 keyWatchers watcherSetByKey
150 // ranges has the watchers that watch a range; it is sorted by interval
151 ranges adt.IntervalTree
152 // watchers is the set of all watchers
153 watchers watcherSet
154}
155
156func newWatcherGroup() watcherGroup {
157 return watcherGroup{
158 keyWatchers: make(watcherSetByKey),
159 ranges: adt.NewIntervalTree(),
160 watchers: make(watcherSet),
161 }
162}
163
164// add puts a watcher in the group.
165func (wg *watcherGroup) add(wa *watcher) {
166 wg.watchers.add(wa)
167 if wa.end == nil {
168 wg.keyWatchers.add(wa)
169 return
170 }
171
172 // interval already registered?
173 ivl := adt.NewStringAffineInterval(string(wa.key), string(wa.end))
174 if iv := wg.ranges.Find(ivl); iv != nil {
175 iv.Val.(watcherSet).add(wa)
176 return
177 }
178
179 // not registered, put in interval tree
180 ws := make(watcherSet)
181 ws.add(wa)
182 wg.ranges.Insert(ivl, ws)
183}
184
185// contains is whether the given key has a watcher in the group.
186func (wg *watcherGroup) contains(key string) bool {
187 _, ok := wg.keyWatchers[key]
188 return ok || wg.ranges.Intersects(adt.NewStringAffinePoint(key))
189}
190
191// size gives the number of unique watchers in the group.
192func (wg *watcherGroup) size() int { return len(wg.watchers) }
193
194// delete removes a watcher from the group.
195func (wg *watcherGroup) delete(wa *watcher) bool {
196 if _, ok := wg.watchers[wa]; !ok {
197 return false
198 }
199 wg.watchers.delete(wa)
200 if wa.end == nil {
201 wg.keyWatchers.delete(wa)
202 return true
203 }
204
205 ivl := adt.NewStringAffineInterval(string(wa.key), string(wa.end))
206 iv := wg.ranges.Find(ivl)
207 if iv == nil {
208 return false
209 }
210
211 ws := iv.Val.(watcherSet)
212 delete(ws, wa)
213 if len(ws) == 0 {
214 // remove interval missing watchers
215 if ok := wg.ranges.Delete(ivl); !ok {
216 panic("could not remove watcher from interval tree")
217 }
218 }
219
220 return true
221}
222
223// choose selects watchers from the watcher group to update
224func (wg *watcherGroup) choose(maxWatchers int, curRev, compactRev int64) (*watcherGroup, int64) {
225 if len(wg.watchers) < maxWatchers {
226 return wg, wg.chooseAll(curRev, compactRev)
227 }
228 ret := newWatcherGroup()
229 for w := range wg.watchers {
230 if maxWatchers <= 0 {
231 break
232 }
233 maxWatchers--
234 ret.add(w)
235 }
236 return &ret, ret.chooseAll(curRev, compactRev)
237}
238
239func (wg *watcherGroup) chooseAll(curRev, compactRev int64) int64 {
240 minRev := int64(math.MaxInt64)
241 for w := range wg.watchers {
242 if w.minRev > curRev {
243 // after network partition, possibly choosing future revision watcher from restore operation
244 // with watch key "proxy-namespace__lostleader" and revision "math.MaxInt64 - 2"
245 // do not panic when such watcher had been moved from "synced" watcher during restore operation
246 if !w.restore {
247 panic(fmt.Errorf("watcher minimum revision %d should not exceed current revision %d", w.minRev, curRev))
248 }
249
250 // mark 'restore' done, since it's chosen
251 w.restore = false
252 }
253 if w.minRev < compactRev {
254 select {
255 case w.ch <- WatchResponse{WatchID: w.id, CompactRevision: compactRev}:
256 w.compacted = true
257 wg.delete(w)
258 default:
259 // retry next time
260 }
261 continue
262 }
263 if minRev > w.minRev {
264 minRev = w.minRev
265 }
266 }
267 return minRev
268}
269
270// watcherSetByKey gets the set of watchers that receive events on the given key.
271func (wg *watcherGroup) watcherSetByKey(key string) watcherSet {
272 wkeys := wg.keyWatchers[key]
273 wranges := wg.ranges.Stab(adt.NewStringAffinePoint(key))
274
275 // zero-copy cases
276 switch {
277 case len(wranges) == 0:
278 // no need to merge ranges or copy; reuse single-key set
279 return wkeys
280 case len(wranges) == 0 && len(wkeys) == 0:
281 return nil
282 case len(wranges) == 1 && len(wkeys) == 0:
283 return wranges[0].Val.(watcherSet)
284 }
285
286 // copy case
287 ret := make(watcherSet)
288 ret.union(wg.keyWatchers[key])
289 for _, item := range wranges {
290 ret.union(item.Val.(watcherSet))
291 }
292 return ret
293}