blob: 36826948e69bb25ca0e160e407357997583dc60d [file] [log] [blame]
khenaidoobf6e7bb2018-08-14 22:27:29 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Stephane Barbariedc5022d2018-11-19 15:21:44 -050016
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040017package model
18
19import (
20 "bytes"
21 "compress/gzip"
22 "encoding/json"
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040023 "github.com/golang/protobuf/proto"
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -040024 "github.com/opencord/voltha-go/common/log"
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040025 "io/ioutil"
26 "reflect"
Stephane Barbariedc5022d2018-11-19 15:21:44 -050027 "runtime/debug"
28 "sync"
Stephane Barbarieec0919b2018-09-05 14:14:29 -040029 "time"
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040030)
31
Stephane Barbariedc5022d2018-11-19 15:21:44 -050032// PersistedRevision holds information of revision meant to be saved in a persistent storage
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040033type PersistedRevision struct {
Stephane Barbariedc5022d2018-11-19 15:21:44 -050034 mutex sync.RWMutex
Stephane Barbarieec0919b2018-09-05 14:14:29 -040035 Revision
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040036 Compress bool
37 kvStore *Backend
38}
39
Stephane Barbariedc5022d2018-11-19 15:21:44 -050040// NewPersistedRevision creates a new instance of a PersistentRevision structure
Stephane Barbarieec0919b2018-09-05 14:14:29 -040041func NewPersistedRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040042 pr := &PersistedRevision{}
Stephane Barbariedc5022d2018-11-19 15:21:44 -050043 pr.kvStore = branch.Node.GetRoot().KvStore
44 pr.Revision = NewNonPersistedRevision(nil, branch, data, children)
Stephane Barbarieec0919b2018-09-05 14:14:29 -040045 pr.Finalize()
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040046 return pr
47}
48
Stephane Barbariedc5022d2018-11-19 15:21:44 -050049// Finalize is responsible of saving the revision in the persistent storage
Stephane Barbarieec0919b2018-09-05 14:14:29 -040050func (pr *PersistedRevision) Finalize() {
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040051 pr.store()
52}
53
// revData is the JSON document written for a revision: it references the
// revision's config and children by hash rather than embedding them.
type revData struct {
	// Children maps a field name to the hashes of that field's child revisions.
	Children map[string][]string

	// Config is the hash of the revision's config entry.
	Config string
}
58
59func (pr *PersistedRevision) store() {
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -040060 if pr.GetBranch().Txid != "" {
61 return
62 }
Stephane Barbarieec0919b2018-09-05 14:14:29 -040063 if ok, _ := pr.kvStore.Get(pr.Revision.GetHash()); ok != nil {
Stephane Barbariedc5022d2018-11-19 15:21:44 -050064 log.Debugf("Entry already exists - hash:%s, stack: %s", pr.Revision.GetHash(), string(debug.Stack()))
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040065 return
66 }
67
68 pr.storeConfig()
69
70 childrenHashes := make(map[string][]string)
Stephane Barbarieec0919b2018-09-05 14:14:29 -040071 for fieldName, children := range pr.GetChildren() {
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040072 hashes := []string{}
73 for _, rev := range children {
Stephane Barbarie126101e2018-10-11 16:18:48 -040074 if rev != nil {
75 hashes = append(hashes, rev.GetHash())
76 }
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040077 }
78 childrenHashes[fieldName] = hashes
79 }
80 data := &revData{
81 Children: childrenHashes,
Stephane Barbarieec0919b2018-09-05 14:14:29 -040082 Config: pr.GetConfig().Hash,
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040083 }
84 if blob, err := json.Marshal(data); err != nil {
85 // TODO report error
86 } else {
87 if pr.Compress {
88 var b bytes.Buffer
89 w := gzip.NewWriter(&b)
90 w.Write(blob)
91 w.Close()
92 blob = b.Bytes()
93 }
Stephane Barbariedc5022d2018-11-19 15:21:44 -050094 if err := pr.kvStore.Put(pr.GetHash(), blob); err != nil {
95 log.Warnf("Problem storing revision - error: %s, hash: %s, data: %s", err.Error(), pr.GetHash(),
96 string(blob))
97 } else {
98 log.Debugf("Stored entry - hash:%s, blob: %s, stack: %s", pr.Revision.GetHash(), string(blob),
99 string(debug.Stack()))
100 }
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400101 }
102}
103
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500104// Load retrieves a revision from th persistent storage
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400105func (pr *PersistedRevision) Load(branch *Branch, kvStore *Backend, msgClass interface{}, hash string) Revision {
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400106 blob, _ := kvStore.Get(hash)
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400107
108 start := time.Now()
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400109 output := blob.Value.([]byte)
110 var data revData
111 if pr.Compress {
112 b := bytes.NewBuffer(blob.Value.([]byte))
113 if r, err := gzip.NewReader(b); err != nil {
114 // TODO : report error
115 } else {
116 if output, err = ioutil.ReadAll(r); err != nil {
117 // TODO report error
118 }
119 }
120 }
121 if err := json.Unmarshal(output, &data); err != nil {
Stephane Barbarie8c48b5c2018-10-02 09:45:17 -0400122 log.Errorf("problem to unmarshal data - %s", err.Error())
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400123 }
124
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400125 stop := time.Now()
126 GetProfiling().AddToInMemoryModelTime(stop.Sub(start).Seconds())
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400127 configHash := data.Config
128 configData := pr.loadConfig(kvStore, msgClass, configHash)
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400129
130 assembledChildren := make(map[string][]Revision)
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400131
132 childrenHashes := data.Children
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400133 node := branch.Node
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400134 for fieldName, child := range ChildrenFields(msgClass) {
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400135 var children []Revision
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400136 for _, childHash := range childrenHashes[fieldName] {
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400137 childNode := node.MakeNode(reflect.New(child.ClassType).Elem().Interface(), "")
Stephane Barbarie126101e2018-10-11 16:18:48 -0400138 childNode.LoadLatest(childHash)
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400139 childRev := childNode.Latest()
140 children = append(children, childRev)
141 }
142 assembledChildren[fieldName] = children
143 }
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400144
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400145 rev := NewPersistedRevision(branch, configData, assembledChildren)
146 return rev
147}
148
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500149// storeConfig saves the data associated to a revision in the persistent storage
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400150func (pr *PersistedRevision) storeConfig() {
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400151 if ok, _ := pr.kvStore.Get(pr.GetConfig().Hash); ok != nil {
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500152 log.Debugf("Config already exists - hash:%s, stack: %s", pr.GetConfig().Hash, string(debug.Stack()))
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400153 return
154 }
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400155 if blob, err := proto.Marshal(pr.GetConfig().Data.(proto.Message)); err != nil {
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400156 // TODO report error
157 } else {
158 if pr.Compress {
159 var b bytes.Buffer
160 w := gzip.NewWriter(&b)
161 w.Write(blob)
162 w.Close()
163 blob = b.Bytes()
164 }
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500165
166 if err := pr.kvStore.Put(pr.GetConfig().Hash, blob); err != nil {
167 log.Warnf("Problem storing revision config - error: %s, hash: %s, data: %+v", err.Error(),
168 pr.GetConfig().Hash,
169 pr.GetConfig().Data)
170 } else {
171 log.Debugf("Stored config - hash:%s, blob: %+v, stack: %s", pr.GetConfig().Hash, pr.GetConfig().Data,
172 string(debug.Stack()))
173 }
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400174 }
175}
176
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500177// loadConfig restores the data associated to a revision from the persistent storage
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400178func (pr *PersistedRevision) loadConfig(kvStore *Backend, msgClass interface{}, hash string) interface{} {
179 blob, _ := kvStore.Get(hash)
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400180 start := time.Now()
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400181 output := blob.Value.([]byte)
182
183 if pr.Compress {
184 b := bytes.NewBuffer(blob.Value.([]byte))
185 if r, err := gzip.NewReader(b); err != nil {
186 // TODO : report error
187 } else {
188 if output, err = ioutil.ReadAll(r); err != nil {
189 // TODO report error
190 }
191 }
192 }
193
194 var data reflect.Value
195 if msgClass != nil {
196 data = reflect.New(reflect.TypeOf(msgClass).Elem())
197 if err := proto.Unmarshal(output, data.Interface().(proto.Message)); err != nil {
198 // TODO report error
199 }
200 }
201
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400202 stop := time.Now()
203
204 GetProfiling().AddToInMemoryModelTime(stop.Sub(start).Seconds())
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400205 return data.Interface()
206}
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400207
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500208// UpdateData modifies the information in the data model and saves it in the persistent storage
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400209func (pr *PersistedRevision) UpdateData(data interface{}, branch *Branch) Revision {
210 newNPR := pr.Revision.UpdateData(data, branch)
211
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500212 log.Debugf("Updating data %+v", data)
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400213 newPR := &PersistedRevision{
214 Revision: newNPR,
215 Compress: pr.Compress,
khenaidoob9203542018-09-17 22:56:37 -0400216 kvStore: pr.kvStore,
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400217 }
218
219 newPR.Finalize()
220
221 return newPR
222}
223
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500224// UpdateChildren modifies the children of a revision and of a specific component and saves it in the persistent storage
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400225func (pr *PersistedRevision) UpdateChildren(name string, children []Revision, branch *Branch) Revision {
226 newNPR := pr.Revision.UpdateChildren(name, children, branch)
227
228 newPR := &PersistedRevision{
229 Revision: newNPR,
230 Compress: pr.Compress,
khenaidoob9203542018-09-17 22:56:37 -0400231 kvStore: pr.kvStore,
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400232 }
233
234 newPR.Finalize()
235
236 return newPR
237}
238
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500239// UpdateAllChildren modifies the children for all components of a revision and saves it in the peristent storage
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400240func (pr *PersistedRevision) UpdateAllChildren(children map[string][]Revision, branch *Branch) Revision {
241 newNPR := pr.Revision.UpdateAllChildren(children, branch)
242
243 newPR := &PersistedRevision{
244 Revision: newNPR,
245 Compress: pr.Compress,
khenaidoob9203542018-09-17 22:56:37 -0400246 kvStore: pr.kvStore,
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400247 }
248
249 newPR.Finalize()
250
251 return newPR
252}
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400253
254// Drop takes care of eliminating a revision hash that is no longer needed
255// and its associated config when required
256func (pr *PersistedRevision) Drop(txid string, includeConfig bool) {
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500257 pr.mutex.Lock()
258 defer pr.mutex.Unlock()
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400259 if pr.kvStore != nil && txid == "" {
260 if includeConfig {
261 log.Debugf("removing rev config - hash: %s", pr.GetConfig().Hash)
262 if err := pr.kvStore.Delete(pr.GetConfig().Hash); err != nil {
263 log.Errorf(
264 "failed to remove rev config - hash: %s, err: %s",
265 pr.GetConfig().Hash,
266 err.Error(),
267 )
268 }
269 }
270
271 log.Debugf("removing rev - hash: %s", pr.GetHash())
272 if err := pr.kvStore.Delete(pr.GetHash()); err != nil {
273 log.Errorf("failed to remove rev - hash: %s, err: %s", pr.GetHash(), err.Error())
274 }
275 } else {
276 if includeConfig {
277 log.Debugf("Attempted to remove revision config:%s linked to transaction:%s", pr.GetConfig().Hash, txid)
278 }
279 log.Debugf("Attempted to remove revision:%s linked to transaction:%s", pr.GetHash(), txid)
280 }
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500281
282 pr.Revision.Drop(txid, includeConfig)
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400283}