[VOL-4292] OpenOLT Adapter changes for gRPC migration
Change-Id: I5af2125f2c2f53ffc78c474a94314bba408f8bae
diff --git a/cmd/openolt-adapter/common.go b/cmd/openolt-adapter/common.go
index 2176fa3..4011de4 100644
--- a/cmd/openolt-adapter/common.go
+++ b/cmd/openolt-adapter/common.go
@@ -18,7 +18,7 @@
package main
import (
- "github.com/opencord/voltha-lib-go/v6/pkg/log"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
)
var logger log.CLogger
diff --git a/cmd/openolt-adapter/main.go b/cmd/openolt-adapter/main.go
index b4fe6dd..148f3bd 100644
--- a/cmd/openolt-adapter/main.go
+++ b/cmd/openolt-adapter/main.go
@@ -26,36 +26,43 @@
"syscall"
"time"
- "github.com/opencord/voltha-lib-go/v6/pkg/adapters"
- "github.com/opencord/voltha-lib-go/v6/pkg/adapters/adapterif"
- com "github.com/opencord/voltha-lib-go/v6/pkg/adapters/common"
- conf "github.com/opencord/voltha-lib-go/v6/pkg/config"
- "github.com/opencord/voltha-lib-go/v6/pkg/db/kvstore"
- "github.com/opencord/voltha-lib-go/v6/pkg/events"
- "github.com/opencord/voltha-lib-go/v6/pkg/events/eventif"
- "github.com/opencord/voltha-lib-go/v6/pkg/kafka"
- "github.com/opencord/voltha-lib-go/v6/pkg/log"
- "github.com/opencord/voltha-lib-go/v6/pkg/probe"
- "github.com/opencord/voltha-lib-go/v6/pkg/version"
+ "github.com/golang/protobuf/ptypes/empty"
+ conf "github.com/opencord/voltha-lib-go/v7/pkg/config"
+ "github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore"
+ "github.com/opencord/voltha-lib-go/v7/pkg/events"
+ "github.com/opencord/voltha-lib-go/v7/pkg/events/eventif"
+ vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
+ "github.com/opencord/voltha-lib-go/v7/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "github.com/opencord/voltha-lib-go/v7/pkg/probe"
+ "github.com/opencord/voltha-lib-go/v7/pkg/version"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/config"
ac "github.com/opencord/voltha-openolt-adapter/internal/pkg/core"
- ic "github.com/opencord/voltha-protos/v4/go/inter_container"
- "github.com/opencord/voltha-protos/v4/go/voltha"
+ "github.com/opencord/voltha-protos/v5/go/adapter_services"
+ "github.com/opencord/voltha-protos/v5/go/core"
+ ic "github.com/opencord/voltha-protos/v5/go/inter_container"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
+ "google.golang.org/grpc"
+)
+
+const (
+ clusterMessagingService = "cluster-message-service"
+ oltAdapterService = "olt-adapter-service"
+ kvService = "kv-service"
+ coreService = "core-service"
)
type adapter struct {
- instanceID string
- config *config.AdapterFlags
- iAdapter adapters.IAdapter
- kafkaClient kafka.Client
- kvClient kvstore.Client
- kip kafka.InterContainerProxy
- coreProxy adapterif.CoreProxy
- adapterProxy adapterif.AdapterProxy
- eventProxy eventif.EventProxy
- halted bool
- exitChannel chan int
- receiverChannels []<-chan *ic.InterContainerMessage
+ instanceID string
+ config *config.AdapterFlags
+ grpcServer *vgrpc.GrpcServer
+ oltAdapter *ac.OpenOLT
+ kafkaClient kafka.Client
+ kvClient kvstore.Client
+ coreClient *vgrpc.Client
+ eventProxy eventif.EventProxy
+ halted bool
+ exitChannel chan int
}
func newAdapter(cf *config.AdapterFlags) *adapter {
@@ -64,7 +71,6 @@
a.config = cf
a.halted = false
a.exitChannel = make(chan int, 1)
- a.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0)
return &a
}
@@ -78,11 +84,10 @@
p = value.(*probe.Probe)
p.RegisterService(
ctx,
- "message-bus",
- "kv-store",
- "container-proxy",
- "core-request-handler",
- "register-with-core",
+ clusterMessagingService,
+ kvService,
+ oltAdapterService,
+ coreService,
)
}
}
@@ -94,7 +99,7 @@
}
if p != nil {
- p.UpdateStatus(ctx, "kv-store", probe.ServiceStatusRunning)
+ p.UpdateStatus(ctx, kvService, probe.ServiceStatusRunning)
}
// Setup Log Config
@@ -104,42 +109,52 @@
go conf.StartLogFeaturesConfigProcessing(cm, ctx)
// Setup Kafka Client
- if a.kafkaClient, err = newKafkaClient(ctx, "sarama", a.config.KafkaAdapterAddress); err != nil {
+ if a.kafkaClient, err = newKafkaClient(ctx, "sarama", a.config.KafkaClusterAddress); err != nil {
logger.Fatalw(ctx, "Unsupported-common-client", log.Fields{"error": err})
}
- if p != nil {
- p.UpdateStatus(ctx, "message-bus", probe.ServiceStatusRunning)
+ // Start kafka communication with the broker
+ if err := kafka.StartAndWaitUntilKafkaConnectionIsUp(ctx, a.kafkaClient, a.config.HeartbeatCheckInterval, clusterMessagingService); err != nil {
+ logger.Fatal(ctx, "unable-to-connect-to-kafka")
}
- // setup endpointManager
-
- // Start the common InterContainer Proxy - retries indefinitely
- if a.kip, err = a.startInterContainerProxy(ctx, -1); err != nil {
- logger.Fatal(ctx, "error-starting-inter-container-proxy")
- }
-
- // Create the core proxy to handle requests to the Core
- a.coreProxy = com.NewCoreProxy(ctx, a.kip, a.config.Topic, a.config.CoreTopic)
-
- // Create the adaptor proxy to handle request between olt and onu
- a.adapterProxy = com.NewAdapterProxy(ctx, a.kip, a.config.CoreTopic, cm.Backend)
-
// Create the event proxy to post events to KAFKA
a.eventProxy = events.NewEventProxy(events.MsgClient(a.kafkaClient), events.MsgTopic(kafka.Topic{Name: a.config.EventTopic}))
+ go func() {
+ if err := a.eventProxy.Start(); err != nil {
+ logger.Fatalw(ctx, "event-proxy-cannot-start", log.Fields{"error": err})
+ }
+ }()
+
+ // Create the Core client to handle requests to the Core. Note that a.coreClient is a *vgrpc.Client; the typed
+ // core service client is obtained by invoking GetCoreServiceClient on a.coreClient
+ if a.coreClient, err = vgrpc.NewClient(a.config.CoreEndpoint,
+ a.coreRestarted,
+ vgrpc.ActivityCheck(true)); err != nil {
+ logger.Fatal(ctx, "grpc-client-not-created")
+ }
+ // Start the core grpc client
+ go a.coreClient.Start(ctx, setAndTestCoreServiceHandler)
// Create the open OLT adapter
- if a.iAdapter, err = a.startOpenOLT(ctx, a.kip, a.coreProxy, a.adapterProxy, a.eventProxy, a.config, cm); err != nil {
+ if a.oltAdapter, err = a.startOpenOLT(ctx, a.coreClient, a.eventProxy, a.config, cm); err != nil {
logger.Fatalw(ctx, "error-starting-openolt", log.Fields{"error": err})
}
- // Register the core request handler
- if err = a.setupRequestHandler(ctx, a.instanceID, a.iAdapter); err != nil {
- logger.Fatalw(ctx, "error-setting-core-request-handler", log.Fields{"error": err})
- }
+ // Create the grpc server (it is started further below, once the services are registered)
+ a.grpcServer = vgrpc.NewGrpcServer(a.config.GrpcAddress, nil, false, p)
+
+ //Register the adapter service
+ a.addAdapterService(ctx, a.grpcServer, a.oltAdapter)
+
+ //Register the olt inter-adapter service
+ a.addOltInterAdapterService(ctx, a.grpcServer, a.oltAdapter)
+
+ // Start the grpc server
+ go a.startGRPCService(ctx, a.grpcServer, oltAdapterService)
// Register this adapter to the Core - retries indefinitely
- if err = a.registerWithCore(ctx, -1); err != nil {
+ if err = a.registerWithCore(ctx, coreService, -1); err != nil {
logger.Fatal(ctx, "error-registering-with-core")
}
@@ -147,13 +162,28 @@
a.checkServicesReadiness(ctx)
}
+// TODO: Any action the adapter needs to do following a Core restart?
+func (a *adapter) coreRestarted(ctx context.Context, endPoint string) error {
+ logger.Errorw(ctx, "core-restarted", log.Fields{"endpoint": endPoint})
+ return nil
+}
+
+// setAndTestCoreServiceHandler creates a core service client on conn and returns it only if the Core reports a healthy status; otherwise it returns nil
+func setAndTestCoreServiceHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
+ svc := core.NewCoreServiceClient(conn)
+ if h, err := svc.GetHealthStatus(ctx, &empty.Empty{}); err != nil || h.State != voltha.HealthStatus_HEALTHY {
+ return nil
+ }
+ return svc
+}
+
/**
This function checks the liveliness and readiness of the kakfa and kv-client services
and update the status in the probe.
*/
func (a *adapter) checkServicesReadiness(ctx context.Context) {
// checks the kafka readiness
- go a.checkKafkaReadiness(ctx)
+ go kafka.MonitorKafkaReadiness(ctx, a.kafkaClient, a.config.LiveProbeInterval, a.config.NotLiveProbeInterval, clusterMessagingService)
// checks the kv-store readiness
go a.checkKvStoreReadiness(ctx)
@@ -176,11 +206,11 @@
case liveliness := <-kvStoreChannel:
if !liveliness {
// kv-store not reachable or down, updating the status to not ready state
- probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusNotReady)
+ probe.UpdateStatusFromContext(ctx, kvService, probe.ServiceStatusNotReady)
timeout = a.config.NotLiveProbeInterval
} else {
// kv-store is reachable , updating the status to running state
- probe.UpdateStatusFromContext(ctx, "kv-store", probe.ServiceStatusRunning)
+ probe.UpdateStatusFromContext(ctx, kvService, probe.ServiceStatusRunning)
timeout = a.config.LiveProbeInterval / 2
}
// Check if the timer has expired or not
@@ -199,61 +229,6 @@
}
}
-/**
-This function checks the liveliness and readiness of the kafka service
-and update the status in the probe.
-*/
-func (a *adapter) checkKafkaReadiness(ctx context.Context) {
- livelinessChannel := a.kafkaClient.EnableLivenessChannel(ctx, true)
- healthinessChannel := a.kafkaClient.EnableHealthinessChannel(ctx, true)
- timeout := a.config.LiveProbeInterval
- failed := false
- for {
- timeoutTimer := time.NewTimer(timeout)
-
- select {
- case healthiness := <-healthinessChannel:
- if !healthiness {
- // This will eventually cause K8s to restart the container, and will do
- // so in a way that allows cleanup to continue, rather than an immediate
- // panic and exit here.
- probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusFailed)
- failed = true
- }
- // Check if the timer has expired or not
- if !timeoutTimer.Stop() {
- <-timeoutTimer.C
- }
- case liveliness := <-livelinessChannel:
- if failed {
- // Failures of the message bus are permanent and can't ever be recovered from,
- // so make sure we never inadvertently reset a failed state back to unready.
- } else if !liveliness {
- // kafka not reachable or down, updating the status to not ready state
- probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
- timeout = a.config.NotLiveProbeInterval
- } else {
- // kafka is reachable , updating the status to running state
- probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
- timeout = a.config.LiveProbeInterval
- }
- // Check if the timer has expired or not
- if !timeoutTimer.Stop() {
- <-timeoutTimer.C
- }
- case <-timeoutTimer.C:
- logger.Info(ctx, "kafka-proxy-liveness-recheck")
- // send the liveness probe in a goroutine; we don't want to deadlock ourselves as
- // the liveness probe may wait (and block) writing to our channel.
- err := a.kafkaClient.SendLiveness(ctx)
- if err != nil {
- // Catch possible error case if sending liveness after Sarama has been stopped.
- logger.Warnw(ctx, "error-kafka-send-liveness", log.Fields{"error": err})
- }
- }
- }
-}
-
func (a *adapter) stop(ctx context.Context) {
// Stop leadership tracking
a.halted = true
@@ -271,10 +246,21 @@
a.kvClient.Close(ctx)
}
- if a.kip != nil {
- a.kip.Stop(ctx)
+ if a.eventProxy != nil {
+ a.eventProxy.Stop()
}
+ if a.kafkaClient != nil {
+ a.kafkaClient.Stop(ctx)
+ }
+
+ // Stop core client
+ if a.coreClient != nil {
+ a.coreClient.Stop(ctx)
+ }
+
+ // TODO: Stop child devices connections
+
// TODO: More cleanup
}
@@ -316,39 +302,38 @@
return nil
}
-func (a *adapter) startInterContainerProxy(ctx context.Context, retries int) (kafka.InterContainerProxy, error) {
- logger.Infow(ctx, "starting-intercontainer-messaging-proxy", log.Fields{"address": a.config.KafkaAdapterAddress,
- "topic": a.config.Topic})
- var err error
- kip := kafka.NewInterContainerProxy(
- kafka.InterContainerAddress(a.config.KafkaAdapterAddress),
- kafka.MsgClient(a.kafkaClient),
- kafka.DefaultTopic(&kafka.Topic{Name: a.config.Topic}))
- count := 0
- for {
- if err = kip.Start(ctx); err != nil {
- logger.Warnw(ctx, "error-starting-messaging-proxy", log.Fields{"error": err})
- if retries == count {
- return nil, err
- }
- count = +1
- // Take a nap before retrying
- time.Sleep(2 * time.Second)
- } else {
- break
- }
- }
- probe.UpdateStatusFromContext(ctx, "container-proxy", probe.ServiceStatusRunning)
- logger.Info(ctx, "common-messaging-proxy-created")
- return kip, nil
+// startGRPCService marks serviceName as running in the probe, starts the given grpc server, and marks the service as stopped once Start returns
+func (a *adapter) startGRPCService(ctx context.Context, server *vgrpc.GrpcServer, serviceName string) {
+ logger.Infow(ctx, "starting-grpc-service", log.Fields{"service": serviceName})
+
+ probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
+ logger.Infow(ctx, "grpc-service-started", log.Fields{"service": serviceName})
+
+ server.Start(ctx)
+ probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusStopped)
}
-func (a *adapter) startOpenOLT(ctx context.Context, kip kafka.InterContainerProxy,
- cp adapterif.CoreProxy, ap adapterif.AdapterProxy, ep eventif.EventProxy,
+func (a *adapter) addAdapterService(ctx context.Context, server *vgrpc.GrpcServer, handler adapter_services.AdapterServiceServer) {
+ logger.Info(ctx, "adding-adapter-service")
+
+ server.AddService(func(gs *grpc.Server) {
+ adapter_services.RegisterAdapterServiceServer(gs, handler)
+ })
+}
+
+func (a *adapter) addOltInterAdapterService(ctx context.Context, server *vgrpc.GrpcServer, handler adapter_services.OltInterAdapterServiceServer) {
+ logger.Info(ctx, "adding-olt-inter-adapter-service")
+
+ server.AddService(func(gs *grpc.Server) {
+ adapter_services.RegisterOltInterAdapterServiceServer(gs, handler)
+ })
+}
+
+func (a *adapter) startOpenOLT(ctx context.Context, cc *vgrpc.Client, ep eventif.EventProxy,
cfg *config.AdapterFlags, cm *conf.ConfigManager) (*ac.OpenOLT, error) {
logger.Info(ctx, "starting-open-olt")
var err error
- sOLT := ac.NewOpenOLT(ctx, a.kip, cp, ap, ep, cfg, cm)
+ sOLT := ac.NewOpenOLT(ctx, cc, ep, cfg, cm)
if err = sOLT.Start(ctx); err != nil {
return nil, err
@@ -358,19 +343,7 @@
return sOLT, nil
}
-func (a *adapter) setupRequestHandler(ctx context.Context, coreInstanceID string, iadapter adapters.IAdapter) error {
- logger.Info(ctx, "setting-request-handler")
- requestProxy := com.NewRequestHandlerProxy(coreInstanceID, iadapter, a.coreProxy)
- if err := a.kip.SubscribeWithRequestHandlerInterface(ctx, kafka.Topic{Name: a.config.Topic}, requestProxy); err != nil {
- return err
-
- }
- probe.UpdateStatusFromContext(ctx, "core-request-handler", probe.ServiceStatusRunning)
- logger.Info(ctx, "request-handler-setup-done")
- return nil
-}
-
-func (a *adapter) registerWithCore(ctx context.Context, retries int) error {
+func (a *adapter) registerWithCore(ctx context.Context, serviceName string, retries int) error {
adapterID := fmt.Sprintf("openolt_%d", a.config.CurrentReplica)
logger.Infow(ctx, "registering-with-core", log.Fields{
"adapterID": adapterID,
@@ -381,34 +354,38 @@
Id: adapterID, // Unique name for the device type
Vendor: "VOLTHA OpenOLT",
Version: version.VersionInfo.Version,
- // TODO once we'll be ready to support multiple versions of the OpenOLT adapter
- // the Endpoint will have to change to `openolt_<currentReplica`>
- Endpoint: a.config.Topic,
+ // The Endpoint refers to the address this service is listening on.
+ Endpoint: a.config.AdapterEndpoint,
Type: "openolt",
CurrentReplica: int32(a.config.CurrentReplica),
TotalReplicas: int32(a.config.TotalReplicas),
}
types := []*voltha.DeviceType{{
Id: "openolt",
- Adapter: "openolt", // Type of the adapter that handles device type
+ AdapterType: "openolt", // Type of the adapter that handles device type
+ Adapter: "openolt", // Deprecated attribute
AcceptsBulkFlowUpdate: false, // Currently openolt adapter does not support bulk flow handling
AcceptsAddRemoveFlowUpdates: true}}
deviceTypes := &voltha.DeviceTypes{Items: types}
count := 0
for {
- if err := a.coreProxy.RegisterAdapter(log.WithSpanFromContext(context.TODO(), ctx), adapterDescription, deviceTypes); err != nil {
- logger.Warnw(ctx, "registering-with-core-failed", log.Fields{"error": err})
- if retries == count {
- return err
+ gClient, err := a.coreClient.GetCoreServiceClient()
+ if gClient != nil {
+ if _, err = gClient.RegisterAdapter(log.WithSpanFromContext(context.TODO(), ctx), &ic.AdapterRegistration{
+ Adapter: adapterDescription,
+ DTypes: deviceTypes}); err == nil {
+ break
}
- count++
- // Take a nap before retrying
- time.Sleep(2 * time.Second)
- } else {
- break
}
+ logger.Warnw(ctx, "registering-with-core-failed", log.Fields{"endpoint": a.config.CoreEndpoint, "error": err, "count": count, "gclient": gClient})
+ if retries == count {
+ return err
+ }
+ count++
+ // Take a nap before retrying
+ time.Sleep(2 * time.Second)
}
- probe.UpdateStatusFromContext(ctx, "register-with-core", probe.ServiceStatusRunning)
+ probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
logger.Info(ctx, "registered-with-core")
return nil
}
diff --git a/cmd/openolt-adapter/main_test.go b/cmd/openolt-adapter/main_test.go
index 617007d..f33efd0 100644
--- a/cmd/openolt-adapter/main_test.go
+++ b/cmd/openolt-adapter/main_test.go
@@ -17,30 +17,19 @@
import (
"context"
- "errors"
"testing"
- "time"
- conf "github.com/opencord/voltha-lib-go/v6/pkg/config"
-
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
-
- "github.com/opencord/voltha-lib-go/v6/pkg/kafka"
+ conf "github.com/opencord/voltha-lib-go/v7/pkg/config"
+ vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
+ mgrpc "github.com/opencord/voltha-lib-go/v7/pkg/mocks/grpc"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/config"
- "github.com/opencord/voltha-openolt-adapter/pkg/mocks"
- ca "github.com/opencord/voltha-protos/v4/go/inter_container"
"go.etcd.io/etcd/pkg/mock/mockserver"
)
func newMockAdapter() *adapter {
- conf := config.NewAdapterFlags()
- conf.KVStoreType = "etcd"
- cp := mocks.MockCoreProxy{}
- ap := mocks.MockAdapterProxy{}
- ad := newAdapter(conf)
- ad.coreProxy = &cp
- ad.adapterProxy = &ap
+ cf := config.NewAdapterFlags()
+ cf.KVStoreType = "etcd"
+ ad := newAdapter(cf)
return ad
}
func Test_adapter_setKVClient(t *testing.T) {
@@ -85,24 +74,27 @@
func Test_registerWithCore(t *testing.T) {
ad := newMockAdapter()
ctx := context.TODO()
- err := ad.registerWithCore(ctx, 1)
+ // Create and start a mock Core GRPC server
+ ms, err := mgrpc.NewMockGRPCServer(ctx)
+ if err != nil {
+ t.Errorf("grpc server: expected error:nil, got error: %v", err)
+ }
+ ms.AddCoreService(ctx, &vgrpc.MockCoreServiceHandler{})
+ go ms.Start(ctx)
+ defer ms.Stop()
+
+ if ad.coreClient, err = vgrpc.NewClient(ms.ApiEndpoint,
+ ad.coreRestarted); err != nil {
+ t.Errorf("grpc client: expected error:nil, got error: %v", err)
+ }
+ // Start the core grpc client
+ go ad.coreClient.Start(ctx, setAndTestCoreServiceHandler)
+ defer ad.coreClient.Stop(ctx)
+ err = ad.registerWithCore(ctx, coreService, 1)
if err != nil {
t.Errorf("Expected error:nil, got error: %v", err)
}
}
-func Test_startInterContainerProxy(t *testing.T) {
- ad := newMockAdapter()
- kc := &mockKafkaClient{}
- ad.kafkaClient = kc
- ctx := context.TODO()
- icp, err := ad.startInterContainerProxy(ctx, 1)
- if icp != nil {
- t.Log("Intercontainer proxy ", icp)
- }
- if err != nil {
- t.Errorf("err %v", err)
- }
-}
func Test_startOpenOLT(t *testing.T) {
a, _ := mockserver.StartMockServers(1)
@@ -111,8 +103,7 @@
defer a.StopAt(0)
ad := newMockAdapter()
- oolt, err := ad.startOpenOLT(context.TODO(), nil,
- ad.coreProxy, ad.adapterProxy, ad.eventProxy, ad.config, cm)
+ oolt, err := ad.startOpenOLT(context.TODO(), nil, ad.eventProxy, ad.config, cm)
if oolt != nil {
t.Log("Open OLT ", oolt)
}
@@ -136,8 +127,8 @@
wantErr bool
}{
// TODO: Add test cases.
- {"newKafkaClient", args{clientType: "sarama", address: adapter.config.KafkaAdapterAddress}, false},
- {"newKafkaClient", args{clientType: "sarama", address: adapter.config.KafkaAdapterAddress}, false},
+ {"newKafkaClient", args{clientType: "sarama", address: adapter.config.KafkaClusterAddress}, false},
+ {"newKafkaClient", args{clientType: "sarama", address: adapter.config.KafkaClusterAddress}, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -150,83 +141,3 @@
})
}
}
-
-func Test_adapter_setupRequestHandler(t *testing.T) {
-
- ad := newMockAdapter()
- cm := &conf.ConfigManager{}
-
- kip := kafka.NewInterContainerProxy(
- kafka.InterContainerAddress(ad.config.KafkaAdapterAddress),
- kafka.MsgClient(&mockKafkaClient{}),
- kafka.DefaultTopic(&kafka.Topic{Name: ad.config.Topic}))
-
- ad.kip = kip
- _ = ad.kip.Start(context.Background())
-
- oolt, _ := ad.startOpenOLT(context.TODO(), nil,
- ad.coreProxy, ad.adapterProxy, ad.eventProxy, ad.config, cm)
- printBanner()
- printVersion()
- ctx := context.TODO()
- if err := ad.setupRequestHandler(ctx, ad.config.InstanceID, oolt); err != nil {
- t.Logf("adapter.setupRequestHandler() error = %v", err)
- }
-
-}
-
-// Kafka client mocker
-type mockKafkaClient struct {
-}
-
-func (kc *mockKafkaClient) Start(ctx context.Context) error {
- return nil
-}
-func (kc *mockKafkaClient) Stop(ctx context.Context) {
-}
-func (kc *mockKafkaClient) CreateTopic(ctx context.Context, topic *kafka.Topic, numPartition int, repFactor int) error {
- if topic != nil {
- return nil
- }
- return errors.New("invalid Topic")
-}
-func (kc *mockKafkaClient) DeleteTopic(ctx context.Context, topic *kafka.Topic) error {
- if topic != nil {
- return nil
- }
- return errors.New("invalid Topic")
-}
-func (kc *mockKafkaClient) Subscribe(ctx context.Context, topic *kafka.Topic, kvArgs ...*kafka.KVArg) (<-chan *ca.InterContainerMessage, error) {
- if topic != nil {
- ch := make(chan *ca.InterContainerMessage)
- return ch, nil
- }
- return nil, errors.New("invalid Topic")
-}
-func (kc *mockKafkaClient) UnSubscribe(ctx context.Context, topic *kafka.Topic, ch <-chan *ca.InterContainerMessage) error {
- if topic == nil {
- return nil
- }
- return errors.New("invalid Topic")
-}
-func (kc *mockKafkaClient) Send(ctx context.Context, msg interface{}, topic *kafka.Topic, keys ...string) error {
- if topic != nil {
- return nil
- }
- return errors.New("invalid topic")
-}
-
-func (kc *mockKafkaClient) SendLiveness(ctx context.Context) error {
- return status.Error(codes.Unimplemented, "SendLiveness")
-}
-
-func (kc *mockKafkaClient) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
- return nil
-}
-
-func (kc *mockKafkaClient) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
- return nil
-}
-
-func (kc *mockKafkaClient) SubscribeForMetadata(context.Context, func(fromTopic string, timestamp time.Time)) {
-}