VOL-1080 Refactor OpenOLT adapter
Change-Id: If183d40b34983004cbf64ae7ebecfb78fa29e713
diff --git a/voltha/adapters/openolt/openolt_device.py b/voltha/adapters/openolt/openolt_device.py
index 57025be..4fb8057 100644
--- a/voltha/adapters/openolt/openolt_device.py
+++ b/voltha/adapters/openolt/openolt_device.py
@@ -14,15 +14,14 @@
# limitations under the License.
#
-import structlog
import threading
+import binascii
import grpc
-import time
+import structlog
from twisted.internet import reactor
from scapy.layers.l2 import Ether, Dot1Q
-import binascii
-from transitions import Machine
+from transitions import Machine, State
from voltha.protos.device_pb2 import Port, Device
from voltha.protos.common_pb2 import OperStatus, AdminState, ConnectStatus
@@ -40,29 +39,50 @@
from voltha.protos.bbf_fiber_base_pb2 import VEnetConfig
import voltha.core.flow_decomposer as fd
-import openolt_platform as platform
-from openolt_flow_mgr import OpenOltFlowMgr, DEFAULT_MGMT_VLAN
-from openolt_alarms import OpenOltAlarmMgr
from voltha.adapters.openolt.openolt_statistics import OpenOltStatisticsMgr
-
-MAX_HEARTBEAT_MISS = 3
-HEARTBEAT_PERIOD = 1
-GRPC_TIMEOUT = 5
-
-"""
-OpenoltDevice represents an OLT.
-"""
+import voltha.adapters.openolt.openolt_platform as platform
+from voltha.adapters.openolt.openolt_flow_mgr import OpenOltFlowMgr, \
+ DEFAULT_MGMT_VLAN
+from voltha.adapters.openolt.openolt_alarms import OpenOltAlarmMgr
class OpenoltDevice(object):
+ """
+ OpenoltDevice state machine:
- states = ['up', 'down']
+ null ----> init ------> connected -----> up -----> down
+ ^ ^ | ^ | |
+ | | | | | |
+ | +-------------+ +---------+ |
+ | |
+ +-----------------------------------------+
+ """
+ # pylint: disable=too-many-instance-attributes
+ # pylint: disable=R0904
+ states = [
+ 'state_null',
+ 'state_init',
+ 'state_connected',
+ 'state_up',
+ 'state_down']
+
transitions = [
- {'trigger': 'olt_up', 'source': 'down', 'dest': 'up',
- 'before': 'olt_indication_up'},
- {'trigger': 'olt_down', 'source': 'up', 'dest': 'down',
- 'before': 'olt_indication_down'}
- ]
+ {'trigger': 'go_state_init',
+ 'source': ['state_null', 'state_connected', 'state_down'],
+ 'dest': 'state_init',
+ 'before': 'do_state_init'},
+ {'trigger': 'go_state_connected',
+ 'source': 'state_init',
+ 'dest': 'state_connected',
+ 'before': 'do_state_connected'},
+ {'trigger': 'go_state_up',
+ 'source': ['state_connected', 'state_down'],
+ 'dest': 'state_up',
+ 'before': 'do_state_up'},
+ {'trigger': 'go_state_down',
+ 'source': ['state_up'],
+ 'dest': 'state_down',
+ 'before': 'do_state_down'}]
def __init__(self, **kwargs):
super(OpenoltDevice, self).__init__()
@@ -83,22 +103,12 @@
# Update device
device.root = True
device.serial_number = self.host_and_port # FIXME
- device.connect_status = ConnectStatus.REACHABLE
+ device.connect_status = ConnectStatus.UNREACHABLE
device.oper_status = OperStatus.ACTIVATING
self.adapter_agent.update_device(device)
- # Initialize the OLT state machine
- self.machine = Machine(model=self, states=OpenoltDevice.states,
- transitions=OpenoltDevice.transitions,
- send_event=True, initial='down',
- ignore_invalid_triggers=True)
- self.machine.add_transition(trigger='olt_ind_up', source='down',
- dest='up')
- self.machine.add_transition(trigger='olt_ind_loss', source='up',
- dest='down')
-
# If logical device does not exist create it
- if len(device.parent_id) == 0:
+ if not device.parent_id:
dpid = '00:00:' + self.ip_hex(self.host_and_port.split(":")[0])
@@ -126,47 +136,115 @@
self.adapter_agent.reconcile_logical_device(
self.logical_device_id)
+ # Initialize the OLT state machine
+ self.machine = Machine(model=self, states=OpenoltDevice.states,
+ transitions=OpenoltDevice.transitions,
+ send_event=True, initial='state_null')
+ self.go_state_init()
+
+ def do_state_init(self, event):
# Initialize gRPC
self.channel = grpc.insecure_channel(self.host_and_port)
self.channel_ready_future = grpc.channel_ready_future(self.channel)
- self.stub = openolt_pb2_grpc.OpenoltStub(self.channel)
+ # Start indications thread
+ self.indications_thread_handle = threading.Thread(
+ target=self.indications_thread)
+ self.indications_thread_handle.daemon = True
+ self.indications_thread_handle.start()
+
+ '''
+ # FIXME - Move to oper_up state without connecting to OLT?
+ if is_reconciliation:
+ # Put state machine in state up
+ reactor.callFromThread(self.go_state_up, reconciliation=True)
+ '''
+
+ self.log.debug('openolt-device-created', device_id=self.device_id)
+
+ def do_state_connected(self, event):
+ device = self.adapter_agent.get_device(self.device_id)
+ device.connect_status = ConnectStatus.REACHABLE
+ self.adapter_agent.update_device(device)
+
+ self.stub = openolt_pb2_grpc.OpenoltStub(self.channel)
self.flow_mgr = OpenOltFlowMgr(self.log, self.stub, self.device_id)
self.alarm_mgr = OpenOltAlarmMgr(self.log)
self.stats_mgr = OpenOltStatisticsMgr(self, self.log)
- # Indications thread plcaholder (started by heartbeat thread)
- self.indications_thread = None
- self.indications_thread_active = False
+ def do_state_up(self, event):
+ device = self.adapter_agent.get_device(self.device_id)
- # Start heartbeat thread
- self.heartbeat_thread = threading.Thread(target=self.heartbeat)
- self.heartbeat_thread.setDaemon(True)
- self.heartbeat_thread_active = True
- self.heartbeat_miss = 0
- self.heartbeat_signature = None
- self.heartbeat_thread.start()
+ # Update phys OF device
+ device.parent_id = self.logical_device_id
+ device.oper_status = OperStatus.ACTIVE
+ self.adapter_agent.update_device(device)
- if is_reconciliation:
- # Put state machine in state up
- reactor.callFromThread(self.olt_up, reconciliation=True)
+ def do_state_down(self, event):
+ self.log.debug("do_state_down")
+ oper_state = OperStatus.UNKNOWN
+ connect_state = ConnectStatus.UNREACHABLE
- self.log.debug('openolt-device-created', device_id=self.device_id)
+ # Propagating to the children
- def process_indications(self):
+ # Children ports
+ child_devices = self.adapter_agent.get_child_devices(self.device_id)
+ for onu_device in child_devices:
+ uni_no = platform.mk_uni_port_num(
+ onu_device.proxy_address.channel_id,
+ onu_device.proxy_address.onu_id)
+ uni_name = self.port_name(uni_no, Port.ETHERNET_UNI,
+ serial_number=onu_device.serial_number)
+ self.onu_ports_down(onu_device, uni_no, uni_name, oper_state)
+ # Children devices
+ self.adapter_agent.update_child_devices_state(
+ self.device_id, oper_status=oper_state,
+ connect_status=connect_state)
+ # Device Ports
+ device_ports = self.adapter_agent.get_ports(self.device_id,
+ Port.ETHERNET_NNI)
+ logical_ports_ids = [port.label for port in device_ports]
+ device_ports += self.adapter_agent.get_ports(self.device_id,
+ Port.PON_OLT)
+
+ for port in device_ports:
+ port.oper_status = oper_state
+ self.adapter_agent.add_port(self.device_id, port)
+
+ # Device logical port
+ for logical_port_id in logical_ports_ids:
+ logical_port = self.adapter_agent.get_logical_port(
+ self.logical_device_id, logical_port_id)
+ logical_port.ofp_port.state = OFPPS_LINK_DOWN
+ self.adapter_agent.update_logical_port(self.logical_device_id,
+ logical_port)
+
+ # Device
+ device = self.adapter_agent.get_device(self.device_id)
+ device.oper_status = oper_state
+ device.connect_status = connect_state
+
+ self.adapter_agent.update_device(device)
+
+ def indications_thread(self):
self.log.debug('starting-indications-thread')
+ self.log.debug('connecting to olt', device_id=self.device_id)
+ self.channel_ready_future.result() # blocking call
+ self.log.debug('connected to olt', device_id=self.device_id)
+ self.go_state_connected()
self.indications = self.stub.EnableIndication(openolt_pb2.Empty())
- while self.indications_thread_active:
+ while True:
try:
# get the next indication from olt
ind = next(self.indications)
except Exception as e:
- self.log.warn('GRPC-connection-lost-stoping-indication-thread',
- error=e)
- self.indications_thread_active = False
+ self.log.warn('gRPC connection lost', error=e)
+ reactor.callFromThread(self.go_state_down)
+ reactor.callFromThread(self.go_state_init)
+ break
else:
self.log.debug("rx indication", indication=ind)
@@ -189,97 +267,23 @@
reactor.callFromThread(self.packet_indication, ind.pkt_ind)
elif ind.HasField('port_stats'):
reactor.callFromThread(
- self.stats_mgr.port_statistics_indication,
- ind.port_stats)
+ self.stats_mgr.port_statistics_indication,
+ ind.port_stats)
elif ind.HasField('flow_stats'):
reactor.callFromThread(
- self.stats_mgr.flow_statistics_indication,
- ind.flow_stats)
+ self.stats_mgr.flow_statistics_indication,
+ ind.flow_stats)
elif ind.HasField('alarm_ind'):
reactor.callFromThread(self.alarm_mgr.process_alarms,
ind.alarm_ind)
else:
self.log.warn('unknown indication type')
- self.log.debug('stopping-indications-thread', device_id=self.device_id)
-
def olt_indication(self, olt_indication):
if olt_indication.oper_state == "up":
- self.olt_up(ind=olt_indication)
+ self.go_state_up()
elif olt_indication.oper_state == "down":
- self.olt_down(ind=olt_indication)
-
- def olt_indication_up(self, event):
- olt_indication = event.kwargs.get('ind', None)
- is_reconciliation = event.kwargs.get('reconciliation', False)
- self.log.debug("olt indication", olt_ind=olt_indication,
- reconciliation=is_reconciliation)
-
- device = self.adapter_agent.get_device(self.device_id)
-
- # Update phys OF device
- device.parent_id = self.logical_device_id
- device.oper_status = OperStatus.ACTIVE
- self.adapter_agent.update_device(device)
-
- def olt_indication_down(self, event):
- olt_indication = event.kwargs.get('ind', None)
- new_admin_state = event.kwargs.get('admin_state', None)
- new_oper_state = event.kwargs.get('oper_state', None)
- new_connect_state = event.kwargs.get('connect_state', None)
- self.log.debug("olt indication", olt_ind=olt_indication,
- admin_state=new_admin_state, oper_state=new_oper_state,
- connect_state=new_connect_state)
-
- # Propagating to the children
-
- # Children ports
- child_devices = self.adapter_agent.get_child_devices(self.device_id)
- for onu_device in child_devices:
- uni_no = platform.mk_uni_port_num(
- onu_device.proxy_address.channel_id,
- onu_device.proxy_address.onu_id)
- uni_name = self.port_name(uni_no, Port.ETHERNET_UNI,
- serial_number=onu_device.serial_number)
-
- self.onu_ports_down(onu_device, uni_no, uni_name, new_oper_state)
- # Children devices
- self.adapter_agent.update_child_devices_state(
- self.device_id, oper_status=new_oper_state,
- connect_status=ConnectStatus.UNREACHABLE,
- admin_state=new_admin_state)
- # Device Ports
- device_ports = self.adapter_agent.get_ports(self.device_id,
- Port.ETHERNET_NNI)
- logical_ports_ids = [port.label for port in device_ports]
- device_ports += self.adapter_agent.get_ports(self.device_id,
- Port.PON_OLT)
-
- for port in device_ports:
- if new_admin_state is not None:
- port.admin_state = new_admin_state
- if new_oper_state is not None:
- port.oper_status = new_oper_state
- self.adapter_agent.add_port(self.device_id, port)
-
- # Device logical port
- for logical_port_id in logical_ports_ids:
- logical_port = self.adapter_agent.get_logical_port(
- self.logical_device_id, logical_port_id)
- logical_port.ofp_port.state = OFPPS_LINK_DOWN
- self.adapter_agent.update_logical_port(self.logical_device_id,
- logical_port)
-
- # Device
- device = self.adapter_agent.get_device(self.device_id)
- if new_admin_state is not None:
- device.admin_state = new_admin_state
- if new_oper_state is not None:
- device.oper_status = new_oper_state
- if new_connect_state is not None:
- device.connect_status = new_connect_state
-
- self.adapter_agent.update_device(device)
+ self.go_state_down()
def intf_indication(self, intf_indication):
self.log.debug("intf indication", intf_id=intf_indication.intf_id,
@@ -390,9 +394,9 @@
try:
serial_number_str = self.stringify_serial_number(
onu_indication.serial_number)
- except:
+ except Exception as e:
serial_number_str = None
-
+
if serial_number_str is not None:
onu_device = self.adapter_agent.get_child_device(
self.device_id,
@@ -616,76 +620,114 @@
logical_port_no=logical_port_num)
self.adapter_agent.send_packet_in(packet=str(pkt), **kw)
- def olt_reachable(self):
- device = self.adapter_agent.get_device(self.device_id)
- device.connect_status = ConnectStatus.REACHABLE
- self.adapter_agent.update_device(device)
- # Not changing its child devices state, we cannot guaranty that
+ def port_statistics_indication(self, port_stats):
+ self.log.info('port-stats-collected', stats=port_stats)
+ self.ports_statistics_kpis(port_stats)
+ # FIXME : only the first uplink is a logical port
+ if port_stats.intf_id == 128:
+ # ONOS update
+ self.update_logical_port_stats(port_stats)
+ # FIXME: Discard other uplinks, they do not exist as an object
+ if port_stats.intf_id in [129, 130, 131]:
+ self.log.debug('those uplinks are not created')
+ return
+ # update port object stats
+ port = self.adapter_agent.get_port(self.device_id,
+ port_no=port_stats.intf_id)
- def heartbeat(self):
+ if port is None:
+ self.log.warn('port associated with this stats does not exist')
+ return
- # block till gRPC connection is complete
- self.channel_ready_future.result()
+ port.rx_packets = port_stats.rx_packets
+ port.rx_bytes = port_stats.rx_bytes
+ port.rx_errors = port_stats.rx_error_packets
+ port.tx_packets = port_stats.tx_packets
+ port.tx_bytes = port_stats.tx_bytes
+ port.tx_errors = port_stats.tx_error_packets
- while self.heartbeat_thread_active:
+ # Add port does an update if port exists
+ self.adapter_agent.add_port(self.device_id, port)
- try:
- heartbeat = self.stub.HeartbeatCheck(openolt_pb2.Empty(),
- timeout=GRPC_TIMEOUT)
- except Exception as e:
- self.heartbeat_miss += 1
- self.log.warn('heartbeat-miss',
- missed_heartbeat=self.heartbeat_miss, error=e)
- if self.heartbeat_miss == MAX_HEARTBEAT_MISS:
- self.log.error('lost-connectivity-to-olt')
- # TODO : send alarm/notify monitoring system
- # Using reactor to synchronize update
- # flagging it as unreachable and in unknow state
- reactor.callFromThread(
- self.olt_down,
- oper_state=OperStatus.UNKNOWN,
- connect_state=ConnectStatus.UNREACHABLE)
+ def flow_statistics_indication(self, flow_stats):
+ self.log.info('flow-stats-collected', stats=flow_stats)
+ # TODO: send to kafka ?
+ # UNTESTED : the openolt driver does not yet provide flow stats
+ self.adapter_agent.update_flow_stats(
+ self.logical_device_id,
+ flow_id=flow_stats.flow_id,
+ packet_count=flow_stats.tx_packets,
+ byte_count=flow_stats.tx_bytes)
- else:
- # heartbeat received
- if self.heartbeat_signature is None:
- # Initialize heartbeat signature
- self.heartbeat_signature = heartbeat.heartbeat_signature
- self.log.debug(
- 'heartbeat-signature',
- device_id=self.device_id,
- heartbeat_signature=self.heartbeat_signature)
- # Check if signature is different
- if self.heartbeat_signature != heartbeat.heartbeat_signature:
- # OLT has rebooted
- self.log.warn('OLT-was-rebooted', device_id=self.device_id)
- # TODO: notify monitoring system
- self.heartbeat_signature = heartbeat.heartbeat_signature
+ def ports_statistics_kpis(self, port_stats):
+ pm_data = {}
+ pm_data["rx_bytes"] = port_stats.rx_bytes
+ pm_data["rx_packets"] = port_stats.rx_packets
+ pm_data["rx_ucast_packets"] = port_stats.rx_ucast_packets
+ pm_data["rx_mcast_packets"] = port_stats.rx_mcast_packets
+ pm_data["rx_bcast_packets"] = port_stats.rx_bcast_packets
+ pm_data["rx_error_packets"] = port_stats.rx_error_packets
+ pm_data["tx_bytes"] = port_stats.tx_bytes
+ pm_data["tx_packets"] = port_stats.tx_packets
+ pm_data["tx_ucast_packets"] = port_stats.tx_ucast_packets
+ pm_data["tx_mcast_packets"] = port_stats.tx_mcast_packets
+ pm_data["tx_bcast_packets"] = port_stats.tx_bcast_packets
+ pm_data["tx_error_packets"] = port_stats.tx_error_packets
+ pm_data["rx_crc_errors"] = port_stats.rx_crc_errors
+ pm_data["bip_errors"] = port_stats.bip_errors
- else:
- self.log.debug('valid-heartbeat-received')
+ prefix = 'voltha.openolt.{}'.format(self.device_id)
+ # FIXME
+ if port_stats.intf_id < 132:
+ prefixes = {
+ prefix + '{}.nni'.format(port_stats.intf_id): MetricValuePairs(
+ metrics=pm_data)
+ }
+ else:
+ prefixes = {
+ prefix + '.pon.{}'.format(platform.intf_id_from_pon_port_no(
+ port_stats.intf_id)): MetricValuePairs(
+ metrics=pm_data)
+ }
- if self.heartbeat_miss > MAX_HEARTBEAT_MISS:
- self.log.info('OLT-connection-restored')
- # TODO : suppress alarm/notify monitoring system
- # flagging it as reachable again
- reactor.callFromThread(self.olt_reachable)
+ kpi_event = KpiEvent(
+ type=KpiEventType.slice,
+ ts=port_stats.timestamp,
+ prefixes=prefixes)
+ self.adapter_agent.submit_kpis(kpi_event)
- if not self.indications_thread_active:
- self.log.info('(re)starting-indications-thread')
- # reset indications thread
- self.indications_thread = threading.Thread(
- target=self.process_indications)
- self.indications_thread.setDaemon(True)
- self.indications_thread_active = True
- self.indications_thread.start()
+ def update_logical_port_stats(self, port_stats):
+ # FIXME
+ label = 'nni-{}'.format(port_stats.intf_id)
+ try:
+ logical_port = self.adapter_agent.get_logical_port(
+ self.logical_device_id, label)
+ except KeyError as e:
+ self.log.warn('logical port was not found, it may not have been '
+ 'created yet', exception=e)
+ logical_port = None
- self.heartbeat_miss = 0
+ if logical_port is None:
+ self.log.error('logical-port-is-None',
+ logical_device_id=self.logical_device_id,
+ label=label,
+ port_stats=port_stats)
+ return
- time.sleep(HEARTBEAT_PERIOD)
+ self.log.debug('before', port=logical_port)
- self.log.debug('stopping-heartbeat-thread', device_id=self.device_id)
+ logical_port.ofp_port_stats.rx_packets = port_stats.rx_packets
+ logical_port.ofp_port_stats.rx_bytes = port_stats.rx_bytes
+ logical_port.ofp_port_stats.tx_packets = port_stats.tx_packets
+ logical_port.ofp_port_stats.tx_bytes = port_stats.tx_bytes
+ logical_port.ofp_port_stats.rx_errors = port_stats.rx_error_packets
+ logical_port.ofp_port_stats.tx_errors = port_stats.tx_error_packets
+ logical_port.ofp_port_stats.rx_crc_err = port_stats.rx_crc_errors
+ self.log.debug('after', port=logical_port)
+
+ self.adapter_agent.update_logical_port(self.logical_device_id,
+ logical_port)
def packet_out(self, egress_port, msg):
pkt = Ether(msg)
@@ -911,17 +953,13 @@
# but we are not doing that presently
# Bring OLT down
- self.olt_down(oper_state=OperStatus.UNKNOWN,
- admin_state=AdminState.DISABLED,
- connect_state=ConnectStatus.UNREACHABLE)
+ self.go_state_down()
def delete(self):
self.log.info('delete-olt', device_id=self.device_id)
# Stop the grpc communication threads
self.log.info('stopping-grpc-threads', device_id=self.device_id)
- self.indications_thread_active = False
- self.heartbeat_thread_active = False
# Close the grpc channel
# self.log.info('unsubscribing-grpc-channel', device_id=self.device_id)
@@ -933,7 +971,7 @@
self.log.info('reenable-olt', device_id=self.device_id)
# Bring up OLT
- self.olt_up()
+ self.go_state_up()
# Enable all child devices
self.log.info('enabling-child-devices', device_id=self.device_id)