| // Copyright 2016 The etcd Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package clientv3 |
| |
| import ( |
| "context" |
| "errors" |
| "fmt" |
| "sync" |
| "time" |
| |
| v3rpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" |
| pb "github.com/coreos/etcd/etcdserver/etcdserverpb" |
| mvccpb "github.com/coreos/etcd/mvcc/mvccpb" |
| |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/metadata" |
| "google.golang.org/grpc/status" |
| ) |
| |
| const ( |
| EventTypeDelete = mvccpb.DELETE |
| EventTypePut = mvccpb.PUT |
| |
| closeSendErrTimeout = 250 * time.Millisecond |
| ) |
| |
// Event is a single watch event, defined over mvccpb.Event so that helper
// methods such as IsCreate and IsModify can be attached.
type Event mvccpb.Event
| |
// WatchChan is a receive-only channel of WatchResponse values delivered by a Watcher.
type WatchChan <-chan WatchResponse
| |
// Watcher provides watch APIs over etcd keys and key ranges.
type Watcher interface {
| // Watch watches on a key or prefix. The watched events will be returned |
| // through the returned channel. If revisions waiting to be sent over the |
| // watch are compacted, then the watch will be canceled by the server, the |
| // client will post a compacted error watch response, and the channel will close. |
| // If the context "ctx" is canceled or timed out, returned "WatchChan" is closed, |
| // and "WatchResponse" from this closed channel has zero events and nil "Err()". |
| // The context "ctx" MUST be canceled, as soon as watcher is no longer being used, |
| // to release the associated resources. |
| // |
	// If the context is "context.Background/TODO", returned "WatchChan" will
	// not be closed and will block until an event is triggered, except when the
	// server returns a non-recoverable error (e.g. ErrCompacted).
	// For example, when a context created with "WithRequireLeader" is passed and
	// the connected server has no leader (e.g. due to network partition), the
	// error "etcdserver: no leader" (ErrNoLeader) will be returned,
	// and then "WatchChan" is closed with a non-nil "Err()".
	// To prevent a watch stream from being stuck on a partitioned node,
	// make sure to wrap the context with "WithRequireLeader".
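	//
	// A typical subscriber loop might look like the following sketch (assuming
	// a Watcher "w" created by NewWatcher and a key "foo"; illustrative only,
	// not part of this package's code path):
	//
	//	ctx, cancel := context.WithCancel(WithRequireLeader(context.Background()))
	//	defer cancel()
	//	for wresp := range w.Watch(ctx, "foo", WithPrefix()) {
	//		if err := wresp.Err(); err != nil {
	//			// canceled by the server (e.g. compaction, no leader)
	//			break
	//		}
	//		for _, ev := range wresp.Events {
	//			fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
	//		}
	//	}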
| // |
| // Otherwise, as long as the context has not been canceled or timed out, |
| // watch will retry on other recoverable errors forever until reconnected. |
| // |
| // TODO: explicitly set context error in the last "WatchResponse" message and close channel? |
| // Currently, client contexts are overwritten with "valCtx" that never closes. |
| // TODO(v3.4): configure watch retry policy, limit maximum retry number |
| // (see https://github.com/etcd-io/etcd/issues/8980) |
| Watch(ctx context.Context, key string, opts ...OpOption) WatchChan |
| |
| // RequestProgress requests a progress notify response be sent in all watch channels. |
| RequestProgress(ctx context.Context) error |
| |
| // Close closes the watcher and cancels all watch requests. |
| Close() error |
| } |
| |
| type WatchResponse struct { |
| Header pb.ResponseHeader |
| Events []*Event |
| |
| // CompactRevision is the minimum revision the watcher may receive. |
| CompactRevision int64 |
| |
	// Canceled is used to indicate watch failure.
	// If the watch failed and the stream is about to close, the channel sends a
	// final response with Canceled set to true and a non-nil Err() before it is closed.
| Canceled bool |
| |
| // Created is used to indicate the creation of the watcher. |
| Created bool |
| |
| closeErr error |
| |
	// cancelReason is the reason the watch was canceled
| cancelReason string |
| } |
| |
// IsCreate returns true if the event indicates that the key is newly created.
| func (e *Event) IsCreate() bool { |
| return e.Type == EventTypePut && e.Kv.CreateRevision == e.Kv.ModRevision |
| } |
| |
// IsModify returns true if the event indicates that a new value was put on an existing key.
| func (e *Event) IsModify() bool { |
| return e.Type == EventTypePut && e.Kv.CreateRevision != e.Kv.ModRevision |
| } |
| |
// Err returns the error value if this WatchResponse holds an error.
| func (wr *WatchResponse) Err() error { |
| switch { |
| case wr.closeErr != nil: |
| return v3rpc.Error(wr.closeErr) |
| case wr.CompactRevision != 0: |
| return v3rpc.ErrCompacted |
| case wr.Canceled: |
| if len(wr.cancelReason) != 0 { |
| return v3rpc.Error(status.Error(codes.FailedPrecondition, wr.cancelReason)) |
| } |
| return v3rpc.ErrFutureRev |
| } |
| return nil |
| } |
| |
// IsProgressNotify returns true if the WatchResponse is a progress notification.
| func (wr *WatchResponse) IsProgressNotify() bool { |
| return len(wr.Events) == 0 && !wr.Canceled && !wr.Created && wr.CompactRevision == 0 && wr.Header.Revision != 0 |
| } |
| |
| // watcher implements the Watcher interface |
| type watcher struct { |
| remote pb.WatchClient |
| callOpts []grpc.CallOption |
| |
| // mu protects the grpc streams map |
| mu sync.RWMutex |
| |
| // streams holds all the active grpc streams keyed by ctx value. |
| streams map[string]*watchGrpcStream |
| } |
| |
| // watchGrpcStream tracks all watch resources attached to a single grpc stream. |
| type watchGrpcStream struct { |
| owner *watcher |
| remote pb.WatchClient |
| callOpts []grpc.CallOption |
| |
| // ctx controls internal remote.Watch requests |
| ctx context.Context |
	// ctxKey is the key used to look up this stream in the owner's streams map
| ctxKey string |
| cancel context.CancelFunc |
| |
| // substreams holds all active watchers on this grpc stream |
| substreams map[int64]*watcherStream |
| // resuming holds all resuming watchers on this grpc stream |
| resuming []*watcherStream |
| |
| // reqc sends a watch request from Watch() to the main goroutine |
| reqc chan watchStreamRequest |
| // respc receives data from the watch client |
| respc chan *pb.WatchResponse |
| // donec closes to broadcast shutdown |
| donec chan struct{} |
| // errc transmits errors from grpc Recv to the watch stream reconnect logic |
| errc chan error |
| // closingc gets the watcherStream of closing watchers |
| closingc chan *watcherStream |
| // wg is Done when all substream goroutines have exited |
| wg sync.WaitGroup |
| |
| // resumec closes to signal that all substreams should begin resuming |
| resumec chan struct{} |
| // closeErr is the error that closed the watch stream |
| closeErr error |
| } |
| |
| // watchStreamRequest is a union of the supported watch request operation types |
| type watchStreamRequest interface { |
| toPB() *pb.WatchRequest |
| } |
| |
| // watchRequest is issued by the subscriber to start a new watcher |
| type watchRequest struct { |
| ctx context.Context |
| key string |
| end string |
| rev int64 |
| |
| // send created notification event if this field is true |
| createdNotify bool |
| // progressNotify is for progress updates |
| progressNotify bool |
	// fragment, if true, splits watch events when the total response size
	// exceeds the "--max-request-bytes" flag value plus a 512-byte overhead;
	// fragmentation is disabled by default
| fragment bool |
| |
| // filters is the list of events to filter out |
| filters []pb.WatchCreateRequest_FilterType |
| // get the previous key-value pair before the event happens |
| prevKV bool |
| // retc receives a chan WatchResponse once the watcher is established |
| retc chan chan WatchResponse |
| } |
| |
| // progressRequest is issued by the subscriber to request watch progress |
| type progressRequest struct { |
| } |
| |
| // watcherStream represents a registered watcher |
| type watcherStream struct { |
	// initReq is the request that initiated this stream
| initReq watchRequest |
| |
| // outc publishes watch responses to subscriber |
| outc chan WatchResponse |
| // recvc buffers watch responses before publishing |
| recvc chan *WatchResponse |
| // donec closes when the watcherStream goroutine stops. |
| donec chan struct{} |
| // closing is set to true when stream should be scheduled to shutdown. |
| closing bool |
| // id is the registered watch id on the grpc stream |
| id int64 |
| |
| // buf holds all events received from etcd but not yet consumed by the client |
| buf []*WatchResponse |
| } |
| |
// NewWatcher returns a Watcher that uses the client's gRPC connection.
func NewWatcher(c *Client) Watcher {
| return NewWatchFromWatchClient(pb.NewWatchClient(c.conn), c) |
| } |
| |
// NewWatchFromWatchClient returns a Watcher backed by the given watch RPC client.
func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher {
| w := &watcher{ |
| remote: wc, |
| streams: make(map[string]*watchGrpcStream), |
| } |
| if c != nil { |
| w.callOpts = c.callOpts |
| } |
| return w |
| } |
| |
| // never closes |
| var valCtxCh = make(chan struct{}) |
| var zeroTime = time.Unix(0, 0) |
| |
| // ctx with only the values; never Done |
| type valCtx struct{ context.Context } |
| |
| func (vc *valCtx) Deadline() (time.Time, bool) { return zeroTime, false } |
| func (vc *valCtx) Done() <-chan struct{} { return valCtxCh } |
| func (vc *valCtx) Err() error { return nil } |
| |
// newWatcherGrpcStream creates a watchGrpcStream for the given context and
// starts its run goroutine.
func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
| ctx, cancel := context.WithCancel(&valCtx{inctx}) |
| wgs := &watchGrpcStream{ |
| owner: w, |
| remote: w.remote, |
| callOpts: w.callOpts, |
| ctx: ctx, |
| ctxKey: streamKeyFromCtx(inctx), |
| cancel: cancel, |
| substreams: make(map[int64]*watcherStream), |
| respc: make(chan *pb.WatchResponse), |
| reqc: make(chan watchStreamRequest), |
| donec: make(chan struct{}), |
| errc: make(chan error, 1), |
| closingc: make(chan *watcherStream), |
| resumec: make(chan struct{}), |
| } |
| go wgs.run() |
| return wgs |
| } |
| |
| // Watch posts a watch request to run() and waits for a new watcher channel |
| func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan { |
| ow := opWatch(key, opts...) |
| |
| var filters []pb.WatchCreateRequest_FilterType |
| if ow.filterPut { |
| filters = append(filters, pb.WatchCreateRequest_NOPUT) |
| } |
| if ow.filterDelete { |
| filters = append(filters, pb.WatchCreateRequest_NODELETE) |
| } |
| |
| wr := &watchRequest{ |
| ctx: ctx, |
| createdNotify: ow.createdNotify, |
| key: string(ow.key), |
| end: string(ow.end), |
| rev: ow.rev, |
| progressNotify: ow.progressNotify, |
| fragment: ow.fragment, |
| filters: filters, |
| prevKV: ow.prevKV, |
| retc: make(chan chan WatchResponse, 1), |
| } |
| |
| ok := false |
| ctxKey := streamKeyFromCtx(ctx) |
| |
| // find or allocate appropriate grpc watch stream |
| w.mu.Lock() |
| if w.streams == nil { |
| // closed |
| w.mu.Unlock() |
| ch := make(chan WatchResponse) |
| close(ch) |
| return ch |
| } |
| wgs := w.streams[ctxKey] |
| if wgs == nil { |
| wgs = w.newWatcherGrpcStream(ctx) |
| w.streams[ctxKey] = wgs |
| } |
| donec := wgs.donec |
| reqc := wgs.reqc |
| w.mu.Unlock() |
| |
	// closeCh is returned when the watch channel cannot be established;
	// it carries the close error, if any, and is then closed
| closeCh := make(chan WatchResponse, 1) |
| |
| // submit request |
| select { |
| case reqc <- wr: |
| ok = true |
| case <-wr.ctx.Done(): |
| case <-donec: |
| if wgs.closeErr != nil { |
| closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr} |
| break |
| } |
		// retry; the stream may have been dropped because it had no remaining watchers
| return w.Watch(ctx, key, opts...) |
| } |
| |
| // receive channel |
| if ok { |
| select { |
| case ret := <-wr.retc: |
| return ret |
| case <-ctx.Done(): |
| case <-donec: |
| if wgs.closeErr != nil { |
| closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr} |
| break |
| } |
			// retry; the stream may have been dropped because it had no remaining watchers
| return w.Watch(ctx, key, opts...) |
| } |
| } |
| |
| close(closeCh) |
| return closeCh |
| } |
| |
| func (w *watcher) Close() (err error) { |
| w.mu.Lock() |
| streams := w.streams |
| w.streams = nil |
| w.mu.Unlock() |
| for _, wgs := range streams { |
| if werr := wgs.close(); werr != nil { |
| err = werr |
| } |
| } |
| // Consider context.Canceled as a successful close |
| if err == context.Canceled { |
| err = nil |
| } |
| return err |
| } |
| |
| // RequestProgress requests a progress notify response be sent in all watch channels. |
| func (w *watcher) RequestProgress(ctx context.Context) (err error) { |
| ctxKey := streamKeyFromCtx(ctx) |
| |
| w.mu.Lock() |
| if w.streams == nil { |
| w.mu.Unlock() |
| return fmt.Errorf("no stream found for context") |
| } |
| wgs := w.streams[ctxKey] |
| if wgs == nil { |
| wgs = w.newWatcherGrpcStream(ctx) |
| w.streams[ctxKey] = wgs |
| } |
| donec := wgs.donec |
| reqc := wgs.reqc |
| w.mu.Unlock() |
| |
| pr := &progressRequest{} |
| |
| select { |
| case reqc <- pr: |
| return nil |
	case <-ctx.Done():
		return ctx.Err()
| case <-donec: |
| if wgs.closeErr != nil { |
| return wgs.closeErr |
| } |
		// retry; the stream may have been dropped because it had no remaining watchers
| return w.RequestProgress(ctx) |
| } |
| } |
| |
// close cancels the stream's context, waits for run() to finish, and returns
// any error reported by the watch client.
func (w *watchGrpcStream) close() (err error) {
| w.cancel() |
| <-w.donec |
| select { |
| case err = <-w.errc: |
| default: |
| } |
| return toErr(w.ctx, err) |
| } |
| |
// closeStream marks the grpc stream as done, cancels its context, and removes
// it from the owner's streams map.
func (w *watcher) closeStream(wgs *watchGrpcStream) {
| w.mu.Lock() |
| close(wgs.donec) |
| wgs.cancel() |
| if w.streams != nil { |
| delete(w.streams, wgs.ctxKey) |
| } |
| w.mu.Unlock() |
| } |
| |
// addSubstream registers a created substream under its server-assigned watch
// ID, or records the cancel reason and closes the substream if creation failed.
func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
| // check watch ID for backward compatibility (<= v3.3) |
| if resp.WatchId == -1 || (resp.Canceled && resp.CancelReason != "") { |
| w.closeErr = v3rpc.Error(errors.New(resp.CancelReason)) |
| // failed; no channel |
| close(ws.recvc) |
| return |
| } |
| ws.id = resp.WatchId |
| w.substreams[ws.id] = ws |
| } |
| |
// sendCloseSubstream delivers a final response to the subscriber, bounded by a
// timeout, and then closes the subscriber's channel.
func (w *watchGrpcStream) sendCloseSubstream(ws *watcherStream, resp *WatchResponse) {
| select { |
| case ws.outc <- *resp: |
| case <-ws.initReq.ctx.Done(): |
| case <-time.After(closeSendErrTimeout): |
| } |
| close(ws.outc) |
| } |
| |
| func (w *watchGrpcStream) closeSubstream(ws *watcherStream) { |
| // send channel response in case stream was never established |
| select { |
| case ws.initReq.retc <- ws.outc: |
| default: |
| } |
| // close subscriber's channel |
| if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil { |
| go w.sendCloseSubstream(ws, &WatchResponse{Canceled: true, closeErr: w.closeErr}) |
| } else if ws.outc != nil { |
| close(ws.outc) |
| } |
| if ws.id != -1 { |
| delete(w.substreams, ws.id) |
| return |
| } |
| for i := range w.resuming { |
| if w.resuming[i] == ws { |
| w.resuming[i] = nil |
| return |
| } |
| } |
| } |
| |
| // run is the root of the goroutines for managing a watcher client |
| func (w *watchGrpcStream) run() { |
| var wc pb.Watch_WatchClient |
| var closeErr error |
| |
| // substreams marked to close but goroutine still running; needed for |
| // avoiding double-closing recvc on grpc stream teardown |
| closing := make(map[*watcherStream]struct{}) |
| |
| defer func() { |
| w.closeErr = closeErr |
| // shutdown substreams and resuming substreams |
| for _, ws := range w.substreams { |
| if _, ok := closing[ws]; !ok { |
| close(ws.recvc) |
| closing[ws] = struct{}{} |
| } |
| } |
| for _, ws := range w.resuming { |
| if _, ok := closing[ws]; ws != nil && !ok { |
| close(ws.recvc) |
| closing[ws] = struct{}{} |
| } |
| } |
| w.joinSubstreams() |
| for range closing { |
| w.closeSubstream(<-w.closingc) |
| } |
| w.wg.Wait() |
| w.owner.closeStream(w) |
| }() |
| |
| // start a stream with the etcd grpc server |
| if wc, closeErr = w.newWatchClient(); closeErr != nil { |
| return |
| } |
| |
| cancelSet := make(map[int64]struct{}) |
| |
| var cur *pb.WatchResponse |
| for { |
| select { |
| // Watch() requested |
| case req := <-w.reqc: |
| switch wreq := req.(type) { |
| case *watchRequest: |
| outc := make(chan WatchResponse, 1) |
| // TODO: pass custom watch ID? |
| ws := &watcherStream{ |
| initReq: *wreq, |
| id: -1, |
| outc: outc, |
| // unbuffered so resumes won't cause repeat events |
| recvc: make(chan *WatchResponse), |
| } |
| |
| ws.donec = make(chan struct{}) |
| w.wg.Add(1) |
| go w.serveSubstream(ws, w.resumec) |
| |
| // queue up for watcher creation/resume |
| w.resuming = append(w.resuming, ws) |
| if len(w.resuming) == 1 { |
| // head of resume queue, can register a new watcher |
| wc.Send(ws.initReq.toPB()) |
| } |
| case *progressRequest: |
| wc.Send(wreq.toPB()) |
| } |
| |
| // new events from the watch client |
| case pbresp := <-w.respc: |
| if cur == nil || pbresp.Created || pbresp.Canceled { |
| cur = pbresp |
			} else if cur.WatchId == pbresp.WatchId {
| // merge new events |
| cur.Events = append(cur.Events, pbresp.Events...) |
| // update "Fragment" field; last response with "Fragment" == false |
| cur.Fragment = pbresp.Fragment |
| } |
| |
| switch { |
| case pbresp.Created: |
| // response to head of queue creation |
| if ws := w.resuming[0]; ws != nil { |
| w.addSubstream(pbresp, ws) |
| w.dispatchEvent(pbresp) |
| w.resuming[0] = nil |
| } |
| |
| if ws := w.nextResume(); ws != nil { |
| wc.Send(ws.initReq.toPB()) |
| } |
| |
| // reset for next iteration |
| cur = nil |
| |
| case pbresp.Canceled && pbresp.CompactRevision == 0: |
| delete(cancelSet, pbresp.WatchId) |
| if ws, ok := w.substreams[pbresp.WatchId]; ok { |
| // signal to stream goroutine to update closingc |
| close(ws.recvc) |
| closing[ws] = struct{}{} |
| } |
| |
| // reset for next iteration |
| cur = nil |
| |
| case cur.Fragment: |
				// watch response events are still fragmented;
				// continue to fetch the next fragment
| continue |
| |
| default: |
| // dispatch to appropriate watch stream |
| ok := w.dispatchEvent(cur) |
| |
| // reset for next iteration |
| cur = nil |
| |
| if ok { |
| break |
| } |
| |
| // watch response on unexpected watch id; cancel id |
| if _, ok := cancelSet[pbresp.WatchId]; ok { |
| break |
| } |
| |
| cancelSet[pbresp.WatchId] = struct{}{} |
| cr := &pb.WatchRequest_CancelRequest{ |
| CancelRequest: &pb.WatchCancelRequest{ |
| WatchId: pbresp.WatchId, |
| }, |
| } |
| req := &pb.WatchRequest{RequestUnion: cr} |
| wc.Send(req) |
| } |
| |
| // watch client failed on Recv; spawn another if possible |
| case err := <-w.errc: |
| if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader { |
| closeErr = err |
| return |
| } |
| if wc, closeErr = w.newWatchClient(); closeErr != nil { |
| return |
| } |
| if ws := w.nextResume(); ws != nil { |
| wc.Send(ws.initReq.toPB()) |
| } |
| cancelSet = make(map[int64]struct{}) |
| |
| case <-w.ctx.Done(): |
| return |
| |
| case ws := <-w.closingc: |
| w.closeSubstream(ws) |
| delete(closing, ws) |
| // no more watchers on this stream, shutdown |
| if len(w.substreams)+len(w.resuming) == 0 { |
| return |
| } |
| } |
| } |
| } |
| |
// nextResume chooses the next resuming watcher to register with the grpc stream.
// Abandoned streams are marked as nil in the queue since the head of the queue
// must wait for its inflight registration.
| func (w *watchGrpcStream) nextResume() *watcherStream { |
| for len(w.resuming) != 0 { |
| if w.resuming[0] != nil { |
| return w.resuming[0] |
| } |
		w.resuming = w.resuming[1:]
| } |
| return nil |
| } |
| |
| // dispatchEvent sends a WatchResponse to the appropriate watcher stream |
| func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool { |
| events := make([]*Event, len(pbresp.Events)) |
| for i, ev := range pbresp.Events { |
| events[i] = (*Event)(ev) |
| } |
| // TODO: return watch ID? |
| wr := &WatchResponse{ |
| Header: *pbresp.Header, |
| Events: events, |
| CompactRevision: pbresp.CompactRevision, |
| Created: pbresp.Created, |
| Canceled: pbresp.Canceled, |
| cancelReason: pbresp.CancelReason, |
| } |
| |
| // watch IDs are zero indexed, so request notify watch responses are assigned a watch ID of -1 to |
| // indicate they should be broadcast. |
| if wr.IsProgressNotify() && pbresp.WatchId == -1 { |
| return w.broadcastResponse(wr) |
| } |
| |
	return w.unicastResponse(wr, pbresp.WatchId)
}
| |
// broadcastResponse sends a watch response to all watch substreams.
| func (w *watchGrpcStream) broadcastResponse(wr *WatchResponse) bool { |
| for _, ws := range w.substreams { |
| select { |
| case ws.recvc <- wr: |
| case <-ws.donec: |
| } |
| } |
| return true |
| } |
| |
| // unicastResponse sends a watch response to a specific watch substream. |
| func (w *watchGrpcStream) unicastResponse(wr *WatchResponse, watchId int64) bool { |
| ws, ok := w.substreams[watchId] |
| if !ok { |
| return false |
| } |
| select { |
| case ws.recvc <- wr: |
| case <-ws.donec: |
| return false |
| } |
| return true |
| } |
| |
| // serveWatchClient forwards messages from the grpc stream to run() |
| func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) { |
| for { |
| resp, err := wc.Recv() |
| if err != nil { |
| select { |
| case w.errc <- err: |
| case <-w.donec: |
| } |
| return |
| } |
| select { |
| case w.respc <- resp: |
| case <-w.donec: |
| return |
| } |
| } |
| } |
| |
| // serveSubstream forwards watch responses from run() to the subscriber |
| func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{}) { |
| if ws.closing { |
| panic("created substream goroutine but substream is closing") |
| } |
| |
| // nextRev is the minimum expected next revision |
| nextRev := ws.initReq.rev |
| resuming := false |
| defer func() { |
| if !resuming { |
| ws.closing = true |
| } |
| close(ws.donec) |
| if !resuming { |
| w.closingc <- ws |
| } |
| w.wg.Done() |
| }() |
| |
| emptyWr := &WatchResponse{} |
| for { |
| curWr := emptyWr |
| outc := ws.outc |
| |
| if len(ws.buf) > 0 { |
| curWr = ws.buf[0] |
| } else { |
| outc = nil |
| } |
| select { |
| case outc <- *curWr: |
| if ws.buf[0].Err() != nil { |
| return |
| } |
| ws.buf[0] = nil |
| ws.buf = ws.buf[1:] |
| case wr, ok := <-ws.recvc: |
| if !ok { |
| // shutdown from closeSubstream |
| return |
| } |
| |
| if wr.Created { |
| if ws.initReq.retc != nil { |
| ws.initReq.retc <- ws.outc |
| // to prevent next write from taking the slot in buffered channel |
| // and posting duplicate create events |
| ws.initReq.retc = nil |
| |
| // send first creation event only if requested |
| if ws.initReq.createdNotify { |
| ws.outc <- *wr |
| } |
| // once the watch channel is returned, a current revision |
| // watch must resume at the store revision. This is necessary |
| // for the following case to work as expected: |
| // wch := m1.Watch("a") |
| // m2.Put("a", "b") |
| // <-wch |
| // If the revision is only bound on the first observed event, |
| // if wch is disconnected before the Put is issued, then reconnects |
| // after it is committed, it'll miss the Put. |
| if ws.initReq.rev == 0 { |
| nextRev = wr.Header.Revision |
| } |
| } |
| } else { |
| // current progress of watch; <= store revision |
| nextRev = wr.Header.Revision |
| } |
| |
| if len(wr.Events) > 0 { |
| nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1 |
| } |
| ws.initReq.rev = nextRev |
| |
| // created event is already sent above, |
| // watcher should not post duplicate events |
| if wr.Created { |
| continue |
| } |
| |
| // TODO pause channel if buffer gets too large |
| ws.buf = append(ws.buf, wr) |
| case <-w.ctx.Done(): |
| return |
| case <-ws.initReq.ctx.Done(): |
| return |
| case <-resumec: |
| resuming = true |
| return |
| } |
| } |
| // lazily send cancel message if events on missing id |
| } |
| |
| func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) { |
| // mark all substreams as resuming |
| close(w.resumec) |
| w.resumec = make(chan struct{}) |
| w.joinSubstreams() |
| for _, ws := range w.substreams { |
| ws.id = -1 |
| w.resuming = append(w.resuming, ws) |
| } |
| // strip out nils, if any |
| var resuming []*watcherStream |
| for _, ws := range w.resuming { |
| if ws != nil { |
| resuming = append(resuming, ws) |
| } |
| } |
| w.resuming = resuming |
| w.substreams = make(map[int64]*watcherStream) |
| |
| // connect to grpc stream while accepting watcher cancelation |
| stopc := make(chan struct{}) |
| donec := w.waitCancelSubstreams(stopc) |
| wc, err := w.openWatchClient() |
| close(stopc) |
| <-donec |
| |
	// serve all non-closing streams, even if there's a client error,
	// so that the teardown path can shut down the streams as expected.
| for _, ws := range w.resuming { |
| if ws.closing { |
| continue |
| } |
| ws.donec = make(chan struct{}) |
| w.wg.Add(1) |
| go w.serveSubstream(ws, w.resumec) |
| } |
| |
| if err != nil { |
| return nil, v3rpc.Error(err) |
| } |
| |
| // receive data from new grpc stream |
| go w.serveWatchClient(wc) |
| return wc, nil |
| } |
| |
| func (w *watchGrpcStream) waitCancelSubstreams(stopc <-chan struct{}) <-chan struct{} { |
| var wg sync.WaitGroup |
| wg.Add(len(w.resuming)) |
| donec := make(chan struct{}) |
| for i := range w.resuming { |
| go func(ws *watcherStream) { |
| defer wg.Done() |
| if ws.closing { |
| if ws.initReq.ctx.Err() != nil && ws.outc != nil { |
| close(ws.outc) |
| ws.outc = nil |
| } |
| return |
| } |
| select { |
| case <-ws.initReq.ctx.Done(): |
| // closed ws will be removed from resuming |
| ws.closing = true |
| close(ws.outc) |
| ws.outc = nil |
| w.wg.Add(1) |
| go func() { |
| defer w.wg.Done() |
| w.closingc <- ws |
| }() |
| case <-stopc: |
| } |
| }(w.resuming[i]) |
| } |
| go func() { |
| defer close(donec) |
| wg.Wait() |
| }() |
| return donec |
| } |
| |
| // joinSubstreams waits for all substream goroutines to complete. |
| func (w *watchGrpcStream) joinSubstreams() { |
| for _, ws := range w.substreams { |
| <-ws.donec |
| } |
| for _, ws := range w.resuming { |
| if ws != nil { |
| <-ws.donec |
| } |
| } |
| } |
| |
| var maxBackoff = 100 * time.Millisecond |
| |
// openWatchClient retries opening a watch client until success or halt.
// It retries manually in case "ws == nil && err == nil".
// TODO: remove FailFast=false
| func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) { |
| backoff := time.Millisecond |
| for { |
| select { |
| case <-w.ctx.Done(): |
| if err == nil { |
| return nil, w.ctx.Err() |
| } |
| return nil, err |
| default: |
| } |
| if ws, err = w.remote.Watch(w.ctx, w.callOpts...); ws != nil && err == nil { |
| break |
| } |
| if isHaltErr(w.ctx, err) { |
| return nil, v3rpc.Error(err) |
| } |
| if isUnavailableErr(w.ctx, err) { |
| // retry, but backoff |
| if backoff < maxBackoff { |
| // 25% backoff factor |
| backoff = backoff + backoff/4 |
| if backoff > maxBackoff { |
| backoff = maxBackoff |
| } |
| } |
| time.Sleep(backoff) |
| } |
| } |
| return ws, nil |
| } |
| |
| // toPB converts an internal watch request structure to its protobuf WatchRequest structure. |
| func (wr *watchRequest) toPB() *pb.WatchRequest { |
| req := &pb.WatchCreateRequest{ |
| StartRevision: wr.rev, |
| Key: []byte(wr.key), |
| RangeEnd: []byte(wr.end), |
| ProgressNotify: wr.progressNotify, |
| Filters: wr.filters, |
| PrevKv: wr.prevKV, |
| Fragment: wr.fragment, |
| } |
| cr := &pb.WatchRequest_CreateRequest{CreateRequest: req} |
| return &pb.WatchRequest{RequestUnion: cr} |
| } |
| |
| // toPB converts an internal progress request structure to its protobuf WatchRequest structure. |
| func (pr *progressRequest) toPB() *pb.WatchRequest { |
| req := &pb.WatchProgressRequest{} |
| cr := &pb.WatchRequest_ProgressRequest{ProgressRequest: req} |
| return &pb.WatchRequest{RequestUnion: cr} |
| } |
| |
// streamKeyFromCtx derives the stream lookup key from the outgoing gRPC
// metadata attached to ctx; Watch calls whose contexts carry identical
// metadata share a single watch grpc stream.
func streamKeyFromCtx(ctx context.Context) string {
| if md, ok := metadata.FromOutgoingContext(ctx); ok { |
| return fmt.Sprintf("%+v", md) |
| } |
| return "" |
| } |