This commit made some modifications in the way messages over
kafka are consumed, mostly around the initial offset to use.

Change-Id: I6104ef710d9c595034cd4cedc0d58ae774cec719
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index 4af4fb0..3ce59a7 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -77,16 +77,20 @@
 	}
 	// Use a device topic for the response as we are the only core handling requests for this device
 	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	if err := ap.kafkaICProxy.SubscribeWithDefaultRequestHandler(replyToTopic); err != nil {
+		log.Errorw("Unable-to-subscribe-new-topic", log.Fields{"topic": replyToTopic, "error": err})
+		return err
+	}
 	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, args...)
 	log.Debugw("AdoptDevice-response", log.Fields{"replyTopic": replyToTopic, "deviceid": device.Id, "success": success})
-	if success {
-		// From now on, any unsolicited requests from the adapters for this device will come over the device topic.
-		// We should therefore include the replyToTopic as part of the target when unsolicited messages come in.
-		if err := ap.kafkaICProxy.SubscribeWithDefaultRequestHandler(replyToTopic); err != nil {
-			log.Errorw("Unable-to-subscribe-new-topic", log.Fields{"topic": replyToTopic, "error": err})
-			return err
-		}
-	}
+	//if success {
+	//	// From now on, any unsolicited requests from the adapters for this device will come over the device topic.
+	//	// We should therefore include the replyToTopic as part of the target when unsolicited messages come in.
+	//	if err := ap.kafkaICProxy.SubscribeWithDefaultRequestHandler(replyToTopic); err != nil {
+	//		log.Errorw("Unable-to-subscribe-new-topic", log.Fields{"topic": replyToTopic, "error": err})
+	//		return err
+	//	}
+	//}
 	return unPackResponse(rpc, device.Id, success, result)
 }
 
diff --git a/rw_core/main.go b/rw_core/main.go
index 77ce304..472072a 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -69,7 +69,11 @@
 	case "sarama":
 		return kafka.NewSaramaClient(
 			kafka.Host(host),
-			kafka.Port(port)), nil
+			kafka.Port(port),
+			kafka.ProducerReturnOnErrors(true),
+			kafka.ProducerReturnOnSuccess(true),
+			kafka.ProducerMaxRetries(6),
+			kafka.ProducerRetryBackoff(time.Millisecond * 30)), nil
 	}
 	return nil, errors.New("unsupported-client-type")
 }
@@ -109,7 +113,7 @@
 func (rw *rwCore) start(ctx context.Context) {
 	log.Info("Starting RW Core components")
 
-	// Setup KV MsgClient
+	// Setup KV Client
 	log.Debugw("create-kv-client", log.Fields{"kvstore": rw.config.KVStoreType})
 	err := rw.setKVClient()
 	if err == nil {
@@ -121,6 +125,7 @@
 			rw.config.KVTxnKeyDelTime)
 	}
 
+	// Setup Kafka Client
 	if rw.kafkaClient, err = newKafkaClient("sarama", rw.config.KafkaAdapterHost, rw.config.KafkaAdapterPort); err != nil {
 		log.Fatal("Unsupported-kafka-client")
 	}
@@ -214,7 +219,7 @@
 	}
 
 	log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
-	log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.WarnLevel)
+	log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.DebugLevel)
 
 	defer log.CleanUp()