"""
Copyright 2020 The Magma Authors.

This source code is licensed under the BSD-style license found in the
LICENSE file in the root directory of this source tree.

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
13
14import json
15import logging
16import socket
17from contextlib import closing
18from typing import Any, Dict
19
20import grpc
21import jsonschema
22from common.rpc_utils import return_void
23from eventd.event_validator import EventValidator
24from orc8r.protos import eventd_pb2, eventd_pb2_grpc
25
# Key looked up in an event's registry entry by
# EventDRpcServicer._needs_retries; when absent, the event is reported as
# not needing retries.
RETRY_ON_FAILURE = 'retry_on_failure'
27
28
class EventDRpcServicer(eventd_pb2_grpc.EventServiceServicer):
    """
    gRPC based server for EventD.

    Each incoming event is validated and then forwarded, JSON-encoded, over
    a short-lived local TCP connection to FluentBit.
    """

    def __init__(self, config: Dict[str, Any], validator: EventValidator):
        """
        Store the configuration values and the validator used per request.

        Args:
            config: service configuration; must contain the keys
                'fluent_bit_port', 'tcp_timeout' and 'event_registry'.
            validator: validates each event before it is forwarded.

        Raises:
            KeyError: if one of the required keys is missing from config.
        """
        self._fluent_bit_port = config['fluent_bit_port']
        # Passed as the `timeout` argument of socket.create_connection.
        self._tcp_timeout = config['tcp_timeout']
        self._event_registry = config['event_registry']
        self._validator = validator

    def add_to_server(self, server):
        """
        Add the servicer to a gRPC server
        """
        eventd_pb2_grpc.add_EventServiceServicer_to_server(self, server)

    @return_void
    def LogEvent(self, request: eventd_pb2.Event, context):
        """
        Logs an event.

        The event is validated first; on failure the RPC status is set to
        INVALID_ARGUMENT and nothing is sent. Otherwise the event is
        serialized to JSON and written to FluentBit on
        localhost:<fluent_bit_port>; if that connection fails the RPC status
        is set to UNAVAILABLE.

        Args:
            request: the event to log (stream_name, event_type, tag, value).
            context: gRPC servicer context used to report failures.
        """
        # NOTE(review): the response message is presumably supplied by the
        # `return_void` decorator from common.rpc_utils -- confirm there.
        logging.debug("Logging event: %s", request)

        try:
            self._validator.validate_event(request.value, request.event_type)
        except (KeyError, jsonschema.ValidationError) as e:
            # Both exception types land here, so the message must not claim
            # the failure was a KeyError.
            logging.error(
                "Event validation failed for log: %s. Error: %s", request, e,
            )
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            context.set_details(
                'Event validation failed, Details: {}'.format(e),
            )
            return

        value = {
            'stream_name': request.stream_name,
            'event_type': request.event_type,
            'event_tag': request.tag,
            'value': request.value,
            RETRY_ON_FAILURE: self._needs_retries(request.event_type),
        }
        # Serialize before connecting so the try block below covers only
        # the network I/O.
        payload = json.dumps(value).encode('utf-8')

        try:
            with closing(
                socket.create_connection(
                    ('localhost', self._fluent_bit_port),
                    timeout=self._tcp_timeout,
                ),
            ) as sock:
                logging.debug('Sending log to FluentBit')
                sock.sendall(payload)
        except OSError as e:  # socket.error is an alias of OSError
            logging.error('Connection to FluentBit failed: %s', e)
            logging.info(
                'FluentBit (td-agent-bit) may not be enabled '
                'or configured correctly',
            )
            context.set_code(grpc.StatusCode.UNAVAILABLE)
            context.set_details(
                'Could not connect to FluentBit locally, Details: {}'
                .format(e),
            )
            return

        logging.debug("Successfully logged event: %s", request)

    def _needs_retries(self, event_type: str) -> str:
        """
        Return the retry flag for an event type as a string.

        Args:
            event_type: key into the configured event registry.

        Returns:
            str() of the registry entry's RETRY_ON_FAILURE value, or
            'False' when the event type is not registered, its entry is
            empty, or the entry has no RETRY_ON_FAILURE key.
        """
        # An unregistered type should not get here (validation runs first),
        # but it is treated like an entry without the flag.
        event_config = self._event_registry.get(event_type)
        if not event_config:
            return 'False'
        return str(event_config.get(RETRY_ON_FAILURE, False))