VOL-2970 - Improved readability & traceability of startup code.

Changed the Start() function to implement the majority of the startup functionality, with fewer helpers.  Start() also defines local variables for each component created, to avoid accidentally using a component that isn't ready.
Also merged the rwCore into the Core.
Also changed Core to cancel a local context on shutdown, and then wait for shutdown to complete.

Change-Id: I285e8486773476531e20ec352ff85a1b145432bf
diff --git a/rw_core/main.go b/rw_core/main.go
index 4d99fbb..6884993 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -18,141 +18,20 @@
 
 import (
 	"context"
-	"errors"
 	"fmt"
 	"os"
 	"os/signal"
-	"strconv"
 	"syscall"
 	"time"
 
 	"github.com/opencord/voltha-go/rw_core/config"
 	c "github.com/opencord/voltha-go/rw_core/core"
 	"github.com/opencord/voltha-go/rw_core/utils"
-	conf "github.com/opencord/voltha-lib-go/v3/pkg/config"
-	"github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
-	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	"github.com/opencord/voltha-lib-go/v3/pkg/probe"
 	"github.com/opencord/voltha-lib-go/v3/pkg/version"
-	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
 )
 
-type rwCore struct {
-	kvClient    kvstore.Client
-	config      *config.RWCoreFlags
-	halted      bool
-	exitChannel chan int
-	//kmp         *kafka.KafkaMessagingProxy
-	kafkaClient kafka.Client
-	core        *c.Core
-	//For test
-	receiverChannels []<-chan *ic.InterContainerMessage
-}
-
-func newKVClient(storeType string, address string, timeout int) (kvstore.Client, error) {
-
-	logger.Infow("kv-store-type", log.Fields{"store": storeType})
-	switch storeType {
-	case "consul":
-		return kvstore.NewConsulClient(address, timeout)
-	case "etcd":
-		return kvstore.NewEtcdClient(address, timeout, log.FatalLevel)
-	}
-	return nil, errors.New("unsupported-kv-store")
-}
-
-func newKafkaClient(clientType string, host string, port int, instanceID string, livenessChannelInterval time.Duration) (kafka.Client, error) {
-
-	logger.Infow("kafka-client-type", log.Fields{"client": clientType})
-	switch clientType {
-	case "sarama":
-		return kafka.NewSaramaClient(
-			kafka.Host(host),
-			kafka.Port(port),
-			kafka.ConsumerType(kafka.GroupCustomer),
-			kafka.ProducerReturnOnErrors(true),
-			kafka.ProducerReturnOnSuccess(true),
-			kafka.ProducerMaxRetries(6),
-			kafka.NumPartitions(3),
-			kafka.ConsumerGroupName(instanceID),
-			kafka.ConsumerGroupPrefix(instanceID),
-			kafka.AutoCreateTopic(true),
-			kafka.ProducerFlushFrequency(5),
-			kafka.ProducerRetryBackoff(time.Millisecond*30),
-			kafka.LivenessChannelInterval(livenessChannelInterval),
-		), nil
-	}
-	return nil, errors.New("unsupported-client-type")
-}
-
-func newRWCore(cf *config.RWCoreFlags) *rwCore {
-	var rwCore rwCore
-	rwCore.config = cf
-	rwCore.halted = false
-	rwCore.exitChannel = make(chan int, 1)
-	rwCore.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0)
-	return &rwCore
-}
-
-func (rw *rwCore) start(ctx context.Context, instanceID string) {
-	logger.Info("Starting RW Core components")
-
-	// Setup KV Client
-	logger.Debugw("create-kv-client", log.Fields{"kvstore": rw.config.KVStoreType})
-	var err error
-	if rw.kvClient, err = newKVClient(
-		rw.config.KVStoreType,
-		rw.config.KVStoreHost+":"+strconv.Itoa(rw.config.KVStorePort),
-		rw.config.KVStoreTimeout); err != nil {
-		logger.Fatal(err)
-	}
-	cm := conf.NewConfigManager(rw.kvClient, rw.config.KVStoreType, rw.config.KVStoreHost, rw.config.KVStorePort, rw.config.KVStoreTimeout)
-	go conf.StartLogLevelConfigProcessing(cm, ctx)
-
-	// Setup Kafka Client
-	if rw.kafkaClient, err = newKafkaClient("sarama",
-		rw.config.KafkaAdapterHost,
-		rw.config.KafkaAdapterPort,
-		instanceID,
-		rw.config.LiveProbeInterval/2); err != nil {
-		logger.Fatal("Unsupported-kafka-client")
-	}
-
-	// Create the core service
-	rw.core = c.NewCore(ctx, instanceID, rw.config, rw.kvClient, rw.kafkaClient)
-
-	// start the core
-	err = rw.core.Start(ctx)
-	if err != nil {
-		logger.Fatalf("failed-to-start-rwcore", log.Fields{"error": err})
-	}
-}
-
-func (rw *rwCore) stop(ctx context.Context) {
-	// Stop leadership tracking
-	rw.halted = true
-
-	// send exit signal
-	rw.exitChannel <- 0
-
-	// Cleanup - applies only if we had a kvClient
-	if rw.kvClient != nil {
-		// Release all reservations
-		if err := rw.kvClient.ReleaseAllReservations(ctx); err != nil {
-			logger.Infow("fail-to-release-all-reservations", log.Fields{"error": err})
-		}
-		// Close the DB connection
-		rw.kvClient.Close()
-	}
-
-	rw.core.Stop(ctx)
-
-	//if rw.kafkaClient != nil {
-	//	rw.kafkaClient.Stop()
-	//}
-}
-
 func waitForExit() int {
 	signalChannel := make(chan os.Signal, 1)
 	signal.Notify(signalChannel,
@@ -161,35 +40,28 @@
 		syscall.SIGTERM,
 		syscall.SIGQUIT)
 
-	exitChannel := make(chan int)
-
-	go func() {
-		s := <-signalChannel
-		switch s {
-		case syscall.SIGHUP,
-			syscall.SIGINT,
-			syscall.SIGTERM,
-			syscall.SIGQUIT:
-			logger.Infow("closing-signal-received", log.Fields{"signal": s})
-			exitChannel <- 0
-		default:
-			logger.Infow("unexpected-signal-received", log.Fields{"signal": s})
-			exitChannel <- 1
-		}
-	}()
-
-	code := <-exitChannel
-	return code
+	s := <-signalChannel
+	switch s {
+	case syscall.SIGHUP,
+		syscall.SIGINT,
+		syscall.SIGTERM,
+		syscall.SIGQUIT:
+		logger.Infow("closing-signal-received", log.Fields{"signal": s})
+		return 0
+	default:
+		logger.Infow("unexpected-signal-received", log.Fields{"signal": s})
+		return 1
+	}
 }
 
 func printBanner() {
-	fmt.Println("                                            ")
-	fmt.Println(" ______        ______                       ")
-	fmt.Println("|  _ \\ \\      / / ___|___  _ __ ___       ")
-	fmt.Println("| |_) \\ \\ /\\ / / |   / _ \\| '__/ _ \\   ")
-	fmt.Println("|  _ < \\ V  V /| |__| (_) | | |  __/       ")
-	fmt.Println("|_| \\_\\ \\_/\\_/  \\____\\___/|_|  \\___| ")
-	fmt.Println("                                            ")
+	fmt.Println(`                                    `)
+	fmt.Println(` ______        ______               `)
+	fmt.Println(`|  _ \ \      / / ___|___  _ __ ___ `)
+	fmt.Println(`| |_) \ \ /\ / / |   / _ \| '__/ _ \`)
+	fmt.Println(`|  _ < \ V  V /| |__| (_) | | |  __/`)
+	fmt.Println(`|_| \_\ \_/\_/  \____\___/|_|  \___|`)
+	fmt.Println(`                                    `)
 }
 
 func printVersion() {
@@ -254,9 +126,6 @@
 
 	logger.Infow("rw-core-config", log.Fields{"config": *cf})
 
-	// Create the core
-	rw := newRWCore(cf)
-
 	// Create a context adding the status update channel
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
@@ -267,19 +136,19 @@
 	 * objects there can be a single probe end point for the process.
 	 */
 	p := &probe.Probe{}
-	go p.ListenAndServe(fmt.Sprintf("%s:%d", rw.config.ProbeHost, rw.config.ProbePort))
+	go p.ListenAndServe(fmt.Sprintf("%s:%d", cf.ProbeHost, cf.ProbePort))
 
 	// Add the probe to the context to pass to all the services started
 	probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)
 
-	// Start the core
-	go rw.start(probeCtx, instanceID)
+	// create and start the core
+	core := c.NewCore(probeCtx, instanceID, cf)
 
 	code := waitForExit()
 	logger.Infow("received-a-closing-signal", log.Fields{"code": code})
 
 	// Cleanup before leaving
-	rw.stop(probeCtx)
+	core.Stop()
 
 	elapsed := time.Since(start)
 	logger.Infow("rw-core-run-time", log.Fields{"core": instanceID, "time": elapsed / time.Second})