VOL-4435 checks for parent device in reconciliation + flow timeout
Change-Id: I6de908454775d9c4ff98cf13682567241dd77ebb
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index ad885c2..4375b9b 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -45,6 +45,7 @@
RWCoreCA string
InternalTimeout time.Duration
RPCTimeout time.Duration
+ FlowTimeout time.Duration
MaxConnectionRetries int
ConnectionRetryInterval time.Duration
LiveProbeInterval time.Duration
@@ -114,6 +115,11 @@
5*time.Second,
"RPC timeout")
+ fs.DurationVar(&(cf.FlowTimeout), // Note: flow-related RPCs use this flow timeout instead of the RPC timeout
+ "flow_timeout",
+ 30*time.Second,
+ "Flow timeout")
+
fs.BoolVar(&cf.Banner,
"banner",
false,
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index ad7b523..818b976 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -69,6 +69,7 @@
requestQueue *coreutils.RequestQueue
internalTimeout time.Duration
rpcTimeout time.Duration
+ flowTimeout time.Duration
startOnce sync.Once
stopOnce sync.Once
stopped bool
@@ -83,7 +84,7 @@
}
//newAgent creates a new device agent. The device will be initialized when start() is called.
-func newAgent(device *voltha.Device, deviceMgr *Manager, dbPath *model.Path, deviceProxy *model.Proxy, internalTimeout, rpcTimeout time.Duration) *Agent {
+func newAgent(device *voltha.Device, deviceMgr *Manager, dbPath *model.Path, deviceProxy *model.Proxy, internalTimeout, rpcTimeout, flowTimeout time.Duration) *Agent {
deviceID := device.Id
if deviceID == "" {
deviceID = coreutils.CreateDeviceID()
@@ -101,6 +102,7 @@
dbProxy: deviceProxy,
internalTimeout: internalTimeout,
rpcTimeout: rpcTimeout,
+ flowTimeout: flowTimeout,
device: proto.Clone(device).(*voltha.Device),
requestQueue: coreutils.NewRequestQueue(),
config: deviceMgr.config,
@@ -465,7 +467,7 @@
if grpResponse, err = agent.addGroupsToAdapter(ctx, newGroups, flowMetadata); err != nil {
return err
}
- if errs := coreutils.WaitForNilOrErrorResponses(agent.rpcTimeout, flwResponse, grpResponse); errs != nil {
+ if errs := coreutils.WaitForNilOrErrorResponses(agent.flowTimeout, flwResponse, grpResponse); errs != nil {
logger.Warnw(ctx, "adapter-response", log.Fields{"device-id": agent.deviceID, "result": errs})
return status.Errorf(codes.Aborted, "flow-failure-device-%s", agent.deviceID)
}
@@ -484,7 +486,7 @@
return err
}
- if res := coreutils.WaitForNilOrErrorResponses(agent.rpcTimeout, flwResponse, grpResponse); res != nil {
+ if res := coreutils.WaitForNilOrErrorResponses(agent.flowTimeout, flwResponse, grpResponse); res != nil {
return status.Errorf(codes.Aborted, "errors-%s", res)
}
return nil
@@ -502,7 +504,7 @@
return err
}
- if res := coreutils.WaitForNilOrErrorResponses(agent.rpcTimeout, flwResponse, grpResponse); res != nil {
+ if res := coreutils.WaitForNilOrErrorResponses(agent.flowTimeout, flwResponse, grpResponse); res != nil {
return status.Errorf(codes.Aborted, "errors-%s", res)
}
return nil
diff --git a/rw_core/core/device/agent_flow.go b/rw_core/core/device/agent_flow.go
index f53cb7d..cb32eff 100644
--- a/rw_core/core/device/agent_flow.go
+++ b/rw_core/core/device/agent_flow.go
@@ -161,7 +161,7 @@
response.Error(err)
return
}
- subCtx, cancel := context.WithTimeout(ctx, agent.rpcTimeout)
+ subCtx, cancel := context.WithTimeout(ctx, agent.flowTimeout)
defer cancel()
if _, err = client.UpdateFlowsBulk(subCtx, &ca.BulkFlows{
@@ -203,7 +203,7 @@
response.Error(err)
return
}
- subCtx, cancel := context.WithTimeout(ctx, agent.rpcTimeout)
+ subCtx, cancel := context.WithTimeout(ctx, agent.flowTimeout)
defer cancel()
if _, err = client.UpdateFlowsIncrementally(subCtx, &ca.IncrementalFlows{
Device: device,
@@ -400,7 +400,7 @@
if err != nil {
return err
}
- if res := coreutils.WaitForNilOrErrorResponses(agent.rpcTimeout, response); res != nil {
+ if res := coreutils.WaitForNilOrErrorResponses(agent.flowTimeout, response); res != nil {
return status.Errorf(codes.Aborted, "errors-%s", res)
}
return nil
diff --git a/rw_core/core/device/agent_test.go b/rw_core/core/device/agent_test.go
index 35142d3..5cc1f26 100755
--- a/rw_core/core/device/agent_test.go
+++ b/rw_core/core/device/agent_test.go
@@ -154,7 +154,7 @@
func (dat *DATest) createDeviceAgent(t *testing.T) *Agent {
deviceMgr := dat.deviceMgr
clonedDevice := proto.Clone(dat.device).(*voltha.Device)
- deviceAgent := newAgent(clonedDevice, deviceMgr, deviceMgr.dbPath, deviceMgr.dProxy, deviceMgr.internalTimeout, deviceMgr.rpcTimeout)
+ deviceAgent := newAgent(clonedDevice, deviceMgr, deviceMgr.dbPath, deviceMgr.dProxy, deviceMgr.internalTimeout, deviceMgr.rpcTimeout, deviceMgr.flowTimeout)
d, err := deviceAgent.start(context.TODO(), false, clonedDevice)
assert.Nil(t, err)
assert.NotNil(t, d)
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index 9f7e656..c8322a2 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -54,6 +54,7 @@
coreInstanceID string
internalTimeout time.Duration
rpcTimeout time.Duration
+ flowTimeout time.Duration
devicesLoadingLock sync.RWMutex
deviceLoadingInProgress map[string][]chan int
config *config.RWCoreFlags
@@ -69,6 +70,7 @@
adapterMgr: adapterMgr,
internalTimeout: cf.InternalTimeout,
rpcTimeout: cf.RPCTimeout,
+ flowTimeout: cf.FlowTimeout,
Agent: event.NewAgent(eventProxy, coreInstanceID, cf.VolthaStackID),
deviceLoadingInProgress: make(map[string][]chan int),
config: cf,
@@ -111,7 +113,7 @@
for _, device := range devices {
// Create an agent for each device
- agent := newAgent(device, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.internalTimeout, dMgr.rpcTimeout)
+ agent := newAgent(device, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.internalTimeout, dMgr.rpcTimeout, dMgr.flowTimeout)
if _, err := agent.start(ctx, true, device); err != nil {
logger.Warnw(ctx, "failure-starting-agent", log.Fields{"device-id": device.Id})
} else {
@@ -269,7 +271,7 @@
// Proceed with the loading only if the device exist in the Model (could have been deleted)
if device, err = dMgr.getDeviceFromModel(ctx, deviceID); err == nil {
logger.Debugw(ctx, "loading-device", log.Fields{"device-id": deviceID})
- agent := newAgent(device, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.internalTimeout, dMgr.rpcTimeout)
+ agent := newAgent(device, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.internalTimeout, dMgr.rpcTimeout, dMgr.flowTimeout)
if _, err = agent.start(ctx, true, device); err != nil {
logger.Warnw(ctx, "failure-loading-device", log.Fields{"device-id": deviceID, "error": err})
} else {
@@ -470,6 +472,18 @@
if err := agent.canDeviceRequestProceed(ctx); err != nil {
return err
}
+ // Perform the same checks for parent device
+ if !agent.isRootDevice {
+ parentDeviceAgent := dMgr.getDeviceAgent(ctx, agent.parentID)
+ if parentDeviceAgent == nil {
+ logger.Errorw(ctx, "parent-device-adapter-nil", log.Fields{"parent-id": agent.parentID})
+ return status.Errorf(codes.Unavailable, "parent-device-adapter-nil-for-%s", deviceID)
+ }
+ if err := parentDeviceAgent.canDeviceRequestProceed(ctx); err != nil {
+ return err
+ }
+ }
+
}
if !ready {
return status.Error(codes.Unavailable, "adapter(s)-not-ready")
diff --git a/rw_core/core/device/manager_nbi.go b/rw_core/core/device/manager_nbi.go
index 3f23e2d..edefc54 100644
--- a/rw_core/core/device/manager_nbi.go
+++ b/rw_core/core/device/manager_nbi.go
@@ -54,7 +54,7 @@
// Ensure this device is set as root
device.Root = true
// Create and start a device agent for that device
- agent := newAgent(device, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.internalTimeout, dMgr.rpcTimeout)
+ agent := newAgent(device, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.internalTimeout, dMgr.rpcTimeout, dMgr.flowTimeout)
device, err = agent.start(ctx, false, device)
if err != nil {
logger.Errorw(ctx, "fail-to-start-device", log.Fields{"device-id": agent.deviceID, "error": err})
diff --git a/rw_core/core/device/manager_sbi.go b/rw_core/core/device/manager_sbi.go
index 5530750..b518b2a 100644
--- a/rw_core/core/device/manager_sbi.go
+++ b/rw_core/core/device/manager_sbi.go
@@ -149,7 +149,7 @@
}
// Create and start a device agent for that device
- agent := newAgent(childDevice, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.internalTimeout, dMgr.rpcTimeout)
+ agent := newAgent(childDevice, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.internalTimeout, dMgr.rpcTimeout, dMgr.flowTimeout)
insertedChildDevice, err := agent.start(ctx, false, childDevice)
if err != nil {
logger.Errorw(ctx, "error-starting-child-device", log.Fields{"parent-device-id": childDevice.ParentId, "child-device-id": agent.deviceID, "error": err})
diff --git a/rw_core/test/core_nbi_handler_multi_test.go b/rw_core/test/core_nbi_handler_multi_test.go
index 8dbbbfc..a8f9a0e 100755
--- a/rw_core/test/core_nbi_handler_multi_test.go
+++ b/rw_core/test/core_nbi_handler_multi_test.go
@@ -97,6 +97,7 @@
internalTimeout time.Duration
maxTimeout time.Duration
coreRPCTimeout time.Duration
+ coreFlowTimeout time.Duration
core *c.Core
probe *probe.Probe
oltAdaptersLock sync.RWMutex
@@ -130,10 +131,12 @@
test.internalTimeout = 20 * time.Second
test.maxTimeout = 20 * time.Second
test.coreRPCTimeout = 20 * time.Second
+ test.coreFlowTimeout = 30 * time.Second
if loadTest {
test.internalTimeout = 100 * time.Second
test.maxTimeout = 300 * time.Second
test.coreRPCTimeout = 100 * time.Second
+ test.coreFlowTimeout = 120 * time.Second
setRetryInterval(5 * time.Second)
}
return test
@@ -145,6 +148,7 @@
cfg.ParseCommandArguments([]string{})
cfg.InternalTimeout = nb.internalTimeout
cfg.RPCTimeout = nb.coreRPCTimeout
+ cfg.FlowTimeout = nb.coreFlowTimeout
cfg.KVStoreAddress = "127.0.0.1" + ":" + strconv.Itoa(nb.kvClientPort)
cfg.LogLevel = "DEBUG"