Use one topic for all openolt indications

Change-Id: I4009e0980b9886af93be343ced78da61a6f03829
diff --git a/voltha/adapters/openolt/openolt_indications.py b/voltha/adapters/openolt/openolt_indications.py
index 8a51304..c5401d3 100644
--- a/voltha/adapters/openolt/openolt_indications.py
+++ b/voltha/adapters/openolt/openolt_indications.py
@@ -40,39 +40,42 @@
 
     def indications_thread(self):
         self.log.debug('openolt indications thread starting')
-        KConsumer(self.indications_process,
-                  "openolt.ind.olt",
-                  "openolt.ind.intf",
-                  'openolt.ind.intfoper',
-                  'openolt.ind.onudisc',
-                  'openolt.ind.onu',
-                  "openolt.ind.pkt")
+        KConsumer(self.indications_process, "openolt.ind")
 
     def indications_process(self, topic, msg):
-        self.log.debug("received openolt indication", topic=topic, msg=msg)
 
-        if topic == "openolt.ind.olt":
-            pb = Parse(loads(msg), openolt_pb2.OltIndication(),
-                       ignore_unknown_fields=True)
-            reactor.callFromThread(self.device.olt_indication, pb)
-        if topic == "openolt.ind.intf":
-            pb = Parse(loads(msg), openolt_pb2.IntfIndication(),
-                       ignore_unknown_fields=True)
-            reactor.callFromThread(self.device.intf_indication, pb)
-        if topic == "openolt.ind.intfoper":
-            pb = Parse(loads(msg), openolt_pb2.IntfOperIndication(),
-                       ignore_unknown_fields=True)
-            reactor.callFromThread(self.device.intf_oper_indication, pb)
-        if topic == "openolt.ind.onudisc":
-            pb = Parse(loads(msg), openolt_pb2.OnuDiscIndication(),
-                       ignore_unknown_fields=True)
+        ind = Parse(loads(msg), openolt_pb2.Indication(),
+                    ignore_unknown_fields=True)
+
+        self.log.debug("received openolt indication", ind=ind)
+
+        # indication handlers run in the main event loop
+        if ind.HasField('olt_ind'):
+            reactor.callFromThread(self.device.olt_indication, ind.olt_ind)
+        elif ind.HasField('intf_ind'):
+            reactor.callFromThread(self.device.intf_indication, ind.intf_ind)
+        elif ind.HasField('intf_oper_ind'):
+            reactor.callFromThread(self.device.intf_oper_indication,
+                                   ind.intf_oper_ind)
+        elif ind.HasField('onu_disc_ind'):
+            reactor.callFromThread(self.device.onu_discovery_indication,
+                                   ind.onu_disc_ind)
+        elif ind.HasField('onu_ind'):
+            reactor.callFromThread(self.device.onu_indication, ind.onu_ind)
+        elif ind.HasField('omci_ind'):
+            reactor.callFromThread(self.device.omci_indication, ind.omci_ind)
+        elif ind.HasField('pkt_ind'):
+            reactor.callFromThread(self.device.packet_indication, ind.pkt_ind)
+        elif ind.HasField('port_stats'):
             reactor.callFromThread(
-                self.device.onu_discovery_indication, pb)
-        if topic == "openolt.ind.onu":
-            pb = Parse(loads(msg), openolt_pb2.OnuIndication(),
-                       ignore_unknown_fields=True)
-            reactor.callFromThread(self.device.onu_indication, pb)
-        elif topic == "openolt.ind.pkt":
-            pb = Parse(loads(msg), openolt_pb2.PacketIndication(),
-                       ignore_unknown_fields=True)
-            reactor.callFromThread(self.device.packet_indication, pb)
+                self.device.stats_mgr.port_statistics_indication,
+                ind.port_stats)
+        elif ind.HasField('flow_stats'):
+            reactor.callFromThread(
+                self.device.stats_mgr.flow_statistics_indication,
+                ind.flow_stats)
+        elif ind.HasField('alarm_ind'):
+            reactor.callFromThread(
+                self.device.alarm_mgr.process_alarms, ind.alarm_ind)
+        else:
+            self.log.warn('unknown indication type')