733: Insert the description of the change.Need to support Error Checking for consistency – if Meter Reference made then need Meter to exist first before reference.
If Flow Table ID does not resolve to a Technology Profile in KV Store generate an error – initial approach to simplify implementation - the Technology for the Profile will not be used to qualify the lookup.
Initially just do OF agent checks which do not need knowledge available in the adapter i.e. Meter Band Reference requires Meter Band definition first. Any Forward References need error checks (OF Error messages).
OF Agent needs to pass through the Meter/Meter Band programming and the Flow Table ID used to reference the Technology Profile.
The VOLTHA core needs to be able to receive meters from the OFAgent and store them in the appropriate logical device. A new northbound API needs to be added to the core to support this.
Change-Id: Ide776dbcbc04232c1d929a85707fec09e3dedf6f
diff --git a/ofagent/converter.py b/ofagent/converter.py
index 192a65a..c6386b3 100644
--- a/ofagent/converter.py
+++ b/ofagent/converter.py
@@ -65,6 +65,10 @@
kw = pb2dict(pb)
return of13.port_stats_entry(**kw)
+def ofp_meter_stats_to_loxi_meter_stats(pb):
+ kw = pb2dict(pb)
+ return of13.meter_stats(**kw)
+
def make_loxi_field(oxm_field):
assert oxm_field['oxm_class'] == pb2.OFPXMC_OPENFLOW_BASIC
ofb_field = oxm_field['ofb_field']
@@ -230,7 +234,8 @@
'ofp_bucket_counter': ofp_bucket_counter_to_loxy_bucket_counter,
'ofp_bucket': ofp_bucket_to_loxi_bucket,
'ofp_action': make_loxi_action,
- 'ofp_port_stats': ofp_port_stats_to_loxi_port_stats
+ 'ofp_port_stats': ofp_port_stats_to_loxi_port_stats,
+ 'ofp_meter_stats': ofp_meter_stats_to_loxi_meter_stats
}
@@ -250,6 +255,52 @@
match=to_grpc(lo.match),
instructions=[to_grpc(i) for i in lo.instructions])
+def loxi_meter_mod_to_ofp_meter_mod(lo):
+ return pb2.ofp_meter_mod(
+ command=lo.command,
+ flags=lo.flags,
+ meter_id=lo.meter_id,
+ bands=[to_grpc(i) for i in lo.meters])
+
+
+def loxi_meter_band_drop_to_ofp_meter_band_drop(lo):
+ return pb2.ofp_meter_band_header(
+ type=lo.type,
+ rate=lo.rate,
+ burst_size=lo.burst_size)
+
+
+def loxi_meter_band_dscp_remark_to_ofp_meter_band_dscp_remark(lo):
+ return pb2.ofp_meter_band_header(
+ type=lo.type,
+ rate=lo.rate,
+ burst_size=lo.burst_size,
+ dscp_remark=pb2.ofp_meter_band_dscp_remark(prec_level=lo.prec_level))
+
+
+def loxi_meter_band_experimenter_to_ofp_meter_band_experimenter(lo):
+ return pb2.ofp_meter_band_header(
+ type=lo.type,
+ rate=lo.rate,
+ burst_size=lo.burst_size,
+ experimenter=pb2.ofp_meter_band_experimenter(experimenter=lo.experimenter))
+
+
+def loxi_meter_multipart_request_to_ofp_meter_multipart_request(lo):
+ return pb2.ofp_meter_multipart_request(
+ meter_id=lo.meter_id)
+
+
+def loxi_meter_stats_to_ofp_meter_stats(lo):
+ return pb2.ofp_meter_stats(
+ meter_id=lo.meter_id,
+ flow_count=lo.flow_count,
+ packet_in_count =lo.packet_in_count,
+ byte_in_count=lo.byte_in_count,
+ duration_sec=lo.duration_sec,
+ duration_nsec=lo.duration_nsec,
+ band_stats=lo.band_stats)
+
def loxi_group_mod_to_ofp_group_mod(lo):
return pb2.ofp_group_mod(
@@ -405,12 +456,18 @@
of13.message.flow_delete_strict: loxi_flow_mod_to_ofp_flow_mod,
of13.message.flow_modify: loxi_flow_mod_to_ofp_flow_mod,
of13.message.flow_modify_strict: loxi_flow_mod_to_ofp_flow_mod,
+ of13.message.meter_mod: loxi_meter_mod_to_ofp_meter_mod,
+ of13.message.meter_stats_request: loxi_meter_stats_to_ofp_meter_stats,
of13.message.group_add: loxi_group_mod_to_ofp_group_mod,
of13.message.group_delete: loxi_group_mod_to_ofp_group_mod,
of13.message.group_modify: loxi_group_mod_to_ofp_group_mod,
of13.message.packet_out: loxi_packet_out_to_ofp_packet_out,
+ of13.meter_band.drop: loxi_meter_band_drop_to_ofp_meter_band_drop,
+ of13.meter_band.dscp_remark: loxi_meter_band_dscp_remark_to_ofp_meter_band_dscp_remark,
+ of13.meter_band.experimenter: loxi_meter_band_experimenter_to_ofp_meter_band_experimenter,
+
of13.common.match_v3: loxi_match_v3_to_ofp_match,
of13.common.bucket: loxi_bucket_to_ofp_bucket,
diff --git a/ofagent/grpc_client.py b/ofagent/grpc_client.py
index ab9417b..f4fc6df 100644
--- a/ofagent/grpc_client.py
+++ b/ofagent/grpc_client.py
@@ -27,7 +27,7 @@
from twisted.internet import threads
from twisted.internet.defer import inlineCallbacks, returnValue, DeferredQueue
-from protos.voltha_pb2 import ID, VolthaLocalServiceStub, FlowTableUpdate, \
+from protos.voltha_pb2 import ID, VolthaLocalServiceStub, FlowTableUpdate, MeterModUpdate, \
FlowGroupTableUpdate, PacketOut
from protos.logical_device_pb2 import LogicalPortId
from google.protobuf import empty_pb2
@@ -204,6 +204,16 @@
returnValue(res)
@inlineCallbacks
+ def update_meter_mod_table(self, device_id, meter_mod):
+ req = MeterModUpdate(
+ id=device_id,
+ meter_mod=meter_mod
+ )
+ res = yield threads.deferToThread(
+ self.local_stub.UpdateLogicalDeviceMeterTable, req)
+ returnValue(res)
+
+ @inlineCallbacks
def update_group_table(self, device_id, group_mod):
req = FlowGroupTableUpdate(
id=device_id,
@@ -252,3 +262,11 @@
res = yield threads.deferToThread(
self.local_stub.Subscribe, subscriber, timeout=self.grpc_timeout)
returnValue(res)
+
+
+ @inlineCallbacks
+ def get_meter_stats(self, device_id):
+ req = ID(id=device_id)
+ res = yield threads.deferToThread(
+ self.local_stub.GetMeterStatsOfLogicalDevice, req)
+ returnValue(res.items)
diff --git a/ofagent/of_protocol_handler.py b/ofagent/of_protocol_handler.py
index 8d09461..984802e 100644
--- a/ofagent/of_protocol_handler.py
+++ b/ofagent/of_protocol_handler.py
@@ -128,6 +128,29 @@
elif self.role == ofp.OFPCR_ROLE_SLAVE:
self.cxn.send(ofp.message.bad_request_error_msg(code=ofp.OFPBRC_IS_SLAVE))
+
+ def handle_meter_mod_request(self, req):
+ if self.role == ofp.OFPCR_ROLE_MASTER or self.role == ofp.OFPCR_ROLE_EQUAL:
+ try:
+ grpc_req = to_grpc(req)
+ except Exception, e:
+ log.exception('failed-to-convert-meter-mod-request', e=e)
+ else:
+ return self.rpc.update_meter_mod_table(self.device_id, grpc_req)
+
+ elif self.role == ofp.OFPCR_ROLE_SLAVE:
+ self.cxn.send(ofp.message.bad_request_error_msg(code=ofp.OFPBRC_IS_SLAVE))
+
+ @inlineCallbacks
+ def handle_meter_stats_request(self, req):
+ try:
+ meter_stats = yield self.rpc.get_meter_stats(self.device_id)
+ meter_stats = [to_loxi(m) for m in meter_stats]
+ of_message = ofp.message.meter_stats_reply(xid=req.xid, entries=meter_stats)
+ self.cxn.send(of_message)
+ except Exception, e:
+ log.exception("failed-meter-stats-request", req=req, e=e)
+
def handle_get_async_request(self, req):
raise NotImplementedError()
@@ -144,10 +167,6 @@
elif self.role == ofp.OFPCR_ROLE_SLAVE:
self.cxn.send(ofp.message.bad_request_error_msg(code=ofp.OFPBRC_IS_SLAVE))
-
- def handle_meter_mod_request(self, req):
- raise NotImplementedError()
-
def handle_role_request(self, req):
if req.role == ofp.OFPCR_ROLE_MASTER or req.role == ofp.OFPCR_ROLE_SLAVE:
if self.agent.generation_is_defined and (
@@ -238,11 +257,6 @@
def handle_group_features_request(self, req):
raise NotImplementedError()
- def handle_meter_stats_request(self, req):
- meter_stats = [] # see https://jira.opencord.org/browse/CORD-825
- self.cxn.send(ofp.message.meter_stats_reply(
- xid=req.xid, entries=meter_stats))
-
def handle_meter_config_request(self, req):
raise NotImplementedError()
diff --git a/tests/utests/ofagent/test_of_protocol_handler.py b/tests/utests/ofagent/test_of_protocol_handler.py
index ca4392d..fbcc654 100644
--- a/tests/utests/ofagent/test_of_protocol_handler.py
+++ b/tests/utests/ofagent/test_of_protocol_handler.py
@@ -40,7 +40,7 @@
req.role = ofp.OFPCR_ROLE_MASTER
return req
- def test_handle_flow_mod_request_role_salve(self):
+ def test_handle_flow_mod_request_role_slave(self):
generic_obj = self.gen_generic_obj()
device = self.gen_device()
of_proto_handler = OpenFlowProtocolHandler(device.datapath_id, device.id, generic_obj, generic_obj, generic_obj)
@@ -57,6 +57,23 @@
of_proto_handler.role = ofp.OFPCR_ROLE_MASTER
of_proto_handler.handle_flow_mod_request(generic_obj)
+ def test_handle_meter_mod_request_role_slave(self):
+ generic_obj = self.gen_generic_obj()
+ device = self.gen_device()
+ of_proto_handler = OpenFlowProtocolHandler(device.datapath_id, device.id, generic_obj, generic_obj, generic_obj)
+ of_proto_handler.role = ofp.OFPCR_ROLE_SLAVE
+ with self.assertRaises(Exception) as context:
+ of_proto_handler.handle_meter_mod_request(generic_obj)
+ print context.exception
+ self.assertTrue('\'function\' object has no attribute \'send\'' in str(context.exception))
+
+ def test_handle_meter_mod_request_role_master(self):
+ generic_obj = self.gen_generic_obj()
+ device = self.gen_device()
+ of_proto_handler = OpenFlowProtocolHandler(device.datapath_id, device.id, generic_obj, generic_obj, generic_obj)
+ of_proto_handler.role = ofp.OFPCR_ROLE_MASTER
+ of_proto_handler.handle_meter_mod_request(generic_obj)
+
def test_handle_role_request(self):
generic_obj = self.gen_generic_obj()
req = self.gen_role_req()
diff --git a/tests/utests/ofagent/test_ofagent_meter_mod_converter.py b/tests/utests/ofagent/test_ofagent_meter_mod_converter.py
new file mode 100644
index 0000000..899953a
--- /dev/null
+++ b/tests/utests/ofagent/test_ofagent_meter_mod_converter.py
@@ -0,0 +1,76 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+from unittest import main, TestCase
+import structlog
+from loxi.of13.message import meter_mod
+from ofagent.converter import to_grpc
+from ofagent.loxi.of13 import const
+from random import randint
+from protobuf_to_dict import protobuf_to_dict
+from ofagent.loxi.of13.message import meter_mod
+
+log = structlog.get_logger()
+
+
+class test_of_agent_vcore_connection(TestCase):
+
+ def test_meter_mod_loxi_to_grpc_converter(self):
+ from ofagent.loxi.of13 import meter_band
+
+ assertion_check_keys = dict()
+ meter_bands = list()
+
+ command_list = [const.OFPMC_ADD, const.OFPMC_MODIFY, const.OFPMC_DELETE]
+ flag_list = [const.OFPMF_KBPS, const.OFPMF_PKTPS, const.OFPMF_BURST, const.OFPMF_STATS]
+ meter_band_constructs = [meter_band.dscp_remark, meter_band.drop, meter_band.experimenter]
+
+ assertion_check_keys['meter_band_entry_cnt'] = randint(0, 20)
+ assertion_check_keys['xid_instance'] = randint(0, 0xFFFFFFFF)
+ assertion_check_keys['command_instance'] = command_list[randint(0, len(command_list) - 1)]
+ assertion_check_keys['flag_instance'] = flag_list[randint(0, len(flag_list) - 1)]
+
+ for i in range(0, assertion_check_keys['meter_band_entry_cnt']):
+ meter_band = meter_band_constructs[randint(0, len(meter_band_constructs)-1)]()
+ assertion_check_keys['meter_band_type_' + str(i)] = meter_band.type
+ meter_bands.append(meter_band)
+
+ of_meter_mod_req = meter_mod(xid=assertion_check_keys['xid_instance'],
+ command=assertion_check_keys['command_instance'],
+ flags=assertion_check_keys['flag_instance'],
+ meters=meter_bands)
+
+ req = to_grpc(of_meter_mod_req)
+ request = protobuf_to_dict(req)
+
+ if request.has_key('flags'):
+ self.assertEqual(request['flags'], assertion_check_keys['flag_instance'])
+
+ if request.has_key('command'):
+ self.assertEqual(request['command'], assertion_check_keys['command_instance'])
+
+ if request.has_key('bands'):
+ self.assertEqual(assertion_check_keys['meter_band_entry_cnt'], len(request['bands']))
+
+ name_suffix = 0
+ for i in request['bands']:
+ self.assertEqual(i['type'], assertion_check_keys['meter_band_type_' + str(name_suffix)])
+ name_suffix = name_suffix+1
+
+
+if __name__ == '__main__':
+ main()
diff --git a/voltha/core/local_handler.py b/voltha/core/local_handler.py
index 35a2395..5f73513 100644
--- a/voltha/core/local_handler.py
+++ b/voltha/core/local_handler.py
@@ -1389,6 +1389,23 @@
context.set_code(StatusCode.NOT_FOUND)
return AlarmDeviceData()
+
+ @twisted_async
+ def UpdateLogicalDeviceMeterTable(self, request, context):
+ log.error("logical-device-meter-update-service not implemented yet")
+ context.set_code(StatusCode.UNIMPLEMENTED)
+ context.set_details("UpdateLogicalDeviceMeterTable service has not been implemented yet")
+ return Empty()
+
+
+ @twisted_async
+ def GetMeterStatsOfLogicalDevice(self, request, context):
+ log.error("meter-stats-acquisition is not implemented yet")
+ context.set_code(StatusCode.UNIMPLEMENTED)
+ context.set_details("UpdateLogicalDeviceMeterTable service has not been implemented yet")
+ return Empty()
+
+
@twisted_async
def SimulateAlarm(self, request, context):
log.debug('grpc-request', request=request)
@@ -1416,4 +1433,4 @@
except Exception as e:
log.exception(e.message)
response = OperationResp(code=OperationResp.OPERATION_FAILURE)
- return response
\ No newline at end of file
+ return response
diff --git a/voltha/protos/openflow_13.proto b/voltha/protos/openflow_13.proto
index 48b7944..8079712 100644
--- a/voltha/protos/openflow_13.proto
+++ b/voltha/protos/openflow_13.proto
@@ -1354,36 +1354,29 @@
/* Common header for all meter bands */
message ofp_meter_band_header {
ofp_meter_band_type type = 1; /* One of OFPMBT_*. */
- uint32 len = 2; /* Length in bytes of this band. */
- uint32 rate = 3; /* Rate for this band. */
- uint32 burst_size = 4;/* Size of bursts. */
+ uint32 rate = 2; /* Rate for this band. */
+ uint32 burst_size = 3;/* Size of bursts. */
+ oneof data {
+ ofp_meter_band_drop drop = 4;
+ ofp_meter_band_dscp_remark dscp_remark = 5;
+ ofp_meter_band_experimenter experimenter = 6;
+ }
};
/* OFPMBT_DROP band - drop packets */
message ofp_meter_band_drop {
- uint32 type = 1; /* OFPMBT_DROP. */
- uint32 len = 2; /* Length in bytes of this band. */
- uint32 rate = 3; /* Rate for dropping packets. */
- uint32 burst_size = 4;/* Size of bursts. */
+ //Empty payload
};
/* OFPMBT_DSCP_REMARK band - Remark DSCP in the IP header */
message ofp_meter_band_dscp_remark {
- uint32 type = 1; /* OFPMBT_DSCP_REMARK. */
- uint32 len = 2; /* Length in bytes of this band. */
- uint32 rate = 3; /* Rate for remarking packets. */
- uint32 burst_size = 4; /* Size of bursts. */
- uint32 prec_level = 5; /* Number of drop precedence level to add. */
+ uint32 prec_level = 1; /* Number of drop precedence level to add. */
};
/* OFPMBT_EXPERIMENTER band - Experimenter type.
* The rest of the band is experimenter-defined. */
message ofp_meter_band_experimenter {
- ofp_meter_band_type type = 1; /* One of OFPMBT_*. */
- uint32 len = 2; /* Length in bytes of this band. */
- uint32 rate = 3; /* Rate for this band. */
- uint32 burst_size = 4; /* Size of bursts. */
- uint32 experimenter = 5;/* Experimenter ID which takes the
+ uint32 experimenter = 1;/* Experimenter ID which takes the
same form as in struct
ofp_experimenter_header. */
};
@@ -1406,7 +1399,6 @@
/* Meter configuration. OFPT_METER_MOD. */
message ofp_meter_mod {
-// ofp_header header = 1;
ofp_meter_mod_command command = 1; /* One of OFPMC_*. */
uint32 flags = 2; /* Bitmap of OFPMF_* flags. */
uint32 meter_id = 3; /* Meter instance. */
@@ -2252,6 +2244,15 @@
/* ADDITIONAL VOLTHA SPECIFIC MESSAGE TYPES, AIDING RPC CALLS */
+message MeterModUpdate {
+ string id = 1; // Device.id or LogicalDevice.id
+ ofp_meter_mod meter_mod = 2;
+}
+
+message MeterStatsReply {
+ repeated ofp_meter_stats meter_stats = 1;
+}
+
message FlowTableUpdate {
string id = 1; // Device.id or LogicalDevice.id
ofp_flow_mod flow_mod = 2;
diff --git a/voltha/protos/voltha.proto b/voltha/protos/voltha.proto
index a7cc99d..56f103b 100644
--- a/voltha/protos/voltha.proto
+++ b/voltha/protos/voltha.proto
@@ -347,6 +347,23 @@
};
}
+ // Update meter table for logical device
+ rpc UpdateLogicalDeviceMeterTable(openflow_13.MeterModUpdate)
+ returns(google.protobuf.Empty) {
+ option (google.api.http) = {
+ post: "/api/v1/logical_devices/{id}/meters"
+ body: "*"
+ };
+ }
+
+ // Get all meter stats for logical device
+ rpc GetMeterStatsOfLogicalDevice(ID)
+ returns(openflow_13.MeterStatsReply) {
+ option (google.api.http) = {
+ get: "/api/v1/logical_devices/{id}/meters_stats"
+ };
+ }
+
// List all flow groups of a logical device
rpc ListLogicalDeviceFlowGroups(ID) returns(openflow_13.FlowGroups) {
option (google.api.http) = {
@@ -1092,6 +1109,24 @@
};
}
+ // Update meter table for logical device
+ rpc UpdateLogicalDeviceMeterTable(openflow_13.MeterModUpdate)
+ returns(google.protobuf.Empty) {
+ option (google.api.http) = {
+ post: "/api/v1/logical_devices/{id}/meters"
+ body: "*"
+ };
+ }
+
+
+ // Get all meter stats for logical device
+ rpc GetMeterStatsOfLogicalDevice(ID)
+ returns(openflow_13.MeterStatsReply) {
+ option (google.api.http) = {
+ get: "/api/v1/logical_devices/{id}/meters_stats"
+ };
+ }
+
// List all flow groups of a logical device
rpc ListLogicalDeviceFlowGroups(ID) returns(openflow_13.FlowGroups) {
option (google.api.http) = {