blob: 69a648086a3fd816e8cec7d53131f88cc00f556b [file] [log] [blame]
Stephane Barbarie6e1bd502018-11-05 22:44:45 -05001# Copyright 2018-present Open Networking Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from common.utils.asleep import asleep
16from structlog import get_logger
17from twisted.internet.defer import inlineCallbacks, returnValue
18
19log = get_logger()
20
21class KVPair():
22 def __init__(self, key, value, index):
23 self.key = key
24 self.value = value
25 self.index = index
26
27class Event():
28 PUT = 0
29 DELETE = 1
30 CONNECTION_DOWN = 2
31
32 def __init__(self, event_type, key, value):
33 self.event_type = event_type
34 self.key = key
35 self.value = value
36
37RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
38DEFAULT_TIMEOUT = 0.0
39for i in range(len(RETRY_BACKOFF)):
40 DEFAULT_TIMEOUT += RETRY_BACKOFF[i]
41
42class KVClient():
43
44 def __init__(self, kv_host, kv_port):
45 self.host = kv_host
46 self.port = kv_port
47 self.key_reservations = {}
48 self.key_watches = {}
49 self.retries = 0
50 self.retry_time = 0
51
52 @inlineCallbacks
53 def get(self, key, timeout=DEFAULT_TIMEOUT):
54 '''
55 This method returns the value of the given key in KV store.
56
57 :param key: The key whose value is requested
58 :param timeout: The length of time in seconds the method will wait for a response
59 :return: (KVPair, error) where KVPair is None if an error occurred
60 '''
61 result = yield self._op_with_retry('GET', key, None, timeout)
62 returnValue(result)
63
64 @inlineCallbacks
65 def list(self, key, timeout=DEFAULT_TIMEOUT):
66 '''
67 The list method returns an array of key-value pairs all of which
68 share the same key prefix.
69
70 :param key: The key prefix
71 :param timeout: The length of time in seconds the method will wait for a response
72 :return: ([]KVPair, error) where []KVPair is a list of KVPair objects
73 '''
74 result = yield self._op_with_retry('LIST', key, None, timeout)
75 returnValue(result)
76
77 @inlineCallbacks
78 def put(self, key, value, timeout=DEFAULT_TIMEOUT):
79 '''
80 The put method writes a value to the given key in KV store.
81 Do NOT modify a reserved key in an etcd store; doing so seems
82 to nullify the TTL of the key. In other words, the key lasts
83 forever.
84
85 :param key: The key to be written to
86 :param value: The value of the key
87 :param timeout: The length of time in seconds the method will wait for a response
88 :return: error, which is set to None for a successful write
89 '''
90 _, err = yield self._op_with_retry('PUT', key, value, timeout)
91 returnValue(err)
92
93 @inlineCallbacks
94 def delete(self, key, timeout=DEFAULT_TIMEOUT):
95 '''
96 The delete method removes a key from the KV store.
97
98 :param key: The key to be deleted
99 :param timeout: The length of time in seconds the method will wait for a response
100 :return: error, which is set to None for a successful deletion
101 '''
102 _, err = yield self._op_with_retry('DELETE', key, None, timeout)
103 returnValue(err)
104
105 @inlineCallbacks
106 def reserve(self, key, value, ttl, timeout=DEFAULT_TIMEOUT):
107 '''
108 This method acts essentially like a semaphore. The underlying mechanism
109 differs depending on the KV store: etcd uses a test-and-set transaction;
110 consul uses an acquire lock. If using etcd, do NOT write to the key
111 subsequent to the initial reservation; the TTL functionality may become
112 impaired (i.e. the reservation never expires).
113
114 :param key: The key under reservation
115 :param value: The reservation owner
116 :param ttl: The time-to-live (TTL) for the reservation. The key is unreserved
117 by the KV store when the TTL expires.
118 :param timeout: The length of time in seconds the method will wait for a response
119 :return: (key_value, error) If the key is acquired, then the value returned will
120 be the value passed in. If the key is already acquired, then the value assigned
121 to that key will be returned.
122 '''
123 result = yield self._op_with_retry('RESERVE', key, value, timeout, ttl=ttl)
124 returnValue(result)
125
126 @inlineCallbacks
127 def renew_reservation(self, key, timeout=DEFAULT_TIMEOUT):
128 '''
129 This method renews the reservation for a given key. A reservation expires
130 after the TTL (Time To Live) period specified when reserving the key.
131
132 :param key: The reserved key
133 :param timeout: The length of time in seconds the method will wait for a response
134 :return: error, which is set to None for a successful renewal
135 '''
136 result, err = yield self._op_with_retry('RENEW', key, None, timeout)
137 returnValue(err)
138
139 @inlineCallbacks
140 def release_reservation(self, key, timeout=DEFAULT_TIMEOUT):
141 '''
142 The release_reservation method cancels the reservation for a given key.
143
144 :param key: The reserved key
145 :param timeout: The length of time in seconds the method will wait for a response
146 :return: error, which is set to None for a successful cancellation
147 '''
148 result, err = yield self._op_with_retry('RELEASE', key, None, timeout)
149 returnValue(err)
150
151 @inlineCallbacks
152 def release_all_reservations(self, timeout=DEFAULT_TIMEOUT):
153 '''
154 This method cancels all key reservations made previously
155 using the reserve API.
156
157 :param timeout: The length of time in seconds the method will wait for a response
158 :return: error, which is set to None for a successful cancellation
159 '''
160 result, err = yield self._op_with_retry('RELEASE-ALL', None, None, timeout)
161 returnValue(err)
162
163 @inlineCallbacks
164 def watch(self, key, key_change_callback, timeout=DEFAULT_TIMEOUT):
165 '''
166 This method provides a watch capability for the given key. If the value of the key
167 changes or the key is deleted, then an event indicating the change is passed to
168 the given callback function.
169
170 :param key: The key to be watched
171 :param key_change_callback: The function invoked whenever the key changes
172 :param timeout: The length of time in seconds the method will wait for a response
173 :return: There is no return; key change events are passed to the callback function
174 '''
175 raise NotImplementedError('Method not implemented')
176
177 @inlineCallbacks
178 def close_watch(self, key, timeout=DEFAULT_TIMEOUT):
179 '''
180 This method closes the watch on the given key. Once the watch is closed, key
181 change events are no longer passed to the key change callback function.
182
183 :param key: The key under watch
184 :param timeout: The length of time in seconds the method will wait for a response
185 :return: There is no return
186 '''
187 raise NotImplementedError('Method not implemented')
188
189 @inlineCallbacks
190 def _op_with_retry(self, operation, key, value, timeout, *args, **kw):
191 raise NotImplementedError('Method not implemented')
192
193 def _backoff(self, msg):
194 wait_time = RETRY_BACKOFF[min(self.retries, len(RETRY_BACKOFF) - 1)]
195 self.retry_time += wait_time
196 self.retries += 1
197 log.error(msg, next_retry_in_secs=wait_time,
198 total_delay_in_secs = self.retry_time,
199 retries=self.retries)
200 return asleep(wait_time)
201
202 def _clear_backoff(self):
203 if self.retries:
204 log.debug('reset-backoff', after_retries=self.retries)
205 self.retries = 0
206 self.retry_time = 0