VOL-2180 context changes in voltha-go

Passed context up as far as possible.
Where context reached the gRPC API, the context is passed through directly.
Where context reached the Kafka API, context.TODO() was used (as this NBI does not support context or request cancellation).
Anywhere a new goroutine is started and the creating goroutine makes no attempt to wait for it, context.Background() was used.
Anywhere a new goroutine is started and the creating goroutine waits for its completion, the ctx is passed through from the creating goroutine.
Cancellation of gRPC NBI requests should recursively cancel all the way through to the KV store.

Change-Id: I7a65b49ae4e8c1d5263c27d2627e0ffe4d1eb71b
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 228148c..23f4bb2 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -44,7 +44,7 @@
 	grpcNBIAPIHandler *APIHandler
 	adapterMgr        *AdapterManager
 	config            *config.RWCoreFlags
-	kmp               *kafka.InterContainerProxy
+	kmp               kafka.InterContainerProxy
 	clusterDataRoot   model.Root
 	localDataRoot     model.Root
 	clusterDataProxy  *model.Proxy
@@ -120,12 +120,12 @@
 	}
 	var err error
 
-	core.clusterDataProxy, err = core.clusterDataRoot.CreateProxy(context.Background(), "/", false)
+	core.clusterDataProxy, err = core.clusterDataRoot.CreateProxy(ctx, "/", false)
 	if err != nil {
 		probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
 		return fmt.Errorf("Failed to create cluster data proxy")
 	}
-	core.localDataProxy, err = core.localDataRoot.CreateProxy(context.Background(), "/", false)
+	core.localDataProxy, err = core.localDataRoot.CreateProxy(ctx, "/", false)
 	if err != nil {
 		probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
 		return fmt.Errorf("Failed to create local data proxy")
@@ -133,9 +133,7 @@
 
 	// core.kmp must be created before deviceMgr and adapterMgr, as they will make
 	// private copies of the poiner to core.kmp.
-	if err := core.initKafkaManager(ctx); err != nil {
-		log.Fatal("Failed-to-init-kafka-manager")
-	}
+	core.initKafkaManager(ctx)
 
 	log.Debugw("values", log.Fields{"kmp": core.kmp})
 	core.deviceMgr = newDeviceManager(core)
@@ -221,32 +219,26 @@
 	 */
 	probe.UpdateStatusFromContext(ctx, "grpc-service", probe.ServiceStatusRunning)
 	log.Info("grpc-server-started")
-	core.grpcServer.Start(context.Background())
+	core.grpcServer.Start(ctx)
 	probe.UpdateStatusFromContext(ctx, "grpc-service", probe.ServiceStatusStopped)
 }
 
 // Initialize the kafka manager, but we will start it later
-func (core *Core) initKafkaManager(ctx context.Context) error {
+func (core *Core) initKafkaManager(ctx context.Context) {
 	log.Infow("initialize-kafka-manager", log.Fields{"host": core.config.KafkaAdapterHost,
 		"port": core.config.KafkaAdapterPort, "topic": core.config.CoreTopic})
 
 	probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPreparing)
 
 	// create the proxy
-	var err error
-	if core.kmp, err = kafka.NewInterContainerProxy(
+	core.kmp = kafka.NewInterContainerProxy(
 		kafka.InterContainerHost(core.config.KafkaAdapterHost),
 		kafka.InterContainerPort(core.config.KafkaAdapterPort),
 		kafka.MsgClient(core.kafkaClient),
 		kafka.DefaultTopic(&kafka.Topic{Name: core.config.CoreTopic}),
-		kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: core.config.AffinityRouterTopic})); err != nil {
-		log.Errorw("fail-to-create-kafka-proxy", log.Fields{"error": err})
-		return err
-	}
+		kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: core.config.AffinityRouterTopic}))
 
 	probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPrepared)
-
-	return nil
 }
 
 /*
@@ -364,14 +356,9 @@
 func (core *Core) waitUntilKVStoreReachableOrMaxTries(ctx context.Context, maxRetries int, retryInterval time.Duration) error {
 	log.Infow("verifying-KV-store-connectivity", log.Fields{"host": core.config.KVStoreHost,
 		"port": core.config.KVStorePort, "retries": maxRetries, "retryInterval": retryInterval})
-	// Get timeout in seconds with 1 second set as minimum
-	timeout := int(core.config.DefaultCoreTimeout / 1000)
-	if timeout < 1 {
-		timeout = 1
-	}
 	count := 0
 	for {
-		if !core.kvClient.IsConnectionUp(timeout) {
+		if !core.kvClient.IsConnectionUp(ctx) {
 			log.Info("KV-store-unreachable")
 			if maxRetries != -1 {
 				if count >= maxRetries {
@@ -495,7 +482,7 @@
 			// The Liveness check will push Live state to same channel which this routine is
 			// reading and processing. This, do it asynchronously to avoid blocking for
 			// backend response and avoid any possibility of deadlock
-			go core.backend.PerformLivenessCheck(core.config.KVStoreTimeout)
+			go core.backend.PerformLivenessCheck(ctx)
 		}
 	}
 }