[VOL-1429]  This commit adds a flag to bypass the transaction processing
in single core instance.  It also removes the hardcoded KV store
path prefix and put it as part of the config. Adding the ability for
a calling function to change the transaction timeout.

Change-Id: I8570b44b34db99b46410dafd58c6c5b86ea97b41
diff --git a/compose/rw_core.yml b/compose/rw_core.yml
index 8b164db..dba2544 100644
--- a/compose/rw_core.yml
+++ b/compose/rw_core.yml
@@ -29,6 +29,8 @@
         - -kafka_cluster_host=${DOCKER_HOST_IP}
         - -kafka_cluster_port=9092
         - -rw_core_topic=rwcore
+        - -kv_store_data_prefix=service/voltha
+        - -in_competing_mode=false
         - -log_level=0
     ports:
       - 50057:50057
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index f7ac794..fc819a1 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -26,33 +26,35 @@
 const (
 	ConsulStoreName               = "consul"
 	EtcdStoreName                 = "etcd"
-	default_InstanceID            = "rwcore001"
-	default_GrpcPort              = 50057
-	default_GrpcHost              = ""
-	default_KafkaAdapterHost      = "127.0.0.1"
-	default_KafkaAdapterPort      = 9092
-	default_KafkaClusterHost      = "127.0.0.1"
-	default_KafkaClusterPort      = 9094
-	default_KVStoreType           = EtcdStoreName
-	default_KVStoreTimeout        = 5 //in seconds
-	default_KVStoreHost           = "127.0.0.1"
-	default_KVStorePort           = 2379 // Consul = 8500; Etcd = 2379
-	default_KVTxnKeyDelTime       = 60
-	default_LogLevel              = 0
-	default_Banner                = false
-	default_CoreTopic             = "rwcore"
-	default_RWCoreEndpoint        = "rwcore"
-	default_RWCoreKey             = "pki/voltha.key"
-	default_RWCoreCert            = "pki/voltha.crt"
-	default_RWCoreCA              = "pki/voltha-CA.pem"
-	default_Affinity_Router_Topic = "affinityRouter"
+	default_InstanceID           = "rwcore001"
+	default_GrpcPort             = 50057
+	default_GrpcHost             = ""
+	default_KafkaAdapterHost     = "127.0.0.1"
+	default_KafkaAdapterPort     = 9092
+	default_KafkaClusterHost     = "127.0.0.1"
+	default_KafkaClusterPort     = 9094
+	default_KVStoreType          = EtcdStoreName
+	default_KVStoreTimeout       = 5 //in seconds
+	default_KVStoreHost         = "127.0.0.1"
+	default_KVStorePort         = 2379 // Consul = 8500; Etcd = 2379
+	default_KVTxnKeyDelTime     = 60
+	default_KVStoreDataPrefix   = "service/voltha"
+	default_LogLevel            = 0
+	default_Banner              = false
+	default_CoreTopic           = "rwcore"
+	default_RWCoreEndpoint      = "rwcore"
+	default_RWCoreKey           = "pki/voltha.key"
+	default_RWCoreCert          = "pki/voltha.crt"
+	default_RWCoreCA            = "pki/voltha-CA.pem"
+	default_AffinityRouterTopic = "affinityRouter"
+	default_InCompetingMode     = true
 )
 
 // RWCoreFlags represents the set of configurations used by the read-write core service
 type RWCoreFlags struct {
 	// Command line parameters
-	InstanceID          string
-	RWCoreEndpoint      string
+	InstanceID           string
+	RWCoreEndpoint       string
 	GrpcHost            string
 	GrpcPort            int
 	KafkaAdapterHost    string
@@ -64,6 +66,7 @@
 	KVStoreHost         string
 	KVStorePort         int
 	KVTxnKeyDelTime     int
+	KVStoreDataPrefix   string
 	CoreTopic           string
 	LogLevel            int
 	Banner              bool
@@ -71,6 +74,7 @@
 	RWCoreCert          string
 	RWCoreCA            string
 	AffinityRouterTopic string
+	InCompetingMode     bool
 }
 
 func init() {
@@ -87,11 +91,12 @@
 		KafkaAdapterHost:    default_KafkaAdapterHost,
 		KafkaAdapterPort:    default_KafkaAdapterPort,
 		KafkaClusterHost:    default_KafkaClusterHost,
-		KafkaClusterPort:    default_KafkaClusterPort,
-		KVStoreType:         default_KVStoreType,
+		KafkaClusterPort:     default_KafkaClusterPort,
+		KVStoreType:          default_KVStoreType,
 		KVStoreTimeout:      default_KVStoreTimeout,
 		KVStoreHost:         default_KVStoreHost,
 		KVStorePort:         default_KVStorePort,
+		KVStoreDataPrefix:   default_KVStoreDataPrefix,
 		KVTxnKeyDelTime:     default_KVTxnKeyDelTime,
 		CoreTopic:           default_CoreTopic,
 		LogLevel:            default_LogLevel,
@@ -99,7 +104,8 @@
 		RWCoreKey:           default_RWCoreKey,
 		RWCoreCert:          default_RWCoreCert,
 		RWCoreCA:            default_RWCoreCA,
-		AffinityRouterTopic: default_Affinity_Router_Topic,
+		AffinityRouterTopic: default_AffinityRouterTopic,
+		InCompetingMode:     default_InCompetingMode,
 	}
 	return &rwCoreFlag
 }
