Add heartbeat between Maple OLT adapter and hardware.

Change-Id: I4453375b5c181318056bd0a233966fde9559e15c
diff --git a/voltha/adapters/maple_olt/maple_olt.py b/voltha/adapters/maple_olt/maple_olt.py
index 6d5183c..988e6f2 100644
--- a/voltha/adapters/maple_olt/maple_olt.py
+++ b/voltha/adapters/maple_olt/maple_olt.py
@@ -20,10 +20,10 @@
 from uuid import uuid4
 
 import arrow
-import grpc
 import structlog
 from scapy.layers.l2 import Ether, Dot1Q
 from twisted.internet import reactor
+from twisted.internet.protocol import ReconnectingClientFactory
 from twisted.spread import pb
 from twisted.internet.defer import inlineCallbacks, returnValue, DeferredQueue
 from zope.interface import implementer
@@ -237,27 +237,105 @@
         handler.packet_out(egress_port_no, msg)
 
 
+class MaplePBClientFactory(pb.PBClientFactory, ReconnectingClientFactory):
+    """Perspective Broker client factory that reconnects automatically.
+
+    Mixes pb.PBClientFactory with ReconnectingClientFactory and caches the
+    remote root object (the "channel") handed out by getChannel().  The
+    cache is cleared whenever the connection is closed or lost, so the next
+    getChannel() call waits for the root object of the new connection
+    instead of returning a reference tied to the dead one.
+    """
+    channel = None  # cached remote root object; None until one is obtained
+    maxDelay = 60  # ReconnectingClientFactory: max seconds between retries
+    initialDelay = 15  # ReconnectingClientFactory: seconds before first retry
+
+    def clientConnectionMade(self, broker):
+        """Hand the new broker to the PB factory and reset the retry delay."""
+        log.info('pb-client-connection-made')
+        pb.PBClientFactory.clientConnectionMade(self, broker)
+        ReconnectingClientFactory.resetDelay(self)
+
+    def clientConnectionLost(self, connector, reason, reconnecting=0):
+        """Forget the cached channel and let the factory retry.
+
+        `reconnecting` is accepted for signature compatibility only; the PB
+        factory is always called with reconnecting=1, as Twisted asks of
+        reconnecting subclasses.
+        """
+        log.info('pb-client-connection-lost')
+        # The cached reference belongs to the connection that just died.
+        self.channel = None
+        pb.PBClientFactory.clientConnectionLost(self, connector, reason,
+                                                reconnecting=1)
+        ReconnectingClientFactory.clientConnectionLost(self, connector, reason)
+        log.info('pb-client-connection-lost-retrying')
+
+    def clientConnectionFailed(self, connector, reason):
+        """Notify both base classes of a failed connection attempt."""
+        log.info('pb-client-connection-failed')
+        pb.PBClientFactory.clientConnectionFailed(self, connector, reason)
+        ReconnectingClientFactory.clientConnectionFailed(self, connector, reason)
+        log.info('pb-client-connection-failed-retrying')
+
+    def disconnect(self, stopTrying=0):
+        """Close the PB connection and drop the cached channel.
+
+        Arguments:
+        stopTrying: when true, also stop further reconnect attempts
+        """
+        if stopTrying:
+            ReconnectingClientFactory.stopTrying(self)
+        self.channel = None
+        pb.PBClientFactory.disconnect(self)
+
+    def channel_disconnected(self, channel):
+        """Disconnect callback registered on the channel by getChannel()."""
+        log.info('pb-channel-disconnected', channel=channel)
+        self.disconnect()
+
+    @inlineCallbacks
+    def getChannel(self):
+        """Return (via Deferred) the remote root object, or None on failure.
+
+        The root object is fetched on first use and cached until the
+        connection goes away.  Failures are logged, not raised.
+        """
+        if self.channel is None:
+            try:
+                self.channel = yield self.getRootObject()
+                self.channel.notifyOnDisconnect(self.channel_disconnected)
+            except Exception as e:
+                log.info('pb-client-failed-to-get-channel', exc=str(e))
+                self.channel = None
+        returnValue(self.channel)
+
+
 class MapleOltHandler(object):
     def __init__(self, adapter, device_id):
         self.adapter = adapter
         self.adapter_agent = adapter.adapter_agent
         self.device_id = device_id
         self.log = structlog.get_logger(device_id=device_id)
