[VOL-3069] Pass Context down the execution call hierarchy across the voltha codebase
Change-Id: I16560357c5fc130f834929e7e2e92cee14b518e2
diff --git a/pkg/adapters/adapterif/events_proxy_if.go b/pkg/adapters/adapterif/events_proxy_if.go
index c144935..dbd8140 100644
--- a/pkg/adapters/adapterif/events_proxy_if.go
+++ b/pkg/adapters/adapterif/events_proxy_if.go
@@ -17,14 +17,15 @@
package adapterif
import (
+ "context"
"github.com/opencord/voltha-protos/v3/go/voltha"
)
// EventProxy interface for eventproxy
type EventProxy interface {
- SendDeviceEvent(deviceEvent *voltha.DeviceEvent, category EventCategory,
+ SendDeviceEvent(ctx context.Context, deviceEvent *voltha.DeviceEvent, category EventCategory,
subCategory EventSubCategory, raisedTs int64) error
- SendKpiEvent(id string, deviceEvent *voltha.KpiEvent2, category EventCategory,
+ SendKpiEvent(ctx context.Context, id string, deviceEvent *voltha.KpiEvent2, category EventCategory,
subCategory EventSubCategory, raisedTs int64) error
}
diff --git a/pkg/adapters/common/adapter_proxy.go b/pkg/adapters/common/adapter_proxy.go
index cd5750f..ca44d0d 100644
--- a/pkg/adapters/common/adapter_proxy.go
+++ b/pkg/adapters/common/adapter_proxy.go
@@ -35,14 +35,14 @@
endpointMgr kafka.EndpointManager
}
-func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, adapterTopic string, coreTopic string, backend *db.Backend) *AdapterProxy {
+func NewAdapterProxy(ctx context.Context, kafkaProxy kafka.InterContainerProxy, adapterTopic string, coreTopic string, backend *db.Backend) *AdapterProxy {
proxy := AdapterProxy{
kafkaICProxy: kafkaProxy,
adapterTopic: adapterTopic,
coreTopic: coreTopic,
endpointMgr: kafka.NewEndpointManager(backend),
}
- logger.Debugw("topics", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
+ logger.Debugw(ctx, "topics", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
return &proxy
}
@@ -54,14 +54,14 @@
toDeviceId string,
proxyDeviceId string,
messageId string) error {
- logger.Debugw("sending-inter-adapter-message", log.Fields{"type": msgType, "from": fromAdapter,
+ logger.Debugw(ctx, "sending-inter-adapter-message", log.Fields{"type": msgType, "from": fromAdapter,
"to": toAdapter, "toDevice": toDeviceId, "proxyDevice": proxyDeviceId})
//Marshal the message
var marshalledMsg *any.Any
var err error
if marshalledMsg, err = ptypes.MarshalAny(msg); err != nil {
- logger.Warnw("cannot-marshal-msg", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-marshal-msg", log.Fields{"error": err})
return err
}
@@ -90,7 +90,7 @@
}
// Set up the required rpc arguments
- endpoint, err := ap.endpointMgr.GetEndpoint(toDeviceId, toAdapter)
+ endpoint, err := ap.endpointMgr.GetEndpoint(ctx, toDeviceId, toAdapter)
if err != nil {
return err
}
@@ -99,6 +99,6 @@
rpc := "process_inter_adapter_message"
success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, proxyDeviceId, args...)
- logger.Debugw("inter-adapter-msg-response", log.Fields{"replyTopic": replyToTopic, "success": success})
- return unPackResponse(rpc, "", success, result)
+ logger.Debugw(ctx, "inter-adapter-msg-response", log.Fields{"replyTopic": replyToTopic, "success": success})
+ return unPackResponse(ctx, rpc, "", success, result)
}
diff --git a/pkg/adapters/common/adapter_proxy_test.go b/pkg/adapters/common/adapter_proxy_test.go
index 3ba8290..01a6603 100644
--- a/pkg/adapters/common/adapter_proxy_test.go
+++ b/pkg/adapters/common/adapter_proxy_test.go
@@ -39,10 +39,11 @@
func init() {
+ ctx := context.Background()
var err error
embedEtcdServerPort, err = freeport.GetFreePort()
if err != nil {
- logger.Fatal("Cannot get freeport for KvClient")
+ logger.Fatal(ctx, "Cannot get freeport for KvClient")
}
}
@@ -54,8 +55,8 @@
Response: &voltha.Device{Id: "testDeviceId"},
},
}
- backend := db.NewBackend("etcd", embedEtcdServerHost+":"+strconv.Itoa(embedEtcdServerPort), defaultTimeout, defaultPathPrefix)
- adapter := NewAdapterProxy(mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic", backend)
+ backend := db.NewBackend(context.Background(), "etcd", embedEtcdServerHost+":"+strconv.Itoa(embedEtcdServerPort), defaultTimeout, defaultPathPrefix)
+ adapter := NewAdapterProxy(context.Background(), mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic", backend)
assert.NotNil(t, adapter)
}
@@ -68,9 +69,9 @@
Response: &voltha.Device{Id: "testDeviceId"},
},
}
- backend := db.NewBackend("etcd", embedEtcdServerHost+":"+strconv.Itoa(embedEtcdServerPort), defaultTimeout, defaultPathPrefix)
+ backend := db.NewBackend(context.Background(), "etcd", embedEtcdServerHost+":"+strconv.Itoa(embedEtcdServerPort), defaultTimeout, defaultPathPrefix)
- adapter := NewAdapterProxy(mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic", backend)
+ adapter := NewAdapterProxy(context.Background(), mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic", backend)
adapter.endpointMgr = mocks.NewEndpointManager()
@@ -112,9 +113,9 @@
Response: &voltha.Device{Id: "testDeviceId"},
},
}
- backend := db.NewBackend("etcd", embedEtcdServerHost+":"+strconv.Itoa(embedEtcdServerPort), defaultTimeout, defaultPathPrefix)
+ backend := db.NewBackend(context.Background(), "etcd", embedEtcdServerHost+":"+strconv.Itoa(embedEtcdServerPort), defaultTimeout, defaultPathPrefix)
- adapter := NewAdapterProxy(mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic", backend)
+ adapter := NewAdapterProxy(context.Background(), mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic", backend)
adapter.endpointMgr = mocks.NewEndpointManager()
@@ -140,9 +141,9 @@
Response: &voltha.Device{Id: "testDeviceId"},
},
}
- backend := db.NewBackend("etcd", embedEtcdServerHost+":"+strconv.Itoa(embedEtcdServerPort), defaultTimeout, defaultPathPrefix)
+ backend := db.NewBackend(context.Background(), "etcd", embedEtcdServerHost+":"+strconv.Itoa(embedEtcdServerPort), defaultTimeout, defaultPathPrefix)
- adapter := NewAdapterProxy(mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic", backend)
+ adapter := NewAdapterProxy(context.Background(), mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic", backend)
adapter.endpointMgr = mocks.NewEndpointManager()
diff --git a/pkg/adapters/common/common.go b/pkg/adapters/common/common.go
index 95a036d..ad8b11b 100644
--- a/pkg/adapters/common/common.go
+++ b/pkg/adapters/common/common.go
@@ -19,12 +19,12 @@
"github.com/opencord/voltha-lib-go/v3/pkg/log"
)
-var logger log.Logger
+var logger log.CLogger
func init() {
// Setup this package so that it's log level can be modified at run time
var err error
- logger, err = log.AddPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "common"})
+ logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "common"})
if err != nil {
panic(err)
}
diff --git a/pkg/adapters/common/core_proxy.go b/pkg/adapters/common/core_proxy.go
index 20e1a52..28b532f 100644
--- a/pkg/adapters/common/core_proxy.go
+++ b/pkg/adapters/common/core_proxy.go
@@ -37,28 +37,28 @@
lockDeviceIdCoreMap sync.RWMutex
}
-func NewCoreProxy(kafkaProxy kafka.InterContainerProxy, adapterTopic string, coreTopic string) *CoreProxy {
+func NewCoreProxy(ctx context.Context, kafkaProxy kafka.InterContainerProxy, adapterTopic string, coreTopic string) *CoreProxy {
var proxy CoreProxy
proxy.kafkaICProxy = kafkaProxy
proxy.adapterTopic = adapterTopic
proxy.coreTopic = coreTopic
proxy.deviceIdCoreMap = make(map[string]string)
proxy.lockDeviceIdCoreMap = sync.RWMutex{}
- logger.Debugw("TOPICS", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
+ logger.Debugw(ctx, "TOPICS", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
return &proxy
}
-func unPackResponse(rpc string, deviceId string, success bool, response *a.Any) error {
+func unPackResponse(ctx context.Context, rpc string, deviceId string, success bool, response *a.Any) error {
if success {
return nil
} else {
unpackResult := &ic.Error{}
var err error
if err = ptypes.UnmarshalAny(response, unpackResult); err != nil {
- logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
}
- logger.Debugw("response", log.Fields{"rpc": rpc, "deviceId": deviceId, "success": success, "error": err})
+ logger.Debugw(ctx, "response", log.Fields{"rpc": rpc, "deviceId": deviceId, "success": success, "error": err})
// TODO: Need to get the real error code
return status.Errorf(codes.Canceled, "%s", unpackResult.Reason)
}
@@ -94,18 +94,18 @@
}
func (ap *CoreProxy) RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) error {
- logger.Debugw("registering-adapter", log.Fields{"coreTopic": ap.coreTopic, "adapterTopic": ap.adapterTopic})
+ logger.Debugw(ctx, "registering-adapter", log.Fields{"coreTopic": ap.coreTopic, "adapterTopic": ap.adapterTopic})
rpc := "Register"
topic := kafka.Topic{Name: ap.coreTopic}
replyToTopic := ap.getAdapterTopic()
args := make([]*kafka.KVArg, 2)
if adapter.TotalReplicas == 0 && adapter.CurrentReplica != 0 {
- log.Fatal("totalReplicas can't be 0, since you're here you have at least one")
+ logger.Fatal(ctx, "totalReplicas can't be 0, since you're here you have at least one")
}
if adapter.CurrentReplica == 0 && adapter.TotalReplicas != 0 {
- log.Fatal("currentReplica can't be 0, it has to start from 1")
+ logger.Fatal(ctx, "currentReplica can't be 0, it has to start from 1")
}
if adapter.CurrentReplica == 0 && adapter.TotalReplicas == 0 {
@@ -117,7 +117,7 @@
}
if adapter.CurrentReplica > adapter.TotalReplicas {
- log.Fatalf("CurrentReplica (%d) can't be greater than TotalReplicas (%d)",
+ logger.Fatalf(ctx, "CurrentReplica (%d) can't be greater than TotalReplicas (%d)",
adapter.CurrentReplica, adapter.TotalReplicas)
}
@@ -131,12 +131,12 @@
}
success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, "", args...)
- logger.Debugw("Register-Adapter-response", log.Fields{"replyTopic": replyToTopic, "success": success})
- return unPackResponse(rpc, "", success, result)
+ logger.Debugw(ctx, "Register-Adapter-response", log.Fields{"replyTopic": replyToTopic, "success": success})
+ return unPackResponse(ctx, rpc, "", success, result)
}
func (ap *CoreProxy) DeviceUpdate(ctx context.Context, device *voltha.Device) error {
- logger.Debugw("DeviceUpdate", log.Fields{"deviceId": device.Id})
+ logger.Debugw(ctx, "DeviceUpdate", log.Fields{"deviceId": device.Id})
rpc := "DeviceUpdate"
toTopic := ap.getCoreTopic(device.Id)
args := make([]*kafka.KVArg, 1)
@@ -147,12 +147,12 @@
// Use a device specific topic as we are the only adaptercore handling requests for this device
replyToTopic := ap.getAdapterTopic()
success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, device.Id, args...)
- logger.Debugw("DeviceUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
- return unPackResponse(rpc, device.Id, success, result)
+ logger.Debugw(ctx, "DeviceUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
+ return unPackResponse(ctx, rpc, device.Id, success, result)
}
func (ap *CoreProxy) PortCreated(ctx context.Context, deviceId string, port *voltha.Port) error {
- logger.Debugw("PortCreated", log.Fields{"portNo": port.PortNo})
+ logger.Debugw(ctx, "PortCreated", log.Fields{"portNo": port.PortNo})
rpc := "PortCreated"
// Use a device specific topic to send the request. The adapter handling the device creates a device
// specific topic
@@ -171,12 +171,12 @@
// Use a device specific topic as we are the only adaptercore handling requests for this device
replyToTopic := ap.getAdapterTopic()
success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
- logger.Debugw("PortCreated-response", log.Fields{"deviceId": deviceId, "success": success})
- return unPackResponse(rpc, deviceId, success, result)
+ logger.Debugw(ctx, "PortCreated-response", log.Fields{"deviceId": deviceId, "success": success})
+ return unPackResponse(ctx, rpc, deviceId, success, result)
}
func (ap *CoreProxy) PortsStateUpdate(ctx context.Context, deviceId string, operStatus voltha.OperStatus_Types) error {
- logger.Debugw("PortsStateUpdate", log.Fields{"deviceId": deviceId})
+ logger.Debugw(ctx, "PortsStateUpdate", log.Fields{"deviceId": deviceId})
rpc := "PortsStateUpdate"
// Use a device specific topic to send the request. The adapter handling the device creates a device
// specific topic
@@ -197,12 +197,12 @@
// Use a device specific topic as we are the only adaptercore handling requests for this device
replyToTopic := ap.getAdapterTopic()
success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
- logger.Debugw("PortsStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
- return unPackResponse(rpc, deviceId, success, result)
+ logger.Debugw(ctx, "PortsStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
+ return unPackResponse(ctx, rpc, deviceId, success, result)
}
func (ap *CoreProxy) DeleteAllPorts(ctx context.Context, deviceId string) error {
- logger.Debugw("DeleteAllPorts", log.Fields{"deviceId": deviceId})
+ logger.Debugw(ctx, "DeleteAllPorts", log.Fields{"deviceId": deviceId})
rpc := "DeleteAllPorts"
// Use a device specific topic to send the request. The adapter handling the device creates a device
// specific topic
@@ -218,13 +218,13 @@
// Use a device specific topic as we are the only adaptercore handling requests for this device
replyToTopic := ap.getAdapterTopic()
success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
- logger.Debugw("DeleteAllPorts-response", log.Fields{"deviceId": deviceId, "success": success})
- return unPackResponse(rpc, deviceId, success, result)
+ logger.Debugw(ctx, "DeleteAllPorts-response", log.Fields{"deviceId": deviceId, "success": success})
+ return unPackResponse(ctx, rpc, deviceId, success, result)
}
func (ap *CoreProxy) DeviceStateUpdate(ctx context.Context, deviceId string,
connStatus voltha.ConnectStatus_Types, operStatus voltha.OperStatus_Types) error {
- logger.Debugw("DeviceStateUpdate", log.Fields{"deviceId": deviceId})
+ logger.Debugw(ctx, "DeviceStateUpdate", log.Fields{"deviceId": deviceId})
rpc := "DeviceStateUpdate"
// Use a device specific topic to send the request. The adapter handling the device creates a device
// specific topic
@@ -249,13 +249,13 @@
// Use a device specific topic as we are the only adaptercore handling requests for this device
replyToTopic := ap.getAdapterTopic()
success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
- logger.Debugw("DeviceStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
- return unPackResponse(rpc, deviceId, success, result)
+ logger.Debugw(ctx, "DeviceStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
+ return unPackResponse(ctx, rpc, deviceId, success, result)
}
func (ap *CoreProxy) ChildDeviceDetected(ctx context.Context, parentDeviceId string, parentPortNo int,
childDeviceType string, channelId int, vendorId string, serialNumber string, onuId int64) (*voltha.Device, error) {
- logger.Debugw("ChildDeviceDetected", log.Fields{"pDeviceId": parentDeviceId, "channelId": channelId})
+ logger.Debugw(ctx, "ChildDeviceDetected", log.Fields{"pDeviceId": parentDeviceId, "channelId": channelId})
rpc := "ChildDeviceDetected"
// Use a device specific topic to send the request. The adapter handling the device creates a device
// specific topic
@@ -300,12 +300,12 @@
}
success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
- logger.Debugw("ChildDeviceDetected-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
+ logger.Debugw(ctx, "ChildDeviceDetected-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
if success {
volthaDevice := &voltha.Device{}
if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
- logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
return nil, status.Error(codes.InvalidArgument, err.Error())
}
return volthaDevice, nil
@@ -313,17 +313,17 @@
unpackResult := &ic.Error{}
var err error
if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
- logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
}
- logger.Debugw("ChildDeviceDetected-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
+ logger.Debugw(ctx, "ChildDeviceDetected-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
- return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(unpackResult.Code), unpackResult.Reason)
+ return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, unpackResult.Code), unpackResult.Reason)
}
}
func (ap *CoreProxy) ChildDevicesLost(ctx context.Context, parentDeviceId string) error {
- logger.Debugw("ChildDevicesLost", log.Fields{"pDeviceId": parentDeviceId})
+ logger.Debugw(ctx, "ChildDevicesLost", log.Fields{"pDeviceId": parentDeviceId})
rpc := "ChildDevicesLost"
// Use a device specific topic to send the request. The adapter handling the device creates a device
// specific topic
@@ -338,12 +338,12 @@
}
success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
- logger.Debugw("ChildDevicesLost-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
- return unPackResponse(rpc, parentDeviceId, success, result)
+ logger.Debugw(ctx, "ChildDevicesLost-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
+ return unPackResponse(ctx, rpc, parentDeviceId, success, result)
}
func (ap *CoreProxy) ChildDevicesDetected(ctx context.Context, parentDeviceId string) error {
- logger.Debugw("ChildDevicesDetected", log.Fields{"pDeviceId": parentDeviceId})
+ logger.Debugw(ctx, "ChildDevicesDetected", log.Fields{"pDeviceId": parentDeviceId})
rpc := "ChildDevicesDetected"
// Use a device specific topic to send the request. The adapter handling the device creates a device
// specific topic
@@ -358,12 +358,12 @@
}
success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
- logger.Debugw("ChildDevicesDetected-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
- return unPackResponse(rpc, parentDeviceId, success, result)
+ logger.Debugw(ctx, "ChildDevicesDetected-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
+ return unPackResponse(ctx, rpc, parentDeviceId, success, result)
}
func (ap *CoreProxy) GetDevice(ctx context.Context, parentDeviceId string, deviceId string) (*voltha.Device, error) {
- logger.Debugw("GetDevice", log.Fields{"deviceId": deviceId})
+ logger.Debugw(ctx, "GetDevice", log.Fields{"deviceId": deviceId})
rpc := "GetDevice"
toTopic := ap.getCoreTopic(parentDeviceId)
@@ -377,12 +377,12 @@
}
success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
- logger.Debugw("GetDevice-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
+ logger.Debugw(ctx, "GetDevice-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
if success {
volthaDevice := &voltha.Device{}
if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
- logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
return nil, status.Error(codes.InvalidArgument, err.Error())
}
return volthaDevice, nil
@@ -390,16 +390,16 @@
unpackResult := &ic.Error{}
var err error
if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
- logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
}
- logger.Debugw("GetDevice-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
+ logger.Debugw(ctx, "GetDevice-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
// TODO: Need to get the real error code
- return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(unpackResult.Code), unpackResult.Reason)
+ return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, unpackResult.Code), unpackResult.Reason)
}
}
func (ap *CoreProxy) GetChildDevice(ctx context.Context, parentDeviceId string, kwargs map[string]interface{}) (*voltha.Device, error) {
- logger.Debugw("GetChildDevice", log.Fields{"parentDeviceId": parentDeviceId, "kwargs": kwargs})
+ logger.Debugw(ctx, "GetChildDevice", log.Fields{"parentDeviceId": parentDeviceId, "kwargs": kwargs})
rpc := "GetChildDevice"
toTopic := ap.getCoreTopic(parentDeviceId)
@@ -437,12 +437,12 @@
}
success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
- logger.Debugw("GetChildDevice-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
+ logger.Debugw(ctx, "GetChildDevice-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
if success {
volthaDevice := &voltha.Device{}
if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
- logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
return nil, status.Error(codes.InvalidArgument, err.Error())
}
return volthaDevice, nil
@@ -450,16 +450,16 @@
unpackResult := &ic.Error{}
var err error
if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
- logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
}
- logger.Debugw("GetChildDevice-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
+ logger.Debugw(ctx, "GetChildDevice-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
- return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(unpackResult.Code), unpackResult.Reason)
+ return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, unpackResult.Code), unpackResult.Reason)
}
}
func (ap *CoreProxy) GetChildDevices(ctx context.Context, parentDeviceId string) (*voltha.Devices, error) {
- logger.Debugw("GetChildDevices", log.Fields{"parentDeviceId": parentDeviceId})
+ logger.Debugw(ctx, "GetChildDevices", log.Fields{"parentDeviceId": parentDeviceId})
rpc := "GetChildDevices"
toTopic := ap.getCoreTopic(parentDeviceId)
@@ -473,12 +473,12 @@
}
success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
- logger.Debugw("GetChildDevices-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
+ logger.Debugw(ctx, "GetChildDevices-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
if success {
volthaDevices := &voltha.Devices{}
if err := ptypes.UnmarshalAny(result, volthaDevices); err != nil {
- logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
return nil, status.Error(codes.InvalidArgument, err.Error())
}
return volthaDevices, nil
@@ -486,16 +486,16 @@
unpackResult := &ic.Error{}
var err error
if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
- logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
}
- logger.Debugw("GetChildDevices-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
+ logger.Debugw(ctx, "GetChildDevices-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
- return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(unpackResult.Code), unpackResult.Reason)
+ return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, unpackResult.Code), unpackResult.Reason)
}
}
func (ap *CoreProxy) SendPacketIn(ctx context.Context, deviceId string, port uint32, pktPayload []byte) error {
- logger.Debugw("SendPacketIn", log.Fields{"deviceId": deviceId, "port": port, "pktPayload": pktPayload})
+ logger.Debugw(ctx, "SendPacketIn", log.Fields{"deviceId": deviceId, "port": port, "pktPayload": pktPayload})
rpc := "PacketIn"
// Use a device specific topic to send the request. The adapter handling the device creates a device
// specific topic
@@ -519,12 +519,12 @@
Value: pkt,
}
success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
- logger.Debugw("SendPacketIn-response", log.Fields{"pDeviceId": deviceId, "success": success})
- return unPackResponse(rpc, deviceId, success, result)
+ logger.Debugw(ctx, "SendPacketIn-response", log.Fields{"pDeviceId": deviceId, "success": success})
+ return unPackResponse(ctx, rpc, deviceId, success, result)
}
func (ap *CoreProxy) DeviceReasonUpdate(ctx context.Context, deviceId string, deviceReason string) error {
- logger.Debugw("DeviceReasonUpdate", log.Fields{"deviceId": deviceId, "deviceReason": deviceReason})
+ logger.Debugw(ctx, "DeviceReasonUpdate", log.Fields{"deviceId": deviceId, "deviceReason": deviceReason})
rpc := "DeviceReasonUpdate"
// Use a device specific topic to send the request. The adapter handling the device creates a device
// specific topic
@@ -543,12 +543,12 @@
Value: reason,
}
success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
- logger.Debugw("DeviceReason-response", log.Fields{"pDeviceId": deviceId, "success": success})
- return unPackResponse(rpc, deviceId, success, result)
+ logger.Debugw(ctx, "DeviceReason-response", log.Fields{"pDeviceId": deviceId, "success": success})
+ return unPackResponse(ctx, rpc, deviceId, success, result)
}
func (ap *CoreProxy) DevicePMConfigUpdate(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
- logger.Debugw("DevicePMConfigUpdate", log.Fields{"pmConfigs": pmConfigs})
+ logger.Debugw(ctx, "DevicePMConfigUpdate", log.Fields{"pmConfigs": pmConfigs})
rpc := "DevicePMConfigUpdate"
// Use a device specific topic to send the request. The adapter handling the device creates a device
// specific topic
@@ -561,12 +561,12 @@
Value: pmConfigs,
}
success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, pmConfigs.Id, args...)
- logger.Debugw("DevicePMConfigUpdate-response", log.Fields{"pDeviceId": pmConfigs.Id, "success": success})
- return unPackResponse(rpc, pmConfigs.Id, success, result)
+ logger.Debugw(ctx, "DevicePMConfigUpdate-response", log.Fields{"pDeviceId": pmConfigs.Id, "success": success})
+ return unPackResponse(ctx, rpc, pmConfigs.Id, success, result)
}
func (ap *CoreProxy) ReconcileChildDevices(ctx context.Context, parentDeviceId string) error {
- logger.Debugw("ReconcileChildDevices", log.Fields{"parentDeviceId": parentDeviceId})
+ logger.Debugw(ctx, "ReconcileChildDevices", log.Fields{"parentDeviceId": parentDeviceId})
rpc := "ReconcileChildDevices"
// Use a device specific topic to send the request. The adapter handling the device creates a device
// specific topic
@@ -578,13 +578,13 @@
}
success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
- logger.Debugw("ReconcileChildDevices-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
- return unPackResponse(rpc, parentDeviceId, success, result)
+ logger.Debugw(ctx, "ReconcileChildDevices-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
+ return unPackResponse(ctx, rpc, parentDeviceId, success, result)
}
func (ap *CoreProxy) PortStateUpdate(ctx context.Context, deviceId string, pType voltha.Port_PortType, portNum uint32,
operStatus voltha.OperStatus_Types) error {
- logger.Debugw("PortStateUpdate", log.Fields{"deviceId": deviceId, "portType": pType, "portNo": portNum, "operation_status": operStatus})
+ logger.Debugw(ctx, "PortStateUpdate", log.Fields{"deviceId": deviceId, "portType": pType, "portNo": portNum, "operation_status": operStatus})
rpc := "PortStateUpdate"
// Use a device specific topic to send the request. The adapter handling the device creates a device
// specific topic
@@ -615,6 +615,6 @@
// Use a device specific topic as we are the only adaptercore handling requests for this device
replyToTopic := ap.getAdapterTopic()
success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
- logger.Debugw("PortStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
- return unPackResponse(rpc, deviceId, success, result)
+ logger.Debugw(ctx, "PortStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
+ return unPackResponse(ctx, rpc, deviceId, success, result)
}
diff --git a/pkg/adapters/common/core_proxy_test.go b/pkg/adapters/common/core_proxy_test.go
index a1b4290..2fb4df7 100644
--- a/pkg/adapters/common/core_proxy_test.go
+++ b/pkg/adapters/common/core_proxy_test.go
@@ -45,7 +45,7 @@
},
}
- proxy := NewCoreProxy(&mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic")
+ proxy := NewCoreProxy(context.Background(), &mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic")
adapter := &voltha.Adapter{
Id: "testAdapter",
@@ -88,7 +88,7 @@
},
}
- proxy := NewCoreProxy(&mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic")
+ proxy := NewCoreProxy(context.Background(), &mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic")
adapter := &voltha.Adapter{
Id: "testAdapter",
@@ -128,7 +128,7 @@
},
}
- proxy := NewCoreProxy(&mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic")
+ proxy := NewCoreProxy(context.Background(), &mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic")
kwargs := make(map[string]interface{})
kwargs["serial_number"] = "TEST00000000001"
@@ -159,7 +159,7 @@
},
}
- proxy := NewCoreProxy(&mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic")
+ proxy := NewCoreProxy(context.Background(), &mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic")
kwargs := make(map[string]interface{})
kwargs["onu_id"] = uint32(1234)
@@ -190,7 +190,7 @@
},
}
- proxy := NewCoreProxy(&mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic")
+ proxy := NewCoreProxy(context.Background(), &mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic")
kwargs := make(map[string]interface{})
kwargs["onu_id"] = uint32(1234)
@@ -213,7 +213,7 @@
},
}
- proxy := NewCoreProxy(&mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic")
+ proxy := NewCoreProxy(context.Background(), &mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic")
kwargs := make(map[string]interface{})
kwargs["onu_id"] = uint32(1234)
@@ -241,7 +241,7 @@
},
}
- proxy := NewCoreProxy(&mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic")
+ proxy := NewCoreProxy(context.Background(), &mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic")
parentDeviceId := "aabbcc"
devices, error := proxy.GetChildDevices(context.TODO(), parentDeviceId)
@@ -268,7 +268,7 @@
},
}
- proxy := NewCoreProxy(&mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic")
+ proxy := NewCoreProxy(context.Background(), &mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic")
parentDeviceId := "aabbcc"
devices, error := proxy.GetChildDevices(context.TODO(), parentDeviceId)
@@ -288,7 +288,7 @@
},
}
- proxy := NewCoreProxy(&mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic")
+ proxy := NewCoreProxy(context.Background(), &mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic")
parentDeviceId := "aabbcc"
devices, error := proxy.GetChildDevices(context.TODO(), parentDeviceId)
diff --git a/pkg/adapters/common/events_proxy.go b/pkg/adapters/common/events_proxy.go
index da9c9eb..b79bafe 100644
--- a/pkg/adapters/common/events_proxy.go
+++ b/pkg/adapters/common/events_proxy.go
@@ -17,6 +17,7 @@
package common
import (
+ "context"
"errors"
"fmt"
"strconv"
@@ -96,9 +97,9 @@
}
/* Send out device events*/
-func (ep *EventProxy) SendDeviceEvent(deviceEvent *voltha.DeviceEvent, category adapterif.EventCategory, subCategory adapterif.EventSubCategory, raisedTs int64) error {
+func (ep *EventProxy) SendDeviceEvent(ctx context.Context, deviceEvent *voltha.DeviceEvent, category adapterif.EventCategory, subCategory adapterif.EventSubCategory, raisedTs int64) error {
if deviceEvent == nil {
- logger.Error("Recieved empty device event")
+ logger.Error(ctx, "Recieved empty device event")
return errors.New("Device event nil")
}
var event voltha.Event
@@ -109,11 +110,11 @@
return err
}
event.EventType = &de
- if err := ep.sendEvent(&event); err != nil {
- logger.Errorw("Failed to send device event to KAFKA bus", log.Fields{"device-event": deviceEvent})
+ if err := ep.sendEvent(ctx, &event); err != nil {
+ logger.Errorw(ctx, "Failed to send device event to KAFKA bus", log.Fields{"device-event": deviceEvent})
return err
}
- logger.Infow("Successfully sent device event KAFKA", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
+ logger.Infow(ctx, "Successfully sent device event KAFKA", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
"SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
"ReportedTs": event.Header.ReportedTs, "ResourceId": deviceEvent.ResourceId, "Context": deviceEvent.Context,
"DeviceEventName": deviceEvent.DeviceEventName})
@@ -123,9 +124,9 @@
}
// SendKpiEvent is to send kpi events to voltha.event topic
-func (ep *EventProxy) SendKpiEvent(id string, kpiEvent *voltha.KpiEvent2, category adapterif.EventCategory, subCategory adapterif.EventSubCategory, raisedTs int64) error {
+func (ep *EventProxy) SendKpiEvent(ctx context.Context, id string, kpiEvent *voltha.KpiEvent2, category adapterif.EventCategory, subCategory adapterif.EventSubCategory, raisedTs int64) error {
if kpiEvent == nil {
- logger.Error("Recieved empty kpi event")
+ logger.Error(ctx, "Recieved empty kpi event")
return errors.New("KPI event nil")
}
var event voltha.Event
@@ -136,11 +137,11 @@
return err
}
event.EventType = &de
- if err := ep.sendEvent(&event); err != nil {
- logger.Errorw("Failed to send kpi event to KAFKA bus", log.Fields{"device-event": kpiEvent})
+ if err := ep.sendEvent(ctx, &event); err != nil {
+ logger.Errorw(ctx, "Failed to send kpi event to KAFKA bus", log.Fields{"device-event": kpiEvent})
return err
}
- logger.Infow("Successfully sent kpi event to KAFKA", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
+ logger.Infow(ctx, "Successfully sent kpi event to KAFKA", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
"SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
"ReportedTs": event.Header.ReportedTs, "KpiEventName": "STATS_EVENT"})
@@ -150,11 +151,11 @@
/* TODO: Send out KPI events*/
-func (ep *EventProxy) sendEvent(event *voltha.Event) error {
- if err := ep.kafkaClient.Send(event, &ep.eventTopic); err != nil {
+func (ep *EventProxy) sendEvent(ctx context.Context, event *voltha.Event) error {
+ if err := ep.kafkaClient.Send(ctx, event, &ep.eventTopic); err != nil {
return err
}
- logger.Debugw("Sent event to kafka", log.Fields{"event": event})
+ logger.Debugw(ctx, "Sent event to kafka", log.Fields{"event": event})
return nil
}
diff --git a/pkg/adapters/common/request_handler.go b/pkg/adapters/common/request_handler.go
index 62d8cdd..a92ed51 100644
--- a/pkg/adapters/common/request_handler.go
+++ b/pkg/adapters/common/request_handler.go
@@ -16,6 +16,7 @@
package common
import (
+ "context"
"errors"
"github.com/golang/protobuf/ptypes"
@@ -58,9 +59,9 @@
return nil, nil
}
-func (rhp *RequestHandlerProxy) Adopt_device(args []*ic.Argument) (*empty.Empty, error) {
+func (rhp *RequestHandlerProxy) Adopt_device(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 3 {
- logger.Warn("invalid-number-of-args", log.Fields{"args": args})
+ logger.Warn(ctx, "invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
@@ -71,38 +72,38 @@
switch arg.Key {
case "device":
if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
- logger.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-device", log.Fields{"error": err})
return nil, err
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- logger.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
return nil, err
}
case kafka.FromTopic:
if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
- logger.Warnw("cannot-unmarshal-from-topic", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-from-topic", log.Fields{"error": err})
return nil, err
}
}
}
- logger.Debugw("Adopt_device", log.Fields{"deviceId": device.Id})
+ logger.Debugw(ctx, "Adopt_device", log.Fields{"deviceId": device.Id})
//Update the core reference for that device
rhp.coreProxy.UpdateCoreReference(device.Id, fromTopic.Val)
//Invoke the adopt device on the adapter
- if err := rhp.adapter.Adopt_device(device); err != nil {
+ if err := rhp.adapter.Adopt_device(ctx, device); err != nil {
return nil, status.Errorf(codes.NotFound, "%s", err.Error())
}
return new(empty.Empty), nil
}
-func (rhp *RequestHandlerProxy) Reconcile_device(args []*ic.Argument) (*empty.Empty, error) {
+func (rhp *RequestHandlerProxy) Reconcile_device(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 3 {
- logger.Warn("invalid-number-of-args", log.Fields{"args": args})
+ logger.Warn(ctx, "invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
@@ -114,17 +115,17 @@
switch arg.Key {
case "device":
if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
- logger.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-device", log.Fields{"error": err})
return nil, err
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- logger.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
return nil, err
}
case kafka.FromTopic:
if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
- logger.Warnw("cannot-unmarshal-from-topic", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-from-topic", log.Fields{"error": err})
return nil, err
}
}
@@ -133,7 +134,7 @@
rhp.coreProxy.UpdateCoreReference(device.Id, fromTopic.Val)
//Invoke the reconcile device API on the adapter
- if err := rhp.adapter.Reconcile_device(device); err != nil {
+ if err := rhp.adapter.Reconcile_device(ctx, device); err != nil {
return nil, status.Errorf(codes.NotFound, "%s", err.Error())
}
return new(empty.Empty), nil
@@ -143,9 +144,9 @@
return new(empty.Empty), nil
}
-func (rhp *RequestHandlerProxy) Disable_device(args []*ic.Argument) (*empty.Empty, error) {
+func (rhp *RequestHandlerProxy) Disable_device(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 3 {
- logger.Warn("invalid-number-of-args", log.Fields{"args": args})
+ logger.Warn(ctx, "invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
@@ -157,17 +158,17 @@
switch arg.Key {
case "device":
if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
- logger.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-device", log.Fields{"error": err})
return nil, err
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- logger.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
return nil, err
}
case kafka.FromTopic:
if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
- logger.Warnw("cannot-unmarshal-from-topic", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-from-topic", log.Fields{"error": err})
return nil, err
}
}
@@ -175,15 +176,15 @@
//Update the core reference for that device
rhp.coreProxy.UpdateCoreReference(device.Id, fromTopic.Val)
//Invoke the Disable_device API on the adapter
- if err := rhp.adapter.Disable_device(device); err != nil {
+ if err := rhp.adapter.Disable_device(ctx, device); err != nil {
return nil, status.Errorf(codes.NotFound, "%s", err.Error())
}
return new(empty.Empty), nil
}
-func (rhp *RequestHandlerProxy) Reenable_device(args []*ic.Argument) (*empty.Empty, error) {
+func (rhp *RequestHandlerProxy) Reenable_device(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 3 {
- logger.Warn("invalid-number-of-args", log.Fields{"args": args})
+ logger.Warn(ctx, "invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
@@ -195,17 +196,17 @@
switch arg.Key {
case "device":
if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
- logger.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-device", log.Fields{"error": err})
return nil, err
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- logger.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
return nil, err
}
case kafka.FromTopic:
if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
- logger.Warnw("cannot-unmarshal-from-topic", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-from-topic", log.Fields{"error": err})
return nil, err
}
}
@@ -213,15 +214,15 @@
//Update the core reference for that device
rhp.coreProxy.UpdateCoreReference(device.Id, fromTopic.Val)
//Invoke the Reenable_device API on the adapter
- if err := rhp.adapter.Reenable_device(device); err != nil {
+ if err := rhp.adapter.Reenable_device(ctx, device); err != nil {
return nil, status.Errorf(codes.NotFound, "%s", err.Error())
}
return new(empty.Empty), nil
}
-func (rhp *RequestHandlerProxy) Reboot_device(args []*ic.Argument) (*empty.Empty, error) {
+func (rhp *RequestHandlerProxy) Reboot_device(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 3 {
- logger.Warn("invalid-number-of-args", log.Fields{"args": args})
+ logger.Warn(ctx, "invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
@@ -233,17 +234,17 @@
switch arg.Key {
case "device":
if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
- logger.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-device", log.Fields{"error": err})
return nil, err
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- logger.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
return nil, err
}
case kafka.FromTopic:
if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
- logger.Warnw("cannot-unmarshal-from-topic", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-from-topic", log.Fields{"error": err})
return nil, err
}
}
@@ -251,7 +252,7 @@
//Update the core reference for that device
rhp.coreProxy.UpdateCoreReference(device.Id, fromTopic.Val)
//Invoke the Reboot_device API on the adapter
- if err := rhp.adapter.Reboot_device(device); err != nil {
+ if err := rhp.adapter.Reboot_device(ctx, device); err != nil {
return nil, status.Errorf(codes.NotFound, "%s", err.Error())
}
return new(empty.Empty), nil
@@ -262,9 +263,9 @@
return new(empty.Empty), nil
}
-func (rhp *RequestHandlerProxy) Delete_device(args []*ic.Argument) (*empty.Empty, error) {
+func (rhp *RequestHandlerProxy) Delete_device(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 3 {
- logger.Warn("invalid-number-of-args", log.Fields{"args": args})
+ logger.Warn(ctx, "invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
@@ -276,17 +277,17 @@
switch arg.Key {
case "device":
if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
- logger.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-device", log.Fields{"error": err})
return nil, err
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- logger.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
return nil, err
}
case kafka.FromTopic:
if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
- logger.Warnw("cannot-unmarshal-from-topic", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-from-topic", log.Fields{"error": err})
return nil, err
}
}
@@ -294,7 +295,7 @@
//Update the core reference for that device
rhp.coreProxy.UpdateCoreReference(device.Id, fromTopic.Val)
//Invoke the delete_device API on the adapter
- if err := rhp.adapter.Delete_device(device); err != nil {
+ if err := rhp.adapter.Delete_device(ctx, device); err != nil {
return nil, status.Errorf(codes.NotFound, "%s", err.Error())
}
return new(empty.Empty), nil
@@ -304,10 +305,10 @@
return new(empty.Empty), nil
}
-func (rhp *RequestHandlerProxy) Update_flows_bulk(args []*ic.Argument) (*empty.Empty, error) {
- logger.Debug("Update_flows_bulk")
+func (rhp *RequestHandlerProxy) Update_flows_bulk(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
+ logger.Debug(ctx, "Update_flows_bulk")
if len(args) < 5 {
- logger.Warn("Update_flows_bulk-invalid-number-of-args", log.Fields{"args": args})
+ logger.Warn(ctx, "Update_flows_bulk-invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
@@ -320,43 +321,43 @@
switch arg.Key {
case "device":
if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
- logger.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-device", log.Fields{"error": err})
return nil, err
}
case "flows":
if err := ptypes.UnmarshalAny(arg.Value, flows); err != nil {
- logger.Warnw("cannot-unmarshal-flows", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-flows", log.Fields{"error": err})
return nil, err
}
case "groups":
if err := ptypes.UnmarshalAny(arg.Value, groups); err != nil {
- logger.Warnw("cannot-unmarshal-groups", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-groups", log.Fields{"error": err})
return nil, err
}
case "flow_metadata":
if err := ptypes.UnmarshalAny(arg.Value, flowMetadata); err != nil {
- logger.Warnw("cannot-unmarshal-metadata", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-metadata", log.Fields{"error": err})
return nil, err
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- logger.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
return nil, err
}
}
}
- logger.Debugw("Update_flows_bulk", log.Fields{"flows": flows, "groups": groups})
+ logger.Debugw(ctx, "Update_flows_bulk", log.Fields{"flows": flows, "groups": groups})
//Invoke the bulk flow update API of the adapter
- if err := rhp.adapter.Update_flows_bulk(device, flows, groups, flowMetadata); err != nil {
+ if err := rhp.adapter.Update_flows_bulk(ctx, device, flows, groups, flowMetadata); err != nil {
return nil, status.Errorf(codes.NotFound, "%s", err.Error())
}
return new(empty.Empty), nil
}
-func (rhp *RequestHandlerProxy) Update_flows_incrementally(args []*ic.Argument) (*empty.Empty, error) {
- logger.Debug("Update_flows_incrementally")
+func (rhp *RequestHandlerProxy) Update_flows_incrementally(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
+ logger.Debug(ctx, "Update_flows_incrementally")
if len(args) < 5 {
- logger.Warn("Update_flows_incrementally-invalid-number-of-args", log.Fields{"args": args})
+ logger.Warn(ctx, "Update_flows_incrementally-invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
@@ -369,43 +370,43 @@
switch arg.Key {
case "device":
if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
- logger.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-device", log.Fields{"error": err})
return nil, err
}
case "flow_changes":
if err := ptypes.UnmarshalAny(arg.Value, flows); err != nil {
- logger.Warnw("cannot-unmarshal-flows", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-flows", log.Fields{"error": err})
return nil, err
}
case "group_changes":
if err := ptypes.UnmarshalAny(arg.Value, groups); err != nil {
- logger.Warnw("cannot-unmarshal-groups", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-groups", log.Fields{"error": err})
return nil, err
}
case "flow_metadata":
if err := ptypes.UnmarshalAny(arg.Value, flowMetadata); err != nil {
- logger.Warnw("cannot-unmarshal-metadata", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-metadata", log.Fields{"error": err})
return nil, err
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- logger.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
return nil, err
}
}
}
- logger.Debugw("Update_flows_incrementally", log.Fields{"flows": flows, "groups": groups})
+ logger.Debugw(ctx, "Update_flows_incrementally", log.Fields{"flows": flows, "groups": groups})
//Invoke the incremental flow update API of the adapter
- if err := rhp.adapter.Update_flows_incrementally(device, flows, groups, flowMetadata); err != nil {
+ if err := rhp.adapter.Update_flows_incrementally(ctx, device, flows, groups, flowMetadata); err != nil {
return nil, status.Errorf(codes.NotFound, "%s", err.Error())
}
return new(empty.Empty), nil
}
-func (rhp *RequestHandlerProxy) Update_pm_config(args []*ic.Argument) (*empty.Empty, error) {
- logger.Debug("Update_pm_config")
+func (rhp *RequestHandlerProxy) Update_pm_config(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
+ logger.Debug(ctx, "Update_pm_config")
if len(args) < 2 {
- logger.Warn("Update_pm_config-invalid-number-of-args", log.Fields{"args": args})
+ logger.Warn(ctx, "Update_pm_config-invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
@@ -416,33 +417,33 @@
switch arg.Key {
case "device":
if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
- logger.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-device", log.Fields{"error": err})
return nil, err
}
case "pm_configs":
if err := ptypes.UnmarshalAny(arg.Value, pmConfigs); err != nil {
- logger.Warnw("cannot-unmarshal-pm-configs", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-pm-configs", log.Fields{"error": err})
return nil, err
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- logger.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
return nil, err
}
}
}
- logger.Debugw("Update_pm_config", log.Fields{"deviceId": device.Id, "pmConfigs": pmConfigs})
+ logger.Debugw(ctx, "Update_pm_config", log.Fields{"deviceId": device.Id, "pmConfigs": pmConfigs})
//Invoke the pm config update API of the adapter
- if err := rhp.adapter.Update_pm_config(device, pmConfigs); err != nil {
+ if err := rhp.adapter.Update_pm_config(ctx, device, pmConfigs); err != nil {
return nil, status.Errorf(codes.NotFound, "%s", err.Error())
}
return new(empty.Empty), nil
}
-func (rhp *RequestHandlerProxy) Receive_packet_out(args []*ic.Argument) (*empty.Empty, error) {
- logger.Debugw("Receive_packet_out", log.Fields{"args": args})
+func (rhp *RequestHandlerProxy) Receive_packet_out(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
+ logger.Debugw(ctx, "Receive_packet_out", log.Fields{"args": args})
if len(args) < 3 {
- logger.Warn("Receive_packet_out-invalid-number-of-args", log.Fields{"args": args})
+ logger.Warn(ctx, "Receive_packet_out-invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
@@ -454,29 +455,29 @@
switch arg.Key {
case "deviceId":
if err := ptypes.UnmarshalAny(arg.Value, deviceId); err != nil {
- logger.Warnw("cannot-unmarshal-deviceId", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-deviceId", log.Fields{"error": err})
return nil, err
}
case "outPort":
if err := ptypes.UnmarshalAny(arg.Value, egressPort); err != nil {
- logger.Warnw("cannot-unmarshal-egressPort", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-egressPort", log.Fields{"error": err})
return nil, err
}
case "packet":
if err := ptypes.UnmarshalAny(arg.Value, packet); err != nil {
- logger.Warnw("cannot-unmarshal-packet", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-packet", log.Fields{"error": err})
return nil, err
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- logger.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
return nil, err
}
}
}
- logger.Debugw("Receive_packet_out", log.Fields{"deviceId": deviceId.Val, "outPort": egressPort, "packet": packet})
+ logger.Debugw(ctx, "Receive_packet_out", log.Fields{"deviceId": deviceId.Val, "outPort": egressPort, "packet": packet})
//Invoke the adopt device on the adapter
- if err := rhp.adapter.Receive_packet_out(deviceId.Val, int(egressPort.Val), packet); err != nil {
+ if err := rhp.adapter.Receive_packet_out(ctx, deviceId.Val, int(egressPort.Val), packet); err != nil {
return nil, status.Errorf(codes.NotFound, "%s", err.Error())
}
return new(empty.Empty), nil
@@ -490,9 +491,9 @@
return new(empty.Empty), nil
}
-func (rhp *RequestHandlerProxy) Get_ofp_device_info(args []*ic.Argument) (*ic.SwitchCapability, error) {
+func (rhp *RequestHandlerProxy) Get_ofp_device_info(ctx context.Context, args []*ic.Argument) (*ic.SwitchCapability, error) {
if len(args) < 2 {
- logger.Warn("invalid-number-of-args", log.Fields{"args": args})
+ logger.Warn(ctx, "invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
@@ -502,31 +503,31 @@
switch arg.Key {
case "device":
if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
- logger.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-device", log.Fields{"error": err})
return nil, err
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- logger.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
return nil, err
}
}
}
- logger.Debugw("Get_ofp_device_info", log.Fields{"deviceId": device.Id})
+ logger.Debugw(ctx, "Get_ofp_device_info", log.Fields{"deviceId": device.Id})
var cap *ic.SwitchCapability
var err error
- if cap, err = rhp.adapter.Get_ofp_device_info(device); err != nil {
+ if cap, err = rhp.adapter.Get_ofp_device_info(ctx, device); err != nil {
return nil, status.Errorf(codes.NotFound, "%s", err.Error())
}
- logger.Debugw("Get_ofp_device_info", log.Fields{"cap": cap})
+ logger.Debugw(ctx, "Get_ofp_device_info", log.Fields{"cap": cap})
return cap, nil
}
-func (rhp *RequestHandlerProxy) Process_inter_adapter_message(args []*ic.Argument) (*empty.Empty, error) {
+func (rhp *RequestHandlerProxy) Process_inter_adapter_message(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 2 {
- logger.Warn("invalid-number-of-args", log.Fields{"args": args})
+ logger.Warn(ctx, "invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
@@ -536,21 +537,21 @@
switch arg.Key {
case "msg":
if err := ptypes.UnmarshalAny(arg.Value, iaMsg); err != nil {
- logger.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-device", log.Fields{"error": err})
return nil, err
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- logger.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
return nil, err
}
}
}
- logger.Debugw("Process_inter_adapter_message", log.Fields{"msgId": iaMsg.Header.Id})
+ logger.Debugw(ctx, "Process_inter_adapter_message", log.Fields{"msgId": iaMsg.Header.Id})
//Invoke the inter adapter API on the handler
- if err := rhp.adapter.Process_inter_adapter_message(iaMsg); err != nil {
+ if err := rhp.adapter.Process_inter_adapter_message(ctx, iaMsg); err != nil {
return nil, status.Errorf(codes.NotFound, "%s", err.Error())
}
@@ -577,30 +578,30 @@
return &voltha.ImageDownload{}, nil
}
-func (rhp *RequestHandlerProxy) Enable_port(args []*ic.Argument) error {
- logger.Debugw("enable_port", log.Fields{"args": args})
- deviceId, port, err := rhp.getEnableDisableParams(args)
+func (rhp *RequestHandlerProxy) Enable_port(ctx context.Context, args []*ic.Argument) error {
+ logger.Debugw(ctx, "enable_port", log.Fields{"args": args})
+ deviceId, port, err := rhp.getEnableDisableParams(ctx, args)
if err != nil {
- logger.Warnw("enable_port", log.Fields{"args": args, "deviceId": deviceId, "port": port})
+ logger.Warnw(ctx, "enable_port", log.Fields{"args": args, "deviceId": deviceId, "port": port})
return err
}
- return rhp.adapter.Enable_port(deviceId, port)
+ return rhp.adapter.Enable_port(ctx, deviceId, port)
}
-func (rhp *RequestHandlerProxy) Disable_port(args []*ic.Argument) error {
- logger.Debugw("disable_port", log.Fields{"args": args})
- deviceId, port, err := rhp.getEnableDisableParams(args)
+func (rhp *RequestHandlerProxy) Disable_port(ctx context.Context, args []*ic.Argument) error {
+ logger.Debugw(ctx, "disable_port", log.Fields{"args": args})
+ deviceId, port, err := rhp.getEnableDisableParams(ctx, args)
if err != nil {
- logger.Warnw("disable_port", log.Fields{"args": args, "deviceId": deviceId, "port": port})
+ logger.Warnw(ctx, "disable_port", log.Fields{"args": args, "deviceId": deviceId, "port": port})
return err
}
- return rhp.adapter.Disable_port(deviceId, port)
+ return rhp.adapter.Disable_port(ctx, deviceId, port)
}
-func (rhp *RequestHandlerProxy) getEnableDisableParams(args []*ic.Argument) (string, *voltha.Port, error) {
- logger.Debugw("getEnableDisableParams", log.Fields{"args": args})
+func (rhp *RequestHandlerProxy) getEnableDisableParams(ctx context.Context, args []*ic.Argument) (string, *voltha.Port, error) {
+ logger.Debugw(ctx, "getEnableDisableParams", log.Fields{"args": args})
if len(args) < 3 {
- logger.Warn("invalid-number-of-args", log.Fields{"args": args})
+ logger.Warn(ctx, "invalid-number-of-args", log.Fields{"args": args})
return "", nil, errors.New("invalid-number-of-args")
}
deviceId := &ic.StrType{}
@@ -609,12 +610,12 @@
switch arg.Key {
case "deviceId":
if err := ptypes.UnmarshalAny(arg.Value, deviceId); err != nil {
- logger.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-device", log.Fields{"error": err})
return "", nil, err
}
case "port":
if err := ptypes.UnmarshalAny(arg.Value, port); err != nil {
- logger.Warnw("cannot-unmarshal-port", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-port", log.Fields{"error": err})
return "", nil, err
}
}
@@ -622,9 +623,9 @@
return deviceId.Val, port, nil
}
-func (rhp *RequestHandlerProxy) Child_device_lost(args []*ic.Argument) error {
+func (rhp *RequestHandlerProxy) Child_device_lost(ctx context.Context, args []*ic.Argument) error {
if len(args) < 4 {
- logger.Warn("invalid-number-of-args", log.Fields{"args": args})
+ logger.Warn(ctx, "invalid-number-of-args", log.Fields{"args": args})
return errors.New("invalid-number-of-args")
}
@@ -636,22 +637,22 @@
switch arg.Key {
case "pDeviceId":
if err := ptypes.UnmarshalAny(arg.Value, pDeviceId); err != nil {
- logger.Warnw("cannot-unmarshal-parent-deviceId", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-parent-deviceId", log.Fields{"error": err})
return err
}
case "pPortNo":
if err := ptypes.UnmarshalAny(arg.Value, pPortNo); err != nil {
- logger.Warnw("cannot-unmarshal-port", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-port", log.Fields{"error": err})
return err
}
case "onuID":
if err := ptypes.UnmarshalAny(arg.Value, onuID); err != nil {
- logger.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
return err
}
case kafka.FromTopic:
if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
- logger.Warnw("cannot-unmarshal-from-topic", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-from-topic", log.Fields{"error": err})
return err
}
}
@@ -659,15 +660,15 @@
//Update the core reference for that device
rhp.coreProxy.UpdateCoreReference(pDeviceId.Val, fromTopic.Val)
//Invoke the Child_device_lost API on the adapter
- if err := rhp.adapter.Child_device_lost(pDeviceId.Val, uint32(pPortNo.Val), uint32(onuID.Val)); err != nil {
+ if err := rhp.adapter.Child_device_lost(ctx, pDeviceId.Val, uint32(pPortNo.Val), uint32(onuID.Val)); err != nil {
return status.Errorf(codes.NotFound, "%s", err.Error())
}
return nil
}
-func (rhp *RequestHandlerProxy) Start_omci_test(args []*ic.Argument) (*ic.TestResponse, error) {
+func (rhp *RequestHandlerProxy) Start_omci_test(ctx context.Context, args []*ic.Argument) (*ic.TestResponse, error) {
if len(args) < 2 {
- logger.Warn("invalid-number-of-args", log.Fields{"args": args})
+ logger.Warn(ctx, "invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
@@ -681,26 +682,26 @@
switch arg.Key {
case "device":
if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
- logger.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-device", log.Fields{"error": err})
return nil, err
}
case "omcitestrequest":
if err := ptypes.UnmarshalAny(arg.Value, request); err != nil {
- logger.Warnw("cannot-unmarshal-omcitestrequest", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-omcitestrequest", log.Fields{"error": err})
return nil, err
}
}
}
- logger.Debugw("Start_omci_test", log.Fields{"device-id": device.Id, "req": request})
- result, err := rhp.adapter.Start_omci_test(device, request)
+ logger.Debugw(ctx, "Start_omci_test", log.Fields{"device-id": device.Id, "req": request})
+ result, err := rhp.adapter.Start_omci_test(ctx, device, request)
if err != nil {
return nil, status.Errorf(codes.NotFound, "%s", err.Error())
}
return result, nil
}
-func (rhp *RequestHandlerProxy) Get_ext_value(args []*ic.Argument) (*voltha.ReturnValues, error) {
+func (rhp *RequestHandlerProxy) Get_ext_value(ctx context.Context, args []*ic.Argument) (*voltha.ReturnValues, error) {
if len(args) < 3 {
- logger.Warn("invalid-number-of-args", log.Fields{"args": args})
+ logger.Warn(ctx, "invalid-number-of-args", log.Fields{"args": args})
return nil, errors.New("invalid-number-of-args")
}
@@ -711,24 +712,24 @@
switch arg.Key {
case "device":
if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
- logger.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-device", log.Fields{"error": err})
return nil, err
}
case "pDeviceId":
if err := ptypes.UnmarshalAny(arg.Value, pDeviceId); err != nil {
- logger.Warnw("cannot-unmarshal-parent-deviceId", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-parent-deviceId", log.Fields{"error": err})
return nil, err
}
case "valuetype":
if err := ptypes.UnmarshalAny(arg.Value, valuetype); err != nil {
- logger.Warnw("cannot-unmarshal-valuetype", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-valuetype", log.Fields{"error": err})
return nil, err
}
default:
- logger.Warnw("key-not-found", log.Fields{"arg.Key": arg.Key})
+ logger.Warnw(ctx, "key-not-found", log.Fields{"arg.Key": arg.Key})
}
}
//Invoke the Get_value API on the adapter
- return rhp.adapter.Get_ext_value(pDeviceId.Val, device, voltha.ValueType_Type(valuetype.Val))
+ return rhp.adapter.Get_ext_value(ctx, pDeviceId.Val, device, voltha.ValueType_Type(valuetype.Val))
}
diff --git a/pkg/adapters/common/utils.go b/pkg/adapters/common/utils.go
index 94e8bd6..3d91119 100644
--- a/pkg/adapters/common/utils.go
+++ b/pkg/adapters/common/utils.go
@@ -16,6 +16,7 @@
package common
import (
+ "context"
"fmt"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
ic "github.com/opencord/voltha-protos/v3/go/inter_container"
@@ -75,7 +76,7 @@
return string(b)
}
-func ICProxyErrorCodeToGrpcErrorCode(icErr ic.ErrorCodeCodes) codes.Code {
+func ICProxyErrorCodeToGrpcErrorCode(ctx context.Context, icErr ic.ErrorCodeCodes) codes.Code {
switch icErr {
case ic.ErrorCode_INVALID_PARAMETERS:
return codes.InvalidArgument
@@ -84,7 +85,7 @@
case ic.ErrorCode_DEADLINE_EXCEEDED:
return codes.DeadlineExceeded
default:
- logger.Warnw("cannnot-map-ic-error-code-to-grpc-error-code", log.Fields{"err": icErr})
+ logger.Warnw(ctx, "cannnot-map-ic-error-code-to-grpc-error-code", log.Fields{"err": icErr})
return codes.Internal
}
}
diff --git a/pkg/adapters/common/utils_test.go b/pkg/adapters/common/utils_test.go
index 87594fd..a93dd2f 100644
--- a/pkg/adapters/common/utils_test.go
+++ b/pkg/adapters/common/utils_test.go
@@ -16,6 +16,7 @@
package common
import (
+ "context"
ic "github.com/opencord/voltha-protos/v3/go/inter_container"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"
@@ -72,12 +73,12 @@
}
func TestICProxyErrorCodeToGrpcErrorCode(t *testing.T) {
- unsupported := ICProxyErrorCodeToGrpcErrorCode(ic.ErrorCode_UNSUPPORTED_REQUEST)
+ unsupported := ICProxyErrorCodeToGrpcErrorCode(context.Background(), ic.ErrorCode_UNSUPPORTED_REQUEST)
assert.Equal(t, unsupported, codes.Unavailable)
- invalid := ICProxyErrorCodeToGrpcErrorCode(ic.ErrorCode_INVALID_PARAMETERS)
+ invalid := ICProxyErrorCodeToGrpcErrorCode(context.Background(), ic.ErrorCode_INVALID_PARAMETERS)
assert.Equal(t, invalid, codes.InvalidArgument)
- timeout := ICProxyErrorCodeToGrpcErrorCode(ic.ErrorCode_DEADLINE_EXCEEDED)
+ timeout := ICProxyErrorCodeToGrpcErrorCode(context.Background(), ic.ErrorCode_DEADLINE_EXCEEDED)
assert.Equal(t, timeout, codes.DeadlineExceeded)
}
diff --git a/pkg/adapters/iAdapter.go b/pkg/adapters/iAdapter.go
index 1e81890..ce0b791 100644
--- a/pkg/adapters/iAdapter.go
+++ b/pkg/adapters/iAdapter.go
@@ -16,6 +16,7 @@
package adapters
import (
+ "context"
ic "github.com/opencord/voltha-protos/v3/go/inter_container"
"github.com/opencord/voltha-protos/v3/go/openflow_13"
"github.com/opencord/voltha-protos/v3/go/voltha"
@@ -23,34 +24,34 @@
//IAdapter represents the set of APIs a voltha adapter has to support.
type IAdapter interface {
- Adapter_descriptor() error
- Device_types() (*voltha.DeviceTypes, error)
- Health() (*voltha.HealthStatus, error)
- Adopt_device(device *voltha.Device) error
- Reconcile_device(device *voltha.Device) error
- Abandon_device(device *voltha.Device) error
- Disable_device(device *voltha.Device) error
- Reenable_device(device *voltha.Device) error
- Reboot_device(device *voltha.Device) error
- Self_test_device(device *voltha.Device) error
- Delete_device(device *voltha.Device) error
- Get_device_details(device *voltha.Device) error
- Update_flows_bulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata) error
- Update_flows_incrementally(device *voltha.Device, flows *openflow_13.FlowChanges, groups *openflow_13.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error
- Update_pm_config(device *voltha.Device, pm_configs *voltha.PmConfigs) error
- Receive_packet_out(deviceId string, egress_port_no int, msg *openflow_13.OfpPacketOut) error
- Suppress_event(filter *voltha.EventFilter) error
- Unsuppress_event(filter *voltha.EventFilter) error
- Get_ofp_device_info(device *voltha.Device) (*ic.SwitchCapability, error)
- Process_inter_adapter_message(msg *ic.InterAdapterMessage) error
- Download_image(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error)
- Get_image_download_status(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error)
- Cancel_image_download(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error)
- Activate_image_update(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error)
- Revert_image_update(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error)
- Enable_port(deviceId string, port *voltha.Port) error
- Disable_port(deviceId string, port *voltha.Port) error
- Child_device_lost(parentDeviceId string, parentPortNo uint32, onuID uint32) error
- Start_omci_test(device *voltha.Device, request *voltha.OmciTestRequest) (*voltha.TestResponse, error)
- Get_ext_value(deviceId string, device *voltha.Device, valueflag voltha.ValueType_Type) (*voltha.ReturnValues, error)
+ Adapter_descriptor(ctx context.Context) error
+ Device_types(ctx context.Context) (*voltha.DeviceTypes, error)
+ Health(ctx context.Context) (*voltha.HealthStatus, error)
+ Adopt_device(ctx context.Context, device *voltha.Device) error
+ Reconcile_device(ctx context.Context, device *voltha.Device) error
+ Abandon_device(ctx context.Context, device *voltha.Device) error
+ Disable_device(ctx context.Context, device *voltha.Device) error
+ Reenable_device(ctx context.Context, device *voltha.Device) error
+ Reboot_device(ctx context.Context, device *voltha.Device) error
+ Self_test_device(ctx context.Context, device *voltha.Device) error
+ Delete_device(ctx context.Context, device *voltha.Device) error
+ Get_device_details(ctx context.Context, device *voltha.Device) error
+ Update_flows_bulk(ctx context.Context, device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata) error
+ Update_flows_incrementally(ctx context.Context, device *voltha.Device, flows *openflow_13.FlowChanges, groups *openflow_13.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error
+ Update_pm_config(ctx context.Context, device *voltha.Device, pm_configs *voltha.PmConfigs) error
+ Receive_packet_out(ctx context.Context, deviceId string, egress_port_no int, msg *openflow_13.OfpPacketOut) error
+ Suppress_event(ctx context.Context, filter *voltha.EventFilter) error
+ Unsuppress_event(ctx context.Context, filter *voltha.EventFilter) error
+ Get_ofp_device_info(ctx context.Context, device *voltha.Device) (*ic.SwitchCapability, error)
+ Process_inter_adapter_message(ctx context.Context, msg *ic.InterAdapterMessage) error
+ Download_image(ctx context.Context, device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error)
+ Get_image_download_status(ctx context.Context, device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error)
+ Cancel_image_download(ctx context.Context, device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error)
+ Activate_image_update(ctx context.Context, device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error)
+ Revert_image_update(ctx context.Context, device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error)
+ Enable_port(ctx context.Context, deviceId string, port *voltha.Port) error
+ Disable_port(ctx context.Context, deviceId string, port *voltha.Port) error
+ Child_device_lost(ctx context.Context, parentDeviceId string, parentPortNo uint32, onuID uint32) error
+ Start_omci_test(ctx context.Context, device *voltha.Device, request *voltha.OmciTestRequest) (*voltha.TestResponse, error)
+ Get_ext_value(ctx context.Context, deviceId string, device *voltha.Device, valueflag voltha.ValueType_Type) (*voltha.ReturnValues, error)
}