VOL-3419: OpenOLT adapter at scale constantly takes more than 10 seconds to react to flows
- Pass information to agent to do the flow replication
- Consolidate various locks in the adapter and remove redundant locks
- use voltha-proto version 4.0.2 and voltha-lib-go version 4.0.0
- Bump adapter version to 3.0.0
Change-Id: Ic053c54e5319bb1736ec74facfc79dd10058ecf5
diff --git a/internal/pkg/resourcemanager/resourcemanager.go b/internal/pkg/resourcemanager/resourcemanager.go
index 3f642cf..abfb73d 100755
--- a/internal/pkg/resourcemanager/resourcemanager.go
+++ b/internal/pkg/resourcemanager/resourcemanager.go
@@ -23,17 +23,18 @@
"errors"
"fmt"
"strconv"
+ "strings"
"sync"
"time"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
- "github.com/opencord/voltha-lib-go/v3/pkg/db"
- "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
- "github.com/opencord/voltha-lib-go/v3/pkg/log"
- ponrmgr "github.com/opencord/voltha-lib-go/v3/pkg/ponresourcemanager"
- ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
- "github.com/opencord/voltha-protos/v3/go/openolt"
+ "github.com/opencord/voltha-lib-go/v4/pkg/db"
+ "github.com/opencord/voltha-lib-go/v4/pkg/db/kvstore"
+ "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ ponrmgr "github.com/opencord/voltha-lib-go/v4/pkg/ponresourcemanager"
+ ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
+ "github.com/opencord/voltha-protos/v4/go/openolt"
)
const (
@@ -47,11 +48,12 @@
MeterIDPathSuffix = "{%d,%d,%d}/{%d}/meter_id/{%s}"
//NnniIntfID - nniintfids
NnniIntfID = "nniintfids"
- //OnuPacketINPathPrefix to be used as prefix for keys to be used to store packet-in gem ports
- OnuPacketINPathPrefix = "onu_packetin/{%d,%d,%d"
- // OnuPacketINPath path on the kvstore to store packetin gemport,which will be used for packetin, pcketout
- //format: onu_packetin/<intfid>,<onuid>,<logicalport>,<vlanId>,<priority>
- OnuPacketINPath = OnuPacketINPathPrefix + ",%d,%d}"
+ // OnuPacketINPathPrefix - path prefix where ONU packet-in vlanID/PCP is stored
+ //format: onu_packetin/{<intfid>,<onuid>,<logicalport>}
+ OnuPacketINPathPrefix = "onu_packetin/{%d,%d,%d}"
+ // OnuPacketINPath - path on the kvstore to store the packet-in gemport, which is used for packet-in and packet-out
+ //format: onu_packetin/{<intfid>,<onuid>,<logicalport>}/{<vlanId>,<priority>}
+ OnuPacketINPath = OnuPacketINPathPrefix + "/{%d,%d}"
//FlowIDsForGem flowids_per_gem/<intfid>
FlowIDsForGem = "flowids_per_gem/{%d}"
//McastQueuesForIntf multicast queues for pon interfaces
@@ -68,14 +70,19 @@
// We preserve the groups under "FlowGroupsCached" directory in the KV store temporarily. Having set members,
// we remove the group from the cached group store.
FlowGroupCached = "flow_groups_cached/{%d}"
+
+ //FlowIDPath - Path on the KV store for storing list of Flow IDs for a given subscriber
+ //Format: BasePathKvStore/<(pon_intf_id, onu_id, uni_id)>/flow_ids
+ FlowIDPath = "{%s}/flow_ids"
+ //FlowIDInfoPath - Used to store more metadata associated with the flow_id
+ //Format: BasePathKvStore/<(pon_intf_id, onu_id, uni_id)>/flow_id_info/<flow_id>
+ FlowIDInfoPath = "{%s}/flow_id_info/{%d}"
)
// FlowInfo holds the flow information
type FlowInfo struct {
- Flow *openolt.Flow
- FlowStoreCookie uint64
- FlowCategory string
- LogicalFlowID uint64
+ Flow *openolt.Flow
+ IsSymmtricFlow bool
}
// OnuGemInfo holds onu information along with gem port list and uni port list
@@ -119,12 +126,6 @@
AllocIDMgmtLock []sync.RWMutex
// This protects concurrent onu_id allocate/delete calls on a per PON port basis
OnuIDMgmtLock []sync.RWMutex
- // This protects concurrent flow_id allocate/delete calls. We do not need this on a
- // per PON port basis as flow IDs are unique across the OLT.
- FlowIDMgmtLock sync.RWMutex
-
- // This protects concurrent access to flowids_per_gem info stored on KV store
- flowIDToGemInfoLock sync.RWMutex
}
func newKVClient(ctx context.Context, storeType string, address string, timeout time.Duration) (kvstore.Client, error) {
@@ -164,6 +165,7 @@
func NewResourceMgr(ctx context.Context, deviceID string, KVStoreAddress string, kvStoreType string, deviceType string, devInfo *openolt.DeviceInfo) *OpenOltResourceMgr {
var ResourceMgr OpenOltResourceMgr
logger.Debugf(ctx, "Init new resource manager , address: %s, device-id: %s", KVStoreAddress, deviceID)
+ ResourceMgr.DeviceID = deviceID
ResourceMgr.Address = KVStoreAddress
ResourceMgr.DeviceType = deviceType
ResourceMgr.DevInfo = devInfo
@@ -249,7 +251,7 @@
GlobalPONRsrcMgr = RsrcMgrsByTech[technology]
}
for _, IntfID := range TechRange.IntfIds {
- ResourceMgr.ResourceMgrs[uint32(IntfID)] = RsrcMgrsByTech[technology]
+ ResourceMgr.ResourceMgrs[IntfID] = RsrcMgrsByTech[technology]
}
// self.initialize_device_resource_range_and_pool(resource_mgr, global_resource_mgr, arange)
InitializeDeviceResourceRangeAndPool(ctx, RsrcMgrsByTech[technology], GlobalPONRsrcMgr,
@@ -446,16 +448,16 @@
return 0, err
}
// Get ONU id for a provided pon interface ID.
- ONUID, err := RsrcMgr.ResourceMgrs[ponIntfID].GetResourceID(ctx, ponIntfID,
+ onuID, err := RsrcMgr.ResourceMgrs[ponIntfID].GetResourceID(ctx, ponIntfID,
ponrmgr.ONU_ID, 1)
if err != nil {
logger.Errorf(ctx, "Failed to get resource for interface %d for type %s",
ponIntfID, ponrmgr.ONU_ID)
return 0, err
}
- if ONUID != nil {
- RsrcMgr.ResourceMgrs[ponIntfID].InitResourceMap(ctx, fmt.Sprintf("%d,%d", ponIntfID, ONUID[0]))
- return ONUID[0], err
+ if onuID != nil {
+ RsrcMgr.ResourceMgrs[ponIntfID].InitResourceMap(ctx, fmt.Sprintf("%d,%d", ponIntfID, onuID[0]))
+ return onuID[0], err
}
return 0, err // return OnuID 0 on error
@@ -464,81 +466,170 @@
// GetFlowIDInfo returns the slice of flow info of the given pon-port
// Note: For flows which trap from the NNI and not really associated with any particular
// ONU (like LLDP), the onu_id and uni_id is set as -1. The intf_id is the NNI intf_id.
-func (RsrcMgr *OpenOltResourceMgr) GetFlowIDInfo(ctx context.Context, ponIntfID uint32, onuID int32, uniID int32, flowID uint32) *[]FlowInfo {
- var flows []FlowInfo
+func (RsrcMgr *OpenOltResourceMgr) GetFlowIDInfo(ctx context.Context, ponIntfID uint32, onuID int32, uniID int32, flowID uint64) *FlowInfo {
+ var flowInfo FlowInfo
- FlowPath := fmt.Sprintf("%d,%d,%d", ponIntfID, onuID, uniID)
- if err := RsrcMgr.ResourceMgrs[ponIntfID].GetFlowIDInfo(ctx, FlowPath, flowID, &flows); err != nil {
- logger.Errorw(ctx, "Error while getting flows from KV store", log.Fields{"flowId": flowID})
+ subs := fmt.Sprintf("%d,%d,%d", ponIntfID, onuID, uniID)
+ Path := fmt.Sprintf(FlowIDInfoPath, subs, flowID)
+ value, err := RsrcMgr.KVStore.Get(ctx, Path)
+ if err == nil {
+ if value != nil {
+ Val, err := toByte(value.Value)
+ if err != nil {
+ logger.Errorw(ctx, "Failed to convert flowinfo into byte array", log.Fields{"error": err, "subs": subs})
+ return nil
+ }
+ if err = json.Unmarshal(Val, &flowInfo); err != nil {
+ logger.Errorw(ctx, "Failed to unmarshal", log.Fields{"error": err, "subs": subs})
+ return nil
+ }
+ }
+ }
+ if flowInfo.Flow == nil {
+ logger.Debugw(ctx, "No flowInfo found in KV store", log.Fields{"subs": subs})
return nil
}
- if len(flows) == 0 {
- logger.Debugw(ctx, "No flowInfo found in KV store", log.Fields{"flowPath": FlowPath})
- return nil
- }
- return &flows
+ return &flowInfo
}
// GetCurrentFlowIDsForOnu fetches flow ID from the resource manager
// Note: For flows which trap from the NNI and not really associated with any particular
// ONU (like LLDP), the onu_id and uni_id is set as -1. The intf_id is the NNI intf_id.
-func (RsrcMgr *OpenOltResourceMgr) GetCurrentFlowIDsForOnu(ctx context.Context, PONIntfID uint32, ONUID int32, UNIID int32) []uint32 {
+func (RsrcMgr *OpenOltResourceMgr) GetCurrentFlowIDsForOnu(ctx context.Context, ponIntfID uint32, onuID int32, uniID int32) ([]uint64, error) {
- FlowPath := fmt.Sprintf("%d,%d,%d", PONIntfID, ONUID, UNIID)
- if mgrs, exist := RsrcMgr.ResourceMgrs[PONIntfID]; exist {
- return mgrs.GetCurrentFlowIDsForOnu(ctx, FlowPath)
+ subs := fmt.Sprintf("%d,%d,%d", ponIntfID, onuID, uniID)
+ path := fmt.Sprintf(FlowIDPath, subs)
+ // The list is stored as a JSON array; a read error is propagated by the final return instead of masquerading as an empty list.
+ var data []uint64
+ value, err := RsrcMgr.KVStore.Get(ctx, path)
+ if err == nil {
+ if value != nil {
+ Val, _ := toByte(value.Value)
+ if err = json.Unmarshal(Val, &data); err != nil {
+ logger.Error(ctx, "Failed to unmarshal")
+ return nil, err
+ }
+ }
}
- return nil
+ return data, err
}
// UpdateFlowIDInfo updates flow info for the given pon interface, onu id, and uni id
// Note: For flows which trap from the NNI and not really associated with any particular
// ONU (like LLDP), the onu_id and uni_id is set as -1. The intf_id is the NNI intf_id.
-func (RsrcMgr *OpenOltResourceMgr) UpdateFlowIDInfo(ctx context.Context, ponIntfID int32, onuID int32, uniID int32,
- flowID uint32, flowData *[]FlowInfo) error {
- FlowPath := fmt.Sprintf("%d,%d,%d", ponIntfID, onuID, uniID)
- return RsrcMgr.ResourceMgrs[uint32(ponIntfID)].UpdateFlowIDInfoForOnu(ctx, FlowPath, flowID, *flowData)
+func (RsrcMgr *OpenOltResourceMgr) UpdateFlowIDInfo(ctx context.Context, ponIntfID uint32, onuID int32, uniID int32,
+ flowID uint64, flowData FlowInfo) error {
+ // The flow info is stored as JSON under the key built from FlowIDInfoPath for this subscriber and flowID.
+ subs := fmt.Sprintf("%d,%d,%d", ponIntfID, onuID, uniID)
+ path := fmt.Sprintf(FlowIDInfoPath, subs, flowID)
+
+ var value []byte
+ var err error
+ value, err = json.Marshal(flowData)
+ if err != nil {
+ logger.Errorf(ctx, "failed to Marshal, resource path %s", path)
+ return err
+ }
+ // Stop on a failed write: otherwise the flow ID would be listed for the ONU with no flow info stored for it.
+ if err = RsrcMgr.KVStore.Put(ctx, path, value); err != nil {
+ logger.Errorf(ctx, "Failed to update resource %s", path)
+ return err
+ }
+ // Update the flowID list for the ONU
+ if err = RsrcMgr.UpdateFlowIDForOnu(ctx, ponIntfID, onuID, uniID, flowID, true); err != nil {
+ // If the operation fails, try to remove FlowInfo from the KV store
+ _ = RsrcMgr.KVStore.Delete(ctx, path)
+ return err
+ }
+ return nil
+}
-// GetFlowID return flow ID for a given pon interface id, onu id and uni id
-func (RsrcMgr *OpenOltResourceMgr) GetFlowID(ctx context.Context, ponIntfID uint32, ONUID int32, uniID int32,
- gemportID uint32,
- flowStoreCookie uint64,
- flowCategory string, vlanVid uint32, vlanPcp ...uint32) (uint32, error) {
+// UpdateFlowIDForOnu updates the flow_id list of the ONU (add or remove flow_id from the list)
+func (RsrcMgr *OpenOltResourceMgr) UpdateFlowIDForOnu(ctx context.Context, ponIntfID uint32, onuID int32, uniID int32, flowID uint64, add bool) error {
+ /*
+ Reads the ONU's current flow ID list from the KV store, adds or removes flowID, and writes the list back.
+ :param ponIntfID, onuID, uniID: identify the subscriber whose list (stored under FlowIDPath) is updated
+ :param flowID: flow ID to add to or remove from the list
+ :param add: true appends flowID (no-op if it is already in the list);
+ false removes flowID (no-op if it is not in the list). Callers must always pass it.
+ */
+ var Value []byte
+ var err error
+ var retVal bool
+ var idx uint64
+ subs := fmt.Sprintf("%d,%d,%d", ponIntfID, onuID, uniID)
+ path := fmt.Sprintf(FlowIDPath, subs)
+ flowIDs, err := RsrcMgr.GetCurrentFlowIDsForOnu(ctx, ponIntfID, onuID, uniID)
+ if err != nil {
+ // Error logged in the called function
+ return err
+ }
+
+ if add {
+ if retVal, _ = checkForFlowIDInList(flowIDs, flowID); retVal {
+ return nil
+ }
+ flowIDs = append(flowIDs, flowID)
+ } else {
+ if retVal, idx = checkForFlowIDInList(flowIDs, flowID); !retVal {
+ return nil
+ }
+ // delete the index and shift
+ flowIDs = append(flowIDs[:idx], flowIDs[idx+1:]...)
+ }
+ Value, err = json.Marshal(flowIDs)
+ if err != nil {
+ logger.Error(ctx, "Failed to Marshal")
+ return err
+ }
+
+ if err = RsrcMgr.KVStore.Put(ctx, path, Value); err != nil {
+ logger.Errorf(ctx, "Failed to update resource %s", path)
+ return err
+ }
+ return err
+}
+
+// RemoveFlowIDInfo remove flow info for the given pon interface, onu id, and uni id
+// Note: For flows which trap from the NNI and not really associated with any particular
+// ONU (like LLDP), the onu_id and uni_id is set as -1. The intf_id is the NNI intf_id.
+func (RsrcMgr *OpenOltResourceMgr) RemoveFlowIDInfo(ctx context.Context, ponIntfID uint32, onuID int32, uniID int32,
+ flowID uint64) error {
+
+ subs := fmt.Sprintf("%d,%d,%d", ponIntfID, onuID, uniID)
+ path := fmt.Sprintf(FlowIDInfoPath, subs, flowID)
var err error
- FlowPath := fmt.Sprintf("%d,%d,%d", ponIntfID, ONUID, uniID)
+ if err = RsrcMgr.KVStore.Delete(ctx, path); err != nil {
+ logger.Errorf(ctx, "Failed to delete resource %s", path)
+ return err
+ }
- RsrcMgr.FlowIDMgmtLock.Lock()
- defer RsrcMgr.FlowIDMgmtLock.Unlock()
+ // Update the flowID list for the ONU
+ err = RsrcMgr.UpdateFlowIDForOnu(ctx, ponIntfID, onuID, uniID, flowID, false)
- FlowIDs := RsrcMgr.ResourceMgrs[ponIntfID].GetCurrentFlowIDsForOnu(ctx, FlowPath)
- if FlowIDs != nil {
- logger.Debugw(ctx, "Found flowId(s) for this ONU", log.Fields{"pon": ponIntfID, "ONUID": ONUID, "uniID": uniID, "KVpath": FlowPath})
- for _, flowID := range FlowIDs {
- FlowInfo := RsrcMgr.GetFlowIDInfo(ctx, ponIntfID, int32(ONUID), int32(uniID), uint32(flowID))
- er := getFlowIDFromFlowInfo(ctx, FlowInfo, flowID, gemportID, flowStoreCookie, flowCategory, vlanVid, vlanPcp...)
- if er == nil {
- logger.Debugw(ctx, "Found flowid for the vlan, pcp, and gem",
- log.Fields{"flowID": flowID, "vlanVid": vlanVid, "vlanPcp": vlanPcp, "gemPortID": gemportID})
- return flowID, er
- }
+ return err
+}
+
+// RemoveAllFlowsForIntfOnuUniKey removes flow info for the given interface, onu id, and uni id
+func (RsrcMgr *OpenOltResourceMgr) RemoveAllFlowsForIntfOnuUniKey(ctx context.Context, intf uint32, onuID int32, uniID int32) error {
+ flowIDs, err := RsrcMgr.GetCurrentFlowIDsForOnu(ctx, intf, onuID, uniID)
+ if err != nil {
+ // error logged in the called function
+ return err
+ }
+ for _, flID := range flowIDs {
+ if err := RsrcMgr.RemoveFlowIDInfo(ctx, intf, onuID, uniID, flID); err != nil {
+ logger.Errorw(ctx, "failed-to-delete-flow-id-info", log.Fields{"intf": intf, "onuID": onuID, "uniID": uniID, "flowID": flID})
}
}
- logger.Debug(ctx, "No matching flows with flow cookie or flow category, allocating new flowid")
- FlowIDs, err = RsrcMgr.ResourceMgrs[ponIntfID].GetResourceID(ctx, ponIntfID,
- ponrmgr.FLOW_ID, 1)
- if err != nil {
- logger.Errorf(ctx, "Failed to get resource for interface %d for type %s",
- ponIntfID, ponrmgr.FLOW_ID)
- return uint32(0), err
+ subs := fmt.Sprintf("%d,%d,%d", intf, onuID, uniID)
+ path := fmt.Sprintf(FlowIDPath, subs)
+ if err := RsrcMgr.KVStore.Delete(ctx, path); err != nil {
+ logger.Errorf(ctx, "Failed to delete resource %s", path)
+ return err
}
- if FlowIDs != nil {
- _ = RsrcMgr.ResourceMgrs[ponIntfID].UpdateFlowIDForOnu(ctx, FlowPath, FlowIDs[0], true)
- return FlowIDs[0], err
- }
-
- return 0, err
+ return nil
}
// GetAllocID return the first Alloc ID for a given pon interface id and onu id and then update the resource map on
@@ -738,47 +829,6 @@
}
}
-// FreeFlowID returns the free flow id for a given interface, onu id and uni id
-func (RsrcMgr *OpenOltResourceMgr) FreeFlowID(ctx context.Context, IntfID uint32, onuID int32,
- uniID int32, FlowID uint32) {
- var IntfONUID string
- var err error
-
- RsrcMgr.FlowIDMgmtLock.Lock()
- defer RsrcMgr.FlowIDMgmtLock.Unlock()
-
- FlowIds := make([]uint32, 0)
- FlowIds = append(FlowIds, FlowID)
- IntfONUID = fmt.Sprintf("%d,%d,%d", IntfID, onuID, uniID)
- err = RsrcMgr.ResourceMgrs[IntfID].UpdateFlowIDForOnu(ctx, IntfONUID, FlowID, false)
- if err != nil {
- logger.Errorw(ctx, "Failed to Update flow id for", log.Fields{"intf": IntfONUID})
- }
- RsrcMgr.ResourceMgrs[IntfID].RemoveFlowIDInfo(ctx, IntfONUID, FlowID)
-
- RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(ctx, IntfID, ponrmgr.FLOW_ID, FlowIds)
-}
-
-// FreeFlowIDs releases the flow Ids
-func (RsrcMgr *OpenOltResourceMgr) FreeFlowIDs(ctx context.Context, IntfID uint32, onuID uint32,
- uniID uint32, FlowID []uint32) {
- RsrcMgr.FlowIDMgmtLock.Lock()
- defer RsrcMgr.FlowIDMgmtLock.Unlock()
-
- RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(ctx, IntfID, ponrmgr.FLOW_ID, FlowID)
-
- var IntfOnuIDUniID string
- var err error
- for _, flow := range FlowID {
- IntfOnuIDUniID = fmt.Sprintf("%d,%d,%d", IntfID, onuID, uniID)
- err = RsrcMgr.ResourceMgrs[IntfID].UpdateFlowIDForOnu(ctx, IntfOnuIDUniID, flow, false)
- if err != nil {
- logger.Errorw(ctx, "Failed to Update flow id for", log.Fields{"intf": IntfOnuIDUniID})
- }
- RsrcMgr.ResourceMgrs[IntfID].RemoveFlowIDInfo(ctx, IntfOnuIDUniID, flow)
- }
-}
-
// FreeAllocID frees AllocID on the PON resource pool and also frees the allocID association
// for the given OLT device.
func (RsrcMgr *OpenOltResourceMgr) FreeAllocID(ctx context.Context, IntfID uint32, onuID uint32,
@@ -826,13 +876,6 @@
GEMPortIDs)
RsrcMgr.GemPortIDMgmtLock[intfID].Unlock()
- RsrcMgr.FlowIDMgmtLock.Lock()
- FlowIDs := RsrcMgr.ResourceMgrs[intfID].GetCurrentFlowIDsForOnu(ctx, IntfOnuIDUniID)
- RsrcMgr.ResourceMgrs[intfID].FreeResourceID(ctx, intfID,
- ponrmgr.FLOW_ID,
- FlowIDs)
- RsrcMgr.FlowIDMgmtLock.Unlock()
-
// Clear resource map associated with (pon_intf_id, gemport_id) tuple.
RsrcMgr.ResourceMgrs[intfID].RemoveResourceMap(ctx, IntfOnuIDUniID)
// Clear the ONU Id associated with the (pon_intf_id, gemport_id) tuple.
@@ -841,25 +884,21 @@
}
}
-// IsFlowCookieOnKVStore checks if the given flow cookie is present on the kv store
-// Returns true if the flow cookie is found, otherwise it returns false
-func (RsrcMgr *OpenOltResourceMgr) IsFlowCookieOnKVStore(ctx context.Context, ponIntfID uint32, onuID int32, uniID int32,
- flowStoreCookie uint64) bool {
+// IsFlowOnKvStore checks if the given flowID is present on the kv store
+// Returns true if the flowID is found, otherwise it returns false
+func (RsrcMgr *OpenOltResourceMgr) IsFlowOnKvStore(ctx context.Context, ponIntfID uint32, onuID int32, uniID int32,
+ flowID uint64) bool {
- FlowPath := fmt.Sprintf("%d,%d,%d", ponIntfID, onuID, uniID)
- FlowIDs := RsrcMgr.ResourceMgrs[ponIntfID].GetCurrentFlowIDsForOnu(ctx, FlowPath)
+ FlowIDs, err := RsrcMgr.GetCurrentFlowIDsForOnu(ctx, ponIntfID, onuID, uniID)
+ if err != nil {
+ // error logged in the called function
+ return false
+ }
if FlowIDs != nil {
- logger.Debugw(ctx, "Found flowId(s) for this ONU", log.Fields{"pon": ponIntfID, "onuID": onuID, "uniID": uniID, "KVpath": FlowPath})
- for _, flowID := range FlowIDs {
- FlowInfo := RsrcMgr.GetFlowIDInfo(ctx, ponIntfID, int32(onuID), int32(uniID), uint32(flowID))
- if FlowInfo != nil {
- logger.Debugw(ctx, "Found flows", log.Fields{"flows": *FlowInfo, "flowId": flowID})
- for _, Info := range *FlowInfo {
- if Info.FlowStoreCookie == flowStoreCookie {
- logger.Debug(ctx, "Found flow matching with flowStore cookie", log.Fields{"flowId": flowID, "flowStoreCookie": flowStoreCookie})
- return true
- }
- }
+ logger.Debugw(ctx, "Found flowId(s) for this ONU", log.Fields{"pon": ponIntfID, "onuID": onuID, "uniID": uniID})
+ for _, id := range FlowIDs {
+ if flowID == id {
+ return true
}
}
}
@@ -1017,57 +1056,6 @@
return nil
}
-func getFlowIDFromFlowInfo(ctx context.Context, FlowInfo *[]FlowInfo, flowID, gemportID uint32, flowStoreCookie uint64, flowCategory string,
- vlanVid uint32, vlanPcp ...uint32) error {
- if FlowInfo != nil {
- for _, Info := range *FlowInfo {
- if int32(gemportID) == Info.Flow.GemportId && flowCategory != "" && Info.FlowCategory == flowCategory {
- logger.Debug(ctx, "Found flow matching with flow category", log.Fields{"flowId": flowID, "FlowCategory": flowCategory})
- if Info.FlowCategory == "HSIA_FLOW" {
- if err := checkVlanAndPbitEqualityForFlows(vlanVid, Info, vlanPcp[0]); err == nil {
- return nil
- }
- }
- }
- if int32(gemportID) == Info.Flow.GemportId && flowStoreCookie != 0 && Info.FlowStoreCookie == flowStoreCookie {
- if flowCategory != "" && Info.FlowCategory == flowCategory {
- logger.Debug(ctx, "Found flow matching with flow category", log.Fields{"flowId": flowID, "FlowCategory": flowCategory})
- return nil
- }
- }
- }
- }
- logger.Debugw(ctx, "the flow can be related to a different service", log.Fields{"flow_info": FlowInfo})
- return errors.New("invalid flow-info")
-}
-
-func checkVlanAndPbitEqualityForFlows(vlanVid uint32, Info FlowInfo, vlanPcp uint32) error {
- if err := checkVlanEqualityForFlows(vlanVid, Info); err != nil {
- return err
- }
-
- //flow has remark action and pbits
- if Info.Flow.Action.Cmd.RemarkInnerPbits || Info.Flow.Action.Cmd.RemarkOuterPbits {
- if vlanPcp == Info.Flow.Action.OPbits || vlanPcp == Info.Flow.Action.IPbits {
- return nil
- }
- } else if vlanPcp == Info.Flow.Classifier.OPbits {
- //no remark action but flow has pbits
- return nil
- } else if vlanPcp == 0xff || Info.Flow.Classifier.OPbits == 0xff {
- // no pbit found
- return nil
- }
- return errors.New("not found in terms of pbit equality")
-}
-
-func checkVlanEqualityForFlows(vlanVid uint32, Info FlowInfo) error {
- if vlanVid == Info.Flow.Action.OVid || vlanVid == Info.Flow.Classifier.IVid {
- return nil
- }
- return errors.New("not found in terms of vlan_id equality")
-}
-
//AddGemToOnuGemInfo adds gemport to onugem info kvstore
func (RsrcMgr *OpenOltResourceMgr) AddGemToOnuGemInfo(ctx context.Context, intfID uint32, onuID uint32, gemPort uint32) error {
var onuGemData []OnuGemInfo
@@ -1122,14 +1110,14 @@
if err = RsrcMgr.ResourceMgrs[IntfID].GetOnuGemInfo(ctx, IntfID, &onuGemData); err != nil {
logger.Errorf(ctx, "failed to get onuifo for intfid %d", IntfID)
- return olterrors.NewErrPersistence("get", "OnuGemInfo", IntfID,
+ return olterrors.NewErrPersistence("get", "OnuGemInfo", uint64(IntfID),
log.Fields{"onuGem": onuGem, "intfID": IntfID}, err)
}
onuGemData = append(onuGemData, onuGem)
err = RsrcMgr.ResourceMgrs[IntfID].AddOnuGemInfo(ctx, IntfID, onuGemData)
if err != nil {
logger.Error(ctx, "Failed to add onugem to kv store")
- return olterrors.NewErrPersistence("set", "OnuGemInfo", IntfID,
+ return olterrors.NewErrPersistence("set", "OnuGemInfo", uint64(IntfID),
log.Fields{"onuGemData": onuGemData, "intfID": IntfID}, err)
}
@@ -1212,26 +1200,30 @@
return gemPort, nil
}
-//DelGemPortPktInOfAllServices deletes the gemports from pkt in path for all services
-func (RsrcMgr *OpenOltResourceMgr) DelGemPortPktInOfAllServices(ctx context.Context, intfID uint32, onuID uint32, logicalPort uint32) error {
+//DeletePacketInGemPortForOnu deletes the packet-in gemport for ONU
+func (RsrcMgr *OpenOltResourceMgr) DeletePacketInGemPortForOnu(ctx context.Context, intfID uint32, onuID uint32, logicalPort uint32) error {
- //retrieve stored gem port from the store first.
- Path := fmt.Sprintf(OnuPacketINPathPrefix, intfID, onuID, logicalPort)
- logger.Debugf(ctx, "getting flows from the path:%s", Path)
- Value, err := RsrcMgr.KVStore.List(ctx, Path)
+ path := fmt.Sprintf(OnuPacketINPathPrefix, intfID, onuID, logicalPort)
+ value, err := RsrcMgr.KVStore.List(ctx, path)
if err != nil {
- logger.Errorf(ctx, "failed to get flows from kvstore for path %s", Path)
- return errors.New("failed to get flows from kvstore for path " + Path)
+ logger.Errorf(ctx, "failed-to-read-value-from-path-%s", path)
+ return errors.New("failed-to-read-value-from-path-" + path)
}
- logger.Debugf(ctx, "%d flows retrieved from the path:%s", len(Value), Path)
//remove them one by one
- for key := range Value {
+ for key := range value {
+ // Formulate the right key path suffix to be deleted
+ stringToBeReplaced := fmt.Sprintf(BasePathKvStore, RsrcMgr.DeviceID) + "/"
+ replacedWith := ""
+ key = strings.Replace(key, stringToBeReplaced, replacedWith, 1)
+
+ logger.Debugf(ctx, "removing-key-%s", key)
if err := RsrcMgr.KVStore.Delete(ctx, key); err != nil {
- logger.Errorf(ctx, "Falied to remove resource %s", key)
+ logger.Errorf(ctx, "failed-to-remove-resource-%s", key)
return err
}
}
+
return nil
}
@@ -1306,7 +1298,7 @@
}
//UpdateFlowIDsForGem updates flow id per gemport
-func (RsrcMgr *OpenOltResourceMgr) UpdateFlowIDsForGem(ctx context.Context, intf uint32, gem uint32, flowIDs []uint32) error {
+func (RsrcMgr *OpenOltResourceMgr) UpdateFlowIDsForGem(ctx context.Context, intf uint32, gem uint32, flowIDs []uint64) error {
var val []byte
path := fmt.Sprintf(FlowIDsForGem, intf)
@@ -1316,7 +1308,7 @@
return err
}
if flowsForGem == nil {
- flowsForGem = make(map[uint32][]uint32)
+ flowsForGem = make(map[uint32][]uint64)
}
flowsForGem[gem] = flowIDs
val, err = json.Marshal(flowsForGem)
@@ -1325,8 +1317,6 @@
return err
}
- RsrcMgr.flowIDToGemInfoLock.Lock()
- defer RsrcMgr.flowIDToGemInfoLock.Unlock()
if err = RsrcMgr.KVStore.Put(ctx, path, val); err != nil {
logger.Errorw(ctx, "Failed to put to kvstore", log.Fields{"error": err, "path": path, "value": val})
return err
@@ -1358,21 +1348,17 @@
return
}
- RsrcMgr.flowIDToGemInfoLock.Lock()
- defer RsrcMgr.flowIDToGemInfoLock.Unlock()
if err = RsrcMgr.KVStore.Put(ctx, path, val); err != nil {
logger.Errorw(ctx, "Failed to put to kvstore", log.Fields{"error": err, "path": path, "value": val})
}
}
//GetFlowIDsGemMapForInterface gets flowids per gemport and interface
-func (RsrcMgr *OpenOltResourceMgr) GetFlowIDsGemMapForInterface(ctx context.Context, intf uint32) (map[uint32][]uint32, error) {
+func (RsrcMgr *OpenOltResourceMgr) GetFlowIDsGemMapForInterface(ctx context.Context, intf uint32) (map[uint32][]uint64, error) {
path := fmt.Sprintf(FlowIDsForGem, intf)
- var flowsForGem map[uint32][]uint32
+ var flowsForGem map[uint32][]uint64
var val []byte
- RsrcMgr.flowIDToGemInfoLock.RLock()
value, err := RsrcMgr.KVStore.Get(ctx, path)
- RsrcMgr.flowIDToGemInfoLock.RUnlock()
if err != nil {
logger.Error(ctx, "failed to get data from kv store")
return nil, err
@@ -1393,8 +1379,7 @@
//DeleteIntfIDGempMapPath deletes the intf id path used to store flow ids per gem to kvstore.
func (RsrcMgr *OpenOltResourceMgr) DeleteIntfIDGempMapPath(ctx context.Context, intf uint32) {
path := fmt.Sprintf(FlowIDsForGem, intf)
- RsrcMgr.flowIDToGemInfoLock.Lock()
- defer RsrcMgr.flowIDToGemInfoLock.Unlock()
+
if err := RsrcMgr.KVStore.Delete(ctx, path); err != nil {
logger.Errorw(ctx, "Failed to delete nni interfaces from kv store", log.Fields{"path": path})
}
@@ -1538,3 +1523,32 @@
}
return false, groupInfo, nil
}
+
+// toByte converts an interface value to a []byte. A []byte is returned as-is and a string is
+// converted to its bytes; any other type yields an "unexpected-type-<type>" error.
+func toByte(value interface{}) ([]byte, error) {
+ switch t := value.(type) {
+ case []byte:
+ return value.([]byte), nil
+ case string:
+ return []byte(value.(string)), nil
+ default:
+ return nil, fmt.Errorf("unexpected-type-%T", t)
+ }
+}
+
+func checkForFlowIDInList(FlowIDList []uint64, FlowID uint64) (bool, uint64) {
+ /*
+ Check for a flow ID in a given list of flow IDs.
+ :param FlowIDList: List of flow IDs
+ :param FlowID: Flow ID to look for in the list
+ :return: true and the index of the first match if present; false and 0 otherwise.
+ */
+
+ for idx := range FlowIDList {
+ if FlowID == FlowIDList[idx] {
+ return true, uint64(idx)
+ }
+ }
+ return false, 0
+}