[VOL-3005] Separate Flows from Device

Also, some unit test helper functions are moved to a shared test utility package (rw_core/test).
The new loaders and Proxy implementation are applied.

Change-Id: Icf5a6f0a42a2dbaeff768fdb108f5e9b46644977
diff --git a/rw_core/core/api/common_test.go b/rw_core/core/api/common_test.go
index f9bb047..097b1c1 100644
--- a/rw_core/core/api/common_test.go
+++ b/rw_core/core/api/common_test.go
@@ -18,24 +18,12 @@
 import (
 	"context"
 	"fmt"
-	"strconv"
 	"time"
 
 	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/google/uuid"
-	"github.com/opencord/voltha-go/rw_core/config"
-	cm "github.com/opencord/voltha-go/rw_core/mocks"
-	"github.com/opencord/voltha-lib-go/v3/pkg/adapters"
-	com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
-	"github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
-	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
-	"github.com/opencord/voltha-lib-go/v3/pkg/log"
-	mock_etcd "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
-	"github.com/phayes/freeport"
-	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/metadata"
-	"google.golang.org/grpc/status"
 )
 
 const (
@@ -43,11 +31,6 @@
 	retryInterval         = 50 * time.Millisecond
 )
 
-const (
-	OltAdapter = iota
-	OnuAdapter
-)
-
 var (
 	coreInCompeteMode bool
 )
@@ -74,67 +57,6 @@
 	return context.Background()
 }
 
-//startEmbeddedEtcdServer creates and starts an Embedded etcd server locally.
-func startEmbeddedEtcdServer(configName, storageDir, logLevel string) (*mock_etcd.EtcdServer, int, error) {
-	kvClientPort, err := freeport.GetFreePort()
-	if err != nil {
-		return nil, 0, err
-	}
-	peerPort, err := freeport.GetFreePort()
-	if err != nil {
-		return nil, 0, err
-	}
-	etcdServer := mock_etcd.StartEtcdServer(mock_etcd.MKConfig(configName, kvClientPort, peerPort, storageDir, logLevel))
-	if etcdServer == nil {
-		return nil, 0, status.Error(codes.Internal, "Embedded server failed to start")
-	}
-	return etcdServer, kvClientPort, nil
-}
-
-func stopEmbeddedEtcdServer(server *mock_etcd.EtcdServer) {
-	if server != nil {
-		server.Stop()
-	}
-}
-
-func setupKVClient(cf *config.RWCoreFlags, coreInstanceID string) kvstore.Client {
-	addr := cf.KVStoreHost + ":" + strconv.Itoa(cf.KVStorePort)
-	client, err := kvstore.NewEtcdClient(addr, cf.KVStoreTimeout, log.FatalLevel)
-	if err != nil {
-		panic("no kv client")
-	}
-	return client
-}
-
-func createMockAdapter(adapterType int, kafkaClient kafka.Client, coreInstanceID string, coreName string, adapterName string) (adapters.IAdapter, error) {
-	var err error
-	var adapter adapters.IAdapter
-	adapterKafkaICProxy := kafka.NewInterContainerProxy(
-		kafka.MsgClient(kafkaClient),
-		kafka.DefaultTopic(&kafka.Topic{Name: adapterName}))
-	adapterCoreProxy := com.NewCoreProxy(adapterKafkaICProxy, adapterName, coreName)
-	var adapterReqHandler *com.RequestHandlerProxy
-	switch adapterType {
-	case OltAdapter:
-		adapter = cm.NewOLTAdapter(adapterCoreProxy)
-	case OnuAdapter:
-		adapter = cm.NewONUAdapter(adapterCoreProxy)
-	default:
-		logger.Fatalf("invalid-adapter-type-%d", adapterType)
-	}
-	adapterReqHandler = com.NewRequestHandlerProxy(coreInstanceID, adapter, adapterCoreProxy)
-
-	if err = adapterKafkaICProxy.Start(); err != nil {
-		logger.Errorw("Failure-starting-adapter-intercontainerProxy", log.Fields{"error": err})
-		return nil, err
-	}
-	if err = adapterKafkaICProxy.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: adapterName}, adapterReqHandler); err != nil {
-		logger.Errorw("Failure-to-subscribe-onu-request-handler", log.Fields{"error": err})
-		return nil, err
-	}
-	return adapter, nil
-}
-
 func waitUntilDeviceReadiness(deviceID string,
 	timeout time.Duration,
 	verificationFunction isDeviceConditionSatisfied,
diff --git a/rw_core/core/api/grpc_nbi_handler_test.go b/rw_core/core/api/grpc_nbi_handler_test.go
index 634a286..543a19d 100755
--- a/rw_core/core/api/grpc_nbi_handler_test.go
+++ b/rw_core/core/api/grpc_nbi_handler_test.go
@@ -35,13 +35,12 @@
 	"github.com/opencord/voltha-go/rw_core/core/adapter"
 	"github.com/opencord/voltha-go/rw_core/core/device"
 	cm "github.com/opencord/voltha-go/rw_core/mocks"
+	tst "github.com/opencord/voltha-go/rw_core/test"
 	"github.com/opencord/voltha-lib-go/v3/pkg/db"
 	"github.com/opencord/voltha-lib-go/v3/pkg/flows"
 	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
-	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	mock_etcd "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
 	mock_kafka "github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka"
-	"github.com/opencord/voltha-lib-go/v3/pkg/version"
 	ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
 	"github.com/phayes/freeport"
@@ -50,10 +49,6 @@
 	"google.golang.org/grpc/status"
 )
 
