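VOL-2180: Pass context.Context to the kv-store client calls (IsConnectionUp, ReleaseAllReservations) and to adapter.stop()
Integrate the InterContainerProxy interface changes: use kafka.InterContainerProxy instead of *kafka.InterContainerProxy; NewInterContainerProxy no longer returns an error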

Change-Id: Ia20c5ac3093b7845acf80cce801ec0c1d90c125f
diff --git a/main.go b/main.go
index 6b07362..94f4d19 100644
--- a/main.go
+++ b/main.go
@@ -48,7 +48,7 @@
 	iAdapter         adapters.IAdapter
 	kafkaClient      kafka.Client
 	kvClient         kvstore.Client
-	kip              *kafka.InterContainerProxy
+	kip              kafka.InterContainerProxy
 	coreProxy        adapterif.CoreProxy
 	adapterProxy     adapterif.AdapterProxy
 	eventProxy       adapterif.EventProxy
@@ -185,7 +185,7 @@
 		case <-timeoutTimer.C:
 			// Check the status of the kv-store
 			log.Info("kv-store liveliness-recheck")
-			if a.kvClient.IsConnectionUp(a.config.KVStoreTimeout) {
+			if a.kvClient.IsConnectionUp(ctx) {
 				kvStoreChannel <- true
 			} else {
 				kvStoreChannel <- false
@@ -238,7 +238,7 @@
 	}
 }
 
-func (a *adapter) stop() {
+func (a *adapter) stop(ctx context.Context) {
 	// Stop leadership tracking
 	a.halted = true
 
@@ -248,7 +248,7 @@
 	// Cleanup - applies only if we had a kvClient
 	if a.kvClient != nil {
 		// Release all reservations
-		if err := a.kvClient.ReleaseAllReservations(); err != nil {
+		if err := a.kvClient.ReleaseAllReservations(ctx); err != nil {
 			log.Infow("fail-to-release-all-reservations", log.Fields{"error": err})
 		}
 		// Close the DB connection
@@ -300,19 +300,15 @@
 	return nil
 }
 
-func (a *adapter) startInterContainerProxy(ctx context.Context, retries int) (*kafka.InterContainerProxy, error) {
+func (a *adapter) startInterContainerProxy(ctx context.Context, retries int) (kafka.InterContainerProxy, error) {
 	log.Infow("starting-intercontainer-messaging-proxy", log.Fields{"host": a.config.KafkaAdapterHost,
 		"port": a.config.KafkaAdapterPort, "topic": a.config.Topic})
 	var err error
-	var kip *kafka.InterContainerProxy
-	if kip, err = kafka.NewInterContainerProxy(
+	kip := kafka.NewInterContainerProxy(
 		kafka.InterContainerHost(a.config.KafkaAdapterHost),
 		kafka.InterContainerPort(a.config.KafkaAdapterPort),
 		kafka.MsgClient(a.kafkaClient),
-		kafka.DefaultTopic(&kafka.Topic{Name: a.config.Topic})); err != nil {
-		log.Errorw("fail-to-create-common-proxy", log.Fields{"error": err})
-		return nil, err
-	}
+		kafka.DefaultTopic(&kafka.Topic{Name: a.config.Topic}))
 	count := 0
 	for {
 		if err = kip.Start(); err != nil {
@@ -332,7 +328,7 @@
 	return kip, nil
 }
 
-func (a *adapter) startOpenOLT(ctx context.Context, kip *kafka.InterContainerProxy,
+func (a *adapter) startOpenOLT(ctx context.Context, kip kafka.InterContainerProxy,
 	cp adapterif.CoreProxy, ap adapterif.AdapterProxy, ep adapterif.EventProxy,
 	cfg *config.AdapterFlags) (*ac.OpenOLT, error) {
 	log.Info("starting-open-olt")
@@ -489,7 +485,7 @@
 	log.Infow("received-a-closing-signal", log.Fields{"code": code})
 
 	// Cleanup before leaving
-	ad.stop()
+	ad.stop(ctx)
 
 	elapsed := time.Since(start)
 	log.Infow("run-time", log.Fields{"instanceId": ad.config.InstanceID, "time": elapsed / time.Second})