[VOL-3005] Separate Flows from Device
Also some unit test functions moved to a test util class.
New loaders and Proxy implementation are applied.
Change-Id: Icf5a6f0a42a2dbaeff768fdb108f5e9b46644977
diff --git a/rw_core/core/api/common_test.go b/rw_core/core/api/common_test.go
index f9bb047..097b1c1 100644
--- a/rw_core/core/api/common_test.go
+++ b/rw_core/core/api/common_test.go
@@ -18,24 +18,12 @@
import (
"context"
"fmt"
- "strconv"
"time"
"github.com/golang/protobuf/ptypes/empty"
"github.com/google/uuid"
- "github.com/opencord/voltha-go/rw_core/config"
- cm "github.com/opencord/voltha-go/rw_core/mocks"
- "github.com/opencord/voltha-lib-go/v3/pkg/adapters"
- com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
- "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
- "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
- "github.com/opencord/voltha-lib-go/v3/pkg/log"
- mock_etcd "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
"github.com/opencord/voltha-protos/v3/go/voltha"
- "github.com/phayes/freeport"
- "google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
- "google.golang.org/grpc/status"
)
const (
@@ -43,11 +31,6 @@
retryInterval = 50 * time.Millisecond
)
-const (
- OltAdapter = iota
- OnuAdapter
-)
-
var (
coreInCompeteMode bool
)
@@ -74,67 +57,6 @@
return context.Background()
}
-//startEmbeddedEtcdServer creates and starts an Embedded etcd server locally.
-func startEmbeddedEtcdServer(configName, storageDir, logLevel string) (*mock_etcd.EtcdServer, int, error) {
- kvClientPort, err := freeport.GetFreePort()
- if err != nil {
- return nil, 0, err
- }
- peerPort, err := freeport.GetFreePort()
- if err != nil {
- return nil, 0, err
- }
- etcdServer := mock_etcd.StartEtcdServer(mock_etcd.MKConfig(configName, kvClientPort, peerPort, storageDir, logLevel))
- if etcdServer == nil {
- return nil, 0, status.Error(codes.Internal, "Embedded server failed to start")
- }
- return etcdServer, kvClientPort, nil
-}
-
-func stopEmbeddedEtcdServer(server *mock_etcd.EtcdServer) {
- if server != nil {
- server.Stop()
- }
-}
-
-func setupKVClient(cf *config.RWCoreFlags, coreInstanceID string) kvstore.Client {
- addr := cf.KVStoreHost + ":" + strconv.Itoa(cf.KVStorePort)
- client, err := kvstore.NewEtcdClient(addr, cf.KVStoreTimeout, log.FatalLevel)
- if err != nil {
- panic("no kv client")
- }
- return client
-}
-
-func createMockAdapter(adapterType int, kafkaClient kafka.Client, coreInstanceID string, coreName string, adapterName string) (adapters.IAdapter, error) {
- var err error
- var adapter adapters.IAdapter
- adapterKafkaICProxy := kafka.NewInterContainerProxy(
- kafka.MsgClient(kafkaClient),
- kafka.DefaultTopic(&kafka.Topic{Name: adapterName}))
- adapterCoreProxy := com.NewCoreProxy(adapterKafkaICProxy, adapterName, coreName)
- var adapterReqHandler *com.RequestHandlerProxy
- switch adapterType {
- case OltAdapter:
- adapter = cm.NewOLTAdapter(adapterCoreProxy)
- case OnuAdapter:
- adapter = cm.NewONUAdapter(adapterCoreProxy)
- default:
- logger.Fatalf("invalid-adapter-type-%d", adapterType)
- }
- adapterReqHandler = com.NewRequestHandlerProxy(coreInstanceID, adapter, adapterCoreProxy)
-
- if err = adapterKafkaICProxy.Start(); err != nil {
- logger.Errorw("Failure-starting-adapter-intercontainerProxy", log.Fields{"error": err})
- return nil, err
- }
- if err = adapterKafkaICProxy.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: adapterName}, adapterReqHandler); err != nil {
- logger.Errorw("Failure-to-subscribe-onu-request-handler", log.Fields{"error": err})
- return nil, err
- }
- return adapter, nil
-}
-
func waitUntilDeviceReadiness(deviceID string,
timeout time.Duration,
verificationFunction isDeviceConditionSatisfied,
diff --git a/rw_core/core/api/grpc_nbi_handler_test.go b/rw_core/core/api/grpc_nbi_handler_test.go
index 634a286..543a19d 100755
--- a/rw_core/core/api/grpc_nbi_handler_test.go
+++ b/rw_core/core/api/grpc_nbi_handler_test.go
@@ -35,13 +35,12 @@
"github.com/opencord/voltha-go/rw_core/core/adapter"
"github.com/opencord/voltha-go/rw_core/core/device"
cm "github.com/opencord/voltha-go/rw_core/mocks"
+ tst "github.com/opencord/voltha-go/rw_core/test"
"github.com/opencord/voltha-lib-go/v3/pkg/db"
"github.com/opencord/voltha-lib-go/v3/pkg/flows"
"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
- "github.com/opencord/voltha-lib-go/v3/pkg/log"
mock_etcd "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
mock_kafka "github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka"
- "github.com/opencord/voltha-lib-go/v3/pkg/version"
ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
"github.com/opencord/voltha-protos/v3/go/voltha"
"github.com/phayes/freeport"
@@ -50,10 +49,6 @@
"google.golang.org/grpc/status"
)
-const (
- coreName = "rw_core"
-)
-
type NBTest struct {
etcdServer *mock_etcd.EtcdServer
deviceMgr *device.Manager
@@ -77,7 +72,7 @@
test := &NBTest{}
// Start the embedded etcd server
var err error
- test.etcdServer, test.kvClientPort, err = startEmbeddedEtcdServer("voltha.rwcore.nb.test", "voltha.rwcore.nb.etcd", "error")
+ test.etcdServer, test.kvClientPort, err = tst.StartEmbeddedEtcdServer("voltha.rwcore.nb.test", "voltha.rwcore.nb.etcd", "error")
if err != nil {
logger.Fatal(err)
}
@@ -107,7 +102,7 @@
cfg.GrpcPort = grpcPort
cfg.GrpcHost = "127.0.0.1"
setCoreCompeteMode(inCompeteMode)
- client := setupKVClient(cfg, nb.coreInstanceID)
+ client := tst.SetupKVClient(cfg, nb.coreInstanceID)
backend := &db.Backend{
Client: client,
StoreType: cfg.KVStoreType,
@@ -141,58 +136,6 @@
}
}
-func (nb *NBTest) createAndregisterAdapters(t *testing.T) {
- // Setup the mock OLT adapter
- oltAdapter, err := createMockAdapter(OltAdapter, nb.kClient, nb.coreInstanceID, coreName, nb.oltAdapterName)
- if err != nil {
- logger.Fatalw("setting-mock-olt-adapter-failed", log.Fields{"error": err})
- }
- nb.oltAdapter = (oltAdapter).(*cm.OLTAdapter)
- nb.numONUPerOLT = nb.oltAdapter.GetNumONUPerOLT()
- nb.startingUNIPortNo = nb.oltAdapter.GetStartingUNIPortNo()
-
- // Register the adapter
- registrationData := &voltha.Adapter{
- Id: nb.oltAdapterName,
- Vendor: "Voltha-olt",
- Version: version.VersionInfo.Version,
- Type: nb.oltAdapterName,
- CurrentReplica: 1,
- TotalReplicas: 1,
- Endpoint: nb.oltAdapterName,
- }
- types := []*voltha.DeviceType{{Id: nb.oltAdapterName, Adapter: nb.oltAdapterName, AcceptsAddRemoveFlowUpdates: true}}
- deviceTypes := &voltha.DeviceTypes{Items: types}
- if _, err := nb.adapterMgr.RegisterAdapter(registrationData, deviceTypes); err != nil {
- logger.Errorw("failed-to-register-adapter", log.Fields{"error": err})
- assert.NotNil(t, err)
- }
-
- // Setup the mock ONU adapter
- onuAdapter, err := createMockAdapter(OnuAdapter, nb.kClient, nb.coreInstanceID, coreName, nb.onuAdapterName)
- if err != nil {
- logger.Fatalw("setting-mock-onu-adapter-failed", log.Fields{"error": err})
- }
- nb.onuAdapter = (onuAdapter).(*cm.ONUAdapter)
-
- // Register the adapter
- registrationData = &voltha.Adapter{
- Id: nb.onuAdapterName,
- Vendor: "Voltha-onu",
- Version: version.VersionInfo.Version,
- Type: nb.onuAdapterName,
- CurrentReplica: 1,
- TotalReplicas: 1,
- Endpoint: nb.onuAdapterName,
- }
- types = []*voltha.DeviceType{{Id: nb.onuAdapterName, Adapter: nb.onuAdapterName, AcceptsAddRemoveFlowUpdates: true}}
- deviceTypes = &voltha.DeviceTypes{Items: types}
- if _, err := nb.adapterMgr.RegisterAdapter(registrationData, deviceTypes); err != nil {
- logger.Errorw("failed-to-register-adapter", log.Fields{"error": err})
- assert.NotNil(t, err)
- }
-}
-
func (nb *NBTest) stopAll() {
if nb.kClient != nil {
nb.kClient.Stop()
@@ -201,7 +144,7 @@
nb.kmp.Stop()
}
if nb.etcdServer != nil {
- stopEmbeddedEtcdServer(nb.etcdServer)
+ tst.StopEmbeddedEtcdServer(nb.etcdServer)
}
}
@@ -1287,7 +1230,9 @@
nb.testCoreWithoutData(t, nbi)
// Create/register the adapters
- nb.createAndregisterAdapters(t)
+ nb.oltAdapter, nb.onuAdapter = tst.CreateAndregisterAdapters(t, nb.kClient, nb.coreInstanceID, nb.oltAdapterName, nb.onuAdapterName, nb.adapterMgr)
+ nb.numONUPerOLT = nb.oltAdapter.GetNumONUPerOLT()
+ nb.startingUNIPortNo = nb.oltAdapter.GetStartingUNIPortNo()
// 2. Test adapter registration
nb.testAdapterRegistration(t, nbi)