general update for library compatibility, [VOL-3202], some TechProf processing restructuring
Signed-off-by: mpagenko <michael.pagenkopf@adtran.com>
Change-Id: I451c663fea4dc3ea5acd141069e64a1ad2a68d58
diff --git a/cmd/openonu-adapter/main.go b/cmd/openonu-adapter/main.go
index 86e2f2a..b472c2e 100644
--- a/cmd/openonu-adapter/main.go
+++ b/cmd/openonu-adapter/main.go
@@ -99,7 +99,9 @@
}
// Setup Log Config
- cm := conf.NewConfigManager(a.kvClient, a.config.KVStoreType, a.config.KVStoreHost, a.config.KVStorePort, a.config.KVStoreTimeout)
+ /* per [VOL-2736] the KV store address is passed as a single "host:port" string */
+ addr := a.config.KVStoreHost + ":" + strconv.Itoa(a.config.KVStorePort)
+ cm := conf.NewConfigManager(a.kvClient, a.config.KVStoreType, addr, a.config.KVStoreTimeout)
go conf.StartLogLevelConfigProcessing(cm, ctx)
// Setup Kafka Client
@@ -173,8 +175,7 @@
// #############################################
// Adapter Utility methods ##### begin #########
-func newKVClient(storeType, address string, timeout int) (kvstore.Client, error) {
-
+func newKVClient(storeType, address string, timeout time.Duration) (kvstore.Client, error) {
logger.Infow("kv-store-type", log.Fields{"store": storeType})
switch storeType {
case "consul":
@@ -188,11 +189,13 @@
func newKafkaClient(clientType, host string, port int) (kafka.Client, error) {
logger.Infow("common-client-type", log.Fields{"client": clientType})
+ /* per [VOL-2736] the Kafka address is passed as a single "host:port" string */
+ addr := host + ":" + strconv.Itoa(port)
+
switch clientType {
case "sarama":
return kafka.NewSaramaClient(
- kafka.Host(host),
- kafka.Port(port),
+ kafka.Address(addr),
kafka.ProducerReturnOnErrors(true),
kafka.ProducerReturnOnSuccess(true),
kafka.ProducerMaxRetries(6),
@@ -219,9 +222,10 @@
logger.Infow("starting-intercontainer-messaging-proxy", log.Fields{"host": a.config.KafkaAdapterHost,
"port": a.config.KafkaAdapterPort, "topic": a.config.Topic})
var err error
+ /* per [VOL-2736] the inter-container proxy address is passed as a single "host:port" string */
+ addr := a.config.KafkaAdapterHost + ":" + strconv.Itoa(a.config.KafkaAdapterPort)
kip := kafka.NewInterContainerProxy(
- kafka.InterContainerHost(a.config.KafkaAdapterHost),
- kafka.InterContainerPort(a.config.KafkaAdapterPort),
+ kafka.InterContainerAddress(addr),
kafka.MsgClient(a.kafkaClient),
kafka.DefaultTopic(&kafka.Topic{Name: a.config.Topic}))
count := 0
@@ -247,7 +251,7 @@
cp adapterif.CoreProxy, ap adapterif.AdapterProxy, ep adapterif.EventProxy,
cfg *config.AdapterFlags) (*ac.OpenONUAC, error) {
var err error
- sAcONU := ac.NewOpenONUAC(ctx, a.kip, cp, ap, ep, cfg)
+ sAcONU := ac.NewOpenONUAC(ctx, a.kip, cp, ap, ep, a.kvClient, cfg)
if err = sAcONU.Start(ctx); err != nil {
logger.Fatalw("error-starting-OpenOnuAdapterCore", log.Fields{"error": err})