VOL-3657 support stack id in rpc events

Change-Id: Ie94fc7aa6f3a4c58803156f75d29ecad7f11e601
diff --git a/VERSION b/VERSION
index 9781df7..dbe5900 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.8.0-dev
+2.8.1
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index 2788e56..2b5017e 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -53,6 +53,7 @@
 	defaultTraceEnabled              = false
 	defaultTraceAgentAddress         = "127.0.0.1:6831"
 	defaultLogCorrelationEnabled     = true
+	defaultVolthaStackID             = "voltha"
 )
 
 // RWCoreFlags represents the set of configurations used by the read-write core service
@@ -86,6 +87,7 @@
 	TraceEnabled              bool
 	TraceAgentAddress         string
 	LogCorrelationEnabled     bool
+	VolthaStackID             string
 }
 
 // NewRWCoreFlags returns a new RWCore config
@@ -119,6 +121,7 @@
 		TraceEnabled:              defaultTraceEnabled,
 		TraceAgentAddress:         defaultTraceAgentAddress,
 		LogCorrelationEnabled:     defaultLogCorrelationEnabled,
+		VolthaStackID:             defaultVolthaStackID,
 	}
 	return &rwCoreFlag
 }
@@ -203,5 +206,8 @@
 	help = fmt.Sprintf("Whether to enrich log statements with fields denoting operation being executed for achieving correlation?")
 	flag.BoolVar(&(cf.LogCorrelationEnabled), "log_correlation_enabled", defaultLogCorrelationEnabled, help)
 
+	help = fmt.Sprintf("ID for the current voltha stack")
+	flag.StringVar(&cf.VolthaStackID, "stack_id", defaultVolthaStackID, help)
+
 	flag.Parse()
 }
diff --git a/rw_core/core/api/grpc_nbi_handler_test.go b/rw_core/core/api/grpc_nbi_handler_test.go
index f42117e..631a56c 100755
--- a/rw_core/core/api/grpc_nbi_handler_test.go
+++ b/rw_core/core/api/grpc_nbi_handler_test.go
@@ -123,7 +123,7 @@
 	proxy := model.NewDBPath(backend)
 	nb.adapterMgr = adapter.NewAdapterManager(ctx, proxy, nb.coreInstanceID, nb.kClient)
 	eventProxy := events.NewEventProxy(events.MsgClient(nb.kEventClient), events.MsgTopic(kafka.Topic{Name: cfg.EventTopic}))
