VOL-736: Initial Commit. Not Completed yet.
We will use Existing architecture of Flow Decomposer passing down to both OLT Adapter and ONU Adapter. The Technology Profile Flow Table ID (range 64-255) and Meter Programming OpenFlow Messages MUST be passed unmodified down to the OLT and ONU Adapters.
Change-Id: Ib4d21d1cc011ac4ceab536946cb727859cb2b525
diff --git a/cli/utils.py b/cli/utils.py
index 7426f99..544c764 100644
--- a/cli/utils.py
+++ b/cli/utils.py
@@ -139,7 +139,7 @@
for instruction in flow['instructions']:
itype = instruction['type']
- if itype == 4:
+ if itype == 4 or itype == 3:
for action in instruction['actions']['actions']:
atype = action['type'][len('OFPAT_'):]
table.add_cell(i, *action_printers[atype](action))
diff --git a/ofagent/converter.py b/ofagent/converter.py
index c6386b3..fb7aae5 100644
--- a/ofagent/converter.py
+++ b/ofagent/converter.py
@@ -170,6 +170,10 @@
elif type == pb2.OFPIT_GOTO_TABLE:
return of13.instruction.goto_table(
table_id=inst['goto_table']['table_id'])
+ elif type == pb2.OFPIT_WRITE_ACTIONS:
+ return of13.instruction.write_actions(
+ actions=[make_loxi_action(a)
+ for a in inst['actions']['actions']])
else:
raise NotImplementedError('Instruction type %d' % type)
@@ -427,6 +431,13 @@
output=pb2.ofp_action_output(port=lo.port, max_len=lo.max_len))
+def loxi_write_actions_to_ofp_instruction(lo):
+ return pb2.ofp_instruction(
+ type=pb2.OFPIT_WRITE_ACTIONS,
+ actions=pb2.ofp_instruction_actions(
+ actions=[to_grpc(a) for a in lo.actions]))
+
+
def loxi_group_action_to_ofp_action(lo):
return pb2.ofp_action(
type=pb2.OFPAT_GROUP,
@@ -483,6 +494,7 @@
of13.instruction.apply_actions: loxi_apply_actions_to_ofp_instruction,
of13.instruction.clear_actions: loxi_clear_actions_to_ofp_instruction,
+ of13.instruction.write_actions: loxi_write_actions_to_ofp_instruction,
of13.instruction.goto_table: loxi_goto_table_to_ofp_instruction,
of13.action.output: loxi_output_action_to_ofp_action,
diff --git a/tests/utests/voltha/core/config/test_persistence.py b/tests/utests/voltha/core/config/test_persistence.py
index a9a871c..bbb634f 100644
--- a/tests/utests/voltha/core/config/test_persistence.py
+++ b/tests/utests/voltha/core/config/test_persistence.py
@@ -76,14 +76,14 @@
# check that content of kv_store looks ok
size1 = len(kv_store)
- self.assertEqual(size1, 14 + 3 * (n_adapters + n_logical_nodes))
+ self.assertEqual(size1, 16 + 3 * (n_adapters + n_logical_nodes))
# this should actually drop if we pune
node.prune_untagged()
pt('prunning')
size2 = len(kv_store)
- self.assertEqual(size2, 7 + 2 * (1 + 1 + n_adapters + n_logical_nodes) + 2)
+ self.assertEqual(size2, 9 + 2 * (1 + 1 + n_adapters + n_logical_nodes) + 2)
all_latest_data = node.get('/', deep=1)
pt('deep get')
diff --git a/tests/utests/voltha/core/test_flow_decomposer.py b/tests/utests/voltha/core/test_flow_decomposer.py
index 5e77621..43db993 100644
--- a/tests/utests/voltha/core/test_flow_decomposer.py
+++ b/tests/utests/voltha/core/test_flow_decomposer.py
@@ -646,6 +646,75 @@
]
))
+ def test_unicast_upstream_rule_including_meter_band_decomposition(self):
+ flow1 = mk_flow_stat(
+ priority=500,
+ match_fields=[
+ in_port(1),
+ vlan_vid(ofp.OFPVID_PRESENT | 0),
+ vlan_pcp(0)
+ ],
+ actions=[
+ set_field(vlan_vid(ofp.OFPVID_PRESENT | 101)),
+ ],
+ next_table_id=1,
+ )
+ flow2 = mk_flow_stat(
+ priority=500,
+ match_fields=[
+ in_port(1),
+ vlan_vid(ofp.OFPVID_PRESENT | 101),
+ vlan_pcp(0)
+ ],
+ actions=[
+ push_vlan(0x8100),
+ set_field(vlan_vid(ofp.OFPVID_PRESENT | 1000)),
+ set_field(vlan_pcp(0)),
+ output(0)
+ ],
+ next_table_id=64,
+ meters=[1, 2]
+ )
+ device_rules = self.decompose_rules([flow1, flow2], [])
+ onu1_flows, onu1_groups = device_rules['onu1']
+ olt_flows, olt_groups = device_rules['olt']
+ self.assertEqual(len(onu1_flows), 2)
+ self.assertEqual(len(onu1_groups), 0)
+ self.assertEqual(len(olt_flows), 1)
+ self.assertEqual(len(olt_groups), 0)
+ self.assertFlowsEqual(onu1_flows.values()[1], mk_flow_stat(
+ priority=500,
+ match_fields=[
+ in_port(2),
+ vlan_vid(ofp.OFPVID_PRESENT | 0),
+ vlan_pcp(0)
+ ],
+ actions=[
+ set_field(vlan_vid(ofp.OFPVID_PRESENT | 101)),
+ output(1)
+ ]
+ ))
+
+ check_flow = mk_flow_stat(
+ priority=500,
+ match_fields=[
+ in_port(1),
+ vlan_vid(ofp.OFPVID_PRESENT | 101),
+ vlan_pcp(0)
+ ],
+ actions=[
+ push_vlan(0x8100),
+ set_field(vlan_vid(ofp.OFPVID_PRESENT | 1000)),
+ set_field(vlan_pcp(0)),
+ output(2)
+ ],
+ table_id=64,
+ meters=[1, 2]
+ )
+
+ self.assertFlowsEqual(olt_flows.values()[0], check_flow)
+
+
def test_unicast_downstream_rule_decomposition(self):
flow1 = mk_flow_stat(
match_fields=[
diff --git a/voltha/core/flow_decomposer.py b/voltha/core/flow_decomposer.py
index 7595e51..faf3141 100644
--- a/voltha/core/flow_decomposer.py
+++ b/voltha/core/flow_decomposer.py
@@ -25,7 +25,7 @@
from voltha.protos import third_party
from voltha.protos import openflow_13_pb2 as ofp
-
+from common.tech_profile import tech_profile
_ = third_party
log = structlog.get_logger()
@@ -284,9 +284,12 @@
"""Extract list of ofp_action objects from flow spec object"""
assert isinstance(flow, ofp.ofp_flow_stats)
# we have the following hard assumptions for now
+ actions = []
for instruction in flow.instructions:
- if instruction.type == ofp.OFPIT_APPLY_ACTIONS:
- return instruction.actions.actions
+ if instruction.type == ofp.OFPIT_APPLY_ACTIONS or instruction.type == ofp.OFPIT_WRITE_ACTIONS:
+ actions.extend(instruction.actions.actions)
+ return actions
+
def get_ofb_fields(flow):
assert isinstance(flow, ofp.ofp_flow_stats)
@@ -381,11 +384,18 @@
return action.group.group_id
return None
+def get_meter_ids_from_flow(flow):
+ meter_ids = list()
+ for instruction in flow.instructions:
+ if instruction.type == ofp.OFPIT_METER:
+ meter_ids.append(instruction.meter.meter_id)
+ return meter_ids
+
def has_group(flow):
return get_group(flow) is not None
def mk_oxm_fields(match_fields):
- oxm_fields=[
+ oxm_fields = [
ofp.ofp_oxm_field(
oxm_class=ofp.OFPXMC_OPENFLOW_BASIC,
ofb_field=field
@@ -402,7 +412,7 @@
return [instruction]
def mk_simple_flow_mod(match_fields, actions, command=ofp.OFPFC_ADD,
- next_table_id=None, **kw):
+ next_table_id=None, meters=None, **kw):
"""
Convenience function to generare ofp_flow_mod message with OXM BASIC match
composed from the match_fields, and single APPLY_ACTIONS instruction with
@@ -419,6 +429,14 @@
actions=ofp.ofp_instruction_actions(actions=actions)
)
]
+
+ if meters is not None:
+ for meter_id in meters:
+ instructions.append(ofp.ofp_instruction(
+ type=ofp.OFPIT_METER,
+ meter=ofp.ofp_instruction_meter(meter_id=meter_id)
+ ))
+
if next_table_id is not None:
instructions.append(ofp.ofp_instruction(
type=ofp.OFPIT_GOTO_TABLE,
@@ -596,6 +614,67 @@
def is_upstream():
return not is_downstream()
+ def update_devices_rules(flow, curr_device_rules, meter_ids=None, table_id=None):
+ actions = [action.type for action in get_actions(flow)]
+ if len(actions) == 1 and OUTPUT in actions:
+ # Transparent ONU and OLT case (No-L2-Modification flow)
+ child_device_flow_lst, _ = curr_device_rules.setdefault(
+ ingress_hop.device.id, ([], []))
+ parent_device_flow_lst, _ = curr_device_rules.setdefault(
+ egress_hop.device.id, ([], []))
+
+ child_device_flow_lst.append(mk_flow_stat(
+ priority=flow.priority,
+ cookie=flow.cookie,
+ match_fields=[
+ in_port(ingress_hop.ingress_port.port_no)
+ ] + [
+ field for field in get_ofb_fields(flow)
+ if field.type not in (IN_PORT,)
+ ],
+ actions=[
+ output(ingress_hop.egress_port.port_no)
+ ]
+ ))
+
+ parent_device_flow_lst.append(mk_flow_stat(
+ priority=flow.priority,
+ cookie=flow.cookie,
+ match_fields=[
+ in_port(egress_hop.ingress_port.port_no),
+ ] + [
+ field for field in get_ofb_fields(flow)
+ if field.type not in (IN_PORT,)
+ ],
+ actions=[
+ output(egress_hop.egress_port.port_no)
+ ],
+ table_id=table_id,
+ meters=meter_ids
+ ))
+
+ else:
+ fl_lst, _ = curr_device_rules.setdefault(
+ egress_hop.device.id, ([], []))
+ fl_lst.append(mk_flow_stat(
+ priority=flow.priority,
+ cookie=flow.cookie,
+ match_fields=[
+ in_port(egress_hop.ingress_port.port_no)
+ ] + [
+ field for field in get_ofb_fields(flow)
+ if field.type not in (IN_PORT,)
+ ],
+ actions=[
+ action for action in get_actions(flow)
+ if action.type != OUTPUT
+ ] + [
+ output(egress_hop.egress_port.port_no)
+ ],
+ table_id=table_id,
+ meters=meter_ids
+ ))
+
if out_port_no is not None and \
(out_port_no & 0x7fffffff) == ofp.OFPP_CONTROLLER:
@@ -681,7 +760,8 @@
# the first using the goto-statement. We also assume that the
# inner tag is applied at the ONU, while the outer tag is
# applied at the OLT
- if has_next_table(flow):
+ next_table_id = get_goto_table_id(flow)
+ if next_table_id is not None and next_table_id < tech_profile.DEFAULT_TECH_PROFILE_TABLE_ID:
assert out_port_no is None
fl_lst, _ = device_rules.setdefault(
ingress_hop.device.id, ([], []))
@@ -701,66 +781,16 @@
]
))
+ elif next_table_id is not None and next_table_id >= tech_profile.DEFAULT_TECH_PROFILE_TABLE_ID:
+ assert out_port_no is not None
+ meter_ids = get_meter_ids_from_flow(flow)
+ update_devices_rules(flow, device_rules, meter_ids, next_table_id)
else:
-
- actions = [action.type for action in get_actions(flow)]
- # Transparent ONU and OLT case (No-L2-Modification flow)
- if len(actions) == 1 and OUTPUT in actions:
- child_device_flow_lst, _ = device_rules.setdefault(
- ingress_hop.device.id, ([], []))
- parent_device_flow_lst, _ = device_rules.setdefault(
- egress_hop.device.id, ([], []))
-
- child_device_flow_lst.append(mk_flow_stat(
- priority = flow.priority,
- cookie = flow.cookie,
- match_fields=[
- in_port(ingress_hop.ingress_port.port_no)
- ] + [
- field for field in get_ofb_fields(flow)
- if field.type not in (IN_PORT,)
- ],
- actions=[
- output(ingress_hop.egress_port.port_no)
- ]
- ))
-
- parent_device_flow_lst.append(mk_flow_stat(
- priority = flow.priority,
- cookie=flow.cookie,
- match_fields=[
- in_port(egress_hop.ingress_port.port_no),
- ] + [
- field for field in get_ofb_fields(flow)
- if field.type not in (IN_PORT,)
- ],
- actions=[
- output(egress_hop.egress_port.port_no)
- ]
- ))
- else:
- assert out_port_no is not None
- fl_lst, _ = device_rules.setdefault(
- egress_hop.device.id, ([], []))
- fl_lst.append(mk_flow_stat(
- priority=flow.priority,
- cookie=flow.cookie,
- match_fields=[
- in_port(egress_hop.ingress_port.port_no),
- ] + [
- field for field in get_ofb_fields(flow)
- if field.type not in (IN_PORT, )
- ],
- actions=[
- action for action in get_actions(flow)
- if action.type != OUTPUT
- ] + [
- output(egress_hop.egress_port.port_no)
- ]
- ))
+ update_devices_rules(flow, device_rules)
else: # downstream
- if has_next_table(flow):
+ next_table_id = get_goto_table_id(flow)
+ if next_table_id is not None and next_table_id < tech_profile.DEFAULT_TECH_PROFILE_TABLE_ID:
assert out_port_no is None
if get_metadata(flow) is not None:
@@ -880,18 +910,16 @@
if action.type not in (OUTPUT,)
] + [
output(egress_hop.egress_port.port_no)
- ]
-
+ ],
+ #table_id=flow.table_id,
+ #meters=None if len(get_meter_ids_from_flow(flow)) == 0 else get_meter_ids_from_flow(flow)
))
-
else:
grp_id = get_group(flow)
-
- if grp_id is not None: # multicast case
+ if grp_id is not None: # multicast case
fl_lst_olt, _ = device_rules.setdefault(
ingress_hop.device.id, ([], []))
-
# having no group yet is the same as having a group with
# no buckets
group = group_map.get(grp_id, ofp.ofp_group_entry())
@@ -962,8 +990,6 @@
))
else:
raise NotImplementedError('undefined downstream case for flows')
-
-
return device_rules
# ~~~~~~~~~~~~ methods expected to be provided by derived class ~~~~~~~~~~~
diff --git a/voltha/core/global_handler.py b/voltha/core/global_handler.py
index a80c6a4..c5b21dc 100644
--- a/voltha/core/global_handler.py
+++ b/voltha/core/global_handler.py
@@ -300,6 +300,48 @@
@twisted_async
@inlineCallbacks
+ def UpdateLogicalDeviceMeterTable(self, request, context):
+ log.info('meter-table-update-grpc-request', request=request)
+ response = yield self.dispatcher.dispatch(
+ 'UpdateLogicalDeviceMeterTable',
+ request,
+ context,
+ id= request.id)
+ log.info("meter-table-update-grpc-response", response=response)
+
+ if isinstance(response, DispatchError):
+ log.warn('grpc-error-response', error=response.error_code)
+ context.set_details(
+ 'Logical device \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(Empty())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
+
+ @twisted_async
+ @inlineCallbacks
+ def GetMeterStatsOfLogicalDevice(self, request, context):
+ log.info('meter-stats-request-grpc-request', request=request)
+ response = yield self.dispatcher.dispatch(
+ 'GetMeterStatsOfLogicalDevice',
+ request,
+ context,
+ id=request.id)
+ log.info("meter-stats-request-grpc-response", response=response)
+
+ if isinstance(response, DispatchError):
+ log.warn('grpc-error-response', error=response.error_code)
+ context.set_details(
+ 'Logical device \'{}\' error'.format(request.id))
+ context.set_code(response.error_code)
+ returnValue(Empty())
+ else:
+ log.info('grpc-success-response', response=response)
+ returnValue(response)
+
+ @twisted_async
+ @inlineCallbacks
def UpdateLogicalDeviceFlowTable(self, request, context):
log.debug('grpc-request', request=request)
response = yield self.dispatcher.dispatch(
diff --git a/voltha/core/local_handler.py b/voltha/core/local_handler.py
index 4dc156d..98cc73c 100644
--- a/voltha/core/local_handler.py
+++ b/voltha/core/local_handler.py
@@ -1392,14 +1392,23 @@
context.set_code(StatusCode.NOT_FOUND)
return AlarmDeviceData()
-
@twisted_async
def UpdateLogicalDeviceMeterTable(self, request, context):
- log.error("logical-device-meter-update-service not implemented yet")
- context.set_code(StatusCode.UNIMPLEMENTED)
- context.set_details("UpdateLogicalDeviceMeterTable service has not been implemented yet")
- return Empty()
+ log.info('meter-table-update-grpc-request', request=request)
+ if '/' in request.id:
+ context.set_details('Malformed logical device id \'{}\''.format(request.id))
+ context.set_code(StatusCode.INVALID_ARGUMENT)
+ return Empty()
+
+ try:
+ agent = self.core.get_logical_device_agent(request.id)
+ agent.update_meter_table(request.meter_mod)
+ return Empty()
+ except KeyError:
+ context.set_details('Logical device \'{}\' not found'.format(request.id))
+ context.set_code(StatusCode.NOT_FOUND)
+ return Empty()
@twisted_async
def GetMeterStatsOfLogicalDevice(self, request, context):
diff --git a/voltha/core/logical_device_agent.py b/voltha/core/logical_device_agent.py
index 70e26b6..10ec66c 100644
--- a/voltha/core/logical_device_agent.py
+++ b/voltha/core/logical_device_agent.py
@@ -34,7 +34,7 @@
from voltha.protos import openflow_13_pb2 as ofp
from voltha.protos.device_pb2 import Port
from voltha.protos.logical_device_pb2 import LogicalPort
-from voltha.protos.openflow_13_pb2 import Flows, FlowGroups
+from voltha.protos.openflow_13_pb2 import Flows, Meters, FlowGroups, ofp_meter_config
_ = third_party
@@ -53,6 +53,8 @@
self.root_proxy = core.get_proxy('/')
self.flows_proxy = core.get_proxy(
'/logical_devices/{}/flows'.format(logical_device.id))
+ self.meters_proxy = core.get_proxy(
+ '/logical_devices/{}/meters'.format(logical_device.id))
self.groups_proxy = core.get_proxy(
'/logical_devices/{}/flow_groups'.format(logical_device.id))
self.self_proxy = core.get_proxy(
@@ -176,8 +178,21 @@
self.flow_modify_strict(flow_mod)
else:
- self.log.warn('unhandled-flow-mod',
- command=command, flow_mod=flow_mod)
+ self.log.warn('unhandled-flow-mod', command=command, flow_mod=flow_mod)
+
+ def update_meter_table(self, meter_mod):
+ command = meter_mod.command
+
+ if command == ofp.OFPMC_ADD:
+ self.meter_add(meter_mod)
+
+ elif command == ofp.OFPMC_MODIFY:
+ self.meter_modify(meter_mod)
+
+ elif command == ofp.OFPMC_DELETE:
+ self.meter_delete(meter_mod)
+ else:
+ self.log.warn('unhandled-meter-mod', command=command, flow_mod=meter_mod)
def update_group_table(self, group_mod):
@@ -196,6 +211,64 @@
self.log.warn('unhandled-group-mod',
command=command, group_mod=group_mod)
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~ LOW LEVEL METER HANDLERS ~~~~~~~~~~~~~~~~~~~~~~~
+
+ def meter_add(self, meter_mod):
+ assert isinstance(meter_mod, ofp.ofp_meter_mod)
+ # read from model
+ meters = list(self.meters_proxy.get('/').items)
+ if not self.check_meter_id_overlapping(meters, meter_mod):
+ meters.append(ofp_meter_config(flags=meter_mod.flags, \
+ meter_id=meter_mod.meter_id, \
+ bands=meter_mod.bands))
+
+ self.meters_proxy.update('/', Meters(items=meters))
+ else:
+ self.signal_meter_mod_error(ofp.OFPMMFC_METER_EXISTS, meter_mod)
+
+ def meter_modify(self, meter_mod):
+ assert isinstance(meter_mod, ofp.ofp_meter_mod)
+ meters = list(self.meters_proxy.get('/').items)
+ existing_meter = self.check_meter_id_overlapping(meters, meter_mod)
+ if existing_meter:
+ existing_meter.flags = meter_mod.flags
+ existing_meter.bands = meter_mod.bands
+ self.meters_proxy.update('/', Meters(items=meters))
+ else:
+ self.signal_meter_mod_error(ofp.OFPMMFC_UNKNOWN_METER, meter_mod)
+
+ def meter_delete(self, meter_mod):
+ assert isinstance(meter_mod, ofp.ofp_meter_mod)
+ meters = list(self.meters_proxy.get('/').items)
+ to_keep = list()
+ to_delete = 0
+
+ for meter in meters:
+ if meter.meter_id != meter_mod.meter_id:
+ to_keep.append(meter)
+ else:
+ to_delete += 1
+
+ if to_delete == 1:
+ self.meters_proxy.update('/', Meters(items=to_keep))
+ if to_delete == 0:
+ self.signal_meter_mod_error(ofp.OFPMMFC_UNKNOWN_METER, meter_mod)
+ elif to_delete > 1:
+ raise Exception('More than one meter_config sharing the same meter_id cannot exist')
+
+ @staticmethod
+ def check_meter_id_overlapping(meters, meter_mod):
+ for meter in meters:
+ if meter.meter_id == meter_mod.meter_id:
+ return meter
+ return False
+
+ def signal_meter_mod_error(self, error_code, meter_mod):
+ pass # TODO
+
+
+
+
# ~~~~~~~~~~~~~~~~~~~~~~~~~ LOW LEVEL FLOW HANDLERS ~~~~~~~~~~~~~~~~~~~~~~~
def flow_add(self, mod):
diff --git a/voltha/protos/Makefile b/voltha/protos/Makefile
index 6391517..3e5ea79 100644
--- a/voltha/protos/Makefile
+++ b/voltha/protos/Makefile
@@ -20,7 +20,7 @@
$(error To get started, please source the env.sh file from Voltha top level directory)
endif
-default: third_party build
+default: openolt third_party build
PROTO_FILES := $(wildcard *.proto)
PROTO_GOOGLE_API := $(wildcard third_party/google/api/*.proto)
@@ -79,6 +79,10 @@
$(PROTO_All_PB2_C_FILES) \
$(PROTO_PB2_GOOGLE_API)
+openolt:
+ @echo "Building OpenOlt Proto Files"
+ cd ../adapters/openolt/protos; make
+
$(PROTOC):
@echo "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"
@echo "It looks like you don't have protocol buffer tools installed."
@@ -104,4 +108,3 @@
uninstall-protoc:
cd $(PROTOC_BUILD_TMP_DIR)/$(PROTOC_DIR); \
sudo make uninstall
-
diff --git a/voltha/protos/logical_device.proto b/voltha/protos/logical_device.proto
index 0b081d2..11844fe 100644
--- a/voltha/protos/logical_device.proto
+++ b/voltha/protos/logical_device.proto
@@ -52,8 +52,11 @@
// flows configured on the logical device
openflow_13.Flows flows = 129 [(child_node) = {}];
+ // meters configured on the logical device
+ openflow_13.Meters meters = 130 [(child_node) = {}];
+
// flow groups configured on the logical device
- openflow_13.FlowGroups flow_groups = 130 [(child_node) = {}];
+ openflow_13.FlowGroups flow_groups = 131 [(child_node) = {}];
}
diff --git a/voltha/protos/openflow_13.proto b/voltha/protos/openflow_13.proto
index 8079712..192ae39 100644
--- a/voltha/protos/openflow_13.proto
+++ b/voltha/protos/openflow_13.proto
@@ -2267,6 +2267,10 @@
repeated ofp_flow_stats items = 1;
}
+message Meters {
+ repeated ofp_meter_config items = 1;
+}
+
message FlowGroups {
repeated ofp_group_entry items = 1;
}