blob: 774b77e06bc452c5e66e94b26943ec4ec2cbb5c4 [file] [log] [blame]
khenaidoobf6e7bb2018-08-14 22:27:29 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040016package model
17
18import (
19 "bytes"
20 "compress/gzip"
21 "encoding/json"
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040022 "github.com/golang/protobuf/proto"
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -040023 "github.com/opencord/voltha-go/common/log"
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040024 "io/ioutil"
25 "reflect"
Stephane Barbarieec0919b2018-09-05 14:14:29 -040026 "time"
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040027)
28
29type PersistedRevision struct {
Stephane Barbarieec0919b2018-09-05 14:14:29 -040030 Revision
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040031 Compress bool
32 kvStore *Backend
33}
34
Stephane Barbarieec0919b2018-09-05 14:14:29 -040035func NewPersistedRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040036 pr := &PersistedRevision{}
Stephane Barbarieec0919b2018-09-05 14:14:29 -040037 pr.kvStore = branch.Node.root.KvStore
38 pr.Revision = NewNonPersistedRevision(branch, data, children)
39 pr.Finalize()
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040040 return pr
41}
42
Stephane Barbarieec0919b2018-09-05 14:14:29 -040043func (pr *PersistedRevision) Finalize() {
44 //pr.Revision.Finalize()
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040045 pr.store()
46}
47
48type revData struct {
49 Children map[string][]string
50 Config string
51}
52
53func (pr *PersistedRevision) store() {
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -040054 if pr.GetBranch().Txid != "" {
55 return
56 }
Stephane Barbarieec0919b2018-09-05 14:14:29 -040057 if ok, _ := pr.kvStore.Get(pr.Revision.GetHash()); ok != nil {
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040058 return
59 }
60
61 pr.storeConfig()
62
63 childrenHashes := make(map[string][]string)
Stephane Barbarieec0919b2018-09-05 14:14:29 -040064 for fieldName, children := range pr.GetChildren() {
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040065 hashes := []string{}
66 for _, rev := range children {
Stephane Barbarieec0919b2018-09-05 14:14:29 -040067 hashes = append(hashes, rev.GetHash())
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040068 }
69 childrenHashes[fieldName] = hashes
70 }
71 data := &revData{
72 Children: childrenHashes,
Stephane Barbarieec0919b2018-09-05 14:14:29 -040073 Config: pr.GetConfig().Hash,
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040074 }
75 if blob, err := json.Marshal(data); err != nil {
76 // TODO report error
77 } else {
78 if pr.Compress {
79 var b bytes.Buffer
80 w := gzip.NewWriter(&b)
81 w.Write(blob)
82 w.Close()
83 blob = b.Bytes()
84 }
Stephane Barbarieec0919b2018-09-05 14:14:29 -040085 pr.kvStore.Put(pr.GetHash(), blob)
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040086 }
87}
88
Stephane Barbarieec0919b2018-09-05 14:14:29 -040089func (pr *PersistedRevision) Load(branch *Branch, kvStore *Backend, msgClass interface{}, hash string) Revision {
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040090 blob, _ := kvStore.Get(hash)
Stephane Barbarieec0919b2018-09-05 14:14:29 -040091
92 start := time.Now()
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040093 output := blob.Value.([]byte)
94 var data revData
95 if pr.Compress {
96 b := bytes.NewBuffer(blob.Value.([]byte))
97 if r, err := gzip.NewReader(b); err != nil {
98 // TODO : report error
99 } else {
100 if output, err = ioutil.ReadAll(r); err != nil {
101 // TODO report error
102 }
103 }
104 }
105 if err := json.Unmarshal(output, &data); err != nil {
Stephane Barbarie8c48b5c2018-10-02 09:45:17 -0400106 log.Errorf("problem to unmarshal data - %s", err.Error())
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400107 }
108
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400109 stop := time.Now()
110 GetProfiling().AddToInMemoryModelTime(stop.Sub(start).Seconds())
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400111 configHash := data.Config
112 configData := pr.loadConfig(kvStore, msgClass, configHash)
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400113
114 assembledChildren := make(map[string][]Revision)
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400115
116 childrenHashes := data.Children
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400117 node := branch.Node
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400118 for fieldName, child := range ChildrenFields(msgClass) {
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400119 var children []Revision
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400120 for _, childHash := range childrenHashes[fieldName] {
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400121 childNode := node.MakeNode(reflect.New(child.ClassType).Elem().Interface(), "")
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400122 childNode.LoadLatest(kvStore, childHash)
123 childRev := childNode.Latest()
124 children = append(children, childRev)
125 }
126 assembledChildren[fieldName] = children
127 }
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400128
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400129 rev := NewPersistedRevision(branch, configData, assembledChildren)
130 return rev
131}
132
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400133func (pr *PersistedRevision) assignValue(a, b Revision) Revision {
134 a = b
135 return a
136}
137
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400138func (pr *PersistedRevision) storeConfig() {
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400139 if ok, _ := pr.kvStore.Get(pr.GetConfig().Hash); ok != nil {
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400140 return
141 }
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400142 if blob, err := proto.Marshal(pr.GetConfig().Data.(proto.Message)); err != nil {
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400143 // TODO report error
144 } else {
145 if pr.Compress {
146 var b bytes.Buffer
147 w := gzip.NewWriter(&b)
148 w.Write(blob)
149 w.Close()
150 blob = b.Bytes()
151 }
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400152 pr.kvStore.Put(pr.GetConfig().Hash, blob)
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400153 }
154}
155
156func (pr *PersistedRevision) loadConfig(kvStore *Backend, msgClass interface{}, hash string) interface{} {
157 blob, _ := kvStore.Get(hash)
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400158 start := time.Now()
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400159 output := blob.Value.([]byte)
160
161 if pr.Compress {
162 b := bytes.NewBuffer(blob.Value.([]byte))
163 if r, err := gzip.NewReader(b); err != nil {
164 // TODO : report error
165 } else {
166 if output, err = ioutil.ReadAll(r); err != nil {
167 // TODO report error
168 }
169 }
170 }
171
172 var data reflect.Value
173 if msgClass != nil {
174 data = reflect.New(reflect.TypeOf(msgClass).Elem())
175 if err := proto.Unmarshal(output, data.Interface().(proto.Message)); err != nil {
176 // TODO report error
177 }
178 }
179
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400180 stop := time.Now()
181
182 GetProfiling().AddToInMemoryModelTime(stop.Sub(start).Seconds())
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400183 return data.Interface()
184}
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400185
186func (pr *PersistedRevision) UpdateData(data interface{}, branch *Branch) Revision {
187 newNPR := pr.Revision.UpdateData(data, branch)
188
189 newPR := &PersistedRevision{
190 Revision: newNPR,
191 Compress: pr.Compress,
khenaidoob9203542018-09-17 22:56:37 -0400192 kvStore: pr.kvStore,
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400193 }
194
195 newPR.Finalize()
196
197 return newPR
198}
199
200func (pr *PersistedRevision) UpdateChildren(name string, children []Revision, branch *Branch) Revision {
201 newNPR := pr.Revision.UpdateChildren(name, children, branch)
202
203 newPR := &PersistedRevision{
204 Revision: newNPR,
205 Compress: pr.Compress,
khenaidoob9203542018-09-17 22:56:37 -0400206 kvStore: pr.kvStore,
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400207 }
208
209 newPR.Finalize()
210
211 return newPR
212}
213
214func (pr *PersistedRevision) UpdateAllChildren(children map[string][]Revision, branch *Branch) Revision {
215 newNPR := pr.Revision.UpdateAllChildren(children, branch)
216
217 newPR := &PersistedRevision{
218 Revision: newNPR,
219 Compress: pr.Compress,
khenaidoob9203542018-09-17 22:56:37 -0400220 kvStore: pr.kvStore,
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400221 }
222
223 newPR.Finalize()
224
225 return newPR
226}
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400227
228// Drop takes care of eliminating a revision hash that is no longer needed
229// and its associated config when required
230func (pr *PersistedRevision) Drop(txid string, includeConfig bool) {
231 if pr.kvStore != nil && txid == "" {
232 if includeConfig {
233 log.Debugf("removing rev config - hash: %s", pr.GetConfig().Hash)
234 if err := pr.kvStore.Delete(pr.GetConfig().Hash); err != nil {
235 log.Errorf(
236 "failed to remove rev config - hash: %s, err: %s",
237 pr.GetConfig().Hash,
238 err.Error(),
239 )
240 }
241 }
242
243 log.Debugf("removing rev - hash: %s", pr.GetHash())
244 if err := pr.kvStore.Delete(pr.GetHash()); err != nil {
245 log.Errorf("failed to remove rev - hash: %s, err: %s", pr.GetHash(), err.Error())
246 }
247 } else {
248 if includeConfig {
249 log.Debugf("Attempted to remove revision config:%s linked to transaction:%s", pr.GetConfig().Hash, txid)
250 }
251 log.Debugf("Attempted to remove revision:%s linked to transaction:%s", pr.GetHash(), txid)
252 }
253}