Zsolt Haraszti | f2da1d0 | 2016-09-13 23:21:35 -0700 | [diff] [blame] | 1 | # |
Zsolt Haraszti | 3eb27a5 | 2017-01-03 21:56:48 -0800 | [diff] [blame] | 2 | # Copyright 2017 the original author or authors. |
Zsolt Haraszti | f2da1d0 | 2016-09-13 23:21:35 -0700 | [diff] [blame] | 3 | # |
| 4 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | # you may not use this file except in compliance with the License. |
| 6 | # You may obtain a copy of the License at |
| 7 | # |
| 8 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | # |
| 10 | # Unless required by applicable law or agreed to in writing, software |
| 11 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | # See the License for the specific language governing permissions and |
| 14 | # limitations under the License. |
| 15 | # |
| 16 | |
| 17 | """ Consul-based coordinator services """ |
| 18 | |
Zsolt Haraszti | 109db83 | 2016-09-16 16:32:36 -0700 | [diff] [blame] | 19 | from consul import ConsulException |
Zsolt Haraszti | e060a7d | 2016-09-16 11:08:24 -0700 | [diff] [blame] | 20 | from consul.twisted import Consul |
Zsolt Haraszti | f2da1d0 | 2016-09-13 23:21:35 -0700 | [diff] [blame] | 21 | from requests import ConnectionError |
| 22 | from structlog import get_logger |
| 23 | from twisted.internet import reactor |
Zsolt Haraszti | ac9310d | 2016-09-20 12:56:35 -0700 | [diff] [blame] | 24 | from twisted.internet.defer import inlineCallbacks, returnValue, Deferred |
Jonathan Hart | dd9daf5 | 2018-05-21 17:04:45 -0700 | [diff] [blame] | 25 | from twisted.internet.error import DNSLookupError |
Zsolt Haraszti | dafefe1 | 2016-11-14 21:29:58 -0800 | [diff] [blame] | 26 | from zope.interface import implementer |
Zsolt Haraszti | f2da1d0 | 2016-09-13 23:21:35 -0700 | [diff] [blame] | 27 | |
Zsolt Haraszti | a341031 | 2016-09-18 23:29:04 -0700 | [diff] [blame] | 28 | from leader import Leader |
Zsolt Haraszti | 023ea7c | 2016-10-16 19:30:34 -0700 | [diff] [blame] | 29 | from common.utils.asleep import asleep |
khenaidoo | 08d48d2 | 2017-06-29 19:42:49 -0400 | [diff] [blame] | 30 | from common.utils.message_queue import MessageQueue |
Zsolt Haraszti | dafefe1 | 2016-11-14 21:29:58 -0800 | [diff] [blame] | 31 | from voltha.registry import IComponent |
Zsolt Haraszti | ac9310d | 2016-09-20 12:56:35 -0700 | [diff] [blame] | 32 | from worker import Worker |
khenaidoo | a8588f2 | 2017-06-16 12:13:34 -0400 | [diff] [blame] | 33 | from simplejson import dumps, loads |
khenaidoo | b1602a3 | 2017-07-27 16:59:52 -0400 | [diff] [blame] | 34 | from common.utils.deferred_utils import DeferredWithTimeout, TimeOutError |
Zsolt Haraszti | f2da1d0 | 2016-09-13 23:21:35 -0700 | [diff] [blame] | 35 | |
Zsolt Haraszti | 2bdb6b3 | 2016-11-03 16:56:17 -0700 | [diff] [blame] | 36 | log = get_logger() |
| 37 | |
Zsolt Haraszti | f2da1d0 | 2016-09-13 23:21:35 -0700 | [diff] [blame] | 38 | |
class StaleMembershipEntryException(Exception):
    """Exception type named for a stale membership entry in consul."""
