[VOL-5417] Add GetWithPrefix and GetWithPrefixKeysOnly functions for the kvclient Interface
Change-Id: I446f414d157d5794de4302450fc075073243b564
Signed-off-by: pnalmas <praneeth.nalmas@radisys.com>
diff --git a/pkg/db/backend.go b/pkg/db/backend.go
index dc24fe6..bb99970 100644
--- a/pkg/db/backend.go
+++ b/pkg/db/backend.go
@@ -87,6 +87,7 @@
func (b *Backend) makePath(ctx context.Context, key string) string {
path := fmt.Sprintf("%s/%s", b.PathPrefix, key)
+ logger.Debugw(ctx, "make-path", log.Fields{"key": key, "path": path})
return path
}
@@ -145,7 +146,7 @@
}
// Extract Alive status of Kvstore based on type of error
-func (b *Backend) isErrorIndicatingAliveKvstore(ctx context.Context, err error) bool {
+func (b *Backend) isErrorIndicatingAliveKvstore(err error) bool {
// Alive unless observed an error indicating so
alive := true
@@ -188,7 +189,7 @@
pair, err := b.Client.List(ctx, formattedPath)
- b.updateLiveness(ctx, b.isErrorIndicatingAliveKvstore(ctx, err))
+ b.updateLiveness(ctx, b.isErrorIndicatingAliveKvstore(err))
return pair, err
}
@@ -203,7 +204,37 @@
pair, err := b.Client.Get(ctx, formattedPath)
- b.updateLiveness(ctx, b.isErrorIndicatingAliveKvstore(ctx, err))
+ b.updateLiveness(ctx, b.isErrorIndicatingAliveKvstore(err))
+
+ return pair, err
+}
+
+// GetWithPrefix retrieves one or more items that match the specified key prefix
+func (b *Backend) GetWithPrefix(ctx context.Context, prefixKey string) (map[string]*kvstore.KVPair, error) {
+ span, ctx := log.CreateChildSpan(ctx, "kvs-get-with-prefix")
+ defer span.Finish()
+
+ formattedPath := b.makePath(ctx, prefixKey)
+ logger.Debugw(ctx, "get-entries-matching-prefix-key", log.Fields{"key": prefixKey, "path": formattedPath})
+
+ pair, err := b.Client.GetWithPrefix(ctx, formattedPath)
+
+ b.updateLiveness(ctx, b.isErrorIndicatingAliveKvstore(err))
+
+ return pair, err
+}
+
+// GetWithPrefixKeysOnly retrieves one or more keys that match the specified key prefix
+func (b *Backend) GetWithPrefixKeysOnly(ctx context.Context, prefixKey string) ([]string, error) {
+ span, ctx := log.CreateChildSpan(ctx, "kvs-get-with-prefix")
+ defer span.Finish()
+
+ formattedPath := b.makePath(ctx, prefixKey)
+ logger.Debugw(ctx, "get-keys-entries-matching-prefix-key", log.Fields{"key": prefixKey, "path": formattedPath})
+
+ pair, err := b.Client.GetWithPrefixKeysOnly(ctx, formattedPath)
+
+ b.updateLiveness(ctx, b.isErrorIndicatingAliveKvstore(err))
return pair, err
}
@@ -218,7 +249,7 @@
err := b.Client.Put(ctx, formattedPath, value)
- b.updateLiveness(ctx, b.isErrorIndicatingAliveKvstore(ctx, err))
+ b.updateLiveness(ctx, b.isErrorIndicatingAliveKvstore(err))
return err
}
@@ -233,7 +264,7 @@
err := b.Client.Delete(ctx, formattedPath)
- b.updateLiveness(ctx, b.isErrorIndicatingAliveKvstore(ctx, err))
+ b.updateLiveness(ctx, b.isErrorIndicatingAliveKvstore(err))
return err
}
@@ -247,7 +278,7 @@
err := b.Client.DeleteWithPrefix(ctx, formattedPath)
- b.updateLiveness(ctx, b.isErrorIndicatingAliveKvstore(ctx, err))
+ b.updateLiveness(ctx, b.isErrorIndicatingAliveKvstore(err))
return err
}
diff --git a/pkg/db/backend_test.go b/pkg/db/backend_test.go
index 2b38afe..c545739 100644
--- a/pkg/db/backend_test.go
+++ b/pkg/db/backend_test.go
@@ -216,7 +216,7 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- if backend.isErrorIndicatingAliveKvstore(context.Background(), tt.arg) != tt.want {
+ if backend.isErrorIndicatingAliveKvstore(tt.arg) != tt.want {
t.Errorf("isErrorIndicatingAliveKvstore failed for %s: expected %t but got %t", tt.name, tt.want, !tt.want)
}
})
diff --git a/pkg/db/kvstore/client.go b/pkg/db/kvstore/client.go
index c59f4b3..85bc5f5 100644
--- a/pkg/db/kvstore/client.go
+++ b/pkg/db/kvstore/client.go
@@ -76,6 +76,8 @@
type Client interface {
List(ctx context.Context, key string) (map[string]*KVPair, error)
Get(ctx context.Context, key string) (*KVPair, error)
+ GetWithPrefix(ctx context.Context, prefixKey string) (map[string]*KVPair, error)
+ GetWithPrefixKeysOnly(ctx context.Context, prefixKey string) ([]string, error)
Put(ctx context.Context, key string, value interface{}) error
Delete(ctx context.Context, key string) error
DeleteWithPrefix(ctx context.Context, prefixKey string) error
diff --git a/pkg/db/kvstore/etcdclient.go b/pkg/db/kvstore/etcdclient.go
index 8439afe..c5df1d8 100644
--- a/pkg/db/kvstore/etcdclient.go
+++ b/pkg/db/kvstore/etcdclient.go
@@ -171,6 +171,57 @@
}
}
+// GetWithPrefix fetches all key-value pairs with the specified prefix from etcd.
+// Returns a map of key-value pairs or an error if the operation fails.
+func (c *EtcdClient) GetWithPrefix(ctx context.Context, prefixKey string) (map[string]*KVPair, error) {
+ // Acquire a client from the pool
+ client, err := c.pool.Get(ctx)
+ if err != nil {
+ return nil, fmt.Errorf("failed to get client from pool: %w", err)
+ }
+ defer c.pool.Put(client)
+
+ // Fetch keys with the prefix
+ resp, err := client.Get(ctx, prefixKey, v3Client.WithPrefix())
+ if err != nil {
+ return nil, fmt.Errorf("failed to fetch entries for prefix %s: %w", prefixKey, err)
+ }
+
+ // Initialize the result map
+ result := make(map[string]*KVPair)
+
+ // Iterate through the fetched key-value pairs and populate the map
+ for _, ev := range resp.Kvs {
+ result[string(ev.Key)] = NewKVPair(string(ev.Key), ev.Value, "", ev.Lease, ev.Version)
+ }
+
+ return result, nil
+}
+
+// GetWithPrefixKeysOnly retrieves only the keys that match a given prefix.
+func (c *EtcdClient) GetWithPrefixKeysOnly(ctx context.Context, prefixKey string) ([]string, error) {
+ // Acquire a client from the pool
+ client, err := c.pool.Get(ctx)
+ if err != nil {
+ return nil, fmt.Errorf("failed to get client from pool: %w", err)
+ }
+ defer c.pool.Put(client)
+
+ // Fetch keys with the prefix
+ resp, err := client.Get(ctx, prefixKey, v3Client.WithPrefix(), v3Client.WithKeysOnly())
+ if err != nil {
+ return nil, fmt.Errorf("failed to fetch entries for prefix %s: %w", prefixKey, err)
+ }
+
+ // Extract keys from the response
+ keys := []string{}
+ for _, kv := range resp.Kvs {
+ keys = append(keys, string(kv.Key))
+ }
+
+ return keys, nil
+}
+
// Put writes a key-value pair to the KV store. Value can only be a string or []byte since the etcd API
// accepts only a string as a value for a put operation. Timeout defines how long the function will
// wait for a response
diff --git a/pkg/db/kvstore/redisclient.go b/pkg/db/kvstore/redisclient.go
index 742916c..7ed4159 100644
--- a/pkg/db/kvstore/redisclient.go
+++ b/pkg/db/kvstore/redisclient.go
@@ -415,3 +415,41 @@
logger.Errorw(ctx, "error-closing-client", log.Fields{"error": err})
}
}
+
+func (c *RedisClient) GetWithPrefix(ctx context.Context, prefix string) (map[string]*KVPair, error) {
+ var err error
+ var keys []string
+ m := make(map[string]*KVPair)
+ var values []interface{}
+
+ if keys, err = c.scanAllKeysWithPrefix(ctx, prefix); err != nil {
+ return nil, err
+ }
+
+ if len(keys) != 0 {
+ values, err = c.redisAPI.MGet(ctx, keys...).Result()
+ if err != nil {
+ return nil, err
+ }
+ }
+ for i, key := range keys {
+ if valBytes, err := ToByte(values[i]); err == nil {
+ m[key] = NewKVPair(key, interface{}(valBytes), "", 0, 0)
+ }
+ }
+ return m, nil
+}
+
+func (c *RedisClient) GetWithPrefixKeysOnly(ctx context.Context, prefix string) ([]string, error) {
+ // Use the scanAllKeysWithPrefix function to fetch keys matching the prefix
+ keys, err := c.scanAllKeysWithPrefix(ctx, prefix)
+ if err != nil {
+ return nil, fmt.Errorf("failed to scan keys with prefix %s: %v", prefix, err)
+ }
+
+ if len(keys) == 0 {
+ logger.Debugw(ctx, "no-keys-found", log.Fields{"prefix": prefix})
+ }
+
+ return keys, nil
+}
diff --git a/pkg/ponresourcemanager/ponresourcemanager.go b/pkg/ponresourcemanager/ponresourcemanager.go
index 1af791f..26bef32 100755
--- a/pkg/ponresourcemanager/ponresourcemanager.go
+++ b/pkg/ponresourcemanager/ponresourcemanager.go
@@ -214,13 +214,13 @@
PONMgr.KVStore = SetKVClient(ctx, Technology, Backend, Address, false, basePathKvStore)
if PONMgr.KVStore == nil {
logger.Error(ctx, "KV Client initilization failed")
- return nil, errors.New("Failed to init KV client")
+ return nil, errors.New("failed to init KV client")
}
// init kv client to read from the config path
PONMgr.KVStoreForConfig = SetKVClient(ctx, Technology, Backend, Address, true, basePathKvStore)
if PONMgr.KVStoreForConfig == nil {
logger.Error(ctx, "KV Config Client initilization failed")
- return nil, errors.New("Failed to init KV Config client")
+ return nil, errors.New("failed to init KV Config client")
}
PONMgr.PonResourceRanges = make(map[string]interface{})
@@ -483,7 +483,7 @@
}
if status := PONRMgr.ClearResourceIDPool(ctx, Intf, ONU_ID); !status {
logger.Error(ctx, "Failed to clear ONU ID resource pool")
- return errors.New("Failed to clear ONU ID resource pool")
+ return errors.New("failed to clear ONU ID resource pool")
}
if SharedPoolID != 0 {
break
@@ -497,7 +497,7 @@
}
if status := PONRMgr.ClearResourceIDPool(ctx, Intf, ALLOC_ID); !status {
logger.Error(ctx, "Failed to clear ALLOC ID resource pool ")
- return errors.New("Failed to clear ALLOC ID resource pool")
+ return errors.New("failed to clear ALLOC ID resource pool")
}
if SharedPoolID != 0 {
break
@@ -510,7 +510,7 @@
}
if status := PONRMgr.ClearResourceIDPool(ctx, Intf, GEMPORT_ID); !status {
logger.Error(ctx, "Failed to clear GEMPORT ID resource pool")
- return errors.New("Failed to clear GEMPORT ID resource pool")
+ return errors.New("failed to clear GEMPORT ID resource pool")
}
if SharedPoolID != 0 {
break
@@ -524,7 +524,7 @@
}
if status := PONRMgr.ClearResourceIDPool(ctx, Intf, FLOW_ID); !status {
logger.Error(ctx, "Failed to clear FLOW ID resource pool")
- return errors.New("Failed to clear FLOW ID resource pool")
+ return errors.New("failed to clear FLOW ID resource pool")
}
if SharedPoolID != 0 {
break
@@ -539,22 +539,22 @@
if status := PONRMgr.ClearResourceIDPool(ctx, intfID, ONU_ID); !status {
logger.Error(ctx, "Failed to clear ONU ID resource pool")
- return errors.New("Failed to clear ONU ID resource pool")
+ return errors.New("failed to clear ONU ID resource pool")
}
if status := PONRMgr.ClearResourceIDPool(ctx, intfID, ALLOC_ID); !status {
logger.Error(ctx, "Failed to clear ALLOC ID resource pool ")
- return errors.New("Failed to clear ALLOC ID resource pool")
+ return errors.New("failed to clear ALLOC ID resource pool")
}
if status := PONRMgr.ClearResourceIDPool(ctx, intfID, GEMPORT_ID); !status {
logger.Error(ctx, "Failed to clear GEMPORT ID resource pool")
- return errors.New("Failed to clear GEMPORT ID resource pool")
+ return errors.New("failed to clear GEMPORT ID resource pool")
}
if status := PONRMgr.ClearResourceIDPool(ctx, intfID, FLOW_ID); !status {
logger.Error(ctx, "Failed to clear FLOW ID resource pool")
- return errors.New("Failed to clear FLOW ID resource pool")
+ return errors.New("failed to clear FLOW ID resource pool")
}
return nil
@@ -580,7 +580,7 @@
Path := PONRMgr.GetPath(ctx, Intf, ResourceType)
if Path == "" {
logger.Errorf(ctx, "Failed to get path for resource type %s", ResourceType)
- return fmt.Errorf("Failed to get path for resource type %s", ResourceType)
+ return fmt.Errorf("failed to get path for resource type %s", ResourceType)
}
//In case of adapter reboot and reconciliation resource in kv store
@@ -663,7 +663,7 @@
var TSData *bitmap.Threadsafe
if TSData = bitmap.NewTS(int(EndIDx)); TSData == nil {
logger.Error(ctx, "Failed to create a bitmap")
- return nil, errors.New("Failed to create bitmap")
+ return nil, errors.New("failed to create bitmap")
}
for _, excludedID := range Excluded {
if excludedID < StartIDx || excludedID > EndIDx {
@@ -781,7 +781,7 @@
if NumIDs < 1 {
logger.Error(ctx, "Invalid number of resources requested")
- return nil, fmt.Errorf("Invalid number of resources requested %d", NumIDs)
+ return nil, fmt.Errorf("invalid number of resources requested %d", NumIDs)
}
// delegate to the master instance if sharing enabled across instances
@@ -794,7 +794,7 @@
Path := PONRMgr.GetPath(ctx, IntfID, ResourceType)
if Path == "" {
logger.Errorf(ctx, "Failed to get path for resource type %s", ResourceType)
- return nil, fmt.Errorf("Failed to get path for resource type %s", ResourceType)
+ return nil, fmt.Errorf("failed to get path for resource type %s", ResourceType)
}
logger.Debugf(ctx, "Get resource for type %s on path %s", ResourceType, Path)
var Result []uint32
@@ -831,7 +831,7 @@
//Update resource in kv store
if PONRMgr.UpdateResource(ctx, Path, Resource) != nil {
logger.Errorf(ctx, "Failed to update resource %s", Path)
- return nil, fmt.Errorf("Failed to update resource %s", Path)
+ return nil, fmt.Errorf("failed to update resource %s", Path)
}
return Result, nil
}
@@ -863,12 +863,12 @@
})
if !checkValidResourceType(ResourceType) {
- err := fmt.Errorf("Invalid resource type: %s", ResourceType)
+ err := fmt.Errorf("invalid resource type: %s", ResourceType)
logger.Error(ctx, err.Error())
return err
}
if ReleaseContent == nil {
- err := fmt.Errorf("Nothing to release")
+ err := fmt.Errorf("nothing to release")
logger.Debug(ctx, err.Error())
return err
}
@@ -879,7 +879,7 @@
}
Path := PONRMgr.GetPath(ctx, IntfID, ResourceType)
if Path == "" {
- err := fmt.Errorf("Failed to get path for IntfId %d and ResourceType %s", IntfID, ResourceType)
+ err := fmt.Errorf("failed to get path for IntfId %d and ResourceType %s", IntfID, ResourceType)
logger.Error(ctx, err.Error())
return err
}
@@ -892,7 +892,7 @@
PONRMgr.ReleaseID(ctx, Resource, Val)
}
if PONRMgr.UpdateResource(ctx, Path, Resource) != nil {
- err := fmt.Errorf("Free resource for %s failed", Path)
+ err := fmt.Errorf("free resource for %s failed", Path)
logger.Errorf(ctx, err.Error())
return err
}
@@ -1292,7 +1292,7 @@
}
Data := bitmap.TSFromData(ByteArray, false)
if Data == nil {
- return 0, errors.New("Failed to get data from byte array")
+ return 0, errors.New("failed to get data from byte array")
}
Len := Data.Len()
diff --git a/pkg/ponresourcemanager/ponresourcemanager_test.go b/pkg/ponresourcemanager/ponresourcemanager_test.go
index ff80b11..626864e 100644
--- a/pkg/ponresourcemanager/ponresourcemanager_test.go
+++ b/pkg/ponresourcemanager/ponresourcemanager_test.go
@@ -20,11 +20,12 @@
"context"
"encoding/json"
"errors"
- "github.com/boljen/go-bitmap"
"strings"
"testing"
"time"
+ "github.com/boljen/go-bitmap"
+
"github.com/opencord/voltha-lib-go/v7/pkg/db"
"github.com/opencord/voltha-lib-go/v7/pkg/db/kvstore"
"github.com/opencord/voltha-lib-go/v7/pkg/log"
@@ -44,6 +45,7 @@
func newMockKvClient(ctx context.Context) *MockResKVClient {
var mockResKVClient MockResKVClient
mockResKVClient.resourceMap = make(map[string]interface{})
+ logger.Debug(ctx, "Creating new MockKVClient")
return &mockResKVClient
}
@@ -74,6 +76,42 @@
return nil, errors.New("key didn't find")
}
+// GetWithPrefix mock function implementation for KVClient
+func (kvclient *MockResKVClient) GetWithPrefix(ctx context.Context, prefixKey string) (map[string]*kvstore.KVPair, error) {
+ logger.Debugw(ctx, "GetWithPrefix of MockKVClient called", log.Fields{"prefixKey": prefixKey})
+ if prefixKey != "" {
+ if strings.Contains(prefixKey, GEM_POOL_PATH) {
+ logger.Debug(ctx, "Getting keys with prefix:", GEM_POOL_PATH)
+ maps := make(map[string]*kvstore.KVPair)
+ for key, resource := range kvclient.resourceMap {
+ if strings.HasPrefix(key, prefixKey) {
+ maps[key] = kvstore.NewKVPair(key, resource, "mock", 3000, 1)
+ }
+ }
+ return maps, nil
+ }
+ }
+ return nil, errors.New("prefixKey didn't find")
+}
+
+// GetWithPrefixKeysOnly returns only the keys with the specified prefix.
+func (kvclient *MockResKVClient) GetWithPrefixKeysOnly(ctx context.Context, prefixKey string) ([]string, error) {
+ logger.Debugw(ctx, "GetWithPrefixKeysOnly of MockKVClient called", log.Fields{"prefixKey": prefixKey})
+ if prefixKey != "" {
+ if strings.Contains(prefixKey, GEM_POOL_PATH) {
+ logger.Debug(ctx, "Getting keys with prefix:", GEM_POOL_PATH)
+ var keys []string
+ for key := range kvclient.resourceMap {
+ if strings.HasPrefix(key, prefixKey) {
+ keys = append(keys, key)
+ }
+ }
+ return keys, nil
+ }
+ }
+ return nil, errors.New("prefixKey not found")
+}
+
// Put mock function implementation for KVClient
func (kvclient *MockResKVClient) Put(ctx context.Context, key string, value interface{}) error {
if key != "" {