[VOL-3005] Separate Flows from Device
Also some unit test functions moved to a test util class.
New loaders and Proxy implementation are applied.
Change-Id: Icf5a6f0a42a2dbaeff768fdb108f5e9b46644977
diff --git a/rw_core/test/utils.go b/rw_core/test/utils.go
new file mode 100644
index 0000000..7d2dda0
--- /dev/null
+++ b/rw_core/test/utils.go
@@ -0,0 +1,156 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package core Common Logger initialization
+package test
+
+import (
+ "strconv"
+ "testing"
+
+ "github.com/opencord/voltha-go/rw_core/config"
+ "github.com/opencord/voltha-go/rw_core/core/adapter"
+ cm "github.com/opencord/voltha-go/rw_core/mocks"
+ "github.com/opencord/voltha-lib-go/v3/pkg/adapters"
+ com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
+ "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
+ "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ mock_etcd "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
+ "github.com/opencord/voltha-lib-go/v3/pkg/version"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
+ "github.com/phayes/freeport"
+ "github.com/stretchr/testify/assert"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+const (
+ OltAdapter = iota
+ OnuAdapter
+)
+
+//CreateMockAdapter creates mock OLT and ONU adapters
+func CreateMockAdapter(adapterType int, kafkaClient kafka.Client, coreInstanceID string, coreName string, adapterName string) (adapters.IAdapter, error) {
+ var err error
+ var adapter adapters.IAdapter
+ adapterKafkaICProxy := kafka.NewInterContainerProxy(
+ kafka.MsgClient(kafkaClient),
+ kafka.DefaultTopic(&kafka.Topic{Name: adapterName}))
+ adapterCoreProxy := com.NewCoreProxy(adapterKafkaICProxy, adapterName, coreName)
+ var adapterReqHandler *com.RequestHandlerProxy
+ switch adapterType {
+ case OltAdapter:
+ adapter = cm.NewOLTAdapter(adapterCoreProxy)
+ case OnuAdapter:
+ adapter = cm.NewONUAdapter(adapterCoreProxy)
+ default:
+ logger.Fatalf("invalid-adapter-type-%d", adapterType)
+ }
+ adapterReqHandler = com.NewRequestHandlerProxy(coreInstanceID, adapter, adapterCoreProxy)
+
+ if err = adapterKafkaICProxy.Start(); err != nil {
+ logger.Errorw("Failure-starting-adapter-intercontainerProxy", log.Fields{"error": err})
+ return nil, err
+ }
+ if err = adapterKafkaICProxy.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: adapterName}, adapterReqHandler); err != nil {
+ logger.Errorw("Failure-to-subscribe-onu-request-handler", log.Fields{"error": err})
+ return nil, err
+ }
+ return adapter, nil
+}
+
+//CreateAndregisterAdapters creates mock ONU and OLT adapters and egisters them to rw-core
+func CreateAndregisterAdapters(t *testing.T, kClient kafka.Client, coreInstanceID string, oltAdapterName string, onuAdapterName string, adapterMgr *adapter.Manager) (*cm.OLTAdapter, *cm.ONUAdapter) {
+ // Setup the mock OLT adapter
+ oltAdapter, err := CreateMockAdapter(OltAdapter, kClient, coreInstanceID, "rw_core", oltAdapterName)
+ assert.Nil(t, err)
+ assert.NotNil(t, oltAdapter)
+
+ // Register the adapter
+ registrationData := &voltha.Adapter{
+ Id: oltAdapterName,
+ Vendor: "Voltha-olt",
+ Version: version.VersionInfo.Version,
+ Type: oltAdapterName,
+ CurrentReplica: 1,
+ TotalReplicas: 1,
+ Endpoint: oltAdapterName,
+ }
+ types := []*voltha.DeviceType{{Id: oltAdapterName, Adapter: oltAdapterName, AcceptsAddRemoveFlowUpdates: true}}
+ deviceTypes := &voltha.DeviceTypes{Items: types}
+ if _, err := adapterMgr.RegisterAdapter(registrationData, deviceTypes); err != nil {
+ logger.Errorw("failed-to-register-adapter", log.Fields{"error": err})
+ assert.NotNil(t, err)
+ }
+
+ // Setup the mock ONU adapter
+ onuAdapter, err := CreateMockAdapter(OnuAdapter, kClient, coreInstanceID, "rw_core", onuAdapterName)
+
+ assert.Nil(t, err)
+ assert.NotNil(t, onuAdapter)
+ // Register the adapter
+ registrationData = &voltha.Adapter{
+ Id: onuAdapterName,
+ Vendor: "Voltha-onu",
+ Version: version.VersionInfo.Version,
+ Type: onuAdapterName,
+ CurrentReplica: 1,
+ TotalReplicas: 1,
+ Endpoint: onuAdapterName,
+ }
+ types = []*voltha.DeviceType{{Id: onuAdapterName, Adapter: onuAdapterName, AcceptsAddRemoveFlowUpdates: true}}
+ deviceTypes = &voltha.DeviceTypes{Items: types}
+ if _, err := adapterMgr.RegisterAdapter(registrationData, deviceTypes); err != nil {
+ logger.Errorw("failed-to-register-adapter", log.Fields{"error": err})
+ assert.NotNil(t, err)
+ }
+ return oltAdapter.(*cm.OLTAdapter), onuAdapter.(*cm.ONUAdapter)
+}
+
+//StartEmbeddedEtcdServer creates and starts an Embedded etcd server locally.
+func StartEmbeddedEtcdServer(configName, storageDir, logLevel string) (*mock_etcd.EtcdServer, int, error) {
+ kvClientPort, err := freeport.GetFreePort()
+ if err != nil {
+ return nil, 0, err
+ }
+ peerPort, err := freeport.GetFreePort()
+ if err != nil {
+ return nil, 0, err
+ }
+ etcdServer := mock_etcd.StartEtcdServer(mock_etcd.MKConfig(configName, kvClientPort, peerPort, storageDir, logLevel))
+ if etcdServer == nil {
+ return nil, 0, status.Error(codes.Internal, "Embedded server failed to start")
+ }
+ return etcdServer, kvClientPort, nil
+}
+
+//StopEmbeddedEtcdServer stops the embedded etcd server
+func StopEmbeddedEtcdServer(server *mock_etcd.EtcdServer) {
+ if server != nil {
+ server.Stop()
+ }
+}
+
+//SetupKVClient creates a new etcd client
+func SetupKVClient(cf *config.RWCoreFlags, coreInstanceID string) kvstore.Client {
+ addr := cf.KVStoreHost + ":" + strconv.Itoa(cf.KVStorePort)
+ client, err := kvstore.NewEtcdClient(addr, cf.KVStoreTimeout, log.FatalLevel)
+ if err != nil {
+ panic("no kv client")
+ }
+ return client
+}