blob: 77451c81998a9d0f3baf8d1bc81f3e971e331e26 [file] [log] [blame]
Richard Jankowski8af3c0e2018-08-14 16:07:18 -04001# Copyright 2017-present Open Networking Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15################################################################################
16#
17# Most of the txaioetcd methods provide a timeout parameter. This parameter
18# is likely intended to limit the amount of time spent by any one method
19# waiting for a response from the etcd server. However, if the server is
20# down, the method immediately throws a ConnectionRefusedError exception;
21# it does not perform any retries. The timeout parameter provided by the
22# methods in EtcdClient cover this contingency.
23#
24################################################################################
25
26from common.kvstore.kv_client import DEFAULT_TIMEOUT, Event, KVClient, KVPair
27from structlog import get_logger
28from twisted.internet import reactor
29from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
30from twisted.internet.error import ConnectionRefusedError
31from txaioetcd import Client, CompVersion, Failed, KeySet, OpGet, OpSet, Transaction
32
33log = get_logger()
34
35class EtcdClient(KVClient):
36
37 def __init__(self, kv_host, kv_port):
38 KVClient.__init__(self, kv_host, kv_port)
39 self.url = u'http://' + kv_host + u':' + str(kv_port)
40 self.client = Client(reactor, self.url)
41
42 @inlineCallbacks
43 def get(self, key, timeout=DEFAULT_TIMEOUT):
44 result = yield self._op_with_retry('GET', key, None, timeout)
45 returnValue(result)
46
47 @inlineCallbacks
48 def list(self, key, timeout=DEFAULT_TIMEOUT):
49 result = yield self._op_with_retry('LIST', key, None, timeout)
50 returnValue(result)
51
52 @inlineCallbacks
53 def put(self, key, value, timeout=DEFAULT_TIMEOUT):
54 _, err = yield self._op_with_retry('PUT', key, value, timeout)
55 returnValue(err)
56
57 @inlineCallbacks
58 def delete(self, key, timeout=DEFAULT_TIMEOUT):
59 _, err = yield self._op_with_retry('DELETE', key, None, timeout)
60 returnValue(err)
61
62 @inlineCallbacks
63 def reserve(self, key, value, ttl, timeout=DEFAULT_TIMEOUT):
64 result = yield self._op_with_retry('RESERVE', key, value, timeout, ttl=ttl)
65 returnValue(result)
66
67 @inlineCallbacks
68 def renew_reservation(self, key, timeout=DEFAULT_TIMEOUT):
69 result, err = yield self._op_with_retry('RENEW', key, None, timeout)
70 returnValue(err)
71
72 @inlineCallbacks
73 def release_reservation(self, key, timeout=DEFAULT_TIMEOUT):
74 result, err = yield self._op_with_retry('RELEASE', key, None, timeout)
75 returnValue(err)
76
77 @inlineCallbacks
78 def release_all_reservations(self, timeout=DEFAULT_TIMEOUT):
79 result, err = yield self._op_with_retry('RELEASE-ALL', None, None, timeout)
80 returnValue(err)
81
82 @inlineCallbacks
83 def watch(self, key, key_change_callback, timeout=DEFAULT_TIMEOUT):
84 self.key_watches[key] = key_change_callback
85 result = yield self._op_with_retry('WATCH', key, None, timeout, callback=self.key_changed)
86 returnValue(result)
87
88 def key_changed(self, kv):
89 key = kv.key
90 value = kv.value
91 log.debug('key-changed', key=key, value=value)
92 # Notify client of key change event
93 if value is not None:
94 evt = Event(Event.PUT, key, value)
95 else:
96 evt = Event(Event.DELETE, key, None)
97 if key in self.key_watches:
98 self.key_watches[key](evt)
99
100 def close_watch(self, key, timeout=DEFAULT_TIMEOUT):
101 log.debug('close-watch', key=key)
102 if key in self.key_watches:
103 self.key_watches.pop(key)
104
105 @inlineCallbacks
106 def _op_with_retry(self, operation, key, value, timeout, *args, **kw):
107 log.debug('kv-op', operation=operation, key=key, timeout=timeout, args=args, kw=kw)
108 err = None
109 result = None
110 if type(key) == str:
111 key = bytes(key)
112 if value is not None:
113 value = bytes(value)
114 while True:
115 try:
116 if operation == 'GET':
117 result = yield self._get(key)
118 elif operation == 'LIST':
119 result, err = yield self._list(key)
120 elif operation == 'PUT':
121 # Put returns an object of type Revision
122 result = yield self.client.set(key, value, **kw)
123 elif operation == 'DELETE':
124 # Delete returns an object of type Deleted
125 result = yield self.client.delete(key)
126 elif operation == 'RESERVE':
127 result, err = yield self._reserve(key, value, **kw)
128 elif operation == 'RENEW':
129 result, err = yield self._renew_reservation(key)
130 elif operation == 'RELEASE':
131 result, err = yield self._release_reservation(key)
132 elif operation == 'RELEASE-ALL':
133 err = yield self._release_all_reservations()
134 elif operation == 'WATCH':
135 for name, val in kw.items():
136 if name == 'callback':
137 callback = val
138 break
139 result = self.client.watch([KeySet(key, prefix=True)], callback)
140 log.debug('watch-response', result=result)
141 self._clear_backoff()
142 break
143 except ConnectionRefusedError as ex:
144 log.error('comms-exception', ex=ex)
145 yield self._backoff('etcd-not-up')
146 except Exception as ex:
147 log.error('etcd-exception', ex=ex)
148 err = ex
149
150 if timeout > 0 and self.retry_time > timeout:
151 err = 'operation-timed-out'
152 if err is not None:
153 self._clear_backoff()
154 break
155
156 returnValue((result, err))
157
158 @inlineCallbacks
159 def _get(self, key):
160 kvp = None
161 resp = yield self.client.get(key)
162 if resp.kvs is not None and len(resp.kvs) == 1:
163 kv = resp.kvs[0]
164 kvp = KVPair(kv.key, kv.value, kv.mod_revision)
165 returnValue(kvp)
166
167 @inlineCallbacks
168 def _list(self, key):
169 err = None
170 list = []
171 resp = yield self.client.get(KeySet(key, prefix=True))
172 if resp.kvs is not None and len(resp.kvs) > 0:
173 for kv in resp.kvs:
174 list.append(KVPair(kv.key, kv.value, kv.mod_revision))
175 returnValue((list, err))
176
177 @inlineCallbacks
178 def _reserve(self, key, value, **kw):
179 for name, val in kw.items():
180 if name == 'ttl':
181 ttl = val
182 break
183 reserved = False
184 err = 'reservation-failed'
185 owner = None
186
187 # Create a lease
188 lease = yield self.client.lease(ttl)
189
190 # Create a transaction
191 txn = Transaction(
192 compare=[ CompVersion(key, '==', 0) ],
193 success=[ OpSet(key, bytes(value), lease=lease) ],
194 failure=[ OpGet(key) ]
195 )
196 newly_acquired = False
197 try:
198 result = yield self.client.submit(txn)
199 except Failed as failed:
200 log.debug('key-already-present', key=key)
201 if len(failed.responses) > 0:
202 response = failed.responses[0]
203 if response.kvs is not None and len(response.kvs) > 0:
204 kv = response.kvs[0]
205 log.debug('key-already-present', value=kv.value)
206 if kv.value == value:
207 reserved = True
208 log.debug('key-already-reserved', key = kv.key, value=kv.value)
209 else:
210 newly_acquired = True
211 log.debug('key-was-absent', key=key, result=result)
212
213 # Check if reservation succeeded
214 resp = yield self.client.get(key)
215 if resp.kvs is not None and len(resp.kvs) == 1:
216 owner = resp.kvs[0].value
217 if owner == value:
218 if newly_acquired:
219 log.debug('key-reserved', key=key, value=value, ttl=ttl,
220 lease_id=lease.lease_id)
221 reserved = True
222 # Add key to reservation list
223 self.key_reservations[key] = lease
224 else:
225 log.debug("reservation-still-held")
226 else:
227 log.debug('reservation-held-by-another', value=owner)
228
229 if reserved:
230 err = None
231 returnValue((owner, err))
232
233 @inlineCallbacks
234 def _renew_reservation(self, key):
235 result = None
236 err = None
237 if key not in self.key_reservations:
238 err = 'key-not-reserved'
239 else:
240 lease = self.key_reservations[key]
241 # A successfully refreshed lease returns an object of type Header
242 result = yield lease.refresh()
243 if result is None:
244 err = 'lease-refresh-failed'
245 returnValue((result, err))
246
247 @inlineCallbacks
248 def _release_reservation(self, key):
249 err = None
250 if key not in self.key_reservations:
251 err = 'key-not-reserved'
252 else:
253 lease = self.key_reservations[key]
254 time_left = yield lease.remaining()
255 # A successfully revoked lease returns an object of type Header
256 log.debug('release-reservation', key=key, lease_id=lease.lease_id,
257 time_left_in_secs=time_left)
258 result = yield lease.revoke()
259 if result is None:
260 err = 'lease-revoke-failed'
261 self.key_reservations.pop(key)
262 returnValue((result, err))
263
264 @inlineCallbacks
265 def _release_all_reservations(self):
266 err = None
267 keys_to_delete = []
268 for key in self.key_reservations:
269 lease = self.key_reservations[key]
270 time_left = yield lease.remaining()
271 # A successfully revoked lease returns an object of type Header
272 log.debug('release-reservation', key=key, lease_id=lease.lease_id,
273 time_left_in_secs=time_left)
274 result = yield lease.revoke()
275 if result is None:
276 err = 'lease-revoke-failed'
277 log.debug('lease-revoke', result=result)
278 keys_to_delete.append(key)
279 for key in keys_to_delete:
280 self.key_reservations.pop(key)
281 returnValue(err)