VOL-724 VOL-2106 Return of reconciliation and MIB external storage
Requires pyvoltha updates: https://gerrit.opencord.org/#/c/17881/
- reconcile/adapter restart works. remove forced reboot!
Needed to create persistence object and recovery procedures
to fill in variables from omci db and persisted state
refactored mib_in_sync needed to work with reconcile
- Use new lazy write in-memory storage class. Drop in replacement for mib_db_dict
- New Twisted etcd storage class. defers to threads given etcd3 blocks.
- Create function for initializing metrics. needed for reconcile
- Store onu indication data in etcd so reconciliation can mimic restarting omci
- Check if reconciling to prevent recreation and duplication of ports and provisioning
Change-Id: I08fd5d570059b4ba82e220a20a731dfc3ab1efe1
diff --git a/python/adapters/brcm_openomci_onu/brcm_openomci_onu_adapter.py b/python/adapters/brcm_openomci_onu/brcm_openomci_onu_adapter.py
index 11de8ca..fba1f68 100644
--- a/python/adapters/brcm_openomci_onu/brcm_openomci_onu_adapter.py
+++ b/python/adapters/brcm_openomci_onu/brcm_openomci_onu_adapter.py
@@ -36,6 +36,7 @@
from pyvoltha.adapters.common.frameio.frameio import hexify
from pyvoltha.adapters.extensions.omci.openomci_agent import OpenOMCIAgent, OpenOmciAgentDefaults
from pyvoltha.adapters.extensions.omci.database.mib_db_dict import MibDbVolatileDict
+from pyvoltha.adapters.extensions.omci.database.mib_db_dict_lazy import MibDbLazyWriteDict
from brcm_openomci_onu_handler import BrcmOpenomciOnuHandler
from omci.brcm_capabilities_task import BrcmCapabilitiesTask
@@ -77,7 +78,7 @@
self.broadcom_omci = deepcopy(OpenOmciAgentDefaults)
self.broadcom_omci['mib-synchronizer']['audit-delay'] = 0 # disable audits as brcm onu wont upload once provisioned
- self.broadcom_omci['mib-synchronizer']['database'] = MibDbVolatileDict
+ self.broadcom_omci['mib-synchronizer']['database'] = MibDbLazyWriteDict
self.broadcom_omci['alarm-synchronizer']['database'] = MibDbVolatileDict
self.broadcom_omci['omci-capabilities']['tasks']['get-capabilities'] = BrcmCapabilitiesTask
@@ -129,8 +130,12 @@
def reconcile_device(self, device):
self.log.info('reconcile-device', device_id=device.id)
- self.devices_handlers[device.id] = BrcmOpenomciOnuHandler(self, device.id)
- reactor.callLater(0, self.devices_handlers[device.id].reconcile, device)
+ if not device.id in self.devices_handlers:
+ self.devices_handlers[device.id] = BrcmOpenomciOnuHandler(self, device.id)
+ reactor.callLater(0, self.devices_handlers[device.id].reconcile, device)
+ else:
+ self.log.debug('already-called-reconcile-device', device_id=device.id)
+ return device
def abandon_device(self, device):
raise NotImplementedError()
diff --git a/python/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py b/python/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
index 79c72fe..700a0ee 100644
--- a/python/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
+++ b/python/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
@@ -28,7 +28,7 @@
from collections import OrderedDict
from twisted.internet import reactor
-from twisted.internet.defer import DeferredQueue, inlineCallbacks
+from twisted.internet.defer import inlineCallbacks, returnValue
from heartbeat import HeartBeat
from pyvoltha.adapters.extensions.events.device_events.onu.onu_active_event import OnuActiveEvent
@@ -41,8 +41,7 @@
from pyvoltha.common.utils.registry import registry
from pyvoltha.adapters.common.frameio.frameio import hexify
from pyvoltha.common.utils.nethelpers import mac_str_to_tuple
-from pyvoltha.common.config.config_backend import ConsulStore
-from pyvoltha.common.config.config_backend import EtcdStore
+from pyvoltha.adapters.common.kvstore.twisted_etcd_store import TwistedEtcdStore
from voltha_protos.logical_device_pb2 import LogicalPort
from voltha_protos.common_pb2 import OperStatus, ConnectStatus, AdminState
from voltha_protos.device_pb2 import Port
@@ -52,7 +51,6 @@
InterAdapterOmciMessage, PortCapability, InterAdapterTechProfileDownloadMessage, InterAdapterDeleteGemPortMessage, \
InterAdapterDeleteTcontMessage
from voltha_protos.openolt_pb2 import OnuIndication
-from pyvoltha.adapters.extensions.omci.onu_configuration import OMCCVersion
from pyvoltha.adapters.extensions.omci.onu_device_entry import OnuDeviceEvents, \
OnuDeviceEntry, IN_SYNC_KEY
from omci.brcm_mib_download_task import BrcmMibDownloadTask
@@ -68,7 +66,7 @@
from uni_port import RESERVED_TRANSPARENT_VLAN
from pyvoltha.common.tech_profile.tech_profile import TechProfile
from pyvoltha.adapters.extensions.omci.tasks.omci_test_request import OmciTestRequest
-from pyvoltha.adapters.extensions.omci.omci_entities import AniG
+from pyvoltha.adapters.extensions.omci.omci_entities import AniG, Tcont, MacBridgeServiceProfile
from pyvoltha.adapters.extensions.omci.omci_defs import EntityOperations, ReasonCodes
from voltha_protos.voltha_pb2 import TestResponse
@@ -98,11 +96,19 @@
self._pm_metrics_started = False
self._test_request = None
self._test_request_started = False
- self._omcc_version = OMCCVersion.Unknown
- self._total_tcont_count = 0 # From ANI-G ME
- self._qos_flexibility = 0 # From ONT2_G ME
self._tp = dict() # tp_id -> technology profile definition in KV Store.
- self._onu_indication = None
+ self._reconciling = False
+
+ # Persisted onu configuration needed in case of reconciliation.
+ self._onu_persisted_state = {
+ 'onu_id': None,
+ 'intf_id': None,
+ 'serial_number': None,
+ 'admin_state': None,
+ 'oper_state': None,
+ 'uni_config': list()
+ }
+
self._unis = dict() # Port # -> UniPort
self._pon = None
@@ -134,23 +140,18 @@
# Stores information related to queued vlan filter tasks
# Dictionary with key being uni_id and value being device,uni port ,uni id and vlan id
-
self._queued_vlan_filter_task = dict()
self._set_vlan = dict() # uni_id, tp_id -> set_vlan_id
+
+ # Paths from kv store
+ ONU_PATH = 'service/voltha/openonu'
+
# Initialize KV store client
self.args = registry('main').get_args()
- if self.args.backend == 'etcd':
- host, port = self.args.etcd.split(':', 1)
- self.kv_client = EtcdStore(host, port,
- TechProfile.KV_STORE_TECH_PROFILE_PATH_PREFIX)
- elif self.args.backend == 'consul':
- host, port = self.args.consul.split(':', 1)
- self.kv_client = ConsulStore(host, port,
- TechProfile.KV_STORE_TECH_PROFILE_PATH_PREFIX)
- else:
- self.log.error('invalid-backend')
- raise Exception("invalid-backend-for-kv-store")
+ host, port = self.args.etcd.split(':', 1)
+ self.tp_kv_client = TwistedEtcdStore(host, port, TechProfile.KV_STORE_TECH_PROFILE_PATH_PREFIX)
+ self.onu_kv_client = TwistedEtcdStore(host, port, ONU_PATH)
@property
def enabled(self):
@@ -258,49 +259,25 @@
# pm_metrics requires a logical device id. For now set to just device_id
self.logical_device_id = self.device_id
+ self._onu_persisted_state['serial_number'] = device.serial_number
+ try:
+ self.log.debug('updating-onu-state', device_id=self.device_id,
+ onu_persisted_state=self._onu_persisted_state)
+ yield self.onu_kv_client.set(self.device_id, json.dumps(self._onu_persisted_state))
+ except Exception as e:
+ self.log.error('could-not-store-onu-state', device_id=self.device_id,
+ onu_persisted_state=self._onu_persisted_state, e=e)
+ # if we cannot write to storage we can proceed, for now.
+ # later onu indications from the olt will have another chance
+
yield self.core_proxy.device_update(device)
self.log.debug('device-updated', device_id=device.id, serial_number=device.serial_number)
yield self._init_pon_state()
-
self.log.debug('pon state initialized', device_id=device.id, serial_number=device.serial_number)
- ############################################################################
- # Setup Alarm handler
- self.events = AdapterEvents(self.core_proxy, device.id, self.logical_device_id,
- device.serial_number)
- ############################################################################
- # Setup PM configuration for this device
- # Pass in ONU specific options
- kwargs = {
- OnuPmMetrics.DEFAULT_FREQUENCY_KEY: OnuPmMetrics.DEFAULT_ONU_COLLECTION_FREQUENCY,
- 'heartbeat': self.heartbeat,
- OnuOmciPmMetrics.OMCI_DEV_KEY: self._onu_omci_device
- }
- self.log.debug('create-pm-metrics', device_id=device.id, serial_number=device.serial_number)
- self._pm_metrics = OnuPmMetrics(self.events, self.core_proxy, self.device_id,
- self.logical_device_id, device.serial_number,
- grouped=True, freq_override=False, **kwargs)
- pm_config = self._pm_metrics.make_proto()
- self._onu_omci_device.set_pm_config(self._pm_metrics.omci_pm.openomci_interval_pm)
- self.log.info("initial-pm-config", device_id=device.id, serial_number=device.serial_number)
- yield self.core_proxy.device_pm_config_update(pm_config, init=True)
- # Note, ONU ID and UNI intf set in add_uni_port method
- self._onu_omci_device.alarm_synchronizer.set_alarm_params(mgr=self.events,
- ani_ports=[self._pon])
-
- # Code to Run OMCI Test Action
- kwargs_omci_test_action = {
- OmciTestRequest.DEFAULT_FREQUENCY_KEY:
- OmciTestRequest.DEFAULT_COLLECTION_FREQUENCY
- }
- serial_number = device.serial_number
- self._test_request = OmciTestRequest(self.core_proxy,
- self.omci_agent, self.device_id,
- AniG, serial_number,
- self.logical_device_id,
- exclusive=False,
- **kwargs_omci_test_action)
+ yield self._init_metrics()
+ self.log.debug('metrics initialized', device_id=device.id, serial_number=device.serial_number)
self.enabled = True
else:
@@ -311,6 +288,10 @@
def reconcile(self, device):
self.log.debug('reconcile-device', device_id=device.id, serial_number=device.serial_number)
+ if self._reconciling:
+ self.log.debug('already-running-reconcile-device', device_id=device.id, serial_number=device.serial_number)
+ return
+
# first we verify that we got parent reference and proxy info
assert device.parent_id
assert device.proxy_address.device_id
@@ -320,18 +301,33 @@
self._pon_port_number = device.parent_port_no
if self.enabled is not True:
- self.log.info('reconciling-broadcom-onu-device')
+ self._reconciling = True
+ self.log.info('reconciling-openonu-device')
self.logical_device_id = self.device_id
+
+ try:
+ query_data = yield self.onu_kv_client.get(device.id)
+ self._onu_persisted_state = json.loads(query_data)
+ self.log.debug('restored-onu-state', device_id=self.device_id,
+ onu_persisted_state=self._onu_persisted_state)
+ except Exception as e:
+ self.log.error('no-stored-onu-state', device_id=device.id, e=e)
+ # there is nothing we can do without data. flag the device as UNKNOWN and cannot reconcile
+ # likely it will take manual steps to delete/re-add this onu
+ yield self.core_proxy.device_reason_update(self.device_id, "cannot-reconcile")
+ yield self.core_proxy.device_state_update(self.device_id, oper_status=OperStatus.UNKNOWN)
+ return
+
self._init_pon_state()
+ self.log.debug('pon state initialized', device_id=device.id, serial_number=device.serial_number)
- # need to restart state machines on vcore restart. there is no indication to do it for us.
- self._onu_omci_device.start()
- yield self.core_proxy.device_reason_update(self.device_id, "restarting-openomci")
+ self._init_metrics()
+ self.log.debug('metrics initialized', device_id=device.id, serial_number=device.serial_number)
- # TODO: this is probably a bit heavy handed
- # Force a reboot for now. We need indications to reflow to reassign tconts and gems given vcore went away
- # This may not be necessary when mib resync actually works
- reactor.callLater(1, self.reboot)
+ self._subscribe_to_events()
+ # need to restart omci start machines and reload mib database. once db is loaded we can finish reconcile
+ self._onu_omci_device.start(device)
+ self._heartbeat.enabled = True
self.enabled = True
else:
@@ -349,7 +345,8 @@
oper_status=self._pon.get_port().oper_status,
)
- yield self.core_proxy.port_created(self.device_id, self._pon.get_port())
+ if not self._reconciling:
+ yield self.core_proxy.port_created(self.device_id, self._pon.get_port())
self.log.debug('added-pon-port-to-agent',
type=self._pon.get_port().type,
@@ -367,9 +364,60 @@
if self._pon is not None:
self._pon.enabled = True
+ @inlineCallbacks
+ def _init_metrics(self):
+ self.log.debug('init-metrics', device_id=self.device_id, device_logical_id=self.logical_device_id)
+
+ serial_number = self._onu_persisted_state.get('serial_number')
+
+ ############################################################################
+ # Setup Alarm handler
+ self.events = AdapterEvents(self.core_proxy, self.device_id, self.logical_device_id,
+ serial_number)
+ ############################################################################
+ # Setup PM configuration for this device
+ # Pass in ONU specific options
+ kwargs = {
+ OnuPmMetrics.DEFAULT_FREQUENCY_KEY: OnuPmMetrics.DEFAULT_ONU_COLLECTION_FREQUENCY,
+ 'heartbeat': self.heartbeat,
+ OnuOmciPmMetrics.OMCI_DEV_KEY: self._onu_omci_device
+ }
+ self.log.debug('create-pm-metrics', device_id=self.device_id, serial_number=serial_number)
+ self._pm_metrics = OnuPmMetrics(self.events, self.core_proxy, self.device_id,
+ self.logical_device_id, serial_number,
+ grouped=True, freq_override=False, **kwargs)
+ pm_config = self._pm_metrics.make_proto()
+ self._onu_omci_device.set_pm_config(self._pm_metrics.omci_pm.openomci_interval_pm)
+ self.log.debug("initial-pm-config", device_id=self.device_id, serial_number=serial_number)
+
+ if not self._reconciling:
+ yield self.core_proxy.device_pm_config_update(pm_config, init=True)
+
+ # Note, ONU ID and UNI intf set in add_uni_port method
+ self._onu_omci_device.alarm_synchronizer.set_alarm_params(mgr=self.events,
+ ani_ports=[self._pon])
+
+ # Code to Run OMCI Test Action
+ kwargs_omci_test_action = {
+ OmciTestRequest.DEFAULT_FREQUENCY_KEY:
+ OmciTestRequest.DEFAULT_COLLECTION_FREQUENCY
+ }
+ self._test_request = OmciTestRequest(self.core_proxy,
+ self.omci_agent, self.device_id,
+ AniG, serial_number,
+ self.logical_device_id,
+ exclusive=False,
+ **kwargs_omci_test_action)
+
+ @inlineCallbacks
def delete(self, device):
self.log.info('delete-onu', device_id=device.id, serial_number=device.serial_number)
+ try:
+ yield self.onu_kv_client.delete(device.id)
+ except Exception as e:
+ self.log.error('could-not-delete-onu-state', device_id=device.id, e=e)
+
self._deferred.cancel()
self._test_request.stop_collector()
self._pm_metrics.stop_collector()
@@ -477,6 +525,7 @@
return new_tconts, new_gems
+ @inlineCallbacks
def load_and_configure_tech_profile(self, uni_id, tp_path):
self.log.debug("loading-tech-profile-configuration", uni_id=uni_id, tp_path=tp_path)
tp_id = self.extract_tp_id_from_path(tp_path)
@@ -494,9 +543,9 @@
if tp_path in self._tp_service_specific_task[uni_id]:
self.log.info("tech-profile-config-already-in-progress",
tp_path=tp_path)
- return
+ returnValue(None)
- tpstored = self.kv_client[tp_path]
+ tpstored = yield self.tp_kv_client.get(tp_path)
tpstring = tpstored.decode('ascii')
tp = json.loads(tpstring)
self._tp[tp_id] = tp
@@ -561,7 +610,7 @@
self.log.info("tech-profile-config-already-done")
# Could be a case where TP exists but new gem-ports are getting added dynamically
- tpstored = self.kv_client[tp_path]
+ tpstored = yield self.tp_kv_client.get(tp_path)
tpstring = tpstored.decode('ascii')
tp = json.loads(tpstring)
upstream_gems = []
@@ -609,6 +658,7 @@
self._onu_omci_device.task_runner.queue_task(self._tp_service_specific_task[uni_id][tp_path])
self._deferred.addCallbacks(success, failure)
+ @inlineCallbacks
def start_multicast_service(self, uni_id, tp_path, retry_count=0):
self.log.debug("starting-multicast-service", uni_id=uni_id, tp_path=tp_path)
tp_id = self.extract_tp_id_from_path(tp_path)
@@ -616,7 +666,7 @@
try:
tp = self._tp[tp_id]
if tp is None:
- tpstored = self.kv_client[tp_path]
+ tpstored = yield self.tp_kv_client.get(tp_path)
tpstring = tpstored.decode('ascii')
tp = json.loads(tpstring)
if tp is None:
@@ -1303,6 +1353,7 @@
self._deferred = self._onu_omci_device.task_runner.queue_task(vlan_remove_task)
self._deferred.addCallbacks(success, failure)
+ @inlineCallbacks
def process_inter_adapter_message(self, request):
self.log.debug('process-inter-adapter-message', type=request.header.type, from_topic=request.header.from_topic,
to_topic=request.header.to_topic, to_device_id=request.header.to_device_id)
@@ -1313,6 +1364,9 @@
return
try:
+
+ update_onu_state = False
+
if request.header.type == InterAdapterMessageType.OMCI_REQUEST:
omci_msg = InterAdapterOmciMessage()
request.body.Unpack(omci_msg)
@@ -1327,10 +1381,16 @@
oper_state=onu_indication.oper_state, admin_state=onu_indication.admin_state,
serial_number=onu_indication.serial_number)
+ update_onu_state = True
+ self._onu_persisted_state['onu_id'] = onu_indication.onu_id
+ self._onu_persisted_state['intf_id'] = onu_indication.intf_id
+ self._onu_persisted_state['admin_state'] = onu_indication.admin_state
+ self._onu_persisted_state['oper_state'] = onu_indication.oper_state
+
if onu_indication.oper_state == "up":
- self.create_interface(onu_indication)
+ yield self.create_interface(onu_indication)
elif onu_indication.oper_state == "down" or onu_indication.oper_state == "unreachable":
- self.update_interface(onu_indication)
+ yield self.update_interface(onu_indication)
else:
self.log.error("unknown-onu-indication", onu_id=onu_indication.onu_id,
serial_number=onu_indication.serial_number)
@@ -1340,7 +1400,8 @@
request.body.Unpack(tech_msg)
self.log.debug('inter-adapter-recv-tech-profile', tech_msg=tech_msg)
- self.load_and_configure_tech_profile(tech_msg.uni_id, tech_msg.path)
+ update_onu_state = self._update_onu_persisted_state(tech_msg.uni_id, tp_path=tech_msg.path)
+ yield self.load_and_configure_tech_profile(tech_msg.uni_id, tech_msg.path)
elif request.header.type == InterAdapterMessageType.DELETE_GEM_PORT_REQUEST:
del_gem_msg = InterAdapterDeleteGemPortMessage()
@@ -1356,15 +1417,49 @@
request.body.Unpack(del_tcont_msg)
self.log.debug('inter-adapter-recv-del-tcont', del_tcont_msg=del_tcont_msg)
+ # Removal of the tcont/alloc id mapping represents the removal of the tech profile
+ update_onu_state = self._update_onu_persisted_state(del_tcont_msg.uni_id, tp_path=None)
self.delete_tech_profile(uni_id=del_tcont_msg.uni_id,
alloc_id=del_tcont_msg.alloc_id,
tp_path=del_tcont_msg.tp_path)
else:
self.log.error("inter-adapter-unhandled-type", request=request)
+ if update_onu_state:
+ try:
+ self.log.debug('updating-onu-state', device_id=self.device_id,
+ onu_persisted_state=self._onu_persisted_state)
+ yield self.onu_kv_client.set(self.device_id, json.dumps(self._onu_persisted_state))
+ except Exception as e:
+ self.log.error('could-not-store-onu-state', device_id=self.device_id,
+ onu_persisted_state=self._onu_persisted_state, e=e)
+ # at this point omci is started and/or indications being processed
+ # later indications may have a chance to write this state out again
+
except Exception as e:
self.log.exception("error-processing-inter-adapter-message", e=e)
+ def _update_onu_persisted_state(self, uni_id, tp_path):
+ # persist the uni and tech profile path for later reconciliation. update only if changed
+ update_onu_state = False
+ found = False
+ for entry in self._onu_persisted_state.get('uni_config', list()):
+ if entry.get('uni_id') == uni_id:
+ found = True
+ if entry.get('tp_path') != tp_path:
+ update_onu_state = True
+ entry['tp_path'] = tp_path
+
+ if not found:
+ update_onu_state = True
+ uni_tp = {
+ 'uni_id': uni_id,
+ 'tp_path': tp_path
+ }
+ self._onu_persisted_state['uni_config'].append(uni_tp)
+
+ return update_onu_state
+
# Called each time there is an onu "up" indication from the olt handler
@inlineCallbacks
def create_interface(self, onu_indication):
@@ -1376,7 +1471,6 @@
self.log.warn('received-onu-indication-for-active-onu', onu_indication=onu_indication)
return
- self._onu_indication = onu_indication
yield self.core_proxy.device_state_update(self.device_id, oper_status=OperStatus.ACTIVATING,
connect_status=ConnectStatus.REACHABLE)
@@ -1572,141 +1666,164 @@
self.log.debug('capabilities-handler-done')
# Mib is in sync, we can now query what we learned and actually start pushing ME (download) to the ONU.
- # Currently uses a basic mib download task that create a bridge with a single gem port and uni, only allowing EAP
- # Implement your own MibDownloadTask if you wish to setup something different by default
@inlineCallbacks
def _mib_in_sync(self):
self.log.debug('mib-in-sync')
-
- omci = self._onu_omci_device
- in_sync = omci.mib_db_in_sync
-
device = yield self.core_proxy.get_device(self.device_id)
- yield self.core_proxy.device_reason_update(self.device_id, 'discovery-mibsync-complete')
- if not self._dev_info_loaded:
- self.log.info('loading-device-data-from-mib', in_sync=in_sync, already_loaded=self._dev_info_loaded)
-
- omci_dev = self._onu_omci_device
- config = omci_dev.configuration
-
- try:
-
- # sort the lists so we get consistent port ordering.
- ani_list = sorted(config.ani_g_entities) if config.ani_g_entities else []
- uni_list = sorted(config.uni_g_entities) if config.uni_g_entities else []
- pptp_list = sorted(config.pptp_entities) if config.pptp_entities else []
- veip_list = sorted(config.veip_entities) if config.veip_entities else []
-
- if ani_list is None or (pptp_list is None and veip_list is None):
- self.log.warn("no-ani-or-unis")
- yield self.core_proxy.device_reason_update(self.device_id, 'onu-missing-required-elements')
- raise Exception("onu-missing-required-elements")
-
- # Currently logging the ani, pptp, veip, and uni for information purposes.
- # Actually act on the veip/pptp as its ME is the most correct one to use in later tasks.
- # And in some ONU the UNI-G list is incomplete or incorrect...
- for entity_id in ani_list:
- ani_value = config.ani_g_entities[entity_id]
- self.log.debug("discovered-ani", entity_id=entity_id, value=ani_value)
- # TODO: currently only one OLT PON port/ANI, so this works out. With NGPON there will be 2..?
- self._total_tcont_count = ani_value.get('total-tcont-count')
- self.log.debug("set-total-tcont-count", tcont_count=self._total_tcont_count)
-
- for entity_id in uni_list:
- uni_value = config.uni_g_entities[entity_id]
- self.log.debug("discovered-uni", entity_id=entity_id, value=uni_value)
-
- uni_entities = OrderedDict()
- for entity_id in pptp_list:
- pptp_value = config.pptp_entities[entity_id]
- self.log.debug("discovered-pptp", entity_id=entity_id, value=pptp_value)
- uni_entities[entity_id] = UniType.PPTP
-
- for entity_id in veip_list:
- veip_value = config.veip_entities[entity_id]
- self.log.debug("discovered-veip", entity_id=entity_id, value=veip_value)
- uni_entities[entity_id] = UniType.VEIP
-
- uni_id = 0
- for entity_id, uni_type in uni_entities.items():
- try:
- yield self._add_uni_port(device, entity_id, uni_id, uni_type)
- uni_id += 1
- except AssertionError as e:
- self.log.warn("could not add UNI", entity_id=entity_id, uni_type=uni_type, e=e)
-
- self._qos_flexibility = config.qos_configuration_flexibility or 0
- self._omcc_version = config.omcc_version or OMCCVersion.Unknown
-
- if self._unis:
- self._dev_info_loaded = True
- else:
- yield self.core_proxy.device_reason_update(self.device_id, 'no-usable-unis')
- self.log.warn("no-usable-unis")
- raise Exception("no-usable-unis")
-
- except Exception as e:
- self.log.exception('device-info-load', e=e)
- self._deferred = reactor.callLater(_STARTUP_RETRY_WAIT, self._mib_in_sync)
-
- else:
- self.log.info('device-info-already-loaded', in_sync=in_sync, already_loaded=self._dev_info_loaded)
+ # only notify core if this is a new device. otherwise do not have reconcile generating
+ # a lot of needless message churn
+ if not self._reconciling:
+ yield self.core_proxy.device_reason_update(self.device_id, 'discovery-mibsync-complete')
if self._dev_info_loaded:
- if device.admin_state == AdminState.PREPROVISIONED or device.admin_state == AdminState.ENABLED:
-
- @inlineCallbacks
- def success(_results):
- self.log.info('mib-download-success', _results=_results)
- yield self.core_proxy.device_state_update(device.id,
- oper_status=OperStatus.ACTIVE,
- connect_status=ConnectStatus.REACHABLE)
- yield self.core_proxy.device_reason_update(self.device_id, 'initial-mib-downloaded')
- yield self.enable_ports()
- self._mib_download_task = None
- yield self.onu_active_event()
-
- # Start collecting stats from the device after a brief pause
- if not self._pm_metrics_started:
- self._pm_metrics_started = True
- pmstart = _STARTUP_RETRY_WAIT * (random.randint(1, 5))
- reactor.callLater(pmstart, self._pm_metrics.start_collector)
-
- # Start test requests after a brief pause
- if not self._test_request_started:
- self._test_request_started = True
- tststart = _STARTUP_RETRY_WAIT * (random.randint(1, 5))
- reactor.callLater(tststart, self._test_request.start_collector)
-
- @inlineCallbacks
- def failure(_reason):
- self.log.warn('mib-download-failure-retrying', _reason=_reason)
- retry = _STARTUP_RETRY_WAIT * (random.randint(1, 5))
- reactor.callLater(retry, self._mib_in_sync)
- yield self.core_proxy.device_reason_update(self.device_id, 'initial-mib-download-failure-retrying')
-
- # start by locking all the unis till mib sync and initial mib is downloaded
- # this way we can capture the port down/up events when we are ready
- self.lock_ports(lock=True)
-
- # Download an initial mib that creates simple bridge that can pass EAP. On success (above) finally set
- # the device to active/reachable. This then opens up the handler to openflow pushes from outside
- self.log.info('downloading-initial-mib-configuration')
- self._mib_download_task = BrcmMibDownloadTask(self.omci_agent, self)
- self._deferred = self._onu_omci_device.task_runner.queue_task(self._mib_download_task)
- self._deferred.addCallbacks(success, failure)
- else:
- self.log.info('admin-down-disabling')
- self.disable(device)
+ self.log.debug('device-info-already-loaded')
else:
- self.log.info('device-info-not-loaded-skipping-mib-download')
+ # new onu or adapter was restarted. fill up our local data
+ yield self._load_device_data(device)
+
+ if self._check_mib_downloaded():
+ self.log.debug('mib-already-downloaded')
+ if not self._reconciling:
+ yield self.core_proxy.device_state_update(device.id,
+ oper_status=OperStatus.ACTIVE,
+ connect_status=ConnectStatus.REACHABLE)
+ yield self.enable_ports()
+ else:
+ self._download_mib(device)
+
+ if self._reconciling:
+ yield self._restore_tech_profile()
+ self._start_monitoring()
+ self._reconciling = False
+ self.log.debug('reconcile-finished')
+
+ def _download_mib(self, device):
+ self.log.debug('downloading-initial-mib-configuration')
+
+ @inlineCallbacks
+ def success(_results):
+ self.log.debug('mib-download-success', _results=_results)
+ yield self.core_proxy.device_state_update(device.id,
+ oper_status=OperStatus.ACTIVE,
+ connect_status=ConnectStatus.REACHABLE)
+ yield self.core_proxy.device_reason_update(self.device_id, 'initial-mib-downloaded')
+ self._mib_download_task = None
+ yield self.enable_ports()
+ yield self.onu_active_event()
+ self._start_monitoring()
+
+ @inlineCallbacks
+ def failure(_reason):
+ self.log.warn('mib-download-failure-retrying', _reason=_reason)
+ retry = _STARTUP_RETRY_WAIT * (random.randint(1, 5))
+ reactor.callLater(retry, self._mib_in_sync)
+ yield self.core_proxy.device_reason_update(self.device_id, 'initial-mib-download-failure-retrying')
+
+ # start by locking all the unis till mib sync and initial mib is downloaded
+ # this way we can capture the port down/up events when we are ready
+ self.lock_ports(lock=True)
+
+ # Download an initial mib that creates simple bridge that can pass EAP. On success (above) finally set
+ # the device to active/reachable. This then opens up the handler to openflow pushes from outside
+ self._mib_download_task = BrcmMibDownloadTask(self.omci_agent, self)
+ self._deferred = self._onu_omci_device.task_runner.queue_task(self._mib_download_task)
+ self._deferred.addCallbacks(success, failure)
+
+ def _start_monitoring(self):
+ self.log.debug('starting-monitoring')
+
+ # Start collecting stats from the device after a brief pause
+ if not self._pm_metrics_started:
+ self._pm_metrics_started = True
+ pmstart = _STARTUP_RETRY_WAIT * (random.randint(1, 5))
+ reactor.callLater(pmstart, self._pm_metrics.start_collector)
+
+ # Start test requests after a brief pause
+ if not self._test_request_started:
+ self._test_request_started = True
+ tststart = _STARTUP_RETRY_WAIT * (random.randint(1, 5))
+ reactor.callLater(tststart, self._test_request.start_collector)
+
+ def _check_mib_downloaded(self):
+ self.log.debug('checking-mib-downloaded')
+ results = False
+
+ mac_bridges = self.onu_omci_device.query_mib(MacBridgeServiceProfile.class_id)
+ self.log.debug('mac-bridges', mac_bridges=mac_bridges)
+
+ for k, v in mac_bridges.items():
+ if not isinstance(v, dict):
+ continue
+ # found at least one mac bridge, good enough to say its done, break out
+ self.log.debug('found-mac-bridge-mib-download-has-been-done', omci_key=k, omci_value=v)
+ results = True
+ break
+
+ return results
+
+ @inlineCallbacks
+ def _load_device_data(self, device):
+ self.log.debug('loading-device-data-from-mib', device_id=device.id)
+
+ omci_dev = self._onu_omci_device
+ config = omci_dev.configuration
+
+ try:
+ # sort the lists so we get consistent port ordering.
+ ani_list = sorted(config.ani_g_entities) if config.ani_g_entities else []
+ uni_list = sorted(config.uni_g_entities) if config.uni_g_entities else []
+ pptp_list = sorted(config.pptp_entities) if config.pptp_entities else []
+ veip_list = sorted(config.veip_entities) if config.veip_entities else []
+
+ if ani_list is None or (pptp_list is None and veip_list is None):
+ yield self.core_proxy.device_reason_update(self.device_id, 'onu-missing-required-elements')
+ raise Exception("onu-missing-required-elements")
+
+ # Currently logging the ani, pptp, veip, and uni for information purposes.
+ # Actually act on the veip/pptp as its ME is the most correct one to use in later tasks.
+ # And in some ONU the UNI-G list is incomplete or incorrect...
+ for entity_id in ani_list:
+ ani_value = config.ani_g_entities[entity_id]
+ self.log.debug("discovered-ani", entity_id=entity_id, value=ani_value)
+
+ for entity_id in uni_list:
+ uni_value = config.uni_g_entities[entity_id]
+ self.log.debug("discovered-uni", entity_id=entity_id, value=uni_value)
+
+ uni_entities = OrderedDict()
+ for entity_id in pptp_list:
+ pptp_value = config.pptp_entities[entity_id]
+ self.log.debug("discovered-pptp", entity_id=entity_id, value=pptp_value)
+ uni_entities[entity_id] = UniType.PPTP
+
+ for entity_id in veip_list:
+ veip_value = config.veip_entities[entity_id]
+ self.log.debug("discovered-veip", entity_id=entity_id, value=veip_value)
+ uni_entities[entity_id] = UniType.VEIP
+
+ uni_id = 0
+ for entity_id, uni_type in uni_entities.items():
+ yield self._add_uni_port(device, entity_id, uni_id, uni_type)
+ uni_id += 1
+
+ if self._unis:
+ self._dev_info_loaded = True
+ else:
+ yield self.core_proxy.device_reason_update(self.device_id, 'no-usable-unis')
+ raise Exception("no-usable-unis")
+
+ except Exception as e:
+ self.log.exception('device-info-load', e=e)
+ self._deferred = reactor.callLater(_STARTUP_RETRY_WAIT, self._mib_in_sync)
@inlineCallbacks
def _add_uni_port(self, device, entity_id, uni_id, uni_type=UniType.PPTP):
- self.log.debug('add-uni-port')
+ self.log.debug('add-uni-port', entity_id=entity_id, uni_id=uni_id)
- uni_no = self.mk_uni_port_num(self._onu_indication.intf_id, self._onu_indication.onu_id, uni_id)
+ intf_id = self._onu_persisted_state.get('intf_id')
+ onu_id = self._onu_persisted_state.get('onu_id')
+ uni_no = self.mk_uni_port_num(intf_id, onu_id, uni_id)
# TODO: Some or parts of this likely need to move to UniPort. especially the format stuff
uni_name = "uni-{}".format(uni_no)
@@ -1723,14 +1840,63 @@
self.log.debug("created-uni-port", uni=uni_port)
- yield self.core_proxy.port_created(device.id, uni_port.get_port())
+ if not self._reconciling:
+ yield self.core_proxy.port_created(device.id, uni_port.get_port())
self._unis[uni_port.port_number] = uni_port
- self._onu_omci_device.alarm_synchronizer.set_alarm_params(onu_id=self._onu_indication.onu_id,
+ self._onu_omci_device.alarm_synchronizer.set_alarm_params(onu_id=onu_id,
uni_ports=self.uni_ports,
serial_number=device.serial_number)
+ @inlineCallbacks
+ def _restore_tech_profile(self):
+ self.log.debug("reconcile-restoring-tech-profile-tcont-gem-config")
+
+ # for every uni that has tech profile config reload all its tcont/alloc_id and gem from the tp path
+ for entry in self._onu_persisted_state.get('uni_config', list()):
+ uni_id = entry.get('uni_id')
+ tp_path = entry.get('tp_path')
+ if tp_path:
+ tpstored = yield self.tp_kv_client.get(tp_path)
+ tpstring = tpstored.decode('ascii')
+ tp = json.loads(tpstring)
+
+ self.log.debug("restoring-tp-instance", tp=tp)
+
+ # re-run tech profile config that stores gem and tconts in the self._pon object
+ # this does not actually re-run the omci, just rebuilds our local data store
+ self._do_tech_profile_configuration(uni_id, tp)
+
+ tp_id = self.extract_tp_id_from_path(tp_path)
+
+ # rebuild cache dicts so tp updates and deletes dont get KeyErrors
+ if uni_id not in self._tp_service_specific_task:
+ self._tp_service_specific_task[uni_id] = dict()
+
+ if uni_id not in self._tech_profile_download_done:
+ self._tech_profile_download_done[uni_id] = dict()
+
+ if tp_id not in self._tech_profile_download_done[uni_id]:
+ self._tech_profile_download_done[uni_id][tp_id] = True
+ else:
+ self.log.debug("no-assigned-tp-instance", uni_id=uni_id)
+
+ # for every loaded tcont from tp check the mib database for its entity_id
+ # needed for later tp deletes/adds
+ tcont_idents = self.onu_omci_device.query_mib(Tcont.class_id)
+ self.log.debug('tcont-idents', tcont_idents=tcont_idents)
+
+ for k, v in tcont_idents.items():
+ if not isinstance(v, dict):
+ continue
+ alloc_check = v.get('attributes', {}).get('alloc_id', 0)
+ tcont = self._pon.tconts.get(alloc_check)
+ if tcont:
+ tcont.entity_id = k
+ self.log.debug('reassigning-tcont-entity-id', entity_id=tcont.entity_id,
+ alloc_id=tcont.alloc_id)
+
# TODO NEW CORE: Figure out how to gain this knowledge from the olt. for now cheat terribly.
def mk_uni_port_num(self, intf_id, onu_id, uni_id):
MAX_PONS_PER_OLT = 256
@@ -1746,27 +1912,31 @@
def onu_active_event(self):
self.log.debug('onu-active-event')
try:
- device = yield self.core_proxy.get_device(self.device_id)
+ # TODO: this is expensive for just getting the olt serial number. replace with direct api call
parent_device = yield self.core_proxy.get_device(self.parent_id)
olt_serial_number = parent_device.serial_number
raised_ts = arrow.utcnow().timestamp
+ intf_id = self._onu_persisted_state.get('intf_id')
+ onu_id = self._onu_persisted_state.get('onu_id')
+ onu_serial = self._onu_persisted_state.get('serial_number')
+
self.log.debug("onu-indication-context-data",
- pon_id=self._onu_indication.intf_id,
- onu_id=self._onu_indication.onu_id,
+ pon_id=intf_id,
+ onu_id=onu_id,
registration_id=self.device_id,
device_id=self.device_id,
- onu_serial_number=device.serial_number,
+ onu_serial_number=onu_serial,
olt_serial_number=olt_serial_number,
raised_ts=raised_ts)
self.log.debug("Trying-to-raise-onu-active-event")
OnuActiveEvent(self.events, self.device_id,
- self._onu_indication.intf_id,
- device.serial_number,
+ intf_id,
+ onu_serial,
str(self.device_id),
olt_serial_number, raised_ts,
- onu_id=self._onu_indication.onu_id).send(True)
+ onu_id=onu_id).send(True)
except Exception as active_event_error:
self.log.exception('onu-activated-event-error',
errmsg=active_event_error.message)
diff --git a/python/adapters/brcm_openomci_onu/onu_tcont.py b/python/adapters/brcm_openomci_onu/onu_tcont.py
index 7472728..6482aed 100644
--- a/python/adapters/brcm_openomci_onu/onu_tcont.py
+++ b/python/adapters/brcm_openomci_onu/onu_tcont.py
@@ -60,6 +60,10 @@
def entity_id(self):
return self._entity_id
+ @entity_id.setter
+ def entity_id(self, value):
+ self._entity_id = value
+
@property
def q_sched_policy(self):
return self._q_sched_policy