blob: bbb4a1d3750898b321562c2bfa7dd8db1a584603 [file] [log] [blame]
khenaidoobf6e7bb2018-08-14 22:27:29 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Stephane Barbariedc5022d2018-11-19 15:21:44 -050016
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040017package model
18
19import (
20 "bytes"
21 "compress/gzip"
Stephane Barbarieef6650d2019-07-18 12:15:09 -040022 "context"
npujar9a30c702019-11-14 17:06:39 +053023 "reflect"
24 "strings"
25 "sync"
26
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040027 "github.com/golang/protobuf/proto"
Stephane Barbarieef6650d2019-07-18 12:15:09 -040028 "github.com/google/uuid"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080029 "github.com/opencord/voltha-lib-go/v3/pkg/db"
30 "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
31 "github.com/opencord/voltha-lib-go/v3/pkg/log"
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040032)
33
Stephane Barbariedc5022d2018-11-19 15:21:44 -050034// PersistedRevision holds information of revision meant to be saved in a persistent storage
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040035type PersistedRevision struct {
Stephane Barbarieec0919b2018-09-05 14:14:29 -040036 Revision
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040037 Compress bool
Stephane Barbariee0a4c792019-01-16 11:26:29 -050038
Stephane Barbarieef6650d2019-07-18 12:15:09 -040039 events chan *kvstore.Event
sbarbari17d7e222019-11-05 10:02:29 -050040 kvStore *db.Backend
Stephane Barbarieef6650d2019-07-18 12:15:09 -040041 mutex sync.RWMutex
42 versionMutex sync.RWMutex
43 Version int64
44 isStored bool
45 isWatched bool
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040046}
47
Stephane Barbarie40fd3b22019-04-23 21:50:47 -040048type watchCache struct {
49 Cache sync.Map
50}
51
52var watchCacheInstance *watchCache
53var watchCacheOne sync.Once
54
npujar9a30c702019-11-14 17:06:39 +053055func watches() *watchCache {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -040056 watchCacheOne.Do(func() {
57 watchCacheInstance = &watchCache{Cache: sync.Map{}}
58 })
59 return watchCacheInstance
60}
61
Stephane Barbariedc5022d2018-11-19 15:21:44 -050062// NewPersistedRevision creates a new instance of a PersistentRevision structure
Stephane Barbarieec0919b2018-09-05 14:14:29 -040063func NewPersistedRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040064 pr := &PersistedRevision{}
Stephane Barbariedc5022d2018-11-19 15:21:44 -050065 pr.kvStore = branch.Node.GetRoot().KvStore
Stephane Barbarieef6650d2019-07-18 12:15:09 -040066 pr.Version = 1
Stephane Barbariedc5022d2018-11-19 15:21:44 -050067 pr.Revision = NewNonPersistedRevision(nil, branch, data, children)
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040068 return pr
69}
70
Stephane Barbarieef6650d2019-07-18 12:15:09 -040071func (pr *PersistedRevision) getVersion() int64 {
72 pr.versionMutex.RLock()
73 defer pr.versionMutex.RUnlock()
74 return pr.Version
75}
76
77func (pr *PersistedRevision) setVersion(version int64) {
78 pr.versionMutex.Lock()
79 defer pr.versionMutex.Unlock()
80 pr.Version = version
81}
82
Stephane Barbariedc5022d2018-11-19 15:21:44 -050083// Finalize is responsible of saving the revision in the persistent storage
npujar467fe752020-01-16 20:17:45 +053084func (pr *PersistedRevision) Finalize(ctx context.Context, skipOnExist bool) {
85 pr.store(ctx, skipOnExist)
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040086}
87
npujar467fe752020-01-16 20:17:45 +053088func (pr *PersistedRevision) store(ctx context.Context, skipOnExist bool) {
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -040089 if pr.GetBranch().Txid != "" {
90 return
91 }
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040092
Stephane Barbarie7512fc82019-05-07 12:25:46 -040093 log.Debugw("ready-to-store-revision", log.Fields{"hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetData()})
Stephane Barbarie1ab43272018-12-08 21:42:13 -050094
Stephane Barbarieef6650d2019-07-18 12:15:09 -040095 // clone the revision data to avoid any race conditions with processes
96 // accessing the same data
97 cloned := proto.Clone(pr.GetConfig().Data.(proto.Message))
98
99 if blob, err := proto.Marshal(cloned); err != nil {
100 log.Errorw("problem-to-marshal", log.Fields{"error": err, "hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetData()})
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400101 } else {
102 if pr.Compress {
103 var b bytes.Buffer
104 w := gzip.NewWriter(&b)
npujar9a30c702019-11-14 17:06:39 +0530105 if _, err := w.Write(blob); err != nil {
106 log.Errorw("Unable to write a compressed form of p to the underlying io.Writer.", log.Fields{"error": err})
107 }
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400108 w.Close()
109 blob = b.Bytes()
110 }
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500111
npujar9a30c702019-11-14 17:06:39 +0530112 getRevCache().Set(pr.GetName(), pr)
npujar467fe752020-01-16 20:17:45 +0530113 if err := pr.kvStore.Put(ctx, pr.GetName(), blob); err != nil {
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400114 log.Warnw("problem-storing-revision", log.Fields{"error": err, "hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data})
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500115 } else {
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400116 log.Debugw("storing-revision", log.Fields{"hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data, "version": pr.getVersion()})
Stephane Barbarie3cb01222019-01-16 17:15:56 -0500117 pr.isStored = true
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500118 }
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400119 }
120}
121
npujar9a30c702019-11-14 17:06:39 +0530122// SetupWatch -
npujar467fe752020-01-16 20:17:45 +0530123func (pr *PersistedRevision) SetupWatch(ctx context.Context, key string) {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400124 if key == "" {
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400125 log.Debugw("ignoring-watch", log.Fields{"key": key, "revision-hash": pr.GetHash()})
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400126 return
127 }
128
npujar9a30c702019-11-14 17:06:39 +0530129 if _, exists := watches().Cache.LoadOrStore(key+"-"+pr.GetHash(), struct{}{}); exists {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400130 return
131 }
132
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500133 if pr.events == nil {
134 pr.events = make(chan *kvstore.Event)
135
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400136 log.Debugw("setting-watch-channel", log.Fields{"key": key, "revision-hash": pr.GetHash()})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500137
Stephane Barbarief7fc1782019-03-28 22:33:41 -0400138 pr.SetName(key)
npujar467fe752020-01-16 20:17:45 +0530139 pr.events = pr.kvStore.CreateWatch(ctx, key)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400140 }
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500141
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400142 if !pr.isWatched {
Stephane Barbarie3cb01222019-01-16 17:15:56 -0500143 pr.isWatched = true
144
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400145 log.Debugw("setting-watch-routine", log.Fields{"key": key, "revision-hash": pr.GetHash()})
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400146
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500147 // Start watching
npujar467fe752020-01-16 20:17:45 +0530148 go pr.startWatching(ctx)
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500149 }
150}
151
npujar467fe752020-01-16 20:17:45 +0530152func (pr *PersistedRevision) startWatching(ctx context.Context) {
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400153 log.Debugw("starting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500154
155StopWatchLoop:
156 for {
Stephane Barbariec92d1072019-06-07 16:21:49 -0400157 latestRev := pr.GetBranch().GetLatest()
npujar9a30c702019-11-14 17:06:39 +0530158 event, ok := <-pr.events
159 if !ok {
160 log.Errorw("event-channel-failure: stopping watch loop", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
161 break StopWatchLoop
162 }
163 log.Debugw("received-event", log.Fields{"type": event.EventType, "watch": latestRev.GetName()})
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400164
npujar9a30c702019-11-14 17:06:39 +0530165 switch event.EventType {
166 case kvstore.DELETE:
167 log.Debugw("delete-from-memory", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
168
169 // Remove reference from cache
170 getRevCache().Delete(latestRev.GetName())
171
172 // Remove reference from parent
173 parent := pr.GetBranch().Node.GetRoot()
174 parent.GetBranch(NONE).Latest.ChildDropByName(latestRev.GetName())
175
176 break StopWatchLoop
177
178 case kvstore.PUT:
179 log.Debugw("update-in-memory", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
180 if latestRev.getVersion() >= event.Version {
181 log.Debugw("skipping-matching-or-older-revision", log.Fields{
182 "watch": latestRev.GetName(),
183 "watch-version": event.Version,
184 "latest-version": latestRev.getVersion(),
185 })
186 continue
187 } else {
188 log.Debugw("watch-revision-is-newer", log.Fields{
189 "watch": latestRev.GetName(),
190 "watch-version": event.Version,
191 "latest-version": latestRev.getVersion(),
192 })
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500193 }
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500194
npujar9a30c702019-11-14 17:06:39 +0530195 data := reflect.New(reflect.TypeOf(latestRev.GetData()).Elem())
David Bainbridgebdae73c2019-10-23 17:05:41 +0000196
npujar9a30c702019-11-14 17:06:39 +0530197 if err := proto.Unmarshal(event.Value.([]byte), data.Interface().(proto.Message)); err != nil {
198 log.Errorw("failed-to-unmarshal-watch-data", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "error": err})
199 } else {
200 log.Debugw("un-marshaled-watch-data", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "data": data.Interface()})
David Bainbridgebdae73c2019-10-23 17:05:41 +0000201
npujar9a30c702019-11-14 17:06:39 +0530202 var pathLock string
David Bainbridgebdae73c2019-10-23 17:05:41 +0000203
npujar9a30c702019-11-14 17:06:39 +0530204 // The watch reported new persistence data.
205 // Construct an object that will be used to update the memory
206 blobs := make(map[string]*kvstore.KVPair)
207 key, _ := kvstore.ToString(event.Key)
208 blobs[key] = &kvstore.KVPair{
209 Key: key,
210 Value: event.Value,
211 Session: "",
212 Lease: 0,
213 Version: event.Version,
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400214 }
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500215
npujar9a30c702019-11-14 17:06:39 +0530216 if latestRev.getNode().GetProxy() != nil {
217 //
218 // If a proxy exists for this revision, use it to lock access to the path
219 // and prevent simultaneous updates to the object in memory
220 //
Stephane Barbariedf5479f2019-01-29 22:13:00 -0500221
npujar9a30c702019-11-14 17:06:39 +0530222 //If the proxy already has a request in progress, then there is no need to process the watch
223 if latestRev.getNode().GetProxy().GetOperation() != ProxyNone {
224 log.Debugw("operation-in-progress", log.Fields{
225 "key": latestRev.GetHash(),
226 "path": latestRev.getNode().GetProxy().getFullPath(),
227 "operation": latestRev.getNode().GetProxy().operation.String(),
228 })
229 continue
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400230 }
231
npujar9a30c702019-11-14 17:06:39 +0530232 pathLock, _ = latestRev.getNode().GetProxy().parseForControlledPath(latestRev.getNode().GetProxy().getFullPath())
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400233
npujar9a30c702019-11-14 17:06:39 +0530234 // Reserve the path to prevent others to modify while we reload from persistence
npujar467fe752020-01-16 20:17:45 +0530235 if _, err = latestRev.getNode().GetProxy().getRoot().KvStore.Client.Reserve(ctx, pathLock+"_", uuid.New().String(), ReservationTTL); err != nil {
npujar9a30c702019-11-14 17:06:39 +0530236 log.Errorw("Unable to acquire a key and set it to a given value", log.Fields{"error": err})
237 }
238 latestRev.getNode().GetProxy().SetOperation(ProxyWatch)
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400239
npujar9a30c702019-11-14 17:06:39 +0530240 // Load changes and apply to memory
npujar467fe752020-01-16 20:17:45 +0530241 if _, err = latestRev.LoadFromPersistence(ctx, latestRev.GetName(), "", blobs); err != nil {
npujar9a30c702019-11-14 17:06:39 +0530242 log.Errorw("Unable to refresh the memory by adding missing entries", log.Fields{"error": err})
243 }
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400244
npujar9a30c702019-11-14 17:06:39 +0530245 // Release path
npujar467fe752020-01-16 20:17:45 +0530246 if err = latestRev.getNode().GetProxy().getRoot().KvStore.Client.ReleaseReservation(ctx, pathLock+"_"); err != nil {
npujar9a30c702019-11-14 17:06:39 +0530247 log.Errorw("Unable to release reservation for a specific key", log.Fields{"error": err})
248 }
249 } else {
250 // This block should be reached only if coming from a non-proxied request
251 log.Debugw("revision-with-no-proxy", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400252
npujar9a30c702019-11-14 17:06:39 +0530253 // Load changes and apply to memory
npujar467fe752020-01-16 20:17:45 +0530254 if _, err = latestRev.LoadFromPersistence(ctx, latestRev.GetName(), "", blobs); err != nil {
npujar9a30c702019-11-14 17:06:39 +0530255 log.Errorw("Unable to refresh the memory by adding missing entries", log.Fields{"error": err})
Stephane Barbariedf5479f2019-01-29 22:13:00 -0500256 }
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500257 }
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500258 }
npujar9a30c702019-11-14 17:06:39 +0530259
260 default:
261 log.Debugw("unhandled-event", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "type": event.EventType})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500262 }
263 }
264
npujar9a30c702019-11-14 17:06:39 +0530265 watches().Cache.Delete(pr.GetName() + "-" + pr.GetHash())
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500266
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400267 log.Debugw("exiting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400268}
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400269
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500270// UpdateData modifies the information in the data model and saves it in the persistent storage
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400271func (pr *PersistedRevision) UpdateData(ctx context.Context, data interface{}, branch *Branch) Revision {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500272 log.Debugw("updating-persisted-data", log.Fields{"hash": pr.GetHash()})
273
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400274 newNPR := pr.Revision.UpdateData(ctx, data, branch)
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400275
276 newPR := &PersistedRevision{
Stephane Barbariec92d1072019-06-07 16:21:49 -0400277 Revision: newNPR,
278 Compress: pr.Compress,
279 kvStore: pr.kvStore,
280 events: pr.events,
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400281 Version: pr.getVersion(),
Stephane Barbariec92d1072019-06-07 16:21:49 -0400282 isWatched: pr.isWatched,
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400283 }
284
285 if newPR.GetHash() != pr.GetHash() {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400286 newPR.isStored = false
287 pr.Drop(branch.Txid, false)
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400288 pr.Drop(branch.Txid, false)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400289 } else {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400290 newPR.isStored = true
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400291 }
292
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400293 return newPR
294}
295
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500296// UpdateChildren modifies the children of a revision and of a specific component and saves it in the persistent storage
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400297func (pr *PersistedRevision) UpdateChildren(ctx context.Context, name string, children []Revision, branch *Branch) Revision {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500298 log.Debugw("updating-persisted-children", log.Fields{"hash": pr.GetHash()})
299
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400300 newNPR := pr.Revision.UpdateChildren(ctx, name, children, branch)
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400301
302 newPR := &PersistedRevision{
Stephane Barbariec92d1072019-06-07 16:21:49 -0400303 Revision: newNPR,
304 Compress: pr.Compress,
305 kvStore: pr.kvStore,
306 events: pr.events,
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400307 Version: pr.getVersion(),
Stephane Barbariec92d1072019-06-07 16:21:49 -0400308 isWatched: pr.isWatched,
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400309 }
310
311 if newPR.GetHash() != pr.GetHash() {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400312 newPR.isStored = false
313 pr.Drop(branch.Txid, false)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400314 } else {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400315 newPR.isStored = true
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400316 }
317
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400318 return newPR
319}
320
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500321// UpdateAllChildren modifies the children for all components of a revision and saves it in the peristent storage
npujar467fe752020-01-16 20:17:45 +0530322func (pr *PersistedRevision) UpdateAllChildren(ctx context.Context, children map[string][]Revision, branch *Branch) Revision {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500323 log.Debugw("updating-all-persisted-children", log.Fields{"hash": pr.GetHash()})
324
npujar467fe752020-01-16 20:17:45 +0530325 newNPR := pr.Revision.UpdateAllChildren(ctx, children, branch)
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400326
327 newPR := &PersistedRevision{
Stephane Barbariec92d1072019-06-07 16:21:49 -0400328 Revision: newNPR,
329 Compress: pr.Compress,
330 kvStore: pr.kvStore,
331 events: pr.events,
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400332 Version: pr.getVersion(),
Stephane Barbariec92d1072019-06-07 16:21:49 -0400333 isWatched: pr.isWatched,
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400334 }
335
336 if newPR.GetHash() != pr.GetHash() {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400337 newPR.isStored = false
338 pr.Drop(branch.Txid, false)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400339 } else {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400340 newPR.isStored = true
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400341 }
342
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400343 return newPR
344}
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400345
346// Drop takes care of eliminating a revision hash that is no longer needed
347// and its associated config when required
348func (pr *PersistedRevision) Drop(txid string, includeConfig bool) {
Stephane Barbarief7fc1782019-03-28 22:33:41 -0400349 pr.Revision.Drop(txid, includeConfig)
350}
351
npujar9a30c702019-11-14 17:06:39 +0530352// StorageDrop takes care of eliminating a revision hash that is no longer needed
Stephane Barbarief7fc1782019-03-28 22:33:41 -0400353// and its associated config when required
npujar467fe752020-01-16 20:17:45 +0530354func (pr *PersistedRevision) StorageDrop(ctx context.Context, txid string, includeConfig bool) {
khenaidoo49085352020-01-13 19:15:43 -0500355 log.Debugw("dropping-revision", log.Fields{"txid": txid, "hash": pr.GetHash(), "config-hash": pr.GetConfig().Hash, "key": pr.GetName(), "isStored": pr.isStored})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500356
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500357 pr.mutex.Lock()
358 defer pr.mutex.Unlock()
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400359 if pr.kvStore != nil && txid == "" {
khenaidoo49085352020-01-13 19:15:43 -0500360 if pr.isWatched {
361 pr.kvStore.DeleteWatch(pr.GetName(), pr.events)
362 pr.isWatched = false
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400363 }
364
npujar467fe752020-01-16 20:17:45 +0530365 if err := pr.kvStore.Delete(ctx, pr.GetName()); err != nil {
khenaidoo49085352020-01-13 19:15:43 -0500366 log.Errorw("failed-to-remove-revision", log.Fields{"hash": pr.GetHash(), "error": err.Error()})
367 } else {
368 pr.isStored = false
369 }
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400370 } else {
371 if includeConfig {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500372 log.Debugw("attempted-to-remove-transacted-revision-config", log.Fields{"hash": pr.GetConfig().Hash, "txid": txid})
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400373 }
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500374 log.Debugw("attempted-to-remove-transacted-revision", log.Fields{"hash": pr.GetHash(), "txid": txid})
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400375 }
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500376
377 pr.Revision.Drop(txid, includeConfig)
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400378}
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400379
380// verifyPersistedEntry validates if the provided data is available or not in memory and applies updates as required
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400381func (pr *PersistedRevision) verifyPersistedEntry(ctx context.Context, data interface{}, typeName string, keyName string,
382 keyValue string, txid string, version int64) (response Revision) {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400383 // Parent which holds the current node entry
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400384 parent := pr.GetBranch().Node.GetRoot()
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400385
Stephane Barbarie802aca42019-05-21 12:19:28 -0400386 // Get a copy of the parent's children
387 children := make([]Revision, len(parent.GetBranch(NONE).Latest.GetChildren(typeName)))
388 copy(children, parent.GetBranch(NONE).Latest.GetChildren(typeName))
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400389
Stephane Barbarie802aca42019-05-21 12:19:28 -0400390 // Verify if a child with the provided key value can be found
npujar9a30c702019-11-14 17:06:39 +0530391 if childIdx, childRev := pr.getNode().findRevByKey(children, keyName, keyValue); childRev != nil {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400392 // A child matching the provided key exists in memory
Stephane Barbarie802aca42019-05-21 12:19:28 -0400393 // Verify if the data differs from what was retrieved from persistence
Stephane Barbariec92d1072019-06-07 16:21:49 -0400394 // Also check if we are treating a newer revision of the data or not
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400395 if childRev.GetData().(proto.Message).String() != data.(proto.Message).String() && childRev.getVersion() < version {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400396 log.Debugw("revision-data-is-different", log.Fields{
David Bainbridgebdae73c2019-10-23 17:05:41 +0000397 "key": childRev.GetHash(),
398 "name": childRev.GetName(),
399 "data": childRev.GetData(),
400 "in-memory-version": childRev.getVersion(),
401 "persisted-version": version,
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400402 })
403
Stephane Barbarie802aca42019-05-21 12:19:28 -0400404 //
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400405 // Data has changed; replace the child entry and update the parent revision
Stephane Barbarie802aca42019-05-21 12:19:28 -0400406 //
407
408 // BEGIN Lock child -- prevent any incoming changes
409 childRev.GetBranch().LatestLock.Lock()
410
411 // Update child
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400412 updatedChildRev := childRev.UpdateData(ctx, data, childRev.GetBranch())
Stephane Barbarie802aca42019-05-21 12:19:28 -0400413
npujar9a30c702019-11-14 17:06:39 +0530414 updatedChildRev.getNode().SetProxy(childRev.getNode().GetProxy())
npujar467fe752020-01-16 20:17:45 +0530415 updatedChildRev.SetupWatch(ctx, updatedChildRev.GetName())
Stephane Barbarie802aca42019-05-21 12:19:28 -0400416 updatedChildRev.SetLastUpdate()
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400417 updatedChildRev.(*PersistedRevision).setVersion(version)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400418
Stephane Barbarie802aca42019-05-21 12:19:28 -0400419 // Update cache
npujar9a30c702019-11-14 17:06:39 +0530420 getRevCache().Set(updatedChildRev.GetName(), updatedChildRev)
Stephane Barbariec92d1072019-06-07 16:21:49 -0400421 childRev.Drop(txid, false)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400422
Stephane Barbarie802aca42019-05-21 12:19:28 -0400423 childRev.GetBranch().LatestLock.Unlock()
424 // END lock child
425
426 // Update child entry
427 children[childIdx] = updatedChildRev
428
429 // BEGIN lock parent -- Update parent
430 parent.GetBranch(NONE).LatestLock.Lock()
431
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400432 updatedRev := parent.GetBranch(NONE).GetLatest().UpdateChildren(ctx, typeName, children, parent.GetBranch(NONE))
Stephane Barbarie802aca42019-05-21 12:19:28 -0400433 parent.GetBranch(NONE).Node.makeLatest(parent.GetBranch(NONE), updatedRev, nil)
434
435 parent.GetBranch(NONE).LatestLock.Unlock()
436 // END lock parent
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400437
438 // Drop the previous child revision
Stephane Barbarie802aca42019-05-21 12:19:28 -0400439 parent.GetBranch(NONE).Latest.ChildDrop(typeName, childRev.GetHash())
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400440
441 if updatedChildRev != nil {
442 log.Debugw("verify-persisted-entry--adding-child", log.Fields{
443 "key": updatedChildRev.GetHash(),
444 "name": updatedChildRev.GetName(),
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400445 "data": updatedChildRev.GetData(),
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400446 })
447 response = updatedChildRev
448 }
449 } else {
npujar9a30c702019-11-14 17:06:39 +0530450 log.Debugw("keeping-revision-data", log.Fields{
451 "key": childRev.GetHash(),
452 "name": childRev.GetName(),
453 "data": childRev.GetData(),
454 "in-memory-version": childRev.getVersion(),
455 "persistence-version": version,
456 })
Stephane Barbarie802aca42019-05-21 12:19:28 -0400457
npujar9a30c702019-11-14 17:06:39 +0530458 // Update timestamp to reflect when it was last read and to reset tracked timeout
459 childRev.SetLastUpdate()
460 if childRev.getVersion() < version {
461 childRev.(*PersistedRevision).setVersion(version)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400462 }
npujar9a30c702019-11-14 17:06:39 +0530463 getRevCache().Set(childRev.GetName(), childRev)
464 response = childRev
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400465 }
Stephane Barbariec92d1072019-06-07 16:21:49 -0400466
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400467 } else {
468 // There is no available child with that key value.
469 // Create a new child and update the parent revision.
Stephane Barbarie802aca42019-05-21 12:19:28 -0400470 log.Debugw("no-such-revision-entry", log.Fields{
David Bainbridgebdae73c2019-10-23 17:05:41 +0000471 "key": keyValue,
472 "name": typeName,
473 "data": data,
474 "version": version,
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400475 })
476
Stephane Barbarie802aca42019-05-21 12:19:28 -0400477 // BEGIN child lock
478 pr.GetBranch().LatestLock.Lock()
479
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400480 // Construct a new child node with the retrieved persistence data
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400481 childRev = pr.GetBranch().Node.MakeNode(data, txid).Latest(txid)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400482
483 // We need to start watching this entry for future changes
484 childRev.SetName(typeName + "/" + keyValue)
npujar467fe752020-01-16 20:17:45 +0530485 childRev.SetupWatch(ctx, childRev.GetName())
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400486 childRev.(*PersistedRevision).setVersion(version)
487
488 // Add entry to cache
npujar9a30c702019-11-14 17:06:39 +0530489 getRevCache().Set(childRev.GetName(), childRev)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400490
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400491 pr.GetBranch().LatestLock.Unlock()
Stephane Barbarie802aca42019-05-21 12:19:28 -0400492 // END child lock
493
494 //
495 // Add the child to the parent revision
496 //
497
498 // BEGIN parent lock
499 parent.GetBranch(NONE).LatestLock.Lock()
500 children = append(children, childRev)
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400501 updatedRev := parent.GetBranch(NONE).GetLatest().UpdateChildren(ctx, typeName, children, parent.GetBranch(NONE))
npujar9a30c702019-11-14 17:06:39 +0530502 updatedRev.getNode().SetProxy(parent.GetBranch(NONE).Node.GetProxy())
Stephane Barbarie802aca42019-05-21 12:19:28 -0400503 parent.GetBranch(NONE).Node.makeLatest(parent.GetBranch(NONE), updatedRev, nil)
504 parent.GetBranch(NONE).LatestLock.Unlock()
505 // END parent lock
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400506
507 // Child entry is valid and can be included in the response object
508 if childRev != nil {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400509 log.Debugw("adding-revision-to-response", log.Fields{
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400510 "key": childRev.GetHash(),
511 "name": childRev.GetName(),
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400512 "data": childRev.GetData(),
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400513 })
514 response = childRev
515 }
516 }
517
518 return response
519}
520
521// LoadFromPersistence retrieves data from kv store at the specified location and refreshes the memory
522// by adding missing entries, updating changed entries and ignoring unchanged ones
Thomas Lee Se5a44012019-11-07 20:32:24 +0530523func (pr *PersistedRevision) LoadFromPersistence(ctx context.Context, path string, txid string, blobs map[string]*kvstore.KVPair) ([]Revision, error) {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400524 pr.mutex.Lock()
525 defer pr.mutex.Unlock()
526
527 log.Debugw("loading-from-persistence", log.Fields{"path": path, "txid": txid})
528
529 var response []Revision
Thomas Lee Se5a44012019-11-07 20:32:24 +0530530 var err error
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400531
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400532 for strings.HasPrefix(path, "/") {
533 path = path[1:]
534 }
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400535
536 if pr.kvStore != nil && path != "" {
npujar9a30c702019-11-14 17:06:39 +0530537 if len(blobs) == 0 {
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400538 log.Debugw("retrieve-from-kv", log.Fields{"path": path, "txid": txid})
Thomas Lee Se5a44012019-11-07 20:32:24 +0530539
npujar467fe752020-01-16 20:17:45 +0530540 if blobs, err = pr.kvStore.List(ctx, path); err != nil {
Thomas Lee Se5a44012019-11-07 20:32:24 +0530541 log.Errorw("failed-to-retrieve-data-from-kvstore", log.Fields{"error": err})
542 return nil, err
543 }
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400544 }
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400545
546 partition := strings.SplitN(path, "/", 2)
547 name := partition[0]
548
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400549 var nodeType interface{}
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400550 if len(partition) < 2 {
551 path = ""
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400552 nodeType = pr.GetBranch().Node.Type
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400553 } else {
554 path = partition[1]
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400555 nodeType = pr.GetBranch().Node.GetRoot().Type
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400556 }
557
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400558 field := ChildrenFields(nodeType)[name]
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400559
560 if field != nil && field.IsContainer {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400561 log.Debugw("parsing-data-blobs", log.Fields{
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400562 "path": path,
563 "name": name,
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400564 "size": len(blobs),
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400565 })
566
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400567 for _, blob := range blobs {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400568 output := blob.Value.([]byte)
569
570 data := reflect.New(field.ClassType.Elem())
571
572 if err := proto.Unmarshal(output, data.Interface().(proto.Message)); err != nil {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400573 log.Errorw("failed-to-unmarshal", log.Fields{
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400574 "path": path,
575 "txid": txid,
576 "error": err,
577 })
578 } else if path == "" {
579 if field.Key != "" {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400580 log.Debugw("no-path-with-container-key", log.Fields{
581 "path": path,
582 "txid": txid,
583 "data": data.Interface(),
584 })
585
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400586 // Retrieve the key identifier value from the data structure
587 // based on the field's key attribute
588 _, key := GetAttributeValue(data.Interface(), field.Key, 0)
589
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400590 if entry := pr.verifyPersistedEntry(ctx, data.Interface(), name, field.Key, key.String(), txid, blob.Version); entry != nil {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400591 response = append(response, entry)
592 }
Stephane Barbarie802aca42019-05-21 12:19:28 -0400593 } else {
594 log.Debugw("path-with-no-container-key", log.Fields{
595 "path": path,
596 "txid": txid,
597 "data": data.Interface(),
598 })
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400599 }
600
601 } else if field.Key != "" {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400602 log.Debugw("path-with-container-key", log.Fields{
603 "path": path,
604 "txid": txid,
605 "data": data.Interface(),
606 })
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400607 // The request is for a specific entry/id
608 partition := strings.SplitN(path, "/", 2)
609 key := partition[0]
610 if len(partition) < 2 {
611 path = ""
612 } else {
613 path = partition[1]
614 }
615 keyValue := field.KeyFromStr(key)
616
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400617 if entry := pr.verifyPersistedEntry(ctx, data.Interface(), name, field.Key, keyValue.(string), txid, blob.Version); entry != nil {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400618 response = append(response, entry)
619 }
620 }
621 }
622
Stephane Barbarie802aca42019-05-21 12:19:28 -0400623 log.Debugw("no-more-data-blobs", log.Fields{"path": path, "name": name})
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400624 } else {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400625 log.Debugw("cannot-process-field", log.Fields{
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400626 "type": pr.GetBranch().Node.Type,
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400627 "name": name,
628 })
629 }
630 }
631
Thomas Lee Se5a44012019-11-07 20:32:24 +0530632 return response, nil
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400633}