VOL-4077: Improve storage usage on etcd
- Do away with unnecessary data storage on etcd if it can be
reconciled on adapter restart
- For data that needs storage, use lesser footprint if possible
- Use write-through-cache for all data stored on etcd via
resource manager module
- Use ResourceManager module per interface to localize lock
contention per PON port
Change-Id: I21d38216fab195d738a446b3f96a00251569e38b
diff --git a/internal/pkg/core/common.go b/internal/pkg/core/common.go
index f959b7f..43f0047 100644
--- a/internal/pkg/core/common.go
+++ b/internal/pkg/core/common.go
@@ -18,7 +18,7 @@
package core
import (
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
)
var logger log.CLogger
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 7f2b53e..4243a26 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -35,12 +35,12 @@
"github.com/golang/protobuf/ptypes"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
- "github.com/opencord/voltha-lib-go/v4/pkg/adapters/adapterif"
- "github.com/opencord/voltha-lib-go/v4/pkg/config"
- "github.com/opencord/voltha-lib-go/v4/pkg/events/eventif"
- flow_utils "github.com/opencord/voltha-lib-go/v4/pkg/flows"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
- "github.com/opencord/voltha-lib-go/v4/pkg/pmmetrics"
+ "github.com/opencord/voltha-lib-go/v5/pkg/adapters/adapterif"
+ "github.com/opencord/voltha-lib-go/v5/pkg/config"
+ "github.com/opencord/voltha-lib-go/v5/pkg/events/eventif"
+ flow_utils "github.com/opencord/voltha-lib-go/v5/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/pmmetrics"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
rsrcMgr "github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager"
@@ -82,7 +82,9 @@
flowMgr []*OpenOltFlowMgr
groupMgr *OpenOltGroupMgr
eventMgr *OpenOltEventMgr
- resourceMgr *rsrcMgr.OpenOltResourceMgr
+ resourceMgr []*rsrcMgr.OpenOltResourceMgr
+
+ deviceInfo *oop.DeviceInfo
discOnus sync.Map
onus sync.Map
@@ -528,9 +530,6 @@
_ = olterrors.NewErrAdapter("handle-indication-error", log.Fields{"type": "interface-oper-nni", "device-id": dh.device.Id}, err).Log()
}
}()
- if err := dh.resourceMgr.AddNNIToKVStore(ctx, intfOperInd.GetIntfId()); err != nil {
- logger.Warn(ctx, err)
- }
} else if intfOperInd.GetType() == "pon" {
// TODO: Check what needs to be handled here for When PON PORT down, ONU will be down
// Handle pon port update
@@ -826,29 +825,35 @@
}
func (dh *DeviceHandler) initializeDeviceHandlerModules(ctx context.Context) error {
- deviceInfo, err := dh.populateDeviceInfo(ctx)
+ var err error
+ dh.deviceInfo, err = dh.populateDeviceInfo(ctx)
if err != nil {
return olterrors.NewErrAdapter("populate-device-info-failed", log.Fields{"device-id": dh.device.Id}, err)
}
- dh.totalPonPorts = deviceInfo.GetPonPorts()
- dh.agentPreviouslyConnected = deviceInfo.PreviouslyConnected
+ dh.totalPonPorts = dh.deviceInfo.GetPonPorts()
+ dh.agentPreviouslyConnected = dh.deviceInfo.PreviouslyConnected
- // Instantiate resource manager
- if dh.resourceMgr = rsrcMgr.NewResourceMgr(ctx, dh.device.Id, dh.openOLT.KVStoreAddress, dh.openOLT.KVStoreType, dh.device.Type, deviceInfo, dh.cm.Backend.PathPrefix); dh.resourceMgr == nil {
- return olterrors.ErrResourceManagerInstantiating
- }
-
- dh.groupMgr = NewGroupManager(ctx, dh, dh.resourceMgr)
-
+ dh.resourceMgr = make([]*rsrcMgr.OpenOltResourceMgr, dh.totalPonPorts)
dh.flowMgr = make([]*OpenOltFlowMgr, dh.totalPonPorts)
- for i := range dh.flowMgr {
- // Instantiate flow manager
- if dh.flowMgr[i] = NewFlowManager(ctx, dh, dh.resourceMgr, dh.groupMgr, uint32(i)); dh.flowMgr[i] == nil {
+ var i uint32
+ for i = 0; i < dh.totalPonPorts; i++ {
+ // Instantiate resource manager
+ if dh.resourceMgr[i] = rsrcMgr.NewResourceMgr(ctx, i, dh.device.Id, dh.openOLT.KVStoreAddress, dh.openOLT.KVStoreType, dh.device.Type, dh.deviceInfo, dh.cm.Backend.PathPrefix); dh.resourceMgr[i] == nil {
return olterrors.ErrResourceManagerInstantiating
}
}
-
+ // GroupManager instance is per OLT. But it needs a reference to any instance of resourceMgr to interface with
+ // the KV store to manage mcast group data. Provide the first instance (0th index)
+ if dh.groupMgr = NewGroupManager(ctx, dh, dh.resourceMgr[0]); dh.groupMgr == nil {
+ return olterrors.ErrGroupManagerInstantiating
+ }
+ for i = 0; i < dh.totalPonPorts; i++ {
+ // Instantiate flow manager
+ if dh.flowMgr[i] = NewFlowManager(ctx, dh, dh.resourceMgr[i], dh.groupMgr, i); dh.flowMgr[i] == nil {
+ return olterrors.ErrFlowManagerInstantiating
+ }
+ }
/* TODO: Instantiate Alarm , stats , BW managers */
/* Instantiating Event Manager to handle Alarms and KPIs */
dh.eventMgr = NewEventMgr(dh.EventProxy, dh)
@@ -913,7 +918,7 @@
ports, err := dh.coreProxy.ListDevicePorts(log.WithSpanFromContext(context.Background(), ctx), dh.device.Id)
if err != nil {
- logger.Warnw(ctx, "failed-to-list-ports", log.Fields{"device-id": dh.device.Id, "error": err})
+ logger.Warnw(ctx, "failed-to-list-ports", log.Fields{"device-id": dh.device.Id, "err": err})
continue
}
for _, port := range ports {
@@ -935,10 +940,9 @@
}
logger.Debugw(ctx, "publish-pon-metrics", log.Fields{"pon-port": port.Label})
- //ONU & Gem Stats
- onuGemInfo := dh.flowMgr[intfID].onuGemInfo
- if len(onuGemInfo) != 0 {
- go dh.portStats.collectOnuAndGemStats(ctx, onuGemInfo)
+ onuGemInfoLst := dh.flowMgr[intfID].getOnuGemInfoList()
+ if len(onuGemInfoLst) > 0 {
+ go dh.portStats.collectOnuAndGemStats(ctx, onuGemInfoLst)
}
}
}
@@ -978,6 +982,15 @@
}, nil
}
+// GetInterAdapterTechProfileDownloadMessage fetches the TechProfileDownloadMessage for the caller.
+func (dh *DeviceHandler) GetInterAdapterTechProfileDownloadMessage(ctx context.Context, tpPath string, ponPortNum uint32, onuID uint32, uniID uint32) *ic.InterAdapterTechProfileDownloadMessage {
+ ifID, err := IntfIDFromPonPortNum(ctx, ponPortNum)
+ if err != nil {
+ return nil
+ }
+ return dh.flowMgr[ifID].getTechProfileDownloadMessage(ctx, tpPath, ifID, onuID, uniID)
+}
+
func (dh *DeviceHandler) omciIndication(ctx context.Context, omciInd *oop.OmciIndication) error {
logger.Debugw(ctx, "omci-indication", log.Fields{"intf-id": omciInd.IntfId, "onu-id": omciInd.OnuId, "device-id": dh.device.Id})
var deviceType string
@@ -1038,44 +1051,47 @@
func (dh *DeviceHandler) ProcessInterAdapterMessage(ctx context.Context, msg *ic.InterAdapterMessage) error {
logger.Debugw(ctx, "process-inter-adapter-message", log.Fields{"msgID": msg.Header.Id})
if msg.Header.Type == ic.InterAdapterMessageType_OMCI_REQUEST {
- msgID := msg.Header.Id
- fromTopic := msg.Header.FromTopic
- toTopic := msg.Header.ToTopic
- toDeviceID := msg.Header.ToDeviceId
- proxyDeviceID := msg.Header.ProxyDeviceId
+ return dh.handleInterAdapterOmciMsg(ctx, msg)
+ }
+ return olterrors.NewErrInvalidValue(log.Fields{"inter-adapter-message-type": msg.Header.Type}, nil)
+}
- logger.Debugw(ctx, "omci-request-message-header", log.Fields{"msgID": msgID, "fromTopic": fromTopic, "toTopic": toTopic, "toDeviceID": toDeviceID, "proxyDeviceID": proxyDeviceID})
+func (dh *DeviceHandler) handleInterAdapterOmciMsg(ctx context.Context, msg *ic.InterAdapterMessage) error {
+ msgID := msg.Header.Id
+ fromTopic := msg.Header.FromTopic
+ toTopic := msg.Header.ToTopic
+ toDeviceID := msg.Header.ToDeviceId
+ proxyDeviceID := msg.Header.ProxyDeviceId
- msgBody := msg.GetBody()
+ logger.Debugw(ctx, "omci-request-message-header", log.Fields{"msgID": msgID, "fromTopic": fromTopic, "toTopic": toTopic, "toDeviceID": toDeviceID, "proxyDeviceID": proxyDeviceID})
- omciMsg := &ic.InterAdapterOmciMessage{}
- if err := ptypes.UnmarshalAny(msgBody, omciMsg); err != nil {
- return olterrors.NewErrAdapter("cannot-unmarshal-omci-msg-body", log.Fields{"msgbody": msgBody}, err)
+ msgBody := msg.GetBody()
+
+ omciMsg := &ic.InterAdapterOmciMessage{}
+ if err := ptypes.UnmarshalAny(msgBody, omciMsg); err != nil {
+ return olterrors.NewErrAdapter("cannot-unmarshal-omci-msg-body", log.Fields{"msgbody": msgBody}, err)
+ }
+
+ if omciMsg.GetProxyAddress() == nil {
+ onuDevice, err := dh.coreProxy.GetDevice(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, toDeviceID)
+ if err != nil {
+ return olterrors.NewErrNotFound("onu", log.Fields{
+ "device-id": dh.device.Id,
+ "onu-device-id": toDeviceID}, err)
}
-
- if omciMsg.GetProxyAddress() == nil {
- onuDevice, err := dh.coreProxy.GetDevice(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, toDeviceID)
- if err != nil {
- return olterrors.NewErrNotFound("onu", log.Fields{
- "device-id": dh.device.Id,
- "onu-device-id": toDeviceID}, err)
- }
- logger.Debugw(ctx, "device-retrieved-from-core", log.Fields{"msgID": msgID, "fromTopic": fromTopic, "toTopic": toTopic, "toDeviceID": toDeviceID, "proxyDeviceID": proxyDeviceID})
- if err := dh.sendProxiedMessage(ctx, onuDevice, omciMsg); err != nil {
- return olterrors.NewErrCommunication("send-failed", log.Fields{
- "device-id": dh.device.Id,
- "onu-device-id": toDeviceID}, err)
- }
- } else {
- logger.Debugw(ctx, "proxy-address-found-in-omci-message", log.Fields{"msgID": msgID, "fromTopic": fromTopic, "toTopic": toTopic, "toDeviceID": toDeviceID, "proxyDeviceID": proxyDeviceID})
- if err := dh.sendProxiedMessage(ctx, nil, omciMsg); err != nil {
- return olterrors.NewErrCommunication("send-failed", log.Fields{
- "device-id": dh.device.Id,
- "onu-device-id": toDeviceID}, err)
- }
+ logger.Debugw(ctx, "device-retrieved-from-core", log.Fields{"msgID": msgID, "fromTopic": fromTopic, "toTopic": toTopic, "toDeviceID": toDeviceID, "proxyDeviceID": proxyDeviceID})
+ if err := dh.sendProxiedMessage(ctx, onuDevice, omciMsg); err != nil {
+ return olterrors.NewErrCommunication("send-failed", log.Fields{
+ "device-id": dh.device.Id,
+ "onu-device-id": toDeviceID}, err)
}
} else {
- return olterrors.NewErrInvalidValue(log.Fields{"inter-adapter-message-type": msg.Header.Type}, nil)
+ logger.Debugw(ctx, "proxy-address-found-in-omci-message", log.Fields{"msgID": msgID, "fromTopic": fromTopic, "toTopic": toTopic, "toDeviceID": toDeviceID, "proxyDeviceID": proxyDeviceID})
+ if err := dh.sendProxiedMessage(ctx, nil, omciMsg); err != nil {
+ return olterrors.NewErrCommunication("send-failed", log.Fields{
+ "device-id": dh.device.Id,
+ "onu-device-id": toDeviceID}, err)
+ }
}
return nil
}
@@ -1129,7 +1145,6 @@
if err := dh.flowMgr[intfID].UpdateOnuInfo(ctx, intfID, uint32(onuID), serialNumber); err != nil {
return olterrors.NewErrAdapter("onu-activate-failed", log.Fields{"onu": onuID, "intf-id": intfID}, err)
}
- // TODO: need resource manager
var pir uint32 = 1000000
Onu := oop.Onu{IntfId: intfID, OnuId: uint32(onuID), SerialNumber: serialNum, Pir: pir, OmccEncryption: dh.openOLT.config.OmccEncryption}
if _, err := dh.Client.ActivateOnu(ctx, &Onu); err != nil {
@@ -1182,7 +1197,7 @@
alarmInd.LosStatus = statusCheckOff
go func() {
if err := dh.eventMgr.onuAlarmIndication(ctx, &alarmInd, onuInCache.(*OnuDevice).deviceID, raisedTs); err != nil {
- logger.Debugw(ctx, "indication-failed", log.Fields{"error": err})
+ logger.Debugw(ctx, "indication-failed", log.Fields{"err": err})
}
}()
}
@@ -1220,7 +1235,7 @@
logger.Debugw(ctx, "creating-new-onu", log.Fields{"sn": sn})
// we need to create a new ChildDevice
ponintfid := onuDiscInd.GetIntfId()
- onuID, err = dh.resourceMgr.GetONUID(ctx, ponintfid)
+ onuID, err = dh.resourceMgr[ponintfid].GetONUID(ctx, ponintfid)
logger.Infow(ctx, "creating-new-onu-got-onu-id", log.Fields{"sn": sn, "onuId": onuID})
@@ -1236,13 +1251,13 @@
if onuDevice, err = dh.coreProxy.ChildDeviceDetected(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, int(parentPortNo),
"", int(channelID), string(onuDiscInd.SerialNumber.GetVendorId()), sn, int64(onuID)); err != nil {
dh.discOnus.Delete(sn)
- dh.resourceMgr.FreeonuID(ctx, ponintfid, []uint32{onuID}) // NOTE I'm not sure this method is actually cleaning up the right thing
+ dh.resourceMgr[ponintfid].FreeonuID(ctx, ponintfid, []uint32{onuID}) // NOTE I'm not sure this method is actually cleaning up the right thing
return olterrors.NewErrAdapter("core-proxy-child-device-detected-failed", log.Fields{
"pon-intf-id": ponintfid,
"serial-number": sn}, err)
}
if err := dh.eventMgr.OnuDiscoveryIndication(ctx, onuDiscInd, dh.device.Id, onuDevice.Id, onuID, sn, time.Now().Unix()); err != nil {
- logger.Warnw(ctx, "discovery-indication-failed", log.Fields{"error": err})
+ logger.Warnw(ctx, "discovery-indication-failed", log.Fields{"err": err})
}
logger.Infow(ctx, "onu-child-device-added",
log.Fields{"onuDevice": onuDevice,
@@ -1341,7 +1356,7 @@
}
if onuInd.OperState == "down" && onuInd.FailReason != oop.OnuIndication_ONU_ACTIVATION_FAIL_REASON_NONE {
if err := dh.eventMgr.onuActivationIndication(ctx, onuActivationFailEvent, onuInd, dh.device.Id, time.Now().Unix()); err != nil {
- logger.Warnw(ctx, "onu-activation-indication-reporting-failed", log.Fields{"error": err})
+ logger.Warnw(ctx, "onu-activation-indication-reporting-failed", log.Fields{"err": err})
}
}
if err := dh.updateOnuStates(ctx, onuDevice, onuInd); err != nil {
@@ -1614,7 +1629,7 @@
//get the child device for the parent device
onuDevices, err := dh.coreProxy.GetChildDevices(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id)
if err != nil {
- logger.Errorw(ctx, "failed-to-get-child-devices-information", log.Fields{"device-id": dh.device.Id, "error": err})
+ logger.Errorw(ctx, "failed-to-get-child-devices-information", log.Fields{"device-id": dh.device.Id, "err": err})
}
if onuDevices != nil {
for _, onuDevice := range onuDevices.Items {
@@ -1679,57 +1694,29 @@
logger.Debugw(ctx, "failed-to-remove-tech-profile-instance-for-onu", log.Fields{"onu-id": onu.OnuID})
}
logger.Debugw(ctx, "deleted-tech-profile-instance-for-onu", log.Fields{"onu-id": onu.OnuID})
- tpIDList := dh.resourceMgr.GetTechProfileIDForOnu(ctx, onu.IntfID, onu.OnuID, uniID)
+ tpIDList := dh.resourceMgr[onu.IntfID].GetTechProfileIDForOnu(ctx, onu.IntfID, onu.OnuID, uniID)
for _, tpID := range tpIDList {
- if err = dh.resourceMgr.RemoveMeterInfoForOnu(ctx, "upstream", onu.IntfID, onu.OnuID, uniID, tpID); err != nil {
+ if err = dh.resourceMgr[onu.IntfID].RemoveMeterInfoForOnu(ctx, "upstream", onu.IntfID, onu.OnuID, uniID, tpID); err != nil {
logger.Debugw(ctx, "failed-to-remove-meter-id-for-onu-upstream", log.Fields{"onu-id": onu.OnuID})
}
logger.Debugw(ctx, "removed-meter-id-for-onu-upstream", log.Fields{"onu-id": onu.OnuID})
- if err = dh.resourceMgr.RemoveMeterInfoForOnu(ctx, "downstream", onu.IntfID, onu.OnuID, uniID, tpID); err != nil {
+ if err = dh.resourceMgr[onu.IntfID].RemoveMeterInfoForOnu(ctx, "downstream", onu.IntfID, onu.OnuID, uniID, tpID); err != nil {
logger.Debugw(ctx, "failed-to-remove-meter-id-for-onu-downstream", log.Fields{"onu-id": onu.OnuID})
}
logger.Debugw(ctx, "removed-meter-id-for-onu-downstream", log.Fields{"onu-id": onu.OnuID})
}
- dh.resourceMgr.FreePONResourcesForONU(ctx, onu.IntfID, onu.OnuID, uniID)
- if err = dh.resourceMgr.RemoveTechProfileIDsForOnu(ctx, onu.IntfID, onu.OnuID, uniID); err != nil {
+ dh.resourceMgr[onu.IntfID].FreePONResourcesForONU(ctx, onu.IntfID, onu.OnuID, uniID)
+ if err = dh.resourceMgr[onu.IntfID].RemoveTechProfileIDsForOnu(ctx, onu.IntfID, onu.OnuID, uniID); err != nil {
logger.Debugw(ctx, "failed-to-remove-tech-profile-id-for-onu", log.Fields{"onu-id": onu.OnuID})
}
logger.Debugw(ctx, "removed-tech-profile-id-for-onu", log.Fields{"onu-id": onu.OnuID})
- if err = dh.resourceMgr.DeletePacketInGemPortForOnu(ctx, onu.IntfID, onu.OnuID, port); err != nil {
+ if err = dh.resourceMgr[onu.IntfID].DeletePacketInGemPortForOnu(ctx, onu.IntfID, onu.OnuID, port); err != nil {
logger.Debugw(ctx, "failed-to-remove-gemport-pkt-in", log.Fields{"intfid": onu.IntfID, "onuid": onu.OnuID, "uniId": uniID})
}
- if err = dh.resourceMgr.RemoveAllFlowsForIntfOnuUniKey(ctx, onu.IntfID, int32(onu.OnuID), int32(uniID)); err != nil {
- logger.Debugw(ctx, "failed-to-remove-flow-for", log.Fields{"intfid": onu.IntfID, "onuid": onu.OnuID, "uniId": uniID})
- }
}
return nil
}
-func (dh *DeviceHandler) clearNNIData(ctx context.Context) error {
- nniUniID := -1
- nniOnuID := -1
-
- if dh.resourceMgr == nil {
- return olterrors.NewErrNotFound("resource-manager", log.Fields{"device-id": dh.device.Id}, nil)
- }
- //Free the flow-ids for the NNI port
- nni, err := dh.resourceMgr.GetNNIFromKVStore(ctx)
- if err != nil {
- return olterrors.NewErrPersistence("get", "nni", 0, nil, err)
- }
- logger.Debugw(ctx, "nni-", log.Fields{"nni": nni})
- for _, nniIntfID := range nni {
- dh.resourceMgr.RemoveResourceMap(ctx, nniIntfID, int32(nniOnuID), int32(nniUniID))
- _ = dh.resourceMgr.RemoveAllFlowsForIntfOnuUniKey(ctx, nniIntfID, -1, -1)
-
- }
- if err = dh.resourceMgr.DelNNiFromKVStore(ctx); err != nil {
- return olterrors.NewErrPersistence("clear", "nni", 0, nil, err)
- }
-
- return nil
-}
-
// DeleteDevice deletes the device instance from openolt handler array. Also clears allocated resource manager resources. Also reboots the OLT hardware!
func (dh *DeviceHandler) DeleteDevice(ctx context.Context, device *voltha.Device) error {
logger.Debug(ctx, "function-entry-delete-device")
@@ -1767,13 +1754,8 @@
if dh.resourceMgr != nil {
var ponPort uint32
for ponPort = 0; ponPort < dh.totalPonPorts; ponPort++ {
- var onuGemData []rsrcMgr.OnuGemInfo
- err := dh.resourceMgr.ResourceMgrs[ponPort].GetOnuGemInfo(ctx, ponPort, &onuGemData)
- if err != nil {
- _ = olterrors.NewErrNotFound("onu", log.Fields{
- "device-id": dh.device.Id,
- "pon-port": ponPort}, err).Log()
- }
+ var err error
+ onuGemData := dh.flowMgr[ponPort].getOnuGemInfoList()
for i, onu := range onuGemData {
onuID := make([]uint32, 1)
logger.Debugw(ctx, "onu-data", log.Fields{"onu": onu})
@@ -1782,31 +1764,22 @@
}
// Clear flowids for gem cache.
for _, gem := range onu.GemPorts {
- dh.resourceMgr.DeleteFlowIDsForGem(ctx, ponPort, gem)
+ dh.resourceMgr[ponPort].DeleteFlowIDsForGem(ctx, ponPort, gem)
}
onuID[0] = onu.OnuID
- dh.resourceMgr.FreeonuID(ctx, ponPort, onuID)
+ dh.resourceMgr[ponPort].FreeonuID(ctx, ponPort, onuID)
+ err = dh.resourceMgr[ponPort].DelOnuGemInfo(ctx, ponPort, onu.OnuID)
+ if err != nil {
+ logger.Errorw(ctx, "failed-to-delete-onugem-info", log.Fields{"intfid": ponPort, "onu-id": onu.OnuID, "err": err})
+ }
}
- dh.resourceMgr.DeleteIntfIDGempMapPath(ctx, ponPort)
- onuGemData = nil
- err = dh.resourceMgr.DelOnuGemInfoForIntf(ctx, ponPort)
- if err != nil {
- logger.Errorw(ctx, "failed-to-update-onugem-info", log.Fields{"intfid": ponPort, "onugeminfo": onuGemData})
- }
+ /* Clear the resource pool for each PON port in the background */
+ go func(ponPort uint32) {
+ if err := dh.resourceMgr[ponPort].Delete(ctx, ponPort); err != nil {
+ logger.Debug(ctx, err)
+ }
+ }(ponPort)
}
- /* Clear the flows from KV store associated with NNI port.
- There are mostly trap rules from NNI port (like LLDP)
- */
- if err := dh.clearNNIData(ctx); err != nil {
- logger.Errorw(ctx, "failed-to-clear-data-for-NNI-port", log.Fields{"device-id": dh.device.Id})
- }
-
- /* Clear the resource pool for each PON port in the background */
- go func() {
- if err := dh.resourceMgr.Delete(ctx); err != nil {
- logger.Debug(ctx, err)
- }
- }()
}
/*Delete ONU map for the device*/
@@ -2190,56 +2163,48 @@
}
onu := &oop.Onu{IntfId: intfID, OnuId: onuID, SerialNumber: sn}
+ //clear PON resources associated with ONU
+ onuGem, err := dh.resourceMgr[intfID].GetOnuGemInfo(ctx, intfID, onuID)
+ if err != nil || onuGem == nil || onuGem.OnuID != onuID {
+ logger.Warnw(ctx, "failed-to-get-onu-info-for-pon-port", log.Fields{
+ "device-id": dh.device.Id,
+ "intf-id": intfID,
+ "onuID": onuID,
+ "err": err})
+ } else {
+ logger.Debugw(ctx, "onu-data", log.Fields{"onu": onu})
+ if err := dh.clearUNIData(ctx, onuGem); err != nil {
+ logger.Warnw(ctx, "failed-to-clear-uni-data-for-onu", log.Fields{
+ "device-id": dh.device.Id,
+ "onu-device": onu,
+ "err": err})
+ }
+ // Clear flowids for gem cache.
+ for _, gem := range onuGem.GemPorts {
+ dh.resourceMgr[intfID].DeleteFlowIDsForGem(ctx, intfID, gem)
+ }
+ err := dh.resourceMgr[intfID].DelOnuGemInfo(ctx, intfID, onuID)
+ if err != nil {
+ logger.Warnw(ctx, "persistence-update-onu-gem-info-failed", log.Fields{
+ "intf-id": intfID,
+ "onu-device": onu,
+ "onu-gem": onuGem,
+ "err": err})
+ //Not returning error on cleanup.
+ }
+ logger.Debugw(ctx, "removed-onu-gem-info", log.Fields{"intf": intfID, "onu-device": onu, "onugem": onuGem})
+ dh.resourceMgr[intfID].FreeonuID(ctx, intfID, []uint32{onuGem.OnuID})
+ }
+ dh.onus.Delete(onuKey)
+ dh.discOnus.Delete(onuSn)
+
+ // Now clear the ONU on the OLT
if _, err := dh.Client.DeleteOnu(log.WithSpanFromContext(context.Background(), ctx), onu); err != nil {
return olterrors.NewErrAdapter("failed-to-delete-onu", log.Fields{
"device-id": dh.device.Id,
"onu-id": onuID}, err).Log()
}
- //clear PON resources associated with ONU
- var onuGemData []rsrcMgr.OnuGemInfo
- if onuMgr, ok := dh.resourceMgr.ResourceMgrs[intfID]; !ok {
- logger.Warnw(ctx, "failed-to-get-resource-manager-for-interface-Id", log.Fields{
- "device-id": dh.device.Id,
- "intf-id": intfID})
- } else {
- if err := onuMgr.GetOnuGemInfo(ctx, intfID, &onuGemData); err != nil {
- logger.Warnw(ctx, "failed-to-get-onu-info-for-pon-port", log.Fields{
- "device-id": dh.device.Id,
- "intf-id": intfID,
- "error": err})
- } else {
- for i, onu := range onuGemData {
- if onu.OnuID == onuID && onu.SerialNumber == onuSn {
- logger.Debugw(ctx, "onu-data", log.Fields{"onu": onu})
- if err := dh.clearUNIData(ctx, &onuGemData[i]); err != nil {
- logger.Warnw(ctx, "failed-to-clear-uni-data-for-onu", log.Fields{
- "device-id": dh.device.Id,
- "onu-device": onu,
- "error": err})
- }
- // Clear flowids for gem cache.
- for _, gem := range onu.GemPorts {
- dh.resourceMgr.DeleteFlowIDsForGem(ctx, intfID, gem)
- }
- onuGemData = append(onuGemData[:i], onuGemData[i+1:]...)
- err := onuMgr.AddOnuGemInfo(ctx, intfID, onuGemData)
- if err != nil {
- logger.Warnw(ctx, "persistence-update-onu-gem-info-failed", log.Fields{
- "intf-id": intfID,
- "onu-device": onu,
- "onu-gem": onuGemData,
- "error": err})
- //Not returning error on cleanup.
- }
- logger.Debugw(ctx, "removed-onu-gem-info", log.Fields{"intf": intfID, "onu-device": onu, "onugem": onuGemData})
- dh.resourceMgr.FreeonuID(ctx, intfID, []uint32{onu.OnuID})
- break
- }
- }
- }
- }
- dh.onus.Delete(onuKey)
- dh.discOnus.Delete(onuSn)
+
return nil
}
@@ -2334,7 +2299,7 @@
/*
resp, err = dh.Client.GetValue(ctx, valueparam)
if err != nil {
- logger.Errorw("error-while-getValue", log.Fields{"DeviceID": dh.device, "onu-id": onuid, "error": err})
+ logger.Errorw("error-while-getValue", log.Fields{"DeviceID": dh.device, "onu-id": onuid, "err": err})
return nil, err
}
*/
@@ -2429,6 +2394,7 @@
// Step2 : Push the McastFlowOrGroupControlBlock to appropriate channel
// Step3 : Wait on response channel for response
// Step4 : Return error value
+ startTime := time.Now()
logger.Debugw(ctx, "process-flow-or-group", log.Fields{"flow": flow, "group": group, "action": action})
errChan := make(chan error)
var groupID uint32
@@ -2451,7 +2417,7 @@
dh.incomingMcastFlowOrGroup[groupID%MaxNumOfGroupHandlerChannels] <- mcastFlowOrGroupCb
// Wait for handler to return error value
err := <-errChan
- logger.Debugw(ctx, "process-flow-or-group--received-resp", log.Fields{"flow": flow, "group": group, "action": action, "err": err})
+ logger.Debugw(ctx, "process-flow-or-group--received-resp", log.Fields{"err": err, "totalTimeInMilliseconds": time.Since(startTime).Milliseconds()})
return err
}
diff --git a/internal/pkg/core/device_handler_test.go b/internal/pkg/core/device_handler_test.go
index 1cae8b6..d69be1a 100644
--- a/internal/pkg/core/device_handler_test.go
+++ b/internal/pkg/core/device_handler_test.go
@@ -19,8 +19,7 @@
import (
"context"
- conf "github.com/opencord/voltha-lib-go/v4/pkg/config"
- tp "github.com/opencord/voltha-lib-go/v4/pkg/techprofile"
+ conf "github.com/opencord/voltha-lib-go/v5/pkg/config"
"net"
"reflect"
"sync"
@@ -29,11 +28,11 @@
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/any"
- "github.com/opencord/voltha-lib-go/v4/pkg/db"
- fu "github.com/opencord/voltha-lib-go/v4/pkg/flows"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
- "github.com/opencord/voltha-lib-go/v4/pkg/pmmetrics"
- ponrmgr "github.com/opencord/voltha-lib-go/v4/pkg/ponresourcemanager"
+ "github.com/opencord/voltha-lib-go/v5/pkg/db"
+ fu "github.com/opencord/voltha-lib-go/v5/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/pmmetrics"
+ ponrmgr "github.com/opencord/voltha-lib-go/v5/pkg/ponresourcemanager"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/config"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager"
@@ -169,16 +168,17 @@
}}
deviceInf := &oop.DeviceInfo{Vendor: "openolt", Ranges: oopRanges, Model: "openolt", DeviceId: dh.device.Id, PonPorts: NumPonPorts}
- rsrMgr := resourcemanager.OpenOltResourceMgr{DeviceID: dh.device.Id, DeviceType: dh.device.Type, DevInfo: deviceInf,
- KVStore: &db.Backend{
- Client: &mocks.MockKVClient{},
- }}
- rsrMgr.AllocIDMgmtLock = make([]sync.RWMutex, deviceInf.PonPorts)
- rsrMgr.GemPortIDMgmtLock = make([]sync.RWMutex, deviceInf.PonPorts)
- rsrMgr.OnuIDMgmtLock = make([]sync.RWMutex, deviceInf.PonPorts)
+ dh.deviceInfo = deviceInf
+ dh.resourceMgr = make([]*resourcemanager.OpenOltResourceMgr, deviceInf.PonPorts)
+ var i uint32
+ for i = 0; i < deviceInf.PonPorts; i++ {
+ dh.resourceMgr[i] = &resourcemanager.OpenOltResourceMgr{DeviceID: dh.device.Id, DeviceType: dh.device.Type, DevInfo: deviceInf,
+ KVStore: &db.Backend{
+ Client: &mocks.MockKVClient{},
+ }}
+ dh.resourceMgr[i].InitLocalCache()
+ }
- dh.resourceMgr = &rsrMgr
- dh.resourceMgr.ResourceMgrs = make(map[uint32]*ponrmgr.PONResourceManager)
ranges := make(map[string]interface{})
sharedIdxByType := make(map[string]string)
sharedIdxByType["ALLOC_ID"] = "ALLOC_ID"
@@ -195,13 +195,6 @@
ranges["flow_id_shared"] = uint32(0)
ponmgr := &ponrmgr.PONResourceManager{}
-
- ctx := context.TODO()
- tpMgr, err := tp.NewTechProfile(ctx, ponmgr, "etcd", "127.0.0.1", "/")
- if err != nil {
- logger.Fatal(ctx, err.Error())
- }
-
ponmgr.DeviceID = "onu-1"
ponmgr.IntfIDs = []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}
ponmgr.KVStore = &db.Backend{
@@ -209,19 +202,28 @@
}
ponmgr.PonResourceRanges = ranges
ponmgr.SharedIdxByType = sharedIdxByType
+ ponmgr.Technology = "XGS-PON"
+ for i = 0; i < deviceInf.PonPorts; i++ {
+ dh.resourceMgr[i].PonRsrMgr = ponmgr
+ }
+
+ /*
+ tpMgr, err := tp.NewTechProfile(ctx, ponmgr, "etcd", "127.0.0.1", "/")
+ if err != nil {
+ logger.Fatal(ctx, err.Error())
+ }
+ */
+ tpMgr := &mocks.MockTechProfile{TpID: 64}
ponmgr.TechProfileMgr = tpMgr
- for i := 0; i < NumPonPorts; i++ {
- dh.resourceMgr.ResourceMgrs[uint32(i)] = ponmgr
- }
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
- dh.groupMgr = NewGroupManager(ctx, dh, dh.resourceMgr)
+ dh.groupMgr = NewGroupManager(ctx, dh, dh.resourceMgr[0])
dh.totalPonPorts = NumPonPorts
dh.flowMgr = make([]*OpenOltFlowMgr, dh.totalPonPorts)
- for i := 0; i < int(dh.totalPonPorts); i++ {
+ for i = 0; i < dh.totalPonPorts; i++ {
// Instantiate flow manager
- if dh.flowMgr[i] = NewFlowManager(ctx, dh, dh.resourceMgr, dh.groupMgr, uint32(i)); dh.flowMgr[i] == nil {
+ if dh.flowMgr[i] = NewFlowManager(ctx, dh, dh.resourceMgr[i], dh.groupMgr, uint32(i)); dh.flowMgr[i] == nil {
return nil
}
}
@@ -443,18 +445,18 @@
var err error
if marshalledData, err = ptypes.MarshalAny(body); err != nil {
- logger.Errorw(ctx, "cannot-marshal-request", log.Fields{"error": err})
+ logger.Errorw(ctx, "cannot-marshal-request", log.Fields{"err": err})
}
var marshalledData1 *any.Any
if marshalledData1, err = ptypes.MarshalAny(body2); err != nil {
- logger.Errorw(ctx, "cannot-marshal-request", log.Fields{"error": err})
+ logger.Errorw(ctx, "cannot-marshal-request", log.Fields{"err": err})
}
var marshalledData2 *any.Any
if marshalledData2, err = ptypes.MarshalAny(body3); err != nil {
- logger.Errorw(ctx, "cannot-marshal-request", log.Fields{"error": err})
+ logger.Errorw(ctx, "cannot-marshal-request", log.Fields{"err": err})
}
type args struct {
msg *ic.InterAdapterMessage
diff --git a/internal/pkg/core/olt_platform.go b/internal/pkg/core/olt_platform.go
index b85dd4b..9526ee7 100644
--- a/internal/pkg/core/olt_platform.go
+++ b/internal/pkg/core/olt_platform.go
@@ -20,8 +20,8 @@
import (
"context"
- "github.com/opencord/voltha-lib-go/v4/pkg/flows"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
"github.com/opencord/voltha-protos/v4/go/voltha"
@@ -112,6 +112,10 @@
minNniIntPortNum = (1 << bitsforNNIID)
// maxNniPortNum is used to store the maximum range of nni port number ((1 << 21)-1) 2097151
maxNniPortNum = ((1 << (bitsforNNIID + 1)) - 1)
+ // minPonIntfPortNum stores the minimum pon port number
+ minPonIntfPortNum = ponIntfMarkerValue << ponIntfMarkerPos
+ // maxPonIntfPortNum stores the maximum pon port number
+ maxPonIntfPortNum = (ponIntfMarkerValue << ponIntfMarkerPos) | ((1 << bitsForPONID) - 1)
)
//MinUpstreamPortID value
@@ -177,6 +181,15 @@
return (portNum & 0xFFFF), nil
}
+//IntfIDFromPonPortNum returns Intf ID derived from portNum
+func IntfIDFromPonPortNum(ctx context.Context, portNum uint32) (uint32, error) {
+ if portNum < minPonIntfPortNum || portNum > maxPonIntfPortNum {
+ logger.Errorw(ctx, "ponportnumber-is-not-in-valid-range", log.Fields{"portnum": portNum})
+ return uint32(0), olterrors.ErrInvalidPortRange
+ }
+ return (portNum & 0x7FFF), nil
+}
+
//IntfIDToPortTypeName returns port type derived from the intfId
func IntfIDToPortTypeName(intfID uint32) voltha.Port_PortType {
if ((ponIntfMarkerValue << ponIntfMarkerPos) ^ intfID) < MaxPonsPerOlt {
diff --git a/internal/pkg/core/olt_platform_test.go b/internal/pkg/core/olt_platform_test.go
index 4c89eaa..6ff32db 100644
--- a/internal/pkg/core/olt_platform_test.go
+++ b/internal/pkg/core/olt_platform_test.go
@@ -19,7 +19,7 @@
import (
"context"
- fu "github.com/opencord/voltha-lib-go/v4/pkg/flows"
+ fu "github.com/opencord/voltha-lib-go/v5/pkg/flows"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
"github.com/opencord/voltha-protos/v4/go/voltha"
diff --git a/internal/pkg/core/olt_state_transitions.go b/internal/pkg/core/olt_state_transitions.go
index e67bd43..5fab1d5 100644
--- a/internal/pkg/core/olt_state_transitions.go
+++ b/internal/pkg/core/olt_state_transitions.go
@@ -22,7 +22,7 @@
"reflect"
"runtime"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
)
// DeviceState OLT Device state
diff --git a/internal/pkg/core/openolt.go b/internal/pkg/core/openolt.go
index 436d67a..33b3f1b 100644
--- a/internal/pkg/core/openolt.go
+++ b/internal/pkg/core/openolt.go
@@ -22,11 +22,11 @@
"sync"
"time"
- "github.com/opencord/voltha-lib-go/v4/pkg/adapters/adapterif"
- conf "github.com/opencord/voltha-lib-go/v4/pkg/config"
- "github.com/opencord/voltha-lib-go/v4/pkg/events/eventif"
- "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/adapters/adapterif"
+ conf "github.com/opencord/voltha-lib-go/v5/pkg/config"
+ "github.com/opencord/voltha-lib-go/v5/pkg/events/eventif"
+ "github.com/opencord/voltha-lib-go/v5/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/config"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
"github.com/opencord/voltha-protos/v4/go/extension"
@@ -159,6 +159,20 @@
return olterrors.NewErrNotFound("device-handler", log.Fields{"device-id": targetDevice}, nil)
}
+//Process_tech_profile_instance_request processes tech profile request message from onu adapter
+func (oo *OpenOLT) Process_tech_profile_instance_request(ctx context.Context, msg *ic.InterAdapterTechProfileInstanceRequestMessage) *ic.InterAdapterTechProfileDownloadMessage {
+ logger.Debugw(ctx, "Process_tech_profile_instance_request", log.Fields{"tpPath": msg.TpInstancePath})
+ targetDeviceID := msg.ParentDeviceId // Request?
+ if targetDeviceID == "" {
+ logger.Error(ctx, "device-id-nil")
+ return nil
+ }
+ if handler := oo.getDeviceHandler(targetDeviceID); handler != nil {
+ return handler.GetInterAdapterTechProfileDownloadMessage(ctx, msg.TpInstancePath, msg.ParentPonPort, msg.OnuId, msg.UniId)
+ }
+ return nil
+}
+
//Adapter_descriptor not implemented
func (oo *OpenOLT) Adapter_descriptor(ctx context.Context) error {
return olterrors.ErrNotImplemented
@@ -407,7 +421,7 @@
if handler := oo.getDeviceHandler(deviceID); handler != nil {
if resp, err = handler.getExtValue(ctx, device, valueparam); err != nil {
logger.Errorw(ctx, "error-occurred-during-get-ext-value", log.Fields{"device-id": deviceID, "onu-id": device.Id,
- "error": err})
+ "err": err})
return nil, err
}
}
diff --git a/internal/pkg/core/openolt_eventmgr.go b/internal/pkg/core/openolt_eventmgr.go
index 336592c..1b5abd1 100644
--- a/internal/pkg/core/openolt_eventmgr.go
+++ b/internal/pkg/core/openolt_eventmgr.go
@@ -23,8 +23,8 @@
"fmt"
"strconv"
- "github.com/opencord/voltha-lib-go/v4/pkg/events/eventif"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/events/eventif"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
"github.com/opencord/voltha-protos/v4/go/common"
oop "github.com/opencord/voltha-protos/v4/go/openolt"
@@ -660,7 +660,7 @@
func (em *OpenOltEventMgr) oltIntfOperIndication(ctx context.Context, ifindication *oop.IntfOperIndication, deviceID string, raisedTs int64) {
portNo := IntfIDToPortNo(ifindication.IntfId, voltha.Port_PON_OLT)
if port, err := em.handler.coreProxy.GetDevicePort(ctx, deviceID, portNo); err != nil {
- logger.Warnw(ctx, "Error while fetching port object", log.Fields{"device-id": deviceID, "error": err})
+ logger.Warnw(ctx, "Error while fetching port object", log.Fields{"device-id": deviceID, "err": err})
} else if port.AdminState != common.AdminState_ENABLED {
logger.Debugw(ctx, "port-disable/enable-event-not-generated--the-port-is-not-enabled-by-operator", log.Fields{"device-id": deviceID, "port": port})
return
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index 4ae86ac..9c1cb09 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -22,14 +22,15 @@
"encoding/hex"
"errors"
"fmt"
- "github.com/opencord/voltha-lib-go/v4/pkg/meters"
+ "github.com/opencord/voltha-lib-go/v5/pkg/meters"
"strconv"
"strings"
"sync"
+ "time"
- "github.com/opencord/voltha-lib-go/v4/pkg/flows"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
- tp "github.com/opencord/voltha-lib-go/v4/pkg/techprofile"
+ "github.com/opencord/voltha-lib-go/v5/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
+ tp "github.com/opencord/voltha-lib-go/v5/pkg/techprofile"
rsrcMgr "github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager"
"github.com/opencord/voltha-protos/v4/go/common"
ic "github.com/opencord/voltha-protos/v4/go/inter_container"
@@ -154,11 +155,6 @@
pbit1 = '1'
)
-type gemPortKey struct {
- intfID uint32
- gemPort uint32
-}
-
type schedQueue struct {
direction tp_pb.Direction
intfID uint32
@@ -186,15 +182,6 @@
gemToAes map[uint32]bool
}
-// subscriberDataPathFlowIDKey is key to subscriberDataPathFlowIDMap map
-type subscriberDataPathFlowIDKey struct {
- intfID uint32
- onuID uint32
- uniID uint32
- direction string
- tpID uint32
-}
-
// This control block is created per flow add/remove and pushed on the incomingFlows channel slice
// The flowControlBlock is then picked by the perOnuFlowHandlerRoutine for further processing.
// There is on perOnuFlowHandlerRoutine routine per ONU that constantly monitors for any incoming
@@ -210,37 +197,28 @@
//OpenOltFlowMgr creates the Structure of OpenOltFlowMgr obj
type OpenOltFlowMgr struct {
ponPortIdx uint32 // Pon Port this FlowManager is responsible for
- techprofile map[uint32]tp.TechProfileIf
+ techprofile tp.TechProfileIf
deviceHandler *DeviceHandler
grpMgr *OpenOltGroupMgr
resourceMgr *rsrcMgr.OpenOltResourceMgr
- onuIdsLock sync.RWMutex // TODO: Do we need this?
-
- flowsUsedByGemPort map[uint32][]uint64 // gem port id to flow ids
- flowsUsedByGemPortKey sync.RWMutex // lock to be used to access the flowsUsedByGemPort map
+ gemToFlowIDs map[uint32][]uint64 // gem port id to flow ids
+ gemToFlowIDsKey sync.RWMutex // lock to be used to access the gemToFlowIDs map
packetInGemPort map[rsrcMgr.PacketInInfoKey]uint32 //packet in gem port local cache
packetInGemPortLock sync.RWMutex
// TODO create a type rsrcMgr.OnuGemInfos to be used instead of []rsrcMgr.OnuGemInfo
- onuGemInfo []rsrcMgr.OnuGemInfo //onu, gem and uni info local cache
+ onuGemInfoMap map[uint32]*rsrcMgr.OnuGemInfo //onu, gem and uni info local cache -> map of onuID to OnuGemInfo
// We need to have a global lock on the onuGemInfo map
onuGemInfoLock sync.RWMutex
- // Map of voltha flowID associated with subscriberDataPathFlowIDKey
- // This information is not persisted on Kv store and hence should be reconciled on adapter restart
- subscriberDataPathFlowIDMap map[subscriberDataPathFlowIDKey]uint64
- subscriberDataPathFlowIDMapLock sync.RWMutex
+ flowIDToGems map[uint64][]uint32
+ flowIDToGemsLock sync.RWMutex
// Slice of channels. Each channel in slice, index by ONU ID, queues flows per ONU.
// A go routine per ONU, waits on the unique channel (indexed by ONU ID) for incoming flows (add/remove)
incomingFlows []chan flowControlBlock
-
- //this map keeps uni port info by gem and pon port. This relation shall be used for packet-out operations
- gemToUniMap map[gemPortKey][]uint32
- //We need to have a global lock on the gemToUniLock map
- gemToUniLock sync.RWMutex
}
//NewFlowManager creates OpenOltFlowMgr object and initializes the parameters
@@ -248,23 +226,18 @@
logger.Infow(ctx, "initializing-flow-manager", log.Fields{"device-id": dh.device.Id})
var flowMgr OpenOltFlowMgr
var err error
- var idx uint32
flowMgr.deviceHandler = dh
+ flowMgr.ponPortIdx = ponPortIdx
flowMgr.grpMgr = grpMgr
flowMgr.resourceMgr = rMgr
- flowMgr.techprofile = make(map[uint32]tp.TechProfileIf)
if err = flowMgr.populateTechProfilePerPonPort(ctx); err != nil {
- logger.Errorw(ctx, "error-while-populating-tech-profile-mgr", log.Fields{"error": err})
+ logger.Errorw(ctx, "error-while-populating-tech-profile-mgr", log.Fields{"err": err})
return nil
}
- flowMgr.onuIdsLock = sync.RWMutex{}
- flowMgr.flowsUsedByGemPort = make(map[uint32][]uint64)
+ flowMgr.gemToFlowIDs = make(map[uint32][]uint64)
flowMgr.packetInGemPort = make(map[rsrcMgr.PacketInInfoKey]uint32)
- flowMgr.packetInGemPortLock = sync.RWMutex{}
- flowMgr.onuGemInfoLock = sync.RWMutex{}
- flowMgr.subscriberDataPathFlowIDMap = make(map[subscriberDataPathFlowIDKey]uint64)
- flowMgr.subscriberDataPathFlowIDMapLock = sync.RWMutex{}
+ flowMgr.flowIDToGems = make(map[uint64][]uint32)
// Create a slice of buffered channels for handling concurrent flows per ONU.
// The additional entry (+1) is to handle the NNI trap flows on a separate channel from individual ONUs channel
@@ -276,54 +249,35 @@
// This routine will be blocked on the flowMgr.incomingFlows[onu-id] channel for incoming flows.
go flowMgr.perOnuFlowHandlerRoutine(flowMgr.incomingFlows[i])
}
-
+ flowMgr.onuGemInfoMap = make(map[uint32]*rsrcMgr.OnuGemInfo)
//Load the onugem info cache from kv store on flowmanager start
- if flowMgr.onuGemInfo, err = rMgr.GetOnuGemInfo(ctx, ponPortIdx); err != nil {
- logger.Error(ctx, "failed-to-load-onu-gem-info-cache")
+ onuIDStart := flowMgr.deviceHandler.deviceInfo.OnuIdStart
+ onuIDEnd := flowMgr.deviceHandler.deviceInfo.OnuIdEnd
+ for onuID := onuIDStart; onuID <= onuIDEnd; onuID++ {
+ // check for a valid serial number in onuGem as GetOnuGemInfo can return nil error in case of nothing found in the path.
+ onugem, err := rMgr.GetOnuGemInfo(ctx, onuID, ponPortIdx)
+ if err == nil && onugem != nil && onugem.SerialNumber != "" {
+ flowMgr.onuGemInfoMap[onuID] = onugem
+ }
}
- //Load flowID list per gem map per interface from the kvstore.
- flowMgr.loadFlowIDlistForGem(ctx, idx)
+
+ //Load flowID list per gem map And gemIDs per flow per interface from the kvstore.
+ flowMgr.loadFlowIDsForGemAndGemIDsForFlow(ctx)
+
//load interface to multicast queue map from kv store
-
- flowMgr.gemToUniMap = make(map[gemPortKey][]uint32)
- flowMgr.gemToUniLock = sync.RWMutex{}
-
flowMgr.grpMgr.LoadInterfaceToMulticastQueueMap(ctx)
- flowMgr.reconcileSubscriberDataPathFlowIDMap(ctx)
logger.Info(ctx, "initialization-of-flow-manager-success")
return &flowMgr
}
-// toGemToUniMap adds uni info consisting of onu and uni ID to the map and associates it with a gem port
-func (f *OpenOltFlowMgr) toGemToUniMap(ctx context.Context, gemPK gemPortKey, onuID uint32, uniID uint32) {
- f.gemToUniLock.Lock()
- f.gemToUniMap[gemPK] = []uint32{onuID, uniID}
- f.gemToUniLock.Unlock()
-}
-
-// fromGemToUniMap returns onu and uni ID associated with the given key
-func (f *OpenOltFlowMgr) fromGemToUniMap(key gemPortKey) ([]uint32, bool) {
- f.gemToUniLock.RLock()
- defer f.gemToUniLock.RUnlock()
- val, ok := f.gemToUniMap[key]
- return val, ok
-}
-
-// removeFromGemToUniMap removes an entry associated with the given key from gemToUniMap
-func (f *OpenOltFlowMgr) removeFromGemToUniMap(key gemPortKey) {
- f.gemToUniLock.Lock()
- defer f.gemToUniLock.Unlock()
- delete(f.gemToUniMap, key)
-}
-
func (f *OpenOltFlowMgr) registerFlow(ctx context.Context, flowFromCore *ofp.OfpFlowStats, deviceFlow *openoltpb2.Flow) error {
if !deviceFlow.ReplicateFlow && deviceFlow.GemportId > 0 {
// Flow is not replicated in this case, we need to register the flow for a single gem-port
- return f.registerFlowIDForGem(ctx, uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId), flowFromCore)
+ return f.registerFlowIDForGemAndGemIDForFlow(ctx, uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId), flowFromCore)
} else if deviceFlow.ReplicateFlow && len(deviceFlow.PbitToGemport) > 0 {
// Flow is replicated in this case. We need to register the flow for all the gem-ports it is replicated to.
for _, gemPort := range deviceFlow.PbitToGemport {
- if err := f.registerFlowIDForGem(ctx, uint32(deviceFlow.AccessIntfId), gemPort, flowFromCore); err != nil {
+ if err := f.registerFlowIDForGemAndGemIDForFlow(ctx, uint32(deviceFlow.AccessIntfId), gemPort, flowFromCore); err != nil {
return err
}
}
@@ -331,15 +285,26 @@
return nil
}
-func (f *OpenOltFlowMgr) registerFlowIDForGem(ctx context.Context, accessIntfID uint32, gemPortID uint32, flowFromCore *ofp.OfpFlowStats) error {
- f.flowsUsedByGemPortKey.Lock()
- flowIDList, ok := f.flowsUsedByGemPort[gemPortID]
+func (f *OpenOltFlowMgr) registerFlowIDForGemAndGemIDForFlow(ctx context.Context, accessIntfID uint32, gemPortID uint32, flowFromCore *ofp.OfpFlowStats) error {
+ // update gem->flows map
+ f.gemToFlowIDsKey.Lock()
+ flowIDList, ok := f.gemToFlowIDs[gemPortID]
if !ok {
flowIDList = []uint64{flowFromCore.Id}
+ } else {
+ flowIDList = appendUnique64bit(flowIDList, flowFromCore.Id)
}
- flowIDList = appendUnique64bit(flowIDList, flowFromCore.Id)
- f.flowsUsedByGemPort[gemPortID] = flowIDList
- f.flowsUsedByGemPortKey.Unlock()
+ f.gemToFlowIDs[gemPortID] = flowIDList
+ f.gemToFlowIDsKey.Unlock()
+
+ // update flow->gems map
+ f.flowIDToGemsLock.Lock()
+ if _, ok := f.flowIDToGems[flowFromCore.Id]; !ok {
+ f.flowIDToGems[flowFromCore.Id] = []uint32{gemPortID}
+ } else {
+ f.flowIDToGems[flowFromCore.Id] = appendUnique32bit(f.flowIDToGems[flowFromCore.Id], gemPortID)
+ }
+ f.flowIDToGemsLock.Unlock()
// update the flowids for a gem to the KVstore
return f.resourceMgr.UpdateFlowIDsForGem(ctx, accessIntfID, gemPortID, flowIDList)
@@ -452,7 +417,7 @@
if meterInfo != nil {
logger.Debugw(ctx, "scheduler-already-created-for-upstream", log.Fields{"device-id": f.deviceHandler.device.Id, "meter-id": sq.meterID})
- if meterInfo.MeterConfig.MeterId == sq.meterID {
+ if meterInfo.MeterID == sq.meterID {
if err := f.resourceMgr.HandleMeterInfoRefCntUpdate(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID, true); err != nil {
return err
}
@@ -460,7 +425,7 @@
}
return olterrors.NewErrInvalidValue(log.Fields{
"unsupported": "meter-id",
- "kv-store-meter-id": meterInfo.MeterConfig.MeterId,
+ "kv-store-meter-id": meterInfo.MeterID,
"meter-id-in-flow": sq.meterID,
"device-id": f.deviceHandler.device.Id}, nil)
}
@@ -472,18 +437,9 @@
"device-id": f.deviceHandler.device.Id})
if sq.direction == tp_pb.Direction_UPSTREAM {
- SchedCfg, err = f.techprofile[sq.intfID].GetUsScheduler(ctx, sq.tpInst.(*tp.TechProfile))
+ SchedCfg = f.techprofile.GetUsScheduler(sq.tpInst.(*tp_pb.TechProfileInstance))
} else if sq.direction == tp_pb.Direction_DOWNSTREAM {
- SchedCfg, err = f.techprofile[sq.intfID].GetDsScheduler(ctx, sq.tpInst.(*tp.TechProfile))
- }
-
- if err != nil {
- return olterrors.NewErrNotFound("scheduler-config",
- log.Fields{
- "intf-id": sq.intfID,
- "direction": sq.direction,
- "tp-inst": sq.tpInst,
- "device-id": f.deviceHandler.device.Id}, err)
+ SchedCfg = f.techprofile.GetDsScheduler(sq.tpInst.(*tp_pb.TechProfileInstance))
}
found := false
@@ -491,13 +447,10 @@
if sq.flowMetadata != nil {
for _, meter := range sq.flowMetadata.Meters {
if sq.meterID == meter.MeterId {
- meterInfo.MeterConfig = ofp.OfpMeterConfig{}
- meterInfo.MeterConfig.MeterId = meter.MeterId
- meterInfo.MeterConfig.Flags = meter.Flags
+ meterInfo.MeterID = meter.MeterId
meterInfo.RefCnt = 1 // initialize it to 1, since this is the first flow that referenced the meter id.
- meterInfo.MeterConfig.Bands = append(meterInfo.MeterConfig.Bands, meter.Bands...)
logger.Debugw(ctx, "found-meter-config-from-flowmetadata",
- log.Fields{"meterConfig": meterInfo.MeterConfig,
+ log.Fields{"meter": meter,
"device-id": f.deviceHandler.device.Id})
found = true
break
@@ -515,14 +468,14 @@
}
var TrafficShaping *tp_pb.TrafficShapingInfo
- if TrafficShaping, err = meters.GetTrafficShapingInfo(ctx, &meterInfo.MeterConfig); err != nil {
+ if TrafficShaping, err = meters.GetTrafficShapingInfo(ctx, sq.flowMetadata.Meters[0]); err != nil {
return olterrors.NewErrInvalidValue(log.Fields{
"reason": "invalid-meter-config",
"meter-id": sq.meterID,
"device-id": f.deviceHandler.device.Id}, nil)
}
- TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile[sq.intfID].GetTrafficScheduler(sq.tpInst.(*tp.TechProfile), SchedCfg, TrafficShaping)}
+ TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile.GetTrafficScheduler(sq.tpInst.(*tp_pb.TechProfileInstance), SchedCfg, TrafficShaping)}
TrafficSched[0].TechProfileId = sq.tpID
if err := f.pushSchedulerQueuesToDevice(ctx, sq, TrafficSched); err != nil {
@@ -549,7 +502,7 @@
}
func (f *OpenOltFlowMgr) pushSchedulerQueuesToDevice(ctx context.Context, sq schedQueue, TrafficSched []*tp_pb.TrafficScheduler) error {
- trafficQueues, err := f.techprofile[sq.intfID].GetTrafficQueues(ctx, sq.tpInst.(*tp.TechProfile), sq.direction)
+ trafficQueues, err := f.techprofile.GetTrafficQueues(ctx, sq.tpInst.(*tp_pb.TechProfileInstance), sq.direction)
if err != nil {
return olterrors.NewErrAdapter("unable-to-construct-traffic-queue-configuration",
@@ -598,10 +551,9 @@
"device-id": f.deviceHandler.device.Id})
if sq.direction == tp_pb.Direction_DOWNSTREAM {
- multicastTrafficQueues := f.techprofile[sq.intfID].GetMulticastTrafficQueues(ctx, sq.tpInst.(*tp.TechProfile))
+ multicastTrafficQueues := f.techprofile.GetMulticastTrafficQueues(ctx, sq.tpInst.(*tp_pb.TechProfileInstance))
if len(multicastTrafficQueues) > 0 {
- if _, present := f.grpMgr.GetInterfaceToMcastQueueMap(sq.intfID); !present {
- //assumed that there is only one queue per PON for the multicast service
+ if _, present := f.grpMgr.GetInterfaceToMcastQueueMap(sq.intfID); !present { //assumed that there is only one queue per PON for the multicast service
//the default queue with multicastQueuePerPonPort.Priority per a pon interface is used for multicast service
//just put it in interfaceToMcastQueueMap to use for building group members
logger.Debugw(ctx, "multicast-traffic-queues", log.Fields{"device-id": f.deviceHandler.device.Id})
@@ -613,7 +565,7 @@
f.grpMgr.UpdateInterfaceToMcastQueueMap(sq.intfID, val)
//also store the queue info in kv store
if err := f.resourceMgr.AddMcastQueueForIntf(ctx, sq.intfID, multicastQueuePerPonPort.GemportId, multicastQueuePerPonPort.Priority); err != nil {
- logger.Errorw(ctx, "failed-to-add-mcast-queue", log.Fields{"error": err})
+ logger.Errorw(ctx, "failed-to-add-mcast-queue", log.Fields{"err": err})
return err
}
@@ -639,27 +591,19 @@
"uni-port": sq.uniPort,
"device-id": f.deviceHandler.device.Id})
if sq.direction == tp_pb.Direction_UPSTREAM {
- SchedCfg, err = f.techprofile[sq.intfID].GetUsScheduler(ctx, sq.tpInst.(*tp.TechProfile))
+ SchedCfg = f.techprofile.GetUsScheduler(sq.tpInst.(*tp_pb.TechProfileInstance))
Direction = "upstream"
} else if sq.direction == tp_pb.Direction_DOWNSTREAM {
- SchedCfg, err = f.techprofile[sq.intfID].GetDsScheduler(ctx, sq.tpInst.(*tp.TechProfile))
+ SchedCfg = f.techprofile.GetDsScheduler(sq.tpInst.(*tp_pb.TechProfileInstance))
Direction = "downstream"
}
- if err != nil {
- return olterrors.NewErrNotFound("scheduler-config",
- log.Fields{
- "int-id": sq.intfID,
- "direction": sq.direction,
- "device-id": f.deviceHandler.device.Id}, err)
- }
-
TrafficShaping := &tp_pb.TrafficShapingInfo{} // this info is not really useful for the agent during flow removal. Just use default values.
- TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile[sq.intfID].GetTrafficScheduler(sq.tpInst.(*tp.TechProfile), SchedCfg, TrafficShaping)}
+ TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile.GetTrafficScheduler(sq.tpInst.(*tp_pb.TechProfileInstance), SchedCfg, TrafficShaping)}
TrafficSched[0].TechProfileId = sq.tpID
- TrafficQueues, err := f.techprofile[sq.intfID].GetTrafficQueues(ctx, sq.tpInst.(*tp.TechProfile), sq.direction)
+ TrafficQueues, err := f.techprofile.GetTrafficQueues(ctx, sq.tpInst.(*tp_pb.TechProfileInstance), sq.direction)
if err != nil {
return olterrors.NewErrAdapter("unable-to-construct-traffic-queue-configuration",
log.Fields{
@@ -703,7 +647,7 @@
"uni-port": sq.uniPort})
if sq.direction == tp_pb.Direction_UPSTREAM {
- allocID := sq.tpInst.(*tp.TechProfile).UsScheduler.AllocID
+ allocID := sq.tpInst.(*tp_pb.TechProfileInstance).UsScheduler.AllocId
f.resourceMgr.FreeAllocID(ctx, sq.intfID, sq.onuID, sq.uniID, allocID)
// Delete the TCONT on the ONU.
uni := getUniPortPath(f.deviceHandler.device.Id, sq.intfID, int32(sq.onuID), int32(sq.uniID))
@@ -752,7 +696,6 @@
var gemPortIDs []uint32
tpInstanceExists := false
var err error
-
allocIDs = f.resourceMgr.GetCurrentAllocIDsForOnu(ctx, intfID, onuID, uniID)
allgemPortIDs = f.resourceMgr.GetCurrentGEMPortIDsForOnu(ctx, intfID, onuID, uniID)
tpPath := f.getTPpath(ctx, intfID, uni, TpID)
@@ -765,24 +708,24 @@
"tp-id": TpID})
// Check tech profile instance already exists for derived port name
- techProfileInstance, _ := f.techprofile[intfID].GetTPInstanceFromKVStore(ctx, TpID, tpPath)
+ techProfileInstance, _ := f.techprofile.GetTPInstance(ctx, tpPath)
if techProfileInstance == nil {
logger.Infow(ctx, "tp-instance-not-found--creating-new",
log.Fields{
"path": tpPath,
"device-id": f.deviceHandler.device.Id})
- techProfileInstance, err = f.techprofile[intfID].CreateTechProfInstance(ctx, TpID, uni, intfID)
+ techProfileInstance, err = f.techprofile.CreateTechProfileInstance(ctx, TpID, uni, intfID)
if err != nil {
// This should not happen, something wrong in KV backend transaction
logger.Errorw(ctx, "tp-instance-create-failed",
log.Fields{
- "error": err,
+ "err": err,
"tp-id": TpID,
"device-id": f.deviceHandler.device.Id})
return 0, nil, nil
}
if err := f.resourceMgr.UpdateTechProfileIDForOnu(ctx, intfID, onuID, uniID, TpID); err != nil {
- logger.Warnw(ctx, "failed-to-update-tech-profile-id", log.Fields{"error": err})
+ logger.Warnw(ctx, "failed-to-update-tech-profile-id", log.Fields{"err": err})
}
} else {
logger.Debugw(ctx, "tech-profile-instance-already-exist-for-given port-name",
@@ -793,14 +736,14 @@
}
switch tpInst := techProfileInstance.(type) {
- case *tp.TechProfile:
+ case *tp_pb.TechProfileInstance:
if UsMeterID != 0 {
sq := schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: intfID, onuID: onuID, uniID: uniID, tpID: TpID,
uniPort: uniPort, tpInst: techProfileInstance, meterID: UsMeterID, flowMetadata: flowMetadata}
if err := f.CreateSchedulerQueues(ctx, sq); err != nil {
logger.Errorw(ctx, "CreateSchedulerQueues-failed-upstream",
log.Fields{
- "error": err,
+ "err": err,
"onu-id": onuID,
"uni-id": uniID,
"intf-id": intfID,
@@ -815,7 +758,7 @@
if err := f.CreateSchedulerQueues(ctx, sq); err != nil {
logger.Errorw(ctx, "CreateSchedulerQueues-failed-downstream",
log.Fields{
- "error": err,
+ "err": err,
"onu-id": onuID,
"uni-id": uniID,
"intf-id": intfID,
@@ -824,9 +767,9 @@
return 0, nil, nil
}
}
- allocID := tpInst.UsScheduler.AllocID
+ allocID := tpInst.UsScheduler.AllocId
for _, gem := range tpInst.UpstreamGemPortAttributeList {
- gemPortIDs = append(gemPortIDs, gem.GemportID)
+ gemPortIDs = append(gemPortIDs, gem.GemportId)
}
allocIDs = appendUnique32bit(allocIDs, allocID)
@@ -848,12 +791,12 @@
// Send Tconts and GEM ports to KV store
f.storeTcontsGEMPortsIntoKVStore(ctx, intfID, onuID, uniID, allocIDs, allgemPortIDs)
return allocID, gemPortIDs, techProfileInstance
- case *tp.EponProfile:
+ case *openoltpb2.EponTechProfileInstance:
// CreateSchedulerQueues for EPON needs to be implemented here
// when voltha-protos for EPON is completed.
- allocID := tpInst.AllocID
+ allocID := tpInst.AllocId
for _, gem := range tpInst.UpstreamQueueAttributeList {
- gemPortIDs = append(gemPortIDs, gem.GemportID)
+ gemPortIDs = append(gemPortIDs, gem.GemportId)
}
allocIDs = appendUnique32bit(allocIDs, allocID)
@@ -897,36 +840,18 @@
if err := f.resourceMgr.UpdateGEMPortIDsForOnu(ctx, intfID, onuID, uniID, gemPortIDs); err != nil {
logger.Errorw(ctx, "error-while-uploading-gemports-to-kv-store", log.Fields{"device-id": f.deviceHandler.device.Id})
}
- if err := f.resourceMgr.UpdateGEMportsPonportToOnuMapOnKVStore(ctx, gemPortIDs, intfID, onuID, uniID); err != nil {
- logger.Error(ctx, "error-while-uploading-gemtopon-map-to-kv-store", log.Fields{"device-id": f.deviceHandler.device.Id})
- } else {
- //add to gem to uni cache
- f.addGemPortUniAssociationsToCache(ctx, intfID, onuID, uniID, gemPortIDs)
- }
+
logger.Infow(ctx, "stored-tconts-and-gem-into-kv-store-successfully", log.Fields{"device-id": f.deviceHandler.device.Id})
for _, gemPort := range gemPortIDs {
f.addGemPortToOnuInfoMap(ctx, intfID, onuID, gemPort)
}
}
-//addGemPortUniAssociationsToCache
-func (f *OpenOltFlowMgr) addGemPortUniAssociationsToCache(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, gemPortIDs []uint32) {
- for _, gemPortID := range gemPortIDs {
- key := gemPortKey{
- intfID: intfID,
- gemPort: gemPortID,
- }
- f.toGemToUniMap(ctx, key, onuID, uniID)
- }
- logger.Debugw(ctx, "gem-to-uni-info-added-to-cache", log.Fields{"device-id": f.deviceHandler.device.Id, "intfID": intfID,
- "gemPortIDs": gemPortIDs, "onuID": onuID, "uniID": uniID})
-}
-
func (f *OpenOltFlowMgr) populateTechProfilePerPonPort(ctx context.Context) error {
var tpCount int
for _, techRange := range f.resourceMgr.DevInfo.Ranges {
for _, intfID := range techRange.IntfIds {
- f.techprofile[intfID] = f.resourceMgr.ResourceMgrs[intfID].TechProfileMgr
+ f.techprofile = f.resourceMgr.PonRsrMgr.TechProfileMgr
tpCount++
logger.Debugw(ctx, "init-tech-profile-done",
log.Fields{
@@ -1004,13 +929,6 @@
func (f *OpenOltFlowMgr) addSymmetricDataPathFlow(ctx context.Context, flowContext *flowContext, direction string) error {
- var inverseDirection string
- if direction == Upstream {
- inverseDirection = Downstream
- } else {
- inverseDirection = Upstream
- }
-
intfID := flowContext.intfID
onuID := flowContext.onuID
uniID := flowContext.uniID
@@ -1067,33 +985,23 @@
}, err).Log()
}
- // Get symmetric flowID if it exists
- // This symmetric flowID will be needed by agent software to use the same device flow-id that was used for the
- // symmetric flow earlier
- // symmetric flowID 0 is considered by agent as non-existent symmetric flow
- keySymm := subscriberDataPathFlowIDKey{intfID: intfID, onuID: onuID, uniID: uniID, direction: inverseDirection, tpID: tpID}
- f.subscriberDataPathFlowIDMapLock.RLock()
- symmFlowID := f.subscriberDataPathFlowIDMap[keySymm]
- f.subscriberDataPathFlowIDMapLock.RUnlock()
-
flow := openoltpb2.Flow{AccessIntfId: int32(intfID),
- OnuId: int32(onuID),
- UniId: int32(uniID),
- FlowId: logicalFlow.Id,
- FlowType: direction,
- AllocId: int32(allocID),
- NetworkIntfId: int32(networkIntfID),
- GemportId: int32(gemPortID),
- Classifier: classifierProto,
- Action: actionProto,
- Priority: int32(logicalFlow.Priority),
- Cookie: logicalFlow.Cookie,
- PortNo: flowContext.portNo,
- TechProfileId: tpID,
- ReplicateFlow: len(flowContext.pbitToGem) > 0,
- PbitToGemport: flowContext.pbitToGem,
- SymmetricFlowId: symmFlowID,
- GemportToAes: flowContext.gemToAes,
+ OnuId: int32(onuID),
+ UniId: int32(uniID),
+ FlowId: logicalFlow.Id,
+ FlowType: direction,
+ AllocId: int32(allocID),
+ NetworkIntfId: int32(networkIntfID),
+ GemportId: int32(gemPortID),
+ Classifier: classifierProto,
+ Action: actionProto,
+ Priority: int32(logicalFlow.Priority),
+ Cookie: logicalFlow.Cookie,
+ PortNo: flowContext.portNo,
+ TechProfileId: tpID,
+ ReplicateFlow: len(flowContext.pbitToGem) > 0,
+ PbitToGemport: flowContext.pbitToGem,
+ GemportToAes: flowContext.gemToAes,
}
if err := f.addFlowToDevice(ctx, logicalFlow, &flow); err != nil {
return olterrors.NewErrFlowOp("add", logicalFlow.Id, nil, err).Log()
@@ -1104,21 +1012,6 @@
"flow": flow,
"intf-id": intfID,
"onu-id": onuID})
- flowInfo := rsrcMgr.FlowInfo{Flow: &flow, IsSymmtricFlow: true}
- if err := f.resourceMgr.UpdateFlowIDInfo(ctx, uint32(flow.AccessIntfId), flow.OnuId, flow.UniId, flow.FlowId, flowInfo); err != nil {
- return olterrors.NewErrPersistence("update", "flow", logicalFlow.Id,
- log.Fields{
- "flow": flow,
- "device-id": f.deviceHandler.device.Id,
- "intf-id": intfID,
- "onu-id": onuID}, err).Log()
- }
-
- // Update the current flowID to the map
- keyCurr := subscriberDataPathFlowIDKey{intfID: intfID, onuID: onuID, uniID: uniID, direction: direction, tpID: tpID}
- f.subscriberDataPathFlowIDMapLock.Lock()
- f.subscriberDataPathFlowIDMap[keyCurr] = logicalFlow.Id
- f.subscriberDataPathFlowIDMapLock.Unlock()
return nil
}
@@ -1206,13 +1099,6 @@
"flow-id": logicalFlow.Id,
"intf-id": intfID,
"onu-id": onuID})
- flowInfo := rsrcMgr.FlowInfo{Flow: &dhcpFlow}
- if err := f.resourceMgr.UpdateFlowIDInfo(ctx, uint32(dhcpFlow.AccessIntfId), dhcpFlow.OnuId, dhcpFlow.UniId, dhcpFlow.FlowId, flowInfo); err != nil {
- return olterrors.NewErrPersistence("update", "flow", dhcpFlow.FlowId,
- log.Fields{
- "flow": dhcpFlow,
- "device-id": f.deviceHandler.device.Id}, err).Log()
- }
return nil
}
@@ -1301,11 +1187,6 @@
return olterrors.NewErrFlowOp("add", logicalFlow.Id, log.Fields{"flow": flow, "device-id": f.deviceHandler.device.Id}, err).Log()
}
- flowInfo := rsrcMgr.FlowInfo{Flow: &flow}
- if err := f.resourceMgr.UpdateFlowIDInfo(ctx, uint32(flow.AccessIntfId), flow.OnuId, flow.UniId, flow.FlowId, flowInfo); err != nil {
- return olterrors.NewErrPersistence("update", "flow", flow.FlowId, log.Fields{"flow": flow, "device-id": f.deviceHandler.device.Id}, err).Log()
- }
-
return nil
}
@@ -1336,7 +1217,7 @@
uplinkAction := make(map[string]interface{})
// Fill Classfier
- uplinkClassifier[EthType] = uint32(ethType)
+ uplinkClassifier[EthType] = ethType
uplinkClassifier[PacketTagType] = SingleTag
uplinkClassifier[VlanVid] = vlanID
uplinkClassifier[VlanPcp] = classifier[VlanPcp]
@@ -1415,13 +1296,7 @@
"intf-id": intfID,
"ethType": ethType,
})
- flowInfo := rsrcMgr.FlowInfo{Flow: &upstreamFlow}
- if err := f.resourceMgr.UpdateFlowIDInfo(ctx, uint32(upstreamFlow.AccessIntfId), upstreamFlow.OnuId, upstreamFlow.UniId, upstreamFlow.FlowId, flowInfo); err != nil {
- return olterrors.NewErrPersistence("update", "flow", upstreamFlow.FlowId,
- log.Fields{
- "flow": upstreamFlow,
- "device-id": f.deviceHandler.device.Id}, err).Log()
- }
+
return nil
}
@@ -1502,7 +1377,7 @@
// getTPpath return the ETCD path for a given UNI port
func (f *OpenOltFlowMgr) getTPpath(ctx context.Context, intfID uint32, uniPath string, TpID uint32) string {
- return f.techprofile[intfID].GetTechProfileInstanceKVPath(ctx, TpID, uniPath)
+ return f.techprofile.GetTechProfileInstanceKey(ctx, TpID, uniPath)
}
// DeleteTechProfileInstances removes the tech profile instances from persistent storage
@@ -1512,11 +1387,11 @@
for _, tpID := range tpIDList {
if err := f.DeleteTechProfileInstance(ctx, intfID, onuID, uniID, uniPortName, tpID); err != nil {
- _ = olterrors.NewErrAdapter("delete-tech-profile-failed", log.Fields{"device-id": f.deviceHandler.device.Id}, err).Log()
+ logger.Errorw(ctx, "delete-tech-profile-failed", log.Fields{"err": err, "device-id": f.deviceHandler.device.Id})
// return err
// We should continue to delete tech-profile instances for other TP IDs
}
- logger.Debugw(ctx, "tech-profile-deleted", log.Fields{"device-id": f.deviceHandler.device.Id, "tp-id": tpID})
+ logger.Debugw(ctx, "tech-profile-instance-deleted", log.Fields{"device-id": f.deviceHandler.device.Id, "uniPortName": uniPortName, "tp-id": tpID})
}
return nil
}
@@ -1526,7 +1401,7 @@
if uniPortName == "" {
uniPortName = getUniPortPath(f.deviceHandler.device.Id, intfID, int32(onuID), int32(uniID))
}
- if err := f.techprofile[intfID].DeleteTechProfileInstance(ctx, tpID, uniPortName); err != nil {
+ if err := f.techprofile.DeleteTechProfileInstance(ctx, tpID, uniPortName); err != nil {
return olterrors.NewErrAdapter("failed-to-delete-tp-instance-from-kv-store",
log.Fields{
"tp-id": tpID,
@@ -1698,13 +1573,7 @@
"device-id": f.deviceHandler.device.Id,
"onu-id": onuID,
"flow-id": flow.Id})
- flowInfo := rsrcMgr.FlowInfo{Flow: &downstreamflow}
- if err := f.resourceMgr.UpdateFlowIDInfo(ctx, networkInterfaceID, int32(onuID), int32(uniID), flow.Id, flowInfo); err != nil {
- return olterrors.NewErrPersistence("update", "flow", flow.Id,
- log.Fields{
- "flow": downstreamflow,
- "device-id": f.deviceHandler.device.Id}, err)
- }
+
return nil
}
@@ -1781,8 +1650,8 @@
return err
}
- delGemPortMsg := &ic.InterAdapterDeleteGemPortMessage{UniId: uniID, TpPath: tpPath, GemPortId: gemPortID}
- logger.Debugw(ctx, "sending-gem-port-delete-to-openonu-adapter",
+ delGemPortMsg := &ic.InterAdapterDeleteGemPortMessage{UniId: uniID, TpInstancePath: tpPath, GemPortId: gemPortID}
+ logger.Infow(ctx, "sending-gem-port-delete-to-openonu-adapter",
log.Fields{
"msg": *delGemPortMsg,
"device-id": f.deviceHandler.device.Id})
@@ -1822,7 +1691,7 @@
return err
}
- delTcontMsg := &ic.InterAdapterDeleteTcontMessage{UniId: uniID, TpPath: tpPath, AllocId: allocID}
+ delTcontMsg := &ic.InterAdapterDeleteTcontMessage{UniId: uniID, TpInstancePath: tpPath, AllocId: allocID}
logger.Debugw(ctx, "sending-tcont-delete-to-openonu-adapter",
log.Fields{
"msg": *delTcontMsg,
@@ -1853,37 +1722,38 @@
// Otherwise stale info continues to exist after gemport is freed and wrong logicalPortNo
// is conveyed to ONOS during packet-in OF message.
func (f *OpenOltFlowMgr) deleteGemPortFromLocalCache(ctx context.Context, intfID uint32, onuID uint32, gemPortID uint32) {
-
- f.onuGemInfoLock.Lock()
- defer f.onuGemInfoLock.Unlock()
-
logger.Infow(ctx, "deleting-gem-from-local-cache",
log.Fields{
"gem-port-id": gemPortID,
"intf-id": intfID,
"onu-id": onuID,
- "device-id": f.deviceHandler.device.Id,
- "onu-gem": f.onuGemInfo})
-
- onugem := f.onuGemInfo
+ "device-id": f.deviceHandler.device.Id})
+ f.onuGemInfoLock.RLock()
+ onugem, ok := f.onuGemInfoMap[onuID]
+ f.onuGemInfoLock.RUnlock()
+ if !ok {
+ logger.Warnw(ctx, "onu gem info already cleared from cache", log.Fields{
+ "gem-port-id": gemPortID,
+ "intf-id": intfID,
+ "onu-id": onuID,
+ "device-id": f.deviceHandler.device.Id})
+ return
+ }
deleteLoop:
- for i, onu := range onugem {
- if onu.OnuID == onuID {
- for j, gem := range onu.GemPorts {
- // If the gemport is found, delete it from local cache.
- if gem == gemPortID {
- onu.GemPorts = append(onu.GemPorts[:j], onu.GemPorts[j+1:]...)
- onugem[i] = onu
- logger.Infow(ctx, "removed-gemport-from-local-cache",
- log.Fields{
- "intf-id": intfID,
- "onu-id": onuID,
- "deletedgemport-id": gemPortID,
- "gemports": onu.GemPorts,
- "device-id": f.deviceHandler.device.Id})
- break deleteLoop
- }
- }
+ for j, gem := range onugem.GemPorts {
+ // If the gemport is found, delete it from local cache.
+ if gem == gemPortID {
+ onugem.GemPorts = append(onugem.GemPorts[:j], onugem.GemPorts[j+1:]...)
+ f.onuGemInfoLock.Lock()
+ f.onuGemInfoMap[onuID] = onugem
+ f.onuGemInfoLock.Unlock()
+ logger.Infow(ctx, "removed-gemport-from-local-cache",
+ log.Fields{
+ "intf-id": intfID,
+ "onu-id": onuID,
+ "deletedgemport-id": gemPortID,
+ "gemports": onugem.GemPorts,
+ "device-id": f.deviceHandler.device.Id})
break deleteLoop
}
}
@@ -1891,7 +1761,7 @@
//clearResources clears pon resources in kv store and the device
// nolint: gocyclo
-func (f *OpenOltFlowMgr) clearResources(ctx context.Context, flow *ofp.OfpFlowStats, intfID uint32, onuID int32, uniID int32,
+func (f *OpenOltFlowMgr) clearResources(ctx context.Context, intfID uint32, onuID int32, uniID int32,
gemPortID int32, flowID uint64, portNum uint32, tpID uint32) error {
uni := getUniPortPath(f.deviceHandler.device.Id, intfID, onuID, uniID)
@@ -1900,27 +1770,22 @@
log.Fields{
"tpPath": tpPath,
"device-id": f.deviceHandler.device.Id})
- techprofileInst, err := f.techprofile[intfID].GetTPInstanceFromKVStore(ctx, tpID, tpPath)
- if err != nil || techprofileInst == nil { // This should not happen, something wrong in KV backend transaction
- return olterrors.NewErrNotFound("tech-profile-in-kv-store",
- log.Fields{
- "tp-id": tpID,
- "path": tpPath}, err)
- }
used := f.isGemPortUsedByAnotherFlow(uint32(gemPortID))
if used {
- f.flowsUsedByGemPortKey.Lock()
- defer f.flowsUsedByGemPortKey.Unlock()
+ f.gemToFlowIDsKey.RLock()
+ flowIDs := f.gemToFlowIDs[uint32(gemPortID)]
+ f.gemToFlowIDsKey.RUnlock()
- flowIDs := f.flowsUsedByGemPort[uint32(gemPortID)]
for i, flowIDinMap := range flowIDs {
if flowIDinMap == flowID {
flowIDs = append(flowIDs[:i], flowIDs[i+1:]...)
- // everytime flowsUsedByGemPort cache is updated the same should be updated
+ f.gemToFlowIDsKey.Lock()
+ f.gemToFlowIDs[uint32(gemPortID)] = flowIDs
+ f.gemToFlowIDsKey.Unlock()
+ // everytime gemToFlowIDs cache is updated the same should be updated
// in kv store by calling UpdateFlowIDsForGem
- f.flowsUsedByGemPort[uint32(gemPortID)] = flowIDs
if err := f.resourceMgr.UpdateFlowIDsForGem(ctx, intfID, uint32(gemPortID), flowIDs); err != nil {
return err
}
@@ -1936,28 +1801,17 @@
return nil
}
logger.Debugf(ctx, "gem-port-id %d is-not-used-by-another-flow--releasing-the-gem-port", gemPortID)
- f.resourceMgr.RemoveGemPortIDForOnu(ctx, intfID, uint32(onuID), uint32(uniID), uint32(gemPortID))
- // TODO: The TrafficQueue corresponding to this gem-port also should be removed immediately.
- // But it is anyway eventually removed later when the TechProfile is freed, so not a big issue for now.
- f.resourceMgr.RemoveGEMportPonportToOnuMapOnKVStore(ctx, uint32(gemPortID), intfID)
- // also clear gem to uni cache
- f.removeFromGemToUniMap(gemPortKey{
- intfID: intfID,
- gemPort: uint32(gemPortID),
- })
f.deleteGemPortFromLocalCache(ctx, intfID, uint32(onuID), uint32(gemPortID))
-
- f.onuIdsLock.Lock() // TODO: What is this lock?
-
- //everytime an entry is deleted from flowsUsedByGemPort cache, the same should be updated in kv as well
+ _ = f.resourceMgr.RemoveGemFromOnuGemInfo(ctx, intfID, uint32(onuID), uint32(gemPortID)) // ignore error and proceed.
+ //everytime an entry is deleted from gemToFlowIDs cache, the same should be updated in kv as well
// by calling DeleteFlowIDsForGem
- f.flowsUsedByGemPortKey.Lock()
- delete(f.flowsUsedByGemPort, uint32(gemPortID))
- f.flowsUsedByGemPortKey.Unlock()
- f.resourceMgr.DeleteFlowIDsForGem(ctx, intfID, uint32(gemPortID))
- f.resourceMgr.FreeGemPortID(ctx, intfID, uint32(onuID), uint32(uniID), uint32(gemPortID))
+ f.gemToFlowIDsKey.Lock()
+ delete(f.gemToFlowIDs, uint32(gemPortID))
+ f.gemToFlowIDsKey.Unlock()
- f.onuIdsLock.Unlock()
+ f.resourceMgr.DeleteFlowIDsForGem(ctx, intfID, uint32(gemPortID))
+
+ f.resourceMgr.FreeGemPortID(ctx, intfID, uint32(onuID), uint32(uniID), uint32(gemPortID))
// Delete the gem port on the ONU.
if err := f.sendDeleteGemPortToChild(ctx, intfID, uint32(onuID), uint32(uniID), uint32(gemPortID), tpPath); err != nil {
@@ -1970,8 +1824,15 @@
"device-id": f.deviceHandler.device.Id,
"gemport-id": gemPortID})
}
+ techprofileInst, err := f.techprofile.GetTPInstance(ctx, tpPath)
+ if err != nil || techprofileInst == nil { // This should not happen, something wrong in KV backend transaction
+ return olterrors.NewErrNotFound("tech-profile-in-kv-store",
+ log.Fields{
+ "tp-id": tpID,
+ "path": tpPath}, err)
+ }
switch techprofileInst := techprofileInst.(type) {
- case *tp.TechProfile:
+ case *tp_pb.TechProfileInstance:
ok, _ := f.isTechProfileUsedByAnotherGem(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst, uint32(gemPortID))
if !ok {
if err := f.resourceMgr.RemoveTechProfileIDForOnu(ctx, intfID, uint32(onuID), uint32(uniID), tpID); err != nil {
@@ -1987,23 +1848,23 @@
logger.Warn(ctx, err)
}
}
- case *tp.EponProfile:
+ case *tp_pb.EponTechProfileInstance:
if err := f.resourceMgr.RemoveTechProfileIDForOnu(ctx, intfID, uint32(onuID), uint32(uniID), tpID); err != nil {
logger.Warn(ctx, err)
}
if err := f.DeleteTechProfileInstance(ctx, intfID, uint32(onuID), uint32(uniID), "", tpID); err != nil {
logger.Warn(ctx, err)
}
- f.resourceMgr.FreeAllocID(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst.AllocID)
+ f.resourceMgr.FreeAllocID(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst.AllocId)
// Delete the TCONT on the ONU.
- if err := f.sendDeleteTcontToChild(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst.AllocID, tpPath); err != nil {
+ if err := f.sendDeleteTcontToChild(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst.AllocId, tpPath); err != nil {
logger.Errorw(ctx, "error-processing-delete-tcont-towards-onu",
log.Fields{
"intf": intfID,
"onu-id": onuID,
"uni-id": uniID,
"device-id": f.deviceHandler.device.Id,
- "alloc-id": techprofileInst.AllocID})
+ "alloc-id": techprofileInst.AllocId})
}
default:
logger.Errorw(ctx, "error-unknown-tech",
@@ -2016,7 +1877,6 @@
// nolint: gocyclo
func (f *OpenOltFlowMgr) clearFlowFromDeviceAndResourceManager(ctx context.Context, flow *ofp.OfpFlowStats, flowDirection string) error {
- var flowInfo *rsrcMgr.FlowInfo
logger.Infow(ctx, "clear-flow-from-resource-manager",
log.Fields{
"flowDirection": flowDirection,
@@ -2037,6 +1897,16 @@
onuID := int32(onu)
uniID := int32(uni)
+ tpID, err := getTpIDFromFlow(ctx, flow)
+ if err != nil {
+ return olterrors.NewErrNotFound("tp-id",
+ log.Fields{
+ "flow": flow,
+ "intf-id": Intf,
+ "onu-id": onuID,
+ "uni-id": uniID,
+ "device-id": f.deviceHandler.device.Id}, err)
+ }
for _, field := range flows.GetOfbFields(flow) {
if field.Type == flows.IP_PROTO {
@@ -2060,86 +1930,45 @@
logger.Errorw(ctx, "invalid-in-port-number",
log.Fields{
"port-number": inPort,
- "error": err})
+ "err": err})
return err
}
}
- if flowInfo = f.resourceMgr.GetFlowIDInfo(ctx, Intf, onuID, uniID, flow.Id); flowInfo == nil {
- logger.Errorw(ctx, "flow-info-not-found-for-flow-to-be-removed", log.Fields{"flow-id": flow.Id, "intf-id": Intf, "onu-id": onuID, "uni-id": uniID})
- return olterrors.NewErrPersistence("remove", "flow", flow.Id, log.Fields{"flow": flow}, err)
- }
- removeFlowMessage := openoltpb2.Flow{FlowId: flowInfo.Flow.FlowId, FlowType: flowInfo.Flow.FlowType}
- logger.Debugw(ctx, "flow-to-be-deleted", log.Fields{"flow": flowInfo.Flow})
+
+ removeFlowMessage := openoltpb2.Flow{FlowId: flow.Id, AccessIntfId: int32(Intf), OnuId: onuID, UniId: uniID, TechProfileId: tpID, FlowType: flowDirection}
+ logger.Debugw(ctx, "flow-to-be-deleted", log.Fields{"flow": flow})
if err = f.removeFlowFromDevice(ctx, &removeFlowMessage, flow.Id); err != nil {
return err
}
- if err = f.resourceMgr.RemoveFlowIDInfo(ctx, Intf, onuID, uniID, flow.Id); err != nil {
- logger.Errorw(ctx, "failed-to-remove-flow-on-kv-store", log.Fields{"error": err})
- return err
- }
- tpID, err := getTpIDFromFlow(ctx, flow)
- if err != nil {
- return olterrors.NewErrNotFound("tp-id",
- log.Fields{
- "flow": flow,
- "intf-id": Intf,
- "onu-id": onuID,
- "uni-id": uniID,
- "device-id": f.deviceHandler.device.Id}, err)
- }
- if !flowInfo.Flow.ReplicateFlow {
- if err = f.clearResources(ctx, flow, Intf, onuID, uniID, flowInfo.Flow.GemportId, flowInfo.Flow.FlowId, portNum, tpID); err != nil {
+ f.flowIDToGemsLock.Lock()
+ gems, ok := f.flowIDToGems[flow.Id]
+ if !ok {
+ logger.Errorw(ctx, "flow-id-to-gem-map-not-found", log.Fields{"flowID": flow.Id})
+ f.flowIDToGemsLock.Unlock()
+ return olterrors.NewErrNotFound("flow-id-to-gem-map-not-found", log.Fields{"flowID": flow.Id}, nil)
+ }
+ copyOfGems := make([]uint32, len(gems))
+ _ = copy(copyOfGems, gems)
+ // Delete the flow-id to gemport list entry from the map now the flow is deleted.
+ delete(f.flowIDToGems, flow.Id)
+ f.flowIDToGemsLock.Unlock()
+
+ logger.Debugw(ctx, "gems-to-be-cleared", log.Fields{"gems": copyOfGems})
+ for _, gem := range copyOfGems {
+ if err = f.clearResources(ctx, Intf, onuID, uniID, int32(gem), flow.Id, portNum, tpID); err != nil {
logger.Errorw(ctx, "failed-to-clear-resources-for-flow", log.Fields{
- "flow-id": flow.Id,
- "stored-flow": flowInfo.Flow,
- "device-id": f.deviceHandler.device.Id,
- "stored-flow-id": flowInfo.Flow.FlowId,
- "onu-id": onuID,
- "intf": Intf,
- "err": err,
+ "flow-id": flow.Id,
+ "device-id": f.deviceHandler.device.Id,
+ "onu-id": onuID,
+ "intf": Intf,
+ "gem": gem,
+ "err": err,
})
return err
}
- } else {
- gems := make([]uint32, 0)
- for _, gem := range flowInfo.Flow.PbitToGemport {
- gems = appendUnique32bit(gems, gem)
- }
- logger.Debugw(ctx, "gems-to-be-cleared", log.Fields{"gems": gems})
- for _, gem := range gems {
- if err = f.clearResources(ctx, flow, Intf, onuID, uniID, int32(gem), flowInfo.Flow.FlowId, portNum, tpID); err != nil {
- logger.Errorw(ctx, "failed-to-clear-resources-for-flow", log.Fields{
- "flow-id": flow.Id,
- "stored-flow": flowInfo.Flow,
- "device-id": f.deviceHandler.device.Id,
- "stored-flow-id": flowInfo.Flow.FlowId,
- "onu-id": onuID,
- "intf": Intf,
- "gem": gem,
- "err": err,
- })
- return err
- }
- }
}
- // If datapath flow, clear the symmetric flow data from the subscriberDataPathFlowIDMap map
- if isDatapathFlow(flow) {
- if tpID, err := getTpIDFromFlow(ctx, flow); err != nil {
- var inverseDirection string
- if flowDirection == Upstream {
- inverseDirection = Downstream
- } else {
- inverseDirection = Upstream
- }
-
- keySymm := subscriberDataPathFlowIDKey{intfID: Intf, onuID: uint32(onuID), uniID: uint32(uniID), direction: inverseDirection, tpID: tpID}
- f.subscriberDataPathFlowIDMapLock.Lock()
- delete(f.subscriberDataPathFlowIDMap, keySymm)
- f.subscriberDataPathFlowIDMapLock.Unlock()
- }
- }
// Decrement reference count for the meter associated with the given <(pon_id, onu_id, uni_id)>/<tp_id>/meter_id/<direction>
if err := f.resourceMgr.HandleMeterInfoRefCntUpdate(ctx, flowDirection, Intf, uint32(onuID), uint32(uniID), tpID, false); err != nil {
return err
@@ -2203,7 +2032,8 @@
// Step2 : Push the flowControlBlock to ONU channel
// Step3 : Wait on response channel for response
// Step4 : Return error value
- logger.Debugw(ctx, "process-flow", log.Fields{"flow": flow, "addFlow": addFlow})
+ startTime := time.Now()
+ logger.Infow(ctx, "process-flow", log.Fields{"flow": flow, "addFlow": addFlow})
errChan := make(chan error)
flowCb := flowControlBlock{
ctx: ctx,
@@ -2223,7 +2053,7 @@
f.incomingFlows[onuID] <- flowCb
// Wait on the channel for flow handlers return value
err := <-errChan
- logger.Debugw(ctx, "process-flow--received-resp", log.Fields{"flow": flow, "addFlow": addFlow, "err": err})
+ logger.Infow(ctx, "process-flow--received-resp", log.Fields{"err": err, "totalTimeSeconds": time.Since(startTime).Seconds()})
return err
}
@@ -2235,17 +2065,17 @@
// process the flow completely before proceeding to handle the next flow
flowCb := <-subscriberFlowChannel
if flowCb.addFlow {
- logger.Debugw(flowCb.ctx, "adding-flow",
- log.Fields{"device-id": f.deviceHandler.device.Id,
- "flowToAdd": flowCb.flow})
+ logger.Info(flowCb.ctx, "adding-flow-start")
+ startTime := time.Now()
err := f.AddFlow(flowCb.ctx, flowCb.flow, flowCb.flowMetadata)
+ logger.Infow(flowCb.ctx, "adding-flow-complete", log.Fields{"processTimeSecs": time.Since(startTime).Seconds()})
// Pass the return value over the return channel
*flowCb.errChan <- err
} else {
- logger.Debugw(flowCb.ctx, "removing-flow",
- log.Fields{"device-id": f.deviceHandler.device.Id,
- "flowToRemove": flowCb.flow})
+ logger.Info(flowCb.ctx, "removing-flow-start")
+ startTime := time.Now()
err := f.RemoveFlow(flowCb.ctx, flowCb.flow)
+ logger.Infow(flowCb.ctx, "removing-flow-complete", log.Fields{"processTimeSecs": time.Since(startTime).Seconds()})
// Pass the return value over the return channel
*flowCb.errChan <- err
}
@@ -2393,14 +2223,10 @@
}
//cached group can be removed now
if err := f.resourceMgr.RemoveFlowGroupFromKVStore(ctx, groupID, true); err != nil {
- logger.Warnw(ctx, "failed-to-remove-flow-group", log.Fields{"group-id": groupID, "error": err})
+ logger.Warnw(ctx, "failed-to-remove-flow-group", log.Fields{"group-id": groupID, "err": err})
}
}
- flowInfo := rsrcMgr.FlowInfo{Flow: &multicastFlow}
- if err = f.resourceMgr.UpdateFlowIDInfo(ctx, networkInterfaceID, int32(onuID), int32(uniID), flow.Id, flowInfo); err != nil {
- return olterrors.NewErrPersistence("update", "flow", flow.Id, log.Fields{"flow": multicastFlow}, err)
- }
return nil
}
@@ -2413,16 +2239,13 @@
}
return nniInterfaceID, nil
}
- // find the first NNI interface id of the device
- nniPorts, e := f.resourceMgr.GetNNIFromKVStore(ctx)
- if e == nil && len(nniPorts) > 0 {
- return nniPorts[0], nil
- }
- return 0, olterrors.NewErrNotFound("nni-port", nil, e).Log()
+
+ // TODO: For now we support only one NNI port in VOLTHA. We shall use only the first NNI port, i.e., interface-id 0.
+ return 0, nil
}
//sendTPDownloadMsgToChild send payload
-func (f *OpenOltFlowMgr) sendTPDownloadMsgToChild(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, uni string, TpID uint32) error {
+func (f *OpenOltFlowMgr) sendTPDownloadMsgToChild(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, uni string, TpID uint32, tpInst tp_pb.TechProfileInstance) error {
onuDev, err := f.getOnuDevice(ctx, intfID, onuID)
if err != nil {
@@ -2436,7 +2259,11 @@
logger.Debugw(ctx, "got-child-device-from-olt-device-handler", log.Fields{"onu-id": onuDev.deviceID})
tpPath := f.getTPpath(ctx, intfID, uni, TpID)
- tpDownloadMsg := &ic.InterAdapterTechProfileDownloadMessage{UniId: uniID, Path: tpPath}
+ tpDownloadMsg := &ic.InterAdapterTechProfileDownloadMessage{
+ UniId: uniID,
+ TpInstancePath: tpPath,
+ TechTpInstance: &ic.InterAdapterTechProfileDownloadMessage_TpInstance{TpInstance: &tpInst},
+ }
logger.Debugw(ctx, "sending-load-tech-profile-request-to-brcm-onu-adapter", log.Fields{"tpDownloadMsg": *tpDownloadMsg})
sendErr := f.deviceHandler.AdapterProxy.SendInterAdapterMessage(log.WithSpanFromContext(context.Background(), ctx),
tpDownloadMsg,
@@ -2458,24 +2285,25 @@
}
//UpdateOnuInfo function adds onu info to cache and kvstore
+// It is a no-op if the ONU entry already exists in the local cache.
func (f *OpenOltFlowMgr) UpdateOnuInfo(ctx context.Context, intfID uint32, onuID uint32, serialNum string) error {
- f.onuGemInfoLock.Lock()
- defer f.onuGemInfoLock.Unlock()
- onugem := f.onuGemInfo
+ f.onuGemInfoLock.RLock()
+ _, ok := f.onuGemInfoMap[onuID]
+ f.onuGemInfoLock.RUnlock()
// If the ONU already exists in onuGemInfo list, nothing to do
- for _, onu := range onugem {
- if onu.OnuID == onuID && onu.SerialNumber == serialNum {
- logger.Debugw(ctx, "onu-id-already-exists-in-cache",
- log.Fields{"onuID": onuID,
- "serialNum": serialNum})
- return nil
- }
+ if ok {
+ logger.Debugw(ctx, "onu-id-already-exists-in-cache",
+ log.Fields{"onuID": onuID,
+ "serialNum": serialNum})
+ return nil
}
- onu := rsrcMgr.OnuGemInfo{OnuID: onuID, SerialNumber: serialNum, IntfID: intfID}
- f.onuGemInfo = append(f.onuGemInfo, onu)
- if err := f.resourceMgr.AddOnuGemInfo(ctx, intfID, onu); err != nil {
+ onuGemInfo := rsrcMgr.OnuGemInfo{OnuID: onuID, SerialNumber: serialNum, IntfID: intfID}
+ f.onuGemInfoLock.Lock()
+ f.onuGemInfoMap[onuID] = &onuGemInfo
+ f.onuGemInfoLock.Unlock()
+ if err := f.resourceMgr.AddOnuGemInfo(ctx, intfID, onuID, onuGemInfo); err != nil {
return err
}
logger.Infow(ctx, "updated-onuinfo",
@@ -2483,7 +2311,7 @@
"intf-id": intfID,
"onu-id": onuID,
"serial-num": serialNum,
- "onu": onu,
+ "onu": onuGemInfo,
"device-id": f.deviceHandler.device.Id})
return nil
}
@@ -2491,34 +2319,46 @@
//addGemPortToOnuInfoMap function adds GEMport to ONU map
func (f *OpenOltFlowMgr) addGemPortToOnuInfoMap(ctx context.Context, intfID uint32, onuID uint32, gemPort uint32) {
- f.onuGemInfoLock.Lock()
- defer f.onuGemInfoLock.Unlock()
-
logger.Infow(ctx, "adding-gem-to-onu-info-map",
log.Fields{
"gem-port-id": gemPort,
"intf-id": intfID,
"onu-id": onuID,
- "device-id": f.deviceHandler.device.Id,
- "onu-gem": f.onuGemInfo})
- onugem := f.onuGemInfo
- // update the gem to the local cache as well as to kv strore
- for idx, onu := range onugem {
- if onu.OnuID == onuID {
- // check if gem already exists , else update the cache and kvstore
- for _, gem := range onu.GemPorts {
- if gem == gemPort {
- logger.Debugw(ctx, "gem-already-in-cache-no-need-to-update-cache-and-kv-store",
- log.Fields{
- "gem": gemPort,
- "device-id": f.deviceHandler.device.Id})
- return
- }
+ "device-id": f.deviceHandler.device.Id})
+ f.onuGemInfoLock.RLock()
+ onugem, ok := f.onuGemInfoMap[onuID]
+ f.onuGemInfoLock.RUnlock()
+ if !ok {
+ logger.Warnw(ctx, "onu gem info is missing", log.Fields{
+ "gem-port-id": gemPort,
+ "intf-id": intfID,
+ "onu-id": onuID,
+ "device-id": f.deviceHandler.device.Id})
+ return
+ }
+
+ if onugem.OnuID == onuID {
+ // check if gem already exists , else update the cache and kvstore
+ for _, gem := range onugem.GemPorts {
+ if gem == gemPort {
+ logger.Debugw(ctx, "gem-already-in-cache-no-need-to-update-cache-and-kv-store",
+ log.Fields{
+ "gem": gemPort,
+ "device-id": f.deviceHandler.device.Id})
+ return
}
- onugem[idx].GemPorts = append(onugem[idx].GemPorts, gemPort)
- f.onuGemInfo = onugem
- break
}
+ onugem.GemPorts = append(onugem.GemPorts, gemPort)
+ f.onuGemInfoLock.Lock()
+ f.onuGemInfoMap[onuID] = onugem
+ f.onuGemInfoLock.Unlock()
+ } else {
+ logger.Warnw(ctx, "mismatched onu id", log.Fields{
+ "gem-port-id": gemPort,
+ "intf-id": intfID,
+ "onu-id": onuID,
+ "device-id": f.deviceHandler.device.Id})
+ return
}
err := f.resourceMgr.AddGemToOnuGemInfo(ctx, intfID, onuID, gemPort)
if err != nil {
@@ -2535,24 +2375,18 @@
"gem-port-id": gemPort,
"intf-id": intfID,
"onu-id": onuID,
- "device-id": f.deviceHandler.device.Id,
- "onu-gem": f.onuGemInfo})
+ "device-id": f.deviceHandler.device.Id})
}
//GetLogicalPortFromPacketIn function computes logical port UNI/NNI port from packet-in indication and returns the same
func (f *OpenOltFlowMgr) GetLogicalPortFromPacketIn(ctx context.Context, packetIn *openoltpb2.PacketIndication) (uint32, error) {
var logicalPortNum uint32
- var onuID, uniID uint32
- var err error
if packetIn.IntfType == "pon" {
// packet indication does not have serial number , so sending as nil
// get onu and uni ids associated with the given pon and gem ports
- if onuID, uniID, err = f.GetUniPortByPonPortGemPort(ctx, packetIn.IntfId, packetIn.GemportId); err != nil {
- // Called method is returning error with all data populated; just return the same
- return logicalPortNum, err
- }
- logger.Debugf(ctx, "retrieved ONU and UNI IDs [%d, %d] by interface:%d, gem:%d")
+ onuID, uniID := packetIn.OnuId, packetIn.UniId
+ logger.Debugf(ctx, "retrieved ONU and UNI IDs [%d, %d] by interface:%d, gem:%d", packetIn.OnuId, packetIn.UniId, packetIn.IntfId, packetIn.GemportId)
if packetIn.PortNo != 0 {
logicalPortNum = packetIn.PortNo
@@ -2576,40 +2410,6 @@
return logicalPortNum, nil
}
-//GetUniPortByPonPortGemPort return onu and uni IDs associated with given pon and gem ports
-func (f *OpenOltFlowMgr) GetUniPortByPonPortGemPort(ctx context.Context, intfID uint32, gemPortID uint32) (uint32, uint32, error) {
- key := gemPortKey{
- intfID: intfID,
- gemPort: gemPortID,
- }
- uniPortInfo, ok := f.fromGemToUniMap(key) //try to get from the cache first
- if ok {
- if len(uniPortInfo) > 1 {
- //return onu ID and uni port from the cache
- logger.Debugw(ctx, "found-uni-port-by-pon-and-gem-ports",
- log.Fields{
- "intfID": intfID,
- "gemPortID": gemPortID,
- "onuID, uniID": uniPortInfo})
- return uniPortInfo[0], uniPortInfo[1], nil
- }
- }
- //If uni port is not found in cache try to get it from kv store. if it is found in kv store, update the cache and return.
- onuID, uniID, err := f.resourceMgr.GetUniPortByPonPortGemPortFromKVStore(ctx, intfID, gemPortID)
- if err == nil {
- f.toGemToUniMap(ctx, key, onuID, uniID)
- logger.Infow(ctx, "found-uni-port-by-pon-and-gem-port-from-kv-store-and-updating-cache-with-uni-port",
- log.Fields{
- "gemPortKey": key,
- "onuID": onuID,
- "uniID": uniID})
- return onuID, uniID, nil
- }
- return uint32(0), uint32(0), olterrors.NewErrNotFound("uni-id",
- log.Fields{"interfaceID": intfID, "gemPortID": gemPortID},
- errors.New("no uni port found"))
-}
-
//GetPacketOutGemPortID returns gemPortId
func (f *OpenOltFlowMgr) GetPacketOutGemPortID(ctx context.Context, intfID uint32, onuID uint32, portNum uint32, packet []byte) (uint32, error) {
var gemPortID uint32
@@ -2721,10 +2521,6 @@
return olterrors.NewErrFlowOp("add", logicalFlow.Id, log.Fields{"flow": downstreamflow}, err)
}
logger.Info(ctx, "trap-on-nni-flow-added–to-device-successfully")
- flowInfo := rsrcMgr.FlowInfo{Flow: &downstreamflow}
- if err := f.resourceMgr.UpdateFlowIDInfo(ctx, networkInterfaceID, int32(onuID), int32(uniID), logicalFlow.Id, flowInfo); err != nil {
- return olterrors.NewErrPersistence("update", "flow", logicalFlow.Id, log.Fields{"flow": downstreamflow}, err)
- }
return nil
}
@@ -2814,10 +2610,7 @@
return olterrors.NewErrFlowOp("add", logicalFlow.Id, log.Fields{"flow": downstreamflow}, err)
}
logger.Info(ctx, "igmp-trap-on-nni-flow-added-to-device-successfully")
- flowInfo := rsrcMgr.FlowInfo{Flow: &downstreamflow}
- if err := f.resourceMgr.UpdateFlowIDInfo(ctx, networkInterfaceID, int32(onuID), int32(uniID), logicalFlow.Id, flowInfo); err != nil {
- return olterrors.NewErrPersistence("update", "flow", logicalFlow.Id, log.Fields{"flow": downstreamflow}, err)
- }
+
return nil
}
@@ -2846,10 +2639,10 @@
pbitToGem := make(map[uint32]uint32)
gemToAes := make(map[uint32]bool)
- var attributes []tp.IGemPortAttribute
+ var attributes []*tp_pb.GemPortAttributes
var direction = tp_pb.Direction_UPSTREAM
switch TpInst := TpInst.(type) {
- case *tp.TechProfile:
+ case *tp_pb.TechProfileInstance:
if IsUpstream(actionInfo[Output].(uint32)) {
attributes = TpInst.UpstreamGemPortAttributeList
} else {
@@ -2881,9 +2674,9 @@
}
}
} else { // Extract the exact gemport which maps to the PCP classifier in the flow
- if gem := f.techprofile[intfID].GetGemportForPbit(ctx, TpInst, direction, pcp.(uint32)); gem != nil {
- gemPortID = gem.(tp.IGemPortAttribute).GemportID
- gemToAes[gemPortID], _ = strconv.ParseBool(gem.(tp.IGemPortAttribute).AesEncryption)
+ if gem := f.techprofile.GetGemportForPbit(ctx, TpInst, direction, pcp.(uint32)); gem != nil {
+ gemPortID = gem.(*tp_pb.GemPortAttributes).GemportId
+ gemToAes[gemPortID], _ = strconv.ParseBool(gem.(*tp_pb.GemPortAttributes).AesEncryption)
}
}
@@ -2981,26 +2774,26 @@
}
// Send Techprofile download event to child device in go routine as it takes time
go func() {
- if err := f.sendTPDownloadMsgToChild(ctx, intfID, onuID, uniID, uni, tpID); err != nil {
+ if err := f.sendTPDownloadMsgToChild(ctx, intfID, onuID, uniID, uni, tpID, *(TpInst.(*tp_pb.TechProfileInstance))); err != nil {
logger.Warn(ctx, err)
}
}()
}
func (f *OpenOltFlowMgr) isGemPortUsedByAnotherFlow(gemPortID uint32) bool {
- f.flowsUsedByGemPortKey.RLock()
- flowIDList := f.flowsUsedByGemPort[gemPortID]
- f.flowsUsedByGemPortKey.RUnlock()
+ f.gemToFlowIDsKey.RLock()
+ flowIDList := f.gemToFlowIDs[gemPortID]
+ f.gemToFlowIDsKey.RUnlock()
return len(flowIDList) > 1
}
-func (f *OpenOltFlowMgr) isTechProfileUsedByAnotherGem(ctx context.Context, ponIntf uint32, onuID uint32, uniID uint32, tpInst *tp.TechProfile, gemPortID uint32) (bool, uint32) {
+func (f *OpenOltFlowMgr) isTechProfileUsedByAnotherGem(ctx context.Context, ponIntf uint32, onuID uint32, uniID uint32, tpInst *tp_pb.TechProfileInstance, gemPortID uint32) (bool, uint32) {
currentGemPorts := f.resourceMgr.GetCurrentGEMPortIDsForOnu(ctx, ponIntf, onuID, uniID)
tpGemPorts := tpInst.UpstreamGemPortAttributeList
for _, currentGemPort := range currentGemPorts {
for _, tpGemPort := range tpGemPorts {
- if (currentGemPort == tpGemPort.GemportID) && (currentGemPort != gemPortID) {
+ if (currentGemPort == tpGemPort.GemportId) && (currentGemPort != gemPortID) {
return true, currentGemPort
}
}
@@ -3010,21 +2803,21 @@
}
func (f *OpenOltFlowMgr) isAllocUsedByAnotherUNI(ctx context.Context, sq schedQueue) bool {
- tpInst := sq.tpInst.(*tp.TechProfile)
- if tpInst.InstanceCtrl.Onu == "single-instance" && sq.direction == tp_pb.Direction_UPSTREAM {
- tpInstances := f.techprofile[sq.intfID].FindAllTpInstances(ctx, f.deviceHandler.device.Id, sq.tpID, sq.intfID, sq.onuID).([]tp.TechProfile)
+ tpInst := sq.tpInst.(*tp_pb.TechProfileInstance)
+ if tpInst.InstanceControl.Onu == "single-instance" && sq.direction == tp_pb.Direction_UPSTREAM {
+ tpInstances := f.techprofile.FindAllTpInstances(ctx, f.deviceHandler.device.Id, sq.tpID, sq.intfID, sq.onuID).([]tp_pb.TechProfileInstance)
logger.Debugw(ctx, "got-single-instance-tp-instances", log.Fields{"tp-instances": tpInstances})
for i := 0; i < len(tpInstances); i++ {
tpI := tpInstances[i]
if tpI.SubscriberIdentifier != tpInst.SubscriberIdentifier &&
- tpI.UsScheduler.AllocID == tpInst.UsScheduler.AllocID {
+ tpI.UsScheduler.AllocId == tpInst.UsScheduler.AllocId {
logger.Debugw(ctx, "alloc-is-in-use",
log.Fields{
"device-id": f.deviceHandler.device.Id,
"intfID": sq.intfID,
"onuID": sq.onuID,
"uniID": sq.uniID,
- "allocID": tpI.UsScheduler.AllocID,
+ "allocID": tpI.UsScheduler.AllocId,
})
return true
}
@@ -3250,7 +3043,7 @@
logger.Debugw(ctx, "invalid-action-port-number",
log.Fields{
"port-number": action[Output].(uint32),
- "error": err})
+ "err": err})
return uint32(0), err
}
logger.Infow(ctx, "output-nni-intfId-is", log.Fields{"intf-id": intfID})
@@ -3261,7 +3054,7 @@
logger.Debugw(ctx, "invalid-classifier-port-number",
log.Fields{
"port-number": action[Output].(uint32),
- "error": err})
+ "err": err})
return uint32(0), err
}
logger.Infow(ctx, "input-nni-intfId-is", log.Fields{"intf-id": intfID})
@@ -3331,69 +3124,39 @@
return 0, 0, nil
}
-// AddUniPortToOnuInfo adds uni port to the onugem info both in cache and kvstore.
-func (f *OpenOltFlowMgr) AddUniPortToOnuInfo(ctx context.Context, intfID uint32, onuID uint32, portNum uint32) {
-
- f.onuGemInfoLock.Lock()
- defer f.onuGemInfoLock.Unlock()
-
- onugem := f.onuGemInfo
- for idx, onu := range onugem {
- if onu.OnuID == onuID {
- for _, uni := range onu.UniPorts {
- if uni == portNum {
- logger.Infow(ctx, "uni-already-in-cache--no-need-to-update-cache-and-kv-store", log.Fields{"uni": portNum})
- return
+func (f *OpenOltFlowMgr) loadFlowIDsForGemAndGemIDsForFlow(ctx context.Context) {
+ logger.Debug(ctx, "loadFlowIDsForGemAndGemIDsForFlow - start")
+ f.onuGemInfoLock.RLock()
+ f.gemToFlowIDsKey.Lock()
+ f.flowIDToGemsLock.Lock()
+ for _, og := range f.onuGemInfoMap {
+ for _, gem := range og.GemPorts {
+ flowIDs, err := f.resourceMgr.GetFlowIDsForGem(ctx, f.ponPortIdx, gem)
+ if err == nil {
+ f.gemToFlowIDs[gem] = flowIDs
+ for _, flowID := range flowIDs {
+ if _, ok := f.flowIDToGems[flowID]; !ok {
+ f.flowIDToGems[flowID] = []uint32{gem}
+ } else {
+ f.flowIDToGems[flowID] = appendUnique32bit(f.flowIDToGems[flowID], gem)
+ }
}
}
- onugem[idx].UniPorts = append(onugem[idx].UniPorts, portNum)
- f.onuGemInfo = onugem
}
}
- f.resourceMgr.AddUniPortToOnuInfo(ctx, intfID, onuID, portNum)
-
-}
-
-func (f *OpenOltFlowMgr) loadFlowIDlistForGem(ctx context.Context, intf uint32) {
- flowIDsList, err := f.resourceMgr.GetFlowIDsGemMapForInterface(ctx, intf)
- if err != nil {
- logger.Error(ctx, "failed-to-get-flowid-list-per-gem", log.Fields{"intf": intf})
- return
- }
- f.flowsUsedByGemPortKey.Lock()
- for gem, FlowIDs := range flowIDsList {
- f.flowsUsedByGemPort[gem] = FlowIDs
- }
- f.flowsUsedByGemPortKey.Unlock()
+ f.flowIDToGemsLock.Unlock()
+ f.gemToFlowIDsKey.Unlock()
+ f.onuGemInfoLock.RUnlock()
+ logger.Debug(ctx, "loadFlowIDsForGemAndGemIDsForFlow - end")
}
//clearMulticastFlowFromResourceManager removes a multicast flow from the KV store and
// clears resources reserved for this multicast flow
func (f *OpenOltFlowMgr) clearMulticastFlowFromResourceManager(ctx context.Context, flow *ofp.OfpFlowStats) error {
- classifierInfo := make(map[string]interface{})
- var flowInfo *rsrcMgr.FlowInfo
- formulateClassifierInfoFromFlow(ctx, classifierInfo, flow)
- networkInterfaceID, err := f.getNNIInterfaceIDOfMulticastFlow(ctx, classifierInfo)
-
- if err != nil {
- logger.Warnw(ctx, "no-inport-found--cannot-release-resources-of-the-multicast-flow", log.Fields{"flowId:": flow.Id})
- return err
- }
-
- var onuID = int32(NoneOnuID)
- var uniID = int32(NoneUniID)
- if flowInfo = f.resourceMgr.GetFlowIDInfo(ctx, networkInterfaceID, onuID, uniID, flow.Id); flowInfo == nil {
- return olterrors.NewErrPersistence("remove", "flow", flow.Id,
- log.Fields{
- "flow": flow,
- "device-id": f.deviceHandler.device.Id,
- "intf-id": networkInterfaceID,
- "onu-id": onuID}, err).Log()
- }
- removeFlowMessage := openoltpb2.Flow{FlowId: flow.Id, FlowType: flowInfo.Flow.FlowType}
+ removeFlowMessage := openoltpb2.Flow{FlowId: flow.Id, FlowType: Multicast}
logger.Debugw(ctx, "multicast-flow-to-be-deleted",
log.Fields{
- "flow": flowInfo.Flow,
+ "flow": flow,
"flow-id": flow.Id,
"device-id": f.deviceHandler.device.Id})
// Remove from device
@@ -3402,60 +3165,44 @@
logger.Errorw(ctx, "failed-to-remove-multicast-flow",
log.Fields{
"flow-id": flow.Id,
- "error": err})
+ "err": err})
return err
}
- // Remove flow from KV store
- return f.resourceMgr.RemoveFlowIDInfo(ctx, networkInterfaceID, onuID, uniID, flow.Id)
+
+ return nil
}
-// reconcileSubscriberDataPathFlowIDMap reconciles subscriberDataPathFlowIDMap from KV store
-func (f *OpenOltFlowMgr) reconcileSubscriberDataPathFlowIDMap(ctx context.Context) {
- onuGemInfo, err := f.resourceMgr.GetOnuGemInfo(ctx, f.ponPortIdx)
+func (f *OpenOltFlowMgr) getTechProfileDownloadMessage(ctx context.Context, tpPath string, ponID uint32, onuID uint32, uniID uint32) *ic.InterAdapterTechProfileDownloadMessage {
+ tpInst, err := f.techprofile.GetTPInstance(ctx, tpPath)
if err != nil {
- _ = olterrors.NewErrNotFound("onu", log.Fields{
- "pon-port": f.ponPortIdx}, err).Log()
- return
+ logger.Errorw(ctx, "error-fetching-tp-instance", log.Fields{"tpPath": tpPath})
+ return nil
}
- f.subscriberDataPathFlowIDMapLock.Lock()
- defer f.subscriberDataPathFlowIDMapLock.Unlock()
-
- for _, onu := range onuGemInfo {
- for _, uniID := range onu.UniPorts {
- flowIDs, err := f.resourceMgr.GetCurrentFlowIDsForOnu(ctx, onu.IntfID, int32(onu.OnuID), int32(uniID))
- if err != nil {
- logger.Fatalf(ctx, "failed-to-read-flow-ids-of-onu-during-reconciliation")
- }
- for _, flowID := range flowIDs {
- flowInfo := f.resourceMgr.GetFlowIDInfo(ctx, onu.IntfID, int32(onu.OnuID), int32(uniID), flowID)
- if flowInfo == nil {
- // Error is already logged in the called function
- continue
- }
- if flowInfo.Flow.Classifier.PktTagType == DoubleTag &&
- flowInfo.Flow.FlowType == Downstream &&
- flowInfo.Flow.Classifier.OVid > 0 &&
- flowInfo.Flow.TechProfileId > 0 {
- key := subscriberDataPathFlowIDKey{intfID: onu.IntfID, onuID: onu.OnuID, uniID: uniID, direction: flowInfo.Flow.FlowType, tpID: flowInfo.Flow.TechProfileId}
- if _, ok := f.subscriberDataPathFlowIDMap[key]; !ok {
- f.subscriberDataPathFlowIDMap[key] = flowInfo.Flow.FlowId
- }
- } else if flowInfo.Flow.Classifier.PktTagType == SingleTag &&
- flowInfo.Flow.FlowType == Upstream &&
- flowInfo.Flow.Action.OVid > 0 &&
- flowInfo.Flow.TechProfileId > 0 {
- key := subscriberDataPathFlowIDKey{intfID: onu.IntfID, onuID: onu.OnuID, uniID: uniID, direction: flowInfo.Flow.FlowType, tpID: flowInfo.Flow.TechProfileId}
- if _, ok := f.subscriberDataPathFlowIDMap[key]; !ok {
- f.subscriberDataPathFlowIDMap[key] = flowInfo.Flow.FlowId
- }
- }
- }
+ switch tpInst := tpInst.(type) {
+ case *tp_pb.TechProfileInstance:
+ logger.Debugw(ctx, "fetched-tp-instance-successfully--formulating-tp-download-msg", log.Fields{"tpPath": tpPath})
+ return &ic.InterAdapterTechProfileDownloadMessage{UniId: uniID,
+ TpInstancePath: tpPath,
+ TechTpInstance: &ic.InterAdapterTechProfileDownloadMessage_TpInstance{TpInstance: tpInst},
}
+ case *openoltpb2.EponTechProfileInstance:
+ return &ic.InterAdapterTechProfileDownloadMessage{UniId: uniID,
+ TpInstancePath: tpPath,
+ TechTpInstance: &ic.InterAdapterTechProfileDownloadMessage_EponTpInstance{EponTpInstance: tpInst},
+ }
+ default:
+ logger.Errorw(ctx, "unknown-tech", log.Fields{"tpPath": tpPath})
}
+ return nil
}
-// isDatapathFlow declares a flow as datapath flow if it is not a controller bound flow and the flow does not have group
-func isDatapathFlow(flow *ofp.OfpFlowStats) bool {
- return !IsControllerBoundFlow(flows.GetOutPort(flow)) && !flows.HasGroup(flow)
+func (f *OpenOltFlowMgr) getOnuGemInfoList() []rsrcMgr.OnuGemInfo {
+ var onuGemInfoLst []rsrcMgr.OnuGemInfo
+ f.onuGemInfoLock.RLock()
+ defer f.onuGemInfoLock.RUnlock()
+ for _, v := range f.onuGemInfoMap {
+ onuGemInfoLst = append(onuGemInfoLst, *v)
+ }
+ return onuGemInfoLst
}
diff --git a/internal/pkg/core/openolt_flowmgr_test.go b/internal/pkg/core/openolt_flowmgr_test.go
index 44a02fb..214fa61 100644
--- a/internal/pkg/core/openolt_flowmgr_test.go
+++ b/internal/pkg/core/openolt_flowmgr_test.go
@@ -29,15 +29,10 @@
"github.com/opencord/voltha-protos/v4/go/voltha"
- "github.com/opencord/voltha-lib-go/v4/pkg/db"
- fu "github.com/opencord/voltha-lib-go/v4/pkg/flows"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
- tp "github.com/opencord/voltha-lib-go/v4/pkg/techprofile"
- "github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager"
+ fu "github.com/opencord/voltha-lib-go/v5/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
rsrcMgr "github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager"
- "github.com/opencord/voltha-openolt-adapter/pkg/mocks"
ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
- "github.com/opencord/voltha-protos/v4/go/openolt"
openoltpb2 "github.com/opencord/voltha-protos/v4/go/openolt"
tp_pb "github.com/opencord/voltha-protos/v4/go/tech_profile"
)
@@ -48,41 +43,11 @@
_, _ = log.SetDefaultLogger(log.JSON, log.DebugLevel, nil)
flowMgr = newMockFlowmgr()
}
-func newMockResourceMgr() *resourcemanager.OpenOltResourceMgr {
- ranges := []*openolt.DeviceInfo_DeviceResourceRanges{
- {
- IntfIds: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
- Technology: "Default",
- },
- }
-
- deviceinfo := &openolt.DeviceInfo{Vendor: "openolt", Model: "openolt", HardwareVersion: "1.0", FirmwareVersion: "1.0",
- DeviceId: "olt", DeviceSerialNumber: "openolt", PonPorts: 16, Technology: "Default",
- OnuIdStart: OnuIDStart, OnuIdEnd: OnuIDEnd, AllocIdStart: AllocIDStart, AllocIdEnd: AllocIDEnd,
- GemportIdStart: GemIDStart, GemportIdEnd: GemIDEnd, FlowIdStart: FlowIDStart, FlowIdEnd: FlowIDEnd,
- Ranges: ranges,
- }
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- rsrMgr := resourcemanager.NewResourceMgr(ctx, "olt", "127.0.0.1:2379", "etcd", "olt", deviceinfo, "service/voltha")
- for key := range rsrMgr.ResourceMgrs {
- rsrMgr.ResourceMgrs[key].KVStore = &db.Backend{}
- rsrMgr.ResourceMgrs[key].KVStore.Client = &mocks.MockKVClient{}
- rsrMgr.ResourceMgrs[key].TechProfileMgr = mocks.MockTechProfile{TpID: key}
- }
- return rsrMgr
-}
func newMockFlowmgr() []*OpenOltFlowMgr {
- rMgr := newMockResourceMgr()
dh := newMockDeviceHandler()
- rMgr.KVStore = &db.Backend{}
- rMgr.KVStore.Client = &mocks.MockKVClient{}
-
- dh.resourceMgr = rMgr
-
- // onuGemInfo := make([]rsrcMgr.OnuGemInfo, NumPonPorts)
+	// onuGemInfoMap := make(map[uint32]*rsrcMgr.OnuGemInfo)
var i uint32
for i = 0; i < NumPonPorts; i++ {
@@ -90,11 +55,7 @@
packetInGemPort[rsrcMgr.PacketInInfoKey{IntfID: i, OnuID: i + 1, LogicalPort: i + 1, VlanID: uint16(i), Priority: uint8(i)}] = i + 1
dh.flowMgr[i].packetInGemPort = packetInGemPort
- tps := make(map[uint32]tp.TechProfileIf)
- for key := range rMgr.ResourceMgrs {
- tps[key] = mocks.MockTechProfile{TpID: key}
- }
- dh.flowMgr[i].techprofile = tps
+ dh.flowMgr[i].techprofile = dh.resourceMgr[i].PonRsrMgr.TechProfileMgr
interface2mcastQeueuMap := make(map[uint32]*QueueInfoBrief)
interface2mcastQeueuMap[0] = &QueueInfoBrief{
gemPortID: 4000,
@@ -102,21 +63,22 @@
}
dh.flowMgr[i].grpMgr.interfaceToMcastQueueMap = interface2mcastQeueuMap
}
-
return dh.flowMgr
}
func TestOpenOltFlowMgr_CreateSchedulerQueues(t *testing.T) {
- tprofile := &tp.TechProfile{Name: "tp1", SubscriberIdentifier: "subscriber1",
+ tprofile := &tp_pb.TechProfileInstance{Name: "tp1", SubscriberIdentifier: "subscriber1",
ProfileType: "pt1", NumGemPorts: 1, Version: 1,
- InstanceCtrl: tp.InstanceControl{Onu: "1", Uni: "1", MaxGemPayloadSize: "1"},
+ InstanceControl: &tp_pb.InstanceControl{Onu: "1", Uni: "1", MaxGemPayloadSize: "1"},
}
- tprofile.UsScheduler.Direction = "UPSTREAM"
- tprofile.UsScheduler.QSchedPolicy = "WRR"
+ tprofile.UsScheduler = &openoltpb2.SchedulerAttributes{}
+ tprofile.UsScheduler.Direction = tp_pb.Direction_UPSTREAM
+ tprofile.UsScheduler.QSchedPolicy = tp_pb.SchedulingPolicy_WRR
tprofile2 := tprofile
- tprofile2.DsScheduler.Direction = "DOWNSTREAM"
- tprofile2.DsScheduler.QSchedPolicy = "WRR"
+ tprofile2.DsScheduler = &openoltpb2.SchedulerAttributes{}
+ tprofile2.DsScheduler.Direction = tp_pb.Direction_DOWNSTREAM
+ tprofile2.DsScheduler.QSchedPolicy = tp_pb.SchedulingPolicy_WRR
tests := []struct {
name string
@@ -135,17 +97,17 @@
{"CreateSchedulerQueues-19", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 1, createFlowMetadata(tprofile, 5, Upstream)}, false},
{"CreateSchedulerQueues-20", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 1, createFlowMetadata(tprofile2, 5, Downstream)}, false},
- {"CreateSchedulerQueues-1", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 1, createFlowMetadata(tprofile, 0, Upstream)}, true},
- {"CreateSchedulerQueues-2", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 1, createFlowMetadata(tprofile2, 0, Downstream)}, true},
+ {"CreateSchedulerQueues-1", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 1, createFlowMetadata(tprofile, 0, Upstream)}, false},
+ {"CreateSchedulerQueues-2", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 1, createFlowMetadata(tprofile2, 0, Downstream)}, false},
{"CreateSchedulerQueues-3", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 2, createFlowMetadata(tprofile, 2, Upstream)}, true},
{"CreateSchedulerQueues-4", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 2, createFlowMetadata(tprofile2, 2, Downstream)}, true},
{"CreateSchedulerQueues-5", schedQueue{tp_pb.Direction_UPSTREAM, 1, 2, 2, 64, 2, tprofile, 2, createFlowMetadata(tprofile, 3, Upstream)}, true},
{"CreateSchedulerQueues-6", schedQueue{tp_pb.Direction_DOWNSTREAM, 1, 2, 2, 65, 2, tprofile2, 2, createFlowMetadata(tprofile2, 3, Downstream)}, true},
//Negative testcases
- {"CreateSchedulerQueues-7", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 1, &voltha.FlowMetadata{}}, true},
+ {"CreateSchedulerQueues-7", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 1, &voltha.FlowMetadata{}}, false},
{"CreateSchedulerQueues-8", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 0, &voltha.FlowMetadata{}}, true},
- {"CreateSchedulerQueues-9", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 1, &voltha.FlowMetadata{}}, true},
+ {"CreateSchedulerQueues-9", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 1, &voltha.FlowMetadata{}}, false},
{"CreateSchedulerQueues-10", schedQueue{tp_pb.Direction_UPSTREAM, 0, 1, 1, 64, 1, tprofile, 2, &voltha.FlowMetadata{}}, true},
{"CreateSchedulerQueues-11", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 2, &voltha.FlowMetadata{}}, true},
{"CreateSchedulerQueues-12", schedQueue{tp_pb.Direction_DOWNSTREAM, 0, 1, 1, 65, 1, tprofile2, 2, nil}, true},
@@ -161,35 +123,35 @@
}
}
-func createFlowMetadata(techProfile *tp.TechProfile, tcontType int, direction string) *voltha.FlowMetadata {
- var additionalBw string
+func createFlowMetadata(techProfile *tp_pb.TechProfileInstance, tcontType int, direction string) *voltha.FlowMetadata {
+ var additionalBw openoltpb2.AdditionalBW
bands := make([]*ofp.OfpMeterBandHeader, 0)
switch tcontType {
case 1:
//tcont-type-1
bands = append(bands, &ofp.OfpMeterBandHeader{Type: ofp.OfpMeterBandType_OFPMBT_DROP, Rate: 10000, BurstSize: 0, Data: &ofp.OfpMeterBandHeader_Drop{}})
bands = append(bands, &ofp.OfpMeterBandHeader{Type: ofp.OfpMeterBandType_OFPMBT_DROP, Rate: 10000, BurstSize: 0, Data: &ofp.OfpMeterBandHeader_Drop{}})
- additionalBw = "AdditionalBW_None"
+ additionalBw = tp_pb.AdditionalBW_AdditionalBW_None
case 2:
//tcont-type-2
bands = append(bands, &ofp.OfpMeterBandHeader{Type: ofp.OfpMeterBandType_OFPMBT_DROP, Rate: 60000, BurstSize: 10000, Data: &ofp.OfpMeterBandHeader_Drop{}})
bands = append(bands, &ofp.OfpMeterBandHeader{Type: ofp.OfpMeterBandType_OFPMBT_DROP, Rate: 50000, BurstSize: 10000, Data: &ofp.OfpMeterBandHeader_Drop{}})
- additionalBw = "AdditionalBW_None"
+ additionalBw = tp_pb.AdditionalBW_AdditionalBW_None
case 3:
//tcont-type-3
bands = append(bands, &ofp.OfpMeterBandHeader{Type: ofp.OfpMeterBandType_OFPMBT_DROP, Rate: 100000, BurstSize: 10000, Data: &ofp.OfpMeterBandHeader_Drop{}})
bands = append(bands, &ofp.OfpMeterBandHeader{Type: ofp.OfpMeterBandType_OFPMBT_DROP, Rate: 50000, BurstSize: 20000, Data: &ofp.OfpMeterBandHeader_Drop{}})
- additionalBw = "AdditionalBW_NA"
+ additionalBw = tp_pb.AdditionalBW_AdditionalBW_NA
case 4:
//tcont-type-4
bands = append(bands, &ofp.OfpMeterBandHeader{Type: ofp.OfpMeterBandType_OFPMBT_DROP, Rate: 200000, BurstSize: 10000, Data: &ofp.OfpMeterBandHeader_Drop{}})
- additionalBw = "AdditionalBW_BestEffort"
+ additionalBw = tp_pb.AdditionalBW_AdditionalBW_BestEffort
case 5:
//tcont-type-5
bands = append(bands, &ofp.OfpMeterBandHeader{Type: ofp.OfpMeterBandType_OFPMBT_DROP, Rate: 50000, BurstSize: 10000, Data: &ofp.OfpMeterBandHeader_Drop{}})
bands = append(bands, &ofp.OfpMeterBandHeader{Type: ofp.OfpMeterBandType_OFPMBT_DROP, Rate: 100000, BurstSize: 10000, Data: &ofp.OfpMeterBandHeader_Drop{}})
bands = append(bands, &ofp.OfpMeterBandHeader{Type: ofp.OfpMeterBandType_OFPMBT_DROP, Rate: 10000, BurstSize: 0, Data: &ofp.OfpMeterBandHeader_Drop{}})
- additionalBw = "AdditionalBW_BestEffort"
+ additionalBw = tp_pb.AdditionalBW_AdditionalBW_BestEffort
default:
// do nothing, we will return meter config with no meter bands
}
@@ -206,18 +168,20 @@
}
func TestOpenOltFlowMgr_RemoveSchedulerQueues(t *testing.T) {
- tprofile := &tp.TechProfile{Name: "tp1", SubscriberIdentifier: "subscriber1",
+ tprofile := &tp_pb.TechProfileInstance{Name: "tp1", SubscriberIdentifier: "subscriber1",
ProfileType: "pt1", NumGemPorts: 1, Version: 1,
- InstanceCtrl: tp.InstanceControl{Onu: "1", Uni: "1", MaxGemPayloadSize: "1"},
+ InstanceControl: &tp_pb.InstanceControl{Onu: "1", Uni: "1", MaxGemPayloadSize: "1"},
}
- tprofile.UsScheduler.Direction = "UPSTREAM"
- tprofile.UsScheduler.AdditionalBw = "AdditionalBW_None"
- tprofile.UsScheduler.QSchedPolicy = "WRR"
+ tprofile.UsScheduler = &openoltpb2.SchedulerAttributes{}
+ tprofile.UsScheduler.Direction = tp_pb.Direction_UPSTREAM
+ tprofile.UsScheduler.AdditionalBw = tp_pb.AdditionalBW_AdditionalBW_None
+ tprofile.UsScheduler.QSchedPolicy = tp_pb.SchedulingPolicy_WRR
tprofile2 := tprofile
- tprofile2.DsScheduler.Direction = "DOWNSTREAM"
- tprofile2.DsScheduler.AdditionalBw = "AdditionalBW_None"
- tprofile2.DsScheduler.QSchedPolicy = "WRR"
+ tprofile2.DsScheduler = &openoltpb2.SchedulerAttributes{}
+ tprofile2.DsScheduler.Direction = tp_pb.Direction_DOWNSTREAM
+ tprofile2.DsScheduler.AdditionalBw = tp_pb.AdditionalBW_AdditionalBW_None
+ tprofile2.DsScheduler.QSchedPolicy = tp_pb.SchedulingPolicy_WRR
//defTprofile := &tp.DefaultTechProfile{}
tests := []struct {
name string
@@ -267,7 +231,6 @@
args args
}{
{"createTcontGemports-1", args{intfID: 0, onuID: 1, uniID: 1, uni: "16", uniPort: 1, TpID: 64, UsMeterID: 1, DsMeterID: 1, flowMetadata: flowmetadata}},
- {"createTcontGemports-1", args{intfID: 0, onuID: 1, uniID: 1, uni: "16", uniPort: 1, TpID: 65, UsMeterID: 1, DsMeterID: 1, flowMetadata: flowmetadata}},
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
@@ -275,11 +238,11 @@
t.Run(tt.name, func(t *testing.T) {
_, _, tpInst := flowMgr[tt.args.intfID].createTcontGemports(ctx, tt.args.intfID, tt.args.onuID, tt.args.uniID, tt.args.uni, tt.args.uniPort, tt.args.TpID, tt.args.UsMeterID, tt.args.DsMeterID, tt.args.flowMetadata)
switch tpInst := tpInst.(type) {
- case *tp.TechProfile:
+ case *tp_pb.TechProfileInstance:
if tt.args.TpID != 64 {
t.Errorf("OpenOltFlowMgr.createTcontGemports() error = different tech, tech %v", tpInst)
}
- case *tp.EponProfile:
+ case *tp_pb.EponTechProfileInstance:
if tt.args.TpID != 65 {
t.Errorf("OpenOltFlowMgr.createTcontGemports() error = different tech, tech %v", tpInst)
}
@@ -680,7 +643,7 @@
// clean the flowMgr
for i := 0; i < intfNum; i++ {
- flowMgr[i].onuGemInfo = make([]rsrcMgr.OnuGemInfo, 0)
+ flowMgr[i].onuGemInfoMap = make(map[uint32]*rsrcMgr.OnuGemInfo)
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
@@ -696,7 +659,6 @@
// Add gemPorts to OnuInfo in parallel threads
wg := sync.WaitGroup{}
-
for o := 1; o <= onuNum; o++ {
for i := 0; i < intfNum; i++ {
wg.Add(1)
@@ -711,15 +673,15 @@
wg.Wait()
- // check that each entry of onuGemInfo has the correct number of ONUs
+ // check that each entry of onuGemInfoMap has the correct number of ONUs
for i := 0; i < intfNum; i++ {
- lenofOnu := len(flowMgr[i].onuGemInfo)
+ lenofOnu := len(flowMgr[i].onuGemInfoMap)
if onuNum != lenofOnu {
- t.Errorf("OnuGemInfo length is not as expected len = %d, want %d", lenofOnu, onuNum)
+ t.Errorf("onuGemInfoMap length is not as expected len = %d, want %d", lenofOnu, onuNum)
}
for o := 1; o <= onuNum; o++ {
- lenOfGemPorts := len(flowMgr[i].onuGemInfo[o-1].GemPorts)
+ lenOfGemPorts := len(flowMgr[i].onuGemInfoMap[uint32(o)].GemPorts)
// check that each onuEntry has 1 gemPort
if lenOfGemPorts != 1 {
t.Errorf("Expected 1 GemPort per ONU, found %d", lenOfGemPorts)
@@ -727,7 +689,7 @@
// check that the value of the gemport is correct
gemID, _ := strconv.Atoi(fmt.Sprintf("90%d%d", i, o-1))
- currentValue := flowMgr[i].onuGemInfo[o-1].GemPorts[0]
+ currentValue := flowMgr[i].onuGemInfoMap[uint32(o)].GemPorts[0]
if uint32(gemID) != currentValue {
t.Errorf("Expected GemPort value to be %d, found %d", gemID, currentValue)
}
@@ -774,11 +736,11 @@
for _, gemPortDeleted := range tt.args.gemPortIDsToBeDeleted {
flowMgr[tt.args.intfID].deleteGemPortFromLocalCache(ctx, tt.args.intfID, tt.args.onuID, gemPortDeleted)
}
- lenofGemPorts := len(flowMgr[tt.args.intfID].onuGemInfo[0].GemPorts)
+ lenofGemPorts := len(flowMgr[tt.args.intfID].onuGemInfoMap[1].GemPorts)
if lenofGemPorts != tt.args.finalLength {
t.Errorf("GemPorts length is not as expected len = %d, want %d", lenofGemPorts, tt.args.finalLength)
}
- gemPorts := flowMgr[tt.args.intfID].onuGemInfo[0].GemPorts
+ gemPorts := flowMgr[tt.args.intfID].onuGemInfoMap[1].GemPorts
if !reflect.DeepEqual(tt.args.gemPortIDsRemaining, gemPorts) {
t.Errorf("GemPorts are not as expected = %v, want %v", gemPorts, tt.args.gemPortIDsRemaining)
}
@@ -798,11 +760,11 @@
wantErr bool
}{
// TODO: Add test cases.
- {"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 0, GemportId: 255, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 1, false},
- {"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "nni", IntfId: 0, GemportId: 1, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 1048576, false},
+ {"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 0, GemportId: 255, OnuId: 1, UniId: 0, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 1, false},
+ {"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "nni", IntfId: 0, GemportId: 1, OnuId: 1, UniId: 0, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 1048576, false},
// Negative Test cases.
- {"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 1, GemportId: 1, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 0, true},
- {"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 0, GemportId: 257, FlowId: 100, PortNo: 0, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 16, false},
+ {"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 1, GemportId: 1, OnuId: 1, UniId: 0, FlowId: 100, PortNo: 1, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 1, false},
+ {"GetLogicalPortFromPacketIn", args{packetIn: &openoltpb2.PacketIndication{IntfType: "pon", IntfId: 0, GemportId: 257, OnuId: 1, UniId: 0, FlowId: 100, PortNo: 0, Cookie: 100, Pkt: []byte("GetLogicalPortFromPacketIn")}}, 16, false},
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
@@ -1067,51 +1029,55 @@
// So just return in case of error
return
}
-
- TpInst := &tp.TechProfile{
+ /*
+	usGemList := make([]*tp_pb.GemPortAttributes, 0, 4)
+	usGemList = append(usGemList, &tp_pb.GemPortAttributes{})
+	usGemList = append(usGemList, &tp_pb.GemPortAttributes{})
+	usGemList = append(usGemList, &tp_pb.GemPortAttributes{})
+	usGemList = append(usGemList, &tp_pb.GemPortAttributes{})
+	dsGemList := make([]*tp_pb.GemPortAttributes, 0, 4)
+	dsGemList = append(dsGemList, &tp_pb.GemPortAttributes{})
+	dsGemList = append(dsGemList, &tp_pb.GemPortAttributes{})
+	dsGemList = append(dsGemList, &tp_pb.GemPortAttributes{})
+	dsGemList = append(dsGemList, &tp_pb.GemPortAttributes{})
+ */
+ TpInst := &tp_pb.TechProfileInstance{
Name: "Test-Tech-Profile",
SubscriberIdentifier: "257",
ProfileType: "Mock",
Version: 1,
NumGemPorts: 4,
- InstanceCtrl: tp.InstanceControl{
+ InstanceControl: &tp_pb.InstanceControl{
Onu: "1",
Uni: "16",
},
+ UsScheduler: &openoltpb2.SchedulerAttributes{},
+ DsScheduler: &openoltpb2.SchedulerAttributes{},
}
TpInst.UsScheduler.Priority = 1
- TpInst.UsScheduler.Direction = "upstream"
- TpInst.UsScheduler.AllocID = 1
- TpInst.UsScheduler.AdditionalBw = "None"
- TpInst.UsScheduler.QSchedPolicy = "PQ"
+ TpInst.UsScheduler.Direction = tp_pb.Direction_UPSTREAM
+ TpInst.UsScheduler.AllocId = 1
+ TpInst.UsScheduler.AdditionalBw = tp_pb.AdditionalBW_AdditionalBW_None
+ TpInst.UsScheduler.QSchedPolicy = tp_pb.SchedulingPolicy_WRR
TpInst.UsScheduler.Weight = 4
TpInst.DsScheduler.Priority = 1
- TpInst.DsScheduler.Direction = "upstream"
- TpInst.DsScheduler.AllocID = 1
- TpInst.DsScheduler.AdditionalBw = "None"
- TpInst.DsScheduler.QSchedPolicy = "PQ"
+ TpInst.DsScheduler.Direction = tp_pb.Direction_DOWNSTREAM
+ TpInst.DsScheduler.AllocId = 1
+ TpInst.DsScheduler.AdditionalBw = tp_pb.AdditionalBW_AdditionalBW_None
+ TpInst.DsScheduler.QSchedPolicy = tp_pb.SchedulingPolicy_WRR
TpInst.DsScheduler.Weight = 4
+ TpInst.UpstreamGemPortAttributeList = make([]*tp_pb.GemPortAttributes, 0)
+ TpInst.UpstreamGemPortAttributeList = append(TpInst.UpstreamGemPortAttributeList, &tp_pb.GemPortAttributes{GemportId: 1, PbitMap: "0b00000011"})
+ TpInst.UpstreamGemPortAttributeList = append(TpInst.UpstreamGemPortAttributeList, &tp_pb.GemPortAttributes{GemportId: 2, PbitMap: "0b00001100"})
+ TpInst.UpstreamGemPortAttributeList = append(TpInst.UpstreamGemPortAttributeList, &tp_pb.GemPortAttributes{GemportId: 3, PbitMap: "0b00110000"})
+ TpInst.UpstreamGemPortAttributeList = append(TpInst.UpstreamGemPortAttributeList, &tp_pb.GemPortAttributes{GemportId: 4, PbitMap: "0b11000000"})
- TpInst.UpstreamGemPortAttributeList = make([]tp.IGemPortAttribute, 4)
- TpInst.UpstreamGemPortAttributeList[0].GemportID = 1
- TpInst.UpstreamGemPortAttributeList[0].PbitMap = "0b00000011"
- TpInst.UpstreamGemPortAttributeList[0].GemportID = 2
- TpInst.UpstreamGemPortAttributeList[0].PbitMap = "0b00001100"
- TpInst.UpstreamGemPortAttributeList[0].GemportID = 3
- TpInst.UpstreamGemPortAttributeList[0].PbitMap = "0b00110000"
- TpInst.UpstreamGemPortAttributeList[0].GemportID = 4
- TpInst.UpstreamGemPortAttributeList[0].PbitMap = "0b11000000"
-
- TpInst.DownstreamGemPortAttributeList = make([]tp.IGemPortAttribute, 4)
- TpInst.DownstreamGemPortAttributeList[0].GemportID = 1
- TpInst.DownstreamGemPortAttributeList[0].PbitMap = "0b00000011"
- TpInst.DownstreamGemPortAttributeList[0].GemportID = 2
- TpInst.DownstreamGemPortAttributeList[0].PbitMap = "0b00001100"
- TpInst.DownstreamGemPortAttributeList[0].GemportID = 3
- TpInst.DownstreamGemPortAttributeList[0].PbitMap = "0b00110000"
- TpInst.DownstreamGemPortAttributeList[0].GemportID = 4
- TpInst.DownstreamGemPortAttributeList[0].PbitMap = "0b11000000"
+ TpInst.DownstreamGemPortAttributeList = make([]*tp_pb.GemPortAttributes, 0)
+ TpInst.DownstreamGemPortAttributeList = append(TpInst.DownstreamGemPortAttributeList, &tp_pb.GemPortAttributes{GemportId: 1, PbitMap: "0b00000011"})
+ TpInst.DownstreamGemPortAttributeList = append(TpInst.DownstreamGemPortAttributeList, &tp_pb.GemPortAttributes{GemportId: 2, PbitMap: "0b00001100"})
+ TpInst.DownstreamGemPortAttributeList = append(TpInst.DownstreamGemPortAttributeList, &tp_pb.GemPortAttributes{GemportId: 3, PbitMap: "0b00110000"})
+ TpInst.DownstreamGemPortAttributeList = append(TpInst.DownstreamGemPortAttributeList, &tp_pb.GemPortAttributes{GemportId: 4, PbitMap: "0b11000000"})
type args struct {
args map[string]uint32
@@ -1123,7 +1089,7 @@
onuID uint32
uniID uint32
portNo uint32
- TpInst *tp.TechProfile
+ TpInst *tp_pb.TechProfileInstance
allocID []uint32
gemPorts []uint32
TpID uint32
diff --git a/internal/pkg/core/openolt_groupmgr.go b/internal/pkg/core/openolt_groupmgr.go
index a87073b..4f633a7 100644
--- a/internal/pkg/core/openolt_groupmgr.go
+++ b/internal/pkg/core/openolt_groupmgr.go
@@ -16,8 +16,8 @@
import (
"context"
- "github.com/opencord/voltha-lib-go/v4/pkg/flows"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
rsrcMgr "github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager"
ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
diff --git a/internal/pkg/core/openolt_test.go b/internal/pkg/core/openolt_test.go
index 3933475..8665028 100644
--- a/internal/pkg/core/openolt_test.go
+++ b/internal/pkg/core/openolt_test.go
@@ -28,13 +28,13 @@
"reflect"
"testing"
- conf "github.com/opencord/voltha-lib-go/v4/pkg/config"
+ conf "github.com/opencord/voltha-lib-go/v5/pkg/config"
- com "github.com/opencord/voltha-lib-go/v4/pkg/adapters/common"
- "github.com/opencord/voltha-lib-go/v4/pkg/events"
- fu "github.com/opencord/voltha-lib-go/v4/pkg/flows"
- "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ com "github.com/opencord/voltha-lib-go/v5/pkg/adapters/common"
+ "github.com/opencord/voltha-lib-go/v5/pkg/events"
+ fu "github.com/opencord/voltha-lib-go/v5/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v5/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/config"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
ic "github.com/opencord/voltha-protos/v4/go/inter_container"
diff --git a/internal/pkg/core/statsmanager.go b/internal/pkg/core/statsmanager.go
index 42c60d7..4d7d52d 100755
--- a/internal/pkg/core/statsmanager.go
+++ b/internal/pkg/core/statsmanager.go
@@ -25,7 +25,7 @@
"sync"
"time"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
"github.com/opencord/voltha-protos/v4/go/extension"
"github.com/opencord/voltha-protos/v4/go/openolt"
@@ -285,7 +285,7 @@
var Ports interface{}
Ports, _ = InitPorts(ctx, "nni", Dev.device.Id, 1)
StatMgr.NorthBoundPort, _ = Ports.(map[uint32]*NniPort)
- NumPonPorts := Dev.resourceMgr.DevInfo.GetPonPorts()
+ NumPonPorts := Dev.resourceMgr[0].DevInfo.GetPonPorts()
Ports, _ = InitPorts(ctx, "pon", Dev.device.Id, NumPonPorts)
StatMgr.SouthBoundPort, _ = Ports.(map[uint32]*PonPort)
if StatMgr.Device.openOLT.enableONUStats {