@@ -134,7 +140,10 @@
 	flag.StringVar(&(cf.CoreTopic), "rw_core_topic", default_CoreTopic, help)
 
 	help = fmt.Sprintf("Affinity Router topic")
-	flag.StringVar(&(cf.AffinityRouterTopic), "affinity_router_topic", default_Affinity_Router_Topic, help)
+	flag.StringVar(&(cf.AffinityRouterTopic), "affinity_router_topic", default_AffinityRouterTopic, help)
+
+	help = fmt.Sprintf("In competing Mode - two cores competing to handle a transaction ")
+	flag.BoolVar(&cf.InCompetingMode, "in_competing_mode", default_InCompetingMode, help)
 
 	help = fmt.Sprintf("KV store type")
 	flag.StringVar(&(cf.KVStoreType), "kv_store_type", default_KVStoreType, help)
@@ -151,6 +160,9 @@
 	help = fmt.Sprintf("The time to wait before deleting a completed transaction key")
 	flag.IntVar(&(cf.KVTxnKeyDelTime), "kv_txn_delete_time", default_KVTxnKeyDelTime, help)
 
+	help = fmt.Sprintf("KV store data prefix")
+	flag.StringVar(&(cf.KVStoreDataPrefix), "kv_store_data_prefix", default_KVStoreDataPrefix, help)
+
 	help = fmt.Sprintf("Log level")
 	flag.IntVar(&(cf.LogLevel), "log_level", default_LogLevel, help)
 
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 484ff35..c13face 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -66,7 +66,7 @@
 		Host:       cf.KVStoreHost,
 		Port:       cf.KVStorePort,
 		Timeout:    cf.KVStoreTimeout,
-		PathPrefix: "service/voltha"}
+		PathPrefix: cf.KVStoreDataPrefix}
 	core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, &backend)
 	core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, &backend)
 	core.clusterDataProxy = core.clusterDataRoot.CreateProxy("/", false)
