blob: 805557aafb783a5b28588d54547417533c8caf46 [file] [log] [blame]
khenaidoobf6e7bb2018-08-14 22:27:29 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040016package model
17
18import (
19 "bytes"
20 "compress/gzip"
21 "encoding/json"
22 "fmt"
23 "github.com/golang/protobuf/proto"
24 "io/ioutil"
25 "reflect"
Stephane Barbarieec0919b2018-09-05 14:14:29 -040026 "time"
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040027)
28
29type PersistedRevision struct {
Stephane Barbarieec0919b2018-09-05 14:14:29 -040030 Revision
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040031 Compress bool
32 kvStore *Backend
33}
34
Stephane Barbarieec0919b2018-09-05 14:14:29 -040035func NewPersistedRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040036 pr := &PersistedRevision{}
Stephane Barbarieec0919b2018-09-05 14:14:29 -040037 pr.kvStore = branch.Node.root.KvStore
38 pr.Revision = NewNonPersistedRevision(branch, data, children)
39 pr.Finalize()
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040040 return pr
41}
42
Stephane Barbarieec0919b2018-09-05 14:14:29 -040043func (pr *PersistedRevision) Finalize() {
44 //pr.Revision.Finalize()
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040045 pr.store()
46}
47
// revData is the JSON-serialized form of a revision as written to the KV
// store: it holds references (hashes) rather than the data itself.
type revData struct {
	// Children maps a child field name to the hashes of that field's child
	// revisions.
	Children map[string][]string

	// Config is the hash under which the revision's config blob is stored.
	Config string
}
52
53func (pr *PersistedRevision) store() {
Stephane Barbarieec0919b2018-09-05 14:14:29 -040054 if ok, _ := pr.kvStore.Get(pr.Revision.GetHash()); ok != nil {
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040055 return
56 }
57
58 pr.storeConfig()
59
60 childrenHashes := make(map[string][]string)
Stephane Barbarieec0919b2018-09-05 14:14:29 -040061 for fieldName, children := range pr.GetChildren() {
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040062 hashes := []string{}
63 for _, rev := range children {
Stephane Barbarieec0919b2018-09-05 14:14:29 -040064 hashes = append(hashes, rev.GetHash())
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040065 }
66 childrenHashes[fieldName] = hashes
67 }
68 data := &revData{
69 Children: childrenHashes,
Stephane Barbarieec0919b2018-09-05 14:14:29 -040070 Config: pr.GetConfig().Hash,
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040071 }
72 if blob, err := json.Marshal(data); err != nil {
73 // TODO report error
74 } else {
75 if pr.Compress {
76 var b bytes.Buffer
77 w := gzip.NewWriter(&b)
78 w.Write(blob)
79 w.Close()
80 blob = b.Bytes()
81 }
Stephane Barbarieec0919b2018-09-05 14:14:29 -040082 pr.kvStore.Put(pr.GetHash(), blob)
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040083 }
84}
85
Stephane Barbarieec0919b2018-09-05 14:14:29 -040086func (pr *PersistedRevision) Load(branch *Branch, kvStore *Backend, msgClass interface{}, hash string) Revision {
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040087 blob, _ := kvStore.Get(hash)
Stephane Barbarieec0919b2018-09-05 14:14:29 -040088
89 start := time.Now()
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040090 output := blob.Value.([]byte)
91 var data revData
92 if pr.Compress {
93 b := bytes.NewBuffer(blob.Value.([]byte))
94 if r, err := gzip.NewReader(b); err != nil {
95 // TODO : report error
96 } else {
97 if output, err = ioutil.ReadAll(r); err != nil {
98 // TODO report error
99 }
100 }
101 }
102 if err := json.Unmarshal(output, &data); err != nil {
103 fmt.Errorf("problem to unmarshal data - %s", err.Error())
104 }
105
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400106 stop := time.Now()
107 GetProfiling().AddToInMemoryModelTime(stop.Sub(start).Seconds())
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400108 configHash := data.Config
109 configData := pr.loadConfig(kvStore, msgClass, configHash)
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400110
111 assembledChildren := make(map[string][]Revision)
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400112
113 childrenHashes := data.Children
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400114 node := branch.Node
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400115 for fieldName, child := range ChildrenFields(msgClass) {
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400116 var children []Revision
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400117 for _, childHash := range childrenHashes[fieldName] {
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400118 childNode := node.MakeNode(reflect.New(child.ClassType).Elem().Interface(), "")
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400119 childNode.LoadLatest(kvStore, childHash)
120 childRev := childNode.Latest()
121 children = append(children, childRev)
122 }
123 assembledChildren[fieldName] = children
124 }
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400125
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400126 rev := NewPersistedRevision(branch, configData, assembledChildren)
127 return rev
128}
129
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400130func (pr *PersistedRevision) assignValue(a, b Revision) Revision {
131 a = b
132 return a
133}
134
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400135func (pr *PersistedRevision) storeConfig() {
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400136 if ok, _ := pr.kvStore.Get(pr.GetConfig().Hash); ok != nil {
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400137 return
138 }
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400139 if blob, err := proto.Marshal(pr.GetConfig().Data.(proto.Message)); err != nil {
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400140 // TODO report error
141 } else {
142 if pr.Compress {
143 var b bytes.Buffer
144 w := gzip.NewWriter(&b)
145 w.Write(blob)
146 w.Close()
147 blob = b.Bytes()
148 }
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400149 pr.kvStore.Put(pr.GetConfig().Hash, blob)
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400150 }
151}
152
153func (pr *PersistedRevision) loadConfig(kvStore *Backend, msgClass interface{}, hash string) interface{} {
154 blob, _ := kvStore.Get(hash)
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400155 start := time.Now()
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400156 output := blob.Value.([]byte)
157
158 if pr.Compress {
159 b := bytes.NewBuffer(blob.Value.([]byte))
160 if r, err := gzip.NewReader(b); err != nil {
161 // TODO : report error
162 } else {
163 if output, err = ioutil.ReadAll(r); err != nil {
164 // TODO report error
165 }
166 }
167 }
168
169 var data reflect.Value
170 if msgClass != nil {
171 data = reflect.New(reflect.TypeOf(msgClass).Elem())
172 if err := proto.Unmarshal(output, data.Interface().(proto.Message)); err != nil {
173 // TODO report error
174 }
175 }
176
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400177 stop := time.Now()
178
179 GetProfiling().AddToInMemoryModelTime(stop.Sub(start).Seconds())
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400180 return data.Interface()
181}
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400182
183func (pr *PersistedRevision) UpdateData(data interface{}, branch *Branch) Revision {
184 newNPR := pr.Revision.UpdateData(data, branch)
185
186 newPR := &PersistedRevision{
187 Revision: newNPR,
188 Compress: pr.Compress,
189 kvStore: pr.kvStore,
190 }
191
192 newPR.Finalize()
193
194 return newPR
195}
196
197func (pr *PersistedRevision) UpdateChildren(name string, children []Revision, branch *Branch) Revision {
198 newNPR := pr.Revision.UpdateChildren(name, children, branch)
199
200 newPR := &PersistedRevision{
201 Revision: newNPR,
202 Compress: pr.Compress,
203 kvStore: pr.kvStore,
204 }
205
206 newPR.Finalize()
207
208 return newPR
209}
210
211func (pr *PersistedRevision) UpdateAllChildren(children map[string][]Revision, branch *Branch) Revision {
212 newNPR := pr.Revision.UpdateAllChildren(children, branch)
213
214 newPR := &PersistedRevision{
215 Revision: newNPR,
216 Compress: pr.Compress,
217 kvStore: pr.kvStore,
218 }
219
220 newPR.Finalize()
221
222 return newPR
223}