Update the RW Core configuration to accept similar configuration as the
Twisted Python Voltha Core.

Change-Id: Ic9b497dd2b2160d76c941f5115e8e6d0271916e9
diff --git a/common/log/log.go b/common/log/log.go
index 4bb31c0..e9f3c1b 100644
--- a/common/log/log.go
+++ b/common/log/log.go
@@ -2,11 +2,11 @@
 
 import (
 	"errors"
+	"fmt"
 	zp "go.uber.org/zap"
 	zc "go.uber.org/zap/zapcore"
 	"runtime"
 	"strings"
-	"fmt"
 )
 
 const (
@@ -26,6 +26,7 @@
 
 // CONSOLE formats the log for the console, mostly used during development
 const CONSOLE = "console"
+
 // JSON formats the log using json format, mostly used by an automated logging system consumption
 const JSON = "json"
 
diff --git a/compose/rw_core.yml b/compose/rw_core.yml
index 0fde367..bd8ea9f 100644
--- a/compose/rw_core.yml
+++ b/compose/rw_core.yml
@@ -4,9 +4,10 @@
     image: voltha-rw-core
     entrypoint:
         - /app/rw_core
-        - -kv-store-type=etcd
-        - -kv-store-host=${DOCKER_HOST_IP}
-        - -kv-store-port=2379
+        - -kv_store_type=etcd
+        - -kv_store_host=${DOCKER_HOST_IP}
+        - -kv_store_port=2379
+        - -banner=true
     volumes:
     - "/var/run/docker.sock:/tmp/docker.sock"
     networks:
diff --git a/db/kvstore/consulclient.go b/db/kvstore/consulclient.go
index 453f282..911da58 100644
--- a/db/kvstore/consulclient.go
+++ b/db/kvstore/consulclient.go
@@ -1,12 +1,12 @@
 package kvstore
 
 import (
+	"bytes"
 	"context"
 	"errors"
-	"bytes"
+	log "github.com/opencord/voltha-go/common/log"
 	"sync"
 	"time"
-	log "github.com/opencord/voltha-go/common/log"
 	//log "ciena.com/coordinator/common"
 	consulapi "github.com/hashicorp/consul/api"
 )
@@ -17,7 +17,6 @@
 	cancel  context.CancelFunc
 }
 
-
 // ConsulClient represents the consul KV store client
 type ConsulClient struct {
 	session                *consulapi.Session
@@ -140,7 +139,7 @@
 		session := c.consul.Session()
 		_, err := session.Destroy(c.sessionID, nil)
 		if err != nil {
-			log.Errorw("error-cleaning-session", log.Fields{"session":c.sessionID, "error":err})
+			log.Errorw("error-cleaning-session", log.Fields{"session": c.sessionID, "error": err})
 		}
 	}
 	c.sessionID = ""
@@ -157,12 +156,12 @@
 	for {
 		id, meta, err := session.Create(entry, nil)
 		if err != nil {
-			log.Errorw("create-session-error", log.Fields{"error":err})
+			log.Errorw("create-session-error", log.Fields{"error": err})
 			if retries == 0 {
 				return nil, "", err
 			}
 		} else if meta.RequestTime == 0 {
-			log.Errorw("create-session-bad-meta-data", log.Fields{"meta-data":meta})
+			log.Errorw("create-session-bad-meta-data", log.Fields{"meta-data": meta})
 			if retries == 0 {
 				return nil, "", errors.New("bad-meta-data")
 			}
@@ -183,7 +182,6 @@
 	}
 }
 