| 41 | |
| 42 | |
Zsolt Haraszti | dafefe1 | 2016-11-14 21:29:58 -0800 | [diff] [blame] | 43 | @implementer(IComponent) |
Zsolt Haraszti | f2da1d0 | 2016-09-13 23:21:35 -0700 | [diff] [blame] | 44 | class Coordinator(object): |
Zsolt Haraszti | a341031 | 2016-09-18 23:29:04 -0700 | [diff] [blame] | 45 | """ |
| 46 | An app shall instantiate only one Coordinator (singleton). |
| 47 | A single instance of this object shall take care of all external |
| 48 | communication with consul, and via consul, all coordination activities with its |
| 49 | clustered peers. Roles include: |
| 50 | - registering an ephemeral membership entry (k/v record) in consul |
| 51 | - participating in a symmetric leader election, and potentially assuming |
| 52 | the leader's role. What leadership entails is not a concern for the |
| 53 | coordinator; it simply instantiates (and shuts down) a leader class |
| 54 | when it gains (or loses) leadership. |
| 55 | """ |
Zsolt Haraszti | f2da1d0 | 2016-09-13 23:21:35 -0700 | [diff] [blame] | 56 | |
| 57 | CONNECT_RETRY_INTERVAL_SEC = 1 |
| 58 | RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5] |
| 59 | |
Zsolt Haraszti | a341031 | 2016-09-18 23:29:04 -0700 | [diff] [blame] | 60 | # Public methods: |
| 61 | |
def __init__(self,
             internal_host_address,
             external_host_address,
             instance_id,
             rest_port,
             config,
             consul='localhost:8500',
             container_name_regex=r'^.*\.([0-9]+)\..*$'):
    """
    Capture configuration and build the initial coordinator state.

    :param internal_host_address: stored on the instance as-is
    :param external_host_address: stored on the instance; it is the
        address written into the membership record
    :param instance_id: id of this instance; used as the suffix of the
        membership record key and as the value written to the leader key
    :param rest_port: stored on the instance as-is
    :param config: mapping that must contain the 'coordinator', 'worker'
        and 'leader' sections
    :param consul: consul endpoint in 'host:port' form
    :param container_name_regex: stored on the instance as-is
    """
    log.info('initializing-coordinator')
    self.config = config['coordinator']
    self.worker_config = config['worker']
    self.leader_config = config['leader']
    cfg = self.config

    # Historically this delay was looked up in the top-level config only,
    # unlike every other setting. A top-level value still wins (backward
    # compatible); otherwise the coordinator section is now honoured.
    self.membership_watch_relatch_delay = config.get(
        'membership_watch_relatch_delay',
        cfg.get('membership_watch_relatch_delay', 0.1))
    self.tracking_loop_delay = cfg.get('tracking_loop_delay', 1)
    self.session_renewal_timeout = cfg.get('session_renewal_timeout', 5)
    self.session_renewal_loop_delay = cfg.get(
        'session_renewal_loop_delay', 3)
    self.membership_maintenance_loop_delay = cfg.get(
        'membership_maintenance_loop_delay', 5)
    self.session_time_to_live = cfg.get('session_time_to_live', 10)

    self.prefix = cfg.get('voltha_kv_prefix', 'service/voltha')

    def kv_path(option, default, trailing_slash=False):
        # The option's *value* is the path segment. (The previous code
        # used that value as a second lookup key, which ignored any
        # customised name and raised KeyError when the option was absent.)
        segments = [self.prefix, cfg.get(option, default)]
        if trailing_slash:
            segments.append('')
        return '/'.join(segments)

    self.leader_prefix = kv_path('leader_key', 'leader')
    self.membership_prefix = kv_path(
        'membership_key', 'members', trailing_slash=True)
    self.assignment_prefix = kv_path(
        'assignment_key', 'assignments', trailing_slash=True)
    self.workload_prefix = kv_path(
        'workload_key', 'work', trailing_slash=True)
    self.core_store_prefix = kv_path('core_store_key', 'data/core')
    self.core_store_assignment_key = self.core_store_prefix + '/assignment'
    self.core_storage_suffix = 'core_store'

    self.retries = 0
    self.instance_id = instance_id
    self.internal_host_address = internal_host_address
    self.external_host_address = external_host_address
    self.rest_port = rest_port
    self.membership_record_key = self.membership_prefix + self.instance_id

    self.session_id = None
    self.i_am_leader = False
    self.leader_id = None  # will be the instance id of the current leader
    self.shutting_down = False
    self.leader = None
    self.membership_callback = None

    self.worker = Worker(self.instance_id, self)

    # Split the endpoint once; host is the first field, port the second.
    endpoint = consul.split(':')
    self.host = endpoint[0].strip()
    self.port = int(endpoint[1].strip())

    # TODO need to handle reconnect events properly
    self.consul = Consul(host=self.host, port=self.port)

    self.container_name_regex = container_name_regex

    # Deferreds handed out by wait_for_a_leader() before a leader is known
    self.wait_for_leader_deferreds = []

    self.peers_mapping_queue = MessageQueue()
