VOL-3734 Optimise RPC events to be sent to a queue and then to Kafka from the queue

Change-Id: I5e068722412b6d9526760900d9173aaf51e00946
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 4b864de..e066914 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -146,7 +146,7 @@
 		go monitorKafkaLiveness(ctx, eventProxy, cf.LiveProbeInterval, cf.NotLiveProbeInterval, clusterMessageBus)
 	}
 
-	defer kafkaClientEvent.Stop(ctx)
+	defer stopEventProxy(ctx, kafkaClientEvent, eventProxy)
 
 	// create kv path
 	dbPath := model.NewDBPath(backend)
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index 7e401aa..89fcc51 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -300,8 +300,8 @@
 	var rpce *voltha.RPCEvent
 	defer func() {
 		if rpce != nil {
-			go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
-				voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
+			agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
+				voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
 		}
 	}()
 	select {
@@ -338,8 +338,8 @@
 	var rpce *voltha.RPCEvent
 	defer func() {
 		if rpce != nil {
-			go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
-				voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
+			agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
+				voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
 		}
 	}()
 
@@ -473,8 +473,8 @@
 	var rpce *voltha.RPCEvent
 	defer func() {
 		if rpce != nil {
-			go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
-				voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
+			agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
+				voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
 		}
 	}()
 	select {
@@ -960,8 +960,8 @@
 		logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previous-admin-state": prevDevice.AdminState, "current-admin-state": device.AdminState})
 		// Sending RPC EVENT here
 		rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
-		go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce, voltha.EventCategory_COMMUNICATION,
-			nil, time.Now().UnixNano())
+		agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce, voltha.EventCategory_COMMUNICATION,
+			nil, time.Now().Unix())
 
 	}
 	return nil
@@ -1005,8 +1005,8 @@
 		logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previous-admin-state": prevDevice.AdminState, "current-admin-state": device.AdminState})
 		// Sending RPC EVENT here
 		rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
-		go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce, voltha.EventCategory_COMMUNICATION,
-			nil, time.Now().UnixNano())
+		agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce, voltha.EventCategory_COMMUNICATION,
+			nil, time.Now().Unix())
 	}
 	return nil
 }
diff --git a/rw_core/core/device/agent_pm_config.go b/rw_core/core/device/agent_pm_config.go
index 3e13772..c59080d 100644
--- a/rw_core/core/device/agent_pm_config.go
+++ b/rw_core/core/device/agent_pm_config.go
@@ -47,8 +47,8 @@
 	var rpce *voltha.RPCEvent
 	defer func() {
 		if rpce != nil {
-			go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
-				voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
+			agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
+				voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
 		}
 	}()
 	// We need to send the response for the PM Config Updates in a synchronous manner to the caller.
diff --git a/rw_core/core/device/event/event.go b/rw_core/core/device/event/event.go
index 472ae83..d023c5a 100644
--- a/rw_core/core/device/event/event.go
+++ b/rw_core/core/device/event/event.go
@@ -21,6 +21,9 @@
 	"encoding/binary"
 	"encoding/hex"
 	"fmt"
+	"sync"
+	"time"
+
 	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/opencord/voltha-go/rw_core/utils"
 	"github.com/opencord/voltha-lib-go/v4/pkg/events/eventif"
@@ -30,8 +33,6 @@
 	"github.com/opencord/voltha-protos/v4/go/voltha"
 	"github.com/opentracing/opentracing-go"
 	jtracing "github.com/uber/jaeger-client-go"
