blob: ec7e4ca520f2055890793355213ac7902851a556 [file] [log] [blame]
Richard Jankowski215a3e22018-10-04 13:56:11 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17/*
18 * Two voltha cores receive the same request; each tries to acquire ownership of the request
19 * by writing its identifier (e.g. container name or pod name) to the transaction key named
Richard Jankowskie4d77662018-10-17 13:53:21 -040020 * after the serial number of the request. The core that loses the race for acquisition
21 * monitors the progress of the core actually serving the request by watching for changes
22 * in the value of the transaction key. Once the request is complete, the
23 * serving core closes the transaction by invoking the KVTransaction's Close method, which
Richard Jankowski215a3e22018-10-04 13:56:11 -040024 * replaces the value of the transaction (i.e. serial number) key with the string
25 * TRANSACTION_COMPLETE. The standby core observes this update, stops watching the transaction,
26 * and then deletes the transaction key.
27 *
28 * To ensure the key is removed despite possible standby core failures, a KV operation is
29 * scheduled in the background on both cores to delete the key well after the transaction is
30 * completed. The value of TransactionContext parameter timeToDeleteCompletedKeys should be
31 * long enough, on the order of many seconds, to ensure the standby sees the transaction
32 * closure. The aim is to prevent a growing list of TRANSACTION_COMPLETE values from loading
33 * the KV store.
34 */
35package core
36
37import (
khenaidoo89b0e942018-10-21 21:11:33 -040038 log "github.com/opencord/voltha-go/common/log"
39 "github.com/opencord/voltha-go/db/kvstore"
40 "time"
Richard Jankowski215a3e22018-10-04 13:56:11 -040041)
42
// Transaction acquisition results.
//
// The numeric values are also used as indices into txnState, so both lists
// must be kept in the same order.
const (
	// UNKNOWN is the zero value; Watch returns it when the event it
	// received is neither a delete nor a completion of the transaction key.
	UNKNOWN = iota
	// SEIZED_BY_SELF means this core reserved the transaction key itself.
	SEIZED_BY_SELF
	// COMPLETED_BY_OTHER means the transaction key was seen holding
	// TRANSACTION_COMPLETE.
	COMPLETED_BY_OTHER
	// ABANDONED_BY_OTHER means the transaction key was deleted, or found
	// missing, before the transaction was marked complete.
	ABANDONED_BY_OTHER
	// STOPPED_WAITING_FOR_OTHER means this core gave up waiting without
	// seeing the transaction key deleted or marked complete.
	STOPPED_WAITING_FOR_OTHER
)
51
// TRANSACTION_COMPLETE is the value written to a transaction key by Close to
// signal that the request has been fully served.
const TRANSACTION_COMPLETE = "TRANSACTION-COMPLETE"
55
56type TransactionContext struct {
khenaidoo89b0e942018-10-21 21:11:33 -040057 kvClient kvstore.Client
58 kvOperationTimeout int
Richard Jankowski199fd862019-03-18 14:49:51 -040059 monitorLoopTime int64
khenaidoo89b0e942018-10-21 21:11:33 -040060 owner string
61 timeToDeleteCompletedKeys int
62 txnPrefix string
Richard Jankowski215a3e22018-10-04 13:56:11 -040063}
khenaidoo89b0e942018-10-21 21:11:33 -040064
Richard Jankowski215a3e22018-10-04 13:56:11 -040065var ctx *TransactionContext
66
khenaidoo89b0e942018-10-21 21:11:33 -040067var txnState = []string{
68 "UNKNOWN",
69 "SEIZED-BY-SELF",
70 "COMPLETED-BY-OTHER",
71 "ABANDONED-BY-OTHER",
72 "STOPPED-WAITING-FOR-OTHER"}
Richard Jankowski215a3e22018-10-04 13:56:11 -040073
74func init() {
Richard Jankowski199fd862019-03-18 14:49:51 -040075 log.AddPackage(log.JSON, log.DebugLevel, nil)
Richard Jankowski215a3e22018-10-04 13:56:11 -040076}
77
78func NewTransactionContext(
khenaidoo89b0e942018-10-21 21:11:33 -040079 owner string,
80 txnPrefix string,
81 kvClient kvstore.Client,
82 kvOpTimeout int,
Richard Jankowski199fd862019-03-18 14:49:51 -040083 keyDeleteTime int,
84 monLoopTime int64) *TransactionContext {
Richard Jankowski215a3e22018-10-04 13:56:11 -040085
khenaidoo89b0e942018-10-21 21:11:33 -040086 return &TransactionContext{
87 owner: owner,
88 txnPrefix: txnPrefix,
89 kvClient: kvClient,
90 kvOperationTimeout: kvOpTimeout,
Richard Jankowski199fd862019-03-18 14:49:51 -040091 monitorLoopTime: monLoopTime,
khenaidoo89b0e942018-10-21 21:11:33 -040092 timeToDeleteCompletedKeys: keyDeleteTime}
Richard Jankowski215a3e22018-10-04 13:56:11 -040093}
94
95/*
96 * Before instantiating a KVTransaction, a TransactionContext must be created.
97 * The parameters stored in the context govern the behaviour of all KVTransaction
98 * instances.
99 *
100 * :param owner: The owner (i.e. voltha core name) of a transaction
101 * :param txnPrefix: The key prefix under which all transaction IDs, or serial numbers,
102 * will be created (e.g. "service/voltha/transactions")
103 * :param kvClient: The client API used for all interactions with the KV store. Currently
104 * only the etcd client is supported.
Richard Jankowski199fd862019-03-18 14:49:51 -0400105 * :param: kvOpTimeout: The maximum time, in seconds, to be taken by any KV operation
106 * used by this package
107 * :param keyDeleteTime: The time (seconds) to wait, in the background, before deleting
108 * a TRANSACTION_COMPLETE key
109 * :param monLoopTime: The time in milliseconds that the monitor sleeps between
110 * checks for the existence of the transaction key
Richard Jankowski215a3e22018-10-04 13:56:11 -0400111 */
112func SetTransactionContext(owner string,
khenaidoo89b0e942018-10-21 21:11:33 -0400113 txnPrefix string,
114 kvClient kvstore.Client,
115 kvOpTimeout int,
Richard Jankowski199fd862019-03-18 14:49:51 -0400116 keyDeleteTime int,
117 monLoopTime int64) error {
Richard Jankowski215a3e22018-10-04 13:56:11 -0400118
Richard Jankowski199fd862019-03-18 14:49:51 -0400119 ctx = NewTransactionContext(owner, txnPrefix, kvClient, kvOpTimeout, keyDeleteTime, monLoopTime)
khenaidoo89b0e942018-10-21 21:11:33 -0400120 return nil
Richard Jankowski215a3e22018-10-04 13:56:11 -0400121}
122
// KVTransaction represents one voltha request whose ownership is arbitrated
// between cores through a key in the KV store.
type KVTransaction struct {
	// ch is not referenced by the code in this file.
	// NOTE(review): confirm whether anything else in the package uses it.
	ch chan int
	// txnId is the serial number of the request.
	txnId string
	// txnKey is the KV store key for the request: the context's txnPrefix
	// followed by txnId.
	txnKey string
}
128
129/*
130 * A KVTransaction constructor
131 *
132 * :param txnId: The serial number of a voltha request.
Richard Jankowskie4d77662018-10-17 13:53:21 -0400133 * :return: A KVTransaction instance
Richard Jankowski215a3e22018-10-04 13:56:11 -0400134 */
Richard Jankowskie4d77662018-10-17 13:53:21 -0400135func NewKVTransaction(txnId string) *KVTransaction {
khenaidoo89b0e942018-10-21 21:11:33 -0400136 return &KVTransaction{
137 txnId: txnId,
138 txnKey: ctx.txnPrefix + txnId}
Richard Jankowski215a3e22018-10-04 13:56:11 -0400139}
140
141/*
142 * This function returns a boolean indicating whether or not the caller should process
143 * the request. True is returned in one of two cases:
144 * (1) The current core successfully reserved the request's serial number with the KV store
145 * (2) The current core failed in its reservation attempt but observed that the serving core
146 * has abandoned processing the request
147 *
148 * :param duration: The duration of the reservation in milliseconds
149 * :return: true - reservation acquired, process the request
150 * false - reservation not acquired, request being processed by another core
151 */
Richard Jankowskie4d77662018-10-17 13:53:21 -0400152func (c *KVTransaction) Acquired(duration int64) bool {
khenaidoo89b0e942018-10-21 21:11:33 -0400153 var acquired bool
154 var currOwner string = ""
155 var res int
Richard Jankowski215a3e22018-10-04 13:56:11 -0400156
khenaidoo89b0e942018-10-21 21:11:33 -0400157 // Convert milliseconds to seconds, rounding up
158 // The reservation TTL is specified in seconds
159 durationInSecs := duration / 1000
160 if remainder := duration % 1000; remainder > 0 {
161 durationInSecs++
162 }
163 value, err := ctx.kvClient.Reserve(c.txnKey, ctx.owner, durationInSecs)
Richard Jankowski215a3e22018-10-04 13:56:11 -0400164
khenaidoo89b0e942018-10-21 21:11:33 -0400165 // If the reservation failed, do we simply abort or drop into watch mode anyway?
166 // Setting value to nil leads to watch mode
167 if value != nil {
168 if currOwner, err = kvstore.ToString(value); err != nil {
169 log.Error("unexpected-owner-type")
170 value = nil
171 }
172 }
173 if err == nil && value != nil && currOwner == ctx.owner {
174 // Process the request immediately
175 res = SEIZED_BY_SELF
176 } else {
177 // Another core instance has reserved the request
178 // Watch for reservation expiry or successful request completion
khenaidoo89b0e942018-10-21 21:11:33 -0400179 log.Debugw("watch-other-server",
180 log.Fields{"owner": currOwner, "timeout": duration})
Richard Jankowski215a3e22018-10-04 13:56:11 -0400181
Richard Jankowski199fd862019-03-18 14:49:51 -0400182 res = c.Watch(duration)
khenaidoo89b0e942018-10-21 21:11:33 -0400183 }
184 // Clean-up: delete the transaction key after a long delay
185 go c.deleteTransactionKey()
Richard Jankowski215a3e22018-10-04 13:56:11 -0400186
khenaidoo89b0e942018-10-21 21:11:33 -0400187 log.Debugw("acquire-transaction", log.Fields{"result": txnState[res]})
188 switch res {
189 case SEIZED_BY_SELF, ABANDONED_BY_OTHER, STOPPED_WAITING_FOR_OTHER:
190 acquired = true
191 default:
192 acquired = false
193 }
Richard Jankowski00a04662019-02-05 12:18:53 -0500194 // Ensure the request watcher does not reply before the request server
195 if !acquired {
196 time.Sleep(1 * time.Second)
197 }
khenaidoo89b0e942018-10-21 21:11:33 -0400198 return acquired
Richard Jankowski215a3e22018-10-04 13:56:11 -0400199}
200
Richard Jankowski199fd862019-03-18 14:49:51 -0400201/*
202 * This function monitors the progress of a request that's been reserved by another
203 * Voltha core.
204 *
205 * :param duration: The duration of the reservation in milliseconds
206 * :return: true - reservation abandoned by the other core, process the request
207 * false - reservation not owned, request being processed by another core
208 */
209func (c *KVTransaction) Monitor(duration int64) bool {
210 var acquired bool
211 var res int
212 var timeElapsed int64
213
214 // Convert milliseconds to seconds, rounding up
215 // The reservation TTL is specified in seconds
216 durationInSecs := duration / 1000
217 if remainder := duration % 1000; remainder > 0 {
218 durationInSecs++
219 }
220 // Check if transaction key has been set
221 keyExists := false
222 for timeElapsed = 0; timeElapsed < duration; timeElapsed = timeElapsed + ctx.monitorLoopTime {
223 kvp, err := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout, false)
224 if err == nil && kvp == nil {
225 // This core has received the request before the core that actually
226 // owns the device. The owning core has yet to seize the transaction.
227 time.Sleep(time.Duration(ctx.monitorLoopTime) * time.Millisecond)
228 } else {
229 keyExists = true
230 log.Debug("waited-for-other-to-reserve-transaction")
231 break
232 }
233 }
234 if keyExists {
235 // Watch for reservation expiry or successful request completion
236 log.Debugw("watch-other-server", log.Fields{"timeout": duration})
237 res = c.Watch(duration)
238 } else {
239 res = STOPPED_WAITING_FOR_OTHER
240 }
241 // Clean-up: delete the transaction key after a long delay
242 go c.deleteTransactionKey()
243
244 log.Debugw("own-transaction", log.Fields{"result": txnState[res]})
245 switch res {
246 case ABANDONED_BY_OTHER, STOPPED_WAITING_FOR_OTHER:
247 acquired = true
248 default:
249 acquired = false
250 }
251 // Ensure the request watcher does not reply before the request server
252 if !acquired {
253 time.Sleep(1 * time.Second)
254 }
255 return acquired
256}
257
258// duration in milliseconds
259func (c *KVTransaction) Watch(duration int64) int {
260 var res int
261
262 events := ctx.kvClient.Watch(c.txnKey)
263 select {
264 // Add a timeout here in case we miss an event from the KV
265 case <-time.After(time.Duration(duration) * time.Millisecond):
266 // In case of missing events, let's check the transaction key
267 kvp, err := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout, false)
268 if err == nil && kvp == nil {
269 log.Debug("missed-deleted-event")
270 res = ABANDONED_BY_OTHER
271 } else if val, err := kvstore.ToString(kvp.Value); err == nil && val == TRANSACTION_COMPLETE {
272 log.Debugw("missed-put-event",
273 log.Fields{"key": c.txnKey, "value": val})
274 res = COMPLETED_BY_OTHER
275 } else {
276 res = STOPPED_WAITING_FOR_OTHER
277 }
278
279 case event := <-events:
280 log.Debugw("received-event", log.Fields{"type": event.EventType})
281 if event.EventType == kvstore.DELETE {
282 // The other core failed to process the request
283 res = ABANDONED_BY_OTHER
284 } else if event.EventType == kvstore.PUT {
285 key, e1 := kvstore.ToString(event.Key)
286 val, e2 := kvstore.ToString(event.Value)
287 if e1 == nil && key == c.txnKey && e2 == nil && val == TRANSACTION_COMPLETE {
288 res = COMPLETED_BY_OTHER
289 // Successful request completion has been detected
290 // Remove the transaction key
291 c.Delete()
292 }
293 }
294 }
295 return res
296}
297
Richard Jankowskie4d77662018-10-17 13:53:21 -0400298func (c *KVTransaction) deleteTransactionKey() {
khenaidoo89b0e942018-10-21 21:11:33 -0400299 log.Debugw("schedule-key-deletion", log.Fields{"key": c.txnKey})
300 time.Sleep(time.Duration(ctx.timeToDeleteCompletedKeys) * time.Second)
301 log.Debugw("background-key-deletion", log.Fields{"key": c.txnKey})
Stephane Barbarie260a5632019-02-26 16:12:49 -0500302 ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout, false)
Richard Jankowski215a3e22018-10-04 13:56:11 -0400303}
304
Richard Jankowskie4d77662018-10-17 13:53:21 -0400305func (c *KVTransaction) Close() error {
khenaidoo89b0e942018-10-21 21:11:33 -0400306 log.Debugw("close", log.Fields{"key": c.txnKey})
Stephane Barbarie260a5632019-02-26 16:12:49 -0500307 return ctx.kvClient.Put(c.txnKey, TRANSACTION_COMPLETE, ctx.kvOperationTimeout, false)
Richard Jankowski215a3e22018-10-04 13:56:11 -0400308}
309
Richard Jankowskie4d77662018-10-17 13:53:21 -0400310func (c *KVTransaction) Delete() error {
khenaidoo89b0e942018-10-21 21:11:33 -0400311 log.Debugw("delete", log.Fields{"key": c.txnKey})
Stephane Barbarie260a5632019-02-26 16:12:49 -0500312 err := ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout, false)
khenaidoo89b0e942018-10-21 21:11:33 -0400313 return err
Richard Jankowski215a3e22018-10-04 13:56:11 -0400314}