VOL-2583 Multicast task added for IPTV service. This task building entities
concerning special multicast. These entities; multicast operation profile,
multicast subscriber config info and a multicast gem.
Change-Id: Ie33e22c36f0ad7028b2b27c86c4b6563f7a5ae34
diff --git a/python/adapters/brcm_openomci_onu/omci/brcm_mcast_task.py b/python/adapters/brcm_openomci_onu/omci/brcm_mcast_task.py
new file mode 100644
index 0000000..1615587
--- /dev/null
+++ b/python/adapters/brcm_openomci_onu/omci/brcm_mcast_task.py
@@ -0,0 +1,331 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+import structlog
+from pyvoltha.adapters.extensions.omci.omci_entities import OntG, PriorityQueueG, Tcont
+from pyvoltha.adapters.extensions.omci.omci_me import MacBridgePortConfigurationDataFrame, AccessControlRow0, \
+ MulticastOperationsProfileFrame, MulticastSubscriberConfigInfoFrame, VlanTaggingFilterDataFrame, \
+ ExtendedVlanTaggingOperationConfigurationDataFrame, VlanTaggingOperation
+from twisted.internet import reactor
+from pyvoltha.adapters.extensions.omci.omci_defs import ReasonCodes, EntityOperations
+from pyvoltha.adapters.extensions.omci.tasks.task import Task
+from twisted.internet.defer import inlineCallbacks, failure
+
+
+RC = ReasonCodes
+OP = EntityOperations
+IGMP_VERSION = 2
+RESERVED_VLAN = 4095
+
+
+class BrcmMcastTask(Task):
+ task_priority = 150
+ name = "Broadcom Multicast Task"
+
+ def __init__(self, omci_agent, handler, device_id, uni_id, tp_id, vlan_id, dynamic_acl, static_acl, multicast_gem_id ):
+
+ self.log = structlog.get_logger ( device_id=device_id )
+ super ( BrcmMcastTask , self ).__init__ ( BrcmMcastTask.name ,
+ omci_agent ,
+ device_id,
+ self.task_priority)
+ self._device = omci_agent.get_device ( device_id )
+ self._results = None
+ self._local_deferred = None
+ #mcast entities IDs
+ self._mcast_operation_profile_id = handler.pon_port.mac_bridge_port_ani_entity_id + tp_id
+ self._mcast_sub_conf_info_id = self._mcast_operation_profile_id + tp_id
+ self._mac_bridge_service_profile_entity_id = handler.mac_bridge_service_profile_entity_id
+ self._mac_bridge_configuration_data_id =self._mac_bridge_service_profile_entity_id +\
+ self._uni_port.mac_bridge_port_num
+ self._ieee_mapper_service_profile_entity_id = \
+ handler.pon_port.ieee_mapper_service_profile_entity_id
+ self._gal_enet_profile_entity_id = \
+ handler.gal_enet_profile_entity_id
+ # using for multicast operations profile and multicast subscriber config info
+ self._set_igmp_function = 0
+ self._set_immediate_leave = False
+ self._set_vlan_id = vlan_id
+ # dynamic_access_control_list , static_access_control_list and multicast_gem_id
+ self._dynamic_acl= dynamic_acl
+ self._static_acl= static_acl
+ self._mcast_gem_id= multicast_gem_id
+ self._uni_port = handler.uni_ports[uni_id]
+ assert self._uni_port.uni_id == uni_id
+ # gem_port list
+ self._gem_ports = []
+ for gem_port in list(handler.pon_port.gem_ports.values()):
+ if gem_port.uni_id is not None and gem_port.uni_id != self._uni_port.uni_id: continue
+ self._gem_ports.append(gem_port)
+
+ self._tcont_list=[]
+ for gem_port in self._gem_ports:
+ self._tcont_list.append(gem_port.tcont)
+ self.tcont_me_to_queue_map = dict()
+ self.uni_port_to_queue_map = dict()
+
+ self._set_me_type = 0
+ self._set_interworking_option = 0
+ self._store_tcont_list()
+
+ def cancel_deferred(self):
+ super ( BrcmMcastTask , self ).cancel_deferred ()
+ d , self._local_deferred = self._local_deferred , None
+ try:
+ if d is not None and not d.called:
+ d.cancel ()
+ except:
+ pass
+
+ def start(self):
+ super ( BrcmMcastTask , self ).start ()
+ self._local_deferred = reactor.callLater ( 0 , self.perform_multicast )
+
+ @inlineCallbacks
+ def perform_multicast(self):
+ self.log.debug('performing-multicast', device_id=self._device._device_id, uni_id=self._uni_port.uni_id,
+ vlan_id=self._set_vlan_id, multicast_gem_id=self._mcast_gem_id)
+
+ try:
+ # create gem port and use tcont
+ yield self._create_gem_ports()
+
+ # create multicast operation profile
+ yield self._create_mcast_operation_profile()
+
+ # set multicast operation profile
+ yield self._set_mcast_operation_profile()
+
+ # create multicast subscriber config data
+ yield self._create_mcast_subscriber_conf_info()
+
+ # create mac bridge port configuration data
+ yield self._create_mac_bridge_configuration_data()
+
+ # create vlan filtering entity
+ yield self._create_vlan_tagging_filter_data()
+
+ # set vlan filtering entity
+ yield self._set_vlan_tagging_filter_data()
+
+ self.deferred.callback ( self )
+ except Exception as e:
+ self.log.exception ( 'multicast exception' , e=e )
+ self.deferred.errback ( failure.Failure ( e ) )
+
+ @inlineCallbacks
+ def _create_gem_ports(self):
+ omci_cc = self._device.omci_cc
+ try:
+ tcont = None
+ self.log.debug("tcont-list", _tcont_list=self._tcont_list)
+ for gem_tcont in self._tcont_list:
+ if gem_tcont.entity_id is not None:
+ tcont=gem_tcont
+ for gem_port in self._gem_ports:
+ gem_port_tcont= gem_port.tcont
+ if gem_port_tcont is not None and gem_port_tcont.entity_id is not None:
+ tcont = gem_port.tcont
+ ul_prior_q_entity_id = \
+ self.tcont_me_to_queue_map[tcont.entity_id][gem_port.priority_q]
+ self.log.debug("ul_prior_q_entity_id", ul_priority=ul_prior_q_entity_id)
+
+ dl_prior_q_entity_id = \
+ self.uni_port_to_queue_map[self._uni_port.entity_id][gem_port.priority_q]
+ if ul_prior_q_entity_id is not None and dl_prior_q_entity_id is not None:
+ if gem_port.direction == 'downstream' and gem_port.gem_id == self._mcast_gem_id:
+ results = yield gem_port.add_to_hardware ( omci=omci_cc ,
+ tcont_entity_id=tcont.entity_id ,
+ ieee_mapper_service_profile_entity_id=self._ieee_mapper_service_profile_entity_id +
+ self._uni_port.mac_bridge_port_num ,
+ gal_enet_profile_entity_id=self._gal_enet_profile_entity_id ,
+ ul_prior_q_entity_id=ul_prior_q_entity_id ,
+ dl_prior_q_entity_id=dl_prior_q_entity_id)
+ self.check_status_and_state ( results , 'assign-gem-port' )
+ pass
+ except Exception as e:
+ self.log.debug("failed-create-gem-ports-method", e=e)
+
+ def _store_tcont_list(self):
+ pq_to_related_port = dict()
+ onu_g = self._device.query_mib ( OntG.class_id )
+ traffic_mgmt_opt = \
+ onu_g.get ( 'attributes' , {} ).get ( 'traffic_management_options' , 0 )
+ self.log.debug ( "traffic-mgmt-option" , traffic_mgmt_opt=traffic_mgmt_opt )
+
+ prior_q = self._device.query_mib ( PriorityQueueG.class_id )
+ for k , v in prior_q.items():
+ self.log.debug("prior-q", k=k, v=v)
+ try:
+ _ = iter(v)
+ except TypeError:
+ continue
+
+ if 'instance_id' in v:
+ related_port = v['attributes']['related_port']
+ pq_to_related_port[k] = related_port
+
+ if v['instance_id'] & 0b1000000000000000:
+ tcont_me = (related_port & 0xffff0000) >> 16
+ if tcont_me not in self.tcont_me_to_queue_map:
+ self.log.debug ( "prior-q-related-port-and-tcont-me" ,
+ related_port=related_port ,
+ tcont_me=tcont_me )
+ self.tcont_me_to_queue_map[tcont_me] = list ()
+
+ self.tcont_me_to_queue_map[tcont_me].append ( k )
+ else:
+ uni_port = (related_port & 0xffff0000) >> 16
+ if uni_port == self._uni_port.entity_id:
+ if uni_port not in self.uni_port_to_queue_map:
+ self.log.debug ( "prior-q-related-port-and-uni-port-me" ,
+ related_port=related_port ,
+ uni_port_me=uni_port )
+ self.uni_port_to_queue_map[uni_port] = list ()
+
+ self.uni_port_to_queue_map[uni_port].append ( k )
+
+
+ @inlineCallbacks
+ def _create_mac_bridge_configuration_data(self):
+ self.log.debug ( 'starting-create-mac-bridge-conf-data' )
+ attributes = dict ( bridge_id_pointer=self._mac_bridge_configuration_data_id,
+ port_num=0xf0,
+ tp_type=6,
+ tp_pointer=self._mcast_gem_id)
+ msg = MacBridgePortConfigurationDataFrame ( entity_id=self._mac_bridge_configuration_data_id, attributes=attributes )
+ yield self._send_msg ( msg , 'create' , 'create-mac-bridge-port-conf-data' )
+
+
+ @inlineCallbacks
+ def _create_mcast_operation_profile(self):
+ self.log.debug ( 'starting-create-mcast-operation-profile' )
+ attributes = dict ( igmp_version=IGMP_VERSION , igmp_function=self._set_igmp_function,
+ immediate_leave=self._set_immediate_leave, robustness=0)
+ msg = MulticastOperationsProfileFrame ( entity_id=self._mcast_operation_profile_id,
+ querier_ip_address=0,
+ query_interval=125,
+ query_max_response=100,
+ last_member_query_interval=10,
+ unauthorized_join_request_behavior=False,
+ attributes=attributes)
+ yield self._send_msg ( msg , 'create' , 'create-multicast-operation-profile')
+
+ @inlineCallbacks
+ def _set_mcast_operation_profile(self):
+ self.log.debug('starting-set-mcast-operation-profile')
+ dynamic_access_control_list_table= AccessControlRow0 (
+ set_ctrl=1 ,
+ row_part_id=0 ,
+ test=0 ,
+ row_key=0 ,
+ gem_port_id=self._mcast_gem_id,
+ vlan_id=self._set_vlan_id,
+ src_ip="0.0.0.0",
+ dst_ip_start=self._dynamic_acl[0],
+ dst_ip_end=self._dynamic_acl[1],
+ ipm_group_bw=0
+
+ )
+ msg = MulticastOperationsProfileFrame(entity_id=self._mcast_operation_profile_id,
+ dynamic_access_control_list_table=dynamic_access_control_list_table
+ )
+ yield self._send_msg(msg, 'set', 'set-multicast-operations-profile')
+
+ @inlineCallbacks
+ def _create_mcast_subscriber_conf_info(self):
+ self.log.debug ( 'creating-multicast-subscriber-config-info' )
+ attributes = dict ( me_type=self._set_me_type,
+ multicast_operations_profile_pointer=self._mcast_operation_profile_id )
+ msg = MulticastSubscriberConfigInfoFrame ( entity_id=self._uni_port.entity_id, attributes=attributes )
+ yield self._send_msg ( msg , 'create' , 'create-multicast-subscriber-config-info' )
+
+
+ @inlineCallbacks
+ def _create_vlan_tagging_filter_data(self):
+ self.log.debug('creating-vlan-tagging-filter-data')
+ forward_operation = 0x10 # VID investigation
+ # When the PUSH VLAN is RESERVED_VLAN (4095), let ONU be transparent
+ if self._set_vlan_id == RESERVED_VLAN:
+ forward_operation = 0x00 # no investigation, ONU transparent
+
+ # Create bridge ani side vlan filter
+ msg = VlanTaggingFilterDataFrame(
+ self._mac_bridge_configuration_data_id, # Entity ID
+ vlan_tcis=[self._set_vlan_id], # VLAN IDs
+ forward_operation=forward_operation
+ )
+
+ yield self._send_msg(msg, 'create', 'flow-create-vlan-tagging-filter-data')
+
+ @inlineCallbacks
+ def _set_vlan_tagging_filter_data(self):
+ self.log.debug('setting-vlan-tagging-filter-data')
+ forward_operation = 0x10 # VID investigation
+ # When the PUSH VLAN is RESERVED_VLAN (4095), let ONU be transparent
+ if self._set_vlan_id == RESERVED_VLAN:
+ forward_operation = 0x00 # no investigation, ONU transparent
+
+ # Create bridge ani side vlan filter
+ msg = VlanTaggingFilterDataFrame(
+ self._mac_bridge_configuration_data_id, # Entity ID
+ vlan_tcis=[self._set_vlan_id], # VLAN IDs
+ forward_operation=forward_operation
+ )
+
+ yield self._send_msg(msg, 'set', 'flow-set-vlan-tagging-filter-data')
+
+
+ @inlineCallbacks
+ def _send_msg(self , msg , operation , log_comment):
+ if operation == 'create':
+ frame = msg.create ()
+ elif operation == 'set':
+ frame = msg.set ()
+ else:
+ frame = msg.delete ()
+ self.log.debug ( 'openomci-msg' , omci_msg=msg )
+ self.strobe_watchdog ()
+ results = yield self._device.omci_cc.send ( frame )
+ self.check_status_and_state ( results , log_comment )
+
+ def check_status_and_state(self , results , operation=''):
+ """
+ Check the results of an OMCI response. An exception is thrown
+ if the task was cancelled or an error was detected.
+
+ :param results: (OmciFrame) OMCI Response frame
+ :param operation: (str) what operation was being performed
+ :return: True if successful, False if the entity existed (already created)
+ """
+
+ omci_msg = results.fields['omci_message'].fields
+ status = omci_msg['success_code']
+ error_mask = omci_msg.get ( 'parameter_error_attributes_mask' , 'n/a' )
+ failed_mask = omci_msg.get ( 'failed_attributes_mask' , 'n/a' )
+ unsupported_mask = omci_msg.get ( 'unsupported_attributes_mask' , 'n/a' )
+
+ self.log.debug ( "OMCI Result: %s" , operation , omci_msg=omci_msg ,
+ status=status , error_mask=error_mask ,
+ failed_mask=failed_mask , unsupported_mask=unsupported_mask )
+
+ if status == RC.Success:
+ self.strobe_watchdog ()
+ return True
+
+ elif status == RC.InstanceExists:
+ return False
diff --git a/python/adapters/brcm_openomci_onu/onu_gem_port.py b/python/adapters/brcm_openomci_onu/onu_gem_port.py
index 6b00c2e..d9ed42e 100644
--- a/python/adapters/brcm_openomci_onu/onu_gem_port.py
+++ b/python/adapters/brcm_openomci_onu/onu_gem_port.py
@@ -16,7 +16,8 @@
from __future__ import absolute_import
import structlog
from twisted.internet.defer import inlineCallbacks, returnValue
-from pyvoltha.adapters.extensions.omci.omci_me import GemInterworkingTpFrame, GemPortNetworkCtpFrame
+from pyvoltha.adapters.extensions.omci.omci_me import GemInterworkingTpFrame, GemPortNetworkCtpFrame, MulticastGemInterworkingTPFrame
+from pyvoltha.adapters.extensions.omci.omci_entities import IPv4MulticastAddressTable
from pyvoltha.adapters.extensions.omci.omci_defs import ReasonCodes
RC = ReasonCodes
@@ -272,6 +273,7 @@
try:
direction = "downstream" if self.multicast else "bi-directional"
+ entity_id = self.gem_id if self.multicast else self.entity_id
assert not self.multicast, 'MCAST is not supported yet'
attributes = dict()
@@ -294,16 +296,30 @@
raise
try:
- # TODO: magic numbers here
- msg = GemInterworkingTpFrame(
- self.entity_id, # same entity id as GEM port
- gem_port_network_ctp_pointer=self.entity_id,
- interworking_option=5, # IEEE 802.1
- service_profile_pointer=ieee_mapper_service_profile_entity_id,
- interworking_tp_pointer=0x0,
- pptp_counter=1,
- gal_profile_pointer=gal_enet_profile_entity_id
- )
+ if self.multicast:
+ # 224.0.0.0 - 239.255.255.255 is valid multicast ip range
+ mcast_ip_list= IPv4MulticastAddressTable( gem_port_id=self.gem_id,
+ secondary_key=0,
+ multicast_ip_range_start="224.0.0.0",
+ multicast_ip_range_stop="239.255.255.255")
+ msg = MulticastGemInterworkingTPFrame(
+ self.gem_id,
+ gem_port_network_ctp_pointer=self.gem_id,
+ interworking_option=0, # Mac Bridge
+ service_profile_pointer=0,
+ gal_profile_pointer=gal_enet_profile_entity_id,
+ ipv4_multicast_address_table= mcast_ip_list
+ )
+ else:
+ msg = GemInterworkingTpFrame(
+ self.entity_id, # same entity id as GEM port
+ gem_port_network_ctp_pointer=self.entity_id,
+ interworking_option=5, # IEEE 802.1
+ service_profile_pointer=ieee_mapper_service_profile_entity_id,
+ interworking_tp_pointer=0x0,
+ pptp_counter=1,
+ gal_profile_pointer=gal_enet_profile_entity_id
+ )
frame = msg.create()
self.log.debug('openomci-msg', omci_msg=msg)
results = yield omci.send(frame)
diff --git a/python/requirements.txt b/python/requirements.txt
index 4733885..e0c2769 100644
--- a/python/requirements.txt
+++ b/python/requirements.txt
@@ -1,2 +1,2 @@
voltha-protos==3.2.3
-pyvoltha==2.3.18
+pyvoltha==2.3.20