[3393] Bug fix for kafka service restart

Change-Id: I137cb3285ce59ef823e131f8b2d49dfe2e7aaf3e
diff --git a/voltha/main.py b/voltha/main.py
index 5f282cc..e889864 100755
--- a/voltha/main.py
+++ b/voltha/main.py
@@ -397,15 +397,23 @@
         topic = "voltha.heartbeat"
 
         def send_msg():
-            message['ts'] = arrow.utcnow().timestamp
-            kafka_proxy.send_message(topic, dumps(message))
+            try:
+                kafka_proxy = get_kafka_proxy()
+                if kafka_proxy and not kafka_proxy.is_faulty():
+                    self.log.debug('kafka-proxy-available')
+                    message['ts'] = arrow.utcnow().timestamp
+                    self.log.debug('start-kafka-heartbeat')
+                    kafka_proxy.send_message(topic, dumps(message))
+                else:
+                    self.log.error('kafka-proxy-unavailable')
+            except Exception, e:
+                self.log.exception('failed-sending-message-heartbeat', e=e)
 
-        kafka_proxy = get_kafka_proxy()
-        if kafka_proxy:
+        try:
             lc = LoopingCall(send_msg)
             lc.start(10)
-        else:
-            self.log.error('Kafka proxy has not been created!')
+        except Exception, e:
+            self.log.exception('failed-kafka-heartbeat', e=e)
 
 
 if __name__ == '__main__':
diff --git a/voltha/northbound/kafka/event_bus_publisher.py b/voltha/northbound/kafka/event_bus_publisher.py
index 921a1de..8020bf5 100644
--- a/voltha/northbound/kafka/event_bus_publisher.py
+++ b/voltha/northbound/kafka/event_bus_publisher.py
@@ -37,16 +37,25 @@
         self.config = config
         self.topic_mappings = config.get('topic_mappings', {})
         self.event_bus = EventBusClient()
+        self.subscriptions = None
 
     def start(self):
         log.debug('starting')
+        self.subscriptions = list()
         self._setup_subscriptions(self.topic_mappings)
         log.info('started')
         return self
 
     def stop(self):
-        log.debug('stopping')
-        log.info('stopped')
+        try:
+            log.debug('stopping-event-bus')
+            if self.subscriptions:
+                for subscription in self.subscriptions:
+                    self.event_bus.unsubscribe(subscription)
+            log.info('stopped-event-bus')
+        except Exception, e:
+            log.exception('failed-stopping-event-bus', e=e)
+            return
 
     def _setup_subscriptions(self, mappings):
 
@@ -60,20 +69,22 @@
                           mapping=mapping)
                 continue
 
-            self.event_bus.subscribe(
+            self.subscriptions.append(self.event_bus.subscribe(
                 event_bus_topic,
                 # to avoid Python late-binding to the last registered
                 # kafka_topic, we force instant binding with the default arg
-                lambda _, m, k=kafka_topic: self.forward(k, m))
+                lambda _, m, k=kafka_topic: self.forward(k, m)))
 
             log.info('event-to-kafka', kafka_topic=kafka_topic,
                      event_bus_topic=event_bus_topic)
 
     def forward(self, kafka_topic, msg):
-
-        # convert to JSON string if msg is a protobuf msg
-        if isinstance(msg, Message):
-            msg = dumps(MessageToDict(msg, True, True))
-
-        self.kafka_proxy.send_message(kafka_topic, msg)
+        try:
+            # convert to JSON string if msg is a protobuf msg
+            if isinstance(msg, Message):
+                msg = dumps(MessageToDict(msg, True, True))
+            log.debug('forward-event-bus-publisher')
+            self.kafka_proxy.send_message(kafka_topic, msg)
+        except Exception, e:
+            log.exception('failed-forward-event-bus-publisher', e=e)
 
diff --git a/voltha/northbound/kafka/kafka_proxy.py b/voltha/northbound/kafka/kafka_proxy.py
index 4ee3e66..ad1aad5 100644
--- a/voltha/northbound/kafka/kafka_proxy.py
+++ b/voltha/northbound/kafka/kafka_proxy.py
@@ -57,6 +57,8 @@
         self.kclient = None
         self.kproducer = None
         self.event_bus_publisher = None
+        self.stopping = False
+        self.faulty = False
         log.debug('initialized', endpoint=kafka_endpoint)
 
     @inlineCallbacks
@@ -67,37 +69,78 @@
         self.event_bus_publisher = yield EventBusPublisher(
             self, self.config.get('event_bus_publisher', {})).start()
         log.info('started')
+        KafkaProxy.faulty = False
+        self.stopping = False
         returnValue(self)
 
+    @inlineCallbacks
     def stop(self):
