VOL-2686 Configure multicast MEs when flows are pushed and tp is configured
Change-Id: I283d6e689c010ec7e9c45c5921a20e08f3d5a93c
diff --git a/python/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py b/python/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
index dc84d6a..b5789aa 100644
--- a/python/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
+++ b/python/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
@@ -62,6 +62,7 @@
from onu_gem_port import OnuGemPort
from onu_tcont import OnuTCont
from pon_port import PonPort
+from omci.brcm_mcast_task import BrcmMcastTask
from uni_port import UniPort, UniType
from uni_port import RESERVED_TRANSPARENT_VLAN
from pyvoltha.common.tech_profile.tech_profile import TechProfile
@@ -72,6 +73,8 @@
OP = EntityOperations
RC = ReasonCodes
+IS_MULTICAST='is_multicast'
+GEM_PORT_ID = 'gemport_id'
_STARTUP_RETRY_WAIT = 10
_PATH_SEPERATOR = "/"
@@ -96,7 +99,7 @@
self._omcc_version = OMCCVersion.Unknown
self._total_tcont_count = 0 # From ANI-G ME
self._qos_flexibility = 0 # From ONT2_G ME
-
+ self._tp = dict() #tp_id -> technology profile definition in KV Store.
self._onu_indication = None
self._unis = dict() # Port # -> UniPort
@@ -126,6 +129,7 @@
self._queued_vlan_filter_task = dict()
+ self._set_vlan = dict() #uni_id, tp_id -> set_vlan_id
# Initialize KV store client
self.args = registry('main').get_args()
if self.args.backend == 'etcd':
@@ -389,7 +393,13 @@
new_gem_ports = []
for gem_port in gem_ports:
gemdict = dict()
- gemdict['gemport_id'] = gem_port['gemport_id']
+ if gem_port[IS_MULTICAST] == 'True':
+ gemdict[GEM_PORT_ID] = gem_port['multicast_gem_id']
+ gemdict[IS_MULTICAST] = True
+ else:
+ gemdict[GEM_PORT_ID] = gem_port[GEM_PORT_ID]
+ gemdict[IS_MULTICAST] = False
+
gemdict['direction'] = direction
gemdict['alloc_id_ref'] = alloc_id_ref
gemdict['encryption'] = gem_port['aes_encryption']
@@ -479,7 +489,7 @@
tpstored = self.kv_client[tp_path]
tpstring = tpstored.decode('ascii')
tp = json.loads(tpstring)
-
+ self._tp[tp_id] = tp
self.log.debug("tp-instance", tp=tp)
tconts, gem_ports = self._do_tech_profile_configuration(uni_id, tp)
@@ -493,6 +503,15 @@
reactor.callInThread(self._execute_queued_vlan_filter_tasks, uni_id, tp_id)
yield self.core_proxy.device_reason_update(self.device_id, 'tech-profile-config-download-success')
+ # Execute mcast task
+ for gem in gem_ports:
+ self.log.debug("checking-multicast-service-for-gem ", gem=gem)
+ if gem.mcast is True:
+ self.log.info("found-multicast-service-for-gem ", gem=gem, uni_id=uni_id, tp_id=tp_id)
+ reactor.callInThread(self.start_multicast_service, uni_id, tp_path)
+ self.log.debug("started_multicast_service-successfully", tconts=tconts, gems=gem_ports)
+ break
+
@inlineCallbacks
def failure(_reason):
self.log.warn('tech-profile-config-failure-retrying', uni_id=uni_id, tp_id=tp_id,
@@ -566,6 +585,56 @@
self._deferred = \
self._onu_omci_device.task_runner.queue_task(self._tp_service_specific_task[uni_id][tp_path])
self._deferred.addCallbacks(success, failure)
+    def start_multicast_service(self, uni_id, tp_path, retry_count=0):
+        """
+        Queue the OMCI multicast task for a UNI / tech profile pair.
+
+        The task needs the VLAN id recorded in self._set_vlan[uni_id][tp_id].
+        While that entry is missing (judging by the log events, it is
+        expected to be filled in by flow handling), this method re-schedules
+        itself every 0.5 seconds, at most 30 times, and then gives up with
+        an error log.
+
+        :param uni_id: UNI id used as the first key into self._set_vlan
+        :param tp_path: tech profile path; used to derive the tp_id and, on a
+                        cache miss, to read the profile from the KV store
+        :param retry_count: number of waits already performed for the VLAN;
+                            external callers should leave the default
+        """
+        self.log.debug("starting-multicast-service", uni_id=uni_id, tp_path=tp_path)
+        tp_id = self.extract_tp_id_from_path(tp_path)
+        if uni_id in self._set_vlan and tp_id in self._set_vlan[uni_id]:
+            try:
+                # .get() so that a cache miss falls through to the KV store
+                # lookup below instead of raising KeyError.
+                tp = self._tp.get(tp_id)
+                if tp is None:
+                    tpstored = self.kv_client[tp_path]
+                    tpstring = tpstored.decode('ascii')
+                    tp = json.loads(tpstring)
+                    if tp is None:
+                        self.log.error("cannot-find-tp-to-start-multicast-service", uni_id=uni_id, tp_path=tp_path)
+                        return
+                    else:
+                        self._tp[tp_id] = tp
+
+                mcast_vlan_id = self._set_vlan[uni_id][tp_id]
+                # The VLAN is logged as a keyword field, like every other log call here.
+                self.log.debug("mcast-vlan-learned-before", mcast_vlan_id=mcast_vlan_id, uni_id=uni_id, tp_id=tp_id)
+
+                def success(_results):
+                    self.log.debug('multicast-success', uni_id=uni_id)
+                    self._multicast_task = None
+
+                def failure(_reason):
+                    self.log.warn('multicast-failure', _reason=_reason)
+                    # Randomised back-off before queueing the task again.
+                    retry = _STARTUP_RETRY_WAIT * (random.randint(1, 5))
+                    reactor.callLater(retry, self.start_multicast_service,
+                                      uni_id, tp_path)
+
+                self.log.debug('starting-multicast-task', mcast_vlan_id=mcast_vlan_id)
+
+                # Take the ACL tables and GEM id from the first downstream GEM
+                # port attribute entry that is flagged as multicast.
+                dynamic_access_control_list_table = None
+                static_access_control_list_table = None
+                multicast_gem_id = None
+                for gem_attributes in tp['downstream_gem_port_attribute_list']:
+                    if gem_attributes.get(IS_MULTICAST) == 'True':
+                        dynamic_access_control_list_table = gem_attributes['dynamic_access_control_list'].split("-")
+                        static_access_control_list_table = gem_attributes['static_access_control_list'].split("-")
+                        multicast_gem_id = gem_attributes['multicast_gem_id']
+                        break
+
+                if multicast_gem_id is None:
+                    # Without a multicast GEM port there is nothing to hand to
+                    # the task; report it explicitly instead of failing later
+                    # on unbound locals.
+                    self.log.error("no-multicast-gem-port-in-tp", uni_id=uni_id, tp_id=tp_id)
+                    return
+
+                self._multicast_task = BrcmMcastTask(self.omci_agent, self, self.device_id, uni_id, tp_id,
+                                                     mcast_vlan_id, dynamic_access_control_list_table,
+                                                     static_access_control_list_table, multicast_gem_id)
+                self._deferred = self._onu_omci_device.task_runner.queue_task(self._multicast_task)
+                self._deferred.addCallbacks(success, failure)
+            except Exception as e:
+                self.log.exception("error-loading-multicast", e=e)
+        else:
+            if retry_count < 30:
+                # Increment (not "= +1") so the retry limit is actually reached.
+                retry_count += 1
+                self.log.debug("going-to-wait-for-flow-to-learn-mcast-vlan", uni_id=uni_id, tp_id=tp_id, retry=retry_count)
+                reactor.callLater(0.5, self.start_multicast_service, uni_id, tp_path, retry_count)
+            else:
+                self.log.error("mcast-vlan-not-configured-yet-failing-mcast-service-conf", uni_id=uni_id, tp_id=tp_id, retry=retry_count)
def delete_tech_profile(self, uni_id, tp_path, alloc_id=None, gem_port_id=None):
try:
@@ -883,6 +952,12 @@
self.log.error('unsupported-action-type',
action_type=action.type, in_port=_in_port)
+ if self._set_vlan is not None:
+ if uni_id not in self._set_vlan:
+ self._set_vlan[uni_id] = dict()
+ self._set_vlan[uni_id][tp_id] = _set_vlan_vid
+ self.log.debug("set_vlan_id-for-tp", _set_vlan_vid=_set_vlan_vid, tp_id=tp_id)
+
# OMCI set vlan task can only filter and set on vlan header attributes. Any other openflow
# supported match and action criteria cannot be handled by omci and must be ignored.
if (_set_vlan_vid is None or _set_vlan_vid == 0) and _vlan_vid != RESERVED_TRANSPARENT_VLAN:
@@ -1079,6 +1154,11 @@
self.log.error('unsupported-action-type',
action_type=action.type, in_port=_in_port)
+ if self._set_vlan is not None:
+ if uni_id not in self._set_vlan:
+ self._set_vlan[uni_id] = dict()
+ self._set_vlan[uni_id][tp_id] = _set_vlan_vid
+ self.log.debug("set_vlan_id-for-tp", _set_vlan_vid=_set_vlan_vid, tp_id=tp_id)
# OMCI set vlan task can only filter and set on vlan header attributes. Any other openflow
# supported match and action criteria cannot be handled by omci and must be ignored.
if (_set_vlan_vid is None or _set_vlan_vid == 0) and _vlan_vid != RESERVED_TRANSPARENT_VLAN:
@@ -1268,6 +1348,8 @@
self.log.debug('stopping-openomci-statemachine', device_id=self.device_id)
reactor.callLater(0, self._onu_omci_device.stop)
+ self._tp = dict()
+
# Let TP download happen again
for uni_id in self._tp_service_specific_task:
self._tp_service_specific_task[uni_id].clear()