Publish indications on kafka

Change-Id: I4214e1de042109ff3ecf6871fe2d2d14a3476633
diff --git a/voltha/adapters/openolt/openolt_device.py b/voltha/adapters/openolt/openolt_device.py
index 154f614..d750416 100644
--- a/voltha/adapters/openolt/openolt_device.py
+++ b/voltha/adapters/openolt/openolt_device.py
@@ -17,19 +17,14 @@
 import grpc
 import structlog
 import time
-import threading
-from twisted.internet import reactor
 from scapy.layers.l2 import Ether, Dot1Q
 from transitions import Machine
-from google.protobuf.message import Message
-from simplejson import loads
 
 from voltha.protos.device_pb2 import Port
 from voltha.adapters.openolt.protos import openolt_pb2
 from voltha.adapters.openolt.openolt_utils import OpenoltUtils
-from voltha.extensions.alarms.onu.onu_discovery_alarm import OnuDiscoveryAlarm
 from voltha.adapters.openolt.openolt_grpc import OpenoltGrpc
-from voltha.adapters.openolt.openolt_kafka_consumer import KConsumer
+from voltha.adapters.openolt.openolt_indications import OpenoltIndications
 
 
 class OpenoltDevice(object):
@@ -113,14 +108,13 @@
 
     def do_state_init(self, event):
         self.log.debug('init')
-        self.indications_thread_handle = threading.Thread(
-            target=self.indications_thread)
-        self.indications_thread_handle.setDaemon(True)
-        self.indications_thread_handle.start()
+        self._indications = OpenoltIndications(self)
+        self._indications.start()
 
     def post_init(self, event):
         self.log.debug('post_init')
         # Initialize gRPC
+        time.sleep(10)
         self._grpc = OpenoltGrpc(self.host_and_port, self)
 
         self.log.info('openolt-device-created')
@@ -162,21 +156,6 @@
         self.log.debug('post_down')
         self.flow_mgr.reset_flows()
 
-    def indications_thread(self):
-        self.log.debug('openolt indications thread starting')
-        self.kafka_consumer = KConsumer(
-            "openolt.ind.alarm",
-            "openolt.ind.pkt",
-            "openolt.ind.olt")
-        self.log.debug('openolt indications thread processing')
-        # block reading kafka
-        self.kafka_consumer.read(self.indications_process)
-        self.log.debug('openolt indications thread stopped')
-
-    def indications_process(self, msg):
-        ind = loads(msg)
-        self.log.debug("openolt indication", ind=ind)
-
     def olt_indication(self, olt_indication):
         if olt_indication.oper_state == "up":
             self.go_state_up()
@@ -209,15 +188,6 @@
         self.log.debug("onu discovery indication", intf_id=intf_id,
                        serial_number=serial_number_str)
 
-        # Post ONU Discover alarm  20180809_0805
-        try:
-            OnuDiscoveryAlarm(self.alarm_mgr.alarms, pon_id=intf_id,
-                              serial_number=serial_number_str).raise_alarm()
-        except Exception as disc_alarm_error:
-            self.log.exception("onu-discovery-alarm-error",
-                               errmsg=disc_alarm_error.message)
-            # continue for now.
-
         try:
             onu_id = self.data_model.onu_id(serial_number=serial_number_str)
         except ValueError:
diff --git a/voltha/adapters/openolt/openolt_grpc.py b/voltha/adapters/openolt/openolt_grpc.py
index 93addd7..10055b7 100644
--- a/voltha/adapters/openolt/openolt_grpc.py
+++ b/voltha/adapters/openolt/openolt_grpc.py
@@ -17,10 +17,10 @@
 import structlog
 import grpc
 import threading
+import time
 from simplejson import dumps
 from twisted.internet import reactor
 from google.protobuf.json_format import MessageToJson
-from google.protobuf.message import Message
 from voltha.northbound.kafka.kafka_proxy import get_kafka_proxy
 from voltha.adapters.openolt.protos import openolt_pb2_grpc, openolt_pb2
 
@@ -61,8 +61,9 @@
                     self.log.debug('kafka-proxy-available')
                     kafka_proxy.send_message(
                         topic,
-                        dumps(MessageToJson(msg,
-                                            preserving_proto_field_name=True)))
+                        dumps(MessageToJson(
+                            msg,
+                            including_default_value_fields=True)))
                 else:
                     self.log.error('kafka-proxy-unavailable')
             except Exception, e:
@@ -128,27 +129,20 @@
                 # indication handlers run in the main event loop
                 if ind.HasField('olt_ind'):
                     forward_indication("openolt.ind.olt", ind.olt_ind)
-                    reactor.callFromThread(
-                        self.device.olt_indication, ind.olt_ind)
                 elif ind.HasField('intf_ind'):
-                    reactor.callFromThread(
-                        self.device.intf_indication, ind.intf_ind)
+                    forward_indication("openolt.ind.intf", ind.intf_ind)
                 elif ind.HasField('intf_oper_ind'):
-                    reactor.callFromThread(
-                        self.device.intf_oper_indication, ind.intf_oper_ind)
+                    forward_indication("openolt.ind.intfoper",
+                                       ind.intf_oper_ind)
                 elif ind.HasField('onu_disc_ind'):
