/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
/*
 * Two voltha cores receive the same request; each tries to acquire ownership of the request
 * by writing its identifier (e.g. container name or pod name) to the transaction key named
 * after the serial number of the request. The core that loses the race for acquisition
 * monitors the progress of the core actually serving the request by watching for changes
 * in the value of the transaction key. Once the request is complete, the serving core
 * closes the transaction by invoking the KVTransaction's Close method, which replaces the
 * value of the transaction (i.e. serial number) key with the value of the
 * TRANSACTION_COMPLETE constant. The standby core observes this update, stops watching the
 * transaction, and then deletes the transaction key.
 *
 * To ensure the key is removed despite possible standby core failures, a KV operation is
 * scheduled in the background on both cores to delete the key well after the transaction is
 * completed. The value of the TransactionContext parameter timeToDeleteCompletedKeys should
 * be long enough, on the order of many seconds, to ensure the standby sees the transaction
 * closure. The aim is to prevent a growing list of TRANSACTION_COMPLETE values from loading
 * the KV store.
 */
35package core
36
37import (
khenaidoo89b0e942018-10-21 21:11:33 -040038 log "github.com/opencord/voltha-go/common/log"
39 "github.com/opencord/voltha-go/db/kvstore"
40 "time"
Richard Jankowski215a3e22018-10-04 13:56:11 -040041)
42
// Transaction acquisition results.
//
// The numeric values are used as indexes into txnState when logging, so the
// order here must stay in step with the order of the names in txnState.
const (
	// UNKNOWN is the zero value: no outcome has been determined.
	UNKNOWN = iota
	// SEIZED_BY_SELF means this core's own reservation of the transaction key succeeded.
	SEIZED_BY_SELF
	// COMPLETED_BY_OTHER means the key was seen holding TRANSACTION_COMPLETE.
	COMPLETED_BY_OTHER
	// ABANDONED_BY_OTHER means the key was deleted, or was found missing after the wait timed out.
	ABANDONED_BY_OTHER
	// STOPPED_WAITING_FOR_OTHER means the wait timed out without seeing the key
	// either missing or holding TRANSACTION_COMPLETE.
	STOPPED_WAITING_FOR_OTHER
)
51
const (
	// TRANSACTION_COMPLETE is the value written to a transaction key by
	// KVTransaction.Close to signal that the request has been served.
	TRANSACTION_COMPLETE = "TRANSACTION-COMPLETE"
)
55
56type TransactionContext struct {
khenaidoo89b0e942018-10-21 21:11:33 -040057 kvClient kvstore.Client
58 kvOperationTimeout int
59 owner string
60 timeToDeleteCompletedKeys int
61 txnPrefix string
Richard Jankowski215a3e22018-10-04 13:56:11 -040062}
khenaidoo89b0e942018-10-21 21:11:33 -040063
Richard Jankowski215a3e22018-10-04 13:56:11 -040064var ctx *TransactionContext
65
// txnState maps a transaction acquisition result (UNKNOWN, SEIZED_BY_SELF, ...)
// to the name used for it in log output. It is indexed by the result value, so
// its order must match the order of the result constants.
var txnState = []string{
	"UNKNOWN",
	"SEIZED-BY-SELF",
	"COMPLETED-BY-OTHER",
	"ABANDONED-BY-OTHER",
	"STOPPED-WAITING-FOR-OTHER",
}
Richard Jankowski215a3e22018-10-04 13:56:11 -040072
73func init() {
khenaidoo89b0e942018-10-21 21:11:33 -040074 log.AddPackage(log.JSON, log.WarnLevel, nil)
Richard Jankowski215a3e22018-10-04 13:56:11 -040075}
76
77func NewTransactionContext(
khenaidoo89b0e942018-10-21 21:11:33 -040078 owner string,
79 txnPrefix string,
80 kvClient kvstore.Client,
81 kvOpTimeout int,
82 keyDeleteTime int) *TransactionContext {
Richard Jankowski215a3e22018-10-04 13:56:11 -040083
khenaidoo89b0e942018-10-21 21:11:33 -040084 return &TransactionContext{
85 owner: owner,
86 txnPrefix: txnPrefix,
87 kvClient: kvClient,
88 kvOperationTimeout: kvOpTimeout,
89 timeToDeleteCompletedKeys: keyDeleteTime}
Richard Jankowski215a3e22018-10-04 13:56:11 -040090}
91
92/*
93 * Before instantiating a KVTransaction, a TransactionContext must be created.
94 * The parameters stored in the context govern the behaviour of all KVTransaction
95 * instances.
96 *
97 * :param owner: The owner (i.e. voltha core name) of a transaction
98 * :param txnPrefix: The key prefix under which all transaction IDs, or serial numbers,
99 * will be created (e.g. "service/voltha/transactions")
100 * :param kvClient: The client API used for all interactions with the KV store. Currently
101 * only the etcd client is supported.
102 * :param: kvOpTimeout: The maximum time to be taken by any KV operation used by this
103 * package
104 * :param keyDeleteTime: The time to wait, in the background, before deleting a
105 * TRANSACTION_COMPLETE key
106 */
107func SetTransactionContext(owner string,
khenaidoo89b0e942018-10-21 21:11:33 -0400108 txnPrefix string,
109 kvClient kvstore.Client,
110 kvOpTimeout int,
111 keyDeleteTime int) error {
Richard Jankowski215a3e22018-10-04 13:56:11 -0400112
khenaidoo89b0e942018-10-21 21:11:33 -0400113 ctx = NewTransactionContext(owner, txnPrefix, kvClient, kvOpTimeout, keyDeleteTime)
114 return nil
Richard Jankowski215a3e22018-10-04 13:56:11 -0400115}
116
// KVTransaction represents one request's transaction key in the KV store.
type KVTransaction struct {
	// ch is not referenced by any method in this file.
	// NOTE(review): check whether it is still needed.
	ch chan int
	// txnId is the serial number of the request.
	txnId string
	// txnKey is the KV store key for the request: the context's txnPrefix followed by txnId.
	txnKey string
}
122
123/*
124 * A KVTransaction constructor
125 *
126 * :param txnId: The serial number of a voltha request.
Richard Jankowskie4d77662018-10-17 13:53:21 -0400127 * :return: A KVTransaction instance
Richard Jankowski215a3e22018-10-04 13:56:11 -0400128 */
Richard Jankowskie4d77662018-10-17 13:53:21 -0400129func NewKVTransaction(txnId string) *KVTransaction {
khenaidoo89b0e942018-10-21 21:11:33 -0400130 return &KVTransaction{
131 txnId: txnId,
132 txnKey: ctx.txnPrefix + txnId}
Richard Jankowski215a3e22018-10-04 13:56:11 -0400133}
134
135/*
136 * This function returns a boolean indicating whether or not the caller should process
137 * the request. True is returned in one of two cases:
138 * (1) The current core successfully reserved the request's serial number with the KV store
139 * (2) The current core failed in its reservation attempt but observed that the serving core
140 * has abandoned processing the request
141 *
142 * :param duration: The duration of the reservation in milliseconds
143 * :return: true - reservation acquired, process the request
144 * false - reservation not acquired, request being processed by another core
145 */
Richard Jankowskie4d77662018-10-17 13:53:21 -0400146func (c *KVTransaction) Acquired(duration int64) bool {
khenaidoo89b0e942018-10-21 21:11:33 -0400147 var acquired bool
148 var currOwner string = ""
149 var res int
Richard Jankowski215a3e22018-10-04 13:56:11 -0400150
khenaidoo89b0e942018-10-21 21:11:33 -0400151 // Convert milliseconds to seconds, rounding up
152 // The reservation TTL is specified in seconds
153 durationInSecs := duration / 1000
154 if remainder := duration % 1000; remainder > 0 {
155 durationInSecs++
156 }
157 value, err := ctx.kvClient.Reserve(c.txnKey, ctx.owner, durationInSecs)
Richard Jankowski215a3e22018-10-04 13:56:11 -0400158
khenaidoo89b0e942018-10-21 21:11:33 -0400159 // If the reservation failed, do we simply abort or drop into watch mode anyway?
160 // Setting value to nil leads to watch mode
161 if value != nil {
162 if currOwner, err = kvstore.ToString(value); err != nil {
163 log.Error("unexpected-owner-type")
164 value = nil
165 }
166 }
167 if err == nil && value != nil && currOwner == ctx.owner {
168 // Process the request immediately
169 res = SEIZED_BY_SELF
170 } else {
171 // Another core instance has reserved the request
172 // Watch for reservation expiry or successful request completion
173 events := ctx.kvClient.Watch(c.txnKey)
174 log.Debugw("watch-other-server",
175 log.Fields{"owner": currOwner, "timeout": duration})
Richard Jankowski215a3e22018-10-04 13:56:11 -0400176
khenaidoo89b0e942018-10-21 21:11:33 -0400177 select {
178 // Add a timeout here in case we miss an event from the KV
179 case <-time.After(time.Duration(duration) * time.Millisecond):
180 // In case of missing events, let's check the transaction key
181 kvp, err := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout)
182 if err == nil && kvp == nil {
183 log.Debug("missed-deleted-event")
184 res = ABANDONED_BY_OTHER
185 } else if val, err := kvstore.ToString(kvp.Value); err == nil && val == TRANSACTION_COMPLETE {
186 log.Debugw("missed-put-event",
187 log.Fields{"key": c.txnKey, "value": val})
188 res = COMPLETED_BY_OTHER
189 } else {
190 res = STOPPED_WAITING_FOR_OTHER
191 }
Richard Jankowski215a3e22018-10-04 13:56:11 -0400192
khenaidoo89b0e942018-10-21 21:11:33 -0400193 case event := <-events:
194 log.Debugw("received-event", log.Fields{"type": event.EventType})
195 if event.EventType == kvstore.DELETE {
196 // The other core failed to process the request; step up
197 res = ABANDONED_BY_OTHER
198 } else if event.EventType == kvstore.PUT {
199 key, e1 := kvstore.ToString(event.Key)
200 val, e2 := kvstore.ToString(event.Value)
201 if e1 == nil && key == c.txnKey && e2 == nil && val == TRANSACTION_COMPLETE {
202 res = COMPLETED_BY_OTHER
203 // Successful request completion has been detected
204 // Remove the transaction key
205 c.Delete()
206 }
207 }
208 }
209 }
210 // Clean-up: delete the transaction key after a long delay
211 go c.deleteTransactionKey()
Richard Jankowski215a3e22018-10-04 13:56:11 -0400212
khenaidoo89b0e942018-10-21 21:11:33 -0400213 log.Debugw("acquire-transaction", log.Fields{"result": txnState[res]})
214 switch res {
215 case SEIZED_BY_SELF, ABANDONED_BY_OTHER, STOPPED_WAITING_FOR_OTHER:
216 acquired = true
217 default:
218 acquired = false
219 }
Richard Jankowski00a04662019-02-05 12:18:53 -0500220 // Ensure the request watcher does not reply before the request server
221 if !acquired {
222 time.Sleep(1 * time.Second)
223 }
khenaidoo89b0e942018-10-21 21:11:33 -0400224 return acquired
Richard Jankowski215a3e22018-10-04 13:56:11 -0400225}
226
Richard Jankowskie4d77662018-10-17 13:53:21 -0400227func (c *KVTransaction) deleteTransactionKey() {
khenaidoo89b0e942018-10-21 21:11:33 -0400228 log.Debugw("schedule-key-deletion", log.Fields{"key": c.txnKey})
229 time.Sleep(time.Duration(ctx.timeToDeleteCompletedKeys) * time.Second)
230 log.Debugw("background-key-deletion", log.Fields{"key": c.txnKey})
231 ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout)
Richard Jankowski215a3e22018-10-04 13:56:11 -0400232}
233
Richard Jankowskie4d77662018-10-17 13:53:21 -0400234func (c *KVTransaction) Close() error {
khenaidoo89b0e942018-10-21 21:11:33 -0400235 log.Debugw("close", log.Fields{"key": c.txnKey})
236 return ctx.kvClient.Put(c.txnKey, TRANSACTION_COMPLETE, ctx.kvOperationTimeout)
Richard Jankowski215a3e22018-10-04 13:56:11 -0400237}
238
Richard Jankowskie4d77662018-10-17 13:53:21 -0400239func (c *KVTransaction) Delete() error {
khenaidoo89b0e942018-10-21 21:11:33 -0400240 log.Debugw("delete", log.Fields{"key": c.txnKey})
241 err := ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout)
242 return err
Richard Jankowski215a3e22018-10-04 13:56:11 -0400243}