VOL-3503 Add a device operational state of RECONCILING

Change-Id: I55dad67a24acdfac0af9448e6f19ec9d35edc39e
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index dd64ab9..2c77186 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -19,6 +19,7 @@
 import (
 	"context"
 	"errors"
+	"github.com/opencord/voltha-go/rw_core/config"
 	"sync"
 	"time"
 
@@ -58,31 +59,33 @@
 	defaultTimeout          time.Duration
 	devicesLoadingLock      sync.RWMutex
 	deviceLoadingInProgress map[string][]chan int
+	config                  *config.RWCoreFlags
 }
 
 //NewManagers creates the Manager and the Logical Manager.
-func NewManagers(dbPath *model.Path, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, coreTopic, coreInstanceID string, defaultCoreTimeout time.Duration, eventProxy *events.EventProxy, stackID string) (*Manager, *LogicalManager) {
+func NewManagers(dbPath *model.Path, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, cf *config.RWCoreFlags, coreInstanceID string, eventProxy *events.EventProxy) (*Manager, *LogicalManager) {
 	deviceMgr := &Manager{
 		rootDevices:             make(map[string]bool),
 		kafkaICProxy:            kmp,
-		adapterProxy:            remote.NewAdapterProxy(kmp, coreTopic, endpointMgr),
+		adapterProxy:            remote.NewAdapterProxy(kmp, cf.CoreTopic, endpointMgr),
 		coreInstanceID:          coreInstanceID,
 		dbPath:                  dbPath,
 		dProxy:                  dbPath.Proxy("devices"),
 		adapterMgr:              adapterMgr,
-		defaultTimeout:          defaultCoreTimeout,
-		Agent:                   event.NewAgent(eventProxy, coreInstanceID, stackID),
+		defaultTimeout:          cf.DefaultCoreTimeout,
+		Agent:                   event.NewAgent(eventProxy, coreInstanceID, cf.VolthaStackID),
 		deviceLoadingInProgress: make(map[string][]chan int),
+		config:                  cf,
 	}
 	deviceMgr.stateTransitions = state.NewTransitionMap(deviceMgr)
 
 	logicalDeviceMgr := &LogicalManager{
-		Manager:                        event.NewManager(eventProxy, coreInstanceID, stackID),
+		Manager:                        event.NewManager(eventProxy, coreInstanceID, cf.VolthaStackID),
 		deviceMgr:                      deviceMgr,
 		kafkaICProxy:                   kmp,
 		dbPath:                         dbPath,
 		ldProxy:                        dbPath.Proxy("logical_devices"),
-		defaultTimeout:                 defaultCoreTimeout,
+		defaultTimeout:                 cf.DefaultCoreTimeout,
 		logicalDeviceLoadingInProgress: make(map[string][]chan int),
 	}
 	deviceMgr.logicalDeviceMgr = logicalDeviceMgr
@@ -651,17 +654,6 @@
 	return &empty.Empty{}, nil
 }
 
-// isOkToReconcile validates whether a device is in the correct status to be reconciled
-func (dMgr *Manager) isOkToReconcile(ctx context.Context, device *voltha.Device) bool {
-	if device == nil {
-		return false
-	}
-	if agent := dMgr.getDeviceAgent(ctx, device.Id); agent != nil {
-		return device.AdminState != voltha.AdminState_PREPROVISIONED && (!agent.isDeletionInProgress())
-	}
-	return false
-}
-
 // adapterRestarted is invoked whenever an adapter is restarted
 func (dMgr *Manager) adapterRestarted(ctx context.Context, adapter *voltha.Adapter) error {
 	logger.Debugw(ctx, "adapter-restarted", log.Fields{"adapter-id": adapter.Id, "vendor": adapter.Vendor,
@@ -673,7 +665,11 @@
 		return nil
 	}
 
-	responses := make([]utils.Response, 0)
+	if len(dMgr.rootDevices) == 0 {
+		logger.Debugw(ctx, "no-managed-device-to-reconcile", log.Fields{"adapter-id": adapter.Id})
+		return nil
+	}
+
 	for rootDeviceID := range dMgr.rootDevices {
 		dAgent := dMgr.getDeviceAgent(ctx, rootDeviceID)
 		if dAgent == nil {
@@ -691,9 +687,9 @@
 				continue
 			}
 			if isDeviceOwnedByService {
-				if dMgr.isOkToReconcile(ctx, rootDevice) {
+				if rootDevice.AdminState != voltha.AdminState_PREPROVISIONED {
 					logger.Debugw(ctx, "reconciling-root-device", log.Fields{"rootId": rootDevice.Id})
-					responses = append(responses, dMgr.sendReconcileDeviceRequest(ctx, rootDevice))
+					go dAgent.ReconcileDevice(ctx, rootDevice)
 				} else {
 					logger.Debugw(ctx, "not-reconciling-root-device", log.Fields{"rootId": rootDevice.Id, "state": rootDevice.AdminState})
 				}
@@ -708,9 +704,9 @@
 								logger.Warnw(ctx, "is-device-owned-by-service", log.Fields{"error": err, "child-device-id": childDevice.Id, "adapter-type": adapter.Type, "replica-number": adapter.CurrentReplica})
 							}
 							if isDeviceOwnedByService {
-								if dMgr.isOkToReconcile(ctx, childDevice) {
+								if childDevice.AdminState != voltha.AdminState_PREPROVISIONED {
 									logger.Debugw(ctx, "reconciling-child-device", log.Fields{"child-device-id": childDevice.Id})
-									responses = append(responses, dMgr.sendReconcileDeviceRequest(ctx, childDevice))
+									go dAgent.ReconcileDevice(ctx, childDevice)
 								} else {
 									logger.Debugw(ctx, "not-reconciling-child-device", log.Fields{"child-device-id": childDevice.Id, "state": childDevice.AdminState})
 								}
@@ -725,54 +721,25 @@
 			}
 		}
 	}
-	if len(responses) > 0 {
-		// Wait for completion
-		if res := utils.WaitForNilOrErrorResponses(dMgr.defaultTimeout, responses...); res != nil {
-			return status.Errorf(codes.Aborted, "errors-%s", res)
-		}
-	} else {
-		logger.Debugw(ctx, "no-managed-device-to-reconcile", log.Fields{"adapter-id": adapter.Id})
-	}
+	logger.Debugw(ctx, "Reconciling for device on adapter restart is initiated", log.Fields{"adapter-id": adapter.Id})
+
 	return nil
 }
 
-func (dMgr *Manager) sendReconcileDeviceRequest(ctx context.Context, device *voltha.Device) utils.Response {
-	// Send a reconcile request to the adapter. Since this Core may not be managing this device then there is no
-	// point of creating a device agent (if the device is not being managed by this Core) before sending the request
-	// to the adapter.   We will therefore bypass the adapter adapter and send the request directly to the adapter via
-	// the adapter proxy.
-	response := utils.NewResponse()
-	ch, err := dMgr.adapterProxy.ReconcileDevice(ctx, device)
-	if err != nil {
-		response.Error(err)
-	}
-	// Wait for adapter response in its own routine
-	go func() {
-		resp, ok := <-ch
-		if !ok {
-			response.Error(status.Errorf(codes.Aborted, "channel-closed-device: %s", device.Id))
-		} else if resp.Err != nil {
-			response.Error(resp.Err)
-		}
-		response.Done()
-	}()
-	return response
-}
-
 func (dMgr *Manager) ReconcileChildDevices(ctx context.Context, parentDeviceID string) error {
+	dAgent := dMgr.getDeviceAgent(ctx, parentDeviceID)
+	if dAgent == nil {
+		return status.Errorf(codes.NotFound, "error-unable to get agent from device")
+	}
 	if parentDevicePorts, err := dMgr.listDevicePorts(ctx, parentDeviceID); err == nil {
-		responses := make([]utils.Response, 0)
 		for _, port := range parentDevicePorts {
 			for _, peer := range port.Peers {
 				if childDevice, err := dMgr.getDeviceFromModel(ctx, peer.DeviceId); err == nil {
-					responses = append(responses, dMgr.sendReconcileDeviceRequest(ctx, childDevice))
+					go dAgent.ReconcileDevice(ctx, childDevice)
 				}
 			}
 		}
-		// Wait for completion
-		if res := utils.WaitForNilOrErrorResponses(dMgr.defaultTimeout, responses...); res != nil {
-			return status.Errorf(codes.Aborted, "errors-%s", res)
-		}
+		logger.Debugw(ctx, "Reconcile initiated for child devices", log.Fields{"parent-device-id": parentDeviceID})
 	}
 	return nil
 }
@@ -1961,3 +1928,17 @@
 		}
 	}
 }
+
+// ReconcilingCleanup invokes reconcilingCleanup on the agent of the given device; it returns a NotFound error when no agent exists and an Internal error when the cleanup fails.
+func (dMgr *Manager) ReconcilingCleanup(ctx context.Context, device *voltha.Device) error {
+	agent := dMgr.getDeviceAgent(ctx, device.Id)
+	if agent == nil {
+		logger.Errorf(ctx, "Not able to get device agent.")
+		return status.Errorf(codes.NotFound, "Not able to get device agent for device : %s", device.Id)
+	}
+	if err := agent.reconcilingCleanup(ctx); err != nil {
+		logger.Errorf(ctx, "%s", err.Error()) // explicit format: the error text may itself contain '%'
+		return status.Errorf(codes.Internal, "%s", err.Error())
+	}
+	return nil
+}