blob: cac248f7cc78a52ff538c3fb77e08f69b06e5d27 [file] [log] [blame]
Wei-Yu Chenad55cb82022-02-15 20:07:01 +08001# SPDX-FileCopyrightText: 2020 The Magma Authors.
2# SPDX-FileCopyrightText: 2022 Open Networking Foundation <support@opennetworking.org>
3#
4# SPDX-License-Identifier: BSD-3-Clause
Wei-Yu Chen49950b92021-11-08 19:19:18 +08005
Wei-Yu Chen49950b92021-11-08 19:19:18 +08006from copy import deepcopy
7from typing import Any, Iterator, List, MutableMapping, Optional, TypeVar
8
9import redis
10import redis_collections
11import redis_lock
12from common.redis.serializers import RedisSerde
13from orc8r.protos.redis_pb2 import RedisState
14from redis.lock import Lock
15
16# NOTE: these containers replace the serialization methods exposed by
17# the redis-collection objects. Although the methods are hinted to be
18# privately scoped, the method replacement is encouraged in the library's
19# docs: http://redis-collections.readthedocs.io/en/stable/usage-notes.html
20
21T = TypeVar('T')
22
23
24class RedisList(redis_collections.List):
25 """
26 List-like interface serializing elements to a Redis datastore.
27
28 Notes:
29 - Provides persistence across sessions
30 - Mutable elements handled correctly
31 - Not expected to be thread safe, but could be extended
32 """
33
34 def __init__(self, client, key, serialize, deserialize):
35 """
36 Initialize instance.
37
38 Args:
39 client (redis.Redis): Redis client object
40 key (str): key where this container's elements are stored in Redis
41 serialize (function (any) -> bytes):
42 function called to serialize an element
43 deserialize (function (bytes) -> any):
44 function called to deserialize an element
45 Returns:
46 redis_list (redis_collections.List): persistent list-like interface
47 """
48 self._pickle = serialize
49 self._unpickle = deserialize
50 super().__init__(redis=client, key=key, writeback=True)
51
52 def __copy__(self):
53 return [elt for elt in self]
54
55 def __deepcopy__(self, memo):
56 return [deepcopy(elt, memo) for elt in self]
57
58
59class RedisSet(redis_collections.Set):
60 """
61 Set-like interface serializing elements to a Redis datastore.
62
63 Notes:
64 - Provides persistence across sessions
65 - Mutable elements _not_ handled correctly:
66 - Get/set mutable elements supported
67 - Don't update the contents of a mutable element and
68 expect things to go well
69 - Expected to be thread safe, but not tested
70 """
71
72 def __init__(self, client, key, serialize, deserialize):
73 """
74 Initialize instance.
75
76 Args:
77 client (redis.Redis): Redis client object
78 key (str): key where this container's elements are stored in Redis
79 serialize (function (any) -> bytes):
80 function called to serialize an element
81 deserialize (function (bytes) -> any):
82 function called to deserialize an element
83 Returns:
84 redis_set (redis_collections.Set): persistent set-like interface
85 """
86 # NOTE: redis_collections.Set doesn't have a writeback option, causing
87 # issue when mutable elements are updated in-place.
88 self._pickle = serialize
89 self._unpickle = deserialize
90 super().__init__(redis=client, key=key)
91
92 def __copy__(self):
93 return {elt for elt in self}
94
95 def __deepcopy__(self, memo):
96 return {deepcopy(elt, memo) for elt in self}
97
98
99class RedisHashDict(redis_collections.DefaultDict):
100 """
101 Dict-like interface serializing elements to a Redis datastore. This dict
102 utilizes Redis's hashmap functionality
103
104 Notes:
105 - Keys must be string-like and are serialized to plaintext (UTF-8)
106 - Provides persistence across sessions
107 - Mutable elements handled correctly
108 - Not expected to be thread safe, but could be extended
109 - Keys are serialized in plaintext
110 """
111
112 @staticmethod
113 def serialize_key(key):
114 """ Serialize key to plaintext. """
115 return key
116
117 @staticmethod
118 def deserialize_key(serialized):
119 """ Deserialize key from plaintext encoded as UTF-8 bytes. """
120 return serialized.decode('utf-8') # Redis returns bytes
121
122 def __init__(
123 self, client, key, serialize, deserialize,
124 default_factory=None, writeback=False,
125 ):
126 """
127 Initialize instance.
128
129 Args:
130 client (redis.Redis): Redis client object
131 key (str): key where this container's elements are stored in Redis
132 serialize (function (any) -> bytes):
133 function called to serialize a value
134 deserialize (function (bytes) -> any):
135 function called to deserialize a value
136 default_factory: function that provides default value for a
137 non-existent key
138 writeback (bool): if writeback is set to true, dict maintains a
139 local cache of values and the `sync` method can be called to
140 store these values. NOTE: only use this option if syncing
141 between services is not important.
142
143 Returns:
144 redis_dict (redis_collections.Dict): persistent dict-like interface
145 """
146 # Key serialization (to/from plaintext)
147 self._pickle_key = RedisHashDict.serialize_key
148 self._unpickle_key = RedisHashDict.deserialize_key
149 # Value serialization
150 self._pickle_value = serialize
151 self._unpickle = deserialize
152 super().__init__(
153 default_factory, redis=client, key=key, writeback=writeback,
154 )
155
156 def __setitem__(self, key, value):
157 """Set ``d[key]`` to *value*.
158
159 Override in order to increment version on each update
160 """
161 version = self.get_version(key)
162 pickled_key = self._pickle_key(key)
163 pickled_value = self._pickle_value(value, version + 1)
164 self.redis.hset(self.key, pickled_key, pickled_value)
165
166 if self.writeback:
167 self.cache[key] = value
168
169 def __copy__(self):
170 return {key: self[key] for key in self}
171
172 def __deepcopy__(self, memo):
173 return {key: deepcopy(self[key], memo) for key in self}
174
175 def get_version(self, key):
176 """Return the version of the value for key *key*. Returns 0 if
177 key is not in the map
178 """
179 try:
180 value = self.cache[key]
181 except KeyError:
182 pickled_key = self._pickle_key(key)
183 value = self.redis.hget(self.key, pickled_key)
184 if value is None:
185 return 0
186
187 proto_wrapper = RedisState()
188 proto_wrapper.ParseFromString(value)
189 return proto_wrapper.version
190
191
192class RedisFlatDict(MutableMapping[str, T]):
193 """
194 Dict-like interface serializing elements to a Redis datastore. This
195 dict stores key directly (i.e. without a hashmap).
196 """
197
198 def __init__(
199 self, client: redis.Redis, serde: RedisSerde[T],
200 writethrough: bool = False,
201 ):
202 """
203 Args:
204 client (redis.Redis): Redis client object
205 serde (): RedisSerde for de/serializing the object stored
206 writethrough (bool): if writethrough is set to true,
207 RedisFlatDict maintains a local write-through cache of values.
208 """
209 super().__init__()
210 self._writethrough = writethrough
211 self.redis = client
212 self.serde = serde
213 self.redis_type = serde.redis_type
214 self.cache = {}
215 if self._writethrough:
216 self._sync_cache()
217
218 def __len__(self) -> int:
219 """Return the number of items in the dictionary."""
220 if self._writethrough:
221 return len(self.cache)
222
223 return len(self.keys())
224
225 def __iter__(self) -> Iterator[str]:
226 """Return an iterator over the keys of the dictionary."""
227 type_pattern = self._get_redis_type_pattern()
228
229 if self._writethrough:
230 for k in self.cache:
231 split_key, _ = k.split(":", 1)
232 yield split_key
233 else:
234 for k in self.redis.keys(pattern=type_pattern):
235 try:
236 deserialized_key = k.decode('utf-8')
237 split_key = deserialized_key.split(":", 1)
238 except AttributeError:
239 split_key = k.split(":", 1)
240 # There could be a delete key in between KEYS and GET, so ignore
241 # invalid values for now
242 try:
243 if self.is_garbage(split_key[0]):
244 continue
245 except KeyError:
246 continue
247 yield split_key[0]
248
249 def __contains__(self, key: str) -> bool:
250 """Return ``True`` if *key* is present and not garbage,
251 else ``False``.
252 """
253 composite_key = self._make_composite_key(key)
254
255 if self._writethrough:
256 return composite_key in self.cache
257
258 return bool(self.redis.exists(composite_key)) and \
259 not self.is_garbage(key)
260
261 def __getitem__(self, key: str) -> T:
262 """Return the item of dictionary with key *key:type*. Raises a
263 :exc:`KeyError` if *key:type* is not in the map or the object is
264 garbage
265 """
266 if ':' in key:
267 raise ValueError("Key %s cannot contain ':' char" % key)
268 composite_key = self._make_composite_key(key)
269
270 if self._writethrough:
271 cached_value = self.cache.get(composite_key)
272 if cached_value:
273 return cached_value
274
275 serialized_value = self.redis.get(composite_key)
276 if serialized_value is None:
277 raise KeyError(composite_key)
278
279 proto_wrapper = RedisState()
280 proto_wrapper.ParseFromString(serialized_value)
281 if proto_wrapper.is_garbage:
282 raise KeyError("Key %s is garbage" % key)
283
284 return self.serde.deserialize(serialized_value)
285
286 def __setitem__(self, key: str, value: T) -> Any:
287 """Set ``d[key:type]`` to *value*."""
288 if ':' in key:
289 raise ValueError("Key %s cannot contain ':' char" % key)
290 version = self.get_version(key)
291 serialized_value = self.serde.serialize(value, version + 1)
292 composite_key = self._make_composite_key(key)
293 if self._writethrough:
294 self.cache[composite_key] = value
295 return self.redis.set(composite_key, serialized_value)
296
297 def __delitem__(self, key: str) -> int:
298 """Remove ``d[key:type]`` from dictionary.
299 Raises a :func:`KeyError` if *key:type* is not in the map.
300 """
301 if ':' in key:
302 raise ValueError("Key %s cannot contain ':' char" % key)
303 composite_key = self._make_composite_key(key)
304 if self._writethrough:
305 del self.cache[composite_key]
306 deleted_count = self.redis.delete(composite_key)
307 if not deleted_count:
308 raise KeyError(composite_key)
309 return deleted_count
310
311 def get(self, key: str, default=None) -> Optional[T]:
312 """Get ``d[key:type]`` from dictionary.
313 Returns None if *key:type* is not in the map
314 """
315 try:
316 return self.__getitem__(key)
317 except (KeyError, ValueError):
318 return default
319
320 def clear(self) -> None:
321 """
322 Clear all keys in the dictionary. Objects are immediately deleted
323 (i.e. not garbage collected)
324 """
325 if self._writethrough:
326 self.cache.clear()
327 for key in self.keys():
328 composite_key = self._make_composite_key(key)
329 self.redis.delete(composite_key)
330
331 def get_version(self, key: str) -> int:
332 """Return the version of the value for key *key:type*. Returns 0 if
333 key is not in the map
334 """
335 composite_key = self._make_composite_key(key)
336 value = self.redis.get(composite_key)
337 if value is None:
338 return 0
339
340 proto_wrapper = RedisState()
341 proto_wrapper.ParseFromString(value)
342 return proto_wrapper.version
343
344 def keys(self) -> List[str]:
345 """Return a copy of the dictionary's list of keys
346 Note: for redis *key:type* key is returned
347 """
348 if self._writethrough:
349 return list(self.cache.keys())
350
351 return list(self.__iter__())
352
353 def mark_as_garbage(self, key: str) -> Any:
354 """Mark ``d[key:type]`` for garbage collection
355 Raises a KeyError if *key:type* is not in the map.
356 """
357 composite_key = self._make_composite_key(key)
358 value = self.redis.get(composite_key)
359 if value is None:
360 raise KeyError(composite_key)
361
362 proto_wrapper = RedisState()
363 proto_wrapper.ParseFromString(value)
364 proto_wrapper.is_garbage = True
365 garbage_serialized = proto_wrapper.SerializeToString()
366 return self.redis.set(composite_key, garbage_serialized)
367
368 def is_garbage(self, key: str) -> bool:
369 """Return if d[key:type] has been marked for garbage collection.
370 Raises a KeyError if *key:type* is not in the map.
371 """
372 composite_key = self._make_composite_key(key)
373 value = self.redis.get(composite_key)
374 if value is None:
375 raise KeyError(composite_key)
376
377 proto_wrapper = RedisState()
378 proto_wrapper.ParseFromString(value)
379 return proto_wrapper.is_garbage
380
381 def garbage_keys(self) -> List[str]:
382 """Return a copy of the dictionary's list of keys that are garbage
383 Note: for redis *key:type* key is returned
384 """
385 garbage_keys = []
386 type_pattern = self._get_redis_type_pattern()
387 for k in self.redis.keys(pattern=type_pattern):
388 try:
389 deserialized_key = k.decode('utf-8')
390 split_key = deserialized_key.split(":", 1)
391 except AttributeError:
392 split_key = k.split(":", 1)
393 # There could be a delete key in between KEYS and GET, so ignore
394 # invalid values for now
395 try:
396 if not self.is_garbage(split_key[0]):
397 continue
398 except KeyError:
399 continue
400 garbage_keys.append(split_key[0])
401 return garbage_keys
402
403 def delete_garbage(self, key) -> bool:
404 """Remove ``d[key:type]`` from dictionary iff the object is garbage
405 Returns False if *key:type* is not in the map
406 """
407 if not self.is_garbage(key):
408 return False
409 count = self.__delitem__(key)
410 return count > 0
411
412 def lock(self, key: str) -> Lock:
413 """Lock the dictionary for key *key*"""
414 return redis_lock.Lock(
415 self.redis,
416 name=self._make_composite_key(key) + ":lock",
417 expire=60,
418 auto_renewal=True,
419 strict=False,
420 )
421
422 def _sync_cache(self):
423 """
424 Syncs write-through cache with redis data on store.
425 """
426 type_pattern = self._get_redis_type_pattern()
427 for k in self.redis.keys(pattern=type_pattern):
428 composite_key = k.decode('utf-8')
429 serialized_value = self.redis.get(composite_key)
430 value = self.serde.deserialize(serialized_value)
431 self.cache[composite_key] = value
432
433 def _get_redis_type_pattern(self):
434 return "*:" + self.redis_type
435
436 def _make_composite_key(self, key):
437 return key + ":" + self.redis_type