# SPDX-FileCopyrightText: 2020 The Magma Authors.
# SPDX-FileCopyrightText: 2022 Open Networking Foundation <support@opennetworking.org>
#
# SPDX-License-Identifier: BSD-3-Clause

6import abc
7import logging
8import threading
9import time
10from typing import Any, List
11
12import grpc
13import snowflake
14from google.protobuf import any_pb2
15from common import serialization_utils
16from common.metrics import STREAMER_RESPONSES
17from common.service_registry import ServiceRegistry
18from configuration.service_configs import get_service_config_value
19from orc8r.protos.streamer_pb2 import DataUpdate, StreamRequest
20from orc8r.protos.streamer_pb2_grpc import StreamerStub
21
22
class StreamerClient(threading.Thread):
    """
    StreamerClient provides an interface to communicate with the Streamer
    service in the cloud to get updates for a stream.

    The StreamerClient spawns a thread which listens to updates and
    schedules a callback in the asyncio event loop when an update
    is received from the cloud.

    If the connection to the cloud gets terminated, the StreamerClient
    would retry (TBD: with exponential backoff) to connect back to the cloud.
    """

    class Callback:
        # NOTE(review): this class does not inherit from abc.ABC, so the
        # @abc.abstractmethod decorators below are not enforced at
        # instantiation time. A subclass that omits get_request_args will
        # silently inherit the no-op below; one that omits process_update
        # will only fail when the method is first invoked. Consider
        # `class Callback(abc.ABC)` — but confirm no existing subclass
        # relies on the lax behavior before changing it.

        @abc.abstractmethod
        def get_request_args(self, stream_name: str) -> Any:
            """
            This is called before every stream request to collect any extra
            arguments to send up to the cloud streamer service.

            Args:
                stream_name:
                    Name of the stream that the request arg will be sent to

            Returns: A protobuf message, or None when the stream needs no
                extra arguments (see _get_extra_args_any)
            """
            pass

        @abc.abstractmethod
        def process_update(
            self, stream_name: str, updates: List[DataUpdate],
            resync: bool,
        ):
            """
            Called when we get an update from the cloud. This method will
            be called in the event loop provided to the StreamerClient.

            Args:
                stream_name: Name of the stream
                updates: Array of updates
                resync: if true, the application can clear the
                    contents before applying the updates
            """
            raise NotImplementedError()

    def __init__(self, stream_callbacks, loop):
        """
        Args:
            stream_callbacks ({string: Callback}): Mapping of stream names to
                callbacks to subscribe to.
            loop: asyncio event loop to schedule the callback
        """
        threading.Thread.__init__(self)
        self._stream_callbacks = stream_callbacks
        self._loop = loop
        # Set this thread as daemon thread. We can kill this background
        # thread abruptly since we handle all updates (and database
        # transactions) in the asyncio event loop.
        self.daemon = True

        # Pause between (re)connect attempts, from the 'streamer' service
        # config; clamped so we never poll the cloud faster than every 5s.
        self._reconnect_pause = get_service_config_value(
            'streamer', 'reconnect_sec', 60,
        )
        self._reconnect_pause = max(5, self._reconnect_pause)
        logging.info("Streamer reconnect pause: %d", self._reconnect_pause)
        # Per-RPC deadline (seconds) passed to GetUpdates.
        self._stream_timeout = get_service_config_value(
            'streamer', 'stream_timeout', 150,
        )
        logging.info("Streamer timeout: %d", self._stream_timeout)

    def run(self):
        """Thread main loop: connect to the cloud streamer, drain all
        configured streams, then sleep and reconnect forever."""
        while True:
            try:
                # A fresh channel per iteration; a stale/broken channel is
                # simply abandoned and recreated after the pause.
                channel = ServiceRegistry.get_rpc_channel(
                    'streamer', ServiceRegistry.CLOUD,
                )
                client = StreamerStub(channel)
                self.process_all_streams(client)
            except Exception as exp:  # pylint: disable=broad-except
                # Broad catch is deliberate: this daemon thread must never
                # die, whatever the transport throws.
                logging.error("Error with streamer: %s", exp)

            # If the connection is terminated, wait for a period of time
            # before connecting back to the cloud.
            # TODO: make this more intelligent (exponential backoffs, etc.)
            time.sleep(self._reconnect_pause)

    def process_all_streams(self, client):
        """Stream each subscribed stream once, recording a per-stream
        success/error result in the STREAMER_RESPONSES metric.

        A failure in one stream does not stop the remaining streams.
        """
        for stream_name, callback in self._stream_callbacks.items():
            try:
                self.process_stream_updates(client, stream_name, callback)

                STREAMER_RESPONSES.labels(result='Success').inc()
            except grpc.RpcError as err:
                logging.error(
                    "Error! Streaming from the cloud failed! [%s] %s",
                    err.code(), err.details(),
                )
                STREAMER_RESPONSES.labels(result='RpcError').inc()
            except ValueError as err:
                logging.error("Error! Streaming from cloud failed! %s", err)
                STREAMER_RESPONSES.labels(result='ValueError').inc()

    def process_stream_updates(self, client, stream_name, callback):
        """Issue one GetUpdates server-streaming RPC for `stream_name` and
        schedule the callback on the event loop for every received batch.

        Blocks (in this thread) until the server ends the stream or the
        RPC deadline `self._stream_timeout` expires.
        """
        extra_args = self._get_extra_args_any(callback, stream_name)
        request = StreamRequest(
            gatewayId=snowflake.snowflake(),
            stream_name=stream_name,
            extra_args=extra_args,
        )
        for update_batch in client.GetUpdates(
            request, timeout=self._stream_timeout,
        ):
            # Hand the batch to the asyncio loop; call_soon_threadsafe is
            # the only safe way to schedule work there from this thread.
            self._loop.call_soon_threadsafe(
                callback.process_update,
                stream_name,
                update_batch.updates,
                update_batch.resync,
            )

    @staticmethod
    def _get_extra_args_any(callback, stream_name):
        """Pack the callback's extra request args into a protobuf Any,
        or return None when the callback supplies no extra args."""
        extra_args = callback.get_request_args(stream_name)
        if extra_args is None:
            return None
        else:
            extra_any = any_pb2.Any()
            extra_any.Pack(extra_args)
            return extra_any
153
154
def get_stream_serialize_filename(stream_name):
    """Return the on-disk path where updates for *stream_name* are persisted."""
    return f'/var/opt/magma/streams/{stream_name}'
157
158
class SerializingStreamCallback(StreamerClient.Callback):
    """
    Streamer client callback which decodes stream update as a string and writes
    it to a file, overwriting the previous contents of that file. The file
    location is defined by get_stream_serialize_filename.

    This callback will only save the newest update, with each successive update
    overwriting the previous.
    """

    def get_request_args(self, stream_name: str) -> Any:
        # This stream needs no extra request arguments.
        return None

    def process_update(self, stream_name, updates, resync):
        if not updates:
            return
        # Only the last (newest) update is persisted; log the ones we skip.
        *stale, newest = updates
        for update in stale:
            logging.info('Ignoring update %s', update.key)

        logging.info('Serializing stream update %s', newest.key)
        serialization_utils.write_to_file_atomically(
            get_stream_serialize_filename(stream_name),
            newest.value.decode(),
        )