[VOL-3187]Pass Context down the execution call hierarchy across voltha-go codebase
Change-Id: I6bc2a0f7226c1beed4ae01a15d7b5c4dc04358d8
diff --git a/rw_core/core/device/agent_test.go b/rw_core/core/device/agent_test.go
index 5af75ec..e9bd663 100755
--- a/rw_core/core/device/agent_test.go
+++ b/rw_core/core/device/agent_test.go
@@ -63,13 +63,13 @@
done chan int
}
-func newDATest() *DATest {
+func newDATest(ctx context.Context) *DATest {
test := &DATest{}
// Start the embedded etcd server
var err error
- test.etcdServer, test.kvClientPort, err = tst.StartEmbeddedEtcdServer("voltha.rwcore.da.test", "voltha.rwcore.da.etcd", "error")
+ test.etcdServer, test.kvClientPort, err = tst.StartEmbeddedEtcdServer(ctx, "voltha.rwcore.da.test", "voltha.rwcore.da.etcd", "error")
if err != nil {
- logger.Fatal(err)
+ logger.Fatal(ctx, err)
}
// Create the kafka client
test.kClient = mock_kafka.NewKafkaClient()
@@ -111,17 +111,17 @@
return test
}
-func (dat *DATest) startCore() {
+func (dat *DATest) startCore(ctx context.Context) {
cfg := config.NewRWCoreFlags()
cfg.CoreTopic = "rw_core"
cfg.DefaultRequestTimeout = dat.defaultTimeout
cfg.KVStoreAddress = "127.0.0.1" + ":" + strconv.Itoa(dat.kvClientPort)
grpcPort, err := freeport.GetFreePort()
if err != nil {
- logger.Fatal("Cannot get a freeport for grpc")
+ logger.Fatal(ctx, "Cannot get a freeport for grpc")
}
cfg.GrpcAddress = "127.0.0.1" + ":" + strconv.Itoa(grpcPort)
- client := tst.SetupKVClient(cfg, dat.coreInstanceID)
+ client := tst.SetupKVClient(ctx, cfg, dat.coreInstanceID)
backend := &db.Backend{
Client: client,
StoreType: cfg.KVStoreType,
@@ -135,29 +135,29 @@
endpointMgr := kafka.NewEndpointManager(backend)
proxy := model.NewDBPath(backend)
- dat.adapterMgr = adapter.NewAdapterManager(proxy, dat.coreInstanceID, dat.kClient)
+ dat.adapterMgr = adapter.NewAdapterManager(ctx, proxy, dat.coreInstanceID, dat.kClient)
dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, dat.adapterMgr, dat.kmp, endpointMgr, cfg.CoreTopic, dat.coreInstanceID, cfg.DefaultCoreTimeout)
dat.adapterMgr.Start(context.Background())
- if err = dat.kmp.Start(); err != nil {
- logger.Fatal("Cannot start InterContainerProxy")
+ if err = dat.kmp.Start(ctx); err != nil {
+ logger.Fatal(ctx, "Cannot start InterContainerProxy")
}
- if err := dat.kmp.SubscribeWithDefaultRequestHandler(kafka.Topic{Name: cfg.CoreTopic}, kafka.OffsetNewest); err != nil {
- logger.Fatalf("Cannot add default request handler: %s", err)
+ if err := dat.kmp.SubscribeWithDefaultRequestHandler(ctx, kafka.Topic{Name: cfg.CoreTopic}, kafka.OffsetNewest); err != nil {
+ logger.Fatalf(ctx, "Cannot add default request handler: %s", err)
}
}
-func (dat *DATest) stopAll() {
+func (dat *DATest) stopAll(ctx context.Context) {
if dat.kClient != nil {
- dat.kClient.Stop()
+ dat.kClient.Stop(ctx)
}
if dat.kmp != nil {
- dat.kmp.Stop()
+ dat.kmp.Stop(ctx)
}
if dat.etcdServer != nil {
- tst.StopEmbeddedEtcdServer(dat.etcdServer)
+ tst.StopEmbeddedEtcdServer(ctx, dat.etcdServer)
}
}
@@ -244,13 +244,14 @@
}
func TestConcurrentDevices(t *testing.T) {
+ ctx := context.Background()
for i := 0; i < 2; i++ {
- da := newDATest()
+ da := newDATest(ctx)
assert.NotNil(t, da)
- defer da.stopAll()
+ defer da.stopAll(ctx)
log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
// Start the Core
- da.startCore()
+ da.startCore(ctx)
var wg sync.WaitGroup
numConCurrentDeviceAgents := 20
@@ -264,33 +265,35 @@
}
}
func TestFlowUpdates(t *testing.T) {
- da := newDATest()
+ ctx := context.Background()
+ da := newDATest(ctx)
assert.NotNil(t, da)
- defer da.stopAll()
+ defer da.stopAll(ctx)
log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
// Start the Core
- da.startCore()
- da.oltAdapter, da.onuAdapter = tst.CreateAndregisterAdapters(t, da.kClient, da.coreInstanceID, da.oltAdapterName, da.onuAdapterName, da.adapterMgr)
+ da.startCore(ctx)
+ da.oltAdapter, da.onuAdapter = tst.CreateAndregisterAdapters(ctx, t, da.kClient, da.coreInstanceID, da.oltAdapterName, da.onuAdapterName, da.adapterMgr)
a := da.createDeviceAgent(t)
cloned := a.getDeviceWithoutLock()
- err := a.updateDeviceStateInStoreWithoutLock(context.Background(), cloned, voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE)
+ err := a.updateDeviceStateInStoreWithoutLock(ctx, cloned, voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE)
assert.Nil(t, err)
da.testFlowAddDeletes(t, a)
}
func TestGroupUpdates(t *testing.T) {
- da := newDATest()
+ ctx := context.Background()
+ da := newDATest(ctx)
assert.NotNil(t, da)
- defer da.stopAll()
+ defer da.stopAll(ctx)
// Start the Core
- da.startCore()
- da.oltAdapter, da.onuAdapter = tst.CreateAndregisterAdapters(t, da.kClient, da.coreInstanceID, da.oltAdapterName, da.onuAdapterName, da.adapterMgr)
+ da.startCore(ctx)
+ da.oltAdapter, da.onuAdapter = tst.CreateAndregisterAdapters(ctx, t, da.kClient, da.coreInstanceID, da.oltAdapterName, da.onuAdapterName, da.adapterMgr)
a := da.createDeviceAgent(t)
cloned := a.getDeviceWithoutLock()
- err := a.updateDeviceStateInStoreWithoutLock(context.Background(), cloned, voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE)
+ err := a.updateDeviceStateInStoreWithoutLock(ctx, cloned, voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE)
assert.Nil(t, err)
da.testGroupAddDeletes(t, a)
}