VOL-1512: Set device Active ownership per Core in a Core pair
- Changed NB & SB APIs to seize requests based on device ownership
- Added queue support for change-events
- TODO: make the prefix & timeout for the device ownership key configurable
  (currently hard-coded)
- TODO: make the KV Transaction Monitor timeout configurable
  (currently hard-coded)
- TODO: clean up the AdapterRequestHandlerProxy & LogicalDeviceManager
  constructors

Change-Id: Ieeb3df6d70baa529b87c8253cb9f0f5b2a94382a
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index 7b486bd..d7834eb 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -53,6 +53,7 @@
 	logicalDeviceMgr *LogicalDeviceManager
 	adapterMgr *AdapterManager
 	packetInQueue    *queue.Queue
+	changeEventQueue *queue.Queue
 	coreInCompetingMode bool
 	longRunningRequestTimeout int64
 	defaultRequestTimeout int64
@@ -84,6 +85,7 @@
 		defaultRequestTimeout:core.config.DefaultRequestTimeout,
 		// TODO: Figure out what the 'hint' parameter to queue.New does
 		packetInQueue: queue.New(10),
+		changeEventQueue: queue.New(10),
 		core: core,
 	}
 	return handler
@@ -138,7 +140,8 @@
 	return handler.coreInCompetingMode
 }
 