-                    reactor.callFromThread(
-                        self.device.onu_discovery_indication, ind.onu_disc_ind)
+                    forward_indication("openolt.ind.onudisc", ind.onu_disc_ind)
                 elif ind.HasField('onu_ind'):
-                    reactor.callFromThread(
-                        self.device.onu_indication, ind.onu_ind)
+                    forward_indication("openolt.ind.onu", ind.onu_ind)
                 elif ind.HasField('omci_ind'):
                     reactor.callFromThread(
                         self.device.omci_indication, ind.omci_ind)
                 elif ind.HasField('pkt_ind'):
                     forward_indication("openolt.ind.pkt", ind.pkt_ind)
-                    reactor.callFromThread(
-                        self.device.packet_indication, ind.pkt_ind)
                 elif ind.HasField('port_stats'):
                     reactor.callFromThread(
                         self.device.stats_mgr.port_statistics_indication,
diff --git a/voltha/adapters/openolt/openolt_indications.py b/voltha/adapters/openolt/openolt_indications.py
new file mode 100644
index 0000000..8a51304
--- /dev/null
+++ b/voltha/adapters/openolt/openolt_indications.py
@@ -0,0 +1,78 @@
+#
+# Copyright 2019 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import threading
+from google.protobuf.json_format import Parse
+from simplejson import loads
+from twisted.internet import reactor
+import structlog
+
+from voltha.adapters.openolt.protos import openolt_pb2
+from voltha.adapters.openolt.openolt_kafka_consumer import KConsumer
+
+
+class OpenoltIndications(object):
+    def __init__(self, device):
+        self.log = structlog.get_logger()
+        self.device = device
+        self.indications_thread_handle = threading.Thread(
+            target=self.indications_thread)
+        self.indications_thread_handle.setDaemon(True)
+
+    def start(self):
+        self.indications_thread_handle.start()
+
+    def stop(self):
+        pass
+
+    def indications_thread(self):
+        self.log.debug('openolt indications thread starting')
+        KConsumer(self.indications_process,
+                  "openolt.ind.olt",
+                  "openolt.ind.intf",
+                  'openolt.ind.intfoper',
+                  'openolt.ind.onudisc',
+                  'openolt.ind.onu',
+                  "openolt.ind.pkt")
+
+    def indications_process(self, topic, msg):
+        self.log.debug("received openolt indication", topic=topic, msg=msg)
+
+        if topic == "openolt.ind.olt":
+            pb = Parse(loads(msg), openolt_pb2.OltIndication(),
+                       ignore_unknown_fields=True)
+            reactor.callFromThread(self.device.olt_indication, pb)
+        if topic == "openolt.ind.intf":
+            pb = Parse(loads(msg), openolt_pb2.IntfIndication(),
+                       ignore_unknown_fields=True)
+            reactor.callFromThread(self.device.intf_indication, pb)
+        if topic == "openolt.ind.intfoper":
+            pb = Parse(loads(msg), openolt_pb2.IntfOperIndication(),
+                       ignore_unknown_fields=True)
+            reactor.callFromThread(self.device.intf_oper_indication, pb)
+        if topic == "openolt.ind.onudisc":
+            pb = Parse(loads(msg), openolt_pb2.OnuDiscIndication(),
+                       ignore_unknown_fields=True)
+            reactor.callFromThread(
+                self.device.onu_discovery_indication, pb)
+        if topic == "openolt.ind.onu":
+            pb = Parse(loads(msg), openolt_pb2.OnuIndication(),
+                       ignore_unknown_fields=True)
+            reactor.callFromThread(self.device.onu_indication, pb)
+        elif topic == "openolt.ind.pkt":
+            pb = Parse(loads(msg), openolt_pb2.PacketIndication(),
+                       ignore_unknown_fields=True)
+            reactor.callFromThread(self.device.packet_indication, pb)
diff --git a/voltha/adapters/openolt/openolt_kafka_consumer.py b/voltha/adapters/openolt/openolt_kafka_consumer.py
index adbe1d5..6d6e1f1 100644
--- a/voltha/adapters/openolt/openolt_kafka_consumer.py
+++ b/voltha/adapters/openolt/openolt_kafka_consumer.py
@@ -26,7 +26,7 @@
 
 
 class KConsumer(object):
-    def __init__(self, *topics):
+    def __init__(self, callback, *topics):
         kafka_proxy = get_kafka_proxy()
         if kafka_proxy and not kafka_proxy.is_faulty():
             self.kafka_endpoint = kafka_proxy.kafka_endpoint
@@ -55,7 +55,6 @@
         self.topics = list(topics)
         self._c.subscribe(self.topics)
 
-    def read(self, callback):
         # Read messages from Kafka and hand it to to callback
         try:
             while True:
@@ -65,7 +64,7 @@
                     continue
                 elif not msg.error():
                     log.debug('got a kafka message', topic=msg.topic())
-                    callback(msg.value())
+                    callback(msg.topic(), msg.value())
                 elif msg.error().code() == KafkaError._PARTITION_EOF:
                     pass
                 else: