VOL-1497: Add more control to KV/memory access
- Added a KV locking mechanism (etcd only)
- (watch) control-path access whenever possible
- (watch) use a transaction for updates and merge the result with memory
- Cleaned up vendoring
- Miscellaneous changes to fix exceptions found along the way
Amendments:
- Copyright header was removed in an auto-generated file
- Changed default locking to false for the KV list operation
- Updated the backend API to allow passing a locking parameter
Change-Id: Ie1a55d3ca8b9d92ae71a85ce42bb22fcf1419e2c
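Illustrative sketch of the intended usage: the commit message implies a per-operation locking flag on the KV backend API, with list operations defaulting to unlocked reads. The `Backend` interface and method signatures below are hypothetical stand-ins; the actual voltha-go backend code is not part of the vendored diff that follows.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Backend is a hypothetical stand-in for the KV backend API described in
// the commit message; only the locking pattern is implied by this change.
type Backend interface {
	// Get reads one key; lock=true takes an etcd lock around the read.
	Get(ctx context.Context, path string, lock bool) ([]byte, error)
	// List reads a prefix; per this change it defaults to lock=false.
	List(ctx context.Context, path string, lock bool) (map[string][]byte, error)
}

func example(ctx context.Context, b Backend) error {
	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	// Unlocked list: the new default for list operations.
	entries, err := b.List(ctx, "/service/voltha/devices", false)
	if err != nil {
		return err
	}
	fmt.Println("devices:", len(entries))

	// Locked read for a control-path access that must not race a writer.
	data, err := b.Get(ctx, "/service/voltha/devices/0001", true)
	if err != nil {
		return err
	}
	fmt.Println("device:", string(data))
	return nil
}
```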
diff --git a/vendor/go.etcd.io/etcd/clientv3/README.md b/vendor/go.etcd.io/etcd/clientv3/README.md
index 376bfba..a249b73 100644
--- a/vendor/go.etcd.io/etcd/clientv3/README.md
+++ b/vendor/go.etcd.io/etcd/clientv3/README.md
@@ -1,13 +1,16 @@
# etcd/clientv3
-[![Godoc](https://img.shields.io/badge/go-documentation-blue.svg?style=flat-square)](https://godoc.org/github.com/coreos/etcd/clientv3)
+[![Docs](https://readthedocs.org/projects/etcd/badge/?version=latest&style=flat-square)](https://etcd.readthedocs.io/en/latest/?badge=latest)
+[![Godoc](https://img.shields.io/badge/go-documentation-blue.svg?style=flat-square)](https://godoc.org/go.etcd.io/etcd/clientv3)
`etcd/clientv3` is the official Go etcd client for v3.
+See https://etcd.readthedocs.io/en/latest for the latest client architecture.
+
## Install
```bash
-go get github.com/coreos/etcd/clientv3
+go get go.etcd.io/etcd/clientv3
```
## Get started
@@ -26,7 +29,7 @@
```
etcd v3 uses [`gRPC`](http://www.grpc.io) for remote procedure calls. And `clientv3` uses
-[`grpc-go`](https://github.com/grpc/grpc-go) to connect to etcd. Make sure to close the client after using it.
+[`grpc-go`](https://github.com/grpc/grpc-go) to connect to etcd. Make sure to close the client after using it.
If the client is not closed, the connection will leak goroutines. To specify client request timeout,
pass `context.WithTimeout` to APIs:
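For instance (a minimal sketch; `cli` is the `*clientv3.Client` created above, and the key, value, and timeout are placeholders):

```go
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
resp, err := cli.Put(ctx, "sample_key", "sample_value")
cancel()
if err != nil {
	// handle error: the Put did not complete within 5 seconds
}
_ = resp // use the response
```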
@@ -40,19 +43,14 @@
// use the response
```
-etcd uses `cmd/vendor` directory to store external dependencies, which are
-to be compiled into etcd release binaries. `client` can be imported without
-vendoring. For full compatibility, it is recommended to vendor builds using
-etcd's vendored packages, using tools like godep, as in
-[vendor directories](https://golang.org/cmd/go/#hdr-Vendor_Directories).
-For more detail, please read [Go vendor design](https://golang.org/s/go15vendor).
+For full compatibility, it is recommended to vendor builds using etcd's vendored packages, using tools like `golang/dep`, as in [vendor directories](https://golang.org/cmd/go/#hdr-Vendor_Directories).
## Error Handling
etcd client returns 2 types of errors:
1. context error: canceled or deadline exceeded.
-2. gRPC error: see [api/v3rpc/rpctypes](https://godoc.org/github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes).
+2. gRPC error: see [api/v3rpc/rpctypes](https://godoc.org/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes).
Here is the example code to handle client errors:
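A representative sketch (the full upstream example is elided from this diff; `rpctypes` is `go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes`):

```go
_, err := cli.Put(ctx, "", "")
if err != nil {
	switch err {
	case context.Canceled:
		// ctx was canceled by another routine
	case context.DeadlineExceeded:
		// ctx reached its deadline
	case rpctypes.ErrEmptyKey:
		// gRPC error from etcd: see api/v3rpc/rpctypes
	default:
		// bad cluster endpoints, which are not etcd servers
	}
}
```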
@@ -74,12 +72,16 @@
## Metrics
-The etcd client optionally exposes RPC metrics through [go-grpc-prometheus](https://github.com/grpc-ecosystem/go-grpc-prometheus). See the [examples](https://github.com/coreos/etcd/blob/master/clientv3/example_metrics_test.go).
+The etcd client optionally exposes RPC metrics through [go-grpc-prometheus](https://github.com/grpc-ecosystem/go-grpc-prometheus). See the [examples](https://github.com/etcd-io/etcd/blob/master/clientv3/example_metrics_test.go).
## Namespacing
-The [namespace](https://godoc.org/github.com/coreos/etcd/clientv3/namespace) package provides `clientv3` interface wrappers to transparently isolate client requests to a user-defined prefix.
+The [namespace](https://godoc.org/go.etcd.io/etcd/clientv3/namespace) package provides `clientv3` interface wrappers to transparently isolate client requests to a user-defined prefix.
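For example (a minimal sketch; the endpoint and prefix are placeholders):

```go
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
if err != nil {
	// handle error
}
// Wrap the client's interfaces so every key is transparently prefixed.
cli.KV = namespace.NewKV(cli.KV, "my-prefix/")
cli.Watcher = namespace.NewWatcher(cli.Watcher, "my-prefix/")
cli.Lease = namespace.NewLease(cli.Lease, "my-prefix/")
// This Put actually writes the key "my-prefix/abc".
cli.Put(context.TODO(), "abc", "123")
```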
+
+## Request size limit
+
+Client request size limit is configurable via `clientv3.Config.MaxCallSendMsgSize` and `MaxCallRecvMsgSize` in bytes. If not set, the send limit defaults to 2 MiB (including gRPC overhead bytes) and the receive limit defaults to `math.MaxInt32`.
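For example, to override both limits at client construction (a minimal sketch; the endpoint and sizes are placeholders):

```go
cli, err := clientv3.New(clientv3.Config{
	Endpoints:          []string{"localhost:2379"},
	MaxCallSendMsgSize: 4 * 1024 * 1024, // raise send limit to 4 MiB
	MaxCallRecvMsgSize: 8 * 1024 * 1024, // cap receive limit at 8 MiB
})
if err != nil {
	// handle error
}
defer cli.Close()
```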
## Examples
-More code examples can be found at [GoDoc](https://godoc.org/github.com/coreos/etcd/clientv3).
+More code examples can be found at [GoDoc](https://godoc.org/go.etcd.io/etcd/clientv3).
diff --git a/vendor/go.etcd.io/etcd/clientv3/auth.go b/vendor/go.etcd.io/etcd/clientv3/auth.go
index 7545bb6..921f50f 100644
--- a/vendor/go.etcd.io/etcd/clientv3/auth.go
+++ b/vendor/go.etcd.io/etcd/clientv3/auth.go
@@ -19,9 +19,8 @@
"fmt"
"strings"
- "github.com/coreos/etcd/auth/authpb"
- pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
-
+ "go.etcd.io/etcd/auth/authpb"
+ pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
"google.golang.org/grpc"
)
@@ -100,70 +99,70 @@
RoleDelete(ctx context.Context, role string) (*AuthRoleDeleteResponse, error)
}
-type auth struct {
+type authClient struct {
remote pb.AuthClient
callOpts []grpc.CallOption
}
func NewAuth(c *Client) Auth {
- api := &auth{remote: RetryAuthClient(c)}
+ api := &authClient{remote: RetryAuthClient(c)}
if c != nil {
api.callOpts = c.callOpts
}
return api
}
-func (auth *auth) AuthEnable(ctx context.Context) (*AuthEnableResponse, error) {
+func (auth *authClient) AuthEnable(ctx context.Context) (*AuthEnableResponse, error) {
resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}, auth.callOpts...)
return (*AuthEnableResponse)(resp), toErr(ctx, err)
}
-func (auth *auth) AuthDisable(ctx context.Context) (*AuthDisableResponse, error) {
+func (auth *authClient) AuthDisable(ctx context.Context) (*AuthDisableResponse, error) {
resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{}, auth.callOpts...)
return (*AuthDisableResponse)(resp), toErr(ctx, err)
}
-func (auth *auth) UserAdd(ctx context.Context, name string, password string) (*AuthUserAddResponse, error) {
+func (auth *authClient) UserAdd(ctx context.Context, name string, password string) (*AuthUserAddResponse, error) {
resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password}, auth.callOpts...)
return (*AuthUserAddResponse)(resp), toErr(ctx, err)
}
-func (auth *auth) UserDelete(ctx context.Context, name string) (*AuthUserDeleteResponse, error) {
+func (auth *authClient) UserDelete(ctx context.Context, name string) (*AuthUserDeleteResponse, error) {
resp, err := auth.remote.UserDelete(ctx, &pb.AuthUserDeleteRequest{Name: name}, auth.callOpts...)
return (*AuthUserDeleteResponse)(resp), toErr(ctx, err)
}
-func (auth *auth) UserChangePassword(ctx context.Context, name string, password string) (*AuthUserChangePasswordResponse, error) {
+func (auth *authClient) UserChangePassword(ctx context.Context, name string, password string) (*AuthUserChangePasswordResponse, error) {
resp, err := auth.remote.UserChangePassword(ctx, &pb.AuthUserChangePasswordRequest{Name: name, Password: password}, auth.callOpts...)
return (*AuthUserChangePasswordResponse)(resp), toErr(ctx, err)
}
-func (auth *auth) UserGrantRole(ctx context.Context, user string, role string) (*AuthUserGrantRoleResponse, error) {
+func (auth *authClient) UserGrantRole(ctx context.Context, user string, role string) (*AuthUserGrantRoleResponse, error) {
resp, err := auth.remote.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: user, Role: role}, auth.callOpts...)
return (*AuthUserGrantRoleResponse)(resp), toErr(ctx, err)
}
-func (auth *auth) UserGet(ctx context.Context, name string) (*AuthUserGetResponse, error) {
+func (auth *authClient) UserGet(ctx context.Context, name string) (*AuthUserGetResponse, error) {
resp, err := auth.remote.UserGet(ctx, &pb.AuthUserGetRequest{Name: name}, auth.callOpts...)
return (*AuthUserGetResponse)(resp), toErr(ctx, err)
}
-func (auth *auth) UserList(ctx context.Context) (*AuthUserListResponse, error) {
+func (auth *authClient) UserList(ctx context.Context) (*AuthUserListResponse, error) {
resp, err := auth.remote.UserList(ctx, &pb.AuthUserListRequest{}, auth.callOpts...)
return (*AuthUserListResponse)(resp), toErr(ctx, err)
}
-func (auth *auth) UserRevokeRole(ctx context.Context, name string, role string) (*AuthUserRevokeRoleResponse, error) {
+func (auth *authClient) UserRevokeRole(ctx context.Context, name string, role string) (*AuthUserRevokeRoleResponse, error) {
resp, err := auth.remote.UserRevokeRole(ctx, &pb.AuthUserRevokeRoleRequest{Name: name, Role: role}, auth.callOpts...)
return (*AuthUserRevokeRoleResponse)(resp), toErr(ctx, err)
}
-func (auth *auth) RoleAdd(ctx context.Context, name string) (*AuthRoleAddResponse, error) {
+func (auth *authClient) RoleAdd(ctx context.Context, name string) (*AuthRoleAddResponse, error) {
resp, err := auth.remote.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: name}, auth.callOpts...)
return (*AuthRoleAddResponse)(resp), toErr(ctx, err)
}
-func (auth *auth) RoleGrantPermission(ctx context.Context, name string, key, rangeEnd string, permType PermissionType) (*AuthRoleGrantPermissionResponse, error) {
+func (auth *authClient) RoleGrantPermission(ctx context.Context, name string, key, rangeEnd string, permType PermissionType) (*AuthRoleGrantPermissionResponse, error) {
perm := &authpb.Permission{
Key: []byte(key),
RangeEnd: []byte(rangeEnd),
@@ -173,22 +172,22 @@
return (*AuthRoleGrantPermissionResponse)(resp), toErr(ctx, err)
}
-func (auth *auth) RoleGet(ctx context.Context, role string) (*AuthRoleGetResponse, error) {
+func (auth *authClient) RoleGet(ctx context.Context, role string) (*AuthRoleGetResponse, error) {
resp, err := auth.remote.RoleGet(ctx, &pb.AuthRoleGetRequest{Role: role}, auth.callOpts...)
return (*AuthRoleGetResponse)(resp), toErr(ctx, err)
}
-func (auth *auth) RoleList(ctx context.Context) (*AuthRoleListResponse, error) {
+func (auth *authClient) RoleList(ctx context.Context) (*AuthRoleListResponse, error) {
resp, err := auth.remote.RoleList(ctx, &pb.AuthRoleListRequest{}, auth.callOpts...)
return (*AuthRoleListResponse)(resp), toErr(ctx, err)
}
-func (auth *auth) RoleRevokePermission(ctx context.Context, role string, key, rangeEnd string) (*AuthRoleRevokePermissionResponse, error) {
- resp, err := auth.remote.RoleRevokePermission(ctx, &pb.AuthRoleRevokePermissionRequest{Role: role, Key: key, RangeEnd: rangeEnd}, auth.callOpts...)
+func (auth *authClient) RoleRevokePermission(ctx context.Context, role string, key, rangeEnd string) (*AuthRoleRevokePermissionResponse, error) {
+ resp, err := auth.remote.RoleRevokePermission(ctx, &pb.AuthRoleRevokePermissionRequest{Role: role, Key: []byte(key), RangeEnd: []byte(rangeEnd)}, auth.callOpts...)
return (*AuthRoleRevokePermissionResponse)(resp), toErr(ctx, err)
}
-func (auth *auth) RoleDelete(ctx context.Context, role string) (*AuthRoleDeleteResponse, error) {
+func (auth *authClient) RoleDelete(ctx context.Context, role string) (*AuthRoleDeleteResponse, error) {
resp, err := auth.remote.RoleDelete(ctx, &pb.AuthRoleDeleteRequest{Role: role}, auth.callOpts...)
return (*AuthRoleDeleteResponse)(resp), toErr(ctx, err)
}
@@ -216,8 +215,8 @@
auth.conn.Close()
}
-func newAuthenticator(endpoint string, opts []grpc.DialOption, c *Client) (*authenticator, error) {
- conn, err := grpc.Dial(endpoint, opts...)
+func newAuthenticator(ctx context.Context, target string, opts []grpc.DialOption, c *Client) (*authenticator, error) {
+ conn, err := grpc.DialContext(ctx, target, opts...)
if err != nil {
return nil, err
}
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/balancer.go b/vendor/go.etcd.io/etcd/clientv3/balancer/balancer.go
new file mode 100644
index 0000000..25dc2b7
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/balancer/balancer.go
@@ -0,0 +1,275 @@
+// Copyright 2018 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package balancer
+
+import (
+ "fmt"
+ "strconv"
+ "sync"
+ "time"
+
+ "go.etcd.io/etcd/clientv3/balancer/picker"
+
+ "go.uber.org/zap"
+ "google.golang.org/grpc/balancer"
+ "google.golang.org/grpc/connectivity"
+ "google.golang.org/grpc/resolver"
+ _ "google.golang.org/grpc/resolver/dns" // register DNS resolver
+ _ "google.golang.org/grpc/resolver/passthrough" // register passthrough resolver
+)
+
+// RegisterBuilder creates and registers a builder. Since this function calls balancer.Register, it
+// must be invoked at initialization time.
+func RegisterBuilder(cfg Config) {
+ bb := &builder{cfg}
+ balancer.Register(bb)
+
+ bb.cfg.Logger.Info(
+ "registered balancer",
+ zap.String("policy", bb.cfg.Policy.String()),
+ zap.String("name", bb.cfg.Name),
+ )
+}
+
+type builder struct {
+ cfg Config
+}
+
+// Build is called initially when creating "ccBalancerWrapper".
+// "grpc.Dial" invokes this for the client connection.
+// Resolved addresses are then handled via "HandleResolvedAddrs".
+func (b *builder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
+ bb := &baseBalancer{
+ id: strconv.FormatInt(time.Now().UnixNano(), 36),
+ policy: b.cfg.Policy,
+ name: b.cfg.Policy.String(),
+ lg: b.cfg.Logger,
+
+ addrToSc: make(map[resolver.Address]balancer.SubConn),
+ scToAddr: make(map[balancer.SubConn]resolver.Address),
+ scToSt: make(map[balancer.SubConn]connectivity.State),
+
+ currentConn: nil,
+ csEvltr: &connectivityStateEvaluator{},
+
+ // the initial picker always returns "ErrNoSubConnAvailable"
+ Picker: picker.NewErr(balancer.ErrNoSubConnAvailable),
+ }
+ if b.cfg.Name != "" {
+ bb.name = b.cfg.Name
+ }
+ if bb.lg == nil {
+ bb.lg = zap.NewNop()
+ }
+
+ // TODO: support multiple connections
+ bb.mu.Lock()
+ bb.currentConn = cc
+ bb.mu.Unlock()
+
+ bb.lg.Info(
+ "built balancer",
+ zap.String("balancer-id", bb.id),
+ zap.String("policy", bb.policy.String()),
+ zap.String("resolver-target", cc.Target()),
+ )
+ return bb
+}
+
+// Name implements "grpc/balancer.Builder" interface.
+func (b *builder) Name() string { return b.cfg.Name }
+
+// Balancer defines client balancer interface.
+type Balancer interface {
+ // Balancer is called on specified client connection. Client initiates gRPC
+ // connection with "grpc.Dial(addr, grpc.WithBalancerName)", and then those resolved
+ // addresses are passed to "grpc/balancer.Balancer.HandleResolvedAddrs".
+ // For each resolved address, balancer calls "balancer.ClientConn.NewSubConn".
+ // "grpc/balancer.Balancer.HandleSubConnStateChange" is called when connectivity state
+ // changes, thus requires failover logic in this method.
+ balancer.Balancer
+
+ // Picker calls "Pick" for every client request.
+ picker.Picker
+}
+
+type baseBalancer struct {
+ id string
+ policy picker.Policy
+ name string
+ lg *zap.Logger
+
+ mu sync.RWMutex
+
+ addrToSc map[resolver.Address]balancer.SubConn
+ scToAddr map[balancer.SubConn]resolver.Address
+ scToSt map[balancer.SubConn]connectivity.State
+
+ currentConn balancer.ClientConn
+ currentState connectivity.State
+ csEvltr *connectivityStateEvaluator
+
+ picker.Picker
+}
+
+// HandleResolvedAddrs implements "grpc/balancer.Balancer" interface.
+// gRPC sends initial or updated resolved addresses from "Build".
+func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
+ if err != nil {
+ bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err))
+ return
+ }
+ bb.lg.Info("resolved", zap.String("balancer-id", bb.id), zap.Strings("addresses", addrsToStrings(addrs)))
+
+ bb.mu.Lock()
+ defer bb.mu.Unlock()
+
+ resolved := make(map[resolver.Address]struct{})
+ for _, addr := range addrs {
+ resolved[addr] = struct{}{}
+ if _, ok := bb.addrToSc[addr]; !ok {
+ sc, err := bb.currentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
+ if err != nil {
+ bb.lg.Warn("NewSubConn failed", zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr))
+ continue
+ }
+ bb.addrToSc[addr] = sc
+ bb.scToAddr[sc] = addr
+ bb.scToSt[sc] = connectivity.Idle
+ sc.Connect()
+ }
+ }
+
+ for addr, sc := range bb.addrToSc {
+ if _, ok := resolved[addr]; !ok {
+ // was removed by resolver or failed to create subconn
+ bb.currentConn.RemoveSubConn(sc)
+ delete(bb.addrToSc, addr)
+
+ bb.lg.Info(
+ "removed subconn",
+ zap.String("balancer-id", bb.id),
+ zap.String("address", addr.Addr),
+ zap.String("subconn", scToString(sc)),
+ )
+
+ // Keep the state of this sc in bb.scToSt until sc's state becomes Shutdown.
+ // The entry will be deleted in HandleSubConnStateChange.
+ // (DO NOT) delete(bb.scToAddr, sc)
+ // (DO NOT) delete(bb.scToSt, sc)
+ }
+ }
+}
+
+// HandleSubConnStateChange implements "grpc/balancer.Balancer" interface.
+func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
+ bb.mu.Lock()
+ defer bb.mu.Unlock()
+
+ old, ok := bb.scToSt[sc]
+ if !ok {
+ bb.lg.Warn(
+ "state change for an unknown subconn",
+ zap.String("balancer-id", bb.id),
+ zap.String("subconn", scToString(sc)),
+ zap.String("state", s.String()),
+ )
+ return
+ }
+
+ bb.lg.Info(
+ "state changed",
+ zap.String("balancer-id", bb.id),
+ zap.Bool("connected", s == connectivity.Ready),
+ zap.String("subconn", scToString(sc)),
+ zap.String("address", bb.scToAddr[sc].Addr),
+ zap.String("old-state", old.String()),
+ zap.String("new-state", s.String()),
+ )
+
+ bb.scToSt[sc] = s
+ switch s {
+ case connectivity.Idle:
+ sc.Connect()
+ case connectivity.Shutdown:
+ // When an address was removed by resolver, b called RemoveSubConn but
+ // kept the sc's state in scToSt. Remove state for this sc here.
+ delete(bb.scToAddr, sc)
+ delete(bb.scToSt, sc)
+ }
+
+ oldAggrState := bb.currentState
+ bb.currentState = bb.csEvltr.recordTransition(old, s)
+
+ // Regenerate picker when one of the following happens:
+ // - this sc became ready from not-ready
+ // - this sc became not-ready from ready
+ // - the aggregated state of balancer became TransientFailure from non-TransientFailure
+ // - the aggregated state of balancer became non-TransientFailure from TransientFailure
+ if (s == connectivity.Ready) != (old == connectivity.Ready) ||
+ (bb.currentState == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
+ bb.regeneratePicker()
+ }
+
+ bb.currentConn.UpdateBalancerState(bb.currentState, bb.Picker)
+ return
+}
+
+func (bb *baseBalancer) regeneratePicker() {
+ if bb.currentState == connectivity.TransientFailure {
+ bb.lg.Info(
+ "generated transient error picker",
+ zap.String("balancer-id", bb.id),
+ zap.String("policy", bb.policy.String()),
+ )
+ bb.Picker = picker.NewErr(balancer.ErrTransientFailure)
+ return
+ }
+
+ // only pass ready subconns to picker
+ scs := make([]balancer.SubConn, 0)
+ addrToSc := make(map[resolver.Address]balancer.SubConn)
+ scToAddr := make(map[balancer.SubConn]resolver.Address)
+ for addr, sc := range bb.addrToSc {
+ if st, ok := bb.scToSt[sc]; ok && st == connectivity.Ready {
+ scs = append(scs, sc)
+ addrToSc[addr] = sc
+ scToAddr[sc] = addr
+ }
+ }
+
+ switch bb.policy {
+ case picker.RoundrobinBalanced:
+ bb.Picker = picker.NewRoundrobinBalanced(bb.lg, scs, addrToSc, scToAddr)
+
+ default:
+ panic(fmt.Errorf("invalid balancer picker policy (%d)", bb.policy))
+ }
+
+ bb.lg.Info(
+ "generated picker",
+ zap.String("balancer-id", bb.id),
+ zap.String("policy", bb.policy.String()),
+ zap.Strings("subconn-ready", scsToStrings(addrToSc)),
+ zap.Int("subconn-size", len(addrToSc)),
+ )
+}
+
+// Close implements "grpc/balancer.Balancer" interface.
+// Close is a nop because base balancer doesn't have internal state to clean up,
+// and it doesn't need to call RemoveSubConn for the SubConns.
+func (bb *baseBalancer) Close() {
+ // TODO
+}
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/config.go b/vendor/go.etcd.io/etcd/clientv3/balancer/config.go
new file mode 100644
index 0000000..0339a84
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/balancer/config.go
@@ -0,0 +1,36 @@
+// Copyright 2018 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package balancer
+
+import (
+ "go.etcd.io/etcd/clientv3/balancer/picker"
+
+ "go.uber.org/zap"
+)
+
+// Config defines balancer configurations.
+type Config struct {
+ // Policy configures balancer policy.
+ Policy picker.Policy
+
+ // Name defines an additional name for balancer.
+ // Useful for balancer testing to avoid register conflicts.
+ // If empty, defaults to policy name.
+ Name string
+
+ // Logger configures balancer logging.
+ // If nil, logs are discarded.
+ Logger *zap.Logger
+}
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/connectivity.go b/vendor/go.etcd.io/etcd/clientv3/balancer/connectivity.go
new file mode 100644
index 0000000..6cdeb3f
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/balancer/connectivity.go
@@ -0,0 +1,58 @@
+// Copyright 2018 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package balancer
+
+import "google.golang.org/grpc/connectivity"
+
+// connectivityStateEvaluator gets updated by addrConns when their
+// states transition, based on which it evaluates the state of
+// ClientConn.
+type connectivityStateEvaluator struct {
+ numReady uint64 // Number of addrConns in ready state.
+ numConnecting uint64 // Number of addrConns in connecting state.
+ numTransientFailure uint64 // Number of addrConns in transientFailure.
+}
+
+// recordTransition records state change happening in every subConn and based on
+// that it evaluates what aggregated state should be.
+// It can only transition between Ready, Connecting and TransientFailure. Other states,
+// Idle and Shutdown are transitioned into by ClientConn; in the beginning of the connection
+// before any subConn is created ClientConn is in idle state. In the end when ClientConn
+// closes it is in Shutdown state.
+//
+// recordTransition should only be called synchronously from the same goroutine.
+func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) connectivity.State {
+ // Update counters.
+ for idx, state := range []connectivity.State{oldState, newState} {
+ updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
+ switch state {
+ case connectivity.Ready:
+ cse.numReady += updateVal
+ case connectivity.Connecting:
+ cse.numConnecting += updateVal
+ case connectivity.TransientFailure:
+ cse.numTransientFailure += updateVal
+ }
+ }
+
+ // Evaluate.
+ if cse.numReady > 0 {
+ return connectivity.Ready
+ }
+ if cse.numConnecting > 0 {
+ return connectivity.Connecting
+ }
+ return connectivity.TransientFailure
+}
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/doc.go b/vendor/go.etcd.io/etcd/clientv3/balancer/doc.go
new file mode 100644
index 0000000..45af5e9
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/balancer/doc.go
@@ -0,0 +1,16 @@
+// Copyright 2018 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package balancer implements client balancer.
+package balancer
diff --git a/vendor/go.etcd.io/etcd/clientv3/health_balancer.go b/vendor/go.etcd.io/etcd/clientv3/balancer/grpc1.7-health.go
similarity index 72%
rename from vendor/go.etcd.io/etcd/clientv3/health_balancer.go
rename to vendor/go.etcd.io/etcd/clientv3/balancer/grpc1.7-health.go
index 5918cba..7d24b93 100644
--- a/vendor/go.etcd.io/etcd/clientv3/health_balancer.go
+++ b/vendor/go.etcd.io/etcd/clientv3/balancer/grpc1.7-health.go
@@ -1,4 +1,4 @@
-// Copyright 2017 The etcd Authors
+// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -12,11 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package clientv3
+package balancer
import (
"context"
"errors"
+ "io/ioutil"
"net/url"
"strings"
"sync"
@@ -24,10 +25,14 @@
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
+ "google.golang.org/grpc/grpclog"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"
)
+// TODO: replace with something better
+var lg = grpclog.NewLoggerV2(ioutil.Discard, ioutil.Discard, ioutil.Discard)
+
const (
minHealthRetryDuration = 3 * time.Second
unknownService = "unknown service grpc.health.v1.Health"
@@ -38,18 +43,16 @@
// This error is returned only when opts.BlockingWait is true.
var ErrNoAddrAvilable = status.Error(codes.Unavailable, "there is no address available")
-type healthCheckFunc func(ep string) (bool, error)
-
-type notifyMsg int
+type NotifyMsg int
const (
- notifyReset notifyMsg = iota
- notifyNext
+ NotifyReset NotifyMsg = iota
+ NotifyNext
)
-// healthBalancer does the bare minimum to expose multiple eps
+// GRPC17Health does the bare minimum to expose multiple eps
// to the grpc reconnection code path
-type healthBalancer struct {
+type GRPC17Health struct {
// addrs are the client's endpoint addresses for grpc
addrs []grpc.Address
@@ -64,7 +67,7 @@
readyOnce sync.Once
// healthCheck checks an endpoint's health.
- healthCheck healthCheckFunc
+ healthCheck func(ep string) (bool, error)
healthCheckTimeout time.Duration
unhealthyMu sync.RWMutex
@@ -88,7 +91,7 @@
donec chan struct{}
// updateAddrsC notifies updateNotifyLoop to update addrs.
- updateAddrsC chan notifyMsg
+ updateAddrsC chan NotifyMsg
// grpc issues TLS cert checks using the string passed into dial so
// that string must be the host. To recover the full scheme://host URL,
@@ -102,21 +105,29 @@
closed bool
}
-func newHealthBalancer(eps []string, timeout time.Duration, hc healthCheckFunc) *healthBalancer {
+// DialFunc defines gRPC dial function.
+type DialFunc func(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error)
+
+// NewGRPC17Health returns a new health balancer with gRPC v1.7.
+func NewGRPC17Health(
+ eps []string,
+ timeout time.Duration,
+ dialFunc DialFunc,
+) *GRPC17Health {
notifyCh := make(chan []grpc.Address)
addrs := eps2addrs(eps)
- hb := &healthBalancer{
+ hb := &GRPC17Health{
addrs: addrs,
eps: eps,
notifyCh: notifyCh,
readyc: make(chan struct{}),
- healthCheck: hc,
+ healthCheck: func(ep string) (bool, error) { return grpcHealthCheck(ep, dialFunc) },
unhealthyHostPorts: make(map[string]time.Time),
upc: make(chan struct{}),
stopc: make(chan struct{}),
downc: make(chan struct{}),
donec: make(chan struct{}),
- updateAddrsC: make(chan notifyMsg),
+ updateAddrsC: make(chan NotifyMsg),
hostPort2ep: getHostPort2ep(eps),
}
if timeout < minHealthRetryDuration {
@@ -134,78 +145,81 @@
return hb
}
-func (b *healthBalancer) Start(target string, config grpc.BalancerConfig) error { return nil }
+func (b *GRPC17Health) Start(target string, config grpc.BalancerConfig) error { return nil }
-func (b *healthBalancer) ConnectNotify() <-chan struct{} {
+func (b *GRPC17Health) ConnectNotify() <-chan struct{} {
b.mu.Lock()
defer b.mu.Unlock()
return b.upc
}
-func (b *healthBalancer) ready() <-chan struct{} { return b.readyc }
+func (b *GRPC17Health) UpdateAddrsC() chan NotifyMsg { return b.updateAddrsC }
+func (b *GRPC17Health) StopC() chan struct{} { return b.stopc }
-func (b *healthBalancer) endpoint(hostPort string) string {
+func (b *GRPC17Health) Ready() <-chan struct{} { return b.readyc }
+
+func (b *GRPC17Health) Endpoint(hostPort string) string {
b.mu.RLock()
defer b.mu.RUnlock()
return b.hostPort2ep[hostPort]
}
-func (b *healthBalancer) pinned() string {
+func (b *GRPC17Health) Pinned() string {
b.mu.RLock()
defer b.mu.RUnlock()
return b.pinAddr
}
-func (b *healthBalancer) hostPortError(hostPort string, err error) {
- if b.endpoint(hostPort) == "" {
- logger.Lvl(4).Infof("clientv3/balancer: %q is stale (skip marking as unhealthy on %q)", hostPort, err.Error())
+func (b *GRPC17Health) HostPortError(hostPort string, err error) {
+ if b.Endpoint(hostPort) == "" {
+ lg.Infof("clientv3/balancer: %q is stale (skip marking as unhealthy on %q)", hostPort, err.Error())
return
}
b.unhealthyMu.Lock()
b.unhealthyHostPorts[hostPort] = time.Now()
b.unhealthyMu.Unlock()
- logger.Lvl(4).Infof("clientv3/balancer: %q is marked unhealthy (%q)", hostPort, err.Error())
+ lg.Infof("clientv3/balancer: %q is marked unhealthy (%q)", hostPort, err.Error())
}
-func (b *healthBalancer) removeUnhealthy(hostPort, msg string) {
- if b.endpoint(hostPort) == "" {
- logger.Lvl(4).Infof("clientv3/balancer: %q was not in unhealthy (%q)", hostPort, msg)
+func (b *GRPC17Health) removeUnhealthy(hostPort, msg string) {
+ if b.Endpoint(hostPort) == "" {
+ lg.Infof("clientv3/balancer: %q was not in unhealthy (%q)", hostPort, msg)
return
}
b.unhealthyMu.Lock()
delete(b.unhealthyHostPorts, hostPort)
b.unhealthyMu.Unlock()
- logger.Lvl(4).Infof("clientv3/balancer: %q is removed from unhealthy (%q)", hostPort, msg)
+ lg.Infof("clientv3/balancer: %q is removed from unhealthy (%q)", hostPort, msg)
}
-func (b *healthBalancer) countUnhealthy() (count int) {
+func (b *GRPC17Health) countUnhealthy() (count int) {
b.unhealthyMu.RLock()
count = len(b.unhealthyHostPorts)
b.unhealthyMu.RUnlock()
return count
}
-func (b *healthBalancer) isUnhealthy(hostPort string) (unhealthy bool) {
+func (b *GRPC17Health) isUnhealthy(hostPort string) (unhealthy bool) {
b.unhealthyMu.RLock()
_, unhealthy = b.unhealthyHostPorts[hostPort]
b.unhealthyMu.RUnlock()
return unhealthy
}
-func (b *healthBalancer) cleanupUnhealthy() {
+func (b *GRPC17Health) cleanupUnhealthy() {
b.unhealthyMu.Lock()
for k, v := range b.unhealthyHostPorts {
if time.Since(v) > b.healthCheckTimeout {
delete(b.unhealthyHostPorts, k)
- logger.Lvl(4).Infof("clientv3/balancer: removed %q from unhealthy after %v", k, b.healthCheckTimeout)
+ lg.Infof("clientv3/balancer: removed %q from unhealthy after %v", k, b.healthCheckTimeout)
}
}
b.unhealthyMu.Unlock()
}
-func (b *healthBalancer) liveAddrs() ([]grpc.Address, map[string]struct{}) {
+func (b *GRPC17Health) liveAddrs() ([]grpc.Address, map[string]struct{}) {
unhealthyCnt := b.countUnhealthy()
b.mu.RLock()
@@ -231,15 +245,15 @@
return addrs, liveHostPorts
}
-func (b *healthBalancer) updateUnhealthy() {
+func (b *GRPC17Health) updateUnhealthy() {
for {
select {
case <-time.After(b.healthCheckTimeout):
b.cleanupUnhealthy()
- pinned := b.pinned()
+ pinned := b.Pinned()
if pinned == "" || b.isUnhealthy(pinned) {
select {
- case b.updateAddrsC <- notifyNext:
+ case b.updateAddrsC <- NotifyNext:
case <-b.stopc:
return
}
@@ -250,7 +264,19 @@
}
}
-func (b *healthBalancer) updateAddrs(eps ...string) {
+// NeedUpdate returns true if all connections are down or
+// addresses do not include current pinned address.
+func (b *GRPC17Health) NeedUpdate() bool {
+ // updating notifyCh can trigger new connections,
+ // need update addrs if all connections are down
+ // or addrs does not include pinAddr.
+ b.mu.RLock()
+ update := !hasAddr(b.addrs, b.pinAddr)
+ b.mu.RUnlock()
+ return update
+}
+
+func (b *GRPC17Health) UpdateAddrs(eps ...string) {
np := getHostPort2ep(eps)
b.mu.Lock()
@@ -278,12 +304,12 @@
b.unhealthyMu.Unlock()
}
-func (b *healthBalancer) next() {
+func (b *GRPC17Health) Next() {
b.mu.RLock()
downc := b.downc
b.mu.RUnlock()
select {
- case b.updateAddrsC <- notifyNext:
+ case b.updateAddrsC <- NotifyNext:
case <-b.stopc:
}
// wait until disconnect so new RPCs are not issued on old connection
@@ -293,7 +319,7 @@
}
}
-func (b *healthBalancer) updateNotifyLoop() {
+func (b *GRPC17Health) updateNotifyLoop() {
defer close(b.donec)
for {
@@ -320,7 +346,7 @@
default:
}
case downc == nil:
- b.notifyAddrs(notifyReset)
+ b.notifyAddrs(NotifyReset)
select {
case <-upc:
case msg := <-b.updateAddrsC:
@@ -338,7 +364,7 @@
}
select {
case <-downc:
- b.notifyAddrs(notifyReset)
+ b.notifyAddrs(NotifyReset)
case msg := <-b.updateAddrsC:
b.notifyAddrs(msg)
case <-b.stopc:
@@ -348,8 +374,8 @@
}
}
-func (b *healthBalancer) notifyAddrs(msg notifyMsg) {
- if msg == notifyNext {
+func (b *GRPC17Health) notifyAddrs(msg NotifyMsg) {
+ if msg == NotifyNext {
select {
case b.notifyCh <- []grpc.Address{}:
case <-b.stopc:
@@ -380,7 +406,7 @@
}
}
-func (b *healthBalancer) Up(addr grpc.Address) func(error) {
+func (b *GRPC17Health) Up(addr grpc.Address) func(error) {
if !b.mayPin(addr) {
return func(err error) {}
}
@@ -402,7 +428,7 @@
}
if b.pinAddr != "" {
- logger.Lvl(4).Infof("clientv3/balancer: %q is up but not pinned (already pinned %q)", addr.Addr, b.pinAddr)
+ lg.Infof("clientv3/balancer: %q is up but not pinned (already pinned %q)", addr.Addr, b.pinAddr)
return func(err error) {}
}
@@ -410,7 +436,7 @@
close(b.upc)
b.downc = make(chan struct{})
b.pinAddr = addr.Addr
- logger.Lvl(4).Infof("clientv3/balancer: pin %q", addr.Addr)
+ lg.Infof("clientv3/balancer: pin %q", addr.Addr)
// notify client that a connection is up
b.readyOnce.Do(func() { close(b.readyc) })
@@ -420,19 +446,19 @@
// timeout will induce a network I/O error, and retrying until success;
// finding healthy endpoint on retry could take several timeouts and redials.
// To avoid wasting retries, gray-list unhealthy endpoints.
- b.hostPortError(addr.Addr, err)
+ b.HostPortError(addr.Addr, err)
b.mu.Lock()
b.upc = make(chan struct{})
close(b.downc)
b.pinAddr = ""
b.mu.Unlock()
- logger.Lvl(4).Infof("clientv3/balancer: unpin %q (%q)", addr.Addr, err.Error())
+ lg.Infof("clientv3/balancer: unpin %q (%q)", addr.Addr, err.Error())
}
}
-func (b *healthBalancer) mayPin(addr grpc.Address) bool {
- if b.endpoint(addr.Addr) == "" { // stale host:port
+func (b *GRPC17Health) mayPin(addr grpc.Address) bool {
+ if b.Endpoint(addr.Addr) == "" { // stale host:port
return false
}
@@ -454,7 +480,7 @@
// 3. grpc-healthcheck still SERVING, thus retry to pin
// instead, return before grpc-healthcheck if failed within healthcheck timeout
if elapsed := time.Since(failedTime); elapsed < b.healthCheckTimeout {
- logger.Lvl(4).Infof("clientv3/balancer: %q is up but not pinned (failed %v ago, require minimum %v after failure)", addr.Addr, elapsed, b.healthCheckTimeout)
+ lg.Infof("clientv3/balancer: %q is up but not pinned (failed %v ago, require minimum %v after failure)", addr.Addr, elapsed, b.healthCheckTimeout)
return false
}
@@ -463,11 +489,11 @@
return true
}
- b.hostPortError(addr.Addr, errors.New("health check failed"))
+ b.HostPortError(addr.Addr, errors.New("health check failed"))
return false
}
-func (b *healthBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (grpc.Address, func(), error) {
+func (b *GRPC17Health) Get(ctx context.Context, opts grpc.BalancerGetOptions) (grpc.Address, func(), error) {
var (
addr string
closed bool
@@ -515,9 +541,9 @@
return grpc.Address{Addr: addr}, func() {}, nil
}
-func (b *healthBalancer) Notify() <-chan []grpc.Address { return b.notifyCh }
+func (b *GRPC17Health) Notify() <-chan []grpc.Address { return b.notifyCh }
-func (b *healthBalancer) Close() error {
+func (b *GRPC17Health) Close() error {
b.mu.Lock()
// In case gRPC calls close twice. TODO: remove the checking
// when we are sure that gRPC won't call close twice.
@@ -553,8 +579,8 @@
return nil
}
-func grpcHealthCheck(client *Client, ep string) (bool, error) {
- conn, err := client.dial(ep)
+func grpcHealthCheck(ep string, dialFunc func(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error)) (bool, error) {
+ conn, err := dialFunc(ep)
if err != nil {
return false, err
}
@@ -607,3 +633,25 @@
}
return hm
}
+
+func parseEndpoint(endpoint string) (proto string, host string, scheme string) {
+ proto = "tcp"
+ host = endpoint
+ url, uerr := url.Parse(endpoint)
+ if uerr != nil || !strings.Contains(endpoint, "://") {
+ return proto, host, scheme
+ }
+ scheme = url.Scheme
+
+ // strip scheme:// prefix since grpc dials by host
+ host = url.Host
+ switch url.Scheme {
+ case "http", "https":
+ case "unix", "unixs":
+ proto = "unix"
+ host = url.Host + url.Path
+ default:
+ proto, host = "", ""
+ }
+ return proto, host, scheme
+}
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/picker/doc.go b/vendor/go.etcd.io/etcd/clientv3/balancer/picker/doc.go
new file mode 100644
index 0000000..35dabf5
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/balancer/picker/doc.go
@@ -0,0 +1,16 @@
+// Copyright 2018 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package picker defines/implements client balancer picker policy.
+package picker
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/picker/err.go b/vendor/go.etcd.io/etcd/clientv3/balancer/picker/err.go
new file mode 100644
index 0000000..c70ce15
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/balancer/picker/err.go
@@ -0,0 +1,34 @@
+// Copyright 2018 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package picker
+
+import (
+ "context"
+
+ "google.golang.org/grpc/balancer"
+)
+
+// NewErr returns a picker that always returns err on "Pick".
+func NewErr(err error) Picker {
+ return &errPicker{err: err}
+}
+
+type errPicker struct {
+ err error
+}
+
+func (p *errPicker) Pick(context.Context, balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
+ return nil, nil, p.err
+}
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/picker/picker.go b/vendor/go.etcd.io/etcd/clientv3/balancer/picker/picker.go
new file mode 100644
index 0000000..7ea761b
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/balancer/picker/picker.go
@@ -0,0 +1,24 @@
+// Copyright 2018 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package picker
+
+import (
+ "google.golang.org/grpc/balancer"
+)
+
+// Picker defines balancer Picker methods.
+type Picker interface {
+ balancer.Picker
+}
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/picker/picker_policy.go b/vendor/go.etcd.io/etcd/clientv3/balancer/picker/picker_policy.go
new file mode 100644
index 0000000..463ddc2
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/balancer/picker/picker_policy.go
@@ -0,0 +1,49 @@
+// Copyright 2018 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package picker
+
+import "fmt"
+
+// Policy defines balancer picker policy.
+type Policy uint8
+
+const (
+ // TODO: custom picker is not supported yet.
+ // custom defines custom balancer picker.
+ custom Policy = iota
+
+ // RoundrobinBalanced balances loads over multiple endpoints
+ // and implements failover in roundrobin fashion.
+ RoundrobinBalanced Policy = iota
+
+ // TODO: only send loads to pinned address "RoundrobinFailover"
+ // just like how 3.3 client works
+ //
+ // TODO: prioritize leader
+ // TODO: health-check
+ // TODO: weighted roundrobin
+ // TODO: power of two random choice
+)
+
+func (p Policy) String() string {
+ switch p {
+ case custom:
+ panic("'custom' picker policy is not supported yet")
+ case RoundrobinBalanced:
+ return "etcd-client-roundrobin-balanced"
+ default:
+ panic(fmt.Errorf("invalid balancer picker policy (%d)", p))
+ }
+}
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/picker/roundrobin_balanced.go b/vendor/go.etcd.io/etcd/clientv3/balancer/picker/roundrobin_balanced.go
new file mode 100644
index 0000000..b043d57
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/balancer/picker/roundrobin_balanced.go
@@ -0,0 +1,92 @@
+// Copyright 2018 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package picker
+
+import (
+ "context"
+ "sync"
+
+ "go.uber.org/zap"
+ "go.uber.org/zap/zapcore"
+ "google.golang.org/grpc/balancer"
+ "google.golang.org/grpc/resolver"
+)
+
+// NewRoundrobinBalanced returns a new roundrobin balanced picker.
+func NewRoundrobinBalanced(
+ lg *zap.Logger,
+ scs []balancer.SubConn,
+ addrToSc map[resolver.Address]balancer.SubConn,
+ scToAddr map[balancer.SubConn]resolver.Address,
+) Picker {
+ return &rrBalanced{
+ lg: lg,
+ scs: scs,
+ addrToSc: addrToSc,
+ scToAddr: scToAddr,
+ }
+}
+
+type rrBalanced struct {
+ lg *zap.Logger
+
+ mu sync.RWMutex
+ next int
+ scs []balancer.SubConn
+
+ addrToSc map[resolver.Address]balancer.SubConn
+ scToAddr map[balancer.SubConn]resolver.Address
+}
+
+// Pick is called for every client request.
+func (rb *rrBalanced) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
+ rb.mu.RLock()
+ n := len(rb.scs)
+ rb.mu.RUnlock()
+ if n == 0 {
+ return nil, nil, balancer.ErrNoSubConnAvailable
+ }
+
+ rb.mu.Lock()
+ cur := rb.next
+ sc := rb.scs[cur]
+ picked := rb.scToAddr[sc].Addr
+ rb.next = (rb.next + 1) % len(rb.scs)
+ rb.mu.Unlock()
+
+ rb.lg.Debug(
+ "picked",
+ zap.String("address", picked),
+ zap.Int("subconn-index", cur),
+ zap.Int("subconn-size", n),
+ )
+
+ doneFunc := func(info balancer.DoneInfo) {
+ // TODO: error handling?
+ fss := []zapcore.Field{
+ zap.Error(info.Err),
+ zap.String("address", picked),
+ zap.Bool("success", info.Err == nil),
+ zap.Bool("bytes-sent", info.BytesSent),
+ zap.Bool("bytes-received", info.BytesReceived),
+ }
+ if info.Err == nil {
+ rb.lg.Debug("balancer done", fss...)
+ } else {
+ rb.lg.Warn("balancer failed", fss...)
+ }
+ }
+ return sc, doneFunc, nil
+}
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/resolver/endpoint/endpoint.go b/vendor/go.etcd.io/etcd/clientv3/balancer/resolver/endpoint/endpoint.go
new file mode 100644
index 0000000..1f32039
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/balancer/resolver/endpoint/endpoint.go
@@ -0,0 +1,240 @@
+// Copyright 2018 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package endpoint resolves etcd endpoints using grpc targets of the form 'endpoint://<id>/<endpoint>'.
+package endpoint
+
+import (
+ "fmt"
+ "net/url"
+ "strings"
+ "sync"
+
+ "google.golang.org/grpc/resolver"
+)
+
+const scheme = "endpoint"
+
+var (
+ targetPrefix = fmt.Sprintf("%s://", scheme)
+
+ bldr *builder
+)
+
+func init() {
+ bldr = &builder{
+ resolverGroups: make(map[string]*ResolverGroup),
+ }
+ resolver.Register(bldr)
+}
+
+type builder struct {
+ mu sync.RWMutex
+ resolverGroups map[string]*ResolverGroup
+}
+
+// NewResolverGroup creates a new ResolverGroup with the given id.
+func NewResolverGroup(id string) (*ResolverGroup, error) {
+ return bldr.newResolverGroup(id)
+}
+
+// ResolverGroup keeps all endpoints of resolvers using a common endpoint://<id>/ target
+// up-to-date.
+type ResolverGroup struct {
+ mu sync.RWMutex
+ id string
+ endpoints []string
+ resolvers []*Resolver
+}
+
+func (e *ResolverGroup) addResolver(r *Resolver) {
+ e.mu.Lock()
+ addrs := epsToAddrs(e.endpoints...)
+ e.resolvers = append(e.resolvers, r)
+ e.mu.Unlock()
+ r.cc.NewAddress(addrs)
+}
+
+func (e *ResolverGroup) removeResolver(r *Resolver) {
+ e.mu.Lock()
+ for i, er := range e.resolvers {
+ if er == r {
+ e.resolvers = append(e.resolvers[:i], e.resolvers[i+1:]...)
+ break
+ }
+ }
+ e.mu.Unlock()
+}
+
+// SetEndpoints updates the endpoints for ResolverGroup. All registered resolvers are updated
+// immediately with the new endpoints.
+func (e *ResolverGroup) SetEndpoints(endpoints []string) {
+ addrs := epsToAddrs(endpoints...)
+ e.mu.Lock()
+ e.endpoints = endpoints
+ for _, r := range e.resolvers {
+ r.cc.NewAddress(addrs)
+ }
+ e.mu.Unlock()
+}
+
+// Target constructs an endpoint target using the endpoint id of the ResolverGroup.
+func (e *ResolverGroup) Target(endpoint string) string {
+ return Target(e.id, endpoint)
+}
+
+// Target constructs an endpoint resolver target.
+func Target(id, endpoint string) string {
+ return fmt.Sprintf("%s://%s/%s", scheme, id, endpoint)
+}
+
+// IsTarget checks if a given target string is an endpoint resolver target.
+func IsTarget(target string) bool {
+ return strings.HasPrefix(target, "endpoint://")
+}
+
+func (e *ResolverGroup) Close() {
+ bldr.close(e.id)
+}
+
+// Build creates or reuses an etcd resolver for the etcd cluster name identified by the authority part of the target.
+func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
+ if len(target.Authority) < 1 {
+ return nil, fmt.Errorf("'etcd' target scheme requires non-empty authority identifying etcd cluster being routed to")
+ }
+ id := target.Authority
+ es, err := b.getResolverGroup(id)
+ if err != nil {
+ return nil, fmt.Errorf("failed to build resolver: %v", err)
+ }
+ r := &Resolver{
+ endpointID: id,
+ cc: cc,
+ }
+ es.addResolver(r)
+ return r, nil
+}
+
+func (b *builder) newResolverGroup(id string) (*ResolverGroup, error) {
+ b.mu.RLock()
+ _, ok := b.resolverGroups[id]
+ b.mu.RUnlock()
+ if ok {
+ return nil, fmt.Errorf("Endpoint already exists for id: %s", id)
+ }
+
+ es := &ResolverGroup{id: id}
+ b.mu.Lock()
+ b.resolverGroups[id] = es
+ b.mu.Unlock()
+ return es, nil
+}
+
+func (b *builder) getResolverGroup(id string) (*ResolverGroup, error) {
+ b.mu.RLock()
+ es, ok := b.resolverGroups[id]
+ b.mu.RUnlock()
+ if !ok {
+ return nil, fmt.Errorf("ResolverGroup not found for id: %s", id)
+ }
+ return es, nil
+}
+
+func (b *builder) close(id string) {
+ b.mu.Lock()
+ delete(b.resolverGroups, id)
+ b.mu.Unlock()
+}
+
+func (b *builder) Scheme() string {
+ return scheme
+}
+
+// Resolver provides a resolver for a single etcd cluster, identified by name.
+type Resolver struct {
+ endpointID string
+ cc resolver.ClientConn
+ sync.RWMutex
+}
+
+// TODO: use balancer.epsToAddrs
+func epsToAddrs(eps ...string) (addrs []resolver.Address) {
+ addrs = make([]resolver.Address, 0, len(eps))
+ for _, ep := range eps {
+ addrs = append(addrs, resolver.Address{Addr: ep})
+ }
+ return addrs
+}
+
+func (*Resolver) ResolveNow(o resolver.ResolveNowOption) {}
+
+func (r *Resolver) Close() {
+ es, err := bldr.getResolverGroup(r.endpointID)
+ if err != nil {
+ return
+ }
+ es.removeResolver(r)
+}
+
+// ParseEndpoint parses an endpoint of the form
+// (http|https)://<host> or (unix|unixs)://<path>
+// and returns a protocol ('tcp' or 'unix'),
+// host (or filepath if a unix socket),
+// scheme (http, https, unix, unixs).
+func ParseEndpoint(endpoint string) (proto string, host string, scheme string) {
+ proto = "tcp"
+ host = endpoint
+ url, uerr := url.Parse(endpoint)
+ if uerr != nil || !strings.Contains(endpoint, "://") {
+ return proto, host, scheme
+ }
+ scheme = url.Scheme
+
+ // strip scheme:// prefix since grpc dials by host
+ host = url.Host
+ switch url.Scheme {
+ case "http", "https":
+ case "unix", "unixs":
+ proto = "unix"
+ host = url.Host + url.Path
+ default:
+ proto, host = "", ""
+ }
+ return proto, host, scheme
+}
+
+// ParseTarget parses an endpoint://<id>/<endpoint> string and returns the parsed id and endpoint.
+// If the target is malformed, an error is returned.
+func ParseTarget(target string) (string, string, error) {
+ noPrefix := strings.TrimPrefix(target, targetPrefix)
+ if noPrefix == target {
+ return "", "", fmt.Errorf("malformed target, %s prefix is required: %s", targetPrefix, target)
+ }
+ parts := strings.SplitN(noPrefix, "/", 2)
+ if len(parts) != 2 {
+ return "", "", fmt.Errorf("malformed target, expected %s://<id>/<endpoint>, but got %s", scheme, target)
+ }
+ return parts[0], parts[1], nil
+}
+
+// ParseHostPort splits a "<host>:<port>" string into the host and port parts.
+// The port part is optional.
+func ParseHostPort(hostPort string) (host string, port string) {
+ parts := strings.SplitN(hostPort, ":", 2)
+ host = parts[0]
+ if len(parts) > 1 {
+ port = parts[1]
+ }
+ return host, port
+}
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/utils.go b/vendor/go.etcd.io/etcd/clientv3/balancer/utils.go
new file mode 100644
index 0000000..a11faeb
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/balancer/utils.go
@@ -0,0 +1,68 @@
+// Copyright 2018 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package balancer
+
+import (
+ "fmt"
+ "net/url"
+ "sort"
+ "sync/atomic"
+ "time"
+
+ "google.golang.org/grpc/balancer"
+ "google.golang.org/grpc/resolver"
+)
+
+func scToString(sc balancer.SubConn) string {
+ return fmt.Sprintf("%p", sc)
+}
+
+func scsToStrings(scs map[resolver.Address]balancer.SubConn) (ss []string) {
+ ss = make([]string, 0, len(scs))
+ for a, sc := range scs {
+ ss = append(ss, fmt.Sprintf("%s (%s)", a.Addr, scToString(sc)))
+ }
+ sort.Strings(ss)
+ return ss
+}
+
+func addrsToStrings(addrs []resolver.Address) (ss []string) {
+ ss = make([]string, len(addrs))
+ for i := range addrs {
+ ss[i] = addrs[i].Addr
+ }
+ sort.Strings(ss)
+ return ss
+}
+
+func epsToAddrs(eps ...string) (addrs []resolver.Address) {
+ addrs = make([]resolver.Address, 0, len(eps))
+ for _, ep := range eps {
+ u, err := url.Parse(ep)
+ if err != nil {
+ addrs = append(addrs, resolver.Address{Addr: ep, Type: resolver.Backend})
+ continue
+ }
+ addrs = append(addrs, resolver.Address{Addr: u.Host, Type: resolver.Backend})
+ }
+ return addrs
+}
+
+var genN = new(uint32)
+
+func genName() string {
+ now := time.Now().UnixNano()
+ return fmt.Sprintf("%X%X", now, atomic.AddUint32(genN, 1))
+}
diff --git a/vendor/go.etcd.io/etcd/clientv3/client.go b/vendor/go.etcd.io/etcd/clientv3/client.go
index 7132807..276b0f0 100644
--- a/vendor/go.etcd.io/etcd/clientv3/client.go
+++ b/vendor/go.etcd.io/etcd/clientv3/client.go
@@ -21,13 +21,18 @@
"fmt"
"net"
"net/url"
+ "os"
"strconv"
"strings"
"sync"
"time"
- "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
-
+ "github.com/google/uuid"
+ "go.etcd.io/etcd/clientv3/balancer"
+ "go.etcd.io/etcd/clientv3/balancer/picker"
+ "go.etcd.io/etcd/clientv3/balancer/resolver/endpoint"
+ "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
+ "go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
@@ -39,8 +44,26 @@
var (
ErrNoAvailableEndpoints = errors.New("etcdclient: no available endpoints")
ErrOldCluster = errors.New("etcdclient: old cluster version")
+
+ roundRobinBalancerName = fmt.Sprintf("etcd-%s", picker.RoundrobinBalanced.String())
)
+func init() {
+ lg := zap.NewNop()
+ if os.Getenv("ETCD_CLIENT_DEBUG") != "" {
+ var err error
+ lg, err = zap.NewProductionConfig().Build() // info level logging
+ if err != nil {
+ panic(err)
+ }
+ }
+ balancer.RegisterBuilder(balancer.Config{
+ Policy: picker.RoundrobinBalanced,
+ Name: roundRobinBalancerName,
+ Logger: lg,
+ })
+}
+
// Client provides and manages an etcd v3 client session.
type Client struct {
Cluster
@@ -50,13 +73,13 @@
Auth
Maintenance
- conn *grpc.ClientConn
- dialerrc chan error
+ conn *grpc.ClientConn
- cfg Config
- creds *credentials.TransportCredentials
- balancer *healthBalancer
- mu *sync.Mutex
+ cfg Config
+ creds *credentials.TransportCredentials
+ balancer balancer.Balancer
+ resolverGroup *endpoint.ResolverGroup
+ mu *sync.Mutex
ctx context.Context
cancel context.CancelFunc
@@ -69,6 +92,8 @@
tokenCred *authTokenCredential
callOpts []grpc.CallOption
+
+ lg *zap.Logger
}
// New creates a new etcdv3 client from a given configuration.
@@ -93,11 +118,19 @@
return New(Config{Endpoints: []string{url}})
}
+// NewFromURLs creates a new etcdv3 client from URLs.
+func NewFromURLs(urls []string) (*Client, error) {
+ return New(Config{Endpoints: urls})
+}
+
// Close shuts down the client's etcd connections.
func (c *Client) Close() error {
c.cancel()
c.Watcher.Close()
c.Lease.Close()
+ if c.resolverGroup != nil {
+ c.resolverGroup.Close()
+ }
if c.conn != nil {
return toErr(c.ctx, c.conn.Close())
}
@@ -120,22 +153,9 @@
// SetEndpoints updates client's endpoints.
func (c *Client) SetEndpoints(eps ...string) {
c.mu.Lock()
+ defer c.mu.Unlock()
c.cfg.Endpoints = eps
- c.mu.Unlock()
- c.balancer.updateAddrs(eps...)
-
- // updating notifyCh can trigger new connections,
- // need update addrs if all connections are down
- // or addrs does not include pinAddr.
- c.balancer.mu.RLock()
- update := !hasAddr(c.balancer.addrs, c.balancer.pinAddr)
- c.balancer.mu.RUnlock()
- if update {
- select {
- case c.balancer.updateAddrsC <- notifyNext:
- case <-c.balancer.stopc:
- }
- }
+ c.resolverGroup.SetEndpoints(eps)
}
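With the new resolver group, an endpoint update is a single call; the group pushes the fresh address list to the shared round-robin balancer. A sketch, assuming an existing client `cli` (addresses are illustrative):

```go
// Replace the client's endpoint set at runtime; new requests are
// routed to the updated address list.
cli.SetEndpoints("10.0.0.1:2379", "10.0.0.2:2379", "10.0.0.3:2379")
```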
// Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
@@ -166,7 +186,7 @@
err := c.Sync(ctx)
cancel()
if err != nil && err != c.ctx.Err() {
- logger.Println("Auto sync endpoints failed:", err)
+ lg.Lvl(4).Infof("Auto sync endpoints failed: %v", err)
}
}
}
@@ -185,32 +205,10 @@
cred.tokenMu.RLock()
defer cred.tokenMu.RUnlock()
return map[string]string{
- "token": cred.token,
+ rpctypes.TokenFieldNameGRPC: cred.token,
}, nil
}
-func parseEndpoint(endpoint string) (proto string, host string, scheme string) {
- proto = "tcp"
- host = endpoint
- url, uerr := url.Parse(endpoint)
- if uerr != nil || !strings.Contains(endpoint, "://") {
- return proto, host, scheme
- }
- scheme = url.Scheme
-
- // strip scheme:// prefix since grpc dials by host
- host = url.Host
- switch url.Scheme {
- case "http", "https":
- case "unix", "unixs":
- proto = "unix"
- host = url.Host + url.Path
- default:
- proto, host = "", ""
- }
- return proto, host, scheme
-}
-
func (c *Client) processCreds(scheme string) (creds *credentials.TransportCredentials) {
creds = c.creds
switch scheme {
@@ -230,63 +228,60 @@
return creds
}
-// dialSetupOpts gives the dial opts prior to any authentication
-func (c *Client) dialSetupOpts(endpoint string, dopts ...grpc.DialOption) (opts []grpc.DialOption) {
- if c.cfg.DialTimeout > 0 {
- opts = []grpc.DialOption{grpc.WithTimeout(c.cfg.DialTimeout)}
- }
+// dialSetupOpts gives the dial opts prior to any authentication.
+func (c *Client) dialSetupOpts(creds *credentials.TransportCredentials, dopts ...grpc.DialOption) (opts []grpc.DialOption, err error) {
if c.cfg.DialKeepAliveTime > 0 {
params := keepalive.ClientParameters{
- Time: c.cfg.DialKeepAliveTime,
- Timeout: c.cfg.DialKeepAliveTimeout,
+ Time: c.cfg.DialKeepAliveTime,
+ Timeout: c.cfg.DialKeepAliveTimeout,
+ PermitWithoutStream: c.cfg.PermitWithoutStream,
}
opts = append(opts, grpc.WithKeepaliveParams(params))
}
opts = append(opts, dopts...)
- f := func(host string, t time.Duration) (net.Conn, error) {
- proto, host, _ := parseEndpoint(c.balancer.endpoint(host))
- if host == "" && endpoint != "" {
- // dialing an endpoint not in the balancer; use
- // endpoint passed into dial
- proto, host, _ = parseEndpoint(endpoint)
- }
- if proto == "" {
- return nil, fmt.Errorf("unknown scheme for %q", host)
- }
+ // Provide a net dialer that supports cancelation and timeout.
+ f := func(dialEp string, t time.Duration) (net.Conn, error) {
+ proto, host, _ := endpoint.ParseEndpoint(dialEp)
select {
case <-c.ctx.Done():
return nil, c.ctx.Err()
default:
}
dialer := &net.Dialer{Timeout: t}
- conn, err := dialer.DialContext(c.ctx, proto, host)
- if err != nil {
- select {
- case c.dialerrc <- err:
- default:
- }
- }
- return conn, err
+ return dialer.DialContext(c.ctx, proto, host)
}
opts = append(opts, grpc.WithDialer(f))
- creds := c.creds
- if _, _, scheme := parseEndpoint(endpoint); len(scheme) != 0 {
- creds = c.processCreds(scheme)
- }
if creds != nil {
opts = append(opts, grpc.WithTransportCredentials(*creds))
} else {
opts = append(opts, grpc.WithInsecure())
}
- return opts
+ // Interceptor retry and backoff.
+ // TODO: Replace all of clientv3/retry.go with interceptor based retry, or with
+ // https://github.com/grpc/proposal/blob/master/A6-client-retries.md#retry-policy
+ // once it is available.
+ rrBackoff := withBackoff(c.roundRobinQuorumBackoff(defaultBackoffWaitBetween, defaultBackoffJitterFraction))
+ opts = append(opts,
+ // Disable stream retry by default since go-grpc-middleware/retry does not support client streams.
+ // Streams that are safe to retry are enabled individually.
+ grpc.WithStreamInterceptor(c.streamClientInterceptor(c.lg, withMax(0), rrBackoff)),
+ grpc.WithUnaryInterceptor(c.unaryClientInterceptor(c.lg, withMax(defaultUnaryMaxRetries), rrBackoff)),
+ )
+
+ return opts, nil
}
// Dial connects to a single endpoint using the client's config.
-func (c *Client) Dial(endpoint string) (*grpc.ClientConn, error) {
- return c.dial(endpoint)
+func (c *Client) Dial(ep string) (*grpc.ClientConn, error) {
+ creds := c.directDialCreds(ep)
+ // Use the grpc passthrough resolver to directly dial a single endpoint.
+	// This resolver passes through the 'unix' and 'unixs' endpoint schemes used
+	// by etcd without modification, allowing us to directly dial endpoints with
+	// the same dial functions that we use for load balancer dialing.
+ return c.dial(fmt.Sprintf("passthrough:///%s", ep), creds)
}
func (c *Client) getToken(ctx context.Context) error {
@@ -294,10 +289,19 @@
var auth *authenticator
for i := 0; i < len(c.cfg.Endpoints); i++ {
- endpoint := c.cfg.Endpoints[i]
- host := getHost(endpoint)
+ ep := c.cfg.Endpoints[i]
// use dial options without dopts to avoid reusing the client balancer
- auth, err = newAuthenticator(host, c.dialSetupOpts(endpoint), c)
+ var dOpts []grpc.DialOption
+ _, host, _ := endpoint.ParseEndpoint(ep)
+ target := c.resolverGroup.Target(host)
+ creds := c.dialWithBalancerCreds(ep)
+ dOpts, err = c.dialSetupOpts(creds, c.cfg.DialOptions...)
+ if err != nil {
+ err = fmt.Errorf("failed to configure auth dialer: %v", err)
+ continue
+ }
+ dOpts = append(dOpts, grpc.WithBalancerName(roundRobinBalancerName))
+ auth, err = newAuthenticator(ctx, target, dOpts, c)
if err != nil {
continue
}
@@ -306,6 +310,10 @@
var resp *AuthenticateResponse
resp, err = auth.authenticate(ctx, c.Username, c.Password)
if err != nil {
+ // return err without retrying other endpoints
+ if err == rpctypes.ErrAuthNotEnabled {
+ return err
+ }
continue
}
@@ -319,43 +327,91 @@
return err
}
-func (c *Client) dial(endpoint string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
- opts := c.dialSetupOpts(endpoint, dopts...)
- host := getHost(endpoint)
+// dialWithBalancer dials the client's current load balanced resolver group. The scheme of the host
+// of the provided endpoint determines the scheme used for all endpoints of the client connection.
+func (c *Client) dialWithBalancer(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
+ _, host, _ := endpoint.ParseEndpoint(ep)
+ target := c.resolverGroup.Target(host)
+ creds := c.dialWithBalancerCreds(ep)
+ return c.dial(target, creds, dopts...)
+}
+
+// dial configures and dials any grpc balancer target.
+func (c *Client) dial(target string, creds *credentials.TransportCredentials, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
+ opts, err := c.dialSetupOpts(creds, dopts...)
+ if err != nil {
+ return nil, fmt.Errorf("failed to configure dialer: %v", err)
+ }
+
if c.Username != "" && c.Password != "" {
c.tokenCred = &authTokenCredential{
tokenMu: &sync.RWMutex{},
}
- ctx := c.ctx
+ ctx, cancel := c.ctx, func() {}
if c.cfg.DialTimeout > 0 {
- cctx, cancel := context.WithTimeout(ctx, c.cfg.DialTimeout)
- defer cancel()
- ctx = cctx
+ ctx, cancel = context.WithTimeout(ctx, c.cfg.DialTimeout)
}
- err := c.getToken(ctx)
+ err = c.getToken(ctx)
if err != nil {
if toErr(ctx, err) != rpctypes.ErrAuthNotEnabled {
if err == ctx.Err() && ctx.Err() != c.ctx.Err() {
err = context.DeadlineExceeded
}
+ cancel()
return nil, err
}
} else {
opts = append(opts, grpc.WithPerRPCCredentials(c.tokenCred))
}
+ cancel()
}
opts = append(opts, c.cfg.DialOptions...)
- conn, err := grpc.DialContext(c.ctx, host, opts...)
+ dctx := c.ctx
+ if c.cfg.DialTimeout > 0 {
+ var cancel context.CancelFunc
+ dctx, cancel = context.WithTimeout(c.ctx, c.cfg.DialTimeout)
+ defer cancel() // TODO: Is this right for cases where grpc.WithBlock() is not set on the dial options?
+ }
+
+ conn, err := grpc.DialContext(dctx, target, opts...)
if err != nil {
return nil, err
}
return conn, nil
}
+func (c *Client) directDialCreds(ep string) *credentials.TransportCredentials {
+ _, hostPort, scheme := endpoint.ParseEndpoint(ep)
+ creds := c.creds
+ if len(scheme) != 0 {
+ creds = c.processCreds(scheme)
+ if creds != nil {
+ c := *creds
+ clone := c.Clone()
+ // The server name must be set to the endpoint hostname without the port, since
+ // grpc otherwise attempts to check if the x509 cert is valid for the full
+ // endpoint including the scheme and port, which fails.
+ host, _ := endpoint.ParseHostPort(hostPort)
+ clone.OverrideServerName(host)
+ creds = &clone
+ }
+ }
+ return creds
+}
+
+func (c *Client) dialWithBalancerCreds(ep string) *credentials.TransportCredentials {
+ _, _, scheme := endpoint.ParseEndpoint(ep)
+ creds := c.creds
+ if len(scheme) != 0 {
+ creds = c.processCreds(scheme)
+ }
+ return creds
+}
+
// WithRequireLeader requires client requests to only succeed
// when the cluster has a leader.
func WithRequireLeader(ctx context.Context) context.Context {
@@ -382,7 +438,6 @@
ctx, cancel := context.WithCancel(baseCtx)
client := &Client{
conn: nil,
- dialerrc: make(chan error, 1),
cfg: *cfg,
creds: creds,
ctx: ctx,
@@ -390,6 +445,17 @@
mu: new(sync.Mutex),
callOpts: defaultCallOpts,
}
+
+ lcfg := DefaultLogConfig
+ if cfg.LogConfig != nil {
+ lcfg = *cfg.LogConfig
+ }
+ var err error
+ client.lg, err = lcfg.Build()
+ if err != nil {
+ return nil, err
+ }
+
if cfg.Username != "" && cfg.Password != "" {
client.Username = cfg.Username
client.Password = cfg.Password
@@ -412,42 +478,31 @@
client.callOpts = callOpts
}
- client.balancer = newHealthBalancer(cfg.Endpoints, cfg.DialTimeout, func(ep string) (bool, error) {
- return grpcHealthCheck(client, ep)
- })
-
- // use Endpoints[0] so that for https:// without any tls config given, then
- // grpc will assume the certificate server name is the endpoint host.
- conn, err := client.dial(cfg.Endpoints[0], grpc.WithBalancer(client.balancer))
+ // Prepare an 'endpoint://<unique-client-id>/' resolver for the client and create an endpoint
+ // target to pass to dial so the client knows to use this resolver.
+ client.resolverGroup, err = endpoint.NewResolverGroup(fmt.Sprintf("client-%s", uuid.New().String()))
if err != nil {
client.cancel()
- client.balancer.Close()
return nil, err
}
- client.conn = conn
+ client.resolverGroup.SetEndpoints(cfg.Endpoints)
- // wait for a connection
- if cfg.DialTimeout > 0 {
- hasConn := false
- waitc := time.After(cfg.DialTimeout)
- select {
- case <-client.balancer.ready():
- hasConn = true
- case <-ctx.Done():
- case <-waitc:
- }
- if !hasConn {
- err := context.DeadlineExceeded
- select {
- case err = <-client.dialerrc:
- default:
- }
- client.cancel()
- client.balancer.Close()
- conn.Close()
- return nil, err
- }
+ if len(cfg.Endpoints) < 1 {
+ return nil, fmt.Errorf("at least one Endpoint is required in client config")
}
+ dialEndpoint := cfg.Endpoints[0]
+
+ // Use the provided endpoint target so that, for https:// endpoints without any tls config
+ // given, grpc will assume the certificate server name is the endpoint host.
+ conn, err := client.dialWithBalancer(dialEndpoint, grpc.WithBalancerName(roundRobinBalancerName))
+ if err != nil {
+ client.cancel()
+ client.resolverGroup.Close()
+ return nil, err
+ }
+ // TODO: With the old grpc balancer interface, we waited until the dial timeout
+ // for the balancer to be ready. Is there an equivalent wait we should do with the new grpc balancer interface?
+ client.conn = conn
client.Cluster = NewCluster(client)
client.KV = NewKV(client)
@@ -467,6 +522,22 @@
return client, nil
}
+// roundRobinQuorumBackoff retries against quorum between each backoff.
+// This is intended for use with a round robin load balancer.
+func (c *Client) roundRobinQuorumBackoff(waitBetween time.Duration, jitterFraction float64) backoffFunc {
+ return func(attempt uint) time.Duration {
+ // after each round robin across quorum, backoff for our wait between duration
+ n := uint(len(c.Endpoints()))
+ quorum := (n/2 + 1)
+ if attempt%quorum == 0 {
+ c.lg.Info("backoff", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum), zap.Duration("waitBetween", waitBetween), zap.Float64("jitterFraction", jitterFraction))
+ return jitterUp(waitBetween, jitterFraction)
+ }
+ c.lg.Info("backoff skipped", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum))
+ return 0
+ }
+}
+
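The effect of this schedule: for n endpoints the quorum is n/2+1, so only every quorum-th attempt pays the jittered wait, and the attempts in between retry the next endpoint immediately. A standalone sketch of the schedule with jitter omitted (the function name is illustrative, not part of the package):

```go
// backoffAt mirrors roundRobinQuorumBackoff without jitter: back off
// only after a full round robin across a quorum of endpoints.
func backoffAt(attempt, numEndpoints uint, waitBetween time.Duration) time.Duration {
	quorum := numEndpoints/2 + 1
	if attempt%quorum == 0 {
		return waitBetween // the real code returns jitterUp(waitBetween, jitterFraction)
	}
	return 0
}
```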
func (c *Client) checkVersion() (err error) {
var wg sync.WaitGroup
errc := make(chan error, len(c.cfg.Endpoints))
@@ -551,18 +622,19 @@
if _, ok := err.(rpctypes.EtcdError); ok {
return err
}
- ev, _ := status.FromError(err)
- code := ev.Code()
- switch code {
- case codes.DeadlineExceeded:
- fallthrough
- case codes.Canceled:
- if ctx.Err() != nil {
- err = ctx.Err()
+ if ev, ok := status.FromError(err); ok {
+ code := ev.Code()
+ switch code {
+ case codes.DeadlineExceeded:
+ fallthrough
+ case codes.Canceled:
+ if ctx.Err() != nil {
+ err = ctx.Err()
+ }
+ case codes.Unavailable:
+ case codes.FailedPrecondition:
+ err = grpc.ErrClientConnClosing
}
- case codes.Unavailable:
- case codes.FailedPrecondition:
- err = grpc.ErrClientConnClosing
}
return err
}
@@ -574,3 +646,31 @@
return err == context.Canceled || err == context.DeadlineExceeded
}
+
+// IsConnCanceled returns true, if error is from a closed gRPC connection.
+// ref. https://github.com/grpc/grpc-go/pull/1854
+func IsConnCanceled(err error) bool {
+ if err == nil {
+ return false
+ }
+ // >= gRPC v1.10.x
+ s, ok := status.FromError(err)
+ if ok {
+ // connection is canceled or server has already closed the connection
+ return s.Code() == codes.Canceled || s.Message() == "transport is closing"
+ }
+ // >= gRPC v1.10.x
+ if err == context.Canceled {
+ return true
+ }
+ // <= gRPC v1.7.x returns 'errors.New("grpc: the client connection is closing")'
+ return strings.Contains(err.Error(), "grpc: the client connection is closing")
+}
+
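A usage sketch for IsConnCanceled, assuming an existing client `cli`, a context `ctx`, and an illustrative key:

```go
if _, err := cli.Get(ctx, "sample-key"); err != nil {
	if clientv3.IsConnCanceled(err) {
		// the gRPC connection was closed (e.g. by cli.Close);
		// stop issuing requests on this client
	}
}
```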
+func getHost(ep string) string {
+ url, uerr := url.Parse(ep)
+ if uerr != nil || !strings.Contains(ep, "://") {
+ return ep
+ }
+ return url.Host
+}
diff --git a/vendor/go.etcd.io/etcd/clientv3/cluster.go b/vendor/go.etcd.io/etcd/clientv3/cluster.go
index 785672b..d497c05 100644
--- a/vendor/go.etcd.io/etcd/clientv3/cluster.go
+++ b/vendor/go.etcd.io/etcd/clientv3/cluster.go
@@ -17,8 +17,8 @@
import (
"context"
- pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
- "github.com/coreos/etcd/pkg/types"
+ pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
+ "go.etcd.io/etcd/pkg/types"
"google.golang.org/grpc"
)
diff --git a/vendor/go.etcd.io/etcd/clientv3/compact_op.go b/vendor/go.etcd.io/etcd/clientv3/compact_op.go
index 41e80c1..5779713 100644
--- a/vendor/go.etcd.io/etcd/clientv3/compact_op.go
+++ b/vendor/go.etcd.io/etcd/clientv3/compact_op.go
@@ -15,7 +15,7 @@
package clientv3
import (
- pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+ pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
)
// CompactOp represents a compact operation.
diff --git a/vendor/go.etcd.io/etcd/clientv3/compare.go b/vendor/go.etcd.io/etcd/clientv3/compare.go
index b5f0a25..01ed68e 100644
--- a/vendor/go.etcd.io/etcd/clientv3/compare.go
+++ b/vendor/go.etcd.io/etcd/clientv3/compare.go
@@ -15,7 +15,7 @@
package clientv3
import (
- pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+ pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
)
type CompareTarget int
diff --git a/vendor/go.etcd.io/etcd/clientv3/concurrency/doc.go b/vendor/go.etcd.io/etcd/clientv3/concurrency/doc.go
new file mode 100644
index 0000000..dcdbf51
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/concurrency/doc.go
@@ -0,0 +1,17 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package concurrency implements concurrency operations on top of
+// etcd such as distributed locks, barriers, and elections.
+package concurrency
diff --git a/vendor/go.etcd.io/etcd/clientv3/concurrency/election.go b/vendor/go.etcd.io/etcd/clientv3/concurrency/election.go
new file mode 100644
index 0000000..2521db6
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/concurrency/election.go
@@ -0,0 +1,254 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package concurrency
+
+import (
+ "context"
+ "errors"
+ "fmt"
+
+ v3 "go.etcd.io/etcd/clientv3"
+ pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
+ "go.etcd.io/etcd/mvcc/mvccpb"
+)
+
+var (
+ ErrElectionNotLeader = errors.New("election: not leader")
+ ErrElectionNoLeader = errors.New("election: no leader")
+)
+
+type Election struct {
+ session *Session
+
+ keyPrefix string
+
+ leaderKey string
+ leaderRev int64
+ leaderSession *Session
+ hdr *pb.ResponseHeader
+}
+
+// NewElection returns a new election on a given key prefix.
+func NewElection(s *Session, pfx string) *Election {
+ return &Election{session: s, keyPrefix: pfx + "/"}
+}
+
+// ResumeElection initializes an election with a known leader.
+func ResumeElection(s *Session, pfx string, leaderKey string, leaderRev int64) *Election {
+ return &Election{
+ keyPrefix: pfx,
+ session: s,
+ leaderKey: leaderKey,
+ leaderRev: leaderRev,
+ leaderSession: s,
+ }
+}
+
+// Campaign puts a value as eligible for the election on the prefix
+// key.
+// Multiple sessions can participate in the election for the
+// same prefix, but only one can be the leader at a time.
+//
+// If the context is 'context.TODO()/context.Background()', Campaign
+// will continue to block waiting for other keys to be deleted, unless the
+// server returns a non-recoverable error (e.g. ErrCompacted).
+// Otherwise, Campaign blocks until it becomes the leader or until the
+// context is cancelled or times out.
+func (e *Election) Campaign(ctx context.Context, val string) error {
+ s := e.session
+ client := e.session.Client()
+
+ k := fmt.Sprintf("%s%x", e.keyPrefix, s.Lease())
+ txn := client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0))
+ txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease())))
+ txn = txn.Else(v3.OpGet(k))
+ resp, err := txn.Commit()
+ if err != nil {
+ return err
+ }
+ e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s
+ if !resp.Succeeded {
+ kv := resp.Responses[0].GetResponseRange().Kvs[0]
+ e.leaderRev = kv.CreateRevision
+ if string(kv.Value) != val {
+ if err = e.Proclaim(ctx, val); err != nil {
+ e.Resign(ctx)
+ return err
+ }
+ }
+ }
+
+ _, err = waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1)
+ if err != nil {
+ // clean up in case of context cancel
+ select {
+ case <-ctx.Done():
+ e.Resign(client.Ctx())
+ default:
+ e.leaderSession = nil
+ }
+ return err
+ }
+ e.hdr = resp.Header
+
+ return nil
+}
+
+// Proclaim lets the leader announce a new value without another election.
+func (e *Election) Proclaim(ctx context.Context, val string) error {
+ if e.leaderSession == nil {
+ return ErrElectionNotLeader
+ }
+ client := e.session.Client()
+ cmp := v3.Compare(v3.CreateRevision(e.leaderKey), "=", e.leaderRev)
+ txn := client.Txn(ctx).If(cmp)
+ txn = txn.Then(v3.OpPut(e.leaderKey, val, v3.WithLease(e.leaderSession.Lease())))
+ tresp, terr := txn.Commit()
+ if terr != nil {
+ return terr
+ }
+ if !tresp.Succeeded {
+ e.leaderKey = ""
+ return ErrElectionNotLeader
+ }
+
+ e.hdr = tresp.Header
+ return nil
+}
+
+// Resign lets a leader start a new election.
+func (e *Election) Resign(ctx context.Context) (err error) {
+ if e.leaderSession == nil {
+ return nil
+ }
+ client := e.session.Client()
+ cmp := v3.Compare(v3.CreateRevision(e.leaderKey), "=", e.leaderRev)
+ resp, err := client.Txn(ctx).If(cmp).Then(v3.OpDelete(e.leaderKey)).Commit()
+ if err == nil {
+ e.hdr = resp.Header
+ }
+ e.leaderKey = ""
+ e.leaderSession = nil
+ return err
+}
+
+// Leader returns the leader value for the current election.
+func (e *Election) Leader(ctx context.Context) (*v3.GetResponse, error) {
+ client := e.session.Client()
+ resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)
+ if err != nil {
+ return nil, err
+ } else if len(resp.Kvs) == 0 {
+ // no leader currently elected
+ return nil, ErrElectionNoLeader
+ }
+ return resp, nil
+}
+
+// Observe returns a channel that reliably observes ordered leader proposals
+// as GetResponse values on every current elected leader key. It will not
+// necessarily fetch all historical leader updates, but will always post the
+// most recent leader value.
+//
+// The channel closes when the context is canceled or the underlying watcher
+// is otherwise disrupted.
+func (e *Election) Observe(ctx context.Context) <-chan v3.GetResponse {
+ retc := make(chan v3.GetResponse)
+ go e.observe(ctx, retc)
+ return retc
+}
+
+func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
+ client := e.session.Client()
+
+ defer close(ch)
+ for {
+ resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)
+ if err != nil {
+ return
+ }
+
+ var kv *mvccpb.KeyValue
+ var hdr *pb.ResponseHeader
+
+ if len(resp.Kvs) == 0 {
+ cctx, cancel := context.WithCancel(ctx)
+ // wait for first key put on prefix
+ opts := []v3.OpOption{v3.WithRev(resp.Header.Revision), v3.WithPrefix()}
+ wch := client.Watch(cctx, e.keyPrefix, opts...)
+ for kv == nil {
+ wr, ok := <-wch
+ if !ok || wr.Err() != nil {
+ cancel()
+ return
+ }
+ // only accept puts; a delete will make observe() spin
+ for _, ev := range wr.Events {
+ if ev.Type == mvccpb.PUT {
+ hdr, kv = &wr.Header, ev.Kv
+ // may have multiple revs; hdr.rev = the last rev
+ // set to kv's rev in case batch has multiple Puts
+ hdr.Revision = kv.ModRevision
+ break
+ }
+ }
+ }
+ cancel()
+ } else {
+ hdr, kv = resp.Header, resp.Kvs[0]
+ }
+
+ select {
+ case ch <- v3.GetResponse{Header: hdr, Kvs: []*mvccpb.KeyValue{kv}}:
+ case <-ctx.Done():
+ return
+ }
+
+ cctx, cancel := context.WithCancel(ctx)
+ wch := client.Watch(cctx, string(kv.Key), v3.WithRev(hdr.Revision+1))
+ keyDeleted := false
+ for !keyDeleted {
+ wr, ok := <-wch
+ if !ok {
+ cancel()
+ return
+ }
+ for _, ev := range wr.Events {
+ if ev.Type == mvccpb.DELETE {
+ keyDeleted = true
+ break
+ }
+ resp.Header = &wr.Header
+ resp.Kvs = []*mvccpb.KeyValue{ev.Kv}
+ select {
+ case ch <- *resp:
+ case <-cctx.Done():
+ cancel()
+ return
+ }
+ }
+ }
+ cancel()
+ }
+}
+
+// Key returns the leader key if elected, empty string otherwise.
+func (e *Election) Key() string { return e.leaderKey }
+
+// Rev returns the leader key's creation revision, if elected.
+func (e *Election) Rev() int64 { return e.leaderRev }
+
+// Header is the response header from the last successful election proposal.
+func (e *Election) Header() *pb.ResponseHeader { return e.hdr }
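A minimal campaign sketch, assuming an existing client `cli`; the election prefix and proposal value are illustrative:

```go
s, err := concurrency.NewSession(cli)
if err != nil {
	log.Fatal(err)
}
defer s.Close()

e := concurrency.NewElection(s, "/my-election")
if err := e.Campaign(context.TODO(), "candidate-1"); err != nil {
	log.Fatal(err)
}
// ... this process is now the leader ...
if err := e.Resign(context.TODO()); err != nil {
	log.Fatal(err)
}
```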
diff --git a/vendor/go.etcd.io/etcd/clientv3/concurrency/key.go b/vendor/go.etcd.io/etcd/clientv3/concurrency/key.go
new file mode 100644
index 0000000..e4cf775
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/concurrency/key.go
@@ -0,0 +1,65 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package concurrency
+
+import (
+ "context"
+ "fmt"
+
+ v3 "go.etcd.io/etcd/clientv3"
+ pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
+ "go.etcd.io/etcd/mvcc/mvccpb"
+)
+
+func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {
+ cctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
+ var wr v3.WatchResponse
+ wch := client.Watch(cctx, key, v3.WithRev(rev))
+ for wr = range wch {
+ for _, ev := range wr.Events {
+ if ev.Type == mvccpb.DELETE {
+ return nil
+ }
+ }
+ }
+ if err := wr.Err(); err != nil {
+ return err
+ }
+ if err := ctx.Err(); err != nil {
+ return err
+ }
+ return fmt.Errorf("lost watcher waiting for delete")
+}
+
+// waitDeletes efficiently waits until all keys matching the prefix and no greater
+// than the create revision are deleted.
+func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
+ getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
+ for {
+ resp, err := client.Get(ctx, pfx, getOpts...)
+ if err != nil {
+ return nil, err
+ }
+ if len(resp.Kvs) == 0 {
+ return resp.Header, nil
+ }
+ lastKey := string(resp.Kvs[0].Key)
+ if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
+ return nil, err
+ }
+ }
+}
diff --git a/vendor/go.etcd.io/etcd/clientv3/concurrency/mutex.go b/vendor/go.etcd.io/etcd/clientv3/concurrency/mutex.go
new file mode 100644
index 0000000..0135341
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/concurrency/mutex.go
@@ -0,0 +1,117 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package concurrency
+
+import (
+ "context"
+ "fmt"
+ "sync"
+
+ v3 "go.etcd.io/etcd/clientv3"
+ pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
+)
+
+// Mutex implements the sync Locker interface with etcd
+type Mutex struct {
+ s *Session
+
+ pfx string
+ myKey string
+ myRev int64
+ hdr *pb.ResponseHeader
+}
+
+func NewMutex(s *Session, pfx string) *Mutex {
+ return &Mutex{s, pfx + "/", "", -1, nil}
+}
+
+// Lock locks the mutex with a cancelable context. If the context is canceled
+// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
+func (m *Mutex) Lock(ctx context.Context) error {
+ s := m.s
+ client := m.s.Client()
+
+ m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
+ cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
+ // put self in lock waiters via myKey; oldest waiter holds lock
+ put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
+ // reuse key in case this session already holds the lock
+ get := v3.OpGet(m.myKey)
+ // fetch current holder to complete uncontended path with only one RPC
+ getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
+ resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
+ if err != nil {
+ return err
+ }
+ m.myRev = resp.Header.Revision
+ if !resp.Succeeded {
+ m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
+ }
+ // if there is no key on the prefix, or our key has the minimum create revision, we already hold the lock
+ ownerKey := resp.Responses[1].GetResponseRange().Kvs
+ if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
+ m.hdr = resp.Header
+ return nil
+ }
+
+ // wait for deletion revisions prior to myKey
+ hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
+ // release lock key if wait failed
+ if werr != nil {
+ m.Unlock(client.Ctx())
+ } else {
+ m.hdr = hdr
+ }
+ return werr
+}
+
+func (m *Mutex) Unlock(ctx context.Context) error {
+ client := m.s.Client()
+ if _, err := client.Delete(ctx, m.myKey); err != nil {
+ return err
+ }
+ m.myKey = "\x00"
+ m.myRev = -1
+ return nil
+}
+
+func (m *Mutex) IsOwner() v3.Cmp {
+ return v3.Compare(v3.CreateRevision(m.myKey), "=", m.myRev)
+}
+
+func (m *Mutex) Key() string { return m.myKey }
+
+// Header is the response header received from etcd on acquiring the lock.
+func (m *Mutex) Header() *pb.ResponseHeader { return m.hdr }
+
+type lockerMutex struct{ *Mutex }
+
+func (lm *lockerMutex) Lock() {
+ client := lm.s.Client()
+ if err := lm.Mutex.Lock(client.Ctx()); err != nil {
+ panic(err)
+ }
+}
+func (lm *lockerMutex) Unlock() {
+ client := lm.s.Client()
+ if err := lm.Mutex.Unlock(client.Ctx()); err != nil {
+ panic(err)
+ }
+}
+
+// NewLocker creates a sync.Locker backed by an etcd mutex.
+func NewLocker(s *Session, pfx string) sync.Locker {
+ return &lockerMutex{NewMutex(s, pfx)}
+}
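A minimal distributed-lock sketch, assuming an existing client `cli` (the lock prefix is illustrative):

```go
s, err := concurrency.NewSession(cli)
if err != nil {
	log.Fatal(err)
}
defer s.Close()

m := concurrency.NewMutex(s, "/my-lock")
if err := m.Lock(context.TODO()); err != nil {
	log.Fatal(err)
}
// ... critical section, mutually exclusive across processes ...
if err := m.Unlock(context.TODO()); err != nil {
	log.Fatal(err)
}
```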
diff --git a/vendor/go.etcd.io/etcd/clientv3/concurrency/session.go b/vendor/go.etcd.io/etcd/clientv3/concurrency/session.go
new file mode 100644
index 0000000..598ec0e
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/concurrency/session.go
@@ -0,0 +1,141 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package concurrency
+
+import (
+ "context"
+ "time"
+
+ v3 "go.etcd.io/etcd/clientv3"
+)
+
+const defaultSessionTTL = 60
+
+// Session represents a lease kept alive for the lifetime of a client.
+// Fault-tolerant applications may use sessions to reason about liveness.
+type Session struct {
+ client *v3.Client
+ opts *sessionOptions
+ id v3.LeaseID
+
+ cancel context.CancelFunc
+ donec <-chan struct{}
+}
+
+// NewSession gets the leased session for a client.
+func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
+ ops := &sessionOptions{ttl: defaultSessionTTL, ctx: client.Ctx()}
+ for _, opt := range opts {
+ opt(ops)
+ }
+
+ id := ops.leaseID
+ if id == v3.NoLease {
+ resp, err := client.Grant(ops.ctx, int64(ops.ttl))
+ if err != nil {
+ return nil, err
+ }
+ id = v3.LeaseID(resp.ID)
+ }
+
+ ctx, cancel := context.WithCancel(ops.ctx)
+ keepAlive, err := client.KeepAlive(ctx, id)
+ if err != nil || keepAlive == nil {
+ cancel()
+ return nil, err
+ }
+
+ donec := make(chan struct{})
+ s := &Session{client: client, opts: ops, id: id, cancel: cancel, donec: donec}
+
+ // keep the lease alive until client error or cancelled context
+ go func() {
+ defer close(donec)
+ for range keepAlive {
+ // eat messages until keep alive channel closes
+ }
+ }()
+
+ return s, nil
+}
+
+// Client is the etcd client that is attached to the session.
+func (s *Session) Client() *v3.Client {
+ return s.client
+}
+
+// Lease is the lease ID for keys bound to the session.
+func (s *Session) Lease() v3.LeaseID { return s.id }
+
+// Done returns a channel that closes when the lease is orphaned, expires, or
+// is otherwise no longer being refreshed.
+func (s *Session) Done() <-chan struct{} { return s.donec }
+
+// Orphan ends the refresh for the session lease. This is useful
+// in case the state of the client connection is indeterminate (revoke
+// would fail) or when transferring lease ownership.
+func (s *Session) Orphan() {
+ s.cancel()
+ <-s.donec
+}
+
+// Close orphans the session and revokes the session lease.
+func (s *Session) Close() error {
+ s.Orphan()
+ // if revoke takes longer than the ttl, lease is expired anyway
+ ctx, cancel := context.WithTimeout(s.opts.ctx, time.Duration(s.opts.ttl)*time.Second)
+ _, err := s.client.Revoke(ctx, s.id)
+ cancel()
+ return err
+}
+
+type sessionOptions struct {
+ ttl int
+ leaseID v3.LeaseID
+ ctx context.Context
+}
+
+// SessionOption configures Session.
+type SessionOption func(*sessionOptions)
+
+// WithTTL configures the session's TTL in seconds.
+// If TTL is <= 0, the default 60 seconds TTL will be used.
+func WithTTL(ttl int) SessionOption {
+ return func(so *sessionOptions) {
+ if ttl > 0 {
+ so.ttl = ttl
+ }
+ }
+}
+
+// WithLease specifies the existing leaseID to be used for the session.
+// This is useful in process restart scenario, for example, to reclaim
+// leadership from an election prior to restart.
+func WithLease(leaseID v3.LeaseID) SessionOption {
+ return func(so *sessionOptions) {
+ so.leaseID = leaseID
+ }
+}
+
+// WithContext assigns a context to the session instead of defaulting to
+// using the client context. This is useful for canceling NewSession and
+// Close operations immediately without having to close the client. If the
+// context is canceled before Close() completes, the session's lease will be
+// abandoned and left to expire instead of being revoked.
+func WithContext(ctx context.Context) SessionOption {
+ return func(so *sessionOptions) {
+ so.ctx = ctx
+ }
+}
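A session sketch, assuming an existing client `cli`: a TTL shorter than the 60-second default releases a crashed holder's keys sooner, at the cost of more keepalive traffic:

```go
s, err := concurrency.NewSession(cli, concurrency.WithTTL(15))
if err != nil {
	log.Fatal(err)
}
defer s.Close()

go func() {
	<-s.Done() // closed once the lease can no longer be kept alive
	log.Println("session lease lost")
}()
```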
diff --git a/vendor/go.etcd.io/etcd/clientv3/concurrency/stm.go b/vendor/go.etcd.io/etcd/clientv3/concurrency/stm.go
new file mode 100644
index 0000000..ee11510
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/concurrency/stm.go
@@ -0,0 +1,387 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package concurrency
+
+import (
+ "context"
+ "math"
+
+ v3 "go.etcd.io/etcd/clientv3"
+)
+
+// STM is an interface for software transactional memory.
+type STM interface {
+ // Get returns the value for a key and inserts the key in the txn's read set.
+ // If Get fails, it aborts the transaction with an error, never returning.
+ Get(key ...string) string
+ // Put adds a value for a key to the write set.
+ Put(key, val string, opts ...v3.OpOption)
+ // Rev returns the revision of a key in the read set.
+ Rev(key string) int64
+ // Del deletes a key.
+ Del(key string)
+
+ // commit attempts to apply the txn's changes to the server.
+ commit() *v3.TxnResponse
+ reset()
+}
+
+// Isolation is an enumeration of transactional isolation levels which
+// describes how transactions should interfere and conflict.
+type Isolation int
+
+const (
+ // SerializableSnapshot provides serializable isolation and also checks
+ // for write conflicts.
+ SerializableSnapshot Isolation = iota
+ // Serializable reads within the same transaction attempt return data
+ // from the revision of the first read.
+ Serializable
+ // RepeatableReads reads within the same transaction attempt always
+ // return the same data.
+ RepeatableReads
+ // ReadCommitted reads keys from any committed revision.
+ ReadCommitted
+)
+
+// stmError safely passes STM errors through panic to the STM error channel.
+type stmError struct{ err error }
+
+type stmOptions struct {
+ iso Isolation
+ ctx context.Context
+ prefetch []string
+}
+
+type stmOption func(*stmOptions)
+
+// WithIsolation specifies the transaction isolation level.
+func WithIsolation(lvl Isolation) stmOption {
+ return func(so *stmOptions) { so.iso = lvl }
+}
+
+// WithAbortContext specifies the context for permanently aborting the transaction.
+func WithAbortContext(ctx context.Context) stmOption {
+ return func(so *stmOptions) { so.ctx = ctx }
+}
+
+// WithPrefetch is a hint to prefetch a list of keys before trying to apply.
+// If an STM transaction will unconditionally fetch a set of keys, prefetching
+// those keys will save the round-trip cost from requesting each key one by one
+// with Get().
+func WithPrefetch(keys ...string) stmOption {
+ return func(so *stmOptions) { so.prefetch = append(so.prefetch, keys...) }
+}
+
+// NewSTM initiates a new STM instance, using serializable snapshot isolation by default.
+func NewSTM(c *v3.Client, apply func(STM) error, so ...stmOption) (*v3.TxnResponse, error) {
+ opts := &stmOptions{ctx: c.Ctx()}
+ for _, f := range so {
+ f(opts)
+ }
+ if len(opts.prefetch) != 0 {
+ f := apply
+ apply = func(s STM) error {
+ s.Get(opts.prefetch...)
+ return f(s)
+ }
+ }
+ return runSTM(mkSTM(c, opts), apply)
+}
+
+func mkSTM(c *v3.Client, opts *stmOptions) STM {
+ switch opts.iso {
+ case SerializableSnapshot:
+ s := &stmSerializable{
+ stm: stm{client: c, ctx: opts.ctx},
+ prefetch: make(map[string]*v3.GetResponse),
+ }
+ s.conflicts = func() []v3.Cmp {
+ return append(s.rset.cmps(), s.wset.cmps(s.rset.first()+1)...)
+ }
+ return s
+ case Serializable:
+ s := &stmSerializable{
+ stm: stm{client: c, ctx: opts.ctx},
+ prefetch: make(map[string]*v3.GetResponse),
+ }
+ s.conflicts = func() []v3.Cmp { return s.rset.cmps() }
+ return s
+ case RepeatableReads:
+ s := &stm{client: c, ctx: opts.ctx, getOpts: []v3.OpOption{v3.WithSerializable()}}
+ s.conflicts = func() []v3.Cmp { return s.rset.cmps() }
+ return s
+ case ReadCommitted:
+ s := &stm{client: c, ctx: opts.ctx, getOpts: []v3.OpOption{v3.WithSerializable()}}
+ s.conflicts = func() []v3.Cmp { return nil }
+ return s
+ default:
+ panic("unsupported stm")
+ }
+}
+
+type stmResponse struct {
+ resp *v3.TxnResponse
+ err error
+}
+
+func runSTM(s STM, apply func(STM) error) (*v3.TxnResponse, error) {
+ outc := make(chan stmResponse, 1)
+ go func() {
+ defer func() {
+ if r := recover(); r != nil {
+ e, ok := r.(stmError)
+ if !ok {
+ // client apply panicked
+ panic(r)
+ }
+ outc <- stmResponse{nil, e.err}
+ }
+ }()
+ var out stmResponse
+ for {
+ s.reset()
+ if out.err = apply(s); out.err != nil {
+ break
+ }
+ if out.resp = s.commit(); out.resp != nil {
+ break
+ }
+ }
+ outc <- out
+ }()
+ r := <-outc
+ return r.resp, r.err
+}
+
+// stm implements repeatable-read software transactional memory over etcd
+type stm struct {
+ client *v3.Client
+ ctx context.Context
+ // rset holds read key values and revisions
+ rset readSet
+ // wset holds overwritten keys and their values
+ wset writeSet
+ // getOpts are the opts used for gets
+ getOpts []v3.OpOption
+ // conflicts computes the current conflicts on the txn
+ conflicts func() []v3.Cmp
+}
+
+type stmPut struct {
+ val string
+ op v3.Op
+}
+
+type readSet map[string]*v3.GetResponse
+
+func (rs readSet) add(keys []string, txnresp *v3.TxnResponse) {
+ for i, resp := range txnresp.Responses {
+ rs[keys[i]] = (*v3.GetResponse)(resp.GetResponseRange())
+ }
+}
+
+// first returns the store revision from the first fetch
+func (rs readSet) first() int64 {
+ ret := int64(math.MaxInt64 - 1)
+ for _, resp := range rs {
+ if rev := resp.Header.Revision; rev < ret {
+ ret = rev
+ }
+ }
+ return ret
+}
+
+// cmps guards the txn from updates to read set
+func (rs readSet) cmps() []v3.Cmp {
+ cmps := make([]v3.Cmp, 0, len(rs))
+ for k, rk := range rs {
+ cmps = append(cmps, isKeyCurrent(k, rk))
+ }
+ return cmps
+}
+
+type writeSet map[string]stmPut
+
+func (ws writeSet) get(keys ...string) *stmPut {
+ for _, key := range keys {
+ if wv, ok := ws[key]; ok {
+ return &wv
+ }
+ }
+ return nil
+}
+
+// cmps returns a cmp list testing no writes have happened past rev
+func (ws writeSet) cmps(rev int64) []v3.Cmp {
+ cmps := make([]v3.Cmp, 0, len(ws))
+ for key := range ws {
+ cmps = append(cmps, v3.Compare(v3.ModRevision(key), "<", rev))
+ }
+ return cmps
+}
+
+// puts is the list of ops for all pending writes
+func (ws writeSet) puts() []v3.Op {
+ puts := make([]v3.Op, 0, len(ws))
+ for _, v := range ws {
+ puts = append(puts, v.op)
+ }
+ return puts
+}
+
+func (s *stm) Get(keys ...string) string {
+ if wv := s.wset.get(keys...); wv != nil {
+ return wv.val
+ }
+ return respToValue(s.fetch(keys...))
+}
+
+func (s *stm) Put(key, val string, opts ...v3.OpOption) {
+ s.wset[key] = stmPut{val, v3.OpPut(key, val, opts...)}
+}
+
+func (s *stm) Del(key string) { s.wset[key] = stmPut{"", v3.OpDelete(key)} }
+
+func (s *stm) Rev(key string) int64 {
+ if resp := s.fetch(key); resp != nil && len(resp.Kvs) != 0 {
+ return resp.Kvs[0].ModRevision
+ }
+ return 0
+}
+
+func (s *stm) commit() *v3.TxnResponse {
+ txnresp, err := s.client.Txn(s.ctx).If(s.conflicts()...).Then(s.wset.puts()...).Commit()
+ if err != nil {
+ panic(stmError{err})
+ }
+ if txnresp.Succeeded {
+ return txnresp
+ }
+ return nil
+}
+
+func (s *stm) fetch(keys ...string) *v3.GetResponse {
+ if len(keys) == 0 {
+ return nil
+ }
+ ops := make([]v3.Op, len(keys))
+ for i, key := range keys {
+ if resp, ok := s.rset[key]; ok {
+ return resp
+ }
+ ops[i] = v3.OpGet(key, s.getOpts...)
+ }
+ txnresp, err := s.client.Txn(s.ctx).Then(ops...).Commit()
+ if err != nil {
+ panic(stmError{err})
+ }
+ s.rset.add(keys, txnresp)
+ return (*v3.GetResponse)(txnresp.Responses[0].GetResponseRange())
+}
+
+func (s *stm) reset() {
+ s.rset = make(map[string]*v3.GetResponse)
+ s.wset = make(map[string]stmPut)
+}
+
+type stmSerializable struct {
+ stm
+ prefetch map[string]*v3.GetResponse
+}
+
+func (s *stmSerializable) Get(keys ...string) string {
+ if wv := s.wset.get(keys...); wv != nil {
+ return wv.val
+ }
+ firstRead := len(s.rset) == 0
+ for _, key := range keys {
+ if resp, ok := s.prefetch[key]; ok {
+ delete(s.prefetch, key)
+ s.rset[key] = resp
+ }
+ }
+ resp := s.stm.fetch(keys...)
+ if firstRead {
+ // txn's base revision is defined by the first read
+ s.getOpts = []v3.OpOption{
+ v3.WithRev(resp.Header.Revision),
+ v3.WithSerializable(),
+ }
+ }
+ return respToValue(resp)
+}
+
+func (s *stmSerializable) Rev(key string) int64 {
+ s.Get(key)
+ return s.stm.Rev(key)
+}
+
+func (s *stmSerializable) gets() ([]string, []v3.Op) {
+ keys := make([]string, 0, len(s.rset))
+ ops := make([]v3.Op, 0, len(s.rset))
+ for k := range s.rset {
+ keys = append(keys, k)
+ ops = append(ops, v3.OpGet(k))
+ }
+ return keys, ops
+}
+
+func (s *stmSerializable) commit() *v3.TxnResponse {
+ keys, getops := s.gets()
+ txn := s.client.Txn(s.ctx).If(s.conflicts()...).Then(s.wset.puts()...)
+ // use Else to prefetch keys in case of conflict to save a round trip
+ txnresp, err := txn.Else(getops...).Commit()
+ if err != nil {
+ panic(stmError{err})
+ }
+ if txnresp.Succeeded {
+ return txnresp
+ }
+ // load prefetch with Else data
+ s.rset.add(keys, txnresp)
+ s.prefetch = s.rset
+ s.getOpts = nil
+ return nil
+}
+
+func isKeyCurrent(k string, r *v3.GetResponse) v3.Cmp {
+ if len(r.Kvs) != 0 {
+ return v3.Compare(v3.ModRevision(k), "=", r.Kvs[0].ModRevision)
+ }
+ return v3.Compare(v3.ModRevision(k), "=", 0)
+}
+
+func respToValue(resp *v3.GetResponse) string {
+ if resp == nil || len(resp.Kvs) == 0 {
+ return ""
+ }
+ return string(resp.Kvs[0].Value)
+}
+
+// NewSTMRepeatable is deprecated.
+func NewSTMRepeatable(ctx context.Context, c *v3.Client, apply func(STM) error) (*v3.TxnResponse, error) {
+ return NewSTM(c, apply, WithAbortContext(ctx), WithIsolation(RepeatableReads))
+}
+
+// NewSTMSerializable is deprecated.
+func NewSTMSerializable(ctx context.Context, c *v3.Client, apply func(STM) error) (*v3.TxnResponse, error) {
+ return NewSTM(c, apply, WithAbortContext(ctx), WithIsolation(Serializable))
+}
+
+// NewSTMReadCommitted is deprecated.
+func NewSTMReadCommitted(ctx context.Context, c *v3.Client, apply func(STM) error) (*v3.TxnResponse, error) {
+ return NewSTM(c, apply, WithAbortContext(ctx), WithIsolation(ReadCommitted))
+}
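A minimal STM sketch, assuming an existing client `cli`; the key names are illustrative. NewSTM re-runs the apply function until the transaction commits without conflicts:

```go
// Atomically move the value stored at "src" to "dst".
_, err := concurrency.NewSTM(cli, func(s concurrency.STM) error {
	v := s.Get("src")
	s.Put("dst", v)
	s.Del("src")
	return nil
})
if err != nil {
	log.Fatal(err)
}
```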
diff --git a/vendor/go.etcd.io/etcd/clientv3/config.go b/vendor/go.etcd.io/etcd/clientv3/config.go
index 79d6e2a..96e94e1 100644
--- a/vendor/go.etcd.io/etcd/clientv3/config.go
+++ b/vendor/go.etcd.io/etcd/clientv3/config.go
@@ -19,6 +19,7 @@
"crypto/tls"
"time"
+ "go.uber.org/zap"
"google.golang.org/grpc"
)
@@ -72,4 +73,30 @@
// Context is the default client context; it can be used to cancel grpc dial out and
// other operations that do not have an explicit context.
Context context.Context
+
+ // LogConfig configures client-side logger.
+ // If nil, use the default logger.
+ // TODO: configure gRPC logger
+ LogConfig *zap.Config
+
+ // PermitWithoutStream, when set, allows the client to send keepalive pings to the server without any active streams (RPCs).
+ PermitWithoutStream bool `json:"permit-without-stream"`
+}
+
+// DefaultLogConfig is the default client logging configuration.
+// Default log level is "Warn". Use "zap.InfoLevel" for debugging.
+// Use "/dev/null" for output paths, to discard all logs.
+var DefaultLogConfig = zap.Config{
+ Level: zap.NewAtomicLevelAt(zap.WarnLevel),
+ Development: false,
+ Sampling: &zap.SamplingConfig{
+ Initial: 100,
+ Thereafter: 100,
+ },
+ Encoding: "json",
+ EncoderConfig: zap.NewProductionEncoderConfig(),
+
+ // Use "/dev/null" to discard all
+ OutputPaths: []string{"stderr"},
+ ErrorOutputPaths: []string{"stderr"},
}
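A configuration sketch showing how a caller might raise the log level from the warn default (the endpoint address is illustrative):

```go
lcfg := clientv3.DefaultLogConfig
lcfg.Level = zap.NewAtomicLevelAt(zap.InfoLevel)
cli, err := clientv3.New(clientv3.Config{
	Endpoints: []string{"127.0.0.1:2379"},
	LogConfig: &lcfg,
})
if err != nil {
	log.Fatal(err)
}
defer cli.Close()
```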
diff --git a/vendor/go.etcd.io/etcd/clientv3/doc.go b/vendor/go.etcd.io/etcd/clientv3/doc.go
index 717fbe4..01a3f59 100644
--- a/vendor/go.etcd.io/etcd/clientv3/doc.go
+++ b/vendor/go.etcd.io/etcd/clientv3/doc.go
@@ -19,7 +19,7 @@
// // expect dial time-out on ipv4 blackhole
// _, err := clientv3.New(clientv3.Config{
// Endpoints: []string{"http://254.0.0.1:12345"},
-// DialTimeout: 2 * time.Second
+// DialTimeout: 2 * time.Second,
// })
//
// // etcd clientv3 >= v3.2.10, grpc/grpc-go >= v1.7.3
@@ -61,7 +61,7 @@
//
// 1. context error: canceled or deadline exceeded.
// 2. gRPC status error: e.g. when clock drifts in server-side before client's context deadline exceeded.
-// 3. gRPC error: see https://github.com/coreos/etcd/blob/master/etcdserver/api/v3rpc/rpctypes/error.go
+// 3. gRPC error: see https://github.com/etcd-io/etcd/blob/master/etcdserver/api/v3rpc/rpctypes/error.go
//
// Here is the example code to handle client errors:
//
@@ -71,14 +71,14 @@
// // ctx is canceled by another routine
// } else if err == context.DeadlineExceeded {
// // ctx is attached with a deadline and it exceeded
+// } else if err == rpctypes.ErrEmptyKey {
+// // client-side error: key is not provided
// } else if ev, ok := status.FromError(err); ok {
// code := ev.Code()
// if code == codes.DeadlineExceeded {
// // server-side context might have timed-out first (due to clock skew)
// // while original client-side context is not timed-out yet
// }
-// } else if verr, ok := err.(*v3rpc.ErrEmptyKey); ok {
-// // process (verr.Errors)
// } else {
// // bad cluster endpoints, which are not etcd servers
// }
@@ -87,11 +87,20 @@
// go func() { cli.Close() }()
// _, err := kvc.Get(ctx, "a")
// if err != nil {
+// // with etcd clientv3 <= v3.3
// if err == context.Canceled {
// // grpc balancer calls 'Get' with an inflight client.Close
// } else if err == grpc.ErrClientConnClosing {
// // grpc balancer calls 'Get' after client.Close.
// }
+// // with etcd clientv3 >= v3.4
+// if clientv3.IsConnCanceled(err) {
+// // gRPC client connection is closed
+// }
// }
//
+// The grpc load balancer is registered statically and is shared across etcd clients.
+// To enable detailed load balancer logging, set the ETCD_CLIENT_DEBUG environment
+// variable. E.g. "ETCD_CLIENT_DEBUG=1".
+//
package clientv3
diff --git a/vendor/go.etcd.io/etcd/clientv3/kv.go b/vendor/go.etcd.io/etcd/clientv3/kv.go
index 5a7469b..2b7864a 100644
--- a/vendor/go.etcd.io/etcd/clientv3/kv.go
+++ b/vendor/go.etcd.io/etcd/clientv3/kv.go
@@ -17,7 +17,7 @@
import (
"context"
- pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+ pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
"google.golang.org/grpc"
)
diff --git a/vendor/go.etcd.io/etcd/clientv3/lease.go b/vendor/go.etcd.io/etcd/clientv3/lease.go
index 3729cf3..c2796fc 100644
--- a/vendor/go.etcd.io/etcd/clientv3/lease.go
+++ b/vendor/go.etcd.io/etcd/clientv3/lease.go
@@ -19,9 +19,10 @@
"sync"
"time"
- "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
- pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+ "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
+ pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
+ "go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
@@ -117,22 +118,21 @@
// Leases retrieves all leases.
Leases(ctx context.Context) (*LeaseLeasesResponse, error)
- // KeepAlive keeps the given lease alive forever. If the keepalive response
- // posted to the channel is not consumed immediately, the lease client will
- // continue sending keep alive requests to the etcd server at least every
- // second until latest response is consumed.
+ // KeepAlive attempts to keep the given lease alive forever. If the keepalive responses posted
+ // to the channel are not consumed promptly, the channel may become full. When full, the lease
+ // client will continue sending keep alive requests to the etcd server, but will drop responses
+ // until there is capacity on the channel to send more responses.
+ //
+ // If the client keep alive loop halts with an unexpected error (e.g. "etcdserver: no leader") or
+ // is canceled by the caller (e.g. context.Canceled), KeepAlive returns an ErrKeepAliveHalted error
+ // containing the error reason.
//
// The returned "LeaseKeepAliveResponse" channel closes if underlying keep
// alive stream is interrupted in some way the client cannot handle itself;
- // given context "ctx" is canceled or timed out. "LeaseKeepAliveResponse"
- // from this closed channel is nil.
- //
- // If client keep alive loop halts with an unexpected error (e.g. "etcdserver:
- // no leader") or canceled by the caller (e.g. context.Canceled), the error
- // is returned. Otherwise, it retries.
+ // given context "ctx" is canceled or timed out.
//
// TODO(v4.0): post errors to last keep alive message before closing
- // (see https://github.com/coreos/etcd/pull/7866)
+ // (see https://github.com/etcd-io/etcd/pull/7866)
KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
// KeepAliveOnce renews the lease once. The response corresponds to the
@@ -172,6 +172,8 @@
firstKeepAliveOnce sync.Once
callOpts []grpc.CallOption
+
+ lg *zap.Logger
}
// keepAlive multiplexes a keepalive for a lease over multiple channels
@@ -196,6 +198,7 @@
keepAlives: make(map[LeaseID]*keepAlive),
remote: remote,
firstKeepAliveTimeout: keepAliveTimeout,
+ lg: c.lg,
}
if l.firstKeepAliveTimeout == time.Second {
l.firstKeepAliveTimeout = defaultTTL
@@ -291,7 +294,7 @@
}
l.mu.Unlock()
- go l.keepAliveCtxCloser(id, ctx, ka.donec)
+ go l.keepAliveCtxCloser(ctx, id, ka.donec)
l.firstKeepAliveOnce.Do(func() {
go l.recvKeepAliveLoop()
go l.deadlineLoop()
@@ -323,7 +326,7 @@
return nil
}
-func (l *lessor) keepAliveCtxCloser(id LeaseID, ctx context.Context, donec <-chan struct{}) {
+func (l *lessor) keepAliveCtxCloser(ctx context.Context, id LeaseID, donec <-chan struct{}) {
select {
case <-donec:
return
@@ -459,7 +462,6 @@
select {
case <-time.After(retryConnWait):
- continue
case <-l.stopCtx.Done():
return l.stopCtx.Err()
}
@@ -469,7 +471,7 @@
// resetRecv opens a new lease stream and starts sending keep alive requests.
func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
sctx, cancel := context.WithCancel(l.stopCtx)
- stream, err := l.remote.LeaseKeepAlive(sctx, l.callOpts...)
+ stream, err := l.remote.LeaseKeepAlive(sctx, append(l.callOpts, withMax(0))...)
if err != nil {
cancel()
return nil, err
@@ -518,6 +520,12 @@
select {
case ch <- karesp:
default:
+ if l.lg != nil {
+ l.lg.Warn("lease keepalive response queue is full; dropping response send",
+ zap.Int("queue-size", len(ch)),
+ zap.Int("queue-capacity", cap(ch)),
+ )
+ }
}
// still advance in order to rate-limit keep-alive sends
ka.nextKeepAlive = nextKeepAlive
@@ -569,7 +577,7 @@
}
select {
- case <-time.After(500 * time.Millisecond):
+ case <-time.After(retryConnWait):
case <-stream.Context().Done():
return
case <-l.donec:
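Given the drop-on-full behavior documented above, callers should drain the keepalive channel promptly. A sketch, assuming an existing client `cli` and context `ctx` (the TTL is illustrative):

```go
grant, err := cli.Grant(ctx, 10) // 10-second lease
if err != nil {
	log.Fatal(err)
}
ch, err := cli.KeepAlive(ctx, grant.ID)
if err != nil {
	log.Fatal(err)
}
for ka := range ch {
	_ = ka.TTL // consume promptly; the channel closes if the stream is interrupted
}
```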
diff --git a/vendor/go.etcd.io/etcd/clientv3/logger.go b/vendor/go.etcd.io/etcd/clientv3/logger.go
index 782e313..f5ae010 100644
--- a/vendor/go.etcd.io/etcd/clientv3/logger.go
+++ b/vendor/go.etcd.io/etcd/clientv3/logger.go
@@ -18,28 +18,14 @@
"io/ioutil"
"sync"
+ "go.etcd.io/etcd/pkg/logutil"
+
"google.golang.org/grpc/grpclog"
)
-// Logger is the logger used by client library.
-// It implements grpclog.LoggerV2 interface.
-type Logger interface {
- grpclog.LoggerV2
-
- // Lvl returns logger if logger's verbosity level >= "lvl".
- // Otherwise, logger that discards all logs.
- Lvl(lvl int) Logger
-
- // to satisfy capnslog
-
- Print(args ...interface{})
- Printf(format string, args ...interface{})
- Println(args ...interface{})
-}
-
var (
- loggerMu sync.RWMutex
- logger Logger
+ lgMu sync.RWMutex
+ lg logutil.Logger
)
type settableLogger struct {
@@ -49,29 +35,29 @@
func init() {
// disable client side logs by default
- logger = &settableLogger{}
+ lg = &settableLogger{}
SetLogger(grpclog.NewLoggerV2(ioutil.Discard, ioutil.Discard, ioutil.Discard))
}
// SetLogger sets client-side Logger.
func SetLogger(l grpclog.LoggerV2) {
- loggerMu.Lock()
- logger = NewLogger(l)
+ lgMu.Lock()
+ lg = logutil.NewLogger(l)
// override grpclog so that any changes happen with locking
- grpclog.SetLoggerV2(logger)
- loggerMu.Unlock()
+ grpclog.SetLoggerV2(lg)
+ lgMu.Unlock()
}
-// GetLogger returns the current logger.
-func GetLogger() Logger {
- loggerMu.RLock()
- l := logger
- loggerMu.RUnlock()
+// GetLogger returns the current logutil.Logger.
+func GetLogger() logutil.Logger {
+ lgMu.RLock()
+ l := lg
+ lgMu.RUnlock()
return l
}
-// NewLogger returns a new Logger with grpclog.LoggerV2.
-func NewLogger(gl grpclog.LoggerV2) Logger {
+// NewLogger returns a new Logger with logutil.Logger.
+func NewLogger(gl grpclog.LoggerV2) logutil.Logger {
return &settableLogger{l: gl}
}
@@ -104,32 +90,12 @@
func (s *settableLogger) Printf(format string, args ...interface{}) { s.get().Infof(format, args...) }
func (s *settableLogger) Println(args ...interface{}) { s.get().Infoln(args...) }
func (s *settableLogger) V(l int) bool { return s.get().V(l) }
-func (s *settableLogger) Lvl(lvl int) Logger {
+func (s *settableLogger) Lvl(lvl int) grpclog.LoggerV2 {
s.mu.RLock()
l := s.l
s.mu.RUnlock()
if l.V(lvl) {
return s
}
- return &noLogger{}
+ return logutil.NewDiscardLogger()
}
-
-type noLogger struct{}
-
-func (*noLogger) Info(args ...interface{}) {}
-func (*noLogger) Infof(format string, args ...interface{}) {}
-func (*noLogger) Infoln(args ...interface{}) {}
-func (*noLogger) Warning(args ...interface{}) {}
-func (*noLogger) Warningf(format string, args ...interface{}) {}
-func (*noLogger) Warningln(args ...interface{}) {}
-func (*noLogger) Error(args ...interface{}) {}
-func (*noLogger) Errorf(format string, args ...interface{}) {}
-func (*noLogger) Errorln(args ...interface{}) {}
-func (*noLogger) Fatal(args ...interface{}) {}
-func (*noLogger) Fatalf(format string, args ...interface{}) {}
-func (*noLogger) Fatalln(args ...interface{}) {}
-func (*noLogger) Print(args ...interface{}) {}
-func (*noLogger) Printf(format string, args ...interface{}) {}
-func (*noLogger) Println(args ...interface{}) {}
-func (*noLogger) V(l int) bool { return false }
-func (ng *noLogger) Lvl(lvl int) Logger { return ng }
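Client-side logs remain discarded by default; a sketch of routing them to stderr through the retained SetLogger entry point:

```go
clientv3.SetLogger(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr))
```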
diff --git a/vendor/go.etcd.io/etcd/clientv3/maintenance.go b/vendor/go.etcd.io/etcd/clientv3/maintenance.go
index f60cfbe..744455a 100644
--- a/vendor/go.etcd.io/etcd/clientv3/maintenance.go
+++ b/vendor/go.etcd.io/etcd/clientv3/maintenance.go
@@ -16,9 +16,10 @@
import (
"context"
+ "fmt"
"io"
- pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+ pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
"google.golang.org/grpc"
)
@@ -57,6 +58,8 @@
HashKV(ctx context.Context, endpoint string, rev int64) (*HashKVResponse, error)
// Snapshot provides a reader for a point-in-time snapshot of etcd.
+ // If the context "ctx" is canceled or timed out, reading from the returned
+ // "io.ReadCloser" would error out (e.g. context.Canceled, context.DeadlineExceeded).
Snapshot(ctx context.Context) (io.ReadCloser, error)
// MoveLeader requests current leader to transfer its leadership to the transferee.
@@ -73,9 +76,9 @@
func NewMaintenance(c *Client) Maintenance {
api := &maintenance{
dial: func(endpoint string) (pb.MaintenanceClient, func(), error) {
- conn, err := c.dial(endpoint)
+ conn, err := c.Dial(endpoint)
if err != nil {
- return nil, nil, err
+ return nil, nil, fmt.Errorf("failed to dial endpoint %s with maintenance client: %v", endpoint, err)
}
cancel := func() { conn.Close() }
return RetryMaintenanceClient(c, conn), cancel, nil
@@ -173,6 +176,7 @@
func (m *maintenance) HashKV(ctx context.Context, endpoint string, rev int64) (*HashKVResponse, error) {
remote, cancel, err := m.dial(endpoint)
if err != nil {
return nil, toErr(ctx, err)
}
defer cancel()
@@ -184,7 +188,7 @@
}
func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) {
- ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}, m.callOpts...)
+ ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}, append(m.callOpts, withMax(defaultStreamMaxRetries))...)
if err != nil {
return nil, toErr(ctx, err)
}
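The new doc comment and the withMax(defaultStreamMaxRetries) call above change how snapshot streams behave under cancellation and transient failures. A minimal usage sketch, assuming a reachable endpoint and an illustrative output path:

```go
package main

import (
	"context"
	"io"
	"log"
	"os"
	"time"

	"go.etcd.io/etcd/clientv3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	rc, err := cli.Snapshot(ctx) // point-in-time snapshot as an io.ReadCloser
	if err != nil {
		log.Fatal(err)
	}
	defer rc.Close()

	f, err := os.Create("backup.db")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// Per the doc comment above: if ctx is canceled or times out mid-stream,
	// this copy fails with context.Canceled or context.DeadlineExceeded.
	if _, err := io.Copy(f, rc); err != nil {
		log.Fatal(err)
	}
}
```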
diff --git a/vendor/go.etcd.io/etcd/clientv3/op.go b/vendor/go.etcd.io/etcd/clientv3/op.go
index c6ec5bf..13507c9 100644
--- a/vendor/go.etcd.io/etcd/clientv3/op.go
+++ b/vendor/go.etcd.io/etcd/clientv3/op.go
@@ -14,7 +14,7 @@
package clientv3
-import pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+import pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
type opType int
@@ -26,9 +26,7 @@
tTxn
)
-var (
- noPrefixEnd = []byte{0}
-)
+var noPrefixEnd = []byte{0}
// Op represents an Operation that kv can execute.
type Op struct {
@@ -53,6 +51,12 @@
// for watch, put, delete
prevKV bool
+ // for watch
+	// fragmentation is disabled by default
+	// if true, split watch events when the total size exceeds
+	// the "--max-request-bytes" flag value plus a 512-byte gRPC overhead
+ fragment bool
+
// for put
ignoreValue bool
ignoreLease bool
@@ -77,8 +81,15 @@
// accessors / mutators
-func (op Op) IsTxn() bool { return op.t == tTxn }
-func (op Op) Txn() ([]Cmp, []Op, []Op) { return op.cmps, op.thenOps, op.elseOps }
+// IsTxn returns true if the "Op" type is a transaction.
+func (op Op) IsTxn() bool {
+ return op.t == tTxn
+}
+
+// Txn returns the comparison (if) operations, "then" operations, and "else" operations.
+func (op Op) Txn() ([]Cmp, []Op, []Op) {
+ return op.cmps, op.thenOps, op.elseOps
+}
// KeyBytes returns the byte slice holding the Op's key.
func (op Op) KeyBytes() []byte { return op.key }
@@ -205,12 +216,14 @@
return op.t != tRange
}
+// OpGet returns a "get" operation for the given key and operation options.
func OpGet(key string, opts ...OpOption) Op {
ret := Op{t: tRange, key: []byte(key)}
ret.applyOpts(opts)
return ret
}
+// OpDelete returns a "delete" operation for the given key and operation options.
func OpDelete(key string, opts ...OpOption) Op {
ret := Op{t: tDeleteRange, key: []byte(key)}
ret.applyOpts(opts)
@@ -239,6 +252,7 @@
return ret
}
+// OpPut returns a "put" operation for the given key-value pair and operation options.
func OpPut(key, val string, opts ...OpOption) Op {
ret := Op{t: tPut, key: []byte(key), val: []byte(val)}
ret.applyOpts(opts)
@@ -267,6 +281,7 @@
return ret
}
+// OpTxn returns a "txn" operation for the given transaction conditions.
func OpTxn(cmps []Cmp, thenOps []Op, elseOps []Op) Op {
return Op{t: tTxn, cmps: cmps, thenOps: thenOps, elseOps: elseOps}
}
@@ -377,7 +392,14 @@
// WithFromKey specifies the range of 'Get', 'Delete', 'Watch' requests
// to be equal or greater than the key in the argument.
-func WithFromKey() OpOption { return WithRange("\x00") }
+func WithFromKey() OpOption {
+ return func(op *Op) {
+ if len(op.key) == 0 {
+ op.key = []byte{0}
+ }
+ op.end = []byte("\x00")
+ }
+}
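WithFromKey now also rewrites an empty key to "\x00", so the option is usable on its own. A short sketch, assuming a connected *clientv3.Client "cli" and the usual context/fmt/log imports:

```go
// Keys greater than or equal to "foo"; with key == "" this ranges over the
// entire keyspace thanks to the empty-key rewrite above.
resp, err := cli.Get(context.TODO(), "foo", clientv3.WithFromKey())
if err != nil {
	log.Fatal(err)
}
for _, kv := range resp.Kvs {
	fmt.Printf("%q -> %q\n", kv.Key, kv.Value)
}
```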
// WithSerializable makes 'Get' request serializable. By default,
// it's linearizable. Serializable requests are better for lower latency
@@ -466,6 +488,17 @@
}
}
+// WithFragment enables splitting large raw watch responses into fragments.
+// Fragmentation is disabled by default. If fragmentation is enabled,
+// the etcd watch server splits a watch response before sending it to clients
+// when the total size of watch events exceeds the server-side request limit.
+// The default server-side request limit is 1.5 MiB; it can be configured
+// via the "--max-request-bytes" flag, plus a 512-byte gRPC overhead.
+// See "etcdserver/api/v3rpc/watch.go" for more details.
+func WithFragment() OpOption {
+ return func(op *Op) { op.fragment = true }
+}
+
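A usage sketch for the new option, assuming a connected *clientv3.Client "cli"; the prefix is illustrative:

```go
// Watch a potentially large prefix; oversized responses are fragmented by the
// server and reassembled by the client before delivery on the channel.
wch := cli.Watch(context.Background(), "service/",
	clientv3.WithPrefix(), clientv3.WithFragment())
for wresp := range wch {
	for _, ev := range wresp.Events {
		fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
	}
}
```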
// WithIgnoreValue updates the key using its current value.
// This option can not be combined with non-empty values.
// Returns an error if the key does not exist.
diff --git a/vendor/go.etcd.io/etcd/clientv3/options.go b/vendor/go.etcd.io/etcd/clientv3/options.go
index fa25811..4660ace 100644
--- a/vendor/go.etcd.io/etcd/clientv3/options.go
+++ b/vendor/go.etcd.io/etcd/clientv3/options.go
@@ -16,17 +16,17 @@
import (
"math"
+ "time"
"google.golang.org/grpc"
)
var (
- // Disable gRPC internal retrial logic
- // TODO: enable when gRPC retry is stable (FailFast=false)
- // Reference:
- // - https://github.com/grpc/grpc-go/issues/1532
- // - https://github.com/grpc/proposal/blob/master/A6-client-retries.md
- defaultFailFast = grpc.FailFast(true)
+	// client-side retry handling for request failures where data was not written to the wire or
+	// where the server indicates it did not process the data. The gRPC default is "FailFast(true)",
+ // but for etcd we default to "FailFast(false)" to minimize client request error responses due to
+ // transient failures.
+ defaultFailFast = grpc.FailFast(false)
// client-side request send limit, gRPC default is math.MaxInt32
// Make sure that "client-side send limit < server-side default send/recv limit"
@@ -38,6 +38,22 @@
// because range response can easily exceed request send limits
// Default to math.MaxInt32; writes exceeding server-side send limit fails anyway
defaultMaxCallRecvMsgSize = grpc.MaxCallRecvMsgSize(math.MaxInt32)
+
+	// client-side non-streaming retry limit, only applied to requests where the server responds with
+	// an error code clearly indicating it was unable to process the request, such as codes.Unavailable.
+ // If set to 0, retry is disabled.
+ defaultUnaryMaxRetries uint = 100
+
+	// client-side streaming retry limit, only applied to requests where the server responds with
+	// an error code clearly indicating it was unable to process the request, such as codes.Unavailable.
+ // If set to 0, retry is disabled.
+ defaultStreamMaxRetries = uint(^uint(0)) // max uint
+
+ // client-side retry backoff wait between requests.
+ defaultBackoffWaitBetween = 25 * time.Millisecond
+
+ // client-side retry backoff default jitter fraction.
+ defaultBackoffJitterFraction = 0.10
)
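The streaming cap uses a bit trick worth spelling out; a standalone sketch:

```go
package main

import "fmt"

func main() {
	// ^uint(0) flips every bit of a zero uint, yielding the maximum uint,
	// which the diff uses so streaming calls effectively retry without bound.
	fmt.Println(uint(^uint(0))) // 18446744073709551615 on 64-bit platforms
}
```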
// defaultCallOpts defines a list of default "gRPC.CallOption".
diff --git a/vendor/go.etcd.io/etcd/clientv3/retry.go b/vendor/go.etcd.io/etcd/clientv3/retry.go
index 7f89ba6..38ad00a 100644
--- a/vendor/go.etcd.io/etcd/clientv3/retry.go
+++ b/vendor/go.etcd.io/etcd/clientv3/retry.go
@@ -17,8 +17,8 @@
import (
"context"
- "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
- pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+ "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
+ pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@@ -32,465 +32,267 @@
nonRepeatable
)
+func (rp retryPolicy) String() string {
+ switch rp {
+ case repeatable:
+ return "repeatable"
+ case nonRepeatable:
+ return "nonRepeatable"
+ default:
+ return "UNKNOWN"
+ }
+}
+
type rpcFunc func(ctx context.Context) error
type retryRPCFunc func(context.Context, rpcFunc, retryPolicy) error
type retryStopErrFunc func(error) bool
+// isSafeRetryImmutableRPC returns "true" when an immutable request is safe for retry.
+//
// immutable requests (e.g. Get) should be retried unless it's
// an obvious server-side error (e.g. rpctypes.ErrRequestTooLarge).
//
-// "isRepeatableStopError" returns "true" when an immutable request
-// is interrupted by server-side or gRPC-side error and its status
-// code is not transient (!= codes.Unavailable).
-//
-// Returning "true" means retry should stop, since client cannot
+// Returning "false" means retry should stop, since the client cannot
// handle itself even with retries.
-func isRepeatableStopError(err error) bool {
+func isSafeRetryImmutableRPC(err error) bool {
eErr := rpctypes.Error(err)
- // always stop retry on etcd errors
if serverErr, ok := eErr.(rpctypes.EtcdError); ok && serverErr.Code() != codes.Unavailable {
- return true
+		// interrupted by a non-transient server-side or gRPC-side error
+		// that the client cannot handle itself (e.g. rpctypes.ErrCompacted)
+ return false
}
// only retry if unavailable
- ev, _ := status.FromError(err)
- return ev.Code() != codes.Unavailable
+ ev, ok := status.FromError(err)
+ if !ok {
+		// all errors from RPCs are typed "grpc/status.(*statusError)"
+ // (ref. https://github.com/grpc/grpc-go/pull/1782)
+ //
+ // if the error type is not "grpc/status.(*statusError)",
+ // it could be from "Dial"
+ // TODO: do not retry for now
+ // ref. https://github.com/grpc/grpc-go/issues/1581
+ return false
+ }
+ return ev.Code() == codes.Unavailable
}
+// isSafeRetryMutableRPC returns "true" when a mutable request is safe for retry.
+//
// mutable requests (e.g. Put, Delete, Txn) should only be retried
// when the status code is codes.Unavailable when initial connection
-// has not been established (no pinned endpoint).
+// has not been established (no endpoint is up).
//
-// "isNonRepeatableStopError" returns "true" when a mutable request
-// is interrupted by non-transient error that client cannot handle itself,
-// or transient error while the connection has already been established
-// (pinned endpoint exists).
-//
-// Returning "true" means retry should stop, otherwise it violates
+// Returning "false" means retry should stop, otherwise it violates
// write-at-most-once semantics.
-func isNonRepeatableStopError(err error) bool {
- ev, _ := status.FromError(err)
- if ev.Code() != codes.Unavailable {
- return true
+func isSafeRetryMutableRPC(err error) bool {
+ if ev, ok := status.FromError(err); ok && ev.Code() != codes.Unavailable {
+ // not safe for mutable RPCs
+		// e.g. interrupted by a non-transient error that the client cannot handle itself,
+		// or a transient error while the connection has already been established
+ return false
}
desc := rpctypes.ErrorDesc(err)
- return desc != "there is no address available" && desc != "there is no connection available"
-}
-
-func (c *Client) newRetryWrapper() retryRPCFunc {
- return func(rpcCtx context.Context, f rpcFunc, rp retryPolicy) error {
- var isStop retryStopErrFunc
- switch rp {
- case repeatable:
- isStop = isRepeatableStopError
- case nonRepeatable:
- isStop = isNonRepeatableStopError
- }
- for {
- if err := readyWait(rpcCtx, c.ctx, c.balancer.ConnectNotify()); err != nil {
- return err
- }
- pinned := c.balancer.pinned()
- err := f(rpcCtx)
- if err == nil {
- return nil
- }
- logger.Lvl(4).Infof("clientv3/retry: error %q on pinned endpoint %q", err.Error(), pinned)
-
- if s, ok := status.FromError(err); ok && (s.Code() == codes.Unavailable || s.Code() == codes.DeadlineExceeded || s.Code() == codes.Internal) {
- // mark this before endpoint switch is triggered
- c.balancer.hostPortError(pinned, err)
- c.balancer.next()
- logger.Lvl(4).Infof("clientv3/retry: switching from %q due to error %q", pinned, err.Error())
- }
-
- if isStop(err) {
- return err
- }
- }
- }
-}
-
-func (c *Client) newAuthRetryWrapper(retryf retryRPCFunc) retryRPCFunc {
- return func(rpcCtx context.Context, f rpcFunc, rp retryPolicy) error {
- for {
- pinned := c.balancer.pinned()
- err := retryf(rpcCtx, f, rp)
- if err == nil {
- return nil
- }
- logger.Lvl(4).Infof("clientv3/auth-retry: error %q on pinned endpoint %q", err.Error(), pinned)
- // always stop retry on etcd errors other than invalid auth token
- if rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken {
- gterr := c.getToken(rpcCtx)
- if gterr != nil {
- logger.Lvl(4).Infof("clientv3/auth-retry: cannot retry due to error %q(%q) on pinned endpoint %q", err.Error(), gterr.Error(), pinned)
- return err // return the original error for simplicity
- }
- continue
- }
- return err
- }
- }
+ return desc == "there is no address available" || desc == "there is no connection available"
}
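To make the split concrete, here is a standalone sketch of the status-code check shared by both helpers; illustrative only, since the real functions also consult rpctypes server errors and the connection-availability messages above:

```go
package main

import (
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// transient reports whether a gRPC error carries codes.Unavailable,
// the only code either helper treats as retry-worthy.
func transient(err error) bool {
	ev, ok := status.FromError(err)
	return ok && ev.Code() == codes.Unavailable
}

func main() {
	fmt.Println(transient(status.Error(codes.Unavailable, "member down"))) // true
	fmt.Println(transient(status.Error(codes.InvalidArgument, "bad key"))) // false
}
```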
type retryKVClient struct {
- kc pb.KVClient
- retryf retryRPCFunc
+ kc pb.KVClient
}
// RetryKVClient implements a KVClient.
func RetryKVClient(c *Client) pb.KVClient {
return &retryKVClient{
- kc: pb.NewKVClient(c.conn),
- retryf: c.newAuthRetryWrapper(c.newRetryWrapper()),
+ kc: pb.NewKVClient(c.conn),
}
}
func (rkv *retryKVClient) Range(ctx context.Context, in *pb.RangeRequest, opts ...grpc.CallOption) (resp *pb.RangeResponse, err error) {
- err = rkv.retryf(ctx, func(rctx context.Context) error {
- resp, err = rkv.kc.Range(rctx, in, opts...)
- return err
- }, repeatable)
- return resp, err
+ return rkv.kc.Range(ctx, in, append(opts, withRetryPolicy(repeatable))...)
}
func (rkv *retryKVClient) Put(ctx context.Context, in *pb.PutRequest, opts ...grpc.CallOption) (resp *pb.PutResponse, err error) {
- err = rkv.retryf(ctx, func(rctx context.Context) error {
- resp, err = rkv.kc.Put(rctx, in, opts...)
- return err
- }, nonRepeatable)
- return resp, err
+ return rkv.kc.Put(ctx, in, opts...)
}
func (rkv *retryKVClient) DeleteRange(ctx context.Context, in *pb.DeleteRangeRequest, opts ...grpc.CallOption) (resp *pb.DeleteRangeResponse, err error) {
- err = rkv.retryf(ctx, func(rctx context.Context) error {
- resp, err = rkv.kc.DeleteRange(rctx, in, opts...)
- return err
- }, nonRepeatable)
- return resp, err
+ return rkv.kc.DeleteRange(ctx, in, opts...)
}
func (rkv *retryKVClient) Txn(ctx context.Context, in *pb.TxnRequest, opts ...grpc.CallOption) (resp *pb.TxnResponse, err error) {
- // TODO: "repeatable" for read-only txn
- err = rkv.retryf(ctx, func(rctx context.Context) error {
- resp, err = rkv.kc.Txn(rctx, in, opts...)
- return err
- }, nonRepeatable)
- return resp, err
+ return rkv.kc.Txn(ctx, in, opts...)
}
func (rkv *retryKVClient) Compact(ctx context.Context, in *pb.CompactionRequest, opts ...grpc.CallOption) (resp *pb.CompactionResponse, err error) {
- err = rkv.retryf(ctx, func(rctx context.Context) error {
- resp, err = rkv.kc.Compact(rctx, in, opts...)
- return err
- }, nonRepeatable)
- return resp, err
+ return rkv.kc.Compact(ctx, in, opts...)
}
type retryLeaseClient struct {
- lc pb.LeaseClient
- retryf retryRPCFunc
+ lc pb.LeaseClient
}
// RetryLeaseClient implements a LeaseClient.
func RetryLeaseClient(c *Client) pb.LeaseClient {
return &retryLeaseClient{
- lc: pb.NewLeaseClient(c.conn),
- retryf: c.newAuthRetryWrapper(c.newRetryWrapper()),
+ lc: pb.NewLeaseClient(c.conn),
}
}
func (rlc *retryLeaseClient) LeaseTimeToLive(ctx context.Context, in *pb.LeaseTimeToLiveRequest, opts ...grpc.CallOption) (resp *pb.LeaseTimeToLiveResponse, err error) {
- err = rlc.retryf(ctx, func(rctx context.Context) error {
- resp, err = rlc.lc.LeaseTimeToLive(rctx, in, opts...)
- return err
- }, repeatable)
- return resp, err
+ return rlc.lc.LeaseTimeToLive(ctx, in, append(opts, withRetryPolicy(repeatable))...)
}
func (rlc *retryLeaseClient) LeaseLeases(ctx context.Context, in *pb.LeaseLeasesRequest, opts ...grpc.CallOption) (resp *pb.LeaseLeasesResponse, err error) {
- err = rlc.retryf(ctx, func(rctx context.Context) error {
- resp, err = rlc.lc.LeaseLeases(rctx, in, opts...)
- return err
- }, repeatable)
- return resp, err
+ return rlc.lc.LeaseLeases(ctx, in, append(opts, withRetryPolicy(repeatable))...)
}
func (rlc *retryLeaseClient) LeaseGrant(ctx context.Context, in *pb.LeaseGrantRequest, opts ...grpc.CallOption) (resp *pb.LeaseGrantResponse, err error) {
- err = rlc.retryf(ctx, func(rctx context.Context) error {
- resp, err = rlc.lc.LeaseGrant(rctx, in, opts...)
- return err
- }, repeatable)
- return resp, err
-
+ return rlc.lc.LeaseGrant(ctx, in, append(opts, withRetryPolicy(repeatable))...)
}
func (rlc *retryLeaseClient) LeaseRevoke(ctx context.Context, in *pb.LeaseRevokeRequest, opts ...grpc.CallOption) (resp *pb.LeaseRevokeResponse, err error) {
- err = rlc.retryf(ctx, func(rctx context.Context) error {
- resp, err = rlc.lc.LeaseRevoke(rctx, in, opts...)
- return err
- }, repeatable)
- return resp, err
+ return rlc.lc.LeaseRevoke(ctx, in, append(opts, withRetryPolicy(repeatable))...)
}
func (rlc *retryLeaseClient) LeaseKeepAlive(ctx context.Context, opts ...grpc.CallOption) (stream pb.Lease_LeaseKeepAliveClient, err error) {
- err = rlc.retryf(ctx, func(rctx context.Context) error {
- stream, err = rlc.lc.LeaseKeepAlive(rctx, opts...)
- return err
- }, repeatable)
- return stream, err
+ return rlc.lc.LeaseKeepAlive(ctx, append(opts, withRetryPolicy(repeatable))...)
}
type retryClusterClient struct {
- cc pb.ClusterClient
- retryf retryRPCFunc
+ cc pb.ClusterClient
}
// RetryClusterClient implements a ClusterClient.
func RetryClusterClient(c *Client) pb.ClusterClient {
return &retryClusterClient{
- cc: pb.NewClusterClient(c.conn),
- retryf: c.newRetryWrapper(),
+ cc: pb.NewClusterClient(c.conn),
}
}
func (rcc *retryClusterClient) MemberList(ctx context.Context, in *pb.MemberListRequest, opts ...grpc.CallOption) (resp *pb.MemberListResponse, err error) {
- err = rcc.retryf(ctx, func(rctx context.Context) error {
- resp, err = rcc.cc.MemberList(rctx, in, opts...)
- return err
- }, repeatable)
- return resp, err
+ return rcc.cc.MemberList(ctx, in, append(opts, withRetryPolicy(repeatable))...)
}
func (rcc *retryClusterClient) MemberAdd(ctx context.Context, in *pb.MemberAddRequest, opts ...grpc.CallOption) (resp *pb.MemberAddResponse, err error) {
- err = rcc.retryf(ctx, func(rctx context.Context) error {
- resp, err = rcc.cc.MemberAdd(rctx, in, opts...)
- return err
- }, nonRepeatable)
- return resp, err
+ return rcc.cc.MemberAdd(ctx, in, opts...)
}
func (rcc *retryClusterClient) MemberRemove(ctx context.Context, in *pb.MemberRemoveRequest, opts ...grpc.CallOption) (resp *pb.MemberRemoveResponse, err error) {
- err = rcc.retryf(ctx, func(rctx context.Context) error {
- resp, err = rcc.cc.MemberRemove(rctx, in, opts...)
- return err
- }, nonRepeatable)
- return resp, err
+ return rcc.cc.MemberRemove(ctx, in, opts...)
}
func (rcc *retryClusterClient) MemberUpdate(ctx context.Context, in *pb.MemberUpdateRequest, opts ...grpc.CallOption) (resp *pb.MemberUpdateResponse, err error) {
- err = rcc.retryf(ctx, func(rctx context.Context) error {
- resp, err = rcc.cc.MemberUpdate(rctx, in, opts...)
- return err
- }, nonRepeatable)
- return resp, err
+ return rcc.cc.MemberUpdate(ctx, in, opts...)
}
type retryMaintenanceClient struct {
- mc pb.MaintenanceClient
- retryf retryRPCFunc
+ mc pb.MaintenanceClient
}
// RetryMaintenanceClient implements a Maintenance.
func RetryMaintenanceClient(c *Client, conn *grpc.ClientConn) pb.MaintenanceClient {
return &retryMaintenanceClient{
- mc: pb.NewMaintenanceClient(conn),
- retryf: c.newRetryWrapper(),
+ mc: pb.NewMaintenanceClient(conn),
}
}
func (rmc *retryMaintenanceClient) Alarm(ctx context.Context, in *pb.AlarmRequest, opts ...grpc.CallOption) (resp *pb.AlarmResponse, err error) {
- err = rmc.retryf(ctx, func(rctx context.Context) error {
- resp, err = rmc.mc.Alarm(rctx, in, opts...)
- return err
- }, repeatable)
- return resp, err
+ return rmc.mc.Alarm(ctx, in, append(opts, withRetryPolicy(repeatable))...)
}
func (rmc *retryMaintenanceClient) Status(ctx context.Context, in *pb.StatusRequest, opts ...grpc.CallOption) (resp *pb.StatusResponse, err error) {
- err = rmc.retryf(ctx, func(rctx context.Context) error {
- resp, err = rmc.mc.Status(rctx, in, opts...)
- return err
- }, repeatable)
- return resp, err
+ return rmc.mc.Status(ctx, in, append(opts, withRetryPolicy(repeatable))...)
}
func (rmc *retryMaintenanceClient) Hash(ctx context.Context, in *pb.HashRequest, opts ...grpc.CallOption) (resp *pb.HashResponse, err error) {
- err = rmc.retryf(ctx, func(rctx context.Context) error {
- resp, err = rmc.mc.Hash(rctx, in, opts...)
- return err
- }, repeatable)
- return resp, err
+ return rmc.mc.Hash(ctx, in, append(opts, withRetryPolicy(repeatable))...)
}
func (rmc *retryMaintenanceClient) HashKV(ctx context.Context, in *pb.HashKVRequest, opts ...grpc.CallOption) (resp *pb.HashKVResponse, err error) {
- err = rmc.retryf(ctx, func(rctx context.Context) error {
- resp, err = rmc.mc.HashKV(rctx, in, opts...)
- return err
- }, repeatable)
- return resp, err
+ return rmc.mc.HashKV(ctx, in, append(opts, withRetryPolicy(repeatable))...)
}
func (rmc *retryMaintenanceClient) Snapshot(ctx context.Context, in *pb.SnapshotRequest, opts ...grpc.CallOption) (stream pb.Maintenance_SnapshotClient, err error) {
- err = rmc.retryf(ctx, func(rctx context.Context) error {
- stream, err = rmc.mc.Snapshot(rctx, in, opts...)
- return err
- }, repeatable)
- return stream, err
+ return rmc.mc.Snapshot(ctx, in, append(opts, withRetryPolicy(repeatable))...)
}
func (rmc *retryMaintenanceClient) MoveLeader(ctx context.Context, in *pb.MoveLeaderRequest, opts ...grpc.CallOption) (resp *pb.MoveLeaderResponse, err error) {
- err = rmc.retryf(ctx, func(rctx context.Context) error {
- resp, err = rmc.mc.MoveLeader(rctx, in, opts...)
- return err
- }, repeatable)
- return resp, err
+ return rmc.mc.MoveLeader(ctx, in, append(opts, withRetryPolicy(repeatable))...)
}
func (rmc *retryMaintenanceClient) Defragment(ctx context.Context, in *pb.DefragmentRequest, opts ...grpc.CallOption) (resp *pb.DefragmentResponse, err error) {
- err = rmc.retryf(ctx, func(rctx context.Context) error {
- resp, err = rmc.mc.Defragment(rctx, in, opts...)
- return err
- }, nonRepeatable)
- return resp, err
+ return rmc.mc.Defragment(ctx, in, opts...)
}
type retryAuthClient struct {
- ac pb.AuthClient
- retryf retryRPCFunc
+ ac pb.AuthClient
}
// RetryAuthClient implements an AuthClient.
func RetryAuthClient(c *Client) pb.AuthClient {
return &retryAuthClient{
- ac: pb.NewAuthClient(c.conn),
- retryf: c.newRetryWrapper(),
+ ac: pb.NewAuthClient(c.conn),
}
}
func (rac *retryAuthClient) UserList(ctx context.Context, in *pb.AuthUserListRequest, opts ...grpc.CallOption) (resp *pb.AuthUserListResponse, err error) {
- err = rac.retryf(ctx, func(rctx context.Context) error {
- resp, err = rac.ac.UserList(rctx, in, opts...)
- return err
- }, repeatable)
- return resp, err
+ return rac.ac.UserList(ctx, in, append(opts, withRetryPolicy(repeatable))...)
}
func (rac *retryAuthClient) UserGet(ctx context.Context, in *pb.AuthUserGetRequest, opts ...grpc.CallOption) (resp *pb.AuthUserGetResponse, err error) {
- err = rac.retryf(ctx, func(rctx context.Context) error {
- resp, err = rac.ac.UserGet(rctx, in, opts...)
- return err
- }, repeatable)
- return resp, err
+ return rac.ac.UserGet(ctx, in, append(opts, withRetryPolicy(repeatable))...)
}
func (rac *retryAuthClient) RoleGet(ctx context.Context, in *pb.AuthRoleGetRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleGetResponse, err error) {
- err = rac.retryf(ctx, func(rctx context.Context) error {
- resp, err = rac.ac.RoleGet(rctx, in, opts...)
- return err
- }, repeatable)
- return resp, err
+ return rac.ac.RoleGet(ctx, in, append(opts, withRetryPolicy(repeatable))...)
}
func (rac *retryAuthClient) RoleList(ctx context.Context, in *pb.AuthRoleListRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleListResponse, err error) {
- err = rac.retryf(ctx, func(rctx context.Context) error {
- resp, err = rac.ac.RoleList(rctx, in, opts...)
- return err
- }, repeatable)
- return resp, err
+ return rac.ac.RoleList(ctx, in, append(opts, withRetryPolicy(repeatable))...)
}
func (rac *retryAuthClient) AuthEnable(ctx context.Context, in *pb.AuthEnableRequest, opts ...grpc.CallOption) (resp *pb.AuthEnableResponse, err error) {
- err = rac.retryf(ctx, func(rctx context.Context) error {
- resp, err = rac.ac.AuthEnable(rctx, in, opts...)
- return err
- }, nonRepeatable)
- return resp, err
+ return rac.ac.AuthEnable(ctx, in, opts...)
}
func (rac *retryAuthClient) AuthDisable(ctx context.Context, in *pb.AuthDisableRequest, opts ...grpc.CallOption) (resp *pb.AuthDisableResponse, err error) {
- err = rac.retryf(ctx, func(rctx context.Context) error {
- resp, err = rac.ac.AuthDisable(rctx, in, opts...)
- return err
- }, nonRepeatable)
- return resp, err
+ return rac.ac.AuthDisable(ctx, in, opts...)
}
func (rac *retryAuthClient) UserAdd(ctx context.Context, in *pb.AuthUserAddRequest, opts ...grpc.CallOption) (resp *pb.AuthUserAddResponse, err error) {
- err = rac.retryf(ctx, func(rctx context.Context) error {
- resp, err = rac.ac.UserAdd(rctx, in, opts...)
- return err
- }, nonRepeatable)
- return resp, err
+ return rac.ac.UserAdd(ctx, in, opts...)
}
func (rac *retryAuthClient) UserDelete(ctx context.Context, in *pb.AuthUserDeleteRequest, opts ...grpc.CallOption) (resp *pb.AuthUserDeleteResponse, err error) {
- err = rac.retryf(ctx, func(rctx context.Context) error {
- resp, err = rac.ac.UserDelete(rctx, in, opts...)
- return err
- }, nonRepeatable)
- return resp, err
+ return rac.ac.UserDelete(ctx, in, opts...)
}
func (rac *retryAuthClient) UserChangePassword(ctx context.Context, in *pb.AuthUserChangePasswordRequest, opts ...grpc.CallOption) (resp *pb.AuthUserChangePasswordResponse, err error) {
- err = rac.retryf(ctx, func(rctx context.Context) error {
- resp, err = rac.ac.UserChangePassword(rctx, in, opts...)
- return err
- }, nonRepeatable)
- return resp, err
+ return rac.ac.UserChangePassword(ctx, in, opts...)
}
func (rac *retryAuthClient) UserGrantRole(ctx context.Context, in *pb.AuthUserGrantRoleRequest, opts ...grpc.CallOption) (resp *pb.AuthUserGrantRoleResponse, err error) {
- err = rac.retryf(ctx, func(rctx context.Context) error {
- resp, err = rac.ac.UserGrantRole(rctx, in, opts...)
- return err
- }, nonRepeatable)
- return resp, err
+ return rac.ac.UserGrantRole(ctx, in, opts...)
}
func (rac *retryAuthClient) UserRevokeRole(ctx context.Context, in *pb.AuthUserRevokeRoleRequest, opts ...grpc.CallOption) (resp *pb.AuthUserRevokeRoleResponse, err error) {
- err = rac.retryf(ctx, func(rctx context.Context) error {
- resp, err = rac.ac.UserRevokeRole(rctx, in, opts...)
- return err
- }, nonRepeatable)
- return resp, err
+ return rac.ac.UserRevokeRole(ctx, in, opts...)
}
func (rac *retryAuthClient) RoleAdd(ctx context.Context, in *pb.AuthRoleAddRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleAddResponse, err error) {
- err = rac.retryf(ctx, func(rctx context.Context) error {
- resp, err = rac.ac.RoleAdd(rctx, in, opts...)
- return err
- }, nonRepeatable)
- return resp, err
+ return rac.ac.RoleAdd(ctx, in, opts...)
}
func (rac *retryAuthClient) RoleDelete(ctx context.Context, in *pb.AuthRoleDeleteRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleDeleteResponse, err error) {
- err = rac.retryf(ctx, func(rctx context.Context) error {
- resp, err = rac.ac.RoleDelete(rctx, in, opts...)
- return err
- }, nonRepeatable)
- return resp, err
+ return rac.ac.RoleDelete(ctx, in, opts...)
}
func (rac *retryAuthClient) RoleGrantPermission(ctx context.Context, in *pb.AuthRoleGrantPermissionRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleGrantPermissionResponse, err error) {
- err = rac.retryf(ctx, func(rctx context.Context) error {
- resp, err = rac.ac.RoleGrantPermission(rctx, in, opts...)
- return err
- }, nonRepeatable)
- return resp, err
+ return rac.ac.RoleGrantPermission(ctx, in, opts...)
}
func (rac *retryAuthClient) RoleRevokePermission(ctx context.Context, in *pb.AuthRoleRevokePermissionRequest, opts ...grpc.CallOption) (resp *pb.AuthRoleRevokePermissionResponse, err error) {
- err = rac.retryf(ctx, func(rctx context.Context) error {
- resp, err = rac.ac.RoleRevokePermission(rctx, in, opts...)
- return err
- }, nonRepeatable)
- return resp, err
+ return rac.ac.RoleRevokePermission(ctx, in, opts...)
}
func (rac *retryAuthClient) Authenticate(ctx context.Context, in *pb.AuthenticateRequest, opts ...grpc.CallOption) (resp *pb.AuthenticateResponse, err error) {
- err = rac.retryf(ctx, func(rctx context.Context) error {
- resp, err = rac.ac.Authenticate(rctx, in, opts...)
- return err
- }, nonRepeatable)
- return resp, err
+ return rac.ac.Authenticate(ctx, in, opts...)
}
diff --git a/vendor/go.etcd.io/etcd/clientv3/retry_interceptor.go b/vendor/go.etcd.io/etcd/clientv3/retry_interceptor.go
new file mode 100644
index 0000000..6b1054e
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/retry_interceptor.go
@@ -0,0 +1,395 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Based on github.com/grpc-ecosystem/go-grpc-middleware/retry, but modified to support the more
+// fine grained error checking required by write-at-most-once retry semantics of etcd.
+
+package clientv3
+
+import (
+ "context"
+ "io"
+ "sync"
+ "time"
+
+ "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
+ "go.uber.org/zap"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/metadata"
+)
+
+// unaryClientInterceptor returns a new retrying unary client interceptor.
+//
+// The default configuration of the interceptor is to not retry *at all*. This behaviour can be
+// changed through options (e.g. WithMax) on creation of the interceptor or on call (through grpc.CallOptions).
+func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOption) grpc.UnaryClientInterceptor {
+ intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs)
+ return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
+ grpcOpts, retryOpts := filterCallOptions(opts)
+ callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts)
+ // short circuit for simplicity, and avoiding allocations.
+ if callOpts.max == 0 {
+ return invoker(ctx, method, req, reply, cc, grpcOpts...)
+ }
+ var lastErr error
+ for attempt := uint(0); attempt < callOpts.max; attempt++ {
+ if err := waitRetryBackoff(ctx, attempt, callOpts); err != nil {
+ return err
+ }
+ logger.Info(
+ "retrying of unary invoker",
+ zap.String("target", cc.Target()),
+ zap.Uint("attempt", attempt),
+ )
+ lastErr = invoker(ctx, method, req, reply, cc, grpcOpts...)
+ if lastErr == nil {
+ return nil
+ }
+ logger.Warn(
+ "retrying of unary invoker failed",
+ zap.String("target", cc.Target()),
+ zap.Uint("attempt", attempt),
+ zap.Error(lastErr),
+ )
+ if isContextError(lastErr) {
+ if ctx.Err() != nil {
+					// it's the context deadline or cancellation.
+ return lastErr
+ }
+				// it's the callCtx deadline or cancellation, in which case try again.
+ continue
+ }
+ if callOpts.retryAuth && rpctypes.Error(lastErr) == rpctypes.ErrInvalidAuthToken {
+ gterr := c.getToken(ctx)
+ if gterr != nil {
+ logger.Warn(
+ "retrying of unary invoker failed to fetch new auth token",
+ zap.String("target", cc.Target()),
+ zap.Error(gterr),
+ )
+ return lastErr // return the original error for simplicity
+ }
+ continue
+ }
+ if !isSafeRetry(c.lg, lastErr, callOpts) {
+ return lastErr
+ }
+ }
+ return lastErr
+ }
+}
+
+// streamClientInterceptor returns a new retrying stream client interceptor for server side streaming calls.
+//
+// The default configuration of the interceptor is to not retry *at all*. This behaviour can be
+// changed through options (e.g. WithMax) on creation of the interceptor or on call (through grpc.CallOptions).
+//
+// Retry logic is available *only for ServerStreams*, i.e. 1:n streams, as the internal logic needs
+// to buffer the messages sent by the client. If retry is enabled on any other streams (ClientStreams,
+// BidiStreams), the retry interceptor will fail the call.
+func (c *Client) streamClientInterceptor(logger *zap.Logger, optFuncs ...retryOption) grpc.StreamClientInterceptor {
+ intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs)
+ return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
+ grpcOpts, retryOpts := filterCallOptions(opts)
+ callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts)
+ // short circuit for simplicity, and avoiding allocations.
+ if callOpts.max == 0 {
+ return streamer(ctx, desc, cc, method, grpcOpts...)
+ }
+ if desc.ClientStreams {
+ return nil, grpc.Errorf(codes.Unimplemented, "clientv3/retry_interceptor: cannot retry on ClientStreams, set Disable()")
+ }
+ newStreamer, err := streamer(ctx, desc, cc, method, grpcOpts...)
+ logger.Info("retry stream intercept", zap.Error(err))
+ if err != nil {
+ // TODO(mwitkow): Maybe dial and transport errors should be retriable?
+ return nil, err
+ }
+ retryingStreamer := &serverStreamingRetryingStream{
+ client: c,
+ ClientStream: newStreamer,
+ callOpts: callOpts,
+ ctx: ctx,
+ streamerCall: func(ctx context.Context) (grpc.ClientStream, error) {
+ return streamer(ctx, desc, cc, method, grpcOpts...)
+ },
+ }
+ return retryingStreamer, nil
+ }
+}
+
+// serverStreamingRetryingStream is an implementation of grpc.ClientStream that acts as a
+// proxy to the underlying call. If any of the RecvMsg() calls fail, it will try to reestablish
+// a new ClientStream according to the retry policy.
+type serverStreamingRetryingStream struct {
+ grpc.ClientStream
+ client *Client
+	bufferedSends []interface{} // single message that the client can send
+	receivedGood  bool          // indicates whether any prior receives were successful
+	wasClosedSend bool          // indicates that CloseSend was called
+ ctx context.Context
+ callOpts *options
+ streamerCall func(ctx context.Context) (grpc.ClientStream, error)
+ mu sync.RWMutex
+}
+
+func (s *serverStreamingRetryingStream) setStream(clientStream grpc.ClientStream) {
+ s.mu.Lock()
+ s.ClientStream = clientStream
+ s.mu.Unlock()
+}
+
+func (s *serverStreamingRetryingStream) getStream() grpc.ClientStream {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+ return s.ClientStream
+}
+
+func (s *serverStreamingRetryingStream) SendMsg(m interface{}) error {
+ s.mu.Lock()
+ s.bufferedSends = append(s.bufferedSends, m)
+ s.mu.Unlock()
+ return s.getStream().SendMsg(m)
+}
+
+func (s *serverStreamingRetryingStream) CloseSend() error {
+ s.mu.Lock()
+ s.wasClosedSend = true
+ s.mu.Unlock()
+ return s.getStream().CloseSend()
+}
+
+func (s *serverStreamingRetryingStream) Header() (metadata.MD, error) {
+ return s.getStream().Header()
+}
+
+func (s *serverStreamingRetryingStream) Trailer() metadata.MD {
+ return s.getStream().Trailer()
+}
+
+func (s *serverStreamingRetryingStream) RecvMsg(m interface{}) error {
+ attemptRetry, lastErr := s.receiveMsgAndIndicateRetry(m)
+ if !attemptRetry {
+ return lastErr // success or hard failure
+ }
+ // We start off from attempt 1, because zeroth was already made on normal SendMsg().
+ for attempt := uint(1); attempt < s.callOpts.max; attempt++ {
+ if err := waitRetryBackoff(s.ctx, attempt, s.callOpts); err != nil {
+ return err
+ }
+ newStream, err := s.reestablishStreamAndResendBuffer(s.ctx)
+ if err != nil {
+ // TODO(mwitkow): Maybe dial and transport errors should be retriable?
+ return err
+ }
+ s.setStream(newStream)
+ attemptRetry, lastErr = s.receiveMsgAndIndicateRetry(m)
+ if !attemptRetry {
+ return lastErr
+ }
+ }
+ return lastErr
+}
+
+func (s *serverStreamingRetryingStream) receiveMsgAndIndicateRetry(m interface{}) (bool, error) {
+ s.mu.RLock()
+ wasGood := s.receivedGood
+ s.mu.RUnlock()
+ err := s.getStream().RecvMsg(m)
+ if err == nil || err == io.EOF {
+ s.mu.Lock()
+ s.receivedGood = true
+ s.mu.Unlock()
+ return false, err
+ } else if wasGood {
+ // previous RecvMsg in the stream succeeded, no retry logic should interfere
+ return false, err
+ }
+ if isContextError(err) {
+ if s.ctx.Err() != nil {
+ return false, err
+ }
+		// it's the callCtx deadline or cancellation, in which case try again.
+ return true, err
+ }
+ if s.callOpts.retryAuth && rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken {
+ gterr := s.client.getToken(s.ctx)
+ if gterr != nil {
+ s.client.lg.Info("retry failed to fetch new auth token", zap.Error(gterr))
+ return false, err // return the original error for simplicity
+ }
+ return true, err
+ }
+ return isSafeRetry(s.client.lg, err, s.callOpts), err
+}
+
+func (s *serverStreamingRetryingStream) reestablishStreamAndResendBuffer(callCtx context.Context) (grpc.ClientStream, error) {
+ s.mu.RLock()
+ bufferedSends := s.bufferedSends
+ s.mu.RUnlock()
+ newStream, err := s.streamerCall(callCtx)
+ if err != nil {
+ return nil, err
+ }
+ for _, msg := range bufferedSends {
+ if err := newStream.SendMsg(msg); err != nil {
+ return nil, err
+ }
+ }
+ if err := newStream.CloseSend(); err != nil {
+ return nil, err
+ }
+ return newStream, nil
+}
+
+func waitRetryBackoff(ctx context.Context, attempt uint, callOpts *options) error {
+ waitTime := time.Duration(0)
+ if attempt > 0 {
+ waitTime = callOpts.backoffFunc(attempt)
+ }
+ if waitTime > 0 {
+ timer := time.NewTimer(waitTime)
+ select {
+ case <-ctx.Done():
+ timer.Stop()
+ return contextErrToGrpcErr(ctx.Err())
+ case <-timer.C:
+ }
+ }
+ return nil
+}
+
+// isSafeRetry returns "true" if the request is safe to retry with the given error.
+func isSafeRetry(lg *zap.Logger, err error, callOpts *options) bool {
+ if isContextError(err) {
+ return false
+ }
+ switch callOpts.retryPolicy {
+ case repeatable:
+ return isSafeRetryImmutableRPC(err)
+ case nonRepeatable:
+ return isSafeRetryMutableRPC(err)
+ default:
+ lg.Warn("unrecognized retry policy", zap.String("retryPolicy", callOpts.retryPolicy.String()))
+ return false
+ }
+}
+
+func isContextError(err error) bool {
+ return grpc.Code(err) == codes.DeadlineExceeded || grpc.Code(err) == codes.Canceled
+}
+
+func contextErrToGrpcErr(err error) error {
+ switch err {
+ case context.DeadlineExceeded:
+ return grpc.Errorf(codes.DeadlineExceeded, err.Error())
+ case context.Canceled:
+ return grpc.Errorf(codes.Canceled, err.Error())
+ default:
+ return grpc.Errorf(codes.Unknown, err.Error())
+ }
+}
+
+var (
+ defaultOptions = &options{
+ retryPolicy: nonRepeatable,
+ max: 0, // disable
+		backoffFunc: backoffLinearWithJitter(50*time.Millisecond, 0.10 /*jitter fraction*/),
+ retryAuth: true,
+ }
+)
+
+// backoffFunc denotes a family of functions that control the backoff duration between call retries.
+//
+// They are called with an identifier of the attempt, and should return how long the client should
+// hold off. If the time returned is longer than the `context.Context.Deadline` of the request,
+// the deadline of the request takes precedence and the wait will be interrupted before proceeding
+// with the next iteration.
+type backoffFunc func(attempt uint) time.Duration
+
+// withRetryPolicy sets the retry policy of this call.
+func withRetryPolicy(rp retryPolicy) retryOption {
+ return retryOption{applyFunc: func(o *options) {
+ o.retryPolicy = rp
+ }}
+}
+
+// withAuthRetry enables or disables authentication retries.
+func withAuthRetry(retryAuth bool) retryOption {
+ return retryOption{applyFunc: func(o *options) {
+ o.retryAuth = retryAuth
+ }}
+}
+
+// withMax sets the maximum number of retries on this call, or this interceptor.
+func withMax(maxRetries uint) retryOption {
+ return retryOption{applyFunc: func(o *options) {
+ o.max = maxRetries
+ }}
+}
+
+// withBackoff sets the `backoffFunc` used to control the time between retries.
+func withBackoff(bf backoffFunc) retryOption {
+ return retryOption{applyFunc: func(o *options) {
+ o.backoffFunc = bf
+ }}
+}
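These retryOption values are package-internal; call sites compose them per RPC, as the Snapshot change earlier in this diff does. A sketch under that assumption ("ctx", "in", and "rkv" stand in for a live call site, and the values are illustrative, not the shipped defaults):

```go
// Attach a per-call policy, retry cap, and backoff; filterCallOptions below
// separates these from ordinary grpc.CallOptions inside the interceptor.
opts := append(defaultCallOpts,
	withRetryPolicy(repeatable),
	withMax(5),
	withBackoff(backoffLinearWithJitter(25*time.Millisecond, 0.10)),
)
resp, err := rkv.kc.Range(ctx, in, opts...)
```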
+
+type options struct {
+ retryPolicy retryPolicy
+ max uint
+ backoffFunc backoffFunc
+ retryAuth bool
+}
+
+// retryOption is a grpc.CallOption that is local to clientv3's retry interceptor.
+type retryOption struct {
+	grpc.EmptyCallOption // make sure we implement private after() and before() methods so we don't panic.
+ applyFunc func(opt *options)
+}
+
+func reuseOrNewWithCallOptions(opt *options, retryOptions []retryOption) *options {
+ if len(retryOptions) == 0 {
+ return opt
+ }
+ optCopy := &options{}
+ *optCopy = *opt
+ for _, f := range retryOptions {
+ f.applyFunc(optCopy)
+ }
+ return optCopy
+}
+
+func filterCallOptions(callOptions []grpc.CallOption) (grpcOptions []grpc.CallOption, retryOptions []retryOption) {
+ for _, opt := range callOptions {
+ if co, ok := opt.(retryOption); ok {
+ retryOptions = append(retryOptions, co)
+ } else {
+ grpcOptions = append(grpcOptions, opt)
+ }
+ }
+ return grpcOptions, retryOptions
+}
+
+// backoffLinearWithJitter waits a set period of time, allowing for jitter (fractional adjustment).
+//
+// For example, waitBetween=1s and jitter=0.10 can generate waits between 900ms and 1100ms.
+func backoffLinearWithJitter(waitBetween time.Duration, jitterFraction float64) backoffFunc {
+ return func(attempt uint) time.Duration {
+ return jitterUp(waitBetween, jitterFraction)
+ }
+}
diff --git a/vendor/go.etcd.io/etcd/clientv3/txn.go b/vendor/go.etcd.io/etcd/clientv3/txn.go
index c3c2d24..c19715d 100644
--- a/vendor/go.etcd.io/etcd/clientv3/txn.go
+++ b/vendor/go.etcd.io/etcd/clientv3/txn.go
@@ -18,7 +18,7 @@
"context"
"sync"
- pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+ pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
"google.golang.org/grpc"
)
diff --git a/vendor/go.etcd.io/etcd/clientv3/utils.go b/vendor/go.etcd.io/etcd/clientv3/utils.go
new file mode 100644
index 0000000..8502758
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/utils.go
@@ -0,0 +1,31 @@
+// Copyright 2018 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+ "math/rand"
+ "time"
+)
+
+// jitterUp adds random jitter to the duration.
+//
+// This adds or subtracts time from the duration within a given jitter fraction.
+// For example, for 10s and jitter 0.1, it will return a time within [9s, 11s].
+//
+// Reference: https://godoc.org/github.com/grpc-ecosystem/go-grpc-middleware/util/backoffutils
+func jitterUp(duration time.Duration, jitter float64) time.Duration {
+ multiplier := jitter * (rand.Float64()*2 - 1)
+ return time.Duration(float64(duration) * (1 + multiplier))
+}
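The arithmetic, spelled out for the defaults in options.go (25ms wait, 0.10 jitter fraction); a standalone sketch:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// jitterUp(25ms, 0.10) draws uniformly from [25ms*0.9, 25ms*1.1],
	// i.e. every retry wait lands between 22.5ms and 27.5ms.
	lo := time.Duration(float64(25*time.Millisecond) * 0.9)
	hi := time.Duration(float64(25*time.Millisecond) * 1.1)
	fmt.Println(lo, hi) // 22.5ms 27.5ms
}
```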
diff --git a/vendor/go.etcd.io/etcd/clientv3/watch.go b/vendor/go.etcd.io/etcd/clientv3/watch.go
index d763385..8ec58bb 100644
--- a/vendor/go.etcd.io/etcd/clientv3/watch.go
+++ b/vendor/go.etcd.io/etcd/clientv3/watch.go
@@ -16,13 +16,14 @@
import (
"context"
+ "errors"
"fmt"
"sync"
"time"
- v3rpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
- pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
- mvccpb "github.com/coreos/etcd/mvcc/mvccpb"
+ v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
+ pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
+ mvccpb "go.etcd.io/etcd/mvcc/mvccpb"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@@ -46,8 +47,33 @@
// through the returned channel. If revisions waiting to be sent over the
// watch are compacted, then the watch will be canceled by the server, the
// client will post a compacted error watch response, and the channel will close.
+	// If the context "ctx" is canceled or timed out, the returned "WatchChan" is closed,
+	// and "WatchResponse" from this closed channel has zero events and a nil "Err()".
+	// The context "ctx" MUST be canceled as soon as the watcher is no longer used,
+	// to release the associated resources.
+ //
+	// If the context is "context.Background/TODO", the returned "WatchChan" will
+	// not be closed and will block until an event is triggered, except when the server
+	// returns a non-recoverable error (e.g. ErrCompacted).
+ // For example, when context passed with "WithRequireLeader" and the
+ // connected server has no leader (e.g. due to network partition),
+ // error "etcdserver: no leader" (ErrNoLeader) will be returned,
+ // and then "WatchChan" is closed with non-nil "Err()".
+	// In order to prevent a watch stream from being stuck in a partitioned node,
+	// make sure to wrap the context with "WithRequireLeader".
+ //
+ // Otherwise, as long as the context has not been canceled or timed out,
+ // watch will retry on other recoverable errors forever until reconnected.
+ //
+ // TODO: explicitly set context error in the last "WatchResponse" message and close channel?
+ // Currently, client contexts are overwritten with "valCtx" that never closes.
+ // TODO(v3.4): configure watch retry policy, limit maximum retry number
+ // (see https://github.com/etcd-io/etcd/issues/8980)
Watch(ctx context.Context, key string, opts ...OpOption) WatchChan
+ // RequestProgress requests a progress notify response be sent in all watch channels.
+ RequestProgress(ctx context.Context) error
+
// Close closes the watcher and cancels all watch requests.
Close() error
}
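A usage sketch of the WithRequireLeader guidance from the doc comment above, assuming a connected *clientv3.Client "cli" and the "log" import; the key prefix is illustrative:

```go
// Fail the watch fast if the connected member loses its leader, instead of
// letting the stream sit idle on a partitioned node.
ctx, cancel := context.WithCancel(clientv3.WithRequireLeader(context.Background()))
defer cancel() // always cancel once the watcher is no longer used

for wresp := range cli.Watch(ctx, "jobs/", clientv3.WithPrefix()) {
	if err := wresp.Err(); err != nil {
		log.Printf("watch closed: %v", err) // e.g. "etcdserver: no leader"
		break
	}
	for _, ev := range wresp.Events {
		log.Printf("%s %q", ev.Type, ev.Kv.Key)
	}
}
```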
@@ -134,7 +160,7 @@
resuming []*watcherStream
// reqc sends a watch request from Watch() to the main goroutine
- reqc chan *watchRequest
+ reqc chan watchStreamRequest
// respc receives data from the watch client
respc chan *pb.WatchResponse
// donec closes to broadcast shutdown
@@ -152,16 +178,27 @@
closeErr error
}
+// watchStreamRequest is a union of the supported watch request operation types.
+type watchStreamRequest interface {
+ toPB() *pb.WatchRequest
+}
+
// watchRequest is issued by the subscriber to start a new watcher
type watchRequest struct {
ctx context.Context
key string
end string
rev int64
+
// send created notification event if this field is true
createdNotify bool
// progressNotify is for progress updates
progressNotify bool
+	// fragmentation is disabled by default
+	// if true, split watch events when the total size exceeds
+	// the "--max-request-bytes" flag value plus a 512-byte gRPC overhead
+ fragment bool
+
// filters is the list of events to filter out
filters []pb.WatchCreateRequest_FilterType
// get the previous key-value pair before the event happens
@@ -170,6 +207,10 @@
retc chan chan WatchResponse
}
+// progressRequest is issued by the subscriber to request watch progress
+type progressRequest struct {
+}
+
// watcherStream represents a registered watcher
type watcherStream struct {
// initReq is the request that initiated this request
@@ -227,7 +268,7 @@
cancel: cancel,
substreams: make(map[int64]*watcherStream),
respc: make(chan *pb.WatchResponse),
- reqc: make(chan *watchRequest),
+ reqc: make(chan watchStreamRequest),
donec: make(chan struct{}),
errc: make(chan error, 1),
closingc: make(chan *watcherStream),
@@ -256,6 +297,7 @@
end: string(ow.end),
rev: ow.rev,
progressNotify: ow.progressNotify,
+ fragment: ow.fragment,
filters: filters,
prevKV: ow.prevKV,
retc: make(chan chan WatchResponse, 1),
@@ -292,7 +334,7 @@
case <-wr.ctx.Done():
case <-donec:
if wgs.closeErr != nil {
- closeCh <- WatchResponse{closeErr: wgs.closeErr}
+ closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
break
}
// retry; may have dropped stream from no ctxs
@@ -307,7 +349,7 @@
case <-ctx.Done():
case <-donec:
if wgs.closeErr != nil {
- closeCh <- WatchResponse{closeErr: wgs.closeErr}
+ closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
break
}
// retry; may have dropped stream from no ctxs
@@ -332,6 +374,42 @@
return err
}
+// RequestProgress requests a progress notify response be sent in all watch channels.
+func (w *watcher) RequestProgress(ctx context.Context) (err error) {
+ ctxKey := streamKeyFromCtx(ctx)
+
+ w.mu.Lock()
+	if w.streams == nil {
+		// unlock before the early return so the mutex is not leaked
+		w.mu.Unlock()
+		return fmt.Errorf("no stream found for context")
+	}
+ wgs := w.streams[ctxKey]
+ if wgs == nil {
+ wgs = w.newWatcherGrpcStream(ctx)
+ w.streams[ctxKey] = wgs
+ }
+ donec := wgs.donec
+ reqc := wgs.reqc
+ w.mu.Unlock()
+
+ pr := &progressRequest{}
+
+ select {
+ case reqc <- pr:
+ return nil
+ case <-ctx.Done():
+		// "err" is never set before this point, so return the context error directly
+		return ctx.Err()
+ case <-donec:
+ if wgs.closeErr != nil {
+ return wgs.closeErr
+ }
+ // retry; may have dropped stream from no ctxs
+ return w.RequestProgress(ctx)
+ }
+}
+
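A usage sketch for the new method, assuming the same ctx that opened the watch, a connected *clientv3.Client "cli", and the "log" import:

```go
// Ask every watch substream on this context for a progress notify; useful to
// learn the current store revision during quiet periods.
if err := cli.RequestProgress(ctx); err != nil {
	log.Printf("progress request failed: %v", err)
}
// A subsequent WatchResponse with no events and IsProgressNotify() == true
// carries the up-to-date revision in its Header.
```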
func (w *watchGrpcStream) close() (err error) {
w.cancel()
<-w.donec
@@ -353,7 +431,9 @@
}
func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
- if resp.WatchId == -1 {
+ // check watch ID for backward compatibility (<= v3.3)
+ if resp.WatchId == -1 || (resp.Canceled && resp.CancelReason != "") {
+ w.closeErr = v3rpc.Error(errors.New(resp.CancelReason))
// failed; no channel
close(ws.recvc)
return
@@ -379,7 +459,7 @@
}
// close subscriber's channel
if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil {
- go w.sendCloseSubstream(ws, &WatchResponse{closeErr: w.closeErr})
+ go w.sendCloseSubstream(ws, &WatchResponse{Canceled: true, closeErr: w.closeErr})
} else if ws.outc != nil {
close(ws.outc)
}
@@ -434,31 +514,48 @@
cancelSet := make(map[int64]struct{})
+ var cur *pb.WatchResponse
for {
select {
// Watch() requested
- case wreq := <-w.reqc:
- outc := make(chan WatchResponse, 1)
- ws := &watcherStream{
- initReq: *wreq,
- id: -1,
- outc: outc,
- // unbuffered so resumes won't cause repeat events
- recvc: make(chan *WatchResponse),
+ case req := <-w.reqc:
+ switch wreq := req.(type) {
+ case *watchRequest:
+ outc := make(chan WatchResponse, 1)
+ // TODO: pass custom watch ID?
+ ws := &watcherStream{
+ initReq: *wreq,
+ id: -1,
+ outc: outc,
+ // unbuffered so resumes won't cause repeat events
+ recvc: make(chan *WatchResponse),
+ }
+
+ ws.donec = make(chan struct{})
+ w.wg.Add(1)
+ go w.serveSubstream(ws, w.resumec)
+
+ // queue up for watcher creation/resume
+ w.resuming = append(w.resuming, ws)
+ if len(w.resuming) == 1 {
+ // head of resume queue, can register a new watcher
+ wc.Send(ws.initReq.toPB())
+ }
+ case *progressRequest:
+ wc.Send(wreq.toPB())
}
- ws.donec = make(chan struct{})
- w.wg.Add(1)
- go w.serveSubstream(ws, w.resumec)
-
- // queue up for watcher creation/resume
- w.resuming = append(w.resuming, ws)
- if len(w.resuming) == 1 {
- // head of resume queue, can register a new watcher
- wc.Send(ws.initReq.toPB())
- }
- // New events from the watch client
+ // new events from the watch client
case pbresp := <-w.respc:
+ if cur == nil || pbresp.Created || pbresp.Canceled {
+ cur = pbresp
+ } else if cur != nil && cur.WatchId == pbresp.WatchId {
+ // merge new events
+ cur.Events = append(cur.Events, pbresp.Events...)
+ // update "Fragment" field; last response with "Fragment" == false
+ cur.Fragment = pbresp.Fragment
+ }
+
switch {
case pbresp.Created:
// response to head of queue creation
@@ -467,9 +564,14 @@
w.dispatchEvent(pbresp)
w.resuming[0] = nil
}
+
if ws := w.nextResume(); ws != nil {
wc.Send(ws.initReq.toPB())
}
+
+ // reset for next iteration
+ cur = nil
+
case pbresp.Canceled && pbresp.CompactRevision == 0:
delete(cancelSet, pbresp.WatchId)
if ws, ok := w.substreams[pbresp.WatchId]; ok {
@@ -477,15 +579,31 @@
close(ws.recvc)
closing[ws] = struct{}{}
}
+
+ // reset for next iteration
+ cur = nil
+
+ case cur.Fragment:
+ // watch response events are still fragmented
+ // continue to fetch next fragmented event arrival
+ continue
+
default:
// dispatch to appropriate watch stream
- if ok := w.dispatchEvent(pbresp); ok {
+ ok := w.dispatchEvent(cur)
+
+ // reset for next iteration
+ cur = nil
+
+ if ok {
break
}
+
// watch response on unexpected watch id; cancel id
if _, ok := cancelSet[pbresp.WatchId]; ok {
break
}
+
cancelSet[pbresp.WatchId] = struct{}{}
cr := &pb.WatchRequest_CancelRequest{
CancelRequest: &pb.WatchCancelRequest{
@@ -495,6 +613,7 @@
req := &pb.WatchRequest{RequestUnion: cr}
wc.Send(req)
}
+
// watch client failed on Recv; spawn another if possible
case err := <-w.errc:
if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader {
@@ -508,13 +627,15 @@
wc.Send(ws.initReq.toPB())
}
cancelSet = make(map[int64]struct{})
+
case <-w.ctx.Done():
return
+
case ws := <-w.closingc:
w.closeSubstream(ws)
delete(closing, ws)
+ // no more watchers on this stream, shutdown
if len(w.substreams)+len(w.resuming) == 0 {
- // no more watchers on this stream, shutdown
return
}
}
@@ -539,6 +660,7 @@
for i, ev := range pbresp.Events {
events[i] = (*Event)(ev)
}
+ // TODO: return watch ID?
wr := &WatchResponse{
Header: *pbresp.Header,
Events: events,
@@ -547,7 +669,31 @@
Canceled: pbresp.Canceled,
cancelReason: pbresp.CancelReason,
}
- ws, ok := w.substreams[pbresp.WatchId]
+
+	// watch IDs are zero-indexed, so progress notify watch responses requested
+	// via RequestProgress are assigned a watch ID of -1 to indicate they should be broadcast.
+ if wr.IsProgressNotify() && pbresp.WatchId == -1 {
+ return w.broadcastResponse(wr)
+ }
+
+ return w.unicastResponse(wr, pbresp.WatchId)
+}
+
+// broadcastResponse sends a watch response to all watch substreams.
+func (w *watchGrpcStream) broadcastResponse(wr *WatchResponse) bool {
+ for _, ws := range w.substreams {
+ select {
+ case ws.recvc <- wr:
+ case <-ws.donec:
+ }
+ }
+ return true
+}
+
+// unicastResponse sends a watch response to a specific watch substream.
+func (w *watchGrpcStream) unicastResponse(wr *WatchResponse, watchId int64) bool {
+ ws, ok := w.substreams[watchId]
if !ok {
return false
}
@@ -815,11 +961,19 @@
ProgressNotify: wr.progressNotify,
Filters: wr.filters,
PrevKv: wr.prevKV,
+ Fragment: wr.fragment,
}
cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
return &pb.WatchRequest{RequestUnion: cr}
}
+// toPB converts an internal progress request structure to its protobuf WatchRequest structure.
+func (pr *progressRequest) toPB() *pb.WatchRequest {
+ req := &pb.WatchProgressRequest{}
+ cr := &pb.WatchRequest_ProgressRequest{ProgressRequest: req}
+ return &pb.WatchRequest{RequestUnion: cr}
+}
+
func streamKeyFromCtx(ctx context.Context) string {
if md, ok := metadata.FromOutgoingContext(ctx); ok {
return fmt.Sprintf("%+v", md)