blob: 44d9dc1c4f548107dee2a024b7ad7959cf9387ef [file] [log] [blame]
Richard Jankowski215a3e22018-10-04 13:56:11 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
/*
 * Two voltha cores receive the same request; each tries to acquire ownership of the request
 * by writing its identifier (e.g. container name or pod name) to a transaction key named
 * after the request's serial number. The core that loses the race to reserve the request
 * watches the progress of the core actually serving it. Once the request is complete,
 * the serving core closes the transaction by invoking the KVTransaction's Close method, which
 * replaces the value of the transaction key (the key named after the serial number) with the
 * string TRANSACTION_COMPLETE. The standby core observes this update, stops watching the
 * transaction, and then deletes the transaction key.
 *
 * To ensure the key is removed despite possible standby core failures, a KV operation is
 * scheduled in the background on both cores to delete the key well after the transaction is
 * completed. The value of the TransactionContext parameter timeToDeleteCompletedKeys should be
 * long enough, on the order of many seconds, to ensure the standby sees the transaction
 * closure. The aim is to prevent TRANSACTION_COMPLETE entries from accumulating in the KV store.
 */
34package core
35
36import (
37 "kvstore"
38 "time"
39 log "github.com/opencord/voltha-go/common/log"
40)
41
// Outcomes of an attempt to acquire a transaction. Each value is also an index
// into txnState, so the order here must match the order of that slice.
const (
	UNKNOWN                   = iota // no conclusive outcome was observed
	SEIZED_BY_SELF                   // this core reserved the transaction key
	COMPLETED_BY_OTHER               // the owning core marked the key TRANSACTION_COMPLETE
	ABANDONED_BY_OTHER               // the key disappeared; treated as the owner abandoning the request
	STOPPED_WAITING_FOR_OTHER        // this core stopped waiting on the owning core
)
50
// TRANSACTION_COMPLETE is the value Close writes to a transaction key once the
// serving core has finished the request; watchers compare against it.
const TRANSACTION_COMPLETE = "TRANSACTION-COMPLETE"
54
55type TransactionContext struct {
56 kvClient *kvstore.EtcdClient
57 kvOperationTimeout int
58 owner string
59 timeToDeleteCompletedKeys int
60 txnPrefix string
61}
62var ctx *TransactionContext
63
// txnState holds the log label for each acquisition result, indexed by the
// result value (UNKNOWN through STOPPED_WAITING_FOR_OTHER). Entries are
// positional and must stay in the same order as those constants.
var txnState = []string{
	"UNKNOWN",
	"SEIZED-BY-SELF",
	"COMPLETED-BY-OTHER",
	"ABANDONED-BY-OTHER",
	"STOPPED-WAITING-FOR-OTHER",
}
70
71func init() {
72 log.AddPackage(log.JSON, log.WarnLevel, nil)
73}
74
75func NewTransactionContext(
76 owner string,
77 txnPrefix string,
78 kvClient *kvstore.EtcdClient,
79 kvOpTimeout int,
80 keyDeleteTime int) *TransactionContext {
81
82 return &TransactionContext{
83 owner: owner,
84 txnPrefix: txnPrefix,
85 kvClient: kvClient,
86 kvOperationTimeout: kvOpTimeout,
87 timeToDeleteCompletedKeys: keyDeleteTime}
88}
89
90/*
91 * Before instantiating a KVTransaction, a TransactionContext must be created.
92 * The parameters stored in the context govern the behaviour of all KVTransaction
93 * instances.
94 *
95 * :param owner: The owner (i.e. voltha core name) of a transaction
96 * :param txnPrefix: The key prefix under which all transaction IDs, or serial numbers,
97 * will be created (e.g. "service/voltha/transactions")
98 * :param kvClient: The client API used for all interactions with the KV store. Currently
99 * only the etcd client is supported.
100 * :param: kvOpTimeout: The maximum time to be taken by any KV operation used by this
101 * package
102 * :param keyDeleteTime: The time to wait, in the background, before deleting a
103 * TRANSACTION_COMPLETE key
104 */
105func SetTransactionContext(owner string,
106 txnPrefix string,
107 kvClient *kvstore.EtcdClient,
108 kvOpTimeout int,
109 keyDeleteTime int) error {
110
111 ctx = NewTransactionContext(owner, txnPrefix, kvClient, kvOpTimeout, keyDeleteTime)
112 return nil
113}
114
// EtcdTransaction tracks the KV transaction key of a single voltha request.
type EtcdTransaction struct {
	ch     chan int // NOTE(review): not referenced in this file — confirm it is used elsewhere
	txnId  string   // serial number of the request
	txnKey string   // KV key for the request: the context's txnPrefix followed by txnId
}
120
121/*
122 * A KVTransaction constructor
123 *
124 * :param txnId: The serial number of a voltha request.
125 * :return: A KVTransaction instance (currently an EtcdTransaction is returned)
126 */
127func NewEtcdTransaction(txnId string) *EtcdTransaction {
128 return &EtcdTransaction{
129 txnId: txnId,
130 txnKey: ctx.txnPrefix + txnId}
131}
132
133/*
134 * This function returns a boolean indicating whether or not the caller should process
135 * the request. True is returned in one of two cases:
136 * (1) The current core successfully reserved the request's serial number with the KV store
137 * (2) The current core failed in its reservation attempt but observed that the serving core
138 * has abandoned processing the request
139 *
140 * :param duration: The duration of the reservation in milliseconds
141 * :return: true - reservation acquired, process the request
142 * false - reservation not acquired, request being processed by another core
143 */
144func (c *EtcdTransaction) Acquired(duration int64) bool {
145 var acquired bool
146 var currOwner string = ""
147 var res int
148
149 // Convert milliseconds to seconds, rounding up
150 // The reservation TTL is specified in seconds
151 durationInSecs := duration / 1000
152 if remainder := duration % 1000; remainder > 0 {
153 durationInSecs++
154 }
155 value, err := ctx.kvClient.Reserve(c.txnKey, ctx.owner, durationInSecs)
156
157 // If the reservation failed, do we simply abort or drop into watch mode anyway?
158 // Setting value to nil leads to watch mode
159 if value != nil {
160 if currOwner, err = kvstore.ToString(value); err != nil {
161 log.Fatal("Unexpected owner type")
162 value = nil
163 }
164 }
165 if err == nil && value != nil && currOwner == ctx.owner {
166 // Process the request immediately
167 res = SEIZED_BY_SELF
168 } else {
169 log.Debugw("Another owns the transaction", log.Fields{"owner": currOwner})
170 // Another core instance has reserved the request
171 // Watch for reservation expiry or successful request completion
172 // Add a timeout here in case we miss an event from the KV
173 log.Debugw("Wait for KV events", log.Fields{"timeout": duration})
174 events := ctx.kvClient.Watch(c.txnKey)
175
176 select {
177 case <-time.After(time.Duration(duration) * time.Millisecond):
178 // In case of missing events, let's check the transaction key
179 kvp, err := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout)
180 if err == nil && kvp == nil {
181 log.Debug("Missed DELETED event from KV")
182 res = ABANDONED_BY_OTHER
183 } else if val, err := kvstore.ToString(kvp.Value);
184 err == nil && val == TRANSACTION_COMPLETE {
185 log.Debugw("Missed PUT event from KV",
186 log.Fields{"key": c.txnKey, "value": val})
187 res = COMPLETED_BY_OTHER
188 } else {
189 res = STOPPED_WAITING_FOR_OTHER
190 }
191
192 case event := <-events:
193 log.Debugw("Received KV event", log.Fields{"type": event.EventType})
194 if event.EventType == kvstore.DELETE {
195 // The other core failed to process the request; step up
196 res = ABANDONED_BY_OTHER
197 } else if event.EventType == kvstore.PUT {
198 key, e1 := kvstore.ToString(event.Key)
199 val, e2 := kvstore.ToString(event.Value)
200 if e1 == nil && key == c.txnKey && e2 == nil && val == TRANSACTION_COMPLETE {
201 res = COMPLETED_BY_OTHER
202 // Successful request completion has been detected
203 // Remove the transaction key
204 c.Delete()
205 }
206 }
207 }
208 }
209 // Clean-up: delete the transaction key after a long delay
210 go c.deleteTransactionKey()
211
212 log.Debugw("Acquire transaction", log.Fields{"result": txnState[res]})
213 switch res {
214 case SEIZED_BY_SELF, ABANDONED_BY_OTHER, STOPPED_WAITING_FOR_OTHER:
215 acquired = true
216 default:
217 acquired = false
218 }
219 return acquired
220}
221
222func (c *EtcdTransaction) deleteTransactionKey() {
223 log.Debugw("Schedule key deletion", log.Fields{"key": c.txnKey})
224 time.Sleep(time.Duration(ctx.timeToDeleteCompletedKeys) * time.Second)
225 log.Debugw("Background key deletion", log.Fields{"key": c.txnKey})
226 ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout)
227}
228
229func (c *EtcdTransaction) Close() error {
230 log.Debugw("Close", log.Fields{"key": c.txnKey})
231 return ctx.kvClient.Put(c.txnKey, TRANSACTION_COMPLETE, ctx.kvOperationTimeout)
232}
233
234func (c *EtcdTransaction) Delete() error {
235 log.Debugw("Delete", log.Fields{"key": c.txnKey})
236 err := ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout)
237 return err
238}
239