blob: e0fcb1041cfceb9376b5922cbc5ac1ae6ed01f82 [file] [log] [blame]
khenaidoobf6e7bb2018-08-14 22:27:29 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Stephane Barbariedc5022d2018-11-19 15:21:44 -050016
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040017package model
18
19import (
20 "bytes"
21 "compress/gzip"
Stephane Barbarieef6650d2019-07-18 12:15:09 -040022 "context"
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040023 "github.com/golang/protobuf/proto"
Stephane Barbarieef6650d2019-07-18 12:15:09 -040024 "github.com/google/uuid"
sbarbari17d7e222019-11-05 10:02:29 -050025 "github.com/opencord/voltha-lib-go/v2/pkg/db"
Scott Baker807addd2019-10-24 15:16:21 -070026 "github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore"
27 "github.com/opencord/voltha-lib-go/v2/pkg/log"
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040028 "reflect"
Stephane Barbarie1ab43272018-12-08 21:42:13 -050029 "strings"
Stephane Barbariedc5022d2018-11-19 15:21:44 -050030 "sync"
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040031)
32
Stephane Barbariedc5022d2018-11-19 15:21:44 -050033// PersistedRevision holds information of revision meant to be saved in a persistent storage
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040034type PersistedRevision struct {
Stephane Barbarieec0919b2018-09-05 14:14:29 -040035 Revision
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040036 Compress bool
Stephane Barbariee0a4c792019-01-16 11:26:29 -050037
Stephane Barbarieef6650d2019-07-18 12:15:09 -040038 events chan *kvstore.Event
sbarbari17d7e222019-11-05 10:02:29 -050039 kvStore *db.Backend
Stephane Barbarieef6650d2019-07-18 12:15:09 -040040 mutex sync.RWMutex
41 versionMutex sync.RWMutex
42 Version int64
43 isStored bool
44 isWatched bool
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040045}
46
// watchCache records the watches that are currently registered so that the
// same key/revision pair is not watched more than once.
type watchCache struct {
	// Cache holds one entry per registered watch, keyed by "<key>-<revision hash>".
	Cache sync.Map
}
50
51var watchCacheInstance *watchCache
52var watchCacheOne sync.Once
53
54func Watches() *watchCache {
55 watchCacheOne.Do(func() {
56 watchCacheInstance = &watchCache{Cache: sync.Map{}}
57 })
58 return watchCacheInstance
59}
60
Stephane Barbariedc5022d2018-11-19 15:21:44 -050061// NewPersistedRevision creates a new instance of a PersistentRevision structure
Stephane Barbarieec0919b2018-09-05 14:14:29 -040062func NewPersistedRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040063 pr := &PersistedRevision{}
Stephane Barbariedc5022d2018-11-19 15:21:44 -050064 pr.kvStore = branch.Node.GetRoot().KvStore
Stephane Barbarieef6650d2019-07-18 12:15:09 -040065 pr.Version = 1
Stephane Barbariedc5022d2018-11-19 15:21:44 -050066 pr.Revision = NewNonPersistedRevision(nil, branch, data, children)
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040067 return pr
68}
69
Stephane Barbarieef6650d2019-07-18 12:15:09 -040070func (pr *PersistedRevision) getVersion() int64 {
71 pr.versionMutex.RLock()
72 defer pr.versionMutex.RUnlock()
73 return pr.Version
74}
75
76func (pr *PersistedRevision) setVersion(version int64) {
77 pr.versionMutex.Lock()
78 defer pr.versionMutex.Unlock()
79 pr.Version = version
80}
81
Stephane Barbariedc5022d2018-11-19 15:21:44 -050082// Finalize is responsible of saving the revision in the persistent storage
Stephane Barbarie1ab43272018-12-08 21:42:13 -050083func (pr *PersistedRevision) Finalize(skipOnExist bool) {
84 pr.store(skipOnExist)
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040085}
86
Stephane Barbarie1ab43272018-12-08 21:42:13 -050087func (pr *PersistedRevision) store(skipOnExist bool) {
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -040088 if pr.GetBranch().Txid != "" {
89 return
90 }
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040091
Stephane Barbarie7512fc82019-05-07 12:25:46 -040092 log.Debugw("ready-to-store-revision", log.Fields{"hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetData()})
Stephane Barbarie1ab43272018-12-08 21:42:13 -050093
Stephane Barbarieef6650d2019-07-18 12:15:09 -040094 // clone the revision data to avoid any race conditions with processes
95 // accessing the same data
96 cloned := proto.Clone(pr.GetConfig().Data.(proto.Message))
97
98 if blob, err := proto.Marshal(cloned); err != nil {
99 log.Errorw("problem-to-marshal", log.Fields{"error": err, "hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetData()})
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400100 } else {
101 if pr.Compress {
102 var b bytes.Buffer
103 w := gzip.NewWriter(&b)
104 w.Write(blob)
105 w.Close()
106 blob = b.Bytes()
107 }
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500108
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400109 GetRevCache().Set(pr.GetName(), pr)
Stephane Barbarief7fc1782019-03-28 22:33:41 -0400110 if err := pr.kvStore.Put(pr.GetName(), blob); err != nil {
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400111 log.Warnw("problem-storing-revision", log.Fields{"error": err, "hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data})
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500112 } else {
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400113 log.Debugw("storing-revision", log.Fields{"hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data, "version": pr.getVersion()})
Stephane Barbarie3cb01222019-01-16 17:15:56 -0500114 pr.isStored = true
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500115 }
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400116 }
117}
118
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500119func (pr *PersistedRevision) SetupWatch(key string) {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400120 if key == "" {
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400121 log.Debugw("ignoring-watch", log.Fields{"key": key, "revision-hash": pr.GetHash()})
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400122 return
123 }
124
125 if _, exists := Watches().Cache.LoadOrStore(key+"-"+pr.GetHash(), struct{}{}); exists {
126 return
127 }
128
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500129 if pr.events == nil {
130 pr.events = make(chan *kvstore.Event)
131
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400132 log.Debugw("setting-watch-channel", log.Fields{"key": key, "revision-hash": pr.GetHash()})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500133
Stephane Barbarief7fc1782019-03-28 22:33:41 -0400134 pr.SetName(key)
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500135 pr.events = pr.kvStore.CreateWatch(key)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400136 }
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500137
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400138 if !pr.isWatched {
Stephane Barbarie3cb01222019-01-16 17:15:56 -0500139 pr.isWatched = true
140
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400141 log.Debugw("setting-watch-routine", log.Fields{"key": key, "revision-hash": pr.GetHash()})
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400142
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500143 // Start watching
144 go pr.startWatching()
145 }
146}
147
148func (pr *PersistedRevision) startWatching() {
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400149 log.Debugw("starting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500150
151StopWatchLoop:
152 for {
Stephane Barbariec92d1072019-06-07 16:21:49 -0400153 latestRev := pr.GetBranch().GetLatest()
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400154
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500155 select {
156 case event, ok := <-pr.events:
157 if !ok {
Stephane Barbariec92d1072019-06-07 16:21:49 -0400158 log.Errorw("event-channel-failure: stopping watch loop", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500159 break StopWatchLoop
160 }
Stephane Barbariec92d1072019-06-07 16:21:49 -0400161 log.Debugw("received-event", log.Fields{"type": event.EventType, "watch": latestRev.GetName()})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500162
163 switch event.EventType {
164 case kvstore.DELETE:
Stephane Barbariec92d1072019-06-07 16:21:49 -0400165 log.Debugw("delete-from-memory", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
David Bainbridgebdae73c2019-10-23 17:05:41 +0000166
167 // Remove reference from cache
168 GetRevCache().Delete(latestRev.GetName())
169
170 // Remove reference from parent
171 parent := pr.GetBranch().Node.GetRoot()
172 parent.GetBranch(NONE).Latest.ChildDropByName(latestRev.GetName())
173
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500174 break StopWatchLoop
175
176 case kvstore.PUT:
Stephane Barbariec92d1072019-06-07 16:21:49 -0400177 log.Debugw("update-in-memory", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400178 if latestRev.getVersion() >= event.Version {
179 log.Debugw("skipping-matching-or-older-revision", log.Fields{
180 "watch": latestRev.GetName(),
181 "watch-version": event.Version,
182 "latest-version": latestRev.getVersion(),
183 })
184 continue
185 } else {
186 log.Debugw("watch-revision-is-newer", log.Fields{
187 "watch": latestRev.GetName(),
188 "watch-version": event.Version,
189 "latest-version": latestRev.getVersion(),
190 })
191 }
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500192
Stephane Barbariec92d1072019-06-07 16:21:49 -0400193 data := reflect.New(reflect.TypeOf(latestRev.GetData()).Elem())
Stephane Barbariedf5479f2019-01-29 22:13:00 -0500194
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400195 if err := proto.Unmarshal(event.Value.([]byte), data.Interface().(proto.Message)); err != nil {
Stephane Barbariec92d1072019-06-07 16:21:49 -0400196 log.Errorw("failed-to-unmarshal-watch-data", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "error": err})
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400197 } else {
Stephane Barbariec92d1072019-06-07 16:21:49 -0400198 log.Debugw("un-marshaled-watch-data", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "data": data.Interface()})
Stephane Barbarie802aca42019-05-21 12:19:28 -0400199
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400200 var pathLock string
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400201 var blobs map[string]*kvstore.KVPair
202
203 // The watch reported new persistence data.
204 // Construct an object that will be used to update the memory
205 blobs = make(map[string]*kvstore.KVPair)
206 key, _ := kvstore.ToString(event.Key)
207 blobs[key] = &kvstore.KVPair{
208 Key: key,
209 Value: event.Value,
210 Session: "",
211 Lease: 0,
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400212 Version: event.Version,
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400213 }
214
Stephane Barbariec92d1072019-06-07 16:21:49 -0400215 if latestRev.GetNode().GetProxy() != nil {
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400216 //
217 // If a proxy exists for this revision, use it to lock access to the path
218 // and prevent simultaneous updates to the object in memory
219 //
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400220
221 //If the proxy already has a request in progress, then there is no need to process the watch
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400222 if latestRev.GetNode().GetProxy().GetOperation() != PROXY_NONE {
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400223 log.Debugw("operation-in-progress", log.Fields{
Stephane Barbariec92d1072019-06-07 16:21:49 -0400224 "key": latestRev.GetHash(),
225 "path": latestRev.GetNode().GetProxy().getFullPath(),
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400226 "operation": latestRev.GetNode().GetProxy().operation.String(),
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400227 })
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400228 continue
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400229 }
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400230
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400231 pathLock, _ = latestRev.GetNode().GetProxy().parseForControlledPath(latestRev.GetNode().GetProxy().getFullPath())
232
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400233 // Reserve the path to prevent others to modify while we reload from persistence
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400234 latestRev.GetNode().GetProxy().GetRoot().KvStore.Client.Reserve(pathLock+"_", uuid.New().String(), ReservationTTL)
235 latestRev.GetNode().GetProxy().SetOperation(PROXY_WATCH)
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400236
237 // Load changes and apply to memory
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400238 latestRev.LoadFromPersistence(context.Background(), latestRev.GetName(), "", blobs)
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400239
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400240 // Release path
241 latestRev.GetNode().GetProxy().GetRoot().KvStore.Client.ReleaseReservation(pathLock + "_")
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400242
243 } else {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400244 // This block should be reached only if coming from a non-proxied request
Stephane Barbariec92d1072019-06-07 16:21:49 -0400245 log.Debugw("revision-with-no-proxy", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400246
247 // Load changes and apply to memory
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400248 latestRev.LoadFromPersistence(context.Background(), latestRev.GetName(), "", blobs)
Stephane Barbariedf5479f2019-01-29 22:13:00 -0500249 }
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500250 }
251
252 default:
Stephane Barbariec92d1072019-06-07 16:21:49 -0400253 log.Debugw("unhandled-event", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "type": event.EventType})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500254 }
255 }
256 }
257
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400258 Watches().Cache.Delete(pr.GetName() + "-" + pr.GetHash())
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500259
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400260 log.Debugw("exiting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400261}
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400262
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500263// UpdateData modifies the information in the data model and saves it in the persistent storage
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400264func (pr *PersistedRevision) UpdateData(ctx context.Context, data interface{}, branch *Branch) Revision {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500265 log.Debugw("updating-persisted-data", log.Fields{"hash": pr.GetHash()})
266
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400267 newNPR := pr.Revision.UpdateData(ctx, data, branch)
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400268
269 newPR := &PersistedRevision{
Stephane Barbariec92d1072019-06-07 16:21:49 -0400270 Revision: newNPR,
271 Compress: pr.Compress,
272 kvStore: pr.kvStore,
273 events: pr.events,
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400274 Version: pr.getVersion(),
Stephane Barbariec92d1072019-06-07 16:21:49 -0400275 isWatched: pr.isWatched,
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400276 }
277
278 if newPR.GetHash() != pr.GetHash() {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400279 newPR.isStored = false
280 pr.Drop(branch.Txid, false)
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400281 pr.Drop(branch.Txid, false)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400282 } else {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400283 newPR.isStored = true
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400284 }
285
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400286 return newPR
287}
288
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500289// UpdateChildren modifies the children of a revision and of a specific component and saves it in the persistent storage
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400290func (pr *PersistedRevision) UpdateChildren(ctx context.Context, name string, children []Revision, branch *Branch) Revision {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500291 log.Debugw("updating-persisted-children", log.Fields{"hash": pr.GetHash()})
292
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400293 newNPR := pr.Revision.UpdateChildren(ctx, name, children, branch)
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400294
295 newPR := &PersistedRevision{
Stephane Barbariec92d1072019-06-07 16:21:49 -0400296 Revision: newNPR,
297 Compress: pr.Compress,
298 kvStore: pr.kvStore,
299 events: pr.events,
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400300 Version: pr.getVersion(),
Stephane Barbariec92d1072019-06-07 16:21:49 -0400301 isWatched: pr.isWatched,
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400302 }
303
304 if newPR.GetHash() != pr.GetHash() {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400305 newPR.isStored = false
306 pr.Drop(branch.Txid, false)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400307 } else {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400308 newPR.isStored = true
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400309 }
310
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400311 return newPR
312}
313
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500314// UpdateAllChildren modifies the children for all components of a revision and saves it in the peristent storage
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400315func (pr *PersistedRevision) UpdateAllChildren(children map[string][]Revision, branch *Branch) Revision {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500316 log.Debugw("updating-all-persisted-children", log.Fields{"hash": pr.GetHash()})
317
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400318 newNPR := pr.Revision.UpdateAllChildren(children, branch)
319
320 newPR := &PersistedRevision{
Stephane Barbariec92d1072019-06-07 16:21:49 -0400321 Revision: newNPR,
322 Compress: pr.Compress,
323 kvStore: pr.kvStore,
324 events: pr.events,
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400325 Version: pr.getVersion(),
Stephane Barbariec92d1072019-06-07 16:21:49 -0400326 isWatched: pr.isWatched,
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400327 }
328
329 if newPR.GetHash() != pr.GetHash() {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400330 newPR.isStored = false
331 pr.Drop(branch.Txid, false)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400332 } else {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400333 newPR.isStored = true
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400334 }
335
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400336 return newPR
337}
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400338
339// Drop takes care of eliminating a revision hash that is no longer needed
340// and its associated config when required
341func (pr *PersistedRevision) Drop(txid string, includeConfig bool) {
Stephane Barbarief7fc1782019-03-28 22:33:41 -0400342 pr.Revision.Drop(txid, includeConfig)
343}
344
345// Drop takes care of eliminating a revision hash that is no longer needed
346// and its associated config when required
347func (pr *PersistedRevision) StorageDrop(txid string, includeConfig bool) {
khenaidoo49085352020-01-13 19:15:43 -0500348 log.Debugw("dropping-revision", log.Fields{"txid": txid, "hash": pr.GetHash(), "config-hash": pr.GetConfig().Hash, "key": pr.GetName(), "isStored": pr.isStored})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500349
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500350 pr.mutex.Lock()
351 defer pr.mutex.Unlock()
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400352 if pr.kvStore != nil && txid == "" {
khenaidoo49085352020-01-13 19:15:43 -0500353 if pr.isWatched {
354 pr.kvStore.DeleteWatch(pr.GetName(), pr.events)
355 pr.isWatched = false
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400356 }
357
khenaidoo49085352020-01-13 19:15:43 -0500358 if err := pr.kvStore.Delete(pr.GetName()); err != nil {
359 log.Errorw("failed-to-remove-revision", log.Fields{"hash": pr.GetHash(), "error": err.Error()})
360 } else {
361 pr.isStored = false
362 }
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400363 } else {
364 if includeConfig {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500365 log.Debugw("attempted-to-remove-transacted-revision-config", log.Fields{"hash": pr.GetConfig().Hash, "txid": txid})
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400366 }
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500367 log.Debugw("attempted-to-remove-transacted-revision", log.Fields{"hash": pr.GetHash(), "txid": txid})
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400368 }
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500369
370 pr.Revision.Drop(txid, includeConfig)
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400371}
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400372
373// verifyPersistedEntry validates if the provided data is available or not in memory and applies updates as required
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400374func (pr *PersistedRevision) verifyPersistedEntry(ctx context.Context, data interface{}, typeName string, keyName string,
375 keyValue string, txid string, version int64) (response Revision) {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400376 // Parent which holds the current node entry
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400377 parent := pr.GetBranch().Node.GetRoot()
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400378
Stephane Barbarie802aca42019-05-21 12:19:28 -0400379 // Get a copy of the parent's children
380 children := make([]Revision, len(parent.GetBranch(NONE).Latest.GetChildren(typeName)))
381 copy(children, parent.GetBranch(NONE).Latest.GetChildren(typeName))
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400382
Stephane Barbarie802aca42019-05-21 12:19:28 -0400383 // Verify if a child with the provided key value can be found
384 if childIdx, childRev := pr.GetNode().findRevByKey(children, keyName, keyValue); childRev != nil {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400385 // A child matching the provided key exists in memory
Stephane Barbarie802aca42019-05-21 12:19:28 -0400386 // Verify if the data differs from what was retrieved from persistence
Stephane Barbariec92d1072019-06-07 16:21:49 -0400387 // Also check if we are treating a newer revision of the data or not
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400388 if childRev.GetData().(proto.Message).String() != data.(proto.Message).String() && childRev.getVersion() < version {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400389 log.Debugw("revision-data-is-different", log.Fields{
David Bainbridgebdae73c2019-10-23 17:05:41 +0000390 "key": childRev.GetHash(),
391 "name": childRev.GetName(),
392 "data": childRev.GetData(),
393 "in-memory-version": childRev.getVersion(),
394 "persisted-version": version,
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400395 })
396
Stephane Barbarie802aca42019-05-21 12:19:28 -0400397 //
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400398 // Data has changed; replace the child entry and update the parent revision
Stephane Barbarie802aca42019-05-21 12:19:28 -0400399 //
400
401 // BEGIN Lock child -- prevent any incoming changes
402 childRev.GetBranch().LatestLock.Lock()
403
404 // Update child
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400405 updatedChildRev := childRev.UpdateData(ctx, data, childRev.GetBranch())
Stephane Barbarie802aca42019-05-21 12:19:28 -0400406
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400407 updatedChildRev.GetNode().SetProxy(childRev.GetNode().GetProxy())
408 updatedChildRev.SetupWatch(updatedChildRev.GetName())
Stephane Barbarie802aca42019-05-21 12:19:28 -0400409 updatedChildRev.SetLastUpdate()
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400410 updatedChildRev.(*PersistedRevision).setVersion(version)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400411
Stephane Barbarie802aca42019-05-21 12:19:28 -0400412 // Update cache
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400413 GetRevCache().Set(updatedChildRev.GetName(), updatedChildRev)
Stephane Barbariec92d1072019-06-07 16:21:49 -0400414 childRev.Drop(txid, false)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400415
Stephane Barbarie802aca42019-05-21 12:19:28 -0400416 childRev.GetBranch().LatestLock.Unlock()
417 // END lock child
418
419 // Update child entry
420 children[childIdx] = updatedChildRev
421
422 // BEGIN lock parent -- Update parent
423 parent.GetBranch(NONE).LatestLock.Lock()
424
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400425 updatedRev := parent.GetBranch(NONE).GetLatest().UpdateChildren(ctx, typeName, children, parent.GetBranch(NONE))
Stephane Barbarie802aca42019-05-21 12:19:28 -0400426 parent.GetBranch(NONE).Node.makeLatest(parent.GetBranch(NONE), updatedRev, nil)
427
428 parent.GetBranch(NONE).LatestLock.Unlock()
429 // END lock parent
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400430
431 // Drop the previous child revision
Stephane Barbarie802aca42019-05-21 12:19:28 -0400432 parent.GetBranch(NONE).Latest.ChildDrop(typeName, childRev.GetHash())
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400433
434 if updatedChildRev != nil {
435 log.Debugw("verify-persisted-entry--adding-child", log.Fields{
436 "key": updatedChildRev.GetHash(),
437 "name": updatedChildRev.GetName(),
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400438 "data": updatedChildRev.GetData(),
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400439 })
440 response = updatedChildRev
441 }
442 } else {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400443 if childRev != nil {
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400444 log.Debugw("keeping-revision-data", log.Fields{
David Bainbridgebdae73c2019-10-23 17:05:41 +0000445 "key": childRev.GetHash(),
446 "name": childRev.GetName(),
447 "data": childRev.GetData(),
448 "in-memory-version": childRev.getVersion(),
449 "persistence-version": version,
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400450 })
Stephane Barbarie802aca42019-05-21 12:19:28 -0400451
452 // Update timestamp to reflect when it was last read and to reset tracked timeout
453 childRev.SetLastUpdate()
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400454 if childRev.getVersion() < version {
455 childRev.(*PersistedRevision).setVersion(version)
456 }
457 GetRevCache().Set(childRev.GetName(), childRev)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400458 response = childRev
459 }
460 }
Stephane Barbariec92d1072019-06-07 16:21:49 -0400461
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400462 } else {
463 // There is no available child with that key value.
464 // Create a new child and update the parent revision.
Stephane Barbarie802aca42019-05-21 12:19:28 -0400465 log.Debugw("no-such-revision-entry", log.Fields{
David Bainbridgebdae73c2019-10-23 17:05:41 +0000466 "key": keyValue,
467 "name": typeName,
468 "data": data,
469 "version": version,
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400470 })
471
Stephane Barbarie802aca42019-05-21 12:19:28 -0400472 // BEGIN child lock
473 pr.GetBranch().LatestLock.Lock()
474
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400475 // Construct a new child node with the retrieved persistence data
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400476 childRev = pr.GetBranch().Node.MakeNode(data, txid).Latest(txid)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400477
478 // We need to start watching this entry for future changes
479 childRev.SetName(typeName + "/" + keyValue)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400480 childRev.SetupWatch(childRev.GetName())
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400481 childRev.(*PersistedRevision).setVersion(version)
482
483 // Add entry to cache
484 GetRevCache().Set(childRev.GetName(), childRev)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400485
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400486 pr.GetBranch().LatestLock.Unlock()
Stephane Barbarie802aca42019-05-21 12:19:28 -0400487 // END child lock
488
489 //
490 // Add the child to the parent revision
491 //
492
493 // BEGIN parent lock
494 parent.GetBranch(NONE).LatestLock.Lock()
495 children = append(children, childRev)
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400496 updatedRev := parent.GetBranch(NONE).GetLatest().UpdateChildren(ctx, typeName, children, parent.GetBranch(NONE))
Stephane Barbarie802aca42019-05-21 12:19:28 -0400497 updatedRev.GetNode().SetProxy(parent.GetBranch(NONE).Node.GetProxy())
Stephane Barbarie802aca42019-05-21 12:19:28 -0400498 parent.GetBranch(NONE).Node.makeLatest(parent.GetBranch(NONE), updatedRev, nil)
499 parent.GetBranch(NONE).LatestLock.Unlock()
500 // END parent lock
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400501
502 // Child entry is valid and can be included in the response object
503 if childRev != nil {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400504 log.Debugw("adding-revision-to-response", log.Fields{
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400505 "key": childRev.GetHash(),
506 "name": childRev.GetName(),
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400507 "data": childRev.GetData(),
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400508 })
509 response = childRev
510 }
511 }
512
513 return response
514}
515
516// LoadFromPersistence retrieves data from kv store at the specified location and refreshes the memory
517// by adding missing entries, updating changed entries and ignoring unchanged ones
Thomas Lee Se5a44012019-11-07 20:32:24 +0530518func (pr *PersistedRevision) LoadFromPersistence(ctx context.Context, path string, txid string, blobs map[string]*kvstore.KVPair) ([]Revision, error) {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400519 pr.mutex.Lock()
520 defer pr.mutex.Unlock()
521
522 log.Debugw("loading-from-persistence", log.Fields{"path": path, "txid": txid})
523
524 var response []Revision
Thomas Lee Se5a44012019-11-07 20:32:24 +0530525 var err error
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400526
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400527 for strings.HasPrefix(path, "/") {
528 path = path[1:]
529 }
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400530
531 if pr.kvStore != nil && path != "" {
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400532 if blobs == nil || len(blobs) == 0 {
533 log.Debugw("retrieve-from-kv", log.Fields{"path": path, "txid": txid})
Thomas Lee Se5a44012019-11-07 20:32:24 +0530534
535 if blobs, err = pr.kvStore.List(path); err != nil {
536 log.Errorw("failed-to-retrieve-data-from-kvstore", log.Fields{"error": err})
537 return nil, err
538 }
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400539 }
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400540
541 partition := strings.SplitN(path, "/", 2)
542 name := partition[0]
543
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400544 var nodeType interface{}
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400545 if len(partition) < 2 {
546 path = ""
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400547 nodeType = pr.GetBranch().Node.Type
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400548 } else {
549 path = partition[1]
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400550 nodeType = pr.GetBranch().Node.GetRoot().Type
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400551 }
552
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400553 field := ChildrenFields(nodeType)[name]
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400554
555 if field != nil && field.IsContainer {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400556 log.Debugw("parsing-data-blobs", log.Fields{
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400557 "path": path,
558 "name": name,
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400559 "size": len(blobs),
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400560 })
561
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400562 for _, blob := range blobs {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400563 output := blob.Value.([]byte)
564
565 data := reflect.New(field.ClassType.Elem())
566
567 if err := proto.Unmarshal(output, data.Interface().(proto.Message)); err != nil {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400568 log.Errorw("failed-to-unmarshal", log.Fields{
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400569 "path": path,
570 "txid": txid,
571 "error": err,
572 })
573 } else if path == "" {
574 if field.Key != "" {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400575 log.Debugw("no-path-with-container-key", log.Fields{
576 "path": path,
577 "txid": txid,
578 "data": data.Interface(),
579 })
580
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400581 // Retrieve the key identifier value from the data structure
582 // based on the field's key attribute
583 _, key := GetAttributeValue(data.Interface(), field.Key, 0)
584
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400585 if entry := pr.verifyPersistedEntry(ctx, data.Interface(), name, field.Key, key.String(), txid, blob.Version); entry != nil {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400586 response = append(response, entry)
587 }
Stephane Barbarie802aca42019-05-21 12:19:28 -0400588 } else {
589 log.Debugw("path-with-no-container-key", log.Fields{
590 "path": path,
591 "txid": txid,
592 "data": data.Interface(),
593 })
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400594 }
595
596 } else if field.Key != "" {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400597 log.Debugw("path-with-container-key", log.Fields{
598 "path": path,
599 "txid": txid,
600 "data": data.Interface(),
601 })
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400602 // The request is for a specific entry/id
603 partition := strings.SplitN(path, "/", 2)
604 key := partition[0]
605 if len(partition) < 2 {
606 path = ""
607 } else {
608 path = partition[1]
609 }
610 keyValue := field.KeyFromStr(key)
611
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400612 if entry := pr.verifyPersistedEntry(ctx, data.Interface(), name, field.Key, keyValue.(string), txid, blob.Version); entry != nil {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400613 response = append(response, entry)
614 }
615 }
616 }
617
Stephane Barbarie802aca42019-05-21 12:19:28 -0400618 log.Debugw("no-more-data-blobs", log.Fields{"path": path, "name": name})
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400619 } else {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400620 log.Debugw("cannot-process-field", log.Fields{
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400621 "type": pr.GetBranch().Node.Type,
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400622 "name": name,
623 })
624 }
625 }
626
Thomas Lee Se5a44012019-11-07 20:32:24 +0530627 return response, nil
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400628}