This commit made some modifications in the way messages over
kafka are consumed, mostly around the initial offset to use.

Change-Id: I6104ef710d9c595034cd4cedc0d58ae774cec719
diff --git a/kafka/client.go b/kafka/client.go
index ad8f01a..b93ad86 100644
--- a/kafka/client.go
+++ b/kafka/client.go
@@ -33,7 +33,7 @@
 	DefaultProducerFlushFrequency   = 5
 	DefaultProducerFlushMessages    = 1
 	DefaultProducerFlushMaxmessages = 5
-	DefaultProducerReturnSuccess    = false
+	DefaultProducerReturnSuccess    = true
 	DefaultProducerReturnErrors     = true
 	DefaultProducerRetryMax         = 3
 	DefaultProducerRetryBackoff     = time.Millisecond * 100
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index e2210c4..ff3584f 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -589,7 +589,7 @@
 			// partitions.
 			replyTopic := &Topic{Name: msg.Header.FromTopic}
 			key := GetDeviceIdFromTopic(*replyTopic)
-			log.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header, "key": key})
+			log.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
 			// TODO: handle error response.
 			kp.kafkaClient.Send(icm, replyTopic, key)
 		}
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
index f2de01a..8468e42 100644
--- a/kafka/sarama_client.go
+++ b/kafka/sarama_client.go
@@ -109,13 +109,25 @@
 	}
 }
 
-func ReturnOnErrors(opt bool) SaramaClientOption {
+func ProducerMaxRetries(num int) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.producerRetryMax = num
+	}
+}
+
+func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.producerRetryBackOff = duration
+	}
+}
+
+func ProducerReturnOnErrors(opt bool) SaramaClientOption {
 	return func(args *SaramaClient) {
 		args.producerReturnErrors = opt
 	}
 }
 
-func ReturnOnSuccess(opt bool) SaramaClientOption {
+func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
 	return func(args *SaramaClient) {
 		args.producerReturnSuccess = opt
 	}
@@ -376,6 +388,16 @@
 
 	// Send message to kafka
 	sc.producer.Input() <- kafkaMsg
+
+	// Wait for result
+	// TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
+	select {
+	case ok := <-sc.producer.Successes():
+		log.Debugw("message-sent", log.Fields{"status":ok})
+	case notOk := <-sc.producer.Errors():
+		log.Debugw("error-sending", log.Fields{"status":notOk})
+		return notOk
+	}
 	return nil
 }
 
diff --git a/python/adapters/kafka/adapter_request_facade.py b/python/adapters/kafka/adapter_request_facade.py
index 978f57d..cf7afd8 100644
--- a/python/adapters/kafka/adapter_request_facade.py
+++ b/python/adapters/kafka/adapter_request_facade.py
@@ -18,11 +18,11 @@
 This facade handles kafka-formatted messages from the Core, extracts the kafka
 formatting and forwards the request to the concrete handler.
 """
-
+import structlog
 from twisted.internet.defer import inlineCallbacks
 from zope.interface import implementer
 from twisted.internet import reactor
-
+from afkak.consumer import OFFSET_LATEST, OFFSET_EARLIEST
 from python.adapters.interface import IAdapterInterface
 from python.protos.core_adapter_pb2 import IntType, InterAdapterMessage, StrType, Error, ErrorCode
 from python.protos.device_pb2 import Device
@@ -31,6 +31,7 @@
 from python.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
     get_messaging_proxy
 
+log = structlog.get_logger()
 
 class MacAddressError(BaseException):
     def __init__(self, error):
@@ -59,31 +60,34 @@
 
     @inlineCallbacks
     def start(self):
-        self.log.debug('starting')
+        log.debug('starting')
 
     @inlineCallbacks
     def stop(self):
-        self.log.debug('stopping')
+        log.debug('stopping')
 
-
+    @inlineCallbacks
     def createKafkaDeviceTopic(self, deviceId):
+        log.debug("subscribing-to-topic", device_id=deviceId)
         kafka_proxy = get_messaging_proxy()
         device_topic = kafka_proxy.get_default_topic() + "_" + deviceId
-        kafka_proxy.subscribe(topic=device_topic, target_cls=self)
+        # yield kafka_proxy.create_topic(topic=device_topic)
+        yield kafka_proxy.subscribe(topic=device_topic, target_cls=self, offset=OFFSET_EARLIEST)
+        log.debug("subscribed-to-topic", topic=device_topic)
 
     def adopt_device(self, device):
         d = Device()
         if device:
             device.Unpack(d)
 
+            # Start the creation of a device specific topic to handle all
+            # subsequent requests from the Core. This adapter instance will
+            # handle all requests for that device.
+            reactor.callLater(0, self.createKafkaDeviceTopic, d.id)
+
             result = self.adapter.adopt_device(d)
             # return True, self.adapter.adopt_device(d)
 
-            # Before we return, create a device specific topic to handle all
-            # subsequent requests from the Core. This adapter instance will
-            # handle all requests for that device
-            reactor.callLater(0, self.createKafkaDeviceTopic, d.id)
-
             return True, result
         else:
             return False, Error(code=ErrorCode.INVALID_PARAMETERS,
diff --git a/python/adapters/kafka/kafka_inter_container_library.py b/python/adapters/kafka/kafka_inter_container_library.py
index aaa0c3c..a3de4f1 100644
--- a/python/adapters/kafka/kafka_inter_container_library.py
+++ b/python/adapters/kafka/kafka_inter_container_library.py
@@ -138,6 +138,11 @@
         except Exception as e:
             log.exception("Exception-when-stopping-messaging-proxy:", e=e)
 
+
+    @inlineCallbacks
+    def create_topic(self, topic):
+        yield self._wait_until_topic_is_ready(self.kafka_client, topic)
+
     @inlineCallbacks
     def _wait_until_topic_is_ready(self, client, topic):
         e = True
@@ -159,8 +164,9 @@
         return self.default_topic
 
     @inlineCallbacks
-    def _subscribe(self, topic, callback=None, target_cls=None):
+    def _subscribe(self, topic, offset, callback=None, target_cls=None):
         try:
+            log.debug("subscribing-to-topic-start", topic=topic)
             yield self._wait_until_topic_is_ready(self.kafka_client, topic)
             partitions = self.kafka_client.topic_partitions[topic]
             consumers = []
@@ -168,6 +174,7 @@
             # First setup the generic callback - all received messages will
             # go through that queue
             if topic not in self.topic_consumer_map:
+                log.debug("topic-not-in-consumer-map", topic=topic)
                 consumers = [Consumer(self.kafka_client, topic, partition,
                                       self._enqueue_received_message)
                              for partition in partitions]
@@ -206,15 +213,18 @@
                 log.warn("Consumers-failed", failure=failure)
 
             for c in consumers:
-                c.start(OFFSET_LATEST).addCallbacks(cb_closed, eb_failed)
+                c.start(offset).addCallbacks(cb_closed, eb_failed)
+
+            log.debug("subscribed-to-topic", topic=topic)
 
             returnValue(True)
         except Exception as e:
             log.exception("Exception-during-subscription", e=e)
             returnValue(False)
 
+    @inlineCallbacks
     def subscribe(self, topic, callback=None, target_cls=None,
-                  max_retry=3):
+                  max_retry=3, offset=OFFSET_LATEST):
         """
         Scenario 1:  invoked to subscribe to a specific topic with a
         target_cls to invoke when a message is received on that topic.  This
@@ -246,14 +256,26 @@
             return asleep(wait_time)
 
         retry = 0
-        while not self._subscribe(topic, callback=callback,
-                                  target_cls=target_cls):
-            if retry > max_retry:
-                return False
+        subscribed = False
+        while not subscribed:
+            subscribed = yield self._subscribe(topic, callback=callback,
+                                               target_cls=target_cls, offset=offset)
+            if subscribed:
+                returnValue(True)
+            elif retry > max_retry:
+                returnValue(False)
             else:
                 _backoff("subscription-not-complete", retry)
                 retry += 1
