Add transaction key for ofagent request.

Since the afrouter (api-server) binds an ofagent to two rw_core
forming a core-pair and transparently forward requests/responses,
then the ofagent needs to include a transaction key-value when
sending a request to the rw_core.  This will allow the rw_cores in
the pair to compete for the transaction with the winning one
fulfilling the requests while the other Core monitoring the
transaction in case the winning core fails to process the
transaction.

Change-Id: I231ac3c027d40a475f0c395fc8123e9b54fd35d0
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index 1ad0766..8f7b328 100755
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -121,7 +121,24 @@
 	return handler.coreInCompetingMode
 }
 
-// This function handles the creation of new devices
+// acquireRequestForList handles transaction processing for list requests, i.e. when there are no specific id requested.
+func (handler *APIHandler) acquireRequestForList(ctx context.Context, maxTimeout ...int64) (*KVTransaction, error) {
+	timeout := handler.defaultRequestTimeout
+	if len(maxTimeout) > 0 {
+		timeout = maxTimeout[0]
+	}
+	log.Debugw("transaction-timeout", log.Fields{"timeout": timeout})
+	txn, err := handler.createKvTransaction(ctx)
+	if txn == nil {
+		return nil, err
+	} else if txn.Acquired(timeout) {
+		return txn, nil
+	} else {
+		return nil, errors.New("failed-to-seize-request")
+	}
+}
+
+// acquireRequest handles transaction processing for creation of new devices
 func (handler *APIHandler) acquireRequest(ctx context.Context, id interface{}, maxTimeout ...int64) (*KVTransaction, error) {
 	timeout := handler.defaultRequestTimeout
 	if len(maxTimeout) > 0 {
@@ -237,6 +254,19 @@
 	return &voltha.Membership{}, nil
 }
 
+func (handler *APIHandler) GetLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*voltha.LogicalPort, error) {
+	log.Debugw("GetLogicalDevicePort-request", log.Fields{"id": *id})
+
+	if handler.competeForTransaction() {
+		if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id: id.Id}); err != nil {
+			return &voltha.LogicalPort{}, err
+		} else {
+			defer txn.Close()
+		}
+	}
+	return handler.logicalDeviceMgr.getLogicalPort(id)
+}
+
 func (handler *APIHandler) EnableLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*empty.Empty, error) {
 	log.Debugw("EnableLogicalDevicePort-request", log.Fields{"id": id, "test": common.TestModeKeys_api_test.String()})
 	if isTestMode(ctx) {
@@ -286,17 +316,11 @@
 		return out, nil
 	}
 
-	// TODO: Update this logic when the OF Controller (OFAgent in this case) is able to send a transaction Id in its
-	// request (the api-router binds the OfAgent to two Cores in a pair and let the traffic flows transparently)
 	if handler.competeForTransaction() {
-		if !handler.isOFControllerRequest(ctx) {
-			if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id: flow.Id}); err != nil {
-				return new(empty.Empty), err
-			} else {
-				defer txn.Close()
-			}
-		} else if !handler.core.deviceOwnership.OwnedByMe(&utils.LogicalDeviceID{Id: flow.Id}) {
-			return new(empty.Empty), nil
+		if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id: flow.Id}); err != nil {
+			return new(empty.Empty), err
+		} else {
+			defer txn.Close()
 		}
 	}
 
@@ -313,17 +337,11 @@
 		return out, nil
 	}
 
-	// TODO: Update this logic when the OF Controller (OFAgent in this case) is able to send a transaction Id in its
-	// request (the api-router binds the OfAgent to two Cores in a pair and let the traffic flows transparently)
 	if handler.competeForTransaction() {
-		if !handler.isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
-			if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id: flow.Id}); err != nil {
-				return new(empty.Empty), err
-			} else {
-				defer txn.Close()
-			}
-		} else if !handler.core.deviceOwnership.OwnedByMe(&utils.LogicalDeviceID{Id: flow.Id}) {
-			return new(empty.Empty), nil
+		if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id: flow.Id}); err != nil {
+			return new(empty.Empty), err
+		} else {
+			defer txn.Close()
 		}
 	}
 
@@ -371,15 +389,32 @@
 	return waitForNilResponseOnSuccess(ctx, ch)
 }
 
