blob: 0cfaa646f4d7315751026b446d554f8dcc138cd3 [file] [log] [blame]
#
# Copyright 2017-present CIG, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import datetime
import pprint
import random
import argparse
import datetime
import shlex
import time
import arrow
import structlog
import json
from twisted.internet import reactor, defer
from twisted.internet.defer import inlineCallbacks, returnValue, DeferredQueue
from twisted.internet.task import LoopingCall
#from adapter_alarms import AdapterAlarms
from cig_olt_zmq import *
from cig_olt_device import *
from cig_olt_xpon import *
from download import *
from voltha.extensions.omci.omci import *
from voltha.core.logical_device_agent import mac_str_to_tuple
from voltha.core.flow_decomposer import *
from voltha.protos import third_party
from voltha.protos.common_pb2 import OperStatus, AdminState, ConnectStatus
from voltha.protos.events_pb2 import AlarmEventType, \
AlarmEventSeverity, AlarmEventState, AlarmEventCategory
from voltha.protos.device_pb2 import Device, Image, DeviceType, DeviceTypes, Port, Device, \
PmConfigs, PmConfig, PmGroupConfig
from voltha.protos.health_pb2 import HealthStatus
from voltha.protos.logical_device_pb2 import LogicalPort, LogicalPorts, LogicalDevice
from voltha.protos.openflow_13_pb2 import *
#from voltha.adapters.cig_olt.protos.olt_common_pb2 import *
#from voltha.adapters.cig_olt.protos.olt_d_pb2 import *
#from voltha.adapters.cig_olt.protos.olt_pon_pb2 import *
#from voltha.adapters.cig_olt.protos.olt_switch_pb2 import *
from voltha.protos.olt_common_pb2 import *
from voltha.protos.olt_d_pb2 import *
from voltha.protos.olt_pon_pb2 import *
from voltha.protos.olt_switch_pb2 import *
from voltha.protos.bbf_fiber_base_pb2 import \
ChannelgroupConfig, ChannelpartitionConfig, ChannelpairConfig, ChannelterminationConfig, \
OntaniConfig, VOntaniConfig, VEnetConfig
from voltha.protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs
from voltha.registry import registry
from common.frameio.frameio import BpfProgramFilter, hexify
from common.utils.asleep import asleep
from scapy.layers.l2 import Ether, Dot1Q
from scapy.layers.inet import Raw
from google.protobuf.json_format import MessageToDict
class CigOltHandler(object):
"""
The OLT Handler is used to wrap a single instance of a 10G OLT 1-U pizza-box
"""
def __init__(self, adapter, device_id):
self.adapter = adapter
self.adapter_agent = adapter.adapter_agent
self.device_id = device_id
self.log = structlog.get_logger(device_id=device_id)
self.io_port = None
self.logical_device_id = None
self._logical_device = None
#self.interface = registry('main').get_args().interface
self.work_mode = None
self.work_status = 0
self.reboot_status = 0
self.command_timeout = 5
self.pm_metrics = None
self.default_freq = 150
self.lc = None
#self.onus = {}
self.alarms = None
self.ip_address = None
self.olt_mac = None
self.startup = None
self.zmq_client_echo = None
self.zmq_client_sync = None
self.zmq_client_omci = None
self.zmq_client_of_packet = None
self.zmq_client_async = None
self.zmq_client_sub = None
self.echo_incoming_queue = DeferredQueue()
self.sync_incoming_queue = DeferredQueue()
self.async_incoming_queue = DeferredQueue()
self.omci_incoming_queue = DeferredQueue()
self.of_pkt_incoming_queue = DeferredQueue()
self.pon_port = None
self.onu_id = None
#self.channel = None # Proxy messaging channel with 'send' method
# Northbound and Southbound ports
self.northbound_ports = {} # port number -> Port
self.southbound_ports = {} # port number -> Port (For PON, use pon-id as key)
# self.management_ports = {} # port number -> Port TODO: Not currently supported
self.num_northbound_ports = None
self.num_southbound_ports = None
# self.num_management_ports = None
# Heartbeat support
self.heartbeat_miss = 0
self.heartbeat_interval = 2 # TODO: Decrease before release or any scale testing
self.heartbeat_failed_limit = 3
self.heartbrat_status = 0
self.reboot_check_times = 0
self.seq_no = 0
self.heartbeat = None
self.asyncmsg = None
self.omcimsg = None
self.packetmsg = None
#self.io_port = None
#self.interface = registry('main').get_args().interface
# Installed flows
#self.flow_entries = {} # Flow ID/name -> FlowEntry
# xPON config dictionaries
self._channel_groups = {} # Name -> dict
self._channel_partitions = {} # Name -> dict
self._channel_pairs = {} # Name -> dict
self._channel_terminations = {} # Name -> dict
self._v_ont_anis = {} # Name -> dict
self._ont_anis = {} # Name -> dict
self._v_enets = {} # Name -> dict
self._tconts = {} # Name -> dict
self._traffic_descriptors = {} # Name -> dict
self._gemports = {} # Name -> dict
self._cached_xpon_pon_info = {} # PON-id -> dict
self._download_protocols = None
self._download_deferred = None
self._downloads = {} # name -> Download obj
def __del__(self):
if self.io_port is not None:
registry('frameio').close_port(self.io_port)
@inlineCallbacks
def activate(self, device):
"""
Activate the OLT device
:param device: A voltha.Device object, with possible device-type
specific extensions.
:param reconciling: If True, this adapter is taking over for a previous adapter
for an existing OLT
"""
log.debug('cig activate starting')
log.info('cig activate started')
self.log.info('CigDeviceHandler.activating', device=device)
self.ip_address = device.ipv4_address
self.zmq_client_sync = CigZmqClientSync(self.ip_address, self.sync_incoming_queue)
olt_info = yield self.get_olt_info_com()
if olt_info == None:
self.log.info(' get olt info fail.')
self.activate_failed(device, "ipc timeout.", reachable=False)
return
else:
reactor.callLater(1, self._olt_activate, device, olt_info)
def _olt_activate(self, device, olt_info):
self.work_mode = olt_info.work_mode
self.olt_mac = olt_info.mac_address
device.root = True
device.vendor = olt_info.vendor
device.model = olt_info.model
device.hardware_version = olt_info.hardware_version
device.firmware_version = olt_info.firmware_version
device.serial_number = olt_info.serial_number
self.adapter_agent.update_device(device)
logical_device = self.get_logical_device(device)
self._logical_device = self.adapter_agent.create_logical_device(logical_device,dpid=self.olt_mac)
for m in olt_info.nni_port:
self.log.info('nni_port.port_no', m.port_no)
self.log.info('nni_port.port_type', m.port_type)
self.log.info('nni_port.mac_address', m.mac_address)
if m.port_no==1:
self.northbound_ports[m.port_no] = NniPort(m, self.device_id)
phy_port = self.northbound_ports[m.port_no].get_port()
self.adapter_agent.add_port(device.id, phy_port)
logical_port = self.northbound_ports[m.port_no].get_logical_port()
self.adapter_agent.add_logical_port(self._logical_device.id, logical_port)
for m in olt_info.pon_port:
self.log.info('pon_port.port_no', m.port_no)
self.log.info('pon_port.port_type', m.port_type)
#if m.port_no==1 or m.port_no==2:
#if m.port_no==1:
#self.southbound_ports[m.port_no] = PonPort(m.port_no, self.device_id)
self.southbound_ports[m.port_no] = PonPort(m.port_no, self)
phy_port = self.southbound_ports[m.port_no].get_port()
self.adapter_agent.add_port(device.id, phy_port)
self.log.info('create CigZmqClientAsync socket.')
self.zmq_client_async = CigZmqClientAsync(self.ip_address, self.heartbeat_receive)
self.log.info('send olt activate_msg.')
self.olt_activate_msg_send()
self.work_status = 1
############################################################################
# Setup Alarm handler
#self.alarms = AdapterAlarms(self.adapter, device)
############################################################################
self.log.info('get_device.....')
device = self.adapter_agent.get_device(device.id)
device.parent_id = self._logical_device.id
#device.connect_status = ConnectStatus.UNREACHABLE
#device.oper_status = OperStatus.ACTIVATING
#self.adapter_agent.update_device(device)
self.logical_device_id = self._logical_device.id
#device = self.adapter_agent.get_device(device.id)
device.connect_status = ConnectStatus.REACHABLE
device.oper_status = OperStatus.ACTIVE
device.reason = ''
self.log.info('update_device.....')
self.adapter_agent.update_device(device)
self.log.info('create others zmq sockets.....')
self.zmq_client_sub = CigZmqClientSub(self.ip_address, self.async_incoming_queue)
self.zmq_client_omci = CigZmqClientOmci(self.ip_address, self.omci_incoming_queue)
self.zmq_client_of_packet = CigZmqClientPacketInOut(self.ip_address, self.of_pkt_incoming_queue)
# Schedule the heartbeat for the device
self.log.debug('Starting heartbeat')
self.start_heartbeat(delay=5)
#self.start_onu_test()
self.start_poll_async_msg()
self.start_poll_omci_msg()
#self.start_kpi_collection()
self.start_poll_packet_in_msg()
self.log.info('activate over.', device_id=self.device_id)
@inlineCallbacks
def collect_gem_metrics(self,prefix):
try:
# get pon port
gem_metrics = {}
self.log.info("collect-gem-metrics")
for m in self.southbound_ports:
phy_port = self.southbound_ports[m]._pon_id
self.log.info("collect-gem-metrics", m=m, port=phy_port)
xgem_pm_req_head = OltMsgCommonHdr(
type=OLT_PON_GET_XGEM_PM_REQ,
src_appId=OLT_APPID_VOLTHA,
sync=1
)
data = xgem_pm_req_head.SerializeToString()
self.zmq_client_sync.sync_send(data, 1)
xgem_pm_req_data = OltPonXGemPMReq(
pon_slot=0,
pon_port=phy_port
)
data = xgem_pm_req_data.SerializeToString()
self.zmq_client_sync.sync_send(data, 0)
#time.sleep(1)
self.startup = self.get_sync_queue()
results = yield self.startup
self.log.info('get_sync_queue', results=results)
if results == "RecvTimeoutErr.":
self.log.info('get OLT_PON_GET_XGEM_PM_ACK info timeout.', results=results)
self.zmq_client_sync.sync_reconnect()
else:
gem_pm_info_rep = OltMsgCommonHdr()
gem_pm_info_rep.ParseFromString(results[0])
self.log.info('gem_pm_info_rep.type', gem_pm_info_rep.type)
self.log.info('gem_pm_info_rep.src_appId', gem_pm_info_rep.src_appId)
self.log.info('gem_pm_info_rep.sync', gem_pm_info_rep.sync)
if (gem_pm_info_rep.type != OLT_PON_GET_XGEM_PM_ACK) or (
gem_pm_info_rep.src_appId != OLT_APPID_OLTD) \
or (gem_pm_info_rep.sync != 1) or (len(results) < 2):
self.log.info('get OLT_PON_GET_XGEM_PM_ACK err.')
else:
gem_pm_info = OltPonXGemPMAck()
gem_pm_info.ParseFromString(results[1])
self.log.info('gem_pm_info.pon_port', pon_port=gem_pm_info.pon_port)
self.log.info('gem_pm_info.pon_slot', pon_slot=gem_pm_info.pon_slot)
self.log.info('gem_pm_info.tx_gem_frames', tx_gem_frames=gem_pm_info.tx_gem_frames)
self.log.info('gem_pm_info.rx_gem_frames', rx_gem_frames=gem_pm_info.rx_gem_frames)
self.log.info('gem_pm_info.tx_nolfbit_count', tx_nolfbit_count=gem_pm_info.tx_nolfbit_count)
self.log.info('gem_pm_info.hec_err_count', hec_err_count=gem_pm_info.hec_err_count)
self.log.info('gem_pm_info.frame_lost_count', frame_lost_count=gem_pm_info.frame_lost_count)
self.log.info('gem_pm_info.key_err_count', key_err_count=gem_pm_info.key_err_count)
gem_metrics['tx_gem_frames'] = gem_pm_info.tx_gem_frames
gem_metrics['rx_gem_frames'] = gem_pm_info.rx_gem_frames
gem_metrics['tx_nolfbit_count'] = gem_pm_info.tx_nolfbit_count
gem_metrics['hec_err_count'] = gem_pm_info.hec_err_count
gem_metrics['frame_lost_count'] = gem_pm_info.frame_lost_count
gem_metrics['key_err_count'] = gem_pm_info.key_err_count
# Step 2: prepare the KpiEvent for submission
# we can time-stamp them here (or could use time derived from OLT
ts = arrow.utcnow().timestamp
prefixstr = "%s pon_port:%d xgem" % (prefix, gem_pm_info.pon_port)
self.log.info('prefixstr', prefixstr=prefixstr)
kpi_event = KpiEvent(
type=KpiEventType.slice,
ts=ts,
prefixes={
# xgem
prefixstr: MetricValuePairs(
metrics=gem_metrics),
}
)
# Step 3: submit
self.adapter_agent.submit_kpis(kpi_event)
gem_metrics.clear()
except Exception as e:
log.exception('failed-to-submit-kpis', e=e)
def start_kpi_collection(self):
prefix = 'voltha.{}.{}'.format('cig-olt', self.device_id)
self.log.info('start_kpi_collection',prefix=prefix)
self.lc = LoopingCall(self.collect_gem_metrics,prefix)
self.log.info('start_kpi_collection ',interval=self.default_freq / 10)
self.lc.start(interval=self.default_freq / 10)
#reactor.run()
def stop_kpi_collection(self):
self.lc.stop()
#packet_in, strip svlan and cvlan, and send to onos
'''
def rcv_io(self, port, frame):
self.log.info('received', iface_name=port.iface_name,
frame_len=len(frame))
pkt = Ether(frame)
if pkt.haslayer(Dot1Q):
outer_shim = pkt.getlayer(Dot1Q)
if isinstance(outer_shim.payload, Dot1Q):
inner_shim = outer_shim.payload
cvid = inner_shim.vlan
logical_port = cvid
popped_frame = (
Ether(src=pkt.src, dst=pkt.dst, type=inner_shim.type) /
inner_shim.payload
)
kw = dict(
logical_device_id=self.logical_device_id,
logical_port_no=logical_port,
)
self.log.info('sending-packet-in', **kw)
self.adapter_agent.send_packet_in(
packet=str(popped_frame), **kw)
elif pkt.haslayer(Raw):
raw_data = json.loads(pkt.getlayer(Raw).load)
self.alarms.send_alarm(self, raw_data)
'''
def activate_failed(self, device, reason, reachable=True):
"""
Activation process (adopt_device) has failed.
:param device: A voltha.Device object, with possible device-type
specific extensions. Such extensions shall be described as part of
the device type specification returned by device_types().
:param reason: (string) failure reason
:param reachable: (boolean) Flag indicating if device may be reachable
via RESTConf or NETConf even after this failure.
"""
device.oper_status = OperStatus.FAILED
if not reachable:
device.connect_status = ConnectStatus.UNREACHABLE
device.reason = reason
self.adapter_agent.update_device(device)
raise RuntimeError('Failed to activate OLT: {}'.format(device.reason))
#@inlineCallbacks
def deactivate(self, device):
# OLT Specific things here
d, self.startup = self.startup, None
if d is not None:
d.cancel()
self.pons.clear()
# TODO: Any other? OLT specific deactivate steps
# Call into base class and have it clean up as well
super(AdtranOltHandler, self).deactivate(device)
#@inlineCallbacks
def update_flow_table(self, flows, device):
'''
for flow in flows:
self.log.info('bulk-flow-update1', device_id=device.id, flow=flow)
for field in get_ofb_fields(flow):
if field.type == IN_PORT:
if field.port >= 100 and field.port < 124:
field.port = field.port - 100
flow.cookie = 1
elif field.port >= 0 and field.port < 6:
flow.cookie = 2
else:
pass
for action in get_actions(flow):
if action.type == OUTPUT:
if action.output.port >= 100 and action.output.port < 124:
action.output.port = action.output.port - 100
self.log.info('bulk-flow-update2', device_id=device.id, flow=flow)
'''
try:
msg_hdr = OltMsgCommonHdr(
type=OLT_SWITCH_UPDATE_FLOW_TABLE,
src_appId=OLT_APPID_VOLTHA,
sync=0
)
data=msg_hdr.SerializeToString()
self.zmq_client_async.async_send(data,1)
flow_table = OltSwitchFlowTable(
flows=flows
)
data=flow_table.SerializeToString()
self.zmq_client_async.async_send(data,0)
except Exception as e:
self.log.exception('Exception during update flow table.', e=e)
#@inlineCallbacks
def send_proxied_message(self, proxy_address, msg):
if isinstance(msg, Packet):
msg = str(msg)
self.log.info('send-proxied-message',
proxy_address=proxy_address,
msg=msg)
if self.zmq_client_omci is not None:
pon_id = proxy_address.channel_id - 100
onu_id = proxy_address.onu_id
self.log.info('send-proxied-message pon_id:',pon_id)
self.log.info('send-proxied-message onu_id:',onu_id)
else:
return
try:
msg_hdr = OltMsgCommonHdr(
type=OLT_PON_SEND_OMCI,
src_appId=OLT_APPID_VOLTHA,
sync=0
)
data=msg_hdr.SerializeToString()
self.zmq_client_omci.omci_send(data,1)
omci_msg = OltPonSendOmci(
pon_slot=0,
pon_port=pon_id,
onu_id=onu_id,
omci_content=msg
)
#self.log.info('omci_msg.pon_slot', omci_msg.pon_slot)
#self.log.info('omci_msg.pon_port', omci_msg.pon_port)
self.log.info('omci_msg.onu_id', omci_msg.onu_id, datetime.datetime.now())
#self.log.info('omci_msg.omci_content', omci_msg.omci_content)
data=omci_msg.SerializeToString()
self.zmq_client_omci.omci_send(data,0)
except Exception as e:
self.log.info('zmq_client_omci.omci_send exception', exc=str(e))
#raise
def packet_in_msg_proc(self, message):
try:
self.log.info('packet_in_msg_proc: Message from oltd')
sub_msg_header = OltMsgCommonHdr()
sub_msg_header.ParseFromString(message[0])
#self.log.info('got-response', sub_msg_header.type)
#self.log.info('got-response', sub_msg_header.src_appId)
#self.log.info('got-response', sub_msg_header.sync)
if (sub_msg_header.src_appId != OLT_APPID_OLTD) or (sub_msg_header.sync != 0):
self.log.exception('Get error msg.')
return
if sub_msg_header.type == OLT_D_PACKET_IN:
if len(message) != 2:
self.log.exception('Get error packet in msg.')
return
packet_msg = OltDEthPacket()
packet_msg.ParseFromString(message[1])
frame = packet_msg.pkt_buf
#self.log.info('received packet_msg.pkt_len', packet_msg.pkt_len)
#self.log.info('received packet_msg', hexify(packet_msg.pkt_buf))
#self.log.info('received frame:', hexify(frame))
#self.packet_out(1,frame)
pkt = Ether(frame)
if pkt.haslayer(Dot1Q):
inner_shim = pkt.getlayer(Dot1Q)
#if isinstance(outer_shim.payload, Dot1Q):
#inner_shim = outer_shim.payload
cvid = inner_shim.vlan
logical_port = cvid
popped_frame = (
Ether(src=pkt.src, dst=pkt.dst, type=inner_shim.type) /
inner_shim.payload
)
kw = dict(
logical_device_id=self.logical_device_id,
logical_port_no=logical_port,
)
self.log.info('sending-packet-in', **kw)
self.adapter_agent.send_packet_in(
packet=str(popped_frame), **kw)
#elif pkt.haslayer(Raw):
#raw_data = json.loads(pkt.getlayer(Raw).load)
#self.alarms.send_alarm(self, raw_data)
else:
self.log.info('No Dot1Q tag.')
else:
self.log.exception('No support msg type.')
return
except Exception as e:
self.log.exception('Exception during packet_in_msg_proc processing', e=e)
def packet_out(self, egress_port, msg):
self.log.debug('sending-packet-out', egress_port=egress_port,
msg_hex=hexify(msg))
pkt = Ether(msg)
out_pkt = (
Ether(src=pkt.src, dst=pkt.dst) /
Dot1Q(vlan=egress_port, type=pkt.type) /
pkt.payload
)
#self.io_port.send(str(out_pkt))
if self.zmq_client_of_packet is None:
return
try:
msg_hdr = OltMsgCommonHdr(
type=OLT_D_PACKET_OUT,
src_appId=OLT_APPID_VOLTHA,
sync=0
)
data=msg_hdr.SerializeToString()
self.zmq_client_of_packet.packet_send(data,1)
packet_msg = OltDEthPacket(
pkt_len=len(str(out_pkt)),
pkt_buf=str(out_pkt)
)
#self.log.info('packet_msg.pkt_len', packet_msg.pkt_len)
#self.log.info('packet_msg.pkt_buf', packet_msg.pkt_buf)
data=packet_msg.SerializeToString()
self.zmq_client_of_packet.packet_send(data,0)
except Exception as e:
self.log.info('zmq_client_of_packet.packet_send exception', exc=str(e))
def poll_metrics_receive(self, message):
try:
self.log.info('poll_metrics_receive: Message from oltd')
self.poll_incoming_queue.put(message)
except Exception as e:
self.log.exception('Exception during poll_metrics_receive processing', e=e)
def of_packet_receive(self, message):
try:
self.log.info('of_packet_receive: Message from oltd')
except Exception as e:
self.log.exception('Exception during of_packet_receive processing', e=e)
def heartbeat_receive(self, message):
if self.work_status == 0:
return
try:
#self.log.info('heartbeat_receive: Message from oltd')
sub_msg_header = OltMsgCommonHdr()
sub_msg_header.ParseFromString(message[0])
if sub_msg_header.type == OLT_COMMON_HEART_BEAT_ACK:
heartbeat_ack = OltCommonHeartBeat()
heartbeat_ack.ParseFromString(message[1])
#self.log.info('heartbeat_ack.is_active', heartbeat_ack.is_active)
if heartbeat_ack.seq_no != self.seq_no:
return
self.heartbeat_miss = 0
#self.log.info("receive-heart-beat-ack heartbeat_ack.seq_no", heartbeat_ack.seq_no)
if heartbeat_ack.is_active == 0:
if self.reboot_status == 0:
for port in self.southbound_ports.itervalues():
port.delete()
self.olt_activate_msg_send()
for channel_term in self._channel_terminations:
self.log.info('self._channel_terminations:', channel_term)
self._on_channel_termination_create(channel_term)
self.configure_pon(channel_term)
for vont_ani in self._v_ont_anis:
self.log.info('self._v_ont_anis:', vont_ani)
self._on_vont_ani_create(vont_ani)
for ont_ani in self._ont_anis:
self.log.info('self._ont_anis:', ont_ani)
self._on_ont_ani_create(ont_ani)
#self.log.info('pon and onu config recovery.', device_id=self.device_id)
except Exception as e:
self.log.exception('Exception during of_packet_receive processing', e=e)
def omci_msg_proc(self, message):
try:
self.log.info('omci_msg_proc: Message from oltd')
sub_msg_header = OltMsgCommonHdr()
sub_msg_header.ParseFromString(message[0])
#self.log.info('got-response', sub_msg_header.type)
#self.log.info('got-response', sub_msg_header.src_appId)
#self.log.info('got-response', sub_msg_header.sync)
if (sub_msg_header.src_appId != OLT_APPID_OLTD) or (sub_msg_header.sync != 0):
self.log.exception('Get error async msg.')
return
if sub_msg_header.type == OLT_PON_SEND_OMCI:
if len(message) != 2:
self.log.exception('Get error omci msg.')
return
omci_msg = OltPonSendOmci()
omci_msg.ParseFromString(message[1])
#self.log.info('omci_msg.pon_slot', omci_msg.pon_slot)
#self.log.info('omci_msg.pon_port', omci_msg.pon_port)
self.log.info('omci_msg.onu_id', omci_msg.onu_id, datetime.datetime.now())
#self.log.info('omci_msg.omci_content', omci_msg.omci_content)
#proxy_address=Device.ProxyAddress(
# device_id=self.device_id,
# channel_id=omci_msg.pon_port + 100,
# onu_id=omci_msg.onu_id,
#)
child_device = self.adapter_agent.get_child_device(self.device_id, onu_id=omci_msg.onu_id)
self.adapter_agent.receive_proxied_message(child_device.proxy_address, omci_msg.omci_content)
#self.adapter_agent.receive_proxied_message(proxy_address, omci_msg.omci_content)
else:
self.log.exception('No support omci msg type.')
return
except Exception as e:
self.log.exception('Exception during omci_receive processing', e=e)
def async_msg_proc(self, message):
try:
self.log.info('sub_receive: Message from oltd')
sub_msg_header = OltMsgCommonHdr()
sub_msg_header.ParseFromString(message[0])
#self.log.info('got-response', sub_msg_header.type)
#self.log.info('got-response', sub_msg_header.src_appId)
#self.log.info('got-response', sub_msg_header.sync)
if (sub_msg_header.src_appId != OLT_APPID_OLTD) or (sub_msg_header.sync != 0):
self.log.exception('Get error async msg.')
return
if sub_msg_header.type == OLT_PON_ONU_RANGING_EVENT:
if len(message) != 2:
self.log.exception('Get error async msg(OLT_PON_ONU_RANGING_EVENT).')
return
self.log.info('sub_receive. msg_len', len(message), message[1])
self.onu_detected(message[1])
elif sub_msg_header.type == OLT_PON_ONU_ACTIVATE_COMPLETE:
if len(message) != 2:
self.log.exception('Get error async msg(OLT_PON_ONU_ACTIVATE_COMPLETE).')
return
self.log.info('sub_receive. msg_len', len(message), message[1])
self.onu_activate_complete_msg_proc(message[1])
else:
self.log.exception('No support msg type.')
#return
except Exception as e:
self.log.exception('Exception during sub_receive processing', e=e)
def get_echo_queue(self):
request = self.echo_incoming_queue.get()
return request
def get_sync_queue(self):
request = self.sync_incoming_queue.get()
return request
def get_async_queue(self):
request = self.async_incoming_queue.get()
return request
def get_omci_queue(self):
request = self.omci_incoming_queue.get()
return request
def get_packet_in_queue(self):
request = self.of_pkt_incoming_queue.get()
return request
def start_poll_async_msg(self):
self.log.info('*** Starting polling async msg ***')
self.asyncmsg = reactor.callLater(0, self.poll_async_msg)
return self.asyncmsg
@inlineCallbacks
def poll_async_msg(self):
try:
response = yield self.get_async_queue()
self.async_msg_proc(response)
except Exception as e:
self.log.info('wait-for-async-exception', exc=str(e))
self.asyncmsg = reactor.callLater(0.07, self.poll_async_msg)
def start_poll_omci_msg(self):
self.log.info('*** Starting polling omci msg ***')
self.omcimsg = reactor.callLater(0, self.poll_omci_msg)
return self.omcimsg
@inlineCallbacks
def poll_omci_msg(self):
try:
response = yield self.get_omci_queue()
self.omci_msg_proc(response)
except Exception as e:
self.log.info('wait-for-omci-exception', exc=str(e))
self.omcimsg = reactor.callLater(0.1, self.poll_omci_msg)
def start_poll_packet_in_msg(self):
self.log.info('*** Starting polling packet in msg ***')
self.packetmsg = reactor.callLater(0, self.poll_packet_in_msg)
return self.packetmsg
@inlineCallbacks
def poll_packet_in_msg(self):
try:
response = yield self.get_packet_in_queue()
self.packet_in_msg_proc(response)
except Exception as e:
self.log.info('wait-for-packet-in-exception', exc=str(e))
self.packetmsg = reactor.callLater(0.1, self.poll_packet_in_msg)
def start_heartbeat(self, delay=10):
assert delay > 1
self.log.info('*** Starting Device Heartbeat ***')
self.heartbeat = reactor.callLater(delay, self.check_pulse)
return self.heartbeat
def check_pulse(self):
if self.work_status == 0:
return
self.heartbeat_check_status()
try:
echo_req = OltMsgCommonHdr(
type=1,
src_appId=OLT_APPID_VOLTHA,
sync=0
)
data=echo_req.SerializeToString()
self.zmq_client_async.async_send(data,1)
self.seq_no += 1
heart_beat = OltCommonHeartBeat(
seq_no=self.seq_no,
is_active=1
)
#self.log.info('send-heartbeat heart_beat.seq_no', heart_beat.seq_no)
#self.log.info('send-heartbeat heart_beat.is_active', heart_beat.is_active)
data=heart_beat.SerializeToString()
self.zmq_client_async.async_send(data,0)
self.heartbeat_miss += 1
except Exception as e:
self.log.exception('Exception during echo processing', e=e)
# Reschedule next heartbeat
#if self.logical_device_id is not None:
#if self.startup_heartbeat
self.heartbeat = reactor.callLater(self.heartbeat_interval, self.check_pulse)
def heartbeat_check_status(self):
"""
Check the number of heartbeat failures against the limit and emit an alarm if needed
"""
device = self.adapter_agent.get_device(self.device_id)
if self.heartbeat_miss >= self.heartbeat_failed_limit and device.connect_status == ConnectStatus.REACHABLE:
self.log.warning('olt-heartbeat-failed', count=self.heartbeat_miss)
self.heartbrat_status = 0
device.connect_status = ConnectStatus.UNREACHABLE
device.oper_status = OperStatus.FAILED
device.reason = 'heartbeat timeout'
self.adapter_agent.update_device(device)
self.heartbeat_alarm(True, self.heartbeat_miss)
else:
# Update device states
if self.heartbeat_miss == 0 and device.connect_status != ConnectStatus.REACHABLE:
self.heartbrat_status = 1
self.log.info('heartbeat success')
device.connect_status = ConnectStatus.REACHABLE
device.oper_status = OperStatus.ACTIVE
device.reason = ''
self.adapter_agent.update_device(device)
self.heartbeat_alarm(False)
def heartbeat_alarm(self, status, heartbeat_misses=0):
try:
ts = arrow.utcnow().timestamp
alarm_data = {'heartbeats_missed':str(heartbeat_misses)}
alarm_event = self.adapter_agent.create_alarm(
id='voltha.{}.{}.olt'.format(self.adapter.name, self.device_id),
resource_id='olt',
type=AlarmEventType.EQUIPMENT,
category=AlarmEventCategory.PON,
severity=AlarmEventSeverity.CRITICAL,
state=AlarmEventState.RAISED if status else
AlarmEventState.CLEARED,
description='OLT Alarm - Heartbeat - {}'.format('Raised'
if status
else 'Cleared'),
context=alarm_data,
raised_ts = ts)
self.adapter_agent.submit_alarm(self.device_id, alarm_event)
self.log.debug('olt-heartbeat alarm sent')
except Exception as e:
log.exception('failed-to-submit-alarm', e=e)
def onu_detected(self, message):
onu_ranging_event = OltPonOnuRangingEvent()
onu_ranging_event.ParseFromString(message)
#self.log.info('onu_detected', onu_ranging_event.pon_slot)
#self.log.info('onu_detected', onu_ranging_event.pon_port)
self.log.info('onu_detected', onu_ranging_event.onu_id, datetime.datetime.now())
#self.log.info('onu_detected', onu_ranging_event.sn)
#self.log.info('onu_detected', onu_ranging_event.ranging_state)
if onu_ranging_event.ranging_state == 1:
if self.southbound_ports[onu_ranging_event.pon_port].onu_exist_check(onu_ranging_event.onu_id)==False:
if self.work_mode == OLT_MODE_AUTO:
self.adapter_agent.child_device_detected(
parent_device_id=self.device_id,
parent_port_no=100 + onu_ranging_event.pon_port,
child_device_type='broadcom_onu',
proxy_address=Device.ProxyAddress(
device_id=self.device_id,
channel_id=onu_ranging_event.pon_port + 100,
onu_id=onu_ranging_event.onu_id,
),
admin_state=AdminState.ENABLED,
)
onu_info = {
'name': None,
'device-id': self.device_id,
'serial-number': onu_ranging_event.sn,
'xpon-name': None,
#'pon': onu_ranging_event.pon_port,
'pon': self.southbound_ports[onu_ranging_event.pon_port],
'onu-id': onu_ranging_event.onu_id,
'ranging-status': 1,
'config-status': 0,
'status_machine': 'init',
'enabled': None,
'channel-partition': None,
'expected-registration-id': None,
'upstream-channel-speed': None,
'upstream-fec': True,
'password': Onu.DEFAULT_PASSWORD,
't-conts': None,
'gem-ports': None,
'onu-vid': None,
'channel-id': onu_ranging_event.pon_port + 100,
'vont-ani': None
}
self.southbound_ports[onu_ranging_event.pon_port].onu_add(onu_info)
else:
if self.work_mode == OLT_MODE_CONFIG:
onu_info = {
'name': None,
'device-id': self.device_id,
'serial-number': onu_ranging_event.sn,
'xpon-name': None,
'pon': self.southbound_ports[onu_ranging_event.pon_port],
'onu-id': onu_ranging_event.onu_id,
'ranging-status': 1,
'config-status': None,
'status_machine': None,
'enabled': None,
'channel-partition': None,
'expected-registration-id': None,
'upstream-channel-speed': None,
'upstream-fec': None,
'password': None,
't-conts': None,
'gem-ports': None,
'onu-vid': None,
'channel-id': None,
'vont-ani': None
}
self.southbound_ports[onu_ranging_event.pon_port].onu_update(onu_info)
else :
if self.southbound_ports[onu_ranging_event.pon_port].onu_exist_check(onu_ranging_event.onu_id)==True:
if self.work_mode == OLT_MODE_AUTO:
self.southbound_ports[onu_ranging_event.pon_port].onu_del(onu_ranging_event.onu_id)
child_device = self.adapter_agent.get_child_device(self.device_id, onu_id=onu_ranging_event.onu_id)
if child_device:
self.adapter_agent.delete_child_device(self.device_id, child_device.id)
elif self.work_mode == OLT_MODE_CONFIG:
onu_info = {
'name': None,
'device-id': self.device_id,
'serial-number': onu_ranging_event.sn,
'xpon-name': None,
'pon': self.southbound_ports[onu_ranging_event.pon_port],
'onu-id': onu_ranging_event.onu_id,
'ranging-status': 0,
'config-status': None,
'status_machine': None,
'enabled': None,
'channel-partition': None,
'expected-registration-id': None,
'upstream-channel-speed': None,
'upstream-fec': None,
'password': None,
't-conts': None,
'gem-ports': None,
'onu-vid': None,
'channel-id': None,
'vont-ani': None
}
self.southbound_ports[onu_ranging_event.pon_port].onu_update(onu_info)
child_device = self.adapter_agent.get_child_device(self.device_id, onu_id=onu_ranging_event.onu_id)
self.log.info('onu_activate_ranging_down_msg', child_device)
time.sleep(1)
if child_device is not None:
msg = {'proxy_address': child_device.proxy_address,'event': 'deactivation-completed'}
self.adapter_agent.publish_inter_adapter_message(child_device.id, msg)
else:
pass
def onu_activate_complete_msg_proc(self, message):
onu_activate_complete_event = OltPonOnuActivateComplete()
onu_activate_complete_event.ParseFromString(message)
#self.log.info('onu_activate_complete_msg_proc', onu_activate_complete_event.pon_port)
#self.log.info('onu_activate_complete_msg_proc', onu_activate_complete_event.onu_id)
#self.log.info('onu_activate_complete_msg_proc', onu_activate_complete_event.result)
child_device = self.adapter_agent.get_child_device(self.device_id, onu_id=onu_activate_complete_event.onu_id)
self.log.info('onu_activate_complete_msg_proc', child_device)
time.sleep(1)
if child_device is not None:
if onu_activate_complete_event.result==0:
ind_info = {
'activation_successful': True
}
onu_status = 'activation_successful'
else :
ind_info = {
'activation_successful': False
}
onu_status = 'activation_fail'
msg = {'proxy_address': child_device.proxy_address,'event': 'activation-completed', 'event_data': ind_info}
self.adapter_agent.publish_inter_adapter_message(child_device.id, msg)
self.log.info('onu_activate_complete_msg_proc', onu_activate_complete_event.pon_port)
self.log.info('onu_activate_complete_msg_proc', onu_activate_complete_event.onu_id)
self.log.info('onu_activate_complete_msg_proc', onu_activate_complete_event.result)
#update onu
onu_info = {
'name': None,
'device-id': self.device_id,
'serial-number': None,
'xpon-name': None,
'pon': self.southbound_ports[onu_activate_complete_event.pon_port],
'onu-id': onu_activate_complete_event.onu_id,
'ranging-status': None,
'config-status': None,
'status_machine': onu_status,
'enabled': None,
'channel-partition': None,
'expected-registration-id': None,
'upstream-channel-speed': None,
'upstream-fec': None,
'password': None,
't-conts': None,
'gem-ports': None,
'onu-vid': None,
'channel-id':None,
'vont-ani': None
}
self.southbound_ports[onu_activate_complete_event.pon_port].onu_update(onu_info)
onu = self.southbound_ports[onu_activate_complete_event.pon_port].onu(onu_activate_complete_event.onu_id)
if onu._status_machine == 'activation_successful':
for tcont_id in onu._tconts:
self.log.info('onu_activate_complete_msg_proc tcont add', tcont_id)
onu.tcont_add_msg_send(self, tcont_id)
for gemport_id in onu._gem_ports:
self.log.info('onu_activate_complete_msg_proc gem add', gemport_id)
gemport = onu._gem_ports.get(gemport_id)
onu.add_gemport(gemport, True)
def start_onu_test(self):
self.log.info('*** Starting onu report test ***')
self.test = reactor.callLater(0, self.onu_test, 1, 1, 1)
return self.test
def onu_test(self, port, onuid, state):
onu_ranging_event = OltPonOnuRangingEvent(
pon_slot=1,
pon_port=port,
onu_id=onuid,
#sn='CIGONU' + str((port << 24) + (onuid << 8)),
sn='BRCM00000001',
ranging_state=state,
eqd=1,
distance=1
)
data=onu_ranging_event.SerializeToString()
self.onu_detected(data)
self.onu_id = onuid
self.pon_port = port
if self.onu_id < 1 :
self.pon_port = port
self.onu_id = onuid + 1
else:
self.pon_port = port + 1
self.onu_id = 1
if self.pon_port < 2:
self.test = reactor.callLater(0.07, self.onu_test, self.pon_port, self.onu_id, state)
#elif state == 1:
#self.pon_port = 1
#self.onu_id = 0
#state = 0
#self.test = reactor.callLater(0.07, self.onu_test, self.pon_port, self.onu_id, state)
@inlineCallbacks
def get_olt_info_com(self):
while (True):
try:
get_olt_info_req = OltMsgCommonHdr(
type=OLT_D_GET_OLT_INFO_REQ,
src_appId=OLT_APPID_VOLTHA,
sync=1
)
data=get_olt_info_req.SerializeToString()
self.log.info('send get olt info msg.')
self.zmq_client_sync.sync_send(data,0)
self.startup = self.get_sync_queue()
results = yield self.startup
if results == "RecvTimeoutErr.":
self.log.info('get olt info timeout.')
self.zmq_client_sync.sync_reconnect()
#returnValue (None)
else:
get_olt_info_rep = OltMsgCommonHdr()
get_olt_info_rep.ParseFromString(results[0])
self.log.info('get_olt_info_rep.type', get_olt_info_rep.type)
self.log.info('get_olt_info_rep.src_appId', get_olt_info_rep.src_appId)
self.log.info('get_olt_info_rep.sync', get_olt_info_rep.sync)
if (get_olt_info_rep.type != OLT_D_GET_OLT_INFO_ACK) or (get_olt_info_rep.src_appId != OLT_APPID_OLTD) \
or (get_olt_info_rep.sync != 1) or (len(results) < 2):
self.log.info('get OltDGetOltInfoAck err.')
returnValue (None)
else:
olt_info = OltDGetOltInfoAck()
olt_info.ParseFromString(results[1])
self.log.info('olt_info.olt_state', olt_info.olt_state)
self.log.info('olt_info.vendor', olt_info.vendor)
self.log.info('olt_info.model', olt_info.model)
self.log.info('olt_info.hardware_version', olt_info.hardware_version)
self.log.info('olt_info.firmware_version', olt_info.firmware_version)
self.log.info('olt_info.software_version', olt_info.software_version)
self.log.info('olt_info.serial_number', olt_info.serial_number)
#add mac loginfo
self.log.info('olt_info.work_mode',olt_info.work_mode)
self.log.info('olt_info.mac_address',olt_info.mac_address)
returnValue (olt_info)
except Exception as e:
self.log.exception('Exception during activate get olt info processing', e=e)
returnValue (None)
def olt_activate_msg_send(self):
try:
olt_activate_msg = OltMsgCommonHdr(
type=OLT_D_ACTIVATE_OLT,
src_appId=OLT_APPID_VOLTHA,
sync=0
)
data=olt_activate_msg.SerializeToString()
self.zmq_client_async.async_send(data,0)
except Exception as e:
self.log.exception('Exception during activate processing', e=e)
def olt_deactivate_msg_send(self):
try:
msg_hdr = OltMsgCommonHdr(
#type=OLT_D_DISABLE_OLT,
type=OLT_D_DEACTIVATE_OLT,
src_appId=OLT_APPID_VOLTHA,
sync=0
)
data = msg_hdr.SerializeToString()
self.zmq_client_async.async_send(data, 0)
self.log.info("send-deactivate-olt ok")
except Exception as e:
self.log.exception('Exception during send deactivate olt', e=e)
def olt_reboot_msg_send(self):
try:
msg_hdr = OltMsgCommonHdr(
type=OLT_D_REBOOT_OLT,
src_appId=OLT_APPID_VOLTHA,
sync=0
)
data = msg_hdr.SerializeToString()
self.zmq_client_async.async_send(data, 0)
self.log.info("send-reboot-olt ok")
except Exception as e:
self.log.exception('Exception during send deactivate olt', e=e)
def get_logical_device(self, device):
"""
Get the VOLTHA logical device
:return: VOLTHA logical device or None
"""
if self._logical_device is None:
self._logical_device = LogicalDevice(
# not setting id and datapth_id will let the adapter
# agent pick id
desc=ofp_desc(
mfr_desc='cord project',
hw_desc='n/a',
sw_desc='logical device for Cig-based PON',
serial_num=device.serial_number,
dp_desc='n/a'
),
switch_features=ofp_switch_features(
n_buffers=256, # TODO fake for now
n_tables=2, # TODO ditto
capabilities=( # TODO and ditto
OFPC_FLOW_STATS
| OFPC_TABLE_STATS
| OFPC_PORT_STATS
| OFPC_GROUP_STATS
)
),
root_device_id=device.id
)
return self._logical_device
def _get_xpon_collection(self, data):
if isinstance(data, ChannelgroupConfig):
return self._channel_groups
elif isinstance(data, ChannelpartitionConfig):
return self._channel_partitions
elif isinstance(data, ChannelpairConfig):
return self._channel_pairs
elif isinstance(data, ChannelterminationConfig):
return self._channel_terminations
elif isinstance(data, OntaniConfig):
return self._ont_anis
elif isinstance(data, VOntaniConfig):
return self._v_ont_anis
elif isinstance(data, VEnetConfig):
return self._v_enets
return None
def pon(self, pon_id):
return self.southbound_ports.get(pon_id)
@property
def channel_terminations(self):
return self._channel_terminations
@property
def channel_pairs(self):
return self._channel_pairs
@property
def channel_partitions(self):
return self._channel_partitions
@property
def ont_anis(self):
return self._ont_anis
@property
def v_ont_anis(self):
return self._v_ont_anis
@property
def v_enets(self):
return self._v_enets
@property
def tconts(self):
return self._tconts
def _data_to_dict(self, data):
name = data.name
interface = data.interface
inst_data = data.data
if isinstance(data, ChannelgroupConfig):
return 'channel-group', {
'name': name,
'enabled': interface.enabled,
'system-id': inst_data.system_id,
'polling-period': inst_data.polling_period
}
elif isinstance(data, ChannelpartitionConfig):
def _auth_method_enum_to_string(value):
from voltha.protos.bbf_fiber_types_pb2 import SERIAL_NUMBER, LOID, \
REGISTRATION_ID, OMCI, DOT1X
return {
SERIAL_NUMBER: 'serial-number',
LOID: 'loid',
REGISTRATION_ID: 'registration-id',
OMCI: 'omci',
DOT1X: 'dot1x'
}.get(value, 'unknown')
return 'channel-partition', {
'name': name,
'enabled': interface.enabled,
'authentication-method': _auth_method_enum_to_string(inst_data.authentication_method),
'channel-group': inst_data.channelgroup_ref,
'fec-downstream': inst_data.fec_downstream,
'mcast-aes': inst_data.multicast_aes_indicator,
'differential-fiber-distance': inst_data.differential_fiber_distance,
'closest_ont_distance':inst_data.closest_ont_distance
}
elif isinstance(data, ChannelpairConfig):
return 'channel-pair', {
'name': name,
'enabled': interface.enabled,
'channel-group': inst_data.channelgroup_ref,
'channel-partition': inst_data.channelpartition_ref,
'line-rate': inst_data.channelpair_linerate
}
elif isinstance(data, ChannelterminationConfig):
return 'channel-termination', {
'name': name,
'enabled': interface.enabled,
'xgs-ponid': inst_data.xgs_ponid,
'xgpon-ponid': inst_data.xgpon_ponid,
'channel-pair': inst_data.channelpair_ref,
'ber-calc-period': inst_data.ber_calc_period,
'pon_tag':inst_data.pon_tag
}
elif isinstance(data, OntaniConfig):
return 'ont-ani', {
'name': name,
'enabled': interface.enabled,
'upstream-fec': inst_data.upstream_fec_indicator,
'mgnt-gemport-aes': inst_data.mgnt_gemport_aes_indicator
}
elif isinstance(data, VOntaniConfig):
return 'vOnt-ani', {
'name': name,
'enabled': interface.enabled,
'onu-id': inst_data.onu_id,
'expected-serial-number': inst_data.expected_serial_number,
'expected-registration-id': inst_data.expected_registration_id,
'preferred-channel-pair': inst_data.preferred_chanpair,
'channel-partition': inst_data.parent_ref,
'upstream-channel-speed': inst_data.upstream_channel_speed,
'data': data
}
elif isinstance(data, VEnetConfig):
return 'vEnet', {
'name': name,
'enabled': interface.enabled,
'v-ont-ani': inst_data.v_ontani_ref
}
else:
raise NotImplementedError('Unknown data type')
@staticmethod
def _dict_diff(lhs, rhs):
"""
Compare the values of two dictionaries and return the items in 'rhs'
that are different than 'lhs. The RHS dictionary keys can be a subset of the
LHS dictionary, or the RHS dictionary keys can contain new values.
:param lhs: (dict) Original dictionary values
:param rhs: (dict) New dictionary values to compare to the original (lhs) dict
:return: (dict) Dictionary with differences from the RHS dictionary
"""
assert len(lhs.keys()) == len(set(lhs.iterkeys()) & (rhs.iterkeys())), 'Dictionary Keys do not match'
return {k: v for k, v in rhs.items() if k not in lhs or lhs[k] != rhs[k]}
def _valid_to_modify(self, item_type, valid, diffs):
bad_keys = [mod_key not in valid for mod_key in diffs]
if len(bad_keys) != 0:
self.log.warn("{} modification of '{}' not supported").format(item_type, bad_keys[0])
return False
return True
def _on_channel_group_modify(self, name, items, diffs):
if len(diffs) == 0:
return
valid_keys = ['polling-period'] # Modify of these keys supported
if self._valid_to_modify('channel-group', valid_keys, diffs.keys()):
self.log.info('TODO: Not-Implemented-yet')
# for k, v in diffs.items:
# items[name][k] = v
def _on_channel_partition_modify(self, name, items, diffs):
if len(diffs) == 0:
return
valid_keys = ['fec-downstream', 'mcast-aes', 'differential-fiber-distance']
if self._valid_to_modify('channel-partition', valid_keys, diffs.keys()):
self.log.info('TODO: Not-Implemented-yet')
# for k, v in diffs.items:
# items[name][k] = v
def _on_channel_pair_modify(self, name, items, diffs):
if len(diffs) == 0:
return
valid_keys = ['line-rate'] # Modify of these keys supported
if self._valid_to_modify('channel-pair', valid_keys, diffs.keys()):
self.log.info('TODO: Not-Implemented-yet')
# for k, v in diffs.items:
# items[name][k] = v
def _on_channel_termination_create(self, name, pon_type='xgs-ponid'):
assert name in self._channel_terminations, \
'{} is not a channel-termination'.format(name)
ct = self._channel_terminations[name]
pon_id = ct[pon_type]
# Look up the southbound PON port
pon_port = self.southbound_ports.get(pon_id, None)
if pon_port is None:
raise ValueError('Unknown PON port. PON-ID: {}'.format(pon_id))
assert ct['channel-pair'] in self._channel_pairs, \
'{} is not a channel-pair'.format(ct['channel-pair'])
cpair = self._channel_pairs[ct['channel-pair']]
assert cpair['channel-group'] in self._channel_groups, \
'{} is not a -group'.format(cpair['channel-group'])
assert cpair['channel-partition'] in self._channel_partitions, \
'{} is not a channel-partition'.format(cpair('channel-partition'))
cg = self._channel_groups[cpair['channel-group']]
cpart = self._channel_partitions[cpair['channel-partition']]
enabled = ct['enabled']
polling_period = cg['polling-period']
authentication_method = cpart['authentication-method']
# line_rate = cpair['line-rate']
downstream_fec = cpart['fec-downstream']
deployment_range = cpart['differential-fiber-distance']
# mcast_aes = cpart['mcast-aes']
# TODO: Support BER calculation period
# TODO Support setting of line rate
pon_port.xpon_name = name
pon_port.discovery_tick = polling_period
pon_port.authentication_method = authentication_method
pon_port.deployment_range = deployment_range * 1000 # pon-agent uses meters
pon_port.downstream_fec_enable = downstream_fec
# TODO: For now, upstream FEC = downstream
pon_port.upstream_fec_enable = downstream_fec
# TODO: pon_port.mcast_aes = mcast_aes
pon_port.admin_state = AdminState.ENABLED if enabled else AdminState.DISABLED
def _on_channel_termination_modify(self, name, items, diffs):
if len(diffs) == 0:
return
valid_keys = ['enabled'] # Modify of these keys supported
if self._valid_to_modify('channel-termination', valid_keys, diffs.keys()):
self.log.info('TODO: Not-Implemented-yet')
# for k, v in diffs.items:
# items[name][k] = v
def _on_channel_termination_delete(self, name, pon_type='xgs-ponid'):
assert name in self._channel_terminations, \
'{} is not a channel-termination'.format(name)
ct = self._channel_terminations[name]
# Look up the southbound PON port
pon_id = ct[pon_type]
pon_port = self.southbound_ports.get(pon_id, None)
if pon_port is None:
raise ValueError('Unknown PON port. PON-ID: {}'.format(pon_id))
pon_port.admin_state = AdminState.DISABLED
def _on_vont_ani_create(self, name):
assert name in self._v_ont_anis, \
'{} is not a v_ont_ani'.format(name)
vani = self._v_ont_anis[name]
va_cp_name = vani['preferred-channel-pair']
if vani['preferred-channel-pair'] in self.channel_pairs:
for ct_name in self._channel_terminations:
#ct_name = ct['channel-pair']
ct_cp_name = self._channel_terminations[ct_name]['channel-pair']
if va_cp_name == ct_cp_name:
ct = self._channel_terminations[ct_name]
pon_id = ct['xgs-ponid']
# Look up the southbound PON port
pon_port = self.southbound_ports.get(pon_id, None)
if pon_port is not None:
sn = vani['expected-serial-number']
onu_id = vani['onu-id']
if pon_port.onu_exist_check(onu_id)==False:
#add onu
onu_info = {
'name': vani['name'],
'device-id': self.device_id,
'serial-number': sn,
'xpon-name': None,
'pon': pon_port,
'onu-id': onu_id,
'ranging-status': 0,
'config-status': 1,
'status_machine': 'init',
'enabled': vani['enabled'],
'channel-partition': vani['channel-partition'],
'expected-registration-id': vani['expected-registration-id'],
'upstream-channel-speed': vani['upstream-channel-speed'],
'upstream-fec': True,
'password': Onu.DEFAULT_PASSWORD,
't-conts': None,
'gem-ports': None,
'onu-vid': None,
'channel-id': pon_id + 100,
'vont-ani': vani
}
pon_port.onu_add(onu_info)
else:
#update onu
onu_info = {
'name': vani['name'],
'device-id': self.device_id,
'serial-number': sn,
'xpon-name': None,
'pon': pon_port,
'onu-id': onu_id,
'ranging-status': None,
'config-status': 1,
'status_machine': None,
'enabled': vani['enabled'],
'channel-partition': vani['channel-partition'],
'expected-registration-id': vani['expected-registration-id'],
'upstream-channel-speed': vani['upstream-channel-speed'],
'upstream-fec': None,
'password': None,
't-conts': None,
'gem-ports': None,
'onu-vid': None,
'channel-id':None,
'vont-ani': vani
}
pon_port.onu_update(onu_info)
else:
pass
else:
pass
else:
pass
else:
pass
#if vani['protection-channel-pair'] in self.channel_pairs:
#for ct in self._channel_terminations:
#if ct['channel-pair']== vani['protection-channel-pair']:
#pon_id = ct['xgs-ponid']
# Look up the southbound PON port
#pon_port = self.southbound_ports.get(pon_id, None)
#if pon_port is not None:
#sn = vani['expected-serial-number']
#onu_id = vani['onu-id']
#if pon_port.onu_exist_check(onuid)==False:
#add onu
#onu_info = {
#'device-id': self.device_id,
#'serial-number': sn,
#'xpon-name': None,
#'pon': pon_port,
#'onu-id': onu_id,
#'enabled': enabled,
#'upstream-channel-speed': vani['upstream-channel-speed'],
#'password': Onu.DEFAULT_PASSWORD,
#'t-conts': None,
#'gem-ports': None,
#'onu-vid': None,
#'vont-ani': vani
#}
#pon_port.onu_add(onu_info)
#else:
#update onu
def _on_vont_ani_delete(self, name):
assert name in self._v_ont_anis, \
'{} is not a v_ont_ani'.format(name)
vani = self._v_ont_anis[name]
va_cp_name = vani['preferred-channel-pair']
if vani['preferred-channel-pair'] in self.channel_pairs:
for ct_name in self._channel_terminations:
ct_cp_name = self._channel_terminations[ct_name]['channel-pair']
if va_cp_name == ct_cp_name:
ct = self._channel_terminations[ct_name]
pon_id = ct['xgs-ponid']
# Look up the southbound PON port
pon_port = self.southbound_ports.get(pon_id, None)
if pon_port is not None:
onu_id = vani['onu-id']
onu_info = {
'name': vani['name'],
'device-id': self.device_id,
'serial-number': None,
'xpon-name': None,
'pon': pon_port,
'onu-id': onu_id,
'ranging-status': None,
'config-status': 0,
'status_machine': None,
'enabled': None,
'channel-partition': None,
'expected-registration-id': None,
'upstream-channel-speed': None,
'upstream-fec': None,
'password': None,
't-conts': None,
'gem-ports': None,
'onu-vid': None,
'channel-id':None,
'vont-ani': None
}
pon_port.onu_update(onu_info)
else:
pass
else:
pass
else:
pass
else:
pass
#if vani['protection-channel-pair'] in self.channel_pairs:
#for ct in self._channel_terminations:
#if ct['channel-pair']== vani['protection-channel-pair']:
#pon_id = ct['xgs-ponid']
# Look up the southbound PON port
#pon_port = self.southbound_ports.get(pon_id, None)
#if pon_port is not None:
#sn = vani['expected-serial-number']
#onu_id = vani['onu-id']
#if pon_port.onu_exist_check(onuid)==False:
#add onu
#onu_info = {
#'device-id': self.device_id,
#'serial-number': sn,
#'xpon-name': None,
#'pon': pon_port,
#'onu-id': onu_id,
#'enabled': enabled,
#'upstream-channel-speed': vani['upstream-channel-speed'],
#'password': Onu.DEFAULT_PASSWORD,
#'t-conts': None,
#'gem-ports': None,
#'onu-vid': None,
#'vont-ani': vani
#}
#pon_port.onu_add(onu_info)
#else:
#update onu
def _on_ont_ani_create(self, name):
assert name in self._ont_anis, \
'{} is not a ont_ani'.format(name)
ani = self._ont_anis[name]
upstream_fec = ani['upstream-fec']
#if upstream_fec == 1:
#return
vani = self._v_ont_anis[name]
if vani is None:
return
va_cp_name = vani['preferred-channel-pair']
if va_cp_name in self.channel_pairs:
for ct_name in self._channel_terminations:
ct_cp_name = self._channel_terminations[ct_name]['channel-pair']
if va_cp_name == ct_cp_name:
ct = self._channel_terminations[ct_name]
pon_id = ct['xgs-ponid']
# Look up the southbound PON port
pon_port = self.southbound_ports.get(pon_id, None)
if pon_port is not None:
sn = vani['expected-serial-number']
onu_id = vani['onu-id']
if pon_port.onu_exist_check(onu_id)==True:
#update onu
onu_info = {
'name': vani['name'],
'device-id': self.device_id,
'serial-number': sn,
'xpon-name': None,
'pon': pon_port,
'onu-id': onu_id,
'ranging-status': None,
'config-status': None,
'status_machine': None,
'enabled': None,
'channel-partition': None,
'expected-registration-id': None,
'upstream-channel-speed': None,
'upstream-fec': upstream_fec,
'password': None,
't-conts': None,
'gem-ports': None,
'onu-vid': None,
'channel-id':None,
'vont-ani': None
}
pon_port.onu_update(onu_info)
else:
pass
else:
pass
else:
pass
else:
pass
def create_interface(self, data):
"""
Create XPON interfaces
:param data: (xpon config info)
"""
self.log.debug('create-interface', interface=data.interface, inst_data=data.data)
name = data.name
items = self._get_xpon_collection(data)
if items is not None and name not in items:
self._cached_xpon_pon_info = {} # Clear cached data
item_type, new_item = self._data_to_dict(data)
#self.log.debug('new-item', item_type=item_type, item=new_item)
if name not in items:
self.log.debug('new-item', item_type=item_type, item=new_item)
items[name] = new_item
if isinstance(data, ChannelterminationConfig):
self._on_channel_termination_create(name)
self.configure_pon(name)
elif isinstance(data, VOntaniConfig):
self._on_vont_ani_create(name)
elif isinstance(data, OntaniConfig):
self._on_ont_ani_create(name)
else:
pass
def configure_pon(self,name,pon_type='xgs-ponid'):
self.log.debug('configure-pon', name=name)
try:
assert name in self._channel_terminations, \
'{} is not a channel-termination'.format(name)
ct = self._channel_terminations[name]
pon_id = ct[pon_type]
# Look up the southbound PON port
pon_port = pon_id
assert ct['channel-pair'] in self._channel_pairs, \
'{} is not a channel-pair'.format(ct['channel-pair'])
cpair = self._channel_pairs[ct['channel-pair']]
assert cpair['channel-group'] in self._channel_groups, \
'{} is not a -group'.format(cpair['channel-group'])
assert cpair['channel-partition'] in self._channel_partitions, \
'{} is not a channel-partition'.format(cpair('channel-partition'))
cg = self._channel_groups[cpair['channel-group']]
cpart = self._channel_partitions[cpair['channel-partition']]
pon_tag = ct['pon_tag']
closest_ont_distance = cpart['closest_ont_distance']
differential_fiber_distance = cpart['differential-fiber-distance']
fec_downstream = cpart['fec-downstream']
aes_downstream = 1
aes_upstream = 1
pon_profile = 0
bwmap_cycle = 8
discover_period = cg['polling-period']
msg_hdr = OltMsgCommonHdr(
type=OLT_PON_CONFIGURE_PON,
src_appId=OLT_APPID_VOLTHA,
sync=0
)
data = msg_hdr.SerializeToString()
self.zmq_client_async.async_send(data, 1)
config_pon = OltPonConfigurePon(
pon_port = pon_port,
pon_id = pon_id,
pon_tag = pon_tag,
closest_ont_distance = closest_ont_distance,
differential_fiber_distance = differential_fiber_distance,
fec_downstream = fec_downstream,
aes_downstream = aes_downstream,
aes_upstream = aes_upstream,
pon_profile = pon_profile,
bwmap_cycle = bwmap_cycle,
discover_period = discover_period
)
data = config_pon.SerializeToString()
self.zmq_client_async.async_send(data, 0)
self.log.debug('send configure-pon message success',config_pon=config_pon)
except Exception as e:
self.log.exception('Exception during configure pon', e=e)
def update_interface(self, data):
"""
Update XPON interfaces
:param data: (xpon config info)
"""
self.log.debug('update_interface', interface=data.interface, inst_data=data.data)
name = data.name
items = self._get_xpon_collection(data)
if items is None:
raise ValueError('Unknown data type: {}'.format(type(data)))
existing_item = items.get(name)
if existing_item is None:
raise KeyError("'{}' not found. Type: {}".format(name, type(data)))
item_type, update_item = self._data_to_dict(data)
self.log.debug('update-item', item_type=item_type, item=update_item)
# Calculate the difference
diffs = self._dict_diff(existing_item, update_item)
if len(diffs) == 0:
self.log.debug('update-item-no-diffs')
self._cached_xpon_pon_info = {} # Clear cached data
# Act on changed items
if isinstance(data, ChannelgroupConfig):
self._on_channel_group_modify(name, items, diffs)
#raise NotImplementedError('TODO: not yet supported')
elif isinstance(data, ChannelpartitionConfig):
self._on_channel_partition_modify(name, items, diffs)
#raise NotImplementedError('TODO: not yet supported')
elif isinstance(data, ChannelpairConfig):
self._on_channel_pair_modify(name, items, diffs)
#raise NotImplementedError('TODO: not yet supported')
elif isinstance(data, ChannelterminationConfig):
self._on_channel_termination_modify(name, items, diffs)
#raise NotImplementedError('TODO: not yet supported')
elif isinstance(data, OntaniConfig):
raise NotImplementedError('TODO: not yet supported')
elif isinstance(data, VOntaniConfig):
raise NotImplementedError('TODO: not yet supported')
elif isinstance(data, VEnetConfig):
raise NotImplementedError('TODO: not yet supported')
else:
raise NotImplementedError('Unknown data type')
def remove_interface(self, data):
"""
Deleete XPON interfaces
:param data: (xpon config info)
"""
self.log.debug('remove_interface', interface=data.interface, inst_data=data.data)
name = data.name
items = self._get_xpon_collection(data)
item = items.get(name)
self.log.debug('delete-interface', name=name, data=data)
self.log.debug('remove_interface len(items)', len(items))
if item is not None:
self._cached_xpon_pon_info = {} # Clear cached data
#del items[name]
if isinstance(data, ChannelgroupConfig):
pass # Rely upon xPON logic to not allow delete of a referenced group
elif isinstance(data, ChannelpartitionConfig):
pass # Rely upon xPON logic to not allow delete of a referenced partition
elif isinstance(data, ChannelpairConfig):
pass # Rely upon xPON logic to not allow delete of a referenced pair
elif isinstance(data, ChannelterminationConfig):
self._on_channel_termination_delete(name)
elif isinstance(data, OntaniConfig):
pass
elif isinstance(data, VOntaniConfig):
self._on_vont_ani_delete(name)
#pass
elif isinstance(data, VEnetConfig):
pass
else:
raise NotImplementedError('Unknown data type')
del items[name]
self.log.debug('remove_interface len(items)', len(items))
#raise NotImplementedError('TODO: not yet supported')
def create_tcont(self, tcont_data, traffic_descriptor_data):
"""
Create TCONT information
:param tcont_data:
:param traffic_descriptor_data:
"""
self.log.debug('create-tcont', tcont=tcont_data, td=traffic_descriptor_data)
traffic_descriptor = TrafficDescriptor.create(traffic_descriptor_data)
tcont = TCont.create(tcont_data, traffic_descriptor)
if tcont.name not in self._tconts:
self._cached_xpon_pon_info = {} # Clear cached data
self._tconts[tcont.name] = tcont
# Update any ONUs referenced
tcont.xpon_create(self)
if traffic_descriptor.name not in self._traffic_descriptors:
self._traffic_descriptors[traffic_descriptor.name] = traffic_descriptor
# Update any ONUs referenced
traffic_descriptor.xpon_create(self, tcont)
def update_tcont(self, tcont_data, traffic_descriptor_data):
"""
Update TCONT information
:param tcont_data:
:param traffic_descriptor_data:
"""
self.log.debug('update-tcont', tcont=tcont_data, td=traffic_descriptor_data)
if tcont_data.name not in self._tconts:
raise KeyError("TCONT '{}' does not exists".format(tcont_data.name))
if traffic_descriptor_data.name not in self._traffic_descriptors:
raise KeyError("Traffic Descriptor '{}' does not exists".
format(traffic_descriptor_data.name))
self._cached_xpon_pon_info = {} # Clear cached data
traffic_descriptor = TrafficDescriptor.create(traffic_descriptor_data)
tcont = TCont.create(tcont_data, traffic_descriptor)
#
# Update any ONUs referenced
# tcont.xpon_update(self)
# traffic_descriptor.xpon_update(self, tcont)
pass
raise NotImplementedError('TODO: Not yet supported')
def remove_tcont(self, tcont_data, traffic_descriptor_data):
"""
Remove TCONT information
:param tcont_data:
:param traffic_descriptor_data:
"""
self.log.debug('remove-tcont', tcont=tcont_data, td=traffic_descriptor_data)
tcont = self._tconts.get(tcont_data.name)
traffic_descriptor = self._traffic_descriptors.get(traffic_descriptor_data.name)
if traffic_descriptor is not None:
del self._traffic_descriptors[traffic_descriptor_data.name]
self._cached_xpon_pon_info = {} # Clear cached data
pass # Perform any needed operations
#raise NotImplementedError('TODO: Not yet supported')
if tcont is not None:
#del self._tconts[tcont_data.name]
self._cached_xpon_pon_info = {} # Clear cached data
#Update any ONUs referenced
tcont.xpon_delete(self)
del self._tconts[tcont_data.name]
pass # Perform any needed operations
#raise NotImplementedError('TODO: Not yet supported')
def create_gemport(self, gemport_data):
"""
Create GEM Port
:param data:
"""
self.log.debug('create-gemport', gemport=gemport_data)
gemport = Gemport.create(gemport_data)
if gemport.name not in self._gemports:
self._cached_xpon_pon_info = {} # Clear cached data
self._gemports[gemport.name] = gemport
# Update any ONUs referenced
gemport.xpon_create(self)
def remove_gemport(self, data):
"""
Delete GEM Port
:param data:
"""
self.log.debug('remove-gemport', gem_port=data.name)
gemport = self._gemports.get(data.name)
if gemport is not None:
#del self._tconts[tcont_data.name]
self._cached_xpon_pon_info = {} # Clear cached data
#Update any ONUs referenced
gemport.xpon_delete(self)
del self._gemports[data.name]
pass # Perform any needed operations
def update_gemport(self, data):
"""
Update GEM Port
:param data:
"""
self.log.debug('update-gemport', gem_port=data)
pass
def _unregister_for_inter_adapter_messages(self):
try:
self.adapter_agent.unregister_for_inter_adapter_messages()
except:
pass
def _delete_logical_device(self):
ldi, self.logical_device_id = self.logical_device_id, None
if ldi is None:
return
self.log.debug('delete-logical-device', ldi=ldi)
logical_device = self.adapter_agent.get_logical_device(ldi)
self.adapter_agent.delete_logical_device(logical_device)
device = self.adapter_agent.get_device(self.device_id)
device.parent_id = ''
# Update the logical device mapping
if ldi in self.adapter.logical_device_id_to_root_device_id:
del self.adapter.logical_device_id_to_root_device_id[ldi]
def _cancel_deferred(self):
d1, self.heartbeat = self.heartbeat, None
d2, self.asyncmsg = self.asyncmsg, None
d3, self.omcimsg = self.omcimsg, None
d4, self.packetmsg = self.packetmsg, None
for d in [d1, d2, d3, d4]:
try:
if d is not None and not d.called:
d.cancel()
except:
pass
def _zmq_shutdown(self):
self.zmq_client_sub.sub_shutdown()
self.zmq_client_sync.sync_shutdown()
self.zmq_client_async.async_shutdown()
self.zmq_client_omci.omci_shutdown()
self.zmq_client_of_packet.packet_shutdown()
def _finish_reboot(self):
if self.heartbrat_status == 1:
self.log.info('reboot self.reboot_check_times:', self.reboot_check_times)
self.reboot_check_times = 0
# Reenable all child devices
self.adapter_agent.update_child_devices_state(self.device_id,
admin_state=AdminState.ENABLED)
#self.olt_activate_msg_send()
#self.work_status = 1
#for channel_term in self._channel_terminations:
#self.log.info('self._channel_terminations:', channel_term)
#self._on_channel_termination_create(channel_term)
#self.configure_pon(channel_term)
#for vont_ani in self._v_ont_anis:
#self.log.info('self._v_ont_anis:', vont_ani)
#self._on_vont_ani_create(vont_ani)
self.reboot_status = 0
self.log.info('rebooted', device_id=self.device_id)
else:
self.reboot_check_times += 1
if self.reboot_check_times < 20:
self.startup = reactor.callLater(10, self._finish_reboot)
else:
self.log.info('reboot fail. olt is unreachable.', device_id=self.device_id)
def disable(self):
self.log.info('disabling', device_id=self.device_id)
#send deactivate msg
self.olt_deactivate_msg_send()
self.work_status = 0
# Cancel any running enable/disable/... in progress
d, self.startup = self.startup, None
try:
if d is not None and not d.called:
d.cancel()
except:
pass
self._cancel_deferred()
self._unregister_for_inter_adapter_messages()
device = self.adapter_agent.get_device(self.device_id)
device.oper_status = OperStatus.UNKNOWN
device.connect_status = ConnectStatus.UNREACHABLE
#device.admin_state = AdminState.DISABLED
self.adapter_agent.update_device(device)
# Remove the logical device
logical_device = self.adapter_agent.get_logical_device(
self.logical_device_id)
self.adapter_agent.delete_logical_device(logical_device)
# Disable all child devices first
self.adapter_agent.update_child_devices_state(self.device_id,
oper_status=OperStatus.UNKNOWN,
connect_status=ConnectStatus.UNREACHABLE,
admin_state=AdminState.DISABLED)
# Remove the peer references from this device
self.adapter_agent.delete_all_peer_references(self.device_id)
for port in self.southbound_ports.itervalues():
port.delete()
# Set all ports to disabled
self.adapter_agent.disable_all_ports(self.device_id)
# Update the logice device mapping
if self.logical_device_id in \
self.adapter.logical_device_id_to_root_device_id:
del self.adapter.logical_device_id_to_root_device_id[
self.logical_device_id]
if self.logical_device_id is not None:
self.logical_device_id = None
#self._delete_logical_device()
self.log.info('disabled', device_id=device.id)
#zmq shutdown
#self._zmq_shutdown()
def reenable(self,done_deferred=None):
"""
This is called when a previously disabled device needs to be enabled based on a NBI call.
:param done_deferred: (Deferred) Deferred to fire when done
"""
self.log.info('re-enabling', device_id=self.device_id)
# Cancel any running enable/disable/... in progress
d, self.startup = self.startup, None
try:
if d is not None and not d.called:
d.cancel()
except:
pass
# Get the latest device reference
device = self.adapter_agent.get_device(self.device_id)
# Update the connect status to REACHABLE
device.connect_status = ConnectStatus.REACHABLE
device.oper_status = OperStatus.ACTIVATING
self.adapter_agent.update_device(device)
# Set all ports to enabled
self.adapter_agent.enable_all_ports(self.device_id)
# Recreate the logical device
logical_device = self.get_logical_device(device)
self._logical_device = self.adapter_agent.create_logical_device(logical_device,dpid=self.olt_mac)
# Recreate logical ports for all southbound and northbound interfaces
for port_no in self.northbound_ports:
logical_port = self.northbound_ports[port_no].get_logical_port()
self.adapter_agent.add_logical_port(self._logical_device.id, logical_port)
# update device active status now
device = self.adapter_agent.get_device(device.id)
device.parent_id = self._logical_device.id
device.oper_status = OperStatus.ACTIVE
device.reason = ''
self.logical_device_id = self._logical_device.id
self.adapter_agent.update_device(device)
# Reenable all child devices
self.adapter_agent.update_child_devices_state(device.id,
oper_status=OperStatus.DISCOVERED,
connect_status=ConnectStatus.REACHABLE,
admin_state=AdminState.ENABLED)
self.olt_activate_msg_send()
self.work_status = 1
for channel_term in self._channel_terminations:
self.log.info('self._channel_terminations:', channel_term)
self._on_channel_termination_create(channel_term)
self.configure_pon(channel_term)
for vont_ani in self._v_ont_anis:
self.log.info('self._v_ont_anis:', vont_ani)
self._on_vont_ani_create(vont_ani)
for ont_ani in self._ont_anis:
self.log.info('self._ont_anis:', ont_ani)
self._on_ont_ani_create(ont_ani)
#start heart_beat pm_collect
self.log.debug('Starting heartbeat')
self.start_heartbeat(delay=5)
#self.start_onu_test()
self.start_poll_async_msg()
self.start_poll_omci_msg()
#self.start_kpi_collection()
self.start_poll_packet_in_msg()
self.log.info('re-enabled', device_id=device.id)
def reboot(self):
self.log.info('rebooting', device_id=self.device_id)
# Update the operational status to ACTIVATING and connect status to
# UNREACHABLE
#device = self.adapter_agent.get_device(self.device_id)
#previous_oper_status = device.oper_status
#previous_conn_status = device.connect_status
#device.oper_status = OperStatus.ACTIVATING
#device.connect_status = ConnectStatus.UNREACHABLE
#self.adapter_agent.update_device(device)
# Update the child devices connect state to UNREACHABLE
#self.adapter_agent.update_child_devices_state(self.device_id,
# connect_status=ConnectStatus.UNREACHABLE)
# Disable all child devices first
self.adapter_agent.update_child_devices_state(self.device_id,
admin_state=AdminState.DISABLED)
# Remove the peer references from this device
self.adapter_agent.delete_all_peer_references(self.device_id)
for port in self.southbound_ports.itervalues():
port.delete()
self.olt_reboot_msg_send()
self.reboot_status = 1
self.heartbrat_status = 0
self.reboot_check_times = 0
self.startup = reactor.callLater(10, self._finish_reboot)
def delete(self):
self.log.info('deleting', device_id=self.device_id)
#send deactivate msg
#self.olt_deactivate_msg_send()
#self._cancel_deferred()
# Drop registration for adapter messages
#self._unregister_for_inter_adapter_messages()
# Cancel any outstanding tasks
#d, self.startup = self.startup, None
#try:
#if d is not None and not d.called:
#d.cancel()
#except:
#pass
# clear xpon config
self._channel_groups.clear
self._channel_partitions.clear
self._channel_pairs.clear
self._channel_terminations.clear
self._v_ont_anis.clear
self._ont_anis.clear
self._v_enets.clear
self._tconts.clear
self._traffic_descriptors.clear
self._gemports.clear
self._cached_xpon_pon_info.clear
# Remove all child devices
self.adapter_agent.delete_all_child_devices(self.device_id)
self.log.info("_delete_logical_device")
# Remove the logical device (should already be gone if disable came first)
if self.logical_device_id is not None:
logical_device = self.adapter_agent.get_logical_device(
self.logical_device_id)
self.adapter_agent.delete_logical_device(logical_device)
#self.log.info("delete_all_peer_references")
# Remove the peer references from this device
#self.adapter_agent.delete_all_peer_references(self.device_id)
# Update the logice device mapping
if self.logical_device_id in \
self.adapter.logical_device_id_to_root_device_id:
del self.adapter.logical_device_id_to_root_device_id[
self.logical_device_id]
if self.logical_device_id is not None:
self.logical_device_id = None
# Tell all ports to stop any background processing
for port in self.northbound_ports.itervalues():
port.delete()
for port in self.southbound_ports.itervalues():
port.delete()
self.northbound_ports.clear()
self.southbound_ports.clear()
# Shutdown communications with OLT
self._zmq_shutdown()
self.log.info('deleted', device_id=self.device_id)
def _update_download_status(self, request, download):
if download is not None:
request.state = download.download_state
request.reason = download.failure_reason
request.image_state = download.image_state
request.additional_info = download.additional_info
request.downloaded_bytes = download.downloaded_bytes
else:
request.state = ImageDownload.DOWNLOAD_UNKNOWN
request.reason = ImageDownload.UNKNOWN_ERROR
request.image_state = ImageDownload.IMAGE_UNKNOWN
request.additional_info = "Download request '{}' not found".format(request.name)
request.downloaded_bytes = 0
self.adapter_agent.update_image_download(request)
def start_download(self, device, request, done):
"""
This is called to request downloading a specified image into
the standby partition of a device based on a NBI call.
:param device: A Voltha.Device object.
:param request: A Voltha.ImageDownload object.
:param done: (Deferred) Deferred to fire when done
:return: (Deferred) Shall be fired to acknowledge the download.
"""
log.info('image_download', request=request)
try:
if request.name in self._downloads:
raise Exception("Download request with name '{}' already exists".
format(request.name))
try:
download = Download.create(self, request, self._download_protocols)
except Exception:
request.additional_info = 'Download request creation failed due to exception'
raise
try:
self._downloads[download.name] = download
self._update_download_status(request, download)
done.callback('started')
return done
except Exception:
request.additional_info = 'Download request startup failed due to exception'
del self._downloads[download.name]
download.cancel_download(request)
raise
except Exception as e:
self.log.exception('create', e=e)
request.reason = ImageDownload.UNKNOWN_ERROR
request.state = ImageDownload.DOWNLOAD_FAILED
if not request.additional_info:
request.additional_info = e.message
self.adapter_agent.update_image_download(request)
# restore admin state to enabled
device.admin_state = AdminState.ENABLED
self.adapter_agent.update_device(device)
raise
def download_status(self, device, request, done):
"""
This is called to inquire about a requested image download status based
on a NBI call.
The adapter is expected to update the DownloadImage DB object with the
query result
:param device: A Voltha.Device object.
:param request: A Voltha.ImageDownload object.
:param done: (Deferred) Deferred to fire when done
:return: (Deferred) Shall be fired to acknowledge
"""
log.info('download_status', request=request)
download = self._downloads.get(request.name)
self._update_download_status(request, download)
if request.state != ImageDownload.DOWNLOAD_STARTED:
# restore admin state to enabled
device.admin_state = AdminState.ENABLED
self.adapter_agent.update_device(device)
done.callback(request.state)
return done
def cancel_download(self, device, request, done):
"""
This is called to cancel a requested image download based on a NBI
call. The admin state of the device will not change after the
download.
:param device: A Voltha.Device object.
:param request: A Voltha.ImageDownload object.
:param done: (Deferred) Deferred to fire when done
:return: (Deferred) Shall be fired to acknowledge
"""
log.info('cancel_download', request=request)
download = self._downloads.get(request.name)
if download is not None:
del self._downloads[request.name]
result = download.cancel_download(request)
self._update_download_status(request, download)
done.callback(result)
else:
self._update_download_status(request, download)
done.errback(KeyError('Download request not found'))
if device.admin_state == AdminState.DOWNLOADING_IMAGE:
device.admin_state = AdminState.ENABLED
self.adapter_agent.update_device(device)
return done
def activate_image(self, device, request, done):
"""
This is called to activate a downloaded image from a standby partition
into active partition.
Depending on the device implementation, this call may or may not
cause device reboot. If no reboot, then a reboot is required to make
the activated image running on device
:param device: A Voltha.Device object.
:param request: A Voltha.ImageDownload object.
:param done: (Deferred) Deferred to fire when done
:return: (Deferred) OperationResponse object.
"""
log.info('activate_image', request=request)
download = self._downloads.get(request.name)
if download is not None:
del self._downloads[request.name]
result = download.activate_image()
self._update_download_status(request, download)
done.callback(result)
else:
self._update_download_status(request, download)
done.errback(KeyError('Download request not found'))
# restore admin state to enabled
device.admin_state = AdminState.ENABLED
self.adapter_agent.update_device(device)
return done
def revert_image(self, device, request, done):
"""
This is called to deactivate the specified image at active partition,
and revert to previous image at standby partition.
Depending on the device implementation, this call may or may not
cause device reboot. If no reboot, then a reboot is required to
make the previous image running on device
:param device: A Voltha.Device object.
:param request: A Voltha.ImageDownload object.
:param done: (Deferred) Deferred to fire when done
:return: (Deferred) OperationResponse object.
"""
log.info('revert_image', request=request)
download = self._downloads.get(request.name)
if download is not None:
del self._downloads[request.name]
result = download.revert_image()
self._update_download_status(request, download)
done.callback(result)
else:
self._update_download_status(request, download)
done.errback(KeyError('Download request not found'))
# restore admin state to enabled
device.admin_state = AdminState.ENABLED
self.adapter_agent.update_device(device)
return done