[VOL-4293] OpenONU Adapter update for gRPC migration
Change-Id: I05300d3b95b878f44576a99a05f53f52fdc0cda1
diff --git a/vendor/go.etcd.io/etcd/clientv3/client.go b/vendor/go.etcd.io/etcd/clientv3/client.go
index d6000a8..c49e4ba 100644
--- a/vendor/go.etcd.io/etcd/clientv3/client.go
+++ b/vendor/go.etcd.io/etcd/clientv3/client.go
@@ -25,19 +25,19 @@
"sync"
"time"
+ "github.com/coreos/etcd/clientv3/balancer"
+ "github.com/coreos/etcd/clientv3/balancer/picker"
+ "github.com/coreos/etcd/clientv3/balancer/resolver/endpoint"
+ "github.com/coreos/etcd/clientv3/credentials"
+ "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
+ "github.com/coreos/etcd/pkg/logutil"
+ "github.com/coreos/pkg/capnslog"
"github.com/google/uuid"
- "go.etcd.io/etcd/clientv3/balancer"
- "go.etcd.io/etcd/clientv3/balancer/picker"
- "go.etcd.io/etcd/clientv3/balancer/resolver/endpoint"
- "go.etcd.io/etcd/clientv3/credentials"
- "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
- "go.etcd.io/etcd/pkg/logutil"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
grpccredentials "google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
- "google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
@@ -48,6 +48,10 @@
roundRobinBalancerName = fmt.Sprintf("etcd-%s", picker.RoundrobinBalanced.String())
)
+var (
+ plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "clientv3") // package-level capnslog logger; NOTE(review): no use of plog is visible in these hunks — confirm it is referenced elsewhere
+)
+
func init() {
lg := zap.NewNop()
if os.Getenv("ETCD_CLIENT_DEBUG") != "" {
@@ -129,12 +133,8 @@
// Close shuts down the client's etcd connections.
func (c *Client) Close() error {
c.cancel()
- if c.Watcher != nil {
- c.Watcher.Close()
- }
- if c.Lease != nil {
- c.Lease.Close()
- }
+ c.Watcher.Close()
+ c.Lease.Close()
if c.resolverGroup != nil {
c.resolverGroup.Close()
}
@@ -230,24 +230,17 @@
}
opts = append(opts, dopts...)
- // Provide a net dialer that supports cancelation and timeout.
- f := func(dialEp string, t time.Duration) (net.Conn, error) {
- proto, host, _ := endpoint.ParseEndpoint(dialEp)
- select {
- case <-c.ctx.Done():
- return nil, c.ctx.Err()
- default:
- }
- dialer := &net.Dialer{Timeout: t}
- return dialer.DialContext(c.ctx, proto, host)
- }
- opts = append(opts, grpc.WithDialer(f))
-
+ dialer := endpoint.Dialer
if creds != nil {
opts = append(opts, grpc.WithTransportCredentials(creds))
+ // gRPC load balancer workaround. See credentials.transportCredential for details.
+ if credsDialer, ok := creds.(TransportCredentialsWithDialer); ok {
+ dialer = credsDialer.Dialer
+ }
} else {
opts = append(opts, grpc.WithInsecure())
}
+ opts = append(opts, grpc.WithContextDialer(dialer))
// Interceptor retry and backoff.
// TODO: Replace all of clientv3/retry.go with interceptor based retry, or with
@@ -266,7 +259,10 @@
// Dial connects to a single endpoint using the client's config.
func (c *Client) Dial(ep string) (*grpc.ClientConn, error) {
- creds := c.directDialCreds(ep)
+ creds, err := c.directDialCreds(ep)
+ if err != nil {
+ return nil, err
+ }
// Use the grpc passthrough resolver to directly dial a single endpoint.
// This resolver passes through the 'unix' and 'unixs' endpoints schemes used
// by etcd without modification, allowing us to directly dial endpoints and
@@ -369,8 +365,8 @@
return conn, nil
}
-func (c *Client) directDialCreds(ep string) grpccredentials.TransportCredentials {
- _, hostPort, scheme := endpoint.ParseEndpoint(ep)
+func (c *Client) directDialCreds(ep string) (grpccredentials.TransportCredentials, error) {
+ _, host, scheme := endpoint.ParseEndpoint(ep)
creds := c.creds
if len(scheme) != 0 {
creds = c.processCreds(scheme)
@@ -379,12 +375,17 @@
// Set the server name must to the endpoint hostname without port since grpc
// otherwise attempts to check if x509 cert is valid for the full endpoint
// including the scheme and port, which fails.
- host, _ := endpoint.ParseHostPort(hostPort)
- clone.OverrideServerName(host)
+ overrideServerName, _, err := net.SplitHostPort(host)
+ if err != nil {
+ // Either the host didn't have a port or the host could not be parsed. Either way, continue with the
+ // original host string.
+ overrideServerName = host
+ }
+ clone.OverrideServerName(overrideServerName)
creds = clone
}
}
- return creds
+ return creds, nil
}
func (c *Client) dialWithBalancerCreds(ep string) grpccredentials.TransportCredentials {
@@ -396,13 +397,6 @@
return creds
}
-// WithRequireLeader requires client requests to only succeed
-// when the cluster has a leader.
-func WithRequireLeader(ctx context.Context) context.Context {
- md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
- return metadata.NewOutgoingContext(ctx, md)
-}
-
func newClient(cfg *Config) (*Client, error) {
if cfg == nil {
cfg = &Config{}
@@ -663,3 +657,9 @@
// <= gRPC v1.7.x returns 'errors.New("grpc: the client connection is closing")'
return strings.Contains(err.Error(), "grpc: the client connection is closing")
}
+
+// TransportCredentialsWithDialer is a grpccredentials.TransportCredentials that also supplies its own Dialer; it exists for a gRPC load balancer workaround. See credentials.transportCredential for details.
+type TransportCredentialsWithDialer interface {
+ grpccredentials.TransportCredentials
+ Dialer(ctx context.Context, dialEp string) (net.Conn, error) // presumably dials dialEp under ctx — confirm against credentials.transportCredential
+}