VOL-3734 optimise rpc events to be send to queue and then to kafka from the queue
Change-Id: I5e068722412b6d9526760900d9173aaf51e00946
diff --git a/rw_core/core/device/event/event.go b/rw_core/core/device/event/event.go
index 472ae83..d023c5a 100644
--- a/rw_core/core/device/event/event.go
+++ b/rw_core/core/device/event/event.go
@@ -21,6 +21,9 @@
"encoding/binary"
"encoding/hex"
"fmt"
+ "sync"
+ "time"
+
"github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/voltha-go/rw_core/utils"
"github.com/opencord/voltha-lib-go/v4/pkg/events/eventif"
@@ -30,8 +33,6 @@
"github.com/opencord/voltha-protos/v4/go/voltha"
"github.com/opentracing/opentracing-go"
jtracing "github.com/uber/jaeger-client-go"
- "sync"
- "time"
)
type Manager struct {
@@ -131,9 +132,9 @@
})
if err := packetsIn.Send(&packet); err != nil {
logger.Errorw(ctx, "failed-to-send-packet", log.Fields{"error": err})
- go q.RPCEventManager.GetAndSendRPCEvent(ctx, packet.Id, err.Error(),
+ q.RPCEventManager.GetAndSendRPCEvent(ctx, packet.Id, err.Error(),
nil, "RPC_ERROR_RAISE_EVENT", voltha.EventCategory_COMMUNICATION,
- nil, time.Now().UnixNano())
+ nil, time.Now().Unix())
// save the last failed packet in
streamingTracker.failedPacket = packet
} else {
@@ -221,9 +222,9 @@
logger.Debugw(ctx, "sending-change-event", log.Fields{"event": event})
if err := changeEvents.Send(&event); err != nil {
logger.Errorw(ctx, "failed-to-send-change-event", log.Fields{"error": err})
- go q.RPCEventManager.GetAndSendRPCEvent(ctx, event.Id, err.Error(),
+ q.RPCEventManager.GetAndSendRPCEvent(ctx, event.Id, err.Error(),
nil, "RPC_ERROR_RAISE_EVENT", voltha.EventCategory_COMMUNICATION, nil,
- time.Now().UnixNano())
+ time.Now().Unix())
// save last failed change event
streamingTracker.failedPacket = event
} else {
@@ -272,17 +273,19 @@
}
func (q *RPCEventManager) SendRPCEvent(ctx context.Context, id string, rpcEvent *voltha.RPCEvent, category voltha.EventCategory_Types, subCategory *voltha.EventSubCategory_Types, raisedTs int64) {
- //TODO Instead of directly sending to the kafka bus, queue the message and send it asynchronously
if rpcEvent.Rpc != "" {
- _ = q.eventProxy.SendRPCEvent(ctx, id, rpcEvent, category, subCategory, raisedTs)
+ if err := q.eventProxy.SendRPCEvent(ctx, id, rpcEvent, category, subCategory, raisedTs); err != nil {
+ logger.Errorw(ctx, "failed-to-send-rpc-event", log.Fields{"resource-id": id})
+ }
}
}
func (q *RPCEventManager) GetAndSendRPCEvent(ctx context.Context, resourceID, desc string, context map[string]string,
id string, category voltha.EventCategory_Types, subCategory *voltha.EventSubCategory_Types, raisedTs int64) {
rpcEvent := q.NewRPCEvent(ctx, resourceID, desc, context)
- //TODO Instead of directly sending to the kafka bus, queue the message and send it asynchronously
if rpcEvent.Rpc != "" {
- _ = q.eventProxy.SendRPCEvent(ctx, id, rpcEvent, category, subCategory, raisedTs)
+ if err := q.eventProxy.SendRPCEvent(ctx, id, rpcEvent, category, subCategory, raisedTs); err != nil {
+ logger.Errorw(ctx, "failed-to-send-rpc-event", log.Fields{"resource-id": id})
+ }
}
}