-	"sync"
-	"time"
 )
 
 type Manager struct {
@@ -131,9 +132,9 @@
 			})
 			if err := packetsIn.Send(&packet); err != nil {
 				logger.Errorw(ctx, "failed-to-send-packet", log.Fields{"error": err})
-				go q.RPCEventManager.GetAndSendRPCEvent(ctx, packet.Id, err.Error(),
+				q.RPCEventManager.GetAndSendRPCEvent(ctx, packet.Id, err.Error(),
 					nil, "RPC_ERROR_RAISE_EVENT", voltha.EventCategory_COMMUNICATION,
-					nil, time.Now().UnixNano())
+					nil, time.Now().Unix())
 				// save the last failed packet in
 				streamingTracker.failedPacket = packet
 			} else {
@@ -221,9 +222,9 @@
 			logger.Debugw(ctx, "sending-change-event", log.Fields{"event": event})
 			if err := changeEvents.Send(&event); err != nil {
 				logger.Errorw(ctx, "failed-to-send-change-event", log.Fields{"error": err})
-				go q.RPCEventManager.GetAndSendRPCEvent(ctx, event.Id, err.Error(),
+				q.RPCEventManager.GetAndSendRPCEvent(ctx, event.Id, err.Error(),
 					nil, "RPC_ERROR_RAISE_EVENT", voltha.EventCategory_COMMUNICATION, nil,
-					time.Now().UnixNano())
+					time.Now().Unix())
 				// save last failed change event
 				streamingTracker.failedPacket = event
 			} else {
@@ -272,17 +273,19 @@
 }
 
 func (q *RPCEventManager) SendRPCEvent(ctx context.Context, id string, rpcEvent *voltha.RPCEvent, category voltha.EventCategory_Types, subCategory *voltha.EventSubCategory_Types, raisedTs int64) {
-	//TODO Instead of directly sending to the kafka bus, queue the message and send it asynchronously
 	if rpcEvent.Rpc != "" {
-		_ = q.eventProxy.SendRPCEvent(ctx, id, rpcEvent, category, subCategory, raisedTs)
+		if err := q.eventProxy.SendRPCEvent(ctx, id, rpcEvent, category, subCategory, raisedTs); err != nil {
+			logger.Errorw(ctx, "failed-to-send-rpc-event", log.Fields{"resource-id": id})
+		}
 	}
 }
 
 func (q *RPCEventManager) GetAndSendRPCEvent(ctx context.Context, resourceID, desc string, context map[string]string,
 	id string, category voltha.EventCategory_Types, subCategory *voltha.EventSubCategory_Types, raisedTs int64) {
 	rpcEvent := q.NewRPCEvent(ctx, resourceID, desc, context)
-	//TODO Instead of directly sending to the kafka bus, queue the message and send it asynchronously
 	if rpcEvent.Rpc != "" {
-		_ = q.eventProxy.SendRPCEvent(ctx, id, rpcEvent, category, subCategory, raisedTs)
+		if err := q.eventProxy.SendRPCEvent(ctx, id, rpcEvent, category, subCategory, raisedTs); err != nil {
+			logger.Errorw(ctx, "failed-to-send-rpc-event", log.Fields{"resource-id": id})
+		}
 	}
 }
diff --git a/rw_core/core/device/logical_agent_flow.go b/rw_core/core/device/logical_agent_flow.go
index 6e26e42..46d4ffb 100644
--- a/rw_core/core/device/logical_agent_flow.go
+++ b/rw_core/core/device/logical_agent_flow.go
@@ -200,9 +200,9 @@
 				if deviceRules != nil {
 					context["device-rules"] = deviceRules.String()
 				}
-				go agent.ldeviceMgr.SendRPCEvent(ctx,
+				agent.ldeviceMgr.SendRPCEvent(ctx,
 					agent.logicalDeviceID, "failed-to-add-flow", context, "RPC_ERROR_RAISE_EVENT",
-					voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
+					voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
 			}
 		}()
 	}
@@ -360,9 +360,9 @@
 					context["device-rules"] = deviceRules.String()
 				}
 