-        return True
+
+        # while not self._subscribe(topic, callback=callback,
+        #                           target_cls=target_cls):
+        #     if retry > max_retry:
+        #         return False
+        #     else:
+        #         _backoff("subscription-not-complete", retry)
+        #         retry += 1
+        # return True
 
     def unsubscribe(self, topic):
         """
diff --git a/python/adapters/kafka/kafka_proxy.py b/python/adapters/kafka/kafka_proxy.py
index 6dcb10f..d596334 100644
--- a/python/adapters/kafka/kafka_proxy.py
+++ b/python/adapters/kafka/kafka_proxy.py
@@ -136,6 +136,25 @@
             log.exception('failed-get-kafka-producer', e=e)
             return
 
+
+    # @inlineCallbacks
+    # def wait_until_topic_is_ready(self, topic):
+    #     #  Assumes "auto.create.topics.enable" is set in the broker
+    #     #  configuration
+    #     e = True
+    #     while e:
+    #         yield self.kclient.load_metadata_for_topics(topic)
+    #         e = self.kclient.metadata_error_for_topic(topic)
+    #         if e:
+    #             log.debug("Topic-not-ready-retrying...", topic=topic)
+
+
+    @inlineCallbacks
+    def create_topic(self, topic):
+        # Assume auto create is enabled on the broker configuration
+        yield self.wait_until_topic_is_ready(topic)
+
+
     @inlineCallbacks
     def send_message(self, topic, msg):
         assert topic is not None
diff --git a/python/adapters/ponsim_olt/ponsim_olt.py b/python/adapters/ponsim_olt/ponsim_olt.py
index df834e5..95d6590 100644
--- a/python/adapters/ponsim_olt/ponsim_olt.py
+++ b/python/adapters/ponsim_olt/ponsim_olt.py
@@ -304,7 +304,6 @@
             reactor.callInThread(self.rcv_grpc)
             self.log.info('started-frame-grpc-stream')
 
-            # TODO
             # Start collecting stats from the device after a brief pause
             self.start_kpi_collection(device.id)
         except Exception as e:
@@ -359,6 +358,7 @@
     def reconcile(self, device):
         self.log.info('reconciling-OLT-device')
 
+    @inlineCallbacks
     def _rcv_frame(self, frame):
         pkt = Ether(frame)
 
@@ -373,7 +373,7 @@
                         inner_shim.payload
                 )
                 self.log.info('sending-packet-in',device_id=self.device_id, port=cvid)
-                self.core_proxy.send_packet_in(device_id=self.device_id,
+                yield self.core_proxy.send_packet_in(device_id=self.device_id,
                                                port=cvid,
                                                packet=str(popped_frame))
             elif pkt.haslayer(Raw):
@@ -398,7 +398,7 @@
             for frame in self.frames:
                 self.log.info('received-grpc-frame',
                               frame_len=len(frame.payload))
-                self._rcv_frame(frame.payload)
+                yield self._rcv_frame(frame.payload)
 
         except _Rendezvous, e:
             log.warn('grpc-connection-lost', message=e.message)
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index 4af4fb0..3ce59a7 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -77,16 +77,20 @@
 	}
 	// Use a device topic for the response as we are the only core handling requests for this device
 	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	if err := ap.kafkaICProxy.SubscribeWithDefaultRequestHandler(replyToTopic); err != nil {
+		log.Errorw("Unable-to-subscribe-new-topic", log.Fields{"topic": replyToTopic, "error": err})
+		return err
+	}
 	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, args...)
 	log.Debugw("AdoptDevice-response", log.Fields{"replyTopic": replyToTopic, "deviceid": device.Id, "success": success})
-	if success {
-		// From now on, any unsolicited requests from the adapters for this device will come over the device topic.
-		// We should therefore include the replyToTopic as part of the target when unsolicited messages come in.
-		if err := ap.kafkaICProxy.SubscribeWithDefaultRequestHandler(replyToTopic); err != nil {
-			log.Errorw("Unable-to-subscribe-new-topic", log.Fields{"topic": replyToTopic, "error": err})
-			return err
-		}
-	}
+	//if success {
+	//	// From now on, any unsolicited requests from the adapters for this device will come over the device topic.
+	//	// We should therefore include the replyToTopic as part of the target when unsolicited messages come in.
+	//	if err := ap.kafkaICProxy.SubscribeWithDefaultRequestHandler(replyToTopic); err != nil {
+	//		log.Errorw("Unable-to-subscribe-new-topic", log.Fields{"topic": replyToTopic, "error": err})
+	//		return err
+	//	}
+	//}
 	return unPackResponse(rpc, device.Id, success, result)
 }
 
diff --git a/rw_core/main.go b/rw_core/main.go
index 77ce304..472072a 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -69,7 +69,11 @@
 	case "sarama":
 		return kafka.NewSaramaClient(
 			kafka.Host(host),
-			kafka.Port(port)), nil
+			kafka.Port(port),
+			kafka.ProducerReturnOnErrors(true),
+			kafka.ProducerReturnOnSuccess(true),
+			kafka.ProducerMaxRetries(6),
+			kafka.ProducerRetryBackoff(time.Millisecond * 30)), nil
 	}
 	return nil, errors.New("unsupported-client-type")
 }
@@ -109,7 +113,7 @@
 func (rw *rwCore) start(ctx context.Context) {
 	log.Info("Starting RW Core components")
 
-	// Setup KV MsgClient
+	// Setup KV Client
 	log.Debugw("create-kv-client", log.Fields{"kvstore": rw.config.KVStoreType})
 	err := rw.setKVClient()
 	if err == nil {
@@ -121,6 +125,7 @@
 			rw.config.KVTxnKeyDelTime)
 	}
 
+	// Setup Kafka Client
 	if rw.kafkaClient, err = newKafkaClient("sarama", rw.config.KafkaAdapterHost, rw.config.KafkaAdapterPort); err != nil {
 		log.Fatal("Unsupported-kafka-client")
 	}
@@ -214,7 +219,7 @@
 	}
 
 	log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
-	log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.WarnLevel)
+	log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.DebugLevel)
 
 	defer log.CleanUp()