[VOL-2009] RO Core doesn't retry KV store connection on startup

Change-Id: I01ed30d41d968f1bf9e052014eae420973d85266
diff --git a/.gitignore b/.gitignore
index 8ae7909..5a1a698 100644
--- a/.gitignore
+++ b/.gitignore
@@ -56,3 +56,6 @@
 # test output
 tests/results
 sca-report
+
+# etcd mocks
+ro_core/core/voltha.lib.mocks.etcd
diff --git a/go.mod b/go.mod
index b716f65..d504b31 100644
--- a/go.mod
+++ b/go.mod
@@ -8,7 +8,7 @@
 	github.com/golang/protobuf v1.3.2
 	github.com/google/uuid v1.1.1
 	github.com/gyuho/goraph v0.0.0-20160328020532-d460590d53a9
-	github.com/opencord/voltha-lib-go/v2 v2.2.16
+	github.com/opencord/voltha-lib-go/v2 v2.2.17
 	github.com/opencord/voltha-protos/v2 v2.0.1
 	github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
 	github.com/stretchr/testify v1.4.0
diff --git a/go.sum b/go.sum
index e295761..cc98671 100644
--- a/go.sum
+++ b/go.sum
@@ -192,8 +192,8 @@
 github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
 github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I=
 github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
-github.com/opencord/voltha-lib-go/v2 v2.2.16 h1:7Uzlt+uVYx6Lo4W/ZX89WQud8IvQJvcW7yOnCKvs+AY=
-github.com/opencord/voltha-lib-go/v2 v2.2.16/go.mod h1:Hql0xWiBFqYM6WpE5G+w9//NdaIoR9mVzcvVYDxEnZY=
+github.com/opencord/voltha-lib-go/v2 v2.2.17 h1:if2mGx376oUO8+wFI7BZ7KMLElewoeSBj0zWi7Xl/Fk=
+github.com/opencord/voltha-lib-go/v2 v2.2.17/go.mod h1:1NOSy3uX2DcAIJyZZXkbjCwokwliEQJPu0zF3Jo5OEA=
 github.com/opencord/voltha-protos/v2 v2.0.1 h1:vcE0XxNVeu0SED0bW2lf2w24k/QMFrFqMexuedIyTEg=
 github.com/opencord/voltha-protos/v2 v2.0.1/go.mod h1:6kOcfYi1CadWowFxI2SH5wLfHrsRECZLZlD2MFK6WDI=
 github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