@@ -76,11 +76,15 @@
 
 func (core *Core) Start(ctx context.Context) {
 	log.Info("starting-adaptercore", log.Fields{"coreId": core.instanceId})
-	core.startKafkaMessagingProxy(ctx)
+	if err := core.startKafkaMessagingProxy(ctx); err != nil {
+		log.Fatal("Failure-starting-kafkaMessagingProxy")
+	}
 	log.Info("values", log.Fields{"kmp": core.kmp})
 	core.deviceMgr = newDeviceManager(core.kmp, core.clusterDataProxy, core.instanceId)
 	core.logicalDeviceMgr = newLogicalDeviceManager(core.deviceMgr, core.kmp, core.clusterDataProxy)
-	core.registerAdapterRequestHandler(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.clusterDataProxy, core.localDataProxy)
+	if err := core.registerAdapterRequestHandler(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.clusterDataProxy, core.localDataProxy); err != nil {
+		log.Fatal("Failure-registering-adapterRequestHandler")
+	}
 	go core.startDeviceManager(ctx)
 	go core.startLogicalDeviceManager(ctx)
 	go core.startGRPCService(ctx)
@@ -106,7 +110,7 @@
 	core.grpcServer = grpcserver.NewGrpcServer(core.config.GrpcHost, core.config.GrpcPort, nil, false)
 	log.Info("grpc-server-created")
 
-	core.grpcNBIAPIHandler = NewAPIHandler(core.deviceMgr, core.logicalDeviceMgr)
+	core.grpcNBIAPIHandler = NewAPIHandler(core.deviceMgr, core.logicalDeviceMgr, core.config.InCompetingMode)
 	core.logicalDeviceMgr.setGrpcNbiHandler(core.grpcNBIAPIHandler)
 	//	Create a function to register the core GRPC service with the GRPC server
 	f := func(gs *grpc.Server) {
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index dd777c0..4c82471 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -35,7 +35,7 @@
 //TODO:  Move this Tag into the proto file
 const OF_CONTROLLER_TAG= "voltha_backend_name"
 
-const MAX_RESPONSE_TIME = 500 // milliseconds
+const MAX_RESPONSE_TIME = int64(500) // milliseconds
 
 const (
 	IMAGE_DOWNLOAD = iota
@@ -48,13 +48,15 @@
 	deviceMgr        *DeviceManager
 	logicalDeviceMgr *LogicalDeviceManager
 	packetInQueue    *queue.Queue
+	coreInCompetingMode bool
 	da.DefaultAPIHandler
 }
 
-func NewAPIHandler(deviceMgr *DeviceManager, lDeviceMgr *LogicalDeviceManager) *APIHandler {
+func NewAPIHandler(deviceMgr *DeviceManager, lDeviceMgr *LogicalDeviceManager, inCompetingMode bool) *APIHandler {
 	handler := &APIHandler{
 		deviceMgr:        deviceMgr,
 		logicalDeviceMgr: lDeviceMgr,
+		coreInCompetingMode:inCompetingMode,
 		// TODO: Figure out what the 'hint' parameter to queue.New does
 		packetInQueue: queue.New(10),
 	}
@@ -94,24 +96,35 @@
 // isOFControllerRequest is a helper function to determine if a request was initiated
 // from the OpenFlow controller (or its proxy, e.g. OFAgent)
 func isOFControllerRequest(ctx context.Context) bool {
-	var (
-		ok    bool
-		md    metadata.MD
-		value []string
-	)
-	if md, ok = metadata.FromIncomingContext(ctx); !ok {
-		// No metadata
-		return false
+	if md, ok := metadata.FromIncomingContext(ctx); ok {
+		// Metadata in context
+		if _, ok = md[OF_CONTROLLER_TAG]; ok {
+			// OFAgent field in metadata
+			return true
+		}
 	}
-	if value, ok = md[OF_CONTROLLER_TAG]; !ok {
-		// No OFAgent field in metadata
-		return false
+	return false
+}
+
+// competeForTransaction is a helper function to determine whether every request needs to compete with another
+// Core to execute the request
+func (handler *APIHandler) competeForTransaction() bool {
+	return handler.coreInCompetingMode
+}
+
+func (handler *APIHandler) acquireTransaction(ctx context.Context, maxTimeout ...int64) (*KVTransaction, error) {
+	timeout := MAX_RESPONSE_TIME
+	if len(maxTimeout) > 0 {
+		timeout = maxTimeout[0]
 	}
-	if value[0] == "" {
-		// OFAgent has not set a field value
-		return false
+	txn, err := handler.createKvTransaction(ctx)
+	if txn == nil {
+		return nil,  err
+	} else if txn.Acquired(timeout) {
+		return txn, nil
+	} else {
+		return nil, errors.New("failed-to-seize-request")
 	}
-	return true
 }
 
 // waitForNilResponseOnSuccess is a helper function to wait for a response on channel ch where an nil
@@ -152,16 +165,14 @@
 		return out, nil
 	}
 
-	if isOFControllerRequest(ctx) {
-		txn, err := handler.createKvTransaction(ctx)
-		if txn == nil {
+	if handler.competeForTransaction() {
+		if txn, err := handler.acquireTransaction(ctx); err != nil {
 			return new(empty.Empty), err
-		} else if txn.Acquired(MAX_RESPONSE_TIME) {
-			defer txn.Close() // Ensure active core signals "done" to standby
 		} else {
-			return new(empty.Empty), errors.New("failed-to-seize-request")
+			defer txn.Close()
 		}
 	}
+
 	ch := make(chan interface{})
 	defer close(ch)
 	go handler.logicalDeviceMgr.enableLogicalPort(ctx, id, ch)
@@ -175,16 +186,14 @@
 		return out, nil
 	}
 
-	if isOFControllerRequest(ctx) {
-		txn, err := handler.createKvTransaction(ctx)
-		if txn == nil {
+	if handler.competeForTransaction() {
+		if txn, err := handler.acquireTransaction(ctx); err != nil {
 			return new(empty.Empty), err
-		} else if txn.Acquired(MAX_RESPONSE_TIME) {
-			defer txn.Close() // Ensure active core signals "done" to standby
 		} else {
-			return new(empty.Empty), errors.New("failed-to-seize-request")
+			defer txn.Close()
 		}
 	}
+
 	ch := make(chan interface{})
 	defer close(ch)
 	go handler.logicalDeviceMgr.disableLogicalPort(ctx, id, ch)
@@ -198,16 +207,16 @@
 		return out, nil
 	}
 
-	if isOFControllerRequest(ctx) {
-		txn, err := handler.createKvTransaction(ctx)
-		if txn == nil {
-			return new(empty.Empty), err
-		} else if txn.Acquired(MAX_RESPONSE_TIME) {
-			defer txn.Close() // Ensure active core signals "done" to standby
-		} else {
-			return new(empty.Empty), errors.New("failed-to-seize-request")
+	if handler.competeForTransaction() {
+		if !isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
+			if txn, err := handler.acquireTransaction(ctx); err != nil {
+				return new(empty.Empty), err
+			} else {
+				defer txn.Close()
+			}
 		}
 	}
+
 	ch := make(chan interface{})
 	defer close(ch)
 	go handler.logicalDeviceMgr.updateFlowTable(ctx, flow.Id, flow.FlowMod, ch)
@@ -221,16 +230,16 @@
 		return out, nil
 	}
 
-	if isOFControllerRequest(ctx) {
-		txn, err := handler.createKvTransaction(ctx)
-		if txn == nil {
-			return new(empty.Empty), err
-		} else if txn.Acquired(MAX_RESPONSE_TIME) {
-			defer txn.Close() // Ensure active core signals "done" to standby
-		} else {
-			return new(empty.Empty), errors.New("failed-to-seize-request")
+	if handler.competeForTransaction() {
+		if !isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
+			if txn, err := handler.acquireTransaction(ctx); err != nil {
+				return new(empty.Empty), err
+			} else {
+				defer txn.Close()
+			}
 		}
 	}
+
 	ch := make(chan interface{})
 	defer close(ch)
 	go handler.logicalDeviceMgr.updateGroupTable(ctx, flow.Id, flow.GroupMod, ch)
@@ -267,16 +276,8 @@
 		return out, nil
 	}
 
-	if isOFControllerRequest(ctx) {
-		txn, err := handler.createKvTransaction(ctx)
-		if txn == nil {
-			return new(empty.Empty), err
-		} else if txn.Acquired(MAX_RESPONSE_TIME) {
-			defer txn.Close() // Ensure active core signals "done" to standby
-		} else {
-			return new(empty.Empty), errors.New("failed-to-seize-request")
-		}
-	}
+	// No need to grab a transaction as this request is core specific
+
 	ch := make(chan interface{})
 	defer close(ch)
 	go handler.deviceMgr.ReconcileDevices(ctx, ids, ch)
@@ -308,16 +309,14 @@
 		return &voltha.Device{Id: device.Id}, nil
 	}
 
-	if isOFControllerRequest(ctx) {
-		txn, err := handler.createKvTransaction(ctx)
-		if txn == nil {
+	if handler.competeForTransaction() {
+		if txn, err := handler.acquireTransaction(ctx); err != nil {
 			return &voltha.Device{}, err
-		} else if txn.Acquired(MAX_RESPONSE_TIME) {
-			defer txn.Close() // Ensure active core signals "done" to standby
 		} else {
-			return &voltha.Device{}, errors.New("failed-to-seize-request")
+			defer txn.Close()
 		}
 	}
+
 	ch := make(chan interface{})
 	defer close(ch)
 	go handler.deviceMgr.createDevice(ctx, device, ch)
@@ -347,16 +346,14 @@
 		return new(empty.Empty), nil
 	}
 
-	if isOFControllerRequest(ctx) {
-		txn, err := handler.createKvTransaction(ctx)
-		if txn == nil {
+	if handler.competeForTransaction() {
+		if txn, err := handler.acquireTransaction(ctx); err != nil {
 			return new(empty.Empty), err
-		} else if txn.Acquired(MAX_RESPONSE_TIME) {
-			defer txn.Close() // Ensure active core signals "done" to standby
 		} else {
-			return new(empty.Empty), errors.New("failed-to-seize-request")
+			defer txn.Close()
 		}
 	}
+
 	ch := make(chan interface{})
 	defer close(ch)
 	go handler.deviceMgr.enableDevice(ctx, id, ch)
@@ -370,16 +367,14 @@
 		return new(empty.Empty), nil
 	}
 
-	if isOFControllerRequest(ctx) {
-		txn, err := handler.createKvTransaction(ctx)
-		if txn == nil {
+	if handler.competeForTransaction() {
+		if txn, err := handler.acquireTransaction(ctx); err != nil {
 			return new(empty.Empty), err
-		} else if txn.Acquired(MAX_RESPONSE_TIME) {
-			defer txn.Close() // Ensure active core signals "done" to standby
 		} else {
-			return new(empty.Empty), errors.New("failed-to-seize-request")
+			defer txn.Close()
 		}
 	}
+
 	ch := make(chan interface{})
 	defer close(ch)
 	go handler.deviceMgr.disableDevice(ctx, id, ch)
@@ -393,16 +388,14 @@
 		return new(empty.Empty), nil
 	}
 
-	if isOFControllerRequest(ctx) {
-		txn, err := handler.createKvTransaction(ctx)
-		if txn == nil {
+	if handler.competeForTransaction() {
+		if txn, err := handler.acquireTransaction(ctx); err != nil {
 			return new(empty.Empty), err
-		} else if txn.Acquired(MAX_RESPONSE_TIME) {
-			defer txn.Close() // Ensure active core signals "done" to standby
 		} else {
-			return new(empty.Empty), errors.New("failed-to-seize-request")
+			defer txn.Close()
 		}
 	}
+
 	ch := make(chan interface{})
 	defer close(ch)
 	go handler.deviceMgr.rebootDevice(ctx, id, ch)
@@ -416,34 +409,20 @@
 		return new(empty.Empty), nil
 	}
 
-	if isOFControllerRequest(ctx) {
-		txn, err := handler.createKvTransaction(ctx)
-		if txn == nil {
+	if handler.competeForTransaction() {
+		if txn, err := handler.acquireTransaction(ctx); err != nil {
 			return new(empty.Empty), err
-		} else if txn.Acquired(MAX_RESPONSE_TIME) {
-			defer txn.Close() // Ensure active core signals "done" to standby
 		} else {
-			return new(empty.Empty), errors.New("failed-to-seize-request")
+			defer txn.Close()
 		}
 	}
+
 	ch := make(chan interface{})
 	defer close(ch)
 	go handler.deviceMgr.deleteDevice(ctx, id, ch)
 	return waitForNilResponseOnSuccess(ctx, ch)
 }
 
-func (handler *APIHandler) acquireTransaction(ctx context.Context) (*KVTransaction, error) {
-	txn, err := handler.createKvTransaction(ctx)
-	if txn == nil {
-		return nil,  err
-	} else if txn.Acquired(MAX_RESPONSE_TIME) {
-		return txn, nil
-	} else {
-		txn.Close()
-		return nil, errors.New("failed-to-seize-request")
-	}
-}
-
 // processImageRequest is a helper method to execute an image download request
 func (handler *APIHandler) processImageRequest(ctx context.Context, img *voltha.ImageDownload, requestType int) (*common.OperationResp, error) {
 	log.Debugw("processImageDownload", log.Fields{"img": *img, "requestType": requestType})
@@ -452,10 +431,12 @@
 		return resp, nil
 	}
 
-	if txn, err := handler.acquireTransaction(ctx); err != nil {
-		return &common.OperationResp{}, err
-	} else {
-		defer txn.Close()
+	if handler.competeForTransaction() {
+		if txn, err := handler.acquireTransaction(ctx); err != nil {
+			return &common.OperationResp{}, err
+		} else {
+			defer txn.Close()
+		}
 	}
 
 	failedresponse := &common.OperationResp{Code:voltha.OperationResp_OPERATION_FAILURE}
@@ -541,10 +522,12 @@
 
 	failedresponse := &voltha.ImageDownload{State: voltha.ImageDownload_DOWNLOAD_UNKNOWN}
 
-	if txn, err := handler.acquireTransaction(ctx); err != nil {
-		return  failedresponse, err
-	} else {
-		defer txn.Close()
+	if handler.competeForTransaction() {
+		if txn, err := handler.acquireTransaction(ctx); err != nil {
+			return failedresponse, err
+		} else {
+			defer txn.Close()
+		}
 	}
 
 	ch := make(chan interface{})
diff --git a/rw_core/main.go b/rw_core/main.go
index 336e731..2a29499 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -124,11 +124,14 @@
 	err := rw.setKVClient()
 	if err == nil {
 		// Setup KV transaction context
-		c.SetTransactionContext(rw.config.InstanceID,
-			"service/voltha/transactions/",
+		txnPrefix := rw.config.KVStoreDataPrefix + "/transactions/"
+		if err = c.SetTransactionContext(rw.config.InstanceID,
+			txnPrefix,
 			rw.kvClient,
 			rw.config.KVStoreTimeout,
-			rw.config.KVTxnKeyDelTime)
+			rw.config.KVTxnKeyDelTime); err != nil {
+			log.Fatal("creating-transaction-context-failed")
+		}
 	}
 
 	// Setup Kafka Client