[VOL-1862] rwCore waits for kafka and KV Store

This commit is a cherry pick into the master branch from the
voltha 2.1 branch of patch https://gerrit.opencord.org/#/c/15030/

Change-Id: I8a306c8b37ad700ef8234466919e0604e14787cd
diff --git a/db/kvstore/client.go b/db/kvstore/client.go
index 937eefe..67c9219 100644
--- a/db/kvstore/client.go
+++ b/db/kvstore/client.go
@@ -77,7 +77,7 @@
 	return evnt
 }
 
-// Client represents the set of APIs  a KV Client must implement
+// Client represents the set of APIs a KV Client must implement
 type Client interface {
 	List(key string, timeout int, lock ...bool) (map[string]*KVPair, error)
 	Get(key string, timeout int, lock ...bool) (*KVPair, error)
@@ -90,6 +90,7 @@
 	Watch(key string) chan *Event
 	AcquireLock(lockName string, timeout int) error
 	ReleaseLock(lockName string) error
+	IsConnectionUp(timeout int) bool // timeout in seconds
 	CloseWatch(key string, ch chan *Event)
 	Close()
 }
diff --git a/db/kvstore/consulclient.go b/db/kvstore/consulclient.go
index 4b25b5f..c4fa0af 100644
--- a/db/kvstore/consulclient.go
+++ b/db/kvstore/consulclient.go
@@ -63,6 +63,12 @@
 	return &ConsulClient{consul: consul, doneCh: &doneCh, watchedChannelsContext: wChannelsContext, keyReservations: reservations}, nil
 }
 
+// IsConnectionUp is not implemented for the Consul KV store: it logs an error and always reports the connection as down.
+func (c *ConsulClient) IsConnectionUp(timeout int) bool {
+	log.Error("Unimplemented function")
+	return false
+}
+
 // List returns an array of key-value pairs with key as a prefix.  Timeout defines how long the function will
 // wait for a response
 func (c *ConsulClient) List(key string, timeout int, lock ...bool) (map[string]*KVPair, error) {
diff --git a/db/kvstore/etcdclient.go b/db/kvstore/etcdclient.go
index 7f6940a..f19f365 100644
--- a/db/kvstore/etcdclient.go
+++ b/db/kvstore/etcdclient.go
@@ -59,6 +59,16 @@
 		lockToSessionMap: lockSessionMap}, nil
 }
 
