blob: 45df9c8dfbcc620cc8fec037a862e50122765fa7 [file] [log] [blame]
# SPDX-FileCopyrightText: 2020 The Magma Authors.
# SPDX-FileCopyrightText: 2022 Open Networking Foundation <support@opennetworking.org>
#
# SPDX-License-Identifier: BSD-3-Clause
Wei-Yu Chen49950b92021-11-08 19:19:18 +08005
6import asyncio
7from unittest import TestCase, main, mock
8
9from common.service import MagmaService
10from common.service_registry import ServiceRegistry
11from orc8r.protos.common_pb2 import Void
12from orc8r.protos.mconfig import mconfigs_pb2
13from orc8r.protos.service303_pb2 import ServiceInfo
14from orc8r.protos.service303_pb2_grpc import Service303Stub
15
16
class Service303Tests(TestCase):
    """
    Tests for the MagmaService and the Service303 interface
    """

    @mock.patch('time.time', mock.MagicMock(return_value=12345))
    def setUp(self):
        """
        Register a 'test' service and build a MagmaService on a fresh loop.

        time.time is patched to return 12345 while setUp runs;
        test_service_run later expects start_time_secs == 12345
        (presumably the service records its start time on construction).
        """
        ServiceRegistry.add_service('test', '0.0.0.0', 0)
        self._stub = None

        # Use a new event loop to ensure isolated tests
        self._loop = asyncio.new_event_loop()
        # Release the loop's resources once the test finishes, pass or fail.
        self.addCleanup(self._loop.close)

        self._service = MagmaService(
            name='test',
            empty_mconfig=mconfigs_pb2.MagmaD(),
            loop=self._loop,
        )
        asyncio.set_event_loop(self._service.loop)

    # Patch the ServiceRegistry that this module actually imports
    # (common.service_registry), so the mock is the one the test exercises.
    @mock.patch(
        'common.service_registry.ServiceRegistry.get_proxy_config',
    )
    def test_service_run(self, mock_get_proxy_config):
        """
        Test if the service starts and stops gracefully.
        """

        self.assertEqual(self._service.state, ServiceInfo.STARTING)

        mock_get_proxy_config.return_value = {
            'cloud_address': '127.0.0.1',
            'proxy_cloud_connections': True,
        }

        # Start the service and pause the loop.
        # A loop stopped before run_forever() processes what is already
        # scheduled and returns, so run() does not block here
        # (presumably run() drives the loop via run_forever()).
        self._service.loop.stop()
        self._service.run()
        asyncio.set_event_loop(self._service.loop)
        # Cancel the periodic log-counter task so it does not fire later.
        self._service.log_counter._periodic_task.cancel()
        self.assertEqual(self._service.state, ServiceInfo.ALIVE)

        # Create a rpc stub and query the Service303 interface.
        # Re-register the service with the port it is actually using.
        ServiceRegistry.add_service('test', '0.0.0.0', self._service.port)
        channel = ServiceRegistry.get_rpc_channel(
            'test',
            ServiceRegistry.LOCAL,
        )
        self._stub = Service303Stub(channel)

        info = ServiceInfo(
            name='test',
            version='0.0.0',
            state=ServiceInfo.ALIVE,
            health=ServiceInfo.APP_HEALTHY,
            start_time_secs=12345,
        )
        self.assertEqual(self._stub.GetServiceInfo(Void()), info)

        # Stop the service: issue the RPC, then run the loop so the
        # service can act on it.
        self._stub.StopService(Void())
        self._service.loop.run_forever()
        self.assertEqual(self._service.state, ServiceInfo.STOPPED)
79
80
# Run the unittest runner when this file is executed as a script.
if __name__ == '__main__':
    main()