blob: b2d01a4686d76e67450d80e204c04fd8a58bdbb9 [file] [log] [blame]
Richard Jankowski215a3e22018-10-04 13:56:11 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17/*
18 * Two voltha cores receive the same request; each tries to acquire ownership of the request
19 * by writing its identifier (e.g. container name or pod name) to the transaction key named
Richard Jankowskie4d77662018-10-17 13:53:21 -040020 * after the serial number of the request. The core that loses the race for acquisition
21 * monitors the progress of the core actually serving the request by watching for changes
22 * in the value of the transaction key. Once the request is complete, the
23 * serving core closes the transaction by invoking the KVTransaction's Close method, which
Richard Jankowski215a3e22018-10-04 13:56:11 -040024 * replaces the value of the transaction (i.e. serial number) key with the string
25 * TRANSACTION_COMPLETE. The standby core observes this update, stops watching the transaction,
26 * and then deletes the transaction key.
27 *
28 * To ensure the key is removed despite possible standby core failures, a KV operation is
29 * scheduled in the background on both cores to delete the key well after the transaction is
30 * completed. The value of TransactionContext parameter timeToDeleteCompletedKeys should be
31 * long enough, on the order of many seconds, to ensure the standby sees the transaction
32 * closure. The aim is to prevent a growing list of TRANSACTION_COMPLETE values from loading
33 * the KV store.
34 */
35package core
36
37import (
khenaidoo2c6a0992019-04-29 13:46:56 -040038 "github.com/opencord/voltha-go/common/log"
khenaidoo89b0e942018-10-21 21:11:33 -040039 "github.com/opencord/voltha-go/db/kvstore"
Kent Hagerman46dcd9d2019-09-18 16:42:59 -040040 "google.golang.org/grpc/codes"
41 "google.golang.org/grpc/status"
khenaidoo89b0e942018-10-21 21:11:33 -040042 "time"
Richard Jankowski215a3e22018-10-04 13:56:11 -040043)
44
45// Transaction acquisition results
46const (
khenaidoo89b0e942018-10-21 21:11:33 -040047 UNKNOWN = iota
48 SEIZED_BY_SELF
49 COMPLETED_BY_OTHER
50 ABANDONED_BY_OTHER
khenaidoo1ce37ad2019-03-24 22:07:24 -040051 STOPPED_WATCHING_KEY
52 STOPPED_WAITING_FOR_KEY
Richard Jankowski215a3e22018-10-04 13:56:11 -040053)
54
Kent Hagerman46dcd9d2019-09-18 16:42:59 -040055var errorTransactionNotAcquired = status.Error(codes.Canceled, "transaction-not-acquired")
56
Richard Jankowski215a3e22018-10-04 13:56:11 -040057const (
khenaidoo89b0e942018-10-21 21:11:33 -040058 TRANSACTION_COMPLETE = "TRANSACTION-COMPLETE"
Richard Jankowski215a3e22018-10-04 13:56:11 -040059)
60
61type TransactionContext struct {
khenaidoo89b0e942018-10-21 21:11:33 -040062 kvClient kvstore.Client
63 kvOperationTimeout int
Richard Jankowski199fd862019-03-18 14:49:51 -040064 monitorLoopTime int64
khenaidoo89b0e942018-10-21 21:11:33 -040065 owner string
66 timeToDeleteCompletedKeys int
67 txnPrefix string
Richard Jankowski215a3e22018-10-04 13:56:11 -040068}
khenaidoo89b0e942018-10-21 21:11:33 -040069
Richard Jankowski215a3e22018-10-04 13:56:11 -040070var ctx *TransactionContext
71
khenaidoo89b0e942018-10-21 21:11:33 -040072var txnState = []string{
73 "UNKNOWN",
74 "SEIZED-BY-SELF",
75 "COMPLETED-BY-OTHER",
76 "ABANDONED-BY-OTHER",
khenaidoo1ce37ad2019-03-24 22:07:24 -040077 "STOPPED-WATCHING-KEY",
78 "STOPPED-WAITING-FOR-KEY"}
Richard Jankowski215a3e22018-10-04 13:56:11 -040079
80func init() {
Richard Jankowski199fd862019-03-18 14:49:51 -040081 log.AddPackage(log.JSON, log.DebugLevel, nil)
Richard Jankowski215a3e22018-10-04 13:56:11 -040082}
83
84func NewTransactionContext(
khenaidoo89b0e942018-10-21 21:11:33 -040085 owner string,
86 txnPrefix string,
87 kvClient kvstore.Client,
88 kvOpTimeout int,
Richard Jankowski199fd862019-03-18 14:49:51 -040089 keyDeleteTime int,
90 monLoopTime int64) *TransactionContext {
Richard Jankowski215a3e22018-10-04 13:56:11 -040091
khenaidoo89b0e942018-10-21 21:11:33 -040092 return &TransactionContext{
93 owner: owner,
94 txnPrefix: txnPrefix,
95 kvClient: kvClient,
96 kvOperationTimeout: kvOpTimeout,
Richard Jankowski199fd862019-03-18 14:49:51 -040097 monitorLoopTime: monLoopTime,
khenaidoo89b0e942018-10-21 21:11:33 -040098 timeToDeleteCompletedKeys: keyDeleteTime}
Richard Jankowski215a3e22018-10-04 13:56:11 -040099}
100
101/*
102 * Before instantiating a KVTransaction, a TransactionContext must be created.
103 * The parameters stored in the context govern the behaviour of all KVTransaction
104 * instances.
105 *
106 * :param owner: The owner (i.e. voltha core name) of a transaction
107 * :param txnPrefix: The key prefix under which all transaction IDs, or serial numbers,
108 * will be created (e.g. "service/voltha/transactions")
109 * :param kvClient: The client API used for all interactions with the KV store. Currently
110 * only the etcd client is supported.
Richard Jankowski199fd862019-03-18 14:49:51 -0400111 * :param: kvOpTimeout: The maximum time, in seconds, to be taken by any KV operation
112 * used by this package
113 * :param keyDeleteTime: The time (seconds) to wait, in the background, before deleting
114 * a TRANSACTION_COMPLETE key
115 * :param monLoopTime: The time in milliseconds that the monitor sleeps between
116 * checks for the existence of the transaction key
Richard Jankowski215a3e22018-10-04 13:56:11 -0400117 */
118func SetTransactionContext(owner string,
khenaidoo89b0e942018-10-21 21:11:33 -0400119 txnPrefix string,
120 kvClient kvstore.Client,
121 kvOpTimeout int,
Richard Jankowski199fd862019-03-18 14:49:51 -0400122 keyDeleteTime int,
123 monLoopTime int64) error {
Richard Jankowski215a3e22018-10-04 13:56:11 -0400124
Richard Jankowski199fd862019-03-18 14:49:51 -0400125 ctx = NewTransactionContext(owner, txnPrefix, kvClient, kvOpTimeout, keyDeleteTime, monLoopTime)
khenaidoo89b0e942018-10-21 21:11:33 -0400126 return nil
Richard Jankowski215a3e22018-10-04 13:56:11 -0400127}
128
Richard Jankowskie4d77662018-10-17 13:53:21 -0400129type KVTransaction struct {
khenaidoo89b0e942018-10-21 21:11:33 -0400130 ch chan int
131 txnId string
132 txnKey string
Richard Jankowski215a3e22018-10-04 13:56:11 -0400133}
134
135/*
136 * A KVTransaction constructor
137 *
138 * :param txnId: The serial number of a voltha request.
Richard Jankowskie4d77662018-10-17 13:53:21 -0400139 * :return: A KVTransaction instance
Richard Jankowski215a3e22018-10-04 13:56:11 -0400140 */
Richard Jankowskie4d77662018-10-17 13:53:21 -0400141func NewKVTransaction(txnId string) *KVTransaction {
khenaidoo89b0e942018-10-21 21:11:33 -0400142 return &KVTransaction{
143 txnId: txnId,
144 txnKey: ctx.txnPrefix + txnId}
Richard Jankowski215a3e22018-10-04 13:56:11 -0400145}
146
147/*
148 * This function returns a boolean indicating whether or not the caller should process
149 * the request. True is returned in one of two cases:
150 * (1) The current core successfully reserved the request's serial number with the KV store
151 * (2) The current core failed in its reservation attempt but observed that the serving core
152 * has abandoned processing the request
153 *
154 * :param duration: The duration of the reservation in milliseconds
155 * :return: true - reservation acquired, process the request
156 * false - reservation not acquired, request being processed by another core
157 */
Richard Jankowskie4d77662018-10-17 13:53:21 -0400158func (c *KVTransaction) Acquired(duration int64) bool {
khenaidoo89b0e942018-10-21 21:11:33 -0400159 var acquired bool
160 var currOwner string = ""
161 var res int
Richard Jankowski215a3e22018-10-04 13:56:11 -0400162
khenaidoo89b0e942018-10-21 21:11:33 -0400163 // Convert milliseconds to seconds, rounding up
164 // The reservation TTL is specified in seconds
165 durationInSecs := duration / 1000
166 if remainder := duration % 1000; remainder > 0 {
167 durationInSecs++
168 }
169 value, err := ctx.kvClient.Reserve(c.txnKey, ctx.owner, durationInSecs)
Richard Jankowski215a3e22018-10-04 13:56:11 -0400170
khenaidoo89b0e942018-10-21 21:11:33 -0400171 // If the reservation failed, do we simply abort or drop into watch mode anyway?
172 // Setting value to nil leads to watch mode
173 if value != nil {
174 if currOwner, err = kvstore.ToString(value); err != nil {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400175 log.Errorw("unexpected-owner-type", log.Fields{"txn": c.txnId})
khenaidoo89b0e942018-10-21 21:11:33 -0400176 value = nil
177 }
178 }
179 if err == nil && value != nil && currOwner == ctx.owner {
180 // Process the request immediately
181 res = SEIZED_BY_SELF
182 } else {
183 // Another core instance has reserved the request
184 // Watch for reservation expiry or successful request completion
khenaidoo89b0e942018-10-21 21:11:33 -0400185 log.Debugw("watch-other-server",
khenaidoo1ce37ad2019-03-24 22:07:24 -0400186 log.Fields{"txn": c.txnId, "owner": currOwner, "timeout": duration})
Richard Jankowski215a3e22018-10-04 13:56:11 -0400187
Richard Jankowski199fd862019-03-18 14:49:51 -0400188 res = c.Watch(duration)
khenaidoo89b0e942018-10-21 21:11:33 -0400189 }
190 // Clean-up: delete the transaction key after a long delay
191 go c.deleteTransactionKey()
Richard Jankowski215a3e22018-10-04 13:56:11 -0400192
khenaidoo1ce37ad2019-03-24 22:07:24 -0400193 log.Debugw("acquire-transaction", log.Fields{"txn": c.txnId, "result": txnState[res]})
khenaidoo89b0e942018-10-21 21:11:33 -0400194 switch res {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400195 case SEIZED_BY_SELF, ABANDONED_BY_OTHER, STOPPED_WATCHING_KEY:
khenaidoo89b0e942018-10-21 21:11:33 -0400196 acquired = true
197 default:
198 acquired = false
199 }
Richard Jankowski00a04662019-02-05 12:18:53 -0500200 // Ensure the request watcher does not reply before the request server
201 if !acquired {
A R Karthick919f6db2019-08-29 18:14:56 +0000202 log.Debugw("Transaction was not ACQUIRED", log.Fields{"txn": c.txnId})
Richard Jankowski00a04662019-02-05 12:18:53 -0500203 }
khenaidoo89b0e942018-10-21 21:11:33 -0400204 return acquired
Richard Jankowski215a3e22018-10-04 13:56:11 -0400205}
206
Richard Jankowski199fd862019-03-18 14:49:51 -0400207/*
208 * This function monitors the progress of a request that's been reserved by another
209 * Voltha core.
210 *
211 * :param duration: The duration of the reservation in milliseconds
212 * :return: true - reservation abandoned by the other core, process the request
213 * false - reservation not owned, request being processed by another core
214 */
215func (c *KVTransaction) Monitor(duration int64) bool {
216 var acquired bool
217 var res int
Richard Jankowski199fd862019-03-18 14:49:51 -0400218
219 // Convert milliseconds to seconds, rounding up
220 // The reservation TTL is specified in seconds
221 durationInSecs := duration / 1000
222 if remainder := duration % 1000; remainder > 0 {
223 durationInSecs++
224 }
khenaidoo1ce37ad2019-03-24 22:07:24 -0400225
226 res = c.Watch(duration)
227
Richard Jankowski199fd862019-03-18 14:49:51 -0400228 // Clean-up: delete the transaction key after a long delay
229 go c.deleteTransactionKey()
230
khenaidoo1ce37ad2019-03-24 22:07:24 -0400231 log.Debugw("monitor-transaction", log.Fields{"txn": c.txnId, "result": txnState[res]})
Richard Jankowski199fd862019-03-18 14:49:51 -0400232 switch res {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400233 case ABANDONED_BY_OTHER, STOPPED_WATCHING_KEY, STOPPED_WAITING_FOR_KEY:
Richard Jankowski199fd862019-03-18 14:49:51 -0400234 acquired = true
235 default:
236 acquired = false
237 }
238 // Ensure the request watcher does not reply before the request server
239 if !acquired {
A R Karthick919f6db2019-08-29 18:14:56 +0000240 log.Debugw("Transaction was not acquired", log.Fields{"txn": c.txnId})
Richard Jankowski199fd862019-03-18 14:49:51 -0400241 }
242 return acquired
243}
244
245// duration in milliseconds
246func (c *KVTransaction) Watch(duration int64) int {
247 var res int
248
249 events := ctx.kvClient.Watch(c.txnKey)
Richard Jankowski199fd862019-03-18 14:49:51 -0400250
A R Karthick919f6db2019-08-29 18:14:56 +0000251 for {
252 select {
253 // Add a timeout here in case we miss an event from the KV
254 case <-time.After(time.Duration(duration) * time.Millisecond):
255 // In case of missing events, let's check the transaction key
256 kvp, err := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout, false)
257 if err == nil && kvp == nil {
258 log.Debugw("missed-delete-event", log.Fields{"txn": c.txnId})
259 res = ABANDONED_BY_OTHER
260 } else if val, err := kvstore.ToString(kvp.Value); err == nil && val == TRANSACTION_COMPLETE {
261 log.Debugw("missed-put-event", log.Fields{"txn": c.txnId, "value": val})
Richard Jankowski199fd862019-03-18 14:49:51 -0400262 res = COMPLETED_BY_OTHER
A R Karthick919f6db2019-08-29 18:14:56 +0000263 } else {
264 log.Debugw("watch-timeout", log.Fields{"txn": c.txnId, "value": val})
265 res = STOPPED_WATCHING_KEY
266 }
267
268 case event := <-events:
269 log.Debugw("received-event", log.Fields{"txn": c.txnId, "type": event.EventType})
270 if event.EventType == kvstore.DELETE {
271 // The other core failed to process the request
272 res = ABANDONED_BY_OTHER
273 } else if event.EventType == kvstore.PUT {
274 key, e1 := kvstore.ToString(event.Key)
275 val, e2 := kvstore.ToString(event.Value)
276 if e1 == nil && key == c.txnKey && e2 == nil {
277 if val == TRANSACTION_COMPLETE {
278 res = COMPLETED_BY_OTHER
279 // Successful request completion has been detected
280 // Remove the transaction key
281 c.Delete()
282 } else {
283 log.Debugf("Ignoring reservation PUT event with val %v for key %v",
284 val, key)
285 continue
286 }
287 }
Richard Jankowski199fd862019-03-18 14:49:51 -0400288 }
289 }
A R Karthick919f6db2019-08-29 18:14:56 +0000290 break
Richard Jankowski199fd862019-03-18 14:49:51 -0400291 }
292 return res
293}
294
Richard Jankowskie4d77662018-10-17 13:53:21 -0400295func (c *KVTransaction) deleteTransactionKey() {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400296 log.Debugw("schedule-key-deletion", log.Fields{"txnId": c.txnId, "txnkey": c.txnKey})
khenaidoo89b0e942018-10-21 21:11:33 -0400297 time.Sleep(time.Duration(ctx.timeToDeleteCompletedKeys) * time.Second)
khenaidoo1ce37ad2019-03-24 22:07:24 -0400298 log.Debugw("background-key-deletion", log.Fields{"txn": c.txnId, "txnkey": c.txnKey})
Stephane Barbarie260a5632019-02-26 16:12:49 -0500299 ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout, false)
Richard Jankowski215a3e22018-10-04 13:56:11 -0400300}
301
Richard Jankowskie4d77662018-10-17 13:53:21 -0400302func (c *KVTransaction) Close() error {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400303 log.Debugw("close", log.Fields{"txn": c.txnId})
Stephane Barbarie260a5632019-02-26 16:12:49 -0500304 return ctx.kvClient.Put(c.txnKey, TRANSACTION_COMPLETE, ctx.kvOperationTimeout, false)
Richard Jankowski215a3e22018-10-04 13:56:11 -0400305}
306
Richard Jankowskie4d77662018-10-17 13:53:21 -0400307func (c *KVTransaction) Delete() error {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400308 log.Debugw("delete", log.Fields{"txn": c.txnId})
Stephane Barbarie260a5632019-02-26 16:12:49 -0500309 err := ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout, false)
khenaidoo89b0e942018-10-21 21:11:33 -0400310 return err
Richard Jankowski215a3e22018-10-04 13:56:11 -0400311}