Richard Jankowski | 8b277c2 | 2017-12-19 09:49:27 -0500 | [diff] [blame] | 1 | # |
| 2 | # Copyright 2017 the original author or authors. |
| 3 | # |
| 4 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | # you may not use this file except in compliance with the License. |
| 6 | # You may obtain a copy of the License at |
| 7 | # |
| 8 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | # |
| 10 | # Unless required by applicable law or agreed to in writing, software |
| 11 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | # See the License for the specific language governing permissions and |
| 14 | # limitations under the License. |
| 15 | # |
| 16 | |
| 17 | """ Etcd-based coordinator services """ |
| 18 | |
Richard Jankowski | 8b277c2 | 2017-12-19 09:49:27 -0500 | [diff] [blame] | 19 | from structlog import get_logger |
| 20 | from twisted.internet import reactor |
| 21 | from twisted.internet.defer import inlineCallbacks, returnValue, Deferred |
| 22 | from txaioetcd import Client, KeySet, Transaction, CompVersion, OpGet, OpSet, Failed |
| 23 | from zope.interface import implementer |
| 24 | |
| 25 | from leader import Leader |
| 26 | from common.utils.asleep import asleep |
| 27 | from common.utils.message_queue import MessageQueue |
| 28 | from voltha.registry import IComponent |
| 29 | from worker import Worker |
Zack Williams | 18357ed | 2018-11-14 10:41:08 -0700 | [diff] [blame] | 30 | from simplejson import dumps |
Richard Jankowski | 8b277c2 | 2017-12-19 09:49:27 -0500 | [diff] [blame] | 31 | from common.utils.deferred_utils import DeferredWithTimeout, TimeOutError |
| 32 | |
# Module-level structlog logger used by every function and method below.
log = get_logger()
| 34 | |
| 35 | |
class StaleMembershipEntryException(Exception):
    """Error type for a stale etcd membership entry.

    NOTE(review): not raised or caught anywhere in this file; presumably
    meant to signal that a membership record no longer belongs to this
    instance's current session -- confirm against external callers.
    """
