[VOL-4663] create voltha event topic (voltha.events) with conifgurable no of partitions and replication factor
Change-Id: I55b40d97afaed0d75240fd6557f26da90950f6c5
diff --git a/VERSION b/VERSION
index fd2a018..94ff29c 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.1.0
+3.1.1
diff --git a/go.mod b/go.mod
index d306630..4c7a444 100644
--- a/go.mod
+++ b/go.mod
@@ -17,7 +17,7 @@
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.5.2
github.com/google/uuid v1.3.0
- github.com/opencord/voltha-lib-go/v7 v7.1.5
+ github.com/opencord/voltha-lib-go/v7 v7.1.8
github.com/opencord/voltha-protos/v5 v5.2.2
github.com/opentracing/opentracing-go v1.2.0
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
diff --git a/go.sum b/go.sum
index 24c8add..e724d7e 100644
--- a/go.sum
+++ b/go.sum
@@ -199,8 +199,8 @@
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.14.0 h1:ep6kpPVwmr/nTbklSx2nrLNSIO62DoYAhnPNIMhK8gI=
github.com/onsi/gomega v1.14.0/go.mod h1:cIuvLEne0aoVhAgh/O6ac0Op8WWw9H6eYCriF+tEHG0=
-github.com/opencord/voltha-lib-go/v7 v7.1.5 h1:IdNDzcA8V8LXHVm/2dG9QJ1IqFO+kM1aCAC4By+3MEU=
-github.com/opencord/voltha-lib-go/v7 v7.1.5/go.mod h1:lnwlFfhDVMBg2siCv1CajB1fvfAU9Cs8VbB64LQ8zVg=
+github.com/opencord/voltha-lib-go/v7 v7.1.8 h1:5k+1Ul+T+gmvM7GONbK1/+YrX4tizAc3REgHoFvug0I=
+github.com/opencord/voltha-lib-go/v7 v7.1.8/go.mod h1:lnwlFfhDVMBg2siCv1CajB1fvfAU9Cs8VbB64LQ8zVg=
github.com/opencord/voltha-protos/v5 v5.2.2 h1:1Bcgl+Fmp00ZxlDrHZdcbjpMgOwX6TnZmOTrYm9SbR8=
github.com/opencord/voltha-protos/v5 v5.2.2/go.mod h1:ZGcyW79kQKIo7AySo1LRu613E6uiozixrCF0yNB/4x8=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index 4375b9b..0f154f6 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -37,6 +37,8 @@
KVStoreTimeout time.Duration
KVStoreAddress string
EventTopic string
+ EventTopicPartitions int
+ EventTopicReplicas int
LogLevel string
Banner bool
DisplayVersionOnly bool
@@ -85,6 +87,16 @@
"voltha.events",
"RW Core Event topic")
+ fs.IntVar(&cf.EventTopicPartitions,
+ "EventTopicPartitions",
+ 3,
+ "RW Core Event topic partitions")
+
+ fs.IntVar(&cf.EventTopicReplicas,
+ "EventTopicReplicas",
+ 1,
+ "RW Core Event topic replicas")
+
fs.StringVar(&cf.KVStoreType,
"kv_store_type",
EtcdStoreName,
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index c6ce503..58c6968 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -119,6 +119,14 @@
}
defer core.KafkaClient.Stop(ctx)
+ // create the voltha.events topic
+ topic := &kafka.Topic{Name: cf.EventTopic}
+ if err := core.KafkaClient.CreateTopic(ctx, topic, cf.EventTopicPartitions, cf.EventTopicReplicas); err != nil {
+ if err != nil {
+ logger.Fatal(ctx, "unable-to create topic", log.Fields{"topic": cf.EventTopic, "error": err})
+ }
+ }
+
// Create the event proxy to post events to KAFKA
eventProxy := events.NewEventProxy(events.MsgClient(core.KafkaClient), events.MsgTopic(kafka.Topic{Name: cf.EventTopic}))
go func() {
diff --git a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/events/eventif/events_proxy_if.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/events/eventif/events_proxy_if.go
index e4ebc36..625cd0b 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/events/eventif/events_proxy_if.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/events/eventif/events_proxy_if.go
@@ -26,6 +26,8 @@
type EventProxy interface {
SendDeviceEvent(ctx context.Context, deviceEvent *voltha.DeviceEvent, category EventCategory,
subCategory EventSubCategory, raisedTs int64) error
+ SendDeviceEventWithKey(ctx context.Context, deviceEvent *voltha.DeviceEvent, category EventCategory,
+ subCategory EventSubCategory, raisedTs int64, key string) error
SendKpiEvent(ctx context.Context, id string, deviceEvent *voltha.KpiEvent2, category EventCategory,
subCategory EventSubCategory, raisedTs int64) error
SendRPCEvent(ctx context.Context, id string, deviceEvent *voltha.RPCEvent, category EventCategory,
diff --git a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/events/events_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/events/events_proxy.go
index e4493f9..4b46854 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/events/events_proxy.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/events/events_proxy.go
@@ -122,6 +122,11 @@
/* Send out device events*/
func (ep *EventProxy) SendDeviceEvent(ctx context.Context, deviceEvent *voltha.DeviceEvent, category eventif.EventCategory, subCategory eventif.EventSubCategory, raisedTs int64) error {
+ return ep.SendDeviceEventWithKey(ctx, deviceEvent, category, subCategory, raisedTs, "")
+}
+
+/* Send out device events with key*/
+func (ep *EventProxy) SendDeviceEventWithKey(ctx context.Context, deviceEvent *voltha.DeviceEvent, category eventif.EventCategory, subCategory eventif.EventSubCategory, raisedTs int64, key string) error {
if deviceEvent == nil {
logger.Error(ctx, "Recieved empty device event")
return errors.New("Device event nil")
@@ -134,11 +139,12 @@
return err
}
event.EventType = &de
- if err := ep.sendEvent(ctx, &event); err != nil {
+
+ if err := ep.sendEvent(ctx, &event, key); err != nil {
logger.Errorw(ctx, "Failed to send device event to KAFKA bus", log.Fields{"device-event": deviceEvent})
return err
}
- logger.Infow(ctx, "Successfully sent device event KAFKA", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
+ logger.Infow(ctx, "Successfully sent device event KAFKA", log.Fields{"key": key, "Id": event.Header.Id, "Category": event.Header.Category,
"SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
"ReportedTs": event.Header.ReportedTs, "ResourceId": deviceEvent.ResourceId, "Context": deviceEvent.Context,
"DeviceEventName": deviceEvent.DeviceEventName})
@@ -161,7 +167,8 @@
return err
}
event.EventType = &de
- if err := ep.sendEvent(ctx, &event); err != nil {
+
+ if err := ep.sendEvent(ctx, &event, strconv.FormatInt(raisedTs, 10)); err != nil {
logger.Errorw(ctx, "Failed to send kpi event to KAFKA bus", log.Fields{"device-event": kpiEvent})
return err
}
@@ -173,9 +180,9 @@
}
-func (ep *EventProxy) sendEvent(ctx context.Context, event *voltha.Event) error {
+func (ep *EventProxy) sendEvent(ctx context.Context, event *voltha.Event, key string) error {
logger.Debugw(ctx, "Send event to kafka", log.Fields{"event": event})
- if err := ep.kafkaClient.Send(ctx, event, &ep.eventTopic); err != nil {
+ if err := ep.kafkaClient.Send(ctx, event, &ep.eventTopic, key); err != nil {
return err
}
logger.Debugw(ctx, "Sent event to kafka", log.Fields{"event": event})
@@ -193,7 +200,13 @@
// Start the event proxy
func (ep *EventProxy) Start() error {
+ if !ep.eventTopicExits(context.Background()) {
+ logger.Errorw(context.Background(), "event-topic-doesn't-exist-in-kafka", log.Fields{"element": ep.eventTopic.Name})
+ return fmt.Errorf("event topic doesn't exist in kafka")
+ }
+
eq := ep.eventQueue
+
go eq.start(ep.queueCtx)
logger.Debugw(context.Background(), "event-proxy-starting...", log.Fields{"events-threashold": EVENT_THRESHOLD})
for {
@@ -218,8 +231,8 @@
logger.Warnw(ctx, "invalid-event", log.Fields{"element": elem})
continue
}
- if err := ep.sendEvent(ctx, event); err != nil {
- logger.Errorw(ctx, "failed-to-send-event-to-kafka-bus", log.Fields{"event": event})
+ if err := ep.sendEvent(ctx, event, ""); err != nil {
+ logger.Warnw(ctx, "failed-to-send-event-to-kafka-bus", log.Fields{"event": event})
} else {
logger.Debugw(ctx, "successfully-sent-rpc-event-to-kafka-bus", log.Fields{"id": event.Header.Id, "category": event.Header.Category,
"sub-category": event.Header.SubCategory, "type": event.Header.Type, "type-version": event.Header.TypeVersion,
@@ -345,3 +358,21 @@
eq.mutex.Unlock()
}
+
+func (ep *EventProxy) eventTopicExits(ctx context.Context) bool {
+
+ // check if voltha.events topic exists
+ topics, err := ep.kafkaClient.ListTopics(ctx)
+ if err != nil {
+ logger.Errorw(ctx, "fail-to-get-topics", log.Fields{"topic": ep.eventTopic.Name, "error": err})
+ return false
+ }
+
+ logger.Debugw(ctx, "topics in kafka", log.Fields{"topics": topics, "event-topic": ep.eventTopic.Name})
+ for _, topic := range topics {
+ if topic == ep.eventTopic.Name {
+ return true
+ }
+ }
+ return false
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/flows/flow_utils.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/flows/flow_utils.go
index 1e50a63..46375fa 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/flows/flow_utils.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/flows/flow_utils.go
@@ -629,6 +629,22 @@
return innerTag
}
+func GetInnerTagFromWriteMetaData(ctx context.Context, metadata uint64) uint16 {
+ /*
+ Write metadata instruction value (metadata) is 8 bytes:
+ MS 2 bytes: C Tag
+ Next 2 bytes: Technology Profile Id
+ Next 4 bytes: Port number (uni or nni)
+ This is set in the ONOS OltPipeline as a write metadata instruction
+ */
+ var innerTag uint16 = 0
+ if metadata != 0 {
+ innerTag = uint16((metadata >> 48) & 0xFFFF)
+ logger.Debugw(ctx, "Found CVLAN from write metadate action", log.Fields{"c_vlan": innerTag})
+ }
+ return innerTag
+}
+
//GetInnerTagFromMetaData retrieves the inner tag from the Metadata_ofp. The port number (UNI on ONU) is in the
// lower 32-bits of Metadata_ofp and the inner_tag is in the upper 32-bits. This is set in the ONOS OltPipeline as
//// a Metadata_ofp field
diff --git a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/kafka/client.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/kafka/client.go
index fdc05bc..afc2955 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/kafka/client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/kafka/client.go
@@ -77,4 +77,5 @@
SendLiveness(ctx context.Context) error
EnableLivenessChannel(ctx context.Context, enable bool) chan bool
EnableHealthinessChannel(ctx context.Context, enable bool) chan bool
+ ListTopics(ctx context.Context) ([]string, error)
}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/kafka/sarama_client.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/kafka/sarama_client.go
index 185f6ec..680aa67 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/kafka/sarama_client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/kafka/sarama_client.go
@@ -327,13 +327,16 @@
topicDetails[topic.Name] = topicDetail
if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
- if err == sarama.ErrTopicAlreadyExists {
- // Not an error
- logger.Debugw(ctx, "topic-already-exist", log.Fields{"topic": topic.Name})
- return nil
+ switch typedErr := err.(type) {
+ case *sarama.TopicError:
+ if typedErr.Err == sarama.ErrTopicAlreadyExists {
+ err = nil
+ }
}
- logger.Errorw(ctx, "create-topic-failure", log.Fields{"error": err})
- return err
+ if err != nil {
+ logger.Errorw(ctx, "create-topic-failure", log.Fields{"error": err})
+ return err
+ }
}
// TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
// do so.
@@ -832,7 +835,7 @@
// This Creates the publisher
config := sarama.NewConfig()
config.Version = sarama.V1_0_0_0
- config.Producer.Partitioner = sarama.NewRandomPartitioner
+ config.Producer.Partitioner = sarama.NewHashPartitioner
config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
config.Producer.Flush.Messages = sc.producerFlushMessages
config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
@@ -1152,3 +1155,21 @@
}
return nil
}
+
+func (sc *SaramaClient) ListTopics(ctx context.Context) ([]string, error) {
+
+ config := sarama.NewConfig()
+ client, err := sarama.NewClient([]string{sc.KafkaAddress}, config)
+ if err != nil {
+ logger.Debugw(ctx, "list-topics-failure", log.Fields{"error": err, "broker-address": sc.KafkaAddress})
+ return nil, err
+ }
+
+ topics, err := client.Topics()
+ if err != nil {
+ logger.Debugw(ctx, "list-topics-failure", log.Fields{"error": err, "broker-address": sc.KafkaAddress})
+ return nil, err
+ }
+
+ return topics, nil
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/mocks/kafka/kafka_client.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/mocks/kafka/kafka_client.go
index ea410ac..a9b434f 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/mocks/kafka/kafka_client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/mocks/kafka/kafka_client.go
@@ -174,3 +174,8 @@
logger.Debug(ctx, "EnableHealthinessChannel - unimplemented")
return nil
}
+
+func (kc *KafkaClient) ListTopics(ctx context.Context) ([]string, error) {
+ topics := []string{"voltha.events", "myTopic"}
+ return topics, nil
+}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index bfa29fa..6065511 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -221,7 +221,7 @@
github.com/modern-go/concurrent
# github.com/modern-go/reflect2 v1.0.1
github.com/modern-go/reflect2
-# github.com/opencord/voltha-lib-go/v7 v7.1.5
+# github.com/opencord/voltha-lib-go/v7 v7.1.8
## explicit
github.com/opencord/voltha-lib-go/v7/pkg/adapters/common
github.com/opencord/voltha-lib-go/v7/pkg/config