VOL-4079 Publishing device state changes on bus
Change-Id: I7c356026a8ff6f15251fe231bbf5dd637db6da8b
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index 89fcc51..003f77a 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -21,14 +21,15 @@
"encoding/hex"
"errors"
"fmt"
+ "reflect"
+ "sync"
+ "time"
+
"github.com/gogo/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
- "reflect"
- "sync"
- "time"
"github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-go/rw_core/core/adapter"
@@ -162,6 +163,7 @@
desc = fmt.Sprintf("failed-adding-device-%s: %s", agent.deviceID, err.Error())
return nil, status.Errorf(codes.Aborted, "failed-adding-device-%s: %s", agent.deviceID, err)
}
+ _ = agent.deviceMgr.Agent.SendDeviceStateChangeEvent(ctx, device.OperStatus, device.ConnectStatus, prevState, device, time.Now().Unix())
operStatus.Code = common.OperationResp_OPERATION_SUCCESS
agent.device = device
}
@@ -950,7 +952,10 @@
prevDevice := agent.device
// update the device
agent.device = device
-
+ // If any of the states have changed, send the change event.
+ if prevDevice.OperStatus != device.OperStatus || prevDevice.ConnectStatus != device.ConnectStatus || prevDevice.AdminState != device.AdminState {
+ _ = agent.deviceMgr.Agent.SendDeviceStateChangeEvent(ctx, prevDevice.OperStatus, prevDevice.ConnectStatus, prevDevice.AdminState, device, time.Now().Unix())
+ }
// release lock before processing transition
agent.requestQueue.RequestComplete()
subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
@@ -996,6 +1001,10 @@
prevDevice := agent.device
// update the device
agent.device = device
+ // If any of the states have changed, send the change event.
+ if prevDevice.OperStatus != device.OperStatus || prevDevice.ConnectStatus != device.ConnectStatus || prevDevice.AdminState != device.AdminState {
+ _ = agent.deviceMgr.Agent.SendDeviceStateChangeEvent(ctx, prevDevice.OperStatus, prevDevice.ConnectStatus, prevDevice.AdminState, device, time.Now().Unix())
+ }
// release lock before processing transition
agent.requestQueue.RequestComplete()
diff --git a/rw_core/core/device/event/event.go b/rw_core/core/device/event/event.go
index d023c5a..00e08d2 100644
--- a/rw_core/core/device/event/event.go
+++ b/rw_core/core/device/event/event.go
@@ -26,6 +26,7 @@
"github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/voltha-go/rw_core/utils"
+ ev "github.com/opencord/voltha-lib-go/v4/pkg/events"
"github.com/opencord/voltha-lib-go/v4/pkg/events/eventif"
"github.com/opencord/voltha-lib-go/v4/pkg/log"
"github.com/opencord/voltha-protos/v4/go/common"
@@ -40,28 +41,28 @@
packetInQueueDone chan bool
changeEventQueue chan openflow_13.ChangeEvent
changeEventQueueDone chan bool
- RPCEventManager *RPCEventManager
+ Agent *Agent
}
-type RPCEventManager struct {
+type Agent struct {
eventProxy eventif.EventProxy
coreInstanceID string
stackID string
}
-func NewManager(proxyForRPCEvents eventif.EventProxy, instanceID string, stackID string) *Manager {
+func NewManager(proxyForEvents eventif.EventProxy, instanceID string, stackID string) *Manager {
return &Manager{
packetInQueue: make(chan openflow_13.PacketIn, 100),
packetInQueueDone: make(chan bool, 1),
changeEventQueue: make(chan openflow_13.ChangeEvent, 100),
changeEventQueueDone: make(chan bool, 1),
- RPCEventManager: NewRPCEventManager(proxyForRPCEvents, instanceID, stackID),
+ Agent: NewAgent(proxyForEvents, instanceID, stackID),
}
}
-func NewRPCEventManager(proxyForRPCEvents eventif.EventProxy, instanceID string, stackID string) *RPCEventManager {
- return &RPCEventManager{
- eventProxy: proxyForRPCEvents,
+func NewAgent(proxyForEvents eventif.EventProxy, instanceID string, stackID string) *Agent {
+ return &Agent{
+ eventProxy: proxyForEvents,
coreInstanceID: instanceID,
stackID: stackID,
}
@@ -132,7 +133,7 @@
})
if err := packetsIn.Send(&packet); err != nil {
logger.Errorw(ctx, "failed-to-send-packet", log.Fields{"error": err})
- q.RPCEventManager.GetAndSendRPCEvent(ctx, packet.Id, err.Error(),
+ go q.Agent.GetAndSendRPCEvent(ctx, packet.Id, err.Error(),
nil, "RPC_ERROR_RAISE_EVENT", voltha.EventCategory_COMMUNICATION,
nil, time.Now().Unix())
// save the last failed packet in
@@ -222,7 +223,7 @@
logger.Debugw(ctx, "sending-change-event", log.Fields{"event": event})
if err := changeEvents.Send(&event); err != nil {
logger.Errorw(ctx, "failed-to-send-change-event", log.Fields{"error": err})
- q.RPCEventManager.GetAndSendRPCEvent(ctx, event.Id, err.Error(),
+ go q.Agent.GetAndSendRPCEvent(ctx, event.Id, err.Error(),
nil, "RPC_ERROR_RAISE_EVENT", voltha.EventCategory_COMMUNICATION, nil,
time.Now().Unix())
// save last failed change event
@@ -246,7 +247,7 @@
return q.changeEventQueue
}
-func (q *RPCEventManager) NewRPCEvent(ctx context.Context, resourceID, desc string, context map[string]string) *voltha.RPCEvent {
+func (q *Agent) NewRPCEvent(ctx context.Context, resourceID, desc string, context map[string]string) *voltha.RPCEvent {
logger.Debugw(ctx, "new-rpc-event", log.Fields{"resource-id": resourceID})
var opID string
var rpc string
@@ -272,7 +273,8 @@
return rpcev
}
-func (q *RPCEventManager) SendRPCEvent(ctx context.Context, id string, rpcEvent *voltha.RPCEvent, category voltha.EventCategory_Types, subCategory *voltha.EventSubCategory_Types, raisedTs int64) {
+func (q *Agent) SendRPCEvent(ctx context.Context, id string, rpcEvent *voltha.RPCEvent, category voltha.EventCategory_Types, subCategory *voltha.EventSubCategory_Types, raisedTs int64) {
+ //TODO Instead of directly sending to the kafka bus, queue the message and send it asynchronously
if rpcEvent.Rpc != "" {
if err := q.eventProxy.SendRPCEvent(ctx, id, rpcEvent, category, subCategory, raisedTs); err != nil {
logger.Errorw(ctx, "failed-to-send-rpc-event", log.Fields{"resource-id": id})
@@ -280,7 +282,7 @@
}
}
-func (q *RPCEventManager) GetAndSendRPCEvent(ctx context.Context, resourceID, desc string, context map[string]string,
+func (q *Agent) GetAndSendRPCEvent(ctx context.Context, resourceID, desc string, context map[string]string,
id string, category voltha.EventCategory_Types, subCategory *voltha.EventSubCategory_Types, raisedTs int64) {
rpcEvent := q.NewRPCEvent(ctx, resourceID, desc, context)
if rpcEvent.Rpc != "" {
@@ -289,3 +291,24 @@
}
}
}
+
+// SendDeviceStateChangeEvent builds a device state change event from the supplied previous states and the device's current states, sends it through the event proxy, and returns the proxy's error if the send fails (nil otherwise).
+func (q *Agent) SendDeviceStateChangeEvent(ctx context.Context,
+ prevOperStatus voltha.OperStatus_Types, prevConnStatus voltha.ConnectStatus_Types, prevAdminStatus voltha.AdminState_Types,
+ device *voltha.Device, raisedTs int64) error { // raisedTs is forwarded unchanged to the event proxy
+ de := ev.CreateDeviceStateChangeEvent(device.SerialNumber, device.Id, device.ParentId,
+ prevOperStatus, prevConnStatus, prevAdminStatus, // states before the change, as supplied by the caller
+ device.OperStatus, device.ConnectStatus, device.AdminState, // current states, read from device
+ device.ParentPortNo, device.Root)
+
+ subCategory := voltha.EventSubCategory_ONU // default sub-category; replaced below when device.Root is set
+ if device.Root {
+ subCategory = voltha.EventSubCategory_OLT // root devices are reported under the OLT sub-category
+ }
+ if err := q.eventProxy.SendDeviceEvent(ctx, de, voltha.EventCategory_EQUIPMENT, subCategory, raisedTs); err != nil { // always sent under the EQUIPMENT category
+ logger.Errorw(ctx, "error-sending-device-event", log.Fields{"id": device.Id, "err": err})
+ return err // the failure is both logged above and returned to the caller
+ }
+ logger.Debugw(ctx, "device-state-change-sent", log.Fields{"event": *de}) // NOTE(review): *de dereferences the event into the log fields — confirm copying it here is intended
+ return nil
+}
diff --git a/rw_core/core/device/logical_manager.go b/rw_core/core/device/logical_manager.go
index 39342da..360d83b 100644
--- a/rw_core/core/device/logical_manager.go
+++ b/rw_core/core/device/logical_manager.go
@@ -586,6 +586,6 @@
func (ldMgr *LogicalManager) SendRPCEvent(ctx context.Context, resourceID, desc string, context map[string]string,
id string, category voltha.EventCategory_Types, subCategory *voltha.EventSubCategory_Types, raisedTs int64) {
- ldMgr.Manager.RPCEventManager.GetAndSendRPCEvent(ctx, resourceID, desc, context, id,
+ ldMgr.Manager.Agent.GetAndSendRPCEvent(ctx, resourceID, desc, context, id,
category, subCategory, raisedTs)
}
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index 43bd2be..dd64ab9 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -47,7 +47,7 @@
rootDevices map[string]bool
lockRootDeviceMap sync.RWMutex
adapterProxy *remote.AdapterProxy
- *event.RPCEventManager
+ *event.Agent
adapterMgr *adapter.Manager
logicalDeviceMgr *LogicalManager
kafkaICProxy kafka.InterContainerProxy
@@ -71,7 +71,7 @@
dProxy: dbPath.Proxy("devices"),
adapterMgr: adapterMgr,
defaultTimeout: defaultCoreTimeout,
- RPCEventManager: event.NewRPCEventManager(eventProxy, coreInstanceID, stackID),
+ Agent: event.NewAgent(eventProxy, coreInstanceID, stackID),
deviceLoadingInProgress: make(map[string][]chan int),
}
deviceMgr.stateTransitions = state.NewTransitionMap(deviceMgr)
@@ -1633,7 +1633,7 @@
func (dMgr *Manager) SendRPCEvent(ctx context.Context, id string, rpcEvent *voltha.RPCEvent,
category voltha.EventCategory_Types, subCategory *voltha.EventSubCategory_Types, raisedTs int64) {
//TODO Instead of directly sending to the kafka bus, queue the message and send it asynchronously
- dMgr.RPCEventManager.SendRPCEvent(ctx, id, rpcEvent, category, subCategory, raisedTs)
+ dMgr.Agent.SendRPCEvent(ctx, id, rpcEvent, category, subCategory, raisedTs)
}
func (dMgr *Manager) GetTransientState(ctx context.Context, id string) (voltha.DeviceTransientState_Types, error) {