[VOL-3187]Pass Context down the execution call hierarchy across ofagent codebase
Change-Id: Ia5f2fa1509beefe0ddc427b83e39d2702782db8f
diff --git a/internal/pkg/ofagent/changeEvent.go b/internal/pkg/ofagent/changeEvent.go
index b114d36..7ec9aae 100644
--- a/internal/pkg/ofagent/changeEvent.go
+++ b/internal/pkg/ofagent/changeEvent.go
@@ -29,14 +29,14 @@
)
func (ofa *OFAgent) receiveChangeEvents(ctx context.Context) {
- logger.Debug("receive-change-events-started")
+ logger.Debug(ctx, "receive-change-events-started")
// If we exit, assume disconnected
defer func() {
ofa.events <- ofaEventVolthaDisconnected
- logger.Debug("receive-change-events-finished")
+ logger.Debug(ctx, "receive-change-events-finished")
}()
if ofa.volthaClient == nil {
- logger.Error("no-voltha-connection")
+ logger.Error(ctx, "no-voltha-connection")
return
}
opt := grpc.EmptyCallOption{}
@@ -44,7 +44,7 @@
defer streamDone()
stream, err := ofa.volthaClient.Get().ReceiveChangeEvents(streamCtx, &empty.Empty{}, opt)
if err != nil {
- logger.Errorw("Unable to establish Receive Change Event Stream",
+ logger.Errorw(ctx, "Unable to establish Receive Change Event Stream",
log.Fields{"error": err})
return
}
@@ -57,18 +57,18 @@
default:
ce, err := stream.Recv()
if err != nil {
- logger.Errorw("error receiving change event",
+ logger.Errorw(ctx, "error receiving change event",
log.Fields{"error": err})
break top
}
ofa.changeEventChannel <- ce
- logger.Debug("receive-change-event-queued")
+ logger.Debug(ctx, "receive-change-event-queued")
}
}
}
func (ofa *OFAgent) handleChangeEvents(ctx context.Context) {
- logger.Debug("handle-change-event-started")
+ logger.Debug(ctx, "handle-change-event-started")
top:
for {
@@ -78,7 +78,7 @@
case changeEvent := <-ofa.changeEventChannel:
deviceID := changeEvent.GetId()
portStatus := changeEvent.GetPortStatus()
- logger.Debugw("received-change-event",
+ logger.Debugw(ctx, "received-change-event",
log.Fields{
"device-id": deviceID,
"port-status": portStatus})
@@ -86,7 +86,7 @@
if portStatus == nil {
if logger.V(log.WarnLevel) {
js, _ := json.Marshal(changeEvent.GetEvent())
- logger.Warnw("Received change event that was not port status",
+ logger.Warnw(ctx, "Received change event that was not port status",
log.Fields{"ChangeEvent": js})
}
break
@@ -118,11 +118,11 @@
ofDesc.SetState(ofp.PortState(desc.GetState()))
ofDesc.SetSupported(ofp.PortFeatures(desc.GetSupported()))
ofPortStatus.SetDesc(*ofDesc)
- if err := ofa.getOFClient(deviceID).SendMessage(ofPortStatus); err != nil {
- logger.Errorw("handle-change-events-send-message", log.Fields{"error": err})
+ if err := ofa.getOFClient(ctx, deviceID).SendMessage(ctx, ofPortStatus); err != nil {
+ logger.Errorw(ctx, "handle-change-events-send-message", log.Fields{"error": err})
}
}
}
- logger.Debug("handle-change-event-finsihed")
+ logger.Debug(ctx, "handle-change-event-finsihed")
}
diff --git a/internal/pkg/ofagent/common.go b/internal/pkg/ofagent/common.go
index 66bbbcf..913a5b1 100644
--- a/internal/pkg/ofagent/common.go
+++ b/internal/pkg/ofagent/common.go
@@ -21,12 +21,12 @@
"github.com/opencord/voltha-lib-go/v3/pkg/log"
)
-var logger log.Logger
+var logger log.CLogger
func init() {
// Setup this package so that it's log level can be modified at run time
var err error
- logger, err = log.AddPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "ofagent"})
+ logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "ofagent"})
if err != nil {
panic(err)
}
diff --git a/internal/pkg/ofagent/connection.go b/internal/pkg/ofagent/connection.go
index b84dc3c..4d9279b 100644
--- a/internal/pkg/ofagent/connection.go
+++ b/internal/pkg/ofagent/connection.go
@@ -27,9 +27,9 @@
"google.golang.org/grpc"
)
-func (ofa *OFAgent) establishConnectionToVoltha(p *probe.Probe) error {
+func (ofa *OFAgent) establishConnectionToVoltha(ctx context.Context, p *probe.Probe) error {
if p != nil {
- p.UpdateStatus("voltha", probe.ServiceStatusPreparing)
+ p.UpdateStatus(ctx, "voltha", probe.ServiceStatusPreparing)
}
if ofa.volthaConnection != nil {
@@ -45,21 +45,21 @@
svc := voltha.NewVolthaServiceClient(conn)
if svc != nil {
if _, err = svc.GetVoltha(context.Background(), &empty.Empty{}); err == nil {
- logger.Debugw("Established connection to Voltha",
+ logger.Debugw(ctx, "Established connection to Voltha",
log.Fields{
"VolthaApiEndPoint": ofa.VolthaApiEndPoint,
})
ofa.volthaConnection = conn
ofa.volthaClient.Set(svc)
if p != nil {
- p.UpdateStatus("voltha", probe.ServiceStatusRunning)
+ p.UpdateStatus(ctx, "voltha", probe.ServiceStatusRunning)
}
ofa.events <- ofaEventVolthaConnected
return nil
}
}
}
- logger.Warnw("Failed to connect to voltha",
+ logger.Warnw(ctx, "Failed to connect to voltha",
log.Fields{
"VolthaApiEndPoint": ofa.VolthaApiEndPoint,
"error": err.Error(),
@@ -72,7 +72,7 @@
}
}
if p != nil {
- p.UpdateStatus("voltha", probe.ServiceStatusFailed)
+ p.UpdateStatus(ctx, "voltha", probe.ServiceStatusFailed)
}
return errors.New("failed-to-connect-to-voltha")
}
diff --git a/internal/pkg/ofagent/ofagent.go b/internal/pkg/ofagent/ofagent.go
index f0ebd0e..9e0914f 100644
--- a/internal/pkg/ofagent/ofagent.go
+++ b/internal/pkg/ofagent/ofagent.go
@@ -61,7 +61,7 @@
changeEventChannel chan *voltha.ChangeEvent
}
-func NewOFAgent(config *OFAgent) (*OFAgent, error) {
+func NewOFAgent(ctx context.Context, config *OFAgent) (*OFAgent, error) {
ofa := OFAgent{
VolthaApiEndPoint: config.VolthaApiEndPoint,
OFControllerEndPoints: config.OFControllerEndPoints,
@@ -77,7 +77,7 @@
}
if ofa.DeviceListRefreshInterval <= 0 {
- logger.Warnw("device list refresh internal not valid, setting to default",
+ logger.Warnw(ctx, "device list refresh internal not valid, setting to default",
log.Fields{
"value": ofa.DeviceListRefreshInterval.String(),
"default": (1 * time.Minute).String()})
@@ -85,7 +85,7 @@
}
if ofa.ConnectionRetryDelay <= 0 {
- logger.Warnw("connection retry delay not value, setting to default",
+ logger.Warnw(ctx, "connection retry delay not value, setting to default",
log.Fields{
"value": ofa.ConnectionRetryDelay.String(),
"default": (3 * time.Second).String()})
@@ -98,7 +98,7 @@
// Run - make the inital connection to voltha and kicks off io streams
func (ofa *OFAgent) Run(ctx context.Context) {
- logger.Debugw("Starting GRPC - VOLTHA client",
+ logger.Debugw(ctx, "Starting GRPC - VOLTHA client",
log.Fields{
"voltha-endpoint": ofa.VolthaApiEndPoint,
"controller-endpoint": ofa.OFControllerEndPoints})
@@ -106,7 +106,7 @@
// If the context contains a k8s probe then register services
p := probe.GetProbeFromContext(ctx)
if p != nil {
- p.RegisterService("voltha")
+ p.RegisterService(ctx, "voltha")
}
ofa.events <- ofaEventStart
@@ -139,7 +139,7 @@
case event := <-ofa.events:
switch event {
case ofaEventStart:
- logger.Debug("ofagent-voltha-start-event")
+ logger.Debug(ctx, "ofagent-voltha-start-event")
// Start the loops that process messages
hdlCtx, hdlDone = context.WithCancel(context.Background())
@@ -150,14 +150,14 @@
// connection to voltha
state = ofaStateConnecting
go func() {
- if err := ofa.establishConnectionToVoltha(p); err != nil {
- logger.Errorw("voltha-connection-failed", log.Fields{"error": err})
+ if err := ofa.establishConnectionToVoltha(ctx, p); err != nil {
+ logger.Errorw(ctx, "voltha-connection-failed", log.Fields{"error": err})
panic(err)
}
}()
case ofaEventVolthaConnected:
- logger.Debug("ofagent-voltha-connect-event")
+ logger.Debug(ctx, "ofagent-voltha-connect-event")
// Start the loops that poll from voltha
if state != ofaStateConnected {
@@ -171,9 +171,9 @@
case ofaEventVolthaDisconnected:
if p != nil {
- p.UpdateStatus("voltha", probe.ServiceStatusNotReady)
+ p.UpdateStatus(ctx, "voltha", probe.ServiceStatusNotReady)
}
- logger.Debug("ofagent-voltha-disconnect-event")
+ logger.Debug(ctx, "ofagent-voltha-disconnect-event")
if state == ofaStateConnected {
state = ofaStateDisconnected
ofa.volthaClient.Clear()
@@ -183,17 +183,17 @@
if state != ofaStateConnecting {
state = ofaStateConnecting
go func() {
- if err := ofa.establishConnectionToVoltha(p); err != nil {
- logger.Errorw("voltha-connection-failed", log.Fields{"error": err})
+ if err := ofa.establishConnectionToVoltha(ctx, p); err != nil {
+ logger.Errorw(ctx, "voltha-connection-failed", log.Fields{"error": err})
panic(err)
}
}()
}
case ofaEventError:
- logger.Debug("ofagent-error-event")
+ logger.Debug(ctx, "ofagent-error-event")
default:
- logger.Fatalw("ofagent-unknown-event",
+ logger.Fatalw(ctx, "ofagent-unknown-event",
log.Fields{"event": event})
}
}
diff --git a/internal/pkg/ofagent/packetIn.go b/internal/pkg/ofagent/packetIn.go
index 3744307..2a49e94 100644
--- a/internal/pkg/ofagent/packetIn.go
+++ b/internal/pkg/ofagent/packetIn.go
@@ -31,14 +31,14 @@
)
func (ofa *OFAgent) receivePacketsIn(ctx context.Context) {
- logger.Debug("receive-packets-in-started")
+ logger.Debug(ctx, "receive-packets-in-started")
// If we exit, assume disconnected
defer func() {
ofa.events <- ofaEventVolthaDisconnected
- logger.Debug("receive-packets-in-finished")
+ logger.Debug(ctx, "receive-packets-in-finished")
}()
if ofa.volthaClient == nil {
- logger.Error("no-voltha-connection")
+ logger.Error(ctx, "no-voltha-connection")
return
}
opt := grpc.EmptyCallOption{}
@@ -46,7 +46,7 @@
defer streamDone()
stream, err := ofa.volthaClient.Get().ReceivePacketsIn(streamCtx, &empty.Empty{}, opt)
if err != nil {
- logger.Errorw("Unable to establish Receive PacketIn Stream",
+ logger.Errorw(ctx, "Unable to establish Receive PacketIn Stream",
log.Fields{"error": err})
return
}
@@ -60,7 +60,7 @@
default:
pkt, err := stream.Recv()
if err != nil {
- logger.Errorw("error receiving packet",
+ logger.Errorw(ctx, "error receiving packet",
log.Fields{"error": err})
break top
}
@@ -70,7 +70,7 @@
}
func (ofa *OFAgent) handlePacketsIn(ctx context.Context) {
- logger.Debug("handle-packets-in-started")
+ logger.Debug(ctx, "handle-packets-in-started")
top:
for {
select {
@@ -81,7 +81,7 @@
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(packetIn)
- logger.Debugw("packet-in received", log.Fields{"packet-in": js})
+ logger.Debugw(ctx, "packet-in received", log.Fields{"packet-in": js})
}
deviceID := packet.GetId()
ofPacketIn := ofp.NewPacketIn()
@@ -140,7 +140,7 @@
fields = append(fields, ofpVlanVid)
default:
- logger.Warnw("receive-packet-in:unhandled-oxm-field",
+ logger.Warnw(ctx, "receive-packet-in:unhandled-oxm-field",
log.Fields{"field": ofbField.Type})
}
}
@@ -150,14 +150,14 @@
ofPacketIn.SetMatch(*match)
ofPacketIn.SetReason(uint8(packetIn.GetReason()))
ofPacketIn.SetTableId(uint8(packetIn.GetTableId()))
- ofc := ofa.getOFClient(deviceID)
- if err := ofc.SendMessage(ofPacketIn); err != nil {
- logger.Errorw("send-message-failed", log.Fields{
+ ofc := ofa.getOFClient(ctx, deviceID)
+ if err := ofc.SendMessage(ctx, ofPacketIn); err != nil {
+ logger.Errorw(ctx, "send-message-failed", log.Fields{
"device-id": deviceID,
"error": err})
}
}
}
- logger.Debug("handle-packets-in-finished")
+ logger.Debug(ctx, "handle-packets-in-finished")
}
diff --git a/internal/pkg/ofagent/packetOut.go b/internal/pkg/ofagent/packetOut.go
index ffeafec..d4b6a73 100644
--- a/internal/pkg/ofagent/packetOut.go
+++ b/internal/pkg/ofagent/packetOut.go
@@ -25,14 +25,14 @@
)
func (ofa *OFAgent) streamPacketOut(ctx context.Context) {
- logger.Debug("packet-out-started")
+ logger.Debug(ctx, "packet-out-started")
// If we exit, assume disconnected
defer func() {
ofa.events <- ofaEventVolthaDisconnected
- logger.Debug("packet-out-finished")
+ logger.Debug(ctx, "packet-out-finished")
}()
if ofa.volthaClient == nil {
- logger.Error("no-voltha-connection")
+ logger.Error(ctx, "no-voltha-connection")
return
}
opt := grpc.EmptyCallOption{}
@@ -40,7 +40,7 @@
outClient, err := ofa.volthaClient.Get().StreamPacketsOut(streamCtx, opt)
defer streamDone()
if err != nil {
- logger.Errorw("streamPacketOut Error creating packetout stream ", log.Fields{"error": err})
+ logger.Errorw(ctx, "streamPacketOut Error creating packetout stream ", log.Fields{"error": err})
return
}
top:
@@ -51,14 +51,14 @@
case ofPacketOut := <-ofa.packetOutChannel:
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(ofPacketOut)
- logger.Debugw("streamPacketOut Receive PacketOut from Channel", log.Fields{"PacketOut": js})
+ logger.Debugw(ctx, "streamPacketOut Receive PacketOut from Channel", log.Fields{"PacketOut": js})
}
if err := outClient.Send(ofPacketOut); err != nil {
- logger.Errorw("packet-out-send-error",
+ logger.Errorw(ctx, "packet-out-send-error",
log.Fields{"error": err.Error()})
break top
}
- logger.Debug("packet-out-send")
+ logger.Debug(ctx, "packet-out-send")
}
}
}
diff --git a/internal/pkg/ofagent/refresh.go b/internal/pkg/ofagent/refresh.go
index 1744578..43cbb86 100644
--- a/internal/pkg/ofagent/refresh.go
+++ b/internal/pkg/ofagent/refresh.go
@@ -27,7 +27,7 @@
func (ofa *OFAgent) synchronizeDeviceList(ctx context.Context) {
// Refresh once to get everything started
- ofa.refreshDeviceList()
+ ofa.refreshDeviceList(ctx)
tick := time.NewTicker(ofa.DeviceListRefreshInterval)
loop:
@@ -36,22 +36,22 @@
case <-ctx.Done():
break loop
case <-tick.C:
- ofa.refreshDeviceList()
+ ofa.refreshDeviceList(ctx)
}
}
tick.Stop()
}
-func (ofa *OFAgent) refreshDeviceList() {
+func (ofa *OFAgent) refreshDeviceList(ctx context.Context) {
// If we exit, assume disconnected
if ofa.volthaClient == nil {
- logger.Error("no-voltha-connection")
+ logger.Error(ctx, "no-voltha-connection")
ofa.events <- ofaEventVolthaDisconnected
return
}
deviceList, err := ofa.volthaClient.Get().ListLogicalDevices(context.Background(), &empty.Empty{})
if err != nil {
- logger.Errorw("ofagent failed to query device list from voltha",
+ logger.Errorw(ctx, "ofagent failed to query device list from voltha",
log.Fields{"error": err})
ofa.events <- ofaEventVolthaDisconnected
return
@@ -73,9 +73,9 @@
toDel = append(toDel, key)
}
}
- logger.Debugw("GrpcClient refreshDeviceList", log.Fields{"ToAdd": toAdd, "ToDel": toDel})
+ logger.Debugw(ctx, "GrpcClient refreshDeviceList", log.Fields{"ToAdd": toAdd, "ToDel": toDel})
for i := 0; i < len(toAdd); i++ {
- ofa.addOFClient(toAdd[i]) // client is started in addOFClient
+ ofa.addOFClient(ctx, toAdd[i]) // client is started in addOFClient
}
for i := 0; i < len(toDel); i++ {
ofa.clientMap[toDel[i]].Stop()
@@ -85,12 +85,12 @@
}
}
-func (ofa *OFAgent) addOFClient(deviceID string) *openflow.OFClient {
- logger.Debugw("GrpcClient addClient called ", log.Fields{"device-id": deviceID})
+func (ofa *OFAgent) addOFClient(ctx context.Context, deviceID string) *openflow.OFClient {
+ logger.Debugw(ctx, "GrpcClient addClient called ", log.Fields{"device-id": deviceID})
ofa.mapLock.Lock()
ofc := ofa.clientMap[deviceID]
if ofc == nil {
- ofc = openflow.NewOFClient(&openflow.OFClient{
+ ofc = openflow.NewOFClient(ctx, &openflow.OFClient{
DeviceID: deviceID,
OFControllerEndPoints: ofa.OFControllerEndPoints,
VolthaClient: ofa.volthaClient,
@@ -102,14 +102,14 @@
ofa.clientMap[deviceID] = ofc
}
ofa.mapLock.Unlock()
- logger.Debugw("Finished with addClient", log.Fields{"deviceID": deviceID})
+ logger.Debugw(ctx, "Finished with addClient", log.Fields{"deviceID": deviceID})
return ofc
}
-func (ofa *OFAgent) getOFClient(deviceID string) *openflow.OFClient {
+func (ofa *OFAgent) getOFClient(ctx context.Context, deviceID string) *openflow.OFClient {
ofc := ofa.clientMap[deviceID]
if ofc == nil {
- ofc = ofa.addOFClient(deviceID)
+ ofc = ofa.addOFClient(ctx, deviceID)
}
return ofc
}
diff --git a/internal/pkg/openflow/barrier.go b/internal/pkg/openflow/barrier.go
index deba220..7c7bf5e 100644
--- a/internal/pkg/openflow/barrier.go
+++ b/internal/pkg/openflow/barrier.go
@@ -17,16 +17,17 @@
package openflow
import (
+ "context"
"encoding/json"
ofp "github.com/opencord/goloxi/of13"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
)
-func (ofc *OFConnection) handleBarrierRequest(request *ofp.BarrierRequest) {
+func (ofc *OFConnection) handleBarrierRequest(ctx context.Context, request *ofp.BarrierRequest) {
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(request)
- logger.Debugw("handleBarrierRequest called with %s",
+ logger.Debugw(ctx, "handleBarrierRequest called with %s",
log.Fields{
"device-id": ofc.DeviceID,
"request": js})
@@ -34,7 +35,7 @@
reply := ofp.NewBarrierReply()
reply.SetVersion(4)
reply.SetXid(request.GetXid())
- if err := ofc.SendMessage(reply); err != nil {
- logger.Errorw("barrier-request-send-message", log.Fields{"error": err})
+ if err := ofc.SendMessage(ctx, reply); err != nil {
+ logger.Errorw(ctx, "barrier-request-send-message", log.Fields{"error": err})
}
}
diff --git a/internal/pkg/openflow/client.go b/internal/pkg/openflow/client.go
index 635c712..86bb6b9 100644
--- a/internal/pkg/openflow/client.go
+++ b/internal/pkg/openflow/client.go
@@ -114,7 +114,7 @@
}
type RoleManager interface {
- UpdateRoles(from string, request *ofp.RoleRequest) bool
+ UpdateRoles(ctx context.Context, from string, request *ofp.RoleRequest) bool
}
func distance(a uint64, b uint64) int64 {
@@ -122,8 +122,8 @@
}
// UpdateRoles validates a role request and updates role state for connections where it changed
-func (ofc *OFClient) UpdateRoles(from string, request *ofp.RoleRequest) bool {
- log.Debug("updating role", log.Fields{
+func (ofc *OFClient) UpdateRoles(ctx context.Context, from string, request *ofp.RoleRequest) bool {
+ logger.Debug(ctx, "updating role", log.Fields{
"from": from,
"to": request.Role,
"id": request.GenerationId})
@@ -168,7 +168,7 @@
// NewClient returns an initialized OFClient instance based on the configuration
// specified
-func NewOFClient(config *OFClient) *OFClient {
+func NewOFClient(ctx context.Context, config *OFClient) *OFClient {
ofc := OFClient{
DeviceID: config.DeviceID,
@@ -184,7 +184,7 @@
}
if ofc.ConnectionRetryDelay <= 0 {
- logger.Warnw("connection retry delay not valid, setting to default",
+ logger.Warnw(ctx, "connection retry delay not valid, setting to default",
log.Fields{
"device-id": ofc.DeviceID,
"value": ofc.ConnectionRetryDelay.String(),
@@ -228,10 +228,10 @@
}
}
-func (ofc *OFClient) SendMessage(message Message) error {
+func (ofc *OFClient) SendMessage(ctx context.Context, message Message) error {
for _, connection := range ofc.connections {
if connection.role == ofcRoleMaster || connection.role == ofcRoleEqual {
- err := connection.SendMessage(message)
+ err := connection.SendMessage(ctx, message)
if err != nil {
return err
}
diff --git a/internal/pkg/openflow/common.go b/internal/pkg/openflow/common.go
index f737ad6..2510c42 100644
--- a/internal/pkg/openflow/common.go
+++ b/internal/pkg/openflow/common.go
@@ -21,12 +21,12 @@
"github.com/opencord/voltha-lib-go/v3/pkg/log"
)
-var logger log.Logger
+var logger log.CLogger
func init() {
// Setup this package so that it's log level can be modified at run time
var err error
- logger, err = log.AddPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "openflow"})
+ logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "openflow"})
if err != nil {
panic(err)
}
diff --git a/internal/pkg/openflow/connection.go b/internal/pkg/openflow/connection.go
index fd0aa48..aafe741 100644
--- a/internal/pkg/openflow/connection.go
+++ b/internal/pkg/openflow/connection.go
@@ -67,9 +67,9 @@
return &header, nil
}
-func (ofc *OFConnection) establishConnectionToController() error {
+func (ofc *OFConnection) establishConnectionToController(ctx context.Context) error {
if ofc.conn != nil {
- logger.Debugw("closing-of-connection-to-reconnect",
+ logger.Debugw(ctx, "closing-of-connection-to-reconnect",
log.Fields{"device-id": ofc.DeviceID})
ofc.conn.Close()
ofc.conn = nil
@@ -77,18 +77,18 @@
try := 1
for ofc.ConnectionMaxRetries == 0 || try < ofc.ConnectionMaxRetries {
if raddr, err := net.ResolveTCPAddr("tcp", ofc.OFControllerEndPoint); err != nil {
- logger.Debugw("openflow-client unable to resolve endpoint",
+ logger.Debugw(ctx, "openflow-client unable to resolve endpoint",
log.Fields{
"device-id": ofc.DeviceID,
"endpoint": ofc.OFControllerEndPoint})
} else {
if connection, err := net.DialTCP("tcp", nil, raddr); err == nil {
ofc.conn = connection
- ofc.sayHello()
+ ofc.sayHello(ctx)
ofc.events <- ofcEventConnect
return nil
} else {
- logger.Warnw("openflow-client-connect-error",
+ logger.Warnw(ctx, "openflow-client-connect-error",
log.Fields{
"device-id": ofc.DeviceID,
"endpoint": ofc.OFControllerEndPoint})
@@ -117,33 +117,33 @@
select {
case <-ctx.Done():
state = ofcStateStopped
- logger.Debugw("state-transition-context-done",
+ logger.Debugw(ctx, "state-transition-context-done",
log.Fields{"device-id": ofc.DeviceID})
break top
case event := <-ofc.events:
previous := state
switch event {
case ofcEventStart:
- logger.Debugw("ofc-event-start",
+ logger.Debugw(ctx, "ofc-event-start",
log.Fields{"device-id": ofc.DeviceID})
if state == ofcStateCreated {
state = ofcStateStarted
- logger.Debug("STARTED MORE THAN ONCE")
+ logger.Debug(ctx, "STARTED MORE THAN ONCE")
go func() {
- if err := ofc.establishConnectionToController(); err != nil {
- logger.Errorw("controller-connection-failed", log.Fields{"error": err})
+ if err := ofc.establishConnectionToController(ctx); err != nil {
+ logger.Errorw(ctx, "controller-connection-failed", log.Fields{"error": err})
panic(err)
}
}()
} else {
- logger.Errorw("illegal-state-transition",
+ logger.Errorw(ctx, "illegal-state-transition",
log.Fields{
"device-id": ofc.DeviceID,
"current-state": state.String(),
"event": event.String()})
}
case ofcEventConnect:
- logger.Debugw("ofc-event-connected",
+ logger.Debugw(ctx, "ofc-event-connected",
log.Fields{"device-id": ofc.DeviceID})
if state == ofcStateStarted || state == ofcStateDisconnected {
state = ofcStateConnected
@@ -151,14 +151,14 @@
go ofc.messageSender(ofCtx)
go ofc.processOFStream(ofCtx)
} else {
- logger.Errorw("illegal-state-transition",
+ logger.Errorw(ctx, "illegal-state-transition",
log.Fields{
"device-id": ofc.DeviceID,
"current-state": state.String(),
"event": event.String()})
}
case ofcEventDisconnect:
- logger.Debugw("ofc-event-disconnected",
+ logger.Debugw(ctx, "ofc-event-disconnected",
log.Fields{
"device-id": ofc.DeviceID,
"state": state.String()})
@@ -169,33 +169,33 @@
ofDone = nil
}
go func() {
- if err := ofc.establishConnectionToController(); err != nil {
+ if err := ofc.establishConnectionToController(ctx); err != nil {
log.Errorw("controller-connection-failed", log.Fields{"error": err})
panic(err)
}
}()
} else {
- logger.Errorw("illegal-state-transition",
+ logger.Errorw(ctx, "illegal-state-transition",
log.Fields{
"device-id": ofc.DeviceID,
"current-state": state.String(),
"event": event.String()})
}
case ofcEventStop:
- logger.Debugw("ofc-event-stop",
+ logger.Debugw(ctx, "ofc-event-stop",
log.Fields{"device-id": ofc.DeviceID})
if state == ofcStateCreated || state == ofcStateConnected || state == ofcStateDisconnected {
state = ofcStateStopped
break top
} else {
- logger.Errorw("illegal-state-transition",
+ logger.Errorw(ctx, "illegal-state-transition",
log.Fields{
"device-id": ofc.DeviceID,
"current-state": state.String(),
"event": event.String()})
}
}
- logger.Debugw("state-transition",
+ logger.Debugw(ctx, "state-transition",
log.Fields{
"device-id": ofc.DeviceID,
"previous-state": previous.String(),
@@ -237,20 +237,20 @@
for {
select {
case <-ctx.Done():
- logger.Error("of-loop-ending-context-done")
+ logger.Error(ctx, "of-loop-ending-context-done")
break top
default:
// Read 8 bytes, the standard OF header
read, err := io.ReadFull(fromController, headerBuf)
if err != nil {
if err == io.EOF {
- logger.Infow("controller-disconnected",
+ logger.Infow(ctx, "controller-disconnected",
log.Fields{
"device-id": ofc.DeviceID,
"controller": ofc.OFControllerEndPoint,
})
} else {
- logger.Errorw("bad-of-header",
+ logger.Errorw(ctx, "bad-of-header",
log.Fields{
"byte-count": read,
"device-id": ofc.DeviceID,
@@ -267,7 +267,7 @@
* Header is bad, assume stream is corrupted
* and needs to be restarted
*/
- logger.Errorw("bad-of-packet",
+ logger.Errorw(ctx, "bad-of-packet",
log.Fields{
"device-id": ofc.DeviceID,
"error": err})
@@ -280,7 +280,7 @@
copy(messageBuf, headerBuf)
read, err = io.ReadFull(fromController, messageBuf[8:])
if err != nil {
- logger.Errorw("bad-of-packet",
+ logger.Errorw(ctx, "bad-of-packet",
log.Fields{
"byte-count": read,
"device-id": ofc.DeviceID,
@@ -294,7 +294,7 @@
if err != nil {
// nolint: staticcheck
js, _ := json.Marshal(decoder)
- logger.Errorw("failed-to-decode",
+ logger.Errorw(ctx, "failed-to-decode",
log.Fields{
"device-id": ofc.DeviceID,
"decoder": js,
@@ -303,7 +303,7 @@
}
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(msg)
- logger.Debugw("packet-header",
+ logger.Debugw(ctx, "packet-header",
log.Fields{
"device-id": ofc.DeviceID,
"header": js})
@@ -318,15 +318,15 @@
* with no guarantees of handling all messages before a barrier.
* A multiple queue (incoming worker and outgoing) is a possible solution.
*/
- go ofc.parseHeader(msg)
+ go ofc.parseHeader(ctx, msg)
}
}
- logger.Debugw("end-of-stream",
+ logger.Debugw(ctx, "end-of-stream",
log.Fields{"device-id": ofc.DeviceID})
ofc.events <- ofcEventDisconnect
}
-func (ofc *OFConnection) sayHello() {
+func (ofc *OFConnection) sayHello(ctx context.Context) {
hello := ofp.NewHello()
hello.Xid = uint32(GetXid())
elem := ofp.NewHelloElemVersionbitmap()
@@ -336,94 +336,94 @@
hello.SetElements([]ofp.IHelloElem{elem})
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(hello)
- logger.Debugw("sayHello Called",
+ logger.Debugw(ctx, "sayHello Called",
log.Fields{
"device-id": ofc.DeviceID,
"hello-message": js})
}
- if err := ofc.SendMessage(hello); err != nil {
- logger.Fatalw("Failed saying hello to Openflow Server, unable to proceed",
+ if err := ofc.SendMessage(ctx, hello); err != nil {
+ logger.Fatalw(ctx, "Failed saying hello to Openflow Server, unable to proceed",
log.Fields{
"device-id": ofc.DeviceID,
"error": err})
}
}
-func (ofc *OFConnection) parseHeader(header ofp.IHeader) {
+func (ofc *OFConnection) parseHeader(ctx context.Context, header ofp.IHeader) {
headerType := header.GetType()
- logger.Debugw("packet-header-type",
+ logger.Debugw(ctx, "packet-header-type",
log.Fields{
"header-type": ofp.Type(headerType).String()})
switch headerType {
case ofp.OFPTHello:
//x := header.(*ofp.Hello)
case ofp.OFPTError:
- go ofc.handleErrMsg(header.(*ofp.ErrorMsg))
+ go ofc.handleErrMsg(ctx, header.(*ofp.ErrorMsg))
case ofp.OFPTEchoRequest:
- go ofc.handleEchoRequest(header.(*ofp.EchoRequest))
+ go ofc.handleEchoRequest(ctx, header.(*ofp.EchoRequest))
case ofp.OFPTEchoReply:
case ofp.OFPTExperimenter:
case ofp.OFPTFeaturesRequest:
go func() {
- if err := ofc.handleFeatureRequest(header.(*ofp.FeaturesRequest)); err != nil {
- logger.Errorw("handle-feature-request", log.Fields{"error": err})
+ if err := ofc.handleFeatureRequest(ctx, header.(*ofp.FeaturesRequest)); err != nil {
+ logger.Errorw(ctx, "handle-feature-request", log.Fields{"error": err})
}
}()
case ofp.OFPTFeaturesReply:
case ofp.OFPTGetConfigRequest:
- go ofc.handleGetConfigRequest(header.(*ofp.GetConfigRequest))
+ go ofc.handleGetConfigRequest(ctx, header.(*ofp.GetConfigRequest))
case ofp.OFPTGetConfigReply:
case ofp.OFPTSetConfig:
- go ofc.handleSetConfig(header.(*ofp.SetConfig))
+ go ofc.handleSetConfig(ctx, header.(*ofp.SetConfig))
case ofp.OFPTPacketIn:
case ofp.OFPTFlowRemoved:
case ofp.OFPTPortStatus:
case ofp.OFPTPacketOut:
if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
- ofc.sendRoleSlaveError(header)
+ ofc.sendRoleSlaveError(ctx, header)
return
}
- go ofc.handlePacketOut(header.(*ofp.PacketOut))
+ go ofc.handlePacketOut(ctx, header.(*ofp.PacketOut))
case ofp.OFPTFlowMod:
if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
- ofc.sendRoleSlaveError(header)
+ ofc.sendRoleSlaveError(ctx, header)
return
}
switch header.(ofp.IFlowMod).GetCommand() {
case ofp.OFPFCAdd:
- ofc.handleFlowAdd(header.(*ofp.FlowAdd))
+ ofc.handleFlowAdd(ctx, header.(*ofp.FlowAdd))
case ofp.OFPFCModify:
- ofc.handleFlowMod(header.(*ofp.FlowMod))
+ ofc.handleFlowMod(ctx, header.(*ofp.FlowMod))
case ofp.OFPFCModifyStrict:
- ofc.handleFlowModStrict(header.(*ofp.FlowModifyStrict))
+ ofc.handleFlowModStrict(ctx, header.(*ofp.FlowModifyStrict))
case ofp.OFPFCDelete:
- ofc.handleFlowDelete(header.(*ofp.FlowDelete))
+ ofc.handleFlowDelete(ctx, header.(*ofp.FlowDelete))
case ofp.OFPFCDeleteStrict:
- ofc.handleFlowDeleteStrict(header.(*ofp.FlowDeleteStrict))
+ ofc.handleFlowDeleteStrict(ctx, header.(*ofp.FlowDeleteStrict))
}
case ofp.OFPTStatsRequest:
go func() {
- if err := ofc.handleStatsRequest(header, header.(ofp.IStatsRequest).GetStatsType()); err != nil {
- logger.Errorw("ofpt-stats-request", log.Fields{"error": err})
+ if err := ofc.handleStatsRequest(ctx, header, header.(ofp.IStatsRequest).GetStatsType()); err != nil {
+ logger.Errorw(ctx, "ofpt-stats-request", log.Fields{"error": err})
}
}()
case ofp.OFPTBarrierRequest:
/* See note above at case ofp.OFPTFlowMod:*/
- ofc.handleBarrierRequest(header.(*ofp.BarrierRequest))
+ ofc.handleBarrierRequest(ctx, header.(*ofp.BarrierRequest))
case ofp.OFPTRoleRequest:
- go ofc.handleRoleRequest(header.(*ofp.RoleRequest))
+ go ofc.handleRoleRequest(ctx, header.(*ofp.RoleRequest))
case ofp.OFPTMeterMod:
if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
- ofc.sendRoleSlaveError(header)
+ ofc.sendRoleSlaveError(ctx, header)
return
}
- ofc.handleMeterModRequest(header.(*ofp.MeterMod))
+ ofc.handleMeterModRequest(ctx, header.(*ofp.MeterMod))
case ofp.OFPTGroupMod:
if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
- ofc.sendRoleSlaveError(header)
+ ofc.sendRoleSlaveError(ctx, header)
return
}
- ofc.handleGroupMod(header.(ofp.IGroupMod))
+ ofc.handleGroupMod(ctx, header.(ofp.IGroupMod))
}
}
@@ -433,7 +433,7 @@
Serialize(encoder *goloxi.Encoder) error
}
-func (ofc *OFConnection) doSend(msg Message) error {
+func (ofc *OFConnection) doSend(ctx context.Context, msg Message) error {
if ofc.conn == nil {
return errors.New("no-connection")
}
@@ -444,7 +444,7 @@
bytes := enc.Bytes()
if _, err := ofc.conn.Write(bytes); err != nil {
- logger.Errorw("unable-to-send-message-to-controller",
+ logger.Errorw(ctx, "unable-to-send-message-to-controller",
log.Fields{
"device-id": ofc.DeviceID,
"message": msg,
@@ -457,7 +457,7 @@
func (ofc *OFConnection) messageSender(ctx context.Context) {
// first process last fail if it exists
if ofc.lastUnsentMessage != nil {
- if err := ofc.doSend(ofc.lastUnsentMessage); err != nil {
+ if err := ofc.doSend(ctx, ofc.lastUnsentMessage); err != nil {
ofc.events <- ofcEventDisconnect
return
}
@@ -469,30 +469,30 @@
case <-ctx.Done():
break top
case msg := <-ofc.sendChannel:
- if err := ofc.doSend(msg); err != nil {
+ if err := ofc.doSend(ctx, msg); err != nil {
ofc.lastUnsentMessage = msg
ofc.events <- ofcEventDisconnect
- logger.Debugw("message-sender-error",
+ logger.Debugw(ctx, "message-sender-error",
log.Fields{
"device-id": ofc.DeviceID,
"error": err.Error()})
break top
}
- logger.Debugw("message-sender-send",
+ logger.Debugw(ctx, "message-sender-send",
log.Fields{
"device-id": ofc.DeviceID})
ofc.lastUnsentMessage = nil
}
}
- logger.Debugw("message-sender-finished",
+ logger.Debugw(ctx, "message-sender-finished",
log.Fields{
"device-id": ofc.DeviceID})
}
// SendMessage queues a message to be sent to the openflow controller
-func (ofc *OFConnection) SendMessage(message Message) error {
- logger.Debug("queuing-message")
+func (ofc *OFConnection) SendMessage(ctx context.Context, message Message) error {
+ logger.Debug(ctx, "queuing-message")
ofc.sendChannel <- message
return nil
}
diff --git a/internal/pkg/openflow/echo.go b/internal/pkg/openflow/echo.go
index 3b1f774..e02e00f 100644
--- a/internal/pkg/openflow/echo.go
+++ b/internal/pkg/openflow/echo.go
@@ -17,15 +17,16 @@
package openflow
import (
+ "context"
"encoding/json"
ofp "github.com/opencord/goloxi/of13"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
)
-func (ofc *OFConnection) handleEchoRequest(request *ofp.EchoRequest) {
+func (ofc *OFConnection) handleEchoRequest(ctx context.Context, request *ofp.EchoRequest) {
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(request)
- logger.Debugw("handleEchoRequest called",
+ logger.Debugw(ctx, "handleEchoRequest called",
log.Fields{
"device-id": ofc.DeviceID,
"request": js})
@@ -33,7 +34,7 @@
reply := ofp.NewEchoReply()
reply.SetXid(request.GetXid())
reply.SetVersion(request.GetVersion())
- if err := ofc.SendMessage(reply); err != nil {
- logger.Errorw("handle-echo-request-send-message", log.Fields{"error": err})
+ if err := ofc.SendMessage(ctx, reply); err != nil {
+ logger.Errorw(ctx, "handle-echo-request-send-message", log.Fields{"error": err})
}
}
diff --git a/internal/pkg/openflow/error.go b/internal/pkg/openflow/error.go
index d7b4e43..cfb09a6 100644
--- a/internal/pkg/openflow/error.go
+++ b/internal/pkg/openflow/error.go
@@ -17,15 +17,16 @@
package openflow
import (
+ "context"
"encoding/json"
ofp "github.com/opencord/goloxi/of13"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
)
-func (ofc *OFConnection) handleErrMsg(message ofp.IErrorMsg) {
+func (ofc *OFConnection) handleErrMsg(ctx context.Context, message ofp.IErrorMsg) {
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(message)
- logger.Debugw("handleErrMsg called",
+ logger.Debugw(ctx, "handleErrMsg called",
log.Fields{
"device-id": ofc.DeviceID,
"request": js})
diff --git a/internal/pkg/openflow/feature.go b/internal/pkg/openflow/feature.go
index d897958..7f3b557 100644
--- a/internal/pkg/openflow/feature.go
+++ b/internal/pkg/openflow/feature.go
@@ -25,10 +25,10 @@
"github.com/opencord/voltha-protos/v3/go/common"
)
-func (ofc *OFConnection) handleFeatureRequest(request *ofp.FeaturesRequest) error {
+func (ofc *OFConnection) handleFeatureRequest(ctx context.Context, request *ofp.FeaturesRequest) error {
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(request)
- logger.Debugw("handleFeatureRequest called",
+ logger.Debugw(ctx, "handleFeatureRequest called",
log.Fields{
"device-id": ofc.DeviceID,
"request": js})
@@ -55,11 +55,11 @@
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(reply)
- logger.Debugw("handleFeatureRequestReturn",
+ logger.Debugw(ctx, "handleFeatureRequestReturn",
log.Fields{
"device-id": ofc.DeviceID,
"reply": js})
}
- err = ofc.SendMessage(reply)
+ err = ofc.SendMessage(ctx, reply)
return err
}
diff --git a/internal/pkg/openflow/flowMod.go b/internal/pkg/openflow/flowMod.go
index 82cacd0..1a47d17 100644
--- a/internal/pkg/openflow/flowMod.go
+++ b/internal/pkg/openflow/flowMod.go
@@ -71,10 +71,10 @@
"vlan_vid_masked": 200, //made up
}
-func (ofc *OFConnection) handleFlowAdd(flowAdd *ofp.FlowAdd) {
+func (ofc *OFConnection) handleFlowAdd(ctx context.Context, flowAdd *ofp.FlowAdd) {
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(flowAdd)
- logger.Debugw("handleFlowAdd called",
+ logger.Debugw(ctx, "handleFlowAdd called",
log.Fields{
"device-id": ofc.DeviceID,
"params": js})
@@ -82,7 +82,7 @@
volthaClient := ofc.VolthaClient.Get()
if volthaClient == nil {
- logger.Errorw("no-voltha-connection",
+ logger.Errorw(ctx, "no-voltha-connection",
log.Fields{"device-id": ofc.DeviceID})
return
}
@@ -234,14 +234,14 @@
}
if logger.V(log.DebugLevel) {
flowUpdateJs, _ := json.Marshal(flowUpdate)
- logger.Debugf("FlowAdd being sent to Voltha",
+ logger.Debugf(ctx, "FlowAdd being sent to Voltha",
log.Fields{
"device-id": ofc.DeviceID,
"flow-mod-object": flowUpdate,
"flow-mod-request": flowUpdateJs})
}
if _, err := volthaClient.UpdateLogicalDeviceFlowTable(context.Background(), &flowUpdate); err != nil {
- logger.Errorw("Error calling FlowAdd ",
+ logger.Errorw(ctx, "Error calling FlowAdd ",
log.Fields{
"device-id": ofc.DeviceID,
"error": err})
@@ -267,9 +267,9 @@
binary.BigEndian.PutUint64(cookie, flowAdd.Cookie)
bs = append(bs, cookie...)
message.SetData(bs)
- err := ofc.SendMessage(message)
+ err := ofc.SendMessage(ctx, message)
if err != nil {
- logger.Errorw("Error reporting failure of FlowUpdate to controller",
+ logger.Errorw(ctx, "Error reporting failure of FlowUpdate to controller",
log.Fields{
"device-id": ofc.DeviceID,
"error": err})
@@ -277,47 +277,47 @@
}
}
-func (ofc *OFConnection) handleFlowMod(flowMod *ofp.FlowMod) {
+func (ofc *OFConnection) handleFlowMod(ctx context.Context, flowMod *ofp.FlowMod) {
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(flowMod)
- logger.Debugw("handleFlowMod called",
+ logger.Debugw(ctx, "handleFlowMod called",
log.Fields{
"device-id": ofc.DeviceID,
"flow-mod": js})
}
- logger.Errorw("handleFlowMod not implemented",
+ logger.Errorw(ctx, "handleFlowMod not implemented",
log.Fields{"device-id": ofc.DeviceID})
}
-func (ofc *OFConnection) handleFlowModStrict(flowModStrict *ofp.FlowModifyStrict) {
+func (ofc *OFConnection) handleFlowModStrict(ctx context.Context, flowModStrict *ofp.FlowModifyStrict) {
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(flowModStrict)
- logger.Debugw("handleFlowModStrict called",
+ logger.Debugw(ctx, "handleFlowModStrict called",
log.Fields{
"device-id": ofc.DeviceID,
"flow-mod-strict": js})
}
- logger.Error("handleFlowModStrict not implemented",
+ logger.Error(ctx, "handleFlowModStrict not implemented",
log.Fields{"device-id": ofc.DeviceID})
}
-func (ofc *OFConnection) handleFlowDelete(flowDelete *ofp.FlowDelete) {
+func (ofc *OFConnection) handleFlowDelete(ctx context.Context, flowDelete *ofp.FlowDelete) {
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(flowDelete)
- logger.Debugw("handleFlowDelete called",
+ logger.Debugw(ctx, "handleFlowDelete called",
log.Fields{
"device-id": ofc.DeviceID,
"flow-delete": js})
}
- logger.Error("handleFlowDelete not implemented",
+ logger.Error(ctx, "handleFlowDelete not implemented",
log.Fields{"device-id": ofc.DeviceID})
}
-func (ofc *OFConnection) handleFlowDeleteStrict(flowDeleteStrict *ofp.FlowDeleteStrict) {
+func (ofc *OFConnection) handleFlowDeleteStrict(ctx context.Context, flowDeleteStrict *ofp.FlowDeleteStrict) {
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(flowDeleteStrict)
- logger.Debugw("handleFlowDeleteStrict called",
+ logger.Debugw(ctx, "handleFlowDeleteStrict called",
log.Fields{
"device-id": ofc.DeviceID,
"flow-delete-strict": js})
@@ -325,7 +325,7 @@
volthaClient := ofc.VolthaClient.Get()
if volthaClient == nil {
- logger.Errorw("no-voltha-connection",
+ logger.Errorw(ctx, "no-voltha-connection",
log.Fields{"device-id": ofc.DeviceID})
return
}
@@ -438,13 +438,13 @@
if logger.V(log.DebugLevel) {
flowUpdateJs, _ := json.Marshal(flowUpdate)
- logger.Debugf("FlowDeleteStrict being sent to Voltha",
+ logger.Debugf(ctx, "FlowDeleteStrict being sent to Voltha",
log.Fields{
"device-id": ofc.DeviceID,
"flow-update": flowUpdateJs})
}
if _, err := volthaClient.UpdateLogicalDeviceFlowTable(context.Background(), &flowUpdate); err != nil {
- logger.Errorw("Error calling FlowDelete ",
+ logger.Errorw(ctx, "Error calling FlowDelete ",
log.Fields{
"device-id": ofc.DeviceID,
"error": err})
@@ -462,9 +462,9 @@
response.HardTimeout = flowDeleteStrict.HardTimeout
response.Xid = flowDeleteStrict.Xid
- err := ofc.SendMessage(response)
+ err := ofc.SendMessage(ctx, response)
if err != nil {
- logger.Errorw("Error sending FlowRemoved to ONOS",
+ logger.Errorw(ctx, "Error sending FlowRemoved to ONOS",
log.Fields{
"device-id": ofc.DeviceID,
"error": err})
diff --git a/internal/pkg/openflow/getConfig.go b/internal/pkg/openflow/getConfig.go
index 26f9f89..eef49eb 100644
--- a/internal/pkg/openflow/getConfig.go
+++ b/internal/pkg/openflow/getConfig.go
@@ -17,15 +17,16 @@
package openflow
import (
+ "context"
"encoding/json"
ofp "github.com/opencord/goloxi/of13"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
)
-func (ofc *OFConnection) handleGetConfigRequest(request *ofp.GetConfigRequest) {
+func (ofc *OFConnection) handleGetConfigRequest(ctx context.Context, request *ofp.GetConfigRequest) {
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(request)
- logger.Debugw("handleGetConfigRequest called",
+ logger.Debugw(ctx, "handleGetConfigRequest called",
log.Fields{
"device-id": ofc.DeviceID,
"request": js})
@@ -36,12 +37,12 @@
reply.SetMissSendLen(ofp.OFPCMLNoBuffer)
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(reply)
- logger.Debugw("handleGetConfigRequest reply",
+ logger.Debugw(ctx, "handleGetConfigRequest reply",
log.Fields{
"device-id": ofc.DeviceID,
"reply": js})
}
- if err := ofc.SendMessage(reply); err != nil {
- logger.Errorw("handle-get-config-request-send-message", log.Fields{"error": err})
+ if err := ofc.SendMessage(ctx, reply); err != nil {
+ logger.Errorw(ctx, "handle-get-config-request-send-message", log.Fields{"error": err})
}
}
diff --git a/internal/pkg/openflow/group.go b/internal/pkg/openflow/group.go
index b9d107b..425e99b 100644
--- a/internal/pkg/openflow/group.go
+++ b/internal/pkg/openflow/group.go
@@ -25,11 +25,11 @@
"github.com/opencord/voltha-protos/v3/go/voltha"
)
-func (ofc *OFConnection) handleGroupMod(groupMod ofp.IGroupMod) {
+func (ofc *OFConnection) handleGroupMod(ctx context.Context, groupMod ofp.IGroupMod) {
volthaClient := ofc.VolthaClient.Get()
if volthaClient == nil {
- logger.Errorw("no-voltha-connection",
+ logger.Errorw(ctx, "no-voltha-connection",
log.Fields{"device-id": ofc.DeviceID})
return
}
@@ -37,8 +37,8 @@
groupUpdate := &openflow_13.FlowGroupTableUpdate{
Id: ofc.DeviceID,
GroupMod: &voltha.OfpGroupMod{
- Command: openflowGroupModCommandToVoltha(groupMod.GetCommand()),
- Type: openflowGroupTypeToVoltha(groupMod.GetGroupType()),
+ Command: openflowGroupModCommandToVoltha(ctx, groupMod.GetCommand()),
+ Type: openflowGroupTypeToVoltha(ctx, groupMod.GetGroupType()),
GroupId: groupMod.GetGroupId(),
Buckets: openflowBucketsToVoltha(groupMod.GetBuckets()),
},
@@ -46,13 +46,13 @@
_, err := volthaClient.UpdateLogicalDeviceFlowGroupTable(context.Background(), groupUpdate)
if err != nil {
- logger.Errorw("Error updating group table",
+ logger.Errorw(ctx, "Error updating group table",
log.Fields{"device-id": ofc.DeviceID, "error": err})
}
}
-func openflowGroupModCommandToVoltha(command ofp.GroupModCommand) openflow_13.OfpGroupModCommand {
+func openflowGroupModCommandToVoltha(ctx context.Context, command ofp.GroupModCommand) openflow_13.OfpGroupModCommand {
switch command {
case ofp.OFPGCAdd:
return openflow_13.OfpGroupModCommand_OFPGC_ADD
@@ -61,11 +61,11 @@
case ofp.OFPGCDelete:
return openflow_13.OfpGroupModCommand_OFPGC_DELETE
}
- logger.Errorw("Unknown group mod command", log.Fields{"command": command})
+ logger.Errorw(ctx, "Unknown group mod command", log.Fields{"command": command})
return 0
}
-func openflowGroupTypeToVoltha(t ofp.GroupType) openflow_13.OfpGroupType {
+func openflowGroupTypeToVoltha(ctx context.Context, t ofp.GroupType) openflow_13.OfpGroupType {
switch t {
case ofp.OFPGTAll:
return openflow_13.OfpGroupType_OFPGT_ALL
@@ -76,11 +76,11 @@
case ofp.OFPGTFf:
return openflow_13.OfpGroupType_OFPGT_FF
}
- logger.Errorw("Unknown openflow group type", log.Fields{"type": t})
+ logger.Errorw(ctx, "Unknown openflow group type", log.Fields{"type": t})
return 0
}
-func volthaGroupTypeToOpenflow(t openflow_13.OfpGroupType) ofp.GroupType {
+func volthaGroupTypeToOpenflow(ctx context.Context, t openflow_13.OfpGroupType) ofp.GroupType {
switch t {
case openflow_13.OfpGroupType_OFPGT_ALL:
return ofp.OFPGTAll
@@ -91,7 +91,7 @@
case openflow_13.OfpGroupType_OFPGT_FF:
return ofp.OFPGTFf
}
- logger.Errorw("Unknown voltha group type", log.Fields{"type": t})
+ logger.Errorw(ctx, "Unknown voltha group type", log.Fields{"type": t})
return 0
}
@@ -121,11 +121,11 @@
return outActions
}
-func volthaBucketsToOpenflow(buckets []*openflow_13.OfpBucket) []*ofp.Bucket {
+func volthaBucketsToOpenflow(ctx context.Context, buckets []*openflow_13.OfpBucket) []*ofp.Bucket {
outBuckets := make([]*ofp.Bucket, len(buckets))
for i, bucket := range buckets {
- actions := volthaActionsToOpenflow(bucket.Actions)
+ actions := volthaActionsToOpenflow(ctx, bucket.Actions)
b := &ofp.Bucket{
Weight: uint16(bucket.Weight),
WatchPort: ofp.Port(bucket.WatchPort),
@@ -138,11 +138,11 @@
return outBuckets
}
-func volthaActionsToOpenflow(actions []*openflow_13.OfpAction) []goloxi.IAction {
+func volthaActionsToOpenflow(ctx context.Context, actions []*openflow_13.OfpAction) []goloxi.IAction {
outActions := make([]goloxi.IAction, len(actions))
for i, action := range actions {
- outActions[i] = parseAction(action)
+ outActions[i] = parseAction(ctx, action)
}
return outActions
diff --git a/internal/pkg/openflow/meter.go b/internal/pkg/openflow/meter.go
index a6411bb..df1b533 100644
--- a/internal/pkg/openflow/meter.go
+++ b/internal/pkg/openflow/meter.go
@@ -24,10 +24,10 @@
"golang.org/x/net/context"
)
-func (ofc *OFConnection) handleMeterModRequest(request *ofp.MeterMod) {
+func (ofc *OFConnection) handleMeterModRequest(ctx context.Context, request *ofp.MeterMod) {
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(request)
- logger.Debugw("handleMeterModRequest called",
+ logger.Debugw(ctx, "handleMeterModRequest called",
log.Fields{
"device-id": ofc.DeviceID,
"request": js})
@@ -35,7 +35,7 @@
volthaClient := ofc.VolthaClient.Get()
if volthaClient == nil {
- logger.Errorw("no-voltha-connection",
+ logger.Errorw(ctx, "no-voltha-connection",
log.Fields{"device-id": ofc.DeviceID})
return
}
@@ -88,13 +88,13 @@
meterModUpdate.MeterMod = &meterMod
if logger.V(log.DebugLevel) {
meterModJS, _ := json.Marshal(meterModUpdate)
- logger.Debugw("handleMeterModUpdate sending request",
+ logger.Debugw(ctx, "handleMeterModUpdate sending request",
log.Fields{
"device-id": ofc.DeviceID,
"meter-mod-request": meterModJS})
}
if _, err := volthaClient.UpdateLogicalDeviceMeterTable(context.Background(), &meterModUpdate); err != nil {
- logger.Errorw("Error calling UpdateLogicalDeviceMeterTable",
+ logger.Errorw(ctx, "Error calling UpdateLogicalDeviceMeterTable",
log.Fields{
"device-id": ofc.DeviceID,
"error": err})
diff --git a/internal/pkg/openflow/packet.go b/internal/pkg/openflow/packet.go
index 9a13de2..dcef5fc 100644
--- a/internal/pkg/openflow/packet.go
+++ b/internal/pkg/openflow/packet.go
@@ -17,16 +17,17 @@
package openflow
import (
+ "context"
"encoding/json"
ofp "github.com/opencord/goloxi/of13"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
"github.com/opencord/voltha-protos/v3/go/voltha"
)
-func (ofc *OFConnection) handlePacketOut(packetOut *ofp.PacketOut) {
+func (ofc *OFConnection) handlePacketOut(ctx context.Context, packetOut *ofp.PacketOut) {
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(packetOut)
- logger.Debugw("handlePacketOut called",
+ logger.Debugw(ctx, "handlePacketOut called",
log.Fields{
"device-id": ofc.DeviceID,
"packet-out": js})
@@ -51,7 +52,7 @@
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(pbPacketOut)
- logger.Debugw("handlePacketOut sending",
+ logger.Debugw(ctx, "handlePacketOut sending",
log.Fields{
"device-id": ofc.DeviceID,
"packet-out": js})
diff --git a/internal/pkg/openflow/parseGrpcReturn.go b/internal/pkg/openflow/parseGrpcReturn.go
index 9967d9b..8f67e06 100644
--- a/internal/pkg/openflow/parseGrpcReturn.go
+++ b/internal/pkg/openflow/parseGrpcReturn.go
@@ -17,6 +17,7 @@
import (
"bytes"
+ "context"
"encoding/binary"
"encoding/json"
"github.com/opencord/goloxi"
@@ -26,10 +27,10 @@
"github.com/opencord/voltha-protos/v3/go/voltha"
)
-func parseOxm(ofbField *openflow_13.OfpOxmOfbField) goloxi.IOxm {
+func parseOxm(ctx context.Context, ofbField *openflow_13.OfpOxmOfbField) goloxi.IOxm {
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(ofbField)
- logger.Debugw("parseOxm called",
+ logger.Debugw(ctx, "parseOxm called",
log.Fields{"ofbField": js})
}
@@ -60,7 +61,7 @@
buf := new(bytes.Buffer)
err := binary.Write(buf, binary.BigEndian, val.Ipv4Dst)
if err != nil {
- logger.Errorw("error writing ipv4 address %v",
+ logger.Errorw(ctx, "error writing ipv4 address %v",
log.Fields{"error": err})
}
ofpIpv4Dst.Value = buf.Bytes()
@@ -118,17 +119,17 @@
default:
if logger.V(log.WarnLevel) {
js, _ := json.Marshal(ofbField)
- logger.Warnw("ParseOXM Unhandled OxmField",
+ logger.Warnw(ctx, "ParseOXM Unhandled OxmField",
log.Fields{"OfbField": js})
}
}
return nil
}
-func parseInstructions(ofpInstruction *openflow_13.OfpInstruction) ofp.IInstruction {
+func parseInstructions(ctx context.Context, ofpInstruction *openflow_13.OfpInstruction) ofp.IInstruction {
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(ofpInstruction)
- logger.Debugw("parseInstructions called",
+ logger.Debugw(ctx, "parseInstructions called",
log.Fields{"Instruction": js})
}
instType := ofpInstruction.Type
@@ -154,13 +155,13 @@
var actions []goloxi.IAction
for _, ofpAction := range ofpInstruction.GetActions().Actions {
- action := parseAction(ofpAction)
+ action := parseAction(ctx, ofpAction)
actions = append(actions, action)
}
instruction.Actions = actions
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(instruction)
- logger.Debugw("parseInstructions returning",
+ logger.Debugw(ctx, "parseInstructions returning",
log.Fields{
"parsed-instruction": js})
}
@@ -170,10 +171,10 @@
return nil
}
-func parseAction(ofpAction *openflow_13.OfpAction) goloxi.IAction {
+func parseAction(ctx context.Context, ofpAction *openflow_13.OfpAction) goloxi.IAction {
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(ofpAction)
- logger.Debugw("parseAction called",
+ logger.Debugw(ctx, "parseAction called",
log.Fields{"action": js})
}
switch ofpAction.Type {
@@ -194,7 +195,7 @@
ofpActionSetField := ofpAction.GetSetField()
setFieldAction := ofp.NewActionSetField()
- iOxm := parseOxm(ofpActionSetField.GetField().GetOfbField())
+ iOxm := parseOxm(ctx, ofpActionSetField.GetField().GetOfbField())
setFieldAction.Field = iOxm
return setFieldAction
case openflow_13.OfpActionType_OFPAT_GROUP:
@@ -205,7 +206,7 @@
default:
if logger.V(log.WarnLevel) {
js, _ := json.Marshal(ofpAction)
- logger.Warnw("parseAction unknow action",
+ logger.Warnw(ctx, "parseAction unknow action",
log.Fields{"action": js})
}
}
diff --git a/internal/pkg/openflow/role.go b/internal/pkg/openflow/role.go
index 1ea82c8..740616f 100644
--- a/internal/pkg/openflow/role.go
+++ b/internal/pkg/openflow/role.go
@@ -17,16 +17,17 @@
package openflow
import (
+ "context"
"encoding/json"
"github.com/opencord/goloxi"
ofp "github.com/opencord/goloxi/of13"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
)
-func (ofc *OFConnection) handleRoleRequest(request *ofp.RoleRequest) {
+func (ofc *OFConnection) handleRoleRequest(ctx context.Context, request *ofp.RoleRequest) {
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(request)
- logger.Debugw("handleRoleRequest called",
+ logger.Debugw(ctx, "handleRoleRequest called",
log.Fields{
"device-id": ofc.DeviceID,
"request": js})
@@ -38,14 +39,14 @@
reply.SetVersion(request.GetVersion())
reply.SetRole(ofp.ControllerRole(ofc.role))
reply.SetGenerationId(request.GetGenerationId())
- if err := ofc.SendMessage(reply); err != nil {
- logger.Errorw("handle-role-request-send-message", log.Fields{
+ if err := ofc.SendMessage(ctx, reply); err != nil {
+ logger.Errorw(ctx, "handle-role-request-send-message", log.Fields{
"device-id": ofc.DeviceID,
"error": err})
}
}
- ok := ofc.roleManager.UpdateRoles(ofc.OFControllerEndPoint, request)
+ ok := ofc.roleManager.UpdateRoles(ctx, ofc.OFControllerEndPoint, request)
if ok {
reply := ofp.NewRoleReply()
@@ -53,8 +54,8 @@
reply.SetVersion(request.GetVersion())
reply.SetRole(request.GetRole())
reply.SetGenerationId(request.GetGenerationId())
- if err := ofc.SendMessage(reply); err != nil {
- logger.Errorw("handle-role-request-send-message", log.Fields{
+ if err := ofc.SendMessage(ctx, reply); err != nil {
+ logger.Errorw(ctx, "handle-role-request-send-message", log.Fields{
"device-id": ofc.DeviceID,
"error": err})
}
@@ -70,15 +71,15 @@
reply.Data = enc.Bytes()
}
- if err := ofc.SendMessage(reply); err != nil {
- logger.Errorw("handle-role-request-send-message", log.Fields{
+ if err := ofc.SendMessage(ctx, reply); err != nil {
+ logger.Errorw(ctx, "handle-role-request-send-message", log.Fields{
"device-id": ofc.DeviceID,
"error": err})
}
}
}
-func (ofc *OFConnection) sendRoleSlaveError(request ofp.IHeader) {
+func (ofc *OFConnection) sendRoleSlaveError(ctx context.Context, request ofp.IHeader) {
reply := ofp.NewBadRequestErrorMsg()
reply.SetXid(request.GetXid())
reply.SetVersion(request.GetVersion())
@@ -90,8 +91,8 @@
reply.Data = enc.Bytes()
}
- if err := ofc.SendMessage(reply); err != nil {
- logger.Errorw("send-role-slave-error", log.Fields{
+ if err := ofc.SendMessage(ctx, reply); err != nil {
+ logger.Errorw(ctx, "send-role-slave-error", log.Fields{
"device-id": ofc.DeviceID,
"error": err})
}
diff --git a/internal/pkg/openflow/role_test.go b/internal/pkg/openflow/role_test.go
index 66a0387..5023cc8 100644
--- a/internal/pkg/openflow/role_test.go
+++ b/internal/pkg/openflow/role_test.go
@@ -17,6 +17,7 @@
package openflow
import (
+ "context"
ofp "github.com/opencord/goloxi/of13"
"github.com/stretchr/testify/assert"
"testing"
@@ -30,7 +31,7 @@
generateError bool
}
-func (trm *testRoleManager) UpdateRoles(from string, request *ofp.RoleRequest) bool {
+func (trm *testRoleManager) UpdateRoles(ctx context.Context, from string, request *ofp.RoleRequest) bool {
trm.from = from
trm.role = request.Role
trm.generationId = request.GenerationId
@@ -79,12 +80,14 @@
// change role of e1 to master
rr := createRoleRequest(ofp.OFPCRRoleMaster, 1)
- ok := ofclient.UpdateRoles("e1", rr)
+ ctx := context.Background()
+
+ ok := ofclient.UpdateRoles(ctx, "e1", rr)
assert.True(t, ok)
assert.Equal(t, ofclient.connections["e1"].role, ofcRoleMaster)
// change role of e2 to master
- ok = ofclient.UpdateRoles("e2", rr)
+ ok = ofclient.UpdateRoles(ctx, "e2", rr)
assert.True(t, ok)
assert.Equal(t, ofclient.connections["e2"].role, ofcRoleMaster)
// e1 should now have reverted to slave
@@ -93,7 +96,7 @@
// change role of e2 to slave
rr = createRoleRequest(ofp.OFPCRRoleSlave, 1)
- ok = ofclient.UpdateRoles("e2", rr)
+ ok = ofclient.UpdateRoles(ctx, "e2", rr)
assert.True(t, ok)
assert.Equal(t, ofclient.connections["e2"].role, ofcRoleSlave)
}
@@ -103,14 +106,16 @@
rr1 := createRoleRequest(ofp.OFPCRRoleMaster, 2)
- ok := ofclient.UpdateRoles("e1", rr1)
+ ctx := context.Background()
+
+ ok := ofclient.UpdateRoles(ctx, "e1", rr1)
assert.True(t, ok)
assert.Equal(t, ofclient.connections["e1"].role, ofcRoleMaster)
// 'stale' role request
rr2 := createRoleRequest(ofp.OFPCRRoleSlave, 1)
- ok = ofclient.UpdateRoles("e1", rr2)
+ ok = ofclient.UpdateRoles(ctx, "e1", rr2)
// should not have succeeded
assert.False(t, ok)
// role should remain master
@@ -129,7 +134,7 @@
rr := createRoleRequest(ofp.OFPCRRoleMaster, 1)
- connection.handleRoleRequest(rr)
+ connection.handleRoleRequest(context.Background(), rr)
assert.Equal(t, "e1", trm.from)
assert.EqualValues(t, ofp.OFPCRRoleMaster, trm.role)
@@ -152,7 +157,7 @@
rr := createRoleRequest(ofp.OFPCRRoleMaster, 1)
- connection.handleRoleRequest(rr)
+ connection.handleRoleRequest(context.Background(), rr)
resp := (<-connection.sendChannel).(*ofp.RoleRequestFailedErrorMsg)
diff --git a/internal/pkg/openflow/setConfig.go b/internal/pkg/openflow/setConfig.go
index 96d77ce..3ffba6b 100644
--- a/internal/pkg/openflow/setConfig.go
+++ b/internal/pkg/openflow/setConfig.go
@@ -17,15 +17,16 @@
package openflow
import (
+ "context"
"encoding/json"
ofp "github.com/opencord/goloxi/of13"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
)
-func (ofc *OFConnection) handleSetConfig(request *ofp.SetConfig) {
+func (ofc *OFConnection) handleSetConfig(ctx context.Context, request *ofp.SetConfig) {
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(request)
- logger.Debugw("handleSetConfig called",
+ logger.Debugw(ctx, "handleSetConfig called",
log.Fields{
"device-id": ofc.DeviceID,
"request": js})
diff --git a/internal/pkg/openflow/stats.go b/internal/pkg/openflow/stats.go
index 0a0c8c8..78a9b93 100644
--- a/internal/pkg/openflow/stats.go
+++ b/internal/pkg/openflow/stats.go
@@ -28,10 +28,10 @@
"github.com/opencord/voltha-protos/v3/go/openflow_13"
)
-func (ofc *OFConnection) handleStatsRequest(request ofp.IHeader, statType uint16) error {
+func (ofc *OFConnection) handleStatsRequest(ctx context.Context, request ofp.IHeader, statType uint16) error {
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(request)
- logger.Debugw("handleStatsRequest called",
+ logger.Debugw(ctx, "handleStatsRequest called",
log.Fields{
"device-id": ofc.DeviceID,
"stat-type": statType,
@@ -48,23 +48,23 @@
if logger.V(log.DebugLevel) {
reqJs, _ := json.Marshal(statsReq)
resJs, _ := json.Marshal(response)
- logger.Debugw("handle-stats-request-desc",
+ logger.Debugw(ctx, "handle-stats-request-desc",
log.Fields{
"device-id": ofc.DeviceID,
"request": reqJs,
"response": resJs})
}
- return ofc.SendMessage(response)
+ return ofc.SendMessage(ctx, response)
case ofp.OFPSTFlow:
statsReq := request.(*ofp.FlowStatsRequest)
- responses, err := ofc.handleFlowStatsRequest(statsReq)
+ responses, err := ofc.handleFlowStatsRequest(ctx, statsReq)
if err != nil {
return err
}
if logger.V(log.DebugLevel) {
reqJs, _ := json.Marshal(statsReq)
resJs, _ := json.Marshal(responses)
- logger.Debugw("handle-stats-request-flow",
+ logger.Debugw(ctx, "handle-stats-request-flow",
log.Fields{
"device-id": ofc.DeviceID,
"request": reqJs,
@@ -72,7 +72,7 @@
"response": resJs})
}
for _, response := range responses {
- err := ofc.SendMessage(response)
+ err := ofc.SendMessage(ctx, response)
if err != nil {
return err
}
@@ -88,20 +88,20 @@
if logger.V(log.DebugLevel) {
reqJs, _ := json.Marshal(statsReq)
resJs, _ := json.Marshal(response)
- logger.Debugw("handle-stats-request-aggregate",
+ logger.Debugw(ctx, "handle-stats-request-aggregate",
log.Fields{
"device-id": ofc.DeviceID,
"request": reqJs,
"response": resJs})
}
- return ofc.SendMessage(response)
+ return ofc.SendMessage(ctx, response)
case ofp.OFPSTTable:
statsReq := request.(*ofp.TableStatsRequest)
response, e := ofc.handleTableStatsRequest(statsReq)
if logger.V(log.DebugLevel) {
reqJs, _ := json.Marshal(statsReq)
resJs, _ := json.Marshal(response)
- logger.Debugw("handle-stats-request-table",
+ logger.Debugw(ctx, "handle-stats-request-table",
log.Fields{
"device-id": ofc.DeviceID,
"request": reqJs,
@@ -110,7 +110,7 @@
if e != nil {
return e
}
- return ofc.SendMessage(response)
+ return ofc.SendMessage(ctx, response)
case ofp.OFPSTPort:
statsReq := request.(*ofp.PortStatsRequest)
responses, err := ofc.handlePortStatsRequest(statsReq)
@@ -120,14 +120,14 @@
if logger.V(log.DebugLevel) {
reqJs, _ := json.Marshal(statsReq)
resJs, _ := json.Marshal(responses)
- logger.Debugw("handle-stats-request-port",
+ logger.Debugw(ctx, "handle-stats-request-port",
log.Fields{
"device-id": ofc.DeviceID,
"request": reqJs,
"response": resJs})
}
for _, response := range responses {
- err := ofc.SendMessage(response)
+ err := ofc.SendMessage(ctx, response)
if err != nil {
return err
}
@@ -142,13 +142,13 @@
if logger.V(log.DebugLevel) {
reqJs, _ := json.Marshal(statsReq)
resJs, _ := json.Marshal(response)
- logger.Debugw("handle-stats-request-queue",
+ logger.Debugw(ctx, "handle-stats-request-queue",
log.Fields{
"device-id": ofc.DeviceID,
"request": reqJs,
"response": resJs})
}
- return ofc.SendMessage(response)
+ return ofc.SendMessage(ctx, response)
case ofp.OFPSTGroup:
statsReq := request.(*ofp.GroupStatsRequest)
response, err := ofc.handleGroupStatsRequest(statsReq)
@@ -158,29 +158,29 @@
if logger.V(log.DebugLevel) {
reqJs, _ := json.Marshal(statsReq)
resJs, _ := json.Marshal(response)
- logger.Debugw("handle-stats-request-group",
+ logger.Debugw(ctx, "handle-stats-request-group",
log.Fields{
"device-id": ofc.DeviceID,
"request": reqJs,
"response": resJs})
}
- return ofc.SendMessage(response)
+ return ofc.SendMessage(ctx, response)
case ofp.OFPSTGroupDesc:
statsReq := request.(*ofp.GroupDescStatsRequest)
- response, err := ofc.handleGroupStatsDescRequest(statsReq)
+ response, err := ofc.handleGroupStatsDescRequest(ctx, statsReq)
if err != nil {
return err
}
if logger.V(log.DebugLevel) {
reqJs, _ := json.Marshal(statsReq)
resJs, _ := json.Marshal(response)
- logger.Debugw("handle-stats-request-group-desc",
+ logger.Debugw(ctx, "handle-stats-request-group-desc",
log.Fields{
"device-id": ofc.DeviceID,
"request": reqJs,
"response": resJs})
}
- return ofc.SendMessage(response)
+ return ofc.SendMessage(ctx, response)
case ofp.OFPSTGroupFeatures:
statsReq := request.(*ofp.GroupFeaturesStatsRequest)
@@ -191,13 +191,13 @@
if logger.V(log.DebugLevel) {
reqJs, _ := json.Marshal(statsReq)
resJs, _ := json.Marshal(response)
- logger.Debugw("handle-stats-request-group-features",
+ logger.Debugw(ctx, "handle-stats-request-group-features",
log.Fields{
"device-id": ofc.DeviceID,
"request": reqJs,
"response": resJs})
}
- return ofc.SendMessage(response)
+ return ofc.SendMessage(ctx, response)
case ofp.OFPSTMeter:
statsReq := request.(*ofp.MeterStatsRequest)
response, err := ofc.handleMeterStatsRequest(statsReq)
@@ -207,13 +207,13 @@
if logger.V(log.DebugLevel) {
reqJs, _ := json.Marshal(statsReq)
resJs, _ := json.Marshal(response)
- logger.Debugw("handle-stats-request-meter",
+ logger.Debugw(ctx, "handle-stats-request-meter",
log.Fields{
"device-id": ofc.DeviceID,
"request": reqJs,
"response": resJs})
}
- return ofc.SendMessage(response)
+ return ofc.SendMessage(ctx, response)
case ofp.OFPSTMeterConfig:
statsReq := request.(*ofp.MeterConfigStatsRequest)
response, err := ofc.handleMeterConfigStatsRequest(statsReq)
@@ -223,13 +223,13 @@
if logger.V(log.DebugLevel) {
reqJs, _ := json.Marshal(statsReq)
resJs, _ := json.Marshal(response)
- logger.Debugw("handle-stats-request-meter-config",
+ logger.Debugw(ctx, "handle-stats-request-meter-config",
log.Fields{
"device-id": ofc.DeviceID,
"request": reqJs,
"response": resJs})
}
- return ofc.SendMessage(response)
+ return ofc.SendMessage(ctx, response)
case ofp.OFPSTMeterFeatures:
statsReq := request.(*ofp.MeterFeaturesStatsRequest)
response, err := ofc.handleMeterFeatureStatsRequest(statsReq)
@@ -239,13 +239,13 @@
if logger.V(log.DebugLevel) {
reqJs, _ := json.Marshal(statsReq)
resJs, _ := json.Marshal(response)
- logger.Debugw("handle-stats-request-meter-features",
+ logger.Debugw(ctx, "handle-stats-request-meter-features",
log.Fields{
"device-id": ofc.DeviceID,
"request": reqJs,
"response": resJs})
}
- return ofc.SendMessage(response)
+ return ofc.SendMessage(ctx, response)
case ofp.OFPSTTableFeatures:
statsReq := request.(*ofp.TableFeaturesStatsRequest)
response, err := ofc.handleTableFeaturesStatsRequest(statsReq)
@@ -255,13 +255,13 @@
if logger.V(log.DebugLevel) {
reqJs, _ := json.Marshal(statsReq)
resJs, _ := json.Marshal(response)
- logger.Debugw("handle-stats-request-table-features",
+ logger.Debugw(ctx, "handle-stats-request-table-features",
log.Fields{
"device-id": ofc.DeviceID,
"request": reqJs,
"response": resJs})
}
- return ofc.SendMessage(response)
+ return ofc.SendMessage(ctx, response)
case ofp.OFPSTPortDesc:
statsReq := request.(*ofp.PortDescStatsRequest)
responses, err := ofc.handlePortDescStatsRequest(statsReq)
@@ -271,14 +271,14 @@
if logger.V(log.DebugLevel) {
reqJs, _ := json.Marshal(statsReq)
resJs, _ := json.Marshal(responses)
- logger.Debugw("handle-stats-request-port-desc",
+ logger.Debugw(ctx, "handle-stats-request-port-desc",
log.Fields{
"device-id": ofc.DeviceID,
"request": reqJs,
"response": resJs})
}
for _, response := range responses {
- err := ofc.SendMessage(response)
+ err := ofc.SendMessage(ctx, response)
if err != nil {
return err
}
@@ -294,13 +294,13 @@
if logger.V(log.DebugLevel) {
reqJs, _ := json.Marshal(statsReq)
resJs, _ := json.Marshal(response)
- logger.Debugw("handle-stats-request-experimenter",
+ logger.Debugw(ctx, "handle-stats-request-experimenter",
log.Fields{
"device-id": ofc.DeviceID,
"request": reqJs,
"response": resJs})
}
- return ofc.SendMessage(response)
+ return ofc.SendMessage(ctx, response)
}
return nil
}
@@ -330,7 +330,7 @@
return response, nil
}
-func (ofc *OFConnection) handleFlowStatsRequest(request *ofp.FlowStatsRequest) ([]*ofp.FlowStatsReply, error) {
+func (ofc *OFConnection) handleFlowStatsRequest(ctx context.Context, request *ofp.FlowStatsRequest) ([]*ofp.FlowStatsReply, error) {
volthaClient := ofc.VolthaClient.Get()
if volthaClient == nil {
return nil, NoVolthaConnectionError
@@ -360,7 +360,7 @@
for _, oxmField := range pbMatch.GetOxmFields() {
field := oxmField.GetField()
ofbField := field.(*openflow_13.OfpOxmField_OfbField).OfbField
- iOxm := parseOxm(ofbField)
+ iOxm := parseOxm(ctx, ofbField)
fields = append(fields, iOxm)
}
@@ -368,7 +368,7 @@
entry.SetMatch(*match)
var instructions []ofp.IInstruction
for _, ofpInstruction := range item.Instructions {
- instruction := parseInstructions(ofpInstruction)
+ instruction := parseInstructions(ctx, ofpInstruction)
instructions = append(instructions, instruction)
}
entry.Instructions = instructions
@@ -452,7 +452,7 @@
return response, nil
}
-func (ofc *OFConnection) handleGroupStatsDescRequest(request *ofp.GroupDescStatsRequest) (*ofp.GroupDescStatsReply, error) {
+func (ofc *OFConnection) handleGroupStatsDescRequest(ctx context.Context, request *ofp.GroupDescStatsRequest) (*ofp.GroupDescStatsReply, error) {
volthaClient := ofc.VolthaClient.Get()
if volthaClient == nil {
return nil, NoVolthaConnectionError
@@ -470,10 +470,10 @@
for _, item := range reply.GetItems() {
desc := item.GetDesc()
- buckets := volthaBucketsToOpenflow(desc.Buckets)
+ buckets := volthaBucketsToOpenflow(ctx, desc.Buckets)
groupDesc := &ofp.GroupDescStatsEntry{
- GroupType: volthaGroupTypeToOpenflow(desc.Type),
+ GroupType: volthaGroupTypeToOpenflow(ctx, desc.Type),
GroupId: desc.GroupId,
Buckets: buckets,
}
diff --git a/internal/pkg/openflow/stats_test.go b/internal/pkg/openflow/stats_test.go
index 05273bf..35cd503 100644
--- a/internal/pkg/openflow/stats_test.go
+++ b/internal/pkg/openflow/stats_test.go
@@ -17,6 +17,7 @@
package openflow
import (
+ "context"
"fmt"
"math"
"testing"
@@ -192,7 +193,7 @@
request := of13.NewFlowStatsRequest()
- replies, err := ofc.handleFlowStatsRequest(request)
+ replies, err := ofc.handleFlowStatsRequest(context.Background(), request)
assert.Equal(t, err, nil)
// check that the correct number of messages is generated