VOL-2112 move to voltha-lib-go
Change-Id: Ic1af08003c1d2c698c0cce371e64f47b47b8d875
diff --git a/vendor/go.etcd.io/etcd/clientv3/client.go b/vendor/go.etcd.io/etcd/clientv3/client.go
index 445ecfe..d6000a8 100644
--- a/vendor/go.etcd.io/etcd/clientv3/client.go
+++ b/vendor/go.etcd.io/etcd/clientv3/client.go
@@ -16,7 +16,6 @@
import (
"context"
- "crypto/tls"
"errors"
"fmt"
"net"
@@ -30,12 +29,13 @@
"go.etcd.io/etcd/clientv3/balancer"
"go.etcd.io/etcd/clientv3/balancer/picker"
"go.etcd.io/etcd/clientv3/balancer/resolver/endpoint"
+ "go.etcd.io/etcd/clientv3/credentials"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
"go.etcd.io/etcd/pkg/logutil"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
- "google.golang.org/grpc/credentials"
+ grpccredentials "google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
@@ -51,12 +51,17 @@
func init() {
lg := zap.NewNop()
if os.Getenv("ETCD_CLIENT_DEBUG") != "" {
+ lcfg := logutil.DefaultZapLoggerConfig
+ lcfg.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
+
var err error
- lg, err = zap.NewProductionConfig().Build() // info level logging
+ lg, err = lcfg.Build() // debug level logging
if err != nil {
panic(err)
}
}
+
+ // TODO: support custom balancer
balancer.RegisterBuilder(balancer.Config{
Policy: picker.RoundrobinBalanced,
Name: roundRobinBalancerName,
@@ -76,10 +81,9 @@
conn *grpc.ClientConn
cfg Config
- creds *credentials.TransportCredentials
- balancer balancer.Balancer
+ creds grpccredentials.TransportCredentials
resolverGroup *endpoint.ResolverGroup
- mu *sync.Mutex
+ mu *sync.RWMutex
ctx context.Context
cancel context.CancelFunc
@@ -87,9 +91,8 @@
// Username is a user name for authentication.
Username string
// Password is a password for authentication.
- Password string
- // tokenCred is an instance of WithPerRPCCredentials()'s argument
- tokenCred *authTokenCredential
+ Password string
+ authTokenBundle credentials.Bundle
callOpts []grpc.CallOption
@@ -126,8 +129,12 @@
// Close shuts down the client's etcd connections.
func (c *Client) Close() error {
c.cancel()
- c.Watcher.Close()
- c.Lease.Close()
+ if c.Watcher != nil {
+ c.Watcher.Close()
+ }
+ if c.Lease != nil {
+ c.Lease.Close()
+ }
if c.resolverGroup != nil {
c.resolverGroup.Close()
}
@@ -143,11 +150,13 @@
func (c *Client) Ctx() context.Context { return c.ctx }
// Endpoints lists the registered endpoints for the client.
-func (c *Client) Endpoints() (eps []string) {
+func (c *Client) Endpoints() []string {
// copy the slice; protect original endpoints from being changed
- eps = make([]string, len(c.cfg.Endpoints))
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+ eps := make([]string, len(c.cfg.Endpoints))
copy(eps, c.cfg.Endpoints)
- return
+ return eps
}
// SetEndpoints updates client's endpoints.
@@ -192,24 +201,7 @@
}
}
-type authTokenCredential struct {
- token string
- tokenMu *sync.RWMutex
-}
-
-func (cred authTokenCredential) RequireTransportSecurity() bool {
- return false
-}
-
-func (cred authTokenCredential) GetRequestMetadata(ctx context.Context, s ...string) (map[string]string, error) {
- cred.tokenMu.RLock()
- defer cred.tokenMu.RUnlock()
- return map[string]string{
- rpctypes.TokenFieldNameGRPC: cred.token,
- }, nil
-}
-
-func (c *Client) processCreds(scheme string) (creds *credentials.TransportCredentials) {
+func (c *Client) processCreds(scheme string) (creds grpccredentials.TransportCredentials) {
creds = c.creds
switch scheme {
case "unix":
@@ -219,9 +211,7 @@
if creds != nil {
break
}
- tlsconfig := &tls.Config{}
- emptyCreds := credentials.NewTLS(tlsconfig)
- creds = &emptyCreds
+ creds = credentials.NewBundle(credentials.Config{}).TransportCredentials()
default:
creds = nil
}
@@ -229,7 +219,7 @@
}
// dialSetupOpts gives the dial opts prior to any authentication.
-func (c *Client) dialSetupOpts(creds *credentials.TransportCredentials, dopts ...grpc.DialOption) (opts []grpc.DialOption, err error) {
+func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (opts []grpc.DialOption, err error) {
if c.cfg.DialKeepAliveTime > 0 {
params := keepalive.ClientParameters{
Time: c.cfg.DialKeepAliveTime,
@@ -254,7 +244,7 @@
opts = append(opts, grpc.WithDialer(f))
if creds != nil {
- opts = append(opts, grpc.WithTransportCredentials(*creds))
+ opts = append(opts, grpc.WithTransportCredentials(creds))
} else {
opts = append(opts, grpc.WithInsecure())
}
@@ -288,8 +278,8 @@
var err error // return last error in a case of fail
var auth *authenticator
- for i := 0; i < len(c.cfg.Endpoints); i++ {
- ep := c.cfg.Endpoints[i]
+ eps := c.Endpoints()
+ for _, ep := range eps {
// use dial options without dopts to avoid reusing the client balancer
var dOpts []grpc.DialOption
_, host, _ := endpoint.ParseEndpoint(ep)
@@ -317,10 +307,7 @@
continue
}
- c.tokenCred.tokenMu.Lock()
- c.tokenCred.token = resp.Token
- c.tokenCred.tokenMu.Unlock()
-
+ c.authTokenBundle.UpdateAuthToken(resp.Token)
return nil
}
@@ -337,16 +324,14 @@
}
// dial configures and dials any grpc balancer target.
-func (c *Client) dial(target string, creds *credentials.TransportCredentials, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
+func (c *Client) dial(target string, creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
opts, err := c.dialSetupOpts(creds, dopts...)
if err != nil {
return nil, fmt.Errorf("failed to configure dialer: %v", err)
}
if c.Username != "" && c.Password != "" {
- c.tokenCred = &authTokenCredential{
- tokenMu: &sync.RWMutex{},
- }
+ c.authTokenBundle = credentials.NewBundle(credentials.Config{})
ctx, cancel := c.ctx, func() {}
if c.cfg.DialTimeout > 0 {
@@ -363,7 +348,7 @@
return nil, err
}
} else {
- opts = append(opts, grpc.WithPerRPCCredentials(c.tokenCred))
+ opts = append(opts, grpc.WithPerRPCCredentials(c.authTokenBundle.PerRPCCredentials()))
}
cancel()
}
@@ -384,26 +369,25 @@
return conn, nil
}
-func (c *Client) directDialCreds(ep string) *credentials.TransportCredentials {
+func (c *Client) directDialCreds(ep string) grpccredentials.TransportCredentials {
_, hostPort, scheme := endpoint.ParseEndpoint(ep)
creds := c.creds
if len(scheme) != 0 {
creds = c.processCreds(scheme)
if creds != nil {
- c := *creds
- clone := c.Clone()
+ clone := creds.Clone()
// Set the server name must to the endpoint hostname without port since grpc
// otherwise attempts to check if x509 cert is valid for the full endpoint
// including the scheme and port, which fails.
host, _ := endpoint.ParseHostPort(hostPort)
clone.OverrideServerName(host)
- creds = &clone
+ creds = clone
}
}
return creds
}
-func (c *Client) dialWithBalancerCreds(ep string) *credentials.TransportCredentials {
+func (c *Client) dialWithBalancerCreds(ep string) grpccredentials.TransportCredentials {
_, _, scheme := endpoint.ParseEndpoint(ep)
creds := c.creds
if len(scheme) != 0 {
@@ -423,10 +407,9 @@
if cfg == nil {
cfg = &Config{}
}
- var creds *credentials.TransportCredentials
+ var creds grpccredentials.TransportCredentials
if cfg.TLS != nil {
- c := credentials.NewTLS(cfg.TLS)
- creds = &c
+ creds = credentials.NewBundle(credentials.Config{TLSConfig: cfg.TLS}).TransportCredentials()
}
// use a temporary skeleton client to bootstrap first connection
@@ -442,7 +425,7 @@
creds: creds,
ctx: ctx,
cancel: cancel,
- mu: new(sync.Mutex),
+ mu: new(sync.RWMutex),
callOpts: defaultCallOpts,
}
@@ -540,13 +523,17 @@
func (c *Client) checkVersion() (err error) {
var wg sync.WaitGroup
- errc := make(chan error, len(c.cfg.Endpoints))
+
+ eps := c.Endpoints()
+ errc := make(chan error, len(eps))
ctx, cancel := context.WithCancel(c.ctx)
if c.cfg.DialTimeout > 0 {
- ctx, cancel = context.WithTimeout(ctx, c.cfg.DialTimeout)
+ cancel()
+ ctx, cancel = context.WithTimeout(c.ctx, c.cfg.DialTimeout)
}
- wg.Add(len(c.cfg.Endpoints))
- for _, ep := range c.cfg.Endpoints {
+
+ wg.Add(len(eps))
+ for _, ep := range eps {
// if cluster is current, any endpoint gives a recent version
go func(e string) {
defer wg.Done()
@@ -558,8 +545,15 @@
vs := strings.Split(resp.Version, ".")
maj, min := 0, 0
if len(vs) >= 2 {
- maj, _ = strconv.Atoi(vs[0])
- min, rerr = strconv.Atoi(vs[1])
+ var serr error
+ if maj, serr = strconv.Atoi(vs[0]); serr != nil {
+ errc <- serr
+ return
+ }
+ if min, serr = strconv.Atoi(vs[1]); serr != nil {
+ errc <- serr
+ return
+ }
}
if maj < 3 || (maj == 3 && min < 2) {
rerr = ErrOldCluster
@@ -568,7 +562,7 @@
}(ep)
}
// wait for success
- for i := 0; i < len(c.cfg.Endpoints); i++ {
+ for range eps {
if err = <-errc; err == nil {
break
}
@@ -608,10 +602,13 @@
if err == nil {
return false
}
- ev, _ := status.FromError(err)
- // Unavailable codes mean the system will be right back.
- // (e.g., can't connect, lost leader)
- return ev.Code() == codes.Unavailable
+ ev, ok := status.FromError(err)
+ if ok {
+ // Unavailable codes mean the system will be right back.
+ // (e.g., can't connect, lost leader)
+ return ev.Code() == codes.Unavailable
+ }
+ return false
}
func toErr(ctx context.Context, err error) error {
@@ -631,9 +628,6 @@
if ctx.Err() != nil {
err = ctx.Err()
}
- case codes.Unavailable:
- case codes.FailedPrecondition:
- err = grpc.ErrClientConnClosing
}
}
return err
@@ -653,16 +647,19 @@
if err == nil {
return false
}
- // >= gRPC v1.10.x
+
+ // >= gRPC v1.23.x
s, ok := status.FromError(err)
if ok {
// connection is canceled or server has already closed the connection
return s.Code() == codes.Canceled || s.Message() == "transport is closing"
}
+
// >= gRPC v1.10.x
if err == context.Canceled {
return true
}
+
// <= gRPC v1.7.x returns 'errors.New("grpc: the client connection is closing")'
return strings.Contains(err.Error(), "grpc: the client connection is closing")
}