[VOL-3187]Pass Context down the execution call hierarchy across voltha-go codebase
Change-Id: I6bc2a0f7226c1beed4ae01a15d7b5c4dc04358d8
diff --git a/rw_core/core/adapter/manager.go b/rw_core/core/adapter/manager.go
index 341624f..6c7053f 100644
--- a/rw_core/core/adapter/manager.go
+++ b/rw_core/core/adapter/manager.go
@@ -45,7 +45,7 @@
lockdDeviceTypeToAdapterMap sync.RWMutex
}
-func NewAdapterManager(dbPath *model.Path, coreInstanceID string, kafkaClient kafka.Client) *Manager {
+func NewAdapterManager(ctx context.Context, dbPath *model.Path, coreInstanceID string, kafkaClient kafka.Client) *Manager {
aMgr := &Manager{
coreInstanceID: coreInstanceID,
adapterProxy: dbPath.Proxy("adapters"),
@@ -53,7 +53,7 @@
deviceTypes: make(map[string]*voltha.DeviceType),
adapterAgents: make(map[string]*agent),
}
- kafkaClient.SubscribeForMetadata(aMgr.updateLastAdapterCommunication)
+ kafkaClient.SubscribeForMetadata(ctx, aMgr.updateLastAdapterCommunication)
return aMgr
}
@@ -67,33 +67,33 @@
func (aMgr *Manager) Start(ctx context.Context) {
probe.UpdateStatusFromContext(ctx, "adapter-manager", probe.ServiceStatusPreparing)
- logger.Info("starting-adapter-manager")
+ logger.Info(ctx, "starting-adapter-manager")
// Load the existing adapterAgents and device types - this will also ensure the correct paths have been
// created if there are no data in the dB to start
- err := aMgr.loadAdaptersAndDevicetypesInMemory()
+ err := aMgr.loadAdaptersAndDevicetypesInMemory(ctx)
if err != nil {
- logger.Fatalf("failed-to-load-adapters-and-device-types-in-memory: %s", err)
+ logger.Fatalf(ctx, "failed-to-load-adapters-and-device-types-in-memory: %s", err)
}
probe.UpdateStatusFromContext(ctx, "adapter-manager", probe.ServiceStatusRunning)
- logger.Info("adapter-manager-started")
+ logger.Info(ctx, "adapter-manager-started")
}
//loadAdaptersAndDevicetypesInMemory loads the existing set of adapters and device types in memory
-func (aMgr *Manager) loadAdaptersAndDevicetypesInMemory() error {
+func (aMgr *Manager) loadAdaptersAndDevicetypesInMemory(ctx context.Context) error {
// Load the adapters
var adapters []*voltha.Adapter
if err := aMgr.adapterProxy.List(context.Background(), &adapters); err != nil {
- logger.Errorw("Failed-to-list-adapters-from-cluster-data-proxy", log.Fields{"error": err})
+ logger.Errorw(ctx, "Failed-to-list-adapters-from-cluster-data-proxy", log.Fields{"error": err})
return err
}
if len(adapters) != 0 {
for _, adapter := range adapters {
- if err := aMgr.addAdapter(adapter, false); err != nil {
- logger.Errorw("failed to add adapter", log.Fields{"adapterId": adapter.Id})
+ if err := aMgr.addAdapter(ctx, adapter, false); err != nil {
+ logger.Errorw(ctx, "failed to add adapter", log.Fields{"adapterId": adapter.Id})
} else {
- logger.Debugw("adapter added successfully", log.Fields{"adapterId": adapter.Id})
+ logger.Debugw(ctx, "adapter added successfully", log.Fields{"adapterId": adapter.Id})
}
}
}
@@ -101,19 +101,19 @@
// Load the device types
var deviceTypes []*voltha.DeviceType
if err := aMgr.deviceTypeProxy.List(context.Background(), &deviceTypes); err != nil {
- logger.Errorw("Failed-to-list-device-types-from-cluster-data-proxy", log.Fields{"error": err})
+ logger.Errorw(ctx, "Failed-to-list-device-types-from-cluster-data-proxy", log.Fields{"error": err})
return err
}
if len(deviceTypes) != 0 {
dTypes := &voltha.DeviceTypes{Items: []*voltha.DeviceType{}}
for _, dType := range deviceTypes {
- logger.Debugw("found-existing-device-types", log.Fields{"deviceTypes": dTypes})
+ logger.Debugw(ctx, "found-existing-device-types", log.Fields{"deviceTypes": dTypes})
dTypes.Items = append(dTypes.Items, dType)
}
- return aMgr.addDeviceTypes(dTypes, false)
+ return aMgr.addDeviceTypes(ctx, dTypes, false)
}
- logger.Debug("no-existing-device-type-found")
+ logger.Debug(ctx, "no-existing-device-type-found")
return nil
}
@@ -128,24 +128,24 @@
}
}
-func (aMgr *Manager) addAdapter(adapter *voltha.Adapter, saveToDb bool) error {
+func (aMgr *Manager) addAdapter(ctx context.Context, adapter *voltha.Adapter, saveToDb bool) error {
aMgr.lockAdaptersMap.Lock()
defer aMgr.lockAdaptersMap.Unlock()
- logger.Debugw("adding-adapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
+ logger.Debugw(ctx, "adding-adapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint})
if _, exist := aMgr.adapterAgents[adapter.Id]; !exist {
if saveToDb {
// Save the adapter to the KV store - first check if it already exist
if have, err := aMgr.adapterProxy.Get(context.Background(), adapter.Id, &voltha.Adapter{}); err != nil {
- logger.Errorw("failed-to-get-adapters-from-cluster-proxy", log.Fields{"error": err})
+ logger.Errorw(ctx, "failed-to-get-adapters-from-cluster-proxy", log.Fields{"error": err})
return err
} else if !have {
if err := aMgr.adapterProxy.Set(context.Background(), adapter.Id, adapter); err != nil {
- logger.Errorw("failed-to-save-adapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
+ logger.Errorw(ctx, "failed-to-save-adapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint, "replica": adapter.CurrentReplica, "total": adapter.TotalReplicas})
return err
}
- logger.Debugw("adapter-saved-to-KV-Store", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
+ logger.Debugw(ctx, "adapter-saved-to-KV-Store", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint, "replica": adapter.CurrentReplica, "total": adapter.TotalReplicas})
} else {
log.Warnw("adding-adapter-already-in-KV-store", log.Fields{
@@ -160,11 +160,11 @@
return nil
}
-func (aMgr *Manager) addDeviceTypes(deviceTypes *voltha.DeviceTypes, saveToDb bool) error {
+func (aMgr *Manager) addDeviceTypes(ctx context.Context, deviceTypes *voltha.DeviceTypes, saveToDb bool) error {
if deviceTypes == nil {
return fmt.Errorf("no-device-type")
}
- logger.Debugw("adding-device-types", log.Fields{"deviceTypes": deviceTypes})
+ logger.Debugw(ctx, "adding-device-types", log.Fields{"deviceTypes": deviceTypes})
aMgr.lockAdaptersMap.Lock()
defer aMgr.lockAdaptersMap.Unlock()
aMgr.lockdDeviceTypeToAdapterMap.Lock()
@@ -179,16 +179,16 @@
// Save the device types to the KV store
for _, deviceType := range deviceTypes.Items {
if have, err := aMgr.deviceTypeProxy.Get(context.Background(), deviceType.Id, &voltha.DeviceType{}); err != nil {
- logger.Errorw("Failed-to--device-types-from-cluster-data-proxy", log.Fields{"error": err})
+ logger.Errorw(ctx, "Failed-to--device-types-from-cluster-data-proxy", log.Fields{"error": err})
return err
} else if !have {
// Does not exist - save it
clonedDType := (proto.Clone(deviceType)).(*voltha.DeviceType)
if err := aMgr.deviceTypeProxy.Set(context.Background(), deviceType.Id, clonedDType); err != nil {
- logger.Errorw("Failed-to-add-device-types-to-cluster-data-proxy", log.Fields{"error": err})
+ logger.Errorw(ctx, "Failed-to-add-device-types-to-cluster-data-proxy", log.Fields{"error": err})
return err
}
- logger.Debugw("device-type-saved-to-KV-Store", log.Fields{"deviceType": deviceType})
+ logger.Debugw(ctx, "device-type-saved-to-KV-Store", log.Fields{"deviceType": deviceType})
}
}
}
@@ -197,29 +197,29 @@
}
// ListAdapters returns the contents of all adapters known to the system
-func (aMgr *Manager) ListAdapters(_ context.Context, _ *empty.Empty) (*voltha.Adapters, error) {
+func (aMgr *Manager) ListAdapters(ctx context.Context, _ *empty.Empty) (*voltha.Adapters, error) {
result := &voltha.Adapters{Items: []*voltha.Adapter{}}
aMgr.lockAdaptersMap.RLock()
defer aMgr.lockAdaptersMap.RUnlock()
for _, adapterAgent := range aMgr.adapterAgents {
- if a := adapterAgent.getAdapter(); a != nil {
+ if a := adapterAgent.getAdapter(ctx); a != nil {
result.Items = append(result.Items, (proto.Clone(a)).(*voltha.Adapter))
}
}
return result, nil
}
-func (aMgr *Manager) getAdapter(adapterID string) *voltha.Adapter {
+func (aMgr *Manager) getAdapter(ctx context.Context, adapterID string) *voltha.Adapter {
aMgr.lockAdaptersMap.RLock()
defer aMgr.lockAdaptersMap.RUnlock()
if adapterAgent, ok := aMgr.adapterAgents[adapterID]; ok {
- return adapterAgent.getAdapter()
+ return adapterAgent.getAdapter(ctx)
}
return nil
}
-func (aMgr *Manager) RegisterAdapter(adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) (*voltha.CoreInstance, error) {
- logger.Debugw("RegisterAdapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
+func (aMgr *Manager) RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) (*voltha.CoreInstance, error) {
+ logger.Debugw(ctx, "RegisterAdapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint, "deviceTypes": deviceTypes.Items})
if adapter.Type == "" {
@@ -231,27 +231,27 @@
return nil, status.Error(codes.InvalidArgument, "adapter-not-specifying-type")
}
- if aMgr.getAdapter(adapter.Id) != nil {
+ if aMgr.getAdapter(ctx, adapter.Id) != nil {
// Already registered - Adapter may have restarted. Trigger the reconcile process for that adapter
go func() {
err := aMgr.onAdapterRestart(context.Background(), adapter)
if err != nil {
- logger.Errorw("unable-to-restart-adapter", log.Fields{"error": err})
+ logger.Errorw(ctx, "unable-to-restart-adapter", log.Fields{"error": err})
}
}()
return &voltha.CoreInstance{InstanceId: aMgr.coreInstanceID}, nil
}
// Save the adapter and the device types
- if err := aMgr.addAdapter(adapter, true); err != nil {
- logger.Errorw("failed-to-add-adapter", log.Fields{"error": err})
+ if err := aMgr.addAdapter(ctx, adapter, true); err != nil {
+ logger.Errorw(ctx, "failed-to-add-adapter", log.Fields{"error": err})
return nil, err
}
- if err := aMgr.addDeviceTypes(deviceTypes, true); err != nil {
- logger.Errorw("failed-to-add-device-types", log.Fields{"error": err})
+ if err := aMgr.addDeviceTypes(ctx, deviceTypes, true); err != nil {
+ logger.Errorw(ctx, "failed-to-add-device-types", log.Fields{"error": err})
return nil, err
}
- logger.Debugw("adapter-registered", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
+ logger.Debugw(ctx, "adapter-registered", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint})
return &voltha.CoreInstance{InstanceId: aMgr.coreInstanceID}, nil
@@ -270,8 +270,8 @@
}
// ListDeviceTypes returns all the device types known to the system
-func (aMgr *Manager) ListDeviceTypes(_ context.Context, _ *empty.Empty) (*voltha.DeviceTypes, error) {
- logger.Debug("ListDeviceTypes")
+func (aMgr *Manager) ListDeviceTypes(ctx context.Context, _ *empty.Empty) (*voltha.DeviceTypes, error) {
+ logger.Debug(ctx, "ListDeviceTypes")
aMgr.lockdDeviceTypeToAdapterMap.Lock()
defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
@@ -283,8 +283,8 @@
}
// GetDeviceType returns the device type proto definition given the name of the device type
-func (aMgr *Manager) GetDeviceType(_ context.Context, deviceType *voltha.ID) (*voltha.DeviceType, error) {
- logger.Debugw("GetDeviceType", log.Fields{"typeid": deviceType.Id})
+func (aMgr *Manager) GetDeviceType(ctx context.Context, deviceType *voltha.ID) (*voltha.DeviceType, error) {
+ logger.Debugw(ctx, "GetDeviceType", log.Fields{"typeid": deviceType.Id})
aMgr.lockdDeviceTypeToAdapterMap.Lock()
defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()