// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package clientv3
16
17import (
18 "context"
19 "fmt"
20 "sync"
21 "time"
22
23 v3rpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
24 pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
25 mvccpb "github.com/coreos/etcd/mvcc/mvccpb"
26
27 "google.golang.org/grpc"
28 "google.golang.org/grpc/codes"
29 "google.golang.org/grpc/metadata"
30 "google.golang.org/grpc/status"
31)
32
const (
	// EventTypeDelete is the mvccpb.DELETE event type, re-exported for
	// callers of this package.
	EventTypeDelete = mvccpb.DELETE
	// EventTypePut is the mvccpb.PUT event type, re-exported for callers
	// of this package.
	EventTypePut = mvccpb.PUT

	// closeSendErrTimeout bounds how long sendCloseSubstream waits for the
	// subscriber to accept the final response before closing its channel.
	closeSendErrTimeout = 250 * time.Millisecond
)
39
// Event is a single watch event; it has the same underlying type as
// mvccpb.Event so values can be converted between the two directly.
type Event mvccpb.Event
41
// WatchChan is the receive-only channel of WatchResponse values returned
// by Watcher.Watch.
type WatchChan <-chan WatchResponse
43
// Watcher is the client-side interface for creating watches and shutting
// them down.
type Watcher interface {
	// Watch watches on a key or prefix. The watched events will be returned
	// through the returned channel. If revisions waiting to be sent over the
	// watch are compacted, then the watch will be canceled by the server, the
	// client will post a compacted error watch response, and the channel will close.
	Watch(ctx context.Context, key string, opts ...OpOption) WatchChan

	// Close closes the watcher and cancels all watch requests.
	Close() error
}
54
// WatchResponse is one message delivered to a subscriber on a WatchChan.
type WatchResponse struct {
	// Header is the response header copied from the server's watch response.
	Header pb.ResponseHeader
	// Events are the events carried by this response; it may be empty.
	Events []*Event

	// CompactRevision is the minimum revision the watcher may receive.
	// A non-zero value makes Err() report a compaction error.
	CompactRevision int64

	// Canceled is used to indicate watch failure.
	// If the watch failed and the stream was about to close, before the channel is closed,
	// the channel sends a final response that has Canceled set to true with a non-nil Err().
	Canceled bool

	// Created is used to indicate the creation of the watcher.
	Created bool

	// closeErr is the error that closed the watch stream, if any; Err()
	// reports it ahead of every other condition.
	closeErr error

	// cancelReason is a reason of canceling watch
	cancelReason string
}
75
76// IsCreate returns true if the event tells that the key is newly created.
77func (e *Event) IsCreate() bool {
78 return e.Type == EventTypePut && e.Kv.CreateRevision == e.Kv.ModRevision
79}
80
81// IsModify returns true if the event tells that a new value is put on existing key.
82func (e *Event) IsModify() bool {
83 return e.Type == EventTypePut && e.Kv.CreateRevision != e.Kv.ModRevision
84}
85
86// Err is the error value if this WatchResponse holds an error.
87func (wr *WatchResponse) Err() error {
88 switch {
89 case wr.closeErr != nil:
90 return v3rpc.Error(wr.closeErr)
91 case wr.CompactRevision != 0:
92 return v3rpc.ErrCompacted
93 case wr.Canceled:
94 if len(wr.cancelReason) != 0 {
95 return v3rpc.Error(status.Error(codes.FailedPrecondition, wr.cancelReason))
96 }
97 return v3rpc.ErrFutureRev
98 }
99 return nil
100}
101
102// IsProgressNotify returns true if the WatchResponse is progress notification.
103func (wr *WatchResponse) IsProgressNotify() bool {
104 return len(wr.Events) == 0 && !wr.Canceled && !wr.Created && wr.CompactRevision == 0 && wr.Header.Revision != 0
105}
106
// watcher implements the Watcher interface
type watcher struct {
	// remote is the gRPC watch client handed to every stream this watcher creates.
	remote pb.WatchClient
	// callOpts are the gRPC call options handed to every stream this watcher creates.
	callOpts []grpc.CallOption

	// mu protects the grpc streams map
	mu sync.RWMutex

	// streams holds all the active grpc streams keyed by ctx value
	// (the string produced by streamKeyFromCtx). It is set to nil by Close,
	// which Watch treats as "closed".
	streams map[string]*watchGrpcStream
}
118
// watchGrpcStream tracks all watch resources attached to a single grpc stream.
type watchGrpcStream struct {
	// owner is the watcher that created this stream; run() calls
	// owner.closeStream on exit.
	owner *watcher
	// remote is the gRPC watch client used to open the stream.
	remote pb.WatchClient
	// callOpts are the call options passed to remote.Watch.
	callOpts []grpc.CallOption

	// ctx controls internal remote.Watch requests
	ctx context.Context
	// ctxKey is the key used when looking up this stream's context
	ctxKey string
	// cancel cancels ctx.
	cancel context.CancelFunc

	// substreams holds all active watchers on this grpc stream
	substreams map[int64]*watcherStream
	// resuming holds all resuming watchers on this grpc stream
	resuming []*watcherStream

	// reqc sends a watch request from Watch() to the main goroutine
	reqc chan *watchRequest
	// respc receives data from the watch client
	respc chan *pb.WatchResponse
	// donec closes to broadcast shutdown
	donec chan struct{}
	// errc transmits errors from grpc Recv to the watch stream reconnect logic
	errc chan error
	// closingc gets the watcherStream of closing watchers
	closingc chan *watcherStream
	// wg is Done when all substream goroutines have exited
	wg sync.WaitGroup

	// resumec closes to signal that all substreams should begin resuming
	resumec chan struct{}
	// closeErr is the error that closed the watch stream
	closeErr error
}
154
// watchRequest is issued by the subscriber to start a new watcher
type watchRequest struct {
	// ctx is the subscriber's context; its cancellation ends the watcher.
	ctx context.Context
	// key is the key (or range start) to watch.
	key string
	// end is the range end; sent to the server as RangeEnd.
	end string
	// rev is the revision to start watching from; sent as StartRevision.
	rev int64
	// send created notification event if this field is true
	createdNotify bool
	// progressNotify is for progress updates
	progressNotify bool
	// filters is the list of events to filter out
	filters []pb.WatchCreateRequest_FilterType
	// get the previous key-value pair before the event happens
	prevKV bool
	// retc receives a chan WatchResponse once the watcher is established
	retc chan chan WatchResponse
}
172
// watcherStream represents a registered watcher
type watcherStream struct {
	// initReq is the request that initiated this request; its rev field is
	// advanced by serveSubstream as responses arrive so a resumed watch
	// restarts from the right revision.
	initReq watchRequest

	// outc publishes watch responses to subscriber
	outc chan WatchResponse
	// recvc buffers watch responses before publishing
	recvc chan *WatchResponse
	// donec closes when the watcherStream goroutine stops.
	donec chan struct{}
	// closing is set to true when stream should be scheduled to shutdown.
	closing bool
	// id is the registered watch id on the grpc stream; -1 while the
	// watcher is not registered.
	id int64

	// buf holds all events received from etcd but not yet consumed by the client
	buf []*WatchResponse
}
192
193func NewWatcher(c *Client) Watcher {
194 return NewWatchFromWatchClient(pb.NewWatchClient(c.conn), c)
195}
196
197func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher {
198 w := &watcher{
199 remote: wc,
200 streams: make(map[string]*watchGrpcStream),
201 }
202 if c != nil {
203 w.callOpts = c.callOpts
204 }
205 return w
206}
207
var (
	// valCtxCh is handed out by valCtx.Done; nothing ever closes it.
	valCtxCh = make(chan struct{})
	// zeroTime is the placeholder deadline reported by valCtx.Deadline.
	zeroTime = time.Unix(0, 0)
)

