blob: e1850e79987971faad192e9f933e5fff49e204c9 [file] [log] [blame]
William Kurkian6f436d02019-02-06 16:25:01 -05001# Copyright 2018-present Open Networking Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
################################################################################
#
# Most of the txaioetcd methods provide a timeout parameter. This parameter
# is likely intended to limit the amount of time spent by any one method
# waiting for a response from the etcd server. However, if the server is
# down, the method immediately throws a ConnectionRefusedError exception;
# it does not perform any retries. The timeout parameter provided by the
# methods in EtcdClient covers this contingency.
#
################################################################################
25
26from kv_client import DEFAULT_TIMEOUT, Event, KVClient, KVPair
27from structlog import get_logger
28from twisted.internet import reactor
29from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
30from twisted.internet.error import ConnectionRefusedError
31from txaioetcd import Client, CompVersion, Failed, KeySet, OpGet, OpSet, Transaction
32
log = get_logger()


class EtcdClient(KVClient):
    """KVClient implementation that talks to an etcd server through txaioetcd.

    Operations are funnelled through ``_op_with_retry``, which retries while
    the etcd server refuses connections and reports the outcome as a
    ``(result, err)`` tuple.

    NOTE(review): ``key_watches``, ``key_reservations``, ``retry_time``,
    ``_backoff`` and ``_clear_backoff`` are used below but not defined in
    this class; presumably they are provided by ``KVClient`` -- confirm.
    """

    def __init__(self, kv_host, kv_port):
        """Build the etcd URL from host/port and create the txaioetcd client."""
        KVClient.__init__(self, kv_host, kv_port)
        self.url = u'http://' + kv_host + u':' + str(kv_port)
        self.client = Client(reactor, self.url)

    @staticmethod
    def _to_bytes(data):
        """Return ``data`` as a byte string.

        Byte strings are returned unchanged, text is UTF-8 encoded, and any
        other object is converted through ``str()`` first. This works on both
        Python 2 and Python 3, unlike a bare ``bytes(data)`` call, which
        raises TypeError for text on Python 3.
        """
        if isinstance(data, bytes):
            return data
        if not isinstance(data, type(u'')):
            data = str(data)
            if isinstance(data, bytes):
                # Python 2: str() already produced a byte string.
                return data
        return data.encode('utf-8')

    @inlineCallbacks
    def watch(self, key, key_change_callback, timeout=DEFAULT_TIMEOUT):
        """Register ``key_change_callback`` for ``key`` and start an etcd watch.

        The callback is stored in ``self.key_watches`` under ``key`` exactly
        as given. The watch itself covers every key prefixed by ``key``, but
        ``key_changed`` only dispatches on an exact match of the changed key.

        Fires with the ``(result, err)`` tuple produced by ``_op_with_retry``.
        """
        self.key_watches[key] = key_change_callback
        result = yield self._op_with_retry('WATCH', key, None, timeout,
                                           callback=self.key_changed)
        returnValue(result)

    def key_changed(self, kv):
        """Translate an etcd key/value notification into an Event.

        A notification whose value is None is reported as ``Event.DELETE``;
        anything else is reported as ``Event.PUT``. The event is delivered
        to the callback registered for exactly ``kv.key``, if there is one.
        """
        key = kv.key
        value = kv.value
        log.debug('key-changed', key=key, value=value)
        # Notify client of key change event
        if value is not None:
            evt = Event(Event.PUT, key, value)
        else:
            evt = Event(Event.DELETE, key, None)
        callback = self.key_watches.get(key)
        if callback is not None:
            callback(evt)

    def close_watch(self, key, timeout=DEFAULT_TIMEOUT):
        """Forget the callback registered for ``key`` (no-op if none).

        ``timeout`` is accepted but not used here.

        NOTE(review): only the local callback entry is removed; nothing in
        this method cancels the watch started by ``watch`` -- confirm whether
        the caller is expected to cancel it.
        """
        log.debug('close-watch', key=key)
        self.key_watches.pop(key, None)

    @inlineCallbacks
    def _op_with_retry(self, operation, key, value, timeout, *args, **kw):
        """Run one KV operation, retrying while etcd refuses connections.

        :param operation: one of 'GET', 'LIST', 'PUT', 'DELETE', 'RESERVE',
            'RENEW', 'RELEASE', 'RELEASE-ALL' or 'WATCH'. Any other name
            performs no operation and yields ``(None, None)``.
        :param key: key to operate on; text keys are UTF-8 encoded.
        :param value: value for 'PUT'/'RESERVE', or None; converted to bytes
            when not None.
        :param timeout: when > 0, retrying stops with the error
            'operation-timed-out' once ``self.retry_time`` exceeds it.
        :param kw: 'PUT' forwards these to ``client.set``; 'RESERVE' requires
            ``ttl``; 'WATCH' requires ``callback``.
        :return: Deferred firing with ``(result, err)``. ``err`` is None on
            success, an error string, or the exception that was caught.
        """
        log.debug('kv-op', operation=operation, key=key, timeout=timeout, args=args, kw=kw)
        err = None
        result = None
        if isinstance(key, type(u'')):
            key = self._to_bytes(key)
        if value is not None:
            value = self._to_bytes(value)
        while True:
            try:
                if operation == 'GET':
                    result = yield self._get(key)
                elif operation == 'LIST':
                    result, err = yield self._list(key)
                elif operation == 'PUT':
                    # Put returns an object of type Revision
                    result = yield self.client.set(key, value, **kw)
                elif operation == 'DELETE':
                    # Delete returns an object of type Deleted
                    result = yield self.client.delete(key)
                elif operation == 'RESERVE':
                    result, err = yield self._reserve(key, value, **kw)
                elif operation == 'RENEW':
                    result, err = yield self._renew_reservation(key)
                elif operation == 'RELEASE':
                    result, err = yield self._release_reservation(key)
                elif operation == 'RELEASE-ALL':
                    err = yield self._release_all_reservations()
                elif operation == 'WATCH':
                    callback = kw.get('callback')
                    if callback is None:
                        # Previously this surfaced as an unbound-local error.
                        raise ValueError("WATCH requires a 'callback' keyword argument")
                    # The value returned by client.watch() is handed back
                    # as-is, without being yielded on.
                    result = self.client.watch([KeySet(key, prefix=True)], callback)
                self._clear_backoff()
                break
            except ConnectionRefusedError as ex:
                # Server is down: wait and go around the loop again.
                log.error('comms-exception', ex=ex)
                yield self._backoff('etcd-not-up')
            except Exception as ex:
                # Any other failure is reported to the caller through err.
                log.error('etcd-exception', ex=ex)
                err = ex

            if timeout > 0 and self.retry_time > timeout:
                err = 'operation-timed-out'
            if err is not None:
                self._clear_backoff()
                break

        returnValue((result, err))

    @inlineCallbacks
    def _get(self, key):
        """Fetch ``key``; fires with a KVPair, or None unless exactly one
        key/value entry is returned."""
        kvp = None
        resp = yield self.client.get(key)
        if resp.kvs is not None and len(resp.kvs) == 1:
            kv = resp.kvs[0]
            kvp = KVPair(kv.key, kv.value, kv.mod_revision)
        returnValue(kvp)

    @inlineCallbacks
    def _list(self, key):
        """Fetch every entry whose key starts with ``key``.

        Fires with ``(pairs, None)`` where ``pairs`` is a (possibly empty)
        list of KVPair objects.
        """
        resp = yield self.client.get(KeySet(key, prefix=True))
        kv_pairs = [KVPair(kv.key, kv.value, kv.mod_revision)
                    for kv in (resp.kvs or [])]
        returnValue((kv_pairs, None))

    @inlineCallbacks
    def _reserve(self, key, value, **kw):
        """Try to reserve ``key`` for ``value`` using a lease of ``ttl``.

        The key is written, attached to a fresh lease, only if it does not
        exist yet (version == 0). The key is then read back to find out who
        owns it.

        :param kw: must contain ``ttl``, which is passed to ``client.lease``.
        :return: Deferred firing with ``(owner, err)``; ``owner`` is the
            value currently stored under ``key`` (None if it could not be
            read back) and ``err`` is None when the reservation is held by
            ``value``, otherwise 'reservation-failed'.
        :raises ValueError: if ``ttl`` is not supplied.
        """
        ttl = kw.get('ttl')
        if ttl is None:
            # Previously this surfaced as an unbound-local error.
            raise ValueError("RESERVE requires a 'ttl' keyword argument")
        # Compare and store the same byte representation that etcd returns.
        value = self._to_bytes(value)
        reserved = False
        err = 'reservation-failed'
        owner = None

        # Create a lease
        # NOTE(review): when the key already exists this lease is never
        # attached to a key and is not revoked here; presumably it simply
        # expires after ttl -- confirm.
        lease = yield self.client.lease(ttl)

        # Create a transaction
        txn = Transaction(
            compare=[CompVersion(key, '==', 0)],
            success=[OpSet(key, value, lease=lease)],
            failure=[OpGet(key)]
        )
        newly_acquired = False
        try:
            result = yield self.client.submit(txn)
        except Failed as failed:
            log.debug('key-already-present', key=key)
            if len(failed.responses) > 0:
                response = failed.responses[0]
                if response.kvs is not None and len(response.kvs) > 0:
                    kv = response.kvs[0]
                    log.debug('key-already-present', value=kv.value)
                    if kv.value == value:
                        reserved = True
                        log.debug('key-already-reserved', key=kv.key, value=kv.value)
        else:
            newly_acquired = True
            log.debug('key-was-absent', key=key, result=result)

        # Check if reservation succeeded
        resp = yield self.client.get(key)
        if resp.kvs is not None and len(resp.kvs) == 1:
            owner = resp.kvs[0].value
            if owner == value:
                if newly_acquired:
                    log.debug('key-reserved', key=key, value=value, ttl=ttl,
                              lease_id=lease.lease_id)
                    reserved = True
                    # Add key to reservation list
                    self.key_reservations[key] = lease
                else:
                    log.debug("reservation-still-held")
            else:
                log.debug('reservation-held-by-another', value=owner)

        if reserved:
            err = None
        returnValue((owner, err))

    @inlineCallbacks
    def _renew_reservation(self, key):
        """Refresh the lease recorded for ``key``.

        Fires with ``(result, err)``; ``err`` is 'key-not-reserved' when no
        lease is recorded for ``key`` and 'lease-refresh-failed' when the
        refresh returns None.
        """
        result = None
        err = None
        if key not in self.key_reservations:
            err = 'key-not-reserved'
        else:
            lease = self.key_reservations[key]
            # A successfully refreshed lease returns an object of type Header
            result = yield lease.refresh()
            if result is None:
                err = 'lease-refresh-failed'
        returnValue((result, err))

    @inlineCallbacks
    def _release_reservation(self, key):
        """Revoke the lease recorded for ``key`` and forget the reservation.

        Fires with ``(result, err)``; ``err`` is 'key-not-reserved' when no
        lease is recorded for ``key`` and 'lease-revoke-failed' when the
        revoke returns None. The reservation entry is removed whenever a
        revoke was attempted.
        """
        # Initialised up front so the 'key-not-reserved' path can return it.
        result = None
        err = None
        if key not in self.key_reservations:
            err = 'key-not-reserved'
        else:
            lease = self.key_reservations[key]
            time_left = yield lease.remaining()
            # A successfully revoked lease returns an object of type Header
            log.debug('release-reservation', key=key, lease_id=lease.lease_id,
                      time_left_in_secs=time_left)
            result = yield lease.revoke()
            if result is None:
                err = 'lease-revoke-failed'
            self.key_reservations.pop(key)
        returnValue((result, err))

    @inlineCallbacks
    def _release_all_reservations(self):
        """Revoke every recorded lease and clear the reservation table.

        Fires with None, or 'lease-revoke-failed' if any revoke returned
        None.
        """
        err = None
        # Iterate over a snapshot: the dict may be modified by other code
        # while this generator is suspended at a yield.
        for key, lease in list(self.key_reservations.items()):
            time_left = yield lease.remaining()
            # A successfully revoked lease returns an object of type Header
            log.debug('release-reservation', key=key, lease_id=lease.lease_id,
                      time_left_in_secs=time_left)
            result = yield lease.revoke()
            if result is None:
                err = 'lease-revoke-failed'
            log.debug('lease-revoke', result=result)
            self.key_reservations.pop(key, None)
        returnValue(err)