blob: 05c409015b987b9664d9344496f145b432f3ff5e [file] [log] [blame]
khenaidoobf6e7bb2018-08-14 22:27:29 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040016package model
17
18import (
19 "bytes"
20 "compress/gzip"
21 "encoding/json"
22 "fmt"
23 "github.com/golang/protobuf/proto"
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -040024 "github.com/opencord/voltha-go/common/log"
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040025 "io/ioutil"
26 "reflect"
Stephane Barbarieec0919b2018-09-05 14:14:29 -040027 "time"
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040028)
29
30type PersistedRevision struct {
Stephane Barbarieec0919b2018-09-05 14:14:29 -040031 Revision
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040032 Compress bool
33 kvStore *Backend
34}
35
Stephane Barbarieec0919b2018-09-05 14:14:29 -040036func NewPersistedRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040037 pr := &PersistedRevision{}
Stephane Barbarieec0919b2018-09-05 14:14:29 -040038 pr.kvStore = branch.Node.root.KvStore
39 pr.Revision = NewNonPersistedRevision(branch, data, children)
40 pr.Finalize()
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040041 return pr
42}
43
Stephane Barbarieec0919b2018-09-05 14:14:29 -040044func (pr *PersistedRevision) Finalize() {
45 //pr.Revision.Finalize()
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040046 pr.store()
47}
48
49type revData struct {
50 Children map[string][]string
51 Config string
52}
53
54func (pr *PersistedRevision) store() {
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -040055 if pr.GetBranch().Txid != "" {
56 return
57 }
Stephane Barbarieec0919b2018-09-05 14:14:29 -040058 if ok, _ := pr.kvStore.Get(pr.Revision.GetHash()); ok != nil {
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040059 return
60 }
61
62 pr.storeConfig()
63
64 childrenHashes := make(map[string][]string)
Stephane Barbarieec0919b2018-09-05 14:14:29 -040065 for fieldName, children := range pr.GetChildren() {
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040066 hashes := []string{}
67 for _, rev := range children {
Stephane Barbarieec0919b2018-09-05 14:14:29 -040068 hashes = append(hashes, rev.GetHash())
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040069 }
70 childrenHashes[fieldName] = hashes
71 }
72 data := &revData{
73 Children: childrenHashes,
Stephane Barbarieec0919b2018-09-05 14:14:29 -040074 Config: pr.GetConfig().Hash,
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040075 }
76 if blob, err := json.Marshal(data); err != nil {
77 // TODO report error
78 } else {
79 if pr.Compress {
80 var b bytes.Buffer
81 w := gzip.NewWriter(&b)
82 w.Write(blob)
83 w.Close()
84 blob = b.Bytes()
85 }
Stephane Barbarieec0919b2018-09-05 14:14:29 -040086 pr.kvStore.Put(pr.GetHash(), blob)
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040087 }
88}
89
Stephane Barbarieec0919b2018-09-05 14:14:29 -040090func (pr *PersistedRevision) Load(branch *Branch, kvStore *Backend, msgClass interface{}, hash string) Revision {
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040091 blob, _ := kvStore.Get(hash)
Stephane Barbarieec0919b2018-09-05 14:14:29 -040092
93 start := time.Now()
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040094 output := blob.Value.([]byte)
95 var data revData
96 if pr.Compress {
97 b := bytes.NewBuffer(blob.Value.([]byte))
98 if r, err := gzip.NewReader(b); err != nil {
99 // TODO : report error
100 } else {
101 if output, err = ioutil.ReadAll(r); err != nil {
102 // TODO report error
103 }
104 }
105 }
106 if err := json.Unmarshal(output, &data); err != nil {
107 fmt.Errorf("problem to unmarshal data - %s", err.Error())
108 }
109
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400110 stop := time.Now()
111 GetProfiling().AddToInMemoryModelTime(stop.Sub(start).Seconds())
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400112 configHash := data.Config
113 configData := pr.loadConfig(kvStore, msgClass, configHash)
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400114
115 assembledChildren := make(map[string][]Revision)
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400116
117 childrenHashes := data.Children
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400118 node := branch.Node
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400119 for fieldName, child := range ChildrenFields(msgClass) {
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400120 var children []Revision
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400121 for _, childHash := range childrenHashes[fieldName] {
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400122 childNode := node.MakeNode(reflect.New(child.ClassType).Elem().Interface(), "")
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400123 childNode.LoadLatest(kvStore, childHash)
124 childRev := childNode.Latest()
125 children = append(children, childRev)
126 }
127 assembledChildren[fieldName] = children
128 }
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400129
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400130 rev := NewPersistedRevision(branch, configData, assembledChildren)
131 return rev
132}
133
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400134func (pr *PersistedRevision) assignValue(a, b Revision) Revision {
135 a = b
136 return a
137}
138
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400139func (pr *PersistedRevision) storeConfig() {
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400140 if ok, _ := pr.kvStore.Get(pr.GetConfig().Hash); ok != nil {
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400141 return
142 }
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400143 if blob, err := proto.Marshal(pr.GetConfig().Data.(proto.Message)); err != nil {
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400144 // TODO report error
145 } else {
146 if pr.Compress {
147 var b bytes.Buffer
148 w := gzip.NewWriter(&b)
149 w.Write(blob)
150 w.Close()
151 blob = b.Bytes()
152 }
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400153 pr.kvStore.Put(pr.GetConfig().Hash, blob)
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400154 }
155}
156
157func (pr *PersistedRevision) loadConfig(kvStore *Backend, msgClass interface{}, hash string) interface{} {
158 blob, _ := kvStore.Get(hash)
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400159 start := time.Now()
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400160 output := blob.Value.([]byte)
161
162 if pr.Compress {
163 b := bytes.NewBuffer(blob.Value.([]byte))
164 if r, err := gzip.NewReader(b); err != nil {
165 // TODO : report error
166 } else {
167 if output, err = ioutil.ReadAll(r); err != nil {
168 // TODO report error
169 }
170 }
171 }
172
173 var data reflect.Value
174 if msgClass != nil {
175 data = reflect.New(reflect.TypeOf(msgClass).Elem())
176 if err := proto.Unmarshal(output, data.Interface().(proto.Message)); err != nil {
177 // TODO report error
178 }
179 }
180
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400181 stop := time.Now()
182
183 GetProfiling().AddToInMemoryModelTime(stop.Sub(start).Seconds())
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400184 return data.Interface()
185}
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400186
187func (pr *PersistedRevision) UpdateData(data interface{}, branch *Branch) Revision {
188 newNPR := pr.Revision.UpdateData(data, branch)
189
190 newPR := &PersistedRevision{
191 Revision: newNPR,
192 Compress: pr.Compress,
khenaidoob9203542018-09-17 22:56:37 -0400193 kvStore: pr.kvStore,
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400194 }
195
196 newPR.Finalize()
197
198 return newPR
199}
200
201func (pr *PersistedRevision) UpdateChildren(name string, children []Revision, branch *Branch) Revision {
202 newNPR := pr.Revision.UpdateChildren(name, children, branch)
203
204 newPR := &PersistedRevision{
205 Revision: newNPR,
206 Compress: pr.Compress,
khenaidoob9203542018-09-17 22:56:37 -0400207 kvStore: pr.kvStore,
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400208 }
209
210 newPR.Finalize()
211
212 return newPR
213}
214
215func (pr *PersistedRevision) UpdateAllChildren(children map[string][]Revision, branch *Branch) Revision {
216 newNPR := pr.Revision.UpdateAllChildren(children, branch)
217
218 newPR := &PersistedRevision{
219 Revision: newNPR,
220 Compress: pr.Compress,
khenaidoob9203542018-09-17 22:56:37 -0400221 kvStore: pr.kvStore,
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400222 }
223
224 newPR.Finalize()
225
226 return newPR
227}
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400228
229// Drop takes care of eliminating a revision hash that is no longer needed
230// and its associated config when required
231func (pr *PersistedRevision) Drop(txid string, includeConfig bool) {
232 if pr.kvStore != nil && txid == "" {
233 if includeConfig {
234 log.Debugf("removing rev config - hash: %s", pr.GetConfig().Hash)
235 if err := pr.kvStore.Delete(pr.GetConfig().Hash); err != nil {
236 log.Errorf(
237 "failed to remove rev config - hash: %s, err: %s",
238 pr.GetConfig().Hash,
239 err.Error(),
240 )
241 }
242 }
243
244 log.Debugf("removing rev - hash: %s", pr.GetHash())
245 if err := pr.kvStore.Delete(pr.GetHash()); err != nil {
246 log.Errorf("failed to remove rev - hash: %s, err: %s", pr.GetHash(), err.Error())
247 }
248 } else {
249 if includeConfig {
250 log.Debugf("Attempted to remove revision config:%s linked to transaction:%s", pr.GetConfig().Hash, txid)
251 }
252 log.Debugf("Attempted to remove revision:%s linked to transaction:%s", pr.GetHash(), txid)
253 }
254}