-        self.channel = None
         self.io_port = None
         self.logical_device_id = None
         self.interface = registry('main').get_args().interface
-        self.pbc_factory = pb.PBClientFactory()
+        self.pbc_factory = MaplePBClientFactory()  # reconnecting PB client factory; also owns the cached channel
         self.pbc_port = 24498
         self.tx_id = 0
         self.rx_handler = MapleOltRxHandler(self.device_id, self.adapter)
+        self.heartbeat_count = 0  # sequence number sent with each heartbeat call
+        self.heartbeat_miss = 0  # consecutive heartbeats that failed or came back mismatched
+        self.heartbeat_interval = 1  # seconds between heartbeats (passed to reactor.callLater)
+        self.heartbeat_failed_limit = 3  # misses after which the device is marked FAILED/UNREACHABLE
+        self.command_timeout = 5  # seconds allowed for each step of a heartbeat before it is cancelled
 
     def __del__(self):
         if self.io_port is not None:
             registry('frameio').close_port(self.io_port)
 
     def get_channel(self):
-        return self.channel
+        return self.pbc_factory.getChannel()  # returns a Deferred (getChannel is inlineCallbacks); callers must yield it
 
     def get_vlan_from_onu(self, onu):
         vlan = onu + 1024
@@ -274,7 +321,7 @@
         self.log.info('setting-remote-ip-port', ip=srv_ip, port=srv_port)
 
         try:
-            remote = self.get_channel()
+            remote = yield self.get_channel()
             data = yield remote.callRemote('set_remote', srv_ip, srv_port)
             self.log.info('set-remote', data=data, ip=srv_ip, port=srv_port)
         except Exception as e:
@@ -288,7 +335,7 @@
                       ip_proto=ip_proto,
                       dst_port=dst_port)
         try:
-            remote = self.get_channel()
+            remote = yield self.get_channel()
             data = yield remote.callRemote('config_classifier',
                                            olt_no,
                                            etype,
@@ -307,7 +354,7 @@
                       ip_proto=ip_proto,
                       dst_port=dst_port)
         try:
-            remote = self.get_channel()
+            remote = yield self.get_channel()
             data = yield remote.callRemote('config_acflow',
                                            olt_no,
                                            onu_no,
@@ -323,7 +370,7 @@
     def send_connect_olt(self, olt_no):
         self.log.info('connecting-to-olt', olt=olt_no)
         try:
-            remote = self.get_channel()
+            remote = yield self.get_channel()
             data = yield remote.callRemote('connect_olt', olt_no)
             self.log.info('connected-to-olt', data=data)
         except Exception as e:
@@ -333,7 +380,7 @@
     def send_activate_olt(self, olt_no):
         self.log.info('activating-olt', olt=olt_no)
         try:
-            remote = self.get_channel()
+            remote = yield self.get_channel()
             data = yield remote.callRemote('activate_olt', olt_no)
             self.log.info('activated-olt', data=data)
         except Exception as e:
@@ -347,7 +394,7 @@
                       serial=serial_no,
                       vendor=vendor_no)
         try:
-            remote = self.get_channel()
+            remote = yield self.get_channel()
             data = yield remote.callRemote('create_onu',
                                            olt_no,
                                            onu_no,
@@ -364,7 +411,7 @@
                       onu=onu_no,
                       alloc_id=alloc_id)
         try:
-            remote = self.get_channel()
+            remote = yield self.get_channel()
             data = yield remote.callRemote('configure_alloc_id',
                                            olt_no,
                                            onu_no,
@@ -380,7 +427,7 @@
                       onu=onu_no,
                       unicast_gem_port=uni_gem)
         try:
-            remote = self.get_channel()
+            remote = yield self.get_channel()
             data = yield remote.callRemote('configure_unicast_gem',
                                            olt_no,
                                            onu_no,
@@ -396,7 +443,7 @@
                       onu=onu_no,
                       multicast_gem_port=multi_gem)
         try:
-            remote = self.get_channel()
+            remote = yield self.get_channel()
             data = yield remote.callRemote('configure_multicast_gem',
                                            olt_no,
                                            onu_no,
@@ -414,7 +461,7 @@
                       unicast_gem_port=uni_gem,
                       multicast_gem_port=multi_gem)
         try:
