VOL-2009[RO Core doesn't retry KV store connection on startup]
Change-Id: I01ed30d41d968f1bf9e052014eae420973d85266
diff --git a/.gitignore b/.gitignore
index 8ae7909..5a1a698 100644
--- a/.gitignore
+++ b/.gitignore
@@ -56,3 +56,6 @@
# test output
tests/results
sca-report
+
+# etcd mocks
+ro_core/core/voltha.lib.mocks.etcd
diff --git a/go.mod b/go.mod
index b716f65..d504b31 100644
--- a/go.mod
+++ b/go.mod
@@ -8,7 +8,7 @@
github.com/golang/protobuf v1.3.2
github.com/google/uuid v1.1.1
github.com/gyuho/goraph v0.0.0-20160328020532-d460590d53a9
- github.com/opencord/voltha-lib-go/v2 v2.2.16
+ github.com/opencord/voltha-lib-go/v2 v2.2.17
github.com/opencord/voltha-protos/v2 v2.0.1
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
github.com/stretchr/testify v1.4.0
diff --git a/go.sum b/go.sum
index e295761..cc98671 100644
--- a/go.sum
+++ b/go.sum
@@ -192,8 +192,8 @@
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I=
github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
-github.com/opencord/voltha-lib-go/v2 v2.2.16 h1:7Uzlt+uVYx6Lo4W/ZX89WQud8IvQJvcW7yOnCKvs+AY=
-github.com/opencord/voltha-lib-go/v2 v2.2.16/go.mod h1:Hql0xWiBFqYM6WpE5G+w9//NdaIoR9mVzcvVYDxEnZY=
+github.com/opencord/voltha-lib-go/v2 v2.2.17 h1:if2mGx376oUO8+wFI7BZ7KMLElewoeSBj0zWi7Xl/Fk=
+github.com/opencord/voltha-lib-go/v2 v2.2.17/go.mod h1:1NOSy3uX2DcAIJyZZXkbjCwokwliEQJPu0zF3Jo5OEA=
github.com/opencord/voltha-protos/v2 v2.0.1 h1:vcE0XxNVeu0SED0bW2lf2w24k/QMFrFqMexuedIyTEg=
github.com/opencord/voltha-protos/v2 v2.0.1/go.mod h1:6kOcfYi1CadWowFxI2SH5wLfHrsRECZLZlD2MFK6WDI=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
diff --git a/ro_core/config/config.go b/ro_core/config/config.go
index eea6080..03dedf2 100644
--- a/ro_core/config/config.go
+++ b/ro_core/config/config.go
@@ -25,55 +25,61 @@
// RO Core service default constants
const (
- ConsulStoreName = "consul"
- EtcdStoreName = "etcd"
- default_InstanceID = "rocore001"
- default_GrpcPort = 50057
- default_GrpcHost = ""
- default_KVStoreType = EtcdStoreName
- default_KVStoreTimeout = 5 //in seconds
- default_KVStoreHost = "127.0.0.1"
- default_KVStorePort = 2379 // Consul = 8500; Etcd = 2379
- default_KVTxnKeyDelTime = 60
- default_LogLevel = 0
- default_Banner = false
- default_DisplayVersionOnly = false
- default_CoreTopic = "rocore"
- default_ROCoreEndpoint = "rocore"
- default_ROCoreKey = "pki/voltha.key"
- default_ROCoreCert = "pki/voltha.crt"
- default_ROCoreCA = "pki/voltha-CA.pem"
- default_Affinity_Router_Topic = "affinityRouter"
- default_ProbeHost = ""
- default_ProbePort = 8080
- default_LiveProbeInterval = 60 * time.Second
- default_NotLiveProbeInterval = 5 * time.Second // Probe more frequently to detect Recovery early
+ ConsulStoreName = "consul"
+ EtcdStoreName = "etcd"
+ default_InstanceID = "rocore001"
+ default_GrpcPort = 50057
+ default_GrpcHost = ""
+ default_KVStoreType = EtcdStoreName
+ default_KVStoreTimeout = 5 //in seconds
+ default_KVStoreHost = "127.0.0.1"
+ default_KVStorePort = 2379 // Consul = 8500; Etcd = 2379
+ default_KVTxnKeyDelTime = 60
+ default_LogLevel = 0
+ default_Banner = false
+ default_DisplayVersionOnly = false
+ default_CoreTopic = "rocore"
+ default_ROCoreEndpoint = "rocore"
+ default_ROCoreKey = "pki/voltha.key"
+ default_ROCoreCert = "pki/voltha.crt"
+ default_ROCoreCA = "pki/voltha-CA.pem"
+ default_Affinity_Router_Topic = "affinityRouter"
+ default_ProbeHost = ""
+ default_ProbePort = 8080
+ default_LiveProbeInterval = 60 * time.Second
+ default_NotLiveProbeInterval = 5 * time.Second // Probe more frequently to detect Recovery early
+ default_CoreTimeout = 59 * time.Second
+ default_MaxConnectionRetries = -1 // retries forever
+ default_ConnectionRetryInterval = 2 * time.Second // in seconds
)
// ROCoreFlags represents the set of configurations used by the read-only core service
type ROCoreFlags struct {
// Command line parameters
- InstanceID string
- ROCoreEndpoint string
- GrpcHost string
- GrpcPort int
- KVStoreType string
- KVStoreTimeout int // in seconds
- KVStoreHost string
- KVStorePort int
- KVTxnKeyDelTime int
- CoreTopic string
- LogLevel int
- Banner bool
- DisplayVersionOnly bool
- ROCoreKey string
- ROCoreCert string
- ROCoreCA string
- AffinityRouterTopic string
- ProbeHost string
- ProbePort int
- LiveProbeInterval time.Duration
- NotLiveProbeInterval time.Duration
+ InstanceID string
+ ROCoreEndpoint string
+ GrpcHost string
+ GrpcPort int
+ KVStoreType string
+ KVStoreTimeout int // in seconds
+ KVStoreHost string
+ KVStorePort int
+ KVTxnKeyDelTime int
+ CoreTopic string
+ LogLevel int
+ Banner bool
+ DisplayVersionOnly bool
+ ROCoreKey string
+ ROCoreCert string
+ ROCoreCA string
+ AffinityRouterTopic string
+ ProbeHost string
+ ProbePort int
+ LiveProbeInterval time.Duration
+ NotLiveProbeInterval time.Duration
+ CoreTimeout time.Duration
+ MaxConnectionRetries int
+ ConnectionRetryInterval time.Duration
}
func init() {
@@ -83,27 +89,30 @@
// NewROCoreFlags returns a new ROCore config
func NewROCoreFlags() *ROCoreFlags {
var roCoreFlag = ROCoreFlags{ // Default values
- InstanceID: default_InstanceID,
- ROCoreEndpoint: default_ROCoreEndpoint,
- GrpcHost: default_GrpcHost,
- GrpcPort: default_GrpcPort,
- KVStoreType: default_KVStoreType,
- KVStoreTimeout: default_KVStoreTimeout,
- KVStoreHost: default_KVStoreHost,
- KVStorePort: default_KVStorePort,
- KVTxnKeyDelTime: default_KVTxnKeyDelTime,
- CoreTopic: default_CoreTopic,
- LogLevel: default_LogLevel,
- Banner: default_Banner,
- DisplayVersionOnly: default_DisplayVersionOnly,
- ROCoreKey: default_ROCoreKey,
- ROCoreCert: default_ROCoreCert,
- ROCoreCA: default_ROCoreCA,
- AffinityRouterTopic: default_Affinity_Router_Topic,
- ProbeHost: default_ProbeHost,
- ProbePort: default_ProbePort,
- LiveProbeInterval: default_LiveProbeInterval,
- NotLiveProbeInterval: default_NotLiveProbeInterval,
+ InstanceID: default_InstanceID,
+ ROCoreEndpoint: default_ROCoreEndpoint,
+ GrpcHost: default_GrpcHost,
+ GrpcPort: default_GrpcPort,
+ KVStoreType: default_KVStoreType,
+ KVStoreTimeout: default_KVStoreTimeout,
+ KVStoreHost: default_KVStoreHost,
+ KVStorePort: default_KVStorePort,
+ KVTxnKeyDelTime: default_KVTxnKeyDelTime,
+ CoreTopic: default_CoreTopic,
+ LogLevel: default_LogLevel,
+ Banner: default_Banner,
+ DisplayVersionOnly: default_DisplayVersionOnly,
+ ROCoreKey: default_ROCoreKey,
+ ROCoreCert: default_ROCoreCert,
+ ROCoreCA: default_ROCoreCA,
+ AffinityRouterTopic: default_Affinity_Router_Topic,
+ ProbeHost: default_ProbeHost,
+ ProbePort: default_ProbePort,
+ LiveProbeInterval: default_LiveProbeInterval,
+ NotLiveProbeInterval: default_NotLiveProbeInterval,
+ CoreTimeout: default_CoreTimeout,
+ MaxConnectionRetries: default_MaxConnectionRetries,
+ ConnectionRetryInterval: default_ConnectionRetryInterval,
}
return &roCoreFlag
}
@@ -164,6 +173,15 @@
help = fmt.Sprintf("Time interval between liveness probes while in a not live state")
flag.DurationVar(&(cf.NotLiveProbeInterval), "not_live_probe_interval", default_NotLiveProbeInterval, help)
+ help = fmt.Sprintf("The maximum time the core will wait while attempting to connect to a dependent component duration")
+ flag.DurationVar(&(cf.CoreTimeout), "core_timeout", default_CoreTimeout, help)
+
+ help = fmt.Sprintf("The number of retries to connect to a dependent component")
+ flag.IntVar(&(cf.MaxConnectionRetries), "max_connection_retries", default_MaxConnectionRetries, help)
+
+ help = fmt.Sprintf("The duration between each connection retry attempt ")
+ flag.DurationVar(&(cf.ConnectionRetryInterval), "connection_retry_interval", default_ConnectionRetryInterval, help)
+
flag.Parse()
containerName := getContainerInfo()
diff --git a/ro_core/core/core.go b/ro_core/core/core.go
index d022266..797c05a 100644
--- a/ro_core/core/core.go
+++ b/ro_core/core/core.go
@@ -26,6 +26,8 @@
"github.com/opencord/voltha-lib-go/v2/pkg/probe"
"github.com/opencord/voltha-protos/v2/go/voltha"
"google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
"time"
)
@@ -80,8 +82,48 @@
return &core
}
+// waitUntilKVStoreReachableOrMaxTries will wait until it can connect to a KV store or until maxtries has been reached
+func (core *Core) waitUntilKVStoreReachableOrMaxTries(ctx context.Context, maxRetries int, retryInterval time.Duration) error {
+ log.Infow("verifying-KV-store-connectivity", log.Fields{"host": core.config.KVStoreHost,
+ "port": core.config.KVStorePort, "retries": maxRetries, "retryInterval": retryInterval})
+
+ // Get timeout in seconds with 1 second set as minimum
+ timeout := int(core.config.CoreTimeout.Seconds())
+ if timeout < 1 {
+ timeout = 1
+ }
+ count := 0
+ for {
+ if !core.kvClient.IsConnectionUp(timeout) {
+ log.Info("KV-store-unreachable")
+ if maxRetries != -1 {
+ if count >= maxRetries {
+ return status.Error(codes.Unavailable, "kv store unreachable")
+ }
+ }
+ count += 1
+ // Take a nap before retrying
+ time.Sleep(retryInterval)
+ log.Infow("retry-KV-store-connectivity", log.Fields{"retryCount": count, "maxRetries": maxRetries, "retryInterval": retryInterval})
+
+ } else {
+ break
+ }
+ }
+ log.Info("KV-store-reachable")
+ return nil
+}
+
func (core *Core) Start(ctx context.Context) {
log.Info("starting-adaptercore", log.Fields{"coreId": core.instanceId})
+
+ // Wait until connection to KV Store is up
+ if err := core.waitUntilKVStoreReachableOrMaxTries(ctx, core.config.MaxConnectionRetries, core.config.ConnectionRetryInterval); err != nil {
+ log.Fatal("Unable-to-connect-to-KV-store")
+ }
+
+ probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
+
core.genericMgr = newModelProxyManager(core.clusterDataProxy)
core.deviceMgr = newDeviceManager(core.clusterDataProxy, core.instanceId)
core.logicalDeviceMgr = newLogicalDeviceManager(core.deviceMgr, core.clusterDataProxy)
diff --git a/ro_core/core/core_test.go b/ro_core/core/core_test.go
index 948e63b..88bd561 100644
--- a/ro_core/core/core_test.go
+++ b/ro_core/core/core_test.go
@@ -18,14 +18,15 @@
import (
"context"
"errors"
+ "fmt"
"github.com/opencord/voltha-go/ro_core/config"
"github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore"
grpcserver "github.com/opencord/voltha-lib-go/v2/pkg/grpc"
"github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "github.com/opencord/voltha-lib-go/v2/pkg/mocks"
ic "github.com/opencord/voltha-protos/v2/go/inter_container"
"github.com/phayes/freeport"
"github.com/stretchr/testify/assert"
- "strconv"
"testing"
)
@@ -63,19 +64,31 @@
func MakeTestNewCore() (*config.ROCoreFlags, *roCore) {
- freePort, errP := freeport.GetFreePort()
- if errP == nil {
- freePortStr := strconv.Itoa(freePort)
+ clientPort, err := freeport.GetFreePort()
+ if err == nil {
+ peerPort, err := freeport.GetFreePort()
+ if err != nil {
+ log.Fatal(err)
+ }
+ etcdServer := mocks.StartEtcdServer(mocks.MKConfig("voltha.mock.test", clientPort, peerPort, "voltha.lib.mocks.etcd", "error"))
+ if etcdServer == nil {
+ log.Fatal("Embedded server failed to start")
+ }
+ clientAddr := fmt.Sprintf("localhost:%d", clientPort)
roCoreFlgs := config.NewROCoreFlags()
roC := newROCore(roCoreFlgs)
if (roC != nil) && (roCoreFlgs != nil) {
- addr := "127.0.0.1" + ":" + freePortStr
- cli, err := newKVClient("etcd", addr, 5)
+ cli, err := newKVClient("etcd", clientAddr, 5)
if err == nil {
roC.kvClient = cli
return roCoreFlgs, roC
}
+ if err != nil {
+ etcdServer.Stop()
+ log.Fatal("Failed to create an Etcd client")
+ }
+
}
}
return nil, nil
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/mocks/etcd_server.go b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/mocks/etcd_server.go
index 9b9dec4..3246ca0 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/mocks/etcd_server.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/mocks/etcd_server.go
@@ -16,15 +16,17 @@
package mocks
import (
+ "fmt"
"go.etcd.io/etcd/embed"
"log"
+ "net/url"
"os"
"time"
)
const (
- serverStartUpTimeout = 10 * time.Second // Maximum time allowed to wait for the Etcd server to be ready
- localPersistentStorage = "voltha.embed.etcd"
+ serverStartUpTimeout = 10 * time.Second // Maximum time allowed to wait for the Etcd server to be ready
+ defaultLocalPersistentStorage = "voltha.test.embed.etcd"
)
//EtcdServer represents an embedded Etcd server. It is used for testing only.
@@ -32,10 +34,57 @@
server *embed.Etcd
}
+func islogLevelValid(logLevel string) bool {
+ valid := []string{"debug", "info", "warn", "error", "panic", "fatal"}
+ for _, l := range valid {
+ if l == logLevel {
+ return true
+ }
+ }
+ return false
+}
+
+/*
+* MKConfig creates an embedded Etcd config
+* :param configName: A name for this config
+* :param clientPort: The port the etcd client will connect to (do not use 2379 for unit test)
+* :param peerPort: The port the etcd server will listen for its peers (do not use 2380 for unit test)
+* :param localPersistentStorageDir: The name of a local directory which will hold the Etcd server data
+* :param logLevel: One of debug, info, warn, error, panic, or fatal. Default 'info'.
+ */
+func MKConfig(configName string, clientPort, peerPort int, localPersistentStorageDir string, logLevel string) *embed.Config {
+ cfg := embed.NewConfig()
+ cfg.Name = configName
+ cfg.Dir = localPersistentStorageDir
+ cfg.Logger = "zap"
+ if !islogLevelValid(logLevel) {
+ log.Fatalf("Invalid log level -%s", logLevel)
+ }
+ cfg.LogLevel = logLevel
+ acurl, err := url.Parse(fmt.Sprintf("http://localhost:%d", clientPort))
+ if err != nil {
+ log.Fatalf("Invalid client port -%d", clientPort)
+ }
+ cfg.ACUrls = []url.URL{*acurl}
+ cfg.LCUrls = []url.URL{*acurl}
+
+ apurl, err := url.Parse(fmt.Sprintf("http://localhost:%d", peerPort))
+ if err != nil {
+ log.Fatalf("Invalid peer port -%d", peerPort)
+ }
+ cfg.LPUrls = []url.URL{*apurl}
+ cfg.APUrls = []url.URL{*apurl}
+
+ cfg.ClusterState = embed.ClusterStateFlagNew
+ cfg.InitialCluster = cfg.Name + "=" + apurl.String()
+
+ return cfg
+}
+
//getDefaultCfg specifies the default config
func getDefaultCfg() *embed.Config {
cfg := embed.NewConfig()
- cfg.Dir = localPersistentStorage
+ cfg.Dir = defaultLocalPersistentStorage
cfg.Logger = "zap"
cfg.LogLevel = "error"
return cfg
@@ -50,8 +99,8 @@
}
// Remove the local directory as
// a safeguard for the case where a prior test failed
- if err := os.RemoveAll(localPersistentStorage); err != nil {
- log.Fatalf("Failure removing local directory %s", localPersistentStorage)
+ if err := os.RemoveAll(cfg.Dir); err != nil {
+ log.Fatalf("Failure removing local directory %s", cfg.Dir)
}
e, err := embed.StartEtcd(cfg)
if err != nil {
@@ -75,9 +124,10 @@
//Stop closes the embedded Etcd server and removes the local data directory as well
func (es *EtcdServer) Stop() {
if es != nil {
+ storage := es.server.Config().Dir
es.server.Server.HardStop()
es.server.Close()
- if err := os.RemoveAll(localPersistentStorage); err != nil {
+ if err := os.RemoveAll(storage); err != nil {
log.Fatalf("Failure removing local directory %s", es.server.Config().Dir)
}
}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 0dc27a7..594cd27 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -100,7 +100,7 @@
github.com/modern-go/concurrent
# github.com/modern-go/reflect2 v1.0.1
github.com/modern-go/reflect2
-# github.com/opencord/voltha-lib-go/v2 v2.2.16
+# github.com/opencord/voltha-lib-go/v2 v2.2.17
github.com/opencord/voltha-lib-go/v2/pkg/log
github.com/opencord/voltha-lib-go/v2/pkg/db
github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore
@@ -245,10 +245,10 @@
go.uber.org/zap/internal/exit
# golang.org/x/crypto v0.0.0-20191001170739-f9e2070545dc
golang.org/x/crypto/bcrypt
-golang.org/x/crypto/md4
golang.org/x/crypto/blowfish
-golang.org/x/crypto/pbkdf2
+golang.org/x/crypto/md4
golang.org/x/crypto/ssh/terminal
+golang.org/x/crypto/pbkdf2
# golang.org/x/net v0.0.0-20190930134127-c5a3c61f89f3
golang.org/x/net/trace
golang.org/x/net/internal/timeseries