diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index 2b5017e..6510964 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -24,104 +24,113 @@
 
 // RW Core service default constants
 const (
-	EtcdStoreName                    = "etcd"
-	defaultGrpcAddress               = ":50057"
-	defaultKafkaAdapterAddress       = "127.0.0.1:9092"
-	defaultKafkaClusterAddress       = "127.0.0.1:9094"
-	defaultKVStoreType               = EtcdStoreName
-	defaultKVStoreTimeout            = 5 * time.Second
-	defaultKVStoreAddress            = "127.0.0.1:2379" // Etcd = 2379
-	defaultKVTxnKeyDelTime           = 60
-	defaultLogLevel                  = "WARN"
-	defaultBanner                    = false
-	defaultDisplayVersionOnly        = false
-	defaultCoreTopic                 = "rwcore"
-	defaultEventTopic                = "voltha.events"
-	defaultRWCoreEndpoint            = "rwcore"
-	defaultRWCoreKey                 = "pki/voltha.key"
-	defaultRWCoreCert                = "pki/voltha.crt"
-	defaultRWCoreCA                  = "pki/voltha-CA.pem"
-	defaultLongRunningRequestTimeout = 2000 * time.Millisecond
-	defaultDefaultRequestTimeout     = 1000 * time.Millisecond
-	defaultCoreTimeout               = 1000 * time.Millisecond
-	defaultCoreBindingKey            = "voltha_backend_name"
-	defaultMaxConnectionRetries      = -1 // retries forever
-	defaultConnectionRetryInterval   = 2 * time.Second
-	defaultLiveProbeInterval         = 60 * time.Second
-	defaultNotLiveProbeInterval      = 5 * time.Second // Probe more frequently when not alive
-	defaultProbeAddress              = ":8080"
-	defaultTraceEnabled              = false
-	defaultTraceAgentAddress         = "127.0.0.1:6831"
-	defaultLogCorrelationEnabled     = true
-	defaultVolthaStackID             = "voltha"
+	EtcdStoreName                      = "etcd"
+	defaultGrpcAddress                 = ":50057"
+	defaultKafkaAdapterAddress         = "127.0.0.1:9092"
+	defaultKafkaClusterAddress         = "127.0.0.1:9094"
+	defaultKVStoreType                 = EtcdStoreName
+	defaultKVStoreTimeout              = 5 * time.Second
+	defaultKVStoreAddress              = "127.0.0.1:2379" // Etcd = 2379
+	defaultKVTxnKeyDelTime             = 60
+	defaultLogLevel                    = "WARN"
+	defaultBanner                      = false
+	defaultDisplayVersionOnly          = false
+	defaultCoreTopic                   = "rwcore"
+	defaultEventTopic                  = "voltha.events"
+	defaultRWCoreEndpoint              = "rwcore"
+	defaultRWCoreKey                   = "pki/voltha.key"
+	defaultRWCoreCert                  = "pki/voltha.crt"
+	defaultRWCoreCA                    = "pki/voltha-CA.pem"
+	defaultLongRunningRequestTimeout   = 2000 * time.Millisecond
+	defaultDefaultRequestTimeout       = 1000 * time.Millisecond
+	defaultCoreTimeout                 = 1000 * time.Millisecond
+	defaultCoreBindingKey              = "voltha_backend_name"
+	defaultMaxConnectionRetries        = -1 // retries forever
+	defaultConnectionRetryInterval     = 2 * time.Second
+	defaultLiveProbeInterval           = 60 * time.Second
+	defaultNotLiveProbeInterval        = 5 * time.Second // Probe more frequently when not alive
+	defaultProbeAddress                = ":8080"
+	defaultTraceEnabled                = false
+	defaultTraceAgentAddress           = "127.0.0.1:6831"
+	defaultLogCorrelationEnabled       = true
+	defaultVolthaStackID               = "voltha"
+	defaultBackoffRetryInitialInterval = 500 * time.Millisecond
+	defaultBackoffRetryMaxElapsedTime  = 0 // 0 means no elapsed-time limit: the backoff never reports Stop
+	defaultBackoffRetryMaxInterval     = 1 * time.Minute
 )
 
 // RWCoreFlags represents the set of configurations used by the read-write core service
 type RWCoreFlags struct {
 	// Command line parameters
-	RWCoreEndpoint            string
-	GrpcAddress               string
-	KafkaAdapterAddress       string
-	KafkaClusterAddress       string
-	KVStoreType               string
-	KVStoreTimeout            time.Duration
-	KVStoreAddress            string
-	KVTxnKeyDelTime           int
-	CoreTopic                 string
-	EventTopic                string
-	LogLevel                  string
-	Banner                    bool
-	DisplayVersionOnly        bool
-	RWCoreKey                 string
-	RWCoreCert                string
-	RWCoreCA                  string
-	LongRunningRequestTimeout time.Duration
-	DefaultRequestTimeout     time.Duration
-	DefaultCoreTimeout        time.Duration
-	CoreBindingKey            string
-	MaxConnectionRetries      int
-	ConnectionRetryInterval   time.Duration
-	LiveProbeInterval         time.Duration
-	NotLiveProbeInterval      time.Duration
-	ProbeAddress              string
-	TraceEnabled              bool
-	TraceAgentAddress         string
-	LogCorrelationEnabled     bool
-	VolthaStackID             string
+	RWCoreEndpoint              string
+	GrpcAddress                 string
+	KafkaAdapterAddress         string
+	KafkaClusterAddress         string
+	KVStoreType                 string
+	KVStoreTimeout              time.Duration
+	KVStoreAddress              string
+	KVTxnKeyDelTime             int
+	CoreTopic                   string
+	EventTopic                  string
+	LogLevel                    string
+	Banner                      bool
+	DisplayVersionOnly          bool
+	RWCoreKey                   string
+	RWCoreCert                  string
+	RWCoreCA                    string
+	LongRunningRequestTimeout   time.Duration
+	DefaultRequestTimeout       time.Duration
+	DefaultCoreTimeout          time.Duration
+	CoreBindingKey              string
+	MaxConnectionRetries        int
+	ConnectionRetryInterval     time.Duration
+	LiveProbeInterval           time.Duration
+	NotLiveProbeInterval        time.Duration
+	ProbeAddress                string
+	TraceEnabled                bool
+	TraceAgentAddress           string
+	LogCorrelationEnabled       bool
+	VolthaStackID               string
+	BackoffRetryInitialInterval time.Duration // initial wait of the exponential backoff used to register the adapter request handler
+	BackoffRetryMaxElapsedTime  time.Duration // total time the backoff may run before it reports Stop; the default 0 never stops
+	BackoffRetryMaxInterval     time.Duration // upper bound for a single exponential backoff interval
 }
 
 // NewRWCoreFlags returns a new RWCore config
 func NewRWCoreFlags() *RWCoreFlags {
 	var rwCoreFlag = RWCoreFlags{ // Default values
-		RWCoreEndpoint:            defaultRWCoreEndpoint,
-		GrpcAddress:               defaultGrpcAddress,
-		KafkaAdapterAddress:       defaultKafkaAdapterAddress,
-		KafkaClusterAddress:       defaultKafkaClusterAddress,
-		KVStoreType:               defaultKVStoreType,
-		KVStoreTimeout:            defaultKVStoreTimeout,
-		KVStoreAddress:            defaultKVStoreAddress,
-		KVTxnKeyDelTime:           defaultKVTxnKeyDelTime,
-		CoreTopic:                 defaultCoreTopic,
-		EventTopic:                defaultEventTopic,
-		LogLevel:                  defaultLogLevel,
-		Banner:                    defaultBanner,
-		DisplayVersionOnly:        defaultDisplayVersionOnly,
-		RWCoreKey:                 defaultRWCoreKey,
-		RWCoreCert:                defaultRWCoreCert,
-		RWCoreCA:                  defaultRWCoreCA,
-		DefaultRequestTimeout:     defaultDefaultRequestTimeout,
-		LongRunningRequestTimeout: defaultLongRunningRequestTimeout,
-		DefaultCoreTimeout:        defaultCoreTimeout,
-		CoreBindingKey:            defaultCoreBindingKey,
-		MaxConnectionRetries:      defaultMaxConnectionRetries,
-		ConnectionRetryInterval:   defaultConnectionRetryInterval,
-		LiveProbeInterval:         defaultLiveProbeInterval,
-		NotLiveProbeInterval:      defaultNotLiveProbeInterval,
-		ProbeAddress:              defaultProbeAddress,
-		TraceEnabled:              defaultTraceEnabled,
-		TraceAgentAddress:         defaultTraceAgentAddress,
-		LogCorrelationEnabled:     defaultLogCorrelationEnabled,
-		VolthaStackID:             defaultVolthaStackID,
+		RWCoreEndpoint:              defaultRWCoreEndpoint,
+		GrpcAddress:                 defaultGrpcAddress,
+		KafkaAdapterAddress:         defaultKafkaAdapterAddress,
+		KafkaClusterAddress:         defaultKafkaClusterAddress,
+		KVStoreType:                 defaultKVStoreType,
+		KVStoreTimeout:              defaultKVStoreTimeout,
+		KVStoreAddress:              defaultKVStoreAddress,
+		KVTxnKeyDelTime:             defaultKVTxnKeyDelTime,
+		CoreTopic:                   defaultCoreTopic,
+		EventTopic:                  defaultEventTopic,
+		LogLevel:                    defaultLogLevel,
+		Banner:                      defaultBanner,
+		DisplayVersionOnly:          defaultDisplayVersionOnly,
+		RWCoreKey:                   defaultRWCoreKey,
+		RWCoreCert:                  defaultRWCoreCert,
+		RWCoreCA:                    defaultRWCoreCA,
+		DefaultRequestTimeout:       defaultDefaultRequestTimeout,
+		LongRunningRequestTimeout:   defaultLongRunningRequestTimeout,
+		DefaultCoreTimeout:          defaultCoreTimeout,
+		CoreBindingKey:              defaultCoreBindingKey,
+		MaxConnectionRetries:        defaultMaxConnectionRetries,
+		ConnectionRetryInterval:     defaultConnectionRetryInterval,
+		LiveProbeInterval:           defaultLiveProbeInterval,
+		NotLiveProbeInterval:        defaultNotLiveProbeInterval,
+		ProbeAddress:                defaultProbeAddress,
+		TraceEnabled:                defaultTraceEnabled,
+		TraceAgentAddress:           defaultTraceAgentAddress,
+		LogCorrelationEnabled:       defaultLogCorrelationEnabled,
+		VolthaStackID:               defaultVolthaStackID,
+		BackoffRetryInitialInterval: defaultBackoffRetryInitialInterval, // the BackoffRetry* values configure the exponential backoff for adapter request handler registration
+		BackoffRetryMaxElapsedTime:  defaultBackoffRetryMaxElapsedTime,
+		BackoffRetryMaxInterval:     defaultBackoffRetryMaxInterval,
 	}
 	return &rwCoreFlag
 }
