VOL-2686 Configure multicast MEs when flows are pushed and tp is configured

Change-Id: I283d6e689c010ec7e9c45c5921a20e08f3d5a93c
diff --git a/python/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py b/python/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
index dc84d6a..b5789aa 100644
--- a/python/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
+++ b/python/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
@@ -62,6 +62,7 @@
 from onu_gem_port import OnuGemPort
 from onu_tcont import OnuTCont
 from pon_port import PonPort
+from omci.brcm_mcast_task import BrcmMcastTask
 from uni_port import UniPort, UniType
 from uni_port import RESERVED_TRANSPARENT_VLAN
 from pyvoltha.common.tech_profile.tech_profile import TechProfile
@@ -72,6 +73,8 @@
 OP = EntityOperations
 RC = ReasonCodes
 
+IS_MULTICAST='is_multicast'
+GEM_PORT_ID = 'gemport_id'
 _STARTUP_RETRY_WAIT = 10
 _PATH_SEPERATOR = "/"
 
@@ -96,7 +99,7 @@
         self._omcc_version = OMCCVersion.Unknown
         self._total_tcont_count = 0  # From ANI-G ME
         self._qos_flexibility = 0  # From ONT2_G ME
-
+        self._tp = dict()          #tp_id -> technology profile definition in KV Store.
         self._onu_indication = None
         self._unis = dict()  # Port # -> UniPort
 
@@ -126,6 +129,7 @@
 
         self._queued_vlan_filter_task = dict()
 
+        self._set_vlan = dict()  #uni_id, tp_id -> set_vlan_id
         # Initialize KV store client
         self.args = registry('main').get_args()
         if self.args.backend == 'etcd':
@@ -389,7 +393,13 @@
         new_gem_ports = []
         for gem_port in gem_ports:
             gemdict = dict()
-            gemdict['gemport_id'] = gem_port['gemport_id']
+            if gem_port[IS_MULTICAST] == 'True':
+                gemdict[GEM_PORT_ID] = gem_port['multicast_gem_id']
+                gemdict[IS_MULTICAST] = True
+            else:
+                gemdict[GEM_PORT_ID] = gem_port[GEM_PORT_ID]
+                gemdict[IS_MULTICAST] = False
+
             gemdict['direction'] = direction
             gemdict['alloc_id_ref'] = alloc_id_ref
             gemdict['encryption'] = gem_port['aes_encryption']
@@ -479,7 +489,7 @@
                 tpstored = self.kv_client[tp_path]
                 tpstring = tpstored.decode('ascii')
                 tp = json.loads(tpstring)
-
+                self._tp[tp_id] = tp
                 self.log.debug("tp-instance", tp=tp)
                 tconts, gem_ports = self._do_tech_profile_configuration(uni_id, tp)
 
@@ -493,6 +503,15 @@
                     reactor.callInThread(self._execute_queued_vlan_filter_tasks, uni_id, tp_id)
                     yield self.core_proxy.device_reason_update(self.device_id, 'tech-profile-config-download-success')
 
