[VOL-4663] create voltha event topic (voltha.events) with conifgurable no of partitions and replication factor

Change-Id: Ie7cde57f423c2e947cbc9a42f86f96fb6ac97826
diff --git a/internal/pkg/core/openolt_eventmgr.go b/internal/pkg/core/openolt_eventmgr.go
index 971a173..07d2bd0 100644
--- a/internal/pkg/core/openolt_eventmgr.go
+++ b/internal/pkg/core/openolt_eventmgr.go
@@ -283,7 +283,7 @@
 	de.ResourceId = oltDeviceID
 	de.DeviceEventName = fmt.Sprintf("%s_%s", onuDiscoveryEvent, "RAISE_EVENT")
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(ctx, &de, voltha.EventCategory_EQUIPMENT, voltha.EventSubCategory_PON, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEventWithKey(ctx, &de, voltha.EventCategory_EQUIPMENT, voltha.EventSubCategory_PON, raisedTs, onuDeviceID); err != nil {
 		return olterrors.NewErrCommunication("send-onu-discovery-event",
 			log.Fields{
 				"serial-number": serialNumber,
@@ -340,7 +340,7 @@
 	return nil
 }
 
-func (em *OpenOltEventMgr) populateContextWithSerialDeviceID(context map[string]string, intfID, onuID uint32) {
+func (em *OpenOltEventMgr) populateContextWithSerialDeviceID(context map[string]string, intfID, onuID uint32) string {
 	var serialNumber = ""
 	var onuDeviceID = ""
 	onu := em.handler.formOnuKey(intfID, onuID)
@@ -352,13 +352,14 @@
 	context[ContextOltPortLabel], _ = GetportLabel(intfID, voltha.Port_PON_OLT)
 	context[ContextOnuSerialNumber] = serialNumber
 	context[ContextOnuDeviceID] = onuDeviceID
+	return onuDeviceID
 }
 
 func (em *OpenOltEventMgr) onuDyingGaspIndication(ctx context.Context, dgi *oop.DyingGaspIndication, deviceID string, raisedTs int64) error {
 	var de voltha.DeviceEvent
 	context := make(map[string]string)
 	/* Populating event context */
-	em.populateContextWithSerialDeviceID(context, dgi.IntfId, dgi.OnuId)
+	onuDeviceID := em.populateContextWithSerialDeviceID(context, dgi.IntfId, dgi.OnuId)
 
 	context[ContextOnuPonIntfID] = strconv.FormatUint(uint64(dgi.IntfId), base10)
 	context[ContextOnuOnuID] = strconv.FormatUint(uint64(dgi.OnuId), base10)
@@ -368,7 +369,7 @@
 	de.ResourceId = deviceID
 	de.DeviceEventName = fmt.Sprintf("%s_%s", onuDyingGaspEvent, "EVENT")
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(ctx, &de, voltha.EventCategory_COMMUNICATION, voltha.EventSubCategory_ONU, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEventWithKey(ctx, &de, voltha.EventCategory_COMMUNICATION, voltha.EventSubCategory_ONU, raisedTs, onuDeviceID); err != nil {
 		return err
 	}
 	logger.Debugw(ctx, "onu-dying-gasp-event-sent-to-kafka", log.Fields{"intf-id": dgi.IntfId})
@@ -448,7 +449,7 @@
 	/* Populating event context */
 	context[ContextOnuPonIntfID] = strconv.FormatUint(uint64(onuAlarm.IntfId), base10)
 	context[ContextOnuOnuID] = strconv.FormatUint(uint64(onuAlarm.OnuId), base10)
-	em.populateContextWithSerialDeviceID(context, onuAlarm.IntfId, onuAlarm.OnuId)
+	onuDeviceID := em.populateContextWithSerialDeviceID(context, onuAlarm.IntfId, onuAlarm.OnuId)
 
 	/* Populating device event body */
 	de.Context = context
@@ -485,7 +486,7 @@
 	}
 
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(ctx, &de, voltha.EventCategory_COMMUNICATION, voltha.EventSubCategory_ONU, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEventWithKey(ctx, &de, voltha.EventCategory_COMMUNICATION, voltha.EventSubCategory_ONU, raisedTs, onuDeviceID); err != nil {
 		return err
 	}
 	logger.Debugw(ctx, "onu-los-event-sent-to-kafka", log.Fields{"onu-id": onuAlarm.OnuId, "intf-id": onuAlarm.IntfId})
@@ -500,14 +501,14 @@
 	context[ContextOnuOnuID] = strconv.FormatUint(uint64(onuInd.OnuId), base10)
 	context[ContextOnuFailureReaseon] = onuInd.FailReason.String()
 
-	em.populateContextWithSerialDeviceID(context, onuInd.IntfId, onuInd.OnuId)
+	onuDeviceID := em.populateContextWithSerialDeviceID(context, onuInd.IntfId, onuInd.OnuId)
 
 	/* Populating device event body */
 	de.Context = context
 	de.ResourceId = deviceID
 	de.DeviceEventName = eventName
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(ctx, &de, voltha.EventCategory_EQUIPMENT, voltha.EventSubCategory_PON, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEventWithKey(ctx, &de, voltha.EventCategory_EQUIPMENT, voltha.EventSubCategory_PON, raisedTs, onuDeviceID); err != nil {
 		return err
 	}
 	logger.Debugw(ctx, "onu-activation-failure-event-sent-to-kafka", log.Fields{"onu-id": onuInd.OnuId, "intf-id": onuInd.IntfId})
@@ -521,7 +522,7 @@
 	context[ContextOnuPonIntfID] = strconv.FormatUint(uint64(onuLossOmci.IntfId), base10)
 	context[ContextOnuOnuID] = strconv.FormatUint(uint64(onuLossOmci.OnuId), base10)
 
-	em.populateContextWithSerialDeviceID(context, onuLossOmci.IntfId, onuLossOmci.OnuId)
+	onuDeviceID := em.populateContextWithSerialDeviceID(context, onuLossOmci.IntfId, onuLossOmci.OnuId)
 
 	/* Populating device event body */
 	de.Context = context
@@ -532,7 +533,7 @@
 		de.DeviceEventName = fmt.Sprintf("%s_%s", onuLossOmciEvent, "CLEAR_EVENT")
 	}
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(ctx, &de, voltha.EventCategory_COMMUNICATION, voltha.EventSubCategory_ONU, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEventWithKey(ctx, &de, voltha.EventCategory_COMMUNICATION, voltha.EventSubCategory_ONU, raisedTs, onuDeviceID); err != nil {
 		return err
 	}
 	logger.Debugw(ctx, "onu-loss-of-omci-channel-event-sent-to-kafka", log.Fields{"onu-id": onuLossOmci.OnuId, "intf-id": onuLossOmci.IntfId})
@@ -548,7 +549,7 @@
 	context[ContextOnuDrift] = strconv.FormatUint(uint64(onuDriftWindow.Drift), base10)
 	context[ContextOnuNewEqd] = strconv.FormatUint(uint64(onuDriftWindow.NewEqd), base10)
 
-	em.populateContextWithSerialDeviceID(context, onuDriftWindow.IntfId, onuDriftWindow.OnuId)
+	onuDeviceID := em.populateContextWithSerialDeviceID(context, onuDriftWindow.IntfId, onuDriftWindow.OnuId)
 
 	/* Populating device event body */
 	de.Context = context
@@ -559,7 +560,7 @@
 		de.DeviceEventName = fmt.Sprintf("%s_%s", onuDriftOfWindowEvent, "CLEAR_EVENT")
 	}
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(ctx, &de, voltha.EventCategory_COMMUNICATION, voltha.EventSubCategory_ONU, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEventWithKey(ctx, &de, voltha.EventCategory_COMMUNICATION, voltha.EventSubCategory_ONU, raisedTs, onuDeviceID); err != nil {
 		return err
 	}
 	logger.Debugw(ctx, "onu-drift-of-window-event-sent-to-kafka", log.Fields{"onu-id": onuDriftWindow.OnuId, "intf-id": onuDriftWindow.IntfId})
@@ -574,7 +575,7 @@
 	context[ContextOnuOnuID] = strconv.FormatUint(uint64(onuSignalDegrade.OnuId), base10)
 	context[ContextOnuInverseBitErrorRate] = strconv.FormatUint(uint64(onuSignalDegrade.InverseBitErrorRate), base10)
 
-	em.populateContextWithSerialDeviceID(context, onuSignalDegrade.IntfId, onuSignalDegrade.OnuId)
+	onuDeviceID := em.populateContextWithSerialDeviceID(context, onuSignalDegrade.IntfId, onuSignalDegrade.OnuId)
 
 	/* Populating device event body */
 	de.Context = context
@@ -585,7 +586,7 @@
 		de.DeviceEventName = fmt.Sprintf("%s_%s", onuSignalDegradeEvent, "CLEAR_EVENT")
 	}
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(ctx, &de, voltha.EventCategory_COMMUNICATION, voltha.EventSubCategory_ONU, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEventWithKey(ctx, &de, voltha.EventCategory_COMMUNICATION, voltha.EventSubCategory_ONU, raisedTs, onuDeviceID); err != nil {
 		return err
 	}
 	logger.Debugw(ctx, "onu-signal-degrade-event-sent-to-kafka", log.Fields{"onu-id": onuSignalDegrade.OnuId, "intf-id": onuSignalDegrade.IntfId})
@@ -596,7 +597,7 @@
 	var de voltha.DeviceEvent
 	context := make(map[string]string)
 	/* Populating event context */
-	em.populateContextWithSerialDeviceID(context, onuSignalsFail.IntfId, onuSignalsFail.OnuId)
+	onuDeviceID := em.populateContextWithSerialDeviceID(context, onuSignalsFail.IntfId, onuSignalsFail.OnuId)
 
 	context[ContextOnuOnuID] = strconv.FormatUint(uint64(onuSignalsFail.OnuId), base10)
 	context[ContextOnuPonIntfID] = strconv.FormatUint(uint64(onuSignalsFail.IntfId), base10)
@@ -610,7 +611,7 @@
 		de.DeviceEventName = fmt.Sprintf("%s_%s", onuSignalsFailEvent, "CLEAR_EVENT")
 	}
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(ctx, &de, voltha.EventCategory_COMMUNICATION, voltha.EventSubCategory_ONU, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEventWithKey(ctx, &de, voltha.EventCategory_COMMUNICATION, voltha.EventSubCategory_ONU, raisedTs, onuDeviceID); err != nil {
 		return err
 	}
 	logger.Debugw(ctx, "onu-signals-fail-event-sent-to-kafka", log.Fields{"onu-id": onuSignalsFail.OnuId, "intf-id": onuSignalsFail.IntfId})
@@ -621,7 +622,7 @@
 	var de voltha.DeviceEvent
 	context := make(map[string]string)
 	/* Populating event context */
-	em.populateContextWithSerialDeviceID(context, onuStartupFail.IntfId, onuStartupFail.OnuId)
+	onuDeviceID := em.populateContextWithSerialDeviceID(context, onuStartupFail.IntfId, onuStartupFail.OnuId)
 
 	context[ContextOnuOnuID] = strconv.FormatUint(uint64(onuStartupFail.OnuId), base10)
 	context[ContextOnuPonIntfID] = strconv.FormatUint(uint64(onuStartupFail.IntfId), base10)
@@ -635,7 +636,7 @@
 		de.DeviceEventName = fmt.Sprintf("%s_%s", onuStartupFailEvent, "CLEAR_EVENT")
 	}
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(ctx, &de, voltha.EventCategory_COMMUNICATION, voltha.EventSubCategory_PON, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEventWithKey(ctx, &de, voltha.EventCategory_COMMUNICATION, voltha.EventSubCategory_PON, raisedTs, onuDeviceID); err != nil {
 		return err
 	}
 	logger.Debugw(ctx, "onu-startup-fail-event-sent-to-kafka", log.Fields{"onu-id": onuStartupFail.OnuId, "intf-id": onuStartupFail.IntfId})
@@ -646,7 +647,7 @@
 	var de voltha.DeviceEvent
 	context := make(map[string]string)
 	/* Populating event context */
-	em.populateContextWithSerialDeviceID(context, onuLOKI.IntfId, onuLOKI.OnuId)
+	onuDeviceID := em.populateContextWithSerialDeviceID(context, onuLOKI.IntfId, onuLOKI.OnuId)
 
 	context[ContextOnuOnuID] = strconv.FormatUint(uint64(onuLOKI.OnuId), base10)
 	context[ContextOnuPonIntfID] = strconv.FormatUint(uint64(onuLOKI.IntfId), base10)
@@ -660,7 +661,7 @@
 	}
 
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(ctx, &de, voltha.EventCategory_SECURITY, voltha.EventSubCategory_ONU, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEventWithKey(ctx, &de, voltha.EventCategory_SECURITY, voltha.EventSubCategory_ONU, raisedTs, onuDeviceID); err != nil {
 		return err
 	}
 	logger.Debugw(ctx, "onu-loss-of-key-sync-event-sent-to-kafka", log.Fields{"onu-id": onuLOKI.OnuId, "intf-id": onuLOKI.IntfId})
@@ -703,7 +704,7 @@
 	var de voltha.DeviceEvent
 	context := make(map[string]string)
 	/* Populating event context */
-	em.populateContextWithSerialDeviceID(context, onuDFI.IntfId, onuDFI.OnuId)
+	onuDeviceID := em.populateContextWithSerialDeviceID(context, onuDFI.IntfId, onuDFI.OnuId)
 
 	context[ContextOnuOnuID] = strconv.FormatUint(uint64(onuDFI.OnuId), base10)
 	context[ContextOnuPonIntfID] = strconv.FormatUint(uint64(onuDFI.IntfId), base10)
@@ -716,7 +717,7 @@
 		de.DeviceEventName = fmt.Sprintf("%s_%s", onuDeactivationFailureEvent, "CLEAR_EVENT")
 	}
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(ctx, &de, voltha.EventCategory_EQUIPMENT, voltha.EventSubCategory_ONU, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEventWithKey(ctx, &de, voltha.EventCategory_EQUIPMENT, voltha.EventSubCategory_ONU, raisedTs, onuDeviceID); err != nil {
 		return err
 	}
 	logger.Debugw(ctx, "onu-deactivation-failure-event-sent-to-kafka", log.Fields{"onu-id": onuDFI.OnuId, "intf-id": onuDFI.IntfId})
@@ -730,7 +731,7 @@
 		ContextOnuPonIntfID:                  strconv.FormatUint(uint64(intfID), base10),
 		ContextOnuRemoteDefectIndicatorCount: strconv.FormatUint(rdiCount, base10),
 	}
-	em.populateContextWithSerialDeviceID(context, intfID, onuID)
+	onuDeviceID := em.populateContextWithSerialDeviceID(context, intfID, onuID)
 
 	/* Populating device event body */
 	de := &voltha.DeviceEvent{
@@ -743,7 +744,7 @@
 		de.DeviceEventName = fmt.Sprintf("%s_%s", onuRemoteDefectIndication, "CLEAR_EVENT")
 	}
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(ctx, de, voltha.EventCategory_EQUIPMENT, voltha.EventSubCategory_ONU, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEventWithKey(ctx, de, voltha.EventCategory_EQUIPMENT, voltha.EventSubCategory_ONU, raisedTs, onuDeviceID); err != nil {
 		return err
 	}
 	logger.Debugw(ctx, "onu-remote-defect-event-sent-to-kafka", log.Fields{"onu-id": onuID, "intf-id": intfID})
@@ -780,7 +781,7 @@
 		ContextOnuPonIntfID:         strconv.FormatUint(uint64(onuGCD.IntfId), base10),
 		ContextOnuDelineationErrors: strconv.FormatUint(uint64(onuGCD.DelineationErrors), base10),
 	}
-	em.populateContextWithSerialDeviceID(context, onuGCD.IntfId, onuGCD.OnuId)
+	onuDeviceID := em.populateContextWithSerialDeviceID(context, onuGCD.IntfId, onuGCD.OnuId)
 
 	/* Populating device event body */
 	de := &voltha.DeviceEvent{
@@ -793,7 +794,7 @@
 		de.DeviceEventName = fmt.Sprintf("%s_%s", onuLossOfGEMChannelDelineationEvent, "CLEAR_EVENT")
 	}
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(ctx, de, voltha.EventCategory_COMMUNICATION, voltha.EventSubCategory_ONU, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEventWithKey(ctx, de, voltha.EventCategory_COMMUNICATION, voltha.EventSubCategory_ONU, raisedTs, onuDeviceID); err != nil {
 		return err
 	}
 	logger.Debugw(ctx, "onu-loss-of-gem-channel-delineation-event-sent-to-kafka", log.Fields{"onu-id": onuGCD.OnuId, "intf-id": onuGCD.IntfId})
@@ -806,7 +807,7 @@
 		ContextOnuOnuID:     strconv.FormatUint(uint64(onuErr.OnuId), base10),
 		ContextOnuPonIntfID: strconv.FormatUint(uint64(onuErr.IntfId), base10),
 	}
-	em.populateContextWithSerialDeviceID(context, onuErr.IntfId, onuErr.OnuId)
+	onuDeviceID := em.populateContextWithSerialDeviceID(context, onuErr.IntfId, onuErr.OnuId)
 	/* Populating device event body */
 	de := &voltha.DeviceEvent{
 		Context:    context,
@@ -818,7 +819,7 @@
 		de.DeviceEventName = fmt.Sprintf("%s_%s", onuPhysicalEquipmentErrorEvent, "CLEAR_EVENT")
 	}
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(ctx, de, voltha.EventCategory_EQUIPMENT, voltha.EventSubCategory_ONU, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEventWithKey(ctx, de, voltha.EventCategory_EQUIPMENT, voltha.EventSubCategory_ONU, raisedTs, onuDeviceID); err != nil {
 		return err
 	}
 	logger.Debugw(ctx, "onu-physical-equipment-error-event-sent-to-kafka", log.Fields{"onu-id": onuErr.OnuId, "intf-id": onuErr.IntfId})
@@ -831,7 +832,7 @@
 		ContextOnuOnuID:     strconv.FormatUint(uint64(onuLOA.OnuId), base10),
 		ContextOnuPonIntfID: strconv.FormatUint(uint64(onuLOA.IntfId), base10),
 	}
-	em.populateContextWithSerialDeviceID(context, onuLOA.IntfId, onuLOA.OnuId)
+	onuDeviceID := em.populateContextWithSerialDeviceID(context, onuLOA.IntfId, onuLOA.OnuId)
 
 	/* Populating device event body */
 	de := &voltha.DeviceEvent{
@@ -844,7 +845,7 @@
 		de.DeviceEventName = fmt.Sprintf("%s_%s", onuLossOfAcknowledgementEvent, "CLEAR_EVENT")
 	}
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(ctx, de, voltha.EventCategory_EQUIPMENT, voltha.EventSubCategory_ONU, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEventWithKey(ctx, de, voltha.EventCategory_EQUIPMENT, voltha.EventSubCategory_ONU, raisedTs, onuDeviceID); err != nil {
 		return err
 	}
 	logger.Debugw(ctx, "onu-physical-equipment-error-event-sent-to-kafka", log.Fields{"onu-id": onuLOA.OnuId, "intf-id": onuLOA.IntfId})
@@ -858,7 +859,7 @@
 		ContextOnuPonIntfID:            strconv.FormatUint(uint64(onuDRE.IntfId), base10),
 		ContextOnuDifferentialDistance: strconv.FormatUint(uint64(onuDRE.Distance), base10),
 	}
-	em.populateContextWithSerialDeviceID(context, onuDRE.IntfId, onuDRE.OnuId)
+	onuDeviceID := em.populateContextWithSerialDeviceID(context, onuDRE.IntfId, onuDRE.OnuId)
 
 	/* Populating device event body */
 	de := &voltha.DeviceEvent{
@@ -871,7 +872,7 @@
 		de.DeviceEventName = fmt.Sprintf("%s_%s", onuDifferentialReachExceededEvent, "CLEAR_EVENT")
 	}
 	/* Send event to KAFKA */
-	if err := em.eventProxy.SendDeviceEvent(ctx, de, voltha.EventCategory_EQUIPMENT, voltha.EventSubCategory_ONU, raisedTs); err != nil {
+	if err := em.eventProxy.SendDeviceEventWithKey(ctx, de, voltha.EventCategory_EQUIPMENT, voltha.EventSubCategory_ONU, raisedTs, onuDeviceID); err != nil {
 		return err
 	}
 	logger.Debugw(ctx, "onu-differential-reach-exceeded–event-sent-to-kafka", log.Fields{"onu-id": onuDRE.OnuId, "intf-id": onuDRE.IntfId})