[VOL-3187] Pass Context down the execution call hierarchy across the ofagent codebase
Change-Id: Ia5f2fa1509beefe0ddc427b83e39d2702782db8f
diff --git a/internal/pkg/openflow/connection.go b/internal/pkg/openflow/connection.go
index fd0aa48..aafe741 100644
--- a/internal/pkg/openflow/connection.go
+++ b/internal/pkg/openflow/connection.go
@@ -67,9 +67,9 @@
return &header, nil
}
-func (ofc *OFConnection) establishConnectionToController() error {
+func (ofc *OFConnection) establishConnectionToController(ctx context.Context) error {
if ofc.conn != nil {
- logger.Debugw("closing-of-connection-to-reconnect",
+ logger.Debugw(ctx, "closing-of-connection-to-reconnect",
log.Fields{"device-id": ofc.DeviceID})
ofc.conn.Close()
ofc.conn = nil
@@ -77,18 +77,18 @@
try := 1
for ofc.ConnectionMaxRetries == 0 || try < ofc.ConnectionMaxRetries {
if raddr, err := net.ResolveTCPAddr("tcp", ofc.OFControllerEndPoint); err != nil {
- logger.Debugw("openflow-client unable to resolve endpoint",
+ logger.Debugw(ctx, "openflow-client unable to resolve endpoint",
log.Fields{
"device-id": ofc.DeviceID,
"endpoint": ofc.OFControllerEndPoint})
} else {
if connection, err := net.DialTCP("tcp", nil, raddr); err == nil {
ofc.conn = connection
- ofc.sayHello()
+ ofc.sayHello(ctx)
ofc.events <- ofcEventConnect
return nil
} else {
- logger.Warnw("openflow-client-connect-error",
+ logger.Warnw(ctx, "openflow-client-connect-error",
log.Fields{
"device-id": ofc.DeviceID,
"endpoint": ofc.OFControllerEndPoint})
@@ -117,33 +117,33 @@
select {
case <-ctx.Done():
state = ofcStateStopped
- logger.Debugw("state-transition-context-done",
+ logger.Debugw(ctx, "state-transition-context-done",
log.Fields{"device-id": ofc.DeviceID})
break top
case event := <-ofc.events:
previous := state
switch event {
case ofcEventStart:
- logger.Debugw("ofc-event-start",
+ logger.Debugw(ctx, "ofc-event-start",
log.Fields{"device-id": ofc.DeviceID})
if state == ofcStateCreated {
state = ofcStateStarted
- logger.Debug("STARTED MORE THAN ONCE")
+ logger.Debug(ctx, "STARTED MORE THAN ONCE")
go func() {
- if err := ofc.establishConnectionToController(); err != nil {
- logger.Errorw("controller-connection-failed", log.Fields{"error": err})
+ if err := ofc.establishConnectionToController(ctx); err != nil {
+ logger.Errorw(ctx, "controller-connection-failed", log.Fields{"error": err})
panic(err)
}
}()
} else {
- logger.Errorw("illegal-state-transition",
+ logger.Errorw(ctx, "illegal-state-transition",
log.Fields{
"device-id": ofc.DeviceID,
"current-state": state.String(),
"event": event.String()})
}
case ofcEventConnect:
- logger.Debugw("ofc-event-connected",
+ logger.Debugw(ctx, "ofc-event-connected",
log.Fields{"device-id": ofc.DeviceID})
if state == ofcStateStarted || state == ofcStateDisconnected {
state = ofcStateConnected
@@ -151,14 +151,14 @@
go ofc.messageSender(ofCtx)
go ofc.processOFStream(ofCtx)
} else {
- logger.Errorw("illegal-state-transition",
+ logger.Errorw(ctx, "illegal-state-transition",
log.Fields{
"device-id": ofc.DeviceID,
"current-state": state.String(),
"event": event.String()})
}
case ofcEventDisconnect:
- logger.Debugw("ofc-event-disconnected",
+ logger.Debugw(ctx, "ofc-event-disconnected",
log.Fields{
"device-id": ofc.DeviceID,
"state": state.String()})
@@ -169,33 +169,33 @@
ofDone = nil
}
go func() {
- if err := ofc.establishConnectionToController(); err != nil {
- log.Errorw("controller-connection-failed", log.Fields{"error": err})
+ if err := ofc.establishConnectionToController(ctx); err != nil {
+ logger.Errorw(ctx, "controller-connection-failed", log.Fields{"error": err}) // ctx-aware package logger, as in the ofcEventStart path
panic(err)
}
}()
} else {
- logger.Errorw("illegal-state-transition",
+ logger.Errorw(ctx, "illegal-state-transition",
log.Fields{
"device-id": ofc.DeviceID,
"current-state": state.String(),
"event": event.String()})
}
case ofcEventStop:
- logger.Debugw("ofc-event-stop",
+ logger.Debugw(ctx, "ofc-event-stop",
log.Fields{"device-id": ofc.DeviceID})
if state == ofcStateCreated || state == ofcStateConnected || state == ofcStateDisconnected {
state = ofcStateStopped
break top
} else {
- logger.Errorw("illegal-state-transition",
+ logger.Errorw(ctx, "illegal-state-transition",
log.Fields{
"device-id": ofc.DeviceID,
"current-state": state.String(),
"event": event.String()})
}
}
- logger.Debugw("state-transition",
+ logger.Debugw(ctx, "state-transition",
log.Fields{
"device-id": ofc.DeviceID,
"previous-state": previous.String(),
@@ -237,20 +237,20 @@
for {
select {
case <-ctx.Done():
- logger.Error("of-loop-ending-context-done")
+ logger.Error(ctx, "of-loop-ending-context-done")
break top
default:
// Read 8 bytes, the standard OF header
read, err := io.ReadFull(fromController, headerBuf)
if err != nil {
if err == io.EOF {
- logger.Infow("controller-disconnected",
+ logger.Infow(ctx, "controller-disconnected",
log.Fields{
"device-id": ofc.DeviceID,
"controller": ofc.OFControllerEndPoint,
})
} else {
- logger.Errorw("bad-of-header",
+ logger.Errorw(ctx, "bad-of-header",
log.Fields{
"byte-count": read,
"device-id": ofc.DeviceID,
@@ -267,7 +267,7 @@
* Header is bad, assume stream is corrupted
* and needs to be restarted
*/
- logger.Errorw("bad-of-packet",
+ logger.Errorw(ctx, "bad-of-packet",
log.Fields{
"device-id": ofc.DeviceID,
"error": err})
@@ -280,7 +280,7 @@
copy(messageBuf, headerBuf)
read, err = io.ReadFull(fromController, messageBuf[8:])
if err != nil {
- logger.Errorw("bad-of-packet",
+ logger.Errorw(ctx, "bad-of-packet",
log.Fields{
"byte-count": read,
"device-id": ofc.DeviceID,
@@ -294,7 +294,7 @@
if err != nil {
// nolint: staticcheck
js, _ := json.Marshal(decoder)
- logger.Errorw("failed-to-decode",
+ logger.Errorw(ctx, "failed-to-decode",
log.Fields{
"device-id": ofc.DeviceID,
"decoder": js,
@@ -303,7 +303,7 @@
}
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(msg)
- logger.Debugw("packet-header",
+ logger.Debugw(ctx, "packet-header",
log.Fields{
"device-id": ofc.DeviceID,
"header": js})
@@ -318,15 +318,15 @@
* with no guarantees of handling all messages before a barrier.
* A multiple queue (incoming worker and outgoing) is a possible solution.
*/
- go ofc.parseHeader(msg)
+ go ofc.parseHeader(ctx, msg)
}
}
- logger.Debugw("end-of-stream",
+ logger.Debugw(ctx, "end-of-stream",
log.Fields{"device-id": ofc.DeviceID})
ofc.events <- ofcEventDisconnect
}
-func (ofc *OFConnection) sayHello() {
+func (ofc *OFConnection) sayHello(ctx context.Context) {
hello := ofp.NewHello()
hello.Xid = uint32(GetXid())
elem := ofp.NewHelloElemVersionbitmap()
@@ -336,94 +336,94 @@
hello.SetElements([]ofp.IHelloElem{elem})
if logger.V(log.DebugLevel) {
js, _ := json.Marshal(hello)
- logger.Debugw("sayHello Called",
+ logger.Debugw(ctx, "sayHello Called",
log.Fields{
"device-id": ofc.DeviceID,
"hello-message": js})
}
- if err := ofc.SendMessage(hello); err != nil {
- logger.Fatalw("Failed saying hello to Openflow Server, unable to proceed",
+ if err := ofc.SendMessage(ctx, hello); err != nil {
+ logger.Fatalw(ctx, "Failed saying hello to Openflow Server, unable to proceed",
log.Fields{
"device-id": ofc.DeviceID,
"error": err})
}
}
-func (ofc *OFConnection) parseHeader(header ofp.IHeader) {
+func (ofc *OFConnection) parseHeader(ctx context.Context, header ofp.IHeader) {
headerType := header.GetType()
- logger.Debugw("packet-header-type",
+ logger.Debugw(ctx, "packet-header-type",
log.Fields{
"header-type": ofp.Type(headerType).String()})
switch headerType {
case ofp.OFPTHello:
//x := header.(*ofp.Hello)
case ofp.OFPTError:
- go ofc.handleErrMsg(header.(*ofp.ErrorMsg))
+ go ofc.handleErrMsg(ctx, header.(*ofp.ErrorMsg))
case ofp.OFPTEchoRequest:
- go ofc.handleEchoRequest(header.(*ofp.EchoRequest))
+ go ofc.handleEchoRequest(ctx, header.(*ofp.EchoRequest))
case ofp.OFPTEchoReply:
case ofp.OFPTExperimenter:
case ofp.OFPTFeaturesRequest:
go func() {
- if err := ofc.handleFeatureRequest(header.(*ofp.FeaturesRequest)); err != nil {
- logger.Errorw("handle-feature-request", log.Fields{"error": err})
+ if err := ofc.handleFeatureRequest(ctx, header.(*ofp.FeaturesRequest)); err != nil {
+ logger.Errorw(ctx, "handle-feature-request", log.Fields{"error": err})
}
}()
case ofp.OFPTFeaturesReply:
case ofp.OFPTGetConfigRequest:
- go ofc.handleGetConfigRequest(header.(*ofp.GetConfigRequest))
+ go ofc.handleGetConfigRequest(ctx, header.(*ofp.GetConfigRequest))
case ofp.OFPTGetConfigReply:
case ofp.OFPTSetConfig:
- go ofc.handleSetConfig(header.(*ofp.SetConfig))
+ go ofc.handleSetConfig(ctx, header.(*ofp.SetConfig))
case ofp.OFPTPacketIn:
case ofp.OFPTFlowRemoved:
case ofp.OFPTPortStatus:
case ofp.OFPTPacketOut:
if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
- ofc.sendRoleSlaveError(header)
+ ofc.sendRoleSlaveError(ctx, header)
return
}
- go ofc.handlePacketOut(header.(*ofp.PacketOut))
+ go ofc.handlePacketOut(ctx, header.(*ofp.PacketOut))
case ofp.OFPTFlowMod:
if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
- ofc.sendRoleSlaveError(header)
+ ofc.sendRoleSlaveError(ctx, header)
return
}
switch header.(ofp.IFlowMod).GetCommand() {
case ofp.OFPFCAdd:
- ofc.handleFlowAdd(header.(*ofp.FlowAdd))
+ ofc.handleFlowAdd(ctx, header.(*ofp.FlowAdd))
case ofp.OFPFCModify:
- ofc.handleFlowMod(header.(*ofp.FlowMod))
+ ofc.handleFlowMod(ctx, header.(*ofp.FlowMod))
case ofp.OFPFCModifyStrict:
- ofc.handleFlowModStrict(header.(*ofp.FlowModifyStrict))
+ ofc.handleFlowModStrict(ctx, header.(*ofp.FlowModifyStrict))
case ofp.OFPFCDelete:
- ofc.handleFlowDelete(header.(*ofp.FlowDelete))
+ ofc.handleFlowDelete(ctx, header.(*ofp.FlowDelete))
case ofp.OFPFCDeleteStrict:
- ofc.handleFlowDeleteStrict(header.(*ofp.FlowDeleteStrict))
+ ofc.handleFlowDeleteStrict(ctx, header.(*ofp.FlowDeleteStrict))
}
case ofp.OFPTStatsRequest:
go func() {
- if err := ofc.handleStatsRequest(header, header.(ofp.IStatsRequest).GetStatsType()); err != nil {
- logger.Errorw("ofpt-stats-request", log.Fields{"error": err})
+ if err := ofc.handleStatsRequest(ctx, header, header.(ofp.IStatsRequest).GetStatsType()); err != nil {
+ logger.Errorw(ctx, "ofpt-stats-request", log.Fields{"error": err})
}
}()
case ofp.OFPTBarrierRequest:
/* See note above at case ofp.OFPTFlowMod:*/
- ofc.handleBarrierRequest(header.(*ofp.BarrierRequest))
+ ofc.handleBarrierRequest(ctx, header.(*ofp.BarrierRequest))
case ofp.OFPTRoleRequest:
- go ofc.handleRoleRequest(header.(*ofp.RoleRequest))
+ go ofc.handleRoleRequest(ctx, header.(*ofp.RoleRequest))
case ofp.OFPTMeterMod:
if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
- ofc.sendRoleSlaveError(header)
+ ofc.sendRoleSlaveError(ctx, header)
return
}
- ofc.handleMeterModRequest(header.(*ofp.MeterMod))
+ ofc.handleMeterModRequest(ctx, header.(*ofp.MeterMod))
case ofp.OFPTGroupMod:
if !(ofc.role == ofcRoleMaster || ofc.role == ofcRoleEqual) {
- ofc.sendRoleSlaveError(header)
+ ofc.sendRoleSlaveError(ctx, header)
return
}
- ofc.handleGroupMod(header.(ofp.IGroupMod))
+ ofc.handleGroupMod(ctx, header.(ofp.IGroupMod))
}
}
@@ -433,7 +433,7 @@
Serialize(encoder *goloxi.Encoder) error
}
-func (ofc *OFConnection) doSend(msg Message) error {
+func (ofc *OFConnection) doSend(ctx context.Context, msg Message) error {
if ofc.conn == nil {
return errors.New("no-connection")
}
@@ -444,7 +444,7 @@
bytes := enc.Bytes()
if _, err := ofc.conn.Write(bytes); err != nil {
- logger.Errorw("unable-to-send-message-to-controller",
+ logger.Errorw(ctx, "unable-to-send-message-to-controller",
log.Fields{
"device-id": ofc.DeviceID,
"message": msg,
@@ -457,7 +457,7 @@
func (ofc *OFConnection) messageSender(ctx context.Context) {
// first process last fail if it exists
if ofc.lastUnsentMessage != nil {
- if err := ofc.doSend(ofc.lastUnsentMessage); err != nil {
+ if err := ofc.doSend(ctx, ofc.lastUnsentMessage); err != nil {
ofc.events <- ofcEventDisconnect
return
}
@@ -469,30 +469,30 @@
case <-ctx.Done():
break top
case msg := <-ofc.sendChannel:
- if err := ofc.doSend(msg); err != nil {
+ if err := ofc.doSend(ctx, msg); err != nil {
ofc.lastUnsentMessage = msg
ofc.events <- ofcEventDisconnect
- logger.Debugw("message-sender-error",
+ logger.Debugw(ctx, "message-sender-error",
log.Fields{
"device-id": ofc.DeviceID,
"error": err.Error()})
break top
}
- logger.Debugw("message-sender-send",
+ logger.Debugw(ctx, "message-sender-send",
log.Fields{
"device-id": ofc.DeviceID})
ofc.lastUnsentMessage = nil
}
}
- logger.Debugw("message-sender-finished",
+ logger.Debugw(ctx, "message-sender-finished",
log.Fields{
"device-id": ofc.DeviceID})
}
// SendMessage queues a message to be sent to the openflow controller
-func (ofc *OFConnection) SendMessage(message Message) error {
- logger.Debug("queuing-message")
+func (ofc *OFConnection) SendMessage(ctx context.Context, message Message) error {
+ logger.Debug(ctx, "queuing-message")
ofc.sendChannel <- message
return nil
}