VOL-4435 checks for parent device in reconcilation + flow timeout

Change-Id: I6de908454775d9c4ff98cf13682567241dd77ebb
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index ad885c2..4375b9b 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -45,6 +45,7 @@
 	RWCoreCA                    string
 	InternalTimeout             time.Duration
 	RPCTimeout                  time.Duration
+	FlowTimeout                 time.Duration
 	MaxConnectionRetries        int
 	ConnectionRetryInterval     time.Duration
 	LiveProbeInterval           time.Duration
@@ -114,6 +115,11 @@
 		5*time.Second,
 		"RPC timeout")
 
+	fs.DurationVar(&(cf.FlowTimeout), //Note flow time out will be considered for flows related rpc's not rpc timeout
+		"flow_timeout",
+		30*time.Second,
+		"Flow timeout")
+
 	fs.BoolVar(&cf.Banner,
 		"banner",
 		false,
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index ad7b523..818b976 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -69,6 +69,7 @@
 	requestQueue         *coreutils.RequestQueue
 	internalTimeout      time.Duration
 	rpcTimeout           time.Duration
+	flowTimeout          time.Duration
 	startOnce            sync.Once
 	stopOnce             sync.Once
 	stopped              bool
@@ -83,7 +84,7 @@
 }
 
 //newAgent creates a new device agent. The device will be initialized when start() is called.
