https://jira.opencord.org/browse/CORD-824
Add support for connecting multiple controllers to VOLTHA.
Address review comments on patch 7.
Change-Id: I2d2375c7754014743c305a0f3841debe5eb3e795
diff --git a/tests/utests/ofagent/test_of_protocol_handler.py b/tests/utests/ofagent/test_of_protocol_handler.py
new file mode 100644
index 0000000..d72b0b2
--- /dev/null
+++ b/tests/utests/ofagent/test_of_protocol_handler.py
@@ -0,0 +1,109 @@
+"""Unit tests for the role handling of ofagent's OpenFlowProtocolHandler."""
+from unittest import TestCase, main
+from of_protocol_handler import OpenFlowProtocolHandler
+import loxi.of13 as ofp
+
+# AttributeError text produced when ``send`` is looked up on a plain function
+# object, which is what these tests pass to the handler in place of real
+# collaborators.
+NO_SEND_ATTR_MSG = '\'function\' object has no attribute \'send\''
+
+
+class TestOF_Protocol_handler(TestCase):
+    """Drive OpenFlowProtocolHandler with bare function objects as stubs."""
+
+    def gen_packet_in(self):
+        """Return a placeholder packet-in payload (the integer 1)."""
+        return 1
+
+    def gen_device(self):
+        """Return a stub device carrying the attributes assigned below."""
+        # A function object serves as an attribute bag: it accepts arbitrary
+        # attributes without a dedicated class.
+        device = lambda: None
+        device.id = "1"
+        device.datapath_id = 1
+        device.desc = (
+            '{mfr_desc: "cord porject" hw_desc: "simualted pon"'
+            ' sw_desc: "simualted pon"'
+            ' serial_num: "2f150d56afa2405eba3ba24e33ce8df9" dp_desc: "n/a"}')
+        device.switch_features = '{ n_buffers: 256 n_tables: 2 capabilities: 15 }'
+        device.root_device_id = "a245bd8bb8b8"
+        return device
+
+    def gen_generic_obj(self):
+        """Return a fresh function object to use as a generic stub.
+
+        It has to remain a function: the expected error text in
+        NO_SEND_ATTR_MSG names the 'function' type.
+        """
+        return lambda: None
+
+    def gen_role_req(self):
+        """Return a stub role request asking for the MASTER role."""
+        req = self.gen_generic_obj()
+        req.role = ofp.OFPCR_ROLE_MASTER
+        return req
+
+    def _new_handler(self, stub):
+        """Return a handler for gen_device() given *stub* three times over."""
+        device = self.gen_device()
+        return OpenFlowProtocolHandler(
+            device.datapath_id, device.id, stub, stub, stub)
+
+    def _assert_send_missing(self, context):
+        """Print the caught exception and check it is the missing-send error."""
+        print(context.exception)
+        # Compare against str(): membership on the exception object itself
+        # leans on Python 2's deprecated BaseException.__getitem__ and raises
+        # TypeError on Python 3.
+        self.assertIn(NO_SEND_ATTR_MSG, str(context.exception))
+
+    def test_handle_flow_mod_request_role_salve(self):
+        """With role SLAVE, a flow-mod request ends in the missing-send error."""
+        stub = self.gen_generic_obj()
+        handler = self._new_handler(stub)
+        handler.role = ofp.OFPCR_ROLE_SLAVE
+        with self.assertRaises(Exception) as context:
+            handler.handle_flow_mod_request(stub)
+        self._assert_send_missing(context)
+
+    def test_handle_flow_mod_request_role_master(self):
+        """With role MASTER, a flow-mod request is expected not to raise."""
+        stub = self.gen_generic_obj()
+        handler = self._new_handler(stub)
+        handler.role = ofp.OFPCR_ROLE_MASTER
+        handler.handle_flow_mod_request(stub)
+
+    def test_handle_role_request(self):
+        """A MASTER role request sets the role, then hits the missing send."""
+        stub = self.gen_generic_obj()
+        req = self.gen_role_req()
+        handler = self._new_handler(stub)
+        with self.assertRaises(Exception) as context:
+            handler.handle_role_request(req)
+        # Checked after the ``with`` block: a statement placed behind the
+        # raising call inside it would never execute.
+        # NOTE(review): assumes the handler stores the role before it tries
+        # to send its reply -- confirm against handle_role_request.
+        self.assertEqual(handler.role, req.role)
+        self._assert_send_missing(context)
+
+    def test_forward_packet_in_role_none(self):
+        """Without a role set, forwarding a packet-in is expected not to raise."""
+        packet_in = self.gen_packet_in()
+        handler = self._new_handler(self.gen_generic_obj())
+        handler.forward_packet_in(packet_in)
+
+    def test_forward_packet_in_role_master(self):
+        """With role MASTER, forwarding a packet-in hits the missing send."""
+        packet_in = self.gen_packet_in()
+        handler = self._new_handler(self.gen_generic_obj())
+        handler.role = ofp.OFPCR_ROLE_MASTER
+        with self.assertRaises(Exception) as context:
+            handler.forward_packet_in(packet_in)
+        self._assert_send_missing(context)
+
+
+if __name__ == '__main__':
+    main()
\ No newline at end of file