VOL-217 - Test activate olt path with a python olt simulator.
Change-Id: I12b2013e8db4f42923daf34e3dc65bf430442752
diff --git a/tests/itests/voltha/adapters/__init__.py b/tests/itests/voltha/adapters/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/itests/voltha/adapters/__init__.py
diff --git a/tests/itests/voltha/adapters/asfvolt16_olt/__init__.py b/tests/itests/voltha/adapters/asfvolt16_olt/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/itests/voltha/adapters/asfvolt16_olt/__init__.py
diff --git a/tests/itests/voltha/adapters/asfvolt16_olt/test_device_state_changes.py b/tests/itests/voltha/adapters/asfvolt16_olt/test_device_state_changes.py
new file mode 100644
index 0000000..b3fb0d5
--- /dev/null
+++ b/tests/itests/voltha/adapters/asfvolt16_olt/test_device_state_changes.py
@@ -0,0 +1,197 @@
+from time import time, sleep
+
+from google.protobuf.json_format import MessageToDict
+
+from voltha.protos.device_pb2 import Device
+from tests.itests.voltha.rest_base import RestBase
+from common.utils.consulhelpers import get_endpoint_from_consul
+
+LOCAL_CONSUL = "localhost:8500"
+
+
+class TestDeviceStateChangeSequence(RestBase):
+ """
+ The prerequisite for this test are:
+ 1. voltha ensemble is running
+ docker-compose -f compose/docker-compose-system-test.yml up -d
+ 2. asfvolt16_olt simulator is running
+ sudo -s
+ . ./env.sh
+ ./voltha/adapters/asfvolt16_olt/sim.py
+ """
+
+ # Retrieve details of the REST entry point
+ rest_endpoint = get_endpoint_from_consul(LOCAL_CONSUL, 'chameleon-rest')
+
+ # Construct the base_url
+ base_url = 'http://' + rest_endpoint
+
+ def wait_till(self, msg, predicate, interval=0.1, timeout=5.0):
+ deadline = time() + timeout
+ while time() < deadline:
+ if predicate():
+ return
+ sleep(interval)
+ self.fail('Timed out while waiting for condition: {}'.format(msg))
+
+ def test_device_state_changes_scenarios(self):
+
+ self.verify_prerequisites()
+ # Test basic scenario
+
+ self.basic_scenario()
+
+ def basic_scenario(self):
+ """
+ Test the enable -> disable -> enable -> disable -> delete for OLT
+ """
+ self.assert_no_device_present()
+ olt_id = self.add_olt_device()
+ self.verify_device_preprovisioned_state(olt_id)
+ self.enable_device(olt_id)
+ ldev_id = self.wait_for_logical_device(olt_id)
+ self.verify_logical_ports(ldev_id, 1)
+ olt_ids, _ = self.get_devices()
+ self.disable_device(olt_ids[0])
+ self.assert_no_logical_device()
+ self.delete_device(olt_ids[0])
+ self.assert_no_device_present()
+
+ def verify_prerequisites(self):
+ # all we care is that Voltha is available via REST using the base uri
+ self.get('/api/v1')
+
+ def get_devices(self):
+ devices = self.get('/api/v1/devices')['items']
+ olt_ids = []
+ onu_ids = []
+ for d in devices:
+ if d['adapter'] == 'asfvolt16_olt':
+ olt_ids.append(d['id'])
+ return olt_ids, onu_ids
+
+ def add_olt_device(self):
+ device = Device(
+ type='asfvolt16_olt',
+ host_and_port='172.17.0.1:50060'
+ )
+ device = self.post('/api/v1/devices', MessageToDict(device),
+ expected_code=200)
+ return device['id']
+
+ def verify_device_preprovisioned_state(self, olt_id):
+ # we also check that so far what we read back is same as what we get
+ # back on create
+ device = self.get('/api/v1/devices/{}'.format(olt_id))
+ self.assertNotEqual(device['id'], '')
+ self.assertEqual(device['adapter'], 'asfvolt16_olt')
+ self.assertEqual(device['admin_state'], 'PREPROVISIONED')
+ self.assertEqual(device['oper_status'], 'UNKNOWN')
+
+ def enable_device(self, olt_id):
+ path = '/api/v1/devices/{}'.format(olt_id)
+ self.post(path + '/enable', expected_code=200)
+ device = self.get(path)
+ self.assertEqual(device['admin_state'], 'ENABLED')
+
+ self.wait_till(
+ 'admin state moves to ACTIVATING or ACTIVE',
+ lambda: self.get(path)['oper_status'] in ('ACTIVATING', 'ACTIVE'),
+ timeout=0.5)
+
+ # eventually, it shall move to active state and by then we shall have
+ # device details filled, connect_state set, and device ports created
+ '''
+ # The check for ACTIVE is suppressed since the indications
+ # portion of the code is not yet ready.
+ self.wait_till(
+ 'admin state ACTIVE',
+ lambda: self.get(path)['oper_status'] == 'ACTIVE',
+ timeout=0.5)
+ device = self.get(path)
+ '''
+ self.assertEqual(device['connect_status'], 'REACHABLE')
+
+ ports = self.get(path + '/ports')['items']
+ #self.assertEqual(len(ports), 2)
+ self.assertEqual(len(ports), 1)
+
+ def wait_for_logical_device(self, olt_id):
+ # we shall find the logical device id from the parent_id of the olt
+ # (root) device
+ device = self.get(
+ '/api/v1/devices/{}'.format(olt_id))
+ self.assertNotEqual(device['parent_id'], '')
+ logical_device = self.get(
+ '/api/v1/logical_devices/{}'.format(device['parent_id']))
+
+ # the logical device shall be linked back to the hard device,
+ # its ports too
+ self.assertEqual(logical_device['root_device_id'], device['id'])
+
+ logical_ports = self.get(
+ '/api/v1/logical_devices/{}/ports'.format(
+ logical_device['id'])
+ )['items']
+ self.assertGreaterEqual(len(logical_ports), 1)
+ logical_port = logical_ports[0]
+ self.assertEqual(logical_port['id'], 'nni')
+ self.assertEqual(logical_port['ofp_port']['name'], 'nni')
+ self.assertEqual(logical_port['ofp_port']['port_no'], 0)
+ self.assertEqual(logical_port['device_id'], device['id'])
+ self.assertEqual(logical_port['device_port_no'], 1)
+ return logical_device['id']
+
+ def verify_logical_ports(self, ldev_id, num_ports):
+
+ # at this point we shall see num_ports logical ports on the
+ # logical device
+ logical_ports = self.get(
+ '/api/v1/logical_devices/{}/ports'.format(ldev_id)
+ )['items']
+ self.assertGreaterEqual(len(logical_ports), num_ports)
+
+ # verify that all logical ports are LIVE (state=4)
+ for lport in logical_ports:
+ self.assertEqual(lport['ofp_port']['state'], 4)
+
+ def disable_device(self, id):
+ path = '/api/v1/devices/{}'.format(id)
+ self.post(path + '/disable', expected_code=200)
+ device = self.get(path)
+ self.assertEqual(device['admin_state'], 'DISABLED')
+
+ self.wait_till(
+ 'operational state moves to UNKNOWN',
+ lambda: self.get(path)['oper_status'] == 'UNKNOWN',
+ timeout=0.5)
+
+ # eventually, the connect_state should be UNREACHABLE
+ self.wait_till(
+ 'connest status UNREACHABLE',
+ lambda: self.get(path)['connect_status'] == 'UNREACHABLE',
+ timeout=0.5)
+
+ # Device's ports should be INACTIVE
+ ports = self.get(path + '/ports')['items']
+ #self.assertEqual(len(ports), 2)
+ self.assertEqual(len(ports), 1)
+ for p in ports:
+ self.assertEqual(p['admin_state'], 'DISABLED')
+ self.assertEqual(p['oper_status'], 'UNKNOWN')
+
+ def delete_device(self, id):
+ path = '/api/v1/devices/{}'.format(id)
+ self.delete(path + '/delete', expected_code=200)
+ device = self.get(path, expected_code=404)
+ self.assertIsNone(device)
+
+ def assert_no_device_present(self):
+ path = '/api/v1/devices'
+ devices = self.get(path)['items']
+ self.assertEqual(devices, [])
+
+ def assert_no_logical_device(self):
+ path = '/api/v1/logical_devices'
+ ld = self.get(path)['items']
+ self.assertEqual(ld, [])
diff --git a/voltha/adapters/asfvolt16_olt/asfvolt16_device_handler.py b/voltha/adapters/asfvolt16_olt/asfvolt16_device_handler.py
index 90d123b..2ca9d81 100644
--- a/voltha/adapters/asfvolt16_olt/asfvolt16_device_handler.py
+++ b/voltha/adapters/asfvolt16_olt/asfvolt16_device_handler.py
@@ -19,611 +19,144 @@
"""
from uuid import uuid4
-import structlog
-from twisted.internet.defer import inlineCallbacks, DeferredQueue
-from twisted.internet import reactor
-from scapy.layers.l2 import Packet, Ether, Dot1Q
-from common.frameio.frameio import BpfProgramFilter, hexify
-from voltha.registry import registry
-from voltha.protos.common_pb2 import OperStatus, ConnectStatus, \
- AdminState
-from voltha.protos.device_pb2 import Port, Device
+from voltha.protos.common_pb2 import OperStatus, ConnectStatus
+from voltha.protos.device_pb2 import Port
+from voltha.protos.common_pb2 import AdminState
from voltha.protos.logical_device_pb2 import LogicalPort, LogicalDevice
from voltha.protos.openflow_13_pb2 import OFPPS_LIVE, OFPPF_FIBER, \
OFPPF_1GB_FD, OFPC_GROUP_STATS, OFPC_PORT_STATS, OFPC_TABLE_STATS, \
- OFPC_FLOW_STATS, OFPXMC_OPENFLOW_BASIC, \
- ofp_switch_features, ofp_desc, ofp_port
+ OFPC_FLOW_STATS, ofp_switch_features, ofp_desc, ofp_port
from voltha.core.logical_device_agent import mac_str_to_tuple
-import voltha.core.flow_decomposer as fd
-from voltha.adapters.asfvolt16_olt.asfvolt16_rx_handler import Asfvolt16RxHandler
+from voltha.adapters.asfvolt16_olt.bal import Bal
+from voltha.adapters.device_handler import OltDeviceHandler
-log = structlog.get_logger()
-
-PACKET_IN_VLAN = 4091
-is_inband_frame = BpfProgramFilter('(ether[14:2] & 0xfff) = 0x{:03x}'.format(
- PACKET_IN_VLAN))
-
-class Asfvolt16Handler(object):
+class Asfvolt16Handler(OltDeviceHandler):
def __init__(self, adapter, device_id):
- self.adapter = adapter
- self.adapter_agent = adapter.adapter_agent
- self.device_id = device_id
- self.log = structlog.get_logger(device_id=device_id)
- self.io_port = None
- self.logical_device_id = None
- self.interface = registry('main').get_args().interface
- self.onu_discovered_queue = DeferredQueue()
- self.rx_handler = Asfvolt16RxHandler(self.device_id, self.adapter, self.onu_discovered_queue)
- self.heartbeat_count = 0
- self.heartbeat_miss = 0
- self.heartbeat_interval = 1
- self.heartbeat_failed_limit = 3
- self.command_timeout = 5
- self.pm_metrics = None
- self.onus = {}
+ super(Asfvolt16Handler, self).__init__(adapter, device_id)
+ self.bal = Bal(self.log)
- def __del__(self):
- if self.io_port is not None:
- registry('frameio').close_port(self.io_port)
-
- def get_channel(self):
- raise NotImplementedError()
-
- def get_proxy_channel_id_from_onu(self, onu_id):
- return onu_id << 4
-
- def get_onu_from_channel_id(self, channel_id):
- return channel_id >> 4
-
- def get_tunnel_tag_from_onu(self, onu):
- return 1024 + (onu * 16)
-
- def get_onu_from_tunnel_tag(self, tunnel_tag):
- return (tunnel_tag - 1024) / 16
-
- def get_new_onu_id(self, vendor, vendor_specific):
- onu_id = None
- for i in range(0, 63):
- if i not in self.onus:
- onu_id = i
- break
-
- if onu_id is not None:
- self.onus[onu_id] = {'onu_id': onu_id,
- 'vendor': vendor,
- 'vendor_specific': vendor_specific}
- return onu_id
-
- def onu_exists(self, onu_id):
- if onu_id in self.onus:
- self.log.info('onu-exists',
- onu_id=onu_id,
- vendor=self.onus[onu_id]['vendor'],
- vendor_specific=self.onus[onu_id]['vendor_specific'])
- return self.onus[onu_id]['vendor'], self.onus[onu_id]['vendor_specific']
- else:
- self.log.info('onu-does-not-exist', onu_id=onu_id)
- return None, None
-
- def onu_serial_exists(self, sn_vendor, sn_vendor_specific):
- for key, value in self.onus.iteritems():
- if sn_vendor in value.itervalues() and sn_vendor_specific in value.itervalues():
- self.log.info('onu-serial-number-exists',
- onu_id=value['onu_id'],
- vendor=sn_vendor,
- vendor_specific=sn_vendor_specific,
- onus=self.onus)
- return value['onu_id']
-
- self.log.info('onu-serial-number-does-not-exist',
- vendor=sn_vendor,
- vendor_specific=sn_vendor_specific,
- onus=self.onus)
- return None
-
- @inlineCallbacks
- def send_set_remote(self):
- self.log.info('setting-remote-ip-port')
- raise NotImplementedError()
-
- @inlineCallbacks
- def send_config_classifier(self, olt_no, etype, ip_proto=None,
- dst_port=None):
- self.log.info('configuring-classifier',
- olt=olt_no,
- etype=etype,
- ip_proto=ip_proto,
- dst_port=dst_port)
- raise NotImplementedError()
-
- @inlineCallbacks
- def send_config_acflow(self, olt_no, onu_no, etype, ip_proto=None,
- dst_port=None):
- self.log.info('configuring-acflow',
- olt=olt_no,
- onu=onu_no,
- etype=etype,
- ip_proto=ip_proto,
- dst_port=dst_port)
- raise NotImplementedError()
-
- @inlineCallbacks
- def send_connect_olt(self, olt_no):
- self.log.info('connecting-to-olt', olt=olt_no)
- raise NotImplementedError()
-
- @inlineCallbacks
- def send_activate_olt(self, olt_no):
- self.log.info('activating-olt', olt=olt_no)
- raise NotImplementedError()
-
- @inlineCallbacks
- def send_create_onu(self, olt_no, onu_no, serial_no, vendor_no):
- self.log.info('creating-onu',
- olt=olt_no,
- onu=onu_no,
- serial=serial_no,
- vendor=vendor_no)
- raise NotImplementedError()
-
- @inlineCallbacks
- def send_configure_alloc_id(self, olt_no, onu_no, alloc_id):
- self.log.info('configuring-alloc-id',
- olt=olt_no,
- onu=onu_no,
- alloc_id=alloc_id)
- raise NotImplementedError()
-
- @inlineCallbacks
- def send_configure_unicast_gem(self, olt_no, onu_no, uni_gem):
- self.log.info('configuring-unicast-gem',
- olt=olt_no,
- onu=onu_no,
- unicast_gem_port=uni_gem)
- raise NotImplementedError()
-
- @inlineCallbacks
- def send_configure_multicast_gem(self, olt_no, onu_no, multi_gem):
- self.log.info('configuring-multicast-gem',
- olt=olt_no,
- onu=onu_no,
- multicast_gem_port=multi_gem)
- raise NotImplementedError()
-
- @inlineCallbacks
- def send_configure_onu(self, olt_no, onu_no, alloc_id, uni_gem, multi_gem):
- self.log.info('configuring-onu',
- olt=olt_no,
- onu=onu_no,
- alloc_id=alloc_id,
- unicast_gem_port=uni_gem,
- multicast_gem_port=multi_gem)
- raise NotImplementedError()
-
- @inlineCallbacks
- def send_activate_onu(self, olt_no, onu_no):
- self.log.info('activating-onu', olt=olt_no, onu=onu_no)
- raise NotImplementedError()
-
- @inlineCallbacks
- def heartbeat(self, device_id, state='run'):
- """Heartbeat OLT hardware
-
- Call remote method 'heartbeat' to verify connectivity to OLT
- hardware. If heartbeat missed self.heartbeat_failed_limit times OLT
- adapter is set FAILED/UNREACHABLE.
- No further action from VOLTHA core is expected as result of heartbeat
- failure. Heartbeat continues following failure and once connectivity is
- restored adapter state will be set to ACTIVE/REACHABLE
-
- Arguments:
- device_id: adapter device id
- state: desired state (stop, start, run)
- """
-
- self.log.debug('olt-heartbeat', device=device_id, state=state,
- count=self.heartbeat_count)
- raise NotImplementedError()
-
- @inlineCallbacks
- def arrive_onu(self):
- self.log.info('arrive-onu waiting')
- _data = yield self.onu_discovered_queue.get()
-
- ok_to_arrive = False
- olt_id = _data['_device_id']
- pon_id = _data['_pon_id']
- onu_id = self.onu_serial_exists(_data['_vendor_id'], _data['_vendor_specific'])
- self.log.info('arrive-onu-detected', olt_id=olt_id, pon_ni=pon_id, onu_data=_data, onus=self.onus)
-
- if _data['onu_id'] == 65535:
- if onu_id is not None:
- self.log.info('onu-activation-already-in-progress',
- vendor=_data['_vendor_id'],
- vendor_specific=_data['_vendor_specific'],
- onus=self.onus)
- else:
- onu_id = self.get_new_onu_id(_data['_vendor_id'],
- _data['_vendor_specific'])
- self.log.info('assigned-onu-id',
- onu_id=onu_id,
- vendor=_data['_vendor_id'],
- vendor_specific=_data['_vendor_specific'],
- onus=self.onus)
- ok_to_arrive = True
- else:
- vendor_id, vendor_specific = self.onu_exists(_data['onu_id'])
- if vendor_id is not None and vendor_id == _data['_vendor_id'] and \
- vendor_specific is not None and vendor_specific == _data['_vendor_specific']:
- onu_id = _data['onu_id']
- self.log.info('re-discovered-existing-onu',
- onu_id=onu_id,
- vendor=_data['_vendor_id'],
- vendor_specific=_data['_vendor_specific'])
- ok_to_arrive = True
- else:
- self.log.info('onu-id-serial-number-mismatch-detected',
- onu_id=onu_id,
- vendor_id=vendor_id,
- new_vendor_id=_data['_vendor_id'],
- vendor_specific=vendor_specific,
- new_vendor_specific=_data['_vendor_specific'])
-
- if onu_id is not None and ok_to_arrive:
- self.log.info('arriving-onu', onu_id=onu_id)
- tunnel_tag = self.get_tunnel_tag_from_onu(onu_id)
- yield self.send_create_onu(pon_id,
- onu_id,
- _data['_vendor_id'],
- _data['_vendor_specific'])
- yield self.send_configure_alloc_id(pon_id, onu_id, tunnel_tag)
- yield self.send_configure_unicast_gem(pon_id, onu_id, tunnel_tag)
- yield self.send_configure_multicast_gem(pon_id, onu_id, 4000)
- yield self.send_activate_onu(pon_id, onu_id)
-
- self.adapter_agent.child_device_detected(
- parent_device_id=self.device_id,
- parent_port_no=100,
- child_device_type='broadcom_onu',
- proxy_address=Device.ProxyAddress(
- device_id=self.device_id,
- channel_id=self.get_proxy_channel_id_from_onu(onu_id), # c-vid
- onu_id=onu_id,
- onu_session_id=tunnel_tag # tunnel_tag/gem_port, alloc_id
- ),
- vlan=tunnel_tag,
- serial_number=_data['_vendor_specific']
- )
-
- reactor.callLater(1, self.arrive_onu)
-
- @inlineCallbacks
- def activate(self):
- device = self.adapter_agent.get_device(self.device_id)
+ def activate(self, device):
+ '''
+ TODO: Revisit how to determine the NNI port number.
+ Hardcoding for now.
+ '''
+ port_no = 1
self.log.info('activating-olt', device=device)
- while self.onu_discovered_queue.pending:
- _ = yield self.onu_discovered_queue.get()
-
- if self.logical_device_id is None:
- if not device.ipv4_address:
- device.oper_status = OperStatus.FAILED
- device.reason = 'No ipv4_address field provided'
- self.adapter_agent.update_device(device)
- return
-
- device.root = True
- device.vendor = 'Broadcom'
- device.model = 'bcm68620'
- device.serial_number = device.ipv4_address
- self.adapter_agent.update_device(device)
-
- nni_port = Port(
- port_no=1,
- label='NNI facing Ethernet port',
- type=Port.ETHERNET_NNI,
- admin_state=AdminState.ENABLED,
- oper_status=OperStatus.ACTIVE
- )
- self.adapter_agent.add_port(self.device_id, nni_port)
- self.adapter_agent.add_port(self.device_id, Port(
- port_no=100,
- label='PON port',
- type=Port.PON_OLT,
- admin_state=AdminState.ENABLED,
- oper_status=OperStatus.ACTIVE
- ))
-
- ld = LogicalDevice(
- # not setting id and datapth_id will let the adapter
- # agent pick id
- desc=ofp_desc(
- mfr_desc='cord project',
- hw_desc='n/a',
- sw_desc='logical device for Edgecore ASFvOLT16 OLT',
- serial_num=uuid4().hex,
- dp_desc='n/a'
- ),
- switch_features=ofp_switch_features(
- n_buffers=256, # TODO fake for now
- n_tables=2, # TODO ditto
- capabilities=( # TODO and ditto
- OFPC_FLOW_STATS
- | OFPC_TABLE_STATS
- | OFPC_PORT_STATS
- | OFPC_GROUP_STATS
- )
- ),
- root_device_id=self.device_id
- )
- ld_initialized = self.adapter_agent.create_logical_device(ld)
- cap = OFPPF_1GB_FD | OFPPF_FIBER
- self.adapter_agent.add_logical_port(ld_initialized.id, LogicalPort(
- id='nni',
- ofp_port=ofp_port(
- port_no=0, # is 0 OK?
- hw_addr=mac_str_to_tuple('00:00:00:00:00:%02x' % 129),
- name='nni',
- config=0,
- state=OFPPS_LIVE,
- curr=cap,
- advertised=cap,
- peer=cap,
- curr_speed=OFPPF_1GB_FD,
- max_speed=OFPPF_1GB_FD
- ),
- device_id=self.device_id,
- device_port_no=nni_port.port_no,
- root_port=True
- ))
-
- device.parent_id = ld_initialized.id
- device.connect_status = ConnectStatus.UNREACHABLE
- device.oper_status = OperStatus.ACTIVATING
- self.adapter_agent.update_device(device)
- self.logical_device_id = ld_initialized.id
-
- self.log.info('initiating-connection-to-olt',
- device_id=self.device_id,
- ipv4=device.ipv4_address,
- port=self.pbc_port)
- try:
- reactor.connectTCP(device.ipv4_address, self.pbc_port, self.pbc_factory)
- device.connect_status = ConnectStatus.REACHABLE
- device.oper_status = OperStatus.ACTIVE
- device.reason = ''
- self.adapter_agent.update_device(device)
- except Exception as e:
- self.log.info('get-channel-exception', exc=str(e))
- device.oper_status = OperStatus.FAILED
- device.reason = 'Failed to connect to OLT'
- self.adapter_agent.update_device(device)
- self.pbc_factory.stopTrying()
- reactor.callLater(5, self.activate, device)
+ if self.logical_device_id is not None:
return
- self.log.info('connected-to-olt',
- device_id=self.device_id,
- ipv4=device.ipv4_address,
- port=self.pbc_port)
+ if not device.host_and_port:
+ device.oper_status = OperStatus.FAILED
+ device.reason = 'No host_and_port field provided'
+ self.adapter_agent.update_device(device)
+ return
- reactor.callLater(0, self.heartbeat, self.device_id, state='start')
+ device.root = True
+ device.vendor = 'Edgecore'
+ device.model = 'ASFvOLT16'
+ device.serial_number = device.host_and_port
+ self.adapter_agent.update_device(device)
- yield self.send_set_remote()
- yield self.send_connect_olt(0)
- yield self.send_activate_olt(0)
+ self.add_port(port_no=port_no, port_type=Port.ETHERNET_NNI)
+ self.logical_device_id = self.add_logical_device(device_id=device.id)
+ self.add_logical_port(port_no=port_no,
+ port_type=Port.ETHERNET_NNI,
+ device_id=device.id,
+ logical_device_id=self.logical_device_id)
- # Open the frameio port to receive in-band packet_in messages
- self.log.info('registering-frameio')
- self.io_port = registry('frameio').open_port(
- self.interface, self.rcv_io, is_inband_frame)
+ self.bal.connect_olt(device.host_and_port)
+ # TODO - Add support for multiple OLT devices.
+ # Only single OLT is supported currently and it id is
+ # hard-coded to 0.
+ self.bal.activate_olt(olt_id=0)
- # Finally set the initial PM configuration for this device
- # TODO: if arrive_onu not working, the following PM stuff was commented out during testing
- '''
- self.pm_metrics=Asfvolt16PmMetrics(device)
- pm_config = self.pm_metrics.make_proto()
- log.info("initial-pm-config", pm_config=pm_config)
- self.adapter_agent.update_device_pm_config(pm_config,init=True)
+ device = self.adapter_agent.get_device(device.id)
+ device.parent_id = self.logical_device_id
+ device.connect_status = ConnectStatus.REACHABLE
+ device.oper_status = OperStatus.ACTIVATING
+ self.adapter_agent.update_device(device)
- # Apply the PM configuration
- self.update_pm_metrics(device, pm_config)
- '''
+ def add_port(self, port_no, port_type):
+ self.log.info('adding-port', port_no=port_no, port_type=port_type)
+ if port_type is Port.ETHERNET_NNI:
+ label='NNI facing Ethernet port'
+ else:
+ self.log.erro('invalid-port-type', port_type=port_type)
+ return
- reactor.callLater(1, self.arrive_onu)
-
- self.log.info('olt-activated', device=device)
-
- def rcv_io(self, port, frame):
- self.log.info('received', iface_name=port.iface_name,
- frame_len=len(frame))
- pkt = Ether(frame)
- if pkt.haslayer(Dot1Q):
- outer_shim = pkt.getlayer(Dot1Q)
- if isinstance(outer_shim.payload, Dot1Q):
- inner_shim = outer_shim.payload
- cvid = inner_shim.vlan
- logical_port = cvid
- popped_frame = (
- Ether(src=pkt.src, dst=pkt.dst, type=inner_shim.type) /
- inner_shim.payload
- )
- kw = dict(
- logical_device_id=self.logical_device_id,
- logical_port_no=logical_port,
- )
- self.log.info('sending-packet-in', **kw)
- self.adapter_agent.send_packet_in(
- packet=str(popped_frame), **kw)
-
- @inlineCallbacks
- def update_flow_table(self, flows):
- self.log.info('bulk-flow-update', device_id=self.device_id, flows=flows)
-
- def is_downstream(port):
- return not is_upstream(port)
-
- def is_upstream(port):
- return port == 100 # Need a better way
-
- for flow in flows:
- _type = None
- _ip_proto = None
- _port = None
- _vlan_vid = None
- _udp_dst = None
- _udp_src = None
- _ipv4_dst = None
- _ipv4_src = None
- _metadata = None
- _output = None
- _push_tpid = None
- _field = None
-
- try:
- _in_port = fd.get_in_port(flow)
- assert _in_port is not None
-
- if is_downstream(_in_port):
- self.log.info('downstream-flow')
- elif is_upstream(_in_port):
- self.log.info('upstream-flow')
- else:
- raise Exception('port should be 1 or 2 by our convention')
-
- _out_port = fd.get_out_port(flow) # may be None
- self.log.info('out-port', out_port=_out_port)
-
- for field in fd.get_ofb_fields(flow):
-
- if field.type == fd.ETH_TYPE:
- _type = field.eth_type
- self.log.info('field-type-eth-type',
- eth_type=_type)
-
- elif field.type == fd.IP_PROTO:
- _ip_proto = field.ip_proto
- self.log.info('field-type-ip-proto',
- ip_proto=_ip_proto)
-
- elif field.type == fd.IN_PORT:
- _port = field.port
- self.log.info('field-type-in-port',
- in_port=_port)
-
- elif field.type == fd.VLAN_VID:
- _vlan_vid = field.vlan_vid & 0xfff
- self.log.info('field-type-vlan-vid',
- vlan=_vlan_vid)
-
- elif field.type == fd.VLAN_PCP:
- _vlan_pcp = field.vlan_pcp
- self.log.info('field-type-vlan-pcp',
- pcp=_vlan_pcp)
-
- elif field.type == fd.UDP_DST:
- _udp_dst = field.udp_dst
- self.log.info('field-type-udp-dst',
- udp_dst=_udp_dst)
-
- elif field.type == fd.UDP_SRC:
- _udp_src = field.udp_src
- self.log.info('field-type-udp-src',
- udp_src=_udp_src)
-
- elif field.type == fd.IPV4_DST:
- _ipv4_dst = field.ipv4_dst
- self.log.info('field-type-ipv4-dst',
- ipv4_dst=_ipv4_dst)
-
- elif field.type == fd.IPV4_SRC:
- _ipv4_src = field.ipv4_src
- self.log.info('field-type-ipv4-src',
- ipv4_dst=_ipv4_src)
-
- elif field.type == fd.METADATA:
- _metadata = field.table_metadata
- self.log.info('field-type-metadata',
- metadata=_metadata)
-
- else:
- raise NotImplementedError('field.type={}'.format(
- field.type))
-
- for action in fd.get_actions(flow):
-
- if action.type == fd.OUTPUT:
- _output = action.output.port
- self.log.info('action-type-output',
- output=_output, in_port=_in_port)
-
- elif action.type == fd.POP_VLAN:
- self.log.info('action-type-pop-vlan',
- in_port=_in_port)
-
- elif action.type == fd.PUSH_VLAN:
- _push_tpid = action.push.ethertype
- log.info('action-type-push-vlan',
- push_tpid=_push_tpid, in_port=_in_port)
- if action.push.ethertype != 0x8100:
- self.log.error('unhandled-tpid',
- ethertype=action.push.ethertype)
-
- elif action.type == fd.SET_FIELD:
- _field = action.set_field.field.ofb_field
- assert (action.set_field.field.oxm_class ==
- OFPXMC_OPENFLOW_BASIC)
- self.log.info('action-type-set-field',
- field=_field, in_port=_in_port)
- if _field.type == fd.VLAN_VID:
- self.log.info('set-field-type-vlan-vid',
- vlan_vid=_field.vlan_vid & 0xfff)
- else:
- self.log.error('unsupported-action-set-field-type',
- field_type=_field.type)
- else:
- log.error('unsupported-action-type',
- action_type=action.type, in_port=_in_port)
-
- if is_upstream(_in_port) and \
- (_type == 0x888e or
- (_type == 0x800 and (_ip_proto == 2 or _ip_proto == 17))):
- yield self.send_config_classifier(0, _type, _ip_proto, _udp_dst)
- yield self.send_config_acflow(0, _in_port, _type, _ip_proto, _udp_dst)
-
-
-
- except Exception as e:
- log.exception('failed-to-install-flow', e=e, flow=flow)
-
- @inlineCallbacks
- def send_proxied_message(self, proxy_address, msg):
- if isinstance(msg, Packet):
- msg = str(msg)
-
- self.log.info('send-proxied-message',
- proxy_address=proxy_address.channel_id,
- msg=msg)
- raise NotImplementedError()
-
- def packet_out(self, egress_port, msg):
- self.log.info('sending-packet-out',
- egress_port=egress_port,
- msg=hexify(msg))
-
- pkt = Ether(msg)
- out_pkt = (
- Ether(src=pkt.src, dst=pkt.dst) /
- Dot1Q(vlan=4091) /
- Dot1Q(vlan=egress_port, type=pkt.type) /
- pkt.payload
+ port = Port(
+ port_no=port_no,
+ label=label,
+ type=port_type,
+ admin_state=AdminState.ENABLED,
+ oper_status=OperStatus.ACTIVE
)
- self.io_port.send(str(out_pkt))
+ self.adapter_agent.add_port(self.device_id, port)
- @inlineCallbacks
- def update_pm_metrics(self, device, pm_config):
- self.log.info('update-pm-metrics', device_id=self.device_id,
- pm_config=pm_config)
- remote = yield self.get_channel()
- self.pm_metrics.update(device, pm_config, remote)
+ def add_logical_device(self, device_id):
+ self.log.info('adding-logical-device', device_id=device_id)
+ ld = LogicalDevice(
+ # not setting id and datapth_id will let the adapter
+ # agent pick id
+ desc=ofp_desc(
+ mfr_desc='cord project',
+ hw_desc='n/a',
+ sw_desc='logical device for Edgecore ASFvOLT16 OLT',
+ serial_num=uuid4().hex,
+ dp_desc='n/a'
+ ),
+ switch_features=ofp_switch_features(
+ n_buffers=256, # TODO fake for now
+ n_tables=2, # TODO ditto
+ capabilities=( # TODO and ditto
+ OFPC_FLOW_STATS
+ | OFPC_TABLE_STATS
+ | OFPC_PORT_STATS
+ | OFPC_GROUP_STATS
+ )
+ ),
+ root_device_id=device_id
+ )
+ ld_initialized = self.adapter_agent.create_logical_device(ld)
+ return ld_initialized.id
+
+ def add_logical_port(self, port_no, port_type, device_id, logical_device_id):
+ self.log.info('adding-logical-port', port_no=port_no, port_type=port_type, device_id=device_id)
+ if port_type is Port.ETHERNET_NNI:
+ label='nni'
+ cap = OFPPF_1GB_FD | OFPPF_FIBER
+ curr_speed=OFPPF_1GB_FD
+ max_speed=OFPPF_1GB_FD
+ else:
+ self.log.erro('invalid-port-type', port_type=port_type)
+ return
+
+ ofp=ofp_port(
+ port_no=0, # is 0 OK?
+ hw_addr=mac_str_to_tuple('00:00:00:00:00:%02x' % 129),
+ name=label,
+ config=0,
+ state=OFPPS_LIVE,
+ curr=cap,
+ advertised=cap,
+ peer=cap,
+ curr_speed=curr_speed,
+ max_speed=max_speed)
+
+ logical_port = LogicalPort(
+ id=label,
+ ofp_port=ofp,
+ device_id=device_id,
+ device_port_no=port_no,
+ root_port=True
+ )
+
+ self.adapter_agent.add_logical_port(logical_device_id, logical_port)
+
+def disable(self):
+ super(Asfvolt16Handler, self).disable()
+
+def delete(self):
+ super(Asfvolt16Handler, self).delete()
diff --git a/voltha/adapters/asfvolt16_olt/bal.py b/voltha/adapters/asfvolt16_olt/bal.py
new file mode 100644
index 0000000..3ce9ceb
--- /dev/null
+++ b/voltha/adapters/asfvolt16_olt/bal.py
@@ -0,0 +1,51 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from twisted.internet.defer import inlineCallbacks
+
+from voltha.adapters.asfvolt16_olt.protos import bal_pb2, bal_obj_pb2, \
+ bal_model_types_pb2
+from voltha.adapters.asfvolt16_olt.grpc_client import GrpcClient
+
+class Bal(object):
+ def __init__(self, log):
+ self.log = log
+ self.grpc_client = GrpcClient(self.log)
+
+ @inlineCallbacks
+ def connect_olt(self, host_and_port):
+ self.log.info('connecting-olt', host_and_port=host_and_port)
+ self.grpc_client.connect(host_and_port)
+ self.stub = bal_pb2.BalStub(self.grpc_client.channel)
+ init = bal_pb2.BalInit()
+ '''
+ TODO: Need to determine out what information
+ needs to be sent to the OLT at this stage.
+ '''
+ yield self.stub.BalApiInit(init)
+
+ def activate_olt(self, olt_id):
+ self.log.info('activating-olt')
+ self.set_access_terminal_admin_state(bal_model_types_pb2.BAL_STATE_UP, olt_id)
+
+ @inlineCallbacks
+ def set_access_terminal_admin_state(self, admin_state, olt_id):
+ self.log.info('setting-admin-state', admin_state=admin_state, olt_id=olt_id)
+ cfg = bal_pb2.BalCfg()
+ cfg.hdr.type = bal_obj_pb2.BAL_OBJ_MSG_TYPE_SET
+ cfg.cfg.key.access_term_id = olt_id
+ cfg.cfg.data.admin_state = admin_state
+ yield self.stub.BalCfgSet(cfg)
diff --git a/voltha/adapters/asfvolt16_olt/grpc_client.py b/voltha/adapters/asfvolt16_olt/grpc_client.py
new file mode 100644
index 0000000..41f46f6
--- /dev/null
+++ b/voltha/adapters/asfvolt16_olt/grpc_client.py
@@ -0,0 +1,43 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import grpc
+from grpc._channel import _Rendezvous
+
+class GrpcClient(object):
+ def __init__(self, log):
+ self.channel = None
+ self.connected = False
+ self.log = log
+
+ def connect(self, endpoint):
+ if self.connected:
+ return
+ try:
+ self.log.info('insecurely-connecting', endpoint=endpoint)
+ self.channel = grpc.insecure_channel(endpoint)
+ self.connected = True
+ self.log.info('insecurely-connected', endpoint=endpoint)
+ return
+
+ except _Rendezvous, e:
+ if e.code() == grpc.StatusCode.UNAVAILABLE:
+ self.log.info('grpc-endpoint-not-available')
+ else:
+ self.log.exception(e)
+
+ except Exception, e:
+ self.log.exception('cannot-connect', endpoint=endpoint)
diff --git a/voltha/adapters/asfvolt16_olt/protos/__init__.py b/voltha/adapters/asfvolt16_olt/protos/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/voltha/adapters/asfvolt16_olt/protos/__init__.py
diff --git a/voltha/adapters/asfvolt16_olt/protos/bal_obj.proto b/voltha/adapters/asfvolt16_olt/protos/bal_obj.proto
index c3cbb78..d528be3 100644
--- a/voltha/adapters/asfvolt16_olt/protos/bal_obj.proto
+++ b/voltha/adapters/asfvolt16_olt/protos/bal_obj.proto
@@ -40,28 +40,28 @@
/** Helper type to determine what the data format of a message should look like */
enum Bal_mgt_group
{
- BCMBAL_MGT_GROUP_KEY = 0; /**< Key that uniquely identifies object instance */
- BCMBAL_MGT_GROUP_CFG = 1; /**< Configuration (get/set/clear) */
- BCMBAL_MGT_GROUP_STAT = 2; /**< Statistics */
- BCMBAL_MGT_GROUP_AUTO = 3; /**< Autonomous indications */
- BCMBAL_MGT_GROUP_AUTO_CFG = 4; /**< Autonomous indication configuration */
- BCMBAL_MGT_GROUP__NUM_OF = 5;
+ BAL_MGT_GROUP_KEY = 0; /**< Key that uniquely identifies object instance */
+ BAL_MGT_GROUP_CFG = 1; /**< Configuration (get/set/clear) */
+ BAL_MGT_GROUP_STAT = 2; /**< Statistics */
+ BAL_MGT_GROUP_AUTO = 3; /**< Autonomous indications */
+ BAL_MGT_GROUP_AUTO_CFG = 4; /**< Autonomous indication configuration */
+ BAL_MGT_GROUP__NUM_OF = 5;
}
/** Object message type. Can be a combination of flags. */
enum BalObj_msg_type
{
- BCMBAL_OBJ_MSG_TYPE_INVALID = 0;
- BCMBAL_OBJ_MSG_TYPE_GET = 0x01; /**< Get configuration parameters */
- BCMBAL_OBJ_MSG_TYPE_SET = 0x02; /**< Set configuration parameters */
- BCMBAL_OBJ_MSG_TYPE_CLEAR = 0x04; /**< Clear configuration parameters */
+ BAL_OBJ_MSG_TYPE_INVALID = 0;
+ BAL_OBJ_MSG_TYPE_GET = 0x01; /**< Get configuration parameters */
+ BAL_OBJ_MSG_TYPE_SET = 0x02; /**< Set configuration parameters */
+ BAL_OBJ_MSG_TYPE_CLEAR = 0x04; /**< Clear configuration parameters */
}
/** Object message direction - request or response. */
enum BalObj_msg_dir
{
- BCMBAL_OBJ_MSG_DIR_REQUEST = 0;
- BCMBAL_OBJ_MSG_DIR_RESPONS = 1;
+ BAL_OBJ_MSG_DIR_REQUEST = 0;
+ BAL_OBJ_MSG_DIR_RESPONS = 1;
}
/** Information common to all BAL objects */
diff --git a/voltha/adapters/asfvolt16_olt/sim.py b/voltha/adapters/asfvolt16_olt/sim.py
new file mode 100755
index 0000000..9df3669
--- /dev/null
+++ b/voltha/adapters/asfvolt16_olt/sim.py
@@ -0,0 +1,100 @@
+#!/usr/bin/env python
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import structlog
+import argparse
+import time
+
+from concurrent import futures
+import grpc
+
+from voltha.adapters.asfvolt16_olt.protos.bal_pb2 import BalServicer, add_BalServicer_to_server, BalErr
+from voltha.adapters.asfvolt16_olt.protos.bal_errno_pb2 import BAL_ERR_OK
+
+log = structlog.get_logger()
+
+_ONE_DAY_IN_SECONDS = 60 * 60 * 24
+
+class BalHandler(BalServicer):
+ def __init__(self):
+ pass
+
+ def BalApiInit(self, request, context):
+ log.info('olt-connection-successful', request=request)
+ return BalErr(err=BAL_ERR_OK)
+
+ def BalApiFinish(self, request, context):
+ log.info('BalApi', request=request)
+ return BalErr(err=BAL_ERR_OK)
+
+ def BalCfgSet(self, request, context):
+ log.info('olt-activation-successful', request=request)
+ return BalErr(err=BAL_ERR_OK)
+
+ def BalAccessTerminalCfgSet(self, request, context):
+ log.info('olt-activation-successful', request=request)
+ return BalErr(err=BAL_ERR_OK)
+
+ def BalCfgClear(self, request, context):
+ log.info('BalCfClear', request=request)
+ return BalErr(err=BAL_ERR_OK)
+
+class GrpcServer(object):
+ def __init__(self, port):
+ self.port = port
+ self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+
+ def start(self):
+ log.debug('starting')
+ add_BalServicer_to_server(BalHandler(), self.server)
+ self.server.add_insecure_port('[::]:%s' % self.port)
+ self.server.start()
+ log.info('started')
+
+ def stop(self, grace=0):
+ log.debug('stopping')
+ self.server.stop(grace)
+ log.info('stopped')
+
+def parse_args():
+ parser = argparse.ArgumentParser()
+ _help = ('port number of the GRPC service exposed by voltha (default: 50599)')
+ parser.add_argument('-g', '--grpc-port',
+ dest='grpc_port',
+ action='store',
+ default=50060,
+ help=_help)
+ args = parser.parse_args()
+ return args
+
+class Main(object):
+ def __init__(self):
+ self.args = parse_args()
+ self.grpc_server = GrpcServer(self.args.grpc_port)
+
+ def start(self):
+ self.grpc_server.start()
+ try:
+ while True:
+ time.sleep(_ONE_DAY_IN_SECONDS)
+ except KeyboardInterrupt:
+ self.stop()
+
+ def stop(self):
+ self.grpc_server.stop(0)
+
+if __name__ == '__main__':
+ Main().start()
diff --git a/voltha/adapters/device_handler.py b/voltha/adapters/device_handler.py
new file mode 100644
index 0000000..a3bc0a6
--- /dev/null
+++ b/voltha/adapters/device_handler.py
@@ -0,0 +1,85 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import structlog
+from voltha.protos.common_pb2 import OperStatus, ConnectStatus, AdminState
+
+class DeviceHandler(object):
+ def __init__(self, adapter, device_id):
+ self.adapter = adapter
+ self.adapter_agent = adapter.adapter_agent
+ self.device_id = device_id
+ self.log = structlog.get_logger(device_id=device_id)
+ self.logical_device_id = None
+
+ def disable(self):
+ self.log.info('disabling', device_id=self.device_id)
+
+ # Get the latest device reference
+ device = self.adapter_agent.get_device(self.device_id)
+
+ # Disable all ports on that device
+ self.adapter_agent.disable_all_ports(self.device_id)
+
+ # Update the operational status to UNKNOWN
+ device.oper_status = OperStatus.UNKNOWN
+ device.connect_status = ConnectStatus.UNREACHABLE
+ self.adapter_agent.update_device(device)
+
+class OltDeviceHandler(DeviceHandler):
+ def __init__(self, adapter, device_id):
+ super(OltDeviceHandler, self).__init__(adapter, device_id)
+
+ def disable(self):
+ super(OltDeviceHandler, self).disable()
+
+ # Remove the logical device
+ logical_device = self.adapter_agent.get_logical_device(
+ self.logical_device_id)
+ self.adapter_agent.delete_logical_device(logical_device)
+
+ # Disable all child devices first
+ self.adapter_agent.update_child_devices_state(self.device_id,
+ admin_state=AdminState.DISABLED)
+
+ # Remove the peer references from this device
+ self.adapter_agent.delete_all_peer_references(self.device_id)
+
+ # Update the logice device mapping
+ if self.logical_device_id in \
+ self.adapter.logical_device_id_to_root_device_id:
+ del self.adapter.logical_device_id_to_root_device_id[
+ self.logical_device_id]
+
+ # TODO:
+ # 1) Remove all flows from the device
+ # 2) Remove the device from ponsim
+
+ self.log.info('disabled', device_id=self.device_id)
+
+ def delete(self):
+ self.log.info('deleting', device_id=self.device_id)
+
+ # Remove all child devices
+ self.adapter_agent.delete_all_child_devices(self.device_id)
+
+ # TODO:
+ # 1) Remove all flows from the device
+ # 2) Remove the device from ponsim
+
+ self.log.info('deleted', device_id=self.device_id)
+
+class OnuDeviceHandler(DeviceHandler):
+ pass