[VOL-4292] OpenOLT Adapter changes for gRPC migration
Change-Id: I5af2125f2c2f53ffc78c474a94314bba408f8bae
diff --git a/cmd/openolt-adapter/main_test.go b/cmd/openolt-adapter/main_test.go
index 617007d..f33efd0 100644
--- a/cmd/openolt-adapter/main_test.go
+++ b/cmd/openolt-adapter/main_test.go
@@ -17,30 +17,19 @@
import (
"context"
- "errors"
"testing"
- "time"
- conf "github.com/opencord/voltha-lib-go/v6/pkg/config"
-
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
-
- "github.com/opencord/voltha-lib-go/v6/pkg/kafka"
+ conf "github.com/opencord/voltha-lib-go/v7/pkg/config"
+ vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
+ mgrpc "github.com/opencord/voltha-lib-go/v7/pkg/mocks/grpc"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/config"
- "github.com/opencord/voltha-openolt-adapter/pkg/mocks"
- ca "github.com/opencord/voltha-protos/v4/go/inter_container"
"go.etcd.io/etcd/pkg/mock/mockserver"
)
func newMockAdapter() *adapter {
- conf := config.NewAdapterFlags()
- conf.KVStoreType = "etcd"
- cp := mocks.MockCoreProxy{}
- ap := mocks.MockAdapterProxy{}
- ad := newAdapter(conf)
- ad.coreProxy = &cp
- ad.adapterProxy = &ap
+ cf := config.NewAdapterFlags()
+ cf.KVStoreType = "etcd"
+ ad := newAdapter(cf)
return ad
}
func Test_adapter_setKVClient(t *testing.T) {
@@ -85,24 +74,27 @@
func Test_registerWithCore(t *testing.T) {
ad := newMockAdapter()
ctx := context.TODO()
- err := ad.registerWithCore(ctx, 1)
+ // Create and start a mock Core GRPC server
+ ms, err := mgrpc.NewMockGRPCServer(ctx)
+ if err != nil {
+ t.Errorf("grpc server: expected error:nil, got error: %v", err)
+ }
+ ms.AddCoreService(ctx, &vgrpc.MockCoreServiceHandler{})
+ go ms.Start(ctx)
+ defer ms.Stop()
+
+ if ad.coreClient, err = vgrpc.NewClient(ms.ApiEndpoint,
+ ad.coreRestarted); err != nil {
+ t.Errorf("grpc client: expected error:nil, got error: %v", err)
+ }
+ // Start the core grpc client
+ go ad.coreClient.Start(ctx, setAndTestCoreServiceHandler)
+ defer ad.coreClient.Stop(ctx)
+ err = ad.registerWithCore(ctx, coreService, 1)
if err != nil {
t.Errorf("Expected error:nil, got error: %v", err)
}
}
-func Test_startInterContainerProxy(t *testing.T) {
- ad := newMockAdapter()
- kc := &mockKafkaClient{}
- ad.kafkaClient = kc
- ctx := context.TODO()
- icp, err := ad.startInterContainerProxy(ctx, 1)
- if icp != nil {
- t.Log("Intercontainer proxy ", icp)
- }
- if err != nil {
- t.Errorf("err %v", err)
- }
-}
func Test_startOpenOLT(t *testing.T) {
a, _ := mockserver.StartMockServers(1)
@@ -111,8 +103,7 @@
defer a.StopAt(0)
ad := newMockAdapter()
- oolt, err := ad.startOpenOLT(context.TODO(), nil,
- ad.coreProxy, ad.adapterProxy, ad.eventProxy, ad.config, cm)
+ oolt, err := ad.startOpenOLT(context.TODO(), nil, ad.eventProxy, ad.config, cm)
if oolt != nil {
t.Log("Open OLT ", oolt)
}
@@ -136,8 +127,8 @@
wantErr bool
}{
// TODO: Add test cases.
- {"newKafkaClient", args{clientType: "sarama", address: adapter.config.KafkaAdapterAddress}, false},
- {"newKafkaClient", args{clientType: "sarama", address: adapter.config.KafkaAdapterAddress}, false},
+ {"newKafkaClient", args{clientType: "sarama", address: adapter.config.KafkaClusterAddress}, false},
+ {"newKafkaClient", args{clientType: "sarama", address: adapter.config.KafkaClusterAddress}, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -150,83 +141,3 @@
})
}
}
-
-func Test_adapter_setupRequestHandler(t *testing.T) {
-
- ad := newMockAdapter()
- cm := &conf.ConfigManager{}
-
- kip := kafka.NewInterContainerProxy(
- kafka.InterContainerAddress(ad.config.KafkaAdapterAddress),
- kafka.MsgClient(&mockKafkaClient{}),
- kafka.DefaultTopic(&kafka.Topic{Name: ad.config.Topic}))
-
- ad.kip = kip
- _ = ad.kip.Start(context.Background())
-
- oolt, _ := ad.startOpenOLT(context.TODO(), nil,
- ad.coreProxy, ad.adapterProxy, ad.eventProxy, ad.config, cm)
- printBanner()
- printVersion()
- ctx := context.TODO()
- if err := ad.setupRequestHandler(ctx, ad.config.InstanceID, oolt); err != nil {
- t.Logf("adapter.setupRequestHandler() error = %v", err)
- }
-
-}
-
-// Kafka client mocker
-type mockKafkaClient struct {
-}
-
-func (kc *mockKafkaClient) Start(ctx context.Context) error {
- return nil
-}
-func (kc *mockKafkaClient) Stop(ctx context.Context) {
-}
-func (kc *mockKafkaClient) CreateTopic(ctx context.Context, topic *kafka.Topic, numPartition int, repFactor int) error {
- if topic != nil {
- return nil
- }
- return errors.New("invalid Topic")
-}
-func (kc *mockKafkaClient) DeleteTopic(ctx context.Context, topic *kafka.Topic) error {
- if topic != nil {
- return nil
- }
- return errors.New("invalid Topic")
-}
-func (kc *mockKafkaClient) Subscribe(ctx context.Context, topic *kafka.Topic, kvArgs ...*kafka.KVArg) (<-chan *ca.InterContainerMessage, error) {
- if topic != nil {
- ch := make(chan *ca.InterContainerMessage)
- return ch, nil
- }
- return nil, errors.New("invalid Topic")
-}
-func (kc *mockKafkaClient) UnSubscribe(ctx context.Context, topic *kafka.Topic, ch <-chan *ca.InterContainerMessage) error {
- if topic == nil {
- return nil
- }
- return errors.New("invalid Topic")
-}
-func (kc *mockKafkaClient) Send(ctx context.Context, msg interface{}, topic *kafka.Topic, keys ...string) error {
- if topic != nil {
- return nil
- }
- return errors.New("invalid topic")
-}
-
-func (kc *mockKafkaClient) SendLiveness(ctx context.Context) error {
- return status.Error(codes.Unimplemented, "SendLiveness")
-}
-
-func (kc *mockKafkaClient) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
- return nil
-}
-
-func (kc *mockKafkaClient) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
- return nil
-}
-
-func (kc *mockKafkaClient) SubscribeForMetadata(context.Context, func(fromTopic string, timestamp time.Time)) {
-}