[VOL-4663] create voltha event topic (voltha.events) with conifgurable no of partitions and replication factor
Change-Id: I9949f9087c455c9976dad4b1e63be740bb58715f
diff --git a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/events/events_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/events/events_proxy.go
index e4493f9..4b46854 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/events/events_proxy.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/events/events_proxy.go
@@ -122,6 +122,11 @@
/* Send out device events*/
func (ep *EventProxy) SendDeviceEvent(ctx context.Context, deviceEvent *voltha.DeviceEvent, category eventif.EventCategory, subCategory eventif.EventSubCategory, raisedTs int64) error {
+ return ep.SendDeviceEventWithKey(ctx, deviceEvent, category, subCategory, raisedTs, "")
+}
+
+/* Send out device events with key*/
+func (ep *EventProxy) SendDeviceEventWithKey(ctx context.Context, deviceEvent *voltha.DeviceEvent, category eventif.EventCategory, subCategory eventif.EventSubCategory, raisedTs int64, key string) error {
if deviceEvent == nil {
logger.Error(ctx, "Recieved empty device event")
return errors.New("Device event nil")
@@ -134,11 +139,12 @@
return err
}
event.EventType = &de
- if err := ep.sendEvent(ctx, &event); err != nil {
+
+ if err := ep.sendEvent(ctx, &event, key); err != nil {
logger.Errorw(ctx, "Failed to send device event to KAFKA bus", log.Fields{"device-event": deviceEvent})
return err
}
- logger.Infow(ctx, "Successfully sent device event KAFKA", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
+ logger.Infow(ctx, "Successfully sent device event KAFKA", log.Fields{"key": key, "Id": event.Header.Id, "Category": event.Header.Category,
"SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
"ReportedTs": event.Header.ReportedTs, "ResourceId": deviceEvent.ResourceId, "Context": deviceEvent.Context,
"DeviceEventName": deviceEvent.DeviceEventName})
@@ -161,7 +167,8 @@
return err
}
event.EventType = &de
- if err := ep.sendEvent(ctx, &event); err != nil {
+
+ if err := ep.sendEvent(ctx, &event, strconv.FormatInt(raisedTs, 10)); err != nil {
logger.Errorw(ctx, "Failed to send kpi event to KAFKA bus", log.Fields{"device-event": kpiEvent})
return err
}
@@ -173,9 +180,9 @@
}
-func (ep *EventProxy) sendEvent(ctx context.Context, event *voltha.Event) error {
+func (ep *EventProxy) sendEvent(ctx context.Context, event *voltha.Event, key string) error {
logger.Debugw(ctx, "Send event to kafka", log.Fields{"event": event})
- if err := ep.kafkaClient.Send(ctx, event, &ep.eventTopic); err != nil {
+ if err := ep.kafkaClient.Send(ctx, event, &ep.eventTopic, key); err != nil {
return err
}
logger.Debugw(ctx, "Sent event to kafka", log.Fields{"event": event})
@@ -193,7 +200,13 @@
// Start the event proxy
func (ep *EventProxy) Start() error {
+ if !ep.eventTopicExits(context.Background()) {
+ logger.Errorw(context.Background(), "event-topic-doesn't-exist-in-kafka", log.Fields{"element": ep.eventTopic.Name})
+ return fmt.Errorf("event topic doesn't exist in kafka")
+ }
+
eq := ep.eventQueue
+
go eq.start(ep.queueCtx)
logger.Debugw(context.Background(), "event-proxy-starting...", log.Fields{"events-threashold": EVENT_THRESHOLD})
for {
@@ -218,8 +231,8 @@
logger.Warnw(ctx, "invalid-event", log.Fields{"element": elem})
continue
}
- if err := ep.sendEvent(ctx, event); err != nil {
- logger.Errorw(ctx, "failed-to-send-event-to-kafka-bus", log.Fields{"event": event})
+ if err := ep.sendEvent(ctx, event, ""); err != nil {
+ logger.Warnw(ctx, "failed-to-send-event-to-kafka-bus", log.Fields{"event": event})
} else {
logger.Debugw(ctx, "successfully-sent-rpc-event-to-kafka-bus", log.Fields{"id": event.Header.Id, "category": event.Header.Category,
"sub-category": event.Header.SubCategory, "type": event.Header.Type, "type-version": event.Header.TypeVersion,
@@ -345,3 +358,21 @@
eq.mutex.Unlock()
}
+
+func (ep *EventProxy) eventTopicExits(ctx context.Context) bool {
+
+ // check if voltha.events topic exists
+ topics, err := ep.kafkaClient.ListTopics(ctx)
+ if err != nil {
+ logger.Errorw(ctx, "fail-to-get-topics", log.Fields{"topic": ep.eventTopic.Name, "error": err})
+ return false
+ }
+
+ logger.Debugw(ctx, "topics in kafka", log.Fields{"topics": topics, "event-topic": ep.eventTopic.Name})
+ for _, topic := range topics {
+ if topic == ep.eventTopic.Name {
+ return true
+ }
+ }
+ return false
+}