[VOL-3187]Pass Context down the execution call hierarchy across ofagent codebase
Change-Id: Ia5f2fa1509beefe0ddc427b83e39d2702782db8f
diff --git a/internal/pkg/ofagent/changeEvent.go b/internal/pkg/ofagent/changeEvent.go
index b114d36..7ec9aae 100644
--- a/internal/pkg/ofagent/changeEvent.go
+++ b/internal/pkg/ofagent/changeEvent.go
@@ -29,14 +29,14 @@
)
func (ofa *OFAgent) receiveChangeEvents(ctx context.Context) {
- logger.Debug("receive-change-events-started")
+ logger.Debug(ctx, "receive-change-events-started")
// If we exit, assume disconnected
defer func() {
ofa.events <- ofaEventVolthaDisconnected
- logger.Debug("receive-change-events-finished")
+ logger.Debug(ctx, "receive-change-events-finished")
}()
if ofa.volthaClient == nil {
- logger.Error("no-voltha-connection")
+ logger.Error(ctx, "no-voltha-connection")
return
}
opt := grpc.EmptyCallOption{}
@@ -44,7 +44,7 @@
defer streamDone()
stream, err := ofa.volthaClient.Get().ReceiveChangeEvents(streamCtx, &empty.Empty{}, opt)
if err != nil {
- logger.Errorw("Unable to establish Receive Change Event Stream",
+ logger.Errorw(ctx, "Unable to establish Receive Change Event Stream",
log.Fields{"error": err})
return
}
@@ -57,18 +57,18 @@
default:
ce, err := stream.Recv()
if err != nil {
- logger.Errorw("error receiving change event",
+ logger.Errorw(ctx, "error receiving change event",
log.Fields{"error": err})
break top
}
ofa.changeEventChannel <- ce
- logger.Debug("receive-change-event-queued")
+ logger.Debug(ctx, "receive-change-event-queued")
}
}
}
func (ofa *OFAgent) handleChangeEvents(ctx context.Context) {
- logger.Debug("handle-change-event-started")
+ logger.Debug(ctx, "handle-change-event-started")
top:
for {
@@ -78,7 +78,7 @@
case changeEvent := <-ofa.changeEventChannel:
deviceID := changeEvent.GetId()
portStatus := changeEvent.GetPortStatus()
- logger.Debugw("received-change-event",
+ logger.Debugw(ctx, "received-change-event",
log.Fields{
"device-id": deviceID,
"port-status": portStatus})
@@ -86,7 +86,7 @@
if portStatus == nil {
if logger.V(log.WarnLevel) {
js, _ := json.Marshal(changeEvent.GetEvent())
- logger.Warnw("Received change event that was not port status",
+ logger.Warnw(ctx, "Received change event that was not port status",
log.Fields{"ChangeEvent": js})
}
break
@@ -118,11 +118,11 @@
ofDesc.SetState(ofp.PortState(desc.GetState()))
ofDesc.SetSupported(ofp.PortFeatures(desc.GetSupported()))
ofPortStatus.SetDesc(*ofDesc)
- if err := ofa.getOFClient(deviceID).SendMessage(ofPortStatus); err != nil {
- logger.Errorw("handle-change-events-send-message", log.Fields{"error": err})
+ if err := ofa.getOFClient(ctx, deviceID).SendMessage(ctx, ofPortStatus); err != nil {
+ logger.Errorw(ctx, "handle-change-events-send-message", log.Fields{"error": err})
}
}
}
- logger.Debug("handle-change-event-finsihed")
+ logger.Debug(ctx, "handle-change-event-finsihed")
}
diff --git a/internal/pkg/ofagent/common.go b/internal/pkg/ofagent/common.go
index 66bbbcf..913a5b1 100644
--- a/internal/pkg/ofagent/common.go
+++ b/internal/pkg/ofagent/common.go
@@ -21,12 +21,12 @@
"github.com/opencord/voltha-lib-go/v3/pkg/log"
)
-var logger log.Logger
+var logger log.CLogger
func init() {
// Setup this package so that it's log level can be modified at run time
var err error
- logger, err = log.AddPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "ofagent"})
+ logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "ofagent"})
if err != nil {
panic(err)
}
diff --git a/internal/pkg/ofagent/connection.go b/internal/pkg/ofagent/connection.go
index b84dc3c..4d9279b 100644
--- a/internal/pkg/ofagent/connection.go
+++ b/internal/pkg/ofagent/connection.go
@@ -27,9 +27,9 @@
"google.golang.org/grpc"
)
-func (ofa *OFAgent) establishConnectionToVoltha(p *probe.Probe) error {
+func (ofa *OFAgent) establishConnectionToVoltha(ctx context.Context, p *probe.Probe) error {
if p != nil {
- p.UpdateStatus("voltha", probe.ServiceStatusPreparing)
+ p.UpdateStatus(ctx, "voltha", probe.ServiceStatusPreparing)
}
if ofa.volthaConnection != nil {
@@ -45,21 +45,21 @@
svc := voltha.NewVolthaServiceClient(conn)
if svc != nil {
if _, err = svc.GetVoltha(context.Background(), &empty.Empty{}); err == nil {
- logger.Debugw("Established connection to Voltha",
+ logger.Debugw(ctx, "Established connection to Voltha",
log.Fields{
"VolthaApiEndPoint": ofa.VolthaApiEndPoint,
})
ofa.volthaConnection = conn
ofa.volthaClient.Set(svc)
if p != nil {
- p.UpdateStatus("voltha", probe.ServiceStatusRunning)
+ p.UpdateStatus(ctx, "voltha", probe.ServiceStatusRunning)
}
ofa.events <- ofaEventVolthaConnected
return nil
}
}
}
- logger.Warnw("Failed to connect to voltha",
+ logger.Warnw(ctx, "Failed to connect to voltha",
log.Fields{
"VolthaApiEndPoint": ofa.VolthaApiEndPoint,
"error": err.Error(),
@@ -72,7 +72,7 @@
}
}
if p != nil {
- p.UpdateStatus("voltha", probe.ServiceStatusFailed)
+ p.UpdateStatus(ctx, "voltha", probe.ServiceStatusFailed)
}
return errors.New("failed-to-connect-to-voltha")
}
diff --git a/internal/pkg/ofagent/ofagent.go b/internal/pkg/ofagent/ofagent.go
index f0ebd0e..9e0914f 100644
--- a/internal/pkg/ofagent/ofagent.go
+++ b/internal/pkg/ofagent/ofagent.go
@@ -61,7 +61,7 @@
changeEventChannel chan *voltha.ChangeEvent
}
-func NewOFAgent(config *OFAgent) (*OFAgent, error) {
+func NewOFAgent(ctx context.Context, config *OFAgent) (*OFAgent, error) {
ofa := OFAgent{
VolthaApiEndPoint: config.VolthaApiEndPoint,
OFControllerEndPoints: config.OFControllerEndPoints,
@@ -77,7 +77,7 @@
}
if ofa.DeviceListRefreshInterval <= 0 {
- logger.Warnw("device list refresh internal not valid, setting to default",
+ logger.Warnw(ctx, "device list refresh internal not valid, setting to default",
log.Fields{
"value": ofa.DeviceListRefreshInterval.String(),
"default": (1 * time.Minute).String()})
@@ -85,7 +85,7 @@
}
if ofa.ConnectionRetryDelay <= 0 {
- logger.Warnw("connection retry delay not value, setting to default",
+ logger.Warnw(ctx, "connection retry delay not value, setting to default",
log.Fields{
"value": ofa.ConnectionRetryDelay.String(),
"default": (3 * time.Second).String()})
@@ -98,7 +98,7 @@
// Run - make the inital connection to voltha and kicks off io streams
func (ofa *OFAgent) Run(ctx context.Context) {
- logger.Debugw("Starting GRPC - VOLTHA client",
+ logger.Debugw(ctx, "Starting GRPC - VOLTHA client",
log.Fields{
"voltha-endpoint": ofa.VolthaApiEndPoint,
"controller-endpoint": ofa.OFControllerEndPoints})
@@ -106,7 +106,7 @@
// If the context contains a k8s probe then register services
p := probe.GetProbeFromContext(ctx)
if p != nil {
- p.RegisterService("voltha")
+ p.RegisterService(ctx, "voltha")
}
ofa.events <- ofaEventStart
@@ -139,7 +139,7 @@
case event := <-ofa.events:
switch event {
case ofaEventStart:
- logger.Debug("ofagent-voltha-start-event")
+ logger.Debug(ctx, "ofagent-voltha-start-event")
// Start the loops that process messages
hdlCtx, hdlDone = context.WithCancel(context.Background())
@@ -150,14 +150,14 @@
// connection to voltha
state = ofaStateConnecting
go func() {
- if err := ofa.establishConnectionToVoltha(p); err != nil {
- logger.Errorw("voltha-connection-failed", log.Fields{"error": err})
+ if err := ofa.establishConnectionToVoltha(ctx, p); err != nil {
+ logger.Errorw(ctx, "voltha-connection-failed", log.Fields{"error": err})
panic(err)
}
}()
case ofaEventVolthaConnected:
- logger.Debug("ofagent-voltha-connect-event")
+ logger.Debug(ctx, "ofagent-voltha-connect-event")
// Start the loops that poll from voltha
if state != ofaStateConnected {
@@ -171,9 +171,9 @@
case ofaEventVolthaDisconnected:
if p != nil {
- p.UpdateStatus("voltha", probe.ServiceStatusNotReady)
+ p.UpdateStatus(ctx, "voltha", probe.ServiceStatusNotReady)
}
- logger.Debug("ofagent-voltha-disconnect-event")
+ logger.Debug(ctx, "ofagent-voltha-disconnect-event")
if state == ofaStateConnected {
state = ofaStateDisconnected
ofa.volthaClient.Clear()
@@ -183,17 +183,17 @@
if state != ofaStateConnecting {
state = ofaStateConnecting
go func() {
- if err := ofa.establishConnectionToVoltha(p); err != nil {
- logger.Errorw("voltha-connection-failed", log.Fields{"error": err})
+ if err := ofa.establishConnectionToVoltha(ctx, p); err != nil {
+ logger.Errorw(ctx, "voltha-connection-failed", log.Fields{"error": err})
panic(err)
}
}()
}
case ofaEventError:
- logger.Debug("ofagent-error-event")
+ logger.Debug(ctx, "ofagent-error-event")
default:
- logger.Fatalw("ofagent-unknown-event",
+ logger.Fatalw(ctx, "ofagent-unknown-event",
log.Fields{"event": event})
}
}
diff --git a/internal/pkg/ofagent/packetIn.go b/internal/pkg/ofagent/packetIn.go
index 3744307..2a49e94 100644
--- a/internal/pkg/ofagent/packetIn.go
+++ b/internal/pkg/ofagent/packetIn.go
@@ -31,14 +31,14 @@
)
func (ofa *OFAgent) receivePacketsIn(ctx context.Context) {
- logger.Debug("receive-packets-in-started")
+ logger.Debug(ctx, "receive-packets-in-started")
// If we exit, assume disconnected
defer func() {
ofa.events <- ofaEventVolthaDisconnected
- logger.Debug("receive-packets-in-finished")
+ logger.Debug(ctx, "receive-packets-in-finished")
}()
if ofa.volthaClient == nil {
- logger.Error("no-voltha-connection")
+ logger.Error(ctx, "no-voltha-connection")
return
}
opt := grpc.EmptyCallOption{}
@@ -46,7 +46,7 @@
defer streamDone()
stream, err := ofa.volthaClient.Get().ReceivePacketsIn(streamCtx, &empty.Empty{}, opt)
if err != nil {
- logger.Errorw("Unable to establish Receive PacketIn Stream",
+ logger.Errorw(ctx, "Unable to establish Receive PacketIn Stream",
log.Fields{"error": err})
return
}
@@ -60,7 +60,7 @@
default:
pkt, err := stream.Recv()
if err != nil {
- logger.Errorw("error receiving packet",
+ logger.Errorw(ctx, "error receiving packet",
log.Fields{"error": err})
break top
}
@@ -70,7 +70,7 @@
}
func (ofa *OFAgent) handlePacketsIn(ctx context.Context) {
- logger.Debug("handle-packets-in-started")
+ logger.Debug(ctx, "handle-packets-in-started")
top:
for {
select {
@@ -81,7 +81,7 @@
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(packetIn)
- logger.Debugw("packet-in received", log.Fields{"packet-in": js})
+ logger.Debugw(ctx, "packet-in received", log.Fields{"packet-in": js})
}
deviceID := packet.GetId()
ofPacketIn := ofp.NewPacketIn()
@@ -140,7 +140,7 @@
fields = append(fields, ofpVlanVid)
default:
- logger.Warnw("receive-packet-in:unhandled-oxm-field",
+ logger.Warnw(ctx, "receive-packet-in:unhandled-oxm-field",
log.Fields{"field": ofbField.Type})
}
}
@@ -150,14 +150,14 @@
ofPacketIn.SetMatch(*match)
ofPacketIn.SetReason(uint8(packetIn.GetReason()))
ofPacketIn.SetTableId(uint8(packetIn.GetTableId()))
- ofc := ofa.getOFClient(deviceID)
- if err := ofc.SendMessage(ofPacketIn); err != nil {
- logger.Errorw("send-message-failed", log.Fields{
+ ofc := ofa.getOFClient(ctx, deviceID)
+ if err := ofc.SendMessage(ctx, ofPacketIn); err != nil {
+ logger.Errorw(ctx, "send-message-failed", log.Fields{
"device-id": deviceID,
"error": err})
}
}
}
- logger.Debug("handle-packets-in-finished")
+ logger.Debug(ctx, "handle-packets-in-finished")
}
diff --git a/internal/pkg/ofagent/packetOut.go b/internal/pkg/ofagent/packetOut.go
index ffeafec..d4b6a73 100644
--- a/internal/pkg/ofagent/packetOut.go
+++ b/internal/pkg/ofagent/packetOut.go
@@ -25,14 +25,14 @@
)
func (ofa *OFAgent) streamPacketOut(ctx context.Context) {
- logger.Debug("packet-out-started")
+ logger.Debug(ctx, "packet-out-started")
// If we exit, assume disconnected
defer func() {
ofa.events <- ofaEventVolthaDisconnected
- logger.Debug("packet-out-finished")
+ logger.Debug(ctx, "packet-out-finished")
}()
if ofa.volthaClient == nil {
- logger.Error("no-voltha-connection")
+ logger.Error(ctx, "no-voltha-connection")
return
}
opt := grpc.EmptyCallOption{}
@@ -40,7 +40,7 @@
outClient, err := ofa.volthaClient.Get().StreamPacketsOut(streamCtx, opt)
defer streamDone()
if err != nil {
- logger.Errorw("streamPacketOut Error creating packetout stream ", log.Fields{"error": err})
+ logger.Errorw(ctx, "streamPacketOut Error creating packetout stream ", log.Fields{"error": err})
return
}
top:
@@ -51,14 +51,14 @@
case ofPacketOut := <-ofa.packetOutChannel:
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(ofPacketOut)
- logger.Debugw("streamPacketOut Receive PacketOut from Channel", log.Fields{"PacketOut": js})
+ logger.Debugw(ctx, "streamPacketOut Receive PacketOut from Channel", log.Fields{"PacketOut": js})
}
if err := outClient.Send(ofPacketOut); err != nil {
- logger.Errorw("packet-out-send-error",
+ logger.Errorw(ctx, "packet-out-send-error",
log.Fields{"error": err.Error()})
break top
}
- logger.Debug("packet-out-send")
+ logger.Debug(ctx, "packet-out-send")
}
}
}
diff --git a/internal/pkg/ofagent/refresh.go b/internal/pkg/ofagent/refresh.go
index 1744578..43cbb86 100644
--- a/internal/pkg/ofagent/refresh.go
+++ b/internal/pkg/ofagent/refresh.go
@@ -27,7 +27,7 @@
func (ofa *OFAgent) synchronizeDeviceList(ctx context.Context) {
// Refresh once to get everything started
- ofa.refreshDeviceList()
+ ofa.refreshDeviceList(ctx)
tick := time.NewTicker(ofa.DeviceListRefreshInterval)
loop:
@@ -36,22 +36,22 @@
case <-ctx.Done():
break loop
case <-tick.C:
- ofa.refreshDeviceList()
+ ofa.refreshDeviceList(ctx)
}
}
tick.Stop()
}
-func (ofa *OFAgent) refreshDeviceList() {
+func (ofa *OFAgent) refreshDeviceList(ctx context.Context) {
// If we exit, assume disconnected
if ofa.volthaClient == nil {
- logger.Error("no-voltha-connection")
+ logger.Error(ctx, "no-voltha-connection")
ofa.events <- ofaEventVolthaDisconnected
return
}
deviceList, err := ofa.volthaClient.Get().ListLogicalDevices(context.Background(), &empty.Empty{})
if err != nil {
- logger.Errorw("ofagent failed to query device list from voltha",
+ logger.Errorw(ctx, "ofagent failed to query device list from voltha",
log.Fields{"error": err})
ofa.events <- ofaEventVolthaDisconnected
return
@@ -73,9 +73,9 @@
toDel = append(toDel, key)
}
}
- logger.Debugw("GrpcClient refreshDeviceList", log.Fields{"ToAdd": toAdd, "ToDel": toDel})
+ logger.Debugw(ctx, "GrpcClient refreshDeviceList", log.Fields{"ToAdd": toAdd, "ToDel": toDel})
for i := 0; i < len(toAdd); i++ {
- ofa.addOFClient(toAdd[i]) // client is started in addOFClient
+ ofa.addOFClient(ctx, toAdd[i]) // client is started in addOFClient
}
for i := 0; i < len(toDel); i++ {
ofa.clientMap[toDel[i]].Stop()
@@ -85,12 +85,12 @@
}
}
-func (ofa *OFAgent) addOFClient(deviceID string) *openflow.OFClient {
- logger.Debugw("GrpcClient addClient called ", log.Fields{"device-id": deviceID})
+func (ofa *OFAgent) addOFClient(ctx context.Context, deviceID string) *openflow.OFClient {
+ logger.Debugw(ctx, "GrpcClient addClient called ", log.Fields{"device-id": deviceID})
ofa.mapLock.Lock()
ofc := ofa.clientMap[deviceID]
if ofc == nil {
- ofc = openflow.NewOFClient(&openflow.OFClient{
+ ofc = openflow.NewOFClient(ctx, &openflow.OFClient{
DeviceID: deviceID,
OFControllerEndPoints: ofa.OFControllerEndPoints,
VolthaClient: ofa.volthaClient,
@@ -102,14 +102,14 @@
ofa.clientMap[deviceID] = ofc
}
ofa.mapLock.Unlock()
- logger.Debugw("Finished with addClient", log.Fields{"deviceID": deviceID})
+ logger.Debugw(ctx, "Finished with addClient", log.Fields{"deviceID": deviceID})
return ofc
}
-func (ofa *OFAgent) getOFClient(deviceID string) *openflow.OFClient {
+func (ofa *OFAgent) getOFClient(ctx context.Context, deviceID string) *openflow.OFClient {
ofc := ofa.clientMap[deviceID]
if ofc == nil {
- ofc = ofa.addOFClient(deviceID)
+ ofc = ofa.addOFClient(ctx, deviceID)
}
return ofc
}