blob: 0ecb5efccd1e6baa97597415d3a95911cc1a1c18 [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Stephane Barbariedc5022d2018-11-19 15:21:44 -050016
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040017package model
18
19import (
20 "bytes"
21 "compress/gzip"
Stephane Barbarie260a5632019-02-26 16:12:49 -050022 "encoding/hex"
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040023 "github.com/golang/protobuf/proto"
Stephane Barbarie260a5632019-02-26 16:12:49 -050024 "github.com/google/uuid"
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -040025 "github.com/opencord/voltha-go/common/log"
Stephane Barbariee0a4c792019-01-16 11:26:29 -050026 "github.com/opencord/voltha-go/db/kvstore"
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040027 "reflect"
Stephane Barbariedc5022d2018-11-19 15:21:44 -050028 "runtime/debug"
Stephane Barbarie1ab43272018-12-08 21:42:13 -050029 "strings"
Stephane Barbariedc5022d2018-11-19 15:21:44 -050030 "sync"
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040031)
32
Stephane Barbariedc5022d2018-11-19 15:21:44 -050033// PersistedRevision holds information of revision meant to be saved in a persistent storage
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040034type PersistedRevision struct {
Stephane Barbarieec0919b2018-09-05 14:14:29 -040035 Revision
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040036 Compress bool
Stephane Barbariee0a4c792019-01-16 11:26:29 -050037
Stephane Barbarie3cb01222019-01-16 17:15:56 -050038 events chan *kvstore.Event `json:"-"`
39 kvStore *Backend `json:"-"`
40 mutex sync.RWMutex `json:"-"`
41 isStored bool
42 isWatched bool
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040043}
44
Stephane Barbariedc5022d2018-11-19 15:21:44 -050045// NewPersistedRevision creates a new instance of a PersistentRevision structure
Stephane Barbarieec0919b2018-09-05 14:14:29 -040046func NewPersistedRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040047 pr := &PersistedRevision{}
Stephane Barbariedc5022d2018-11-19 15:21:44 -050048 pr.kvStore = branch.Node.GetRoot().KvStore
49 pr.Revision = NewNonPersistedRevision(nil, branch, data, children)
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040050 return pr
51}
52
Stephane Barbariedc5022d2018-11-19 15:21:44 -050053// Finalize is responsible of saving the revision in the persistent storage
Stephane Barbarie1ab43272018-12-08 21:42:13 -050054func (pr *PersistedRevision) Finalize(skipOnExist bool) {
55 pr.store(skipOnExist)
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040056}
57
// revData pairs a map of string lists, keyed by name, with a config string.
//
// NOTE(review): nothing in the visible part of this file references this
// type; presumably it describes a serialized revision layout - confirm before
// relying on it.
type revData struct {
	// Children maps a name to a list of strings.
	Children map[string][]string
	// Config is the config string associated with the revision.
	Config string
}
62
Stephane Barbarie1ab43272018-12-08 21:42:13 -050063func (pr *PersistedRevision) store(skipOnExist bool) {
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -040064 if pr.GetBranch().Txid != "" {
65 return
66 }
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040067
Stephane Barbarie1ab43272018-12-08 21:42:13 -050068 if pair, _ := pr.kvStore.Get(pr.GetHash()); pair != nil && skipOnExist {
Stephane Barbariee0a4c792019-01-16 11:26:29 -050069 log.Debugw("revision-config-already-exists", log.Fields{"hash": pr.GetHash()})
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040070 return
71 }
Stephane Barbarie1ab43272018-12-08 21:42:13 -050072
Stephane Barbarieec0919b2018-09-05 14:14:29 -040073 if blob, err := proto.Marshal(pr.GetConfig().Data.(proto.Message)); err != nil {
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040074 // TODO report error
75 } else {
76 if pr.Compress {
77 var b bytes.Buffer
78 w := gzip.NewWriter(&b)
79 w.Write(blob)
80 w.Close()
81 blob = b.Bytes()
82 }
Stephane Barbariedc5022d2018-11-19 15:21:44 -050083
Stephane Barbarie1ab43272018-12-08 21:42:13 -050084 if err := pr.kvStore.Put(pr.GetHash(), blob); err != nil {
Stephane Barbariee0a4c792019-01-16 11:26:29 -050085 log.Warnw("problem-storing-revision-config",
86 log.Fields{"error": err, "hash": pr.GetHash(), "data": pr.GetConfig().Data})
Stephane Barbariedc5022d2018-11-19 15:21:44 -050087 } else {
Stephane Barbariee0a4c792019-01-16 11:26:29 -050088 log.Debugw("storing-revision-config",
89 log.Fields{"hash": pr.GetHash(), "data": pr.GetConfig().Data})
Stephane Barbarie3cb01222019-01-16 17:15:56 -050090 pr.isStored = true
Stephane Barbariedc5022d2018-11-19 15:21:44 -050091 }
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040092 }
93}
94
Stephane Barbariee0a4c792019-01-16 11:26:29 -050095func (pr *PersistedRevision) SetupWatch(key string) {
96 if pr.events == nil {
97 pr.events = make(chan *kvstore.Event)
98
99 log.Debugw("setting-watch", log.Fields{"key": key})
100
101 pr.events = pr.kvStore.CreateWatch(key)
102
Stephane Barbarie3cb01222019-01-16 17:15:56 -0500103 pr.isWatched = true
104
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500105 // Start watching
106 go pr.startWatching()
107 }
108}
109
Stephane Barbarie260a5632019-02-26 16:12:49 -0500110func (pr *PersistedRevision) updateInMemory(data interface{}) {
111 var pac *proxyAccessControl
112 var pathLock string
113
114 //
115 // If a proxy exists for this revision, use it to lock access to the path
116 // and prevent simultaneous updates to the object in memory
117 //
118 if pr.GetNode().GetProxy() != nil {
119 log.Debugw("update-in-memory--reserve-and-lock", log.Fields{"key": pr.GetHash(), "path": pathLock})
120 pathLock, _ = pr.GetNode().GetProxy().parseForControlledPath(pr.GetNode().GetProxy().getFullPath())
121 pac = PAC().ReservePath(pr.GetNode().GetProxy().getFullPath(), pr.GetNode().GetProxy(), pathLock)
122 pac.SetProxy(pr.GetNode().GetProxy())
123 pac.lock()
124
125 defer log.Debugw("update-in-memory--release-and-unlock", log.Fields{"key": pr.GetHash(), "path": pathLock})
126 defer pac.unlock()
127 defer PAC().ReleasePath(pathLock)
128 }
129
130 //
131 // Update the object in memory through a transaction
132 // This will allow for the object to be subsequently merged with any changes
133 // that might have occurred in memory
134 //
135
136 log.Debugw("update-in-memory--custom-transaction", log.Fields{"key": pr.GetHash()})
137
138 // Prepare the transaction
139 branch := pr.GetBranch()
140 latest := branch.GetLatest()
141 txidBin, _ := uuid.New().MarshalBinary()
142 txid := hex.EncodeToString(txidBin)[:12]
143
144 makeBranch := func(node *node) *Branch {
145 return node.MakeBranch(txid)
146 }
147
148 // Apply the update in a transaction branch
149 updatedRev := latest.GetNode().Update("", data, false, txid, makeBranch)
150 updatedRev.SetHash(latest.GetHash())
151
152 // Merge the transaction branch in memory
153 if mergedRev, _ := latest.GetNode().MergeBranch(txid, false); mergedRev != nil {
154 branch.SetLatest(mergedRev)
155 }
156
157 // The transaction branch must be deleted to free-up memory
158 //latest.GetNode().GetRoot().DeleteTxBranch(txid)
159}
160
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500161func (pr *PersistedRevision) startWatching() {
162 log.Debugw("starting-watch", log.Fields{"key": pr.GetHash()})
163
164StopWatchLoop:
165 for {
166 select {
167 case event, ok := <-pr.events:
168 if !ok {
169 log.Errorw("event-channel-failure: stopping watch loop", log.Fields{"key": pr.GetHash()})
170 break StopWatchLoop
171 }
172
173 log.Debugw("received-event", log.Fields{"type": event.EventType})
174
175 switch event.EventType {
176 case kvstore.DELETE:
177 log.Debugw("delete-from-memory", log.Fields{"key": pr.GetHash()})
178 pr.Revision.Drop("", true)
179 break StopWatchLoop
180
181 case kvstore.PUT:
182 log.Debugw("update-in-memory", log.Fields{"key": pr.GetHash()})
183
184 if dataPair, err := pr.kvStore.Get(pr.GetHash()); err != nil || dataPair == nil {
185 log.Errorw("update-in-memory--key-retrieval-failed", log.Fields{"key": pr.GetHash(), "error": err})
186 } else {
Stephane Barbariedf5479f2019-01-29 22:13:00 -0500187 data := reflect.New(reflect.TypeOf(pr.GetData()).Elem())
188
189 if err := proto.Unmarshal(dataPair.Value.([]byte), data.Interface().(proto.Message)); err != nil {
190 log.Errorw("update-in-memory--unmarshal-failed", log.Fields{"key": pr.GetHash(), "error": err})
191 } else {
Stephane Barbarie260a5632019-02-26 16:12:49 -0500192 pr.updateInMemory(data.Interface())
Stephane Barbariedf5479f2019-01-29 22:13:00 -0500193 }
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500194 }
195
196 default:
197 log.Debugw("unhandled-event", log.Fields{"key": pr.GetHash(), "type": event.EventType})
198 }
199 }
200 }
201
202 log.Debugw("exiting-watch", log.Fields{"key": pr.GetHash()})
203}
204
Stephane Barbarie1ab43272018-12-08 21:42:13 -0500205func (pr *PersistedRevision) LoadFromPersistence(path string, txid string) []Revision {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500206 log.Debugw("loading-from-persistence", log.Fields{"path": path, "txid": txid})
207
Stephane Barbarie1ab43272018-12-08 21:42:13 -0500208 var response []Revision
209 var rev Revision
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400210
Stephane Barbarie1ab43272018-12-08 21:42:13 -0500211 rev = pr
212
Stephane Barbarie260a5632019-02-26 16:12:49 -0500213 if pr.kvStore != nil && path != "" {
Stephane Barbarie1ab43272018-12-08 21:42:13 -0500214 blobMap, _ := pr.kvStore.List(path)
215
216 partition := strings.SplitN(path, "/", 2)
217 name := partition[0]
218
219 if len(partition) < 2 {
220 path = ""
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400221 } else {
Stephane Barbarie1ab43272018-12-08 21:42:13 -0500222 path = partition[1]
223 }
224
225 field := ChildrenFields(rev.GetBranch().Node.Type)[name]
226
Stephane Barbarie260a5632019-02-26 16:12:49 -0500227 if field != nil && field.IsContainer {
Stephane Barbarieaa467942019-02-06 14:09:44 -0500228 var children []Revision
229 children = make([]Revision, len(rev.GetChildren(name)))
230 copy(children, rev.GetChildren(name))
231 existChildMap := make(map[string]int)
232 for i, child := range rev.GetChildren(name) {
233 existChildMap[child.GetHash()] = i
234 }
235
Stephane Barbarie1ab43272018-12-08 21:42:13 -0500236 for _, blob := range blobMap {
237 output := blob.Value.([]byte)
238
239 data := reflect.New(field.ClassType.Elem())
240
241 if err := proto.Unmarshal(output, data.Interface().(proto.Message)); err != nil {
Stephane Barbarieaa467942019-02-06 14:09:44 -0500242 log.Errorw(
243 "loading-from-persistence--failed-to-unmarshal",
244 log.Fields{"path": path, "txid": txid, "error": err},
245 )
Stephane Barbarie11b88e72019-02-07 12:28:29 -0500246 } else if field.Key != "" {
247 var key reflect.Value
248 var keyValue interface{}
249 var keyStr string
250
Stephane Barbarie1ab43272018-12-08 21:42:13 -0500251 if path == "" {
Stephane Barbarie11b88e72019-02-07 12:28:29 -0500252 // e.g. /logical_devices --> path="" name=logical_devices key=""
253 _, key = GetAttributeValue(data.Interface(), field.Key, 0)
254 keyStr = key.String()
Stephane Barbarie1ab43272018-12-08 21:42:13 -0500255
Stephane Barbarie11b88e72019-02-07 12:28:29 -0500256 } else {
257 // e.g.
258 // /logical_devices/abcde --> path="abcde" name=logical_devices
259 // /logical_devices/abcde/image_downloads --> path="abcde/image_downloads" name=logical_devices
Stephane Barbarie1ab43272018-12-08 21:42:13 -0500260
261 partition := strings.SplitN(path, "/", 2)
262 key := partition[0]
263 if len(partition) < 2 {
264 path = ""
265 } else {
266 path = partition[1]
267 }
Stephane Barbarie11b88e72019-02-07 12:28:29 -0500268 keyValue = field.KeyFromStr(key)
269 keyStr = keyValue.(string)
Stephane Barbarie1ab43272018-12-08 21:42:13 -0500270
Stephane Barbarie11b88e72019-02-07 12:28:29 -0500271 if idx, childRev := rev.GetBranch().Node.findRevByKey(children, field.Key, keyValue); childRev != nil {
272 // Key is memory, continue recursing path
Stephane Barbarie260a5632019-02-26 16:12:49 -0500273 if newChildRev := childRev.LoadFromPersistence(path, txid); newChildRev != nil {
274 children[idx] = newChildRev[0]
Stephane Barbarie1ab43272018-12-08 21:42:13 -0500275
Stephane Barbarie260a5632019-02-26 16:12:49 -0500276 rev := rev.UpdateChildren(name, rev.GetChildren(name), rev.GetBranch())
277 rev.GetBranch().Node.makeLatest(rev.GetBranch(), rev, nil)
Stephane Barbarie1ab43272018-12-08 21:42:13 -0500278
Stephane Barbarie260a5632019-02-26 16:12:49 -0500279 response = append(response, newChildRev[0])
280 continue
281 }
Stephane Barbarie11b88e72019-02-07 12:28:29 -0500282 }
Stephane Barbarie1ab43272018-12-08 21:42:13 -0500283 }
Stephane Barbarie11b88e72019-02-07 12:28:29 -0500284
285 childRev := rev.GetBranch().Node.MakeNode(data.Interface(), txid).Latest(txid)
286 childRev.SetHash(name + "/" + keyStr)
287
288 // Do not process a child that is already in memory
289 if _, childExists := existChildMap[childRev.GetHash()]; !childExists {
290 // Create watch for <component>/<key>
291 childRev.SetupWatch(childRev.GetHash())
292
293 children = append(children, childRev)
294 rev = rev.UpdateChildren(name, children, rev.GetBranch())
295
296 rev.GetBranch().Node.makeLatest(rev.GetBranch(), rev, nil)
297 }
298 response = append(response, childRev)
299 continue
Stephane Barbarie1ab43272018-12-08 21:42:13 -0500300 }
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400301 }
302 }
303 }
Stephane Barbarie11b88e72019-02-07 12:28:29 -0500304
Stephane Barbarie1ab43272018-12-08 21:42:13 -0500305 return response
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400306}
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400307
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500308// UpdateData modifies the information in the data model and saves it in the persistent storage
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400309func (pr *PersistedRevision) UpdateData(data interface{}, branch *Branch) Revision {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500310 log.Debugw("updating-persisted-data", log.Fields{"hash": pr.GetHash()})
311
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400312 newNPR := pr.Revision.UpdateData(data, branch)
313
314 newPR := &PersistedRevision{
315 Revision: newNPR,
316 Compress: pr.Compress,
khenaidoob9203542018-09-17 22:56:37 -0400317 kvStore: pr.kvStore,
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400318 }
319
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400320 return newPR
321}
322
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500323// UpdateChildren modifies the children of a revision and of a specific component and saves it in the persistent storage
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400324func (pr *PersistedRevision) UpdateChildren(name string, children []Revision, branch *Branch) Revision {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500325 log.Debugw("updating-persisted-children", log.Fields{"hash": pr.GetHash()})
326
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400327 newNPR := pr.Revision.UpdateChildren(name, children, branch)
328
329 newPR := &PersistedRevision{
330 Revision: newNPR,
331 Compress: pr.Compress,
khenaidoob9203542018-09-17 22:56:37 -0400332 kvStore: pr.kvStore,
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400333 }
334
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400335 return newPR
336}
337
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500338// UpdateAllChildren modifies the children for all components of a revision and saves it in the peristent storage
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400339func (pr *PersistedRevision) UpdateAllChildren(children map[string][]Revision, branch *Branch) Revision {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500340 log.Debugw("updating-all-persisted-children", log.Fields{"hash": pr.GetHash()})
341
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400342 newNPR := pr.Revision.UpdateAllChildren(children, branch)
343
344 newPR := &PersistedRevision{
345 Revision: newNPR,
346 Compress: pr.Compress,
khenaidoob9203542018-09-17 22:56:37 -0400347 kvStore: pr.kvStore,
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400348 }
349
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400350 return newPR
351}
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400352
353// Drop takes care of eliminating a revision hash that is no longer needed
354// and its associated config when required
355func (pr *PersistedRevision) Drop(txid string, includeConfig bool) {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500356 log.Debugw("dropping-revision",
357 log.Fields{"txid": txid, "hash": pr.GetHash(), "config-hash": pr.GetConfig().Hash, "stack": string(debug.Stack())})
358
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500359 pr.mutex.Lock()
360 defer pr.mutex.Unlock()
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400361 if pr.kvStore != nil && txid == "" {
Stephane Barbarie3cb01222019-01-16 17:15:56 -0500362 if pr.isStored {
363 if includeConfig {
364 if err := pr.kvStore.Delete(pr.GetConfig().Hash); err != nil {
365 log.Errorw("failed-to-remove-revision-config", log.Fields{"hash": pr.GetConfig().Hash, "error": err.Error()})
366 }
367 }
368
369 if err := pr.kvStore.Delete(pr.GetHash()); err != nil {
370 log.Errorw("failed-to-remove-revision", log.Fields{"hash": pr.GetHash(), "error": err.Error()})
371 } else {
372 pr.isStored = false
373 }
374
375 if pr.isWatched {
376 pr.kvStore.DeleteWatch(pr.GetHash(), pr.events)
377 pr.isWatched = false
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400378 }
379 }
380
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400381 } else {
382 if includeConfig {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500383 log.Debugw("attempted-to-remove-transacted-revision-config", log.Fields{"hash": pr.GetConfig().Hash, "txid": txid})
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400384 }
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500385 log.Debugw("attempted-to-remove-transacted-revision", log.Fields{"hash": pr.GetHash(), "txid": txid})
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400386 }
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500387
388 pr.Revision.Drop(txid, includeConfig)
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400389}