// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package v3rpc

import (
	"context"
	"io"
	"math/rand"
	"sync"
	"time"

	"go.etcd.io/etcd/auth"
	"go.etcd.io/etcd/etcdserver"
	"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
	pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
	"go.etcd.io/etcd/mvcc"
	"go.etcd.io/etcd/mvcc/mvccpb"

	"go.uber.org/zap"
)

type watchServer struct {
	lg *zap.Logger

	clusterID int64
	memberID  int64

	maxRequestBytes int

	sg        etcdserver.RaftStatusGetter
	watchable mvcc.WatchableKV
	ag        AuthGetter
}

// NewWatchServer returns a new watch server.
func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer {
	return &watchServer{
		lg: s.Cfg.Logger,

		clusterID: int64(s.Cluster().ID()),
		memberID:  int64(s.ID()),

		maxRequestBytes: int(s.Cfg.MaxRequestBytes + grpcOverheadBytes),

		sg:        s,
		watchable: s.Watchable(),
		ag:        s,
	}
}

var (
	// progressReportInterval is how often the server sends progress
	// notifications to watchers. External tests can read it with
	// GetProgressReportInterval() and change it to a small value with
	// SetProgressReportInterval() to finish fast.
	progressReportInterval   = 10 * time.Minute
	progressReportIntervalMu sync.RWMutex
)

// GetProgressReportInterval returns the current progress report interval (for testing).
func GetProgressReportInterval() time.Duration {
	progressReportIntervalMu.RLock()
	interval := progressReportInterval
	progressReportIntervalMu.RUnlock()

	// add rand(1/10*progressReportInterval) as jitter so that the etcdserver does not
	// send progress notifications to watchers around the same time even when the watchers
	// were created around the same time (which is common when a client restarts itself).
	jitter := time.Duration(rand.Int63n(int64(interval) / 10))

	return interval + jitter
}

// SetProgressReportInterval updates the current progress report interval (for testing).
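// For example, an external test might shrink the interval before opening
// watchers so that progress notifications arrive quickly (an illustrative
// sketch only; the exact duration is arbitrary):
//
//	SetProgressReportInterval(100 * time.Millisecond)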
func SetProgressReportInterval(newTimeout time.Duration) {
	progressReportIntervalMu.Lock()
	progressReportInterval = newTimeout
	progressReportIntervalMu.Unlock()
}

// We send ctrl responses inside the read loop. We do not want the send to
// block the read, but we still want the ctrl responses we send to be
// serialized. Thus we use a buffered chan to solve the problem.
// A small buffer should be OK for most cases, since we expect ctrl
// requests to be infrequent.
const ctrlStreamBufLen = 16

// serverWatchStream is an etcd server side stream. It receives requests
// from the client-side gRPC stream, receives watch events from mvcc.WatchStream,
// and creates responses that are forwarded to the gRPC stream.
// It also forwards control messages such as watch created and canceled.
type serverWatchStream struct {
	lg *zap.Logger

	clusterID int64
	memberID  int64

	maxRequestBytes int

	sg        etcdserver.RaftStatusGetter
	watchable mvcc.WatchableKV
	ag        AuthGetter

	gRPCStream  pb.Watch_WatchServer
	watchStream mvcc.WatchStream
	ctrlStream  chan *pb.WatchResponse

	// mu protects progress, prevKV, fragment
	mu sync.RWMutex
	// progress tracks the watch IDs that the stream might need to send
	// progress notifications for.
	// TODO: combine progress and prevKV into a single struct?
	progress map[mvcc.WatchID]bool
	// prevKV records the watch IDs that need to return the previous key-value pair.
	prevKV map[mvcc.WatchID]bool
	// fragment records the watch IDs whose responses may be fragmented.
	fragment map[mvcc.WatchID]bool

	// closec indicates the stream is closed.
	closec chan struct{}

	// wg waits for the send loop to complete
	wg sync.WaitGroup
}

func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
	sws := serverWatchStream{
		lg: ws.lg,

		clusterID: ws.clusterID,
		memberID:  ws.memberID,

		maxRequestBytes: ws.maxRequestBytes,

		sg:        ws.sg,
		watchable: ws.watchable,
		ag:        ws.ag,

		gRPCStream:  stream,
		watchStream: ws.watchable.NewWatchStream(),
		// chan for sending control responses like watcher created and canceled.
		ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen),

		progress: make(map[mvcc.WatchID]bool),
		prevKV:   make(map[mvcc.WatchID]bool),
		fragment: make(map[mvcc.WatchID]bool),

		closec: make(chan struct{}),
	}

	sws.wg.Add(1)
	go func() {
		sws.sendLoop()
		sws.wg.Done()
	}()

	errc := make(chan error, 1)
	// Ideally recvLoop would also use sws.wg to signal its completion
	// but when stream.Context().Done() is closed, the stream's recv
	// may continue to block since it uses a different context, leading to
	// deadlock when calling sws.close().
	go func() {
		if rerr := sws.recvLoop(); rerr != nil {
			if isClientCtxErr(stream.Context().Err(), rerr) {
				if sws.lg != nil {
					sws.lg.Debug("failed to receive watch request from gRPC stream", zap.Error(rerr))
				} else {
					plog.Debugf("failed to receive watch request from gRPC stream (%q)", rerr.Error())
				}
			} else {
				if sws.lg != nil {
					sws.lg.Warn("failed to receive watch request from gRPC stream", zap.Error(rerr))
				} else {
					plog.Warningf("failed to receive watch request from gRPC stream (%q)", rerr.Error())
				}
				streamFailures.WithLabelValues("receive", "watch").Inc()
			}
			errc <- rerr
		}
	}()

	select {
	case err = <-errc:
		close(sws.ctrlStream)

	case <-stream.Context().Done():
		err = stream.Context().Err()
		// the only server-side cancellation is noleader for now.
		if err == context.Canceled {
			err = rpctypes.ErrGRPCNoLeader
		}
	}

	sws.close()
	return err
}

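// isWatchPermitted checks, against the auth store, whether the caller on the
// current gRPC stream is allowed to range over the requested [key, range_end).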
func (sws *serverWatchStream) isWatchPermitted(wcr *pb.WatchCreateRequest) bool {
	authInfo, err := sws.ag.AuthInfoFromCtx(sws.gRPCStream.Context())
	if err != nil {
		return false
	}
	if authInfo == nil {
		// if auth is enabled, IsRangePermitted() can cause an error
		authInfo = &auth.AuthInfo{}
	}
	return sws.ag.AuthStore().IsRangePermitted(authInfo, wcr.Key, wcr.RangeEnd) == nil
}

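// recvLoop reads watch requests from the client's gRPC stream and handles the
// three request types: create (register a new watcher and reply with a created
// control response), cancel (tear down a watcher and reply with a canceled
// control response), and progress (request an immediate progress notification).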
func (sws *serverWatchStream) recvLoop() error {
	for {
		req, err := sws.gRPCStream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}

		switch uv := req.RequestUnion.(type) {
		case *pb.WatchRequest_CreateRequest:
			if uv.CreateRequest == nil {
				break
			}

			creq := uv.CreateRequest
			if len(creq.Key) == 0 {
				// \x00 is the smallest key
				creq.Key = []byte{0}
			}
			if len(creq.RangeEnd) == 0 {
				// force nil since watchstream.Watch distinguishes
				// between nil and []byte{} for single key / >=
				creq.RangeEnd = nil
			}
			if len(creq.RangeEnd) == 1 && creq.RangeEnd[0] == 0 {
				// support >= key queries
				creq.RangeEnd = []byte{}
			}
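			// After the normalization above: RangeEnd == nil requests a
			// single-key watch on Key, and RangeEnd == []byte{} requests a
			// watch on all keys >= Key (so Key == "\x00" with an empty
			// RangeEnd covers the whole keyspace).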

			if !sws.isWatchPermitted(creq) {
				wr := &pb.WatchResponse{
					Header:       sws.newResponseHeader(sws.watchStream.Rev()),
					WatchId:      creq.WatchId,
					Canceled:     true,
					Created:      true,
					CancelReason: rpctypes.ErrGRPCPermissionDenied.Error(),
				}

				select {
				case sws.ctrlStream <- wr:
				case <-sws.closec:
				}
				return nil
			}

			filters := FiltersFromRequest(creq)

			wsrev := sws.watchStream.Rev()
			rev := creq.StartRevision
			if rev == 0 {
				rev = wsrev + 1
			}
			id, err := sws.watchStream.Watch(mvcc.WatchID(creq.WatchId), creq.Key, creq.RangeEnd, rev, filters...)
			if err == nil {
				sws.mu.Lock()
				if creq.ProgressNotify {
					sws.progress[id] = true
				}
				if creq.PrevKv {
					sws.prevKV[id] = true
				}
				if creq.Fragment {
					sws.fragment[id] = true
				}
				sws.mu.Unlock()
			}
			wr := &pb.WatchResponse{
				Header:   sws.newResponseHeader(wsrev),
				WatchId:  int64(id),
				Created:  true,
				Canceled: err != nil,
			}
			if err != nil {
				wr.CancelReason = err.Error()
			}
			select {
			case sws.ctrlStream <- wr:
			case <-sws.closec:
				return nil
			}

		case *pb.WatchRequest_CancelRequest:
			if uv.CancelRequest != nil {
				id := uv.CancelRequest.WatchId
				err := sws.watchStream.Cancel(mvcc.WatchID(id))
				if err == nil {
					sws.ctrlStream <- &pb.WatchResponse{
						Header:   sws.newResponseHeader(sws.watchStream.Rev()),
						WatchId:  id,
						Canceled: true,
					}
					sws.mu.Lock()
					delete(sws.progress, mvcc.WatchID(id))
					delete(sws.prevKV, mvcc.WatchID(id))
					delete(sws.fragment, mvcc.WatchID(id))
					sws.mu.Unlock()
				}
			}
		case *pb.WatchRequest_ProgressRequest:
			if uv.ProgressRequest != nil {
				sws.ctrlStream <- &pb.WatchResponse{
					Header:  sws.newResponseHeader(sws.watchStream.Rev()),
					WatchId: -1, // response is not associated with any WatchId and will be broadcast to all watch channels
				}
			}
		default:
			// we probably should not shut down the entire stream when
			// we receive an invalid command, so just do nothing instead.
			continue
		}
	}
}

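// sendLoop forwards watch events and control responses to the client's gRPC
// stream. Events that arrive before their watcher's creation has been
// announced are buffered in "pending" and flushed once the created control
// response is sent; a ticker periodically requests progress notifications for
// watchers that asked for them.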
func (sws *serverWatchStream) sendLoop() {
	// watch ids that are currently active
	ids := make(map[mvcc.WatchID]struct{})
	// watch responses pending on a watch id creation message
	pending := make(map[mvcc.WatchID][]*pb.WatchResponse)

	interval := GetProgressReportInterval()
	progressTicker := time.NewTicker(interval)

	defer func() {
		progressTicker.Stop()
		// drain the chan to clean up pending events
		for ws := range sws.watchStream.Chan() {
			mvcc.ReportEventReceived(len(ws.Events))
		}
		for _, wrs := range pending {
			for _, ws := range wrs {
				mvcc.ReportEventReceived(len(ws.Events))
			}
		}
	}()

	for {
		select {
		case wresp, ok := <-sws.watchStream.Chan():
			if !ok {
				return
			}

			// TODO: evs is []mvccpb.Event type
			// either return []*mvccpb.Event from the mvcc package
			// or define protocol buffer with []mvccpb.Event.
			evs := wresp.Events
			events := make([]*mvccpb.Event, len(evs))
			sws.mu.RLock()
			needPrevKV := sws.prevKV[wresp.WatchID]
			sws.mu.RUnlock()
			for i := range evs {
				events[i] = &evs[i]
				if needPrevKV {
					opt := mvcc.RangeOptions{Rev: evs[i].Kv.ModRevision - 1}
					r, err := sws.watchable.Range(evs[i].Kv.Key, nil, opt)
					if err == nil && len(r.KVs) != 0 {
						events[i].PrevKv = &(r.KVs[0])
					}
				}
			}

			canceled := wresp.CompactRevision != 0
			wr := &pb.WatchResponse{
				Header:          sws.newResponseHeader(wresp.Revision),
				WatchId:         int64(wresp.WatchID),
				Events:          events,
				CompactRevision: wresp.CompactRevision,
				Canceled:        canceled,
			}

			if _, okID := ids[wresp.WatchID]; !okID {
				// buffer if id not yet announced
				wrs := append(pending[wresp.WatchID], wr)
				pending[wresp.WatchID] = wrs
				continue
			}

			mvcc.ReportEventReceived(len(evs))

			sws.mu.RLock()
			fragmented, ok := sws.fragment[wresp.WatchID]
			sws.mu.RUnlock()

			var serr error
			if !fragmented && !ok {
				serr = sws.gRPCStream.Send(wr)
			} else {
				serr = sendFragments(wr, sws.maxRequestBytes, sws.gRPCStream.Send)
			}

			if serr != nil {
				if isClientCtxErr(sws.gRPCStream.Context().Err(), serr) {
					if sws.lg != nil {
						sws.lg.Debug("failed to send watch response to gRPC stream", zap.Error(serr))
					} else {
						plog.Debugf("failed to send watch response to gRPC stream (%q)", serr.Error())
					}
				} else {
					if sws.lg != nil {
						sws.lg.Warn("failed to send watch response to gRPC stream", zap.Error(serr))
					} else {
						plog.Warningf("failed to send watch response to gRPC stream (%q)", serr.Error())
					}
					streamFailures.WithLabelValues("send", "watch").Inc()
				}
				return
			}

			sws.mu.Lock()
			if len(evs) > 0 && sws.progress[wresp.WatchID] {
				// elide next progress update if sent a key update
				sws.progress[wresp.WatchID] = false
			}
			sws.mu.Unlock()

		case c, ok := <-sws.ctrlStream:
			if !ok {
				return
			}

			if err := sws.gRPCStream.Send(c); err != nil {
				if isClientCtxErr(sws.gRPCStream.Context().Err(), err) {
					if sws.lg != nil {
						sws.lg.Debug("failed to send watch control response to gRPC stream", zap.Error(err))
					} else {
						plog.Debugf("failed to send watch control response to gRPC stream (%q)", err.Error())
					}
				} else {
					if sws.lg != nil {
						sws.lg.Warn("failed to send watch control response to gRPC stream", zap.Error(err))
					} else {
						plog.Warningf("failed to send watch control response to gRPC stream (%q)", err.Error())
					}
					streamFailures.WithLabelValues("send", "watch").Inc()
				}
				return
			}

			// track id creation
			wid := mvcc.WatchID(c.WatchId)
			if c.Canceled {
				delete(ids, wid)
				continue
			}
			if c.Created {
				// flush buffered events
				ids[wid] = struct{}{}
				for _, v := range pending[wid] {
					mvcc.ReportEventReceived(len(v.Events))
					if err := sws.gRPCStream.Send(v); err != nil {
						if isClientCtxErr(sws.gRPCStream.Context().Err(), err) {
							if sws.lg != nil {
								sws.lg.Debug("failed to send pending watch response to gRPC stream", zap.Error(err))
							} else {
								plog.Debugf("failed to send pending watch response to gRPC stream (%q)", err.Error())
							}
						} else {
							if sws.lg != nil {
								sws.lg.Warn("failed to send pending watch response to gRPC stream", zap.Error(err))
							} else {
								plog.Warningf("failed to send pending watch response to gRPC stream (%q)", err.Error())
							}
							streamFailures.WithLabelValues("send", "watch").Inc()
						}
						return
					}
				}
				delete(pending, wid)
			}

		case <-progressTicker.C:
			sws.mu.Lock()
			for id, ok := range sws.progress {
				if ok {
					sws.watchStream.RequestProgress(id)
				}
				sws.progress[id] = true
			}
			sws.mu.Unlock()

		case <-sws.closec:
			return
		}
	}
}

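// sendFragments splits a watch response whose encoded size exceeds
// maxRequestBytes into multiple responses of at least one event each, marks
// every response except the last with Fragment=true, and sends them via sendFunc.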
func sendFragments(
	wr *pb.WatchResponse,
	maxRequestBytes int,
	sendFunc func(*pb.WatchResponse) error) error {
	// no need to fragment if total request size is smaller
	// than max request limit or response contains only one event
	if wr.Size() < maxRequestBytes || len(wr.Events) < 2 {
		return sendFunc(wr)
	}

	ow := *wr
	ow.Events = make([]*mvccpb.Event, 0)
	ow.Fragment = true

	var idx int
	for {
		cur := ow
		for _, ev := range wr.Events[idx:] {
			cur.Events = append(cur.Events, ev)
			if len(cur.Events) > 1 && cur.Size() >= maxRequestBytes {
				cur.Events = cur.Events[:len(cur.Events)-1]
				break
			}
			idx++
		}
		if idx == len(wr.Events) {
			// the last response has no more fragments
			cur.Fragment = false
		}
		if err := sendFunc(&cur); err != nil {
			return err
		}
		if !cur.Fragment {
			break
		}
	}
	return nil
}

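// close releases the mvcc watch stream, signals the send loop to exit via
// closec, and waits for it to finish.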
func (sws *serverWatchStream) close() {
	sws.watchStream.Close()
	close(sws.closec)
	sws.wg.Wait()
}

func (sws *serverWatchStream) newResponseHeader(rev int64) *pb.ResponseHeader {
	return &pb.ResponseHeader{
		ClusterId: uint64(sws.clusterID),
		MemberId:  uint64(sws.memberID),
		Revision:  rev,
		RaftTerm:  sws.sg.Term(),
	}
}

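// The mvcc watcher drops any event for which a FilterFunc returns true, so
// filterNoDelete drops DELETE events and filterNoPut drops PUT events.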
func filterNoDelete(e mvccpb.Event) bool {
	return e.Type == mvccpb.DELETE
}

func filterNoPut(e mvccpb.Event) bool {
	return e.Type == mvccpb.PUT
}

// FiltersFromRequest returns "mvcc.FilterFunc" from a given watch create request.
func FiltersFromRequest(creq *pb.WatchCreateRequest) []mvcc.FilterFunc {
	filters := make([]mvcc.FilterFunc, 0, len(creq.Filters))
	for _, ft := range creq.Filters {
		switch ft {
		case pb.WatchCreateRequest_NOPUT:
			filters = append(filters, filterNoPut)
		case pb.WatchCreateRequest_NODELETE:
			filters = append(filters, filterNoDelete)
		default:
		}
	}
	return filters
}