blob: 31b17f1fa0b26a510e910c1ba1681ca478ea05b8 [file] [log] [blame]
Wei-Yu Chenad55cb82022-02-15 20:07:01 +08001# SPDX-FileCopyrightText: 2020 The Magma Authors.
2# SPDX-FileCopyrightText: 2022 Open Networking Foundation <support@opennetworking.org>
3#
4# SPDX-License-Identifier: BSD-3-Clause
Wei-Yu Chen49950b92021-11-08 19:19:18 +08005
6import json
7import logging
8import socket
9from contextlib import closing
10from typing import Any, Dict
11
12import grpc
13import jsonschema
14from common.rpc_utils import return_void
15from eventd.event_validator import EventValidator
16from orc8r.protos import eventd_pb2, eventd_pb2_grpc
17
18RETRY_ON_FAILURE = 'retry_on_failure'
19
20
21class EventDRpcServicer(eventd_pb2_grpc.EventServiceServicer):
22 """
23 gRPC based server for EventD.
24 """
25
26 def __init__(self, config: Dict[str, Any], validator: EventValidator):
27 self._fluent_bit_port = config['fluent_bit_port']
28 self._tcp_timeout = config['tcp_timeout']
29 self._event_registry = config['event_registry']
30 self._validator = validator
31
32 def add_to_server(self, server):
33 """
34 Add the servicer to a gRPC server
35 """
36 eventd_pb2_grpc.add_EventServiceServicer_to_server(self, server)
37
38 @return_void
39 def LogEvent(self, request: eventd_pb2.Event, context):
40 """
41 Logs an event.
42 """
43 logging.debug("Logging event: %s", request)
44
45 try:
46 self._validator.validate_event(request.value, request.event_type)
47 except (KeyError, jsonschema.ValidationError) as e:
48 logging.error("KeyError for log: %s. Error: %s", request, e)
49 context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
50 context.set_details(
51 'Event validation failed, Details: {}'.format(e),
52 )
53 return
54
55 value = {
56 'stream_name': request.stream_name,
57 'event_type': request.event_type,
58 'event_tag': request.tag,
59 'value': request.value,
60 'retry_on_failure': self._needs_retries(request.event_type),
61 }
62 try:
63 with closing(
64 socket.create_connection(
65 ('localhost', self._fluent_bit_port),
66 timeout=self._tcp_timeout,
67 ),
68 ) as sock:
69 logging.debug('Sending log to FluentBit')
70 sock.sendall(json.dumps(value).encode('utf-8'))
71 except socket.error as e:
72 logging.error('Connection to FluentBit failed: %s', e)
73 logging.info(
74 'FluentBit (td-agent-bit) may not be enabled '
75 'or configured correctly',
76 )
77 context.set_code(grpc.StatusCode.UNAVAILABLE)
78 context.set_details(
79 'Could not connect to FluentBit locally, Details: {}'
80 .format(e),
81 )
82 return
83
84 logging.debug("Successfully logged event: %s", request)
85
86 def _needs_retries(self, event_type: str) -> str:
87 if event_type not in self._event_registry:
88 # Should not get here
89 return 'False'
90 if RETRY_ON_FAILURE not in self._event_registry[event_type]:
91 return 'False'
92 return str(self._event_registry[event_type][RETRY_ON_FAILURE])