// valCtx wraps a context so that only its values remain visible: the
// wrapper never reports a deadline, is never Done, and never has an error,
// regardless of the state of the embedded context.
type valCtx struct{ context.Context }

// Deadline always reports that no deadline is set.
func (vc *valCtx) Deadline() (time.Time, bool) {
	return zeroTime, false
}

// Done returns a channel that is never closed.
func (vc *valCtx) Done() <-chan struct{} {
	return valCtxCh
}

// Err always returns nil.
func (vc *valCtx) Err() error {
	return nil
}
218
219func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
220 ctx, cancel := context.WithCancel(&valCtx{inctx})
221 wgs := &watchGrpcStream{
222 owner: w,
223 remote: w.remote,
224 callOpts: w.callOpts,
225 ctx: ctx,
226 ctxKey: streamKeyFromCtx(inctx),
227 cancel: cancel,
228 substreams: make(map[int64]*watcherStream),
229 respc: make(chan *pb.WatchResponse),
230 reqc: make(chan *watchRequest),
231 donec: make(chan struct{}),
232 errc: make(chan error, 1),
233 closingc: make(chan *watcherStream),
234 resumec: make(chan struct{}),
235 }
236 go wgs.run()
237 return wgs
238}
239
240// Watch posts a watch request to run() and waits for a new watcher channel
241func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
242 ow := opWatch(key, opts...)
243
244 var filters []pb.WatchCreateRequest_FilterType
245 if ow.filterPut {
246 filters = append(filters, pb.WatchCreateRequest_NOPUT)
247 }
248 if ow.filterDelete {
249 filters = append(filters, pb.WatchCreateRequest_NODELETE)
250 }
251
252 wr := &watchRequest{
253 ctx: ctx,
254 createdNotify: ow.createdNotify,
255 key: string(ow.key),
256 end: string(ow.end),
257 rev: ow.rev,
258 progressNotify: ow.progressNotify,
259 filters: filters,
260 prevKV: ow.prevKV,
261 retc: make(chan chan WatchResponse, 1),
262 }
263
264 ok := false
265 ctxKey := streamKeyFromCtx(ctx)
266
267 // find or allocate appropriate grpc watch stream
268 w.mu.Lock()
269 if w.streams == nil {
270 // closed
271 w.mu.Unlock()
272 ch := make(chan WatchResponse)
273 close(ch)
274 return ch
275 }
276 wgs := w.streams[ctxKey]
277 if wgs == nil {
278 wgs = w.newWatcherGrpcStream(ctx)
279 w.streams[ctxKey] = wgs
280 }
281 donec := wgs.donec
282 reqc := wgs.reqc
283 w.mu.Unlock()
284
285 // couldn't create channel; return closed channel
286 closeCh := make(chan WatchResponse, 1)
287
288 // submit request
289 select {
290 case reqc <- wr:
291 ok = true
292 case <-wr.ctx.Done():
293 case <-donec:
294 if wgs.closeErr != nil {
295 closeCh <- WatchResponse{closeErr: wgs.closeErr}
296 break
297 }
298 // retry; may have dropped stream from no ctxs
299 return w.Watch(ctx, key, opts...)
300 }
301
302 // receive channel
303 if ok {
304 select {
305 case ret := <-wr.retc:
306 return ret
307 case <-ctx.Done():
308 case <-donec:
309 if wgs.closeErr != nil {
310 closeCh <- WatchResponse{closeErr: wgs.closeErr}
311 break
312 }
313 // retry; may have dropped stream from no ctxs
314 return w.Watch(ctx, key, opts...)
315 }
316 }
317
318 close(closeCh)
319 return closeCh
320}
321
322func (w *watcher) Close() (err error) {
323 w.mu.Lock()
324 streams := w.streams
325 w.streams = nil
326 w.mu.Unlock()
327 for _, wgs := range streams {
328 if werr := wgs.close(); werr != nil {
329 err = werr
330 }
331 }
332 return err
333}
334
335func (w *watchGrpcStream) close() (err error) {
336 w.cancel()
337 <-w.donec
338 select {
339 case err = <-w.errc:
340 default:
341 }
342 return toErr(w.ctx, err)
343}
344
345func (w *watcher) closeStream(wgs *watchGrpcStream) {
346 w.mu.Lock()
347 close(wgs.donec)
348 wgs.cancel()
349 if w.streams != nil {
350 delete(w.streams, wgs.ctxKey)
351 }
352 w.mu.Unlock()
353}
354
355func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
356 if resp.WatchId == -1 {
357 // failed; no channel
358 close(ws.recvc)
359 return
360 }
361 ws.id = resp.WatchId
362 w.substreams[ws.id] = ws
363}
364
365func (w *watchGrpcStream) sendCloseSubstream(ws *watcherStream, resp *WatchResponse) {
366 select {
367 case ws.outc <- *resp:
368 case <-ws.initReq.ctx.Done():
369 case <-time.After(closeSendErrTimeout):
370 }
371 close(ws.outc)
372}
373
374func (w *watchGrpcStream) closeSubstream(ws *watcherStream) {
375 // send channel response in case stream was never established
376 select {
377 case ws.initReq.retc <- ws.outc:
378 default:
379 }
380 // close subscriber's channel
381 if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil {
382 go w.sendCloseSubstream(ws, &WatchResponse{closeErr: w.closeErr})
383 } else if ws.outc != nil {
384 close(ws.outc)
385 }
386 if ws.id != -1 {
387 delete(w.substreams, ws.id)
388 return
389 }
390 for i := range w.resuming {
391 if w.resuming[i] == ws {
392 w.resuming[i] = nil
393 return
394 }
395 }
396}
397
// run is the root of the goroutines for managing a watcher client.
//
// It opens the grpc watch stream and then loops, handling: new Watch()
// requests (reqc), responses from the server (respc), receive errors (errc),
// cancellation of the stream context, and substreams reporting that they are
// closing (closingc). On exit the deferred teardown records the close error,
// shuts down every remaining substream, and removes the stream from its owner.
func (w *watchGrpcStream) run() {
	var wc pb.Watch_WatchClient
	var closeErr error

	// substreams marked to close but goroutine still running; needed for
	// avoiding double-closing recvc on grpc stream teardown
	closing := make(map[*watcherStream]struct{})

	defer func() {
		// publish the error so Watch() and closeSubstream can report it
		w.closeErr = closeErr
		// shutdown substreams and resuming substreams
		for _, ws := range w.substreams {
			if _, ok := closing[ws]; !ok {
				close(ws.recvc)
				closing[ws] = struct{}{}
			}
		}
		for _, ws := range w.resuming {
			if _, ok := closing[ws]; ws != nil && !ok {
				close(ws.recvc)
				closing[ws] = struct{}{}
			}
		}
		w.joinSubstreams()
		// receive one closing notification per substream marked as closing
		for range closing {
			w.closeSubstream(<-w.closingc)
		}
		w.wg.Wait()
		w.owner.closeStream(w)
	}()

	// start a stream with the etcd grpc server
	if wc, closeErr = w.newWatchClient(); closeErr != nil {
		return
	}

	// cancelSet tracks watch ids for which a cancel request has already been
	// sent, so each unexpected id is only canceled once
	cancelSet := make(map[int64]struct{})

	for {
		select {
		// Watch() requested
		case wreq := <-w.reqc:
			outc := make(chan WatchResponse, 1)
			ws := &watcherStream{
				initReq: *wreq,
				id:      -1,
				outc:    outc,
				// unbuffered so resumes won't cause repeat events
				recvc: make(chan *WatchResponse),
			}

			ws.donec = make(chan struct{})
			w.wg.Add(1)
			go w.serveSubstream(ws, w.resumec)

			// queue up for watcher creation/resume
			w.resuming = append(w.resuming, ws)
			if len(w.resuming) == 1 {
				// head of resume queue, can register a new watcher
				// (the Send result is not checked here)
				wc.Send(ws.initReq.toPB())
			}
		// New events from the watch client
		case pbresp := <-w.respc:
			switch {
			case pbresp.Created:
				// response to head of queue creation
				if ws := w.resuming[0]; ws != nil {
					w.addSubstream(pbresp, ws)
					w.dispatchEvent(pbresp)
					w.resuming[0] = nil
				}
				// register the next queued watcher, if any
				if ws := w.nextResume(); ws != nil {
					wc.Send(ws.initReq.toPB())
				}
			case pbresp.Canceled && pbresp.CompactRevision == 0:
				delete(cancelSet, pbresp.WatchId)
				if ws, ok := w.substreams[pbresp.WatchId]; ok {
					// signal to stream goroutine to update closingc
					close(ws.recvc)
					closing[ws] = struct{}{}
				}
			default:
				// dispatch to appropriate watch stream
				if ok := w.dispatchEvent(pbresp); ok {
					break
				}
				// watch response on unexpected watch id; cancel id
				if _, ok := cancelSet[pbresp.WatchId]; ok {
					break
				}
				cancelSet[pbresp.WatchId] = struct{}{}
				cr := &pb.WatchRequest_CancelRequest{
					CancelRequest: &pb.WatchCancelRequest{
						WatchId: pbresp.WatchId,
					},
				}
				req := &pb.WatchRequest{RequestUnion: cr}
				wc.Send(req)
			}
		// watch client failed on Recv; spawn another if possible
		case err := <-w.errc:
			if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader {
				closeErr = err
				return
			}
			if wc, closeErr = w.newWatchClient(); closeErr != nil {
				return
			}
			if ws := w.nextResume(); ws != nil {
				wc.Send(ws.initReq.toPB())
			}
			// start with a fresh cancel set for the new grpc stream
			cancelSet = make(map[int64]struct{})
		case <-w.ctx.Done():
			return
		case ws := <-w.closingc:
			w.closeSubstream(ws)
			delete(closing, ws)
			if len(w.substreams)+len(w.resuming) == 0 {
				// no more watchers on this stream, shutdown
				return
			}
		}
	}
}
523
524// nextResume chooses the next resuming to register with the grpc stream. Abandoned
525// streams are marked as nil in the queue since the head must wait for its inflight registration.
526func (w *watchGrpcStream) nextResume() *watcherStream {
527 for len(w.resuming) != 0 {
528 if w.resuming[0] != nil {
529 return w.resuming[0]
530 }
531 w.resuming = w.resuming[1:len(w.resuming)]
532 }
533 return nil
534}
535
536// dispatchEvent sends a WatchResponse to the appropriate watcher stream
537func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
538 events := make([]*Event, len(pbresp.Events))
539 for i, ev := range pbresp.Events {
540 events[i] = (*Event)(ev)
541 }
542 wr := &WatchResponse{
543 Header: *pbresp.Header,
544 Events: events,
545 CompactRevision: pbresp.CompactRevision,
546 Created: pbresp.Created,
547 Canceled: pbresp.Canceled,
548 cancelReason: pbresp.CancelReason,
549 }
550 ws, ok := w.substreams[pbresp.WatchId]
551 if !ok {
552 return false
553 }
554 select {
555 case ws.recvc <- wr:
556 case <-ws.donec:
557 return false
558 }
559 return true
560}
561
562// serveWatchClient forwards messages from the grpc stream to run()
563func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
564 for {
565 resp, err := wc.Recv()
566 if err != nil {
567 select {
568 case w.errc <- err:
569 case <-w.donec:
570 }
571 return
572 }
573 select {
574 case w.respc <- resp:
575 case <-w.donec:
576 return
577 }
578 }
579}
580
// serveSubstream forwards watch responses from run() to the subscriber.
//
// Responses arriving on ws.recvc are appended to ws.buf and drained to
// ws.outc in order. The goroutine exits when recvc is closed, an error
// response has been delivered, the stream or subscriber context is done, or
// resumec is closed. On a resume exit the substream is kept alive for
// re-registration; on every other exit it is marked closing and reported on
// closingc.
func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{}) {
	if ws.closing {
		panic("created substream goroutine but substream is closing")
	}

	// nextRev is the minimum expected next revision
	nextRev := ws.initReq.rev
	resuming := false
	defer func() {
		if !resuming {
			ws.closing = true
		}
		// donec is closed before reporting on closingc so that anyone
		// waiting in joinSubstreams is released first
		close(ws.donec)
		if !resuming {
			w.closingc <- ws
		}
		w.wg.Done()
	}()

	emptyWr := &WatchResponse{}
	for {
		curWr := emptyWr
		outc := ws.outc

		if len(ws.buf) > 0 {
			curWr = ws.buf[0]
		} else {
			// nothing buffered: a nil channel disables the send case below
			outc = nil
		}
		select {
		case outc <- *curWr:
			// stop after delivering a response that carries an error
			if ws.buf[0].Err() != nil {
				return
			}
			ws.buf[0] = nil
			ws.buf = ws.buf[1:]
		case wr, ok := <-ws.recvc:
			if !ok {
				// shutdown from closeSubstream
				return
			}

			if wr.Created {
				if ws.initReq.retc != nil {
					ws.initReq.retc <- ws.outc
					// to prevent next write from taking the slot in buffered channel
					// and posting duplicate create events
					ws.initReq.retc = nil

					// send first creation event only if requested
					if ws.initReq.createdNotify {
						ws.outc <- *wr
					}
					// once the watch channel is returned, a current revision
					// watch must resume at the store revision. This is necessary
					// for the following case to work as expected:
					//	wch := m1.Watch("a")
					//	m2.Put("a", "b")
					//	<-wch
					// If the revision is only bound on the first observed event,
					// if wch is disconnected before the Put is issued, then reconnects
					// after it is committed, it'll miss the Put.
					if ws.initReq.rev == 0 {
						nextRev = wr.Header.Revision
					}
				}
			} else {
				// current progress of watch; <= store revision
				nextRev = wr.Header.Revision
			}

			if len(wr.Events) > 0 {
				nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
			}
			// remember where to restart from if this watch has to be resumed
			ws.initReq.rev = nextRev

			// created event is already sent above,
			// watcher should not post duplicate events
			if wr.Created {
				continue
			}

			// TODO pause channel if buffer gets too large
			ws.buf = append(ws.buf, wr)
		case <-w.ctx.Done():
			return
		case <-ws.initReq.ctx.Done():
			return
		case <-resumec:
			resuming = true
			return
		}
	}
}
677
// newWatchClient (re)establishes the grpc watch stream.
//
// It signals every substream goroutine to stop for resume, waits for them,
// moves all registered substreams onto the resume queue with their ids reset
// to -1, opens a new watch client while still honoring subscriber
// cancellations, restarts a goroutine for every non-closing substream, and
// finally starts serveWatchClient on the new stream. It returns the new
// client, or the (v3rpc-converted) error from opening it.
func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
	// mark all substreams as resuming
	close(w.resumec)
	w.resumec = make(chan struct{})
	w.joinSubstreams()
	for _, ws := range w.substreams {
		ws.id = -1
		w.resuming = append(w.resuming, ws)
	}
	// strip out nils, if any
	var resuming []*watcherStream
	for _, ws := range w.resuming {
		if ws != nil {
			resuming = append(resuming, ws)
		}
	}
	w.resuming = resuming
	w.substreams = make(map[int64]*watcherStream)

	// connect to grpc stream while accepting watcher cancelation
	stopc := make(chan struct{})
	donec := w.waitCancelSubstreams(stopc)
	wc, err := w.openWatchClient()
	close(stopc)
	// wait for the cancellation watchers to finish before touching substreams
	<-donec

	// serve all non-closing streams, even if there's a client error
	// so that the teardown path can shutdown the streams as expected.
	for _, ws := range w.resuming {
		if ws.closing {
			continue
		}
		ws.donec = make(chan struct{})
		w.wg.Add(1)
		go w.serveSubstream(ws, w.resumec)
	}

	if err != nil {
		return nil, v3rpc.Error(err)
	}

	// receive data from new grpc stream
	go w.serveWatchClient(wc)
	return wc, nil
}
723
// waitCancelSubstreams watches every resuming substream for subscriber
// cancellation until stopc is closed. A substream whose subscriber context
// finishes in that window is marked closing, has its outc closed and set to
// nil, and is reported on closingc from a separate goroutine. The returned
// channel is closed once all of the per-substream watchers have returned.
func (w *watchGrpcStream) waitCancelSubstreams(stopc <-chan struct{}) <-chan struct{} {
	var wg sync.WaitGroup
	wg.Add(len(w.resuming))
	donec := make(chan struct{})
	for i := range w.resuming {
		go func(ws *watcherStream) {
			defer wg.Done()
			if ws.closing {
				// already closing: only make sure outc is closed once the
				// subscriber context is done
				if ws.initReq.ctx.Err() != nil && ws.outc != nil {
					close(ws.outc)
					ws.outc = nil
				}
				return
			}
			select {
			case <-ws.initReq.ctx.Done():
				// closed ws will be removed from resuming
				ws.closing = true
				close(ws.outc)
				ws.outc = nil
				// report on closingc from another goroutine so this
				// watcher does not block on the send; tracked by w.wg
				w.wg.Add(1)
				go func() {
					defer w.wg.Done()
					w.closingc <- ws
				}()
			case <-stopc:
			}
		}(w.resuming[i])
	}
	go func() {
		defer close(donec)
		wg.Wait()
	}()
	return donec
}
759
760// joinSubstreams waits for all substream goroutines to complete.
761func (w *watchGrpcStream) joinSubstreams() {
762 for _, ws := range w.substreams {
763 <-ws.donec
764 }
765 for _, ws := range w.resuming {
766 if ws != nil {
767 <-ws.donec
768 }
769 }
770}
771
// maxBackoff caps the delay between retries in openWatchClient.
var maxBackoff = 100 * time.Millisecond
773
774// openWatchClient retries opening a watch client until success or halt.
775// manually retry in case "ws==nil && err==nil"
776// TODO: remove FailFast=false
777func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
778 backoff := time.Millisecond
779 for {
780 select {
781 case <-w.ctx.Done():
782 if err == nil {
783 return nil, w.ctx.Err()
784 }
785 return nil, err
786 default:
787 }
788 if ws, err = w.remote.Watch(w.ctx, w.callOpts...); ws != nil && err == nil {
789 break
790 }
791 if isHaltErr(w.ctx, err) {
792 return nil, v3rpc.Error(err)
793 }
794 if isUnavailableErr(w.ctx, err) {
795 // retry, but backoff
796 if backoff < maxBackoff {
797 // 25% backoff factor
798 backoff = backoff + backoff/4
799 if backoff > maxBackoff {
800 backoff = maxBackoff
801 }
802 }
803 time.Sleep(backoff)
804 }
805 }
806 return ws, nil
807}
808
809// toPB converts an internal watch request structure to its protobuf WatchRequest structure.
810func (wr *watchRequest) toPB() *pb.WatchRequest {
811 req := &pb.WatchCreateRequest{
812 StartRevision: wr.rev,
813 Key: []byte(wr.key),
814 RangeEnd: []byte(wr.end),
815 ProgressNotify: wr.progressNotify,
816 Filters: wr.filters,
817 PrevKv: wr.prevKV,
818 }
819 cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
820 return &pb.WatchRequest{RequestUnion: cr}
821}
822
823func streamKeyFromCtx(ctx context.Context) string {
824 if md, ok := metadata.FromOutgoingContext(ctx); ok {
825 return fmt.Sprintf("%+v", md)
826 }
827 return ""
828}