| // Copyright 2015 The etcd Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package mvcc |
| |
| import ( |
| "sync" |
| "time" |
| |
| "go.etcd.io/etcd/lease" |
| "go.etcd.io/etcd/mvcc/backend" |
| "go.etcd.io/etcd/mvcc/mvccpb" |
| "go.uber.org/zap" |
| ) |
| |
| // non-const so modifiable by tests |
| var ( |
| // chanBufLen is the length of the buffered chan |
| // for sending out watched events. |
| // TODO: find a good buf value. 1024 is just a random one that |
| // seems to be reasonable. |
| chanBufLen = 1024 |
| |
| // maxWatchersPerSync is the number of watchers to sync in a single batch |
| maxWatchersPerSync = 512 |
| ) |
| |
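// watchable is the internal interface a watchStream uses to register a
// watcher, request a progress notification, and read the current store
// revision.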
| type watchable interface { |
| watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) |
| progress(w *watcher) |
| rev() int64 |
| } |
| |
| type watchableStore struct { |
| *store |
| |
| // mu protects watcher groups and batches. It should never be locked |
| // before locking store.mu to avoid deadlock. |
| mu sync.RWMutex |
| |
| // victims are watcher batches that were blocked on the watch channel |
| victims []watcherBatch |
| victimc chan struct{} |
| |
	// contains all unsynced watchers that need to sync with events that have happened
| unsynced watcherGroup |
| |
| // contains all synced watchers that are in sync with the progress of the store. |
| // The key of the map is the key that the watcher watches on. |
| synced watcherGroup |
| |
| stopc chan struct{} |
| wg sync.WaitGroup |
| } |
| |
| // cancelFunc updates unsynced and synced maps when running |
| // cancel operations. |
| type cancelFunc func() |
| |
| func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) ConsistentWatchableKV { |
| return newWatchableStore(lg, b, le, ig, cfg) |
| } |
| |
| func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) *watchableStore { |
| s := &watchableStore{ |
| store: NewStore(lg, b, le, ig, cfg), |
| victimc: make(chan struct{}, 1), |
| unsynced: newWatcherGroup(), |
| synced: newWatcherGroup(), |
| stopc: make(chan struct{}), |
| } |
| s.store.ReadView = &readView{s} |
| s.store.WriteView = &writeView{s} |
| if s.le != nil { |
| // use this store as the deleter so revokes trigger watch events |
| s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() }) |
| } |
| s.wg.Add(2) |
| go s.syncWatchersLoop() |
| go s.syncVictimsLoop() |
| return s |
| } |
| |
| func (s *watchableStore) Close() error { |
| close(s.stopc) |
| s.wg.Wait() |
| return s.store.Close() |
| } |
| |
| func (s *watchableStore) NewWatchStream() WatchStream { |
| watchStreamGauge.Inc() |
| return &watchStream{ |
| watchable: s, |
| ch: make(chan WatchResponse, chanBufLen), |
| cancels: make(map[WatchID]cancelFunc), |
| watchers: make(map[WatchID]*watcher), |
| } |
| } |
| |
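// watch registers a watcher for the given key (or the range [key, end)) and
// returns it together with a cancelFunc. For example, with currentRev == 10:
// startRev == 0 or startRev > 10 places the watcher in the synced group with
// minRev = 11 (or startRev, if larger); startRev <= 10 places it in the
// unsynced group so syncWatchers can replay the missed events.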
| func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) { |
| wa := &watcher{ |
| key: key, |
| end: end, |
| minRev: startRev, |
| id: id, |
| ch: ch, |
| fcs: fcs, |
| } |
| |
| s.mu.Lock() |
| s.revMu.RLock() |
| synced := startRev > s.store.currentRev || startRev == 0 |
| if synced { |
| wa.minRev = s.store.currentRev + 1 |
| if startRev > wa.minRev { |
| wa.minRev = startRev |
| } |
| } |
| if synced { |
| s.synced.add(wa) |
| } else { |
| slowWatcherGauge.Inc() |
| s.unsynced.add(wa) |
| } |
| s.revMu.RUnlock() |
| s.mu.Unlock() |
| |
| watcherGauge.Inc() |
| |
| return wa, func() { s.cancelWatcher(wa) } |
| } |
| |
// cancelWatcher removes references to the watcher from the watchableStore
| func (s *watchableStore) cancelWatcher(wa *watcher) { |
| for { |
| s.mu.Lock() |
| if s.unsynced.delete(wa) { |
| slowWatcherGauge.Dec() |
| break |
| } else if s.synced.delete(wa) { |
| break |
| } else if wa.compacted { |
| break |
| } else if wa.ch == nil { |
| // already canceled (e.g., cancel/close race) |
| break |
| } |
| |
| if !wa.victim { |
| panic("watcher not victim but not in watch groups") |
| } |
| |
| var victimBatch watcherBatch |
| for _, wb := range s.victims { |
| if wb[wa] != nil { |
| victimBatch = wb |
| break |
| } |
| } |
| if victimBatch != nil { |
| slowWatcherGauge.Dec() |
| delete(victimBatch, wa) |
| break |
| } |
| |
| // victim being processed so not accessible; retry |
| s.mu.Unlock() |
| time.Sleep(time.Millisecond) |
| } |
| |
| watcherGauge.Dec() |
| wa.ch = nil |
| s.mu.Unlock() |
| } |
| |
| func (s *watchableStore) Restore(b backend.Backend) error { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| err := s.store.Restore(b) |
| if err != nil { |
| return err |
| } |
| |
| for wa := range s.synced.watchers { |
| wa.restore = true |
| s.unsynced.add(wa) |
| } |
| s.synced = newWatcherGroup() |
| return nil |
| } |
| |
// syncWatchersLoop syncs the watchers in the unsynced map every 100ms.
| func (s *watchableStore) syncWatchersLoop() { |
| defer s.wg.Done() |
| |
| for { |
| s.mu.RLock() |
| st := time.Now() |
| lastUnsyncedWatchers := s.unsynced.size() |
| s.mu.RUnlock() |
| |
| unsyncedWatchers := 0 |
| if lastUnsyncedWatchers > 0 { |
| unsyncedWatchers = s.syncWatchers() |
| } |
| syncDuration := time.Since(st) |
| |
| waitDuration := 100 * time.Millisecond |
| // more work pending? |
| if unsyncedWatchers != 0 && lastUnsyncedWatchers > unsyncedWatchers { |
| // be fair to other store operations by yielding time taken |
| waitDuration = syncDuration |
| } |
| |
| select { |
| case <-time.After(waitDuration): |
| case <-s.stopc: |
| return |
| } |
| } |
| } |
| |
| // syncVictimsLoop tries to write precomputed watcher responses to |
| // watchers that had a blocked watcher channel |
| func (s *watchableStore) syncVictimsLoop() { |
| defer s.wg.Done() |
| |
| for { |
| for s.moveVictims() != 0 { |
| // try to update all victim watchers |
| } |
| s.mu.RLock() |
| isEmpty := len(s.victims) == 0 |
| s.mu.RUnlock() |
| |
| var tickc <-chan time.Time |
| if !isEmpty { |
| tickc = time.After(10 * time.Millisecond) |
| } |
| |
| select { |
| case <-tickc: |
| case <-s.victimc: |
| case <-s.stopc: |
| return |
| } |
| } |
| } |
| |
// moveVictims tries to update victim watchers with already pending event data
| func (s *watchableStore) moveVictims() (moved int) { |
| s.mu.Lock() |
| victims := s.victims |
| s.victims = nil |
| s.mu.Unlock() |
| |
| var newVictim watcherBatch |
| for _, wb := range victims { |
| // try to send responses again |
| for w, eb := range wb { |
| // watcher has observed the store up to, but not including, w.minRev |
| rev := w.minRev - 1 |
| if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) { |
| pendingEventsGauge.Add(float64(len(eb.evs))) |
| } else { |
| if newVictim == nil { |
| newVictim = make(watcherBatch) |
| } |
| newVictim[w] = eb |
| continue |
| } |
| moved++ |
| } |
| |
		// assign completed victim watchers back to the unsynced/synced groups
| s.mu.Lock() |
| s.store.revMu.RLock() |
| curRev := s.store.currentRev |
| for w, eb := range wb { |
| if newVictim != nil && newVictim[w] != nil { |
| // couldn't send watch response; stays victim |
| continue |
| } |
| w.victim = false |
| if eb.moreRev != 0 { |
| w.minRev = eb.moreRev |
| } |
| if w.minRev <= curRev { |
| s.unsynced.add(w) |
| } else { |
| slowWatcherGauge.Dec() |
| s.synced.add(w) |
| } |
| } |
| s.store.revMu.RUnlock() |
| s.mu.Unlock() |
| } |
| |
| if len(newVictim) > 0 { |
| s.mu.Lock() |
| s.victims = append(s.victims, newVictim) |
| s.mu.Unlock() |
| } |
| |
| return moved |
| } |
| |
| // syncWatchers syncs unsynced watchers by: |
| // 1. choose a set of watchers from the unsynced watcher group |
| // 2. iterate over the set to get the minimum revision and remove compacted watchers |
| // 3. use minimum revision to get all key-value pairs and send those events to watchers |
| // 4. remove synced watchers in set from unsynced group and move to synced group |
| func (s *watchableStore) syncWatchers() int { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| |
| if s.unsynced.size() == 0 { |
| return 0 |
| } |
| |
| s.store.revMu.RLock() |
| defer s.store.revMu.RUnlock() |
| |
	// in order to find key-value pairs for the unsynced watchers, find the
	// minimum revision they still need; the backend is then ranged over
	// [minRev, currentRev+1) to fetch those key-value pairs
| curRev := s.store.currentRev |
| compactionRev := s.store.compactMainRev |
| |
| wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev) |
| minBytes, maxBytes := newRevBytes(), newRevBytes() |
| revToBytes(revision{main: minRev}, minBytes) |
| revToBytes(revision{main: curRev + 1}, maxBytes) |
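	// e.g., minRev == 7 and curRev == 10 ranges the backend over revisions
	// [7, 11), i.e. everything the slowest chosen watcher has not yet seen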
| |
	// UnsafeRange returns keys and values. In boltdb, keys are revisions and
	// values are the marshaled key-value pairs stored in the backend.
| tx := s.store.b.ReadTx() |
| tx.RLock() |
| revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0) |
| var evs []mvccpb.Event |
| if s.store != nil && s.store.lg != nil { |
| evs = kvsToEvents(s.store.lg, wg, revs, vs) |
| } else { |
| // TODO: remove this in v3.5 |
| evs = kvsToEvents(nil, wg, revs, vs) |
| } |
| tx.RUnlock() |
| |
| var victims watcherBatch |
| wb := newWatcherBatch(wg, evs) |
| for w := range wg.watchers { |
| w.minRev = curRev + 1 |
| |
| eb, ok := wb[w] |
| if !ok { |
| // bring un-notified watcher to synced |
| s.synced.add(w) |
| s.unsynced.delete(w) |
| continue |
| } |
| |
| if eb.moreRev != 0 { |
| w.minRev = eb.moreRev |
| } |
| |
| if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}) { |
| pendingEventsGauge.Add(float64(len(eb.evs))) |
| } else { |
| if victims == nil { |
| victims = make(watcherBatch) |
| } |
| w.victim = true |
| } |
| |
| if w.victim { |
| victims[w] = eb |
| } else { |
| if eb.moreRev != 0 { |
| // stay unsynced; more to read |
| continue |
| } |
| s.synced.add(w) |
| } |
| s.unsynced.delete(w) |
| } |
| s.addVictim(victims) |
| |
| vsz := 0 |
| for _, v := range s.victims { |
| vsz += len(v) |
| } |
| slowWatcherGauge.Set(float64(s.unsynced.size() + vsz)) |
| |
| return s.unsynced.size() |
| } |
| |
// kvsToEvents builds the events for the watchers in wg from the given
// revisions and key-value pairs, skipping keys no watcher is interested in
| func kvsToEvents(lg *zap.Logger, wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event) { |
| for i, v := range vals { |
| var kv mvccpb.KeyValue |
| if err := kv.Unmarshal(v); err != nil { |
| if lg != nil { |
| lg.Panic("failed to unmarshal mvccpb.KeyValue", zap.Error(err)) |
| } else { |
| plog.Panicf("cannot unmarshal event: %v", err) |
| } |
| } |
| |
| if !wg.contains(string(kv.Key)) { |
| continue |
| } |
| |
| ty := mvccpb.PUT |
| if isTombstone(revs[i]) { |
| ty = mvccpb.DELETE |
| // patch in mod revision so watchers won't skip |
| kv.ModRevision = bytesToRev(revs[i]).main |
| } |
| evs = append(evs, mvccpb.Event{Kv: &kv, Type: ty}) |
| } |
| return evs |
| } |
| |
// notify sends the events that happened at the given rev to the watchers
// that watch on the keys of those events.
| func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) { |
| var victim watcherBatch |
| for w, eb := range newWatcherBatch(&s.synced, evs) { |
| if eb.revs != 1 { |
| if s.store != nil && s.store.lg != nil { |
| s.store.lg.Panic( |
| "unexpected multiple revisions in watch notification", |
| zap.Int("number-of-revisions", eb.revs), |
| ) |
| } else { |
| plog.Panicf("unexpected multiple revisions in notification") |
| } |
| } |
| if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) { |
| pendingEventsGauge.Add(float64(len(eb.evs))) |
| } else { |
| // move slow watcher to victims |
| w.minRev = rev + 1 |
| if victim == nil { |
| victim = make(watcherBatch) |
| } |
| w.victim = true |
| victim[w] = eb |
| s.synced.delete(w) |
| slowWatcherGauge.Inc() |
| } |
| } |
| s.addVictim(victim) |
| } |
| |
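// addVictim queues a batch of blocked watchers and wakes syncVictimsLoop with
// a non-blocking signal on victimc.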
| func (s *watchableStore) addVictim(victim watcherBatch) { |
| if victim == nil { |
| return |
| } |
| s.victims = append(s.victims, victim) |
| select { |
| case s.victimc <- struct{}{}: |
| default: |
| } |
| } |
| |
| func (s *watchableStore) rev() int64 { return s.store.Rev() } |
| |
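// progress sends an empty WatchResponse carrying the current revision to the
// given watcher if it is synced, letting the client advance its progress
// notification without any new events.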
| func (s *watchableStore) progress(w *watcher) { |
| s.mu.RLock() |
| defer s.mu.RUnlock() |
| |
| if _, ok := s.synced.watchers[w]; ok { |
| w.send(WatchResponse{WatchID: w.id, Revision: s.rev()}) |
| // If the ch is full, this watcher is receiving events. |
| // We do not need to send progress at all. |
| } |
| } |
| |
| type watcher struct { |
| // the watcher key |
| key []byte |
| // end indicates the end of the range to watch. |
| // If end is set, the watcher is on a range. |
| end []byte |
| |
| // victim is set when ch is blocked and undergoing victim processing |
| victim bool |
| |
| // compacted is set when the watcher is removed because of compaction |
| compacted bool |
| |
	// restore is true when the watcher is being restored from a leader snapshot,
	// which means the watcher has just been moved from the "synced" to the
	// "unsynced" watcher group, possibly with a future revision from when it was
	// first added to the synced group.
	// An "unsynced" watcher's revision must always be <= the current revision,
	// except when the watcher has just been moved from the "synced" group this way.
| restore bool |
| |
| // minRev is the minimum revision update the watcher will accept |
| minRev int64 |
| id WatchID |
| |
| fcs []FilterFunc |
| // a chan to send out the watch response. |
| // The chan might be shared with other watchers. |
| ch chan<- WatchResponse |
| } |
| |
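// send applies the watcher's filters to wr and then attempts a non-blocking
// send on the watcher channel; it returns false if the channel is full so
// callers can mark the watcher as a victim.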
| func (w *watcher) send(wr WatchResponse) bool { |
| progressEvent := len(wr.Events) == 0 |
| |
| if len(w.fcs) != 0 { |
| ne := make([]mvccpb.Event, 0, len(wr.Events)) |
| for i := range wr.Events { |
| filtered := false |
| for _, filter := range w.fcs { |
| if filter(wr.Events[i]) { |
| filtered = true |
| break |
| } |
| } |
| if !filtered { |
| ne = append(ne, wr.Events[i]) |
| } |
| } |
| wr.Events = ne |
| } |
| |
| // if all events are filtered out, we should send nothing. |
| if !progressEvent && len(wr.Events) == 0 { |
| return true |
| } |
| select { |
| case w.ch <- wr: |
| return true |
| default: |
| return false |
| } |
| } |