Adding OMCI Tx/Rx support to Maple OLT and Broadcom ONU adapters.
Change-Id: I73f3f1caa3942ddf003b56ebb8b7c9913510644f
diff --git a/voltha/adapters/broadcom_onu/broadcom_onu.py b/voltha/adapters/broadcom_onu/broadcom_onu.py
index e846de3..2d74946 100644
--- a/voltha/adapters/broadcom_onu/broadcom_onu.py
+++ b/voltha/adapters/broadcom_onu/broadcom_onu.py
@@ -15,28 +15,31 @@
#
"""
-Mock device adapter for testing.
+Broadcom OLT/ONU adapter.
"""
-from uuid import uuid4
+from uuid import uuid4
import structlog
from twisted.internet import reactor
-from twisted.internet.defer import inlineCallbacks, DeferredQueue
+from twisted.internet.defer import DeferredQueue, inlineCallbacks
from zope.interface import implementer
-from common.utils.asleep import asleep
from voltha.adapters.interface import IAdapterInterface
from voltha.core.logical_device_agent import mac_str_to_tuple
-from voltha.protos.adapter_pb2 import Adapter, AdapterConfig
-from voltha.protos.device_pb2 import DeviceType, DeviceTypes, Device, Port
-from voltha.protos.health_pb2 import HealthStatus
+from voltha.protos import third_party
+from voltha.protos.adapter_pb2 import Adapter
+from voltha.protos.adapter_pb2 import AdapterConfig
from voltha.protos.common_pb2 import LogLevel, OperStatus, ConnectStatus, \
AdminState
-from voltha.protos.logical_device_pb2 import LogicalDevice, LogicalPort
-from voltha.protos.openflow_13_pb2 import ofp_desc, ofp_port, OFPPF_1GB_FD, \
- OFPPF_FIBER, OFPPS_LIVE, ofp_switch_features, OFPC_PORT_STATS, \
- OFPC_GROUP_STATS, OFPC_TABLE_STATS, OFPC_FLOW_STATS
+from voltha.protos.device_pb2 import DeviceType, DeviceTypes, Port
+from voltha.protos.health_pb2 import HealthStatus
+from voltha.protos.logical_device_pb2 import LogicalPort
+from voltha.protos.openflow_13_pb2 import OFPPS_LIVE, OFPPF_FIBER, OFPPF_1GB_FD
+from voltha.protos.openflow_13_pb2 import ofp_port
+from common.frameio.frameio import hexify
+from voltha.extensions.omci.omci import *
+_ = third_party
log = structlog.get_logger()
@@ -47,7 +50,7 @@
supported_device_types = [
DeviceType(
- id='broadcom_onu',
+ id=name,
adapter=name,
accepts_bulk_flow_update=True
)
@@ -59,10 +62,10 @@
self.descriptor = Adapter(
id=self.name,
vendor='Voltha project',
- version='0.1',
+ version='0.4',
config=AdapterConfig(log_level=LogLevel.INFO)
)
- self.incoming_messages = DeferredQueue()
+ self.devices_handlers = dict() # device_id -> BroadcomOnuHandler()
def start(self):
log.debug('starting')
@@ -85,8 +88,9 @@
raise NotImplementedError()
def adopt_device(self, device):
- # We kick of a simulated activation scenario
- reactor.callLater(0.2, self._simulate_device_activation, device)
+ log.info('adopt_device', device_id=device.id)
+ self.devices_handlers[device.proxy_address.channel_id] = BroadcomOnuHandler(self, device.id)
+ reactor.callLater(0, self.devices_handlers[device.proxy_address.channel_id].activate, device)
return device
def abandon_device(self, device):
@@ -95,18 +99,60 @@
def deactivate_device(self, device):
raise NotImplementedError()
- @inlineCallbacks
- def _simulate_device_activation(self, device):
+ def update_flows_bulk(self, device, flows, groups):
+ log.info('bulk-flow-update', device_id=device.id,
+ flows=flows, groups=groups)
+ assert len(groups.items) == 0
+ handler = self.devices_handlers[device.proxy_address.channel_id]
+ return handler.update_flow_table(flows.items)
+
+ def update_flows_incrementally(self, device, flow_changes, group_changes):
+ raise NotImplementedError()
+
+ def send_proxied_message(self, proxy_address, msg):
+ log.info('send-proxied-message', proxy_address=proxy_address, msg=msg)
+
+ def receive_proxied_message(self, proxy_address, msg):
+ log.info('receive-proxied-message', proxy_address=proxy_address,
+ device_id=proxy_address.device_id, msg=hexify(msg))
+ handler = self.devices_handlers[proxy_address.channel_id]
+ handler.receive_message(msg)
+
+ def receive_packet_out(self, logical_device_id, egress_port_no, msg):
+ log.info('packet-out', logical_device_id=logical_device_id,
+ egress_port_no=egress_port_no, msg_len=len(msg))
+
+
+class BroadcomOnuHandler(object):
+
+ def __init__(self, adapter, device_id):
+ self.adapter = adapter
+ self.adapter_agent = adapter.adapter_agent
+ self.device_id = device_id
+ self.log = structlog.get_logger(device_id=device_id)
+ self.incoming_messages = DeferredQueue()
+ self.proxy_address = None
+ self.tx_id = 0
+
+ def receive_message(self, msg):
+ self.incoming_messages.put(msg)
+
+ def activate(self, device):
+ self.log.info('activating')
# first we verify that we got parent reference and proxy info
assert device.parent_id
assert device.proxy_address.device_id
assert device.proxy_address.channel_id
- # we pretend that we were able to contact the device and obtain
- # additional information about it
+ # register for proxied messages right away
+ self.proxy_address = device.proxy_address
+ self.adapter_agent.register_for_proxied_messages(device.proxy_address)
+
+ # populate device info
+ device.root = True
device.vendor = 'Broadcom'
- device.model = 'to be filled'
+ device.model ='n/a'
device.hardware_version = 'to be filled'
device.firmware_version = 'to be filled'
device.software_version = 'to be filled'
@@ -114,10 +160,9 @@
device.connect_status = ConnectStatus.REACHABLE
self.adapter_agent.update_device(device)
- # then shortly after we create some ports for the device
- yield asleep(0.05)
+ # register physical ports
uni_port = Port(
- port_no=0,
+ port_no=2,
label='UNI facing Ethernet port',
type=Port.ETHERNET_UNI,
admin_state=AdminState.ENABLED,
@@ -138,24 +183,14 @@
]
))
- # TODO adding vports to the logical device shall be done by agent?
- # then we create the logical device port that corresponds to the UNI
- # port of the device
- yield asleep(0.05)
-
- # obtain logical device id
+ # add uni port to logical device
parent_device = self.adapter_agent.get_device(device.parent_id)
logical_device_id = parent_device.parent_id
assert logical_device_id
-
- # we are going to use the proxy_address.channel_id as unique number
- # and name for the virtual ports, as this is guaranteed to be unique
- # in the context of the OLT port, so it is also unique in the context
- # of the logical device
port_no = device.proxy_address.channel_id
cap = OFPPF_1GB_FD | OFPPF_FIBER
self.adapter_agent.add_logical_port(logical_device_id, LogicalPort(
- id=str(port_no),
+ id='uni-{}'.format(port_no),
ofp_port=ofp_port(
port_no=port_no,
hw_addr=mac_str_to_tuple('00:00:00:00:00:%02x' % port_no),
@@ -172,50 +207,483 @@
device_port_no=uni_port.port_no
))
- # simulate a proxied message sending and receving a reply
- reply = yield self._simulate_message_exchange(device)
+ reactor.callLater(5, self.message_exchange)
- # and finally update to "ACTIVE"
device = self.adapter_agent.get_device(device.id)
device.oper_status = OperStatus.ACTIVE
self.adapter_agent.update_device(device)
- def update_flows_bulk(self, device, flows, groups):
- log.debug('bulk-flow-update', device_id=device.id,
- flows=flows, groups=groups)
+ @inlineCallbacks
+ def update_flow_table(self, flows):
- def update_flows_incrementally(self, device, flow_changes, group_changes):
- raise NotImplementedError()
+ # we need to proxy through the OLT to get to the ONU
- def send_proxied_message(self, proxy_address, msg):
- raise NotImplementedError()
+ # reset response queue
+ while self.incoming_messages.pending:
+ yield self.incoming_messages.get()
- def receive_proxied_message(self, proxy_address, msg):
- # just place incoming message to a list
- self.incoming_messages.put((proxy_address, msg))
+ msg = FlowTable(
+ port=self.proxy_address.channel_id,
+ flows=flows
+ )
+ self.adapter_agent.send_proxied_message(self.proxy_address, msg)
+
+ yield self.incoming_messages.get()
+
+ def get_tx_id(self):
+ self.tx_id += 1
+ return self.tx_id
+
+ def send_omci_message(self, frame):
+ _frame = hexify(str(frame))
+ self.log.info('send-omci-message-%s' % _frame)
+ device = self.adapter_agent.get_device(self.device_id)
+ try:
+ self.adapter_agent.send_proxied_message(device.proxy_address, _frame)
+ except Exception as e:
+ self.log.info('send-omci-message-exception', exc=str(e))
+
+ def send_get_circuit_pack(self, entity_id=0):
+ frame = OmciFrame(
+ transaction_id=self.get_tx_id(),
+ message_type=OmciGet.message_id,
+ omci_message=OmciGet(
+ entity_class=CircuitPack.class_id,
+ entity_id=entity_id,
+ attributes_mask=CircuitPack.mask_for('vendor_id')
+ )
+ )
+ self.send_omci_message(frame)
+
+ def send_mib_reset(self, entity_id=0):
+ frame = OmciFrame(
+ transaction_id=self.get_tx_id(),
+ message_type=OmciMibReset.message_id,
+ omci_message=OmciMibReset(
+ entity_class=OntData.class_id,
+ entity_id=entity_id
+ )
+ )
+ self.send_omci_message(frame)
+
+ def send_create_gal_ethernet_profile(self, entity_id, max_gem_payload_size):
+ frame = OmciFrame(
+ transaction_id=self.get_tx_id(),
+ message_type=OmciCreate.message_id,
+ omci_message=OmciCreate(
+ entity_class=GalEthernetProfile.class_id,
+ entity_id=entity_id,
+ data=dict(
+ max_gem_payload_size=max_gem_payload_size
+ )
+ )
+ )
+ self.send_omci_message(frame)
+
+ def send_set_tcont(self, entity_id, alloc_id):
+ data = dict(
+ alloc_id=alloc_id
+ )
+ frame = OmciFrame(
+ transaction_id=self.get_tx_id(),
+ message_type=OmciSet.message_id,
+ omci_message=OmciSet(
+ entity_class=Tcont.class_id,
+ entity_id = entity_id,
+ attributes_mask=Tcont.mask_for(*data.keys()),
+ data=data
+ )
+ )
+ self.send_omci_message(frame)
+
+ def send_create_8021p_mapper_service_profile(self, entity_id):
+ frame = OmciFrame(
+ transaction_id = self.get_tx_id(),
+ message_type=OmciCreate.message_id,
+ omci_message=OmciCreate(
+ entity_class=Ieee8021pMapperServiceProfile.class_id,
+ entity_id=entity_id,
+ data=dict(
+ tp_pointer=OmciNullPointer,
+ interwork_tp_pointer_for_p_bit_priority_0=OmciNullPointer,
+ )
+ )
+ )
+ self.send_omci_message(frame)
+
+ def send_create_mac_bridge_service_profile(self, entity_id):
+ frame = OmciFrame(
+ transaction_id = self.get_tx_id(),
+ message_type=OmciCreate.message_id,
+ omci_message=OmciCreate(
+ entity_class=MacBridgeServiceProfile.class_id,
+ entity_id=entity_id,
+ data=dict(
+ spanning_tree_ind=False,
+ learning_ind=True,
+ priority=0x8000,
+ max_age=20 * 256,
+ hello_time=2 * 256,
+ forward_delay=15 * 256,
+ unknown_mac_address_discard=True
+ )
+ )
+ )
+ self.send_omci_message(frame)
+
+ def send_create_gem_port_network_ctp(self, entity_id, port_id, tcont_id):
+ frame = OmciFrame(
+ transaction_id = self.get_tx_id(),
+ message_type=OmciCreate.message_id,
+ omci_message=OmciCreate(
+ entity_class=GemPortNetworkCtp.class_id,
+ entity_id=entity_id,
+ data=dict(
+ port_id=port_id,
+ tcont_pointer=tcont_id,
+ direction=3,
+ traffic_management_pointer_upstream=0x100
+ )
+ )
+ )
+ self.send_omci_message(frame)
+
+ def send_create_multicast_gem_interworking_tp(self, entity_id, gem_port_net_ctp_id):
+ frame = OmciFrame(
+ transaction_id = self.get_tx_id(),
+ message_type=OmciCreate.message_id,
+ omci_message=OmciCreate(
+ entity_class=MulticastGemInterworkingTp.class_id,
+ entity_id=entity_id,
+ data=dict(
+ gem_port_network_ctp_pointer=gem_port_net_ctp_id,
+ interworking_option=0,
+ service_profile_pointer=0x1,
+ )
+ )
+ )
+ self.send_omci_message(frame)
+
+ def send_create_gem_inteworking_tp(self, entity_id, gem_port_net_ctp_id, service_profile_id):
+ frame = OmciFrame(
+ transaction_id = self.get_tx_id(),
+ message_type=OmciCreate.message_id,
+ omci_message=OmciCreate(
+ entity_class=GemInterworkingTp.class_id,
+ entity_id=entity_id,
+ data=dict(
+ gem_port_network_ctp_pointer=gem_port_net_ctp_id,
+ interworking_option=5,
+ service_profile_pointer=service_profile_id,
+ interworking_tp_pointer=0x0,
+ gal_profile_pointer=0x1
+ )
+ )
+ )
+ self.send_omci_message(frame)
+
+ def send_set_8021p_mapper_service_profile(self, entity_id, interwork_tp_id):
+ data = dict(
+ interwork_tp_pointer_for_p_bit_priority_0 = interwork_tp_id
+ )
+ frame = OmciFrame(
+ transaction_id = self.get_tx_id(),
+ message_type=OmciSet.message_id,
+ omci_message=OmciSet(
+ entity_class=Ieee8021pMapperServiceProfile.class_id,
+ entity_id=entity_id,
+ attributes_mask=Ieee8021pMapperServiceProfile.mask_for(
+ *data.keys()),
+ data=data
+ )
+ )
+ self.send_omci_message(frame)
+
+ def send_create_mac_bridge_port_configuration_data(self,
+ entity_id,
+ bridge_id,
+ port_id,
+ tp_type,
+ tp_id):
+ frame = OmciFrame(
+ transaction_id=self.get_tx_id(),
+ message_type=OmciCreate.message_id,
+ omci_message=OmciCreate(
+ entity_class=MacBridgePortConfigurationData.class_id,
+ entity_id=entity_id,
+ data=dict(
+ bridge_id_pointer = bridge_id,
+ port_num=port_id,
+ tp_type=tp_type,
+ tp_pointer = tp_id
+ )
+ )
+ )
+ self.send_omci_message(frame)
+
+ def send_create_vlan_tagging_filter_data(self, entity_id, vlan_id):
+ frame = OmciFrame(
+ transaction_id = self.get_tx_id(),
+ message_type=OmciCreate.message_id,
+ omci_message=OmciCreate(
+ entity_class=VlanTaggingFilterData.class_id,
+ entity_id=entity_id,
+ data=dict(
+ vlan_filter_0=vlan_id,
+ forward_operation=0x10,
+ number_of_entries=1
+ )
+ )
+ )
+ self.send_omci_message(frame)
+
+ def send_create_extended_vlan_tagging_operation_configuration_data(self, entity_id):
+ frame = OmciFrame(
+ transaction_id = self.get_tx_id(),
+ message_type=OmciCreate.message_id,
+ omci_message=OmciCreate(
+ entity_class=
+ ExtendedVlanTaggingOperationConfigurationData.class_id,
+ entity_id=entity_id,
+ data=dict(
+ association_type=10,
+ associated_me_pointer=0x401
+ )
+ )
+ )
+ self.send_omci_message(frame)
+
+ def send_set_extended_vlan_tagging_operation_tpid_configuration_data(self, entity_id, input_tpid, output_tpid):
+ data = dict(
+ input_tpid = input_tpid,
+ output_tpid = output_tpid,
+ downstream_mode=0, # inverse of upstream
+ )
+ frame = OmciFrame(
+ transaction_id = self.get_tx_id(),
+ message_type=OmciSet.message_id,
+ omci_message=OmciSet(
+ entity_class=\
+ ExtendedVlanTaggingOperationConfigurationData.class_id,
+ entity_id=entity_id,
+ attributes_mask= \
+ ExtendedVlanTaggingOperationConfigurationData.mask_for(
+ *data.keys()),
+ data=data
+ )
+ )
+ self.send_omci_message(frame)
+
+ def send_set_extended_vlan_tagging_operation_vlan_configuration_data(self,
+ entity_id,
+ filter_inner_vid,
+ treatment_inner_vid):
+ data = dict(
+ received_frame_vlan_tagging_operation_table=\
+ VlanTaggingOperation(
+ filter_outer_priority=15,
+ filter_inner_priority=8,
+ filter_inner_vid = filter_inner_vid,
+ filter_inner_tpid_de=5,
+ filter_ether_type=0,
+ treatment_tags_to_remove=1,
+ pad3=2,
+ treatment_outer_priority=15,
+ treatment_inner_priority=8,
+ treatment_inner_vid = treatment_inner_vid,
+ treatment_inner_tpid_de=4
+ )
+ )
+ frame = OmciFrame(
+ transaction_id = self.get_tx_id(),
+ message_type=OmciSet.message_id,
+ omci_message=OmciSet(
+ entity_class=\
+ ExtendedVlanTaggingOperationConfigurationData.class_id,
+ entity_id = entity_id,
+ attributes_mask= \
+ ExtendedVlanTaggingOperationConfigurationData.mask_for(
+ *data.keys()),
+ data=data
+ )
+ )
+ self.send_omci_message(frame)
@inlineCallbacks
- def _simulate_message_exchange(self, device):
+ def wait_for_response(self):
+ log.info('wait-for-response')
+ try:
+ response = yield self.incoming_messages.get()
+ resp = OmciFrame(response)
+ resp.show()
+ except Exception as e:
+ self.log.info('wait-for-response-exception', exc=str(e))
- # register for receiving async messages
- self.adapter_agent.register_for_proxied_messages(device.proxy_address)
-
+ @inlineCallbacks
+ def message_exchange(self):
+ log.info('message_exchange')
# reset incoming message queue
while self.incoming_messages.pending:
_ = yield self.incoming_messages.get()
# construct message
- msg = 'test message'
+ # MIB Reset - OntData - 0
+ self.send_mib_reset()
+ yield self.wait_for_response()
- # send message
- self.adapter_agent.send_proxied_message(device.proxy_address, msg)
+ # Create AR - GalEthernetProfile - 1
+ self.send_create_gal_ethernet_profile(1, 48)
+ yield self.wait_for_response()
- # wait till we detect incoming message
- yield self.incoming_messages.get()
+ # Set AR - TCont - 32768 - 1024
+ self.send_set_tcont(0x8000, 0x400)
+ yield self.wait_for_response()
- # by returning we allow the device to be shown as active, which
- # indirectly verified that message passing works
+ # Set AR - TCont - 32769 - 1025
+ self.send_set_tcont(0x8001, 0x401)
+ yield self.wait_for_response()
- def receive_packet_out(self, logical_device_id, egress_port_no, msg):
- log.info('packet-out', logical_device_id=logical_device_id,
- egress_port_no=egress_port_no, msg_len=len(msg))
+ # Set AR - TCont - 32770 - 1026
+ self.send_set_tcont(0x8002, 0x402)
+ yield self.wait_for_response()
+
+ # Set AR - TCont - 32771 - 1027
+ self.send_set_tcont(0x8003, 0x403)
+ yield self.wait_for_response()
+
+ # Create AR - 802.1pMapperServiceProfile - 32768
+ self.send_create_8021p_mapper_service_profile(0x8000)
+ yield self.wait_for_response()
+
+ # Create AR - 802.1pMapperServiceProfile - 32769
+ self.send_create_8021p_mapper_service_profile(0x8001)
+ yield self.wait_for_response()
+
+ # Create AR - 802.1pMapperServiceProfile - 32770
+ self.send_create_8021p_mapper_service_profile(0x8002)
+ yield self.wait_for_response()
+
+ # Create AR - 802.1pMapperServiceProfile - 32771
+ self.send_create_8021p_mapper_service_profile(0x8003)
+ yield self.wait_for_response()
+
+ # Create AR - MacBridgeServiceProfile - 513
+ self.send_create_mac_bridge_service_profile(0x201)
+ yield self.wait_for_response()
+
+ # Create AR - GemPortNetworkCtp - 256 - 1024 - 32768
+ self.send_create_gem_port_network_ctp(0x100, 0x400, 0x8000)
+ yield self.wait_for_response()
+
+ # Create AR - GemPortNetworkCtp - 257 - 1025 - 32769
+ self.send_create_gem_port_network_ctp(0x101, 0x401, 0x8001)
+ yield self.wait_for_response()
+
+ # Create AR - GemPortNetworkCtp - 258 - 1026 - 32770
+ self.send_create_gem_port_network_ctp(0x102, 0x402, 0x8002)
+ yield self.wait_for_response()
+
+ # Create AR - GemPortNetworkCtp - 259 - 1027 - 32771
+ self.send_create_gem_port_network_ctp(0x103, 0x403, 0x8003)
+ yield self.wait_for_response()
+
+ # Create AR - MulticastGemInterworkingTp - 6 - 260
+ self.send_create_multicast_gem_interworking_tp(0x6, 0x104)
+ yield self.wait_for_response()
+
+ # Create AR - GemInterworkingTp - 32769 - 256 -32768 - 1
+ self.send_create_gem_inteworking_tp(0x8001, 0x100, 0x8000)
+ yield self.wait_for_response()
+
+ # Create AR - GemInterworkingTp - 32770 - 257 -32769 - 1
+ self.send_create_gem_inteworking_tp(0x8002, 0x101, 0x8001)
+ yield self.wait_for_response()
+
+ # Create AR - GemInterworkingTp - 32771 - 258 -32770 - 1
+ self.send_create_gem_inteworking_tp(0x8003, 0x102, 0x8002)
+ yield self.wait_for_response()
+
+ # Create AR - GemInterworkingTp - 32772 - 259 -32771 - 1
+ self.send_create_gem_inteworking_tp(0x8004, 0x103, 0x8003)
+ yield self.wait_for_response()
+
+ # Set AR - 802.1pMapperServiceProfile - 32768 - 32769
+ self.send_set_8021p_mapper_service_profile(0x8000, 0x8001)
+ yield self.wait_for_response()
+
+ # Set AR - 802.1pMapperServiceProfile - 32769 - 32770
+ self.send_set_8021p_mapper_service_profile(0x8001, 0x8002)
+ yield self.wait_for_response()
+
+ # Set AR - 802.1pMapperServiceProfile - 32770 - 32771
+ self.send_set_8021p_mapper_service_profile(0x8002, 0x8003)
+ yield self.wait_for_response()
+
+ # Set AR - 802.1pMapperServiceProfile - 32771 - 32772
+ self.send_set_8021p_mapper_service_profile(0x8003, 0x8004)
+ yield self.wait_for_response()
+
+ # Create AR - MacBridgePortConfigData - 8449 - 513 - 2 - 3 - 32768
+ self.send_create_mac_bridge_port_configuration_data(0x2101, 0x201, 2, 3, 0x8000)
+ yield self.wait_for_response()
+
+ # Create AR - MacBridgePortConfigData - 8450 - 513 - 3 - 3 - 32769
+ self.send_create_mac_bridge_port_configuration_data(0x2102, 0x201, 3, 3, 0x8001)
+ yield self.wait_for_response()
+
+ # Create AR - MacBridgePortConfigData - 8451 - 513 - 4 - 3 - 32770
+ self.send_create_mac_bridge_port_configuration_data(0x2103, 0x201, 4, 3, 0x8002)
+ yield self.wait_for_response()
+
+ # Create AR - MacBridgePortConfigData - 8452 - 513 - 5 - 3 - 32771
+ self.send_create_mac_bridge_port_configuration_data(0x2104, 0x201, 5, 3, 0x8003)
+ yield self.wait_for_response()
+
+ # Create AR - MacBridgePortConfigData - 9000 - 513 - 6 - 6 - 6
+ self.send_create_mac_bridge_port_configuration_data(0x2328, 0x201, 6, 6, 6)
+ yield self.wait_for_response()
+
+ # Create AR - VlanTaggingFilterData - 8449 - 040000000000000000000000000000000000000000000000
+ self.send_create_vlan_tagging_filter_data(0x2101, 0x0400)
+ yield self.wait_for_response()
+
+ # Create AR - VlanTaggingFilterData - 8450 - 040100000000000000000000000000000000000000000000
+ self.send_create_vlan_tagging_filter_data(0x2102, 0x0401)
+ yield self.wait_for_response()
+
+ # Create AR - VlanTaggingFilterData - 8451 - 040200000000000000000000000000000000000000000000
+ self.send_create_vlan_tagging_filter_data(0x2103, 0x0402)
+ yield self.wait_for_response()
+
+ # Create AR - VlanTaggingFilterData - 8452 - 040300000000000000000000000000000000000000000000
+ self.send_create_vlan_tagging_filter_data(0x2104, 0x0403)
+ yield self.wait_for_response()
+
+ # Create AR - ExtendedVlanTaggingOperationConfigData - 514 - 10 - 1025
+ self.send_create_extended_vlan_tagging_operation_configuration_data(0x202)
+ yield self.wait_for_response()
+
+ # Set AR - ExtendedVlanTaggingOperationConfigData - 514 - 8100 - 8100
+ self.send_set_extended_vlan_tagging_operation_tpid_configuration_data(0x202,0x8100,0x8100)
+ yield self.wait_for_response()
+
+ # Set AR - ExtendedVlanTaggingOperationConfigData - 514 - RxVlanTaggingOperationTable - 0x400 - 5 - 0x400 - 4
+ self.send_set_extended_vlan_tagging_operation_vlan_configuration_data(0x202, 0x400, 0x400)
+ yield self.wait_for_response()
+
+ # Set AR - ExtendedVlanTaggingOperationConfigData - 514 - RxVlanTaggingOperationTable - 0x401 - 5 - 0x401 - 4
+ self.send_set_extended_vlan_tagging_operation_vlan_configuration_data(0x202, 0x401, 0x401)
+ yield self.wait_for_response()
+
+ # Set AR - ExtendedVlanTaggingOperationConfigData - 514 - RxVlanTaggingOperationTable - 0x402 - 5 - 0x402 - 4
+ self.send_set_extended_vlan_tagging_operation_vlan_configuration_data(0x202, 0x402, 0x402)
+ yield self.wait_for_response()
+
+ # Set AR - ExtendedVlanTaggingOperationConfigData - 514 - RxVlanTaggingOperationTable - 0x403 - 5 - 0x403 - 4
+ self.send_set_extended_vlan_tagging_operation_vlan_configuration_data(0x202, 0x403, 0x403)
+ yield self.wait_for_response()
+
+ # Create AR - MacBridgePortConfigData - 513 - 513 - 1 - 11 - 1025
+ self.send_create_mac_bridge_port_configuration_data(0x201, 0x201, 1, 11, 0x401)
+ yield self.wait_for_response()
+
diff --git a/voltha/adapters/maple_olt/maple_olt.py b/voltha/adapters/maple_olt/maple_olt.py
index 6c3b841..2ffa795 100644
--- a/voltha/adapters/maple_olt/maple_olt.py
+++ b/voltha/adapters/maple_olt/maple_olt.py
@@ -14,50 +14,88 @@
# limitations under the License.
#
-import sys
+"""
+Maple OLT/ONU adapter.
+"""
from uuid import uuid4
+import grpc
import structlog
-from twisted.spread import pb
+from scapy.layers.l2 import Ether, Dot1Q
from twisted.internet import reactor
-from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.spread import pb
+from twisted.internet.defer import inlineCallbacks, returnValue, DeferredQueue
from zope.interface import implementer
-from common.utils.asleep import asleep
+from common.frameio.frameio import BpfProgramFilter, hexify
+
from voltha.adapters.interface import IAdapterInterface
from voltha.core.logical_device_agent import mac_str_to_tuple
-from voltha.protos.adapter_pb2 import Adapter, AdapterConfig
-from voltha.protos.device_pb2 import DeviceType, DeviceTypes, Device, Port
-from voltha.protos.health_pb2 import HealthStatus
+from voltha.protos import third_party
+from voltha.protos.adapter_pb2 import Adapter
+from voltha.protos.adapter_pb2 import AdapterConfig
from voltha.protos.common_pb2 import LogLevel, OperStatus, ConnectStatus, \
AdminState
-from voltha.protos.logical_device_pb2 import LogicalDevice, LogicalPort
-from voltha.protos.openflow_13_pb2 import ofp_desc, ofp_port, OFPPF_1GB_FD, \
- OFPPF_FIBER, OFPPS_LIVE, ofp_switch_features, OFPC_PORT_STATS, \
- OFPC_GROUP_STATS, OFPC_TABLE_STATS, OFPC_FLOW_STATS
+from voltha.protos.device_pb2 import DeviceType, DeviceTypes, Port, Device
+from voltha.protos.health_pb2 import HealthStatus
+from google.protobuf.empty_pb2 import Empty
+from voltha.protos.logical_device_pb2 import LogicalPort, LogicalDevice
+from voltha.protos.openflow_13_pb2 import OFPPS_LIVE, OFPPF_FIBER, OFPPF_1GB_FD, \
+ OFPC_GROUP_STATS, OFPC_PORT_STATS, OFPC_TABLE_STATS, OFPC_FLOW_STATS, \
+ ofp_switch_features, ofp_desc, ofp_port
+from voltha.registry import registry
+from voltha.extensions.omci.omci import *
+
+_ = third_party
log = structlog.get_logger()
+PACKET_IN_VLAN = 4091
+is_inband_frame = BpfProgramFilter('(ether[14:2] & 0xfff) = 0x{:03x}'.format(
+ PACKET_IN_VLAN))
-class AsyncRx(pb.Root):
+
+class OmciRxProxy(pb.Root):
+ def __init__(self):
+ self.pb_server_ip = '192.168.24.20' # registry('main').get_args().external_host_address
+ self.pb_server_port = 24497
+ self.pb_server_factory = pb.PBServerFactory(self)
+ # start PB server
+ self.listen_port = reactor.listenTCP(self.pb_server_port, self.pb_server_factory)
+ self.omci_rx_queue = DeferredQueue()
+ log.info('PB-server-started-on-port', port=self.pb_server_port)
+
+ def get_ip(self):
+ return self.pb_server_ip
+
+ def get_port(self):
+ return self.pb_server_port
+
+ def get_host(self):
+ return self.listen_port.getHost()
+
def remote_echo(self, pkt_type, pon, onu, port, crc_ok, msg_size, msg_data):
- log.info('packet-type', pkt_type=pkt_type)
- log.info('pon-id', pon_id=pon)
- log.info('onu-id', onu_id=onu)
- log.info('port', port_id=port)
- log.info('crc-ok', crc_ok=crc_ok)
- log.info('msg-size', msg_size=msg_size)
- log.info('msg-data', msg_data="".join("{:02x}".format(ord(c)) for c in msg_data))
- return 0
+ log.info('received-omci-msg',
+ pkt_type=pkt_type,
+ pon_id=pon,
+ onu_id=onu,
+ port_id=port,
+ crc_ok=crc_ok,
+ msg_size=msg_size,
+ msg_data=hexify(msg_data))
+ self.omci_rx_queue.put((onu, msg_data))
+
+ def receive(self):
+ return self.omci_rx_queue.get()
+
@implementer(IAdapterInterface)
class MapleOltAdapter(object):
-
name = 'maple_olt'
supported_device_types = [
DeviceType(
- id='maple_olt',
+ id=name,
adapter=name,
accepts_bulk_flow_update=True
)
@@ -69,13 +107,11 @@
self.descriptor = Adapter(
id=self.name,
vendor='Voltha project',
- version='0.1',
+ version='0.4',
config=AdapterConfig(log_level=LogLevel.INFO)
)
- self.PBServerPort = 24497
- #start PB server
- reactor.listenTCP(self.PBServerPort, pb.PBServerFactory(AsyncRx()))
- log.info('PB-server-started on port', port=self.PBServerPort)
+ self.devices_handlers = dict() # device_id -> MapleOltHandler()
+ self.logical_device_id_to_root_device_id = dict()
def start(self):
log.debug('starting')
@@ -98,50 +134,196 @@
raise NotImplementedError()
def adopt_device(self, device):
- log.info('adopt-device', device=device)
- # We kick of a simulated activation scenario
- reactor.callLater(0.2, self._activate_device, device)
+ self.devices_handlers[device.id] = MapleOltHandler(self, device.id)
+ reactor.callLater(0, self.devices_handlers[device.id].activate, device)
return device
def abandon_device(self, device):
- raise NotImplementedError(0
- )
+ raise NotImplementedError()
+
def deactivate_device(self, device):
raise NotImplementedError()
- @inlineCallbacks
- def _activate_device(self, device):
+ def update_flows_bulk(self, device, flows, groups):
+ log.info('bulk-flow-update', device_id=device.id,
+ flows=flows, groups=groups)
+ assert len(groups.items) == 0
+ handler = self.devices_handlers[device.id]
+ return handler.update_flow_table(flows.items)
- # launch connecion
- log.info('initiating-connection-to-olt', device_id=device.id, ipv4=device.ipv4_address)
+ def update_flows_incrementally(self, device, flow_changes, group_changes):
+ raise NotImplementedError()
+
+ def send_proxied_message(self, proxy_address, msg):
+ log.info('send-proxied-message', proxy_address=proxy_address, msg=msg)
+ handler = self.devices_handlers[proxy_address.device_id]
+ handler.send_proxied_message(proxy_address, msg)
+
+ def receive_proxied_message(self, proxy_address, msg):
+ raise NotImplementedError()
+
+ def receive_packet_out(self, logical_device_id, egress_port_no, msg):
+ def ldi_to_di(ldi):
+ di = self.logical_device_id_to_root_device_id.get(ldi)
+ if di is None:
+ logical_device = self.adapter_agent.get_logical_device(ldi)
+ di = logical_device.root_device_id
+ self.logical_device_id_to_root_device_id[ldi] = di
+ return di
+
+ device_id = ldi_to_di(logical_device_id)
+ handler = self.devices_handlers[device_id]
+ handler.packet_out(egress_port_no, msg)
+
+
+class MapleOltHandler(object):
+ def __init__(self, adapter, device_id):
+ self.adapter = adapter
+ self.adapter_agent = adapter.adapter_agent
+ self.device_id = device_id
+ self.log = structlog.get_logger(device_id=device_id)
+ self.channel = None
+ self.io_port = None
+ self.logical_device_id = None
+ self.interface = registry('main').get_args().interface
self.pbc_factory = pb.PBClientFactory()
- reactor.connectTCP(device.ipv4_address, 24498, self.pbc_factory)
- self.remote = yield self.pbc_factory.getRootObject()
- log.info('connected-to-olt', device_id=device.id, ipv4=device.ipv4_address)
+ self.pbc_port = 24498
+ self.tx_id = 0
+ self.omci_rx_proxy = OmciRxProxy()
- data = yield self.remote.callRemote('connect_olt', 0)
- #TODO: add error handling
- log.info('connect-data', data=data)
+ def __del__(self):
+ if self.io_port is not None:
+ registry('frameio').del_interface(self.interface)
- data = yield self.remote.callRemote('activate_olt', 0)
- #TODO: add error handling
- log.info('activate-data', data=data)
+ def get_channel(self):
+ return self.channel
- # first we pretend that we were able to contact the device and obtain
- # additional information about it
+ def get_vlan_from_onu(self, onu):
+ vlan = onu + 1024
+ return vlan
+
+ def get_onu_from_vlan(self, vlan):
+ onu = vlan - 1024
+ return onu
+
+ @inlineCallbacks
+ def send_set_remote(self):
+ srv_ip = self.omci_rx_proxy.get_ip()
+ srv_port = self.omci_rx_proxy.get_port()
+ self.log.info('setting-remote-ip-port', ip=srv_ip, port=srv_port)
+
+ try:
+ remote = self.get_channel()
+ data = yield remote.callRemote('set_remote', srv_ip, srv_port)
+ self.log.info('set-remote', data=data, ip=srv_ip, port=srv_port)
+ except Exception as e:
+ self.log.info('set-remote-exception', exc=str(e))
+
+ @inlineCallbacks
+ def send_connect_olt(self, olt_no):
+ self.log.info('connecting-to-olt', olt=olt_no)
+ try:
+ remote = self.get_channel()
+ data = yield remote.callRemote('connect_olt', olt_no)
+ self.log.info('connected-to-olt', data=data)
+ except Exception as e:
+ self.log.info('connect-olt-exception', exc=str(e))
+
+ @inlineCallbacks
+ def send_activate_olt(self, olt_no):
+ self.log.info('activating-olt', olt=olt_no)
+ try:
+ remote = self.get_channel()
+ data = yield remote.callRemote('activate_olt', olt_no)
+ self.log.info('activated-olt', data=data)
+ except Exception as e:
+ self.log.info('activate-olt-exception', exc=str(e))
+
+ @inlineCallbacks
+ def send_create_onu(self, olt_no, onu_no, serial_no, vendor_no):
+ self.log.info('creating-onu',
+ olt=olt_no,
+ onu=onu_no,
+ serial=serial_no,
+ vendor=vendor_no)
+ try:
+ remote = self.get_channel()
+ data = yield remote.callRemote('create_onu',
+ olt_no,
+ onu_no,
+ serial_no,
+ vendor_no)
+ self.log.info('created-onu', data=data)
+ except Exception as e:
+ self.log.info('create-onu-exception', exc=str(e))
+
+ @inlineCallbacks
+ def send_configure_onu(self, olt_no, onu_no, alloc_id, uni_gem, multi_gem):
+ self.log.info('configuring-onu',
+ olt=olt_no,
+ onu=onu_no,
+ alloc_id=alloc_id,
+ unicast_gem_port=uni_gem,
+ multicast_gem_port=multi_gem)
+ try:
+ remote = self.get_channel()
+ data = yield remote.callRemote('configure_onu',
+ olt_no,
+ onu_no,
+ alloc_id,
+ uni_gem,
+ multi_gem)
+ self.log.info('configured-onu', data=data)
+ except Exception as e:
+ self.log.info('configure-onu-exception', exc=str(e))
+
+ @inlineCallbacks
+ def send_activate_onu(self, olt_no, onu_no):
+ self.log.info('activating-onu', olt=olt_no, onu=onu_no)
+ try:
+ remote = self.get_channel()
+ data = yield remote.callRemote('activate_onu', olt_no, onu_no)
+ self.log.info('activated-onu', data=data)
+ except Exception as e:
+ self.log.info('activate-onu-exception', exc=str(e))
+
+ @inlineCallbacks
+ def activate(self, device):
+ self.log.info('activating')
+
+ if not device.ipv4_address:
+ device.oper_status = OperStatus.FAILED
+ device.reason = 'No ipv4_address field provided'
+ self.adapter_agent.update_device(device)
+ return
+
+ self.log.info('initiating-connection-to-olt',
+ device_id=device.id,
+ ipv4=device.ipv4_address,
+ port=self.pbc_port)
+ reactor.connectTCP(device.ipv4_address, self.pbc_port, self.pbc_factory)
+ try:
+ self.channel = yield self.pbc_factory.getRootObject()
+ self.log.info('connected-to-olt',
+ device_id=device.id,
+ ipv4=device.ipv4_address,
+ port=self.pbc_port)
+ except Exception as e:
+ self.log.info('get-channel-exception', exc=str(e))
+
+ self.send_set_remote()
+ self.send_connect_olt(0)
+ self.send_activate_olt(0)
+
device.root = True
device.vendor = 'Broadcom'
- device.model = 'Maple XYZ'
- device.hardware_version = 'Fill this'
- device.firmware_version = 'Fill this'
- device.software_version = 'Fill this'
- device.serial_number = 'Fill this'
+ device.model = 'bcm68620'
+ device.serial_number = device.ipv4_address
device.connect_status = ConnectStatus.REACHABLE
self.adapter_agent.update_device(device)
- # register ports
nni_port = Port(
- port_no=0,
+ port_no=2,
label='NNI facing Ethernet port',
type=Port.ETHERNET_NNI,
admin_state=AdminState.ENABLED,
@@ -156,11 +338,8 @@
oper_status=OperStatus.ACTIVE
))
- # register logical device (as we are a root device)
- logical_device_id = uuid4().hex[:12]
ld = LogicalDevice(
- id=logical_device_id,
- datapath_id=int('0x' + logical_device_id[:8], 16), # from id
+ # not setting id and datapth_id will let the adapter agent pick id
desc=ofp_desc(
mfr_desc='cord porject',
hw_desc='n/a',
@@ -180,12 +359,12 @@
),
root_device_id=device.id
)
- self.adapter_agent.create_logical_device(ld)
+ ld_initialized = self.adapter_agent.create_logical_device(ld)
cap = OFPPF_1GB_FD | OFPPF_FIBER
- self.adapter_agent.add_logical_port(ld.id, LogicalPort(
+ self.adapter_agent.add_logical_port(ld_initialized.id, LogicalPort(
id='nni',
ofp_port=ofp_port(
- port_no=0,
+ port_no=0, # is 0 OK?
hw_addr=mac_str_to_tuple('00:00:00:00:00:%02x' % 129),
name='nni',
config=0,
@@ -201,103 +380,91 @@
root_port=True
))
- # and finally update to active
device = self.adapter_agent.get_device(device.id)
- device.parent_id = ld.id
+ device.parent_id = ld_initialized.id
device.oper_status = OperStatus.ACTIVE
self.adapter_agent.update_device(device)
+ self.logical_device_id = ld_initialized.id
- reactor.callLater(0.1, self._simulate_detection_of_onus, device)
+ # register ONUS per uni port until done asynchronously
+ for onu_no in [1]:
+ vlan_id = self.get_vlan_from_onu(onu_no)
+ yield self.send_create_onu(0, onu_no, '4252434d', '12345678')
+ yield self.send_configure_onu(0, onu_no, vlan_id, vlan_id, 4000)
+ yield self.send_activate_onu(0, onu_no)
- def _simulate_detection_of_onus(self, device):
- for i in xrange(1, 2):
- log.info('activate-olt-for-onu-{}'.format(i))
- vlan_id = self._olt_side_onu_activation(i)
self.adapter_agent.child_device_detected(
parent_device_id=device.id,
parent_port_no=1,
child_device_type='broadcom_onu',
proxy_address=Device.ProxyAddress(
device_id=device.id,
- channel_id=i
+ channel_id=vlan_id
),
- vlan=i+1024
+ vlan=vlan_id
)
- @inlineCallbacks
- def _olt_side_onu_activation(self, seq):
- """
- This is where if this was a real OLT, the OLT-side activation for
- the new ONU should be performed. By the time we return, the OLT shall
- be able to provide tunneled (proxy) communication to the given ONU,
- using the returned information.
- """
- data = yield self.remote.callRemote('create_onu', 0, seq, '4252434d', '12345678')
- log.info('create-onu-data', data=data)
+ # finally, open the frameio port to receive in-band packet_in messages
+ self.log.info('registering-frameoi')
+ # self.io_port = registry('frameio').add_interface(
+ # self.interface, self.rcv_io, is_inband_frame)
- vlan_id = seq + 1024
+ def rcv_io(self, port, frame):
+ self.log.info('reveived', iface_name=port.iface_name,
+ frame_len=len(frame))
+ pkt = Ether(frame)
+ if pkt.haslayer(Dot1Q):
+ outer_shim = pkt.getlayer(Dot1Q)
+ if isinstance(outer_shim.payload, Dot1Q):
+ inner_shim = outer_shim.payload
+ cvid = inner_shim.vlan
+ logical_port = cvid
+ popped_frame = (
+ Ether(src=pkt.src, dst=pkt.dst, type=inner_shim.type) /
+ inner_shim.payload
+ )
+ kw = dict(
+ logical_device_id=self.logical_device_id,
+ logical_port_no=logical_port,
+ )
+ self.log.info('sending-packet-in', **kw)
+ self.adapter_agent.send_packet_in(
+ packet=str(popped_frame), **kw)
- data = yield self.remote.callRemote('configure_onu', 0, seq, alloc_id=vlan_id, unicast_gem=vlan_id, multicast_gem=4095)
- log.info('configure-onu-data', data=data)
-
- data = yield self.remote.callRemote('activate_onu', 0, seq)
- log.info('activate-onu-data', data=data)
-
- log.info('ready-to-send-omci')
- omci_msg = "00014F0A00020000000000000000000000000000000000000000000000000000000000000000000000000028"
- log.info('sending-omci-msg', msg=omci_msg)
- try:
- res = yield self.remote.callRemote(
- 'send_omci',
- 0,
- 0,
- 1,
- omci_msg
- )
- log.info('omci-send-result', result=res)
- except Exception, e:
- log.info('omci-send-exception', exc=str(e))
-
- #reactor.callLater(5.0, self._send_omci_test_msg)
-
- returnValue(vlan_id)
+ def update_flow_table(self, flows):
+ self.log.info('pushing-olt-flow-table')
@inlineCallbacks
- def _send_omci_test_msg(self):
- omci_msg = "00014F0A00020000000000000000000000000000000000000000000000000000000000000000000000000028"
- log.info('sending-omci-msg', msg=omci_msg)
- try:
- res = yield self.remote.callRemote(
- 'send_omci',
- 0,
- 0,
- 1,
- omci_msg
- )
- log.info('omci-send-result', result=res)
- except Exception, e:
- log.info('omci-send-exception', exc=str(e))
-
- def update_flows_bulk(self, device, flows, groups):
- log.debug('bulk-flow-update', device_id=device.id,
- flows=flows, groups=groups)
-
- def update_flows_incrementally(self, device, flow_changes, group_changes):
- raise NotImplementedError()
-
def send_proxied_message(self, proxy_address, msg):
- log.info('send-proxied-message', proxy_address=proxy_address, msg=msg)
- # we mimic a response by sending the same message back in a short time
- reactor.callLater(
- 0.2,
- self.adapter_agent.receive_proxied_message,
- proxy_address,
- msg
+ if isinstance(msg, Packet):
+ msg = str(msg)
+
+ self.log.info('send-proxied-message',
+ proxy_address=proxy_address.channel_id,
+ msg=msg)
+
+ try:
+ remote = self.get_channel()
+ yield remote.callRemote("send_omci",
+ 0,
+ 0,
+ self.get_onu_from_vlan(proxy_address.channel_id),
+ msg)
+ onu, rmsg = yield self.omci_rx_proxy.receive()
+ self.adapter_agent.receive_proxied_message(proxy_address, rmsg)
+ except Exception as e:
+ self.log.info('send-proxied_message-exception', exc=str(e))
+
+ def packet_out(self, egress_port, msg):
+ self.log.info('sending-packet-out',
+ egress_port=egress_port,
+ msg=hexify(msg))
+
+ pkt = Ether(msg)
+ out_pkt = (
+ Ether(src=pkt.src, dst=pkt.dst) /
+ Dot1Q(vlan=4091) /
+ Dot1Q(vlan=egress_port, type=pkt.type) /
+ pkt.payload
)
-
- def receive_proxied_message(self, proxy_address, msg):
- raise NotImplementedError()
-
- def receive_packet_out(self, logical_device_id, egress_port_no, msg):
- log.info('packet-out', logical_device_id=logical_device_id,
- egress_port_no=egress_port_no, msg_len=len(msg))
+ self.io_port.send(str(out_pkt))