[VOL-4756] Cleanup TODO context
Change-Id: I21d5ec8cc015154bc893e54c652d31562d8da5d9
diff --git a/internal/pkg/controller/device.go b/internal/pkg/controller/device.go
index cea6960..b410490 100644
--- a/internal/pkg/controller/device.go
+++ b/internal/pkg/controller/device.go
@@ -130,7 +130,7 @@
}
// NewDevice is the constructor for Device
-func NewDevice(id string, slno string, vclientHldr *holder.VolthaServiceClientHolder, southBoundID string) *Device {
+func NewDevice(cntx context.Context, id string, slno string, vclientHldr *holder.VolthaServiceClientHolder, southBoundID string) *Device {
var device Device
device.ID = id
device.SerialNum = slno
@@ -143,7 +143,7 @@
device.flowQueue = make(map[uint32]*UniIDFlowQueue)
//Get the flowhash from db and update the flowhash variable in the device.
device.SouthBoundID = southBoundID
- flowHash, err := db.GetFlowHash(id)
+ flowHash, err := db.GetFlowHash(cntx, id)
if err != nil {
device.flowHash = DefaultMaxFlowQueues
} else {
@@ -177,7 +177,7 @@
}
// AddFlow - Adds the flow to the device and also to the database
-func (d *Device) AddFlow(flow *of.VoltSubFlow) error {
+func (d *Device) AddFlow(cntx context.Context, flow *of.VoltSubFlow) error {
d.flowLock.Lock()
defer d.flowLock.Unlock()
logger.Infow(ctx, "AddFlow to device", log.Fields{"Cookie": flow.Cookie})
@@ -185,34 +185,34 @@
return errors.New(ErrDuplicateFlow)
}
d.flows[flow.Cookie] = flow
- d.AddFlowToDb(flow)
+ d.AddFlowToDb(cntx, flow)
return nil
}
// AddFlowToDb is the utility to add the flow to the device
-func (d *Device) AddFlowToDb(flow *of.VoltSubFlow) {
+func (d *Device) AddFlowToDb(cntx context.Context, flow *of.VoltSubFlow) {
if b, err := json.Marshal(flow); err == nil {
- if err = db.PutFlow(d.ID, flow.Cookie, string(b)); err != nil {
+ if err = db.PutFlow(cntx, d.ID, flow.Cookie, string(b)); err != nil {
logger.Errorw(ctx, "Write Flow to DB failed", log.Fields{"device": d.ID, "cookie": flow.Cookie, "Reason": err})
}
}
}
// DelFlow - Deletes the flow from the device and the database
-func (d *Device) DelFlow(flow *of.VoltSubFlow) error {
+func (d *Device) DelFlow(cntx context.Context, flow *of.VoltSubFlow) error {
d.flowLock.Lock()
defer d.flowLock.Unlock()
if _, ok := d.flows[flow.Cookie]; ok {
delete(d.flows, flow.Cookie)
- d.DelFlowFromDb(flow.Cookie)
+ d.DelFlowFromDb(cntx, flow.Cookie)
return nil
}
return errors.New("Flow does not Exist")
}
// DelFlowFromDb is utility to delete the flow from the device
-func (d *Device) DelFlowFromDb(flowID uint64) {
- _ = db.DelFlow(d.ID, flowID)
+func (d *Device) DelFlowFromDb(cntx context.Context, flowID uint64) {
+ _ = db.DelFlow(cntx, d.ID, flowID)
}
// IsFlowPresentWithOldCookie is to check whether there is any flow with old cookie.
@@ -231,22 +231,22 @@
}
// DelFlowWithOldCookie is to delete flow with old cookie.
-func (d *Device) DelFlowWithOldCookie(flow *of.VoltSubFlow) error {
+func (d *Device) DelFlowWithOldCookie(cntx context.Context, flow *of.VoltSubFlow) error {
d.flowLock.Lock()
defer d.flowLock.Unlock()
if _, ok := d.flows[flow.OldCookie]; ok {
logger.Infow(ctx, "Flow was added before vgc upgrade. Trying to delete with old cookie",
log.Fields{"OldCookie": flow.OldCookie})
delete(d.flows, flow.OldCookie)
- d.DelFlowFromDb(flow.OldCookie)
+ d.DelFlowFromDb(cntx, flow.OldCookie)
return nil
}
return errors.New("Flow does not Exist")
}
// RestoreFlowsFromDb to restore flows from database
-func (d *Device) RestoreFlowsFromDb() {
- flows, _ := db.GetFlows(d.ID)
+func (d *Device) RestoreFlowsFromDb(cntx context.Context) {
+ flows, _ := db.GetFlows(cntx, d.ID)
for _, flow := range flows {
b, ok := flow.Value.([]byte)
if !ok {
@@ -277,41 +277,41 @@
// Group operations at the device which include update and delete
// UpdateGroupEntry - Adds/Updates the group to the device and also to the database
-func (d *Device) UpdateGroupEntry(group *of.Group) {
+func (d *Device) UpdateGroupEntry(cntx context.Context, group *of.Group) {
logger.Infow(ctx, "Update Group to device", log.Fields{"ID": group.GroupID})
d.groups.Store(group.GroupID, group)
- d.AddGroupToDb(group)
+ d.AddGroupToDb(cntx, group)
}
// AddGroupToDb - Utility to add the group to the device DB
-func (d *Device) AddGroupToDb(group *of.Group) {
+func (d *Device) AddGroupToDb(cntx context.Context, group *of.Group) {
if b, err := json.Marshal(group); err == nil {
logger.Infow(ctx, "Adding Group to DB", log.Fields{"grp": group, "Json": string(b)})
- if err = db.PutGroup(d.ID, group.GroupID, string(b)); err != nil {
+ if err = db.PutGroup(cntx, d.ID, group.GroupID, string(b)); err != nil {
logger.Errorw(ctx, "Write Group to DB failed", log.Fields{"device": d.ID, "groupID": group.GroupID, "Reason": err})
}
}
}
// DelGroupEntry - Deletes the group from the device and the database
-func (d *Device) DelGroupEntry(group *of.Group) {
+func (d *Device) DelGroupEntry(cntx context.Context, group *of.Group) {
if _, ok := d.groups.Load(group.GroupID); ok {
d.groups.Delete(group.GroupID)
- d.DelGroupFromDb(group.GroupID)
+ d.DelGroupFromDb(cntx, group.GroupID)
}
}
// DelGroupFromDb - Utility to delete the Group from the device
-func (d *Device) DelGroupFromDb(groupID uint32) {
- _ = db.DelGroup(d.ID, groupID)
+func (d *Device) DelGroupFromDb(cntx context.Context, groupID uint32) {
+ _ = db.DelGroup(cntx, d.ID, groupID)
}
//RestoreGroupsFromDb - restores all groups from DB
-func (d *Device) RestoreGroupsFromDb() {
+func (d *Device) RestoreGroupsFromDb(cntx context.Context) {
logger.Info(ctx, "Restoring Groups")
- groups, _ := db.GetGroups(d.ID)
+ groups, _ := db.GetGroups(cntx, d.ID)
for _, group := range groups {
b, ok := group.Value.([]byte)
if !ok {
@@ -338,14 +338,14 @@
}
// AddMeter to add meter
-func (d *Device) AddMeter(meter *of.Meter) error {
+func (d *Device) AddMeter(cntx context.Context, meter *of.Meter) error {
d.meterLock.Lock()
defer d.meterLock.Unlock()
if _, ok := d.meters[meter.ID]; ok {
return errors.New("Duplicate Meter")
}
d.meters[meter.ID] = meter
- go d.AddMeterToDb(meter)
+ go d.AddMeterToDb(cntx, meter)
return nil
}
@@ -360,34 +360,34 @@
}
// DelMeter to delete meter
-func (d *Device) DelMeter(meter *of.Meter) bool {
+func (d *Device) DelMeter(cntx context.Context, meter *of.Meter) bool {
d.meterLock.Lock()
defer d.meterLock.Unlock()
if _, ok := d.meters[meter.ID]; ok {
delete(d.meters, meter.ID)
- go d.DelMeterFromDb(meter.ID)
+ go d.DelMeterFromDb(cntx, meter.ID)
return true
}
return false
}
// AddMeterToDb is utility to add the Group to the device
-func (d *Device) AddMeterToDb(meter *of.Meter) {
+func (d *Device) AddMeterToDb(cntx context.Context, meter *of.Meter) {
if b, err := json.Marshal(meter); err == nil {
- if err = db.PutDeviceMeter(d.ID, meter.ID, string(b)); err != nil {
+ if err = db.PutDeviceMeter(cntx, d.ID, meter.ID, string(b)); err != nil {
logger.Errorw(ctx, "Write Meter to DB failed", log.Fields{"device": d.ID, "meterID": meter.ID, "Reason": err})
}
}
}
// DelMeterFromDb to delete meter from db
-func (d *Device) DelMeterFromDb(id uint32) {
- _ = db.DelDeviceMeter(d.ID, id)
+func (d *Device) DelMeterFromDb(cntx context.Context, id uint32) {
+ _ = db.DelDeviceMeter(cntx, d.ID, id)
}
// RestoreMetersFromDb to restore meters from db
-func (d *Device) RestoreMetersFromDb() {
- meters, _ := db.GetDeviceMeters(d.ID)
+func (d *Device) RestoreMetersFromDb(cntx context.Context) {
+ meters, _ := db.GetDeviceMeters(cntx, d.ID)
for _, meter := range meters {
b, ok := meter.Value.([]byte)
if !ok {
@@ -420,7 +420,7 @@
// AddPort to add the port as requested by the device/VOLTHA
// Inform the application if the port is successfully added
-func (d *Device) AddPort(id uint32, name string) error {
+func (d *Device) AddPort(cntx context.Context, id uint32, name string) error {
d.portLock.Lock()
defer d.portLock.Unlock()
@@ -434,36 +434,36 @@
p := NewDevicePort(id, name)
d.PortsByID[id] = p
d.PortsByName[name] = p
- d.WritePortToDb(p)
- GetController().PortAddInd(d.ID, p.ID, p.Name)
+ d.WritePortToDb(cntx, p)
+ GetController().PortAddInd(cntx, d.ID, p.ID, p.Name)
logger.Infow(ctx, "Added Port", log.Fields{"Device": d.ID, "Port": id})
return nil
}
// DelPort to delete the port as requested by the device/VOLTHA
// Inform the application if the port is successfully deleted
-func (d *Device) DelPort(id uint32) error {
+func (d *Device) DelPort(cntx context.Context, id uint32) error {
p := d.GetPortByID(id)
if p == nil {
return errors.New("Unknown Port")
}
if p.State == PortStateUp {
- GetController().PortDownInd(d.ID, p.Name)
+ GetController().PortDownInd(cntx, d.ID, p.Name)
}
d.portLock.Lock()
defer d.portLock.Unlock()
- GetController().PortDelInd(d.ID, p.Name)
+ GetController().PortDelInd(cntx, d.ID, p.Name)
delete(d.PortsByID, p.ID)
delete(d.PortsByName, p.Name)
- d.DelPortFromDb(p.ID)
+ d.DelPortFromDb(cntx, p.ID)
logger.Infow(ctx, "Deleted Port", log.Fields{"Device": d.ID, "Port": id})
return nil
}
// UpdatePortByName is utility to update the port by Name
-func (d *Device) UpdatePortByName(name string, port uint32) {
+func (d *Device) UpdatePortByName(cntx context.Context, name string, port uint32) {
d.portLock.Lock()
defer d.portLock.Unlock()
@@ -474,7 +474,7 @@
delete(d.PortsByID, p.ID)
p.ID = port
d.PortsByID[port] = p
- d.WritePortToDb(p)
+ d.WritePortToDb(cntx, p)
GetController().PortUpdateInd(d.ID, p.Name, p.ID)
logger.Infow(ctx, "Updated Port", log.Fields{"Device": d.ID, "Port": p.ID, "PortName": name})
}
@@ -539,42 +539,42 @@
}
// WritePortToDb to add the port to the database
-func (d *Device) WritePortToDb(port *DevicePort) {
+func (d *Device) WritePortToDb(ctx context.Context, port *DevicePort) {
port.Version = database.PresentVersionMap[database.DevicePortPath]
if b, err := json.Marshal(port); err == nil {
- if err = db.PutPort(d.ID, port.ID, string(b)); err != nil {
+ if err = db.PutPort(ctx, d.ID, port.ID, string(b)); err != nil {
logger.Errorw(ctx, "Write port to DB failed", log.Fields{"device": d.ID, "port": port.ID, "Reason": err})
}
}
}
// DelPortFromDb to delete port from database
-func (d *Device) DelPortFromDb(id uint32) {
- _ = db.DelPort(d.ID, id)
+func (d *Device) DelPortFromDb(cntx context.Context, id uint32) {
+ _ = db.DelPort(cntx, d.ID, id)
}
// RestorePortsFromDb to restore ports from database
-func (d *Device) RestorePortsFromDb() {
- ports, _ := db.GetPorts(d.ID)
+func (d *Device) RestorePortsFromDb(cntx context.Context) {
+ ports, _ := db.GetPorts(cntx, d.ID)
for _, port := range ports {
b, ok := port.Value.([]byte)
if !ok {
logger.Warn(ctx, "The value type is not []byte")
continue
}
- d.CreatePortFromString(b)
+ d.CreatePortFromString(cntx, b)
}
}
// CreatePortFromString to create port from string
-func (d *Device) CreatePortFromString(b []byte) {
+func (d *Device) CreatePortFromString(cntx context.Context, b []byte) {
var port DevicePort
if err := json.Unmarshal(b, &port); err == nil {
if _, ok := d.PortsByID[port.ID]; !ok {
logger.Debugw(ctx, "Adding Port From Db", log.Fields{"ID": port.ID})
d.PortsByID[port.ID] = &port
d.PortsByName[port.Name] = &port
- GetController().PortAddInd(d.ID, port.ID, port.Name)
+ GetController().PortAddInd(cntx, d.ID, port.ID, port.Name)
} else {
logger.Warnw(ctx, "Duplicate Port", log.Fields{"ID": port.ID})
}
@@ -662,7 +662,7 @@
}
// DeviceRebootInd is called when the logical device is rebooted.
-func (d *Device) DeviceRebootInd() {
+func (d *Device) DeviceRebootInd(cntx context.Context) {
logger.Warnw(ctx, "Device State change Ind: Rebooted", log.Fields{"Device": d.ID})
if d.State == DeviceStateREBOOTED {
@@ -673,19 +673,19 @@
d.State = DeviceStateREBOOTED
GetController().SetRebootInProgressForDevice(d.ID)
- GetController().DeviceRebootInd(d.ID, d.SerialNum, d.SouthBoundID)
- d.ReSetAllPortStates()
+ GetController().DeviceRebootInd(cntx, d.ID, d.SerialNum, d.SouthBoundID)
+ d.ReSetAllPortStates(cntx)
}
// DeviceDisabledInd is called when the logical device is disabled
-func (d *Device) DeviceDisabledInd() {
+func (d *Device) DeviceDisabledInd(cntx context.Context) {
logger.Warnw(ctx, "Device State change Ind: Disabled", log.Fields{"Device": d.ID})
d.State = DeviceStateDISABLED
- GetController().DeviceDisableInd(d.ID)
+ GetController().DeviceDisableInd(cntx, d.ID)
}
//ReSetAllPortStates - Set all logical device port status to DOWN
-func (d *Device) ReSetAllPortStates() {
+func (d *Device) ReSetAllPortStates(cntx context.Context) {
logger.Warnw(ctx, "Resetting all Ports State to DOWN", log.Fields{"Device": d.ID, "State": d.State})
d.portLock.Lock()
@@ -694,15 +694,15 @@
for _, port := range d.PortsByID {
if port.State != PortStateDown {
logger.Infow(ctx, "Resetting Port State to DOWN", log.Fields{"Device": d.ID, "Port": port})
- GetController().PortDownInd(d.ID, port.Name)
+ GetController().PortDownInd(cntx, d.ID, port.Name)
port.State = PortStateDown
- d.WritePortToDb(port)
+ d.WritePortToDb(cntx, port)
}
}
}
//ReSetAllPortStatesInDb - Set all logical device port status to DOWN in DB and skip indication to application
-func (d *Device) ReSetAllPortStatesInDb() {
+func (d *Device) ReSetAllPortStatesInDb(cntx context.Context) {
logger.Warnw(ctx, "Resetting all Ports State to DOWN In DB", log.Fields{"Device": d.ID, "State": d.State})
d.portLock.Lock()
@@ -712,14 +712,14 @@
if port.State != PortStateDown {
logger.Infow(ctx, "Resetting Port State to DOWN and Write to DB", log.Fields{"Device": d.ID, "Port": port})
port.State = PortStateDown
- d.WritePortToDb(port)
+ d.WritePortToDb(cntx, port)
}
}
}
// ProcessPortUpdate deals with the change in port id (ONU movement) and taking action
// to update only when the port state is DOWN
-func (d *Device) ProcessPortUpdate(portName string, port uint32, state uint32) {
+func (d *Device) ProcessPortUpdate(cntx context.Context, portName string, port uint32, state uint32) {
if p := d.GetPortByName(portName); p != nil {
if p.ID != port {
logger.Infow(ctx, "Port ID update indication", log.Fields{"Port": p.Name, "Old PortID": p.ID, "New Port ID": port})
@@ -727,10 +727,10 @@
logger.Errorw(ctx, "Port ID update failed. Port State UP", log.Fields{"Port": p})
return
}
- d.UpdatePortByName(portName, port)
+ d.UpdatePortByName(cntx, portName, port)
logger.Errorw(ctx, "Port ID Updated", log.Fields{"Port": p})
}
- d.ProcessPortState(port, state)
+ d.ProcessPortState(cntx, port, state)
}
}
@@ -750,7 +750,7 @@
// ProcessPortState deals with the change in port status and taking action
// based on the new state and the old state
-func (d *Device) ProcessPortState(port uint32, state uint32) {
+func (d *Device) ProcessPortState(cntx context.Context, port uint32, state uint32) {
if d.State != DeviceStateUP && !util.IsNniPort(port) {
logger.Warnw(ctx, "Ignore Port State Processing - Device not UP", log.Fields{"Device": d.ID, "Port": port, "DeviceState": d.State})
return
@@ -765,15 +765,15 @@
if state == uint32(ofp.OfpPortState_OFPPS_LIVE) && p.State == PortStateDown {
// Transition from DOWN to UP
logger.Infow(ctx, "Port State Change to UP", log.Fields{"Device": d.ID, "Port": port})
- GetController().PortUpInd(d.ID, p.Name)
+ GetController().PortUpInd(cntx, d.ID, p.Name)
p.State = PortStateUp
- d.WritePortToDb(p)
+ d.WritePortToDb(cntx, p)
} else if (state != uint32(ofp.OfpPortState_OFPPS_LIVE)) && (p.State != PortStateDown) {
// Transition from UP to Down
logger.Infow(ctx, "Port State Change to Down", log.Fields{"Device": d.ID, "Port": port})
- GetController().PortDownInd(d.ID, p.Name)
+ GetController().PortDownInd(cntx, d.ID, p.Name)
p.State = PortStateDown
- d.WritePortToDb(p)
+ d.WritePortToDb(cntx, p)
} else {
logger.Warnw(ctx, "Dropping Port Ind: No Change in Port State", log.Fields{"PortName": p.Name, "ID": port, "Device": d.ID, "PortState": p.State, "IncomingState": state})
}
@@ -781,7 +781,7 @@
}
// ProcessPortStateAfterReboot - triggers the port state indication to sort out configu mismatch due to reboot
-func (d *Device) ProcessPortStateAfterReboot(port uint32, state uint32) {
+func (d *Device) ProcessPortStateAfterReboot(cntx context.Context, port uint32, state uint32) {
if d.State != DeviceStateUP && !util.IsNniPort(port) {
logger.Warnw(ctx, "Ignore Port State Processing - Device not UP", log.Fields{"Device": d.ID, "Port": port, "DeviceState": d.State})
return
@@ -791,10 +791,10 @@
p.Tasks.Initialize(d.ctx)
if p.State == PortStateUp {
logger.Infow(ctx, "Port State: UP", log.Fields{"Device": d.ID, "Port": port})
- GetController().PortUpInd(d.ID, p.Name)
+ GetController().PortUpInd(cntx, d.ID, p.Name)
} else if p.State == PortStateDown {
logger.Infow(ctx, "Port State: Down", log.Fields{"Device": d.ID, "Port": port})
- GetController().PortDownInd(d.ID, p.Name)
+ GetController().PortDownInd(cntx, d.ID, p.Name)
}
}
}
@@ -810,7 +810,7 @@
// PacketIn handle the incoming packet-in and deliver to the application for the
// actual processing
-func (d *Device) PacketIn(pkt *ofp.PacketIn) {
+func (d *Device) PacketIn(cntx context.Context, pkt *ofp.PacketIn) {
logger.Debugw(ctx, "Received a Packet-In", log.Fields{"Device": d.ID})
if pkt.PacketIn.Reason != ofp.OfpPacketInReason_OFPR_ACTION {
logger.Warnw(ctx, "Unsupported PacketIn Reason", log.Fields{"Reason": pkt.PacketIn.Reason})
@@ -819,7 +819,7 @@
data := pkt.PacketIn.Data
port := PacketInGetPort(pkt.PacketIn)
if pName, err := d.GetPortName(port); err == nil {
- GetController().PacketInInd(d.ID, pName, data)
+ GetController().PacketInInd(cntx, d.ID, pName, data)
} else {
logger.Warnw(ctx, "Unknown Port", log.Fields{"Reason": err.Error()})
}
@@ -954,21 +954,21 @@
}
// SetFlowHash sets the device flow hash and writes to the DB.
-func (d *Device) SetFlowHash(hash uint32) {
+func (d *Device) SetFlowHash(cntx context.Context, hash uint32) {
d.flowQueueLock.Lock()
defer d.flowQueueLock.Unlock()
d.flowHash = hash
- d.writeFlowHashToDB()
+ d.writeFlowHashToDB(cntx)
}
-func (d *Device) writeFlowHashToDB() {
+func (d *Device) writeFlowHashToDB(cntx context.Context) {
hash, err := json.Marshal(d.flowHash)
if err != nil {
logger.Errorw(ctx, "failed to marshal flow hash", log.Fields{"hash": d.flowHash})
return
}
- if err := db.PutFlowHash(d.ID, string(hash)); err != nil {
+ if err := db.PutFlowHash(cntx, d.ID, string(hash)); err != nil {
logger.Errorw(ctx, "Failed to add flow hash to DB", log.Fields{"device": d.ID, "hash": d.flowHash})
}
}
@@ -987,12 +987,12 @@
return false
}
-func (d *Device) triggerFlowNotification(cookie uint64, oper of.Command, bwDetails of.BwAvailDetails, err error) {
+func (d *Device) triggerFlowNotification(cntx context.Context, cookie uint64, oper of.Command, bwDetails of.BwAvailDetails, err error) {
flow, _ := d.GetFlow(cookie)
- d.triggerFlowResultNotification(cookie, flow, oper, bwDetails, err)
+ d.triggerFlowResultNotification(cntx, cookie, flow, oper, bwDetails, err)
}
-func (d *Device) triggerFlowResultNotification(cookie uint64, flow *of.VoltSubFlow, oper of.Command, bwDetails of.BwAvailDetails, err error) {
+func (d *Device) triggerFlowResultNotification(cntx context.Context, cookie uint64, flow *of.VoltSubFlow, oper of.Command, bwDetails of.BwAvailDetails, err error) {
statusCode, statusMsg := infraerror.GetErrorInfo(err)
success := isFlowOperSuccess(statusCode, oper)
@@ -1001,7 +1001,7 @@
if dbFlow, ok := d.GetFlow(cookie); ok {
dbFlow.State = uint8(state)
dbFlow.ErrorReason = reason
- d.AddFlowToDb(dbFlow)
+ d.AddFlowToDb(cntx, dbFlow)
}
}
@@ -1019,7 +1019,7 @@
logger.Debugw(ctx, "Updated Flow to DB", log.Fields{"Cookie": cookie, "State": state})
} else {
if success && flow != nil {
- if err := d.DelFlow(flow); err != nil {
+ if err := d.DelFlow(cntx, flow); err != nil {
logger.Warnw(ctx, "Delete Flow Error", log.Fields{"Cookie": flow.Cookie, "Reason": err.Error()})
}
} else if !success {
@@ -1038,5 +1038,5 @@
}
logger.Infow(ctx, "Sending Flow Notification", log.Fields{"Cookie": cookie, "Error Code": statusCode, "FlowOp": oper})
- GetController().ProcessFlowModResultIndication(flowResult)
+ GetController().ProcessFlowModResultIndication(cntx, flowResult)
}