Add a NETCONF notification for ONU activation and a Kafka client to receive events; update dependencies
Change-Id: I5f768fa8077ef7c64e00a534744ca47492344935
diff --git a/cmd/bbf-adapter/main.go b/cmd/bbf-adapter/main.go
index b5ac693..d3fce99 100644
--- a/cmd/bbf-adapter/main.go
+++ b/cmd/bbf-adapter/main.go
@@ -44,6 +44,7 @@
volthaNbiClient *clients.VolthaNbiClient
oltAppClient *clients.OltAppClient
sysrepoPlugin *sysrepo.SysrepoPlugin
+ kafkaConsumer *clients.KafkaConsumer
}
func newBbfAdapter(conf *config.BBFAdapterConfig) *bbfAdapter {
@@ -82,6 +83,14 @@
probe.UpdateStatusFromContext(ctx, sysrepoService, probe.ServiceStatusRunning)
}
+ //Set up the Kafka consumer
+ a.kafkaConsumer = clients.NewKafkaConsumer(a.conf.KafkaClusterAddress)
+ if err := a.kafkaConsumer.Start(ctx, a.sysrepoPlugin.ManageVolthaEvent); err != nil {
+ logger.Fatalw(ctx, "failed-to-start-kafka-consumer", log.Fields{"err": err})
+ } else {
+ probe.UpdateStatusFromContext(ctx, a.conf.KafkaClusterAddress, probe.ServiceStatusRunning)
+ }
+
//Set the service as running, making the adapter finally ready
probe.UpdateStatusFromContext(ctx, bbfAdapterService, probe.ServiceStatusRunning)
logger.Info(ctx, "bbf-adapter-ready")
@@ -91,6 +100,10 @@
func (a *bbfAdapter) cleanup(ctx context.Context) {
core.AdapterInstance = nil
+ if err := a.kafkaConsumer.Stop(); err != nil {
+ logger.Errorw(ctx, "failed-to-stop-kafka-consumer", log.Fields{"err": err})
+ }
+
a.volthaNbiClient.Close(ctx)
err := a.sysrepoPlugin.Stop(ctx)
@@ -201,6 +214,7 @@
bbfAdapterService,
conf.VolthaNbiEndpoint,
conf.OnosRestEndpoint,
+ conf.KafkaClusterAddress,
sysrepoService,
)