VOL-4079 Publishing device state changes on bus

Change-Id: I7c356026a8ff6f15251fe231bbf5dd637db6da8b
diff --git a/go.mod b/go.mod
index 55ed96b..4568788 100644
--- a/go.mod
+++ b/go.mod
@@ -8,7 +8,7 @@
 	github.com/golang/mock v1.5.0
 	github.com/golang/protobuf v1.3.2
 	github.com/google/uuid v1.1.1
-	github.com/opencord/voltha-lib-go/v4 v4.3.2
+	github.com/opencord/voltha-lib-go/v4 v4.3.3
 	github.com/opencord/voltha-protos/v4 v4.1.2
 	github.com/opentracing/opentracing-go v1.1.0
 	github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
diff --git a/go.sum b/go.sum
index e9958a7..018d27f 100644
--- a/go.sum
+++ b/go.sum
@@ -143,8 +143,8 @@
 github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
 github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I=
 github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
-github.com/opencord/voltha-lib-go/v4 v4.3.2 h1:Ryh+anYbo5BM+LEFYADc2/JnDVYPI625t8AAf6BPo7k=
-github.com/opencord/voltha-lib-go/v4 v4.3.2/go.mod h1:x0a7TxyzxPFaiewkbFiuy0+ftX5w4zeCRlFyyGZ4hhw=
+github.com/opencord/voltha-lib-go/v4 v4.3.3 h1:21r8Oh825Oqg8ud/3qiVSfVD2ylRr2aDrdwDkJFI21k=
+github.com/opencord/voltha-lib-go/v4 v4.3.3/go.mod h1:x0a7TxyzxPFaiewkbFiuy0+ftX5w4zeCRlFyyGZ4hhw=
 github.com/opencord/voltha-protos/v4 v4.1.2 h1:iK7rhQXBtd6H2UWqdCPLQchcoGn8XV8XcVI3CBzGDfg=
 github.com/opencord/voltha-protos/v4 v4.1.2/go.mod h1:W/OIFIyvFh/C0vchRUuarIsMylEhzCRM9pNxLvkPtKc=
 github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU=
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index 89fcc51..003f77a 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -21,14 +21,15 @@
 	"encoding/hex"
 	"errors"
 	"fmt"
+	"reflect"
+	"sync"
+	"time"
+
 	"github.com/gogo/protobuf/proto"
 	"github.com/golang/protobuf/ptypes"
 	"github.com/golang/protobuf/ptypes/empty"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
-	"reflect"
-	"sync"
-	"time"
 
 	"github.com/opencord/voltha-go/db/model"
 	"github.com/opencord/voltha-go/rw_core/core/adapter"
@@ -162,6 +163,7 @@
 			desc = fmt.Sprintf("failed-adding-device-%s: %s", agent.deviceID, err.Error())
 			return nil, status.Errorf(codes.Aborted, "failed-adding-device-%s: %s", agent.deviceID, err)
 		}
+		_ = agent.deviceMgr.Agent.SendDeviceStateChangeEvent(ctx, device.OperStatus, device.ConnectStatus, prevState, device, time.Now().Unix())
 		operStatus.Code = common.OperationResp_OPERATION_SUCCESS
 		agent.device = device
 	}
@@ -950,7 +952,10 @@
 	prevDevice := agent.device
 	// update the device
 	agent.device = device
-
+	//If any of the states has chenged, send the change event.
+	if prevDevice.OperStatus != device.OperStatus || prevDevice.ConnectStatus != device.ConnectStatus || prevDevice.AdminState != device.AdminState {
+		_ = agent.deviceMgr.Agent.SendDeviceStateChangeEvent(ctx, prevDevice.OperStatus, prevDevice.ConnectStatus, prevDevice.AdminState, device, time.Now().Unix())
+	}
 	// release lock before processing transition
 	agent.requestQueue.RequestComplete()
 	subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
@@ -996,6 +1001,10 @@
 	prevDevice := agent.device
 	// update the device
 	agent.device = device
+	//If any of the states has chenged, send the change event.
+	if prevDevice.OperStatus != device.OperStatus || prevDevice.ConnectStatus != device.ConnectStatus || prevDevice.AdminState != device.AdminState {
+		_ = agent.deviceMgr.Agent.SendDeviceStateChangeEvent(ctx, prevDevice.OperStatus, prevDevice.ConnectStatus, prevDevice.AdminState, device, time.Now().Unix())
+	}
 
 	// release lock before processing transition
 	agent.requestQueue.RequestComplete()