| 129 | |
def start(self):
    """
    Schedule the asynchronous initialisation on the reactor.

    :return: this coordinator instance
    """
    log.debug('starting')
    # _async_init is only scheduled here; this call does not wait for it
    reactor.callLater(0, self._async_init)
    log.info('started')
    return self
Zsolt Haraszti | 2bdb6b3 | 2016-11-03 16:56:17 -0700 | [diff] [blame] | 135 | |
@inlineCallbacks
def stop(self):
    """
    Flag shutdown, delete the consul session, then stop the worker and
    (when one exists) the leader object.
    """
    log.debug('stopping')
    self.shutting_down = True
    # this will delete the leader lock too
    yield self._delete_session()
    yield self.worker.stop()
    active_leader = self.leader
    if active_leader is not None:
        yield active_leader.stop()
        self.leader = None
    log.info('stopped')
Zsolt Haraszti | f2da1d0 | 2016-09-13 23:21:35 -0700 | [diff] [blame] | 146 | |
def wait_for_a_leader(self):
    """
    Async wait until a leader has been detected/elected.

    :return: Deferred that is called back with the leader's instance_id;
        it has already fired when a leader is known at call time.
    """
    pending = Deferred()
    known_leader = self.leader_id
    if known_leader is None:
        # parked until _set_leader_id() records a leader
        self.wait_for_leader_deferreds.append(pending)
    else:
        pending.callback(known_leader)
    return pending
| 160 | |
# Wait for a core data id to be assigned to this voltha instance
@inlineCallbacks
def get_core_store_id_and_prefix(self):
    """
    Obtain the core store id from the worker.

    :return: Deferred firing with (core_store_id, core_store_prefix)
    """
    store_id = yield self.worker.get_core_store_id()
    id_and_prefix = (store_id, self.core_store_prefix)
    returnValue(id_and_prefix)
| 166 | |
def recv_peers_map(self):
    """Return the result of get() on the peers-mapping queue."""
    queue = self.peers_mapping_queue
    return queue.get()
| 169 | |
def publish_peers_map_change(self, msg):
    """Put msg on the peers-mapping queue (read via recv_peers_map)."""
    queue = self.peers_mapping_queue
    queue.put(msg)
| 172 | |
Zsolt Haraszti | a341031 | 2016-09-18 23:29:04 -0700 | [diff] [blame] | 173 | # Proxy methods for consul with retry support |
| 174 | |
def kv_get(self, *args, **kw):
    """Run a 'GET' through self._retry, forwarding all arguments."""
    operation = 'GET'
    return self._retry(operation, *args, **kw)
Zsolt Haraszti | a341031 | 2016-09-18 23:29:04 -0700 | [diff] [blame] | 177 | |
def kv_put(self, *args, **kw):
    """Run a 'PUT' through self._retry, forwarding all arguments."""
    operation = 'PUT'
    return self._retry(operation, *args, **kw)
Zsolt Haraszti | a341031 | 2016-09-18 23:29:04 -0700 | [diff] [blame] | 180 | |
def kv_delete(self, *args, **kw):
    """Run a 'DELETE' through self._retry, forwarding all arguments."""
    operation = 'DELETE'
    return self._retry(operation, *args, **kw)
