diff --git a/cmd/openolt-adapter/main_test.go b/cmd/openolt-adapter/main_test.go
index bd5c81a..f084b5e 100644
--- a/cmd/openolt-adapter/main_test.go
+++ b/cmd/openolt-adapter/main_test.go
@@ -61,7 +61,7 @@
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			if err := tt.adapter.setKVClient(); (err != nil) != tt.wantErr {
+			if err := tt.adapter.setKVClient(context.Background()); (err != nil) != tt.wantErr {
 				t.Errorf("adapter.setKVClient() error = %v, wantErr %v", err, tt.wantErr)
 			}
 		})
@@ -74,7 +74,7 @@
 	a.StartAt(0)
 	defer a.StopAt(0)
 
-	if err := adapt.setKVClient(); err != nil {
+	if err := adapt.setKVClient(context.Background()); err != nil {
 		t.Errorf("adapter.setKVClient() error = %v", err)
 	}
 }
@@ -137,7 +137,7 @@
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			_, err := newKafkaClient(tt.args.clientType, tt.args.address)
+			_, err := newKafkaClient(context.Background(), tt.args.clientType, tt.args.address)
 			if (err != nil) != tt.wantErr {
 				t.Errorf("newKafkaClient() error = %v, wantErr %v", err, tt.wantErr)
 				return
@@ -157,7 +157,7 @@
 		kafka.DefaultTopic(&kafka.Topic{Name: ad.config.Topic}))
 
 	ad.kip = kip
-	ad.kip.Start()
+	ad.kip.Start(context.Background())
 
 	oolt, _ := ad.startOpenOLT(context.TODO(), nil,
 		ad.coreProxy, ad.adapterProxy, ad.eventProxy, ad.config)
@@ -174,55 +174,55 @@
 type mockKafkaClient struct {
 }
 
-func (kc *mockKafkaClient) Start() error {
+func (kc *mockKafkaClient) Start(ctx context.Context) error {
 	return nil
 }
-func (kc *mockKafkaClient) Stop() {
+func (kc *mockKafkaClient) Stop(ctx context.Context) {
 }
-func (kc *mockKafkaClient) CreateTopic(topic *kafka.Topic, numPartition int, repFactor int) error {
+func (kc *mockKafkaClient) CreateTopic(ctx context.Context, topic *kafka.Topic, numPartition int, repFactor int) error {
 	if topic != nil {
 		return nil
 	}
 	return errors.New("invalid Topic")
 }
-func (kc *mockKafkaClient) DeleteTopic(topic *kafka.Topic) error {
+func (kc *mockKafkaClient) DeleteTopic(ctx context.Context, topic *kafka.Topic) error {
 	if topic != nil {
 		return nil
 	}
 	return errors.New("invalid Topic")
 }
-func (kc *mockKafkaClient) Subscribe(topic *kafka.Topic, kvArgs ...*kafka.KVArg) (<-chan *ca.InterContainerMessage, error) {
+func (kc *mockKafkaClient) Subscribe(ctx context.Context, topic *kafka.Topic, kvArgs ...*kafka.KVArg) (<-chan *ca.InterContainerMessage, error) {
 	if topic != nil {
 		ch := make(chan *ca.InterContainerMessage)
 		return ch, nil
 	}
 	return nil, errors.New("invalid Topic")
 }
-func (kc *mockKafkaClient) UnSubscribe(topic *kafka.Topic, ch <-chan *ca.InterContainerMessage) error {
-	if topic == nil {
+func (kc *mockKafkaClient) UnSubscribe(ctx context.Context, topic *kafka.Topic, ch <-chan *ca.InterContainerMessage) error {
+	if topic != nil {
 		return nil
 	}
 	return errors.New("invalid Topic")
 }
-func (kc *mockKafkaClient) Send(msg interface{}, topic *kafka.Topic, keys ...string) error {
+func (kc *mockKafkaClient) Send(ctx context.Context, msg interface{}, topic *kafka.Topic, keys ...string) error {
 	if topic != nil {
 		return nil
 	}
 	return errors.New("invalid topic")
 }
 
-func (kc *mockKafkaClient) SendLiveness() error {
+func (kc *mockKafkaClient) SendLiveness(ctx context.Context) error {
 	return status.Error(codes.Unimplemented, "SendLiveness")
 }
 
-func (kc *mockKafkaClient) EnableLivenessChannel(enable bool) chan bool {
+func (kc *mockKafkaClient) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
 	return nil
 }
 
-func (kc *mockKafkaClient) EnableHealthinessChannel(enable bool) chan bool {
+func (kc *mockKafkaClient) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
 	return nil
 }
 
-func (kc *mockKafkaClient) SubscribeForMetadata(func(fromTopic string, timestamp time.Time)) {
+func (kc *mockKafkaClient) SubscribeForMetadata(context.Context, func(fromTopic string, timestamp time.Time)) {
 	return
 }
