khenaidoo | ffe076b | 2019-01-15 16:08:08 -0500 | [diff] [blame^] | 1 | // Copyright 2016 The etcd Authors |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | package grpcproxy |
| 16 | |
| 17 | import ( |
| 18 | "time" |
| 19 | |
| 20 | "github.com/coreos/etcd/clientv3" |
| 21 | pb "github.com/coreos/etcd/etcdserver/etcdserverpb" |
| 22 | "github.com/coreos/etcd/mvcc" |
| 23 | "github.com/coreos/etcd/mvcc/mvccpb" |
| 24 | ) |
| 25 | |
| 26 | type watchRange struct { |
| 27 | key, end string |
| 28 | } |
| 29 | |
| 30 | func (wr *watchRange) valid() bool { |
| 31 | return len(wr.end) == 0 || wr.end > wr.key || (wr.end[0] == 0 && len(wr.end) == 1) |
| 32 | } |
| 33 | |
| 34 | type watcher struct { |
| 35 | // user configuration |
| 36 | |
| 37 | wr watchRange |
| 38 | filters []mvcc.FilterFunc |
| 39 | progress bool |
| 40 | prevKV bool |
| 41 | |
| 42 | // id is the id returned to the client on its watch stream. |
| 43 | id int64 |
| 44 | // nextrev is the minimum expected next event revision. |
| 45 | nextrev int64 |
| 46 | // lastHeader has the last header sent over the stream. |
| 47 | lastHeader pb.ResponseHeader |
| 48 | |
| 49 | // wps is the parent. |
| 50 | wps *watchProxyStream |
| 51 | } |
| 52 | |
| 53 | // send filters out repeated events by discarding revisions older |
| 54 | // than the last one sent over the watch channel. |
| 55 | func (w *watcher) send(wr clientv3.WatchResponse) { |
| 56 | if wr.IsProgressNotify() && !w.progress { |
| 57 | return |
| 58 | } |
| 59 | if w.nextrev > wr.Header.Revision && len(wr.Events) > 0 { |
| 60 | return |
| 61 | } |
| 62 | if w.nextrev == 0 { |
| 63 | // current watch; expect updates following this revision |
| 64 | w.nextrev = wr.Header.Revision + 1 |
| 65 | } |
| 66 | |
| 67 | events := make([]*mvccpb.Event, 0, len(wr.Events)) |
| 68 | |
| 69 | var lastRev int64 |
| 70 | for i := range wr.Events { |
| 71 | ev := (*mvccpb.Event)(wr.Events[i]) |
| 72 | if ev.Kv.ModRevision < w.nextrev { |
| 73 | continue |
| 74 | } else { |
| 75 | // We cannot update w.rev here. |
| 76 | // txn can have multiple events with the same rev. |
| 77 | // If w.nextrev updates here, it would skip events in the same txn. |
| 78 | lastRev = ev.Kv.ModRevision |
| 79 | } |
| 80 | |
| 81 | filtered := false |
| 82 | for _, filter := range w.filters { |
| 83 | if filter(*ev) { |
| 84 | filtered = true |
| 85 | break |
| 86 | } |
| 87 | } |
| 88 | if filtered { |
| 89 | continue |
| 90 | } |
| 91 | |
| 92 | if !w.prevKV { |
| 93 | evCopy := *ev |
| 94 | evCopy.PrevKv = nil |
| 95 | ev = &evCopy |
| 96 | } |
| 97 | events = append(events, ev) |
| 98 | } |
| 99 | |
| 100 | if lastRev >= w.nextrev { |
| 101 | w.nextrev = lastRev + 1 |
| 102 | } |
| 103 | |
| 104 | // all events are filtered out? |
| 105 | if !wr.IsProgressNotify() && !wr.Created && len(events) == 0 && wr.CompactRevision == 0 { |
| 106 | return |
| 107 | } |
| 108 | |
| 109 | w.lastHeader = wr.Header |
| 110 | w.post(&pb.WatchResponse{ |
| 111 | Header: &wr.Header, |
| 112 | Created: wr.Created, |
| 113 | CompactRevision: wr.CompactRevision, |
| 114 | Canceled: wr.Canceled, |
| 115 | WatchId: w.id, |
| 116 | Events: events, |
| 117 | }) |
| 118 | } |
| 119 | |
| 120 | // post puts a watch response on the watcher's proxy stream channel |
| 121 | func (w *watcher) post(wr *pb.WatchResponse) bool { |
| 122 | select { |
| 123 | case w.wps.watchCh <- wr: |
| 124 | case <-time.After(50 * time.Millisecond): |
| 125 | w.wps.cancel() |
| 126 | return false |
| 127 | } |
| 128 | return true |
| 129 | } |