Init commit for standalone enodebd

Change-Id: I88eeef5135dd7ba8551ddd9fb6a0695f5325337b
diff --git a/eventd/__init__.py b/eventd/__init__.py
new file mode 100644
index 0000000..5c6cb64
--- /dev/null
+++ b/eventd/__init__.py
@@ -0,0 +1,12 @@
+"""
+Copyright 2020 The Magma Authors.
+
+This source code is licensed under the BSD-style license found in the
+LICENSE file in the root directory of this source tree.
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
diff --git a/eventd/event_validator.py b/eventd/event_validator.py
new file mode 100644
index 0000000..31b1636
--- /dev/null
+++ b/eventd/event_validator.py
@@ -0,0 +1,125 @@
+"""
+Copyright 2020 The Magma Authors.
+
+This source code is licensed under the BSD-style license found in the
+LICENSE file in the root directory of this source tree.
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import json
+import logging
+from contextlib import closing
+from typing import Any, Dict
+
+import pkg_resources
+import yaml
+from bravado_core.spec import Spec
+from bravado_core.validate import validate_object as bravado_validate
+
+EVENT_REGISTRY = 'event_registry'  # service config key: the event registry
+SWAGGER_SPEC = 'swagger_spec'  # per-file cache key: swagger definitions
+BRAVADO_SPEC = 'bravado_spec'  # per-file cache key: bravado Spec object
+MODULE = 'module'  # registry entry key: package prefix of the spec file
+FILENAME = 'filename'  # registry entry key: name of the swagger spec file
+DEFINITIONS = 'definitions'  # key of the definitions in a loaded spec
+
+
+class EventValidator(object):
+    """
+    Validates events against the swagger schemas named in the event registry.
+    """
+
+    def __init__(self, config: Dict[str, Any]):
+        self.event_registry = config[EVENT_REGISTRY]
+        self.specs_by_filename = self._load_specs_from_registry()
+
+    def validate_event(self, raw_event: str, event_type: str) -> None:
+        """
+        Checks if an event is registered and validates it based on
+        a registered schema.
+        Args:
+            raw_event: The event to be validated, as a JSON-encoded string
+            event_type: The type of an event, which corresponds
+            to a generated model
+        Returns:
+            Does not return, but throws exceptions if validation fails.
+        """
+        event = json.loads(raw_event)
+
+        # Event not in registry
+        if event_type not in self.event_registry:
+            logging.debug(
+                'Event type %s not among registered event types (%s)',
+                event_type, self.event_registry,
+            )
+            raise KeyError(
+                'Event type {} not registered, '
+                'please add it to the EventD config'.format(event_type),
+            )
+        filename = self.event_registry[event_type][FILENAME]
+        bravado_validate(
+            self.specs_by_filename[filename][BRAVADO_SPEC],
+            self.specs_by_filename[filename][SWAGGER_SPEC][event_type],
+            event,
+        )
+
+    def _load_specs_from_registry(self) -> Dict[str, Any]:
+        """
+        Loads the swagger spec files named in the event registry and returns
+        them keyed by filename; each file is read only once.
+        """
+        specs_by_filename = {}
+        for event_type, info in self.event_registry.items():
+            filename = info[FILENAME]
+            if filename in specs_by_filename:
+                # Spec for this file is already registered
+                self._check_event_exists_in_spec(
+                    specs_by_filename[filename][SWAGGER_SPEC],
+                    filename,
+                    event_type,
+                )
+                continue
+
+            module = '{}.swagger.specs'.format(info[MODULE])
+            if not pkg_resources.resource_exists(module, filename):
+                raise LookupError(
+                    'File {} not found under {}/swagger, please ensure that '
+                    'it exists'.format(filename, info[MODULE]),
+                )
+
+            stream = pkg_resources.resource_stream(module, filename)
+            with closing(stream) as spec_file:
+                swagger_spec = yaml.safe_load(spec_file)
+                self._check_event_exists_in_spec(
+                    swagger_spec[DEFINITIONS], filename, event_type,
+                )
+
+                config = {'validate_swagger_spec': False}
+                bravado_spec = Spec.from_dict(swagger_spec, config=config)
+                specs_by_filename[filename] = {
+                    SWAGGER_SPEC: swagger_spec[DEFINITIONS],
+                    BRAVADO_SPEC: bravado_spec,
+                }
+
+        return specs_by_filename
+
+    @staticmethod
+    def _check_event_exists_in_spec(
+            swagger_definitions: Dict[str, Any],
+            filename: str,
+            event_type: str,
+    ) -> None:
+        """
+        Throw a KeyError if the event_type does not exist in swagger_definitions
+        """
+        if event_type not in swagger_definitions:
+            raise KeyError(
+                'Event type {} is not defined in {}, '
+                'please add the definition and re-generate '
+                'swagger specifications'.format(event_type, filename),
+            )
diff --git a/eventd/eventd_client.py b/eventd/eventd_client.py
new file mode 100644
index 0000000..3f8b63e
--- /dev/null
+++ b/eventd/eventd_client.py
@@ -0,0 +1,52 @@
+"""
+Copyright 2020 The Magma Authors.
+
+This source code is licensed under the BSD-style license found in the
+LICENSE file in the root directory of this source tree.
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import logging
+
+import grpc
+from google.protobuf.json_format import MessageToDict
+from common.service_registry import ServiceRegistry
+from orc8r.protos.eventd_pb2 import Event
+from orc8r.protos.eventd_pb2_grpc import EventServiceStub
+
+EVENTD_SERVICE_NAME = "eventd"  # service name looked up in ServiceRegistry
+DEFAULT_GRPC_TIMEOUT = 10  # LogEvent RPC timeout (presumably seconds)
+
+
+def log_event(event: Event) -> None:
+    """
+    Call LogEvent on local eventd; channel and RPC errors are logged, not raised.
+    """
+    try:
+        channel = ServiceRegistry.get_rpc_channel(
+            EVENTD_SERVICE_NAME, ServiceRegistry.LOCAL,
+        )
+    except ValueError:
+        logging.error("Cant get RPC channel to %s", EVENTD_SERVICE_NAME)
+        return
+    stub = EventServiceStub(channel)
+    try:
+        # Location will be filled in by directory service
+        stub.LogEvent(event, DEFAULT_GRPC_TIMEOUT)
+    except grpc.RpcError as rpc_error:
+        if rpc_error.code() != grpc.StatusCode.UNAVAILABLE:
+            logging.error(
+                "LogEvent error for event: %s, [%s] %s",
+                MessageToDict(event),
+                rpc_error.code(),
+                rpc_error.details(),
+            )
+            return
+        logging.debug(
+            "LogEvent will not occur unless eventd configuration "
+            "is set up.",
+        )
diff --git a/eventd/main.py b/eventd/main.py
new file mode 100644
index 0000000..64bb124
--- /dev/null
+++ b/eventd/main.py
@@ -0,0 +1,40 @@
+"""
+Copyright 2020 The Magma Authors.
+
+This source code is licensed under the BSD-style license found in the
+LICENSE file in the root directory of this source tree.
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+from common.sentry import sentry_init
+from common.service import MagmaService
+from eventd.event_validator import EventValidator
+from eventd.rpc_servicer import EventDRpcServicer
+from orc8r.protos.mconfig.mconfigs_pb2 import EventD
+
+
+def main():
+    """Entry point: build the eventd service, register its RPCs, run it."""
+    eventd = MagmaService('eventd', EventD())
+
+    # Error reporting to Sentry (optional)
+    sentry_init(service_name=eventd.name)
+
+    validator = EventValidator(eventd.config)
+    rpc_handler = EventDRpcServicer(eventd.config, validator)
+    rpc_handler.add_to_server(eventd.rpc_server)
+
+    # Enter the service loop
+    eventd.run()
+
+    # Tear the service down once the loop returns
+    eventd.close()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/eventd/rpc_servicer.py b/eventd/rpc_servicer.py
new file mode 100644
index 0000000..5b1e641
--- /dev/null
+++ b/eventd/rpc_servicer.py
@@ -0,0 +1,100 @@
+"""
+Copyright 2020 The Magma Authors.
+
+This source code is licensed under the BSD-style license found in the
+LICENSE file in the root directory of this source tree.
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import json
+import logging
+import socket
+from contextlib import closing
+from typing import Any, Dict
+
+import grpc
+import jsonschema
+from common.rpc_utils import return_void
+from eventd.event_validator import EventValidator
+from orc8r.protos import eventd_pb2, eventd_pb2_grpc
+
+RETRY_ON_FAILURE = 'retry_on_failure'  # optional key in an event registry entry
+
+
+class EventDRpcServicer(eventd_pb2_grpc.EventServiceServicer):
+    """
+    gRPC servicer for EventD: validates events and forwards them to FluentBit.
+    """
+
+    def __init__(self, config: Dict[str, Any], validator: EventValidator):
+        self._fluent_bit_port = config['fluent_bit_port']  # port on localhost
+        self._tcp_timeout = config['tcp_timeout']  # FluentBit socket timeout
+        self._event_registry = config['event_registry']
+        self._validator = validator
+
+    def add_to_server(self, server):
+        """
+        Add the servicer to a gRPC server
+        """
+        eventd_pb2_grpc.add_EventServiceServicer_to_server(self, server)
+
+    @return_void
+    def LogEvent(self, request: eventd_pb2.Event, context):
+        """
+        Validate an event and forward it as JSON to the local FluentBit port.
+
+        On failure the gRPC status is set on the context: INVALID_ARGUMENT
+        when validation fails, UNAVAILABLE when FluentBit cannot be reached.
+        """
+        logging.debug("Logging event: %s", request)
+
+        try:
+            self._validator.validate_event(request.value, request.event_type)
+        except (
+            KeyError, json.JSONDecodeError, jsonschema.ValidationError,
+        ) as e:
+            logging.error("Invalid event: %s. Error: %s", request, e)
+            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
+            context.set_details(
+                'Event validation failed, Details: {}'.format(e),
+            )
+            return
+
+        value = {
+            'stream_name': request.stream_name,
+            'event_type': request.event_type,
+            'event_tag': request.tag,
+            'value': request.value,
+            'retry_on_failure': self._needs_retries(request.event_type),
+        }
+        address = ('localhost', self._fluent_bit_port)
+        try:
+            with closing(
+                socket.create_connection(address, timeout=self._tcp_timeout),
+            ) as sock:
+                logging.debug('Sending log to FluentBit')
+                sock.sendall(json.dumps(value).encode('utf-8'))
+        except socket.error as e:
+            logging.error('Connection to FluentBit failed: %s', e)
+            logging.info(
+                'FluentBit (td-agent-bit) may not be enabled '
+                'or configured correctly',
+            )
+            context.set_code(grpc.StatusCode.UNAVAILABLE)
+            context.set_details(
+                'Could not connect to FluentBit locally, Details: {}'
+                .format(e),
+            )
+            return
+
+        logging.debug("Successfully logged event: %s", request)
+
+    def _needs_retries(self, event_type: str) -> str:
+        """Return str(retry_on_failure) for the event type; 'False' if unset."""
+        event_info = self._event_registry.get(event_type, {})
+        return str(event_info.get(RETRY_ON_FAILURE, False))
diff --git a/eventd/tests/__init__.py b/eventd/tests/__init__.py
new file mode 100644
index 0000000..5c6cb64
--- /dev/null
+++ b/eventd/tests/__init__.py
@@ -0,0 +1,12 @@
+"""
+Copyright 2020 The Magma Authors.
+
+This source code is licensed under the BSD-style license found in the
+LICENSE file in the root directory of this source tree.
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
diff --git a/eventd/tests/event_validation_tests.py b/eventd/tests/event_validation_tests.py
new file mode 100644
index 0000000..54c73e1
--- /dev/null
+++ b/eventd/tests/event_validation_tests.py
@@ -0,0 +1,124 @@
+"""
+Copyright 2020 The Magma Authors.
+
+This source code is licensed under the BSD-style license found in the
+LICENSE file in the root directory of this source tree.
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import json
+from unittest import TestCase
+
+from jsonschema import ValidationError
+from magma.eventd.event_validator import EventValidator
+
+
+class EventValidationTests(TestCase):
+    """
+    Tests EventValidator against the definitions in test_event_definitions.yml.
+    """
+
+    def setUp(self):
+        """Create a validator over a registry of the three test events."""
+        # A test event registry that specifies the test events
+        test_events_location = {
+            'module': 'orc8r',
+            'filename': 'test_event_definitions.yml',
+        }
+        config = {
+            'fluent_bit_port': '',
+            'tcp_timeout': '',
+            'event_registry': {
+                'simple_event': test_events_location,
+                'array_and_object_event': test_events_location,
+                'null_event': test_events_location,
+            },
+        }
+        self.validator = EventValidator(config)
+
+    def test_event_registration(self):
+        """Unregistered event types raise KeyError; registered ones validate."""
+        data = json.dumps({
+            'foo': 'magma',  # required
+            'bar': 123,
+        })
+        # Errors when event is not registered
+        with self.assertRaises(KeyError):
+            self.validator.validate_event(data, 'non_existent_event')
+
+        # Does not error when event is registered
+        self.validator.validate_event(data, 'simple_event')
+
+    def test_field_consistency(self):
+        """Missing required or unexpected extra fields fail validation."""
+        # Errors when there are missing fields (required fields)
+        with self.assertRaises(ValidationError):
+            data = json.dumps({'bar': 123})  # foo missing
+            self.validator.validate_event(data, 'simple_event')
+
+        # Errors on excess fields (additionalProperties set to false)
+        with self.assertRaises(ValidationError):
+            data = json.dumps({
+                'extra_field': 12,
+                'foo': 'asdf',
+                'bar': 123,
+            })
+            self.validator.validate_event(data, 'simple_event')
+
+        # Errors when there are missing AND excess fields
+        with self.assertRaises(ValidationError):
+            data = json.dumps({'extra_field': 12, 'bar': 123})  # foo missing
+            self.validator.validate_event(data, 'simple_event')
+
+        # Does not error when the fields are equivalent
+        data = json.dumps({
+            'foo': 'magma',  # required
+            'bar': 123,
+        })
+        self.validator.validate_event(data, 'simple_event')
+
+        # Does not error when event has no fields
+        self.validator.validate_event(json.dumps({}), 'null_event')
+
+    def test_type_checking(self):
+        """Values whose types differ from the schema fail validation."""
+        data = json.dumps({
+            'an_array': ["a", "b"],
+            'an_object': {
+                "a_key": 1,
+                "b_key": 1,
+            },
+        })
+        # Does not error when the types match
+        self.validator.validate_event(data, 'array_and_object_event')
+
+        # Errors when the type is wrong for primitive fields
+        with self.assertRaises(ValidationError):
+            data = json.dumps({
+                'foo': 123,
+                'bar': 'asdf',
+            })
+            self.validator.validate_event(data, 'simple_event')
+
+        # Errors when the type is wrong for array
+        with self.assertRaises(ValidationError):
+            data = json.dumps({
+                'an_array': [1, 2, 3],
+                'an_object': {},
+            })
+            self.validator.validate_event(data, 'array_and_object_event')
+
+        # Errors when the value type is wrong for object
+        with self.assertRaises(ValidationError):
+            data = json.dumps({
+                'an_array': ["a", "b"],
+                'an_object': {
+                    "a_key": "wrong_value",
+                },
+            })
+            self.validator.validate_event(data, 'array_and_object_event')