blob: 3cf491d1fdf2f909b84ccc1cbe702bafd862809b [file] [log] [blame]
khenaidoo59ce9dd2019-11-11 13:05:32 -05001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package mvcc
16
17import (
18 "sync"
19 "time"
20
21 "go.etcd.io/etcd/lease"
22 "go.etcd.io/etcd/mvcc/backend"
23 "go.etcd.io/etcd/mvcc/mvccpb"
24 "go.uber.org/zap"
25)
26
27// non-const so modifiable by tests
28var (
29 // chanBufLen is the length of the buffered chan
30 // for sending out watched events.
31 // TODO: find a good buf value. 1024 is just a random one that
32 // seems to be reasonable.
33 chanBufLen = 1024
34
35 // maxWatchersPerSync is the number of watchers to sync in a single batch
36 maxWatchersPerSync = 512
37)
38
39type watchable interface {
40 watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc)
41 progress(w *watcher)
42 rev() int64
43}
44
45type watchableStore struct {
46 *store
47
48 // mu protects watcher groups and batches. It should never be locked
49 // before locking store.mu to avoid deadlock.
50 mu sync.RWMutex
51
52 // victims are watcher batches that were blocked on the watch channel
53 victims []watcherBatch
54 victimc chan struct{}
55
56 // contains all unsynced watchers that needs to sync with events that have happened
57 unsynced watcherGroup
58
59 // contains all synced watchers that are in sync with the progress of the store.
60 // The key of the map is the key that the watcher watches on.
61 synced watcherGroup
62
63 stopc chan struct{}
64 wg sync.WaitGroup
65}
66
67// cancelFunc updates unsynced and synced maps when running
68// cancel operations.
69type cancelFunc func()
70
71func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) ConsistentWatchableKV {
72 return newWatchableStore(lg, b, le, ig, cfg)
73}
74
75func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) *watchableStore {
76 s := &watchableStore{
77 store: NewStore(lg, b, le, ig, cfg),
78 victimc: make(chan struct{}, 1),
79 unsynced: newWatcherGroup(),
80 synced: newWatcherGroup(),
81 stopc: make(chan struct{}),
82 }
83 s.store.ReadView = &readView{s}
84 s.store.WriteView = &writeView{s}
85 if s.le != nil {
86 // use this store as the deleter so revokes trigger watch events
87 s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() })
88 }
89 s.wg.Add(2)
90 go s.syncWatchersLoop()
91 go s.syncVictimsLoop()
92 return s
93}
94
95func (s *watchableStore) Close() error {
96 close(s.stopc)
97 s.wg.Wait()
98 return s.store.Close()
99}
100
101func (s *watchableStore) NewWatchStream() WatchStream {
102 watchStreamGauge.Inc()
103 return &watchStream{
104 watchable: s,
105 ch: make(chan WatchResponse, chanBufLen),
106 cancels: make(map[WatchID]cancelFunc),
107 watchers: make(map[WatchID]*watcher),
108 }
109}
110
111func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {
112 wa := &watcher{
113 key: key,
114 end: end,
115 minRev: startRev,
116 id: id,
117 ch: ch,
118 fcs: fcs,
119 }
120
121 s.mu.Lock()
122 s.revMu.RLock()
123 synced := startRev > s.store.currentRev || startRev == 0
124 if synced {
125 wa.minRev = s.store.currentRev + 1
126 if startRev > wa.minRev {
127 wa.minRev = startRev
128 }
129 }
130 if synced {
131 s.synced.add(wa)
132 } else {
133 slowWatcherGauge.Inc()
134 s.unsynced.add(wa)
135 }
136 s.revMu.RUnlock()
137 s.mu.Unlock()
138
139 watcherGauge.Inc()
140
141 return wa, func() { s.cancelWatcher(wa) }
142}
143
144// cancelWatcher removes references of the watcher from the watchableStore
145func (s *watchableStore) cancelWatcher(wa *watcher) {
146 for {
147 s.mu.Lock()
148 if s.unsynced.delete(wa) {
149 slowWatcherGauge.Dec()
150 break
151 } else if s.synced.delete(wa) {
152 break
153 } else if wa.compacted {
154 break
155 } else if wa.ch == nil {
156 // already canceled (e.g., cancel/close race)
157 break
158 }
159
160 if !wa.victim {
161 panic("watcher not victim but not in watch groups")
162 }
163
164 var victimBatch watcherBatch
165 for _, wb := range s.victims {
166 if wb[wa] != nil {
167 victimBatch = wb
168 break
169 }
170 }
171 if victimBatch != nil {
172 slowWatcherGauge.Dec()
173 delete(victimBatch, wa)
174 break
175 }
176
177 // victim being processed so not accessible; retry
178 s.mu.Unlock()
179 time.Sleep(time.Millisecond)
180 }
181
182 watcherGauge.Dec()
183 wa.ch = nil
184 s.mu.Unlock()
185}
186
187func (s *watchableStore) Restore(b backend.Backend) error {
188 s.mu.Lock()
189 defer s.mu.Unlock()
190 err := s.store.Restore(b)
191 if err != nil {
192 return err
193 }
194
195 for wa := range s.synced.watchers {
196 wa.restore = true
197 s.unsynced.add(wa)
198 }
199 s.synced = newWatcherGroup()
200 return nil
201}
202
203// syncWatchersLoop syncs the watcher in the unsynced map every 100ms.
204func (s *watchableStore) syncWatchersLoop() {
205 defer s.wg.Done()
206
207 for {
208 s.mu.RLock()
209 st := time.Now()
210 lastUnsyncedWatchers := s.unsynced.size()
211 s.mu.RUnlock()
212
213 unsyncedWatchers := 0
214 if lastUnsyncedWatchers > 0 {
215 unsyncedWatchers = s.syncWatchers()
216 }
217 syncDuration := time.Since(st)
218
219 waitDuration := 100 * time.Millisecond
220 // more work pending?
221 if unsyncedWatchers != 0 && lastUnsyncedWatchers > unsyncedWatchers {
222 // be fair to other store operations by yielding time taken
223 waitDuration = syncDuration
224 }
225
226 select {
227 case <-time.After(waitDuration):
228 case <-s.stopc:
229 return
230 }
231 }
232}
233
234// syncVictimsLoop tries to write precomputed watcher responses to
235// watchers that had a blocked watcher channel
236func (s *watchableStore) syncVictimsLoop() {
237 defer s.wg.Done()
238
239 for {
240 for s.moveVictims() != 0 {
241 // try to update all victim watchers
242 }
243 s.mu.RLock()
244 isEmpty := len(s.victims) == 0
245 s.mu.RUnlock()
246
247 var tickc <-chan time.Time
248 if !isEmpty {
249 tickc = time.After(10 * time.Millisecond)
250 }
251
252 select {
253 case <-tickc:
254 case <-s.victimc:
255 case <-s.stopc:
256 return
257 }
258 }
259}
260
261// moveVictims tries to update watches with already pending event data
262func (s *watchableStore) moveVictims() (moved int) {
263 s.mu.Lock()
264 victims := s.victims
265 s.victims = nil
266 s.mu.Unlock()
267
268 var newVictim watcherBatch
269 for _, wb := range victims {
270 // try to send responses again
271 for w, eb := range wb {
272 // watcher has observed the store up to, but not including, w.minRev
273 rev := w.minRev - 1
274 if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
275 pendingEventsGauge.Add(float64(len(eb.evs)))
276 } else {
277 if newVictim == nil {
278 newVictim = make(watcherBatch)
279 }
280 newVictim[w] = eb
281 continue
282 }
283 moved++
284 }
285
286 // assign completed victim watchers to unsync/sync
287 s.mu.Lock()
288 s.store.revMu.RLock()
289 curRev := s.store.currentRev
290 for w, eb := range wb {
291 if newVictim != nil && newVictim[w] != nil {
292 // couldn't send watch response; stays victim
293 continue
294 }
295 w.victim = false
296 if eb.moreRev != 0 {
297 w.minRev = eb.moreRev
298 }
299 if w.minRev <= curRev {
300 s.unsynced.add(w)
301 } else {
302 slowWatcherGauge.Dec()
303 s.synced.add(w)
304 }
305 }
306 s.store.revMu.RUnlock()
307 s.mu.Unlock()
308 }
309
310 if len(newVictim) > 0 {
311 s.mu.Lock()
312 s.victims = append(s.victims, newVictim)
313 s.mu.Unlock()
314 }
315
316 return moved
317}
318
319// syncWatchers syncs unsynced watchers by:
320// 1. choose a set of watchers from the unsynced watcher group
321// 2. iterate over the set to get the minimum revision and remove compacted watchers
322// 3. use minimum revision to get all key-value pairs and send those events to watchers
323// 4. remove synced watchers in set from unsynced group and move to synced group
324func (s *watchableStore) syncWatchers() int {
325 s.mu.Lock()
326 defer s.mu.Unlock()
327
328 if s.unsynced.size() == 0 {
329 return 0
330 }
331
332 s.store.revMu.RLock()
333 defer s.store.revMu.RUnlock()
334
335 // in order to find key-value pairs from unsynced watchers, we need to
336 // find min revision index, and these revisions can be used to
337 // query the backend store of key-value pairs
338 curRev := s.store.currentRev
339 compactionRev := s.store.compactMainRev
340
341 wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
342 minBytes, maxBytes := newRevBytes(), newRevBytes()
343 revToBytes(revision{main: minRev}, minBytes)
344 revToBytes(revision{main: curRev + 1}, maxBytes)
345
346 // UnsafeRange returns keys and values. And in boltdb, keys are revisions.
347 // values are actual key-value pairs in backend.
348 tx := s.store.b.ReadTx()
349 tx.RLock()
350 revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
351 var evs []mvccpb.Event
352 if s.store != nil && s.store.lg != nil {
353 evs = kvsToEvents(s.store.lg, wg, revs, vs)
354 } else {
355 // TODO: remove this in v3.5
356 evs = kvsToEvents(nil, wg, revs, vs)
357 }
358 tx.RUnlock()
359
360 var victims watcherBatch
361 wb := newWatcherBatch(wg, evs)
362 for w := range wg.watchers {
363 w.minRev = curRev + 1
364
365 eb, ok := wb[w]
366 if !ok {
367 // bring un-notified watcher to synced
368 s.synced.add(w)
369 s.unsynced.delete(w)
370 continue
371 }
372
373 if eb.moreRev != 0 {
374 w.minRev = eb.moreRev
375 }
376
377 if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}) {
378 pendingEventsGauge.Add(float64(len(eb.evs)))
379 } else {
380 if victims == nil {
381 victims = make(watcherBatch)
382 }
383 w.victim = true
384 }
385
386 if w.victim {
387 victims[w] = eb
388 } else {
389 if eb.moreRev != 0 {
390 // stay unsynced; more to read
391 continue
392 }
393 s.synced.add(w)
394 }
395 s.unsynced.delete(w)
396 }
397 s.addVictim(victims)
398
399 vsz := 0
400 for _, v := range s.victims {
401 vsz += len(v)
402 }
403 slowWatcherGauge.Set(float64(s.unsynced.size() + vsz))
404
405 return s.unsynced.size()
406}
407
408// kvsToEvents gets all events for the watchers from all key-value pairs
409func kvsToEvents(lg *zap.Logger, wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event) {
410 for i, v := range vals {
411 var kv mvccpb.KeyValue
412 if err := kv.Unmarshal(v); err != nil {
413 if lg != nil {
414 lg.Panic("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
415 } else {
416 plog.Panicf("cannot unmarshal event: %v", err)
417 }
418 }
419
420 if !wg.contains(string(kv.Key)) {
421 continue
422 }
423
424 ty := mvccpb.PUT
425 if isTombstone(revs[i]) {
426 ty = mvccpb.DELETE
427 // patch in mod revision so watchers won't skip
428 kv.ModRevision = bytesToRev(revs[i]).main
429 }
430 evs = append(evs, mvccpb.Event{Kv: &kv, Type: ty})
431 }
432 return evs
433}
434
435// notify notifies the fact that given event at the given rev just happened to
436// watchers that watch on the key of the event.
437func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
438 var victim watcherBatch
439 for w, eb := range newWatcherBatch(&s.synced, evs) {
440 if eb.revs != 1 {
441 if s.store != nil && s.store.lg != nil {
442 s.store.lg.Panic(
443 "unexpected multiple revisions in watch notification",
444 zap.Int("number-of-revisions", eb.revs),
445 )
446 } else {
447 plog.Panicf("unexpected multiple revisions in notification")
448 }
449 }
450 if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
451 pendingEventsGauge.Add(float64(len(eb.evs)))
452 } else {
453 // move slow watcher to victims
454 w.minRev = rev + 1
455 if victim == nil {
456 victim = make(watcherBatch)
457 }
458 w.victim = true
459 victim[w] = eb
460 s.synced.delete(w)
461 slowWatcherGauge.Inc()
462 }
463 }
464 s.addVictim(victim)
465}
466
467func (s *watchableStore) addVictim(victim watcherBatch) {
468 if victim == nil {
469 return
470 }
471 s.victims = append(s.victims, victim)
472 select {
473 case s.victimc <- struct{}{}:
474 default:
475 }
476}
477
478func (s *watchableStore) rev() int64 { return s.store.Rev() }
479
480func (s *watchableStore) progress(w *watcher) {
481 s.mu.RLock()
482 defer s.mu.RUnlock()
483
484 if _, ok := s.synced.watchers[w]; ok {
485 w.send(WatchResponse{WatchID: w.id, Revision: s.rev()})
486 // If the ch is full, this watcher is receiving events.
487 // We do not need to send progress at all.
488 }
489}
490
491type watcher struct {
492 // the watcher key
493 key []byte
494 // end indicates the end of the range to watch.
495 // If end is set, the watcher is on a range.
496 end []byte
497
498 // victim is set when ch is blocked and undergoing victim processing
499 victim bool
500
501 // compacted is set when the watcher is removed because of compaction
502 compacted bool
503
504 // restore is true when the watcher is being restored from leader snapshot
505 // which means that this watcher has just been moved from "synced" to "unsynced"
506 // watcher group, possibly with a future revision when it was first added
507 // to the synced watcher
508 // "unsynced" watcher revision must always be <= current revision,
509 // except when the watcher were to be moved from "synced" watcher group
510 restore bool
511
512 // minRev is the minimum revision update the watcher will accept
513 minRev int64
514 id WatchID
515
516 fcs []FilterFunc
517 // a chan to send out the watch response.
518 // The chan might be shared with other watchers.
519 ch chan<- WatchResponse
520}
521
522func (w *watcher) send(wr WatchResponse) bool {
523 progressEvent := len(wr.Events) == 0
524
525 if len(w.fcs) != 0 {
526 ne := make([]mvccpb.Event, 0, len(wr.Events))
527 for i := range wr.Events {
528 filtered := false
529 for _, filter := range w.fcs {
530 if filter(wr.Events[i]) {
531 filtered = true
532 break
533 }
534 }
535 if !filtered {
536 ne = append(ne, wr.Events[i])
537 }
538 }
539 wr.Events = ne
540 }
541
542 // if all events are filtered out, we should send nothing.
543 if !progressEvent && len(wr.Events) == 0 {
544 return true
545 }
546 select {
547 case w.ch <- wr:
548 return true
549 default:
550 return false
551 }
552}