VOL-2180 code changes for context addition
Integrating InterContainerProxy interface changes
Change-Id: Ia20c5ac3093b7845acf80cce801ec0c1d90c125f
diff --git a/main.go b/main.go
index 6b07362..94f4d19 100644
--- a/main.go
+++ b/main.go
@@ -48,7 +48,7 @@
iAdapter adapters.IAdapter
kafkaClient kafka.Client
kvClient kvstore.Client
- kip *kafka.InterContainerProxy
+ kip kafka.InterContainerProxy
coreProxy adapterif.CoreProxy
adapterProxy adapterif.AdapterProxy
eventProxy adapterif.EventProxy
@@ -185,7 +185,7 @@
case <-timeoutTimer.C:
// Check the status of the kv-store
log.Info("kv-store liveliness-recheck")
- if a.kvClient.IsConnectionUp(a.config.KVStoreTimeout) {
+ if a.kvClient.IsConnectionUp(ctx) {
kvStoreChannel <- true
} else {
kvStoreChannel <- false
@@ -238,7 +238,7 @@
}
}
-func (a *adapter) stop() {
+func (a *adapter) stop(ctx context.Context) {
// Stop leadership tracking
a.halted = true
@@ -248,7 +248,7 @@
// Cleanup - applies only if we had a kvClient
if a.kvClient != nil {
// Release all reservations
- if err := a.kvClient.ReleaseAllReservations(); err != nil {
+ if err := a.kvClient.ReleaseAllReservations(ctx); err != nil {
log.Infow("fail-to-release-all-reservations", log.Fields{"error": err})
}
// Close the DB connection
@@ -300,19 +300,15 @@
return nil
}
-func (a *adapter) startInterContainerProxy(ctx context.Context, retries int) (*kafka.InterContainerProxy, error) {
+func (a *adapter) startInterContainerProxy(ctx context.Context, retries int) (kafka.InterContainerProxy, error) {
log.Infow("starting-intercontainer-messaging-proxy", log.Fields{"host": a.config.KafkaAdapterHost,
"port": a.config.KafkaAdapterPort, "topic": a.config.Topic})
var err error
- var kip *kafka.InterContainerProxy
- if kip, err = kafka.NewInterContainerProxy(
+ kip := kafka.NewInterContainerProxy(
kafka.InterContainerHost(a.config.KafkaAdapterHost),
kafka.InterContainerPort(a.config.KafkaAdapterPort),
kafka.MsgClient(a.kafkaClient),
- kafka.DefaultTopic(&kafka.Topic{Name: a.config.Topic})); err != nil {
- log.Errorw("fail-to-create-common-proxy", log.Fields{"error": err})
- return nil, err
- }
+ kafka.DefaultTopic(&kafka.Topic{Name: a.config.Topic}))
count := 0
for {
if err = kip.Start(); err != nil {
@@ -332,7 +328,7 @@
return kip, nil
}
-func (a *adapter) startOpenOLT(ctx context.Context, kip *kafka.InterContainerProxy,
+func (a *adapter) startOpenOLT(ctx context.Context, kip kafka.InterContainerProxy,
cp adapterif.CoreProxy, ap adapterif.AdapterProxy, ep adapterif.EventProxy,
cfg *config.AdapterFlags) (*ac.OpenOLT, error) {
log.Info("starting-open-olt")
@@ -489,7 +485,7 @@
log.Infow("received-a-closing-signal", log.Fields{"code": code})
// Cleanup before leaving
- ad.stop()
+ ad.stop(ctx)
elapsed := time.Since(start)
log.Infow("run-time", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second})