Zsolt Haraszti | a341031 | 2016-09-18 23:29:04 -0700 | [diff] [blame] | 183 | |
Zsolt Haraszti | 00d9a84 | 2016-11-23 11:18:23 -0800 | [diff] [blame] | 184 | # Methods exposing key membership information |
| 185 | |
@inlineCallbacks
def get_members(self):
    """
    Return list of all members.

    Each entry is the key of a record found under the membership prefix,
    with that prefix stripped off.

    :return: Deferred firing with a list of key suffixes; the list is
        empty when the lookup reports no records.
    """
    _, members = yield self.kv_get(self.membership_prefix, recurse=True)
    prefix_len = len(self.membership_prefix)
    # A read that finds nothing under the prefix yields None rather than
    # an empty list; treat it as "no members" instead of raising TypeError.
    returnValue([member['Key'][prefix_len:] for member in members or ()])
| 192 | |
Zsolt Haraszti | a341031 | 2016-09-18 23:29:04 -0700 | [diff] [blame] | 193 | # Private (internal) methods: |
| 194 | |
@inlineCallbacks
def _async_init(self):
    """
    Run the start-up sequence in order: create the session, create the
    membership record, start leader tracking, then start the worker.
    """
    startup_steps = (
        self._create_session,
        self._create_membership_record,
        self._start_leader_tracking,
        self.worker.start,
    )
    for step in startup_steps:
        yield step()
Zsolt Haraszti | a341031 | 2016-09-18 23:29:04 -0700 | [diff] [blame] | 201 | |
def _backoff(self, msg):
    """
    Log msg and sleep asynchronously for the next back-off interval.

    The interval is taken from RETRY_BACKOFF at the current retry count;
    once the count runs past the table, the last entry is reused.

    :param msg: event name passed to the logger
    :return: whatever asleep() returns for the chosen interval
    """
    last_slot = len(self.RETRY_BACKOFF) - 1
    slot = self.retries if self.retries < last_slot else last_slot
    delay = self.RETRY_BACKOFF[slot]
    self.retries += 1
    log.info(msg, retry_in=delay)
    return asleep(delay)
| 208 | |
Zsolt Haraszti | a341031 | 2016-09-18 23:29:04 -0700 | [diff] [blame] | 209 | def _clear_backoff(self): |
Zsolt Haraszti | f2da1d0 | 2016-09-13 23:21:35 -0700 | [diff] [blame] | 210 | if self.retries: |
Zsolt Haraszti | 2bdb6b3 | 2016-11-03 16:56:17 -0700 | [diff] [blame] | 211 | log.info('reconnected-to-consul', after_retries=self.retries) |
Zsolt Haraszti | f2da1d0 | 2016-09-13 23:21:35 -0700 | [diff] [blame] | 212 | self.retries = 0 |
| 213 | |
@inlineCallbacks
def _create_session(self):
    """
    Create the consul session via self._retry and, once it exists, kick
    off session tracking.
    """

    @inlineCallbacks
    def _create_session():
        client = yield self.get_consul()
        # create consul session
        new_session = yield client.session.create(
            behavior='release',
            ttl=self.session_time_to_live,
            lock_delay=1)
        self.session_id = new_session
        log.info('created-consul-session', session_id=self.session_id)
        self._start_session_tracking()

    yield self._retry(_create_session)
| 228 | |
@inlineCallbacks
def _delete_session(self):
    """
    Destroy the current consul session.

    Best effort: any failure is logged with its traceback and swallowed.
    """
    try:
        yield self.consul.session.destroy(self.session_id)
    except Exception:
        log.exception('failed-to-delete-session',
                      session_id=self.session_id)
Zsolt Haraszti | e060a7d | 2016-09-16 11:08:24 -0700 | [diff] [blame] | 236 | |
@inlineCallbacks
def _create_membership_record(self):
    """
    Create the membership record (retrying until it succeeds), then
    schedule the loop that keeps it valid.
    """
    yield self._do_create_membership_record_with_retries()
    maintain = self._maintain_membership_record
    reactor.callLater(0, maintain)