-
 // Helper function to verify mostly whether the content of two interface types are the same.  Focus is []byte and
 // string types
 func isEqual(val1 interface{}, val2 interface{}) bool {
@@ -226,10 +224,10 @@
 
 	session, sessionID, err := c.createSession(ttl, -1)
 	if err != nil {
-		log.Errorw("no-session-created", log.Fields{"error":err})
+		log.Errorw("no-session-created", log.Fields{"error": err})
 		return "", errors.New("no-session-created")
 	}
-	log.Debugw("session-created", log.Fields{"session-id":sessionID})
+	log.Debugw("session-created", log.Fields{"session-id": sessionID})
 	c.sessionID = sessionID
 	c.session = session
 
@@ -238,11 +236,11 @@
 	kvp := consulapi.KVPair{Key: key, Value: val, Session: c.sessionID}
 	result, _, err := kv.Acquire(&kvp, nil)
 	if err != nil {
-		log.Errorw("error-acquiring-keys", log.Fields{"error":err})
+		log.Errorw("error-acquiring-keys", log.Fields{"error": err})
 		return nil, err
 	}
 
-	log.Debugw("key-acquired", log.Fields{"key":key, "status":result})
+	log.Debugw("key-acquired", log.Fields{"key": key, "status": result})
 
 	// Irrespective whether we were successful in acquiring the key, let's read it back and see if it's us.
 	m, err := c.Get(key, defaultKVGetTimeout)
@@ -250,7 +248,7 @@
 		return nil, err
 	}
 	if m != nil {
-		log.Debugw("response-received", log.Fields{"key":m.Key, "m.value":string(m.Value.([]byte)), "value":value})
+		log.Debugw("response-received", log.Fields{"key": m.Key, "m.value": string(m.Value.([]byte)), "value": value})
 		if m.Key == key && isEqual(m.Value, value) {
 			// My reservation is successful - register it.  For now, support is only for 1 reservation per key
 			// per session.
@@ -280,11 +278,11 @@
 		kvp = consulapi.KVPair{Key: key, Value: value.([]byte), Session: c.sessionID}
 		result, _, err = kv.Release(&kvp, nil)
 		if err != nil {
-			log.Errorw("cannot-release-reservation", log.Fields{"key":key, "error":err})
+			log.Errorw("cannot-release-reservation", log.Fields{"key": key, "error": err})
 			return err
 		}
 		if !result {
-			log.Errorw("cannot-release-reservation", log.Fields{"key":key})
+			log.Errorw("cannot-release-reservation", log.Fields{"key": key})
 		}
 		delete(c.keyReservations, key)
 	}
@@ -371,7 +369,7 @@
 	c.writeLock.Lock()
 	defer c.writeLock.Unlock()
 	if watchedChannelsContexts, ok = c.watchedChannelsContext[key]; !ok {
-		log.Errorw("key-has-no-watched-context-or-channel", log.Fields{"key":key})
+		log.Errorw("key-has-no-watched-context-or-channel", log.Fields{"key": key})
 		return
 	}
 	// Look for the channels
@@ -390,7 +388,7 @@
 	if pos >= 0 {
 		c.watchedChannelsContext[key] = append(c.watchedChannelsContext[key][:pos], c.watchedChannelsContext[key][pos+1:]...)
 	}
-	log.Debugw("watched-channel-exiting", log.Fields{"key":key, "channel":c.watchedChannelsContext[key]})
+	log.Debugw("watched-channel-exiting", log.Fields{"key": key, "channel": c.watchedChannelsContext[key]})
 }
 
 func (c *ConsulClient) isKVEqual(kv1 *consulapi.KVPair, kv2 *consulapi.KVPair) bool {
@@ -411,7 +409,7 @@
 }
 
 func (c *ConsulClient) listenForKeyChange(watchContext context.Context, key string, ch chan *Event) {
-	log.Debugw("start-watching-channel", log.Fields{"key":key, "channel":ch})
+	log.Debugw("start-watching-channel", log.Fields{"key": key, "channel": ch})
 
 	defer c.CloseWatch(key, ch)
 	duration := GetDuration(defaultKVGetTimeout)
@@ -441,10 +439,10 @@
 			return
 		default:
 			if err != nil {
-				log.Warnw("error-from-watch", log.Fields{"error":err})
+				log.Warnw("error-from-watch", log.Fields{"error": err})
 				ch <- NewEvent(CONNECTIONDOWN, key, []byte(""))
 			} else {
-				log.Debugw("index-state", log.Fields{"lastindex":lastIndex, "newindex":meta.LastIndex, "key":key})
+				log.Debugw("index-state", log.Fields{"lastindex": lastIndex, "newindex": meta.LastIndex, "key": key})
 			}
 		}
 		if err != nil {
@@ -454,13 +452,13 @@
 		} else if meta.LastIndex <= lastIndex {
 			log.Info("no-index-change-or-negative")
 		} else {
-			log.Debugw("update-received", log.Fields{"pair":pair})
+			log.Debugw("update-received", log.Fields{"pair": pair})
 			if pair == nil {
 				ch <- NewEvent(DELETE, key, []byte(""))
 			} else if !c.isKVEqual(pair, previousKVPair) {
 				// Push the change onto the channel if the data has changed
 				// For now just assume it's a PUT change
-				log.Debugw("pair-details", log.Fields{"session":pair.Session, "key":pair.Key, "value":pair.Value})
+				log.Debugw("pair-details", log.Fields{"session": pair.Session, "key": pair.Key, "value": pair.Value})
 				ch <- NewEvent(PUT, pair.Key, pair.Value)
 			}
 			previousKVPair = pair
@@ -481,6 +479,6 @@
 
 	// Clear the sessionID
 	if _, err := c.consul.Session().Destroy(c.sessionID, &writeOptions); err != nil {
-		log.Errorw("error-closing-client", log.Fields{"error":err})
+		log.Errorw("error-closing-client", log.Fields{"error": err})
 	}
 }
