blob: 49175e4bcf827bdf59f27836f577c4548a286a60 [file] [log] [blame]
Wei-Yu Chen49950b92021-11-08 19:19:18 +08001"""
2Copyright 2020 The Magma Authors.
3
4This source code is licensed under the BSD-style license found in the
5LICENSE file in the root directory of this source tree.
6
7Unless required by applicable law or agreed to in writing, software
8distributed under the License is distributed on an "AS IS" BASIS,
9WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10See the License for the specific language governing permissions and
11limitations under the License.
12"""
13
14import asyncio
15from unittest import TestCase, main, mock
16
17from common.service import MagmaService
18from common.service_registry import ServiceRegistry
19from orc8r.protos.common_pb2 import Void
20from orc8r.protos.mconfig import mconfigs_pb2
21from orc8r.protos.service303_pb2 import ServiceInfo
22from orc8r.protos.service303_pb2_grpc import Service303Stub
23
24
25class Service303Tests(TestCase):
26 """
27 Tests for the MagmaService and the Service303 interface
28 """
29
30 @mock.patch('time.time', mock.MagicMock(return_value=12345))
31 def setUp(self):
32 ServiceRegistry.add_service('test', '0.0.0.0', 0)
33 self._stub = None
34
35 self._loop = asyncio.new_event_loop()
36 # Use a new event loop to ensure isolated tests
37 self._service = MagmaService(
38 name='test',
39 empty_mconfig=mconfigs_pb2.MagmaD(),
40 loop=self._loop,
41 )
42 asyncio.set_event_loop(self._service.loop)
43
44 @mock.patch(
45 'magma.common.service_registry.ServiceRegistry.get_proxy_config',
46 )
47 def test_service_run(self, mock_get_proxy_config):
48 """
49 Test if the service starts and stops gracefully.
50 """
51
52 self.assertEqual(self._service.state, ServiceInfo.STARTING)
53
54 mock_get_proxy_config.return_value = {
55 'cloud_address': '127.0.0.1',
56 'proxy_cloud_connections': True,
57 }
58
59 # Start the service and pause the loop
60 self._service.loop.stop()
61 self._service.run()
62 asyncio.set_event_loop(self._service.loop)
63 self._service.log_counter._periodic_task.cancel()
64 self.assertEqual(self._service.state, ServiceInfo.ALIVE)
65
66 # Create a rpc stub and query the Service303 interface
67 ServiceRegistry.add_service('test', '0.0.0.0', self._service.port)
68 channel = ServiceRegistry.get_rpc_channel(
69 'test',
70 ServiceRegistry.LOCAL,
71 )
72 self._stub = Service303Stub(channel)
73
74 info = ServiceInfo(
75 name='test',
76 version='0.0.0',
77 state=ServiceInfo.ALIVE,
78 health=ServiceInfo.APP_HEALTHY,
79 start_time_secs=12345,
80 )
81 self.assertEqual(self._stub.GetServiceInfo(Void()), info)
82
83 # Stop the service
84 self._stub.StopService(Void())
85 self._service.loop.run_forever()
86 self.assertEqual(self._service.state, ServiceInfo.STOPPED)
87
88
89if __name__ == "__main__":
90 main()