blob: 2ab91b77b35ec328ba5d546776315220c4c3c99f [file] [log] [blame]
khenaidoobf6e7bb2018-08-14 22:27:29 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Stephane Barbariedc5022d2018-11-19 15:21:44 -050016
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040017package model
18
19import (
20 "bytes"
21 "compress/gzip"
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040022 "github.com/golang/protobuf/proto"
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -040023 "github.com/opencord/voltha-go/common/log"
Stephane Barbariee0a4c792019-01-16 11:26:29 -050024 "github.com/opencord/voltha-go/db/kvstore"
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040025 "reflect"
Stephane Barbarie1ab43272018-12-08 21:42:13 -050026 "strings"
Stephane Barbariedc5022d2018-11-19 15:21:44 -050027 "sync"
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040028)
29
Stephane Barbariedc5022d2018-11-19 15:21:44 -050030// PersistedRevision holds information of revision meant to be saved in a persistent storage
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040031type PersistedRevision struct {
Stephane Barbarieec0919b2018-09-05 14:14:29 -040032 Revision
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040033 Compress bool
Stephane Barbariee0a4c792019-01-16 11:26:29 -050034
Kent Hagerman0ab4cb22019-04-24 13:13:35 -040035 events chan *kvstore.Event
36 kvStore *Backend
37 mutex sync.RWMutex
Stephane Barbarie3cb01222019-01-16 17:15:56 -050038 isStored bool
39 isWatched bool
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040040}
41
// watchCache records which watches have already been set up.
type watchCache struct {
	// Cache is keyed by "<key>-<revision hash>" (see SetupWatch); the stored
	// value is an empty struct used purely as a presence marker.
	Cache sync.Map
}

var (
	// watchCacheInstance is the lazily created singleton returned by Watches.
	watchCacheInstance *watchCache
	// watchCacheOne guards the one-time creation of watchCacheInstance.
	watchCacheOne sync.Once
)