-// GetLogicalDevice must be implemented in the read-only containers - should it also be implemented here?
 func (handler *APIHandler) GetLogicalDevice(ctx context.Context, id *voltha.ID) (*voltha.LogicalDevice, error) {
 	log.Debugw("GetLogicalDevice-request", log.Fields{"id": id})
+	if handler.competeForTransaction() {
+		if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id: id.Id}); err != nil {
+			return &voltha.LogicalDevice{}, err
+		} else {
+			defer txn.Close()
+		}
+	}
 	return handler.logicalDeviceMgr.getLogicalDevice(id.Id)
 }
 
-// ListLogicalDevices must be implemented in the read-only containers - should it also be implemented here?
 func (handler *APIHandler) ListLogicalDevices(ctx context.Context, empty *empty.Empty) (*voltha.LogicalDevices, error) {
-	log.Debug("ListLogicalDevices")
+	log.Debug("ListLogicalDevices-request")
+	if handler.competeForTransaction() {
+		if txn, err := handler.acquireRequestForList(ctx); err != nil {
+			return &voltha.LogicalDevices{}, err
+		} else {
+			defer txn.Close()
+		}
+	}
+	if handler.isOFControllerRequest(ctx) {
+		//	Since an OF controller is only interested in the set of logical devices managed by thgis Core then return
+		//	only logical devices managed/monitored by this Core.
+		return handler.logicalDeviceMgr.listManagedLogicalDevices()
+	}
 	return handler.logicalDeviceMgr.listLogicalDevices()
 }
 
@@ -391,17 +426,37 @@
 
 func (handler *APIHandler) ListLogicalDeviceFlows(ctx context.Context, id *voltha.ID) (*openflow_13.Flows, error) {
 	log.Debugw("ListLogicalDeviceFlows", log.Fields{"id": *id})
+	if handler.competeForTransaction() {
+		if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id: id.Id}); err != nil {
+			return &openflow_13.Flows{}, err
+		} else {
+			defer txn.Close()
+		}
+	}
 	return handler.logicalDeviceMgr.ListLogicalDeviceFlows(ctx, id.Id)
 }
 
 func (handler *APIHandler) ListLogicalDeviceFlowGroups(ctx context.Context, id *voltha.ID) (*openflow_13.FlowGroups, error) {
 	log.Debugw("ListLogicalDeviceFlowGroups", log.Fields{"id": *id})
+	if handler.competeForTransaction() {
+		if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id: id.Id}); err != nil {
+			return &openflow_13.FlowGroups{}, err
+		} else {
+			defer txn.Close()
+		}
+	}
 	return handler.logicalDeviceMgr.ListLogicalDeviceFlowGroups(ctx, id.Id)
 }
 
-// ListLogicalDevicePorts must be implemented in the read-only containers - should it also be implemented here?
 func (handler *APIHandler) ListLogicalDevicePorts(ctx context.Context, id *voltha.ID) (*voltha.LogicalPorts, error) {
 	log.Debugw("ListLogicalDevicePorts", log.Fields{"logicaldeviceid": id})
+	if handler.competeForTransaction() {
+		if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id: id.Id}); err != nil {
+			return &voltha.LogicalPorts{}, err
+		} else {
+			defer txn.Close()
+		}
+	}
 	return handler.logicalDeviceMgr.ListLogicalDevicePorts(ctx, id.Id)
 }
 
@@ -413,7 +468,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireRequest(ctx, nil); err != nil {
+		if txn, err := handler.acquireRequest(ctx, &utils.DeviceID{Id: device.Id}); err != nil {
 			return &voltha.Device{}, err
 		} else {
 			defer txn.Close()
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index 6dfcb86..96e3541 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -134,6 +134,19 @@
 	return nil, status.Errorf(codes.NotFound, "%s", id)
 }
 
+func (ldMgr *LogicalDeviceManager) listManagedLogicalDevices() (*voltha.LogicalDevices, error) {
+	log.Debug("listManagedLogicalDevices")
+	result := &voltha.LogicalDevices{}
+	ldMgr.lockLogicalDeviceAgentsMap.RLock()
+	defer ldMgr.lockLogicalDeviceAgentsMap.RUnlock()
+	for _, agent := range ldMgr.logicalDeviceAgents {
+		if ld, _ := agent.GetLogicalDevice(); ld != nil {
+			result.Items = append(result.Items, ld)
+		}
+	}
+	return result, nil
+}
+
 func (ldMgr *LogicalDeviceManager) listLogicalDevices() (*voltha.LogicalDevices, error) {
 	log.Debug("ListAllLogicalDevices")
 	result := &voltha.LogicalDevices{}