-func (handler *APIHandler) acquireTransaction(ctx context.Context, id interface{}, maxTimeout ...int64) (*KVTransaction, error) {
+// acquireRequest acquires the KV transaction for requests that create new devices
+func (handler *APIHandler) acquireRequest(ctx context.Context, id interface{}, maxTimeout ...int64) (*KVTransaction, error) {
 	timeout := handler.defaultRequestTimeout
 	if len(maxTimeout) > 0 {
 		timeout = maxTimeout[0]
@@ -168,6 +171,41 @@
 	}
 }
 
+// takeRequestOwnership is used for requests on existing devices (modify/delete): it acquires the KV transaction when this Core owns the device and monitors it otherwise
+func (handler *APIHandler) takeRequestOwnership(ctx context.Context, id interface{}, maxTimeout ...int64) (*KVTransaction, error) {
+	timeout := handler.defaultRequestTimeout
+	if len(maxTimeout) > 0 {
+		timeout = maxTimeout[0]
+	}
+	log.Debugw("transaction-timeout", log.Fields{"timeout": timeout})
+	txn, err := handler.createKvTransaction(ctx)
+	if txn == nil {
+		return nil,  err
+	}
+
+	owned := false
+	if id != nil {
+		if devId, ok := id.(*deviceID); ok {
+			owned = handler.core.deviceOwnership.OwnedByMe(devId.id)
+		} else if lDevId, ok := id.(*logicalDeviceID); ok {
+			owned = handler.core.deviceOwnership.OwnedByMe(lDevId.id)
+		}
+	}
+	if owned {
+		if txn.Acquired(timeout) {
+			return txn, nil
+		} else {
+			return nil, errors.New("failed-to-seize-request")
+		}
+	} else {
+		if txn.Monitor(timeout) {
+			return txn, nil
+		} else {
+			return nil, errors.New("device-not-owned")
+		}
+	}
+}
+
 // waitForNilResponseOnSuccess is a helper function to wait for a response on channel ch where an nil
 // response is expected in a successful scenario
 func waitForNilResponseOnSuccess(ctx context.Context, ch chan interface{}) (*empty.Empty, error) {
@@ -226,7 +264,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireTransaction(ctx, &logicalDeviceID{id:id.Id}); err != nil {
+		if txn, err := handler.takeRequestOwnership(ctx, &logicalDeviceID{id:id.Id}); err != nil {
 			return new(empty.Empty), err
 		} else {
 			defer txn.Close()
@@ -247,7 +285,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireTransaction(ctx, &logicalDeviceID{id:id.Id}); err != nil {
+		if txn, err := handler.takeRequestOwnership(ctx, &logicalDeviceID{id:id.Id}); err != nil {
 			return new(empty.Empty), err
 		} else {
 			defer txn.Close()
@@ -269,7 +307,7 @@
 
 	if handler.competeForTransaction() {
 		if !handler.isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
-			if txn, err := handler.acquireTransaction(ctx, &logicalDeviceID{id:flow.Id}); err != nil {
+			if txn, err := handler.takeRequestOwnership(ctx, &logicalDeviceID{id:flow.Id}); err != nil {
 				return new(empty.Empty), err
 			} else {
 				defer txn.Close()
@@ -292,7 +330,7 @@
 
 	if handler.competeForTransaction() {
 		if !handler.isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
-			if txn, err := handler.acquireTransaction(ctx, &logicalDeviceID{id:flow.Id}); err != nil {
+			if txn, err := handler.takeRequestOwnership(ctx, &logicalDeviceID{id:flow.Id}); err != nil {
 				return new(empty.Empty), err
 			} else {
 				defer txn.Close()
@@ -377,7 +415,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireTransaction(ctx, nil); err != nil {
+		if txn, err := handler.acquireRequest(ctx, nil); err != nil {
 			return &voltha.Device{}, err
 		} else {
 			defer txn.Close()
@@ -394,6 +432,7 @@
 				return &voltha.Device{}, err
 			}
 			if d, ok := res.(*voltha.Device); ok {
+				handler.core.deviceOwnership.OwnedByMe(d.Id)
 				return d, nil
 			}
 		}
@@ -414,7 +453,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireTransaction(ctx, &deviceID{id:id.Id}, handler.longRunningRequestTimeout); err != nil {
+		if txn, err := handler.takeRequestOwnership(ctx, &deviceID{id:id.Id}, handler.longRunningRequestTimeout); err != nil {
 			return new(empty.Empty), err
 		} else {
 			defer txn.Close()
@@ -435,7 +474,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireTransaction(ctx, &deviceID{id:id.Id}); err != nil {
+		if txn, err := handler.takeRequestOwnership(ctx, &deviceID{id:id.Id}); err != nil {
 			return new(empty.Empty), err
 		} else {
 			defer txn.Close()
@@ -456,7 +495,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireTransaction(ctx, &deviceID{id:id.Id}); err != nil {
+		if txn, err := handler.takeRequestOwnership(ctx, &deviceID{id:id.Id}); err != nil {
 			return new(empty.Empty), err
 		} else {
 			defer txn.Close()
@@ -477,7 +516,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireTransaction(ctx, nil); err != nil {
+		if txn, err := handler.takeRequestOwnership(ctx, nil); err != nil {
 			return new(empty.Empty), err
 		} else {
 			defer txn.Close()
@@ -499,7 +538,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireTransaction(ctx, &deviceID{id:img.Id}); err != nil {
+		if txn, err := handler.takeRequestOwnership(ctx, &deviceID{id:img.Id}); err != nil {
 			return &common.OperationResp{}, err
 		} else {
 			defer txn.Close()
@@ -590,7 +629,7 @@
 	failedresponse := &voltha.ImageDownload{DownloadState: voltha.ImageDownload_DOWNLOAD_UNKNOWN}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireTransaction(ctx, &deviceID{id:img.Id}); err != nil {
+		if txn, err := handler.takeRequestOwnership(ctx, &deviceID{id:img.Id}); err != nil {
 			return failedresponse, err
 		} else {
 			defer txn.Close()
@@ -760,7 +799,10 @@
 	//}
 	event := openflow_13.ChangeEvent{Id: deviceId, Event: &openflow_13.ChangeEvent_PortStatus{PortStatus: portStatus}}
 	log.Debugw("sendChangeEvent", log.Fields{"event": event})
-	// TODO: put the packet in the queue
+	// Enqueue the change event
+	if err := handler.changeEventQueue.Put(event); err != nil {
+		log.Errorw("failed-to-enqueue-change-event", log.Fields{"error": err})
+	}
 }
 
 func (handler *APIHandler) ReceiveChangeEvents(
@@ -769,16 +811,18 @@
 ) error {
 	log.Debugw("ReceiveChangeEvents-request", log.Fields{"changeEvents": changeEvents})
 	for {
-		// TODO: need to retrieve packet from queue
-		event := &openflow_13.ChangeEvent{}
-		time.Sleep(time.Duration(5) * time.Second)
-		err := changeEvents.Send(event)
-		if err != nil {
-			log.Errorw("Failed to send change event", log.Fields{"error": err})
+		// Dequeue a change event
+		if events, err := handler.changeEventQueue.Get(1); err == nil {
+			log.Debugw("dequeued-change-event", log.Fields{"event": events[0]})
+			if event, ok := events[0].(openflow_13.ChangeEvent); ok {
+				log.Debugw("sending-change-event", log.Fields{"event": event})
+				if err := changeEvents.Send(&event); err != nil {
+					log.Errorw("failed-to-send-change-event", log.Fields{"error": err})
+				}
+			}
 		}
 	}
-	// TODO: put the packet in the queue
-	}
+}
 
 func (handler *APIHandler) Subscribe(
 	ctx context.Context,