VOL-1403:
a) Read Tech-Profile ID coming as part of write metadata action to
configure the OLTs schedulers and queues
b) Use the meter-id information coming as part of the flow to do
shaping on the schedulers
c) Generate a tech-profile instance and share it with the ONU so that
ONU can also do the necessary OMCI configuration as per the
tech-profile.
d) Other miscellaneous changes
Change-Id: I0939d7f022aff98c23ac9a60f468cc134d3c1bf3
diff --git a/common/tech_profile/tech_profile.py b/common/tech_profile/tech_profile.py
index abea364..6055b0b 100644
--- a/common/tech_profile/tech_profile.py
+++ b/common/tech_profile/tech_profile.py
@@ -22,7 +22,7 @@
from voltha.core.config.config_backend import ConsulStore
from voltha.core.config.config_backend import EtcdStore
from voltha.registry import registry
-from voltha.adapters.openolt.protos import openolt_pb2
+from voltha.protos import tech_profile_pb2
# logger
log = structlog.get_logger()
@@ -65,7 +65,7 @@
DEFAULT_ADDITIONAL_BW = 'auto'
DEFAULT_PRIORITY = 0
DEFAULT_WEIGHT = 0
- DEFAULT_Q_SCHED_POLICY = 'hybrid'
+ DEFAULT_Q_SCHED_POLICY = 'Hybrid'
def __init__(self, direction, additional_bw=DEFAULT_ADDITIONAL_BW,
priority=DEFAULT_PRIORITY,
@@ -173,13 +173,13 @@
host, port = self.args.etcd.split(':', 1)
self._kv_store = EtcdStore(
host, port, TechProfile.
- KV_STORE_TECH_PROFILE_PATH_PREFIX)
+ KV_STORE_TECH_PROFILE_PATH_PREFIX)
elif self.args.backend == 'consul':
# KV store's IP Address and PORT
host, port = self.args.consul.split(':', 1)
self._kv_store = ConsulStore(
host, port, TechProfile.
- KV_STORE_TECH_PROFILE_PATH_PREFIX)
+ KV_STORE_TECH_PROFILE_PATH_PREFIX)
# self.tech_profile_instance_store = dict()
except Exception as e:
@@ -220,6 +220,8 @@
log.debug(
"Created-tech-profile-instance-with-values-from-kvstore")
else:
+ # Create a default Tech-Profile.
+ # The default profile is a 1 TCONT, 1 GEM port model.
tech_profile = self._default_tech_profile()
log.debug(
"Created-tech-profile-instance-with-default-values")
@@ -427,7 +429,7 @@
@staticmethod
def get_us_scheduler(tech_profile_instance):
# upstream scheduler
- us_scheduler = openolt_pb2.Scheduler(
+ us_scheduler = tech_profile_pb2.SchedulerConfig(
direction=TechProfile.get_parameter(
'direction', tech_profile_instance.us_scheduler.
direction),
@@ -444,7 +446,7 @@
@staticmethod
def get_ds_scheduler(tech_profile_instance):
- ds_scheduler = openolt_pb2.Scheduler(
+ ds_scheduler = tech_profile_pb2.SchedulerConfig(
direction=TechProfile.get_parameter(
'direction', tech_profile_instance.ds_scheduler.
direction),
@@ -460,20 +462,20 @@
return ds_scheduler
@staticmethod
- def get_tconts(tech_profile_instance, us_scheduler=None, ds_scheduler=None):
+ def get_traffic_scheds(tech_profile_instance, us_scheduler=None, ds_scheduler=None):
if us_scheduler is None:
us_scheduler = TechProfile.get_us_scheduler(tech_profile_instance)
if ds_scheduler is None:
ds_scheduler = TechProfile.get_ds_scheduler(tech_profile_instance)
- tconts = [openolt_pb2.Tcont(direction=TechProfile.get_parameter(
+ tconts = [tech_profile_pb2.TrafficScheduler(direction=TechProfile.get_parameter(
'direction',
tech_profile_instance.
us_scheduler.direction),
alloc_id=tech_profile_instance.
us_scheduler.alloc_id,
scheduler=us_scheduler),
- openolt_pb2.Tcont(direction=TechProfile.get_parameter(
+ tech_profile_pb2.TrafficScheduler(direction=TechProfile.get_parameter(
'direction',
tech_profile_instance.
ds_scheduler.direction),
@@ -484,23 +486,88 @@
return tconts
@staticmethod
+ def get_traffic_queues(tech_profile_instance):
+ gemports = list()
+ # Upstream Gemports
+ for i in range(len(tech_profile_instance.
+ upstream_gem_port_attribute_list)):
+ gemports.append(tech_profile_pb2.TrafficQueue(
+ direction=TechProfile.get_parameter('direction',
+ tech_profile_instance.
+ us_scheduler.direction),
+ gemport_id=tech_profile_instance.
+ upstream_gem_port_attribute_list[i].gemport_id,
+ pbit_map=tech_profile_instance.
+ upstream_gem_port_attribute_list[i].pbit_map,
+ aes_encryption=ast.literal_eval(tech_profile_instance.
+ upstream_gem_port_attribute_list[i].aes_encryption),
+ sched_policy=TechProfile.get_parameter(
+ 'sched_policy', tech_profile_instance.
+ upstream_gem_port_attribute_list[i].
+ scheduling_policy),
+ priority=tech_profile_instance.
+ upstream_gem_port_attribute_list[i].priority_q,
+ weight=tech_profile_instance.
+ upstream_gem_port_attribute_list[i].weight,
+ discard_policy=TechProfile.get_parameter(
+ 'discard_policy', tech_profile_instance.
+ upstream_gem_port_attribute_list[i].
+ discard_policy)))
+
+ # Downstream Gemports
+ for i in range(len(tech_profile_instance.
+ downstream_gem_port_attribute_list)):
+ gemports.append(tech_profile_pb2.TrafficQueue(
+ direction=TechProfile.get_parameter('direction',
+ tech_profile_instance.
+ ds_scheduler.direction),
+ gemport_id=tech_profile_instance.
+ downstream_gem_port_attribute_list[i].gemport_id,
+ pbit_map=tech_profile_instance.
+ downstream_gem_port_attribute_list[i].pbit_map,
+ aes_encryption=ast.literal_eval(tech_profile_instance.
+ downstream_gem_port_attribute_list[i].aes_encryption),
+ sched_policy=TechProfile.get_parameter(
+ 'sched_policy', tech_profile_instance.
+ downstream_gem_port_attribute_list[i].
+ scheduling_policy),
+ priority=tech_profile_instance.
+ downstream_gem_port_attribute_list[i].priority_q,
+ weight=tech_profile_instance.
+ downstream_gem_port_attribute_list[i].weight,
+ discard_policy=TechProfile.get_parameter(
+ 'discard_policy', tech_profile_instance.
+ downstream_gem_port_attribute_list[i].
+ discard_policy)))
+ return gemports
+
+ @staticmethod
+ def get_us_traffic_scheduler(tech_profile_instance):
+ us_scheduler = TechProfile.get_us_scheduler(tech_profile_instance)
+ return tech_profile_pb2.TrafficScheduler(direction=TechProfile.get_parameter(
+ 'direction',
+ us_scheduler.direction),
+ alloc_id=us_scheduler.alloc_id,
+ scheduler=us_scheduler)
+
+ @staticmethod
def get_parameter(param_type, param_value):
parameter = None
try:
if param_type == 'direction':
- for direction in openolt_pb2.Direction.keys():
+ for direction in tech_profile_pb2.Direction.keys():
if param_value == direction:
parameter = direction
elif param_type == 'discard_policy':
- for discard_policy in openolt_pb2.DiscardPolicy.keys():
+ for discard_policy in tech_profile_pb2.DiscardPolicy.keys():
if param_value == discard_policy:
parameter = discard_policy
- elif param_type == 'sched_policy':
- for sched_policy in openolt_pb2.SchedulingPolicy.keys():
+ elif param_type == 'q_sched_policy':
+ for sched_policy in tech_profile_pb2.SchedulingPolicy.keys():
if param_value == sched_policy:
parameter = sched_policy
elif param_type == 'additional_bw':
- for bw_component in openolt_pb2.AdditionalBW.keys():
+ for bw_component in tech_profile_pb2.AdditionalBW.keys():
if param_value == bw_component:
parameter = bw_component
except BaseException as e:
@@ -510,26 +577,17 @@
class TechProfileInstance(object):
def __init__(self, subscriber_identifier, tech_profile, resource_mgr,
- intf_id, num_of_tconts=1):
+ intf_id):
if tech_profile is not None:
self.subscriber_identifier = subscriber_identifier
- self.num_of_tconts = num_of_tconts
- self.num_of_gem_ports = tech_profile.num_gem_ports
- self.name = tech_profile.name
- self.profile_type = tech_profile.profile_type
- self.version = tech_profile.version
- self.instance_control = tech_profile.instance_control
- # TODO: Fixed num_of_tconts to 1 per TP Instance.
- # This may change in future
- assert (num_of_tconts == 1)
# Get alloc id and gemport id using resource manager
alloc_id = resource_mgr.get_resource_id(intf_id,
'ALLOC_ID',
- num_of_tconts)
+ 1)
gem_ports = resource_mgr.get_resource_id(intf_id,
'GEMPORT_ID',
- self.num_of_gem_ports)
+ tech_profile.num_gem_ports)
gemport_list = list()
if isinstance(gem_ports, int):
@@ -547,7 +605,7 @@
self.upstream_gem_port_attribute_list = list()
self.downstream_gem_port_attribute_list = list()
- for i in range(self.num_of_gem_ports):
+ for i in range(tech_profile.num_gem_ports):
self.upstream_gem_port_attribute_list.append(
TechProfileInstance.IGemPortAttribute(
gemport_list[i],
diff --git a/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py b/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
index d694938..5ca6bd2 100644
--- a/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
+++ b/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
@@ -386,7 +386,6 @@
self.log.debug('pon-add-gemport', gem_port=gem_port)
def _do_tech_profile_configuration(self, uni_id, tp):
- num_of_tconts = tp['num_of_tconts']
us_scheduler = tp['us_scheduler']
alloc_id = us_scheduler['alloc_id']
self._create_tconts(uni_id, us_scheduler)
diff --git a/voltha/adapters/openolt/openolt_data_model.py b/voltha/adapters/openolt/openolt_data_model.py
index e232a69..3191555 100644
--- a/voltha/adapters/openolt/openolt_data_model.py
+++ b/voltha/adapters/openolt/openolt_data_model.py
@@ -68,6 +68,12 @@
# Hash map GemPortId -> OnuInfo
self._onu_gemport_ids = {}
+ # Flow manager
+ self.flow_mgr = None
+
+ # packet in gem port
+ self.packet_in_gem_port = dict()
+
def reconcile(self):
assert self.logical_device_id is not None
self.adapter_agent.reconcile_logical_device(
@@ -205,10 +211,8 @@
self._onu_ids[OnuId(intf_id=intf_id, onu_id=onu_id)] = onu_info
self._onu_serial_numbers[serial_number] = onu_info
- def onu_delete(self, serial_number):
- onu_device = self.adapter_agent.get_child_device(
- self.device.id,
- serial_number=serial_number)
+ def onu_delete(self, flow_mgr, onu_device):
+ self.flow_mgr = flow_mgr
try:
self.adapter_agent.delete_child_device(self.device.id,
onu_device.id, onu_device)
@@ -219,16 +223,21 @@
self.__delete_logical_port(onu_device)
except Exception as e:
self.log.error('logical_port delete error', error=e)
+
try:
- self.delete_port(onu_device.serial_number)
+ ports = self.adapter_agent.get_ports(self.device.id, Port.ETHERNET_UNI)
+ for port in ports:
+ for peer in port.peers:
+ if peer.device_id == onu_device.id:
+ self.adapter_agent.delete_port(self.device.id, port)
except Exception as e:
self.log.error('port delete error', error=e)
-
+
# Delete onu info from cache
- onu_info = self._onu_ids[serial_number]
+ onu_info = self._onu_serial_numbers[onu_device.serial_number]
del self._onu_ids[OnuId(intf_id=onu_info.intf_id,
onu_id=onu_info.onu_id)]
- del self._onu_serial_numbers[serial_number]
+ del self._onu_serial_numbers[onu_device.serial_number]
def onu_oper_down(self, intf_id, onu_id):
@@ -317,6 +326,11 @@
self.adapter_agent.publish_inter_adapter_message(
onu_device.id, msg)
+ def meter_band(self, meter_id):
+ return self.adapter_agent.get_meter_band(
+ self.logical_device_id, meter_id
+ )
+
def onu_omci_rx(self, intf_id, onu_id, pkt):
onu_device = self.adapter_agent.get_child_device(
self.device.id,
@@ -328,15 +342,19 @@
def onu_send_packet_in(self, intf_type, intf_id, port_no, gemport_id, pkt):
if intf_type == "pon":
+ onu_id = self.onu_id(intf_id=intf_id, gemport_id=gemport_id)
+ logical_port_num = None
if port_no:
logical_port_num = port_no
else:
# Get logical_port_num from cache
- onu_id = self.onu_id(intf_id=intf_id, gemport_id=gemport_id)
uni_id = 0 # FIXME - multi-uni support
logical_port_num = self.platform.mk_uni_port_num(intf_id,
onu_id,
uni_id)
+ # Store the gem port through which the packet_in came. Use the same
+ # gem port for packet_out.
+ self.packet_in_gem_port[(intf_id, onu_id, logical_port_num)] = gemport_id
elif intf_type == "nni":
logical_port_num = self.platform.intf_id_to_port_no(
intf_id,
@@ -393,6 +411,24 @@
gem = GemPortId(intf_id=intf_id, gemport_id=gemport_id)
self._onu_gemport_ids[gem] = onu_info
+ def get_ofp_port_name(self, intf_id, onu_id, uni_id):
+ parent_port_no = self.platform.intf_id_to_port_no(intf_id,
+ Port.PON_OLT)
+ child_device = self.adapter_agent.get_child_device(
+ self.device.id, parent_port_no=parent_port_no, onu_id=onu_id)
+ if child_device is None:
+ self.log.error("could-not-find-child-device",
+ parent_port_no=intf_id, onu_id=onu_id)
+ return (None, None)
+ ports = self.adapter_agent.get_ports(child_device.id,
+ Port.ETHERNET_UNI)
+ logical_port = self.adapter_agent.get_logical_port(
+ self.logical_device_id, ports[uni_id].label)
+ ofp_port_name = (logical_port.ofp_port.name,
+ logical_port.ofp_port.port_no)
+ return ofp_port_name
+
+
# #######################################################################
# Methods used by Alarm and Statistics Manager (TODO - re-visit)
# #######################################################################
@@ -404,8 +440,8 @@
return self.device.id
def _resolve_onu_id(self, onu_id, port_intf_id):
+ onu_device = None
try:
- onu_device = None
onu_device = self.adapter_agent.get_child_device(
self.device_id,
parent_port_no=self.platform.intf_id_to_port_no(
diff --git a/voltha/adapters/openolt/openolt_device.py b/voltha/adapters/openolt/openolt_device.py
index 81231d2..9024322 100644
--- a/voltha/adapters/openolt/openolt_device.py
+++ b/voltha/adapters/openolt/openolt_device.py
@@ -167,6 +167,66 @@
def post_down(self, event):
self.log.debug('post_down')
+ # We land here on loosing connection with OLT.
+ # In such case, reset all data on the KV store for all the associated UNIs,
+ # and NNI port. As a result, when the connection is regained with the OLT,
+ # the whole configuration is replayed again.
+
+ # Clear the KV store data associated with the all the UNI ports
+ # This clears up flow data and also resource map data for various
+ # other pon resources like alloc_id and gemport_id
+ child_devices = self.data_model.adapter_agent.get_child_devices(self.device_id)
+ for child_device in child_devices:
+ ports = self.data_model.adapter_agent.get_ports(child_device.id, Port.ETHERNET_UNI)
+ for port in ports:
+ if port.type == Port.ETHERNET_UNI:
+ self.log.debug("clearing-flows-for-onu-uni", child_device_id=child_device.id)
+ port_no = port.port_no
+ uni_id = self.platform.uni_id_from_port_num(port_no)
+ pon_intf_id = child_device.proxy_address.channel_id
+ onu_id = child_device.proxy_address.onu_id
+
+ try:
+ self.log.debug("clearing-tp-instance-for-onu",
+ serial_number=child_device.serial_number, onu_id=onu_id,
+ uni_id=uni_id, intf_id=pon_intf_id)
+ self.flow_mgr.delete_tech_profile_instance(
+ pon_intf_id, onu_id, uni_id,
+ child_device.serial_number)
+ except Exception as e:
+ self.log.exception("error-removing-tp-instance")
+
+ try:
+ pon_intf_id_onu_id = (pon_intf_id, onu_id, uni_id)
+ # Free any PON resources that were reserved for the ONU
+ self.resource_mgr.free_pon_resources_for_onu(pon_intf_id_onu_id)
+ # Free tech_profile id for ONU
+ self.resource_mgr.remove_tech_profile_id_for_onu(pon_intf_id, onu_id, uni_id)
+ # Free meter_ids for the ONU
+ self.resource_mgr.remove_meter_id_for_onu("upstream",
+ pon_intf_id, onu_id, uni_id)
+ self.resource_mgr.remove_meter_id_for_onu("downstream",
+ pon_intf_id, onu_id, uni_id)
+ self.log.debug('cleared-resource', pon_intf_id_onu_id=pon_intf_id_onu_id)
+ except Exception as e:
+ self.log.exception("error-removing-pon-resources-for-onu")
+
+ # Clear the flows from KV store associated with NNI port.
+ # There are mostly trap rules from NNI port (like LLDP)
+ ports = self.data_model.adapter_agent.get_ports(self.device_id, Port.ETHERNET_NNI)
+ for port in ports:
+ self.log.debug('clear-flows-for-nni-in-olt-device', port=port)
+ if port.type == Port.ETHERNET_NNI:
+ nni_intf_id = self.platform.intf_id_from_nni_port_num(port.port_no)
+ flow_ids = self.resource_mgr.get_current_flow_ids(nni_intf_id,
+ -1, -1)
+ # Clear the flows on KV store
+ if flow_ids is not None and isinstance(flow_ids, list):
+ for flow_id in flow_ids:
+ self.resource_mgr.free_flow_id(nni_intf_id, -1, -1,
+ flow_id)
+ self.log.debug('cleared-flows', nni_intf_id=nni_intf_id)
+
self.flow_mgr.reset_flows()
def indications_thread(self):
@@ -393,10 +453,17 @@
port_no=egress_port,
packet=str(payload).encode("HEX"))
+ intf_id = self.platform.intf_id_from_uni_port_num(egress_port)
+ onu_id = self.platform.onu_id_from_port_num(egress_port)
+ port_no = egress_port
+ gemport_key = (intf_id, onu_id, port_no)
+ assert (gemport_key in self.data_model.packet_in_gem_port)
+ gemport_id = self.data_model.packet_in_gem_port[gemport_key]
onu_pkt = openolt_pb2.OnuPacket(
- intf_id=self.platform.intf_id_from_uni_port_num(egress_port),
- onu_id=self.platform.onu_id_from_port_num(egress_port),
- port_no=egress_port,
+ intf_id=intf_id,
+ onu_id=onu_id,
+ port_no=port_no,
+ gemport_id=gemport_id,
pkt=send_pkt)
self.stub.OnuPacketOut(onu_pkt)
@@ -501,42 +568,45 @@
# FIXME - instead of passing child_device around, delete_child_device
# needs to change to use serial_number.
def delete_child_device(self, child_device):
- self.log.debug('sending-deactivate-onu',
+ serial_number = OpenoltUtils.destringify_serial_number(
+ child_device.serial_number)
+ pon_intf_id = child_device.proxy_address.channel_id
+ onu_id = child_device.proxy_address.onu_id
+ self.log.debug('delete-device',
onu_device=child_device,
- onu_serial_number=child_device.serial_number)
+ onu_serial_number=serial_number,
+ device_id=child_device.id)
- self.data_model.onu_delete(child_device.serial_number)
+ try:
+ self.data_model.onu_delete(self.flow_mgr, child_device)
+ onu = openolt_pb2.Onu(
+ intf_id=pon_intf_id,
+ onu_id=onu_id,
+ serial_number=serial_number)
+ self.stub.DeleteOnu(onu)
+ except Exception as e:
+ self.log.exception("error-deleting-the-onu-on-olt-device", error=e)
# TODO FIXME - For each uni.
# TODO FIXME - Flows are not deleted
uni_id = 0 # FIXME
try:
self.flow_mgr.delete_tech_profile_instance(
- child_device.proxy_address.channel_id,
+ pon_intf_id,
child_device.proxy_address.onu_id,
- uni_id, None)
+ uni_id, child_device.serial_number)
except Exception as e:
self.log.exception("error-removing-tp-instance")
try:
- pon_intf_id_onu_id = (child_device.proxy_address.channel_id,
- child_device.proxy_address.onu_id,
+ pon_intf_id_onu_id = (pon_intf_id,
+ onu_id,
uni_id)
# Free any PON resources that were reserved for the ONU
self.resource_mgr.free_pon_resources_for_onu(pon_intf_id_onu_id)
except Exception as e:
self.log.exception("error-removing-pon-resources-for-onu")
- serial_number = OpenoltUtils.destringify_serial_number(
- child_device.serial_number)
- try:
- onu = openolt_pb2.Onu(
- intf_id=child_device.proxy_address.channel_id,
- onu_id=child_device.proxy_address.onu_id,
- serial_number=serial_number)
- self.stub.DeleteOnu(onu)
- except Exception as e:
- self.log.exception("error-deleting-the-onu-on-olt-device", error=e)
def reboot(self):
self.log.debug('rebooting openolt device')
diff --git a/voltha/adapters/openolt/openolt_flow_mgr.py b/voltha/adapters/openolt/openolt_flow_mgr.py
index 1150d9a..cca6767 100644
--- a/voltha/adapters/openolt/openolt_flow_mgr.py
+++ b/voltha/adapters/openolt/openolt_flow_mgr.py
@@ -18,6 +18,7 @@
import grpc
from google.protobuf.json_format import MessageToDict
import hashlib
+import ast
from simplejson import dumps
from voltha.protos.openflow_13_pb2 import OFPXMC_OPENFLOW_BASIC, \
@@ -26,15 +27,17 @@
from voltha.protos.device_pb2 import Port
import voltha.core.flow_decomposer as fd
from voltha.adapters.openolt.protos import openolt_pb2
+from voltha.protos import tech_profile_pb2
from voltha.registry import registry
-
-from common.tech_profile.tech_profile import DEFAULT_TECH_PROFILE_TABLE_ID
+from common.tech_profile.tech_profile import Direction, TechProfile
# Flow categories
HSIA_FLOW = "HSIA_FLOW"
EAP_ETH_TYPE = 0x888e
LLDP_ETH_TYPE = 0x88cc
+IPV4_ETH_TYPE = 0x800
+IPv6_ETH_TYPE = 0x86dd
IGMP_PROTO = 2
@@ -119,12 +122,17 @@
self.log.debug('add flow', flow=flow)
classifier_info = dict()
action_info = dict()
+ us_meter_id = None
+ ds_meter_id = None
for field in fd.get_ofb_fields(flow):
if field.type == fd.ETH_TYPE:
classifier_info[ETH_TYPE] = field.eth_type
self.log.debug('field-type-eth-type',
eth_type=classifier_info[ETH_TYPE])
+ if classifier_info[ETH_TYPE] == IPv6_ETH_TYPE:
+ self.log.debug('Not handling IPv6 flows')
+ return
elif field.type == fd.IP_PROTO:
classifier_info[IP_PROTO] = field.ip_proto
self.log.debug('field-type-ip-proto',
@@ -157,10 +165,6 @@
classifier_info[IPV4_SRC] = field.ipv4_src
self.log.debug('field-type-ipv4-src',
ipv4_dst=classifier_info[IPV4_SRC])
- elif field.type == fd.METADATA:
- classifier_info[METADATA] = field.table_metadata
- self.log.debug('field-type-metadata',
- metadata=classifier_info[METADATA])
else:
raise NotImplementedError('field.type={}'.format(
field.type))
@@ -198,6 +202,10 @@
self.log.debug('set-field-type-vlan-vid',
vlan_vid=_field.vlan_vid & 0xfff)
action_info[VLAN_VID] = (_field.vlan_vid & 0xfff)
+ elif _field.type == fd.VLAN_PCP:
+ self.log.debug('set-field-type-vlan-pcp',
+ vlan_pcp=_field.vlan_pcp & 0x7)
+ action_info[VLAN_PCP] = (_field.vlan_pcp & 0x7)
else:
self.log.error('unsupported-action-set-field-type',
field_type=_field.type)
@@ -211,9 +219,11 @@
self.log.debug('being taken care of by ONU', flow=flow)
return
- if OUTPUT not in action_info and METADATA in classifier_info:
+ flow_metadata = fd.get_metadata_from_write_metadata(flow)
+
+ if OUTPUT not in action_info and flow_metadata is not None:
# find flow in the next table
- next_flow = self.find_next_flow(flow)
+ next_flow = self.find_next_flow(flow, flow_metadata)
if next_flow is None:
return
action_info[OUTPUT] = fd.get_out_port(next_flow)
@@ -228,19 +238,71 @@
= self.platform.extract_access_from_flow(
classifier_info[IN_PORT], action_info[OUTPUT])
+ # LLDP flow has nothing to do with any particular subscriber.
+ # So, lets not care about the Tech-profile, meters etc.
+ # Just add the flow and return.
+ if ETH_TYPE in classifier_info and \
+ classifier_info[ETH_TYPE] == LLDP_ETH_TYPE:
+ self.log.debug('lldp flow add')
+ self.add_lldp_flow(flow, port_no)
+ return
+
+ if ETH_TYPE in classifier_info and \
+ classifier_info[ETH_TYPE] == IPV4_ETH_TYPE and \
+ IP_PROTO in classifier_info and \
+ classifier_info[IP_PROTO] == 2:
+ self.log.debug('igmp flow add ignored, not implemented yet')
+ return
+
+ if IP_PROTO in classifier_info and \
+ classifier_info[IP_PROTO] == 17 and \
+ UDP_SRC in classifier_info and \
+ classifier_info[UDP_SRC] == 67:
+ self.log.debug('trap-dhcp-from-nni-flow')
+ self.add_dhcp_trap_nni(flow, classifier_info, port_no,
+ network_intf_id=0)
+ return
+
+ # Metadata 8 bytes:
+ # Most Significant 2 Bytes = Inner VLAN
+ # Next 2 Bytes = Tech Profile ID(TPID)
+ # Least Significant 4 Bytes = Port ID
+ # Flow METADATA carries Tech-Profile (TP) ID and is mandatory in all
+ # subscriber related flows.
+ # Note: If we are here, assert that the flow_metadata is not None
+ assert flow_metadata is not None
+
+ # Retrieve the TP-ID if one exists for the subscriber already
+ tp_id = self.resource_mgr.get_tech_profile_id_for_onu(intf_id, onu_id, uni_id)
+
+ if tp_id is not None:
+ # Assert that the tp_id received in flow metadata is same is the tp_id in use
+ # TODO:
+ # For now, tp_id updates, require that we tear down the service and
+ # and re-provision the service, i.e., dynamic TP updates not supported.
+
+ assert tp_id == fd.get_tp_id_from_metadata(flow_metadata), \
+ "tp-updates-not-supported"
+ else:
+ tp_id = fd.get_tp_id_from_metadata(flow_metadata)
+ self.log.info("received-tp-id-from-flow", tp_id=tp_id)
+
+ if self.platform.is_upstream(action_info[OUTPUT]):
+ us_meter_id = fd.get_meter_id_from_flow(flow)
+ else:
+ ds_meter_id = fd.get_meter_id_from_flow(flow)
+
self.divide_and_add_flow(intf_id, onu_id, uni_id, port_no,
- classifier_info, action_info, flow)
+ classifier_info, action_info, flow, tp_id, us_meter_id, ds_meter_id)
def _clear_flow_id_from_rm(self, flow, flow_id, flow_direction):
try:
pon_intf, onu_id, uni_id \
= self.platform.flow_extract_info(flow, flow_direction)
except ValueError:
- self.log.error("failure extracting pon_intf, onu_id, uni_id info \
- from flow")
+ self.log.error("failure extracting pon_intf, onu_id, uni_id info from flow")
else:
- flows = self.resource_mgr.get_flow_id_info(pon_intf, onu_id,
- uni_id, flow_id)
+ flows = self.resource_mgr.get_flow_id_info(pon_intf, onu_id, uni_id, flow_id)
assert (isinstance(flows, list))
self.log.debug("retrieved-flows", flows=flows)
for idx in range(len(flows)):
@@ -256,6 +318,14 @@
return
self.resource_mgr.free_flow_id(pon_intf, onu_id, uni_id, flow_id)
+ flow_list = self.resource_mgr.get_current_flow_ids(pon_intf, onu_id, uni_id)
+ if flow_list is None:
+ tp_id = self.resource_mgr.get_tech_profile_id_for_onu(pon_intf, onu_id, uni_id)
+ tp_instance = self.get_tech_profile_instance(pon_intf, onu_id, uni_id, tp_id)
+ self.log.info("all-flows-cleared-for-onu")
+ self.log.info("initiate-sched-queue-teardown")
+ self.remove_us_scheduler_queues(pon_intf, onu_id, uni_id, tp_instance)
+ self.remove_ds_scheduler_queues(pon_intf, onu_id, uni_id, tp_instance)
def retry_add_flow(self, flow):
self.log.debug("retry-add-flow")
@@ -304,7 +374,7 @@
flow_ids_removed=flows_ids_to_remove,
number_of_flows_removed=(len(device_flows) - len(
new_flows)), expected_flows_removed=len(
- device_flows_to_remove))
+ device_flows_to_remove))
else:
self.log.debug('no device flow to remove for this flow (normal '
'for multi table flows)', flow=flow)
@@ -315,7 +385,7 @@
ofp_port_name)
def delete_tech_profile_instance(self, intf_id, onu_id, uni_id,
- ofp_port_name):
+ ofp_port_name=None):
# Remove the TP instance associated with the ONU
if ofp_port_name is None:
ofp_port_name = self.data_model.serial_number(intf_id, onu_id)
@@ -327,193 +397,487 @@
return self.tech_profile[intf_id].delete_tech_profile_instance(tp_path)
def divide_and_add_flow(self, intf_id, onu_id, uni_id, port_no, classifier,
- action, flow):
+ action, flow, tp_id, us_meter_id, ds_meter_id):
self.log.debug('sorting flow', intf_id=intf_id, onu_id=onu_id,
- uni_id=uni_id, port_no=port_no, classifier=classifier,
- action=action)
+ uni_id=uni_id, port_no=port_no,
+ classifier=classifier, action=action,
+ tp_id=tp_id, us_meter=us_meter_id,
+ ds_meter=ds_meter_id)
- alloc_id, gem_ports = self.create_tcont_gemport(intf_id, onu_id,
- uni_id, flow.table_id)
- if alloc_id is None or gem_ports is None:
- self.log.error("alloc-id-gem-ports-unavailable", alloc_id=alloc_id,
- gem_ports=gem_ports)
+ tp_instance = self.get_tech_profile_instance(intf_id, onu_id, uni_id, tp_id)
+ if tp_instance is None:
+ self.log.error("flow-not-added--tp-instance-unavailable")
return
+ pon_intf_onu_id = (intf_id, onu_id, uni_id)
+ alloc_id = \
+ self.resource_mgr.get_current_alloc_ids_for_onu(pon_intf_onu_id)
+ gem_ports = \
+ self.resource_mgr.get_current_gemport_ids_for_onu(pon_intf_onu_id)
+
+ if alloc_id is None or gem_ports is None:
+ self.log.error("alloc-id-or-gem-ports-unavailable",
+ alloc_id=alloc_id, gem_ports=gem_ports)
+ return
+
+ self.create_us_scheduler_queues(intf_id, onu_id, uni_id, tp_instance, us_meter_id)
+ self.create_ds_scheduler_queues(intf_id, onu_id, uni_id, tp_instance, ds_meter_id)
+
self.log.debug('Generated required alloc and gemport ids',
alloc_id=alloc_id, gemports=gem_ports)
- # Flows can't be added specific to gemport unless p-bits are received.
- # Hence adding flows for all gemports
- for gemport_id in gem_ports:
- if IP_PROTO in classifier:
- if classifier[IP_PROTO] == 17:
- self.log.debug('dhcp flow add')
- self.add_dhcp_trap(intf_id, onu_id, uni_id, port_no,
- classifier, action, flow, alloc_id,
- gemport_id)
- elif classifier[IP_PROTO] == 2:
- self.log.warn('igmp flow add ignored, not implemented yet')
+ ds_gem_port_attr_list = tp_instance.downstream_gem_port_attribute_list
+ us_gem_port_attr_list = tp_instance.upstream_gem_port_attribute_list
+ kwargs = dict()
+ kwargs['intf_id'] = intf_id
+ kwargs['onu_id'] = onu_id
+ kwargs['uni_id'] = uni_id
+ kwargs['port_no'] = port_no
+ kwargs['classifier'] = classifier
+ kwargs['action'] = action
+ kwargs['logical_flow'] = flow
+ kwargs['alloc_id'] = alloc_id
+
+ if IP_PROTO in classifier:
+ if classifier[IP_PROTO] == 17:
+ self.log.debug('dhcp flow add')
+ if VLAN_PCP in classifier:
+ gemport_id = self._get_gem_port_for_pcp(
+ classifier[VLAN_PCP], us_gem_port_attr_list
+ )
+ self.add_dhcp_trap_uni(intf_id, onu_id, uni_id, port_no,
+ classifier, action, flow, alloc_id,
+ gemport_id)
else:
- self.log.warn("Invalid-Classifier-to-handle",
- classifier=classifier,
- action=action)
- elif ETH_TYPE in classifier:
- if classifier[ETH_TYPE] == EAP_ETH_TYPE:
- self.log.debug('eapol flow add')
- self.add_eapol_flow(intf_id, onu_id, uni_id, port_no,
- flow, alloc_id, gemport_id)
- vlan_id = self.get_subscriber_vlan(fd.get_in_port(flow))
- if vlan_id is not None:
- self.add_eapol_flow(intf_id, onu_id, uni_id, port_no,
- flow, alloc_id, gemport_id,
- vlan_id=vlan_id)
- ofp_port_name = self.data_model.serial_number(intf_id,
- onu_id)
- tp_id = self.resource_mgr.get_tech_profile_id_for_onu(
- intf_id, onu_id, uni_id)
- tp_path = self.get_tp_path(intf_id, ofp_port_name, tp_id)
+ self._install_flow_on_all_gemports(self.add_dhcp_trap_uni,
+ kwargs,
+ us_gem_port_attr_list)
- self.log.debug('Load-tech-profile-request-to-brcm-handler',
- tp_path=tp_path)
- self.data_model.onu_download_tech_profile(
- intf_id, onu_id, uni_id, tp_path)
-
- if classifier[ETH_TYPE] == LLDP_ETH_TYPE:
- self.log.debug('lldp flow add')
- nni_intf_id = self.data_model.olt_nni_intf_id()
- self.add_lldp_flow(flow, port_no, nni_intf_id)
-
- elif PUSH_VLAN in action:
- self.add_upstream_data_flow(intf_id, onu_id, uni_id, port_no,
- classifier, action, flow, alloc_id,
- gemport_id)
- elif POP_VLAN in action:
- self.add_downstream_data_flow(intf_id, onu_id, uni_id, port_no,
- classifier, action, flow,
- alloc_id, gemport_id)
+ elif classifier[IP_PROTO] == 2:
+ self.log.warn('igmp flow add ignored, not implemented yet')
else:
- self.log.debug('Invalid-flow-type-to-handle',
- classifier=classifier,
- action=action, flow=flow)
+ self.log.warn("Invalid-Classifier-to-handle",
+ classifier=classifier,
+ action=action)
+ elif ETH_TYPE in classifier:
+ if classifier[ETH_TYPE] == EAP_ETH_TYPE:
+ self.log.debug('eapol flow add')
+ vlan_id = classifier[VLAN_VID]
- def create_tcont_gemport(self, intf_id, onu_id, uni_id, table_id):
- alloc_id, gem_port_ids = None, None
- pon_intf_onu_id = (intf_id, onu_id)
+ if vlan_id is None:
+ vlan_id = DEFAULT_MGMT_VLAN
- # If we already have allocated alloc_id and gem_ports earlier,
- # render them
- alloc_id = \
- self.resource_mgr.get_current_alloc_ids_for_onu(pon_intf_onu_id)
- gem_port_ids = \
- self.resource_mgr.get_current_gemport_ids_for_onu(pon_intf_onu_id)
- if alloc_id is not None and gem_port_ids is not None:
- return alloc_id, gem_port_ids
+ if VLAN_PCP in classifier:
+ gemport_id = self._get_gem_port_for_pcp(
+ classifier[VLAN_PCP], us_gem_port_attr_list
+ )
+ self.add_eapol_flow(
+ intf_id, onu_id, uni_id, port_no, flow, alloc_id, gemport_id,
+ vlan_id=vlan_id)
+ else:
+ kwargs['vlan_id'] = vlan_id
+ self._install_flow_on_all_gemports(self.add_eapol_flow,
+ kwargs,
+ us_gem_port_attr_list)
+ (ofp_port_name, ofp_port_no) = \
+ self.data_model.get_ofp_port_name(intf_id, onu_id, uni_id)
+ if ofp_port_name is None:
+ self.log.error("port-name-not-found")
+ return
+
+ tp_id = self.resource_mgr.get_tech_profile_id_for_onu(intf_id, onu_id, uni_id)
+ tp_path = self.get_tp_path(intf_id, ofp_port_name, tp_id)
+
+ self.log.debug('Load-tech-profile-request-to-brcm-handler',
+ tp_path=tp_path)
+ self.data_model.onu_download_tech_profile(
+ intf_id, onu_id, uni_id, tp_path)
+ elif PUSH_VLAN in action:
+ if VLAN_PCP in classifier:
+ gemport_id = self._get_gem_port_for_pcp(
+ classifier[VLAN_PCP], us_gem_port_attr_list
+ )
+ self.add_upstream_data_flow(intf_id, onu_id, uni_id, port_no, classifier,
+ action, flow, alloc_id, gemport_id)
+ else:
+ self._install_flow_on_all_gemports(self.add_upstream_data_flow,
+ kwargs, us_gem_port_attr_list
+ )
+ elif POP_VLAN in action:
+ if VLAN_PCP in classifier:
+ gemport_id = self._get_gem_port_for_pcp(
+ classifier[VLAN_PCP], us_gem_port_attr_list
+ )
+ self.add_downstream_data_flow(intf_id, onu_id, uni_id, port_no, classifier,
+ action, flow, alloc_id, gemport_id)
+ else:
+ self._install_flow_on_all_gemports(self.add_downstream_data_flow,
+ kwargs, ds_gem_port_attr_list
+ )
+ else:
+ self.log.debug('Invalid-flow-type-to-handle',
+ classifier=classifier,
+ action=action, flow=flow)
+
+ def get_scheduler(self, tech_profile_instance, direction, meter_id):
+ if direction == Direction.UPSTREAM:
+ scheduler = tech_profile_instance.us_scheduler
+ elif direction == Direction.DOWNSTREAM:
+ scheduler = tech_profile_instance.ds_scheduler
+ else:
+ raise Exception("invalid-direction")
+
+ meter_band = self.data_model.meter_band(meter_id)
+
+ traffic_shaping_info = None
+
+ if meter_band is not None:
+ cir = meter_band.bands[0].rate
+ cbs = meter_band.bands[0].burst_size
+ eir = meter_band.bands[1].rate
+ ebs = meter_band.bands[1].burst_size
+ pir = cir + eir
+ pbs = cbs + ebs
+
+ traffic_shaping_info = tech_profile_pb2.TrafficShapingInfo(
+ cir=cir,
+ cbs=cbs,
+ pir=pir,
+ pbs=pbs
+ )
+
+ scheduler_config = tech_profile_pb2.SchedulerConfig(
+ direction=TechProfile.get_parameter(
+ 'direction', scheduler.direction),
+ additional_bw=TechProfile.get_parameter(
+ 'additional_bw', scheduler.additional_bw),
+ priority=scheduler.priority,
+ weight=scheduler.weight,
+ sched_policy=TechProfile.get_parameter(
+ 'q_sched_policy', scheduler.q_sched_policy)
+ )
+
+ traffic_scheduler = tech_profile_pb2.TrafficScheduler(
+ direction=scheduler.direction,
+ scheduler=scheduler_config,
+ alloc_id=scheduler.alloc_id,
+ traffic_shaping_info=traffic_shaping_info
+ )
+
+ return traffic_scheduler
+
+ @staticmethod
+ def get_traffic_queues(tech_profile_instance, direction):
+ if direction == Direction.UPSTREAM:
+ gemport_attribute_list = tech_profile_instance. \
+ upstream_gem_port_attribute_list
+ tp_scheduler_direction = tech_profile_instance.us_scheduler.direction
+ elif direction == Direction.DOWNSTREAM:
+ gemport_attribute_list = tech_profile_instance. \
+ downstream_gem_port_attribute_list
+ tp_scheduler_direction = tech_profile_instance.ds_scheduler.direction
+ else:
+ raise Exception("invalid-direction")
+ traffic_queues = list()
+ for i in range(len(gemport_attribute_list)):
+ traffic_queues.append(tech_profile_pb2.TrafficQueue(
+ direction=TechProfile.get_parameter('direction',
+ tp_scheduler_direction),
+ gemport_id=gemport_attribute_list[i].gemport_id,
+ pbit_map=gemport_attribute_list[i].pbit_map,
+ aes_encryption=ast.literal_eval(gemport_attribute_list[i].
+ aes_encryption),
+ sched_policy=TechProfile.get_parameter(
+ 'sched_policy', gemport_attribute_list[i].
+ scheduling_policy),
+ priority=gemport_attribute_list[i].priority_q,
+ weight=gemport_attribute_list[i].weight,
+ discard_policy=TechProfile.get_parameter(
+ 'discard_policy', gemport_attribute_list[i].
+ discard_policy)))
+
+ return traffic_queues
+
+ def create_us_scheduler_queues(self, intf_id, onu_id, uni_id, tp_instance, us_meter_id):
+ if us_meter_id is None:
+ self.log.debug("us-meter-unavailable--no-action")
+ return
+
+ kv_store_meter_id = self.resource_mgr.get_meter_id_for_onu(UPSTREAM,
+ intf_id,
+ onu_id, uni_id)
+
+ # Lets make a simple assumption that if the meter-id is present on the KV store,
+ # then the scheduler and queues configuration is applied on the OLT device
+ # in the given direction.
+ if kv_store_meter_id is not None:
+ # TODO: Dynamic meter update not supported for now
+ # TODO: The subscriber has to be un-provisioned and re-provisioned for meter update
+ assert kv_store_meter_id == us_meter_id
+ self.log.debug("scheduler-already-created-in-us")
+ return
+
+ traffic_sched = self.get_scheduler(tp_instance, Direction.UPSTREAM, us_meter_id)
try:
- ofp_port_name = self.data_model.serial_number(intf_id, onu_id)
ofp_port_no = self.platform.mk_uni_port_num(intf_id,
onu_id, uni_id)
- # FIXME: If table id is <= 63 using 64 as table id
- if table_id < DEFAULT_TECH_PROFILE_TABLE_ID:
- table_id = DEFAULT_TECH_PROFILE_TABLE_ID
-
- # Check tech profile instance already exists for derived port name
- tech_profile_instance = self.tech_profile[intf_id]. \
- get_tech_profile_instance(table_id, ofp_port_name)
- self.log.debug('Get-tech-profile-instance-status',
- tech_profile_instance=tech_profile_instance)
-
- if tech_profile_instance is None:
- # create tech profile instance
- tech_profile_instance = self.tech_profile[intf_id]. \
- create_tech_profile_instance(table_id, ofp_port_name,
- intf_id)
- if tech_profile_instance is None:
- raise Exception('Tech-profile-instance-creation-failed')
+ self.stub.CreateTrafficSchedulers(
+ tech_profile_pb2.TrafficSchedulers(
+ intf_id=intf_id,
+ onu_id=onu_id,
+ uni_id=uni_id,
+ port_no=ofp_port_no,
+ traffic_scheds=[traffic_sched]
+ ))
+ except grpc.RpcError as grpc_e:
+ if grpc_e.code() == grpc.StatusCode.ALREADY_EXISTS:
+ self.log.warn("us-scheduler-already-exists")
else:
- self.log.debug(
- 'Tech-profile-instance-already-exist-for-given port-name',
- ofp_port_name=ofp_port_name)
+ self.log.error("failure-to-create-us-scheduler")
+ return
- # upstream scheduler
- us_scheduler = self.tech_profile[intf_id].get_us_scheduler(
- tech_profile_instance)
- # downstream scheduler
- ds_scheduler = self.tech_profile[intf_id].get_ds_scheduler(
- tech_profile_instance)
- # create Tcont
- tconts = self.tech_profile[intf_id].get_tconts(
- tech_profile_instance, us_scheduler, ds_scheduler)
+ # On receiving the CreateTrafficQueues request, the driver should create corresponding
+ # downstream queues.
+ try:
+ self.stub.CreateTrafficQueues(
+ tech_profile_pb2.TrafficQueues(
+ intf_id=intf_id,
+ onu_id=onu_id,
+ uni_id=uni_id,
+ port_no=ofp_port_no,
+ traffic_queues=
+ OpenOltFlowMgr.get_traffic_queues(tp_instance, Direction.UPSTREAM)
+ ))
+ except grpc.RpcError as grpc_e:
+ if grpc_e.code() == grpc.StatusCode.ALREADY_EXISTS:
+ self.log.warn("ds-queues-already-exists")
+ else:
+ self.log.error("failure-to-create-ds-queues")
+ return
- self.stub.CreateTconts(openolt_pb2.Tconts(intf_id=intf_id,
- onu_id=onu_id,
- uni_id=uni_id,
- port_no=ofp_port_no,
- tconts=tconts))
+ # After we succesfully applied the scheduler configuration on the OLT device,
+ # store the meter id on the KV store, for further reference
+ self.resource_mgr.update_meter_id_for_onu(UPSTREAM, intf_id, onu_id, uni_id, us_meter_id)
+
+ def create_ds_scheduler_queues(self, intf_id, onu_id, uni_id, tp_instance, ds_meter_id):
+ if ds_meter_id is None:
+ self.log.debug("ds-meter-unavailable--no-action")
+ return
+
+ kv_store_meter_id = self.resource_mgr.get_meter_id_for_onu(DOWNSTREAM,
+ intf_id,
+ onu_id, uni_id)
+ # Lets make a simple assumption that if the meter-id is present on the KV store,
+ # then the scheduler and queues configuration is applied on the OLT device
+ if kv_store_meter_id is not None:
+ # TODO: Dynamic meter update not supported for now
+ # TODO: The subscriber has to be un-provisioned and re-provisioned for meter update
+ assert kv_store_meter_id == ds_meter_id
+ self.log.debug("scheduler-already-created-in-ds")
+ return
+
+ traffic_sched = self.get_scheduler(tp_instance, Direction.DOWNSTREAM, ds_meter_id)
+ _, ofp_port_no = self.data_model.get_ofp_port_name(intf_id, onu_id, uni_id)
+ try:
+ self.stub.CreateTrafficSchedulers(
+ tech_profile_pb2.TrafficSchedulers(
+ intf_id=intf_id,
+ onu_id=onu_id,
+ uni_id=uni_id,
+ port_no=ofp_port_no,
+ traffic_scheds=[traffic_sched]
+ ))
+ except grpc.RpcError as grpc_e:
+ if grpc_e.code() == grpc.StatusCode.ALREADY_EXISTS:
+ self.log.warn("ds-scheduler-already-exists")
+ else:
+ self.log.error("failure-to-create-ds-scheduler")
+ return
+
+ # On receiving the CreateTrafficQueues request, the driver should create corresponding
+ # downstream queues.
+ try:
+ self.stub.CreateTrafficQueues(
+ tech_profile_pb2.TrafficQueues(
+ intf_id=intf_id,
+ onu_id=onu_id,
+ uni_id=uni_id,
+ port_no=ofp_port_no,
+ traffic_queues=
+ OpenOltFlowMgr.get_traffic_queues(tp_instance, Direction.DOWNSTREAM)
+ ))
+ except grpc.RpcError as grpc_e:
+ if grpc_e.code() == grpc.StatusCode.ALREADY_EXISTS:
+ self.log.warn("ds-queues-already-exists")
+ else:
+ self.log.error("failure-to-create-ds-queues")
+ return
+
+ # After we successfully applied the scheduler configuration on the OLT device,
+ # store the meter id on the KV store, for further reference
+ self.resource_mgr.update_meter_id_for_onu(DOWNSTREAM, intf_id, onu_id, uni_id, ds_meter_id)
+
+ def remove_us_scheduler_queues(self, intf_id, onu_id, uni_id, tp_instance):
+ us_meter_id = self.resource_mgr.get_meter_id_for_onu(UPSTREAM,
+ intf_id,
+ onu_id, uni_id)
+ traffic_sched = self.get_scheduler(tp_instance, Direction.UPSTREAM, us_meter_id)
+ _, ofp_port_no = self.data_model.get_ofp_port_name(intf_id, onu_id, uni_id)
+
+ try:
+ self.stub.RemoveTrafficQueues(
+ tech_profile_pb2.TrafficQueues(
+ intf_id=intf_id,
+ onu_id=onu_id,
+ uni_id=uni_id,
+ port_no=ofp_port_no,
+ traffic_queues=
+ OpenOltFlowMgr.get_traffic_queues(tp_instance, Direction.UPSTREAM)
+ ))
+ self.log.debug("removed-upstream-Queues")
+ except grpc.RpcError as e:
+ self.log.error("failure-to-remove-us-queues", e=e)
+
+ try:
+ self.stub.RemoveTrafficSchedulers(
+ tech_profile_pb2.TrafficSchedulers(
+ intf_id=intf_id,
+ onu_id=onu_id,
+ uni_id=uni_id,
+ port_no=ofp_port_no,
+ traffic_scheds=[traffic_sched]
+ ))
+ self.log.debug("removed-upstream-Schedulers")
+ except grpc.RpcError as e:
+ self.log.error("failure-to-remove-us-scheduler", e=e)
+
+ self.resource_mgr.remove_meter_id_for_onu(UPSTREAM, intf_id, onu_id, uni_id)
+
+ def remove_ds_scheduler_queues(self, intf_id, onu_id, uni_id, tp_instance):
+ ds_meter_id = self.resource_mgr.get_meter_id_for_onu(DOWNSTREAM,
+ intf_id,
+ onu_id, uni_id)
+
+ traffic_sched = self.get_scheduler(tp_instance, Direction.DOWNSTREAM, ds_meter_id)
+ _, ofp_port_no = self.data_model.get_ofp_port_name(intf_id, onu_id, uni_id)
+
+ try:
+ self.stub.RemoveTrafficQueues(
+ tech_profile_pb2.TrafficQueues(
+ intf_id=intf_id,
+ onu_id=onu_id,
+ uni_id=uni_id,
+ port_no=ofp_port_no,
+ traffic_queues=
+ OpenOltFlowMgr.get_traffic_queues(tp_instance, Direction.DOWNSTREAM)
+ ))
+ self.log.debug("removed-downstream-Queues")
+ except grpc.RpcError as grpc_e:
+ self.log.error("failure-to-remove-ds-queues")
+
+ try:
+ self.stub.RemoveTrafficSchedulers(
+ tech_profile_pb2.TrafficSchedulers(
+ intf_id=intf_id,
+ onu_id=onu_id,
+ uni_id=uni_id,
+ port_no=ofp_port_no,
+ traffic_scheds=[traffic_sched]
+ ))
+ self.log.debug("removed-downstream-Schedulers")
+ except grpc.RpcError as grpc_e:
+ self.log.error("failure-to-remove-ds-scheduler")
+
+ self.resource_mgr.remove_meter_id_for_onu(DOWNSTREAM, intf_id, onu_id, uni_id)
+
+ def get_tech_profile_instance(self, intf_id, onu_id, uni_id, tp_id):
+ (ofp_port_name, ofp_port_no) \
+ = self.data_model.get_ofp_port_name(intf_id, onu_id, uni_id)
+ if ofp_port_name is None:
+ self.log.error("port-name-not-found")
+ return None
+
+ # Check tech profile instance already exists for derived port name
+ tech_profile_instance = self.tech_profile[intf_id]. \
+ get_tech_profile_instance(tp_id, ofp_port_name)
+
+ if tech_profile_instance is None:
+ # create tech profile instance
+ tech_profile_instance = self.tech_profile[intf_id]. \
+ create_tech_profile_instance(tp_id, ofp_port_name,
+ intf_id)
+ if tech_profile_instance is None:
+ raise Exception('Tech-profile-instance-creation-failed')
+
+ self.resource_mgr.update_tech_profile_id_for_onu(intf_id, onu_id,
+ uni_id, tp_id)
# Fetch alloc id and gemports from tech profile instance
alloc_id = tech_profile_instance.us_scheduler.alloc_id
gem_port_ids = []
+
for i in range(len(
tech_profile_instance.upstream_gem_port_attribute_list)):
gem_port_ids.append(
tech_profile_instance.upstream_gem_port_attribute_list[i].
- gemport_id)
- except Exception as e:
- self.log.exception(exception=e)
+ gemport_id)
- # Update the allocated alloc_id and gem_port_id for the ONU/UNI to KV
- # store
- pon_intf_onu_id = (intf_id, onu_id, uni_id)
- self.resource_mgr.resource_mgrs[intf_id].update_alloc_ids_for_onu(
- pon_intf_onu_id,
- list([alloc_id])
- )
- self.resource_mgr.resource_mgrs[intf_id].update_gemport_ids_for_onu(
- pon_intf_onu_id,
- gem_port_ids
- )
+ # Update the allocated alloc_id and gem_port_id for the ONU/UNI to KV
+ # store
+ pon_intf_onu_id = (intf_id, onu_id, uni_id)
+ self.resource_mgr.resource_mgrs[intf_id].update_alloc_ids_for_onu(
+ pon_intf_onu_id,
+ list([alloc_id])
+ )
+ self.resource_mgr.resource_mgrs[intf_id].update_gemport_ids_for_onu(
+ pon_intf_onu_id,
+ gem_port_ids
+ )
- self.resource_mgr.update_gemports_ponport_to_onu_map_on_kv_store(
- gem_port_ids, intf_id, onu_id, uni_id
- )
+ self.resource_mgr.update_gemports_ponport_to_onu_map_on_kv_store(
+ gem_port_ids, intf_id, onu_id, uni_id
+ )
- for gemport_id in gem_port_ids:
- self.data_model.gemport_id_add(intf_id, onu_id, gemport_id)
+ for gemport_id in gem_port_ids:
+ self.data_model.gemport_id_add(intf_id, onu_id, gemport_id)
+ else:
+ self.log.debug(
+ 'Tech-profile-instance-already-exist-for-given port-name',
+ ofp_port_name=ofp_port_name)
+ return tech_profile_instance
+
+ def get_alloc_id_gem_port(self, intf_id, onu_id):
+ pon_intf_onu_id = (intf_id, onu_id)
+ # If we already have allocated alloc_id and gem_ports earlier, render them
+ alloc_id = \
+ self.resource_mgr.get_current_alloc_ids_for_onu(pon_intf_onu_id)
+ gem_port_ids = \
+ self.resource_mgr.get_current_gemport_ids_for_onu(pon_intf_onu_id)
return alloc_id, gem_port_ids
- def add_upstream_data_flow(self, intf_id, onu_id, uni_id, port_no,
- uplink_classifier, uplink_action, logical_flow,
- alloc_id, gemport_id):
+ def add_upstream_data_flow(self, intf_id, onu_id, uni_id, port_no, classifier,
+ action, logical_flow, alloc_id, gemport_id):
- uplink_classifier[PACKET_TAG_TYPE] = SINGLE_TAG
+ classifier[PACKET_TAG_TYPE] = SINGLE_TAG
- self.add_hsia_flow(intf_id, onu_id, uni_id, port_no, uplink_classifier,
- uplink_action, UPSTREAM,
+ self.add_hsia_flow(intf_id, onu_id, uni_id, port_no, classifier,
+ action, UPSTREAM,
logical_flow, alloc_id, gemport_id)
- # Secondary EAP on the subscriber vlan
- (eap_active, eap_logical_flow) = self.is_eap_enabled(intf_id, onu_id,
- uni_id)
- if eap_active:
- self.add_eapol_flow(intf_id, onu_id, uni_id, port_no,
- eap_logical_flow, alloc_id, gemport_id,
- vlan_id=uplink_classifier[VLAN_VID])
-
- def add_downstream_data_flow(self, intf_id, onu_id, uni_id, port_no,
- downlink_classifier, downlink_action, flow,
- alloc_id, gemport_id):
- downlink_classifier[PACKET_TAG_TYPE] = DOUBLE_TAG
+ def add_downstream_data_flow(self, intf_id, onu_id, uni_id, port_no, classifier,
+ action, logical_flow, alloc_id, gemport_id):
+ classifier[PACKET_TAG_TYPE] = DOUBLE_TAG
# Needed ???? It should be already there
- downlink_action[POP_VLAN] = True
- downlink_action[VLAN_VID] = downlink_classifier[VLAN_VID]
+ action[POP_VLAN] = True
+ action[VLAN_VID] = classifier[VLAN_VID]
- self.add_hsia_flow(intf_id, onu_id, uni_id, port_no,
- downlink_classifier, downlink_action, DOWNSTREAM,
- flow, alloc_id, gemport_id)
+ self.add_hsia_flow(intf_id, onu_id, uni_id, port_no, classifier,
+ action, DOWNSTREAM,
+ logical_flow, alloc_id, gemport_id)
def add_hsia_flow(self, intf_id, onu_id, uni_id, port_no, classifier,
action, direction, logical_flow, alloc_id, gemport_id):
@@ -526,16 +890,16 @@
flow_store_cookie):
self.log.debug('flow-exists--not-re-adding')
else:
-
# One of the OLT platform (Broadcom BAL) requires that symmetric
# flows require the same flow_id to be used across UL and DL.
# Since HSIA flow is the only symmetric flow currently, we need to
# re-use the flow_id across both direction. The 'flow_category'
# takes priority over flow_cookie to find any available HSIA_FLOW
# id for the ONU.
+
flow_id = self.resource_mgr.get_flow_id(intf_id, onu_id, uni_id,
- flow_store_cookie,
- HSIA_FLOW)
+ flow_category=HSIA_FLOW,
+ flow_pcp=classifier[VLAN_PCP])
if flow_id is None:
self.log.error("hsia-flow-unavailable")
return
@@ -557,8 +921,8 @@
flow.onu_id, flow.uni_id,
flow.flow_id, flow_info)
- def add_dhcp_trap(self, intf_id, onu_id, uni_id, port_no, classifier,
- action, logical_flow, alloc_id, gemport_id):
+ def add_dhcp_trap_uni(self, intf_id, onu_id, uni_id, port_no, classifier,
+ action, logical_flow, alloc_id, gemport_id):
self.log.debug('add dhcp upstream trap', classifier=classifier,
intf_id=intf_id, onu_id=onu_id, uni_id=uni_id,
@@ -573,15 +937,16 @@
flow_store_cookie = self._get_flow_store_cookie(classifier,
gemport_id)
+
if self.resource_mgr.is_flow_cookie_on_kv_store(intf_id, onu_id,
uni_id,
flow_store_cookie):
self.log.debug('flow-exists--not-re-adding')
else:
flow_id = self.resource_mgr.get_flow_id(
- intf_id, onu_id, uni_id, flow_store_cookie
+ intf_id, onu_id, uni_id,
+ flow_store_cookie=flow_store_cookie,
)
-
dhcp_flow = openolt_pb2.Flow(
onu_id=onu_id, uni_id=uni_id, flow_id=flow_id,
flow_type=UPSTREAM, access_intf_id=intf_id,
@@ -603,19 +968,20 @@
flow_info)
def add_eapol_flow(self, intf_id, onu_id, uni_id, port_no, logical_flow,
- alloc_id, gemport_id, vlan_id=DEFAULT_MGMT_VLAN):
+ alloc_id, gemport_id, vlan_id=DEFAULT_MGMT_VLAN, classifier=None, action=None):
uplink_classifier = dict()
uplink_classifier[ETH_TYPE] = EAP_ETH_TYPE
uplink_classifier[PACKET_TAG_TYPE] = SINGLE_TAG
uplink_classifier[VLAN_VID] = vlan_id
+ if classifier is not None:
+ uplink_classifier[VLAN_PCP] = classifier[VLAN_PCP]
uplink_action = dict()
uplink_action[TRAP_TO_HOST] = True
flow_store_cookie = self._get_flow_store_cookie(uplink_classifier,
gemport_id)
-
if self.resource_mgr.is_flow_cookie_on_kv_store(intf_id, onu_id,
uni_id,
flow_store_cookie):
@@ -623,7 +989,8 @@
else:
# Add Upstream EAPOL Flow.
uplink_flow_id = self.resource_mgr.get_flow_id(
- intf_id, onu_id, uni_id, flow_store_cookie
+ intf_id, onu_id, uni_id,
+ flow_store_cookie=flow_store_cookie
)
upstream_flow = openolt_pb2.Flow(
@@ -651,74 +1018,74 @@
upstream_flow.flow_id,
flow_info)
- if vlan_id == DEFAULT_MGMT_VLAN:
- # Add Downstream EAPOL Flow, Only for first EAP flow (BAL
- # requirement)
- # On one of the platforms (Broadcom BAL), when same DL classifier
- # vlan was used across multiple ONUs, eapol flow re-adds after
- # flow delete (cases of onu reboot/disable) fails.
- # In order to generate unique vlan, a combination of intf_id
- # onu_id and uni_id is used.
- # uni_id defaults to 0, so add 1 to it.
- special_vlan_downstream_flow = 4090 - intf_id * onu_id * (uni_id+1)
- # Assert that we do not generate invalid vlans under no condition
- assert special_vlan_downstream_flow >= 2
+ # Add Downstream EAPOL Flow, Only for first EAP flow (BAL
+ # requirement)
+ # On one of the platforms (Broadcom BAL), when same DL classifier
+ # vlan was used across multiple ONUs, eapol flow re-adds after
+ # flow delete (cases of onu reboot/disable) fails.
+ # In order to generate unique vlan, a combination of intf_id
+ # onu_id and uni_id is used.
+ # uni_id defaults to 0, so add 1 to it.
+ special_vlan_downstream_flow = 4090 - intf_id * onu_id * (uni_id + 1)
+ # Assert that we do not generate invalid vlans under no condition
+ assert special_vlan_downstream_flow >= 2
- downlink_classifier = dict()
- downlink_classifier[PACKET_TAG_TYPE] = SINGLE_TAG
- downlink_classifier[VLAN_VID] = special_vlan_downstream_flow
+ downlink_classifier = dict()
+ downlink_classifier[PACKET_TAG_TYPE] = SINGLE_TAG
+ downlink_classifier[ETH_TYPE] = EAP_ETH_TYPE
+ downlink_classifier[VLAN_VID] = special_vlan_downstream_flow
- downlink_action = dict()
- downlink_action[PUSH_VLAN] = True
- downlink_action[VLAN_VID] = vlan_id
+ downlink_action = dict()
+ downlink_action[PUSH_VLAN] = True
+ downlink_action[VLAN_VID] = vlan_id
- flow_store_cookie = self._get_flow_store_cookie(
- downlink_classifier, gemport_id)
- if self.resource_mgr.is_flow_cookie_on_kv_store(
- intf_id, onu_id, uni_id, flow_store_cookie):
- self.log.debug('flow-exists--not-re-adding')
- else:
+ flow_store_cookie = self._get_flow_store_cookie(
+ downlink_classifier, gemport_id)
+ if self.resource_mgr.is_flow_cookie_on_kv_store(
+ intf_id, onu_id, uni_id, flow_store_cookie):
+ self.log.debug('flow-exists--not-re-adding')
+ else:
+ downlink_flow_id = self.resource_mgr.get_flow_id(
+ intf_id, onu_id, uni_id,
+ flow_store_cookie=flow_store_cookie
+ )
- downlink_flow_id = self.resource_mgr.get_flow_id(
- intf_id, onu_id, uni_id, flow_store_cookie
- )
+ downstream_flow = openolt_pb2.Flow(
+ access_intf_id=intf_id, onu_id=onu_id, uni_id=uni_id,
+ flow_id=downlink_flow_id, flow_type=DOWNSTREAM,
+ alloc_id=alloc_id,
+ network_intf_id=self.data_model.olt_nni_intf_id(),
+ gemport_id=gemport_id,
+ classifier=self.mk_classifier(downlink_classifier),
+ action=self.mk_action(downlink_action),
+ priority=logical_flow.priority,
+ port_no=port_no,
+ cookie=logical_flow.cookie)
- downstream_flow = openolt_pb2.Flow(
- access_intf_id=intf_id, onu_id=onu_id, uni_id=uni_id,
- flow_id=downlink_flow_id, flow_type=DOWNSTREAM,
- alloc_id=alloc_id,
- network_intf_id=self.data_model.olt_nni_intf_id(),
- gemport_id=gemport_id,
- classifier=self.mk_classifier(downlink_classifier),
- action=self.mk_action(downlink_action),
- priority=logical_flow.priority,
- port_no=port_no,
- cookie=logical_flow.cookie)
+ downstream_logical_flow = ofp_flow_stats(
+ id=logical_flow.id, cookie=logical_flow.cookie,
+ table_id=logical_flow.table_id,
+ priority=logical_flow.priority, flags=logical_flow.flags)
- downstream_logical_flow = ofp_flow_stats(
- id=logical_flow.id, cookie=logical_flow.cookie,
- table_id=logical_flow.table_id,
- priority=logical_flow.priority, flags=logical_flow.flags)
+ downstream_logical_flow.match.oxm_fields.extend(
+ fd.mk_oxm_fields(
+ [fd.in_port(fd.get_out_port(logical_flow)),
+ fd.vlan_vid(special_vlan_downstream_flow | 0x1000)]))
+ downstream_logical_flow.match.type = OFPMT_OXM
- downstream_logical_flow.match.oxm_fields.extend(
- fd.mk_oxm_fields(
- [fd.in_port(fd.get_out_port(logical_flow)),
- fd.vlan_vid(special_vlan_downstream_flow | 0x1000)]))
- downstream_logical_flow.match.type = OFPMT_OXM
+ downstream_logical_flow.instructions.extend(
+ fd.mk_instructions_from_actions([fd.output(
+ self.platform.mk_uni_port_num(intf_id, onu_id,
+ uni_id))]))
- downstream_logical_flow.instructions.extend(
- fd.mk_instructions_from_actions([fd.output(
- self.platform.mk_uni_port_num(intf_id, onu_id,
- uni_id))]))
-
- if self.add_flow_to_device(downstream_flow,
- downstream_logical_flow):
- flow_info = self._get_flow_info_as_json_blob(
- downstream_flow, flow_store_cookie)
- self.update_flow_info_to_kv_store(
- downstream_flow.access_intf_id, downstream_flow.onu_id,
- downstream_flow.uni_id, downstream_flow.flow_id,
- flow_info)
+ if self.add_flow_to_device(downstream_flow,
+ downstream_logical_flow):
+ flow_info = self._get_flow_info_as_json_blob(
+ downstream_flow, flow_store_cookie)
+ self.update_flow_info_to_kv_store(
+ downstream_flow.access_intf_id, downstream_flow.onu_id,
+ downstream_flow.uni_id, downstream_flow.flow_id,
+ flow_info)
def repush_all_different_flows(self):
# Check if the device is supposed to have flows, if so add them
@@ -736,8 +1103,59 @@
def reset_flows(self):
self.flows_proxy.update('/', Flows())
- """ Add a downstream LLDP trap flow on the NNI interface
+ """ Add a downstream DHCP trap flow on the NNI interface
"""
+ def add_dhcp_trap_nni(self, logical_flow, classifier,
+ port_no, network_intf_id=0):
+ self.log.info("trap-dhcp-of-nni-flow")
+ classifier[PACKET_TAG_TYPE] = DOUBLE_TAG
+ action = dict()
+ action[TRAP_TO_HOST] = True
+
+ # We manage flow_id resource pool on per PON port basis.
+ # Since this situation is tricky, as a hack, we pass the NNI port
+ # index (network_intf_id) as PON port Index for the flow_id resource
+ # pool. Also, there is no ONU Id available for trapping LLDP packets
+ # on NNI port, use onu_id as -1 (invalid)
+ # ****************** CAVEAT *******************
+ # This logic works if the NNI Port Id falls within the same valid
+ # range of PON Port Ids. If this doesn't work for some OLT Vendor
+ # we need to have a re-look at this.
+ # *********************************************
+ onu_id = -1
+ uni_id = -1
+ flow_store_cookie = self._get_flow_store_cookie(classifier)
+
+ if self.resource_mgr.is_flow_cookie_on_kv_store(
+ network_intf_id, onu_id, uni_id, flow_store_cookie):
+ self.log.debug('flow-exists--not-re-adding')
+ else:
+ flow_id = self.resource_mgr.get_flow_id(
+ network_intf_id, onu_id, uni_id,
+ flow_store_cookie=flow_store_cookie)
+
+ downstream_flow = openolt_pb2.Flow(
+ access_intf_id=-1, # access_intf_id not required
+ onu_id=onu_id, # onu_id not required
+ uni_id=uni_id, # uni_id not used
+ flow_id=flow_id,
+ flow_type=DOWNSTREAM,
+ network_intf_id=network_intf_id,
+ gemport_id=-1, # gemport_id not required
+ classifier=self.mk_classifier(classifier),
+ action=self.mk_action(action),
+ priority=logical_flow.priority,
+ port_no=port_no,
+ cookie=logical_flow.cookie)
+
+ self.log.debug('add dhcp downstream trap', classifier=classifier,
+ action=action, flow=downstream_flow,
+ port_no=port_no)
+ if self.add_flow_to_device(downstream_flow, logical_flow):
+ flow_info = self._get_flow_info_as_json_blob(downstream_flow,
+ flow_store_cookie)
+ self.update_flow_info_to_kv_store(
+ network_intf_id, onu_id, uni_id, flow_id, flow_info)
def add_lldp_flow(self, logical_flow, port_no, network_intf_id=0):
@@ -767,7 +1185,7 @@
self.log.debug('flow-exists--not-re-adding')
else:
flow_id = self.resource_mgr.get_flow_id(
- network_intf_id, onu_id, uni_id, flow_store_cookie)
+ network_intf_id, onu_id, uni_id, flow_store_cookie=flow_store_cookie)
downstream_flow = openolt_pb2.Flow(
access_intf_id=-1, # access_intf_id not required
@@ -792,7 +1210,8 @@
self.update_flow_info_to_kv_store(
network_intf_id, onu_id, uni_id, flow_id, flow_info)
- def mk_classifier(self, classifier_info):
+ @staticmethod
+ def mk_classifier(classifier_info):
classifier = openolt_pb2.Classifier()
@@ -837,6 +1256,8 @@
elif PUSH_VLAN in action_info:
action.o_vid = action_info[VLAN_VID]
action.cmd.add_outer_tag = True
+ if VLAN_PCP in action_info:
+ action.o_pbits = action_info[VLAN_PCP]
elif TRAP_TO_HOST in action_info:
action.cmd.trap_to_host = True
else:
@@ -928,18 +1349,15 @@
flows.items.extend([stored_flow])
self.flows_proxy.update('/', flows)
- def find_next_flow(self, flow):
+ def find_next_flow(self, flow, metadata):
table_id = fd.get_goto_table_id(flow)
- metadata = 0
# Prior to ONOS 1.13.5, Metadata contained the UNI output port number.
# In 1.13.5 and later, the lower 32-bits is the output port number and
# the # upper 32-bits is the inner-vid we are looking for. Use just the
# lower 32 # bits. Allows this code to work with pre- and post-1.13.5
# ONOS OltPipeline
- for field in fd.get_ofb_fields(flow):
- if field.type == fd.METADATA:
- metadata = field.table_metadata & 0xFFFFFFFF
+ port = metadata & 0xFFFFFFFF
if table_id is None:
return None
flows = self.logical_flows_proxy.get('/').items
@@ -948,7 +1366,7 @@
if f.table_id == table_id:
# FIXME
if fd.get_in_port(f) == fd.get_in_port(flow) and \
- fd.get_out_port(f) == metadata:
+ fd.get_out_port(f) == port:
next_flows.append(f)
if len(next_flows) == 0:
@@ -983,11 +1401,6 @@
onu_id = child_device.proxy_address.onu_id
uni_id = self.platform.uni_id_from_port_num(port_no)
- # TODO: The DEFAULT_TECH_PROFILE_ID is assumed. Right way to do,
- # is probably to maintain a list of Tech-profile table IDs associated
- # with the UNI logical_port. This way, when the logical port is
- # deleted, all the associated tech-profile configuration with the UNI
- # logical_port can be cleared.
tp_id = self.resource_mgr.get_tech_profile_id_for_onu(pon_port, onu_id,
uni_id)
tech_profile_instance = self.tech_profile[pon_port]. \
@@ -997,37 +1410,26 @@
flow_ids = self.resource_mgr.get_current_flow_ids(pon_port, onu_id,
uni_id)
self.log.debug("outstanding-flows-to-be-cleared", flow_ids=flow_ids)
- for flow_id in flow_ids:
- flow_infos = self.resource_mgr.get_flow_id_info(pon_port, onu_id,
- uni_id, flow_id)
- for flow_info in flow_infos:
- direction = flow_info['flow_type']
- flow_to_remove = openolt_pb2.Flow(flow_id=flow_id,
- flow_type=direction)
- try:
- self.stub.FlowRemove(flow_to_remove)
- except grpc.RpcError as grpc_e:
- if grpc_e.code() == grpc.StatusCode.NOT_FOUND:
- self.log.debug('This flow does not exist on switch, '
- 'normal after an OLT reboot',
- flow=flow_to_remove)
- else:
- raise grpc_e
+ if flow_ids:
+ for flow_id in flow_ids:
+ flow_infos = self.resource_mgr.get_flow_id_info(pon_port, onu_id,
+ uni_id, flow_id)
+ for flow_info in flow_infos:
+ direction = flow_info['flow_type']
+ flow_to_remove = openolt_pb2.Flow(flow_id=flow_id,
+ flow_type=direction)
+ try:
+ self.stub.FlowRemove(flow_to_remove)
+ except grpc.RpcError as grpc_e:
+ if grpc_e.code() == grpc.StatusCode.NOT_FOUND:
+ self.log.debug('This flow does not exist on switch, '
+ 'normal after an OLT reboot',
+ flow=flow_to_remove)
+ else:
+ raise grpc_e
- self.resource_mgr.free_flow_id(pon_port, onu_id, uni_id,
- flow_id)
-
- try:
- tconts = self.tech_profile[pon_port].get_tconts(
- tech_profile_instance)
- self.stub.RemoveTconts(openolt_pb2.Tconts(intf_id=pon_port,
- onu_id=onu_id,
- uni_id=uni_id,
- port_no=port_no,
- tconts=tconts))
- except grpc.RpcError as grpc_e:
- self.log.error('error-removing-tcont-scheduler-queues',
- err=grpc_e)
+ self.remove_us_scheduler_queues(pon_port, onu_id, uni_id, tech_profile_instance)
+ self.remove_ds_scheduler_queues(pon_port, onu_id, uni_id, tech_profile_instance)
def generate_stored_id(self, flow_id, direction):
if direction == UPSTREAM:
@@ -1055,7 +1457,7 @@
# Make sure we have as many tech_profiles as there are pon ports on
# the device
assert len(self.tech_profile) \
- == self.resource_mgr.device_info.pon_ports
+ == self.resource_mgr.device_info.pon_ports
def _get_flow_info_as_json_blob(self, flow, flow_store_cookie,
flow_category=None):
@@ -1096,3 +1498,35 @@
else:
to_hash = dumps(classifier, sort_keys=True)
return hashlib.md5(to_hash).hexdigest()[:12]
+
+ @staticmethod
+ def _get_gem_port_for_pcp(pcp, get_gem_port_for_pcp):
+ """
+ Return gem_port id corresponding to a given pcp bit
+
+ :param pcp: Represents the p_bit
+ :param get_gem_port_for_pcp: Represents a list of gemport_attributes (DS or US)
+ :return: Gemport ID servicing the given pcp if found, else None
+ """
+ for gem_port_attr in get_gem_port_for_pcp:
+ # The pbit_map appears as "0b00011010" in the Tech-Profile instance.
+ # The initial '0b' has to be stripped.
+ # The remaining string is reversed, then enumerated and matched against pcp index.
+ for i, p in enumerate(reversed(gem_port_attr.pbit_map[2:])):
+ if i == pcp and p == '1':
+ return gem_port_attr.gemport_id
+ return None
+
+ @staticmethod
+ def _install_flow_on_all_gemports(func, kwargs, gem_attr_list):
+ for gem_attr in gem_attr_list:
+ # The pbit_map appears as "0b00011010" in the Tech-Profile instance.
+ # The initial '0b' has to be stripped.
+ # The remaining string is reversed, then enumerated and matched against pbit 1.
+ for i, p in enumerate(reversed(gem_attr.pbit_map[2:])):
+ if p == '1':
+ kwargs['classifier'][VLAN_PCP] = i
+ # Add the gemport corresponding to this PCP
+ kwargs['gemport_id'] = gem_attr.gemport_id
+ func(**kwargs)
+
diff --git a/voltha/adapters/openolt/openolt_platform.py b/voltha/adapters/openolt/openolt_platform.py
index 1291ce9..dd26b39 100644
--- a/voltha/adapters/openolt/openolt_platform.py
+++ b/voltha/adapters/openolt/openolt_platform.py
@@ -171,6 +171,9 @@
uni_port_no = action.output.port
break
+ if uni_port_no is None:
+ uni_port_no = fd.get_metadata_from_write_metadata(flow) & 0xFFFFFFFF
+
if uni_port_no is None:
raise ValueError
diff --git a/voltha/adapters/openolt/openolt_resource_manager.py b/voltha/adapters/openolt/openolt_resource_manager.py
index bda822c..8fd4eb9 100644
--- a/voltha/adapters/openolt/openolt_resource_manager.py
+++ b/voltha/adapters/openolt/openolt_resource_manager.py
@@ -28,7 +28,8 @@
class OpenOltResourceMgr(object):
BASE_PATH_KV_STORE = "service/voltha/openolt/{}" # service/voltha/openolt/<device_id>
- TP_ID_PATH_SUFFIX = 'tp_id/{}'
+ TP_ID_PATH_SUFFIX = 'tp_id/{}' # tp_id/<(pon_id, onu_id, uni_id)>
+ METER_ID_PATH_SUFFIX = 'meter_id/{}/{}' # meter_id/<(pon_id, onu_id, uni_id)>/<direction>
def __init__(self, device_id, host_and_port, extra_args, device_info):
self.log = structlog.get_logger(id=device_id,
@@ -124,7 +125,7 @@
@property
def max_uni_id_per_onu(self):
- return 0 #OpenOltPlatform.MAX_UNIS_PER_ONU-1, zero-based indexing Uncomment or override to make default multi-uni
+ return 0 # OpenOltPlatform.MAX_UNIS_PER_ONU-1, zero-based indexing Uncomment or override to make default multi-uni
def assert_uni_id_limit(self, pon_intf_id, onu_id, uni_id):
self.assert_onu_id_limit(pon_intf_id, onu_id)
@@ -136,34 +137,57 @@
return onu_id
- def get_flow_id(self, intf_id, onu_id, uni_id, flow_store_cookie,
- flow_category=None):
- intf_onu_id = (intf_id, onu_id, uni_id)
+ def get_flow_id(self, pon_intf_id, onu_id, uni_id, **kwargs):
+ pon_intf_onu_id = (pon_intf_id, onu_id, uni_id)
+ flow_store_cookie = kwargs.pop('flow_cookie', None)
+ flow_category = kwargs.pop('flow_category', None)
+ flow_pcp = kwargs.pop('flow_pcp', None)
try:
- flow_ids = self.resource_mgrs[intf_id]. \
- get_current_flow_ids_for_onu(intf_onu_id)
+ flow_ids = self.resource_mgrs[pon_intf_id]. \
+ get_current_flow_ids_for_onu(pon_intf_onu_id)
if flow_ids is not None:
for flow_id in flow_ids:
- flows = self.get_flow_id_info(intf_id, onu_id, uni_id, flow_id)
- assert (isinstance(flows, list))
- for flow in flows:
-
- if flow_category is not None and \
- 'flow_category' in flow and \
- flow['flow_category'] == flow_category:
- return flow_id
- if flow['flow_store_cookie'] == flow_store_cookie:
- return flow_id
+ try:
+ flows = self.get_flow_id_info(
+ pon_intf_id, onu_id, uni_id, flow_id
+ )
+ assert (isinstance(flows, list))
+ for flow in flows:
+ # If a flow_cookie is provided, we need no other match
+ # criteria to find the relevant flow_id.
+ # Return the first matched flow for the given flow_store_cookie
+ if flow_store_cookie is not None and \
+ flow_store_cookie == flow['flow_store_cookie']:
+ return flow_id
+ # If flow_category is specified as match criteria, we need the
+ # the vlan pcp for the flow as well. This is because the given
+ # flow_category (for ex: HSIA) could cater to more than one vlan pcp.
+ # Each, flow matches uniquely matches one vlan pcp.
+ # So, to find the exact flow_id we need the vlan pcp too.
+ if flow_category is not None:
+ assert flow_pcp is not None
+ if 'flow_category' in flow and \
+ flow['flow_category'] == flow_category:
+ if 'o_pbits' in flow['classifier'] and \
+ flow['classifier']['o_pbits'] == flow_pcp:
+ return flow_id
+ elif flow_pcp == 0 and \
+ 'o_pbits' not in flow['classifier']:
+ return flow_id
+ except KeyError as e:
+ self.log.error("key-error-retrieving-flow-info",
+ e=e, flow_id=flow_id)
except Exception as e:
self.log.error("error-retrieving-flow-info", e=e)
- flow_id = self.resource_mgrs[intf_id].get_resource_id(
- intf_onu_id[0], PONResourceManager.FLOW_ID)
+ # We could not find any existing flow_id for the given match criteria.
+ # Generate a new flow id.
+ flow_id = self.resource_mgrs[pon_intf_id].get_resource_id(
+ pon_intf_onu_id[0], PONResourceManager.FLOW_ID)
if flow_id is not None:
- self.resource_mgrs[intf_id].update_flow_id_for_onu(
- intf_onu_id, flow_id
+ self.resource_mgrs[pon_intf_id].update_flow_id_for_onu(
+ pon_intf_onu_id, flow_id
)
-
return flow_id
def get_flow_id_info(self, intf_id, onu_id, uni_id, flow_id):
@@ -295,6 +319,7 @@
pon_intf_id = pon_intf_id_onu_id[0]
onu_id = pon_intf_id_onu_id[1]
+ uni_id = pon_intf_id_onu_id[2]
alloc_ids = \
self.resource_mgrs[pon_intf_id].get_current_alloc_ids_for_onu(pon_intf_id_onu_id)
self.resource_mgrs[pon_intf_id].free_resource_id(pon_intf_id,
@@ -312,6 +337,9 @@
self.resource_mgrs[pon_intf_id].free_resource_id(pon_intf_id,
PONResourceManager.FLOW_ID,
flow_ids)
+ if flow_ids:
+ for flow_id in flow_ids:
+ self.free_flow_id(pon_intf_id, onu_id, uni_id, flow_id)
self.resource_mgrs[pon_intf_id].free_resource_id(pon_intf_id,
PONResourceManager.ONU_ID,
@@ -440,7 +468,7 @@
flow_id_shared_resource_mgr=global_resource_mgr)
# Make sure loaded range fits the platform bit encoding ranges
- resource_mgr.update_ranges(uni_id_start_idx=0, uni_id_end_idx=OpenOltPlatform.MAX_UNIS_PER_ONU-1)
+ resource_mgr.update_ranges(uni_id_start_idx=0, uni_id_end_idx=OpenOltPlatform.MAX_UNIS_PER_ONU - 1)
def is_flow_cookie_on_kv_store(self, intf_id, onu_id, uni_id, flow_store_cookie):
'''
@@ -463,6 +491,11 @@
return False
+ def update_tech_profile_id_for_onu(self, intf_id, onu_id, uni_id, tp_id):
+ intf_id_onu_id_uni_id = (intf_id, onu_id, uni_id)
+ kv_path = OpenOltResourceMgr.TP_ID_PATH_SUFFIX.format(str(intf_id_onu_id_uni_id))
+ self.kv_store[kv_path] = str(tp_id)
+
def get_tech_profile_id_for_onu(self, intf_id, onu_id, uni_id):
intf_id_onu_id_uni_id = (intf_id, onu_id, uni_id)
try:
@@ -470,4 +503,39 @@
return int(self.kv_store[kv_path])
except Exception as e:
self.log.warn("tp-id-not-found-on-kv-store", e=e)
- return DEFAULT_TECH_PROFILE_TABLE_ID
+ return None
+
+ def remove_tech_profile_id_for_onu(self, intf_id, onu_id, uni_id):
+ intf_id_onu_id_uni_id = (intf_id, onu_id, uni_id)
+ kv_path = OpenOltResourceMgr.TP_ID_PATH_SUFFIX.format(str(intf_id_onu_id_uni_id))
+ try:
+ del self.kv_store[kv_path]
+ except Exception as e:
+ self.log.error("error-deleting-tech-profile-id", e=e)
+
+ def update_meter_id_for_onu(self, direction, intf_id, onu_id, uni_id, meter_id):
+ intf_id_onu_id_uni_id = (intf_id, onu_id, uni_id)
+ kv_path = OpenOltResourceMgr.METER_ID_PATH_SUFFIX.format(str(intf_id_onu_id_uni_id),
+ direction)
+ self.kv_store[kv_path] = str(meter_id)
+ self.log.debug("updated-meter-id-for-onu", path=kv_path, meter_id=meter_id)
+
+ def get_meter_id_for_onu(self, direction, intf_id, onu_id, uni_id):
+ intf_id_onu_id_uni_id = (intf_id, onu_id, uni_id)
+ try:
+ kv_path = OpenOltResourceMgr.METER_ID_PATH_SUFFIX.format(str(intf_id_onu_id_uni_id),
+ direction)
+ return int(self.kv_store[kv_path])
+ except Exception as e:
+ self.log.debug("meter-id-not-found-on-kv-store", e=e)
+ return None
+
+ def remove_meter_id_for_onu(self, direction, intf_id, onu_id, uni_id):
+ intf_id_onu_id_uni_id = (intf_id, onu_id, uni_id)
+ try:
+ kv_path = OpenOltResourceMgr.METER_ID_PATH_SUFFIX.format(str(intf_id_onu_id_uni_id),
+ direction)
+ del self.kv_store[kv_path]
+ self.log.debug("removed-meter-id-for-onu", path=kv_path)
+ except Exception as e:
+ self.log.debug("error-removing-meter", e=e)
diff --git a/voltha/adapters/openolt/protos/Makefile b/voltha/adapters/openolt/protos/Makefile
index 62eacc8..49834a5 100644
--- a/voltha/adapters/openolt/protos/Makefile
+++ b/voltha/adapters/openolt/protos/Makefile
@@ -23,6 +23,7 @@
default: build
PROTO_FILES := $(wildcard *.proto) $(wildcard $(VOLTHA_BASE)/voltha/protos/third_party/google/api/*proto)
+PROTO_FILES := $(PROTO_FILES) $(wildcard *.proto) $(wildcard $(VOLTHA_BASE)/voltha/protos/tech_profile.proto)
PROTO_PB2_FILES := $(foreach f,$(PROTO_FILES),$(subst .proto,_pb2.py,$(f)))
PROTO_DESC_FILES := $(foreach f,$(PROTO_FILES),$(subst .proto,.desc,$(f)))
@@ -47,6 +48,7 @@
env LD_LIBRARY_PATH=$(PROTOC_LIBDIR) python -m grpc.tools.protoc \
-I. \
-I$(VOLTHA_BASE)/voltha/protos/third_party \
+ -I$(VOLTHA_BASE)/voltha/protos \
--python_out=. \
--grpc_python_out=. \
--descriptor_set_out=$(basename $<).desc \
diff --git a/voltha/adapters/openolt/protos/openolt.proto b/voltha/adapters/openolt/protos/openolt.proto
index ddc8f74..da54a80 100644
--- a/voltha/adapters/openolt/protos/openolt.proto
+++ b/voltha/adapters/openolt/protos/openolt.proto
@@ -15,6 +15,7 @@
syntax = "proto3";
package openolt;
import "google/api/annotations.proto";
+import "tech_profile.proto";
service Openolt {
@@ -130,16 +131,30 @@
};
}
- rpc CreateTconts(Tconts) returns (Empty) {
+ rpc CreateTrafficSchedulers(tech_profile.TrafficSchedulers) returns (Empty) {
option (google.api.http) = {
- post: "/v1/CreateTconts"
+ post: "/v1/CreateTrafficSchedulers"
body: "*"
};
}
- rpc RemoveTconts(Tconts) returns (Empty) {
+ rpc RemoveTrafficSchedulers(tech_profile.TrafficSchedulers) returns (Empty) {
option (google.api.http) = {
- post: "/v1/RemoveTconts"
+ post: "/v1/RemoveTrafficSchedulers"
+ body: "*"
+ };
+ }
+
+ rpc CreateTrafficQueues(tech_profile.TrafficQueues) returns (Empty) {
+ option (google.api.http) = {
+ post: "/v1/CreateTrafficQueues"
+ body: "*"
+ };
+ }
+
+ rpc RemoveTrafficQueues(tech_profile.TrafficQueues) returns (Empty) {
+ option (google.api.http) = {
+ post: "/v1/RemoveTrafficQueues"
body: "*"
};
}
@@ -247,6 +262,7 @@
fixed32 intf_id = 1;
fixed32 onu_id = 2;
fixed32 port_no = 4;
+ fixed32 gemport_id = 5;
bytes pkt = 3;
}
@@ -464,93 +480,4 @@
fixed32 onu_id = 2;
}
-enum Direction {
- UPSTREAM = 0;
- DOWNSTREAM = 1;
- BIDIRECTIONAL = 2;
-}
-
-enum SchedulingPolicy {
- WRR = 0;
- StrictPriority = 1;
- Hybrid = 2;
-}
-
-enum AdditionalBW {
- AdditionalBW_None = 0;
- AdditionalBW_NA = 1;
- AdditionalBW_BestEffort = 2;
- AdditionalBW_Auto = 3;
-}
-
-enum DiscardPolicy {
- TailDrop = 0;
- WTailDrop = 1;
- Red = 2;
- WRed = 3;
-}
-
-enum InferredAdditionBWIndication {
- InferredAdditionBWIndication_None = 0;
- InferredAdditionBWIndication_Assured = 1;
- InferredAdditionBWIndication_BestEffort = 2;
-}
-
-message Scheduler {
- Direction direction = 1;
- AdditionalBW additional_bw = 2; // Valid on for “direction == Upstream”.
- fixed32 priority = 3;
- fixed32 weight = 4;
- SchedulingPolicy sched_policy = 5;
-}
-
-message TrafficShapingInfo {
- fixed32 cir = 1;
- fixed32 cbs = 2;
- fixed32 pir = 3;
- fixed32 pbs = 4;
- fixed32 gir = 5; // only if “direction == Upstream ”
- InferredAdditionBWIndication add_bw_ind = 6; // only if “direction == Upstream”
-}
-
-message Tcont {
- Direction direction = 1;
- fixed32 alloc_id = 2; // valid only if “direction == Upstream ”
- Scheduler scheduler = 3;
- TrafficShapingInfo traffic_shaping_info = 4;
-}
-
-message Tconts {
- fixed32 intf_id = 1;
- fixed32 onu_id = 2;
- fixed32 uni_id = 4;
- fixed32 port_no = 5;
- repeated Tcont tconts = 3;
-}
-
-message TailDropDiscardConfig {
- fixed32 queue_size = 1;
-}
-
-message RedDiscardConfig {
- fixed32 min_threshold = 1;
- fixed32 max_threshold = 2;
- fixed32 max_probability = 3;
-}
-
-message WRedDiscardConfig {
- RedDiscardConfig green = 1;
- RedDiscardConfig yellow = 2;
- RedDiscardConfig red = 3;
-}
-
-message DiscardConfig {
- DiscardPolicy discard_policy = 1;
- oneof discard_config {
- TailDropDiscardConfig tail_drop_discard_config = 2;
- RedDiscardConfig red_discard_config = 3;
- WRedDiscardConfig wred_discard_config = 4;
- }
-}
-
message Empty {}
diff --git a/voltha/core/adapter_agent.py b/voltha/core/adapter_agent.py
index 43e0000..d028c05 100644
--- a/voltha/core/adapter_agent.py
+++ b/voltha/core/adapter_agent.py
@@ -549,6 +549,22 @@
return self.root_proxy.get('/logical_devices/{}/ports/{}'.format(
logical_device_id, port_id))
+ def get_meter_band(self, logical_device_id, meter_id):
+ meters = list(self.root_proxy.get('/logical_devices/{}/meters'.format(
+ logical_device_id)).items)
+ for meter in meters:
+ if meter.config.meter_id == meter_id:
+ '''
+ # Returns
+ message ofp_meter_config {
+ uint32 flags = 1;
+ uint32 meter_id = 2;
+ repeated ofp_meter_band_header bands = 3;
+ };
+ '''
+ return meter.config
+ return None
+
def _create_cluster_ids_from_dpid(self, dpid):
"""
Create a logical device id using a datapath id.
diff --git a/voltha/protos/tech_profile.proto b/voltha/protos/tech_profile.proto
new file mode 100644
index 0000000..1b98e2d
--- /dev/null
+++ b/voltha/protos/tech_profile.proto
@@ -0,0 +1,126 @@
+// Copyright (c) 2018 Open Networking Foundation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+package tech_profile;
+import "google/api/annotations.proto";
+
+enum Direction {
+ UPSTREAM = 0;
+ DOWNSTREAM = 1;
+ BIDIRECTIONAL = 2;
+}
+
+enum SchedulingPolicy {
+ WRR = 0;
+ StrictPriority = 1;
+ Hybrid = 2;
+}
+
+enum AdditionalBW {
+ AdditionalBW_None = 0;
+ AdditionalBW_NA = 1;
+ AdditionalBW_BestEffort = 2;
+ AdditionalBW_Auto = 3;
+}
+
+enum DiscardPolicy {
+ TailDrop = 0;
+ WTailDrop = 1;
+ Red = 2;
+ WRed = 3;
+}
+
+enum InferredAdditionBWIndication {
+ InferredAdditionBWIndication_None = 0;
+ InferredAdditionBWIndication_Assured = 1;
+ InferredAdditionBWIndication_BestEffort = 2;
+}
+
+message SchedulerConfig {
+ Direction direction = 1;
+ AdditionalBW additional_bw = 2; // Valid on for “direction == Upstream”.
+ fixed32 priority = 3;
+ fixed32 weight = 4;
+ SchedulingPolicy sched_policy = 5;
+}
+
+message TrafficShapingInfo {
+ fixed32 cir = 1;
+ fixed32 cbs = 2;
+ fixed32 pir = 3;
+ fixed32 pbs = 4;
+ fixed32 gir = 5; // only if “direction == Upstream ”
+ InferredAdditionBWIndication add_bw_ind = 6; // only if “direction == Upstream”
+}
+
+message TrafficScheduler {
+ Direction direction = 1;
+ fixed32 alloc_id = 2; // valid only if “direction == Upstream ”
+ SchedulerConfig scheduler = 3;
+ TrafficShapingInfo traffic_shaping_info = 4;
+}
+
+message TrafficSchedulers {
+ fixed32 intf_id = 1;
+ fixed32 onu_id = 2;
+ fixed32 uni_id = 4;
+ fixed32 port_no = 5;
+ repeated TrafficScheduler traffic_scheds = 3;
+}
+
+message TailDropDiscardConfig {
+ fixed32 queue_size = 1;
+}
+
+message RedDiscardConfig {
+ fixed32 min_threshold = 1;
+ fixed32 max_threshold = 2;
+ fixed32 max_probability = 3;
+}
+
+message WRedDiscardConfig {
+ RedDiscardConfig green = 1;
+ RedDiscardConfig yellow = 2;
+ RedDiscardConfig red = 3;
+}
+
+message DiscardConfig {
+ DiscardPolicy discard_policy = 1;
+ oneof discard_config {
+ TailDropDiscardConfig tail_drop_discard_config = 2;
+ RedDiscardConfig red_discard_config = 3;
+ WRedDiscardConfig wred_discard_config = 4;
+ }
+}
+
+message TrafficQueue {
+ Direction direction = 1;
+ fixed32 gemport_id = 2;
+ string pbit_map = 3;
+ bool aes_encryption = 4;
+ SchedulingPolicy sched_policy = 5;
+ fixed32 priority = 6;
+ fixed32 weight = 7;
+ DiscardPolicy discard_policy = 8;
+ DiscardConfig discard_config = 9;
+}
+
+message TrafficQueues {
+ fixed32 intf_id = 1;
+ fixed32 onu_id = 2;
+ fixed32 uni_id = 4;
+ fixed32 port_no = 5;
+ repeated TrafficQueue traffic_queues = 6;
+}