VOL-2180: Code changes to propagate context.Context through the adapter
Integrate the InterContainerProxy interface changes
Change-Id: Ia20c5ac3093b7845acf80cce801ec0c1d90c125f
diff --git a/adaptercore/device_handler.go b/adaptercore/device_handler.go
index b541978..10b0dbf 100644
--- a/adaptercore/device_handler.go
+++ b/adaptercore/device_handler.go
@@ -276,9 +276,9 @@
}
// readIndications to read the indications from the OLT device
-func (dh *DeviceHandler) readIndications() {
+func (dh *DeviceHandler) readIndications(ctx context.Context) {
defer log.Errorw("Indications ended", log.Fields{})
- indications, err := dh.Client.EnableIndication(context.Background(), new(oop.Empty))
+ indications, err := dh.Client.EnableIndication(ctx, new(oop.Empty))
if err != nil {
log.Errorw("Failed to read indications", log.Fields{"err": err})
return
@@ -288,7 +288,7 @@
return
}
/* get device state */
- device, err := dh.coreProxy.GetDevice(context.TODO(), dh.device.Id, dh.device.Id)
+ device, err := dh.coreProxy.GetDevice(ctx, dh.device.Id, dh.device.Id)
if err != nil || device == nil {
/*TODO: needs to handle error scenarios */
log.Errorw("Failed to fetch device info", log.Fields{"err": err})
@@ -323,7 +323,7 @@
indicationBackoff.Reset()
}
time.Sleep(indicationBackoff.NextBackOff())
- indications, err = dh.Client.EnableIndication(context.Background(), new(oop.Empty))
+ indications, err = dh.Client.EnableIndication(ctx, new(oop.Empty))
if err != nil {
log.Errorw("Failed to read indications", log.Fields{"err": err})
return
@@ -336,8 +336,8 @@
log.Debug("Device deleted stoping the read indication thread")
break
}
- dh.transitionMap.Handle(DeviceDownInd)
- dh.transitionMap.Handle(DeviceInit)
+ dh.transitionMap.Handle(ctx, DeviceDownInd)
+ dh.transitionMap.Handle(ctx, DeviceInit)
break
}
// Reset backoff if we have a successful receive
@@ -351,27 +351,27 @@
log.Infow("olt is admin down, ignore indication", log.Fields{"indication": indication})
continue
}
- dh.handleIndication(indication)
+ dh.handleIndication(ctx, indication)
}
}
-func (dh *DeviceHandler) handleOltIndication(oltIndication *oop.OltIndication) {
+func (dh *DeviceHandler) handleOltIndication(ctx context.Context, oltIndication *oop.OltIndication) {
raisedTs := time.Now().UnixNano()
if oltIndication.OperState == "up" && dh.transitionMap.currentDeviceState != deviceStateUp {
- dh.transitionMap.Handle(DeviceUpInd)
+ dh.transitionMap.Handle(ctx, DeviceUpInd)
} else if oltIndication.OperState == "down" {
- dh.transitionMap.Handle(DeviceDownInd)
+ dh.transitionMap.Handle(ctx, DeviceDownInd)
}
// Send or clear Alarm
dh.eventMgr.oltUpDownIndication(oltIndication, dh.deviceID, raisedTs)
}
-func (dh *DeviceHandler) handleIndication(indication *oop.Indication) {
+func (dh *DeviceHandler) handleIndication(ctx context.Context, indication *oop.Indication) {
raisedTs := time.Now().UnixNano()
switch indication.Data.(type) {
case *oop.Indication_OltInd:
- dh.handleOltIndication(indication.GetOltInd())
+ dh.handleOltIndication(ctx, indication.GetOltInd())
case *oop.Indication_IntfInd:
intfInd := indication.GetIntfInd()
go dh.addPort(intfInd.GetIntfId(), voltha.Port_PON_OLT, intfInd.GetOperState())
@@ -380,7 +380,7 @@
intfOperInd := indication.GetIntfOperInd()
if intfOperInd.GetType() == "nni" {
go dh.addPort(intfOperInd.GetIntfId(), voltha.Port_ETHERNET_NNI, intfOperInd.GetOperState())
- dh.resourceMgr.AddNNIToKVStore(intfOperInd.GetIntfId())
+ dh.resourceMgr.AddNNIToKVStore(ctx, intfOperInd.GetIntfId())
} else if intfOperInd.GetType() == "pon" {
// TODO: Check what needs to be handled here for When PON PORT down, ONU will be down
// Handle pon port update
@@ -392,7 +392,7 @@
onuDiscInd := indication.GetOnuDiscInd()
log.Infow("Received Onu discovery indication ", log.Fields{"OnuDiscInd": onuDiscInd})
sn := dh.stringifySerialNumber(onuDiscInd.SerialNumber)
- go dh.onuDiscIndication(onuDiscInd, sn)
+ go dh.onuDiscIndication(ctx, onuDiscInd, sn)
case *oop.Indication_OnuInd:
onuInd := indication.GetOnuInd()
log.Infow("Received Onu indication ", log.Fields{"OnuInd": onuInd})
@@ -404,7 +404,7 @@
case *oop.Indication_PktInd:
pktInd := indication.GetPktInd()
log.Infow("Received pakcet indication ", log.Fields{"PktInd": pktInd})
- go dh.handlePacketIndication(pktInd)
+ go dh.handlePacketIndication(ctx, pktInd)
case *oop.Indication_PortStats:
portStats := indication.GetPortStats()
go dh.portStats.PortStatisticsIndication(portStats, dh.resourceMgr.DevInfo.GetPonPorts())
@@ -419,9 +419,9 @@
}
// doStateUp handle the olt up indication and update to voltha core
-func (dh *DeviceHandler) doStateUp() error {
+func (dh *DeviceHandler) doStateUp(ctx context.Context) error {
// Synchronous call to update device state - this method is run in its own go routine
- if err := dh.coreProxy.DeviceStateUpdate(context.Background(), dh.device.Id, voltha.ConnectStatus_REACHABLE,
+ if err := dh.coreProxy.DeviceStateUpdate(ctx, dh.device.Id, voltha.ConnectStatus_REACHABLE,
voltha.OperStatus_ACTIVE); err != nil {
log.Errorw("Failed to update device with OLT UP indication", log.Fields{"deviceID": dh.device.Id, "error": err})
return err
@@ -430,12 +430,12 @@
}
// doStateDown handle the olt down indication
-func (dh *DeviceHandler) doStateDown() error {
+func (dh *DeviceHandler) doStateDown(ctx context.Context) error {
dh.lockDevice.Lock()
defer dh.lockDevice.Unlock()
log.Debug("do-state-down-start")
- device, err := dh.coreProxy.GetDevice(context.TODO(), dh.device.Id, dh.device.Id)
+ device, err := dh.coreProxy.GetDevice(ctx, dh.device.Id, dh.device.Id)
if err != nil || device == nil {
/*TODO: needs to handle error scenarios */
log.Errorw("Failed to fetch device device", log.Fields{"err": err})
@@ -444,7 +444,7 @@
cloned := proto.Clone(device).(*voltha.Device)
// Update the all ports state on that device to disable
- if er := dh.coreProxy.PortsStateUpdate(context.TODO(), cloned.Id, voltha.OperStatus_UNKNOWN); er != nil {
+ if er := dh.coreProxy.PortsStateUpdate(ctx, cloned.Id, voltha.OperStatus_UNKNOWN); er != nil {
log.Errorw("updating-ports-failed", log.Fields{"deviceID": device.Id, "error": er})
return er
}
@@ -454,13 +454,13 @@
cloned.ConnectStatus = common.ConnectStatus_UNREACHABLE
dh.device = cloned
- if er := dh.coreProxy.DeviceStateUpdate(context.TODO(), cloned.Id, cloned.ConnectStatus, cloned.OperStatus); er != nil {
+ if er := dh.coreProxy.DeviceStateUpdate(ctx, cloned.Id, cloned.ConnectStatus, cloned.OperStatus); er != nil {
log.Errorw("error-updating-device-state", log.Fields{"deviceID": device.Id, "error": er})
return er
}
//get the child device for the parent device
- onuDevices, err := dh.coreProxy.GetChildDevices(context.TODO(), dh.device.Id)
+ onuDevices, err := dh.coreProxy.GetChildDevices(ctx, dh.device.Id)
if err != nil {
log.Errorw("failed to get child devices information", log.Fields{"deviceID": dh.device.Id, "error": err})
return err
@@ -470,7 +470,7 @@
// Update onu state as down in onu adapter
onuInd := oop.OnuIndication{}
onuInd.OperState = "down"
- er := dh.AdapterProxy.SendInterAdapterMessage(context.TODO(), &onuInd, ic.InterAdapterMessageType_ONU_IND_REQUEST,
+ er := dh.AdapterProxy.SendInterAdapterMessage(ctx, &onuInd, ic.InterAdapterMessageType_ONU_IND_REQUEST,
"openolt", onuDevice.Type, onuDevice.Id, onuDevice.ProxyAddress.DeviceId, "")
if er != nil {
log.Errorw("Failed to send inter-adapter-message", log.Fields{"OnuInd": onuInd,
@@ -486,7 +486,7 @@
}
// doStateInit dial the grpc before going to init state
-func (dh *DeviceHandler) doStateInit() error {
+func (dh *DeviceHandler) doStateInit(ctx context.Context) error {
var err error
dh.clientCon, err = grpc.Dial(dh.device.GetHostAndPort(), grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
@@ -497,20 +497,20 @@
}
// postInit create olt client instance to invoke RPC on the olt device
-func (dh *DeviceHandler) postInit() error {
+func (dh *DeviceHandler) postInit(ctx context.Context) error {
dh.Client = oop.NewOpenoltClient(dh.clientCon)
- dh.transitionMap.Handle(GrpcConnected)
+ dh.transitionMap.Handle(ctx, GrpcConnected)
return nil
}
// doStateConnected get the device info and update to voltha core
-func (dh *DeviceHandler) doStateConnected() error {
+func (dh *DeviceHandler) doStateConnected(ctx context.Context) error {
log.Debug("OLT device has been connected")
// Case where OLT is disabled and then rebooted.
if dh.adminState == "down" {
log.Debugln("do-state-connected--device-admin-state-down")
- device, err := dh.coreProxy.GetDevice(context.TODO(), dh.device.Id, dh.device.Id)
+ device, err := dh.coreProxy.GetDevice(ctx, dh.device.Id, dh.device.Id)
if err != nil || device == nil {
/*TODO: needs to handle error scenarios */
log.Errorw("Failed to fetch device device", log.Fields{"err": err})
@@ -520,18 +520,18 @@
cloned.ConnectStatus = voltha.ConnectStatus_REACHABLE
cloned.OperStatus = voltha.OperStatus_UNKNOWN
dh.device = cloned
- if er := dh.coreProxy.DeviceStateUpdate(context.TODO(), cloned.Id, cloned.ConnectStatus, cloned.OperStatus); er != nil {
+ if er := dh.coreProxy.DeviceStateUpdate(ctx, cloned.Id, cloned.ConnectStatus, cloned.OperStatus); er != nil {
log.Errorw("error-updating-device-state", log.Fields{"deviceID": dh.device.Id, "error": er})
}
// Since the device was disabled before the OLT was rebooted, enforce the OLT to be Disabled after re-connection.
- _, err = dh.Client.DisableOlt(context.Background(), new(oop.Empty))
+ _, err = dh.Client.DisableOlt(ctx, new(oop.Empty))
if err != nil {
log.Errorw("Failed to disable olt ", log.Fields{"err": err})
}
// Start reading indications
- go dh.readIndications()
+ go dh.readIndications(ctx)
return nil
}
@@ -555,12 +555,12 @@
KVStoreHostPort := fmt.Sprintf("%s:%d", dh.openOLT.KVStoreHost, dh.openOLT.KVStorePort)
// Instantiate resource manager
- if dh.resourceMgr = rsrcMgr.NewResourceMgr(dh.deviceID, KVStoreHostPort, dh.openOLT.KVStoreType, dh.deviceType, deviceInfo); dh.resourceMgr == nil {
+ if dh.resourceMgr = rsrcMgr.NewResourceMgr(ctx, dh.deviceID, KVStoreHostPort, dh.openOLT.KVStoreType, dh.deviceType, deviceInfo); dh.resourceMgr == nil {
log.Error("Error while instantiating resource manager")
return errors.New("instantiating resource manager failed")
}
// Instantiate flow manager
- if dh.flowMgr = NewFlowManager(dh, dh.resourceMgr); dh.flowMgr == nil {
+ if dh.flowMgr = NewFlowManager(ctx, dh, dh.resourceMgr); dh.flowMgr == nil {
log.Error("Error while instantiating flow manager")
return errors.New("instantiating flow manager failed")
}
@@ -571,7 +571,7 @@
dh.portStats = NewOpenOltStatsMgr(dh)
// Start reading indications
- go dh.readIndications()
+ go dh.readIndications(ctx)
return nil
}
@@ -656,10 +656,10 @@
}
//AdoptDevice adopts the OLT device
-func (dh *DeviceHandler) AdoptDevice(device *voltha.Device) {
+func (dh *DeviceHandler) AdoptDevice(ctx context.Context, device *voltha.Device) {
dh.transitionMap = NewTransitionMap(dh)
log.Infow("Adopt_device", log.Fields{"deviceID": device.Id, "Address": device.GetHostAndPort()})
- dh.transitionMap.Handle(DeviceInit)
+ dh.transitionMap.Handle(ctx, DeviceInit)
// Now, set the initial PM configuration for that device
if err := dh.coreProxy.DevicePMConfigUpdate(nil, dh.metrics.ToPmConfigs()); err != nil {
@@ -840,13 +840,13 @@
log.Debugw("Sent Omci message", log.Fields{"intfID": intfID, "onuID": onuID, "omciMsg": hex.EncodeToString(omciMsg.Message)})
}
-func (dh *DeviceHandler) activateONU(intfID uint32, onuID int64, serialNum *oop.SerialNumber, serialNumber string) {
+func (dh *DeviceHandler) activateONU(ctx context.Context, intfID uint32, onuID int64, serialNum *oop.SerialNumber, serialNumber string) {
log.Debugw("activate-onu", log.Fields{"intfID": intfID, "onuID": onuID, "serialNum": serialNum, "serialNumber": serialNumber})
- dh.flowMgr.UpdateOnuInfo(intfID, uint32(onuID), serialNumber)
+ dh.flowMgr.UpdateOnuInfo(ctx, intfID, uint32(onuID), serialNumber)
// TODO: need resource manager
var pir uint32 = 1000000
Onu := oop.Onu{IntfId: intfID, OnuId: uint32(onuID), SerialNumber: serialNum, Pir: pir}
- if _, err := dh.Client.ActivateOnu(context.Background(), &Onu); err != nil {
+ if _, err := dh.Client.ActivateOnu(ctx, &Onu); err != nil {
st, _ := status.FromError(err)
if st.Code() == codes.AlreadyExists {
log.Debug("ONU activation is in progress", log.Fields{"SerialNumber": serialNumber})
@@ -858,7 +858,7 @@
}
}
-func (dh *DeviceHandler) onuDiscIndication(onuDiscInd *oop.OnuDiscIndication, sn string) {
+func (dh *DeviceHandler) onuDiscIndication(ctx context.Context, onuDiscInd *oop.OnuDiscIndication, sn string) {
channelID := onuDiscInd.GetIntfId()
parentPortNo := IntfIDToPortNo(onuDiscInd.GetIntfId(), voltha.Port_PON_OLT)
@@ -880,19 +880,19 @@
return
}
- onuDevice, err := dh.coreProxy.GetChildDevice(context.TODO(), dh.device.Id, kwargs)
+ onuDevice, err := dh.coreProxy.GetChildDevice(ctx, dh.device.Id, kwargs)
var onuID uint32
if onuDevice == nil || err != nil {
//This is the first time ONU discovered. Create an OnuID for it.
ponintfid := onuDiscInd.GetIntfId()
dh.lockDevice.Lock()
- onuID, err = dh.resourceMgr.GetONUID(ponintfid)
+ onuID, err = dh.resourceMgr.GetONUID(ctx, ponintfid)
dh.lockDevice.Unlock()
if err != nil {
log.Errorw("failed to fetch onuID from resource manager", log.Fields{"pon-intf-id": ponintfid, "err": err})
return
}
- if onuDevice, err = dh.coreProxy.ChildDeviceDetected(context.TODO(), dh.device.Id, int(parentPortNo),
+ if onuDevice, err = dh.coreProxy.ChildDeviceDetected(ctx, dh.device.Id, int(parentPortNo),
"", int(channelID),
string(onuDiscInd.SerialNumber.GetVendorId()), sn, int64(onuID)); onuDevice == nil {
log.Errorw("Create onu error",
@@ -916,7 +916,7 @@
dh.onus.Store(onuKey, onuDev)
log.Debugw("new-onu-device-discovered", log.Fields{"onu": onuDev})
- err = dh.coreProxy.DeviceStateUpdate(context.TODO(), onuDevice.Id, common.ConnectStatus_REACHABLE, common.OperStatus_DISCOVERED)
+ err = dh.coreProxy.DeviceStateUpdate(ctx, onuDevice.Id, common.ConnectStatus_REACHABLE, common.OperStatus_DISCOVERED)
if err != nil {
log.Errorw("failed to update device state", log.Fields{"DeviceID": onuDevice.Id, "err": err})
return
@@ -926,7 +926,7 @@
//In onuIndication the operStatus of device is checked. If it is still not updated in KV store
//then the initialisation fails.
time.Sleep(1 * time.Second)
- dh.activateONU(onuDiscInd.IntfId, int64(onuID), onuDiscInd.SerialNumber, sn)
+ dh.activateONU(ctx, onuDiscInd.IntfId, int64(onuID), onuDiscInd.SerialNumber, sn)
return
}
@@ -982,13 +982,14 @@
}
func (dh *DeviceHandler) updateOnuStates(onuDevice *voltha.Device, onuInd *oop.OnuIndication, foundInCache bool) {
+ ctx := context.TODO()
log.Debugw("onu-indication-for-state", log.Fields{"onuIndication": onuInd, "DeviceId": onuDevice.Id, "operStatus": onuDevice.OperStatus, "adminStatus": onuDevice.AdminState})
dh.updateOnuAdminState(onuInd)
// operState
if onuInd.OperState == "down" {
log.Debugw("sending-interadapter-onu-indication", log.Fields{"onuIndication": onuInd, "DeviceId": onuDevice.Id, "operStatus": onuDevice.OperStatus, "adminStatus": onuDevice.AdminState})
// TODO NEW CORE do not hardcode adapter name. Handler needs Adapter reference
- err := dh.AdapterProxy.SendInterAdapterMessage(context.TODO(), onuInd, ic.InterAdapterMessageType_ONU_IND_REQUEST,
+ err := dh.AdapterProxy.SendInterAdapterMessage(ctx, onuInd, ic.InterAdapterMessageType_ONU_IND_REQUEST,
"openolt", onuDevice.Type, onuDevice.Id, onuDevice.ProxyAddress.DeviceId, "")
if err != nil {
log.Errorw("Failed to send inter-adapter-message", log.Fields{"OnuInd": onuInd,
@@ -1002,7 +1003,7 @@
}
log.Debugw("sending-interadapter-onu-indication", log.Fields{"onuIndication": onuInd, "DeviceId": onuDevice.Id, "operStatus": onuDevice.OperStatus, "adminStatus": onuDevice.AdminState})
// TODO NEW CORE do not hardcode adapter name. Handler needs Adapter reference
- err := dh.AdapterProxy.SendInterAdapterMessage(context.TODO(), onuInd, ic.InterAdapterMessageType_ONU_IND_REQUEST,
+ err := dh.AdapterProxy.SendInterAdapterMessage(ctx, onuInd, ic.InterAdapterMessageType_ONU_IND_REQUEST,
"openolt", onuDevice.Type, onuDevice.Id, onuDevice.ProxyAddress.DeviceId, "")
if err != nil {
log.Errorw("Failed to send inter-adapter-message", log.Fields{"OnuInd": onuInd,
@@ -1103,17 +1104,17 @@
}
//UpdateFlowsIncrementally updates the device flow
-func (dh *DeviceHandler) UpdateFlowsIncrementally(device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error {
+func (dh *DeviceHandler) UpdateFlowsIncrementally(ctx context.Context, device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error {
log.Debugw("Received-incremental-flowupdate-in-device-handler", log.Fields{"deviceID": device.Id, "flows": flows, "groups": groups, "flowMetadata": flowMetadata})
if flows != nil {
for _, flow := range flows.ToRemove.Items {
log.Debug("Removing flow", log.Fields{"deviceId": device.Id, "flowToRemove": flow})
- dh.flowMgr.RemoveFlow(flow)
+ dh.flowMgr.RemoveFlow(ctx, flow)
}
for _, flow := range flows.ToAdd.Items {
log.Debug("Adding flow", log.Fields{"deviceId": device.Id, "flowToAdd": flow})
- dh.flowMgr.AddFlow(flow, flowMetadata)
+ dh.flowMgr.AddFlow(ctx, flow, flowMetadata)
}
}
if groups != nil && flows != nil {
@@ -1125,10 +1126,10 @@
if groups != nil {
for _, group := range groups.ToAdd.Items {
- dh.flowMgr.AddGroup(group)
+ dh.flowMgr.AddGroup(ctx, group)
}
for _, group := range groups.ToUpdate.Items {
- dh.flowMgr.ModifyGroup(group)
+ dh.flowMgr.ModifyGroup(ctx, group)
}
if len(groups.ToRemove.Items) != 0 {
log.Debug("Group delete operation is not supported for now")
@@ -1245,45 +1246,45 @@
return nil
}
-func (dh *DeviceHandler) clearUNIData(onu *rsrcMgr.OnuGemInfo) error {
+func (dh *DeviceHandler) clearUNIData(ctx context.Context, onu *rsrcMgr.OnuGemInfo) error {
var uniID uint32
var err error
for _, port := range onu.UniPorts {
uniID = UniIDFromPortNum(uint32(port))
log.Debugw("clearing-resource-data-for-uni-port", log.Fields{"port": port, "uniID": uniID})
/* Delete tech-profile instance from the KV store */
- if err = dh.flowMgr.DeleteTechProfileInstances(onu.IntfID, onu.OnuID, uniID, onu.SerialNumber); err != nil {
+ if err = dh.flowMgr.DeleteTechProfileInstances(ctx, onu.IntfID, onu.OnuID, uniID, onu.SerialNumber); err != nil {
log.Debugw("Failed-to-remove-tech-profile-instance-for-onu", log.Fields{"onu-id": onu.OnuID})
}
log.Debugw("Deleted-tech-profile-instance-for-onu", log.Fields{"onu-id": onu.OnuID})
- flowIDs := dh.resourceMgr.GetCurrentFlowIDsForOnu(onu.IntfID, int32(onu.OnuID), int32(uniID))
+ flowIDs := dh.resourceMgr.GetCurrentFlowIDsForOnu(ctx, onu.IntfID, int32(onu.OnuID), int32(uniID))
for _, flowID := range flowIDs {
- dh.resourceMgr.FreeFlowID(onu.IntfID, int32(onu.OnuID), int32(uniID), flowID)
+ dh.resourceMgr.FreeFlowID(ctx, onu.IntfID, int32(onu.OnuID), int32(uniID), flowID)
}
- tpIDList := dh.resourceMgr.GetTechProfileIDForOnu(onu.IntfID, onu.OnuID, uniID)
+ tpIDList := dh.resourceMgr.GetTechProfileIDForOnu(ctx, onu.IntfID, onu.OnuID, uniID)
for _, tpID := range tpIDList {
- if err = dh.resourceMgr.RemoveMeterIDForOnu("upstream", onu.IntfID, onu.OnuID, uniID, tpID); err != nil {
+ if err = dh.resourceMgr.RemoveMeterIDForOnu(ctx, "upstream", onu.IntfID, onu.OnuID, uniID, tpID); err != nil {
log.Debugw("Failed-to-remove-meter-id-for-onu-upstream", log.Fields{"onu-id": onu.OnuID})
}
log.Debugw("Removed-meter-id-for-onu-upstream", log.Fields{"onu-id": onu.OnuID})
- if err = dh.resourceMgr.RemoveMeterIDForOnu("downstream", onu.IntfID, onu.OnuID, uniID, tpID); err != nil {
+ if err = dh.resourceMgr.RemoveMeterIDForOnu(ctx, "downstream", onu.IntfID, onu.OnuID, uniID, tpID); err != nil {
log.Debugw("Failed-to-remove-meter-id-for-onu-downstream", log.Fields{"onu-id": onu.OnuID})
}
log.Debugw("Removed-meter-id-for-onu-downstream", log.Fields{"onu-id": onu.OnuID})
}
- dh.resourceMgr.FreePONResourcesForONU(onu.IntfID, onu.OnuID, uniID)
- if err = dh.resourceMgr.RemoveTechProfileIDsForOnu(onu.IntfID, onu.OnuID, uniID); err != nil {
+ dh.resourceMgr.FreePONResourcesForONU(ctx, onu.IntfID, onu.OnuID, uniID)
+ if err = dh.resourceMgr.RemoveTechProfileIDsForOnu(ctx, onu.IntfID, onu.OnuID, uniID); err != nil {
log.Debugw("Failed-to-remove-tech-profile-id-for-onu", log.Fields{"onu-id": onu.OnuID})
}
log.Debugw("Removed-tech-profile-id-for-onu", log.Fields{"onu-id": onu.OnuID})
- if err = dh.resourceMgr.DelGemPortPktIn(onu.IntfID, onu.OnuID, uint32(port)); err != nil {
+ if err = dh.resourceMgr.DelGemPortPktIn(ctx, onu.IntfID, onu.OnuID, uint32(port)); err != nil {
log.Debugw("Failed-to-remove-gemport-pkt-in", log.Fields{"intfid": onu.IntfID, "onuid": onu.OnuID, "uniId": uniID})
}
}
return nil
}
-func (dh *DeviceHandler) clearNNIData() error {
+func (dh *DeviceHandler) clearNNIData(ctx context.Context) error {
nniUniID := -1
nniOnuID := -1
@@ -1291,21 +1292,21 @@
return fmt.Errorf("no resource manager for deviceID %s", dh.deviceID)
}
//Free the flow-ids for the NNI port
- nni, err := dh.resourceMgr.GetNNIFromKVStore()
+ nni, err := dh.resourceMgr.GetNNIFromKVStore(ctx)
if err != nil {
log.Error("Failed to fetch nni from kv store")
return err
}
log.Debugw("NNI are ", log.Fields{"nni": nni})
for _, nniIntfID := range nni {
- flowIDs := dh.resourceMgr.GetCurrentFlowIDsForOnu(uint32(nniIntfID), int32(nniOnuID), int32(nniUniID))
+ flowIDs := dh.resourceMgr.GetCurrentFlowIDsForOnu(ctx, uint32(nniIntfID), int32(nniOnuID), int32(nniUniID))
log.Debugw("Current flow ids for nni", log.Fields{"flow-ids": flowIDs})
for _, flowID := range flowIDs {
- dh.resourceMgr.FreeFlowID(uint32(nniIntfID), -1, -1, uint32(flowID))
+ dh.resourceMgr.FreeFlowID(ctx, uint32(nniIntfID), -1, -1, uint32(flowID))
}
- dh.resourceMgr.RemoveResourceMap(nniIntfID, int32(nniOnuID), int32(nniUniID))
+ dh.resourceMgr.RemoveResourceMap(ctx, nniIntfID, int32(nniOnuID), int32(nniUniID))
}
- if err = dh.resourceMgr.DelNNiFromKVStore(); err != nil {
+ if err = dh.resourceMgr.DelNNiFromKVStore(ctx); err != nil {
log.Error("Failed to clear nni from kv store")
return err
}
@@ -1313,7 +1314,7 @@
}
// DeleteDevice deletes the device instance from openolt handler array. Also clears allocated resource manager resources. Also reboots the OLT hardware!
-func (dh *DeviceHandler) DeleteDevice(device *voltha.Device) error {
+func (dh *DeviceHandler) DeleteDevice(ctx context.Context, device *voltha.Device) error {
log.Debug("Function entry delete device")
dh.lockDevice.Lock()
if dh.adminState == "deleted" {
@@ -1331,7 +1332,7 @@
var ponPort uint32
for ponPort = 0; ponPort < noOfPonPorts; ponPort++ {
var onuGemData []rsrcMgr.OnuGemInfo
- err := dh.resourceMgr.ResourceMgrs[ponPort].GetOnuGemInfo(ponPort, &onuGemData)
+ err := dh.resourceMgr.ResourceMgrs[ponPort].GetOnuGemInfo(ctx, ponPort, &onuGemData)
if err != nil {
log.Errorw("Failed to get onu info for port ", log.Fields{"ponport": ponPort})
return err
@@ -1339,19 +1340,19 @@
for _, onu := range onuGemData {
onuID := make([]uint32, 1)
log.Debugw("onu data ", log.Fields{"onu": onu})
- if err = dh.clearUNIData(&onu); err != nil {
+ if err = dh.clearUNIData(ctx, &onu); err != nil {
log.Errorw("Failed to clear data for onu", log.Fields{"onu-device": onu})
}
// Clear flowids for gem cache.
for _, gem := range onu.GemPorts {
- dh.resourceMgr.DeleteFlowIDsForGem(ponPort, gem)
+ dh.resourceMgr.DeleteFlowIDsForGem(ctx, ponPort, gem)
}
onuID[0] = onu.OnuID
- dh.resourceMgr.FreeonuID(ponPort, onuID)
+ dh.resourceMgr.FreeonuID(ctx, ponPort, onuID)
}
- dh.resourceMgr.DeleteIntfIDGempMapPath(ponPort)
+ dh.resourceMgr.DeleteIntfIDGempMapPath(ctx, ponPort)
onuGemData = nil
- err = dh.resourceMgr.DelOnuGemInfoForIntf(ponPort)
+ err = dh.resourceMgr.DelOnuGemInfoForIntf(ctx, ponPort)
if err != nil {
log.Errorw("Failed to update onugem info", log.Fields{"intfid": ponPort, "onugeminfo": onuGemData})
}
@@ -1359,12 +1360,12 @@
/* Clear the flows from KV store associated with NNI port.
There are mostly trap rules from NNI port (like LLDP)
*/
- if err := dh.clearNNIData(); err != nil {
+ if err := dh.clearNNIData(ctx); err != nil {
log.Errorw("Failed to clear data for NNI port", log.Fields{"device-id": dh.deviceID})
}
/* Clear the resource pool for each PON port in the background */
- go dh.resourceMgr.Delete()
+ go dh.resourceMgr.Delete(ctx)
}
/*Delete ONU map for the device*/
@@ -1380,7 +1381,7 @@
dh.stopHeartbeatCheck <- true
//Reset the state
if dh.Client != nil {
- if _, err := dh.Client.Reboot(context.Background(), new(oop.Empty)); err != nil {
+ if _, err := dh.Client.Reboot(ctx, new(oop.Empty)); err != nil {
log.Errorw("Failed-to-reboot-olt ", log.Fields{"deviceID": dh.deviceID, "err": err})
return err
}
@@ -1388,7 +1389,7 @@
cloned := proto.Clone(device).(*voltha.Device)
cloned.OperStatus = voltha.OperStatus_UNKNOWN
cloned.ConnectStatus = voltha.ConnectStatus_UNREACHABLE
- if err := dh.coreProxy.DeviceStateUpdate(context.TODO(), cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
+ if err := dh.coreProxy.DeviceStateUpdate(ctx, cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
log.Errorw("error-updating-device-state", log.Fields{"deviceID": device.Id, "error": err})
return err
}
@@ -1407,12 +1408,12 @@
return nil
}
-func (dh *DeviceHandler) handlePacketIndication(packetIn *oop.PacketIndication) {
+func (dh *DeviceHandler) handlePacketIndication(ctx context.Context, packetIn *oop.PacketIndication) {
log.Debugw("Received packet-in", log.Fields{
"packet-indication": *packetIn,
"packet": hex.EncodeToString(packetIn.Pkt),
})
- logicalPortNum, err := dh.flowMgr.GetLogicalPortFromPacketIn(packetIn)
+ logicalPortNum, err := dh.flowMgr.GetLogicalPortFromPacketIn(ctx, packetIn)
if err != nil {
log.Errorw("Error getting logical port from packet-in", log.Fields{
"error": err,
@@ -1437,7 +1438,7 @@
}
// PacketOut sends packet-out from VOLTHA to OLT on the egress port provided
-func (dh *DeviceHandler) PacketOut(egressPortNo int, packet *of.OfpPacketOut) error {
+func (dh *DeviceHandler) PacketOut(ctx context.Context, egressPortNo int, packet *of.OfpPacketOut) error {
log.Debugw("incoming-packet-out", log.Fields{
"deviceID": dh.deviceID,
"egress_port_no": egressPortNo,
@@ -1469,7 +1470,7 @@
onuID := OnuIDFromPortNum(uint32(egressPortNo))
uniID := UniIDFromPortNum(uint32(egressPortNo))
- gemPortID, err := dh.flowMgr.GetPacketOutGemPortID(intfID, onuID, uint32(egressPortNo))
+ gemPortID, err := dh.flowMgr.GetPacketOutGemPortID(ctx, intfID, onuID, uint32(egressPortNo))
if err != nil {
// In this case the openolt agent will receive the gemPortID as 0.
// The agent tries to retrieve the gemPortID in this case.
@@ -1490,7 +1491,7 @@
"packet": hex.EncodeToString(packet.Data),
})
- if _, err := dh.Client.OnuPacketOut(context.Background(), &onuPkt); err != nil {
+ if _, err := dh.Client.OnuPacketOut(ctx, &onuPkt); err != nil {
log.Errorw("Error while sending packet-out to ONU", log.Fields{
"error": err,
"packet": hex.EncodeToString(packet.Data),
@@ -1505,7 +1506,7 @@
"packet": hex.EncodeToString(packet.Data),
})
- if _, err := dh.Client.UplinkPacketOut(context.Background(), &uplinkPkt); err != nil {
+ if _, err := dh.Client.UplinkPacketOut(ctx, &uplinkPkt); err != nil {
log.Errorw("Error while sending packet-out to NNI", log.Fields{
"error": err,
"packet": hex.EncodeToString(packet.Data),
@@ -1549,7 +1550,7 @@
log.Debug("We got hearbeat after the timeout expired, changing the states")
go dh.notifyChildDevices("up")
- if err := dh.coreProxy.DeviceStateUpdate(context.Background(), dh.device.Id, voltha.ConnectStatus_REACHABLE,
+ if err := dh.coreProxy.DeviceStateUpdate(ctx, dh.device.Id, voltha.ConnectStatus_REACHABLE,
voltha.OperStatus_ACTIVE); err != nil {
log.Errorw("Failed to update device state", log.Fields{"deviceID": dh.device.Id, "error": err})
}
@@ -1588,6 +1589,7 @@
}
func (dh *DeviceHandler) invokeDisableorEnablePort(port *voltha.Port, enablePort bool) error {
+ ctx := context.Background()
log.Infow("invokeDisableorEnablePort", log.Fields{"port": port, "Enable": enablePort})
if port.GetType() == voltha.Port_ETHERNET_NNI {
// Bug is opened for VOL-2505 to support NNI disable feature.
@@ -1601,7 +1603,7 @@
var operStatus voltha.OperStatus_Types
if enablePort {
operStatus = voltha.OperStatus_ACTIVE
- out, err := dh.Client.EnablePonIf(context.Background(), ponIntf)
+ out, err := dh.Client.EnablePonIf(ctx, ponIntf)
if err != nil {
log.Errorw("error-while-enable-Pon-port", log.Fields{"DeviceID": dh.device, "Port": port, "error": err})
@@ -1612,7 +1614,7 @@
log.Infow("enabled-pon-port", log.Fields{"out": out, "DeviceID": dh.device, "Port": port})
} else {
operStatus = voltha.OperStatus_UNKNOWN
- out, err := dh.Client.DisablePonIf(context.Background(), ponIntf)
+ out, err := dh.Client.DisablePonIf(ctx, ponIntf)
if err != nil {
log.Errorw("error-while-disabling-interface", log.Fields{"DeviceID": dh.device, "Port": port})
return err
@@ -1621,7 +1623,7 @@
dh.activePorts.Store(ponID, false)
log.Infow("disabled-pon-port", log.Fields{"out": out, "DeviceID": dh.device, "Port": port})
}
- if errs := dh.coreProxy.PortStateUpdate(context.Background(), dh.deviceID, voltha.Port_PON_OLT, port.PortNo, operStatus); errs != nil {
+ if errs := dh.coreProxy.PortStateUpdate(ctx, dh.deviceID, voltha.Port_PON_OLT, port.PortNo, operStatus); errs != nil {
log.Errorw("portstate-update-failed", log.Fields{"Device": dh.deviceID, "port": port.PortNo, "error": errs})
return errs
}
diff --git a/adaptercore/device_handler_test.go b/adaptercore/device_handler_test.go
index 7fbfea8..dcb1f17 100644
--- a/adaptercore/device_handler_test.go
+++ b/adaptercore/device_handler_test.go
@@ -184,7 +184,9 @@
}
dh.resourceMgr.ResourceMgrs[1] = ponmgr
dh.resourceMgr.ResourceMgrs[2] = ponmgr
- dh.flowMgr = NewFlowManager(dh, dh.resourceMgr)
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ dh.flowMgr = NewFlowManager(ctx, dh, dh.resourceMgr)
dh.Client = &mocks.MockOpenoltClient{}
dh.eventMgr = &OpenOltEventMgr{eventProxy: &mocks.MockEventProxy{}, handler: dh}
dh.transitionMap = &TransitionMap{}
@@ -669,7 +671,9 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
dh := tt.deviceHandler
- dh.handleIndication(tt.args.indication)
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ dh.handleIndication(ctx, tt.args.indication)
})
}
}
@@ -756,7 +760,9 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
dh := newMockDeviceHandler()
- dh.handleOltIndication(tt.args.oltIndication)
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ dh.handleOltIndication(ctx, tt.args.oltIndication)
})
}
}
@@ -781,7 +787,9 @@
//dh.doStateInit()
// context.
//dh.AdoptDevice(tt.args.device)
- tt.devicehandler.postInit()
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ tt.devicehandler.postInit(ctx)
})
}
}
@@ -808,7 +816,9 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- tt.devicehandler.activateONU(tt.args.intfID, tt.args.onuID,
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ tt.devicehandler.activateONU(ctx, tt.args.intfID, tt.args.onuID,
tt.args.serialNum, tt.args.serialNumber)
})
}
@@ -855,7 +865,9 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
dh := tt.devicehandler
- if err := dh.PacketOut(tt.args.egressPortNo, tt.args.packet); (err != nil) != tt.wantErr {
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ if err := dh.PacketOut(ctx, tt.args.egressPortNo, tt.args.packet); (err != nil) != tt.wantErr {
t.Errorf("DeviceHandler.PacketOut() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -881,7 +893,9 @@
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- if err := tt.devicehandler.doStateUp(); (err != nil) != tt.wantErr {
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ if err := tt.devicehandler.doStateUp(ctx); (err != nil) != tt.wantErr {
t.Logf("DeviceHandler.doStateUp() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -903,7 +917,9 @@
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- if err := tt.devicehandler.doStateDown(); (err != nil) != tt.wantErr {
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ if err := tt.devicehandler.doStateDown(ctx); (err != nil) != tt.wantErr {
t.Logf("DeviceHandler.doStateDown() error = %v", err)
}
})
@@ -999,7 +1015,9 @@
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- tt.devicehandler.onuDiscIndication(tt.args.onuDiscInd, tt.args.sn)
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ tt.devicehandler.onuDiscIndication(ctx, tt.args.onuDiscInd, tt.args.sn)
})
}
}
@@ -1056,7 +1074,9 @@
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- tt.devicehandler.readIndications()
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ tt.devicehandler.readIndications(ctx)
})
}
}
diff --git a/adaptercore/olt_state_transitions.go b/adaptercore/olt_state_transitions.go
index 697ee5f..51b7ef0 100644
--- a/adaptercore/olt_state_transitions.go
+++ b/adaptercore/olt_state_transitions.go
@@ -18,6 +18,7 @@
package adaptercore
import (
+ "context"
"reflect"
"runtime"
@@ -57,7 +58,7 @@
)
// TransitionHandler function type for handling transition
-type TransitionHandler func() error
+type TransitionHandler func(ctx context.Context) error
// Transition to store state machine
type Transition struct {
@@ -144,7 +145,7 @@
// Handle moves the state machine to next state based on the trigger and invokes the before and
// after handlers if the transition is a valid transition
-func (tMap *TransitionMap) Handle(trigger Trigger) {
+func (tMap *TransitionMap) Handle(ctx context.Context, trigger Trigger) {
// Check whether the transtion is valid from current state
if !tMap.isValidTransition(trigger) {
@@ -159,7 +160,7 @@
}
for _, handler := range beforeHandlers {
log.Debugw("running-before-handler", log.Fields{"handler": funcName(handler)})
- if err := handler(); err != nil {
+ if err := handler(ctx); err != nil {
// TODO handle error
log.Error(err)
return
@@ -177,7 +178,7 @@
}
for _, handler := range afterHandlers {
log.Debugw("running-after-handler", log.Fields{"handler": funcName(handler)})
- if err := handler(); err != nil {
+ if err := handler(ctx); err != nil {
// TODO handle error
log.Error(err)
return
diff --git a/adaptercore/olt_state_transitions_test.go b/adaptercore/olt_state_transitions_test.go
index 31b7d56..6c256a1 100644
--- a/adaptercore/olt_state_transitions_test.go
+++ b/adaptercore/olt_state_transitions_test.go
@@ -17,9 +17,11 @@
package adaptercore
import (
+ "context"
"errors"
"reflect"
"testing"
+ "time"
)
/**
@@ -43,9 +45,9 @@
transition := Transition{
previousState: []DeviceState{deviceStateConnected},
currentState: deviceStateConnected,
- after: []TransitionHandler{func() error {
+ after: []TransitionHandler{func(ctx context.Context) error {
return nil
- }, func() error {
+ }, func(ctx context.Context) error {
return errors.New("transition error")
}},
}
@@ -61,9 +63,9 @@
transition := Transition{
previousState: []DeviceState{deviceStateConnected},
currentState: deviceStateConnected,
- before: []TransitionHandler{func() error {
+ before: []TransitionHandler{func(ctx context.Context) error {
return nil
- }, func() error {
+ }, func(ctx context.Context) error {
return errors.New("transition error")
}},
}
@@ -121,7 +123,9 @@
transitions: tt.fields.transitions,
currentDeviceState: tt.fields.currentDeviceState,
}
- tMap.Handle(tt.args.trigger)
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ tMap.Handle(ctx, tt.args.trigger)
})
}
}
diff --git a/adaptercore/openolt.go b/adaptercore/openolt.go
index cfa3299..15e8aae 100644
--- a/adaptercore/openolt.go
+++ b/adaptercore/openolt.go
@@ -39,7 +39,7 @@
coreProxy adapterif.CoreProxy
adapterProxy adapterif.AdapterProxy
eventProxy adapterif.EventProxy
- kafkaICProxy *kafka.InterContainerProxy
+ kafkaICProxy kafka.InterContainerProxy
config *config.AdapterFlags
numOnus int
KVStoreHost string
@@ -53,7 +53,7 @@
}
//NewOpenOLT returns a new instance of OpenOLT
-func NewOpenOLT(ctx context.Context, kafkaICProxy *kafka.InterContainerProxy,
+func NewOpenOLT(ctx context.Context, kafkaICProxy kafka.InterContainerProxy,
coreProxy adapterif.CoreProxy, adapterProxy adapterif.AdapterProxy,
eventProxy adapterif.EventProxy, cfg *config.AdapterFlags) *OpenOLT {
var openOLT OpenOLT
@@ -128,7 +128,8 @@
//createDeviceTopic returns
func (oo *OpenOLT) createDeviceTopic(device *voltha.Device) error {
log.Infow("create-device-topic", log.Fields{"deviceId": device.Id})
- deviceTopic := kafka.Topic{Name: oo.kafkaICProxy.DefaultTopic.Name + "_" + device.Id}
+ defaultTopic := oo.kafkaICProxy.GetDefaultTopic()
+ deviceTopic := kafka.Topic{Name: defaultTopic.Name + "_" + device.Id}
// TODO for the offset
if err := oo.kafkaICProxy.SubscribeWithDefaultRequestHandler(deviceTopic, 0); err != nil {
log.Infow("create-device-topic-failed", log.Fields{"deviceId": device.Id, "error": err})
@@ -139,6 +140,7 @@
// Adopt_device creates a new device handler if not present already and then adopts the device
func (oo *OpenOLT) Adopt_device(device *voltha.Device) error {
+ ctx := context.Background()
if device == nil {
log.Warn("device-is-nil")
return errors.New("nil-device")
@@ -148,7 +150,7 @@
if handler = oo.getDeviceHandler(device.Id); handler == nil {
handler := NewDeviceHandler(oo.coreProxy, oo.adapterProxy, oo.eventProxy, device, oo)
oo.addDeviceHandlerToMap(handler)
- go handler.AdoptDevice(device)
+ go handler.AdoptDevice(ctx, device)
// Launch the creation of the device topic
// go oo.createDeviceTopic(device)
}
@@ -206,6 +208,7 @@
//Reconcile_device unimplemented
func (oo *OpenOLT) Reconcile_device(device *voltha.Device) error {
+ ctx := context.Background()
if device == nil {
log.Warn("device-is-nil")
return errors.New("nil-device")
@@ -216,7 +219,7 @@
handler := NewDeviceHandler(oo.coreProxy, oo.adapterProxy, oo.eventProxy, device, oo)
oo.addDeviceHandlerToMap(handler)
handler.transitionMap = NewTransitionMap(handler)
- handler.transitionMap.Handle(DeviceInit)
+ handler.transitionMap.Handle(ctx, DeviceInit)
}
return nil
}
@@ -265,8 +268,9 @@
//Delete_device unimplemented
func (oo *OpenOLT) Delete_device(device *voltha.Device) error {
log.Infow("delete-device", log.Fields{"deviceId": device.Id})
+ ctx := context.Background()
if handler := oo.getDeviceHandler(device.Id); handler != nil {
- if err := handler.DeleteDevice(device); err != nil {
+ if err := handler.DeleteDevice(ctx, device); err != nil {
log.Errorw("failed-to-handle-delete-device", log.Fields{"device-id": device.Id})
}
oo.deleteDeviceHandlerToMap(handler)
@@ -289,8 +293,9 @@
//Update_flows_incrementally updates (add/remove) the flows on a given device
func (oo *OpenOLT) Update_flows_incrementally(device *voltha.Device, flows *openflow_13.FlowChanges, groups *openflow_13.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error {
log.Debugw("Update_flows_incrementally", log.Fields{"deviceId": device.Id, "flows": flows, "flowMetadata": flowMetadata})
+ ctx := context.Background()
if handler := oo.getDeviceHandler(device.Id); handler != nil {
- return handler.UpdateFlowsIncrementally(device, flows, groups, flowMetadata)
+ return handler.UpdateFlowsIncrementally(ctx, device, flows, groups, flowMetadata)
}
log.Errorw("Update_flows_incrementally failed-device-handler-not-set", log.Fields{"deviceId": device.Id})
return errors.New("device-handler-not-set")
@@ -304,8 +309,9 @@
//Receive_packet_out sends packet out to the device
func (oo *OpenOLT) Receive_packet_out(deviceID string, egressPortNo int, packet *openflow_13.OfpPacketOut) error {
log.Debugw("Receive_packet_out", log.Fields{"deviceId": deviceID, "egress_port_no": egressPortNo, "pkt": packet})
+ ctx := context.Background()
if handler := oo.getDeviceHandler(deviceID); handler != nil {
- return handler.PacketOut(egressPortNo, packet)
+ return handler.PacketOut(ctx, egressPortNo, packet)
}
log.Errorw("Receive_packet_out failed-device-handler-not-set", log.Fields{"deviceId": deviceID, "egressport": egressPortNo, "packet": packet})
return errors.New("device-handler-not-set")
diff --git a/adaptercore/openolt_flowmgr.go b/adaptercore/openolt_flowmgr.go
index e882e18..892663f 100644
--- a/adaptercore/openolt_flowmgr.go
+++ b/adaptercore/openolt_flowmgr.go
@@ -221,7 +221,7 @@
}
//NewFlowManager creates OpenOltFlowMgr object and initializes the parameters
-func NewFlowManager(dh *DeviceHandler, rMgr *rsrcMgr.OpenOltResourceMgr) *OpenOltFlowMgr {
+func NewFlowManager(ctx context.Context, dh *DeviceHandler, rMgr *rsrcMgr.OpenOltResourceMgr) *OpenOltFlowMgr {
log.Info("Initializing flow manager")
var flowMgr OpenOltFlowMgr
var err error
@@ -241,18 +241,18 @@
ponPorts := rMgr.DevInfo.GetPonPorts()
//Load the onugem info cache from kv store on flowmanager start
for idx = 0; idx < ponPorts; idx++ {
- if flowMgr.onuGemInfo[idx], err = rMgr.GetOnuGemInfo(idx); err != nil {
+ if flowMgr.onuGemInfo[idx], err = rMgr.GetOnuGemInfo(ctx, idx); err != nil {
log.Error("Failed to load onu gem info cache")
}
//Load flowID list per gem map per interface from the kvstore.
- flowMgr.loadFlowIDlistForGem(idx)
+ flowMgr.loadFlowIDlistForGem(ctx, idx)
}
flowMgr.lockCache = sync.RWMutex{}
flowMgr.pendingFlowDelete = sync.Map{}
flowMgr.perUserFlowHandleLock = mapmutex.NewMapMutex()
flowMgr.interfaceToMcastQueueMap = make(map[uint32]*queueInfoBrief)
//load interface to multicast queue map from kv store
- flowMgr.loadInterfaceToMulticastQueueMap()
+ flowMgr.loadInterfaceToMulticastQueueMap(ctx)
log.Info("Initialization of flow manager success!!")
return &flowMgr
}
@@ -273,7 +273,7 @@
}
}
-func (f *OpenOltFlowMgr) registerFlow(flowFromCore *ofp.OfpFlowStats, deviceFlow *openoltpb2.Flow) {
+func (f *OpenOltFlowMgr) registerFlow(ctx context.Context, flowFromCore *ofp.OfpFlowStats, deviceFlow *openoltpb2.Flow) {
log.Debug("Registering Flow for Device ", log.Fields{"flow": flowFromCore},
log.Fields{"device": f.deviceHandler.deviceID})
gemPK := gemPortKey{uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId)}
@@ -284,10 +284,10 @@
flowIDList = appendUnique(flowIDList, deviceFlow.FlowId)
f.flowsUsedByGemPort[gemPK] = flowIDList
// update the flowids for a gem to the KVstore
- f.resourceMgr.UpdateFlowIDsForGem(uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId), flowIDList)
+ f.resourceMgr.UpdateFlowIDsForGem(ctx, uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId), flowIDList)
}
-func (f *OpenOltFlowMgr) divideAndAddFlow(intfID uint32, onuID uint32, uniID uint32, portNo uint32,
+func (f *OpenOltFlowMgr) divideAndAddFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32,
classifierInfo map[string]interface{}, actionInfo map[string]interface{}, flow *ofp.OfpFlowStats, TpID uint32,
UsMeterID uint32, DsMeterID uint32, flowMetadata *voltha.FlowMetadata) {
var allocID uint32
@@ -309,7 +309,7 @@
tpLockMapKey := tpLockKey{intfID, onuID, uniID}
if f.perUserFlowHandleLock.TryLock(tpLockMapKey) {
- allocID, gemPorts, TpInst = f.createTcontGemports(intfID, onuID, uniID, uni, portNo, TpID, UsMeterID, DsMeterID, flowMetadata)
+ allocID, gemPorts, TpInst = f.createTcontGemports(ctx, intfID, onuID, uniID, uni, portNo, TpID, UsMeterID, DsMeterID, flowMetadata)
if allocID == 0 || gemPorts == nil || TpInst == nil {
log.Error("alloc-id-gem-ports-tp-unavailable")
f.perUserFlowHandleLock.Unlock(tpLockMapKey)
@@ -325,7 +325,7 @@
/* Flows can be added specific to gemport if p-bits are received.
* If no pbit mentioned then adding flows for all gemports
*/
- f.checkAndAddFlow(args, classifierInfo, actionInfo, flow, TpInst, gemPorts, TpID, uni)
+ f.checkAndAddFlow(ctx, args, classifierInfo, actionInfo, flow, TpInst, gemPorts, TpID, uni)
f.perUserFlowHandleLock.Unlock(tpLockMapKey)
} else {
log.Errorw("failed to acquire per user flow handle lock",
@@ -335,7 +335,7 @@
}
// CreateSchedulerQueues creates traffic schedulers on the device with the given scheduler configuration and traffic shaping info
-func (f *OpenOltFlowMgr) CreateSchedulerQueues(sq schedQueue) error {
+func (f *OpenOltFlowMgr) CreateSchedulerQueues(ctx context.Context, sq schedQueue) error {
log.Debugw("CreateSchedulerQueues", log.Fields{"Dir": sq.direction, "IntfID": sq.intfID,
"OnuID": sq.onuID, "UniID": sq.uniID, "TpID": sq.tpID, "MeterID": sq.meterID,
@@ -352,7 +352,7 @@
*/
var SchedCfg *tp_pb.SchedulerConfig
- KvStoreMeter, err := f.resourceMgr.GetMeterIDForOnu(Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
+ KvStoreMeter, err := f.resourceMgr.GetMeterIDForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
if err != nil {
log.Error("Failed to get meter for intf %d, onuid %d, uniid %d", sq.intfID, sq.onuID, sq.uniID)
return err
@@ -409,7 +409,7 @@
TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile[sq.intfID].GetTrafficScheduler(sq.tpInst, SchedCfg, TrafficShaping)}
- if err := f.pushSchedulerQueuesToDevice(sq, TrafficShaping, TrafficSched); err != nil {
+ if err := f.pushSchedulerQueuesToDevice(ctx, sq, TrafficShaping, TrafficSched); err != nil {
log.Errorw("Failed to push traffic scheduler and queues to device", log.Fields{"intfID": sq.intfID, "direction": sq.direction})
return err
}
@@ -417,7 +417,7 @@
/* After we successfully applied the scheduler configuration on the OLT device,
* store the meter id on the KV store, for further reference.
*/
- if err := f.resourceMgr.UpdateMeterIDForOnu(Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID, meterConfig); err != nil {
+ if err := f.resourceMgr.UpdateMeterIDForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID, meterConfig); err != nil {
log.Error("Failed to update meter id for onu %d, meterid %d", sq.onuID, sq.meterID)
return err
}
@@ -426,7 +426,7 @@
return nil
}
-func (f *OpenOltFlowMgr) pushSchedulerQueuesToDevice(sq schedQueue, TrafficShaping *tp_pb.TrafficShapingInfo, TrafficSched []*tp_pb.TrafficScheduler) error {
+func (f *OpenOltFlowMgr) pushSchedulerQueuesToDevice(ctx context.Context, sq schedQueue, TrafficShaping *tp_pb.TrafficShapingInfo, TrafficSched []*tp_pb.TrafficScheduler) error {
trafficQueues, err := f.techprofile[sq.intfID].GetTrafficQueues(sq.tpInst, sq.direction)
@@ -436,7 +436,7 @@
}
log.Debugw("Sending Traffic scheduler create to device", log.Fields{"Direction": sq.direction, "TrafficScheds": TrafficSched})
- if _, err := f.deviceHandler.Client.CreateTrafficSchedulers(context.Background(), &tp_pb.TrafficSchedulers{
+ if _, err := f.deviceHandler.Client.CreateTrafficSchedulers(ctx, &tp_pb.TrafficSchedulers{
IntfId: sq.intfID, OnuId: sq.onuID,
UniId: sq.uniID, PortNo: sq.uniPort,
TrafficScheds: TrafficSched}); err != nil {
@@ -447,7 +447,7 @@
// On receiving the CreateTrafficQueues request, the driver should create corresponding
// downstream queues.
log.Debugw("Sending Traffic Queues create to device", log.Fields{"Direction": sq.direction, "TrafficQueues": trafficQueues})
- if _, err := f.deviceHandler.Client.CreateTrafficQueues(context.Background(),
+ if _, err := f.deviceHandler.Client.CreateTrafficQueues(ctx,
&tp_pb.TrafficQueues{IntfId: sq.intfID, OnuId: sq.onuID,
UniId: sq.uniID, PortNo: sq.uniPort,
TrafficQueues: trafficQueues}); err != nil {
@@ -468,7 +468,7 @@
servicePriority: multicastQueuePerPonPort.Priority,
}
//also store the queue info in kv store
- f.resourceMgr.AddMcastQueueForIntf(sq.intfID,
+ f.resourceMgr.AddMcastQueueForIntf(ctx, sq.intfID,
multicastQueuePerPonPort.GemportId,
multicastQueuePerPonPort.Priority)
}
@@ -478,7 +478,7 @@
}
// RemoveSchedulerQueues removes the traffic schedulers from the device based on the given scheduler configuration and traffic shaping info
-func (f *OpenOltFlowMgr) RemoveSchedulerQueues(sq schedQueue) error {
+func (f *OpenOltFlowMgr) RemoveSchedulerQueues(ctx context.Context, sq schedQueue) error {
var Direction string
var SchedCfg *tp_pb.SchedulerConfig
@@ -498,7 +498,7 @@
return err
}
- KVStoreMeter, err := f.resourceMgr.GetMeterIDForOnu(Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
+ KVStoreMeter, err := f.resourceMgr.GetMeterIDForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
if err != nil {
log.Errorf("Failed to get Meter for Onu %d", sq.onuID)
return err
@@ -524,7 +524,7 @@
return err
}
- if _, err = f.deviceHandler.Client.RemoveTrafficQueues(context.Background(),
+ if _, err = f.deviceHandler.Client.RemoveTrafficQueues(ctx,
&tp_pb.TrafficQueues{IntfId: sq.intfID, OnuId: sq.onuID,
UniId: sq.uniID, PortNo: sq.uniPort,
TrafficQueues: TrafficQueues}); err != nil {
@@ -532,7 +532,7 @@
return err
}
log.Debug("Removed traffic queues successfully")
- if _, err = f.deviceHandler.Client.RemoveTrafficSchedulers(context.Background(), &tp_pb.TrafficSchedulers{
+ if _, err = f.deviceHandler.Client.RemoveTrafficSchedulers(ctx, &tp_pb.TrafficSchedulers{
IntfId: sq.intfID, OnuId: sq.onuID,
UniId: sq.uniID, PortNo: sq.uniPort,
TrafficScheds: TrafficSched}); err != nil {
@@ -545,7 +545,7 @@
/* After we successfully remove the scheduler configuration on the OLT device,
* delete the meter id on the KV store.
*/
- err = f.resourceMgr.RemoveMeterIDForOnu(Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
+ err = f.resourceMgr.RemoveMeterIDForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
if err != nil {
log.Errorf("Failed to remove meter for onu %d, meter id %d", sq.onuID, KVStoreMeter.MeterId)
return err
@@ -555,31 +555,31 @@
}
// This function allocates tconts and GEM ports for an ONU
-func (f *OpenOltFlowMgr) createTcontGemports(intfID uint32, onuID uint32, uniID uint32, uni string, uniPort uint32, TpID uint32, UsMeterID uint32, DsMeterID uint32, flowMetadata *voltha.FlowMetadata) (uint32, []uint32, *tp.TechProfile) {
+func (f *OpenOltFlowMgr) createTcontGemports(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, uni string, uniPort uint32, TpID uint32, UsMeterID uint32, DsMeterID uint32, flowMetadata *voltha.FlowMetadata) (uint32, []uint32, *tp.TechProfile) {
var allocIDs []uint32
var allgemPortIDs []uint32
var gemPortIDs []uint32
tpInstanceExists := false
var err error
- allocIDs = f.resourceMgr.GetCurrentAllocIDsForOnu(intfID, onuID, uniID)
- allgemPortIDs = f.resourceMgr.GetCurrentGEMPortIDsForOnu(intfID, onuID, uniID)
+ allocIDs = f.resourceMgr.GetCurrentAllocIDsForOnu(ctx, intfID, onuID, uniID)
+ allgemPortIDs = f.resourceMgr.GetCurrentGEMPortIDsForOnu(ctx, intfID, onuID, uniID)
tpPath := f.getTPpath(intfID, uni, TpID)
log.Infow("creating-new-tcont-and-gem", log.Fields{"pon": intfID, "onu": onuID, "uni": uniID})
// Check tech profile instance already exists for derived port name
- techProfileInstance, _ := f.techprofile[intfID].GetTPInstanceFromKVStore(TpID, tpPath)
+ techProfileInstance, _ := f.techprofile[intfID].GetTPInstanceFromKVStore(ctx, TpID, tpPath)
if techProfileInstance == nil {
log.Infow("tp-instance-not-found--creating-new", log.Fields{"path": tpPath})
- techProfileInstance, err = f.techprofile[intfID].CreateTechProfInstance(TpID, uni, intfID)
+ techProfileInstance, err = f.techprofile[intfID].CreateTechProfInstance(ctx, TpID, uni, intfID)
if err != nil {
// This should not happen, something wrong in KV backend transaction
log.Errorw("tp-instance-create-failed", log.Fields{"error": err, "tpID": TpID})
return 0, nil, nil
}
- f.resourceMgr.UpdateTechProfileIDForOnu(intfID, onuID, uniID, TpID)
+ f.resourceMgr.UpdateTechProfileIDForOnu(ctx, intfID, onuID, uniID, TpID)
} else {
log.Debugw("Tech-profile-instance-already-exist-for-given port-name", log.Fields{"uni": uni})
tpInstanceExists = true
@@ -587,7 +587,7 @@
if UsMeterID != 0 {
sq := schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: intfID, onuID: onuID, uniID: uniID, tpID: TpID,
uniPort: uniPort, tpInst: techProfileInstance, meterID: UsMeterID, flowMetadata: flowMetadata}
- if err := f.CreateSchedulerQueues(sq); err != nil {
+ if err := f.CreateSchedulerQueues(ctx, sq); err != nil {
log.Errorw("CreateSchedulerQueues Failed-upstream", log.Fields{"error": err, "meterID": UsMeterID})
return 0, nil, nil
}
@@ -595,7 +595,7 @@
if DsMeterID != 0 {
sq := schedQueue{direction: tp_pb.Direction_DOWNSTREAM, intfID: intfID, onuID: onuID, uniID: uniID, tpID: TpID,
uniPort: uniPort, tpInst: techProfileInstance, meterID: DsMeterID, flowMetadata: flowMetadata}
- if err := f.CreateSchedulerQueues(sq); err != nil {
+ if err := f.CreateSchedulerQueues(ctx, sq); err != nil {
log.Errorw("CreateSchedulerQueues Failed-downstream", log.Fields{"error": err, "meterID": DsMeterID})
return 0, nil, nil
}
@@ -617,27 +617,27 @@
log.Debugw("Allocated Tcont and GEM ports", log.Fields{"allocIDs": allocIDs, "gemports": allgemPortIDs})
// Send Tconts and GEM ports to KV store
- f.storeTcontsGEMPortsIntoKVStore(intfID, onuID, uniID, allocIDs, allgemPortIDs)
+ f.storeTcontsGEMPortsIntoKVStore(ctx, intfID, onuID, uniID, allocIDs, allgemPortIDs)
return allocID, gemPortIDs, techProfileInstance
}
-func (f *OpenOltFlowMgr) storeTcontsGEMPortsIntoKVStore(intfID uint32, onuID uint32, uniID uint32, allocID []uint32, gemPortIDs []uint32) {
+func (f *OpenOltFlowMgr) storeTcontsGEMPortsIntoKVStore(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, allocID []uint32, gemPortIDs []uint32) {
log.Debugw("Storing allocated Tconts and GEM ports into KV store",
log.Fields{"intfId": intfID, "onuId": onuID, "uniId": uniID, "allocID": allocID, "gemPortIDs": gemPortIDs})
/* Update the allocated alloc_id and gem_port_id for the ONU/UNI to KV store */
- if err := f.resourceMgr.UpdateAllocIdsForOnu(intfID, onuID, uniID, allocID); err != nil {
+ if err := f.resourceMgr.UpdateAllocIdsForOnu(ctx, intfID, onuID, uniID, allocID); err != nil {
log.Error("Errow while uploading allocID to KV store")
}
- if err := f.resourceMgr.UpdateGEMPortIDsForOnu(intfID, onuID, uniID, gemPortIDs); err != nil {
+ if err := f.resourceMgr.UpdateGEMPortIDsForOnu(ctx, intfID, onuID, uniID, gemPortIDs); err != nil {
log.Error("Errow while uploading GEMports to KV store")
}
- if err := f.resourceMgr.UpdateGEMportsPonportToOnuMapOnKVStore(gemPortIDs, intfID, onuID, uniID); err != nil {
+ if err := f.resourceMgr.UpdateGEMportsPonportToOnuMapOnKVStore(ctx, gemPortIDs, intfID, onuID, uniID); err != nil {
log.Error("Errow while uploading gemtopon map to KV store")
}
log.Debug("Stored tconts and GEM into KV store successfully")
for _, gemPort := range gemPortIDs {
- f.addGemPortToOnuInfoMap(intfID, onuID, gemPort)
+ f.addGemPortToOnuInfoMap(ctx, intfID, onuID, gemPort)
}
}
@@ -661,18 +661,18 @@
return nil
}
-func (f *OpenOltFlowMgr) addUpstreamDataFlow(intfID uint32, onuID uint32, uniID uint32,
+func (f *OpenOltFlowMgr) addUpstreamDataFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32,
portNo uint32, uplinkClassifier map[string]interface{},
uplinkAction map[string]interface{}, logicalFlow *ofp.OfpFlowStats,
allocID uint32, gemportID uint32) {
uplinkClassifier[PacketTagType] = SingleTag
log.Debugw("Adding upstream data flow", log.Fields{"uplinkClassifier": uplinkClassifier, "uplinkAction": uplinkAction})
- f.addHSIAFlow(intfID, onuID, uniID, portNo, uplinkClassifier, uplinkAction,
+ f.addHSIAFlow(ctx, intfID, onuID, uniID, portNo, uplinkClassifier, uplinkAction,
Upstream, logicalFlow, allocID, gemportID)
/* TODO: Install Secondary EAP on the subscriber vlan */
}
-func (f *OpenOltFlowMgr) addDownstreamDataFlow(intfID uint32, onuID uint32, uniID uint32,
+func (f *OpenOltFlowMgr) addDownstreamDataFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32,
portNo uint32, downlinkClassifier map[string]interface{},
downlinkAction map[string]interface{}, logicalFlow *ofp.OfpFlowStats,
allocID uint32, gemportID uint32) {
@@ -702,11 +702,11 @@
return
}
- f.addHSIAFlow(intfID, onuID, uniID, portNo, downlinkClassifier, downlinkAction,
+ f.addHSIAFlow(ctx, intfID, onuID, uniID, portNo, downlinkClassifier, downlinkAction,
Downstream, logicalFlow, allocID, gemportID)
}
-func (f *OpenOltFlowMgr) addHSIAFlow(intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{},
+func (f *OpenOltFlowMgr) addHSIAFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{},
action map[string]interface{}, direction string, logicalFlow *ofp.OfpFlowStats,
allocID uint32, gemPortID uint32) {
var networkIntfID uint32
@@ -726,11 +726,11 @@
log.Debugw("Found pbit in the flow", log.Fields{"VlanPbit": vlanPbit})
}
flowStoreCookie := getFlowStoreCookie(classifier, gemPortID)
- if present := f.resourceMgr.IsFlowCookieOnKVStore(uint32(intfID), int32(onuID), int32(uniID), flowStoreCookie); present {
+ if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(intfID), int32(onuID), int32(uniID), flowStoreCookie); present {
log.Debug("Flow-exists--not-re-adding")
return
}
- flowID, err := f.resourceMgr.GetFlowID(intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, HsiaFlow, vlanPbit)
+ flowID, err := f.resourceMgr.GetFlowID(ctx, intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, HsiaFlow, vlanPbit)
if err != nil {
log.Errorw("Flow id unavailable for HSIA flow", log.Fields{"direction": direction})
return
@@ -765,10 +765,10 @@
Priority: int32(logicalFlow.Priority),
Cookie: logicalFlow.Cookie,
PortNo: portNo}
- if ok := f.addFlowToDevice(logicalFlow, &flow); ok {
+ if ok := f.addFlowToDevice(ctx, logicalFlow, &flow); ok {
log.Debug("HSIA flow added to device successfully", log.Fields{"direction": direction})
- flowsToKVStore := f.getUpdatedFlowInfo(&flow, flowStoreCookie, HsiaFlow, flowID, logicalFlow.Id)
- if err := f.updateFlowInfoToKVStore(flow.AccessIntfId,
+ flowsToKVStore := f.getUpdatedFlowInfo(ctx, &flow, flowStoreCookie, HsiaFlow, flowID, logicalFlow.Id)
+ if err := f.updateFlowInfoToKVStore(ctx, flow.AccessIntfId,
flow.OnuId,
flow.UniId,
flow.FlowId /*flowCategory,*/, flowsToKVStore); err != nil {
@@ -778,7 +778,7 @@
}
}
-func (f *OpenOltFlowMgr) addDHCPTrapFlow(intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{}, action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32) {
+func (f *OpenOltFlowMgr) addDHCPTrapFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{}, action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32) {
var dhcpFlow openoltpb2.Flow
var actionProto *openoltpb2.Action
@@ -802,12 +802,12 @@
delete(classifier, VlanVid)
flowStoreCookie := getFlowStoreCookie(classifier, gemPortID)
- if present := f.resourceMgr.IsFlowCookieOnKVStore(uint32(intfID), int32(onuID), int32(uniID), flowStoreCookie); present {
+ if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(intfID), int32(onuID), int32(uniID), flowStoreCookie); present {
log.Debug("Flow-exists--not-re-adding")
return
}
- flowID, err = f.resourceMgr.GetFlowID(intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, DhcpFlow, 0 /*classifier[VLAN_PCP].(uint32)*/)
+ flowID, err = f.resourceMgr.GetFlowID(ctx, intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, DhcpFlow, 0 /*classifier[VLAN_PCP].(uint32)*/)
if err != nil {
log.Errorw("flowId unavailable for UL DHCP", log.Fields{"intfId": intfID, "onuId": onuID, "flowStoreCookie": flowStoreCookie})
@@ -840,10 +840,10 @@
Cookie: logicalFlow.Cookie,
PortNo: portNo}
- if ok := f.addFlowToDevice(logicalFlow, &dhcpFlow); ok {
+ if ok := f.addFlowToDevice(ctx, logicalFlow, &dhcpFlow); ok {
log.Debug("DHCP UL flow added to device successfully")
- flowsToKVStore := f.getUpdatedFlowInfo(&dhcpFlow, flowStoreCookie, "DHCP", flowID, logicalFlow.Id)
- if err := f.updateFlowInfoToKVStore(dhcpFlow.AccessIntfId,
+ flowsToKVStore := f.getUpdatedFlowInfo(ctx, &dhcpFlow, flowStoreCookie, "DHCP", flowID, logicalFlow.Id)
+ if err := f.updateFlowInfoToKVStore(ctx, dhcpFlow.AccessIntfId,
dhcpFlow.OnuId,
dhcpFlow.UniId,
dhcpFlow.FlowId, flowsToKVStore); err != nil {
@@ -856,13 +856,13 @@
}
//addIGMPTrapFlow creates IGMP trap-to-host flow
-func (f *OpenOltFlowMgr) addIGMPTrapFlow(intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{},
+func (f *OpenOltFlowMgr) addIGMPTrapFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{},
action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32) {
- f.addUpstreamTrapFlow(intfID, onuID, uniID, portNo, classifier, action, logicalFlow, allocID, gemPortID, IgmpFlow)
+ f.addUpstreamTrapFlow(ctx, intfID, onuID, uniID, portNo, classifier, action, logicalFlow, allocID, gemPortID, IgmpFlow)
}
//addUpstreamTrapFlow creates a trap-to-host flow
-func (f *OpenOltFlowMgr) addUpstreamTrapFlow(intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{},
+func (f *OpenOltFlowMgr) addUpstreamTrapFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{},
action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32, flowType string) {
var flow openoltpb2.Flow
@@ -885,12 +885,12 @@
delete(classifier, VlanVid)
flowStoreCookie := getFlowStoreCookie(classifier, gemPortID)
- if present := f.resourceMgr.IsFlowCookieOnKVStore(uint32(networkIntfID), int32(onuID), int32(uniID), flowStoreCookie); present {
+ if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(networkIntfID), int32(onuID), int32(uniID), flowStoreCookie); present {
log.Debug("Flow-exists--not-re-adding")
return
}
- flowID, err := f.resourceMgr.GetFlowID(intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, flowType, 0, 0 /*classifier[VLAN_PCP].(uint32)*/)
+ flowID, err := f.resourceMgr.GetFlowID(ctx, intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, flowType, 0, 0 /*classifier[VLAN_PCP].(uint32)*/)
if err != nil {
log.Errorw("flowId unavailable for upstream trap flow", log.Fields{"intfId": intfID, "onuId": onuID, "flowStoreCookie": flowStoreCookie, "flowType": flowType})
@@ -923,11 +923,11 @@
Cookie: logicalFlow.Cookie,
PortNo: portNo}
- if ok := f.addFlowToDevice(logicalFlow, &flow); ok {
+ if ok := f.addFlowToDevice(ctx, logicalFlow, &flow); ok {
log.Debugf("%s UL flow added to device successfully", flowType)
- flowsToKVStore := f.getUpdatedFlowInfo(&flow, flowStoreCookie, flowType, flowID, logicalFlow.Id)
- if err := f.updateFlowInfoToKVStore(flow.AccessIntfId,
+ flowsToKVStore := f.getUpdatedFlowInfo(ctx, &flow, flowStoreCookie, flowType, flowID, logicalFlow.Id)
+ if err := f.updateFlowInfoToKVStore(ctx, flow.AccessIntfId,
flow.OnuId,
flow.UniId,
flow.FlowId, flowsToKVStore); err != nil {
@@ -940,7 +940,7 @@
}
// Add EAPOL flow to device with mac, vlanId as classifier for upstream and downstream
-func (f *OpenOltFlowMgr) addEAPOLFlow(intfID uint32, onuID uint32, uniID uint32, portNo uint32, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32, vlanID uint32, classifier map[string]interface{}, action map[string]interface{}) {
+func (f *OpenOltFlowMgr) addEAPOLFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32, vlanID uint32, classifier map[string]interface{}, action map[string]interface{}) {
log.Debugw("Adding EAPOL to device", log.Fields{"intfId": intfID, "onuId": onuID, "portNo": portNo, "allocId": allocID, "gemPortId": gemPortID, "vlanId": vlanID, "flow": logicalFlow})
uplinkClassifier := make(map[string]interface{})
@@ -956,12 +956,12 @@
// Fill action
uplinkAction[TrapToHost] = true
flowStoreCookie := getFlowStoreCookie(uplinkClassifier, gemPortID)
- if present := f.resourceMgr.IsFlowCookieOnKVStore(uint32(intfID), int32(onuID), int32(uniID), flowStoreCookie); present {
+ if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(intfID), int32(onuID), int32(uniID), flowStoreCookie); present {
log.Debug("Flow-exists--not-re-adding")
return
}
//Add Uplink EAPOL Flow
- uplinkFlowID, err := f.resourceMgr.GetFlowID(intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, "", 0)
+ uplinkFlowID, err := f.resourceMgr.GetFlowID(ctx, intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, "", 0)
if err != nil {
log.Errorw("flowId unavailable for UL EAPOL", log.Fields{"intfId": intfID, "onuId": onuID, "flowStoreCookie": flowStoreCookie})
return
@@ -999,11 +999,11 @@
Priority: int32(logicalFlow.Priority),
Cookie: logicalFlow.Cookie,
PortNo: portNo}
- if ok := f.addFlowToDevice(logicalFlow, &upstreamFlow); ok {
+ if ok := f.addFlowToDevice(ctx, logicalFlow, &upstreamFlow); ok {
log.Debug("EAPOL UL flow added to device successfully")
flowCategory := "EAPOL"
- flowsToKVStore := f.getUpdatedFlowInfo(&upstreamFlow, flowStoreCookie, flowCategory, uplinkFlowID, logicalFlow.Id)
- if err := f.updateFlowInfoToKVStore(upstreamFlow.AccessIntfId,
+ flowsToKVStore := f.getUpdatedFlowInfo(ctx, &upstreamFlow, flowStoreCookie, flowCategory, uplinkFlowID, logicalFlow.Id)
+ if err := f.updateFlowInfoToKVStore(ctx, upstreamFlow.AccessIntfId,
upstreamFlow.OnuId,
upstreamFlow.UniId,
upstreamFlow.FlowId,
@@ -1085,11 +1085,11 @@
}
// DeleteTechProfileInstances removes the tech profile instances from persistent storage
-func (f *OpenOltFlowMgr) DeleteTechProfileInstances(intfID uint32, onuID uint32, uniID uint32, sn string) error {
- tpIDList := f.resourceMgr.GetTechProfileIDForOnu(intfID, onuID, uniID)
+func (f *OpenOltFlowMgr) DeleteTechProfileInstances(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, sn string) error {
+ tpIDList := f.resourceMgr.GetTechProfileIDForOnu(ctx, intfID, onuID, uniID)
uniPortName := fmt.Sprintf("pon-{%d}/onu-{%d}/uni-{%d}", intfID, onuID, uniID)
for _, tpID := range tpIDList {
- if err := f.DeleteTechProfileInstance(intfID, onuID, uniID, uniPortName, tpID); err != nil {
+ if err := f.DeleteTechProfileInstance(ctx, intfID, onuID, uniID, uniPortName, tpID); err != nil {
log.Debugw("Failed-to-delete-tp-instance-from-kv-store", log.Fields{"tp-id": tpID, "uni-port-name": uniPortName})
// return err
// We should continue to delete tech-profile instances for other TP IDs
@@ -1099,11 +1099,11 @@
}
// DeleteTechProfileInstance removes the tech profile instance from persistent storage
-func (f *OpenOltFlowMgr) DeleteTechProfileInstance(intfID uint32, onuID uint32, uniID uint32, uniPortName string, tpID uint32) error {
+func (f *OpenOltFlowMgr) DeleteTechProfileInstance(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, uniPortName string, tpID uint32) error {
if uniPortName == "" {
uniPortName = fmt.Sprintf("pon-{%d}/onu-{%d}/uni-{%d}", intfID, onuID, uniID)
}
- if err := f.techprofile[intfID].DeleteTechProfileInstance(tpID, uniPortName); err != nil {
+ if err := f.techprofile[intfID].DeleteTechProfileInstance(ctx, tpID, uniPortName); err != nil {
log.Debugw("Failed-to-delete-tp-instance-from-kv-store", log.Fields{"tp-id": tpID, "uni-port-name": uniPortName})
return err
}
@@ -1137,7 +1137,7 @@
return generatedHash
}
-func (f *OpenOltFlowMgr) getUpdatedFlowInfo(flow *openoltpb2.Flow, flowStoreCookie uint64, flowCategory string, deviceFlowID uint32, logicalFlowID uint64) *[]rsrcMgr.FlowInfo {
+func (f *OpenOltFlowMgr) getUpdatedFlowInfo(ctx context.Context, flow *openoltpb2.Flow, flowStoreCookie uint64, flowCategory string, deviceFlowID uint32, logicalFlowID uint64) *[]rsrcMgr.FlowInfo {
var flows = []rsrcMgr.FlowInfo{{Flow: flow, FlowCategory: flowCategory, FlowStoreCookie: flowStoreCookie, LogicalFlowID: logicalFlowID}}
var intfID uint32
/* For flows which trap out of the NNI, the AccessIntfId is invalid
@@ -1149,7 +1149,7 @@
intfID = uint32(flow.NetworkIntfId)
}
// Get existing flows matching flowid for given subscriber from KV store
- existingFlows := f.resourceMgr.GetFlowIDInfo(intfID, flow.OnuId, flow.UniId, flow.FlowId)
+ existingFlows := f.resourceMgr.GetFlowIDInfo(ctx, intfID, flow.OnuId, flow.UniId, flow.FlowId)
if existingFlows != nil {
log.Debugw("Flow exists for given flowID, appending it to current flow", log.Fields{"flowID": flow.FlowId})
//for _, f := range *existingFlows {
@@ -1184,9 +1184,9 @@
// return &flows
//}
-func (f *OpenOltFlowMgr) updateFlowInfoToKVStore(intfID int32, onuID int32, uniID int32, flowID uint32, flows *[]rsrcMgr.FlowInfo) error {
+func (f *OpenOltFlowMgr) updateFlowInfoToKVStore(ctx context.Context, intfID int32, onuID int32, uniID int32, flowID uint32, flows *[]rsrcMgr.FlowInfo) error {
log.Debugw("Storing flow(s) into KV store", log.Fields{"flows": *flows})
- if err := f.resourceMgr.UpdateFlowIDInfo(intfID, onuID, uniID, flowID, flows); err != nil {
+ if err := f.resourceMgr.UpdateFlowIDInfo(ctx, intfID, onuID, uniID, flowID, flows); err != nil {
log.Debug("Error while Storing flow into KV store")
return err
}
@@ -1194,7 +1194,7 @@
return nil
}
-func (f *OpenOltFlowMgr) addFlowToDevice(logicalFlow *ofp.OfpFlowStats, deviceFlow *openoltpb2.Flow) bool {
+func (f *OpenOltFlowMgr) addFlowToDevice(ctx context.Context, logicalFlow *ofp.OfpFlowStats, deviceFlow *openoltpb2.Flow) bool {
var intfID uint32
/* For flows which trap out of the NNI, the AccessIntfId is invalid
@@ -1218,12 +1218,12 @@
if err != nil {
log.Errorw("Failed to Add flow to device", log.Fields{"err": err, "deviceFlow": deviceFlow})
- f.resourceMgr.FreeFlowID(intfID, deviceFlow.OnuId, deviceFlow.UniId, deviceFlow.FlowId)
+ f.resourceMgr.FreeFlowID(ctx, intfID, deviceFlow.OnuId, deviceFlow.UniId, deviceFlow.FlowId)
return false
}
if deviceFlow.GemportId != -1 {
// No need to register the flow if it is a trap on nni flow.
- f.registerFlow(logicalFlow, deviceFlow)
+ f.registerFlow(ctx, logicalFlow, deviceFlow)
}
log.Debugw("Flow added to device successfully ", log.Fields{"flow": *deviceFlow})
return true
@@ -1266,7 +1266,7 @@
*/
-func (f *OpenOltFlowMgr) addLLDPFlow(flow *ofp.OfpFlowStats, portNo uint32) {
+func (f *OpenOltFlowMgr) addLLDPFlow(ctx context.Context, flow *ofp.OfpFlowStats, portNo uint32) {
classifierInfo := make(map[string]interface{})
actionInfo := make(map[string]interface{})
@@ -1293,11 +1293,11 @@
var networkInterfaceID = IntfIDFromNniPortNum(portNo)
var flowStoreCookie = getFlowStoreCookie(classifierInfo, uint32(0))
- if present := f.resourceMgr.IsFlowCookieOnKVStore(uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
+ if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
log.Debug("Flow-exists--not-re-adding")
return
}
- flowID, err := f.resourceMgr.GetFlowID(uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0)
+ flowID, err := f.resourceMgr.GetFlowID(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0)
if err != nil {
log.Errorw("Flow id unavailable for LLDP traponNNI flow", log.Fields{"error": err})
@@ -1328,10 +1328,10 @@
Priority: int32(flow.Priority),
Cookie: flow.Cookie,
PortNo: portNo}
- if ok := f.addFlowToDevice(flow, &downstreamflow); ok {
+ if ok := f.addFlowToDevice(ctx, flow, &downstreamflow); ok {
log.Debug("LLDP trap on NNI flow added to device successfully")
- flowsToKVStore := f.getUpdatedFlowInfo(&downstreamflow, flowStoreCookie, "", flowID, flow.Id)
- if err := f.updateFlowInfoToKVStore(int32(networkInterfaceID),
+ flowsToKVStore := f.getUpdatedFlowInfo(ctx, &downstreamflow, flowStoreCookie, "", flowID, flow.Id)
+ if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
int32(onuID),
int32(uniID),
flowID, flowsToKVStore); err != nil {
@@ -1472,7 +1472,7 @@
}
//clearResources clears pon resources in kv store and the device
-func (f *OpenOltFlowMgr) clearResources(flow *ofp.OfpFlowStats, Intf uint32, onuID int32, uniID int32,
+func (f *OpenOltFlowMgr) clearResources(ctx context.Context, flow *ofp.OfpFlowStats, Intf uint32, onuID int32, uniID int32,
gemPortID int32, flowID uint32, flowDirection string,
portNum uint32, updatedFlows []rsrcMgr.FlowInfo) error {
@@ -1487,7 +1487,7 @@
// So the flow should not be freed yet.
// For ex: Case of HSIA where same flow is shared
// between DS and US.
- f.updateFlowInfoToKVStore(int32(Intf), int32(onuID), int32(uniID), flowID, &updatedFlows)
+ f.updateFlowInfoToKVStore(ctx, int32(Intf), int32(onuID), int32(uniID), flowID, &updatedFlows)
if len(updatedFlows) == 0 {
// Do this for subscriber flows only (not trap from NNI flows)
if onuID != -1 && uniID != -1 {
@@ -1507,12 +1507,12 @@
}
log.Debugw("Releasing flow Id to resource manager", log.Fields{"Intf": Intf, "onuId": onuID, "uniId": uniID, "flowId": flowID})
- f.resourceMgr.FreeFlowID(Intf, int32(onuID), int32(uniID), flowID)
+ f.resourceMgr.FreeFlowID(ctx, Intf, int32(onuID), int32(uniID), flowID)
uni := getUniPortPath(Intf, onuID, uniID)
tpPath := f.getTPpath(Intf, uni, tpID)
log.Debugw("Getting-techprofile-instance-for-subscriber", log.Fields{"TP-PATH": tpPath})
- techprofileInst, err := f.techprofile[Intf].GetTPInstanceFromKVStore(tpID, tpPath)
+ techprofileInst, err := f.techprofile[Intf].GetTPInstanceFromKVStore(ctx, tpID, tpPath)
if err != nil { // This should not happen, something wrong in KV backend transaction
log.Errorw("Error in fetching tech profile instance from KV store", log.Fields{"tpID": 20, "path": tpPath})
return err
@@ -1531,7 +1531,7 @@
// everytime flowsUsedByGemPort cache is updated the same should be updated
// in kv store by calling UpdateFlowIDsForGem
f.flowsUsedByGemPort[gemPK] = flowIDs
- f.resourceMgr.UpdateFlowIDsForGem(Intf, uint32(gemPortID), flowIDs)
+ f.resourceMgr.UpdateFlowIDsForGem(ctx, Intf, uint32(gemPortID), flowIDs)
break
}
}
@@ -1539,17 +1539,17 @@
return nil
}
log.Debugf("Gem port id %d is not used by another flow - releasing the gem port", gemPortID)
- f.resourceMgr.RemoveGemPortIDForOnu(Intf, uint32(onuID), uint32(uniID), uint32(gemPortID))
+ f.resourceMgr.RemoveGemPortIDForOnu(ctx, Intf, uint32(onuID), uint32(uniID), uint32(gemPortID))
// TODO: The TrafficQueue corresponding to this gem-port also should be removed immediately.
// But it is anyway eventually removed later when the TechProfile is freed, so not a big issue for now.
- f.resourceMgr.RemoveGEMportPonportToOnuMapOnKVStore(uint32(gemPortID), Intf)
+ f.resourceMgr.RemoveGEMportPonportToOnuMapOnKVStore(ctx, uint32(gemPortID), Intf)
f.deleteGemPortFromLocalCache(Intf, uint32(onuID), uint32(gemPortID))
f.onuIdsLock.Lock()
//everytime an entry is deleted from flowsUsedByGemPort cache, the same should be updated in kv as well
// by calling DeleteFlowIDsForGem
delete(f.flowsUsedByGemPort, gemPK)
- f.resourceMgr.DeleteFlowIDsForGem(Intf, uint32(gemPortID))
- f.resourceMgr.FreeGemPortID(Intf, uint32(onuID), uint32(uniID), uint32(gemPortID))
+ f.resourceMgr.DeleteFlowIDsForGem(ctx, Intf, uint32(gemPortID))
+ f.resourceMgr.FreeGemPortID(ctx, Intf, uint32(onuID), uint32(uniID), uint32(gemPortID))
f.onuIdsLock.Unlock()
// Delete the gem port on the ONU.
if err := f.sendDeleteGemPortToChild(Intf, uint32(onuID), uint32(uniID), uint32(gemPortID), tpPath); err != nil {
@@ -1557,13 +1557,13 @@
log.Fields{"err": err, "pon": Intf, "onuID": onuID, "uniID": uniID, "gemPortId": gemPortID})
}
- ok, _ := f.isTechProfileUsedByAnotherGem(Intf, uint32(onuID), uint32(uniID), tpID, techprofileInst, uint32(gemPortID))
+ ok, _ := f.isTechProfileUsedByAnotherGem(ctx, Intf, uint32(onuID), uint32(uniID), tpID, techprofileInst, uint32(gemPortID))
if !ok {
- f.resourceMgr.RemoveTechProfileIDForOnu(Intf, uint32(onuID), uint32(uniID), tpID)
- f.RemoveSchedulerQueues(schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: Intf, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum, tpInst: techprofileInst})
- f.RemoveSchedulerQueues(schedQueue{direction: tp_pb.Direction_DOWNSTREAM, intfID: Intf, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum, tpInst: techprofileInst})
- f.DeleteTechProfileInstance(Intf, uint32(onuID), uint32(uniID), "", tpID)
- f.resourceMgr.FreeAllocID(Intf, uint32(onuID), uint32(uniID), techprofileInst.UsScheduler.AllocID)
+ f.resourceMgr.RemoveTechProfileIDForOnu(ctx, Intf, uint32(onuID), uint32(uniID), tpID)
+ f.RemoveSchedulerQueues(ctx, schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: Intf, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum, tpInst: techprofileInst})
+ f.RemoveSchedulerQueues(ctx, schedQueue{direction: tp_pb.Direction_DOWNSTREAM, intfID: Intf, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum, tpInst: techprofileInst})
+ f.DeleteTechProfileInstance(ctx, Intf, uint32(onuID), uint32(uniID), "", tpID)
+ f.resourceMgr.FreeAllocID(ctx, Intf, uint32(onuID), uint32(uniID), techprofileInst.UsScheduler.AllocID)
// Delete the TCONT on the ONU.
if err := f.sendDeleteTcontToChild(Intf, uint32(onuID), uint32(uniID), uint32(techprofileInst.UsScheduler.AllocID), tpPath); err != nil {
log.Errorw("error processing delete tcont towards onu",
@@ -1575,12 +1575,12 @@
return nil
}
-func (f *OpenOltFlowMgr) clearFlowFromResourceManager(flow *ofp.OfpFlowStats, flowDirection string) {
+func (f *OpenOltFlowMgr) clearFlowFromResourceManager(ctx context.Context, flow *ofp.OfpFlowStats, flowDirection string) {
log.Debugw("clearFlowFromResourceManager", log.Fields{"flowDirection": flowDirection, "flow": *flow})
if flowDirection == Multicast {
- f.clearMulticastFlowFromResourceManager(flow)
+ f.clearMulticastFlowFromResourceManager(ctx, flow)
return
}
@@ -1613,9 +1613,9 @@
log.Debug("Trap on nni flow set oni, uni to -1")
Intf = IntfIDFromNniPortNum(inPort)
}
- flowIds := f.resourceMgr.GetCurrentFlowIDsForOnu(Intf, onuID, uniID)
+ flowIds := f.resourceMgr.GetCurrentFlowIDsForOnu(ctx, Intf, onuID, uniID)
for _, flowID = range flowIds {
- flowInfo := f.resourceMgr.GetFlowIDInfo(Intf, onuID, uniID, flowID)
+ flowInfo := f.resourceMgr.GetFlowIDInfo(ctx, Intf, onuID, uniID, flowID)
if flowInfo == nil {
log.Debugw("No FlowInfo found found in KV store",
log.Fields{"Intf": Intf, "onuID": onuID, "uniID": uniID, "flowID": flowID})
@@ -1634,7 +1634,7 @@
log.Debug("Flow removed from device successfully")
//Remove the Flow from FlowInfo
updatedFlows = append(updatedFlows[:i], updatedFlows[i+1:]...)
- err = f.clearResources(flow, Intf, onuID, uniID, storedFlow.Flow.GemportId,
+ err = f.clearResources(ctx, flow, Intf, onuID, uniID, storedFlow.Flow.GemportId,
flowID, flowDirection, portNum, updatedFlows)
if err != nil {
log.Error("Failed to clear resources for flow", log.Fields{"flow": storedFlow})
@@ -1651,10 +1651,10 @@
//clearMulticastFlowFromResourceManager removes a multicast flow from the KV store and
// clears resources reserved for this multicast flow
-func (f *OpenOltFlowMgr) clearMulticastFlowFromResourceManager(flow *ofp.OfpFlowStats) {
+func (f *OpenOltFlowMgr) clearMulticastFlowFromResourceManager(ctx context.Context, flow *ofp.OfpFlowStats) {
classifierInfo := make(map[string]interface{})
formulateClassifierInfoFromFlow(classifierInfo, flow)
- inPort, err := f.getInPortOfMulticastFlow(classifierInfo)
+ inPort, err := f.getInPortOfMulticastFlow(ctx, classifierInfo)
if err != nil {
log.Warnw("No inPort found. Cannot release resources of the multicast flow.", log.Fields{"flowId:": flow.Id})
@@ -1667,10 +1667,10 @@
var flowID uint32
var updatedFlows []rsrcMgr.FlowInfo
- flowIds := f.resourceMgr.GetCurrentFlowIDsForOnu(networkInterfaceID, onuID, uniID)
+ flowIds := f.resourceMgr.GetCurrentFlowIDsForOnu(ctx, networkInterfaceID, onuID, uniID)
for _, flowID = range flowIds {
- flowInfo := f.resourceMgr.GetFlowIDInfo(networkInterfaceID, onuID, uniID, flowID)
+ flowInfo := f.resourceMgr.GetFlowIDInfo(ctx, networkInterfaceID, onuID, uniID, flowID)
if flowInfo == nil {
log.Debugw("No multicast FlowInfo found in the KV store",
log.Fields{"Intf": networkInterfaceID, "onuID": onuID, "uniID": uniID, "flowID": flowID})
@@ -1692,20 +1692,20 @@
log.Debugw("Multicast flow removed from device successfully", log.Fields{"flowId": flow.Id})
//Remove the Flow from FlowInfo
updatedFlows = append(updatedFlows[:i], updatedFlows[i+1:]...)
- if err := f.updateFlowInfoToKVStore(int32(networkInterfaceID), NoneOnuID, NoneUniID, flowID, &updatedFlows); err != nil {
+ if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID), NoneOnuID, NoneUniID, flowID, &updatedFlows); err != nil {
log.Error("Failed to delete multicast flow from the KV store", log.Fields{"flow": storedFlow, "err": err})
return
}
//release flow id
log.Debugw("Releasing multicast flow id", log.Fields{"flowId": flowID, "interfaceID": networkInterfaceID})
- f.resourceMgr.FreeFlowID(uint32(networkInterfaceID), NoneOnuID, NoneUniID, flowID)
+ f.resourceMgr.FreeFlowID(ctx, uint32(networkInterfaceID), NoneOnuID, NoneUniID, flowID)
}
}
}
}
//RemoveFlow removes the flow from the device
-func (f *OpenOltFlowMgr) RemoveFlow(flow *ofp.OfpFlowStats) {
+func (f *OpenOltFlowMgr) RemoveFlow(ctx context.Context, flow *ofp.OfpFlowStats) {
log.Debugw("Removing Flow", log.Fields{"flow": flow})
var direction string
actionInfo := make(map[string]interface{})
@@ -1729,7 +1729,7 @@
} else {
direction = Downstream
}
- f.clearFlowFromResourceManager(flow, direction) //TODO: Take care of the limitations
+ f.clearFlowFromResourceManager(ctx, flow, direction) //TODO: Take care of the limitations
return
}
@@ -1770,7 +1770,7 @@
// AddFlow add flow to device
// nolint: gocyclo
-func (f *OpenOltFlowMgr) AddFlow(flow *ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) {
+func (f *OpenOltFlowMgr) AddFlow(ctx context.Context, flow *ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) {
classifierInfo := make(map[string]interface{})
actionInfo := make(map[string]interface{})
var UsMeterID uint32
@@ -1788,7 +1788,7 @@
if flows.HasGroup(flow) {
// handle multicast flow
- f.handleFlowWithGroup(actionInfo, classifierInfo, flow)
+ f.handleFlowWithGroup(ctx, actionInfo, classifierInfo, flow)
return
}
@@ -1805,7 +1805,7 @@
if ethType, ok := classifierInfo[EthType]; ok {
if ethType.(uint32) == LldpEthType {
log.Info("Adding LLDP flow")
- f.addLLDPFlow(flow, portNo)
+ f.addLLDPFlow(ctx, flow, portNo)
return
}
}
@@ -1814,7 +1814,7 @@
if udpSrc, ok := classifierInfo[UDPSrc]; ok {
if udpSrc.(uint32) == uint32(67) || udpSrc.(uint32) == uint32(546) {
log.Debug("trap-dhcp-from-nni-flow")
- f.addDHCPTrapFlowOnNNI(flow, classifierInfo, portNo)
+ f.addDHCPTrapFlowOnNNI(ctx, flow, classifierInfo, portNo)
return
}
}
@@ -1822,12 +1822,12 @@
}
if isIgmpTrapDownstreamFlow(classifierInfo) {
log.Debug("trap-igmp-from-nni-flow")
- f.addIgmpTrapFlowOnNNI(flow, classifierInfo, portNo)
+ f.addIgmpTrapFlowOnNNI(ctx, flow, classifierInfo, portNo)
return
}
f.deviceHandler.AddUniPortToOnu(intfID, onuID, portNo)
- f.resourceMgr.AddUniPortToOnuInfo(intfID, onuID, portNo)
+ f.resourceMgr.AddUniPortToOnuInfo(ctx, intfID, onuID, portNo)
TpID, err := getTpIDFromFlow(flow)
if err != nil {
@@ -1847,17 +1847,14 @@
pnFlDelKey := pendingFlowDeleteKey{intfID, onuID, uniID}
if _, ok := f.pendingFlowDelete.Load(pnFlDelKey); !ok {
log.Debugw("no pending flows found, going ahead with flow install", log.Fields{"pon": intfID, "onuID": onuID, "uniID": uniID})
- f.divideAndAddFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, uint32(TpID), UsMeterID, DsMeterID, flowMetadata)
+ f.divideAndAddFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, uint32(TpID), UsMeterID, DsMeterID, flowMetadata)
} else {
- ctx := context.Background()
- ctx, cancel := context.WithCancel(ctx)
- defer cancel()
pendingFlowDelComplete := make(chan bool)
go f.waitForFlowDeletesToCompleteForOnu(ctx, intfID, onuID, uniID, pendingFlowDelComplete)
select {
case <-pendingFlowDelComplete:
log.Debugw("all pending flow deletes completed", log.Fields{"pon": intfID, "onuID": onuID, "uniID": uniID})
- f.divideAndAddFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, uint32(TpID), UsMeterID, DsMeterID, flowMetadata)
+ f.divideAndAddFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, uint32(TpID), UsMeterID, DsMeterID, flowMetadata)
case <-time.After(10 * time.Second):
log.Errorw("pending flow deletes not completed after timeout", log.Fields{"pon": intfID, "onuID": onuID, "uniID": uniID})
@@ -1866,11 +1863,11 @@
}
// handleFlowWithGroup adds multicast flow to the device.
-func (f *OpenOltFlowMgr) handleFlowWithGroup(actionInfo, classifierInfo map[string]interface{}, flow *ofp.OfpFlowStats) {
+func (f *OpenOltFlowMgr) handleFlowWithGroup(ctx context.Context, actionInfo, classifierInfo map[string]interface{}, flow *ofp.OfpFlowStats) {
classifierInfo[PacketTagType] = DoubleTag
log.Debugw("add-multicast-flow", log.Fields{"classifierInfo": classifierInfo, "actionInfo": actionInfo})
- inPort, err := f.getInPortOfMulticastFlow(classifierInfo)
+ inPort, err := f.getInPortOfMulticastFlow(ctx, classifierInfo)
if err != nil {
log.Warnw("No inPort found. Ignoring multicast flow.", log.Fields{"flowId:": flow.Id})
return
@@ -1893,11 +1890,11 @@
networkInterfaceID := IntfIDFromNniPortNum(inPort)
var flowStoreCookie = getFlowStoreCookie(classifierInfo, uint32(0))
- if present := f.resourceMgr.IsFlowCookieOnKVStore(uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
+ if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
log.Debugw("multicast-flow-exists--not-re-adding", log.Fields{"classifierInfo": classifierInfo})
return
}
- flowID, err := f.resourceMgr.GetFlowID(uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0, 0)
+ flowID, err := f.resourceMgr.GetFlowID(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0, 0)
if err != nil {
log.Errorw("Flow id unavailable for multicast flow", log.Fields{"error": err})
return
@@ -1917,20 +1914,20 @@
Priority: int32(flow.Priority),
Cookie: flow.Cookie}
- if ok := f.addFlowToDevice(flow, &multicastFlow); ok {
+ if ok := f.addFlowToDevice(ctx, flow, &multicastFlow); ok {
log.Debug("multicast flow added to device successfully")
//get cached group
- group, _, err := f.GetFlowGroupFromKVStore(groupID, true)
+ group, _, err := f.GetFlowGroupFromKVStore(ctx, groupID, true)
if err == nil {
//calling groupAdd to set group members after multicast flow creation
- if f.ModifyGroup(group) {
+ if f.ModifyGroup(ctx, group) {
//cached group can be removed now
- f.resourceMgr.RemoveFlowGroupFromKVStore(groupID, true)
+ f.resourceMgr.RemoveFlowGroupFromKVStore(ctx, groupID, true)
}
}
- flowsToKVStore := f.getUpdatedFlowInfo(&multicastFlow, flowStoreCookie, MulticastFlow, flowID, flow.Id)
- if err := f.updateFlowInfoToKVStore(int32(networkInterfaceID),
+ flowsToKVStore := f.getUpdatedFlowInfo(ctx, &multicastFlow, flowStoreCookie, MulticastFlow, flowID, flow.Id)
+ if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
int32(onuID),
int32(uniID),
flowID, flowsToKVStore); err != nil {
@@ -1941,12 +1938,12 @@
}
//getInPortOfMulticastFlow return inPort criterion if exists; returns NNI interface of the device otherwise
-func (f *OpenOltFlowMgr) getInPortOfMulticastFlow(classifierInfo map[string]interface{}) (uint32, error) {
+func (f *OpenOltFlowMgr) getInPortOfMulticastFlow(ctx context.Context, classifierInfo map[string]interface{}) (uint32, error) {
if _, ok := classifierInfo[InPort]; ok {
return classifierInfo[InPort].(uint32), nil
}
// find first NNI port of the device
- nniPorts, e := f.resourceMgr.GetNNIFromKVStore()
+ nniPorts, e := f.resourceMgr.GetNNIFromKVStore(ctx)
if e == nil && len(nniPorts) > 0 {
return nniPorts[0], nil
}
@@ -1954,7 +1951,7 @@
}
// AddGroup add or update the group
-func (f *OpenOltFlowMgr) AddGroup(group *ofp.OfpGroupEntry) {
+func (f *OpenOltFlowMgr) AddGroup(ctx context.Context, group *ofp.OfpGroupEntry) {
log.Infow("add-group", log.Fields{"group": group})
if group == nil {
log.Warn("skipping nil group")
@@ -1968,13 +1965,13 @@
}
log.Debugw("Sending group to device", log.Fields{"groupToOlt": groupToOlt})
- _, err := f.deviceHandler.Client.PerformGroupOperation(context.Background(), &groupToOlt)
+ _, err := f.deviceHandler.Client.PerformGroupOperation(ctx, &groupToOlt)
if err != nil {
log.Errorw("add-group operation failed", log.Fields{"err": err, "groupToOlt": groupToOlt})
return
}
// group members not created yet. So let's store the group
- if err := f.resourceMgr.AddFlowGroupToKVStore(group, true); err != nil {
+ if err := f.resourceMgr.AddFlowGroupToKVStore(ctx, group, true); err != nil {
log.Errorw("Group cannot be stored in KV store", log.Fields{"groupId": group.Desc.GroupId, "err": err})
} else {
log.Debugw("add-group operation performed on the device successfully ", log.Fields{"groupToOlt": groupToOlt})
@@ -1992,7 +1989,7 @@
}
// ModifyGroup updates the group
-func (f *OpenOltFlowMgr) ModifyGroup(group *ofp.OfpGroupEntry) bool {
+func (f *OpenOltFlowMgr) ModifyGroup(ctx context.Context, group *ofp.OfpGroupEntry) bool {
log.Infow("modify-group", log.Fields{"group": group})
if group == nil || group.Desc == nil {
log.Warn("cannot modify group; group is nil")
@@ -2001,7 +1998,7 @@
new := f.buildGroup(group.Desc.GroupId, group.Desc.Buckets)
//get existing members of the group
- val, groupExists, err := f.GetFlowGroupFromKVStore(group.Desc.GroupId, false)
+ val, groupExists, err := f.GetFlowGroupFromKVStore(ctx, group.Desc.GroupId, false)
if err != nil {
log.Errorw("Failed to retrieve the group from the store. Cannot modify group.",
@@ -2039,7 +2036,7 @@
}
if isSuccess {
- if err := f.resourceMgr.AddFlowGroupToKVStore(group, false); err != nil {
+ if err := f.resourceMgr.AddFlowGroupToKVStore(ctx, group, false); err != nil {
log.Errorw("Failed to save the group into kv store", log.Fields{"groupId": group.Desc.GroupId})
}
log.Debugw("modify-group was success. Storing the group", log.Fields{"group": group, "existingGroup": current})
@@ -2173,13 +2170,13 @@
}
//UpdateOnuInfo function adds onu info to cache and kvstore
-func (f *OpenOltFlowMgr) UpdateOnuInfo(intfID uint32, onuID uint32, serialNum string) {
+func (f *OpenOltFlowMgr) UpdateOnuInfo(ctx context.Context, intfID uint32, onuID uint32, serialNum string) {
f.lockCache.Lock()
defer f.lockCache.Unlock()
onu := rsrcMgr.OnuGemInfo{OnuID: onuID, SerialNumber: serialNum, IntfID: intfID}
f.onuGemInfo[intfID] = append(f.onuGemInfo[intfID], onu)
- if err := f.resourceMgr.AddOnuInfo(intfID, onu); err != nil {
+ if err := f.resourceMgr.AddOnuInfo(ctx, intfID, onu); err != nil {
log.Errorw("failed to add onu info", log.Fields{"onu": onu})
return
}
@@ -2187,7 +2184,7 @@
}
//addGemPortToOnuInfoMap function adds GEMport to ONU map
-func (f *OpenOltFlowMgr) addGemPortToOnuInfoMap(intfID uint32, onuID uint32, gemPort uint32) {
+func (f *OpenOltFlowMgr) addGemPortToOnuInfoMap(ctx context.Context, intfID uint32, onuID uint32, gemPort uint32) {
f.lockCache.Lock()
defer f.lockCache.Unlock()
onugem := f.onuGemInfo[intfID]
@@ -2206,7 +2203,7 @@
f.onuGemInfo[intfID] = onugem
}
}
- err := f.resourceMgr.AddGemToOnuGemInfo(intfID, onuID, gemPort)
+ err := f.resourceMgr.AddGemToOnuGemInfo(ctx, intfID, onuID, gemPort)
if err != nil {
log.Errorw("Failed to add gem to onu", log.Fields{"intfId": intfID, "onuId": onuID, "gemPort": gemPort})
return
@@ -2236,7 +2233,7 @@
}
//GetLogicalPortFromPacketIn function computes logical port UNI/NNI port from packet-in indication and returns the same
-func (f *OpenOltFlowMgr) GetLogicalPortFromPacketIn(packetIn *openoltpb2.PacketIndication) (uint32, error) {
+func (f *OpenOltFlowMgr) GetLogicalPortFromPacketIn(ctx context.Context, packetIn *openoltpb2.PacketIndication) (uint32, error) {
var logicalPortNum uint32
var onuID uint32
var err error
@@ -2254,7 +2251,7 @@
logicalPortNum = MkUniPortNum(packetIn.IntfId, onuID, uniID)
}
// Store the gem port through which the packet_in came. Use the same gem port for packet_out
- f.UpdateGemPortForPktIn(packetIn.IntfId, onuID, logicalPortNum, packetIn.GemportId)
+ f.UpdateGemPortForPktIn(ctx, packetIn.IntfId, onuID, logicalPortNum, packetIn.GemportId)
} else if packetIn.IntfType == "nni" {
logicalPortNum = IntfIDToPortNo(packetIn.IntfId, voltha.Port_ETHERNET_NNI)
}
@@ -2267,7 +2264,7 @@
}
//GetPacketOutGemPortID returns gemPortId
-func (f *OpenOltFlowMgr) GetPacketOutGemPortID(intfID uint32, onuID uint32, portNum uint32) (uint32, error) {
+func (f *OpenOltFlowMgr) GetPacketOutGemPortID(ctx context.Context, intfID uint32, onuID uint32, portNum uint32) (uint32, error) {
var gemPortID uint32
var err error
@@ -2281,7 +2278,7 @@
return gemPortID, err
}
//If gem is not found in cache try to get it from kv store, if found in kv store, update the cache and return.
- gemPortID, err = f.resourceMgr.GetGemPortFromOnuPktIn(intfID, onuID, portNum)
+ gemPortID, err = f.resourceMgr.GetGemPortFromOnuPktIn(ctx, intfID, onuID, portNum)
if err == nil {
if gemPortID != 0 {
f.packetInGemPort[pktInkey] = gemPortID
@@ -2294,11 +2291,11 @@
return uint32(0), err
}
-func installFlowOnAllGemports(
- f1 func(intfId uint32, onuId uint32, uniId uint32,
+func installFlowOnAllGemports(ctx context.Context,
+ f1 func(ctx context.Context, intfId uint32, onuId uint32, uniId uint32,
portNo uint32, classifier map[string]interface{}, action map[string]interface{},
logicalFlow *ofp.OfpFlowStats, allocId uint32, gemPortId uint32),
- f2 func(intfId uint32, onuId uint32, uniId uint32, portNo uint32,
+ f2 func(ctx context.Context, intfId uint32, onuId uint32, uniId uint32, portNo uint32,
logicalFlow *ofp.OfpFlowStats, allocId uint32, gemPortId uint32, vlanId uint32,
classifier map[string]interface{}, action map[string]interface{}),
args map[string]uint32,
@@ -2310,9 +2307,9 @@
log.Debugw("Installing flow on all GEM ports", log.Fields{"FlowType": FlowType, "gemPorts": gemPorts, "vlan": vlanID})
for _, gemPortID := range gemPorts {
if FlowType == HsiaFlow || FlowType == DhcpFlow {
- f1(args["intfId"], args["onuId"], args["uniId"], args["portNo"], classifier, action, logicalFlow, args["allocId"], gemPortID)
+ f1(ctx, args["intfId"], args["onuId"], args["uniId"], args["portNo"], classifier, action, logicalFlow, args["allocId"], gemPortID)
} else if FlowType == EapolFlow {
- f2(args["intfId"], args["onuId"], args["uniId"], args["portNo"], logicalFlow, args["allocId"], gemPortID, vlanID[0], classifier, action)
+ f2(ctx, args["intfId"], args["onuId"], args["uniId"], args["portNo"], logicalFlow, args["allocId"], gemPortID, vlanID[0], classifier, action)
} else {
log.Errorw("Unrecognized Flow Type", log.Fields{"FlowType": FlowType})
return
@@ -2320,7 +2317,7 @@
}
}
-func (f *OpenOltFlowMgr) addDHCPTrapFlowOnNNI(logicalFlow *ofp.OfpFlowStats, classifier map[string]interface{}, portNo uint32) {
+func (f *OpenOltFlowMgr) addDHCPTrapFlowOnNNI(ctx context.Context, logicalFlow *ofp.OfpFlowStats, classifier map[string]interface{}, portNo uint32) {
log.Debug("Adding trap-dhcp-of-nni-flow")
action := make(map[string]interface{})
classifier[PacketTagType] = DoubleTag
@@ -2349,11 +2346,11 @@
}
flowStoreCookie := getFlowStoreCookie(classifier, uint32(0))
- if present := f.resourceMgr.IsFlowCookieOnKVStore(uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
+ if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
log.Debug("Flow-exists--not-re-adding")
return
}
- flowID, err := f.resourceMgr.GetFlowID(uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0)
+ flowID, err := f.resourceMgr.GetFlowID(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0)
if err != nil {
log.Errorw("Flow id unavailable for DHCP traponNNI flow", log.Fields{"error": err})
return
@@ -2383,10 +2380,10 @@
Priority: int32(logicalFlow.Priority),
Cookie: logicalFlow.Cookie,
PortNo: portNo}
- if ok := f.addFlowToDevice(logicalFlow, &downstreamflow); ok {
+ if ok := f.addFlowToDevice(ctx, logicalFlow, &downstreamflow); ok {
log.Debug("DHCP trap on NNI flow added to device successfully")
- flowsToKVStore := f.getUpdatedFlowInfo(&downstreamflow, flowStoreCookie, "", flowID, logicalFlow.Id)
- if err := f.updateFlowInfoToKVStore(int32(networkInterfaceID),
+ flowsToKVStore := f.getUpdatedFlowInfo(ctx, &downstreamflow, flowStoreCookie, "", flowID, logicalFlow.Id)
+ if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
int32(onuID),
int32(uniID),
flowID, flowsToKVStore); err != nil {
@@ -2423,7 +2420,7 @@
}
//addIgmpTrapFlowOnNNI adds a trap-to-host flow on NNI
-func (f *OpenOltFlowMgr) addIgmpTrapFlowOnNNI(logicalFlow *ofp.OfpFlowStats, classifier map[string]interface{}, portNo uint32) {
+func (f *OpenOltFlowMgr) addIgmpTrapFlowOnNNI(ctx context.Context, logicalFlow *ofp.OfpFlowStats, classifier map[string]interface{}, portNo uint32) {
log.Debugw("Adding igmp-trap-of-nni-flow", log.Fields{"classifierInfo": classifier})
action := make(map[string]interface{})
classifier[PacketTagType] = getPacketTypeFromClassifiers(classifier)
@@ -2449,11 +2446,11 @@
return
}
flowStoreCookie := getFlowStoreCookie(classifier, uint32(0))
- if present := f.resourceMgr.IsFlowCookieOnKVStore(uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
+ if present := f.resourceMgr.IsFlowCookieOnKVStore(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
log.Debug("igmp-flow-exists--not-re-adding")
return
}
- flowID, err := f.resourceMgr.GetFlowID(uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0, 0)
+ flowID, err := f.resourceMgr.GetFlowID(ctx, uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0, 0)
if err != nil {
log.Errorw("IGMP flow id unavailable for trap-on-NNI flow", log.Fields{"error": err})
return
@@ -2483,10 +2480,10 @@
Priority: int32(logicalFlow.Priority),
Cookie: logicalFlow.Cookie,
PortNo: portNo}
- if ok := f.addFlowToDevice(logicalFlow, &downstreamflow); ok {
+ if ok := f.addFlowToDevice(ctx, logicalFlow, &downstreamflow); ok {
log.Debug("IGMP Trap on NNI flow added to device successfully")
- flowsToKVStore := f.getUpdatedFlowInfo(&downstreamflow, flowStoreCookie, "", flowID, logicalFlow.Id)
- if err := f.updateFlowInfoToKVStore(int32(networkInterfaceID),
+ flowsToKVStore := f.getUpdatedFlowInfo(ctx, &downstreamflow, flowStoreCookie, "", flowID, logicalFlow.Id)
+ if err := f.updateFlowInfoToKVStore(ctx, int32(networkInterfaceID),
int32(onuID),
int32(uniID),
flowID, flowsToKVStore); err != nil {
@@ -2509,7 +2506,7 @@
return "", nil
}
-func (f *OpenOltFlowMgr) checkAndAddFlow(args map[string]uint32, classifierInfo map[string]interface{},
+func (f *OpenOltFlowMgr) checkAndAddFlow(ctx context.Context, args map[string]uint32, classifierInfo map[string]interface{},
actionInfo map[string]interface{}, flow *ofp.OfpFlowStats, TpInst *tp.TechProfile, gemPorts []uint32,
TpID uint32, uni string) {
var gemPort uint32
@@ -2526,10 +2523,10 @@
tp_pb.Direction_UPSTREAM,
pcp.(uint32))
//Adding DHCP upstream flow
- f.addDHCPTrapFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID, gemPort)
+ f.addDHCPTrapFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID, gemPort)
} else {
//Adding DHCP upstream flow to all gemports
- installFlowOnAllGemports(f.addDHCPTrapFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, DhcpFlow)
+ installFlowOnAllGemports(ctx, f.addDHCPTrapFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, DhcpFlow)
}
} else if ipProto == IgmpProto {
@@ -2538,10 +2535,10 @@
gemPort = f.techprofile[intfID].GetGemportIDForPbit(TpInst,
tp_pb.Direction_UPSTREAM,
pcp.(uint32))
- f.addIGMPTrapFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID, gemPort)
+ f.addIGMPTrapFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID, gemPort)
} else {
//Adding IGMP upstream flow to all gem ports
- installFlowOnAllGemports(f.addIGMPTrapFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, IgmpFlow)
+ installFlowOnAllGemports(ctx, f.addIGMPTrapFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, IgmpFlow)
}
} else {
log.Errorw("Invalid-Classifier-to-handle", log.Fields{"classifier": classifierInfo, "action": actionInfo})
@@ -2561,9 +2558,9 @@
tp_pb.Direction_UPSTREAM,
pcp.(uint32))
- f.addEAPOLFlow(intfID, onuID, uniID, portNo, flow, allocID, gemPort, vlanID, classifierInfo, actionInfo)
+ f.addEAPOLFlow(ctx, intfID, onuID, uniID, portNo, flow, allocID, gemPort, vlanID, classifierInfo, actionInfo)
} else {
- installFlowOnAllGemports(nil, f.addEAPOLFlow, args, classifierInfo, actionInfo, flow, gemPorts, EapolFlow, vlanID)
+ installFlowOnAllGemports(ctx, nil, f.addEAPOLFlow, args, classifierInfo, actionInfo, flow, gemPorts, EapolFlow, vlanID)
}
}
} else if _, ok := actionInfo[PushVlan]; ok {
@@ -2573,10 +2570,10 @@
tp_pb.Direction_UPSTREAM,
pcp.(uint32))
//Adding HSIA upstream flow
- f.addUpstreamDataFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID, gemPort)
+ f.addUpstreamDataFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID, gemPort)
} else {
//Adding HSIA upstream flow to all gemports
- installFlowOnAllGemports(f.addUpstreamDataFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, HsiaFlow)
+ installFlowOnAllGemports(ctx, f.addUpstreamDataFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, HsiaFlow)
}
} else if _, ok := actionInfo[PopVlan]; ok {
log.Info("Adding Downstream data rule")
@@ -2585,10 +2582,10 @@
tp_pb.Direction_DOWNSTREAM,
pcp.(uint32))
//Adding HSIA downstream flow
- f.addDownstreamDataFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID, gemPort)
+ f.addDownstreamDataFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID, gemPort)
} else {
//Adding HSIA downstream flow to all gemports
- installFlowOnAllGemports(f.addDownstreamDataFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, HsiaFlow)
+ installFlowOnAllGemports(ctx, f.addDownstreamDataFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, HsiaFlow)
}
} else {
log.Errorw("Invalid-flow-type-to-handle", log.Fields{"classifier": classifierInfo, "action": actionInfo, "flow": flow})
@@ -2606,8 +2603,8 @@
return false
}
-func (f *OpenOltFlowMgr) isTechProfileUsedByAnotherGem(ponIntf uint32, onuID uint32, uniID uint32, tpID uint32, tpInst *tp.TechProfile, gemPortID uint32) (bool, uint32) {
- currentGemPorts := f.resourceMgr.GetCurrentGEMPortIDsForOnu(ponIntf, onuID, uniID)
+func (f *OpenOltFlowMgr) isTechProfileUsedByAnotherGem(ctx context.Context, ponIntf uint32, onuID uint32, uniID uint32, tpID uint32, tpInst *tp.TechProfile, gemPortID uint32) (bool, uint32) {
+ currentGemPorts := f.resourceMgr.GetCurrentGEMPortIDsForOnu(ctx, ponIntf, onuID, uniID)
tpGemPorts := tpInst.UpstreamGemPortAttributeList
for _, currentGemPort := range currentGemPorts {
for _, tpGemPort := range tpGemPorts {
@@ -2618,14 +2615,14 @@
}
if tpInst.InstanceCtrl.Onu == "single-instance" {
// The TP information for the given TP ID, PON ID, ONU ID, UNI ID should be removed.
- f.resourceMgr.RemoveTechProfileIDForOnu(ponIntf, uint32(onuID), uint32(uniID), tpID)
- f.DeleteTechProfileInstance(ponIntf, uint32(onuID), uint32(uniID), "", tpID)
+ f.resourceMgr.RemoveTechProfileIDForOnu(ctx, ponIntf, uint32(onuID), uint32(uniID), tpID)
+ f.DeleteTechProfileInstance(ctx, ponIntf, uint32(onuID), uint32(uniID), "", tpID)
// Although we cleaned up TP Instance for the given (PON ID, ONU ID, UNI ID), the TP might
// still be used on other uni ports.
// So, we need to check and make sure that no other gem port is referring to the given TP ID
// on any other uni port.
- tpInstances := f.techprofile[ponIntf].FindAllTpInstances(tpID, ponIntf, onuID)
+ tpInstances := f.techprofile[ponIntf].FindAllTpInstances(ctx, tpID, ponIntf, onuID)
log.Debugw("got single instance tp instances", log.Fields{"tpInstances": tpInstances})
for i := 0; i < len(tpInstances); i++ {
tpI := tpInstances[i]
@@ -2839,7 +2836,7 @@
}
// UpdateGemPortForPktIn updates gemport for packet-in in to the cache and to the kv store as well.
-func (f *OpenOltFlowMgr) UpdateGemPortForPktIn(intfID uint32, onuID uint32, logicalPort uint32, gemPort uint32) {
+func (f *OpenOltFlowMgr) UpdateGemPortForPktIn(ctx context.Context, intfID uint32, onuID uint32, logicalPort uint32, gemPort uint32) {
pktInkey := rsrcMgr.PacketInInfoKey{IntfID: intfID, OnuID: onuID, LogicalPort: logicalPort}
f.lockCache.Lock()
@@ -2854,13 +2851,13 @@
}
f.packetInGemPort[pktInkey] = gemPort
- f.resourceMgr.UpdateGemPortForPktIn(pktInkey, gemPort)
+ f.resourceMgr.UpdateGemPortForPktIn(ctx, pktInkey, gemPort)
log.Debugw("pktin key not found in local cache or value is different. updating cache and kv store", log.Fields{"pktinkey": pktInkey, "gem": gemPort})
return
}
// AddUniPortToOnuInfo adds uni port to the onugem info both in cache and kvstore.
-func (f *OpenOltFlowMgr) AddUniPortToOnuInfo(intfID uint32, onuID uint32, portNum uint32) {
+func (f *OpenOltFlowMgr) AddUniPortToOnuInfo(ctx context.Context, intfID uint32, onuID uint32, portNum uint32) {
f.lockCache.Lock()
defer f.lockCache.Unlock()
@@ -2878,11 +2875,11 @@
f.onuGemInfo[intfID] = onugem
}
}
- f.resourceMgr.AddUniPortToOnuInfo(intfID, onuID, portNum)
+ f.resourceMgr.AddUniPortToOnuInfo(ctx, intfID, onuID, portNum)
}
-func (f *OpenOltFlowMgr) loadFlowIDlistForGem(intf uint32) {
- flowIDsList, err := f.resourceMgr.GetFlowIDsGemMapForInterface(intf)
+func (f *OpenOltFlowMgr) loadFlowIDlistForGem(ctx context.Context, intf uint32) {
+ flowIDsList, err := f.resourceMgr.GetFlowIDsGemMapForInterface(ctx, intf)
if err != nil {
log.Error("Failed to get flowid list per gem", log.Fields{"intf": intf})
return
@@ -2896,8 +2893,8 @@
//loadInterfaceToMulticastQueueMap reads multicast queues per interface from the KV store
//and put them into interfaceToMcastQueueMap.
-func (f *OpenOltFlowMgr) loadInterfaceToMulticastQueueMap() {
- storedMulticastQueueMap, err := f.resourceMgr.GetMcastQueuePerInterfaceMap()
+func (f *OpenOltFlowMgr) loadInterfaceToMulticastQueueMap(ctx context.Context) {
+ storedMulticastQueueMap, err := f.resourceMgr.GetMcastQueuePerInterfaceMap(ctx)
if err != nil {
log.Error("Failed to get pon interface to multicast queue map")
return
@@ -2914,8 +2911,8 @@
//GetFlowGroupFromKVStore fetches and returns flow group from the KV store. Returns (nil, false, error) if any problem occurs during
//fetching the data. Returns (group, true, nil) if the group is fetched and returned successfully.
//Returns (nil, false, nil) if the group does not exists in the KV store.
-func (f *OpenOltFlowMgr) GetFlowGroupFromKVStore(groupID uint32, cached bool) (*ofp.OfpGroupEntry, bool, error) {
- exists, groupInfo, err := f.resourceMgr.GetFlowGroupFromKVStore(groupID, cached)
+func (f *OpenOltFlowMgr) GetFlowGroupFromKVStore(ctx context.Context, groupID uint32, cached bool) (*ofp.OfpGroupEntry, bool, error) {
+ exists, groupInfo, err := f.resourceMgr.GetFlowGroupFromKVStore(ctx, groupID, cached)
if err != nil {
log.Errorw("Failed to get the flow group from KV store", log.Fields{"groupId": groupID, "err": err})
return nil, false, errors.New("failed to retrieve the flow group")
diff --git a/adaptercore/openolt_flowmgr_test.go b/adaptercore/openolt_flowmgr_test.go
index 8342a79..a3dd062 100644
--- a/adaptercore/openolt_flowmgr_test.go
+++ b/adaptercore/openolt_flowmgr_test.go
@@ -18,8 +18,10 @@
package adaptercore
import (
+ "context"
"fmt"
"testing"
+ "time"
"github.com/opencord/voltha-protos/v3/go/voltha"
@@ -52,7 +54,9 @@
GemportIdStart: 1, GemportIdEnd: 1, FlowIdStart: 1, FlowIdEnd: 1,
Ranges: ranges,
}
- rsrMgr := resourcemanager.NewResourceMgr("olt", "127.0.0.1:2379", "etcd", "olt", deviceinfo)
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ rsrMgr := resourcemanager.NewResourceMgr(ctx, "olt", "127.0.0.1:2379", "etcd", "olt", deviceinfo)
for key := range rsrMgr.ResourceMgrs {
rsrMgr.ResourceMgrs[key].KVStore = &db.Backend{}
rsrMgr.ResourceMgrs[key].KVStore.Client = &mocks.MockKVClient{}
@@ -69,7 +73,9 @@
rMgr.KVStore.Client = &mocks.MockKVClient{}
dh.resourceMgr = rMgr
- flwMgr := NewFlowManager(dh, rMgr)
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ flwMgr := NewFlowManager(ctx, dh, rMgr)
onuGemInfo1 := make([]rsrcMgr.OnuGemInfo, 2)
onuGemInfo2 := make([]rsrcMgr.OnuGemInfo, 2)
@@ -151,9 +157,11 @@
{"CreateSchedulerQueues-11", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, tprofile2, 2, &voltha.FlowMetadata{}}, true},
{"CreateSchedulerQueues-12", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, tprofile2, 2, nil}, true},
}
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- if err := flowMgr.CreateSchedulerQueues(tt.schedQueue); (err != nil) != tt.wantErr {
+ if err := flowMgr.CreateSchedulerQueues(ctx, tt.schedQueue); (err != nil) != tt.wantErr {
t.Errorf("OpenOltFlowMgr.CreateSchedulerQueues() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -196,9 +204,11 @@
{"RemoveSchedulerQueues", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, tprofile2, 0, nil}, false},
{"RemoveSchedulerQueues", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 1, 1, 65, 1, tprofile2, 0, nil}, false},
}
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- if err := flowMgr.RemoveSchedulerQueues(tt.schedQueue); (err != nil) != tt.wantErr {
+ if err := flowMgr.RemoveSchedulerQueues(ctx, tt.schedQueue); (err != nil) != tt.wantErr {
t.Errorf("OpenOltFlowMgr.RemoveSchedulerQueues() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -281,9 +291,11 @@
{"RemoveFlow", args{flow: dhcpofpstats}},
{"RemoveFlow", args{flow: multicastOfpStats}},
}
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- flowMgr.RemoveFlow(tt.args.flow)
+ flowMgr.RemoveFlow(ctx, tt.args.flow)
})
}
// t.Error("=====")
@@ -524,9 +536,11 @@
//ofpstats10
{"AddFlow", args{flow: ofpstats11, flowMetadata: flowMetadata}},
}
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- flowMgr.AddFlow(tt.args.flow, tt.args.flowMetadata)
+ flowMgr.AddFlow(ctx, tt.args.flow, tt.args.flowMetadata)
})
}
}
@@ -546,10 +560,12 @@
{"UpdateOnuInfo", args{1, 1, "onu1"}},
{"UpdateOnuInfo", args{2, 3, "onu1"}},
}
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- flowMgr.UpdateOnuInfo(tt.args.intfID, tt.args.onuID, tt.args.serialNum)
+ flowMgr.UpdateOnuInfo(ctx, tt.args.intfID, tt.args.onuID, tt.args.serialNum)
})
}
}
@@ -572,10 +588,12 @@
{"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 2, GemportId: 1, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 0, true},
{"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 1, GemportId: 1, FlowId: 100, PortNo: 0, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 4112, false},
}
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- got, err := flowMgr.GetLogicalPortFromPacketIn(tt.args.packetIn)
+ got, err := flowMgr.GetLogicalPortFromPacketIn(ctx, tt.args.packetIn)
if (err != nil) != tt.wantErr {
t.Errorf("OpenOltFlowMgr.GetLogicalPortFromPacketIn() error = %v, wantErr %v", err, tt.wantErr)
return
@@ -606,10 +624,12 @@
{"GetPacketOutGemPortID", args{intfID: 2, onuID: 2, portNum: 2}, 2, false},
{"GetPacketOutGemPortID", args{intfID: 1, onuID: 2, portNum: 2}, 0, true},
}
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- got, err := flowMgr.GetPacketOutGemPortID(tt.args.intfID, tt.args.onuID, tt.args.portNum)
+ got, err := flowMgr.GetPacketOutGemPortID(ctx, tt.args.intfID, tt.args.onuID, tt.args.portNum)
if (err != nil) != tt.wantErr {
t.Errorf("OpenOltFlowMgr.GetPacketOutGemPortID() error = %v, wantErr %v", err, tt.wantErr)
return
@@ -639,9 +659,11 @@
// TODO: Add test cases.
{"DeleteTechProfileInstance", args{intfID: 0, onuID: 1, uniID: 1, sn: "", tpID: 64}, false},
}
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- if err := flowMgr.DeleteTechProfileInstance(tt.args.intfID, tt.args.onuID, tt.args.uniID, tt.args.sn, tt.args.tpID); (err != nil) != tt.wantErr {
+ if err := flowMgr.DeleteTechProfileInstance(ctx, tt.args.intfID, tt.args.onuID, tt.args.uniID, tt.args.sn, tt.args.tpID); (err != nil) != tt.wantErr {
t.Errorf("OpenOltFlowMgr.DeleteTechProfileInstance() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -889,18 +911,22 @@
},
},
}
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- flowMgr.checkAndAddFlow(tt.args.args, tt.args.classifierInfo, tt.args.actionInfo, tt.args.flow,
+ flowMgr.checkAndAddFlow(ctx, tt.args.args, tt.args.classifierInfo, tt.args.actionInfo, tt.args.flow,
tt.args.TpInst, tt.args.gemPorts, tt.args.TpID, tt.args.uni)
})
}
}
func TestOpenOltFlowMgr_TestMulticastFlow(t *testing.T) {
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
//create group
group := newGroup(2, []uint32{1})
- flowMgr.AddGroup(group)
+ flowMgr.AddGroup(ctx, group)
//create multicast flow
multicastFlowArgs := &fu.FlowArgs{
@@ -916,10 +942,10 @@
},
}
ofpStats := fu.MkFlowStat(multicastFlowArgs)
- flowMgr.AddFlow(ofpStats, &voltha.FlowMetadata{})
+ flowMgr.AddFlow(ctx, ofpStats, &voltha.FlowMetadata{})
//add bucket to the group
group = newGroup(2, []uint32{1, 2})
- flowMgr.ModifyGroup(group)
+ flowMgr.ModifyGroup(ctx, group)
}
diff --git a/adaptercore/openolt_test.go b/adaptercore/openolt_test.go
index 7bdfdcd..0b6b366 100644
--- a/adaptercore/openolt_test.go
+++ b/adaptercore/openolt_test.go
@@ -44,7 +44,7 @@
coreProxy *com.CoreProxy
adapterProxy *com.AdapterProxy
eventProxy *com.EventProxy
- kafkaICProxy *kafka.InterContainerProxy
+ kafkaICProxy kafka.InterContainerProxy
numOnus int
KVStoreHost string
KVStorePort int
diff --git a/adaptercore/resourcemanager/resourcemanager.go b/adaptercore/resourcemanager/resourcemanager.go
index 0e3da4d..db5f044 100755
--- a/adaptercore/resourcemanager/resourcemanager.go
+++ b/adaptercore/resourcemanager/resourcemanager.go
@@ -18,6 +18,7 @@
package resourcemanager
import (
+ "context"
"encoding/json"
"errors"
"fmt"
@@ -143,7 +144,7 @@
// NewResourceMgr init a New resource manager instance which in turn instantiates pon resource manager
// instances according to technology. Initializes the default resource ranges for all
// the resources.
-func NewResourceMgr(deviceID string, KVStoreHostPort string, kvStoreType string, deviceType string, devInfo *openolt.DeviceInfo) *OpenOltResourceMgr {
+func NewResourceMgr(ctx context.Context, deviceID string, KVStoreHostPort string, kvStoreType string, deviceType string, devInfo *openolt.DeviceInfo) *OpenOltResourceMgr {
var ResourceMgr OpenOltResourceMgr
log.Debugf("Init new resource manager , host_port: %s, deviceid: %s", KVStoreHostPort, deviceID)
ResourceMgr.HostAndPort = KVStoreHostPort
@@ -233,13 +234,13 @@
ResourceMgr.ResourceMgrs[uint32(IntfID)] = RsrcMgrsByTech[technology]
}
// self.initialize_device_resource_range_and_pool(resource_mgr, global_resource_mgr, arange)
- InitializeDeviceResourceRangeAndPool(RsrcMgrsByTech[technology], GlobalPONRsrcMgr,
+ InitializeDeviceResourceRangeAndPool(ctx, RsrcMgrsByTech[technology], GlobalPONRsrcMgr,
TechRange, devInfo)
}
// After we have initialized resource ranges, initialize the
// resource pools accordingly.
for _, PONRMgr := range RsrcMgrsByTech {
- _ = PONRMgr.InitDeviceResourcePool()
+ _ = PONRMgr.InitDeviceResourcePool(ctx)
}
log.Info("Initialization of resource manager success!")
return &ResourceMgr
@@ -249,14 +250,14 @@
// device specific information. If KV doesn't exist
// or is broader than the device, the device's information will
// dictate the range limits
-func InitializeDeviceResourceRangeAndPool(ponRMgr *ponrmgr.PONResourceManager, globalPONRMgr *ponrmgr.PONResourceManager,
+func InitializeDeviceResourceRangeAndPool(ctx context.Context, ponRMgr *ponrmgr.PONResourceManager, globalPONRMgr *ponrmgr.PONResourceManager,
techRange *openolt.DeviceInfo_DeviceResourceRanges, devInfo *openolt.DeviceInfo) {
// init the resource range pool according to the sharing type
log.Debugf("Resource range pool init for technology %s", ponRMgr.Technology)
// first load from KV profiles
- status := ponRMgr.InitResourceRangesFromKVStore()
+ status := ponRMgr.InitResourceRangesFromKVStore(ctx)
if !status {
log.Debugf("Failed to load resource ranges from KV store for tech %s", ponRMgr.Technology)
}
@@ -384,7 +385,7 @@
}
// Delete clears used resources for the particular olt device being deleted
-func (RsrcMgr *OpenOltResourceMgr) Delete() error {
+func (RsrcMgr *OpenOltResourceMgr) Delete(ctx context.Context) error {
/* TODO
def __del__(self):
self.log.info("clearing-device-resource-pool")
@@ -407,7 +408,7 @@
self.resource_mgrs[pon_intf_id].assert_resource_limits(uni_id, PONResourceManager.UNI_ID)
*/
for _, rsrcMgr := range RsrcMgr.ResourceMgrs {
- if err := rsrcMgr.ClearDeviceResourcePool(); err != nil {
+ if err := rsrcMgr.ClearDeviceResourcePool(ctx); err != nil {
log.Debug("Failed to clear device resource pool")
return err
}
@@ -417,14 +418,14 @@
}
// GetONUID returns the available OnuID for the given pon-port
-func (RsrcMgr *OpenOltResourceMgr) GetONUID(ponIntfID uint32) (uint32, error) {
+func (RsrcMgr *OpenOltResourceMgr) GetONUID(ctx context.Context, ponIntfID uint32) (uint32, error) {
// Check if Pon Interface ID is present in Resource-manager-map
if _, ok := RsrcMgr.ResourceMgrs[ponIntfID]; !ok {
err := errors.New("invalid-pon-interface-" + strconv.Itoa(int(ponIntfID)))
return 0, err
}
// Get ONU id for a provided pon interface ID.
- ONUID, err := RsrcMgr.ResourceMgrs[ponIntfID].GetResourceID(ponIntfID,
+ ONUID, err := RsrcMgr.ResourceMgrs[ponIntfID].GetResourceID(ctx, ponIntfID,
ponrmgr.ONU_ID, 1)
if err != nil {
log.Errorf("Failed to get resource for interface %d for type %s",
@@ -432,7 +433,7 @@
return 0, err
}
if ONUID != nil {
- RsrcMgr.ResourceMgrs[ponIntfID].InitResourceMap(fmt.Sprintf("%d,%d", ponIntfID, ONUID[0]))
+ RsrcMgr.ResourceMgrs[ponIntfID].InitResourceMap(ctx, fmt.Sprintf("%d,%d", ponIntfID, ONUID[0]))
return ONUID[0], err
}
@@ -442,11 +443,11 @@
// GetFlowIDInfo returns the slice of flow info of the given pon-port
// Note: For flows which trap from the NNI and not really associated with any particular
// ONU (like LLDP), the onu_id and uni_id is set as -1. The intf_id is the NNI intf_id.
-func (RsrcMgr *OpenOltResourceMgr) GetFlowIDInfo(ponIntfID uint32, onuID int32, uniID int32, flowID uint32) *[]FlowInfo {
+func (RsrcMgr *OpenOltResourceMgr) GetFlowIDInfo(ctx context.Context, ponIntfID uint32, onuID int32, uniID int32, flowID uint32) *[]FlowInfo {
var flows []FlowInfo
FlowPath := fmt.Sprintf("%d,%d,%d", ponIntfID, onuID, uniID)
- if err := RsrcMgr.ResourceMgrs[ponIntfID].GetFlowIDInfo(FlowPath, flowID, &flows); err != nil {
+ if err := RsrcMgr.ResourceMgrs[ponIntfID].GetFlowIDInfo(ctx, FlowPath, flowID, &flows); err != nil {
log.Errorw("Error while getting flows from KV store", log.Fields{"flowId": flowID})
return nil
}
@@ -460,11 +461,11 @@
// GetCurrentFlowIDsForOnu fetches flow ID from the resource manager
// Note: For flows which trap from the NNI and not really associated with any particular
// ONU (like LLDP), the onu_id and uni_id is set as -1. The intf_id is the NNI intf_id.
-func (RsrcMgr *OpenOltResourceMgr) GetCurrentFlowIDsForOnu(PONIntfID uint32, ONUID int32, UNIID int32) []uint32 {
+func (RsrcMgr *OpenOltResourceMgr) GetCurrentFlowIDsForOnu(ctx context.Context, PONIntfID uint32, ONUID int32, UNIID int32) []uint32 {
FlowPath := fmt.Sprintf("%d,%d,%d", PONIntfID, ONUID, UNIID)
if mgrs, exist := RsrcMgr.ResourceMgrs[PONIntfID]; exist {
- return mgrs.GetCurrentFlowIDsForOnu(FlowPath)
+ return mgrs.GetCurrentFlowIDsForOnu(ctx, FlowPath)
}
return nil
}
@@ -472,25 +473,25 @@
// UpdateFlowIDInfo updates flow info for the given pon interface, onu id, and uni id
// Note: For flows which trap from the NNI and not really associated with any particular
// ONU (like LLDP), the onu_id and uni_id is set as -1. The intf_id is the NNI intf_id.
-func (RsrcMgr *OpenOltResourceMgr) UpdateFlowIDInfo(ponIntfID int32, onuID int32, uniID int32,
+func (RsrcMgr *OpenOltResourceMgr) UpdateFlowIDInfo(ctx context.Context, ponIntfID int32, onuID int32, uniID int32,
flowID uint32, flowData *[]FlowInfo) error {
FlowPath := fmt.Sprintf("%d,%d,%d", ponIntfID, onuID, uniID)
- return RsrcMgr.ResourceMgrs[uint32(ponIntfID)].UpdateFlowIDInfoForOnu(FlowPath, flowID, *flowData)
+ return RsrcMgr.ResourceMgrs[uint32(ponIntfID)].UpdateFlowIDInfoForOnu(ctx, FlowPath, flowID, *flowData)
}
// GetFlowID return flow ID for a given pon interface id, onu id and uni id
-func (RsrcMgr *OpenOltResourceMgr) GetFlowID(ponIntfID uint32, ONUID int32, uniID int32,
+func (RsrcMgr *OpenOltResourceMgr) GetFlowID(ctx context.Context, ponIntfID uint32, ONUID int32, uniID int32,
gemportID uint32,
flowStoreCookie uint64,
flowCategory string, vlanPcp ...uint32) (uint32, error) {
var err error
FlowPath := fmt.Sprintf("%d,%d,%d", ponIntfID, ONUID, uniID)
- FlowIDs := RsrcMgr.ResourceMgrs[ponIntfID].GetCurrentFlowIDsForOnu(FlowPath)
+ FlowIDs := RsrcMgr.ResourceMgrs[ponIntfID].GetCurrentFlowIDsForOnu(ctx, FlowPath)
if FlowIDs != nil {
log.Debugw("Found flowId(s) for this ONU", log.Fields{"pon": ponIntfID, "ONUID": ONUID, "uniID": uniID, "KVpath": FlowPath})
for _, flowID := range FlowIDs {
- FlowInfo := RsrcMgr.GetFlowIDInfo(ponIntfID, int32(ONUID), int32(uniID), uint32(flowID))
+ FlowInfo := RsrcMgr.GetFlowIDInfo(ctx, ponIntfID, int32(ONUID), int32(uniID), uint32(flowID))
er := getFlowIDFromFlowInfo(FlowInfo, flowID, gemportID, flowStoreCookie, flowCategory, vlanPcp...)
if er == nil {
return flowID, er
@@ -498,7 +499,7 @@
}
}
log.Debug("No matching flows with flow cookie or flow category, allocating new flowid")
- FlowIDs, err = RsrcMgr.ResourceMgrs[ponIntfID].GetResourceID(ponIntfID,
+ FlowIDs, err = RsrcMgr.ResourceMgrs[ponIntfID].GetResourceID(ctx, ponIntfID,
ponrmgr.FLOW_ID, 1)
if err != nil {
log.Errorf("Failed to get resource for interface %d for type %s",
@@ -506,7 +507,7 @@
return uint32(0), err
}
if FlowIDs != nil {
- _ = RsrcMgr.ResourceMgrs[ponIntfID].UpdateFlowIDForOnu(FlowPath, FlowIDs[0], true)
+ _ = RsrcMgr.ResourceMgrs[ponIntfID].UpdateFlowIDForOnu(ctx, FlowPath, FlowIDs[0], true)
return FlowIDs[0], err
}
@@ -516,11 +517,11 @@
// GetAllocID return the first Alloc ID for a given pon interface id and onu id and then update the resource map on
// the KV store with the list of alloc_ids allocated for the pon_intf_onu_id tuple
// Currently of all the alloc_ids available, it returns the first alloc_id in the list for tha given ONU
-func (RsrcMgr *OpenOltResourceMgr) GetAllocID(intfID uint32, onuID uint32, uniID uint32) uint32 {
+func (RsrcMgr *OpenOltResourceMgr) GetAllocID(ctx context.Context, intfID uint32, onuID uint32, uniID uint32) uint32 {
var err error
IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", intfID, onuID, uniID)
- AllocID := RsrcMgr.ResourceMgrs[intfID].GetCurrentAllocIDForOnu(IntfOnuIDUniID)
+ AllocID := RsrcMgr.ResourceMgrs[intfID].GetCurrentAllocIDForOnu(ctx, IntfOnuIDUniID)
if AllocID != nil {
// Since we support only one alloc_id for the ONU at the moment,
// return the first alloc_id in the list, if available, for that
@@ -528,7 +529,7 @@
log.Debugw("Retrieved alloc ID from pon resource mgr", log.Fields{"AllocID": AllocID})
return AllocID[0]
}
- AllocID, err = RsrcMgr.ResourceMgrs[intfID].GetResourceID(intfID,
+ AllocID, err = RsrcMgr.ResourceMgrs[intfID].GetResourceID(ctx, intfID,
ponrmgr.ALLOC_ID, 1)
if AllocID == nil || err != nil {
@@ -537,7 +538,7 @@
}
// update the resource map on KV store with the list of alloc_id
// allocated for the pon_intf_onu_id tuple
- err = RsrcMgr.ResourceMgrs[intfID].UpdateAllocIdsForOnu(IntfOnuIDUniID, AllocID)
+ err = RsrcMgr.ResourceMgrs[intfID].UpdateAllocIdsForOnu(ctx, IntfOnuIDUniID, AllocID)
if err != nil {
log.Error("Failed to update Alloc ID")
return 0
@@ -547,28 +548,28 @@
}
// UpdateAllocIdsForOnu updates alloc ids in kv store for a given pon interface id, onu id and uni id
-func (RsrcMgr *OpenOltResourceMgr) UpdateAllocIdsForOnu(ponPort uint32, onuID uint32, uniID uint32, allocID []uint32) error {
+func (RsrcMgr *OpenOltResourceMgr) UpdateAllocIdsForOnu(ctx context.Context, ponPort uint32, onuID uint32, uniID uint32, allocID []uint32) error {
IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", ponPort, onuID, uniID)
- return RsrcMgr.ResourceMgrs[ponPort].UpdateAllocIdsForOnu(IntfOnuIDUniID,
+ return RsrcMgr.ResourceMgrs[ponPort].UpdateAllocIdsForOnu(ctx, IntfOnuIDUniID,
allocID)
}
// GetCurrentGEMPortIDsForOnu returns gem ports for given pon interface , onu id and uni id
-func (RsrcMgr *OpenOltResourceMgr) GetCurrentGEMPortIDsForOnu(intfID uint32, onuID uint32,
+func (RsrcMgr *OpenOltResourceMgr) GetCurrentGEMPortIDsForOnu(ctx context.Context, intfID uint32, onuID uint32,
uniID uint32) []uint32 {
/* Get gem ports for given pon interface , onu id and uni id. */
IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", intfID, onuID, uniID)
- return RsrcMgr.ResourceMgrs[intfID].GetCurrentGEMPortIDsForOnu(IntfOnuIDUniID)
+ return RsrcMgr.ResourceMgrs[intfID].GetCurrentGEMPortIDsForOnu(ctx, IntfOnuIDUniID)
}
// GetCurrentAllocIDsForOnu returns alloc ids for given pon interface and onu id
-func (RsrcMgr *OpenOltResourceMgr) GetCurrentAllocIDsForOnu(intfID uint32, onuID uint32, uniID uint32) []uint32 {
+func (RsrcMgr *OpenOltResourceMgr) GetCurrentAllocIDsForOnu(ctx context.Context, intfID uint32, onuID uint32, uniID uint32) []uint32 {
IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", intfID, onuID, uniID)
- AllocID := RsrcMgr.ResourceMgrs[intfID].GetCurrentAllocIDForOnu(IntfOnuIDUniID)
+ AllocID := RsrcMgr.ResourceMgrs[intfID].GetCurrentAllocIDForOnu(ctx, IntfOnuIDUniID)
if AllocID != nil {
return AllocID
}
@@ -576,15 +577,15 @@
}
// RemoveAllocIDForOnu removes the alloc id for given pon interface, onu id, uni id and alloc id
-func (RsrcMgr *OpenOltResourceMgr) RemoveAllocIDForOnu(intfID uint32, onuID uint32, uniID uint32, allocID uint32) {
- allocIDs := RsrcMgr.GetCurrentAllocIDsForOnu(intfID, onuID, uniID)
+func (RsrcMgr *OpenOltResourceMgr) RemoveAllocIDForOnu(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, allocID uint32) {
+ allocIDs := RsrcMgr.GetCurrentAllocIDsForOnu(ctx, intfID, onuID, uniID)
for i := 0; i < len(allocIDs); i++ {
if allocIDs[i] == allocID {
allocIDs = append(allocIDs[:i], allocIDs[i+1:]...)
break
}
}
- err := RsrcMgr.UpdateAllocIdsForOnu(intfID, onuID, uniID, allocIDs)
+ err := RsrcMgr.UpdateAllocIdsForOnu(ctx, intfID, onuID, uniID, allocIDs)
if err != nil {
log.Errorf("Failed to Remove Alloc Id For Onu. IntfID %d onuID %d uniID %d allocID %d",
intfID, onuID, uniID, allocID)
@@ -592,15 +593,15 @@
}
// RemoveGemPortIDForOnu removes the gem port id for given pon interface, onu id, uni id and gem port id
-func (RsrcMgr *OpenOltResourceMgr) RemoveGemPortIDForOnu(intfID uint32, onuID uint32, uniID uint32, gemPortID uint32) {
- gemPortIDs := RsrcMgr.GetCurrentGEMPortIDsForOnu(intfID, onuID, uniID)
+func (RsrcMgr *OpenOltResourceMgr) RemoveGemPortIDForOnu(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, gemPortID uint32) {
+ gemPortIDs := RsrcMgr.GetCurrentGEMPortIDsForOnu(ctx, intfID, onuID, uniID)
for i := 0; i < len(gemPortIDs); i++ {
if gemPortIDs[i] == gemPortID {
gemPortIDs = append(gemPortIDs[:i], gemPortIDs[i+1:]...)
break
}
}
- err := RsrcMgr.UpdateGEMPortIDsForOnu(intfID, onuID, uniID, gemPortIDs)
+ err := RsrcMgr.UpdateGEMPortIDsForOnu(ctx, intfID, onuID, uniID, gemPortIDs)
if err != nil {
log.Errorf("Failed to Remove Gem Id For Onu. IntfID %d onuID %d uniID %d gemPortId %d",
intfID, onuID, uniID, gemPortID)
@@ -610,7 +611,7 @@
// UpdateGEMportsPonportToOnuMapOnKVStore updates onu and uni id associated with the gem port to the kv store
// This stored information is used when packet_indication is received and we need to derive the ONU Id for which
// the packet arrived based on the pon_intf and gemport available in the packet_indication
-func (RsrcMgr *OpenOltResourceMgr) UpdateGEMportsPonportToOnuMapOnKVStore(gemPorts []uint32, PonPort uint32,
+func (RsrcMgr *OpenOltResourceMgr) UpdateGEMportsPonportToOnuMapOnKVStore(ctx context.Context, gemPorts []uint32, PonPort uint32,
onuID uint32, uniID uint32) error {
/* Update onu and uni id associated with the gem port to the kv store. */
@@ -624,7 +625,7 @@
return err
}
- if err = RsrcMgr.KVStore.Put(IntfGEMPortPath, Val); err != nil {
+ if err = RsrcMgr.KVStore.Put(ctx, IntfGEMPortPath, Val); err != nil {
log.Errorf("Failed to update resource %s", IntfGEMPortPath)
return err
}
@@ -633,9 +634,9 @@
}
// RemoveGEMportPonportToOnuMapOnKVStore removes the relationship between the gem port and pon port
-func (RsrcMgr *OpenOltResourceMgr) RemoveGEMportPonportToOnuMapOnKVStore(GemPort uint32, PonPort uint32) {
+func (RsrcMgr *OpenOltResourceMgr) RemoveGEMportPonportToOnuMapOnKVStore(ctx context.Context, GemPort uint32, PonPort uint32) {
IntfGEMPortPath := fmt.Sprintf("%d,%d", PonPort, GemPort)
- err := RsrcMgr.KVStore.Delete(IntfGEMPortPath)
+ err := RsrcMgr.KVStore.Delete(ctx, IntfGEMPortPath)
if err != nil {
log.Errorf("Failed to Remove Gem port-Pon port to onu map on kv store. Gem %d PonPort %d", GemPort, PonPort)
}
@@ -643,7 +644,7 @@
// GetGEMPortID gets gem port id for a particular pon port, onu id and uni id and then update the resource map on
// the KV store with the list of gemport_id allocated for the pon_intf_onu_id tuple
-func (RsrcMgr *OpenOltResourceMgr) GetGEMPortID(ponPort uint32, onuID uint32,
+func (RsrcMgr *OpenOltResourceMgr) GetGEMPortID(ctx context.Context, ponPort uint32, onuID uint32,
uniID uint32, NumOfPorts uint32) ([]uint32, error) {
/* Get gem port id for a particular pon port, onu id
@@ -653,12 +654,12 @@
var err error
IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", ponPort, onuID, uniID)
- GEMPortList := RsrcMgr.ResourceMgrs[ponPort].GetCurrentGEMPortIDsForOnu(IntfOnuIDUniID)
+ GEMPortList := RsrcMgr.ResourceMgrs[ponPort].GetCurrentGEMPortIDsForOnu(ctx, IntfOnuIDUniID)
if GEMPortList != nil {
return GEMPortList, nil
}
- GEMPortList, err = RsrcMgr.ResourceMgrs[ponPort].GetResourceID(ponPort,
+ GEMPortList, err = RsrcMgr.ResourceMgrs[ponPort].GetResourceID(ctx, ponPort,
ponrmgr.GEMPORT_ID, NumOfPorts)
if err != nil && GEMPortList == nil {
log.Errorf("Failed to get gem port id for %s", IntfOnuIDUniID)
@@ -667,41 +668,41 @@
// update the resource map on KV store with the list of gemport_id
// allocated for the pon_intf_onu_id tuple
- err = RsrcMgr.ResourceMgrs[ponPort].UpdateGEMPortIDsForOnu(IntfOnuIDUniID,
+ err = RsrcMgr.ResourceMgrs[ponPort].UpdateGEMPortIDsForOnu(ctx, IntfOnuIDUniID,
GEMPortList)
if err != nil {
log.Errorf("Failed to update GEM ports to kv store for %s", IntfOnuIDUniID)
return nil, err
}
- _ = RsrcMgr.UpdateGEMportsPonportToOnuMapOnKVStore(GEMPortList, ponPort,
+ _ = RsrcMgr.UpdateGEMportsPonportToOnuMapOnKVStore(ctx, GEMPortList, ponPort,
onuID, uniID)
return GEMPortList, err
}
// UpdateGEMPortIDsForOnu updates gemport ids on to the kv store for a given pon port, onu id and uni id
-func (RsrcMgr *OpenOltResourceMgr) UpdateGEMPortIDsForOnu(ponPort uint32, onuID uint32,
+func (RsrcMgr *OpenOltResourceMgr) UpdateGEMPortIDsForOnu(ctx context.Context, ponPort uint32, onuID uint32,
uniID uint32, GEMPortList []uint32) error {
IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", ponPort, onuID, uniID)
- return RsrcMgr.ResourceMgrs[ponPort].UpdateGEMPortIDsForOnu(IntfOnuIDUniID,
+ return RsrcMgr.ResourceMgrs[ponPort].UpdateGEMPortIDsForOnu(ctx, IntfOnuIDUniID,
GEMPortList)
}
// FreeonuID releases(make free) onu id for a particular pon-port
-func (RsrcMgr *OpenOltResourceMgr) FreeonuID(intfID uint32, onuID []uint32) {
+func (RsrcMgr *OpenOltResourceMgr) FreeonuID(ctx context.Context, intfID uint32, onuID []uint32) {
- RsrcMgr.ResourceMgrs[intfID].FreeResourceID(intfID, ponrmgr.ONU_ID, onuID)
+ RsrcMgr.ResourceMgrs[intfID].FreeResourceID(ctx, intfID, ponrmgr.ONU_ID, onuID)
/* Free onu id for a particular interface.*/
var IntfonuID string
for _, onu := range onuID {
IntfonuID = fmt.Sprintf("%d,%d", intfID, onu)
- RsrcMgr.ResourceMgrs[intfID].RemoveResourceMap(IntfonuID)
+ RsrcMgr.ResourceMgrs[intfID].RemoveResourceMap(ctx, IntfonuID)
}
}
// FreeFlowID returns the free flow id for a given interface, onu id and uni id
-func (RsrcMgr *OpenOltResourceMgr) FreeFlowID(IntfID uint32, onuID int32,
+func (RsrcMgr *OpenOltResourceMgr) FreeFlowID(ctx context.Context, IntfID uint32, onuID int32,
uniID int32, FlowID uint32) {
var IntfONUID string
var err error
@@ -709,92 +710,92 @@
FlowIds = append(FlowIds, FlowID)
IntfONUID = fmt.Sprintf("%d,%d,%d", IntfID, onuID, uniID)
- err = RsrcMgr.ResourceMgrs[IntfID].UpdateFlowIDForOnu(IntfONUID, FlowID, false)
+ err = RsrcMgr.ResourceMgrs[IntfID].UpdateFlowIDForOnu(ctx, IntfONUID, FlowID, false)
if err != nil {
log.Errorw("Failed to Update flow id for", log.Fields{"intf": IntfONUID})
}
- RsrcMgr.ResourceMgrs[IntfID].RemoveFlowIDInfo(IntfONUID, FlowID)
- RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(IntfID, ponrmgr.FLOW_ID, FlowIds)
+ RsrcMgr.ResourceMgrs[IntfID].RemoveFlowIDInfo(ctx, IntfONUID, FlowID)
+ RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(ctx, IntfID, ponrmgr.FLOW_ID, FlowIds)
}
// FreeFlowIDs releases the flow Ids
-func (RsrcMgr *OpenOltResourceMgr) FreeFlowIDs(IntfID uint32, onuID uint32,
+func (RsrcMgr *OpenOltResourceMgr) FreeFlowIDs(ctx context.Context, IntfID uint32, onuID uint32,
uniID uint32, FlowID []uint32) {
- RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(IntfID, ponrmgr.FLOW_ID, FlowID)
+ RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(ctx, IntfID, ponrmgr.FLOW_ID, FlowID)
var IntfOnuIDUniID string
var err error
for _, flow := range FlowID {
IntfOnuIDUniID = fmt.Sprintf("%d,%d,%d", IntfID, onuID, uniID)
- err = RsrcMgr.ResourceMgrs[IntfID].UpdateFlowIDForOnu(IntfOnuIDUniID, flow, false)
+ err = RsrcMgr.ResourceMgrs[IntfID].UpdateFlowIDForOnu(ctx, IntfOnuIDUniID, flow, false)
if err != nil {
log.Errorw("Failed to Update flow id for", log.Fields{"intf": IntfOnuIDUniID})
}
- RsrcMgr.ResourceMgrs[IntfID].RemoveFlowIDInfo(IntfOnuIDUniID, flow)
+ RsrcMgr.ResourceMgrs[IntfID].RemoveFlowIDInfo(ctx, IntfOnuIDUniID, flow)
}
}
// FreeAllocID frees AllocID on the PON resource pool and also frees the allocID association
// for the given OLT device.
-func (RsrcMgr *OpenOltResourceMgr) FreeAllocID(IntfID uint32, onuID uint32,
+func (RsrcMgr *OpenOltResourceMgr) FreeAllocID(ctx context.Context, IntfID uint32, onuID uint32,
uniID uint32, allocID uint32) {
- RsrcMgr.RemoveAllocIDForOnu(IntfID, onuID, uniID, allocID)
+ RsrcMgr.RemoveAllocIDForOnu(ctx, IntfID, onuID, uniID, allocID)
allocIDs := make([]uint32, 0)
allocIDs = append(allocIDs, allocID)
- RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(IntfID, ponrmgr.ALLOC_ID, allocIDs)
+ RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(ctx, IntfID, ponrmgr.ALLOC_ID, allocIDs)
}
// FreeGemPortID frees GemPortID on the PON resource pool and also frees the gemPortID association
// for the given OLT device.
-func (RsrcMgr *OpenOltResourceMgr) FreeGemPortID(IntfID uint32, onuID uint32,
+func (RsrcMgr *OpenOltResourceMgr) FreeGemPortID(ctx context.Context, IntfID uint32, onuID uint32,
uniID uint32, gemPortID uint32) {
- RsrcMgr.RemoveGemPortIDForOnu(IntfID, onuID, uniID, gemPortID)
+ RsrcMgr.RemoveGemPortIDForOnu(ctx, IntfID, onuID, uniID, gemPortID)
gemPortIDs := make([]uint32, 0)
gemPortIDs = append(gemPortIDs, gemPortID)
- RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(IntfID, ponrmgr.GEMPORT_ID, gemPortIDs)
+ RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(ctx, IntfID, ponrmgr.GEMPORT_ID, gemPortIDs)
}
// FreePONResourcesForONU make the pon resources free for a given pon interface and onu id, and the clears the
// resource map and the onuID associated with (pon_intf_id, gemport_id) tuple,
-func (RsrcMgr *OpenOltResourceMgr) FreePONResourcesForONU(intfID uint32, onuID uint32, uniID uint32) {
+func (RsrcMgr *OpenOltResourceMgr) FreePONResourcesForONU(ctx context.Context, intfID uint32, onuID uint32, uniID uint32) {
IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", intfID, onuID, uniID)
- AllocIDs := RsrcMgr.ResourceMgrs[intfID].GetCurrentAllocIDForOnu(IntfOnuIDUniID)
+ AllocIDs := RsrcMgr.ResourceMgrs[intfID].GetCurrentAllocIDForOnu(ctx, IntfOnuIDUniID)
- RsrcMgr.ResourceMgrs[intfID].FreeResourceID(intfID,
+ RsrcMgr.ResourceMgrs[intfID].FreeResourceID(ctx, intfID,
ponrmgr.ALLOC_ID,
AllocIDs)
- GEMPortIDs := RsrcMgr.ResourceMgrs[intfID].GetCurrentGEMPortIDsForOnu(IntfOnuIDUniID)
- RsrcMgr.ResourceMgrs[intfID].FreeResourceID(intfID,
+ GEMPortIDs := RsrcMgr.ResourceMgrs[intfID].GetCurrentGEMPortIDsForOnu(ctx, IntfOnuIDUniID)
+ RsrcMgr.ResourceMgrs[intfID].FreeResourceID(ctx, intfID,
ponrmgr.GEMPORT_ID,
GEMPortIDs)
- FlowIDs := RsrcMgr.ResourceMgrs[intfID].GetCurrentFlowIDsForOnu(IntfOnuIDUniID)
- RsrcMgr.ResourceMgrs[intfID].FreeResourceID(intfID,
+ FlowIDs := RsrcMgr.ResourceMgrs[intfID].GetCurrentFlowIDsForOnu(ctx, IntfOnuIDUniID)
+ RsrcMgr.ResourceMgrs[intfID].FreeResourceID(ctx, intfID,
ponrmgr.FLOW_ID,
FlowIDs)
// Clear resource map associated with (pon_intf_id, gemport_id) tuple.
- RsrcMgr.ResourceMgrs[intfID].RemoveResourceMap(IntfOnuIDUniID)
+ RsrcMgr.ResourceMgrs[intfID].RemoveResourceMap(ctx, IntfOnuIDUniID)
// Clear the ONU Id associated with the (pon_intf_id, gemport_id) tuple.
for _, GEM := range GEMPortIDs {
- _ = RsrcMgr.KVStore.Delete(fmt.Sprintf("%d,%d", intfID, GEM))
+ _ = RsrcMgr.KVStore.Delete(ctx, fmt.Sprintf("%d,%d", intfID, GEM))
}
}
// IsFlowCookieOnKVStore checks if the given flow cookie is present on the kv store
// Returns true if the flow cookie is found, otherwise it returns false
-func (RsrcMgr *OpenOltResourceMgr) IsFlowCookieOnKVStore(ponIntfID uint32, onuID int32, uniID int32,
+func (RsrcMgr *OpenOltResourceMgr) IsFlowCookieOnKVStore(ctx context.Context, ponIntfID uint32, onuID int32, uniID int32,
flowStoreCookie uint64) bool {
FlowPath := fmt.Sprintf("%d,%d,%d", ponIntfID, onuID, uniID)
- FlowIDs := RsrcMgr.ResourceMgrs[ponIntfID].GetCurrentFlowIDsForOnu(FlowPath)
+ FlowIDs := RsrcMgr.ResourceMgrs[ponIntfID].GetCurrentFlowIDsForOnu(ctx, FlowPath)
if FlowIDs != nil {
log.Debugw("Found flowId(s) for this ONU", log.Fields{"pon": ponIntfID, "onuID": onuID, "uniID": uniID, "KVpath": FlowPath})
for _, flowID := range FlowIDs {
- FlowInfo := RsrcMgr.GetFlowIDInfo(ponIntfID, int32(onuID), int32(uniID), uint32(flowID))
+ FlowInfo := RsrcMgr.GetFlowIDInfo(ctx, ponIntfID, int32(onuID), int32(uniID), uint32(flowID))
if FlowInfo != nil {
log.Debugw("Found flows", log.Fields{"flows": *FlowInfo, "flowId": flowID})
for _, Info := range *FlowInfo {
@@ -811,10 +812,10 @@
// GetTechProfileIDForOnu fetches Tech-Profile-ID from the KV-Store for the given onu based on the path
// This path is formed as the following: {IntfID, OnuID, UniID}/tp_id
-func (RsrcMgr *OpenOltResourceMgr) GetTechProfileIDForOnu(IntfID uint32, OnuID uint32, UniID uint32) []uint32 {
+func (RsrcMgr *OpenOltResourceMgr) GetTechProfileIDForOnu(ctx context.Context, IntfID uint32, OnuID uint32, UniID uint32) []uint32 {
Path := fmt.Sprintf(TpIDPathSuffix, IntfID, OnuID, UniID)
var Data []uint32
- Value, err := RsrcMgr.KVStore.Get(Path)
+ Value, err := RsrcMgr.KVStore.Get(ctx, Path)
if err == nil {
if Value != nil {
Val, err := kvstore.ToByte(Value.Value)
@@ -837,9 +838,9 @@
// RemoveTechProfileIDsForOnu deletes all tech profile ids from the KV-Store for the given onu based on the path
// This path is formed as the following: {IntfID, OnuID, UniID}/tp_id
-func (RsrcMgr *OpenOltResourceMgr) RemoveTechProfileIDsForOnu(IntfID uint32, OnuID uint32, UniID uint32) error {
+func (RsrcMgr *OpenOltResourceMgr) RemoveTechProfileIDsForOnu(ctx context.Context, IntfID uint32, OnuID uint32, UniID uint32) error {
IntfOnuUniID := fmt.Sprintf(TpIDPathSuffix, IntfID, OnuID, UniID)
- if err := RsrcMgr.KVStore.Delete(IntfOnuUniID); err != nil {
+ if err := RsrcMgr.KVStore.Delete(ctx, IntfOnuUniID); err != nil {
log.Errorw("Failed to delete techprofile id resource in KV store", log.Fields{"path": IntfOnuUniID})
return err
}
@@ -848,8 +849,8 @@
// RemoveTechProfileIDForOnu deletes a specific tech profile id from the KV-Store for the given onu based on the path
// This path is formed as the following: {IntfID, OnuID, UniID}/tp_id
-func (RsrcMgr *OpenOltResourceMgr) RemoveTechProfileIDForOnu(IntfID uint32, OnuID uint32, UniID uint32, TpID uint32) error {
- tpIDList := RsrcMgr.GetTechProfileIDForOnu(IntfID, OnuID, UniID)
+func (RsrcMgr *OpenOltResourceMgr) RemoveTechProfileIDForOnu(ctx context.Context, IntfID uint32, OnuID uint32, UniID uint32, TpID uint32) error {
+ tpIDList := RsrcMgr.GetTechProfileIDForOnu(ctx, IntfID, OnuID, UniID)
for i, tpIDInList := range tpIDList {
if tpIDInList == TpID {
tpIDList = append(tpIDList[:i], tpIDList[i+1:]...)
@@ -861,7 +862,7 @@
log.Error("failed to Marshal")
return err
}
- if err = RsrcMgr.KVStore.Put(IntfOnuUniID, Value); err != nil {
+ if err = RsrcMgr.KVStore.Put(ctx, IntfOnuUniID, Value); err != nil {
log.Errorf("Failed to update resource %s", IntfOnuUniID)
return err
}
@@ -870,14 +871,14 @@
// UpdateTechProfileIDForOnu updates (put) already present tech-profile-id for the given onu based on the path
// This path is formed as the following: {IntfID, OnuID, UniID}/tp_id
-func (RsrcMgr *OpenOltResourceMgr) UpdateTechProfileIDForOnu(IntfID uint32, OnuID uint32,
+func (RsrcMgr *OpenOltResourceMgr) UpdateTechProfileIDForOnu(ctx context.Context, IntfID uint32, OnuID uint32,
UniID uint32, TpID uint32) error {
var Value []byte
var err error
IntfOnuUniID := fmt.Sprintf(TpIDPathSuffix, IntfID, OnuID, UniID)
- tpIDList := RsrcMgr.GetTechProfileIDForOnu(IntfID, OnuID, UniID)
+ tpIDList := RsrcMgr.GetTechProfileIDForOnu(ctx, IntfID, OnuID, UniID)
for _, value := range tpIDList {
if value == TpID {
log.Debugf("TpID %d is already in tpIdList for the path %s", TpID, IntfOnuUniID)
@@ -891,7 +892,7 @@
log.Error("failed to Marshal")
return err
}
- if err = RsrcMgr.KVStore.Put(IntfOnuUniID, Value); err != nil {
+ if err = RsrcMgr.KVStore.Put(ctx, IntfOnuUniID, Value); err != nil {
log.Errorf("Failed to update resource %s", IntfOnuUniID)
return err
}
@@ -900,7 +901,7 @@
// UpdateMeterIDForOnu updates the meter id in the KV-Store for the given onu based on the path
// This path is formed as the following: <(pon_id, onu_id, uni_id)>/<tp_id>/meter_id/<direction>
-func (RsrcMgr *OpenOltResourceMgr) UpdateMeterIDForOnu(Direction string, IntfID uint32, OnuID uint32,
+func (RsrcMgr *OpenOltResourceMgr) UpdateMeterIDForOnu(ctx context.Context, Direction string, IntfID uint32, OnuID uint32,
UniID uint32, TpID uint32, MeterConfig *ofp.OfpMeterConfig) error {
var Value []byte
var err error
@@ -911,7 +912,7 @@
log.Error("failed to Marshal meter config")
return err
}
- if err = RsrcMgr.KVStore.Put(IntfOnuUniID, Value); err != nil {
+ if err = RsrcMgr.KVStore.Put(ctx, IntfOnuUniID, Value); err != nil {
log.Errorf("Failed to store meter into KV store %s", IntfOnuUniID)
return err
}
@@ -920,11 +921,11 @@
// GetMeterIDForOnu fetches the meter id from the kv store for the given onu based on the path
// This path is formed as the following: <(pon_id, onu_id, uni_id)>/<tp_id>/meter_id/<direction>
-func (RsrcMgr *OpenOltResourceMgr) GetMeterIDForOnu(Direction string, IntfID uint32, OnuID uint32,
+func (RsrcMgr *OpenOltResourceMgr) GetMeterIDForOnu(ctx context.Context, Direction string, IntfID uint32, OnuID uint32,
UniID uint32, TpID uint32) (*ofp.OfpMeterConfig, error) {
Path := fmt.Sprintf(MeterIDPathSuffix, IntfID, OnuID, UniID, TpID, Direction)
var meterConfig ofp.OfpMeterConfig
- Value, err := RsrcMgr.KVStore.Get(Path)
+ Value, err := RsrcMgr.KVStore.Get(ctx, Path)
if err == nil {
if Value != nil {
log.Debug("Found meter in KV store", log.Fields{"Direction": Direction})
@@ -950,10 +951,10 @@
// RemoveMeterIDForOnu deletes the meter id from the kV-Store for the given onu based on the path
// This path is formed as the following: <(pon_id, onu_id, uni_id)>/<tp_id>/meter_id/<direction>
-func (RsrcMgr *OpenOltResourceMgr) RemoveMeterIDForOnu(Direction string, IntfID uint32, OnuID uint32,
+func (RsrcMgr *OpenOltResourceMgr) RemoveMeterIDForOnu(ctx context.Context, Direction string, IntfID uint32, OnuID uint32,
UniID uint32, TpID uint32) error {
Path := fmt.Sprintf(MeterIDPathSuffix, IntfID, OnuID, UniID, TpID, Direction)
- if err := RsrcMgr.KVStore.Delete(Path); err != nil {
+ if err := RsrcMgr.KVStore.Delete(ctx, Path); err != nil {
log.Errorf("Failed to delete meter id %s from kvstore ", Path)
return err
}
@@ -983,11 +984,11 @@
}
//AddGemToOnuGemInfo adds gemport to onugem info kvstore
-func (RsrcMgr *OpenOltResourceMgr) AddGemToOnuGemInfo(intfID uint32, onuID uint32, gemPort uint32) error {
+func (RsrcMgr *OpenOltResourceMgr) AddGemToOnuGemInfo(ctx context.Context, intfID uint32, onuID uint32, gemPort uint32) error {
var onuGemData []OnuGemInfo
var err error
- if err = RsrcMgr.ResourceMgrs[intfID].GetOnuGemInfo(intfID, &onuGemData); err != nil {
+ if err = RsrcMgr.ResourceMgrs[intfID].GetOnuGemInfo(ctx, intfID, &onuGemData); err != nil {
log.Errorf("failed to get onuifo for intfid %d", intfID)
return err
}
@@ -1009,7 +1010,7 @@
break
}
}
- err = RsrcMgr.ResourceMgrs[intfID].AddOnuGemInfo(intfID, onuGemData)
+ err = RsrcMgr.ResourceMgrs[intfID].AddOnuGemInfo(ctx, intfID, onuGemData)
if err != nil {
log.Error("Failed to add onugem to kv store")
return err
@@ -1018,10 +1019,10 @@
}
//GetOnuGemInfo gets onu gem info from the kvstore per interface
-func (RsrcMgr *OpenOltResourceMgr) GetOnuGemInfo(IntfID uint32) ([]OnuGemInfo, error) {
+func (RsrcMgr *OpenOltResourceMgr) GetOnuGemInfo(ctx context.Context, IntfID uint32) ([]OnuGemInfo, error) {
var onuGemData []OnuGemInfo
- if err := RsrcMgr.ResourceMgrs[IntfID].GetOnuGemInfo(IntfID, &onuGemData); err != nil {
+ if err := RsrcMgr.ResourceMgrs[IntfID].GetOnuGemInfo(ctx, IntfID, &onuGemData); err != nil {
log.Errorf("failed to get onuifo for intfid %d", IntfID)
return nil, err
}
@@ -1030,16 +1031,16 @@
}
// AddOnuInfo adds onu info on to the kvstore per interface
-func (RsrcMgr *OpenOltResourceMgr) AddOnuInfo(IntfID uint32, onuGem OnuGemInfo) error {
+func (RsrcMgr *OpenOltResourceMgr) AddOnuInfo(ctx context.Context, IntfID uint32, onuGem OnuGemInfo) error {
var onuGemData []OnuGemInfo
var err error
- if err = RsrcMgr.ResourceMgrs[IntfID].GetOnuGemInfo(IntfID, &onuGemData); err != nil {
+ if err = RsrcMgr.ResourceMgrs[IntfID].GetOnuGemInfo(ctx, IntfID, &onuGemData); err != nil {
log.Errorf("failed to get onuifo for intfid %d", IntfID)
return err
}
onuGemData = append(onuGemData, onuGem)
- err = RsrcMgr.ResourceMgrs[IntfID].AddOnuGemInfo(IntfID, onuGemData)
+ err = RsrcMgr.ResourceMgrs[IntfID].AddOnuGemInfo(ctx, IntfID, onuGemData)
if err != nil {
log.Error("Failed to add onugem to kv store")
return err
@@ -1050,11 +1051,11 @@
}
// UpdateOnuInfo updates Onuinfo on the kvstore per interface
-func (RsrcMgr *OpenOltResourceMgr) UpdateOnuInfo(IntfID uint32, onuGem []OnuGemInfo) error {
+func (RsrcMgr *OpenOltResourceMgr) UpdateOnuInfo(ctx context.Context, IntfID uint32, onuGem []OnuGemInfo) error {
var onuGemData []OnuGemInfo
var err error
- err = RsrcMgr.ResourceMgrs[IntfID].AddOnuGemInfo(IntfID, onuGemData)
+ err = RsrcMgr.ResourceMgrs[IntfID].AddOnuGemInfo(ctx, IntfID, onuGemData)
if err != nil {
log.Error("Failed to add onugem to kv store")
return err
@@ -1065,11 +1066,11 @@
}
// AddUniPortToOnuInfo adds uni port to the onuinfo kvstore. check if the uni is already present if not update the kv store.
-func (RsrcMgr *OpenOltResourceMgr) AddUniPortToOnuInfo(intfID uint32, onuID uint32, portNo uint32) {
+func (RsrcMgr *OpenOltResourceMgr) AddUniPortToOnuInfo(ctx context.Context, intfID uint32, onuID uint32, portNo uint32) {
var onuGemData []OnuGemInfo
var err error
- if err = RsrcMgr.ResourceMgrs[intfID].GetOnuGemInfo(intfID, &onuGemData); err != nil {
+ if err = RsrcMgr.ResourceMgrs[intfID].GetOnuGemInfo(ctx, intfID, &onuGemData); err != nil {
log.Errorf("failed to get onuifo for intfid %d", intfID)
return
}
@@ -1085,7 +1086,7 @@
break
}
}
- err = RsrcMgr.ResourceMgrs[intfID].AddOnuGemInfo(intfID, onuGemData)
+ err = RsrcMgr.ResourceMgrs[intfID].AddOnuGemInfo(ctx, intfID, onuGemData)
if err != nil {
log.Errorw("Failed to add uin port in onugem to kv store", log.Fields{"uni": portNo})
return
@@ -1094,7 +1095,7 @@
}
//UpdateGemPortForPktIn updates gemport for pkt in path to kvstore, path being intfid, onuid, portno
-func (RsrcMgr *OpenOltResourceMgr) UpdateGemPortForPktIn(pktIn PacketInInfoKey, gemPort uint32) {
+func (RsrcMgr *OpenOltResourceMgr) UpdateGemPortForPktIn(ctx context.Context, pktIn PacketInInfoKey, gemPort uint32) {
path := fmt.Sprintf(OnuPacketINPath, pktIn.IntfID, pktIn.OnuID, pktIn.LogicalPort)
Value, err := json.Marshal(gemPort)
@@ -1102,7 +1103,7 @@
log.Error("Failed to marshal data")
return
}
- if err = RsrcMgr.KVStore.Put(path, Value); err != nil {
+ if err = RsrcMgr.KVStore.Put(ctx, path, Value); err != nil {
log.Errorw("Failed to put to kvstore", log.Fields{"path": path, "value": gemPort})
return
}
@@ -1112,14 +1113,14 @@
}
// GetGemPortFromOnuPktIn gets the gem port from onu pkt in path, path being intfid, onuid, portno
-func (RsrcMgr *OpenOltResourceMgr) GetGemPortFromOnuPktIn(intfID uint32, onuID uint32, logicalPort uint32) (uint32, error) {
+func (RsrcMgr *OpenOltResourceMgr) GetGemPortFromOnuPktIn(ctx context.Context, intfID uint32, onuID uint32, logicalPort uint32) (uint32, error) {
var Val []byte
var gemPort uint32
path := fmt.Sprintf(OnuPacketINPath, intfID, onuID, logicalPort)
- value, err := RsrcMgr.KVStore.Get(path)
+ value, err := RsrcMgr.KVStore.Get(ctx, path)
if err != nil {
log.Errorw("Failed to get from kv store", log.Fields{"path": path})
return uint32(0), err
@@ -1142,10 +1143,10 @@
}
// DelGemPortPktIn deletes the gemport from the pkt in path
-func (RsrcMgr *OpenOltResourceMgr) DelGemPortPktIn(intfID uint32, onuID uint32, logicalPort uint32) error {
+func (RsrcMgr *OpenOltResourceMgr) DelGemPortPktIn(ctx context.Context, intfID uint32, onuID uint32, logicalPort uint32) error {
path := fmt.Sprintf(OnuPacketINPath, intfID, onuID, logicalPort)
- if err := RsrcMgr.KVStore.Delete(path); err != nil {
+ if err := RsrcMgr.KVStore.Delete(ctx, path); err != nil {
log.Errorf("Falied to remove resource %s", path)
return err
}
@@ -1153,8 +1154,8 @@
}
// DelOnuGemInfoForIntf deletes the onugem info from kvstore per interface
-func (RsrcMgr *OpenOltResourceMgr) DelOnuGemInfoForIntf(intfID uint32) error {
- if err := RsrcMgr.ResourceMgrs[intfID].DelOnuGemInfoForIntf(intfID); err != nil {
+func (RsrcMgr *OpenOltResourceMgr) DelOnuGemInfoForIntf(ctx context.Context, intfID uint32) error {
+ if err := RsrcMgr.ResourceMgrs[intfID].DelOnuGemInfoForIntf(ctx, intfID); err != nil {
log.Errorw("failed to delete onu gem info for", log.Fields{"intfid": intfID})
return err
}
@@ -1162,13 +1163,13 @@
}
//GetNNIFromKVStore gets NNi intfids from kvstore. path being per device
-func (RsrcMgr *OpenOltResourceMgr) GetNNIFromKVStore() ([]uint32, error) {
+func (RsrcMgr *OpenOltResourceMgr) GetNNIFromKVStore(ctx context.Context) ([]uint32, error) {
var nni []uint32
var Val []byte
path := fmt.Sprintf(NnniIntfID)
- value, err := RsrcMgr.KVStore.Get(path)
+ value, err := RsrcMgr.KVStore.Get(ctx, path)
if err != nil {
log.Error("failed to get data from kv store")
return nil, err
@@ -1187,10 +1188,10 @@
}
// AddNNIToKVStore adds Nni interfaces to kvstore, path being per device.
-func (RsrcMgr *OpenOltResourceMgr) AddNNIToKVStore(nniIntf uint32) error {
+func (RsrcMgr *OpenOltResourceMgr) AddNNIToKVStore(ctx context.Context, nniIntf uint32) error {
var Value []byte
- nni, err := RsrcMgr.GetNNIFromKVStore()
+ nni, err := RsrcMgr.GetNNIFromKVStore(ctx)
if err != nil {
log.Error("failed to fetch nni interfaces from kv store")
return err
@@ -1202,7 +1203,7 @@
if err != nil {
log.Error("Failed to marshal data")
}
- if err = RsrcMgr.KVStore.Put(path, Value); err != nil {
+ if err = RsrcMgr.KVStore.Put(ctx, path, Value); err != nil {
log.Errorw("Failed to put to kvstore", log.Fields{"path": path, "value": Value})
return err
}
@@ -1211,11 +1212,11 @@
}
// DelNNiFromKVStore deletes nni interface list from kv store.
-func (RsrcMgr *OpenOltResourceMgr) DelNNiFromKVStore() error {
+func (RsrcMgr *OpenOltResourceMgr) DelNNiFromKVStore(ctx context.Context) error {
path := fmt.Sprintf(NnniIntfID)
- if err := RsrcMgr.KVStore.Delete(path); err != nil {
+ if err := RsrcMgr.KVStore.Delete(ctx, path); err != nil {
log.Errorw("Failed to delete nni interfaces from kv store", log.Fields{"path": path})
return err
}
@@ -1223,11 +1224,11 @@
}
//UpdateFlowIDsForGem updates flow id per gemport
-func (RsrcMgr *OpenOltResourceMgr) UpdateFlowIDsForGem(intf uint32, gem uint32, flowIDs []uint32) error {
+func (RsrcMgr *OpenOltResourceMgr) UpdateFlowIDsForGem(ctx context.Context, intf uint32, gem uint32, flowIDs []uint32) error {
var val []byte
path := fmt.Sprintf(FlowIDsForGem, intf)
- flowsForGem, err := RsrcMgr.GetFlowIDsGemMapForInterface(intf)
+ flowsForGem, err := RsrcMgr.GetFlowIDsGemMapForInterface(ctx, intf)
if err != nil {
log.Error("Failed to ger flowids for interface", log.Fields{"error": err, "intf": intf})
return err
@@ -1241,7 +1242,7 @@
log.Error("Failed to marshal data", log.Fields{"error": err})
return err
}
- if err = RsrcMgr.KVStore.Put(path, val); err != nil {
+ if err = RsrcMgr.KVStore.Put(ctx, path, val); err != nil {
log.Errorw("Failed to put to kvstore", log.Fields{"error": err, "path": path, "value": val})
return err
}
@@ -1250,11 +1251,11 @@
}
//DeleteFlowIDsForGem deletes the flowID list entry per gem from kvstore.
-func (RsrcMgr *OpenOltResourceMgr) DeleteFlowIDsForGem(intf uint32, gem uint32) {
+func (RsrcMgr *OpenOltResourceMgr) DeleteFlowIDsForGem(ctx context.Context, intf uint32, gem uint32) {
path := fmt.Sprintf(FlowIDsForGem, intf)
var val []byte
- flowsForGem, err := RsrcMgr.GetFlowIDsGemMapForInterface(intf)
+ flowsForGem, err := RsrcMgr.GetFlowIDsGemMapForInterface(ctx, intf)
if err != nil {
log.Error("Failed to ger flowids for interface", log.Fields{"error": err, "intf": intf})
return
@@ -1271,7 +1272,7 @@
log.Error("Failed to marshal data", log.Fields{"error": err})
return
}
- if err = RsrcMgr.KVStore.Put(path, val); err != nil {
+ if err = RsrcMgr.KVStore.Put(ctx, path, val); err != nil {
log.Errorw("Failed to put to kvstore", log.Fields{"error": err, "path": path, "value": val})
return
}
@@ -1279,12 +1280,12 @@
}
//GetFlowIDsGemMapForInterface gets flowids per gemport and interface
-func (RsrcMgr *OpenOltResourceMgr) GetFlowIDsGemMapForInterface(intf uint32) (map[uint32][]uint32, error) {
+func (RsrcMgr *OpenOltResourceMgr) GetFlowIDsGemMapForInterface(ctx context.Context, intf uint32) (map[uint32][]uint32, error) {
path := fmt.Sprintf(FlowIDsForGem, intf)
var flowsForGem map[uint32][]uint32
var val []byte
- value, err := RsrcMgr.KVStore.Get(path)
+ value, err := RsrcMgr.KVStore.Get(ctx, path)
if err != nil {
log.Error("failed to get data from kv store")
return nil, err
@@ -1303,10 +1304,9 @@
}
//DeleteIntfIDGempMapPath deletes the intf id path used to store flow ids per gem to kvstore.
-func (RsrcMgr *OpenOltResourceMgr) DeleteIntfIDGempMapPath(intf uint32) {
-
+func (RsrcMgr *OpenOltResourceMgr) DeleteIntfIDGempMapPath(ctx context.Context, intf uint32) {
path := fmt.Sprintf(FlowIDsForGem, intf)
- if err := RsrcMgr.KVStore.Delete(path); err != nil {
+ if err := RsrcMgr.KVStore.Delete(ctx, path); err != nil {
log.Errorw("Failed to delete nni interfaces from kv store", log.Fields{"path": path})
return
}
@@ -1314,18 +1314,18 @@
}
// RemoveResourceMap Clear resource map associated with (intfid, onuid, uniid) tuple.
-func (RsrcMgr *OpenOltResourceMgr) RemoveResourceMap(intfID uint32, onuID int32, uniID int32) {
+func (RsrcMgr *OpenOltResourceMgr) RemoveResourceMap(ctx context.Context, intfID uint32, onuID int32, uniID int32) {
IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", intfID, onuID, uniID)
- RsrcMgr.ResourceMgrs[intfID].RemoveResourceMap(IntfOnuIDUniID)
+ RsrcMgr.ResourceMgrs[intfID].RemoveResourceMap(ctx, IntfOnuIDUniID)
}
//GetMcastQueuePerInterfaceMap gets multicast queue info per pon interface
-func (RsrcMgr *OpenOltResourceMgr) GetMcastQueuePerInterfaceMap() (map[uint32][]uint32, error) {
+func (RsrcMgr *OpenOltResourceMgr) GetMcastQueuePerInterfaceMap(ctx context.Context) (map[uint32][]uint32, error) {
path := fmt.Sprintf(McastQueuesForIntf)
var mcastQueueToIntfMap map[uint32][]uint32
var val []byte
- kvPair, err := RsrcMgr.KVStore.Get(path)
+ kvPair, err := RsrcMgr.KVStore.Get(ctx, path)
if err != nil {
log.Error("failed to get data from kv store")
return nil, err
@@ -1344,11 +1344,11 @@
}
//AddMcastQueueForIntf adds multicast queue for pon interface
-func (RsrcMgr *OpenOltResourceMgr) AddMcastQueueForIntf(intf uint32, gem uint32, servicePriority uint32) error {
+func (RsrcMgr *OpenOltResourceMgr) AddMcastQueueForIntf(ctx context.Context, intf uint32, gem uint32, servicePriority uint32) error {
var val []byte
path := fmt.Sprintf(McastQueuesForIntf)
- mcastQueues, err := RsrcMgr.GetMcastQueuePerInterfaceMap()
+ mcastQueues, err := RsrcMgr.GetMcastQueuePerInterfaceMap(ctx)
if err != nil {
log.Errorw("Failed to get multicast queue info for interface", log.Fields{"error": err, "intf": intf})
return err
@@ -1361,7 +1361,7 @@
log.Errorw("Failed to marshal data", log.Fields{"error": err})
return err
}
- if err = RsrcMgr.KVStore.Put(path, val); err != nil {
+ if err = RsrcMgr.KVStore.Put(ctx, path, val); err != nil {
log.Errorw("Failed to put to kvstore", log.Fields{"error": err, "path": path, "value": val})
return err
}
@@ -1370,7 +1370,7 @@
}
//AddFlowGroupToKVStore adds flow group into KV store
-func (RsrcMgr *OpenOltResourceMgr) AddFlowGroupToKVStore(groupEntry *ofp.OfpGroupEntry, cached bool) error {
+func (RsrcMgr *OpenOltResourceMgr) AddFlowGroupToKVStore(ctx context.Context, groupEntry *ofp.OfpGroupEntry, cached bool) error {
var Value []byte
var err error
var path string
@@ -1400,7 +1400,7 @@
return err
}
- if err = RsrcMgr.KVStore.Put(path, Value); err != nil {
+ if err = RsrcMgr.KVStore.Put(ctx, path, Value); err != nil {
log.Errorf("Failed to update resource %s", path)
return err
}
@@ -1408,14 +1408,14 @@
}
//RemoveFlowGroupFromKVStore removes flow group from KV store
-func (RsrcMgr *OpenOltResourceMgr) RemoveFlowGroupFromKVStore(groupID uint32, cached bool) bool {
+func (RsrcMgr *OpenOltResourceMgr) RemoveFlowGroupFromKVStore(ctx context.Context, groupID uint32, cached bool) bool {
var path string
if cached {
path = fmt.Sprintf(FlowGroupCached, groupID)
} else {
path = fmt.Sprintf(FlowGroup, groupID)
}
- if err := RsrcMgr.KVStore.Delete(path); err != nil {
+ if err := RsrcMgr.KVStore.Delete(ctx, path); err != nil {
log.Errorf("Failed to remove resource %s due to %s", path, err)
return false
}
@@ -1425,7 +1425,7 @@
//GetFlowGroupFromKVStore fetches flow group from the KV store. Returns (false, {} error) if any problem occurs during
//fetching the data. Returns (true, groupInfo, nil) if the group is fetched successfully.
// Returns (false, {}, nil) if the group does not exists in the KV store.
-func (RsrcMgr *OpenOltResourceMgr) GetFlowGroupFromKVStore(groupID uint32, cached bool) (bool, GroupInfo, error) {
+func (RsrcMgr *OpenOltResourceMgr) GetFlowGroupFromKVStore(ctx context.Context, groupID uint32, cached bool) (bool, GroupInfo, error) {
var groupInfo GroupInfo
var path string
if cached {
@@ -1433,7 +1433,7 @@
} else {
path = fmt.Sprintf(FlowGroup, groupID)
}
- kvPair, err := RsrcMgr.KVStore.Get(path)
+ kvPair, err := RsrcMgr.KVStore.Get(ctx, path)
if err != nil {
return false, groupInfo, err
}
diff --git a/adaptercore/resourcemanager/resourcemanager_test.go b/adaptercore/resourcemanager/resourcemanager_test.go
index 2cc6953..a5e97db 100644
--- a/adaptercore/resourcemanager/resourcemanager_test.go
+++ b/adaptercore/resourcemanager/resourcemanager_test.go
@@ -24,6 +24,7 @@
package resourcemanager
import (
+ "context"
"encoding/json"
"errors"
"github.com/opencord/voltha-lib-go/v3/pkg/db"
@@ -37,6 +38,7 @@
"strconv"
"strings"
"testing"
+ "time"
)
func init() {
@@ -117,12 +119,12 @@
}
// List function implemented for KVClient.
-func (kvclient *MockResKVClient) List(key string, timeout int) (map[string]*kvstore.KVPair, error) {
+func (kvclient *MockResKVClient) List(ctx context.Context, key string) (map[string]*kvstore.KVPair, error) { // mock: ctx and key are ignored; always returns an error
return nil, errors.New("key didn't find")
}
// Get mock function implementation for KVClient
-func (kvclient *MockResKVClient) Get(key string, timeout int) (*kvstore.KVPair, error) {
+func (kvclient *MockResKVClient) Get(ctx context.Context, key string) (*kvstore.KVPair, error) {
log.Debugw("Warning Warning Warning: Get of MockKVClient called", log.Fields{"key": key})
if key != "" {
if strings.Contains(key, MeterConfig) {
@@ -183,7 +185,7 @@
}
// Put mock function implementation for KVClient
-func (kvclient *MockResKVClient) Put(key string, value interface{}, timeout int) error {
+func (kvclient *MockResKVClient) Put(ctx context.Context, key string, value interface{}) error {
if key != "" {
return nil
}
@@ -191,37 +193,37 @@
}
// Delete mock function implementation for KVClient
-func (kvclient *MockResKVClient) Delete(key string, timeout int) error {
+func (kvclient *MockResKVClient) Delete(ctx context.Context, key string) error { // mock: no-op; ctx and key are ignored
return nil
}
// Reserve mock function implementation for KVClient
-func (kvclient *MockResKVClient) Reserve(key string, value interface{}, ttl int64) (interface{}, error) {
+func (kvclient *MockResKVClient) Reserve(ctx context.Context, key string, value interface{}, ttl int64) (interface{}, error) { // mock: all arguments are ignored; always returns an error
return nil, errors.New("key didn't find")
}
// ReleaseReservation mock function implementation for KVClient
-func (kvclient *MockResKVClient) ReleaseReservation(key string) error {
+func (kvclient *MockResKVClient) ReleaseReservation(ctx context.Context, key string) error { // mock: no-op; ctx and key are ignored
return nil
}
// ReleaseAllReservations mock function implementation for KVClient
-func (kvclient *MockResKVClient) ReleaseAllReservations() error {
+func (kvclient *MockResKVClient) ReleaseAllReservations(ctx context.Context) error { // mock: no-op; ctx is ignored
return nil
}
// RenewReservation mock function implementation for KVClient
-func (kvclient *MockResKVClient) RenewReservation(key string) error {
+func (kvclient *MockResKVClient) RenewReservation(ctx context.Context, key string) error { // mock: no-op; ctx and key are ignored
return nil
}
// Watch mock function implementation for KVClient
-func (kvclient *MockResKVClient) Watch(key string) chan *kvstore.Event {
+func (kvclient *MockResKVClient) Watch(ctx context.Context, key string) chan *kvstore.Event { // mock: ctx and key are ignored; returns a nil channel
return nil
}
// AcquireLock mock function implementation for KVClient
-func (kvclient *MockResKVClient) AcquireLock(lockName string, timeout int) error {
+func (kvclient *MockResKVClient) AcquireLock(ctx context.Context, lockName string, timeout int) error { // mock: no-op; ctx, lockName and timeout are ignored
return nil
}
@@ -231,7 +233,7 @@
}
// IsConnectionUp mock function implementation for KVClient
-func (kvclient *MockResKVClient) IsConnectionUp(timeout int) bool { // timeout in second
+func (kvclient *MockResKVClient) IsConnectionUp(ctx context.Context) bool { // mock: always reports the connection as up; ctx is ignored
return true
}
@@ -278,7 +280,9 @@
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- if got := NewResourceMgr(tt.args.deviceID, tt.args.KVStoreHostPort, tt.args.kvStoreType, tt.args.deviceType, tt.args.devInfo); reflect.TypeOf(got) != reflect.TypeOf(tt.want) {
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ if got := NewResourceMgr(ctx, tt.args.deviceID, tt.args.KVStoreHostPort, tt.args.kvStoreType, tt.args.deviceType, tt.args.devInfo); reflect.TypeOf(got) != reflect.TypeOf(tt.want) {
t.Errorf("NewResourceMgr() = %v, want %v", got, tt.want)
}
})
@@ -296,7 +300,9 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
RsrcMgr := testResMgrObject(tt.fields)
- if err := RsrcMgr.Delete(); (err != nil) && reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr) {
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ if err := RsrcMgr.Delete(ctx); (err != nil) && reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr) {
t.Errorf("Delete() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -320,7 +326,9 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
RsrcMgr := testResMgrObject(tt.fields)
- RsrcMgr.FreeFlowID(tt.args.IntfID, tt.args.onuID, tt.args.uniID, tt.args.FlowID)
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ RsrcMgr.FreeFlowID(ctx, tt.args.IntfID, tt.args.onuID, tt.args.uniID, tt.args.FlowID)
})
}
}
@@ -343,7 +351,9 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
RsrcMgr := testResMgrObject(tt.fields)
- RsrcMgr.FreeFlowIDs(tt.args.IntfID, tt.args.onuID, tt.args.uniID, tt.args.FlowID)
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ RsrcMgr.FreeFlowIDs(ctx, tt.args.IntfID, tt.args.onuID, tt.args.uniID, tt.args.FlowID)
})
}
}
@@ -364,7 +374,9 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
RsrcMgr := testResMgrObject(tt.fields)
- RsrcMgr.FreePONResourcesForONU(tt.args.intfID, tt.args.onuID, tt.args.uniID)
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ RsrcMgr.FreePONResourcesForONU(ctx, tt.args.intfID, tt.args.onuID, tt.args.uniID)
})
}
}
@@ -384,7 +396,9 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
RsrcMgr := testResMgrObject(tt.fields)
- RsrcMgr.FreeonuID(tt.args.intfID, tt.args.onuID)
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ RsrcMgr.FreeonuID(ctx, tt.args.intfID, tt.args.onuID)
})
}
}
@@ -407,7 +421,9 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
RsrcMgr := testResMgrObject(tt.fields)
- if got := RsrcMgr.GetAllocID(tt.args.intfID, tt.args.onuID, tt.args.uniID); reflect.TypeOf(got) != reflect.TypeOf(tt.want) {
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ if got := RsrcMgr.GetAllocID(ctx, tt.args.intfID, tt.args.onuID, tt.args.uniID); reflect.TypeOf(got) != reflect.TypeOf(tt.want) {
t.Errorf("GetAllocID() = %v, want %v", got, tt.want)
}
})
@@ -431,7 +447,9 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
RsrcMgr := testResMgrObject(tt.fields)
- if got := RsrcMgr.GetCurrentAllocIDsForOnu(tt.args.intfID, tt.args.onuID, tt.args.uniID); !reflect.DeepEqual(got, tt.want) {
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ if got := RsrcMgr.GetCurrentAllocIDsForOnu(ctx, tt.args.intfID, tt.args.onuID, tt.args.uniID); !reflect.DeepEqual(got, tt.want) {
t.Errorf("GetCurrentAllocIDsForOnu() = %v, want %v", got, tt.want)
}
})
@@ -456,7 +474,9 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
RsrcMgr := testResMgrObject(tt.fields)
- if got := RsrcMgr.GetCurrentFlowIDsForOnu(tt.args.PONIntfID, tt.args.ONUID, tt.args.UNIID); reflect.TypeOf(got) != reflect.TypeOf(tt.want) {
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ if got := RsrcMgr.GetCurrentFlowIDsForOnu(ctx, tt.args.PONIntfID, tt.args.ONUID, tt.args.UNIID); reflect.TypeOf(got) != reflect.TypeOf(tt.want) {
t.Errorf("GetCurrentFlowIDsForOnu() = %v, want %v", got, tt.want)
}
})
@@ -480,7 +500,9 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
RsrcMgr := testResMgrObject(tt.fields)
- if got := RsrcMgr.GetCurrentGEMPortIDsForOnu(tt.args.intfID, tt.args.onuID, tt.args.uniID); reflect.TypeOf(got) != reflect.TypeOf(tt.want) {
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ if got := RsrcMgr.GetCurrentGEMPortIDsForOnu(ctx, tt.args.intfID, tt.args.onuID, tt.args.uniID); reflect.TypeOf(got) != reflect.TypeOf(tt.want) {
t.Errorf("GetCurrentGEMPortIDsForOnu() = %v, want %v", got, tt.want)
}
})
@@ -511,7 +533,9 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
RsrcMgr := testResMgrObject(tt.fields)
- got, err := RsrcMgr.GetFlowID(tt.args.ponIntfID, tt.args.ONUID, tt.args.uniID, tt.args.gemportID, tt.args.flowStoreCookie, tt.args.flowCategory, tt.args.vlanPcp...)
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ got, err := RsrcMgr.GetFlowID(ctx, tt.args.ponIntfID, tt.args.ONUID, tt.args.uniID, tt.args.gemportID, tt.args.flowStoreCookie, tt.args.flowCategory, tt.args.vlanPcp...)
if err != nil && reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr) {
t.Errorf("GetFlowID() error = %v, wantErr %v", err, tt.wantErr)
return
@@ -543,7 +567,9 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
RsrcMgr := testResMgrObject(tt.fields)
- got, err := RsrcMgr.GetGEMPortID(tt.args.ponPort, tt.args.onuID, tt.args.uniID, tt.args.NumOfPorts)
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ got, err := RsrcMgr.GetGEMPortID(ctx, tt.args.ponPort, tt.args.onuID, tt.args.uniID, tt.args.NumOfPorts)
if reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr) && err != nil {
t.Errorf("GetGEMPortID() error = %v, wantErr %v", err, tt.wantErr)
return
@@ -578,7 +604,9 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
RsrcMgr := testResMgrObject(tt.fields)
- got, err := RsrcMgr.GetMeterIDForOnu(tt.args.Direction, tt.args.IntfID, tt.args.OnuID, tt.args.UniID, tt.args.tpID)
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ got, err := RsrcMgr.GetMeterIDForOnu(ctx, tt.args.Direction, tt.args.IntfID, tt.args.OnuID, tt.args.UniID, tt.args.tpID)
if reflect.TypeOf(got) != reflect.TypeOf(tt.want) && err != nil {
t.Errorf("GetMeterIDForOnu() got = %v, want %v", got, tt.want)
}
@@ -602,7 +630,9 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
RsrcMgr := testResMgrObject(tt.fields)
- got, err := RsrcMgr.GetONUID(tt.args.ponIntfID)
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ got, err := RsrcMgr.GetONUID(ctx, tt.args.ponIntfID)
if got != tt.want && err != nil {
t.Errorf("GetONUID() got = %v, want %v", got, tt.want)
}
@@ -629,7 +659,9 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
RsrcMgr := testResMgrObject(tt.fields)
- if got := RsrcMgr.GetTechProfileIDForOnu(tt.args.IntfID, tt.args.OnuID, tt.args.UniID); reflect.TypeOf(got) != reflect.TypeOf(tt.want) {
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ if got := RsrcMgr.GetTechProfileIDForOnu(ctx, tt.args.IntfID, tt.args.OnuID, tt.args.UniID); reflect.TypeOf(got) != reflect.TypeOf(tt.want) {
t.Errorf("GetTechProfileIDForOnu() = %v, want %v", got, tt.want)
}
})
@@ -654,7 +686,9 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
RsrcMgr := testResMgrObject(tt.fields)
- if got := RsrcMgr.IsFlowCookieOnKVStore(tt.args.ponIntfID, tt.args.onuID, tt.args.uniID, tt.args.flowStoreCookie); got != tt.want {
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ if got := RsrcMgr.IsFlowCookieOnKVStore(ctx, tt.args.ponIntfID, tt.args.onuID, tt.args.uniID, tt.args.flowStoreCookie); got != tt.want {
t.Errorf("IsFlowCookieOnKVStore() = %v, want %v", got, tt.want)
}
})
@@ -682,7 +716,9 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
RsrcMgr := testResMgrObject(tt.fields)
- if err := RsrcMgr.RemoveMeterIDForOnu(tt.args.Direction, tt.args.IntfID, tt.args.OnuID, tt.args.UniID,
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ if err := RsrcMgr.RemoveMeterIDForOnu(ctx, tt.args.Direction, tt.args.IntfID, tt.args.OnuID, tt.args.UniID,
tt.args.tpID); reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr) && err != nil {
t.Errorf("RemoveMeterIDForOnu() error = %v, wantErr %v", err, tt.wantErr)
}
@@ -709,7 +745,9 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
RsrcMgr := testResMgrObject(tt.fields)
- if err := RsrcMgr.RemoveTechProfileIDForOnu(tt.args.IntfID, tt.args.OnuID, tt.args.UniID,
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ if err := RsrcMgr.RemoveTechProfileIDForOnu(ctx, tt.args.IntfID, tt.args.OnuID, tt.args.UniID,
tt.args.tpID); reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr) && err != nil {
t.Errorf("RemoveTechProfileIDForOnu() error = %v, wantErr %v", err, tt.wantErr)
}
@@ -736,7 +774,9 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
RsrcMgr := testResMgrObject(tt.fields)
- if err := RsrcMgr.UpdateAllocIdsForOnu(tt.args.ponPort, tt.args.onuID, tt.args.uniID, tt.args.allocID); err != nil && reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr) {
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ if err := RsrcMgr.UpdateAllocIdsForOnu(ctx, tt.args.ponPort, tt.args.onuID, tt.args.uniID, tt.args.allocID); err != nil && reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr) {
t.Errorf("UpdateAllocIdsForOnu() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -762,7 +802,9 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
RsrcMgr := testResMgrObject(tt.fields)
- if err := RsrcMgr.UpdateFlowIDInfo(tt.args.ponIntfID, tt.args.onuID, tt.args.uniID, tt.args.flowID, tt.args.flowData); err != nil && reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr) {
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ if err := RsrcMgr.UpdateFlowIDInfo(ctx, tt.args.ponIntfID, tt.args.onuID, tt.args.uniID, tt.args.flowID, tt.args.flowData); err != nil && reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr) {
t.Errorf("UpdateFlowIDInfo() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -789,7 +831,9 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
RsrcMgr := testResMgrObject(tt.fields)
- if err := RsrcMgr.UpdateGEMPortIDsForOnu(tt.args.ponPort, tt.args.onuID, tt.args.uniID, tt.args.GEMPortList); err != nil && reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr) {
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ if err := RsrcMgr.UpdateGEMPortIDsForOnu(ctx, tt.args.ponPort, tt.args.onuID, tt.args.uniID, tt.args.GEMPortList); err != nil && reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr) {
t.Errorf("UpdateGEMPortIDsForOnu() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -815,7 +859,9 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
RsrcMgr := testResMgrObject(tt.fields)
- if err := RsrcMgr.UpdateGEMportsPonportToOnuMapOnKVStore(tt.args.gemPorts, tt.args.PonPort,
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ if err := RsrcMgr.UpdateGEMportsPonportToOnuMapOnKVStore(ctx, tt.args.gemPorts, tt.args.PonPort,
tt.args.onuID, tt.args.uniID); err != nil && reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr) {
t.Errorf("UpdateGEMportsPonportToOnuMapOnKVStore() error = %v, wantErr %v", err, tt.wantErr)
}
@@ -844,7 +890,9 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
RsrcMgr := testResMgrObject(tt.fields)
- if err := RsrcMgr.UpdateMeterIDForOnu(tt.args.Direction, tt.args.IntfID, tt.args.OnuID, tt.args.UniID,
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ if err := RsrcMgr.UpdateMeterIDForOnu(ctx, tt.args.Direction, tt.args.IntfID, tt.args.OnuID, tt.args.UniID,
tt.args.tpID, tt.args.MeterConfig); reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr) && err != nil {
t.Errorf("UpdateMeterIDForOnu() got = %v, want %v", err, tt.wantErr)
}
@@ -871,7 +919,9 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
RsrcMgr := testResMgrObject(tt.fields)
- if err := RsrcMgr.UpdateTechProfileIDForOnu(tt.args.IntfID, tt.args.OnuID, tt.args.UniID, tt.args.TpID); reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr) && err != nil {
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ if err := RsrcMgr.UpdateTechProfileIDForOnu(ctx, tt.args.IntfID, tt.args.OnuID, tt.args.UniID, tt.args.TpID); reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr) && err != nil {
t.Errorf("UpdateTechProfileIDForOnu() got = %v, want %v", err, tt.wantErr)
}
})
@@ -1004,7 +1054,9 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
RsrcMgr := testResMgrObject(tt.fields)
- err := RsrcMgr.AddMcastQueueForIntf(tt.args.intf, tt.args.gem, tt.args.servicePriority)
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ err := RsrcMgr.AddMcastQueueForIntf(ctx, tt.args.intf, tt.args.gem, tt.args.servicePriority)
if err != nil {
t.Errorf("%s got err= %s wants nil", tt.name, err)
return
@@ -1054,7 +1106,9 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
RsrcMgr := testResMgrObject(tt.fields)
- err := RsrcMgr.AddFlowGroupToKVStore(tt.args.group, tt.args.cached)
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ err := RsrcMgr.AddFlowGroupToKVStore(ctx, tt.args.group, tt.args.cached)
if err != nil {
t.Errorf("%s got err= %s wants nil", tt.name, err)
return
@@ -1081,7 +1135,9 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
RsrcMgr := testResMgrObject(tt.fields)
- success := RsrcMgr.RemoveFlowGroupFromKVStore(tt.args.groupID, tt.args.cached)
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ success := RsrcMgr.RemoveFlowGroupFromKVStore(ctx, tt.args.groupID, tt.args.cached)
if !success {
t.Errorf("%s got false but wants true", tt.name)
return
@@ -1109,7 +1165,9 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
RsrcMgr := testResMgrObject(tt.fields)
- exists, groupInfo, err := RsrcMgr.GetFlowGroupFromKVStore(tt.args.groupID, tt.args.cached)
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ exists, groupInfo, err := RsrcMgr.GetFlowGroupFromKVStore(ctx, tt.args.groupID, tt.args.cached)
if err != nil {
t.Errorf("%s got error but wants nil error", tt.name)
return