-            remote = self.get_channel()
+            remote = yield self.get_channel()
             data = yield remote.callRemote('configure_onu',
                                            olt_no,
                                            onu_no,
@@ -429,111 +476,198 @@
     def send_activate_onu(self, olt_no, onu_no):
         self.log.info('activating-onu', olt=olt_no, onu=onu_no)
         try:
-            remote = self.get_channel()
+            remote = yield self.get_channel()
             data = yield remote.callRemote('activate_onu', olt_no, onu_no)
             self.log.info('activated-onu', data=data)
         except Exception as e:
             self.log.info('activate-onu-exception', exc=str(e))
 
-    @inlineCallbacks
-    def activate(self, device):
-        self.log.info('activating')
 
-        if not device.ipv4_address:
-            device.oper_status = OperStatus.FAILED
-            device.reason = 'No ipv4_address field provided'
-            self.adapter_agent.update_device(device)
+    @inlineCallbacks
+    def heartbeat(self, device_id, state='run'):
+        """Heartbeat OLT hardware
+
+        Call PB remote method 'heartbeat' to verify connectivity to OLT hardware.
+        After self.heartbeat_failed_limit consecutive misses the device is set to
+        FAILED/UNREACHABLE.
+        No further action from VOLTHA core is expected as result of heartbeat failure.
+        Heartbeat continues following failure; on the next successful heartbeat the
+        device is set back to ACTIVE/REACHABLE.
+
+        Arguments:
+        device_id: adapter device id
+        state: 'start' resets the counters, 'stop' returns at once, 'run' (default) proceeds
+        """
+
+        self.log.debug('olt-heartbeat', device=device_id, state=state, count=self.heartbeat_count)
+
+        def add_timeout(d, duration):
+            return reactor.callLater(duration, d.cancel)  # cancel the Deferred if it has not fired in time
+
+        def cancel_timeout(t):
+            if t.active():  # only a timer that has not yet fired or been cancelled
+                t.cancel()
+                self.log.debug('olt-heartbeat-timeout-cancelled')
+
+        if state == 'stop':
             return
 
+        if state == 'start':
+            self.heartbeat_count = 0
+            self.heartbeat_miss = 0
+
+        try:
+            d = self.get_channel()
+            timeout = add_timeout(d, self.command_timeout)
+            remote = yield d
+            cancel_timeout(timeout)
+
+            d = remote.callRemote('heartbeat', self.heartbeat_count)
+            timeout = add_timeout(d, self.command_timeout)
+            data = yield d
+            cancel_timeout(timeout)
+        except Exception as e:
+            data = -1  # never equals a heartbeat count, so the check below records a miss
+            self.log.info('olt-heartbeat-exception', data=data, count=self.heartbeat_miss, exc=str(e))
+
+        if data != self.heartbeat_count:  # the reply is compared against the count that was sent
+            # something is not right
+            self.heartbeat_miss += 1
+            self.log.info('olt-heartbeat-miss', data=data, count=self.heartbeat_count, miss=self.heartbeat_miss)
+        else:
+            if self.heartbeat_miss > 0:  # first good heartbeat after one or more misses
+                self.heartbeat_miss = 0
+                _device = self.adapter_agent.get_device(device_id)
+                _device.connect_status = ConnectStatus.REACHABLE
+                _device.oper_status = OperStatus.ACTIVE
+                _device.reason = ''
+                self.adapter_agent.update_device(_device)
+
+        _device = self.adapter_agent.get_device(device_id)
+        if (self.heartbeat_miss >= self.heartbeat_failed_limit) and (_device.connect_status == ConnectStatus.REACHABLE):  # flip only while still REACHABLE
+            self.log.info('olt-heartbeat-failed', data=data, count=self.heartbeat_miss)
+            _device = self.adapter_agent.get_device(device_id)
+            _device.connect_status = ConnectStatus.UNREACHABLE
+            _device.oper_status = OperStatus.FAILED
+            _device.reason = 'Lost connectivity to OLT'
+            self.adapter_agent.update_device(_device)
+
+        self.heartbeat_count += 1  # next sequence number
+        reactor.callLater(self.heartbeat_interval, self.heartbeat, device_id)  # re-arm; state defaults to 'run'
+
+    @inlineCallbacks
+    def activate(self, device):
+        self.log.info('activating-olt', device=device)
+        if self.logical_device_id is None:
+            if not device.ipv4_address:
+                device.oper_status = OperStatus.FAILED
+                device.reason = 'No ipv4_address field provided'
+                self.adapter_agent.update_device(device)
+                return
+
+            device.root = True
+            device.vendor = 'Broadcom'
+            device.model = 'bcm68620'
+            device.serial_number = device.ipv4_address
+            self.adapter_agent.update_device(device)
+
+            nni_port = Port(
+                port_no=2,
+                label='NNI facing Ethernet port',
+                type=Port.ETHERNET_NNI,
+                admin_state=AdminState.ENABLED,
+                oper_status=OperStatus.ACTIVE
+            )
+            self.adapter_agent.add_port(device.id, nni_port)
+            self.adapter_agent.add_port(device.id, Port(
+                port_no=1,
+                label='PON port',
+                type=Port.PON_OLT,
+                admin_state=AdminState.ENABLED,
+                oper_status=OperStatus.ACTIVE
+            ))
+
+            ld = LogicalDevice(
+                # not setting id and datapath_id will let the adapter agent pick id
+                desc=ofp_desc(
+                    mfr_desc='cord project',
+                    hw_desc='n/a',
+                    sw_desc='logical device for Maple-based PON',
+                    serial_num=uuid4().hex,
+                    dp_desc='n/a'
+                ),
+                switch_features=ofp_switch_features(
+                    n_buffers=256,  # TODO fake for now
+                    n_tables=2,  # TODO ditto
+                    capabilities=(  # TODO and ditto
+                        OFPC_FLOW_STATS
+                        | OFPC_TABLE_STATS
+                        | OFPC_PORT_STATS
+                        | OFPC_GROUP_STATS
+                    )
+                ),
+                root_device_id=device.id
+            )
+            ld_initialized = self.adapter_agent.create_logical_device(ld)
+            cap = OFPPF_1GB_FD | OFPPF_FIBER
+            self.adapter_agent.add_logical_port(ld_initialized.id, LogicalPort(
+                id='nni',
+                ofp_port=ofp_port(
+                    port_no=0,  # is 0 OK?
+                    hw_addr=mac_str_to_tuple('00:00:00:00:00:%02x' % 129),
+                    name='nni',
+                    config=0,
+                    state=OFPPS_LIVE,
+                    curr=cap,
+                    advertised=cap,
+                    peer=cap,
+                    curr_speed=OFPPF_1GB_FD,
+                    max_speed=OFPPF_1GB_FD
+                ),
+                device_id=device.id,
+                device_port_no=nni_port.port_no,
+                root_port=True
+            ))
+
+            device = self.adapter_agent.get_device(device.id)
+            device.parent_id = ld_initialized.id
+            device.connect_status = ConnectStatus.UNREACHABLE
+            device.oper_status = OperStatus.ACTIVATING
+            self.adapter_agent.update_device(device)
+            self.logical_device_id = ld_initialized.id
+
+        device = self.adapter_agent.get_device(device.id)
         self.log.info('initiating-connection-to-olt',
                       device_id=device.id,
                       ipv4=device.ipv4_address,
                       port=self.pbc_port)
-        reactor.connectTCP(device.ipv4_address, self.pbc_port, self.pbc_factory)
         try:
-            self.channel = yield self.pbc_factory.getRootObject()
-            self.log.info('connected-to-olt',
-                          device_id=device.id,
-                          ipv4=device.ipv4_address,
-                          port=self.pbc_port)
+            reactor.connectTCP(device.ipv4_address, self.pbc_port, self.pbc_factory)
+            device.connect_status = ConnectStatus.REACHABLE
+            device.oper_status = OperStatus.ACTIVE
+            device.reason = ''
+            self.adapter_agent.update_device(device)
         except Exception as e:
             self.log.info('get-channel-exception', exc=str(e))
+            device = self.adapter_agent.get_device(device.id)
+            device.oper_status = OperStatus.FAILED
+            device.reason = 'Failed to connect to OLT'
+            self.adapter_agent.update_device(device)
+            self.pbc_factory.stopTrying()
+            reactor.callLater(5, self.activate, device)
+            return
+
+        device = self.adapter_agent.get_device(device.id)
+        self.log.info('connected-to-olt',
+                       device_id=device.id,
+                       ipv4=device.ipv4_address,
+                       port=self.pbc_port)
+
+        reactor.callLater(0, self.heartbeat, device.id, state='start')
 
         yield self.send_set_remote()
         yield self.send_connect_olt(0)
         yield self.send_activate_olt(0)
-
-        device.root = True
-        device.vendor = 'Broadcom'
-        device.model = 'bcm68620'
-        device.serial_number = device.ipv4_address
-        device.connect_status = ConnectStatus.REACHABLE
-        self.adapter_agent.update_device(device)
-
-        nni_port = Port(
-            port_no=2,
-            label='NNI facing Ethernet port',
-            type=Port.ETHERNET_NNI,
-            admin_state=AdminState.ENABLED,
-            oper_status=OperStatus.ACTIVE
-        )
-        self.adapter_agent.add_port(device.id, nni_port)
-        self.adapter_agent.add_port(device.id, Port(
-            port_no=1,
-            label='PON port',
-            type=Port.PON_OLT,
-            admin_state=AdminState.ENABLED,
-            oper_status=OperStatus.ACTIVE
-        ))
-
-        ld = LogicalDevice(
-            # not setting id and datapth_id will let the adapter agent pick id
-            desc=ofp_desc(
-                mfr_desc='cord project',
-                hw_desc='n/a',
-                sw_desc='logical device for Maple-based PON',
-                serial_num=uuid4().hex,
-                dp_desc='n/a'
-            ),
-            switch_features=ofp_switch_features(
-                n_buffers=256,  # TODO fake for now
-                n_tables=2,  # TODO ditto
-                capabilities=(  # TODO and ditto
-                    OFPC_FLOW_STATS
-                    | OFPC_TABLE_STATS
-                    | OFPC_PORT_STATS
-                    | OFPC_GROUP_STATS
-                )
-            ),
-            root_device_id=device.id
-        )
-        ld_initialized = self.adapter_agent.create_logical_device(ld)
-        cap = OFPPF_1GB_FD | OFPPF_FIBER
-        self.adapter_agent.add_logical_port(ld_initialized.id, LogicalPort(
-            id='nni',
-            ofp_port=ofp_port(
-                port_no=0,  # is 0 OK?
-                hw_addr=mac_str_to_tuple('00:00:00:00:00:%02x' % 129),
-                name='nni',
-                config=0,
-                state=OFPPS_LIVE,
-                curr=cap,
-                advertised=cap,
-                peer=cap,
-                curr_speed=OFPPF_1GB_FD,
-                max_speed=OFPPF_1GB_FD
-            ),
-            device_id=device.id,
-            device_port_no=nni_port.port_no,
-            root_port=True
-        ))
-
-        device = self.adapter_agent.get_device(device.id)
-        device.parent_id = ld_initialized.id
-        device.oper_status = OperStatus.ACTIVE
-        self.adapter_agent.update_device(device)
-        self.logical_device_id = ld_initialized.id
-
         # register ONUS per uni port until done asynchronously
         for onu_no in [1]:
             vlan_id = self.get_vlan_from_onu(onu_no)
@@ -558,6 +692,7 @@
         self.log.info('registering-frameio')
         self.io_port = registry('frameio').open_port(
             self.interface, self.rcv_io, is_inband_frame)
+        self.log.info('olt-activated', device=device)
 
     def rcv_io(self, port, frame):
         self.log.info('received', iface_name=port.iface_name,
@@ -728,7 +863,7 @@
                       msg=msg)
 
         try:
-            remote = self.get_channel()
+            remote = yield self.get_channel()
             yield remote.callRemote("send_omci",
                                     0,
                                     0,
@@ -757,7 +892,7 @@
     def send_configure_stats_collection_interval(self, olt_no, interval):
         self.log.info('configuring-stats-collect-interval', olt=olt_no, interval=interval)
         try:
-            remote = self.get_channel()
+            remote = yield self.get_channel()
             data = yield remote.callRemote('set_stats_collection_interval', interval)
             self.log.info('configured-stats-collect-interval', data=data)
         except Exception as e: