blob: fa35eca8614ad9f2e86ade4265622ed4e854180c [file] [log] [blame]
khenaidoobf6e7bb2018-08-14 22:27:29 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Stephane Barbariedc5022d2018-11-19 15:21:44 -050016
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040017package model
18
19import (
20 "bytes"
21 "compress/gzip"
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040022 "github.com/golang/protobuf/proto"
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -040023 "github.com/opencord/voltha-go/common/log"
Stephane Barbariee0a4c792019-01-16 11:26:29 -050024 "github.com/opencord/voltha-go/db/kvstore"
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040025 "reflect"
Stephane Barbariedc5022d2018-11-19 15:21:44 -050026 "runtime/debug"
Stephane Barbarie1ab43272018-12-08 21:42:13 -050027 "strings"
Stephane Barbariedc5022d2018-11-19 15:21:44 -050028 "sync"
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040029)
30
Stephane Barbariedc5022d2018-11-19 15:21:44 -050031// PersistedRevision holds information of revision meant to be saved in a persistent storage
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040032type PersistedRevision struct {
Stephane Barbarieec0919b2018-09-05 14:14:29 -040033 Revision
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040034 Compress bool
Stephane Barbariee0a4c792019-01-16 11:26:29 -050035
Stephane Barbarie3cb01222019-01-16 17:15:56 -050036 events chan *kvstore.Event `json:"-"`
37 kvStore *Backend `json:"-"`
38 mutex sync.RWMutex `json:"-"`
39 isStored bool
40 isWatched bool
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040041}
42
Stephane Barbariedc5022d2018-11-19 15:21:44 -050043// NewPersistedRevision creates a new instance of a PersistentRevision structure
Stephane Barbarieec0919b2018-09-05 14:14:29 -040044func NewPersistedRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040045 pr := &PersistedRevision{}
Stephane Barbariedc5022d2018-11-19 15:21:44 -050046 pr.kvStore = branch.Node.GetRoot().KvStore
47 pr.Revision = NewNonPersistedRevision(nil, branch, data, children)
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040048 return pr
49}
50
Stephane Barbariedc5022d2018-11-19 15:21:44 -050051// Finalize is responsible of saving the revision in the persistent storage
Stephane Barbarie1ab43272018-12-08 21:42:13 -050052func (pr *PersistedRevision) Finalize(skipOnExist bool) {
53 pr.store(skipOnExist)
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040054}
55
// revData pairs a set of child identifiers, grouped by child field name, with
// a config identifier.
// NOTE(review): nothing in this file references revData - confirm whether it
// is still needed.
type revData struct {
	// Children maps a child field name to a list of strings.
	Children map[string][]string
	// Config is a single string value.
	Config string
}
60
Stephane Barbarie1ab43272018-12-08 21:42:13 -050061func (pr *PersistedRevision) store(skipOnExist bool) {
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -040062 if pr.GetBranch().Txid != "" {
63 return
64 }
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040065
Stephane Barbarie1ab43272018-12-08 21:42:13 -050066 if pair, _ := pr.kvStore.Get(pr.GetHash()); pair != nil && skipOnExist {
Stephane Barbariee0a4c792019-01-16 11:26:29 -050067 log.Debugw("revision-config-already-exists", log.Fields{"hash": pr.GetHash()})
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040068 return
69 }
Stephane Barbarie1ab43272018-12-08 21:42:13 -050070
Stephane Barbarieec0919b2018-09-05 14:14:29 -040071 if blob, err := proto.Marshal(pr.GetConfig().Data.(proto.Message)); err != nil {
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040072 // TODO report error
73 } else {
74 if pr.Compress {
75 var b bytes.Buffer
76 w := gzip.NewWriter(&b)
77 w.Write(blob)
78 w.Close()
79 blob = b.Bytes()
80 }
Stephane Barbariedc5022d2018-11-19 15:21:44 -050081
Stephane Barbarie1ab43272018-12-08 21:42:13 -050082 if err := pr.kvStore.Put(pr.GetHash(), blob); err != nil {
Stephane Barbariee0a4c792019-01-16 11:26:29 -050083 log.Warnw("problem-storing-revision-config",
84 log.Fields{"error": err, "hash": pr.GetHash(), "data": pr.GetConfig().Data})
Stephane Barbariedc5022d2018-11-19 15:21:44 -050085 } else {
Stephane Barbariee0a4c792019-01-16 11:26:29 -050086 log.Debugw("storing-revision-config",
87 log.Fields{"hash": pr.GetHash(), "data": pr.GetConfig().Data})
Stephane Barbarie3cb01222019-01-16 17:15:56 -050088 pr.isStored = true
Stephane Barbariedc5022d2018-11-19 15:21:44 -050089 }
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040090 }
91}
92
Stephane Barbariee0a4c792019-01-16 11:26:29 -050093func (pr *PersistedRevision) SetupWatch(key string) {
94 if pr.events == nil {
95 pr.events = make(chan *kvstore.Event)
96
97 log.Debugw("setting-watch", log.Fields{"key": key})
98
99 pr.events = pr.kvStore.CreateWatch(key)
100
Stephane Barbarie3cb01222019-01-16 17:15:56 -0500101 pr.isWatched = true
102
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500103 // Start watching
104 go pr.startWatching()
105 }
106}
107
108func (pr *PersistedRevision) startWatching() {
109 log.Debugw("starting-watch", log.Fields{"key": pr.GetHash()})
110
111StopWatchLoop:
112 for {
113 select {
114 case event, ok := <-pr.events:
115 if !ok {
116 log.Errorw("event-channel-failure: stopping watch loop", log.Fields{"key": pr.GetHash()})
117 break StopWatchLoop
118 }
119
120 log.Debugw("received-event", log.Fields{"type": event.EventType})
121
122 switch event.EventType {
123 case kvstore.DELETE:
124 log.Debugw("delete-from-memory", log.Fields{"key": pr.GetHash()})
125 pr.Revision.Drop("", true)
126 break StopWatchLoop
127
128 case kvstore.PUT:
129 log.Debugw("update-in-memory", log.Fields{"key": pr.GetHash()})
130
131 if dataPair, err := pr.kvStore.Get(pr.GetHash()); err != nil || dataPair == nil {
132 log.Errorw("update-in-memory--key-retrieval-failed", log.Fields{"key": pr.GetHash(), "error": err})
133 } else {
Stephane Barbariedf5479f2019-01-29 22:13:00 -0500134 data := reflect.New(reflect.TypeOf(pr.GetData()).Elem())
135
136 if err := proto.Unmarshal(dataPair.Value.([]byte), data.Interface().(proto.Message)); err != nil {
137 log.Errorw("update-in-memory--unmarshal-failed", log.Fields{"key": pr.GetHash(), "error": err})
138 } else {
Stephane Barbarie0a97e9b2019-02-11 22:02:17 -0500139 // Apply changes to current revision
140 branch := pr.GetBranch()
141 rev := branch.GetLatest()
142 updatedRev := rev.UpdateData(data.Interface(), branch)
143
Stephane Barbarieb0c79892019-02-13 11:29:59 -0500144 // ensure that we keep the previous hash value
145 updatedRev.SetHash(rev.GetHash())
146
147 // Save revision
148 branch.SetLatest(updatedRev)
Stephane Barbariedf5479f2019-01-29 22:13:00 -0500149 }
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500150 }
151
152 default:
153 log.Debugw("unhandled-event", log.Fields{"key": pr.GetHash(), "type": event.EventType})
154 }
155 }
156 }
157
158 log.Debugw("exiting-watch", log.Fields{"key": pr.GetHash()})
159}
160
Stephane Barbarie1ab43272018-12-08 21:42:13 -0500161func (pr *PersistedRevision) LoadFromPersistence(path string, txid string) []Revision {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500162 log.Debugw("loading-from-persistence", log.Fields{"path": path, "txid": txid})
163
Stephane Barbarie1ab43272018-12-08 21:42:13 -0500164 var response []Revision
165 var rev Revision
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400166
Stephane Barbarie1ab43272018-12-08 21:42:13 -0500167 rev = pr
168
169 if pr.kvStore != nil {
170 blobMap, _ := pr.kvStore.List(path)
171
172 partition := strings.SplitN(path, "/", 2)
173 name := partition[0]
174
175 if len(partition) < 2 {
176 path = ""
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400177 } else {
Stephane Barbarie1ab43272018-12-08 21:42:13 -0500178 path = partition[1]
179 }
180
181 field := ChildrenFields(rev.GetBranch().Node.Type)[name]
182
183 if field.IsContainer {
Stephane Barbarieaa467942019-02-06 14:09:44 -0500184 var children []Revision
185 children = make([]Revision, len(rev.GetChildren(name)))
186 copy(children, rev.GetChildren(name))
187 existChildMap := make(map[string]int)
188 for i, child := range rev.GetChildren(name) {
189 existChildMap[child.GetHash()] = i
190 }
191
Stephane Barbarie1ab43272018-12-08 21:42:13 -0500192 for _, blob := range blobMap {
193 output := blob.Value.([]byte)
194
195 data := reflect.New(field.ClassType.Elem())
196
197 if err := proto.Unmarshal(output, data.Interface().(proto.Message)); err != nil {
Stephane Barbarieaa467942019-02-06 14:09:44 -0500198 log.Errorw(
199 "loading-from-persistence--failed-to-unmarshal",
200 log.Fields{"path": path, "txid": txid, "error": err},
201 )
Stephane Barbarie11b88e72019-02-07 12:28:29 -0500202 } else if field.Key != "" {
203 var key reflect.Value
204 var keyValue interface{}
205 var keyStr string
206
Stephane Barbarie1ab43272018-12-08 21:42:13 -0500207 if path == "" {
Stephane Barbarie11b88e72019-02-07 12:28:29 -0500208 // e.g. /logical_devices --> path="" name=logical_devices key=""
209 _, key = GetAttributeValue(data.Interface(), field.Key, 0)
210 keyStr = key.String()
Stephane Barbarie1ab43272018-12-08 21:42:13 -0500211
Stephane Barbarie11b88e72019-02-07 12:28:29 -0500212 } else {
213 // e.g.
214 // /logical_devices/abcde --> path="abcde" name=logical_devices
215 // /logical_devices/abcde/image_downloads --> path="abcde/image_downloads" name=logical_devices
Stephane Barbarie1ab43272018-12-08 21:42:13 -0500216
217 partition := strings.SplitN(path, "/", 2)
218 key := partition[0]
219 if len(partition) < 2 {
220 path = ""
221 } else {
222 path = partition[1]
223 }
Stephane Barbarie11b88e72019-02-07 12:28:29 -0500224 keyValue = field.KeyFromStr(key)
225 keyStr = keyValue.(string)
Stephane Barbarie1ab43272018-12-08 21:42:13 -0500226
Stephane Barbarie11b88e72019-02-07 12:28:29 -0500227 if idx, childRev := rev.GetBranch().Node.findRevByKey(children, field.Key, keyValue); childRev != nil {
228 // Key is memory, continue recursing path
229 newChildRev := childRev.LoadFromPersistence(path, txid)
Stephane Barbarie1ab43272018-12-08 21:42:13 -0500230
Stephane Barbarie11b88e72019-02-07 12:28:29 -0500231 children[idx] = newChildRev[0]
Stephane Barbarie1ab43272018-12-08 21:42:13 -0500232
Stephane Barbarie11b88e72019-02-07 12:28:29 -0500233 rev := rev.UpdateChildren(name, rev.GetChildren(name), rev.GetBranch())
234 rev.GetBranch().Node.makeLatest(rev.GetBranch(), rev, nil)
Stephane Barbarie1ab43272018-12-08 21:42:13 -0500235
Stephane Barbarie11b88e72019-02-07 12:28:29 -0500236 response = append(response, newChildRev[0])
237 continue
238 }
Stephane Barbarie1ab43272018-12-08 21:42:13 -0500239 }
Stephane Barbarie11b88e72019-02-07 12:28:29 -0500240
241 childRev := rev.GetBranch().Node.MakeNode(data.Interface(), txid).Latest(txid)
242 childRev.SetHash(name + "/" + keyStr)
243
244 // Do not process a child that is already in memory
245 if _, childExists := existChildMap[childRev.GetHash()]; !childExists {
246 // Create watch for <component>/<key>
247 childRev.SetupWatch(childRev.GetHash())
248
249 children = append(children, childRev)
250 rev = rev.UpdateChildren(name, children, rev.GetBranch())
251
252 rev.GetBranch().Node.makeLatest(rev.GetBranch(), rev, nil)
253 }
254 response = append(response, childRev)
255 continue
Stephane Barbarie1ab43272018-12-08 21:42:13 -0500256 }
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400257 }
258 }
259 }
Stephane Barbarie11b88e72019-02-07 12:28:29 -0500260
Stephane Barbarie1ab43272018-12-08 21:42:13 -0500261 return response
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400262}
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400263
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500264// UpdateData modifies the information in the data model and saves it in the persistent storage
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400265func (pr *PersistedRevision) UpdateData(data interface{}, branch *Branch) Revision {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500266 log.Debugw("updating-persisted-data", log.Fields{"hash": pr.GetHash()})
267
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400268 newNPR := pr.Revision.UpdateData(data, branch)
269
270 newPR := &PersistedRevision{
271 Revision: newNPR,
272 Compress: pr.Compress,
khenaidoob9203542018-09-17 22:56:37 -0400273 kvStore: pr.kvStore,
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400274 }
275
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400276 return newPR
277}
278
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500279// UpdateChildren modifies the children of a revision and of a specific component and saves it in the persistent storage
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400280func (pr *PersistedRevision) UpdateChildren(name string, children []Revision, branch *Branch) Revision {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500281 log.Debugw("updating-persisted-children", log.Fields{"hash": pr.GetHash()})
282
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400283 newNPR := pr.Revision.UpdateChildren(name, children, branch)
284
285 newPR := &PersistedRevision{
286 Revision: newNPR,
287 Compress: pr.Compress,
khenaidoob9203542018-09-17 22:56:37 -0400288 kvStore: pr.kvStore,
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400289 }
290
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400291 return newPR
292}
293
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500294// UpdateAllChildren modifies the children for all components of a revision and saves it in the peristent storage
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400295func (pr *PersistedRevision) UpdateAllChildren(children map[string][]Revision, branch *Branch) Revision {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500296 log.Debugw("updating-all-persisted-children", log.Fields{"hash": pr.GetHash()})
297
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400298 newNPR := pr.Revision.UpdateAllChildren(children, branch)
299
300 newPR := &PersistedRevision{
301 Revision: newNPR,
302 Compress: pr.Compress,
khenaidoob9203542018-09-17 22:56:37 -0400303 kvStore: pr.kvStore,
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400304 }
305
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400306 return newPR
307}
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400308
309// Drop takes care of eliminating a revision hash that is no longer needed
310// and its associated config when required
311func (pr *PersistedRevision) Drop(txid string, includeConfig bool) {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500312 log.Debugw("dropping-revision",
313 log.Fields{"txid": txid, "hash": pr.GetHash(), "config-hash": pr.GetConfig().Hash, "stack": string(debug.Stack())})
314
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500315 pr.mutex.Lock()
316 defer pr.mutex.Unlock()
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400317 if pr.kvStore != nil && txid == "" {
Stephane Barbarie3cb01222019-01-16 17:15:56 -0500318 if pr.isStored {
319 if includeConfig {
320 if err := pr.kvStore.Delete(pr.GetConfig().Hash); err != nil {
321 log.Errorw("failed-to-remove-revision-config", log.Fields{"hash": pr.GetConfig().Hash, "error": err.Error()})
322 }
323 }
324
325 if err := pr.kvStore.Delete(pr.GetHash()); err != nil {
326 log.Errorw("failed-to-remove-revision", log.Fields{"hash": pr.GetHash(), "error": err.Error()})
327 } else {
328 pr.isStored = false
329 }
330
331 if pr.isWatched {
332 pr.kvStore.DeleteWatch(pr.GetHash(), pr.events)
333 pr.isWatched = false
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400334 }
335 }
336
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400337 } else {
338 if includeConfig {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500339 log.Debugw("attempted-to-remove-transacted-revision-config", log.Fields{"hash": pr.GetConfig().Hash, "txid": txid})
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400340 }
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500341 log.Debugw("attempted-to-remove-transacted-revision", log.Fields{"hash": pr.GetHash(), "txid": txid})
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400342 }
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500343
344 pr.Revision.Drop(txid, includeConfig)
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400345}