VOL-1080 Refactor OpenOLT adapter

Change-Id: If183d40b34983004cbf64ae7ebecfb78fa29e713
diff --git a/voltha/adapters/openolt/openolt_device.py b/voltha/adapters/openolt/openolt_device.py
index 57025be..4fb8057 100644
--- a/voltha/adapters/openolt/openolt_device.py
+++ b/voltha/adapters/openolt/openolt_device.py
@@ -14,15 +14,14 @@
 # limitations under the License.
 #
 
-import structlog
 import threading
+import binascii
 import grpc
-import time
 
+import structlog
 from twisted.internet import reactor
 from scapy.layers.l2 import Ether, Dot1Q
-import binascii
-from transitions import Machine
+from transitions import Machine, State
 
 from voltha.protos.device_pb2 import Port, Device
 from voltha.protos.common_pb2 import OperStatus, AdminState, ConnectStatus
@@ -40,29 +39,50 @@
 from voltha.protos.bbf_fiber_base_pb2 import VEnetConfig
 import voltha.core.flow_decomposer as fd
 
-import openolt_platform as platform
-from openolt_flow_mgr import OpenOltFlowMgr, DEFAULT_MGMT_VLAN
-from openolt_alarms import OpenOltAlarmMgr
 from voltha.adapters.openolt.openolt_statistics import OpenOltStatisticsMgr
-
-MAX_HEARTBEAT_MISS = 3
-HEARTBEAT_PERIOD = 1
-GRPC_TIMEOUT = 5
-
-"""
-OpenoltDevice represents an OLT.
-"""
+import voltha.adapters.openolt.openolt_platform as platform
+from voltha.adapters.openolt.openolt_flow_mgr import OpenOltFlowMgr, \
+        DEFAULT_MGMT_VLAN
+from voltha.adapters.openolt.openolt_alarms import OpenOltAlarmMgr
 
 
 class OpenoltDevice(object):
+    """
+    OpenoltDevice state machine:
 
-    states = ['up', 'down']
+        null ----> init ------> connected -----> up -----> down
+                   ^ ^             |             ^         | |
+                   | |             |             |         | |
+                   | +-------------+             +---------+ |
+                   |                                         |
+                   +-----------------------------------------+
+    """
+    # pylint: disable=too-many-instance-attributes
+    # pylint: disable=R0904
+    states = [
+        'state_null',
+        'state_init',
+        'state_connected',
+        'state_up',
+        'state_down']
+
     transitions = [
-            {'trigger': 'olt_up', 'source': 'down', 'dest': 'up',
-             'before': 'olt_indication_up'},
-            {'trigger': 'olt_down', 'source': 'up', 'dest': 'down',
-             'before': 'olt_indication_down'}
-    ]
+         {'trigger': 'go_state_init',
+          'source': ['state_null', 'state_connected', 'state_down'],
+          'dest': 'state_init',
+          'before': 'do_state_init'},
+         {'trigger': 'go_state_connected',
+          'source': 'state_init',
+          'dest': 'state_connected',
+          'before': 'do_state_connected'},
+         {'trigger': 'go_state_up',
+          'source': ['state_connected', 'state_down'],
+          'dest': 'state_up',
+          'before': 'do_state_up'},
+         {'trigger': 'go_state_down',
+          'source': ['state_up'],
+          'dest': 'state_down',
+          'before': 'do_state_down'}]
 
     def __init__(self, **kwargs):
         super(OpenoltDevice, self).__init__()
@@ -83,22 +103,12 @@
             # Update device
             device.root = True
             device.serial_number = self.host_and_port  # FIXME
-            device.connect_status = ConnectStatus.REACHABLE
+            device.connect_status = ConnectStatus.UNREACHABLE
             device.oper_status = OperStatus.ACTIVATING
             self.adapter_agent.update_device(device)
 
