blob: 77451c81998a9d0f3baf8d1bc81f3e971e331e26 [file] [log] [blame]
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
#
# Most of the txaioetcd methods provide a timeout parameter. This parameter
# is likely intended to limit the amount of time spent by any one method
# waiting for a response from the etcd server. However, if the server is
# down, the method immediately throws a ConnectionRefusedError exception;
# it does not perform any retries. The timeout parameter provided by the
# methods in EtcdClient cover this contingency.
#
################################################################################
from common.kvstore.kv_client import DEFAULT_TIMEOUT, Event, KVClient, KVPair
from structlog import get_logger
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
from twisted.internet.error import ConnectionRefusedError
from txaioetcd import Client, CompVersion, Failed, KeySet, OpGet, OpSet, Transaction
log = get_logger()
class EtcdClient(KVClient):
def __init__(self, kv_host, kv_port):
KVClient.__init__(self, kv_host, kv_port)
self.url = u'http://' + kv_host + u':' + str(kv_port)
self.client = Client(reactor, self.url)
@inlineCallbacks
def get(self, key, timeout=DEFAULT_TIMEOUT):
result = yield self._op_with_retry('GET', key, None, timeout)
returnValue(result)
@inlineCallbacks
def list(self, key, timeout=DEFAULT_TIMEOUT):
result = yield self._op_with_retry('LIST', key, None, timeout)
returnValue(result)
@inlineCallbacks
def put(self, key, value, timeout=DEFAULT_TIMEOUT):
_, err = yield self._op_with_retry('PUT', key, value, timeout)
returnValue(err)
@inlineCallbacks
def delete(self, key, timeout=DEFAULT_TIMEOUT):
_, err = yield self._op_with_retry('DELETE', key, None, timeout)
returnValue(err)
@inlineCallbacks
def reserve(self, key, value, ttl, timeout=DEFAULT_TIMEOUT):
result = yield self._op_with_retry('RESERVE', key, value, timeout, ttl=ttl)
returnValue(result)
@inlineCallbacks
def renew_reservation(self, key, timeout=DEFAULT_TIMEOUT):
result, err = yield self._op_with_retry('RENEW', key, None, timeout)
returnValue(err)
@inlineCallbacks
def release_reservation(self, key, timeout=DEFAULT_TIMEOUT):
result, err = yield self._op_with_retry('RELEASE', key, None, timeout)
returnValue(err)
@inlineCallbacks
def release_all_reservations(self, timeout=DEFAULT_TIMEOUT):
result, err = yield self._op_with_retry('RELEASE-ALL', None, None, timeout)
returnValue(err)
@inlineCallbacks
def watch(self, key, key_change_callback, timeout=DEFAULT_TIMEOUT):
self.key_watches[key] = key_change_callback
result = yield self._op_with_retry('WATCH', key, None, timeout, callback=self.key_changed)
returnValue(result)
def key_changed(self, kv):
key = kv.key
value = kv.value
log.debug('key-changed', key=key, value=value)
# Notify client of key change event
if value is not None:
evt = Event(Event.PUT, key, value)
else:
evt = Event(Event.DELETE, key, None)
if key in self.key_watches:
self.key_watches[key](evt)
def close_watch(self, key, timeout=DEFAULT_TIMEOUT):
log.debug('close-watch', key=key)
if key in self.key_watches:
self.key_watches.pop(key)
@inlineCallbacks
def _op_with_retry(self, operation, key, value, timeout, *args, **kw):
log.debug('kv-op', operation=operation, key=key, timeout=timeout, args=args, kw=kw)
err = None
result = None
if type(key) == str:
key = bytes(key)
if value is not None:
value = bytes(value)
while True:
try:
if operation == 'GET':
result = yield self._get(key)
elif operation == 'LIST':
result, err = yield self._list(key)
elif operation == 'PUT':
# Put returns an object of type Revision
result = yield self.client.set(key, value, **kw)
elif operation == 'DELETE':
# Delete returns an object of type Deleted
result = yield self.client.delete(key)
elif operation == 'RESERVE':
result, err = yield self._reserve(key, value, **kw)
elif operation == 'RENEW':
result, err = yield self._renew_reservation(key)
elif operation == 'RELEASE':
result, err = yield self._release_reservation(key)
elif operation == 'RELEASE-ALL':
err = yield self._release_all_reservations()
elif operation == 'WATCH':
for name, val in kw.items():
if name == 'callback':
callback = val
break
result = self.client.watch([KeySet(key, prefix=True)], callback)
log.debug('watch-response', result=result)
self._clear_backoff()
break
except ConnectionRefusedError as ex:
log.error('comms-exception', ex=ex)
yield self._backoff('etcd-not-up')
except Exception as ex:
log.error('etcd-exception', ex=ex)
err = ex
if timeout > 0 and self.retry_time > timeout:
err = 'operation-timed-out'
if err is not None:
self._clear_backoff()
break
returnValue((result, err))
@inlineCallbacks
def _get(self, key):
kvp = None
resp = yield self.client.get(key)
if resp.kvs is not None and len(resp.kvs) == 1:
kv = resp.kvs[0]
kvp = KVPair(kv.key, kv.value, kv.mod_revision)
returnValue(kvp)
@inlineCallbacks
def _list(self, key):
err = None
list = []
resp = yield self.client.get(KeySet(key, prefix=True))
if resp.kvs is not None and len(resp.kvs) > 0:
for kv in resp.kvs:
list.append(KVPair(kv.key, kv.value, kv.mod_revision))
returnValue((list, err))
@inlineCallbacks
def _reserve(self, key, value, **kw):
for name, val in kw.items():
if name == 'ttl':
ttl = val
break
reserved = False
err = 'reservation-failed'
owner = None
# Create a lease
lease = yield self.client.lease(ttl)
# Create a transaction
txn = Transaction(
compare=[ CompVersion(key, '==', 0) ],
success=[ OpSet(key, bytes(value), lease=lease) ],
failure=[ OpGet(key) ]
)
newly_acquired = False
try:
result = yield self.client.submit(txn)
except Failed as failed:
log.debug('key-already-present', key=key)
if len(failed.responses) > 0:
response = failed.responses[0]
if response.kvs is not None and len(response.kvs) > 0:
kv = response.kvs[0]
log.debug('key-already-present', value=kv.value)
if kv.value == value:
reserved = True
log.debug('key-already-reserved', key = kv.key, value=kv.value)
else:
newly_acquired = True
log.debug('key-was-absent', key=key, result=result)
# Check if reservation succeeded
resp = yield self.client.get(key)
if resp.kvs is not None and len(resp.kvs) == 1:
owner = resp.kvs[0].value
if owner == value:
if newly_acquired:
log.debug('key-reserved', key=key, value=value, ttl=ttl,
lease_id=lease.lease_id)
reserved = True
# Add key to reservation list
self.key_reservations[key] = lease
else:
log.debug("reservation-still-held")
else:
log.debug('reservation-held-by-another', value=owner)
if reserved:
err = None
returnValue((owner, err))
@inlineCallbacks
def _renew_reservation(self, key):
result = None
err = None
if key not in self.key_reservations:
err = 'key-not-reserved'
else:
lease = self.key_reservations[key]
# A successfully refreshed lease returns an object of type Header
result = yield lease.refresh()
if result is None:
err = 'lease-refresh-failed'
returnValue((result, err))
@inlineCallbacks
def _release_reservation(self, key):
err = None
if key not in self.key_reservations:
err = 'key-not-reserved'
else:
lease = self.key_reservations[key]
time_left = yield lease.remaining()
# A successfully revoked lease returns an object of type Header
log.debug('release-reservation', key=key, lease_id=lease.lease_id,
time_left_in_secs=time_left)
result = yield lease.revoke()
if result is None:
err = 'lease-revoke-failed'
self.key_reservations.pop(key)
returnValue((result, err))
@inlineCallbacks
def _release_all_reservations(self):
err = None
keys_to_delete = []
for key in self.key_reservations:
lease = self.key_reservations[key]
time_left = yield lease.remaining()
# A successfully revoked lease returns an object of type Header
log.debug('release-reservation', key=key, lease_id=lease.lease_id,
time_left_in_secs=time_left)
result = yield lease.revoke()
if result is None:
err = 'lease-revoke-failed'
log.debug('lease-revoke', result=result)
keys_to_delete.append(key)
for key in keys_to_delete:
self.key_reservations.pop(key)
returnValue(err)