blob: d7934da4d5f796f0ba799f7c94624f3b9d399096 [file] [log] [blame]
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14
15import unittest
16from mock import patch, call, Mock, PropertyMock
17import json
18
19import os, sys
20
# Absolute directory containing this test module; config and xproto
# fixture locations are resolved relative to it.
_this_file = os.path.realpath(__file__)
test_path = os.path.abspath(os.path.dirname(_this_file))
22
class TestSubscriberAuthEvent(unittest.TestCase):
    """Exercise ``SubscriberDhcpEventStep.process_event`` against mocked models.

    NOTE(review): the class name says "Auth" but the step under test is the
    DHCP event step; the name is left unchanged so existing test selectors
    keep working.
    """

    def setUp(self):
        """Initialise config, build the mock model accessor and create fixtures."""
        # Keep a *copy* of sys.path: storing the list object itself would make
        # the restore in tearDown a no-op for any in-place mutation
        # (e.g. sys.path.append) performed while the test is running.
        self.sys_path_save = list(sys.path)

        # Setting up the config module
        from xosconfig import Config
        config = os.path.join(test_path, "../test_config.yaml")
        Config.clear()
        Config.init(config, "synchronizer-config-schema.yaml")
        from multistructlog import create_logger
        log = create_logger(Config().get('logging'))
        # END Setting up the config module

        from xossynchronizer.mock_modelaccessor_build import mock_modelaccessor_config
        mock_modelaccessor_config(
            test_path,
            [
                ("tt-workflow-driver", "tt-workflow-driver.xproto"),
                ("olt-service", "volt.xproto"),
                ("rcord", "rcord.xproto"),
            ],
        )

        import xossynchronizer.modelaccessor
        import mock_modelaccessor

        # ``reload`` is a builtin only on Python 2; on Python 3 it lives in
        # importlib. Resolve it locally so the test runs on both.
        try:
            reload_module = reload  # noqa: F821 (Python 2 builtin)
        except NameError:
            from importlib import reload as reload_module

        # Re-import both modules in case nose2 loaded them in a previous test.
        reload_module(mock_modelaccessor)
        reload_module(xossynchronizer.modelaccessor)

        from xossynchronizer.modelaccessor import model_accessor
        from dhcp_event import SubscriberDhcpEventStep

        # Import all model class names into this module's globals so the test
        # body can refer to them directly (VOLTService, ...).
        for model_name, model_class in model_accessor.all_model_classes.items():
            globals()[model_name] = model_class

        self.model_accessor = model_accessor
        self.log = log

        self.event_step = SubscriberDhcpEventStep(
            model_accessor=self.model_accessor, log=self.log
        )

        self.event = Mock()

        self.volt = Mock()
        self.volt.name = "vOLT"
        self.volt.leaf_model = Mock()

        self.mac_address = "00:AA:00:00:00:01"
        self.ip_address = "192.168.3.5"

        self.si = TtWorkflowDriverServiceInstance()
        self.si.serial_number = "BRCM1234"
        self.si.save = Mock()

    def tearDown(self):
        """Restore the sys.path snapshot taken in setUp."""
        sys.path = self.sys_path_save

    def test_dhcp_subscriber(self):
        """A DHCP event updates and saves the matching service instance."""
        self.event.value = json.dumps({
            "deviceId": "of:0000000000000001",
            "portNumber": "1",
            "macAddress": self.mac_address,
            "ipAddress": self.ip_address,
            "messageType": "DHCPREQUEST",
        })

        with patch.object(VOLTService.objects, "get_items") as volt_service_mock, \
                patch.object(TtWorkflowDriverServiceInstance.objects, "get_items") as si_mock, \
                patch.object(self.volt, "get_onu_sn_from_openflow") as get_onu_sn:

            self.assertIsNotNone(VOLTService.objects.first())

            volt_service_mock.return_value = [self.volt]
            get_onu_sn.return_value = "BRCM1234"
            si_mock.return_value = [self.si]

            self.event_step.process_event(self.event)

            # The step must persist the instance with the values carried by
            # the event payload.
            self.si.save.assert_called()
            self.assertEqual(self.si.dhcp_state, "DHCPREQUEST")
            self.assertEqual(self.si.mac_address, self.mac_address)
            self.assertEqual(self.si.ip_address, self.ip_address)
107
if __name__ == '__main__':
    # Direct execution: make the parent directory importable (helpers.py
    # lives there) before handing control to the unittest runner.
    parent_dir = ".."
    sys.path.append(parent_dir)
    unittest.main()