VOL-3734 optimise rpc events to be sent to a queue and then to kafka from the queue

Change-Id: I5e068722412b6d9526760900d9173aaf51e00946
diff --git a/rw_core/core/device/event/event.go b/rw_core/core/device/event/event.go
index 472ae83..d023c5a 100644
--- a/rw_core/core/device/event/event.go
+++ b/rw_core/core/device/event/event.go
@@ -21,6 +21,9 @@
 	"encoding/binary"
 	"encoding/hex"
 	"fmt"
+	"sync"
+	"time"
+
 	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/opencord/voltha-go/rw_core/utils"
 	"github.com/opencord/voltha-lib-go/v4/pkg/events/eventif"
@@ -30,8 +33,6 @@
 	"github.com/opencord/voltha-protos/v4/go/voltha"
 	"github.com/opentracing/opentracing-go"
 	jtracing "github.com/uber/jaeger-client-go"
-	"sync"
-	"time"
 )
 
 type Manager struct {
@@ -131,9 +132,9 @@
 			})
 			if err := packetsIn.Send(&packet); err != nil {
 				logger.Errorw(ctx, "failed-to-send-packet", log.Fields{"error": err})
-				go q.RPCEventManager.GetAndSendRPCEvent(ctx, packet.Id, err.Error(),
+				q.RPCEventManager.GetAndSendRPCEvent(ctx, packet.Id, err.Error(),
 					nil, "RPC_ERROR_RAISE_EVENT", voltha.EventCategory_COMMUNICATION,
-					nil, time.Now().UnixNano())
+					nil, time.Now().Unix())
 				// save the last failed packet in
 				streamingTracker.failedPacket = packet
 			} else {
@@ -221,9 +222,9 @@
 			logger.Debugw(ctx, "sending-change-event", log.Fields{"event": event})
 			if err := changeEvents.Send(&event); err != nil {
 				logger.Errorw(ctx, "failed-to-send-change-event", log.Fields{"error": err})
-				go q.RPCEventManager.GetAndSendRPCEvent(ctx, event.Id, err.Error(),
+				q.RPCEventManager.GetAndSendRPCEvent(ctx, event.Id, err.Error(),
 					nil, "RPC_ERROR_RAISE_EVENT", voltha.EventCategory_COMMUNICATION, nil,
-					time.Now().UnixNano())
+					time.Now().Unix())
 				// save last failed change event
 				streamingTracker.failedPacket = event
 			} else {
@@ -272,17 +273,24 @@
 }
 
+// SendRPCEvent forwards rpcEvent to the event proxy when its Rpc field is non-empty.
+// A failure reported by the proxy is logged here and is not returned to the caller.
 func (q *RPCEventManager) SendRPCEvent(ctx context.Context, id string, rpcEvent *voltha.RPCEvent, category voltha.EventCategory_Types, subCategory *voltha.EventSubCategory_Types, raisedTs int64) {
-	//TODO Instead of directly sending to the kafka bus, queue the message and send it asynchronously
 	if rpcEvent.Rpc != "" {
-		_ = q.eventProxy.SendRPCEvent(ctx, id, rpcEvent, category, subCategory, raisedTs)
+		if err := q.eventProxy.SendRPCEvent(ctx, id, rpcEvent, category, subCategory, raisedTs); err != nil {
+			logger.Errorw(ctx, "failed-to-send-rpc-event", log.Fields{"resource-id": id, "error": err})
+		}
 	}
 }
 
+// GetAndSendRPCEvent builds an RPC event with NewRPCEvent from resourceID, desc and
+// context, then forwards it to the event proxy when its Rpc field is non-empty.
+// A failure reported by the proxy is logged here and is not returned to the caller.
 func (q *RPCEventManager) GetAndSendRPCEvent(ctx context.Context, resourceID, desc string, context map[string]string,
 	id string, category voltha.EventCategory_Types, subCategory *voltha.EventSubCategory_Types, raisedTs int64) {
 	rpcEvent := q.NewRPCEvent(ctx, resourceID, desc, context)
-	//TODO Instead of directly sending to the kafka bus, queue the message and send it asynchronously
 	if rpcEvent.Rpc != "" {
-		_ = q.eventProxy.SendRPCEvent(ctx, id, rpcEvent, category, subCategory, raisedTs)
+		if err := q.eventProxy.SendRPCEvent(ctx, id, rpcEvent, category, subCategory, raisedTs); err != nil {
+			logger.Errorw(ctx, "failed-to-send-rpc-event", log.Fields{"id": id, "resource-id": resourceID, "error": err})
+		}
 	}
 }