VOL-771 OLT heartbeat
Heartbeat thread that polls the OLT through the GRPC channel
The indications thread exits cleanly when the gRPC connection is lost and is restarted when connectivity is restored
ONU and OLT are put in the UNREACHABLE and UNKNOWN states, and logical ports are disabled
Use of a connection signature to tell whether the loss of connectivity was just a temporary network problem or whether the OLT rebooted / its drivers restarted

Change-Id: I49362afb6025489473b92441de73ad8cc09de237
diff --git a/voltha/adapters/openolt/openolt_device.py b/voltha/adapters/openolt/openolt_device.py
index ea8d96e..8620ae0 100644
--- a/voltha/adapters/openolt/openolt_device.py
+++ b/voltha/adapters/openolt/openolt_device.py
@@ -18,6 +18,7 @@
 import threading
 import grpc
 import collections
+import time
 
 from twisted.internet import reactor
 from scapy.layers.l2 import Ether, Dot1Q
@@ -42,6 +43,9 @@
 import openolt_platform as platform
 from openolt_flow_mgr import OpenOltFlowMgr
 
+MAX_HEARTBEAT_MISS = 3
+HEARTBEAT_PERIOD = 1
+GRPC_TIMEOUT = 5
 
 """
 OpenoltDevice represents an OLT.
@@ -83,40 +87,58 @@
         # Initialize gRPC
         self.channel = grpc.insecure_channel(self.host_and_port)
         self.channel_ready_future = grpc.channel_ready_future(self.channel)
+        self.channel_ready_future.result()  # blocks till gRPC connection is complete
+        self.stub = openolt_pb2_grpc.OpenoltStub(self.channel)
+
+        self.flow_mgr = OpenOltFlowMgr(self.log, self.stub)
 
         # Start indications thread
         self.indications_thread = threading.Thread(target=self.process_indications)
-        self.indications_thread.daemon = True
+        self.indications_thread.setDaemon(True)
+        self.indications_thread_active = True
         self.indications_thread.start()
 
+        # Start heartbeat thread
+        self.heartbeat_thread = threading.Thread(target=self.heartbeat)
+        self.heartbeat_thread.setDaemon(True)
+        self.heartbeat_thread_active = True
+        self.heartbeat_miss = 0
+        self.heartbeat_signature = None
+        self.heartbeat_thread.start()
+
     def process_indications(self):
 
-        self.channel_ready_future.result() # blocks till gRPC connection is complete
+        self.log.debug('starting-indications-thread')
 
-        self.stub = openolt_pb2_grpc.OpenoltStub(self.channel)
-        self.flow_mgr = OpenOltFlowMgr(self.log, self.stub)
         self.indications = self.stub.EnableIndication(openolt_pb2.Empty())
 
-        while True:
-            # get the next indication from olt
-            ind = next(self.indications)
-            self.log.debug("rx indication", indication=ind)
+        while self.indications_thread_active:
+            try:
+                # get the next indication from olt
+                ind = next(self.indications)
+            except Exception as e:
+                self.log.warn('GRPC-connection-lost-stoping-indications-thread', error=e)
+                self.indications_thread_active = False
+            else:
+                self.log.debug("rx indication", indication=ind)
 
-            # indication handlers run in the main event loop
-            if ind.HasField('olt_ind'):
-                reactor.callFromThread(self.olt_indication, ind.olt_ind)
-            elif ind.HasField('intf_ind'):
-                reactor.callFromThread(self.intf_indication, ind.intf_ind)
-            elif ind.HasField('intf_oper_ind'):
-                reactor.callFromThread(self.intf_oper_indication, ind.intf_oper_ind)
-            elif ind.HasField('onu_disc_ind'):
-                reactor.callFromThread(self.onu_discovery_indication, ind.onu_disc_ind)
-            elif ind.HasField('onu_ind'):
-                reactor.callFromThread(self.onu_indication, ind.onu_ind)
-            elif ind.HasField('omci_ind'):
-                reactor.callFromThread(self.omci_indication, ind.omci_ind)
-            elif ind.HasField('pkt_ind'):
-                reactor.callFromThread(self.packet_indication, ind.pkt_ind)
+                # indication handlers run in the main event loop
+                if ind.HasField('olt_ind'):
+                    reactor.callFromThread(self.olt_indication, ind.olt_ind)
+                elif ind.HasField('intf_ind'):
+                    reactor.callFromThread(self.intf_indication, ind.intf_ind)
+                elif ind.HasField('intf_oper_ind'):
+                    reactor.callFromThread(self.intf_oper_indication, ind.intf_oper_ind)
+                elif ind.HasField('onu_disc_ind'):
+                    reactor.callFromThread(self.onu_discovery_indication, ind.onu_disc_ind)
+                elif ind.HasField('onu_ind'):
+                    reactor.callFromThread(self.onu_indication, ind.onu_ind)
+                elif ind.HasField('omci_ind'):
+                    reactor.callFromThread(self.omci_indication, ind.omci_ind)
+                elif ind.HasField('pkt_ind'):
+                    reactor.callFromThread(self.packet_indication, ind.pkt_ind)
+
+        self.log.debug('stopping-indications-thread', device_id=self.device_id)
 
     def olt_indication(self, olt_indication):
         if olt_indication.oper_state == "up":
@@ -128,34 +150,63 @@
         olt_indication = event.kwargs.get('ind', None)
         self.log.debug("olt indication", olt_ind=olt_indication)
 
-        dpid = '00:00:' + self.ip_hex(self.host_and_port.split(":")[0])
+        device = self.adapter_agent.get_device(self.device_id)
 
-        # Create logical OF device
-        ld = LogicalDevice(
-            root_device_id=self.device_id,
-            switch_features=ofp_switch_features(
-                n_buffers=256,  # TODO fake for now
-                n_tables=2,  # TODO ditto
-                capabilities=(  # TODO and ditto
-                    OFPC_FLOW_STATS
-                    | OFPC_TABLE_STATS
-                    | OFPC_PORT_STATS
-                    | OFPC_GROUP_STATS
+        # If logical device does not exist create it
+        if len(device.parent_id) == 0:
+
+            dpid = '00:00:' + self.ip_hex(self.host_and_port.split(":")[0])
+
+            # Create logical OF device
+            ld = LogicalDevice(
+                root_device_id=self.device_id,
+                switch_features=ofp_switch_features(
+                    n_buffers=256,  # TODO fake for now
+                    n_tables=2,  # TODO ditto
+                    capabilities=(  # TODO and ditto
+                        OFPC_FLOW_STATS
+                        | OFPC_TABLE_STATS
+                        | OFPC_PORT_STATS
+                        | OFPC_GROUP_STATS
+                    )
                 )
             )
-        )
-        ld_initialized = self.adapter_agent.create_logical_device(ld, dpid=dpid)
-        self.logical_device_id = ld_initialized.id
+            ld_initialized = self.adapter_agent.create_logical_device(ld, dpid=dpid)
+            self.logical_device_id = ld_initialized.id
 
         # Update phys OF device
-        device = self.adapter_agent.get_device(self.device_id)
         device.parent_id = self.logical_device_id
         device.oper_status = OperStatus.ACTIVE
         self.adapter_agent.update_device(device)
 
     def olt_indication_down(self, event):
         olt_indication = event.kwargs.get('ind', None)
-        self.log.debug("olt indication", olt_ind=olt_indication)
+        new_admin_state = event.kwargs.get('admin_state', None)
+        new_oper_state = event.kwargs.get('oper_state', None)
+        new_connect_state = event.kwargs.get('connect_state', None)
+        self.log.debug("olt indication", olt_ind=olt_indication, admin_state=new_admin_state, oper_state=new_oper_state,
+                       connect_state=new_connect_state)
+
+        device = self.adapter_agent.get_device(self.device_id)
+        if new_admin_state is not  None:
+            device.admin_state = new_admin_state
+        if new_oper_state is not None:
+            device.oper_status = new_oper_state
+        if new_connect_state is not None:
+            device.connect_status = new_connect_state
+
+        self.adapter_agent.update_device(device)
+        #Propagating to the children
+        self.adapter_agent.update_child_devices_state(self.device_id, oper_status=new_oper_state,
+                                              connect_status=ConnectStatus.UNREACHABLE, admin_state=new_admin_state)
+
+        child_devices = self.adapter_agent.get_child_devices(self.device_id)
+        for onu_device in child_devices:
+            uni_no = platform.mk_uni_port_num(onu_device.proxy_address.channel_id, onu_device.proxy_address.onu_id)
+            uni_name = self.port_name(uni_no, Port.ETHERNET_UNI, serial_number=onu_device.serial_number)
+
+            self.onu_ports_down(onu_device, uni_no, uni_name, new_oper_state)
+
 
     def intf_indication(self, intf_indication):
         self.log.debug("intf indication", intf_id=intf_indication.intf_id,
@@ -166,7 +217,7 @@
         else:
             oper_status = OperStatus.DISCOVERED
 
-        # FIXME - If port exists, update oper state
+        # add_port update the port if it exists
         self.add_port(intf_indication.intf_id, Port.PON_OLT, oper_status)
 
     def intf_oper_indication(self, intf_oper_indication):
@@ -221,6 +272,10 @@
                 self.log.exception('onu-activation-failed', e=e)
 
         else:
+            if onu_device.connect_status != ConnectStatus.REACHABLE:
+                    onu_device.connect_status = ConnectStatus.REACHABLE
+                    self.adapter_agent.update_device(onu_device)
+
             onu_id = onu_device.proxy_address.onu_id
             if onu_device.oper_status == OperStatus.DISCOVERED or onu_device.oper_status == OperStatus.ACTIVATING:
                 self.log.debug("ignore onu discovery indication, the onu has been discovered and should be \
@@ -228,6 +283,15 @@
             elif onu_device.oper_status== OperStatus.ACTIVE:
                 self.log.warn("onu discovery indication whereas onu is supposed to be active",
                               intf_id=intf_id, onu_id=onu_id, state=onu_device.oper_status)
+            elif onu_device.oper_status == OperStatus.UNKNOWN:
+                self.log.info("onu-in-unknow-state-recovering-form-olt-reboot-activate-onu", intf_id=intf_id, onu_id=onu_id,
+                              serial_number=serial_number_str)
+
+                onu_device.oper_status = OperStatus.DISCOVERED
+                self.adapter_agent.update_device(onu_device)
+
+                onu = openolt_pb2.Onu(intf_id=intf_id, onu_id=onu_id, serial_number=serial_number)
+                self.stub.ActivateOnu(onu)
             else:
                 self.log.warn('unexpected state', onu_id=onu_id, onu_device_oper_state=onu_device.oper_status)
 
@@ -258,6 +322,10 @@
             self.log.warn('onu-device-is-none-invalid-message')
             return
 
+        if onu_device.connect_status != ConnectStatus.REACHABLE:
+            onu_device.connect_status = ConnectStatus.REACHABLE
+            self.adapter_agent.update_device(onu_device)
+
         if platform.intf_id_from_pon_port_no(onu_device.parent_port_no) != onu_indication.intf_id:
             self.log.warn('ONU-is-on-a-different-intf-id-now',
                           previous_intf_id=platform.intf_id_from_pon_port_no(onu_device.parent_port_no),
@@ -308,42 +376,14 @@
                 onu_device.oper_status = OperStatus.DISCOVERED
                 self.adapter_agent.update_device(onu_device)
             #Set port oper state to Discovered
-            #add port will update port if it exists
-            self.adapter_agent.add_port(
-                self.device_id,
-                Port(
-                    port_no=uni_no,
-                    label=uni_name,
-                    type=Port.ETHERNET_UNI,
-                    admin_state=onu_device.admin_state,
-                    oper_status=OperStatus.DISCOVERED))
 
-            #Disable logical port
-            openolt_device = self.adapter_agent.get_device(self.device_id)
-            onu_ports = self.proxy.get('devices/{}/ports'.format(onu_device.id))
-            onu_port_id = None
-            for onu_port in onu_ports:
-                if onu_port.port_no == uni_no:
-                    onu_port_id = onu_port.label
-            if onu_port_id is None:
-                self.log.error('matching-onu-port-label-not-found', onu_id=onu_device.id, olt_id=self.device_id,
-                              onu_ports=onu_ports)
-                return
-            try:
-                onu_logical_port = self.adapter_agent.get_logical_port(logical_device_id=openolt_device.parent_id,
-                                                                   port_id=onu_port_id)
-                onu_logical_port.ofp_port.state=OFPPS_LINK_DOWN
-                self.adapter_agent.update_logical_port(logical_device_id=openolt_device.parent_id, port= onu_logical_port)
-                self.log.debug('cascading-oper-state-to-port-and-logical-port')
-            except KeyError as e:
-                self.log.error('matching-onu-port-label-invalid', onu_id=onu_device.id, olt_id=self.device_id,
-                               onu_ports=onu_ports, onu_port_id=onu_port_id, error=e)
+            self.onu_ports_down(onu_device, uni_no, uni_name, OperStatus.DISCOVERED)
 
         elif onu_indication.oper_state == 'up':
 
             if onu_device.oper_status != OperStatus.DISCOVERED:
                 self.log.debug("ignore onu indication", intf_id=onu_indication.intf_id,
-                               onu_id=onu_indication.onu_id, state=onu_device.oper_state,
+                               onu_id=onu_indication.onu_id, state=onu_device.oper_status,
                                msg_oper_state=onu_indication.oper_state)
                 return
 
@@ -412,6 +452,39 @@
         else:
             self.log.warn('Not-implemented-or-invalid-value-of-oper-state', oper_state=onu_indication.oper_state)
 
+    def onu_ports_down(self, onu_device, uni_no, uni_name, oper_state):
+        # Set port oper state to Discovered
+        # add port will update port if it exists
+        self.adapter_agent.add_port(
+            self.device_id,
+            Port(
+                port_no=uni_no,
+                label=uni_name,
+                type=Port.ETHERNET_UNI,
+                admin_state=onu_device.admin_state,
+                oper_status=oper_state))
+
+        # Disable logical port
+        openolt_device = self.adapter_agent.get_device(self.device_id)
+        onu_ports = self.proxy.get('devices/{}/ports'.format(onu_device.id))
+        onu_port_id = None
+        for onu_port in onu_ports:
+            if onu_port.port_no == uni_no:
+                onu_port_id = onu_port.label
+        if onu_port_id is None:
+            self.log.error('matching-onu-port-label-not-found', onu_id=onu_device.id, olt_id=self.device_id,
+                           onu_ports=onu_ports)
+            return
+        try:
+            onu_logical_port = self.adapter_agent.get_logical_port(logical_device_id=openolt_device.parent_id,
+                                                                   port_id=onu_port_id)
+            onu_logical_port.ofp_port.state = OFPPS_LINK_DOWN
+            self.adapter_agent.update_logical_port(logical_device_id=openolt_device.parent_id, port=onu_logical_port)
+            self.log.debug('cascading-oper-state-to-port-and-logical-port')
+        except KeyError as e:
+            self.log.error('matching-onu-port-label-invalid', onu_id=onu_device.id, olt_id=self.device_id,
+                           onu_ports=onu_ports, onu_port_id=onu_port_id, error=e)
+
     def omci_indication(self, omci_indication):
 
         self.log.debug("omci indication", intf_id=omci_indication.intf_id,
@@ -437,6 +510,68 @@
                 logical_port_no=logical_port_num)
         self.adapter_agent.send_packet_in(packet=str(pkt), **kw)
 
+    def olt_reachable(self):
+        device = self.adapter_agent.get_device(self.device_id)
+        device.connect_status = ConnectStatus.REACHABLE
+        self.adapter_agent.update_device(device)
+        # Not changing its child devices state, we cannot guaranty that
+
+    def heartbeat(self):
+
+        while self.heartbeat_thread_active:
+
+            try:
+                heartbeat = self.stub.HeartbeatCheck(openolt_pb2.Empty(), timeout=GRPC_TIMEOUT)
+            except Exception as e:
+                self.heartbeat_miss += 1
+                self.log.warn('heartbeat-miss', missed_heartbeat=self.heartbeat_miss, error=e)
+                if self.heartbeat_miss == MAX_HEARTBEAT_MISS:
+                    self.log.error('lost-connectivity-to-olt')
+                    #TODO : send alarm/notify monitoring system
+                    # Using reactor to synchronize update
+                    # flagging it as unreachable and in unknow state
+                    reactor.callLater(0, self.olt_down, oper_state=OperStatus.UNKNOWN,
+                                      connect_state=ConnectStatus.UNREACHABLE)
+
+            else:
+                # heartbeat received
+                if self.heartbeat_signature is None:
+                    # Initialize heartbeat signature
+                    self.heartbeat_signature = heartbeat.heartbeat_signature
+                    self.log.debug('heartbeat-signature', device_id=self.device_id,
+                                   heartbeat_signature=self.heartbeat_signature)
+                # Check if signature is different
+                if self.heartbeat_signature != heartbeat.heartbeat_signature:
+                    # OLT has rebooted
+                    self.log.warn('OLT-was-rebooted', device_id=self.device_id)
+                    #TODO: notify monitoring system
+                    self.heartbeat_signature = heartbeat.heartbeat_signature
+
+                else:
+                    self.log.debug('valid-heartbeat-received')
+
+                if self.heartbeat_miss > MAX_HEARTBEAT_MISS:
+                    self.log.info('OLT-connection-restored')
+                    #TODO : suppress alarm/notify monitoring system
+                    # flagging it as reachable again
+                    reactor.callLater(0, self.olt_reachable)
+
+                if not self.indications_thread_active:
+                    self.log.info('restarting-indications-thread')
+                    # reset indications thread
+                    self.indications_thread = threading.Thread(target=self.process_indications)
+                    self.indications_thread.setDaemon(True)
+                    self.indications_thread_active = True
+                    self.indications_thread.start()
+
+                self.heartbeat_miss = 0
+
+            time.sleep(HEARTBEAT_PERIOD)
+
+        self.log.debug('stopping-heartbeat-thread', device_id=self.device_id)
+
+
+
     def packet_out(self, egress_port, msg):
         pkt = Ether(msg)
         self.log.info('packet out', egress_port=egress_port,
@@ -538,8 +673,6 @@
         onu_id = None
         onu_devices = self.adapter_agent.get_child_devices(self.device_id)
         for i in range(1, 512):
-            # key = OnuKey(intf_id=intf_id, onu_id=i)
-            # if key not in self.onus:
             id_not_taken = True
             for child_device in onu_devices:
                 if child_device.proxy_address.onu_id == i:
diff --git a/voltha/adapters/openolt/protos/openolt.proto b/voltha/adapters/openolt/protos/openolt.proto
index 0dc175b..a7915c8 100644
--- a/voltha/adapters/openolt/protos/openolt.proto
+++ b/voltha/adapters/openolt/protos/openolt.proto
@@ -46,6 +46,13 @@
         };
     }
 
+    rpc HeartbeatCheck(Empty) returns (Heartbeat) {
+        option (google.api.http) = {
+          post: "/v1/HeartbeatCheck"
+          body: "*"
+        };
+    }
+
     rpc EnableIndication(Empty) returns (stream Indication) {}
 }
 
@@ -102,6 +109,10 @@
     bytes pkt = 4;
 }
 
+message Heartbeat {
+    fixed32 heartbeat_signature = 1;
+}
+
 message Onu {
     fixed32 intf_id = 1;
     fixed32 onu_id = 2;