-        # Initialize the OLT state machine
-        self.machine = Machine(model=self, states=OpenoltDevice.states,
-                               transitions=OpenoltDevice.transitions,
-                               send_event=True, initial='down',
-                               ignore_invalid_triggers=True)
-        self.machine.add_transition(trigger='olt_ind_up', source='down',
-                                    dest='up')
-        self.machine.add_transition(trigger='olt_ind_loss', source='up',
-                                    dest='down')
-
         # If logical device does not exist create it
-        if len(device.parent_id) == 0:
+        if not device.parent_id:
 
             dpid = '00:00:' + self.ip_hex(self.host_and_port.split(":")[0])
 
@@ -126,47 +136,115 @@
                 self.adapter_agent.reconcile_logical_device(
                     self.logical_device_id)
 
+        # Initialize the OLT state machine
+        self.machine = Machine(model=self, states=OpenoltDevice.states,
+                               transitions=OpenoltDevice.transitions,
+                               send_event=True, initial='state_null')
+        self.go_state_init()
+
+    def do_state_init(self, event):
         # Initialize gRPC
         self.channel = grpc.insecure_channel(self.host_and_port)
         self.channel_ready_future = grpc.channel_ready_future(self.channel)
-        self.stub = openolt_pb2_grpc.OpenoltStub(self.channel)
 
+        # Start indications thread
+        self.indications_thread_handle = threading.Thread(
+            target=self.indications_thread)
+        self.indications_thread_handle.daemon = True
+        self.indications_thread_handle.start()
+
+        '''
+        # FIXME - Move to oper_up state without connecting to OLT?
+        if is_reconciliation:
+            # Put state machine in state up
+            reactor.callFromThread(self.go_state_up, reconciliation=True)
+        '''
+
+        self.log.debug('openolt-device-created', device_id=self.device_id)
+
+    def do_state_connected(self, event):
+        device = self.adapter_agent.get_device(self.device_id)
+        device.connect_status = ConnectStatus.REACHABLE
+        self.adapter_agent.update_device(device)
+
+        self.stub = openolt_pb2_grpc.OpenoltStub(self.channel)
         self.flow_mgr = OpenOltFlowMgr(self.log, self.stub, self.device_id)
         self.alarm_mgr = OpenOltAlarmMgr(self.log)
         self.stats_mgr = OpenOltStatisticsMgr(self, self.log)
 
-        # Indications thread plcaholder (started by heartbeat thread)
-        self.indications_thread = None
-        self.indications_thread_active = False
+    def do_state_up(self, event):
+        device = self.adapter_agent.get_device(self.device_id)
 
-        # Start heartbeat thread
-        self.heartbeat_thread = threading.Thread(target=self.heartbeat)
-        self.heartbeat_thread.setDaemon(True)
-        self.heartbeat_thread_active = True
-        self.heartbeat_miss = 0
-        self.heartbeat_signature = None
-        self.heartbeat_thread.start()
+        # Update phys OF device
+        device.parent_id = self.logical_device_id
+        device.oper_status = OperStatus.ACTIVE
+        self.adapter_agent.update_device(device)
 
-        if is_reconciliation:
-            # Put state machine in state up
-            reactor.callFromThread(self.olt_up, reconciliation=True)
+    def do_state_down(self, event):
+        self.log.debug("do_state_down")
+        oper_state = OperStatus.UNKNOWN
+        connect_state = ConnectStatus.UNREACHABLE
 
-        self.log.debug('openolt-device-created', device_id=self.device_id)
+        # Propagating to the children
 
-    def process_indications(self):
+        # Children ports
+        child_devices = self.adapter_agent.get_child_devices(self.device_id)
+        for onu_device in child_devices:
+            uni_no = platform.mk_uni_port_num(
+                onu_device.proxy_address.channel_id,
+                onu_device.proxy_address.onu_id)
+            uni_name = self.port_name(uni_no, Port.ETHERNET_UNI,
+                                      serial_number=onu_device.serial_number)
 
