blob: d50acbca32c3b4b0585103d74413387928c75f2c [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package clientv3
16
17import (
18 "context"
19 "errors"
20 "fmt"
21 "sync"
22 "time"
23
24 v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
25 pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
26 mvccpb "go.etcd.io/etcd/mvcc/mvccpb"
27
28 "google.golang.org/grpc"
29 "google.golang.org/grpc/codes"
30 "google.golang.org/grpc/metadata"
31 "google.golang.org/grpc/status"
32)
33
const (
	// EventTypeDelete is the event type of a key deletion (mvccpb.DELETE).
	EventTypeDelete = mvccpb.DELETE
	// EventTypePut is the event type of a key put (mvccpb.PUT).
	EventTypePut = mvccpb.PUT

	// closeSendErrTimeout bounds how long sendCloseSubstream waits for the
	// subscriber to accept the final error response before the channel is closed.
	closeSendErrTimeout = 250 * time.Millisecond
)
40
// Event is a single watch event. It is declared on top of mvccpb.Event so
// that helper methods such as IsCreate and IsModify can be attached to it.
type Event mvccpb.Event
42
// WatchChan is the receive-only channel of WatchResponse values returned by Watch.
type WatchChan <-chan WatchResponse
44
// Watcher is the client-side interface for creating watches, requesting
// progress notifications and releasing all watch resources.
type Watcher interface {
	// Watch watches on a key or prefix. The watched events will be returned
	// through the returned channel. If revisions waiting to be sent over the
	// watch are compacted, then the watch will be canceled by the server, the
	// client will post a compacted error watch response, and the channel will close.
	// If the context "ctx" is canceled or timed out, returned "WatchChan" is closed,
	// and "WatchResponse" from this closed channel has zero events and nil "Err()".
	// The context "ctx" MUST be canceled, as soon as watcher is no longer being used,
	// to release the associated resources.
	//
	// If the context is "context.Background/TODO", returned "WatchChan" will
	// not be closed and block until event is triggered, except when server
	// returns a non-recoverable error (e.g. ErrCompacted).
	// For example, when context passed with "WithRequireLeader" and the
	// connected server has no leader (e.g. due to network partition),
	// error "etcdserver: no leader" (ErrNoLeader) will be returned,
	// and then "WatchChan" is closed with non-nil "Err()".
	// In order to prevent a watch stream being stuck in a partitioned node,
	// make sure to wrap context with "WithRequireLeader".
	//
	// Otherwise, as long as the context has not been canceled or timed out,
	// watch will retry on other recoverable errors forever until reconnected.
	//
	// TODO: explicitly set context error in the last "WatchResponse" message and close channel?
	// Currently, client contexts are overwritten with "valCtx" that never closes.
	// TODO(v3.4): configure watch retry policy, limit maximum retry number
	// (see https://github.com/etcd-io/etcd/issues/8980)
	Watch(ctx context.Context, key string, opts ...OpOption) WatchChan

	// RequestProgress requests a progress notify response be sent in all watch channels.
	RequestProgress(ctx context.Context) error

	// Close closes the watcher and cancels all watch requests.
	Close() error
}
80
// WatchResponse is a single message delivered on a WatchChan: either a batch
// of events, a creation/progress notification, or a final error response.
type WatchResponse struct {
	// Header is the response header; dispatchEvent copies it from the
	// server's watch response.
	Header pb.ResponseHeader
	// Events are the events carried by this response; the slice is empty for
	// notifications and error responses.
	Events []*Event

	// CompactRevision is the minimum revision the watcher may receive.
	// A non-zero value makes Err() report ErrCompacted.
	CompactRevision int64

	// Canceled is used to indicate watch failure.
	// If the watch failed and the stream was about to close, before the channel is closed,
	// the channel sends a final response that has Canceled set to true with a non-nil Err().
	Canceled bool

	// Created is used to indicate the creation of the watcher.
	Created bool

	// closeErr is the error that closed the watch stream; when set it takes
	// precedence over every other field in Err().
	closeErr error

	// cancelReason is a reason of canceling watch
	cancelReason string
}
101
102// IsCreate returns true if the event tells that the key is newly created.
103func (e *Event) IsCreate() bool {
104 return e.Type == EventTypePut && e.Kv.CreateRevision == e.Kv.ModRevision
105}
106
107// IsModify returns true if the event tells that a new value is put on existing key.
108func (e *Event) IsModify() bool {
109 return e.Type == EventTypePut && e.Kv.CreateRevision != e.Kv.ModRevision
110}
111
112// Err is the error value if this WatchResponse holds an error.
113func (wr *WatchResponse) Err() error {
114 switch {
115 case wr.closeErr != nil:
116 return v3rpc.Error(wr.closeErr)
117 case wr.CompactRevision != 0:
118 return v3rpc.ErrCompacted
119 case wr.Canceled:
120 if len(wr.cancelReason) != 0 {
121 return v3rpc.Error(status.Error(codes.FailedPrecondition, wr.cancelReason))
122 }
123 return v3rpc.ErrFutureRev
124 }
125 return nil
126}
127
128// IsProgressNotify returns true if the WatchResponse is progress notification.
129func (wr *WatchResponse) IsProgressNotify() bool {
130 return len(wr.Events) == 0 && !wr.Canceled && !wr.Created && wr.CompactRevision == 0 && wr.Header.Revision != 0
131}
132
// watcher implements the Watcher interface
type watcher struct {
	// remote is the grpc watch client handed to every watchGrpcStream
	remote pb.WatchClient
	// callOpts are the grpc call options handed to every watchGrpcStream
	callOpts []grpc.CallOption

	// mu protects the grpc streams map
	mu sync.RWMutex

	// streams holds all the active grpc streams keyed by ctx value.
	// It is set to nil by Close; a nil map means the watcher is closed.
	streams map[string]*watchGrpcStream
}
144
// watchGrpcStream tracks all watch resources attached to a single grpc stream.
type watchGrpcStream struct {
	// owner is the watcher that created this stream; run() calls
	// owner.closeStream when the stream shuts down
	owner *watcher
	// remote is the grpc watch client used to open the stream
	remote pb.WatchClient
	// callOpts are the grpc call options passed to remote.Watch
	callOpts []grpc.CallOption

	// ctx controls internal remote.Watch requests
	ctx context.Context
	// ctxKey is the key used when looking up this stream's context
	ctxKey string
	// cancel cancels ctx
	cancel context.CancelFunc

	// substreams holds all active watchers on this grpc stream
	substreams map[int64]*watcherStream
	// resuming holds all resuming watchers on this grpc stream
	resuming []*watcherStream

	// reqc sends a watch request from Watch() to the main goroutine
	reqc chan watchStreamRequest
	// respc receives data from the watch client
	respc chan *pb.WatchResponse
	// donec closes to broadcast shutdown
	donec chan struct{}
	// errc transmits errors from grpc Recv to the watch stream reconnect logic
	errc chan error
	// closingc gets the watcherStream of closing watchers
	closingc chan *watcherStream
	// wg is Done when all substream goroutines have exited
	wg sync.WaitGroup

	// resumec closes to signal that all substreams should begin resuming
	resumec chan struct{}
	// closeErr is the error that closed the watch stream
	closeErr error
}
180
// watchStreamRequest is a union of the supported watch request operation types
type watchStreamRequest interface {
	// toPB converts the request into the protobuf message sent on the grpc stream
	toPB() *pb.WatchRequest
}
185
// watchRequest is issued by the subscriber to start a new watcher
type watchRequest struct {
	// ctx is the context the subscriber passed to Watch()
	ctx context.Context
	// key is sent as the create request's Key
	key string
	// end is sent as the create request's RangeEnd
	end string
	// rev is sent as the create request's StartRevision; serveSubstream
	// advances it as responses arrive so a resumed watch continues from there
	rev int64

	// send created notification event if this field is true
	createdNotify bool
	// progressNotify is for progress updates
	progressNotify bool
	// fragmentation should be disabled by default
	// if true, split watch events when total exceeds
	// "--max-request-bytes" flag value + 512-byte
	fragment bool

	// filters is the list of events to filter out
	filters []pb.WatchCreateRequest_FilterType
	// get the previous key-value pair before the event happens
	prevKV bool
	// retc receives a chan WatchResponse once the watcher is established
	retc chan chan WatchResponse
}
209
// progressRequest is issued by the subscriber to request watch progress.
// It carries no fields; its toPB method builds an empty WatchProgressRequest.
type progressRequest struct {
}
213
// watcherStream represents a registered watcher
type watcherStream struct {
	// initReq is the request that initiated this watcher
	initReq watchRequest

	// outc publishes watch responses to subscriber
	outc chan WatchResponse
	// recvc buffers watch responses before publishing
	recvc chan *WatchResponse
	// donec closes when the watcherStream goroutine stops.
	donec chan struct{}
	// closing is set to true when stream should be scheduled to shutdown.
	closing bool
	// id is the registered watch id on the grpc stream; it is -1 while the
	// watcher is not registered (newly created or resuming)
	id int64

	// buf holds all events received from etcd but not yet consumed by the client
	buf []*WatchResponse
}
233
234func NewWatcher(c *Client) Watcher {
235 return NewWatchFromWatchClient(pb.NewWatchClient(c.conn), c)
236}
237
238func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher {
239 w := &watcher{
240 remote: wc,
241 streams: make(map[string]*watchGrpcStream),
242 }
243 if c != nil {
244 w.callOpts = c.callOpts
245 }
246 return w
247}
248
var (
	// valCtxCh is returned by valCtx.Done; it is never closed.
	valCtxCh = make(chan struct{})
	// zeroTime is the placeholder deadline returned by valCtx.Deadline.
	zeroTime = time.Unix(0, 0)
)

// valCtx wraps a context so that only its values remain visible: it reports
// no deadline, is never Done and never has an error.
type valCtx struct{ context.Context }

// Deadline reports that no deadline is set.
func (vc *valCtx) Deadline() (time.Time, bool) {
	return zeroTime, false
}

// Done returns a channel that is never closed.
func (vc *valCtx) Done() <-chan struct{} {
	return valCtxCh
}

// Err always returns nil, regardless of the wrapped context's state.
func (vc *valCtx) Err() error {
	return nil
}
259
260func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
261 ctx, cancel := context.WithCancel(&valCtx{inctx})
262 wgs := &watchGrpcStream{
263 owner: w,
264 remote: w.remote,
265 callOpts: w.callOpts,
266 ctx: ctx,
267 ctxKey: streamKeyFromCtx(inctx),
268 cancel: cancel,
269 substreams: make(map[int64]*watcherStream),
270 respc: make(chan *pb.WatchResponse),
271 reqc: make(chan watchStreamRequest),
272 donec: make(chan struct{}),
273 errc: make(chan error, 1),
274 closingc: make(chan *watcherStream),
275 resumec: make(chan struct{}),
276 }
277 go wgs.run()
278 return wgs
279}
280
281// Watch posts a watch request to run() and waits for a new watcher channel
282func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
283 ow := opWatch(key, opts...)
284
285 var filters []pb.WatchCreateRequest_FilterType
286 if ow.filterPut {
287 filters = append(filters, pb.WatchCreateRequest_NOPUT)
288 }
289 if ow.filterDelete {
290 filters = append(filters, pb.WatchCreateRequest_NODELETE)
291 }
292
293 wr := &watchRequest{
294 ctx: ctx,
295 createdNotify: ow.createdNotify,
296 key: string(ow.key),
297 end: string(ow.end),
298 rev: ow.rev,
299 progressNotify: ow.progressNotify,
300 fragment: ow.fragment,
301 filters: filters,
302 prevKV: ow.prevKV,
303 retc: make(chan chan WatchResponse, 1),
304 }
305
306 ok := false
307 ctxKey := streamKeyFromCtx(ctx)
308
309 // find or allocate appropriate grpc watch stream
310 w.mu.Lock()
311 if w.streams == nil {
312 // closed
313 w.mu.Unlock()
314 ch := make(chan WatchResponse)
315 close(ch)
316 return ch
317 }
318 wgs := w.streams[ctxKey]
319 if wgs == nil {
320 wgs = w.newWatcherGrpcStream(ctx)
321 w.streams[ctxKey] = wgs
322 }
323 donec := wgs.donec
324 reqc := wgs.reqc
325 w.mu.Unlock()
326
327 // couldn't create channel; return closed channel
328 closeCh := make(chan WatchResponse, 1)
329
330 // submit request
331 select {
332 case reqc <- wr:
333 ok = true
334 case <-wr.ctx.Done():
335 case <-donec:
336 if wgs.closeErr != nil {
337 closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
338 break
339 }
340 // retry; may have dropped stream from no ctxs
341 return w.Watch(ctx, key, opts...)
342 }
343
344 // receive channel
345 if ok {
346 select {
347 case ret := <-wr.retc:
348 return ret
349 case <-ctx.Done():
350 case <-donec:
351 if wgs.closeErr != nil {
352 closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
353 break
354 }
355 // retry; may have dropped stream from no ctxs
356 return w.Watch(ctx, key, opts...)
357 }
358 }
359
360 close(closeCh)
361 return closeCh
362}
363
364func (w *watcher) Close() (err error) {
365 w.mu.Lock()
366 streams := w.streams
367 w.streams = nil
368 w.mu.Unlock()
369 for _, wgs := range streams {
370 if werr := wgs.close(); werr != nil {
371 err = werr
372 }
373 }
374 // Consider context.Canceled as a successful close
375 if err == context.Canceled {
376 err = nil
377 }
378 return err
379}
380
381// RequestProgress requests a progress notify response be sent in all watch channels.
382func (w *watcher) RequestProgress(ctx context.Context) (err error) {
383 ctxKey := streamKeyFromCtx(ctx)
384
385 w.mu.Lock()
386 if w.streams == nil {
387 return fmt.Errorf("no stream found for context")
388 }
389 wgs := w.streams[ctxKey]
390 if wgs == nil {
391 wgs = w.newWatcherGrpcStream(ctx)
392 w.streams[ctxKey] = wgs
393 }
394 donec := wgs.donec
395 reqc := wgs.reqc
396 w.mu.Unlock()
397
398 pr := &progressRequest{}
399
400 select {
401 case reqc <- pr:
402 return nil
403 case <-ctx.Done():
404 if err == nil {
405 return ctx.Err()
406 }
407 return err
408 case <-donec:
409 if wgs.closeErr != nil {
410 return wgs.closeErr
411 }
412 // retry; may have dropped stream from no ctxs
413 return w.RequestProgress(ctx)
414 }
415}
416
417func (w *watchGrpcStream) close() (err error) {
418 w.cancel()
419 <-w.donec
420 select {
421 case err = <-w.errc:
422 default:
423 }
424 return toErr(w.ctx, err)
425}
426
427func (w *watcher) closeStream(wgs *watchGrpcStream) {
428 w.mu.Lock()
429 close(wgs.donec)
430 wgs.cancel()
431 if w.streams != nil {
432 delete(w.streams, wgs.ctxKey)
433 }
434 w.mu.Unlock()
435}
436
437func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
438 // check watch ID for backward compatibility (<= v3.3)
439 if resp.WatchId == -1 || (resp.Canceled && resp.CancelReason != "") {
440 w.closeErr = v3rpc.Error(errors.New(resp.CancelReason))
441 // failed; no channel
442 close(ws.recvc)
443 return
444 }
445 ws.id = resp.WatchId
446 w.substreams[ws.id] = ws
447}
448
449func (w *watchGrpcStream) sendCloseSubstream(ws *watcherStream, resp *WatchResponse) {
450 select {
451 case ws.outc <- *resp:
452 case <-ws.initReq.ctx.Done():
453 case <-time.After(closeSendErrTimeout):
454 }
455 close(ws.outc)
456}
457
458func (w *watchGrpcStream) closeSubstream(ws *watcherStream) {
459 // send channel response in case stream was never established
460 select {
461 case ws.initReq.retc <- ws.outc:
462 default:
463 }
464 // close subscriber's channel
465 if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil {
466 go w.sendCloseSubstream(ws, &WatchResponse{Canceled: true, closeErr: w.closeErr})
467 } else if ws.outc != nil {
468 close(ws.outc)
469 }
470 if ws.id != -1 {
471 delete(w.substreams, ws.id)
472 return
473 }
474 for i := range w.resuming {
475 if w.resuming[i] == ws {
476 w.resuming[i] = nil
477 return
478 }
479 }
480}
481
482// run is the root of the goroutines for managing a watcher client
483func (w *watchGrpcStream) run() {
484 var wc pb.Watch_WatchClient
485 var closeErr error
486
487 // substreams marked to close but goroutine still running; needed for
488 // avoiding double-closing recvc on grpc stream teardown
489 closing := make(map[*watcherStream]struct{})
490
491 defer func() {
492 w.closeErr = closeErr
493 // shutdown substreams and resuming substreams
494 for _, ws := range w.substreams {
495 if _, ok := closing[ws]; !ok {
496 close(ws.recvc)
497 closing[ws] = struct{}{}
498 }
499 }
500 for _, ws := range w.resuming {
501 if _, ok := closing[ws]; ws != nil && !ok {
502 close(ws.recvc)
503 closing[ws] = struct{}{}
504 }
505 }
506 w.joinSubstreams()
507 for range closing {
508 w.closeSubstream(<-w.closingc)
509 }
510 w.wg.Wait()
511 w.owner.closeStream(w)
512 }()
513
514 // start a stream with the etcd grpc server
515 if wc, closeErr = w.newWatchClient(); closeErr != nil {
516 return
517 }
518
519 cancelSet := make(map[int64]struct{})
520
521 var cur *pb.WatchResponse
522 for {
523 select {
524 // Watch() requested
525 case req := <-w.reqc:
526 switch wreq := req.(type) {
527 case *watchRequest:
528 outc := make(chan WatchResponse, 1)
529 // TODO: pass custom watch ID?
530 ws := &watcherStream{
531 initReq: *wreq,
532 id: -1,
533 outc: outc,
534 // unbuffered so resumes won't cause repeat events
535 recvc: make(chan *WatchResponse),
536 }
537
538 ws.donec = make(chan struct{})
539 w.wg.Add(1)
540 go w.serveSubstream(ws, w.resumec)
541
542 // queue up for watcher creation/resume
543 w.resuming = append(w.resuming, ws)
544 if len(w.resuming) == 1 {
545 // head of resume queue, can register a new watcher
546 wc.Send(ws.initReq.toPB())
547 }
548 case *progressRequest:
549 wc.Send(wreq.toPB())
550 }
551
552 // new events from the watch client
553 case pbresp := <-w.respc:
554 if cur == nil || pbresp.Created || pbresp.Canceled {
555 cur = pbresp
556 } else if cur != nil && cur.WatchId == pbresp.WatchId {
557 // merge new events
558 cur.Events = append(cur.Events, pbresp.Events...)
559 // update "Fragment" field; last response with "Fragment" == false
560 cur.Fragment = pbresp.Fragment
561 }
562
563 switch {
564 case pbresp.Created:
565 // response to head of queue creation
566 if ws := w.resuming[0]; ws != nil {
567 w.addSubstream(pbresp, ws)
568 w.dispatchEvent(pbresp)
569 w.resuming[0] = nil
570 }
571
572 if ws := w.nextResume(); ws != nil {
573 wc.Send(ws.initReq.toPB())
574 }
575
576 // reset for next iteration
577 cur = nil
578
579 case pbresp.Canceled && pbresp.CompactRevision == 0:
580 delete(cancelSet, pbresp.WatchId)
581 if ws, ok := w.substreams[pbresp.WatchId]; ok {
582 // signal to stream goroutine to update closingc
583 close(ws.recvc)
584 closing[ws] = struct{}{}
585 }
586
587 // reset for next iteration
588 cur = nil
589
590 case cur.Fragment:
591 // watch response events are still fragmented
592 // continue to fetch next fragmented event arrival
593 continue
594
595 default:
596 // dispatch to appropriate watch stream
597 ok := w.dispatchEvent(cur)
598
599 // reset for next iteration
600 cur = nil
601
602 if ok {
603 break
604 }
605
606 // watch response on unexpected watch id; cancel id
607 if _, ok := cancelSet[pbresp.WatchId]; ok {
608 break
609 }
610
611 cancelSet[pbresp.WatchId] = struct{}{}
612 cr := &pb.WatchRequest_CancelRequest{
613 CancelRequest: &pb.WatchCancelRequest{
614 WatchId: pbresp.WatchId,
615 },
616 }
617 req := &pb.WatchRequest{RequestUnion: cr}
618 wc.Send(req)
619 }
620
621 // watch client failed on Recv; spawn another if possible
622 case err := <-w.errc:
623 if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader {
624 closeErr = err
625 return
626 }
627 if wc, closeErr = w.newWatchClient(); closeErr != nil {
628 return
629 }
630 if ws := w.nextResume(); ws != nil {
631 wc.Send(ws.initReq.toPB())
632 }
633 cancelSet = make(map[int64]struct{})
634
635 case <-w.ctx.Done():
636 return
637
638 case ws := <-w.closingc:
639 w.closeSubstream(ws)
640 delete(closing, ws)
641 // no more watchers on this stream, shutdown
642 if len(w.substreams)+len(w.resuming) == 0 {
643 return
644 }
645 }
646 }
647}
648
649// nextResume chooses the next resuming to register with the grpc stream. Abandoned
650// streams are marked as nil in the queue since the head must wait for its inflight registration.
651func (w *watchGrpcStream) nextResume() *watcherStream {
652 for len(w.resuming) != 0 {
653 if w.resuming[0] != nil {
654 return w.resuming[0]
655 }
656 w.resuming = w.resuming[1:len(w.resuming)]
657 }
658 return nil
659}
660
661// dispatchEvent sends a WatchResponse to the appropriate watcher stream
662func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
663 events := make([]*Event, len(pbresp.Events))
664 for i, ev := range pbresp.Events {
665 events[i] = (*Event)(ev)
666 }
667 // TODO: return watch ID?
668 wr := &WatchResponse{
669 Header: *pbresp.Header,
670 Events: events,
671 CompactRevision: pbresp.CompactRevision,
672 Created: pbresp.Created,
673 Canceled: pbresp.Canceled,
674 cancelReason: pbresp.CancelReason,
675 }
676
677 // watch IDs are zero indexed, so request notify watch responses are assigned a watch ID of -1 to
678 // indicate they should be broadcast.
679 if wr.IsProgressNotify() && pbresp.WatchId == -1 {
680 return w.broadcastResponse(wr)
681 }
682
683 return w.unicastResponse(wr, pbresp.WatchId)
684
685}
686
687// broadcastResponse send a watch response to all watch substreams.
688func (w *watchGrpcStream) broadcastResponse(wr *WatchResponse) bool {
689 for _, ws := range w.substreams {
690 select {
691 case ws.recvc <- wr:
692 case <-ws.donec:
693 }
694 }
695 return true
696}
697
698// unicastResponse sends a watch response to a specific watch substream.
699func (w *watchGrpcStream) unicastResponse(wr *WatchResponse, watchId int64) bool {
700 ws, ok := w.substreams[watchId]
701 if !ok {
702 return false
703 }
704 select {
705 case ws.recvc <- wr:
706 case <-ws.donec:
707 return false
708 }
709 return true
710}
711
712// serveWatchClient forwards messages from the grpc stream to run()
713func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
714 for {
715 resp, err := wc.Recv()
716 if err != nil {
717 select {
718 case w.errc <- err:
719 case <-w.donec:
720 }
721 return
722 }
723 select {
724 case w.respc <- resp:
725 case <-w.donec:
726 return
727 }
728 }
729}
730
// serveSubstream forwards watch responses from run() to the subscriber.
//
// Responses received on ws.recvc are appended to ws.buf and published on
// ws.outc in order. The goroutine returns when recvc is closed, when the
// stream context or the subscriber context is done, when resumec is closed
// (the substream is being resumed on a new grpc stream), or after publishing
// a response whose Err() is non-nil.
//
// On return ws.donec is closed and w.wg is released. Unless the exit was
// caused by resumec, ws is also marked closing and reported on w.closingc.
func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{}) {
	if ws.closing {
		panic("created substream goroutine but substream is closing")
	}

	// nextRev is the minimum expected next revision
	nextRev := ws.initReq.rev
	resuming := false
	defer func() {
		if !resuming {
			ws.closing = true
		}
		close(ws.donec)
		if !resuming {
			w.closingc <- ws
		}
		w.wg.Done()
	}()

	emptyWr := &WatchResponse{}
	for {
		curWr := emptyWr
		outc := ws.outc

		// only enable the send case when there is something buffered;
		// a send on a nil channel never becomes ready in the select below
		if len(ws.buf) > 0 {
			curWr = ws.buf[0]
		} else {
			outc = nil
		}
		select {
		case outc <- *curWr:
			// an error response is the last message of the watch
			if ws.buf[0].Err() != nil {
				return
			}
			ws.buf[0] = nil
			ws.buf = ws.buf[1:]
		case wr, ok := <-ws.recvc:
			if !ok {
				// recvc was closed (by run() or addSubstream); shut down
				return
			}

			if wr.Created {
				if ws.initReq.retc != nil {
					ws.initReq.retc <- ws.outc
					// to prevent next write from taking the slot in buffered channel
					// and posting duplicate create events
					ws.initReq.retc = nil

					// send first creation event only if requested
					if ws.initReq.createdNotify {
						ws.outc <- *wr
					}
					// once the watch channel is returned, a current revision
					// watch must resume at the store revision. This is necessary
					// for the following case to work as expected:
					//	wch := m1.Watch("a")
					//	m2.Put("a", "b")
					//	<-wch
					// If the revision is only bound on the first observed event,
					// if wch is disconnected before the Put is issued, then reconnects
					// after it is committed, it'll miss the Put.
					if ws.initReq.rev == 0 {
						nextRev = wr.Header.Revision
					}
				}
			} else {
				// current progress of watch; <= store revision
				nextRev = wr.Header.Revision
			}

			if len(wr.Events) > 0 {
				nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
			}
			// remember the revision so a resumed watch is re-created from it
			ws.initReq.rev = nextRev

			// created event is already sent above,
			// watcher should not post duplicate events
			if wr.Created {
				continue
			}

			// TODO pause channel if buffer gets too large
			ws.buf = append(ws.buf, wr)
		case <-w.ctx.Done():
			return
		case <-ws.initReq.ctx.Done():
			return
		case <-resumec:
			resuming = true
			return
		}
	}
	// lazily send cancel message if events on missing id
}
827
// newWatchClient opens a new grpc watch stream and moves every existing
// substream onto the resume queue so it can be re-registered.
//
// It signals all substream goroutines to stop via resumec, waits for them,
// resets the ids of active substreams to -1 and queues them for resume, opens
// the grpc stream while still honoring subscriber cancelations, and then
// restarts a serveSubstream goroutine for every non-closing substream.
// On success a serveWatchClient goroutine is started for the new stream;
// on failure the error is returned wrapped by v3rpc.Error.
func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
	// mark all substreams as resuming
	close(w.resumec)
	w.resumec = make(chan struct{})
	w.joinSubstreams()
	for _, ws := range w.substreams {
		ws.id = -1
		w.resuming = append(w.resuming, ws)
	}
	// strip out nils, if any
	var resuming []*watcherStream
	for _, ws := range w.resuming {
		if ws != nil {
			resuming = append(resuming, ws)
		}
	}
	w.resuming = resuming
	w.substreams = make(map[int64]*watcherStream)

	// connect to grpc stream while accepting watcher cancelation
	stopc := make(chan struct{})
	donec := w.waitCancelSubstreams(stopc)
	wc, err := w.openWatchClient()
	close(stopc)
	<-donec

	// serve all non-closing streams, even if there's a client error
	// so that the teardown path can shutdown the streams as expected.
	for _, ws := range w.resuming {
		if ws.closing {
			continue
		}
		ws.donec = make(chan struct{})
		w.wg.Add(1)
		go w.serveSubstream(ws, w.resumec)
	}

	if err != nil {
		return nil, v3rpc.Error(err)
	}

	// receive data from new grpc stream
	go w.serveWatchClient(wc)
	return wc, nil
}
873
// waitCancelSubstreams starts one goroutine per entry in w.resuming that
// watches for the subscriber's context finishing while the grpc stream is
// being (re)opened. A substream whose context finishes is marked closing, has
// its outc closed and set to nil, and is reported on w.closingc from a
// separate goroutine tracked by w.wg. Closing stopc ends the wait for the
// remaining substreams. The returned channel is closed once all per-substream
// goroutines have exited.
func (w *watchGrpcStream) waitCancelSubstreams(stopc <-chan struct{}) <-chan struct{} {
	var wg sync.WaitGroup
	wg.Add(len(w.resuming))
	donec := make(chan struct{})
	for i := range w.resuming {
		go func(ws *watcherStream) {
			defer wg.Done()
			if ws.closing {
				// already closing; only make sure a canceled subscriber's
				// channel is closed
				if ws.initReq.ctx.Err() != nil && ws.outc != nil {
					close(ws.outc)
					ws.outc = nil
				}
				return
			}
			select {
			case <-ws.initReq.ctx.Done():
				// closed ws will be removed from resuming
				ws.closing = true
				close(ws.outc)
				ws.outc = nil
				w.wg.Add(1)
				// report asynchronously so this goroutine does not block on
				// the unbuffered closingc
				go func() {
					defer w.wg.Done()
					w.closingc <- ws
				}()
			case <-stopc:
			}
		}(w.resuming[i])
	}
	go func() {
		defer close(donec)
		wg.Wait()
	}()
	return donec
}
909
910// joinSubstreams waits for all substream goroutines to complete.
911func (w *watchGrpcStream) joinSubstreams() {
912 for _, ws := range w.substreams {
913 <-ws.donec
914 }
915 for _, ws := range w.resuming {
916 if ws != nil {
917 <-ws.donec
918 }
919 }
920}
921
922var maxBackoff = 100 * time.Millisecond
923
924// openWatchClient retries opening a watch client until success or halt.
925// manually retry in case "ws==nil && err==nil"
926// TODO: remove FailFast=false
927func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
928 backoff := time.Millisecond
929 for {
930 select {
931 case <-w.ctx.Done():
932 if err == nil {
933 return nil, w.ctx.Err()
934 }
935 return nil, err
936 default:
937 }
938 if ws, err = w.remote.Watch(w.ctx, w.callOpts...); ws != nil && err == nil {
939 break
940 }
941 if isHaltErr(w.ctx, err) {
942 return nil, v3rpc.Error(err)
943 }
944 if isUnavailableErr(w.ctx, err) {
945 // retry, but backoff
946 if backoff < maxBackoff {
947 // 25% backoff factor
948 backoff = backoff + backoff/4
949 if backoff > maxBackoff {
950 backoff = maxBackoff
951 }
952 }
953 time.Sleep(backoff)
954 }
955 }
956 return ws, nil
957}
958
959// toPB converts an internal watch request structure to its protobuf WatchRequest structure.
960func (wr *watchRequest) toPB() *pb.WatchRequest {
961 req := &pb.WatchCreateRequest{
962 StartRevision: wr.rev,
963 Key: []byte(wr.key),
964 RangeEnd: []byte(wr.end),
965 ProgressNotify: wr.progressNotify,
966 Filters: wr.filters,
967 PrevKv: wr.prevKV,
968 Fragment: wr.fragment,
969 }
970 cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
971 return &pb.WatchRequest{RequestUnion: cr}
972}
973
974// toPB converts an internal progress request structure to its protobuf WatchRequest structure.
975func (pr *progressRequest) toPB() *pb.WatchRequest {
976 req := &pb.WatchProgressRequest{}
977 cr := &pb.WatchRequest_ProgressRequest{ProgressRequest: req}
978 return &pb.WatchRequest{RequestUnion: cr}
979}
980
981func streamKeyFromCtx(ctx context.Context) string {
982 if md, ok := metadata.FromOutgoingContext(ctx); ok {
983 return fmt.Sprintf("%+v", md)
984 }
985 return ""
986}