Init commit for standalone enodebd
Change-Id: I88eeef5135dd7ba8551ddd9fb6a0695f5325337b
diff --git a/common/tests/service303_tests.py b/common/tests/service303_tests.py
new file mode 100644
index 0000000..49175e4
--- /dev/null
+++ b/common/tests/service303_tests.py
@@ -0,0 +1,90 @@
+"""
+Copyright 2020 The Magma Authors.
+
+This source code is licensed under the BSD-style license found in the
+LICENSE file in the root directory of this source tree.
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import asyncio
+from unittest import TestCase, main, mock
+
+from common.service import MagmaService
+from common.service_registry import ServiceRegistry
+from orc8r.protos.common_pb2 import Void
+from orc8r.protos.mconfig import mconfigs_pb2
+from orc8r.protos.service303_pb2 import ServiceInfo
+from orc8r.protos.service303_pb2_grpc import Service303Stub
+
+
+class Service303Tests(TestCase):
+    """
+    Tests for the MagmaService and the Service303 interface
+    """
+
+    # Pin time.time() to 12345 during setUp; start_time_secs is asserted later
+    @mock.patch('time.time', mock.MagicMock(return_value=12345))
+    def setUp(self):
+        """Register the 'test' service and build it on a fresh event loop."""
+        ServiceRegistry.add_service('test', '0.0.0.0', 0)
+        self._stub = None
+
+        # Use a new event loop to ensure isolated tests
+        self._loop = asyncio.new_event_loop()
+        # Close the loop once the test finishes so it is not leaked
+        self.addCleanup(self._loop.close)
+        self._service = MagmaService(
+            name='test',
+            empty_mconfig=mconfigs_pb2.MagmaD(),
+            loop=self._loop,
+        )
+        asyncio.set_event_loop(self._service.loop)
+
+    # Patch ServiceRegistry where this file imports it from (common.*)
+    @mock.patch('common.service_registry.ServiceRegistry.get_proxy_config')
+    def test_service_run(self, mock_get_proxy_config):
+        """Test if the service starts and stops gracefully."""
+
+        self.assertEqual(self._service.state, ServiceInfo.STARTING)
+
+        mock_get_proxy_config.return_value = {
+            'cloud_address': '127.0.0.1',
+            'proxy_cloud_connections': True,
+        }
+
+        # Start the service and pause the loop
+        self._service.loop.stop()
+        self._service.run()
+        asyncio.set_event_loop(self._service.loop)
+        self._service.log_counter._periodic_task.cancel()
+        self.assertEqual(self._service.state, ServiceInfo.ALIVE)
+
+        # Create a rpc stub and query the Service303 interface
+        ServiceRegistry.add_service('test', '0.0.0.0', self._service.port)
+        channel = ServiceRegistry.get_rpc_channel(
+            'test', ServiceRegistry.LOCAL,
+        )
+        self._stub = Service303Stub(channel)
+
+        info = ServiceInfo(
+            name='test',
+            version='0.0.0',
+            state=ServiceInfo.ALIVE,
+            health=ServiceInfo.APP_HEALTHY,
+            start_time_secs=12345,
+        )
+        self.assertEqual(self._stub.GetServiceInfo(Void()), info)
+
+        # Stop the service
+        self._stub.StopService(Void())
+        self._service.loop.run_forever()
+        self.assertEqual(self._service.state, ServiceInfo.STOPPED)
+
+
+if __name__ == '__main__':
+    main()  # hand control to unittest's command-line runner