VOL-1452: all adapter_agent calls must yield.
Given all adapter_agent (core_proxy) calls must
yield this has also revealed a timing issue when calling
the openolt grpc stub. Wait till the stub is ready before using
Change-Id: If91afcf5a0e8abb953da008e668df5defbf789d8
diff --git a/python/adapters/openolt/openolt_bw.py b/python/adapters/openolt/openolt_bw.py
index 7c70b78..ca0fdb2 100644
--- a/python/adapters/openolt/openolt_bw.py
+++ b/python/adapters/openolt/openolt_bw.py
@@ -25,17 +25,7 @@
self.proxy = proxy
def pir(self, serial_number):
- bw = 0
- try:
- bw = self.proxy.get(
- '/traffic_descriptor_profiles/{}'.format(serial_number))
- except KeyError:
- self.log.debug('bandwidth not configured',
- serial_number=serial_number)
- try:
- bw = self.proxy.get('/traffic_descriptor_profiles/{}' \
- .format(DEFAULT_ONU_BW_PROFILE))
- except KeyError:
- return DEFAULT_ONU_PIR
+ #TODO NEW CORE: the old xpon model traffic_descriptor_profiles is gone. Just return the default
+ # which was all that happened anyway
+ return DEFAULT_ONU_PIR
- return bw.maximum_bandwidth
diff --git a/python/adapters/openolt/openolt_device.py b/python/adapters/openolt/openolt_device.py
index 500be83..718416a 100644
--- a/python/adapters/openolt/openolt_device.py
+++ b/python/adapters/openolt/openolt_device.py
@@ -98,6 +98,7 @@
self.stats_mgr_class = kwargs['support_classes']['stats_mgr']
self.bw_mgr_class = kwargs['support_classes']['bw_mgr']
+ self.stub = None
is_reconciliation = kwargs.get('reconciliation', False)
self.device_id = device.id
self.host_and_port = device.host_and_port
@@ -105,9 +106,6 @@
self.log = structlog.get_logger(id=self.device_id,
ip=self.host_and_port)
- # TODO NEW CORE: this isnt implemented. Need to use kafka api calls to get device info
- # self.proxy = registry('core').get_proxy('/')
-
self.log.info('openolt-device-init')
# default device id and device serial number. If device_info provides better results, they will be updated
@@ -122,7 +120,8 @@
device.root = True
device.connect_status = ConnectStatus.UNREACHABLE
device.oper_status = OperStatus.ACTIVATING
- self.adapter_agent.device_update(device)
+ # TODO NEW CORE. need to move this, cant have a constructor be a generator (yield)
+ #self.adapter_agent.device_update(device)
# If logical device does exist use it, else create one after connecting to device
if device.parent_id:
@@ -138,6 +137,7 @@
send_event=True, initial='state_null')
self.go_state_init()
+ @inlineCallbacks
def create_logical_device(self, device_info):
dpid = device_info.device_id
serial_number = device_info.device_serial_number
@@ -186,9 +186,9 @@
self.logical_device_id = ld_init.id
- device = self.adapter_agent.get_device(self.device_id)
+ device = yield self.adapter_agent.get_device(self.device_id)
device.serial_number = serial_number
- self.adapter_agent.update_device(device)
+ yield self.adapter_agent.update_device(device)
self.dpid = dpid
self.serial_number = serial_number
@@ -235,10 +235,11 @@
except Exception as e:
self.log.exception('post_init failed', e=e)
+ @inlineCallbacks
def do_state_connected(self, event):
self.log.debug("do_state_connected")
- device = self.adapter_agent.get_device(self.device_id)
+ device = yield self.adapter_agent.get_device(self.device_id)
self.stub = openolt_pb2_grpc.OpenoltStub(self.channel)
@@ -303,7 +304,9 @@
yield self.adapter_agent.device_state_update(self.device_id,
connect_status=ConnectStatus.REACHABLE,
oper_status=OperStatus.ACTIVE)
+ self.log.debug("done_state_up")
+ @inlineCallbacks
def do_state_down(self, event):
self.log.debug("do_state_down")
oper_state = OperStatus.UNKNOWN
@@ -312,7 +315,7 @@
# Propagating to the children
# Children ports
- child_devices = self.adapter_agent.get_child_devices(self.device_id)
+ child_devices = yield self.adapter_agent.get_child_devices(self.device_id)
for onu_device in child_devices:
onu_adapter_agent = \
registry('adapter_loader').get_agent(onu_device.adapter)
@@ -321,19 +324,19 @@
self.onu_ports_down(onu_device, oper_state)
# Children devices
- self.adapter_agent.update_child_devices_state(
+ yield self.adapter_agent.update_child_devices_state(
self.device_id, oper_status=oper_state,
connect_status=connect_state)
# Device Ports
- device_ports = self.adapter_agent.get_ports(self.device_id,
+ device_ports = yield self.adapter_agent.get_ports(self.device_id,
Port.ETHERNET_NNI)
logical_ports_ids = [port.label for port in device_ports]
- device_ports += self.adapter_agent.get_ports(self.device_id,
+ device_ports += yield self.adapter_agent.get_ports(self.device_id,
Port.PON_OLT)
for port in device_ports:
port.oper_status = oper_state
- self.adapter_agent.add_port(self.device_id, port)
+ yield self.adapter_agent.add_port(self.device_id, port)
# Device logical port
for logical_port_id in logical_ports_ids:
@@ -344,7 +347,7 @@
logical_port)
# Device
- device = self.adapter_agent.get_device(self.device_id)
+ device = yield self.adapter_agent.get_device(self.device_id)
device.oper_status = oper_state
device.connect_status = connect_state
@@ -365,6 +368,10 @@
self.log.info('connected to olt', device_id=self.device_id)
self.go_state_connected()
+ # TODO: thread timing issue. stub isnt ready yet from above go_state_connected (which doesnt block)
+ while (self.stub is None):
+ time.sleep(0.5)
+
self.indications = self.stub.EnableIndication(openolt_pb2.Empty())
while True:
@@ -442,9 +449,8 @@
if intf_oper_indication.type == "nni":
# add_(logical_)port update the port if it exists
- port_no, label = self.add_port(intf_oper_indication.intf_id,
- Port.ETHERNET_NNI, oper_state)
- self.log.debug("int_oper_indication", port_no=port_no, label=label)
+ self.add_port(intf_oper_indication.intf_id,
+ Port.ETHERNET_NNI, oper_state)
# TODO NEW CORE: Is this needed anymore, or does the new core synthesize this
#self.add_logical_port(port_no, intf_oper_indication.intf_id,
@@ -497,7 +503,7 @@
else:
if onu_device.connect_status != ConnectStatus.REACHABLE:
onu_device.connect_status = ConnectStatus.REACHABLE
- self.adapter_agent.device_update(onu_device)
+ yield self.adapter_agent.device_update(onu_device)
onu_id = onu_device.proxy_address.onu_id
if onu_device.oper_status == OperStatus.DISCOVERED \
@@ -517,7 +523,7 @@
onu_id=onu_id, serial_number=serial_number_str)
onu_device.oper_status = OperStatus.DISCOVERED
- self.adapter_agent.device_update(onu_device)
+ yield self.adapter_agent.device_update(onu_device)
try:
self.activate_onu(intf_id, onu_id, serial_number,
serial_number_str)
@@ -528,6 +534,7 @@
self.log.warn('unexpected state', onu_id=onu_id,
onu_device_oper_state=onu_device.oper_status)
+ @inlineCallbacks
def onu_indication(self, onu_indication):
self.log.debug("onu indication", intf_id=onu_indication.intf_id,
onu_id=onu_indication.onu_id,
@@ -541,11 +548,11 @@
serial_number_str = None
if serial_number_str is not None:
- onu_device = self.adapter_agent.get_child_device(
+ onu_device = yield self.adapter_agent.get_child_device(
self.device_id,
serial_number=serial_number_str)
else:
- onu_device = self.adapter_agent.get_child_device(
+ onu_device = yield self.adapter_agent.get_child_device(
self.device_id,
parent_port_no=self.platform.intf_id_to_port_no(
onu_indication.intf_id, Port.PON_OLT),
@@ -602,14 +609,14 @@
if onu_device.connect_status != ConnectStatus.UNREACHABLE:
onu_device.connect_status = ConnectStatus.UNREACHABLE
- self.adapter_agent.device_update(onu_device)
+ yield self.adapter_agent.device_update(onu_device)
# Move to discovered state
self.log.debug('onu-oper-state-is-down')
if onu_device.oper_status != OperStatus.DISCOVERED:
onu_device.oper_status = OperStatus.DISCOVERED
- self.adapter_agent.device_update(onu_device)
+ yield self.adapter_agent.device_update(onu_device)
# Set port oper state to Discovered
self.onu_ports_down(onu_device, OperStatus.DISCOVERED)
@@ -620,7 +627,7 @@
if onu_device.connect_status != ConnectStatus.REACHABLE:
onu_device.connect_status = ConnectStatus.REACHABLE
- self.adapter_agent.device_update(onu_device)
+ yield self.adapter_agent.device_update(onu_device)
if onu_device.oper_status != OperStatus.DISCOVERED:
self.log.debug("ignore onu indication",
@@ -672,19 +679,21 @@
onu_ports=onu_ports, onu_port_id=onu_port_id,
error=e)
+ @inlineCallbacks
def omci_indication(self, omci_indication):
self.log.debug("omci indication", intf_id=omci_indication.intf_id,
onu_id=omci_indication.onu_id)
- onu_device = self.adapter_agent.get_child_device(
+ onu_device = yield self.adapter_agent.get_child_device(
self.device_id, onu_id=omci_indication.onu_id,
parent_port_no=self.platform.intf_id_to_port_no(
omci_indication.intf_id, Port.PON_OLT), )
- self.adapter_agent.receive_proxied_message(onu_device.proxy_address,
+ yield self.adapter_agent.receive_proxied_message(onu_device.proxy_address,
omci_indication.pkt)
+ @inlineCallbacks
def packet_indication(self, pkt_indication):
self.log.debug("packet indication",
@@ -727,7 +736,7 @@
logical_device_id=self.logical_device_id,
logical_port_no=logical_port_num)
- self.adapter_agent.send_packet_in(
+ yield self.adapter_agent.send_packet_in(
logical_device_id=self.logical_device_id,
logical_port_no=logical_port_num,
packet=str(pkt))
@@ -791,8 +800,9 @@
egress_port=egress_port,
port_type=egress_port_type)
+ @inlineCallbacks
def send_proxied_message(self, proxy_address, msg):
- onu_device = self.adapter_agent.get_child_device(
+ onu_device = yield self.adapter_agent.get_child_device(
self.device_id, onu_id=proxy_address.onu_id,
parent_port_no=self.platform.intf_id_to_port_no(
proxy_address.channel_id, Port.PON_OLT)
@@ -886,9 +896,6 @@
admin_state=AdminState.ENABLED, oper_status=oper_status)
yield self.adapter_agent.port_created(self.device_id, port)
- # self.adapter_agent.add_port(self.device_id, port)
-
- returnValue(port_no, label)
def delete_logical_port(self, child_device):
logical_ports = self.proxy.get('/logical_devices/{}/ports'.format(
@@ -904,6 +911,7 @@
self.logical_device_id, logical_port)
return
+ @inlineCallbacks
def delete_port(self, child_serial_number):
ports = self.proxy.get('/devices/{}/ports'.format(
self.device_id))
@@ -912,7 +920,7 @@
self.log.debug('delete-port',
onu_serial_number=child_serial_number,
port=port)
- self.adapter_agent.delete_port(self.device_id, port)
+ yield self.adapter_agent.delete_port(self.device_id, port)
return
def update_flow_table(self, flows):
@@ -1039,13 +1047,14 @@
self.stub.ActivateOnu(onu)
self.log.info('onu-activated', serial_number=serial_number_str)
+ @inlineCallbacks
def delete_child_device(self, child_device):
self.log.debug('sending-deactivate-onu',
olt_device_id=self.device_id,
onu_device=child_device,
onu_serial_number=child_device.serial_number)
try:
- self.adapter_agent.delete_child_device(self.device_id,
+ yield self.adapter_agent.delete_child_device(self.device_id,
child_device.id,
child_device)
except Exception as e: