VOL-1101: Implement a Twisted Python shim layer to interface with any KV store
- Moved methods get, list, put, delete, reserve, renew_reservation,
release_reservation, and release_all_reservations from each of the
implementation classes to the parent KVClient.
- Did not move method _op_with_retry because it invokes different
target methods and handles different target exceptions, where
the target is Etcd or Consul.
Change-Id: I7c5c06ab5d554e17b8f7658b9b11b22cb8e492d8
diff --git a/common/kvstore/__init__.py b/common/kvstore/__init__.py
index b0fb0b2..4a82628 100644
--- a/common/kvstore/__init__.py
+++ b/common/kvstore/__init__.py
@@ -1,4 +1,4 @@
-# Copyright 2017-present Open Networking Foundation
+# Copyright 2018-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
diff --git a/common/kvstore/consul_client.py b/common/kvstore/consul_client.py
index 825d673..bc14759 100644
--- a/common/kvstore/consul_client.py
+++ b/common/kvstore/consul_client.py
@@ -1,4 +1,4 @@
-# Copyright 2017-present Open Networking Foundation
+# Copyright 2018-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -28,47 +28,6 @@
KVClient.__init__(self, kv_host, kv_port)
self.session_id = None
self.client = Consul(kv_host, kv_port)
- self.watcher = None
-
- @inlineCallbacks
- def get(self, key, timeout=DEFAULT_TIMEOUT):
- result = yield self._op_with_retry('GET', key, None, timeout)
- returnValue(result)
-
- @inlineCallbacks
- def list(self, key, timeout=DEFAULT_TIMEOUT):
- result = yield self._op_with_retry('LIST', key, None, timeout)
- returnValue(result)
-
- @inlineCallbacks
- def put(self, key, value, timeout=DEFAULT_TIMEOUT):
- _, err = yield self._op_with_retry('PUT', key, value, timeout)
- returnValue(err)
-
- @inlineCallbacks
- def delete(self, key, timeout=DEFAULT_TIMEOUT):
- _, err = yield self._op_with_retry('DELETE', key, None, timeout)
- returnValue(err)
-
- @inlineCallbacks
- def reserve(self, key, value, ttl, timeout=DEFAULT_TIMEOUT):
- result = yield self._op_with_retry('RESERVE', key, value, timeout, ttl=ttl)
- returnValue(result)
-
- @inlineCallbacks
- def renew_reservation(self, key, timeout=DEFAULT_TIMEOUT):
- result, err = yield self._op_with_retry('RENEW', key, None, timeout)
- returnValue(err)
-
- @inlineCallbacks
- def release_reservation(self, key, timeout=DEFAULT_TIMEOUT):
- result, err = yield self._op_with_retry('RELEASE', key, None, timeout)
- returnValue(err)
-
- @inlineCallbacks
- def release_all_reservations(self, timeout=DEFAULT_TIMEOUT):
- result, err = yield self._op_with_retry('RELEASE-ALL', None, None, timeout)
- returnValue(err)
def watch(self, key, key_change_callback, timeout=DEFAULT_TIMEOUT):
self._retriggering_watch(key, key_change_callback, timeout)
diff --git a/common/kvstore/etcd_client.py b/common/kvstore/etcd_client.py
index 77451c8..a958b71 100644
--- a/common/kvstore/etcd_client.py
+++ b/common/kvstore/etcd_client.py
@@ -1,4 +1,4 @@
-# Copyright 2017-present Open Networking Foundation
+# Copyright 2018-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -40,46 +40,6 @@
self.client = Client(reactor, self.url)
@inlineCallbacks
- def get(self, key, timeout=DEFAULT_TIMEOUT):
- result = yield self._op_with_retry('GET', key, None, timeout)
- returnValue(result)
-
- @inlineCallbacks
- def list(self, key, timeout=DEFAULT_TIMEOUT):
- result = yield self._op_with_retry('LIST', key, None, timeout)
- returnValue(result)
-
- @inlineCallbacks
- def put(self, key, value, timeout=DEFAULT_TIMEOUT):
- _, err = yield self._op_with_retry('PUT', key, value, timeout)
- returnValue(err)
-
- @inlineCallbacks
- def delete(self, key, timeout=DEFAULT_TIMEOUT):
- _, err = yield self._op_with_retry('DELETE', key, None, timeout)
- returnValue(err)
-
- @inlineCallbacks
- def reserve(self, key, value, ttl, timeout=DEFAULT_TIMEOUT):
- result = yield self._op_with_retry('RESERVE', key, value, timeout, ttl=ttl)
- returnValue(result)
-
- @inlineCallbacks
- def renew_reservation(self, key, timeout=DEFAULT_TIMEOUT):
- result, err = yield self._op_with_retry('RENEW', key, None, timeout)
- returnValue(err)
-
- @inlineCallbacks
- def release_reservation(self, key, timeout=DEFAULT_TIMEOUT):
- result, err = yield self._op_with_retry('RELEASE', key, None, timeout)
- returnValue(err)
-
- @inlineCallbacks
- def release_all_reservations(self, timeout=DEFAULT_TIMEOUT):
- result, err = yield self._op_with_retry('RELEASE-ALL', None, None, timeout)
- returnValue(err)
-
- @inlineCallbacks
def watch(self, key, key_change_callback, timeout=DEFAULT_TIMEOUT):
self.key_watches[key] = key_change_callback
result = yield self._op_with_retry('WATCH', key, None, timeout, callback=self.key_changed)
@@ -137,7 +97,6 @@
callback = val
break
result = self.client.watch([KeySet(key, prefix=True)], callback)
- log.debug('watch-response', result=result)
self._clear_backoff()
break
except ConnectionRefusedError as ex:
diff --git a/common/kvstore/kv_client.py b/common/kvstore/kv_client.py
index 1a7d207..69a6480 100644
--- a/common/kvstore/kv_client.py
+++ b/common/kvstore/kv_client.py
@@ -1,4 +1,4 @@
-# Copyright 2017-present Open Networking Foundation
+# Copyright 2018-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,7 +14,7 @@
from common.utils.asleep import asleep
from structlog import get_logger
-from twisted.internet.defer import inlineCallbacks
+from twisted.internet.defer import inlineCallbacks, returnValue
log = get_logger()
@@ -58,7 +58,8 @@
:param timeout: The length of time in seconds the method will wait for a response
:return: (KVPair, error) where KVPair is None if an error occurred
'''
- raise NotImplementedError('Method not implemented')
+ result = yield self._op_with_retry('GET', key, None, timeout)
+ returnValue(result)
@inlineCallbacks
def list(self, key, timeout=DEFAULT_TIMEOUT):
@@ -70,7 +71,8 @@
:param timeout: The length of time in seconds the method will wait for a response
:return: ([]KVPair, error) where []KVPair is a list of KVPair objects
'''
- raise NotImplementedError('Method not implemented')
+ result = yield self._op_with_retry('LIST', key, None, timeout)
+ returnValue(result)
@inlineCallbacks
def put(self, key, value, timeout=DEFAULT_TIMEOUT):
@@ -85,7 +87,8 @@
:param timeout: The length of time in seconds the method will wait for a response
:return: error, which is set to None for a successful write
'''
- raise NotImplementedError('Method not implemented')
+ _, err = yield self._op_with_retry('PUT', key, value, timeout)
+ returnValue(err)
@inlineCallbacks
def delete(self, key, timeout=DEFAULT_TIMEOUT):
@@ -96,7 +99,8 @@
:param timeout: The length of time in seconds the method will wait for a response
:return: error, which is set to None for a successful deletion
'''
- raise NotImplementedError('Method not implemented')
+ _, err = yield self._op_with_retry('DELETE', key, None, timeout)
+ returnValue(err)
@inlineCallbacks
def reserve(self, key, value, ttl, timeout=DEFAULT_TIMEOUT):
@@ -116,7 +120,8 @@
be the value passed in. If the key is already acquired, then the value assigned
to that key will be returned.
'''
- raise NotImplementedError('Method not implemented')
+ result = yield self._op_with_retry('RESERVE', key, value, timeout, ttl=ttl)
+ returnValue(result)
@inlineCallbacks
def renew_reservation(self, key, timeout=DEFAULT_TIMEOUT):
@@ -128,7 +133,8 @@
:param timeout: The length of time in seconds the method will wait for a response
:return: error, which is set to None for a successful renewal
'''
- raise NotImplementedError('Method not implemented')
+ result, err = yield self._op_with_retry('RENEW', key, None, timeout)
+ returnValue(err)
@inlineCallbacks
def release_reservation(self, key, timeout=DEFAULT_TIMEOUT):
@@ -139,7 +145,8 @@
:param timeout: The length of time in seconds the method will wait for a response
:return: error, which is set to None for a successful cancellation
'''
- raise NotImplementedError('Method not implemented')
+ result, err = yield self._op_with_retry('RELEASE', key, None, timeout)
+ returnValue(err)
@inlineCallbacks
def release_all_reservations(self, timeout=DEFAULT_TIMEOUT):
@@ -150,7 +157,8 @@
:param timeout: The length of time in seconds the method will wait for a response
:return: error, which is set to None for a successful cancellation
'''
- raise NotImplementedError('Method not implemented')
+ result, err = yield self._op_with_retry('RELEASE-ALL', None, None, timeout)
+ returnValue(err)
@inlineCallbacks
def watch(self, key, key_change_callback, timeout=DEFAULT_TIMEOUT):
@@ -178,6 +186,9 @@
'''
raise NotImplementedError('Method not implemented')
+ @inlineCallbacks
+ def _op_with_retry(self, operation, key, value, timeout, *args, **kw):
+ raise NotImplementedError('Method not implemented')
def _backoff(self, msg):
wait_time = RETRY_BACKOFF[min(self.retries, len(RETRY_BACKOFF) - 1)]
diff --git a/common/kvstore/kvstore.py b/common/kvstore/kvstore.py
index a7f7390..662b34d 100644
--- a/common/kvstore/kvstore.py
+++ b/common/kvstore/kvstore.py
@@ -1,4 +1,4 @@
-# Copyright 2017-present Open Networking Foundation
+# Copyright 2018-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.