-				go agent.ldeviceMgr.SendRPCEvent(ctx,
+				agent.ldeviceMgr.SendRPCEvent(ctx,
 					agent.logicalDeviceID, "failed-to-update-device-flows", context, "RPC_ERROR_RAISE_EVENT",
-					voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
+					voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
 				// TODO: Revert the flow deletion
 				// send event, and allow any queued events to be sent as well
 				agent.ldeviceMgr.SendFlowChangeEvent(ctx, agent.logicalDeviceID, res, flowUpdate.Xid, flowUpdate.FlowMod.Cookie)
@@ -456,9 +456,9 @@
 				context["device-rules"] = deviceRules.String()
 			}
 			// Create context and send extra information as part of it.
-			go agent.ldeviceMgr.SendRPCEvent(ctx,
+			agent.ldeviceMgr.SendRPCEvent(ctx,
 				agent.logicalDeviceID, "failed-to-delete-device-flows", context, "RPC_ERROR_RAISE_EVENT",
-				voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
+				voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
 		}
 	}()
 
diff --git a/rw_core/core/device/logical_agent_group.go b/rw_core/core/device/logical_agent_group.go
index 1b6babc..df324d5 100644
--- a/rw_core/core/device/logical_agent_group.go
+++ b/rw_core/core/device/logical_agent_group.go
@@ -100,9 +100,9 @@
 			if deviceRules != nil {
 				context["device-rules"] = deviceRules.String()
 			}
-			go agent.ldeviceMgr.SendRPCEvent(ctx,
+			agent.ldeviceMgr.SendRPCEvent(ctx,
 				agent.logicalDeviceID, "failed-to-update-device-flows-groups", context, "RPC_ERROR_RAISE_EVENT",
-				voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
+				voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
 			//TODO: Revert flow changes
 		}
 	}()
@@ -185,9 +185,9 @@
 			if deviceRules != nil {
 				context["device-rules"] = deviceRules.String()
 			}
-			go agent.ldeviceMgr.SendRPCEvent(ctx,
+			agent.ldeviceMgr.SendRPCEvent(ctx,
 				agent.logicalDeviceID, "failed-to-update-device-flows-groups", context, "RPC_ERROR_RAISE_EVENT",
-				voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
+				voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
 			//TODO: Revert flow changes
 		}
 	}()
@@ -238,9 +238,9 @@
 			if deviceRules != nil {
 				context["device-rules"] = deviceRules.String()
 			}
-			go agent.ldeviceMgr.SendRPCEvent(ctx,
+			agent.ldeviceMgr.SendRPCEvent(ctx,
 				agent.logicalDeviceID, "failed-to-update-device-flows-groups", context, "RPC_ERROR_RAISE_EVENT",
-				voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
+				voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
 			//TODO: Revert flow changes
 		}
 	}()
diff --git a/rw_core/core/device/logical_manager.go b/rw_core/core/device/logical_manager.go
index e8e28f7..39342da 100644
--- a/rw_core/core/device/logical_manager.go
+++ b/rw_core/core/device/logical_manager.go
@@ -568,8 +568,8 @@
 		if err != nil {
 			logger.Errorw(ctx, "failed-to-receive-packet-out", log.Fields{"error": err})
 			// we do not have the resource Id here due to error in the packet, setting to empty
-			go ldMgr.SendRPCEvent(pktCtx, "", err.Error(), nil,
-				"RPC_ERROR_RAISE_EVENT", voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
+			ldMgr.SendRPCEvent(pktCtx, "", err.Error(), nil,
+				"RPC_ERROR_RAISE_EVENT", voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
 			continue
 		}
 
diff --git a/rw_core/core/kafka.go b/rw_core/core/kafka.go
index ae33090..5793eb5 100644
--- a/rw_core/core/kafka.go
+++ b/rw_core/core/kafka.go
@@ -87,6 +87,7 @@
 				return nil, ctx.Err()
 			}
 		}
+		go ep.Start()
 		if updateProbeService {
 			probe.UpdateStatusFromContext(ctx, clusterMessageBus, probe.ServiceStatusRunning)
 		}
@@ -96,6 +97,11 @@
 	return ep, nil
 }
 
+func stopEventProxy(ctx context.Context, kafkaClient kafka.Client, ep *events.EventProxy) {
+	defer kafkaClient.Stop(ctx)
+	ep.Stop()
+}
+
 // Interface that is valid for both EventProxy and InterContainerProxy
 type KafkaProxy interface {
 	EnableLivenessChannel(ctx context.Context, enable bool) chan bool