diff --git a/db/kvstore/etcdclient.go b/db/kvstore/etcdclient.go
index 2755cd1..becc86b 100644
--- a/db/kvstore/etcdclient.go
+++ b/db/kvstore/etcdclient.go
@@ -4,11 +4,11 @@
 	//log "../common"
 	"context"
 	"errors"
-	"sync"
-	log "github.com/opencord/voltha-go/common/log"
 	"fmt"
 	v3Client "github.com/coreos/etcd/clientv3"
 	v3rpcTypes "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
+	log "github.com/opencord/voltha-go/common/log"
+	"sync"
 )
 
 // EtcdClient represents the Etcd KV store client
@@ -98,13 +98,13 @@
 	if err != nil {
 		switch err {
 		case context.Canceled:
-			log.Warnw("context-cancelled", log.Fields{"error":err})
+			log.Warnw("context-cancelled", log.Fields{"error": err})
 		case context.DeadlineExceeded:
-			log.Warnw("context-deadline-exceeded", log.Fields{"error":err})
+			log.Warnw("context-deadline-exceeded", log.Fields{"error": err})
 		case v3rpcTypes.ErrEmptyKey:
-			log.Warnw("etcd-client-error", log.Fields{"error":err})
+			log.Warnw("etcd-client-error", log.Fields{"error": err})
 		default:
-			log.Warnw("bad-endpoints", log.Fields{"error":err})
+			log.Warnw("bad-endpoints", log.Fields{"error": err})
 		}
 		return err
 	}
@@ -142,7 +142,7 @@
 		return nil
 	}
 
