blob: 53d93b7b4ba93b3317f0f48a00b3660fef640444 [file] [log] [blame]
khenaidoobf6e7bb2018-08-14 22:27:29 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Stephane Barbariedc5022d2018-11-19 15:21:44 -050016
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040017package model
18
19import (
20 "bytes"
21 "compress/gzip"
Stephane Barbarieef6650d2019-07-18 12:15:09 -040022 "context"
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040023 "github.com/golang/protobuf/proto"
Stephane Barbarieef6650d2019-07-18 12:15:09 -040024 "github.com/google/uuid"
Scott Baker807addd2019-10-24 15:16:21 -070025 "github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore"
26 "github.com/opencord/voltha-lib-go/v2/pkg/log"
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040027 "reflect"
Stephane Barbarie1ab43272018-12-08 21:42:13 -050028 "strings"
Stephane Barbariedc5022d2018-11-19 15:21:44 -050029 "sync"
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040030)
31
Stephane Barbariedc5022d2018-11-19 15:21:44 -050032// PersistedRevision holds information of revision meant to be saved in a persistent storage
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040033type PersistedRevision struct {
Stephane Barbarieec0919b2018-09-05 14:14:29 -040034 Revision
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040035 Compress bool
Stephane Barbariee0a4c792019-01-16 11:26:29 -050036
Stephane Barbarieef6650d2019-07-18 12:15:09 -040037 events chan *kvstore.Event
38 kvStore *Backend
39 mutex sync.RWMutex
40 versionMutex sync.RWMutex
41 Version int64
42 isStored bool
43 isWatched bool
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040044}
45
Stephane Barbarie40fd3b22019-04-23 21:50:47 -040046type watchCache struct {
47 Cache sync.Map
48}
49
50var watchCacheInstance *watchCache
51var watchCacheOne sync.Once
52
53func Watches() *watchCache {
54 watchCacheOne.Do(func() {
55 watchCacheInstance = &watchCache{Cache: sync.Map{}}
56 })
57 return watchCacheInstance
58}
59
Stephane Barbariedc5022d2018-11-19 15:21:44 -050060// NewPersistedRevision creates a new instance of a PersistentRevision structure
Stephane Barbarieec0919b2018-09-05 14:14:29 -040061func NewPersistedRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040062 pr := &PersistedRevision{}
Stephane Barbariedc5022d2018-11-19 15:21:44 -050063 pr.kvStore = branch.Node.GetRoot().KvStore
Stephane Barbarieef6650d2019-07-18 12:15:09 -040064 pr.Version = 1
Stephane Barbariedc5022d2018-11-19 15:21:44 -050065 pr.Revision = NewNonPersistedRevision(nil, branch, data, children)
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040066 return pr
67}
68
Stephane Barbarieef6650d2019-07-18 12:15:09 -040069func (pr *PersistedRevision) getVersion() int64 {
70 pr.versionMutex.RLock()
71 defer pr.versionMutex.RUnlock()
72 return pr.Version
73}
74
75func (pr *PersistedRevision) setVersion(version int64) {
76 pr.versionMutex.Lock()
77 defer pr.versionMutex.Unlock()
78 pr.Version = version
79}
80
Stephane Barbariedc5022d2018-11-19 15:21:44 -050081// Finalize is responsible of saving the revision in the persistent storage
Stephane Barbarie1ab43272018-12-08 21:42:13 -050082func (pr *PersistedRevision) Finalize(skipOnExist bool) {
83 pr.store(skipOnExist)
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040084}
85
Stephane Barbarie1ab43272018-12-08 21:42:13 -050086func (pr *PersistedRevision) store(skipOnExist bool) {
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -040087 if pr.GetBranch().Txid != "" {
88 return
89 }
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040090
Stephane Barbarie7512fc82019-05-07 12:25:46 -040091 log.Debugw("ready-to-store-revision", log.Fields{"hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetData()})
Stephane Barbarie1ab43272018-12-08 21:42:13 -050092
Stephane Barbarieef6650d2019-07-18 12:15:09 -040093 // clone the revision data to avoid any race conditions with processes
94 // accessing the same data
95 cloned := proto.Clone(pr.GetConfig().Data.(proto.Message))
96
97 if blob, err := proto.Marshal(cloned); err != nil {
98 log.Errorw("problem-to-marshal", log.Fields{"error": err, "hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetData()})
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040099 } else {
100 if pr.Compress {
101 var b bytes.Buffer
102 w := gzip.NewWriter(&b)
103 w.Write(blob)
104 w.Close()
105 blob = b.Bytes()
106 }
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500107
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400108 GetRevCache().Set(pr.GetName(), pr)
Stephane Barbarief7fc1782019-03-28 22:33:41 -0400109 if err := pr.kvStore.Put(pr.GetName(), blob); err != nil {
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400110 log.Warnw("problem-storing-revision", log.Fields{"error": err, "hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data})
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500111 } else {
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400112 log.Debugw("storing-revision", log.Fields{"hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data, "version": pr.getVersion()})
Stephane Barbarie3cb01222019-01-16 17:15:56 -0500113 pr.isStored = true
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500114 }
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400115 }
116}
117
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500118func (pr *PersistedRevision) SetupWatch(key string) {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400119 if key == "" {
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400120 log.Debugw("ignoring-watch", log.Fields{"key": key, "revision-hash": pr.GetHash()})
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400121 return
122 }
123
124 if _, exists := Watches().Cache.LoadOrStore(key+"-"+pr.GetHash(), struct{}{}); exists {
125 return
126 }
127
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500128 if pr.events == nil {
129 pr.events = make(chan *kvstore.Event)
130
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400131 log.Debugw("setting-watch-channel", log.Fields{"key": key, "revision-hash": pr.GetHash()})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500132
Stephane Barbarief7fc1782019-03-28 22:33:41 -0400133 pr.SetName(key)
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500134 pr.events = pr.kvStore.CreateWatch(key)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400135 }
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500136
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400137 if !pr.isWatched {
Stephane Barbarie3cb01222019-01-16 17:15:56 -0500138 pr.isWatched = true
139
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400140 log.Debugw("setting-watch-routine", log.Fields{"key": key, "revision-hash": pr.GetHash()})
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400141
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500142 // Start watching
143 go pr.startWatching()
144 }
145}
146
147func (pr *PersistedRevision) startWatching() {
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400148 log.Debugw("starting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500149
150StopWatchLoop:
151 for {
Stephane Barbariec92d1072019-06-07 16:21:49 -0400152 latestRev := pr.GetBranch().GetLatest()
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400153
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500154 select {
155 case event, ok := <-pr.events:
156 if !ok {
Stephane Barbariec92d1072019-06-07 16:21:49 -0400157 log.Errorw("event-channel-failure: stopping watch loop", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500158 break StopWatchLoop
159 }
Stephane Barbariec92d1072019-06-07 16:21:49 -0400160 log.Debugw("received-event", log.Fields{"type": event.EventType, "watch": latestRev.GetName()})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500161
162 switch event.EventType {
163 case kvstore.DELETE:
Stephane Barbariec92d1072019-06-07 16:21:49 -0400164 log.Debugw("delete-from-memory", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
David Bainbridgebdae73c2019-10-23 17:05:41 +0000165
166 // Remove reference from cache
167 GetRevCache().Delete(latestRev.GetName())
168
169 // Remove reference from parent
170 parent := pr.GetBranch().Node.GetRoot()
171 parent.GetBranch(NONE).Latest.ChildDropByName(latestRev.GetName())
172
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500173 break StopWatchLoop
174
175 case kvstore.PUT:
Stephane Barbariec92d1072019-06-07 16:21:49 -0400176 log.Debugw("update-in-memory", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400177 if latestRev.getVersion() >= event.Version {
178 log.Debugw("skipping-matching-or-older-revision", log.Fields{
179 "watch": latestRev.GetName(),
180 "watch-version": event.Version,
181 "latest-version": latestRev.getVersion(),
182 })
183 continue
184 } else {
185 log.Debugw("watch-revision-is-newer", log.Fields{
186 "watch": latestRev.GetName(),
187 "watch-version": event.Version,
188 "latest-version": latestRev.getVersion(),
189 })
190 }
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500191
Stephane Barbariec92d1072019-06-07 16:21:49 -0400192 data := reflect.New(reflect.TypeOf(latestRev.GetData()).Elem())
Stephane Barbariedf5479f2019-01-29 22:13:00 -0500193
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400194 if err := proto.Unmarshal(event.Value.([]byte), data.Interface().(proto.Message)); err != nil {
Stephane Barbariec92d1072019-06-07 16:21:49 -0400195 log.Errorw("failed-to-unmarshal-watch-data", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "error": err})
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400196 } else {
Stephane Barbariec92d1072019-06-07 16:21:49 -0400197 log.Debugw("un-marshaled-watch-data", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "data": data.Interface()})
Stephane Barbarie802aca42019-05-21 12:19:28 -0400198
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400199 var pathLock string
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400200 var blobs map[string]*kvstore.KVPair
201
202 // The watch reported new persistence data.
203 // Construct an object that will be used to update the memory
204 blobs = make(map[string]*kvstore.KVPair)
205 key, _ := kvstore.ToString(event.Key)
206 blobs[key] = &kvstore.KVPair{
207 Key: key,
208 Value: event.Value,
209 Session: "",
210 Lease: 0,
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400211 Version: event.Version,
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400212 }
213
Stephane Barbariec92d1072019-06-07 16:21:49 -0400214 if latestRev.GetNode().GetProxy() != nil {
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400215 //
216 // If a proxy exists for this revision, use it to lock access to the path
217 // and prevent simultaneous updates to the object in memory
218 //
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400219
220 //If the proxy already has a request in progress, then there is no need to process the watch
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400221 if latestRev.GetNode().GetProxy().GetOperation() != PROXY_NONE {
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400222 log.Debugw("operation-in-progress", log.Fields{
Stephane Barbariec92d1072019-06-07 16:21:49 -0400223 "key": latestRev.GetHash(),
224 "path": latestRev.GetNode().GetProxy().getFullPath(),
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400225 "operation": latestRev.GetNode().GetProxy().operation.String(),
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400226 })
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400227 continue
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400228 }
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400229
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400230 pathLock, _ = latestRev.GetNode().GetProxy().parseForControlledPath(latestRev.GetNode().GetProxy().getFullPath())
231
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400232 // Reserve the path to prevent others to modify while we reload from persistence
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400233 latestRev.GetNode().GetProxy().GetRoot().KvStore.Client.Reserve(pathLock+"_", uuid.New().String(), ReservationTTL)
234 latestRev.GetNode().GetProxy().SetOperation(PROXY_WATCH)
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400235
236 // Load changes and apply to memory
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400237 latestRev.LoadFromPersistence(context.Background(), latestRev.GetName(), "", blobs)
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400238
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400239 // Release path
240 latestRev.GetNode().GetProxy().GetRoot().KvStore.Client.ReleaseReservation(pathLock + "_")
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400241
242 } else {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400243 // This block should be reached only if coming from a non-proxied request
Stephane Barbariec92d1072019-06-07 16:21:49 -0400244 log.Debugw("revision-with-no-proxy", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400245
246 // Load changes and apply to memory
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400247 latestRev.LoadFromPersistence(context.Background(), latestRev.GetName(), "", blobs)
Stephane Barbariedf5479f2019-01-29 22:13:00 -0500248 }
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500249 }
250
251 default:
Stephane Barbariec92d1072019-06-07 16:21:49 -0400252 log.Debugw("unhandled-event", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "type": event.EventType})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500253 }
254 }
255 }
256
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400257 Watches().Cache.Delete(pr.GetName() + "-" + pr.GetHash())
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500258
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400259 log.Debugw("exiting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400260}
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400261
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500262// UpdateData modifies the information in the data model and saves it in the persistent storage
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400263func (pr *PersistedRevision) UpdateData(ctx context.Context, data interface{}, branch *Branch) Revision {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500264 log.Debugw("updating-persisted-data", log.Fields{"hash": pr.GetHash()})
265
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400266 newNPR := pr.Revision.UpdateData(ctx, data, branch)
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400267
268 newPR := &PersistedRevision{
Stephane Barbariec92d1072019-06-07 16:21:49 -0400269 Revision: newNPR,
270 Compress: pr.Compress,
271 kvStore: pr.kvStore,
272 events: pr.events,
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400273 Version: pr.getVersion(),
Stephane Barbariec92d1072019-06-07 16:21:49 -0400274 isWatched: pr.isWatched,
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400275 }
276
277 if newPR.GetHash() != pr.GetHash() {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400278 newPR.isStored = false
279 pr.Drop(branch.Txid, false)
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400280 pr.Drop(branch.Txid, false)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400281 } else {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400282 newPR.isStored = true
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400283 }
284
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400285 return newPR
286}
287
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500288// UpdateChildren modifies the children of a revision and of a specific component and saves it in the persistent storage
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400289func (pr *PersistedRevision) UpdateChildren(ctx context.Context, name string, children []Revision, branch *Branch) Revision {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500290 log.Debugw("updating-persisted-children", log.Fields{"hash": pr.GetHash()})
291
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400292 newNPR := pr.Revision.UpdateChildren(ctx, name, children, branch)
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400293
294 newPR := &PersistedRevision{
Stephane Barbariec92d1072019-06-07 16:21:49 -0400295 Revision: newNPR,
296 Compress: pr.Compress,
297 kvStore: pr.kvStore,
298 events: pr.events,
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400299 Version: pr.getVersion(),
Stephane Barbariec92d1072019-06-07 16:21:49 -0400300 isWatched: pr.isWatched,
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400301 }
302
303 if newPR.GetHash() != pr.GetHash() {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400304 newPR.isStored = false
305 pr.Drop(branch.Txid, false)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400306 } else {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400307 newPR.isStored = true
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400308 }
309
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400310 return newPR
311}
312
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500313// UpdateAllChildren modifies the children for all components of a revision and saves it in the peristent storage
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400314func (pr *PersistedRevision) UpdateAllChildren(children map[string][]Revision, branch *Branch) Revision {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500315 log.Debugw("updating-all-persisted-children", log.Fields{"hash": pr.GetHash()})
316
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400317 newNPR := pr.Revision.UpdateAllChildren(children, branch)
318
319 newPR := &PersistedRevision{
Stephane Barbariec92d1072019-06-07 16:21:49 -0400320 Revision: newNPR,
321 Compress: pr.Compress,
322 kvStore: pr.kvStore,
323 events: pr.events,
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400324 Version: pr.getVersion(),
Stephane Barbariec92d1072019-06-07 16:21:49 -0400325 isWatched: pr.isWatched,
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400326 }
327
328 if newPR.GetHash() != pr.GetHash() {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400329 newPR.isStored = false
330 pr.Drop(branch.Txid, false)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400331 } else {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400332 newPR.isStored = true
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400333 }
334
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400335 return newPR
336}
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400337
338// Drop takes care of eliminating a revision hash that is no longer needed
339// and its associated config when required
340func (pr *PersistedRevision) Drop(txid string, includeConfig bool) {
Stephane Barbarief7fc1782019-03-28 22:33:41 -0400341 pr.Revision.Drop(txid, includeConfig)
342}
343
344// Drop takes care of eliminating a revision hash that is no longer needed
345// and its associated config when required
346func (pr *PersistedRevision) StorageDrop(txid string, includeConfig bool) {
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400347 log.Debugw("dropping-revision", log.Fields{"txid": txid, "hash": pr.GetHash(), "config-hash": pr.GetConfig().Hash})
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500348
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500349 pr.mutex.Lock()
350 defer pr.mutex.Unlock()
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400351 if pr.kvStore != nil && txid == "" {
Stephane Barbarie3cb01222019-01-16 17:15:56 -0500352 if pr.isStored {
Stephane Barbarief7fc1782019-03-28 22:33:41 -0400353 if pr.isWatched {
354 pr.kvStore.DeleteWatch(pr.GetName(), pr.events)
355 pr.isWatched = false
Stephane Barbarie3cb01222019-01-16 17:15:56 -0500356 }
357
Stephane Barbarief7fc1782019-03-28 22:33:41 -0400358 if err := pr.kvStore.Delete(pr.GetName()); err != nil {
Stephane Barbarie3cb01222019-01-16 17:15:56 -0500359 log.Errorw("failed-to-remove-revision", log.Fields{"hash": pr.GetHash(), "error": err.Error()})
360 } else {
361 pr.isStored = false
362 }
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400363 }
364
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400365 } else {
366 if includeConfig {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500367 log.Debugw("attempted-to-remove-transacted-revision-config", log.Fields{"hash": pr.GetConfig().Hash, "txid": txid})
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400368 }
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500369 log.Debugw("attempted-to-remove-transacted-revision", log.Fields{"hash": pr.GetHash(), "txid": txid})
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400370 }
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500371
372 pr.Revision.Drop(txid, includeConfig)
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400373}
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400374
375// verifyPersistedEntry validates if the provided data is available or not in memory and applies updates as required
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400376func (pr *PersistedRevision) verifyPersistedEntry(ctx context.Context, data interface{}, typeName string, keyName string,
377 keyValue string, txid string, version int64) (response Revision) {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400378 // Parent which holds the current node entry
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400379 parent := pr.GetBranch().Node.GetRoot()
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400380
Stephane Barbarie802aca42019-05-21 12:19:28 -0400381 // Get a copy of the parent's children
382 children := make([]Revision, len(parent.GetBranch(NONE).Latest.GetChildren(typeName)))
383 copy(children, parent.GetBranch(NONE).Latest.GetChildren(typeName))
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400384
Stephane Barbarie802aca42019-05-21 12:19:28 -0400385 // Verify if a child with the provided key value can be found
386 if childIdx, childRev := pr.GetNode().findRevByKey(children, keyName, keyValue); childRev != nil {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400387 // A child matching the provided key exists in memory
Stephane Barbarie802aca42019-05-21 12:19:28 -0400388 // Verify if the data differs from what was retrieved from persistence
Stephane Barbariec92d1072019-06-07 16:21:49 -0400389 // Also check if we are treating a newer revision of the data or not
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400390 if childRev.GetData().(proto.Message).String() != data.(proto.Message).String() && childRev.getVersion() < version {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400391 log.Debugw("revision-data-is-different", log.Fields{
David Bainbridgebdae73c2019-10-23 17:05:41 +0000392 "key": childRev.GetHash(),
393 "name": childRev.GetName(),
394 "data": childRev.GetData(),
395 "in-memory-version": childRev.getVersion(),
396 "persisted-version": version,
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400397 })
398
Stephane Barbarie802aca42019-05-21 12:19:28 -0400399 //
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400400 // Data has changed; replace the child entry and update the parent revision
Stephane Barbarie802aca42019-05-21 12:19:28 -0400401 //
402
403 // BEGIN Lock child -- prevent any incoming changes
404 childRev.GetBranch().LatestLock.Lock()
405
406 // Update child
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400407 updatedChildRev := childRev.UpdateData(ctx, data, childRev.GetBranch())
Stephane Barbarie802aca42019-05-21 12:19:28 -0400408
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400409 updatedChildRev.GetNode().SetProxy(childRev.GetNode().GetProxy())
410 updatedChildRev.SetupWatch(updatedChildRev.GetName())
Stephane Barbarie802aca42019-05-21 12:19:28 -0400411 updatedChildRev.SetLastUpdate()
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400412 updatedChildRev.(*PersistedRevision).setVersion(version)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400413
Stephane Barbarie802aca42019-05-21 12:19:28 -0400414 // Update cache
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400415 GetRevCache().Set(updatedChildRev.GetName(), updatedChildRev)
Stephane Barbariec92d1072019-06-07 16:21:49 -0400416 childRev.Drop(txid, false)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400417
Stephane Barbarie802aca42019-05-21 12:19:28 -0400418 childRev.GetBranch().LatestLock.Unlock()
419 // END lock child
420
421 // Update child entry
422 children[childIdx] = updatedChildRev
423
424 // BEGIN lock parent -- Update parent
425 parent.GetBranch(NONE).LatestLock.Lock()
426
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400427 updatedRev := parent.GetBranch(NONE).GetLatest().UpdateChildren(ctx, typeName, children, parent.GetBranch(NONE))
Stephane Barbarie802aca42019-05-21 12:19:28 -0400428 parent.GetBranch(NONE).Node.makeLatest(parent.GetBranch(NONE), updatedRev, nil)
429
430 parent.GetBranch(NONE).LatestLock.Unlock()
431 // END lock parent
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400432
433 // Drop the previous child revision
Stephane Barbarie802aca42019-05-21 12:19:28 -0400434 parent.GetBranch(NONE).Latest.ChildDrop(typeName, childRev.GetHash())
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400435
436 if updatedChildRev != nil {
437 log.Debugw("verify-persisted-entry--adding-child", log.Fields{
438 "key": updatedChildRev.GetHash(),
439 "name": updatedChildRev.GetName(),
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400440 "data": updatedChildRev.GetData(),
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400441 })
442 response = updatedChildRev
443 }
444 } else {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400445 if childRev != nil {
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400446 log.Debugw("keeping-revision-data", log.Fields{
David Bainbridgebdae73c2019-10-23 17:05:41 +0000447 "key": childRev.GetHash(),
448 "name": childRev.GetName(),
449 "data": childRev.GetData(),
450 "in-memory-version": childRev.getVersion(),
451 "persistence-version": version,
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400452 })
Stephane Barbarie802aca42019-05-21 12:19:28 -0400453
454 // Update timestamp to reflect when it was last read and to reset tracked timeout
455 childRev.SetLastUpdate()
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400456 if childRev.getVersion() < version {
457 childRev.(*PersistedRevision).setVersion(version)
458 }
459 GetRevCache().Set(childRev.GetName(), childRev)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400460 response = childRev
461 }
462 }
Stephane Barbariec92d1072019-06-07 16:21:49 -0400463
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400464 } else {
465 // There is no available child with that key value.
466 // Create a new child and update the parent revision.
Stephane Barbarie802aca42019-05-21 12:19:28 -0400467 log.Debugw("no-such-revision-entry", log.Fields{
David Bainbridgebdae73c2019-10-23 17:05:41 +0000468 "key": keyValue,
469 "name": typeName,
470 "data": data,
471 "version": version,
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400472 })
473
Stephane Barbarie802aca42019-05-21 12:19:28 -0400474 // BEGIN child lock
475 pr.GetBranch().LatestLock.Lock()
476
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400477 // Construct a new child node with the retrieved persistence data
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400478 childRev = pr.GetBranch().Node.MakeNode(data, txid).Latest(txid)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400479
480 // We need to start watching this entry for future changes
481 childRev.SetName(typeName + "/" + keyValue)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400482 childRev.SetupWatch(childRev.GetName())
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400483 childRev.(*PersistedRevision).setVersion(version)
484
485 // Add entry to cache
486 GetRevCache().Set(childRev.GetName(), childRev)
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400487
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400488 pr.GetBranch().LatestLock.Unlock()
Stephane Barbarie802aca42019-05-21 12:19:28 -0400489 // END child lock
490
491 //
492 // Add the child to the parent revision
493 //
494
495 // BEGIN parent lock
496 parent.GetBranch(NONE).LatestLock.Lock()
497 children = append(children, childRev)
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400498 updatedRev := parent.GetBranch(NONE).GetLatest().UpdateChildren(ctx, typeName, children, parent.GetBranch(NONE))
Stephane Barbarie802aca42019-05-21 12:19:28 -0400499 updatedRev.GetNode().SetProxy(parent.GetBranch(NONE).Node.GetProxy())
Stephane Barbarie802aca42019-05-21 12:19:28 -0400500 parent.GetBranch(NONE).Node.makeLatest(parent.GetBranch(NONE), updatedRev, nil)
501 parent.GetBranch(NONE).LatestLock.Unlock()
502 // END parent lock
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400503
504 // Child entry is valid and can be included in the response object
505 if childRev != nil {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400506 log.Debugw("adding-revision-to-response", log.Fields{
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400507 "key": childRev.GetHash(),
508 "name": childRev.GetName(),
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400509 "data": childRev.GetData(),
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400510 })
511 response = childRev
512 }
513 }
514
515 return response
516}
517
518// LoadFromPersistence retrieves data from kv store at the specified location and refreshes the memory
519// by adding missing entries, updating changed entries and ignoring unchanged ones
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400520func (pr *PersistedRevision) LoadFromPersistence(ctx context.Context, path string, txid string, blobs map[string]*kvstore.KVPair) []Revision {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400521 pr.mutex.Lock()
522 defer pr.mutex.Unlock()
523
524 log.Debugw("loading-from-persistence", log.Fields{"path": path, "txid": txid})
525
526 var response []Revision
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400527
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400528 for strings.HasPrefix(path, "/") {
529 path = path[1:]
530 }
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400531
532 if pr.kvStore != nil && path != "" {
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400533 if blobs == nil || len(blobs) == 0 {
534 log.Debugw("retrieve-from-kv", log.Fields{"path": path, "txid": txid})
535 blobs, _ = pr.kvStore.List(path)
536 }
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400537
538 partition := strings.SplitN(path, "/", 2)
539 name := partition[0]
540
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400541 var nodeType interface{}
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400542 if len(partition) < 2 {
543 path = ""
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400544 nodeType = pr.GetBranch().Node.Type
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400545 } else {
546 path = partition[1]
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400547 nodeType = pr.GetBranch().Node.GetRoot().Type
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400548 }
549
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400550 field := ChildrenFields(nodeType)[name]
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400551
552 if field != nil && field.IsContainer {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400553 log.Debugw("parsing-data-blobs", log.Fields{
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400554 "path": path,
555 "name": name,
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400556 "size": len(blobs),
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400557 })
558
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400559 for _, blob := range blobs {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400560 output := blob.Value.([]byte)
561
562 data := reflect.New(field.ClassType.Elem())
563
564 if err := proto.Unmarshal(output, data.Interface().(proto.Message)); err != nil {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400565 log.Errorw("failed-to-unmarshal", log.Fields{
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400566 "path": path,
567 "txid": txid,
568 "error": err,
569 })
570 } else if path == "" {
571 if field.Key != "" {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400572 log.Debugw("no-path-with-container-key", log.Fields{
573 "path": path,
574 "txid": txid,
575 "data": data.Interface(),
576 })
577
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400578 // Retrieve the key identifier value from the data structure
579 // based on the field's key attribute
580 _, key := GetAttributeValue(data.Interface(), field.Key, 0)
581
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400582 if entry := pr.verifyPersistedEntry(ctx, data.Interface(), name, field.Key, key.String(), txid, blob.Version); entry != nil {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400583 response = append(response, entry)
584 }
Stephane Barbarie802aca42019-05-21 12:19:28 -0400585 } else {
586 log.Debugw("path-with-no-container-key", log.Fields{
587 "path": path,
588 "txid": txid,
589 "data": data.Interface(),
590 })
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400591 }
592
593 } else if field.Key != "" {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400594 log.Debugw("path-with-container-key", log.Fields{
595 "path": path,
596 "txid": txid,
597 "data": data.Interface(),
598 })
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400599 // The request is for a specific entry/id
600 partition := strings.SplitN(path, "/", 2)
601 key := partition[0]
602 if len(partition) < 2 {
603 path = ""
604 } else {
605 path = partition[1]
606 }
607 keyValue := field.KeyFromStr(key)
608
Stephane Barbarieef6650d2019-07-18 12:15:09 -0400609 if entry := pr.verifyPersistedEntry(ctx, data.Interface(), name, field.Key, keyValue.(string), txid, blob.Version); entry != nil {
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400610 response = append(response, entry)
611 }
612 }
613 }
614
Stephane Barbarie802aca42019-05-21 12:19:28 -0400615 log.Debugw("no-more-data-blobs", log.Fields{"path": path, "name": name})
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400616 } else {
Stephane Barbarie802aca42019-05-21 12:19:28 -0400617 log.Debugw("cannot-process-field", log.Fields{
Stephane Barbarie7512fc82019-05-07 12:25:46 -0400618 "type": pr.GetBranch().Node.Type,
Stephane Barbarie40fd3b22019-04-23 21:50:47 -0400619 "name": name,
620 })
621 }
622 }
623
624 return response
Kent Hagerman0ab4cb22019-04-24 13:13:35 -0400625}