-	nb.deviceMgr, nb.logicalDeviceMgr = device.NewManagers(proxy, nb.adapterMgr, nb.kmp, endpointMgr, cfg.CoreTopic, nb.coreInstanceID, cfg.DefaultCoreTimeout, eventProxy)
+	nb.deviceMgr, nb.logicalDeviceMgr = device.NewManagers(proxy, nb.adapterMgr, nb.kmp, endpointMgr, cfg.CoreTopic, nb.coreInstanceID, cfg.DefaultCoreTimeout, eventProxy, cfg.VolthaStackID)
 	nb.adapterMgr.Start(ctx)
 
 	if err := nb.kmp.Start(ctx); err != nil {
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 0b3dd94..459c2fa 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -165,7 +165,7 @@
 
 	// create the core of the system, the device managers
 	endpointMgr := kafka.NewEndpointManager(backend)
-	deviceMgr, logicalDeviceMgr := device.NewManagers(dbPath, adapterMgr, kmp, endpointMgr, cf.CoreTopic, id, cf.DefaultCoreTimeout, eventProxy)
+	deviceMgr, logicalDeviceMgr := device.NewManagers(dbPath, adapterMgr, kmp, endpointMgr, cf.CoreTopic, id, cf.DefaultCoreTimeout, eventProxy, cf.VolthaStackID)
 
 	// register kafka RPC handler
 	registerAdapterRequestHandlers(ctx, kmp, deviceMgr, adapterMgr, cf.CoreTopic)
diff --git a/rw_core/core/device/agent_test.go b/rw_core/core/device/agent_test.go
index 6947447..501cc61 100755
--- a/rw_core/core/device/agent_test.go
+++ b/rw_core/core/device/agent_test.go
@@ -143,7 +143,7 @@
 	proxy := model.NewDBPath(backend)
 	dat.adapterMgr = adapter.NewAdapterManager(ctx, proxy, dat.coreInstanceID, dat.kClient)
 	eventProxy := events.NewEventProxy(events.MsgClient(dat.kEventClient), events.MsgTopic(kafka.Topic{Name: cfg.EventTopic}))
-	dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, dat.adapterMgr, dat.kmp, endpointMgr, cfg.CoreTopic, dat.coreInstanceID, cfg.DefaultCoreTimeout, eventProxy)
+	dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, dat.adapterMgr, dat.kmp, endpointMgr, cfg.CoreTopic, dat.coreInstanceID, cfg.DefaultCoreTimeout, eventProxy, cfg.VolthaStackID)
 	dat.adapterMgr.Start(context.Background())
 	if err = dat.kmp.Start(ctx); err != nil {
 		logger.Fatal(ctx, "Cannot start InterContainerProxy")
diff --git a/rw_core/core/device/event/event.go b/rw_core/core/device/event/event.go
index 4489196..472ae83 100644
--- a/rw_core/core/device/event/event.go
+++ b/rw_core/core/device/event/event.go
@@ -45,22 +45,24 @@
 type RPCEventManager struct {
 	eventProxy     eventif.EventProxy
 	coreInstanceID string
+	stackID        string
 }
 
-func NewManager(proxyForRPCEvents eventif.EventProxy, instanceID string) *Manager {
+func NewManager(proxyForRPCEvents eventif.EventProxy, instanceID string, stackID string) *Manager {
 	return &Manager{
 		packetInQueue:        make(chan openflow_13.PacketIn, 100),
 		packetInQueueDone:    make(chan bool, 1),
 		changeEventQueue:     make(chan openflow_13.ChangeEvent, 100),
 		changeEventQueueDone: make(chan bool, 1),
-		RPCEventManager:      NewRPCEventManager(proxyForRPCEvents, instanceID),
+		RPCEventManager:      NewRPCEventManager(proxyForRPCEvents, instanceID, stackID),
 	}
 }
 
-func NewRPCEventManager(proxyForRPCEvents eventif.EventProxy, instanceID string) *RPCEventManager {
+func NewRPCEventManager(proxyForRPCEvents eventif.EventProxy, instanceID string, stackID string) *RPCEventManager {
 	return &RPCEventManager{
 		eventProxy:     proxyForRPCEvents,
 		coreInstanceID: instanceID,
+		stackID:        stackID,
 	}
 }
 func (q *Manager) SendPacketIn(ctx context.Context, deviceID string, transationID string, packet *openflow_13.OfpPacketIn) {
@@ -259,6 +261,7 @@
 		OperationId: opID,
 		ResourceId:  resourceID,
 		Service:     q.coreInstanceID,
+		StackId:     q.stackID,
 		Status: &common.OperationResp{
 			Code: common.OperationResp_OPERATION_FAILURE,
 		},
diff --git a/rw_core/core/device/logical_agent_test.go b/rw_core/core/device/logical_agent_test.go
index 71ecf26..818e6ac 100644
--- a/rw_core/core/device/logical_agent_test.go
+++ b/rw_core/core/device/logical_agent_test.go
@@ -162,7 +162,7 @@
 	proxy := model.NewDBPath(backend)
 	adapterMgr := adapter.NewAdapterManager(ctx, proxy, lda.coreInstanceID, lda.kClient)
 	eventProxy := events.NewEventProxy(events.MsgClient(lda.kEventClient), events.MsgTopic(kafka.Topic{Name: cfg.EventTopic}))
-	lda.deviceMgr, lda.logicalDeviceMgr = NewManagers(proxy, adapterMgr, lda.kmp, endpointMgr, cfg.CoreTopic, lda.coreInstanceID, cfg.DefaultCoreTimeout, eventProxy)
+	lda.deviceMgr, lda.logicalDeviceMgr = NewManagers(proxy, adapterMgr, lda.kmp, endpointMgr, cfg.CoreTopic, lda.coreInstanceID, cfg.DefaultCoreTimeout, eventProxy, cfg.VolthaStackID)
 	if err = lda.kmp.Start(ctx); err != nil {
 		logger.Fatal(ctx, "Cannot start InterContainerProxy")
 	}
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index 02eaf11..9d9f933 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -61,7 +61,7 @@
 }
 
 //NewManagers creates the Manager and the Logical Manager.
-func NewManagers(dbPath *model.Path, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, coreTopic, coreInstanceID string, defaultCoreTimeout time.Duration, eventProxy *events.EventProxy) (*Manager, *LogicalManager) {
+func NewManagers(dbPath *model.Path, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, coreTopic, coreInstanceID string, defaultCoreTimeout time.Duration, eventProxy *events.EventProxy, stackID string) (*Manager, *LogicalManager) {
 	deviceMgr := &Manager{
 		rootDevices:             make(map[string]bool),
 		kafkaICProxy:            kmp,
@@ -71,13 +71,13 @@
 		dProxy:                  dbPath.Proxy("devices"),
 		adapterMgr:              adapterMgr,
 		defaultTimeout:          defaultCoreTimeout,
-		RPCEventManager:         event.NewRPCEventManager(eventProxy, coreInstanceID),
+		RPCEventManager:         event.NewRPCEventManager(eventProxy, coreInstanceID, stackID),
 		deviceLoadingInProgress: make(map[string][]chan int),
 	}
 	deviceMgr.stateTransitions = state.NewTransitionMap(deviceMgr)
 
 	logicalDeviceMgr := &LogicalManager{
-		Manager:                        event.NewManager(eventProxy, coreInstanceID),
+		Manager:                        event.NewManager(eventProxy, coreInstanceID, stackID),
 		deviceMgr:                      deviceMgr,
 		kafkaICProxy:                   kmp,
 		dbPath:                         dbPath,