VOL-1497: Add more control to kv/memory access

- Added a KV locking mechanism (etcd only)
- (watch) Control path access whenever possible
- (watch) Use a transaction for updates and merge the result with memory (see the sketch below)
- Cleaned up vendoring
- Misc changes to fix exceptions found along the way
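
A minimal sketch of the lock-then-transact pattern this introduces, assuming
hypothetical names (updateWithLock, the local cache map) around the real
go.etcd.io/etcd/clientv3 and clientv3/concurrency APIs:

    package kvsketch

    import (
        "context"

        "go.etcd.io/etcd/clientv3"
        "go.etcd.io/etcd/clientv3/concurrency"
    )

    // updateWithLock serializes writers on an etcd lock, applies the update in
    // a transaction, and merges the committed value into the in-memory copy.
    func updateWithLock(ctx context.Context, cli *clientv3.Client,
        cache map[string]string, key, value string) error {
        // the session's lease keeps the lock alive while we hold it
        sess, err := concurrency.NewSession(cli)
        if err != nil {
            return err
        }
        defer sess.Close()

        mu := concurrency.NewMutex(sess, key+"/lock")
        if err := mu.Lock(ctx); err != nil {
            return err
        }
        defer mu.Unlock(ctx)

        // commit in a single transaction so readers never see a partial update
        if _, err := cli.Txn(ctx).Then(clientv3.OpPut(key, value)).Commit(); err != nil {
            return err
        }
        cache[key] = value // merge with memory
        return nil
    }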

Amendments:

- Restored the copyright header that had been removed from an auto-generated file
- Changed the default locking to false for the KV list operation
- Updated the backend API to allow passing a locking parameter (see the sketch below)
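
A rough sketch of the amended backend call; Backend, List, and the lock
helpers are hypothetical stand-ins for the real backend API (locking
defaults to false so existing callers keep the old behaviour):

    type Backend struct {
        client *clientv3.Client // established etcd client
    }

    // List returns all KV pairs under prefix, optionally taking the
    // per-prefix lock first.
    func (b *Backend) List(ctx context.Context, prefix string, lock bool) (map[string][]byte, error) {
        if lock {
            if err := b.acquireLock(ctx, prefix); err != nil { // hypothetical helper
                return nil, err
            }
            defer b.releaseLock(ctx, prefix) // hypothetical helper
        }
        resp, err := b.client.Get(ctx, prefix, clientv3.WithPrefix())
        if err != nil {
            return nil, err
        }
        out := make(map[string][]byte, len(resp.Kvs))
        for _, kv := range resp.Kvs {
            out[string(kv.Key)] = kv.Value
        }
        return out, nil
    }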

Change-Id: Ie1a55d3ca8b9d92ae71a85ce42bb22fcf1419e2c
diff --git a/vendor/go.etcd.io/etcd/clientv3/watch.go b/vendor/go.etcd.io/etcd/clientv3/watch.go
index d763385..8ec58bb 100644
--- a/vendor/go.etcd.io/etcd/clientv3/watch.go
+++ b/vendor/go.etcd.io/etcd/clientv3/watch.go
@@ -16,13 +16,14 @@
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"sync"
 	"time"
 
-	v3rpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
-	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
-	mvccpb "github.com/coreos/etcd/mvcc/mvccpb"
+	v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
+	pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
+	mvccpb "go.etcd.io/etcd/mvcc/mvccpb"
 
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
@@ -46,8 +47,33 @@
 	// through the returned channel. If revisions waiting to be sent over the
 	// watch are compacted, then the watch will be canceled by the server, the
 	// client will post a compacted error watch response, and the channel will close.
+	// If the context "ctx" is canceled or times out, the returned "WatchChan" is
+	// closed, and the "WatchResponse" from this closed channel has zero events
+	// and a nil "Err()".
+	// The context "ctx" MUST be canceled as soon as the watcher is no longer
+	// being used, to release the associated resources.
+	//
+	// If the context is "context.Background/TODO", the returned "WatchChan" will
+	// not be closed and will block until an event is triggered, except when the
+	// server returns a non-recoverable error (e.g. ErrCompacted).
+	// For example, when a context wrapped with "WithRequireLeader" is passed and
+	// the connected server has no leader (e.g. due to a network partition), the
+	// error "etcdserver: no leader" (ErrNoLeader) will be returned, and then
+	// "WatchChan" is closed with a non-nil "Err()".
+	// To prevent a watch stream from getting stuck on a partitioned node, make
+	// sure to wrap the context with "WithRequireLeader".
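+	//
+	// Illustrative usage (the "cli" client value is assumed):
+	//	ctx, cancel := context.WithCancel(WithRequireLeader(context.Background()))
+	//	defer cancel() // release the watcher's resources once done
+	//	wch := cli.Watch(ctx, "foo")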
+	//
+	// Otherwise, as long as the context has not been canceled or timed out,
+	// watch will retry on other recoverable errors forever until reconnected.
+	//
+	// TODO: explicitly set context error in the last "WatchResponse" message and close channel?
+	// Currently, client contexts are overwritten with "valCtx" that never closes.
+	// TODO(v3.4): configure watch retry policy, limit maximum retry number
+	// (see https://github.com/etcd-io/etcd/issues/8980)
 	Watch(ctx context.Context, key string, opts ...OpOption) WatchChan
 
+	// RequestProgress requests a progress notify response be sent in all watch channels.
+	RequestProgress(ctx context.Context) error
+
 	// Close closes the watcher and cancels all watch requests.
 	Close() error
 }
@@ -134,7 +160,7 @@
 	resuming []*watcherStream
 
 	// reqc sends a watch request from Watch() to the main goroutine
-	reqc chan *watchRequest
+	reqc chan watchStreamRequest
 	// respc receives data from the watch client
 	respc chan *pb.WatchResponse
 	// donec closes to broadcast shutdown
@@ -152,16 +178,27 @@
 	closeErr error
 }
 
+// watchStreamRequest is a union of the supported watch request operation types
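+// (currently *watchRequest and *progressRequest).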
+type watchStreamRequest interface {
+	toPB() *pb.WatchRequest
+}
+
 // watchRequest is issued by the subscriber to start a new watcher
 type watchRequest struct {
 	ctx context.Context
 	key string
 	end string
 	rev int64
+
 	// send created notification event if this field is true
 	createdNotify bool
 	// progressNotify is for progress updates
 	progressNotify bool
+	// fragment is disabled by default; if true, watch events are split when
+	// the total response size exceeds the "--max-request-bytes" flag value
+	// plus a 512-byte overhead
+	fragment bool
+
 	// filters is the list of events to filter out
 	filters []pb.WatchCreateRequest_FilterType
 	// get the previous key-value pair before the event happens
@@ -170,6 +207,10 @@
 	retc chan chan WatchResponse
 }
 
+// progressRequest is issued by the subscriber to request watch progress
+type progressRequest struct {
+}
+
 // watcherStream represents a registered watcher
 type watcherStream struct {
 	// initReq is the request that initiated this request
@@ -227,7 +268,7 @@
 		cancel:     cancel,
 		substreams: make(map[int64]*watcherStream),
 		respc:      make(chan *pb.WatchResponse),
-		reqc:       make(chan *watchRequest),
+		reqc:       make(chan watchStreamRequest),
 		donec:      make(chan struct{}),
 		errc:       make(chan error, 1),
 		closingc:   make(chan *watcherStream),
@@ -256,6 +297,7 @@
 		end:            string(ow.end),
 		rev:            ow.rev,
 		progressNotify: ow.progressNotify,
+		fragment:       ow.fragment,
 		filters:        filters,
 		prevKV:         ow.prevKV,
 		retc:           make(chan chan WatchResponse, 1),
@@ -292,7 +334,7 @@
 	case <-wr.ctx.Done():
 	case <-donec:
 		if wgs.closeErr != nil {
-			closeCh <- WatchResponse{closeErr: wgs.closeErr}
+			closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
 			break
 		}
 		// retry; may have dropped stream from no ctxs
@@ -307,7 +349,7 @@
 		case <-ctx.Done():
 		case <-donec:
 			if wgs.closeErr != nil {
-				closeCh <- WatchResponse{closeErr: wgs.closeErr}
+				closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
 				break
 			}
 			// retry; may have dropped stream from no ctxs
@@ -332,6 +374,42 @@
 	return err
 }
 
+// RequestProgress requests a progress notify response be sent in all watch channels.
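+// A progress notify response carries an up-to-date revision header even when
+// no new events have occurred on the watched keys.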
+func (w *watcher) RequestProgress(ctx context.Context) (err error) {
+	ctxKey := streamKeyFromCtx(ctx)
+
+	w.mu.Lock()
+	if w.streams == nil {
+		w.mu.Unlock()
+		return fmt.Errorf("no stream found for context")
+	}
+	wgs := w.streams[ctxKey]
+	if wgs == nil {
+		wgs = w.newWatcherGrpcStream(ctx)
+		w.streams[ctxKey] = wgs
+	}
+	donec := wgs.donec
+	reqc := wgs.reqc
+	w.mu.Unlock()
+
+	pr := &progressRequest{}
+
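+	// hand the request off to the stream's main goroutine; if the stream has
+	// already died, retry on a fresh one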
+	select {
+	case reqc <- pr:
+		return nil
+	case <-ctx.Done():
+		if err == nil {
+			return ctx.Err()
+		}
+		return err
+	case <-donec:
+		if wgs.closeErr != nil {
+			return wgs.closeErr
+		}
+		// retry; may have dropped stream from no ctxs
+		return w.RequestProgress(ctx)
+	}
+}
+
 func (w *watchGrpcStream) close() (err error) {
 	w.cancel()
 	<-w.donec
@@ -353,7 +431,9 @@
 }
 
 func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
-	if resp.WatchId == -1 {
+	// check watch ID for backward compatibility (<= v3.3)
+	if resp.WatchId == -1 || (resp.Canceled && resp.CancelReason != "") {
+		w.closeErr = v3rpc.Error(errors.New(resp.CancelReason))
 		// failed; no channel
 		close(ws.recvc)
 		return
@@ -379,7 +459,7 @@
 	}
 	// close subscriber's channel
 	if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil {
-		go w.sendCloseSubstream(ws, &WatchResponse{closeErr: w.closeErr})
+		go w.sendCloseSubstream(ws, &WatchResponse{Canceled: true, closeErr: w.closeErr})
 	} else if ws.outc != nil {
 		close(ws.outc)
 	}
@@ -434,31 +514,48 @@
 
 	cancelSet := make(map[int64]struct{})
 
+	var cur *pb.WatchResponse
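+	// "cur" accumulates fragmented watch responses for a single watch ID until
+	// the final fragment (Fragment == false) arrives and can be dispatched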
 	for {
 		select {
 		// Watch() requested
-		case wreq := <-w.reqc:
-			outc := make(chan WatchResponse, 1)
-			ws := &watcherStream{
-				initReq: *wreq,
-				id:      -1,
-				outc:    outc,
-				// unbuffered so resumes won't cause repeat events
-				recvc: make(chan *WatchResponse),
+		case req := <-w.reqc:
+			switch wreq := req.(type) {
+			case *watchRequest:
+				outc := make(chan WatchResponse, 1)
+				// TODO: pass custom watch ID?
+				ws := &watcherStream{
+					initReq: *wreq,
+					id:      -1,
+					outc:    outc,
+					// unbuffered so resumes won't cause repeat events
+					recvc: make(chan *WatchResponse),
+				}
+
+				ws.donec = make(chan struct{})
+				w.wg.Add(1)
+				go w.serveSubstream(ws, w.resumec)
+
+				// queue up for watcher creation/resume
+				w.resuming = append(w.resuming, ws)
+				if len(w.resuming) == 1 {
+					// head of resume queue, can register a new watcher
+					wc.Send(ws.initReq.toPB())
+				}
+			case *progressRequest:
+				wc.Send(wreq.toPB())
 			}
 
-			ws.donec = make(chan struct{})
-			w.wg.Add(1)
-			go w.serveSubstream(ws, w.resumec)
-
-			// queue up for watcher creation/resume
-			w.resuming = append(w.resuming, ws)
-			if len(w.resuming) == 1 {
-				// head of resume queue, can register a new watcher
-				wc.Send(ws.initReq.toPB())
-			}
-		// New events from the watch client
+		// new events from the watch client
 		case pbresp := <-w.respc:
+			if cur == nil || pbresp.Created || pbresp.Canceled {
+				cur = pbresp
+			} else if cur.WatchId == pbresp.WatchId {
+				// merge new events
+				cur.Events = append(cur.Events, pbresp.Events...)
+				// update "Fragment" field; last response with "Fragment" == false
+				cur.Fragment = pbresp.Fragment
+			}
+
 			switch {
 			case pbresp.Created:
 				// response to head of queue creation
@@ -467,9 +564,14 @@
 					w.dispatchEvent(pbresp)
 					w.resuming[0] = nil
 				}
+
 				if ws := w.nextResume(); ws != nil {
 					wc.Send(ws.initReq.toPB())
 				}
+
+				// reset for next iteration
+				cur = nil
+
 			case pbresp.Canceled && pbresp.CompactRevision == 0:
 				delete(cancelSet, pbresp.WatchId)
 				if ws, ok := w.substreams[pbresp.WatchId]; ok {
@@ -477,15 +579,31 @@
 					close(ws.recvc)
 					closing[ws] = struct{}{}
 				}
+
+				// reset for next iteration
+				cur = nil
+
+			case cur.Fragment:
+				// watch response events are still fragmented;
+				// keep receiving until the final fragment arrives
+				continue
+
 			default:
 				// dispatch to appropriate watch stream
-				if ok := w.dispatchEvent(pbresp); ok {
+				ok := w.dispatchEvent(cur)
+
+				// reset for next iteration
+				cur = nil
+
+				if ok {
 					break
 				}
+
 				// watch response on unexpected watch id; cancel id
 				if _, ok := cancelSet[pbresp.WatchId]; ok {
 					break
 				}
+
 				cancelSet[pbresp.WatchId] = struct{}{}
 				cr := &pb.WatchRequest_CancelRequest{
 					CancelRequest: &pb.WatchCancelRequest{
@@ -495,6 +613,7 @@
 				req := &pb.WatchRequest{RequestUnion: cr}
 				wc.Send(req)
 			}
+
 		// watch client failed on Recv; spawn another if possible
 		case err := <-w.errc:
 			if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader {
@@ -508,13 +627,15 @@
 				wc.Send(ws.initReq.toPB())
 			}
 			cancelSet = make(map[int64]struct{})
+
 		case <-w.ctx.Done():
 			return
+
 		case ws := <-w.closingc:
 			w.closeSubstream(ws)
 			delete(closing, ws)
+			// no more watchers on this stream, shutdown
 			if len(w.substreams)+len(w.resuming) == 0 {
-				// no more watchers on this stream, shutdown
 				return
 			}
 		}
@@ -539,6 +660,7 @@
 	for i, ev := range pbresp.Events {
 		events[i] = (*Event)(ev)
 	}
+	// TODO: return watch ID?
 	wr := &WatchResponse{
 		Header:          *pbresp.Header,
 		Events:          events,
@@ -547,7 +669,31 @@
 		Canceled:        pbresp.Canceled,
 		cancelReason:    pbresp.CancelReason,
 	}
-	ws, ok := w.substreams[pbresp.WatchId]
+
+	// watch IDs are zero-indexed, so progress notify watch responses requested
+	// via RequestProgress are assigned a watch ID of -1 to indicate they
+	// should be broadcast to every substream.
+	if wr.IsProgressNotify() && pbresp.WatchId == -1 {
+		return w.broadcastResponse(wr)
+	}
+
+	return w.unicastResponse(wr, pbresp.WatchId)
+}
+
+// broadcastResponse sends a watch response to all watch substreams.
+func (w *watchGrpcStream) broadcastResponse(wr *WatchResponse) bool {
+	for _, ws := range w.substreams {
+		select {
+		case ws.recvc <- wr:
+		case <-ws.donec:
+		}
+	}
+	return true
+}
+
+// unicastResponse sends a watch response to a specific watch substream.
+func (w *watchGrpcStream) unicastResponse(wr *WatchResponse, watchId int64) bool {
+	ws, ok := w.substreams[watchId]
 	if !ok {
 		return false
 	}
@@ -815,11 +961,19 @@
 		ProgressNotify: wr.progressNotify,
 		Filters:        wr.filters,
 		PrevKv:         wr.prevKV,
+		Fragment:       wr.fragment,
 	}
 	cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
 	return &pb.WatchRequest{RequestUnion: cr}
 }
 
+// toPB converts an internal progress request structure to its protobuf WatchRequest structure.
+func (pr *progressRequest) toPB() *pb.WatchRequest {
+	req := &pb.WatchProgressRequest{}
+	cr := &pb.WatchRequest_ProgressRequest{ProgressRequest: req}
+	return &pb.WatchRequest{RequestUnion: cr}
+}
+
 func streamKeyFromCtx(ctx context.Context) string {
 	if md, ok := metadata.FromOutgoingContext(ctx); ok {
 		return fmt.Sprintf("%+v", md)