[VOL-4291] Rw-core updates for gRPC migration

Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/rw_core/test/utils.go b/rw_core/test/utils.go
index 8cf4e2f..8890295 100644
--- a/rw_core/test/utils.go
+++ b/rw_core/test/utils.go
@@ -18,20 +18,27 @@
 package test
 
 import (
+	"bufio"
 	"context"
+	"encoding/json"
+	"fmt"
+	"os"
+	"path/filepath"
+	"strings"
 	"testing"
+	"time"
+
+	ic "github.com/opencord/voltha-protos/v5/go/inter_container"
+
+	"math/rand"
 
 	"github.com/opencord/voltha-go/rw_core/config"
-	"github.com/opencord/voltha-go/rw_core/core/adapter"
 	cm "github.com/opencord/voltha-go/rw_core/mocks"
-	"github.com/opencord/voltha-lib-go/v5/pkg/adapters"
-	com "github.com/opencord/voltha-lib-go/v5/pkg/adapters/common"
-	"github.com/opencord/voltha-lib-go/v5/pkg/db/kvstore"
-	"github.com/opencord/voltha-lib-go/v5/pkg/kafka"
-	"github.com/opencord/voltha-lib-go/v5/pkg/log"
-	mock_etcd "github.com/opencord/voltha-lib-go/v5/pkg/mocks/etcd"
-	"github.com/opencord/voltha-lib-go/v5/pkg/version"
-	"github.com/opencord/voltha-protos/v4/go/voltha"
+	"github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	mock_etcd "github.com/opencord/voltha-lib-go/v7/pkg/mocks/etcd"
+	"github.com/opencord/voltha-lib-go/v7/pkg/version"
+	"github.com/opencord/voltha-protos/v5/go/voltha"
 	"github.com/phayes/freeport"
 	"github.com/stretchr/testify/assert"
 	"google.golang.org/grpc/codes"
@@ -43,82 +50,236 @@
 	OnuAdapter
 )
 
