VOL-1512: Set device Active ownership per Core in a Core pair
- Changed NB & SB APIs to seize requests based on device ownership
- Added queue support for change-events
- TODO (not done in this change): make the prefix & timeout of the device
  ownership key configurable; both are currently hard-coded
- TODO (not done in this change): make the KV Transaction Monitor timeout
  configurable; it is currently hard-coded
- TODO (not done in this change): clean up the AdapterRequestHandlerProxy &
  LogicalDeviceManager constructors
Change-Id: Ieeb3df6d70baa529b87c8253cb9f0f5b2a94382a
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index f5295c4..3d9487f 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -40,12 +40,14 @@
defaultRequestTimeout int64
longRunningRequestTimeout int64
coreInCompetingMode bool
+ core *Core
}
-func NewAdapterRequestHandlerProxy(coreInstanceId string, dMgr *DeviceManager, ldMgr *LogicalDeviceManager,
+func NewAdapterRequestHandlerProxy(core *Core, coreInstanceId string, dMgr *DeviceManager, ldMgr *LogicalDeviceManager,
aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy, incompetingMode bool, longRunningRequestTimeout int64,
defaultRequestTimeout int64) *AdapterRequestHandlerProxy {
var proxy AdapterRequestHandlerProxy
+ proxy.core = core
proxy.coreInstanceId = coreInstanceId
proxy.deviceMgr = dMgr
proxy.lDeviceMgr = ldMgr
@@ -58,7 +60,7 @@
return &proxy
}
-func (rhp *AdapterRequestHandlerProxy) acquireTransaction(transactionId string, maxTimeout ...int64) (*KVTransaction, error) {
+func (rhp *AdapterRequestHandlerProxy) acquireRequest(transactionId string, maxTimeout ...int64) (*KVTransaction, error) {
timeout := rhp.defaultRequestTimeout
if len(maxTimeout) > 0 {
timeout = maxTimeout[0]
@@ -74,6 +76,33 @@
}
}
+// This is a helper function that attempts to acquire the request by using the device ownership model
+func (rhp *AdapterRequestHandlerProxy) takeRequestOwnership(transactionId string, devId string, maxTimeout ...int64) (*KVTransaction, error) {
+ timeout := rhp.defaultRequestTimeout
+ if len(maxTimeout) > 0 {
+ timeout = maxTimeout[0]
+ }
+ log.Debugw("transaction-timeout", log.Fields{"timeout": timeout})
+ txn := NewKVTransaction(transactionId)
+ if txn == nil {
+ return nil, errors.New("fail-to-create-transaction")
+ }
+
+ if rhp.core.deviceOwnership.OwnedByMe(devId) {
+ if txn.Acquired(timeout) {
+ return txn, nil
+ } else {
+ return nil, errors.New("failed-to-seize-request")
+ }
+ } else {
+ if txn.Monitor(timeout) {
+ return txn, nil
+ } else {
+ return nil, errors.New("device-not-owned")
+ }
+ }
+}
+
// competeForTransaction is a helper function to determine whether every request needs to compete with another
// Core to execute the request
func (rhp *AdapterRequestHandlerProxy) competeForTransaction() bool {
@@ -112,7 +141,7 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ if txn, err := rhp.acquireRequest(transactionID.Val); err != nil {
log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
// Update our adapters in memory
go rhp.adapterMgr.updateAdaptersAndDevicetypesInMemory()
@@ -156,7 +185,7 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ if txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id); err != nil {
log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
// returning nil, nil instructs the callee to ignore this request
return nil, nil
@@ -224,7 +253,7 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ if txn, err := rhp.takeRequestOwnership(transactionID.Val, device.Id); err != nil {
log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
// returning nil, nil instructs the callee to ignore this request
return nil, nil
@@ -294,7 +323,7 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ if txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id); err != nil {
log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
// returning nil, nil instructs the callee to ignore this request
return nil, nil
@@ -336,7 +365,7 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ if txn, err := rhp.acquireRequest(transactionID.Val); err != nil {
log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
// returning nil, nil instructs the callee to ignore this request
return nil, nil
@@ -416,7 +445,7 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ if txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id); err != nil {
log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
// returning nil, nil instructs the callee to ignore this request
return nil, nil
@@ -500,7 +529,7 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ if txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id); err != nil {
log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
// returning nil, nil instructs the callee to ignore this request
return nil, nil
@@ -557,7 +586,7 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
// returning nil, nil instructs the callee to ignore this request
return nil, nil
@@ -616,7 +645,7 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
// returning nil, nil instructs the callee to ignore this request
return nil, nil
@@ -682,7 +711,7 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
// returning nil, nil instructs the callee to ignore this request
return nil, nil
@@ -733,7 +762,7 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
// returning nil, nil instructs the callee to ignore this request
return nil, nil
@@ -785,7 +814,7 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ if txn, err := rhp.takeRequestOwnership(transactionID.Val, pmConfigs.Id); err != nil {
log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
// returning nil, nil instructs the callee to ignore this request
return nil, nil
@@ -887,7 +916,7 @@
// Try to grab the transaction as this core may be competing with another Core
if rhp.competeForTransaction() {
- if txn, err := rhp.acquireTransaction(transactionID.Val); err != nil {
+ if txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceId.Id); err != nil {
log.Debugw("Another core handled the request", log.Fields{"transactionID": transactionID})
// returning nil, nil instructs the callee to ignore this request
return nil, nil
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 13baf84..02f99fd 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -50,6 +50,7 @@
kafkaClient kafka.Client
coreMembership *voltha.Membership
membershipLock *sync.RWMutex
+ deviceOwnership *DeviceOwnership
}
func init() {
@@ -64,6 +65,10 @@
core.kvClient = kvClient
core.kafkaClient = kafkaClient
+ // Setup device ownership context
+ core.deviceOwnership = NewDeviceOwnership(id, kvClient,
+ "service/voltha/owns_device", 60)
+
// Setup the KV store
// Do not call NewBackend constructor; it creates its own KV client
// Commented the backend for now until the issue between the model and the KV store
@@ -90,8 +95,8 @@
}
log.Info("values", log.Fields{"kmp": core.kmp})
core.adapterMgr = newAdapterManager(core.clusterDataProxy, core.instanceId)
- core.deviceMgr = newDeviceManager(core.kmp, core.clusterDataProxy, core.adapterMgr, core.instanceId)
- core.logicalDeviceMgr = newLogicalDeviceManager(core.deviceMgr, core.kmp, core.clusterDataProxy)
+ core.deviceMgr = newDeviceManager(core)
+ core.logicalDeviceMgr = newLogicalDeviceManager(core, core.deviceMgr, core.kmp, core.clusterDataProxy)
if err := core.registerAdapterRequestHandlers(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.clusterDataProxy, core.localDataProxy); err != nil {
log.Fatal("Failure-registering-adapterRequestHandler")
@@ -179,7 +184,7 @@
func (core *Core) registerAdapterRequestHandlers(ctx context.Context, coreInstanceId string, dMgr *DeviceManager,
ldMgr *LogicalDeviceManager, aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy,
) error {
- requestProxy := NewAdapterRequestHandlerProxy(coreInstanceId, dMgr, ldMgr, aMgr, cdProxy, ldProxy,
+ requestProxy := NewAdapterRequestHandlerProxy(core, coreInstanceId, dMgr, ldMgr, aMgr, cdProxy, ldProxy,
core.config.InCompetingMode, core.config.LongRunningRequestTimeout, core.config.DefaultRequestTimeout)
// Register the broadcast topic to handle any core-bound broadcast requests
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index 09eca3b..06d3bd4 100644
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -33,6 +33,7 @@
type DeviceManager struct {
deviceAgents map[string]*DeviceAgent
+ core *Core
adapterProxy *AdapterProxy
adapterMgr *AdapterManager
logicalDeviceMgr *LogicalDeviceManager
@@ -44,15 +45,16 @@
lockDeviceAgentsMap sync.RWMutex
}
-func newDeviceManager(kafkaICProxy *kafka.InterContainerProxy, cdProxy *model.Proxy, adapterMgr *AdapterManager, coreInstanceId string) *DeviceManager {
+func newDeviceManager(core *Core) *DeviceManager {
var deviceMgr DeviceManager
+ deviceMgr.core = core
deviceMgr.exitChannel = make(chan int, 1)
deviceMgr.deviceAgents = make(map[string]*DeviceAgent)
- deviceMgr.adapterProxy = NewAdapterProxy(kafkaICProxy)
- deviceMgr.kafkaICProxy = kafkaICProxy
- deviceMgr.coreInstanceId = coreInstanceId
- deviceMgr.clusterDataProxy = cdProxy
- deviceMgr.adapterMgr = adapterMgr
+ deviceMgr.kafkaICProxy = core.kmp
+ deviceMgr.adapterProxy = NewAdapterProxy(core.kmp)
+ deviceMgr.coreInstanceId = core.instanceId
+ deviceMgr.clusterDataProxy = core.clusterDataProxy
+ deviceMgr.adapterMgr = core.adapterMgr
deviceMgr.lockDeviceAgentsMap = sync.RWMutex{}
return &deviceMgr
}
@@ -184,6 +186,7 @@
if res == nil { //Success
agent.stop(ctx)
dMgr.deleteDeviceAgentToMap(agent)
+ dMgr.core.deviceOwnership.AbandonDevice(id.Id)
}
log.Debugw("deleteDevice-result", log.Fields{"result": res})
} else {
@@ -624,6 +627,9 @@
dMgr.addDeviceAgentToMap(agent)
agent.start(nil, false)
+ // Set device ownership
+ dMgr.core.deviceOwnership.OwnedByMe(agent.deviceId)
+
// Activate the child device
if agent := dMgr.getDeviceAgent(agent.deviceId); agent != nil {
go agent.enableDevice(nil)
diff --git a/rw_core/core/device_ownership.go b/rw_core/core/device_ownership.go
index b881351..f229383 100644
--- a/rw_core/core/device_ownership.go
+++ b/rw_core/core/device_ownership.go
@@ -81,17 +81,34 @@
}
func (da *DeviceOwnership) startOwnershipMonitoring(id string, chnl chan int) {
+ var op string
+
startloop:
for {
- if err := da.setOwnership(id, da.tryToReserveKey(id)); err != nil {
- log.Errorw("unexpected-error", log.Fields{"error": err})
+ da.deviceMapLock.RLock()
+ val, exist := da.deviceMap[id]
+ da.deviceMapLock.RUnlock()
+ if exist && val.owned {
+ // Device owned; renew reservation
+ op = "renew"
+ kvKey := fmt.Sprintf("%s_%s", da.ownershipPrefix, id)
+ if err := da.kvClient.RenewReservation(kvKey); err != nil {
+ log.Errorw("reservation-renewal-error", log.Fields{"error": err})
+ }
+ } else {
+ // Device not owned; try to seize ownership
+ op = "retry"
+ if err := da.setOwnership(id, da.tryToReserveKey(id)); err != nil {
+ log.Errorw("unexpected-error", log.Fields{"error": err})
+ }
}
select {
case <-da.exitChannel:
log.Infow("closing-monitoring", log.Fields{"Id": id})
break startloop
case <-time.After(time.Duration(da.reservationTimeout) / 3 * time.Second):
- log.Infow("renew-reservation", log.Fields{"Id": id})
+ msg := fmt.Sprintf("%s-reservation", op)
+ log.Infow(msg, log.Fields{"Id": id})
case <-chnl:
log.Infow("closing-device-monitoring", log.Fields{"Id": id})
break startloop
@@ -145,4 +162,4 @@
return nil
}
return status.Error(codes.NotFound, fmt.Sprintf("id-inexistent-%s", id))
-}
\ No newline at end of file
+}
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index 7b486bd..d7834eb 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -53,6 +53,7 @@
logicalDeviceMgr *LogicalDeviceManager
adapterMgr *AdapterManager
packetInQueue *queue.Queue
+ changeEventQueue *queue.Queue
coreInCompetingMode bool
longRunningRequestTimeout int64
defaultRequestTimeout int64
@@ -84,6 +85,7 @@
defaultRequestTimeout:core.config.DefaultRequestTimeout,
// TODO: Figure out what the 'hint' parameter to queue.New does
packetInQueue: queue.New(10),
+ changeEventQueue: queue.New(10),
core: core,
}
return handler
@@ -138,7 +140,8 @@
return handler.coreInCompetingMode
}
-func (handler *APIHandler) acquireTransaction(ctx context.Context, id interface{}, maxTimeout ...int64) (*KVTransaction, error) {
+// This function handles the creation of new devices
+func (handler *APIHandler) acquireRequest(ctx context.Context, id interface{}, maxTimeout ...int64) (*KVTransaction, error) {
timeout := handler.defaultRequestTimeout
if len(maxTimeout) > 0 {
timeout = maxTimeout[0]
@@ -168,6 +171,41 @@
}
}
+// This function handles the modification or deletion of existing devices
+func (handler *APIHandler) takeRequestOwnership(ctx context.Context, id interface{}, maxTimeout ...int64) (*KVTransaction, error) {
+ timeout := handler.defaultRequestTimeout
+ if len(maxTimeout) > 0 {
+ timeout = maxTimeout[0]
+ }
+ log.Debugw("transaction-timeout", log.Fields{"timeout": timeout})
+ txn, err := handler.createKvTransaction(ctx)
+ if txn == nil {
+ return nil, err
+ }
+
+ owned := false
+ if id != nil {
+ if devId, ok := id.(*deviceID); ok {
+ owned = handler.core.deviceOwnership.OwnedByMe(devId.id)
+ } else if lDevId, ok := id.(*logicalDeviceID); ok {
+ owned = handler.core.deviceOwnership.OwnedByMe(lDevId.id)
+ }
+ }
+ if owned {
+ if txn.Acquired(timeout) {
+ return txn, nil
+ } else {
+ return nil, errors.New("failed-to-seize-request")
+ }
+ } else {
+ if txn.Monitor(timeout) {
+ return txn, nil
+ } else {
+ return nil, errors.New("device-not-owned")
+ }
+ }
+}
+
// waitForNilResponseOnSuccess is a helper function to wait for a response on channel ch where an nil
// response is expected in a successful scenario
func waitForNilResponseOnSuccess(ctx context.Context, ch chan interface{}) (*empty.Empty, error) {
@@ -226,7 +264,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.acquireTransaction(ctx, &logicalDeviceID{id:id.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &logicalDeviceID{id:id.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -247,7 +285,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.acquireTransaction(ctx, &logicalDeviceID{id:id.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &logicalDeviceID{id:id.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -269,7 +307,7 @@
if handler.competeForTransaction() {
if !handler.isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
- if txn, err := handler.acquireTransaction(ctx, &logicalDeviceID{id:flow.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &logicalDeviceID{id:flow.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -292,7 +330,7 @@
if handler.competeForTransaction() {
if !handler.isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
- if txn, err := handler.acquireTransaction(ctx, &logicalDeviceID{id:flow.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &logicalDeviceID{id:flow.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -377,7 +415,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.acquireTransaction(ctx, nil); err != nil {
+ if txn, err := handler.acquireRequest(ctx, nil); err != nil {
return &voltha.Device{}, err
} else {
defer txn.Close()
@@ -394,6 +432,7 @@
return &voltha.Device{}, err
}
if d, ok := res.(*voltha.Device); ok {
+ handler.core.deviceOwnership.OwnedByMe(d.Id)
return d, nil
}
}
@@ -414,7 +453,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.acquireTransaction(ctx, &deviceID{id:id.Id}, handler.longRunningRequestTimeout); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &deviceID{id:id.Id}, handler.longRunningRequestTimeout); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -435,7 +474,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.acquireTransaction(ctx, &deviceID{id:id.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &deviceID{id:id.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -456,7 +495,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.acquireTransaction(ctx, &deviceID{id:id.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &deviceID{id:id.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -477,7 +516,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.acquireTransaction(ctx, nil); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, nil); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -499,7 +538,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.acquireTransaction(ctx, &deviceID{id:img.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &deviceID{id:img.Id}); err != nil {
return &common.OperationResp{}, err
} else {
defer txn.Close()
@@ -590,7 +629,7 @@
failedresponse := &voltha.ImageDownload{DownloadState: voltha.ImageDownload_DOWNLOAD_UNKNOWN}
if handler.competeForTransaction() {
- if txn, err := handler.acquireTransaction(ctx, &deviceID{id:img.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &deviceID{id:img.Id}); err != nil {
return failedresponse, err
} else {
defer txn.Close()
@@ -760,7 +799,10 @@
//}
event := openflow_13.ChangeEvent{Id: deviceId, Event: &openflow_13.ChangeEvent_PortStatus{PortStatus: portStatus}}
log.Debugw("sendChangeEvent", log.Fields{"event": event})
- // TODO: put the packet in the queue
+ // Enqueue the change event
+ if err := handler.changeEventQueue.Put(event); err != nil {
+ log.Errorw("failed-to-enqueue-change-event", log.Fields{"error": err})
+ }
}
func (handler *APIHandler) ReceiveChangeEvents(
@@ -769,16 +811,18 @@
) error {
log.Debugw("ReceiveChangeEvents-request", log.Fields{"changeEvents": changeEvents})
for {
- // TODO: need to retrieve packet from queue
- event := &openflow_13.ChangeEvent{}
- time.Sleep(time.Duration(5) * time.Second)
- err := changeEvents.Send(event)
- if err != nil {
- log.Errorw("Failed to send change event", log.Fields{"error": err})
+ // Dequeue a change event
+ if events, err := handler.changeEventQueue.Get(1); err == nil {
+ log.Debugw("dequeued-change-event", log.Fields{"event": events[0]})
+ if event, ok := events[0].(openflow_13.ChangeEvent); ok {
+ log.Debugw("sending-change-event", log.Fields{"event": event})
+ if err := changeEvents.Send(&event); err != nil {
+ log.Errorw("failed-to-send-change-event", log.Fields{"error": err})
+ }
+ }
}
}
- // TODO: put the packet in the queue
- }
+}
func (handler *APIHandler) Subscribe(
ctx context.Context,
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index a3a29f0..47249f6 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -31,6 +31,7 @@
type LogicalDeviceManager struct {
logicalDeviceAgents map[string]*LogicalDeviceAgent
+ core *Core
deviceMgr *DeviceManager
grpcNbiHdlr *APIHandler
adapterProxy *AdapterProxy
@@ -40,8 +41,9 @@
lockLogicalDeviceAgentsMap sync.RWMutex
}
-func newLogicalDeviceManager(deviceMgr *DeviceManager, kafkaICProxy *kafka.InterContainerProxy, cdProxy *model.Proxy) *LogicalDeviceManager {
+func newLogicalDeviceManager(core *Core, deviceMgr *DeviceManager, kafkaICProxy *kafka.InterContainerProxy, cdProxy *model.Proxy) *LogicalDeviceManager {
var logicalDeviceMgr LogicalDeviceManager
+ logicalDeviceMgr.core = core
logicalDeviceMgr.exitChannel = make(chan int, 1)
logicalDeviceMgr.logicalDeviceAgents = make(map[string]*LogicalDeviceAgent)
logicalDeviceMgr.deviceMgr = deviceMgr
@@ -182,6 +184,9 @@
ldMgr.addLogicalDeviceAgentToMap(agent)
go agent.start(ctx, false)
+ // Set device ownership
+ ldMgr.core.deviceOwnership.OwnedByMe(id)
+
log.Debug("creating-logical-device-ends")
return &id, nil
}
@@ -219,6 +224,7 @@
agent.stop(ctx)
//Remove the logical device agent from the Map
ldMgr.deleteLogicalDeviceAgent(logDeviceId)
+ ldMgr.core.deviceOwnership.AbandonDevice(logDeviceId)
}
log.Debug("deleting-logical-device-ends")
diff --git a/rw_core/core/transaction.go b/rw_core/core/transaction.go
index e7125d3..ec7e4ca 100644
--- a/rw_core/core/transaction.go
+++ b/rw_core/core/transaction.go
@@ -56,6 +56,7 @@
type TransactionContext struct {
kvClient kvstore.Client
kvOperationTimeout int
+ monitorLoopTime int64
owner string
timeToDeleteCompletedKeys int
txnPrefix string
@@ -71,7 +72,7 @@
"STOPPED-WAITING-FOR-OTHER"}
func init() {
- log.AddPackage(log.JSON, log.WarnLevel, nil)
+ log.AddPackage(log.JSON, log.DebugLevel, nil)
}
func NewTransactionContext(
@@ -79,13 +80,15 @@
txnPrefix string,
kvClient kvstore.Client,
kvOpTimeout int,
- keyDeleteTime int) *TransactionContext {
+ keyDeleteTime int,
+ monLoopTime int64) *TransactionContext {
return &TransactionContext{
owner: owner,
txnPrefix: txnPrefix,
kvClient: kvClient,
kvOperationTimeout: kvOpTimeout,
+ monitorLoopTime: monLoopTime,
timeToDeleteCompletedKeys: keyDeleteTime}
}
@@ -99,18 +102,21 @@
* will be created (e.g. "service/voltha/transactions")
* :param kvClient: The client API used for all interactions with the KV store. Currently
* only the etcd client is supported.
- * :param: kvOpTimeout: The maximum time to be taken by any KV operation used by this
- * package
- * :param keyDeleteTime: The time to wait, in the background, before deleting a
- * TRANSACTION_COMPLETE key
+ * :param: kvOpTimeout: The maximum time, in seconds, to be taken by any KV operation
+ * used by this package
+ * :param keyDeleteTime: The time (seconds) to wait, in the background, before deleting
+ * a TRANSACTION_COMPLETE key
+ * :param monLoopTime: The time in milliseconds that the monitor sleeps between
+ * checks for the existence of the transaction key
*/
func SetTransactionContext(owner string,
txnPrefix string,
kvClient kvstore.Client,
kvOpTimeout int,
- keyDeleteTime int) error {
+ keyDeleteTime int,
+ monLoopTime int64) error {
- ctx = NewTransactionContext(owner, txnPrefix, kvClient, kvOpTimeout, keyDeleteTime)
+ ctx = NewTransactionContext(owner, txnPrefix, kvClient, kvOpTimeout, keyDeleteTime, monLoopTime)
return nil
}
@@ -170,42 +176,10 @@
} else {
// Another core instance has reserved the request
// Watch for reservation expiry or successful request completion
- events := ctx.kvClient.Watch(c.txnKey)
log.Debugw("watch-other-server",
log.Fields{"owner": currOwner, "timeout": duration})
- select {
- // Add a timeout here in case we miss an event from the KV
- case <-time.After(time.Duration(duration) * time.Millisecond):
- // In case of missing events, let's check the transaction key
- kvp, err := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout, false)
- if err == nil && kvp == nil {
- log.Debug("missed-deleted-event")
- res = ABANDONED_BY_OTHER
- } else if val, err := kvstore.ToString(kvp.Value); err == nil && val == TRANSACTION_COMPLETE {
- log.Debugw("missed-put-event",
- log.Fields{"key": c.txnKey, "value": val})
- res = COMPLETED_BY_OTHER
- } else {
- res = STOPPED_WAITING_FOR_OTHER
- }
-
- case event := <-events:
- log.Debugw("received-event", log.Fields{"type": event.EventType})
- if event.EventType == kvstore.DELETE {
- // The other core failed to process the request; step up
- res = ABANDONED_BY_OTHER
- } else if event.EventType == kvstore.PUT {
- key, e1 := kvstore.ToString(event.Key)
- val, e2 := kvstore.ToString(event.Value)
- if e1 == nil && key == c.txnKey && e2 == nil && val == TRANSACTION_COMPLETE {
- res = COMPLETED_BY_OTHER
- // Successful request completion has been detected
- // Remove the transaction key
- c.Delete()
- }
- }
- }
+ res = c.Watch(duration)
}
// Clean-up: delete the transaction key after a long delay
go c.deleteTransactionKey()
@@ -224,6 +198,103 @@
return acquired
}
+/*
+ * This function monitors the progress of a request that's been reserved by another
+ * Voltha core.
+ *
+ * :param duration: The duration of the reservation in milliseconds
+ * :return: true - reservation abandoned by the other core, process the request
+ * false - reservation not owned, request being processed by another core
+ */
+func (c *KVTransaction) Monitor(duration int64) bool {
+ var acquired bool
+ var res int
+ var timeElapsed int64
+
+ // Convert milliseconds to seconds, rounding up
+ // The reservation TTL is specified in seconds
+ durationInSecs := duration / 1000
+ if remainder := duration % 1000; remainder > 0 {
+ durationInSecs++
+ }
+ // Check if transaction key has been set
+ keyExists := false
+ for timeElapsed = 0; timeElapsed < duration; timeElapsed = timeElapsed + ctx.monitorLoopTime {
+ kvp, err := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout, false)
+ if err == nil && kvp == nil {
+ // This core has received the request before the core that actually
+ // owns the device. The owning core has yet to seize the transaction.
+ time.Sleep(time.Duration(ctx.monitorLoopTime) * time.Millisecond)
+ } else {
+ keyExists = true
+ log.Debug("waited-for-other-to-reserve-transaction")
+ break
+ }
+ }
+ if keyExists {
+ // Watch for reservation expiry or successful request completion
+ log.Debugw("watch-other-server", log.Fields{"timeout": duration})
+ res = c.Watch(duration)
+ } else {
+ res = STOPPED_WAITING_FOR_OTHER
+ }
+ // Clean-up: delete the transaction key after a long delay
+ go c.deleteTransactionKey()
+
+ log.Debugw("own-transaction", log.Fields{"result": txnState[res]})
+ switch res {
+ case ABANDONED_BY_OTHER, STOPPED_WAITING_FOR_OTHER:
+ acquired = true
+ default:
+ acquired = false
+ }
+ // Ensure the request watcher does not reply before the request server
+ if !acquired {
+ time.Sleep(1 * time.Second)
+ }
+ return acquired
+}
+
+// duration in milliseconds
+func (c *KVTransaction) Watch(duration int64) int {
+ var res int
+
+ events := ctx.kvClient.Watch(c.txnKey)
+ select {
+ // Add a timeout here in case we miss an event from the KV
+ case <-time.After(time.Duration(duration) * time.Millisecond):
+ // In case of missing events, let's check the transaction key
+ kvp, err := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout, false)
+ if err == nil && kvp == nil {
+ log.Debug("missed-deleted-event")
+ res = ABANDONED_BY_OTHER
+ } else if val, err := kvstore.ToString(kvp.Value); err == nil && val == TRANSACTION_COMPLETE {
+ log.Debugw("missed-put-event",
+ log.Fields{"key": c.txnKey, "value": val})
+ res = COMPLETED_BY_OTHER
+ } else {
+ res = STOPPED_WAITING_FOR_OTHER
+ }
+
+ case event := <-events:
+ log.Debugw("received-event", log.Fields{"type": event.EventType})
+ if event.EventType == kvstore.DELETE {
+ // The other core failed to process the request
+ res = ABANDONED_BY_OTHER
+ } else if event.EventType == kvstore.PUT {
+ key, e1 := kvstore.ToString(event.Key)
+ val, e2 := kvstore.ToString(event.Value)
+ if e1 == nil && key == c.txnKey && e2 == nil && val == TRANSACTION_COMPLETE {
+ res = COMPLETED_BY_OTHER
+ // Successful request completion has been detected
+ // Remove the transaction key
+ c.Delete()
+ }
+ }
+ }
+ return res
+}
+
func (c *KVTransaction) deleteTransactionKey() {
log.Debugw("schedule-key-deletion", log.Fields{"key": c.txnKey})
time.Sleep(time.Duration(ctx.timeToDeleteCompletedKeys) * time.Second)
diff --git a/rw_core/main.go b/rw_core/main.go
index 41d595b..92b16d2 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -129,7 +129,8 @@
txnPrefix,
rw.kvClient,
rw.config.KVStoreTimeout,
- rw.config.KVTxnKeyDelTime); err != nil {
+ rw.config.KVTxnKeyDelTime,
+ 10); err != nil {
log.Fatal("creating-transaction-context-failed")
}
}