| 241 | |
@inlineCallbacks
def _maintain_membership_record(self):
    """
    Periodically verify the membership record and re-create it when it is
    no longer valid.

    Runs until shutdown. If the loop dies on an unexpected error it is
    logged and the loop is re-scheduled after
    membership_watch_relatch_delay seconds.
    """
    try:
        # Observe the shutdown flag so the loop ends once stop() has run
        # instead of trying to re-create the record against a deleted
        # session.
        while not self.shutting_down:
            valid_membership = yield self._assert_membership_record_valid()
            if not valid_membership:
                log.info('recreating-membership-before',
                         session=self.session_id)
                yield self._do_create_membership_record_with_retries()
                log.info('recreating-membership-after',
                         session=self.session_id)
            else:
                log.debug('valid-membership', session=self.session_id)
            # Async sleep before checking the membership record again
            yield asleep(self.membership_maintenance_loop_delay)

    except Exception as e:
        # Event name used to be copy-pasted from the leader tracking loop,
        # which made the two failure sources indistinguishable in the logs.
        log.exception('unexpected-error-membership-maintenance', e=e)
    finally:
        # except in shutdown, the loop must continue (after a short delay)
        if not self.shutting_down:
            reactor.callLater(self.membership_watch_relatch_delay,
                              self._maintain_membership_record)
Zsolt Haraszti | e060a7d | 2016-09-16 11:08:24 -0700 | [diff] [blame] | 265 | |
khenaidoo | a8588f2 | 2017-06-16 12:13:34 -0400 | [diff] [blame] | 266 | def _create_membership_record_data(self): |
| 267 | member_record = dict() |
| 268 | member_record['status'] = 'alive' |
| 269 | member_record['host_address'] = self.external_host_address |
| 270 | return member_record |
| 271 | |
@inlineCallbacks
def _assert_membership_record_valid(self):
    """
    Check whether the membership record is still held by our session.

    :return: Deferred firing with True when the record exists and its
        'Session' field equals the current session id; False on a read
        timeout, on any mismatch, or when an exception is raised.
    """
    try:
        log.info('membership-record-before')
        timed_out, (_, record) = yield self.coordinator_get_with_timeout(
            key=self.membership_record_key,
            index=0,
            timeout=5)
        if timed_out:
            returnValue(False)

        log.info('membership-record-after', record=record)
        held_by_us = (record is not None and
                      'Session' in record and
                      record['Session'] == self.session_id)
        if held_by_us:
            returnValue(True)
        else:
            log.info('membership-record-change-detected',
                     old_session=self.session_id,
                     record=record)
            returnValue(False)
    except Exception as e:
        log.exception('membership-validation-exception', e=e)
        returnValue(False)
| 297 | |
@inlineCallbacks
def _do_create_membership_record_with_retries(self):
    """
    Write the membership record, acquiring it with the current session,
    and keep trying (with back-off between attempts) until the write
    reports success.
    """
    while True:
        log.info('recreating-membership', session=self.session_id)
        payload = dumps(self._create_membership_record_data())
        created = yield self._retry(
            'PUT',
            self.membership_record_key,
            payload,
            acquire=self.session_id)
        if created:
            log.info('new-membership-record-created',
                     session=self.session_id)
            break
        log.warn('cannot-create-membership-record')
        yield self._backoff('stale-membership-record')
Zsolt Haraszti | 1420def | 2016-09-18 00:07:31 -0700 | [diff] [blame] | 314 | |
def _start_session_tracking(self):
    """Schedule the session tracking loop on the reactor."""
    tracking_loop = self._session_tracking_loop
    reactor.callLater(0, tracking_loop)
