VOL-217 Initial implementation of device handler for asfvolt16
This commit is a copy-paste of the maple_olt device handler.
This is not elegant but it is being pushed to the repo to
enable others to start integrating with the gRPC protobuf
backend. Subsequent commits will introduce a base device
handler class which will abstract out the generic functions.
Change-Id: I7cfb25ece6cce0ab399d4b4083593ff9215c5270
diff --git a/voltha/adapters/asfvolt16_olt/asfvolt16_device_handler.py b/voltha/adapters/asfvolt16_olt/asfvolt16_device_handler.py
new file mode 100644
index 0000000..ae0b41b
--- /dev/null
+++ b/voltha/adapters/asfvolt16_olt/asfvolt16_device_handler.py
@@ -0,0 +1,629 @@
+# Copyright 2017 the original author or authors.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+Asfvolt16 OLT adapter
+from uuid import uuid4
+import structlog
+from twisted.internet.defer import inlineCallbacks, DeferredQueue
+from twisted.internet import reactor
+from scapy.layers.l2 import Packet, Ether, Dot1Q
+from common.frameio.frameio import BpfProgramFilter, hexify
+from voltha.registry import registry
+from voltha.protos.common_pb2 import OperStatus, ConnectStatus, \
+ AdminState
+from voltha.protos.device_pb2 import Port, Device
+from voltha.protos.logical_device_pb2 import LogicalPort, LogicalDevice
+from voltha.protos.openflow_13_pb2 import OFPPS_LIVE, OFPPF_FIBER, \
+ ofp_switch_features, ofp_desc, ofp_port
+from voltha.core.logical_device_agent import mac_str_to_tuple
+import voltha.core.flow_decomposer as fd
+from voltha.adapters.asfvolt16.asfvolt16_rx_handler import Asfvolt16RxHandler
+log = structlog.get_logger()
+is_inband_frame = BpfProgramFilter('(ether[14:2] & 0xfff) = 0x{:03x}'.format(
+class Asfvolt16Handler(object):
+ def __init__(self, adapter, device_id):
+ self.adapter = adapter
+ self.adapter_agent = adapter.adapter_agent
+ self.device_id = device_id
+ self.log = structlog.get_logger(device_id=device_id)
+ self.io_port = None
+ self.logical_device_id = None
+ self.interface = registry('main').get_args().interface
+ self.onu_discovered_queue = DeferredQueue()
+ self.rx_handler = Asfvolt16RxHandler(self.device_id, self.adapter, self.onu_discovered_queue)
+ self.heartbeat_count = 0
+ self.heartbeat_miss = 0
+ self.heartbeat_interval = 1
+ self.heartbeat_failed_limit = 3
+ self.command_timeout = 5
+ self.pm_metrics = None
+ self.onus = {}
+ def __del__(self):
+ if self.io_port is not None:
+ registry('frameio').close_port(self.io_port)
+ def get_channel(self):
+ raise NotImplementedError()
+ def get_proxy_channel_id_from_onu(self, onu_id):
+ return onu_id << 4
+ def get_onu_from_channel_id(self, channel_id):
+ return channel_id >> 4
+ def get_tunnel_tag_from_onu(self, onu):
+ return 1024 + (onu * 16)
+ def get_onu_from_tunnel_tag(self, tunnel_tag):
+ return (tunnel_tag - 1024) / 16
+ def get_new_onu_id(self, vendor, vendor_specific):
+ onu_id = None
+ for i in range(0, 63):
+ if i not in self.onus:
+ onu_id = i
+ break
+ if onu_id is not None:
+ self.onus[onu_id] = {'onu_id': onu_id,
+ 'vendor': vendor,
+ 'vendor_specific': vendor_specific}
+ return onu_id
+ def onu_exists(self, onu_id):
+ if onu_id in self.onus:
+ self.log.info('onu-exists',
+ onu_id=onu_id,
+ vendor=self.onus[onu_id]['vendor'],
+ vendor_specific=self.onus[onu_id]['vendor_specific'])
+ return self.onus[onu_id]['vendor'], self.onus[onu_id]['vendor_specific']
+ else:
+ self.log.info('onu-does-not-exist', onu_id=onu_id)
+ return None, None
+ def onu_serial_exists(self, sn_vendor, sn_vendor_specific):
+ for key, value in self.onus.iteritems():
+ if sn_vendor in value.itervalues() and sn_vendor_specific in value.itervalues():
+ self.log.info('onu-serial-number-exists',
+ onu_id=value['onu_id'],
+ vendor=sn_vendor,
+ vendor_specific=sn_vendor_specific,
+ onus=self.onus)
+ return value['onu_id']
+ self.log.info('onu-serial-number-does-not-exist',
+ vendor=sn_vendor,
+ vendor_specific=sn_vendor_specific,
+ onus=self.onus)
+ return None
+ @inlineCallbacks
+ def send_set_remote(self):
+ self.log.info('setting-remote-ip-port')
+ raise NotImplementedError()
+ @inlineCallbacks
+ def send_config_classifier(self, olt_no, etype, ip_proto=None,
+ dst_port=None):
+ self.log.info('configuring-classifier',
+ olt=olt_no,
+ etype=etype,
+ ip_proto=ip_proto,
+ dst_port=dst_port)
+ raise NotImplementedError()
+ @inlineCallbacks
+ def send_config_acflow(self, olt_no, onu_no, etype, ip_proto=None,
+ dst_port=None):
+ self.log.info('configuring-acflow',
+ olt=olt_no,
+ onu=onu_no,
+ etype=etype,
+ ip_proto=ip_proto,
+ dst_port=dst_port)
+ raise NotImplementedError()
+ @inlineCallbacks
+ def send_connect_olt(self, olt_no):
+ self.log.info('connecting-to-olt', olt=olt_no)
+ raise NotImplementedError()
+ @inlineCallbacks
+ def send_activate_olt(self, olt_no):
+ self.log.info('activating-olt', olt=olt_no)
+ raise NotImplementedError()
+ @inlineCallbacks
+ def send_create_onu(self, olt_no, onu_no, serial_no, vendor_no):
+ self.log.info('creating-onu',
+ olt=olt_no,
+ onu=onu_no,
+ serial=serial_no,
+ vendor=vendor_no)
+ raise NotImplementedError()
+ @inlineCallbacks
+ def send_configure_alloc_id(self, olt_no, onu_no, alloc_id):
+ self.log.info('configuring-alloc-id',
+ olt=olt_no,
+ onu=onu_no,
+ alloc_id=alloc_id)
+ raise NotImplementedError()
+ @inlineCallbacks
+ def send_configure_unicast_gem(self, olt_no, onu_no, uni_gem):
+ self.log.info('configuring-unicast-gem',
+ olt=olt_no,
+ onu=onu_no,
+ unicast_gem_port=uni_gem)
+ raise NotImplementedError()
+ @inlineCallbacks
+ def send_configure_multicast_gem(self, olt_no, onu_no, multi_gem):
+ self.log.info('configuring-multicast-gem',
+ olt=olt_no,
+ onu=onu_no,
+ multicast_gem_port=multi_gem)
+ raise NotImplementedError()
+ @inlineCallbacks
+ def send_configure_onu(self, olt_no, onu_no, alloc_id, uni_gem, multi_gem):
+ self.log.info('configuring-onu',
+ olt=olt_no,
+ onu=onu_no,
+ alloc_id=alloc_id,
+ unicast_gem_port=uni_gem,
+ multicast_gem_port=multi_gem)
+ raise NotImplementedError()
+ @inlineCallbacks
+ def send_activate_onu(self, olt_no, onu_no):
+ self.log.info('activating-onu', olt=olt_no, onu=onu_no)
+ raise NotImplementedError()
+ @inlineCallbacks
+ def heartbeat(self, device_id, state='run'):
+ """Heartbeat OLT hardware
+ Call remote method 'heartbeat' to verify connectivity to OLT
+ hardware. If heartbeat missed self.heartbeat_failed_limit times OLT
+ adapter is set FAILED/UNREACHABLE.
+ No further action from VOLTHA core is expected as result of heartbeat
+ failure. Heartbeat continues following failure and once connectivity is
+ restored adapter state will be set to ACTIVE/REACHABLE
+ Arguments:
+ device_id: adapter device id
+ state: desired state (stop, start, run)
+ """
+ self.log.debug('olt-heartbeat', device=device_id, state=state,
+ count=self.heartbeat_count)
+ raise NotImplementedError()
+ @inlineCallbacks
+ def arrive_onu(self):
+ self.log.info('arrive-onu waiting')
+ _data = yield self.onu_discovered_queue.get()
+ ok_to_arrive = False
+ olt_id = _data['_device_id']
+ pon_id = _data['_pon_id']
+ onu_id = self.onu_serial_exists(_data['_vendor_id'], _data['_vendor_specific'])
+ self.log.info('arrive-onu-detected', olt_id=olt_id, pon_ni=pon_id, onu_data=_data, onus=self.onus)
+ if _data['onu_id'] == 65535:
+ if onu_id is not None:
+ self.log.info('onu-activation-already-in-progress',
+ vendor=_data['_vendor_id'],
+ vendor_specific=_data['_vendor_specific'],
+ onus=self.onus)
+ else:
+ onu_id = self.get_new_onu_id(_data['_vendor_id'],
+ _data['_vendor_specific'])
+ self.log.info('assigned-onu-id',
+ onu_id=onu_id,
+ vendor=_data['_vendor_id'],
+ vendor_specific=_data['_vendor_specific'],
+ onus=self.onus)
+ ok_to_arrive = True
+ else:
+ vendor_id, vendor_specific = self.onu_exists(_data['onu_id'])
+ if vendor_id is not None and vendor_id == _data['_vendor_id'] and \
+ vendor_specific is not None and vendor_specific == _data['_vendor_specific']:
+ onu_id = _data['onu_id']
+ self.log.info('re-discovered-existing-onu',
+ onu_id=onu_id,
+ vendor=_data['_vendor_id'],
+ vendor_specific=_data['_vendor_specific'])
+ ok_to_arrive = True
+ else:
+ self.log.info('onu-id-serial-number-mismatch-detected',
+ onu_id=onu_id,
+ vendor_id=vendor_id,
+ new_vendor_id=_data['_vendor_id'],
+ vendor_specific=vendor_specific,
+ new_vendor_specific=_data['_vendor_specific'])
+ if onu_id is not None and ok_to_arrive:
+ self.log.info('arriving-onu', onu_id=onu_id)
+ tunnel_tag = self.get_tunnel_tag_from_onu(onu_id)
+ yield self.send_create_onu(pon_id,
+ onu_id,
+ _data['_vendor_id'],
+ _data['_vendor_specific'])
+ yield self.send_configure_alloc_id(pon_id, onu_id, tunnel_tag)
+ yield self.send_configure_unicast_gem(pon_id, onu_id, tunnel_tag)
+ yield self.send_configure_multicast_gem(pon_id, onu_id, 4000)
+ yield self.send_activate_onu(pon_id, onu_id)
+ self.adapter_agent.child_device_detected(
+ parent_device_id=self.device_id,
+ parent_port_no=100,
+ child_device_type='broadcom_onu',
+ proxy_address=Device.ProxyAddress(
+ device_id=self.device_id,
+ channel_id=self.get_proxy_channel_id_from_onu(onu_id), # c-vid
+ onu_id=onu_id,
+ onu_session_id=tunnel_tag # tunnel_tag/gem_port, alloc_id
+ ),
+ vlan=tunnel_tag,
+ serial_number=_data['_vendor_specific']
+ )
+ reactor.callLater(1, self.arrive_onu)
+ @inlineCallbacks
+ def activate(self):
+ device = self.adapter_agent.get_device(self.device_id)
+ self.log.info('activating-olt', device=device)
+ while self.onu_discovered_queue.pending:
+ _ = yield self.onu_discovered_queue.get()
+ if self.logical_device_id is None:
+ if not device.ipv4_address:
+ device.oper_status = OperStatus.FAILED
+ device.reason = 'No ipv4_address field provided'
+ self.adapter_agent.update_device(device)
+ return
+ device.root = True
+ device.vendor = 'Broadcom'
+ device.model = 'bcm68620'
+ device.serial_number = device.ipv4_address
+ self.adapter_agent.update_device(device)
+ nni_port = Port(
+ port_no=1,
+ label='NNI facing Ethernet port',
+ type=Port.ETHERNET_NNI,
+ admin_state=AdminState.ENABLED,
+ oper_status=OperStatus.ACTIVE
+ )
+ self.adapter_agent.add_port(self.device_id, nni_port)
+ self.adapter_agent.add_port(self.device_id, Port(
+ port_no=100,
+ label='PON port',
+ type=Port.PON_OLT,
+ admin_state=AdminState.ENABLED,
+ oper_status=OperStatus.ACTIVE
+ ))
+ ld = LogicalDevice(
+ # not setting id and datapth_id will let the adapter
+ # agent pick id
+ desc=ofp_desc(
+ mfr_desc='cord project',
+ hw_desc='n/a',
+ sw_desc='logical device for Edgecore ASFvOLT16 OLT',
+ serial_num=uuid4().hex,
+ dp_desc='n/a'
+ ),
+ switch_features=ofp_switch_features(
+ n_buffers=256, # TODO fake for now
+ n_tables=2, # TODO ditto
+ capabilities=( # TODO and ditto
+ )
+ ),
+ root_device_id=self.device_id
+ )
+ ld_initialized = self.adapter_agent.create_logical_device(ld)
+ self.adapter_agent.add_logical_port(ld_initialized.id, LogicalPort(
+ id='nni',
+ ofp_port=ofp_port(
+ port_no=0, # is 0 OK?
+ hw_addr=mac_str_to_tuple('00:00:00:00:00:%02x' % 129),
+ name='nni',
+ config=0,
+ state=OFPPS_LIVE,
+ curr=cap,
+ advertised=cap,
+ peer=cap,
+ curr_speed=OFPPF_1GB_FD,
+ max_speed=OFPPF_1GB_FD
+ ),
+ device_id=self.device_id,
+ device_port_no=nni_port.port_no,
+ root_port=True
+ ))
+ device.parent_id = ld_initialized.id
+ device.connect_status = ConnectStatus.UNREACHABLE
+ device.oper_status = OperStatus.ACTIVATING
+ self.adapter_agent.update_device(device)
+ self.logical_device_id = ld_initialized.id
+ self.log.info('initiating-connection-to-olt',
+ device_id=self.device_id,
+ ipv4=device.ipv4_address,
+ port=self.pbc_port)
+ try:
+ reactor.connectTCP(device.ipv4_address, self.pbc_port, self.pbc_factory)
+ device.connect_status = ConnectStatus.REACHABLE
+ device.oper_status = OperStatus.ACTIVE
+ device.reason = ''
+ self.adapter_agent.update_device(device)
+ except Exception as e:
+ self.log.info('get-channel-exception', exc=str(e))
+ device.oper_status = OperStatus.FAILED
+ device.reason = 'Failed to connect to OLT'
+ self.adapter_agent.update_device(device)
+ self.pbc_factory.stopTrying()
+ reactor.callLater(5, self.activate, device)
+ return
+ self.log.info('connected-to-olt',
+ device_id=self.device_id,
+ ipv4=device.ipv4_address,
+ port=self.pbc_port)
+ reactor.callLater(0, self.heartbeat, self.device_id, state='start')
+ yield self.send_set_remote()
+ yield self.send_connect_olt(0)
+ yield self.send_activate_olt(0)
+ # Open the frameio port to receive in-band packet_in messages
+ self.log.info('registering-frameio')
+ self.io_port = registry('frameio').open_port(
+ self.interface, self.rcv_io, is_inband_frame)
+ # Finally set the initial PM configuration for this device
+ # TODO: if arrive_onu not working, the following PM stuff was commented out during testing
+ '''
+ self.pm_metrics=Asfvolt16PmMetrics(device)
+ pm_config = self.pm_metrics.make_proto()
+ log.info("initial-pm-config", pm_config=pm_config)
+ self.adapter_agent.update_device_pm_config(pm_config,init=True)
+ # Apply the PM configuration
+ self.update_pm_metrics(device, pm_config)
+ '''
+ reactor.callLater(1, self.arrive_onu)
+ self.log.info('olt-activated', device=device)
+ def rcv_io(self, port, frame):
+ self.log.info('received', iface_name=port.iface_name,
+ frame_len=len(frame))
+ pkt = Ether(frame)
+ if pkt.haslayer(Dot1Q):
+ outer_shim = pkt.getlayer(Dot1Q)
+ if isinstance(outer_shim.payload, Dot1Q):
+ inner_shim = outer_shim.payload
+ cvid = inner_shim.vlan
+ logical_port = cvid
+ popped_frame = (
+ Ether(src=pkt.src, dst=pkt.dst, type=inner_shim.type) /
+ inner_shim.payload
+ )
+ kw = dict(
+ logical_device_id=self.logical_device_id,
+ logical_port_no=logical_port,
+ )
+ self.log.info('sending-packet-in', **kw)
+ self.adapter_agent.send_packet_in(
+ packet=str(popped_frame), **kw)
+ @inlineCallbacks
+ def update_flow_table(self, flows):
+ self.log.info('bulk-flow-update', device_id=self.device_id, flows=flows)
+ def is_downstream(port):
+ return not is_upstream(port)
+ def is_upstream(port):
+ return port == 100 # Need a better way
+ for flow in flows:
+ _type = None
+ _ip_proto = None
+ _port = None
+ _vlan_vid = None
+ _udp_dst = None
+ _udp_src = None
+ _ipv4_dst = None
+ _ipv4_src = None
+ _metadata = None
+ _output = None
+ _push_tpid = None
+ _field = None
+ try:
+ _in_port = fd.get_in_port(flow)
+ assert _in_port is not None
+ if is_downstream(_in_port):
+ self.log.info('downstream-flow')
+ elif is_upstream(_in_port):
+ self.log.info('upstream-flow')
+ else:
+ raise Exception('port should be 1 or 2 by our convention')
+ _out_port = fd.get_out_port(flow) # may be None
+ self.log.info('out-port', out_port=_out_port)
+ for field in fd.get_ofb_fields(flow):
+ if field.type == fd.ETH_TYPE:
+ _type = field.eth_type
+ self.log.info('field-type-eth-type',
+ eth_type=_type)
+ elif field.type == fd.IP_PROTO:
+ _ip_proto = field.ip_proto
+ self.log.info('field-type-ip-proto',
+ ip_proto=_ip_proto)
+ elif field.type == fd.IN_PORT:
+ _port = field.port
+ self.log.info('field-type-in-port',
+ in_port=_port)
+ elif field.type == fd.VLAN_VID:
+ _vlan_vid = field.vlan_vid & 0xfff
+ self.log.info('field-type-vlan-vid',
+ vlan=_vlan_vid)
+ elif field.type == fd.VLAN_PCP:
+ _vlan_pcp = field.vlan_pcp
+ self.log.info('field-type-vlan-pcp',
+ pcp=_vlan_pcp)
+ elif field.type == fd.UDP_DST:
+ _udp_dst = field.udp_dst
+ self.log.info('field-type-udp-dst',
+ udp_dst=_udp_dst)
+ elif field.type == fd.UDP_SRC:
+ _udp_src = field.udp_src
+ self.log.info('field-type-udp-src',
+ udp_src=_udp_src)
+ elif field.type == fd.IPV4_DST:
+ _ipv4_dst = field.ipv4_dst
+ self.log.info('field-type-ipv4-dst',
+ ipv4_dst=_ipv4_dst)
+ elif field.type == fd.IPV4_SRC:
+ _ipv4_src = field.ipv4_src
+ self.log.info('field-type-ipv4-src',
+ ipv4_dst=_ipv4_src)
+ elif field.type == fd.METADATA:
+ _metadata = field.table_metadata
+ self.log.info('field-type-metadata',
+ metadata=_metadata)
+ else:
+ raise NotImplementedError('field.type={}'.format(
+ field.type))
+ for action in fd.get_actions(flow):
+ if action.type == fd.OUTPUT:
+ _output = action.output.port
+ self.log.info('action-type-output',
+ output=_output, in_port=_in_port)
+ elif action.type == fd.POP_VLAN:
+ self.log.info('action-type-pop-vlan',
+ in_port=_in_port)
+ elif action.type == fd.PUSH_VLAN:
+ _push_tpid = action.push.ethertype
+ log.info('action-type-push-vlan',
+ push_tpid=_push_tpid, in_port=_in_port)
+ if action.push.ethertype != 0x8100:
+ self.log.error('unhandled-tpid',
+ ethertype=action.push.ethertype)
+ elif action.type == fd.SET_FIELD:
+ _field = action.set_field.field.ofb_field
+ assert (action.set_field.field.oxm_class ==
+ self.log.info('action-type-set-field',
+ field=_field, in_port=_in_port)
+ if _field.type == fd.VLAN_VID:
+ self.log.info('set-field-type-vlan-vid',
+ vlan_vid=_field.vlan_vid & 0xfff)
+ else:
+ self.log.error('unsupported-action-set-field-type',
+ field_type=_field.type)
+ else:
+ log.error('unsupported-action-type',
+ action_type=action.type, in_port=_in_port)
+ if is_upstream(_in_port) and \
+ (_type == 0x888e or
+ (_type == 0x800 and (_ip_proto == 2 or _ip_proto == 17))):
+ yield self.send_config_classifier(0, _type, _ip_proto, _udp_dst)
+ yield self.send_config_acflow(0, _in_port, _type, _ip_proto, _udp_dst)
+ except Exception as e:
+ log.exception('failed-to-install-flow', e=e, flow=flow)
+ @inlineCallbacks
+ def send_proxied_message(self, proxy_address, msg):
+ if isinstance(msg, Packet):
+ msg = str(msg)
+ self.log.info('send-proxied-message',
+ proxy_address=proxy_address.channel_id,
+ msg=msg)
+ raise NotImplementedError()
+ def packet_out(self, egress_port, msg):
+ self.log.info('sending-packet-out',
+ egress_port=egress_port,
+ msg=hexify(msg))
+ pkt = Ether(msg)
+ out_pkt = (
+ Ether(src=pkt.src, dst=pkt.dst) /
+ Dot1Q(vlan=4091) /
+ Dot1Q(vlan=egress_port, type=pkt.type) /
+ pkt.payload
+ )
+ self.io_port.send(str(out_pkt))
+ @inlineCallbacks
+ def update_pm_metrics(self, device, pm_config):
+ self.log.info('update-pm-metrics', device_id=self.device_id,
+ pm_config=pm_config)
+ remote = yield self.get_channel()
+ self.pm_metrics.update(device, pm_config, remote)
diff --git a/voltha/adapters/asfvolt16_olt/asfvolt16_olt.py b/voltha/adapters/asfvolt16_olt/asfvolt16_olt.py
index 983152a..94ce1c3 100644
--- a/voltha/adapters/asfvolt16_olt/asfvolt16_olt.py
+++ b/voltha/adapters/asfvolt16_olt/asfvolt16_olt.py
@@ -18,7 +18,11 @@
Asfvolt16 OLT adapter
+import structlog
from voltha.adapters.iadapter import OltAdapter
+from voltha.adapters.asfvolt16.asfvolt16_device_handler import Asfvolt16Handler
+log = structlog.get_logger()
class Asfvolt16Adapter(OltAdapter):
def __init__(self, adapter_agent, config):
@@ -30,52 +34,3 @@
# register for adapter messages
-class Asfvolt16Handler(object):
- def __init__(self, adapter, device_id):
- raise NotImplementedError()
- def __del__(self):
- raise NotImplementedError()
- def get_channel(self):
- raise NotImplementedError()
- def _get_nni_port(self):
- raise NotImplementedError()
- def activate(self, device):
- raise NotImplementedError()
- def reconcile(self, device):
- raise NotImplementedError()
- def rcv_io(self, port, frame):
- raise NotImplementedError()
- def update_flow_table(self, flows):
- raise NotImplementedError()
- def update_pm_config(self, device, pm_config):
- raise NotImplementedError()
- def send_proxied_message(self, proxy_address, msg):
- raise NotImplementedError()
- def packet_out(self, egress_port, msg):
- raise NotImplementedError()
- def reboot(self):
- raise NotImplementedError()
- def disable(self):
- raise NotImplementedError()
- def reenable(self):
- raise NotImplementedError()
- def delete(self):
- raise NotImplementedError()
- def start_kpi_collection(self, device_id):
- raise NotImplementedError()
diff --git a/voltha/adapters/asfvolt16_olt/asfvolt16_rx_handler.py b/voltha/adapters/asfvolt16_olt/asfvolt16_rx_handler.py
new file mode 100644
index 0000000..e7c912d
--- /dev/null
+++ b/voltha/adapters/asfvolt16_olt/asfvolt16_rx_handler.py
@@ -0,0 +1,233 @@
+# Copyright 2017 the original author or authors.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+Asfvolt16 OLT adapter
+import structlog
+from twisted.internet.defer import DeferredQueue
+import arrow
+import binascii
+from common.frameio.frameio import hexify
+from voltha.protos.events_pb2 import KpiEvent, MetricValuePairs
+from voltha.protos.events_pb2 import KpiEventType
+from voltha.protos.events_pb2 import AlarmEventType, \
+ AlarmEventSeverity, AlarmEventState, AlarmEventCategory
+log = structlog.get_logger()
+class Asfvolt16RxHandler(object):
+ def __init__(self, device_id, adapter, onu_queue):
+ self.device_id = device_id
+ self.adapter = adapter
+ self.onu_discovered_queue = onu_queue
+ self.adapter_agent = adapter.adapter_agent
+ self.adapter_name = adapter.name
+ self.omci_rx_queue = DeferredQueue()
+ def remote_echo(self, pkt_type, pon, onu, port, crc_ok, msg_size, msg_data):
+ log.info('received-omci-msg',
+ pkt_type=pkt_type,
+ pon_id=pon,
+ onu_id=onu,
+ port_id=port,
+ crc_ok=crc_ok,
+ msg_size=msg_size,
+ msg_data=hexify(msg_data))
+ self.omci_rx_queue.put((onu, msg_data))
+ def receive_omci_msg(self):
+ return self.omci_rx_queue.get()
+ def remote_report_stats(self, _object, key, stats_data):
+ log.info('received-stats-msg',
+ object=_object,
+ key=key,
+ stats=stats_data)
+ prefix = 'voltha.{}.{}'.format(self.adapter_name, self.device_id)
+ try:
+ ts = arrow.utcnow().timestamp
+ prefixes = {
+ prefix + '.nni': MetricValuePairs(metrics=stats_data)
+ }
+ kpi_event = KpiEvent(
+ type=KpiEventType.slice,
+ ts=ts,
+ prefixes=prefixes
+ )
+ self.adapter_agent.submit_kpis(kpi_event)
+ except Exception as e:
+ log.exception('failed-to-submit-kpis', e=e)
+ def remote_report_event(self, _object, key, event, event_data=None):
+ def _convert_serial_data(data):
+ b = bytearray()
+ b.extend(data)
+ return binascii.hexlify(b)
+ log.info('received-event-msg',
+ object=_object,
+ key=key,
+ event_str=event,
+ event_data=event_data)
+ if _object == 'device':
+ # key: {'device_id': <int>}
+ # event: 'state-changed'
+ # event_data: {'state_change_successful': <False|True>,
+ # 'new_state': <str> ('active-working'|'inactive')}
+ pass
+ elif _object == 'nni':
+ # key: {'device_id': <int>, 'nni': <int>}
+ pass
+ elif _object == 'pon_ni':
+ # key: {'device_id': <int>, 'pon_ni': <int>}
+ # event: 'state-changed'
+ # event_data: {'state_change_successful': <False|True>,
+ # 'new_state': <str> ('active-working'|'inactive')}
+ #
+ # event: 'onu-discovered'
+ # event_data: {'serial_num_vendor_id': <str>
+ # 'serial_num_vendor_specific': <str>
+ # 'ranging_time': <int>
+ # 'onu_id': <int>
+ # 'us_line_rate': <int> (0=2.5G, 1=10G)
+ # 'ds_pon_id': <int>
+ # 'us_pon_id': <int>
+ # 'tuning_granularity': <int>
+ # 'step_tuning_time': <int>
+ # 'attenuation': <int>
+ # 'power_levelling_caps': <int>}
+ if 'onu-discovered' == event and event_data is not None:
+ event_data['_device_id'] = key['device_id'] if 'device_id' in key else None
+ event_data['_pon_id'] = key['pon_id'] if 'pon_id' in key else None
+ event_data['_vendor_id'] = _convert_serial_data(event_data['serial_num_vendor_id']) \
+ if 'serial_num_vendor_id' in event_data else None
+ event_data['_vendor_specific'] = _convert_serial_data(event_data['serial_num_vendor_specific']) \
+ if 'serial_num_vendor_specific' in event_data else None
+ self.onu_discovered_queue.put(event_data)
+ log.info('onu-discovered-event-added-to-queue', event_data=event_data)
+ elif _object == 'onu':
+ # key: {'device_id': <int>, 'pon_ni': <int>, 'onu_id': <int>}
+ # event: 'activation-completed'
+ # event_data: {'activation_successful': <False|True>,
+ # act_fail_reason': <str>}
+ #
+ # event: 'deactivation-completed'
+ # event_data: {'deactivation_successful': <False|True>}
+ #
+ # event: 'ranging-completed'
+ # event_data: {'ranging_successful': <False|True>,
+ # 'ranging_fail_reason': <str>,
+ # 'eqd': <int>,
+ # 'number_of_ploams': <int>,
+ # 'power_level': <int>}
+ #
+ # event: 'enable-completed'
+ # event_data: {'serial_num-vendor_id': <str>
+ # 'serial_num-vendor_specific: <str>}
+ #
+ # event: 'disable-completed'
+ # event_data: {'serial_num-vendor_id': <str>
+ # 'serial_num-vendor_specific: <str>}
+ # Get child_device from onu_id
+ child_device = self.adapter_agent.get_child_device(self.device_id, onu_id=key['onu_id'])
+ assert child_device is not None
+ # Build the message, the ONU adapter uses the proxy_address
+ # to uniquely identify a specific ONU
+ msg = {'proxy_address':child_device.proxy_address, 'event':event, 'event_data':event_data}
+ # Send the event message to the ONU adapter
+ self.adapter_agent.publish_inter_adapter_message(child_device.id, msg)
+ elif _object == 'alloc_id':
+ # key: {'device_id': <int>, 'pon_ni': <int>, 'onu_id': <int>, 'alloc_id': ,<int>}
+ pass
+ elif _object == 'gem_port':
+ # key: {'device_id': <int>, 'pon_ni': <int>, 'onu_id': <int>, 'gem_port': ,<int>}
+ pass
+ elif _object == 'trx':
+ # key: {'device_id': <int>, 'pon_ni': <int>}
+ pass
+ elif _object == 'flow_map':
+ # key: {'device_id': <int>, 'pon_ni': <int>}
+ pass
+ def remote_report_alarm(self, _object, key, alarm, status, priority,
+ alarm_data=None):
+ log.info('received-alarm-msg',
+ object=_object,
+ key=key,
+ alarm=alarm,
+ status=status,
+ priority=priority,
+ alarm_data=alarm_data)
+ id = 'voltha.{}.{}.{}'.format(self.adapter_name, self.device_id, _object)
+ description = '{} Alarm - {} - {}'.format(_object.upper(), alarm.upper(),
+ 'Raised' if status else 'Cleared')
+ if priority == 'low':
+ severity = AlarmEventSeverity.MINOR
+ elif priority == 'medium':
+ severity = AlarmEventSeverity.MAJOR
+ elif priority == 'high':
+ severity = AlarmEventSeverity.CRITICAL
+ else:
+ severity = AlarmEventSeverity.INDETERMINATE
+ try:
+ ts = arrow.utcnow().timestamp
+ alarm_event = self.adapter_agent.create_alarm(
+ id=id,
+ resource_id=str(key),
+ type=AlarmEventType.EQUIPMENT,
+ category=AlarmEventCategory.PON,
+ severity=severity,
+ state=AlarmEventState.RAISED if status else AlarmEventState.CLEARED,
+ description=description,
+ context=alarm_data,
+ raised_ts = ts)
+ self.adapter_agent.submit_alarm(self.device_id, alarm_event)
+ except Exception as e:
+ log.exception('failed-to-submit-alarm', e=e)
+ # take action based on alarm type, only pon_ni and onu objects report alarms
+ if object == 'pon_ni':
+ # key: {'device_id': <int>, 'pon_ni': <int>}
+ # alarm: 'los'
+ # status: <False|True>
+ pass
+ elif object == 'onu':
+ # key: {'device_id': <int>, 'pon_ni': <int>, 'onu_id': <int>}
+ # alarm: <'los'|'lob'|'lopc_miss'|'los_mic_err'|'dow'|'sf'|'sd'|'suf'|'df'|'tiw'|'looc'|'dg'>
+ # status: <False|True>
+ pass