VOL-2970 - Improved readability & traceability of startup code.

Changed Start() function to implement majority of the startup functionality, with less helpers.  Start() also defines local variables for each component created, to avoid accidentally using a component that isn't ready.
Also merged the rwCore into the Core.
Also changed Core to cancel a local context to on shutdown, and then wait for shutdown to complete.

Change-Id: I285e8486773476531e20ec352ff85a1b145432bf
diff --git a/rw_core/core/kafka.go b/rw_core/core/kafka.go
new file mode 100644
index 0000000..fcdf340
--- /dev/null
+++ b/rw_core/core/kafka.go
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package core
+
+import (
+	"context"
+	"time"
+
+	"github.com/opencord/voltha-go/rw_core/core/adapter"
+	"github.com/opencord/voltha-go/rw_core/core/api"
+	"github.com/opencord/voltha-go/rw_core/core/device"
+	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
+	"github.com/opencord/voltha-lib-go/v3/pkg/probe"
+)
+
+// startKafkInterContainerProxy is responsible for starting the Kafka Interadapter Proxy
+func startKafkInterContainerProxy(ctx context.Context, kafkaClient kafka.Client, host string, port int, coreTopic, affinityRouterTopic string, connectionRetryInterval time.Duration) (kafka.InterContainerProxy, error) {
+	logger.Infow("initialize-kafka-manager", log.Fields{"host": host, "port": port, "topic": coreTopic})
+
+	probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPreparing)
+
+	// create the kafka RPC proxy
+	kmp := kafka.NewInterContainerProxy(
+		kafka.InterContainerHost(host),
+		kafka.InterContainerPort(port),
+		kafka.MsgClient(kafkaClient),
+		kafka.DefaultTopic(&kafka.Topic{Name: coreTopic}),
+		kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: affinityRouterTopic}))
+
+	probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPrepared)
+
+	// wait for connectivity
+	logger.Infow("starting-kafka-manager", log.Fields{"host": host,
+		"port": port, "topic": coreTopic})
+
+	for {
+		// If we haven't started yet, then try to start
+		logger.Infow("starting-kafka-proxy", log.Fields{})
+		if err := kmp.Start(); err != nil {
+			// We failed to start. Delay and then try again later.
+			// Don't worry about liveness, as we can't be live until we've started.
+			probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
+			logger.Infow("error-starting-kafka-messaging-proxy", log.Fields{"error": err})
+			select {
+			case <-time.After(connectionRetryInterval):
+			case <-ctx.Done():
+				return nil, ctx.Err()
+			}
+			continue
+		}
+		// We started. We only need to do this once.
+		// Next we'll fall through and start checking liveness.
+		logger.Infow("started-kafka-proxy", log.Fields{})
+		break
+	}
+	return kmp, nil
+}
+
+/*
+ * monitorKafkaLiveness is responsible for monitoring the Kafka Interadapter Proxy connectivity state
+ *
+ * Any producer that fails to send will cause KafkaInterContainerProxy to
+ * post a false event on its liveness channel. Any producer that succeeds in sending
+ * will cause KafkaInterContainerProxy to post a true event on its liveness
+ * channel. Group receivers also update liveness state, and a receiver will typically
+ * indicate a loss of liveness within 3-5 seconds of Kafka going down. Receivers
+ * only indicate restoration of liveness if a message is received. During normal
+ * operation, messages will be routinely produced and received, automatically
+ * indicating liveness state. These routine liveness indications are rate-limited
+ * inside sarama_client.
+ *
+ * This thread monitors the status of KafkaInterContainerProxy's liveness and pushes
+ * that state to the core's readiness probes. If no liveness event has been seen
+ * within a timeout, then the thread will make an attempt to produce a "liveness"
+ * message, which will in turn trigger a liveness event on the liveness channel, true
+ * or false depending on whether the attempt succeeded.
+ *
+ * The gRPC server in turn monitors the state of the readiness probe and will
+ * start issuing UNAVAILABLE response while the probe is not ready.
+ *
+ * startupRetryInterval -- interval between attempts to start
+ * liveProbeInterval -- interval between liveness checks when in a live state
+ * notLiveProbeInterval -- interval between liveness checks when in a notLive state
+ *
+ * liveProbeInterval and notLiveProbeInterval can be configured separately,
+ * though the current default is that both are set to 60 seconds.
+ */
+func monitorKafkaLiveness(ctx context.Context, kmp kafka.InterContainerProxy, liveProbeInterval time.Duration, notLiveProbeInterval time.Duration) {
+	logger.Info("started-kafka-message-proxy")
+
+	livenessChannel := kmp.EnableLivenessChannel(true)
+
+	logger.Info("enabled-kafka-liveness-channel")
+
+	timeout := liveProbeInterval
+	for {
+		timeoutTimer := time.NewTimer(timeout)
+		select {
+		case liveness := <-livenessChannel:
+			logger.Infow("kafka-manager-thread-liveness-event", log.Fields{"liveness": liveness})
+			// there was a state change in Kafka liveness
+			if !liveness {
+				probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
+				logger.Info("kafka-manager-thread-set-server-notready")
+
+				// retry frequently while life is bad
+				timeout = notLiveProbeInterval
+			} else {
+				probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
+				logger.Info("kafka-manager-thread-set-server-ready")
+
+				// retry infrequently while life is good
+				timeout = liveProbeInterval
+			}
+			if !timeoutTimer.Stop() {
+				<-timeoutTimer.C
+			}
+		case <-timeoutTimer.C:
+			logger.Info("kafka-proxy-liveness-recheck")
+			// send the liveness probe in a goroutine; we don't want to deadlock ourselves as
+			// the liveness probe may wait (and block) writing to our channel.
+			go func() {
+				err := kmp.SendLiveness()
+				if err != nil {
+					// Catch possible error case if sending liveness after Sarama has been stopped.
+					logger.Warnw("error-kafka-send-liveness", log.Fields{"error": err})
+				}
+			}()
+		case <-ctx.Done():
+			return // just exit
+		}
+	}
+}
+
+func registerAdapterRequestHandlers(kmp kafka.InterContainerProxy, dMgr *device.Manager, aMgr *adapter.Manager, coreTopic, corePairTopic string) {
+	requestProxy := api.NewAdapterRequestHandlerProxy(dMgr, aMgr)
+
+	// Register the broadcast topic to handle any core-bound broadcast requests
+	if err := kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: coreTopic}, requestProxy); err != nil {
+		logger.Fatalw("Failed-registering-broadcast-handler", log.Fields{"topic": coreTopic})
+	}
+
+	// Register the core-pair topic to handle core-bound requests destined to the core pair
+	if err := kmp.SubscribeWithDefaultRequestHandler(kafka.Topic{Name: corePairTopic}, kafka.OffsetNewest); err != nil {
+		logger.Fatalw("Failed-registering-pair-handler", log.Fields{"topic": corePairTopic})
+	}
+
+	logger.Info("request-handler-registered")
+}