| 317 | |
@inlineCallbacks
def _session_tracking_loop(self):
    """
    Keep the consul session alive.

    Each round starts a renewal guarded by a DeferredWithTimeout of
    session_renewal_timeout seconds. If the renewal does not complete in
    time, the session is deleted and a new one is created on a fresh
    Consul client. Rounds are separated by session_renewal_loop_delay
    seconds. The loop ends at shutdown; if it dies on an unexpected error
    it is re-scheduled unless the coordinator is shutting down.
    """

    @inlineCallbacks
    def _redo_session():
        log.info('_redo_session-before')
        yield self._delete_session()

        # Create a new consul connection and a session whose TTL is
        # self.session_time_to_live
        try:
            self.consul = Consul(host=self.host, port=self.port)
            self.session_id = yield self.consul.session.create(
                behavior='release',
                ttl=self.session_time_to_live,
                lock_delay=1)
            log.info('new-consul-session', session=self.session_id)

        except Exception as e:
            log.exception('could-not-create-a-consul-session', e=e)

    @inlineCallbacks
    def _renew_session(m_callback):
        try:
            log.debug('_renew_session-before')
            # Remember which client issued the renewal: _redo_session may
            # swap self.consul while the request is still outstanding.
            consul_ref = self.consul
            result = yield consul_ref.session.renew(
                session_id=self.session_id)
            log.info('just-renewed-session', result=result)
            if not m_callback.called:
                # Triggering callback will cancel the timeout timer
                log.info('trigger-callback-to-cancel-timout-timer')
                m_callback.callback(result)
            else:
                # Timeout event has already been called. Just ignore
                # this event
                log.info('renew-called-after-timout',
                         new_consul_ref=self.consul,
                         old_consul_ref=consul_ref)
        except Exception as e:
            # Let the invoking method receive a timeout
            log.exception('could-not-renew-session', e=e)

    try:
        # Stop renewing once stop() has been called; otherwise the renewal
        # of the deleted session times out and a new session gets created
        # after shutdown.
        while not self.shutting_down:
            log.debug('session-tracking-start')
            rcvd = DeferredWithTimeout(
                timeout=self.session_renewal_timeout)
            _renew_session(rcvd)
            try:
                yield rcvd
            except TimeOutError as e:
                log.info('session-renew-timeout', e=e)
                # Redo the session (unless shutdown began while waiting)
                if not self.shutting_down:
                    yield _redo_session()
            except Exception as e:
                log.exception('session-renew-exception', e=e)
            else:
                log.debug('successfully-renewed-session')

            # Async sleep before the next session tracking
            yield asleep(self.session_renewal_loop_delay)

    except Exception as e:
        log.exception('renew-exception', e=e)
    finally:
        # except in shutdown, the loop must continue (after a short
        # delay) - same rule as the membership and leadership loops
        if not self.shutting_down:
            reactor.callLater(self.session_renewal_loop_delay,
                              self._session_tracking_loop)
| 385 | |
def _start_leader_tracking(self):
    """Schedule the leadership tracking loop on the reactor."""
    tracking_loop = self._leadership_tracking_loop
    reactor.callLater(0, tracking_loop)
| 388 | |
@inlineCallbacks
def _leadership_tracking_loop(self):
    """
    One round of leader election followed by watching the leader key.

    Tries to acquire the leader key with our session, reads the key back
    to establish who leads, then blocks on changes to the key. When the
    key changes or disappears the round ends, and unless the coordinator
    is shutting down a new round is scheduled after tracking_loop_delay
    seconds.
    """
    try:
        # Attempt to acquire leadership lock. True indicates success;
        # False indicates there is already a leader. Its instance id
        # is then the value under the leader key service/voltha/leader.

        # attempt acquire leader lock
        log.info('leadership-attempt-before')
        result = yield self._retry('PUT',
                                   self.leader_prefix,
                                   self.instance_id,
                                   acquire=self.session_id)
        log.info('leadership-attempt-after')

        # read it back before being too happy; seeing our session id is a
        # proof and now we have the change id that we can use to reliably
        # track any changes. In an unlikely scenario where the leadership
        # key gets wiped out administratively since the previous line,
        # the returned record can be None. Handle it.
        (index, record) = yield self._retry('GET',
                                            self.leader_prefix)
        log.info('leader-prefix',
                 i_am_leader=result, index=index, record=record)

        if record is not None:
            if result is True:
                if record['Session'] == self.session_id:
                    yield self._assert_leadership()
                else:
                    pass  # confusion; need to retry leadership
            else:
                leader_id = record['Value']
                yield self._assert_nonleadership(leader_id)

        # if record was none, we shall try leadership again
        last = record
        # Also leave the watch once shutdown has begun; otherwise every
        # timeout just re-enters the wait and the loop never ends.
        while last is not None and not self.shutting_down:
            # this shall return only when update is made to leader key
            # or expires after 5 seconds wait
            is_timeout, (tmp_index, updated) = yield \
                self.coordinator_get_with_timeout(
                    key=self.leader_prefix,
                    index=index,
                    timeout=5)
            # Timeout means either there is a lost connectivity to
            # consul or there are no change to that key. Do nothing.
            if is_timeout:
                continue

            # After timeout event the index returned from
            # coordinator_get_with_timeout is None. If we are here it's
            # not a timeout, therefore the index is a valid one.
            index = tmp_index

            if updated is None or updated != last:
                log.info('leader-key-change',
                         index=index, updated=updated, last=last)
                # leadership has changed or vacated (or forcefully
                # removed), apply now
                # If I was previously the leader then assert a non
                # leadership role before going for election
                if self.i_am_leader:
                    log.info('leaving-leaderdhip',
                             leader=self.instance_id)
                    yield self._assert_nonleadership(self.instance_id)

                break
            last = updated

    except Exception as e:
        log.exception('unexpected-error-leader-trackin', e=e)

    finally:
        # except in shutdown, the loop must continue (after a short delay)
        if not self.shutting_down:
            reactor.callLater(self.tracking_loop_delay,
                              self._leadership_tracking_loop)
