VOL-1845 : Support for delete device in openolt adapter
This commit adds handling of delete device.
The changes handle the delete device request. This
includes clearing all data related to the device from
the KV store and rebooting the device in order to
reset it.
This commit depends on a change in voltha-go, which needs
to be merged first. Please refer to this review link:
https://gerrit.opencord.org/#/c/15084/
Updated vendored dependencies (dep ensure) to pick up the
above voltha-go patch set. Also includes typo fixes and
fixes for make lint/sca.
Change-Id: I53f16022c6902d498dad30e9b7d0ff50bf156347
diff --git a/vendor/go.etcd.io/etcd/clientv3/client.go b/vendor/go.etcd.io/etcd/clientv3/client.go
index b91cbf9..d6000a8 100644
--- a/vendor/go.etcd.io/etcd/clientv3/client.go
+++ b/vendor/go.etcd.io/etcd/clientv3/client.go
@@ -16,7 +16,6 @@
import (
"context"
- "crypto/tls"
"errors"
"fmt"
"net"
@@ -30,12 +29,13 @@
"go.etcd.io/etcd/clientv3/balancer"
"go.etcd.io/etcd/clientv3/balancer/picker"
"go.etcd.io/etcd/clientv3/balancer/resolver/endpoint"
+ "go.etcd.io/etcd/clientv3/credentials"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
"go.etcd.io/etcd/pkg/logutil"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
- "google.golang.org/grpc/credentials"
+ grpccredentials "google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
@@ -51,12 +51,17 @@
func init() {
lg := zap.NewNop()
if os.Getenv("ETCD_CLIENT_DEBUG") != "" {
+ lcfg := logutil.DefaultZapLoggerConfig
+ lcfg.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
+
var err error
- lg, err = zap.NewProductionConfig().Build() // info level logging
+ lg, err = lcfg.Build() // info level logging
if err != nil {
panic(err)
}
}
+
+ // TODO: support custom balancer
balancer.RegisterBuilder(balancer.Config{
Policy: picker.RoundrobinBalanced,
Name: roundRobinBalancerName,
@@ -76,7 +81,7 @@
conn *grpc.ClientConn
cfg Config
- creds *credentials.TransportCredentials
+ creds grpccredentials.TransportCredentials
resolverGroup *endpoint.ResolverGroup
mu *sync.RWMutex
@@ -86,9 +91,8 @@
// Username is a user name for authentication.
Username string
// Password is a password for authentication.
- Password string
- // tokenCred is an instance of WithPerRPCCredentials()'s argument
- tokenCred *authTokenCredential
+ Password string
+ authTokenBundle credentials.Bundle
callOpts []grpc.CallOption
@@ -125,8 +129,12 @@
// Close shuts down the client's etcd connections.
func (c *Client) Close() error {
c.cancel()
- c.Watcher.Close()
- c.Lease.Close()
+ if c.Watcher != nil {
+ c.Watcher.Close()
+ }
+ if c.Lease != nil {
+ c.Lease.Close()
+ }
if c.resolverGroup != nil {
c.resolverGroup.Close()
}
@@ -193,24 +201,7 @@
}
}
-type authTokenCredential struct {
- token string
- tokenMu *sync.RWMutex
-}
-
-func (cred authTokenCredential) RequireTransportSecurity() bool {
- return false
-}
-
-func (cred authTokenCredential) GetRequestMetadata(ctx context.Context, s ...string) (map[string]string, error) {
- cred.tokenMu.RLock()
- defer cred.tokenMu.RUnlock()
- return map[string]string{
- rpctypes.TokenFieldNameGRPC: cred.token,
- }, nil
-}
-
-func (c *Client) processCreds(scheme string) (creds *credentials.TransportCredentials) {
+func (c *Client) processCreds(scheme string) (creds grpccredentials.TransportCredentials) {
creds = c.creds
switch scheme {
case "unix":
@@ -220,9 +211,7 @@
if creds != nil {
break
}
- tlsconfig := &tls.Config{}
- emptyCreds := credentials.NewTLS(tlsconfig)
- creds = &emptyCreds
+ creds = credentials.NewBundle(credentials.Config{}).TransportCredentials()
default:
creds = nil
}
@@ -230,7 +219,7 @@
}
// dialSetupOpts gives the dial opts prior to any authentication.
-func (c *Client) dialSetupOpts(creds *credentials.TransportCredentials, dopts ...grpc.DialOption) (opts []grpc.DialOption, err error) {
+func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (opts []grpc.DialOption, err error) {
if c.cfg.DialKeepAliveTime > 0 {
params := keepalive.ClientParameters{
Time: c.cfg.DialKeepAliveTime,
@@ -255,7 +244,7 @@
opts = append(opts, grpc.WithDialer(f))
if creds != nil {
- opts = append(opts, grpc.WithTransportCredentials(*creds))
+ opts = append(opts, grpc.WithTransportCredentials(creds))
} else {
opts = append(opts, grpc.WithInsecure())
}
@@ -289,8 +278,8 @@
var err error // return last error in a case of fail
var auth *authenticator
- for i := 0; i < len(c.cfg.Endpoints); i++ {
- ep := c.cfg.Endpoints[i]
+ eps := c.Endpoints()
+ for _, ep := range eps {
// use dial options without dopts to avoid reusing the client balancer
var dOpts []grpc.DialOption
_, host, _ := endpoint.ParseEndpoint(ep)
@@ -318,10 +307,7 @@
continue
}
- c.tokenCred.tokenMu.Lock()
- c.tokenCred.token = resp.Token
- c.tokenCred.tokenMu.Unlock()
-
+ c.authTokenBundle.UpdateAuthToken(resp.Token)
return nil
}
@@ -338,16 +324,14 @@
}
// dial configures and dials any grpc balancer target.
-func (c *Client) dial(target string, creds *credentials.TransportCredentials, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
+func (c *Client) dial(target string, creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
opts, err := c.dialSetupOpts(creds, dopts...)
if err != nil {
return nil, fmt.Errorf("failed to configure dialer: %v", err)
}
if c.Username != "" && c.Password != "" {
- c.tokenCred = &authTokenCredential{
- tokenMu: &sync.RWMutex{},
- }
+ c.authTokenBundle = credentials.NewBundle(credentials.Config{})
ctx, cancel := c.ctx, func() {}
if c.cfg.DialTimeout > 0 {
@@ -364,7 +348,7 @@
return nil, err
}
} else {
- opts = append(opts, grpc.WithPerRPCCredentials(c.tokenCred))
+ opts = append(opts, grpc.WithPerRPCCredentials(c.authTokenBundle.PerRPCCredentials()))
}
cancel()
}
@@ -385,26 +369,25 @@
return conn, nil
}
-func (c *Client) directDialCreds(ep string) *credentials.TransportCredentials {
+func (c *Client) directDialCreds(ep string) grpccredentials.TransportCredentials {
_, hostPort, scheme := endpoint.ParseEndpoint(ep)
creds := c.creds
if len(scheme) != 0 {
creds = c.processCreds(scheme)
if creds != nil {
- c := *creds
- clone := c.Clone()
+ clone := creds.Clone()
// Set the server name must to the endpoint hostname without port since grpc
// otherwise attempts to check if x509 cert is valid for the full endpoint
// including the scheme and port, which fails.
host, _ := endpoint.ParseHostPort(hostPort)
clone.OverrideServerName(host)
- creds = &clone
+ creds = clone
}
}
return creds
}
-func (c *Client) dialWithBalancerCreds(ep string) *credentials.TransportCredentials {
+func (c *Client) dialWithBalancerCreds(ep string) grpccredentials.TransportCredentials {
_, _, scheme := endpoint.ParseEndpoint(ep)
creds := c.creds
if len(scheme) != 0 {
@@ -424,10 +407,9 @@
if cfg == nil {
cfg = &Config{}
}
- var creds *credentials.TransportCredentials
+ var creds grpccredentials.TransportCredentials
if cfg.TLS != nil {
- c := credentials.NewTLS(cfg.TLS)
- creds = &c
+ creds = credentials.NewBundle(credentials.Config{TLSConfig: cfg.TLS}).TransportCredentials()
}
// use a temporary skeleton client to bootstrap first connection
@@ -541,13 +523,17 @@
func (c *Client) checkVersion() (err error) {
var wg sync.WaitGroup
- errc := make(chan error, len(c.cfg.Endpoints))
+
+ eps := c.Endpoints()
+ errc := make(chan error, len(eps))
ctx, cancel := context.WithCancel(c.ctx)
if c.cfg.DialTimeout > 0 {
- ctx, cancel = context.WithTimeout(ctx, c.cfg.DialTimeout)
+ cancel()
+ ctx, cancel = context.WithTimeout(c.ctx, c.cfg.DialTimeout)
}
- wg.Add(len(c.cfg.Endpoints))
- for _, ep := range c.cfg.Endpoints {
+
+ wg.Add(len(eps))
+ for _, ep := range eps {
// if cluster is current, any endpoint gives a recent version
go func(e string) {
defer wg.Done()
@@ -559,8 +545,15 @@
vs := strings.Split(resp.Version, ".")
maj, min := 0, 0
if len(vs) >= 2 {
- maj, _ = strconv.Atoi(vs[0])
- min, rerr = strconv.Atoi(vs[1])
+ var serr error
+ if maj, serr = strconv.Atoi(vs[0]); serr != nil {
+ errc <- serr
+ return
+ }
+ if min, serr = strconv.Atoi(vs[1]); serr != nil {
+ errc <- serr
+ return
+ }
}
if maj < 3 || (maj == 3 && min < 2) {
rerr = ErrOldCluster
@@ -569,7 +562,7 @@
}(ep)
}
// wait for success
- for i := 0; i < len(c.cfg.Endpoints); i++ {
+ for range eps {
if err = <-errc; err == nil {
break
}
@@ -609,10 +602,13 @@
if err == nil {
return false
}
- ev, _ := status.FromError(err)
- // Unavailable codes mean the system will be right back.
- // (e.g., can't connect, lost leader)
- return ev.Code() == codes.Unavailable
+ ev, ok := status.FromError(err)
+ if ok {
+ // Unavailable codes mean the system will be right back.
+ // (e.g., can't connect, lost leader)
+ return ev.Code() == codes.Unavailable
+ }
+ return false
}
func toErr(ctx context.Context, err error) error {
@@ -632,9 +628,6 @@
if ctx.Err() != nil {
err = ctx.Err()
}
- case codes.Unavailable:
- case codes.FailedPrecondition:
- err = grpc.ErrClientConnClosing
}
}
return err
@@ -654,16 +647,19 @@
if err == nil {
return false
}
- // >= gRPC v1.10.x
+
+ // >= gRPC v1.23.x
s, ok := status.FromError(err)
if ok {
// connection is canceled or server has already closed the connection
return s.Code() == codes.Canceled || s.Message() == "transport is closing"
}
+
// >= gRPC v1.10.x
if err == context.Canceled {
return true
}
+
// <= gRPC v1.7.x returns 'errors.New("grpc: the client connection is closing")'
return strings.Contains(err.Error(), "grpc: the client connection is closing")
}