[VOL-4663] create voltha event topic (voltha.events) with conifgurable no of partitions and replication factor

Change-Id: Ie7cde57f423c2e947cbc9a42f86f96fb6ac97826
diff --git a/go.mod b/go.mod
index 568cba7..11a19f9 100644
--- a/go.mod
+++ b/go.mod
@@ -13,7 +13,7 @@
 	github.com/gogo/protobuf v1.3.2
 	github.com/golang/protobuf v1.5.2
 	github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
-	github.com/opencord/voltha-lib-go/v7 v7.1.6
+	github.com/opencord/voltha-lib-go/v7 v7.1.8
 	github.com/opencord/voltha-protos/v5 v5.2.2
 	github.com/stretchr/testify v1.7.0
 	go.etcd.io/etcd v3.3.25+incompatible
diff --git a/go.sum b/go.sum
index a14bc72..f7c94be 100644
--- a/go.sum
+++ b/go.sum
@@ -191,8 +191,8 @@
 github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
 github.com/onsi/gomega v1.14.0 h1:ep6kpPVwmr/nTbklSx2nrLNSIO62DoYAhnPNIMhK8gI=
 github.com/onsi/gomega v1.14.0/go.mod h1:cIuvLEne0aoVhAgh/O6ac0Op8WWw9H6eYCriF+tEHG0=
-github.com/opencord/voltha-lib-go/v7 v7.1.6 h1:LSN9QPz0ndKFyWV6puoCjvuUmBale5+IiLOjlrbrDmU=
-github.com/opencord/voltha-lib-go/v7 v7.1.6/go.mod h1:lnwlFfhDVMBg2siCv1CajB1fvfAU9Cs8VbB64LQ8zVg=
+github.com/opencord/voltha-lib-go/v7 v7.1.8 h1:5k+1Ul+T+gmvM7GONbK1/+YrX4tizAc3REgHoFvug0I=
+github.com/opencord/voltha-lib-go/v7 v7.1.8/go.mod h1:lnwlFfhDVMBg2siCv1CajB1fvfAU9Cs8VbB64LQ8zVg=
 github.com/opencord/voltha-protos/v5 v5.2.2 h1:1Bcgl+Fmp00ZxlDrHZdcbjpMgOwX6TnZmOTrYm9SbR8=
 github.com/opencord/voltha-protos/v5 v5.2.2/go.mod h1:ZGcyW79kQKIo7AySo1LRu613E6uiozixrCF0yNB/4x8=
 github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