+// IsConnectionUp reports whether the Etcd KV store answers a read within the given timeout.
+// Any error from the probe, a timeout included, is reported as the connection being down.
+func (c *EtcdClient) IsConnectionUp(timeout int) bool {
+	// Probe with a key that is not expected to exist: only the error matters, not the value.
+	// A missing key is assumed not to be an error in itself -- TODO confirm against Get.
+	_, probeErr := c.Get("non-existent-key", timeout)
+	connected := probeErr == nil
+	return connected
+}
+
 // List returns an array of key-value pairs with key as a prefix.  Timeout defines how long the function will
 // wait for a response
 func (c *EtcdClient) List(key string, timeout int, lock ...bool) (map[string]*KVPair, error) {
diff --git a/db/model/node.go b/db/model/node.go
index fcd3b5f..c9815fa 100644
--- a/db/model/node.go
+++ b/db/model/node.go
@@ -457,7 +457,7 @@
 	n.mutex.Lock()
 	defer n.mutex.Unlock()
 
-	log.Debugw("node-update-request", log.Fields{"path": path, "strict": strict, "txid": txid, "makeBranch": makeBranch})
+	log.Debugw("node-update-request", log.Fields{"path": path, "strict": strict, "txid": txid})
 
 	for strings.HasPrefix(path, "/") {
 		path = path[1:]
@@ -626,7 +626,7 @@
 	n.mutex.Lock()
 	defer n.mutex.Unlock()
 
-	log.Debugw("node-add-request", log.Fields{"path": path, "txid": txid, "makeBranch": makeBranch})
+	log.Debugw("node-add-request", log.Fields{"path": path, "txid": txid})
 
 	for strings.HasPrefix(path, "/") {
 		path = path[1:]
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
index 0576da9..8037002 100755
--- a/kafka/sarama_client.go
+++ b/kafka/sarama_client.go
@@ -228,6 +228,13 @@
 
 	var err error
 
+	// Add a cleanup in case of failure to startup
+	defer func() {
+		if err != nil {
+			sc.Stop()
+		}
+	}()
+
 	// Create the Cluster Admin
 	if err = sc.createClusterAdmin(); err != nil {
 		log.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index 41d96f4..133b1a4 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -52,6 +52,8 @@
 	default_CoreTimeout               = int64(500)
 	default_CoreBindingKey            = "voltha_backend_name"
 	default_CorePairTopic             = "rwcore_1"
+	default_MaxConnectionRetries      = -1 // retries forever
+	default_ConnectionRetryInterval   = 2  // in seconds
 )
 
 // RWCoreFlags represents the set of configurations used by the read-write core service
@@ -84,6 +86,8 @@
 	DefaultCoreTimeout        int64
 	CoreBindingKey            string
 	CorePairTopic             string
+	MaxConnectionRetries      int
+	ConnectionRetryInterval   int
 }
 
 func init() {
@@ -120,6 +124,8 @@
 		DefaultCoreTimeout:        default_CoreTimeout,
 		CoreBindingKey:            default_CoreBindingKey,
 		CorePairTopic:             default_CorePairTopic,
+		MaxConnectionRetries:      default_MaxConnectionRetries,
+		ConnectionRetryInterval:   default_ConnectionRetryInterval,
 	}
 	return &rwCoreFlag
 }
@@ -201,5 +207,11 @@
 	help = fmt.Sprintf("Core pairing group topic")
 	flag.StringVar(&cf.CorePairTopic, "core_pair_topic", default_CorePairTopic, help)
 
+	help = fmt.Sprintf("The number of retries to connect to a dependent component")
+	flag.IntVar(&(cf.MaxConnectionRetries), "max_connection_retries", default_MaxConnectionRetries, help)
+
+	help = fmt.Sprintf("The number of seconds between each connection retry attempt ")
+	flag.IntVar(&(cf.ConnectionRetryInterval), "connection_retry_interval", default_ConnectionRetryInterval, help)
+
 	flag.Parse()
 }
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index fca8cce..4ddea05 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -25,6 +25,9 @@
 	"github.com/opencord/voltha-go/rw_core/config"
 	"github.com/opencord/voltha-protos/go/voltha"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+	"time"
 )
 
 type Core struct {
@@ -59,9 +62,6 @@
 	core.kafkaClient = kafkaClient
 
 	// Setup the KV store
-	// Do not call NewBackend constructor; it creates its own KV client
-	// Commented the backend for now until the issue between the model and the KV store
-	// is resolved.
 	backend := model.Backend{
 		Client:     kvClient,
 		StoreType:  cf.KVStoreType,
@@ -77,10 +77,17 @@
 }
 
 func (core *Core) Start(ctx context.Context) {
-	log.Info("starting-adaptercore", log.Fields{"coreId": core.instanceId})
-	if err := core.startKafkaMessagingProxy(ctx); err != nil {
+	log.Info("starting-core-services", log.Fields{"coreId": core.instanceId})
+
+	// Wait until connection to KV Store is up
+	if err := core.waitUntilKVStoreReachableOrMaxTries(ctx, core.config.MaxConnectionRetries, core.config.ConnectionRetryInterval); err != nil {
+		log.Fatal("Unable-to-connect-to-KV-store")
+	}
+
+	if err := core.waitUntilKafkaMessagingProxyIsUpOrMaxTries(ctx, core.config.MaxConnectionRetries, core.config.ConnectionRetryInterval); err != nil {
 		log.Fatal("Failure-starting-kafkaMessagingProxy")
 	}
+
 	log.Debugw("values", log.Fields{"kmp": core.kmp})
 	core.deviceMgr = newDeviceManager(core)
 	core.adapterMgr = newAdapterManager(core.clusterDataProxy, core.instanceId, core.deviceMgr)
@@ -90,6 +97,7 @@
 	if err := core.registerAdapterRequestHandlers(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.clusterDataProxy, core.localDataProxy); err != nil {
 		log.Fatal("Failure-registering-adapterRequestHandler")
 	}
+
 	go core.startDeviceManager(ctx)
 	go core.startLogicalDeviceManager(ctx)
 	go core.startGRPCService(ctx)
@@ -99,7 +107,7 @@
 	core.deviceOwnership = NewDeviceOwnership(core.instanceId, core.kvClient, core.deviceMgr, core.logicalDeviceMgr,
 		"service/voltha/owns_device", 10)
 
-	log.Info("adaptercore-started")
+	log.Info("core-services-started")
 }
 
 func (core *Core) Stop(ctx context.Context) {
@@ -138,7 +146,7 @@
 	log.Info("grpc-server-started")
 }
 
-func (core *Core) startKafkaMessagingProxy(ctx context.Context) error {
+func (core *Core) waitUntilKafkaMessagingProxyIsUpOrMaxTries(ctx context.Context, maxRetries int, retryInterval int) error {
 	log.Infow("starting-kafka-messaging-proxy", log.Fields{"host": core.config.KafkaAdapterHost,
 		"port": core.config.KafkaAdapterPort, "topic": core.config.CoreTopic})
 	var err error
@@ -151,16 +159,58 @@
 		log.Errorw("fail-to-create-kafka-proxy", log.Fields{"error": err})
 		return err
 	}
-
-	if err = core.kmp.Start(); err != nil {
-		log.Fatalw("error-starting-messaging-proxy", log.Fields{"error": err})
-		return err
+	count := 0
+	for {
+		if err = core.kmp.Start(); err != nil {
+			log.Infow("error-starting-kafka-messaging-proxy", log.Fields{"error": err})
+			if maxRetries != -1 {
+				if count >= maxRetries {
+					return err
+				}
+			}
+			count += 1
+			log.Infow("retry-starting-kafka-messaging-proxy", log.Fields{"retryCount": count, "maxRetries": maxRetries, "retryInterval": retryInterval})
+			//	Take a nap before retrying
+			time.Sleep(time.Duration(retryInterval) * time.Second)
+		} else {
+			break
+		}
 	}
-
 	log.Info("kafka-messaging-proxy-created")
 	return nil
 }
 
+// waitUntilKVStoreReachableOrMaxTries blocks until the KV store is reachable, the retry budget is spent,
+// or ctx is done.  A maxRetries of -1 retries forever; retryInterval is the pause in seconds between tries.
+// It returns nil once reachable, a codes.Unavailable error when retries run out, or ctx.Err() on cancellation.
+func (core *Core) waitUntilKVStoreReachableOrMaxTries(ctx context.Context, maxRetries int, retryInterval int) error {
+	log.Infow("verifying-KV-store-connectivity", log.Fields{"host": core.config.KVStoreHost,
+		"port": core.config.KVStorePort, "retries": maxRetries, "retryInterval": retryInterval})
+	// Convert DefaultCoreTimeout to whole seconds (divide by 1000), with 1 second as the minimum
+	timeout := int(core.config.DefaultCoreTimeout / 1000)
+	if timeout < 1 {
+		timeout = 1
+	}
+	count := 0
+	for !core.kvClient.IsConnectionUp(timeout) {
+		log.Info("KV-store-unreachable")
+		if maxRetries != -1 && count >= maxRetries {
+			return status.Error(codes.Unavailable, "kv store unreachable")
+		}
+		count++
+		// Take a nap before retrying, but stop waiting as soon as the caller cancels ctx;
+		// with maxRetries == -1 this is the only way out while the store stays unreachable.
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-time.After(time.Duration(retryInterval) * time.Second):
+		}
+		log.Infow("retry-KV-store-connectivity", log.Fields{"retryCount": count, "maxRetries": maxRetries, "retryInterval": retryInterval})
+	}
+	log.Info("KV-store-reachable")
+	return nil
+}
+
 func (core *Core) registerAdapterRequestHandlers(ctx context.Context, coreInstanceId string, dMgr *DeviceManager,
 	ldMgr *LogicalDeviceManager, aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy,
 ) error {