+                    # Execute mcast task
+                    for gem in gem_ports:
+                        self.log.debug("checking-multicast-service-for-gem ",  gem=gem)
+                        if gem.mcast is True:
+                            self.log.info("found-multicast-service-for-gem ",  gem=gem, uni_id=uni_id, tp_id=tp_id)
+                            reactor.callInThread(self.start_multicast_service, uni_id, tp_path)
+                            self.log.debug("started_multicast_service-successfully", tconts=tconts, gems=gem_ports)
+                            break
+
                 @inlineCallbacks
                 def failure(_reason):
                     self.log.warn('tech-profile-config-failure-retrying', uni_id=uni_id, tp_id=tp_id,
@@ -566,6 +585,56 @@
                 self._deferred = \
                     self._onu_omci_device.task_runner.queue_task(self._tp_service_specific_task[uni_id][tp_path])
                 self._deferred.addCallbacks(success, failure)
+    def start_multicast_service(self, uni_id, tp_path,retry_count=0):
+        self.log.debug("starting-multicast-service", uni_id=uni_id, tp_path=tp_path)
+        tp_id = self.extract_tp_id_from_path(tp_path)
+        if uni_id in self._set_vlan and tp_id in self._set_vlan[uni_id]:
+            try:
+                tp = self._tp[tp_id]
+                if tp is None:
+                    tpstored = self.kv_client[tp_path]
+                    tpstring = tpstored.decode('ascii')
+                    tp = json.loads(tpstring)
+                    if tp is None:
+                        self.log.error("cannot-find-tp-to-start-multicast-service", uni_id=uni_id, tp_path=tp_path)
+                        return
+                    else:
+                        self._tp[tp_id] = tp
+
+                self.log.debug("mcast-vlan-learned-before", self._set_vlan[uni_id][tp_id], uni_id=uni_id, tp_id=tp_id)
+                def success(_results):
+                    self.log.debug('multicast-success', uni_id=uni_id)
+                    self._multicast_task = None
+
+                def failure(_reason):
+                    self.log.warn('multicast-failure', _reason=_reason)
+                    retry = _STARTUP_RETRY_WAIT * (random.randint(1,5))
+                    reactor.callLater(retry, self.start_multicast_service,
+                                                             uni_id, tp_path)
+
+                self.log.debug('starting-multicast-task', mcast_vlan_id=self._set_vlan[uni_id][tp_id])
+                downstream_gem_port_attribute_list = tp['downstream_gem_port_attribute_list']
+                for i in range(len(downstream_gem_port_attribute_list)):
+                    if IS_MULTICAST in downstream_gem_port_attribute_list[i] and \
+                            downstream_gem_port_attribute_list[i][IS_MULTICAST] == 'True':
+                        dynamic_access_control_list_table = downstream_gem_port_attribute_list[i]['dynamic_access_control_list'].split("-")
+                        static_access_control_list_table = downstream_gem_port_attribute_list[i]['static_access_control_list'].split("-")
+                        multicast_gem_id = downstream_gem_port_attribute_list[i]['multicast_gem_id']
+                        break
+
+                self._multicast_task = BrcmMcastTask(self.omci_agent, self, self.device_id, uni_id, tp_id,
+                                                     self._set_vlan[uni_id][tp_id],dynamic_access_control_list_table, static_access_control_list_table, multicast_gem_id)
+                self._deferred = self._onu_omci_device.task_runner.queue_task(self._multicast_task)
+                self._deferred.addCallbacks(success, failure)
+            except Exception as e:
+                self.log.exception("error-loading-multicast", e=e)
+        else:
+            if retry_count<30:
+                retry_count = +1
+                self.log.debug("going-to-wait-for-flow-to-learn-mcast-vlan", uni_id=uni_id, tp_id=tp_id, retry=retry_count)
+                reactor.callLater(0.5, self.start_multicast_service, uni_id, tp_path, retry_count)
+            else:
+                self.log.error("mcast-vlan-not-configured-yet-failing-mcast-service-conf", uni_id=uni_id, tp_id=tp_id, retry=retry_count)
 
     def delete_tech_profile(self, uni_id, tp_path, alloc_id=None, gem_port_id=None):
         try:
@@ -883,6 +952,12 @@
                             self.log.error('unsupported-action-type',
                                            action_type=action.type, in_port=_in_port)
 
+                    if self._set_vlan is not None:
+                        if uni_id not in self._set_vlan:
+                            self._set_vlan[uni_id] = dict()
+                        self._set_vlan[uni_id][tp_id] = _set_vlan_vid
+                        self.log.debug("set_vlan_id-for-tp", _set_vlan_vid=_set_vlan_vid, tp_id=tp_id)
+
                     # OMCI set vlan task can only filter and set on vlan header attributes.  Any other openflow
                     # supported match and action criteria cannot be handled by omci and must be ignored.
                     if (_set_vlan_vid is None or _set_vlan_vid == 0) and _vlan_vid != RESERVED_TRANSPARENT_VLAN:
@@ -1079,6 +1154,11 @@
                         self.log.error('unsupported-action-type',
                                        action_type=action.type, in_port=_in_port)
 
+                if self._set_vlan is not None:
+                    if uni_id not in self._set_vlan:
+                        self._set_vlan[uni_id] = dict()
+                    self._set_vlan[uni_id][tp_id] = _set_vlan_vid
+                    self.log.debug("set_vlan_id-for-tp", _set_vlan_vid=_set_vlan_vid, tp_id=tp_id)
                 # OMCI set vlan task can only filter and set on vlan header attributes.  Any other openflow
                 # supported match and action criteria cannot be handled by omci and must be ignored.
                 if (_set_vlan_vid is None or _set_vlan_vid == 0) and _vlan_vid != RESERVED_TRANSPARENT_VLAN:
@@ -1268,6 +1348,8 @@
             self.log.debug('stopping-openomci-statemachine', device_id=self.device_id)
             reactor.callLater(0, self._onu_omci_device.stop)
 
+            self._tp = dict()
+
             # Let TP download happen again
             for uni_id in self._tp_service_specific_task:
                 self._tp_service_specific_task[uni_id].clear()
diff --git a/python/adapters/brcm_openomci_onu/omci/brcm_mcast_task.py b/python/adapters/brcm_openomci_onu/omci/brcm_mcast_task.py
index 1615587..55466b1 100644
--- a/python/adapters/brcm_openomci_onu/omci/brcm_mcast_task.py
+++ b/python/adapters/brcm_openomci_onu/omci/brcm_mcast_task.py
@@ -46,6 +46,8 @@
         self._device = omci_agent.get_device ( device_id )
         self._results = None
         self._local_deferred = None
+        self._uni_port = handler.uni_ports[uni_id]
+        assert self._uni_port.uni_id == uni_id
         #mcast entities IDs
         self._mcast_operation_profile_id = handler.pon_port.mac_bridge_port_ani_entity_id + tp_id
         self._mcast_sub_conf_info_id = self._mcast_operation_profile_id + tp_id
diff --git a/python/adapters/brcm_openomci_onu/onu_gem_port.py b/python/adapters/brcm_openomci_onu/onu_gem_port.py
index d9ed42e..5d8e9b7 100644
--- a/python/adapters/brcm_openomci_onu/onu_gem_port.py
+++ b/python/adapters/brcm_openomci_onu/onu_gem_port.py
@@ -90,13 +90,17 @@
         self.tx_bytes = 0
 
     def __str__(self):
-        return "OnuGemPort - entity_id {}, alloc-id: {}, gem-id: {}".format(self.entity_id, self.alloc_id,
-                                                                              self.gem_id)
+        return "OnuGemPort - entity_id {}, alloc-id: {}, gem-id: {}, direction: {}, multicast: {} ".format(self.entity_id, self.alloc_id,
+                                                                              self.gem_id, self.direction, self.multicast)
 
     def __repr__(self):
         return str(self)
 
     @property
+    def mcast(self):
+        return self.multicast
+
+    @property
     def pon_id(self):
         return self._pon_id
 
@@ -253,6 +257,7 @@
                           priority_q=gem_port['priority_q'],
                           scheduling_policy=gem_port['scheduling_policy'],
                           weight=gem_port['weight'],
+                          multicast=gem_port['is_multicast'],
                           handler=handler,
                           untagged=False)
 
