blob: 4932daba82a60854bb9e5001bd815f04c84ae833 [file] [log] [blame]
# Copyright 2020-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14
15import unittest
16from mock import patch, Mock
17import json
18
19import os
20import sys
21
# Absolute, symlink-resolved directory that contains this test module.
# realpath() already yields an absolute, normalised path, so taking its
# dirname needs no further normalisation.
test_path = os.path.dirname(os.path.realpath(__file__))
23
24
class TestSubscriberAuthEvent(unittest.TestCase):
    """Tests for ``SubscriberPppoeEventStep.process_event``.

    NOTE(review): the class name says "Auth", but the step under test is
    ``SubscriberPppoeEventStep``; the name is kept unchanged so existing test
    selectors keep working.
    """

    def setUp(self):
        """Build the mocked model accessor, the event step and the fixtures."""
        # Keep a *copy* of sys.path. Storing the list itself would alias the
        # live object, so in-place changes made while the test runs could
        # never be undone in tearDown.
        self.sys_path_save = list(sys.path)

        # Setting up the config module
        from xosconfig import Config
        config = os.path.join(test_path, "../test_config.yaml")
        Config.clear()
        Config.init(config, "synchronizer-config-schema.yaml")
        from multistructlog import create_logger
        log = create_logger(Config().get('logging'))
        # END Setting up the config module

        from xossynchronizer.mock_modelaccessor_build import mock_modelaccessor_config
        mock_modelaccessor_config(test_path, [("dt-workflow-driver", "dt-workflow-driver.xproto"),
                                              ("olt-service", "volt.xproto"),
                                              ("rcord", "rcord.xproto")])

        import xossynchronizer.modelaccessor
        import mock_modelaccessor

        # reload() is a builtin on Python 2 only; Python 3.4+ provides it as
        # importlib.reload. Bind whichever one exists to a local name.
        try:
            from importlib import reload as reload_module
        except ImportError:
            reload_module = reload  # noqa: F821 - Python 2 builtin

        reload_module(mock_modelaccessor)  # in case nose2 loaded it in a previous test
        reload_module(xossynchronizer.modelaccessor)  # in case nose2 loaded it in a previous test

        from xossynchronizer.modelaccessor import model_accessor
        from pppoe_event import SubscriberPppoeEventStep

        # import all class names to globals
        # (this is what makes DtWorkflowDriverServiceInstance resolvable below)
        for k, v in model_accessor.all_model_classes.items():
            globals()[k] = v

        self.model_accessor = model_accessor
        self.log = log

        self.event_step = SubscriberPppoeEventStep(model_accessor=self.model_accessor, log=self.log)

        self.event = Mock()

        self.volt = Mock()
        self.volt.name = "vOLT"
        self.volt.leaf_model = Mock()

        self.mac_address = "00:AA:00:00:00:01"
        self.ip_address = "192.168.3.5"
        self.pppoe_session_id = "12"

        # Service instance that the patched object manager returns in the tests;
        # save() is mocked so the test can check it was called.
        self.si = DtWorkflowDriverServiceInstance()
        self.si.serial_number = "BRCM1234"
        self.si.save = Mock()

    def tearDown(self):
        """Restore the sys.path entries captured at the start of setUp."""
        # Slice assignment restores the contents in place, so any other
        # reference to the sys.path list object sees the restored entries too.
        sys.path[:] = self.sys_path_save

    def test_ipcp_subscriber(self):
        """An IPCP_CONF_ACK event updates and saves the matching service instance."""
        self.event.value = json.dumps({
            "deviceId": "of:0000000000000001",
            "portNumber": "1",
            "macAddress": self.mac_address,
            "ipAddress": self.ip_address,
            "sessionId": self.pppoe_session_id,
            "eventType": "IPCP_CONF_ACK",
            "serialNumber": "BRCM1234",
        })

        with patch.object(DtWorkflowDriverServiceInstance.objects, "get_items") as si_mock:
            # Make the model lookup return the prepared service instance.
            si_mock.return_value = [self.si]

            self.event_step.process_event(self.event)

            self.si.save.assert_called()
            self.assertEqual(self.si.ipcp_state, "CONF_ACK")
            self.assertEqual(self.si.mac_address, self.mac_address)
            self.assertEqual(self.si.ip_address, self.ip_address)
            self.assertEqual(self.si.pppoe_session_id, self.pppoe_session_id)
106
107
if __name__ == '__main__':
    # Running this file directly: add the parent directory to the import
    # path first -- for import of helpers.py
    this_dir = os.path.dirname(os.path.realpath(__file__))
    sys.path.append(os.path.join(this_dir, ".."))
    unittest.main()