gRPC migration update
Change-Id: Icdd1a824948fa994cd36bd121c962f5ecf74e3cf
diff --git a/vendor/go.etcd.io/etcd/clientv3/README.md b/vendor/go.etcd.io/etcd/clientv3/README.md
deleted file mode 100644
index 6c6fe7c..0000000
--- a/vendor/go.etcd.io/etcd/clientv3/README.md
+++ /dev/null
@@ -1,85 +0,0 @@
-# etcd/clientv3
-
-[![Docs](https://img.shields.io/badge/docs-latest-green.svg)](https://etcd.io/docs)
-[![Godoc](https://img.shields.io/badge/go-documentation-blue.svg?style=flat-square)](https://godoc.org/go.etcd.io/etcd/clientv3)
-
-`etcd/clientv3` is the official Go etcd client for v3.
-
-## Install
-
-```bash
-go get go.etcd.io/etcd/clientv3
-```
-
-## Get started
-
-Create client using `clientv3.New`:
-
-```go
-cli, err := clientv3.New(clientv3.Config{
- Endpoints: []string{"localhost:2379", "localhost:22379", "localhost:32379"},
- DialTimeout: 5 * time.Second,
-})
-if err != nil {
- // handle error!
-}
-defer cli.Close()
-```
-
-etcd v3 uses [`gRPC`](https://www.grpc.io) for remote procedure calls. And `clientv3` uses
-[`grpc-go`](https://github.com/grpc/grpc-go) to connect to etcd. Make sure to close the client after using it.
-If the client is not closed, the connection will have leaky goroutines. To specify client request timeout,
-pass `context.WithTimeout` to APIs:
-
-```go
-ctx, cancel := context.WithTimeout(context.Background(), timeout)
-resp, err := cli.Put(ctx, "sample_key", "sample_value")
-cancel()
-if err != nil {
- // handle error!
-}
-// use the response
-```
-
-For full compatibility, it is recommended to vendor builds using etcd's vendored packages, using tools like `golang/dep`, as in [vendor directories](https://golang.org/cmd/go/#hdr-Vendor_Directories).
-
-## Error Handling
-
-etcd client returns 2 types of errors:
-
-1. context error: canceled or deadline exceeded.
-2. gRPC error: see [api/v3rpc/rpctypes](https://godoc.org/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes).
-
-Here is the example code to handle client errors:
-
-```go
-resp, err := cli.Put(ctx, "", "")
-if err != nil {
- switch err {
- case context.Canceled:
- log.Fatalf("ctx is canceled by another routine: %v", err)
- case context.DeadlineExceeded:
- log.Fatalf("ctx is attached with a deadline is exceeded: %v", err)
- case rpctypes.ErrEmptyKey:
- log.Fatalf("client-side error: %v", err)
- default:
- log.Fatalf("bad cluster endpoints, which are not etcd servers: %v", err)
- }
-}
-```
-
-## Metrics
-
-The etcd client optionally exposes RPC metrics through [go-grpc-prometheus](https://github.com/grpc-ecosystem/go-grpc-prometheus). See the [examples](https://github.com/etcd-io/etcd/blob/master/clientv3/example_metrics_test.go).
-
-## Namespacing
-
-The [namespace](https://godoc.org/go.etcd.io/etcd/clientv3/namespace) package provides `clientv3` interface wrappers to transparently isolate client requests to a user-defined prefix.
-
-## Request size limit
-
-Client request size limit is configurable via `clientv3.Config.MaxCallSendMsgSize` and `MaxCallRecvMsgSize` in bytes. If none given, client request send limit defaults to 2 MiB including gRPC overhead bytes. And receive limit defaults to `math.MaxInt32`.
-
-## Examples
-
-More code examples can be found at [GoDoc](https://godoc.org/go.etcd.io/etcd/clientv3).
diff --git a/vendor/go.etcd.io/etcd/clientv3/auth.go b/vendor/go.etcd.io/etcd/clientv3/auth.go
index c954f1b..edccf1a 100644
--- a/vendor/go.etcd.io/etcd/clientv3/auth.go
+++ b/vendor/go.etcd.io/etcd/clientv3/auth.go
@@ -19,8 +19,9 @@
"fmt"
"strings"
- "go.etcd.io/etcd/auth/authpb"
- pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
+ "github.com/coreos/etcd/auth/authpb"
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+
"google.golang.org/grpc"
)
@@ -52,8 +53,6 @@
PermReadWrite = authpb.READWRITE
)
-type UserAddOptions authpb.UserAddOptions
-
type Auth interface {
// AuthEnable enables auth of an etcd cluster.
AuthEnable(ctx context.Context) (*AuthEnableResponse, error)
@@ -64,9 +63,6 @@
// UserAdd adds a new user to an etcd cluster.
UserAdd(ctx context.Context, name string, password string) (*AuthUserAddResponse, error)
- // UserAddWithOptions adds a new user to an etcd cluster with some options.
- UserAddWithOptions(ctx context.Context, name string, password string, opt *UserAddOptions) (*AuthUserAddResponse, error)
-
// UserDelete deletes a user from an etcd cluster.
UserDelete(ctx context.Context, name string) (*AuthUserDeleteResponse, error)
@@ -104,75 +100,70 @@
RoleDelete(ctx context.Context, role string) (*AuthRoleDeleteResponse, error)
}
-type authClient struct {
+type auth struct {
remote pb.AuthClient
callOpts []grpc.CallOption
}
func NewAuth(c *Client) Auth {
- api := &authClient{remote: RetryAuthClient(c)}
+ api := &auth{remote: RetryAuthClient(c)}
if c != nil {
api.callOpts = c.callOpts
}
return api
}
-func (auth *authClient) AuthEnable(ctx context.Context) (*AuthEnableResponse, error) {
+func (auth *auth) AuthEnable(ctx context.Context) (*AuthEnableResponse, error) {
resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}, auth.callOpts...)
return (*AuthEnableResponse)(resp), toErr(ctx, err)
}
-func (auth *authClient) AuthDisable(ctx context.Context) (*AuthDisableResponse, error) {
+func (auth *auth) AuthDisable(ctx context.Context) (*AuthDisableResponse, error) {
resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{}, auth.callOpts...)
return (*AuthDisableResponse)(resp), toErr(ctx, err)
}
-func (auth *authClient) UserAdd(ctx context.Context, name string, password string) (*AuthUserAddResponse, error) {
- resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password, Options: &authpb.UserAddOptions{NoPassword: false}}, auth.callOpts...)
+func (auth *auth) UserAdd(ctx context.Context, name string, password string) (*AuthUserAddResponse, error) {
+ resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password}, auth.callOpts...)
return (*AuthUserAddResponse)(resp), toErr(ctx, err)
}
-func (auth *authClient) UserAddWithOptions(ctx context.Context, name string, password string, options *UserAddOptions) (*AuthUserAddResponse, error) {
- resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password, Options: (*authpb.UserAddOptions)(options)}, auth.callOpts...)
- return (*AuthUserAddResponse)(resp), toErr(ctx, err)
-}
-
-func (auth *authClient) UserDelete(ctx context.Context, name string) (*AuthUserDeleteResponse, error) {
+func (auth *auth) UserDelete(ctx context.Context, name string) (*AuthUserDeleteResponse, error) {
resp, err := auth.remote.UserDelete(ctx, &pb.AuthUserDeleteRequest{Name: name}, auth.callOpts...)
return (*AuthUserDeleteResponse)(resp), toErr(ctx, err)
}
-func (auth *authClient) UserChangePassword(ctx context.Context, name string, password string) (*AuthUserChangePasswordResponse, error) {
+func (auth *auth) UserChangePassword(ctx context.Context, name string, password string) (*AuthUserChangePasswordResponse, error) {
resp, err := auth.remote.UserChangePassword(ctx, &pb.AuthUserChangePasswordRequest{Name: name, Password: password}, auth.callOpts...)
return (*AuthUserChangePasswordResponse)(resp), toErr(ctx, err)
}
-func (auth *authClient) UserGrantRole(ctx context.Context, user string, role string) (*AuthUserGrantRoleResponse, error) {
+func (auth *auth) UserGrantRole(ctx context.Context, user string, role string) (*AuthUserGrantRoleResponse, error) {
resp, err := auth.remote.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: user, Role: role}, auth.callOpts...)
return (*AuthUserGrantRoleResponse)(resp), toErr(ctx, err)
}
-func (auth *authClient) UserGet(ctx context.Context, name string) (*AuthUserGetResponse, error) {
+func (auth *auth) UserGet(ctx context.Context, name string) (*AuthUserGetResponse, error) {
resp, err := auth.remote.UserGet(ctx, &pb.AuthUserGetRequest{Name: name}, auth.callOpts...)
return (*AuthUserGetResponse)(resp), toErr(ctx, err)
}
-func (auth *authClient) UserList(ctx context.Context) (*AuthUserListResponse, error) {
+func (auth *auth) UserList(ctx context.Context) (*AuthUserListResponse, error) {
resp, err := auth.remote.UserList(ctx, &pb.AuthUserListRequest{}, auth.callOpts...)
return (*AuthUserListResponse)(resp), toErr(ctx, err)
}
-func (auth *authClient) UserRevokeRole(ctx context.Context, name string, role string) (*AuthUserRevokeRoleResponse, error) {
+func (auth *auth) UserRevokeRole(ctx context.Context, name string, role string) (*AuthUserRevokeRoleResponse, error) {
resp, err := auth.remote.UserRevokeRole(ctx, &pb.AuthUserRevokeRoleRequest{Name: name, Role: role}, auth.callOpts...)
return (*AuthUserRevokeRoleResponse)(resp), toErr(ctx, err)
}
-func (auth *authClient) RoleAdd(ctx context.Context, name string) (*AuthRoleAddResponse, error) {
+func (auth *auth) RoleAdd(ctx context.Context, name string) (*AuthRoleAddResponse, error) {
resp, err := auth.remote.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: name}, auth.callOpts...)
return (*AuthRoleAddResponse)(resp), toErr(ctx, err)
}
-func (auth *authClient) RoleGrantPermission(ctx context.Context, name string, key, rangeEnd string, permType PermissionType) (*AuthRoleGrantPermissionResponse, error) {
+func (auth *auth) RoleGrantPermission(ctx context.Context, name string, key, rangeEnd string, permType PermissionType) (*AuthRoleGrantPermissionResponse, error) {
perm := &authpb.Permission{
Key: []byte(key),
RangeEnd: []byte(rangeEnd),
@@ -182,22 +173,22 @@
return (*AuthRoleGrantPermissionResponse)(resp), toErr(ctx, err)
}
-func (auth *authClient) RoleGet(ctx context.Context, role string) (*AuthRoleGetResponse, error) {
+func (auth *auth) RoleGet(ctx context.Context, role string) (*AuthRoleGetResponse, error) {
resp, err := auth.remote.RoleGet(ctx, &pb.AuthRoleGetRequest{Role: role}, auth.callOpts...)
return (*AuthRoleGetResponse)(resp), toErr(ctx, err)
}
-func (auth *authClient) RoleList(ctx context.Context) (*AuthRoleListResponse, error) {
+func (auth *auth) RoleList(ctx context.Context) (*AuthRoleListResponse, error) {
resp, err := auth.remote.RoleList(ctx, &pb.AuthRoleListRequest{}, auth.callOpts...)
return (*AuthRoleListResponse)(resp), toErr(ctx, err)
}
-func (auth *authClient) RoleRevokePermission(ctx context.Context, role string, key, rangeEnd string) (*AuthRoleRevokePermissionResponse, error) {
- resp, err := auth.remote.RoleRevokePermission(ctx, &pb.AuthRoleRevokePermissionRequest{Role: role, Key: []byte(key), RangeEnd: []byte(rangeEnd)}, auth.callOpts...)
+func (auth *auth) RoleRevokePermission(ctx context.Context, role string, key, rangeEnd string) (*AuthRoleRevokePermissionResponse, error) {
+ resp, err := auth.remote.RoleRevokePermission(ctx, &pb.AuthRoleRevokePermissionRequest{Role: role, Key: key, RangeEnd: rangeEnd}, auth.callOpts...)
return (*AuthRoleRevokePermissionResponse)(resp), toErr(ctx, err)
}
-func (auth *authClient) RoleDelete(ctx context.Context, role string) (*AuthRoleDeleteResponse, error) {
+func (auth *auth) RoleDelete(ctx context.Context, role string) (*AuthRoleDeleteResponse, error) {
resp, err := auth.remote.RoleDelete(ctx, &pb.AuthRoleDeleteRequest{Role: role}, auth.callOpts...)
return (*AuthRoleDeleteResponse)(resp), toErr(ctx, err)
}
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/balancer.go b/vendor/go.etcd.io/etcd/clientv3/balancer/balancer.go
deleted file mode 100644
index d02a7ee..0000000
--- a/vendor/go.etcd.io/etcd/clientv3/balancer/balancer.go
+++ /dev/null
@@ -1,293 +0,0 @@
-// Copyright 2018 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// Package balancer implements client balancer.
-package balancer
-
-import (
- "strconv"
- "sync"
- "time"
-
- "go.etcd.io/etcd/clientv3/balancer/connectivity"
- "go.etcd.io/etcd/clientv3/balancer/picker"
-
- "go.uber.org/zap"
- "google.golang.org/grpc/balancer"
- grpcconnectivity "google.golang.org/grpc/connectivity"
- "google.golang.org/grpc/resolver"
- _ "google.golang.org/grpc/resolver/dns" // register DNS resolver
- _ "google.golang.org/grpc/resolver/passthrough" // register passthrough resolver
-)
-
-// Config defines balancer configurations.
-type Config struct {
- // Policy configures balancer policy.
- Policy picker.Policy
-
- // Picker implements gRPC picker.
- // Leave empty if "Policy" field is not custom.
- // TODO: currently custom policy is not supported.
- // Picker picker.Picker
-
- // Name defines an additional name for balancer.
- // Useful for balancer testing to avoid register conflicts.
- // If empty, defaults to policy name.
- Name string
-
- // Logger configures balancer logging.
- // If nil, logs are discarded.
- Logger *zap.Logger
-}
-
-// RegisterBuilder creates and registers a builder. Since this function calls balancer.Register, it
-// must be invoked at initialization time.
-func RegisterBuilder(cfg Config) {
- bb := &builder{cfg}
- balancer.Register(bb)
-
- bb.cfg.Logger.Debug(
- "registered balancer",
- zap.String("policy", bb.cfg.Policy.String()),
- zap.String("name", bb.cfg.Name),
- )
-}
-
-type builder struct {
- cfg Config
-}
-
-// Build is called initially when creating "ccBalancerWrapper".
-// "grpc.Dial" is called to this client connection.
-// Then, resolved addresses will be handled via "HandleResolvedAddrs".
-func (b *builder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
- bb := &baseBalancer{
- id: strconv.FormatInt(time.Now().UnixNano(), 36),
- policy: b.cfg.Policy,
- name: b.cfg.Name,
- lg: b.cfg.Logger,
-
- addrToSc: make(map[resolver.Address]balancer.SubConn),
- scToAddr: make(map[balancer.SubConn]resolver.Address),
- scToSt: make(map[balancer.SubConn]grpcconnectivity.State),
-
- currentConn: nil,
- connectivityRecorder: connectivity.New(b.cfg.Logger),
-
- // initialize picker always returns "ErrNoSubConnAvailable"
- picker: picker.NewErr(balancer.ErrNoSubConnAvailable),
- }
-
- // TODO: support multiple connections
- bb.mu.Lock()
- bb.currentConn = cc
- bb.mu.Unlock()
-
- bb.lg.Info(
- "built balancer",
- zap.String("balancer-id", bb.id),
- zap.String("policy", bb.policy.String()),
- zap.String("resolver-target", cc.Target()),
- )
- return bb
-}
-
-// Name implements "grpc/balancer.Builder" interface.
-func (b *builder) Name() string { return b.cfg.Name }
-
-// Balancer defines client balancer interface.
-type Balancer interface {
- // Balancer is called on specified client connection. Client initiates gRPC
- // connection with "grpc.Dial(addr, grpc.WithBalancerName)", and then those resolved
- // addresses are passed to "grpc/balancer.Balancer.HandleResolvedAddrs".
- // For each resolved address, balancer calls "balancer.ClientConn.NewSubConn".
- // "grpc/balancer.Balancer.HandleSubConnStateChange" is called when connectivity state
- // changes, thus requires failover logic in this method.
- balancer.Balancer
-
- // Picker calls "Pick" for every client request.
- picker.Picker
-}
-
-type baseBalancer struct {
- id string
- policy picker.Policy
- name string
- lg *zap.Logger
-
- mu sync.RWMutex
-
- addrToSc map[resolver.Address]balancer.SubConn
- scToAddr map[balancer.SubConn]resolver.Address
- scToSt map[balancer.SubConn]grpcconnectivity.State
-
- currentConn balancer.ClientConn
- connectivityRecorder connectivity.Recorder
-
- picker picker.Picker
-}
-
-// HandleResolvedAddrs implements "grpc/balancer.Balancer" interface.
-// gRPC sends initial or updated resolved addresses from "Build".
-func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
- if err != nil {
- bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err))
- return
- }
- bb.lg.Info("resolved",
- zap.String("picker", bb.picker.String()),
- zap.String("balancer-id", bb.id),
- zap.Strings("addresses", addrsToStrings(addrs)),
- )
-
- bb.mu.Lock()
- defer bb.mu.Unlock()
-
- resolved := make(map[resolver.Address]struct{})
- for _, addr := range addrs {
- resolved[addr] = struct{}{}
- if _, ok := bb.addrToSc[addr]; !ok {
- sc, err := bb.currentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
- if err != nil {
- bb.lg.Warn("NewSubConn failed", zap.String("picker", bb.picker.String()), zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr))
- continue
- }
- bb.lg.Info("created subconn", zap.String("address", addr.Addr))
- bb.addrToSc[addr] = sc
- bb.scToAddr[sc] = addr
- bb.scToSt[sc] = grpcconnectivity.Idle
- sc.Connect()
- }
- }
-
- for addr, sc := range bb.addrToSc {
- if _, ok := resolved[addr]; !ok {
- // was removed by resolver or failed to create subconn
- bb.currentConn.RemoveSubConn(sc)
- delete(bb.addrToSc, addr)
-
- bb.lg.Info(
- "removed subconn",
- zap.String("picker", bb.picker.String()),
- zap.String("balancer-id", bb.id),
- zap.String("address", addr.Addr),
- zap.String("subconn", scToString(sc)),
- )
-
- // Keep the state of this sc in bb.scToSt until sc's state becomes Shutdown.
- // The entry will be deleted in HandleSubConnStateChange.
- // (DO NOT) delete(bb.scToAddr, sc)
- // (DO NOT) delete(bb.scToSt, sc)
- }
- }
-}
-
-// HandleSubConnStateChange implements "grpc/balancer.Balancer" interface.
-func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s grpcconnectivity.State) {
- bb.mu.Lock()
- defer bb.mu.Unlock()
-
- old, ok := bb.scToSt[sc]
- if !ok {
- bb.lg.Warn(
- "state change for an unknown subconn",
- zap.String("picker", bb.picker.String()),
- zap.String("balancer-id", bb.id),
- zap.String("subconn", scToString(sc)),
- zap.Int("subconn-size", len(bb.scToAddr)),
- zap.String("state", s.String()),
- )
- return
- }
-
- bb.lg.Info(
- "state changed",
- zap.String("picker", bb.picker.String()),
- zap.String("balancer-id", bb.id),
- zap.Bool("connected", s == grpcconnectivity.Ready),
- zap.String("subconn", scToString(sc)),
- zap.Int("subconn-size", len(bb.scToAddr)),
- zap.String("address", bb.scToAddr[sc].Addr),
- zap.String("old-state", old.String()),
- zap.String("new-state", s.String()),
- )
-
- bb.scToSt[sc] = s
- switch s {
- case grpcconnectivity.Idle:
- sc.Connect()
- case grpcconnectivity.Shutdown:
- // When an address was removed by resolver, b called RemoveSubConn but
- // kept the sc's state in scToSt. Remove state for this sc here.
- delete(bb.scToAddr, sc)
- delete(bb.scToSt, sc)
- }
-
- oldAggrState := bb.connectivityRecorder.GetCurrentState()
- bb.connectivityRecorder.RecordTransition(old, s)
-
- // Update balancer picker when one of the following happens:
- // - this sc became ready from not-ready
- // - this sc became not-ready from ready
- // - the aggregated state of balancer became TransientFailure from non-TransientFailure
- // - the aggregated state of balancer became non-TransientFailure from TransientFailure
- if (s == grpcconnectivity.Ready) != (old == grpcconnectivity.Ready) ||
- (bb.connectivityRecorder.GetCurrentState() == grpcconnectivity.TransientFailure) != (oldAggrState == grpcconnectivity.TransientFailure) {
- bb.updatePicker()
- }
-
- bb.currentConn.UpdateBalancerState(bb.connectivityRecorder.GetCurrentState(), bb.picker)
-}
-
-func (bb *baseBalancer) updatePicker() {
- if bb.connectivityRecorder.GetCurrentState() == grpcconnectivity.TransientFailure {
- bb.picker = picker.NewErr(balancer.ErrTransientFailure)
- bb.lg.Info(
- "updated picker to transient error picker",
- zap.String("picker", bb.picker.String()),
- zap.String("balancer-id", bb.id),
- zap.String("policy", bb.policy.String()),
- )
- return
- }
-
- // only pass ready subconns to picker
- scToAddr := make(map[balancer.SubConn]resolver.Address)
- for addr, sc := range bb.addrToSc {
- if st, ok := bb.scToSt[sc]; ok && st == grpcconnectivity.Ready {
- scToAddr[sc] = addr
- }
- }
-
- bb.picker = picker.New(picker.Config{
- Policy: bb.policy,
- Logger: bb.lg,
- SubConnToResolverAddress: scToAddr,
- })
- bb.lg.Info(
- "updated picker",
- zap.String("picker", bb.picker.String()),
- zap.String("balancer-id", bb.id),
- zap.String("policy", bb.policy.String()),
- zap.Strings("subconn-ready", scsToStrings(scToAddr)),
- zap.Int("subconn-size", len(scToAddr)),
- )
-}
-
-// Close implements "grpc/balancer.Balancer" interface.
-// Close is a nop because base balancer doesn't have internal state to clean up,
-// and it doesn't need to call RemoveSubConn for the SubConns.
-func (bb *baseBalancer) Close() {
- // TODO
-}
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/connectivity/connectivity.go b/vendor/go.etcd.io/etcd/clientv3/balancer/connectivity/connectivity.go
deleted file mode 100644
index 4c4ad36..0000000
--- a/vendor/go.etcd.io/etcd/clientv3/balancer/connectivity/connectivity.go
+++ /dev/null
@@ -1,93 +0,0 @@
-// Copyright 2019 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// Package connectivity implements client connectivity operations.
-package connectivity
-
-import (
- "sync"
-
- "go.uber.org/zap"
- "google.golang.org/grpc/connectivity"
-)
-
-// Recorder records gRPC connectivity.
-type Recorder interface {
- GetCurrentState() connectivity.State
- RecordTransition(oldState, newState connectivity.State)
-}
-
-// New returns a new Recorder.
-func New(lg *zap.Logger) Recorder {
- return &recorder{lg: lg}
-}
-
-// recorder takes the connectivity states of multiple SubConns
-// and returns one aggregated connectivity state.
-// ref. https://github.com/grpc/grpc-go/blob/master/balancer/balancer.go
-type recorder struct {
- lg *zap.Logger
-
- mu sync.RWMutex
-
- cur connectivity.State
-
- numReady uint64 // Number of addrConns in ready state.
- numConnecting uint64 // Number of addrConns in connecting state.
- numTransientFailure uint64 // Number of addrConns in transientFailure.
-}
-
-func (rc *recorder) GetCurrentState() (state connectivity.State) {
- rc.mu.RLock()
- defer rc.mu.RUnlock()
- return rc.cur
-}
-
-// RecordTransition records state change happening in subConn and based on that
-// it evaluates what aggregated state should be.
-//
-// - If at least one SubConn in Ready, the aggregated state is Ready;
-// - Else if at least one SubConn in Connecting, the aggregated state is Connecting;
-// - Else the aggregated state is TransientFailure.
-//
-// Idle and Shutdown are not considered.
-//
-// ref. https://github.com/grpc/grpc-go/blob/master/balancer/balancer.go
-func (rc *recorder) RecordTransition(oldState, newState connectivity.State) {
- rc.mu.Lock()
- defer rc.mu.Unlock()
-
- for idx, state := range []connectivity.State{oldState, newState} {
- updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
- switch state {
- case connectivity.Ready:
- rc.numReady += updateVal
- case connectivity.Connecting:
- rc.numConnecting += updateVal
- case connectivity.TransientFailure:
- rc.numTransientFailure += updateVal
- default:
- rc.lg.Warn("connectivity recorder received unknown state", zap.String("connectivity-state", state.String()))
- }
- }
-
- switch { // must be exclusive, no overlap
- case rc.numReady > 0:
- rc.cur = connectivity.Ready
- case rc.numConnecting > 0:
- rc.cur = connectivity.Connecting
- default:
- rc.cur = connectivity.TransientFailure
- }
-}
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/picker/doc.go b/vendor/go.etcd.io/etcd/clientv3/balancer/picker/doc.go
deleted file mode 100644
index 35dabf5..0000000
--- a/vendor/go.etcd.io/etcd/clientv3/balancer/picker/doc.go
+++ /dev/null
@@ -1,16 +0,0 @@
-// Copyright 2018 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// Package picker defines/implements client balancer picker policy.
-package picker
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/picker/err.go b/vendor/go.etcd.io/etcd/clientv3/balancer/picker/err.go
deleted file mode 100644
index 9e04378..0000000
--- a/vendor/go.etcd.io/etcd/clientv3/balancer/picker/err.go
+++ /dev/null
@@ -1,39 +0,0 @@
-// Copyright 2018 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package picker
-
-import (
- "context"
-
- "google.golang.org/grpc/balancer"
-)
-
-// NewErr returns a picker that always returns err on "Pick".
-func NewErr(err error) Picker {
- return &errPicker{p: Error, err: err}
-}
-
-type errPicker struct {
- p Policy
- err error
-}
-
-func (ep *errPicker) String() string {
- return ep.p.String()
-}
-
-func (ep *errPicker) Pick(context.Context, balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
- return nil, nil, ep.err
-}
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/picker/picker.go b/vendor/go.etcd.io/etcd/clientv3/balancer/picker/picker.go
deleted file mode 100644
index bd1a5d2..0000000
--- a/vendor/go.etcd.io/etcd/clientv3/balancer/picker/picker.go
+++ /dev/null
@@ -1,91 +0,0 @@
-// Copyright 2018 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package picker
-
-import (
- "fmt"
-
- "go.uber.org/zap"
- "google.golang.org/grpc/balancer"
- "google.golang.org/grpc/resolver"
-)
-
-// Picker defines balancer Picker methods.
-type Picker interface {
- balancer.Picker
- String() string
-}
-
-// Config defines picker configuration.
-type Config struct {
- // Policy specifies etcd clientv3's built in balancer policy.
- Policy Policy
-
- // Logger defines picker logging object.
- Logger *zap.Logger
-
- // SubConnToResolverAddress maps each gRPC sub-connection to an address.
- // Basically, it is a list of addresses that the Picker can pick from.
- SubConnToResolverAddress map[balancer.SubConn]resolver.Address
-}
-
-// Policy defines balancer picker policy.
-type Policy uint8
-
-const (
- // Error is error picker policy.
- Error Policy = iota
-
- // RoundrobinBalanced balances loads over multiple endpoints
- // and implements failover in roundrobin fashion.
- RoundrobinBalanced
-
- // Custom defines custom balancer picker.
- // TODO: custom picker is not supported yet.
- Custom
-)
-
-func (p Policy) String() string {
- switch p {
- case Error:
- return "picker-error"
-
- case RoundrobinBalanced:
- return "picker-roundrobin-balanced"
-
- case Custom:
- panic("'custom' picker policy is not supported yet")
-
- default:
- panic(fmt.Errorf("invalid balancer picker policy (%d)", p))
- }
-}
-
-// New creates a new Picker.
-func New(cfg Config) Picker {
- switch cfg.Policy {
- case Error:
- panic("'error' picker policy is not supported here; use 'picker.NewErr'")
-
- case RoundrobinBalanced:
- return newRoundrobinBalanced(cfg)
-
- case Custom:
- panic("'custom' picker policy is not supported yet")
-
- default:
- panic(fmt.Errorf("invalid balancer picker policy (%d)", cfg.Policy))
- }
-}
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/picker/roundrobin_balanced.go b/vendor/go.etcd.io/etcd/clientv3/balancer/picker/roundrobin_balanced.go
deleted file mode 100644
index 1b8b285..0000000
--- a/vendor/go.etcd.io/etcd/clientv3/balancer/picker/roundrobin_balanced.go
+++ /dev/null
@@ -1,95 +0,0 @@
-// Copyright 2018 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package picker
-
-import (
- "context"
- "sync"
-
- "go.uber.org/zap"
- "go.uber.org/zap/zapcore"
- "google.golang.org/grpc/balancer"
- "google.golang.org/grpc/resolver"
-)
-
-// newRoundrobinBalanced returns a new roundrobin balanced picker.
-func newRoundrobinBalanced(cfg Config) Picker {
- scs := make([]balancer.SubConn, 0, len(cfg.SubConnToResolverAddress))
- for sc := range cfg.SubConnToResolverAddress {
- scs = append(scs, sc)
- }
- return &rrBalanced{
- p: RoundrobinBalanced,
- lg: cfg.Logger,
- scs: scs,
- scToAddr: cfg.SubConnToResolverAddress,
- }
-}
-
-type rrBalanced struct {
- p Policy
-
- lg *zap.Logger
-
- mu sync.RWMutex
- next int
- scs []balancer.SubConn
- scToAddr map[balancer.SubConn]resolver.Address
-}
-
-func (rb *rrBalanced) String() string { return rb.p.String() }
-
-// Pick is called for every client request.
-func (rb *rrBalanced) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
- rb.mu.RLock()
- n := len(rb.scs)
- rb.mu.RUnlock()
- if n == 0 {
- return nil, nil, balancer.ErrNoSubConnAvailable
- }
-
- rb.mu.Lock()
- cur := rb.next
- sc := rb.scs[cur]
- picked := rb.scToAddr[sc].Addr
- rb.next = (rb.next + 1) % len(rb.scs)
- rb.mu.Unlock()
-
- rb.lg.Debug(
- "picked",
- zap.String("picker", rb.p.String()),
- zap.String("address", picked),
- zap.Int("subconn-index", cur),
- zap.Int("subconn-size", n),
- )
-
- doneFunc := func(info balancer.DoneInfo) {
- // TODO: error handling?
- fss := []zapcore.Field{
- zap.Error(info.Err),
- zap.String("picker", rb.p.String()),
- zap.String("address", picked),
- zap.Bool("success", info.Err == nil),
- zap.Bool("bytes-sent", info.BytesSent),
- zap.Bool("bytes-received", info.BytesReceived),
- }
- if info.Err == nil {
- rb.lg.Debug("balancer done", fss...)
- } else {
- rb.lg.Warn("balancer failed", fss...)
- }
- }
- return sc, doneFunc, nil
-}
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/resolver/endpoint/endpoint.go b/vendor/go.etcd.io/etcd/clientv3/balancer/resolver/endpoint/endpoint.go
deleted file mode 100644
index 1f32039..0000000
--- a/vendor/go.etcd.io/etcd/clientv3/balancer/resolver/endpoint/endpoint.go
+++ /dev/null
@@ -1,240 +0,0 @@
-// Copyright 2018 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// Package endpoint resolves etcd entpoints using grpc targets of the form 'endpoint://<id>/<endpoint>'.
-package endpoint
-
-import (
- "fmt"
- "net/url"
- "strings"
- "sync"
-
- "google.golang.org/grpc/resolver"
-)
-
-const scheme = "endpoint"
-
-var (
- targetPrefix = fmt.Sprintf("%s://", scheme)
-
- bldr *builder
-)
-
-func init() {
- bldr = &builder{
- resolverGroups: make(map[string]*ResolverGroup),
- }
- resolver.Register(bldr)
-}
-
-type builder struct {
- mu sync.RWMutex
- resolverGroups map[string]*ResolverGroup
-}
-
-// NewResolverGroup creates a new ResolverGroup with the given id.
-func NewResolverGroup(id string) (*ResolverGroup, error) {
- return bldr.newResolverGroup(id)
-}
-
-// ResolverGroup keeps all endpoints of resolvers using a common endpoint://<id>/ target
-// up-to-date.
-type ResolverGroup struct {
- mu sync.RWMutex
- id string
- endpoints []string
- resolvers []*Resolver
-}
-
-func (e *ResolverGroup) addResolver(r *Resolver) {
- e.mu.Lock()
- addrs := epsToAddrs(e.endpoints...)
- e.resolvers = append(e.resolvers, r)
- e.mu.Unlock()
- r.cc.NewAddress(addrs)
-}
-
-func (e *ResolverGroup) removeResolver(r *Resolver) {
- e.mu.Lock()
- for i, er := range e.resolvers {
- if er == r {
- e.resolvers = append(e.resolvers[:i], e.resolvers[i+1:]...)
- break
- }
- }
- e.mu.Unlock()
-}
-
-// SetEndpoints updates the endpoints for ResolverGroup. All registered resolver are updated
-// immediately with the new endpoints.
-func (e *ResolverGroup) SetEndpoints(endpoints []string) {
- addrs := epsToAddrs(endpoints...)
- e.mu.Lock()
- e.endpoints = endpoints
- for _, r := range e.resolvers {
- r.cc.NewAddress(addrs)
- }
- e.mu.Unlock()
-}
-
-// Target constructs a endpoint target using the endpoint id of the ResolverGroup.
-func (e *ResolverGroup) Target(endpoint string) string {
- return Target(e.id, endpoint)
-}
-
-// Target constructs a endpoint resolver target.
-func Target(id, endpoint string) string {
- return fmt.Sprintf("%s://%s/%s", scheme, id, endpoint)
-}
-
-// IsTarget checks if a given target string in an endpoint resolver target.
-func IsTarget(target string) bool {
- return strings.HasPrefix(target, "endpoint://")
-}
-
-func (e *ResolverGroup) Close() {
- bldr.close(e.id)
-}
-
-// Build creates or reuses an etcd resolver for the etcd cluster name identified by the authority part of the target.
-func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
- if len(target.Authority) < 1 {
- return nil, fmt.Errorf("'etcd' target scheme requires non-empty authority identifying etcd cluster being routed to")
- }
- id := target.Authority
- es, err := b.getResolverGroup(id)
- if err != nil {
- return nil, fmt.Errorf("failed to build resolver: %v", err)
- }
- r := &Resolver{
- endpointID: id,
- cc: cc,
- }
- es.addResolver(r)
- return r, nil
-}
-
-func (b *builder) newResolverGroup(id string) (*ResolverGroup, error) {
- b.mu.RLock()
- _, ok := b.resolverGroups[id]
- b.mu.RUnlock()
- if ok {
- return nil, fmt.Errorf("Endpoint already exists for id: %s", id)
- }
-
- es := &ResolverGroup{id: id}
- b.mu.Lock()
- b.resolverGroups[id] = es
- b.mu.Unlock()
- return es, nil
-}
-
-func (b *builder) getResolverGroup(id string) (*ResolverGroup, error) {
- b.mu.RLock()
- es, ok := b.resolverGroups[id]
- b.mu.RUnlock()
- if !ok {
- return nil, fmt.Errorf("ResolverGroup not found for id: %s", id)
- }
- return es, nil
-}
-
-func (b *builder) close(id string) {
- b.mu.Lock()
- delete(b.resolverGroups, id)
- b.mu.Unlock()
-}
-
-func (b *builder) Scheme() string {
- return scheme
-}
-
-// Resolver provides a resolver for a single etcd cluster, identified by name.
-type Resolver struct {
- endpointID string
- cc resolver.ClientConn
- sync.RWMutex
-}
-
-// TODO: use balancer.epsToAddrs
-func epsToAddrs(eps ...string) (addrs []resolver.Address) {
- addrs = make([]resolver.Address, 0, len(eps))
- for _, ep := range eps {
- addrs = append(addrs, resolver.Address{Addr: ep})
- }
- return addrs
-}
-
-func (*Resolver) ResolveNow(o resolver.ResolveNowOption) {}
-
-func (r *Resolver) Close() {
- es, err := bldr.getResolverGroup(r.endpointID)
- if err != nil {
- return
- }
- es.removeResolver(r)
-}
-
-// ParseEndpoint endpoint parses an endpoint of the form
-// (http|https)://<host>*|(unix|unixs)://<path>)
-// and returns a protocol ('tcp' or 'unix'),
-// host (or filepath if a unix socket),
-// scheme (http, https, unix, unixs).
-func ParseEndpoint(endpoint string) (proto string, host string, scheme string) {
- proto = "tcp"
- host = endpoint
- url, uerr := url.Parse(endpoint)
- if uerr != nil || !strings.Contains(endpoint, "://") {
- return proto, host, scheme
- }
- scheme = url.Scheme
-
- // strip scheme:// prefix since grpc dials by host
- host = url.Host
- switch url.Scheme {
- case "http", "https":
- case "unix", "unixs":
- proto = "unix"
- host = url.Host + url.Path
- default:
- proto, host = "", ""
- }
- return proto, host, scheme
-}
-
-// ParseTarget parses a endpoint://<id>/<endpoint> string and returns the parsed id and endpoint.
-// If the target is malformed, an error is returned.
-func ParseTarget(target string) (string, string, error) {
- noPrefix := strings.TrimPrefix(target, targetPrefix)
- if noPrefix == target {
- return "", "", fmt.Errorf("malformed target, %s prefix is required: %s", targetPrefix, target)
- }
- parts := strings.SplitN(noPrefix, "/", 2)
- if len(parts) != 2 {
- return "", "", fmt.Errorf("malformed target, expected %s://<id>/<endpoint>, but got %s", scheme, target)
- }
- return parts[0], parts[1], nil
-}
-
-// ParseHostPort splits a "<host>:<port>" string into the host and port parts.
-// The port part is optional.
-func ParseHostPort(hostPort string) (host string, port string) {
- parts := strings.SplitN(hostPort, ":", 2)
- host = parts[0]
- if len(parts) > 1 {
- port = parts[1]
- }
- return host, port
-}
diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/utils.go b/vendor/go.etcd.io/etcd/clientv3/balancer/utils.go
deleted file mode 100644
index 48eb875..0000000
--- a/vendor/go.etcd.io/etcd/clientv3/balancer/utils.go
+++ /dev/null
@@ -1,68 +0,0 @@
-// Copyright 2018 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package balancer
-
-import (
- "fmt"
- "net/url"
- "sort"
- "sync/atomic"
- "time"
-
- "google.golang.org/grpc/balancer"
- "google.golang.org/grpc/resolver"
-)
-
-func scToString(sc balancer.SubConn) string {
- return fmt.Sprintf("%p", sc)
-}
-
-func scsToStrings(scs map[balancer.SubConn]resolver.Address) (ss []string) {
- ss = make([]string, 0, len(scs))
- for sc, a := range scs {
- ss = append(ss, fmt.Sprintf("%s (%s)", a.Addr, scToString(sc)))
- }
- sort.Strings(ss)
- return ss
-}
-
-func addrsToStrings(addrs []resolver.Address) (ss []string) {
- ss = make([]string, len(addrs))
- for i := range addrs {
- ss[i] = addrs[i].Addr
- }
- sort.Strings(ss)
- return ss
-}
-
-func epsToAddrs(eps ...string) (addrs []resolver.Address) {
- addrs = make([]resolver.Address, 0, len(eps))
- for _, ep := range eps {
- u, err := url.Parse(ep)
- if err != nil {
- addrs = append(addrs, resolver.Address{Addr: ep, Type: resolver.Backend})
- continue
- }
- addrs = append(addrs, resolver.Address{Addr: u.Host, Type: resolver.Backend})
- }
- return addrs
-}
-
-var genN = new(uint32)
-
-func genName() string {
- now := time.Now().UnixNano()
- return fmt.Sprintf("%X%X", now, atomic.AddUint32(genN, 1))
-}
diff --git a/vendor/go.etcd.io/etcd/clientv3/client.go b/vendor/go.etcd.io/etcd/clientv3/client.go
index d6000a8..c49e4ba 100644
--- a/vendor/go.etcd.io/etcd/clientv3/client.go
+++ b/vendor/go.etcd.io/etcd/clientv3/client.go
@@ -25,19 +25,19 @@
"sync"
"time"
+ "github.com/coreos/etcd/clientv3/balancer"
+ "github.com/coreos/etcd/clientv3/balancer/picker"
+ "github.com/coreos/etcd/clientv3/balancer/resolver/endpoint"
+ "github.com/coreos/etcd/clientv3/credentials"
+ "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
+ "github.com/coreos/etcd/pkg/logutil"
+ "github.com/coreos/pkg/capnslog"
"github.com/google/uuid"
- "go.etcd.io/etcd/clientv3/balancer"
- "go.etcd.io/etcd/clientv3/balancer/picker"
- "go.etcd.io/etcd/clientv3/balancer/resolver/endpoint"
- "go.etcd.io/etcd/clientv3/credentials"
- "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
- "go.etcd.io/etcd/pkg/logutil"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
grpccredentials "google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
- "google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
@@ -48,6 +48,10 @@
roundRobinBalancerName = fmt.Sprintf("etcd-%s", picker.RoundrobinBalanced.String())
)
+var (
+ plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "clientv3")
+)
+
func init() {
lg := zap.NewNop()
if os.Getenv("ETCD_CLIENT_DEBUG") != "" {
@@ -129,12 +133,8 @@
// Close shuts down the client's etcd connections.
func (c *Client) Close() error {
c.cancel()
- if c.Watcher != nil {
- c.Watcher.Close()
- }
- if c.Lease != nil {
- c.Lease.Close()
- }
+ c.Watcher.Close()
+ c.Lease.Close()
if c.resolverGroup != nil {
c.resolverGroup.Close()
}
@@ -230,24 +230,17 @@
}
opts = append(opts, dopts...)
- // Provide a net dialer that supports cancelation and timeout.
- f := func(dialEp string, t time.Duration) (net.Conn, error) {
- proto, host, _ := endpoint.ParseEndpoint(dialEp)
- select {
- case <-c.ctx.Done():
- return nil, c.ctx.Err()
- default:
- }
- dialer := &net.Dialer{Timeout: t}
- return dialer.DialContext(c.ctx, proto, host)
- }
- opts = append(opts, grpc.WithDialer(f))
-
+ dialer := endpoint.Dialer
if creds != nil {
opts = append(opts, grpc.WithTransportCredentials(creds))
+ // gRPC load balancer workaround. See credentials.transportCredential for details.
+ if credsDialer, ok := creds.(TransportCredentialsWithDialer); ok {
+ dialer = credsDialer.Dialer
+ }
} else {
opts = append(opts, grpc.WithInsecure())
}
+ opts = append(opts, grpc.WithContextDialer(dialer))
// Interceptor retry and backoff.
// TODO: Replace all of clientv3/retry.go with interceptor based retry, or with
@@ -266,7 +259,10 @@
// Dial connects to a single endpoint using the client's config.
func (c *Client) Dial(ep string) (*grpc.ClientConn, error) {
- creds := c.directDialCreds(ep)
+ creds, err := c.directDialCreds(ep)
+ if err != nil {
+ return nil, err
+ }
// Use the grpc passthrough resolver to directly dial a single endpoint.
// This resolver passes through the 'unix' and 'unixs' endpoints schemes used
// by etcd without modification, allowing us to directly dial endpoints and
@@ -369,8 +365,8 @@
return conn, nil
}
-func (c *Client) directDialCreds(ep string) grpccredentials.TransportCredentials {
- _, hostPort, scheme := endpoint.ParseEndpoint(ep)
+func (c *Client) directDialCreds(ep string) (grpccredentials.TransportCredentials, error) {
+ _, host, scheme := endpoint.ParseEndpoint(ep)
creds := c.creds
if len(scheme) != 0 {
creds = c.processCreds(scheme)
@@ -379,12 +375,17 @@
// Set the server name must to the endpoint hostname without port since grpc
// otherwise attempts to check if x509 cert is valid for the full endpoint
// including the scheme and port, which fails.
- host, _ := endpoint.ParseHostPort(hostPort)
- clone.OverrideServerName(host)
+ overrideServerName, _, err := net.SplitHostPort(host)
+ if err != nil {
+ // Either the host didn't have a port or the host could not be parsed. Either way, continue with the
+ // original host string.
+ overrideServerName = host
+ }
+ clone.OverrideServerName(overrideServerName)
creds = clone
}
}
- return creds
+ return creds, nil
}
func (c *Client) dialWithBalancerCreds(ep string) grpccredentials.TransportCredentials {
@@ -396,13 +397,6 @@
return creds
}
-// WithRequireLeader requires client requests to only succeed
-// when the cluster has a leader.
-func WithRequireLeader(ctx context.Context) context.Context {
- md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
- return metadata.NewOutgoingContext(ctx, md)
-}
-
func newClient(cfg *Config) (*Client, error) {
if cfg == nil {
cfg = &Config{}
@@ -663,3 +657,9 @@
// <= gRPC v1.7.x returns 'errors.New("grpc: the client connection is closing")'
return strings.Contains(err.Error(), "grpc: the client connection is closing")
}
+
+// TransportCredentialsWithDialer is for a gRPC load balancer workaround. See credentials.transportCredential for details.
+type TransportCredentialsWithDialer interface {
+ grpccredentials.TransportCredentials
+ Dialer(ctx context.Context, dialEp string) (net.Conn, error)
+}
diff --git a/vendor/go.etcd.io/etcd/clientv3/cluster.go b/vendor/go.etcd.io/etcd/clientv3/cluster.go
index ce97e5c..785672b 100644
--- a/vendor/go.etcd.io/etcd/clientv3/cluster.go
+++ b/vendor/go.etcd.io/etcd/clientv3/cluster.go
@@ -17,19 +17,18 @@
import (
"context"
- pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
- "go.etcd.io/etcd/pkg/types"
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+ "github.com/coreos/etcd/pkg/types"
"google.golang.org/grpc"
)
type (
- Member pb.Member
- MemberListResponse pb.MemberListResponse
- MemberAddResponse pb.MemberAddResponse
- MemberRemoveResponse pb.MemberRemoveResponse
- MemberUpdateResponse pb.MemberUpdateResponse
- MemberPromoteResponse pb.MemberPromoteResponse
+ Member pb.Member
+ MemberListResponse pb.MemberListResponse
+ MemberAddResponse pb.MemberAddResponse
+ MemberRemoveResponse pb.MemberRemoveResponse
+ MemberUpdateResponse pb.MemberUpdateResponse
)
type Cluster interface {
@@ -39,17 +38,11 @@
// MemberAdd adds a new member into the cluster.
MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error)
- // MemberAddAsLearner adds a new learner member into the cluster.
- MemberAddAsLearner(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error)
-
// MemberRemove removes an existing member from the cluster.
MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error)
// MemberUpdate updates the peer addresses of the member.
MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error)
-
- // MemberPromote promotes a member from raft learner (non-voting) to raft voting member.
- MemberPromote(ctx context.Context, id uint64) (*MemberPromoteResponse, error)
}
type cluster struct {
@@ -74,23 +67,12 @@
}
func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) {
- return c.memberAdd(ctx, peerAddrs, false)
-}
-
-func (c *cluster) MemberAddAsLearner(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) {
- return c.memberAdd(ctx, peerAddrs, true)
-}
-
-func (c *cluster) memberAdd(ctx context.Context, peerAddrs []string, isLearner bool) (*MemberAddResponse, error) {
// fail-fast before panic in rafthttp
if _, err := types.NewURLs(peerAddrs); err != nil {
return nil, err
}
- r := &pb.MemberAddRequest{
- PeerURLs: peerAddrs,
- IsLearner: isLearner,
- }
+ r := &pb.MemberAddRequest{PeerURLs: peerAddrs}
resp, err := c.remote.MemberAdd(ctx, r, c.callOpts...)
if err != nil {
return nil, toErr(ctx, err)
@@ -130,12 +112,3 @@
}
return nil, toErr(ctx, err)
}
-
-func (c *cluster) MemberPromote(ctx context.Context, id uint64) (*MemberPromoteResponse, error) {
- r := &pb.MemberPromoteRequest{ID: id}
- resp, err := c.remote.MemberPromote(ctx, r, c.callOpts...)
- if err != nil {
- return nil, toErr(ctx, err)
- }
- return (*MemberPromoteResponse)(resp), nil
-}
diff --git a/vendor/go.etcd.io/etcd/clientv3/compact_op.go b/vendor/go.etcd.io/etcd/clientv3/compact_op.go
index 5779713..41e80c1 100644
--- a/vendor/go.etcd.io/etcd/clientv3/compact_op.go
+++ b/vendor/go.etcd.io/etcd/clientv3/compact_op.go
@@ -15,7 +15,7 @@
package clientv3
import (
- pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)
// CompactOp represents a compact operation.
diff --git a/vendor/go.etcd.io/etcd/clientv3/compare.go b/vendor/go.etcd.io/etcd/clientv3/compare.go
index 01ed68e..b5f0a25 100644
--- a/vendor/go.etcd.io/etcd/clientv3/compare.go
+++ b/vendor/go.etcd.io/etcd/clientv3/compare.go
@@ -15,7 +15,7 @@
package clientv3
import (
- pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)
type CompareTarget int
diff --git a/vendor/go.etcd.io/etcd/clientv3/config.go b/vendor/go.etcd.io/etcd/clientv3/config.go
index 11d447d..9c17fc2 100644
--- a/vendor/go.etcd.io/etcd/clientv3/config.go
+++ b/vendor/go.etcd.io/etcd/clientv3/config.go
@@ -72,17 +72,15 @@
// Without this, Dial returns immediately and connecting the server happens in background.
DialOptions []grpc.DialOption
- // Context is the default client context; it can be used to cancel grpc dial out and
- // other operations that do not have an explicit context.
- Context context.Context
-
// LogConfig configures client-side logger.
// If nil, use the default logger.
// TODO: configure gRPC logger
LogConfig *zap.Config
+ // Context is the default client context; it can be used to cancel grpc dial out and
+ // other operations that do not have an explicit context.
+ Context context.Context
+
// PermitWithoutStream when set will allow client to send keepalive pings to server without any active streams(RPCs).
PermitWithoutStream bool `json:"permit-without-stream"`
-
- // TODO: support custom balancer picker
}
diff --git a/vendor/go.etcd.io/etcd/clientv3/credentials/credentials.go b/vendor/go.etcd.io/etcd/clientv3/credentials/credentials.go
deleted file mode 100644
index e6fd75c..0000000
--- a/vendor/go.etcd.io/etcd/clientv3/credentials/credentials.go
+++ /dev/null
@@ -1,155 +0,0 @@
-// Copyright 2019 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// Package credentials implements gRPC credential interface with etcd specific logic.
-// e.g., client handshake with custom authority parameter
-package credentials
-
-import (
- "context"
- "crypto/tls"
- "net"
- "sync"
-
- "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
- grpccredentials "google.golang.org/grpc/credentials"
-)
-
-// Config defines gRPC credential configuration.
-type Config struct {
- TLSConfig *tls.Config
-}
-
-// Bundle defines gRPC credential interface.
-type Bundle interface {
- grpccredentials.Bundle
- UpdateAuthToken(token string)
-}
-
-// NewBundle constructs a new gRPC credential bundle.
-func NewBundle(cfg Config) Bundle {
- return &bundle{
- tc: newTransportCredential(cfg.TLSConfig),
- rc: newPerRPCCredential(),
- }
-}
-
-// bundle implements "grpccredentials.Bundle" interface.
-type bundle struct {
- tc *transportCredential
- rc *perRPCCredential
-}
-
-func (b *bundle) TransportCredentials() grpccredentials.TransportCredentials {
- return b.tc
-}
-
-func (b *bundle) PerRPCCredentials() grpccredentials.PerRPCCredentials {
- return b.rc
-}
-
-func (b *bundle) NewWithMode(mode string) (grpccredentials.Bundle, error) {
- // no-op
- return nil, nil
-}
-
-// transportCredential implements "grpccredentials.TransportCredentials" interface.
-type transportCredential struct {
- gtc grpccredentials.TransportCredentials
-}
-
-func newTransportCredential(cfg *tls.Config) *transportCredential {
- return &transportCredential{
- gtc: grpccredentials.NewTLS(cfg),
- }
-}
-
-func (tc *transportCredential) ClientHandshake(ctx context.Context, authority string, rawConn net.Conn) (net.Conn, grpccredentials.AuthInfo, error) {
- // Only overwrite when authority is an IP address!
- // Let's say, a server runs SRV records on "etcd.local" that resolves
- // to "m1.etcd.local", and its SAN field also includes "m1.etcd.local".
- // But what if SAN does not include its resolved IP address (e.g. 127.0.0.1)?
- // Then, the server should only authenticate using its DNS hostname "m1.etcd.local",
- // instead of overwriting it with its IP address.
- // And we do not overwrite "localhost" either. Only overwrite IP addresses!
- if isIP(authority) {
- target := rawConn.RemoteAddr().String()
- if authority != target {
- // When user dials with "grpc.WithDialer", "grpc.DialContext" "cc.parsedTarget"
- // update only happens once. This is problematic, because when TLS is enabled,
- // retries happen through "grpc.WithDialer" with static "cc.parsedTarget" from
- // the initial dial call.
- // If the server authenticates by IP addresses, we want to set a new endpoint as
- // a new authority. Otherwise
- // "transport: authentication handshake failed: x509: certificate is valid for 127.0.0.1, 192.168.121.180, not 192.168.223.156"
- // when the new dial target is "192.168.121.180" whose certificate host name is also "192.168.121.180"
- // but client tries to authenticate with previously set "cc.parsedTarget" field "192.168.223.156"
- authority = target
- }
- }
- return tc.gtc.ClientHandshake(ctx, authority, rawConn)
-}
-
-// return true if given string is an IP.
-func isIP(ep string) bool {
- return net.ParseIP(ep) != nil
-}
-
-func (tc *transportCredential) ServerHandshake(rawConn net.Conn) (net.Conn, grpccredentials.AuthInfo, error) {
- return tc.gtc.ServerHandshake(rawConn)
-}
-
-func (tc *transportCredential) Info() grpccredentials.ProtocolInfo {
- return tc.gtc.Info()
-}
-
-func (tc *transportCredential) Clone() grpccredentials.TransportCredentials {
- return &transportCredential{
- gtc: tc.gtc.Clone(),
- }
-}
-
-func (tc *transportCredential) OverrideServerName(serverNameOverride string) error {
- return tc.gtc.OverrideServerName(serverNameOverride)
-}
-
-// perRPCCredential implements "grpccredentials.PerRPCCredentials" interface.
-type perRPCCredential struct {
- authToken string
- authTokenMu sync.RWMutex
-}
-
-func newPerRPCCredential() *perRPCCredential { return &perRPCCredential{} }
-
-func (rc *perRPCCredential) RequireTransportSecurity() bool { return false }
-
-func (rc *perRPCCredential) GetRequestMetadata(ctx context.Context, s ...string) (map[string]string, error) {
- rc.authTokenMu.RLock()
- authToken := rc.authToken
- rc.authTokenMu.RUnlock()
- return map[string]string{rpctypes.TokenFieldNameGRPC: authToken}, nil
-}
-
-func (b *bundle) UpdateAuthToken(token string) {
- if b.rc == nil {
- return
- }
- b.rc.UpdateAuthToken(token)
-}
-
-func (rc *perRPCCredential) UpdateAuthToken(token string) {
- rc.authTokenMu.Lock()
- rc.authToken = token
- rc.authTokenMu.Unlock()
-}
diff --git a/vendor/go.etcd.io/etcd/clientv3/ctx.go b/vendor/go.etcd.io/etcd/clientv3/ctx.go
new file mode 100644
index 0000000..da8297b
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/ctx.go
@@ -0,0 +1,64 @@
+// Copyright 2020 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+ "context"
+ "strings"
+
+ "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
+ "github.com/coreos/etcd/version"
+ "google.golang.org/grpc/metadata"
+)
+
+// WithRequireLeader requires client requests to only succeed
+// when the cluster has a leader.
+func WithRequireLeader(ctx context.Context) context.Context {
+ md, ok := metadata.FromOutgoingContext(ctx)
+ if !ok { // no outgoing metadata ctx key, create one
+ md = metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
+ return metadata.NewOutgoingContext(ctx, md)
+ }
+ copied := md.Copy() // avoid racey updates
+ // overwrite/add 'hasleader' key/value
+ metadataSet(copied, rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
+ return metadata.NewOutgoingContext(ctx, copied)
+}
+
+// embeds client version
+func withVersion(ctx context.Context) context.Context {
+ md, ok := metadata.FromOutgoingContext(ctx)
+ if !ok { // no outgoing metadata ctx key, create one
+ md = metadata.Pairs(rpctypes.MetadataClientAPIVersionKey, version.APIVersion)
+ return metadata.NewOutgoingContext(ctx, md)
+ }
+ copied := md.Copy() // avoid racey updates
+ // overwrite/add version key/value
+ metadataSet(copied, rpctypes.MetadataClientAPIVersionKey, version.APIVersion)
+ return metadata.NewOutgoingContext(ctx, copied)
+}
+
+func metadataGet(md metadata.MD, k string) []string {
+ k = strings.ToLower(k)
+ return md[k]
+}
+
+func metadataSet(md metadata.MD, k string, vals ...string) {
+ if len(vals) == 0 {
+ return
+ }
+ k = strings.ToLower(k)
+ md[k] = vals
+}
diff --git a/vendor/go.etcd.io/etcd/clientv3/doc.go b/vendor/go.etcd.io/etcd/clientv3/doc.go
index 913cd28..717fbe4 100644
--- a/vendor/go.etcd.io/etcd/clientv3/doc.go
+++ b/vendor/go.etcd.io/etcd/clientv3/doc.go
@@ -19,7 +19,7 @@
// // expect dial time-out on ipv4 blackhole
// _, err := clientv3.New(clientv3.Config{
// Endpoints: []string{"http://254.0.0.1:12345"},
-// DialTimeout: 2 * time.Second,
+// DialTimeout: 2 * time.Second
// })
//
// // etcd clientv3 >= v3.2.10, grpc/grpc-go >= v1.7.3
@@ -61,7 +61,7 @@
//
// 1. context error: canceled or deadline exceeded.
// 2. gRPC status error: e.g. when clock drifts in server-side before client's context deadline exceeded.
-// 3. gRPC error: see https://github.com/etcd-io/etcd/blob/master/etcdserver/api/v3rpc/rpctypes/error.go
+// 3. gRPC error: see https://github.com/coreos/etcd/blob/master/etcdserver/api/v3rpc/rpctypes/error.go
//
// Here is the example code to handle client errors:
//
@@ -71,14 +71,14 @@
// // ctx is canceled by another routine
// } else if err == context.DeadlineExceeded {
// // ctx is attached with a deadline and it exceeded
-// } else if err == rpctypes.ErrEmptyKey {
-// // client-side error: key is not provided
// } else if ev, ok := status.FromError(err); ok {
// code := ev.Code()
// if code == codes.DeadlineExceeded {
// // server-side context might have timed-out first (due to clock skew)
// // while original client-side context is not timed-out yet
// }
+// } else if verr, ok := err.(*v3rpc.ErrEmptyKey); ok {
+// // process (verr.Errors)
// } else {
// // bad cluster endpoints, which are not etcd servers
// }
@@ -87,20 +87,11 @@
// go func() { cli.Close() }()
// _, err := kvc.Get(ctx, "a")
// if err != nil {
-// // with etcd clientv3 <= v3.3
// if err == context.Canceled {
// // grpc balancer calls 'Get' with an inflight client.Close
-// } else if err == grpc.ErrClientConnClosing { // <= gRCP v1.7.x
+// } else if err == grpc.ErrClientConnClosing {
// // grpc balancer calls 'Get' after client.Close.
// }
-// // with etcd clientv3 >= v3.4
-// if clientv3.IsConnCanceled(err) {
-// // gRPC client connection is closed
-// }
// }
//
-// The grpc load balancer is registered statically and is shared across etcd clients.
-// To enable detailed load balancer logging, set the ETCD_CLIENT_DEBUG environment
-// variable. E.g. "ETCD_CLIENT_DEBUG=1".
-//
package clientv3
diff --git a/vendor/go.etcd.io/etcd/clientv3/kv.go b/vendor/go.etcd.io/etcd/clientv3/kv.go
index 2b7864a..5a7469b 100644
--- a/vendor/go.etcd.io/etcd/clientv3/kv.go
+++ b/vendor/go.etcd.io/etcd/clientv3/kv.go
@@ -17,7 +17,7 @@
import (
"context"
- pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"google.golang.org/grpc"
)
diff --git a/vendor/go.etcd.io/etcd/clientv3/lease.go b/vendor/go.etcd.io/etcd/clientv3/lease.go
index c2796fc..3729cf3 100644
--- a/vendor/go.etcd.io/etcd/clientv3/lease.go
+++ b/vendor/go.etcd.io/etcd/clientv3/lease.go
@@ -19,10 +19,9 @@
"sync"
"time"
- "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
- pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
+ "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
- "go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
@@ -118,21 +117,22 @@
// Leases retrieves all leases.
Leases(ctx context.Context) (*LeaseLeasesResponse, error)
- // KeepAlive attempts to keep the given lease alive forever. If the keepalive responses posted
- // to the channel are not consumed promptly the channel may become full. When full, the lease
- // client will continue sending keep alive requests to the etcd server, but will drop responses
- // until there is capacity on the channel to send more responses.
- //
- // If client keep alive loop halts with an unexpected error (e.g. "etcdserver: no leader") or
- // canceled by the caller (e.g. context.Canceled), KeepAlive returns a ErrKeepAliveHalted error
- // containing the error reason.
+ // KeepAlive keeps the given lease alive forever. If the keepalive response
+ // posted to the channel is not consumed immediately, the lease client will
+ // continue sending keep alive requests to the etcd server at least every
+ // second until latest response is consumed.
//
// The returned "LeaseKeepAliveResponse" channel closes if underlying keep
// alive stream is interrupted in some way the client cannot handle itself;
- // given context "ctx" is canceled or timed out.
+ // given context "ctx" is canceled or timed out. "LeaseKeepAliveResponse"
+ // from this closed channel is nil.
+ //
+ // If client keep alive loop halts with an unexpected error (e.g. "etcdserver:
+ // no leader") or canceled by the caller (e.g. context.Canceled), the error
+ // is returned. Otherwise, it retries.
//
// TODO(v4.0): post errors to last keep alive message before closing
- // (see https://github.com/etcd-io/etcd/pull/7866)
+ // (see https://github.com/coreos/etcd/pull/7866)
KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
// KeepAliveOnce renews the lease once. The response corresponds to the
@@ -172,8 +172,6 @@
firstKeepAliveOnce sync.Once
callOpts []grpc.CallOption
-
- lg *zap.Logger
}
// keepAlive multiplexes a keepalive for a lease over multiple channels
@@ -198,7 +196,6 @@
keepAlives: make(map[LeaseID]*keepAlive),
remote: remote,
firstKeepAliveTimeout: keepAliveTimeout,
- lg: c.lg,
}
if l.firstKeepAliveTimeout == time.Second {
l.firstKeepAliveTimeout = defaultTTL
@@ -294,7 +291,7 @@
}
l.mu.Unlock()
- go l.keepAliveCtxCloser(ctx, id, ka.donec)
+ go l.keepAliveCtxCloser(id, ctx, ka.donec)
l.firstKeepAliveOnce.Do(func() {
go l.recvKeepAliveLoop()
go l.deadlineLoop()
@@ -326,7 +323,7 @@
return nil
}
-func (l *lessor) keepAliveCtxCloser(ctx context.Context, id LeaseID, donec <-chan struct{}) {
+func (l *lessor) keepAliveCtxCloser(id LeaseID, ctx context.Context, donec <-chan struct{}) {
select {
case <-donec:
return
@@ -462,6 +459,7 @@
select {
case <-time.After(retryConnWait):
+ continue
case <-l.stopCtx.Done():
return l.stopCtx.Err()
}
@@ -471,7 +469,7 @@
// resetRecv opens a new lease stream and starts sending keep alive requests.
func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
sctx, cancel := context.WithCancel(l.stopCtx)
- stream, err := l.remote.LeaseKeepAlive(sctx, append(l.callOpts, withMax(0))...)
+ stream, err := l.remote.LeaseKeepAlive(sctx, l.callOpts...)
if err != nil {
cancel()
return nil, err
@@ -520,12 +518,6 @@
select {
case ch <- karesp:
default:
- if l.lg != nil {
- l.lg.Warn("lease keepalive response queue is full; dropping response send",
- zap.Int("queue-size", len(ch)),
- zap.Int("queue-capacity", cap(ch)),
- )
- }
}
// still advance in order to rate-limit keep-alive sends
ka.nextKeepAlive = nextKeepAlive
@@ -577,7 +569,7 @@
}
select {
- case <-time.After(retryConnWait):
+ case <-time.After(500 * time.Millisecond):
case <-stream.Context().Done():
return
case <-l.donec:
diff --git a/vendor/go.etcd.io/etcd/clientv3/logger.go b/vendor/go.etcd.io/etcd/clientv3/logger.go
index f5ae010..3276372 100644
--- a/vendor/go.etcd.io/etcd/clientv3/logger.go
+++ b/vendor/go.etcd.io/etcd/clientv3/logger.go
@@ -18,7 +18,7 @@
"io/ioutil"
"sync"
- "go.etcd.io/etcd/pkg/logutil"
+ "github.com/coreos/etcd/pkg/logutil"
"google.golang.org/grpc/grpclog"
)
diff --git a/vendor/go.etcd.io/etcd/clientv3/maintenance.go b/vendor/go.etcd.io/etcd/clientv3/maintenance.go
index 744455a..5e87cf8 100644
--- a/vendor/go.etcd.io/etcd/clientv3/maintenance.go
+++ b/vendor/go.etcd.io/etcd/clientv3/maintenance.go
@@ -19,7 +19,7 @@
"fmt"
"io"
- pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"google.golang.org/grpc"
)
@@ -193,23 +193,32 @@
return nil, toErr(ctx, err)
}
+ plog.Info("opened snapshot stream; downloading")
pr, pw := io.Pipe()
go func() {
for {
resp, err := ss.Recv()
if err != nil {
+ switch err {
+ case io.EOF:
+ plog.Info("completed snapshot read; closing")
+ default:
+ plog.Warningf("failed to receive from snapshot stream; closing (%v)", err)
+ }
pw.CloseWithError(err)
return
}
- if resp == nil && err == nil {
- break
- }
+
+ // can "resp == nil && err == nil"
+ // before we receive snapshot SHA digest?
+ // No, server sends EOF with an empty response
+ // after it sends SHA digest at the end
+
if _, werr := pw.Write(resp.Blob); werr != nil {
pw.CloseWithError(werr)
return
}
}
- pw.Close()
}()
return &snapshotReadCloser{ctx: ctx, ReadCloser: pr}, nil
}
diff --git a/vendor/go.etcd.io/etcd/clientv3/op.go b/vendor/go.etcd.io/etcd/clientv3/op.go
index 81ae31f..3dca41b 100644
--- a/vendor/go.etcd.io/etcd/clientv3/op.go
+++ b/vendor/go.etcd.io/etcd/clientv3/op.go
@@ -14,7 +14,7 @@
package clientv3
-import pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
+import pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
type opType int
@@ -26,7 +26,9 @@
tTxn
)
-var noPrefixEnd = []byte{0}
+var (
+ noPrefixEnd = []byte{0}
+)
// Op represents an Operation that kv can execute.
type Op struct {
@@ -81,15 +83,8 @@
// accessors / mutators
-// IsTxn returns true if the "Op" type is transaction.
-func (op Op) IsTxn() bool {
- return op.t == tTxn
-}
-
-// Txn returns the comparison(if) operations, "then" operations, and "else" operations.
-func (op Op) Txn() ([]Cmp, []Op, []Op) {
- return op.cmps, op.thenOps, op.elseOps
-}
+func (op Op) IsTxn() bool { return op.t == tTxn }
+func (op Op) Txn() ([]Cmp, []Op, []Op) { return op.cmps, op.thenOps, op.elseOps }
// KeyBytes returns the byte slice holding the Op's key.
func (op Op) KeyBytes() []byte { return op.key }
@@ -113,13 +108,13 @@
func (op Op) IsDelete() bool { return op.t == tDeleteRange }
// IsSerializable returns true if the serializable field is true.
-func (op Op) IsSerializable() bool { return op.serializable }
+func (op Op) IsSerializable() bool { return op.serializable == true }
// IsKeysOnly returns whether keysOnly is set.
-func (op Op) IsKeysOnly() bool { return op.keysOnly }
+func (op Op) IsKeysOnly() bool { return op.keysOnly == true }
// IsCountOnly returns whether countOnly is set.
-func (op Op) IsCountOnly() bool { return op.countOnly }
+func (op Op) IsCountOnly() bool { return op.countOnly == true }
// MinModRev returns the operation's minimum modify revision.
func (op Op) MinModRev() int64 { return op.minModRev }
@@ -216,23 +211,13 @@
return op.t != tRange
}
-// OpGet returns "get" operation based on given key and operation options.
func OpGet(key string, opts ...OpOption) Op {
- // WithPrefix and WithFromKey are not supported together
- if isWithPrefix(opts) && isWithFromKey(opts) {
- panic("`WithPrefix` and `WithFromKey` cannot be set at the same time, choose one")
- }
ret := Op{t: tRange, key: []byte(key)}
ret.applyOpts(opts)
return ret
}
-// OpDelete returns "delete" operation based on given key and operation options.
func OpDelete(key string, opts ...OpOption) Op {
- // WithPrefix and WithFromKey are not supported together
- if isWithPrefix(opts) && isWithFromKey(opts) {
- panic("`WithPrefix` and `WithFromKey` cannot be set at the same time, choose one")
- }
ret := Op{t: tDeleteRange, key: []byte(key)}
ret.applyOpts(opts)
switch {
@@ -260,7 +245,6 @@
return ret
}
-// OpPut returns "put" operation based on given key-value and operation options.
func OpPut(key, val string, opts ...OpOption) Op {
ret := Op{t: tPut, key: []byte(key), val: []byte(val)}
ret.applyOpts(opts)
@@ -289,7 +273,6 @@
return ret
}
-// OpTxn returns "txn" operation based on given transaction conditions.
func OpTxn(cmps []Cmp, thenOps []Op, elseOps []Op) Op {
return Op{t: tTxn, cmps: cmps, thenOps: thenOps, elseOps: elseOps}
}
@@ -400,14 +383,7 @@
// WithFromKey specifies the range of 'Get', 'Delete', 'Watch' requests
// to be equal or greater than the key in the argument.
-func WithFromKey() OpOption {
- return func(op *Op) {
- if len(op.key) == 0 {
- op.key = []byte{0}
- }
- op.end = []byte("\x00")
- }
-}
+func WithFromKey() OpOption { return WithRange("\x00") }
// WithSerializable makes 'Get' request serializable. By default,
// it's linearizable. Serializable requests are better for lower latency
@@ -496,17 +472,6 @@
}
}
-// WithFragment to receive raw watch response with fragmentation.
-// Fragmentation is disabled by default. If fragmentation is enabled,
-// etcd watch server will split watch response before sending to clients
-// when the total size of watch events exceed server-side request limit.
-// The default server-side request limit is 1.5 MiB, which can be configured
-// as "--max-request-bytes" flag value + gRPC-overhead 512 bytes.
-// See "etcdserver/api/v3rpc/watch.go" for more details.
-func WithFragment() OpOption {
- return func(op *Op) { op.fragment = true }
-}
-
// WithIgnoreValue updates the key using its current value.
// This option can not be combined with non-empty values.
// Returns an error if the key does not exist.
@@ -553,8 +518,13 @@
return &pb.LeaseTimeToLiveRequest{ID: int64(id), Keys: ret.attachedKeys}
}
-// isWithPrefix returns true if WithPrefix is being called in the op
-func isWithPrefix(opts []OpOption) bool { return isOpFuncCalled("WithPrefix", opts) }
-
-// isWithFromKey returns true if WithFromKey is being called in the op
-func isWithFromKey(opts []OpOption) bool { return isOpFuncCalled("WithFromKey", opts) }
+// WithFragment to receive raw watch response with fragmentation.
+// Fragmentation is disabled by default. If fragmentation is enabled,
+// etcd watch server will split watch response before sending to clients
+// when the total size of watch events exceed server-side request limit.
+// The default server-side request limit is 1.5 MiB, which can be configured
+// as "--max-request-bytes" flag value + gRPC-overhead 512 bytes.
+// See "etcdserver/api/v3rpc/watch.go" for more details.
+func WithFragment() OpOption {
+ return func(op *Op) { op.fragment = true }
+}
diff --git a/vendor/go.etcd.io/etcd/clientv3/retry.go b/vendor/go.etcd.io/etcd/clientv3/retry.go
index 7e855de..6baa52e 100644
--- a/vendor/go.etcd.io/etcd/clientv3/retry.go
+++ b/vendor/go.etcd.io/etcd/clientv3/retry.go
@@ -17,8 +17,8 @@
import (
"context"
- "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
- pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
+ "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@@ -179,10 +179,6 @@
return rcc.cc.MemberUpdate(ctx, in, opts...)
}
-func (rcc *retryClusterClient) MemberPromote(ctx context.Context, in *pb.MemberPromoteRequest, opts ...grpc.CallOption) (resp *pb.MemberPromoteResponse, err error) {
- return rcc.cc.MemberPromote(ctx, in, opts...)
-}
-
type retryMaintenanceClient struct {
mc pb.MaintenanceClient
}
diff --git a/vendor/go.etcd.io/etcd/clientv3/retry_interceptor.go b/vendor/go.etcd.io/etcd/clientv3/retry_interceptor.go
index 080490a..f3c5057 100644
--- a/vendor/go.etcd.io/etcd/clientv3/retry_interceptor.go
+++ b/vendor/go.etcd.io/etcd/clientv3/retry_interceptor.go
@@ -23,7 +23,7 @@
"sync"
"time"
- "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
+ "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@@ -38,6 +38,7 @@
func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOption) grpc.UnaryClientInterceptor {
intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs)
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
+ ctx = withVersion(ctx)
grpcOpts, retryOpts := filterCallOptions(opts)
callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts)
// short circuit for simplicity, and avoiding allocations.
@@ -103,6 +104,7 @@
func (c *Client) streamClientInterceptor(logger *zap.Logger, optFuncs ...retryOption) grpc.StreamClientInterceptor {
intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs)
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
+ ctx = withVersion(ctx)
grpcOpts, retryOpts := filterCallOptions(opts)
callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts)
// short circuit for simplicity, and avoiding allocations.
@@ -113,10 +115,9 @@
return nil, status.Errorf(codes.Unimplemented, "clientv3/retry_interceptor: cannot retry on ClientStreams, set Disable()")
}
newStreamer, err := streamer(ctx, desc, cc, method, grpcOpts...)
- logger.Warn("retry stream intercept", zap.Error(err))
if err != nil {
- // TODO(mwitkow): Maybe dial and transport errors should be retriable?
- return nil, err
+ logger.Error("streamer failed to create ClientStream", zap.Error(err))
+ return nil, err // TODO(mwitkow): Maybe dial and transport errors should be retriable?
}
retryingStreamer := &serverStreamingRetryingStream{
client: c,
@@ -185,6 +186,7 @@
if !attemptRetry {
return lastErr // success or hard failure
}
+
// We start off from attempt 1, because zeroth was already made on normal SendMsg().
for attempt := uint(1); attempt < s.callOpts.max; attempt++ {
if err := waitRetryBackoff(s.ctx, attempt, s.callOpts); err != nil {
@@ -192,12 +194,13 @@
}
newStream, err := s.reestablishStreamAndResendBuffer(s.ctx)
if err != nil {
- // TODO(mwitkow): Maybe dial and transport errors should be retriable?
- return err
+ s.client.lg.Error("failed reestablishStreamAndResendBuffer", zap.Error(err))
+ return err // TODO(mwitkow): Maybe dial and transport errors should be retriable?
}
s.setStream(newStream)
+
+ s.client.lg.Warn("retrying RecvMsg", zap.Error(lastErr))
attemptRetry, lastErr = s.receiveMsgAndIndicateRetry(m)
- //fmt.Printf("Received message and indicate: %v %v\n", attemptRetry, lastErr)
if !attemptRetry {
return lastErr
}
diff --git a/vendor/go.etcd.io/etcd/clientv3/txn.go b/vendor/go.etcd.io/etcd/clientv3/txn.go
index c19715d..c3c2d24 100644
--- a/vendor/go.etcd.io/etcd/clientv3/txn.go
+++ b/vendor/go.etcd.io/etcd/clientv3/txn.go
@@ -18,7 +18,7 @@
"context"
"sync"
- pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"google.golang.org/grpc"
)
diff --git a/vendor/go.etcd.io/etcd/clientv3/watch.go b/vendor/go.etcd.io/etcd/clientv3/watch.go
index 87d222d..4a3b8cc 100644
--- a/vendor/go.etcd.io/etcd/clientv3/watch.go
+++ b/vendor/go.etcd.io/etcd/clientv3/watch.go
@@ -21,9 +21,9 @@
"sync"
"time"
- v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
- pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
- mvccpb "go.etcd.io/etcd/mvcc/mvccpb"
+ v3rpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+ mvccpb "github.com/coreos/etcd/mvcc/mvccpb"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"