This update addresses the following:
1. Decouple the kafka messaging proxy from the kafka client. This
will allow us to try out different kafka clients as well as test
the client separately.
2. Create unique device topics for the core, olt adapter and onu
adapters. This will ensure only cores and adapters handling these
devices will listens to the device messages.
3. Update the core with the latest device model APIs and changes.
While most of the model issues have been fixed, there is still an
issue with updating a child branch. This will be dealt in a separate
update.
Change-Id: I622ef5c636d7466bb3adefaa4ac4c85d7c450bea
diff --git a/rw_core/main.go b/rw_core/main.go
index cd5dbe9..77ce304 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -22,6 +22,7 @@
grpcserver "github.com/opencord/voltha-go/common/grpc"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/kvstore"
+ "github.com/opencord/voltha-go/kafka"
ca "github.com/opencord/voltha-go/protos/core_adapter"
"github.com/opencord/voltha-go/rw_core/config"
c "github.com/opencord/voltha-go/rw_core/core"
@@ -38,8 +39,9 @@
halted bool
exitChannel chan int
//kmp *kafka.KafkaMessagingProxy
- grpcServer *grpcserver.GrpcServer
- core *c.Core
+ grpcServer *grpcserver.GrpcServer
+ kafkaClient kafka.Client
+ core *c.Core
//For test
receiverChannels []<-chan *ca.InterContainerMessage
}
@@ -60,6 +62,18 @@
return nil, errors.New("unsupported-kv-store")
}
+func newKafkaClient(clientType string, host string, port int) (kafka.Client, error) {
+
+ log.Infow("kafka-client-type", log.Fields{"client": clientType})
+ switch clientType {
+ case "sarama":
+ return kafka.NewSaramaClient(
+ kafka.Host(host),
+ kafka.Port(port)), nil
+ }
+ return nil, errors.New("unsupported-client-type")
+}
+
func newRWCore(cf *config.RWCoreFlags) *rwCore {
var rwCore rwCore
rwCore.config = cf
@@ -92,47 +106,10 @@
}
}
-//func (rw *rwCore) createGRPCService(context.Context) {
-// // create an insecure gserver server
-// rw.grpcServer = grpcserver.NewGrpcServer(rw.config.GrpcHost, rw.config.GrpcPort, nil, false)
-// log.Info("grpc-server-created")
-//}
-
-//func (rw *rwCore) startKafkaMessagingProxy(ctx context.Context) error {
-// log.Infow("starting-kafka-messaging-proxy", log.Fields{"host":rw.config.KafkaAdapterHost,
-// "port":rw.config.KafkaAdapterPort, "topic":rw.config.CoreTopic})
-// var err error
-// if rw.kmp, err = kafka.NewKafkaMessagingProxy(
-// kafka.KafkaHost(rw.config.KafkaAdapterHost),
-// kafka.KafkaPort(rw.config.KafkaAdapterPort),
-// kafka.DefaultTopic(&kafka.Topic{Name: rw.config.CoreTopic})); err != nil {
-// log.Errorw("fail-to-create-kafka-proxy", log.Fields{"error": err})
-// return err
-// }
-// if err = rw.kmp.Start(); err != nil {
-// log.Fatalw("error-starting-messaging-proxy", log.Fields{"error": err})
-// return err
-// }
-//
-// requestProxy := &c.RequestHandlerProxy{}
-// rw.kmp.SubscribeWithTarget(kafka.Topic{Name: rw.config.CoreTopic}, requestProxy)
-//
-// log.Info("started-kafka-messaging-proxy")
-// return nil
-//}
-
func (rw *rwCore) start(ctx context.Context) {
log.Info("Starting RW Core components")
- //// Setup GRPC Server
- //rw.createGRPCService(ctx)
-
- //// Setup Kafka messaging services
- //if err := rw.startKafkaMessagingProxy(ctx); err != nil {
- // log.Fatalw("failed-to-start-kafka-proxy", log.Fields{"err":err})
- //}
-
- // Setup KV Client
+ // Setup KV MsgClient
log.Debugw("create-kv-client", log.Fields{"kvstore": rw.config.KVStoreType})
err := rw.setKVClient()
if err == nil {
@@ -144,8 +121,12 @@
rw.config.KVTxnKeyDelTime)
}
+ if rw.kafkaClient, err = newKafkaClient("sarama", rw.config.KafkaAdapterHost, rw.config.KafkaAdapterPort); err != nil {
+ log.Fatal("Unsupported-kafka-client")
+ }
+
// Create the core service
- rw.core = c.NewCore(rw.config.InstanceID, rw.config, rw.kvClient)
+ rw.core = c.NewCore(rw.config.InstanceID, rw.config, rw.kvClient, rw.kafkaClient)
// start the core
rw.core.Start(ctx)
@@ -155,11 +136,6 @@
// Stop leadership tracking
rw.halted = true
- //// Stop the Kafka messaging service
- //if rw.kmp != nil {
- // rw.kmp.Stop()
- //}
-
// send exit signal
rw.exitChannel <- 0
@@ -172,6 +148,12 @@
// Close the DB connection
rw.kvClient.Close()
}
+
+ rw.core.Stop(nil)
+
+ //if rw.kafkaClient != nil {
+ // rw.kafkaClient.Stop()
+ //}
}
func waitForExit() int {