-//CreateMockAdapter creates mock OLT and ONU adapters
-func CreateMockAdapter(ctx context.Context, adapterType int, kafkaClient kafka.Client, coreInstanceID string, coreName string, adapterName string) (adapters.IAdapter, error) {
-	var err error
-	var adapter adapters.IAdapter
-	adapterKafkaICProxy := kafka.NewInterContainerProxy(
-		kafka.MsgClient(kafkaClient),
-		kafka.DefaultTopic(&kafka.Topic{Name: adapterName}))
-	adapterCoreProxy := com.NewCoreProxy(ctx, adapterKafkaICProxy, adapterName, coreName)
-	var adapterReqHandler *com.RequestHandlerProxy
+type AdapterInfo struct {
+	TotalReplica    int32
+	Vendor          string
+	DeviceType      string
+	ChildDeviceType string
+	ChildVendor     string
+}
+
+// prettyPrintDeviceUpdateLog is used just for debugging and exploring the Core logs
+func prettyPrintDeviceUpdateLog(inputFile string, deviceID string) {
+	file, err := os.Open(filepath.Clean(inputFile))
+	if err != nil {
+		logger.Fatal(context.Background(), err)
+	}
+	defer func() {
+		if err := file.Close(); err != nil {
+			logger.Errorw(context.Background(), "file-close-error", log.Fields{"error": err})
+		}
+	}()
+
+	var logEntry struct {
+		Level       string `json:"level"`
+		Ts          string `json:"ts"`
+		Caller      string `json:"caller"`
+		Msg         string `json:"msg"`
+		RPC         string `json:"rpc"`
+		DeviceID    string `json:"device-id"`
+		RequestedBy string `json:"requested-by"`
+		StateChange string `json:"state-change"`
+		Status      string `json:"status"`
+		Description string `json:"description"`
+	}
+
+	scanner := bufio.NewScanner(file)
+	fmt.Println("Timestamp\t\t\tDeviceId\t\t\t\tStatus\t\t\tRPC\t\t\tRequestedBy\t\t\tStateChange\t\t\tDescription")
+	for scanner.Scan() {
+		input := scanner.Text()
+		// Look for device update logs only
+		if !strings.Contains(input, "device-operation") || !strings.Contains(input, "requested-by") {
+			continue
+		}
+		// Check if deviceID is required
+		if deviceID != "" {
+			if !strings.Contains(input, deviceID) {
+				continue
+			}
+		}
+		if err := json.Unmarshal([]byte(input), &logEntry); err != nil {
+			logger.Fatal(context.Background(), err)
+		}
+		fmt.Println(
+			fmt.Sprintf(
+				"%s\t%s\t%s\t%-30.30q\t%-16.16s\t%-25.25s\t%s",
+				logEntry.Ts,
+				logEntry.DeviceID,
+				logEntry.Status,
+				logEntry.RPC,
+				logEntry.RequestedBy,
+				logEntry.StateChange,
+				logEntry.Description))
+	}
+}
+
+func omciLog(inputFile string, deviceID string) {
+	file, err := os.Open(filepath.Clean(inputFile))
+	if err != nil {
+		logger.Fatal(context.Background(), err)
+	}
+	defer func() {
+		if err := file.Close(); err != nil {
+			logger.Errorw(context.Background(), "file-close-error", log.Fields{"error": err})
+		}
+	}()
+
+	var logEntry struct {
+		Level         string `json:"level"`
+		Ts            string `json:"ts"`
+		Caller        string `json:"caller"`
+		Msg           string `json:"msg"`
+		InstanceID    string `json:"instanceId"`
+		ChildDeviceID string `json:"child-device-id"`
+		OmciMsg       string `json:"omciMsg"`
+		IntfID        string `json:"intf-id"`
+		OnuID         string `json:"onu-id"`
+		OmciTrns      int    `json:"omciTransactionID"`
+	}
+
+	scanner := bufio.NewScanner(file)
+	uniqueTnsIDs := map[int]int{}
+	for scanner.Scan() {
+		input := scanner.Text()
+		// Look for device update logs only
+		if !strings.Contains(input, "sent-omci-msg") {
+			continue
+		}
+		// Check if deviceID is required
+		if deviceID != "" {
+			if !strings.Contains(input, deviceID) {
+				continue
+			}
+		}
+		if err := json.Unmarshal([]byte(input), &logEntry); err != nil {
+			logger.Fatal(context.Background(), err)
+		}
+		uniqueTnsIDs[logEntry.OmciTrns]++
+	}
+	repeatedTrnsID := []int{}
+	for k, v := range uniqueTnsIDs {
+		if v != 1 {
+			repeatedTrnsID = append(repeatedTrnsID, k)
+		}
+	}
+	fmt.Println("RepeatedIDs", repeatedTrnsID, "TransID:", len(uniqueTnsIDs))
+}
+
+//CreateMockAdapter creates mock OLT and ONU adapters - this will automatically the grpc service hosted by that
+// adapter
+func CreateMockAdapter(
+	ctx context.Context,
+	adapterType int,
+	coreEndpoint string,
+	deviceType string,
+	vendor string,
+	childDeviceType string,
+	childVendor string,
+) (interface{}, error) {
+
+	var adpt interface{}
 	switch adapterType {
 	case OltAdapter:
-		adapter = cm.NewOLTAdapter(ctx, adapterCoreProxy)
+		adpt = cm.NewOLTAdapter(ctx, coreEndpoint, deviceType, vendor, childDeviceType, childVendor)
 	case OnuAdapter:
-		adapter = cm.NewONUAdapter(ctx, adapterCoreProxy)
+		adpt = cm.NewONUAdapter(ctx, coreEndpoint, deviceType, vendor)
 	default:
 		logger.Fatalf(ctx, "invalid-adapter-type-%d", adapterType)
 	}
-	adapterReqHandler = com.NewRequestHandlerProxy(coreInstanceID, adapter, adapterCoreProxy)
-
-	if err = adapterKafkaICProxy.Start(ctx); err != nil {
-		logger.Errorw(ctx, "Failure-starting-adapter-intercontainerProxy", log.Fields{"error": err})
-		return nil, err
-	}
-	if err = adapterKafkaICProxy.SubscribeWithRequestHandlerInterface(ctx, kafka.Topic{Name: adapterName}, adapterReqHandler); err != nil {
-		logger.Errorw(ctx, "Failure-to-subscribe-onu-request-handler", log.Fields{"error": err})
-		return nil, err
-	}
-	return adapter, nil
+	return adpt, nil
 }
 
-//CreateAndregisterAdapters creates mock ONU and OLT adapters and egisters them to rw-core
-func CreateAndregisterAdapters(ctx context.Context, t *testing.T, kClient kafka.Client, coreInstanceID string, oltAdapterName string, onuAdapterName string, adapterMgr *adapter.Manager) (*cm.OLTAdapter, *cm.ONUAdapter) {
-	// Setup the mock OLT adapter
-	oltAdapter, err := CreateMockAdapter(ctx, OltAdapter, kClient, coreInstanceID, "rw_core", oltAdapterName)
-	assert.Nil(t, err)
-	assert.NotNil(t, oltAdapter)
+//CreateAndRegisterAdapters creates mock ONU and OLT adapters and registers them to rw-core
+func CreateAndRegisterAdapters(
+	ctx context.Context,
+	t *testing.T,
+	oltAdapters map[string]*AdapterInfo,
+	onuAdapters map[string]*AdapterInfo,
+	coreEndpoint string,
+) (map[string][]*cm.OLTAdapter, map[string][]*cm.ONUAdapter) {
+	// Setup the ONU adapter first in this unit test environment.  This makes it easier to test whether the
+	// Core is ready to send grpc requests to the adapters.  The unit test uses grpc to communicate with the
+	// Core and as such it does not have inside knowledge when the adapters are ready.
 
-	//	Register the adapter
-	registrationData := &voltha.Adapter{
-		Id:             oltAdapterName,
-		Vendor:         "Voltha-olt",
-		Version:        version.VersionInfo.Version,
-		Type:           oltAdapterName,
-		CurrentReplica: 1,
-		TotalReplicas:  1,
-		Endpoint:       oltAdapterName,
-	}
-	types := []*voltha.DeviceType{{Id: oltAdapterName, Adapter: oltAdapterName, AcceptsAddRemoveFlowUpdates: true}}
-	deviceTypes := &voltha.DeviceTypes{Items: types}
-	if _, err := adapterMgr.RegisterAdapter(ctx, registrationData, deviceTypes); err != nil {
-		logger.Errorw(ctx, "failed-to-register-adapter", log.Fields{"error": err})
-		assert.NotNil(t, err)
+	// Setup the ONU Adapters
+	onuAdaptersMap := make(map[string][]*cm.ONUAdapter)
+	for adapterType, adapterInfo := range onuAdapters {
+		for replica := int32(1); replica <= adapterInfo.TotalReplica; replica++ {
+			adpt, err := CreateMockAdapter(ctx, OnuAdapter, coreEndpoint, adapterInfo.DeviceType, adapterInfo.Vendor, adapterInfo.ChildDeviceType, adapterInfo.ChildVendor)
+			assert.Nil(t, err)
+			onuAdapter, ok := adpt.(*cm.ONUAdapter)
+			assert.True(t, ok)
+			assert.NotNil(t, onuAdapter)
+			//	Register the adapter
+			adapterID := fmt.Sprintf("%s-%d", adapterType, replica)
+			adapterToRegister := &voltha.Adapter{
+				Id:             adapterID,
+				Vendor:         adapterInfo.Vendor,
+				Version:        version.VersionInfo.Version,
+				Type:           adapterType,
+				CurrentReplica: replica,
+				TotalReplicas:  adapterInfo.TotalReplica,
+				Endpoint:       onuAdapter.GetEndPoint(),
+			}
+			types := []*voltha.DeviceType{{Id: adapterInfo.DeviceType, AdapterType: adapterType, AcceptsAddRemoveFlowUpdates: true}}
+			deviceTypes := &voltha.DeviceTypes{Items: types}
+			coreClient, err := onuAdapter.GetCoreClient()
+			assert.Nil(t, err)
+			assert.NotNil(t, coreClient)
+			if _, err := coreClient.RegisterAdapter(ctx, &ic.AdapterRegistration{
+				Adapter: adapterToRegister,
+				DTypes:  deviceTypes}); err != nil {
+				logger.Errorw(ctx, "failed-to-register-adapter", log.Fields{"error": err, "adapter": adapterToRegister.Id})
+				assert.NotNil(t, err)
+			}
+			if _, ok := onuAdaptersMap[adapterType]; !ok {
+				onuAdaptersMap[adapterType] = []*cm.ONUAdapter{}
+			}
+			onuAdaptersMap[adapterType] = append(onuAdaptersMap[adapterType], onuAdapter)
+		}
 	}
 
-	// Setup the mock ONU adapter
-	onuAdapter, err := CreateMockAdapter(ctx, OnuAdapter, kClient, coreInstanceID, "rw_core", onuAdapterName)
+	// Setup the OLT Adapters
+	oltAdaptersMap := make(map[string][]*cm.OLTAdapter)
+	for adapterType, adapterInfo := range oltAdapters {
+		for replica := int32(1); replica <= adapterInfo.TotalReplica; replica++ {
+			adpt, err := CreateMockAdapter(ctx, OltAdapter, coreEndpoint, adapterInfo.DeviceType, adapterInfo.Vendor, adapterInfo.ChildDeviceType, adapterInfo.ChildVendor)
+			assert.Nil(t, err)
+			oltAdapter, ok := adpt.(*cm.OLTAdapter)
+			assert.True(t, ok)
+			assert.NotNil(t, oltAdapter)
 
-	assert.Nil(t, err)
-	assert.NotNil(t, onuAdapter)
-	//	Register the adapter
-	registrationData = &voltha.Adapter{
-		Id:             onuAdapterName,
-		Vendor:         "Voltha-onu",
-		Version:        version.VersionInfo.Version,
-		Type:           onuAdapterName,
-		CurrentReplica: 1,
-		TotalReplicas:  1,
-		Endpoint:       onuAdapterName,
+			//	Register the adapter
+			adapterID := fmt.Sprintf("%s-%d", adapterType, replica)
+			adapterToRegister := &voltha.Adapter{
+				Id:             adapterID,
+				Vendor:         adapterInfo.Vendor,
+				Version:        version.VersionInfo.Version,
+				Type:           adapterType,
+				CurrentReplica: replica,
+				TotalReplicas:  adapterInfo.TotalReplica,
+				Endpoint:       oltAdapter.GetEndPoint(),
+			}
+			types := []*voltha.DeviceType{{Id: adapterInfo.DeviceType, AdapterType: adapterType, AcceptsAddRemoveFlowUpdates: true}}
+			deviceTypes := &voltha.DeviceTypes{Items: types}
+			coreClient, err := oltAdapter.GetCoreClient()
+			assert.Nil(t, err)
+			assert.NotNil(t, coreClient)
+
+			if _, err := coreClient.RegisterAdapter(ctx, &ic.AdapterRegistration{
+				Adapter: adapterToRegister,
+				DTypes:  deviceTypes}); err != nil {
+				logger.Errorw(ctx, "failed-to-register-adapter", log.Fields{"error": err, "adapter": adapterToRegister.Id})
+				assert.NotNil(t, err)
+			}
+			if _, ok := oltAdaptersMap[adapterType]; !ok {
+				oltAdaptersMap[adapterType] = []*cm.OLTAdapter{}
+			}
+			oltAdaptersMap[adapterType] = append(oltAdaptersMap[adapterType], oltAdapter)
+		}
 	}
-	types = []*voltha.DeviceType{{Id: onuAdapterName, Adapter: onuAdapterName, AcceptsAddRemoveFlowUpdates: true}}
-	deviceTypes = &voltha.DeviceTypes{Items: types}
-	if _, err := adapterMgr.RegisterAdapter(ctx, registrationData, deviceTypes); err != nil {
-		logger.Errorw(ctx, "failed-to-register-adapter", log.Fields{"error": err})
-		assert.NotNil(t, err)
-	}
-	return oltAdapter.(*cm.OLTAdapter), onuAdapter.(*cm.ONUAdapter)
+
+	return oltAdaptersMap, onuAdaptersMap
 }
 
 //StartEmbeddedEtcdServer creates and starts an Embedded etcd server locally.
@@ -153,3 +314,16 @@
 	}
 	return client
 }
+
+//getRandomMacAddress returns a random mac address
+func getRandomMacAddress() string {
+	rand.Seed(time.Now().UnixNano() / int64(rand.Intn(255)+1))
+	return fmt.Sprintf("%02x:%02x:%02x:%02x:%02x:%02x",
+		rand.Intn(255),
+		rand.Intn(255),
+		rand.Intn(255),
+		rand.Intn(255),
+		rand.Intn(255),
+		rand.Intn(255),
+	)
+}