[VOL-1997] Remove transaction timeout for a non-active rw_core
This commit cleans up the transaction processing between two
cores in a pair. It prevents the core not processing the request
to grab the request based on a timeout only.
Since this update heavily relies on the etcd mechanism then customized
local tests (not unit as could not find a full-featured etcd mock)
were run against it as well as some basic manual tests with
kind-voltha.
There is a TODO item in this commit to implement a peer-probe
mechanism to guarantee that a core in a pair has actually died
before a switch over is done.
Minor updates after first review.
Comments updates after second review
Change-Id: Ifc1442471595a979b39251535b8ee9210e1a52df
(cherry picked from commit cc40904e208892dea8e1a2a73b52e6465d3c6d59)
diff --git a/db/kvstore/etcdclient.go b/db/kvstore/etcdclient.go
index 88c13ae..3af1ef2 100644
--- a/db/kvstore/etcdclient.go
+++ b/db/kvstore/etcdclient.go
@@ -127,7 +127,15 @@
c.writeLock.Lock()
defer c.writeLock.Unlock()
- _, err := c.ectdAPI.Put(ctx, key, val)
+
+ var err error
+ // Check if there is already a lease for this key - if there is then use it, otherwise a PUT will make
+ // that KV key permanent instead of automatically removing it after a lease expiration
+ if leaseID, ok := c.keyReservations[key]; ok {
+ _, err = c.ectdAPI.Put(ctx, key, val, v3Client.WithLease(*leaseID))
+ } else {
+ _, err = c.ectdAPI.Put(ctx, key, val)
+ }
cancel()
if err != nil {
switch err {
@@ -158,8 +166,8 @@
c.writeLock.Lock()
defer c.writeLock.Unlock()
- // delete the keys
- if _, err := c.ectdAPI.Delete(ctx, key, v3Client.WithPrefix()); err != nil {
+ // delete the key
+ if _, err := c.ectdAPI.Delete(ctx, key); err != nil {
log.Errorw("failed-to-delete-key", log.Fields{"key": key, "error": err})
return err
}
@@ -309,7 +317,7 @@
func (c *EtcdClient) Watch(key string) chan *Event {
w := v3Client.NewWatcher(c.ectdAPI)
ctx, cancel := context.WithCancel(context.Background())
- channel := w.Watch(ctx, key, v3Client.WithPrefix())
+ channel := w.Watch(ctx, key)
// Create a new channel
ch := make(chan *Event, maxClientChannelBufferSize)
@@ -317,8 +325,6 @@
// Keep track of the created channels so they can be closed when required
channelMap := make(map[chan *Event]v3Client.Watcher)
channelMap[ch] = w
- //c.writeLock.Lock()
- //defer c.writeLock.Unlock()
channelMaps := c.addChannelMap(key, channelMap)
@@ -412,7 +418,6 @@
defer close(ch)
for resp := range channel {
for _, ev := range resp.Events {
- //log.Debugf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
ch <- NewEvent(getEventType(ev), ev.Kv.Key, ev.Kv.Value, ev.Kv.Version)
}
}
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index 56b5fa1..f9b3319 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -46,6 +46,9 @@
FromTopic = "fromTopic"
)
+var ErrorTransactionNotAcquired = errors.New("transaction-not-acquired")
+var ErrorTransactionInvalidId = errors.New("transaction-invalid-id")
+
// requestHandlerChannel represents an interface associated with a channel. Whenever, an event is
// obtained from that channel, this interface is invoked. This is used to handle
// async requests into the Core via the kafka messaging bus
@@ -674,15 +677,20 @@
// Check for errors first
lastIndex := len(out) - 1
if out[lastIndex].Interface() != nil { // Error
- if goError, ok := out[lastIndex].Interface().(error); ok {
- returnError = &ic.Error{Reason: goError.Error()}
+ if retError, ok := out[lastIndex].Interface().(error); ok {
+ if retError.Error() == ErrorTransactionNotAcquired.Error() {
+ log.Debugw("Ignoring request", log.Fields{"error": retError, "txId": msg.Header.Id})
+ return // Ignore - process is in competing mode and ignored transaction
+ }
+ returnError = &ic.Error{Reason: retError.Error()}
returnedValues = append(returnedValues, returnError)
} else { // Should never happen
returnError = &ic.Error{Reason: "incorrect-error-returns"}
returnedValues = append(returnedValues, returnError)
}
} else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
- return // Ignore case - when core is in competing mode
+ log.Warnw("Unexpected response of (nil,nil)", log.Fields{"txId": msg.Header.Id})
+ return // Ignore - should not happen
} else { // Non-error case
success = true
for idx, val := range out {
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index bd84bdd..5247247 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -60,22 +60,6 @@
return &proxy
}
-func (rhp *AdapterRequestHandlerProxy) acquireRequest(transactionId string, maxTimeout ...int64) (*KVTransaction, error) {
- timeout := rhp.defaultRequestTimeout
- if len(maxTimeout) > 0 {
- timeout = maxTimeout[0]
- }
- txn := NewKVTransaction(transactionId)
- if txn == nil {
- return nil, errors.New("fail-to-create-transaction")
- } else if txn.Acquired(timeout) {
- log.Debugw("acquired-request", log.Fields{"xtrnsId": transactionId})
- return txn, nil
- } else {
- return nil, errorTransactionNotAcquired
- }
-}
-
// This is a helper function that attempts to acquire the request by using the device ownership model
func (rhp *AdapterRequestHandlerProxy) takeRequestOwnership(transactionId string, devId string, maxTimeout ...int64) (*KVTransaction, error) {
timeout := rhp.defaultRequestTimeout
@@ -87,22 +71,24 @@
return nil, errors.New("fail-to-create-transaction")
}
- if rhp.core.deviceOwnership.OwnedByMe(&utils.DeviceID{Id: devId}) {
- log.Debugw("owned-by-me", log.Fields{"Id": devId})
- if txn.Acquired(timeout) {
- log.Debugw("processing-request", log.Fields{"Id": devId})
- return txn, nil
- } else {
- return nil, errorTransactionNotAcquired
+ var acquired bool
+ var err error
+ if devId != "" {
+ var ownedByMe bool
+ if ownedByMe, err = rhp.core.deviceOwnership.OwnedByMe(&utils.DeviceID{Id: devId}); err != nil {
+ log.Warnw("getting-ownership-failed", log.Fields{"deviceId": devId, "error": err})
+ return nil, kafka.ErrorTransactionInvalidId
}
+ acquired, err = txn.Acquired(timeout, ownedByMe)
} else {
- log.Debugw("not-owned-by-me", log.Fields{"Id": devId})
- if txn.Monitor(timeout) {
- log.Debugw("timeout-processing-request", log.Fields{"Id": devId})
- return txn, nil
- } else {
- return nil, errorTransactionNotAcquired
- }
+ acquired, err = txn.Acquired(timeout)
+ }
+ if err == nil && acquired {
+ log.Debugw("transaction-acquired", log.Fields{"transactionId": txn.txnId})
+ return txn, nil
+ } else {
+ log.Debugw("transaction-not-acquired", log.Fields{"transactionId": txn.txnId, "error": err})
+ return nil, kafka.ErrorTransactionNotAcquired
}
}
@@ -120,7 +106,7 @@
}
adapter := &voltha.Adapter{}
deviceTypes := &voltha.DeviceTypes{}
- transactionID := &ic.StrType{}
+ transactionId := &ic.StrType{}
for _, arg := range args {
switch arg.Key {
case "adapter":
@@ -134,22 +120,23 @@
return nil, err
}
case kafka.TransactionKey:
- if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ if err := ptypes.UnmarshalAny(arg.Value, transactionId); err != nil {
log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
return nil, err
}
}
}
- log.Debugw("Register", log.Fields{"Adapter": *adapter, "DeviceTypes": deviceTypes, "transactionID": transactionID.Val, "coreId": rhp.coreInstanceId})
+ log.Debugw("Register", log.Fields{"Adapter": *adapter, "DeviceTypes": deviceTypes, "transactionId": transactionId.Val, "coreId": rhp.coreInstanceId})
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- if txn, err := rhp.acquireRequest(transactionID.Val); err != nil {
- log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
- // Update our adapters in memory
- go rhp.adapterMgr.updateAdaptersAndDevicetypesInMemory(adapter)
- // returning nil, nil instructs the callee to ignore this request
- return nil, nil
+ if txn, err := rhp.takeRequestOwnership(transactionId.Val, ""); err != nil {
+ if err.Error() == kafka.ErrorTransactionNotAcquired.Error() {
+ log.Debugw("Another core handled the request", log.Fields{"transactionId": transactionId})
+ // Update our adapters in memory
+ go rhp.adapterMgr.updateAdaptersAndDevicetypesInMemory(adapter)
+ }
+ return nil, err
} else {
defer txn.Close()
}
@@ -189,9 +176,8 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
if txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id); err != nil {
- log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
- // returning nil, nil instructs the callee to ignore this request
- return nil, nil
+ log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+ return nil, err
} else {
defer txn.Close()
}
@@ -238,9 +224,8 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
if txn, err := rhp.takeRequestOwnership(transactionID.Val, device.Id); err != nil {
- log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
- // returning nil, nil instructs the callee to ignore this request
- return nil, nil
+ log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+ return nil, err
} else {
defer txn.Close()
}
@@ -300,9 +285,8 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
if txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id); err != nil {
- log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
- // returning nil, nil instructs the callee to ignore this request
- return nil, nil
+ log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+ return nil, err
} else {
defer txn.Close()
}
@@ -342,8 +326,8 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
if txn, err := rhp.takeRequestOwnership(transactionID.Val, proxyAddress.DeviceId); err != nil {
- log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
- return nil, nil
+ log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+ return nil, err
} else {
defer txn.Close()
}
@@ -390,6 +374,16 @@
allPorts.Items = append(allPorts.Items, aPort)
return allPorts, nil
}
+ // Try to grab the transaction as this core may be competing with another Core
+ if rhp.competeForTransaction() {
+ if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
+ log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+ return nil, err
+ } else {
+ defer txn.Close()
+ }
+ }
+
return rhp.deviceMgr.getPorts(nil, deviceId.Id, voltha.Port_PortType(pt.Val))
}
@@ -421,9 +415,8 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
if txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id); err != nil {
- log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
- // returning nil, nil instructs the callee to ignore this request
- return nil, nil
+ log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+ return nil, err
} else {
defer txn.Close()
}
@@ -505,9 +498,8 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
if txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id); err != nil {
- log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
- // returning nil, nil instructs the callee to ignore this request
- return nil, nil
+ log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+ return nil, err
} else {
defer txn.Close()
}
@@ -565,9 +557,8 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
- log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
- // returning nil, nil instructs the callee to ignore this request
- return nil, nil
+ log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+ return nil, err
} else {
defer txn.Close()
}
@@ -580,10 +571,6 @@
go rhp.deviceMgr.updateDeviceStatus(deviceId.Id, voltha.OperStatus_OperStatus(operStatus.Val),
voltha.ConnectStatus_ConnectStatus(connStatus.Val))
- //if err := rhp.deviceMgr.updateDeviceStatus(deviceId.Id, voltha.OperStatus_OperStatus(operStatus.Val),
- // voltha.ConnectStatus_ConnectStatus(connStatus.Val)); err != nil {
- // return nil, err
- //}
return new(empty.Empty), nil
}
@@ -627,9 +614,8 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
- log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
- // returning nil, nil instructs the callee to ignore this request
- return nil, nil
+ log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+ return nil, err
} else {
defer txn.Close()
}
@@ -683,9 +669,8 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
- log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
- // returning nil, nil instructs the callee to ignore this request
- return nil, nil
+ log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+ return nil, err
} else {
defer txn.Close()
}
@@ -746,9 +731,8 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
- log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
- // returning nil, nil instructs the callee to ignore this request
- return nil, nil
+ log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+ return nil, err
} else {
defer txn.Close()
}
@@ -795,9 +779,8 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
- log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
- // returning nil, nil instructs the callee to ignore this request
- return nil, nil
+ log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+ return nil, err
} else {
defer txn.Close()
}
@@ -839,9 +822,8 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
if txn, err := rhp.takeRequestOwnership(transactionID.Val, parentDeviceId.Id); err != nil {
- log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
- // returning nil, nil instructs the callee to ignore this request
- return nil, nil
+ log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+ return nil, err
} else {
defer txn.Close()
}
@@ -883,9 +865,8 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
if txn, err := rhp.takeRequestOwnership(transactionID.Val, parentDeviceId.Id); err != nil {
- log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
- // returning nil, nil instructs the callee to ignore this request
- return nil, nil
+ log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+ return nil, err
} else {
defer txn.Close()
}
@@ -936,9 +917,8 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
- log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
- // returning nil, nil instructs the callee to ignore this request
- return nil, nil
+ log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+ return nil, err
} else {
defer txn.Close()
}
@@ -980,9 +960,8 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
if txn, err := rhp.takeRequestOwnership(transactionID.Val, pmConfigs.Id); err != nil {
- log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
- // returning nil, nil instructs the callee to ignore this request
- return nil, nil
+ log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+ return nil, err
} else {
defer txn.Close()
}
@@ -1039,8 +1018,8 @@
// duplicates.
if rhp.competeForTransaction() {
if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
- log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
- return nil, nil
+ log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+ return nil, err
} else {
defer txn.Close()
}
@@ -1087,9 +1066,8 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
- log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
- // returning nil, nil instructs the callee to ignore this request
- return nil, nil
+ log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+ return nil, err
} else {
defer txn.Close()
}
@@ -1132,9 +1110,8 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
if txn, err := rhp.takeRequestOwnership(transactionID.Val, parentDeviceId.Id); err != nil {
- log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
- // returning nil, nil instructs the callee to ignore this request
- return nil, nil
+ log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
+ return nil, err
} else {
defer txn.Close()
}
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index 91c359a..1b0f586 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -922,6 +922,10 @@
dMgr.addDeviceAgentToMap(agent)
agent.start(nil, false)
+ // Since this Core has handled this request then it therefore owns this child device. Set the
+ // ownership of this device to this Core
+ dMgr.core.deviceOwnership.OwnedByMe(&utils.DeviceID{Id: agent.deviceId})
+
// Activate the child device
if agent := dMgr.getDeviceAgent(agent.deviceId); agent != nil {
go agent.enableDevice(nil)
diff --git a/rw_core/core/device_ownership.go b/rw_core/core/device_ownership.go
index 4b30188..ade876b 100644
--- a/rw_core/core/device_ownership.go
+++ b/rw_core/core/device_ownership.go
@@ -110,7 +110,7 @@
return true
}
-func (da *DeviceOwnership) MonitorOwnership(id string, chnl chan int) {
+func (da *DeviceOwnership) monitorOwnership(id string, chnl chan int) {
log.Debugw("start-device-monitoring", log.Fields{"id": id})
op := "starting"
exit := false
@@ -186,9 +186,9 @@
return deviceIds
}
-// OwnedByMe returns where this Core instance active owns this device. This function will automatically
+// OwnedByMe returns whether this Core instance active owns this device. This function will automatically
// trigger the process to monitor the device and update the device ownership regularly.
-func (da *DeviceOwnership) OwnedByMe(id interface{}) bool {
+func (da *DeviceOwnership) OwnedByMe(id interface{}) (bool, error) {
// Retrieve the ownership key based on the id
var ownershipKey string
var err error
@@ -196,7 +196,7 @@
var cache bool
if ownershipKey, idStr, cache, err = da.getOwnershipKey(id); err != nil {
log.Warnw("no-ownershipkey", log.Fields{"error": err})
- return false
+ return false, err
}
// Update the deviceToKey map, if not from cache
@@ -215,7 +215,7 @@
deviceOwned, ownedByMe := da.getOwnership(ownershipKey)
if deviceOwned {
log.Debugw("ownership", log.Fields{"Id": ownershipKey, "owned": ownedByMe})
- return ownedByMe
+ return ownedByMe, nil
}
// Not owned by me or maybe nobody else. Try to reserve it
reservedByMe := da.tryToReserveKey(ownershipKey)
@@ -229,8 +229,8 @@
da.deviceMapLock.Unlock()
log.Debugw("set-new-ownership", log.Fields{"Id": ownershipKey, "owned": reservedByMe})
- go da.MonitorOwnership(ownershipKey, myChnl)
- return reservedByMe
+ go da.monitorOwnership(ownershipKey, myChnl)
+ return reservedByMe, nil
}
//AbandonDevice must be invoked whenever a device is deleted from the Core
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index 987a3f6..482abfc 100755
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -31,7 +31,6 @@
"google.golang.org/grpc/status"
"io"
"sync"
- "time"
)
const (
@@ -124,62 +123,41 @@
return handler.coreInCompetingMode
}
-// acquireRequest handles transaction processing for device creation and list requests, i.e. when there are no
-// specific id requested (list scenario) or id present in the request (creation use case).
-func (handler *APIHandler) acquireRequest(ctx context.Context, maxTimeout ...int64) (*KVTransaction, error) {
+// takeRequestOwnership creates a transaction in the dB for this request and handles the logic of transaction
+// acquisition. If the device is owned by this Core (in a core-pair) then acquire the transaction with a
+// timeout value (in the event this Core dies the transaction times out in the dB causing the other Core in the
+// core-pair to proceed with the it). If the device is not owned then this Core will just monitor the transaction
+// for potential timeouts.
+func (handler *APIHandler) takeRequestOwnership(ctx context.Context, id interface{}, maxTimeout ...int64) (*KVTransaction, error) {
timeout := handler.defaultRequestTimeout
if len(maxTimeout) > 0 {
timeout = maxTimeout[0]
}
- log.Debugw("transaction-timeout", log.Fields{"timeout": timeout})
txn, err := handler.createKvTransaction(ctx)
if txn == nil {
return nil, err
- } else if txn.Acquired(timeout) {
+ }
+ var acquired bool
+ if id != nil {
+ var ownedByMe bool
+ if ownedByMe, err = handler.core.deviceOwnership.OwnedByMe(id); err != nil {
+ log.Warnw("getting-ownership-failed", log.Fields{"deviceId": id, "error": err})
+ return nil, errorTransactionInvalidId
+ }
+ acquired, err = txn.Acquired(timeout, ownedByMe)
+ } else {
+ acquired, err = txn.Acquired(timeout)
+ }
+ if err == nil && acquired {
+ log.Debugw("transaction-acquired", log.Fields{"transactionId": txn.txnId})
return txn, nil
} else {
+ log.Debugw("transaction-not-acquired", log.Fields{"transactionId": txn.txnId, "error": err})
return nil, errorTransactionNotAcquired
}
}
-// takeRequestOwnership creates a transaction in the dB for this request and handles the logic of transaction
-// acquisition. If the device is owned by this Core (in a core-pair) then acquire the transaction with a
-// timeout value (in the event of a timeout the other Core in the core-pair will proceed with the transaction). If the
-// device is not owned then this Core will just monitor the transaction for potential timeouts.
-func (handler *APIHandler) takeRequestOwnership(ctx context.Context, id interface{}, maxTimeout ...int64) (*KVTransaction, error) {
- t := time.Now()
- timeout := handler.defaultRequestTimeout
- if len(maxTimeout) > 0 {
- timeout = maxTimeout[0]
- }
- txn, err := handler.createKvTransaction(ctx)
- if txn == nil {
- return nil, err
- }
-
- owned := false
- if id != nil {
- owned = handler.core.deviceOwnership.OwnedByMe(id)
- }
- if owned {
- if txn.Acquired(timeout) {
- log.Debugw("acquired-transaction", log.Fields{"transaction-timeout": timeout})
- return txn, nil
- } else {
- return nil, errorTransactionNotAcquired
- }
- } else {
- if txn.Monitor(timeout) {
- log.Debugw("acquired-transaction-after-timeout", log.Fields{"timeout": timeout, "waited-time": time.Since(t)})
- return txn, nil
- } else {
- log.Debugw("transaction-completed-by-other", log.Fields{"timeout": timeout, "waited-time": time.Since(t)})
- return nil, errorTransactionNotAcquired
- }
- }
-}
-
-// waitForNilResponseOnSuccess is a helper function to wait for a response on channel ch where an nil
+// waitForNilResponseOnSuccess is a helper function to wait for a response on channel monitorCh where an nil
// response is expected in a successful scenario
func waitForNilResponseOnSuccess(ctx context.Context, ch chan interface{}) (*empty.Empty, error) {
select {
@@ -390,7 +368,7 @@
func (handler *APIHandler) ListLogicalDevices(ctx context.Context, empty *empty.Empty) (*voltha.LogicalDevices, error) {
log.Debug("ListLogicalDevices-request")
if handler.competeForTransaction() {
- if txn, err := handler.acquireRequest(ctx); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, nil); err != nil {
return &voltha.LogicalDevices{}, err
} else {
defer txn.Close()
@@ -455,7 +433,7 @@
if handler.competeForTransaction() {
// There are no device Id present in this function.
- if txn, err := handler.acquireRequest(ctx); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, nil); err != nil {
return &voltha.Device{}, err
} else {
defer txn.Close()
@@ -557,9 +535,11 @@
if handler.competeForTransaction() {
if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id: id.Id}); err != nil {
- if err == errorTransactionNotAcquired && !handler.core.deviceOwnership.OwnedByMe(&utils.DeviceID{Id: id.Id}) {
- // Remove the device in memory
- handler.deviceMgr.stopManagingDevice(id.Id)
+ if err == errorTransactionNotAcquired {
+ if ownedByMe, err := handler.core.deviceOwnership.OwnedByMe(&utils.DeviceID{Id: id.Id}); !ownedByMe && err == nil {
+ // Remove the device in memory
+ handler.deviceMgr.stopManagingDevice(id.Id)
+ }
}
return new(empty.Empty), err
} else {
@@ -808,7 +788,7 @@
//TODO: Update this logic once the OF Controller (OFAgent in this case) can include a transaction Id in its
// request. For performance reason we can let both Cores in a Core-Pair forward the Packet to the adapters and
// let once of the shim layer (kafka proxy or adapter request handler filters out the duplicate packet)
- if handler.core.deviceOwnership.OwnedByMe(&utils.LogicalDeviceID{Id: packet.Id}) {
+ if ownedByMe, err := handler.core.deviceOwnership.OwnedByMe(&utils.LogicalDeviceID{Id: packet.Id}); ownedByMe && err == nil {
agent := handler.logicalDeviceMgr.getLogicalDeviceAgent(packet.Id)
agent.packetOut(packet.PacketOut)
}
diff --git a/rw_core/core/transaction.go b/rw_core/core/transaction.go
index 11f3d4e..1b4370d 100644
--- a/rw_core/core/transaction.go
+++ b/rw_core/core/transaction.go
@@ -25,12 +25,6 @@
* TRANSACTION_COMPLETE. The standby core observes this update, stops watching the transaction,
* and then deletes the transaction key.
*
- * To ensure the key is removed despite possible standby core failures, a KV operation is
- * scheduled in the background on both cores to delete the key well after the transaction is
- * completed. The value of TransactionContext parameter timeToDeleteCompletedKeys should be
- * long enough, on the order of many seconds, to ensure the standby sees the transaction
- * closure. The aim is to prevent a growing list of TRANSACTION_COMPLETE values from loading
- * the KV store.
*/
package core
@@ -48,23 +42,36 @@
SEIZED_BY_SELF
COMPLETED_BY_OTHER
ABANDONED_BY_OTHER
- STOPPED_WATCHING_KEY
- STOPPED_WAITING_FOR_KEY
+ ABANDONED_WATCH_BY_SELF
)
var errorTransactionNotAcquired = status.Error(codes.Canceled, "transaction-not-acquired")
+var errorTransactionInvalidId = status.Error(codes.Canceled, "transaction-invalid-id")
const (
TRANSACTION_COMPLETE = "TRANSACTION-COMPLETE"
)
+// Transaction constants used to guarantee the Core processing a request hold on to the transaction until
+// it either completes it (either successfully or times out) or the Core itself crashes (e.g. a server failure).
+// If a request has a timeout of x seconds then the Core processing the request will renew the transaction lease
+// every x/NUM_TXN_RENEWAL_PER_REQUEST seconds. After the Core completes the request it stops renewing the
+// transaction and sets the transaction value to TRANSACTION_COMPLETE. If the processing Core crashes then it
+// will not renew the transaction causing the KV store to delete the transaction after its renewal period. The
+// Core watching the transaction will then take over.
+// Since the MIN_TXN_RENEWAL_INTERVAL_IN_SEC is 3 seconds then for any transaction that completes within 3 seconds
+// there won't be a transaction renewal done.
+const (
+ NUM_TXN_RENEWAL_PER_REQUEST = 2
+ MIN_TXN_RENEWAL_INTERVAL_IN_SEC = 3
+ MIN_TXN_RESERVATION_DURATION_IN_SEC = 5
+)
+
type TransactionContext struct {
- kvClient kvstore.Client
- kvOperationTimeout int
- monitorLoopTime int64
- owner string
- timeToDeleteCompletedKeys int
- txnPrefix string
+ kvClient kvstore.Client
+ kvOperationTimeout int
+ owner string
+ txnPrefix string
}
var ctx *TransactionContext
@@ -74,8 +81,7 @@
"SEIZED-BY-SELF",
"COMPLETED-BY-OTHER",
"ABANDONED-BY-OTHER",
- "STOPPED-WATCHING-KEY",
- "STOPPED-WAITING-FOR-KEY"}
+ "ABANDONED_WATCH_BY_SELF"}
func init() {
log.AddPackage(log.JSON, log.DebugLevel, nil)
@@ -85,17 +91,13 @@
owner string,
txnPrefix string,
kvClient kvstore.Client,
- kvOpTimeout int,
- keyDeleteTime int,
- monLoopTime int64) *TransactionContext {
+ kvOpTimeout int) *TransactionContext {
return &TransactionContext{
- owner: owner,
- txnPrefix: txnPrefix,
- kvClient: kvClient,
- kvOperationTimeout: kvOpTimeout,
- monitorLoopTime: monLoopTime,
- timeToDeleteCompletedKeys: keyDeleteTime}
+ owner: owner,
+ txnPrefix: txnPrefix,
+ kvClient: kvClient,
+ kvOperationTimeout: kvOpTimeout}
}
/*
@@ -110,26 +112,20 @@
* only the etcd client is supported.
* :param: kvOpTimeout: The maximum time, in seconds, to be taken by any KV operation
* used by this package
- * :param keyDeleteTime: The time (seconds) to wait, in the background, before deleting
- * a TRANSACTION_COMPLETE key
- * :param monLoopTime: The time in milliseconds that the monitor sleeps between
- * checks for the existence of the transaction key
*/
func SetTransactionContext(owner string,
txnPrefix string,
kvClient kvstore.Client,
- kvOpTimeout int,
- keyDeleteTime int,
- monLoopTime int64) error {
+ kvOpTimeout int) error {
- ctx = NewTransactionContext(owner, txnPrefix, kvClient, kvOpTimeout, keyDeleteTime, monLoopTime)
+ ctx = NewTransactionContext(owner, txnPrefix, kvClient, kvOpTimeout)
return nil
}
type KVTransaction struct {
- ch chan int
- txnId string
- txnKey string
+ monitorCh chan int
+ txnId string
+ txnKey string
}
/*
@@ -145,128 +141,117 @@
}
/*
- * This function returns a boolean indicating whether or not the caller should process
- * the request. True is returned in one of two cases:
- * (1) The current core successfully reserved the request's serial number with the KV store
- * (2) The current core failed in its reservation attempt but observed that the serving core
- * has abandoned processing the request
+ * Acquired is invoked by a Core, upon reception of a request, to reserve the transaction key in the KV store. The
+ * request may be resource specific (i.e will include an ID for that resource) or may be generic (i.e. list a set of
+ * resources). If the request is resource specific then this function should be invoked with the ownedByMe flag to
+ * indicate whether this Core owns this resource. In the case where this Core owns this resource or it is a generic
+ * request then we will proceed to reserve the transaction key in the KV store for a minimum time specified by the
+ * minDuration param. If the reservation request fails (i.e. the other Core got the reservation before this one - this
+ * can happen only for generic request) then the Core will start to watch for changes to the key to determine
+ * whether the other Core completed the transaction successfully or the Core just died. If the Core does not own the
+ * resource then we will proceed to watch the transaction key.
*
- * :param duration: The duration of the reservation in milliseconds
- * :return: true - reservation acquired, process the request
- * false - reservation not acquired, request being processed by another core
+ * :param minDuration: minimum time to reserve the transaction key in the KV store
+ * :param ownedByMe: specify whether the request is about a resource owned or not. If it's absent then this is a
+ * generic request that has no specific resource ID (e.g. list)
+ *
+ * :return: A boolean specifying whether the resource was acquired. An error is return in case this function is invoked
+ * for a resource that is nonexistent.
*/
-func (c *KVTransaction) Acquired(duration int64) bool {
+func (c *KVTransaction) Acquired(minDuration int64, ownedByMe ...bool) (bool, error) {
var acquired bool
var currOwner string = ""
var res int
// Convert milliseconds to seconds, rounding up
// The reservation TTL is specified in seconds
- durationInSecs := duration / 1000
- if remainder := duration % 1000; remainder > 0 {
+ durationInSecs := minDuration / 1000
+ if remainder := minDuration % 1000; remainder > 0 {
durationInSecs++
}
- value, err := ctx.kvClient.Reserve(c.txnKey, ctx.owner, durationInSecs)
+ if durationInSecs < int64(MIN_TXN_RESERVATION_DURATION_IN_SEC) {
+ durationInSecs = int64(MIN_TXN_RESERVATION_DURATION_IN_SEC)
+ }
+ genericRequest := true
+ resourceOwned := false
+ if len(ownedByMe) > 0 {
+ genericRequest = false
+ resourceOwned = ownedByMe[0]
+ }
+ if resourceOwned || genericRequest {
+ // Keep the reservation longer that the minDuration (which is really the request timeout) to ensure the
+ // transaction key stays in the KV store until after the Core finalize a request timeout condition (which is
+ // a success from a request completion perspective).
+ if err := c.tryToReserveTxn(durationInSecs * 2); err == nil {
+ res = SEIZED_BY_SELF
+ } else {
+ log.Debugw("watch-other-server",
+ log.Fields{"transactionId": c.txnId, "owner": currOwner, "timeout": durationInSecs})
+ res = c.Watch(durationInSecs)
+ }
+ } else {
+ res = c.Watch(durationInSecs)
+ }
+ switch res {
+ case SEIZED_BY_SELF, ABANDONED_BY_OTHER:
+ acquired = true
+ default:
+ acquired = false
+ }
+ log.Debugw("acquire-transaction-status", log.Fields{"transactionId": c.txnId, "acquired": acquired, "result": txnState[res]})
+ return acquired, nil
+}
- // If the reservation failed, do we simply abort or drop into watch mode anyway?
- // Setting value to nil leads to watch mode
+func (c *KVTransaction) tryToReserveTxn(durationInSecs int64) error {
+ var currOwner string = ""
+ var res int
+ value, err := ctx.kvClient.Reserve(c.txnKey, ctx.owner, durationInSecs)
if value != nil {
- if currOwner, err = kvstore.ToString(value); err != nil {
- log.Errorw("unexpected-owner-type", log.Fields{"txn": c.txnId})
- value = nil
+ if currOwner, err = kvstore.ToString(value); err != nil { // This should never happen
+ log.Errorw("unexpected-owner-type", log.Fields{"transactionId": c.txnId, "error": err})
+ return err
+ }
+ if currOwner == ctx.owner {
+ log.Debugw("acquired-transaction", log.Fields{"transactionId": c.txnId, "result": txnState[res]})
+ // Setup the monitoring channel
+ c.monitorCh = make(chan int)
+ go c.holdOnToTxnUntilProcessingCompleted(c.txnKey, ctx.owner, durationInSecs)
+ return nil
}
}
- if err == nil && value != nil && currOwner == ctx.owner {
- // Process the request immediately
- res = SEIZED_BY_SELF
- } else {
- // Another core instance has reserved the request
- // Watch for reservation expiry or successful request completion
- log.Debugw("watch-other-server",
- log.Fields{"txn": c.txnId, "owner": currOwner, "timeout": duration})
-
- res = c.Watch(duration)
- }
- // Clean-up: delete the transaction key after a long delay
- go c.deleteTransactionKey()
-
- log.Debugw("acquire-transaction", log.Fields{"txn": c.txnId, "result": txnState[res]})
- switch res {
- case SEIZED_BY_SELF, ABANDONED_BY_OTHER, STOPPED_WATCHING_KEY:
- acquired = true
- default:
- acquired = false
- }
- // Ensure the request watcher does not reply before the request server
- if !acquired {
- log.Debugw("Transaction was not ACQUIRED", log.Fields{"txn": c.txnId})
- }
- return acquired
+ return status.Error(codes.PermissionDenied, "reservation-denied")
}
-/*
- * This function monitors the progress of a request that's been reserved by another
- * Voltha core.
- *
- * :param duration: The duration of the reservation in milliseconds
- * :return: true - reservation abandoned by the other core, process the request
- * false - reservation not owned, request being processed by another core
- */
-func (c *KVTransaction) Monitor(duration int64) bool {
- var acquired bool
- var res int
-
- // Convert milliseconds to seconds, rounding up
- // The reservation TTL is specified in seconds
- durationInSecs := duration / 1000
- if remainder := duration % 1000; remainder > 0 {
- durationInSecs++
- }
-
- res = c.Watch(duration)
-
- // Clean-up: delete the transaction key after a long delay
- go c.deleteTransactionKey()
-
- log.Debugw("monitor-transaction", log.Fields{"txn": c.txnId, "result": txnState[res]})
- switch res {
- case ABANDONED_BY_OTHER, STOPPED_WATCHING_KEY, STOPPED_WAITING_FOR_KEY:
- acquired = true
- default:
- acquired = false
- }
- // Ensure the request watcher does not reply before the request server
- if !acquired {
- log.Debugw("Transaction was not acquired", log.Fields{"txn": c.txnId})
- }
- return acquired
-}
-
-// duration in milliseconds
-func (c *KVTransaction) Watch(duration int64) int {
+func (c *KVTransaction) Watch(durationInSecs int64) int {
var res int
events := ctx.kvClient.Watch(c.txnKey)
defer ctx.kvClient.CloseWatch(c.txnKey, events)
+ transactionWasAcquiredByOther := false
+
+ //Check whether the transaction was already completed by the other Core before we got here.
+ if kvp, _ := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout); kvp != nil {
+ transactionWasAcquiredByOther = true
+ if val, err := kvstore.ToString(kvp.Value); err == nil {
+ if val == TRANSACTION_COMPLETE {
+ res = COMPLETED_BY_OTHER
+ // Do an immediate delete of the transaction in the KV Store to free up KV Storage faster
+ c.Delete()
+ return res
+ }
+ } else {
+ // An unexpected value - let's get out of here as something did not go according to plan
+ res = ABANDONED_WATCH_BY_SELF
+ log.Debugw("cannot-read-transaction-value", log.Fields{"txn": c.txnId, "error": err})
+ return res
+ }
+ }
+
for {
select {
- // Add a timeout here in case we miss an event from the KV
- case <-time.After(time.Duration(duration) * time.Millisecond):
- // In case of missing events, let's check the transaction key
- kvp, err := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout, false)
- if err == nil && kvp == nil {
- log.Debugw("missed-delete-event", log.Fields{"txn": c.txnId})
- res = ABANDONED_BY_OTHER
- } else if val, err := kvstore.ToString(kvp.Value); err == nil && val == TRANSACTION_COMPLETE {
- log.Debugw("missed-put-event", log.Fields{"txn": c.txnId, "value": val})
- res = COMPLETED_BY_OTHER
- } else {
- log.Debugw("watch-timeout", log.Fields{"txn": c.txnId, "value": val})
- res = STOPPED_WATCHING_KEY
- }
-
case event := <-events:
+ transactionWasAcquiredByOther = true
log.Debugw("received-event", log.Fields{"txn": c.txnId, "type": event.EventType})
if event.EventType == kvstore.DELETE {
// The other core failed to process the request
@@ -274,39 +259,73 @@
} else if event.EventType == kvstore.PUT {
key, e1 := kvstore.ToString(event.Key)
val, e2 := kvstore.ToString(event.Value)
- if e1 == nil && key == c.txnKey && e2 == nil {
+ if e1 == nil && e2 == nil && key == c.txnKey {
if val == TRANSACTION_COMPLETE {
res = COMPLETED_BY_OTHER
- // Successful request completion has been detected
- // Remove the transaction key
+ // Successful request completion has been detected. Remove the transaction key
c.Delete()
} else {
- log.Debugf("Ignoring reservation PUT event with val %v for key %v",
- val, key)
+ log.Debugw("Ignoring-PUT-event", log.Fields{"val": val, "key": key})
continue
}
+ } else {
+ log.Warnw("received-unexpected-PUT-event", log.Fields{"txn": c.txnId, "key": key, "ctxKey": c.txnKey})
}
}
+ case <-time.After(time.Duration(durationInSecs) * time.Second):
+ // Corner case: In the case where the Core owning the device dies and before this Core takes ownership of
+ // this device there is a window where new requests will end up being watched instead of being processed.
+ // Grab the request if the other Core did not create the transaction in the KV store.
+ // TODO: Use a peer-monitoring probe to switch over (still relies on the probe frequency). This will
+ // guarantee that the peer is actually gone instead of limiting the time the peer can get hold of a
+ // request.
+ if !transactionWasAcquiredByOther {
+ log.Debugw("timeout-no-peer", log.Fields{"txId": c.txnId})
+ res = ABANDONED_BY_OTHER
+ } else {
+ continue
+ }
}
break
}
return res
}
-func (c *KVTransaction) deleteTransactionKey() {
- log.Debugw("schedule-key-deletion", log.Fields{"txnId": c.txnId, "txnkey": c.txnKey})
- time.Sleep(time.Duration(ctx.timeToDeleteCompletedKeys) * time.Second)
- log.Debugw("background-key-deletion", log.Fields{"txn": c.txnId, "txnkey": c.txnKey})
- ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout, false)
-}
-
func (c *KVTransaction) Close() error {
log.Debugw("close", log.Fields{"txn": c.txnId})
- return ctx.kvClient.Put(c.txnKey, TRANSACTION_COMPLETE, ctx.kvOperationTimeout, false)
+ // Stop monitoring the key (applies only when there has been no transaction switch over)
+ if c.monitorCh != nil {
+ close(c.monitorCh)
+ ctx.kvClient.Put(c.txnKey, TRANSACTION_COMPLETE, ctx.kvOperationTimeout, false)
+ }
+ return nil
}
func (c *KVTransaction) Delete() error {
log.Debugw("delete", log.Fields{"txn": c.txnId})
- err := ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout, false)
- return err
+ return ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout, false)
+}
+
+// holdOnToTxnUntilProcessingCompleted renews the transaction lease until the transaction is complete. durationInSecs
+// is used to calculate the frequency at which the Core processing the transaction renews the lease. This function
+// exits only when the transaction is Closed, i.e completed.
+func (c *KVTransaction) holdOnToTxnUntilProcessingCompleted(key string, owner string, durationInSecs int64) {
+ log.Debugw("holdOnToTxnUntilProcessingCompleted", log.Fields{"txn": c.txnId})
+ renewInterval := durationInSecs / NUM_TXN_RENEWAL_PER_REQUEST
+ if renewInterval < MIN_TXN_RENEWAL_INTERVAL_IN_SEC {
+ renewInterval = MIN_TXN_RENEWAL_INTERVAL_IN_SEC
+ }
+forLoop:
+ for {
+ select {
+ case <-c.monitorCh:
+ log.Debugw("transaction-renewal-exits", log.Fields{"txn": c.txnId})
+ break forLoop
+ case <-time.After(time.Duration(renewInterval) * time.Second):
+ if err := ctx.kvClient.RenewReservation(c.txnKey); err != nil {
+ // Log and continue.
+ log.Warnw("transaction-renewal-failed", log.Fields{"txnId": c.txnKey, "error": err})
+ }
+ }
+ }
}
diff --git a/rw_core/main.go b/rw_core/main.go
index 2311029..e74d1bb 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -131,9 +131,7 @@
if err = c.SetTransactionContext(instanceId,
txnPrefix,
rw.kvClient,
- rw.config.KVStoreTimeout,
- rw.config.KVTxnKeyDelTime,
- 1); err != nil {
+ rw.config.KVStoreTimeout); err != nil {
log.Fatal("creating-transaction-context-failed")
}
}