VOL-1101: Implement a Twisted Python shim layer to interface with any KV store
- Moved methods get, list, put, delete, reserve, renew_reservation,
release_reservation, and release_all_reservations from each of the
implementation classes to the parent KVClient.
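- Did not move method _op_with_retry because it is backend-specific:
it invokes different target methods and handles different exceptions
depending on whether the target is etcd or Consul. KVClient only
declares a stub that raises NotImplementedError for subclasses to
override.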
Change-Id: I7c5c06ab5d554e17b8f7658b9b11b22cb8e492d8
diff --git a/common/kvstore/kv_client.py b/common/kvstore/kv_client.py
index 1a7d207..69a6480 100644
--- a/common/kvstore/kv_client.py
+++ b/common/kvstore/kv_client.py
@@ -1,4 +1,4 @@
-# Copyright 2017-present Open Networking Foundation
+# Copyright 2018-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,7 +14,7 @@
from common.utils.asleep import asleep
from structlog import get_logger
-from twisted.internet.defer import inlineCallbacks
+from twisted.internet.defer import inlineCallbacks, returnValue
log = get_logger()
@@ -58,7 +58,8 @@
:param timeout: The length of time in seconds the method will wait for a response
:return: (KVPair, error) where KVPair is None if an error occurred
'''
- raise NotImplementedError('Method not implemented')
+ result = yield self._op_with_retry('GET', key, None, timeout)
+ returnValue(result)
@inlineCallbacks
def list(self, key, timeout=DEFAULT_TIMEOUT):
@@ -70,7 +71,8 @@
:param timeout: The length of time in seconds the method will wait for a response
:return: ([]KVPair, error) where []KVPair is a list of KVPair objects
'''
- raise NotImplementedError('Method not implemented')
+ result = yield self._op_with_retry('LIST', key, None, timeout)
+ returnValue(result)
@inlineCallbacks
def put(self, key, value, timeout=DEFAULT_TIMEOUT):
@@ -85,7 +87,8 @@
:param timeout: The length of time in seconds the method will wait for a response
:return: error, which is set to None for a successful write
'''
- raise NotImplementedError('Method not implemented')
+ _, err = yield self._op_with_retry('PUT', key, value, timeout)
+ returnValue(err)
@inlineCallbacks
def delete(self, key, timeout=DEFAULT_TIMEOUT):
@@ -96,7 +99,8 @@
:param timeout: The length of time in seconds the method will wait for a response
:return: error, which is set to None for a successful deletion
'''
- raise NotImplementedError('Method not implemented')
+ _, err = yield self._op_with_retry('DELETE', key, None, timeout)
+ returnValue(err)
@inlineCallbacks
def reserve(self, key, value, ttl, timeout=DEFAULT_TIMEOUT):
@@ -116,7 +120,8 @@
be the value passed in. If the key is already acquired, then the value assigned
to that key will be returned.
'''
- raise NotImplementedError('Method not implemented')
+ result = yield self._op_with_retry('RESERVE', key, value, timeout, ttl=ttl)
+ returnValue(result)
@inlineCallbacks
def renew_reservation(self, key, timeout=DEFAULT_TIMEOUT):
@@ -128,7 +133,8 @@
:param timeout: The length of time in seconds the method will wait for a response
:return: error, which is set to None for a successful renewal
'''
- raise NotImplementedError('Method not implemented')
+ result, err = yield self._op_with_retry('RENEW', key, None, timeout)
+ returnValue(err)
@inlineCallbacks
def release_reservation(self, key, timeout=DEFAULT_TIMEOUT):
@@ -139,7 +145,8 @@
:param timeout: The length of time in seconds the method will wait for a response
:return: error, which is set to None for a successful cancellation
'''
- raise NotImplementedError('Method not implemented')
+ result, err = yield self._op_with_retry('RELEASE', key, None, timeout)
+ returnValue(err)
@inlineCallbacks
def release_all_reservations(self, timeout=DEFAULT_TIMEOUT):
@@ -150,7 +157,8 @@
:param timeout: The length of time in seconds the method will wait for a response
:return: error, which is set to None for a successful cancellation
'''
- raise NotImplementedError('Method not implemented')
+ result, err = yield self._op_with_retry('RELEASE-ALL', None, None, timeout)
+ returnValue(err)
@inlineCallbacks
def watch(self, key, key_change_callback, timeout=DEFAULT_TIMEOUT):
@@ -178,6 +186,9 @@
'''
raise NotImplementedError('Method not implemented')
+ @inlineCallbacks
+ def _op_with_retry(self, operation, key, value, timeout, *args, **kw):
+ raise NotImplementedError('Method not implemented')
def _backoff(self, msg):
wait_time = RETRY_BACKOFF[min(self.retries, len(RETRY_BACKOFF) - 1)]