VOL-4077: Improve storage usage on etcd
- Do away with unnecessary data storage on etcd if it can be
reconciled on adapter restart
- For data that needs storage, use a smaller footprint if possible
- Use a write-through cache for all data stored on etcd via
the resource manager module
- Use one ResourceManager module per interface to localize lock
contention per PON port
Change-Id: I21d38216fab195d738a446b3f96a00251569e38b
diff --git a/internal/pkg/resourcemanager/resourcemanager.go b/internal/pkg/resourcemanager/resourcemanager.go
index a501310..737f694 100755
--- a/internal/pkg/resourcemanager/resourcemanager.go
+++ b/internal/pkg/resourcemanager/resourcemanager.go
@@ -22,17 +22,14 @@
"encoding/json"
"errors"
"fmt"
- "strconv"
"strings"
"sync"
"time"
- "github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
-
- "github.com/opencord/voltha-lib-go/v4/pkg/db"
- "github.com/opencord/voltha-lib-go/v4/pkg/db/kvstore"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
- ponrmgr "github.com/opencord/voltha-lib-go/v4/pkg/ponresourcemanager"
+ "github.com/opencord/voltha-lib-go/v5/pkg/db"
+ "github.com/opencord/voltha-lib-go/v5/pkg/db/kvstore"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
+ ponrmgr "github.com/opencord/voltha-lib-go/v5/pkg/ponresourcemanager"
ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
"github.com/opencord/voltha-protos/v4/go/openolt"
)
@@ -42,20 +39,18 @@
KvstoreTimeout = 5 * time.Second
// BasePathKvStore - <pathPrefix>/openolt/<device_id>
BasePathKvStore = "%s/openolt/{%s}"
- // TpIDPathSuffix - <(pon_id, onu_id, uni_id)>/tp_id
- TpIDPathSuffix = "{%d,%d,%d}/tp_id"
+ // tpIDPathSuffix - <(pon_id, onu_id, uni_id)>/tp_id
+ tpIDPathSuffix = "{%d,%d,%d}/tp_id"
//MeterIDPathSuffix - <(pon_id, onu_id, uni_id)>/<tp_id>/meter_id/<direction>
MeterIDPathSuffix = "{%d,%d,%d}/{%d}/meter_id/{%s}"
- //NnniIntfID - nniintfids
- NnniIntfID = "nniintfids"
// OnuPacketINPathPrefix - path prefix where ONU packet-in vlanID/PCP is stored
//format: onu_packetin/{<intfid>,<onuid>,<logicalport>}
OnuPacketINPathPrefix = "onu_packetin/{%d,%d,%d}"
// OnuPacketINPath path on the kvstore to store packetin gemport,which will be used for packetin, packetout
//format: onu_packetin/{<intfid>,<onuid>,<logicalport>}/{<vlanId>,<priority>}
OnuPacketINPath = OnuPacketINPathPrefix + "/{%d,%d}"
- //FlowIDsForGem flowids_per_gem/<intfid>
- FlowIDsForGem = "flowids_per_gem/{%d}"
+ //FlowIDsForGem flowids_per_gem/<intfid>/<gemport-id>
+ FlowIDsForGem = "flowids_per_gem/{%d}/{%d}"
//McastQueuesForIntf multicast queues for pon interfaces
McastQueuesForIntf = "mcast_qs_for_int"
//FlowGroup flow_groups/<flow_group_id>
@@ -74,9 +69,10 @@
//FlowIDPath - Path on the KV store for storing list of Flow IDs for a given subscriber
//Format: BasePathKvStore/<(pon_intf_id, onu_id, uni_id)>/flow_ids
FlowIDPath = "{%s}/flow_ids"
- //FlowIDInfoPath - Used to store more metadata associated with the flow_id
- //Format: BasePathKvStore/<(pon_intf_id, onu_id, uni_id)>/flow_id_info/<flow_id>
- FlowIDInfoPath = "{%s}/flow_id_info/{%d}"
+
+ //OnuGemInfoPath is the path on the kvstore where the onugem info map is stored
+ //format: <device-id>/onu_gem_info/<intfid>/<onuid>
+ OnuGemInfoPath = "onu_gem_info/{%d}/{%d}" // onu_gem/<intfid>/<onuID>
)
// FlowInfo holds the flow information
@@ -111,12 +107,13 @@
// MeterInfo store meter information at path <(pon_id, onu_id, uni_id)>/<tp_id>/meter_id/<direction>
type MeterInfo struct {
- RefCnt uint8 // number of flow references for this meter. When RefCnt is 0, the MeterInfo should be deleted.
- MeterConfig ofp.OfpMeterConfig
+ RefCnt uint8 // number of flow references for this meter. When RefCnt is 0, the MeterInfo should be deleted.
+ MeterID uint32
}
// OpenOltResourceMgr holds resource related information as provided below for each field
type OpenOltResourceMgr struct {
+ PonIntfID uint32
DeviceID string // OLT device id
Address string // Host and port of the kv store to connect to
Args string // args
@@ -124,14 +121,40 @@
DeviceType string
DevInfo *openolt.DeviceInfo // device information
// array of pon resource managers per interface technology
- ResourceMgrs map[uint32]*ponrmgr.PONResourceManager
+ PonRsrMgr *ponrmgr.PONResourceManager
- // This protects concurrent gemport_id allocate/delete calls on a per PON port basis
- GemPortIDMgmtLock []sync.RWMutex
- // This protects concurrent alloc_id allocate/delete calls on a per PON port basis
- AllocIDMgmtLock []sync.RWMutex
- // This protects concurrent onu_id allocate/delete calls on a per PON port basis
- OnuIDMgmtLock []sync.RWMutex
+ // Local maps used for write-through-cache - start
+ flowIDsForOnu map[string][]uint64
+ flowIDsForOnuLock sync.RWMutex
+
+ allocIDsForOnu map[string][]uint32
+ allocIDsForOnuLock sync.RWMutex
+
+ gemPortIDsForOnu map[string][]uint32
+ gemPortIDsForOnuLock sync.RWMutex
+
+ techProfileIDsForOnu map[string][]uint32
+ techProfileIDsForOnuLock sync.RWMutex
+
+ meterInfoForOnu map[string]*MeterInfo
+ meterInfoForOnuLock sync.RWMutex
+
+ onuGemInfo map[string]*OnuGemInfo
+ onuGemInfoLock sync.RWMutex
+
+ gemPortForPacketInInfo map[string]uint32
+ gemPortForPacketInInfoLock sync.RWMutex
+
+ flowIDsForGem map[uint32][]uint64
+ flowIDsForGemLock sync.RWMutex
+
+ mcastQueueForIntf map[uint32][]uint32
+ mcastQueueForIntfLock sync.RWMutex
+ mcastQueueForIntfLoadedFromKvStore bool
+
+ groupInfo map[string]*GroupInfo
+ groupInfoLock sync.RWMutex
+ // Local maps used for write-through-cache - end
}
func newKVClient(ctx context.Context, storeType string, address string, timeout time.Duration) (kvstore.Client, error) {
@@ -152,6 +175,7 @@
logger.Fatalw(ctx, "Failed to init KV client\n", log.Fields{"err": err})
return nil
}
+ // return db.NewBackend(ctx, backend, addr, KvstoreTimeout, fmt.Sprintf(BasePathKvStore, basePathKvStore, DeviceID))
kvbackend := &db.Backend{
Client: kvClient,
@@ -166,346 +190,163 @@
// NewResourceMgr init a New resource manager instance which in turn instantiates pon resource manager
// instances according to technology. Initializes the default resource ranges for all
// the resources.
-func NewResourceMgr(ctx context.Context, deviceID string, KVStoreAddress string, kvStoreType string, deviceType string, devInfo *openolt.DeviceInfo, basePathKvStore string) *OpenOltResourceMgr {
+func NewResourceMgr(ctx context.Context, PonIntfID uint32, deviceID string, KVStoreAddress string, kvStoreType string, deviceType string, devInfo *openolt.DeviceInfo, basePathKvStore string) *OpenOltResourceMgr {
var ResourceMgr OpenOltResourceMgr
- logger.Debugf(ctx, "Init new resource manager , address: %s, device-id: %s", KVStoreAddress, deviceID)
+ logger.Debugf(ctx, "Init new resource manager , ponIf: %v, address: %s, device-id: %s", PonIntfID, KVStoreAddress, deviceID)
+ ResourceMgr.PonIntfID = PonIntfID
ResourceMgr.DeviceID = deviceID
ResourceMgr.Address = KVStoreAddress
ResourceMgr.DeviceType = deviceType
ResourceMgr.DevInfo = devInfo
- NumPONPorts := devInfo.GetPonPorts()
Backend := kvStoreType
ResourceMgr.KVStore = SetKVClient(ctx, Backend, ResourceMgr.Address, deviceID, basePathKvStore)
if ResourceMgr.KVStore == nil {
logger.Error(ctx, "Failed to setup KV store")
}
- Ranges := make(map[string]*openolt.DeviceInfo_DeviceResourceRanges)
- RsrcMgrsByTech := make(map[string]*ponrmgr.PONResourceManager)
- ResourceMgr.ResourceMgrs = make(map[uint32]*ponrmgr.PONResourceManager)
-
- ResourceMgr.AllocIDMgmtLock = make([]sync.RWMutex, NumPONPorts)
- ResourceMgr.GemPortIDMgmtLock = make([]sync.RWMutex, NumPONPorts)
- ResourceMgr.OnuIDMgmtLock = make([]sync.RWMutex, NumPONPorts)
// TODO self.args = registry('main').get_args()
- /*
- If a legacy driver returns protobuf without any ranges,s synthesize one from
- the legacy global per-device information. This, in theory, is temporary until
- the legacy drivers are upgrade to support pool ranges.
- */
- if devInfo.Ranges == nil {
- var ranges openolt.DeviceInfo_DeviceResourceRanges
- ranges.Technology = devInfo.GetTechnology()
-
- var index uint32
- for index = 0; index < NumPONPorts; index++ {
- ranges.IntfIds = append(ranges.IntfIds, index)
- }
-
- var Pool openolt.DeviceInfo_DeviceResourceRanges_Pool
- Pool.Type = openolt.DeviceInfo_DeviceResourceRanges_Pool_ONU_ID
- Pool.Start = devInfo.OnuIdStart
- Pool.End = devInfo.OnuIdEnd
- Pool.Sharing = openolt.DeviceInfo_DeviceResourceRanges_Pool_DEDICATED_PER_INTF
- onuPool := Pool
- ranges.Pools = append(ranges.Pools, &onuPool)
-
- Pool.Type = openolt.DeviceInfo_DeviceResourceRanges_Pool_ALLOC_ID
- Pool.Start = devInfo.AllocIdStart
- Pool.End = devInfo.AllocIdEnd
- Pool.Sharing = openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH
- allocPool := Pool
- ranges.Pools = append(ranges.Pools, &allocPool)
-
- Pool.Type = openolt.DeviceInfo_DeviceResourceRanges_Pool_GEMPORT_ID
- Pool.Start = devInfo.GemportIdStart
- Pool.End = devInfo.GemportIdEnd
- Pool.Sharing = openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH
- gemPool := Pool
- ranges.Pools = append(ranges.Pools, &gemPool)
-
- Pool.Type = openolt.DeviceInfo_DeviceResourceRanges_Pool_FLOW_ID
- Pool.Start = devInfo.FlowIdStart
- Pool.End = devInfo.FlowIdEnd
- Pool.Sharing = openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH
- ranges.Pools = append(ranges.Pools, &Pool)
- // Add to device info
- devInfo.Ranges = append(devInfo.Ranges, &ranges)
- }
-
// Create a separate Resource Manager instance for each range. This assumes that
// each technology is represented by only a single range
- var GlobalPONRsrcMgr *ponrmgr.PONResourceManager
- var err error
for _, TechRange := range devInfo.Ranges {
- technology := TechRange.Technology
- logger.Debugf(ctx, "Device info technology %s", technology)
- Ranges[technology] = TechRange
+ for _, intfID := range TechRange.IntfIds {
+ if intfID == PonIntfID {
+ technology := TechRange.Technology
+ logger.Debugf(ctx, "Device info technology %s, intf-id %v", technology, PonIntfID)
- RsrcMgrsByTech[technology], err = ponrmgr.NewPONResourceManager(ctx, technology, deviceType, deviceID,
- Backend, ResourceMgr.Address, basePathKvStore)
- if err != nil {
- logger.Errorf(ctx, "Failed to create pon resource manager instance for technology %s", technology)
- return nil
+ rsrMgr, err := ponrmgr.NewPONResourceManager(ctx, technology, deviceType, deviceID,
+ Backend, ResourceMgr.Address, basePathKvStore)
+ if err != nil {
+ logger.Errorf(ctx, "Failed to create pon resource manager instance for technology %s", technology)
+ return nil
+ }
+ ResourceMgr.PonRsrMgr = rsrMgr
+ // self.initialize_device_resource_range_and_pool(resource_mgr, global_resource_mgr, arange)
+ InitializeDeviceResourceRangeAndPool(ctx, rsrMgr, TechRange, devInfo)
+ if err := ResourceMgr.PonRsrMgr.InitDeviceResourcePoolForIntf(ctx, intfID); err != nil {
+ logger.Fatal(ctx, "failed-to-initialize-device-resource-pool-intf-id-%v-device-id", ResourceMgr.PonIntfID, ResourceMgr.DeviceID)
+ return nil
+ }
+ }
}
- // resource_mgrs_by_tech[technology] = resource_mgr
- if GlobalPONRsrcMgr == nil {
- GlobalPONRsrcMgr = RsrcMgrsByTech[technology]
- }
- for _, IntfID := range TechRange.IntfIds {
- ResourceMgr.ResourceMgrs[IntfID] = RsrcMgrsByTech[technology]
- }
- // self.initialize_device_resource_range_and_pool(resource_mgr, global_resource_mgr, arange)
- InitializeDeviceResourceRangeAndPool(ctx, RsrcMgrsByTech[technology], GlobalPONRsrcMgr,
- TechRange, devInfo)
}
- // After we have initialized resource ranges, initialize the
- // resource pools accordingly.
- for _, PONRMgr := range RsrcMgrsByTech {
- _ = PONRMgr.InitDeviceResourcePool(ctx)
- }
+
+ ResourceMgr.InitLocalCache()
+
logger.Info(ctx, "Initialization of resource manager success!")
return &ResourceMgr
}
+//InitLocalCache initializes local maps used for write-through-cache
+func (rsrcMgr *OpenOltResourceMgr) InitLocalCache() {
+ rsrcMgr.flowIDsForOnu = make(map[string][]uint64)
+ rsrcMgr.allocIDsForOnu = make(map[string][]uint32)
+ rsrcMgr.gemPortIDsForOnu = make(map[string][]uint32)
+ rsrcMgr.techProfileIDsForOnu = make(map[string][]uint32)
+ rsrcMgr.meterInfoForOnu = make(map[string]*MeterInfo)
+ rsrcMgr.onuGemInfo = make(map[string]*OnuGemInfo)
+ rsrcMgr.gemPortForPacketInInfo = make(map[string]uint32)
+ rsrcMgr.flowIDsForGem = make(map[uint32][]uint64)
+ rsrcMgr.mcastQueueForIntf = make(map[uint32][]uint32)
+ rsrcMgr.groupInfo = make(map[string]*GroupInfo)
+}
+
// InitializeDeviceResourceRangeAndPool initializes the resource range pool according to the sharing type, then apply
// device specific information. If KV doesn't exist
// or is broader than the device, the device's information will
// dictate the range limits
-func InitializeDeviceResourceRangeAndPool(ctx context.Context, ponRMgr *ponrmgr.PONResourceManager, globalPONRMgr *ponrmgr.PONResourceManager,
+func InitializeDeviceResourceRangeAndPool(ctx context.Context, ponRMgr *ponrmgr.PONResourceManager,
techRange *openolt.DeviceInfo_DeviceResourceRanges, devInfo *openolt.DeviceInfo) {
+ // var ONUIDShared, AllocIDShared, GEMPortIDShared openolt.DeviceInfo_DeviceResourceRanges_Pool_SharingType
+ var ONUIDStart, ONUIDEnd, AllocIDStart, AllocIDEnd, GEMPortIDStart, GEMPortIDEnd uint32
+ var ONUIDShared, AllocIDShared, GEMPortIDShared, FlowIDShared uint32
+
+ // The variables below are dummies that exist only to be passed as arguments to the InitDefaultPONResourceRanges function.
+ // The openolt adapter does not need to manage flow IDs, as they are managed on the OLT device.
+ // The UNI IDs are dynamically generated by the openonu adapter for every discovered UNI.
+ var flowIDDummyStart, flowIDDummyEnd uint32 = 1, 2
+ var uniIDDummyStart, uniIDDummyEnd uint32 = 0, 1
// init the resource range pool according to the sharing type
-
- logger.Debugf(ctx, "Resource range pool init for technology %s", ponRMgr.Technology)
- // first load from KV profiles
- status := ponRMgr.InitResourceRangesFromKVStore(ctx)
- if !status {
- logger.Debugf(ctx, "Failed to load resource ranges from KV store for tech %s", ponRMgr.Technology)
- }
-
- /*
- Then apply device specific information. If KV doesn't exist
- or is broader than the device, the device's information will
- dictate the range limits
- */
- logger.Debugw(ctx, "Using device info to init pon resource ranges", log.Fields{"Tech": ponRMgr.Technology})
-
- ONUIDStart := devInfo.OnuIdStart
- ONUIDEnd := devInfo.OnuIdEnd
- ONUIDShared := openolt.DeviceInfo_DeviceResourceRanges_Pool_DEDICATED_PER_INTF
- ONUIDSharedPoolID := uint32(0)
- AllocIDStart := devInfo.AllocIdStart
- AllocIDEnd := devInfo.AllocIdEnd
- AllocIDShared := openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH // TODO EdgeCore/BAL limitation
- AllocIDSharedPoolID := uint32(0)
- GEMPortIDStart := devInfo.GemportIdStart
- GEMPortIDEnd := devInfo.GemportIdEnd
- GEMPortIDShared := openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH // TODO EdgeCore/BAL limitation
- GEMPortIDSharedPoolID := uint32(0)
- FlowIDStart := devInfo.FlowIdStart
- FlowIDEnd := devInfo.FlowIdEnd
- FlowIDShared := openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH // TODO EdgeCore/BAL limitation
- FlowIDSharedPoolID := uint32(0)
-
- var FirstIntfPoolID uint32
- var SharedPoolID uint32
-
- /*
- * As a zero check is made against SharedPoolID to check whether the resources are shared across all intfs
- * if resources are shared across interfaces then SharedPoolID is given a positive number.
- */
- for _, FirstIntfPoolID = range techRange.IntfIds {
- // skip the intf id 0
- if FirstIntfPoolID == 0 {
- continue
- }
- break
- }
-
+ logger.Debugw(ctx, "Device info init", log.Fields{"technology": techRange.Technology,
+ "onu_id_start": ONUIDStart, "onu_id_end": ONUIDEnd,
+ "alloc_id_start": AllocIDStart, "alloc_id_end": AllocIDEnd,
+ "gemport_id_start": GEMPortIDStart, "gemport_id_end": GEMPortIDEnd,
+ "intf_ids": techRange.IntfIds,
+ })
for _, RangePool := range techRange.Pools {
- if RangePool.Sharing == openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH {
- SharedPoolID = FirstIntfPoolID
- } else if RangePool.Sharing == openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_SAME_TECH {
- SharedPoolID = FirstIntfPoolID
- } else {
- SharedPoolID = 0
- }
+ // FIXME: Remove hardcoding
if RangePool.Type == openolt.DeviceInfo_DeviceResourceRanges_Pool_ONU_ID {
ONUIDStart = RangePool.Start
ONUIDEnd = RangePool.End
- ONUIDShared = RangePool.Sharing
- ONUIDSharedPoolID = SharedPoolID
+ ONUIDShared = uint32(RangePool.Sharing)
} else if RangePool.Type == openolt.DeviceInfo_DeviceResourceRanges_Pool_ALLOC_ID {
AllocIDStart = RangePool.Start
AllocIDEnd = RangePool.End
- AllocIDShared = RangePool.Sharing
- AllocIDSharedPoolID = SharedPoolID
+ AllocIDShared = uint32(RangePool.Sharing)
} else if RangePool.Type == openolt.DeviceInfo_DeviceResourceRanges_Pool_GEMPORT_ID {
GEMPortIDStart = RangePool.Start
GEMPortIDEnd = RangePool.End
- GEMPortIDShared = RangePool.Sharing
- GEMPortIDSharedPoolID = SharedPoolID
- } else if RangePool.Type == openolt.DeviceInfo_DeviceResourceRanges_Pool_FLOW_ID {
- FlowIDStart = RangePool.Start
- FlowIDEnd = RangePool.End
- FlowIDShared = RangePool.Sharing
- FlowIDSharedPoolID = SharedPoolID
+ GEMPortIDShared = uint32(RangePool.Sharing)
}
}
- logger.Debugw(ctx, "Device info init", log.Fields{"technology": techRange.Technology,
- "onu_id_start": ONUIDStart, "onu_id_end": ONUIDEnd, "onu_id_shared_pool_id": ONUIDSharedPoolID,
- "alloc_id_start": AllocIDStart, "alloc_id_end": AllocIDEnd,
- "alloc_id_shared_pool_id": AllocIDSharedPoolID,
- "gemport_id_start": GEMPortIDStart, "gemport_id_end": GEMPortIDEnd,
- "gemport_id_shared_pool_id": GEMPortIDSharedPoolID,
- "flow_id_start": FlowIDStart,
- "flow_id_end_idx": FlowIDEnd,
- "flow_id_shared_pool_id": FlowIDSharedPoolID,
- "intf_ids": techRange.IntfIds,
- "uni_id_start": 0,
- "uni_id_end_idx": 1, /*MaxUNIIDperONU()*/
- })
-
- ponRMgr.InitDefaultPONResourceRanges(ctx, ONUIDStart, ONUIDEnd, ONUIDSharedPoolID,
- AllocIDStart, AllocIDEnd, AllocIDSharedPoolID,
- GEMPortIDStart, GEMPortIDEnd, GEMPortIDSharedPoolID,
- FlowIDStart, FlowIDEnd, FlowIDSharedPoolID, 0, 1,
+ ponRMgr.InitDefaultPONResourceRanges(ctx, ONUIDStart, ONUIDEnd, ONUIDShared,
+ AllocIDStart, AllocIDEnd, AllocIDShared,
+ GEMPortIDStart, GEMPortIDEnd, GEMPortIDShared,
+ flowIDDummyStart, flowIDDummyEnd, FlowIDShared, uniIDDummyStart, uniIDDummyEnd,
devInfo.PonPorts, techRange.IntfIds)
- // For global sharing, make sure to refresh both local and global resource manager instances' range
-
- if ONUIDShared == openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH {
- globalPONRMgr.UpdateRanges(ctx, ponrmgr.ONU_ID_START_IDX, ONUIDStart, ponrmgr.ONU_ID_END_IDX, ONUIDEnd,
- "", 0, nil)
- ponRMgr.UpdateRanges(ctx, ponrmgr.ONU_ID_START_IDX, ONUIDStart, ponrmgr.ONU_ID_END_IDX, ONUIDEnd,
- "", 0, globalPONRMgr)
- }
- if AllocIDShared == openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH {
- globalPONRMgr.UpdateRanges(ctx, ponrmgr.ALLOC_ID_START_IDX, AllocIDStart, ponrmgr.ALLOC_ID_END_IDX, AllocIDEnd,
- "", 0, nil)
-
- ponRMgr.UpdateRanges(ctx, ponrmgr.ALLOC_ID_START_IDX, AllocIDStart, ponrmgr.ALLOC_ID_END_IDX, AllocIDEnd,
- "", 0, globalPONRMgr)
- }
- if GEMPortIDShared == openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH {
- globalPONRMgr.UpdateRanges(ctx, ponrmgr.GEMPORT_ID_START_IDX, GEMPortIDStart, ponrmgr.GEMPORT_ID_END_IDX, GEMPortIDEnd,
- "", 0, nil)
- ponRMgr.UpdateRanges(ctx, ponrmgr.GEMPORT_ID_START_IDX, GEMPortIDStart, ponrmgr.GEMPORT_ID_END_IDX, GEMPortIDEnd,
- "", 0, globalPONRMgr)
- }
- if FlowIDShared == openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH {
- globalPONRMgr.UpdateRanges(ctx, ponrmgr.FLOW_ID_START_IDX, FlowIDStart, ponrmgr.FLOW_ID_END_IDX, FlowIDEnd,
- "", 0, nil)
- ponRMgr.UpdateRanges(ctx, ponrmgr.FLOW_ID_START_IDX, FlowIDStart, ponrmgr.FLOW_ID_END_IDX, FlowIDEnd,
- "", 0, globalPONRMgr)
- }
-
- // Make sure loaded range fits the platform bit encoding ranges
- ponRMgr.UpdateRanges(ctx, ponrmgr.UNI_ID_START_IDX, 0, ponrmgr.UNI_ID_END_IDX /* TODO =OpenOltPlatform.MAX_UNIS_PER_ONU-1*/, 1, "", 0, nil)
}
// Delete clears used resources for the particular olt device being deleted
-func (RsrcMgr *OpenOltResourceMgr) Delete(ctx context.Context) error {
- /* TODO
- def __del__(self):
- self.log.info("clearing-device-resource-pool")
- for key, resource_mgr in self.resource_mgrs.iteritems():
- resource_mgr.clear_device_resource_pool()
-
- def assert_pon_id_limit(self, pon_intf_id):
- assert pon_intf_id in self.resource_mgrs
-
- def assert_onu_id_limit(self, pon_intf_id, onu_id):
- self.assert_pon_id_limit(pon_intf_id)
- self.resource_mgrs[pon_intf_id].assert_resource_limits(onu_id, PONResourceManager.ONU_ID)
-
- @property
- def max_uni_id_per_onu(self):
- return 0 #OpenOltPlatform.MAX_UNIS_PER_ONU-1, zero-based indexing Uncomment or override to make default multi-uni
-
- def assert_uni_id_limit(self, pon_intf_id, onu_id, uni_id):
- self.assert_onu_id_limit(pon_intf_id, onu_id)
- self.resource_mgrs[pon_intf_id].assert_resource_limits(uni_id, PONResourceManager.UNI_ID)
- */
- for _, rsrcMgr := range RsrcMgr.ResourceMgrs {
- if err := rsrcMgr.ClearDeviceResourcePool(ctx); err != nil {
- logger.Debug(ctx, "Failed to clear device resource pool")
- return err
- }
+func (rsrcMgr *OpenOltResourceMgr) Delete(ctx context.Context, intfID uint32) error {
+ if err := rsrcMgr.PonRsrMgr.ClearDeviceResourcePoolForIntf(ctx, intfID); err != nil {
+ logger.Debug(ctx, "Failed to clear device resource pool")
+ return err
}
logger.Debug(ctx, "Cleared device resource pool")
return nil
}
-// GetONUID returns the available OnuID for the given pon-port
-func (RsrcMgr *OpenOltResourceMgr) GetONUID(ctx context.Context, ponIntfID uint32) (uint32, error) {
- // Check if Pon Interface ID is present in Resource-manager-map
- RsrcMgr.OnuIDMgmtLock[ponIntfID].Lock()
- defer RsrcMgr.OnuIDMgmtLock[ponIntfID].Unlock()
-
- if _, ok := RsrcMgr.ResourceMgrs[ponIntfID]; !ok {
- err := errors.New("invalid-pon-interface-" + strconv.Itoa(int(ponIntfID)))
- return 0, err
- }
+// GetONUID returns the available onuID for the given pon-port
+func (rsrcMgr *OpenOltResourceMgr) GetONUID(ctx context.Context, PonIntfID uint32) (uint32, error) {
// Get ONU id for a provided pon interface ID.
- onuID, err := RsrcMgr.ResourceMgrs[ponIntfID].TechProfileMgr.GetResourceID(ctx, ponIntfID,
+ onuID, err := rsrcMgr.PonRsrMgr.TechProfileMgr.GetResourceID(ctx, PonIntfID,
ponrmgr.ONU_ID, 1)
if err != nil {
logger.Errorf(ctx, "Failed to get resource for interface %d for type %s",
- ponIntfID, ponrmgr.ONU_ID)
+ PonIntfID, ponrmgr.ONU_ID)
return 0, err
}
- if onuID != nil {
- RsrcMgr.ResourceMgrs[ponIntfID].InitResourceMap(ctx, fmt.Sprintf("%d,%d", ponIntfID, onuID[0]))
+ if len(onuID) > 0 {
+ rsrcMgr.PonRsrMgr.InitResourceMap(ctx, fmt.Sprintf("%d,%d", PonIntfID, onuID[0]))
return onuID[0], err
}
- return 0, err // return OnuID 0 on error
-}
-
-// GetFlowIDInfo returns the slice of flow info of the given pon-port
-// Note: For flows which trap from the NNI and not really associated with any particular
-// ONU (like LLDP), the onu_id and uni_id is set as -1. The intf_id is the NNI intf_id.
-func (RsrcMgr *OpenOltResourceMgr) GetFlowIDInfo(ctx context.Context, ponIntfID uint32, onuID int32, uniID int32, flowID uint64) *FlowInfo {
- var flowInfo FlowInfo
-
- subs := fmt.Sprintf("%d,%d,%d", ponIntfID, onuID, uniID)
- Path := fmt.Sprintf(FlowIDInfoPath, subs, flowID)
- value, err := RsrcMgr.KVStore.Get(ctx, Path)
- if err == nil {
- if value != nil {
- Val, err := toByte(value.Value)
- if err != nil {
- logger.Errorw(ctx, "Failed to convert flowinfo into byte array", log.Fields{"error": err, "subs": subs})
- return nil
- }
- if err = json.Unmarshal(Val, &flowInfo); err != nil {
- logger.Errorw(ctx, "Failed to unmarshal", log.Fields{"error": err, "subs": subs})
- return nil
- }
- }
- }
- if flowInfo.Flow == nil {
- logger.Debugw(ctx, "No flowInfo found in KV store", log.Fields{"subs": subs})
- return nil
- }
- return &flowInfo
+ return 0, err // return onuID 0 on error
}
// GetCurrentFlowIDsForOnu fetches flow ID from the resource manager
// Note: For flows which trap from the NNI and not really associated with any particular
// ONU (like LLDP), the onu_id and uni_id is set as -1. The intf_id is the NNI intf_id.
-func (RsrcMgr *OpenOltResourceMgr) GetCurrentFlowIDsForOnu(ctx context.Context, ponIntfID uint32, onuID int32, uniID int32) ([]uint64, error) {
+func (rsrcMgr *OpenOltResourceMgr) GetCurrentFlowIDsForOnu(ctx context.Context, PonIntfID uint32, onuID int32, uniID int32) ([]uint64, error) {
- subs := fmt.Sprintf("%d,%d,%d", ponIntfID, onuID, uniID)
+ subs := fmt.Sprintf("%d,%d,%d", PonIntfID, onuID, uniID)
path := fmt.Sprintf(FlowIDPath, subs)
+ // fetch from cache
+ rsrcMgr.flowIDsForOnuLock.RLock()
+ flowIDsForOnu, ok := rsrcMgr.flowIDsForOnu[path]
+ rsrcMgr.flowIDsForOnuLock.RUnlock()
+
+ if ok {
+ return flowIDsForOnu, nil
+ }
+
var data []uint64
- value, err := RsrcMgr.KVStore.Get(ctx, path)
+ value, err := rsrcMgr.KVStore.Get(ctx, path)
if err == nil {
if value != nil {
Val, _ := toByte(value.Value)
@@ -515,339 +356,126 @@
}
}
}
+ // update cache
+ rsrcMgr.flowIDsForOnuLock.Lock()
+ rsrcMgr.flowIDsForOnu[path] = data
+ rsrcMgr.flowIDsForOnuLock.Unlock()
+
return data, nil
}
-// UpdateFlowIDInfo updates flow info for the given pon interface, onu id, and uni id
-// Note: For flows which trap from the NNI and not really associated with any particular
-// ONU (like LLDP), the onu_id and uni_id is set as -1. The intf_id is the NNI intf_id.
-func (RsrcMgr *OpenOltResourceMgr) UpdateFlowIDInfo(ctx context.Context, ponIntfID uint32, onuID int32, uniID int32,
- flowID uint64, flowData FlowInfo) error {
-
- subs := fmt.Sprintf("%d,%d,%d", ponIntfID, onuID, uniID)
- path := fmt.Sprintf(FlowIDInfoPath, subs, flowID)
-
- var value []byte
- var err error
- value, err = json.Marshal(flowData)
- if err != nil {
- logger.Errorf(ctx, "failed to Marshal, resource path %s", path)
- return err
- }
-
- if err = RsrcMgr.KVStore.Put(ctx, path, value); err != nil {
- logger.Errorf(ctx, "Failed to update resource %s", path)
- }
-
- // Update the flowID list for the ONU
- if err = RsrcMgr.UpdateFlowIDForOnu(ctx, ponIntfID, onuID, uniID, flowID, true); err != nil {
- // If the operation fails, try to remove FlowInfo from the KV store
- _ = RsrcMgr.KVStore.Delete(ctx, path)
- return err
- }
- return err
-}
-
-// UpdateFlowIDForOnu updates the flow_id list of the ONU (add or remove flow_id from the list)
-func (RsrcMgr *OpenOltResourceMgr) UpdateFlowIDForOnu(ctx context.Context, ponIntfID uint32, onuID int32, uniID int32, flowID uint64, add bool) error {
- /*
- Update the flow_id list of the ONU (add or remove flow_id from the list)
- :param pon_intf_onu_id: reference of PON interface id and onu id
- :param flow_id: flow ID
- :param add: Boolean flag to indicate whether the flow_id should be
- added or removed from the list. Defaults to adding the flow.
- */
- var Value []byte
- var err error
- var retVal bool
- var idx uint64
- subs := fmt.Sprintf("%d,%d,%d", ponIntfID, onuID, uniID)
- path := fmt.Sprintf(FlowIDPath, subs)
- flowIDs, err := RsrcMgr.GetCurrentFlowIDsForOnu(ctx, ponIntfID, onuID, uniID)
- if err != nil {
- // Error logged in the called function
- return err
- }
-
- if add {
- if retVal, _ = checkForFlowIDInList(flowIDs, flowID); retVal {
- return nil
- }
- flowIDs = append(flowIDs, flowID)
- } else {
- if retVal, idx = checkForFlowIDInList(flowIDs, flowID); !retVal {
- return nil
- }
- // delete the index and shift
- flowIDs = append(flowIDs[:idx], flowIDs[idx+1:]...)
- }
- Value, err = json.Marshal(flowIDs)
- if err != nil {
- logger.Error(ctx, "Failed to Marshal")
- return err
- }
-
- if err = RsrcMgr.KVStore.Put(ctx, path, Value); err != nil {
- logger.Errorf(ctx, "Failed to update resource %s", path)
- return err
- }
- return err
-}
-
-// RemoveFlowIDInfo remove flow info for the given pon interface, onu id, and uni id
-// Note: For flows which trap from the NNI and not really associated with any particular
-// ONU (like LLDP), the onu_id and uni_id is set as -1. The intf_id is the NNI intf_id.
-func (RsrcMgr *OpenOltResourceMgr) RemoveFlowIDInfo(ctx context.Context, ponIntfID uint32, onuID int32, uniID int32,
- flowID uint64) error {
-
- subs := fmt.Sprintf("%d,%d,%d", ponIntfID, onuID, uniID)
- path := fmt.Sprintf(FlowIDInfoPath, subs, flowID)
-
- var err error
- if err = RsrcMgr.KVStore.Delete(ctx, path); err != nil {
- logger.Errorf(ctx, "Failed to delete resource %s", path)
- return err
- }
-
- // Update the flowID list for the ONU
- err = RsrcMgr.UpdateFlowIDForOnu(ctx, ponIntfID, onuID, uniID, flowID, false)
-
- return err
-}
-
-// RemoveAllFlowsForIntfOnuUniKey removes flow info for the given interface, onu id, and uni id
-func (RsrcMgr *OpenOltResourceMgr) RemoveAllFlowsForIntfOnuUniKey(ctx context.Context, intf uint32, onuID int32, uniID int32) error {
- flowIDs, err := RsrcMgr.GetCurrentFlowIDsForOnu(ctx, intf, onuID, uniID)
- if err != nil {
- // error logged in the called function
- return err
- }
- for _, flID := range flowIDs {
- if err := RsrcMgr.RemoveFlowIDInfo(ctx, intf, onuID, uniID, flID); err != nil {
- logger.Errorw(ctx, "failed-to-delete-flow-id-info", log.Fields{"intf": intf, "onuID": onuID, "uniID": uniID, "flowID": flID})
- }
- }
- subs := fmt.Sprintf("%d,%d,%d", intf, onuID, uniID)
- path := fmt.Sprintf(FlowIDPath, subs)
- if err := RsrcMgr.KVStore.Delete(ctx, path); err != nil {
- logger.Errorf(ctx, "Failed to delete resource %s", path)
- return err
- }
- return nil
-}
-
-// GetAllocID return the first Alloc ID for a given pon interface id and onu id and then update the resource map on
-// the KV store with the list of alloc_ids allocated for the pon_intf_onu_id tuple
-// Currently of all the alloc_ids available, it returns the first alloc_id in the list for tha given ONU
-func (RsrcMgr *OpenOltResourceMgr) GetAllocID(ctx context.Context, intfID uint32, onuID uint32, uniID uint32) uint32 {
-
- var err error
- IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", intfID, onuID, uniID)
-
- RsrcMgr.AllocIDMgmtLock[intfID].Lock()
- defer RsrcMgr.AllocIDMgmtLock[intfID].Unlock()
-
- AllocID := RsrcMgr.ResourceMgrs[intfID].GetCurrentAllocIDForOnu(ctx, IntfOnuIDUniID)
- if AllocID != nil {
- // Since we support only one alloc_id for the ONU at the moment,
- // return the first alloc_id in the list, if available, for that
- // ONU.
- logger.Debugw(ctx, "Retrieved alloc ID from pon resource mgr", log.Fields{"AllocID": AllocID})
- return AllocID[0]
- }
- AllocID, err = RsrcMgr.ResourceMgrs[intfID].GetResourceID(ctx, intfID,
- ponrmgr.ALLOC_ID, 1)
-
- if AllocID == nil || err != nil {
- logger.Error(ctx, "Failed to allocate alloc id")
- return 0
- }
- // update the resource map on KV store with the list of alloc_id
- // allocated for the pon_intf_onu_id tuple
- err = RsrcMgr.ResourceMgrs[intfID].UpdateAllocIdsForOnu(ctx, IntfOnuIDUniID, AllocID)
- if err != nil {
- logger.Error(ctx, "Failed to update Alloc ID")
- return 0
- }
- logger.Debugw(ctx, "Allocated new Tcont from pon resource mgr", log.Fields{"AllocID": AllocID})
- return AllocID[0]
-}
-
// UpdateAllocIdsForOnu updates alloc ids in kv store for a given pon interface id, onu id and uni id
-func (RsrcMgr *OpenOltResourceMgr) UpdateAllocIdsForOnu(ctx context.Context, ponPort uint32, onuID uint32, uniID uint32, allocID []uint32) error {
+func (rsrcMgr *OpenOltResourceMgr) UpdateAllocIdsForOnu(ctx context.Context, ponPort uint32, onuID uint32, uniID uint32, allocIDs []uint32) error {
- IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", ponPort, onuID, uniID)
- return RsrcMgr.ResourceMgrs[ponPort].UpdateAllocIdsForOnu(ctx, IntfOnuIDUniID,
- allocID)
+ intfOnuIDuniID := fmt.Sprintf("%d,%d,%d", ponPort, onuID, uniID)
+ // update cache
+ rsrcMgr.allocIDsForOnuLock.Lock()
+ rsrcMgr.allocIDsForOnu[intfOnuIDuniID] = allocIDs
+ rsrcMgr.allocIDsForOnuLock.Unlock()
+
+ // Note: in case the write to DB fails there could be inconsistent data between cache and db.
+ // Although this is highly unlikely with DB retries in place, this is something we have to deal with in the next release
+ return rsrcMgr.PonRsrMgr.UpdateAllocIdsForOnu(ctx, intfOnuIDuniID,
+ allocIDs)
}
// GetCurrentGEMPortIDsForOnu returns gem ports for given pon interface , onu id and uni id
-func (RsrcMgr *OpenOltResourceMgr) GetCurrentGEMPortIDsForOnu(ctx context.Context, intfID uint32, onuID uint32,
+func (rsrcMgr *OpenOltResourceMgr) GetCurrentGEMPortIDsForOnu(ctx context.Context, intfID uint32, onuID uint32,
uniID uint32) []uint32 {
- /* Get gem ports for given pon interface , onu id and uni id. */
+ intfOnuIDuniID := fmt.Sprintf("%d,%d,%d", intfID, onuID, uniID)
- IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", intfID, onuID, uniID)
- return RsrcMgr.ResourceMgrs[intfID].GetCurrentGEMPortIDsForOnu(ctx, IntfOnuIDUniID)
+ // fetch from cache
+ rsrcMgr.gemPortIDsForOnuLock.RLock()
+ gemIDs, ok := rsrcMgr.gemPortIDsForOnu[intfOnuIDuniID]
+ rsrcMgr.gemPortIDsForOnuLock.RUnlock()
+ if ok {
+ return gemIDs
+ }
+ /* Get gem ports for given pon interface , onu id and uni id. */
+ gemIDs = rsrcMgr.PonRsrMgr.GetCurrentGEMPortIDsForOnu(ctx, intfOnuIDuniID)
+
+ // update cache
+ rsrcMgr.gemPortIDsForOnuLock.Lock()
+ rsrcMgr.gemPortIDsForOnu[intfOnuIDuniID] = gemIDs
+ rsrcMgr.gemPortIDsForOnuLock.Unlock()
+
+ return gemIDs
}
// GetCurrentAllocIDsForOnu returns alloc ids for given pon interface and onu id
-func (RsrcMgr *OpenOltResourceMgr) GetCurrentAllocIDsForOnu(ctx context.Context, intfID uint32, onuID uint32, uniID uint32) []uint32 {
+func (rsrcMgr *OpenOltResourceMgr) GetCurrentAllocIDsForOnu(ctx context.Context, intfID uint32, onuID uint32, uniID uint32) []uint32 {
- IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", intfID, onuID, uniID)
- AllocID := RsrcMgr.ResourceMgrs[intfID].GetCurrentAllocIDForOnu(ctx, IntfOnuIDUniID)
- if AllocID != nil {
- return AllocID
+ intfOnuIDuniID := fmt.Sprintf("%d,%d,%d", intfID, onuID, uniID)
+ // fetch from cache
+ rsrcMgr.allocIDsForOnuLock.RLock()
+ allocIDs, ok := rsrcMgr.allocIDsForOnu[intfOnuIDuniID]
+ rsrcMgr.allocIDsForOnuLock.RUnlock()
+ if ok {
+ return allocIDs
}
- return []uint32{}
+ allocIDs = rsrcMgr.PonRsrMgr.GetCurrentAllocIDForOnu(ctx, intfOnuIDuniID)
+
+ // update cache
+ rsrcMgr.allocIDsForOnuLock.Lock()
+ rsrcMgr.allocIDsForOnu[intfOnuIDuniID] = allocIDs
+ rsrcMgr.allocIDsForOnuLock.Unlock()
+
+ return allocIDs
}
// RemoveAllocIDForOnu removes the alloc id for given pon interface, onu id, uni id and alloc id
-func (RsrcMgr *OpenOltResourceMgr) RemoveAllocIDForOnu(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, allocID uint32) {
- allocIDs := RsrcMgr.GetCurrentAllocIDsForOnu(ctx, intfID, onuID, uniID)
+func (rsrcMgr *OpenOltResourceMgr) RemoveAllocIDForOnu(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, allocID uint32) {
+ allocIDs := rsrcMgr.GetCurrentAllocIDsForOnu(ctx, intfID, onuID, uniID)
for i := 0; i < len(allocIDs); i++ {
if allocIDs[i] == allocID {
allocIDs = append(allocIDs[:i], allocIDs[i+1:]...)
break
}
}
- err := RsrcMgr.UpdateAllocIdsForOnu(ctx, intfID, onuID, uniID, allocIDs)
+ err := rsrcMgr.UpdateAllocIdsForOnu(ctx, intfID, onuID, uniID, allocIDs)
if err != nil {
- logger.Errorf(ctx, "Failed to Remove Alloc Id For Onu. IntfID %d onuID %d uniID %d allocID %d",
+ logger.Errorf(ctx, "Failed to Remove Alloc Id For Onu. intfID %d onuID %d uniID %d allocID %d",
intfID, onuID, uniID, allocID)
}
}
// RemoveGemPortIDForOnu removes the gem port id for given pon interface, onu id, uni id and gem port id
-func (RsrcMgr *OpenOltResourceMgr) RemoveGemPortIDForOnu(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, gemPortID uint32) {
- gemPortIDs := RsrcMgr.GetCurrentGEMPortIDsForOnu(ctx, intfID, onuID, uniID)
+func (rsrcMgr *OpenOltResourceMgr) RemoveGemPortIDForOnu(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, gemPortID uint32) {
+ gemPortIDs := rsrcMgr.GetCurrentGEMPortIDsForOnu(ctx, intfID, onuID, uniID)
for i := 0; i < len(gemPortIDs); i++ {
if gemPortIDs[i] == gemPortID {
gemPortIDs = append(gemPortIDs[:i], gemPortIDs[i+1:]...)
break
}
}
- err := RsrcMgr.UpdateGEMPortIDsForOnu(ctx, intfID, onuID, uniID, gemPortIDs)
+ err := rsrcMgr.UpdateGEMPortIDsForOnu(ctx, intfID, onuID, uniID, gemPortIDs)
if err != nil {
- logger.Errorf(ctx, "Failed to Remove Gem Id For Onu. IntfID %d onuID %d uniID %d gemPortId %d",
+ logger.Errorf(ctx, "Failed to Remove Gem Id For Onu. intfID %d onuID %d uniID %d gemPortId %d",
intfID, onuID, uniID, gemPortID)
}
}
-//GetUniPortByPonPortGemPortFromKVStore retrieves onu and uni ID associated with the pon and gem ports.
-func (RsrcMgr *OpenOltResourceMgr) GetUniPortByPonPortGemPortFromKVStore(ctx context.Context, PonPort uint32, GemPort uint32) (uint32, uint32, error) {
- IntfGEMPortPath := fmt.Sprintf("%d,%d", PonPort, GemPort)
- logger.Debugf(ctx, "Getting ONU and UNI IDs from the path %s", IntfGEMPortPath)
- var Data []uint32
- Value, err := RsrcMgr.KVStore.Get(ctx, IntfGEMPortPath)
- if err == nil {
- if Value != nil {
- Val, _ := ponrmgr.ToByte(Value.Value)
- if err = json.Unmarshal(Val, &Data); err != nil {
- logger.Errorw(ctx, "Failed to unmarshal", log.Fields{"error": err})
- return 0, 0, errors.New("failed to unmarshal the data retrieved")
- }
- }
- } else {
- logger.Errorf(ctx, "Failed to get data from kvstore for %s", IntfGEMPortPath, err)
- return 0, 0, errors.New("could not get data")
- }
- if len(Data) < 2 {
- return 0, 0, errors.New("invalid data format")
- }
- return Data[0], Data[1], nil
-}
-
-// UpdateGEMportsPonportToOnuMapOnKVStore updates onu and uni id associated with the gem port to the kv store
-// This stored information is used when packet_indication is received and we need to derive the ONU Id for which
-// the packet arrived based on the pon_intf and gemport available in the packet_indication
-func (RsrcMgr *OpenOltResourceMgr) UpdateGEMportsPonportToOnuMapOnKVStore(ctx context.Context, gemPorts []uint32, PonPort uint32,
- onuID uint32, uniID uint32) error {
-
- /* Update onu and uni id associated with the gem port to the kv store. */
- var IntfGEMPortPath string
- Data := []uint32{onuID, uniID}
- for _, GEM := range gemPorts {
- IntfGEMPortPath = fmt.Sprintf("%d,%d", PonPort, GEM)
- Val, err := json.Marshal(Data)
- if err != nil {
- logger.Error(ctx, "failed to Marshal")
- return err
- }
-
- if err = RsrcMgr.KVStore.Put(ctx, IntfGEMPortPath, Val); err != nil {
- logger.Errorf(ctx, "Failed to update resource %s", IntfGEMPortPath)
- return err
- }
- }
- return nil
-}
-
-// RemoveGEMportPonportToOnuMapOnKVStore removes the relationship between the gem port and pon port
-func (RsrcMgr *OpenOltResourceMgr) RemoveGEMportPonportToOnuMapOnKVStore(ctx context.Context, GemPort uint32, PonPort uint32) {
- IntfGEMPortPath := fmt.Sprintf("%d,%d", PonPort, GemPort)
- err := RsrcMgr.KVStore.Delete(ctx, IntfGEMPortPath)
- if err != nil {
- logger.Errorf(ctx, "Failed to Remove Gem port-Pon port to onu map on kv store. Gem %d PonPort %d", GemPort, PonPort)
- }
-}
-
-// GetGEMPortID gets gem port id for a particular pon port, onu id and uni id and then update the resource map on
-// the KV store with the list of gemport_id allocated for the pon_intf_onu_id tuple
-func (RsrcMgr *OpenOltResourceMgr) GetGEMPortID(ctx context.Context, ponPort uint32, onuID uint32,
- uniID uint32, NumOfPorts uint32) ([]uint32, error) {
-
- /* Get gem port id for a particular pon port, onu id
- and uni id.
- */
-
- var err error
- IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", ponPort, onuID, uniID)
-
- RsrcMgr.GemPortIDMgmtLock[ponPort].Lock()
- defer RsrcMgr.GemPortIDMgmtLock[ponPort].Unlock()
-
- GEMPortList := RsrcMgr.ResourceMgrs[ponPort].GetCurrentGEMPortIDsForOnu(ctx, IntfOnuIDUniID)
- if GEMPortList != nil {
- return GEMPortList, nil
- }
-
- GEMPortList, err = RsrcMgr.ResourceMgrs[ponPort].GetResourceID(ctx, ponPort,
- ponrmgr.GEMPORT_ID, NumOfPorts)
- if err != nil && GEMPortList == nil {
- logger.Errorf(ctx, "Failed to get gem port id for %s", IntfOnuIDUniID)
- return nil, err
- }
-
- // update the resource map on KV store with the list of gemport_id
- // allocated for the pon_intf_onu_id tuple
- err = RsrcMgr.ResourceMgrs[ponPort].UpdateGEMPortIDsForOnu(ctx, IntfOnuIDUniID,
- GEMPortList)
- if err != nil {
- logger.Errorf(ctx, "Failed to update GEM ports to kv store for %s", IntfOnuIDUniID)
- return nil, err
- }
- _ = RsrcMgr.UpdateGEMportsPonportToOnuMapOnKVStore(ctx, GEMPortList, ponPort,
- onuID, uniID)
- return GEMPortList, err
-}
-
// UpdateGEMPortIDsForOnu updates gemport ids on to the kv store for a given pon port, onu id and uni id
-func (RsrcMgr *OpenOltResourceMgr) UpdateGEMPortIDsForOnu(ctx context.Context, ponPort uint32, onuID uint32,
- uniID uint32, GEMPortList []uint32) error {
- IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", ponPort, onuID, uniID)
- return RsrcMgr.ResourceMgrs[ponPort].UpdateGEMPortIDsForOnu(ctx, IntfOnuIDUniID,
- GEMPortList)
+func (rsrcMgr *OpenOltResourceMgr) UpdateGEMPortIDsForOnu(ctx context.Context, ponPort uint32, onuID uint32,
+ uniID uint32, gemIDs []uint32) error {
+ intfOnuIDuniID := fmt.Sprintf("%d,%d,%d", ponPort, onuID, uniID)
+ // update cache
+ rsrcMgr.gemPortIDsForOnuLock.Lock()
+ rsrcMgr.gemPortIDsForOnu[intfOnuIDuniID] = gemIDs
+ rsrcMgr.gemPortIDsForOnuLock.Unlock()
+
+ // Note: in case the write to DB fails there could be inconsistent data between cache and db.
+ // Although this is highly unlikely with DB retries in place, this is something we have to deal with in the next release
+ return rsrcMgr.PonRsrMgr.UpdateGEMPortIDsForOnu(ctx, intfOnuIDuniID,
+ gemIDs)
}
// FreeonuID releases(make free) onu id for a particular pon-port
-func (RsrcMgr *OpenOltResourceMgr) FreeonuID(ctx context.Context, intfID uint32, onuID []uint32) {
+func (rsrcMgr *OpenOltResourceMgr) FreeonuID(ctx context.Context, intfID uint32, onuID []uint32) {
- RsrcMgr.OnuIDMgmtLock[intfID].Lock()
- defer RsrcMgr.OnuIDMgmtLock[intfID].Unlock()
-
- if err := RsrcMgr.ResourceMgrs[intfID].TechProfileMgr.FreeResourceID(ctx, intfID, ponrmgr.ONU_ID, onuID); err != nil {
+ if err := rsrcMgr.PonRsrMgr.TechProfileMgr.FreeResourceID(ctx, intfID, ponrmgr.ONU_ID, onuID); err != nil {
logger.Errorw(ctx, "error-while-freeing-onu-id", log.Fields{
"intf-id": intfID,
"onu-id": onuID,
@@ -859,23 +487,23 @@
var IntfonuID string
for _, onu := range onuID {
IntfonuID = fmt.Sprintf("%d,%d", intfID, onu)
- RsrcMgr.ResourceMgrs[intfID].RemoveResourceMap(ctx, IntfonuID)
+ rsrcMgr.PonRsrMgr.RemoveResourceMap(ctx, IntfonuID)
}
}
// FreeAllocID frees AllocID on the PON resource pool and also frees the allocID association
// for the given OLT device.
-func (RsrcMgr *OpenOltResourceMgr) FreeAllocID(ctx context.Context, IntfID uint32, onuID uint32,
+// The caller should ensure that this is a blocking call and this operation is serialized for
+// the ONU so as not to cause resource corruption since there are no mutexes used here.
+func (rsrcMgr *OpenOltResourceMgr) FreeAllocID(ctx context.Context, intfID uint32, onuID uint32,
uniID uint32, allocID uint32) {
- RsrcMgr.AllocIDMgmtLock[IntfID].Lock()
- defer RsrcMgr.AllocIDMgmtLock[IntfID].Unlock()
- RsrcMgr.RemoveAllocIDForOnu(ctx, IntfID, onuID, uniID, allocID)
+ rsrcMgr.RemoveAllocIDForOnu(ctx, intfID, onuID, uniID, allocID)
allocIDs := make([]uint32, 0)
allocIDs = append(allocIDs, allocID)
- if err := RsrcMgr.ResourceMgrs[IntfID].TechProfileMgr.FreeResourceID(ctx, IntfID, ponrmgr.ALLOC_ID, allocIDs); err != nil {
+ if err := rsrcMgr.PonRsrMgr.TechProfileMgr.FreeResourceID(ctx, intfID, ponrmgr.ALLOC_ID, allocIDs); err != nil {
logger.Errorw(ctx, "error-while-freeing-alloc-id", log.Fields{
- "intf-id": IntfID,
+ "intf-id": intfID,
"onu-id": onuID,
"err": err.Error(),
})
@@ -884,33 +512,35 @@
// FreeGemPortID frees GemPortID on the PON resource pool and also frees the gemPortID association
// for the given OLT device.
-func (RsrcMgr *OpenOltResourceMgr) FreeGemPortID(ctx context.Context, IntfID uint32, onuID uint32,
+// The caller should ensure that this is a blocking call and this operation is serialized for
+// the ONU so as not to cause resource corruption since there are no mutexes used here.
+func (rsrcMgr *OpenOltResourceMgr) FreeGemPortID(ctx context.Context, intfID uint32, onuID uint32,
uniID uint32, gemPortID uint32) {
- RsrcMgr.GemPortIDMgmtLock[IntfID].Lock()
- defer RsrcMgr.GemPortIDMgmtLock[IntfID].Unlock()
+ rsrcMgr.RemoveGemPortIDForOnu(ctx, intfID, onuID, uniID, gemPortID)
- RsrcMgr.RemoveGemPortIDForOnu(ctx, IntfID, onuID, uniID, gemPortID)
gemPortIDs := make([]uint32, 0)
gemPortIDs = append(gemPortIDs, gemPortID)
- if err := RsrcMgr.ResourceMgrs[IntfID].TechProfileMgr.FreeResourceID(ctx, IntfID, ponrmgr.GEMPORT_ID, gemPortIDs); err != nil {
+ if err := rsrcMgr.PonRsrMgr.TechProfileMgr.FreeResourceID(ctx, intfID, ponrmgr.GEMPORT_ID, gemPortIDs); err != nil {
logger.Errorw(ctx, "error-while-freeing-gem-port-id", log.Fields{
- "intf-id": IntfID,
+ "intf-id": intfID,
"onu-id": onuID,
"err": err.Error(),
})
}
}
-// FreePONResourcesForONU make the pon resources free for a given pon interface and onu id, and the clears the
-// resource map and the onuID associated with (pon_intf_id, gemport_id) tuple,
-func (RsrcMgr *OpenOltResourceMgr) FreePONResourcesForONU(ctx context.Context, intfID uint32, onuID uint32, uniID uint32) {
+// FreePONResourcesForONU makes the pon resources free for a given pon interface, onu id and uni id
+func (rsrcMgr *OpenOltResourceMgr) FreePONResourcesForONU(ctx context.Context, intfID uint32, onuID uint32, uniID uint32) {
- IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", intfID, onuID, uniID)
+ intfOnuIDuniID := fmt.Sprintf("%d,%d,%d", intfID, onuID, uniID)
- RsrcMgr.AllocIDMgmtLock[intfID].Lock()
- AllocIDs := RsrcMgr.ResourceMgrs[intfID].GetCurrentAllocIDForOnu(ctx, IntfOnuIDUniID)
+ AllocIDs := rsrcMgr.PonRsrMgr.GetCurrentAllocIDForOnu(ctx, intfOnuIDuniID)
- if err := RsrcMgr.ResourceMgrs[intfID].TechProfileMgr.FreeResourceID(ctx, intfID,
+ rsrcMgr.allocIDsForOnuLock.Lock()
+ delete(rsrcMgr.allocIDsForOnu, intfOnuIDuniID)
+ rsrcMgr.allocIDsForOnuLock.Unlock()
+
+ if err := rsrcMgr.PonRsrMgr.TechProfileMgr.FreeResourceID(ctx, intfID,
ponrmgr.ALLOC_ID,
AllocIDs); err != nil {
logger.Errorw(ctx, "error-while-freeing-all-alloc-ids-for-onu", log.Fields{
@@ -919,11 +549,14 @@
"err": err.Error(),
})
}
- RsrcMgr.AllocIDMgmtLock[intfID].Unlock()
- RsrcMgr.GemPortIDMgmtLock[intfID].Lock()
- GEMPortIDs := RsrcMgr.ResourceMgrs[intfID].GetCurrentGEMPortIDsForOnu(ctx, IntfOnuIDUniID)
- if err := RsrcMgr.ResourceMgrs[intfID].TechProfileMgr.FreeResourceID(ctx, intfID,
+ GEMPortIDs := rsrcMgr.PonRsrMgr.GetCurrentGEMPortIDsForOnu(ctx, intfOnuIDuniID)
+
+ rsrcMgr.gemPortIDsForOnuLock.Lock()
+ delete(rsrcMgr.gemPortIDsForOnu, intfOnuIDuniID)
+ rsrcMgr.gemPortIDsForOnuLock.Unlock()
+
+ if err := rsrcMgr.PonRsrMgr.TechProfileMgr.FreeResourceID(ctx, intfID,
ponrmgr.GEMPORT_ID,
GEMPortIDs); err != nil {
logger.Errorw(ctx, "error-while-freeing-all-gem-port-ids-for-onu", log.Fields{
@@ -932,28 +565,23 @@
"err": err.Error(),
})
}
- RsrcMgr.GemPortIDMgmtLock[intfID].Unlock()
// Clear resource map associated with (pon_intf_id, gemport_id) tuple.
- RsrcMgr.ResourceMgrs[intfID].RemoveResourceMap(ctx, IntfOnuIDUniID)
- // Clear the ONU Id associated with the (pon_intf_id, gemport_id) tuple.
- for _, GEM := range GEMPortIDs {
- _ = RsrcMgr.KVStore.Delete(ctx, fmt.Sprintf("%d,%d", intfID, GEM))
- }
+ rsrcMgr.PonRsrMgr.RemoveResourceMap(ctx, intfOnuIDuniID)
}
// IsFlowOnKvStore checks if the given flowID is present on the kv store
// Returns true if the flowID is found, otherwise it returns false
-func (RsrcMgr *OpenOltResourceMgr) IsFlowOnKvStore(ctx context.Context, ponIntfID uint32, onuID int32, uniID int32,
+func (rsrcMgr *OpenOltResourceMgr) IsFlowOnKvStore(ctx context.Context, intfID uint32, onuID int32, uniID int32,
flowID uint64) bool {
- FlowIDs, err := RsrcMgr.GetCurrentFlowIDsForOnu(ctx, ponIntfID, onuID, uniID)
+ FlowIDs, err := rsrcMgr.GetCurrentFlowIDsForOnu(ctx, intfID, onuID, uniID)
if err != nil {
// error logged in the called function
return false
}
if FlowIDs != nil {
- logger.Debugw(ctx, "Found flowId(s) for this ONU", log.Fields{"pon": ponIntfID, "onuID": onuID, "uniID": uniID})
+ logger.Debugw(ctx, "Found flowId(s) for this ONU", log.Fields{"pon": intfID, "onuID": onuID, "uniID": uniID})
for _, id := range FlowIDs {
if flowID == id {
return true
@@ -964,89 +592,117 @@
}
// GetTechProfileIDForOnu fetches Tech-Profile-ID from the KV-Store for the given onu based on the path
-// This path is formed as the following: {IntfID, OnuID, UniID}/tp_id
-func (RsrcMgr *OpenOltResourceMgr) GetTechProfileIDForOnu(ctx context.Context, IntfID uint32, OnuID uint32, UniID uint32) []uint32 {
- Path := fmt.Sprintf(TpIDPathSuffix, IntfID, OnuID, UniID)
- var Data []uint32
- Value, err := RsrcMgr.KVStore.Get(ctx, Path)
+// This path is formed as the following: {intfID, onuID, uniID}/tp_id
+func (rsrcMgr *OpenOltResourceMgr) GetTechProfileIDForOnu(ctx context.Context, intfID uint32, onuID uint32, uniID uint32) []uint32 {
+ Path := fmt.Sprintf(tpIDPathSuffix, intfID, onuID, uniID)
+ // fetch from cache
+ rsrcMgr.techProfileIDsForOnuLock.RLock()
+ tpIDs, ok := rsrcMgr.techProfileIDsForOnu[Path]
+ rsrcMgr.techProfileIDsForOnuLock.RUnlock()
+ if ok {
+ return tpIDs
+ }
+ Value, err := rsrcMgr.KVStore.Get(ctx, Path)
if err == nil {
if Value != nil {
Val, err := kvstore.ToByte(Value.Value)
if err != nil {
- logger.Errorw(ctx, "Failed to convert into byte array", log.Fields{"error": err})
- return Data
+ logger.Errorw(ctx, "Failed to convert into byte array", log.Fields{"err": err})
+ return tpIDs
}
- if err = json.Unmarshal(Val, &Data); err != nil {
- logger.Error(ctx, "Failed to unmarshal", log.Fields{"error": err})
- return Data
+ if err = json.Unmarshal(Val, &tpIDs); err != nil {
+ logger.Error(ctx, "Failed to unmarshal", log.Fields{"err": err})
+ return tpIDs
}
}
} else {
logger.Errorf(ctx, "Failed to get TP id from kvstore for path %s", Path)
}
- logger.Debugf(ctx, "Getting TP id %d from path %s", Data, Path)
- return Data
+ logger.Debugf(ctx, "Getting TP id %d from path %s", tpIDs, Path)
+
+ // update cache
+ rsrcMgr.techProfileIDsForOnuLock.Lock()
+ rsrcMgr.techProfileIDsForOnu[Path] = tpIDs
+ rsrcMgr.techProfileIDsForOnuLock.Unlock()
+
+ return tpIDs
}
// RemoveTechProfileIDsForOnu deletes all tech profile ids from the KV-Store for the given onu based on the path
-// This path is formed as the following: {IntfID, OnuID, UniID}/tp_id
-func (RsrcMgr *OpenOltResourceMgr) RemoveTechProfileIDsForOnu(ctx context.Context, IntfID uint32, OnuID uint32, UniID uint32) error {
- IntfOnuUniID := fmt.Sprintf(TpIDPathSuffix, IntfID, OnuID, UniID)
- if err := RsrcMgr.KVStore.Delete(ctx, IntfOnuUniID); err != nil {
- logger.Errorw(ctx, "Failed to delete techprofile id resource in KV store", log.Fields{"path": IntfOnuUniID})
+// This path is formed as the following: {intfID, onuID, uniID}/tp_id
+func (rsrcMgr *OpenOltResourceMgr) RemoveTechProfileIDsForOnu(ctx context.Context, intfID uint32, onuID uint32, uniID uint32) error {
+ intfOnuUniID := fmt.Sprintf(tpIDPathSuffix, intfID, onuID, uniID)
+ // update cache
+ rsrcMgr.techProfileIDsForOnuLock.Lock()
+ delete(rsrcMgr.techProfileIDsForOnu, intfOnuUniID)
+ rsrcMgr.techProfileIDsForOnuLock.Unlock()
+
+ if err := rsrcMgr.KVStore.Delete(ctx, intfOnuUniID); err != nil {
+ logger.Errorw(ctx, "Failed to delete techprofile id resource in KV store", log.Fields{"path": intfOnuUniID})
return err
}
return nil
}
// RemoveTechProfileIDForOnu deletes a specific tech profile id from the KV-Store for the given onu based on the path
-// This path is formed as the following: {IntfID, OnuID, UniID}/tp_id
-func (RsrcMgr *OpenOltResourceMgr) RemoveTechProfileIDForOnu(ctx context.Context, IntfID uint32, OnuID uint32, UniID uint32, TpID uint32) error {
- tpIDList := RsrcMgr.GetTechProfileIDForOnu(ctx, IntfID, OnuID, UniID)
+// This path is formed as the following: {intfID, onuID, uniID}/tp_id
+func (rsrcMgr *OpenOltResourceMgr) RemoveTechProfileIDForOnu(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, tpID uint32) error {
+ tpIDList := rsrcMgr.GetTechProfileIDForOnu(ctx, intfID, onuID, uniID)
for i, tpIDInList := range tpIDList {
- if tpIDInList == TpID {
+ if tpIDInList == tpID {
tpIDList = append(tpIDList[:i], tpIDList[i+1:]...)
}
}
- IntfOnuUniID := fmt.Sprintf(TpIDPathSuffix, IntfID, OnuID, UniID)
+ intfOnuUniID := fmt.Sprintf(tpIDPathSuffix, intfID, onuID, uniID)
+ // update cache
+ rsrcMgr.techProfileIDsForOnuLock.Lock()
+ rsrcMgr.techProfileIDsForOnu[intfOnuUniID] = tpIDList
+ rsrcMgr.techProfileIDsForOnuLock.Unlock()
+
Value, err := json.Marshal(tpIDList)
if err != nil {
logger.Error(ctx, "failed to Marshal")
return err
}
- if err = RsrcMgr.KVStore.Put(ctx, IntfOnuUniID, Value); err != nil {
- logger.Errorf(ctx, "Failed to update resource %s", IntfOnuUniID)
+ if err = rsrcMgr.KVStore.Put(ctx, intfOnuUniID, Value); err != nil {
+ logger.Errorf(ctx, "Failed to update resource %s", intfOnuUniID)
return err
}
return err
}
// UpdateTechProfileIDForOnu updates (put) already present tech-profile-id for the given onu based on the path
-// This path is formed as the following: {IntfID, OnuID, UniID}/tp_id
-func (RsrcMgr *OpenOltResourceMgr) UpdateTechProfileIDForOnu(ctx context.Context, IntfID uint32, OnuID uint32,
- UniID uint32, TpID uint32) error {
+// This path is formed as the following: {intfID, onuID, uniID}/tp_id
+func (rsrcMgr *OpenOltResourceMgr) UpdateTechProfileIDForOnu(ctx context.Context, intfID uint32, onuID uint32,
+ uniID uint32, tpID uint32) error {
var Value []byte
var err error
- IntfOnuUniID := fmt.Sprintf(TpIDPathSuffix, IntfID, OnuID, UniID)
+ intfOnuUniID := fmt.Sprintf(tpIDPathSuffix, intfID, onuID, uniID)
- tpIDList := RsrcMgr.GetTechProfileIDForOnu(ctx, IntfID, OnuID, UniID)
+ tpIDList := rsrcMgr.GetTechProfileIDForOnu(ctx, intfID, onuID, uniID)
for _, value := range tpIDList {
- if value == TpID {
- logger.Debugf(ctx, "TpID %d is already in tpIdList for the path %s", TpID, IntfOnuUniID)
+ if value == tpID {
+ logger.Debugf(ctx, "tpID %d is already in tpIdList for the path %s", tpID, intfOnuUniID)
return err
}
}
- logger.Debugf(ctx, "updating tp id %d on path %s", TpID, IntfOnuUniID)
- tpIDList = append(tpIDList, TpID)
+ logger.Debugf(ctx, "updating tp id %d on path %s", tpID, intfOnuUniID)
+ tpIDList = append(tpIDList, tpID)
+
+ // update cache
+ rsrcMgr.techProfileIDsForOnuLock.Lock()
+ rsrcMgr.techProfileIDsForOnu[intfOnuUniID] = tpIDList
+ rsrcMgr.techProfileIDsForOnuLock.Unlock()
+
Value, err = json.Marshal(tpIDList)
if err != nil {
logger.Error(ctx, "failed to Marshal")
return err
}
- if err = RsrcMgr.KVStore.Put(ctx, IntfOnuUniID, Value); err != nil {
- logger.Errorf(ctx, "Failed to update resource %s", IntfOnuUniID)
+ if err = rsrcMgr.KVStore.Put(ctx, intfOnuUniID, Value); err != nil {
+ logger.Errorf(ctx, "Failed to update resource %s", intfOnuUniID)
return err
}
return err
@@ -1054,41 +710,56 @@
// StoreMeterInfoForOnu updates the meter id in the KV-Store for the given onu based on the path
// This path is formed as the following: <(pon_id, onu_id, uni_id)>/<tp_id>/meter_id/<direction>
-func (RsrcMgr *OpenOltResourceMgr) StoreMeterInfoForOnu(ctx context.Context, Direction string, IntfID uint32, OnuID uint32,
- UniID uint32, TpID uint32, meterInfo *MeterInfo) error {
+func (rsrcMgr *OpenOltResourceMgr) StoreMeterInfoForOnu(ctx context.Context, Direction string, intfID uint32, onuID uint32,
+ uniID uint32, tpID uint32, meterInfo *MeterInfo) error {
var Value []byte
var err error
- IntfOnuUniID := fmt.Sprintf(MeterIDPathSuffix, IntfID, OnuID, UniID, TpID, Direction)
+ intfOnuUniID := fmt.Sprintf(MeterIDPathSuffix, intfID, onuID, uniID, tpID, Direction)
+
+ // update cache
+ rsrcMgr.meterInfoForOnuLock.Lock()
+ rsrcMgr.meterInfoForOnu[intfOnuUniID] = meterInfo
+ rsrcMgr.meterInfoForOnuLock.Unlock()
+
Value, err = json.Marshal(*meterInfo)
if err != nil {
logger.Error(ctx, "failed to Marshal meter config")
return err
}
- if err = RsrcMgr.KVStore.Put(ctx, IntfOnuUniID, Value); err != nil {
- logger.Errorf(ctx, "Failed to store meter into KV store %s", IntfOnuUniID)
+ if err = rsrcMgr.KVStore.Put(ctx, intfOnuUniID, Value); err != nil {
+ logger.Errorf(ctx, "Failed to store meter into KV store %s", intfOnuUniID)
return err
}
- logger.Debugw(ctx, "meter info updated successfully", log.Fields{"path": IntfOnuUniID, "meter-info": meterInfo})
+ logger.Debugw(ctx, "meter info updated successfully", log.Fields{"path": intfOnuUniID, "meter-info": meterInfo})
return err
}
// GetMeterInfoForOnu fetches the meter id from the kv store for the given onu based on the path
// This path is formed as the following: <(pon_id, onu_id, uni_id)>/<tp_id>/meter_id/<direction>
-func (RsrcMgr *OpenOltResourceMgr) GetMeterInfoForOnu(ctx context.Context, Direction string, IntfID uint32, OnuID uint32,
- UniID uint32, TpID uint32) (*MeterInfo, error) {
- Path := fmt.Sprintf(MeterIDPathSuffix, IntfID, OnuID, UniID, TpID, Direction)
+func (rsrcMgr *OpenOltResourceMgr) GetMeterInfoForOnu(ctx context.Context, Direction string, intfID uint32, onuID uint32,
+ uniID uint32, tpID uint32) (*MeterInfo, error) {
+ Path := fmt.Sprintf(MeterIDPathSuffix, intfID, onuID, uniID, tpID, Direction)
+
+ // get from cache
+ rsrcMgr.meterInfoForOnuLock.RLock()
+ val, ok := rsrcMgr.meterInfoForOnu[Path]
+ rsrcMgr.meterInfoForOnuLock.RUnlock()
+ if ok {
+ return val, nil
+ }
+
var meterInfo MeterInfo
- Value, err := RsrcMgr.KVStore.Get(ctx, Path)
+ Value, err := rsrcMgr.KVStore.Get(ctx, Path)
if err == nil {
if Value != nil {
logger.Debug(ctx, "Found meter info in KV store", log.Fields{"Direction": Direction})
Val, er := kvstore.ToByte(Value.Value)
if er != nil {
- logger.Errorw(ctx, "Failed to convert into byte array", log.Fields{"error": er})
+ logger.Errorw(ctx, "Failed to convert into byte array", log.Fields{"err": er})
return nil, er
}
if er = json.Unmarshal(Val, &meterInfo); er != nil {
- logger.Error(ctx, "Failed to unmarshal meter info", log.Fields{"error": er})
+ logger.Error(ctx, "Failed to unmarshal meter info", log.Fields{"err": er})
return nil, er
}
} else {
@@ -1099,25 +770,30 @@
logger.Errorf(ctx, "Failed to get Meter config from kvstore for path %s", Path)
}
+ // update cache
+ rsrcMgr.meterInfoForOnuLock.Lock()
+ rsrcMgr.meterInfoForOnu[Path] = &meterInfo
+ rsrcMgr.meterInfoForOnuLock.Unlock()
+
return &meterInfo, err
}
// HandleMeterInfoRefCntUpdate increments or decrements the reference counter for a given meter.
// When reference count becomes 0, it clears the meter information from the kv store
-func (RsrcMgr *OpenOltResourceMgr) HandleMeterInfoRefCntUpdate(ctx context.Context, Direction string,
- IntfID uint32, OnuID uint32, UniID uint32, TpID uint32, increment bool) error {
- meterInfo, err := RsrcMgr.GetMeterInfoForOnu(ctx, Direction, IntfID, OnuID, UniID, TpID)
+func (rsrcMgr *OpenOltResourceMgr) HandleMeterInfoRefCntUpdate(ctx context.Context, Direction string,
+ intfID uint32, onuID uint32, uniID uint32, tpID uint32, increment bool) error {
+ meterInfo, err := rsrcMgr.GetMeterInfoForOnu(ctx, Direction, intfID, onuID, uniID, tpID)
if err != nil {
return err
} else if meterInfo == nil {
// If we are increasing the reference count, we expect the meter information to be present on KV store.
// But if decrementing the reference count, the meter is possibly already cleared from KV store. Just log warn but do not return error.
if increment {
- logger.Errorf(ctx, "error-fetching-meter-info-for-intf-%d-onu-%d-uni-%d-tp-id-%d-direction-%s", IntfID, OnuID, UniID, TpID, Direction)
- return fmt.Errorf("error-fetching-meter-info-for-intf-%d-onu-%d-uni-%d-tp-id-%d-direction-%s", IntfID, OnuID, UniID, TpID, Direction)
+ logger.Errorf(ctx, "error-fetching-meter-info-for-intf-%d-onu-%d-uni-%d-tp-id-%d-direction-%s", intfID, onuID, uniID, tpID, Direction)
+ return fmt.Errorf("error-fetching-meter-info-for-intf-%d-onu-%d-uni-%d-tp-id-%d-direction-%s", intfID, onuID, uniID, tpID, Direction)
}
logger.Warnw(ctx, "meter is already cleared",
- log.Fields{"intfID": IntfID, "onuID": OnuID, "uniID": UniID, "direction": Direction, "increment": increment})
+ log.Fields{"intfID": intfID, "onuID": onuID, "uniID": uniID, "direction": Direction, "increment": increment})
return nil
}
@@ -1127,13 +803,13 @@
meterInfo.RefCnt--
// If RefCnt become 0 clear the meter information from the DB.
if meterInfo.RefCnt == 0 {
- if err := RsrcMgr.RemoveMeterInfoForOnu(ctx, Direction, IntfID, OnuID, UniID, TpID); err != nil {
+ if err := rsrcMgr.RemoveMeterInfoForOnu(ctx, Direction, intfID, onuID, uniID, tpID); err != nil {
return err
}
return nil
}
}
- if err := RsrcMgr.StoreMeterInfoForOnu(ctx, Direction, IntfID, OnuID, UniID, TpID, meterInfo); err != nil {
+ if err := rsrcMgr.StoreMeterInfoForOnu(ctx, Direction, intfID, onuID, uniID, tpID, meterInfo); err != nil {
return err
}
return nil
@@ -1141,44 +817,44 @@
// RemoveMeterInfoForOnu deletes the meter id from the kV-Store for the given onu based on the path
// This path is formed as the following: <(pon_id, onu_id, uni_id)>/<tp_id>/meter_id/<direction>
-func (RsrcMgr *OpenOltResourceMgr) RemoveMeterInfoForOnu(ctx context.Context, Direction string, IntfID uint32, OnuID uint32,
- UniID uint32, TpID uint32) error {
- Path := fmt.Sprintf(MeterIDPathSuffix, IntfID, OnuID, UniID, TpID, Direction)
- if err := RsrcMgr.KVStore.Delete(ctx, Path); err != nil {
+func (rsrcMgr *OpenOltResourceMgr) RemoveMeterInfoForOnu(ctx context.Context, Direction string, intfID uint32, onuID uint32,
+ uniID uint32, tpID uint32) error {
+ Path := fmt.Sprintf(MeterIDPathSuffix, intfID, onuID, uniID, tpID, Direction)
+
+ // update cache
+ rsrcMgr.meterInfoForOnuLock.Lock()
+ delete(rsrcMgr.meterInfoForOnu, Path)
+ rsrcMgr.meterInfoForOnuLock.Unlock()
+
+ if err := rsrcMgr.KVStore.Delete(ctx, Path); err != nil {
logger.Errorf(ctx, "Failed to delete meter id %s from kvstore ", Path)
return err
}
return nil
}
-//AddGemToOnuGemInfo adds gemport to onugem info kvstore
-func (RsrcMgr *OpenOltResourceMgr) AddGemToOnuGemInfo(ctx context.Context, intfID uint32, onuID uint32, gemPort uint32) error {
- var onuGemData []OnuGemInfo
- var err error
-
- if err = RsrcMgr.ResourceMgrs[intfID].GetOnuGemInfo(ctx, intfID, &onuGemData); err != nil {
+//AddGemToOnuGemInfo adds gemport to onugem info on kvstore and also local cache
+func (rsrcMgr *OpenOltResourceMgr) AddGemToOnuGemInfo(ctx context.Context, intfID uint32, onuID uint32, gemPort uint32) error {
+ onugem, err := rsrcMgr.GetOnuGemInfo(ctx, intfID, onuID)
+ if err != nil || onugem == nil || onugem.SerialNumber == "" {
logger.Errorf(ctx, "failed to get onuifo for intfid %d", intfID)
return err
}
- if len(onuGemData) == 0 {
- logger.Errorw(ctx, "failed to ger Onuid info ", log.Fields{"intfid": intfID, "onuid": onuID})
- return err
+ if onugem.OnuID == onuID {
+ for _, gem := range onugem.GemPorts {
+ if gem == gemPort {
+ logger.Debugw(ctx, "Gem already present in onugem info, skpping addition", log.Fields{"gem": gem})
+ return nil
+ }
+ }
+ logger.Debugw(ctx, "Added gem to onugem info", log.Fields{"gem": gemPort})
+ onugem.GemPorts = append(onugem.GemPorts, gemPort)
+ } else {
+ logger.Errorw(ctx, "onu id in OnuGemInfo does not match", log.Fields{"onuID": onuID, "ponIf": intfID, "onuGemInfoOnuID": onugem.OnuID})
+ return fmt.Errorf("onu-id-in-OnuGemInfo-does-not-match-%v", onuID)
}
- for idx, onugem := range onuGemData {
- if onugem.OnuID == onuID {
- for _, gem := range onuGemData[idx].GemPorts {
- if gem == gemPort {
- logger.Debugw(ctx, "Gem already present in onugem info, skpping addition", log.Fields{"gem": gem})
- return nil
- }
- }
- logger.Debugw(ctx, "Added gem to onugem info", log.Fields{"gem": gemPort})
- onuGemData[idx].GemPorts = append(onuGemData[idx].GemPorts, gemPort)
- break
- }
- }
- err = RsrcMgr.ResourceMgrs[intfID].AddOnuGemInfo(ctx, intfID, onuGemData)
+ err = rsrcMgr.AddOnuGemInfo(ctx, intfID, onuID, *onugem)
if err != nil {
logger.Error(ctx, "Failed to add onugem to kv store")
return err
@@ -1186,78 +862,161 @@
return err
}
-//GetOnuGemInfo gets onu gem info from the kvstore per interface
-func (RsrcMgr *OpenOltResourceMgr) GetOnuGemInfo(ctx context.Context, IntfID uint32) ([]OnuGemInfo, error) {
- var onuGemData []OnuGemInfo
+//RemoveGemFromOnuGemInfo removes gemPort from the OnuGemInfo entry of the ONU identified by
+//(intfID, onuID) and writes the trimmed entry back through AddOnuGemInfo.
+//
+//Returns nil when the gem port was removed or was not present, the lookup error when
+//GetOnuGemInfo fails, a mismatch error when the stored entry carries a different ONU id, and the
+//AddOnuGemInfo error when the write-back fails.
+func (rsrcMgr *OpenOltResourceMgr) RemoveGemFromOnuGemInfo(ctx context.Context, intfID uint32, onuID uint32, gemPort uint32) error {
+	onugem, err := rsrcMgr.GetOnuGemInfo(ctx, intfID, onuID)
+	if err != nil || onugem == nil || onugem.SerialNumber == "" {
+		// NOTE: err is nil when the entry is merely missing or has an empty serial number, so
+		// that case is logged as an error but reported to the caller as success.
+		logger.Errorf(ctx, "failed to get onuifo for intfid %d", intfID)
+		return err
+	}
+	if onugem.OnuID != onuID {
+		logger.Errorw(ctx, "onu id in OnuGemInfo does not match", log.Fields{"onuID": onuID, "ponIf": intfID, "onuGemInfoOnuID": onugem.OnuID})
+		return fmt.Errorf("onu-id-in-OnuGemInfo-does-not-match-%v", onuID)
+	}
+	for i, gem := range onugem.GemPorts {
+		if gem != gemPort {
+			continue
+		}
+		logger.Debugw(ctx, "Gem found, removing from onu gem info", log.Fields{"gem": gem})
+		// onugem can be the very pointer held in the cache, and it is used here without the
+		// cache lock. Work on a copy and cap the prefix (s[:i:i]) so append allocates a new
+		// backing array instead of shifting elements inside the shared one.
+		trimmed := *onugem
+		trimmed.GemPorts = append(onugem.GemPorts[:i:i], onugem.GemPorts[i+1:]...)
+		if err = rsrcMgr.AddOnuGemInfo(ctx, intfID, onuID, trimmed); err != nil {
+			logger.Error(ctx, "Failed to add onugem to kv store")
+			return err
+		}
+		return nil
+	}
+	logger.Debugw(ctx, "Gem port not found in onu gem info", log.Fields{"gem": gemPort})
+	return nil
+}
- if err := RsrcMgr.ResourceMgrs[IntfID].GetOnuGemInfo(ctx, IntfID, &onuGemData); err != nil {
- logger.Errorf(ctx, "failed to get onuifo for intfid %d", IntfID)
+//GetOnuGemInfo gets the onu gem info of the given ONU from the local cache, falling back to the kvstore on a cache miss
+func (rsrcMgr *OpenOltResourceMgr) GetOnuGemInfo(ctx context.Context, intfID uint32, onuID uint32) (*OnuGemInfo, error) {
+ var err error
+ var Val []byte
+ var onugem OnuGemInfo
+
+ path := fmt.Sprintf(OnuGemInfoPath, intfID, onuID)
+
+ rsrcMgr.onuGemInfoLock.RLock()
+ val, ok := rsrcMgr.onuGemInfo[path]
+ rsrcMgr.onuGemInfoLock.RUnlock()
+ if ok {
+ return val, nil
+ }
+ value, err := rsrcMgr.KVStore.Get(ctx, path)
+ if err != nil {
+ logger.Errorw(ctx, "Failed to get from kv store", log.Fields{"path": path})
+ return nil, err
+ } else if value == nil {
+ logger.Debug(ctx, "No onuinfo for path", log.Fields{"path": path})
+ return nil, nil // returning nil as this could happen if there are no onus for the interface yet
+ }
+ if Val, err = kvstore.ToByte(value.Value); err != nil {
+ logger.Error(ctx, "Failed to convert to byte array")
return nil, err
}
- return onuGemData, nil
+ if err = json.Unmarshal(Val, &onugem); err != nil {
+ logger.Error(ctx, "Failed to unmarshall")
+ return nil, err
+ }
+ logger.Debugw(ctx, "found onugem info from path", log.Fields{"path": path, "onuGemInfo": onugem})
+ rsrcMgr.onuGemInfoLock.Lock()
+ rsrcMgr.onuGemInfo[path] = &onugem
+ rsrcMgr.onuGemInfoLock.Unlock()
+
+ return &onugem, nil
}
// AddOnuGemInfo adds onu info on to the kvstore per interface
-func (RsrcMgr *OpenOltResourceMgr) AddOnuGemInfo(ctx context.Context, IntfID uint32, onuGem OnuGemInfo) error {
- var onuGemData []OnuGemInfo
+func (rsrcMgr *OpenOltResourceMgr) AddOnuGemInfo(ctx context.Context, intfID uint32, onuID uint32, onuGem OnuGemInfo) error {
+
+ var Value []byte
var err error
+ Path := fmt.Sprintf(OnuGemInfoPath, intfID, onuID)
- if err = RsrcMgr.ResourceMgrs[IntfID].GetOnuGemInfo(ctx, IntfID, &onuGemData); err != nil {
- logger.Errorf(ctx, "failed to get onuifo for intfid %d", IntfID)
- return olterrors.NewErrPersistence("get", "OnuGemInfo", uint64(IntfID),
- log.Fields{"onuGem": onuGem, "intfID": IntfID}, err)
- }
- onuGemData = append(onuGemData, onuGem)
- err = RsrcMgr.ResourceMgrs[IntfID].AddOnuGemInfo(ctx, IntfID, onuGemData)
+ rsrcMgr.onuGemInfoLock.Lock()
+ rsrcMgr.onuGemInfo[Path] = &onuGem
+ rsrcMgr.onuGemInfoLock.Unlock()
+
+ Value, err = json.Marshal(onuGem)
if err != nil {
- logger.Error(ctx, "Failed to add onugem to kv store")
- return olterrors.NewErrPersistence("set", "OnuGemInfo", uint64(IntfID),
- log.Fields{"onuGemData": onuGemData, "intfID": IntfID}, err)
+ logger.Error(ctx, "failed to Marshal")
+ return err
}
- logger.Debugw(ctx, "added onu to onugeminfo", log.Fields{"intf": IntfID, "onugem": onuGem})
+ if err = rsrcMgr.KVStore.Put(ctx, Path, Value); err != nil {
+ logger.Errorf(ctx, "Failed to update resource %s", Path)
+ return err
+ }
+ logger.Debugw(ctx, "added onu gem info", log.Fields{"onuGemInfo": onuGem})
+ return err
+}
+
+// DelOnuGemInfo deletes the onugem info of the given ONU from the local cache and from the kvstore
+func (rsrcMgr *OpenOltResourceMgr) DelOnuGemInfo(ctx context.Context, intfID uint32, onuID uint32) error {
+ path := fmt.Sprintf(OnuGemInfoPath, intfID, onuID)
+ rsrcMgr.onuGemInfoLock.Lock()
+ logger.Debugw(ctx, "removing onu gem info", log.Fields{"onuGemInfo": rsrcMgr.onuGemInfo[path]})
+ delete(rsrcMgr.onuGemInfo, path)
+ rsrcMgr.onuGemInfoLock.Unlock()
+
+ if err := rsrcMgr.KVStore.Delete(ctx, path); err != nil {
+ logger.Errorf(ctx, "failed to remove resource %s", path)
+ return err
+ }
return nil
}
// AddUniPortToOnuInfo adds uni port to the onuinfo kvstore. check if the uni is already present if not update the kv store.
-func (RsrcMgr *OpenOltResourceMgr) AddUniPortToOnuInfo(ctx context.Context, intfID uint32, onuID uint32, portNo uint32) {
- var onuGemData []OnuGemInfo
- var err error
+func (rsrcMgr *OpenOltResourceMgr) AddUniPortToOnuInfo(ctx context.Context, intfID uint32, onuID uint32, portNo uint32) {
- if err = RsrcMgr.ResourceMgrs[intfID].GetOnuGemInfo(ctx, intfID, &onuGemData); err != nil {
- logger.Errorf(ctx, "failed to get onuifo for intfid %d", intfID)
+ onugem, err := rsrcMgr.GetOnuGemInfo(ctx, intfID, onuID)
+ if err != nil || onugem == nil || onugem.SerialNumber == "" {
+ logger.Warnf(ctx, "failed to get onuifo for intfid %d", intfID)
return
}
- for idx, onu := range onuGemData {
- if onu.OnuID == onuID {
- for _, uni := range onu.UniPorts {
- if uni == portNo {
- logger.Debugw(ctx, "uni already present in onugem info", log.Fields{"uni": portNo})
- return
- }
+
+ if onugem.OnuID == onuID {
+ for _, uni := range onugem.UniPorts {
+ if uni == portNo {
+ logger.Debugw(ctx, "uni already present in onugem info", log.Fields{"uni": portNo})
+ return
}
- onuGemData[idx].UniPorts = append(onuGemData[idx].UniPorts, portNo)
- break
}
+ onugem.UniPorts = append(onugem.UniPorts, portNo)
+ } else {
+ logger.Warnw(ctx, "onu id mismatch in onu gem info", log.Fields{"intfID": intfID, "onuID": onuID})
+ return
}
- err = RsrcMgr.ResourceMgrs[intfID].AddOnuGemInfo(ctx, intfID, onuGemData)
+ err = rsrcMgr.AddOnuGemInfo(ctx, intfID, onuID, *onugem)
if err != nil {
- logger.Errorw(ctx, "Failed to add uin port in onugem to kv store", log.Fields{"uni": portNo})
+ logger.Errorw(ctx, "Failed to add uni port in onugem to kv store", log.Fields{"uni": portNo})
return
}
}
//UpdateGemPortForPktIn updates gemport for pkt in path to kvstore, path being intfid, onuid, portno, vlan id, priority bit
-func (RsrcMgr *OpenOltResourceMgr) UpdateGemPortForPktIn(ctx context.Context, pktIn PacketInInfoKey, gemPort uint32) {
+func (rsrcMgr *OpenOltResourceMgr) UpdateGemPortForPktIn(ctx context.Context, pktIn PacketInInfoKey, gemPort uint32) {
path := fmt.Sprintf(OnuPacketINPath, pktIn.IntfID, pktIn.OnuID, pktIn.LogicalPort, pktIn.VlanID, pktIn.Priority)
+ // update cache
+ rsrcMgr.gemPortForPacketInInfoLock.Lock()
+ rsrcMgr.gemPortForPacketInInfo[path] = gemPort
+ rsrcMgr.gemPortForPacketInInfoLock.Unlock()
+
Value, err := json.Marshal(gemPort)
if err != nil {
logger.Error(ctx, "Failed to marshal data")
return
}
- if err = RsrcMgr.KVStore.Put(ctx, path, Value); err != nil {
+ if err = rsrcMgr.KVStore.Put(ctx, path, Value); err != nil {
logger.Errorw(ctx, "Failed to put to kvstore", log.Fields{"path": path, "value": gemPort})
return
}
@@ -1265,15 +1024,22 @@
}
// GetGemPortFromOnuPktIn gets the gem port from onu pkt in path, path being intfid, onuid, portno, vlan id, priority bit
-func (RsrcMgr *OpenOltResourceMgr) GetGemPortFromOnuPktIn(ctx context.Context, packetInInfoKey PacketInInfoKey) (uint32, error) {
+func (rsrcMgr *OpenOltResourceMgr) GetGemPortFromOnuPktIn(ctx context.Context, packetInInfoKey PacketInInfoKey) (uint32, error) {
var Val []byte
- var gemPort uint32
path := fmt.Sprintf(OnuPacketINPath, packetInInfoKey.IntfID, packetInInfoKey.OnuID, packetInInfoKey.LogicalPort,
packetInInfoKey.VlanID, packetInInfoKey.Priority)
+ // get from cache
+ rsrcMgr.gemPortForPacketInInfoLock.RLock()
+ gemPort, ok := rsrcMgr.gemPortForPacketInInfo[path]
+ rsrcMgr.gemPortForPacketInInfoLock.RUnlock()
+ if ok {
+ logger.Debugw(ctx, "found packein gemport from path", log.Fields{"path": path, "gem": gemPort})
+ return gemPort, nil
+ }
- value, err := RsrcMgr.KVStore.Get(ctx, path)
+ value, err := rsrcMgr.KVStore.Get(ctx, path)
if err != nil {
logger.Errorw(ctx, "Failed to get from kv store", log.Fields{"path": path})
return uint32(0), err
@@ -1291,15 +1057,20 @@
return uint32(0), err
}
logger.Debugw(ctx, "found packein gemport from path", log.Fields{"path": path, "gem": gemPort})
+ // update cache
+ rsrcMgr.gemPortForPacketInInfoLock.Lock()
+ rsrcMgr.gemPortForPacketInInfo[path] = gemPort
+ rsrcMgr.gemPortForPacketInInfoLock.Unlock()
return gemPort, nil
}
//DeletePacketInGemPortForOnu deletes the packet-in gemport for ONU
-func (RsrcMgr *OpenOltResourceMgr) DeletePacketInGemPortForOnu(ctx context.Context, intfID uint32, onuID uint32, logicalPort uint32) error {
+func (rsrcMgr *OpenOltResourceMgr) DeletePacketInGemPortForOnu(ctx context.Context, intfID uint32, onuID uint32, logicalPort uint32) error {
path := fmt.Sprintf(OnuPacketINPathPrefix, intfID, onuID, logicalPort)
- value, err := RsrcMgr.KVStore.List(ctx, path)
+
+ value, err := rsrcMgr.KVStore.List(ctx, path)
if err != nil {
logger.Errorf(ctx, "failed-to-read-value-from-path-%s", path)
return errors.New("failed-to-read-value-from-path-" + path)
@@ -1308,12 +1079,16 @@
//remove them one by one
for key := range value {
// Formulate the right key path suffix ti be delete
- stringToBeReplaced := fmt.Sprintf(BasePathKvStore, RsrcMgr.KVStore.PathPrefix, RsrcMgr.DeviceID) + "/"
+ stringToBeReplaced := fmt.Sprintf(BasePathKvStore, rsrcMgr.KVStore.PathPrefix, rsrcMgr.DeviceID) + "/"
replacedWith := ""
key = strings.Replace(key, stringToBeReplaced, replacedWith, 1)
+ // update cache
+ rsrcMgr.gemPortForPacketInInfoLock.Lock()
+ delete(rsrcMgr.gemPortForPacketInInfo, key)
+ rsrcMgr.gemPortForPacketInInfoLock.Unlock()
logger.Debugf(ctx, "removing-key-%s", key)
- if err := RsrcMgr.KVStore.Delete(ctx, key); err != nil {
+ if err := rsrcMgr.KVStore.Delete(ctx, key); err != nil {
logger.Errorf(ctx, "failed-to-remove-resource-%s", key)
return err
}
@@ -1322,222 +1097,156 @@
return nil
}
-// DelOnuGemInfoForIntf deletes the onugem info from kvstore per interface
-func (RsrcMgr *OpenOltResourceMgr) DelOnuGemInfoForIntf(ctx context.Context, intfID uint32) error {
- if err := RsrcMgr.ResourceMgrs[intfID].DelOnuGemInfoForIntf(ctx, intfID); err != nil {
- logger.Errorw(ctx, "failed to delete onu gem info for", log.Fields{"intfid": intfID})
- return err
+//GetFlowIDsForGem gets the list of FlowIDs for the given gemport
+func (rsrcMgr *OpenOltResourceMgr) GetFlowIDsForGem(ctx context.Context, intf uint32, gem uint32) ([]uint64, error) {
+ path := fmt.Sprintf(FlowIDsForGem, intf, gem)
+
+ // get from cache
+ rsrcMgr.flowIDsForGemLock.RLock()
+ flowIDs, ok := rsrcMgr.flowIDsForGem[gem]
+ rsrcMgr.flowIDsForGemLock.RUnlock()
+ if ok {
+ return flowIDs, nil
}
- return nil
-}
-//GetNNIFromKVStore gets NNi intfids from kvstore. path being per device
-func (RsrcMgr *OpenOltResourceMgr) GetNNIFromKVStore(ctx context.Context) ([]uint32, error) {
-
- var nni []uint32
- var Val []byte
-
- path := NnniIntfID
- value, err := RsrcMgr.KVStore.Get(ctx, path)
+ value, err := rsrcMgr.KVStore.Get(ctx, path)
if err != nil {
- logger.Error(ctx, "failed to get data from kv store")
+ logger.Errorw(ctx, "Failed to get from kv store", log.Fields{"path": path})
+ return nil, err
+ } else if value == nil {
+ logger.Debug(ctx, "no flow-ids found", log.Fields{"path": path})
+ return nil, nil
+ }
+ Val, err := kvstore.ToByte(value.Value)
+ if err != nil {
+ logger.Error(ctx, "Failed to convert to byte array")
return nil, err
}
- if value != nil {
- if Val, err = kvstore.ToByte(value.Value); err != nil {
- logger.Error(ctx, "Failed to convert to byte array")
- return nil, err
- }
- if err = json.Unmarshal(Val, &nni); err != nil {
- logger.Error(ctx, "Failed to unmarshall")
- return nil, err
- }
- }
- return nni, err
-}
-// AddNNIToKVStore adds Nni interfaces to kvstore, path being per device.
-func (RsrcMgr *OpenOltResourceMgr) AddNNIToKVStore(ctx context.Context, nniIntf uint32) error {
- var Value []byte
-
- nni, err := RsrcMgr.GetNNIFromKVStore(ctx)
- if err != nil {
- logger.Error(ctx, "failed to fetch nni interfaces from kv store")
- return err
+ if err = json.Unmarshal(Val, &flowIDs); err != nil {
+ logger.Error(ctx, "Failed to unmarshall")
+ return nil, err
}
- path := NnniIntfID
- nni = append(nni, nniIntf)
- Value, err = json.Marshal(nni)
- if err != nil {
- logger.Error(ctx, "Failed to marshal data")
- }
- if err = RsrcMgr.KVStore.Put(ctx, path, Value); err != nil {
- logger.Errorw(ctx, "Failed to put to kvstore", log.Fields{"path": path, "value": Value})
- return err
- }
- logger.Debugw(ctx, "added nni to kv successfully", log.Fields{"path": path, "nni": nniIntf})
- return nil
-}
+ // update cache
+ rsrcMgr.flowIDsForGemLock.Lock()
+ rsrcMgr.flowIDsForGem[gem] = flowIDs
+ rsrcMgr.flowIDsForGemLock.Unlock()
-// DelNNiFromKVStore deletes nni interface list from kv store.
-func (RsrcMgr *OpenOltResourceMgr) DelNNiFromKVStore(ctx context.Context) error {
-
- path := NnniIntfID
-
- if err := RsrcMgr.KVStore.Delete(ctx, path); err != nil {
- logger.Errorw(ctx, "Failed to delete nni interfaces from kv store", log.Fields{"path": path})
- return err
- }
- return nil
+ return flowIDs, nil
}
//UpdateFlowIDsForGem updates flow id per gemport
-func (RsrcMgr *OpenOltResourceMgr) UpdateFlowIDsForGem(ctx context.Context, intf uint32, gem uint32, flowIDs []uint64) error {
+func (rsrcMgr *OpenOltResourceMgr) UpdateFlowIDsForGem(ctx context.Context, intf uint32, gem uint32, flowIDs []uint64) error {
var val []byte
- path := fmt.Sprintf(FlowIDsForGem, intf)
+ path := fmt.Sprintf(FlowIDsForGem, intf, gem)
- flowsForGem, err := RsrcMgr.GetFlowIDsGemMapForInterface(ctx, intf)
- if err != nil {
- logger.Error(ctx, "Failed to ger flowids for interface", log.Fields{"error": err, "intf": intf})
- return err
+ // update cache
+ rsrcMgr.flowIDsForGemLock.Lock()
+ rsrcMgr.flowIDsForGem[gem] = flowIDs
+ rsrcMgr.flowIDsForGemLock.Unlock()
+
+ if flowIDs == nil {
+ return nil
}
- if flowsForGem == nil {
- flowsForGem = make(map[uint32][]uint64)
- }
- flowsForGem[gem] = flowIDs
- val, err = json.Marshal(flowsForGem)
+ val, err := json.Marshal(flowIDs)
if err != nil {
- logger.Error(ctx, "Failed to marshal data", log.Fields{"error": err})
+ logger.Error(ctx, "Failed to marshal data", log.Fields{"err": err})
return err
}
- if err = RsrcMgr.KVStore.Put(ctx, path, val); err != nil {
- logger.Errorw(ctx, "Failed to put to kvstore", log.Fields{"error": err, "path": path, "value": val})
+ if err = rsrcMgr.KVStore.Put(ctx, path, val); err != nil {
+ logger.Errorw(ctx, "Failed to put to kvstore", log.Fields{"err": err, "path": path, "value": val})
return err
}
- logger.Debugw(ctx, "added flowid list for gem to kv successfully", log.Fields{"path": path, "flowidlist": flowsForGem[gem]})
+ logger.Debugw(ctx, "added flowid list for gem to kv successfully", log.Fields{"path": path, "flowidlist": flowIDs})
return nil
}
//DeleteFlowIDsForGem deletes the flowID list entry per gem from kvstore.
-func (RsrcMgr *OpenOltResourceMgr) DeleteFlowIDsForGem(ctx context.Context, intf uint32, gem uint32) {
- path := fmt.Sprintf(FlowIDsForGem, intf)
- var val []byte
-
- flowsForGem, err := RsrcMgr.GetFlowIDsGemMapForInterface(ctx, intf)
- if err != nil {
- logger.Error(ctx, "Failed to ger flowids for interface", log.Fields{"error": err, "intf": intf})
- return
+func (rsrcMgr *OpenOltResourceMgr) DeleteFlowIDsForGem(ctx context.Context, intf uint32, gem uint32) {
+ path := fmt.Sprintf(FlowIDsForGem, intf, gem)
+ // update cache
+ rsrcMgr.flowIDsForGemLock.Lock()
+ delete(rsrcMgr.flowIDsForGem, gem)
+ rsrcMgr.flowIDsForGemLock.Unlock()
+ if err := rsrcMgr.KVStore.Delete(ctx, path); err != nil {
+ logger.Errorw(ctx, "Failed to delete from kvstore", log.Fields{"err": err, "path": path})
}
- if flowsForGem == nil {
- logger.Error(ctx, "No flowids found ", log.Fields{"intf": intf, "gemport": gem})
- return
- }
- // once we get the flows per gem map from kv , just delete the gem entry from the map
- delete(flowsForGem, gem)
- // once gem entry is deleted update the kv store.
- val, err = json.Marshal(flowsForGem)
- if err != nil {
- logger.Error(ctx, "Failed to marshal data", log.Fields{"error": err})
- return
- }
-
- if err = RsrcMgr.KVStore.Put(ctx, path, val); err != nil {
- logger.Errorw(ctx, "Failed to put to kvstore", log.Fields{"error": err, "path": path, "value": val})
- }
-}
-
-//GetFlowIDsGemMapForInterface gets flowids per gemport and interface
-func (RsrcMgr *OpenOltResourceMgr) GetFlowIDsGemMapForInterface(ctx context.Context, intf uint32) (map[uint32][]uint64, error) {
- path := fmt.Sprintf(FlowIDsForGem, intf)
- var flowsForGem map[uint32][]uint64
- var val []byte
- value, err := RsrcMgr.KVStore.Get(ctx, path)
- if err != nil {
- logger.Error(ctx, "failed to get data from kv store")
- return nil, err
- }
- if value != nil && value.Value != nil {
- if val, err = kvstore.ToByte(value.Value); err != nil {
- logger.Error(ctx, "Failed to convert to byte array ", log.Fields{"error": err})
- return nil, err
- }
- if err = json.Unmarshal(val, &flowsForGem); err != nil {
- logger.Error(ctx, "Failed to unmarshall", log.Fields{"error": err})
- return nil, err
- }
- }
- return flowsForGem, nil
-}
-
-//DeleteIntfIDGempMapPath deletes the intf id path used to store flow ids per gem to kvstore.
-func (RsrcMgr *OpenOltResourceMgr) DeleteIntfIDGempMapPath(ctx context.Context, intf uint32) {
- path := fmt.Sprintf(FlowIDsForGem, intf)
-
- if err := RsrcMgr.KVStore.Delete(ctx, path); err != nil {
- logger.Errorw(ctx, "Failed to delete nni interfaces from kv store", log.Fields{"path": path})
- }
-}
-
-// RemoveResourceMap Clear resource map associated with (intfid, onuid, uniid) tuple.
-func (RsrcMgr *OpenOltResourceMgr) RemoveResourceMap(ctx context.Context, intfID uint32, onuID int32, uniID int32) {
- IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", intfID, onuID, uniID)
- RsrcMgr.ResourceMgrs[intfID].RemoveResourceMap(ctx, IntfOnuIDUniID)
}
//GetMcastQueuePerInterfaceMap gets multicast queue info per pon interface
-func (RsrcMgr *OpenOltResourceMgr) GetMcastQueuePerInterfaceMap(ctx context.Context) (map[uint32][]uint32, error) {
+func (rsrcMgr *OpenOltResourceMgr) GetMcastQueuePerInterfaceMap(ctx context.Context) (map[uint32][]uint32, error) {
path := McastQueuesForIntf
- var mcastQueueToIntfMap map[uint32][]uint32
var val []byte
- kvPair, err := RsrcMgr.KVStore.Get(ctx, path)
+ rsrcMgr.mcastQueueForIntfLock.RLock()
+ if rsrcMgr.mcastQueueForIntfLoadedFromKvStore {
+ rsrcMgr.mcastQueueForIntfLock.RUnlock()
+ return rsrcMgr.mcastQueueForIntf, nil
+ }
+ rsrcMgr.mcastQueueForIntfLock.RUnlock()
+
+ kvPair, err := rsrcMgr.KVStore.Get(ctx, path)
if err != nil {
logger.Error(ctx, "failed to get data from kv store")
return nil, err
}
if kvPair != nil && kvPair.Value != nil {
if val, err = kvstore.ToByte(kvPair.Value); err != nil {
- logger.Error(ctx, "Failed to convert to byte array ", log.Fields{"error": err})
+ logger.Error(ctx, "Failed to convert to byte array ", log.Fields{"err": err})
return nil, err
}
- if err = json.Unmarshal(val, &mcastQueueToIntfMap); err != nil {
- logger.Error(ctx, "Failed to unmarshall ", log.Fields{"error": err})
+ rsrcMgr.mcastQueueForIntfLock.Lock()
+ defer rsrcMgr.mcastQueueForIntfLock.Unlock()
+ if err = json.Unmarshal(val, &rsrcMgr.mcastQueueForIntf); err != nil {
+ logger.Error(ctx, "Failed to unmarshall ", log.Fields{"err": err})
return nil, err
}
+ rsrcMgr.mcastQueueForIntfLoadedFromKvStore = true
}
- return mcastQueueToIntfMap, nil
+ return rsrcMgr.mcastQueueForIntf, nil
}
//AddMcastQueueForIntf adds multicast queue for pon interface
-func (RsrcMgr *OpenOltResourceMgr) AddMcastQueueForIntf(ctx context.Context, intf uint32, gem uint32, servicePriority uint32) error {
+func (rsrcMgr *OpenOltResourceMgr) AddMcastQueueForIntf(ctx context.Context, intf uint32, gem uint32, servicePriority uint32) error {
var val []byte
path := McastQueuesForIntf
- mcastQueues, err := RsrcMgr.GetMcastQueuePerInterfaceMap(ctx)
+ // Load local cache from kv store the first time
+ rsrcMgr.mcastQueueForIntfLock.RLock()
+ if !rsrcMgr.mcastQueueForIntfLoadedFromKvStore {
+ rsrcMgr.mcastQueueForIntfLock.RUnlock()
+ _, err := rsrcMgr.GetMcastQueuePerInterfaceMap(ctx)
+ if err != nil {
+ logger.Errorw(ctx, "Failed to get multicast queue info for interface", log.Fields{"err": err, "intf": intf})
+ return err
+ }
+ } else {
+ rsrcMgr.mcastQueueForIntfLock.RUnlock()
+ }
+
+ // Update KV store
+ rsrcMgr.mcastQueueForIntfLock.Lock()
+ rsrcMgr.mcastQueueForIntf[intf] = []uint32{gem, servicePriority}
+ val, err := json.Marshal(rsrcMgr.mcastQueueForIntf)
if err != nil {
- logger.Errorw(ctx, "Failed to get multicast queue info for interface", log.Fields{"error": err, "intf": intf})
+ rsrcMgr.mcastQueueForIntfLock.Unlock()
+ logger.Errorw(ctx, "Failed to marshal data", log.Fields{"err": err})
return err
}
- if mcastQueues == nil {
- mcastQueues = make(map[uint32][]uint32)
- }
- mcastQueues[intf] = []uint32{gem, servicePriority}
- if val, err = json.Marshal(mcastQueues); err != nil {
- logger.Errorw(ctx, "Failed to marshal data", log.Fields{"error": err})
+ rsrcMgr.mcastQueueForIntfLock.Unlock()
+
+ if err = rsrcMgr.KVStore.Put(ctx, path, val); err != nil {
+ logger.Errorw(ctx, "Failed to put to kvstore", log.Fields{"err": err, "path": path, "value": val})
return err
}
- if err = RsrcMgr.KVStore.Put(ctx, path, val); err != nil {
- logger.Errorw(ctx, "Failed to put to kvstore", log.Fields{"error": err, "path": path, "value": val})
- return err
- }
- logger.Debugw(ctx, "added multicast queue info to KV store successfully", log.Fields{"path": path, "mcastQueueInfo": mcastQueues[intf], "interfaceId": intf})
+ logger.Debugw(ctx, "added multicast queue info to KV store successfully", log.Fields{"path": path, "interfaceId": intf, "gem": gem, "svcPrior": servicePriority})
return nil
}
//AddFlowGroupToKVStore adds flow group into KV store
-func (RsrcMgr *OpenOltResourceMgr) AddFlowGroupToKVStore(ctx context.Context, groupEntry *ofp.OfpGroupEntry, cached bool) error {
+func (rsrcMgr *OpenOltResourceMgr) AddFlowGroupToKVStore(ctx context.Context, groupEntry *ofp.OfpGroupEntry, cached bool) error {
var Value []byte
var err error
var path string
@@ -1560,6 +1269,10 @@
OutPorts: outPorts,
}
+ rsrcMgr.groupInfoLock.Lock()
+ rsrcMgr.groupInfo[path] = &groupInfo
+ rsrcMgr.groupInfoLock.Unlock()
+
Value, err = json.Marshal(groupInfo)
if err != nil {
@@ -1567,7 +1280,7 @@
return err
}
- if err = RsrcMgr.KVStore.Put(ctx, path, Value); err != nil {
+ if err = rsrcMgr.KVStore.Put(ctx, path, Value); err != nil {
logger.Errorf(ctx, "Failed to update resource %s", path)
return err
}
@@ -1575,14 +1288,18 @@
}
//RemoveFlowGroupFromKVStore removes flow group from KV store
-func (RsrcMgr *OpenOltResourceMgr) RemoveFlowGroupFromKVStore(ctx context.Context, groupID uint32, cached bool) error {
+func (rsrcMgr *OpenOltResourceMgr) RemoveFlowGroupFromKVStore(ctx context.Context, groupID uint32, cached bool) error {
var path string
if cached {
path = fmt.Sprintf(FlowGroupCached, groupID)
} else {
path = fmt.Sprintf(FlowGroup, groupID)
}
- if err := RsrcMgr.KVStore.Delete(ctx, path); err != nil {
+ rsrcMgr.groupInfoLock.Lock()
+ delete(rsrcMgr.groupInfo, path)
+ rsrcMgr.groupInfoLock.Unlock()
+
+ if err := rsrcMgr.KVStore.Delete(ctx, path); err != nil {
logger.Errorf(ctx, "Failed to remove resource %s due to %s", path, err)
return err
}
@@ -1592,7 +1309,7 @@
//GetFlowGroupFromKVStore fetches flow group from the KV store. Returns (false, {} error) if any problem occurs during
//fetching the data. Returns (true, groupInfo, nil) if the group is fetched successfully.
// Returns (false, {}, nil) if the group does not exists in the KV store.
-func (RsrcMgr *OpenOltResourceMgr) GetFlowGroupFromKVStore(ctx context.Context, groupID uint32, cached bool) (bool, GroupInfo, error) {
+func (rsrcMgr *OpenOltResourceMgr) GetFlowGroupFromKVStore(ctx context.Context, groupID uint32, cached bool) (bool, GroupInfo, error) {
var groupInfo GroupInfo
var path string
if cached {
@@ -1600,20 +1317,34 @@
} else {
path = fmt.Sprintf(FlowGroup, groupID)
}
- kvPair, err := RsrcMgr.KVStore.Get(ctx, path)
+
+ // read from cache
+ rsrcMgr.groupInfoLock.RLock()
+ gi, ok := rsrcMgr.groupInfo[path]
+ rsrcMgr.groupInfoLock.RUnlock()
+ if ok {
+ return true, *gi, nil
+ }
+
+ kvPair, err := rsrcMgr.KVStore.Get(ctx, path)
if err != nil {
return false, groupInfo, err
}
if kvPair != nil && kvPair.Value != nil {
Val, err := kvstore.ToByte(kvPair.Value)
if err != nil {
- logger.Errorw(ctx, "Failed to convert flow group into byte array", log.Fields{"error": err})
+ logger.Errorw(ctx, "Failed to convert flow group into byte array", log.Fields{"err": err})
return false, groupInfo, err
}
if err = json.Unmarshal(Val, &groupInfo); err != nil {
- logger.Errorw(ctx, "Failed to unmarshal", log.Fields{"error": err})
+ logger.Errorw(ctx, "Failed to unmarshal", log.Fields{"err": err})
return false, groupInfo, err
}
+ // update cache
+ rsrcMgr.groupInfoLock.Lock()
+ rsrcMgr.groupInfo[path] = &groupInfo
+ rsrcMgr.groupInfoLock.Unlock()
+
return true, groupInfo, nil
}
return false, groupInfo, nil
@@ -1631,19 +1362,3 @@
return nil, fmt.Errorf("unexpected-type-%T", t)
}
}
-
-func checkForFlowIDInList(FlowIDList []uint64, FlowID uint64) (bool, uint64) {
- /*
- Check for a flow id in a given list of flow IDs.
- :param FLowIDList: List of Flow IDs
- :param FlowID: Flowd to check in the list
- : return true and the index if present false otherwise.
- */
-
- for idx := range FlowIDList {
- if FlowID == FlowIDList[idx] {
- return true, uint64(idx)
- }
- }
- return false, 0
-}