blob: 1a7d207aba88365c4d51ba0f7aa83db5db32b31a [file] [log] [blame]
Richard Jankowski8af3c0e2018-08-14 16:07:18 -04001# Copyright 2017-present Open Networking Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from common.utils.asleep import asleep
16from structlog import get_logger
17from twisted.internet.defer import inlineCallbacks
18
19log = get_logger()
20
21class KVPair():
22 def __init__(self, key, value, index):
23 self.key = key
24 self.value = value
25 self.index = index
26
27class Event():
28 PUT = 0
29 DELETE = 1
30 CONNECTION_DOWN = 2
31
32 def __init__(self, event_type, key, value):
33 self.event_type = event_type
34 self.key = key
35 self.value = value
36
37RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
38DEFAULT_TIMEOUT = 0.0
39for i in range(len(RETRY_BACKOFF)):
40 DEFAULT_TIMEOUT += RETRY_BACKOFF[i]
41
42class KVClient():
43
44 def __init__(self, kv_host, kv_port):
45 self.host = kv_host
46 self.port = kv_port
47 self.key_reservations = {}
48 self.key_watches = {}
49 self.retries = 0
50 self.retry_time = 0
51
52 @inlineCallbacks
53 def get(self, key, timeout=DEFAULT_TIMEOUT):
54 '''
55 This method returns the value of the given key in KV store.
56
57 :param key: The key whose value is requested
58 :param timeout: The length of time in seconds the method will wait for a response
59 :return: (KVPair, error) where KVPair is None if an error occurred
60 '''
61 raise NotImplementedError('Method not implemented')
62
63 @inlineCallbacks
64 def list(self, key, timeout=DEFAULT_TIMEOUT):
65 '''
66 The list method returns an array of key-value pairs all of which
67 share the same key prefix.
68
69 :param key: The key prefix
70 :param timeout: The length of time in seconds the method will wait for a response
71 :return: ([]KVPair, error) where []KVPair is a list of KVPair objects
72 '''
73 raise NotImplementedError('Method not implemented')
74
75 @inlineCallbacks
76 def put(self, key, value, timeout=DEFAULT_TIMEOUT):
77 '''
78 The put method writes a value to the given key in KV store.
79 Do NOT modify a reserved key in an etcd store; doing so seems
80 to nullify the TTL of the key. In other words, the key lasts
81 forever.
82
83 :param key: The key to be written to
84 :param value: The value of the key
85 :param timeout: The length of time in seconds the method will wait for a response
86 :return: error, which is set to None for a successful write
87 '''
88 raise NotImplementedError('Method not implemented')
89
90 @inlineCallbacks
91 def delete(self, key, timeout=DEFAULT_TIMEOUT):
92 '''
93 The delete method removes a key from the KV store.
94
95 :param key: The key to be deleted
96 :param timeout: The length of time in seconds the method will wait for a response
97 :return: error, which is set to None for a successful deletion
98 '''
99 raise NotImplementedError('Method not implemented')
100
101 @inlineCallbacks
102 def reserve(self, key, value, ttl, timeout=DEFAULT_TIMEOUT):
103 '''
104 This method acts essentially like a semaphore. The underlying mechanism
105 differs depending on the KV store: etcd uses a test-and-set transaction;
106 consul uses an acquire lock. If using etcd, do NOT write to the key
107 subsequent to the initial reservation; the TTL functionality may become
108 impaired (i.e. the reservation never expires).
109
110 :param key: The key under reservation
111 :param value: The reservation owner
112 :param ttl: The time-to-live (TTL) for the reservation. The key is unreserved
113 by the KV store when the TTL expires.
114 :param timeout: The length of time in seconds the method will wait for a response
115 :return: (key_value, error) If the key is acquired, then the value returned will
116 be the value passed in. If the key is already acquired, then the value assigned
117 to that key will be returned.
118 '''
119 raise NotImplementedError('Method not implemented')
120
121 @inlineCallbacks
122 def renew_reservation(self, key, timeout=DEFAULT_TIMEOUT):
123 '''
124 This method renews the reservation for a given key. A reservation expires
125 after the TTL (Time To Live) period specified when reserving the key.
126
127 :param key: The reserved key
128 :param timeout: The length of time in seconds the method will wait for a response
129 :return: error, which is set to None for a successful renewal
130 '''
131 raise NotImplementedError('Method not implemented')
132
133 @inlineCallbacks
134 def release_reservation(self, key, timeout=DEFAULT_TIMEOUT):
135 '''
136 The release_reservation method cancels the reservation for a given key.
137
138 :param key: The reserved key
139 :param timeout: The length of time in seconds the method will wait for a response
140 :return: error, which is set to None for a successful cancellation
141 '''
142 raise NotImplementedError('Method not implemented')
143
144 @inlineCallbacks
145 def release_all_reservations(self, timeout=DEFAULT_TIMEOUT):
146 '''
147 This method cancels all key reservations made previously
148 using the reserve API.
149
150 :param timeout: The length of time in seconds the method will wait for a response
151 :return: error, which is set to None for a successful cancellation
152 '''
153 raise NotImplementedError('Method not implemented')
154
155 @inlineCallbacks
156 def watch(self, key, key_change_callback, timeout=DEFAULT_TIMEOUT):
157 '''
158 This method provides a watch capability for the given key. If the value of the key
159 changes or the key is deleted, then an event indicating the change is passed to
160 the given callback function.
161
162 :param key: The key to be watched
163 :param key_change_callback: The function invoked whenever the key changes
164 :param timeout: The length of time in seconds the method will wait for a response
165 :return: There is no return; key change events are passed to the callback function
166 '''
167 raise NotImplementedError('Method not implemented')
168
169 @inlineCallbacks
170 def close_watch(self, key, timeout=DEFAULT_TIMEOUT):
171 '''
172 This method closes the watch on the given key. Once the watch is closed, key
173 change events are no longer passed to the key change callback function.
174
175 :param key: The key under watch
176 :param timeout: The length of time in seconds the method will wait for a response
177 :return: There is no return
178 '''
179 raise NotImplementedError('Method not implemented')
180
181
182 def _backoff(self, msg):
183 wait_time = RETRY_BACKOFF[min(self.retries, len(RETRY_BACKOFF) - 1)]
184 self.retry_time += wait_time
185 self.retries += 1
186 log.error(msg, next_retry_in_secs=wait_time,
187 total_delay_in_secs = self.retry_time,
188 retries=self.retries)
189 return asleep(wait_time)
190
191 def _clear_backoff(self):
192 if self.retries:
193 log.debug('reset-backoff', after_retries=self.retries)
194 self.retries = 0
195 self.retry_time = 0