VOL-4079 Publishing device state changes on bus

Change-Id: I7c356026a8ff6f15251fe231bbf5dd637db6da8b
diff --git a/rw_core/core/device/event/event.go b/rw_core/core/device/event/event.go
index d023c5a..00e08d2 100644
--- a/rw_core/core/device/event/event.go
+++ b/rw_core/core/device/event/event.go
@@ -26,6 +26,7 @@
 
 	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/opencord/voltha-go/rw_core/utils"
+	ev "github.com/opencord/voltha-lib-go/v4/pkg/events"
 	"github.com/opencord/voltha-lib-go/v4/pkg/events/eventif"
 	"github.com/opencord/voltha-lib-go/v4/pkg/log"
 	"github.com/opencord/voltha-protos/v4/go/common"
@@ -40,28 +41,28 @@
 	packetInQueueDone    chan bool
 	changeEventQueue     chan openflow_13.ChangeEvent
 	changeEventQueueDone chan bool
-	RPCEventManager      *RPCEventManager
+	Agent                *Agent
 }
 
-type RPCEventManager struct {
+type Agent struct { // Agent sends RPC events and device state change events through eventProxy.
 	eventProxy     eventif.EventProxy
 	coreInstanceID string
 	stackID        string
 }
 
-func NewManager(proxyForRPCEvents eventif.EventProxy, instanceID string, stackID string) *Manager {
+func NewManager(proxyForEvents eventif.EventProxy, instanceID string, stackID string) *Manager {
 	return &Manager{
 		packetInQueue:        make(chan openflow_13.PacketIn, 100),
 		packetInQueueDone:    make(chan bool, 1),
 		changeEventQueue:     make(chan openflow_13.ChangeEvent, 100),
 		changeEventQueueDone: make(chan bool, 1),
-		RPCEventManager:      NewRPCEventManager(proxyForRPCEvents, instanceID, stackID),
+		Agent:                NewAgent(proxyForEvents, instanceID, stackID), // the Agent wraps proxyForEvents together with the instance and stack IDs
 	}
 }
 
-func NewRPCEventManager(proxyForRPCEvents eventif.EventProxy, instanceID string, stackID string) *RPCEventManager {
-	return &RPCEventManager{
-		eventProxy:     proxyForRPCEvents,
+func NewAgent(proxyForEvents eventif.EventProxy, instanceID string, stackID string) *Agent {
+	return &Agent{
+		eventProxy:     proxyForEvents,
 		coreInstanceID: instanceID,
 		stackID:        stackID,
 	}
@@ -132,7 +133,7 @@
 			})
 			if err := packetsIn.Send(&packet); err != nil {
 				logger.Errorw(ctx, "failed-to-send-packet", log.Fields{"error": err})
-				q.RPCEventManager.GetAndSendRPCEvent(ctx, packet.Id, err.Error(),
+				go q.Agent.GetAndSendRPCEvent(ctx, packet.Id, err.Error(),
 					nil, "RPC_ERROR_RAISE_EVENT", voltha.EventCategory_COMMUNICATION,
 					nil, time.Now().Unix())
 				// save the last failed packet in
@@ -222,7 +223,7 @@
 			logger.Debugw(ctx, "sending-change-event", log.Fields{"event": event})
 			if err := changeEvents.Send(&event); err != nil {
 				logger.Errorw(ctx, "failed-to-send-change-event", log.Fields{"error": err})
-				q.RPCEventManager.GetAndSendRPCEvent(ctx, event.Id, err.Error(),
+				go q.Agent.GetAndSendRPCEvent(ctx, event.Id, err.Error(),
 					nil, "RPC_ERROR_RAISE_EVENT", voltha.EventCategory_COMMUNICATION, nil,
 					time.Now().Unix())
 				// save last failed change event
@@ -246,7 +247,7 @@
 	return q.changeEventQueue
 }
 
-func (q *RPCEventManager) NewRPCEvent(ctx context.Context, resourceID, desc string, context map[string]string) *voltha.RPCEvent {
+func (q *Agent) NewRPCEvent(ctx context.Context, resourceID, desc string, context map[string]string) *voltha.RPCEvent {
 	logger.Debugw(ctx, "new-rpc-event", log.Fields{"resource-id": resourceID})
 	var opID string
 	var rpc string
@@ -272,7 +273,8 @@
 	return rpcev
 }
 
-func (q *RPCEventManager) SendRPCEvent(ctx context.Context, id string, rpcEvent *voltha.RPCEvent, category voltha.EventCategory_Types, subCategory *voltha.EventSubCategory_Types, raisedTs int64) {
+func (q *Agent) SendRPCEvent(ctx context.Context, id string, rpcEvent *voltha.RPCEvent, category voltha.EventCategory_Types, subCategory *voltha.EventSubCategory_Types, raisedTs int64) {
+	// TODO: Instead of sending directly to the Kafka bus, queue the message and send it asynchronously.
 	if rpcEvent.Rpc != "" {
 		if err := q.eventProxy.SendRPCEvent(ctx, id, rpcEvent, category, subCategory, raisedTs); err != nil {
 			logger.Errorw(ctx, "failed-to-send-rpc-event", log.Fields{"resource-id": id})
@@ -280,7 +282,7 @@
 	}
 }
 
-func (q *RPCEventManager) GetAndSendRPCEvent(ctx context.Context, resourceID, desc string, context map[string]string,
+func (q *Agent) GetAndSendRPCEvent(ctx context.Context, resourceID, desc string, context map[string]string,
 	id string, category voltha.EventCategory_Types, subCategory *voltha.EventSubCategory_Types, raisedTs int64) {
 	rpcEvent := q.NewRPCEvent(ctx, resourceID, desc, context)
 	if rpcEvent.Rpc != "" {
@@ -289,3 +291,24 @@
 		}
 	}
 }
+
+// SendDeviceStateChangeEvent builds a device state change event from the previous and current oper/connect/admin states of device and sends it through the event proxy, returning the send error if any.
+func (q *Agent) SendDeviceStateChangeEvent(ctx context.Context,
+	prevOperStatus voltha.OperStatus_Types, prevConnStatus voltha.ConnectStatus_Types, prevAdminStatus voltha.AdminState_Types,
+	device *voltha.Device, raisedTs int64) error { // NOTE(review): device is dereferenced without a nil check; assumes callers pass non-nil
+	de := ev.CreateDeviceStateChangeEvent(device.SerialNumber, device.Id, device.ParentId,
+		prevOperStatus, prevConnStatus, prevAdminStatus,
+		device.OperStatus, device.ConnectStatus, device.AdminState,
+		device.ParentPortNo, device.Root)
+
+	subCategory := voltha.EventSubCategory_ONU // default sub-category; replaced below when device.Root is set
+	if device.Root {
+		subCategory = voltha.EventSubCategory_OLT
+	}
+	if err := q.eventProxy.SendDeviceEvent(ctx, de, voltha.EventCategory_EQUIPMENT, subCategory, raisedTs); err != nil {
+		logger.Errorw(ctx, "error-sending-device-event", log.Fields{"id": device.Id, "err": err})
+		return err // the send failure is logged above and also handed back to the caller
+	}
+	logger.Debugw(ctx, "device-state-change-sent", log.Fields{"event": *de}) // reached only after a successful send
+	return nil
+}