[VOL-1588] Improve Flow Add performance
This update consists of the following:
1) Improve the performance of adding a flow to a logical device,
decomposing the flow into parent and child device flows, and sending
those flows to the adapters.
2) Format a number of files with gofmt.
3) Ensure the device graph cache is updated when a new port belonging
to a device that is already in the cache is added to the graph.
The flow update/deletion performance will be addressed in a separate
commit.
Change-Id: I2eb663cc73eef9fc6172203ed88a35726f5fe008
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index 6532b6e..8ff3f04 100755
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -24,8 +24,8 @@
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/rw_core/utils"
"github.com/opencord/voltha-protos/go/common"
- "github.com/opencord/voltha-protos/go/openflow_13"
"github.com/opencord/voltha-protos/go/omci"
+ "github.com/opencord/voltha-protos/go/openflow_13"
"github.com/opencord/voltha-protos/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
@@ -35,38 +35,37 @@
)
const (
- IMAGE_DOWNLOAD = iota
- CANCEL_IMAGE_DOWNLOAD = iota
- ACTIVATE_IMAGE = iota
- REVERT_IMAGE = iota
+ IMAGE_DOWNLOAD = iota
+ CANCEL_IMAGE_DOWNLOAD = iota
+ ACTIVATE_IMAGE = iota
+ REVERT_IMAGE = iota
)
-
type APIHandler struct {
- deviceMgr *DeviceManager
- logicalDeviceMgr *LogicalDeviceManager
- adapterMgr *AdapterManager
- packetInQueue *queue.Queue
- changeEventQueue *queue.Queue
- coreInCompetingMode bool
+ deviceMgr *DeviceManager
+ logicalDeviceMgr *LogicalDeviceManager
+ adapterMgr *AdapterManager
+ packetInQueue *queue.Queue
+ changeEventQueue *queue.Queue
+ coreInCompetingMode bool
longRunningRequestTimeout int64
- defaultRequestTimeout int64
+ defaultRequestTimeout int64
da.DefaultAPIHandler
core *Core
}
func NewAPIHandler(core *Core) *APIHandler {
handler := &APIHandler{
- deviceMgr: core.deviceMgr,
- logicalDeviceMgr: core.logicalDeviceMgr,
- adapterMgr: core.adapterMgr,
- coreInCompetingMode: core.config.InCompetingMode,
- longRunningRequestTimeout:core.config.LongRunningRequestTimeout,
- defaultRequestTimeout:core.config.DefaultRequestTimeout,
+ deviceMgr: core.deviceMgr,
+ logicalDeviceMgr: core.logicalDeviceMgr,
+ adapterMgr: core.adapterMgr,
+ coreInCompetingMode: core.config.InCompetingMode,
+ longRunningRequestTimeout: core.config.LongRunningRequestTimeout,
+ defaultRequestTimeout: core.config.DefaultRequestTimeout,
// TODO: Figure out what the 'hint' parameter to queue.New does
- packetInQueue: queue.New(10),
+ packetInQueue: queue.New(10),
changeEventQueue: queue.New(10),
- core: core,
+ core: core,
}
return handler
}
@@ -92,7 +91,7 @@
} else if serNum, ok = md["voltha_serial_number"]; !ok {
err = errors.New("serial-number-not-found")
}
- if !ok {
+ if !ok || serNum == nil {
log.Error(err)
return nil, err
}
@@ -129,7 +128,7 @@
log.Debugw("transaction-timeout", log.Fields{"timeout": timeout})
txn, err := handler.createKvTransaction(ctx)
if txn == nil {
- return nil, err
+ return nil, err
} else if txn.Acquired(timeout) {
return txn, nil
} else {
@@ -160,7 +159,7 @@
log.Debugw("transaction-timeout", log.Fields{"timeout": timeout})
txn, err := handler.createKvTransaction(ctx)
if txn == nil {
- return nil, err
+ return nil, err
}
owned := false
@@ -213,7 +212,6 @@
return out, nil
}
-
func (handler *APIHandler) UpdateMembership(ctx context.Context, membership *voltha.Membership) (*empty.Empty, error) {
log.Debugw("UpdateMembership-request", log.Fields{"membership": membership})
out := new(empty.Empty)
@@ -231,7 +229,6 @@
return &voltha.Membership{}, nil
}
-
func (handler *APIHandler) EnableLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*empty.Empty, error) {
log.Debugw("EnableLogicalDevicePort-request", log.Fields{"id": id, "test": common.TestModeKeys_api_test.String()})
if isTestMode(ctx) {
@@ -240,7 +237,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id:id.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id: id.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -261,7 +258,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id:id.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id: id.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -283,7 +280,7 @@
if handler.competeForTransaction() {
if !handler.isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
- if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id:flow.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id: flow.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -306,7 +303,7 @@
if handler.competeForTransaction() {
if !handler.isOFControllerRequest(ctx) { // No need to acquire the transaction as request is sent to one core only
- if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id:flow.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &utils.LogicalDeviceID{Id: flow.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -370,7 +367,6 @@
return handler.logicalDeviceMgr.listLogicalDevices()
}
-
// ListAdapters returns the contents of all adapters known to the system
func (handler *APIHandler) ListAdapters(ctx context.Context, empty *empty.Empty) (*voltha.Adapters, error) {
log.Debug("ListDevices")
@@ -408,7 +404,7 @@
return &voltha.Device{}, err
}
if d, ok := res.(*voltha.Device); ok {
- handler.core.deviceOwnership.OwnedByMe(&utils.DeviceID{Id:d.Id})
+ handler.core.deviceOwnership.OwnedByMe(&utils.DeviceID{Id: d.Id})
return d, nil
}
}
@@ -429,7 +425,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id:id.Id}, handler.longRunningRequestTimeout); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id: id.Id}, handler.longRunningRequestTimeout); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -450,7 +446,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id:id.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id: id.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -471,7 +467,7 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id:id.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id: id.Id}); err != nil {
return new(empty.Empty), err
} else {
defer txn.Close()
@@ -514,14 +510,14 @@
}
if handler.competeForTransaction() {
- if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id:img.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id: img.Id}); err != nil {
return &common.OperationResp{}, err
} else {
defer txn.Close()
}
}
- failedresponse := &common.OperationResp{Code:voltha.OperationResp_OPERATION_FAILURE}
+ failedresponse := &common.OperationResp{Code: voltha.OperationResp_OPERATION_FAILURE}
ch := make(chan interface{})
defer close(ch)
@@ -605,7 +601,7 @@
failedresponse := &voltha.ImageDownload{DownloadState: voltha.ImageDownload_DOWNLOAD_UNKNOWN}
if handler.competeForTransaction() {
- if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id:img.Id}); err != nil {
+ if txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{Id: img.Id}); err != nil {
return failedresponse, err
} else {
defer txn.Close()
@@ -651,15 +647,15 @@
func (handler *APIHandler) ListImageDownloads(ctx context.Context, id *voltha.ID) (*voltha.ImageDownloads, error) {
log.Debugw("ListImageDownloads-request", log.Fields{"deviceId": id.Id})
if isTestMode(ctx) {
- resp := &voltha.ImageDownloads{Items:[]*voltha.ImageDownload{}}
+ resp := &voltha.ImageDownloads{Items: []*voltha.ImageDownload{}}
return resp, nil
}
if downloads, err := handler.deviceMgr.listImageDownloads(ctx, id.Id); err != nil {
failedResp := &voltha.ImageDownloads{
- Items:[]*voltha.ImageDownload{
- &voltha.ImageDownload{DownloadState: voltha.ImageDownload_DOWNLOAD_UNKNOWN},
- },
+ Items: []*voltha.ImageDownload{
+ {DownloadState: voltha.ImageDownload_DOWNLOAD_UNKNOWN},
+ },
}
return failedResp, err
} else {
@@ -667,7 +663,6 @@
}
}
-
func (handler *APIHandler) UpdateDevicePmConfigs(ctx context.Context, configs *voltha.PmConfigs) (*empty.Empty, error) {
log.Debugw("UpdateDevicePmConfigs-request", log.Fields{"configs": *configs})
if isTestMode(ctx) {
@@ -819,7 +814,7 @@
//@TODO useless stub, what should this actually do?
func (handler *APIHandler) GetMeterStatsOfLogicalDevice(
- ctx context.Context,
+ ctx context.Context,
in *common.ID,
) (*openflow_13.MeterStatsReply, error) {
log.Debug("GetMeterStatsOfLogicalDevice-stub")
@@ -828,8 +823,8 @@
//@TODO useless stub, what should this actually do?
func (handler *APIHandler) GetMibDeviceData(
- ctx context.Context,
- in *common.ID,
+ ctx context.Context,
+ in *common.ID,
) (*omci.MibDeviceData, error) {
log.Debug("GetMibDeviceData-stub")
return nil, nil