blob: 1a497462f99b4d1c450e466a90dab0bc862810e5 [file] [log] [blame]
sslobodrd046be82019-01-16 10:02:22 -05001// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package grpcproxy
16
17import (
18 "time"
19
20 "github.com/coreos/etcd/clientv3"
21 pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
22 "github.com/coreos/etcd/mvcc"
23 "github.com/coreos/etcd/mvcc/mvccpb"
24)
25
// watchRange is the pair of strings (key and end) that identifies what a
// watcher observes.
type watchRange struct {
	key string
	end string
}

// valid reports whether the range is well formed. A range is accepted when
// end is empty, when end sorts strictly after key, or when end is exactly
// the single byte 0 (presumably the etcd "all keys >= key" marker; confirm
// against the etcd range_end convention).
func (wr *watchRange) valid() bool {
	switch {
	case len(wr.end) == 0:
		return true
	case wr.end > wr.key:
		return true
	default:
		// end is non-empty here, so indexing its first byte is safe.
		return len(wr.end) == 1 && wr.end[0] == 0
	}
}
33
34type watcher struct {
35 // user configuration
36
37 wr watchRange
38 filters []mvcc.FilterFunc
39 progress bool
40 prevKV bool
41
42 // id is the id returned to the client on its watch stream.
43 id int64
44 // nextrev is the minimum expected next event revision.
45 nextrev int64
46 // lastHeader has the last header sent over the stream.
47 lastHeader pb.ResponseHeader
48
49 // wps is the parent.
50 wps *watchProxyStream
51}
52
53// send filters out repeated events by discarding revisions older
54// than the last one sent over the watch channel.
55func (w *watcher) send(wr clientv3.WatchResponse) {
56 if wr.IsProgressNotify() && !w.progress {
57 return
58 }
59 if w.nextrev > wr.Header.Revision && len(wr.Events) > 0 {
60 return
61 }
62 if w.nextrev == 0 {
63 // current watch; expect updates following this revision
64 w.nextrev = wr.Header.Revision + 1
65 }
66
67 events := make([]*mvccpb.Event, 0, len(wr.Events))
68
69 var lastRev int64
70 for i := range wr.Events {
71 ev := (*mvccpb.Event)(wr.Events[i])
72 if ev.Kv.ModRevision < w.nextrev {
73 continue
74 } else {
75 // We cannot update w.rev here.
76 // txn can have multiple events with the same rev.
77 // If w.nextrev updates here, it would skip events in the same txn.
78 lastRev = ev.Kv.ModRevision
79 }
80
81 filtered := false
82 for _, filter := range w.filters {
83 if filter(*ev) {
84 filtered = true
85 break
86 }
87 }
88 if filtered {
89 continue
90 }
91
92 if !w.prevKV {
93 evCopy := *ev
94 evCopy.PrevKv = nil
95 ev = &evCopy
96 }
97 events = append(events, ev)
98 }
99
100 if lastRev >= w.nextrev {
101 w.nextrev = lastRev + 1
102 }
103
104 // all events are filtered out?
105 if !wr.IsProgressNotify() && !wr.Created && len(events) == 0 && wr.CompactRevision == 0 {
106 return
107 }
108
109 w.lastHeader = wr.Header
110 w.post(&pb.WatchResponse{
111 Header: &wr.Header,
112 Created: wr.Created,
113 CompactRevision: wr.CompactRevision,
114 Canceled: wr.Canceled,
115 WatchId: w.id,
116 Events: events,
117 })
118}
119
120// post puts a watch response on the watcher's proxy stream channel
121func (w *watcher) post(wr *pb.WatchResponse) bool {
122 select {
123 case w.wps.watchCh <- wr:
124 case <-time.After(50 * time.Millisecond):
125 w.wps.cancel()
126 return false
127 }
128 return true
129}