Init commit for standalone enodebd
Change-Id: I88eeef5135dd7ba8551ddd9fb6a0695f5325337b
diff --git a/common/tests/cert_utils_tests.py b/common/tests/cert_utils_tests.py
new file mode 100644
index 0000000..6563ff9
--- /dev/null
+++ b/common/tests/cert_utils_tests.py
@@ -0,0 +1,109 @@
+"""
+Copyright 2020 The Magma Authors.
+
+This source code is licensed under the BSD-style license found in the
+LICENSE file in the root directory of this source tree.
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import base64
+import datetime
+import os
+from tempfile import TemporaryDirectory
+from unittest import TestCase
+
+import magma.common.cert_utils as cu
+from cryptography import x509
+from cryptography.hazmat.backends import default_backend
+from cryptography.hazmat.primitives import hashes, serialization
+from cryptography.hazmat.primitives.asymmetric import ec
+
+
+class CertUtilsTest(TestCase):
+ def test_key(self):
+ with TemporaryDirectory(prefix='/tmp/test_cert_utils') as temp_dir:
+ key = ec.generate_private_key(ec.SECP384R1(), default_backend())
+ cu.write_key(key, os.path.join(temp_dir, 'test.key'))
+ key_load = cu.load_key(os.path.join(temp_dir, 'test.key'))
+
+ key_bytes = key.private_bytes(
+ serialization.Encoding.PEM,
+ serialization.PrivateFormat.TraditionalOpenSSL,
+ serialization.NoEncryption(),
+ )
+ key_load_bytes = key_load.private_bytes(
+ serialization.Encoding.PEM,
+ serialization.PrivateFormat.TraditionalOpenSSL,
+ serialization.NoEncryption(),
+ )
+ self.assertEqual(key_bytes, key_load_bytes)
+
+ def load_public_key_to_base64der(self):
+ with TemporaryDirectory(prefix='/tmp/test_cert_utils') as temp_dir:
+ key = ec.generate_private_key(ec.SECP384R1(), default_backend())
+ cu.write_key(key, os.path.join(temp_dir, 'test.key'))
+ base64der = cu.load_public_key_to_base64der(
+ os.path.join(temp_dir, 'test.key'),
+ )
+ der = base64.b64decode(base64der)
+ pub_key = serialization.load_der_public_key(der, default_backend())
+ self.assertEqual(pub_key, key.public_key())
+
+ def test_csr(self):
+ key = ec.generate_private_key(ec.SECP384R1(), default_backend())
+ csr = cu.create_csr(
+ key, 'i am dummy test',
+ 'US', 'CA', 'MPK', 'FB', 'magma', 'magma@fb.com',
+ )
+ self.assertTrue(csr.is_signature_valid)
+ public_key_bytes = key.public_key().public_bytes(
+ serialization.Encoding.OpenSSH,
+ serialization.PublicFormat.OpenSSH,
+ )
+ csr_public_key_bytes = csr.public_key().public_bytes(
+ serialization.Encoding.OpenSSH,
+ serialization.PublicFormat.OpenSSH,
+ )
+ self.assertEqual(public_key_bytes, csr_public_key_bytes)
+
+ def test_cert(self):
+ with TemporaryDirectory(prefix='/tmp/test_cert_utils') as temp_dir:
+ cert = _create_dummy_cert()
+ cert_file = os.path.join(temp_dir, 'test.cert')
+ cu.write_cert(
+ cert.public_bytes(
+ serialization.Encoding.DER,
+ ), cert_file,
+ )
+ cert_load = cu.load_cert(cert_file)
+ self.assertEqual(cert, cert_load)
+
+
+def _create_dummy_cert():
+ key = ec.generate_private_key(ec.SECP384R1(), default_backend())
+ subject = issuer = x509.Name([
+ x509.NameAttribute(x509.oid.NameOID.COUNTRY_NAME, u"US"),
+ x509.NameAttribute(x509.oid.NameOID.STATE_OR_PROVINCE_NAME, u"CA"),
+ x509.NameAttribute(x509.oid.NameOID.LOCALITY_NAME, u"San Francisco"),
+ x509.NameAttribute(x509.oid.NameOID.ORGANIZATION_NAME, u"My Company"),
+ x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, u"mysite.com"),
+ ])
+ cert = x509.CertificateBuilder().subject_name(
+ subject,
+ ).issuer_name(
+ issuer,
+ ).public_key(
+ key.public_key(),
+ ).serial_number(
+ x509.random_serial_number(),
+ ).not_valid_before(
+ datetime.datetime.utcnow(),
+ ).not_valid_after(
+ datetime.datetime.utcnow() + datetime.timedelta(days=10),
+ ).sign(key, hashes.SHA256(), default_backend())
+ return cert
diff --git a/common/tests/cert_validity_tests.py b/common/tests/cert_validity_tests.py
new file mode 100644
index 0000000..8ce67dc
--- /dev/null
+++ b/common/tests/cert_validity_tests.py
@@ -0,0 +1,285 @@
+"""
+Copyright 2020 The Magma Authors.
+
+This source code is licensed under the BSD-style license found in the
+LICENSE file in the root directory of this source tree.
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import asyncio
+import errno
+import ssl
+from unittest import TestCase
+from unittest.mock import MagicMock, patch
+
+import magma.common.cert_validity as cv
+
+
+# https://stackoverflow.com/questions/32480108/mocking-async-call-in-python-3-5
+def AsyncMock():
+ coro = MagicMock(name="CoroutineResult")
+ corofunc = MagicMock(
+ name="CoroutineFunction",
+ side_effect=asyncio.coroutine(coro),
+ )
+ corofunc.coro = coro
+ return corofunc
+
+
+class CertValidityTests(TestCase):
+ def setUp(self):
+ self.host = 'localhost'
+ self.port = 8080
+ self.certfile = 'certfile'
+ self.keyfile = 'keyfile'
+
+ asyncio.set_event_loop(None)
+ self.loop = asyncio.new_event_loop()
+
+ def test_tcp_connection(self):
+ """
+ Test that loop.create_connection called with the correct TCP args.
+ """
+ self.loop.create_connection = MagicMock()
+
+ @asyncio.coroutine
+ def go():
+ yield from cv.create_tcp_connection(
+ self.host,
+ self.port,
+ self.loop,
+ )
+ self.loop.run_until_complete(go())
+
+ self.loop.create_connection.assert_called_once_with(
+ cv.TCPClientProtocol,
+ self.host,
+ self.port,
+ )
+
+ @patch('magma.common.cert_validity.ssl.SSLContext')
+ def test_ssl_connection(self, mock_ssl):
+ """
+ Test that ssl.SSLContext and loop.create_connection are called with the
+ correct SSL args.
+ """
+ self.loop.create_connection = MagicMock()
+
+ @asyncio.coroutine
+ def go():
+ yield from cv.create_ssl_connection(
+ self.host,
+ self.port,
+ self.certfile,
+ self.keyfile,
+ self.loop,
+ )
+ self.loop.run_until_complete(go())
+
+ mock_context = mock_ssl.return_value
+
+ mock_ssl.assert_called_once_with(ssl.PROTOCOL_SSLv23)
+ mock_context.load_cert_chain.assert_called_once_with(
+ self.certfile,
+ keyfile=self.keyfile,
+ )
+
+ self.loop.create_connection.assert_called_once_with(
+ cv.TCPClientProtocol,
+ self.host,
+ self.port,
+ ssl=mock_context,
+ )
+
+ @patch(
+ 'magma.common.cert_validity.create_ssl_connection',
+ new_callable=AsyncMock,
+ )
+ @patch(
+ 'magma.common.cert_validity.create_tcp_connection',
+ new_callable=AsyncMock,
+ )
+ def test_cert_is_invalid_both_ok(self, mock_create_tcp, mock_create_ssl):
+ """
+ Test the appropriate calls and return value for cert_is_invalid()
+ cert_is_invalid() == False when TCP and SSL succeed
+ """
+
+ @asyncio.coroutine
+ def go():
+ return (
+ yield from cv.cert_is_invalid(
+ self.host,
+ self.port,
+ self.certfile,
+ self.keyfile,
+ self.loop,
+ )
+ )
+ ret_val = self.loop.run_until_complete(go())
+
+ mock_create_tcp.assert_called_once_with(
+ self.host,
+ self.port,
+ self.loop,
+ )
+ mock_create_ssl.assert_called_once_with(
+ self.host,
+ self.port,
+ self.certfile,
+ self.keyfile,
+ self.loop,
+ )
+ self.assertEqual(ret_val, False)
+
+ @patch(
+ 'magma.common.cert_validity.create_ssl_connection',
+ new_callable=AsyncMock,
+ )
+ @patch('magma.common.cert_validity.create_tcp_connection', AsyncMock())
+ def test_cert_is_invalid_ssl_fail(self, mock_create_ssl):
+ """
+ Test cert_is_invalid() == True when TCP succeeds and SSL fails
+ """
+
+ mock_err = TimeoutError()
+ mock_err.errno = errno.ETIMEDOUT
+ mock_create_ssl.coro.side_effect = mock_err
+
+ @asyncio.coroutine
+ def go():
+ return (
+ yield from cv.cert_is_invalid(
+ self.host,
+ self.port,
+ self.certfile,
+ self.keyfile,
+ self.loop,
+ )
+ )
+ ret_val = self.loop.run_until_complete(go())
+ self.assertEqual(ret_val, True)
+
+ @patch(
+ 'magma.common.cert_validity.create_ssl_connection',
+ new_callable=AsyncMock,
+ )
+ @patch('magma.common.cert_validity.create_tcp_connection', AsyncMock())
+ def test_cert_is_invalid_ssl_fail_none_errno(self, mock_create_ssl):
+ """
+ Test cert_is_invalid() == True when TCP succeeds and SSL fails w/o error number
+ """
+
+ mock_err = TimeoutError()
+ mock_err.errno = None
+ mock_create_ssl.coro.side_effect = mock_err
+
+ @asyncio.coroutine
+ def go():
+ return (
+ yield from cv.cert_is_invalid(
+ self.host,
+ self.port,
+ self.certfile,
+ self.keyfile,
+ self.loop,
+ )
+ )
+ ret_val = self.loop.run_until_complete(go())
+ self.assertEqual(ret_val, True)
+
+ @patch('magma.common.cert_validity.create_ssl_connection', AsyncMock())
+ @patch(
+ 'magma.common.cert_validity.create_tcp_connection',
+ new_callable=AsyncMock,
+ )
+ def test_cert_is_invalid_tcp_fail_none_errno(self, mock_create_tcp):
+ """
+ Test cert_is_invalid() == False when TCP fails w/o errno and SSL succeeds
+ """
+
+ mock_err = TimeoutError()
+ mock_err.errno = None
+ mock_create_tcp.coro.side_effect = mock_err
+
+ @asyncio.coroutine
+ def go():
+ return (
+ yield from cv.cert_is_invalid(
+ self.host,
+ self.port,
+ self.certfile,
+ self.keyfile,
+ self.loop,
+ )
+ )
+ ret_val = self.loop.run_until_complete(go())
+ self.assertEqual(ret_val, False)
+
+ @patch('magma.common.cert_validity.create_ssl_connection', AsyncMock())
+ @patch(
+ 'magma.common.cert_validity.create_tcp_connection',
+ new_callable=AsyncMock,
+ )
+ def test_cert_is_invalid_tcp_fail(self, mock_create_tcp):
+ """
+ Test cert_is_invalid() == False when TCP fails and SSL succeeds
+ """
+
+ mock_err = TimeoutError()
+ mock_err.errno = errno.ETIMEDOUT
+ mock_create_tcp.coro.side_effect = mock_err
+
+ @asyncio.coroutine
+ def go():
+ return (
+ yield from cv.cert_is_invalid(
+ self.host,
+ self.port,
+ self.certfile,
+ self.keyfile,
+ self.loop,
+ )
+ )
+ ret_val = self.loop.run_until_complete(go())
+ self.assertEqual(ret_val, False)
+
+ @patch(
+ 'magma.common.cert_validity.create_ssl_connection',
+ new_callable=AsyncMock,
+ )
+ @patch(
+ 'magma.common.cert_validity.create_tcp_connection',
+ new_callable=AsyncMock,
+ )
+ def test_cert_is_invalid_both_fail(self, mock_create_tcp, mock_create_ssl):
+ """
+ Test cert_is_invalid() == False when TCP and SSL fail
+ """
+
+ mock_tcp_err = TimeoutError()
+ mock_tcp_err.errno = errno.ETIMEDOUT
+ mock_create_tcp.coro.side_effect = mock_tcp_err
+
+ mock_ssl_err = TimeoutError()
+ mock_ssl_err.errno = errno.ETIMEDOUT
+ mock_create_ssl.coro.side_effect = mock_ssl_err
+
+ @asyncio.coroutine
+ def go():
+ return (
+ yield from cv.cert_is_invalid(
+ self.host,
+ self.port,
+ self.certfile,
+ self.keyfile,
+ self.loop,
+ )
+ )
+ ret_val = self.loop.run_until_complete(go())
+ self.assertEqual(ret_val, False)
diff --git a/common/tests/metrics_tests.py b/common/tests/metrics_tests.py
new file mode 100644
index 0000000..f48f2f8
--- /dev/null
+++ b/common/tests/metrics_tests.py
@@ -0,0 +1,241 @@
+"""
+Copyright 2020 The Magma Authors.
+
+This source code is licensed under the BSD-style license found in the
+LICENSE file in the root directory of this source tree.
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import unittest
+import unittest.mock
+
+import metrics_pb2
+from common import metrics_export
+from orc8r.protos import metricsd_pb2
+from prometheus_client import (
+ CollectorRegistry,
+ Counter,
+ Gauge,
+ Histogram,
+ Summary,
+)
+
+
+class Service303MetricTests(unittest.TestCase):
+ """
+ Tests for the Service303 metrics interface
+ """
+
+ def setUp(self):
+ self.registry = CollectorRegistry()
+ self.maxDiff = None
+
+ def test_counter(self):
+ """Test that we can track counters in Service303"""
+ # Add a counter with a label to the regisry
+ c = Counter(
+ 'process_max_fds', 'A counter', ['result'],
+ registry=self.registry,
+ )
+
+ # Create two series for value1 and value2
+ c.labels('success').inc(1.23)
+ c.labels('failure').inc(2.34)
+
+ # Build proto outputs
+ counter1 = metrics_pb2.Counter(value=1.23)
+ counter2 = metrics_pb2.Counter(value=2.34)
+ metric1 = metrics_pb2.Metric(
+ counter=counter1,
+ timestamp_ms=1234000,
+ )
+ metric2 = metrics_pb2.Metric(
+ counter=counter2,
+ timestamp_ms=1234000,
+ )
+ family = metrics_pb2.MetricFamily(
+ name=str(metricsd_pb2.process_max_fds),
+ type=metrics_pb2.COUNTER,
+ )
+ metric1.label.add(
+ name=str(metricsd_pb2.result),
+ value='success',
+ )
+ metric2.label.add(
+ name=str(metricsd_pb2.result),
+ value='failure',
+ )
+ family.metric.extend([metric1, metric2])
+
+ with unittest.mock.patch('time.time') as mock_time:
+ mock_time.side_effect = lambda: 1234
+ self.assertCountEqual(
+ list(metrics_export.get_metrics(self.registry))[0].metric,
+ family.metric,
+ )
+
+ def test_gauge(self):
+ """Test that we can track gauges in Service303"""
+ # Add a gauge with a label to the regisry
+ c = Gauge(
+ 'process_max_fds', 'A gauge', ['result'],
+ registry=self.registry,
+ )
+
+ # Create two series for value1 and value2
+ c.labels('success').inc(1.23)
+ c.labels('failure').inc(2.34)
+
+ # Build proto outputs
+ gauge1 = metrics_pb2.Gauge(value=1.23)
+ gauge2 = metrics_pb2.Gauge(value=2.34)
+ metric1 = metrics_pb2.Metric(
+ gauge=gauge1,
+ timestamp_ms=1234000,
+ )
+ metric2 = metrics_pb2.Metric(
+ gauge=gauge2,
+ timestamp_ms=1234000,
+ )
+ family = metrics_pb2.MetricFamily(
+ name=str(metricsd_pb2.process_max_fds),
+ type=metrics_pb2.GAUGE,
+ )
+ metric1.label.add(
+ name=str(metricsd_pb2.result),
+ value='success',
+ )
+ metric2.label.add(
+ name=str(metricsd_pb2.result),
+ value='failure',
+ )
+ family.metric.extend([metric1, metric2])
+
+ with unittest.mock.patch('time.time') as mock_time:
+ mock_time.side_effect = lambda: 1234
+ self.assertCountEqual(
+ list(metrics_export.get_metrics(self.registry))[0].metric,
+ family.metric,
+ )
+
+ def test_summary(self):
+ """Test that we can track summaries in Service303"""
+ # Add a summary with a label to the regisry
+ c = Summary(
+ 'process_max_fds', 'A summary', [
+ 'result',
+ ], registry=self.registry,
+ )
+ c.labels('success').observe(1.23)
+ c.labels('failure').observe(2.34)
+
+ # Build proto outputs
+ summary1 = metrics_pb2.Summary(sample_count=1, sample_sum=1.23)
+ summary2 = metrics_pb2.Summary(sample_count=1, sample_sum=2.34)
+ metric1 = metrics_pb2.Metric(
+ summary=summary1,
+ timestamp_ms=1234000,
+ )
+ metric2 = metrics_pb2.Metric(
+ summary=summary2,
+ timestamp_ms=1234000,
+ )
+ family = metrics_pb2.MetricFamily(
+ name=str(metricsd_pb2.process_max_fds),
+ type=metrics_pb2.SUMMARY,
+ )
+ metric1.label.add(
+ name=str(metricsd_pb2.result),
+ value='success',
+ )
+ metric2.label.add(
+ name=str(metricsd_pb2.result),
+ value='failure',
+ )
+ family.metric.extend([metric1, metric2])
+
+ with unittest.mock.patch('time.time') as mock_time:
+ mock_time.side_effect = lambda: 1234
+ self.assertCountEqual(
+ list(metrics_export.get_metrics(self.registry))[0].metric,
+ family.metric,
+ )
+
+ def test_histogram(self):
+ """Test that we can track histogram in Service303"""
+ # Add a histogram with a label to the regisry
+ c = Histogram(
+ 'process_max_fds', 'A summary', ['result'],
+ registry=self.registry, buckets=[0, 2, float('inf')],
+ )
+ c.labels('success').observe(1.23)
+ c.labels('failure').observe(2.34)
+
+ # Build proto outputs
+ histogram1 = metrics_pb2.Histogram(sample_count=1, sample_sum=1.23)
+ histogram1.bucket.add(upper_bound=0, cumulative_count=0)
+ histogram1.bucket.add(upper_bound=2, cumulative_count=1)
+ histogram1.bucket.add(upper_bound=float('inf'), cumulative_count=1)
+ histogram2 = metrics_pb2.Histogram(sample_count=1, sample_sum=2.34)
+ histogram2.bucket.add(upper_bound=0, cumulative_count=0)
+ histogram2.bucket.add(upper_bound=2, cumulative_count=0)
+ histogram2.bucket.add(upper_bound=float('inf'), cumulative_count=1)
+ metric1 = metrics_pb2.Metric(
+ histogram=histogram1,
+ timestamp_ms=1234000,
+ )
+ metric2 = metrics_pb2.Metric(
+ histogram=histogram2,
+ timestamp_ms=1234000,
+ )
+ family = metrics_pb2.MetricFamily(
+ name=str(metricsd_pb2.process_max_fds),
+ type=metrics_pb2.HISTOGRAM,
+ )
+ metric1.label.add(
+ name=str(metricsd_pb2.result),
+ value='success',
+ )
+ metric2.label.add(
+ name=str(metricsd_pb2.result),
+ value='failure',
+ )
+ family.metric.extend([metric1, metric2])
+
+ with unittest.mock.patch('time.time') as mock_time:
+ mock_time.side_effect = lambda: 1234
+ self.assertCountEqual(
+ list(metrics_export.get_metrics(self.registry))[0].metric,
+ family.metric,
+ )
+
+ def test_converted_enums(self):
+ """ Test that metric names and labels are auto converted """
+ # enum values (from metricsd.proto):
+ # mme_new_association => 500, result => 0
+ c = Counter(
+ 'mme_new_association', 'A counter', ['result'],
+ registry=self.registry,
+ )
+
+ c.labels('success').inc(1.23)
+
+ metric_family = list(metrics_export.get_metrics(self.registry))[0]
+
+ self.assertEqual(
+ metric_family.name,
+ str(metricsd_pb2.mme_new_association),
+ )
+ metric_labels = metric_family.metric[0].label
+ # Order not guaranteed=
+ self.assertEqual(metric_labels[0].name, str(metricsd_pb2.result))
+ self.assertEqual(metric_labels[0].value, 'success')
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/common/tests/service303_tests.py b/common/tests/service303_tests.py
new file mode 100644
index 0000000..49175e4
--- /dev/null
+++ b/common/tests/service303_tests.py
@@ -0,0 +1,90 @@
+"""
+Copyright 2020 The Magma Authors.
+
+This source code is licensed under the BSD-style license found in the
+LICENSE file in the root directory of this source tree.
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import asyncio
+from unittest import TestCase, main, mock
+
+from common.service import MagmaService
+from common.service_registry import ServiceRegistry
+from orc8r.protos.common_pb2 import Void
+from orc8r.protos.mconfig import mconfigs_pb2
+from orc8r.protos.service303_pb2 import ServiceInfo
+from orc8r.protos.service303_pb2_grpc import Service303Stub
+
+
+class Service303Tests(TestCase):
+ """
+ Tests for the MagmaService and the Service303 interface
+ """
+
+ @mock.patch('time.time', mock.MagicMock(return_value=12345))
+ def setUp(self):
+ ServiceRegistry.add_service('test', '0.0.0.0', 0)
+ self._stub = None
+
+ self._loop = asyncio.new_event_loop()
+ # Use a new event loop to ensure isolated tests
+ self._service = MagmaService(
+ name='test',
+ empty_mconfig=mconfigs_pb2.MagmaD(),
+ loop=self._loop,
+ )
+ asyncio.set_event_loop(self._service.loop)
+
+ @mock.patch(
+ 'magma.common.service_registry.ServiceRegistry.get_proxy_config',
+ )
+ def test_service_run(self, mock_get_proxy_config):
+ """
+ Test if the service starts and stops gracefully.
+ """
+
+ self.assertEqual(self._service.state, ServiceInfo.STARTING)
+
+ mock_get_proxy_config.return_value = {
+ 'cloud_address': '127.0.0.1',
+ 'proxy_cloud_connections': True,
+ }
+
+ # Start the service and pause the loop
+ self._service.loop.stop()
+ self._service.run()
+ asyncio.set_event_loop(self._service.loop)
+ self._service.log_counter._periodic_task.cancel()
+ self.assertEqual(self._service.state, ServiceInfo.ALIVE)
+
+ # Create a rpc stub and query the Service303 interface
+ ServiceRegistry.add_service('test', '0.0.0.0', self._service.port)
+ channel = ServiceRegistry.get_rpc_channel(
+ 'test',
+ ServiceRegistry.LOCAL,
+ )
+ self._stub = Service303Stub(channel)
+
+ info = ServiceInfo(
+ name='test',
+ version='0.0.0',
+ state=ServiceInfo.ALIVE,
+ health=ServiceInfo.APP_HEALTHY,
+ start_time_secs=12345,
+ )
+ self.assertEqual(self._stub.GetServiceInfo(Void()), info)
+
+ # Stop the service
+ self._stub.StopService(Void())
+ self._service.loop.run_forever()
+ self.assertEqual(self._service.state, ServiceInfo.STOPPED)
+
+
+if __name__ == "__main__":
+ main()