-	log.Debugw("delete-keys", log.Fields{"all-keys-deleted":int64(len(gresp.Kvs)) == dresp.Deleted})
+	log.Debugw("delete-keys", log.Fields{"all-keys-deleted": int64(len(gresp.Kvs)) == dresp.Deleted})
 	if int64(len(gresp.Kvs)) == dresp.Deleted {
 		log.Debug("All-keys-deleted")
 	} else {
@@ -236,7 +236,7 @@
 	for key, leaseID := range c.keyReservations {
 		_, err := c.ectdAPI.Revoke(context.Background(), *leaseID)
 		if err != nil {
-			log.Errorw("cannot-release-reservation", log.Fields{"key":key, "error":err})
+			log.Errorw("cannot-release-reservation", log.Fields{"key": key, "error": err})
 			return err
 		}
 		delete(c.keyReservations, key)
@@ -280,7 +280,7 @@
 	if leaseID != nil {
 		_, err := c.ectdAPI.KeepAliveOnce(context.Background(), *leaseID)
 		if err != nil {
-			log.Errorw("lease-may-have-expired", log.Fields{"error":err})
+			log.Errorw("lease-may-have-expired", log.Fields{"error": err})
 			return err
 		}
 	} else {
@@ -305,7 +305,7 @@
 	//defer c.writeLock.Unlock()
 	c.watchedChannels[key] = append(c.watchedChannels[key], channelMap)
 
-	log.Debugw("watched-channels", log.Fields{"channels":c.watchedChannels[key]})
+	log.Debugw("watched-channels", log.Fields{"channels": c.watchedChannels[key]})
 	// Launch a go routine to listen for updates
 	go c.listenForKeyChange(channel, ch)
 
@@ -323,7 +323,7 @@
 	defer c.writeLock.Unlock()
 
 	if watchedChannels, ok = c.watchedChannels[key]; !ok {
-		log.Warnw("key-has-no-watched-channels", log.Fields{"key":key})
+		log.Warnw("key-has-no-watched-channels", log.Fields{"key": key})
 		return
 	}
 	// Look for the channels
@@ -333,7 +333,7 @@
 			log.Debug("channel-found")
 			// Close the etcd watcher before the client channel.  This should close the etcd channel as well
 			if err := t.Close(); err != nil {
-				log.Errorw("watcher-cannot-be-closed", log.Fields{"key":key, "error":err})
+				log.Errorw("watcher-cannot-be-closed", log.Fields{"key": key, "error": err})
 			}
 			close(ch)
 			pos = i
@@ -344,11 +344,11 @@
 	if pos >= 0 {
 		c.watchedChannels[key] = append(c.watchedChannels[key][:pos], c.watchedChannels[key][pos+1:]...)
 	}
-	log.Infow("watcher-channel-exiting", log.Fields{"key":key, "channel":c.watchedChannels[key]})
+	log.Infow("watcher-channel-exiting", log.Fields{"key": key, "channel": c.watchedChannels[key]})
 }
 
 func (c *EtcdClient) listenForKeyChange(channel v3Client.WatchChan, ch chan<- *Event) {
-	log.Infow("start-listening-on-channel", log.Fields{"channel":ch})
+	log.Infow("start-listening-on-channel", log.Fields{"channel": ch})
 	for resp := range channel {
 		for _, ev := range resp.Events {
 			//log.Debugf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
@@ -373,6 +373,6 @@
 	c.writeLock.Lock()
 	defer c.writeLock.Unlock()
 	if err := c.ectdAPI.Close(); err != nil {
-		log.Errorw("error-closing-client", log.Fields{"error":err})
+		log.Errorw("error-closing-client", log.Fields{"error": err})
 	}
 }
diff --git a/db/kvstore/kvutils_test.go b/db/kvstore/kvutils_test.go
index f5e82d2..2f3818d 100644
--- a/db/kvstore/kvutils_test.go
+++ b/db/kvstore/kvutils_test.go
@@ -1,9 +1,9 @@
 package kvstore
 
 import (
-	"time"
-	"testing"
 	"github.com/stretchr/testify/assert"
+	"testing"
+	"time"
 )
 
 func TestDurationWithNegativeTimeout(t *testing.T) {
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index ec30eac..a223318 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -7,80 +7,140 @@
 	//dt "github.com/docker/docker/api/types"
 	//dc "github.com/docker/docker/client"
 	"os"
-	"time"
 )
 
-// Constants used to differentiate between the KV stores
+// RW Core service default constants
 const (
-	ConsulStoreName string = "consul"
-	EtcdStoreName   string = "etcd"
+	ConsulStoreName          = "consul"
+	EtcdStoreName            = "etcd"
+	default_InstanceID       = "rwcore001"
+	default_GrpcPort         = 50057
+	default_GrpcHost         = "10.100.198.240"
+	default_KafkaAdapterHost = "10.100.198.240"
+	default_KafkaAdapterPort = 9092
+	default_KafkaClusterHost = "10.100.198.240"
+	default_KafkaClusterPort = 9094
+	default_KVStoreType      = ConsulStoreName
+	default_KVStoreTimeout   = 5 //in seconds
+	default_KVStoreHost      = "10.100.198.240"
+	default_KVStorePort      = 8500 // Etcd = 2379
+	default_LogLevel         = 1
+	default_Banner           = false
+	default_CoreTopic        = "rwcore"
+	default_RWCoreEndpoint   = "rwcore"
+	default_RWCoreKey        = "pki/voltha.key"
+	default_RWCoreCert       = "pki/voltha.crt"
+	default_RWCoreCA         = "pki/voltha-CA.pem"
 )
 
-// CoordinatorFlags represents the set of configurations used by the coordinator
+// RWCoreFlags represents the set of configurations used by the read-write core service
 type RWCoreFlags struct {
 	// Command line parameters
-	InstanceID     string
-	KVStoreType    string
-	KVStoreTimeout int // in seconds
-	KVStoreHost    string
-	KVStorePort    int
-	LogLevel       string
+	InstanceID       string
+	RWCoreEndpoint   string
+	GrpcHost         string
+	GrpcPort         int
+	KafkaAdapterHost string
+	KafkaAdapterPort int
+	KafkaClusterHost string
+	KafkaClusterPort int
+	KVStoreType      string
+	KVStoreTimeout   int // in seconds
+	KVStoreHost      string
+	KVStorePort      int
+	CoreTopic        string
+	LogLevel         int
+	Banner           bool
+	RWCoreKey        string
+	RWCoreCert       string
+	RWCoreCA         string
 }
 
-// NewRWCoreFlags returns a new coordinator config
+// NewRWCoreFlags returns a new RWCore config
 func NewRWCoreFlags() *RWCoreFlags {
 	var rwCoreFlag = RWCoreFlags{ // Default values
-		InstanceID:     "rw_coreInstance001",
-		KVStoreType:    ConsulStoreName,
-		KVStoreTimeout: 5,
-		KVStoreHost:    "10.100.198.240",
-		//KVStorePort:                 2379,
-		KVStorePort: 8500,
-		LogLevel:    "info",
+		InstanceID:       default_InstanceID,
+		RWCoreEndpoint:   default_RWCoreEndpoint,
+		GrpcHost:         default_GrpcHost,
+		GrpcPort:         default_GrpcPort,
+		KafkaAdapterHost: default_KafkaAdapterHost,
+		KafkaAdapterPort: default_KafkaAdapterPort,
+		KafkaClusterHost: default_KafkaClusterHost,
+		KafkaClusterPort: default_KafkaClusterPort,
+		KVStoreType:      default_KVStoreType,
+		KVStoreTimeout:   default_KVStoreTimeout,
+		KVStoreHost:      default_KVStoreHost,
+		KVStorePort:      default_KVStorePort,
+		CoreTopic:        default_CoreTopic,
+		LogLevel:         default_LogLevel,
+		Banner:           default_Banner,
+		RWCoreKey:        default_RWCoreKey,
+		RWCoreCert:       default_RWCoreCert,
+		RWCoreCA:         default_RWCoreCA,
 	}
 	return &rwCoreFlag
 }
 
-// ParseCommandArguments parses the arguments when running coordinator
+// ParseCommandArguments parses the arguments when running read-write core service
 func (cf *RWCoreFlags) ParseCommandArguments() {
-	flag.IntVar(&(cf.KVStoreTimeout),
-		"kv-store-request-timeout",
-		cf.KVStoreTimeout,
-		"The default timeout when making a kv store request")
 
-	flag.StringVar(&(cf.KVStoreType),
-		"kv-store-type",
-		cf.KVStoreType,
-		"KV store type")
+	var help string
 
-	flag.StringVar(&(cf.KVStoreHost),
-		"kv-store-host",
-		cf.KVStoreHost,
-		"KV store host")
+	help = fmt.Sprintf("RW core endpoint address")
+	flag.StringVar(&(cf.RWCoreEndpoint), "vcore-endpoint", default_RWCoreEndpoint, help)
 
-	flag.IntVar(&(cf.KVStorePort),
-		"kv-store-port",
-		cf.KVStorePort,
-		"KV store port")
+	help = fmt.Sprintf("GRPC server - host")
+	flag.StringVar(&(cf.GrpcHost), "grpc_host", default_GrpcHost, help)
 
-	flag.StringVar(&(cf.LogLevel),
-		"log-level",
-		cf.LogLevel,
-		"Log level")
+	help = fmt.Sprintf("GRPC server - port")
+	flag.IntVar(&(cf.GrpcPort), "grpc_port", default_GrpcPort, help)
+
+	help = fmt.Sprintf("Kafka - Adapter messaging host")
+	flag.StringVar(&(cf.KafkaAdapterHost), "kafka_adapter_host", default_KafkaAdapterHost, help)
+
+	help = fmt.Sprintf("Kafka - Adapter messaging port")
+	flag.IntVar(&(cf.KafkaAdapterPort), "kafka_adapter_port", default_KafkaAdapterPort, help)
+
+	help = fmt.Sprintf("Kafka - Cluster messaging host")
+	flag.StringVar(&(cf.KafkaClusterHost), "kafka_cluster_host", default_KafkaClusterHost, help)
+
+	help = fmt.Sprintf("Kafka - Cluster messaging port")
+	flag.IntVar(&(cf.KafkaClusterPort), "kafka_cluster_port", default_KafkaClusterPort, help)
+
+	help = fmt.Sprintf("RW Core topic")
+	flag.StringVar(&(cf.CoreTopic), "rw_core_topic", default_CoreTopic, help)
+
+	help = fmt.Sprintf("KV store type")
+	flag.StringVar(&(cf.KVStoreType), "kv_store_type", default_KVStoreType, help)
+
+	help = fmt.Sprintf("The default timeout when making a kv store request")
+	flag.IntVar(&(cf.KVStoreTimeout), "kv_store_request_timeout", default_KVStoreTimeout, help)
+
+	help = fmt.Sprintf("KV store host")
+	flag.StringVar(&(cf.KVStoreHost), "kv_store_host", default_KVStoreHost, help)
+
+	help = fmt.Sprintf("KV store port")
+	flag.IntVar(&(cf.KVStorePort), "kv_store_port", default_KVStorePort, help)
+
+	help = fmt.Sprintf("Log level")
+	flag.IntVar(&(cf.LogLevel), "log_level", default_LogLevel, help)
+
+	help = fmt.Sprintf("Show startup banner log lines")
+	flag.BoolVar(&cf.Banner, "banner", default_Banner, help)
 
 	flag.Parse()
 
 	// Update the necessary keys with the prefixes
-	start := time.Now()
+	//start := time.Now()
 	containerName := getContainerInfo()
-	fmt.Println("container name:", containerName)
+	//fmt.Println("container name:", containerName)
 	if len(containerName) > 0 {
 		cf.InstanceID = containerName
 	}
 
-	fmt.Println("Inside config:", cf)
-	elapsed := time.Since(start)
-	fmt.Println("time:", elapsed/time.Second)
+	//fmt.Println("Inside config:", cf)
+	//elapsed := time.Since(start)
+	//fmt.Println("time:", elapsed/time.Second)
 }
 
 func getContainerInfo() string {
diff --git a/rw_core/main.go b/rw_core/main.go
index e766904..faa5851 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -1,28 +1,29 @@
 package main
 
 import (
+	"context"
+	"errors"
 	"fmt"
+	"github.com/opencord/voltha-go/common/log"
+	"github.com/opencord/voltha-go/db/kvstore"
+	"github.com/opencord/voltha-go/rw_core/config"
 	"os"
 	"os/signal"
-	"time"
-	"errors"
 	"strconv"
-	"github.com/opencord/voltha-go/db/kvstore"
-	"github.com/opencord/voltha-go/common/log"
-	"github.com/opencord/voltha-go/rw_core/config"
 	"syscall"
+	"time"
 )
 
 type rwCore struct {
-	kvClient     kvstore.Client
-	config       *config.RWCoreFlags
-	halted		bool
-	exitChannel  chan int
+	kvClient    kvstore.Client
+	config      *config.RWCoreFlags
+	halted      bool
+	exitChannel chan int
 }
 
 func newKVClient(storeType string, address string, timeout int) (kvstore.Client, error) {
 
-	log.Infow("kv-store-type", log.Fields{"store":storeType})
+	log.Infow("kv-store-type", log.Fields{"store": storeType})
 	switch storeType {
 	case "consul":
 		return kvstore.NewConsulClient(address, timeout)
@@ -51,7 +52,6 @@
 	return nil
 }
 
-
 func toString(value interface{}) (string, error) {
 	switch t := value.(type) {
 	case []byte:
@@ -63,41 +63,16 @@
 	}
 }
 
+func (core *rwCore) start(ctx context.Context) {
+	log.Info("Starting RW Core components")
+	// Setup GRPC Server
 
-func (core *rwCore) start() {
-	log.Info("core-starting")
+	// Setup KV Client
 
-	//First set the KV client.  Some client immediately tries to connect to the KV store (etcd) while others does
-	// not create the connection until a request to the store is made (consul)
-	tick := time.Tick(kvstore.GetDuration(core.config.KVStoreTimeout))
-	connected := false
-KVStoreConnectLoop:
-	for {
-		if err := core.setKVClient(); err != nil {
-			log.Warn("cannot-create-kv-client-retrying")
-			select {
-			case <-tick:
-				log.Debug("kv-client-retry")
-				continue
-			case <-core.exitChannel:
-				log.Info("exit-request-received")
-				break KVStoreConnectLoop
-			}
-		} else {
-			log.Debug("got-kv-client.")
-			connected = true
-			break
-		}
-	}
-	// Connected is true only if there is a valid KV store connection and no exit request has been received
-	if connected {
-		log.Info("core-started")
-	} else {
-		log.Info("core-ended")
-	}
+	// Setup Kafka messaging services
+
 }
 
-
 func (core *rwCore) stop() {
 	// Stop leadership tracking
 	core.halted = true
@@ -109,7 +84,7 @@
 	if core.kvClient != nil {
 		// Release all reservations
 		if err := core.kvClient.ReleaseAllReservations(); err != nil {
-			log.Infow("fail-to-release-all-reservations", log.Fields{"error":err})
+			log.Infow("fail-to-release-all-reservations", log.Fields{"error": err})
 		}
 		// Close the DB connection
 		core.kvClient.Close()
@@ -133,10 +108,10 @@
 			syscall.SIGINT,
 			syscall.SIGTERM,
 			syscall.SIGQUIT:
-			log.Infow("closing-signal-received", log.Fields{"signal":s})
+			log.Infow("closing-signal-received", log.Fields{"signal": s})
 			exitChannel <- 0
 		default:
-			log.Infow("unexpected-signal-received", log.Fields{"signal":s})
+			log.Infow("unexpected-signal-received", log.Fields{"signal": s})
 			exitChannel <- 1
 		}
 	}()
@@ -145,6 +120,16 @@
 	return code
 }
 
+func printBanner() {
+	fmt.Println("                                            ")
+	fmt.Println(" ______        ______                       ")
+	fmt.Println("|  _ \\ \\      / / ___|___  _ __ ___       ")
+	fmt.Println("| |_) \\ \\ /\\ / / |   / _ \\| '__/ _ \\   ")
+	fmt.Println("|  _ < \\ V  V /| |__| (_) | | |  __/       ")
+	fmt.Println("|_| \\_\\ \\_/\\_/  \\____\\___/|_|  \\___| ")
+	fmt.Println("                                            ")
+}
+
 func main() {
 	start := time.Now()
 
@@ -152,22 +137,30 @@
 	cf.ParseCommandArguments()
 
 	// Setup logging
-	if _, err := log.SetLogger(log.JSON, log.DebugLevel, log.Fields{"instanceId":cf.InstanceID}); err != nil {
+	if _, err := log.SetLogger(log.JSON, cf.LogLevel, log.Fields{"instanceId": cf.InstanceID}); err != nil {
 		log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
 	}
 	defer log.CleanUp()
 
-	log.Infow("rw-core-config", log.Fields{"config":*cf})
+	// Print banner if specified
+	if cf.Banner {
+		printBanner()
+	}
+
+	log.Infow("rw-core-config", log.Fields{"config": *cf})
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
 
 	core := newRWCore(cf)
-	go core.start()
+	go core.start(ctx)
 
 	code := waitForExit()
-	log.Infow("received-a-closing-signal", log.Fields{"code":code})
+	log.Infow("received-a-closing-signal", log.Fields{"code": code})
 
 	// Cleanup before leaving
 	core.stop()
 
 	elapsed := time.Since(start)
-	log.Infow("rw-core-run-time", log.Fields{"core":core.config.InstanceID, "time":elapsed/time.Second})
+	log.Infow("rw-core-run-time", log.Fields{"core": core.config.InstanceID, "time": elapsed / time.Second})
 }