@@ -269,17 +274,17 @@
                        ieee_mapper_service_profile_entity_id=ieee_mapper_service_profile_entity_id,
                        gal_enet_profile_entity_id=gal_enet_profile_entity_id,
                        ul_prior_q_entity_id=ul_prior_q_entity_id,
-                       dl_prior_q_entity_id=dl_prior_q_entity_id)
+                       dl_prior_q_entity_id=dl_prior_q_entity_id,
+                       multicast=self.multicast)
 
         try:
             direction = "downstream" if self.multicast else "bi-directional"
             entity_id = self.gem_id if self.multicast else self.entity_id
-            assert not self.multicast, 'MCAST is not supported yet'
 
             attributes = dict()
             attributes['priority_queue_pointer_downstream'] = dl_prior_q_entity_id
             msg = GemPortNetworkCtpFrame(
-                self.entity_id,  # same entity id as GEM port
+                entity_id,  # same entity id as GEM port
                 port_id=self.gem_id,
                 tcont_id=tcont_entity_id,
                 direction=direction,
diff --git a/python/requirements.txt b/python/requirements.txt
index f35a394..3cab263 100644
--- a/python/requirements.txt
+++ b/python/requirements.txt
@@ -1,2 +1,2 @@
 voltha-protos==3.2.6
-pyvoltha==2.3.20
+pyvoltha==2.3.21