@@ -209,5 +218,14 @@
 	help = fmt.Sprintf("ID for the current voltha stack")
 	flag.StringVar(&cf.VolthaStackID, "stack_id", defaultVolthaStackID, help)
 
+	help = fmt.Sprintf("The initial number of milliseconds an exponential backoff will wait before a retry")
+	flag.DurationVar(&(cf.BackoffRetryInitialInterval), "backoff_retry_initial_interval", defaultBackoffRetryInitialInterval, help)
+
+	help = fmt.Sprintf("The maximum number of milliseconds an exponential backoff can elasped")
+	flag.DurationVar(&(cf.BackoffRetryMaxElapsedTime), "backoff_retry_max_elapsed_time", defaultBackoffRetryMaxElapsedTime, help)
+
+	help = fmt.Sprintf("The maximum number of milliseconds of an exponential backoff interval")
+	flag.DurationVar(&(cf.BackoffRetryMaxInterval), "backoff_retry_max_interval", defaultBackoffRetryMaxInterval, help)
+
 	flag.Parse()
 }
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 459c2fa..4b864de 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -56,6 +56,7 @@
 			"kv-store",
 			"adapter-manager",
 			"grpc-service",
+			"adapter-request-handler",
 		)
 
 		if cf.KafkaAdapterAddress != cf.KafkaClusterAddress {
@@ -134,7 +135,8 @@
 	)
 
 	// create event proxy
