VOL-1497 : Add more control to kv/memory access
- Added kv locking mechanism (etcd only)
- (watch) control path access whenever possible
- (watch) use a transaction for updates and merge with memory
- cleaned up vendoring
- misc changes to fix exceptions found along the way
Amendments:
- Copyright header got removed in auto-generated file
- Changed default locking to false for KV list operation
- Updated backend api to allow the passing of locking parameter
Change-Id: Ie1a55d3ca8b9d92ae71a85ce42bb22fcf1419e2c
diff --git a/vendor/go.etcd.io/etcd/clientv3/lease.go b/vendor/go.etcd.io/etcd/clientv3/lease.go
index 3729cf3..c2796fc 100644
--- a/vendor/go.etcd.io/etcd/clientv3/lease.go
+++ b/vendor/go.etcd.io/etcd/clientv3/lease.go
@@ -19,9 +19,10 @@
"sync"
"time"
- "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
- pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+ "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
+ pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
+ "go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
@@ -117,22 +118,21 @@
// Leases retrieves all leases.
Leases(ctx context.Context) (*LeaseLeasesResponse, error)
- // KeepAlive keeps the given lease alive forever. If the keepalive response
- // posted to the channel is not consumed immediately, the lease client will
- // continue sending keep alive requests to the etcd server at least every
- // second until latest response is consumed.
+ // KeepAlive attempts to keep the given lease alive forever. If the keepalive responses posted
+ // to the channel are not consumed promptly the channel may become full. When full, the lease
+ // client will continue sending keep alive requests to the etcd server, but will drop responses
+ // until there is capacity on the channel to send more responses.
+ //
+ // If the client keep alive loop halts with an unexpected error (e.g. "etcdserver: no leader") or
+ // is canceled by the caller (e.g. context.Canceled), KeepAlive returns an ErrKeepAliveHalted error
+ // containing the error reason.
//
// The returned "LeaseKeepAliveResponse" channel closes if underlying keep
// alive stream is interrupted in some way the client cannot handle itself;
- // given context "ctx" is canceled or timed out. "LeaseKeepAliveResponse"
- // from this closed channel is nil.
- //
- // If client keep alive loop halts with an unexpected error (e.g. "etcdserver:
- // no leader") or canceled by the caller (e.g. context.Canceled), the error
- // is returned. Otherwise, it retries.
+ // given context "ctx" is canceled or timed out.
//
// TODO(v4.0): post errors to last keep alive message before closing
- // (see https://github.com/coreos/etcd/pull/7866)
+ // (see https://github.com/etcd-io/etcd/pull/7866)
KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
// KeepAliveOnce renews the lease once. The response corresponds to the
@@ -172,6 +172,8 @@
firstKeepAliveOnce sync.Once
callOpts []grpc.CallOption
+
+ lg *zap.Logger
}
// keepAlive multiplexes a keepalive for a lease over multiple channels
@@ -196,6 +198,7 @@
keepAlives: make(map[LeaseID]*keepAlive),
remote: remote,
firstKeepAliveTimeout: keepAliveTimeout,
+ lg: c.lg,
}
if l.firstKeepAliveTimeout == time.Second {
l.firstKeepAliveTimeout = defaultTTL
@@ -291,7 +294,7 @@
}
l.mu.Unlock()
- go l.keepAliveCtxCloser(id, ctx, ka.donec)
+ go l.keepAliveCtxCloser(ctx, id, ka.donec)
l.firstKeepAliveOnce.Do(func() {
go l.recvKeepAliveLoop()
go l.deadlineLoop()
@@ -323,7 +326,7 @@
return nil
}
-func (l *lessor) keepAliveCtxCloser(id LeaseID, ctx context.Context, donec <-chan struct{}) {
+func (l *lessor) keepAliveCtxCloser(ctx context.Context, id LeaseID, donec <-chan struct{}) {
select {
case <-donec:
return
@@ -459,7 +462,6 @@
select {
case <-time.After(retryConnWait):
- continue
case <-l.stopCtx.Done():
return l.stopCtx.Err()
}
@@ -469,7 +471,7 @@
// resetRecv opens a new lease stream and starts sending keep alive requests.
func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
sctx, cancel := context.WithCancel(l.stopCtx)
- stream, err := l.remote.LeaseKeepAlive(sctx, l.callOpts...)
+ stream, err := l.remote.LeaseKeepAlive(sctx, append(l.callOpts, withMax(0))...)
if err != nil {
cancel()
return nil, err
@@ -518,6 +520,12 @@
select {
case ch <- karesp:
default:
+ if l.lg != nil {
+ l.lg.Warn("lease keepalive response queue is full; dropping response send",
+ zap.Int("queue-size", len(ch)),
+ zap.Int("queue-capacity", cap(ch)),
+ )
+ }
}
// still advance in order to rate-limit keep-alive sends
ka.nextKeepAlive = nextKeepAlive
@@ -569,7 +577,7 @@
}
select {
- case <-time.After(500 * time.Millisecond):
+ case <-time.After(retryConnWait):
case <-stream.Context().Done():
return
case <-l.donec: