[VOL-1024] This update consists of:
1) Inter-container Kafka library in Go
2) Initial set of proto definitions
3) Test cases for the Kafka library
Change-Id: Ibe8b0f673a90bbe4cb92847ce40f31ec2d0b6244
diff --git a/rw_core/main.go b/rw_core/main.go
index faa5851..a0928c5 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -6,6 +6,8 @@
"fmt"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/kvstore"
+ "github.com/opencord/voltha-go/kafka"
+ ca "github.com/opencord/voltha-go/protos/core_adapter"
"github.com/opencord/voltha-go/rw_core/config"
"os"
"os/signal"
@@ -19,6 +21,9 @@
config *config.RWCoreFlags
halted bool
exitChannel chan int
+ kmp *kafka.KafkaMessagingProxy
+ //For test
+ receiverChannels []<-chan *ca.InterContainerMessage
}
func newKVClient(storeType string, address string, timeout int) (kvstore.Client, error) {
@@ -38,6 +43,7 @@
rwCore.config = cf
rwCore.halted = false
rwCore.exitChannel = make(chan int, 1)
+ rwCore.receiverChannels = make([]<-chan *ca.InterContainerMessage, 0)
return &rwCore
}
@@ -63,6 +69,7 @@
}
}
+
func (core *rwCore) start(ctx context.Context) {
log.Info("Starting RW Core components")
// Setup GRPC Server
@@ -70,13 +77,29 @@
// Setup KV Client
// Setup Kafka messaging services
-
+ var err error
+ if core.kmp, err = kafka.NewKafkaMessagingProxy(
+ kafka.KafkaHost("10.100.198.220"),
+ kafka.KafkaPort(9092),
+ kafka.DefaultTopic(&kafka.Topic{Name: "Adapter"})); err != nil {
+ log.Errorw("fail-to-create-kafka-proxy", log.Fields{"error": err})
+ return
+ }
+ // Start the Kafka messaging service - synchronous call, so a startup error is caught here
+ if err = core.kmp.Start(); err != nil {
+ log.Fatalw("error-starting-messaging-proxy", log.Fields{"error": err})
+ }
}
func (core *rwCore) stop() {
// Stop leadership tracking
core.halted = true
+ // Stop the Kafka messaging service
+ if core.kmp != nil {
+ core.kmp.Stop()
+ }
+
// send exit signal
core.exitChannel <- 0