VOL-1512: Set device Active ownership per Core in a Core pair
- Changed NB & SB APIs to seize requests based on device ownership
- Added queue support for change-events
- TODO: Make the prefix & timeout for the device ownership key configurable
(currently hard-coded)
- TODO: Make the KV Transaction Monitor timeout configurable
(currently hard-coded)
- TODO: Clean up the AdapterRequestHandlerProxy & LogicalDeviceManager
constructors
Change-Id: Ieeb3df6d70baa529b87c8253cb9f0f5b2a94382a
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index 7b486bd..d7834eb 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -53,6 +53,7 @@
logicalDeviceMgr *LogicalDeviceManager
adapterMgr *AdapterManager
packetInQueue *queue.Queue
+ changeEventQueue *queue.Queue
coreInCompetingMode bool
longRunningRequestTimeout int64
defaultRequestTimeout int64
@@ -84,6 +85,7 @@
defaultRequestTimeout:core.config.DefaultRequestTimeout,
// TODO: Figure out what the 'hint' parameter to queue.New does
packetInQueue: queue.New(10),
+ changeEventQueue: queue.New(10),
core: core,
}
return handler
@@ -138,7 +140,8 @@
return handler.coreInCompetingMode
}
-func (handler *APIHandler) acquireTransaction(ctx context.Context, id interface{}, maxTimeout ...int64) (*KVTransaction, error) {
+// acquireRequest obtains the KV transaction for requests that create new devices
+func (handler *APIHandler) acquireRequest(ctx context.Context, id interface{}, maxTimeout ...int64) (*KVTransaction, error) {
timeout := handler.defaultRequestTimeout
if len(maxTimeout) > 0 {
timeout = maxTimeout[0]
@@ -168,6 +171,41 @@
}
}
+// takeRequestOwnership handles requests that modify or delete existing devices, branching on whether this core owns the device
+func (handler *APIHandler) takeRequestOwnership(ctx context.Context, id interface{}, maxTimeout ...int64) (*KVTransaction, error) {
+ timeout := handler.defaultRequestTimeout
+ if len(maxTimeout) > 0 {
+ timeout = maxTimeout[0] // only the first variadic value is used; it overrides the default timeout
+ }
+ log.Debugw("transaction-timeout", log.Fields{"timeout": timeout})
+ txn, err := handler.createKvTransaction(ctx)
+ if txn == nil {
+ return nil, err // NOTE(review): err is passed through as-is; whether it can be nil here depends on createKvTransaction
+ }
+
+ owned := false // stays false when id is nil or is neither *deviceID nor *logicalDeviceID
+ if id != nil {
+ if devId, ok := id.(*deviceID); ok {
+ owned = handler.core.deviceOwnership.OwnedByMe(devId.id)
+ } else if lDevId, ok := id.(*logicalDeviceID); ok {
+ owned = handler.core.deviceOwnership.OwnedByMe(lDevId.id)
+ }
+ }
+ if owned {
+ if txn.Acquired(timeout) { // owner path; NOTE(review): presumably claims the request for this core - confirm in KVTransaction
+ return txn, nil
+ } else {
+ return nil, errors.New("failed-to-seize-request")
+ }
+ } else {
+ if txn.Monitor(timeout) { // non-owner path; NOTE(review): presumably watches the owning core's handling - confirm in KVTransaction
+ return txn, nil
+ } else {
+ return nil, errors.New("device-not-owned")
+ }
+ }
+}
+
// waitForNilResponseOnSuccess is a helper function to wait for a response on channel ch where an nil
// response is expected in a successful scenario
func waitForNilResponseOnSuccess(ctx context.Context, ch chan interface{}) (*empty.Empty, error) {
@@ -226,7 +264,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.acquireTransaction(ctx, &logicalDeviceID{id:id.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &logicalDeviceID{id:id.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -247,7 +285,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.acquireTransaction(ctx, &logicalDeviceID{id:id.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &logicalDeviceID{id:id.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -269,7 +307,7 @@
if handler.competeForTransaction() {
if !handler.isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
- if txn, err := handler.acquireTransaction(ctx, &logicalDeviceID{id:flow.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &logicalDeviceID{id:flow.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -292,7 +330,7 @@
if handler.competeForTransaction() {
if !handler.isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
- if txn, err := handler.acquireTransaction(ctx, &logicalDeviceID{id:flow.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &logicalDeviceID{id:flow.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -377,7 +415,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.acquireTransaction(ctx, nil); err != nil {
+ if txn, err := handler.acquireRequest(ctx, nil); err != nil {
return &voltha.Device{}, err
} else {
defer txn.Close()
@@ -394,6 +432,7 @@
return &voltha.Device{}, err
}
if d, ok := res.(*voltha.Device); ok {
+ handler.core.deviceOwnership.OwnedByMe(d.Id)
return d, nil
}
}
@@ -414,7 +453,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.acquireTransaction(ctx, &deviceID{id:id.Id}, handler.longRunningRequestTimeout); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &deviceID{id:id.Id}, handler.longRunningRequestTimeout); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -435,7 +474,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.acquireTransaction(ctx, &deviceID{id:id.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &deviceID{id:id.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -456,7 +495,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.acquireTransaction(ctx, &deviceID{id:id.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &deviceID{id:id.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -477,7 +516,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.acquireTransaction(ctx, nil); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, nil); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -499,7 +538,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.acquireTransaction(ctx, &deviceID{id:img.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &deviceID{id:img.Id}); err != nil {
return &common.OperationResp{}, err
} else {
defer txn.Close()
@@ -590,7 +629,7 @@
failedresponse := &voltha.ImageDownload{DownloadState: voltha.ImageDownload_DOWNLOAD_UNKNOWN}
if handler.competeForTransaction() {
- if txn, err := handler.acquireTransaction(ctx, &deviceID{id:img.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &deviceID{id:img.Id}); err != nil {
return failedresponse, err
} else {
defer txn.Close()
@@ -760,7 +799,10 @@
//}
event := openflow_13.ChangeEvent{Id: deviceId, Event: &openflow_13.ChangeEvent_PortStatus{PortStatus: portStatus}}
log.Debugw("sendChangeEvent", log.Fields{"event": event})
- // TODO: put the packet in the queue
+ // Enqueue the change event
+ if err := handler.changeEventQueue.Put(event); err != nil {
+ log.Errorw("failed-to-enqueue-change-event", log.Fields{"error": err})
+ }
}
func (handler *APIHandler) ReceiveChangeEvents(
@@ -769,16 +811,18 @@
) error {
log.Debugw("ReceiveChangeEvents-request", log.Fields{"changeEvents": changeEvents})
for {
- // TODO: need to retrieve packet from queue
- event := &openflow_13.ChangeEvent{}
- time.Sleep(time.Duration(5) * time.Second)
- err := changeEvents.Send(event)
- if err != nil {
- log.Errorw("Failed to send change event", log.Fields{"error": err})
+ // Dequeue a change event
+ if events, err := handler.changeEventQueue.Get(1); err == nil {
+ log.Debugw("dequeued-change-event", log.Fields{"event": events[0]})
+ if event, ok := events[0].(openflow_13.ChangeEvent); ok {
+ log.Debugw("sending-change-event", log.Fields{"event": event})
+ if err := changeEvents.Send(&event); err != nil {
+ log.Errorw("failed-to-send-change-event", log.Fields{"error": err})
+ }
+ }
}
}
- // TODO: put the packet in the queue
- }
+}
func (handler *APIHandler) Subscribe(
ctx context.Context,