// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mvcc

import (
	"sync"
	"time"

	"github.com/coreos/etcd/auth"
	"github.com/coreos/etcd/lease"
	"github.com/coreos/etcd/mvcc/backend"
	"github.com/coreos/etcd/mvcc/mvccpb"
)

// non-const so modifiable by tests
var (
	// chanBufLen is the length of the buffered chan
	// for sending out watched events.
	// See https://github.com/etcd-io/etcd/issues/11906 for more detail.
	chanBufLen = 128

	// maxWatchersPerSync is the number of watchers to sync in a single batch
	maxWatchersPerSync = 512
)

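// watchable is implemented by a store that supports watchers: it can register
// a watcher, report progress to it, and expose its current revision.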
type watchable interface {
	watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc)
	progress(w *watcher)
	rev() int64
}

type watchableStore struct {
	*store

	// mu protects watcher groups and batches. It should never be locked
	// before locking store.mu to avoid deadlock.
	mu sync.RWMutex

	// victims are watcher batches that were blocked on the watch channel
	victims []watcherBatch
	victimc chan struct{}

	// contains all unsynced watchers that need to sync with events that have happened
	unsynced watcherGroup

	// contains all synced watchers that are in sync with the progress of the store.
	// The key of the map is the key that the watcher watches on.
	synced watcherGroup

	stopc chan struct{}
	wg    sync.WaitGroup
}

// cancelFunc updates unsynced and synced maps when running
// cancel operations.
type cancelFunc func()

func New(b backend.Backend, le lease.Lessor, as auth.AuthStore, ig ConsistentIndexGetter) ConsistentWatchableKV {
	return newWatchableStore(b, le, as, ig)
}

func newWatchableStore(b backend.Backend, le lease.Lessor, as auth.AuthStore, ig ConsistentIndexGetter) *watchableStore {
	s := &watchableStore{
		store:    NewStore(b, le, ig),
		victimc:  make(chan struct{}, 1),
		unsynced: newWatcherGroup(),
		synced:   newWatcherGroup(),
		stopc:    make(chan struct{}),
	}
	s.store.ReadView = &readView{s}
	s.store.WriteView = &writeView{s}
	if s.le != nil {
		// use this store as the deleter so revokes trigger watch events
		s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() })
	}
	if as != nil {
		// TODO: encapsulate the consistent index into a separate package
		as.SetConsistentIndexSyncer(s.store.saveIndex)
	}
	s.wg.Add(2)
	go s.syncWatchersLoop()
	go s.syncVictimsLoop()
	return s
}

func (s *watchableStore) Close() error {
	close(s.stopc)
	s.wg.Wait()
	return s.store.Close()
}

func (s *watchableStore) NewWatchStream() WatchStream {
	watchStreamGauge.Inc()
	return &watchStream{
		watchable: s,
		ch:        make(chan WatchResponse, chanBufLen),
		cancels:   make(map[WatchID]cancelFunc),
		watchers:  make(map[WatchID]*watcher),
	}
}

func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {
	wa := &watcher{
		key:    key,
		end:    end,
		minRev: startRev,
		id:     id,
		ch:     ch,
		fcs:    fcs,
	}

	s.mu.Lock()
	s.revMu.RLock()
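	// a watcher starting at revision 0 or beyond the current revision has no
	// past events to replay, so it can join the synced group directly.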
	synced := startRev > s.store.currentRev || startRev == 0
	if synced {
		wa.minRev = s.store.currentRev + 1
		if startRev > wa.minRev {
			wa.minRev = startRev
		}
	}
	if synced {
		s.synced.add(wa)
	} else {
		slowWatcherGauge.Inc()
		s.unsynced.add(wa)
	}
	s.revMu.RUnlock()
	s.mu.Unlock()

	watcherGauge.Inc()

	return wa, func() { s.cancelWatcher(wa) }
}

// cancelWatcher removes references to the watcher from the watchableStore
func (s *watchableStore) cancelWatcher(wa *watcher) {
	for {
		s.mu.Lock()
		if s.unsynced.delete(wa) {
			slowWatcherGauge.Dec()
			break
		} else if s.synced.delete(wa) {
			break
		} else if wa.compacted {
			break
		} else if wa.ch == nil {
			// already canceled (e.g., cancel/close race)
			break
		}

		if !wa.victim {
			panic("watcher not victim but not in watch groups")
		}

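		// the watcher is parked as a victim; find its pending batch and remove it there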
		var victimBatch watcherBatch
		for _, wb := range s.victims {
			if wb[wa] != nil {
				victimBatch = wb
				break
			}
		}
		if victimBatch != nil {
			slowWatcherGauge.Dec()
			delete(victimBatch, wa)
			break
		}

		// victim being processed so not accessible; retry
		s.mu.Unlock()
		time.Sleep(time.Millisecond)
	}

	watcherGauge.Dec()
	wa.ch = nil
	s.mu.Unlock()
}

func (s *watchableStore) Restore(b backend.Backend) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	err := s.store.Restore(b)
	if err != nil {
		return err
	}

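	// every previously synced watcher must replay events from the restored
	// store, so move them all back into the unsynced group.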
	for wa := range s.synced.watchers {
		wa.restore = true
		s.unsynced.add(wa)
	}
	s.synced = newWatcherGroup()
	return nil
}

// syncWatchersLoop syncs the watchers in the unsynced map every 100ms.
func (s *watchableStore) syncWatchersLoop() {
	defer s.wg.Done()

	for {
		s.mu.RLock()
		st := time.Now()
		lastUnsyncedWatchers := s.unsynced.size()
		s.mu.RUnlock()

		unsyncedWatchers := 0
		if lastUnsyncedWatchers > 0 {
			unsyncedWatchers = s.syncWatchers()
		}
		syncDuration := time.Since(st)

		waitDuration := 100 * time.Millisecond
		// more work pending?
		if unsyncedWatchers != 0 && lastUnsyncedWatchers > unsyncedWatchers {
			// be fair to other store operations by yielding time taken
			waitDuration = syncDuration
		}

		select {
		case <-time.After(waitDuration):
		case <-s.stopc:
			return
		}
	}
}

// syncVictimsLoop tries to write precomputed watcher responses to
// watchers that had a blocked watcher channel
func (s *watchableStore) syncVictimsLoop() {
	defer s.wg.Done()

	for {
		for s.moveVictims() != 0 {
			// try to update all victim watchers
		}
		s.mu.RLock()
		isEmpty := len(s.victims) == 0
		s.mu.RUnlock()

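		// a nil channel blocks forever in a select, so when there are no
		// victims the loop waits only on victimc or stopc.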
		var tickc <-chan time.Time
		if !isEmpty {
			tickc = time.After(10 * time.Millisecond)
		}

		select {
		case <-tickc:
		case <-s.victimc:
		case <-s.stopc:
			return
		}
	}
}

// moveVictims tries to update watches with already pending event data
func (s *watchableStore) moveVictims() (moved int) {
	s.mu.Lock()
	victims := s.victims
	s.victims = nil
	s.mu.Unlock()

	var newVictim watcherBatch
	for _, wb := range victims {
		// try to send responses again
		for w, eb := range wb {
			// watcher has observed the store up to, but not including, w.minRev
			rev := w.minRev - 1
			if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
				pendingEventsGauge.Add(float64(len(eb.evs)))
			} else {
				if newVictim == nil {
					newVictim = make(watcherBatch)
				}
				newVictim[w] = eb
				continue
			}
			moved++
		}

		// assign completed victim watchers to the unsynced/synced groups
		s.mu.Lock()
		s.store.revMu.RLock()
		curRev := s.store.currentRev
		for w, eb := range wb {
			if newVictim != nil && newVictim[w] != nil {
				// couldn't send watch response; stays victim
				continue
			}
			w.victim = false
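			// moreRev is set when the batch could not carry all pending
			// events; the watcher resumes from that revision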
			if eb.moreRev != 0 {
				w.minRev = eb.moreRev
			}
			if w.minRev <= curRev {
				s.unsynced.add(w)
			} else {
				slowWatcherGauge.Dec()
				s.synced.add(w)
			}
		}
		s.store.revMu.RUnlock()
		s.mu.Unlock()
	}

	if len(newVictim) > 0 {
		s.mu.Lock()
		s.victims = append(s.victims, newVictim)
		s.mu.Unlock()
	}

	return moved
}

// syncWatchers syncs unsynced watchers by:
// 1. choose a set of watchers from the unsynced watcher group
// 2. iterate over the set to get the minimum revision and remove compacted watchers
// 3. use minimum revision to get all key-value pairs and send those events to watchers
// 4. remove synced watchers in set from unsynced group and move to synced group
func (s *watchableStore) syncWatchers() int {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.unsynced.size() == 0 {
		return 0
	}

	s.store.revMu.RLock()
	defer s.store.revMu.RUnlock()

	// in order to find key-value pairs for the unsynced watchers, we need the
	// minimum revision among them; that revision is used to query the backend
	// store for key-value pairs
	curRev := s.store.currentRev
	compactionRev := s.store.compactMainRev

	wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
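	// scan the backend for every revision in [minRev, curRev]; maxBytes is the exclusive upper bound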
	minBytes, maxBytes := newRevBytes(), newRevBytes()
	revToBytes(revision{main: minRev}, minBytes)
	revToBytes(revision{main: curRev + 1}, maxBytes)

	// UnsafeRange returns keys and values. And in boltdb, keys are revisions.
	// values are actual key-value pairs in backend.
	tx := s.store.b.ReadTx()
	tx.Lock()
	revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
	evs := kvsToEvents(wg, revs, vs)
	tx.Unlock()

	var victims watcherBatch
	wb := newWatcherBatch(wg, evs)
	for w := range wg.watchers {
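		// after this sync the watcher has observed everything up to curRev,
		// so it only needs to accept newer revisions from now on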
		w.minRev = curRev + 1

		eb, ok := wb[w]
		if !ok {
			// bring un-notified watcher to synced
			s.synced.add(w)
			s.unsynced.delete(w)
			continue
		}

		if eb.moreRev != 0 {
			w.minRev = eb.moreRev
		}

		if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}) {
			pendingEventsGauge.Add(float64(len(eb.evs)))
		} else {
			if victims == nil {
				victims = make(watcherBatch)
			}
			w.victim = true
		}

		if w.victim {
			victims[w] = eb
		} else {
			if eb.moreRev != 0 {
				// stay unsynced; more to read
				continue
			}
			s.synced.add(w)
		}
		s.unsynced.delete(w)
	}
	s.addVictim(victims)

	vsz := 0
	for _, v := range s.victims {
		vsz += len(v)
	}
	slowWatcherGauge.Set(float64(s.unsynced.size() + vsz))

	return s.unsynced.size()
}

// kvsToEvents gets all events for the watchers from all key-value pairs
func kvsToEvents(wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event) {
	for i, v := range vals {
		var kv mvccpb.KeyValue
		if err := kv.Unmarshal(v); err != nil {
			plog.Panicf("cannot unmarshal event: %v", err)
		}

		if !wg.contains(string(kv.Key)) {
			continue
		}

		ty := mvccpb.PUT
		if isTombstone(revs[i]) {
			ty = mvccpb.DELETE
			// patch in mod revision so watchers won't skip
			kv.ModRevision = bytesToRev(revs[i]).main
		}
		evs = append(evs, mvccpb.Event{Kv: &kv, Type: ty})
	}
	return evs
}

// notify sends out the events that happened at the given rev to the synced
// watchers that watch on the keys of those events.
func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
	var victim watcherBatch
	for w, eb := range newWatcherBatch(&s.synced, evs) {
		if eb.revs != 1 {
			plog.Panicf("unexpected multiple revisions in notification")
		}
		if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
			pendingEventsGauge.Add(float64(len(eb.evs)))
		} else {
			// move slow watcher to victims
			w.minRev = rev + 1
			if victim == nil {
				victim = make(watcherBatch)
			}
			w.victim = true
			victim[w] = eb
			s.synced.delete(w)
			slowWatcherGauge.Inc()
		}
	}
	s.addVictim(victim)
}

func (s *watchableStore) addVictim(victim watcherBatch) {
	if victim == nil {
		return
	}
	s.victims = append(s.victims, victim)
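	// wake up syncVictimsLoop without blocking if a signal is already pending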
	select {
	case s.victimc <- struct{}{}:
	default:
	}
}

func (s *watchableStore) rev() int64 { return s.store.Rev() }

func (s *watchableStore) progress(w *watcher) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	if _, ok := s.synced.watchers[w]; ok {
		w.send(WatchResponse{WatchID: w.id, Revision: s.rev()})
		// If the ch is full, this watcher is receiving events.
		// We do not need to send progress at all.
	}
}

type watcher struct {
	// the watcher key
	key []byte
	// end indicates the end of the range to watch.
	// If end is set, the watcher is on a range.
	end []byte

	// victim is set when ch is blocked and undergoing victim processing
	victim bool

	// compacted is set when the watcher is removed because of compaction
	compacted bool

	// restore is true when the watcher is being restored from a leader snapshot,
	// which means that this watcher has just been moved from the "synced" to the
	// "unsynced" watcher group, possibly with a future revision from when it was
	// first added to the synced group.
	// An "unsynced" watcher's revision must always be <= the current revision,
	// except when the watcher was moved out of the "synced" watcher group.
	restore bool

	// minRev is the minimum revision update the watcher will accept
	minRev int64
	id     WatchID

	fcs []FilterFunc
	// a chan to send out the watch response.
	// The chan might be shared with other watchers.
	ch chan<- WatchResponse
}

func (w *watcher) send(wr WatchResponse) bool {
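	// a response with no events is a progress notification; it is sent even if
	// the filters below would leave nothing to report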
	progressEvent := len(wr.Events) == 0

	if len(w.fcs) != 0 {
		ne := make([]mvccpb.Event, 0, len(wr.Events))
		for i := range wr.Events {
			filtered := false
			for _, filter := range w.fcs {
				if filter(wr.Events[i]) {
					filtered = true
					break
				}
			}
			if !filtered {
				ne = append(ne, wr.Events[i])
			}
		}
		wr.Events = ne
	}

	// if all events are filtered out, we should send nothing.
	if !progressEvent && len(wr.Events) == 0 {
		return true
	}
	select {
	case w.ch <- wr:
		return true
	default:
		return false
	}
}