Add heartbeat between Maple OLT adapter and hardware.
Change-Id: I4453375b5c181318056bd0a233966fde9559e15c
diff --git a/voltha/adapters/maple_olt/maple_olt.py b/voltha/adapters/maple_olt/maple_olt.py
index 6d5183c..988e6f2 100644
--- a/voltha/adapters/maple_olt/maple_olt.py
+++ b/voltha/adapters/maple_olt/maple_olt.py
@@ -20,10 +20,10 @@
from uuid import uuid4
import arrow
-import grpc
import structlog
from scapy.layers.l2 import Ether, Dot1Q
from twisted.internet import reactor
+from twisted.internet.protocol import ReconnectingClientFactory
from twisted.spread import pb
from twisted.internet.defer import inlineCallbacks, returnValue, DeferredQueue
from zope.interface import implementer
@@ -237,27 +237,74 @@
handler.packet_out(egress_port_no, msg)
+class MaplePBClientFactory(pb.PBClientFactory, ReconnectingClientFactory):
+    """PB client factory that reconnects automatically and caches the root.
+
+    Combines pb.PBClientFactory with ReconnectingClientFactory so a lost or
+    failed connection is retried, and keeps the PB root object ("channel")
+    in self.channel for getChannel().  The cached reference is dropped as
+    soon as its connection goes away, so getChannel() fetches a fresh root
+    object after a reconnect instead of returning a dead reference.
+    """
+    channel = None  # cached PB root object; None while there is none
+    maxDelay = 60  # ReconnectingClientFactory: longest retry delay, seconds
+    initialDelay = 15  # ReconnectingClientFactory: first retry delay, seconds
+
+    def clientConnectionMade(self, broker):
+        """Register the new broker and reset the reconnect back-off delay."""
+        log.info('pb-client-connection-made')
+        pb.PBClientFactory.clientConnectionMade(self, broker)
+        ReconnectingClientFactory.resetDelay(self)
+
+    def clientConnectionLost(self, connector, reason, reconnecting=0):
+        """Forget the cached channel and schedule a reconnect attempt.
+
+        The 'reconnecting' argument is accepted for signature compatibility
+        only; this factory always reconnects.
+        """
+        log.info('pb-client-connection-lost')
+        # The cached root object belonged to the connection that just went
+        # away; keeping it would make getChannel() return a dead reference.
+        self.channel = None
+        # reconnecting=1: PBClientFactory keeps pending getRootObject()
+        # requests alive so the next connection can answer them.
+        pb.PBClientFactory.clientConnectionLost(self, connector, reason, reconnecting=1)
+        ReconnectingClientFactory.clientConnectionLost(self, connector, reason)
+        log.info('pb-client-connection-lost-retrying')
+
+    def clientConnectionFailed(self, connector, reason):
+        """Report the failed attempt to both base classes, which retries."""
+        log.info('pb-client-connection-failed')
+        pb.PBClientFactory.clientConnectionFailed(self, connector, reason)
+        ReconnectingClientFactory.clientConnectionFailed(self, connector, reason)
+        log.info('pb-client-connection-failed-retrying')
+
+    def disconnect(self, stopTrying=0):
+        """Close the current connection.
+
+        Arguments:
+            stopTrying: if true, also stop the automatic reconnect attempts
+        """
+        if stopTrying:
+            ReconnectingClientFactory.stopTrying(self)
+        pb.PBClientFactory.disconnect(self)
+
+    def channel_disconnected(self, channel):
+        """notifyOnDisconnect callback: drop the dead channel and disconnect."""
+        log.info('pb-channel-disconnected', channel=channel)
+        self.channel = None
+        self.disconnect()
+
+    @inlineCallbacks
+    def getChannel(self):
+        """Return the PB root object, fetching and caching it when needed.
+
+        Returns:
+            a Deferred that fires with the root object, or with None when
+            the root object could not be obtained (the error is logged)
+        """
+        if self.channel is None:
+            try:
+                self.channel = yield self.getRootObject()
+                self.channel.notifyOnDisconnect(self.channel_disconnected)
+            except Exception as e:
+                log.info('pb-client-failed-to-get-channel', exc=str(e))
+                self.channel = None
+        returnValue(self.channel)
+
+
class MapleOltHandler(object):
def __init__(self, adapter, device_id):
self.adapter = adapter
self.adapter_agent = adapter.adapter_agent
self.device_id = device_id
self.log = structlog.get_logger(device_id=device_id)
- self.channel = None
self.io_port = None
self.logical_device_id = None
self.interface = registry('main').get_args().interface
- self.pbc_factory = pb.PBClientFactory()
+ self.pbc_factory = MaplePBClientFactory()
self.pbc_port = 24498
self.tx_id = 0
self.rx_handler = MapleOltRxHandler(self.device_id, self.adapter)
+ self.heartbeat_count = 0
+ self.heartbeat_miss = 0
+ self.heartbeat_interval = 1
+ self.heartbeat_failed_limit = 3
+ self.command_timeout = 5
def __del__(self):
if self.io_port is not None:
registry('frameio').close_port(self.io_port)
def get_channel(self):
- return self.channel
+ return self.pbc_factory.getChannel()
def get_vlan_from_onu(self, onu):
vlan = onu + 1024
@@ -274,7 +321,7 @@
self.log.info('setting-remote-ip-port', ip=srv_ip, port=srv_port)
try:
- remote = self.get_channel()
+ remote = yield self.get_channel()
data = yield remote.callRemote('set_remote', srv_ip, srv_port)
self.log.info('set-remote', data=data, ip=srv_ip, port=srv_port)
except Exception as e:
@@ -288,7 +335,7 @@
ip_proto=ip_proto,
dst_port=dst_port)
try:
- remote = self.get_channel()
+ remote = yield self.get_channel()
data = yield remote.callRemote('config_classifier',
olt_no,
etype,
@@ -307,7 +354,7 @@
ip_proto=ip_proto,
dst_port=dst_port)
try:
- remote = self.get_channel()
+ remote = yield self.get_channel()
data = yield remote.callRemote('config_acflow',
olt_no,
onu_no,
@@ -323,7 +370,7 @@
def send_connect_olt(self, olt_no):
self.log.info('connecting-to-olt', olt=olt_no)
try:
- remote = self.get_channel()
+ remote = yield self.get_channel()
data = yield remote.callRemote('connect_olt', olt_no)
self.log.info('connected-to-olt', data=data)
except Exception as e:
@@ -333,7 +380,7 @@
def send_activate_olt(self, olt_no):
self.log.info('activating-olt', olt=olt_no)
try:
- remote = self.get_channel()
+ remote = yield self.get_channel()
data = yield remote.callRemote('activate_olt', olt_no)
self.log.info('activated-olt', data=data)
except Exception as e:
@@ -347,7 +394,7 @@
serial=serial_no,
vendor=vendor_no)
try:
- remote = self.get_channel()
+ remote = yield self.get_channel()
data = yield remote.callRemote('create_onu',
olt_no,
onu_no,
@@ -364,7 +411,7 @@
onu=onu_no,
alloc_id=alloc_id)
try:
- remote = self.get_channel()
+ remote = yield self.get_channel()
data = yield remote.callRemote('configure_alloc_id',
olt_no,
onu_no,
@@ -380,7 +427,7 @@
onu=onu_no,
unicast_gem_port=uni_gem)
try:
- remote = self.get_channel()
+ remote = yield self.get_channel()
data = yield remote.callRemote('configure_unicast_gem',
olt_no,
onu_no,
@@ -396,7 +443,7 @@
onu=onu_no,
multicast_gem_port=multi_gem)
try:
- remote = self.get_channel()
+ remote = yield self.get_channel()
data = yield remote.callRemote('configure_multicast_gem',
olt_no,
onu_no,
@@ -414,7 +461,7 @@
unicast_gem_port=uni_gem,
multicast_gem_port=multi_gem)
try:
- remote = self.get_channel()
+ remote = yield self.get_channel()
data = yield remote.callRemote('configure_onu',
olt_no,
onu_no,
@@ -429,111 +476,198 @@
def send_activate_onu(self, olt_no, onu_no):
self.log.info('activating-onu', olt=olt_no, onu=onu_no)
try:
- remote = self.get_channel()
+ remote = yield self.get_channel()
data = yield remote.callRemote('activate_onu', olt_no, onu_no)
self.log.info('activated-onu', data=data)
except Exception as e:
self.log.info('activate-onu-exception', exc=str(e))
- @inlineCallbacks
- def activate(self, device):
- self.log.info('activating')
- if not device.ipv4_address:
- device.oper_status = OperStatus.FAILED
- device.reason = 'No ipv4_address field provided'
- self.adapter_agent.update_device(device)
+    @inlineCallbacks
+    def heartbeat(self, device_id, state='run'):
+        """Send one heartbeat to the OLT hardware and schedule the next one.
+
+        Calls the PB remote method 'heartbeat' with the current sequence
+        number (self.heartbeat_count) and expects the same number back.
+        Anything else -- a different value, no channel, an exception, or no
+        answer within self.command_timeout seconds -- counts as a miss.
+        After self.heartbeat_failed_limit consecutive misses a device that
+        is still REACHABLE is set to UNREACHABLE/FAILED.
+        No further action from VOLTHA core is expected as result of a
+        heartbeat failure.  The heartbeat continues following a failure and
+        the first good reply after a miss sets the device back to
+        REACHABLE/ACTIVE.
+
+        Arguments:
+            device_id: adapter device id
+            state: desired state:
+                'start' - reset the counters, then behave like 'run'
+                'run'   - send one heartbeat and re-arm the timer
+                'stop'  - cancel the pending heartbeat and stop re-arming
+                          until the next 'start'
+        """
+
+        self.log.debug('olt-heartbeat', device=device_id, state=state, count=self.heartbeat_count)
+
+        def add_timeout(d, duration):
+            # Cancel the deferred if it has not fired after 'duration' seconds.
+            return reactor.callLater(duration, d.cancel)
+
+        def cancel_timeout(t):
+            if t.active():
+                t.cancel()
+                self.log.debug('olt-heartbeat-timeout-cancelled')
+
+        def cancel_pending():
+            # _heartbeat_call / _heartbeat_stopped are created lazily by this
+            # method, hence the getattr() defaults.
+            pending = getattr(self, '_heartbeat_call', None)
+            if pending is not None and pending.active():
+                pending.cancel()
+
+        if state == 'stop':
+            # Remember the request so that a heartbeat which is currently
+            # waiting for its reply does not re-arm the timer either.
+            self._heartbeat_stopped = True
+            cancel_pending()
             return
+        if state == 'start':
+            # Drop a timer left over from an earlier start so that only one
+            # heartbeat loop is ever scheduled.
+            cancel_pending()
+            self._heartbeat_stopped = False
+            self.heartbeat_count = 0
+            self.heartbeat_miss = 0
+
+        try:
+            d = self.get_channel()
+            timeout = add_timeout(d, self.command_timeout)
+            try:
+                remote = yield d
+            finally:
+                # also runs on failure so no stale timer is left behind
+                cancel_timeout(timeout)
+            if remote is None:
+                # getChannel() reports a failure by returning None
+                raise RuntimeError('no channel to OLT')
+
+            d = remote.callRemote('heartbeat', self.heartbeat_count)
+            timeout = add_timeout(d, self.command_timeout)
+            try:
+                data = yield d
+            finally:
+                cancel_timeout(timeout)
+        except Exception as e:
+            # -1 never equals heartbeat_count, which counts up from 0
+            data = -1
+            self.log.info('olt-heartbeat-exception', data=data, count=self.heartbeat_miss, exc=str(e))
+
+        if data != self.heartbeat_count:
+            # something is not right
+            self.heartbeat_miss += 1
+            self.log.info('olt-heartbeat-miss', data=data, count=self.heartbeat_count, miss=self.heartbeat_miss)
+        elif self.heartbeat_miss > 0:
+            # first good reply after one or more misses: mark device healthy
+            self.heartbeat_miss = 0
+            _device = self.adapter_agent.get_device(device_id)
+            _device.connect_status = ConnectStatus.REACHABLE
+            _device.oper_status = OperStatus.ACTIVE
+            _device.reason = ''
+            self.adapter_agent.update_device(_device)
+
+        _device = self.adapter_agent.get_device(device_id)
+        if (self.heartbeat_miss >= self.heartbeat_failed_limit) and (_device.connect_status == ConnectStatus.REACHABLE):
+            # report the failure once; later misses find it UNREACHABLE
+            self.log.info('olt-heartbeat-failed', data=data, count=self.heartbeat_miss)
+            _device.connect_status = ConnectStatus.UNREACHABLE
+            _device.oper_status = OperStatus.FAILED
+            _device.reason = 'Lost connectivity to OLT'
+            self.adapter_agent.update_device(_device)
+
+        self.heartbeat_count += 1
+        if not getattr(self, '_heartbeat_stopped', False):
+            cancel_pending()
+            self._heartbeat_call = reactor.callLater(self.heartbeat_interval, self.heartbeat, device_id)
+
+ @inlineCallbacks
+ def activate(self, device):
+ self.log.info('activating-olt', device=device)
+ if self.logical_device_id is None:
+ if not device.ipv4_address:
+ device.oper_status = OperStatus.FAILED
+ device.reason = 'No ipv4_address field provided'
+ self.adapter_agent.update_device(device)
+ return
+
+ device.root = True
+ device.vendor = 'Broadcom'
+ device.model = 'bcm68620'
+ device.serial_number = device.ipv4_address
+ self.adapter_agent.update_device(device)
+
+ nni_port = Port(
+ port_no=2,
+ label='NNI facing Ethernet port',
+ type=Port.ETHERNET_NNI,
+ admin_state=AdminState.ENABLED,
+ oper_status=OperStatus.ACTIVE
+ )
+ self.adapter_agent.add_port(device.id, nni_port)
+ self.adapter_agent.add_port(device.id, Port(
+ port_no=1,
+ label='PON port',
+ type=Port.PON_OLT,
+ admin_state=AdminState.ENABLED,
+ oper_status=OperStatus.ACTIVE
+ ))
+
+ ld = LogicalDevice(
+                # not setting id and datapath_id will let the adapter agent pick id
+ desc=ofp_desc(
+ mfr_desc='cord project',
+ hw_desc='n/a',
+ sw_desc='logical device for Maple-based PON',
+ serial_num=uuid4().hex,
+ dp_desc='n/a'
+ ),
+ switch_features=ofp_switch_features(
+ n_buffers=256, # TODO fake for now
+ n_tables=2, # TODO ditto
+ capabilities=( # TODO and ditto
+ OFPC_FLOW_STATS
+ | OFPC_TABLE_STATS
+ | OFPC_PORT_STATS
+ | OFPC_GROUP_STATS
+ )
+ ),
+ root_device_id=device.id
+ )
+ ld_initialized = self.adapter_agent.create_logical_device(ld)
+ cap = OFPPF_1GB_FD | OFPPF_FIBER
+ self.adapter_agent.add_logical_port(ld_initialized.id, LogicalPort(
+ id='nni',
+ ofp_port=ofp_port(
+ port_no=0, # is 0 OK?
+ hw_addr=mac_str_to_tuple('00:00:00:00:00:%02x' % 129),
+ name='nni',
+ config=0,
+ state=OFPPS_LIVE,
+ curr=cap,
+ advertised=cap,
+ peer=cap,
+ curr_speed=OFPPF_1GB_FD,
+ max_speed=OFPPF_1GB_FD
+ ),
+ device_id=device.id,
+ device_port_no=nni_port.port_no,
+ root_port=True
+ ))
+
+ device = self.adapter_agent.get_device(device.id)
+ device.parent_id = ld_initialized.id
+ device.connect_status = ConnectStatus.UNREACHABLE
+ device.oper_status = OperStatus.ACTIVATING
+ self.adapter_agent.update_device(device)
+ self.logical_device_id = ld_initialized.id
+
+ device = self.adapter_agent.get_device(device.id)
self.log.info('initiating-connection-to-olt',
device_id=device.id,
ipv4=device.ipv4_address,
port=self.pbc_port)
- reactor.connectTCP(device.ipv4_address, self.pbc_port, self.pbc_factory)
try:
- self.channel = yield self.pbc_factory.getRootObject()
- self.log.info('connected-to-olt',
- device_id=device.id,
- ipv4=device.ipv4_address,
- port=self.pbc_port)
+ reactor.connectTCP(device.ipv4_address, self.pbc_port, self.pbc_factory)
+ device.connect_status = ConnectStatus.REACHABLE
+ device.oper_status = OperStatus.ACTIVE
+ device.reason = ''
+ self.adapter_agent.update_device(device)
except Exception as e:
self.log.info('get-channel-exception', exc=str(e))
+ device = self.adapter_agent.get_device(device.id)
+ device.oper_status = OperStatus.FAILED
+ device.reason = 'Failed to connect to OLT'
+ self.adapter_agent.update_device(device)
+ self.pbc_factory.stopTrying()
+ reactor.callLater(5, self.activate, device)
+ return
+
+ device = self.adapter_agent.get_device(device.id)
+ self.log.info('connected-to-olt',
+ device_id=device.id,
+ ipv4=device.ipv4_address,
+ port=self.pbc_port)
+
+ reactor.callLater(0, self.heartbeat, device.id, state='start')
yield self.send_set_remote()
yield self.send_connect_olt(0)
yield self.send_activate_olt(0)
-
- device.root = True
- device.vendor = 'Broadcom'
- device.model = 'bcm68620'
- device.serial_number = device.ipv4_address
- device.connect_status = ConnectStatus.REACHABLE
- self.adapter_agent.update_device(device)
-
- nni_port = Port(
- port_no=2,
- label='NNI facing Ethernet port',
- type=Port.ETHERNET_NNI,
- admin_state=AdminState.ENABLED,
- oper_status=OperStatus.ACTIVE
- )
- self.adapter_agent.add_port(device.id, nni_port)
- self.adapter_agent.add_port(device.id, Port(
- port_no=1,
- label='PON port',
- type=Port.PON_OLT,
- admin_state=AdminState.ENABLED,
- oper_status=OperStatus.ACTIVE
- ))
-
- ld = LogicalDevice(
- # not setting id and datapth_id will let the adapter agent pick id
- desc=ofp_desc(
- mfr_desc='cord project',
- hw_desc='n/a',
- sw_desc='logical device for Maple-based PON',
- serial_num=uuid4().hex,
- dp_desc='n/a'
- ),
- switch_features=ofp_switch_features(
- n_buffers=256, # TODO fake for now
- n_tables=2, # TODO ditto
- capabilities=( # TODO and ditto
- OFPC_FLOW_STATS
- | OFPC_TABLE_STATS
- | OFPC_PORT_STATS
- | OFPC_GROUP_STATS
- )
- ),
- root_device_id=device.id
- )
- ld_initialized = self.adapter_agent.create_logical_device(ld)
- cap = OFPPF_1GB_FD | OFPPF_FIBER
- self.adapter_agent.add_logical_port(ld_initialized.id, LogicalPort(
- id='nni',
- ofp_port=ofp_port(
- port_no=0, # is 0 OK?
- hw_addr=mac_str_to_tuple('00:00:00:00:00:%02x' % 129),
- name='nni',
- config=0,
- state=OFPPS_LIVE,
- curr=cap,
- advertised=cap,
- peer=cap,
- curr_speed=OFPPF_1GB_FD,
- max_speed=OFPPF_1GB_FD
- ),
- device_id=device.id,
- device_port_no=nni_port.port_no,
- root_port=True
- ))
-
- device = self.adapter_agent.get_device(device.id)
- device.parent_id = ld_initialized.id
- device.oper_status = OperStatus.ACTIVE
- self.adapter_agent.update_device(device)
- self.logical_device_id = ld_initialized.id
-
# register ONUS per uni port until done asynchronously
for onu_no in [1]:
vlan_id = self.get_vlan_from_onu(onu_no)
@@ -558,6 +692,7 @@
self.log.info('registering-frameio')
self.io_port = registry('frameio').open_port(
self.interface, self.rcv_io, is_inband_frame)
+ self.log.info('olt-activated', device=device)
def rcv_io(self, port, frame):
self.log.info('received', iface_name=port.iface_name,
@@ -728,7 +863,7 @@
msg=msg)
try:
- remote = self.get_channel()
+ remote = yield self.get_channel()
yield remote.callRemote("send_omci",
0,
0,
@@ -757,7 +892,7 @@
def send_configure_stats_collection_interval(self, olt_no, interval):
self.log.info('configuring-stats-collect-interval', olt=olt_no, interval=interval)
try:
- remote = self.get_channel()
+ remote = yield self.get_channel()
data = yield remote.callRemote('set_stats_collection_interval', interval)
self.log.info('configured-stats-collect-interval', data=data)
except Exception as e: