[VOL-3005] Separate Flows from Device

Also some unit test functions moved to a test util class.
New loaders and Proxy implementation are applied.

Change-Id: Icf5a6f0a42a2dbaeff768fdb108f5e9b46644977
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index 357c49a..17ac266 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -34,6 +34,7 @@
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	"github.com/opencord/voltha-protos/v3/go/common"
 	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
+	"github.com/opencord/voltha-protos/v3/go/openflow_13"
 	ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
 	"google.golang.org/grpc/codes"
@@ -50,6 +51,7 @@
 	logicalDeviceMgr        *LogicalManager
 	kafkaICProxy            kafka.InterContainerProxy
 	stateTransitions        *TransitionMap
+	dbPath                  *model.Path
 	dProxy                  *model.Proxy
 	coreInstanceID          string
 	defaultTimeout          time.Duration
@@ -57,13 +59,15 @@
 	deviceLoadingInProgress map[string][]chan int
 }
 
-func NewManagers(dbProxy *model.Path, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, corePairTopic, coreInstanceID string, defaultCoreTimeout time.Duration) (*Manager, *LogicalManager) {
+//NewManagers creates the Manager and the Logical Manager.
+func NewManagers(dbPath *model.Path, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, corePairTopic, coreInstanceID string, defaultCoreTimeout time.Duration) (*Manager, *LogicalManager) {
 	deviceMgr := &Manager{
 		rootDevices:             make(map[string]bool),
 		kafkaICProxy:            kmp,
 		adapterProxy:            remote.NewAdapterProxy(kmp, corePairTopic, endpointMgr),
 		coreInstanceID:          coreInstanceID,
-		dProxy:                  dbProxy.Proxy("devices"),
+		dbPath:                  dbPath,
+		dProxy:                  dbPath.Proxy("devices"),
 		adapterMgr:              adapterMgr,
 		defaultTimeout:          defaultCoreTimeout * time.Millisecond,
 		deviceLoadingInProgress: make(map[string][]chan int),
@@ -74,8 +78,8 @@
 		Manager:                        event.NewManager(),
 		deviceMgr:                      deviceMgr,
 		kafkaICProxy:                   kmp,
-		dbProxy:                        dbProxy,
-		ldProxy:                        dbProxy.Proxy("logical_devices"),
+		dbPath:                         dbPath,
+		ldProxy:                        dbPath.Proxy("logical_devices"),
 		defaultTimeout:                 defaultCoreTimeout,
 		logicalDeviceLoadingInProgress: make(map[string][]chan int),
 	}
@@ -157,7 +161,7 @@
 	// Ensure this device is set as root
 	device.Root = true
 	// Create and start a device agent for that device
-	agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dProxy, dMgr.defaultTimeout)
+	agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.defaultTimeout)
 	device, err = agent.start(ctx, device)
 	if err != nil {
 		logger.Errorw("Fail-to-start-device", log.Fields{"device-id": agent.deviceID, "error": err})
@@ -220,22 +224,34 @@
 // ListDeviceFlows returns the flow details for a specific device entry
 func (dMgr *Manager) ListDeviceFlows(ctx context.Context, id *voltha.ID) (*ofp.Flows, error) {
 	logger.Debugw("ListDeviceFlows", log.Fields{"device-id": id.Id})
-	device, err := dMgr.getDevice(ctx, id.Id)
-	if err != nil {
-		return &ofp.Flows{}, err
+	agent := dMgr.getDeviceAgent(ctx, id.Id)
+	if agent == nil {
+		return &ofp.Flows{}, status.Errorf(codes.NotFound, "device-%s", id.Id)
 	}
-	return device.Flows, nil
+
+	flows := agent.listDeviceFlows()
+	ctr, ret := 0, make([]*ofp.OfpFlowStats, len(flows))
+	for _, flow := range flows {
+		ret[ctr] = flow
+		ctr++
+	}
+	return &openflow_13.Flows{Items: ret}, nil
 }
 
 // ListDeviceFlowGroups returns the flow group details for a specific device entry
 func (dMgr *Manager) ListDeviceFlowGroups(ctx context.Context, id *voltha.ID) (*voltha.FlowGroups, error) {
 	logger.Debugw("ListDeviceFlowGroups", log.Fields{"device-id": id.Id})
-
-	device, err := dMgr.getDevice(ctx, id.Id)
-	if err != nil {
+	agent := dMgr.getDeviceAgent(ctx, id.Id)
+	if agent == nil {
 		return nil, status.Errorf(codes.NotFound, "device-%s", id.Id)
 	}
-	return device.GetFlowGroups(), nil
+	groups := agent.listDeviceGroups()
+	ctr, ret := 0, make([]*openflow_13.OfpGroupEntry, len(groups))
+	for _, group := range groups {
+		ret[ctr] = group
+		ctr++
+	}
+	return &voltha.FlowGroups{Items: ret}, nil
 }
 
 // stopManagingDevice stops the management of the device as well as any of its reference device and logical device.
@@ -408,7 +424,7 @@
 		// If device is not in memory then set it up
 		if !dMgr.IsDeviceInCache(device.Id) {
 			logger.Debugw("loading-device-from-Model", log.Fields{"id": device.Id})
-			agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dProxy, dMgr.defaultTimeout)
+			agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.defaultTimeout)
 			if _, err := agent.start(ctx, nil); err != nil {
 				logger.Warnw("failure-starting-agent", log.Fields{"deviceId": device.Id})
 			} else {
@@ -471,7 +487,7 @@
 			// Proceed with the loading only if the device exist in the Model (could have been deleted)
 			if device, err = dMgr.getDeviceFromModel(ctx, deviceID); err == nil {
 				logger.Debugw("loading-device", log.Fields{"deviceId": deviceID})
-				agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dProxy, dMgr.defaultTimeout)
+				agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.defaultTimeout)
 				if _, err = agent.start(ctx, nil); err != nil {
 					logger.Warnw("Failure loading device", log.Fields{"deviceId": deviceID, "error": err})
 				} else {
@@ -1029,7 +1045,7 @@
 	childDevice.ProxyAddress = &voltha.Device_ProxyAddress{DeviceId: parentDeviceID, DeviceType: pAgent.deviceType, ChannelId: uint32(channelID), OnuId: uint32(onuID)}
 
 	// Create and start a device agent for that device
-	agent := newAgent(dMgr.adapterProxy, childDevice, dMgr, dMgr.dProxy, dMgr.defaultTimeout)
+	agent := newAgent(dMgr.adapterProxy, childDevice, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.defaultTimeout)
 	childDevice, err := agent.start(ctx, childDevice)
 	if err != nil {
 		logger.Errorw("error-starting-child-device", log.Fields{"parent-device-id": childDevice.ParentId, "child-device-id": agent.deviceID, "error": err})