// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mvcc

import (
	"sync"
	"time"

	"github.com/coreos/etcd/lease"
	"github.com/coreos/etcd/mvcc/backend"
	"github.com/coreos/etcd/mvcc/mvccpb"
)

// non-const so modifiable by tests
var (
	// chanBufLen is the length of the buffered chan
	// for sending out watched events.
	// TODO: find a good buf value. 1024 is just a random one that
	// seems to be reasonable.
	chanBufLen = 1024

	// maxWatchersPerSync is the number of watchers to sync in a single batch
	maxWatchersPerSync = 512
)

type watchable interface {
	watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc)
	progress(w *watcher)
	rev() int64
}

type watchableStore struct {
	*store

	// mu protects watcher groups and batches. It should never be locked
	// before locking store.mu to avoid deadlock.
	mu sync.RWMutex

	// victims are watcher batches that were blocked on the watch channel
	victims []watcherBatch
	victimc chan struct{}

	// contains all unsynced watchers that need to sync with events that have happened
	unsynced watcherGroup

	// contains all synced watchers that are in sync with the progress of the store.
	// The key of the map is the key that the watcher watches on.
	synced watcherGroup

	stopc chan struct{}
	wg    sync.WaitGroup
}

// cancelFunc updates unsynced and synced maps when running
// cancel operations.
type cancelFunc func()

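// New returns a ConsistentWatchableKV that layers watch support (synced and
// unsynced watcher groups plus victim batches for blocked channels) on top of
// the plain MVCC store backed by b.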
func New(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) ConsistentWatchableKV {
	return newWatchableStore(b, le, ig)
}

func newWatchableStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *watchableStore {
	s := &watchableStore{
		store:    NewStore(b, le, ig),
		victimc:  make(chan struct{}, 1),
		unsynced: newWatcherGroup(),
		synced:   newWatcherGroup(),
		stopc:    make(chan struct{}),
	}
	s.store.ReadView = &readView{s}
	s.store.WriteView = &writeView{s}
	if s.le != nil {
		// use this store as the deleter so revokes trigger watch events
		s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() })
	}
	s.wg.Add(2)
	go s.syncWatchersLoop()
	go s.syncVictimsLoop()
	return s
}

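// Close stops the background sync loops and closes the underlying store.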
func (s *watchableStore) Close() error {
	close(s.stopc)
	s.wg.Wait()
	return s.store.Close()
}

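// NewWatchStream returns a WatchStream whose watchers all share one buffered
// response channel of length chanBufLen.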
func (s *watchableStore) NewWatchStream() WatchStream {
	watchStreamGauge.Inc()
	return &watchStream{
		watchable: s,
		ch:        make(chan WatchResponse, chanBufLen),
		cancels:   make(map[WatchID]cancelFunc),
		watchers:  make(map[WatchID]*watcher),
	}
}

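// watch registers a watcher on the range [key, end) starting at startRev.
// A startRev of 0 or a start revision beyond the current revision puts the
// watcher straight into the synced group; otherwise it starts unsynced and is
// caught up by syncWatchersLoop.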
func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {
	wa := &watcher{
		key:    key,
		end:    end,
		minRev: startRev,
		id:     id,
		ch:     ch,
		fcs:    fcs,
	}

	s.mu.Lock()
	s.revMu.RLock()
	synced := startRev > s.store.currentRev || startRev == 0
	if synced {
		wa.minRev = s.store.currentRev + 1
		if startRev > wa.minRev {
			wa.minRev = startRev
		}
	}
	if synced {
		s.synced.add(wa)
	} else {
		slowWatcherGauge.Inc()
		s.unsynced.add(wa)
	}
	s.revMu.RUnlock()
	s.mu.Unlock()

	watcherGauge.Inc()

	return wa, func() { s.cancelWatcher(wa) }
}

// cancelWatcher removes references to the watcher from the watchableStore
func (s *watchableStore) cancelWatcher(wa *watcher) {
	for {
		s.mu.Lock()
		if s.unsynced.delete(wa) {
			slowWatcherGauge.Dec()
			break
		} else if s.synced.delete(wa) {
			break
		} else if wa.compacted {
			break
		} else if wa.ch == nil {
			// already canceled (e.g., cancel/close race)
			break
		}

		if !wa.victim {
			panic("watcher not victim but not in watch groups")
		}

		var victimBatch watcherBatch
		for _, wb := range s.victims {
			if wb[wa] != nil {
				victimBatch = wb
				break
			}
		}
		if victimBatch != nil {
			slowWatcherGauge.Dec()
			delete(victimBatch, wa)
			break
		}

		// victim being processed so not accessible; retry
		s.mu.Unlock()
		time.Sleep(time.Millisecond)
	}

	watcherGauge.Dec()
	wa.ch = nil
	s.mu.Unlock()
}

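// Restore reloads the store from the given backend and moves every synced
// watcher into the unsynced group so it re-syncs against the restored data.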
func (s *watchableStore) Restore(b backend.Backend) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	err := s.store.Restore(b)
	if err != nil {
		return err
	}

	for wa := range s.synced.watchers {
		wa.restore = true
		s.unsynced.add(wa)
	}
	s.synced = newWatcherGroup()
	return nil
}

// syncWatchersLoop syncs the watchers in the unsynced map every 100ms.
func (s *watchableStore) syncWatchersLoop() {
	defer s.wg.Done()

	for {
		s.mu.RLock()
		st := time.Now()
		lastUnsyncedWatchers := s.unsynced.size()
		s.mu.RUnlock()

		unsyncedWatchers := 0
		if lastUnsyncedWatchers > 0 {
			unsyncedWatchers = s.syncWatchers()
		}
		syncDuration := time.Since(st)

		waitDuration := 100 * time.Millisecond
		// more work pending?
		if unsyncedWatchers != 0 && lastUnsyncedWatchers > unsyncedWatchers {
			// be fair to other store operations by yielding time taken
			waitDuration = syncDuration
		}

		select {
		case <-time.After(waitDuration):
		case <-s.stopc:
			return
		}
	}
}

// syncVictimsLoop tries to write precomputed watcher responses to
// watchers that had a blocked watcher channel
func (s *watchableStore) syncVictimsLoop() {
	defer s.wg.Done()

	for {
		for s.moveVictims() != 0 {
			// try to update all victim watchers
		}
		s.mu.RLock()
		isEmpty := len(s.victims) == 0
		s.mu.RUnlock()

		var tickc <-chan time.Time
		if !isEmpty {
			tickc = time.After(10 * time.Millisecond)
		}

		select {
		case <-tickc:
		case <-s.victimc:
		case <-s.stopc:
			return
		}
	}
}

// moveVictims tries to update watches with already pending event data
func (s *watchableStore) moveVictims() (moved int) {
	s.mu.Lock()
	victims := s.victims
	s.victims = nil
	s.mu.Unlock()

	var newVictim watcherBatch
	for _, wb := range victims {
		// try to send responses again
		for w, eb := range wb {
			// watcher has observed the store up to, but not including, w.minRev
			rev := w.minRev - 1
			if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
				pendingEventsGauge.Add(float64(len(eb.evs)))
			} else {
				if newVictim == nil {
					newVictim = make(watcherBatch)
				}
				newVictim[w] = eb
				continue
			}
			moved++
		}

		// assign completed victim watchers to the unsynced/synced groups
		s.mu.Lock()
		s.store.revMu.RLock()
		curRev := s.store.currentRev
		for w, eb := range wb {
			if newVictim != nil && newVictim[w] != nil {
				// couldn't send watch response; stays victim
				continue
			}
			w.victim = false
			if eb.moreRev != 0 {
				w.minRev = eb.moreRev
			}
			if w.minRev <= curRev {
				s.unsynced.add(w)
			} else {
				slowWatcherGauge.Dec()
				s.synced.add(w)
			}
		}
		s.store.revMu.RUnlock()
		s.mu.Unlock()
	}

	if len(newVictim) > 0 {
		s.mu.Lock()
		s.victims = append(s.victims, newVictim)
		s.mu.Unlock()
	}

	return moved
}

// syncWatchers syncs unsynced watchers by:
// 1. choose a set of watchers from the unsynced watcher group
// 2. iterate over the set to get the minimum revision and remove compacted watchers
// 3. use minimum revision to get all key-value pairs and send those events to watchers
// 4. remove synced watchers in set from unsynced group and move to synced group
func (s *watchableStore) syncWatchers() int {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.unsynced.size() == 0 {
		return 0
	}

	s.store.revMu.RLock()
	defer s.store.revMu.RUnlock()

	// in order to find key-value pairs from unsynced watchers, we need to
	// find min revision index, and these revisions can be used to
	// query the backend store of key-value pairs
	curRev := s.store.currentRev
	compactionRev := s.store.compactMainRev

	wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
	minBytes, maxBytes := newRevBytes(), newRevBytes()
	revToBytes(revision{main: minRev}, minBytes)
	revToBytes(revision{main: curRev + 1}, maxBytes)

	// UnsafeRange returns keys and values. In boltdb, keys are revisions and
	// values are the actual key-value pairs in the backend.
	tx := s.store.b.ReadTx()
	tx.Lock()
	revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
	evs := kvsToEvents(wg, revs, vs)
	tx.Unlock()

	var victims watcherBatch
	wb := newWatcherBatch(wg, evs)
	for w := range wg.watchers {
		w.minRev = curRev + 1

		eb, ok := wb[w]
		if !ok {
			// bring un-notified watcher to synced
			s.synced.add(w)
			s.unsynced.delete(w)
			continue
		}

		if eb.moreRev != 0 {
			w.minRev = eb.moreRev
		}

		if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}) {
			pendingEventsGauge.Add(float64(len(eb.evs)))
		} else {
			if victims == nil {
				victims = make(watcherBatch)
			}
			w.victim = true
		}

		if w.victim {
			victims[w] = eb
		} else {
			if eb.moreRev != 0 {
				// stay unsynced; more to read
				continue
			}
			s.synced.add(w)
		}
		s.unsynced.delete(w)
	}
	s.addVictim(victims)

	vsz := 0
	for _, v := range s.victims {
		vsz += len(v)
	}
	slowWatcherGauge.Set(float64(s.unsynced.size() + vsz))

	return s.unsynced.size()
}

// kvsToEvents gets all events for the watchers from all key-value pairs
func kvsToEvents(wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event) {
	for i, v := range vals {
		var kv mvccpb.KeyValue
		if err := kv.Unmarshal(v); err != nil {
			plog.Panicf("cannot unmarshal event: %v", err)
		}

		if !wg.contains(string(kv.Key)) {
			continue
		}

		ty := mvccpb.PUT
		if isTombstone(revs[i]) {
			ty = mvccpb.DELETE
			// patch in mod revision so watchers won't skip
			kv.ModRevision = bytesToRev(revs[i]).main
		}
		evs = append(evs, mvccpb.Event{Kv: &kv, Type: ty})
	}
	return evs
}

// notify sends out the given events, which happened at the given rev, to the
// synced watchers that watch on the keys of the events.
func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
	var victim watcherBatch
	for w, eb := range newWatcherBatch(&s.synced, evs) {
		if eb.revs != 1 {
			plog.Panicf("unexpected multiple revisions in notification")
		}
		if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
			pendingEventsGauge.Add(float64(len(eb.evs)))
		} else {
			// move slow watcher to victims
			w.minRev = rev + 1
			if victim == nil {
				victim = make(watcherBatch)
			}
			w.victim = true
			victim[w] = eb
			s.synced.delete(w)
			slowWatcherGauge.Inc()
		}
	}
	s.addVictim(victim)
}

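// addVictim queues a batch of blocked watchers and signals syncVictimsLoop
// without blocking if a wakeup is already pending.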
func (s *watchableStore) addVictim(victim watcherBatch) {
	if victim == nil {
		return
	}
	s.victims = append(s.victims, victim)
	select {
	case s.victimc <- struct{}{}:
	default:
	}
}

func (s *watchableStore) rev() int64 { return s.store.Rev() }

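// progress sends an empty WatchResponse carrying the current revision to w,
// but only if w is synced and its channel has room.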
func (s *watchableStore) progress(w *watcher) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	if _, ok := s.synced.watchers[w]; ok {
		w.send(WatchResponse{WatchID: w.id, Revision: s.rev()})
		// If the ch is full, this watcher is receiving events.
		// We do not need to send progress at all.
	}
}

type watcher struct {
	// the watcher key
	key []byte
	// end indicates the end of the range to watch.
	// If end is set, the watcher is on a range.
	end []byte

	// victim is set when ch is blocked and undergoing victim processing
	victim bool

	// compacted is set when the watcher is removed because of compaction
	compacted bool

	// restore is true when the watcher is being restored from a leader snapshot,
	// which means that this watcher has just been moved from the "synced" to the
	// "unsynced" watcher group, possibly with a future revision from when it was
	// first added to the synced group. An "unsynced" watcher's revision must
	// always be <= the current revision, except when the watcher has just been
	// moved from the "synced" watcher group.
	restore bool

	// minRev is the minimum revision update the watcher will accept
	minRev int64
	id     WatchID

	fcs []FilterFunc
	// a chan to send out the watch response.
	// The chan might be shared with other watchers.
	ch chan<- WatchResponse
}

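// send filters wr through the watcher's FilterFuncs and attempts a
// non-blocking send on the watcher channel; it returns false when the channel
// is full so the caller can treat the watcher as a victim.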
func (w *watcher) send(wr WatchResponse) bool {
	progressEvent := len(wr.Events) == 0

	if len(w.fcs) != 0 {
		ne := make([]mvccpb.Event, 0, len(wr.Events))
		for i := range wr.Events {
			filtered := false
			for _, filter := range w.fcs {
				if filter(wr.Events[i]) {
					filtered = true
					break
				}
			}
			if !filtered {
				ne = append(ne, wr.Events[i])
			}
		}
		wr.Events = ne
	}

	// if all events are filtered out, we should send nothing.
	if !progressEvent && len(wr.Events) == 0 {
		return true
	}
	select {
	case w.ch <- wr:
		return true
	default:
		return false
	}
}