VOL-4077: Improve storage usage on etcd
- Do away with unnecessary data storage on etcd if it can be
reconciled on adapter restart
- For data that needs storage, use lesser footprint if possible
- Use write-through-cache for all data stored on etcd via
resource manager module
- Use ResourceManager module per interface to localize lock
contention per PON port
Change-Id: I21d38216fab195d738a446b3f96a00251569e38b
diff --git a/internal/pkg/core/common.go b/internal/pkg/core/common.go
index f959b7f..43f0047 100644
--- a/internal/pkg/core/common.go
+++ b/internal/pkg/core/common.go
@@ -18,7 +18,7 @@
package core
import (
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
)
var logger log.CLogger
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 7f2b53e..4243a26 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -35,12 +35,12 @@
"github.com/golang/protobuf/ptypes"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
- "github.com/opencord/voltha-lib-go/v4/pkg/adapters/adapterif"
- "github.com/opencord/voltha-lib-go/v4/pkg/config"
- "github.com/opencord/voltha-lib-go/v4/pkg/events/eventif"
- flow_utils "github.com/opencord/voltha-lib-go/v4/pkg/flows"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
- "github.com/opencord/voltha-lib-go/v4/pkg/pmmetrics"
+ "github.com/opencord/voltha-lib-go/v5/pkg/adapters/adapterif"
+ "github.com/opencord/voltha-lib-go/v5/pkg/config"
+ "github.com/opencord/voltha-lib-go/v5/pkg/events/eventif"
+ flow_utils "github.com/opencord/voltha-lib-go/v5/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/pmmetrics"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
rsrcMgr "github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager"
@@ -82,7 +82,9 @@
flowMgr []*OpenOltFlowMgr
groupMgr *OpenOltGroupMgr
eventMgr *OpenOltEventMgr
- resourceMgr *rsrcMgr.OpenOltResourceMgr
+ resourceMgr []*rsrcMgr.OpenOltResourceMgr
+
+ deviceInfo *oop.DeviceInfo
discOnus sync.Map
onus sync.Map
@@ -528,9 +530,6 @@
_ = olterrors.NewErrAdapter("handle-indication-error", log.Fields{"type": "interface-oper-nni", "device-id": dh.device.Id}, err).Log()
}
}()
- if err := dh.resourceMgr.AddNNIToKVStore(ctx, intfOperInd.GetIntfId()); err != nil {
- logger.Warn(ctx, err)
- }
} else if intfOperInd.GetType() == "pon" {
// TODO: Check what needs to be handled here for When PON PORT down, ONU will be down
// Handle pon port update
@@ -826,29 +825,35 @@
}
func (dh *DeviceHandler) initializeDeviceHandlerModules(ctx context.Context) error {
- deviceInfo, err := dh.populateDeviceInfo(ctx)
+ var err error
+ dh.deviceInfo, err = dh.populateDeviceInfo(ctx)
if err != nil {
return olterrors.NewErrAdapter("populate-device-info-failed", log.Fields{"device-id": dh.device.Id}, err)
}
- dh.totalPonPorts = deviceInfo.GetPonPorts()
- dh.agentPreviouslyConnected = deviceInfo.PreviouslyConnected
+ dh.totalPonPorts = dh.deviceInfo.GetPonPorts()
+ dh.agentPreviouslyConnected = dh.deviceInfo.PreviouslyConnected
- // Instantiate resource manager
- if dh.resourceMgr = rsrcMgr.NewResourceMgr(ctx, dh.device.Id, dh.openOLT.KVStoreAddress, dh.openOLT.KVStoreType, dh.device.Type, deviceInfo, dh.cm.Backend.PathPrefix); dh.resourceMgr == nil {
- return olterrors.ErrResourceManagerInstantiating
- }
-
- dh.groupMgr = NewGroupManager(ctx, dh, dh.resourceMgr)
-
+ dh.resourceMgr = make([]*rsrcMgr.OpenOltResourceMgr, dh.totalPonPorts)
dh.flowMgr = make([]*OpenOltFlowMgr, dh.totalPonPorts)
- for i := range dh.flowMgr {
- // Instantiate flow manager
- if dh.flowMgr[i] = NewFlowManager(ctx, dh, dh.resourceMgr, dh.groupMgr, uint32(i)); dh.flowMgr[i] == nil {
+ var i uint32
+ for i = 0; i < dh.totalPonPorts; i++ {
+ // Instantiate resource manager
+ if dh.resourceMgr[i] = rsrcMgr.NewResourceMgr(ctx, i, dh.device.Id, dh.openOLT.KVStoreAddress, dh.openOLT.KVStoreType, dh.device.Type, dh.deviceInfo, dh.cm.Backend.PathPrefix); dh.resourceMgr[i] == nil {
return olterrors.ErrResourceManagerInstantiating
}
}
-
+ // GroupManager instance is per OLT. But it needs a reference to any instance of resourceMgr to interface with
+ // the KV store to manage mcast group data. Provide the first instance (0th index)
+ if dh.groupMgr = NewGroupManager(ctx, dh, dh.resourceMgr[0]); dh.groupMgr == nil {
+ return olterrors.ErrGroupManagerInstantiating
+ }
+ for i = 0; i < dh.totalPonPorts; i++ {
+ // Instantiate flow manager
+ if dh.flowMgr[i] = NewFlowManager(ctx, dh, dh.resourceMgr[i], dh.groupMgr, i); dh.flowMgr[i] == nil {
+ return olterrors.ErrFlowManagerInstantiating
+ }
+ }
/* TODO: Instantiate Alarm , stats , BW managers */
/* Instantiating Event Manager to handle Alarms and KPIs */
dh.eventMgr = NewEventMgr(dh.EventProxy, dh)
@@ -913,7 +918,7 @@
ports, err := dh.coreProxy.ListDevicePorts(log.WithSpanFromContext(context.Background(), ctx), dh.device.Id)
if err != nil {
- logger.Warnw(ctx, "failed-to-list-ports", log.Fields{"device-id": dh.device.Id, "error": err})
+ logger.Warnw(ctx, "failed-to-list-ports", log.Fields{"device-id": dh.device.Id, "err": err})
continue
}
for _, port := range ports {
@@ -935,10 +940,9 @@
}
logger.Debugw(ctx, "publish-pon-metrics", log.Fields{"pon-port": port.Label})
- //ONU & Gem Stats
- onuGemInfo := dh.flowMgr[intfID].onuGemInfo
- if len(onuGemInfo) != 0 {
- go dh.portStats.collectOnuAndGemStats(ctx, onuGemInfo)
+ onuGemInfoLst := dh.flowMgr[intfID].getOnuGemInfoList()
+ if len(onuGemInfoLst) > 0 {
+ go dh.portStats.collectOnuAndGemStats(ctx, onuGemInfoLst)
}
}
}
@@ -978,6 +982,15 @@
}, nil
}
+// GetInterAdapterTechProfileDownloadMessage fetches the TechProfileDownloadMessage for the caller.
+func (dh *DeviceHandler) GetInterAdapterTechProfileDownloadMessage(ctx context.Context, tpPath string, ponPortNum uint32, onuID uint32, uniID uint32) *ic.InterAdapterTechProfileDownloadMessage {
+ ifID, err := IntfIDFromPonPortNum(ctx, ponPortNum)
+ if err != nil {
+ return nil
+ }
+ return dh.flowMgr[ifID].getTechProfileDownloadMessage(ctx, tpPath, ifID, onuID, uniID)
+}
+
func (dh *DeviceHandler) omciIndication(ctx context.Context, omciInd *oop.OmciIndication) error {
logger.Debugw(ctx, "omci-indication", log.Fields{"intf-id": omciInd.IntfId, "onu-id": omciInd.OnuId, "device-id": dh.device.Id})
var deviceType string
@@ -1038,44 +1051,47 @@
func (dh *DeviceHandler) ProcessInterAdapterMessage(ctx context.Context, msg *ic.InterAdapterMessage) error {
logger.Debugw(ctx, "process-inter-adapter-message", log.Fields{"msgID": msg.Header.Id})
if msg.Header.Type == ic.InterAdapterMessageType_OMCI_REQUEST {
- msgID := msg.Header.Id
- fromTopic := msg.Header.FromTopic
- toTopic := msg.Header.ToTopic
- toDeviceID := msg.Header.ToDeviceId
- proxyDeviceID := msg.Header.ProxyDeviceId
+ return dh.handleInterAdapterOmciMsg(ctx, msg)
+ }
+ return olterrors.NewErrInvalidValue(log.Fields{"inter-adapter-message-type": msg.Header.Type}, nil)
+}
- logger.Debugw(ctx, "omci-request-message-header", log.Fields{"msgID": msgID, "fromTopic": fromTopic, "toTopic": toTopic, "toDeviceID": toDeviceID, "proxyDeviceID": proxyDeviceID})
+func (dh *DeviceHandler) handleInterAdapterOmciMsg(ctx context.Context, msg *ic.InterAdapterMessage) error {
+ msgID := msg.Header.Id
+ fromTopic := msg.Header.FromTopic
+ toTopic := msg.Header.ToTopic
+ toDeviceID := msg.Header.ToDeviceId
+ proxyDeviceID := msg.Header.ProxyDeviceId
- msgBody := msg.GetBody()
+ logger.Debugw(ctx, "omci-request-message-header", log.Fields{"msgID": msgID, "fromTopic": fromTopic, "toTopic": toTopic, "toDeviceID": toDeviceID, "proxyDeviceID": proxyDeviceID})
- omciMsg := &ic.InterAdapterOmciMessage{}
- if err := ptypes.UnmarshalAny(msgBody, omciMsg); err != nil {
- return olterrors.NewErrAdapter("cannot-unmarshal-omci-msg-body", log.Fields{"msgbody": msgBody}, err)
+ msgBody := msg.GetBody()
+
+ omciMsg := &ic.InterAdapterOmciMessage{}
+ if err := ptypes.UnmarshalAny(msgBody, omciMsg); err != nil {
+ return olterrors.NewErrAdapter("cannot-unmarshal-omci-msg-body", log.Fields{"msgbody": msgBody}, err)
+ }
+
+ if omciMsg.GetProxyAddress() == nil {
+ onuDevice, err := dh.coreProxy.GetDevice(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, toDeviceID)
+ if err != nil {
+ return olterrors.NewErrNotFound("onu", log.Fields{
+ "device-id": dh.device.Id,
+ "onu-device-id": toDeviceID}, err)
}
-
- if omciMsg.GetProxyAddress() == nil {
- onuDevice, err := dh.coreProxy.GetDevice(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, toDeviceID)
- if err != nil {
- return olterrors.NewErrNotFound("onu", log.Fields{
- "device-id": dh.device.Id,
- "onu-device-id": toDeviceID}, err)
- }
- logger.Debugw(ctx, "device-retrieved-from-core", log.Fields{"msgID": msgID, "fromTopic": fromTopic, "toTopic": toTopic, "toDeviceID": toDeviceID, "proxyDeviceID": proxyDeviceID})
- if err := dh.sendProxiedMessage(ctx, onuDevice, omciMsg); err != nil {
- return olterrors.NewErrCommunication("send-failed", log.Fields{
- "device-id": dh.device.Id,
- "onu-device-id": toDeviceID}, err)
- }
- } else {
- logger.Debugw(ctx, "proxy-address-found-in-omci-message", log.Fields{"msgID": msgID, "fromTopic": fromTopic, "toTopic": toTopic, "toDeviceID": toDeviceID, "proxyDeviceID": proxyDeviceID})
- if err := dh.sendProxiedMessage(ctx, nil, omciMsg); err != nil {
- return olterrors.NewErrCommunication("send-failed", log.Fields{
- "device-id": dh.device.Id,
- "onu-device-id": toDeviceID}, err)
- }
+ logger.Debugw(ctx, "device-retrieved-from-core", log.Fields{"msgID": msgID, "fromTopic": fromTopic, "toTopic": toTopic, "toDeviceID": toDeviceID, "proxyDeviceID": proxyDeviceID})
+ if err := dh.sendProxiedMessage(ctx, onuDevice, omciMsg); err != nil {
+ return olterrors.NewErrCommunication("send-failed", log.Fields{
+ "device-id": dh.device.Id,
+ "onu-device-id": toDeviceID}, err)
}
} else {
- return olterrors.NewErrInvalidValue(log.Fields{"inter-adapter-message-type": msg.Header.Type}, nil)
+ logger.Debugw(ctx, "proxy-address-found-in-omci-message", log.Fields{"msgID": msgID, "fromTopic": fromTopic, "toTopic": toTopic, "toDeviceID": toDeviceID, "proxyDeviceID": proxyDeviceID})
+ if err := dh.sendProxiedMessage(ctx, nil, omciMsg); err != nil {
+ return olterrors.NewErrCommunication("send-failed", log.Fields{
+ "device-id": dh.device.Id,
+ "onu-device-id": toDeviceID}, err)
+ }
}
return nil
}
@@ -1129,7 +1145,6 @@
if err := dh.flowMgr[intfID].UpdateOnuInfo(ctx, intfID, uint32(onuID), serialNumber); err != nil {
return olterrors.NewErrAdapter("onu-activate-failed", log.Fields{"onu": onuID, "intf-id": intfID}, err)
}
- // TODO: need resource manager
var pir uint32 = 1000000
Onu := oop.Onu{IntfId: intfID, OnuId: uint32(onuID), SerialNumber: serialNum, Pir: pir, OmccEncryption: dh.openOLT.config.OmccEncryption}
if _, err := dh.Client.ActivateOnu(ctx, &Onu); err != nil {
@@ -1182,7 +1197,7 @@
alarmInd.LosStatus = statusCheckOff
go func() {
if err := dh.eventMgr.onuAlarmIndication(ctx, &alarmInd, onuInCache.(*OnuDevice).deviceID, raisedTs); err != nil {
- logger.Debugw(ctx, "indication-failed", log.Fields{"error": err})
+ logger.Debugw(ctx, "indication-failed", log.Fields{"err": err})
}
}()
}
@@ -1220,7 +1235,7 @@
logger.Debugw(ctx, "creating-new-onu", log.Fields{"sn": sn})
// we need to create a new ChildDevice
ponintfid := onuDiscInd.GetIntfId()
- onuID, err = dh.resourceMgr.GetONUID(ctx, ponintfid)
+ onuID, err = dh.resourceMgr[ponintfid].GetONUID(ctx, ponintfid)
logger.Infow(ctx, "creating-new-onu-got-onu-id", log.Fields{"sn": sn, "onuId": onuID})
@@ -1236,13 +1251,13 @@
if onuDevice, err = dh.coreProxy.ChildDeviceDetected(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, int(parentPortNo),
"", int(channelID), string(onuDiscInd.SerialNumber.GetVendorId()), sn, int64(onuID)); err != nil {
dh.discOnus.Delete(sn)
- dh.resourceMgr.FreeonuID(ctx, ponintfid, []uint32{onuID}) // NOTE I'm not sure this method is actually cleaning up the right thing
+ dh.resourceMgr[ponintfid].FreeonuID(ctx, ponintfid, []uint32{onuID}) // NOTE I'm not sure this method is actually cleaning up the right thing
return olterrors.NewErrAdapter("core-proxy-child-device-detected-failed", log.Fields{
"pon-intf-id": ponintfid,
"serial-number": sn}, err)
}
if err := dh.eventMgr.OnuDiscoveryIndication(ctx, onuDiscInd, dh.device.Id, onuDevice.Id, onuID, sn, time.Now().Unix()); err != nil {
- logger.Warnw(ctx, "discovery-indication-failed", log.Fields{"error": err})
+ logger.Warnw(ctx, "discovery-indication-failed", log.Fields{"err": err})
}
logger.Infow(ctx, "onu-child-device-added",
log.Fields{"onuDevice": onuDevice,
@@ -1341,7 +1356,7 @@
}
if onuInd.OperState == "down" && onuInd.FailReason != oop.OnuIndication_ONU_ACTIVATION_FAIL_REASON_NONE {
if err := dh.eventMgr.onuActivationIndication(ctx, onuActivationFailEvent, onuInd, dh.device.Id, time.Now().Unix()); err != nil {
- logger.Warnw(ctx, "onu-activation-indication-reporting-failed", log.Fields{"error": err})
+ logger.Warnw(ctx, "onu-activation-indication-reporting-failed", log.Fields{"err": err})
}
}
if err := dh.updateOnuStates(ctx, onuDevice, onuInd); err != nil {
@@ -1614,7 +1629,7 @@
//get the child device for the parent device
onuDevices, err := dh.coreProxy.GetChildDevices(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id)
if err != nil {
- logger.Errorw(ctx, "failed-to-get-child-devices-information", log.Fields{"device-id": dh.device.Id, "error": err})
+ logger.Errorw(ctx, "failed-to-get-child-devices-information", log.Fields{"device-id": dh.device.Id, "err": err})
}
if onuDevices != nil {
for _, onuDevice := range onuDevices.Items {
@@ -1679,57 +1694,29 @@
logger.Debugw(ctx, "failed-to-remove-tech-profile-instance-for-onu", log.Fields{"onu-id": onu.OnuID})
}
logger.Debugw(ctx, "deleted-tech-profile-instance-for-onu", log.Fields{"onu-id": onu.OnuID})
- tpIDList := dh.resourceMgr.GetTechProfileIDForOnu(ctx, onu.IntfID, onu.OnuID, uniID)
+ tpIDList := dh.resourceMgr[onu.IntfID].GetTechProfileIDForOnu(ctx, onu.IntfID, onu.OnuID, uniID)
for _, tpID := range tpIDList {
- if err = dh.resourceMgr.RemoveMeterInfoForOnu(ctx, "upstream", onu.IntfID, onu.OnuID, uniID, tpID); err != nil {
+ if err = dh.resourceMgr[onu.IntfID].RemoveMeterInfoForOnu(ctx, "upstream", onu.IntfID, onu.OnuID, uniID, tpID); err != nil {
logger.Debugw(ctx, "failed-to-remove-meter-id-for-onu-upstream", log.Fields{"onu-id": onu.OnuID})
}
logger.Debugw(ctx, "removed-meter-id-for-onu-upstream", log.Fields{"onu-id": onu.OnuID})
- if err = dh.resourceMgr.RemoveMeterInfoForOnu(ctx, "downstream", onu.IntfID, onu.OnuID, uniID, tpID); err != nil {
+ if err = dh.resourceMgr[onu.IntfID].RemoveMeterInfoForOnu(ctx, "downstream", onu.IntfID, onu.OnuID, uniID, tpID); err != nil {
logger.Debugw(ctx, "failed-to-remove-meter-id-for-onu-downstream", log.Fields{"onu-id": onu.OnuID})
}
logger.Debugw(ctx, "removed-meter-id-for-onu-downstream", log.Fields{"onu-id": onu.OnuID})
}
- dh.resourceMgr.FreePONResourcesForONU(ctx, onu.IntfID, onu.OnuID, uniID)
- if err = dh.resourceMgr.RemoveTechProfileIDsForOnu(ctx, onu.IntfID, onu.OnuID, uniID); err != nil {
+ dh.resourceMgr[onu.IntfID].FreePONResourcesForONU(ctx, onu.IntfID, onu.OnuID, uniID)
+ if err = dh.resourceMgr[onu.IntfID].RemoveTechProfileIDsForOnu(ctx, onu.IntfID, onu.OnuID, uniID); err != nil {
logger.Debugw(ctx, "failed-to-remove-tech-profile-id-for-onu", log.Fields{"onu-id": onu.OnuID})
}
logger.Debugw(ctx, "removed-tech-profile-id-for-onu", log.Fields{"onu-id": onu.OnuID})
- if err = dh.resourceMgr.DeletePacketInGemPortForOnu(ctx, onu.IntfID, onu.OnuID, port); err != nil {
+ if err = dh.resourceMgr[onu.IntfID].DeletePacketInGemPortForOnu(ctx, onu.IntfID, onu.OnuID, port); err != nil {
logger.Debugw(ctx, "failed-to-remove-gemport-pkt-in", log.Fields{"intfid": onu.IntfID, "onuid": onu.OnuID, "uniId": uniID})
}
- if err = dh.resourceMgr.RemoveAllFlowsForIntfOnuUniKey(ctx, onu.IntfID, int32(onu.OnuID), int32(uniID)); err != nil {
- logger.Debugw(ctx, "failed-to-remove-flow-for", log.Fields{"intfid": onu.IntfID, "onuid": onu.OnuID, "uniId": uniID})
- }
}
return nil
}
-func (dh *DeviceHandler) clearNNIData(ctx context.Context) error {
- nniUniID := -1
- nniOnuID := -1
-
- if dh.resourceMgr == nil {
- return olterrors.NewErrNotFound("resource-manager", log.Fields{"device-id": dh.device.Id}, nil)
- }
- //Free the flow-ids for the NNI port
- nni, err := dh.resourceMgr.GetNNIFromKVStore(ctx)
- if err != nil {
- return olterrors.NewErrPersistence("get", "nni", 0, nil, err)
- }
- logger.Debugw(ctx, "nni-", log.Fields{"nni": nni})
- for _, nniIntfID := range nni {
- dh.resourceMgr.RemoveResourceMap(ctx, nniIntfID, int32(nniOnuID), int32(nniUniID))
- _ = dh.resourceMgr.RemoveAllFlowsForIntfOnuUniKey(ctx, nniIntfID, -1, -1)
-
- }
- if err = dh.resourceMgr.DelNNiFromKVStore(ctx); err != nil {
- return olterrors.NewErrPersistence("clear", "nni", 0, nil, err)
- }
-
- return nil
-}
-
// DeleteDevice deletes the device instance from openolt handler array. Also clears allocated resource manager resources. Also reboots the OLT hardware!
func (dh *DeviceHandler) DeleteDevice(ctx context.Context, device *voltha.Device) error {
logger.Debug(ctx, "function-entry-delete-device")
@@ -1767,13 +1754,8 @@
if dh.resourceMgr != nil {
var ponPort uint32
for ponPort = 0; ponPort < dh.totalPonPorts; ponPort++ {
- var onuGemData []rsrcMgr.OnuGemInfo
- err := dh.resourceMgr.ResourceMgrs[ponPort].GetOnuGemInfo(ctx, ponPort, &onuGemData)
- if err != nil {
- _ = olterrors.NewErrNotFound("onu", log.Fields{
- "device-id": dh.device.Id,
- "pon-port": ponPort}, err).Log()
- }
+ var err error
+ onuGemData := dh.flowMgr[ponPort].getOnuGemInfoList()
for i, onu := range onuGemData {
onuID := make([]uint32, 1)
logger.Debugw(ctx, "onu-data", log.Fields{"onu": onu})
@@ -1782,31 +1764,22 @@
}
// Clear flowids for gem cache.
for _, gem := range onu.GemPorts {
- dh.resourceMgr.DeleteFlowIDsForGem(ctx, ponPort, gem)
+ dh.resourceMgr[ponPort].DeleteFlowIDsForGem(ctx, ponPort, gem)
}
onuID[0] = onu.OnuID
- dh.resourceMgr.FreeonuID(ctx, ponPort, onuID)
+ dh.resourceMgr[ponPort].FreeonuID(ctx, ponPort, onuID)
+ err = dh.resourceMgr[ponPort].DelOnuGemInfo(ctx, ponPort, onu.OnuID)
+ if err != nil {
+ logger.Errorw(ctx, "failed-to-update-onugem-info", log.Fields{"intfid": ponPort, "onugeminfo": onuGemData})
+ }
}
- dh.resourceMgr.DeleteIntfIDGempMapPath(ctx, ponPort)
- onuGemData = nil
- err = dh.resourceMgr.DelOnuGemInfoForIntf(ctx, ponPort)
- if err != nil {
- logger.Errorw(ctx, "failed-to-update-onugem-info", log.Fields{"intfid": ponPort, "onugeminfo": onuGemData})
- }
+ /* Clear the resource pool for each PON port in the background */
+ go func(ponPort uint32) {
+ if err := dh.resourceMgr[ponPort].Delete(ctx, ponPort); err != nil {
+ logger.Debug(ctx, err)
+ }
+ }(ponPort)
}
- /* Clear the flows from KV store associated with NNI port.
- There are mostly trap rules from NNI port (like LLDP)
- */
- if err := dh.clearNNIData(ctx); err != nil {
- logger.Errorw(ctx, "failed-to-clear-data-for-NNI-port", log.Fields{"device-id": dh.device.Id})
- }
-
- /* Clear the resource pool for each PON port in the background */
- go func() {
- if err := dh.resourceMgr.Delete(ctx); err != nil {
- logger.Debug(ctx, err)
- }
- }()
}
/*Delete ONU map for the device*/
@@ -2190,56 +2163,48 @@
}
onu := &oop.Onu{IntfId: intfID, OnuId: onuID, SerialNumber: sn}
+ //clear PON resources associated with ONU
+ onuGem, err := dh.resourceMgr[intfID].GetOnuGemInfo(ctx, intfID, onuID)
+ if err != nil || onuGem == nil || onuGem.OnuID != onuID {
+ logger.Warnw(ctx, "failed-to-get-onu-info-for-pon-port", log.Fields{
+ "device-id": dh.device.Id,
+ "intf-id": intfID,
+ "onuID": onuID,
+ "err": err})
+ } else {
+ logger.Debugw(ctx, "onu-data", log.Fields{"onu": onu})
+ if err := dh.clearUNIData(ctx, onuGem); err != nil {
+ logger.Warnw(ctx, "failed-to-clear-uni-data-for-onu", log.Fields{
+ "device-id": dh.device.Id,
+ "onu-device": onu,
+ "err": err})
+ }
+ // Clear flowids for gem cache.
+ for _, gem := range onuGem.GemPorts {
+ dh.resourceMgr[intfID].DeleteFlowIDsForGem(ctx, intfID, gem)
+ }
+ err := dh.resourceMgr[intfID].DelOnuGemInfo(ctx, intfID, onuID)
+ if err != nil {
+ logger.Warnw(ctx, "persistence-update-onu-gem-info-failed", log.Fields{
+ "intf-id": intfID,
+ "onu-device": onu,
+ "onu-gem": onuGem,
+ "err": err})
+ //Not returning error on cleanup.
+ }
+ logger.Debugw(ctx, "removed-onu-gem-info", log.Fields{"intf": intfID, "onu-device": onu, "onugem": onuGem})
+ dh.resourceMgr[intfID].FreeonuID(ctx, intfID, []uint32{onuGem.OnuID})
+ }
+ dh.onus.Delete(onuKey)
+ dh.discOnus.Delete(onuSn)
+
+ // Now clear the ONU on the OLT
if _, err := dh.Client.DeleteOnu(log.WithSpanFromContext(context.Background(), ctx), onu); err != nil {
return olterrors.NewErrAdapter("failed-to-delete-onu", log.Fields{
"device-id": dh.device.Id,
"onu-id": onuID}, err).Log()
}
- //clear PON resources associated with ONU
- var onuGemData []rsrcMgr.OnuGemInfo
- if onuMgr, ok := dh.resourceMgr.ResourceMgrs[intfID]; !ok {
- logger.Warnw(ctx, "failed-to-get-resource-manager-for-interface-Id", log.Fields{
- "device-id": dh.device.Id,
- "intf-id": intfID})
- } else {
- if err := onuMgr.GetOnuGemInfo(ctx, intfID, &onuGemData); err != nil {
- logger.Warnw(ctx, "failed-to-get-onu-info-for-pon-port", log.Fields{
- "device-id": dh.device.Id,
- "intf-id": intfID,
- "error": err})
- } else {
- for i, onu := range onuGemData {
- if onu.OnuID == onuID && onu.SerialNumber == onuSn {
- logger.Debugw(ctx, "onu-data", log.Fields{"onu": onu})
- if err := dh.clearUNIData(ctx, &onuGemData[i]); err != nil {
- logger.Warnw(ctx, "failed-to-clear-uni-data-for-onu", log.Fields{
- "device-id": dh.device.Id,
- "onu-device": onu,
- "error": err})
- }
- // Clear flowids for gem cache.
- for _, gem := range onu.GemPorts {
- dh.resourceMgr.DeleteFlowIDsForGem(ctx, intfID, gem)
- }
- onuGemData = append(onuGemData[:i], onuGemData[i+1:]...)
- err := onuMgr.AddOnuGemInfo(ctx, intfID, onuGemData)
- if err != nil {
- logger.Warnw(ctx, "persistence-update-onu-gem-info-failed", log.Fields{
- "intf-id": intfID,
- "onu-device": onu,
- "onu-gem": onuGemData,
- "error": err})
- //Not returning error on cleanup.
- }
- logger.Debugw(ctx, "removed-onu-gem-info", log.Fields{"intf": intfID, "onu-device": onu, "onugem": onuGemData})
- dh.resourceMgr.FreeonuID(ctx, intfID, []uint32{onu.OnuID})
- break
- }
- }
- }
- }
- dh.onus.Delete(onuKey)
- dh.discOnus.Delete(onuSn)
+
return nil
}
@@ -2334,7 +2299,7 @@
/*
resp, err = dh.Client.GetValue(ctx, valueparam)
if err != nil {
- logger.Errorw("error-while-getValue", log.Fields{"DeviceID": dh.device, "onu-id": onuid, "error": err})
+ logger.Errorw("error-while-getValue", log.Fields{"DeviceID": dh.device, "onu-id": onuid, "err": err})
return nil, err
}
*/
@@ -2429,6 +2394,7 @@
// Step2 : Push the McastFlowOrGroupControlBlock to appropriate channel
// Step3 : Wait on response channel for response
// Step4 : Return error value
+ startTime := time.Now()
logger.Debugw(ctx, "process-flow-or-group", log.Fields{"flow": flow, "group": group, "action": action})
errChan := make(chan error)
var groupID uint32
@@ -2451,7 +2417,7 @@
dh.incomingMcastFlowOrGroup[groupID%MaxNumOfGroupHandlerChannels] <- mcastFlowOrGroupCb
// Wait for handler to return error value
err := <-errChan
- logger.Debugw(ctx, "process-flow-or-group--received-resp", log.Fields{"flow": flow, "group": group, "action": action, "err": err})
+ logger.Debugw(ctx, "process-flow-or-group--received-resp", log.Fields{"err": err, "totalTimeInSeconds": time.Since(startTime).Milliseconds()})
return err
}
diff --git a/internal/pkg/core/device_handler_test.go b/internal/pkg/core/device_handler_test.go
index 1cae8b6..d69be1a 100644
--- a/internal/pkg/core/device_handler_test.go
+++ b/internal/pkg/core/device_handler_test.go
@@ -19,8 +19,7 @@
import (
"context"
- conf "github.com/opencord/voltha-lib-go/v4/pkg/config"
- tp "github.com/opencord/voltha-lib-go/v4/pkg/techprofile"
+ conf "github.com/opencord/voltha-lib-go/v5/pkg/config"
"net"
"reflect"
"sync"
@@ -29,11 +28,11 @@
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/any"
- "github.com/opencord/voltha-lib-go/v4/pkg/db"
- fu "github.com/opencord/voltha-lib-go/v4/pkg/flows"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
- "github.com/opencord/voltha-lib-go/v4/pkg/pmmetrics"
- ponrmgr "github.com/opencord/voltha-lib-go/v4/pkg/ponresourcemanager"
+ "github.com/opencord/voltha-lib-go/v5/pkg/db"
+ fu "github.com/opencord/voltha-lib-go/v5/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/pmmetrics"
+ ponrmgr "github.com/opencord/voltha-lib-go/v5/pkg/ponresourcemanager"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/config"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager"
@@ -169,16 +168,17 @@
}}
deviceInf := &oop.DeviceInfo{Vendor: "openolt", Ranges: oopRanges, Model: "openolt", DeviceId: dh.device.Id, PonPorts: NumPonPorts}
- rsrMgr := resourcemanager.OpenOltResourceMgr{DeviceID: dh.device.Id, DeviceType: dh.device.Type, DevInfo: deviceInf,
- KVStore: &db.Backend{
- Client: &mocks.MockKVClient{},
- }}
- rsrMgr.AllocIDMgmtLock = make([]sync.RWMutex, deviceInf.PonPorts)
- rsrMgr.GemPortIDMgmtLock = make([]sync.RWMutex, deviceInf.PonPorts)
- rsrMgr.OnuIDMgmtLock = make([]sync.RWMutex, deviceInf.PonPorts)
+ dh.deviceInfo = deviceInf
+ dh.resourceMgr = make([]*resourcemanager.OpenOltResourceMgr, deviceInf.PonPorts)
+ var i uint32
+ for i = 0; i < deviceInf.PonPorts; i++ {
+ dh.resourceMgr[i] = &resourcemanager.OpenOltResourceMgr{DeviceID: dh.device.Id, DeviceType: dh.device.Type, DevInfo: deviceInf,
+ KVStore: &db.Backend{
+ Client: &mocks.MockKVClient{},
+ }}
+ dh.resourceMgr[i].InitLocalCache()
+ }
- dh.resourceMgr = &rsrMgr
- dh.resourceMgr.ResourceMgrs = make(map[uint32]*ponrmgr.PONResourceManager)
ranges := make(map[string]interface{})
sharedIdxByType := make(map[string]string)
sharedIdxByType["ALLOC_ID"] = "ALLOC_ID"
@@ -195,13 +195,6 @@
ranges["flow_id_shared"] = uint32(0)
ponmgr := &ponrmgr.PONResourceManager{}
-
- ctx := context.TODO()
- tpMgr, err := tp.NewTechProfile(ctx, ponmgr, "etcd", "127.0.0.1", "/")
- if err != nil {
- logger.Fatal(ctx, err.Error())
- }
-
ponmgr.DeviceID = "onu-1"
ponmgr.IntfIDs = []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}
ponmgr.KVStore = &db.Backend{
@@ -209,19 +202,28 @@
}
ponmgr.PonResourceRanges = ranges
ponmgr.SharedIdxByType = sharedIdxByType
+ ponmgr.Technology = "XGS-PON"
+ for i = 0; i < deviceInf.PonPorts; i++ {
+ dh.resourceMgr[i].PonRsrMgr = ponmgr
+ }
+
+ /*
+ tpMgr, err := tp.NewTechProfile(ctx, ponmgr, "etcd", "127.0.0.1", "/")
+ if err != nil {
+ logger.Fatal(ctx, err.Error())
+ }
+ */
+ tpMgr := &mocks.MockTechProfile{TpID: 64}
ponmgr.TechProfileMgr = tpMgr
- for i := 0; i < NumPonPorts; i++ {
- dh.resourceMgr.ResourceMgrs[uint32(i)] = ponmgr
- }
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
- dh.groupMgr = NewGroupManager(ctx, dh, dh.resourceMgr)
+ dh.groupMgr = NewGroupManager(ctx, dh, dh.resourceMgr[0])
dh.totalPonPorts = NumPonPorts
dh.flowMgr = make([]*OpenOltFlowMgr, dh.totalPonPorts)
- for i := 0; i < int(dh.totalPonPorts); i++ {
+ for i = 0; i < dh.totalPonPorts; i++ {
// Instantiate flow manager
- if dh.flowMgr[i] = NewFlowManager(ctx, dh, dh.resourceMgr, dh.groupMgr, uint32(i)); dh.flowMgr[i] == nil {
+ if dh.flowMgr[i] = NewFlowManager(ctx, dh, dh.resourceMgr[i], dh.groupMgr, uint32(i)); dh.flowMgr[i] == nil {
return nil
}
}
@@ -443,18 +445,18 @@
var err error
if marshalledData, err = ptypes.MarshalAny(body); err != nil {
- logger.Errorw(ctx, "cannot-marshal-request", log.Fields{"error": err})
+ logger.Errorw(ctx, "cannot-marshal-request", log.Fields{"err": err})
}
var marshalledData1 *any.Any
if marshalledData1, err = ptypes.MarshalAny(body2); err != nil {
- logger.Errorw(ctx, "cannot-marshal-request", log.Fields{"error": err})
+ logger.Errorw(ctx, "cannot-marshal-request", log.Fields{"err": err})
}
var marshalledData2 *any.Any
if marshalledData2, err = ptypes.MarshalAny(body3); err != nil {
- logger.Errorw(ctx, "cannot-marshal-request", log.Fields{"error": err})
+ logger.Errorw(ctx, "cannot-marshal-request", log.Fields{"err": err})
}
type args struct {
msg *ic.InterAdapterMessage
diff --git a/internal/pkg/core/olt_platform.go b/internal/pkg/core/olt_platform.go
index b85dd4b..9526ee7 100644
--- a/internal/pkg/core/olt_platform.go
+++ b/internal/pkg/core/olt_platform.go
@@ -20,8 +20,8 @@
import (
"context"
- "github.com/opencord/voltha-lib-go/v4/pkg/flows"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
"github.com/opencord/voltha-protos/v4/go/voltha"
@@ -112,6 +112,10 @@
minNniIntPortNum = (1 << bitsforNNIID)
// maxNniPortNum is used to store the maximum range of nni port number ((1 << 21)-1) 2097151
maxNniPortNum = ((1 << (bitsforNNIID + 1)) - 1)
+ // minPonIntfPortNum stores the minimum pon port number
+ minPonIntfPortNum = ponIntfMarkerValue << ponIntfMarkerPos
+ // maxPonIntfPortNum stores the maximum pon port number
+ maxPonIntfPortNum = (ponIntfMarkerValue << ponIntfMarkerPos) | (2 ^ bitsForPONID - 1)
)
//MinUpstreamPortID value
@@ -177,6 +181,15 @@
return (portNum & 0xFFFF), nil
}
+//IntfIDFromPonPortNum returns Intf ID derived from portNum
+func IntfIDFromPonPortNum(ctx context.Context, portNum uint32) (uint32, error) {
+ if portNum < minPonIntfPortNum || portNum > maxPonIntfPortNum {
+ logger.Errorw(ctx, "ponportnumber-is-not-in-valid-range", log.Fields{"portnum": portNum})
+ return uint32(0), olterrors.ErrInvalidPortRange
+ }
+ return (portNum & 0x7FFF), nil
+}
+
//IntfIDToPortTypeName returns port type derived from the intfId
func IntfIDToPortTypeName(intfID uint32) voltha.Port_PortType {
if ((ponIntfMarkerValue << ponIntfMarkerPos) ^ intfID) < MaxPonsPerOlt {
diff --git a/internal/pkg/core/olt_platform_test.go b/internal/pkg/core/olt_platform_test.go
index 4c89eaa..6ff32db 100644
--- a/internal/pkg/core/olt_platform_test.go
+++ b/internal/pkg/core/olt_platform_test.go
@@ -19,7 +19,7 @@
import (
"context"
- fu "github.com/opencord/voltha-lib-go/v4/pkg/flows"
+ fu "github.com/opencord/voltha-lib-go/v5/pkg/flows"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
"github.com/opencord/voltha-protos/v4/go/voltha"
diff --git a/internal/pkg/core/olt_state_transitions.go b/internal/pkg/core/olt_state_transitions.go
index e67bd43..5fab1d5 100644
--- a/internal/pkg/core/olt_state_transitions.go
+++ b/internal/pkg/core/olt_state_transitions.go
@@ -22,7 +22,7 @@
"reflect"
"runtime"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
)
// DeviceState OLT Device state
diff --git a/internal/pkg/core/openolt.go b/internal/pkg/core/openolt.go
index 436d67a..33b3f1b 100644
--- a/internal/pkg/core/openolt.go
+++ b/internal/pkg/core/openolt.go
@@ -22,11 +22,11 @@
"sync"
"time"
- "github.com/opencord/voltha-lib-go/v4/pkg/adapters/adapterif"
- conf "github.com/opencord/voltha-lib-go/v4/pkg/config"
- "github.com/opencord/voltha-lib-go/v4/pkg/events/eventif"
- "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/adapters/adapterif"
+ conf "github.com/opencord/voltha-lib-go/v5/pkg/config"
+ "github.com/opencord/voltha-lib-go/v5/pkg/events/eventif"
+ "github.com/opencord/voltha-lib-go/v5/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/config"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
"github.com/opencord/voltha-protos/v4/go/extension"
@@ -159,6 +159,20 @@
return olterrors.NewErrNotFound("device-handler", log.Fields{"device-id": targetDevice}, nil)
}
+//Process_tech_profile_instance_request processes tech profile request message from onu adapter
+func (oo *OpenOLT) Process_tech_profile_instance_request(ctx context.Context, msg *ic.InterAdapterTechProfileInstanceRequestMessage) *ic.InterAdapterTechProfileDownloadMessage {
+ logger.Debugw(ctx, "Process_tech_profile_instance_request", log.Fields{"tpPath": msg.TpInstancePath})
+ targetDeviceID := msg.ParentDeviceId // Request?
+ if targetDeviceID == "" {
+ logger.Error(ctx, "device-id-nil")
+ return nil
+ }
+ if handler := oo.getDeviceHandler(targetDeviceID); handler != nil {
+ return handler.GetInterAdapterTechProfileDownloadMessage(ctx, msg.TpInstancePath, msg.ParentPonPort, msg.OnuId, msg.UniId)
+ }
+ return nil
+}
+
//Adapter_descriptor not implemented
func (oo *OpenOLT) Adapter_descriptor(ctx context.Context) error {
return olterrors.ErrNotImplemented
@@ -407,7 +421,7 @@
if handler := oo.getDeviceHandler(deviceID); handler != nil {
if resp, err = handler.getExtValue(ctx, device, valueparam); err != nil {
logger.Errorw(ctx, "error-occurred-during-get-ext-value", log.Fields{"device-id": deviceID, "onu-id": device.Id,
- "error": err})
+ "err": err})
return nil, err
}
}
diff --git a/internal/pkg/core/openolt_eventmgr.go b/internal/pkg/core/openolt_eventmgr.go
index 336592c..1b5abd1 100644
--- a/internal/pkg/core/openolt_eventmgr.go
+++ b/internal/pkg/core/openolt_eventmgr.go
@@ -23,8 +23,8 @@
"fmt"
"strconv"
- "github.com/opencord/voltha-lib-go/v4/pkg/events/eventif"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/events/eventif"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
"github.com/opencord/voltha-protos/v4/go/common"
oop "github.com/opencord/voltha-protos/v4/go/openolt"
@@ -660,7 +660,7 @@
func (em *OpenOltEventMgr) oltIntfOperIndication(ctx context.Context, ifindication *oop.IntfOperIndication, deviceID string, raisedTs int64) {
portNo := IntfIDToPortNo(ifindication.IntfId, voltha.Port_PON_OLT)
if port, err := em.handler.coreProxy.GetDevicePort(ctx, deviceID, portNo); err != nil {
- logger.Warnw(ctx, "Error while fetching port object", log.Fields{"device-id": deviceID, "error": err})
+ logger.Warnw(ctx, "Error while fetching port object", log.Fields{"device-id": deviceID, "err": err})
} else if port.AdminState != common.AdminState_ENABLED {
logger.Debugw(ctx, "port-disable/enable-event-not-generated--the-port-is-not-enabled-by-operator", log.Fields{"device-id": deviceID, "port": port})
return
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index 4ae86ac..9c1cb09 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -22,14 +22,15 @@
"encoding/hex"
"errors"
"fmt"
- "github.com/opencord/voltha-lib-go/v4/pkg/meters"
+ "github.com/opencord/voltha-lib-go/v5/pkg/meters"
"strconv"
"strings"
"sync"
+ "time"
- "github.com/opencord/voltha-lib-go/v4/pkg/flows"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
- tp "github.com/opencord/voltha-lib-go/v4/pkg/techprofile"
+ "github.com/opencord/voltha-lib-go/v5/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
+ tp "github.com/opencord/voltha-lib-go/v5/pkg/techprofile"
rsrcMgr "github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager"
"github.com/opencord/voltha-protos/v4/go/common"
ic "github.com/opencord/voltha-protos/v4/go/inter_container"
@@ -154,11 +155,6 @@
pbit1 = '1'
)
-type gemPortKey struct {
- intfID uint32
- gemPort uint32
-}
-
type schedQueue struct {
direction tp_pb.Direction
intfID uint32
@@ -186,15 +182,6 @@
gemToAes map[uint32]bool
}
-// subscriberDataPathFlowIDKey is key to subscriberDataPathFlowIDMap map
-type subscriberDataPathFlowIDKey struct {
- intfID uint32
- onuID uint32
- uniID uint32
- direction string
- tpID uint32
-}
-
// This control block is created per flow add/remove and pushed on the incomingFlows channel slice
// The flowControlBlock is then picked by the perOnuFlowHandlerRoutine for further processing.
// There is on perOnuFlowHandlerRoutine routine per ONU that constantly monitors for any incoming
@@ -210,37 +197,28 @@
//OpenOltFlowMgr creates the Structure of OpenOltFlowMgr obj
type OpenOltFlowMgr struct {
ponPortIdx uint32 // Pon Port this FlowManager is responsible for
- techprofile map[uint32]tp.TechProfileIf
+ techprofile tp.TechProfileIf
deviceHandler *DeviceHandler
grpMgr *OpenOltGroupMgr
resourceMgr *rsrcMgr.OpenOltResourceMgr
- onuIdsLock sync.RWMutex // TODO: Do we need this?
-
- flowsUsedByGemPort map[uint32][]uint64 // gem port id to flow ids
- flowsUsedByGemPortKey sync.RWMutex // lock to be used to access the flowsUsedByGemPort map
+ gemToFlowIDs map[uint32][]uint64 // gem port id to flow ids
+ gemToFlowIDsKey sync.RWMutex // lock to be used to access the gemToFlowIDs map
packetInGemPort map[rsrcMgr.PacketInInfoKey]uint32 //packet in gem port local cache
packetInGemPortLock sync.RWMutex
// TODO create a type rsrcMgr.OnuGemInfos to be used instead of []rsrcMgr.OnuGemInfo
- onuGemInfo []rsrcMgr.OnuGemInfo //onu, gem and uni info local cache
+ onuGemInfoMap map[uint32]*rsrcMgr.OnuGemInfo //onu, gem and uni info local cache -> map of onuID to OnuGemInfo
// We need to have a global lock on the onuGemInfo map
onuGemInfoLock sync.RWMutex
- // Map of voltha flowID associated with subscriberDataPathFlowIDKey
- // This information is not persisted on Kv store and hence should be reconciled on adapter restart
- subscriberDataPathFlowIDMap map[subscriberDataPathFlowIDKey]uint64
- subscriberDataPathFlowIDMapLock sync.RWMutex
+ flowIDToGems map[uint64][]uint32
+ flowIDToGemsLock sync.RWMutex
// Slice of channels. Each channel in slice, index by ONU ID, queues flows per ONU.
// A go routine per ONU, waits on the unique channel (indexed by ONU ID) for incoming flows (add/remove)
incomingFlows []chan flowControlBlock
-
- //this map keeps uni port info by gem and pon port. This relation shall be used for packet-out operations
- gemToUniMap map[gemPortKey][]uint32
- //We need to have a global lock on the gemToUniLock map
- gemToUniLock sync.RWMutex
}
//NewFlowManager creates OpenOltFlowMgr object and initializes the parameters
@@ -248,23 +226,18 @@
logger.Infow(ctx, "initializing-flow-manager", log.Fields{"device-id": dh.device.Id})
var flowMgr OpenOltFlowMgr
var err error
- var idx uint32
flowMgr.deviceHandler = dh
+ flowMgr.ponPortIdx = ponPortIdx
flowMgr.grpMgr = grpMgr
flowMgr.resourceMgr = rMgr
- flowMgr.techprofile = make(map[uint32]tp.TechProfileIf)
if err = flowMgr.populateTechProfilePerPonPort(ctx); err != nil {
- logger.Errorw(ctx, "error-while-populating-tech-profile-mgr", log.Fields{"error": err})
+ logger.Errorw(ctx, "error-while-populating-tech-profile-mgr", log.Fields{"err": err})
return nil
}
- flowMgr.onuIdsLock = sync.RWMutex{}
- flowMgr.flowsUsedByGemPort = make(map[uint32][]uint64)
+ flowMgr.gemToFlowIDs = make(map[uint32][]uint64)
flowMgr.packetInGemPort = make(map[rsrcMgr.PacketInInfoKey]uint32)
- flowMgr.packetInGemPortLock = sync.RWMutex{}
- flowMgr.onuGemInfoLock = sync.RWMutex{}
- flowMgr.subscriberDataPathFlowIDMap = make(map[subscriberDataPathFlowIDKey]uint64)
- flowMgr.subscriberDataPathFlowIDMapLock = sync.RWMutex{}
+ flowMgr.flowIDToGems = make(map[uint64][]uint32)
// Create a slice of buffered channels for handling concurrent flows per ONU.
// The additional entry (+1) is to handle the NNI trap flows on a separate channel from individual ONUs channel
@@ -276,54 +249,35 @@
// This routine will be blocked on the flowMgr.incomingFlows[onu-id] channel for incoming flows.
go flowMgr.perOnuFlowHandlerRoutine(flowMgr.incomingFlows[i])
}
-
+ flowMgr.onuGemInfoMap = make(map[uint32]*rsrcMgr.OnuGemInfo)
//Load the onugem info cache from kv store on flowmanager start
- if flowMgr.onuGemInfo, err = rMgr.GetOnuGemInfo(ctx, ponPortIdx); err != nil {
- logger.Error(ctx, "failed-to-load-onu-gem-info-cache")
+ onuIDStart := flowMgr.deviceHandler.deviceInfo.OnuIdStart
+ onuIDEnd := flowMgr.deviceHandler.deviceInfo.OnuIdEnd
+ for onuID := onuIDStart; onuID <= onuIDEnd; onuID++ {
+ // check for a valid serial number in onuGem as GetOnuGemInfo can return nil error in case of nothing found in the path.
+ onugem, err := rMgr.GetOnuGemInfo(ctx, onuID, ponPortIdx)
+ if err == nil && onugem != nil && onugem.SerialNumber != "" {
+ flowMgr.onuGemInfoMap[onuID] = onugem
+ }
}
- //Load flowID list per gem map per interface from the kvstore.
- flowMgr.loadFlowIDlistForGem(ctx, idx)
+
+ //Load flowID list per gem map And gemIDs per flow per interface from the kvstore.
+ flowMgr.loadFlowIDsForGemAndGemIDsForFlow(ctx)
+
//load interface to multicast queue map from kv store
-
- flowMgr.gemToUniMap = make(map[gemPortKey][]uint32)
- flowMgr.gemToUniLock = sync.RWMutex{}
-
flowMgr.grpMgr.LoadInterfaceToMulticastQueueMap(ctx)
- flowMgr.reconcileSubscriberDataPathFlowIDMap(ctx)
logger.Info(ctx, "initialization-of-flow-manager-success")
return &flowMgr
}
-// toGemToUniMap adds uni info consisting of onu and uni ID to the map and associates it with a gem port
-func (f *OpenOltFlowMgr) toGemToUniMap(ctx context.Context, gemPK gemPortKey, onuID uint32, uniID uint32) {
- f.gemToUniLock.Lock()
- f.gemToUniMap[gemPK] = []uint32{onuID, uniID}
- f.gemToUniLock.Unlock()
-}
-
-// fromGemToUniMap returns onu and uni ID associated with the given key
-func (f *OpenOltFlowMgr) fromGemToUniMap(key gemPortKey) ([]uint32, bool) {
- f.gemToUniLock.RLock()
- defer f.gemToUniLock.RUnlock()
- val, ok := f.gemToUniMap[key]
- return val, ok
-}
-
-// removeFromGemToUniMap removes an entry associated with the given key from gemToUniMap
-func (f *OpenOltFlowMgr) removeFromGemToUniMap(key gemPortKey) {
- f.gemToUniLock.Lock()
- defer f.gemToUniLock.Unlock()
- delete(f.gemToUniMap, key)
-}
-
func (f *OpenOltFlowMgr) registerFlow(ctx context.Context, flowFromCore *ofp.OfpFlowStats, deviceFlow *openoltpb2.Flow) error {
if !deviceFlow.ReplicateFlow && deviceFlow.GemportId > 0 {
// Flow is not replicated in this case, we need to register the flow for a single gem-port
- return f.registerFlowIDForGem(ctx, uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId), flowFromCore)
+ return f.registerFlowIDForGemAndGemIDForFlow(ctx, uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId), flowFromCore)
} else if deviceFlow.ReplicateFlow && len(deviceFlow.PbitToGemport) > 0 {
// Flow is replicated in this case. We need to register the flow for all the gem-ports it is replicated to.
for _, gemPort := range deviceFlow.PbitToGemport {
- if err := f.registerFlowIDForGem(ctx, uint32(deviceFlow.AccessIntfId), gemPort, flowFromCore); err != nil {
+ if err := f.registerFlowIDForGemAndGemIDForFlow(ctx, uint32(deviceFlow.AccessIntfId), gemPort, flowFromCore); err != nil {
return err
}
}
@@ -331,15 +285,26 @@
return nil
}
-func (f *OpenOltFlowMgr) registerFlowIDForGem(ctx context.Context, accessIntfID uint32, gemPortID uint32, flowFromCore *ofp.OfpFlowStats) error {
- f.flowsUsedByGemPortKey.Lock()
- flowIDList, ok := f.flowsUsedByGemPort[gemPortID]
+func (f *OpenOltFlowMgr) registerFlowIDForGemAndGemIDForFlow(ctx context.Context, accessIntfID uint32, gemPortID uint32, flowFromCore *ofp.OfpFlowStats) error {
+ // update gem->flows map
+ f.gemToFlowIDsKey.Lock()
+ flowIDList, ok := f.gemToFlowIDs[gemPortID]
if !ok {
flowIDList = []uint64{flowFromCore.Id}
+ } else {
+ flowIDList = appendUnique64bit(flowIDList, flowFromCore.Id)
}
- flowIDList = appendUnique64bit(flowIDList, flowFromCore.Id)
- f.flowsUsedByGemPort[gemPortID] = flowIDList
- f.flowsUsedByGemPortKey.Unlock()
+ f.gemToFlowIDs[gemPortID] = flowIDList
+ f.gemToFlowIDsKey.Unlock()
+
+ // update flow->gems map
+ f.flowIDToGemsLock.Lock()
+ if _, ok := f.flowIDToGems[flowFromCore.Id]; !ok {
+ f.flowIDToGems[flowFromCore.Id] = []uint32{gemPortID}
+ } else {
+ f.flowIDToGems[flowFromCore.Id] = appendUnique32bit(f.flowIDToGems[flowFromCore.Id], gemPortID)
+ }
+ f.flowIDToGemsLock.Unlock()
// update the flowids for a gem to the KVstore
return f.resourceMgr.UpdateFlowIDsForGem(ctx, accessIntfID, gemPortID, flowIDList)
@@ -452,7 +417,7 @@
if meterInfo != nil {
logger.Debugw(ctx, "scheduler-already-created-for-upstream", log.Fields{"device-id": f.deviceHandler.device.Id, "meter-id": sq.meterID})
- if meterInfo.MeterConfig.MeterId == sq.meterID {
+ if meterInfo.MeterID == sq.meterID {
if err := f.resourceMgr.HandleMeterInfoRefCntUpdate(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID, true); err != nil {
return err
}
@@ -460,7 +425,7 @@
}
return olterrors.NewErrInvalidValue(log.Fields{
"unsupported": "meter-id",
- "kv-store-meter-id": meterInfo.MeterConfig.MeterId,
+ "kv-store-meter-id": meterInfo.MeterID,
"meter-id-in-flow": sq.meterID,
"device-id": f.deviceHandler.device.Id}, nil)
}
@@ -472,18 +437,9 @@
"device-id": f.deviceHandler.device.Id})
if sq.direction == tp_pb.Direction_UPSTREAM {
- SchedCfg, err = f.techprofile[sq.intfID].GetUsScheduler(ctx, sq.tpInst.(*tp.TechProfile))
+ SchedCfg = f.techprofile.GetUsScheduler(sq.tpInst.(*tp_pb.TechProfileInstance))
} else if sq.direction == tp_pb.Direction_DOWNSTREAM {
- SchedCfg, err = f.techprofile[sq.intfID].GetDsScheduler(ctx, sq.tpInst.(*tp.TechProfile))
- }
-
- if err != nil {
- return olterrors.NewErrNotFound("scheduler-config",
- log.Fields{
- "intf-id": sq.intfID,
- "direction": sq.direction,
- "tp-inst": sq.tpInst,
- "device-id": f.deviceHandler.device.Id}, err)
+ SchedCfg = f.techprofile.GetDsScheduler(sq.tpInst.(*tp_pb.TechProfileInstance))
}
found := false
@@ -491,13 +447,10 @@
if sq.flowMetadata != nil {
for _, meter := range sq.flowMetadata.Meters {
if sq.meterID == meter.MeterId {
- meterInfo.MeterConfig = ofp.OfpMeterConfig{}
- meterInfo.MeterConfig.MeterId = meter.MeterId
- meterInfo.MeterConfig.Flags = meter.Flags
+ meterInfo.MeterID = meter.MeterId
meterInfo.RefCnt = 1 // initialize it to 1, since this is the first flow that referenced the meter id.
- meterInfo.MeterConfig.Bands = append(meterInfo.MeterConfig.Bands, meter.Bands...)
logger.Debugw(ctx, "found-meter-config-from-flowmetadata",
- log.Fields{"meterConfig": meterInfo.MeterConfig,
+ log.Fields{"meter": meter,
"device-id": f.deviceHandler.device.Id})
found = true
break
@@ -515,14 +468,14 @@
}
var TrafficShaping *tp_pb.TrafficShapingInfo
- if TrafficShaping, err = meters.GetTrafficShapingInfo(ctx, &meterInfo.MeterConfig); err != nil {
+ if TrafficShaping, err = meters.GetTrafficShapingInfo(ctx, sq.flowMetadata.Meters[0]); err != nil {
return olterrors.NewErrInvalidValue(log.Fields{
"reason": "invalid-meter-config",
"meter-id": sq.meterID,
"device-id": f.deviceHandler.device.Id}, nil)
}
- TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile[sq.intfID].GetTrafficScheduler(sq.tpInst.(*tp.TechProfile), SchedCfg, TrafficShaping)}
+ TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile.GetTrafficScheduler(sq.tpInst.(*tp_pb.TechProfileInstance), SchedCfg, TrafficShaping)}
TrafficSched[0].TechProfileId = sq.tpID
if err := f.pushSchedulerQueuesToDevice(ctx, sq, TrafficSched); err != nil {
@@ -549,7 +502,7 @@
}
func (f *OpenOltFlowMgr) pushSchedulerQueuesToDevice(ctx context.Context, sq schedQueue, TrafficSched []*tp_pb.TrafficScheduler) error {
- trafficQueues, err := f.techprofile[sq.intfID].GetTrafficQueues(ctx, sq.tpInst.(*tp.TechProfile), sq.direction)
+ trafficQueues, err := f.techprofile.GetTrafficQueues(ctx, sq.tpInst.(*tp_pb.TechProfileInstance), sq.direction)
if err != nil {
return olterrors.NewErrAdapter("unable-to-construct-traffic-queue-configuration",
@@ -598,10 +551,9 @@
"device-id": f.deviceHandler.device.Id})
if sq.direction == tp_pb.Direction_DOWNSTREAM {
- multicastTrafficQueues := f.techprofile[sq.intfID].GetMulticastTrafficQueues(ctx, sq.tpInst.(*tp.TechProfile))
+ multicastTrafficQueues := f.techprofile.GetMulticastTrafficQueues(ctx, sq.tpInst.(*tp_pb.TechProfileInstance))
if len(multicastTrafficQueues) > 0 {
- if _, present := f.grpMgr.GetInterfaceToMcastQueueMap(sq.intfID); !present {
- //assumed that there is only one queue per PON for the multicast service
+ if _, present := f.grpMgr.GetInterfaceToMcastQueueMap(sq.intfID); !present { //assumed that there is only one queue per PON for the multicast service
//the default queue with multicastQueuePerPonPort.Priority per a pon interface is used for multicast service
//just put it in interfaceToMcastQueueMap to use for building group members
logger.Debugw(ctx, "multicast-traffic-queues", log.Fields{"device-id": f.deviceHandler.device.Id})
@@ -613,7 +565,7 @@
f.grpMgr.UpdateInterfaceToMcastQueueMap(sq.intfID, val)
//also store the queue info in kv store
if err := f.resourceMgr.AddMcastQueueForIntf(ctx, sq.intfID, multicastQueuePerPonPort.GemportId, multicastQueuePerPonPort.Priority); err != nil {
- logger.Errorw(ctx, "failed-to-add-mcast-queue", log.Fields{"error": err})
+ logger.Errorw(ctx, "failed-to-add-mcast-queue", log.Fields{"err": err})
return err
}
@@ -639,27 +591,19 @@
"uni-port": sq.uniPort,
"device-id": f.deviceHandler.device.Id})
if sq.direction == tp_pb.Direction_UPSTREAM {
- SchedCfg, err = f.techprofile[sq.intfID].GetUsScheduler(ctx, sq.tpInst.(*tp.TechProfile))
+ SchedCfg = f.techprofile.GetUsScheduler(sq.tpInst.(*tp_pb.TechProfileInstance))
Direction = "upstream"
} else if sq.direction == tp_pb.Direction_DOWNSTREAM {
- SchedCfg, err = f.techprofile[sq.intfID].GetDsScheduler(ctx, sq.tpInst.(*tp.TechProfile))
+ SchedCfg = f.techprofile.GetDsScheduler(sq.tpInst.(*tp_pb.TechProfileInstance))
Direction = "downstream"
}
- if err != nil {
- return olterrors.NewErrNotFound("scheduler-config",
- log.Fields{
- "int-id": sq.intfID,
- "direction": sq.direction,
- "device-id": f.deviceHandler.device.Id}, err)
- }
-
TrafficShaping := &tp_pb.TrafficShapingInfo{} // this info is not really useful for the agent during flow removal. Just use default values.
- TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile[sq.intfID].GetTrafficScheduler(sq.tpInst.(*tp.TechProfile), SchedCfg, TrafficShaping)}
+ TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile.GetTrafficScheduler(sq.tpInst.(*tp_pb.TechProfileInstance), SchedCfg, TrafficShaping)}
TrafficSched[0].TechProfileId = sq.tpID
- TrafficQueues, err := f.techprofile[sq.intfID].GetTrafficQueues(ctx, sq.tpInst.(*tp.TechProfile), sq.direction)
+ TrafficQueues, err := f.techprofile.GetTrafficQueues(ctx, sq.tpInst.(*tp_pb.TechProfileInstance), sq.direction)
if err != nil {
return olterrors.NewErrAdapter("unable-to-construct-traffic-queue-configuration",
log.Fields{
@@ -703,7 +647,7 @@
"uni-port": sq.uniPort})
if sq.direction == tp_pb.Direction_UPSTREAM {
- allocID := sq.tpInst.(*tp.TechProfile).UsScheduler.AllocID
+ allocID := sq.tpInst.(*tp_pb.TechProfileInstance).UsScheduler.AllocId
f.resourceMgr.FreeAllocID(ctx, sq.intfID, sq.onuID, sq.uniID, allocID)
// Delete the TCONT on the ONU.
uni := getUniPortPath(f.deviceHandler.device.Id, sq.intfID, int32(sq.onuID), int32(sq.uniID))
@@ -752,7 +696,6 @@
var gemPortIDs []uint32
tpInstanceExists := false
var err error
-
allocIDs = f.resourceMgr.GetCurrentAllocIDsForOnu(ctx, intfID, onuID, uniID)
allgemPortIDs = f.resourceMgr.GetCurrentGEMPortIDsForOnu(ctx, intfID, onuID, uniID)
tpPath := f.getTPpath(ctx, intfID, uni, TpID)
@@ -765,24 +708,24 @@
"tp-id": TpID})
// Check tech profile instance already exists for derived port name
- techProfileInstance, _ := f.techprofile[intfID].GetTPInstanceFromKVStore(ctx, TpID, tpPath)
+ techProfileInstance, _ := f.techprofile.GetTPInstance(ctx, tpPath)
if techProfileInstance == nil {
logger.Infow(ctx, "tp-instance-not-found--creating-new",
log.Fields{
"path": tpPath,
"device-id": f.deviceHandler.device.Id})
- techProfileInstance, err = f.techprofile[intfID].CreateTechProfInstance(ctx, TpID, uni, intfID)
+ techProfileInstance, err = f.techprofile.CreateTechProfileInstance(ctx, TpID, uni, intfID)
if err != nil {
// This should not happen, something wrong in KV backend transaction
logger.Errorw(ctx, "tp-instance-create-failed",
log.Fields{
- "error": err,
+ "err": err,
"tp-id": TpID,
"device-id": f.deviceHandler.device.Id})
return 0, nil, nil
}
if err := f.resourceMgr.UpdateTechProfileIDForOnu(ctx, intfID, onuID, uniID, TpID); err != nil {
- logger.Warnw(ctx, "failed-to-update-tech-profile-id", log.Fields{"error": err})
+ logger.Warnw(ctx, "failed-to-update-tech-profile-id", log.Fields{"err": err})
}
} else {
logger.Debugw(ctx, "tech-profile-instance-already-exist-for-given port-name",
@@ -793,14 +736,14 @@
}
switch tpInst := techProfileInstance.(type) {
- case *tp.TechProfile:
+ case *tp_pb.TechProfileInstance:
if UsMeterID != 0 {
sq := schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: intfID, onuID: onuID, uniID: uniID, tpID: TpID,
uniPort: uniPort, tpInst: techProfileInstance, meterID: UsMeterID, flowMetadata: flowMetadata}
if err := f.CreateSchedulerQueues(ctx, sq); err != nil {
logger.Errorw(ctx, "CreateSchedulerQueues-failed-upstream",
log.Fields{
- "error": err,
+ "err": err,
"onu-id": onuID,
"uni-id": uniID,
"intf-id": intfID,
@@ -815,7 +758,7 @@
if err := f.CreateSchedulerQueues(ctx, sq); err != nil {
logger.Errorw(ctx, "CreateSchedulerQueues-failed-downstream",
log.Fields{
- "error": err,
+ "err": err,
"onu-id": onuID,
"uni-id": uniID,
"intf-id": intfID,
@@ -824,9 +767,9 @@
return 0, nil, nil
}
}
- allocID := tpInst.UsScheduler.AllocID
+ allocID := tpInst.UsScheduler.AllocId
for _, gem := range tpInst.UpstreamGemPortAttributeList {
- gemPortIDs = append(gemPortIDs, gem.GemportID)
+ gemPortIDs = append(gemPortIDs, gem.GemportId)
}
allocIDs = appendUnique32bit(allocIDs, allocID)
@@ -848,12 +791,12 @@
// Send Tconts and GEM ports to KV store
f.storeTcontsGEMPortsIntoKVStore(ctx, intfID, onuID, uniID, allocIDs, allgemPortIDs)
return allocID, gemPortIDs, techProfileInstance
- case *tp.EponProfile:
+ case *openoltpb2.EponTechProfileInstance:
// CreateSchedulerQueues for EPON needs to be implemented here
// when voltha-protos for EPON is completed.
- allocID := tpInst.AllocID
+ allocID := tpInst.AllocId
for _, gem := range tpInst.UpstreamQueueAttributeList {
- gemPortIDs = append(gemPortIDs, gem.GemportID)
+ gemPortIDs = append(gemPortIDs, gem.GemportId)
}
allocIDs = appendUnique32bit(allocIDs, allocID)
@@ -897,36 +840,18 @@
if err := f.resourceMgr.UpdateGEMPortIDsForOnu(ctx, intfID, onuID, uniID, gemPortIDs); err != nil {
logger.Errorw(ctx, "error-while-uploading-gemports-to-kv-store", log.Fields{"device-id": f.deviceHandler.device.Id})
}
- if err := f.resourceMgr.UpdateGEMportsPonportToOnuMapOnKVStore(ctx, gemPortIDs, intfID, onuID, uniID); err != nil {
- logger.Error(ctx, "error-while-uploading-gemtopon-map-to-kv-store", log.Fields{"device-id": f.deviceHandler.device.Id})
- } else {
- //add to gem to uni cache
- f.addGemPortUniAssociationsToCache(ctx, intfID, onuID, uniID, gemPortIDs)
- }
+
logger.Infow(ctx, "stored-tconts-and-gem-into-kv-store-successfully", log.Fields{"device-id": f.deviceHandler.device.Id})
for _, gemPort := range gemPortIDs {
f.addGemPortToOnuInfoMap(ctx, intfID, onuID, gemPort)
}
}
-//addGemPortUniAssociationsToCache
-func (f *OpenOltFlowMgr) addGemPortUniAssociationsToCache(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, gemPortIDs []uint32) {
- for _, gemPortID := range gemPortIDs {
- key := gemPortKey{
- intfID: intfID,
- gemPort: gemPortID,
- }
- f.toGemToUniMap(ctx, key, onuID, uniID)
- }
- logger.Debugw(ctx, "gem-to-uni-info-added-to-cache", log.Fields{"device-id": f.deviceHandler.device.Id, "intfID": intfID,
- "gemPortIDs": gemPortIDs, "onuID": onuID, "uniID": uniID})
-}
-
func (f *OpenOltFlowMgr) populateTechProfilePerPonPort(ctx context.Context) error {
var tpCount int
for _, techRange := range f.resourceMgr.DevInfo.Ranges {
for _, intfID := range techRange.IntfIds {
- f.techprofile[intfID] = f.resourceMgr.ResourceMgrs[intfID].TechProfileMgr
+ f.techprofile = f.resourceMgr.PonRsrMgr.TechProfileMgr
tpCount++
logger.Debugw(ctx, "init-tech-profile-done",
log.Fields{
@@ -1004,13 +929,6 @@
func (f *OpenOltFlowMgr) addSymmetricDataPathFlow(ctx context.Context, flowContext *flowContext, direction string) error {
- var inverseDirection string
- if direction == Upstream {
- inverseDirection = Downstream
- } else {
- inverseDirection = Upstream
- }
-
intfID := flowContext.intfID
onuID := flowContext.onuID
uniID := flowContext.uniID
@@ -1067,33 +985,23 @@
}, err).Log()
}
- // Get symmetric flowID if it exists
- // This symmetric flowID will be needed by agent software to use the same device flow-id that was used for the
- // symmetric flow earlier
- // symmetric flowID 0 is considered by agent as non-existent symmetric flow
- keySymm := subscriberDataPathFlowIDKey{intfID: intfID, onuID: onuID, uniID: uniID, direction: inverseDirection, tpID: tpID}
- f.subscriberDataPathFlowIDMapLock.RLock()
- symmFlowID := f.subscriberDataPathFlowIDMap[keySymm]
- f.subscriberDataPathFlowIDMapLock.RUnlock()
-
flow := openoltpb2.Flow{AccessIntfId: int32(intfID),
- OnuId: int32(onuID),
- UniId: int32(uniID),
- FlowId: logicalFlow.Id,
- FlowType: direction,
- AllocId: int32(allocID),
- NetworkIntfId: int32(networkIntfID),
- GemportId: int32(gemPortID),
- Classifier: classifierProto,
- Action: actionProto,
- Priority: int32(logicalFlow.Priority),
- Cookie: logicalFlow.Cookie,
- PortNo: flowContext.portNo,
- TechProfileId: tpID,
- ReplicateFlow: len(flowContext.pbitToGem) > 0,
- PbitToGemport: flowContext.pbitToGem,
- SymmetricFlowId: symmFlowID,
- GemportToAes: flowContext.gemToAes,
+ OnuId: int32(onuID),
+ UniId: int32(uniID),
+ FlowId: logicalFlow.Id,
+ FlowType: direction,
+ AllocId: int32(allocID),
+ NetworkIntfId: int32(networkIntfID),
+ GemportId: int32(gemPortID),
+ Classifier: classifierProto,
+ Action: actionProto,
+ Priority: int32(logicalFlow.Priority),
+ Cookie: logicalFlow.Cookie,
+ PortNo: flowContext.portNo,
+ TechProfileId: tpID,
+ ReplicateFlow: len(flowContext.pbitToGem) > 0,
+ PbitToGemport: flowContext.pbitToGem,
+ GemportToAes: flowContext.gemToAes,
}
if err := f.addFlowToDevice(ctx, logicalFlow, &flow); err != nil {
return olterrors.NewErrFlowOp("add", logicalFlow.Id, nil, err).Log()
@@ -1104,21 +1012,6 @@
"flow": flow,
"intf-id": intfID,
"onu-id": onuID})
- flowInfo := rsrcMgr.FlowInfo{Flow: &flow, IsSymmtricFlow: true}
- if err := f.resourceMgr.UpdateFlowIDInfo(ctx, uint32(flow.AccessIntfId), flow.OnuId, flow.UniId, flow.FlowId, flowInfo); err != nil {
- return olterrors.NewErrPersistence("update", "flow", logicalFlow.Id,
- log.Fields{
- "flow": flow,
- "device-id": f.deviceHandler.device.Id,
- "intf-id": intfID,
- "onu-id": onuID}, err).Log()
- }
-
- // Update the current flowID to the map
- keyCurr := subscriberDataPathFlowIDKey{intfID: intfID, onuID: onuID, uniID: uniID, direction: direction, tpID: tpID}
- f.subscriberDataPathFlowIDMapLock.Lock()
- f.subscriberDataPathFlowIDMap[keyCurr] = logicalFlow.Id
- f.subscriberDataPathFlowIDMapLock.Unlock()
return nil
}
@@ -1206,13 +1099,6 @@
"flow-id": logicalFlow.Id,
"intf-id": intfID,
"onu-id": onuID})
- flowInfo := rsrcMgr.FlowInfo{Flow: &dhcpFlow}
- if err := f.resourceMgr.UpdateFlowIDInfo(ctx, uint32(dhcpFlow.AccessIntfId), dhcpFlow.OnuId, dhcpFlow.UniId, dhcpFlow.FlowId, flowInfo); err != nil {
- return olterrors.NewErrPersistence("update", "flow", dhcpFlow.FlowId,
- log.Fields{
- "flow": dhcpFlow,
- "device-id": f.deviceHandler.device.Id}, err).Log()
- }
return nil
}
@@ -1301,11 +1187,6 @@
return olterrors.NewErrFlowOp("add", logicalFlow.Id, log.Fields{"flow": flow, "device-id": f.deviceHandler.device.Id}, err).Log()
}
- flowInfo := rsrcMgr.FlowInfo{Flow: &flow}
- if err := f.resourceMgr.UpdateFlowIDInfo(ctx, uint32(flow.AccessIntfId), flow.OnuId, flow.UniId, flow.FlowId, flowInfo); err != nil {
- return olterrors.NewErrPersistence("update", "flow", flow.FlowId, log.Fields{"flow": flow, "device-id": f.deviceHandler.device.Id}, err).Log()
- }
-
return nil
}
@@ -1336,7 +1217,7 @@
uplinkAction := make(map[string]interface{})
// Fill Classfier
- uplinkClassifier[EthType] = uint32(ethType)
+ uplinkClassifier[EthType] = ethType
uplinkClassifier[PacketTagType] = SingleTag
uplinkClassifier[VlanVid] = vlanID
uplinkClassifier[VlanPcp] = classifier[VlanPcp]
@@ -1415,13 +1296,7 @@
"intf-id": intfID,
"ethType": ethType,
})
- flowInfo := rsrcMgr.FlowInfo{Flow: &upstreamFlow}
- if err := f.resourceMgr.UpdateFlowIDInfo(ctx, uint32(upstreamFlow.AccessIntfId), upstreamFlow.OnuId, upstreamFlow.UniId, upstreamFlow.FlowId, flowInfo); err != nil {
- return olterrors.NewErrPersistence("update", "flow", upstreamFlow.FlowId,
- log.Fields{
- "flow": upstreamFlow,
- "device-id": f.deviceHandler.device.Id}, err).Log()
- }
+
return nil
}
@@ -1502,7 +1377,7 @@
// getTPpath return the ETCD path for a given UNI port
func (f *OpenOltFlowMgr) getTPpath(ctx context.Context, intfID uint32, uniPath string, TpID uint32) string {
- return f.techprofile[intfID].GetTechProfileInstanceKVPath(ctx, TpID, uniPath)
+ return f.techprofile.GetTechProfileInstanceKey(ctx, TpID, uniPath)
}
// DeleteTechProfileInstances removes the tech profile instances from persistent storage
@@ -1512,11 +1387,11 @@
for _, tpID := range tpIDList {
if err := f.DeleteTechProfileInstance(ctx, intfID, onuID, uniID, uniPortName, tpID); err != nil {
- _ = olterrors.NewErrAdapter("delete-tech-profile-failed", log.Fields{"device-id": f.deviceHandler.device.Id}, err).Log()
+ logger.Errorw(ctx, "delete-tech-profile-failed", log.Fields{"err": err, "device-id": f.deviceHandler.device.Id})
// return err
// We should continue to delete tech-profile instances for other TP IDs
}
- logger.Debugw(ctx, "tech-profile-deleted", log.Fields{"device-id": f.deviceHandler.device.Id, "tp-id": tpID})
+ logger.Debugw(ctx, "tech-profile-instance-deleted", log.Fields{"device-id": f.deviceHandler.device.Id, "uniPortName": uniPortName, "tp-id": tpID})
}
return nil
}
@@ -1526,7 +1401,7 @@
if uniPortName == "" {
uniPortName = getUniPortPath(f.deviceHandler.device.Id, intfID, int32(onuID), int32(uniID))
}
- if err := f.techprofile[intfID].DeleteTechProfileInstance(ctx, tpID, uniPortName); err != nil {
+ if err := f.techprofile.DeleteTechProfileInstance(ctx, tpID, uniPortName); err != nil {
return olterrors.NewErrAdapter("failed-to-delete-tp-instance-from-kv-store",
log.Fields{
"tp-id": tpID,
@@ -1698,13 +1573,7 @@
"device-id": f.deviceHandler.device.Id,
"onu-id": onuID,
"flow-id": flow.Id})
- flowInfo := rsrcMgr.FlowInfo{Flow: &downstreamflow}
- if err := f.resourceMgr.UpdateFlowIDInfo(ctx, networkInterfaceID, int32(onuID), int32(uniID), flow.Id, flowInfo); err != nil {
- return olterrors.NewErrPersistence("update", "flow", flow.Id,
- log.Fields{
- "flow": downstreamflow,
- "device-id": f.deviceHandler.device.Id}, err)
- }
+
return nil
}
@@ -1781,8 +1650,8 @@
return err
}
- delGemPortMsg := &ic.InterAdapterDeleteGemPortMessage{UniId: uniID, TpPath: tpPath, GemPortId: gemPortID}
- logger.Debugw(ctx, "sending-gem-port-delete-to-openonu-adapter",
+ delGemPortMsg := &ic.InterAdapterDeleteGemPortMessage{UniId: uniID, TpInstancePath: tpPath, GemPortId: gemPortID}
+ logger.Infow(ctx, "sending-gem-port-delete-to-openonu-adapter",
log.Fields{
"msg": *delGemPortMsg,
"device-id": f.deviceHandler.device.Id})
@@ -1822,7 +1691,7 @@
return err
}
- delTcontMsg := &ic.InterAdapterDeleteTcontMessage{UniId: uniID, TpPath: tpPath, AllocId: allocID}
+ delTcontMsg := &ic.InterAdapterDeleteTcontMessage{UniId: uniID, TpInstancePath: tpPath, AllocId: allocID}
logger.Debugw(ctx, "sending-tcont-delete-to-openonu-adapter",
log.Fields{
"msg": *delTcontMsg,
@@ -1853,37 +1722,38 @@
// Otherwise stale info continues to exist after gemport is freed and wrong logicalPortNo
// is conveyed to ONOS during packet-in OF message.
func (f *OpenOltFlowMgr) deleteGemPortFromLocalCache(ctx context.Context, intfID uint32, onuID uint32, gemPortID uint32) {
-
- f.onuGemInfoLock.Lock()
- defer f.onuGemInfoLock.Unlock()
-
logger.Infow(ctx, "deleting-gem-from-local-cache",
log.Fields{
"gem-port-id": gemPortID,
"intf-id": intfID,
"onu-id": onuID,
- "device-id": f.deviceHandler.device.Id,
- "onu-gem": f.onuGemInfo})
-
- onugem := f.onuGemInfo
+ "device-id": f.deviceHandler.device.Id})
+ f.onuGemInfoLock.RLock()
+ onugem, ok := f.onuGemInfoMap[onuID]
+ f.onuGemInfoLock.RUnlock()
+ if !ok {
+ logger.Warnw(ctx, "onu gem info already cleared from cache", log.Fields{
+ "gem-port-id": gemPortID,
+ "intf-id": intfID,
+ "onu-id": onuID,
+ "device-id": f.deviceHandler.device.Id})
+ return
+ }
deleteLoop:
- for i, onu := range onugem {
- if onu.OnuID == onuID {
- for j, gem := range onu.GemPorts {
- // If the gemport is found, delete it from local cache.
- if gem == gemPortID {
- onu.GemPorts = append(onu.GemPorts[:j], onu.GemPorts[j+1:]...)
- onugem[i] = onu
- logger.Infow(ctx, "removed-gemport-from-local-cache",
- log.Fields{
- "intf-id": intfID,
- "onu-id": onuID,
- "deletedgemport-id": gemPortID,
- "gemports": onu.GemPorts,
- "device-id": f.deviceHandler.device.Id})
- break deleteLoop
- }
- }
+ for j, gem := range onugem.GemPorts {
+ // If the gemport is found, delete it from local cache.
+ if gem == gemPortID {
+ onugem.GemPorts = append(onugem.GemPorts[:j], onugem.GemPorts[j+1:]...)
+ f.onuGemInfoLock.Lock()
+ f.onuGemInfoMap[onuID] = onugem
+ f.onuGemInfoLock.Unlock()
+ logger.Infow(ctx, "removed-gemport-from-local-cache",
+ log.Fields{
+ "intf-id": intfID,
+ "onu-id": onuID,
+ "deletedgemport-id": gemPortID,
+ "gemports": onugem.GemPorts,
+ "device-id": f.deviceHandler.device.Id})
break deleteLoop
}
}
@@ -1891,7 +1761,7 @@
//clearResources clears pon resources in kv store and the device
// nolint: gocyclo
-func (f *OpenOltFlowMgr) clearResources(ctx context.Context, flow *ofp.OfpFlowStats, intfID uint32, onuID int32, uniID int32,
+func (f *OpenOltFlowMgr) clearResources(ctx context.Context, intfID uint32, onuID int32, uniID int32,
gemPortID int32, flowID uint64, portNum uint32, tpID uint32) error {
uni := getUniPortPath(f.deviceHandler.device.Id, intfID, onuID, uniID)
@@ -1900,27 +1770,22 @@
log.Fields{
"tpPath": tpPath,
"device-id": f.deviceHandler.device.Id})
- techprofileInst, err := f.techprofile[intfID].GetTPInstanceFromKVStore(ctx, tpID, tpPath)
- if err != nil || techprofileInst == nil { // This should not happen, something wrong in KV backend transaction
- return olterrors.NewErrNotFound("tech-profile-in-kv-store",
- log.Fields{
- "tp-id": tpID,
- "path": tpPath}, err)
- }
used := f.isGemPortUsedByAnotherFlow(uint32(gemPortID))
if used {
- f.flowsUsedByGemPortKey.Lock()
- defer f.flowsUsedByGemPortKey.Unlock()
+ f.gemToFlowIDsKey.RLock()
+ flowIDs := f.gemToFlowIDs[uint32(gemPortID)]
+ f.gemToFlowIDsKey.RUnlock()
- flowIDs := f.flowsUsedByGemPort[uint32(gemPortID)]
for i, flowIDinMap := range flowIDs {
if flowIDinMap == flowID {
flowIDs = append(flowIDs[:i], flowIDs[i+1:]...)
- // everytime flowsUsedByGemPort cache is updated the same should be updated
+ f.gemToFlowIDsKey.Lock()
+ f.gemToFlowIDs[uint32(gemPortID)] = flowIDs
+ f.gemToFlowIDsKey.Unlock()
+ // everytime gemToFlowIDs cache is updated the same should be updated
// in kv store by calling UpdateFlowIDsForGem
- f.flowsUsedByGemPort[uint32(gemPortID)] = flowIDs
if err := f.resourceMgr.UpdateFlowIDsForGem(ctx, intfID, uint32(gemPortID), flowIDs); err != nil {
return err
}
@@ -1936,28 +1801,17 @@
return nil
}
logger.Debugf(ctx, "gem-port-id %d is-not-used-by-another-flow--releasing-the-gem-port", gemPortID)
- f.resourceMgr.RemoveGemPortIDForOnu(ctx, intfID, uint32(onuID), uint32(uniID), uint32(gemPortID))
- // TODO: The TrafficQueue corresponding to this gem-port also should be removed immediately.
- // But it is anyway eventually removed later when the TechProfile is freed, so not a big issue for now.
- f.resourceMgr.RemoveGEMportPonportToOnuMapOnKVStore(ctx, uint32(gemPortID), intfID)
- // also clear gem to uni cache
- f.removeFromGemToUniMap(gemPortKey{
- intfID: intfID,
- gemPort: uint32(gemPortID),
- })
f.deleteGemPortFromLocalCache(ctx, intfID, uint32(onuID), uint32(gemPortID))
-
- f.onuIdsLock.Lock() // TODO: What is this lock?
-
- //everytime an entry is deleted from flowsUsedByGemPort cache, the same should be updated in kv as well
+ _ = f.resourceMgr.RemoveGemFromOnuGemInfo(ctx, intfID, uint32(onuID), uint32(gemPortID)) // ignore error and proceed.
+ //everytime an entry is deleted from gemToFlowIDs cache, the same should be updated in kv as well
// by calling DeleteFlowIDsForGem
- f.flowsUsedByGemPortKey.Lock()
- delete(f.flowsUsedByGemPort, uint32(gemPortID))
- f.flowsUsedByGemPortKey.Unlock()
- f.resourceMgr.DeleteFlowIDsForGem(ctx, intfID, uint32(gemPortID))
- f.resourceMgr.FreeGemPortID(ctx, intfID, uint32(onuID), uint32(uniID), uint32(gemPortID))
+ f.gemToFlowIDsKey.Lock()
+ delete(f.gemToFlowIDs, uint32(gemPortID))
+ f.gemToFlowIDsKey.Unlock()
- f.onuIdsLock.Unlock()
+ f.resourceMgr.DeleteFlowIDsForGem(ctx, intfID, uint32(gemPortID))
+
+ f.resourceMgr.FreeGemPortID(ctx, intfID, uint32(onuID), uint32(uniID), uint32(gemPortID))
// Delete the gem port on the ONU.
if err := f.sendDeleteGemPortToChild(ctx, intfID, uint32(onuID), uint32(uniID), uint32(gemPortID), tpPath); err != nil {
@@ -1970,8 +1824,15 @@
"device-id": f.deviceHandler.device.Id,
"gemport-id": gemPortID})
}
+ techprofileInst, err := f.techprofile.GetTPInstance(ctx, tpPath)
+ if err != nil || techprofileInst == nil { // This should not happen, something wrong in KV backend transaction
+ return olterrors.NewErrNotFound("tech-profile-in-kv-store",
+ log.Fields{
+ "tp-id": tpID,
+ "path": tpPath}, err)
+ }
switch techprofileInst := techprofileInst.(type) {
- case *tp.TechProfile:
+ case *tp_pb.TechProfileInstance:
ok, _ := f.isTechProfileUsedByAnotherGem(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst, uint32(gemPortID))
if !ok {
if err := f.resourceMgr.RemoveTechProfileIDForOnu(ctx, intfID, uint32(onuID), uint32(uniID), tpID); err != nil {
@@ -1987,23 +1848,23 @@
logger.Warn(ctx, err)
}
}
- case *tp.EponProfile:
+ case *tp_pb.EponTechProfileInstance:
if err := f.resourceMgr.RemoveTechProfileIDForOnu(ctx, intfID, uint32(onuID), uint32(uniID), tpID); err != nil {
logger.Warn(ctx, err)
}
if err := f.DeleteTechProfileInstance(ctx, intfID, uint32(onuID), uint32(uniID), "", tpID); err != nil {
logger.Warn(ctx, err)
}
- f.resourceMgr.FreeAllocID(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst.AllocID)
+ f.resourceMgr.FreeAllocID(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst.AllocId)
// Delete the TCONT on the ONU.
- if err := f.sendDeleteTcontToChild(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst.AllocID, tpPath); err != nil {
+ if err := f.sendDeleteTcontToChild(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst.AllocId, tpPath); err != nil {
logger.Errorw(ctx, "error-processing-delete-tcont-towards-onu",
log.Fields{
"intf": intfID,
"onu-id": onuID,
"uni-id": uniID,
"device-id": f.deviceHandler.device.Id,
- "alloc-id": techprofileInst.AllocID})
+ "alloc-id": techprofileInst.AllocId})
}
default:
logger.Errorw(ctx, "error-unknown-tech",
@@ -2016,7 +1877,6 @@
// nolint: gocyclo
func (f *OpenOltFlowMgr) clearFlowFromDeviceAndResourceManager(ctx context.Context, flow *ofp.OfpFlowStats, flowDirection string) error {
- var flowInfo *rsrcMgr.FlowInfo
logger.Infow(ctx, "clear-flow-from-resource-manager",
log.Fields{
"flowDirection": flowDirection,
@@ -2037,6 +1897,16 @@
onuID := int32(onu)
uniID := int32(uni)
+ tpID, err := getTpIDFromFlow(ctx, flow)
+ if err != nil {
+ return olterrors.NewErrNotFound("tp-id",
+ log.Fields{
+ "flow": flow,
+ "intf-id": Intf,
+ "onu-id": onuID,
+ "uni-id": uniID,
+ "device-id": f.deviceHandler.device.Id}, err)
+ }
for _, field := range flows.GetOfbFields(flow) {
if field.Type == flows.IP_PROTO {
@@ -2060,86 +1930,45 @@
logger.Errorw(ctx, "invalid-in-port-number",
log.Fields{
"port-number": inPort,
- "error": err})
+ "err": err})
return err
}
}
- if flowInfo = f.resourceMgr.GetFlowIDInfo(ctx, Intf, onuID, uniID, flow.Id); flowInfo == nil {
- logger.Errorw(ctx, "flow-info-not-found-for-flow-to-be-removed", log.Fields{"flow-id": flow.Id, "intf-id": Intf, "onu-id": onuID, "uni-id": uniID})
- return olterrors.NewErrPersistence("remove", "flow", flow.Id, log.Fields{"flow": flow}, err)
- }
- removeFlowMessage := openoltpb2.Flow{FlowId: flowInfo.Flow.FlowId, FlowType: flowInfo.Flow.FlowType}
- logger.Debugw(ctx, "flow-to-be-deleted", log.Fields{"flow": flowInfo.Flow})
+
+ removeFlowMessage := openoltpb2.Flow{FlowId: flow.Id, AccessIntfId: int32(Intf), OnuId: onuID, UniId: uniID, TechProfileId: tpID, FlowType: flowDirection}
+ logger.Debugw(ctx, "flow-to-be-deleted", log.Fields{"flow": flow})
if err = f.removeFlowFromDevice(ctx, &removeFlowMessage, flow.Id); err != nil {
return err
}
- if err = f.resourceMgr.RemoveFlowIDInfo(ctx, Intf, onuID, uniID, flow.Id); err != nil {
- logger.Errorw(ctx, "failed-to-remove-flow-on-kv-store", log.Fields{"error": err})
- return err
- }
- tpID, err := getTpIDFromFlow(ctx, flow)
- if err != nil {
- return olterrors.NewErrNotFound("tp-id",
- log.Fields{
- "flow": flow,
- "intf-id": Intf,
- "onu-id": onuID,
- "uni-id": uniID,
- "device-id": f.deviceHandler.device.Id}, err)
- }
- if !flowInfo.Flow.ReplicateFlow {
- if err = f.clearResources(ctx, flow, Intf, onuID, uniID, flowInfo.Flow.GemportId, flowInfo.Flow.FlowId, portNum, tpID); err != nil {
+ f.flowIDToGemsLock.Lock()
+ gems, ok := f.flowIDToGems[flow.Id]
+ if !ok {
+ logger.Errorw(ctx, "flow-id-to-gem-map-not-found", log.Fields{"flowID": flow.Id})
+ f.flowIDToGemsLock.Unlock()
+ return olterrors.NewErrNotFound("flow-id-to-gem-map-not-found", log.Fields{"flowID": flow.Id}, nil)
+ }
+ copyOfGems := make([]uint32, len(gems))
+ _ = copy(copyOfGems, gems)
+ // Delete the flow-id to gemport list entry from the map now the flow is deleted.
+ delete(f.flowIDToGems, flow.Id)
+ f.flowIDToGemsLock.Unlock()
+
+ logger.Debugw(ctx, "gems-to-be-cleared", log.Fields{"gems": copyOfGems})
+ for _, gem := range copyOfGems {
+ if err = f.clearResources(ctx, Intf, onuID, uniID, int32(gem), flow.Id, portNum, tpID); err != nil {
logger.Errorw(ctx, "failed-to-clear-resources-for-flow", log.Fields{
- "flow-id": flow.Id,
- "stored-flow": flowInfo.Flow,
- "device-id": f.deviceHandler.device.Id,
- "stored-flow-id": flowInfo.Flow.FlowId,
- "onu-id": onuID,
- "intf": Intf,
- "err": err,
+ "flow-id": flow.Id,
+ "device-id": f.deviceHandler.device.Id,
+ "onu-id": onuID,
+ "intf": Intf,
+ "gem": gem,
+ "err": err,
})
return err
}
- } else {
- gems := make([]uint32, 0)
- for _, gem := range flowInfo.Flow.PbitToGemport {
- gems = appendUnique32bit(gems, gem)
- }
- logger.Debugw(ctx, "gems-to-be-cleared", log.Fields{"gems": gems})
- for _, gem := range gems {
- if err = f.clearResources(ctx, flow, Intf, onuID, uniID, int32(gem), flowInfo.Flow.FlowId, portNum, tpID); err != nil {
- logger.Errorw(ctx, "failed-to-clear-resources-for-flow", log.Fields{
- "flow-id": flow.Id,
- "stored-flow": flowInfo.Flow,
- "device-id": f.deviceHandler.device.Id,
- "stored-flow-id": flowInfo.Flow.FlowId,
- "onu-id": onuID,
- "intf": Intf,
- "gem": gem,
- "err": err,
- })
- return err
- }
- }
}
- // If datapath flow, clear the symmetric flow data from the subscriberDataPathFlowIDMap map
- if isDatapathFlow(flow) {
- if tpID, err := getTpIDFromFlow(ctx, flow); err != nil {
- var inverseDirection string
- if flowDirection == Upstream {
- inverseDirection = Downstream
- } else {
- inverseDirection = Upstream
- }
-
- keySymm := subscriberDataPathFlowIDKey{intfID: Intf, onuID: uint32(onuID), uniID: uint32(uniID), direction: inverseDirection, tpID: tpID}
- f.subscriberDataPathFlowIDMapLock.Lock()
- delete(f.subscriberDataPathFlowIDMap, keySymm)
- f.subscriberDataPathFlowIDMapLock.Unlock()
- }
- }
// Decrement reference count for the meter associated with the given <(pon_id, onu_id, uni_id)>/<tp_id>/meter_id/<direction>
if err := f.resourceMgr.HandleMeterInfoRefCntUpdate(ctx, flowDirection, Intf, uint32(onuID), uint32(uniID), tpID, false); err != nil {
return err
@@ -2203,7 +2032,8 @@
// Step2 : Push the flowControlBlock to ONU channel
// Step3 : Wait on response channel for response
// Step4 : Return error value
- logger.Debugw(ctx, "process-flow", log.Fields{"flow": flow, "addFlow": addFlow})
+ startTime := time.Now()
+ logger.Infow(ctx, "process-flow", log.Fields{"flow": flow, "addFlow": addFlow})
errChan := make(chan error)
flowCb := flowControlBlock{
ctx: ctx,
@@ -2223,7 +2053,7 @@
f.incomingFlows[onuID] <- flowCb
// Wait on the channel for flow handlers return value
err := <-errChan
- logger.Debugw(ctx, "process-flow--received-resp", log.Fields{"flow": flow, "addFlow": addFlow, "err": err})
+ logger.Infow(ctx, "process-flow--received-resp", log.Fields{"err": err, "totalTimeSeconds": time.Since(startTime).Seconds()})
return err
}
@@ -2235,17 +2065,17 @@
// process the flow completely before proceeding to handle the next flow
flowCb := <-subscriberFlowChannel
if flowCb.addFlow {
- logger.Debugw(flowCb.ctx, "adding-flow",
- log.Fields{"device-id": f.deviceHandler.device.Id,
- "flowToAdd": flowCb.flow})
+ logger.Info(flowCb.ctx, "adding-flow-start")
+ startTime := time.Now()
err := f.AddFlow(flowCb.ctx, flowCb.flow, flowCb.flowMetadata)
+ logger.Infow(flowCb.ctx, "adding-flow-complete", log.Fields{"processTimeSecs": time.Since(startTime).Seconds()})
// Pass the return value over the return channel
*flowCb.errChan <- err
} else {
- logger.Debugw(flowCb.ctx, "removing-flow",
- log.Fields{"device-id": f.deviceHandler.device.Id,
- "flowToRemove": flowCb.flow})
+ logger.Info(flowCb.ctx, "removing-flow-start")
+ startTime := time.Now()
err := f.RemoveFlow(flowCb.ctx, flowCb.flow)
+ logger.Infow(flowCb.ctx, "removing-flow-complete", log.Fields{"processTimeSecs": time.Since(startTime).Seconds()})
// Pass the return value over the return channel
*flowCb.errChan <- err
}
@@ -2393,14 +2223,10 @@
}
//cached group can be removed now
if err := f.resourceMgr.RemoveFlowGroupFromKVStore(ctx, groupID, true); err != nil {
- logger.Warnw(ctx, "failed-to-remove-flow-group", log.Fields{"group-id": groupID, "error": err})
+ logger.Warnw(ctx, "failed-to-remove-flow-group", log.Fields{"group-id": groupID, "err": err})
}
}
- flowInfo := rsrcMgr.FlowInfo{Flow: &multicastFlow}
- if err = f.resourceMgr.UpdateFlowIDInfo(ctx, networkInterfaceID, int32(onuID), int32(uniID), flow.Id, flowInfo); err != nil {
- return olterrors.NewErrPersistence("update", "flow", flow.Id, log.Fields{"flow": multicastFlow}, err)
- }
return nil
}
@@ -2413,16 +2239,13 @@
}
return nniInterfaceID, nil
}
- // find the first NNI interface id of the device
- nniPorts, e := f.resourceMgr.GetNNIFromKVStore(ctx)
- if e == nil && len(nniPorts) > 0 {
- return nniPorts[0], nil
- }
- return 0, olterrors.NewErrNotFound("nni-port", nil, e).Log()
+
+ // TODO: For now we support only one NNI port in VOLTHA. We shall use only the first NNI port, i.e., interface-id 0.
+ return 0, nil
}
//sendTPDownloadMsgToChild send payload
-func (f *OpenOltFlowMgr) sendTPDownloadMsgToChild(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, uni string, TpID uint32) error {
+func (f *OpenOltFlowMgr) sendTPDownloadMsgToChild(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, uni string, TpID uint32, tpInst tp_pb.TechProfileInstance) error {
onuDev, err := f.getOnuDevice(ctx, intfID, onuID)
if err != nil {
@@ -2436,7 +2259,11 @@
logger.Debugw(ctx, "got-child-device-from-olt-device-handler", log.Fields{"onu-id": onuDev.deviceID})
tpPath := f.getTPpath(ctx, intfID, uni, TpID)
- tpDownloadMsg := &ic.InterAdapterTechProfileDownloadMessage{UniId: uniID, Path: tpPath}
+ tpDownloadMsg := &ic.InterAdapterTechProfileDownloadMessage{
+ UniId: uniID,
+ TpInstancePath: tpPath,
+ TechTpInstance: &ic.InterAdapterTechProfileDownloadMessage_TpInstance{TpInstance: &tpInst},
+ }
logger.Debugw(ctx, "sending-load-tech-profile-request-to-brcm-onu-adapter", log.Fields{"tpDownloadMsg": *tpDownloadMsg})
sendErr := f.deviceHandler.AdapterProxy.SendInterAdapterMessage(log.WithSpanFromContext(context.Background(), ctx),
tpDownloadMsg,
@@ -2458,24 +2285,25 @@
}
//UpdateOnuInfo function adds onu info to cache and kvstore
+//UpdateOnuInfo function adds onu info to cache and kvstore
func (f *OpenOltFlowMgr) UpdateOnuInfo(ctx context.Context, intfID uint32, onuID uint32, serialNum string) error {
- f.onuGemInfoLock.Lock()
- defer f.onuGemInfoLock.Unlock()
- onugem := f.onuGemInfo
+ f.onuGemInfoLock.RLock()
+ _, ok := f.onuGemInfoMap[onuID]
+ f.onuGemInfoLock.RUnlock()
// If the ONU already exists in onuGemInfo list, nothing to do
- for _, onu := range onugem {
- if onu.OnuID == onuID && onu.SerialNumber == serialNum {
- logger.Debugw(ctx, "onu-id-already-exists-in-cache",
- log.Fields{"onuID": onuID,
- "serialNum": serialNum})
- return nil
- }
+ if ok {
+ logger.Debugw(ctx, "onu-id-already-exists-in-cache",
+ log.Fields{"onuID": onuID,
+ "serialNum": serialNum})
+ return nil
}
- onu := rsrcMgr.OnuGemInfo{OnuID: onuID, SerialNumber: serialNum, IntfID: intfID}
- f.onuGemInfo = append(f.onuGemInfo, onu)
- if err := f.resourceMgr.AddOnuGemInfo(ctx, intfID, onu); err != nil {
+ onuGemInfo := rsrcMgr.OnuGemInfo{OnuID: onuID, SerialNumber: serialNum, IntfID: intfID}
+ f.onuGemInfoLock.Lock()
+ f.onuGemInfoMap[onuID] = &onuGemInfo
+ f.onuGemInfoLock.Unlock()
+ if err := f.resourceMgr.AddOnuGemInfo(ctx, intfID, onuID, onuGemInfo); err != nil {
return err
}
logger.Infow(ctx, "updated-onuinfo",
@@ -2483,7 +2311,7 @@
"intf-id": intfID,
"onu-id": onuID,
"serial-num": serialNum,
- "onu": onu,
+ "onu": onuGemInfo,
"device-id": f.deviceHandler.device.Id})
return nil
}
@@ -2491,34 +2319,46 @@
//addGemPortToOnuInfoMap function adds GEMport to ONU map
func (f *OpenOltFlowMgr) addGemPortToOnuInfoMap(ctx context.Context, intfID uint32, onuID uint32, gemPort uint32) {
- f.onuGemInfoLock.Lock()
- defer f.onuGemInfoLock.Unlock()
-
logger.Infow(ctx, "adding-gem-to-onu-info-map",
log.Fields{
"gem-port-id": gemPort,
"intf-id": intfID,
"onu-id": onuID,
- "device-id": f.deviceHandler.device.Id,
- "onu-gem": f.onuGemInfo})
- onugem := f.onuGemInfo
- // update the gem to the local cache as well as to kv strore
- for idx, onu := range onugem {
- if onu.OnuID == onuID {
- // check if gem already exists , else update the cache and kvstore
- for _, gem := range onu.GemPorts {
- if gem == gemPort {
- logger.Debugw(ctx, "gem-already-in-cache-no-need-to-update-cache-and-kv-store",
- log.Fields{
- "gem": gemPort,
- "device-id": f.deviceHandler.device.Id})
- return
- }
+ "device-id": f.deviceHandler.device.Id})
+ f.onuGemInfoLock.RLock()
+ onugem, ok := f.onuGemInfoMap[onuID]
+ f.onuGemInfoLock.RUnlock()
+ if !ok {
+ logger.Warnw(ctx, "onu gem info is missing", log.Fields{
+ "gem-port-id": gemPort,
+ "intf-id": intfID,
+ "onu-id": onuID,
+ "device-id": f.deviceHandler.device.Id})
+ return
+ }
+
+ if onugem.OnuID == onuID {
+ // check if gem already exists , else update the cache and kvstore
+ for _, gem := range onugem.GemPorts {
+ if gem == gemPort {
+ logger.Debugw(ctx, "gem-already-in-cache-no-need-to-update-cache-and-kv-store",
+ log.Fields{
+ "gem": gemPort,
+ "device-id": f.deviceHandler.device.Id})
+ return
}
- onugem[idx].GemPorts = append(onugem[idx].GemPorts, gemPort)
- f.onuGemInfo = onugem
- break
}
+ onugem.GemPorts = append(onugem.GemPorts, gemPort)
+ f.onuGemInfoLock.Lock()
+ f.onuGemInfoMap[onuID] = onugem
+ f.onuGemInfoLock.Unlock()
+ } else {
+ logger.Warnw(ctx, "mismatched onu id", log.Fields{
+ "gem-port-id": gemPort,
+ "intf-id": intfID,
+ "onu-id": onuID,
+ "device-id": f.deviceHandler.device.Id})
+ return
}
err := f.resourceMgr.AddGemToOnuGemInfo(ctx, intfID, onuID, gemPort)
if err != nil {
@@ -2535,24 +2375,18 @@
"gem-port-id": gemPort,
"intf-id": intfID,
"onu-id": onuID,
- "device-id": f.deviceHandler.device.Id,
- "onu-gem": f.onuGemInfo})
+ "device-id": f.deviceHandler.device.Id})
}
//GetLogicalPortFromPacketIn function computes logical port UNI/NNI port from packet-in indication and returns the same
func (f *OpenOltFlowMgr) GetLogicalPortFromPacketIn(ctx context.Context, packetIn *openoltpb2.PacketIndication) (uint32, error) {
var logicalPortNum uint32
- var onuID, uniID uint32
- var err error
if packetIn.IntfType == "pon" {
// packet indication does not have serial number , so sending as nil
// get onu and uni ids associated with the given pon and gem ports
- if onuID, uniID, err = f.GetUniPortByPonPortGemPort(ctx, packetIn.IntfId, packetIn.GemportId); err != nil {
- // Called method is returning error with all data populated; just return the same
- return logicalPortNum, err
- }
- logger.Debugf(ctx, "retrieved ONU and UNI IDs [%d, %d] by interface:%d, gem:%d")
+ onuID, uniID := packetIn.OnuId, packetIn.UniId
+ logger.Debugf(ctx, "retrieved ONU and UNI IDs [%d, %d] by interface:%d, gem:%d", packetIn.OnuId, packetIn.UniId, packetIn.GemportId)
if packetIn.PortNo != 0 {
logicalPortNum = packetIn.PortNo
@@ -2576,40 +2410,6 @@
return logicalPortNum, nil
}
-//GetUniPortByPonPortGemPort return onu and uni IDs associated with given pon and gem ports
-func (f *OpenOltFlowMgr) GetUniPortByPonPortGemPort(ctx context.Context, intfID uint32, gemPortID uint32) (uint32, uint32, error) {
- key := gemPortKey{
- intfID: intfID,
- gemPort: gemPortID,
- }
- uniPortInfo, ok := f.fromGemToUniMap(key) //try to get from the cache first
- if ok {
- if len(uniPortInfo) > 1 {
- //return onu ID and uni port from the cache
- logger.Debugw(ctx, "found-uni-port-by-pon-and-gem-ports",
- log.Fields{
- "intfID": intfID,
- "gemPortID": gemPortID,
- "onuID, uniID": uniPortInfo})
- return uniPortInfo[0], uniPortInfo[1], nil
- }
- }
- //If uni port is not found in cache try to get it from kv store. if it is found in kv store, update the cache and return.
- onuID, uniID, err := f.resourceMgr.GetUniPortByPonPortGemPortFromKVStore(ctx, intfID, gemPortID)
- if err == nil {
- f.toGemToUniMap(ctx, key, onuID, uniID)
- logger.Infow(ctx, "found-uni-port-by-pon-and-gem-port-from-kv-store-and-updating-cache-with-uni-port",
- log.Fields{
- "gemPortKey": key,
- "onuID": onuID,
- "uniID": uniID})
- return onuID, uniID, nil
- }
- return uint32(0), uint32(0), olterrors.NewErrNotFound("uni-id",
- log.Fields{"interfaceID": intfID, "gemPortID": gemPortID},
- errors.New("no uni port found"))
-}
-
//GetPacketOutGemPortID returns gemPortId
func (f *OpenOltFlowMgr) GetPacketOutGemPortID(ctx context.Context, intfID uint32, onuID uint32, portNum uint32, packet []byte) (uint32, error) {
var gemPortID uint32
@@ -2721,10 +2521,6 @@
return olterrors.NewErrFlowOp("add", logicalFlow.Id, log.Fields{"flow": downstreamflow}, err)
}
logger.Info(ctx, "trap-on-nni-flow-added–to-device-successfully")
- flowInfo := rsrcMgr.FlowInfo{Flow: &downstreamflow}
- if err := f.resourceMgr.UpdateFlowIDInfo(ctx, networkInterfaceID, int32(onuID), int32(uniID), logicalFlow.Id, flowInfo); err != nil {
- return olterrors.NewErrPersistence("update", "flow", logicalFlow.Id, log.Fields{"flow": downstreamflow}, err)
- }
return nil
}
@@ -2814,10 +2610,7 @@
return olterrors.NewErrFlowOp("add", logicalFlow.Id, log.Fields{"flow": downstreamflow}, err)
}
logger.Info(ctx, "igmp-trap-on-nni-flow-added-to-device-successfully")
- flowInfo := rsrcMgr.FlowInfo{Flow: &downstreamflow}
- if err := f.resourceMgr.UpdateFlowIDInfo(ctx, networkInterfaceID, int32(onuID), int32(uniID), logicalFlow.Id, flowInfo); err != nil {
- return olterrors.NewErrPersistence("update", "flow", logicalFlow.Id, log.Fields{"flow": downstreamflow}, err)
- }
+
return nil
}
@@ -2846,10 +2639,10 @@
pbitToGem := make(map[uint32]uint32)
gemToAes := make(map[uint32]bool)
- var attributes []tp.IGemPortAttribute
+ var attributes []*tp_pb.GemPortAttributes
var direction = tp_pb.Direction_UPSTREAM
switch TpInst := TpInst.(type) {
- case *tp.TechProfile:
+ case *tp_pb.TechProfileInstance:
if IsUpstream(actionInfo[Output].(uint32)) {
attributes = TpInst.UpstreamGemPortAttributeList
} else {
@@ -2881,9 +2674,9 @@
}
}
} else { // Extract the exact gemport which maps to the PCP classifier in the flow
- if gem := f.techprofile[intfID].GetGemportForPbit(ctx, TpInst, direction, pcp.(uint32)); gem != nil {
- gemPortID = gem.(tp.IGemPortAttribute).GemportID
- gemToAes[gemPortID], _ = strconv.ParseBool(gem.(tp.IGemPortAttribute).AesEncryption)
+ if gem := f.techprofile.GetGemportForPbit(ctx, TpInst, direction, pcp.(uint32)); gem != nil {
+ gemPortID = gem.(*tp_pb.GemPortAttributes).GemportId
+ gemToAes[gemPortID], _ = strconv.ParseBool(gem.(*tp_pb.GemPortAttributes).AesEncryption)
}
}
@@ -2981,26 +2774,26 @@
}
// Send Techprofile download event to child device in go routine as it takes time
go func() {
- if err := f.sendTPDownloadMsgToChild(ctx, intfID, onuID, uniID, uni, tpID); err != nil {
+ if err := f.sendTPDownloadMsgToChild(ctx, intfID, onuID, uniID, uni, tpID, *(TpInst.(*tp_pb.TechProfileInstance))); err != nil {
logger.Warn(ctx, err)
}
}()
}
func (f *OpenOltFlowMgr) isGemPortUsedByAnotherFlow(gemPortID uint32) bool {
- f.flowsUsedByGemPortKey.RLock()
- flowIDList := f.flowsUsedByGemPort[gemPortID]
- f.flowsUsedByGemPortKey.RUnlock()
+ f.gemToFlowIDsKey.RLock()
+ flowIDList := f.gemToFlowIDs[gemPortID]
+ f.gemToFlowIDsKey.RUnlock()
return len(flowIDList) > 1
}
-func (f *OpenOltFlowMgr) isTechProfileUsedByAnotherGem(ctx context.Context, ponIntf uint32, onuID uint32, uniID uint32, tpInst *tp.TechProfile, gemPortID uint32) (bool, uint32) {
+func (f *OpenOltFlowMgr) isTechProfileUsedByAnotherGem(ctx context.Context, ponIntf uint32, onuID uint32, uniID uint32, tpInst *tp_pb.TechProfileInstance, gemPortID uint32) (bool, uint32) {
currentGemPorts := f.resourceMgr.GetCurrentGEMPortIDsForOnu(ctx, ponIntf, onuID, uniID)
tpGemPorts := tpInst.UpstreamGemPortAttributeList
for _, currentGemPort := range currentGemPorts {
for _, tpGemPort := range tpGemPorts {
- if (currentGemPort == tpGemPort.GemportID) && (currentGemPort != gemPortID) {
+ if (currentGemPort == tpGemPort.GemportId) && (currentGemPort != gemPortID) {
return true, currentGemPort
}
}
@@ -3010,21 +2803,21 @@
}
func (f *OpenOltFlowMgr) isAllocUsedByAnotherUNI(ctx context.Context, sq schedQueue) bool {
- tpInst := sq.tpInst.(*tp.TechProfile)
- if tpInst.InstanceCtrl.Onu == "single-instance" && sq.direction == tp_pb.Direction_UPSTREAM {
- tpInstances := f.techprofile[sq.intfID].FindAllTpInstances(ctx, f.deviceHandler.device.Id, sq.tpID, sq.intfID, sq.onuID).([]tp.TechProfile)
+ tpInst := sq.tpInst.(*tp_pb.TechProfileInstance)
+ if tpInst.InstanceControl.Onu == "single-instance" && sq.direction == tp_pb.Direction_UPSTREAM {
+ tpInstances := f.techprofile.FindAllTpInstances(ctx, f.deviceHandler.device.Id, sq.tpID, sq.intfID, sq.onuID).([]tp_pb.TechProfileInstance)
logger.Debugw(ctx, "got-single-instance-tp-instances", log.Fields{"tp-instances": tpInstances})
for i := 0; i < len(tpInstances); i++ {
tpI := tpInstances[i]
if tpI.SubscriberIdentifier != tpInst.SubscriberIdentifier &&
- tpI.UsScheduler.AllocID == tpInst.UsScheduler.AllocID {
+ tpI.UsScheduler.AllocId == tpInst.UsScheduler.AllocId {
logger.Debugw(ctx, "alloc-is-in-use",
log.Fields{
"device-id": f.deviceHandler.device.Id,
"intfID": sq.intfID,
"onuID": sq.onuID,
"uniID": sq.uniID,
- "allocID": tpI.UsScheduler.AllocID,
+ "allocID": tpI.UsScheduler.AllocId,
})
return true
}
@@ -3250,7 +3043,7 @@
logger.Debugw(ctx, "invalid-action-port-number",
log.Fields{
"port-number": action[Output].(uint32),
- "error": err})
+ "err": err})
return uint32(0), err
}
logger.Infow(ctx, "output-nni-intfId-is", log.Fields{"intf-id": intfID})
@@ -3261,7 +3054,7 @@
logger.Debugw(ctx, "invalid-classifier-port-number",
log.Fields{
"port-number": action[Output].(uint32),
- "error": err})
+ "err": err})
return uint32(0), err
}
logger.Infow(ctx, "input-nni-intfId-is", log.Fields{"intf-id": intfID})
@@ -3331,69 +3124,39 @@
return 0, 0, nil
}
-// AddUniPortToOnuInfo adds uni port to the onugem info both in cache and kvstore.
-func (f *OpenOltFlowMgr) AddUniPortToOnuInfo(ctx context.Context, intfID uint32, onuID uint32, portNum uint32) {
-
- f.onuGemInfoLock.Lock()
- defer f.onuGemInfoLock.Unlock()
-
- onugem := f.onuGemInfo
- for idx, onu := range onugem {
- if onu.OnuID == onuID {
- for _, uni := range onu.UniPorts {
- if uni == portNum {
- logger.Infow(ctx, "uni-already-in-cache--no-need-to-update-cache-and-kv-store", log.Fields{"uni": portNum})
- return
+func (f *OpenOltFlowMgr) loadFlowIDsForGemAndGemIDsForFlow(ctx context.Context) {
+ logger.Debug(ctx, "loadFlowIDsForGemAndGemIDsForFlow - start")
+ f.onuGemInfoLock.RLock()
+ f.gemToFlowIDsKey.Lock()
+ f.flowIDToGemsLock.Lock()
+ for _, og := range f.onuGemInfoMap {
+ for _, gem := range og.GemPorts {
+ flowIDs, err := f.resourceMgr.GetFlowIDsForGem(ctx, f.ponPortIdx, gem)
+ if err != nil {
+ f.gemToFlowIDs[gem] = flowIDs
+ for _, flowID := range flowIDs {
+ if _, ok := f.flowIDToGems[flowID]; !ok {
+ f.flowIDToGems[flowID] = []uint32{gem}
+ } else {
+ f.flowIDToGems[flowID] = appendUnique32bit(f.flowIDToGems[flowID], gem)
+ }
}
}
- onugem[idx].UniPorts = append(onugem[idx].UniPorts, portNum)
- f.onuGemInfo = onugem
}
}
- f.resourceMgr.AddUniPortToOnuInfo(ctx, intfID, onuID, portNum)
-
-}
-
-func (f *OpenOltFlowMgr) loadFlowIDlistForGem(ctx context.Context, intf uint32) {
- flowIDsList, err := f.resourceMgr.GetFlowIDsGemMapForInterface(ctx, intf)
- if err != nil {
- logger.Error(ctx, "failed-to-get-flowid-list-per-gem", log.Fields{"intf": intf})
- return
- }
- f.flowsUsedByGemPortKey.Lock()
- for gem, FlowIDs := range flowIDsList {
- f.flowsUsedByGemPort[gem] = FlowIDs
- }
- f.flowsUsedByGemPortKey.Unlock()
+ f.flowIDToGemsLock.Unlock()
+ f.gemToFlowIDsKey.Unlock()
+ f.onuGemInfoLock.RUnlock()
+ logger.Debug(ctx, "loadFlowIDsForGemAndGemIDsForFlow - end")
}
//clearMulticastFlowFromResourceManager removes a multicast flow from the KV store and
// clears resources reserved for this multicast flow
func (f *OpenOltFlowMgr) clearMulticastFlowFromResourceManager(ctx context.Context, flow *ofp.OfpFlowStats) error {
- classifierInfo := make(map[string]interface{})
- var flowInfo *rsrcMgr.FlowInfo
- formulateClassifierInfoFromFlow(ctx, classifierInfo, flow)
- networkInterfaceID, err := f.getNNIInterfaceIDOfMulticastFlow(ctx, classifierInfo)
-
- if err != nil {
- logger.Warnw(ctx, "no-inport-found--cannot-release-resources-of-the-multicast-flow", log.Fields{"flowId:": flow.Id})
- return err
- }
-
- var onuID = int32(NoneOnuID)
- var uniID = int32(NoneUniID)
- if flowInfo = f.resourceMgr.GetFlowIDInfo(ctx, networkInterfaceID, onuID, uniID, flow.Id); flowInfo == nil {
- return olterrors.NewErrPersistence("remove", "flow", flow.Id,
- log.Fields{
- "flow": flow,
- "device-id": f.deviceHandler.device.Id,
- "intf-id": networkInterfaceID,
- "onu-id": onuID}, err).Log()
- }
- removeFlowMessage := openoltpb2.Flow{FlowId: flow.Id, FlowType: flowInfo.Flow.FlowType}
+ removeFlowMessage := openoltpb2.Flow{FlowId: flow.Id, FlowType: Multicast}
logger.Debugw(ctx, "multicast-flow-to-be-deleted",
log.Fields{
- "flow": flowInfo.Flow,
+ "flow": flow,
"flow-id": flow.Id,
"device-id": f.deviceHandler.device.Id})
// Remove from device
@@ -3402,60 +3165,44 @@
logger.Errorw(ctx, "failed-to-remove-multicast-flow",
log.Fields{
"flow-id": flow.Id,
- "error": err})
+ "err": err})
return err
}
- // Remove flow from KV store
- return f.resourceMgr.RemoveFlowIDInfo(ctx, networkInterfaceID, onuID, uniID, flow.Id)
+
+ return nil
}
-// reconcileSubscriberDataPathFlowIDMap reconciles subscriberDataPathFlowIDMap from KV store
-func (f *OpenOltFlowMgr) reconcileSubscriberDataPathFlowIDMap(ctx context.Context) {
- onuGemInfo, err := f.resourceMgr.GetOnuGemInfo(ctx, f.ponPortIdx)
+func (f *OpenOltFlowMgr) getTechProfileDownloadMessage(ctx context.Context, tpPath string, ponID uint32, onuID uint32, uniID uint32) *ic.InterAdapterTechProfileDownloadMessage {
+ tpInst, err := f.techprofile.GetTPInstance(ctx, tpPath)
if err != nil {
- _ = olterrors.NewErrNotFound("onu", log.Fields{
- "pon-port": f.ponPortIdx}, err).Log()
- return
+ logger.Errorw(ctx, "error-fetching-tp-instance", log.Fields{"tpPath": tpPath})
+ return nil
}
- f.subscriberDataPathFlowIDMapLock.Lock()
- defer f.subscriberDataPathFlowIDMapLock.Unlock()
-
- for _, onu := range onuGemInfo {
- for _, uniID := range onu.UniPorts {
- flowIDs, err := f.resourceMgr.GetCurrentFlowIDsForOnu(ctx, onu.IntfID, int32(onu.OnuID), int32(uniID))
- if err != nil {
- logger.Fatalf(ctx, "failed-to-read-flow-ids-of-onu-during-reconciliation")
- }
- for _, flowID := range flowIDs {
- flowInfo := f.resourceMgr.GetFlowIDInfo(ctx, onu.IntfID, int32(onu.OnuID), int32(uniID), flowID)
- if flowInfo == nil {
- // Error is already logged in the called function
- continue
- }
- if flowInfo.Flow.Classifier.PktTagType == DoubleTag &&
- flowInfo.Flow.FlowType == Downstream &&
- flowInfo.Flow.Classifier.OVid > 0 &&
- flowInfo.Flow.TechProfileId > 0 {
- key := subscriberDataPathFlowIDKey{intfID: onu.IntfID, onuID: onu.OnuID, uniID: uniID, direction: flowInfo.Flow.FlowType, tpID: flowInfo.Flow.TechProfileId}
- if _, ok := f.subscriberDataPathFlowIDMap[key]; !ok {
- f.subscriberDataPathFlowIDMap[key] = flowInfo.Flow.FlowId
- }
- } else if flowInfo.Flow.Classifier.PktTagType == SingleTag &&
- flowInfo.Flow.FlowType == Upstream &&
- flowInfo.Flow.Action.OVid > 0 &&
- flowInfo.Flow.TechProfileId > 0 {
- key := subscriberDataPathFlowIDKey{intfID: onu.IntfID, onuID: onu.OnuID, uniID: uniID, direction: flowInfo.Flow.FlowType, tpID: flowInfo.Flow.TechProfileId}
- if _, ok := f.subscriberDataPathFlowIDMap[key]; !ok {
- f.subscriberDataPathFlowIDMap[key] = flowInfo.Flow.FlowId
- }
- }
- }
+ switch tpInst := tpInst.(type) {
+ case *tp_pb.TechProfileInstance:
+ logger.Debugw(ctx, "fetched-tp-instance-successfully--formulating-tp-download-msg", log.Fields{"tpPath": tpPath})
+ return &ic.InterAdapterTechProfileDownloadMessage{UniId: uniID,
+ TpInstancePath: tpPath,
+ TechTpInstance: &ic.InterAdapterTechProfileDownloadMessage_TpInstance{TpInstance: tpInst},
}
+ case *openoltpb2.EponTechProfileInstance:
+ return &ic.InterAdapterTechProfileDownloadMessage{UniId: uniID,
+ TpInstancePath: tpPath,
+ TechTpInstance: &ic.InterAdapterTechProfileDownloadMessage_EponTpInstance{EponTpInstance: tpInst},
+ }
+ default:
+ logger.Errorw(ctx, "unknown-tech", log.Fields{"tpPath": tpPath})
}
+ return nil
}
-// isDatapathFlow declares a flow as datapath flow if it is not a controller bound flow and the flow does not have group
-func isDatapathFlow(flow *ofp.OfpFlowStats) bool {
- return !IsControllerBoundFlow(flows.GetOutPort(flow)) && !flows.HasGroup(flow)
+func (f *OpenOltFlowMgr) getOnuGemInfoList() []rsrcMgr.OnuGemInfo {
+ var onuGemInfoLst []rsrcMgr.OnuGemInfo
+ f.onuGemInfoLock.RLock()
+ defer f.onuGemInfoLock.RUnlock()
+ for _, v := range f.onuGemInfoMap {
+ onuGemInfoLst = append(onuGemInfoLst, *v)
+ }
+ return onuGemInfoLst
}
diff --git a/internal/pkg/core/openolt_flowmgr_test.go b/internal/pkg/core/openolt_flowmgr_test.go
index 44a02fb..214fa61 100644
--- a/internal/pkg/core/openolt_flowmgr_test.go
+++ b/internal/pkg/core/openolt_flowmgr_test.go
@@ -29,15 +29,10 @@
"github.com/opencord/voltha-protos/v4/go/voltha"
- "github.com/opencord/voltha-lib-go/v4/pkg/db"
- fu "github.com/opencord/voltha-lib-go/v4/pkg/flows"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
- tp "github.com/opencord/voltha-lib-go/v4/pkg/techprofile"
- "github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager"
+ fu "github.com/opencord/voltha-lib-go/v5/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
rsrcMgr "github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager"
- "github.com/opencord/voltha-openolt-adapter/pkg/mocks"
ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
- "github.com/opencord/voltha-protos/v4/go/openolt"
openoltpb2 "github.com/opencord/voltha-protos/v4/go/openolt"
tp_pb "github.com/opencord/voltha-protos/v4/go/tech_profile"
)
@@ -48,41 +43,11 @@
_, _ = log.SetDefaultLogger(log.JSON, log.DebugLevel, nil)
flowMgr = newMockFlowmgr()
}
-func newMockResourceMgr() *resourcemanager.OpenOltResourceMgr {
- ranges := []*openolt.DeviceInfo_DeviceResourceRanges{
- {
- IntfIds: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
- Technology: "Default",
- },
- }
-
- deviceinfo := &openolt.DeviceInfo{Vendor: "openolt", Model: "openolt", HardwareVersion: "1.0", FirmwareVersion: "1.0",
- DeviceId: "olt", DeviceSerialNumber: "openolt", PonPorts: 16, Technology: "Default",
- OnuIdStart: OnuIDStart, OnuIdEnd: OnuIDEnd, AllocIdStart: AllocIDStart, AllocIdEnd: AllocIDEnd,
- GemportIdStart: GemIDStart, GemportIdEnd: GemIDEnd, FlowIdStart: FlowIDStart, FlowIdEnd: FlowIDEnd,
- Ranges: ranges,
- }
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- rsrMgr := resourcemanager.NewResourceMgr(ctx, "olt", "127.0.0.1:2379", "etcd", "olt", deviceinfo, "service/voltha")
- for key := range rsrMgr.ResourceMgrs {
- rsrMgr.ResourceMgrs[key].KVStore = &db.Backend{}
- rsrMgr.ResourceMgrs[key].KVStore.Client = &mocks.MockKVClient{}
- rsrMgr.ResourceMgrs[key].TechProfileMgr = mocks.MockTechProfile{TpID: key}
- }
- return rsrMgr
-}
func newMockFlowmgr() []*OpenOltFlowMgr {
- rMgr := newMockResourceMgr()
dh := newMockDeviceHandler()
- rMgr.KVStore = &db.Backend{}
- rMgr.KVStore.Client = &mocks.MockKVClient{}
-
- dh.resourceMgr = rMgr
-
- // onuGemInfo := make([]rsrcMgr.OnuGemInfo, NumPonPorts)
+ // onuGemInfoMap := make([]rsrcMgr.onuGemInfoMap, NumPonPorts)
var i uint32
for i = 0; i < NumPonPorts; i++ {
@@ -90,11 +55,7 @@
packetInGemPort[rsrcMgr.PacketInInfoKey{IntfID: i, OnuID: i + 1, LogicalPort: i + 1, VlanID: uint16(i), Priority: uint8(i)}] = i + 1
dh.flowMgr[i].packetInGemPort = packetInGemPort
- tps := make(map[uint32]tp.TechProfileIf)
- for key := range rMgr.ResourceMgrs {
- tps[key] = mocks.MockTechProfile{TpID: key}
- }
- dh.flowMgr[i].techprofile = tps
+ dh.flowMgr[i].techprofile = dh.resourceMgr[i].PonRsrMgr.TechProfileMgr
interface2mcastQeueuMap := make(map[uint32]*QueueInfoBrief)
interface2mcastQeueuMap[0] = &QueueInfoBrief{
gemPortID: 4000,
@@ -102,21 +63,22 @@
}
dh.flowMgr[i].grpMgr.interfaceToMcastQueueMap = interface2mcastQeueuMap
}
-
return dh.flowMgr
}
func TestOpenOltFlowMgr_CreateSchedulerQueues(t *testing.T) {
- tprofile := &tp.TechProfile{Name: "tp1", SubscriberIdentifier: "subscriber1",
+ tprofile := &tp_pb.TechProfileInstance{Name: "tp1", SubscriberIdentifier: "subscriber1",
ProfileType: "pt1", NumGemPorts: 1, Version: 1,
- InstanceCtrl: tp.InstanceControl{Onu: "1", Uni: "1", MaxGemPayloadSize: "1"},
+ InstanceControl: &tp_pb.InstanceControl{Onu: "1", Uni: "1", MaxGemPayloadSize: "1"},
}
- tprofile.UsScheduler.Direction = "UPSTREAM"
- tprofile.UsScheduler.QSchedPolicy = "WRR"
+ tprofile.UsScheduler = &openoltpb2.SchedulerAttributes{}
+ tprofile.UsScheduler.Direction = tp_pb.Direction_UPSTREAM
+ tprofile.UsScheduler.QSchedPolicy = tp_pb.SchedulingPolicy_WRR
tprofile2 := tprofile
- tprofile2.DsScheduler.Direction = "DOWNSTREAM"
- tprofile2.DsScheduler.QSchedPolicy = "WRR"
+ tprofile2.DsScheduler = &openoltpb2.SchedulerAttributes{}
+ tprofile2.DsScheduler.Direction = tp_pb.Direction_DOWNSTREAM
+ tprofile2.DsScheduler.QSchedPolicy = tp_pb.SchedulingPolicy_WRR
tests := []struct {
name string
@@ -135,17 +97,17 @@
{"CreateSchedulerQueues-19", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 1, createFlowMetadata(tprofile, 5, Upstream)}, false},
{"CreateSchedulerQueues-20", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 1, createFlowMetadata(tprofile2, 5, Downstream)}, false},
- {"CreateSchedulerQueues-1", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 1, createFlowMetadata(tprofile, 0, Upstream)}, true},
- {"CreateSchedulerQueues-2", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 1, createFlowMetadata(tprofile2, 0, Downstream)}, true},
+ {"CreateSchedulerQueues-1", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 1, createFlowMetadata(tprofile, 0, Upstream)}, false},
+ {"CreateSchedulerQueues-2", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 1, createFlowMetadata(tprofile2, 0, Downstream)}, false},
{"CreateSchedulerQueues-3", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 2, createFlowMetadata(tprofile, 2, Upstream)}, true},
{"CreateSchedulerQueues-4", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 2, createFlowMetadata(tprofile2, 2, Downstream)}, true},
{"CreateSchedulerQueues-5", schedQueue{tp_pb.Direction_UPSTREAM, 1, 2, 2, 64, 2, tprofile, 2, createFlowMetadata(tprofile, 3, Upstream)}, true},
{"CreateSchedulerQueues-6", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 2, 2, 65, 2, tprofile2, 2, createFlowMetadata(tprofile2, 3, Downstream)}, true},
//Negative testcases
- {"CreateSchedulerQueues-7", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 1, &voltha.FlowMetadata{}}, true},
+ {"CreateSchedulerQueues-7", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 1, &voltha.FlowMetadata{}}, false},
{"CreateSchedulerQueues-8", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 0, &voltha.FlowMetadata{}}, true},
- {"CreateSchedulerQueues-9", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 1, &voltha.FlowMetadata{}}, true},
+ {"CreateSchedulerQueues-9", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 1, &voltha.FlowMetadata{}}, false},
{"CreateSchedulerQueues-10", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 2, &voltha.FlowMetadata{}}, true},
{"CreateSchedulerQueues-11", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 2, &voltha.FlowMetadata{}}, true},
{"CreateSchedulerQueues-12", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 2, nil}, true},
@@ -161,35 +123,35 @@
}
}
-func createFlowMetadata(techProfile *tp.TechProfile, tcontType int, direction string) *voltha.FlowMetadata {
- var additionalBw string
+func createFlowMetadata(techProfile *tp_pb.TechProfileInstance, tcontType int, direction string) *voltha.FlowMetadata {
+ var additionalBw openoltpb2.AdditionalBW
bands := make([]*ofp.OfpMeterBandHeader, 0)
switch tcontType {
case 1:
//tcont-type-1
bands = append(bands, &ofp.OfpMeterBandHeader{Type: ofp.OfpMeterBandType_OFPMBT_DROP, Rate: 10000, BurstSize: 0, Data: &ofp.OfpMeterBandHeader_Drop{}})
bands = append(bands, &ofp.OfpMeterBandHeader{Type: ofp.OfpMeterBandType_OFPMBT_DROP, Rate: 10000, BurstSize: 0, Data: &ofp.OfpMeterBandHeader_Drop{}})
- additionalBw = "AdditionalBW_None"
+ additionalBw = tp_pb.AdditionalBW_AdditionalBW_None
case 2:
//tcont-type-2
bands = append(bands, &ofp.OfpMeterBandHeader{Type: ofp.OfpMeterBandType_OFPMBT_DROP, Rate: 60000, BurstSize: 10000, Data: &ofp.OfpMeterBandHeader_Drop{}})
bands = append(bands, &ofp.OfpMeterBandHeader{Type: ofp.OfpMeterBandType_OFPMBT_DROP, Rate: 50000, BurstSize: 10000, Data: &ofp.OfpMeterBandHeader_Drop{}})
- additionalBw = "AdditionalBW_None"
+ additionalBw = tp_pb.AdditionalBW_AdditionalBW_None
case 3:
//tcont-type-3
bands = append(bands, &ofp.OfpMeterBandHeader{Type: ofp.OfpMeterBandType_OFPMBT_DROP, Rate: 100000, BurstSize: 10000, Data: &ofp.OfpMeterBandHeader_Drop{}})
bands = append(bands, &ofp.OfpMeterBandHeader{Type: ofp.OfpMeterBandType_OFPMBT_DROP, Rate: 50000, BurstSize: 20000, Data: &ofp.OfpMeterBandHeader_Drop{}})
- additionalBw = "AdditionalBW_NA"
+ additionalBw = tp_pb.AdditionalBW_AdditionalBW_NA
case 4:
//tcont-type-4
bands = append(bands, &ofp.OfpMeterBandHeader{Type: ofp.OfpMeterBandType_OFPMBT_DROP, Rate: 200000, BurstSize: 10000, Data: &ofp.OfpMeterBandHeader_Drop{}})
- additionalBw = "AdditionalBW_BestEffort"
+ additionalBw = tp_pb.AdditionalBW_AdditionalBW_BestEffort
case 5:
//tcont-type-5
bands = append(bands, &ofp.OfpMeterBandHeader{Type: ofp.OfpMeterBandType_OFPMBT_DROP, Rate: 50000, BurstSize: 10000, Data: &ofp.OfpMeterBandHeader_Drop{}})
bands = append(bands, &ofp.OfpMeterBandHeader{Type: ofp.OfpMeterBandType_OFPMBT_DROP, Rate: 100000, BurstSize: 10000, Data: &ofp.OfpMeterBandHeader_Drop{}})
bands = append(bands, &ofp.OfpMeterBandHeader{Type: ofp.OfpMeterBandType_OFPMBT_DROP, Rate: 10000, BurstSize: 0, Data: &ofp.OfpMeterBandHeader_Drop{}})
- additionalBw = "AdditionalBW_BestEffort"
+ additionalBw = tp_pb.AdditionalBW_AdditionalBW_BestEffort
default:
// do nothing, we will return meter config with no meter bands
}
@@ -206,18 +168,20 @@
}
func TestOpenOltFlowMgr_RemoveSchedulerQueues(t *testing.T) {
- tprofile := &tp.TechProfile{Name: "tp1", SubscriberIdentifier: "subscriber1",
+ tprofile := &tp_pb.TechProfileInstance{Name: "tp1", SubscriberIdentifier: "subscriber1",
ProfileType: "pt1", NumGemPorts: 1, Version: 1,
- InstanceCtrl: tp.InstanceControl{Onu: "1", Uni: "1", MaxGemPayloadSize: "1"},
+ InstanceControl: &tp_pb.InstanceControl{Onu: "1", Uni: "1", MaxGemPayloadSize: "1"},
}
- tprofile.UsScheduler.Direction = "UPSTREAM"
- tprofile.UsScheduler.AdditionalBw = "AdditionalBW_None"
- tprofile.UsScheduler.QSchedPolicy = "WRR"
+ tprofile.UsScheduler = &openoltpb2.SchedulerAttributes{}
+ tprofile.UsScheduler.Direction = tp_pb.Direction_UPSTREAM
+ tprofile.UsScheduler.AdditionalBw = tp_pb.AdditionalBW_AdditionalBW_None
+ tprofile.UsScheduler.QSchedPolicy = tp_pb.SchedulingPolicy_WRR
tprofile2 := tprofile
- tprofile2.DsScheduler.Direction = "DOWNSTREAM"
- tprofile2.DsScheduler.AdditionalBw = "AdditionalBW_None"
- tprofile2.DsScheduler.QSchedPolicy = "WRR"
+ tprofile2.DsScheduler = &openoltpb2.SchedulerAttributes{}
+ tprofile2.DsScheduler.Direction = tp_pb.Direction_DOWNSTREAM
+ tprofile2.DsScheduler.AdditionalBw = tp_pb.AdditionalBW_AdditionalBW_None
+ tprofile2.DsScheduler.QSchedPolicy = tp_pb.SchedulingPolicy_WRR
//defTprofile := &tp.DefaultTechProfile{}
tests := []struct {
name string
@@ -267,7 +231,6 @@
args args
}{
{"createTcontGemports-1", args{intfID: 0, onuID: 1, uniID: 1, uni: "16", uniPort: 1, TpID: 64, UsMeterID: 1, DsMeterID: 1, flowMetadata: flowmetadata}},
- {"createTcontGemports-1", args{intfID: 0, onuID: 1, uniID: 1, uni: "16", uniPort: 1, TpID: 65, UsMeterID: 1, DsMeterID: 1, flowMetadata: flowmetadata}},
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
@@ -275,11 +238,11 @@
t.Run(tt.name, func(t *testing.T) {
_, _, tpInst := flowMgr[tt.args.intfID].createTcontGemports(ctx, tt.args.intfID, tt.args.onuID, tt.args.uniID, tt.args.uni, tt.args.uniPort, tt.args.TpID, tt.args.UsMeterID, tt.args.DsMeterID, tt.args.flowMetadata)
switch tpInst := tpInst.(type) {
- case *tp.TechProfile:
+ case *tp_pb.TechProfileInstance:
if tt.args.TpID != 64 {
t.Errorf("OpenOltFlowMgr.createTcontGemports() error = different tech, tech %v", tpInst)
}
- case *tp.EponProfile:
+ case *tp_pb.EponTechProfileInstance:
if tt.args.TpID != 65 {
t.Errorf("OpenOltFlowMgr.createTcontGemports() error = different tech, tech %v", tpInst)
}
@@ -680,7 +643,7 @@
// clean the flowMgr
for i := 0; i < intfNum; i++ {
- flowMgr[i].onuGemInfo = make([]rsrcMgr.OnuGemInfo, 0)
+ flowMgr[i].onuGemInfoMap = make(map[uint32]*rsrcMgr.OnuGemInfo)
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
@@ -696,7 +659,6 @@
// Add gemPorts to OnuInfo in parallel threads
wg := sync.WaitGroup{}
-
for o := 1; o <= onuNum; o++ {
for i := 0; i < intfNum; i++ {
wg.Add(1)
@@ -711,15 +673,15 @@
wg.Wait()
- // check that each entry of onuGemInfo has the correct number of ONUs
+ // check that each entry of onuGemInfoMap has the correct number of ONUs
for i := 0; i < intfNum; i++ {
- lenofOnu := len(flowMgr[i].onuGemInfo)
+ lenofOnu := len(flowMgr[i].onuGemInfoMap)
if onuNum != lenofOnu {
- t.Errorf("OnuGemInfo length is not as expected len = %d, want %d", lenofOnu, onuNum)
+ t.Errorf("onuGemInfoMap length is not as expected len = %d, want %d", lenofOnu, onuNum)
}
for o := 1; o <= onuNum; o++ {
- lenOfGemPorts := len(flowMgr[i].onuGemInfo[o-1].GemPorts)
+ lenOfGemPorts := len(flowMgr[i].onuGemInfoMap[uint32(o)].GemPorts)
// check that each onuEntry has 1 gemPort
if lenOfGemPorts != 1 {
t.Errorf("Expected 1 GemPort per ONU, found %d", lenOfGemPorts)
@@ -727,7 +689,7 @@
// check that the value of the gemport is correct
gemID, _ := strconv.Atoi(fmt.Sprintf("90%d%d", i, o-1))
- currentValue := flowMgr[i].onuGemInfo[o-1].GemPorts[0]
+ currentValue := flowMgr[i].onuGemInfoMap[uint32(o)].GemPorts[0]
if uint32(gemID) != currentValue {
t.Errorf("Expected GemPort value to be %d, found %d", gemID, currentValue)
}
@@ -774,11 +736,11 @@
for _, gemPortDeleted := range tt.args.gemPortIDsToBeDeleted {
flowMgr[tt.args.intfID].deleteGemPortFromLocalCache(ctx, tt.args.intfID, tt.args.onuID, gemPortDeleted)
}
- lenofGemPorts := len(flowMgr[tt.args.intfID].onuGemInfo[0].GemPorts)
+ lenofGemPorts := len(flowMgr[tt.args.intfID].onuGemInfoMap[1].GemPorts)
if lenofGemPorts != tt.args.finalLength {
t.Errorf("GemPorts length is not as expected len = %d, want %d", lenofGemPorts, tt.args.finalLength)
}
- gemPorts := flowMgr[tt.args.intfID].onuGemInfo[0].GemPorts
+ gemPorts := flowMgr[tt.args.intfID].onuGemInfoMap[1].GemPorts
if !reflect.DeepEqual(tt.args.gemPortIDsRemaining, gemPorts) {
t.Errorf("GemPorts are not as expected = %v, want %v", gemPorts, tt.args.gemPortIDsRemaining)
}
@@ -798,11 +760,11 @@
wantErr bool
}{
// TODO: Add test cases.
- {"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 0, GemportId: 255, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 1, false},
- {"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "nni", IntfId: 0, GemportId: 1, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 1048576, false},
+ {"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 0, GemportId: 255, OnuId: 1, UniId: 0, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 1, false},
+ {"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "nni", IntfId: 0, GemportId: 1, OnuId: 1, UniId: 0, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 1048576, false},
// Negative Test cases.
- {"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 1, GemportId: 1, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 0, true},
- {"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 0, GemportId: 257, FlowId: 100, PortNo: 0, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 16, false},
+ {"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 1, GemportId: 1, OnuId: 1, UniId: 0, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 1, false},
+ {"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 0, GemportId: 257, OnuId: 1, UniId: 0, FlowId: 100, PortNo: 0, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 16, false},
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
@@ -1067,51 +1029,55 @@
// So just return in case of error
return
}
-
- TpInst := &tp.TechProfile{
+ /*
+ usGemList := make([]*tp_pb.GemPortAttributes, 4)
+ usGemList = append(usGemList, &tp_pb.GemPortAttributes{})
+ usGemList = append(usGemList, &tp_pb.GemPortAttributes{})
+ usGemList = append(usGemList, &tp_pb.GemPortAttributes{})
+ usGemList = append(usGemList, &tp_pb.GemPortAttributes{})
+ dsGemList := make([]*tp_pb.GemPortAttributes, 4)
+ dsGemList = append(usGemList, &tp_pb.GemPortAttributes{})
+ dsGemList = append(usGemList, &tp_pb.GemPortAttributes{})
+ dsGemList = append(usGemList, &tp_pb.GemPortAttributes{})
+ dsGemList = append(usGemList, &tp_pb.GemPortAttributes{})
+ */
+ TpInst := &tp_pb.TechProfileInstance{
Name: "Test-Tech-Profile",
SubscriberIdentifier: "257",
ProfileType: "Mock",
Version: 1,
NumGemPorts: 4,
- InstanceCtrl: tp.InstanceControl{
+ InstanceControl: &tp_pb.InstanceControl{
Onu: "1",
Uni: "16",
},
+ UsScheduler: &openoltpb2.SchedulerAttributes{},
+ DsScheduler: &openoltpb2.SchedulerAttributes{},
}
TpInst.UsScheduler.Priority = 1
- TpInst.UsScheduler.Direction = "upstream"
- TpInst.UsScheduler.AllocID = 1
- TpInst.UsScheduler.AdditionalBw = "None"
- TpInst.UsScheduler.QSchedPolicy = "PQ"
+ TpInst.UsScheduler.Direction = tp_pb.Direction_UPSTREAM
+ TpInst.UsScheduler.AllocId = 1
+ TpInst.UsScheduler.AdditionalBw = tp_pb.AdditionalBW_AdditionalBW_None
+ TpInst.UsScheduler.QSchedPolicy = tp_pb.SchedulingPolicy_WRR
TpInst.UsScheduler.Weight = 4
TpInst.DsScheduler.Priority = 1
- TpInst.DsScheduler.Direction = "upstream"
- TpInst.DsScheduler.AllocID = 1
- TpInst.DsScheduler.AdditionalBw = "None"
- TpInst.DsScheduler.QSchedPolicy = "PQ"
+ TpInst.DsScheduler.Direction = tp_pb.Direction_DOWNSTREAM
+ TpInst.DsScheduler.AllocId = 1
+ TpInst.DsScheduler.AdditionalBw = tp_pb.AdditionalBW_AdditionalBW_None
+ TpInst.DsScheduler.QSchedPolicy = tp_pb.SchedulingPolicy_WRR
TpInst.DsScheduler.Weight = 4
+ TpInst.UpstreamGemPortAttributeList = make([]*tp_pb.GemPortAttributes, 0)
+ TpInst.UpstreamGemPortAttributeList = append(TpInst.UpstreamGemPortAttributeList, &tp_pb.GemPortAttributes{GemportId: 1, PbitMap: "0b00000011"})
+ TpInst.UpstreamGemPortAttributeList = append(TpInst.UpstreamGemPortAttributeList, &tp_pb.GemPortAttributes{GemportId: 2, PbitMap: "0b00001100"})
+ TpInst.UpstreamGemPortAttributeList = append(TpInst.UpstreamGemPortAttributeList, &tp_pb.GemPortAttributes{GemportId: 3, PbitMap: "0b00110000"})
+ TpInst.UpstreamGemPortAttributeList = append(TpInst.UpstreamGemPortAttributeList, &tp_pb.GemPortAttributes{GemportId: 4, PbitMap: "0b11000000"})
- TpInst.UpstreamGemPortAttributeList = make([]tp.IGemPortAttribute, 4)
- TpInst.UpstreamGemPortAttributeList[0].GemportID = 1
- TpInst.UpstreamGemPortAttributeList[0].PbitMap = "0b00000011"
- TpInst.UpstreamGemPortAttributeList[0].GemportID = 2
- TpInst.UpstreamGemPortAttributeList[0].PbitMap = "0b00001100"
- TpInst.UpstreamGemPortAttributeList[0].GemportID = 3
- TpInst.UpstreamGemPortAttributeList[0].PbitMap = "0b00110000"
- TpInst.UpstreamGemPortAttributeList[0].GemportID = 4
- TpInst.UpstreamGemPortAttributeList[0].PbitMap = "0b11000000"
-
- TpInst.DownstreamGemPortAttributeList = make([]tp.IGemPortAttribute, 4)
- TpInst.DownstreamGemPortAttributeList[0].GemportID = 1
- TpInst.DownstreamGemPortAttributeList[0].PbitMap = "0b00000011"
- TpInst.DownstreamGemPortAttributeList[0].GemportID = 2
- TpInst.DownstreamGemPortAttributeList[0].PbitMap = "0b00001100"
- TpInst.DownstreamGemPortAttributeList[0].GemportID = 3
- TpInst.DownstreamGemPortAttributeList[0].PbitMap = "0b00110000"
- TpInst.DownstreamGemPortAttributeList[0].GemportID = 4
- TpInst.DownstreamGemPortAttributeList[0].PbitMap = "0b11000000"
+ TpInst.DownstreamGemPortAttributeList = make([]*tp_pb.GemPortAttributes, 0)
+ TpInst.DownstreamGemPortAttributeList = append(TpInst.DownstreamGemPortAttributeList, &tp_pb.GemPortAttributes{GemportId: 1, PbitMap: "0b00000011"})
+ TpInst.DownstreamGemPortAttributeList = append(TpInst.DownstreamGemPortAttributeList, &tp_pb.GemPortAttributes{GemportId: 2, PbitMap: "0b00001100"})
+ TpInst.DownstreamGemPortAttributeList = append(TpInst.DownstreamGemPortAttributeList, &tp_pb.GemPortAttributes{GemportId: 3, PbitMap: "0b00110000"})
+ TpInst.DownstreamGemPortAttributeList = append(TpInst.DownstreamGemPortAttributeList, &tp_pb.GemPortAttributes{GemportId: 4, PbitMap: "0b11000000"})
type args struct {
args map[string]uint32
@@ -1123,7 +1089,7 @@
onuID uint32
uniID uint32
portNo uint32
- TpInst *tp.TechProfile
+ TpInst *tp_pb.TechProfileInstance
allocID []uint32
gemPorts []uint32
TpID uint32
diff --git a/internal/pkg/core/openolt_groupmgr.go b/internal/pkg/core/openolt_groupmgr.go
index a87073b..4f633a7 100644
--- a/internal/pkg/core/openolt_groupmgr.go
+++ b/internal/pkg/core/openolt_groupmgr.go
@@ -16,8 +16,8 @@
import (
"context"
- "github.com/opencord/voltha-lib-go/v4/pkg/flows"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
rsrcMgr "github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager"
ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
diff --git a/internal/pkg/core/openolt_test.go b/internal/pkg/core/openolt_test.go
index 3933475..8665028 100644
--- a/internal/pkg/core/openolt_test.go
+++ b/internal/pkg/core/openolt_test.go
@@ -28,13 +28,13 @@
"reflect"
"testing"
- conf "github.com/opencord/voltha-lib-go/v4/pkg/config"
+ conf "github.com/opencord/voltha-lib-go/v5/pkg/config"
- com "github.com/opencord/voltha-lib-go/v4/pkg/adapters/common"
- "github.com/opencord/voltha-lib-go/v4/pkg/events"
- fu "github.com/opencord/voltha-lib-go/v4/pkg/flows"
- "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ com "github.com/opencord/voltha-lib-go/v5/pkg/adapters/common"
+ "github.com/opencord/voltha-lib-go/v5/pkg/events"
+ fu "github.com/opencord/voltha-lib-go/v5/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v5/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/config"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
ic "github.com/opencord/voltha-protos/v4/go/inter_container"
diff --git a/internal/pkg/core/statsmanager.go b/internal/pkg/core/statsmanager.go
index 42c60d7..4d7d52d 100755
--- a/internal/pkg/core/statsmanager.go
+++ b/internal/pkg/core/statsmanager.go
@@ -25,7 +25,7 @@
"sync"
"time"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
"github.com/opencord/voltha-protos/v4/go/extension"
"github.com/opencord/voltha-protos/v4/go/openolt"
@@ -285,7 +285,7 @@
var Ports interface{}
Ports, _ = InitPorts(ctx, "nni", Dev.device.Id, 1)
StatMgr.NorthBoundPort, _ = Ports.(map[uint32]*NniPort)
- NumPonPorts := Dev.resourceMgr.DevInfo.GetPonPorts()
+ NumPonPorts := Dev.resourceMgr[0].DevInfo.GetPonPorts()
Ports, _ = InitPorts(ctx, "pon", Dev.device.Id, NumPonPorts)
StatMgr.SouthBoundPort, _ = Ports.(map[uint32]*PonPort)
if StatMgr.Device.openOLT.enableONUStats {
diff --git a/internal/pkg/olterrors/common.go b/internal/pkg/olterrors/common.go
index fa427a7..7a1d00d 100644
--- a/internal/pkg/olterrors/common.go
+++ b/internal/pkg/olterrors/common.go
@@ -18,7 +18,7 @@
package olterrors
import (
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
)
var logger log.CLogger
diff --git a/internal/pkg/olterrors/olterrors.go b/internal/pkg/olterrors/olterrors.go
index c5790ac..43bcd07 100644
--- a/internal/pkg/olterrors/olterrors.go
+++ b/internal/pkg/olterrors/olterrors.go
@@ -21,7 +21,7 @@
"context"
"encoding/json"
"fmt"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
"strings"
)
@@ -364,4 +364,12 @@
// ErrResourceManagerInstantiating error returned when an unexpected
// condition occcurs while instantiating the resource manager
ErrResourceManagerInstantiating = NewErrAdapter("resoure-manager-instantiating", nil, nil)
+
+ // ErrFlowManagerInstantiating error returned when an unexpected
+ // condition occcurs while instantiating the flow manager
+ ErrFlowManagerInstantiating = NewErrAdapter("flow-manager-instantiating", nil, nil)
+
+ // ErrGroupManagerInstantiating error returned when an unexpected
+ // condition occcurs while instantiating the group manager
+ ErrGroupManagerInstantiating = NewErrAdapter("group-manager-instantiating", nil, nil)
)
diff --git a/internal/pkg/resourcemanager/common.go b/internal/pkg/resourcemanager/common.go
index 5b6eedf..256e657 100644
--- a/internal/pkg/resourcemanager/common.go
+++ b/internal/pkg/resourcemanager/common.go
@@ -18,7 +18,7 @@
package resourcemanager
import (
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
)
var logger log.CLogger
diff --git a/internal/pkg/resourcemanager/resourcemanager.go b/internal/pkg/resourcemanager/resourcemanager.go
index a501310..737f694 100755
--- a/internal/pkg/resourcemanager/resourcemanager.go
+++ b/internal/pkg/resourcemanager/resourcemanager.go
@@ -22,17 +22,14 @@
"encoding/json"
"errors"
"fmt"
- "strconv"
"strings"
"sync"
"time"
- "github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
-
- "github.com/opencord/voltha-lib-go/v4/pkg/db"
- "github.com/opencord/voltha-lib-go/v4/pkg/db/kvstore"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
- ponrmgr "github.com/opencord/voltha-lib-go/v4/pkg/ponresourcemanager"
+ "github.com/opencord/voltha-lib-go/v5/pkg/db"
+ "github.com/opencord/voltha-lib-go/v5/pkg/db/kvstore"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
+ ponrmgr "github.com/opencord/voltha-lib-go/v5/pkg/ponresourcemanager"
ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
"github.com/opencord/voltha-protos/v4/go/openolt"
)
@@ -42,20 +39,18 @@
KvstoreTimeout = 5 * time.Second
// BasePathKvStore - <pathPrefix>/openolt/<device_id>
BasePathKvStore = "%s/openolt/{%s}"
- // TpIDPathSuffix - <(pon_id, onu_id, uni_id)>/tp_id
- TpIDPathSuffix = "{%d,%d,%d}/tp_id"
+ // tpIDPathSuffix - <(pon_id, onu_id, uni_id)>/tp_id
+ tpIDPathSuffix = "{%d,%d,%d}/tp_id"
//MeterIDPathSuffix - <(pon_id, onu_id, uni_id)>/<tp_id>/meter_id/<direction>
MeterIDPathSuffix = "{%d,%d,%d}/{%d}/meter_id/{%s}"
- //NnniIntfID - nniintfids
- NnniIntfID = "nniintfids"
// OnuPacketINPathPrefix - path prefix where ONU packet-in vlanID/PCP is stored
//format: onu_packetin/{<intfid>,<onuid>,<logicalport>}
OnuPacketINPathPrefix = "onu_packetin/{%d,%d,%d}"
// OnuPacketINPath path on the kvstore to store packetin gemport,which will be used for packetin, packetout
//format: onu_packetin/{<intfid>,<onuid>,<logicalport>}/{<vlanId>,<priority>}
OnuPacketINPath = OnuPacketINPathPrefix + "/{%d,%d}"
- //FlowIDsForGem flowids_per_gem/<intfid>
- FlowIDsForGem = "flowids_per_gem/{%d}"
+ //FlowIDsForGem flowids_per_gem/<intfid>/<gemport-id>
+ FlowIDsForGem = "flowids_per_gem/{%d}/{%d}"
//McastQueuesForIntf multicast queues for pon interfaces
McastQueuesForIntf = "mcast_qs_for_int"
//FlowGroup flow_groups/<flow_group_id>
@@ -74,9 +69,10 @@
//FlowIDPath - Path on the KV store for storing list of Flow IDs for a given subscriber
//Format: BasePathKvStore/<(pon_intf_id, onu_id, uni_id)>/flow_ids
FlowIDPath = "{%s}/flow_ids"
- //FlowIDInfoPath - Used to store more metadata associated with the flow_id
- //Format: BasePathKvStore/<(pon_intf_id, onu_id, uni_id)>/flow_id_info/<flow_id>
- FlowIDInfoPath = "{%s}/flow_id_info/{%d}"
+
+ //OnuGemInfoPath is path on the kvstore to store onugem info map
+ //format: <device-id>/onu_gem_info/<intfid>
+ OnuGemInfoPath = "onu_gem_info/{%d}/{%d}" // onu_gem/<intfid>/<onuID>
)
// FlowInfo holds the flow information
@@ -111,12 +107,13 @@
// MeterInfo store meter information at path <(pon_id, onu_id, uni_id)>/<tp_id>/meter_id/<direction>
type MeterInfo struct {
- RefCnt uint8 // number of flow references for this meter. When RefCnt is 0, the MeterInfo should be deleted.
- MeterConfig ofp.OfpMeterConfig
+ RefCnt uint8 // number of flow references for this meter. When RefCnt is 0, the MeterInfo should be deleted.
+ MeterID uint32
}
// OpenOltResourceMgr holds resource related information as provided below for each field
type OpenOltResourceMgr struct {
+ PonIntfID uint32
DeviceID string // OLT device id
Address string // Host and port of the kv store to connect to
Args string // args
@@ -124,14 +121,40 @@
DeviceType string
DevInfo *openolt.DeviceInfo // device information
// array of pon resource managers per interface technology
- ResourceMgrs map[uint32]*ponrmgr.PONResourceManager
+ PonRsrMgr *ponrmgr.PONResourceManager
- // This protects concurrent gemport_id allocate/delete calls on a per PON port basis
- GemPortIDMgmtLock []sync.RWMutex
- // This protects concurrent alloc_id allocate/delete calls on a per PON port basis
- AllocIDMgmtLock []sync.RWMutex
- // This protects concurrent onu_id allocate/delete calls on a per PON port basis
- OnuIDMgmtLock []sync.RWMutex
+ // Local maps used for write-through-cache - start
+ flowIDsForOnu map[string][]uint64
+ flowIDsForOnuLock sync.RWMutex
+
+ allocIDsForOnu map[string][]uint32
+ allocIDsForOnuLock sync.RWMutex
+
+ gemPortIDsForOnu map[string][]uint32
+ gemPortIDsForOnuLock sync.RWMutex
+
+ techProfileIDsForOnu map[string][]uint32
+ techProfileIDsForOnuLock sync.RWMutex
+
+ meterInfoForOnu map[string]*MeterInfo
+ meterInfoForOnuLock sync.RWMutex
+
+ onuGemInfo map[string]*OnuGemInfo
+ onuGemInfoLock sync.RWMutex
+
+ gemPortForPacketInInfo map[string]uint32
+ gemPortForPacketInInfoLock sync.RWMutex
+
+ flowIDsForGem map[uint32][]uint64
+ flowIDsForGemLock sync.RWMutex
+
+ mcastQueueForIntf map[uint32][]uint32
+ mcastQueueForIntfLock sync.RWMutex
+ mcastQueueForIntfLoadedFromKvStore bool
+
+ groupInfo map[string]*GroupInfo
+ groupInfoLock sync.RWMutex
+ // Local maps used for write-through-cache - end
}
func newKVClient(ctx context.Context, storeType string, address string, timeout time.Duration) (kvstore.Client, error) {
@@ -152,6 +175,7 @@
logger.Fatalw(ctx, "Failed to init KV client\n", log.Fields{"err": err})
return nil
}
+ // return db.NewBackend(ctx, backend, addr, KvstoreTimeout, fmt.Sprintf(BasePathKvStore, basePathKvStore, DeviceID))
kvbackend := &db.Backend{
Client: kvClient,
@@ -166,346 +190,163 @@
// NewResourceMgr init a New resource manager instance which in turn instantiates pon resource manager
// instances according to technology. Initializes the default resource ranges for all
// the resources.
-func NewResourceMgr(ctx context.Context, deviceID string, KVStoreAddress string, kvStoreType string, deviceType string, devInfo *openolt.DeviceInfo, basePathKvStore string) *OpenOltResourceMgr {
+func NewResourceMgr(ctx context.Context, PonIntfID uint32, deviceID string, KVStoreAddress string, kvStoreType string, deviceType string, devInfo *openolt.DeviceInfo, basePathKvStore string) *OpenOltResourceMgr {
var ResourceMgr OpenOltResourceMgr
- logger.Debugf(ctx, "Init new resource manager , address: %s, device-id: %s", KVStoreAddress, deviceID)
+ logger.Debugf(ctx, "Init new resource manager , ponIf: %v, address: %s, device-id: %s", PonIntfID, KVStoreAddress, deviceID)
+ ResourceMgr.PonIntfID = PonIntfID
ResourceMgr.DeviceID = deviceID
ResourceMgr.Address = KVStoreAddress
ResourceMgr.DeviceType = deviceType
ResourceMgr.DevInfo = devInfo
- NumPONPorts := devInfo.GetPonPorts()
Backend := kvStoreType
ResourceMgr.KVStore = SetKVClient(ctx, Backend, ResourceMgr.Address, deviceID, basePathKvStore)
if ResourceMgr.KVStore == nil {
logger.Error(ctx, "Failed to setup KV store")
}
- Ranges := make(map[string]*openolt.DeviceInfo_DeviceResourceRanges)
- RsrcMgrsByTech := make(map[string]*ponrmgr.PONResourceManager)
- ResourceMgr.ResourceMgrs = make(map[uint32]*ponrmgr.PONResourceManager)
-
- ResourceMgr.AllocIDMgmtLock = make([]sync.RWMutex, NumPONPorts)
- ResourceMgr.GemPortIDMgmtLock = make([]sync.RWMutex, NumPONPorts)
- ResourceMgr.OnuIDMgmtLock = make([]sync.RWMutex, NumPONPorts)
// TODO self.args = registry('main').get_args()
- /*
- If a legacy driver returns protobuf without any ranges,s synthesize one from
- the legacy global per-device information. This, in theory, is temporary until
- the legacy drivers are upgrade to support pool ranges.
- */
- if devInfo.Ranges == nil {
- var ranges openolt.DeviceInfo_DeviceResourceRanges
- ranges.Technology = devInfo.GetTechnology()
-
- var index uint32
- for index = 0; index < NumPONPorts; index++ {
- ranges.IntfIds = append(ranges.IntfIds, index)
- }
-
- var Pool openolt.DeviceInfo_DeviceResourceRanges_Pool
- Pool.Type = openolt.DeviceInfo_DeviceResourceRanges_Pool_ONU_ID
- Pool.Start = devInfo.OnuIdStart
- Pool.End = devInfo.OnuIdEnd
- Pool.Sharing = openolt.DeviceInfo_DeviceResourceRanges_Pool_DEDICATED_PER_INTF
- onuPool := Pool
- ranges.Pools = append(ranges.Pools, &onuPool)
-
- Pool.Type = openolt.DeviceInfo_DeviceResourceRanges_Pool_ALLOC_ID
- Pool.Start = devInfo.AllocIdStart
- Pool.End = devInfo.AllocIdEnd
- Pool.Sharing = openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH
- allocPool := Pool
- ranges.Pools = append(ranges.Pools, &allocPool)
-
- Pool.Type = openolt.DeviceInfo_DeviceResourceRanges_Pool_GEMPORT_ID
- Pool.Start = devInfo.GemportIdStart
- Pool.End = devInfo.GemportIdEnd
- Pool.Sharing = openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH
- gemPool := Pool
- ranges.Pools = append(ranges.Pools, &gemPool)
-
- Pool.Type = openolt.DeviceInfo_DeviceResourceRanges_Pool_FLOW_ID
- Pool.Start = devInfo.FlowIdStart
- Pool.End = devInfo.FlowIdEnd
- Pool.Sharing = openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH
- ranges.Pools = append(ranges.Pools, &Pool)
- // Add to device info
- devInfo.Ranges = append(devInfo.Ranges, &ranges)
- }
-
// Create a separate Resource Manager instance for each range. This assumes that
// each technology is represented by only a single range
- var GlobalPONRsrcMgr *ponrmgr.PONResourceManager
- var err error
for _, TechRange := range devInfo.Ranges {
- technology := TechRange.Technology
- logger.Debugf(ctx, "Device info technology %s", technology)
- Ranges[technology] = TechRange
+ for _, intfID := range TechRange.IntfIds {
+ if intfID == PonIntfID {
+ technology := TechRange.Technology
+ logger.Debugf(ctx, "Device info technology %s, intf-id %v", technology, PonIntfID)
- RsrcMgrsByTech[technology], err = ponrmgr.NewPONResourceManager(ctx, technology, deviceType, deviceID,
- Backend, ResourceMgr.Address, basePathKvStore)
- if err != nil {
- logger.Errorf(ctx, "Failed to create pon resource manager instance for technology %s", technology)
- return nil
+ rsrMgr, err := ponrmgr.NewPONResourceManager(ctx, technology, deviceType, deviceID,
+ Backend, ResourceMgr.Address, basePathKvStore)
+ if err != nil {
+ logger.Errorf(ctx, "Failed to create pon resource manager instance for technology %s", technology)
+ return nil
+ }
+ ResourceMgr.PonRsrMgr = rsrMgr
+ // self.initialize_device_resource_range_and_pool(resource_mgr, global_resource_mgr, arange)
+ InitializeDeviceResourceRangeAndPool(ctx, rsrMgr, TechRange, devInfo)
+ if err := ResourceMgr.PonRsrMgr.InitDeviceResourcePoolForIntf(ctx, intfID); err != nil {
+ logger.Fatal(ctx, "failed-to-initialize-device-resource-pool-intf-id-%v-device-id", ResourceMgr.PonIntfID, ResourceMgr.DeviceID)
+ return nil
+ }
+ }
}
- // resource_mgrs_by_tech[technology] = resource_mgr
- if GlobalPONRsrcMgr == nil {
- GlobalPONRsrcMgr = RsrcMgrsByTech[technology]
- }
- for _, IntfID := range TechRange.IntfIds {
- ResourceMgr.ResourceMgrs[IntfID] = RsrcMgrsByTech[technology]
- }
- // self.initialize_device_resource_range_and_pool(resource_mgr, global_resource_mgr, arange)
- InitializeDeviceResourceRangeAndPool(ctx, RsrcMgrsByTech[technology], GlobalPONRsrcMgr,
- TechRange, devInfo)
}
- // After we have initialized resource ranges, initialize the
- // resource pools accordingly.
- for _, PONRMgr := range RsrcMgrsByTech {
- _ = PONRMgr.InitDeviceResourcePool(ctx)
- }
+
+ ResourceMgr.InitLocalCache()
+
logger.Info(ctx, "Initialization of resource manager success!")
return &ResourceMgr
}
+//InitLocalCache initializes local maps used for write-through-cache
+func (rsrcMgr *OpenOltResourceMgr) InitLocalCache() {
+ rsrcMgr.flowIDsForOnu = make(map[string][]uint64)
+ rsrcMgr.allocIDsForOnu = make(map[string][]uint32)
+ rsrcMgr.gemPortIDsForOnu = make(map[string][]uint32)
+ rsrcMgr.techProfileIDsForOnu = make(map[string][]uint32)
+ rsrcMgr.meterInfoForOnu = make(map[string]*MeterInfo)
+ rsrcMgr.onuGemInfo = make(map[string]*OnuGemInfo)
+ rsrcMgr.gemPortForPacketInInfo = make(map[string]uint32)
+ rsrcMgr.flowIDsForGem = make(map[uint32][]uint64)
+ rsrcMgr.mcastQueueForIntf = make(map[uint32][]uint32)
+ rsrcMgr.groupInfo = make(map[string]*GroupInfo)
+}
+
// InitializeDeviceResourceRangeAndPool initializes the resource range pool according to the sharing type, then apply
// device specific information. If KV doesn't exist
// or is broader than the device, the device's information will
// dictate the range limits
-func InitializeDeviceResourceRangeAndPool(ctx context.Context, ponRMgr *ponrmgr.PONResourceManager, globalPONRMgr *ponrmgr.PONResourceManager,
+func InitializeDeviceResourceRangeAndPool(ctx context.Context, ponRMgr *ponrmgr.PONResourceManager,
techRange *openolt.DeviceInfo_DeviceResourceRanges, devInfo *openolt.DeviceInfo) {
+ // var ONUIDShared, AllocIDShared, GEMPortIDShared openolt.DeviceInfo_DeviceResourceRanges_Pool_SharingType
+ var ONUIDStart, ONUIDEnd, AllocIDStart, AllocIDEnd, GEMPortIDStart, GEMPortIDEnd uint32
+ var ONUIDShared, AllocIDShared, GEMPortIDShared, FlowIDShared uint32
+
+ // The below variables are just dummy and needed to pass as arguments to InitDefaultPONResourceRanges function.
+ // The openolt adapter does not need flowIDs to be managed as it is managed on the OLT device
+ // The UNI IDs are dynamically generated by openonu adapter for every discovered UNI.
+ var flowIDDummyStart, flowIDDummyEnd uint32 = 1, 2
+ var uniIDDummyStart, uniIDDummyEnd uint32 = 0, 1
// init the resource range pool according to the sharing type
-
- logger.Debugf(ctx, "Resource range pool init for technology %s", ponRMgr.Technology)
- // first load from KV profiles
- status := ponRMgr.InitResourceRangesFromKVStore(ctx)
- if !status {
- logger.Debugf(ctx, "Failed to load resource ranges from KV store for tech %s", ponRMgr.Technology)
- }
-
- /*
- Then apply device specific information. If KV doesn't exist
- or is broader than the device, the device's information will
- dictate the range limits
- */
- logger.Debugw(ctx, "Using device info to init pon resource ranges", log.Fields{"Tech": ponRMgr.Technology})
-
- ONUIDStart := devInfo.OnuIdStart
- ONUIDEnd := devInfo.OnuIdEnd
- ONUIDShared := openolt.DeviceInfo_DeviceResourceRanges_Pool_DEDICATED_PER_INTF
- ONUIDSharedPoolID := uint32(0)
- AllocIDStart := devInfo.AllocIdStart
- AllocIDEnd := devInfo.AllocIdEnd
- AllocIDShared := openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH // TODO EdgeCore/BAL limitation
- AllocIDSharedPoolID := uint32(0)
- GEMPortIDStart := devInfo.GemportIdStart
- GEMPortIDEnd := devInfo.GemportIdEnd
- GEMPortIDShared := openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH // TODO EdgeCore/BAL limitation
- GEMPortIDSharedPoolID := uint32(0)
- FlowIDStart := devInfo.FlowIdStart
- FlowIDEnd := devInfo.FlowIdEnd
- FlowIDShared := openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH // TODO EdgeCore/BAL limitation
- FlowIDSharedPoolID := uint32(0)
-
- var FirstIntfPoolID uint32
- var SharedPoolID uint32
-
- /*
- * As a zero check is made against SharedPoolID to check whether the resources are shared across all intfs
- * if resources are shared across interfaces then SharedPoolID is given a positive number.
- */
- for _, FirstIntfPoolID = range techRange.IntfIds {
- // skip the intf id 0
- if FirstIntfPoolID == 0 {
- continue
- }
- break
- }
-
+ logger.Debugw(ctx, "Device info init", log.Fields{"technology": techRange.Technology,
+ "onu_id_start": ONUIDStart, "onu_id_end": ONUIDEnd,
+ "alloc_id_start": AllocIDStart, "alloc_id_end": AllocIDEnd,
+ "gemport_id_start": GEMPortIDStart, "gemport_id_end": GEMPortIDEnd,
+ "intf_ids": techRange.IntfIds,
+ })
for _, RangePool := range techRange.Pools {
- if RangePool.Sharing == openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH {
- SharedPoolID = FirstIntfPoolID
- } else if RangePool.Sharing == openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_SAME_TECH {
- SharedPoolID = FirstIntfPoolID
- } else {
- SharedPoolID = 0
- }
+ // FIXME: Remove hardcoding
if RangePool.Type == openolt.DeviceInfo_DeviceResourceRanges_Pool_ONU_ID {
ONUIDStart = RangePool.Start
ONUIDEnd = RangePool.End
- ONUIDShared = RangePool.Sharing
- ONUIDSharedPoolID = SharedPoolID
+ ONUIDShared = uint32(RangePool.Sharing)
} else if RangePool.Type == openolt.DeviceInfo_DeviceResourceRanges_Pool_ALLOC_ID {
AllocIDStart = RangePool.Start
AllocIDEnd = RangePool.End
- AllocIDShared = RangePool.Sharing
- AllocIDSharedPoolID = SharedPoolID
+ AllocIDShared = uint32(RangePool.Sharing)
} else if RangePool.Type == openolt.DeviceInfo_DeviceResourceRanges_Pool_GEMPORT_ID {
GEMPortIDStart = RangePool.Start
GEMPortIDEnd = RangePool.End
- GEMPortIDShared = RangePool.Sharing
- GEMPortIDSharedPoolID = SharedPoolID
- } else if RangePool.Type == openolt.DeviceInfo_DeviceResourceRanges_Pool_FLOW_ID {
- FlowIDStart = RangePool.Start
- FlowIDEnd = RangePool.End
- FlowIDShared = RangePool.Sharing
- FlowIDSharedPoolID = SharedPoolID
+ GEMPortIDShared = uint32(RangePool.Sharing)
}
}
- logger.Debugw(ctx, "Device info init", log.Fields{"technology": techRange.Technology,
- "onu_id_start": ONUIDStart, "onu_id_end": ONUIDEnd, "onu_id_shared_pool_id": ONUIDSharedPoolID,
- "alloc_id_start": AllocIDStart, "alloc_id_end": AllocIDEnd,
- "alloc_id_shared_pool_id": AllocIDSharedPoolID,
- "gemport_id_start": GEMPortIDStart, "gemport_id_end": GEMPortIDEnd,
- "gemport_id_shared_pool_id": GEMPortIDSharedPoolID,
- "flow_id_start": FlowIDStart,
- "flow_id_end_idx": FlowIDEnd,
- "flow_id_shared_pool_id": FlowIDSharedPoolID,
- "intf_ids": techRange.IntfIds,
- "uni_id_start": 0,
- "uni_id_end_idx": 1, /*MaxUNIIDperONU()*/
- })
-
- ponRMgr.InitDefaultPONResourceRanges(ctx, ONUIDStart, ONUIDEnd, ONUIDSharedPoolID,
- AllocIDStart, AllocIDEnd, AllocIDSharedPoolID,
- GEMPortIDStart, GEMPortIDEnd, GEMPortIDSharedPoolID,
- FlowIDStart, FlowIDEnd, FlowIDSharedPoolID, 0, 1,
+ ponRMgr.InitDefaultPONResourceRanges(ctx, ONUIDStart, ONUIDEnd, ONUIDShared,
+ AllocIDStart, AllocIDEnd, AllocIDShared,
+ GEMPortIDStart, GEMPortIDEnd, GEMPortIDShared,
+ flowIDDummyStart, flowIDDummyEnd, FlowIDShared, uniIDDummyStart, uniIDDummyEnd,
devInfo.PonPorts, techRange.IntfIds)
- // For global sharing, make sure to refresh both local and global resource manager instances' range
-
- if ONUIDShared == openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH {
- globalPONRMgr.UpdateRanges(ctx, ponrmgr.ONU_ID_START_IDX, ONUIDStart, ponrmgr.ONU_ID_END_IDX, ONUIDEnd,
- "", 0, nil)
- ponRMgr.UpdateRanges(ctx, ponrmgr.ONU_ID_START_IDX, ONUIDStart, ponrmgr.ONU_ID_END_IDX, ONUIDEnd,
- "", 0, globalPONRMgr)
- }
- if AllocIDShared == openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH {
- globalPONRMgr.UpdateRanges(ctx, ponrmgr.ALLOC_ID_START_IDX, AllocIDStart, ponrmgr.ALLOC_ID_END_IDX, AllocIDEnd,
- "", 0, nil)
-
- ponRMgr.UpdateRanges(ctx, ponrmgr.ALLOC_ID_START_IDX, AllocIDStart, ponrmgr.ALLOC_ID_END_IDX, AllocIDEnd,
- "", 0, globalPONRMgr)
- }
- if GEMPortIDShared == openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH {
- globalPONRMgr.UpdateRanges(ctx, ponrmgr.GEMPORT_ID_START_IDX, GEMPortIDStart, ponrmgr.GEMPORT_ID_END_IDX, GEMPortIDEnd,
- "", 0, nil)
- ponRMgr.UpdateRanges(ctx, ponrmgr.GEMPORT_ID_START_IDX, GEMPortIDStart, ponrmgr.GEMPORT_ID_END_IDX, GEMPortIDEnd,
- "", 0, globalPONRMgr)
- }
- if FlowIDShared == openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH {
- globalPONRMgr.UpdateRanges(ctx, ponrmgr.FLOW_ID_START_IDX, FlowIDStart, ponrmgr.FLOW_ID_END_IDX, FlowIDEnd,
- "", 0, nil)
- ponRMgr.UpdateRanges(ctx, ponrmgr.FLOW_ID_START_IDX, FlowIDStart, ponrmgr.FLOW_ID_END_IDX, FlowIDEnd,
- "", 0, globalPONRMgr)
- }
-
- // Make sure loaded range fits the platform bit encoding ranges
- ponRMgr.UpdateRanges(ctx, ponrmgr.UNI_ID_START_IDX, 0, ponrmgr.UNI_ID_END_IDX /* TODO =OpenOltPlatform.MAX_UNIS_PER_ONU-1*/, 1, "", 0, nil)
}
// Delete clears used resources for the particular olt device being deleted
-func (RsrcMgr *OpenOltResourceMgr) Delete(ctx context.Context) error {
- /* TODO
- def __del__(self):
- self.log.info("clearing-device-resource-pool")
- for key, resource_mgr in self.resource_mgrs.iteritems():
- resource_mgr.clear_device_resource_pool()
-
- def assert_pon_id_limit(self, pon_intf_id):
- assert pon_intf_id in self.resource_mgrs
-
- def assert_onu_id_limit(self, pon_intf_id, onu_id):
- self.assert_pon_id_limit(pon_intf_id)
- self.resource_mgrs[pon_intf_id].assert_resource_limits(onu_id, PONResourceManager.ONU_ID)
-
- @property
- def max_uni_id_per_onu(self):
- return 0 #OpenOltPlatform.MAX_UNIS_PER_ONU-1, zero-based indexing Uncomment or override to make default multi-uni
-
- def assert_uni_id_limit(self, pon_intf_id, onu_id, uni_id):
- self.assert_onu_id_limit(pon_intf_id, onu_id)
- self.resource_mgrs[pon_intf_id].assert_resource_limits(uni_id, PONResourceManager.UNI_ID)
- */
- for _, rsrcMgr := range RsrcMgr.ResourceMgrs {
- if err := rsrcMgr.ClearDeviceResourcePool(ctx); err != nil {
- logger.Debug(ctx, "Failed to clear device resource pool")
- return err
- }
+func (rsrcMgr *OpenOltResourceMgr) Delete(ctx context.Context, intfID uint32) error {
+ if err := rsrcMgr.PonRsrMgr.ClearDeviceResourcePoolForIntf(ctx, intfID); err != nil {
+ logger.Debug(ctx, "Failed to clear device resource pool")
+ return err
}
logger.Debug(ctx, "Cleared device resource pool")
return nil
}
-// GetONUID returns the available OnuID for the given pon-port
-func (RsrcMgr *OpenOltResourceMgr) GetONUID(ctx context.Context, ponIntfID uint32) (uint32, error) {
- // Check if Pon Interface ID is present in Resource-manager-map
- RsrcMgr.OnuIDMgmtLock[ponIntfID].Lock()
- defer RsrcMgr.OnuIDMgmtLock[ponIntfID].Unlock()
-
- if _, ok := RsrcMgr.ResourceMgrs[ponIntfID]; !ok {
- err := errors.New("invalid-pon-interface-" + strconv.Itoa(int(ponIntfID)))
- return 0, err
- }
+// GetONUID returns the available onuID for the given pon-port
+func (rsrcMgr *OpenOltResourceMgr) GetONUID(ctx context.Context, PonIntfID uint32) (uint32, error) {
// Get ONU id for a provided pon interface ID.
- onuID, err := RsrcMgr.ResourceMgrs[ponIntfID].TechProfileMgr.GetResourceID(ctx, ponIntfID,
+ onuID, err := rsrcMgr.PonRsrMgr.TechProfileMgr.GetResourceID(ctx, PonIntfID,
ponrmgr.ONU_ID, 1)
if err != nil {
logger.Errorf(ctx, "Failed to get resource for interface %d for type %s",
- ponIntfID, ponrmgr.ONU_ID)
+ PonIntfID, ponrmgr.ONU_ID)
return 0, err
}
- if onuID != nil {
- RsrcMgr.ResourceMgrs[ponIntfID].InitResourceMap(ctx, fmt.Sprintf("%d,%d", ponIntfID, onuID[0]))
+ if len(onuID) > 0 {
+ rsrcMgr.PonRsrMgr.InitResourceMap(ctx, fmt.Sprintf("%d,%d", PonIntfID, onuID[0]))
return onuID[0], err
}
- return 0, err // return OnuID 0 on error
-}
-
-// GetFlowIDInfo returns the slice of flow info of the given pon-port
-// Note: For flows which trap from the NNI and not really associated with any particular
-// ONU (like LLDP), the onu_id and uni_id is set as -1. The intf_id is the NNI intf_id.
-func (RsrcMgr *OpenOltResourceMgr) GetFlowIDInfo(ctx context.Context, ponIntfID uint32, onuID int32, uniID int32, flowID uint64) *FlowInfo {
- var flowInfo FlowInfo
-
- subs := fmt.Sprintf("%d,%d,%d", ponIntfID, onuID, uniID)
- Path := fmt.Sprintf(FlowIDInfoPath, subs, flowID)
- value, err := RsrcMgr.KVStore.Get(ctx, Path)
- if err == nil {
- if value != nil {
- Val, err := toByte(value.Value)
- if err != nil {
- logger.Errorw(ctx, "Failed to convert flowinfo into byte array", log.Fields{"error": err, "subs": subs})
- return nil
- }
- if err = json.Unmarshal(Val, &flowInfo); err != nil {
- logger.Errorw(ctx, "Failed to unmarshal", log.Fields{"error": err, "subs": subs})
- return nil
- }
- }
- }
- if flowInfo.Flow == nil {
- logger.Debugw(ctx, "No flowInfo found in KV store", log.Fields{"subs": subs})
- return nil
- }
- return &flowInfo
+ return 0, err // return onuID 0 on error
}
// GetCurrentFlowIDsForOnu fetches flow ID from the resource manager
// Note: For flows which trap from the NNI and not really associated with any particular
// ONU (like LLDP), the onu_id and uni_id is set as -1. The intf_id is the NNI intf_id.
-func (RsrcMgr *OpenOltResourceMgr) GetCurrentFlowIDsForOnu(ctx context.Context, ponIntfID uint32, onuID int32, uniID int32) ([]uint64, error) {
+func (rsrcMgr *OpenOltResourceMgr) GetCurrentFlowIDsForOnu(ctx context.Context, PonIntfID uint32, onuID int32, uniID int32) ([]uint64, error) {
- subs := fmt.Sprintf("%d,%d,%d", ponIntfID, onuID, uniID)
+ subs := fmt.Sprintf("%d,%d,%d", PonIntfID, onuID, uniID)
path := fmt.Sprintf(FlowIDPath, subs)
+ // fetch from cache
+ rsrcMgr.flowIDsForOnuLock.RLock()
+ flowIDsForOnu, ok := rsrcMgr.flowIDsForOnu[path]
+ rsrcMgr.flowIDsForOnuLock.RUnlock()
+
+ if ok {
+ return flowIDsForOnu, nil
+ }
+
var data []uint64
- value, err := RsrcMgr.KVStore.Get(ctx, path)
+ value, err := rsrcMgr.KVStore.Get(ctx, path)
if err == nil {
if value != nil {
Val, _ := toByte(value.Value)
@@ -515,339 +356,126 @@
}
}
}
+ // update cache
+ rsrcMgr.flowIDsForOnuLock.Lock()
+ rsrcMgr.flowIDsForOnu[path] = data
+ rsrcMgr.flowIDsForOnuLock.Unlock()
+
return data, nil
}
-// UpdateFlowIDInfo updates flow info for the given pon interface, onu id, and uni id
-// Note: For flows which trap from the NNI and not really associated with any particular
-// ONU (like LLDP), the onu_id and uni_id is set as -1. The intf_id is the NNI intf_id.
-func (RsrcMgr *OpenOltResourceMgr) UpdateFlowIDInfo(ctx context.Context, ponIntfID uint32, onuID int32, uniID int32,
- flowID uint64, flowData FlowInfo) error {
-
- subs := fmt.Sprintf("%d,%d,%d", ponIntfID, onuID, uniID)
- path := fmt.Sprintf(FlowIDInfoPath, subs, flowID)
-
- var value []byte
- var err error
- value, err = json.Marshal(flowData)
- if err != nil {
- logger.Errorf(ctx, "failed to Marshal, resource path %s", path)
- return err
- }
-
- if err = RsrcMgr.KVStore.Put(ctx, path, value); err != nil {
- logger.Errorf(ctx, "Failed to update resource %s", path)
- }
-
- // Update the flowID list for the ONU
- if err = RsrcMgr.UpdateFlowIDForOnu(ctx, ponIntfID, onuID, uniID, flowID, true); err != nil {
- // If the operation fails, try to remove FlowInfo from the KV store
- _ = RsrcMgr.KVStore.Delete(ctx, path)
- return err
- }
- return err
-}
-
-// UpdateFlowIDForOnu updates the flow_id list of the ONU (add or remove flow_id from the list)
-func (RsrcMgr *OpenOltResourceMgr) UpdateFlowIDForOnu(ctx context.Context, ponIntfID uint32, onuID int32, uniID int32, flowID uint64, add bool) error {
- /*
- Update the flow_id list of the ONU (add or remove flow_id from the list)
- :param pon_intf_onu_id: reference of PON interface id and onu id
- :param flow_id: flow ID
- :param add: Boolean flag to indicate whether the flow_id should be
- added or removed from the list. Defaults to adding the flow.
- */
- var Value []byte
- var err error
- var retVal bool
- var idx uint64
- subs := fmt.Sprintf("%d,%d,%d", ponIntfID, onuID, uniID)
- path := fmt.Sprintf(FlowIDPath, subs)
- flowIDs, err := RsrcMgr.GetCurrentFlowIDsForOnu(ctx, ponIntfID, onuID, uniID)
- if err != nil {
- // Error logged in the called function
- return err
- }
-
- if add {
- if retVal, _ = checkForFlowIDInList(flowIDs, flowID); retVal {
- return nil
- }
- flowIDs = append(flowIDs, flowID)
- } else {
- if retVal, idx = checkForFlowIDInList(flowIDs, flowID); !retVal {
- return nil
- }
- // delete the index and shift
- flowIDs = append(flowIDs[:idx], flowIDs[idx+1:]...)
- }
- Value, err = json.Marshal(flowIDs)
- if err != nil {
- logger.Error(ctx, "Failed to Marshal")
- return err
- }
-
- if err = RsrcMgr.KVStore.Put(ctx, path, Value); err != nil {
- logger.Errorf(ctx, "Failed to update resource %s", path)
- return err
- }
- return err
-}
-
-// RemoveFlowIDInfo remove flow info for the given pon interface, onu id, and uni id
-// Note: For flows which trap from the NNI and not really associated with any particular
-// ONU (like LLDP), the onu_id and uni_id is set as -1. The intf_id is the NNI intf_id.
-func (RsrcMgr *OpenOltResourceMgr) RemoveFlowIDInfo(ctx context.Context, ponIntfID uint32, onuID int32, uniID int32,
- flowID uint64) error {
-
- subs := fmt.Sprintf("%d,%d,%d", ponIntfID, onuID, uniID)
- path := fmt.Sprintf(FlowIDInfoPath, subs, flowID)
-
- var err error
- if err = RsrcMgr.KVStore.Delete(ctx, path); err != nil {
- logger.Errorf(ctx, "Failed to delete resource %s", path)
- return err
- }
-
- // Update the flowID list for the ONU
- err = RsrcMgr.UpdateFlowIDForOnu(ctx, ponIntfID, onuID, uniID, flowID, false)
-
- return err
-}
-
-// RemoveAllFlowsForIntfOnuUniKey removes flow info for the given interface, onu id, and uni id
-func (RsrcMgr *OpenOltResourceMgr) RemoveAllFlowsForIntfOnuUniKey(ctx context.Context, intf uint32, onuID int32, uniID int32) error {
- flowIDs, err := RsrcMgr.GetCurrentFlowIDsForOnu(ctx, intf, onuID, uniID)
- if err != nil {
- // error logged in the called function
- return err
- }
- for _, flID := range flowIDs {
- if err := RsrcMgr.RemoveFlowIDInfo(ctx, intf, onuID, uniID, flID); err != nil {
- logger.Errorw(ctx, "failed-to-delete-flow-id-info", log.Fields{"intf": intf, "onuID": onuID, "uniID": uniID, "flowID": flID})
- }
- }
- subs := fmt.Sprintf("%d,%d,%d", intf, onuID, uniID)
- path := fmt.Sprintf(FlowIDPath, subs)
- if err := RsrcMgr.KVStore.Delete(ctx, path); err != nil {
- logger.Errorf(ctx, "Failed to delete resource %s", path)
- return err
- }
- return nil
-}
-
-// GetAllocID return the first Alloc ID for a given pon interface id and onu id and then update the resource map on
-// the KV store with the list of alloc_ids allocated for the pon_intf_onu_id tuple
-// Currently of all the alloc_ids available, it returns the first alloc_id in the list for tha given ONU
-func (RsrcMgr *OpenOltResourceMgr) GetAllocID(ctx context.Context, intfID uint32, onuID uint32, uniID uint32) uint32 {
-
- var err error
- IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", intfID, onuID, uniID)
-
- RsrcMgr.AllocIDMgmtLock[intfID].Lock()
- defer RsrcMgr.AllocIDMgmtLock[intfID].Unlock()
-
- AllocID := RsrcMgr.ResourceMgrs[intfID].GetCurrentAllocIDForOnu(ctx, IntfOnuIDUniID)
- if AllocID != nil {
- // Since we support only one alloc_id for the ONU at the moment,
- // return the first alloc_id in the list, if available, for that
- // ONU.
- logger.Debugw(ctx, "Retrieved alloc ID from pon resource mgr", log.Fields{"AllocID": AllocID})
- return AllocID[0]
- }
- AllocID, err = RsrcMgr.ResourceMgrs[intfID].GetResourceID(ctx, intfID,
- ponrmgr.ALLOC_ID, 1)
-
- if AllocID == nil || err != nil {
- logger.Error(ctx, "Failed to allocate alloc id")
- return 0
- }
- // update the resource map on KV store with the list of alloc_id
- // allocated for the pon_intf_onu_id tuple
- err = RsrcMgr.ResourceMgrs[intfID].UpdateAllocIdsForOnu(ctx, IntfOnuIDUniID, AllocID)
- if err != nil {
- logger.Error(ctx, "Failed to update Alloc ID")
- return 0
- }
- logger.Debugw(ctx, "Allocated new Tcont from pon resource mgr", log.Fields{"AllocID": AllocID})
- return AllocID[0]
-}
-
// UpdateAllocIdsForOnu updates alloc ids in kv store for a given pon interface id, onu id and uni id
-func (RsrcMgr *OpenOltResourceMgr) UpdateAllocIdsForOnu(ctx context.Context, ponPort uint32, onuID uint32, uniID uint32, allocID []uint32) error {
+func (rsrcMgr *OpenOltResourceMgr) UpdateAllocIdsForOnu(ctx context.Context, ponPort uint32, onuID uint32, uniID uint32, allocIDs []uint32) error {
- IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", ponPort, onuID, uniID)
- return RsrcMgr.ResourceMgrs[ponPort].UpdateAllocIdsForOnu(ctx, IntfOnuIDUniID,
- allocID)
+ intfOnuIDuniID := fmt.Sprintf("%d,%d,%d", ponPort, onuID, uniID)
+ // update cache
+ rsrcMgr.allocIDsForOnuLock.Lock()
+ rsrcMgr.allocIDsForOnu[intfOnuIDuniID] = allocIDs
+ rsrcMgr.allocIDsForOnuLock.Unlock()
+
+ // Note: in case the write to DB fails there could be inconsistent data between cache and db.
+ // Although this is highly unlikely with DB retries in place, this is something we have to deal with in the next release
+ return rsrcMgr.PonRsrMgr.UpdateAllocIdsForOnu(ctx, intfOnuIDuniID,
+ allocIDs)
}
// GetCurrentGEMPortIDsForOnu returns gem ports for given pon interface , onu id and uni id
-func (RsrcMgr *OpenOltResourceMgr) GetCurrentGEMPortIDsForOnu(ctx context.Context, intfID uint32, onuID uint32,
+func (rsrcMgr *OpenOltResourceMgr) GetCurrentGEMPortIDsForOnu(ctx context.Context, intfID uint32, onuID uint32,
uniID uint32) []uint32 {
- /* Get gem ports for given pon interface , onu id and uni id. */
+ intfOnuIDuniID := fmt.Sprintf("%d,%d,%d", intfID, onuID, uniID)
- IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", intfID, onuID, uniID)
- return RsrcMgr.ResourceMgrs[intfID].GetCurrentGEMPortIDsForOnu(ctx, IntfOnuIDUniID)
+ // fetch from cache
+ rsrcMgr.gemPortIDsForOnuLock.RLock()
+ gemIDs, ok := rsrcMgr.gemPortIDsForOnu[intfOnuIDuniID]
+ rsrcMgr.gemPortIDsForOnuLock.RUnlock()
+ if ok {
+ return gemIDs
+ }
+ /* Get gem ports for given pon interface , onu id and uni id. */
+ gemIDs = rsrcMgr.PonRsrMgr.GetCurrentGEMPortIDsForOnu(ctx, intfOnuIDuniID)
+
+ // update cache
+ rsrcMgr.gemPortIDsForOnuLock.Lock()
+ rsrcMgr.gemPortIDsForOnu[intfOnuIDuniID] = gemIDs
+ rsrcMgr.gemPortIDsForOnuLock.Unlock()
+
+ return gemIDs
}
// GetCurrentAllocIDsForOnu returns alloc ids for given pon interface and onu id
-func (RsrcMgr *OpenOltResourceMgr) GetCurrentAllocIDsForOnu(ctx context.Context, intfID uint32, onuID uint32, uniID uint32) []uint32 {
+func (rsrcMgr *OpenOltResourceMgr) GetCurrentAllocIDsForOnu(ctx context.Context, intfID uint32, onuID uint32, uniID uint32) []uint32 {
- IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", intfID, onuID, uniID)
- AllocID := RsrcMgr.ResourceMgrs[intfID].GetCurrentAllocIDForOnu(ctx, IntfOnuIDUniID)
- if AllocID != nil {
- return AllocID
+ intfOnuIDuniID := fmt.Sprintf("%d,%d,%d", intfID, onuID, uniID)
+ // fetch from cache
+ rsrcMgr.allocIDsForOnuLock.RLock()
+ allocIDs, ok := rsrcMgr.allocIDsForOnu[intfOnuIDuniID]
+ rsrcMgr.allocIDsForOnuLock.RUnlock()
+ if ok {
+ return allocIDs
}
- return []uint32{}
+ allocIDs = rsrcMgr.PonRsrMgr.GetCurrentAllocIDForOnu(ctx, intfOnuIDuniID)
+
+ // update cache
+ rsrcMgr.allocIDsForOnuLock.Lock()
+ rsrcMgr.allocIDsForOnu[intfOnuIDuniID] = allocIDs
+ rsrcMgr.allocIDsForOnuLock.Unlock()
+
+ return allocIDs
}
// RemoveAllocIDForOnu removes the alloc id for given pon interface, onu id, uni id and alloc id
-func (RsrcMgr *OpenOltResourceMgr) RemoveAllocIDForOnu(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, allocID uint32) {
- allocIDs := RsrcMgr.GetCurrentAllocIDsForOnu(ctx, intfID, onuID, uniID)
+func (rsrcMgr *OpenOltResourceMgr) RemoveAllocIDForOnu(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, allocID uint32) {
+ allocIDs := rsrcMgr.GetCurrentAllocIDsForOnu(ctx, intfID, onuID, uniID)
for i := 0; i < len(allocIDs); i++ {
if allocIDs[i] == allocID {
allocIDs = append(allocIDs[:i], allocIDs[i+1:]...)
break
}
}
- err := RsrcMgr.UpdateAllocIdsForOnu(ctx, intfID, onuID, uniID, allocIDs)
+ err := rsrcMgr.UpdateAllocIdsForOnu(ctx, intfID, onuID, uniID, allocIDs)
if err != nil {
- logger.Errorf(ctx, "Failed to Remove Alloc Id For Onu. IntfID %d onuID %d uniID %d allocID %d",
+ logger.Errorf(ctx, "Failed to Remove Alloc Id For Onu. intfID %d onuID %d uniID %d allocID %d",
intfID, onuID, uniID, allocID)
}
}
// RemoveGemPortIDForOnu removes the gem port id for given pon interface, onu id, uni id and gem port id
-func (RsrcMgr *OpenOltResourceMgr) RemoveGemPortIDForOnu(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, gemPortID uint32) {
- gemPortIDs := RsrcMgr.GetCurrentGEMPortIDsForOnu(ctx, intfID, onuID, uniID)
+func (rsrcMgr *OpenOltResourceMgr) RemoveGemPortIDForOnu(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, gemPortID uint32) {
+ gemPortIDs := rsrcMgr.GetCurrentGEMPortIDsForOnu(ctx, intfID, onuID, uniID)
for i := 0; i < len(gemPortIDs); i++ {
if gemPortIDs[i] == gemPortID {
gemPortIDs = append(gemPortIDs[:i], gemPortIDs[i+1:]...)
break
}
}
- err := RsrcMgr.UpdateGEMPortIDsForOnu(ctx, intfID, onuID, uniID, gemPortIDs)
+ err := rsrcMgr.UpdateGEMPortIDsForOnu(ctx, intfID, onuID, uniID, gemPortIDs)
if err != nil {
- logger.Errorf(ctx, "Failed to Remove Gem Id For Onu. IntfID %d onuID %d uniID %d gemPortId %d",
+ logger.Errorf(ctx, "Failed to Remove Gem Id For Onu. intfID %d onuID %d uniID %d gemPortId %d",
intfID, onuID, uniID, gemPortID)
}
}
-//GetUniPortByPonPortGemPortFromKVStore retrieves onu and uni ID associated with the pon and gem ports.
-func (RsrcMgr *OpenOltResourceMgr) GetUniPortByPonPortGemPortFromKVStore(ctx context.Context, PonPort uint32, GemPort uint32) (uint32, uint32, error) {
- IntfGEMPortPath := fmt.Sprintf("%d,%d", PonPort, GemPort)
- logger.Debugf(ctx, "Getting ONU and UNI IDs from the path %s", IntfGEMPortPath)
- var Data []uint32
- Value, err := RsrcMgr.KVStore.Get(ctx, IntfGEMPortPath)
- if err == nil {
- if Value != nil {
- Val, _ := ponrmgr.ToByte(Value.Value)
- if err = json.Unmarshal(Val, &Data); err != nil {
- logger.Errorw(ctx, "Failed to unmarshal", log.Fields{"error": err})
- return 0, 0, errors.New("failed to unmarshal the data retrieved")
- }
- }
- } else {
- logger.Errorf(ctx, "Failed to get data from kvstore for %s", IntfGEMPortPath, err)
- return 0, 0, errors.New("could not get data")
- }
- if len(Data) < 2 {
- return 0, 0, errors.New("invalid data format")
- }
- return Data[0], Data[1], nil
-}
-
-// UpdateGEMportsPonportToOnuMapOnKVStore updates onu and uni id associated with the gem port to the kv store
-// This stored information is used when packet_indication is received and we need to derive the ONU Id for which
-// the packet arrived based on the pon_intf and gemport available in the packet_indication
-func (RsrcMgr *OpenOltResourceMgr) UpdateGEMportsPonportToOnuMapOnKVStore(ctx context.Context, gemPorts []uint32, PonPort uint32,
- onuID uint32, uniID uint32) error {
-
- /* Update onu and uni id associated with the gem port to the kv store. */
- var IntfGEMPortPath string
- Data := []uint32{onuID, uniID}
- for _, GEM := range gemPorts {
- IntfGEMPortPath = fmt.Sprintf("%d,%d", PonPort, GEM)
- Val, err := json.Marshal(Data)
- if err != nil {
- logger.Error(ctx, "failed to Marshal")
- return err
- }
-
- if err = RsrcMgr.KVStore.Put(ctx, IntfGEMPortPath, Val); err != nil {
- logger.Errorf(ctx, "Failed to update resource %s", IntfGEMPortPath)
- return err
- }
- }
- return nil
-}
-
-// RemoveGEMportPonportToOnuMapOnKVStore removes the relationship between the gem port and pon port
-func (RsrcMgr *OpenOltResourceMgr) RemoveGEMportPonportToOnuMapOnKVStore(ctx context.Context, GemPort uint32, PonPort uint32) {
- IntfGEMPortPath := fmt.Sprintf("%d,%d", PonPort, GemPort)
- err := RsrcMgr.KVStore.Delete(ctx, IntfGEMPortPath)
- if err != nil {
- logger.Errorf(ctx, "Failed to Remove Gem port-Pon port to onu map on kv store. Gem %d PonPort %d", GemPort, PonPort)
- }
-}
-
-// GetGEMPortID gets gem port id for a particular pon port, onu id and uni id and then update the resource map on
-// the KV store with the list of gemport_id allocated for the pon_intf_onu_id tuple
-func (RsrcMgr *OpenOltResourceMgr) GetGEMPortID(ctx context.Context, ponPort uint32, onuID uint32,
- uniID uint32, NumOfPorts uint32) ([]uint32, error) {
-
- /* Get gem port id for a particular pon port, onu id
- and uni id.
- */
-
- var err error
- IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", ponPort, onuID, uniID)
-
- RsrcMgr.GemPortIDMgmtLock[ponPort].Lock()
- defer RsrcMgr.GemPortIDMgmtLock[ponPort].Unlock()
-
- GEMPortList := RsrcMgr.ResourceMgrs[ponPort].GetCurrentGEMPortIDsForOnu(ctx, IntfOnuIDUniID)
- if GEMPortList != nil {
- return GEMPortList, nil
- }
-
- GEMPortList, err = RsrcMgr.ResourceMgrs[ponPort].GetResourceID(ctx, ponPort,
- ponrmgr.GEMPORT_ID, NumOfPorts)
- if err != nil && GEMPortList == nil {
- logger.Errorf(ctx, "Failed to get gem port id for %s", IntfOnuIDUniID)
- return nil, err
- }
-
- // update the resource map on KV store with the list of gemport_id
- // allocated for the pon_intf_onu_id tuple
- err = RsrcMgr.ResourceMgrs[ponPort].UpdateGEMPortIDsForOnu(ctx, IntfOnuIDUniID,
- GEMPortList)
- if err != nil {
- logger.Errorf(ctx, "Failed to update GEM ports to kv store for %s", IntfOnuIDUniID)
- return nil, err
- }
- _ = RsrcMgr.UpdateGEMportsPonportToOnuMapOnKVStore(ctx, GEMPortList, ponPort,
- onuID, uniID)
- return GEMPortList, err
-}
-
// UpdateGEMPortIDsForOnu updates gemport ids on to the kv store for a given pon port, onu id and uni id
-func (RsrcMgr *OpenOltResourceMgr) UpdateGEMPortIDsForOnu(ctx context.Context, ponPort uint32, onuID uint32,
- uniID uint32, GEMPortList []uint32) error {
- IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", ponPort, onuID, uniID)
- return RsrcMgr.ResourceMgrs[ponPort].UpdateGEMPortIDsForOnu(ctx, IntfOnuIDUniID,
- GEMPortList)
+func (rsrcMgr *OpenOltResourceMgr) UpdateGEMPortIDsForOnu(ctx context.Context, ponPort uint32, onuID uint32,
+ uniID uint32, gemIDs []uint32) error {
+ intfOnuIDuniID := fmt.Sprintf("%d,%d,%d", ponPort, onuID, uniID)
+ // update cache
+ rsrcMgr.gemPortIDsForOnuLock.Lock()
+ rsrcMgr.gemPortIDsForOnu[intfOnuIDuniID] = gemIDs
+ rsrcMgr.gemPortIDsForOnuLock.Unlock()
+
+ // Note: in case the write to DB fails there could be inconsistent data between cache and db.
+ // Although this is highly unlikely with DB retries in place, this is something we have to deal with in the next release
+ return rsrcMgr.PonRsrMgr.UpdateGEMPortIDsForOnu(ctx, intfOnuIDuniID,
+ gemIDs)
}
// FreeonuID releases(make free) onu id for a particular pon-port
-func (RsrcMgr *OpenOltResourceMgr) FreeonuID(ctx context.Context, intfID uint32, onuID []uint32) {
+func (rsrcMgr *OpenOltResourceMgr) FreeonuID(ctx context.Context, intfID uint32, onuID []uint32) {
- RsrcMgr.OnuIDMgmtLock[intfID].Lock()
- defer RsrcMgr.OnuIDMgmtLock[intfID].Unlock()
-
- if err := RsrcMgr.ResourceMgrs[intfID].TechProfileMgr.FreeResourceID(ctx, intfID, ponrmgr.ONU_ID, onuID); err != nil {
+ if err := rsrcMgr.PonRsrMgr.TechProfileMgr.FreeResourceID(ctx, intfID, ponrmgr.ONU_ID, onuID); err != nil {
logger.Errorw(ctx, "error-while-freeing-onu-id", log.Fields{
"intf-id": intfID,
"onu-id": onuID,
@@ -859,23 +487,23 @@
var IntfonuID string
for _, onu := range onuID {
IntfonuID = fmt.Sprintf("%d,%d", intfID, onu)
- RsrcMgr.ResourceMgrs[intfID].RemoveResourceMap(ctx, IntfonuID)
+ rsrcMgr.PonRsrMgr.RemoveResourceMap(ctx, IntfonuID)
}
}
// FreeAllocID frees AllocID on the PON resource pool and also frees the allocID association
// for the given OLT device.
-func (RsrcMgr *OpenOltResourceMgr) FreeAllocID(ctx context.Context, IntfID uint32, onuID uint32,
+// The caller should ensure that this is a blocking call and this operation is serialized for
+// the ONU so as not cause resource corruption since there are no mutexes used here.
+func (rsrcMgr *OpenOltResourceMgr) FreeAllocID(ctx context.Context, intfID uint32, onuID uint32,
uniID uint32, allocID uint32) {
- RsrcMgr.AllocIDMgmtLock[IntfID].Lock()
- defer RsrcMgr.AllocIDMgmtLock[IntfID].Unlock()
- RsrcMgr.RemoveAllocIDForOnu(ctx, IntfID, onuID, uniID, allocID)
+ rsrcMgr.RemoveAllocIDForOnu(ctx, intfID, onuID, uniID, allocID)
allocIDs := make([]uint32, 0)
allocIDs = append(allocIDs, allocID)
- if err := RsrcMgr.ResourceMgrs[IntfID].TechProfileMgr.FreeResourceID(ctx, IntfID, ponrmgr.ALLOC_ID, allocIDs); err != nil {
+ if err := rsrcMgr.PonRsrMgr.TechProfileMgr.FreeResourceID(ctx, intfID, ponrmgr.ALLOC_ID, allocIDs); err != nil {
logger.Errorw(ctx, "error-while-freeing-alloc-id", log.Fields{
- "intf-id": IntfID,
+ "intf-id": intfID,
"onu-id": onuID,
"err": err.Error(),
})
@@ -884,33 +512,35 @@
// FreeGemPortID frees GemPortID on the PON resource pool and also frees the gemPortID association
// for the given OLT device.
-func (RsrcMgr *OpenOltResourceMgr) FreeGemPortID(ctx context.Context, IntfID uint32, onuID uint32,
+// The caller should ensure that this is a blocking call and this operation is serialized for
+// the ONU so as not cause resource corruption since there are no mutexes used here.
+func (rsrcMgr *OpenOltResourceMgr) FreeGemPortID(ctx context.Context, intfID uint32, onuID uint32,
uniID uint32, gemPortID uint32) {
- RsrcMgr.GemPortIDMgmtLock[IntfID].Lock()
- defer RsrcMgr.GemPortIDMgmtLock[IntfID].Unlock()
+ rsrcMgr.RemoveGemPortIDForOnu(ctx, intfID, onuID, uniID, gemPortID)
- RsrcMgr.RemoveGemPortIDForOnu(ctx, IntfID, onuID, uniID, gemPortID)
gemPortIDs := make([]uint32, 0)
gemPortIDs = append(gemPortIDs, gemPortID)
- if err := RsrcMgr.ResourceMgrs[IntfID].TechProfileMgr.FreeResourceID(ctx, IntfID, ponrmgr.GEMPORT_ID, gemPortIDs); err != nil {
+ if err := rsrcMgr.PonRsrMgr.TechProfileMgr.FreeResourceID(ctx, intfID, ponrmgr.GEMPORT_ID, gemPortIDs); err != nil {
logger.Errorw(ctx, "error-while-freeing-gem-port-id", log.Fields{
- "intf-id": IntfID,
+ "intf-id": intfID,
"onu-id": onuID,
"err": err.Error(),
})
}
}
-// FreePONResourcesForONU make the pon resources free for a given pon interface and onu id, and the clears the
-// resource map and the onuID associated with (pon_intf_id, gemport_id) tuple,
-func (RsrcMgr *OpenOltResourceMgr) FreePONResourcesForONU(ctx context.Context, intfID uint32, onuID uint32, uniID uint32) {
+// FreePONResourcesForONU make the pon resources free for a given pon interface and onu id
+func (rsrcMgr *OpenOltResourceMgr) FreePONResourcesForONU(ctx context.Context, intfID uint32, onuID uint32, uniID uint32) {
- IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", intfID, onuID, uniID)
+ intfOnuIDuniID := fmt.Sprintf("%d,%d,%d", intfID, onuID, uniID)
- RsrcMgr.AllocIDMgmtLock[intfID].Lock()
- AllocIDs := RsrcMgr.ResourceMgrs[intfID].GetCurrentAllocIDForOnu(ctx, IntfOnuIDUniID)
+ AllocIDs := rsrcMgr.PonRsrMgr.GetCurrentAllocIDForOnu(ctx, intfOnuIDuniID)
- if err := RsrcMgr.ResourceMgrs[intfID].TechProfileMgr.FreeResourceID(ctx, intfID,
+ rsrcMgr.allocIDsForOnuLock.Lock()
+ delete(rsrcMgr.allocIDsForOnu, intfOnuIDuniID)
+ rsrcMgr.allocIDsForOnuLock.Unlock()
+
+ if err := rsrcMgr.PonRsrMgr.TechProfileMgr.FreeResourceID(ctx, intfID,
ponrmgr.ALLOC_ID,
AllocIDs); err != nil {
logger.Errorw(ctx, "error-while-freeing-all-alloc-ids-for-onu", log.Fields{
@@ -919,11 +549,14 @@
"err": err.Error(),
})
}
- RsrcMgr.AllocIDMgmtLock[intfID].Unlock()
- RsrcMgr.GemPortIDMgmtLock[intfID].Lock()
- GEMPortIDs := RsrcMgr.ResourceMgrs[intfID].GetCurrentGEMPortIDsForOnu(ctx, IntfOnuIDUniID)
- if err := RsrcMgr.ResourceMgrs[intfID].TechProfileMgr.FreeResourceID(ctx, intfID,
+ GEMPortIDs := rsrcMgr.PonRsrMgr.GetCurrentGEMPortIDsForOnu(ctx, intfOnuIDuniID)
+
+ rsrcMgr.gemPortIDsForOnuLock.Lock()
+ delete(rsrcMgr.gemPortIDsForOnu, intfOnuIDuniID)
+ rsrcMgr.gemPortIDsForOnuLock.Unlock()
+
+ if err := rsrcMgr.PonRsrMgr.TechProfileMgr.FreeResourceID(ctx, intfID,
ponrmgr.GEMPORT_ID,
GEMPortIDs); err != nil {
logger.Errorw(ctx, "error-while-freeing-all-gem-port-ids-for-onu", log.Fields{
@@ -932,28 +565,23 @@
"err": err.Error(),
})
}
- RsrcMgr.GemPortIDMgmtLock[intfID].Unlock()
// Clear resource map associated with (pon_intf_id, gemport_id) tuple.
- RsrcMgr.ResourceMgrs[intfID].RemoveResourceMap(ctx, IntfOnuIDUniID)
- // Clear the ONU Id associated with the (pon_intf_id, gemport_id) tuple.
- for _, GEM := range GEMPortIDs {
- _ = RsrcMgr.KVStore.Delete(ctx, fmt.Sprintf("%d,%d", intfID, GEM))
- }
+ rsrcMgr.PonRsrMgr.RemoveResourceMap(ctx, intfOnuIDuniID)
}
// IsFlowOnKvStore checks if the given flowID is present on the kv store
// Returns true if the flowID is found, otherwise it returns false
-func (RsrcMgr *OpenOltResourceMgr) IsFlowOnKvStore(ctx context.Context, ponIntfID uint32, onuID int32, uniID int32,
+func (rsrcMgr *OpenOltResourceMgr) IsFlowOnKvStore(ctx context.Context, intfID uint32, onuID int32, uniID int32,
flowID uint64) bool {
- FlowIDs, err := RsrcMgr.GetCurrentFlowIDsForOnu(ctx, ponIntfID, onuID, uniID)
+ FlowIDs, err := rsrcMgr.GetCurrentFlowIDsForOnu(ctx, intfID, onuID, uniID)
if err != nil {
// error logged in the called function
return false
}
if FlowIDs != nil {
- logger.Debugw(ctx, "Found flowId(s) for this ONU", log.Fields{"pon": ponIntfID, "onuID": onuID, "uniID": uniID})
+ logger.Debugw(ctx, "Found flowId(s) for this ONU", log.Fields{"pon": intfID, "onuID": onuID, "uniID": uniID})
for _, id := range FlowIDs {
if flowID == id {
return true
@@ -964,89 +592,117 @@
}
// GetTechProfileIDForOnu fetches Tech-Profile-ID from the KV-Store for the given onu based on the path
-// This path is formed as the following: {IntfID, OnuID, UniID}/tp_id
-func (RsrcMgr *OpenOltResourceMgr) GetTechProfileIDForOnu(ctx context.Context, IntfID uint32, OnuID uint32, UniID uint32) []uint32 {
- Path := fmt.Sprintf(TpIDPathSuffix, IntfID, OnuID, UniID)
- var Data []uint32
- Value, err := RsrcMgr.KVStore.Get(ctx, Path)
+// This path is formed as the following: {intfID, onuID, uniID}/tp_id
+func (rsrcMgr *OpenOltResourceMgr) GetTechProfileIDForOnu(ctx context.Context, intfID uint32, onuID uint32, uniID uint32) []uint32 {
+ Path := fmt.Sprintf(tpIDPathSuffix, intfID, onuID, uniID)
+ // fetch from cache
+ rsrcMgr.techProfileIDsForOnuLock.RLock()
+ tpIDs, ok := rsrcMgr.techProfileIDsForOnu[Path]
+ rsrcMgr.techProfileIDsForOnuLock.RUnlock()
+ if ok {
+ return tpIDs
+ }
+ Value, err := rsrcMgr.KVStore.Get(ctx, Path)
if err == nil {
if Value != nil {
Val, err := kvstore.ToByte(Value.Value)
if err != nil {
- logger.Errorw(ctx, "Failed to convert into byte array", log.Fields{"error": err})
- return Data
+ logger.Errorw(ctx, "Failed to convert into byte array", log.Fields{"err": err})
+ return tpIDs
}
- if err = json.Unmarshal(Val, &Data); err != nil {
- logger.Error(ctx, "Failed to unmarshal", log.Fields{"error": err})
- return Data
+ if err = json.Unmarshal(Val, &tpIDs); err != nil {
+ logger.Error(ctx, "Failed to unmarshal", log.Fields{"err": err})
+ return tpIDs
}
}
} else {
logger.Errorf(ctx, "Failed to get TP id from kvstore for path %s", Path)
}
- logger.Debugf(ctx, "Getting TP id %d from path %s", Data, Path)
- return Data
+ logger.Debugf(ctx, "Getting TP id %d from path %s", tpIDs, Path)
+
+ // update cache
+ rsrcMgr.techProfileIDsForOnuLock.Lock()
+ rsrcMgr.techProfileIDsForOnu[Path] = tpIDs
+ rsrcMgr.techProfileIDsForOnuLock.Unlock()
+
+ return tpIDs
}
// RemoveTechProfileIDsForOnu deletes all tech profile ids from the KV-Store for the given onu based on the path
-// This path is formed as the following: {IntfID, OnuID, UniID}/tp_id
-func (RsrcMgr *OpenOltResourceMgr) RemoveTechProfileIDsForOnu(ctx context.Context, IntfID uint32, OnuID uint32, UniID uint32) error {
- IntfOnuUniID := fmt.Sprintf(TpIDPathSuffix, IntfID, OnuID, UniID)
- if err := RsrcMgr.KVStore.Delete(ctx, IntfOnuUniID); err != nil {
- logger.Errorw(ctx, "Failed to delete techprofile id resource in KV store", log.Fields{"path": IntfOnuUniID})
+// This path is formed as the following: {intfID, onuID, uniID}/tp_id
+func (rsrcMgr *OpenOltResourceMgr) RemoveTechProfileIDsForOnu(ctx context.Context, intfID uint32, onuID uint32, uniID uint32) error {
+ intfOnuUniID := fmt.Sprintf(tpIDPathSuffix, intfID, onuID, uniID)
+ // update cache
+ rsrcMgr.techProfileIDsForOnuLock.Lock()
+ delete(rsrcMgr.techProfileIDsForOnu, intfOnuUniID)
+ rsrcMgr.techProfileIDsForOnuLock.Unlock()
+
+ if err := rsrcMgr.KVStore.Delete(ctx, intfOnuUniID); err != nil {
+ logger.Errorw(ctx, "Failed to delete techprofile id resource in KV store", log.Fields{"path": intfOnuUniID})
return err
}
return nil
}
// RemoveTechProfileIDForOnu deletes a specific tech profile id from the KV-Store for the given onu based on the path
-// This path is formed as the following: {IntfID, OnuID, UniID}/tp_id
-func (RsrcMgr *OpenOltResourceMgr) RemoveTechProfileIDForOnu(ctx context.Context, IntfID uint32, OnuID uint32, UniID uint32, TpID uint32) error {
- tpIDList := RsrcMgr.GetTechProfileIDForOnu(ctx, IntfID, OnuID, UniID)
+// This path is formed as the following: {intfID, onuID, uniID}/tp_id
+func (rsrcMgr *OpenOltResourceMgr) RemoveTechProfileIDForOnu(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, tpID uint32) error {
+ tpIDList := rsrcMgr.GetTechProfileIDForOnu(ctx, intfID, onuID, uniID)
for i, tpIDInList := range tpIDList {
- if tpIDInList == TpID {
+ if tpIDInList == tpID {
tpIDList = append(tpIDList[:i], tpIDList[i+1:]...)
}
}
- IntfOnuUniID := fmt.Sprintf(TpIDPathSuffix, IntfID, OnuID, UniID)
+ intfOnuUniID := fmt.Sprintf(tpIDPathSuffix, intfID, onuID, uniID)
+ // update cache
+ rsrcMgr.techProfileIDsForOnuLock.Lock()
+ rsrcMgr.techProfileIDsForOnu[intfOnuUniID] = tpIDList
+ rsrcMgr.techProfileIDsForOnuLock.Unlock()
+
Value, err := json.Marshal(tpIDList)
if err != nil {
logger.Error(ctx, "failed to Marshal")
return err
}
- if err = RsrcMgr.KVStore.Put(ctx, IntfOnuUniID, Value); err != nil {
- logger.Errorf(ctx, "Failed to update resource %s", IntfOnuUniID)
+ if err = rsrcMgr.KVStore.Put(ctx, intfOnuUniID, Value); err != nil {
+ logger.Errorf(ctx, "Failed to update resource %s", intfOnuUniID)
return err
}
return err
}
// UpdateTechProfileIDForOnu updates (put) already present tech-profile-id for the given onu based on the path
-// This path is formed as the following: {IntfID, OnuID, UniID}/tp_id
-func (RsrcMgr *OpenOltResourceMgr) UpdateTechProfileIDForOnu(ctx context.Context, IntfID uint32, OnuID uint32,
- UniID uint32, TpID uint32) error {
+// This path is formed as the following: {intfID, onuID, uniID}/tp_id
+func (rsrcMgr *OpenOltResourceMgr) UpdateTechProfileIDForOnu(ctx context.Context, intfID uint32, onuID uint32,
+ uniID uint32, tpID uint32) error {
var Value []byte
var err error
- IntfOnuUniID := fmt.Sprintf(TpIDPathSuffix, IntfID, OnuID, UniID)
+ intfOnuUniID := fmt.Sprintf(tpIDPathSuffix, intfID, onuID, uniID)
- tpIDList := RsrcMgr.GetTechProfileIDForOnu(ctx, IntfID, OnuID, UniID)
+ tpIDList := rsrcMgr.GetTechProfileIDForOnu(ctx, intfID, onuID, uniID)
for _, value := range tpIDList {
- if value == TpID {
- logger.Debugf(ctx, "TpID %d is already in tpIdList for the path %s", TpID, IntfOnuUniID)
+ if value == tpID {
+ logger.Debugf(ctx, "tpID %d is already in tpIdList for the path %s", tpID, intfOnuUniID)
return err
}
}
- logger.Debugf(ctx, "updating tp id %d on path %s", TpID, IntfOnuUniID)
- tpIDList = append(tpIDList, TpID)
+ logger.Debugf(ctx, "updating tp id %d on path %s", tpID, intfOnuUniID)
+ tpIDList = append(tpIDList, tpID)
+
+ // update cache
+ rsrcMgr.techProfileIDsForOnuLock.Lock()
+ rsrcMgr.techProfileIDsForOnu[intfOnuUniID] = tpIDList
+ rsrcMgr.techProfileIDsForOnuLock.Unlock()
+
Value, err = json.Marshal(tpIDList)
if err != nil {
logger.Error(ctx, "failed to Marshal")
return err
}
- if err = RsrcMgr.KVStore.Put(ctx, IntfOnuUniID, Value); err != nil {
- logger.Errorf(ctx, "Failed to update resource %s", IntfOnuUniID)
+ if err = rsrcMgr.KVStore.Put(ctx, intfOnuUniID, Value); err != nil {
+ logger.Errorf(ctx, "Failed to update resource %s", intfOnuUniID)
return err
}
return err
@@ -1054,41 +710,56 @@
// StoreMeterInfoForOnu updates the meter id in the KV-Store for the given onu based on the path
// This path is formed as the following: <(pon_id, onu_id, uni_id)>/<tp_id>/meter_id/<direction>
-func (RsrcMgr *OpenOltResourceMgr) StoreMeterInfoForOnu(ctx context.Context, Direction string, IntfID uint32, OnuID uint32,
- UniID uint32, TpID uint32, meterInfo *MeterInfo) error {
+func (rsrcMgr *OpenOltResourceMgr) StoreMeterInfoForOnu(ctx context.Context, Direction string, intfID uint32, onuID uint32,
+ uniID uint32, tpID uint32, meterInfo *MeterInfo) error {
var Value []byte
var err error
- IntfOnuUniID := fmt.Sprintf(MeterIDPathSuffix, IntfID, OnuID, UniID, TpID, Direction)
+ intfOnuUniID := fmt.Sprintf(MeterIDPathSuffix, intfID, onuID, uniID, tpID, Direction)
+
+ // update cache
+ rsrcMgr.meterInfoForOnuLock.Lock()
+ rsrcMgr.meterInfoForOnu[intfOnuUniID] = meterInfo
+ rsrcMgr.meterInfoForOnuLock.Unlock()
+
Value, err = json.Marshal(*meterInfo)
if err != nil {
logger.Error(ctx, "failed to Marshal meter config")
return err
}
- if err = RsrcMgr.KVStore.Put(ctx, IntfOnuUniID, Value); err != nil {
- logger.Errorf(ctx, "Failed to store meter into KV store %s", IntfOnuUniID)
+ if err = rsrcMgr.KVStore.Put(ctx, intfOnuUniID, Value); err != nil {
+ logger.Errorf(ctx, "Failed to store meter into KV store %s", intfOnuUniID)
return err
}
- logger.Debugw(ctx, "meter info updated successfully", log.Fields{"path": IntfOnuUniID, "meter-info": meterInfo})
+ logger.Debugw(ctx, "meter info updated successfully", log.Fields{"path": intfOnuUniID, "meter-info": meterInfo})
return err
}
// GetMeterInfoForOnu fetches the meter id from the kv store for the given onu based on the path
// This path is formed as the following: <(pon_id, onu_id, uni_id)>/<tp_id>/meter_id/<direction>
-func (RsrcMgr *OpenOltResourceMgr) GetMeterInfoForOnu(ctx context.Context, Direction string, IntfID uint32, OnuID uint32,
- UniID uint32, TpID uint32) (*MeterInfo, error) {
- Path := fmt.Sprintf(MeterIDPathSuffix, IntfID, OnuID, UniID, TpID, Direction)
+func (rsrcMgr *OpenOltResourceMgr) GetMeterInfoForOnu(ctx context.Context, Direction string, intfID uint32, onuID uint32,
+ uniID uint32, tpID uint32) (*MeterInfo, error) {
+ Path := fmt.Sprintf(MeterIDPathSuffix, intfID, onuID, uniID, tpID, Direction)
+
+ // get from cache
+ rsrcMgr.meterInfoForOnuLock.RLock()
+ val, ok := rsrcMgr.meterInfoForOnu[Path]
+ rsrcMgr.meterInfoForOnuLock.RUnlock()
+ if ok {
+ return val, nil
+ }
+
var meterInfo MeterInfo
- Value, err := RsrcMgr.KVStore.Get(ctx, Path)
+ Value, err := rsrcMgr.KVStore.Get(ctx, Path)
if err == nil {
if Value != nil {
logger.Debug(ctx, "Found meter info in KV store", log.Fields{"Direction": Direction})
Val, er := kvstore.ToByte(Value.Value)
if er != nil {
- logger.Errorw(ctx, "Failed to convert into byte array", log.Fields{"error": er})
+ logger.Errorw(ctx, "Failed to convert into byte array", log.Fields{"err": er})
return nil, er
}
if er = json.Unmarshal(Val, &meterInfo); er != nil {
- logger.Error(ctx, "Failed to unmarshal meter info", log.Fields{"error": er})
+ logger.Error(ctx, "Failed to unmarshal meter info", log.Fields{"err": er})
return nil, er
}
} else {
@@ -1099,25 +770,30 @@
logger.Errorf(ctx, "Failed to get Meter config from kvstore for path %s", Path)
}
+ // update cache
+ rsrcMgr.meterInfoForOnuLock.Lock()
+ rsrcMgr.meterInfoForOnu[Path] = &meterInfo
+ rsrcMgr.meterInfoForOnuLock.Unlock()
+
return &meterInfo, err
}
// HandleMeterInfoRefCntUpdate increments or decrements the reference counter for a given meter.
// When reference count becomes 0, it clears the meter information from the kv store
-func (RsrcMgr *OpenOltResourceMgr) HandleMeterInfoRefCntUpdate(ctx context.Context, Direction string,
- IntfID uint32, OnuID uint32, UniID uint32, TpID uint32, increment bool) error {
- meterInfo, err := RsrcMgr.GetMeterInfoForOnu(ctx, Direction, IntfID, OnuID, UniID, TpID)
+func (rsrcMgr *OpenOltResourceMgr) HandleMeterInfoRefCntUpdate(ctx context.Context, Direction string,
+ intfID uint32, onuID uint32, uniID uint32, tpID uint32, increment bool) error {
+ meterInfo, err := rsrcMgr.GetMeterInfoForOnu(ctx, Direction, intfID, onuID, uniID, tpID)
if err != nil {
return err
} else if meterInfo == nil {
// If we are increasing the reference count, we expect the meter information to be present on KV store.
// But if decrementing the reference count, the meter is possibly already cleared from KV store. Just log warn but do not return error.
if increment {
- logger.Errorf(ctx, "error-fetching-meter-info-for-intf-%d-onu-%d-uni-%d-tp-id-%d-direction-%s", IntfID, OnuID, UniID, TpID, Direction)
- return fmt.Errorf("error-fetching-meter-info-for-intf-%d-onu-%d-uni-%d-tp-id-%d-direction-%s", IntfID, OnuID, UniID, TpID, Direction)
+ logger.Errorf(ctx, "error-fetching-meter-info-for-intf-%d-onu-%d-uni-%d-tp-id-%d-direction-%s", intfID, onuID, uniID, tpID, Direction)
+ return fmt.Errorf("error-fetching-meter-info-for-intf-%d-onu-%d-uni-%d-tp-id-%d-direction-%s", intfID, onuID, uniID, tpID, Direction)
}
logger.Warnw(ctx, "meter is already cleared",
- log.Fields{"intfID": IntfID, "onuID": OnuID, "uniID": UniID, "direction": Direction, "increment": increment})
+ log.Fields{"intfID": intfID, "onuID": onuID, "uniID": uniID, "direction": Direction, "increment": increment})
return nil
}
@@ -1127,13 +803,13 @@
meterInfo.RefCnt--
// If RefCnt become 0 clear the meter information from the DB.
if meterInfo.RefCnt == 0 {
- if err := RsrcMgr.RemoveMeterInfoForOnu(ctx, Direction, IntfID, OnuID, UniID, TpID); err != nil {
+ if err := rsrcMgr.RemoveMeterInfoForOnu(ctx, Direction, intfID, onuID, uniID, tpID); err != nil {
return err
}
return nil
}
}
- if err := RsrcMgr.StoreMeterInfoForOnu(ctx, Direction, IntfID, OnuID, UniID, TpID, meterInfo); err != nil {
+ if err := rsrcMgr.StoreMeterInfoForOnu(ctx, Direction, intfID, onuID, uniID, tpID, meterInfo); err != nil {
return err
}
return nil
@@ -1141,44 +817,44 @@
// RemoveMeterInfoForOnu deletes the meter id from the kV-Store for the given onu based on the path
// This path is formed as the following: <(pon_id, onu_id, uni_id)>/<tp_id>/meter_id/<direction>
-func (RsrcMgr *OpenOltResourceMgr) RemoveMeterInfoForOnu(ctx context.Context, Direction string, IntfID uint32, OnuID uint32,
- UniID uint32, TpID uint32) error {
- Path := fmt.Sprintf(MeterIDPathSuffix, IntfID, OnuID, UniID, TpID, Direction)
- if err := RsrcMgr.KVStore.Delete(ctx, Path); err != nil {
+func (rsrcMgr *OpenOltResourceMgr) RemoveMeterInfoForOnu(ctx context.Context, Direction string, intfID uint32, onuID uint32,
+ uniID uint32, tpID uint32) error {
+ Path := fmt.Sprintf(MeterIDPathSuffix, intfID, onuID, uniID, tpID, Direction)
+
+ // update cache
+ rsrcMgr.meterInfoForOnuLock.Lock()
+ delete(rsrcMgr.meterInfoForOnu, Path)
+ rsrcMgr.meterInfoForOnuLock.Unlock()
+
+ if err := rsrcMgr.KVStore.Delete(ctx, Path); err != nil {
logger.Errorf(ctx, "Failed to delete meter id %s from kvstore ", Path)
return err
}
return nil
}
-//AddGemToOnuGemInfo adds gemport to onugem info kvstore
-func (RsrcMgr *OpenOltResourceMgr) AddGemToOnuGemInfo(ctx context.Context, intfID uint32, onuID uint32, gemPort uint32) error {
- var onuGemData []OnuGemInfo
- var err error
-
- if err = RsrcMgr.ResourceMgrs[intfID].GetOnuGemInfo(ctx, intfID, &onuGemData); err != nil {
+//AddGemToOnuGemInfo adds gemport to onugem info kvstore and also local cache
+func (rsrcMgr *OpenOltResourceMgr) AddGemToOnuGemInfo(ctx context.Context, intfID uint32, onuID uint32, gemPort uint32) error {
+ onugem, err := rsrcMgr.GetOnuGemInfo(ctx, intfID, onuID)
+ if err != nil || onugem == nil || onugem.SerialNumber == "" {
logger.Errorf(ctx, "failed to get onuifo for intfid %d", intfID)
return err
}
- if len(onuGemData) == 0 {
- logger.Errorw(ctx, "failed to ger Onuid info ", log.Fields{"intfid": intfID, "onuid": onuID})
- return err
+ if onugem.OnuID == onuID {
+ for _, gem := range onugem.GemPorts {
+ if gem == gemPort {
+ logger.Debugw(ctx, "Gem already present in onugem info, skpping addition", log.Fields{"gem": gem})
+ return nil
+ }
+ }
+ logger.Debugw(ctx, "Added gem to onugem info", log.Fields{"gem": gemPort})
+ onugem.GemPorts = append(onugem.GemPorts, gemPort)
+ } else {
+ logger.Errorw(ctx, "onu id in OnuGemInfo does not match", log.Fields{"onuID": onuID, "ponIf": intfID, "onuGemInfoOnuID": onugem.OnuID})
+ return fmt.Errorf("onu-id-in-OnuGemInfo-does-not-match-%v", onuID)
}
- for idx, onugem := range onuGemData {
- if onugem.OnuID == onuID {
- for _, gem := range onuGemData[idx].GemPorts {
- if gem == gemPort {
- logger.Debugw(ctx, "Gem already present in onugem info, skpping addition", log.Fields{"gem": gem})
- return nil
- }
- }
- logger.Debugw(ctx, "Added gem to onugem info", log.Fields{"gem": gemPort})
- onuGemData[idx].GemPorts = append(onuGemData[idx].GemPorts, gemPort)
- break
- }
- }
- err = RsrcMgr.ResourceMgrs[intfID].AddOnuGemInfo(ctx, intfID, onuGemData)
+ err = rsrcMgr.AddOnuGemInfo(ctx, intfID, onuID, *onugem)
if err != nil {
logger.Error(ctx, "Failed to add onugem to kv store")
return err
@@ -1186,78 +862,161 @@
return err
}
-//GetOnuGemInfo gets onu gem info from the kvstore per interface
-func (RsrcMgr *OpenOltResourceMgr) GetOnuGemInfo(ctx context.Context, IntfID uint32) ([]OnuGemInfo, error) {
- var onuGemData []OnuGemInfo
+//RemoveGemFromOnuGemInfo removes gemport from onugem info on kvstore and also local cache
+func (rsrcMgr *OpenOltResourceMgr) RemoveGemFromOnuGemInfo(ctx context.Context, intfID uint32, onuID uint32, gemPort uint32) error {
+ onugem, err := rsrcMgr.GetOnuGemInfo(ctx, intfID, onuID)
+ if err != nil || onugem == nil || onugem.SerialNumber == "" {
+ logger.Errorf(ctx, "failed to get onuifo for intfid %d", intfID)
+ return err
+ }
+ updated := false
+ if onugem.OnuID == onuID {
+ for i, gem := range onugem.GemPorts {
+ if gem == gemPort {
+ logger.Debugw(ctx, "Gem found, removing from onu gem info", log.Fields{"gem": gem})
+ onugem.GemPorts = append(onugem.GemPorts[:i], onugem.GemPorts[i+1:]...)
+ updated = true
+ break
+ }
+ }
+ } else {
+ logger.Errorw(ctx, "onu id in OnuGemInfo does not match", log.Fields{"onuID": onuID, "ponIf": intfID, "onuGemInfoOnuID": onugem.OnuID})
+ return fmt.Errorf("onu-id-in-OnuGemInfo-does-not-match-%v", onuID)
+ }
+ if updated {
+ err = rsrcMgr.AddOnuGemInfo(ctx, intfID, onuID, *onugem)
+ if err != nil {
+ logger.Error(ctx, "Failed to add onugem to kv store")
+ return err
+ }
+ } else {
+ logger.Debugw(ctx, "Gem port not found in onu gem info", log.Fields{"gem": gemPort})
+ }
+ return nil
+}
- if err := RsrcMgr.ResourceMgrs[IntfID].GetOnuGemInfo(ctx, IntfID, &onuGemData); err != nil {
- logger.Errorf(ctx, "failed to get onuifo for intfid %d", IntfID)
+//GetOnuGemInfo gets onu gem info from the kvstore per interface
+func (rsrcMgr *OpenOltResourceMgr) GetOnuGemInfo(ctx context.Context, intfID uint32, onuID uint32) (*OnuGemInfo, error) {
+ var err error
+ var Val []byte
+ var onugem OnuGemInfo
+
+ path := fmt.Sprintf(OnuGemInfoPath, intfID, onuID)
+
+ rsrcMgr.onuGemInfoLock.RLock()
+ val, ok := rsrcMgr.onuGemInfo[path]
+ rsrcMgr.onuGemInfoLock.RUnlock()
+ if ok {
+ return val, nil
+ }
+ value, err := rsrcMgr.KVStore.Get(ctx, path)
+ if err != nil {
+ logger.Errorw(ctx, "Failed to get from kv store", log.Fields{"path": path})
+ return nil, err
+ } else if value == nil {
+ logger.Debug(ctx, "No onuinfo for path", log.Fields{"path": path})
+ return nil, nil // returning nil as this could happen if there are no onus for the interface yet
+ }
+ if Val, err = kvstore.ToByte(value.Value); err != nil {
+ logger.Error(ctx, "Failed to convert to byte array")
return nil, err
}
- return onuGemData, nil
+ if err = json.Unmarshal(Val, &onugem); err != nil {
+ logger.Error(ctx, "Failed to unmarshall")
+ return nil, err
+ }
+ logger.Debugw(ctx, "found onugem info from path", log.Fields{"path": path, "onuGemInfo": onugem})
+ rsrcMgr.onuGemInfoLock.Lock()
+ rsrcMgr.onuGemInfo[path] = &onugem
+ rsrcMgr.onuGemInfoLock.Unlock()
+
+ return &onugem, nil
}
// AddOnuGemInfo adds onu info on to the kvstore per interface
-func (RsrcMgr *OpenOltResourceMgr) AddOnuGemInfo(ctx context.Context, IntfID uint32, onuGem OnuGemInfo) error {
- var onuGemData []OnuGemInfo
+func (rsrcMgr *OpenOltResourceMgr) AddOnuGemInfo(ctx context.Context, intfID uint32, onuID uint32, onuGem OnuGemInfo) error {
+
+ var Value []byte
var err error
+ Path := fmt.Sprintf(OnuGemInfoPath, intfID, onuID)
- if err = RsrcMgr.ResourceMgrs[IntfID].GetOnuGemInfo(ctx, IntfID, &onuGemData); err != nil {
- logger.Errorf(ctx, "failed to get onuifo for intfid %d", IntfID)
- return olterrors.NewErrPersistence("get", "OnuGemInfo", uint64(IntfID),
- log.Fields{"onuGem": onuGem, "intfID": IntfID}, err)
- }
- onuGemData = append(onuGemData, onuGem)
- err = RsrcMgr.ResourceMgrs[IntfID].AddOnuGemInfo(ctx, IntfID, onuGemData)
+ rsrcMgr.onuGemInfoLock.Lock()
+ rsrcMgr.onuGemInfo[Path] = &onuGem
+ rsrcMgr.onuGemInfoLock.Unlock()
+
+ Value, err = json.Marshal(onuGem)
if err != nil {
- logger.Error(ctx, "Failed to add onugem to kv store")
- return olterrors.NewErrPersistence("set", "OnuGemInfo", uint64(IntfID),
- log.Fields{"onuGemData": onuGemData, "intfID": IntfID}, err)
+ logger.Error(ctx, "failed to Marshal")
+ return err
}
- logger.Debugw(ctx, "added onu to onugeminfo", log.Fields{"intf": IntfID, "onugem": onuGem})
+ if err = rsrcMgr.KVStore.Put(ctx, Path, Value); err != nil {
+ logger.Errorf(ctx, "Failed to update resource %s", Path)
+ return err
+ }
+ logger.Debugw(ctx, "added onu gem info", log.Fields{"onuGemInfo": onuGem})
+ return err
+}
+
+// DelOnuGemInfo deletes the onugem info from kvstore per ONU
+func (rsrcMgr *OpenOltResourceMgr) DelOnuGemInfo(ctx context.Context, intfID uint32, onuID uint32) error {
+ path := fmt.Sprintf(OnuGemInfoPath, intfID, onuID)
+ rsrcMgr.onuGemInfoLock.Lock()
+ logger.Debugw(ctx, "removing onu gem info", log.Fields{"onuGemInfo": rsrcMgr.onuGemInfo[path]})
+ delete(rsrcMgr.onuGemInfo, path)
+ rsrcMgr.onuGemInfoLock.Unlock()
+
+ if err := rsrcMgr.KVStore.Delete(ctx, path); err != nil {
+ logger.Errorf(ctx, "failed to remove resource %s", path)
+ return err
+ }
return nil
}
// AddUniPortToOnuInfo adds uni port to the onuinfo kvstore. check if the uni is already present if not update the kv store.
-func (RsrcMgr *OpenOltResourceMgr) AddUniPortToOnuInfo(ctx context.Context, intfID uint32, onuID uint32, portNo uint32) {
- var onuGemData []OnuGemInfo
- var err error
+func (rsrcMgr *OpenOltResourceMgr) AddUniPortToOnuInfo(ctx context.Context, intfID uint32, onuID uint32, portNo uint32) {
- if err = RsrcMgr.ResourceMgrs[intfID].GetOnuGemInfo(ctx, intfID, &onuGemData); err != nil {
- logger.Errorf(ctx, "failed to get onuifo for intfid %d", intfID)
+ onugem, err := rsrcMgr.GetOnuGemInfo(ctx, intfID, onuID)
+ if err != nil || onugem == nil || onugem.SerialNumber == "" {
+ logger.Warnf(ctx, "failed to get onuifo for intfid %d", intfID)
return
}
- for idx, onu := range onuGemData {
- if onu.OnuID == onuID {
- for _, uni := range onu.UniPorts {
- if uni == portNo {
- logger.Debugw(ctx, "uni already present in onugem info", log.Fields{"uni": portNo})
- return
- }
+
+ if onugem.OnuID == onuID {
+ for _, uni := range onugem.UniPorts {
+ if uni == portNo {
+ logger.Debugw(ctx, "uni already present in onugem info", log.Fields{"uni": portNo})
+ return
}
- onuGemData[idx].UniPorts = append(onuGemData[idx].UniPorts, portNo)
- break
}
+ onugem.UniPorts = append(onugem.UniPorts, portNo)
+ } else {
+ logger.Warnw(ctx, "onu id mismatch in onu gem info", log.Fields{"intfID": intfID, "onuID": onuID})
+ return
}
- err = RsrcMgr.ResourceMgrs[intfID].AddOnuGemInfo(ctx, intfID, onuGemData)
+ err = rsrcMgr.AddOnuGemInfo(ctx, intfID, onuID, *onugem)
if err != nil {
- logger.Errorw(ctx, "Failed to add uin port in onugem to kv store", log.Fields{"uni": portNo})
+ logger.Errorw(ctx, "Failed to add uni port in onugem to kv store", log.Fields{"uni": portNo})
return
}
}
//UpdateGemPortForPktIn updates gemport for pkt in path to kvstore, path being intfid, onuid, portno, vlan id, priority bit
-func (RsrcMgr *OpenOltResourceMgr) UpdateGemPortForPktIn(ctx context.Context, pktIn PacketInInfoKey, gemPort uint32) {
+func (rsrcMgr *OpenOltResourceMgr) UpdateGemPortForPktIn(ctx context.Context, pktIn PacketInInfoKey, gemPort uint32) {
path := fmt.Sprintf(OnuPacketINPath, pktIn.IntfID, pktIn.OnuID, pktIn.LogicalPort, pktIn.VlanID, pktIn.Priority)
+ // update cache
+ rsrcMgr.gemPortForPacketInInfoLock.Lock()
+ rsrcMgr.gemPortForPacketInInfo[path] = gemPort
+ rsrcMgr.gemPortForPacketInInfoLock.Unlock()
+
Value, err := json.Marshal(gemPort)
if err != nil {
logger.Error(ctx, "Failed to marshal data")
return
}
- if err = RsrcMgr.KVStore.Put(ctx, path, Value); err != nil {
+ if err = rsrcMgr.KVStore.Put(ctx, path, Value); err != nil {
logger.Errorw(ctx, "Failed to put to kvstore", log.Fields{"path": path, "value": gemPort})
return
}
@@ -1265,15 +1024,22 @@
}
// GetGemPortFromOnuPktIn gets the gem port from onu pkt in path, path being intfid, onuid, portno, vlan id, priority bit
-func (RsrcMgr *OpenOltResourceMgr) GetGemPortFromOnuPktIn(ctx context.Context, packetInInfoKey PacketInInfoKey) (uint32, error) {
+func (rsrcMgr *OpenOltResourceMgr) GetGemPortFromOnuPktIn(ctx context.Context, packetInInfoKey PacketInInfoKey) (uint32, error) {
var Val []byte
- var gemPort uint32
path := fmt.Sprintf(OnuPacketINPath, packetInInfoKey.IntfID, packetInInfoKey.OnuID, packetInInfoKey.LogicalPort,
packetInInfoKey.VlanID, packetInInfoKey.Priority)
+ // get from cache
+ rsrcMgr.gemPortForPacketInInfoLock.RLock()
+ gemPort, ok := rsrcMgr.gemPortForPacketInInfo[path]
+ rsrcMgr.gemPortForPacketInInfoLock.RUnlock()
+ if ok {
+ logger.Debugw(ctx, "found packein gemport from path", log.Fields{"path": path, "gem": gemPort})
+ return gemPort, nil
+ }
- value, err := RsrcMgr.KVStore.Get(ctx, path)
+ value, err := rsrcMgr.KVStore.Get(ctx, path)
if err != nil {
logger.Errorw(ctx, "Failed to get from kv store", log.Fields{"path": path})
return uint32(0), err
@@ -1291,15 +1057,20 @@
return uint32(0), err
}
logger.Debugw(ctx, "found packein gemport from path", log.Fields{"path": path, "gem": gemPort})
+ // update cache
+ rsrcMgr.gemPortForPacketInInfoLock.Lock()
+ rsrcMgr.gemPortForPacketInInfo[path] = gemPort
+ rsrcMgr.gemPortForPacketInInfoLock.Unlock()
return gemPort, nil
}
//DeletePacketInGemPortForOnu deletes the packet-in gemport for ONU
-func (RsrcMgr *OpenOltResourceMgr) DeletePacketInGemPortForOnu(ctx context.Context, intfID uint32, onuID uint32, logicalPort uint32) error {
+func (rsrcMgr *OpenOltResourceMgr) DeletePacketInGemPortForOnu(ctx context.Context, intfID uint32, onuID uint32, logicalPort uint32) error {
path := fmt.Sprintf(OnuPacketINPathPrefix, intfID, onuID, logicalPort)
- value, err := RsrcMgr.KVStore.List(ctx, path)
+
+ value, err := rsrcMgr.KVStore.List(ctx, path)
if err != nil {
logger.Errorf(ctx, "failed-to-read-value-from-path-%s", path)
return errors.New("failed-to-read-value-from-path-" + path)
@@ -1308,12 +1079,16 @@
//remove them one by one
for key := range value {
// Formulate the right key path suffix ti be delete
- stringToBeReplaced := fmt.Sprintf(BasePathKvStore, RsrcMgr.KVStore.PathPrefix, RsrcMgr.DeviceID) + "/"
+ stringToBeReplaced := fmt.Sprintf(BasePathKvStore, rsrcMgr.KVStore.PathPrefix, rsrcMgr.DeviceID) + "/"
replacedWith := ""
key = strings.Replace(key, stringToBeReplaced, replacedWith, 1)
+ // update cache
+ rsrcMgr.gemPortForPacketInInfoLock.Lock()
+ delete(rsrcMgr.gemPortForPacketInInfo, key)
+ rsrcMgr.gemPortForPacketInInfoLock.Unlock()
logger.Debugf(ctx, "removing-key-%s", key)
- if err := RsrcMgr.KVStore.Delete(ctx, key); err != nil {
+ if err := rsrcMgr.KVStore.Delete(ctx, key); err != nil {
logger.Errorf(ctx, "failed-to-remove-resource-%s", key)
return err
}
@@ -1322,222 +1097,156 @@
return nil
}
-// DelOnuGemInfoForIntf deletes the onugem info from kvstore per interface
-func (RsrcMgr *OpenOltResourceMgr) DelOnuGemInfoForIntf(ctx context.Context, intfID uint32) error {
- if err := RsrcMgr.ResourceMgrs[intfID].DelOnuGemInfoForIntf(ctx, intfID); err != nil {
- logger.Errorw(ctx, "failed to delete onu gem info for", log.Fields{"intfid": intfID})
- return err
+//GetFlowIDsForGem gets the list of FlowIDs for the given gemport
+func (rsrcMgr *OpenOltResourceMgr) GetFlowIDsForGem(ctx context.Context, intf uint32, gem uint32) ([]uint64, error) {
+ path := fmt.Sprintf(FlowIDsForGem, intf, gem)
+
+ // get from cache
+ rsrcMgr.flowIDsForGemLock.RLock()
+ flowIDs, ok := rsrcMgr.flowIDsForGem[gem]
+ rsrcMgr.flowIDsForGemLock.RUnlock()
+ if ok {
+ return flowIDs, nil
}
- return nil
-}
-//GetNNIFromKVStore gets NNi intfids from kvstore. path being per device
-func (RsrcMgr *OpenOltResourceMgr) GetNNIFromKVStore(ctx context.Context) ([]uint32, error) {
-
- var nni []uint32
- var Val []byte
-
- path := NnniIntfID
- value, err := RsrcMgr.KVStore.Get(ctx, path)
+ value, err := rsrcMgr.KVStore.Get(ctx, path)
if err != nil {
- logger.Error(ctx, "failed to get data from kv store")
+ logger.Errorw(ctx, "Failed to get from kv store", log.Fields{"path": path})
+ return nil, err
+ } else if value == nil {
+ logger.Debug(ctx, "no flow-ids found", log.Fields{"path": path})
+ return nil, nil
+ }
+ Val, err := kvstore.ToByte(value.Value)
+ if err != nil {
+ logger.Error(ctx, "Failed to convert to byte array")
return nil, err
}
- if value != nil {
- if Val, err = kvstore.ToByte(value.Value); err != nil {
- logger.Error(ctx, "Failed to convert to byte array")
- return nil, err
- }
- if err = json.Unmarshal(Val, &nni); err != nil {
- logger.Error(ctx, "Failed to unmarshall")
- return nil, err
- }
- }
- return nni, err
-}
-// AddNNIToKVStore adds Nni interfaces to kvstore, path being per device.
-func (RsrcMgr *OpenOltResourceMgr) AddNNIToKVStore(ctx context.Context, nniIntf uint32) error {
- var Value []byte
-
- nni, err := RsrcMgr.GetNNIFromKVStore(ctx)
- if err != nil {
- logger.Error(ctx, "failed to fetch nni interfaces from kv store")
- return err
+ if err = json.Unmarshal(Val, &flowIDs); err != nil {
+ logger.Error(ctx, "Failed to unmarshall")
+ return nil, err
}
- path := NnniIntfID
- nni = append(nni, nniIntf)
- Value, err = json.Marshal(nni)
- if err != nil {
- logger.Error(ctx, "Failed to marshal data")
- }
- if err = RsrcMgr.KVStore.Put(ctx, path, Value); err != nil {
- logger.Errorw(ctx, "Failed to put to kvstore", log.Fields{"path": path, "value": Value})
- return err
- }
- logger.Debugw(ctx, "added nni to kv successfully", log.Fields{"path": path, "nni": nniIntf})
- return nil
-}
+ // update cache
+ rsrcMgr.flowIDsForGemLock.Lock()
+ rsrcMgr.flowIDsForGem[gem] = flowIDs
+ rsrcMgr.flowIDsForGemLock.Unlock()
-// DelNNiFromKVStore deletes nni interface list from kv store.
-func (RsrcMgr *OpenOltResourceMgr) DelNNiFromKVStore(ctx context.Context) error {
-
- path := NnniIntfID
-
- if err := RsrcMgr.KVStore.Delete(ctx, path); err != nil {
- logger.Errorw(ctx, "Failed to delete nni interfaces from kv store", log.Fields{"path": path})
- return err
- }
- return nil
+ return flowIDs, nil
}
//UpdateFlowIDsForGem updates flow id per gemport
-func (RsrcMgr *OpenOltResourceMgr) UpdateFlowIDsForGem(ctx context.Context, intf uint32, gem uint32, flowIDs []uint64) error {
+func (rsrcMgr *OpenOltResourceMgr) UpdateFlowIDsForGem(ctx context.Context, intf uint32, gem uint32, flowIDs []uint64) error {
var val []byte
- path := fmt.Sprintf(FlowIDsForGem, intf)
+ path := fmt.Sprintf(FlowIDsForGem, intf, gem)
- flowsForGem, err := RsrcMgr.GetFlowIDsGemMapForInterface(ctx, intf)
- if err != nil {
- logger.Error(ctx, "Failed to ger flowids for interface", log.Fields{"error": err, "intf": intf})
- return err
+ // update cache
+ rsrcMgr.flowIDsForGemLock.Lock()
+ rsrcMgr.flowIDsForGem[gem] = flowIDs
+ rsrcMgr.flowIDsForGemLock.Unlock()
+
+ if flowIDs == nil {
+ return nil
}
- if flowsForGem == nil {
- flowsForGem = make(map[uint32][]uint64)
- }
- flowsForGem[gem] = flowIDs
- val, err = json.Marshal(flowsForGem)
+ val, err := json.Marshal(flowIDs)
if err != nil {
- logger.Error(ctx, "Failed to marshal data", log.Fields{"error": err})
+ logger.Error(ctx, "Failed to marshal data", log.Fields{"err": err})
return err
}
- if err = RsrcMgr.KVStore.Put(ctx, path, val); err != nil {
- logger.Errorw(ctx, "Failed to put to kvstore", log.Fields{"error": err, "path": path, "value": val})
+ if err = rsrcMgr.KVStore.Put(ctx, path, val); err != nil {
+ logger.Errorw(ctx, "Failed to put to kvstore", log.Fields{"err": err, "path": path, "value": val})
return err
}
- logger.Debugw(ctx, "added flowid list for gem to kv successfully", log.Fields{"path": path, "flowidlist": flowsForGem[gem]})
+ logger.Debugw(ctx, "added flowid list for gem to kv successfully", log.Fields{"path": path, "flowidlist": flowIDs})
return nil
}
//DeleteFlowIDsForGem deletes the flowID list entry per gem from kvstore.
-func (RsrcMgr *OpenOltResourceMgr) DeleteFlowIDsForGem(ctx context.Context, intf uint32, gem uint32) {
- path := fmt.Sprintf(FlowIDsForGem, intf)
- var val []byte
-
- flowsForGem, err := RsrcMgr.GetFlowIDsGemMapForInterface(ctx, intf)
- if err != nil {
- logger.Error(ctx, "Failed to ger flowids for interface", log.Fields{"error": err, "intf": intf})
- return
+func (rsrcMgr *OpenOltResourceMgr) DeleteFlowIDsForGem(ctx context.Context, intf uint32, gem uint32) {
+ path := fmt.Sprintf(FlowIDsForGem, intf, gem)
+ // update cache
+ rsrcMgr.flowIDsForGemLock.Lock()
+ delete(rsrcMgr.flowIDsForGem, gem)
+ rsrcMgr.flowIDsForGemLock.Unlock()
+ if err := rsrcMgr.KVStore.Delete(ctx, path); err != nil {
+ logger.Errorw(ctx, "Failed to delete from kvstore", log.Fields{"err": err, "path": path})
}
- if flowsForGem == nil {
- logger.Error(ctx, "No flowids found ", log.Fields{"intf": intf, "gemport": gem})
- return
- }
- // once we get the flows per gem map from kv , just delete the gem entry from the map
- delete(flowsForGem, gem)
- // once gem entry is deleted update the kv store.
- val, err = json.Marshal(flowsForGem)
- if err != nil {
- logger.Error(ctx, "Failed to marshal data", log.Fields{"error": err})
- return
- }
-
- if err = RsrcMgr.KVStore.Put(ctx, path, val); err != nil {
- logger.Errorw(ctx, "Failed to put to kvstore", log.Fields{"error": err, "path": path, "value": val})
- }
-}
-
-//GetFlowIDsGemMapForInterface gets flowids per gemport and interface
-func (RsrcMgr *OpenOltResourceMgr) GetFlowIDsGemMapForInterface(ctx context.Context, intf uint32) (map[uint32][]uint64, error) {
- path := fmt.Sprintf(FlowIDsForGem, intf)
- var flowsForGem map[uint32][]uint64
- var val []byte
- value, err := RsrcMgr.KVStore.Get(ctx, path)
- if err != nil {
- logger.Error(ctx, "failed to get data from kv store")
- return nil, err
- }
- if value != nil && value.Value != nil {
- if val, err = kvstore.ToByte(value.Value); err != nil {
- logger.Error(ctx, "Failed to convert to byte array ", log.Fields{"error": err})
- return nil, err
- }
- if err = json.Unmarshal(val, &flowsForGem); err != nil {
- logger.Error(ctx, "Failed to unmarshall", log.Fields{"error": err})
- return nil, err
- }
- }
- return flowsForGem, nil
-}
-
-//DeleteIntfIDGempMapPath deletes the intf id path used to store flow ids per gem to kvstore.
-func (RsrcMgr *OpenOltResourceMgr) DeleteIntfIDGempMapPath(ctx context.Context, intf uint32) {
- path := fmt.Sprintf(FlowIDsForGem, intf)
-
- if err := RsrcMgr.KVStore.Delete(ctx, path); err != nil {
- logger.Errorw(ctx, "Failed to delete nni interfaces from kv store", log.Fields{"path": path})
- }
-}
-
-// RemoveResourceMap Clear resource map associated with (intfid, onuid, uniid) tuple.
-func (RsrcMgr *OpenOltResourceMgr) RemoveResourceMap(ctx context.Context, intfID uint32, onuID int32, uniID int32) {
- IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", intfID, onuID, uniID)
- RsrcMgr.ResourceMgrs[intfID].RemoveResourceMap(ctx, IntfOnuIDUniID)
}
//GetMcastQueuePerInterfaceMap gets multicast queue info per pon interface
-func (RsrcMgr *OpenOltResourceMgr) GetMcastQueuePerInterfaceMap(ctx context.Context) (map[uint32][]uint32, error) {
+func (rsrcMgr *OpenOltResourceMgr) GetMcastQueuePerInterfaceMap(ctx context.Context) (map[uint32][]uint32, error) {
path := McastQueuesForIntf
- var mcastQueueToIntfMap map[uint32][]uint32
var val []byte
- kvPair, err := RsrcMgr.KVStore.Get(ctx, path)
+ rsrcMgr.mcastQueueForIntfLock.RLock()
+ if rsrcMgr.mcastQueueForIntfLoadedFromKvStore {
+ rsrcMgr.mcastQueueForIntfLock.RUnlock()
+ return rsrcMgr.mcastQueueForIntf, nil
+ }
+ rsrcMgr.mcastQueueForIntfLock.RUnlock()
+
+ kvPair, err := rsrcMgr.KVStore.Get(ctx, path)
if err != nil {
logger.Error(ctx, "failed to get data from kv store")
return nil, err
}
if kvPair != nil && kvPair.Value != nil {
if val, err = kvstore.ToByte(kvPair.Value); err != nil {
- logger.Error(ctx, "Failed to convert to byte array ", log.Fields{"error": err})
+ logger.Error(ctx, "Failed to convert to byte array ", log.Fields{"err": err})
return nil, err
}
- if err = json.Unmarshal(val, &mcastQueueToIntfMap); err != nil {
- logger.Error(ctx, "Failed to unmarshall ", log.Fields{"error": err})
+ rsrcMgr.mcastQueueForIntfLock.Lock()
+ defer rsrcMgr.mcastQueueForIntfLock.Unlock()
+ if err = json.Unmarshal(val, &rsrcMgr.mcastQueueForIntf); err != nil {
+ logger.Error(ctx, "Failed to unmarshall ", log.Fields{"err": err})
return nil, err
}
+ rsrcMgr.mcastQueueForIntfLoadedFromKvStore = true
}
- return mcastQueueToIntfMap, nil
+ return rsrcMgr.mcastQueueForIntf, nil
}
//AddMcastQueueForIntf adds multicast queue for pon interface
-func (RsrcMgr *OpenOltResourceMgr) AddMcastQueueForIntf(ctx context.Context, intf uint32, gem uint32, servicePriority uint32) error {
+func (rsrcMgr *OpenOltResourceMgr) AddMcastQueueForIntf(ctx context.Context, intf uint32, gem uint32, servicePriority uint32) error {
var val []byte
path := McastQueuesForIntf
- mcastQueues, err := RsrcMgr.GetMcastQueuePerInterfaceMap(ctx)
+ // Load local cache from kv store the first time
+ rsrcMgr.mcastQueueForIntfLock.RLock()
+ if !rsrcMgr.mcastQueueForIntfLoadedFromKvStore {
+ rsrcMgr.mcastQueueForIntfLock.RUnlock()
+ _, err := rsrcMgr.GetMcastQueuePerInterfaceMap(ctx)
+ if err != nil {
+ logger.Errorw(ctx, "Failed to get multicast queue info for interface", log.Fields{"err": err, "intf": intf})
+ return err
+ }
+ } else {
+ rsrcMgr.mcastQueueForIntfLock.RUnlock()
+ }
+
+ // Update KV store
+ rsrcMgr.mcastQueueForIntfLock.Lock()
+ rsrcMgr.mcastQueueForIntf[intf] = []uint32{gem, servicePriority}
+ val, err := json.Marshal(rsrcMgr.mcastQueueForIntf)
if err != nil {
- logger.Errorw(ctx, "Failed to get multicast queue info for interface", log.Fields{"error": err, "intf": intf})
+ rsrcMgr.mcastQueueForIntfLock.Unlock()
+ logger.Errorw(ctx, "Failed to marshal data", log.Fields{"err": err})
return err
}
- if mcastQueues == nil {
- mcastQueues = make(map[uint32][]uint32)
- }
- mcastQueues[intf] = []uint32{gem, servicePriority}
- if val, err = json.Marshal(mcastQueues); err != nil {
- logger.Errorw(ctx, "Failed to marshal data", log.Fields{"error": err})
+ rsrcMgr.mcastQueueForIntfLock.Unlock()
+
+ if err = rsrcMgr.KVStore.Put(ctx, path, val); err != nil {
+ logger.Errorw(ctx, "Failed to put to kvstore", log.Fields{"err": err, "path": path, "value": val})
return err
}
- if err = RsrcMgr.KVStore.Put(ctx, path, val); err != nil {
- logger.Errorw(ctx, "Failed to put to kvstore", log.Fields{"error": err, "path": path, "value": val})
- return err
- }
- logger.Debugw(ctx, "added multicast queue info to KV store successfully", log.Fields{"path": path, "mcastQueueInfo": mcastQueues[intf], "interfaceId": intf})
+ logger.Debugw(ctx, "added multicast queue info to KV store successfully", log.Fields{"path": path, "interfaceId": intf, "gem": gem, "svcPrior": servicePriority})
return nil
}
//AddFlowGroupToKVStore adds flow group into KV store
-func (RsrcMgr *OpenOltResourceMgr) AddFlowGroupToKVStore(ctx context.Context, groupEntry *ofp.OfpGroupEntry, cached bool) error {
+func (rsrcMgr *OpenOltResourceMgr) AddFlowGroupToKVStore(ctx context.Context, groupEntry *ofp.OfpGroupEntry, cached bool) error {
var Value []byte
var err error
var path string
@@ -1560,6 +1269,10 @@
OutPorts: outPorts,
}
+ rsrcMgr.groupInfoLock.Lock()
+ rsrcMgr.groupInfo[path] = &groupInfo
+ rsrcMgr.groupInfoLock.Unlock()
+
Value, err = json.Marshal(groupInfo)
if err != nil {
@@ -1567,7 +1280,7 @@
return err
}
- if err = RsrcMgr.KVStore.Put(ctx, path, Value); err != nil {
+ if err = rsrcMgr.KVStore.Put(ctx, path, Value); err != nil {
logger.Errorf(ctx, "Failed to update resource %s", path)
return err
}
@@ -1575,14 +1288,18 @@
}
//RemoveFlowGroupFromKVStore removes flow group from KV store
-func (RsrcMgr *OpenOltResourceMgr) RemoveFlowGroupFromKVStore(ctx context.Context, groupID uint32, cached bool) error {
+func (rsrcMgr *OpenOltResourceMgr) RemoveFlowGroupFromKVStore(ctx context.Context, groupID uint32, cached bool) error {
var path string
if cached {
path = fmt.Sprintf(FlowGroupCached, groupID)
} else {
path = fmt.Sprintf(FlowGroup, groupID)
}
- if err := RsrcMgr.KVStore.Delete(ctx, path); err != nil {
+ rsrcMgr.groupInfoLock.Lock()
+ delete(rsrcMgr.groupInfo, path)
+ rsrcMgr.groupInfoLock.Unlock()
+
+ if err := rsrcMgr.KVStore.Delete(ctx, path); err != nil {
logger.Errorf(ctx, "Failed to remove resource %s due to %s", path, err)
return err
}
@@ -1592,7 +1309,7 @@
//GetFlowGroupFromKVStore fetches flow group from the KV store. Returns (false, {} error) if any problem occurs during
//fetching the data. Returns (true, groupInfo, nil) if the group is fetched successfully.
// Returns (false, {}, nil) if the group does not exists in the KV store.
-func (RsrcMgr *OpenOltResourceMgr) GetFlowGroupFromKVStore(ctx context.Context, groupID uint32, cached bool) (bool, GroupInfo, error) {
+func (rsrcMgr *OpenOltResourceMgr) GetFlowGroupFromKVStore(ctx context.Context, groupID uint32, cached bool) (bool, GroupInfo, error) {
var groupInfo GroupInfo
var path string
if cached {
@@ -1600,20 +1317,34 @@
} else {
path = fmt.Sprintf(FlowGroup, groupID)
}
- kvPair, err := RsrcMgr.KVStore.Get(ctx, path)
+
+ // read from cache
+ rsrcMgr.groupInfoLock.RLock()
+ gi, ok := rsrcMgr.groupInfo[path]
+ rsrcMgr.groupInfoLock.RUnlock()
+ if ok {
+ return true, *gi, nil
+ }
+
+ kvPair, err := rsrcMgr.KVStore.Get(ctx, path)
if err != nil {
return false, groupInfo, err
}
if kvPair != nil && kvPair.Value != nil {
Val, err := kvstore.ToByte(kvPair.Value)
if err != nil {
- logger.Errorw(ctx, "Failed to convert flow group into byte array", log.Fields{"error": err})
+ logger.Errorw(ctx, "Failed to convert flow group into byte array", log.Fields{"err": err})
return false, groupInfo, err
}
if err = json.Unmarshal(Val, &groupInfo); err != nil {
- logger.Errorw(ctx, "Failed to unmarshal", log.Fields{"error": err})
+ logger.Errorw(ctx, "Failed to unmarshal", log.Fields{"err": err})
return false, groupInfo, err
}
+ // update cache
+ rsrcMgr.groupInfoLock.Lock()
+ rsrcMgr.groupInfo[path] = &groupInfo
+ rsrcMgr.groupInfoLock.Unlock()
+
return true, groupInfo, nil
}
return false, groupInfo, nil
@@ -1631,19 +1362,3 @@
return nil, fmt.Errorf("unexpected-type-%T", t)
}
}
-
-func checkForFlowIDInList(FlowIDList []uint64, FlowID uint64) (bool, uint64) {
- /*
- Check for a flow id in a given list of flow IDs.
- :param FLowIDList: List of Flow IDs
- :param FlowID: Flowd to check in the list
- : return true and the index if present false otherwise.
- */
-
- for idx := range FlowIDList {
- if FlowID == FlowIDList[idx] {
- return true, uint64(idx)
- }
- }
- return false, 0
-}
diff --git a/internal/pkg/resourcemanager/resourcemanager_test.go b/internal/pkg/resourcemanager/resourcemanager_test.go
index 53f8898..443f418 100644
--- a/internal/pkg/resourcemanager/resourcemanager_test.go
+++ b/internal/pkg/resourcemanager/resourcemanager_test.go
@@ -27,19 +27,18 @@
"context"
"encoding/json"
"errors"
- tp "github.com/opencord/voltha-lib-go/v4/pkg/techprofile"
+ "github.com/opencord/voltha-openolt-adapter/pkg/mocks"
"reflect"
"strconv"
"strings"
- "sync"
"testing"
"time"
- "github.com/opencord/voltha-lib-go/v4/pkg/db"
- "github.com/opencord/voltha-lib-go/v4/pkg/db/kvstore"
- fu "github.com/opencord/voltha-lib-go/v4/pkg/flows"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
- ponrmgr "github.com/opencord/voltha-lib-go/v4/pkg/ponresourcemanager"
+ "github.com/opencord/voltha-lib-go/v5/pkg/db"
+ "github.com/opencord/voltha-lib-go/v5/pkg/db/kvstore"
+ fu "github.com/opencord/voltha-lib-go/v5/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
+ ponrmgr "github.com/opencord/voltha-lib-go/v5/pkg/ponresourcemanager"
ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
"github.com/opencord/voltha-protos/v4/go/openolt"
)
@@ -77,7 +76,7 @@
KVStore *db.Backend
DeviceType string
DevInfo *openolt.DeviceInfo
- ResourceMgrs map[uint32]*ponrmgr.PONResourceManager
+ PonRsrMgr *ponrmgr.PONResourceManager
NumOfPonPorts uint32
}
@@ -87,12 +86,11 @@
// getResMgr mocks OpenOltResourceMgr struct.
func getResMgr() *fields {
- ctx := context.TODO()
var resMgr fields
resMgr.KVStore = &db.Backend{
Client: &MockResKVClient{},
}
- resMgr.ResourceMgrs = make(map[uint32]*ponrmgr.PONResourceManager)
+ resMgr.PonRsrMgr = &ponrmgr.PONResourceManager{}
ranges := make(map[string]interface{})
sharedIdxByType := make(map[string]string)
sharedIdxByType["ALLOC_ID"] = "ALLOC_ID"
@@ -108,25 +106,22 @@
ranges["gemport_id_shared"] = uint32(0)
ranges["flow_id_shared"] = uint32(0)
resMgr.NumOfPonPorts = 16
- ponMgr := &ponrmgr.PONResourceManager{}
- tpMgr, err := tp.NewTechProfile(ctx, ponMgr, "etcd", "127.0.0.1", "/")
- if err != nil {
- logger.Fatal(ctx, err.Error())
- }
-
- ponMgr.DeviceID = "onu-1"
- ponMgr.IntfIDs = []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}
- ponMgr.KVStore = &db.Backend{
+ resMgr.PonRsrMgr.DeviceID = "onu-1"
+ resMgr.PonRsrMgr.IntfIDs = []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}
+ resMgr.PonRsrMgr.KVStore = &db.Backend{
Client: &MockResKVClient{},
}
- ponMgr.PonResourceRanges = ranges
- ponMgr.SharedIdxByType = sharedIdxByType
- ponMgr.TechProfileMgr = tpMgr
+ resMgr.PonRsrMgr.Technology = "XGS-PON"
+ resMgr.PonRsrMgr.PonResourceRanges = ranges
+ resMgr.PonRsrMgr.SharedIdxByType = sharedIdxByType
+ /*
+ tpMgr, err := tp.NewTechProfile(ctx, resMgr.PonRsrMgr, "etcd", "127.0.0.1", "/")
+ if err != nil {
+ logger.Fatal(ctx, err.Error())
+ }
+ */
+ resMgr.PonRsrMgr.TechProfileMgr = &mocks.MockTechProfile{TpID: 64}
- var ponIntf uint32
- for ponIntf = 0; ponIntf < resMgr.NumOfPonPorts; ponIntf++ {
- resMgr.ResourceMgrs[ponIntf] = ponMgr
- }
return &resMgr
}
@@ -265,18 +260,15 @@
// testResMgrObject maps fields type to OpenOltResourceMgr type.
func testResMgrObject(testResMgr *fields) *OpenOltResourceMgr {
var rsrMgr = OpenOltResourceMgr{
- DeviceID: testResMgr.DeviceID,
- Args: testResMgr.Args,
- KVStore: testResMgr.KVStore,
- DeviceType: testResMgr.DeviceType,
- Address: testResMgr.Address,
- DevInfo: testResMgr.DevInfo,
- ResourceMgrs: testResMgr.ResourceMgrs,
+ DeviceID: testResMgr.DeviceID,
+ Args: testResMgr.Args,
+ KVStore: testResMgr.KVStore,
+ DeviceType: testResMgr.DeviceType,
+ Address: testResMgr.Address,
+ DevInfo: testResMgr.DevInfo,
+ PonRsrMgr: testResMgr.PonRsrMgr,
}
-
- rsrMgr.AllocIDMgmtLock = make([]sync.RWMutex, testResMgr.NumOfPonPorts)
- rsrMgr.GemPortIDMgmtLock = make([]sync.RWMutex, testResMgr.NumOfPonPorts)
- rsrMgr.OnuIDMgmtLock = make([]sync.RWMutex, testResMgr.NumOfPonPorts)
+ rsrMgr.InitLocalCache()
return &rsrMgr
}
@@ -284,6 +276,7 @@
func TestNewResourceMgr(t *testing.T) {
type args struct {
deviceID string
+ intfID uint32
KVStoreAddress string
kvStoreType string
deviceType string
@@ -295,14 +288,14 @@
args args
want *OpenOltResourceMgr
}{
- {"NewResourceMgr-2", args{"olt1", "1:2", "etcd",
+ {"NewResourceMgr-2", args{"olt1", 0, "1:2", "etcd",
"onu", &openolt.DeviceInfo{OnuIdStart: 1, OnuIdEnd: 1}, "service/voltha"}, &OpenOltResourceMgr{}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
- if got := NewResourceMgr(ctx, tt.args.deviceID, tt.args.KVStoreAddress, tt.args.kvStoreType, tt.args.deviceType, tt.args.devInfo, tt.args.kvStorePrefix); reflect.TypeOf(got) != reflect.TypeOf(tt.want) {
+ if got := NewResourceMgr(ctx, tt.args.intfID, tt.args.deviceID, tt.args.KVStoreAddress, tt.args.kvStoreType, tt.args.deviceType, tt.args.devInfo, tt.args.kvStorePrefix); reflect.TypeOf(got) != reflect.TypeOf(tt.want) {
t.Errorf("NewResourceMgr() = %v, want %v", got, tt.want)
}
})
@@ -310,19 +303,23 @@
}
func TestOpenOltResourceMgr_Delete(t *testing.T) {
+ type args struct {
+ intfID uint32
+ }
tests := []struct {
name string
fields *fields
wantErr error
+ args args
}{
- {"Delete-1", getResMgr(), errors.New("failed to clear device resource pool")},
+ {"Delete-1", getResMgr(), errors.New("failed to clear device resource pool"), args{intfID: 0}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
RsrcMgr := testResMgrObject(tt.fields)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
- if err := RsrcMgr.Delete(ctx); (err != nil) && reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr) {
+ if err := RsrcMgr.Delete(ctx, tt.args.intfID); (err != nil) && reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr) {
t.Errorf("Delete() error = %v, wantErr %v", err, tt.wantErr)
}
})
@@ -374,33 +371,6 @@
}
}
-func TestOpenOltResourceMgr_GetAllocID(t *testing.T) {
-
- type args struct {
- intfID uint32
- onuID uint32
- uniID uint32
- }
- tests := []struct {
- name string
- fields *fields
- args args
- want uint32
- }{
- {"GetAllocID-1", getResMgr(), args{1, 2, 2}, 0},
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- RsrcMgr := testResMgrObject(tt.fields)
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- if got := RsrcMgr.GetAllocID(ctx, tt.args.intfID, tt.args.onuID, tt.args.uniID); reflect.TypeOf(got) != reflect.TypeOf(tt.want) {
- t.Errorf("GetAllocID() = %v, want %v", got, tt.want)
- }
- })
- }
-}
-
func TestOpenOltResourceMgr_GetCurrentAllocIDForOnu(t *testing.T) {
type args struct {
intfID uint32
@@ -420,8 +390,16 @@
RsrcMgr := testResMgrObject(tt.fields)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
- if got := RsrcMgr.GetCurrentAllocIDsForOnu(ctx, tt.args.intfID, tt.args.onuID, tt.args.uniID); !reflect.DeepEqual(got, tt.want) {
+ got := RsrcMgr.GetCurrentAllocIDsForOnu(ctx, tt.args.intfID, tt.args.onuID, tt.args.uniID)
+ if len(got) != len(tt.want) {
t.Errorf("GetCurrentAllocIDsForOnu() = %v, want %v", got, tt.want)
+ } else {
+ for i := range tt.want {
+ if got[i] != tt.want[i] {
+ t.Errorf("GetCurrentAllocIDsForOnu() = %v, want %v", got, tt.want)
+ break
+ }
+ }
}
})
}
@@ -484,40 +462,6 @@
}
}
-func TestOpenOltResourceMgr_GetGEMPortID(t *testing.T) {
- type args struct {
- ponPort uint32
- onuID uint32
- uniID uint32
- NumOfPorts uint32
- }
- tests := []struct {
- name string
- fields *fields
- args args
- want []uint32
- wantErr error
- }{
- {"GetGEMPortID-1", getResMgr(), args{1, 2, 2, 2}, []uint32{},
- errors.New("failed to get gem port")},
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- RsrcMgr := testResMgrObject(tt.fields)
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- got, err := RsrcMgr.GetGEMPortID(ctx, tt.args.ponPort, tt.args.onuID, tt.args.uniID, tt.args.NumOfPorts)
- if reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr) && err != nil {
- t.Errorf("GetGEMPortID() error = %v, wantErr %v", err, tt.wantErr)
- return
- }
- if reflect.TypeOf(got) != reflect.TypeOf(tt.want) {
- t.Errorf("GetGEMPortID() got = %v, want %v", got, tt.want)
- }
- })
- }
-}
-
func TestOpenOltResourceMgr_GetMeterInfoForOnu(t *testing.T) {
type args struct {
Direction string
@@ -693,34 +637,6 @@
}
}
-func TestOpenOltResourceMgr_UpdateFlowIDInfo(t *testing.T) {
- type args struct {
- ponIntfID int32
- onuID int32
- uniID int32
- flowID uint64
- flowData FlowInfo
- }
- tests := []struct {
- name string
- fields *fields
- args args
- wantErr error
- }{
- {"UpdateFlowIDInfo-1", getResMgr(), args{1, 2, 2, 2, FlowInfo{}}, errors.New("")},
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- RsrcMgr := testResMgrObject(tt.fields)
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- if err := RsrcMgr.UpdateFlowIDInfo(ctx, uint32(tt.args.ponIntfID), tt.args.onuID, tt.args.uniID, tt.args.flowID, tt.args.flowData); err != nil && reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr) {
- t.Errorf("UpdateFlowIDInfo() error = %v, wantErr %v", err, tt.wantErr)
- }
- })
- }
-}
-
func TestOpenOltResourceMgr_UpdateGEMPortIDsForOnu(t *testing.T) {
type args struct {
@@ -750,35 +666,6 @@
}
}
-func TestOpenOltResourceMgr_UpdateGEMportsPonportToOnuMapOnKVStore(t *testing.T) {
- type args struct {
- gemPorts []uint32
- PonPort uint32
- onuID uint32
- uniID uint32
- }
- tests := []struct {
- name string
- fields *fields
- args args
- wantErr error
- }{
- {"UpdateGEMportsPonportToOnuMapOnKVStore-1", getResMgr(), args{[]uint32{1, 2},
- 1, 2, 2}, errors.New("failed to update resource")},
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- RsrcMgr := testResMgrObject(tt.fields)
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- if err := RsrcMgr.UpdateGEMportsPonportToOnuMapOnKVStore(ctx, tt.args.gemPorts, tt.args.PonPort,
- tt.args.onuID, tt.args.uniID); err != nil && reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr) {
- t.Errorf("UpdateGEMportsPonportToOnuMapOnKVStore() error = %v, wantErr %v", err, tt.wantErr)
- }
- })
- }
-}
-
func TestOpenOltResourceMgr_UpdateMeterIDForOnu(t *testing.T) {
type args struct {
Direction string