blob: 12bf93e03c26ee0705e065dc0ac8c41f8bd673c5 [file] [log] [blame]
Richard Jankowski215a3e22018-10-04 13:56:11 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17/*
18 * Two voltha cores receive the same request; each tries to acquire ownership of the request
19 * by writing its identifier (e.g. container name or pod name) to the transaction key named
Richard Jankowskie4d77662018-10-17 13:53:21 -040020 * after the serial number of the request. The core that loses the race for acquisition
21 * monitors the progress of the core actually serving the request by watching for changes
22 * in the value of the transaction key. Once the request is complete, the
23 * serving core closes the transaction by invoking the KVTransaction's Close method, which
Richard Jankowski215a3e22018-10-04 13:56:11 -040024 * replaces the value of the transaction (i.e. serial number) key with the string
25 * TRANSACTION_COMPLETE. The standby core observes this update, stops watching the transaction,
26 * and then deletes the transaction key.
27 *
28 * To ensure the key is removed despite possible standby core failures, a KV operation is
29 * scheduled in the background on both cores to delete the key well after the transaction is
30 * completed. The value of TransactionContext parameter timeToDeleteCompletedKeys should be
31 * long enough, on the order of many seconds, to ensure the standby sees the transaction
32 * closure. The aim is to prevent a growing list of TRANSACTION_COMPLETE values from loading
33 * the KV store.
34 */
35package core
36
37import (
khenaidoo89b0e942018-10-21 21:11:33 -040038 log "github.com/opencord/voltha-go/common/log"
39 "github.com/opencord/voltha-go/db/kvstore"
40 "time"
Richard Jankowski215a3e22018-10-04 13:56:11 -040041)
42
// Transaction acquisition results.
//
// These values are returned by KVTransaction.Watch and are used as indices
// into txnState, so the two declarations must be kept in the same order.
const (
	// UNKNOWN is the zero value; Watch returns it when an observed event
	// matched none of the handled cases.
	UNKNOWN = iota
	// SEIZED_BY_SELF: this core's reservation of the transaction key succeeded.
	SEIZED_BY_SELF
	// COMPLETED_BY_OTHER: the key's value was observed to be TRANSACTION_COMPLETE.
	COMPLETED_BY_OTHER
	// ABANDONED_BY_OTHER: the key was deleted (or found missing) before completion.
	ABANDONED_BY_OTHER
	// STOPPED_WATCHING_KEY: the watch timed out while the key still existed
	// without the TRANSACTION_COMPLETE value.
	STOPPED_WATCHING_KEY
	// STOPPED_WAITING_FOR_KEY is accepted by Monitor as an "acquired" result.
	// NOTE(review): nothing in this file produces this value - confirm whether
	// it is still needed.
	STOPPED_WAITING_FOR_KEY
)
52
// TRANSACTION_COMPLETE is the value written to a transaction key by
// KVTransaction.Close to signal that the request has been fully served.
const TRANSACTION_COMPLETE = "TRANSACTION-COMPLETE"
56
// TransactionContext holds the settings shared by every KVTransaction
// created in this package.
type TransactionContext struct {
	// kvClient is the KV store client used for all reservations, watches,
	// reads, writes and deletes of transaction keys.
	kvClient kvstore.Client
	// kvOperationTimeout is the timeout, in seconds, passed to KV Get/Put/Delete calls.
	kvOperationTimeout int
	// monitorLoopTime is the monitor sleep interval in milliseconds.
	// NOTE(review): stored but not read anywhere in this file.
	monitorLoopTime int64
	// owner identifies this core; it is the value written when reserving a key.
	owner string
	// timeToDeleteCompletedKeys is the delay, in seconds, before the
	// background deletion of a transaction key.
	timeToDeleteCompletedKeys int
	// txnPrefix is prepended to a transaction ID to form its KV key.
	txnPrefix string
}
khenaidoo89b0e942018-10-21 21:11:33 -040065
// ctx is the package-wide transaction context. It is nil until
// SetTransactionContext is called; NewKVTransaction and every KVTransaction
// method dereference it.
var ctx *TransactionContext
67
khenaidoo89b0e942018-10-21 21:11:33 -040068var txnState = []string{
69 "UNKNOWN",
70 "SEIZED-BY-SELF",
71 "COMPLETED-BY-OTHER",
72 "ABANDONED-BY-OTHER",
khenaidoo1ce37ad2019-03-24 22:07:24 -040073 "STOPPED-WATCHING-KEY",
74 "STOPPED-WAITING-FOR-KEY"}
Richard Jankowski215a3e22018-10-04 13:56:11 -040075
// init registers this package with the logging library, requesting JSON
// output at debug level. Any value returned by AddPackage is discarded.
func init() {
	log.AddPackage(log.JSON, log.DebugLevel, nil)
}
79
80func NewTransactionContext(
khenaidoo89b0e942018-10-21 21:11:33 -040081 owner string,
82 txnPrefix string,
83 kvClient kvstore.Client,
84 kvOpTimeout int,
Richard Jankowski199fd862019-03-18 14:49:51 -040085 keyDeleteTime int,
86 monLoopTime int64) *TransactionContext {
Richard Jankowski215a3e22018-10-04 13:56:11 -040087
khenaidoo89b0e942018-10-21 21:11:33 -040088 return &TransactionContext{
89 owner: owner,
90 txnPrefix: txnPrefix,
91 kvClient: kvClient,
92 kvOperationTimeout: kvOpTimeout,
Richard Jankowski199fd862019-03-18 14:49:51 -040093 monitorLoopTime: monLoopTime,
khenaidoo89b0e942018-10-21 21:11:33 -040094 timeToDeleteCompletedKeys: keyDeleteTime}
Richard Jankowski215a3e22018-10-04 13:56:11 -040095}
96
97/*
98 * Before instantiating a KVTransaction, a TransactionContext must be created.
99 * The parameters stored in the context govern the behaviour of all KVTransaction
100 * instances.
101 *
102 * :param owner: The owner (i.e. voltha core name) of a transaction
103 * :param txnPrefix: The key prefix under which all transaction IDs, or serial numbers,
104 * will be created (e.g. "service/voltha/transactions")
105 * :param kvClient: The client API used for all interactions with the KV store. Currently
106 * only the etcd client is supported.
Richard Jankowski199fd862019-03-18 14:49:51 -0400107 * :param: kvOpTimeout: The maximum time, in seconds, to be taken by any KV operation
108 * used by this package
109 * :param keyDeleteTime: The time (seconds) to wait, in the background, before deleting
110 * a TRANSACTION_COMPLETE key
111 * :param monLoopTime: The time in milliseconds that the monitor sleeps between
112 * checks for the existence of the transaction key
Richard Jankowski215a3e22018-10-04 13:56:11 -0400113 */
114func SetTransactionContext(owner string,
khenaidoo89b0e942018-10-21 21:11:33 -0400115 txnPrefix string,
116 kvClient kvstore.Client,
117 kvOpTimeout int,
Richard Jankowski199fd862019-03-18 14:49:51 -0400118 keyDeleteTime int,
119 monLoopTime int64) error {
Richard Jankowski215a3e22018-10-04 13:56:11 -0400120
Richard Jankowski199fd862019-03-18 14:49:51 -0400121 ctx = NewTransactionContext(owner, txnPrefix, kvClient, kvOpTimeout, keyDeleteTime, monLoopTime)
khenaidoo89b0e942018-10-21 21:11:33 -0400122 return nil
Richard Jankowski215a3e22018-10-04 13:56:11 -0400123}
124
// KVTransaction represents a single request transaction tracked through a
// key in the KV store.
type KVTransaction struct {
	// ch is not initialised by NewKVTransaction or used elsewhere in this file.
	ch chan int
	// txnId is the serial number of the request.
	txnId string
	// txnKey is the KV store key: the context's txnPrefix followed by txnId.
	txnKey string
}
130
131/*
132 * A KVTransaction constructor
133 *
134 * :param txnId: The serial number of a voltha request.
Richard Jankowskie4d77662018-10-17 13:53:21 -0400135 * :return: A KVTransaction instance
Richard Jankowski215a3e22018-10-04 13:56:11 -0400136 */
Richard Jankowskie4d77662018-10-17 13:53:21 -0400137func NewKVTransaction(txnId string) *KVTransaction {
khenaidoo89b0e942018-10-21 21:11:33 -0400138 return &KVTransaction{
139 txnId: txnId,
140 txnKey: ctx.txnPrefix + txnId}
Richard Jankowski215a3e22018-10-04 13:56:11 -0400141}
142
143/*
144 * This function returns a boolean indicating whether or not the caller should process
145 * the request. True is returned in one of two cases:
146 * (1) The current core successfully reserved the request's serial number with the KV store
147 * (2) The current core failed in its reservation attempt but observed that the serving core
148 * has abandoned processing the request
149 *
150 * :param duration: The duration of the reservation in milliseconds
151 * :return: true - reservation acquired, process the request
152 * false - reservation not acquired, request being processed by another core
153 */
Richard Jankowskie4d77662018-10-17 13:53:21 -0400154func (c *KVTransaction) Acquired(duration int64) bool {
khenaidoo89b0e942018-10-21 21:11:33 -0400155 var acquired bool
156 var currOwner string = ""
157 var res int
Richard Jankowski215a3e22018-10-04 13:56:11 -0400158
khenaidoo89b0e942018-10-21 21:11:33 -0400159 // Convert milliseconds to seconds, rounding up
160 // The reservation TTL is specified in seconds
161 durationInSecs := duration / 1000
162 if remainder := duration % 1000; remainder > 0 {
163 durationInSecs++
164 }
165 value, err := ctx.kvClient.Reserve(c.txnKey, ctx.owner, durationInSecs)
Richard Jankowski215a3e22018-10-04 13:56:11 -0400166
khenaidoo89b0e942018-10-21 21:11:33 -0400167 // If the reservation failed, do we simply abort or drop into watch mode anyway?
168 // Setting value to nil leads to watch mode
169 if value != nil {
170 if currOwner, err = kvstore.ToString(value); err != nil {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400171 log.Errorw("unexpected-owner-type", log.Fields{"txn": c.txnId})
khenaidoo89b0e942018-10-21 21:11:33 -0400172 value = nil
173 }
174 }
175 if err == nil && value != nil && currOwner == ctx.owner {
176 // Process the request immediately
177 res = SEIZED_BY_SELF
178 } else {
179 // Another core instance has reserved the request
180 // Watch for reservation expiry or successful request completion
khenaidoo89b0e942018-10-21 21:11:33 -0400181 log.Debugw("watch-other-server",
khenaidoo1ce37ad2019-03-24 22:07:24 -0400182 log.Fields{"txn": c.txnId, "owner": currOwner, "timeout": duration})
Richard Jankowski215a3e22018-10-04 13:56:11 -0400183
Richard Jankowski199fd862019-03-18 14:49:51 -0400184 res = c.Watch(duration)
khenaidoo89b0e942018-10-21 21:11:33 -0400185 }
186 // Clean-up: delete the transaction key after a long delay
187 go c.deleteTransactionKey()
Richard Jankowski215a3e22018-10-04 13:56:11 -0400188
khenaidoo1ce37ad2019-03-24 22:07:24 -0400189 log.Debugw("acquire-transaction", log.Fields{"txn": c.txnId, "result": txnState[res]})
khenaidoo89b0e942018-10-21 21:11:33 -0400190 switch res {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400191 case SEIZED_BY_SELF, ABANDONED_BY_OTHER, STOPPED_WATCHING_KEY:
khenaidoo89b0e942018-10-21 21:11:33 -0400192 acquired = true
193 default:
194 acquired = false
195 }
Richard Jankowski00a04662019-02-05 12:18:53 -0500196 // Ensure the request watcher does not reply before the request server
197 if !acquired {
198 time.Sleep(1 * time.Second)
199 }
khenaidoo89b0e942018-10-21 21:11:33 -0400200 return acquired
Richard Jankowski215a3e22018-10-04 13:56:11 -0400201}
202
Richard Jankowski199fd862019-03-18 14:49:51 -0400203/*
204 * This function monitors the progress of a request that's been reserved by another
205 * Voltha core.
206 *
207 * :param duration: The duration of the reservation in milliseconds
208 * :return: true - reservation abandoned by the other core, process the request
209 * false - reservation not owned, request being processed by another core
210 */
211func (c *KVTransaction) Monitor(duration int64) bool {
212 var acquired bool
213 var res int
Richard Jankowski199fd862019-03-18 14:49:51 -0400214
215 // Convert milliseconds to seconds, rounding up
216 // The reservation TTL is specified in seconds
217 durationInSecs := duration / 1000
218 if remainder := duration % 1000; remainder > 0 {
219 durationInSecs++
220 }
khenaidoo1ce37ad2019-03-24 22:07:24 -0400221
222 res = c.Watch(duration)
223
Richard Jankowski199fd862019-03-18 14:49:51 -0400224 // Clean-up: delete the transaction key after a long delay
225 go c.deleteTransactionKey()
226
khenaidoo1ce37ad2019-03-24 22:07:24 -0400227 log.Debugw("monitor-transaction", log.Fields{"txn": c.txnId, "result": txnState[res]})
Richard Jankowski199fd862019-03-18 14:49:51 -0400228 switch res {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400229 case ABANDONED_BY_OTHER, STOPPED_WATCHING_KEY, STOPPED_WAITING_FOR_KEY:
Richard Jankowski199fd862019-03-18 14:49:51 -0400230 acquired = true
231 default:
232 acquired = false
233 }
234 // Ensure the request watcher does not reply before the request server
235 if !acquired {
236 time.Sleep(1 * time.Second)
237 }
238 return acquired
239}
240
241// duration in milliseconds
242func (c *KVTransaction) Watch(duration int64) int {
243 var res int
244
245 events := ctx.kvClient.Watch(c.txnKey)
246 select {
247 // Add a timeout here in case we miss an event from the KV
248 case <-time.After(time.Duration(duration) * time.Millisecond):
249 // In case of missing events, let's check the transaction key
250 kvp, err := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout, false)
251 if err == nil && kvp == nil {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400252 log.Debugw("missed-delete-event", log.Fields{"txn": c.txnId})
Richard Jankowski199fd862019-03-18 14:49:51 -0400253 res = ABANDONED_BY_OTHER
254 } else if val, err := kvstore.ToString(kvp.Value); err == nil && val == TRANSACTION_COMPLETE {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400255 log.Debugw("missed-put-event", log.Fields{"txn": c.txnId, "value": val})
Richard Jankowski199fd862019-03-18 14:49:51 -0400256 res = COMPLETED_BY_OTHER
257 } else {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400258 log.Debugw("watch-timeout", log.Fields{"txn": c.txnId, "value": val})
259 res = STOPPED_WATCHING_KEY
Richard Jankowski199fd862019-03-18 14:49:51 -0400260 }
261
262 case event := <-events:
khenaidoo1ce37ad2019-03-24 22:07:24 -0400263 log.Debugw("received-event", log.Fields{"txn": c.txnId, "type": event.EventType})
Richard Jankowski199fd862019-03-18 14:49:51 -0400264 if event.EventType == kvstore.DELETE {
265 // The other core failed to process the request
266 res = ABANDONED_BY_OTHER
267 } else if event.EventType == kvstore.PUT {
268 key, e1 := kvstore.ToString(event.Key)
269 val, e2 := kvstore.ToString(event.Value)
270 if e1 == nil && key == c.txnKey && e2 == nil && val == TRANSACTION_COMPLETE {
271 res = COMPLETED_BY_OTHER
272 // Successful request completion has been detected
273 // Remove the transaction key
274 c.Delete()
275 }
276 }
277 }
278 return res
279}
280
Richard Jankowskie4d77662018-10-17 13:53:21 -0400281func (c *KVTransaction) deleteTransactionKey() {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400282 log.Debugw("schedule-key-deletion", log.Fields{"txnId": c.txnId, "txnkey": c.txnKey})
khenaidoo89b0e942018-10-21 21:11:33 -0400283 time.Sleep(time.Duration(ctx.timeToDeleteCompletedKeys) * time.Second)
khenaidoo1ce37ad2019-03-24 22:07:24 -0400284 log.Debugw("background-key-deletion", log.Fields{"txn": c.txnId, "txnkey": c.txnKey})
Stephane Barbarie260a5632019-02-26 16:12:49 -0500285 ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout, false)
Richard Jankowski215a3e22018-10-04 13:56:11 -0400286}
287
Richard Jankowskie4d77662018-10-17 13:53:21 -0400288func (c *KVTransaction) Close() error {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400289 log.Debugw("close", log.Fields{"txn": c.txnId})
Stephane Barbarie260a5632019-02-26 16:12:49 -0500290 return ctx.kvClient.Put(c.txnKey, TRANSACTION_COMPLETE, ctx.kvOperationTimeout, false)
Richard Jankowski215a3e22018-10-04 13:56:11 -0400291}
292
Richard Jankowskie4d77662018-10-17 13:53:21 -0400293func (c *KVTransaction) Delete() error {
khenaidoo1ce37ad2019-03-24 22:07:24 -0400294 log.Debugw("delete", log.Fields{"txn": c.txnId})
Stephane Barbarie260a5632019-02-26 16:12:49 -0500295 err := ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout, false)
khenaidoo89b0e942018-10-21 21:11:33 -0400296 return err
Richard Jankowski215a3e22018-10-04 13:56:11 -0400297}