[VOL-3187] Pass Context down the execution call hierarchy across voltha-go codebase
Change-Id: I6bc2a0f7226c1beed4ae01a15d7b5c4dc04358d8
diff --git a/rw_core/core/device/logical_agent_test.go b/rw_core/core/device/logical_agent_test.go
index 1b1dc59..5cd72b4 100644
--- a/rw_core/core/device/logical_agent_test.go
+++ b/rw_core/core/device/logical_agent_test.go
@@ -57,13 +57,13 @@
done chan int
}
-func newLDATest() *LDATest {
+func newLDATest(ctx context.Context) *LDATest {
test := &LDATest{}
// Start the embedded etcd server
var err error
- test.etcdServer, test.kvClientPort, err = tst.StartEmbeddedEtcdServer("voltha.rwcore.lda.test", "voltha.rwcore.lda.etcd", "error")
+ test.etcdServer, test.kvClientPort, err = tst.StartEmbeddedEtcdServer(ctx, "voltha.rwcore.lda.test", "voltha.rwcore.lda.etcd", "error")
if err != nil {
- logger.Fatal(err)
+ logger.Fatal(ctx, err)
}
// Create the kafka client
test.kClient = mock_kafka.NewKafkaClient()
@@ -131,17 +131,17 @@
return test
}
-func (lda *LDATest) startCore(inCompeteMode bool) {
+func (lda *LDATest) startCore(ctx context.Context, inCompeteMode bool) {
cfg := config.NewRWCoreFlags()
cfg.CoreTopic = "rw_core"
cfg.DefaultRequestTimeout = lda.defaultTimeout
cfg.KVStoreAddress = "127.0.0.1" + ":" + strconv.Itoa(lda.kvClientPort)
grpcPort, err := freeport.GetFreePort()
if err != nil {
- logger.Fatal("Cannot get a freeport for grpc")
+ logger.Fatal(ctx, "Cannot get a freeport for grpc")
}
cfg.GrpcAddress = "127.0.0.1" + ":" + strconv.Itoa(grpcPort)
- client := tst.SetupKVClient(cfg, lda.coreInstanceID)
+ client := tst.SetupKVClient(ctx, cfg, lda.coreInstanceID)
backend := &db.Backend{
Client: client,
StoreType: cfg.KVStoreType,
@@ -155,24 +155,24 @@
endpointMgr := kafka.NewEndpointManager(backend)
proxy := model.NewDBPath(backend)
- adapterMgr := adapter.NewAdapterManager(proxy, lda.coreInstanceID, lda.kClient)
+ adapterMgr := adapter.NewAdapterManager(ctx, proxy, lda.coreInstanceID, lda.kClient)
lda.deviceMgr, lda.logicalDeviceMgr = NewManagers(proxy, adapterMgr, lda.kmp, endpointMgr, cfg.CoreTopic, lda.coreInstanceID, cfg.DefaultCoreTimeout)
- if err = lda.kmp.Start(); err != nil {
- logger.Fatal("Cannot start InterContainerProxy")
+ if err = lda.kmp.Start(ctx); err != nil {
+ logger.Fatal(ctx, "Cannot start InterContainerProxy")
}
adapterMgr.Start(context.Background())
}
-func (lda *LDATest) stopAll() {
+func (lda *LDATest) stopAll(ctx context.Context) {
if lda.kClient != nil {
- lda.kClient.Stop()
+ lda.kClient.Stop(ctx)
}
if lda.kmp != nil {
- lda.kmp.Stop()
+ lda.kmp.Stop(ctx)
}
if lda.etcdServer != nil {
- tst.StopEmbeddedEtcdServer(lda.etcdServer)
+ tst.StopEmbeddedEtcdServer(ctx, lda.etcdServer)
}
}
@@ -182,7 +182,7 @@
clonedLD := proto.Clone(lda.logicalDevice).(*voltha.LogicalDevice)
clonedLD.Id = com.GetRandomString(10)
clonedLD.DatapathId = rand.Uint64()
- lDeviceAgent := newLogicalAgent(clonedLD.Id, clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.dbPath, lDeviceMgr.ldProxy, lDeviceMgr.defaultTimeout)
+ lDeviceAgent := newLogicalAgent(context.Background(), clonedLD.Id, clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.dbPath, lDeviceMgr.ldProxy, lDeviceMgr.defaultTimeout)
lDeviceAgent.logicalDevice = clonedLD
for _, port := range clonedLD.Ports {
handle, created, err := lDeviceAgent.portLoader.LockOrCreate(context.Background(), port)
@@ -245,14 +245,15 @@
},
}
localWG.Add(1)
+ ctx := context.Background()
go func() {
- err := ldAgent.meterAdd(context.Background(), meterMod)
+ err := ldAgent.meterAdd(ctx, meterMod)
assert.Nil(t, err)
localWG.Done()
}()
// wait for go routines to be done
localWG.Wait()
- meterEntry := fu.MeterEntryFromMeterMod(meterMod)
+ meterEntry := fu.MeterEntryFromMeterMod(ctx, meterMod)
meterHandle, have := ldAgent.meterLoader.Lock(meterMod.MeterId)
assert.Equal(t, have, true)
@@ -269,7 +270,7 @@
expectedChange.Ports[2].OfpPort.Config = originalLogicalDevice.Ports[0].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
expectedChange.Ports[2].OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LIVE)
- updatedLogicalDevicePorts := ldAgent.listLogicalDevicePorts()
+ updatedLogicalDevicePorts := ldAgent.listLogicalDevicePorts(context.Background())
for _, p := range expectedChange.Ports {
assert.True(t, proto.Equal(p, updatedLogicalDevicePorts[p.DevicePortNo]))
}
@@ -277,12 +278,13 @@
}
func TestConcurrentLogicalDeviceUpdate(t *testing.T) {
- lda := newLDATest()
+ ctx := context.Background()
+ lda := newLDATest(ctx)
assert.NotNil(t, lda)
- defer lda.stopAll()
+ defer lda.stopAll(ctx)
// Start the Core
- lda.startCore(false)
+ lda.startCore(ctx, false)
var wg sync.WaitGroup
numConCurrentLogicalDeviceAgents := 3