blob: c2a3634beddc498e2372d0b379059af916feda39 [file] [log] [blame]
Richard Jankowski215a3e22018-10-04 13:56:11 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17/*
18 * Two voltha cores receive the same request; each tries to acquire ownership of the request
19 * by writing its identifier (e.g. container name or pod name) to the transaction key named
Richard Jankowskie4d77662018-10-17 13:53:21 -040020 * after the serial number of the request. The core that loses the race for acquisition
21 * monitors the progress of the core actually serving the request by watching for changes
22 * in the value of the transaction key. Once the request is complete, the
23 * serving core closes the transaction by invoking the KVTransaction's Close method, which
Richard Jankowski215a3e22018-10-04 13:56:11 -040024 * replaces the value of the transaction (i.e. serial number) key with the string
25 * TRANSACTION_COMPLETE. The standby core observes this update, stops watching the transaction,
26 * and then deletes the transaction key.
27 *
28 * To ensure the key is removed despite possible standby core failures, a KV operation is
29 * scheduled in the background on both cores to delete the key well after the transaction is
30 * completed. The value of TransactionContext parameter timeToDeleteCompletedKeys should be
31 * long enough, on the order of many seconds, to ensure the standby sees the transaction
32 * closure. The aim is to prevent a growing list of TRANSACTION_COMPLETE values from loading
33 * the KV store.
34 */
35package core
36
37import (
Richard Jankowski215a3e22018-10-04 13:56:11 -040038 "time"
Richard Jankowskie4d77662018-10-17 13:53:21 -040039 "github.com/opencord/voltha-go/db/kvstore"
Richard Jankowski215a3e22018-10-04 13:56:11 -040040 log "github.com/opencord/voltha-go/common/log"
41)
42
43// Transaction acquisition results
44const (
45 UNKNOWN = iota
46 SEIZED_BY_SELF
47 COMPLETED_BY_OTHER
48 ABANDONED_BY_OTHER
49 STOPPED_WAITING_FOR_OTHER
50)
51
52const (
53 TRANSACTION_COMPLETE = "TRANSACTION-COMPLETE"
54)
55
56type TransactionContext struct {
Richard Jankowskie4d77662018-10-17 13:53:21 -040057 kvClient kvstore.Client
Richard Jankowski215a3e22018-10-04 13:56:11 -040058 kvOperationTimeout int
59 owner string
60 timeToDeleteCompletedKeys int
61 txnPrefix string
62}
63var ctx *TransactionContext
64
65var txnState = []string {
66 "UNKNOWN",
67 "SEIZED-BY-SELF",
68 "COMPLETED-BY-OTHER",
69 "ABANDONED-BY-OTHER",
70 "STOPPED-WAITING-FOR-OTHER"}
71
72func init() {
73 log.AddPackage(log.JSON, log.WarnLevel, nil)
74}
75
76func NewTransactionContext(
77 owner string,
78 txnPrefix string,
Richard Jankowskie4d77662018-10-17 13:53:21 -040079 kvClient kvstore.Client,
Richard Jankowski215a3e22018-10-04 13:56:11 -040080 kvOpTimeout int,
81 keyDeleteTime int) *TransactionContext {
82
83 return &TransactionContext{
84 owner: owner,
85 txnPrefix: txnPrefix,
86 kvClient: kvClient,
87 kvOperationTimeout: kvOpTimeout,
88 timeToDeleteCompletedKeys: keyDeleteTime}
89}
90
91/*
92 * Before instantiating a KVTransaction, a TransactionContext must be created.
93 * The parameters stored in the context govern the behaviour of all KVTransaction
94 * instances.
95 *
96 * :param owner: The owner (i.e. voltha core name) of a transaction
97 * :param txnPrefix: The key prefix under which all transaction IDs, or serial numbers,
98 * will be created (e.g. "service/voltha/transactions")
99 * :param kvClient: The client API used for all interactions with the KV store. Currently
100 * only the etcd client is supported.
101 * :param: kvOpTimeout: The maximum time to be taken by any KV operation used by this
102 * package
103 * :param keyDeleteTime: The time to wait, in the background, before deleting a
104 * TRANSACTION_COMPLETE key
105 */
106func SetTransactionContext(owner string,
107 txnPrefix string,
Richard Jankowskie4d77662018-10-17 13:53:21 -0400108 kvClient kvstore.Client,
Richard Jankowski215a3e22018-10-04 13:56:11 -0400109 kvOpTimeout int,
110 keyDeleteTime int) error {
111
112 ctx = NewTransactionContext(owner, txnPrefix, kvClient, kvOpTimeout, keyDeleteTime)
113 return nil
114}
115
Richard Jankowskie4d77662018-10-17 13:53:21 -0400116type KVTransaction struct {
Richard Jankowski215a3e22018-10-04 13:56:11 -0400117 ch chan int
118 txnId string
119 txnKey string
120}
121
122/*
123 * A KVTransaction constructor
124 *
125 * :param txnId: The serial number of a voltha request.
Richard Jankowskie4d77662018-10-17 13:53:21 -0400126 * :return: A KVTransaction instance
Richard Jankowski215a3e22018-10-04 13:56:11 -0400127 */
Richard Jankowskie4d77662018-10-17 13:53:21 -0400128func NewKVTransaction(txnId string) *KVTransaction {
129 return &KVTransaction{
Richard Jankowski215a3e22018-10-04 13:56:11 -0400130 txnId: txnId,
131 txnKey: ctx.txnPrefix + txnId}
132}
133
134/*
135 * This function returns a boolean indicating whether or not the caller should process
136 * the request. True is returned in one of two cases:
137 * (1) The current core successfully reserved the request's serial number with the KV store
138 * (2) The current core failed in its reservation attempt but observed that the serving core
139 * has abandoned processing the request
140 *
141 * :param duration: The duration of the reservation in milliseconds
142 * :return: true - reservation acquired, process the request
143 * false - reservation not acquired, request being processed by another core
144 */
Richard Jankowskie4d77662018-10-17 13:53:21 -0400145func (c *KVTransaction) Acquired(duration int64) bool {
Richard Jankowski215a3e22018-10-04 13:56:11 -0400146 var acquired bool
147 var currOwner string = ""
148 var res int
149
150 // Convert milliseconds to seconds, rounding up
151 // The reservation TTL is specified in seconds
152 durationInSecs := duration / 1000
153 if remainder := duration % 1000; remainder > 0 {
154 durationInSecs++
155 }
156 value, err := ctx.kvClient.Reserve(c.txnKey, ctx.owner, durationInSecs)
157
158 // If the reservation failed, do we simply abort or drop into watch mode anyway?
159 // Setting value to nil leads to watch mode
160 if value != nil {
161 if currOwner, err = kvstore.ToString(value); err != nil {
Richard Jankowskie4d77662018-10-17 13:53:21 -0400162 log.Error("unexpected-owner-type")
Richard Jankowski215a3e22018-10-04 13:56:11 -0400163 value = nil
164 }
165 }
166 if err == nil && value != nil && currOwner == ctx.owner {
167 // Process the request immediately
168 res = SEIZED_BY_SELF
169 } else {
Richard Jankowski215a3e22018-10-04 13:56:11 -0400170 // Another core instance has reserved the request
171 // Watch for reservation expiry or successful request completion
Richard Jankowski215a3e22018-10-04 13:56:11 -0400172 events := ctx.kvClient.Watch(c.txnKey)
Richard Jankowskie4d77662018-10-17 13:53:21 -0400173 log.Debugw("watch-other-server",
174 log.Fields{"owner": currOwner, "timeout": duration})
Richard Jankowski215a3e22018-10-04 13:56:11 -0400175
176 select {
Richard Jankowskie4d77662018-10-17 13:53:21 -0400177 // Add a timeout here in case we miss an event from the KV
Richard Jankowski215a3e22018-10-04 13:56:11 -0400178 case <-time.After(time.Duration(duration) * time.Millisecond):
179 // In case of missing events, let's check the transaction key
180 kvp, err := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout)
181 if err == nil && kvp == nil {
Richard Jankowskie4d77662018-10-17 13:53:21 -0400182 log.Debug("missed-deleted-event")
Richard Jankowski215a3e22018-10-04 13:56:11 -0400183 res = ABANDONED_BY_OTHER
184 } else if val, err := kvstore.ToString(kvp.Value);
185 err == nil && val == TRANSACTION_COMPLETE {
Richard Jankowskie4d77662018-10-17 13:53:21 -0400186 log.Debugw("missed-put-event",
Richard Jankowski215a3e22018-10-04 13:56:11 -0400187 log.Fields{"key": c.txnKey, "value": val})
188 res = COMPLETED_BY_OTHER
189 } else {
190 res = STOPPED_WAITING_FOR_OTHER
191 }
192
193 case event := <-events:
Richard Jankowskie4d77662018-10-17 13:53:21 -0400194 log.Debugw("received-event", log.Fields{"type": event.EventType})
Richard Jankowski215a3e22018-10-04 13:56:11 -0400195 if event.EventType == kvstore.DELETE {
196 // The other core failed to process the request; step up
197 res = ABANDONED_BY_OTHER
198 } else if event.EventType == kvstore.PUT {
199 key, e1 := kvstore.ToString(event.Key)
200 val, e2 := kvstore.ToString(event.Value)
201 if e1 == nil && key == c.txnKey && e2 == nil && val == TRANSACTION_COMPLETE {
202 res = COMPLETED_BY_OTHER
203 // Successful request completion has been detected
204 // Remove the transaction key
205 c.Delete()
206 }
207 }
208 }
209 }
210 // Clean-up: delete the transaction key after a long delay
211 go c.deleteTransactionKey()
212
Richard Jankowskie4d77662018-10-17 13:53:21 -0400213 log.Debugw("acquire-transaction", log.Fields{"result": txnState[res]})
Richard Jankowski215a3e22018-10-04 13:56:11 -0400214 switch res {
215 case SEIZED_BY_SELF, ABANDONED_BY_OTHER, STOPPED_WAITING_FOR_OTHER:
216 acquired = true
217 default:
218 acquired = false
219 }
220 return acquired
221}
222
Richard Jankowskie4d77662018-10-17 13:53:21 -0400223func (c *KVTransaction) deleteTransactionKey() {
224 log.Debugw("schedule-key-deletion", log.Fields{"key": c.txnKey})
Richard Jankowski215a3e22018-10-04 13:56:11 -0400225 time.Sleep(time.Duration(ctx.timeToDeleteCompletedKeys) * time.Second)
Richard Jankowskie4d77662018-10-17 13:53:21 -0400226 log.Debugw("background-key-deletion", log.Fields{"key": c.txnKey})
Richard Jankowski215a3e22018-10-04 13:56:11 -0400227 ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout)
228}
229
Richard Jankowskie4d77662018-10-17 13:53:21 -0400230func (c *KVTransaction) Close() error {
231 log.Debugw("close", log.Fields{"key": c.txnKey})
Richard Jankowski215a3e22018-10-04 13:56:11 -0400232 return ctx.kvClient.Put(c.txnKey, TRANSACTION_COMPLETE, ctx.kvOperationTimeout)
233}
234
Richard Jankowskie4d77662018-10-17 13:53:21 -0400235func (c *KVTransaction) Delete() error {
236 log.Debugw("delete", log.Fields{"key": c.txnKey})
Richard Jankowski215a3e22018-10-04 13:56:11 -0400237 err := ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout)
238 return err
239}
240