Initial commit to move grpc into openolt_grpc

Change-Id: Ic2e02a927c6425eb6051a4dc9a90afee4c8ebfcc
diff --git a/voltha/adapters/openolt/openolt_device.py b/voltha/adapters/openolt/openolt_device.py
index 99e7612..49a6645 100644
--- a/voltha/adapters/openolt/openolt_device.py
+++ b/voltha/adapters/openolt/openolt_device.py
@@ -13,23 +13,23 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-import threading
 import binascii
 import grpc
 import structlog
 import time
+import threading
 from twisted.internet import reactor
 from scapy.layers.l2 import Ether, Dot1Q
 from transitions import Machine
-from simplejson import dumps
 from google.protobuf.message import Message
-from google.protobuf.json_format import MessageToDict
+from simplejson import loads
 
 from voltha.protos.device_pb2 import Port
-from voltha.adapters.openolt.protos import openolt_pb2_grpc, openolt_pb2
+from voltha.adapters.openolt.protos import openolt_pb2
 from voltha.adapters.openolt.openolt_utils import OpenoltUtils
 from voltha.extensions.alarms.onu.onu_discovery_alarm import OnuDiscoveryAlarm
-from voltha.northbound.kafka.kafka_proxy import get_kafka_proxy
+from voltha.adapters.openolt.openolt_grpc import OpenoltGrpc
+from voltha.adapters.openolt.openolt_kafka_consumer import KConsumer
 
 
 class OpenoltDevice(object):
@@ -108,33 +108,22 @@
 
         self.device_info = None
 
+        self._grpc = None
         self.go_state_init()
 
     def do_state_init(self, event):
-        # Initialize gRPC
-        self.channel = grpc.insecure_channel(self.host_and_port)
-        self.channel_ready_future = grpc.channel_ready_future(self.channel)
-
-        self.log.info('openolt-device-created')
+        self.log.debug('init')
+        self.indications_thread_handle = threading.Thread(
+            target=self.indications_thread)
+        self.indications_thread_handle.setDaemon(True)
+        self.indications_thread_handle.start()
 
     def post_init(self, event):
         self.log.debug('post_init')
+        # Initialize gRPC
+        self._grpc = OpenoltGrpc(self.host_and_port, self)
 
-        # We have reached init state, starting the indications thread
-
-        # Catch RuntimeError exception
-        try:
-            # Start indications thread
-            self.indications_thread_handle = threading.Thread(
-                target=self.indications_thread)
-            # Old getter/setter API for daemon; use it directly as a
-            # property instead. The Jinkins error will happon on the reason of
-            # Exception in thread Thread-1 (most likely raised # during
-            # interpreter shutdown)
-            self.indications_thread_handle.setDaemon(True)
-            self.indications_thread_handle.start()
-        except Exception as e:
-            self.log.exception('post_init failed', e=e)
+        self.log.info('openolt-device-created')
 
     def do_state_connected(self, event):
         self.log.debug("do_state_connected")
@@ -150,7 +139,7 @@
                                                     self.host_and_port,
                                                     self.extra_args,
                                                     self.device_info)
