Init commit for standalone enodebd
Change-Id: I88eeef5135dd7ba8551ddd9fb6a0695f5325337b
diff --git a/eventd/__init__.py b/eventd/__init__.py
new file mode 100644
index 0000000..5c6cb64
--- /dev/null
+++ b/eventd/__init__.py
@@ -0,0 +1,12 @@
+"""
+Copyright 2020 The Magma Authors.
+
+This source code is licensed under the BSD-style license found in the
+LICENSE file in the root directory of this source tree.
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
diff --git a/eventd/event_validator.py b/eventd/event_validator.py
new file mode 100644
index 0000000..31b1636
--- /dev/null
+++ b/eventd/event_validator.py
@@ -0,0 +1,125 @@
+"""
+Copyright 2020 The Magma Authors.
+
+This source code is licensed under the BSD-style license found in the
+LICENSE file in the root directory of this source tree.
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import json
+import logging
+from contextlib import closing
+from typing import Any, Dict
+
+import pkg_resources
+import yaml
+from bravado_core.spec import Spec
+from bravado_core.validate import validate_object as bravado_validate
+
+EVENT_REGISTRY = 'event_registry'
+SWAGGER_SPEC = 'swagger_spec'
+BRAVADO_SPEC = 'bravado_spec'
+MODULE = 'module'
+FILENAME = 'filename'
+DEFINITIONS = 'definitions'
+
+
+class EventValidator(object):
+    """
+    Validates JSON events against the swagger definitions listed in the event registry.
+    """
+
+    def __init__(self, config: Dict[str, Any]):
+        self.event_registry = config[EVENT_REGISTRY]
+        self.specs_by_filename = self._load_specs_from_registry()
+
+    def validate_event(self, raw_event: str, event_type: str) -> None:
+        """
+        Check that an event type is registered and validate the event
+        against the swagger definition registered for that type.
+        Args:
+            raw_event: The event to be validated, as a JSON-encoded string
+            event_type: The type of the event, a key of the event registry
+        Raises:
+            KeyError: if event_type is not in the event registry. JSON
+                decoding and schema validation errors propagate unchanged.
+        """
+        event = json.loads(raw_event)
+
+        # Event not in registry
+        if event_type not in self.event_registry:
+            logging.debug(
+                'Event type %s not among registered event types (%s)',
+                event_type, self.event_registry,
+            )
+            raise KeyError(
+                'Event type {} not registered, '
+                'please add it to the EventD config'.format(event_type),
+            )
+        filename = self.event_registry[event_type][FILENAME]
+        bravado_validate(
+            self.specs_by_filename[filename][BRAVADO_SPEC],
+            self.specs_by_filename[filename][SWAGGER_SPEC][event_type],
+            event,
+        )
+
+    def _load_specs_from_registry(self) -> Dict[str, Any]:
+        """
+        Load each swagger file named in the event registry once, and
+        check that every registered event type is defined in its file.
+        """
+        specs_by_filename = {}
+        for event_type, info in self.event_registry.items():
+            filename = info[FILENAME]
+            if filename in specs_by_filename:
+                # File already loaded: only check this event type is in it
+                self._check_event_exists_in_spec(
+                    specs_by_filename[filename][SWAGGER_SPEC],
+                    filename,
+                    event_type,
+                )
+                continue
+
+            module = '{}.swagger.specs'.format(info[MODULE])
+            if not pkg_resources.resource_exists(module, filename):
+                raise LookupError(
+                    'File {} not found under {}/swagger, please ensure that '
+                    'it exists'.format(filename, info[MODULE]),
+                )
+
+            stream = pkg_resources.resource_stream(module, filename)
+            with closing(stream) as spec_file:
+                swagger_spec = yaml.safe_load(spec_file)
+                self._check_event_exists_in_spec(
+                    swagger_spec[DEFINITIONS], filename, event_type,
+                )
+
+                config = {'validate_swagger_spec': False}
+                bravado_spec = Spec.from_dict(swagger_spec, config=config)
+                specs_by_filename[filename] = {
+                    SWAGGER_SPEC: swagger_spec[DEFINITIONS],
+                    BRAVADO_SPEC: bravado_spec,
+                }
+
+        return specs_by_filename
+
+    @staticmethod
+    def _check_event_exists_in_spec(
+        swagger_definitions: Dict[str, Any],
+        filename: str,
+        event_type: str,
+    ) -> None:
+        """
+        Raise a KeyError if event_type is not a key of swagger_definitions.
+        """
+        if event_type not in swagger_definitions:
+            raise KeyError(
+                'Event type {} is not defined in {}, '
+                'please add the definition and re-generate '
+                'swagger specifications'.format(event_type, filename),
+            )
diff --git a/eventd/eventd_client.py b/eventd/eventd_client.py
new file mode 100644
index 0000000..3f8b63e
--- /dev/null
+++ b/eventd/eventd_client.py
@@ -0,0 +1,52 @@
+"""
+Copyright 2020 The Magma Authors.
+
+This source code is licensed under the BSD-style license found in the
+LICENSE file in the root directory of this source tree.
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import logging
+
+import grpc
+from google.protobuf.json_format import MessageToDict
+from common.service_registry import ServiceRegistry
+from orc8r.protos.eventd_pb2 import Event
+from orc8r.protos.eventd_pb2_grpc import EventServiceStub
+
+EVENTD_SERVICE_NAME = "eventd"
+DEFAULT_GRPC_TIMEOUT = 10
+
+
+def log_event(event: Event) -> None:
+    """
+    Call 'LogEvent' on the local eventd service; a channel-lookup ValueError or grpc.RpcError is logged, not raised.
+    """
+    try:
+        chan = ServiceRegistry.get_rpc_channel(
+            EVENTD_SERVICE_NAME, ServiceRegistry.LOCAL,
+        )
+    except ValueError:
+        logging.error("Cant get RPC channel to %s", EVENTD_SERVICE_NAME)
+        return
+    client = EventServiceStub(chan)
+    try:
+        # Location will be filled in by directory service
+        client.LogEvent(event, DEFAULT_GRPC_TIMEOUT)
+    except grpc.RpcError as err:
+        if err.code() == grpc.StatusCode.UNAVAILABLE:
+            logging.debug(
+                "LogEvent will not occur unless eventd configuration "
+                "is set up.",
+            )
+        else:
+            logging.error(
+                "LogEvent error for event: %s, [%s] %s",
+                MessageToDict(event),
+                err.code(),
+                err.details(),
+            )
diff --git a/eventd/main.py b/eventd/main.py
new file mode 100644
index 0000000..64bb124
--- /dev/null
+++ b/eventd/main.py
@@ -0,0 +1,40 @@
+"""
+Copyright 2020 The Magma Authors.
+
+This source code is licensed under the BSD-style license found in the
+LICENSE file in the root directory of this source tree.
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+from common.sentry import sentry_init
+from common.service import MagmaService
+from eventd.event_validator import EventValidator
+from eventd.rpc_servicer import EventDRpcServicer
+from orc8r.protos.mconfig.mconfigs_pb2 import EventD
+
+
+def main():
+    """ Build the eventd service, register its RPC servicer, run the service loop, then close """
+    service = MagmaService('eventd', EventD())
+
+    # Optionally pipe errors to Sentry
+    sentry_init(service_name=service.name)
+
+    event_validator = EventValidator(service.config)
+    eventd_servicer = EventDRpcServicer(service.config, event_validator)
+    eventd_servicer.add_to_server(service.rpc_server)
+
+    # Run the service loop
+    service.run()
+
+    # Cleanup the service
+    service.close()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/eventd/rpc_servicer.py b/eventd/rpc_servicer.py
new file mode 100644
index 0000000..5b1e641
--- /dev/null
+++ b/eventd/rpc_servicer.py
@@ -0,0 +1,100 @@
+"""
+Copyright 2020 The Magma Authors.
+
+This source code is licensed under the BSD-style license found in the
+LICENSE file in the root directory of this source tree.
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import json
+import logging
+import socket
+from contextlib import closing
+from typing import Any, Dict
+
+import grpc
+import jsonschema
+from common.rpc_utils import return_void
+from eventd.event_validator import EventValidator
+from orc8r.protos import eventd_pb2, eventd_pb2_grpc
+
+RETRY_ON_FAILURE = 'retry_on_failure'
+
+
+class EventDRpcServicer(eventd_pb2_grpc.EventServiceServicer):
+    """
+    gRPC servicer for EventD: validates events and forwards them to FluentBit.
+    """
+
+    def __init__(self, config: Dict[str, Any], validator: EventValidator):
+        self._fluent_bit_port = config['fluent_bit_port']
+        self._tcp_timeout = config['tcp_timeout']
+        self._event_registry = config['event_registry']
+        self._validator = validator
+
+    def add_to_server(self, server):
+        """
+        Add the servicer to a gRPC server
+        """
+        eventd_pb2_grpc.add_EventServiceServicer_to_server(self, server)
+
+    @return_void
+    def LogEvent(self, request: eventd_pb2.Event, context):
+        """
+        Validate an event, then send it as JSON to FluentBit over a local TCP connection.
+        """
+        logging.debug("Logging event: %s", request)
+
+        try:
+            self._validator.validate_event(request.value, request.event_type)
+        except (KeyError, json.JSONDecodeError, jsonschema.ValidationError) as e:
+            logging.error("Event validation failed for log: %s. Error: %s", request, e)
+            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
+            context.set_details(
+                'Event validation failed, Details: {}'.format(e),
+            )
+            return
+
+        value = {
+            'stream_name': request.stream_name,
+            'event_type': request.event_type,
+            'event_tag': request.tag,
+            'value': request.value,
+            'retry_on_failure': self._needs_retries(request.event_type),
+        }
+        try:
+            with closing(
+                socket.create_connection(
+                    ('localhost', self._fluent_bit_port),
+                    timeout=self._tcp_timeout,
+                ),
+            ) as sock:
+                logging.debug('Sending log to FluentBit')
+                sock.sendall(json.dumps(value).encode('utf-8'))
+        except socket.error as e:
+            logging.error('Connection to FluentBit failed: %s', e)
+            logging.info(
+                'FluentBit (td-agent-bit) may not be enabled '
+                'or configured correctly',
+            )
+            context.set_code(grpc.StatusCode.UNAVAILABLE)
+            context.set_details(
+                'Could not connect to FluentBit locally, Details: {}'
+                .format(e),
+            )
+            return
+
+        logging.debug("Successfully logged event: %s", request)
+
+    def _needs_retries(self, event_type: str) -> str:
+        """
+        Return the retry_on_failure setting of event_type as a string,
+        or 'False' when the type or the setting is missing.
+        """
+        entry = self._event_registry.get(event_type, {})
+        return str(entry.get(RETRY_ON_FAILURE, 'False'))
diff --git a/eventd/tests/__init__.py b/eventd/tests/__init__.py
new file mode 100644
index 0000000..5c6cb64
--- /dev/null
+++ b/eventd/tests/__init__.py
@@ -0,0 +1,12 @@
+"""
+Copyright 2020 The Magma Authors.
+
+This source code is licensed under the BSD-style license found in the
+LICENSE file in the root directory of this source tree.
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
diff --git a/eventd/tests/event_validation_tests.py b/eventd/tests/event_validation_tests.py
new file mode 100644
index 0000000..54c73e1
--- /dev/null
+++ b/eventd/tests/event_validation_tests.py
@@ -0,0 +1,124 @@
+"""
+Copyright 2020 The Magma Authors.
+
+This source code is licensed under the BSD-style license found in the
+LICENSE file in the root directory of this source tree.
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import json
+from unittest import TestCase
+
+from jsonschema import ValidationError
+from magma.eventd.event_validator import EventValidator
+
+
+class EventValidationTests(TestCase):
+    """Exercises EventValidator with the events registered from test_event_definitions.yml."""
+    def setUp(self):
+        # A test event registry that specifies the test events
+        test_events_location = {
+            'module': 'orc8r',
+            'filename': 'test_event_definitions.yml',
+        }
+        config = {
+            'fluent_bit_port': '',
+            'tcp_timeout': '',
+            'event_registry': {
+                'simple_event': test_events_location,
+                'array_and_object_event': test_events_location,
+                'null_event': test_events_location,
+            },
+        }
+        self.validator = EventValidator(config)
+
+    def test_event_registration(self):
+        data = json.dumps({
+            'foo': 'magma',  # required
+            'bar': 123,
+        })
+        # Raises KeyError when event is not registered
+        with self.assertRaises(KeyError):
+            self.validator.validate_event(data, 'non_existent_event')
+
+        # Does not error when event is registered
+        self.validator.validate_event(data, 'simple_event')
+
+    def test_field_consistency(self):
+        # Errors when there are missing fields (required fields)
+        with self.assertRaises(ValidationError):
+            # foo is missing
+            data = json.dumps({
+                'bar': 123,
+            })
+            self.validator.validate_event(data, 'simple_event')
+
+        # Errors on excess fields (additionalProperties set to false)
+        with self.assertRaises(ValidationError):
+            data = json.dumps({
+                'extra_field': 12,
+                'foo': 'asdf',
+                'bar': 123,
+            })
+            self.validator.validate_event(data, 'simple_event')
+
+        # Errors when there are missing AND excess fields
+        with self.assertRaises(ValidationError):
+            data = json.dumps({
+                'extra_field': 12,
+                'bar': 123,
+            })
+            # foo is missing
+            self.validator.validate_event(data, 'simple_event')
+
+        # Does not error when the fields are equivalent
+        data = json.dumps({
+            'foo': 'magma',  # required
+            'bar': 123,
+        })
+        self.validator.validate_event(data, 'simple_event')
+
+        # Does not error when event has no fields
+        self.validator.validate_event(json.dumps({}), 'null_event')
+
+    def test_type_checking(self):
+        data = json.dumps({
+            'an_array': ["a", "b"],
+            'an_object': {
+                "a_key": 1,
+                "b_key": 1,
+            },
+        })
+        # Does not error when the types match
+        self.validator.validate_event(data, 'array_and_object_event')
+
+        # Errors when the type is wrong for primitive fields
+        with self.assertRaises(ValidationError):
+            data = json.dumps({
+                'foo': 123,
+                'bar': 'asdf',
+            })
+            self.validator.validate_event(data, 'simple_event')
+
+        # Errors when the type is wrong for array
+        with self.assertRaises(ValidationError):
+            data = json.dumps({
+                'an_array': [1, 2, 3],
+                'an_object': {},
+            })
+            self.validator.validate_event(data, 'array_and_object_event')
+
+        # Errors when the value type is wrong for object
+        with self.assertRaises(ValidationError):
+            data = json.dumps({
+                'an_array': ["a", "b"],
+                'an_object': {
+                    "a_key": "wrong_value",
+                },
+            })
+            self.validator.validate_event(data, 'array_and_object_event')