VOL-1497: Add more control to kv/memory access
- Added a kv locking mechanism (etcd only; usage sketched below)
- (watch) control path access whenever possible
- (watch) use a transaction for updates and merge with memory
- cleaned up vendoring
- misc changes to fix issues found along the way
Amendments:
- Restored a copyright header that had been removed from an auto-generated file
- Changed default locking to false for the KV list operation
- Updated the backend API to allow passing a locking parameter
Change-Id: Ie1a55d3ca8b9d92ae71a85ce42bb22fcf1419e2c
diff --git a/vendor/go.etcd.io/etcd/clientv3/watch.go b/vendor/go.etcd.io/etcd/clientv3/watch.go
index d763385..8ec58bb 100644
--- a/vendor/go.etcd.io/etcd/clientv3/watch.go
+++ b/vendor/go.etcd.io/etcd/clientv3/watch.go
@@ -16,13 +16,14 @@
import (
"context"
+ "errors"
"fmt"
"sync"
"time"
- v3rpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
- pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
- mvccpb "github.com/coreos/etcd/mvcc/mvccpb"
+ v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
+ pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
+ mvccpb "go.etcd.io/etcd/mvcc/mvccpb"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@@ -46,8 +47,33 @@
// through the returned channel. If revisions waiting to be sent over the
// watch are compacted, then the watch will be canceled by the server, the
// client will post a compacted error watch response, and the channel will close.
+ // If the context "ctx" is canceled or timed out, the returned "WatchChan" is closed,
+ // and the "WatchResponse" from this closed channel has zero events and a nil "Err()".
+ // The context "ctx" MUST be canceled as soon as the watcher is no longer being used,
+ // in order to release the associated resources.
+ //
+ // If the context is "context.Background/TODO", the returned "WatchChan" will
+ // not be closed and will block until an event is triggered, except when the server
+ // returns a non-recoverable error (e.g. ErrCompacted).
+ // For example, when a context wrapped with "WithRequireLeader" is passed and the
+ // connected server has no leader (e.g. due to a network partition), the error
+ // "etcdserver: no leader" (ErrNoLeader) will be returned, and then "WatchChan"
+ // is closed with a non-nil "Err()".
+ // In order to prevent a watch stream from getting stuck on a partitioned node,
+ // make sure to wrap the context with "WithRequireLeader".
+ //
+ // Otherwise, as long as the context has not been canceled or timed out,
+ // watch will retry on other recoverable errors forever until reconnected.
+ //
+ // TODO: explicitly set context error in the last "WatchResponse" message and close channel?
+ // Currently, client contexts are overwritten with "valCtx" that never closes.
+ // TODO(v3.4): configure watch retry policy, limit maximum retry number
+ // (see https://github.com/etcd-io/etcd/issues/8980)
Watch(ctx context.Context, key string, opts ...OpOption) WatchChan
+ // RequestProgress requests a progress notify response be sent in all watch channels.
+ RequestProgress(ctx context.Context) error
+
// Close closes the watcher and cancels all watch requests.
Close() error
}
@@ -134,7 +160,7 @@
resuming []*watcherStream
// reqc sends a watch request from Watch() to the main goroutine
- reqc chan *watchRequest
+ reqc chan watchStreamRequest
// respc receives data from the watch client
respc chan *pb.WatchResponse
// donec closes to broadcast shutdown
@@ -152,16 +178,27 @@
closeErr error
}
+// watchStreamRequest is a union of the supported watch request operation types
+type watchStreamRequest interface {
+ toPB() *pb.WatchRequest
+}
+
// watchRequest is issued by the subscriber to start a new watcher
type watchRequest struct {
ctx context.Context
key string
end string
rev int64
+
// send created notification event if this field is true
createdNotify bool
// progressNotify is for progress updates
progressNotify bool
+ // fragment is disabled by default; if true, watch events are split whenever
+ // their total size exceeds the "--max-request-bytes" flag value plus a
+ // 512-byte overhead
+ fragment bool
+
// filters is the list of events to filter out
filters []pb.WatchCreateRequest_FilterType
// get the previous key-value pair before the event happens
@@ -170,6 +207,10 @@
retc chan chan WatchResponse
}
+// progressRequest is issued by the subscriber to request watch progress
+type progressRequest struct {
+}
+
// watcherStream represents a registered watcher
type watcherStream struct {
// initReq is the request that initiated this request
@@ -227,7 +268,7 @@
cancel: cancel,
substreams: make(map[int64]*watcherStream),
respc: make(chan *pb.WatchResponse),
- reqc: make(chan *watchRequest),
+ reqc: make(chan watchStreamRequest),
donec: make(chan struct{}),
errc: make(chan error, 1),
closingc: make(chan *watcherStream),
@@ -256,6 +297,7 @@
end: string(ow.end),
rev: ow.rev,
progressNotify: ow.progressNotify,
+ fragment: ow.fragment,
filters: filters,
prevKV: ow.prevKV,
retc: make(chan chan WatchResponse, 1),
@@ -292,7 +334,7 @@
case <-wr.ctx.Done():
case <-donec:
if wgs.closeErr != nil {
- closeCh <- WatchResponse{closeErr: wgs.closeErr}
+ closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
break
}
// retry; may have dropped stream from no ctxs
@@ -307,7 +349,7 @@
case <-ctx.Done():
case <-donec:
if wgs.closeErr != nil {
- closeCh <- WatchResponse{closeErr: wgs.closeErr}
+ closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
break
}
// retry; may have dropped stream from no ctxs
@@ -332,6 +374,42 @@
return err
}
+// RequestProgress requests a progress notify response be sent in all watch channels.
+func (w *watcher) RequestProgress(ctx context.Context) (err error) {
+ ctxKey := streamKeyFromCtx(ctx)
+
+ w.mu.Lock()
+ if w.streams == nil {
+ w.mu.Unlock()
+ return fmt.Errorf("no stream found for context")
+ }
+ wgs := w.streams[ctxKey]
+ if wgs == nil {
+ wgs = w.newWatcherGrpcStream(ctx)
+ w.streams[ctxKey] = wgs
+ }
+ donec := wgs.donec
+ reqc := wgs.reqc
+ w.mu.Unlock()
+
+ pr := &progressRequest{}
+
+ select {
+ case reqc <- pr:
+ return nil
+ case <-ctx.Done():
+ if err == nil {
+ return ctx.Err()
+ }
+ return err
+ case <-donec:
+ if wgs.closeErr != nil {
+ return wgs.closeErr
+ }
+ // retry; may have dropped stream from no ctxs
+ return w.RequestProgress(ctx)
+ }
+}
+
func (w *watchGrpcStream) close() (err error) {
w.cancel()
<-w.donec
@@ -353,7 +431,9 @@
}
func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
- if resp.WatchId == -1 {
+ // check watch ID for backward compatibility (<= v3.3)
+ if resp.WatchId == -1 || (resp.Canceled && resp.CancelReason != "") {
+ w.closeErr = v3rpc.Error(errors.New(resp.CancelReason))
// failed; no channel
close(ws.recvc)
return
@@ -379,7 +459,7 @@
}
// close subscriber's channel
if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil {
- go w.sendCloseSubstream(ws, &WatchResponse{closeErr: w.closeErr})
+ go w.sendCloseSubstream(ws, &WatchResponse{Canceled: true, closeErr: w.closeErr})
} else if ws.outc != nil {
close(ws.outc)
}
@@ -434,31 +514,48 @@
cancelSet := make(map[int64]struct{})
+ var cur *pb.WatchResponse
for {
select {
// Watch() requested
- case wreq := <-w.reqc:
- outc := make(chan WatchResponse, 1)
- ws := &watcherStream{
- initReq: *wreq,
- id: -1,
- outc: outc,
- // unbuffered so resumes won't cause repeat events
- recvc: make(chan *WatchResponse),
+ case req := <-w.reqc:
+ switch wreq := req.(type) {
+ case *watchRequest:
+ outc := make(chan WatchResponse, 1)
+ // TODO: pass custom watch ID?
+ ws := &watcherStream{
+ initReq: *wreq,
+ id: -1,
+ outc: outc,
+ // unbuffered so resumes won't cause repeat events
+ recvc: make(chan *WatchResponse),
+ }
+
+ ws.donec = make(chan struct{})
+ w.wg.Add(1)
+ go w.serveSubstream(ws, w.resumec)
+
+ // queue up for watcher creation/resume
+ w.resuming = append(w.resuming, ws)
+ if len(w.resuming) == 1 {
+ // head of resume queue, can register a new watcher
+ wc.Send(ws.initReq.toPB())
+ }
+ case *progressRequest:
+ wc.Send(wreq.toPB())
}
- ws.donec = make(chan struct{})
- w.wg.Add(1)
- go w.serveSubstream(ws, w.resumec)
-
- // queue up for watcher creation/resume
- w.resuming = append(w.resuming, ws)
- if len(w.resuming) == 1 {
- // head of resume queue, can register a new watcher
- wc.Send(ws.initReq.toPB())
- }
- // New events from the watch client
+ // new events from the watch client
case pbresp := <-w.respc:
+ if cur == nil || pbresp.Created || pbresp.Canceled {
+ cur = pbresp
+ } else if cur != nil && cur.WatchId == pbresp.WatchId {
+ // merge new events
+ cur.Events = append(cur.Events, pbresp.Events...)
+ // update "Fragment" field; last response with "Fragment" == false
+ cur.Fragment = pbresp.Fragment
+ }
+
switch {
case pbresp.Created:
// response to head of queue creation
@@ -467,9 +564,14 @@
w.dispatchEvent(pbresp)
w.resuming[0] = nil
}
+
if ws := w.nextResume(); ws != nil {
wc.Send(ws.initReq.toPB())
}
+
+ // reset for next iteration
+ cur = nil
+
case pbresp.Canceled && pbresp.CompactRevision == 0:
delete(cancelSet, pbresp.WatchId)
if ws, ok := w.substreams[pbresp.WatchId]; ok {
@@ -477,15 +579,31 @@
close(ws.recvc)
closing[ws] = struct{}{}
}
+
+ // reset for next iteration
+ cur = nil
+
+ case cur.Fragment:
+ // watch response events are still fragmented
+ // continue to fetch next fragmented event arrival
+ continue
+
default:
// dispatch to appropriate watch stream
- if ok := w.dispatchEvent(pbresp); ok {
+ ok := w.dispatchEvent(cur)
+
+ // reset for next iteration
+ cur = nil
+
+ if ok {
break
}
+
// watch response on unexpected watch id; cancel id
if _, ok := cancelSet[pbresp.WatchId]; ok {
break
}
+
cancelSet[pbresp.WatchId] = struct{}{}
cr := &pb.WatchRequest_CancelRequest{
CancelRequest: &pb.WatchCancelRequest{
@@ -495,6 +613,7 @@
req := &pb.WatchRequest{RequestUnion: cr}
wc.Send(req)
}
+
// watch client failed on Recv; spawn another if possible
case err := <-w.errc:
if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader {
@@ -508,13 +627,15 @@
wc.Send(ws.initReq.toPB())
}
cancelSet = make(map[int64]struct{})
+
case <-w.ctx.Done():
return
+
case ws := <-w.closingc:
w.closeSubstream(ws)
delete(closing, ws)
+ // no more watchers on this stream, shutdown
if len(w.substreams)+len(w.resuming) == 0 {
- // no more watchers on this stream, shutdown
return
}
}
@@ -539,6 +660,7 @@
for i, ev := range pbresp.Events {
events[i] = (*Event)(ev)
}
+ // TODO: return watch ID?
wr := &WatchResponse{
Header: *pbresp.Header,
Events: events,
@@ -547,7 +669,31 @@
Canceled: pbresp.Canceled,
cancelReason: pbresp.CancelReason,
}
- ws, ok := w.substreams[pbresp.WatchId]
+
+ // watch IDs are zero indexed, so request notify watch responses are assigned a watch ID of -1 to
+ // indicate they should be broadcast.
+ if wr.IsProgressNotify() && pbresp.WatchId == -1 {
+ return w.broadcastResponse(wr)
+ }
+
+ return w.unicastResponse(wr, pbresp.WatchId)
+}
+
+// broadcastResponse sends a watch response to all watch substreams.
+func (w *watchGrpcStream) broadcastResponse(wr *WatchResponse) bool {
+ for _, ws := range w.substreams {
+ select {
+ case ws.recvc <- wr:
+ case <-ws.donec:
+ }
+ }
+ return true
+}
+
+// unicastResponse sends a watch response to a specific watch substream.
+func (w *watchGrpcStream) unicastResponse(wr *WatchResponse, watchId int64) bool {
+ ws, ok := w.substreams[watchId]
if !ok {
return false
}
@@ -815,11 +961,19 @@
ProgressNotify: wr.progressNotify,
Filters: wr.filters,
PrevKv: wr.prevKV,
+ Fragment: wr.fragment,
}
cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
return &pb.WatchRequest{RequestUnion: cr}
}
+// toPB converts an internal progress request structure to its protobuf WatchRequest structure.
+func (pr *progressRequest) toPB() *pb.WatchRequest {
+ req := &pb.WatchProgressRequest{}
+ cr := &pb.WatchRequest_ProgressRequest{ProgressRequest: req}
+ return &pb.WatchRequest{RequestUnion: cr}
+}
+
func streamKeyFromCtx(ctx context.Context) string {
if md, ok := metadata.FromOutgoingContext(ctx); ok {
return fmt.Sprintf("%+v", md)