-        log.debug('stopping')
-        pass
-        log.info('stopped')
+        try:
+            log.debug('stopping-kafka-proxy')
+            try:
+                if self.kclient:
+                    yield self.kclient.close()
+                    self.kclient = None
+                    log.debug('stopped-kclient-kafka-proxy')
+            except Exception, e:
+                log.exception('failed-stopped-kclient-kafka-proxy', e=e)
+                pass
+
+            try:
+                if self.kproducer:
+                    yield self.kproducer.stop()
+                    self.kproducer = None
+                    log.debug('stopped-kproducer-kafka-proxy')
+            except Exception, e:
+                log.exception('failed-stopped-kproducer-kafka-proxy', e=e)
+                pass
+
+            #try:
+            #    if self.event_bus_publisher:
+            #        yield self.event_bus_publisher.stop()
+            #        self.event_bus_publisher = None
+            #        log.debug('stopped-event-bus-publisher-kafka-proxy')
+            #except Exception, e:
+            #    log.debug('failed-stopped-event-bus-publisher-kafka-proxy')
+            #    pass
+
+            log.debug('stopped-kafka-proxy')
+
+        except Exception, e:
+            self.kclient = None
+            self.kproducer = None
+            #self.event_bus_publisher = None
+            log.exception('failed-stopped-kafka-proxy', e=e)
+            pass
 
     def _get_kafka_producer(self):
         # PRODUCER_ACK_LOCAL_WRITE : server will wait till the data is written
         #  to a local log before sending response
+        try:
 
-        if self.kafka_endpoint.startswith('@'):
-            try:
-                _k_endpoint = get_endpoint_from_consul(self.consul_endpoint,
-                                                       self.kafka_endpoint[1:])
-                log.debug('found-kafka-service', endpoint=_k_endpoint)
+            if self.kafka_endpoint.startswith('@'):
+                try:
+                    _k_endpoint = get_endpoint_from_consul(self.consul_endpoint,
+                                                           self.kafka_endpoint[1:])
+                    log.debug('found-kafka-service', endpoint=_k_endpoint)
 
-            except Exception as e:
-                log.exception('no-kafka-service-in-consul', e=e)
+                except Exception as e:
+                    log.exception('no-kafka-service-in-consul', e=e)
 
-                self.kproducer = None
-                self.kclient = None
-                return
-        else:
-            _k_endpoint = self.kafka_endpoint
+                    self.kproducer = None
+                    self.kclient = None
+                    return
+            else:
+                _k_endpoint = self.kafka_endpoint
 
-        self.kclient = _KafkaClient(_k_endpoint)
-        self.kproducer = _kafkaProducer(self.kclient,
-                                        req_acks=PRODUCER_ACK_LOCAL_WRITE,
-                                        ack_timeout=self.ack_timeout,
-                                        max_req_attempts=self.max_req_attempts)
+            self.kclient = _KafkaClient(_k_endpoint)
+            self.kproducer = _kafkaProducer(self.kclient,
+                                            req_acks=PRODUCER_ACK_LOCAL_WRITE,
+                                            ack_timeout=self.ack_timeout,
+                                            max_req_attempts=self.max_req_attempts)
+        except Exception, e:
+            log.exception('failed-get-kafka-producer', e=e)
+            return
 
     @inlineCallbacks
     def send_message(self, topic, msg):
@@ -108,26 +151,50 @@
         # then try to get one - this happens only when we try to lookup the
         # kafka service from consul
         try:
-            if self.kproducer is None:
-                self._get_kafka_producer()
-                # Lets the next message request do the retry if still a failure
+            if self.faulty is False:
+
                 if self.kproducer is None:
-                    log.error('no-kafka-producer', endpoint=self.kafka_endpoint)
+                    self._get_kafka_producer()
+                    # Lets the next message request do the retry if still a failure
+                    if self.kproducer is None:
+                        log.error('no-kafka-producer', endpoint=self.kafka_endpoint)
+                        return
+
+                log.debug('sending-kafka-msg', topic=topic, msg=msg)
+                msgs = [msg]
+
+                if self.kproducer and self.kclient and \
+                        self.event_bus_publisher and self.faulty is False:
+                    yield self.kproducer.send_messages(topic, msgs=msgs)
+                    log.debug('sent-kafka-msg', topic=topic, msg=msg)
+                else:
                     return
 
-            log.debug('sending-kafka-msg', topic=topic, msg=msg)
-            msgs = [msg]
-            yield self.kproducer.send_messages(topic, msgs=msgs)
-            log.debug('sent-kafka-msg', topic=topic, msg=msg)
-
         except Exception, e:
+            self.faulty = True
             log.error('failed-to-send-kafka-msg', topic=topic, msg=msg, e=e)
 
             # set the kafka producer to None.  This is needed if the
             # kafka docker went down and comes back up with a different
             # port number.
-            self.kproducer = None
-            self.kclient = None
+            if self.stopping is False:
+                log.debug('stopping-kafka-proxy')
+                try:
+                    self.stopping = True
+                    self.stop()
+                    self.stopping = False
+                    self.faulty = False
+                    log.debug('stopped-kafka-proxy')
+                except Exception, e:
+                    log.exception('failed-stopping-kafka-proxy', e=e)
+                    pass
+            else:
+                log.info('already-stopping-kafka-proxy')
+
+            return
+
+    def is_faulty(self):
+        return self.faulty
 
 
 # Common method to get the singleton instance of the kafka proxy class