[VOL-3069] Pass Context to methods that perform logging and need the context

Change-Id: Ie84f9e240aa4f47d0046acaac0d82d21b17252e5
diff --git a/internal/pkg/core/openolt_eventmgr.go b/internal/pkg/core/openolt_eventmgr.go
index a11d3f7..76d1ca7 100644
--- a/internal/pkg/core/openolt_eventmgr.go
+++ b/internal/pkg/core/openolt_eventmgr.go
@@ -18,7 +18,7 @@
 package core
 
 import (
-	ctx "context"
+	"context"
 	"errors"
 	"fmt"
 	"strconv"
@@ -103,60 +103,60 @@
 
 // ProcessEvents is function to process and publish OpenOLT event
 // nolint: gocyclo
-func (em *OpenOltEventMgr) ProcessEvents(alarmInd *oop.AlarmIndication, deviceID string, raisedTs int64) error {
+func (em *OpenOltEventMgr) ProcessEvents(ctx context.Context, alarmInd *oop.AlarmIndication, deviceID string, raisedTs int64) error {
 	var err error
 	switch alarmInd.Data.(type) {
 	case *oop.AlarmIndication_LosInd:
-		logger.Debugw("received-los-indication", log.Fields{"alarm-ind": alarmInd})
-		err = em.oltLosIndication(alarmInd.GetLosInd(), deviceID, raisedTs)
+		logger.Debugw(ctx, "received-los-indication", log.Fields{"alarm-ind": alarmInd})
+		err = em.oltLosIndication(ctx, alarmInd.GetLosInd(), deviceID, raisedTs)
 	case *oop.AlarmIndication_OnuAlarmInd:
-		logger.Debugw("received-onu-alarm-indication ", log.Fields{"alarm-ind": alarmInd})
-		err = em.onuAlarmIndication(alarmInd.GetOnuAlarmInd(), deviceID, raisedTs)
+		logger.Debugw(ctx, "received-onu-alarm-indication ", log.Fields{"alarm-ind": alarmInd})
+		err = em.onuAlarmIndication(ctx, alarmInd.GetOnuAlarmInd(), deviceID, raisedTs)
 	case *oop.AlarmIndication_DyingGaspInd:
-		logger.Debugw("received-dying-gasp-indication", log.Fields{"alarm-ind": alarmInd})
-		err = em.onuDyingGaspIndication(alarmInd.GetDyingGaspInd(), deviceID, raisedTs)
+		logger.Debugw(ctx, "received-dying-gasp-indication", log.Fields{"alarm-ind": alarmInd})
+		err = em.onuDyingGaspIndication(ctx, alarmInd.GetDyingGaspInd(), deviceID, raisedTs)
 	case *oop.AlarmIndication_OnuActivationFailInd:
-		logger.Debugw("received-onu-activation-fail-indication ", log.Fields{"alarm-ind": alarmInd})
-		err = em.onuActivationFailIndication(alarmInd.GetOnuActivationFailInd(), deviceID, raisedTs)
+		logger.Debugw(ctx, "received-onu-activation-fail-indication ", log.Fields{"alarm-ind": alarmInd})
+		err = em.onuActivationFailIndication(ctx, alarmInd.GetOnuActivationFailInd(), deviceID, raisedTs)
 	case *oop.AlarmIndication_OnuLossOmciInd:
-		logger.Debugw("received-onu-loss-omci-indication ", log.Fields{"alarm-ind": alarmInd})
-		err = em.onuLossOmciIndication(alarmInd.GetOnuLossOmciInd(), deviceID, raisedTs)
+		logger.Debugw(ctx, "received-onu-loss-omci-indication ", log.Fields{"alarm-ind": alarmInd})
+		err = em.onuLossOmciIndication(ctx, alarmInd.GetOnuLossOmciInd(), deviceID, raisedTs)
 	case *oop.AlarmIndication_OnuDriftOfWindowInd:
-		logger.Debugw("received-onu-drift-of-window-indication ", log.Fields{"alarm-ind": alarmInd})
-		err = em.onuDriftOfWindowIndication(alarmInd.GetOnuDriftOfWindowInd(), deviceID, raisedTs)
+		logger.Debugw(ctx, "received-onu-drift-of-window-indication ", log.Fields{"alarm-ind": alarmInd})
+		err = em.onuDriftOfWindowIndication(ctx, alarmInd.GetOnuDriftOfWindowInd(), deviceID, raisedTs)
 	case *oop.AlarmIndication_OnuSignalDegradeInd:
-		logger.Debugw("received-onu-signal-degrade-indication ", log.Fields{"alarm-ind": alarmInd})
-		err = em.onuSignalDegradeIndication(alarmInd.GetOnuSignalDegradeInd(), deviceID, raisedTs)
+		logger.Debugw(ctx, "received-onu-signal-degrade-indication ", log.Fields{"alarm-ind": alarmInd})
+		err = em.onuSignalDegradeIndication(ctx, alarmInd.GetOnuSignalDegradeInd(), deviceID, raisedTs)
 	case *oop.AlarmIndication_OnuSignalsFailInd:
-		logger.Debugw("received-onu-signal-fail-indication ", log.Fields{"alarm-ind": alarmInd})
-		err = em.onuSignalsFailIndication(alarmInd.GetOnuSignalsFailInd(), deviceID, raisedTs)
+		logger.Debugw(ctx, "received-onu-signal-fail-indication ", log.Fields{"alarm-ind": alarmInd})
+		err = em.onuSignalsFailIndication(ctx, alarmInd.GetOnuSignalsFailInd(), deviceID, raisedTs)
 	case *oop.AlarmIndication_OnuStartupFailInd:
-		logger.Debugw("received-onu-startup-fail-indication ", log.Fields{"alarm-ind": alarmInd})
-		err = em.onuStartupFailedIndication(alarmInd.GetOnuStartupFailInd(), deviceID, raisedTs)
+		logger.Debugw(ctx, "received-onu-startup-fail-indication ", log.Fields{"alarm-ind": alarmInd})
+		err = em.onuStartupFailedIndication(ctx, alarmInd.GetOnuStartupFailInd(), deviceID, raisedTs)
 	case *oop.AlarmIndication_OnuTiwiInd:
-		logger.Debugw("received-onu-transmission-warning-indication ", log.Fields{"alarm-ind": alarmInd})
-		logger.Warnw("not-implemented-yet", log.Fields{"alarm-ind": "Onu-Transmission-indication"})
+		logger.Debugw(ctx, "received-onu-transmission-warning-indication ", log.Fields{"alarm-ind": alarmInd})
+		logger.Warnw(ctx, "not-implemented-yet", log.Fields{"alarm-ind": "Onu-Transmission-indication"})
 	case *oop.AlarmIndication_OnuLossOfSyncFailInd:
-		logger.Debugw("received-onu-loss-of-sync-fail-indication ", log.Fields{"alarm-ind": alarmInd})
-		err = em.onuLossOfSyncIndication(alarmInd.GetOnuLossOfSyncFailInd(), deviceID, raisedTs)
+		logger.Debugw(ctx, "received-onu-loss-of-sync-fail-indication ", log.Fields{"alarm-ind": alarmInd})
+		err = em.onuLossOfSyncIndication(ctx, alarmInd.GetOnuLossOfSyncFailInd(), deviceID, raisedTs)
 	case *oop.AlarmIndication_OnuItuPonStatsInd:
-		logger.Debugw("received-onu-itu-pon-stats-indication ", log.Fields{"alarm-ind": alarmInd})
-		err = em.onuItuPonStatsIndication(alarmInd.GetOnuItuPonStatsInd(), deviceID, raisedTs)
+		logger.Debugw(ctx, "received-onu-itu-pon-stats-indication ", log.Fields{"alarm-ind": alarmInd})
+		err = em.onuItuPonStatsIndication(ctx, alarmInd.GetOnuItuPonStatsInd(), deviceID, raisedTs)
 	case *oop.AlarmIndication_OnuDeactivationFailureInd:
-		logger.Debugw("received-onu-deactivation-failure-indication ", log.Fields{"alarm-ind": alarmInd})
-		err = em.onuDeactivationFailureIndication(alarmInd.GetOnuDeactivationFailureInd(), deviceID, raisedTs)
+		logger.Debugw(ctx, "received-onu-deactivation-failure-indication ", log.Fields{"alarm-ind": alarmInd})
+		err = em.onuDeactivationFailureIndication(ctx, alarmInd.GetOnuDeactivationFailureInd(), deviceID, raisedTs)
 	case *oop.AlarmIndication_OnuLossGemDelineationInd:
-		logger.Debugw("received-onu-loss-of-gem-channel-delineation-indication ", log.Fields{"alarm-ind": alarmInd})
-		err = em.onuLossOfGEMChannelDelineationIndication(alarmInd.GetOnuLossGemDelineationInd(), deviceID, raisedTs)
+		logger.Debugw(ctx, "received-onu-loss-of-gem-channel-delineation-indication ", log.Fields{"alarm-ind": alarmInd})
+		err = em.onuLossOfGEMChannelDelineationIndication(ctx, alarmInd.GetOnuLossGemDelineationInd(), deviceID, raisedTs)
 	case *oop.AlarmIndication_OnuPhysicalEquipmentErrorInd:
-		logger.Debugw("received-onu-physical-equipment-error-indication ", log.Fields{"alarm-ind": alarmInd})
-		err = em.onuPhysicalEquipmentErrorIndication(alarmInd.GetOnuPhysicalEquipmentErrorInd(), deviceID, raisedTs)
+		logger.Debugw(ctx, "received-onu-physical-equipment-error-indication ", log.Fields{"alarm-ind": alarmInd})
+		err = em.onuPhysicalEquipmentErrorIndication(ctx, alarmInd.GetOnuPhysicalEquipmentErrorInd(), deviceID, raisedTs)
 	case *oop.AlarmIndication_OnuLossOfAckInd:
-		logger.Debugw("received-onu-loss-of-acknowledgement-indication ", log.Fields{"alarm-ind": alarmInd})
-		err = em.onuLossOfAcknowledgementIndication(alarmInd.GetOnuLossOfAckInd(), deviceID, raisedTs)
+		logger.Debugw(ctx, "received-onu-loss-of-acknowledgement-indication ", log.Fields{"alarm-ind": alarmInd})
+		err = em.onuLossOfAcknowledgementIndication(ctx, alarmInd.GetOnuLossOfAckInd(), deviceID, raisedTs)
 	case *oop.AlarmIndication_OnuDiffReachExceededInd:
-		logger.Debugw("received-onu-differential-reach-exceeded-indication ", log.Fields{"alarm-ind": alarmInd})
-		err = em.onuDifferentialReachExceededIndication(alarmInd.GetOnuDiffReachExceededInd(), deviceID, raisedTs)
+		logger.Debugw(ctx, "received-onu-differential-reach-exceeded-indication ", log.Fields{"alarm-ind": alarmInd})
+		err = em.onuDifferentialReachExceededIndication(ctx, alarmInd.GetOnuDiffReachExceededInd(), deviceID, raisedTs)
 	default:
 		err = olterrors.NewErrInvalidValue(log.Fields{"indication-type": alarmInd}, nil)
 	}
@@ -167,7 +167,7 @@
 }
 
 // oltUpDownIndication handles Up and Down state of an OLT
-func (em *OpenOltEventMgr) oltUpDownIndication(oltIndication *oop.OltIndication, deviceID string, raisedTs int64) error {
+func (em *OpenOltEventMgr) oltUpDownIndication(ctx context.Context, oltIndication *oop.OltIndication, deviceID string, raisedTs int64) error {
 	var de voltha.DeviceEvent
 	context := make(map[string]string)
 	/* Populating event context */
@@ -181,15 +181,15 @@
 		de.DeviceEventName = fmt.Sprintf("%s_%s", oltIndicationDown, "CLEAR_EVENT")
 	}
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(&de, communication, olt, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEvent(ctx, &de, communication, olt, raisedTs); err != nil {
 		return olterrors.NewErrCommunication("send-olt-event", log.Fields{"device-id": deviceID}, err)
 	}
-	logger.Debugw("olt-updown-event-sent-to-kafka", log.Fields{})
+	logger.Debugw(ctx, "olt-updown-event-sent-to-kafka", log.Fields{})
 	return nil
 }
 
 // OnuDiscoveryIndication is an exported method to handle ONU discovery event
-func (em *OpenOltEventMgr) OnuDiscoveryIndication(onuDisc *oop.OnuDiscIndication, oltDeviceID string, onuDeviceID string, OnuID uint32, serialNumber string, raisedTs int64) error {
+func (em *OpenOltEventMgr) OnuDiscoveryIndication(ctx context.Context, onuDisc *oop.OnuDiscIndication, oltDeviceID string, onuDeviceID string, OnuID uint32, serialNumber string, raisedTs int64) error {
 	var de voltha.DeviceEvent
 	context := make(map[string]string)
 	/* Populating event context */
@@ -202,20 +202,20 @@
 	de.ResourceId = oltDeviceID
 	de.DeviceEventName = fmt.Sprintf("%s_%s", onuDiscoveryEvent, "RAISE_EVENT")
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(&de, equipment, pon, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEvent(ctx, &de, equipment, pon, raisedTs); err != nil {
 		return olterrors.NewErrCommunication("send-onu-discovery-event",
 			log.Fields{
 				"serial-number": serialNumber,
 				"intf-id":       onuDisc.IntfId}, err)
 	}
-	logger.Debugw("onu-discovery-event-sent-to-kafka",
+	logger.Debugw(ctx, "onu-discovery-event-sent-to-kafka",
 		log.Fields{
 			"serial-number": serialNumber,
 			"intf-id":       onuDisc.IntfId})
 	return nil
 }
 
-func (em *OpenOltEventMgr) oltLosIndication(oltLos *oop.LosIndication, deviceID string, raisedTs int64) error {
+func (em *OpenOltEventMgr) oltLosIndication(ctx context.Context, oltLos *oop.LosIndication, deviceID string, raisedTs int64) error {
 	var err error = nil
 	var de voltha.DeviceEvent
 	var alarmInd oop.OnuAlarmIndication
@@ -239,7 +239,7 @@
 				alarmInd.IntfId = ponIntdID
 				alarmInd.OnuId = onuInCache.(*OnuDevice).onuID
 				alarmInd.LosStatus = statusCheckOn
-				err = em.onuAlarmIndication(&alarmInd, deviceID, raisedTs)
+				err = em.onuAlarmIndication(ctx, &alarmInd, deviceID, raisedTs)
 			}
 			return true
 		})
@@ -251,14 +251,14 @@
 		de.DeviceEventName = fmt.Sprintf("%s_%s", oltLosEvent, "CLEAR_EVENT")
 	}
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(&de, communication, olt, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEvent(ctx, &de, communication, olt, raisedTs); err != nil {
 		return err
 	}
-	logger.Debugw("olt-los-event-sent-to-kafka", log.Fields{"intf-id": oltLos.IntfId})
+	logger.Debugw(ctx, "olt-los-event-sent-to-kafka", log.Fields{"intf-id": oltLos.IntfId})
 	return nil
 }
 
-func (em *OpenOltEventMgr) onuDyingGaspIndication(dgi *oop.DyingGaspIndication, deviceID string, raisedTs int64) error {
+func (em *OpenOltEventMgr) onuDyingGaspIndication(ctx context.Context, dgi *oop.DyingGaspIndication, deviceID string, raisedTs int64) error {
 	var de voltha.DeviceEvent
 	var serialNumber string
 	context := make(map[string]string)
@@ -276,22 +276,22 @@
 	de.ResourceId = deviceID
 	de.DeviceEventName = fmt.Sprintf("%s_%s", onuDyingGaspEvent, "EVENT")
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(&de, communication, pon, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEvent(ctx, &de, communication, pon, raisedTs); err != nil {
 		return err
 	}
-	logger.Debugw("onu-dying-gasp-event-sent-to-kafka", log.Fields{"intf-id": dgi.IntfId})
+	logger.Debugw(ctx, "onu-dying-gasp-event-sent-to-kafka", log.Fields{"intf-id": dgi.IntfId})
 	return nil
 }
 
 //wasLosRaised checks whether los raised already. If already raised returns true else false
-func (em *OpenOltEventMgr) wasLosRaised(onuAlarm *oop.OnuAlarmIndication) bool {
+func (em *OpenOltEventMgr) wasLosRaised(ctx context.Context, onuAlarm *oop.OnuAlarmIndication) bool {
 	onuKey := em.handler.formOnuKey(onuAlarm.IntfId, onuAlarm.OnuId)
 	if onuInCache, ok := em.handler.onus.Load(onuKey); ok {
-		logger.Debugw("onu-device-found-in-cache.", log.Fields{"intfID": onuAlarm.IntfId, "onuID": onuAlarm.OnuId})
+		logger.Debugw(ctx, "onu-device-found-in-cache.", log.Fields{"intfID": onuAlarm.IntfId, "onuID": onuAlarm.OnuId})
 
 		if onuAlarm.LosStatus == statusCheckOn {
 			if onuInCache.(*OnuDevice).losRaised {
-				logger.Warnw("onu-los-raised-already", log.Fields{"onu_id": onuAlarm.OnuId,
+				logger.Warnw(ctx, "onu-los-raised-already", log.Fields{"onu_id": onuAlarm.OnuId,
 					"intf_id": onuAlarm.IntfId, "LosStatus": onuAlarm.LosStatus})
 				return true
 			}
@@ -302,14 +302,14 @@
 }
 
 //wasLosCleared checks whether los cleared already. If already cleared returns true else false
-func (em *OpenOltEventMgr) wasLosCleared(onuAlarm *oop.OnuAlarmIndication) bool {
+func (em *OpenOltEventMgr) wasLosCleared(ctx context.Context, onuAlarm *oop.OnuAlarmIndication) bool {
 	onuKey := em.handler.formOnuKey(onuAlarm.IntfId, onuAlarm.OnuId)
 	if onuInCache, ok := em.handler.onus.Load(onuKey); ok {
-		logger.Debugw("onu-device-found-in-cache.", log.Fields{"intfID": onuAlarm.IntfId, "onuID": onuAlarm.OnuId})
+		logger.Debugw(ctx, "onu-device-found-in-cache.", log.Fields{"intfID": onuAlarm.IntfId, "onuID": onuAlarm.OnuId})
 
 		if onuAlarm.LosStatus == statusCheckOff {
 			if !onuInCache.(*OnuDevice).losRaised {
-				logger.Warnw("onu-los-cleared-already", log.Fields{"onu_id": onuAlarm.OnuId,
+				logger.Warnw(ctx, "onu-los-cleared-already", log.Fields{"onu_id": onuAlarm.OnuId,
 					"intf_id": onuAlarm.IntfId, "LosStatus": onuAlarm.LosStatus})
 				return true
 			}
@@ -349,7 +349,7 @@
 	return deviceEventName
 }
 
-func (em *OpenOltEventMgr) onuAlarmIndication(onuAlarm *oop.OnuAlarmIndication, deviceID string, raisedTs int64) error {
+func (em *OpenOltEventMgr) onuAlarmIndication(ctx context.Context, onuAlarm *oop.OnuAlarmIndication, deviceID string, raisedTs int64) error {
 	var de voltha.DeviceEvent
 	var serialNumber string
 	context := make(map[string]string)
@@ -370,7 +370,7 @@
 
 	switch onuAlarm.LosStatus {
 	case statusCheckOn:
-		if em.wasLosRaised(onuAlarm) {
+		if em.wasLosRaised(ctx, onuAlarm) {
 			/* No need to raise Onu Los Event as it might have already raised
 			   or Onu might have deleted */
 			return nil
@@ -383,7 +383,7 @@
 				onuInCache.(*OnuDevice).proxyDeviceID, true))
 		}
 	case statusCheckOff:
-		if em.wasLosCleared(onuAlarm) {
+		if em.wasLosCleared(ctx, onuAlarm) {
 			/* No need to clear Onu Los Event as it might have already cleared
 			   or Onu might have deleted */
 			return nil
@@ -398,14 +398,14 @@
 	}
 
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(&de, communication, onu, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEvent(ctx, &de, communication, onu, raisedTs); err != nil {
 		return err
 	}
-	logger.Debugw("onu-los-event-sent-to-kafka", log.Fields{"onu-id": onuAlarm.OnuId, "intf-id": onuAlarm.IntfId})
+	logger.Debugw(ctx, "onu-los-event-sent-to-kafka", log.Fields{"onu-id": onuAlarm.OnuId, "intf-id": onuAlarm.IntfId})
 	return nil
 }
 
-func (em *OpenOltEventMgr) onuActivationFailIndication(oaf *oop.OnuActivationFailureIndication, deviceID string, raisedTs int64) error {
+func (em *OpenOltEventMgr) onuActivationFailIndication(ctx context.Context, oaf *oop.OnuActivationFailureIndication, deviceID string, raisedTs int64) error {
 	var de voltha.DeviceEvent
 	context := make(map[string]string)
 	/* Populating event context */
@@ -417,14 +417,14 @@
 	de.ResourceId = deviceID
 	de.DeviceEventName = fmt.Sprintf("%s_%s", onuActivationFailEvent, "RAISE_EVENT")
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(&de, equipment, pon, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEvent(ctx, &de, equipment, pon, raisedTs); err != nil {
 		return err
 	}
-	logger.Debugw("onu-activation-failure-event-sent-to-kafka", log.Fields{"onu-id": oaf.OnuId, "intf-id": oaf.IntfId})
+	logger.Debugw(ctx, "onu-activation-failure-event-sent-to-kafka", log.Fields{"onu-id": oaf.OnuId, "intf-id": oaf.IntfId})
 	return nil
 }
 
-func (em *OpenOltEventMgr) onuLossOmciIndication(onuLossOmci *oop.OnuLossOfOmciChannelIndication, deviceID string, raisedTs int64) error {
+func (em *OpenOltEventMgr) onuLossOmciIndication(ctx context.Context, onuLossOmci *oop.OnuLossOfOmciChannelIndication, deviceID string, raisedTs int64) error {
 	var de voltha.DeviceEvent
 	context := make(map[string]string)
 	/* Populating event context */
@@ -439,14 +439,14 @@
 		de.DeviceEventName = fmt.Sprintf("%s_%s", onuLossOmciEvent, "CLEAR_EVENT")
 	}
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(&de, communication, pon, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEvent(ctx, &de, communication, pon, raisedTs); err != nil {
 		return err
 	}
-	logger.Debugw("onu-loss-of-omci-channel-event-sent-to-kafka", log.Fields{"onu-id": onuLossOmci.OnuId, "intf-id": onuLossOmci.IntfId})
+	logger.Debugw(ctx, "onu-loss-of-omci-channel-event-sent-to-kafka", log.Fields{"onu-id": onuLossOmci.OnuId, "intf-id": onuLossOmci.IntfId})
 	return nil
 }
 
-func (em *OpenOltEventMgr) onuDriftOfWindowIndication(onuDriftWindow *oop.OnuDriftOfWindowIndication, deviceID string, raisedTs int64) error {
+func (em *OpenOltEventMgr) onuDriftOfWindowIndication(ctx context.Context, onuDriftWindow *oop.OnuDriftOfWindowIndication, deviceID string, raisedTs int64) error {
 	var de voltha.DeviceEvent
 	context := make(map[string]string)
 	/* Populating event context */
@@ -463,14 +463,14 @@
 		de.DeviceEventName = fmt.Sprintf("%s_%s", onuDriftOfWindowEvent, "CLEAR_EVENT")
 	}
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(&de, communication, pon, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEvent(ctx, &de, communication, pon, raisedTs); err != nil {
 		return err
 	}
-	logger.Debugw("onu-drift-of-window-event-sent-to-kafka", log.Fields{"onu-id": onuDriftWindow.OnuId, "intf-id": onuDriftWindow.IntfId})
+	logger.Debugw(ctx, "onu-drift-of-window-event-sent-to-kafka", log.Fields{"onu-id": onuDriftWindow.OnuId, "intf-id": onuDriftWindow.IntfId})
 	return nil
 }
 
-func (em *OpenOltEventMgr) onuSignalDegradeIndication(onuSignalDegrade *oop.OnuSignalDegradeIndication, deviceID string, raisedTs int64) error {
+func (em *OpenOltEventMgr) onuSignalDegradeIndication(ctx context.Context, onuSignalDegrade *oop.OnuSignalDegradeIndication, deviceID string, raisedTs int64) error {
 	var de voltha.DeviceEvent
 	context := make(map[string]string)
 	/* Populating event context */
@@ -486,14 +486,14 @@
 		de.DeviceEventName = fmt.Sprintf("%s_%s", onuSignalDegradeEvent, "CLEAR_EVENT")
 	}
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(&de, communication, pon, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEvent(ctx, &de, communication, pon, raisedTs); err != nil {
 		return err
 	}
-	logger.Debugw("onu-signal-degrade-event-sent-to-kafka", log.Fields{"onu-id": onuSignalDegrade.OnuId, "intf-id": onuSignalDegrade.IntfId})
+	logger.Debugw(ctx, "onu-signal-degrade-event-sent-to-kafka", log.Fields{"onu-id": onuSignalDegrade.OnuId, "intf-id": onuSignalDegrade.IntfId})
 	return nil
 }
 
-func (em *OpenOltEventMgr) onuSignalsFailIndication(onuSignalsFail *oop.OnuSignalsFailureIndication, deviceID string, raisedTs int64) error {
+func (em *OpenOltEventMgr) onuSignalsFailIndication(ctx context.Context, onuSignalsFail *oop.OnuSignalsFailureIndication, deviceID string, raisedTs int64) error {
 	var de voltha.DeviceEvent
 	context := make(map[string]string)
 	/* Populating event context */
@@ -509,14 +509,14 @@
 		de.DeviceEventName = fmt.Sprintf("%s_%s", onuSignalsFailEvent, "CLEAR_EVENT")
 	}
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(&de, communication, pon, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEvent(ctx, &de, communication, pon, raisedTs); err != nil {
 		return err
 	}
-	logger.Debugw("onu-signals-fail-event-sent-to-kafka", log.Fields{"onu-id": onuSignalsFail.OnuId, "intf-id": onuSignalsFail.IntfId})
+	logger.Debugw(ctx, "onu-signals-fail-event-sent-to-kafka", log.Fields{"onu-id": onuSignalsFail.OnuId, "intf-id": onuSignalsFail.IntfId})
 	return nil
 }
 
-func (em *OpenOltEventMgr) onuStartupFailedIndication(onuStartupFail *oop.OnuStartupFailureIndication, deviceID string, raisedTs int64) error {
+func (em *OpenOltEventMgr) onuStartupFailedIndication(ctx context.Context, onuStartupFail *oop.OnuStartupFailureIndication, deviceID string, raisedTs int64) error {
 	var de voltha.DeviceEvent
 	context := make(map[string]string)
 	/* Populating event context */
@@ -532,14 +532,14 @@
 		de.DeviceEventName = fmt.Sprintf("%s_%s", onuStartupFailEvent, "CLEAR_EVENT")
 	}
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(&de, communication, pon, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEvent(ctx, &de, communication, pon, raisedTs); err != nil {
 		return err
 	}
-	logger.Debugw("onu-startup-fail-event-sent-to-kafka", log.Fields{"onu-id": onuStartupFail.OnuId, "intf-id": onuStartupFail.IntfId})
+	logger.Debugw(ctx, "onu-startup-fail-event-sent-to-kafka", log.Fields{"onu-id": onuStartupFail.OnuId, "intf-id": onuStartupFail.IntfId})
 	return nil
 }
 
-func (em *OpenOltEventMgr) onuLossOfSyncIndication(onuLOKI *oop.OnuLossOfKeySyncFailureIndication, deviceID string, raisedTs int64) error {
+func (em *OpenOltEventMgr) onuLossOfSyncIndication(ctx context.Context, onuLOKI *oop.OnuLossOfKeySyncFailureIndication, deviceID string, raisedTs int64) error {
 	var de voltha.DeviceEvent
 	context := make(map[string]string)
 	/* Populating event context */
@@ -555,27 +555,27 @@
 	}
 
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(&de, security, onu, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEvent(ctx, &de, security, onu, raisedTs); err != nil {
 		return err
 	}
-	logger.Debugw("onu-loss-of-key-sync-event-sent-to-kafka", log.Fields{"onu-id": onuLOKI.OnuId, "intf-id": onuLOKI.IntfId})
+	logger.Debugw(ctx, "onu-loss-of-key-sync-event-sent-to-kafka", log.Fields{"onu-id": onuLOKI.OnuId, "intf-id": onuLOKI.IntfId})
 	return nil
 }
 
 // oltIntfOperIndication handles Up and Down state of an OLT PON ports
-func (em *OpenOltEventMgr) oltIntfOperIndication(ifindication *oop.IntfOperIndication, deviceID string, raisedTs int64) error {
+func (em *OpenOltEventMgr) oltIntfOperIndication(ctx context.Context, ifindication *oop.IntfOperIndication, deviceID string, raisedTs int64) error {
 	var de voltha.DeviceEvent
-	context := make(map[string]string)
 	portID := IntfIDToPortNo(ifindication.IntfId, voltha.Port_PON_OLT)
-	device, err := em.handler.coreProxy.GetDevice(ctx.Background(), deviceID, deviceID)
+	device, err := em.handler.coreProxy.GetDevice(context.Background(), deviceID, deviceID)
 	if err != nil {
 		return olterrors.NewErrAdapter("error-while-fetching-device-object", log.Fields{"DeviceId": deviceID}, err)
 	}
+	context := make(map[string]string)
 	for _, port := range device.Ports {
 		if port.PortNo == portID {
 			// Events are suppressed if the Port Adminstate is not enabled.
 			if port.AdminState != common.AdminState_ENABLED {
-				logger.Debugw("port-disable/enable-event-not-generated--the-port-is-not-enabled-by-operator", log.Fields{"deviceId": deviceID, "port": port})
+				logger.Debugw(ctx, "port-disable/enable-event-not-generated--the-port-is-not-enabled-by-operator", log.Fields{"deviceId": deviceID, "port": port})
 				return nil
 			}
 			break
@@ -593,14 +593,14 @@
 		de.DeviceEventName = fmt.Sprintf("%s_%s", ponIntfDownIndiction, "CLEAR_EVENT")
 	}
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(&de, communication, olt, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEvent(ctx, &de, communication, olt, raisedTs); err != nil {
 		return olterrors.NewErrCommunication("send-olt-intf-oper-status-event", log.Fields{"device-id": deviceID, "intf-id": ifindication.IntfId, "oper-state": ifindication.OperState}, err).Log()
 	}
-	logger.Debug("sent-olt-intf-oper-status-event-to-kafka")
+	logger.Debug(ctx, "sent-olt-intf-oper-status-event-to-kafka")
 	return nil
 }
 
-func (em *OpenOltEventMgr) onuDeactivationFailureIndication(onuDFI *oop.OnuDeactivationFailureIndication, deviceID string, raisedTs int64) error {
+func (em *OpenOltEventMgr) onuDeactivationFailureIndication(ctx context.Context, onuDFI *oop.OnuDeactivationFailureIndication, deviceID string, raisedTs int64) error {
 	var de voltha.DeviceEvent
 	context := make(map[string]string)
 	/* Populating event context */
@@ -615,13 +615,13 @@
 		de.DeviceEventName = fmt.Sprintf("%s_%s", onuDeactivationFailureEvent, "CLEAR_EVENT")
 	}
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(&de, equipment, onu, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEvent(ctx, &de, equipment, onu, raisedTs); err != nil {
 		return err
 	}
-	logger.Debugw("onu-deactivation-failure-event-sent-to-kafka", log.Fields{"onu-id": onuDFI.OnuId, "intf-id": onuDFI.IntfId})
+	logger.Debugw(ctx, "onu-deactivation-failure-event-sent-to-kafka", log.Fields{"onu-id": onuDFI.OnuId, "intf-id": onuDFI.IntfId})
 	return nil
 }
-func (em *OpenOltEventMgr) onuRemoteDefectIndication(onuID uint32, intfID uint32, rdiCount uint64, status string, deviceID string, raisedTs int64) error {
+func (em *OpenOltEventMgr) onuRemoteDefectIndication(ctx context.Context, onuID uint32, intfID uint32, rdiCount uint64, status string, deviceID string, raisedTs int64) error {
 	/* Populating event context */
 	context := map[string]string{
 		"onu-id":    strconv.FormatUint(uint64(onuID), base10),
@@ -639,29 +639,29 @@
 		de.DeviceEventName = fmt.Sprintf("%s_%s", onuRemoteDefectIndication, "CLEAR_EVENT")
 	}
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(de, equipment, onu, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEvent(ctx, de, equipment, onu, raisedTs); err != nil {
 		return err
 	}
-	logger.Debugw("onu-remote-defect-event-sent-to-kafka", log.Fields{"onu-id": onuID, "intf-id": intfID})
+	logger.Debugw(ctx, "onu-remote-defect-event-sent-to-kafka", log.Fields{"onu-id": onuID, "intf-id": intfID})
 	return nil
 }
 
-func (em *OpenOltEventMgr) onuItuPonStatsIndication(onuIPS *oop.OnuItuPonStatsIndication, deviceID string, raisedTs int64) error {
+func (em *OpenOltEventMgr) onuItuPonStatsIndication(ctx context.Context, onuIPS *oop.OnuItuPonStatsIndication, deviceID string, raisedTs int64) error {
 	onuDevice, found := em.handler.onus.Load(em.handler.formOnuKey(onuIPS.IntfId, onuIPS.OnuId))
 	if !found {
 		return errors.New("unknown-onu-device")
 	}
 	if onuIPS.GetRdiErrorInd().Status == statusCheckOn {
 		if !onuDevice.(*OnuDevice).rdiRaised {
-			if err := em.onuRemoteDefectIndication(onuIPS.OnuId, onuIPS.IntfId, onuIPS.GetRdiErrorInd().RdiErrorCount, statusCheckOn, deviceID, raisedTs); err != nil {
+			if err := em.onuRemoteDefectIndication(ctx, onuIPS.OnuId, onuIPS.IntfId, onuIPS.GetRdiErrorInd().RdiErrorCount, statusCheckOn, deviceID, raisedTs); err != nil {
 				return err
 			}
 			onuDevice.(*OnuDevice).rdiRaised = true
 			return nil
 		}
-		logger.Debugw("onu-remote-defect-already-raised", log.Fields{"onu-id": onuIPS.OnuId, "intf-id": onuIPS.IntfId})
+		logger.Debugw(ctx, "onu-remote-defect-already-raised", log.Fields{"onu-id": onuIPS.OnuId, "intf-id": onuIPS.IntfId})
 	} else {
-		if err := em.onuRemoteDefectIndication(onuIPS.OnuId, onuIPS.IntfId, onuIPS.GetRdiErrorInd().RdiErrorCount, statusCheckOff, deviceID, raisedTs); err != nil {
+		if err := em.onuRemoteDefectIndication(ctx, onuIPS.OnuId, onuIPS.IntfId, onuIPS.GetRdiErrorInd().RdiErrorCount, statusCheckOff, deviceID, raisedTs); err != nil {
 			return err
 		}
 		onuDevice.(*OnuDevice).rdiRaised = false
@@ -669,7 +669,7 @@
 	return nil
 }
 
-func (em *OpenOltEventMgr) onuLossOfGEMChannelDelineationIndication(onuGCD *oop.OnuLossOfGEMChannelDelineationIndication, deviceID string, raisedTs int64) error {
+func (em *OpenOltEventMgr) onuLossOfGEMChannelDelineationIndication(ctx context.Context, onuGCD *oop.OnuLossOfGEMChannelDelineationIndication, deviceID string, raisedTs int64) error {
 	/* Populating event context */
 	context := map[string]string{
 		"onu-id":             strconv.FormatUint(uint64(onuGCD.OnuId), base10),
@@ -687,14 +687,14 @@
 		de.DeviceEventName = fmt.Sprintf("%s_%s", onuLossOfGEMChannelDelineationEvent, "CLEAR_EVENT")
 	}
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(de, communication, onu, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEvent(ctx, de, communication, onu, raisedTs); err != nil {
 		return err
 	}
-	logger.Debugw("onu-loss-of-gem-channel-delineation-event-sent-to-kafka", log.Fields{"onu-id": onuGCD.OnuId, "intf-id": onuGCD.IntfId})
+	logger.Debugw(ctx, "onu-loss-of-gem-channel-delineation-event-sent-to-kafka", log.Fields{"onu-id": onuGCD.OnuId, "intf-id": onuGCD.IntfId})
 	return nil
 }
 
-func (em *OpenOltEventMgr) onuPhysicalEquipmentErrorIndication(onuErr *oop.OnuPhysicalEquipmentErrorIndication, deviceID string, raisedTs int64) error {
+func (em *OpenOltEventMgr) onuPhysicalEquipmentErrorIndication(ctx context.Context, onuErr *oop.OnuPhysicalEquipmentErrorIndication, deviceID string, raisedTs int64) error {
 	/* Populating event context */
 	context := map[string]string{
 		"onu-id":  strconv.FormatUint(uint64(onuErr.OnuId), base10),
@@ -711,14 +711,14 @@
 		de.DeviceEventName = fmt.Sprintf("%s_%s", onuPhysicalEquipmentErrorEvent, "CLEAR_EVENT")
 	}
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(de, equipment, onu, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEvent(ctx, de, equipment, onu, raisedTs); err != nil {
 		return err
 	}
-	logger.Debugw("onu-physical-equipment-error-event-sent-to-kafka", log.Fields{"onu-id": onuErr.OnuId, "intf-id": onuErr.IntfId})
+	logger.Debugw(ctx, "onu-physical-equipment-error-event-sent-to-kafka", log.Fields{"onu-id": onuErr.OnuId, "intf-id": onuErr.IntfId})
 	return nil
 }
 
-func (em *OpenOltEventMgr) onuLossOfAcknowledgementIndication(onuLOA *oop.OnuLossOfAcknowledgementIndication, deviceID string, raisedTs int64) error {
+func (em *OpenOltEventMgr) onuLossOfAcknowledgementIndication(ctx context.Context, onuLOA *oop.OnuLossOfAcknowledgementIndication, deviceID string, raisedTs int64) error {
 	/* Populating event context */
 	context := map[string]string{
 		"onu-id":  strconv.FormatUint(uint64(onuLOA.OnuId), base10),
@@ -735,14 +735,14 @@
 		de.DeviceEventName = fmt.Sprintf("%s_%s", onuLossOfAcknowledgementEvent, "CLEAR_EVENT")
 	}
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(de, equipment, onu, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEvent(ctx, de, equipment, onu, raisedTs); err != nil {
 		return err
 	}
-	logger.Debugw("onu-physical-equipment-error-event-sent-to-kafka", log.Fields{"onu-id": onuLOA.OnuId, "intf-id": onuLOA.IntfId})
+	logger.Debugw(ctx, "onu-physical-equipment-error-event-sent-to-kafka", log.Fields{"onu-id": onuLOA.OnuId, "intf-id": onuLOA.IntfId})
 	return nil
 }
 
-func (em *OpenOltEventMgr) onuDifferentialReachExceededIndication(onuDRE *oop.OnuDifferentialReachExceededIndication, deviceID string, raisedTs int64) error {
+func (em *OpenOltEventMgr) onuDifferentialReachExceededIndication(ctx context.Context, onuDRE *oop.OnuDifferentialReachExceededIndication, deviceID string, raisedTs int64) error {
 	/* Populating event context */
 	context := map[string]string{
 		"onu-id":                strconv.FormatUint(uint64(onuDRE.OnuId), base10),
@@ -760,7 +760,7 @@
 		de.DeviceEventName = fmt.Sprintf("%s_%s", onuDifferentialReachExceededEvent, "CLEAR_EVENT")
 	}
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(de, equipment, onu, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEvent(ctx, de, equipment, onu, raisedTs); err != nil {
 		return err
 	}
 	log.Debugw("onu-differential-reach-exceeded–event-sent-to-kafka", log.Fields{"onu-id": onuDRE.OnuId, "intf-id": onuDRE.IntfId})