Zsolt Haraszti | 1420def | 2016-09-18 00:07:31 -0700 | [diff] [blame] | 467 | |
@inlineCallbacks
def _assert_leadership(self):
    """(Re-)assert leadership; does nothing when already the leader."""
    if self.i_am_leader:
        return
    self.i_am_leader = True
    self._set_leader_id(self.instance_id)
    yield self._just_gained_leadership()
Zsolt Haraszti | 1420def | 2016-09-18 00:07:31 -0700 | [diff] [blame] | 475 | |
@inlineCallbacks
def _assert_nonleadership(self, leader_id):
    """
    Take on the non-leader role and record who the leader is.

    :param leader_id: id to store as the current leader; it is recorded
        on every call, whether or not this instance was the leader
    """
    # the leader id is refreshed unconditionally
    self._set_leader_id(leader_id)

    if not self.i_am_leader:
        return

    # this instance was the leader until now: drop the flag first, then
    # wait for the lost-leadership handler to finish
    self.i_am_leader = False
    yield self._just_lost_leadership()
Zsolt Haraszti | 1420def | 2016-09-18 00:07:31 -0700 | [diff] [blame] | 486 | |
Zsolt Haraszti | ac9310d | 2016-09-20 12:56:35 -0700 | [diff] [blame] | 487 | def _set_leader_id(self, leader_id): |
| 488 | self.leader_id = leader_id |
| 489 | deferreds, self.wait_for_leader_deferreds = \ |
| 490 | self.wait_for_leader_deferreds, [] |
| 491 | for d in deferreds: |
| 492 | d.callback(leader_id) |
| 493 | |
def _just_gained_leadership(self):
    """
    Create the Leader helper for this coordinator and start it.

    :return: whatever the new Leader's start() returns
    """
    log.info('became-leader')
    new_leader = Leader(self)
    self.leader = new_leader
    return new_leader.start()
Zsolt Haraszti | 1420def | 2016-09-18 00:07:31 -0700 | [diff] [blame] | 498 | |
def _just_lost_leadership(self):
    """
    Log the loss of leadership and halt the leader helper.

    :return: whatever self._halt_leader() returns
    """
    log.info('lost-leadership')
    halted = self._halt_leader()
    return halted
| 502 | |
| 503 | def _halt_leader(self): |
khenaidoo | b1602a3 | 2017-07-27 16:59:52 -0400 | [diff] [blame] | 504 | if self.leader: |
| 505 | d = self.leader.stop() |
| 506 | self.leader = None |
Zsolt Haraszti | a341031 | 2016-09-18 23:29:04 -0700 | [diff] [blame] | 507 | return d |
Zsolt Haraszti | e060a7d | 2016-09-16 11:08:24 -0700 | [diff] [blame] | 508 | |
def get_consul(self):
    """Return the consul client object held in self.consul."""
    client = self.consul
    return client
