VOL-3657 support stack id in rpc events
Change-Id: Ie94fc7aa6f3a4c58803156f75d29ecad7f11e601
diff --git a/VERSION b/VERSION
index 9781df7..dbe5900 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.8.0-dev
+2.8.1
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index 2788e56..2b5017e 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -53,6 +53,7 @@
defaultTraceEnabled = false
defaultTraceAgentAddress = "127.0.0.1:6831"
defaultLogCorrelationEnabled = true
+ defaultVolthaStackID = "voltha"
)
// RWCoreFlags represents the set of configurations used by the read-write core service
@@ -86,6 +87,7 @@
TraceEnabled bool
TraceAgentAddress string
LogCorrelationEnabled bool
+ VolthaStackID string
}
// NewRWCoreFlags returns a new RWCore config
@@ -119,6 +121,7 @@
TraceEnabled: defaultTraceEnabled,
TraceAgentAddress: defaultTraceAgentAddress,
LogCorrelationEnabled: defaultLogCorrelationEnabled,
+ VolthaStackID: defaultVolthaStackID,
}
return &rwCoreFlag
}
@@ -203,5 +206,8 @@
help = fmt.Sprintf("Whether to enrich log statements with fields denoting operation being executed for achieving correlation?")
flag.BoolVar(&(cf.LogCorrelationEnabled), "log_correlation_enabled", defaultLogCorrelationEnabled, help)
+ help = fmt.Sprintf("ID for the current voltha stack")
+ flag.StringVar(&cf.VolthaStackID, "stack_id", defaultVolthaStackID, help)
+
flag.Parse()
}
diff --git a/rw_core/core/api/grpc_nbi_handler_test.go b/rw_core/core/api/grpc_nbi_handler_test.go
index f42117e..631a56c 100755
--- a/rw_core/core/api/grpc_nbi_handler_test.go
+++ b/rw_core/core/api/grpc_nbi_handler_test.go
@@ -123,7 +123,7 @@
proxy := model.NewDBPath(backend)
nb.adapterMgr = adapter.NewAdapterManager(ctx, proxy, nb.coreInstanceID, nb.kClient)
eventProxy := events.NewEventProxy(events.MsgClient(nb.kEventClient), events.MsgTopic(kafka.Topic{Name: cfg.EventTopic}))
- nb.deviceMgr, nb.logicalDeviceMgr = device.NewManagers(proxy, nb.adapterMgr, nb.kmp, endpointMgr, cfg.CoreTopic, nb.coreInstanceID, cfg.DefaultCoreTimeout, eventProxy)
+ nb.deviceMgr, nb.logicalDeviceMgr = device.NewManagers(proxy, nb.adapterMgr, nb.kmp, endpointMgr, cfg.CoreTopic, nb.coreInstanceID, cfg.DefaultCoreTimeout, eventProxy, cfg.VolthaStackID)
nb.adapterMgr.Start(ctx)
if err := nb.kmp.Start(ctx); err != nil {
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 0b3dd94..459c2fa 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -165,7 +165,7 @@
// create the core of the system, the device managers
endpointMgr := kafka.NewEndpointManager(backend)
- deviceMgr, logicalDeviceMgr := device.NewManagers(dbPath, adapterMgr, kmp, endpointMgr, cf.CoreTopic, id, cf.DefaultCoreTimeout, eventProxy)
+ deviceMgr, logicalDeviceMgr := device.NewManagers(dbPath, adapterMgr, kmp, endpointMgr, cf.CoreTopic, id, cf.DefaultCoreTimeout, eventProxy, cf.VolthaStackID)
// register kafka RPC handler
registerAdapterRequestHandlers(ctx, kmp, deviceMgr, adapterMgr, cf.CoreTopic)
diff --git a/rw_core/core/device/agent_test.go b/rw_core/core/device/agent_test.go
index 6947447..501cc61 100755
--- a/rw_core/core/device/agent_test.go
+++ b/rw_core/core/device/agent_test.go
@@ -143,7 +143,7 @@
proxy := model.NewDBPath(backend)
dat.adapterMgr = adapter.NewAdapterManager(ctx, proxy, dat.coreInstanceID, dat.kClient)
eventProxy := events.NewEventProxy(events.MsgClient(dat.kEventClient), events.MsgTopic(kafka.Topic{Name: cfg.EventTopic}))
- dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, dat.adapterMgr, dat.kmp, endpointMgr, cfg.CoreTopic, dat.coreInstanceID, cfg.DefaultCoreTimeout, eventProxy)
+ dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, dat.adapterMgr, dat.kmp, endpointMgr, cfg.CoreTopic, dat.coreInstanceID, cfg.DefaultCoreTimeout, eventProxy, cfg.VolthaStackID)
dat.adapterMgr.Start(context.Background())
if err = dat.kmp.Start(ctx); err != nil {
logger.Fatal(ctx, "Cannot start InterContainerProxy")
diff --git a/rw_core/core/device/event/event.go b/rw_core/core/device/event/event.go
index 4489196..472ae83 100644
--- a/rw_core/core/device/event/event.go
+++ b/rw_core/core/device/event/event.go
@@ -45,22 +45,24 @@
type RPCEventManager struct {
eventProxy eventif.EventProxy
coreInstanceID string
+ stackID string
}
-func NewManager(proxyForRPCEvents eventif.EventProxy, instanceID string) *Manager {
+func NewManager(proxyForRPCEvents eventif.EventProxy, instanceID string, stackID string) *Manager {
return &Manager{
packetInQueue: make(chan openflow_13.PacketIn, 100),
packetInQueueDone: make(chan bool, 1),
changeEventQueue: make(chan openflow_13.ChangeEvent, 100),
changeEventQueueDone: make(chan bool, 1),
- RPCEventManager: NewRPCEventManager(proxyForRPCEvents, instanceID),
+ RPCEventManager: NewRPCEventManager(proxyForRPCEvents, instanceID, stackID),
}
}
-func NewRPCEventManager(proxyForRPCEvents eventif.EventProxy, instanceID string) *RPCEventManager {
+func NewRPCEventManager(proxyForRPCEvents eventif.EventProxy, instanceID string, stackID string) *RPCEventManager {
return &RPCEventManager{
eventProxy: proxyForRPCEvents,
coreInstanceID: instanceID,
+ stackID: stackID,
}
}
func (q *Manager) SendPacketIn(ctx context.Context, deviceID string, transationID string, packet *openflow_13.OfpPacketIn) {
@@ -259,6 +261,7 @@
OperationId: opID,
ResourceId: resourceID,
Service: q.coreInstanceID,
+ StackId: q.stackID,
Status: &common.OperationResp{
Code: common.OperationResp_OPERATION_FAILURE,
},
diff --git a/rw_core/core/device/logical_agent_test.go b/rw_core/core/device/logical_agent_test.go
index 71ecf26..818e6ac 100644
--- a/rw_core/core/device/logical_agent_test.go
+++ b/rw_core/core/device/logical_agent_test.go
@@ -162,7 +162,7 @@
proxy := model.NewDBPath(backend)
adapterMgr := adapter.NewAdapterManager(ctx, proxy, lda.coreInstanceID, lda.kClient)
eventProxy := events.NewEventProxy(events.MsgClient(lda.kEventClient), events.MsgTopic(kafka.Topic{Name: cfg.EventTopic}))
- lda.deviceMgr, lda.logicalDeviceMgr = NewManagers(proxy, adapterMgr, lda.kmp, endpointMgr, cfg.CoreTopic, lda.coreInstanceID, cfg.DefaultCoreTimeout, eventProxy)
+ lda.deviceMgr, lda.logicalDeviceMgr = NewManagers(proxy, adapterMgr, lda.kmp, endpointMgr, cfg.CoreTopic, lda.coreInstanceID, cfg.DefaultCoreTimeout, eventProxy, cfg.VolthaStackID)
if err = lda.kmp.Start(ctx); err != nil {
logger.Fatal(ctx, "Cannot start InterContainerProxy")
}
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index 02eaf11..9d9f933 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -61,7 +61,7 @@
}
//NewManagers creates the Manager and the Logical Manager.
-func NewManagers(dbPath *model.Path, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, coreTopic, coreInstanceID string, defaultCoreTimeout time.Duration, eventProxy *events.EventProxy) (*Manager, *LogicalManager) {
+func NewManagers(dbPath *model.Path, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, coreTopic, coreInstanceID string, defaultCoreTimeout time.Duration, eventProxy *events.EventProxy, stackID string) (*Manager, *LogicalManager) {
deviceMgr := &Manager{
rootDevices: make(map[string]bool),
kafkaICProxy: kmp,
@@ -71,13 +71,13 @@
dProxy: dbPath.Proxy("devices"),
adapterMgr: adapterMgr,
defaultTimeout: defaultCoreTimeout,
- RPCEventManager: event.NewRPCEventManager(eventProxy, coreInstanceID),
+ RPCEventManager: event.NewRPCEventManager(eventProxy, coreInstanceID, stackID),
deviceLoadingInProgress: make(map[string][]chan int),
}
deviceMgr.stateTransitions = state.NewTransitionMap(deviceMgr)
logicalDeviceMgr := &LogicalManager{
- Manager: event.NewManager(eventProxy, coreInstanceID),
+ Manager: event.NewManager(eventProxy, coreInstanceID, stackID),
deviceMgr: deviceMgr,
kafkaICProxy: kmp,
dbPath: dbPath,