-        self.flow_mgr = self.flow_mgr_class(self.log, self.stub,
+        self.flow_mgr = self.flow_mgr_class(self.log, self._grpc.stub,
                                             self.device_id,
                                             self.data_model.logical_device_id,
                                             self.platform, self.resource_mgr,
@@ -174,108 +163,18 @@
         self.flow_mgr.reset_flows()
 
     def indications_thread(self):
+        self.log.debug('openolt indications thread starting')
+        self.kafka_consumer = KConsumer(
+            "openolt.ind.alarm",
+            "openolt.ind.pkt",
+            "openolt.ind.olt")
+        self.log.debug('openolt indications thread processing')
+        self.kafka_consumer.read(self.indications_process)
+        self.log.debug('alarm indications thread exited')
 
-        def forward_indication(topic, msg):
-            try:
-                kafka_proxy = get_kafka_proxy()
-                if kafka_proxy and not kafka_proxy.is_faulty():
-                    self.log.debug('kafka-proxy-available')
-                    # convert to JSON string if msg is a protobuf msg
-                    if isinstance(msg, Message):
-                        msg = dumps(MessageToDict(msg, True, True))
-                    kafka_proxy.send_message(topic, dumps(msg))
-                else:
-                    self.log.error('kafka-proxy-unavailable')
-            except Exception, e:
-                self.log.exception('failed-sending-message', e=e)
-
-        self.log.debug('starting-indications-thread')
-        self.log.debug('connecting to olt')
-
-        self.stub = openolt_pb2_grpc.OpenoltStub(self.channel)
-
-        timeout = 60*60
-        delay = 1
-        exponential_back_off = False
-        while True:
-            try:
-                self.device_info = self.stub.GetDeviceInfo(openolt_pb2.Empty())
-                break
-            except Exception as e:
-                if delay > timeout:
-                    self.log.error("timed out connecting to olt")
-                    return
-                else:
-                    self.log.warn("retry connecting to olt in %ds: %s"
-                                  % (delay, repr(e)))
-                    time.sleep(delay)
-                    if exponential_back_off:
-                        delay += delay
-                    else:
-                        delay += 1
-
-        self.log.info('connected to olt', device_info=self.device_info)
-
-        self.go_state_connected()
-
-        self.indications = self.stub.EnableIndication(openolt_pb2.Empty())
-
-        while True:
-            try:
-                # get the next indication from olt
-                ind = next(self.indications)
-            except Exception as e:
-                self.log.warn('gRPC connection lost', error=e)
-                reactor.callFromThread(self.go_state_down)
-                reactor.callFromThread(self.go_state_init)
-                break
-            else:
-                self.log.debug("rx indication", indication=ind)
-
-                if self.admin_state is "down":
-                    if ind.HasField('intf_oper_ind') \
-                            and (ind.intf_oper_ind.type == "nni"):
-                        self.log.warn('olt is admin down, allow nni ind',
-                                      admin_state=self.admin_state,
-                                      indications=ind)
-                    else:
-                        self.log.warn('olt is admin down, ignore indication',
-                                      admin_state=self.admin_state,
-                                      indications=ind)
-                        continue
-
-
-                # indication handlers run in the main event loop
-                if ind.HasField('olt_ind'):
-                    reactor.callFromThread(self.olt_indication, ind.olt_ind)
-                elif ind.HasField('intf_ind'):
-                    reactor.callFromThread(self.intf_indication, ind.intf_ind)
-                elif ind.HasField('intf_oper_ind'):
-                    reactor.callFromThread(self.intf_oper_indication,
-                                           ind.intf_oper_ind)
-                elif ind.HasField('onu_disc_ind'):
-                    reactor.callFromThread(self.onu_discovery_indication,
-                                           ind.onu_disc_ind)
-                elif ind.HasField('onu_ind'):
-                    reactor.callFromThread(self.onu_indication, ind.onu_ind)
-                elif ind.HasField('omci_ind'):
-                    reactor.callFromThread(self.omci_indication, ind.omci_ind)
-                elif ind.HasField('pkt_ind'):
-                    reactor.callFromThread(self.packet_indication, ind.pkt_ind)
-                elif ind.HasField('port_stats'):
-                    reactor.callFromThread(
-                        self.stats_mgr.port_statistics_indication,
-                        ind.port_stats)
-                elif ind.HasField('flow_stats'):
-                    reactor.callFromThread(
-                        self.stats_mgr.flow_statistics_indication,
-                        ind.flow_stats)
-                elif ind.HasField('alarm_ind'):
-                    forward_indication("openolt.ind.alarm", ind)
-                    reactor.callFromThread(self.alarm_mgr.process_alarms,
-                                           ind.alarm_ind)
-                else:
-                    self.log.warn('unknown indication type')
+    def indications_process(self, msg):
+        ind = loads(msg)
+        self.log.debug("openolt indication", ind=ind)
 
     def olt_indication(self, olt_indication):
         if olt_indication.oper_state == "up":
@@ -420,7 +319,7 @@
                 port_no=egress_port,
                 pkt=send_pkt)
 
-            self.stub.OnuPacketOut(onu_pkt)
+            self._grpc.stub.OnuPacketOut(onu_pkt)
 
         elif egress_port_type == Port.ETHERNET_NNI:
             self.log.debug('sending-packet-to-uplink', egress_port=egress_port,
@@ -432,7 +331,7 @@
                 intf_id=self.platform.intf_id_from_nni_port_num(egress_port),
                 pkt=send_pkt)
 
-            self.stub.UplinkPacketOut(uplink_pkt)
+            self._grpc.stub.UplinkPacketOut(uplink_pkt)
 
         else:
             self.log.warn('Packet-out-to-this-interface-type-not-implemented',
@@ -442,7 +341,7 @@
     def send_proxied_message(self, proxy_address, msg):
         omci = openolt_pb2.OmciMsg(intf_id=proxy_address.channel_id,
                                    onu_id=proxy_address.onu_id, pkt=str(msg))
-        self.stub.OmciMsgOut(omci)
+        self._grpc.stub.OmciMsgOut(omci)
 
     def update_flow_table(self, flows):
         self.log.debug('No updates here now, all is done in logical flows '
@@ -464,7 +363,7 @@
 
         try:
             # Send grpc call
-            self.stub.DisableOlt(openolt_pb2.Empty())
+            self._grpc.stub.DisableOlt(openolt_pb2.Empty())
             self.admin_state = "down"
             self.log.info('openolt device disabled')
         except Exception as e:
@@ -491,7 +390,7 @@
         self.log.debug('reenabling-olt')
 
         try:
-            self.stub.ReenableOlt(openolt_pb2.Empty())
+            self._grpc.stub.ReenableOlt(openolt_pb2.Empty())
         except Exception as e:
             self.log.error('Failure to reenable openolt device', error=e)
         else:
@@ -506,7 +405,7 @@
         onu = openolt_pb2.Onu(intf_id=intf_id, onu_id=onu_id,
                               serial_number=serial_number)
         try:
-            self.stub.ActivateOnu(onu)
+            self._grpc.stub.ActivateOnu(onu)
         except grpc.RpcError as grpc_e:
             if grpc_e.code() == grpc.StatusCode.ALREADY_EXISTS:
                 self.log.info('onu activation in progress',
@@ -555,14 +454,14 @@
                 intf_id=child_device.proxy_address.channel_id,
                 onu_id=child_device.proxy_address.onu_id,
                 serial_number=serial_number)
-            self.stub.DeleteOnu(onu)
+            self._grpc.stub.DeleteOnu(onu)
         except Exception as e:
             self.log.exception("error-deleting-the-onu-on-olt-device", error=e)
 
     def reboot(self):
         self.log.debug('rebooting openolt device')
         try:
-            self.stub.Reboot(openolt_pb2.Empty())
+            self._grpc.stub.Reboot(openolt_pb2.Empty())
         except Exception as e:
             self.log.error('something went wrong with the reboot', error=e)
         else:
@@ -570,7 +469,7 @@
 
     def trigger_statistics_collection(self):
         try:
-            self.stub.CollectStatistics(openolt_pb2.Empty())
+            self._grpc.stub.CollectStatistics(openolt_pb2.Empty())
         except Exception as e:
             self.log.error('Error while triggering statistics collection',
                            error=e)
diff --git a/voltha/adapters/openolt/openolt_grpc.py b/voltha/adapters/openolt/openolt_grpc.py
new file mode 100644
index 0000000..436503b
--- /dev/null
+++ b/voltha/adapters/openolt/openolt_grpc.py
@@ -0,0 +1,169 @@
+#
+# Copyright 2019 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import structlog
+import grpc
+import threading
+from simplejson import dumps
+from twisted.internet import reactor
+from google.protobuf.json_format import MessageToDict
+from google.protobuf.message import Message
+from voltha.northbound.kafka.kafka_proxy import get_kafka_proxy
+from voltha.adapters.openolt.protos import openolt_pb2_grpc, openolt_pb2
+
+
+class OpenoltGrpc(object):
+    def __init__(self, host_and_port, device):
+        super(OpenoltGrpc, self).__init__()
+        self.log = structlog.get_logger()
+        self.log.debug('openolt grpc init')
+        self.device = device
+        self.host_and_port = host_and_port
+        self.channel = grpc.insecure_channel(self.host_and_port)
+        self.channel_ready_future = grpc.channel_ready_future(self.channel)
+
+        try:
+            # Start indications thread
+            self.log.debug('openolt grpc starting')
+            self.indications_thread_handle = threading.Thread(
+                target=self.indications_thread)
+            # Old getter/setter API for daemon; use it directly as a
+            # property instead. The Jinkins error will happon on the reason of
+            # Exception in thread Thread-1 (most likely raised # during
+            # interpreter shutdown)
+            self.indications_thread_handle.setDaemon(True)
+            self.indications_thread_handle.start()
+        except Exception as e:
+            self.log.exception('indication start failed', e=e)
+        else:
+            self.log.debug('openolt grpc started')
+
+    def indications_thread(self):
+
+        def forward_indication(topic, msg):
+            try:
+                self.log.debug('forward indication', topic=topic, msg=msg)
+                kafka_proxy = get_kafka_proxy()
+                if kafka_proxy and not kafka_proxy.is_faulty():
+                    self.log.debug('kafka-proxy-available')
+                    self.log.debug('shad 1', topic=topic, msg=msg)
+                    # convert to JSON string if msg is a protobuf msg
+                    if isinstance(msg, Message):
+                        self.log.debug('shad 2', topic=topic, msg=msg)
+                        msg = dumps(MessageToDict(msg, True, True))
+                    self.log.debug('shad 3', topic=topic, msg=msg)
+                    kafka_proxy.send_message(topic, dumps(msg))
+                else:
+                    self.log.error('kafka-proxy-unavailable')
+            except Exception, e:
+                self.log.exception('failed-sending-message', e=e)
+            self.log.debug('shad 4', topic=topic, msg=msg)
+
+        self.log.debug('openolt grpc connecting to olt')
+
+        self.stub = openolt_pb2_grpc.OpenoltStub(self.channel)
+
+        timeout = 60*60
+        delay = 1
+        exponential_back_off = False
+        while True:
+            try:
+                self.device.device_info \
+                    = self.stub.GetDeviceInfo(openolt_pb2.Empty())
+                break
+            except Exception as e:
+                if delay > timeout:
+                    self.log.error("openolt grpc timed out connecting to olt")
+                    return
+                else:
+                    self.log.warn(
+                        "openolt grpc retry connecting to olt in %ds: %s"
+                        % (delay, repr(e)))
+                    time.sleep(delay)
+                    if exponential_back_off:
+                        delay += delay
+                    else:
+                        delay += 1
+
+        self.log.info('openolt grpc connected to olt',
+                      device_info=self.device.device_info)
+
+        self.device.go_state_connected()
+
+        self.indications = self.stub.EnableIndication(openolt_pb2.Empty())
+
+        while True:
+            try:
+                # get the next indication from olt
+                ind = next(self.indications)
+            except Exception as e:
+                self.log.warn('openolt grpc connection lost', error=e)
+                reactor.callFromThread(self.device.go_state_down)
+                reactor.callFromThread(self.device.go_state_init)
+                break
+            else:
+                self.log.debug("openolt grpc rx indication", indication=ind)
+
+                if self.device.admin_state is "down":
+                    if ind.HasField('intf_oper_ind') \
+                            and (ind.intf_oper_ind.type == "nni"):
+                        self.log.warn('olt is admin down, allow nni ind',
+                                      admin_state=self.device.admin_state,
+                                      indications=ind)
+                    else:
+                        self.log.warn('olt is admin down, ignore indication',
+                                      admin_state=self.admin_state,
+                                      indications=ind)
+                        continue
+
+                # indication handlers run in the main event loop
+                if ind.HasField('olt_ind'):
+                    forward_indication("openolt.ind.olt", ind.olt_ind)
+                    reactor.callFromThread(
+                        self.device.olt_indication, ind.olt_ind)
+                elif ind.HasField('intf_ind'):
+                    reactor.callFromThread(
+                        self.device.intf_indication, ind.intf_ind)
+                elif ind.HasField('intf_oper_ind'):
+                    reactor.callFromThread(
+                        self.device.intf_oper_indication, ind.intf_oper_ind)
+                elif ind.HasField('onu_disc_ind'):
+                    reactor.callFromThread(
+                        self.device.onu_discovery_indication, ind.onu_disc_ind)
+                elif ind.HasField('onu_ind'):
+                    reactor.callFromThread(
+                        self.device.onu_indication, ind.onu_ind)
+                elif ind.HasField('omci_ind'):
+                    reactor.callFromThread(
+                        self.device.omci_indication, ind.omci_ind)
+                elif ind.HasField('pkt_ind'):
+                    forward_indication("openolt.ind.pkt", ind.pkt_ind)
+                    reactor.callFromThread(
+                        self.device.packet_indication, ind.pkt_ind)
+                elif ind.HasField('port_stats'):
+                    reactor.callFromThread(
+                        self.device.stats_mgr.port_statistics_indication,
+                        ind.port_stats)
+                elif ind.HasField('flow_stats'):
+                    reactor.callFromThread(
+                        self.device.stats_mgr.flow_statistics_indication,
+                        ind.flow_stats)
+                elif ind.HasField('alarm_ind'):
+                    forward_indication("openolt.ind.alarm", ind.alarm_ind)
+                    reactor.callFromThread(
+                        self.device.alarm_mgr.process_alarms, ind.alarm_ind)
+                else:
+                    self.log.warn('unknown indication type')