[VOL-1429] This commit adds a flag to bypass the transaction processing
in single core instance. It also removes the hardcoded KV store
path prefix and put it as part of the config. Adding the ability for
a calling function to change the transaction timeout.
Change-Id: I8570b44b34db99b46410dafd58c6c5b86ea97b41
diff --git a/compose/rw_core.yml b/compose/rw_core.yml
index 8b164db..dba2544 100644
--- a/compose/rw_core.yml
+++ b/compose/rw_core.yml
@@ -29,6 +29,8 @@
- -kafka_cluster_host=${DOCKER_HOST_IP}
- -kafka_cluster_port=9092
- -rw_core_topic=rwcore
+ - -kv_store_data_prefix=service/voltha
+ - -in_competing_mode=false
- -log_level=0
ports:
- 50057:50057
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index f7ac794..fc819a1 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -26,33 +26,35 @@
const (
ConsulStoreName = "consul"
EtcdStoreName = "etcd"
- default_InstanceID = "rwcore001"
- default_GrpcPort = 50057
- default_GrpcHost = ""
- default_KafkaAdapterHost = "127.0.0.1"
- default_KafkaAdapterPort = 9092
- default_KafkaClusterHost = "127.0.0.1"
- default_KafkaClusterPort = 9094
- default_KVStoreType = EtcdStoreName
- default_KVStoreTimeout = 5 //in seconds
- default_KVStoreHost = "127.0.0.1"
- default_KVStorePort = 2379 // Consul = 8500; Etcd = 2379
- default_KVTxnKeyDelTime = 60
- default_LogLevel = 0
- default_Banner = false
- default_CoreTopic = "rwcore"
- default_RWCoreEndpoint = "rwcore"
- default_RWCoreKey = "pki/voltha.key"
- default_RWCoreCert = "pki/voltha.crt"
- default_RWCoreCA = "pki/voltha-CA.pem"
- default_Affinity_Router_Topic = "affinityRouter"
+ default_InstanceID = "rwcore001"
+ default_GrpcPort = 50057
+ default_GrpcHost = ""
+ default_KafkaAdapterHost = "127.0.0.1"
+ default_KafkaAdapterPort = 9092
+ default_KafkaClusterHost = "127.0.0.1"
+ default_KafkaClusterPort = 9094
+ default_KVStoreType = EtcdStoreName
+ default_KVStoreTimeout = 5 //in seconds
+ default_KVStoreHost = "127.0.0.1"
+ default_KVStorePort = 2379 // Consul = 8500; Etcd = 2379
+ default_KVTxnKeyDelTime = 60
+ default_KVStoreDataPrefix = "service/voltha"
+ default_LogLevel = 0
+ default_Banner = false
+ default_CoreTopic = "rwcore"
+ default_RWCoreEndpoint = "rwcore"
+ default_RWCoreKey = "pki/voltha.key"
+ default_RWCoreCert = "pki/voltha.crt"
+ default_RWCoreCA = "pki/voltha-CA.pem"
+ default_AffinityRouterTopic = "affinityRouter"
+ default_InCompetingMode = true
)
// RWCoreFlags represents the set of configurations used by the read-write core service
type RWCoreFlags struct {
// Command line parameters
- InstanceID string
- RWCoreEndpoint string
+ InstanceID string
+ RWCoreEndpoint string
GrpcHost string
GrpcPort int
KafkaAdapterHost string
@@ -64,6 +66,7 @@
KVStoreHost string
KVStorePort int
KVTxnKeyDelTime int
+ KVStoreDataPrefix string
CoreTopic string
LogLevel int
Banner bool
@@ -71,6 +74,7 @@
RWCoreCert string
RWCoreCA string
AffinityRouterTopic string
+ InCompetingMode bool
}
func init() {
@@ -87,11 +91,12 @@
KafkaAdapterHost: default_KafkaAdapterHost,
KafkaAdapterPort: default_KafkaAdapterPort,
KafkaClusterHost: default_KafkaClusterHost,
- KafkaClusterPort: default_KafkaClusterPort,
- KVStoreType: default_KVStoreType,
+ KafkaClusterPort: default_KafkaClusterPort,
+ KVStoreType: default_KVStoreType,
KVStoreTimeout: default_KVStoreTimeout,
KVStoreHost: default_KVStoreHost,
KVStorePort: default_KVStorePort,
+ KVStoreDataPrefix: default_KVStoreDataPrefix,
KVTxnKeyDelTime: default_KVTxnKeyDelTime,
CoreTopic: default_CoreTopic,
LogLevel: default_LogLevel,
@@ -99,7 +104,8 @@
RWCoreKey: default_RWCoreKey,
RWCoreCert: default_RWCoreCert,
RWCoreCA: default_RWCoreCA,
- AffinityRouterTopic: default_Affinity_Router_Topic,
+ AffinityRouterTopic: default_AffinityRouterTopic,
+ InCompetingMode: default_InCompetingMode,
}
return &rwCoreFlag
}
@@ -134,7 +140,10 @@
flag.StringVar(&(cf.CoreTopic), "rw_core_topic", default_CoreTopic, help)
help = fmt.Sprintf("Affinity Router topic")
- flag.StringVar(&(cf.AffinityRouterTopic), "affinity_router_topic", default_Affinity_Router_Topic, help)
+ flag.StringVar(&(cf.AffinityRouterTopic), "affinity_router_topic", default_AffinityRouterTopic, help)
+
+ help = fmt.Sprintf("In competing Mode - two cores competing to handle a transaction ")
+ flag.BoolVar(&cf.InCompetingMode, "in_competing_mode", default_InCompetingMode, help)
help = fmt.Sprintf("KV store type")
flag.StringVar(&(cf.KVStoreType), "kv_store_type", default_KVStoreType, help)
@@ -151,6 +160,9 @@
help = fmt.Sprintf("The time to wait before deleting a completed transaction key")
flag.IntVar(&(cf.KVTxnKeyDelTime), "kv_txn_delete_time", default_KVTxnKeyDelTime, help)
+ help = fmt.Sprintf("KV store data prefix")
+ flag.StringVar(&(cf.KVStoreDataPrefix), "kv_store_data_prefix", default_KVStoreDataPrefix, help)
+
help = fmt.Sprintf("Log level")
flag.IntVar(&(cf.LogLevel), "log_level", default_LogLevel, help)
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 484ff35..c13face 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -66,7 +66,7 @@
Host: cf.KVStoreHost,
Port: cf.KVStorePort,
Timeout: cf.KVStoreTimeout,
- PathPrefix: "service/voltha"}
+ PathPrefix: cf.KVStoreDataPrefix}
core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, &backend)
core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, &backend)
core.clusterDataProxy = core.clusterDataRoot.CreateProxy("/", false)
@@ -76,11 +76,15 @@
func (core *Core) Start(ctx context.Context) {
log.Info("starting-adaptercore", log.Fields{"coreId": core.instanceId})
- core.startKafkaMessagingProxy(ctx)
+ if err := core.startKafkaMessagingProxy(ctx); err != nil {
+ log.Fatal("Failure-starting-kafkaMessagingProxy")
+ }
log.Info("values", log.Fields{"kmp": core.kmp})
core.deviceMgr = newDeviceManager(core.kmp, core.clusterDataProxy, core.instanceId)
core.logicalDeviceMgr = newLogicalDeviceManager(core.deviceMgr, core.kmp, core.clusterDataProxy)
- core.registerAdapterRequestHandler(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.clusterDataProxy, core.localDataProxy)
+ if err := core.registerAdapterRequestHandler(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.clusterDataProxy, core.localDataProxy); err != nil {
+ log.Fatal("Failure-registering-adapterRequestHandler")
+ }
go core.startDeviceManager(ctx)
go core.startLogicalDeviceManager(ctx)
go core.startGRPCService(ctx)
@@ -106,7 +110,7 @@
core.grpcServer = grpcserver.NewGrpcServer(core.config.GrpcHost, core.config.GrpcPort, nil, false)
log.Info("grpc-server-created")
- core.grpcNBIAPIHandler = NewAPIHandler(core.deviceMgr, core.logicalDeviceMgr)
+ core.grpcNBIAPIHandler = NewAPIHandler(core.deviceMgr, core.logicalDeviceMgr, core.config.InCompetingMode)
core.logicalDeviceMgr.setGrpcNbiHandler(core.grpcNBIAPIHandler)
// Create a function to register the core GRPC service with the GRPC server
f := func(gs *grpc.Server) {
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index dd777c0..4c82471 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -35,7 +35,7 @@
//TODO: Move this Tag into the proto file
const OF_CONTROLLER_TAG= "voltha_backend_name"
-const MAX_RESPONSE_TIME = 500 // milliseconds
+const MAX_RESPONSE_TIME = int64(500) // milliseconds
const (
IMAGE_DOWNLOAD = iota
@@ -48,13 +48,15 @@
deviceMgr *DeviceManager
logicalDeviceMgr *LogicalDeviceManager
packetInQueue *queue.Queue
+ coreInCompetingMode bool
da.DefaultAPIHandler
}
-func NewAPIHandler(deviceMgr *DeviceManager, lDeviceMgr *LogicalDeviceManager) *APIHandler {
+func NewAPIHandler(deviceMgr *DeviceManager, lDeviceMgr *LogicalDeviceManager, inCompetingMode bool) *APIHandler {
handler := &APIHandler{
deviceMgr: deviceMgr,
logicalDeviceMgr: lDeviceMgr,
+ coreInCompetingMode:inCompetingMode,
// TODO: Figure out what the 'hint' parameter to queue.New does
packetInQueue: queue.New(10),
}
@@ -94,24 +96,35 @@
// isOFControllerRequest is a helper function to determine if a request was initiated
// from the OpenFlow controller (or its proxy, e.g. OFAgent)
func isOFControllerRequest(ctx context.Context) bool {
- var (
- ok bool
- md metadata.MD
- value []string
- )
- if md, ok = metadata.FromIncomingContext(ctx); !ok {
- // No metadata
- return false
+ if md, ok := metadata.FromIncomingContext(ctx); ok {
+ // Metadata in context
+ if _, ok = md[OF_CONTROLLER_TAG]; ok {
+ // OFAgent field in metadata
+ return true
+ }
}
- if value, ok = md[OF_CONTROLLER_TAG]; !ok {
- // No OFAgent field in metadata
- return false
+ return false
+}
+
+// competeForTransaction is a helper function to determine whether every request needs to compete with another
+// Core to execute the request
+func (handler *APIHandler) competeForTransaction() bool {
+ return handler.coreInCompetingMode
+}
+
+func (handler *APIHandler) acquireTransaction(ctx context.Context, maxTimeout ...int64) (*KVTransaction, error) {
+ timeout := MAX_RESPONSE_TIME
+ if len(maxTimeout) > 0 {
+ timeout = maxTimeout[0]
}
- if value[0] == "" {
- // OFAgent has not set a field value
- return false
+ txn, err := handler.createKvTransaction(ctx)
+ if txn == nil {
+ return nil, err
+ } else if txn.Acquired(timeout) {
+ return txn, nil
+ } else {
+ return nil, errors.New("failed-to-seize-request")
}
- return true
}
// waitForNilResponseOnSuccess is a helper function to wait for a response on channel ch where an nil
@@ -152,16 +165,14 @@
return out, nil
}
- if isOFControllerRequest(ctx) {
- txn, err := handler.createKvTransaction(ctx)
- if txn == nil {
+ if handler.competeForTransaction() {
+ if txn, err := handler.acquireTransaction(ctx); err != nil {
return new(empty.Empty), err
- } else if txn.Acquired(MAX_RESPONSE_TIME) {
- defer txn.Close() // Ensure active core signals "done" to standby
} else {
- return new(empty.Empty), errors.New("failed-to-seize-request")
+ defer txn.Close()
}
}
+
ch := make(chan interface{})
defer close(ch)
go handler.logicalDeviceMgr.enableLogicalPort(ctx, id, ch)
@@ -175,16 +186,14 @@
return out, nil
}
- if isOFControllerRequest(ctx) {
- txn, err := handler.createKvTransaction(ctx)
- if txn == nil {
+ if handler.competeForTransaction() {
+ if txn, err := handler.acquireTransaction(ctx); err != nil {
return new(empty.Empty), err
- } else if txn.Acquired(MAX_RESPONSE_TIME) {
- defer txn.Close() // Ensure active core signals "done" to standby
} else {
- return new(empty.Empty), errors.New("failed-to-seize-request")
+ defer txn.Close()
}
}
+
ch := make(chan interface{})
defer close(ch)
go handler.logicalDeviceMgr.disableLogicalPort(ctx, id, ch)
@@ -198,16 +207,16 @@
return out, nil
}
- if isOFControllerRequest(ctx) {
- txn, err := handler.createKvTransaction(ctx)
- if txn == nil {
- return new(empty.Empty), err
- } else if txn.Acquired(MAX_RESPONSE_TIME) {
- defer txn.Close() // Ensure active core signals "done" to standby
- } else {
- return new(empty.Empty), errors.New("failed-to-seize-request")
+ if handler.competeForTransaction() {
+ if !isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
+ if txn, err := handler.acquireTransaction(ctx); err != nil {
+ return new(empty.Empty), err
+ } else {
+ defer txn.Close()
+ }
}
}
+
ch := make(chan interface{})
defer close(ch)
go handler.logicalDeviceMgr.updateFlowTable(ctx, flow.Id, flow.FlowMod, ch)
@@ -221,16 +230,16 @@
return out, nil
}
- if isOFControllerRequest(ctx) {
- txn, err := handler.createKvTransaction(ctx)
- if txn == nil {
- return new(empty.Empty), err
- } else if txn.Acquired(MAX_RESPONSE_TIME) {
- defer txn.Close() // Ensure active core signals "done" to standby
- } else {
- return new(empty.Empty), errors.New("failed-to-seize-request")
+ if handler.competeForTransaction() {
+ if !isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
+ if txn, err := handler.acquireTransaction(ctx); err != nil {
+ return new(empty.Empty), err
+ } else {
+ defer txn.Close()
+ }
}
}
+
ch := make(chan interface{})
defer close(ch)
go handler.logicalDeviceMgr.updateGroupTable(ctx, flow.Id, flow.GroupMod, ch)
@@ -267,16 +276,8 @@
return out, nil
}
- if isOFControllerRequest(ctx) {
- txn, err := handler.createKvTransaction(ctx)
- if txn == nil {
- return new(empty.Empty), err
- } else if txn.Acquired(MAX_RESPONSE_TIME) {
- defer txn.Close() // Ensure active core signals "done" to standby
- } else {
- return new(empty.Empty), errors.New("failed-to-seize-request")
- }
- }
+ // No need to grab a transaction as this request is core specific
+
ch := make(chan interface{})
defer close(ch)
go handler.deviceMgr.ReconcileDevices(ctx, ids, ch)
@@ -308,16 +309,14 @@
return &voltha.Device{Id: device.Id}, nil
}
- if isOFControllerRequest(ctx) {
- txn, err := handler.createKvTransaction(ctx)
- if txn == nil {
+ if handler.competeForTransaction() {
+ if txn, err := handler.acquireTransaction(ctx); err != nil {
return &voltha.Device{}, err
- } else if txn.Acquired(MAX_RESPONSE_TIME) {
- defer txn.Close() // Ensure active core signals "done" to standby
} else {
- return &voltha.Device{}, errors.New("failed-to-seize-request")
+ defer txn.Close()
}
}
+
ch := make(chan interface{})
defer close(ch)
go handler.deviceMgr.createDevice(ctx, device, ch)
@@ -347,16 +346,14 @@
return new(empty.Empty), nil
}
- if isOFControllerRequest(ctx) {
- txn, err := handler.createKvTransaction(ctx)
- if txn == nil {
+ if handler.competeForTransaction() {
+ if txn, err := handler.acquireTransaction(ctx); err != nil {
return new(empty.Empty), err
- } else if txn.Acquired(MAX_RESPONSE_TIME) {
- defer txn.Close() // Ensure active core signals "done" to standby
} else {
- return new(empty.Empty), errors.New("failed-to-seize-request")
+ defer txn.Close()
}
}
+
ch := make(chan interface{})
defer close(ch)
go handler.deviceMgr.enableDevice(ctx, id, ch)
@@ -370,16 +367,14 @@
return new(empty.Empty), nil
}
- if isOFControllerRequest(ctx) {
- txn, err := handler.createKvTransaction(ctx)
- if txn == nil {
+ if handler.competeForTransaction() {
+ if txn, err := handler.acquireTransaction(ctx); err != nil {
return new(empty.Empty), err
- } else if txn.Acquired(MAX_RESPONSE_TIME) {
- defer txn.Close() // Ensure active core signals "done" to standby
} else {
- return new(empty.Empty), errors.New("failed-to-seize-request")
+ defer txn.Close()
}
}
+
ch := make(chan interface{})
defer close(ch)
go handler.deviceMgr.disableDevice(ctx, id, ch)
@@ -393,16 +388,14 @@
return new(empty.Empty), nil
}
- if isOFControllerRequest(ctx) {
- txn, err := handler.createKvTransaction(ctx)
- if txn == nil {
+ if handler.competeForTransaction() {
+ if txn, err := handler.acquireTransaction(ctx); err != nil {
return new(empty.Empty), err
- } else if txn.Acquired(MAX_RESPONSE_TIME) {
- defer txn.Close() // Ensure active core signals "done" to standby
} else {
- return new(empty.Empty), errors.New("failed-to-seize-request")
+ defer txn.Close()
}
}
+
ch := make(chan interface{})
defer close(ch)
go handler.deviceMgr.rebootDevice(ctx, id, ch)
@@ -416,34 +409,20 @@
return new(empty.Empty), nil
}
- if isOFControllerRequest(ctx) {
- txn, err := handler.createKvTransaction(ctx)
- if txn == nil {
+ if handler.competeForTransaction() {
+ if txn, err := handler.acquireTransaction(ctx); err != nil {
return new(empty.Empty), err
- } else if txn.Acquired(MAX_RESPONSE_TIME) {
- defer txn.Close() // Ensure active core signals "done" to standby
} else {
- return new(empty.Empty), errors.New("failed-to-seize-request")
+ defer txn.Close()
}
}
+
ch := make(chan interface{})
defer close(ch)
go handler.deviceMgr.deleteDevice(ctx, id, ch)
return waitForNilResponseOnSuccess(ctx, ch)
}
-func (handler *APIHandler) acquireTransaction(ctx context.Context) (*KVTransaction, error) {
- txn, err := handler.createKvTransaction(ctx)
- if txn == nil {
- return nil, err
- } else if txn.Acquired(MAX_RESPONSE_TIME) {
- return txn, nil
- } else {
- txn.Close()
- return nil, errors.New("failed-to-seize-request")
- }
-}
-
// processImageRequest is a helper method to execute an image download request
func (handler *APIHandler) processImageRequest(ctx context.Context, img *voltha.ImageDownload, requestType int) (*common.OperationResp, error) {
log.Debugw("processImageDownload", log.Fields{"img": *img, "requestType": requestType})
@@ -452,10 +431,12 @@
return resp, nil
}
- if txn, err := handler.acquireTransaction(ctx); err != nil {
- return &common.OperationResp{}, err
- } else {
- defer txn.Close()
+ if handler.competeForTransaction() {
+ if txn, err := handler.acquireTransaction(ctx); err != nil {
+ return &common.OperationResp{}, err
+ } else {
+ defer txn.Close()
+ }
}
failedresponse := &common.OperationResp{Code:voltha.OperationResp_OPERATION_FAILURE}
@@ -541,10 +522,12 @@
failedresponse := &voltha.ImageDownload{State: voltha.ImageDownload_DOWNLOAD_UNKNOWN}
- if txn, err := handler.acquireTransaction(ctx); err != nil {
- return failedresponse, err
- } else {
- defer txn.Close()
+ if handler.competeForTransaction() {
+ if txn, err := handler.acquireTransaction(ctx); err != nil {
+ return failedresponse, err
+ } else {
+ defer txn.Close()
+ }
}
ch := make(chan interface{})
diff --git a/rw_core/main.go b/rw_core/main.go
index 336e731..2a29499 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -124,11 +124,14 @@
err := rw.setKVClient()
if err == nil {
// Setup KV transaction context
- c.SetTransactionContext(rw.config.InstanceID,
- "service/voltha/transactions/",
+ txnPrefix := rw.config.KVStoreDataPrefix + "/transactions/"
+ if err = c.SetTransactionContext(rw.config.InstanceID,
+ txnPrefix,
rw.kvClient,
rw.config.KVStoreTimeout,
- rw.config.KVTxnKeyDelTime)
+ rw.config.KVTxnKeyDelTime); err != nil {
+ log.Fatal("creating-transaction-context-failed")
+ }
}
// Setup Kafka Client