-const (
-	coreName = "rw_core"
-)
-
 type NBTest struct {
 	etcdServer        *mock_etcd.EtcdServer
 	deviceMgr         *device.Manager
@@ -77,7 +72,7 @@
 	test := &NBTest{}
 	// Start the embedded etcd server
 	var err error
-	test.etcdServer, test.kvClientPort, err = startEmbeddedEtcdServer("voltha.rwcore.nb.test", "voltha.rwcore.nb.etcd", "error")
+	test.etcdServer, test.kvClientPort, err = tst.StartEmbeddedEtcdServer("voltha.rwcore.nb.test", "voltha.rwcore.nb.etcd", "error")
 	if err != nil {
 		logger.Fatal(err)
 	}
@@ -107,7 +102,7 @@
 	cfg.GrpcPort = grpcPort
 	cfg.GrpcHost = "127.0.0.1"
 	setCoreCompeteMode(inCompeteMode)
-	client := setupKVClient(cfg, nb.coreInstanceID)
+	client := tst.SetupKVClient(cfg, nb.coreInstanceID)
 	backend := &db.Backend{
 		Client:                  client,
 		StoreType:               cfg.KVStoreType,
@@ -141,58 +136,6 @@
 	}
 }
 
-func (nb *NBTest) createAndregisterAdapters(t *testing.T) {
-	// Setup the mock OLT adapter
-	oltAdapter, err := createMockAdapter(OltAdapter, nb.kClient, nb.coreInstanceID, coreName, nb.oltAdapterName)
-	if err != nil {
-		logger.Fatalw("setting-mock-olt-adapter-failed", log.Fields{"error": err})
-	}
-	nb.oltAdapter = (oltAdapter).(*cm.OLTAdapter)
-	nb.numONUPerOLT = nb.oltAdapter.GetNumONUPerOLT()
-	nb.startingUNIPortNo = nb.oltAdapter.GetStartingUNIPortNo()
-
-	//	Register the adapter
-	registrationData := &voltha.Adapter{
-		Id:             nb.oltAdapterName,
-		Vendor:         "Voltha-olt",
-		Version:        version.VersionInfo.Version,
-		Type:           nb.oltAdapterName,
-		CurrentReplica: 1,
-		TotalReplicas:  1,
-		Endpoint:       nb.oltAdapterName,
-	}
-	types := []*voltha.DeviceType{{Id: nb.oltAdapterName, Adapter: nb.oltAdapterName, AcceptsAddRemoveFlowUpdates: true}}
-	deviceTypes := &voltha.DeviceTypes{Items: types}
-	if _, err := nb.adapterMgr.RegisterAdapter(registrationData, deviceTypes); err != nil {
-		logger.Errorw("failed-to-register-adapter", log.Fields{"error": err})
-		assert.NotNil(t, err)
-	}
-
-	// Setup the mock ONU adapter
-	onuAdapter, err := createMockAdapter(OnuAdapter, nb.kClient, nb.coreInstanceID, coreName, nb.onuAdapterName)
-	if err != nil {
-		logger.Fatalw("setting-mock-onu-adapter-failed", log.Fields{"error": err})
-	}
-	nb.onuAdapter = (onuAdapter).(*cm.ONUAdapter)
-
-	//	Register the adapter
-	registrationData = &voltha.Adapter{
-		Id:             nb.onuAdapterName,
-		Vendor:         "Voltha-onu",
-		Version:        version.VersionInfo.Version,
-		Type:           nb.onuAdapterName,
-		CurrentReplica: 1,
-		TotalReplicas:  1,
-		Endpoint:       nb.onuAdapterName,
-	}
-	types = []*voltha.DeviceType{{Id: nb.onuAdapterName, Adapter: nb.onuAdapterName, AcceptsAddRemoveFlowUpdates: true}}
-	deviceTypes = &voltha.DeviceTypes{Items: types}
-	if _, err := nb.adapterMgr.RegisterAdapter(registrationData, deviceTypes); err != nil {
-		logger.Errorw("failed-to-register-adapter", log.Fields{"error": err})
-		assert.NotNil(t, err)
-	}
-}
-
 func (nb *NBTest) stopAll() {
 	if nb.kClient != nil {
 		nb.kClient.Stop()
@@ -201,7 +144,7 @@
 		nb.kmp.Stop()
 	}
 	if nb.etcdServer != nil {
-		stopEmbeddedEtcdServer(nb.etcdServer)
+		tst.StopEmbeddedEtcdServer(nb.etcdServer)
 	}
 }
 
@@ -1287,7 +1230,9 @@
 	nb.testCoreWithoutData(t, nbi)
 
 	// Create/register the adapters
-	nb.createAndregisterAdapters(t)
+	nb.oltAdapter, nb.onuAdapter = tst.CreateAndregisterAdapters(t, nb.kClient, nb.coreInstanceID, nb.oltAdapterName, nb.onuAdapterName, nb.adapterMgr)
+	nb.numONUPerOLT = nb.oltAdapter.GetNumONUPerOLT()
+	nb.startingUNIPortNo = nb.oltAdapter.GetStartingUNIPortNo()
 
 	// 2. Test adapter registration
 	nb.testAdapterRegistration(t, nbi)