diff --git a/ro_core/config/config.go b/ro_core/config/config.go
index eea6080..03dedf2 100644
--- a/ro_core/config/config.go
+++ b/ro_core/config/config.go
@@ -25,55 +25,61 @@
 
 // RO Core service default constants
 const (
-	ConsulStoreName               = "consul"
-	EtcdStoreName                 = "etcd"
-	default_InstanceID            = "rocore001"
-	default_GrpcPort              = 50057
-	default_GrpcHost              = ""
-	default_KVStoreType           = EtcdStoreName
-	default_KVStoreTimeout        = 5 //in seconds
-	default_KVStoreHost           = "127.0.0.1"
-	default_KVStorePort           = 2379 // Consul = 8500; Etcd = 2379
-	default_KVTxnKeyDelTime       = 60
-	default_LogLevel              = 0
-	default_Banner                = false
-	default_DisplayVersionOnly    = false
-	default_CoreTopic             = "rocore"
-	default_ROCoreEndpoint        = "rocore"
-	default_ROCoreKey             = "pki/voltha.key"
-	default_ROCoreCert            = "pki/voltha.crt"
-	default_ROCoreCA              = "pki/voltha-CA.pem"
-	default_Affinity_Router_Topic = "affinityRouter"
-	default_ProbeHost             = ""
-	default_ProbePort             = 8080
-	default_LiveProbeInterval     = 60 * time.Second
-	default_NotLiveProbeInterval  = 5 * time.Second // Probe more frequently to detect Recovery early
+	ConsulStoreName                 = "consul"
+	EtcdStoreName                   = "etcd"
+	default_InstanceID              = "rocore001"
+	default_GrpcPort                = 50057
+	default_GrpcHost                = ""
+	default_KVStoreType             = EtcdStoreName
+	default_KVStoreTimeout          = 5 //in seconds
+	default_KVStoreHost             = "127.0.0.1"
+	default_KVStorePort             = 2379 // Consul = 8500; Etcd = 2379
+	default_KVTxnKeyDelTime         = 60
+	default_LogLevel                = 0
+	default_Banner                  = false
+	default_DisplayVersionOnly      = false
+	default_CoreTopic               = "rocore"
+	default_ROCoreEndpoint          = "rocore"
+	default_ROCoreKey               = "pki/voltha.key"
+	default_ROCoreCert              = "pki/voltha.crt"
+	default_ROCoreCA                = "pki/voltha-CA.pem"
+	default_Affinity_Router_Topic   = "affinityRouter"
+	default_ProbeHost               = ""
+	default_ProbePort               = 8080
+	default_LiveProbeInterval       = 60 * time.Second
+	default_NotLiveProbeInterval    = 5 * time.Second // Probe more frequently to detect Recovery early
+	default_CoreTimeout             = 59 * time.Second
+	default_MaxConnectionRetries    = -1              // retries forever
+	default_ConnectionRetryInterval = 2 * time.Second // delay between connection attempts
 )
 
 // ROCoreFlags represents the set of configurations used by the read-only core service
 type ROCoreFlags struct {
 	// Command line parameters
-	InstanceID           string
-	ROCoreEndpoint       string
-	GrpcHost             string
-	GrpcPort             int
-	KVStoreType          string
-	KVStoreTimeout       int // in seconds
-	KVStoreHost          string
-	KVStorePort          int
-	KVTxnKeyDelTime      int
-	CoreTopic            string
-	LogLevel             int
-	Banner               bool
-	DisplayVersionOnly   bool
-	ROCoreKey            string
-	ROCoreCert           string
-	ROCoreCA             string
-	AffinityRouterTopic  string
-	ProbeHost            string
-	ProbePort            int
-	LiveProbeInterval    time.Duration
-	NotLiveProbeInterval time.Duration
+	InstanceID              string
+	ROCoreEndpoint          string
+	GrpcHost                string
+	GrpcPort                int
+	KVStoreType             string
+	KVStoreTimeout          int // in seconds
+	KVStoreHost             string
+	KVStorePort             int
+	KVTxnKeyDelTime         int
+	CoreTopic               string
+	LogLevel                int
+	Banner                  bool
+	DisplayVersionOnly      bool
+	ROCoreKey               string
+	ROCoreCert              string
+	ROCoreCA                string
+	AffinityRouterTopic     string
+	ProbeHost               string
+	ProbePort               int
+	LiveProbeInterval       time.Duration
+	NotLiveProbeInterval    time.Duration
+	CoreTimeout             time.Duration
+	MaxConnectionRetries    int
+	ConnectionRetryInterval time.Duration
 }
 
 func init() {
@@ -83,27 +89,30 @@
 // NewROCoreFlags returns a new ROCore config
 func NewROCoreFlags() *ROCoreFlags {
 	var roCoreFlag = ROCoreFlags{ // Default values
-		InstanceID:           default_InstanceID,
-		ROCoreEndpoint:       default_ROCoreEndpoint,
-		GrpcHost:             default_GrpcHost,
-		GrpcPort:             default_GrpcPort,
-		KVStoreType:          default_KVStoreType,
-		KVStoreTimeout:       default_KVStoreTimeout,
-		KVStoreHost:          default_KVStoreHost,
-		KVStorePort:          default_KVStorePort,
-		KVTxnKeyDelTime:      default_KVTxnKeyDelTime,
-		CoreTopic:            default_CoreTopic,
-		LogLevel:             default_LogLevel,
-		Banner:               default_Banner,
-		DisplayVersionOnly:   default_DisplayVersionOnly,
-		ROCoreKey:            default_ROCoreKey,
-		ROCoreCert:           default_ROCoreCert,
-		ROCoreCA:             default_ROCoreCA,
-		AffinityRouterTopic:  default_Affinity_Router_Topic,
-		ProbeHost:            default_ProbeHost,
-		ProbePort:            default_ProbePort,
-		LiveProbeInterval:    default_LiveProbeInterval,
-		NotLiveProbeInterval: default_NotLiveProbeInterval,
+		InstanceID:              default_InstanceID,
+		ROCoreEndpoint:          default_ROCoreEndpoint,
+		GrpcHost:                default_GrpcHost,
+		GrpcPort:                default_GrpcPort,
+		KVStoreType:             default_KVStoreType,
+		KVStoreTimeout:          default_KVStoreTimeout,
+		KVStoreHost:             default_KVStoreHost,
+		KVStorePort:             default_KVStorePort,
+		KVTxnKeyDelTime:         default_KVTxnKeyDelTime,
+		CoreTopic:               default_CoreTopic,
+		LogLevel:                default_LogLevel,
+		Banner:                  default_Banner,
+		DisplayVersionOnly:      default_DisplayVersionOnly,
+		ROCoreKey:               default_ROCoreKey,
+		ROCoreCert:              default_ROCoreCert,
+		ROCoreCA:                default_ROCoreCA,
+		AffinityRouterTopic:     default_Affinity_Router_Topic,
+		ProbeHost:               default_ProbeHost,
+		ProbePort:               default_ProbePort,
+		LiveProbeInterval:       default_LiveProbeInterval,
+		NotLiveProbeInterval:    default_NotLiveProbeInterval,
+		CoreTimeout:             default_CoreTimeout,
+		MaxConnectionRetries:    default_MaxConnectionRetries,
+		ConnectionRetryInterval: default_ConnectionRetryInterval,
 	}
 	return &roCoreFlag
 }
@@ -164,6 +173,15 @@
 	help = fmt.Sprintf("Time interval between liveness probes while in a not live state")
 	flag.DurationVar(&(cf.NotLiveProbeInterval), "not_live_probe_interval", default_NotLiveProbeInterval, help)
 
+	help = fmt.Sprintf("The maximum time the core will wait while attempting to connect to a dependent component duration")
+	flag.DurationVar(&(cf.CoreTimeout), "core_timeout", default_CoreTimeout, help)
+
+	help = fmt.Sprintf("The number of retries to connect to a dependent component")
+	flag.IntVar(&(cf.MaxConnectionRetries), "max_connection_retries", default_MaxConnectionRetries, help)
+
+	help = fmt.Sprintf("The duration between each connection retry attempt ")
+	flag.DurationVar(&(cf.ConnectionRetryInterval), "connection_retry_interval", default_ConnectionRetryInterval, help)
+
 	flag.Parse()
 
 	containerName := getContainerInfo()
diff --git a/ro_core/core/core.go b/ro_core/core/core.go
index d022266..797c05a 100644
--- a/ro_core/core/core.go
+++ b/ro_core/core/core.go
@@ -26,6 +26,8 @@
 	"github.com/opencord/voltha-lib-go/v2/pkg/probe"
 	"github.com/opencord/voltha-protos/v2/go/voltha"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 	"time"
 )
 
