Add NETCONF notification for ONU activation, add Kafka client to receive events, and update dependencies

Change-Id: I5f768fa8077ef7c64e00a534744ca47492344935
diff --git a/cmd/bbf-adapter/main.go b/cmd/bbf-adapter/main.go
index b5ac693..d3fce99 100644
--- a/cmd/bbf-adapter/main.go
+++ b/cmd/bbf-adapter/main.go
@@ -44,6 +44,7 @@
 	volthaNbiClient *clients.VolthaNbiClient
 	oltAppClient    *clients.OltAppClient
 	sysrepoPlugin   *sysrepo.SysrepoPlugin
+	kafkaConsumer   *clients.KafkaConsumer
 }
 
 func newBbfAdapter(conf *config.BBFAdapterConfig) *bbfAdapter {
@@ -82,6 +83,14 @@
 		probe.UpdateStatusFromContext(ctx, sysrepoService, probe.ServiceStatusRunning)
 	}
 
+	//Set up the Kafka consumer
+	a.kafkaConsumer = clients.NewKafkaConsumer(a.conf.KafkaClusterAddress)
+	if err := a.kafkaConsumer.Start(ctx, a.sysrepoPlugin.ManageVolthaEvent); err != nil {
+		logger.Fatalw(ctx, "failed-to-start-kafka-consumer", log.Fields{"err": err})
+	} else {
+		probe.UpdateStatusFromContext(ctx, a.conf.KafkaClusterAddress, probe.ServiceStatusRunning)
+	}
+
 	//Set the service as running, making the adapter finally ready
 	probe.UpdateStatusFromContext(ctx, bbfAdapterService, probe.ServiceStatusRunning)
 	logger.Info(ctx, "bbf-adapter-ready")
@@ -91,6 +100,10 @@
 func (a *bbfAdapter) cleanup(ctx context.Context) {
 	core.AdapterInstance = nil
 
+	if err := a.kafkaConsumer.Stop(); err != nil {
+		logger.Errorw(ctx, "failed-to-stop-kafka-consumer", log.Fields{"err": err})
+	}
+
 	a.volthaNbiClient.Close(ctx)
 
 	err := a.sysrepoPlugin.Stop(ctx)
@@ -201,6 +214,7 @@
 		bbfAdapterService,
 		conf.VolthaNbiEndpoint,
 		conf.OnosRestEndpoint,
+		conf.KafkaClusterAddress,
 		sysrepoService,
 	)