diff --git a/internal/pkg/core/openolt_eventmgr.go b/internal/pkg/core/openolt_eventmgr.go
index 971a173..07d2bd0 100644
--- a/internal/pkg/core/openolt_eventmgr.go
+++ b/internal/pkg/core/openolt_eventmgr.go
@@ -283,7 +283,7 @@
 	de.ResourceId = oltDeviceID
 	de.DeviceEventName = fmt.Sprintf("%s_%s", onuDiscoveryEvent, "RAISE_EVENT")
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(ctx, &de, voltha.EventCategory_EQUIPMENT, voltha.EventSubCategory_PON, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEventWithKey(ctx, &de, voltha.EventCategory_EQUIPMENT, voltha.EventSubCategory_PON, raisedTs, onuDeviceID); err != nil {
 		return olterrors.NewErrCommunication("send-onu-discovery-event",
 			log.Fields{
 				"serial-number": serialNumber,
@@ -340,7 +340,7 @@
 	return nil
 }
 
-func (em *OpenOltEventMgr) populateContextWithSerialDeviceID(context map[string]string, intfID, onuID uint32) {
+func (em *OpenOltEventMgr) populateContextWithSerialDeviceID(context map[string]string, intfID, onuID uint32) string {
 	var serialNumber = ""
 	var onuDeviceID = ""
 	onu := em.handler.formOnuKey(intfID, onuID)
@@ -352,13 +352,14 @@
 	context[ContextOltPortLabel], _ = GetportLabel(intfID, voltha.Port_PON_OLT)
 	context[ContextOnuSerialNumber] = serialNumber
 	context[ContextOnuDeviceID] = onuDeviceID
+	return onuDeviceID
 }
 
 func (em *OpenOltEventMgr) onuDyingGaspIndication(ctx context.Context, dgi *oop.DyingGaspIndication, deviceID string, raisedTs int64) error {
 	var de voltha.DeviceEvent
 	context := make(map[string]string)
 	/* Populating event context */
-	em.populateContextWithSerialDeviceID(context, dgi.IntfId, dgi.OnuId)
+	onuDeviceID := em.populateContextWithSerialDeviceID(context, dgi.IntfId, dgi.OnuId)
 
 	context[ContextOnuPonIntfID] = strconv.FormatUint(uint64(dgi.IntfId), base10)
 	context[ContextOnuOnuID] = strconv.FormatUint(uint64(dgi.OnuId), base10)
@@ -368,7 +369,7 @@
 	de.ResourceId = deviceID
 	de.DeviceEventName = fmt.Sprintf("%s_%s", onuDyingGaspEvent, "EVENT")
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(ctx, &de, voltha.EventCategory_COMMUNICATION, voltha.EventSubCategory_ONU, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEventWithKey(ctx, &de, voltha.EventCategory_COMMUNICATION, voltha.EventSubCategory_ONU, raisedTs, onuDeviceID); err != nil {
 		return err
 	}
 	logger.Debugw(ctx, "onu-dying-gasp-event-sent-to-kafka", log.Fields{"intf-id": dgi.IntfId})
@@ -448,7 +449,7 @@
 	/* Populating event context */
 	context[ContextOnuPonIntfID] = strconv.FormatUint(uint64(onuAlarm.IntfId), base10)
 	context[ContextOnuOnuID] = strconv.FormatUint(uint64(onuAlarm.OnuId), base10)
-	em.populateContextWithSerialDeviceID(context, onuAlarm.IntfId, onuAlarm.OnuId)
+	onuDeviceID := em.populateContextWithSerialDeviceID(context, onuAlarm.IntfId, onuAlarm.OnuId)
 
 	/* Populating device event body */
 	de.Context = context
@@ -485,7 +486,7 @@
 	}
 
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(ctx, &de, voltha.EventCategory_COMMUNICATION, voltha.EventSubCategory_ONU, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEventWithKey(ctx, &de, voltha.EventCategory_COMMUNICATION, voltha.EventSubCategory_ONU, raisedTs, onuDeviceID); err != nil {
 		return err
 	}
 	logger.Debugw(ctx, "onu-los-event-sent-to-kafka", log.Fields{"onu-id": onuAlarm.OnuId, "intf-id": onuAlarm.IntfId})
@@ -500,14 +501,14 @@
 	context[ContextOnuOnuID] = strconv.FormatUint(uint64(onuInd.OnuId), base10)
 	context[ContextOnuFailureReaseon] = onuInd.FailReason.String()
 
-	em.populateContextWithSerialDeviceID(context, onuInd.IntfId, onuInd.OnuId)
+	onuDeviceID := em.populateContextWithSerialDeviceID(context, onuInd.IntfId, onuInd.OnuId)
 
 	/* Populating device event body */
 	de.Context = context
 	de.ResourceId = deviceID
 	de.DeviceEventName = eventName
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(ctx, &de, voltha.EventCategory_EQUIPMENT, voltha.EventSubCategory_PON, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEventWithKey(ctx, &de, voltha.EventCategory_EQUIPMENT, voltha.EventSubCategory_PON, raisedTs, onuDeviceID); err != nil {
 		return err
 	}
 	logger.Debugw(ctx, "onu-activation-failure-event-sent-to-kafka", log.Fields{"onu-id": onuInd.OnuId, "intf-id": onuInd.IntfId})
@@ -521,7 +522,7 @@
 	context[ContextOnuPonIntfID] = strconv.FormatUint(uint64(onuLossOmci.IntfId), base10)
 	context[ContextOnuOnuID] = strconv.FormatUint(uint64(onuLossOmci.OnuId), base10)
 
-	em.populateContextWithSerialDeviceID(context, onuLossOmci.IntfId, onuLossOmci.OnuId)
+	onuDeviceID := em.populateContextWithSerialDeviceID(context, onuLossOmci.IntfId, onuLossOmci.OnuId)
 
 	/* Populating device event body */
 	de.Context = context
@@ -532,7 +533,7 @@
 		de.DeviceEventName = fmt.Sprintf("%s_%s", onuLossOmciEvent, "CLEAR_EVENT")
 	}
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(ctx, &de, voltha.EventCategory_COMMUNICATION, voltha.EventSubCategory_ONU, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEventWithKey(ctx, &de, voltha.EventCategory_COMMUNICATION, voltha.EventSubCategory_ONU, raisedTs, onuDeviceID); err != nil {
 		return err
 	}
 	logger.Debugw(ctx, "onu-loss-of-omci-channel-event-sent-to-kafka", log.Fields{"onu-id": onuLossOmci.OnuId, "intf-id": onuLossOmci.IntfId})
@@ -548,7 +549,7 @@
 	context[ContextOnuDrift] = strconv.FormatUint(uint64(onuDriftWindow.Drift), base10)
 	context[ContextOnuNewEqd] = strconv.FormatUint(uint64(onuDriftWindow.NewEqd), base10)
 
-	em.populateContextWithSerialDeviceID(context, onuDriftWindow.IntfId, onuDriftWindow.OnuId)
+	onuDeviceID := em.populateContextWithSerialDeviceID(context, onuDriftWindow.IntfId, onuDriftWindow.OnuId)
 
 	/* Populating device event body */
 	de.Context = context
@@ -559,7 +560,7 @@
 		de.DeviceEventName = fmt.Sprintf("%s_%s", onuDriftOfWindowEvent, "CLEAR_EVENT")
 	}
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(ctx, &de, voltha.EventCategory_COMMUNICATION, voltha.EventSubCategory_ONU, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEventWithKey(ctx, &de, voltha.EventCategory_COMMUNICATION, voltha.EventSubCategory_ONU, raisedTs, onuDeviceID); err != nil {
 		return err
 	}
 	logger.Debugw(ctx, "onu-drift-of-window-event-sent-to-kafka", log.Fields{"onu-id": onuDriftWindow.OnuId, "intf-id": onuDriftWindow.IntfId})
@@ -574,7 +575,7 @@
 	context[ContextOnuOnuID] = strconv.FormatUint(uint64(onuSignalDegrade.OnuId), base10)
 	context[ContextOnuInverseBitErrorRate] = strconv.FormatUint(uint64(onuSignalDegrade.InverseBitErrorRate), base10)
 
-	em.populateContextWithSerialDeviceID(context, onuSignalDegrade.IntfId, onuSignalDegrade.OnuId)
+	onuDeviceID := em.populateContextWithSerialDeviceID(context, onuSignalDegrade.IntfId, onuSignalDegrade.OnuId)
 
 	/* Populating device event body */
 	de.Context = context
@@ -585,7 +586,7 @@
 		de.DeviceEventName = fmt.Sprintf("%s_%s", onuSignalDegradeEvent, "CLEAR_EVENT")
 	}
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(ctx, &de, voltha.EventCategory_COMMUNICATION, voltha.EventSubCategory_ONU, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEventWithKey(ctx, &de, voltha.EventCategory_COMMUNICATION, voltha.EventSubCategory_ONU, raisedTs, onuDeviceID); err != nil {
 		return err
 	}
 	logger.Debugw(ctx, "onu-signal-degrade-event-sent-to-kafka", log.Fields{"onu-id": onuSignalDegrade.OnuId, "intf-id": onuSignalDegrade.IntfId})
@@ -596,7 +597,7 @@
 	var de voltha.DeviceEvent
 	context := make(map[string]string)
 	/* Populating event context */
-	em.populateContextWithSerialDeviceID(context, onuSignalsFail.IntfId, onuSignalsFail.OnuId)
+	onuDeviceID := em.populateContextWithSerialDeviceID(context, onuSignalsFail.IntfId, onuSignalsFail.OnuId)
 
 	context[ContextOnuOnuID] = strconv.FormatUint(uint64(onuSignalsFail.OnuId), base10)
 	context[ContextOnuPonIntfID] = strconv.FormatUint(uint64(onuSignalsFail.IntfId), base10)
@@ -610,7 +611,7 @@
 		de.DeviceEventName = fmt.Sprintf("%s_%s", onuSignalsFailEvent, "CLEAR_EVENT")
 	}
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(ctx, &de, voltha.EventCategory_COMMUNICATION, voltha.EventSubCategory_ONU, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEventWithKey(ctx, &de, voltha.EventCategory_COMMUNICATION, voltha.EventSubCategory_ONU, raisedTs, onuDeviceID); err != nil {
 		return err
 	}
 	logger.Debugw(ctx, "onu-signals-fail-event-sent-to-kafka", log.Fields{"onu-id": onuSignalsFail.OnuId, "intf-id": onuSignalsFail.IntfId})
@@ -621,7 +622,7 @@
 	var de voltha.DeviceEvent
 	context := make(map[string]string)
 	/* Populating event context */
-	em.populateContextWithSerialDeviceID(context, onuStartupFail.IntfId, onuStartupFail.OnuId)
+	onuDeviceID := em.populateContextWithSerialDeviceID(context, onuStartupFail.IntfId, onuStartupFail.OnuId)
 
 	context[ContextOnuOnuID] = strconv.FormatUint(uint64(onuStartupFail.OnuId), base10)
 	context[ContextOnuPonIntfID] = strconv.FormatUint(uint64(onuStartupFail.IntfId), base10)
@@ -635,7 +636,7 @@
 		de.DeviceEventName = fmt.Sprintf("%s_%s", onuStartupFailEvent, "CLEAR_EVENT")
 	}
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(ctx, &de, voltha.EventCategory_COMMUNICATION, voltha.EventSubCategory_PON, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEventWithKey(ctx, &de, voltha.EventCategory_COMMUNICATION, voltha.EventSubCategory_PON, raisedTs, onuDeviceID); err != nil {
 		return err
 	}
 	logger.Debugw(ctx, "onu-startup-fail-event-sent-to-kafka", log.Fields{"onu-id": onuStartupFail.OnuId, "intf-id": onuStartupFail.IntfId})
@@ -646,7 +647,7 @@
 	var de voltha.DeviceEvent
 	context := make(map[string]string)
 	/* Populating event context */
-	em.populateContextWithSerialDeviceID(context, onuLOKI.IntfId, onuLOKI.OnuId)
+	onuDeviceID := em.populateContextWithSerialDeviceID(context, onuLOKI.IntfId, onuLOKI.OnuId)
 
 	context[ContextOnuOnuID] = strconv.FormatUint(uint64(onuLOKI.OnuId), base10)
 	context[ContextOnuPonIntfID] = strconv.FormatUint(uint64(onuLOKI.IntfId), base10)
@@ -660,7 +661,7 @@
 	}
 
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(ctx, &de, voltha.EventCategory_SECURITY, voltha.EventSubCategory_ONU, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEventWithKey(ctx, &de, voltha.EventCategory_SECURITY, voltha.EventSubCategory_ONU, raisedTs, onuDeviceID); err != nil {
 		return err
 	}
 	logger.Debugw(ctx, "onu-loss-of-key-sync-event-sent-to-kafka", log.Fields{"onu-id": onuLOKI.OnuId, "intf-id": onuLOKI.IntfId})
@@ -703,7 +704,7 @@
 	var de voltha.DeviceEvent
 	context := make(map[string]string)
 	/* Populating event context */
-	em.populateContextWithSerialDeviceID(context, onuDFI.IntfId, onuDFI.OnuId)
+	onuDeviceID := em.populateContextWithSerialDeviceID(context, onuDFI.IntfId, onuDFI.OnuId)
 
 	context[ContextOnuOnuID] = strconv.FormatUint(uint64(onuDFI.OnuId), base10)
 	context[ContextOnuPonIntfID] = strconv.FormatUint(uint64(onuDFI.IntfId), base10)
@@ -716,7 +717,7 @@
 		de.DeviceEventName = fmt.Sprintf("%s_%s", onuDeactivationFailureEvent, "CLEAR_EVENT")
 	}
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(ctx, &de, voltha.EventCategory_EQUIPMENT, voltha.EventSubCategory_ONU, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEventWithKey(ctx, &de, voltha.EventCategory_EQUIPMENT, voltha.EventSubCategory_ONU, raisedTs, onuDeviceID); err != nil {
 		return err
 	}
 	logger.Debugw(ctx, "onu-deactivation-failure-event-sent-to-kafka", log.Fields{"onu-id": onuDFI.OnuId, "intf-id": onuDFI.IntfId})
@@ -730,7 +731,7 @@
 		ContextOnuPonIntfID:                  strconv.FormatUint(uint64(intfID), base10),
 		ContextOnuRemoteDefectIndicatorCount: strconv.FormatUint(rdiCount, base10),
 	}
-	em.populateContextWithSerialDeviceID(context, intfID, onuID)
+	onuDeviceID := em.populateContextWithSerialDeviceID(context, intfID, onuID)
 
 	/* Populating device event body */
 	de := &voltha.DeviceEvent{
@@ -743,7 +744,7 @@
 		de.DeviceEventName = fmt.Sprintf("%s_%s", onuRemoteDefectIndication, "CLEAR_EVENT")
 	}
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(ctx, de, voltha.EventCategory_EQUIPMENT, voltha.EventSubCategory_ONU, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEventWithKey(ctx, de, voltha.EventCategory_EQUIPMENT, voltha.EventSubCategory_ONU, raisedTs, onuDeviceID); err != nil {
 		return err
 	}
 	logger.Debugw(ctx, "onu-remote-defect-event-sent-to-kafka", log.Fields{"onu-id": onuID, "intf-id": intfID})
@@ -780,7 +781,7 @@
 		ContextOnuPonIntfID:         strconv.FormatUint(uint64(onuGCD.IntfId), base10),
 		ContextOnuDelineationErrors: strconv.FormatUint(uint64(onuGCD.DelineationErrors), base10),
 	}
-	em.populateContextWithSerialDeviceID(context, onuGCD.IntfId, onuGCD.OnuId)
+	onuDeviceID := em.populateContextWithSerialDeviceID(context, onuGCD.IntfId, onuGCD.OnuId)
 
 	/* Populating device event body */
 	de := &voltha.DeviceEvent{
@@ -793,7 +794,7 @@
 		de.DeviceEventName = fmt.Sprintf("%s_%s", onuLossOfGEMChannelDelineationEvent, "CLEAR_EVENT")
 	}
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(ctx, de, voltha.EventCategory_COMMUNICATION, voltha.EventSubCategory_ONU, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEventWithKey(ctx, de, voltha.EventCategory_COMMUNICATION, voltha.EventSubCategory_ONU, raisedTs, onuDeviceID); err != nil {
 		return err
 	}
 	logger.Debugw(ctx, "onu-loss-of-gem-channel-delineation-event-sent-to-kafka", log.Fields{"onu-id": onuGCD.OnuId, "intf-id": onuGCD.IntfId})
@@ -806,7 +807,7 @@
 		ContextOnuOnuID:     strconv.FormatUint(uint64(onuErr.OnuId), base10),
 		ContextOnuPonIntfID: strconv.FormatUint(uint64(onuErr.IntfId), base10),
 	}
-	em.populateContextWithSerialDeviceID(context, onuErr.IntfId, onuErr.OnuId)
+	onuDeviceID := em.populateContextWithSerialDeviceID(context, onuErr.IntfId, onuErr.OnuId)
 	/* Populating device event body */
 	de := &voltha.DeviceEvent{
 		Context:    context,
@@ -818,7 +819,7 @@
 		de.DeviceEventName = fmt.Sprintf("%s_%s", onuPhysicalEquipmentErrorEvent, "CLEAR_EVENT")
 	}
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(ctx, de, voltha.EventCategory_EQUIPMENT, voltha.EventSubCategory_ONU, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEventWithKey(ctx, de, voltha.EventCategory_EQUIPMENT, voltha.EventSubCategory_ONU, raisedTs, onuDeviceID); err != nil {
 		return err
 	}
 	logger.Debugw(ctx, "onu-physical-equipment-error-event-sent-to-kafka", log.Fields{"onu-id": onuErr.OnuId, "intf-id": onuErr.IntfId})
@@ -831,7 +832,7 @@
 		ContextOnuOnuID:     strconv.FormatUint(uint64(onuLOA.OnuId), base10),
 		ContextOnuPonIntfID: strconv.FormatUint(uint64(onuLOA.IntfId), base10),
 	}
-	em.populateContextWithSerialDeviceID(context, onuLOA.IntfId, onuLOA.OnuId)
+	onuDeviceID := em.populateContextWithSerialDeviceID(context, onuLOA.IntfId, onuLOA.OnuId)
 
 	/* Populating device event body */
 	de := &voltha.DeviceEvent{
@@ -844,7 +845,7 @@
 		de.DeviceEventName = fmt.Sprintf("%s_%s", onuLossOfAcknowledgementEvent, "CLEAR_EVENT")
 	}
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(ctx, de, voltha.EventCategory_EQUIPMENT, voltha.EventSubCategory_ONU, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEventWithKey(ctx, de, voltha.EventCategory_EQUIPMENT, voltha.EventSubCategory_ONU, raisedTs, onuDeviceID); err != nil {
 		return err
 	}
 	logger.Debugw(ctx, "onu-physical-equipment-error-event-sent-to-kafka", log.Fields{"onu-id": onuLOA.OnuId, "intf-id": onuLOA.IntfId})
@@ -858,7 +859,7 @@
 		ContextOnuPonIntfID:            strconv.FormatUint(uint64(onuDRE.IntfId), base10),
 		ContextOnuDifferentialDistance: strconv.FormatUint(uint64(onuDRE.Distance), base10),
 	}
-	em.populateContextWithSerialDeviceID(context, onuDRE.IntfId, onuDRE.OnuId)
+	onuDeviceID := em.populateContextWithSerialDeviceID(context, onuDRE.IntfId, onuDRE.OnuId)
 
 	/* Populating device event body */
 	de := &voltha.DeviceEvent{
@@ -871,7 +872,7 @@
 		de.DeviceEventName = fmt.Sprintf("%s_%s", onuDifferentialReachExceededEvent, "CLEAR_EVENT")
 	}
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(ctx, de, voltha.EventCategory_EQUIPMENT, voltha.EventSubCategory_ONU, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEventWithKey(ctx, de, voltha.EventCategory_EQUIPMENT, voltha.EventSubCategory_ONU, raisedTs, onuDeviceID); err != nil {
 		return err
 	}
 	logger.Debugw(ctx, "onu-differential-reach-exceeded–event-sent-to-kafka", log.Fields{"onu-id": onuDRE.OnuId, "intf-id": onuDRE.IntfId})
diff --git a/pkg/mocks/mockEventproxy.go b/pkg/mocks/mockEventproxy.go
index 892e159..2f63de7 100644
--- a/pkg/mocks/mockEventproxy.go
+++ b/pkg/mocks/mockEventproxy.go
@@ -21,6 +21,7 @@
 	"context"
 	"errors"
 
+	"github.com/opencord/voltha-lib-go/v7/pkg/events/eventif"
 	"github.com/opencord/voltha-protos/v5/go/voltha"
 )
 
@@ -73,3 +74,9 @@
 // Stop stops the proxy
 func (me *MockEventProxy) Stop() {
 }
+
+//SendDeviceEventWithKey mocks SendDeviceEventWithKey
+func (me *MockEventProxy) SendDeviceEventWithKey(ctx context.Context, deviceEvent *voltha.DeviceEvent, category eventif.EventCategory,
+	subCategory eventif.EventSubCategory, raisedTs int64, key string) error {
+	return nil
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/events/eventif/events_proxy_if.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/events/eventif/events_proxy_if.go
index e4ebc36..625cd0b 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/events/eventif/events_proxy_if.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/events/eventif/events_proxy_if.go
@@ -26,6 +26,8 @@
 type EventProxy interface {
 	SendDeviceEvent(ctx context.Context, deviceEvent *voltha.DeviceEvent, category EventCategory,
 		subCategory EventSubCategory, raisedTs int64) error
+	SendDeviceEventWithKey(ctx context.Context, deviceEvent *voltha.DeviceEvent, category EventCategory,
+		subCategory EventSubCategory, raisedTs int64, key string) error
 	SendKpiEvent(ctx context.Context, id string, deviceEvent *voltha.KpiEvent2, category EventCategory,
 		subCategory EventSubCategory, raisedTs int64) error
 	SendRPCEvent(ctx context.Context, id string, deviceEvent *voltha.RPCEvent, category EventCategory,
diff --git a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/events/events_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/events/events_proxy.go
index e4493f9..4b46854 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/events/events_proxy.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/events/events_proxy.go
@@ -122,6 +122,11 @@
 
 /* Send out device events*/
 func (ep *EventProxy) SendDeviceEvent(ctx context.Context, deviceEvent *voltha.DeviceEvent, category eventif.EventCategory, subCategory eventif.EventSubCategory, raisedTs int64) error {
+	return ep.SendDeviceEventWithKey(ctx, deviceEvent, category, subCategory, raisedTs, "")
+}
+
+/* Send out device events with key*/
+func (ep *EventProxy) SendDeviceEventWithKey(ctx context.Context, deviceEvent *voltha.DeviceEvent, category eventif.EventCategory, subCategory eventif.EventSubCategory, raisedTs int64, key string) error {
 	if deviceEvent == nil {
 		logger.Error(ctx, "Recieved empty device event")
 		return errors.New("Device event nil")
@@ -134,11 +139,12 @@
 		return err
 	}
 	event.EventType = &de
-	if err := ep.sendEvent(ctx, &event); err != nil {
+
+	if err := ep.sendEvent(ctx, &event, key); err != nil {
 		logger.Errorw(ctx, "Failed to send device event to KAFKA bus", log.Fields{"device-event": deviceEvent})
 		return err
 	}
-	logger.Infow(ctx, "Successfully sent device event KAFKA", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
+	logger.Infow(ctx, "Successfully sent device event KAFKA", log.Fields{"key": key, "Id": event.Header.Id, "Category": event.Header.Category,
 		"SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
 		"ReportedTs": event.Header.ReportedTs, "ResourceId": deviceEvent.ResourceId, "Context": deviceEvent.Context,
 		"DeviceEventName": deviceEvent.DeviceEventName})
@@ -161,7 +167,8 @@
 		return err
 	}
 	event.EventType = &de
-	if err := ep.sendEvent(ctx, &event); err != nil {
+
+	if err := ep.sendEvent(ctx, &event, strconv.FormatInt(raisedTs, 10)); err != nil {
 		logger.Errorw(ctx, "Failed to send kpi event to KAFKA bus", log.Fields{"device-event": kpiEvent})
 		return err
 	}
@@ -173,9 +180,9 @@
 
 }
 
-func (ep *EventProxy) sendEvent(ctx context.Context, event *voltha.Event) error {
+func (ep *EventProxy) sendEvent(ctx context.Context, event *voltha.Event, key string) error {
 	logger.Debugw(ctx, "Send event to kafka", log.Fields{"event": event})
-	if err := ep.kafkaClient.Send(ctx, event, &ep.eventTopic); err != nil {
+	if err := ep.kafkaClient.Send(ctx, event, &ep.eventTopic, key); err != nil {
 		return err
 	}
 	logger.Debugw(ctx, "Sent event to kafka", log.Fields{"event": event})
@@ -193,7 +200,13 @@
 
 // Start the event proxy
 func (ep *EventProxy) Start() error {
+	if !ep.eventTopicExits(context.Background()) {
+		logger.Errorw(context.Background(), "event-topic-doesn't-exist-in-kafka", log.Fields{"element": ep.eventTopic.Name})
+		return fmt.Errorf("event topic doesn't exist in kafka")
+	}
+
 	eq := ep.eventQueue
+
 	go eq.start(ep.queueCtx)
 	logger.Debugw(context.Background(), "event-proxy-starting...", log.Fields{"events-threashold": EVENT_THRESHOLD})
 	for {
@@ -218,8 +231,8 @@
 			logger.Warnw(ctx, "invalid-event", log.Fields{"element": elem})
 			continue
 		}
-		if err := ep.sendEvent(ctx, event); err != nil {
-			logger.Errorw(ctx, "failed-to-send-event-to-kafka-bus", log.Fields{"event": event})
+		if err := ep.sendEvent(ctx, event, ""); err != nil {
+			logger.Warnw(ctx, "failed-to-send-event-to-kafka-bus", log.Fields{"event": event})
 		} else {
 			logger.Debugw(ctx, "successfully-sent-rpc-event-to-kafka-bus", log.Fields{"id": event.Header.Id, "category": event.Header.Category,
 				"sub-category": event.Header.SubCategory, "type": event.Header.Type, "type-version": event.Header.TypeVersion,
@@ -345,3 +358,21 @@
 	eq.mutex.Unlock()
 
 }
+
+func (ep *EventProxy) eventTopicExits(ctx context.Context) bool {
+
+	// check if voltha.events topic exists
+	topics, err := ep.kafkaClient.ListTopics(ctx)
+	if err != nil {
+		logger.Errorw(ctx, "fail-to-get-topics", log.Fields{"topic": ep.eventTopic.Name, "error": err})
+		return false
+	}
+
+	logger.Debugw(ctx, "topics in kafka", log.Fields{"topics": topics, "event-topic": ep.eventTopic.Name})
+	for _, topic := range topics {
+		if topic == ep.eventTopic.Name {
+			return true
+		}
+	}
+	return false
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/flows/flow_utils.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/flows/flow_utils.go
index 1e50a63..46375fa 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/flows/flow_utils.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/flows/flow_utils.go
@@ -629,6 +629,22 @@
 	return innerTag
 }
 
+func GetInnerTagFromWriteMetaData(ctx context.Context, metadata uint64) uint16 {
+	/*
+			  Write metadata instruction value (metadata) is 8 bytes:
+		    	MS 2 bytes: C Tag
+		    	Next 2 bytes: Technology Profile Id
+		    	Next 4 bytes: Port number (uni or nni)
+		    	This is set in the ONOS OltPipeline as a write metadata instruction
+	*/
+	var innerTag uint16 = 0
+	if metadata != 0 {
+		innerTag = uint16((metadata >> 48) & 0xFFFF)
+		logger.Debugw(ctx, "Found  CVLAN from write metadate action", log.Fields{"c_vlan": innerTag})
+	}
+	return innerTag
+}
+
 //GetInnerTagFromMetaData retrieves the inner tag from the Metadata_ofp. The port number (UNI on ONU) is in the
 // lower 32-bits of Metadata_ofp and the inner_tag is in the upper 32-bits. This is set in the ONOS OltPipeline as
 //// a Metadata_ofp field
diff --git a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/kafka/client.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/kafka/client.go
index fdc05bc..afc2955 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/kafka/client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/kafka/client.go
@@ -77,4 +77,5 @@
 	SendLiveness(ctx context.Context) error
 	EnableLivenessChannel(ctx context.Context, enable bool) chan bool
 	EnableHealthinessChannel(ctx context.Context, enable bool) chan bool
+	ListTopics(ctx context.Context) ([]string, error)
 }
diff --git a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/kafka/sarama_client.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/kafka/sarama_client.go
index 185f6ec..680aa67 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/kafka/sarama_client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/kafka/sarama_client.go
@@ -327,13 +327,16 @@
 	topicDetails[topic.Name] = topicDetail
 
 	if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
-		if err == sarama.ErrTopicAlreadyExists {
-			//	Not an error
-			logger.Debugw(ctx, "topic-already-exist", log.Fields{"topic": topic.Name})
-			return nil
+		switch typedErr := err.(type) {
+		case *sarama.TopicError:
+			if typedErr.Err == sarama.ErrTopicAlreadyExists {
+				err = nil
+			}
 		}
-		logger.Errorw(ctx, "create-topic-failure", log.Fields{"error": err})
-		return err
+		if err != nil {
+			logger.Errorw(ctx, "create-topic-failure", log.Fields{"error": err})
+			return err
+		}
 	}
 	// TODO: Wait until the topic has been created.  No API is available in the Sarama clusterAdmin to
 	// do so.
@@ -832,7 +835,7 @@
 	// This Creates the publisher
 	config := sarama.NewConfig()
 	config.Version = sarama.V1_0_0_0
-	config.Producer.Partitioner = sarama.NewRandomPartitioner
+	config.Producer.Partitioner = sarama.NewHashPartitioner
 	config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
 	config.Producer.Flush.Messages = sc.producerFlushMessages
 	config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
@@ -1152,3 +1155,21 @@
 	}
 	return nil
 }
+
+func (sc *SaramaClient) ListTopics(ctx context.Context) ([]string, error) {
+
+	config := sarama.NewConfig()
+	client, err := sarama.NewClient([]string{sc.KafkaAddress}, config)
+	if err != nil {
+		logger.Debugw(ctx, "list-topics-failure", log.Fields{"error": err, "broker-address": sc.KafkaAddress})
+		return nil, err
+	}
+
+	topics, err := client.Topics()
+	if err != nil {
+		logger.Debugw(ctx, "list-topics-failure", log.Fields{"error": err, "broker-address": sc.KafkaAddress})
+		return nil, err
+	}
+
+	return topics, nil
+}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index f92f936..463d541 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -123,7 +123,7 @@
 github.com/klauspost/compress/huff0
 github.com/klauspost/compress/zstd
 github.com/klauspost/compress/zstd/internal/xxhash
-# github.com/opencord/voltha-lib-go/v7 v7.1.6
+# github.com/opencord/voltha-lib-go/v7 v7.1.8
 ## explicit
 github.com/opencord/voltha-lib-go/v7/pkg/config
 github.com/opencord/voltha-lib-go/v7/pkg/db