blob: dd24e7ef9bd6571eb313c9b6101090ddcc3e192b [file] [log] [blame]
khenaidoobf6e7bb2018-08-14 22:27:29 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Stephane Barbariedc5022d2018-11-19 15:21:44 -050016
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040017package model
18
19import (
20 "bytes"
21 "compress/gzip"
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040022 "github.com/golang/protobuf/proto"
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -040023 "github.com/opencord/voltha-go/common/log"
Stephane Barbariee0a4c792019-01-16 11:26:29 -050024 "github.com/opencord/voltha-go/db/kvstore"
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040025 "reflect"
Stephane Barbariedc5022d2018-11-19 15:21:44 -050026 "runtime/debug"
Stephane Barbarie1ab43272018-12-08 21:42:13 -050027 "strings"
Stephane Barbariedc5022d2018-11-19 15:21:44 -050028 "sync"
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040029)
30
Stephane Barbariedc5022d2018-11-19 15:21:44 -050031// PersistedRevision holds information of revision meant to be saved in a persistent storage
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040032type PersistedRevision struct {
Stephane Barbarieec0919b2018-09-05 14:14:29 -040033 Revision
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040034 Compress bool
Stephane Barbariee0a4c792019-01-16 11:26:29 -050035
36 events chan *kvstore.Event `json:"-"`
37 kvStore *Backend `json:"-"`
38 mutex sync.RWMutex `json:"-"`
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040039}
40
Stephane Barbariedc5022d2018-11-19 15:21:44 -050041// NewPersistedRevision creates a new instance of a PersistentRevision structure
Stephane Barbarieec0919b2018-09-05 14:14:29 -040042func NewPersistedRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040043 pr := &PersistedRevision{}
Stephane Barbariedc5022d2018-11-19 15:21:44 -050044 pr.kvStore = branch.Node.GetRoot().KvStore
45 pr.Revision = NewNonPersistedRevision(nil, branch, data, children)
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040046 return pr
47}
48
Stephane Barbariedc5022d2018-11-19 15:21:44 -050049// Finalize is responsible of saving the revision in the persistent storage
Stephane Barbarie1ab43272018-12-08 21:42:13 -050050func (pr *PersistedRevision) Finalize(skipOnExist bool) {
51 pr.store(skipOnExist)
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040052}
53
54type revData struct {
55 Children map[string][]string
56 Config string
57}
58
Stephane Barbarie1ab43272018-12-08 21:42:13 -050059func (pr *PersistedRevision) store(skipOnExist bool) {
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -040060 if pr.GetBranch().Txid != "" {
61 return
62 }
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040063
Stephane Barbarie1ab43272018-12-08 21:42:13 -050064 if pair, _ := pr.kvStore.Get(pr.GetHash()); pair != nil && skipOnExist {
Stephane Barbariee0a4c792019-01-16 11:26:29 -050065 log.Debugw("revision-config-already-exists", log.Fields{"hash": pr.GetHash()})
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040066 return
67 }
Stephane Barbarie1ab43272018-12-08 21:42:13 -050068
Stephane Barbarieec0919b2018-09-05 14:14:29 -040069 if blob, err := proto.Marshal(pr.GetConfig().Data.(proto.Message)); err != nil {
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040070 // TODO report error
71 } else {
72 if pr.Compress {
73 var b bytes.Buffer
74 w := gzip.NewWriter(&b)
75 w.Write(blob)
76 w.Close()
77 blob = b.Bytes()
78 }
Stephane Barbariedc5022d2018-11-19 15:21:44 -050079
Stephane Barbarie1ab43272018-12-08 21:42:13 -050080 if err := pr.kvStore.Put(pr.GetHash(), blob); err != nil {
Stephane Barbariee0a4c792019-01-16 11:26:29 -050081 log.Warnw("problem-storing-revision-config",
82 log.Fields{"error": err, "hash": pr.GetHash(), "data": pr.GetConfig().Data})
Stephane Barbariedc5022d2018-11-19 15:21:44 -050083 } else {
Stephane Barbariee0a4c792019-01-16 11:26:29 -050084 log.Debugw("storing-revision-config",
85 log.Fields{"hash": pr.GetHash(), "data": pr.GetConfig().Data})
Stephane Barbariedc5022d2018-11-19 15:21:44 -050086 }
Stephane Barbarie4a2564d2018-07-26 11:02:58 -040087 }
88}
89
Stephane Barbariee0a4c792019-01-16 11:26:29 -050090func (pr *PersistedRevision) SetupWatch(key string) {
91 if pr.events == nil {
92 pr.events = make(chan *kvstore.Event)
93
94 log.Debugw("setting-watch", log.Fields{"key": key})
95
96 pr.events = pr.kvStore.CreateWatch(key)
97
98 // Start watching
99 go pr.startWatching()
100 }
101}
102
103func (pr *PersistedRevision) startWatching() {
104 log.Debugw("starting-watch", log.Fields{"key": pr.GetHash()})
105
106StopWatchLoop:
107 for {
108 select {
109 case event, ok := <-pr.events:
110 if !ok {
111 log.Errorw("event-channel-failure: stopping watch loop", log.Fields{"key": pr.GetHash()})
112 break StopWatchLoop
113 }
114
115 log.Debugw("received-event", log.Fields{"type": event.EventType})
116
117 switch event.EventType {
118 case kvstore.DELETE:
119 log.Debugw("delete-from-memory", log.Fields{"key": pr.GetHash()})
120 pr.Revision.Drop("", true)
121 break StopWatchLoop
122
123 case kvstore.PUT:
124 log.Debugw("update-in-memory", log.Fields{"key": pr.GetHash()})
125
126 if dataPair, err := pr.kvStore.Get(pr.GetHash()); err != nil || dataPair == nil {
127 log.Errorw("update-in-memory--key-retrieval-failed", log.Fields{"key": pr.GetHash(), "error": err})
128 } else {
129 pr.UpdateData(dataPair.Value, pr.GetBranch())
130 }
131
132 default:
133 log.Debugw("unhandled-event", log.Fields{"key": pr.GetHash(), "type": event.EventType})
134 }
135 }
136 }
137
138 log.Debugw("exiting-watch", log.Fields{"key": pr.GetHash()})
139}
140
Stephane Barbarie1ab43272018-12-08 21:42:13 -0500141func (pr *PersistedRevision) LoadFromPersistence(path string, txid string) []Revision {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500142 log.Debugw("loading-from-persistence", log.Fields{"path": path, "txid": txid})
143
Stephane Barbarie1ab43272018-12-08 21:42:13 -0500144 var response []Revision
145 var rev Revision
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400146
Stephane Barbarie1ab43272018-12-08 21:42:13 -0500147 rev = pr
148
149 if pr.kvStore != nil {
150 blobMap, _ := pr.kvStore.List(path)
151
152 partition := strings.SplitN(path, "/", 2)
153 name := partition[0]
154
155 if len(partition) < 2 {
156 path = ""
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400157 } else {
Stephane Barbarie1ab43272018-12-08 21:42:13 -0500158 path = partition[1]
159 }
160
161 field := ChildrenFields(rev.GetBranch().Node.Type)[name]
162
163 if field.IsContainer {
164 for _, blob := range blobMap {
165 output := blob.Value.([]byte)
166
167 data := reflect.New(field.ClassType.Elem())
168
169 if err := proto.Unmarshal(output, data.Interface().(proto.Message)); err != nil {
170 // TODO report error
171 } else {
172
173 var children []Revision
174
175 if path == "" {
176 if field.Key != "" {
177 // e.g. /logical_devices/abcde --> path="" name=logical_devices key=abcde
178 if field.Key != "" {
179 children = make([]Revision, len(rev.GetChildren()[name]))
180 copy(children, rev.GetChildren()[name])
181
182 _, key := GetAttributeValue(data.Interface(), field.Key, 0)
183
184 childRev := rev.GetBranch().Node.MakeNode(data.Interface(), txid).Latest(txid)
185 childRev.SetHash(name + "/" + key.String())
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500186
187 // Create watch for <component>/<key>
188 pr.SetupWatch(childRev.GetHash())
189
Stephane Barbarie1ab43272018-12-08 21:42:13 -0500190 children = append(children, childRev)
191 rev = rev.UpdateChildren(name, children, rev.GetBranch())
192
193 rev.GetBranch().Node.makeLatest(rev.GetBranch(), rev, nil)
194
195 response = append(response, childRev)
196 continue
197 }
198 }
199 } else if field.Key != "" {
200 // e.g. /logical_devices/abcde/flows/vwxyz --> path=abcde/flows/vwxyz
201
202 partition := strings.SplitN(path, "/", 2)
203 key := partition[0]
204 if len(partition) < 2 {
205 path = ""
206 } else {
207 path = partition[1]
208 }
209 keyValue := field.KeyFromStr(key)
210
211 children = make([]Revision, len(rev.GetChildren()[name]))
212 copy(children, rev.GetChildren()[name])
213
214 idx, childRev := rev.GetBranch().Node.findRevByKey(children, field.Key, keyValue)
215
216 newChildRev := childRev.LoadFromPersistence(path, txid)
217
218 children[idx] = newChildRev[0]
219
220 rev := rev.UpdateChildren(name, rev.GetChildren()[name], rev.GetBranch())
221 rev.GetBranch().Node.makeLatest(rev.GetBranch(), rev, nil)
222
223 response = append(response, newChildRev[0])
224 continue
225 }
226 }
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400227 }
228 }
229 }
Stephane Barbarie1ab43272018-12-08 21:42:13 -0500230 return response
Stephane Barbarie4a2564d2018-07-26 11:02:58 -0400231}
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400232
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500233// UpdateData modifies the information in the data model and saves it in the persistent storage
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400234func (pr *PersistedRevision) UpdateData(data interface{}, branch *Branch) Revision {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500235 log.Debugw("updating-persisted-data", log.Fields{"hash": pr.GetHash()})
236
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400237 newNPR := pr.Revision.UpdateData(data, branch)
238
239 newPR := &PersistedRevision{
240 Revision: newNPR,
241 Compress: pr.Compress,
khenaidoob9203542018-09-17 22:56:37 -0400242 kvStore: pr.kvStore,
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400243 }
244
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400245 return newPR
246}
247
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500248// UpdateChildren modifies the children of a revision and of a specific component and saves it in the persistent storage
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400249func (pr *PersistedRevision) UpdateChildren(name string, children []Revision, branch *Branch) Revision {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500250 log.Debugw("updating-persisted-children", log.Fields{"hash": pr.GetHash()})
251
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400252 newNPR := pr.Revision.UpdateChildren(name, children, branch)
253
254 newPR := &PersistedRevision{
255 Revision: newNPR,
256 Compress: pr.Compress,
khenaidoob9203542018-09-17 22:56:37 -0400257 kvStore: pr.kvStore,
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400258 }
259
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400260 return newPR
261}
262
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500263// UpdateAllChildren modifies the children for all components of a revision and saves it in the peristent storage
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400264func (pr *PersistedRevision) UpdateAllChildren(children map[string][]Revision, branch *Branch) Revision {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500265 log.Debugw("updating-all-persisted-children", log.Fields{"hash": pr.GetHash()})
266
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400267 newNPR := pr.Revision.UpdateAllChildren(children, branch)
268
269 newPR := &PersistedRevision{
270 Revision: newNPR,
271 Compress: pr.Compress,
khenaidoob9203542018-09-17 22:56:37 -0400272 kvStore: pr.kvStore,
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400273 }
274
Stephane Barbarieec0919b2018-09-05 14:14:29 -0400275 return newPR
276}
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400277
278// Drop takes care of eliminating a revision hash that is no longer needed
279// and its associated config when required
280func (pr *PersistedRevision) Drop(txid string, includeConfig bool) {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500281 log.Debugw("dropping-revision",
282 log.Fields{"txid": txid, "hash": pr.GetHash(), "config-hash": pr.GetConfig().Hash, "stack": string(debug.Stack())})
283
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500284 pr.mutex.Lock()
285 defer pr.mutex.Unlock()
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400286 if pr.kvStore != nil && txid == "" {
287 if includeConfig {
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400288 if err := pr.kvStore.Delete(pr.GetConfig().Hash); err != nil {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500289 log.Errorw("failed-to-remove-revision-config", log.Fields{"hash": pr.GetConfig().Hash, "error": err.Error()})
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400290 }
291 }
292
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400293 if err := pr.kvStore.Delete(pr.GetHash()); err != nil {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500294 log.Errorw("failed-to-remove-revision", log.Fields{"hash": pr.GetHash(), "error": err.Error()})
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400295 }
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500296
297 pr.kvStore.DeleteWatch(pr.GetConfig().Hash, pr.events)
298
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400299 } else {
300 if includeConfig {
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500301 log.Debugw("attempted-to-remove-transacted-revision-config", log.Fields{"hash": pr.GetConfig().Hash, "txid": txid})
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400302 }
Stephane Barbariee0a4c792019-01-16 11:26:29 -0500303 log.Debugw("attempted-to-remove-transacted-revision", log.Fields{"hash": pr.GetHash(), "txid": txid})
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400304 }
Stephane Barbariedc5022d2018-11-19 15:21:44 -0500305
306 pr.Revision.Drop(txid, includeConfig)
Stephane Barbarie88fbe7f2018-09-25 12:25:23 -0400307}