| 38 | |
| 39 | |
@implementer(IComponent)
class CoordinatorEtcd(object):
    """
    An app shall instantiate only one Coordinator (singleton).
    A single instance of this object shall take care of all external
    communication with etcd, and via etcd, all coordination activities
    with its clustered peers. Roles include:
    - registering an ephemeral membership entry (k/v record) in etcd
    - participating in a symmetric leader election, and potentially assuming
      the leader's role. What leadership entails is not a concern for the
      coordination, it simply instantiates (and shuts down) a leader class
      when it gains (or loses) leadership.

    Once started, three loops are driven from the twisted reactor:
    session (lease) renewal, membership-record maintenance and leadership
    tracking. Each of them stops rescheduling itself once stop() has set
    ``shutting_down``.
    """

    CONNECT_RETRY_INTERVAL_SEC = 1
    # Successive sleep intervals (seconds) used by _backoff(); the last
    # entry is reused once the list is exhausted.
    RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
    # Port used when the 'etcd' endpoint string does not specify one.
    DEFAULT_ETCD_PORT = '2379'

    # Public methods:

    def __init__(self,
                 internal_host_address,
                 external_host_address,
                 instance_id,
                 rest_port,
                 config,
                 etcd='localhost:2379',
                 container_name_regex=r'^.*\.([0-9]+)\..*$'):
        """
        :param internal_host_address: internal address of this instance
            (stored only; not used within this class itself)
        :param external_host_address: address published as 'host_address'
            in this instance's membership record
        :param instance_id: id of this instance; suffix of the membership
            key and the value written to the leader key
        :param rest_port: stored only; presumably read by Worker/Leader
        :param config: dict with 'coordinator', 'worker' and 'leader'
            sections
        :param etcd: etcd endpoint as 'host[:port]' (port defaults to 2379)
        :param container_name_regex: stored only; presumably read by
            collaborators
        """
        log.info('initializing-coordinator')
        self.config = config['coordinator']
        self.worker_config = config['worker']
        self.leader_config = config['leader']
        # Read from the coordinator section like every other tunable here;
        # fall back to the top-level config for backward compatibility.
        self.membership_watch_relatch_delay = self.config.get(
            'membership_watch_relatch_delay',
            config.get('membership_watch_relatch_delay', 0.1))
        self.tracking_loop_delay = self.config.get(
            'tracking_loop_delay', 1)
        self.session_renewal_timeout = self.config.get(
            'session_renewal_timeout', 5)
        self.session_renewal_loop_delay = self.config.get(
            'session_renewal_loop_delay', 3)
        self.membership_maintenance_loop_delay = self.config.get(
            'membership_maintenance_loop_delay', 5)
        self.session_time_to_live = self.config.get(
            'session_time_to_live', 10)
        self.prefix = self.config.get('voltha_kv_prefix', 'service/voltha')
        # Key names are looked up directly (e.g. config['leader_key']),
        # falling back to the defaults when a key is not configured.
        self.leader_prefix = '/'.join(
            (self.prefix, self.config.get('leader_key', 'leader')))
        self.membership_prefix = '/'.join(
            (self.prefix, self.config.get('membership_key', 'members'), ''))
        self.assignment_prefix = '/'.join(
            (self.prefix, self.config.get('assignment_key', 'assignments'),
             ''))
        self.workload_prefix = '/'.join(
            (self.prefix, self.config.get('workload_key', 'work'), ''))
        self.core_store_prefix = '/'.join(
            (self.prefix, self.config.get('core_store_key', 'data/core')))
        self.core_store_assignment_key = self.core_store_prefix + \
            '/assignment'
        self.core_storage_suffix = 'core_store'

        self.retries = 0
        self.instance_id = instance_id
        self.internal_host_address = internal_host_address
        self.external_host_address = external_host_address
        self.rest_port = rest_port
        self.membership_record_key = self.membership_prefix + self.instance_id

        self.lease = None
        # session_id refers to either a Consul session ID or an Etcd lease object
        self.session_id = None
        self.i_am_leader = False
        self.leader_id = None  # will be the instance id of the current leader
        self.shutting_down = False
        self.leader = None
        self.membership_callback = None

        self.worker = Worker(self.instance_id, self)

        # Create etcd client
        endpoint = etcd.split(':')
        kv_host = endpoint[0].strip()
        if len(endpoint) > 1:
            kv_port = endpoint[1].strip()
        else:
            kv_port = self.DEFAULT_ETCD_PORT
        self.etcd_url = u'http://' + kv_host + u':' + kv_port
        self.etcd = Client(reactor, self.etcd_url)

        self.container_name_regex = container_name_regex

        self.wait_for_leader_deferreds = []

        self.peers_mapping_queue = MessageQueue()

    def start(self):
        """Schedule asynchronous initialisation on the reactor; returns self."""
        log.debug('starting')
        reactor.callLater(0, self._async_init)
        log.info('started')
        return self

    @inlineCallbacks
    def stop(self):
        """Mark shutdown, revoke the etcd lease, then stop worker and leader."""
        log.debug('stopping')
        self.shutting_down = True
        yield self._delete_session()  # this will delete the leader lock too
        yield self.worker.stop()
        if self.leader is not None:
            yield self.leader.stop()
            self.leader = None
        log.info('stopped')

    def wait_for_a_leader(self):
        """
        Async wait till a leader is detected/elected. The deferred will be
        called with the leader's instance_id
        :return: Deferred.
        """
        d = Deferred()
        if self.leader_id is None:
            # Fired later by _set_leader_id()
            self.wait_for_leader_deferreds.append(d)
        else:
            d.callback(self.leader_id)
        return d

    # Wait for a core data id to be assigned to this voltha instance
    @inlineCallbacks
    def get_core_store_id_and_prefix(self):
        """:return: Deferred firing with (core_store_id, core_store_prefix)."""
        core_store_id = yield self.worker.get_core_store_id()
        returnValue((core_store_id, self.core_store_prefix))

    def recv_peers_map(self):
        """Return the next message from the peers-mapping queue."""
        return self.peers_mapping_queue.get()

    def publish_peers_map_change(self, msg):
        """Enqueue a peers-mapping change message."""
        self.peers_mapping_queue.put(msg)

    # Proxy methods for etcd with retry support

    def kv_get(self, *args, **kw):
        """GET through _retry. A Consul-style 'index' keyword is accepted
        for API compatibility but dropped (it is not forwarded to etcd)."""
        kw.pop('index', None)
        return self._retry('GET', *args, **kw)

    def kv_put(self, *args, **kw):
        """PUT through _retry (args: key, value)."""
        return self._retry('PUT', *args, **kw)

    def kv_delete(self, *args, **kw):
        """DELETE through _retry (args: key; recurse=True for a prefix)."""
        return self._retry('DELETE', *args, **kw)

    # Methods exposing key membership information

    @inlineCallbacks
    def get_members(self):
        """Return list of all members"""
        _, members = yield self.kv_get(self.membership_prefix, recurse=True)
        returnValue([member['Key'][len(self.membership_prefix):]
                     for member in members])

    # Private (internal) methods:

    @inlineCallbacks
    def _async_init(self):
        yield self._create_session()
        yield self._create_membership_record()
        yield self._start_leader_tracking()
        yield self.worker.start()

    def _backoff(self, msg):
        """Log msg and return a Deferred sleeping for the next backoff step."""
        wait_time = self.RETRY_BACKOFF[min(self.retries,
                                           len(self.RETRY_BACKOFF) - 1)]
        self.retries += 1
        log.error(msg, retry_in=wait_time)
        return asleep(wait_time)

    def _clear_backoff(self):
        """Reset the backoff counter after a successful operation."""
        if self.retries:
            log.info('reconnected-to-etcd', after_retries=self.retries)
            self.retries = 0

    @inlineCallbacks
    def _create_session(self):
        """Create the etcd lease used as this instance's session (retrying
        until it succeeds), then start the session-renewal loop."""

        @inlineCallbacks
        def _create_session():
            etcd = yield self.get_kv_client()
            # Create etcd lease
            self.lease = yield etcd.lease(self.session_time_to_live)
            self.session_id = self.lease
            log.info('created-etcd-lease', lease=self.session_id)
            self._start_session_tracking()

        yield self._retry(_create_session)

    @inlineCallbacks
    def _delete_session(self):
        """Revoke the current etcd lease, if one exists. Failures are
        logged, not raised."""
        if self.lease is None:
            # No session was ever created; nothing to revoke.
            return
        try:
            yield self.lease.revoke()
        except Exception as e:
            log.exception('failed-to-delete-session %s' % e,
                          session_id=self.session_id)

    @inlineCallbacks
    def _create_membership_record(self):
        """Write the membership record, then start its maintenance loop."""
        yield self._do_create_membership_record_with_retries()
        reactor.callLater(0, self._maintain_membership_record)

    @inlineCallbacks
    def _maintain_membership_record(self):
        """Periodically verify the membership record and recreate it when
        it is missing; runs until shutdown."""
        try:
            while not self.shutting_down:
                valid_membership = yield self._assert_membership_record_valid()
                if valid_membership:
                    log.debug('valid-membership', session=self.session_id)
                else:
                    log.info('recreating-membership-before',
                             session=self.session_id)
                    yield self._do_create_membership_record_with_retries()
                    log.info('recreating-membership-after',
                             session=self.session_id)
                # Async sleep before checking the membership record again
                yield asleep(self.membership_maintenance_loop_delay)

        except Exception as e:
            log.exception('unexpected-error-membership-maintenance', e=e)
        finally:
            # except in shutdown, the loop must continue (after a short delay)
            if not self.shutting_down:
                reactor.callLater(self.membership_watch_relatch_delay,
                                  self._maintain_membership_record)

    def _create_membership_record_data(self):
        """Return the dict stored (JSON-encoded) as this member's record."""
        return {
            'status': 'alive',
            'host_address': self.external_host_address,
        }

    @inlineCallbacks
    def _assert_membership_record_valid(self):
        """
        :return: Deferred firing True if the membership record could be read
            and carries a 'Session' field; False on timeout, missing record,
            or any error.

        NOTE(review): _retry fills 'Session' from the *current* lease rather
        than the lease the key is attached to, so this effectively only
        checks that the record exists.
        """
        try:
            log.debug('membership-record-before')
            is_timeout, (_, record) = yield \
                self.coordinator_get_with_timeout(
                    key=self.membership_record_key,
                    index=0,
                    timeout=5)
            if is_timeout:
                log.debug('timeout creating membership record in etcd, key: %s' %
                          self.membership_record_key)
                returnValue(False)

            log.debug('membership-record-after', record=record)
            if record is None or 'Session' not in record:
                log.info('membership-record-change-detected',
                         old_session=self.session_id,
                         record=record)
                returnValue(False)
            returnValue(True)
        except Exception as e:
            log.exception('membership-validation-exception', e=e)
            returnValue(False)

    @inlineCallbacks
    def _do_create_membership_record_with_retries(self):
        """PUT the membership record bound to the current lease, backing off
        and retrying until the PUT yields a truthy result."""
        while 1:
            log.info('recreating-membership', session=self.session_id)
            result = yield self._retry(
                'PUT',
                self.membership_record_key,
                dumps(self._create_membership_record_data()),
                acquire=self.session_id)
            if result:
                log.info('new-membership-record-created',
                         session=self.session_id)
                break
            log.warn('cannot-create-membership-record')
            yield self._backoff('stale-membership-record')

    def _start_session_tracking(self):
        reactor.callLater(0, self._session_tracking_loop)

    @inlineCallbacks
    def _session_tracking_loop(self):
        """Keep the etcd lease alive; if a refresh does not complete within
        session_renewal_timeout, replace the client and lease. Runs until
        shutdown."""

        @inlineCallbacks
        def _redo_session():
            log.info('_redo_session-before')
            yield self._delete_session()

            # Create a new etcd connection/session with a new lease
            try:
                self.etcd = Client(reactor, self.etcd_url)
                self.lease = yield self.etcd.lease(self.session_time_to_live)
                self.session_id = self.lease
                log.info('new-etcd-session', session=self.session_id)

            except Exception as e:
                log.exception('could-not-create-an-etcd-lease', e=e)

        @inlineCallbacks
        def _renew_session(m_callback):
            try:
                time_left = yield self.lease.remaining()
                log.debug('_renew_session', time_left=time_left)
                result = yield self.lease.refresh()
                log.debug('just-renewed-session', result=result)
                if not m_callback.called:
                    # Triggering callback will cancel the timeout timer
                    log.debug('trigger-callback-to-cancel-timeout-timer')
                    m_callback.callback(result)
                else:
                    # Timeout event has already been called. Just ignore
                    # this event
                    log.info('renew-called-after-timeout, etcd ref changed?')
            except Exception as e:
                # Let the invoking method receive a timeout
                log.exception('could-not-renew-session', e=e)

        try:
            while not self.shutting_down:
                log.debug('session-tracking-start')
                rcvd = DeferredWithTimeout(
                    timeout=self.session_renewal_timeout)
                _renew_session(rcvd)
                try:
                    yield rcvd
                except TimeOutError as e:
                    log.info('session-renew-timeout', e=e)
                    # Redo the session -- but never mint a new lease while
                    # stop() is revoking the current one.
                    if not self.shutting_down:
                        yield _redo_session()
                except Exception as e:
                    log.exception('session-renew-exception', e=e)
                else:
                    log.debug('successfully-renewed-session')

                # Async sleep before the next session tracking
                yield asleep(self.session_renewal_loop_delay)

        except Exception as e:
            log.exception('renew-exception', e=e)
        finally:
            # Except in shutdown, the loop must continue (after a delay)
            if not self.shutting_down:
                reactor.callLater(self.session_renewal_loop_delay,
                                  self._session_tracking_loop)

    def _start_leader_tracking(self):
        reactor.callLater(0, self._leadership_tracking_loop)

    @inlineCallbacks
    def _leadership_tracking_loop(self):
        """One round of leader election; reschedules itself every
        tracking_loop_delay seconds until shutdown."""
        try:
            if self.shutting_down:
                # A run already scheduled before stop() must not (re)claim
                # leadership or start a Leader.
                return

            # Try to seize leadership via test-and-set operation.
            # Success means the leader key was previously absent
            # and was just re-created by this instance.
            leader_prefix = bytes(self.leader_prefix)
            txn = Transaction(
                compare=[
                    CompVersion(leader_prefix, '==', 0)
                ],
                success=[
                    OpSet(leader_prefix, bytes(self.instance_id),
                          lease=self.lease),
                    OpGet(leader_prefix)
                ],
                failure=[]
            )
            newly_asserted = False
            try:
                yield self.etcd.submit(txn)
            except Failed:
                # Leader key already present
                pass
            else:
                newly_asserted = True
                log.info('leader-key-absent')

            # Confirm that the assertion succeeded by reading back
            # the value of the leader key.
            leader = None
            result = yield self.etcd.get(leader_prefix)
            if result.kvs:
                leader = result.kvs[0].value
                log.debug('get-leader-key', leader=leader,
                          instance=self.instance_id)

            if leader is None:
                log.error('get-leader-failed')
            elif leader == self.instance_id:
                if newly_asserted:
                    log.info('leadership-seized')
                    yield self._assert_leadership()
                else:
                    log.debug('already-leader')
            else:
                log.debug('leader-is-another', leader=leader)
                yield self._assert_nonleadership(leader)

        except Exception as e:
            log.exception('unexpected-error-leader-tracking', e=e)

        finally:
            # Except in shutdown, the loop must continue (after a short delay)
            if not self.shutting_down:
                reactor.callLater(self.tracking_loop_delay,
                                  self._leadership_tracking_loop)

    @inlineCallbacks
    def _assert_leadership(self):
        """(Re-)assert leadership"""
        if not self.i_am_leader:
            self.i_am_leader = True
            self._set_leader_id(self.instance_id)
            yield self._just_gained_leadership()

    @inlineCallbacks
    def _assert_nonleadership(self, leader_id):
        """(Re-)assert non-leader role"""

        # update leader_id anyway
        self._set_leader_id(leader_id)

        if self.i_am_leader:
            self.i_am_leader = False
            yield self._just_lost_leadership()

    def _set_leader_id(self, leader_id):
        """Record the leader id and fire every pending wait_for_a_leader()."""
        self.leader_id = leader_id
        deferreds, self.wait_for_leader_deferreds = \
            self.wait_for_leader_deferreds, []
        for d in deferreds:
            d.callback(leader_id)

    def _just_gained_leadership(self):
        log.info('became-leader')
        self.leader = Leader(self)
        return self.leader.start()

    def _just_lost_leadership(self):
        log.info('lost-leadership')
        return self._halt_leader()

    def _halt_leader(self):
        """Stop the Leader, if any; returns its stop() result or None."""
        if self.leader:
            d = self.leader.stop()
            self.leader = None
            return d

    def get_kv_client(self):
        """Return the current etcd client (replaced by session recovery)."""
        return self.etcd

    def _record_from_kv(self, kv):
        """Build a Consul-style record dict from an etcd key-value."""
        return {
            'Key': kv.key,
            'Value': kv.value,
            'ModifyIndex': kv.mod_revision,
            # NOTE: always the *current* lease id, not the lease the key
            # is actually attached to.
            'Session': self.lease.lease_id if self.lease else '',
        }

    @inlineCallbacks
    def _retry(self, operation, *args, **kw):
        """
        Run an etcd operation, retrying with backoff until it succeeds.

        :param operation: 'GET', 'PUT', 'DELETE', or a callable returning a
            Deferred (invoked as operation(*args, **kw)).
        :param args: for the string operations args[0] is the key; for
            'PUT' args[1] is the value.
        :param kw: Consul-style keywords are translated (only their presence
            matters for 'keys'/'recurse'):
            acquire=<lease> -> lease=<lease>;
            keys=...        -> prefix query returning keys only;
            recurse=...     -> prefix query returning full records.
            Other keywords are passed through to the etcd client.
        :return: Deferred firing with
            GET single key: (mod_revision, record), or (0, {}) if absent;
            GET prefix: (max mod_revision, list of records or keys);
            PUT/DELETE/callable: the underlying result.
        Errors are logged and retried indefinitely; they are never raised.
        """
        prefix = False
        keys_only = False
        if 'acquire' in kw:
            kw['lease'] = kw.pop('acquire')
        if 'keys' in kw:
            kw.pop('keys')
            keys_only = True
            prefix = True
            kw['keys_only'] = True
        if 'recurse' in kw:
            kw.pop('recurse')
            prefix = True
        keyset = KeySet(bytes(args[0]), prefix=True) if prefix else None
        log.debug('start-op', operation=operation, args=args, kw=kw)

        while 1:
            try:
                etcd = yield self.get_kv_client()
                if operation == 'GET':
                    if not prefix:
                        # Single key: return one record
                        res = yield etcd.get(bytes(args[0]), **kw)
                        if res.kvs and len(res.kvs) == 1:
                            kv = res.kvs[0]
                            result = (kv.mod_revision,
                                      self._record_from_kv(kv))
                        else:
                            result = (0, dict())
                    else:
                        # Get values for all keys that match the prefix
                        # If keys_only requested, get only the keys
                        res = yield etcd.get(keyset, **kw)
                        if args[0] == self.assignment_prefix:
                            log.info('assignments', result=res)
                        index = 0
                        items = []
                        for kv in res.kvs or []:
                            # Report the highest revision among the keys
                            index = max(index, kv.mod_revision)
                            if keys_only:
                                items.append(kv.key)
                            else:
                                items.append(self._record_from_kv(kv))
                        result = (index, items)
                elif operation == 'PUT':
                    result = yield etcd.set(bytes(args[0]), args[1], **kw)
                elif operation == 'DELETE':
                    # keyset only exists for prefix deletes; a plain delete
                    # targets the exact key.
                    target = keyset if prefix else bytes(args[0])
                    result = yield etcd.delete(target)
                else:
                    # Default case - consider operation as a function call
                    result = yield operation(*args, **kw)
                self._clear_backoff()
                break
            except Exception as e:
                if not self.shutting_down:
                    log.exception(e)
                yield self._backoff('etcd-unknown-error: %s' % e)

        log.debug('end-op', operation=operation, args=args, kw=kw)
        returnValue(result)

    @inlineCallbacks
    def coordinator_get_with_timeout(self, key, timeout, **kw):
        """
        Query etcd with a timeout
        :param key: Key to query
        :param timeout: timeout value
        :param kw: additional key-value params; 'index' (default 0) is the
            revision the result must exceed, the rest go to _retry('GET')
        :return: (is_timeout, (index, result)); (True, (None, None)) when no
            result newer than 'index' arrived in time or an error occurred.

        The Consul version of this method performed a 'wait-type' get
        operation that returned a result when the key's value had a
        ModifyIndex greater than the 'index' argument. Here a single GET is
        issued; if its revision does not exceed 'index' the call simply
        waits for the timeout.
        """

        # Intercept 'index' argument
        if 'index' in kw:
            mod_revision = kw.pop('index')
            log.debug('coordinator-get-with-timeout-etcd',
                      index=mod_revision)
        else:
            mod_revision = 0

        @inlineCallbacks
        def _get(key, m_callback):
            try:
                (index, result) = yield self._retry('GET', key, **kw)
                if index > mod_revision and not m_callback.called:
                    log.debug('got-result-cancelling-timer')
                    m_callback.callback((index, result))
            except Exception as e:
                log.exception('got-exception', e=e)

        try:
            rcvd = DeferredWithTimeout(timeout=timeout)
            _get(key, rcvd)
            result = yield rcvd
        except TimeOutError:
            log.debug('timeout-or-no-data-change', etcd_key=key)
        except Exception as e:
            log.exception('exception', e=e)
        else:
            log.debug('result-received', result=result)
            returnValue((False, result))

        returnValue((True, (None, None)))