VOL-1452 Use a more similar version of openolt.py to existing code.
Also pass around core_proxy in the place of adapter_agent
Change-Id: If79c5dbee342bbea1aaedc80ce2606ca4ff138d6
diff --git a/python/adapters/openolt/openolt.py b/python/adapters/openolt/openolt.py
index 4c18066..fd4e6d1 100644
--- a/python/adapters/openolt/openolt.py
+++ b/python/adapters/openolt/openolt.py
@@ -20,6 +20,8 @@
import arrow
import grpc
import structlog
+
+from zope.interface import implementer
from google.protobuf.empty_pb2 import Empty
from google.protobuf.json_format import MessageToDict
from scapy.layers.inet import Raw
@@ -33,7 +35,7 @@
from twisted.internet.task import LoopingCall
from pyvoltha.adapters.common.frameio.frameio import BpfProgramFilter, hexify
-from pyvoltha.adapters.iadapter import OltAdapter
+from pyvoltha.adapters.iadapter import IAdapterInterface
from pyvoltha.common.utils.asleep import asleep
from pyvoltha.common.utils.registry import registry
from pyvoltha.adapters.kafka.kafka_proxy import get_kafka_proxy
@@ -77,109 +79,9 @@
}
}
-class AdapterPmMetrics:
- def __init__(self, device):
- self.pm_names = {'tx_64_pkts', 'tx_65_127_pkts', 'tx_128_255_pkts',
- 'tx_256_511_pkts', 'tx_512_1023_pkts',
- 'tx_1024_1518_pkts', 'tx_1519_9k_pkts',
- 'rx_64_pkts', 'rx_65_127_pkts',
- 'rx_128_255_pkts', 'rx_256_511_pkts',
- 'rx_512_1023_pkts', 'rx_1024_1518_pkts',
- 'rx_1519_9k_pkts'}
- self.device = device
- self.id = device.id
- self.name = 'ponsim_olt'
- self.default_freq = 150
- self.grouped = False
- self.freq_override = False
- self.pon_metrics_config = dict()
- self.nni_metrics_config = dict()
- self.lc = None
- for m in self.pm_names:
- self.pon_metrics_config[m] = PmConfig(name=m,
- type=PmConfig.COUNTER,
- enabled=True)
- self.nni_metrics_config[m] = PmConfig(name=m,
- type=PmConfig.COUNTER,
- enabled=True)
- def update(self, pm_config):
- if self.default_freq != pm_config.default_freq:
- # Update the callback to the new frequency.
- self.default_freq = pm_config.default_freq
- self.lc.stop()
- self.lc.start(interval=self.default_freq / 10)
- for m in pm_config.metrics:
- self.pon_metrics_config[m.name].enabled = m.enabled
- self.nni_metrics_config[m.name].enabled = m.enabled
-
- def make_proto(self):
- pm_config = PmConfigs(
- id=self.id,
- default_freq=self.default_freq,
- grouped=False,
- freq_override=False)
- for m in sorted(self.pon_metrics_config):
- pm = self.pon_metrics_config[m] # Either will do they're the same
- pm_config.metrics.extend([PmConfig(name=pm.name,
- type=pm.type,
- enabled=pm.enabled)])
- return pm_config
-
- def collect_port_metrics(self, channel):
- rtrn_port_metrics = dict()
- stub = ponsim_pb2.PonSimStub(channel)
- stats = stub.GetStats(Empty())
- rtrn_port_metrics['pon'] = self.extract_pon_metrics(stats)
- rtrn_port_metrics['nni'] = self.extract_nni_metrics(stats)
- return rtrn_port_metrics
-
- def extract_pon_metrics(self, stats):
- rtrn_pon_metrics = dict()
- for m in stats.metrics:
- if m.port_name == "pon":
- for p in m.packets:
- if self.pon_metrics_config[p.name].enabled:
- rtrn_pon_metrics[p.name] = p.value
- return rtrn_pon_metrics
-
- def extract_nni_metrics(self, stats):
- rtrn_pon_metrics = dict()
- for m in stats.metrics:
- if m.port_name == "nni":
- for p in m.packets:
- if self.pon_metrics_config[p.name].enabled:
- rtrn_pon_metrics[p.name] = p.value
- return rtrn_pon_metrics
-
- def start_collector(self, callback):
- log.info("starting-pm-collection", device_name=self.name,
- device_id=self.device.id)
- prefix = 'voltha.{}.{}'.format(self.name, self.device.id)
- self.lc = LoopingCall(callback, self.device.id, prefix)
- self.lc.start(interval=self.default_freq / 10)
-
- def stop_collector(self):
- log.info("stopping-pm-collection", device_name=self.name,
- device_id=self.device.id)
- self.lc.stop()
-
-
-class AdapterAlarms:
- def __init__(self, adapter, device):
- self.adapter = adapter
- self.device = device
- self.lc = None
-
- # TODO: Implement code to send to kafka cluster directly instead of
- # going through the voltha core.
- def send_alarm(self, context_data, alarm_data):
- log.debug("send-alarm-not-implemented")
- return
-
-
-
-class OpenoltAdapter(OltAdapter):
+@implementer(IAdapterInterface)
+class OpenoltAdapter(object):
name = 'openolt'
supported_device_types = [
@@ -193,26 +95,16 @@
# System Init Methods #
def __init__(self, core_proxy, adapter_proxy, config):
- super(OpenoltAdapter, self).__init__(core_proxy=core_proxy,
- adapter_proxy=adapter_proxy,
- config=config,
- device_handler_class=OpenoltHandler,
- name='openolt',
- vendor='Voltha project',
- version='0.4',
- device_type='openolt',
- accepts_bulk_flow_update=True,
- accepts_add_remove_flow_updates=False)
self.adapter_proxy = adapter_proxy
- self.core_proxy = core_proxy
+ self.adapter_agent = core_proxy
self.config = config
self.descriptor = Adapter(
id=self.name,
vendor='OLT white box vendor',
version='0.1',
- config=config
+ config=AdapterConfig(log_level=LogLevel.INFO)
)
- log.debug('openolt.__init__', adapter_proxy=adapter_proxy)
+ log.debug('openolt.__init__', adapter_agent=adapter_proxy)
self.devices = dict() # device_id -> OpenoltDevice()
self.interface = registry('main').get_args().interface
self.logical_device_id_to_root_device_id = dict()
@@ -224,8 +116,6 @@
def stop(self):
log.info('stopped', interface=self.interface)
-
- # Info Methods #
def adapter_descriptor(self):
log.debug('get descriptor', interface=self.interface)
return self.descriptor
@@ -239,47 +129,76 @@
log.debug('get health', interface=self.interface)
raise NotImplementedError()
- def get_device_details(self, device):
- log.debug('get_device_details', device=device)
- raise NotImplementedError()
-
-
- # Device Operation Methods #
def change_master_state(self, master):
log.debug('change_master_state', interface=self.interface,
master=master)
raise NotImplementedError()
+ def adopt_device(self, device):
+ log.info('adopt-device', device=device)
+
+ kwargs = {
+ 'support_classes': OpenOltDefaults['support_classes'],
+ 'adapter_proxy': self.adapter_proxy,
+ 'adapter_agent': self.adapter_agent,
+ 'device': device,
+ 'device_num': self.num_devices + 1
+ }
+ try:
+ self.devices[device.id] = OpenoltDevice(**kwargs)
+ except Exception as e:
+ log.error('Failed to adopt OpenOLT device', error=e)
+ # TODO set status to ERROR so that is clear something went wrong
+ del self.devices[device.id]
+ raise
+ else:
+ self.num_devices += 1
+
+ def reconcile_device(self, device):
+ log.info('reconcile-device', device=device)
+ kwargs = {
+ 'support_classes': OpenOltDefaults['support_classes'],
+ 'adapter_agent': self.adapter_agent,
+ 'device': device,
+ 'device_num': self.num_devices + 1,
+ 'reconciliation': True
+ }
+ try:
+ reconciled_device = OpenoltDevice(**kwargs)
+ log.debug('reconciled-device-recreated',
+ device_id=reconciled_device.device_id)
+ self.devices[device.id] = reconciled_device
+ except Exception as e:
+ log.error('Failed to reconcile OpenOLT device', error=e,
+ exception_type=type(e).__name__)
+ del self.devices[device.id]
+ raise
+ else:
+ self.num_devices += 1
+ # Invoke the children reconciliation which would setup the
+ # basic children data structures
+ self.adapter_agent.reconcile_child_devices(device.id)
+ return device
+
def abandon_device(self, device):
log.info('abandon-device', device=device)
raise NotImplementedError()
+ def disable_device(self, device):
+ log.info('disable-device', device=device)
+ handler = self.devices[device.id]
+ handler.disable()
- # Configuration Methods #
- def update_flows_incrementally(self, device, flow_changes, group_changes):
- log.debug('update_flows_incrementally', device=device,
- flow_changes=flow_changes, group_changes=group_changes)
- log.info('This device does not allow this, therefore it is Not '
- 'implemented')
- raise NotImplementedError()
+ def reenable_device(self, device):
+ log.info('reenable-device', device=device)
+ handler = self.devices[device.id]
+ handler.reenable()
- def update_pm_config(self, device, pm_configs):
- log.info('update_pm_config - Not implemented yet', device=device,
- pm_configs=pm_configs)
- raise NotImplementedError()
+ def reboot_device(self, device):
+ log.info('reboot_device', device=device)
+ handler = self.devices[device.id]
+ handler.reboot()
- def receive_proxied_message(self, proxy_address, msg):
- log.debug('receive_proxied_message - Not implemented',
- proxy_address=proxy_address,
- proxied_msg=msg)
- raise NotImplementedError()
-
- def receive_inter_adapter_message(self, msg):
- log.info('rx_inter_adapter_msg - Not implemented')
- raise NotImplementedError()
-
-
- # Image Operations Methods #
def download_image(self, device, request):
log.info('image_download - Not implemented yet', device=device,
request=request)
@@ -308,8 +227,96 @@
log.info('Not implemented yet')
raise NotImplementedError()
+ def delete_device(self, device):
+ log.info('delete-device', device=device)
+ handler = self.devices[device.id]
+ handler.delete()
+ del self.devices[device.id]
+ del self.logical_device_id_to_root_device_id[device.parent_id]
+ return device
- # PON Operations Methods #
+ def get_device_details(self, device):
+ log.debug('get_device_details', device=device)
+ raise NotImplementedError()
+
+ def update_flows_bulk(self, device, flows, groups):
+ log.info('bulk-flow-update', device_id=device.id,
+ number_of_flows=len(flows.items), number_of_groups=len(
+ groups.items))
+ log.debug('flows and grousp details', flows=flows, groups=groups)
+ assert len(groups.items) == 0, "Cannot yet deal with groups"
+ handler = self.devices[device.id]
+ return handler.update_flow_table(flows.items)
+
+ def update_flows_incrementally(self, device, flow_changes, group_changes):
+ log.debug('update_flows_incrementally', device=device,
+ flow_changes=flow_changes, group_changes=group_changes)
+ log.info('This device does not allow this, therefore it is Not '
+ 'implemented')
+ raise NotImplementedError()
+
+ def update_logical_flows(self, device_id, flows_to_add, flows_to_remove,
+ groups, device_rules_map):
+
+ log.info('logical-flows-update', flows_to_add=len(flows_to_add),
+ flows_to_remove=len(flows_to_remove))
+ log.debug('logical-flows-details', flows_to_add=flows_to_add,
+ flows_to_remove=flows_to_remove)
+ assert len(groups) == 0, "Cannot yet deal with groups"
+ handler = self.devices[device_id]
+ handler.update_logical_flows(flows_to_add, flows_to_remove,
+ device_rules_map)
+
+ def update_pm_config(self, device, pm_configs):
+ log.info('update_pm_config - Not implemented yet', device=device,
+ pm_configs=pm_configs)
+ raise NotImplementedError()
+
+ def send_proxied_message(self, proxy_address, msg):
+ log.debug('send-proxied-message',
+ proxy_address=proxy_address,
+ proxied_msg=msg)
+ handler = self.devices[proxy_address.device_id]
+ handler.send_proxied_message(proxy_address, msg)
+
+ def receive_proxied_message(self, proxy_address, msg):
+ log.debug('receive_proxied_message - Not implemented',
+ proxy_address=proxy_address,
+ proxied_msg=msg)
+ raise NotImplementedError()
+
+ def receive_packet_out(self, logical_device_id, egress_port_no, msg):
+ log.debug('packet-out', logical_device_id=logical_device_id,
+ egress_port_no=egress_port_no, msg_len=len(msg))
+
+ def ldi_to_di(ldi):
+ di = self.logical_device_id_to_root_device_id.get(ldi)
+ if di is None:
+ logical_device = self.adapter_agent.get_logical_device(ldi)
+ di = logical_device.root_device_id
+ self.logical_device_id_to_root_device_id[ldi] = di
+ return di
+
+ try:
+ device_id = ldi_to_di(logical_device_id)
+ handler = self.devices[device_id]
+ handler.packet_out(egress_port_no, msg)
+ except Exception as e:
+ log.error('packet-out:exception', e=e.message)
+
+ def receive_inter_adapter_message(self, msg):
+ log.info('rx_inter_adapter_msg - Not implemented')
+ raise NotImplementedError()
+
+ def suppress_alarm(self, filter):
+ log.info('suppress_alarm - Not implemented yet', filter=filter)
+ raise NotImplementedError()
+
+ def unsuppress_alarm(self, filter):
+ log.info('unsuppress_alarm - Not implemented yet', filter=filter)
+ raise NotImplementedError()
+
+ # PON Mgnt APIs #
def create_interface(self, device, data):
log.debug('create-interface - Not implemented - We do not use this',
data=data)
@@ -394,512 +401,39 @@
'not use this', data=data)
raise NotImplementedError()
-
- # Alarm Methods #
- def suppress_alarm(self, filter):
- log.info('suppress_alarm - Not implemented yet', filter=filter)
- raise NotImplementedError()
-
- def unsuppress_alarm(self, filter):
- log.info('unsuppress_alarm - Not implemented yet', filter=filter)
- raise NotImplementedError()
-
-class OpenoltHandler(object):
- def __init__(self, adapter, device_id):
- self.adapter = adapter
- self.core_proxy = adapter.core_proxy
- self.adapter_proxy = adapter.adapter_proxy
- self.device_id = device_id
- self.log = structlog.get_logger(device_id=device_id)
- self.channel = None
- self.io_port = None
- self.logical_device_id = None
- self.nni_port = None
- self.ofp_port_no = None
- self.interface = registry('main').get_args().interface
- self.pm_metrics = None
- self.alarms = None
- self.frames = None
- self.num_devices = 0
-
- @inlineCallbacks
- def get_channel(self):
- if self.channel is None:
- try:
- device = yield self.core_proxy.get_device(self.device_id)
- self.log.info('device-info', device=device,
- host_port=device.host_and_port)
- self.channel = grpc.insecure_channel(device.host_and_port)
- except Exception as e:
- log.exception("ponsim-connection-failure", e=e)
-
- # returnValue(self.channel)
-
- def close_channel(self):
- if self.channel is None:
- self.log.info('grpc-channel-already-closed')
- return
+ def delete_child_device(self, parent_device_id, child_device):
+ log.info('delete-child_device', parent_device_id=parent_device_id,
+ child_device=child_device)
+ handler = self.devices[parent_device_id]
+ if handler is not None:
+ handler.delete_child_device(child_device)
else:
- if self.frames is not None:
- self.frames.cancel()
- self.frames = None
- self.log.info('cancelled-grpc-frame-stream')
+ log.error('Could not find matching handler',
+ looking_for_device_id=parent_device_id,
+ available_handlers=self.devices.keys())
- self.channel.unsubscribe(lambda *args: None)
- self.channel = None
+ # This is currently not part of the Iadapter interface
+ def collect_stats(self, device_id):
+ log.info('collect_stats', device_id=device_id)
+ handler = self.devices[device_id]
+ if handler is not None:
+ handler.trigger_statistics_collection()
+ else:
+ log.error('Could not find matching handler',
+ looking_for_device_id=device_id,
+ available_handlers=self.devices.keys())
- self.log.info('grpc-channel-closed')
+ def simulate_alarm(self, device, request):
+ log.info('simulate_alarm', device=device, request=request)
- @inlineCallbacks
- def _get_nni_port(self):
- ports = yield self.core_proxy.get_ports(self.device_id,
- Port.ETHERNET_NNI)
- returnValue(ports)
-
- def init_device(self, kwargs):
- self.device = OpenoltDevice(**kwargs)
+ if device.id not in self.devices:
+ log.error("Device does not exist", device_id=device.id)
+ return OperationResp(code=OperationResp.OPERATION_FAILURE,
+ additional_info="Device %s does not exist"
+ % device.id)
- @inlineCallbacks
- def activate(self, device):
- try:
- self.log.info('activating')
- if not device.host_and_port:
- device.oper_status = OperStatus.FAILED
- device.reason = 'No host_and_port field provided'
- self.core_proxy.device_update(device)
- return
-
- kwargs = {
- 'support_classes': OpenOltDefaults['support_classes'],
- 'adapter_agent': self.core_proxy,
- 'device': device,
- 'device_num': self.num_devices + 1
- }
- try:
- yield self.init_device(kwargs)
- except Exception as e:
- log.error('Failed to adopt OpenOLT device', error=e)
- # TODO set status to ERROR so that is clear something went wrong
- #del self.devices[device.id]
- raise
- else:
- self.num_devices += 1
+ handler = self.devices[device.id]
- """
- yield self.get_channel()
- stub = PonSimStub(self.channel)
- info = stub.GetDeviceInfo(Empty())
- log.info('got-info', info=info, device_id=device.id)
- self.ofp_port_no = info.nni_port
+ handler.simulate_alarm(request)
- device.root = True
- device.vendor = 'ponsim'
- device.model = 'n/a'
- device.serial_number = device.host_and_port
- device.mac_address = "AA:BB:CC:DD:EE:FF"
- yield self.core_proxy.device_update(device)
-
- # Now set the initial PM configuration for this device
- self.pm_metrics = AdapterPmMetrics(device)
- pm_config = self.pm_metrics.make_proto()
- log.info("initial-pm-config", pm_config=pm_config)
- self.core_proxy.device_pm_config_update(pm_config, init=True)
-
- # Setup alarm handler
- self.alarms = AdapterAlarms(self.adapter, device)
-
- nni_port = Port(
- port_no=info.nni_port,
- label='NNI facing Ethernet port',
- type=Port.ETHERNET_NNI,
- oper_status=OperStatus.ACTIVE
- )
- self.nni_port = nni_port
- yield self.core_proxy.port_created(device.id, nni_port)
- yield self.core_proxy.port_created(device.id, Port(
- port_no=1,
- label='PON port',
- type=Port.PON_OLT,
- oper_status=OperStatus.ACTIVE
- ))
-
- yield self.core_proxy.device_state_update(device.id,
- connect_status=ConnectStatus.REACHABLE,
- oper_status=OperStatus.ACTIVE)
-
- # register ONUS
- self.log.info('onu-found', onus=info.onus, len=len(info.onus))
- for onu in info.onus:
- vlan_id = onu.uni_port
- yield self.core_proxy.child_device_detected(
- parent_device_id=device.id,
- parent_port_no=1,
- child_device_type='ponsim_onu',
- channel_id=vlan_id,
- )
-
- self.log.info('starting-frame-grpc-stream')
- reactor.callInThread(self.rcv_grpc)
- self.log.info('started-frame-grpc-stream')
-
- # Start collecting stats from the device after a brief pause
- self.start_kpi_collection(device.id)
- """
- except Exception as e:
- log.exception("Exception-activating", e=e)
-
- def get_ofp_device_info(self, device):
- return SwitchCapability(
- desc=ofp_desc(
- hw_desc='ponsim pon',
- sw_desc='ponsim pon',
- serial_num=device.serial_number,
- dp_desc='n/a'
- ),
- switch_features=ofp_switch_features(
- n_buffers=256, # TODO fake for now
- n_tables=2, # TODO ditto
- capabilities=( # TODO and ditto
- OFPC_FLOW_STATS
- | OFPC_TABLE_STATS
- | OFPC_PORT_STATS
- | OFPC_GROUP_STATS
- )
- )
- )
-
- def get_ofp_port_info(self, device, port_no):
- # Since the adapter created the device port then it has the reference of the port to
- # return the capability. TODO: Do a lookup on the NNI port number and return the
- # appropriate attributes
- self.log.info('get_ofp_port_info', port_no=port_no,
- info=self.ofp_port_no, device_id=device.id)
- cap = OFPPF_1GB_FD | OFPPF_FIBER
- return PortCapability(
- port=LogicalPort(
- ofp_port=ofp_port(
- hw_addr=mac_str_to_tuple(
- 'AA:BB:CC:DD:EE:%02x' % port_no),
- config=0,
- state=OFPPS_LIVE,
- curr=cap,
- advertised=cap,
- peer=cap,
- curr_speed=OFPPF_1GB_FD,
- max_speed=OFPPF_1GB_FD
- ),
- device_id=device.id,
- device_port_no=port_no
- )
- )
-
- # TODO - change for core 2.0
- def reconcile(self, device):
- self.log.info('reconciling-OLT-device')
-
- @inlineCallbacks
- def _rcv_frame(self, frame):
- pkt = Ether(frame)
-
- if pkt.haslayer(Dot1Q):
- outer_shim = pkt.getlayer(Dot1Q)
-
- if isinstance(outer_shim.payload, Dot1Q):
- inner_shim = outer_shim.payload
- cvid = inner_shim.vlan
- popped_frame = (
- Ether(src=pkt.src, dst=pkt.dst, type=inner_shim.type) /
- inner_shim.payload
- )
- self.log.info('sending-packet-in',device_id=self.device_id, port=cvid)
- yield self.core_proxy.send_packet_in(device_id=self.device_id,
- port=cvid,
- packet=str(popped_frame))
- elif pkt.haslayer(Raw):
- raw_data = json.loads(pkt.getlayer(Raw).load)
- self.alarms.send_alarm(self, raw_data)
-
- @inlineCallbacks
- def rcv_grpc(self):
- """
- This call establishes a GRPC stream to receive frames.
- """
- yield self.get_channel()
- stub = PonSimStub(self.channel)
- # stub = PonSimStub(self.get_channel())
-
- # Attempt to establish a grpc stream with the remote ponsim service
- self.frames = stub.ReceiveFrames(Empty())
-
- self.log.info('start-receiving-grpc-frames')
-
- try:
- for frame in self.frames:
- self.log.info('received-grpc-frame',
- frame_len=len(frame.payload))
- yield self._rcv_frame(frame.payload)
-
- except _Rendezvous, e:
- log.warn('grpc-connection-lost', message=e.message)
-
- self.log.info('stopped-receiving-grpc-frames')
-
- @inlineCallbacks
- def update_flow_table(self, flows):
- yield self.get_channel()
- stub = PonSimStub(self.channel)
-
- self.log.info('pushing-olt-flow-table')
- stub.UpdateFlowTable(FlowTable(
- port=0,
- flows=flows
- ))
- self.log.info('success')
-
- def remove_from_flow_table(self, flows):
- self.log.debug('remove-from-flow-table', flows=flows)
- # TODO: Update PONSIM code to accept incremental flow changes
- # Once completed, the accepts_add_remove_flow_updates for this
- # device type can be set to True
-
- def add_to_flow_table(self, flows):
- self.log.debug('add-to-flow-table', flows=flows)
- # TODO: Update PONSIM code to accept incremental flow changes
- # Once completed, the accepts_add_remove_flow_updates for this
- # device type can be set to True
-
- def update_pm_config(self, device, pm_config):
- log.info("handler-update-pm-config", device=device,
- pm_config=pm_config)
- self.pm_metrics.update(pm_config)
-
- def send_proxied_message(self, proxy_address, msg):
- self.log.info('sending-proxied-message')
- if isinstance(msg, FlowTable):
- stub = PonSimStub(self.get_channel())
- self.log.info('pushing-onu-flow-table', port=msg.port)
- res = stub.UpdateFlowTable(msg)
- self.core_proxy.receive_proxied_message(proxy_address, res)
-
- @inlineCallbacks
- def process_inter_adapter_message(self, request):
- self.log.info('process-inter-adapter-message', msg=request)
- try:
- if request.header.type == InterAdapterMessageType.FLOW_REQUEST:
- f = FlowTable()
- if request.body:
- request.body.Unpack(f)
- stub = PonSimStub(self.channel)
- self.log.info('pushing-onu-flow-table')
- res = stub.UpdateFlowTable(f)
- # Send response back
- reply = InterAdapterResponseBody()
- reply.status = True
- self.log.info('sending-response-back', reply=reply)
- yield self.adapter_proxy.send_inter_adapter_message(
- msg=reply,
- type=InterAdapterMessageType.FLOW_RESPONSE,
- from_adapter=self.adapter.name,
- to_adapter=request.header.from_topic,
- to_device_id=request.header.to_device_id,
- message_id=request.header.id
- )
- elif request.header.type == InterAdapterMessageType.METRICS_REQUEST:
- m = PonSimMetricsRequest()
- if request.body:
- request.body.Unpack(m)
- stub = PonSimStub(self.channel)
- self.log.info('proxying onu stats request', port=m.port)
- res = stub.GetStats(m)
- # Send response back
- reply = InterAdapterResponseBody()
- reply.status = True
- reply.body.Pack(res)
- self.log.info('sending-response-back', reply=reply)
- yield self.adapter_proxy.send_inter_adapter_message(
- msg=reply,
- type=InterAdapterMessageType.METRICS_RESPONSE,
- from_adapter=self.adapter.name,
- to_adapter=request.header.from_topic,
- to_device_id=request.header.to_device_id,
- message_id=request.header.id
- )
- except Exception as e:
- self.log.exception("error-processing-inter-adapter-message", e=e)
-
- def packet_out(self, egress_port, msg):
- self.log.info('sending-packet-out', egress_port=egress_port,
- msg=hexify(msg))
- try:
- pkt = Ether(msg)
- out_pkt = pkt
- if egress_port != self.nni_port.port_no:
- # don't do the vlan manipulation for the NNI port, vlans are already correct
- out_pkt = (
- Ether(src=pkt.src, dst=pkt.dst) /
- Dot1Q(vlan=egress_port, type=pkt.type) /
- pkt.payload
- )
-
- # TODO need better way of mapping logical ports to PON ports
- out_port = self.nni_port.port_no if egress_port == self.nni_port.port_no else 1
-
- # send over grpc stream
- stub = PonSimStub(self.channel)
- frame = PonSimFrame(id=self.device_id, payload=str(out_pkt),
- out_port=out_port)
- stub.SendFrame(frame)
- except Exception as e:
- self.log.exception("error-processing-packet-out", e=e)
-
-
- @inlineCallbacks
- def reboot(self):
- self.log.info('rebooting', device_id=self.device_id)
-
- yield self.core_proxy.device_state_update(self.device_id,
- connect_status=ConnectStatus.UNREACHABLE)
-
- # Update the child devices connect state to UNREACHABLE
- yield self.core_proxy.children_state_update(self.device_id,
- connect_status=ConnectStatus.UNREACHABLE)
-
- # Sleep 10 secs, simulating a reboot
- # TODO: send alert and clear alert after the reboot
- yield asleep(10)
-
- # Change the connection status back to REACHABLE. With a
- # real OLT the connection state must be the actual state
- yield self.core_proxy.device_state_update(self.device_id,
- connect_status=ConnectStatus.REACHABLE)
-
- # Update the child devices connect state to REACHABLE
- yield self.core_proxy.children_state_update(self.device_id,
- connect_status=ConnectStatus.REACHABLE)
-
- self.log.info('rebooted', device_id=self.device_id)
-
- def self_test_device(self, device):
- """
- This is called to Self a device based on a NBI call.
- :param device: A Voltha.Device object.
- :return: Will return result of self test
- """
- log.info('self-test-device', device=device.id)
- raise NotImplementedError()
-
- @inlineCallbacks
- def disable(self):
- self.log.info('disabling', device_id=self.device_id)
-
- self.stop_kpi_collection()
-
- # Update the operational status to UNKNOWN and connection status to UNREACHABLE
- yield self.core_proxy.device_state_update(self.device_id,
- oper_status=OperStatus.UNKNOWN,
- connect_status=ConnectStatus.UNREACHABLE)
-
- self.close_channel()
- self.log.info('disabled-grpc-channel')
-
- self.stop_kpi_collection()
-
- # TODO:
- # 1) Remove all flows from the device
- # 2) Remove the device from ponsim
-
- self.log.info('disabled', device_id=self.device_id)
-
- @inlineCallbacks
- def reenable(self):
- self.log.info('re-enabling', device_id=self.device_id)
-
- # Set the ofp_port_no and nni_port in case we bypassed the reconcile
- # process if the device was in DISABLED state on voltha restart
- if not self.ofp_port_no and not self.nni_port:
- yield self.get_channel()
- stub = PonSimStub(self.channel)
- info = stub.GetDeviceInfo(Empty())
- log.info('got-info', info=info)
- self.ofp_port_no = info.nni_port
- ports = yield self._get_nni_port()
- # For ponsim, we are using only 1 NNI port
- if ports.items:
- self.nni_port = ports.items[0]
-
- # Update the state of the NNI port
- yield self.core_proxy.port_state_update(self.device_id,
- port_type=Port.ETHERNET_NNI,
- port_no=self.ofp_port_no,
- oper_status=OperStatus.ACTIVE)
-
- # Update the state of the PON port
- yield self.core_proxy.port_state_update(self.device_id,
- port_type=Port.PON_OLT,
- port_no=1,
- oper_status=OperStatus.ACTIVE)
-
- # Set the operational state of the device to ACTIVE and connect status to REACHABLE
- yield self.core_proxy.device_state_update(self.device_id,
- connect_status=ConnectStatus.REACHABLE,
- oper_status=OperStatus.ACTIVE)
-
- # TODO: establish frame grpc-stream
- # yield reactor.callInThread(self.rcv_grpc)
-
- self.start_kpi_collection(self.device_id)
-
- self.log.info('re-enabled', device_id=self.device_id)
-
- def delete(self):
- self.log.info('deleting', device_id=self.device_id)
-
- self.close_channel()
- self.log.info('disabled-grpc-channel')
-
- # TODO:
- # 1) Remove all flows from the device
- # 2) Remove the device from ponsim
-
- self.log.info('deleted', device_id=self.device_id)
-
- def start_kpi_collection(self, device_id):
-
- kafka_cluster_proxy = get_kafka_proxy()
-
- def _collect(device_id, prefix):
-
- try:
- # Step 1: gather metrics from device
- port_metrics = \
- self.pm_metrics.collect_port_metrics(self.channel)
-
- # Step 2: prepare the KpiEvent for submission
- # we can time-stamp them here (or could use time derived from OLT
- ts = arrow.utcnow().timestamp
- kpi_event = KpiEvent(
- type=KpiEventType.slice,
- ts=ts,
- prefixes={
- # OLT NNI port
- prefix + '.nni': MetricValuePairs(
- metrics=port_metrics['nni']),
- # OLT PON port
- prefix + '.pon': MetricValuePairs(
- metrics=port_metrics['pon'])
- }
- )
-
- # Step 3: submit directly to the kafka bus
- if kafka_cluster_proxy:
- if isinstance(kpi_event, Message):
- kpi_event = dumps(MessageToDict(kpi_event, True, True))
- kafka_cluster_proxy.send_message("voltha.kpis", kpi_event)
-
- except Exception as e:
- log.exception('failed-to-submit-kpis', e=e)
-
- self.pm_metrics.start_collector(_collect)
-
- def stop_kpi_collection(self):
- self.pm_metrics.stop_collector()
+ return OperationResp(code=OperationResp.OPERATION_SUCCESS)
diff --git a/python/adapters/openolt/openolt_device.py b/python/adapters/openolt/openolt_device.py
index f27c886..ea5e8e2 100644
--- a/python/adapters/openolt/openolt_device.py
+++ b/python/adapters/openolt/openolt_device.py
@@ -85,6 +85,7 @@
def __init__(self, **kwargs):
super(OpenoltDevice, self).__init__()
+ self.adapter_proxy = kwargs['adapter_proxy']
self.adapter_agent = kwargs['adapter_agent']
self.device_num = kwargs['device_num']
device = kwargs['device']
@@ -95,7 +96,7 @@
self.alarm_mgr_class = kwargs['support_classes']['alarm_mgr']
self.stats_mgr_class = kwargs['support_classes']['stats_mgr']
self.bw_mgr_class = kwargs['support_classes']['bw_mgr']
-
+
is_reconciliation = kwargs.get('reconciliation', False)
self.device_id = device.id
self.host_and_port = device.host_and_port
@@ -215,13 +216,11 @@
else:
uri = uri + uri[0:6 - l]
- print uri
-
return ":".join([hex(ord(x))[-2:] for x in uri])
def do_state_init(self, event):
# Initialize gRPC
- print ("Host And Port", self.host_and_port)
+ self.log.debug("grpc-host-port", self.host_and_port)
self.channel = grpc.insecure_channel(self.host_and_port)
self.channel_ready_future = grpc.channel_ready_future(self.channel)
@@ -241,7 +240,7 @@
# property instead. The Jinkins error will happon on the reason of
# Exception in thread Thread-1 (most likely raised # during
# interpreter shutdown)
- self.log.debug('starting indications thread')
+ self.log.debug('starting indications thread')
self.indications_thread_handle.setDaemon(True)
self.indications_thread_handle.start()
except Exception as e:
@@ -275,10 +274,10 @@
self.log.info('Device connected', device_info=device_info)
- #self.create_logical_device(device_info)
- self.logical_device_id = 0
+ # self.create_logical_device(device_info)
+ self.logical_device_id = 0
device.serial_number = self.serial_number
-
+
self.resource_mgr = self.resource_mgr_class(self.device_id,
self.host_and_port,
self.extra_args,
@@ -293,9 +292,9 @@
self.device_id,
self.logical_device_id,
self.platform)
- self.stats_mgr = self.stats_mgr_class(self, self.log, self.platform)
- self.bw_mgr = self.bw_mgr_class(self.log, self.adapter_agent)
-
+ self.stats_mgr = self.stats_mgr_class(self, self.log, self.platform)
+ self.bw_mgr = self.bw_mgr_class(self.log, self.proxy)
+
device.vendor = device_info.vendor
device.model = device_info.model
device.hardware_version = device_info.hardware_version
@@ -833,7 +832,8 @@
parent_device_id=self.device_id, parent_port_no=port_no,
vendor_id=serial_number.vendor_id, proxy_address=proxy_address,
root=True, serial_number=serial_number_str,
- admin_state=AdminState.ENABLED#, **{'vlan':4091} # magic still maps to brcm_openomci_onu.pon_port.BRDCM_DEFAULT_VLAN
+ admin_state=AdminState.ENABLED
+ # , **{'vlan':4091} # magic still maps to brcm_openomci_onu.pon_port.BRDCM_DEFAULT_VLAN
)
def port_name(self, port_no, port_type, intf_id=None, serial_number=None):
@@ -1070,9 +1070,9 @@
# TODO FIXME - Flows are not deleted
uni_id = 0 # FIXME
self.flow_mgr.delete_tech_profile_instance(
- child_device.proxy_address.channel_id,
- child_device.proxy_address.onu_id,
- uni_id
+ child_device.proxy_address.channel_id,
+ child_device.proxy_address.onu_id,
+ uni_id
)
pon_intf_id_onu_id = (child_device.proxy_address.channel_id,
child_device.proxy_address.onu_id,