+            self.onu_ports_down(onu_device, uni_no, uni_name, oper_state)
+        # Children devices
+        self.adapter_agent.update_child_devices_state(
+            self.device_id, oper_status=oper_state,
+            connect_status=connect_state)
+        # Device Ports
+        device_ports = self.adapter_agent.get_ports(self.device_id,
+                                                    Port.ETHERNET_NNI)
+        logical_ports_ids = [port.label for port in device_ports]
+        device_ports += self.adapter_agent.get_ports(self.device_id,
+                                                     Port.PON_OLT)
+
+        for port in device_ports:
+            port.oper_status = oper_state
+            self.adapter_agent.add_port(self.device_id, port)
+
+        # Device logical port
+        for logical_port_id in logical_ports_ids:
+            logical_port = self.adapter_agent.get_logical_port(
+                self.logical_device_id, logical_port_id)
+            logical_port.ofp_port.state = OFPPS_LINK_DOWN
+            self.adapter_agent.update_logical_port(self.logical_device_id,
+                                                   logical_port)
+
+        # Device
+        device = self.adapter_agent.get_device(self.device_id)
+        device.oper_status = oper_state
+        device.connect_status = connect_state
+
+        self.adapter_agent.update_device(device)
+
+    def indications_thread(self):
         self.log.debug('starting-indications-thread')
+        self.log.debug('connecting to olt', device_id=self.device_id)
+        self.channel_ready_future.result()  # blocking call
+        self.log.debug('connected to olt', device_id=self.device_id)
+        self.go_state_connected()
 
         self.indications = self.stub.EnableIndication(openolt_pb2.Empty())
 
-        while self.indications_thread_active:
+        while True:
             try:
                 # get the next indication from olt
                 ind = next(self.indications)
             except Exception as e:
-                self.log.warn('GRPC-connection-lost-stoping-indication-thread',
-                              error=e)
-                self.indications_thread_active = False
+                self.log.warn('gRPC connection lost', error=e)
+                reactor.callFromThread(self.go_state_down)
+                reactor.callFromThread(self.go_state_init)
+                break
             else:
                 self.log.debug("rx indication", indication=ind)
 
@@ -189,97 +267,23 @@
                     reactor.callFromThread(self.packet_indication, ind.pkt_ind)
                 elif ind.HasField('port_stats'):
                     reactor.callFromThread(
-                        self.stats_mgr.port_statistics_indication,
-                        ind.port_stats)
+                            self.stats_mgr.port_statistics_indication,
+                            ind.port_stats)
                 elif ind.HasField('flow_stats'):
                     reactor.callFromThread(
-                        self.stats_mgr.flow_statistics_indication,
-                        ind.flow_stats)
+                            self.stats_mgr.flow_statistics_indication,
+                            ind.flow_stats)
                 elif ind.HasField('alarm_ind'):
                     reactor.callFromThread(self.alarm_mgr.process_alarms,
                                            ind.alarm_ind)
                 else:
                     self.log.warn('unknown indication type')
 
-        self.log.debug('stopping-indications-thread', device_id=self.device_id)
-
     def olt_indication(self, olt_indication):
         if olt_indication.oper_state == "up":
-            self.olt_up(ind=olt_indication)
+            self.go_state_up()
         elif olt_indication.oper_state == "down":