| 511 | |
@inlineCallbacks
def _retry(self, operation, *args, **kw):
    """
    Run a consul operation, trying again after every failure.

    Each failed attempt is logged and followed by a call to
    self._backoff(...) before the next attempt; a successful attempt
    calls self._clear_backoff() and ends the loop.

    :param operation: 'GET', 'PUT' or 'DELETE' to invoke the matching
        consul.kv method; any other value is treated as a callable and
        invoked as operation(*args, **kw)
    :param args: positional arguments forwarded to the operation
    :param kw: keyword arguments forwarded to the operation; for 'PUT',
        an 'acquire' value that differs from self.session_id is replaced
        by self.session_id before the call
    :return: (through returnValue) the result of the successful attempt
    """
    while 1:
        try:
            consul = yield self.get_consul()
            log.info('start', operation=operation, args=args)
            if operation == 'GET':
                result = yield consul.kv.get(*args, **kw)
            elif operation == 'PUT':
                # a PUT that acquires a lock always uses the current
                # session id, whatever id the caller passed in
                if 'acquire' in kw and kw['acquire'] != self.session_id:
                    log.info('updating-session-in-put-operation',
                             old_session=kw['acquire'],
                             new_session=self.session_id)
                    kw['acquire'] = self.session_id
                result = yield consul.kv.put(*args, **kw)
            elif operation == 'DELETE':
                result = yield consul.kv.delete(*args, **kw)
            else:
                # Default case - consider operation as a function call
                result = yield operation(*args, **kw)
            self._clear_backoff()
            break
        except ConsulException as e:
            log.exception('consul-not-up',
                          operation=operation,
                          args=args,
                          session=self.consul.Session,
                          e=e)
            yield self._backoff('consul-not-up')
        except ConnectionError as e:
            log.exception('cannot-connect-to-consul',
                          operation=operation,
                          args=args,
                          session=self.consul.Session,
                          e=e)
            yield self._backoff('cannot-connect-to-consul')
        except DNSLookupError:
            log.info('dns-lookup-failed', operation=operation, args=args,
                     host=self.host)
            yield self._backoff('dns-lookup-failed')
        except StaleMembershipEntryException as e:
            log.exception('stale-membership-record-in-the-way',
                          operation=operation,
                          args=args,
                          session=self.consul.Session,
                          e=e)
            yield self._backoff('stale-membership-record-in-the-way')
        except Exception as e:
            # errors raised while shutting down are not logged
            if not self.shutting_down:
                log.exception(e)
            yield self._backoff('unknown-error')

    log.info('end', operation=operation, args=args)
    returnValue(result)
khenaidoo | d6e0e80 | 2017-08-29 19:55:44 -0400 | [diff] [blame] | 569 | |
@inlineCallbacks
def coordinator_get_with_timeout(self, key, timeout, **kw):
    """
    Fetch a key from consul, giving up once the timeout fires.

    :param key: Key to query
    :param timeout: timeout value handed to DeferredWithTimeout
    :param kw: additional key-value params forwarded to the GET
    :return: (False, (index, result)) when the GET answers first,
        (True, (None, None)) on timeout or on any logged error.
    """

    @inlineCallbacks
    def _fetch(lookup_key, waiter):
        # Run the GET and hand its answer to the waiter, unless the
        # waiter has already fired by the time the answer arrives.
        try:
            (idx, value) = yield self._retry('GET', lookup_key, **kw)
            if not waiter.called:
                log.debug('got-result-cancelling-timer')
                waiter.callback((idx, value))
        except Exception as err:
            log.exception('got-exception', e=err)

    try:
        pending = DeferredWithTimeout(timeout=timeout)
        # started but not awaited: the outcome arrives through `pending`
        _fetch(key, pending)
        try:
            outcome = yield pending
            log.debug('result-received', result=outcome)
            returnValue((False, outcome))
        except TimeOutError:
            log.debug('timeout-or-no-data-change', consul_key=key)
        except Exception as err:
            log.exception('exception', e=err)
    except Exception as err:
        log.exception('exception', e=err)

    # reached only when no result was returned above
    no_result = (True, (None, None))
    returnValue(no_result)