VOL-2686 Configure multicast MEs when flows are pushed and tp is configured
Change-Id: I283d6e689c010ec7e9c45c5921a20e08f3d5a93c
diff --git a/python/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py b/python/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
index dc84d6a..b5789aa 100644
--- a/python/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
+++ b/python/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
@@ -62,6 +62,7 @@
from onu_gem_port import OnuGemPort
from onu_tcont import OnuTCont
from pon_port import PonPort
+from omci.brcm_mcast_task import BrcmMcastTask
from uni_port import UniPort, UniType
from uni_port import RESERVED_TRANSPARENT_VLAN
from pyvoltha.common.tech_profile.tech_profile import TechProfile
@@ -72,6 +73,8 @@
OP = EntityOperations
RC = ReasonCodes
+IS_MULTICAST='is_multicast'
+GEM_PORT_ID = 'gemport_id'
_STARTUP_RETRY_WAIT = 10
_PATH_SEPERATOR = "/"
@@ -96,7 +99,7 @@
self._omcc_version = OMCCVersion.Unknown
self._total_tcont_count = 0 # From ANI-G ME
self._qos_flexibility = 0 # From ONT2_G ME
-
+ self._tp = dict() #tp_id -> technology profile definition in KV Store.
self._onu_indication = None
self._unis = dict() # Port # -> UniPort
@@ -126,6 +129,7 @@
self._queued_vlan_filter_task = dict()
+ self._set_vlan = dict() #uni_id, tp_id -> set_vlan_id
# Initialize KV store client
self.args = registry('main').get_args()
if self.args.backend == 'etcd':
@@ -389,7 +393,13 @@
new_gem_ports = []
for gem_port in gem_ports:
gemdict = dict()
- gemdict['gemport_id'] = gem_port['gemport_id']
+ if gem_port[IS_MULTICAST] == 'True':
+ gemdict[GEM_PORT_ID] = gem_port['multicast_gem_id']
+ gemdict[IS_MULTICAST] = True
+ else:
+ gemdict[GEM_PORT_ID] = gem_port[GEM_PORT_ID]
+ gemdict[IS_MULTICAST] = False
+
gemdict['direction'] = direction
gemdict['alloc_id_ref'] = alloc_id_ref
gemdict['encryption'] = gem_port['aes_encryption']
@@ -479,7 +489,7 @@
tpstored = self.kv_client[tp_path]
tpstring = tpstored.decode('ascii')
tp = json.loads(tpstring)
-
+ self._tp[tp_id] = tp
self.log.debug("tp-instance", tp=tp)
tconts, gem_ports = self._do_tech_profile_configuration(uni_id, tp)
@@ -493,6 +503,15 @@
reactor.callInThread(self._execute_queued_vlan_filter_tasks, uni_id, tp_id)
yield self.core_proxy.device_reason_update(self.device_id, 'tech-profile-config-download-success')
+ # Execute mcast task
+ for gem in gem_ports:
+ self.log.debug("checking-multicast-service-for-gem ", gem=gem)
+ if gem.mcast is True:
+ self.log.info("found-multicast-service-for-gem ", gem=gem, uni_id=uni_id, tp_id=tp_id)
+ reactor.callInThread(self.start_multicast_service, uni_id, tp_path)
+ self.log.debug("started_multicast_service-successfully", tconts=tconts, gems=gem_ports)
+ break
+
@inlineCallbacks
def failure(_reason):
self.log.warn('tech-profile-config-failure-retrying', uni_id=uni_id, tp_id=tp_id,
@@ -566,6 +585,56 @@
self._deferred = \
self._onu_omci_device.task_runner.queue_task(self._tp_service_specific_task[uni_id][tp_path])
self._deferred.addCallbacks(success, failure)
+ def start_multicast_service(self, uni_id, tp_path,retry_count=0):
+ self.log.debug("starting-multicast-service", uni_id=uni_id, tp_path=tp_path)
+ tp_id = self.extract_tp_id_from_path(tp_path)
+ if uni_id in self._set_vlan and tp_id in self._set_vlan[uni_id]:
+ try:
+ tp = self._tp[tp_id]
+ if tp is None:
+ tpstored = self.kv_client[tp_path]
+ tpstring = tpstored.decode('ascii')
+ tp = json.loads(tpstring)
+ if tp is None:
+ self.log.error("cannot-find-tp-to-start-multicast-service", uni_id=uni_id, tp_path=tp_path)
+ return
+ else:
+ self._tp[tp_id] = tp
+
+ self.log.debug("mcast-vlan-learned-before", self._set_vlan[uni_id][tp_id], uni_id=uni_id, tp_id=tp_id)
+ def success(_results):
+ self.log.debug('multicast-success', uni_id=uni_id)
+ self._multicast_task = None
+
+ def failure(_reason):
+ self.log.warn('multicast-failure', _reason=_reason)
+ retry = _STARTUP_RETRY_WAIT * (random.randint(1,5))
+ reactor.callLater(retry, self.start_multicast_service,
+ uni_id, tp_path)
+
+ self.log.debug('starting-multicast-task', mcast_vlan_id=self._set_vlan[uni_id][tp_id])
+ downstream_gem_port_attribute_list = tp['downstream_gem_port_attribute_list']
+ for i in range(len(downstream_gem_port_attribute_list)):
+ if IS_MULTICAST in downstream_gem_port_attribute_list[i] and \
+ downstream_gem_port_attribute_list[i][IS_MULTICAST] == 'True':
+ dynamic_access_control_list_table = downstream_gem_port_attribute_list[i]['dynamic_access_control_list'].split("-")
+ static_access_control_list_table = downstream_gem_port_attribute_list[i]['static_access_control_list'].split("-")
+ multicast_gem_id = downstream_gem_port_attribute_list[i]['multicast_gem_id']
+ break
+
+ self._multicast_task = BrcmMcastTask(self.omci_agent, self, self.device_id, uni_id, tp_id,
+ self._set_vlan[uni_id][tp_id],dynamic_access_control_list_table, static_access_control_list_table, multicast_gem_id)
+ self._deferred = self._onu_omci_device.task_runner.queue_task(self._multicast_task)
+ self._deferred.addCallbacks(success, failure)
+ except Exception as e:
+ self.log.exception("error-loading-multicast", e=e)
+ else:
+ if retry_count<30:
+ retry_count = +1
+ self.log.debug("going-to-wait-for-flow-to-learn-mcast-vlan", uni_id=uni_id, tp_id=tp_id, retry=retry_count)
+ reactor.callLater(0.5, self.start_multicast_service, uni_id, tp_path, retry_count)
+ else:
+ self.log.error("mcast-vlan-not-configured-yet-failing-mcast-service-conf", uni_id=uni_id, tp_id=tp_id, retry=retry_count)
def delete_tech_profile(self, uni_id, tp_path, alloc_id=None, gem_port_id=None):
try:
@@ -883,6 +952,12 @@
self.log.error('unsupported-action-type',
action_type=action.type, in_port=_in_port)
+ if self._set_vlan is not None:
+ if uni_id not in self._set_vlan:
+ self._set_vlan[uni_id] = dict()
+ self._set_vlan[uni_id][tp_id] = _set_vlan_vid
+ self.log.debug("set_vlan_id-for-tp", _set_vlan_vid=_set_vlan_vid, tp_id=tp_id)
+
# OMCI set vlan task can only filter and set on vlan header attributes. Any other openflow
# supported match and action criteria cannot be handled by omci and must be ignored.
if (_set_vlan_vid is None or _set_vlan_vid == 0) and _vlan_vid != RESERVED_TRANSPARENT_VLAN:
@@ -1079,6 +1154,11 @@
self.log.error('unsupported-action-type',
action_type=action.type, in_port=_in_port)
+ if self._set_vlan is not None:
+ if uni_id not in self._set_vlan:
+ self._set_vlan[uni_id] = dict()
+ self._set_vlan[uni_id][tp_id] = _set_vlan_vid
+ self.log.debug("set_vlan_id-for-tp", _set_vlan_vid=_set_vlan_vid, tp_id=tp_id)
# OMCI set vlan task can only filter and set on vlan header attributes. Any other openflow
# supported match and action criteria cannot be handled by omci and must be ignored.
if (_set_vlan_vid is None or _set_vlan_vid == 0) and _vlan_vid != RESERVED_TRANSPARENT_VLAN:
@@ -1268,6 +1348,8 @@
self.log.debug('stopping-openomci-statemachine', device_id=self.device_id)
reactor.callLater(0, self._onu_omci_device.stop)
+ self._tp = dict()
+
# Let TP download happen again
for uni_id in self._tp_service_specific_task:
self._tp_service_specific_task[uni_id].clear()
diff --git a/python/adapters/brcm_openomci_onu/omci/brcm_mcast_task.py b/python/adapters/brcm_openomci_onu/omci/brcm_mcast_task.py
index 1615587..55466b1 100644
--- a/python/adapters/brcm_openomci_onu/omci/brcm_mcast_task.py
+++ b/python/adapters/brcm_openomci_onu/omci/brcm_mcast_task.py
@@ -46,6 +46,8 @@
self._device = omci_agent.get_device ( device_id )
self._results = None
self._local_deferred = None
+ self._uni_port = handler.uni_ports[uni_id]
+ assert self._uni_port.uni_id == uni_id
#mcast entities IDs
self._mcast_operation_profile_id = handler.pon_port.mac_bridge_port_ani_entity_id + tp_id
self._mcast_sub_conf_info_id = self._mcast_operation_profile_id + tp_id
diff --git a/python/adapters/brcm_openomci_onu/onu_gem_port.py b/python/adapters/brcm_openomci_onu/onu_gem_port.py
index d9ed42e..5d8e9b7 100644
--- a/python/adapters/brcm_openomci_onu/onu_gem_port.py
+++ b/python/adapters/brcm_openomci_onu/onu_gem_port.py
@@ -90,13 +90,17 @@
self.tx_bytes = 0
def __str__(self):
- return "OnuGemPort - entity_id {}, alloc-id: {}, gem-id: {}".format(self.entity_id, self.alloc_id,
- self.gem_id)
+ return "OnuGemPort - entity_id {}, alloc-id: {}, gem-id: {}, direction: {}, multicast: {} ".format(self.entity_id, self.alloc_id,
+ self.gem_id, self.direction, self.multicast)
def __repr__(self):
return str(self)
@property
+ def mcast(self):
+ return self.multicast
+
+ @property
def pon_id(self):
return self._pon_id
@@ -253,6 +257,7 @@
priority_q=gem_port['priority_q'],
scheduling_policy=gem_port['scheduling_policy'],
weight=gem_port['weight'],
+ multicast=gem_port['is_multicast'],
handler=handler,
untagged=False)
@@ -269,17 +274,17 @@
ieee_mapper_service_profile_entity_id=ieee_mapper_service_profile_entity_id,
gal_enet_profile_entity_id=gal_enet_profile_entity_id,
ul_prior_q_entity_id=ul_prior_q_entity_id,
- dl_prior_q_entity_id=dl_prior_q_entity_id)
+ dl_prior_q_entity_id=dl_prior_q_entity_id,
+ multicast=self.multicast)
try:
direction = "downstream" if self.multicast else "bi-directional"
entity_id = self.gem_id if self.multicast else self.entity_id
- assert not self.multicast, 'MCAST is not supported yet'
attributes = dict()
attributes['priority_queue_pointer_downstream'] = dl_prior_q_entity_id
msg = GemPortNetworkCtpFrame(
- self.entity_id, # same entity id as GEM port
+ entity_id, # same entity id as GEM port
port_id=self.gem_id,
tcont_id=tcont_entity_id,
direction=direction,
diff --git a/python/requirements.txt b/python/requirements.txt
index f35a394..3cab263 100644
--- a/python/requirements.txt
+++ b/python/requirements.txt
@@ -1,2 +1,2 @@
voltha-protos==3.2.6
-pyvoltha==2.3.20
+pyvoltha==2.3.21