blob: a56b776a34928393c6580510ca8d5a44fd6da7d2 [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package model
18
19import (
20 "bytes"
21 "compress/gzip"
Matt Jeanneretcab955f2019-04-10 15:45:57 -040022 "github.com/golang/protobuf/proto"
Matt Jeanneretcab955f2019-04-10 15:45:57 -040023 "github.com/opencord/voltha-go/common/log"
24 "github.com/opencord/voltha-go/db/kvstore"
25 "reflect"
26 "runtime/debug"
27 "strings"
28 "sync"
29)
30
31// PersistedRevision holds information of revision meant to be saved in a persistent storage
32type PersistedRevision struct {
33 Revision
34 Compress bool
35
Matt Jeanneret384d8c92019-05-06 14:27:31 -040036 events chan *kvstore.Event
37 kvStore *Backend
38 mutex sync.RWMutex
Matt Jeanneretcab955f2019-04-10 15:45:57 -040039 isStored bool
40 isWatched bool
41}
42
// watchCache keeps track of the watches that have already been registered.
type watchCache struct {
	// Cache is used as a set; SetupWatch stores keys of the form
	// "<watch key>-<revision hash>" with an empty struct as value.
	Cache sync.Map
}

var (
	// watchCacheInstance is the shared cache handed out by Watches.
	watchCacheInstance *watchCache
	// watchCacheOne guards the one-time creation of watchCacheInstance.
	watchCacheOne sync.Once
)

// Watches returns the shared watchCache, allocating it on the first call.
// Every call returns the same pointer.
func Watches() *watchCache {
	watchCacheOne.Do(func() {
		// The zero value of sync.Map is an empty map that is ready to use.
		watchCacheInstance = new(watchCache)
	})

	return watchCacheInstance
}
56
Matt Jeanneretcab955f2019-04-10 15:45:57 -040057// NewPersistedRevision creates a new instance of a PersistentRevision structure
58func NewPersistedRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
59 pr := &PersistedRevision{}
60 pr.kvStore = branch.Node.GetRoot().KvStore
61 pr.Revision = NewNonPersistedRevision(nil, branch, data, children)
62 return pr
63}
64
65// Finalize is responsible of saving the revision in the persistent storage
66func (pr *PersistedRevision) Finalize(skipOnExist bool) {
67 pr.store(skipOnExist)
68}
69
Matt Jeanneretcab955f2019-04-10 15:45:57 -040070func (pr *PersistedRevision) store(skipOnExist bool) {
71 if pr.GetBranch().Txid != "" {
72 return
73 }
74
75 if pair, _ := pr.kvStore.Get(pr.GetName()); pair != nil && skipOnExist {
76 log.Debugw("revision-config-already-exists", log.Fields{"hash": pr.GetHash(), "name": pr.GetName()})
77 return
78 }
79
80 if blob, err := proto.Marshal(pr.GetConfig().Data.(proto.Message)); err != nil {
81 // TODO report error
82 } else {
83 if pr.Compress {
84 var b bytes.Buffer
85 w := gzip.NewWriter(&b)
86 w.Write(blob)
87 w.Close()
88 blob = b.Bytes()
89 }
90
91 if err := pr.kvStore.Put(pr.GetName(), blob); err != nil {
92 log.Warnw("problem-storing-revision-config",
93 log.Fields{"error": err, "hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data})
94 } else {
95 log.Debugw("storing-revision-config", log.Fields{"hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data})
96 pr.isStored = true
97 }
98 }
99}
100
101func (pr *PersistedRevision) SetupWatch(key string) {
Matt Jeanneret384d8c92019-05-06 14:27:31 -0400102 if key == "" {
103 log.Debugw("ignoring-watch", log.Fields{"key": key, "revision-hash": pr.GetHash(), "stack": string(debug.Stack())})
104 return
105 }
106
107 if _, exists := Watches().Cache.LoadOrStore(key+"-"+pr.GetHash(), struct{}{}); exists {
108 return
109 }
110
Matt Jeanneretcab955f2019-04-10 15:45:57 -0400111 if pr.events == nil {
112 pr.events = make(chan *kvstore.Event)
113
Matt Jeanneret384d8c92019-05-06 14:27:31 -0400114 log.Debugw("setting-watch-channel", log.Fields{"key": key, "revision-hash": pr.GetHash(), "stack": string(debug.Stack())})
Matt Jeanneretcab955f2019-04-10 15:45:57 -0400115
116 pr.SetName(key)
117 pr.events = pr.kvStore.CreateWatch(key)
Matt Jeanneret384d8c92019-05-06 14:27:31 -0400118 }
Matt Jeanneretcab955f2019-04-10 15:45:57 -0400119
Matt Jeanneret384d8c92019-05-06 14:27:31 -0400120 if !pr.isWatched {
Matt Jeanneretcab955f2019-04-10 15:45:57 -0400121 pr.isWatched = true
122
Matt Jeanneret384d8c92019-05-06 14:27:31 -0400123 log.Debugw("setting-watch-routine", log.Fields{"key": key, "revision-hash": pr.GetHash(), "stack": string(debug.Stack())})
124
Matt Jeanneretcab955f2019-04-10 15:45:57 -0400125 // Start watching
126 go pr.startWatching()
127 }
128}
129
Matt Jeanneretcab955f2019-04-10 15:45:57 -0400130func (pr *PersistedRevision) startWatching() {
Matt Jeanneret384d8c92019-05-06 14:27:31 -0400131 log.Debugw("starting-watch", log.Fields{"key": pr.GetHash(), "stack": string(debug.Stack())})
Matt Jeanneretcab955f2019-04-10 15:45:57 -0400132
133StopWatchLoop:
134 for {
Matt Jeanneret384d8c92019-05-06 14:27:31 -0400135 if pr.IsDiscarded() {
136 break StopWatchLoop
137 }
138
Matt Jeanneretcab955f2019-04-10 15:45:57 -0400139 select {
140 case event, ok := <-pr.events:
141 if !ok {
142 log.Errorw("event-channel-failure: stopping watch loop", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
143 break StopWatchLoop
144 }
145
146 log.Debugw("received-event", log.Fields{"type": event.EventType, "watch": pr.GetName()})
147
148 switch event.EventType {
149 case kvstore.DELETE:
150 log.Debugw("delete-from-memory", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
151 pr.Revision.Drop("", true)
152 break StopWatchLoop
153
154 case kvstore.PUT:
155 log.Debugw("update-in-memory", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
156
157 if dataPair, err := pr.kvStore.Get(pr.GetName()); err != nil || dataPair == nil {
158 log.Errorw("update-in-memory--key-retrieval-failed", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "error": err})
159 } else {
160 data := reflect.New(reflect.TypeOf(pr.GetData()).Elem())
161
162 if err := proto.Unmarshal(dataPair.Value.([]byte), data.Interface().(proto.Message)); err != nil {
163 log.Errorw("update-in-memory--unmarshal-failed", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "error": err})
164 } else {
Matt Jeanneret384d8c92019-05-06 14:27:31 -0400165 if pr.GetNode().GetProxy() != nil {
166 pr.LoadFromPersistence(pr.GetNode().GetProxy().getFullPath(), "")
167 }
Matt Jeanneretcab955f2019-04-10 15:45:57 -0400168 }
169 }
170
171 default:
172 log.Debugw("unhandled-event", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "type": event.EventType})
173 }
174 }
175 }
176
Matt Jeanneret384d8c92019-05-06 14:27:31 -0400177 Watches().Cache.Delete(pr.GetName() + "-" + pr.GetHash())
Matt Jeanneretcab955f2019-04-10 15:45:57 -0400178
Matt Jeanneret384d8c92019-05-06 14:27:31 -0400179 log.Debugw("exiting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "stack": string(debug.Stack())})
Matt Jeanneretcab955f2019-04-10 15:45:57 -0400180}
181
182// UpdateData modifies the information in the data model and saves it in the persistent storage
183func (pr *PersistedRevision) UpdateData(data interface{}, branch *Branch) Revision {
184 log.Debugw("updating-persisted-data", log.Fields{"hash": pr.GetHash()})
185
186 newNPR := pr.Revision.UpdateData(data, branch)
187
188 newPR := &PersistedRevision{
189 Revision: newNPR,
190 Compress: pr.Compress,
191 kvStore: pr.kvStore,
Matt Jeanneret384d8c92019-05-06 14:27:31 -0400192 events: pr.events,
193 }
194
195 if newPR.GetHash() != pr.GetHash() {
196 newPR.isWatched = false
197 newPR.isStored = false
198 pr.Drop(branch.Txid, false)
199 newPR.SetupWatch(newPR.GetName())
200 } else {
201 newPR.isWatched = true
202 newPR.isStored = true
Matt Jeanneretcab955f2019-04-10 15:45:57 -0400203 }
204
205 return newPR
206}
207
208// UpdateChildren modifies the children of a revision and of a specific component and saves it in the persistent storage
209func (pr *PersistedRevision) UpdateChildren(name string, children []Revision, branch *Branch) Revision {
210 log.Debugw("updating-persisted-children", log.Fields{"hash": pr.GetHash()})
211
212 newNPR := pr.Revision.UpdateChildren(name, children, branch)
213
214 newPR := &PersistedRevision{
215 Revision: newNPR,
216 Compress: pr.Compress,
217 kvStore: pr.kvStore,
Matt Jeanneret384d8c92019-05-06 14:27:31 -0400218 events: pr.events,
219 }
220
221 if newPR.GetHash() != pr.GetHash() {
222 newPR.isWatched = false
223 newPR.isStored = false
224 pr.Drop(branch.Txid, false)
225 newPR.SetupWatch(newPR.GetName())
226 } else {
227 newPR.isWatched = true
228 newPR.isStored = true
Matt Jeanneretcab955f2019-04-10 15:45:57 -0400229 }
230
231 return newPR
232}
233
234// UpdateAllChildren modifies the children for all components of a revision and saves it in the peristent storage
235func (pr *PersistedRevision) UpdateAllChildren(children map[string][]Revision, branch *Branch) Revision {
236 log.Debugw("updating-all-persisted-children", log.Fields{"hash": pr.GetHash()})
237
238 newNPR := pr.Revision.UpdateAllChildren(children, branch)
239
240 newPR := &PersistedRevision{
241 Revision: newNPR,
242 Compress: pr.Compress,
243 kvStore: pr.kvStore,
Matt Jeanneret384d8c92019-05-06 14:27:31 -0400244 events: pr.events,
245 }
246
247 if newPR.GetHash() != pr.GetHash() {
248 newPR.isWatched = false
249 newPR.isStored = false
250 pr.Drop(branch.Txid, false)
251 newPR.SetupWatch(newPR.GetName())
252 } else {
253 newPR.isWatched = true
254 newPR.isStored = true
Matt Jeanneretcab955f2019-04-10 15:45:57 -0400255 }
256
257 return newPR
258}
259
260// Drop takes care of eliminating a revision hash that is no longer needed
261// and its associated config when required
262func (pr *PersistedRevision) Drop(txid string, includeConfig bool) {
263 pr.Revision.Drop(txid, includeConfig)
264}
265
266// Drop takes care of eliminating a revision hash that is no longer needed
267// and its associated config when required
268func (pr *PersistedRevision) StorageDrop(txid string, includeConfig bool) {
269 log.Debugw("dropping-revision",
270 log.Fields{"txid": txid, "hash": pr.GetHash(), "config-hash": pr.GetConfig().Hash, "stack": string(debug.Stack())})
271
272 pr.mutex.Lock()
273 defer pr.mutex.Unlock()
274 if pr.kvStore != nil && txid == "" {
275 if pr.isStored {
276 if pr.isWatched {
277 pr.kvStore.DeleteWatch(pr.GetName(), pr.events)
278 pr.isWatched = false
279 }
280
281 if err := pr.kvStore.Delete(pr.GetName()); err != nil {
282 log.Errorw("failed-to-remove-revision", log.Fields{"hash": pr.GetHash(), "error": err.Error()})
283 } else {
284 pr.isStored = false
285 }
286 }
287
288 } else {
289 if includeConfig {
290 log.Debugw("attempted-to-remove-transacted-revision-config", log.Fields{"hash": pr.GetConfig().Hash, "txid": txid})
291 }
292 log.Debugw("attempted-to-remove-transacted-revision", log.Fields{"hash": pr.GetHash(), "txid": txid})
293 }
294
295 pr.Revision.Drop(txid, includeConfig)
296}
Matt Jeanneret384d8c92019-05-06 14:27:31 -0400297
298// verifyPersistedEntry validates if the provided data is available or not in memory and applies updates as required
299func (pr *PersistedRevision) verifyPersistedEntry(data interface{}, typeName string, keyName string, keyValue string, txid string) (response Revision) {
300 rev := pr
301
302 children := make([]Revision, len(rev.GetBranch().GetLatest().GetChildren(typeName)))
303 copy(children, rev.GetBranch().GetLatest().GetChildren(typeName))
304
305 // Verify if the revision contains a child that matches that key
306 if childIdx, childRev := rev.GetNode().findRevByKey(rev.GetBranch().GetLatest().GetChildren(typeName), keyName, keyValue); childRev != nil {
307 // A child matching the provided key exists in memory
308 // Verify if the data differs to what was retrieved from persistence
309 if childRev.GetData().(proto.Message).String() != data.(proto.Message).String() {
310 log.Debugw("verify-persisted-entry--data-is-different", log.Fields{
311 "key": childRev.GetHash(),
312 "name": childRev.GetName(),
313 })
314
315 // Data has changed; replace the child entry and update the parent revision
316 updatedChildRev := childRev.UpdateData(data, childRev.GetBranch())
317 updatedChildRev.SetupWatch(updatedChildRev.GetName())
318 childRev.Drop(txid, false)
319
320 if childIdx >= 0 {
321 children[childIdx] = updatedChildRev
322 } else {
323 children = append(children, updatedChildRev)
324 }
325
326 rev.GetBranch().LatestLock.Lock()
327 updatedRev := rev.UpdateChildren(typeName, children, rev.GetBranch())
328 rev.GetBranch().Node.makeLatest(rev.GetBranch(), updatedRev, nil)
329 rev.GetBranch().LatestLock.Unlock()
330
331 // Drop the previous child revision
332 rev.GetBranch().Node.Latest().ChildDrop(typeName, childRev.GetHash())
333
334 if updatedChildRev != nil {
335 log.Debugw("verify-persisted-entry--adding-child", log.Fields{
336 "key": updatedChildRev.GetHash(),
337 "name": updatedChildRev.GetName(),
338 })
339 response = updatedChildRev
340 }
341 } else {
342 // Data is the same. Continue to the next entry
343 log.Debugw("verify-persisted-entry--same-data", log.Fields{
344 "key": childRev.GetHash(),
345 "name": childRev.GetName(),
346 })
347 if childRev != nil {
348 log.Debugw("verify-persisted-entry--keeping-child", log.Fields{
349 "key": childRev.GetHash(),
350 "name": childRev.GetName(),
351 })
352 response = childRev
353 }
354 }
355 } else {
356 // There is no available child with that key value.
357 // Create a new child and update the parent revision.
358 log.Debugw("verify-persisted-entry--no-such-entry", log.Fields{
359 "key": keyValue,
360 "name": typeName,
361 })
362
363 // Construct a new child node with the retrieved persistence data
364 childRev = rev.GetBranch().Node.MakeNode(data, txid).Latest(txid)
365
366 // We need to start watching this entry for future changes
367 childRev.SetName(typeName + "/" + keyValue)
368
369 // Add the child to the parent revision
370 rev.GetBranch().LatestLock.Lock()
371 children = append(children, childRev)
372 updatedRev := rev.GetBranch().Node.Latest().UpdateChildren(typeName, children, rev.GetBranch())
373 childRev.SetupWatch(childRev.GetName())
374
375 //rev.GetBranch().Node.Latest().Drop(txid, false)
376 rev.GetBranch().Node.makeLatest(rev.GetBranch(), updatedRev, nil)
377 rev.GetBranch().LatestLock.Unlock()
378
379 // Child entry is valid and can be included in the response object
380 if childRev != nil {
381 log.Debugw("verify-persisted-entry--adding-child", log.Fields{
382 "key": childRev.GetHash(),
383 "name": childRev.GetName(),
384 })
385 response = childRev
386 }
387 }
388
389 return response
390}
391
392// LoadFromPersistence retrieves data from kv store at the specified location and refreshes the memory
393// by adding missing entries, updating changed entries and ignoring unchanged ones
394func (pr *PersistedRevision) LoadFromPersistence(path string, txid string) []Revision {
395 pr.mutex.Lock()
396 defer pr.mutex.Unlock()
397
398 log.Debugw("loading-from-persistence", log.Fields{"path": path, "txid": txid})
399
400 var response []Revision
401 var rev Revision
402
403 rev = pr
404
405 if pr.kvStore != nil && path != "" {
406 blobMap, _ := pr.kvStore.List(path)
407
408 partition := strings.SplitN(path, "/", 2)
409 name := partition[0]
410
411 if len(partition) < 2 {
412 path = ""
413 } else {
414 path = partition[1]
415 }
416
417 field := ChildrenFields(rev.GetBranch().Node.Type)[name]
418
419 if field != nil && field.IsContainer {
420 log.Debugw("load-from-persistence--start-blobs", log.Fields{
421 "path": path,
422 "name": name,
423 "size": len(blobMap),
424 })
425
426 for _, blob := range blobMap {
427 output := blob.Value.([]byte)
428
429 data := reflect.New(field.ClassType.Elem())
430
431 if err := proto.Unmarshal(output, data.Interface().(proto.Message)); err != nil {
432 log.Errorw("load-from-persistence--failed-to-unmarshal", log.Fields{
433 "path": path,
434 "txid": txid,
435 "error": err,
436 })
437 } else if path == "" {
438 if field.Key != "" {
439 // Retrieve the key identifier value from the data structure
440 // based on the field's key attribute
441 _, key := GetAttributeValue(data.Interface(), field.Key, 0)
442
443 if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, key.String(), txid); entry != nil {
444 response = append(response, entry)
445 }
446 }
447
448 } else if field.Key != "" {
449 // The request is for a specific entry/id
450 partition := strings.SplitN(path, "/", 2)
451 key := partition[0]
452 if len(partition) < 2 {
453 path = ""
454 } else {
455 path = partition[1]
456 }
457 keyValue := field.KeyFromStr(key)
458
459 if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, keyValue.(string), txid); entry != nil {
460 response = append(response, entry)
461 }
462 }
463 }
464
465 log.Debugw("load-from-persistence--end-blobs", log.Fields{"path": path, "name": name})
466 } else {
467 log.Debugw("load-from-persistence--cannot-process-field", log.Fields{
468 "type": rev.GetBranch().Node.Type,
469 "name": name,
470 })
471 }
472 }
473
474 return response
475}