blob: 4b1006d490171a12bf3c4b81b4a7fba6a868a1ce [file] [log] [blame]
Zsolt Harasztidafefe12016-11-14 21:29:58 -08001#
Zsolt Haraszti3eb27a52017-01-03 21:56:48 -08002# Copyright 2017 the original author or authors.
Zsolt Harasztidafefe12016-11-14 21:29:58 -08003#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16from uuid import uuid4
17
18import structlog
Zsolt Harasztib7067842016-11-22 18:11:53 -080019from simplejson import dumps, loads
Zsolt Harasztidafefe12016-11-14 21:29:58 -080020
Zsolt Haraszti00d9a842016-11-23 11:18:23 -080021from voltha.core.config.config_node import ConfigNode
Zsolt Harasztib7067842016-11-22 18:11:53 -080022from voltha.core.config.config_rev import ConfigRevision
23from voltha.core.config.config_rev_persisted import PersistedConfigRevision
Zsolt Haraszti00d9a842016-11-23 11:18:23 -080024from voltha.core.config.merge_3way import MergeConflictException
Zsolt Harasztidafefe12016-11-14 21:29:58 -080025
26log = structlog.get_logger()
27
28
29class ConfigRoot(ConfigNode):
30
31 __slots__ = (
32 '_dirty_nodes', # holds set of modified nodes per transaction branch
Zsolt Harasztib7067842016-11-22 18:11:53 -080033 '_kv_store',
34 '_loading',
Zsolt Haraszti66862032016-11-28 14:28:39 -080035 '_rev_cls',
Khen Nursimulu56b36472017-03-08 15:32:42 -050036 '_deferred_callback_queue',
37 '_notification_deferred_callback_queue'
Zsolt Harasztidafefe12016-11-14 21:29:58 -080038 )
39
Zsolt Harasztib7067842016-11-22 18:11:53 -080040 def __init__(self, initial_data, kv_store=None, rev_cls=ConfigRevision):
41 self._kv_store = kv_store
Zsolt Harasztidafefe12016-11-14 21:29:58 -080042 self._dirty_nodes = {}
Zsolt Harasztib7067842016-11-22 18:11:53 -080043 self._loading = False
44 if kv_store is not None and \
45 not issubclass(rev_cls, PersistedConfigRevision):
46 rev_cls = PersistedConfigRevision
47 self._rev_cls = rev_cls
Zsolt Haraszti66862032016-11-28 14:28:39 -080048 self._deferred_callback_queue = []
Khen Nursimulu56b36472017-03-08 15:32:42 -050049 self._notification_deferred_callback_queue = []
Zsolt Harasztib7067842016-11-22 18:11:53 -080050 super(ConfigRoot, self).__init__(self, initial_data, False)
51
52 @property
53 def kv_store(self):
54 if self._loading:
55 # provide fake store for storing things
56 # TODO this shall be a fake_dict providing noop for all relevant
57 # operations
58 return dict()
59 else:
60 return self._kv_store
61
62 def mkrev(self, *args, **kw):
63 return self._rev_cls(*args, **kw)
Zsolt Harasztidafefe12016-11-14 21:29:58 -080064
65 def mk_txbranch(self):
66 txid = uuid4().hex[:12]
67 self._dirty_nodes[txid] = {self}
68 self._mk_txbranch(txid)
69 return txid
70
71 def del_txbranch(self, txid):
72 for dirty_node in self._dirty_nodes[txid]:
73 dirty_node._del_txbranch(txid)
74 del self._dirty_nodes[txid]
75
76 def fold_txbranch(self, txid):
77 try:
78 self._merge_txbranch(txid, dry_run=1)
79 except MergeConflictException:
80 self.del_txbranch(txid)
81 raise
82
Zsolt Haraszti66862032016-11-28 14:28:39 -080083 try:
84 self._merge_txbranch(txid)
85 finally:
86 self.execute_deferred_callbacks()
Zsolt Harasztidafefe12016-11-14 21:29:58 -080087
88 # ~~~~~~ Overridden, root-level CRUD methods to handle transactions ~~~~~~~
89
90 def update(self, path, data, strict=None, txid=None, mk_branch=None):
91 assert mk_branch is None
Zsolt Haraszti66862032016-11-28 14:28:39 -080092 self.check_callback_queue()
93 try:
94 if txid is not None:
95 dirtied = self._dirty_nodes[txid]
Zsolt Harasztidafefe12016-11-14 21:29:58 -080096
Zsolt Haraszti66862032016-11-28 14:28:39 -080097 def track_dirty(node):
98 dirtied.add(node)
99 return node._mk_txbranch(txid)
Zsolt Harasztidafefe12016-11-14 21:29:58 -0800100
Zsolt Haraszti66862032016-11-28 14:28:39 -0800101 res = super(ConfigRoot, self).update(path, data, strict,
102 txid, track_dirty)
103 else:
104 res = super(ConfigRoot, self).update(path, data, strict)
105 finally:
106 self.execute_deferred_callbacks()
107 return res
Zsolt Harasztidafefe12016-11-14 21:29:58 -0800108
109 def add(self, path, data, txid=None, mk_branch=None):
110 assert mk_branch is None
Zsolt Haraszti66862032016-11-28 14:28:39 -0800111 self.check_callback_queue()
112 try:
113 if txid is not None:
114 dirtied = self._dirty_nodes[txid]
Zsolt Harasztidafefe12016-11-14 21:29:58 -0800115
Zsolt Haraszti66862032016-11-28 14:28:39 -0800116 def track_dirty(node):
117 dirtied.add(node)
118 return node._mk_txbranch(txid)
Zsolt Harasztidafefe12016-11-14 21:29:58 -0800119
Zsolt Haraszti66862032016-11-28 14:28:39 -0800120 res = super(ConfigRoot, self).add(path, data, txid, track_dirty)
121 else:
122 res = super(ConfigRoot, self).add(path, data)
123 finally:
124 self.execute_deferred_callbacks()
125 return res
Zsolt Harasztidafefe12016-11-14 21:29:58 -0800126
127 def remove(self, path, txid=None, mk_branch=None):
128 assert mk_branch is None
Zsolt Haraszti66862032016-11-28 14:28:39 -0800129 self.check_callback_queue()
130 try:
131 if txid is not None:
132 dirtied = self._dirty_nodes[txid]
Zsolt Harasztidafefe12016-11-14 21:29:58 -0800133
Zsolt Haraszti66862032016-11-28 14:28:39 -0800134 def track_dirty(node):
135 dirtied.add(node)
136 return node._mk_txbranch(txid)
Zsolt Harasztidafefe12016-11-14 21:29:58 -0800137
Zsolt Haraszti66862032016-11-28 14:28:39 -0800138 res = super(ConfigRoot, self).remove(path, txid, track_dirty)
139 else:
140 res = super(ConfigRoot, self).remove(path)
141 finally:
142 self.execute_deferred_callbacks()
143 return res
144
145 def check_callback_queue(self):
146 assert len(self._deferred_callback_queue) == 0
147
148 def enqueue_callback(self, func, *args, **kw):
149 self._deferred_callback_queue.append((func, args, kw))
150
Khen Nursimulu56b36472017-03-08 15:32:42 -0500151 def enqueue_notification_callback(self, func, *args, **kw):
152 """
153 A separate queue is required for notification. Previously, when the
154 notifications were added to the self._deferred_callback_queue there
155 was a deadlock condition where two callbacks were added (one
156 related to the model change and one for the notification related to
157 that model change). Since the model change requires the
158 self._deferred_callback_queue to be empty then there was a deadlock
159 in that scenario. The simple approach to avoid this problem is to
160 have separate queues for model and notification.
161 TODO: Investigate whether there is a need for the
162 self._deferred_callback_queue to handle multiple model events at the same time
163 :param func: callback function
164 :param args: args
165 :param kw: key-value args
166 :return: None
167 """
168 self._notification_deferred_callback_queue.append((func, args, kw))
169
Zsolt Haraszti66862032016-11-28 14:28:39 -0800170 def execute_deferred_callbacks(self):
Khen Nursimulu56b36472017-03-08 15:32:42 -0500171 # First process the model-triggered related callbacks
Zsolt Haraszti66862032016-11-28 14:28:39 -0800172 while self._deferred_callback_queue:
173 func, args, kw = self._deferred_callback_queue.pop(0)
174 func(*args, **kw)
Zsolt Harasztidafefe12016-11-14 21:29:58 -0800175
Khen Nursimulu56b36472017-03-08 15:32:42 -0500176 # Execute the notification callbacks
177 while self._notification_deferred_callback_queue:
178 func, args, kw = self._notification_deferred_callback_queue.pop(0)
179 func(*args, **kw)
180
181
Zsolt Harasztib7067842016-11-22 18:11:53 -0800182 # ~~~~~~~~~~~~~~~~ Persistence related ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
183
184 @classmethod
185 def load(cls, root_msg_cls, kv_store):
186 # need to use fake kv store during initial load for not to override
187 # our real k vstore
188 fake_kv_store = dict() # shall use more efficient mock dict
189 root = cls(root_msg_cls(), kv_store=fake_kv_store,
190 rev_cls=PersistedConfigRevision)
191 # we can install the real store now
192 root._kv_store = kv_store
193 root.load_from_persistence(root_msg_cls)
194 return root
195
196 def _make_latest(self, branch, *args, **kw):
197 super(ConfigRoot, self)._make_latest(branch, *args, **kw)
198 # only persist the committed branch
199 if self._kv_store is not None and branch._txid is None:
200 root_data = dict(
201 latest=branch._latest._hash,
202 tags=dict((k, v._hash) for k, v in self._tags.iteritems())
203 )
204 blob = dumps(root_data)
205 self._kv_store['root'] = blob
206
Ryan Van Gilderbf974d02017-02-24 15:01:22 -0800207 def persist_tags(self):
208 if self._kv_store is not None:
209 root_data = loads(self.kv_store['root'])
210 root_data = dict(
211 latest=root_data['latest'],
212 tags=dict((k, v._hash) for k, v in self._tags.iteritems())
213 )
214 blob = dumps(root_data)
215 self._kv_store['root'] = blob
216
Zsolt Harasztib7067842016-11-22 18:11:53 -0800217 def load_from_persistence(self, root_msg_cls):
218 self._loading = True
219 blob = self._kv_store['root']
220 root_data = loads(blob)
221
222 for tag, hash in root_data['tags'].iteritems():
Ryan Van Gilderbf974d02017-02-24 15:01:22 -0800223 self.load_latest(hash)
224 self._tags[tag] = self.latest
Zsolt Harasztib7067842016-11-22 18:11:53 -0800225
226 self.load_latest(root_data['latest'])
227
228 self._loading = False
229