// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clientv3

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
	pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
	mvccpb "go.etcd.io/etcd/mvcc/mvccpb"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"
)

const (
	EventTypeDelete = mvccpb.DELETE
	EventTypePut    = mvccpb.PUT

	closeSendErrTimeout = 250 * time.Millisecond
)

type Event mvccpb.Event

type WatchChan <-chan WatchResponse

type Watcher interface {
	// Watch watches on a key or prefix. The watched events will be returned
	// through the returned channel. If revisions waiting to be sent over the
	// watch are compacted, then the watch will be canceled by the server, the
	// client will post a compacted error watch response, and the channel will close.
	// If the context "ctx" is canceled or timed out, the returned "WatchChan" is closed,
	// and "WatchResponse" from this closed channel has zero events and a nil "Err()".
	// The context "ctx" MUST be canceled as soon as the watcher is no longer being used,
	// to release the associated resources.
	//
	// If the context is "context.Background/TODO", the returned "WatchChan" will
	// not be closed and will block until an event is triggered, except when the
	// server returns a non-recoverable error (e.g. ErrCompacted).
	// For example, when the context is wrapped with "WithRequireLeader" and the
	// connected server has no leader (e.g. due to a network partition), the
	// error "etcdserver: no leader" (ErrNoLeader) will be returned,
	// and then the "WatchChan" is closed with a non-nil "Err()".
	// To prevent a watch stream from getting stuck on a partitioned node,
	// make sure to wrap the context with "WithRequireLeader".
	//
	// Otherwise, as long as the context has not been canceled or timed out,
	// the watch will retry on other recoverable errors forever until reconnected.
	//
	// TODO: explicitly set context error in the last "WatchResponse" message and close channel?
	// Currently, client contexts are overwritten with "valCtx" that never closes.
	// TODO(v3.4): configure watch retry policy, limit maximum retry number
	// (see https://github.com/etcd-io/etcd/issues/8980)
	Watch(ctx context.Context, key string, opts ...OpOption) WatchChan

	// RequestProgress requests a progress notify response be sent in all watch channels.
	RequestProgress(ctx context.Context) error

	// Close closes the watcher and cancels all watch requests.
	Close() error
}
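
// A minimal usage sketch (illustrative comment only; "cli" is an assumed,
// already-connected *Client and the key "foo" is hypothetical). Wrapping the
// context with WithRequireLeader makes the watch fail fast on a partitioned
// member instead of blocking forever:
//
//	ctx, cancel := context.WithCancel(WithRequireLeader(context.Background()))
//	defer cancel() // release watcher resources once the watch is no longer used
//	for wresp := range cli.Watch(ctx, "foo", WithPrefix()) {
//		if err := wresp.Err(); err != nil {
//			// non-recoverable error (e.g. ErrCompacted, ErrNoLeader);
//			// the channel is closed after this response
//			break
//		}
//		for _, ev := range wresp.Events {
//			fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
//		}
//	}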

type WatchResponse struct {
	Header pb.ResponseHeader
	Events []*Event

	// CompactRevision is the minimum revision the watcher may receive.
	CompactRevision int64

	// Canceled is used to indicate watch failure.
	// If the watch failed and the stream was about to close, before the channel is closed,
	// the channel sends a final response that has Canceled set to true with a non-nil Err().
	Canceled bool

	// Created is used to indicate the creation of the watcher.
	Created bool

	closeErr error

	// cancelReason is the reason for canceling the watch
	cancelReason string
}

// IsCreate returns true if the event indicates that the key is newly created.
func (e *Event) IsCreate() bool {
	return e.Type == EventTypePut && e.Kv.CreateRevision == e.Kv.ModRevision
}

// IsModify returns true if the event indicates that a new value was put on an existing key.
func (e *Event) IsModify() bool {
	return e.Type == EventTypePut && e.Kv.CreateRevision != e.Kv.ModRevision
}

// Err is the error value if this WatchResponse holds an error.
func (wr *WatchResponse) Err() error {
	switch {
	case wr.closeErr != nil:
		return v3rpc.Error(wr.closeErr)
	case wr.CompactRevision != 0:
		return v3rpc.ErrCompacted
	case wr.Canceled:
		if len(wr.cancelReason) != 0 {
			return v3rpc.Error(status.Error(codes.FailedPrecondition, wr.cancelReason))
		}
		return v3rpc.ErrFutureRev
	}
	return nil
}
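
// A hedged recovery sketch (assumes caller-scope names "cli", "ctx", "key",
// and a prior response "wresp"): when Err() reports v3rpc.ErrCompacted,
// revisions before CompactRevision have been compacted away, so the oldest
// revision a new watch can resume from is CompactRevision itself:
//
//	if wresp.Err() == v3rpc.ErrCompacted {
//		wch = cli.Watch(ctx, key, WithRev(wresp.CompactRevision))
//	}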

// IsProgressNotify returns true if the WatchResponse is a progress notification.
func (wr *WatchResponse) IsProgressNotify() bool {
	return len(wr.Events) == 0 && !wr.Canceled && !wr.Created && wr.CompactRevision == 0 && wr.Header.Revision != 0
}

// watcher implements the Watcher interface
type watcher struct {
	remote   pb.WatchClient
	callOpts []grpc.CallOption

	// mu protects the grpc streams map
	mu sync.RWMutex

	// streams holds all the active grpc streams keyed by ctx value.
	streams map[string]*watchGrpcStream
}

// watchGrpcStream tracks all watch resources attached to a single grpc stream.
type watchGrpcStream struct {
	owner    *watcher
	remote   pb.WatchClient
	callOpts []grpc.CallOption

	// ctx controls internal remote.Watch requests
	ctx context.Context
	// ctxKey is the key used when looking up this stream's context
	ctxKey string
	cancel context.CancelFunc

	// substreams holds all active watchers on this grpc stream
	substreams map[int64]*watcherStream
	// resuming holds all resuming watchers on this grpc stream
	resuming []*watcherStream

	// reqc sends a watch request from Watch() to the main goroutine
	reqc chan watchStreamRequest
	// respc receives data from the watch client
	respc chan *pb.WatchResponse
	// donec closes to broadcast shutdown
	donec chan struct{}
	// errc transmits errors from grpc Recv to the watch stream reconnect logic
	errc chan error
	// closingc gets the watcherStream of closing watchers
	closingc chan *watcherStream
	// wg is Done when all substream goroutines have exited
	wg sync.WaitGroup

	// resumec closes to signal that all substreams should begin resuming
	resumec chan struct{}
	// closeErr is the error that closed the watch stream
	closeErr error
}
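
// Goroutine topology, as implemented below (a descriptive note, not an API
// guarantee): each watchGrpcStream runs one run() goroutine that multiplexes
// requests and responses, one serveWatchClient goroutine that pumps wc.Recv()
// into respc, and one serveSubstream goroutine per registered watcher that
// fans responses out to that subscriber's channel.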

// watchStreamRequest is a union of the supported watch request operation types
type watchStreamRequest interface {
	toPB() *pb.WatchRequest
}

// watchRequest is issued by the subscriber to start a new watcher
type watchRequest struct {
	ctx context.Context
	key string
	end string
	rev int64

	// send created notification event if this field is true
	createdNotify bool
	// progressNotify is for progress updates
	progressNotify bool
	// fragmentation should be disabled by default
	// if true, split watch events when total exceeds
	// "--max-request-bytes" flag value plus 512 bytes
	fragment bool

	// filters is the list of events to filter out
	filters []pb.WatchCreateRequest_FilterType
	// get the previous key-value pair before the event happens
	prevKV bool
	// retc receives a chan WatchResponse once the watcher is established
	retc chan chan WatchResponse
}

// progressRequest is issued by the subscriber to request watch progress
type progressRequest struct {
}

// watcherStream represents a registered watcher
type watcherStream struct {
	// initReq is the request that initiated this watcher stream
	initReq watchRequest

	// outc publishes watch responses to the subscriber
	outc chan WatchResponse
	// recvc buffers watch responses before publishing
	recvc chan *WatchResponse
	// donec closes when the watcherStream goroutine stops.
	donec chan struct{}
	// closing is set to true when the stream should be scheduled to shut down.
	closing bool
	// id is the registered watch id on the grpc stream
	id int64

	// buf holds all events received from etcd but not yet consumed by the client
	buf []*WatchResponse
}

func NewWatcher(c *Client) Watcher {
	return NewWatchFromWatchClient(pb.NewWatchClient(c.conn), c)
}

func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher {
	w := &watcher{
		remote:  wc,
		streams: make(map[string]*watchGrpcStream),
	}
	if c != nil {
		w.callOpts = c.callOpts
	}
	return w
}

// never closes
var valCtxCh = make(chan struct{})
var zeroTime = time.Unix(0, 0)

// ctx with only the values; never Done
type valCtx struct{ context.Context }

func (vc *valCtx) Deadline() (time.Time, bool) { return zeroTime, false }
func (vc *valCtx) Done() <-chan struct{}       { return valCtxCh }
func (vc *valCtx) Err() error                  { return nil }
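
// Why valCtx exists (a descriptive note): it preserves only the values of the
// wrapped context (e.g. the gRPC metadata read by streamKeyFromCtx) while
// discarding its deadline and cancelation, so one caller canceling its Watch
// context does not tear down the gRPC stream shared with other watchers.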

func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
	ctx, cancel := context.WithCancel(&valCtx{inctx})
	wgs := &watchGrpcStream{
		owner:      w,
		remote:     w.remote,
		callOpts:   w.callOpts,
		ctx:        ctx,
		ctxKey:     streamKeyFromCtx(inctx),
		cancel:     cancel,
		substreams: make(map[int64]*watcherStream),
		respc:      make(chan *pb.WatchResponse),
		reqc:       make(chan watchStreamRequest),
		donec:      make(chan struct{}),
		errc:       make(chan error, 1),
		closingc:   make(chan *watcherStream),
		resumec:    make(chan struct{}),
	}
	go wgs.run()
	return wgs
}

// Watch posts a watch request to run() and waits for a new watcher channel
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
	ow := opWatch(key, opts...)

	var filters []pb.WatchCreateRequest_FilterType
	if ow.filterPut {
		filters = append(filters, pb.WatchCreateRequest_NOPUT)
	}
	if ow.filterDelete {
		filters = append(filters, pb.WatchCreateRequest_NODELETE)
	}

	wr := &watchRequest{
		ctx:            ctx,
		createdNotify:  ow.createdNotify,
		key:            string(ow.key),
		end:            string(ow.end),
		rev:            ow.rev,
		progressNotify: ow.progressNotify,
		fragment:       ow.fragment,
		filters:        filters,
		prevKV:         ow.prevKV,
		retc:           make(chan chan WatchResponse, 1),
	}

	ok := false
	ctxKey := streamKeyFromCtx(ctx)

	// find or allocate appropriate grpc watch stream
	w.mu.Lock()
	if w.streams == nil {
		// closed
		w.mu.Unlock()
		ch := make(chan WatchResponse)
		close(ch)
		return ch
	}
	wgs := w.streams[ctxKey]
	if wgs == nil {
		wgs = w.newWatcherGrpcStream(ctx)
		w.streams[ctxKey] = wgs
	}
	donec := wgs.donec
	reqc := wgs.reqc
	w.mu.Unlock()

	// couldn't create channel; return closed channel
	closeCh := make(chan WatchResponse, 1)

	// submit request
	select {
	case reqc <- wr:
		ok = true
	case <-wr.ctx.Done():
	case <-donec:
		if wgs.closeErr != nil {
			closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
			break
		}
		// retry; may have dropped stream from no ctxs
		return w.Watch(ctx, key, opts...)
	}

	// receive channel
	if ok {
		select {
		case ret := <-wr.retc:
			return ret
		case <-ctx.Done():
		case <-donec:
			if wgs.closeErr != nil {
				closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
				break
			}
			// retry; may have dropped stream from no ctxs
			return w.Watch(ctx, key, opts...)
		}
	}

	close(closeCh)
	return closeCh
}

func (w *watcher) Close() (err error) {
	w.mu.Lock()
	streams := w.streams
	w.streams = nil
	w.mu.Unlock()
	for _, wgs := range streams {
		if werr := wgs.close(); werr != nil {
			err = werr
		}
	}
	// Consider context.Canceled as a successful close
	if err == context.Canceled {
		err = nil
	}
	return err
}

// RequestProgress requests a progress notify response be sent in all watch channels.
func (w *watcher) RequestProgress(ctx context.Context) (err error) {
	ctxKey := streamKeyFromCtx(ctx)

	w.mu.Lock()
	if w.streams == nil {
		w.mu.Unlock()
		return fmt.Errorf("no stream found for context")
	}
	wgs := w.streams[ctxKey]
	if wgs == nil {
		wgs = w.newWatcherGrpcStream(ctx)
		w.streams[ctxKey] = wgs
	}
	donec := wgs.donec
	reqc := wgs.reqc
	w.mu.Unlock()

	pr := &progressRequest{}

	select {
	case reqc <- pr:
		return nil
	case <-ctx.Done():
		if err == nil {
			return ctx.Err()
		}
		return err
	case <-donec:
		if wgs.closeErr != nil {
			return wgs.closeErr
		}
		// retry; may have dropped stream from no ctxs
		return w.RequestProgress(ctx)
	}
}
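
// A hedged usage sketch (assumes "w" is a Watcher and "ctx" carries the same
// outgoing metadata as the earlier Watch calls, so it maps to the same gRPC
// stream): after a successful request, every open WatchChan on that stream
// receives a response for which IsProgressNotify() reports true.
//
//	if err := w.RequestProgress(ctx); err != nil {
//		// stream closed or context canceled; handle or retry
//	}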

func (w *watchGrpcStream) close() (err error) {
	w.cancel()
	<-w.donec
	select {
	case err = <-w.errc:
	default:
	}
	return toErr(w.ctx, err)
}

func (w *watcher) closeStream(wgs *watchGrpcStream) {
	w.mu.Lock()
	close(wgs.donec)
	wgs.cancel()
	if w.streams != nil {
		delete(w.streams, wgs.ctxKey)
	}
	w.mu.Unlock()
}

func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
	// check watch ID for backward compatibility (<= v3.3)
	if resp.WatchId == -1 || (resp.Canceled && resp.CancelReason != "") {
		w.closeErr = v3rpc.Error(errors.New(resp.CancelReason))
		// failed; no channel
		close(ws.recvc)
		return
	}
	ws.id = resp.WatchId
	w.substreams[ws.id] = ws
}

func (w *watchGrpcStream) sendCloseSubstream(ws *watcherStream, resp *WatchResponse) {
	select {
	case ws.outc <- *resp:
	case <-ws.initReq.ctx.Done():
	case <-time.After(closeSendErrTimeout):
	}
	close(ws.outc)
}

func (w *watchGrpcStream) closeSubstream(ws *watcherStream) {
	// send channel response in case stream was never established
	select {
	case ws.initReq.retc <- ws.outc:
	default:
	}
	// close subscriber's channel
	if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil {
		go w.sendCloseSubstream(ws, &WatchResponse{Canceled: true, closeErr: w.closeErr})
	} else if ws.outc != nil {
		close(ws.outc)
	}
	if ws.id != -1 {
		delete(w.substreams, ws.id)
		return
	}
	for i := range w.resuming {
		if w.resuming[i] == ws {
			w.resuming[i] = nil
			return
		}
	}
}

// run is the root of the goroutines for managing a watcher client
func (w *watchGrpcStream) run() {
	var wc pb.Watch_WatchClient
	var closeErr error

	// substreams marked to close but goroutine still running; needed for
	// avoiding double-closing recvc on grpc stream teardown
	closing := make(map[*watcherStream]struct{})

	defer func() {
		w.closeErr = closeErr
		// shutdown substreams and resuming substreams
		for _, ws := range w.substreams {
			if _, ok := closing[ws]; !ok {
				close(ws.recvc)
				closing[ws] = struct{}{}
			}
		}
		for _, ws := range w.resuming {
			if _, ok := closing[ws]; ws != nil && !ok {
				close(ws.recvc)
				closing[ws] = struct{}{}
			}
		}
		w.joinSubstreams()
		for range closing {
			w.closeSubstream(<-w.closingc)
		}
		w.wg.Wait()
		w.owner.closeStream(w)
	}()

	// start a stream with the etcd grpc server
	if wc, closeErr = w.newWatchClient(); closeErr != nil {
		return
	}

	cancelSet := make(map[int64]struct{})

	var cur *pb.WatchResponse
	for {
		select {
		// Watch() requested
		case req := <-w.reqc:
			switch wreq := req.(type) {
			case *watchRequest:
				outc := make(chan WatchResponse, 1)
				// TODO: pass custom watch ID?
				ws := &watcherStream{
					initReq: *wreq,
					id:      -1,
					outc:    outc,
					// unbuffered so resumes won't cause repeat events
					recvc: make(chan *WatchResponse),
				}

				ws.donec = make(chan struct{})
				w.wg.Add(1)
				go w.serveSubstream(ws, w.resumec)

				// queue up for watcher creation/resume
				w.resuming = append(w.resuming, ws)
				if len(w.resuming) == 1 {
					// head of resume queue, can register a new watcher
					wc.Send(ws.initReq.toPB())
				}
			case *progressRequest:
				wc.Send(wreq.toPB())
			}

		// new events from the watch client
		case pbresp := <-w.respc:
			if cur == nil || pbresp.Created || pbresp.Canceled {
				cur = pbresp
			} else if cur != nil && cur.WatchId == pbresp.WatchId {
				// merge new events
				cur.Events = append(cur.Events, pbresp.Events...)
				// update the "Fragment" field; the last response has "Fragment" == false
				cur.Fragment = pbresp.Fragment
			}

			switch {
			case pbresp.Created:
				// response to head of queue creation
				if ws := w.resuming[0]; ws != nil {
					w.addSubstream(pbresp, ws)
					w.dispatchEvent(pbresp)
					w.resuming[0] = nil
				}

				if ws := w.nextResume(); ws != nil {
					wc.Send(ws.initReq.toPB())
				}

				// reset for next iteration
				cur = nil

			case pbresp.Canceled && pbresp.CompactRevision == 0:
				delete(cancelSet, pbresp.WatchId)
				if ws, ok := w.substreams[pbresp.WatchId]; ok {
					// signal to stream goroutine to update closingc
					close(ws.recvc)
					closing[ws] = struct{}{}
				}

				// reset for next iteration
				cur = nil

			case cur.Fragment:
				// watch response events are still fragmented
				// continue to fetch next fragmented event arrival
				continue

			default:
				// dispatch to appropriate watch stream
				ok := w.dispatchEvent(cur)

				// reset for next iteration
				cur = nil

				if ok {
					break
				}

				// watch response on unexpected watch id; cancel id
				if _, ok := cancelSet[pbresp.WatchId]; ok {
					break
				}

				cancelSet[pbresp.WatchId] = struct{}{}
				cr := &pb.WatchRequest_CancelRequest{
					CancelRequest: &pb.WatchCancelRequest{
						WatchId: pbresp.WatchId,
					},
				}
				req := &pb.WatchRequest{RequestUnion: cr}
				wc.Send(req)
			}

		// watch client failed on Recv; spawn another if possible
		case err := <-w.errc:
			if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader {
				closeErr = err
				return
			}
			if wc, closeErr = w.newWatchClient(); closeErr != nil {
				return
			}
			if ws := w.nextResume(); ws != nil {
				wc.Send(ws.initReq.toPB())
			}
			cancelSet = make(map[int64]struct{})

		case <-w.ctx.Done():
			return

		case ws := <-w.closingc:
			w.closeSubstream(ws)
			delete(closing, ws)
			// no more watchers on this stream, shutdown
			if len(w.substreams)+len(w.resuming) == 0 {
				return
			}
		}
	}
}

// nextResume chooses the next resuming watcher to register with the grpc stream.
// Abandoned streams are marked as nil in the queue since the head must wait for
// its inflight registration.
func (w *watchGrpcStream) nextResume() *watcherStream {
	for len(w.resuming) != 0 {
		if w.resuming[0] != nil {
			return w.resuming[0]
		}
		w.resuming = w.resuming[1:len(w.resuming)]
	}
	return nil
}

// dispatchEvent sends a WatchResponse to the appropriate watcher stream
func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
	events := make([]*Event, len(pbresp.Events))
	for i, ev := range pbresp.Events {
		events[i] = (*Event)(ev)
	}
	// TODO: return watch ID?
	wr := &WatchResponse{
		Header:          *pbresp.Header,
		Events:          events,
		CompactRevision: pbresp.CompactRevision,
		Created:         pbresp.Created,
		Canceled:        pbresp.Canceled,
		cancelReason:    pbresp.CancelReason,
	}

	// watch IDs are zero indexed, so request notify watch responses are assigned a watch ID of -1 to
	// indicate they should be broadcast.
	if wr.IsProgressNotify() && pbresp.WatchId == -1 {
		return w.broadcastResponse(wr)
	}

	return w.unicastResponse(wr, pbresp.WatchId)
}

// broadcastResponse sends a watch response to all watch substreams.
func (w *watchGrpcStream) broadcastResponse(wr *WatchResponse) bool {
	for _, ws := range w.substreams {
		select {
		case ws.recvc <- wr:
		case <-ws.donec:
		}
	}
	return true
}

// unicastResponse sends a watch response to a specific watch substream.
func (w *watchGrpcStream) unicastResponse(wr *WatchResponse, watchId int64) bool {
	ws, ok := w.substreams[watchId]
	if !ok {
		return false
	}
	select {
	case ws.recvc <- wr:
	case <-ws.donec:
		return false
	}
	return true
}

// serveWatchClient forwards messages from the grpc stream to run()
func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
	for {
		resp, err := wc.Recv()
		if err != nil {
			select {
			case w.errc <- err:
			case <-w.donec:
			}
			return
		}
		select {
		case w.respc <- resp:
		case <-w.donec:
			return
		}
	}
}

// serveSubstream forwards watch responses from run() to the subscriber
func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{}) {
	if ws.closing {
		panic("created substream goroutine but substream is closing")
	}

	// nextRev is the minimum expected next revision
	nextRev := ws.initReq.rev
	resuming := false
	defer func() {
		if !resuming {
			ws.closing = true
		}
		close(ws.donec)
		if !resuming {
			w.closingc <- ws
		}
		w.wg.Done()
	}()

	emptyWr := &WatchResponse{}
	for {
		curWr := emptyWr
		outc := ws.outc

		if len(ws.buf) > 0 {
			curWr = ws.buf[0]
		} else {
			outc = nil
		}
		select {
		case outc <- *curWr:
			if ws.buf[0].Err() != nil {
				return
			}
			ws.buf[0] = nil
			ws.buf = ws.buf[1:]
		case wr, ok := <-ws.recvc:
			if !ok {
				// shutdown from closeSubstream
				return
			}

			if wr.Created {
				if ws.initReq.retc != nil {
					ws.initReq.retc <- ws.outc
					// to prevent next write from taking the slot in buffered channel
					// and posting duplicate create events
					ws.initReq.retc = nil

					// send first creation event only if requested
					if ws.initReq.createdNotify {
						ws.outc <- *wr
					}
					// once the watch channel is returned, a current revision
					// watch must resume at the store revision. This is necessary
					// for the following case to work as expected:
					//	wch := m1.Watch("a")
					//	m2.Put("a", "b")
					//	<-wch
					// If the revision is only bound on the first observed event,
					// if wch is disconnected before the Put is issued, then reconnects
					// after it is committed, it'll miss the Put.
					if ws.initReq.rev == 0 {
						nextRev = wr.Header.Revision
					}
				}
			} else {
				// current progress of watch; <= store revision
				nextRev = wr.Header.Revision
			}

			if len(wr.Events) > 0 {
				nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
			}
			ws.initReq.rev = nextRev

			// created event is already sent above,
			// watcher should not post duplicate events
			if wr.Created {
				continue
			}

			// TODO pause channel if buffer gets too large
			ws.buf = append(ws.buf, wr)
		case <-w.ctx.Done():
			return
		case <-ws.initReq.ctx.Done():
			return
		case <-resumec:
			resuming = true
			return
		}
	}
	// lazily send cancel message if events on missing id
}

func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
	// mark all substreams as resuming
	close(w.resumec)
	w.resumec = make(chan struct{})
	w.joinSubstreams()
	for _, ws := range w.substreams {
		ws.id = -1
		w.resuming = append(w.resuming, ws)
	}
	// strip out nils, if any
	var resuming []*watcherStream
	for _, ws := range w.resuming {
		if ws != nil {
			resuming = append(resuming, ws)
		}
	}
	w.resuming = resuming
	w.substreams = make(map[int64]*watcherStream)

	// connect to grpc stream while accepting watcher cancelation
	stopc := make(chan struct{})
	donec := w.waitCancelSubstreams(stopc)
	wc, err := w.openWatchClient()
	close(stopc)
	<-donec

	// serve all non-closing streams, even if there's a client error,
	// so that the teardown path can shut down the streams as expected.
	for _, ws := range w.resuming {
		if ws.closing {
			continue
		}
		ws.donec = make(chan struct{})
		w.wg.Add(1)
		go w.serveSubstream(ws, w.resumec)
	}

	if err != nil {
		return nil, v3rpc.Error(err)
	}

	// receive data from new grpc stream
	go w.serveWatchClient(wc)
	return wc, nil
}

func (w *watchGrpcStream) waitCancelSubstreams(stopc <-chan struct{}) <-chan struct{} {
	var wg sync.WaitGroup
	wg.Add(len(w.resuming))
	donec := make(chan struct{})
	for i := range w.resuming {
		go func(ws *watcherStream) {
			defer wg.Done()
			if ws.closing {
				if ws.initReq.ctx.Err() != nil && ws.outc != nil {
					close(ws.outc)
					ws.outc = nil
				}
				return
			}
			select {
			case <-ws.initReq.ctx.Done():
				// closed ws will be removed from resuming
				ws.closing = true
				close(ws.outc)
				ws.outc = nil
				w.wg.Add(1)
				go func() {
					defer w.wg.Done()
					w.closingc <- ws
				}()
			case <-stopc:
			}
		}(w.resuming[i])
	}
	go func() {
		defer close(donec)
		wg.Wait()
	}()
	return donec
}

// joinSubstreams waits for all substream goroutines to complete.
func (w *watchGrpcStream) joinSubstreams() {
	for _, ws := range w.substreams {
		<-ws.donec
	}
	for _, ws := range w.resuming {
		if ws != nil {
			<-ws.donec
		}
	}
}

var maxBackoff = 100 * time.Millisecond

// openWatchClient retries opening a watch client until success or halt.
// manually retry in case "ws==nil && err==nil"
// TODO: remove FailFast=false
func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
	backoff := time.Millisecond
	for {
		select {
		case <-w.ctx.Done():
			if err == nil {
				return nil, w.ctx.Err()
			}
			return nil, err
		default:
		}
		if ws, err = w.remote.Watch(w.ctx, w.callOpts...); ws != nil && err == nil {
			break
		}
		if isHaltErr(w.ctx, err) {
			return nil, v3rpc.Error(err)
		}
		if isUnavailableErr(w.ctx, err) {
			// retry, but backoff
			if backoff < maxBackoff {
				// 25% backoff factor
				backoff = backoff + backoff/4
				if backoff > maxBackoff {
					backoff = maxBackoff
				}
			}
			time.Sleep(backoff)
		}
	}
	return ws, nil
}
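
// Backoff arithmetic, for reference (approximate): starting at 1ms and growing
// by 25% per retry, the wait is roughly 1ms * 1.25^n, so the 100ms cap is hit
// after about ln(100)/ln(1.25) ≈ 21 consecutive unavailable errors; every
// retry thereafter waits the full 100ms.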

// toPB converts an internal watch request structure to its protobuf WatchRequest structure.
func (wr *watchRequest) toPB() *pb.WatchRequest {
	req := &pb.WatchCreateRequest{
		StartRevision:  wr.rev,
		Key:            []byte(wr.key),
		RangeEnd:       []byte(wr.end),
		ProgressNotify: wr.progressNotify,
		Filters:        wr.filters,
		PrevKv:         wr.prevKV,
		Fragment:       wr.fragment,
	}
	cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
	return &pb.WatchRequest{RequestUnion: cr}
}

// toPB converts an internal progress request structure to its protobuf WatchRequest structure.
func (pr *progressRequest) toPB() *pb.WatchRequest {
	req := &pb.WatchProgressRequest{}
	cr := &pb.WatchRequest_ProgressRequest{ProgressRequest: req}
	return &pb.WatchRequest{RequestUnion: cr}
}

func streamKeyFromCtx(ctx context.Context) string {
	if md, ok := metadata.FromOutgoingContext(ctx); ok {
		return fmt.Sprintf("%+v", md)
	}
	return ""
}
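
// A hedged illustration of the stream key (hypothetical metadata): contexts
// carrying identical outgoing gRPC metadata produce the same key and therefore
// share one watchGrpcStream, while a context with no metadata maps to the "" key:
//
//	ctx := metadata.AppendToOutgoingContext(context.Background(), "token", "abc")
//	// streamKeyFromCtx(ctx) formats the metadata map (e.g. "map[token:[abc]]"),
//	// so a second Watch with the same metadata reuses the same gRPC stream.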