[VOL-1462] Sync data between two voltha cores in the same pair

This commit consists of the following updates:
1) Background data syncing between two cores after a transaction
is completed by one core.
2) Add transaction management to southbound APIs (adapter facing).
This is enabled for adapter registration only for now.
3) Fix an issue with flow decomposition
4) Add the rough-in to allow a packet to be sent to an OFAgent
with a transaction ID.  Two cores can therefore send the same
packet and let the OFAgent discard the duplicate.  The work in
OFAgent remains.
5) Cleanups

Change-Id: Ibe9d75edb66cfd6a0954bdfeb16a7e7c8a3c53b6
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index 877fcb4..f72d615 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -35,8 +35,6 @@
 //TODO:  Move this Tag into the proto file
 const OF_CONTROLLER_TAG= "voltha_backend_name"
 
-//const MAX_RESPONSE_TIME = int64(500) // milliseconds
-
 const (
 	IMAGE_DOWNLOAD = iota
 	CANCEL_IMAGE_DOWNLOAD     = iota
@@ -44,6 +42,15 @@
 	REVERT_IMAGE = iota
 )
 
+
+type deviceID struct {
+	id string
+}
+
+type logicalDeviceID struct {
+	id string
+}
+
 type APIHandler struct {
 	deviceMgr        *DeviceManager
 	logicalDeviceMgr *LogicalDeviceManager
@@ -118,7 +125,7 @@
 	return handler.coreInCompetingMode
 }
 
-func (handler *APIHandler) acquireTransaction(ctx context.Context, maxTimeout ...int64) (*KVTransaction, error) {
+func (handler *APIHandler) acquireTransaction(ctx context.Context, id interface{}, maxTimeout ...int64) (*KVTransaction, error) {
 	timeout := handler.defaultRequestTimeout
 	if len(maxTimeout) > 0 {
 		timeout = maxTimeout[0]
@@ -130,6 +137,20 @@
 	} else if txn.Acquired(timeout) {
 		return txn, nil
 	} else {
+		if id != nil {
+			// The id can either be a device Id or a logical device id.
+			if dId, ok := id.(*deviceID); ok {
+				// Since this core has not processed this request, let's load the device, along with its extended
+				// family (parents and children) in memory.   This will keep this core in-sync with its paired core as
+				// much as possible. The watch feature in the core model will ensure that the contents of those objects in
+				// memory are in sync.
+				time.Sleep(2 * time.Second)
+				go handler.deviceMgr.load(dId.id)
+			} else if ldId, ok := id.(*logicalDeviceID); ok {
+				// This will load the logical device along with its children and grandchildren
+				go handler.logicalDeviceMgr.load(ldId.id)
+			}
+		}
 		return nil, errors.New("failed-to-seize-request")
 	}
 }
@@ -173,7 +194,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireTransaction(ctx); err != nil {
+		if txn, err := handler.acquireTransaction(ctx, &logicalDeviceID{id:id.Id}); err != nil {
 			return new(empty.Empty), err
 		} else {
 			defer txn.Close()
@@ -194,7 +215,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireTransaction(ctx); err != nil {
+		if txn, err := handler.acquireTransaction(ctx, &logicalDeviceID{id:id.Id}); err != nil {
 			return new(empty.Empty), err
 		} else {
 			defer txn.Close()
@@ -216,7 +237,7 @@
 
 	if handler.competeForTransaction() {
 		if !isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
-			if txn, err := handler.acquireTransaction(ctx); err != nil {
+			if txn, err := handler.acquireTransaction(ctx, &logicalDeviceID{id:flow.Id}); err != nil {
 				return new(empty.Empty), err
 			} else {
 				defer txn.Close()
@@ -239,7 +260,7 @@
 
 	if handler.competeForTransaction() {
 		if !isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
-			if txn, err := handler.acquireTransaction(ctx); err != nil {
+			if txn, err := handler.acquireTransaction(ctx, &logicalDeviceID{id:flow.Id}); err != nil {
 				return new(empty.Empty), err
 			} else {
 				defer txn.Close()
@@ -324,7 +345,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireTransaction(ctx); err != nil {
+		if txn, err := handler.acquireTransaction(ctx, nil); err != nil {
 			return &voltha.Device{}, err
 		} else {
 			defer txn.Close()
@@ -361,7 +382,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireTransaction(ctx, handler.longRunningRequestTimeout); err != nil {
+		if txn, err := handler.acquireTransaction(ctx, &deviceID{id:id.Id}, handler.longRunningRequestTimeout); err != nil {
 			return new(empty.Empty), err
 		} else {
 			defer txn.Close()
@@ -382,7 +403,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireTransaction(ctx); err != nil {
+		if txn, err := handler.acquireTransaction(ctx, &deviceID{id:id.Id}); err != nil {
 			return new(empty.Empty), err
 		} else {
 			defer txn.Close()
@@ -403,7 +424,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireTransaction(ctx); err != nil {
+		if txn, err := handler.acquireTransaction(ctx, &deviceID{id:id.Id}); err != nil {
 			return new(empty.Empty), err
 		} else {
 			defer txn.Close()
@@ -424,7 +445,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireTransaction(ctx); err != nil {
+		if txn, err := handler.acquireTransaction(ctx, nil); err != nil {
 			return new(empty.Empty), err
 		} else {
 			defer txn.Close()
@@ -446,7 +467,7 @@
 	}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireTransaction(ctx); err != nil {
+		if txn, err := handler.acquireTransaction(ctx, &deviceID{id:img.Id}); err != nil {
 			return &common.OperationResp{}, err
 		} else {
 			defer txn.Close()
@@ -537,7 +558,7 @@
 	failedresponse := &voltha.ImageDownload{DownloadState: voltha.ImageDownload_DOWNLOAD_UNKNOWN}
 
 	if handler.competeForTransaction() {
-		if txn, err := handler.acquireTransaction(ctx); err != nil {
+		if txn, err := handler.acquireTransaction(ctx, &deviceID{id:img.Id}); err != nil {
 			return failedresponse, err
 		} else {
 			defer txn.Close()
@@ -670,7 +691,8 @@
 	return nil
 }
 
-func (handler *APIHandler) sendPacketIn(deviceId string, packet *openflow_13.OfpPacketIn) {
+func (handler *APIHandler) sendPacketIn(deviceId string, transationId string, packet *openflow_13.OfpPacketIn) {
+	// TODO: Augment the OF PacketIn to include the transactionId
 	packetIn := openflow_13.PacketIn{Id: deviceId, PacketIn: packet}
 	log.Debugw("sendPacketIn", log.Fields{"packetIn": packetIn})
 	// Enqueue the packet