-	eventProxy, err := startEventProxy(ctx, kafkaClientEvent, cf.EventTopic, cf.ConnectionRetryInterval)
+	updateProbeClusterService := cf.KafkaAdapterAddress != cf.KafkaClusterAddress
+	eventProxy, err := startEventProxy(ctx, kafkaClientEvent, cf.EventTopic, cf.ConnectionRetryInterval, updateProbeClusterService)
 	if err != nil {
 		logger.Warn(ctx, "failed-to-setup-kafka-event-proxy-connection")
 		return
@@ -168,7 +170,7 @@
 	deviceMgr, logicalDeviceMgr := device.NewManagers(dbPath, adapterMgr, kmp, endpointMgr, cf.CoreTopic, id, cf.DefaultCoreTimeout, eventProxy, cf.VolthaStackID)
 
 	// register kafka RPC handler
-	registerAdapterRequestHandlers(ctx, kmp, deviceMgr, adapterMgr, cf.CoreTopic)
+	registerAdapterRequestHandlers(ctx, kmp, deviceMgr, adapterMgr, cf, "adapter-request-handler")
 
 	// start gRPC handler
 	grpcServer := grpcserver.NewGrpcServer(cf.GrpcAddress, nil, false, probe.GetProbeFromContext(ctx))
diff --git a/rw_core/core/kafka.go b/rw_core/core/kafka.go
index d45a84e..ae33090 100644
--- a/rw_core/core/kafka.go
+++ b/rw_core/core/kafka.go
@@ -18,7 +18,10 @@
 
 import (
 	"context"
+	"github.com/cenkalti/backoff/v3"
+	"github.com/opencord/voltha-go/rw_core/config"
 	"github.com/opencord/voltha-lib-go/v4/pkg/events"
+
 	"time"
 
 	"github.com/opencord/voltha-go/rw_core/core/adapter"
@@ -69,11 +72,13 @@
 	return kmp, nil
 }
 
-func startEventProxy(ctx context.Context, kafkaClient kafka.Client, eventTopic string, connectionRetryInterval time.Duration) (*events.EventProxy, error) {
+func startEventProxy(ctx context.Context, kafkaClient kafka.Client, eventTopic string, connectionRetryInterval time.Duration, updateProbeService bool) (*events.EventProxy, error) {
 	ep := events.NewEventProxy(events.MsgClient(kafkaClient), events.MsgTopic(kafka.Topic{Name: eventTopic}))
 	for {
 		if err := kafkaClient.Start(ctx); err != nil {
-			probe.UpdateStatusFromContext(ctx, clusterMessageBus, probe.ServiceStatusNotReady)
+			if updateProbeService {
+				probe.UpdateStatusFromContext(ctx, clusterMessageBus, probe.ServiceStatusNotReady)
+			}
 			logger.Warnw(ctx, "failed-to-setup-kafka-connection-on-kafka-cluster-address", log.Fields{"error": err})
 			select {
 			case <-time.After(connectionRetryInterval):
@@ -82,6 +87,9 @@
 				return nil, ctx.Err()
 			}
 		}
+		if updateProbeService {
+			probe.UpdateStatusFromContext(ctx, clusterMessageBus, probe.ServiceStatusRunning)
+		}
 		logger.Info(ctx, "started-connection-on-kafka-cluster-address")
 		break
 	}
@@ -170,13 +178,54 @@
 	}
 }
 
-func registerAdapterRequestHandlers(ctx context.Context, kmp kafka.InterContainerProxy, dMgr *device.Manager, aMgr *adapter.Manager, coreTopic string) {
+// registerAdapterRequestHandlers subscribes the adapter request handler proxy to
+// the core topic (cf.CoreTopic), retrying with an exponential backoff until the
+// subscription succeeds or ctx is cancelled. The probe status of serviceName is
+// set to NotReady before the first attempt and to Running once it succeeds.
+func registerAdapterRequestHandlers(ctx context.Context, kmp kafka.InterContainerProxy, dMgr *device.Manager,
+	aMgr *adapter.Manager, cf *config.RWCoreFlags, serviceName string) {
+	logger.Infow(ctx, "registering-request-handler", log.Fields{"topic": cf.CoreTopic})
+
+	// Set the exponential backoff params
+	kafkaRetryBackoff := backoff.NewExponentialBackOff()
+	kafkaRetryBackoff.InitialInterval = cf.BackoffRetryInitialInterval
+	kafkaRetryBackoff.MaxElapsedTime = cf.BackoffRetryMaxElapsedTime
+	kafkaRetryBackoff.MaxInterval = cf.BackoffRetryMaxInterval
+	// NewExponentialBackOff has already seeded its current interval from the
+	// library default; Reset makes the configured initial interval take effect.
+	kafkaRetryBackoff.Reset()
+
+	// For the initial request, do not wait
+	backoffTimer := time.NewTimer(0)
+	defer backoffTimer.Stop()
+
+	probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
 	requestProxy := api.NewAdapterRequestHandlerProxy(dMgr, aMgr)
-
-	// Register the broadcast topic to handle any core-bound broadcast requests
-	if err := kmp.SubscribeWithRequestHandlerInterface(ctx, kafka.Topic{Name: coreTopic}, requestProxy); err != nil {
-		logger.Fatalw(ctx, "Failed-registering-broadcast-handler", log.Fields{"topic": coreTopic})
+	for {
+		select {
+		case <-backoffTimer.C:
+			// Register the broadcast topic to handle any core-bound broadcast requests
+			err := kmp.SubscribeWithRequestHandlerInterface(ctx, kafka.Topic{Name: cf.CoreTopic}, requestProxy)
+			if err == nil {
+				logger.Infow(ctx, "request-handler-registered", log.Fields{"topic": cf.CoreTopic})
+				probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
+				return
+			}
+			logger.Errorw(ctx, "failed-registering-broadcast-handler-retrying", log.Fields{"topic": cf.CoreTopic, "error": err})
+			duration := kafkaRetryBackoff.NextBackOff()
+			// With the default max elapsed time of 0 the backoff never returns Stop; this only triggers for a non-zero setting
+			if duration == backoff.Stop {
+				// If we reach a maximum then warn and reset the backoff timer and keep attempting.
+				logger.Warnw(ctx, "maximum-kafka-retry-backoff-reached-resetting",
+					log.Fields{"max-kafka-retry-backoff": kafkaRetryBackoff.MaxElapsedTime})
+				kafkaRetryBackoff.Reset()
+				duration = kafkaRetryBackoff.NextBackOff()
+			}
+			// The timer has fired and its channel was drained by this case, so Reset is safe
+			backoffTimer.Reset(duration)
+		case <-ctx.Done():
+			logger.Infow(ctx, "context-closed", log.Fields{"topic": cf.CoreTopic})
+			return
+		}
 	}
-
-	logger.Info(ctx, "request-handler-registered")
 }
