VOL-3501 Refactoring of code for event handling
Change-Id: I7d225a7b3b664efdaef5a6c9c84a118bac594be7
diff --git a/vendor/github.com/opencord/voltha-lib-go/v4/pkg/events/common.go b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/events/common.go
new file mode 100644
index 0000000..489a493
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/events/common.go
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package events
+
+import (
+ "github.com/opencord/voltha-lib-go/v4/pkg/log"
+)
+
+var logger log.CLogger
+
+func init() {
+ // Register this package with the logging framework so that its log level can be modified at run time
+ var err error
+ logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
+ if err != nil {
+ panic(err)
+ }
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v4/pkg/adapters/adapterif/events_proxy_if.go b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/events/eventif/events_proxy_if.go
similarity index 83%
rename from vendor/github.com/opencord/voltha-lib-go/v4/pkg/adapters/adapterif/events_proxy_if.go
rename to vendor/github.com/opencord/voltha-lib-go/v4/pkg/events/eventif/events_proxy_if.go
index 7d8a053..7418ea1 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v4/pkg/adapters/adapterif/events_proxy_if.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/events/eventif/events_proxy_if.go
@@ -1,5 +1,5 @@
/*
- * Copyright 2018-present Open Networking Foundation
+ * Copyright 2020-present Open Networking Foundation
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package adapterif
+package eventif
import (
"context"
@@ -27,6 +27,8 @@
subCategory EventSubCategory, raisedTs int64) error
SendKpiEvent(ctx context.Context, id string, deviceEvent *voltha.KpiEvent2, category EventCategory,
subCategory EventSubCategory, raisedTs int64) error
+ SendRPCEvent(ctx context.Context, id string, deviceEvent *voltha.RPCEvent, category EventCategory,
+ subCategory *EventSubCategory, raisedTs int64) error
}
const (
diff --git a/vendor/github.com/opencord/voltha-lib-go/v4/pkg/adapters/common/events_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/events/events_proxy.go
similarity index 69%
rename from vendor/github.com/opencord/voltha-lib-go/v4/pkg/adapters/common/events_proxy.go
rename to vendor/github.com/opencord/voltha-lib-go/v4/pkg/events/events_proxy.go
index b16c1ae..a4b12f7 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v4/pkg/adapters/common/events_proxy.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/events/events_proxy.go
@@ -1,5 +1,5 @@
/*
- * Copyright 2018-present Open Networking Foundation
+ * Copyright 2020-present Open Networking Foundation
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package common
+package events
import (
"context"
@@ -25,7 +25,7 @@
"time"
"github.com/golang/protobuf/ptypes"
- "github.com/opencord/voltha-lib-go/v4/pkg/adapters/adapterif"
+ "github.com/opencord/voltha-lib-go/v4/pkg/events/eventif"
"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
"github.com/opencord/voltha-lib-go/v4/pkg/log"
"github.com/opencord/voltha-protos/v4/go/voltha"
@@ -63,9 +63,9 @@
}
func (ep *EventProxy) getEventHeader(eventName string,
- category adapterif.EventCategory,
- subCategory adapterif.EventSubCategory,
- eventType adapterif.EventType,
+ category eventif.EventCategory,
+ subCategory *eventif.EventSubCategory,
+ eventType eventif.EventType,
raisedTs int64) (*voltha.EventHeader, error) {
var header voltha.EventHeader
if strings.Contains(eventName, "_") {
@@ -76,9 +76,11 @@
/* Populating event header */
header.Id = ep.formatId(eventName)
header.Category = category
- header.SubCategory = subCategory
+ if subCategory != nil {
+ header.SubCategory = *subCategory
+ }
header.Type = eventType
- header.TypeVersion = adapterif.EventTypeVersion
+ header.TypeVersion = eventif.EventTypeVersion
// raisedTs is in nanoseconds
timestamp, err := ptypes.TimestampProto(time.Unix(0, raisedTs))
@@ -96,8 +98,33 @@
return &header, nil
}
+// SendRPCEvent sends the given RPC event via sendEvent; it returns an error if rpcEvent is nil, if the event header cannot be built, or if the send fails.
+func (ep *EventProxy) SendRPCEvent(ctx context.Context, id string, rpcEvent *voltha.RPCEvent, category eventif.EventCategory, subCategory *eventif.EventSubCategory, raisedTs int64) error {
+ if rpcEvent == nil {
+ logger.Error(ctx, "Received empty rpc event")
+ return errors.New("rpc event nil")
+ }
+ var event voltha.Event
+ var err error
+ if event.Header, err = ep.getEventHeader(id, category, subCategory, voltha.EventType_RPC_EVENT, raisedTs); err != nil {
+ return err
+ }
+ event.EventType = &voltha.Event_RpcEvent{RpcEvent: rpcEvent}
+ if err := ep.sendEvent(ctx, &event); err != nil {
+ logger.Errorw(ctx, "Failed to send rpc event to KAFKA bus", log.Fields{"rpc-event": rpcEvent})
+ return err
+ }
+ logger.Debugw(ctx, "Successfully sent RPC event to KAFKA bus", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
+ "SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
+ "ReportedTs": event.Header.ReportedTs, "ResourceId": rpcEvent.ResourceId, "Context": rpcEvent.Context,
+ "RPCEventName": id})
+
+ return nil
+
+}
+
/* Send out device events*/
-func (ep *EventProxy) SendDeviceEvent(ctx context.Context, deviceEvent *voltha.DeviceEvent, category adapterif.EventCategory, subCategory adapterif.EventSubCategory, raisedTs int64) error {
+func (ep *EventProxy) SendDeviceEvent(ctx context.Context, deviceEvent *voltha.DeviceEvent, category eventif.EventCategory, subCategory eventif.EventSubCategory, raisedTs int64) error {
if deviceEvent == nil {
logger.Error(ctx, "Recieved empty device event")
return errors.New("Device event nil")
@@ -106,7 +133,7 @@
var de voltha.Event_DeviceEvent
var err error
de.DeviceEvent = deviceEvent
- if event.Header, err = ep.getEventHeader(deviceEvent.DeviceEventName, category, subCategory, voltha.EventType_DEVICE_EVENT, raisedTs); err != nil {
+ if event.Header, err = ep.getEventHeader(deviceEvent.DeviceEventName, category, &subCategory, voltha.EventType_DEVICE_EVENT, raisedTs); err != nil {
return err
}
event.EventType = &de
@@ -124,7 +151,7 @@
}
// SendKpiEvent is to send kpi events to voltha.event topic
-func (ep *EventProxy) SendKpiEvent(ctx context.Context, id string, kpiEvent *voltha.KpiEvent2, category adapterif.EventCategory, subCategory adapterif.EventSubCategory, raisedTs int64) error {
+func (ep *EventProxy) SendKpiEvent(ctx context.Context, id string, kpiEvent *voltha.KpiEvent2, category eventif.EventCategory, subCategory eventif.EventSubCategory, raisedTs int64) error {
if kpiEvent == nil {
logger.Error(ctx, "Recieved empty kpi event")
return errors.New("KPI event nil")
@@ -133,7 +160,7 @@
var de voltha.Event_KpiEvent2
var err error
de.KpiEvent2 = kpiEvent
- if event.Header, err = ep.getEventHeader(id, category, subCategory, voltha.EventType_KPI_EVENT2, raisedTs); err != nil {
+ if event.Header, err = ep.getEventHeader(id, category, &subCategory, voltha.EventType_KPI_EVENT2, raisedTs); err != nil {
return err
}
event.EventType = &de
@@ -149,9 +176,8 @@
}
-/* TODO: Send out KPI events*/
-
func (ep *EventProxy) sendEvent(ctx context.Context, event *voltha.Event) error {
+ logger.Debugw(ctx, "Send event to kafka", log.Fields{"event": event})
if err := ep.kafkaClient.Send(ctx, event, &ep.eventTopic); err != nil {
return err
}