/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package model

import (
	"bytes"
	"compress/gzip"
	"encoding/hex"
	"reflect"
	"runtime/debug"
	"strings"
	"sync"

	"github.com/golang/protobuf/proto"
	"github.com/google/uuid"
	"github.com/opencord/voltha-go/common/log"
	"github.com/opencord/voltha-go/db/kvstore"
)

// PersistedRevision holds information of a revision meant to be saved in persistent storage
type PersistedRevision struct {
	Revision
	Compress bool

	events    chan *kvstore.Event `json:"-"`
	kvStore   *Backend            `json:"-"`
	mutex     sync.RWMutex        `json:"-"`
	isStored  bool
	isWatched bool
}

// NewPersistedRevision creates a new instance of a PersistedRevision structure
func NewPersistedRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
	pr := &PersistedRevision{}
	pr.kvStore = branch.Node.GetRoot().KvStore
	pr.Revision = NewNonPersistedRevision(nil, branch, data, children)
	return pr
}

// Finalize is responsible for saving the revision in persistent storage
func (pr *PersistedRevision) Finalize(skipOnExist bool) {
	pr.store(skipOnExist)
}

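// examplePersistRevision is an illustrative sketch (not part of the original
// flow): it shows how a caller holding a branch whose root has a KV store
// attached could create a persisted revision and write its config through
// Finalize. The function name and arguments are assumptions for illustration.
func examplePersistRevision(branch *Branch, data interface{}) *PersistedRevision {
	rev := NewPersistedRevision(branch, data, nil).(*PersistedRevision)

	// Finalize delegates to store(); with skipOnExist=true an already
	// existing KV entry for the same name is left untouched.
	rev.Finalize(true)

	return rev
}
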
type revData struct {
	Children map[string][]string
	Config   string
}

// store writes the revision's config to the KV store under the revision name,
// unless the revision belongs to a transaction branch or, when skipOnExist is
// set, an entry already exists for that name
func (pr *PersistedRevision) store(skipOnExist bool) {
	if pr.GetBranch().Txid != "" {
		return
	}

	if pair, _ := pr.kvStore.Get(pr.GetName()); pair != nil && skipOnExist {
		log.Debugw("revision-config-already-exists", log.Fields{"hash": pr.GetHash(), "name": pr.GetName()})
		return
	}

	if blob, err := proto.Marshal(pr.GetConfig().Data.(proto.Message)); err != nil {
		log.Errorw("problem-marshaling-revision-config",
			log.Fields{"error": err, "hash": pr.GetHash(), "name": pr.GetName()})
	} else {
		if pr.Compress {
			var b bytes.Buffer
			w := gzip.NewWriter(&b)
			w.Write(blob)
			w.Close()
			blob = b.Bytes()
		}

		if err := pr.kvStore.Put(pr.GetName(), blob); err != nil {
			log.Warnw("problem-storing-revision-config",
				log.Fields{"error": err, "hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data})
		} else {
			log.Debugw("storing-revision-config", log.Fields{"hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data})
			pr.isStored = true
		}
	}
}

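// exampleGzipRoundTrip is an illustrative sketch (not used by the model): it
// shows the compression applied by store() when Compress is set, and how such
// a blob could be decompressed by a reader of the KV store. Note that the
// load paths in this file (startWatching, LoadFromPersistence) unmarshal the
// stored value directly and therefore appear to assume uncompressed blobs.
func exampleGzipRoundTrip(blob []byte) ([]byte, error) {
	// Compress, as store() does for the marshaled config
	var compressed bytes.Buffer
	w := gzip.NewWriter(&compressed)
	if _, err := w.Write(blob); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil {
		return nil, err
	}

	// Decompress the stored value back into the original protobuf bytes
	r, err := gzip.NewReader(bytes.NewReader(compressed.Bytes()))
	if err != nil {
		return nil, err
	}
	defer r.Close()

	var decompressed bytes.Buffer
	if _, err := decompressed.ReadFrom(r); err != nil {
		return nil, err
	}
	return decompressed.Bytes(), nil
}
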
// SetupWatch sets the revision name to the given key and registers a watch on
// that key in the KV store, then starts consuming watch events in the background.
// It only takes effect on the first call for a given revision.
func (pr *PersistedRevision) SetupWatch(key string) {
	if pr.events == nil {
		log.Debugw("setting-watch", log.Fields{"key": key, "revision-hash": pr.GetHash()})

		pr.SetName(key)
		pr.events = pr.kvStore.CreateWatch(key)

		pr.isWatched = true

		// Start watching
		go pr.startWatching()
	}
}

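// exampleWatchStoredRevision is an illustrative sketch (not part of the
// original flow): once a revision has been stored, registering a watch on its
// own name keeps the in-memory copy in sync with external writers of the KV
// store, as LoadFromPersistence does for the children it loads.
func exampleWatchStoredRevision(rev *PersistedRevision) {
	// SetupWatch only registers a watch the first time it is called for this
	// revision; startWatching() then consumes PUT/DELETE events in the background.
	rev.SetupWatch(rev.GetName())
}
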
// updateInMemory refreshes the in-memory copy of the revision with data
// received through the KV store watch, merging it through a transaction
func (pr *PersistedRevision) updateInMemory(data interface{}) {
	pr.mutex.Lock()
	defer pr.mutex.Unlock()

	var pac *proxyAccessControl
	var pathLock string

	//
	// If a proxy exists for this revision, use it to lock access to the path
	// and prevent simultaneous updates to the object in memory
	//
	if pr.GetNode().GetProxy() != nil {
		pathLock, _ = pr.GetNode().GetProxy().parseForControlledPath(pr.GetNode().GetProxy().getFullPath())

		// If the proxy already has a request in progress, there is no need to process the watch
		log.Debugw("update-in-memory--checking-pathlock", log.Fields{"key": pr.GetHash(), "pathLock": pathLock})
		if PAC().IsReserved(pathLock) {
			switch pr.GetNode().GetRoot().GetProxy().Operation {
			case PROXY_ADD:
				fallthrough
			case PROXY_REMOVE:
				fallthrough
			case PROXY_UPDATE:
				log.Debugw("update-in-memory--skipping", log.Fields{"key": pr.GetHash(), "path": pr.GetNode().GetProxy().getFullPath()})
				return
			default:
				log.Debugw("update-in-memory--operation", log.Fields{"operation": pr.GetNode().GetRoot().GetProxy().Operation})
			}
		} else {
			log.Debugw("update-in-memory--path-not-locked", log.Fields{"key": pr.GetHash(), "path": pr.GetNode().GetProxy().getFullPath()})
		}

		log.Debugw("update-in-memory--reserve-and-lock", log.Fields{"key": pr.GetHash(), "path": pathLock})

		pac = PAC().ReservePath(pr.GetNode().GetProxy().getFullPath(), pr.GetNode().GetProxy(), pathLock)
		pac.SetProxy(pr.GetNode().GetProxy())
		pac.lock()

		defer log.Debugw("update-in-memory--release-and-unlock", log.Fields{"key": pr.GetHash(), "path": pathLock})
		defer pac.unlock()
		defer PAC().ReleasePath(pathLock)
	}

	//
	// Update the object in memory through a transaction
	// This will allow the object to be subsequently merged with any changes
	// that might have occurred in memory
	//

	log.Debugw("update-in-memory--custom-transaction", log.Fields{"key": pr.GetHash()})

	// Prepare the transaction
	branch := pr.GetBranch()
	latest := branch.GetLatest()
	txidBin, _ := uuid.New().MarshalBinary()
	txid := hex.EncodeToString(txidBin)[:12]

	makeBranch := func(node *node) *Branch {
		return node.MakeBranch(txid)
	}

	// Apply the update in a transaction branch
	updatedRev := latest.GetNode().Update("", data, false, txid, makeBranch)
	updatedRev.SetName(latest.GetName())

	// Merge the transaction branch in memory
	if mergedRev, _ := latest.GetNode().MergeBranch(txid, false); mergedRev != nil {
		branch.SetLatest(mergedRev)
	}
}

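// examplePathReservation is an illustrative sketch (not part of the original
// flow) of the reservation pattern updateInMemory relies on when a proxy is
// attached: reserve the controlled path, take the access-control lock, and
// release both once the in-memory mutation is done.
func examplePathReservation(pr *PersistedRevision) {
	proxy := pr.GetNode().GetProxy()
	if proxy == nil {
		return
	}
	pathLock, _ := proxy.parseForControlledPath(proxy.getFullPath())

	pac := PAC().ReservePath(proxy.getFullPath(), proxy, pathLock)
	pac.SetProxy(proxy)
	pac.lock()
	// Deferred in the same order as updateInMemory: the path is released
	// first, then the access-control lock is dropped.
	defer pac.unlock()
	defer PAC().ReleasePath(pathLock)

	// ... mutate the in-memory object while the path is reserved ...
}
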
// startWatching consumes events from the KV store watch channel and applies
// them to the in-memory revision until the channel is closed or the entry is deleted
func (pr *PersistedRevision) startWatching() {
	log.Debugw("starting-watch", log.Fields{"key": pr.GetHash()})

StopWatchLoop:
	for {
		select {
		case event, ok := <-pr.events:
			if !ok {
				log.Errorw("event-channel-failure: stopping watch loop", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
				break StopWatchLoop
			}

			log.Debugw("received-event", log.Fields{"type": event.EventType, "watch": pr.GetName()})

			switch event.EventType {
			case kvstore.DELETE:
				log.Debugw("delete-from-memory", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
				pr.Revision.Drop("", true)
				break StopWatchLoop

			case kvstore.PUT:
				log.Debugw("update-in-memory", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})

				if dataPair, err := pr.kvStore.Get(pr.GetName()); err != nil || dataPair == nil {
					log.Errorw("update-in-memory--key-retrieval-failed", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "error": err})
				} else {
					data := reflect.New(reflect.TypeOf(pr.GetData()).Elem())

					if err := proto.Unmarshal(dataPair.Value.([]byte), data.Interface().(proto.Message)); err != nil {
						log.Errorw("update-in-memory--unmarshal-failed", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "error": err})
					} else {
						pr.updateInMemory(data.Interface())
					}
				}

			default:
				log.Debugw("unhandled-event", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "type": event.EventType})
			}
		}
	}

	log.Debugw("exiting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
}

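// exampleStopWatch is an illustrative sketch (not part of the original flow):
// removing the KV watch is expected to close the event channel, which makes
// the receive in startWatching return ok=false and terminates the watch loop.
// StorageDrop below performs the same steps before deleting the KV entry.
func exampleStopWatch(pr *PersistedRevision) {
	if pr.isWatched {
		pr.kvStore.DeleteWatch(pr.GetName(), pr.events)
		pr.isWatched = false
	}
}
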
// LoadFromPersistence retrieves the blobs stored under the given KV path,
// rebuilds the corresponding child revisions in memory (attaching a watch to
// each newly loaded child) and returns the revisions that were loaded
func (pr *PersistedRevision) LoadFromPersistence(path string, txid string) []Revision {
	log.Debugw("loading-from-persistence", log.Fields{"path": path, "txid": txid})

	var response []Revision
	var rev Revision = pr

	if pr.kvStore != nil && path != "" {
		blobMap, _ := pr.kvStore.List(path)

		partition := strings.SplitN(path, "/", 2)
		name := partition[0]

		if len(partition) < 2 {
			path = ""
		} else {
			path = partition[1]
		}

		field := ChildrenFields(rev.GetBranch().Node.Type)[name]

		if field != nil && field.IsContainer {
			children := make([]Revision, len(rev.GetChildren(name)))
			copy(children, rev.GetChildren(name))
			existChildMap := make(map[string]int)
			for i, child := range rev.GetChildren(name) {
				existChildMap[child.GetHash()] = i
			}

			for _, blob := range blobMap {
				output := blob.Value.([]byte)

				data := reflect.New(field.ClassType.Elem())

				if err := proto.Unmarshal(output, data.Interface().(proto.Message)); err != nil {
					log.Errorw(
						"loading-from-persistence--failed-to-unmarshal",
						log.Fields{"path": path, "txid": txid, "error": err},
					)
				} else if field.Key != "" {
					var key reflect.Value
					var keyValue interface{}
					var keyStr string

					if path == "" {
						// e.g. /logical_devices --> path="" name=logical_devices key=""
						_, key = GetAttributeValue(data.Interface(), field.Key, 0)
						keyStr = key.String()

					} else {
						// e.g.
						// /logical_devices/abcde --> path="abcde" name=logical_devices
						// /logical_devices/abcde/image_downloads --> path="abcde/image_downloads" name=logical_devices

						partition := strings.SplitN(path, "/", 2)
						keySegment := partition[0]
						if len(partition) < 2 {
							path = ""
						} else {
							path = partition[1]
						}
						keyValue = field.KeyFromStr(keySegment)
						keyStr = keyValue.(string)

						if idx, childRev := rev.GetBranch().Node.findRevByKey(children, field.Key, keyValue); childRev != nil {
							// Key is already in memory; continue recursing down the remaining path
							if newChildRev := childRev.LoadFromPersistence(path, txid); newChildRev != nil {
								children[idx] = newChildRev[0]

								updatedRev := rev.UpdateChildren(name, rev.GetChildren(name), rev.GetBranch())
								updatedRev.GetBranch().Node.makeLatest(updatedRev.GetBranch(), updatedRev, nil)

								response = append(response, newChildRev[0])
								continue
							}
						}
					}

					childRev := rev.GetBranch().Node.MakeNode(data.Interface(), txid).Latest(txid)
					childRev.SetName(name + "/" + keyStr)

					// Do not process a child that is already in memory
					if _, childExists := existChildMap[childRev.GetHash()]; !childExists {
						// Create watch for <component>/<key>
						childRev.SetupWatch(childRev.GetName())

						children = append(children, childRev)
						rev = rev.UpdateChildren(name, children, rev.GetBranch())

						rev.GetBranch().Node.makeLatest(rev.GetBranch(), rev, nil)
					}
					response = append(response, childRev)
					continue
				}
			}
		}
	}

	return response
}

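// exampleLoadTopLevel is an illustrative sketch (not part of the original
// flow): LoadFromPersistence walks a KV path such as "logical_devices" or
// "logical_devices/<id>" (the examples used in the comments above), rebuilds
// the matching child revisions in memory with watches attached, and returns
// them. The path below is only an illustration.
func exampleLoadTopLevel(pr *PersistedRevision) []Revision {
	// An empty txid loads the data outside of any transaction branch.
	return pr.LoadFromPersistence("logical_devices", "")
}
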
// UpdateData returns a new persisted revision carrying the updated data; the
// new revision reuses this revision's KV store and compression settings
func (pr *PersistedRevision) UpdateData(data interface{}, branch *Branch) Revision {
	log.Debugw("updating-persisted-data", log.Fields{"hash": pr.GetHash()})

	newNPR := pr.Revision.UpdateData(data, branch)

	newPR := &PersistedRevision{
		Revision: newNPR,
		Compress: pr.Compress,
		kvStore:  pr.kvStore,
	}

	return newPR
}

// UpdateChildren returns a new persisted revision with updated children for the
// given component name; the new revision reuses this revision's KV store and
// compression settings
func (pr *PersistedRevision) UpdateChildren(name string, children []Revision, branch *Branch) Revision {
	log.Debugw("updating-persisted-children", log.Fields{"hash": pr.GetHash()})

	newNPR := pr.Revision.UpdateChildren(name, children, branch)

	newPR := &PersistedRevision{
		Revision: newNPR,
		Compress: pr.Compress,
		kvStore:  pr.kvStore,
	}

	return newPR
}

// UpdateAllChildren returns a new persisted revision with updated children for
// all components; the new revision reuses this revision's KV store and
// compression settings
func (pr *PersistedRevision) UpdateAllChildren(children map[string][]Revision, branch *Branch) Revision {
	log.Debugw("updating-all-persisted-children", log.Fields{"hash": pr.GetHash()})

	newNPR := pr.Revision.UpdateAllChildren(children, branch)

	newPR := &PersistedRevision{
		Revision: newNPR,
		Compress: pr.Compress,
		kvStore:  pr.kvStore,
	}

	return newPR
}

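// exampleUpdateAndPersist is an illustrative sketch (not part of the original
// flow): the Update* helpers above only build a new PersistedRevision that
// reuses the same KV store and compression settings; writing the new config
// to the KV store still goes through Finalize/store.
func exampleUpdateAndPersist(pr *PersistedRevision, data interface{}, branch *Branch) Revision {
	updated := pr.UpdateData(data, branch)
	updated.(*PersistedRevision).Finalize(false)
	return updated
}
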
// Drop takes care of eliminating a revision hash that is no longer needed
// and its associated config when required
func (pr *PersistedRevision) Drop(txid string, includeConfig bool) {
	pr.Revision.Drop(txid, includeConfig)
}

// StorageDrop removes the revision entry from the KV store when it is not part
// of a transaction, deleting its watch first, and then drops the revision from memory
func (pr *PersistedRevision) StorageDrop(txid string, includeConfig bool) {
	log.Debugw("dropping-revision",
		log.Fields{"txid": txid, "hash": pr.GetHash(), "config-hash": pr.GetConfig().Hash, "stack": string(debug.Stack())})

	pr.mutex.Lock()
	defer pr.mutex.Unlock()
	if pr.kvStore != nil && txid == "" {
		if pr.isStored {
			if pr.isWatched {
				pr.kvStore.DeleteWatch(pr.GetName(), pr.events)
				pr.isWatched = false
			}

			if err := pr.kvStore.Delete(pr.GetName()); err != nil {
				log.Errorw("failed-to-remove-revision", log.Fields{"hash": pr.GetHash(), "error": err.Error()})
			} else {
				pr.isStored = false
			}
		}

	} else {
		if includeConfig {
			log.Debugw("attempted-to-remove-transacted-revision-config", log.Fields{"hash": pr.GetConfig().Hash, "txid": txid})
		}
		log.Debugw("attempted-to-remove-transacted-revision", log.Fields{"hash": pr.GetHash(), "txid": txid})
	}

	pr.Revision.Drop(txid, includeConfig)
409}