@@ -80,8 +82,48 @@
 	return &core
 }
 
+// waitUntilKVStoreReachableOrMaxTries blocks until the KV store is reachable, or returns a codes.Unavailable error once maxRetries retries have failed (maxRetries == -1 retries forever)
+func (core *Core) waitUntilKVStoreReachableOrMaxTries(ctx context.Context, maxRetries int, retryInterval time.Duration) error {
+	log.Infow("verifying-KV-store-connectivity", log.Fields{"host": core.config.KVStoreHost,
+		"port": core.config.KVStorePort, "retries": maxRetries, "retryInterval": retryInterval})
+
+	// Get timeout in seconds with 1 second set as minimum
+	timeout := int(core.config.CoreTimeout.Seconds())
+	if timeout < 1 {
+		timeout = 1
+	}
+	count := 0
+	for {
+		if !core.kvClient.IsConnectionUp(timeout) {
+			log.Info("KV-store-unreachable")
+			if maxRetries != -1 {
+				if count >= maxRetries {
+					return status.Error(codes.Unavailable, "kv store unreachable")
+				}
+			}
+			count += 1
+			// Take a nap before retrying
+			time.Sleep(retryInterval)
+			log.Infow("retry-KV-store-connectivity", log.Fields{"retryCount": count, "maxRetries": maxRetries, "retryInterval": retryInterval})
+
+		} else {
+			break
+		}
+	}
+	log.Info("KV-store-reachable")
+	return nil
+}
+
 func (core *Core) Start(ctx context.Context) {
 	log.Info("starting-adaptercore", log.Fields{"coreId": core.instanceId})
+
+	// Wait until connection to KV Store is up
+	if err := core.waitUntilKVStoreReachableOrMaxTries(ctx, core.config.MaxConnectionRetries, core.config.ConnectionRetryInterval); err != nil {
+		log.Fatal("Unable-to-connect-to-KV-store")
+	}
+
+	probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
+
 	core.genericMgr = newModelProxyManager(core.clusterDataProxy)
 	core.deviceMgr = newDeviceManager(core.clusterDataProxy, core.instanceId)
 	core.logicalDeviceMgr = newLogicalDeviceManager(core.deviceMgr, core.clusterDataProxy)
diff --git a/ro_core/core/core_test.go b/ro_core/core/core_test.go
index 948e63b..88bd561 100644
--- a/ro_core/core/core_test.go
+++ b/ro_core/core/core_test.go
@@ -18,14 +18,15 @@
 import (
 	"context"
 	"errors"
+	"fmt"
 	"github.com/opencord/voltha-go/ro_core/config"
 	"github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore"
 	grpcserver "github.com/opencord/voltha-lib-go/v2/pkg/grpc"
 	"github.com/opencord/voltha-lib-go/v2/pkg/log"
+	"github.com/opencord/voltha-lib-go/v2/pkg/mocks"
 	ic "github.com/opencord/voltha-protos/v2/go/inter_container"
 	"github.com/phayes/freeport"
 	"github.com/stretchr/testify/assert"
-	"strconv"
 	"testing"
 )
 
@@ -63,19 +64,31 @@
 
 func MakeTestNewCore() (*config.ROCoreFlags, *roCore) {
 
-	freePort, errP := freeport.GetFreePort()
-	if errP == nil {
-		freePortStr := strconv.Itoa(freePort)
+	clientPort, err := freeport.GetFreePort()
+	if err == nil {
+		peerPort, err := freeport.GetFreePort()
+		if err != nil {
+			log.Fatal(err)
+		}
+		etcdServer := mocks.StartEtcdServer(mocks.MKConfig("voltha.mock.test", clientPort, peerPort, "voltha.lib.mocks.etcd", "error"))
+		if etcdServer == nil {
+			log.Fatal("Embedded server failed to start")
+		}
+		clientAddr := fmt.Sprintf("localhost:%d", clientPort)
 
 		roCoreFlgs := config.NewROCoreFlags()
 		roC := newROCore(roCoreFlgs)
 		if (roC != nil) && (roCoreFlgs != nil) {
-			addr := "127.0.0.1" + ":" + freePortStr
-			cli, err := newKVClient("etcd", addr, 5)
+			cli, err := newKVClient("etcd", clientAddr, 5)
 			if err == nil {
 				roC.kvClient = cli
 				return roCoreFlgs, roC
 			}
+			if err != nil {
+				etcdServer.Stop()
+				log.Fatal("Failed to create an Etcd client")
+			}
+
 		}
 	}
 	return nil, nil
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/mocks/etcd_server.go b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/mocks/etcd_server.go
index 9b9dec4..3246ca0 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/mocks/etcd_server.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/mocks/etcd_server.go
@@ -16,15 +16,17 @@
 package mocks
 
 import (
+	"fmt"
 	"go.etcd.io/etcd/embed"
 	"log"
+	"net/url"
 	"os"
 	"time"
 )
 
 const (
-	serverStartUpTimeout   = 10 * time.Second // Maximum time allowed to wait for the Etcd server to be ready
-	localPersistentStorage = "voltha.embed.etcd"
+	serverStartUpTimeout          = 10 * time.Second // Maximum time allowed to wait for the Etcd server to be ready
+	defaultLocalPersistentStorage = "voltha.test.embed.etcd"
 )
 
 //EtcdServer represents an embedded Etcd server.  It is used for testing only.
@@ -32,10 +34,57 @@
 	server *embed.Etcd
 }
 
+func islogLevelValid(logLevel string) bool {
+	valid := []string{"debug", "info", "warn", "error", "panic", "fatal"}
+	for _, l := range valid {
+		if l == logLevel {
+			return true
+		}
+	}
+	return false
+}
+
+// MKConfig creates an embedded Etcd config.
+//
+// configName is a name for this config.
+// clientPort is the port the etcd client will connect to (do not use 2379 for unit test).
+// peerPort is the port the etcd server will listen on for its peers (do not use 2380 for unit test).
+// localPersistentStorageDir is the name of a local directory which will hold the Etcd server data.
+// logLevel must be one of debug, info, warn, error, panic, or fatal;
+// any other value, including "", aborts the process via log.Fatalf (there is no default).
+func MKConfig(configName string, clientPort, peerPort int, localPersistentStorageDir string, logLevel string) *embed.Config {
+	cfg := embed.NewConfig()
+	cfg.Name = configName
+	cfg.Dir = localPersistentStorageDir
+	cfg.Logger = "zap"
+	if !islogLevelValid(logLevel) {
+		log.Fatalf("Invalid log level -%s", logLevel)
+	}
+	cfg.LogLevel = logLevel
+	acurl, err := url.Parse(fmt.Sprintf("http://localhost:%d", clientPort))
+	if err != nil {
+		log.Fatalf("Invalid client port -%d", clientPort)
+	}
+	cfg.ACUrls = []url.URL{*acurl}
+	cfg.LCUrls = []url.URL{*acurl}
+
+	apurl, err := url.Parse(fmt.Sprintf("http://localhost:%d", peerPort))
+	if err != nil {
+		log.Fatalf("Invalid peer port -%d", peerPort)
+	}
+	cfg.LPUrls = []url.URL{*apurl}
+	cfg.APUrls = []url.URL{*apurl}
+
+	cfg.ClusterState = embed.ClusterStateFlagNew
+	cfg.InitialCluster = cfg.Name + "=" + apurl.String()
+
+	return cfg
+}
+
 //getDefaultCfg specifies the default config
 func getDefaultCfg() *embed.Config {
 	cfg := embed.NewConfig()
-	cfg.Dir = localPersistentStorage
+	cfg.Dir = defaultLocalPersistentStorage
 	cfg.Logger = "zap"
 	cfg.LogLevel = "error"
 	return cfg
@@ -50,8 +99,8 @@
 	}
 	// Remove the local directory as
 	// a safeguard for the case where a prior test failed
-	if err := os.RemoveAll(localPersistentStorage); err != nil {
-		log.Fatalf("Failure removing local directory %s", localPersistentStorage)
+	if err := os.RemoveAll(cfg.Dir); err != nil {
+		log.Fatalf("Failure removing local directory %s", cfg.Dir)
 	}
 	e, err := embed.StartEtcd(cfg)
 	if err != nil {
@@ -75,9 +124,10 @@
 //Stop closes the embedded Etcd server and removes the local data directory as well
 func (es *EtcdServer) Stop() {
 	if es != nil {
+		storage := es.server.Config().Dir
 		es.server.Server.HardStop()
 		es.server.Close()
-		if err := os.RemoveAll(localPersistentStorage); err != nil {
+		if err := os.RemoveAll(storage); err != nil {
 			log.Fatalf("Failure removing local directory %s", es.server.Config().Dir)
 		}
 	}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 0dc27a7..594cd27 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -100,7 +100,7 @@
 github.com/modern-go/concurrent
 # github.com/modern-go/reflect2 v1.0.1
 github.com/modern-go/reflect2
-# github.com/opencord/voltha-lib-go/v2 v2.2.16
+# github.com/opencord/voltha-lib-go/v2 v2.2.17
 github.com/opencord/voltha-lib-go/v2/pkg/log
 github.com/opencord/voltha-lib-go/v2/pkg/db
 github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore
@@ -245,10 +245,10 @@
 go.uber.org/zap/internal/exit
 # golang.org/x/crypto v0.0.0-20191001170739-f9e2070545dc
 golang.org/x/crypto/bcrypt
-golang.org/x/crypto/md4
 golang.org/x/crypto/blowfish
-golang.org/x/crypto/pbkdf2
+golang.org/x/crypto/md4
 golang.org/x/crypto/ssh/terminal
+golang.org/x/crypto/pbkdf2
 # golang.org/x/net v0.0.0-20190930134127-c5a3c61f89f3
 golang.org/x/net/trace
 golang.org/x/net/internal/timeseries