blob: d9063488ab7745358e86935804ea0c276a37a85b [file] [log] [blame]
William Kurkianfefd4642019-02-07 15:30:03 -05001# Copyright 2017 the original author or authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#
15from consul import Consul, ConsulException
16from common.utils.asleep import asleep
17from requests import ConnectionError
18from twisted.internet.defer import inlineCallbacks, returnValue
19
20import etcd3
21import structlog
22
23
24class ConsulStore(object):
25 """ Config kv store for consul with a cache for quicker subsequent reads
26
27 TODO: This will block the reactor. Should either change
28 whole call stack to yield or put the put/delete transactions into a
29 queue to write later with twisted. Will need a transaction
30 log to ensure we don't lose anything.
31 Making the whole callstack yield is troublesome because other tasks can
32 come in on the side and start modifying things which could be bad.
33 """
34
35 CONNECT_RETRY_INTERVAL_SEC = 1
36 RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
37
38 def __init__(self, host, port, path_prefix):
39
40 self.log = structlog.get_logger()
41 self._consul = Consul(host=host, port=port)
42 self.host = host
43 self.port = port
44 self._path_prefix = path_prefix
45 self._cache = {}
46 self.retries = 0
47
48 def make_path(self, key):
49 return '{}/{}'.format(self._path_prefix, key)
50
51 def __getitem__(self, key):
52 if key in self._cache:
53 return self._cache[key]
54 value = self._kv_get(self.make_path(key))
55 if value is not None:
56 # consul turns empty strings to None, so we do the reverse here
57 self._cache[key] = value['Value'] or ''
58 return value['Value'] or ''
59 else:
60 raise KeyError(key)
61
62 def __contains__(self, key):
63 if key in self._cache:
64 return True
65 value = self._kv_get(self.make_path(key))
66 if value is not None:
67 self._cache[key] = value['Value']
68 return True
69 else:
70 return False
71
72 def __setitem__(self, key, value):
73 try:
74 assert isinstance(value, basestring)
75 self._cache[key] = value
76 self._kv_put(self.make_path(key), value)
77 except Exception, e:
78 self.log.exception('cannot-set-item', e=e)
79
80 def __delitem__(self, key):
81 self._cache.pop(key, None)
82 self._kv_delete(self.make_path(key))
83
84 @inlineCallbacks
85 def _backoff(self, msg):
86 wait_time = self.RETRY_BACKOFF[min(self.retries,
87 len(self.RETRY_BACKOFF) - 1)]
88 self.retries += 1
89 self.log.error(msg, retry_in=wait_time)
90 yield asleep(wait_time)
91
92 def _redo_consul_connection(self):
93 self._consul = Consul(host=self.host, port=self.port)
94 self._cache.clear()
95
96 def _clear_backoff(self):
97 if self.retries:
98 self.log.info('reconnected-to-consul', after_retries=self.retries)
99 self.retries = 0
100
101 def _get_consul(self):
102 return self._consul
103
104 # Proxy methods for consul with retry support
105 def _kv_get(self, *args, **kw):
106 return self._retry('GET', *args, **kw)
107
108 def _kv_put(self, *args, **kw):
109 return self._retry('PUT', *args, **kw)
110
111 def _kv_delete(self, *args, **kw):
112 return self._retry('DELETE', *args, **kw)
113
114 def _retry(self, operation, *args, **kw):
115 while 1:
116 try:
117 consul = self._get_consul()
118 self.log.debug('consul', consul=consul, operation=operation,
119 args=args)
120 if operation == 'GET':
121 index, result = consul.kv.get(*args, **kw)
122 elif operation == 'PUT':
123 result = consul.kv.put(*args, **kw)
124 elif operation == 'DELETE':
125 result = consul.kv.delete(*args, **kw)
126 else:
127 # Default case - consider operation as a function call
128 result = operation(*args, **kw)
129 self._clear_backoff()
130 break
131 except ConsulException, e:
132 self.log.exception('consul-not-up', e=e)
133 self._backoff('consul-not-up')
134 except ConnectionError, e:
135 self.log.exception('cannot-connect-to-consul', e=e)
136 self._backoff('cannot-connect-to-consul')
137 except Exception, e:
138 self.log.exception(e)
139 self._backoff('unknown-error')
140 self._redo_consul_connection()
141
142 return result
143
144
145class EtcdStore(object):
146 """ Config kv store for etcd with a cache for quicker subsequent reads
147
148 TODO: This will block the reactor. Should either change
149 whole call stack to yield or put the put/delete transactions into a
150 queue to write later with twisted. Will need a transaction
151 log to ensure we don't lose anything.
152 Making the whole callstack yield is troublesome because other tasks can
153 come in on the side and start modifying things which could be bad.
154 """
155
156 CONNECT_RETRY_INTERVAL_SEC = 1
157 RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
158
159 def __init__(self, host, port, path_prefix):
160
161 self.log = structlog.get_logger()
162 self._etcd = etcd3.client(host=host, port=port)
163 self.host = host
164 self.port = port
165 self._path_prefix = path_prefix
166 self._cache = {}
167 self.retries = 0
168
169 def make_path(self, key):
170 return '{}/{}'.format(self._path_prefix, key)
171
172 def __getitem__(self, key):
173 if key in self._cache:
174 return self._cache[key]
175 (value, meta) = self._kv_get(self.make_path(key))
176 if value is not None:
177 self._cache[key] = value
178 return value
179 else:
180 raise KeyError(key)
181
182 def __contains__(self, key):
183 if key in self._cache:
184 return True
185 (value, meta) = self._kv_get(self.make_path(key))
186 if value is not None:
187 self._cache[key] = value
188 return True
189 else:
190 return False
191
192 def __setitem__(self, key, value):
193 try:
194 assert isinstance(value, basestring)
195 self._cache[key] = value
196 self._kv_put(self.make_path(key), value)
197 except Exception, e:
198 self.log.exception('cannot-set-item', e=e)
199
200 def __delitem__(self, key):
201 self._cache.pop(key, None)
202 self._kv_delete(self.make_path(key))
203
204 @inlineCallbacks
205 def _backoff(self, msg):
206 wait_time = self.RETRY_BACKOFF[min(self.retries,
207 len(self.RETRY_BACKOFF) - 1)]
208 self.retries += 1
209 self.log.error(msg, retry_in=wait_time)
210 yield asleep(wait_time)
211
212 def _redo_etcd_connection(self):
213 self._etcd = etcd3.client(host=self.host, port=self.port)
214 self._cache.clear()
215
216 def _clear_backoff(self):
217 if self.retries:
218 self.log.info('reconnected-to-etcd', after_retries=self.retries)
219 self.retries = 0
220
221 def _get_etcd(self):
222 return self._etcd
223
224 # Proxy methods for etcd with retry support
225 def _kv_get(self, *args, **kw):
226 return self._retry('GET', *args, **kw)
227
228 def _kv_put(self, *args, **kw):
229 return self._retry('PUT', *args, **kw)
230
231 def _kv_delete(self, *args, **kw):
232 return self._retry('DELETE', *args, **kw)
233
234 def _retry(self, operation, *args, **kw):
235
236 # etcd data sometimes contains non-utf8 sequences, replace
237 self.log.debug('backend-op',
238 operation=operation,
239 args=map(lambda x : unicode(x,'utf8','replace'), args),
240 kw=kw)
241
242 while 1:
243 try:
244 etcd = self._get_etcd()
245 self.log.debug('etcd', etcd=etcd, operation=operation,
246 args=map(lambda x : unicode(x,'utf8','replace'), args))
247 if operation == 'GET':
248 (value, meta) = etcd.get(*args, **kw)
249 result = (value, meta)
250 elif operation == 'PUT':
251 result = etcd.put(*args, **kw)
252 elif operation == 'DELETE':
253 result = etcd.delete(*args, **kw)
254 else:
255 # Default case - consider operation as a function call
256 result = operation(*args, **kw)
257 self._clear_backoff()
258 break
259 except Exception, e:
260 self.log.exception(e)
261 self._backoff('unknown-error-with-etcd')
262 self._redo_etcd_connection()
263
264 return result
265
266
267def load_backend(store_id, store_prefix, args):
268 """ Return the kv store backend based on the command line arguments
269 """
270
271 def load_consul_store():
272 instance_core_store_prefix = '{}/{}'.format(store_prefix, store_id)
273
274 host, port = args.consul.split(':', 1)
275 return ConsulStore(host, int(port), instance_core_store_prefix)
276
277 def load_etcd_store():
278 instance_core_store_prefix = '{}/{}'.format(store_prefix, store_id)
279
280 host, port = args.etcd.split(':', 1)
281 return EtcdStore(host, int(port), instance_core_store_prefix)
282
283 loaders = {
284 'none': lambda: None,
285 'consul': load_consul_store,
286 'etcd': load_etcd_store
287 }
288
289 return loaders[args.backend]()