blob: 8ec58bb14695184c00fbfcfb19cd00cf9bda7291 [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package clientv3
16
17import (
18 "context"
Stephane Barbarie260a5632019-02-26 16:12:49 -050019 "errors"
khenaidooac637102019-01-14 15:44:34 -050020 "fmt"
21 "sync"
22 "time"
23
Stephane Barbarie260a5632019-02-26 16:12:49 -050024 v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
25 pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
26 mvccpb "go.etcd.io/etcd/mvcc/mvccpb"
khenaidooac637102019-01-14 15:44:34 -050027
28 "google.golang.org/grpc"
29 "google.golang.org/grpc/codes"
30 "google.golang.org/grpc/metadata"
31 "google.golang.org/grpc/status"
32)
33
34const (
35 EventTypeDelete = mvccpb.DELETE
36 EventTypePut = mvccpb.PUT
37
38 closeSendErrTimeout = 250 * time.Millisecond
39)
40
41type Event mvccpb.Event
42
43type WatchChan <-chan WatchResponse
44
45type Watcher interface {
46 // Watch watches on a key or prefix. The watched events will be returned
47 // through the returned channel. If revisions waiting to be sent over the
48 // watch are compacted, then the watch will be canceled by the server, the
49 // client will post a compacted error watch response, and the channel will close.
Stephane Barbarie260a5632019-02-26 16:12:49 -050050 // If the context "ctx" is canceled or timed out, returned "WatchChan" is closed,
51 // and "WatchResponse" from this closed channel has zero events and nil "Err()".
52 // The context "ctx" MUST be canceled, as soon as watcher is no longer being used,
53 // to release the associated resources.
54 //
55 // If the context is "context.Background/TODO", returned "WatchChan" will
56 // not be closed and block until event is triggered, except when server
57 // returns a non-recoverable error (e.g. ErrCompacted).
58 // For example, when context passed with "WithRequireLeader" and the
59 // connected server has no leader (e.g. due to network partition),
60 // error "etcdserver: no leader" (ErrNoLeader) will be returned,
61 // and then "WatchChan" is closed with non-nil "Err()".
62 // In order to prevent a watch stream being stuck in a partitioned node,
63 // make sure to wrap context with "WithRequireLeader".
64 //
65 // Otherwise, as long as the context has not been canceled or timed out,
66 // watch will retry on other recoverable errors forever until reconnected.
67 //
68 // TODO: explicitly set context error in the last "WatchResponse" message and close channel?
69 // Currently, client contexts are overwritten with "valCtx" that never closes.
70 // TODO(v3.4): configure watch retry policy, limit maximum retry number
71 // (see https://github.com/etcd-io/etcd/issues/8980)
khenaidooac637102019-01-14 15:44:34 -050072 Watch(ctx context.Context, key string, opts ...OpOption) WatchChan
73
Stephane Barbarie260a5632019-02-26 16:12:49 -050074 // RequestProgress requests a progress notify response be sent in all watch channels.
75 RequestProgress(ctx context.Context) error
76
khenaidooac637102019-01-14 15:44:34 -050077 // Close closes the watcher and cancels all watch requests.
78 Close() error
79}
80
81type WatchResponse struct {
82 Header pb.ResponseHeader
83 Events []*Event
84
85 // CompactRevision is the minimum revision the watcher may receive.
86 CompactRevision int64
87
88 // Canceled is used to indicate watch failure.
89 // If the watch failed and the stream was about to close, before the channel is closed,
90 // the channel sends a final response that has Canceled set to true with a non-nil Err().
91 Canceled bool
92
93 // Created is used to indicate the creation of the watcher.
94 Created bool
95
96 closeErr error
97
98 // cancelReason is a reason of canceling watch
99 cancelReason string
100}
101
102// IsCreate returns true if the event tells that the key is newly created.
103func (e *Event) IsCreate() bool {
104 return e.Type == EventTypePut && e.Kv.CreateRevision == e.Kv.ModRevision
105}
106
107// IsModify returns true if the event tells that a new value is put on existing key.
108func (e *Event) IsModify() bool {
109 return e.Type == EventTypePut && e.Kv.CreateRevision != e.Kv.ModRevision
110}
111
112// Err is the error value if this WatchResponse holds an error.
113func (wr *WatchResponse) Err() error {
114 switch {
115 case wr.closeErr != nil:
116 return v3rpc.Error(wr.closeErr)
117 case wr.CompactRevision != 0:
118 return v3rpc.ErrCompacted
119 case wr.Canceled:
120 if len(wr.cancelReason) != 0 {
121 return v3rpc.Error(status.Error(codes.FailedPrecondition, wr.cancelReason))
122 }
123 return v3rpc.ErrFutureRev
124 }
125 return nil
126}
127
128// IsProgressNotify returns true if the WatchResponse is progress notification.
129func (wr *WatchResponse) IsProgressNotify() bool {
130 return len(wr.Events) == 0 && !wr.Canceled && !wr.Created && wr.CompactRevision == 0 && wr.Header.Revision != 0
131}
132
133// watcher implements the Watcher interface
134type watcher struct {
135 remote pb.WatchClient
136 callOpts []grpc.CallOption
137
138 // mu protects the grpc streams map
139 mu sync.RWMutex
140
141 // streams holds all the active grpc streams keyed by ctx value.
142 streams map[string]*watchGrpcStream
143}
144
145// watchGrpcStream tracks all watch resources attached to a single grpc stream.
146type watchGrpcStream struct {
147 owner *watcher
148 remote pb.WatchClient
149 callOpts []grpc.CallOption
150
151 // ctx controls internal remote.Watch requests
152 ctx context.Context
153 // ctxKey is the key used when looking up this stream's context
154 ctxKey string
155 cancel context.CancelFunc
156
157 // substreams holds all active watchers on this grpc stream
158 substreams map[int64]*watcherStream
159 // resuming holds all resuming watchers on this grpc stream
160 resuming []*watcherStream
161
162 // reqc sends a watch request from Watch() to the main goroutine
Stephane Barbarie260a5632019-02-26 16:12:49 -0500163 reqc chan watchStreamRequest
khenaidooac637102019-01-14 15:44:34 -0500164 // respc receives data from the watch client
165 respc chan *pb.WatchResponse
166 // donec closes to broadcast shutdown
167 donec chan struct{}
168 // errc transmits errors from grpc Recv to the watch stream reconnect logic
169 errc chan error
170 // closingc gets the watcherStream of closing watchers
171 closingc chan *watcherStream
172 // wg is Done when all substream goroutines have exited
173 wg sync.WaitGroup
174
175 // resumec closes to signal that all substreams should begin resuming
176 resumec chan struct{}
177 // closeErr is the error that closed the watch stream
178 closeErr error
179}
180
Stephane Barbarie260a5632019-02-26 16:12:49 -0500181// watchStreamRequest is a union of the supported watch request operation types
182type watchStreamRequest interface {
183 toPB() *pb.WatchRequest
184}
185
khenaidooac637102019-01-14 15:44:34 -0500186// watchRequest is issued by the subscriber to start a new watcher
187type watchRequest struct {
188 ctx context.Context
189 key string
190 end string
191 rev int64
Stephane Barbarie260a5632019-02-26 16:12:49 -0500192
khenaidooac637102019-01-14 15:44:34 -0500193 // send created notification event if this field is true
194 createdNotify bool
195 // progressNotify is for progress updates
196 progressNotify bool
Stephane Barbarie260a5632019-02-26 16:12:49 -0500197 // fragmentation should be disabled by default
198 // if true, split watch events when total exceeds
199 // "--max-request-bytes" flag value + 512-byte
200 fragment bool
201
khenaidooac637102019-01-14 15:44:34 -0500202 // filters is the list of events to filter out
203 filters []pb.WatchCreateRequest_FilterType
204 // get the previous key-value pair before the event happens
205 prevKV bool
206 // retc receives a chan WatchResponse once the watcher is established
207 retc chan chan WatchResponse
208}
209
// progressRequest is what a subscriber submits to ask for watch progress; it
// carries no fields.
type progressRequest struct{}
213
khenaidooac637102019-01-14 15:44:34 -0500214// watcherStream represents a registered watcher
215type watcherStream struct {
216 // initReq is the request that initiated this request
217 initReq watchRequest
218
219 // outc publishes watch responses to subscriber
220 outc chan WatchResponse
221 // recvc buffers watch responses before publishing
222 recvc chan *WatchResponse
223 // donec closes when the watcherStream goroutine stops.
224 donec chan struct{}
225 // closing is set to true when stream should be scheduled to shutdown.
226 closing bool
227 // id is the registered watch id on the grpc stream
228 id int64
229
230 // buf holds all events received from etcd but not yet consumed by the client
231 buf []*WatchResponse
232}
233
234func NewWatcher(c *Client) Watcher {
235 return NewWatchFromWatchClient(pb.NewWatchClient(c.conn), c)
236}
237
238func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher {
239 w := &watcher{
240 remote: wc,
241 streams: make(map[string]*watchGrpcStream),
242 }
243 if c != nil {
244 w.callOpts = c.callOpts
245 }
246 return w
247}
248
var (
	// valCtxCh is never sent on or closed; every valCtx returns it from Done.
	valCtxCh = make(chan struct{})
	// zeroTime is the placeholder deadline valCtx.Deadline returns alongside
	// ok == false.
	zeroTime = time.Unix(0, 0)
)

// valCtx wraps a context, keeping its values but hiding its cancellation.
// Done never fires, Err is always nil, and there is no deadline.
type valCtx struct{ context.Context }

// Deadline reports that a valCtx has no deadline.
func (vc *valCtx) Deadline() (time.Time, bool) { return zeroTime, false }

// Done returns a channel that is never closed.
func (vc *valCtx) Done() <-chan struct{} { return valCtxCh }

// Err always returns nil because a valCtx is never canceled.
func (vc *valCtx) Err() error { return nil }
259
260func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
261 ctx, cancel := context.WithCancel(&valCtx{inctx})
262 wgs := &watchGrpcStream{
263 owner: w,
264 remote: w.remote,
265 callOpts: w.callOpts,
266 ctx: ctx,
267 ctxKey: streamKeyFromCtx(inctx),
268 cancel: cancel,
269 substreams: make(map[int64]*watcherStream),
270 respc: make(chan *pb.WatchResponse),
Stephane Barbarie260a5632019-02-26 16:12:49 -0500271 reqc: make(chan watchStreamRequest),
khenaidooac637102019-01-14 15:44:34 -0500272 donec: make(chan struct{}),
273 errc: make(chan error, 1),
274 closingc: make(chan *watcherStream),
275 resumec: make(chan struct{}),
276 }
277 go wgs.run()
278 return wgs
279}
280
// Watch posts a watch request to run() and waits for a new watcher channel.
//
// The request is routed to the gRPC stream keyed by streamKeyFromCtx(ctx),
// which is created if it does not exist yet. If the watcher is already closed,
// an already-closed channel is returned. If the target stream shuts down
// while the request is in flight, a stream close error is reported as a
// single Canceled response on a closed channel; otherwise the call retries
// on a fresh stream.
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
	ow := opWatch(key, opts...)

	var filters []pb.WatchCreateRequest_FilterType
	if ow.filterPut {
		filters = append(filters, pb.WatchCreateRequest_NOPUT)
	}
	if ow.filterDelete {
		filters = append(filters, pb.WatchCreateRequest_NODELETE)
	}

	wr := &watchRequest{
		ctx:            ctx,
		createdNotify:  ow.createdNotify,
		key:            string(ow.key),
		end:            string(ow.end),
		rev:            ow.rev,
		progressNotify: ow.progressNotify,
		fragment:       ow.fragment,
		filters:        filters,
		prevKV:         ow.prevKV,
		// buffered so the stream side can hand over the channel without
		// blocking even if this call has already given up
		retc: make(chan chan WatchResponse, 1),
	}

	ok := false
	ctxKey := streamKeyFromCtx(ctx)

	// find or allocate appropriate grpc watch stream
	w.mu.Lock()
	if w.streams == nil {
		// closed
		w.mu.Unlock()
		ch := make(chan WatchResponse)
		close(ch)
		return ch
	}
	wgs := w.streams[ctxKey]
	if wgs == nil {
		wgs = w.newWatcherGrpcStream(ctx)
		w.streams[ctxKey] = wgs
	}
	donec := wgs.donec
	reqc := wgs.reqc
	w.mu.Unlock()

	// couldn't create channel; return closed channel
	// (buffered so an optional final Canceled response fits before close)
	closeCh := make(chan WatchResponse, 1)

	// submit request
	select {
	case reqc <- wr:
		ok = true
	case <-wr.ctx.Done():
	case <-donec:
		// closeErr is written by run before donec is closed, so it is safe
		// to read here.
		if wgs.closeErr != nil {
			closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
			break
		}
		// retry; may have dropped stream from no ctxs
		return w.Watch(ctx, key, opts...)
	}

	// receive channel
	if ok {
		select {
		case ret := <-wr.retc:
			return ret
		case <-ctx.Done():
		case <-donec:
			if wgs.closeErr != nil {
				closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
				break
			}
			// retry; may have dropped stream from no ctxs
			return w.Watch(ctx, key, opts...)
		}
	}

	close(closeCh)
	return closeCh
}
363
364func (w *watcher) Close() (err error) {
365 w.mu.Lock()
366 streams := w.streams
367 w.streams = nil
368 w.mu.Unlock()
369 for _, wgs := range streams {
370 if werr := wgs.close(); werr != nil {
371 err = werr
372 }
373 }
374 return err
375}
376
Stephane Barbarie260a5632019-02-26 16:12:49 -0500377// RequestProgress requests a progress notify response be sent in all watch channels.
378func (w *watcher) RequestProgress(ctx context.Context) (err error) {
379 ctxKey := streamKeyFromCtx(ctx)
380
381 w.mu.Lock()
382 if w.streams == nil {
383 return fmt.Errorf("no stream found for context")
384 }
385 wgs := w.streams[ctxKey]
386 if wgs == nil {
387 wgs = w.newWatcherGrpcStream(ctx)
388 w.streams[ctxKey] = wgs
389 }
390 donec := wgs.donec
391 reqc := wgs.reqc
392 w.mu.Unlock()
393
394 pr := &progressRequest{}
395
396 select {
397 case reqc <- pr:
398 return nil
399 case <-ctx.Done():
400 if err == nil {
401 return ctx.Err()
402 }
403 return err
404 case <-donec:
405 if wgs.closeErr != nil {
406 return wgs.closeErr
407 }
408 // retry; may have dropped stream from no ctxs
409 return w.RequestProgress(ctx)
410 }
411}
412
khenaidooac637102019-01-14 15:44:34 -0500413func (w *watchGrpcStream) close() (err error) {
414 w.cancel()
415 <-w.donec
416 select {
417 case err = <-w.errc:
418 default:
419 }
420 return toErr(w.ctx, err)
421}
422
423func (w *watcher) closeStream(wgs *watchGrpcStream) {
424 w.mu.Lock()
425 close(wgs.donec)
426 wgs.cancel()
427 if w.streams != nil {
428 delete(w.streams, wgs.ctxKey)
429 }
430 w.mu.Unlock()
431}
432
433func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
Stephane Barbarie260a5632019-02-26 16:12:49 -0500434 // check watch ID for backward compatibility (<= v3.3)
435 if resp.WatchId == -1 || (resp.Canceled && resp.CancelReason != "") {
436 w.closeErr = v3rpc.Error(errors.New(resp.CancelReason))
khenaidooac637102019-01-14 15:44:34 -0500437 // failed; no channel
438 close(ws.recvc)
439 return
440 }
441 ws.id = resp.WatchId
442 w.substreams[ws.id] = ws
443}
444
445func (w *watchGrpcStream) sendCloseSubstream(ws *watcherStream, resp *WatchResponse) {
446 select {
447 case ws.outc <- *resp:
448 case <-ws.initReq.ctx.Done():
449 case <-time.After(closeSendErrTimeout):
450 }
451 close(ws.outc)
452}
453
454func (w *watchGrpcStream) closeSubstream(ws *watcherStream) {
455 // send channel response in case stream was never established
456 select {
457 case ws.initReq.retc <- ws.outc:
458 default:
459 }
460 // close subscriber's channel
461 if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil {
Stephane Barbarie260a5632019-02-26 16:12:49 -0500462 go w.sendCloseSubstream(ws, &WatchResponse{Canceled: true, closeErr: w.closeErr})
khenaidooac637102019-01-14 15:44:34 -0500463 } else if ws.outc != nil {
464 close(ws.outc)
465 }
466 if ws.id != -1 {
467 delete(w.substreams, ws.id)
468 return
469 }
470 for i := range w.resuming {
471 if w.resuming[i] == ws {
472 w.resuming[i] = nil
473 return
474 }
475 }
476}
477
478// run is the root of the goroutines for managing a watcher client
479func (w *watchGrpcStream) run() {
480 var wc pb.Watch_WatchClient
481 var closeErr error
482
483 // substreams marked to close but goroutine still running; needed for
484 // avoiding double-closing recvc on grpc stream teardown
485 closing := make(map[*watcherStream]struct{})
486
487 defer func() {
488 w.closeErr = closeErr
489 // shutdown substreams and resuming substreams
490 for _, ws := range w.substreams {
491 if _, ok := closing[ws]; !ok {
492 close(ws.recvc)
493 closing[ws] = struct{}{}
494 }
495 }
496 for _, ws := range w.resuming {
497 if _, ok := closing[ws]; ws != nil && !ok {
498 close(ws.recvc)
499 closing[ws] = struct{}{}
500 }
501 }
502 w.joinSubstreams()
503 for range closing {
504 w.closeSubstream(<-w.closingc)
505 }
506 w.wg.Wait()
507 w.owner.closeStream(w)
508 }()
509
510 // start a stream with the etcd grpc server
511 if wc, closeErr = w.newWatchClient(); closeErr != nil {
512 return
513 }
514
515 cancelSet := make(map[int64]struct{})
516
Stephane Barbarie260a5632019-02-26 16:12:49 -0500517 var cur *pb.WatchResponse
khenaidooac637102019-01-14 15:44:34 -0500518 for {
519 select {
520 // Watch() requested
Stephane Barbarie260a5632019-02-26 16:12:49 -0500521 case req := <-w.reqc:
522 switch wreq := req.(type) {
523 case *watchRequest:
524 outc := make(chan WatchResponse, 1)
525 // TODO: pass custom watch ID?
526 ws := &watcherStream{
527 initReq: *wreq,
528 id: -1,
529 outc: outc,
530 // unbuffered so resumes won't cause repeat events
531 recvc: make(chan *WatchResponse),
532 }
533
534 ws.donec = make(chan struct{})
535 w.wg.Add(1)
536 go w.serveSubstream(ws, w.resumec)
537
538 // queue up for watcher creation/resume
539 w.resuming = append(w.resuming, ws)
540 if len(w.resuming) == 1 {
541 // head of resume queue, can register a new watcher
542 wc.Send(ws.initReq.toPB())
543 }
544 case *progressRequest:
545 wc.Send(wreq.toPB())
khenaidooac637102019-01-14 15:44:34 -0500546 }
547
Stephane Barbarie260a5632019-02-26 16:12:49 -0500548 // new events from the watch client
khenaidooac637102019-01-14 15:44:34 -0500549 case pbresp := <-w.respc:
Stephane Barbarie260a5632019-02-26 16:12:49 -0500550 if cur == nil || pbresp.Created || pbresp.Canceled {
551 cur = pbresp
552 } else if cur != nil && cur.WatchId == pbresp.WatchId {
553 // merge new events
554 cur.Events = append(cur.Events, pbresp.Events...)
555 // update "Fragment" field; last response with "Fragment" == false
556 cur.Fragment = pbresp.Fragment
557 }
558
khenaidooac637102019-01-14 15:44:34 -0500559 switch {
560 case pbresp.Created:
561 // response to head of queue creation
562 if ws := w.resuming[0]; ws != nil {
563 w.addSubstream(pbresp, ws)
564 w.dispatchEvent(pbresp)
565 w.resuming[0] = nil
566 }
Stephane Barbarie260a5632019-02-26 16:12:49 -0500567
khenaidooac637102019-01-14 15:44:34 -0500568 if ws := w.nextResume(); ws != nil {
569 wc.Send(ws.initReq.toPB())
570 }
Stephane Barbarie260a5632019-02-26 16:12:49 -0500571
572 // reset for next iteration
573 cur = nil
574
khenaidooac637102019-01-14 15:44:34 -0500575 case pbresp.Canceled && pbresp.CompactRevision == 0:
576 delete(cancelSet, pbresp.WatchId)
577 if ws, ok := w.substreams[pbresp.WatchId]; ok {
578 // signal to stream goroutine to update closingc
579 close(ws.recvc)
580 closing[ws] = struct{}{}
581 }
Stephane Barbarie260a5632019-02-26 16:12:49 -0500582
583 // reset for next iteration
584 cur = nil
585
586 case cur.Fragment:
587 // watch response events are still fragmented
588 // continue to fetch next fragmented event arrival
589 continue
590
khenaidooac637102019-01-14 15:44:34 -0500591 default:
592 // dispatch to appropriate watch stream
Stephane Barbarie260a5632019-02-26 16:12:49 -0500593 ok := w.dispatchEvent(cur)
594
595 // reset for next iteration
596 cur = nil
597
598 if ok {
khenaidooac637102019-01-14 15:44:34 -0500599 break
600 }
Stephane Barbarie260a5632019-02-26 16:12:49 -0500601
khenaidooac637102019-01-14 15:44:34 -0500602 // watch response on unexpected watch id; cancel id
603 if _, ok := cancelSet[pbresp.WatchId]; ok {
604 break
605 }
Stephane Barbarie260a5632019-02-26 16:12:49 -0500606
khenaidooac637102019-01-14 15:44:34 -0500607 cancelSet[pbresp.WatchId] = struct{}{}
608 cr := &pb.WatchRequest_CancelRequest{
609 CancelRequest: &pb.WatchCancelRequest{
610 WatchId: pbresp.WatchId,
611 },
612 }
613 req := &pb.WatchRequest{RequestUnion: cr}
614 wc.Send(req)
615 }
Stephane Barbarie260a5632019-02-26 16:12:49 -0500616
khenaidooac637102019-01-14 15:44:34 -0500617 // watch client failed on Recv; spawn another if possible
618 case err := <-w.errc:
619 if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader {
620 closeErr = err
621 return
622 }
623 if wc, closeErr = w.newWatchClient(); closeErr != nil {
624 return
625 }
626 if ws := w.nextResume(); ws != nil {
627 wc.Send(ws.initReq.toPB())
628 }
629 cancelSet = make(map[int64]struct{})
Stephane Barbarie260a5632019-02-26 16:12:49 -0500630
khenaidooac637102019-01-14 15:44:34 -0500631 case <-w.ctx.Done():
632 return
Stephane Barbarie260a5632019-02-26 16:12:49 -0500633
khenaidooac637102019-01-14 15:44:34 -0500634 case ws := <-w.closingc:
635 w.closeSubstream(ws)
636 delete(closing, ws)
Stephane Barbarie260a5632019-02-26 16:12:49 -0500637 // no more watchers on this stream, shutdown
khenaidooac637102019-01-14 15:44:34 -0500638 if len(w.substreams)+len(w.resuming) == 0 {
khenaidooac637102019-01-14 15:44:34 -0500639 return
640 }
641 }
642 }
643}
644
645// nextResume chooses the next resuming to register with the grpc stream. Abandoned
646// streams are marked as nil in the queue since the head must wait for its inflight registration.
647func (w *watchGrpcStream) nextResume() *watcherStream {
648 for len(w.resuming) != 0 {
649 if w.resuming[0] != nil {
650 return w.resuming[0]
651 }
652 w.resuming = w.resuming[1:len(w.resuming)]
653 }
654 return nil
655}
656
657// dispatchEvent sends a WatchResponse to the appropriate watcher stream
658func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
659 events := make([]*Event, len(pbresp.Events))
660 for i, ev := range pbresp.Events {
661 events[i] = (*Event)(ev)
662 }
Stephane Barbarie260a5632019-02-26 16:12:49 -0500663 // TODO: return watch ID?
khenaidooac637102019-01-14 15:44:34 -0500664 wr := &WatchResponse{
665 Header: *pbresp.Header,
666 Events: events,
667 CompactRevision: pbresp.CompactRevision,
668 Created: pbresp.Created,
669 Canceled: pbresp.Canceled,
670 cancelReason: pbresp.CancelReason,
671 }
Stephane Barbarie260a5632019-02-26 16:12:49 -0500672
673 // watch IDs are zero indexed, so request notify watch responses are assigned a watch ID of -1 to
674 // indicate they should be broadcast.
675 if wr.IsProgressNotify() && pbresp.WatchId == -1 {
676 return w.broadcastResponse(wr)
677 }
678
679 return w.unicastResponse(wr, pbresp.WatchId)
680
681}
682
683// broadcastResponse send a watch response to all watch substreams.
684func (w *watchGrpcStream) broadcastResponse(wr *WatchResponse) bool {
685 for _, ws := range w.substreams {
686 select {
687 case ws.recvc <- wr:
688 case <-ws.donec:
689 }
690 }
691 return true
692}
693
694// unicastResponse sends a watch response to a specific watch substream.
695func (w *watchGrpcStream) unicastResponse(wr *WatchResponse, watchId int64) bool {
696 ws, ok := w.substreams[watchId]
khenaidooac637102019-01-14 15:44:34 -0500697 if !ok {
698 return false
699 }
700 select {
701 case ws.recvc <- wr:
702 case <-ws.donec:
703 return false
704 }
705 return true
706}
707
708// serveWatchClient forwards messages from the grpc stream to run()
709func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
710 for {
711 resp, err := wc.Recv()
712 if err != nil {
713 select {
714 case w.errc <- err:
715 case <-w.donec:
716 }
717 return
718 }
719 select {
720 case w.respc <- resp:
721 case <-w.donec:
722 return
723 }
724 }
725}
726
// serveSubstream forwards watch responses from run() to the subscriber.
//
// It buffers responses received on ws.recvc in ws.buf and publishes them on
// ws.outc, one at a time. It also tracks the next revision the watch should
// resume from, stored back into ws.initReq.rev.
//
// It exits when:
//   - recvc is closed;
//   - a delivered response carries an error;
//   - the stream or watch context is done;
//   - resumec is closed.
//
// In every case except resumec, the watcher is marked closing and sent on
// w.closingc for teardown.
func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{}) {
	if ws.closing {
		panic("created substream goroutine but substream is closing")
	}

	// nextRev is the minimum expected next revision
	nextRev := ws.initReq.rev
	resuming := false
	defer func() {
		if !resuming {
			ws.closing = true
		}
		close(ws.donec)
		if !resuming {
			w.closingc <- ws
		}
		w.wg.Done()
	}()

	emptyWr := &WatchResponse{}
	for {
		curWr := emptyWr
		outc := ws.outc

		// with nothing buffered, a nil outc disables the send case below
		if len(ws.buf) > 0 {
			curWr = ws.buf[0]
		} else {
			outc = nil
		}
		select {
		case outc <- *curWr:
			// an error response is the last thing the subscriber receives
			if ws.buf[0].Err() != nil {
				return
			}
			ws.buf[0] = nil
			ws.buf = ws.buf[1:]
		case wr, ok := <-ws.recvc:
			if !ok {
				// shutdown from closeSubstream
				return
			}

			if wr.Created {
				if ws.initReq.retc != nil {
					ws.initReq.retc <- ws.outc
					// to prevent next write from taking the slot in buffered channel
					// and posting duplicate create events
					ws.initReq.retc = nil

					// send first creation event only if requested
					if ws.initReq.createdNotify {
						ws.outc <- *wr
					}
					// once the watch channel is returned, a current revision
					// watch must resume at the store revision. This is necessary
					// for the following case to work as expected:
					//	wch := m1.Watch("a")
					//	m2.Put("a", "b")
					//	<-wch
					// If the revision is only bound on the first observed event,
					// if wch is disconnected before the Put is issued, then reconnects
					// after it is committed, it'll miss the Put.
					if ws.initReq.rev == 0 {
						nextRev = wr.Header.Revision
					}
				}
			} else {
				// current progress of watch; <= store revision
				nextRev = wr.Header.Revision
			}

			if len(wr.Events) > 0 {
				nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
			}
			// record the resume point so a reconnect re-registers from here
			ws.initReq.rev = nextRev

			// created event is already sent above,
			// watcher should not post duplicate events
			if wr.Created {
				continue
			}

			// TODO pause channel if buffer gets too large
			ws.buf = append(ws.buf, wr)
		case <-w.ctx.Done():
			return
		case <-ws.initReq.ctx.Done():
			return
		case <-resumec:
			// the stream is reconnecting; newWatchClient restarts this watcher
			resuming = true
			return
		}
	}
	// lazily send cancel message if events on missing id
}
823
// newWatchClient (re)connects the gRPC watch stream. The sequence is:
//  1. Stop all substream goroutines and move every registered watcher back
//     into the resume queue, compacting out nil slots.
//  2. Open a new watch client while still honoring watcher cancellations.
//  3. Restart the goroutines of the watchers that are not closing.
//  4. Start serveWatchClient on the new client.
//
// Substreams are restarted even if opening the client failed, so that the
// teardown path can shut them down normally. The returned error has been
// passed through v3rpc.Error.
func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
	// mark all substreams as resuming
	close(w.resumec)
	w.resumec = make(chan struct{})
	w.joinSubstreams()
	for _, ws := range w.substreams {
		// registered IDs belong to the old gRPC stream
		ws.id = -1
		w.resuming = append(w.resuming, ws)
	}
	// strip out nils, if any
	var resuming []*watcherStream
	for _, ws := range w.resuming {
		if ws != nil {
			resuming = append(resuming, ws)
		}
	}
	w.resuming = resuming
	w.substreams = make(map[int64]*watcherStream)

	// connect to grpc stream while accepting watcher cancelation
	stopc := make(chan struct{})
	donec := w.waitCancelSubstreams(stopc)
	wc, err := w.openWatchClient()
	close(stopc)
	<-donec

	// serve all non-closing streams, even if there's a client error
	// so that the teardown path can shutdown the streams as expected.
	for _, ws := range w.resuming {
		if ws.closing {
			continue
		}
		ws.donec = make(chan struct{})
		w.wg.Add(1)
		go w.serveSubstream(ws, w.resumec)
	}

	if err != nil {
		return nil, v3rpc.Error(err)
	}

	// receive data from new grpc stream
	go w.serveWatchClient(wc)
	return wc, nil
}
869
// waitCancelSubstreams watches each queued watcher for cancellation of its
// own context until stopc is closed. A watcher canceled in that window is
// marked closing, has its outc closed, and is sent on w.closingc by a helper
// goroutine tracked by w.wg. Watchers already marked closing only get their
// outc closed, and only if their context is done. The returned channel is
// closed once every per-watcher goroutine has returned.
func (w *watchGrpcStream) waitCancelSubstreams(stopc <-chan struct{}) <-chan struct{} {
	var wg sync.WaitGroup
	wg.Add(len(w.resuming))
	donec := make(chan struct{})
	for i := range w.resuming {
		go func(ws *watcherStream) {
			defer wg.Done()
			if ws.closing {
				if ws.initReq.ctx.Err() != nil && ws.outc != nil {
					close(ws.outc)
					ws.outc = nil
				}
				return
			}
			select {
			case <-ws.initReq.ctx.Done():
				// closed ws will be removed from resuming
				ws.closing = true
				close(ws.outc)
				ws.outc = nil
				w.wg.Add(1)
				// sent asynchronously; run receives from closingc
				go func() {
					defer w.wg.Done()
					w.closingc <- ws
				}()
			case <-stopc:
			}
		}(w.resuming[i])
	}
	go func() {
		defer close(donec)
		wg.Wait()
	}()
	return donec
}
905
906// joinSubstreams waits for all substream goroutines to complete.
907func (w *watchGrpcStream) joinSubstreams() {
908 for _, ws := range w.substreams {
909 <-ws.donec
910 }
911 for _, ws := range w.resuming {
912 if ws != nil {
913 <-ws.donec
914 }
915 }
916}
917
// maxBackoff caps the delay openWatchClient sleeps between attempts while
// the server is unavailable.
var maxBackoff = time.Duration(100) * time.Millisecond
919
920// openWatchClient retries opening a watch client until success or halt.
921// manually retry in case "ws==nil && err==nil"
922// TODO: remove FailFast=false
923func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
924 backoff := time.Millisecond
925 for {
926 select {
927 case <-w.ctx.Done():
928 if err == nil {
929 return nil, w.ctx.Err()
930 }
931 return nil, err
932 default:
933 }
934 if ws, err = w.remote.Watch(w.ctx, w.callOpts...); ws != nil && err == nil {
935 break
936 }
937 if isHaltErr(w.ctx, err) {
938 return nil, v3rpc.Error(err)
939 }
940 if isUnavailableErr(w.ctx, err) {
941 // retry, but backoff
942 if backoff < maxBackoff {
943 // 25% backoff factor
944 backoff = backoff + backoff/4
945 if backoff > maxBackoff {
946 backoff = maxBackoff
947 }
948 }
949 time.Sleep(backoff)
950 }
951 }
952 return ws, nil
953}
954
955// toPB converts an internal watch request structure to its protobuf WatchRequest structure.
956func (wr *watchRequest) toPB() *pb.WatchRequest {
957 req := &pb.WatchCreateRequest{
958 StartRevision: wr.rev,
959 Key: []byte(wr.key),
960 RangeEnd: []byte(wr.end),
961 ProgressNotify: wr.progressNotify,
962 Filters: wr.filters,
963 PrevKv: wr.prevKV,
Stephane Barbarie260a5632019-02-26 16:12:49 -0500964 Fragment: wr.fragment,
khenaidooac637102019-01-14 15:44:34 -0500965 }
966 cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
967 return &pb.WatchRequest{RequestUnion: cr}
968}
969
Stephane Barbarie260a5632019-02-26 16:12:49 -0500970// toPB converts an internal progress request structure to its protobuf WatchRequest structure.
971func (pr *progressRequest) toPB() *pb.WatchRequest {
972 req := &pb.WatchProgressRequest{}
973 cr := &pb.WatchRequest_ProgressRequest{ProgressRequest: req}
974 return &pb.WatchRequest{RequestUnion: cr}
975}
976
khenaidooac637102019-01-14 15:44:34 -0500977func streamKeyFromCtx(ctx context.Context) string {
978 if md, ok := metadata.FromOutgoingContext(ctx); ok {
979 return fmt.Sprintf("%+v", md)
980 }
981 return ""
982}