VOL-3734 optimise rpc events to be sent to a queue and then to kafka from the queue
Change-Id: I5e068722412b6d9526760900d9173aaf51e00946
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 4b864de..e066914 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -146,7 +146,7 @@
go monitorKafkaLiveness(ctx, eventProxy, cf.LiveProbeInterval, cf.NotLiveProbeInterval, clusterMessageBus)
}
- defer kafkaClientEvent.Stop(ctx)
+ defer stopEventProxy(ctx, kafkaClientEvent, eventProxy)
// create kv path
dbPath := model.NewDBPath(backend)
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index 7e401aa..89fcc51 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -300,8 +300,8 @@
var rpce *voltha.RPCEvent
defer func() {
if rpce != nil {
- go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
- voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
+ agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
+ voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
}
}()
select {
@@ -338,8 +338,8 @@
var rpce *voltha.RPCEvent
defer func() {
if rpce != nil {
- go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
- voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
+ agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
+ voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
}
}()
@@ -473,8 +473,8 @@
var rpce *voltha.RPCEvent
defer func() {
if rpce != nil {
- go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
- voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
+ agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
+ voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
}
}()
select {
@@ -960,8 +960,8 @@
logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previous-admin-state": prevDevice.AdminState, "current-admin-state": device.AdminState})
// Sending RPC EVENT here
rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
- go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce, voltha.EventCategory_COMMUNICATION,
- nil, time.Now().UnixNano())
+ agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce, voltha.EventCategory_COMMUNICATION,
+ nil, time.Now().Unix())
}
return nil
@@ -1005,8 +1005,8 @@
logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previous-admin-state": prevDevice.AdminState, "current-admin-state": device.AdminState})
// Sending RPC EVENT here
rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
- go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce, voltha.EventCategory_COMMUNICATION,
- nil, time.Now().UnixNano())
+ agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce, voltha.EventCategory_COMMUNICATION,
+ nil, time.Now().Unix())
}
return nil
}
diff --git a/rw_core/core/device/agent_pm_config.go b/rw_core/core/device/agent_pm_config.go
index 3e13772..c59080d 100644
--- a/rw_core/core/device/agent_pm_config.go
+++ b/rw_core/core/device/agent_pm_config.go
@@ -47,8 +47,8 @@
var rpce *voltha.RPCEvent
defer func() {
if rpce != nil {
- go agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
- voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
+ agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
+ voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
}
}()
// We need to send the response for the PM Config Updates in a synchronous manner to the caller.
diff --git a/rw_core/core/device/event/event.go b/rw_core/core/device/event/event.go
index 472ae83..d023c5a 100644
--- a/rw_core/core/device/event/event.go
+++ b/rw_core/core/device/event/event.go
@@ -21,6 +21,9 @@
"encoding/binary"
"encoding/hex"
"fmt"
+ "sync"
+ "time"
+
"github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/voltha-go/rw_core/utils"
"github.com/opencord/voltha-lib-go/v4/pkg/events/eventif"
@@ -30,8 +33,6 @@
"github.com/opencord/voltha-protos/v4/go/voltha"
"github.com/opentracing/opentracing-go"
jtracing "github.com/uber/jaeger-client-go"
- "sync"
- "time"
)
type Manager struct {
@@ -131,9 +132,9 @@
})
if err := packetsIn.Send(&packet); err != nil {
logger.Errorw(ctx, "failed-to-send-packet", log.Fields{"error": err})
- go q.RPCEventManager.GetAndSendRPCEvent(ctx, packet.Id, err.Error(),
+ q.RPCEventManager.GetAndSendRPCEvent(ctx, packet.Id, err.Error(),
nil, "RPC_ERROR_RAISE_EVENT", voltha.EventCategory_COMMUNICATION,
- nil, time.Now().UnixNano())
+ nil, time.Now().Unix())
// save the last failed packet in
streamingTracker.failedPacket = packet
} else {
@@ -221,9 +222,9 @@
logger.Debugw(ctx, "sending-change-event", log.Fields{"event": event})
if err := changeEvents.Send(&event); err != nil {
logger.Errorw(ctx, "failed-to-send-change-event", log.Fields{"error": err})
- go q.RPCEventManager.GetAndSendRPCEvent(ctx, event.Id, err.Error(),
+ q.RPCEventManager.GetAndSendRPCEvent(ctx, event.Id, err.Error(),
nil, "RPC_ERROR_RAISE_EVENT", voltha.EventCategory_COMMUNICATION, nil,
- time.Now().UnixNano())
+ time.Now().Unix())
// save last failed change event
streamingTracker.failedPacket = event
} else {
@@ -272,17 +273,19 @@
}
+// SendRPCEvent hands rpcEvent to the event proxy under the event name id.
+// Events with an empty Rpc field are silently skipped. A failure reported by
+// the event proxy is logged together with its cause and is not returned.
func (q *RPCEventManager) SendRPCEvent(ctx context.Context, id string, rpcEvent *voltha.RPCEvent, category voltha.EventCategory_Types, subCategory *voltha.EventSubCategory_Types, raisedTs int64) {
- //TODO Instead of directly sending to the kafka bus, queue the message and send it asynchronously
if rpcEvent.Rpc != "" {
- _ = q.eventProxy.SendRPCEvent(ctx, id, rpcEvent, category, subCategory, raisedTs)
+ if err := q.eventProxy.SendRPCEvent(ctx, id, rpcEvent, category, subCategory, raisedTs); err != nil {
+ // id is the event name supplied by the caller, not a resource identifier.
+ logger.Errorw(ctx, "failed-to-send-rpc-event", log.Fields{"event-id": id, "error": err})
+ }
}
}
+// GetAndSendRPCEvent builds an RPC event for resourceID via NewRPCEvent and
+// hands it to the event proxy under the event name id. Events with an empty
+// Rpc field are silently skipped. A failure reported by the event proxy is
+// logged together with its cause and is not returned.
func (q *RPCEventManager) GetAndSendRPCEvent(ctx context.Context, resourceID, desc string, context map[string]string,
id string, category voltha.EventCategory_Types, subCategory *voltha.EventSubCategory_Types, raisedTs int64) {
rpcEvent := q.NewRPCEvent(ctx, resourceID, desc, context)
- //TODO Instead of directly sending to the kafka bus, queue the message and send it asynchronously
if rpcEvent.Rpc != "" {
- _ = q.eventProxy.SendRPCEvent(ctx, id, rpcEvent, category, subCategory, raisedTs)
+ if err := q.eventProxy.SendRPCEvent(ctx, id, rpcEvent, category, subCategory, raisedTs); err != nil {
+ // Log the real resource identifier; id is only the event name.
+ logger.Errorw(ctx, "failed-to-send-rpc-event", log.Fields{"resource-id": resourceID, "event-id": id, "error": err})
+ }
}
}
diff --git a/rw_core/core/device/logical_agent_flow.go b/rw_core/core/device/logical_agent_flow.go
index 6e26e42..46d4ffb 100644
--- a/rw_core/core/device/logical_agent_flow.go
+++ b/rw_core/core/device/logical_agent_flow.go
@@ -200,9 +200,9 @@
if deviceRules != nil {
context["device-rules"] = deviceRules.String()
}
- go agent.ldeviceMgr.SendRPCEvent(ctx,
+ agent.ldeviceMgr.SendRPCEvent(ctx,
agent.logicalDeviceID, "failed-to-add-flow", context, "RPC_ERROR_RAISE_EVENT",
- voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
+ voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
}
}()
}
@@ -360,9 +360,9 @@
context["device-rules"] = deviceRules.String()
}
- go agent.ldeviceMgr.SendRPCEvent(ctx,
+ agent.ldeviceMgr.SendRPCEvent(ctx,
agent.logicalDeviceID, "failed-to-update-device-flows", context, "RPC_ERROR_RAISE_EVENT",
- voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
+ voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
// TODO: Revert the flow deletion
// send event, and allow any queued events to be sent as well
agent.ldeviceMgr.SendFlowChangeEvent(ctx, agent.logicalDeviceID, res, flowUpdate.Xid, flowUpdate.FlowMod.Cookie)
@@ -456,9 +456,9 @@
context["device-rules"] = deviceRules.String()
}
// Create context and send extra information as part of it.
- go agent.ldeviceMgr.SendRPCEvent(ctx,
+ agent.ldeviceMgr.SendRPCEvent(ctx,
agent.logicalDeviceID, "failed-to-delete-device-flows", context, "RPC_ERROR_RAISE_EVENT",
- voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
+ voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
}
}()
diff --git a/rw_core/core/device/logical_agent_group.go b/rw_core/core/device/logical_agent_group.go
index 1b6babc..df324d5 100644
--- a/rw_core/core/device/logical_agent_group.go
+++ b/rw_core/core/device/logical_agent_group.go
@@ -100,9 +100,9 @@
if deviceRules != nil {
context["device-rules"] = deviceRules.String()
}
- go agent.ldeviceMgr.SendRPCEvent(ctx,
+ agent.ldeviceMgr.SendRPCEvent(ctx,
agent.logicalDeviceID, "failed-to-update-device-flows-groups", context, "RPC_ERROR_RAISE_EVENT",
- voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
+ voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
//TODO: Revert flow changes
}
}()
@@ -185,9 +185,9 @@
if deviceRules != nil {
context["device-rules"] = deviceRules.String()
}
- go agent.ldeviceMgr.SendRPCEvent(ctx,
+ agent.ldeviceMgr.SendRPCEvent(ctx,
agent.logicalDeviceID, "failed-to-update-device-flows-groups", context, "RPC_ERROR_RAISE_EVENT",
- voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
+ voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
//TODO: Revert flow changes
}
}()
@@ -238,9 +238,9 @@
if deviceRules != nil {
context["device-rules"] = deviceRules.String()
}
- go agent.ldeviceMgr.SendRPCEvent(ctx,
+ agent.ldeviceMgr.SendRPCEvent(ctx,
agent.logicalDeviceID, "failed-to-update-device-flows-groups", context, "RPC_ERROR_RAISE_EVENT",
- voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
+ voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
//TODO: Revert flow changes
}
}()
diff --git a/rw_core/core/device/logical_manager.go b/rw_core/core/device/logical_manager.go
index e8e28f7..39342da 100644
--- a/rw_core/core/device/logical_manager.go
+++ b/rw_core/core/device/logical_manager.go
@@ -568,8 +568,8 @@
if err != nil {
logger.Errorw(ctx, "failed-to-receive-packet-out", log.Fields{"error": err})
// we do not have the resource Id here due to error in the packet, setting to empty
- go ldMgr.SendRPCEvent(pktCtx, "", err.Error(), nil,
- "RPC_ERROR_RAISE_EVENT", voltha.EventCategory_COMMUNICATION, nil, time.Now().UnixNano())
+ ldMgr.SendRPCEvent(pktCtx, "", err.Error(), nil,
+ "RPC_ERROR_RAISE_EVENT", voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
continue
}
diff --git a/rw_core/core/kafka.go b/rw_core/core/kafka.go
index ae33090..5793eb5 100644
--- a/rw_core/core/kafka.go
+++ b/rw_core/core/kafka.go
@@ -87,6 +87,7 @@
return nil, ctx.Err()
}
}
+ go ep.Start()
if updateProbeService {
probe.UpdateStatusFromContext(ctx, clusterMessageBus, probe.ServiceStatusRunning)
}
@@ -96,6 +97,11 @@
return ep, nil
}
+// stopEventProxy stops the event proxy ep and then the kafka client passed
+// alongside it. kafkaClient.Stop is deferred, so it runs after ep.Stop returns
+// and still runs if ep.Stop panics.
+func stopEventProxy(ctx context.Context, kafkaClient kafka.Client, ep *events.EventProxy) {
+ defer kafkaClient.Stop(ctx)
+ ep.Stop()
+}
+
// Interface that is valid for both EventProxy and InterContainerProxy
type KafkaProxy interface {
EnableLivenessChannel(ctx context.Context, enable bool) chan bool