diff --git a/rw_core/core/device/event/event.go b/rw_core/core/device/event/event.go
index d023c5a..00e08d2 100644
--- a/rw_core/core/device/event/event.go
+++ b/rw_core/core/device/event/event.go
@@ -26,6 +26,7 @@
 
 	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/opencord/voltha-go/rw_core/utils"
+	ev "github.com/opencord/voltha-lib-go/v4/pkg/events"
 	"github.com/opencord/voltha-lib-go/v4/pkg/events/eventif"
 	"github.com/opencord/voltha-lib-go/v4/pkg/log"
 	"github.com/opencord/voltha-protos/v4/go/common"
@@ -40,28 +41,28 @@
 	packetInQueueDone    chan bool
 	changeEventQueue     chan openflow_13.ChangeEvent
 	changeEventQueueDone chan bool
-	RPCEventManager      *RPCEventManager
+	Agent                *Agent
 }
 
-type RPCEventManager struct {
+type Agent struct {
 	eventProxy     eventif.EventProxy
 	coreInstanceID string
 	stackID        string
 }
 
-func NewManager(proxyForRPCEvents eventif.EventProxy, instanceID string, stackID string) *Manager {
+func NewManager(proxyForEvents eventif.EventProxy, instanceID string, stackID string) *Manager {
 	return &Manager{
 		packetInQueue:        make(chan openflow_13.PacketIn, 100),
 		packetInQueueDone:    make(chan bool, 1),
 		changeEventQueue:     make(chan openflow_13.ChangeEvent, 100),
 		changeEventQueueDone: make(chan bool, 1),
-		RPCEventManager:      NewRPCEventManager(proxyForRPCEvents, instanceID, stackID),
+		Agent:                NewAgent(proxyForEvents, instanceID, stackID),
 	}
 }
 
-func NewRPCEventManager(proxyForRPCEvents eventif.EventProxy, instanceID string, stackID string) *RPCEventManager {
-	return &RPCEventManager{
-		eventProxy:     proxyForRPCEvents,
+func NewAgent(proxyForEvents eventif.EventProxy, instanceID string, stackID string) *Agent {
+	return &Agent{
+		eventProxy:     proxyForEvents,
 		coreInstanceID: instanceID,
 		stackID:        stackID,
 	}
@@ -132,7 +133,7 @@
 			})
 			if err := packetsIn.Send(&packet); err != nil {
 				logger.Errorw(ctx, "failed-to-send-packet", log.Fields{"error": err})
-				q.RPCEventManager.GetAndSendRPCEvent(ctx, packet.Id, err.Error(),
+				go q.Agent.GetAndSendRPCEvent(ctx, packet.Id, err.Error(),
 					nil, "RPC_ERROR_RAISE_EVENT", voltha.EventCategory_COMMUNICATION,
 					nil, time.Now().Unix())
 				// save the last failed packet in
@@ -222,7 +223,7 @@
 			logger.Debugw(ctx, "sending-change-event", log.Fields{"event": event})
 			if err := changeEvents.Send(&event); err != nil {
 				logger.Errorw(ctx, "failed-to-send-change-event", log.Fields{"error": err})
-				q.RPCEventManager.GetAndSendRPCEvent(ctx, event.Id, err.Error(),
+				go q.Agent.GetAndSendRPCEvent(ctx, event.Id, err.Error(),
 					nil, "RPC_ERROR_RAISE_EVENT", voltha.EventCategory_COMMUNICATION, nil,
 					time.Now().Unix())
 				// save last failed change event
@@ -246,7 +247,7 @@
 	return q.changeEventQueue
 }
 
-func (q *RPCEventManager) NewRPCEvent(ctx context.Context, resourceID, desc string, context map[string]string) *voltha.RPCEvent {
+func (q *Agent) NewRPCEvent(ctx context.Context, resourceID, desc string, context map[string]string) *voltha.RPCEvent {
 	logger.Debugw(ctx, "new-rpc-event", log.Fields{"resource-id": resourceID})
 	var opID string
 	var rpc string
@@ -272,7 +273,8 @@
 	return rpcev
 }
 
-func (q *RPCEventManager) SendRPCEvent(ctx context.Context, id string, rpcEvent *voltha.RPCEvent, category voltha.EventCategory_Types, subCategory *voltha.EventSubCategory_Types, raisedTs int64) {
+func (q *Agent) SendRPCEvent(ctx context.Context, id string, rpcEvent *voltha.RPCEvent, category voltha.EventCategory_Types, subCategory *voltha.EventSubCategory_Types, raisedTs int64) {
+	//TODO Instead of directly sending to the kafka bus, queue the message and send it asynchronously
 	if rpcEvent.Rpc != "" {
 		if err := q.eventProxy.SendRPCEvent(ctx, id, rpcEvent, category, subCategory, raisedTs); err != nil {
 			logger.Errorw(ctx, "failed-to-send-rpc-event", log.Fields{"resource-id": id})
@@ -280,7 +282,7 @@
 	}
 }
 
-func (q *RPCEventManager) GetAndSendRPCEvent(ctx context.Context, resourceID, desc string, context map[string]string,
+func (q *Agent) GetAndSendRPCEvent(ctx context.Context, resourceID, desc string, context map[string]string,
 	id string, category voltha.EventCategory_Types, subCategory *voltha.EventSubCategory_Types, raisedTs int64) {
 	rpcEvent := q.NewRPCEvent(ctx, resourceID, desc, context)
 	if rpcEvent.Rpc != "" {
@@ -289,3 +291,24 @@
 		}
 	}
 }
+
+// SendDeviceStateChangeEvent sends Device State Change Event to message bus
+func (q *Agent) SendDeviceStateChangeEvent(ctx context.Context,
+	prevOperStatus voltha.OperStatus_Types, prevConnStatus voltha.ConnectStatus_Types, prevAdminStatus voltha.AdminState_Types,
+	device *voltha.Device, raisedTs int64) error {
+	de := ev.CreateDeviceStateChangeEvent(device.SerialNumber, device.Id, device.ParentId,
+		prevOperStatus, prevConnStatus, prevAdminStatus,
+		device.OperStatus, device.ConnectStatus, device.AdminState,
+		device.ParentPortNo, device.Root)
+
+	subCategory := voltha.EventSubCategory_ONU
+	if device.Root {
+		subCategory = voltha.EventSubCategory_OLT
+	}
+	if err := q.eventProxy.SendDeviceEvent(ctx, de, voltha.EventCategory_EQUIPMENT, subCategory, raisedTs); err != nil {
+		logger.Errorw(ctx, "error-sending-device-event", log.Fields{"id": device.Id, "err": err})
+		return err
+	}
+	logger.Debugw(ctx, "device-state-change-sent", log.Fields{"event": *de})
+	return nil
+}
diff --git a/rw_core/core/device/logical_manager.go b/rw_core/core/device/logical_manager.go
index 39342da..360d83b 100644
--- a/rw_core/core/device/logical_manager.go
+++ b/rw_core/core/device/logical_manager.go
@@ -586,6 +586,6 @@
 
 func (ldMgr *LogicalManager) SendRPCEvent(ctx context.Context, resourceID, desc string, context map[string]string,
 	id string, category voltha.EventCategory_Types, subCategory *voltha.EventSubCategory_Types, raisedTs int64) {
-	ldMgr.Manager.RPCEventManager.GetAndSendRPCEvent(ctx, resourceID, desc, context, id,
+	ldMgr.Manager.Agent.GetAndSendRPCEvent(ctx, resourceID, desc, context, id,
 		category, subCategory, raisedTs)
 }
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index 43bd2be..dd64ab9 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -47,7 +47,7 @@
 	rootDevices       map[string]bool
 	lockRootDeviceMap sync.RWMutex
 	adapterProxy      *remote.AdapterProxy
-	*event.RPCEventManager
+	*event.Agent
 	adapterMgr              *adapter.Manager
 	logicalDeviceMgr        *LogicalManager
 	kafkaICProxy            kafka.InterContainerProxy
@@ -71,7 +71,7 @@
 		dProxy:                  dbPath.Proxy("devices"),
 		adapterMgr:              adapterMgr,
 		defaultTimeout:          defaultCoreTimeout,
-		RPCEventManager:         event.NewRPCEventManager(eventProxy, coreInstanceID, stackID),
+		Agent:                   event.NewAgent(eventProxy, coreInstanceID, stackID),
 		deviceLoadingInProgress: make(map[string][]chan int),
 	}
 	deviceMgr.stateTransitions = state.NewTransitionMap(deviceMgr)
@@ -1633,7 +1633,7 @@
 func (dMgr *Manager) SendRPCEvent(ctx context.Context, id string, rpcEvent *voltha.RPCEvent,
 	category voltha.EventCategory_Types, subCategory *voltha.EventSubCategory_Types, raisedTs int64) {
 	//TODO Instead of directly sending to the kafka bus, queue the message and send it asynchronously
-	dMgr.RPCEventManager.SendRPCEvent(ctx, id, rpcEvent, category, subCategory, raisedTs)
+	dMgr.Agent.SendRPCEvent(ctx, id, rpcEvent, category, subCategory, raisedTs)
 }
 
 func (dMgr *Manager) GetTransientState(ctx context.Context, id string) (voltha.DeviceTransientState_Types, error) {
diff --git a/vendor/github.com/opencord/voltha-lib-go/v4/pkg/events/utils.go b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/events/utils.go
new file mode 100644
index 0000000..fe3a017
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/events/utils.go
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package events
+
+import (
+	"fmt"
+	"strconv"
+
+	"github.com/opencord/voltha-protos/v4/go/voltha"
+)
+
+type ContextType string
+
+const (
+	// ContextAdminState is for the admin state of the Device in the context of the event
+	ContextAdminState ContextType = "admin-state"
+	// ContextConnectState is for the connect state of the Device in the context of the event
+	ContextConnectState ContextType = "connect-state"
+	// ContextOperState is for the operational state of the Device in the context of the event
+	ContextOperState ContextType = "oper-state"
+	// ContextPrevdminState is for the previous admin state of the Device in the context of the event
+	ContextPrevAdminState ContextType = "prev-admin-state"
+	// ContextPrevConnectState is for the previous connect state of the Device in the context of the event
+	ContextPrevConnectState ContextType = "prev-connect-state"
+	// ContextPrevOperState is for the previous operational state of the Device in the context of the event
+	ContextPrevOperState ContextType = "prev-oper-state"
+	// ContextDeviceID is for the previous operational state of the Device in the context of the event
+	ContextDeviceID ContextType = "id"
+	// ContextParentID is for the parent id in the context of the event
+	ContextParentID ContextType = "parent-id"
+	// ContextSerialNumber is for the serial number of the Device in the context of the event
+	ContextSerialNumber ContextType = "serial-number"
+	// ContextIsRoot is for the root flag of Device in the context of the event
+	ContextIsRoot ContextType = "is-root"
+	// ContextParentPort is for the parent interface id of child in the context of the event
+	ContextParentPort ContextType = "parent-port"
+)
+
+type EventName string
+
+const (
+	DeviceStateChangeEvent EventName = "DEVICE_STATE_CHANGE"
+)
+
+type EventAction string
+
+const (
+	Raise EventAction = "RAISE_EVENT"
+	Clear EventAction = "CLEAR_EVENT"
+)
+
+//CreateDeviceStateChangeEvent forms and returns a new DeviceStateChange Event
+func CreateDeviceStateChangeEvent(serialNumber string, deviceID string, parentID string,
+	prevOperStatus voltha.OperStatus_Types, prevConnStatus voltha.ConnectStatus_Types, prevAdminStatus voltha.AdminState_Types,
+	operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types, adminStatus voltha.AdminState_Types,
+	parentPort uint32, isRoot bool) *voltha.DeviceEvent {
+
+	context := make(map[string]string)
+	/* Populating event context */
+	context[string(ContextSerialNumber)] = serialNumber
+	context[string(ContextDeviceID)] = deviceID
+	context[string(ContextParentID)] = parentID
+	context[string(ContextPrevOperState)] = prevOperStatus.String()
+	context[string(ContextPrevConnectState)] = prevConnStatus.String()
+	context[string(ContextPrevAdminState)] = prevAdminStatus.String()
+	context[string(ContextOperState)] = operStatus.String()
+	context[string(ContextConnectState)] = connStatus.String()
+	context[string(ContextAdminState)] = adminStatus.String()
+	context[string(ContextIsRoot)] = strconv.FormatBool(isRoot)
+	context[string(ContextParentPort)] = strconv.FormatUint(uint64(parentPort), 10)
+
+	return &voltha.DeviceEvent{
+		Context:         context,
+		ResourceId:      deviceID,
+		DeviceEventName: fmt.Sprintf("%s_%s", string(DeviceStateChangeEvent), string(Raise)),
+	}
+}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 60c073c..fb81c26 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -93,7 +93,7 @@
 github.com/modern-go/concurrent
 # github.com/modern-go/reflect2 v1.0.1
 github.com/modern-go/reflect2
-# github.com/opencord/voltha-lib-go/v4 v4.3.2
+# github.com/opencord/voltha-lib-go/v4 v4.3.3
 ## explicit
 github.com/opencord/voltha-lib-go/v4/pkg/adapters
 github.com/opencord/voltha-lib-go/v4/pkg/adapters/adapterif