// Watches returns the shared watchCache, creating it on the first call.
// Every call returns the same pointer.
func Watches() *watchCache {
	watchCacheOne.Do(func() {
		// The zero value of sync.Map is ready to use.
		watchCacheInstance = new(watchCache)
	})
	return watchCacheInstance
}
55
Stephane Barbariedc5022d2018-11-19 15:21:44 -050056// NewPersistedRevision creates a new instance of a PersistentRevision structure
Stephane Barbarieec0919b2018-09-05 14:14:29 -040057func NewPersistedRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040058 pr := &PersistedRevision{}
Stephane Barbariedc5022d2018-11-19 15:21:44 -050059 pr.kvStore = branch.Node.GetRoot().KvStore
60 pr.Revision = NewNonPersistedRevision(nil, branch, data, children)
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040061 return pr
62}
63
Stephane Barbariedc5022d2018-11-19 15:21:44 -050064// Finalize is responsible of saving the revision in the persistent storage
Stephane Barbarie1ab43272018-12-08 21:42:13 -050065func (pr *PersistedRevision) Finalize(skipOnExist bool) {
66 pr.store(skipOnExist)
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040067}
68
Stephane Barbarie1ab43272018-12-08 21:42:13 -050069func (pr *PersistedRevision) store(skipOnExist bool) {
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -040070 if pr.GetBranch().Txid != "" {
71 return
72 }
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040073
Stephane Barbarie7512fc82019-05-07 12:25:46 -040074 log.Debugw("ready-to-store-revision", log.Fields{"hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetData()})
Stephane Barbarie1ab43272018-12-08 21:42:13 -050075
Stephane Barbarieec0919b2018-09-05 14:14:29 -040076 if blob, err := proto.Marshal(pr.GetConfig().Data.(proto.Message)); err != nil {
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040077 // TODO report error
78 } else {
79 if pr.Compress {
80 var b bytes.Buffer
81 w := gzip.NewWriter(&b)
82 w.Write(blob)
83 w.Close()
84 blob = b.Bytes()
85 }
Stephane Barbariedc5022d2018-11-19 15:21:44 -050086
Stephane Barbarief7fc1782019-03-28 22:33:41 -040087 if err := pr.kvStore.Put(pr.GetName(), blob); err != nil {
Stephane Barbarie7512fc82019-05-07 12:25:46 -040088 log.Warnw("problem-storing-revision", log.Fields{"error": err, "hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data})
Stephane Barbariedc5022d2018-11-19 15:21:44 -050089 } else {
Stephane Barbarie7512fc82019-05-07 12:25:46 -040090 log.Debugw("storing-revision", log.Fields{"hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data})
Stephane Barbarie3cb01222019-01-16 17:15:56 -050091 pr.isStored = true
Stephane Barbariedc5022d2018-11-19 15:21:44 -050092 }
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040093 }
94}
95
Stephane Barbariee0a4c792019-01-16 11:26:29 -050096func (pr *PersistedRevision) SetupWatch(key string) {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -040097 if key == "" {
Stephane Barbarie7512fc82019-05-07 12:25:46 -040098 log.Debugw("ignoring-watch", log.Fields{"key": key, "revision-hash": pr.GetHash()})
Stephane Barbarie40fd3b22019-04-23 21:50:47 -040099 return
100 }
101
102 if _, exists := Watches().Cache.LoadOrStore(key+"-"+pr.GetHash(), struct{}{}); exists {
103 return
104 }
105
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500106 if pr.events == nil {
107 pr.events = make(chan *kvstore.Event)
108
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400109 log.Debugw("setting-watch-channel", log.Fields{"key": key, "revision-hash": pr.GetHash()})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500110
Stephane Barbarief7fc1782019-03-28 22:33:41 -0400111 pr.SetName(key)
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500112 pr.events = pr.kvStore.CreateWatch(key)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400113 }
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500114
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400115 if !pr.isWatched {
Stephane Barbarie3cb01222019-01-16 17:15:56 -0500116 pr.isWatched = true
117
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400118 log.Debugw("setting-watch-routine", log.Fields{"key": key, "revision-hash": pr.GetHash()})
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400119
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500120 // Start watching
121 go pr.startWatching()
122 }
123}
124
125func (pr *PersistedRevision) startWatching() {
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400126 log.Debugw("starting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500127
128StopWatchLoop:
129 for {
Stephane Barbariec92d1072019-06-07 16:21:49 -0400130 latestRev := pr.GetBranch().GetLatest()
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400131
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500132 select {
133 case event, ok := <-pr.events:
134 if !ok {
Stephane Barbariec92d1072019-06-07 16:21:49 -0400135 log.Errorw("event-channel-failure: stopping watch loop", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500136 break StopWatchLoop
137 }
Stephane Barbariec92d1072019-06-07 16:21:49 -0400138 log.Debugw("received-event", log.Fields{"type": event.EventType, "watch": latestRev.GetName()})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500139
140 switch event.EventType {
141 case kvstore.DELETE:
Stephane Barbariec92d1072019-06-07 16:21:49 -0400142 log.Debugw("delete-from-memory", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500143 pr.Revision.Drop("", true)
144 break StopWatchLoop
145
146 case kvstore.PUT:
Stephane Barbariec92d1072019-06-07 16:21:49 -0400147 log.Debugw("update-in-memory", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500148
Stephane Barbariec92d1072019-06-07 16:21:49 -0400149 data := reflect.New(reflect.TypeOf(latestRev.GetData()).Elem())
Stephane Barbariedf5479f2019-01-29 22:13:00 -0500150
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400151 if err := proto.Unmarshal(event.Value.([]byte), data.Interface().(proto.Message)); err != nil {
Stephane Barbariec92d1072019-06-07 16:21:49 -0400152 log.Errorw("failed-to-unmarshal-watch-data", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "error": err})
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400153 } else {
Stephane Barbariec92d1072019-06-07 16:21:49 -0400154 log.Debugw("un-marshaled-watch-data", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "data": data.Interface()})
Stephane Barbarie802aca42019-05-21 12:19:28 -0400155
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400156 var pathLock string
157 var pac *proxyAccessControl
158 var blobs map[string]*kvstore.KVPair
159
160 // The watch reported new persistence data.
161 // Construct an object that will be used to update the memory
162 blobs = make(map[string]*kvstore.KVPair)
163 key, _ := kvstore.ToString(event.Key)
164 blobs[key] = &kvstore.KVPair{
165 Key: key,
166 Value: event.Value,
167 Session: "",
168 Lease: 0,
169 }
170
Stephane Barbariec92d1072019-06-07 16:21:49 -0400171 if latestRev.GetNode().GetProxy() != nil {
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400172 //
173 // If a proxy exists for this revision, use it to lock access to the path
174 // and prevent simultaneous updates to the object in memory
175 //
Stephane Barbariec92d1072019-06-07 16:21:49 -0400176 pathLock, _ = latestRev.GetNode().GetProxy().parseForControlledPath(latestRev.GetNode().GetProxy().getFullPath())
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400177
178 //If the proxy already has a request in progress, then there is no need to process the watch
Stephane Barbariec92d1072019-06-07 16:21:49 -0400179 log.Debugw("checking-if-path-is-locked", log.Fields{"key": latestRev.GetHash(), "pathLock": pathLock})
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400180 if PAC().IsReserved(pathLock) {
181 log.Debugw("operation-in-progress", log.Fields{
Stephane Barbariec92d1072019-06-07 16:21:49 -0400182 "key": latestRev.GetHash(),
183 "path": latestRev.GetNode().GetProxy().getFullPath(),
184 "operation": latestRev.GetNode().GetProxy().Operation.String(),
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400185 })
186
Stephane Barbariec92d1072019-06-07 16:21:49 -0400187 //continue
188
Stephane Barbarie802aca42019-05-21 12:19:28 -0400189 // Identify the operation type and determine if the watch event should be applied or not.
Stephane Barbariec92d1072019-06-07 16:21:49 -0400190 switch latestRev.GetNode().GetProxy().Operation {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400191 case PROXY_REMOVE:
192 fallthrough
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400193
Stephane Barbarie802aca42019-05-21 12:19:28 -0400194 case PROXY_ADD:
195 fallthrough
196
197 case PROXY_UPDATE:
198 // We will need to reload once the operation completes.
199 // Therefore, the data of the current event is most likely out-dated
200 // and should be ignored
201 log.Debugw("ignore-watch-event", log.Fields{
Stephane Barbariec92d1072019-06-07 16:21:49 -0400202 "key": latestRev.GetHash(),
203 "path": latestRev.GetNode().GetProxy().getFullPath(),
204 "operation": latestRev.GetNode().GetProxy().Operation.String(),
Stephane Barbarie802aca42019-05-21 12:19:28 -0400205 })
206
207 continue
208
209 case PROXY_CREATE:
210 fallthrough
211
212 case PROXY_LIST:
213 fallthrough
214
215 case PROXY_GET:
216 fallthrough
217
Stephane Barbariec92d1072019-06-07 16:21:49 -0400218 case PROXY_WATCH:
219 fallthrough
220
Stephane Barbarie802aca42019-05-21 12:19:28 -0400221 default:
222 log.Debugw("process-watch-event", log.Fields{
Stephane Barbariec92d1072019-06-07 16:21:49 -0400223 "key": latestRev.GetHash(),
224 "path": latestRev.GetNode().GetProxy().getFullPath(),
225 "operation": latestRev.GetNode().GetProxy().Operation.String(),
Stephane Barbarie802aca42019-05-21 12:19:28 -0400226 })
227 }
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400228 }
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400229
230 // Reserve the path to prevent others to modify while we reload from persistence
Stephane Barbariec92d1072019-06-07 16:21:49 -0400231 log.Debugw("reserve-and-lock-path", log.Fields{"key": latestRev.GetHash(), "path": pathLock})
232 pac = PAC().ReservePath(latestRev.GetNode().GetProxy().getFullPath(),
233 latestRev.GetNode().GetProxy(), pathLock)
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400234 pac.lock()
Stephane Barbariec92d1072019-06-07 16:21:49 -0400235 latestRev.GetNode().GetProxy().Operation = PROXY_WATCH
236 pac.SetProxy(latestRev.GetNode().GetProxy())
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400237
238 // Load changes and apply to memory
Stephane Barbariec92d1072019-06-07 16:21:49 -0400239 latestRev.LoadFromPersistence(latestRev.GetName(), "", blobs)
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400240
Stephane Barbariec92d1072019-06-07 16:21:49 -0400241 log.Debugw("release-and-unlock-path", log.Fields{"key": latestRev.GetHash(), "path": pathLock})
Stephane Barbarie802aca42019-05-21 12:19:28 -0400242 pac.getProxy().Operation = PROXY_GET
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400243 pac.unlock()
244 PAC().ReleasePath(pathLock)
245
246 } else {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400247 // This block should be reached only if coming from a non-proxied request
Stephane Barbariec92d1072019-06-07 16:21:49 -0400248 log.Debugw("revision-with-no-proxy", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400249
250 // Load changes and apply to memory
Stephane Barbariec92d1072019-06-07 16:21:49 -0400251 latestRev.LoadFromPersistence(latestRev.GetName(), "", blobs)
Stephane Barbariedf5479f2019-01-29 22:13:00 -0500252 }
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500253 }
254
255 default:
Stephane Barbariec92d1072019-06-07 16:21:49 -0400256 log.Debugw("unhandled-event", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "type": event.EventType})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500257 }
258 }
259 }
260
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400261 Watches().Cache.Delete(pr.GetName() + "-" + pr.GetHash())
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500262
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400263 log.Debugw("exiting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400264}
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400265
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500266// UpdateData modifies the information in the data model and saves it in the persistent storage
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400267func (pr *PersistedRevision) UpdateData(data interface{}, branch *Branch) Revision {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500268 log.Debugw("updating-persisted-data", log.Fields{"hash": pr.GetHash()})
269
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400270 newNPR := pr.Revision.UpdateData(data, branch)
271
272 newPR := &PersistedRevision{
Stephane Barbariec92d1072019-06-07 16:21:49 -0400273 Revision: newNPR,
274 Compress: pr.Compress,
275 kvStore: pr.kvStore,
276 events: pr.events,
277 isWatched: pr.isWatched,
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400278 }
279
280 if newPR.GetHash() != pr.GetHash() {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400281 newPR.isStored = false
282 pr.Drop(branch.Txid, false)
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400283 pr.Drop(branch.Txid, false)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400284 } else {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400285 newPR.isStored = true
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400286 }
287
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400288 return newPR
289}
290
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500291// UpdateChildren modifies the children of a revision and of a specific component and saves it in the persistent storage
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400292func (pr *PersistedRevision) UpdateChildren(name string, children []Revision,
293 branch *Branch) Revision {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500294 log.Debugw("updating-persisted-children", log.Fields{"hash": pr.GetHash()})
295
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400296 newNPR := pr.Revision.UpdateChildren(name, children, branch)
297
298 newPR := &PersistedRevision{
Stephane Barbariec92d1072019-06-07 16:21:49 -0400299 Revision: newNPR,
300 Compress: pr.Compress,
301 kvStore: pr.kvStore,
302 events: pr.events,
303 isWatched: pr.isWatched,
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400304 }
305
306 if newPR.GetHash() != pr.GetHash() {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400307 newPR.isStored = false
308 pr.Drop(branch.Txid, false)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400309 } else {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400310 newPR.isStored = true
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400311 }
312
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400313 return newPR
314}
315
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500316// UpdateAllChildren modifies the children for all components of a revision and saves it in the peristent storage
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400317func (pr *PersistedRevision) UpdateAllChildren(children map[string][]Revision, branch *Branch) Revision {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500318 log.Debugw("updating-all-persisted-children", log.Fields{"hash": pr.GetHash()})
319
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400320 newNPR := pr.Revision.UpdateAllChildren(children, branch)
321
322 newPR := &PersistedRevision{
Stephane Barbariec92d1072019-06-07 16:21:49 -0400323 Revision: newNPR,
324 Compress: pr.Compress,
325 kvStore: pr.kvStore,
326 events: pr.events,
327 isWatched: pr.isWatched,
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400328 }
329
330 if newPR.GetHash() != pr.GetHash() {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400331 newPR.isStored = false
332 pr.Drop(branch.Txid, false)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400333 } else {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400334 newPR.isStored = true
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400335 }
336
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400337 return newPR
338}
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400339
340// Drop takes care of eliminating a revision hash that is no longer needed
341// and its associated config when required
342func (pr *PersistedRevision) Drop(txid string, includeConfig bool) {
Stephane Barbarief7fc1782019-03-28 22:33:41 -0400343 pr.Revision.Drop(txid, includeConfig)
344}
345
346// Drop takes care of eliminating a revision hash that is no longer needed
347// and its associated config when required
348func (pr *PersistedRevision) StorageDrop(txid string, includeConfig bool) {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500349 log.Debugw("dropping-revision",
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400350 log.Fields{"txid": txid, "hash": pr.GetHash(), "config-hash": pr.GetConfig().Hash})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500351
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500352 pr.mutex.Lock()
353 defer pr.mutex.Unlock()
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400354 if pr.kvStore != nil && txid == "" {
Stephane Barbarie3cb01222019-01-16 17:15:56 -0500355 if pr.isStored {
Stephane Barbarief7fc1782019-03-28 22:33:41 -0400356 if pr.isWatched {
357 pr.kvStore.DeleteWatch(pr.GetName(), pr.events)
358 pr.isWatched = false
Stephane Barbarie3cb01222019-01-16 17:15:56 -0500359 }
360
Stephane Barbarief7fc1782019-03-28 22:33:41 -0400361 if err := pr.kvStore.Delete(pr.GetName()); err != nil {
Stephane Barbarie3cb01222019-01-16 17:15:56 -0500362 log.Errorw("failed-to-remove-revision", log.Fields{"hash": pr.GetHash(), "error": err.Error()})
363 } else {
364 pr.isStored = false
365 }
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400366 }
367
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400368 } else {
369 if includeConfig {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500370 log.Debugw("attempted-to-remove-transacted-revision-config", log.Fields{"hash": pr.GetConfig().Hash, "txid": txid})
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400371 }
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500372 log.Debugw("attempted-to-remove-transacted-revision", log.Fields{"hash": pr.GetHash(), "txid": txid})
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400373 }
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500374
375 pr.Revision.Drop(txid, includeConfig)
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400376}
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400377
378// verifyPersistedEntry validates if the provided data is available or not in memory and applies updates as required
379func (pr *PersistedRevision) verifyPersistedEntry(data interface{}, typeName string, keyName string, keyValue string, txid string) (response Revision) {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400380 // Parent which holds the current node entry
381 parent := pr.GetBranch().Node.Root
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400382
Stephane Barbarie802aca42019-05-21 12:19:28 -0400383 // Get a copy of the parent's children
384 children := make([]Revision, len(parent.GetBranch(NONE).Latest.GetChildren(typeName)))
385 copy(children, parent.GetBranch(NONE).Latest.GetChildren(typeName))
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400386
Stephane Barbarie802aca42019-05-21 12:19:28 -0400387 // Verify if a child with the provided key value can be found
388 if childIdx, childRev := pr.GetNode().findRevByKey(children, keyName, keyValue); childRev != nil {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400389 // A child matching the provided key exists in memory
Stephane Barbarie802aca42019-05-21 12:19:28 -0400390 // Verify if the data differs from what was retrieved from persistence
Stephane Barbariec92d1072019-06-07 16:21:49 -0400391 // Also check if we are treating a newer revision of the data or not
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400392 if childRev.GetData().(proto.Message).String() != data.(proto.Message).String() {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400393 log.Debugw("revision-data-is-different", log.Fields{
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400394 "key": childRev.GetHash(),
395 "name": childRev.GetName(),
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400396 "data": childRev.GetData(),
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400397 })
398
Stephane Barbarie802aca42019-05-21 12:19:28 -0400399 //
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400400 // Data has changed; replace the child entry and update the parent revision
Stephane Barbarie802aca42019-05-21 12:19:28 -0400401 //
402
403 // BEGIN Lock child -- prevent any incoming changes
404 childRev.GetBranch().LatestLock.Lock()
405
406 // Update child
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400407 updatedChildRev := childRev.UpdateData(data, childRev.GetBranch())
Stephane Barbarie802aca42019-05-21 12:19:28 -0400408
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400409 updatedChildRev.GetNode().SetProxy(childRev.GetNode().GetProxy())
410 updatedChildRev.SetupWatch(updatedChildRev.GetName())
Stephane Barbarie802aca42019-05-21 12:19:28 -0400411 updatedChildRev.SetLastUpdate()
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400412
Stephane Barbarie802aca42019-05-21 12:19:28 -0400413 // Update cache
414 GetRevCache().Cache.Store(updatedChildRev.GetName(), updatedChildRev)
Stephane Barbariec92d1072019-06-07 16:21:49 -0400415 childRev.Drop(txid, false)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400416
Stephane Barbarie802aca42019-05-21 12:19:28 -0400417 childRev.GetBranch().LatestLock.Unlock()
418 // END lock child
419
420 // Update child entry
421 children[childIdx] = updatedChildRev
422
423 // BEGIN lock parent -- Update parent
424 parent.GetBranch(NONE).LatestLock.Lock()
425
426 updatedRev := parent.GetBranch(NONE).Latest.UpdateChildren(typeName, children, parent.GetBranch(NONE))
427 parent.GetBranch(NONE).Node.makeLatest(parent.GetBranch(NONE), updatedRev, nil)
428
429 parent.GetBranch(NONE).LatestLock.Unlock()
430 // END lock parent
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400431
432 // Drop the previous child revision
Stephane Barbarie802aca42019-05-21 12:19:28 -0400433 parent.GetBranch(NONE).Latest.ChildDrop(typeName, childRev.GetHash())
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400434
435 if updatedChildRev != nil {
436 log.Debugw("verify-persisted-entry--adding-child", log.Fields{
437 "key": updatedChildRev.GetHash(),
438 "name": updatedChildRev.GetName(),
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400439 "data": updatedChildRev.GetData(),
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400440 })
441 response = updatedChildRev
442 }
443 } else {
444 // Data is the same. Continue to the next entry
Stephane Barbarie802aca42019-05-21 12:19:28 -0400445 log.Debugw("same-revision-data", log.Fields{
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400446 "key": childRev.GetHash(),
447 "name": childRev.GetName(),
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400448 "data": childRev.GetData(),
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400449 })
450 if childRev != nil {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400451 log.Debugw("keeping-same-revision-data", log.Fields{
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400452 "key": childRev.GetHash(),
453 "name": childRev.GetName(),
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400454 "data": childRev.GetData(),
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400455 })
Stephane Barbarie802aca42019-05-21 12:19:28 -0400456
457 // Update timestamp to reflect when it was last read and to reset tracked timeout
458 childRev.SetLastUpdate()
459 GetRevCache().Cache.Store(childRev.GetName(), childRev)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400460 response = childRev
461 }
462 }
Stephane Barbariec92d1072019-06-07 16:21:49 -0400463
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400464 } else {
465 // There is no available child with that key value.
466 // Create a new child and update the parent revision.
Stephane Barbarie802aca42019-05-21 12:19:28 -0400467 log.Debugw("no-such-revision-entry", log.Fields{
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400468 "key": keyValue,
469 "name": typeName,
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400470 "data": data,
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400471 })
472
Stephane Barbarie802aca42019-05-21 12:19:28 -0400473 // BEGIN child lock
474 pr.GetBranch().LatestLock.Lock()
475
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400476 // Construct a new child node with the retrieved persistence data
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400477 childRev = pr.GetBranch().Node.MakeNode(data, txid).Latest(txid)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400478
479 // We need to start watching this entry for future changes
480 childRev.SetName(typeName + "/" + keyValue)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400481 childRev.SetupWatch(childRev.GetName())
482
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400483 pr.GetBranch().LatestLock.Unlock()
Stephane Barbarie802aca42019-05-21 12:19:28 -0400484 // END child lock
485
486 //
487 // Add the child to the parent revision
488 //
489
490 // BEGIN parent lock
491 parent.GetBranch(NONE).LatestLock.Lock()
492 children = append(children, childRev)
493 updatedRev := parent.GetBranch(NONE).Latest.UpdateChildren(typeName, children, parent.GetBranch(NONE))
494 updatedRev.GetNode().SetProxy(parent.GetBranch(NONE).Node.GetProxy())
Stephane Barbarie802aca42019-05-21 12:19:28 -0400495 parent.GetBranch(NONE).Node.makeLatest(parent.GetBranch(NONE), updatedRev, nil)
496 parent.GetBranch(NONE).LatestLock.Unlock()
497 // END parent lock
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400498
499 // Child entry is valid and can be included in the response object
500 if childRev != nil {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400501 log.Debugw("adding-revision-to-response", log.Fields{
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400502 "key": childRev.GetHash(),
503 "name": childRev.GetName(),
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400504 "data": childRev.GetData(),
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400505 })
506 response = childRev
507 }
508 }
509
510 return response
511}
512
513// LoadFromPersistence retrieves data from kv store at the specified location and refreshes the memory
514// by adding missing entries, updating changed entries and ignoring unchanged ones
Stephane Barbariec92d1072019-06-07 16:21:49 -0400515func (pr *PersistedRevision) LoadFromPersistence(path string, txid string, blobs map[string]*kvstore.KVPair) []Revision {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400516 pr.mutex.Lock()
517 defer pr.mutex.Unlock()
518
519 log.Debugw("loading-from-persistence", log.Fields{"path": path, "txid": txid})
520
521 var response []Revision
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400522
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400523 for strings.HasPrefix(path, "/") {
524 path = path[1:]
525 }
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400526
527 if pr.kvStore != nil && path != "" {
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400528 if blobs == nil || len(blobs) == 0 {
529 log.Debugw("retrieve-from-kv", log.Fields{"path": path, "txid": txid})
530 blobs, _ = pr.kvStore.List(path)
531 }
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400532
533 partition := strings.SplitN(path, "/", 2)
534 name := partition[0]
535
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400536 var nodeType interface{}
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400537 if len(partition) < 2 {
538 path = ""
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400539 nodeType = pr.GetBranch().Node.Type
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400540 } else {
541 path = partition[1]
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400542 nodeType = pr.GetBranch().Node.Root.Type
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400543 }
544
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400545 field := ChildrenFields(nodeType)[name]
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400546
547 if field != nil && field.IsContainer {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400548 log.Debugw("parsing-data-blobs", log.Fields{
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400549 "path": path,
550 "name": name,
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400551 "size": len(blobs),
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400552 })
553
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400554 for _, blob := range blobs {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400555 output := blob.Value.([]byte)
556
557 data := reflect.New(field.ClassType.Elem())
558
559 if err := proto.Unmarshal(output, data.Interface().(proto.Message)); err != nil {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400560 log.Errorw("failed-to-unmarshal", log.Fields{
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400561 "path": path,
562 "txid": txid,
563 "error": err,
564 })
565 } else if path == "" {
566 if field.Key != "" {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400567 log.Debugw("no-path-with-container-key", log.Fields{
568 "path": path,
569 "txid": txid,
570 "data": data.Interface(),
571 })
572
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400573 // Retrieve the key identifier value from the data structure
574 // based on the field's key attribute
575 _, key := GetAttributeValue(data.Interface(), field.Key, 0)
576
Stephane Barbariec92d1072019-06-07 16:21:49 -0400577 if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, key.String(), txid); entry != nil {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400578 response = append(response, entry)
579 }
Stephane Barbarie802aca42019-05-21 12:19:28 -0400580 } else {
581 log.Debugw("path-with-no-container-key", log.Fields{
582 "path": path,
583 "txid": txid,
584 "data": data.Interface(),
585 })
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400586 }
587
588 } else if field.Key != "" {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400589 log.Debugw("path-with-container-key", log.Fields{
590 "path": path,
591 "txid": txid,
592 "data": data.Interface(),
593 })
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400594 // The request is for a specific entry/id
595 partition := strings.SplitN(path, "/", 2)
596 key := partition[0]
597 if len(partition) < 2 {
598 path = ""
599 } else {
600 path = partition[1]
601 }
602 keyValue := field.KeyFromStr(key)
603
Stephane Barbariec92d1072019-06-07 16:21:49 -0400604 if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, keyValue.(string), txid); entry != nil {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400605 response = append(response, entry)
606 }
607 }
608 }
609
Stephane Barbarie802aca42019-05-21 12:19:28 -0400610 log.Debugw("no-more-data-blobs", log.Fields{"path": path, "name": name})
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400611 } else {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400612 log.Debugw("cannot-process-field", log.Fields{
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400613 "type": pr.GetBranch().Node.Type,
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400614 "name": name,
615 })
616 }
617 }
618
619 return response
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400620}