-func newAgent(device *voltha.Device, deviceMgr *Manager, dbPath *model.Path, deviceProxy *model.Proxy, internalTimeout, rpcTimeout time.Duration) *Agent {
+func newAgent(device *voltha.Device, deviceMgr *Manager, dbPath *model.Path, deviceProxy *model.Proxy, internalTimeout, rpcTimeout, flowTimeout time.Duration) *Agent {
 	deviceID := device.Id
 	if deviceID == "" {
 		deviceID = coreutils.CreateDeviceID()
@@ -101,6 +102,7 @@
 		dbProxy:              deviceProxy,
 		internalTimeout:      internalTimeout,
 		rpcTimeout:           rpcTimeout,
+		flowTimeout:          flowTimeout,
 		device:               proto.Clone(device).(*voltha.Device),
 		requestQueue:         coreutils.NewRequestQueue(),
 		config:               deviceMgr.config,
@@ -465,7 +467,7 @@
 	if grpResponse, err = agent.addGroupsToAdapter(ctx, newGroups, flowMetadata); err != nil {
 		return err
 	}
-	if errs := coreutils.WaitForNilOrErrorResponses(agent.rpcTimeout, flwResponse, grpResponse); errs != nil {
+	if errs := coreutils.WaitForNilOrErrorResponses(agent.flowTimeout, flwResponse, grpResponse); errs != nil {
 		logger.Warnw(ctx, "adapter-response", log.Fields{"device-id": agent.deviceID, "result": errs})
 		return status.Errorf(codes.Aborted, "flow-failure-device-%s", agent.deviceID)
 	}
@@ -484,7 +486,7 @@
 		return err
 	}
 
-	if res := coreutils.WaitForNilOrErrorResponses(agent.rpcTimeout, flwResponse, grpResponse); res != nil {
+	if res := coreutils.WaitForNilOrErrorResponses(agent.flowTimeout, flwResponse, grpResponse); res != nil {
 		return status.Errorf(codes.Aborted, "errors-%s", res)
 	}
 	return nil
@@ -502,7 +504,7 @@
 		return err
 	}
 
-	if res := coreutils.WaitForNilOrErrorResponses(agent.rpcTimeout, flwResponse, grpResponse); res != nil {
+	if res := coreutils.WaitForNilOrErrorResponses(agent.flowTimeout, flwResponse, grpResponse); res != nil {
 		return status.Errorf(codes.Aborted, "errors-%s", res)
 	}
 	return nil
diff --git a/rw_core/core/device/agent_flow.go b/rw_core/core/device/agent_flow.go
index f53cb7d..cb32eff 100644
--- a/rw_core/core/device/agent_flow.go
+++ b/rw_core/core/device/agent_flow.go
@@ -161,7 +161,7 @@
 		response.Error(err)
 		return
 	}
-	subCtx, cancel := context.WithTimeout(ctx, agent.rpcTimeout)
+	subCtx, cancel := context.WithTimeout(ctx, agent.flowTimeout)
 	defer cancel()
 
 	if _, err = client.UpdateFlowsBulk(subCtx, &ca.BulkFlows{
@@ -203,7 +203,7 @@
 		response.Error(err)
 		return
 	}
-	subCtx, cancel := context.WithTimeout(ctx, agent.rpcTimeout)
+	subCtx, cancel := context.WithTimeout(ctx, agent.flowTimeout)
 	defer cancel()
 	if _, err = client.UpdateFlowsIncrementally(subCtx, &ca.IncrementalFlows{
 		Device:       device,
@@ -400,7 +400,7 @@
 	if err != nil {
 		return err
 	}
-	if res := coreutils.WaitForNilOrErrorResponses(agent.rpcTimeout, response); res != nil {
+	if res := coreutils.WaitForNilOrErrorResponses(agent.flowTimeout, response); res != nil {
 		return status.Errorf(codes.Aborted, "errors-%s", res)
 	}
 	return nil
diff --git a/rw_core/core/device/agent_test.go b/rw_core/core/device/agent_test.go
index 35142d3..5cc1f26 100755
--- a/rw_core/core/device/agent_test.go
+++ b/rw_core/core/device/agent_test.go
@@ -154,7 +154,7 @@
 func (dat *DATest) createDeviceAgent(t *testing.T) *Agent {
 	deviceMgr := dat.deviceMgr
 	clonedDevice := proto.Clone(dat.device).(*voltha.Device)
-	deviceAgent := newAgent(clonedDevice, deviceMgr, deviceMgr.dbPath, deviceMgr.dProxy, deviceMgr.internalTimeout, deviceMgr.rpcTimeout)
+	deviceAgent := newAgent(clonedDevice, deviceMgr, deviceMgr.dbPath, deviceMgr.dProxy, deviceMgr.internalTimeout, deviceMgr.rpcTimeout, deviceMgr.flowTimeout)
 	d, err := deviceAgent.start(context.TODO(), false, clonedDevice)
 	assert.Nil(t, err)
 	assert.NotNil(t, d)
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index 9f7e656..c8322a2 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -54,6 +54,7 @@
 	coreInstanceID          string
 	internalTimeout         time.Duration
 	rpcTimeout              time.Duration
+	flowTimeout             time.Duration
 	devicesLoadingLock      sync.RWMutex
 	deviceLoadingInProgress map[string][]chan int
 	config                  *config.RWCoreFlags
@@ -69,6 +70,7 @@
 		adapterMgr:              adapterMgr,
 		internalTimeout:         cf.InternalTimeout,
 		rpcTimeout:              cf.RPCTimeout,
+		flowTimeout:             cf.FlowTimeout,
 		Agent:                   event.NewAgent(eventProxy, coreInstanceID, cf.VolthaStackID),
 		deviceLoadingInProgress: make(map[string][]chan int),
 		config:                  cf,
@@ -111,7 +113,7 @@
 
 	for _, device := range devices {
 		// Create an agent for each device
-		agent := newAgent(device, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.internalTimeout, dMgr.rpcTimeout)
+		agent := newAgent(device, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.internalTimeout, dMgr.rpcTimeout, dMgr.flowTimeout)
 		if _, err := agent.start(ctx, true, device); err != nil {
 			logger.Warnw(ctx, "failure-starting-agent", log.Fields{"device-id": device.Id})
 		} else {
@@ -269,7 +271,7 @@
 			// Proceed with the loading only if the device exist in the Model (could have been deleted)
 			if device, err = dMgr.getDeviceFromModel(ctx, deviceID); err == nil {
 				logger.Debugw(ctx, "loading-device", log.Fields{"device-id": deviceID})
-				agent := newAgent(device, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.internalTimeout, dMgr.rpcTimeout)
+				agent := newAgent(device, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.internalTimeout, dMgr.rpcTimeout, dMgr.flowTimeout)
 				if _, err = agent.start(ctx, true, device); err != nil {
 					logger.Warnw(ctx, "failure-loading-device", log.Fields{"device-id": deviceID, "error": err})
 				} else {
@@ -470,6 +472,18 @@
 		if err := agent.canDeviceRequestProceed(ctx); err != nil {
 			return err
 		}
+		// Perform the same checks for parent device
+		if !agent.isRootDevice {
+			parentDeviceAgent := dMgr.getDeviceAgent(ctx, agent.parentID)
+			if parentDeviceAgent == nil {
+				logger.Errorw(ctx, "parent-device-adapter-nil", log.Fields{"parent-id": agent.parentID})
+				return status.Errorf(codes.Unavailable, "parent-device-adapter-nil-for-%s", deviceID)
+			}
+			if err := parentDeviceAgent.canDeviceRequestProceed(ctx); err != nil {
+				return err
+			}
+		}
+
 	}
 	if !ready {
 		return status.Error(codes.Unavailable, "adapter(s)-not-ready")
diff --git a/rw_core/core/device/manager_nbi.go b/rw_core/core/device/manager_nbi.go
index 3f23e2d..edefc54 100644
--- a/rw_core/core/device/manager_nbi.go
+++ b/rw_core/core/device/manager_nbi.go
@@ -54,7 +54,7 @@
 	// Ensure this device is set as root
 	device.Root = true
 	// Create and start a device agent for that device
-	agent := newAgent(device, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.internalTimeout, dMgr.rpcTimeout)
+	agent := newAgent(device, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.internalTimeout, dMgr.rpcTimeout, dMgr.flowTimeout)
 	device, err = agent.start(ctx, false, device)
 	if err != nil {
 		logger.Errorw(ctx, "fail-to-start-device", log.Fields{"device-id": agent.deviceID, "error": err})
diff --git a/rw_core/core/device/manager_sbi.go b/rw_core/core/device/manager_sbi.go
index 5530750..b518b2a 100644
--- a/rw_core/core/device/manager_sbi.go
+++ b/rw_core/core/device/manager_sbi.go
@@ -149,7 +149,7 @@
 	}
 
 	// Create and start a device agent for that device
-	agent := newAgent(childDevice, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.internalTimeout, dMgr.rpcTimeout)
+	agent := newAgent(childDevice, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.internalTimeout, dMgr.rpcTimeout, dMgr.flowTimeout)
 	insertedChildDevice, err := agent.start(ctx, false, childDevice)
 	if err != nil {
 		logger.Errorw(ctx, "error-starting-child-device", log.Fields{"parent-device-id": childDevice.ParentId, "child-device-id": agent.deviceID, "error": err})
diff --git a/rw_core/test/core_nbi_handler_multi_test.go b/rw_core/test/core_nbi_handler_multi_test.go
index 8dbbbfc..a8f9a0e 100755
--- a/rw_core/test/core_nbi_handler_multi_test.go
+++ b/rw_core/test/core_nbi_handler_multi_test.go
@@ -97,6 +97,7 @@
 	internalTimeout   time.Duration
 	maxTimeout        time.Duration
 	coreRPCTimeout    time.Duration
+	coreFlowTimeout   time.Duration
 	core              *c.Core
 	probe             *probe.Probe
 	oltAdaptersLock   sync.RWMutex
@@ -130,10 +131,12 @@
 	test.internalTimeout = 20 * time.Second
 	test.maxTimeout = 20 * time.Second
 	test.coreRPCTimeout = 20 * time.Second
+	test.coreFlowTimeout = 30 * time.Second
 	if loadTest {
 		test.internalTimeout = 100 * time.Second
 		test.maxTimeout = 300 * time.Second
 		test.coreRPCTimeout = 100 * time.Second
+		test.coreFlowTimeout = 120 * time.Second
 		setRetryInterval(5 * time.Second)
 	}
 	return test
@@ -145,6 +148,7 @@
 	cfg.ParseCommandArguments([]string{})
 	cfg.InternalTimeout = nb.internalTimeout
 	cfg.RPCTimeout = nb.coreRPCTimeout
+	cfg.FlowTimeout = nb.coreFlowTimeout
 	cfg.KVStoreAddress = "127.0.0.1" + ":" + strconv.Itoa(nb.kvClientPort)
 	cfg.LogLevel = "DEBUG"