blob: 603095f27f1c39db456cbcc9ff4c3fd3a10a7981 [file] [log] [blame]
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package grpcproxy
16
17import (
18 "context"
19 "sync"
20
21 "github.com/coreos/etcd/clientv3"
22 "github.com/coreos/etcd/etcdserver/api/v3rpc"
23 "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
24 pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
25
26 "google.golang.org/grpc"
27 "google.golang.org/grpc/metadata"
28)
29
30type watchProxy struct {
31 cw clientv3.Watcher
32 ctx context.Context
33
34 leader *leader
35
36 ranges *watchRanges
37
38 // mu protects adding outstanding watch servers through wg.
39 mu sync.Mutex
40
41 // wg waits until all outstanding watch servers quit.
42 wg sync.WaitGroup
43
44 // kv is used for permission checking
45 kv clientv3.KV
46}
47
48func NewWatchProxy(c *clientv3.Client) (pb.WatchServer, <-chan struct{}) {
49 cctx, cancel := context.WithCancel(c.Ctx())
50 wp := &watchProxy{
51 cw: c.Watcher,
52 ctx: cctx,
53 leader: newLeader(c.Ctx(), c.Watcher),
54
55 kv: c.KV, // for permission checking
56 }
57 wp.ranges = newWatchRanges(wp)
58 ch := make(chan struct{})
59 go func() {
60 defer close(ch)
61 <-wp.leader.stopNotify()
62 wp.mu.Lock()
63 select {
64 case <-wp.ctx.Done():
65 case <-wp.leader.disconnectNotify():
66 cancel()
67 }
68 <-wp.ctx.Done()
69 wp.mu.Unlock()
70 wp.wg.Wait()
71 wp.ranges.stop()
72 }()
73 return wp, ch
74}
75
76func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
77 wp.mu.Lock()
78 select {
79 case <-wp.ctx.Done():
80 wp.mu.Unlock()
81 select {
82 case <-wp.leader.disconnectNotify():
83 return grpc.ErrClientConnClosing
84 default:
85 return wp.ctx.Err()
86 }
87 default:
88 wp.wg.Add(1)
89 }
90 wp.mu.Unlock()
91
92 ctx, cancel := context.WithCancel(stream.Context())
93 wps := &watchProxyStream{
94 ranges: wp.ranges,
95 watchers: make(map[int64]*watcher),
96 stream: stream,
97 watchCh: make(chan *pb.WatchResponse, 1024),
98 ctx: ctx,
99 cancel: cancel,
100 kv: wp.kv,
101 }
102
103 var lostLeaderC <-chan struct{}
104 if md, ok := metadata.FromOutgoingContext(stream.Context()); ok {
105 v := md[rpctypes.MetadataRequireLeaderKey]
106 if len(v) > 0 && v[0] == rpctypes.MetadataHasLeader {
107 lostLeaderC = wp.leader.lostNotify()
108 // if leader is known to be lost at creation time, avoid
109 // letting events through at all
110 select {
111 case <-lostLeaderC:
112 wp.wg.Done()
113 return rpctypes.ErrNoLeader
114 default:
115 }
116 }
117 }
118
119 // post to stopc => terminate server stream; can't use a waitgroup
120 // since all goroutines will only terminate after Watch() exits.
121 stopc := make(chan struct{}, 3)
122 go func() {
123 defer func() { stopc <- struct{}{} }()
124 wps.recvLoop()
125 }()
126 go func() {
127 defer func() { stopc <- struct{}{} }()
128 wps.sendLoop()
129 }()
130 // tear down watch if leader goes down or entire watch proxy is terminated
131 go func() {
132 defer func() { stopc <- struct{}{} }()
133 select {
134 case <-lostLeaderC:
135 case <-ctx.Done():
136 case <-wp.ctx.Done():
137 }
138 }()
139
140 <-stopc
141 cancel()
142
143 // recv/send may only shutdown after function exits;
144 // goroutine notifies proxy that stream is through
145 go func() {
146 <-stopc
147 <-stopc
148 wps.close()
149 wp.wg.Done()
150 }()
151
152 select {
153 case <-lostLeaderC:
154 return rpctypes.ErrNoLeader
155 case <-wp.leader.disconnectNotify():
156 return grpc.ErrClientConnClosing
157 default:
158 return wps.ctx.Err()
159 }
160}
161
162// watchProxyStream forwards etcd watch events to a proxied client stream.
163type watchProxyStream struct {
164 ranges *watchRanges
165
166 // mu protects watchers and nextWatcherID
167 mu sync.Mutex
168 // watchers receive events from watch broadcast.
169 watchers map[int64]*watcher
170 // nextWatcherID is the id to assign the next watcher on this stream.
171 nextWatcherID int64
172
173 stream pb.Watch_WatchServer
174
175 // watchCh receives watch responses from the watchers.
176 watchCh chan *pb.WatchResponse
177
178 ctx context.Context
179 cancel context.CancelFunc
180
181 // kv is used for permission checking
182 kv clientv3.KV
183}
184
185func (wps *watchProxyStream) close() {
186 var wg sync.WaitGroup
187 wps.cancel()
188 wps.mu.Lock()
189 wg.Add(len(wps.watchers))
190 for _, wpsw := range wps.watchers {
191 go func(w *watcher) {
192 wps.ranges.delete(w)
193 wg.Done()
194 }(wpsw)
195 }
196 wps.watchers = nil
197 wps.mu.Unlock()
198
199 wg.Wait()
200
201 close(wps.watchCh)
202}
203
204func (wps *watchProxyStream) checkPermissionForWatch(key, rangeEnd []byte) error {
205 if len(key) == 0 {
206 // If the length of the key is 0, we need to obtain full range.
207 // look at clientv3.WithPrefix()
208 key = []byte{0}
209 rangeEnd = []byte{0}
210 }
211 req := &pb.RangeRequest{
212 Serializable: true,
213 Key: key,
214 RangeEnd: rangeEnd,
215 CountOnly: true,
216 Limit: 1,
217 }
218 _, err := wps.kv.Do(wps.ctx, RangeRequestToOp(req))
219 return err
220}
221
222func (wps *watchProxyStream) recvLoop() error {
223 for {
224 req, err := wps.stream.Recv()
225 if err != nil {
226 return err
227 }
228 switch uv := req.RequestUnion.(type) {
229 case *pb.WatchRequest_CreateRequest:
230 cr := uv.CreateRequest
231
232 if err = wps.checkPermissionForWatch(cr.Key, cr.RangeEnd); err != nil && err == rpctypes.ErrPermissionDenied {
233 // Return WatchResponse which is caused by permission checking if and only if
234 // the error is permission denied. For other errors (e.g. timeout or connection closed),
235 // the permission checking mechanism should do nothing for preserving error code.
236 wps.watchCh <- &pb.WatchResponse{Header: &pb.ResponseHeader{}, WatchId: -1, Created: true, Canceled: true}
237 continue
238 }
239
240 w := &watcher{
241 wr: watchRange{string(cr.Key), string(cr.RangeEnd)},
242 id: wps.nextWatcherID,
243 wps: wps,
244
245 nextrev: cr.StartRevision,
246 progress: cr.ProgressNotify,
247 prevKV: cr.PrevKv,
248 filters: v3rpc.FiltersFromRequest(cr),
249 }
250 if !w.wr.valid() {
251 w.post(&pb.WatchResponse{WatchId: -1, Created: true, Canceled: true})
252 continue
253 }
254 wps.nextWatcherID++
255 w.nextrev = cr.StartRevision
256 wps.watchers[w.id] = w
257 wps.ranges.add(w)
258 case *pb.WatchRequest_CancelRequest:
259 wps.delete(uv.CancelRequest.WatchId)
260 default:
261 panic("not implemented")
262 }
263 }
264}
265
266func (wps *watchProxyStream) sendLoop() {
267 for {
268 select {
269 case wresp, ok := <-wps.watchCh:
270 if !ok {
271 return
272 }
273 if err := wps.stream.Send(wresp); err != nil {
274 return
275 }
276 case <-wps.ctx.Done():
277 return
278 }
279 }
280}
281
282func (wps *watchProxyStream) delete(id int64) {
283 wps.mu.Lock()
284 defer wps.mu.Unlock()
285
286 w, ok := wps.watchers[id]
287 if !ok {
288 return
289 }
290 wps.ranges.delete(w)
291 delete(wps.watchers, id)
292 resp := &pb.WatchResponse{
293 Header: &w.lastHeader,
294 WatchId: id,
295 Canceled: true,
296 }
297 wps.watchCh <- resp
298}