-            self.olt_down(ind=olt_indication)
-
-    def olt_indication_up(self, event):
-        olt_indication = event.kwargs.get('ind', None)
-        is_reconciliation = event.kwargs.get('reconciliation', False)
-        self.log.debug("olt indication", olt_ind=olt_indication,
-            reconciliation=is_reconciliation)
-
-        device = self.adapter_agent.get_device(self.device_id)
-
-        # Update phys OF device
-        device.parent_id = self.logical_device_id
-        device.oper_status = OperStatus.ACTIVE
-        self.adapter_agent.update_device(device)
-
-    def olt_indication_down(self, event):
-        olt_indication = event.kwargs.get('ind', None)
-        new_admin_state = event.kwargs.get('admin_state', None)
-        new_oper_state = event.kwargs.get('oper_state', None)
-        new_connect_state = event.kwargs.get('connect_state', None)
-        self.log.debug("olt indication", olt_ind=olt_indication,
-                       admin_state=new_admin_state, oper_state=new_oper_state,
-                       connect_state=new_connect_state)
-
-        # Propagating to the children
-
-        # Children ports
-        child_devices = self.adapter_agent.get_child_devices(self.device_id)
-        for onu_device in child_devices:
-            uni_no = platform.mk_uni_port_num(
-                onu_device.proxy_address.channel_id,
-                onu_device.proxy_address.onu_id)
-            uni_name = self.port_name(uni_no, Port.ETHERNET_UNI,
-                                      serial_number=onu_device.serial_number)
-
-            self.onu_ports_down(onu_device, uni_no, uni_name, new_oper_state)
-        # Children devices
-        self.adapter_agent.update_child_devices_state(
-            self.device_id, oper_status=new_oper_state,
-            connect_status=ConnectStatus.UNREACHABLE,
-            admin_state=new_admin_state)
-        # Device Ports
-        device_ports = self.adapter_agent.get_ports(self.device_id,
-                                                    Port.ETHERNET_NNI)
-        logical_ports_ids = [port.label for port in device_ports]
-        device_ports += self.adapter_agent.get_ports(self.device_id,
-                                                     Port.PON_OLT)
-
-        for port in device_ports:
-            if new_admin_state is not None:
-                port.admin_state = new_admin_state
-            if new_oper_state is not None:
-                port.oper_status = new_oper_state
-            self.adapter_agent.add_port(self.device_id, port)
-
-        # Device logical port
-        for logical_port_id in logical_ports_ids:
-            logical_port = self.adapter_agent.get_logical_port(
-                self.logical_device_id, logical_port_id)
-            logical_port.ofp_port.state = OFPPS_LINK_DOWN
-            self.adapter_agent.update_logical_port(self.logical_device_id,
-                                                   logical_port)
-
-        # Device
-        device = self.adapter_agent.get_device(self.device_id)
-        if new_admin_state is not None:
-            device.admin_state = new_admin_state
-        if new_oper_state is not None:
-            device.oper_status = new_oper_state
-        if new_connect_state is not None:
-            device.connect_status = new_connect_state
-
-        self.adapter_agent.update_device(device)
+            self.go_state_down()
 
     def intf_indication(self, intf_indication):
         self.log.debug("intf indication", intf_id=intf_indication.intf_id,
@@ -390,9 +394,9 @@
         try:
             serial_number_str = self.stringify_serial_number(
                 onu_indication.serial_number)
-        except:
+        except Exception as e:
             serial_number_str = None
-            
+
         if serial_number_str is not None:
             onu_device = self.adapter_agent.get_child_device(
                     self.device_id,
@@ -616,76 +620,114 @@
                   logical_port_no=logical_port_num)
         self.adapter_agent.send_packet_in(packet=str(pkt), **kw)
 
-    def olt_reachable(self):
-        device = self.adapter_agent.get_device(self.device_id)
-        device.connect_status = ConnectStatus.REACHABLE
-        self.adapter_agent.update_device(device)
-        # Not changing its child devices state, we cannot guaranty that
+    def port_statistics_indication(self, port_stats):
+        self.log.info('port-stats-collected', stats=port_stats)
+        self.ports_statistics_kpis(port_stats)
+        # FIXME : only the first uplink is a logical port
+        if port_stats.intf_id == 128:
+            # ONOS update
+            self.update_logical_port_stats(port_stats)
+        # FIXME: Discard other uplinks, they do not exist as an object
+        if port_stats.intf_id in [129, 130, 131]:
+            self.log.debug('those uplinks are not created')
+            return
+        # update port object stats
+        port = self.adapter_agent.get_port(self.device_id,
+                                           port_no=port_stats.intf_id)
 
-    def heartbeat(self):
+        if port is None:
+            self.log.warn('port associated with this stats does not exist')
+            return
 
-        # block till gRPC connection is complete
-        self.channel_ready_future.result()
+        port.rx_packets = port_stats.rx_packets
+        port.rx_bytes = port_stats.rx_bytes
+        port.rx_errors = port_stats.rx_error_packets
+        port.tx_packets = port_stats.tx_packets
+        port.tx_bytes = port_stats.tx_bytes
+        port.tx_errors = port_stats.tx_error_packets
 
-        while self.heartbeat_thread_active:
+        # Add port does an update if port exists
+        self.adapter_agent.add_port(self.device_id, port)
 
-            try:
-                heartbeat = self.stub.HeartbeatCheck(openolt_pb2.Empty(),
-                                                     timeout=GRPC_TIMEOUT)
-            except Exception as e:
-                self.heartbeat_miss += 1
-                self.log.warn('heartbeat-miss',
-                              missed_heartbeat=self.heartbeat_miss, error=e)
-                if self.heartbeat_miss == MAX_HEARTBEAT_MISS:
-                    self.log.error('lost-connectivity-to-olt')
-                    # TODO : send alarm/notify monitoring system
-                    # Using reactor to synchronize update
-                    # flagging it as unreachable and in unknow state
-                    reactor.callFromThread(
-                        self.olt_down,
-                        oper_state=OperStatus.UNKNOWN,
-                        connect_state=ConnectStatus.UNREACHABLE)
+    def flow_statistics_indication(self, flow_stats):
+        self.log.info('flow-stats-collected', stats=flow_stats)
+        # TODO: send to kafka ?
+        # UNTESTED : the openolt driver does not yet provide flow stats
+        self.adapter_agent.update_flow_stats(
+            self.logical_device_id,
+            flow_id=flow_stats.flow_id,
+            packet_count=flow_stats.tx_packets,
+            byte_count=flow_stats.tx_bytes)
 
-            else:
-                # heartbeat received
-                if self.heartbeat_signature is None:
-                    # Initialize heartbeat signature
-                    self.heartbeat_signature = heartbeat.heartbeat_signature
-                    self.log.debug(
-                        'heartbeat-signature',
-                        device_id=self.device_id,
-                        heartbeat_signature=self.heartbeat_signature)
-                # Check if signature is different
-                if self.heartbeat_signature != heartbeat.heartbeat_signature:
-                    # OLT has rebooted
-                    self.log.warn('OLT-was-rebooted', device_id=self.device_id)
-                    # TODO: notify monitoring system
-                    self.heartbeat_signature = heartbeat.heartbeat_signature
+    def ports_statistics_kpis(self, port_stats):
+        pm_data = {}
+        pm_data["rx_bytes"] = port_stats.rx_bytes
+        pm_data["rx_packets"] = port_stats.rx_packets
+        pm_data["rx_ucast_packets"] = port_stats.rx_ucast_packets
+        pm_data["rx_mcast_packets"] = port_stats.rx_mcast_packets
+        pm_data["rx_bcast_packets"] = port_stats.rx_bcast_packets
+        pm_data["rx_error_packets"] = port_stats.rx_error_packets
+        pm_data["tx_bytes"] = port_stats.tx_bytes
+        pm_data["tx_packets"] = port_stats.tx_packets
+        pm_data["tx_ucast_packets"] = port_stats.tx_ucast_packets
+        pm_data["tx_mcast_packets"] = port_stats.tx_mcast_packets
+        pm_data["tx_bcast_packets"] = port_stats.tx_bcast_packets
+        pm_data["tx_error_packets"] = port_stats.tx_error_packets
+        pm_data["rx_crc_errors"] = port_stats.rx_crc_errors
+        pm_data["bip_errors"] = port_stats.bip_errors
 
-                else:
-                    self.log.debug('valid-heartbeat-received')
+        prefix = 'voltha.openolt.{}'.format(self.device_id)
+        # FIXME
+        if port_stats.intf_id < 132:
+            prefixes = {
+                prefix + '{}.nni'.format(port_stats.intf_id): MetricValuePairs(
+                    metrics=pm_data)
+            }
+        else:
+            prefixes = {
+                prefix + '.pon.{}'.format(platform.intf_id_from_pon_port_no(
+                    port_stats.intf_id)): MetricValuePairs(
+                    metrics=pm_data)
+            }
 
-                if self.heartbeat_miss > MAX_HEARTBEAT_MISS:
-                    self.log.info('OLT-connection-restored')
-                    # TODO : suppress alarm/notify monitoring system
-                    # flagging it as reachable again
-                    reactor.callFromThread(self.olt_reachable)
+        kpi_event = KpiEvent(
+            type=KpiEventType.slice,
+            ts=port_stats.timestamp,
+            prefixes=prefixes)
+        self.adapter_agent.submit_kpis(kpi_event)
 
-                if not self.indications_thread_active:
-                    self.log.info('(re)starting-indications-thread')
-                    # reset indications thread
-                    self.indications_thread = threading.Thread(
-                        target=self.process_indications)
-                    self.indications_thread.setDaemon(True)
-                    self.indications_thread_active = True
-                    self.indications_thread.start()
+    def update_logical_port_stats(self, port_stats):
+        # FIXME
+        label = 'nni-{}'.format(port_stats.intf_id)
+        try:
+            logical_port = self.adapter_agent.get_logical_port(
+                self.logical_device_id, label)
+        except KeyError as e:
+            self.log.warn('logical port was not found, it may not have been '
+                          'created yet', exception=e)
+            logical_port = None
 
-                self.heartbeat_miss = 0
+        if logical_port is None:
+            self.log.error('logical-port-is-None',
+                           logical_device_id=self.logical_device_id,
+                           label=label,
+                           port_stats=port_stats)
+            return
 
-            time.sleep(HEARTBEAT_PERIOD)
+        self.log.debug('before', port=logical_port)
 
-        self.log.debug('stopping-heartbeat-thread', device_id=self.device_id)
+        logical_port.ofp_port_stats.rx_packets = port_stats.rx_packets
+        logical_port.ofp_port_stats.rx_bytes = port_stats.rx_bytes
+        logical_port.ofp_port_stats.tx_packets = port_stats.tx_packets
+        logical_port.ofp_port_stats.tx_bytes = port_stats.tx_bytes
+        logical_port.ofp_port_stats.rx_errors = port_stats.rx_error_packets
+        logical_port.ofp_port_stats.tx_errors = port_stats.tx_error_packets
+        logical_port.ofp_port_stats.rx_crc_err = port_stats.rx_crc_errors
 
+        self.log.debug('after', port=logical_port)
+
+        self.adapter_agent.update_logical_port(self.logical_device_id,
+                                               logical_port)
 
     def packet_out(self, egress_port, msg):
         pkt = Ether(msg)
@@ -911,17 +953,13 @@
         # but we are not doing that presently
 
         # Bring OLT down
-        self.olt_down(oper_state=OperStatus.UNKNOWN,
-                      admin_state=AdminState.DISABLED,
-                      connect_state=ConnectStatus.UNREACHABLE)
+        self.go_state_down()
 
     def delete(self):
         self.log.info('delete-olt', device_id=self.device_id)
 
         # Stop the grpc communication threads
         self.log.info('stopping-grpc-threads', device_id=self.device_id)
-        self.indications_thread_active = False
-        self.heartbeat_thread_active = False
 
         # Close the grpc channel
         # self.log.info('unsubscribing-grpc-channel', device_id=self.device_id)
@@ -933,7 +971,7 @@
         self.log.info('reenable-olt', device_id=self.device_id)
 
         # Bring up OLT
-        self.olt_up()
+        self.go_state_up()
 
         # Enable all child devices
         self.log.info('enabling-child-devices', device_id=self.device_id)