VOL-2180 code changes for context in voltha-lib-go
Change-Id: Icd5b808f52f92970cef1e5a0ec2e4e3ef8e18695
diff --git a/VERSION b/VERSION
index b502146..75a22a2 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.0.2
+3.0.3
diff --git a/pkg/db/backend.go b/pkg/db/backend.go
index 23ad5a0..9bb49ac 100644
--- a/pkg/db/backend.go
+++ b/pkg/db/backend.go
@@ -119,8 +119,8 @@
// Perform a dummy Key Lookup on kvstore to test Connection Liveness and
// post on Liveness channel
-func (b *Backend) PerformLivenessCheck(timeout int) bool {
- alive := b.Client.IsConnectionUp(timeout)
+func (b *Backend) PerformLivenessCheck(ctx context.Context) bool {
+ alive := b.Client.IsConnectionUp(ctx)
logger.Debugw("kvstore-liveness-check-result", log.Fields{"alive": alive})
b.updateLiveness(alive)
@@ -187,14 +187,14 @@
}
// List retrieves one or more items that match the specified key
-func (b *Backend) List(key string) (map[string]*kvstore.KVPair, error) {
+func (b *Backend) List(ctx context.Context, key string) (map[string]*kvstore.KVPair, error) {
b.Lock()
defer b.Unlock()
formattedPath := b.makePath(key)
logger.Debugw("listing-key", log.Fields{"key": key, "path": formattedPath})
- pair, err := b.Client.List(formattedPath, b.Timeout)
+ pair, err := b.Client.List(ctx, formattedPath)
b.updateLiveness(b.isErrorIndicatingAliveKvstore(err))
@@ -202,14 +202,14 @@
}
// Get retrieves an item that matches the specified key
-func (b *Backend) Get(key string) (*kvstore.KVPair, error) {
+func (b *Backend) Get(ctx context.Context, key string) (*kvstore.KVPair, error) {
b.Lock()
defer b.Unlock()
formattedPath := b.makePath(key)
logger.Debugw("getting-key", log.Fields{"key": key, "path": formattedPath})
- pair, err := b.Client.Get(formattedPath, b.Timeout)
+ pair, err := b.Client.Get(ctx, formattedPath)
b.updateLiveness(b.isErrorIndicatingAliveKvstore(err))
@@ -217,14 +217,14 @@
}
// Put stores an item value under the specifed key
-func (b *Backend) Put(key string, value interface{}) error {
+func (b *Backend) Put(ctx context.Context, key string, value interface{}) error {
b.Lock()
defer b.Unlock()
formattedPath := b.makePath(key)
logger.Debugw("putting-key", log.Fields{"key": key, "value": string(value.([]byte)), "path": formattedPath})
- err := b.Client.Put(formattedPath, value, b.Timeout)
+ err := b.Client.Put(ctx, formattedPath, value)
b.updateLiveness(b.isErrorIndicatingAliveKvstore(err))
@@ -232,14 +232,14 @@
}
// Delete removes an item under the specified key
-func (b *Backend) Delete(key string) error {
+func (b *Backend) Delete(ctx context.Context, key string) error {
b.Lock()
defer b.Unlock()
formattedPath := b.makePath(key)
logger.Debugw("deleting-key", log.Fields{"key": key, "path": formattedPath})
- err := b.Client.Delete(formattedPath, b.Timeout)
+ err := b.Client.Delete(ctx, formattedPath)
b.updateLiveness(b.isErrorIndicatingAliveKvstore(err))
@@ -247,14 +247,14 @@
}
// CreateWatch starts watching events for the specified key
-func (b *Backend) CreateWatch(key string) chan *kvstore.Event {
+func (b *Backend) CreateWatch(ctx context.Context, key string) chan *kvstore.Event {
b.Lock()
defer b.Unlock()
formattedPath := b.makePath(key)
logger.Debugw("creating-key-watch", log.Fields{"key": key, "path": formattedPath})
- return b.Client.Watch(formattedPath)
+ return b.Client.Watch(ctx, formattedPath)
}
// DeleteWatch stops watching events for the specified key
diff --git a/pkg/db/backend_test.go b/pkg/db/backend_test.go
index 8eae015..0da95cd 100644
--- a/pkg/db/backend_test.go
+++ b/pkg/db/backend_test.go
@@ -119,14 +119,18 @@
// Liveness Check against Embedded Etcd Server should return alive state
func TestPerformLivenessCheck_EmbeddedEtcdServer(t *testing.T) {
backend := provisionBackendWithEmbeddedEtcdServer(t)
- alive := backend.PerformLivenessCheck(defaultTimeout)
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout*time.Second)
+ defer cancel()
+ alive := backend.PerformLivenessCheck(ctx)
assert.True(t, alive)
}
// Liveness Check against Dummy Etcd Server should return not-alive state
func TestPerformLivenessCheck_DummyEtcdServer(t *testing.T) {
backend := provisionBackendWithDummyEtcdServer(t)
- alive := backend.PerformLivenessCheck(defaultTimeout)
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout*time.Second)
+ defer cancel()
+ alive := backend.PerformLivenessCheck(ctx)
assert.False(t, alive)
}
@@ -144,7 +148,9 @@
// Enabling Liveness Channel after First Liveness Check
func TestEnableLivenessChannel_EmbeddedEtcdServer_AfterLivenessCheck(t *testing.T) {
backend := provisionBackendWithEmbeddedEtcdServer(t)
- backend.PerformLivenessCheck(defaultTimeout)
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout*time.Second)
+ defer cancel()
+ backend.PerformLivenessCheck(ctx)
alive := backend.EnableLivenessChannel()
assert.NotNil(t, alive)
@@ -225,23 +231,25 @@
}
func TestPut_EmbeddedEtcdServer(t *testing.T) {
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
backend := provisionBackendWithEmbeddedEtcdServer(t)
- err := backend.Put("key1", []uint8("value1"))
+ err := backend.Put(ctx, "key1", []uint8("value1"))
assert.Nil(t, err)
// Assert alive state has become true
assert.True(t, backend.alive)
// Assert that kvstore has this value stored
- kvpair, err := backend.Get("key1")
+ kvpair, err := backend.Get(ctx, "key1")
assert.NotNil(t, kvpair)
assert.Equal(t, defaultPathPrefix+"/key1", kvpair.Key)
assert.Equal(t, []uint8("value1"), kvpair.Value)
// Assert that Put overrides the Value
- err = backend.Put("key1", []uint8("value11"))
+ err = backend.Put(ctx, "key1", []uint8("value11"))
assert.Nil(t, err)
- kvpair, err = backend.Get("key1")
+ kvpair, err = backend.Get(ctx, "key1")
assert.NotNil(t, kvpair)
assert.Equal(t, defaultPathPrefix+"/key1", kvpair.Key)
assert.Equal(t, []uint8("value11"), kvpair.Value)
@@ -249,8 +257,10 @@
// Put operation should fail against Dummy Non-existent Etcd Server
func TestPut_DummyEtcdServer(t *testing.T) {
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
backend := provisionBackendWithDummyEtcdServer(t)
- err := backend.Put("key1", []uint8("value1"))
+ err := backend.Put(ctx, "key1", []uint8("value1"))
assert.NotNil(t, err)
// Assert alive state is still false
@@ -259,29 +269,33 @@
// Test Get for existing and non-existing key
func TestGet_EmbeddedEtcdServer(t *testing.T) {
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
backend := provisionBackendWithEmbeddedEtcdServer(t)
- err := backend.Put("key2", []uint8("value2"))
+ err := backend.Put(ctx, "key2", []uint8("value2"))
// Assert alive state has become true
assert.True(t, backend.alive)
// Assert that kvstore has this key stored
- kvpair, err := backend.Get("key2")
+ kvpair, err := backend.Get(ctx, "key2")
assert.NotNil(t, kvpair)
assert.Nil(t, err)
assert.Equal(t, defaultPathPrefix+"/key2", kvpair.Key)
assert.Equal(t, []uint8("value2"), kvpair.Value)
// Assert that Get works fine for absent key3
- kvpair, err = backend.Get("key3")
+ kvpair, err = backend.Get(ctx, "key3")
assert.Nil(t, kvpair)
assert.Nil(t, err) // no error as lookup is successful
}
// Get operation should fail against Dummy Non-existent Etcd Server
func TestGet_DummyEtcdServer(t *testing.T) {
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
backend := provisionBackendWithDummyEtcdServer(t)
- kvpair, err := backend.Get("key2")
+ kvpair, err := backend.Get(ctx, "key2")
assert.NotNil(t, err)
assert.Nil(t, kvpair)
@@ -291,31 +305,35 @@
// Test Delete for existing and non-existing key
func TestDelete_EmbeddedEtcdServer(t *testing.T) {
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
backend := provisionBackendWithEmbeddedEtcdServer(t)
- err := backend.Put("key3", []uint8("value3"))
+ err := backend.Put(ctx, "key3", []uint8("value3"))
// Assert alive state has become true
assert.True(t, backend.alive)
// Assert that kvstore has this key stored
- kvpair, err := backend.Get("key3")
+ kvpair, err := backend.Get(ctx, "key3")
assert.NotNil(t, kvpair)
// Delete and Assert that key has been removed
- err = backend.Delete("key3")
+ err = backend.Delete(ctx, "key3")
assert.Nil(t, err)
- kvpair, err = backend.Get("key3")
+ kvpair, err = backend.Get(ctx, "key3")
assert.Nil(t, kvpair)
// Assert that Delete silently ignores absent key3
- err = backend.Delete("key3")
+ err = backend.Delete(ctx, "key3")
assert.Nil(t, err)
}
// Delete operation should fail against Dummy Non-existent Etcd Server
func TestDelete_DummyEtcdServer(t *testing.T) {
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
backend := provisionBackendWithDummyEtcdServer(t)
- err := backend.Delete("key3")
+ err := backend.Delete(ctx, "key3")
assert.NotNil(t, err)
// Assert alive state is still false
@@ -324,23 +342,27 @@
// Test List for series of values under a key path
func TestList_EmbeddedEtcdServer(t *testing.T) {
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
key41 := "key4/subkey1"
key42 := "key4/subkey2"
backend := provisionBackendWithEmbeddedEtcdServer(t)
- backend.Put(key41, []uint8("value4-1"))
- backend.Put(key42, []uint8("value4-2"))
+ err := backend.Put(ctx, key41, []uint8("value4-1"))
+ assert.Nil(t, err)
+ err = backend.Put(ctx, key42, []uint8("value4-2"))
+ assert.Nil(t, err)
// Assert alive state has become true
assert.True(t, backend.alive)
// Assert that Get does not retrieve these Subkeys
- kvpair, err := backend.Get("key4")
+ kvpair, err := backend.Get(ctx, "key4")
assert.Nil(t, kvpair)
assert.Nil(t, err)
// Assert that List operation retrieves these Child Keys
- kvmap, err := backend.List("key4")
+ kvmap, err := backend.List(ctx, "key4")
assert.NotNil(t, kvmap)
assert.Nil(t, err)
assert.Equal(t, 2, len(kvmap))
@@ -354,8 +376,10 @@
// List operation should fail against Dummy Non-existent Etcd Server
func TestList_DummyEtcdServer(t *testing.T) {
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
backend := provisionBackendWithDummyEtcdServer(t)
- kvmap, err := backend.List("key4")
+ kvmap, err := backend.List(ctx, "key4")
assert.Nil(t, kvmap)
assert.NotNil(t, err)
@@ -365,8 +389,10 @@
// Test Create and Delete Watch for Embedded Etcd Server
func TestCreateWatch_EmbeddedEtcdServer(t *testing.T) {
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
backend := provisionBackendWithEmbeddedEtcdServer(t)
- eventChan := backend.CreateWatch("key5")
+ eventChan := backend.CreateWatch(ctx, "key5")
assert.NotNil(t, eventChan)
assert.Equal(t, 0, len(eventChan))
@@ -374,7 +400,7 @@
assert.False(t, backend.alive)
// Put a value for watched key and event should appear
- err := backend.Put("key5", []uint8("value5"))
+ err := backend.Put(ctx, "key5", []uint8("value5"))
assert.Nil(t, err)
time.Sleep(time.Millisecond * 100)
assert.Equal(t, 1, len(eventChan))
diff --git a/pkg/db/kvstore/client.go b/pkg/db/kvstore/client.go
index 088593a..d30e049 100644
--- a/pkg/db/kvstore/client.go
+++ b/pkg/db/kvstore/client.go
@@ -15,6 +15,8 @@
*/
package kvstore
+import "context"
+
const (
// Default timeout in seconds when making a kvstore request
defaultKVGetTimeout = 5
@@ -71,18 +73,18 @@
// Client represents the set of APIs a KV Client must implement
type Client interface {
- List(key string, timeout int) (map[string]*KVPair, error)
- Get(key string, timeout int) (*KVPair, error)
- Put(key string, value interface{}, timeout int) error
- Delete(key string, timeout int) error
- Reserve(key string, value interface{}, ttl int64) (interface{}, error)
- ReleaseReservation(key string) error
- ReleaseAllReservations() error
- RenewReservation(key string) error
- Watch(key string) chan *Event
- AcquireLock(lockName string, timeout int) error
+ List(ctx context.Context, key string) (map[string]*KVPair, error)
+ Get(ctx context.Context, key string) (*KVPair, error)
+ Put(ctx context.Context, key string, value interface{}) error
+ Delete(ctx context.Context, key string) error
+ Reserve(ctx context.Context, key string, value interface{}, ttl int64) (interface{}, error)
+ ReleaseReservation(ctx context.Context, key string) error
+ ReleaseAllReservations(ctx context.Context) error
+ RenewReservation(ctx context.Context, key string) error
+ Watch(ctx context.Context, key string) chan *Event
+ AcquireLock(ctx context.Context, lockName string, timeout int) error
ReleaseLock(lockName string) error
- IsConnectionUp(timeout int) bool // timeout in second
+ IsConnectionUp(ctx context.Context) bool // timeout in second
CloseWatch(key string, ch chan *Event)
Close()
}
diff --git a/pkg/db/kvstore/consulclient.go b/pkg/db/kvstore/consulclient.go
index e391293..fdf39be 100644
--- a/pkg/db/kvstore/consulclient.go
+++ b/pkg/db/kvstore/consulclient.go
@@ -64,19 +64,19 @@
}
// IsConnectionUp returns whether the connection to the Consul KV store is up
-func (c *ConsulClient) IsConnectionUp(timeout int) bool {
+func (c *ConsulClient) IsConnectionUp(ctx context.Context) bool {
logger.Error("Unimplemented function")
return false
}
// List returns an array of key-value pairs with key as a prefix. Timeout defines how long the function will
// wait for a response
-func (c *ConsulClient) List(key string, timeout int) (map[string]*KVPair, error) {
- duration := GetDuration(timeout)
+func (c *ConsulClient) List(ctx context.Context, key string) (map[string]*KVPair, error) {
+ deadline, _ := ctx.Deadline()
kv := c.consul.KV()
var queryOptions consulapi.QueryOptions
- queryOptions.WaitTime = duration
+ queryOptions.WaitTime = GetDuration(deadline.Second())
// For now we ignore meta data
kvps, _, err := kv.List(key, &queryOptions)
if err != nil {
@@ -92,13 +92,12 @@
// Get returns a key-value pair for a given key. Timeout defines how long the function will
// wait for a response
-func (c *ConsulClient) Get(key string, timeout int) (*KVPair, error) {
+func (c *ConsulClient) Get(ctx context.Context, key string) (*KVPair, error) {
- duration := GetDuration(timeout)
-
+ deadline, _ := ctx.Deadline()
kv := c.consul.KV()
var queryOptions consulapi.QueryOptions
- queryOptions.WaitTime = duration
+ queryOptions.WaitTime = GetDuration(deadline.Second())
// For now we ignore meta data
kvp, _, err := kv.Get(key, &queryOptions)
if err != nil {
@@ -115,7 +114,7 @@
// Put writes a key-value pair to the KV store. Value can only be a string or []byte since the consul API
// accepts only a []byte as a value for a put operation. Timeout defines how long the function will
// wait for a response
-func (c *ConsulClient) Put(key string, value interface{}, timeout int) error {
+func (c *ConsulClient) Put(ctx context.Context, key string, value interface{}) error {
// Validate that we can create a byte array from the value as consul API expects a byte array
var val []byte
@@ -141,7 +140,7 @@
// Delete removes a key from the KV store. Timeout defines how long the function will
// wait for a response
-func (c *ConsulClient) Delete(key string, timeout int) error {
+func (c *ConsulClient) Delete(ctx context.Context, key string) error {
kv := c.consul.KV()
var writeOptions consulapi.WriteOptions
c.writeLock.Lock()
@@ -219,7 +218,7 @@
// defines how long that reservation is valid. When TTL expires the key is unreserved by the KV store itself.
// If the key is acquired then the value returned will be the value passed in. If the key is already acquired
// then the value assigned to that key will be returned.
-func (c *ConsulClient) Reserve(key string, value interface{}, ttl int64) (interface{}, error) {
+func (c *ConsulClient) Reserve(ctx context.Context, key string, value interface{}, ttl int64) (interface{}, error) {
// Validate that we can create a byte array from the value as consul API expects a byte array
var val []byte
@@ -264,7 +263,7 @@
logger.Debugw("key-acquired", log.Fields{"key": key, "status": result})
// Irrespective whether we were successful in acquiring the key, let's read it back and see if it's us.
- m, err := c.Get(key, defaultKVGetTimeout)
+ m, err := c.Get(ctx, key)
if err != nil {
return nil, err
}
@@ -286,7 +285,7 @@
}
// ReleaseAllReservations releases all key reservations previously made (using Reserve API)
-func (c *ConsulClient) ReleaseAllReservations() error {
+func (c *ConsulClient) ReleaseAllReservations(ctx context.Context) error {
kv := c.consul.KV()
var kvp consulapi.KVPair
var result bool
@@ -311,7 +310,7 @@
}
// ReleaseReservation releases reservation for a specific key.
-func (c *ConsulClient) ReleaseReservation(key string) error {
+func (c *ConsulClient) ReleaseReservation(ctx context.Context, key string) error {
var ok bool
var reservedValue interface{}
c.writeLock.Lock()
@@ -337,7 +336,7 @@
// RenewReservation renews a reservation. A reservation will go stale after the specified TTL (Time To Live)
// period specified when reserving the key
-func (c *ConsulClient) RenewReservation(key string) error {
+func (c *ConsulClient) RenewReservation(ctx context.Context, key string) error {
// In the case of Consul, renew reservation of a reserve key only require renewing the client session.
c.writeLock.Lock()
@@ -361,7 +360,7 @@
// Watch provides the watch capability on a given key. It returns a channel onto which the callee needs to
// listen to receive Events.
-func (c *ConsulClient) Watch(key string) chan *Event {
+func (c *ConsulClient) Watch(ctx context.Context, key string) chan *Event {
// Create a new channel
ch := make(chan *Event, maxClientChannelBufferSize)
@@ -504,7 +503,7 @@
}
}
-func (c *ConsulClient) AcquireLock(lockName string, timeout int) error {
+func (c *ConsulClient) AcquireLock(ctx context.Context, lockName string, timeout int) error {
return nil
}
diff --git a/pkg/db/kvstore/etcdclient.go b/pkg/db/kvstore/etcdclient.go
index 7096748..a0f39cd 100644
--- a/pkg/db/kvstore/etcdclient.go
+++ b/pkg/db/kvstore/etcdclient.go
@@ -19,11 +19,12 @@
"context"
"errors"
"fmt"
+ "sync"
+
"github.com/opencord/voltha-lib-go/v3/pkg/log"
v3Client "go.etcd.io/etcd/clientv3"
v3Concurrency "go.etcd.io/etcd/clientv3/concurrency"
v3rpcTypes "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
- "sync"
)
// EtcdClient represents the Etcd KV store client
@@ -64,23 +65,19 @@
// IsConnectionUp returns whether the connection to the Etcd KV store is up. If a timeout occurs then
// it is assumed the connection is down or unreachable.
-func (c *EtcdClient) IsConnectionUp(timeout int) bool {
+func (c *EtcdClient) IsConnectionUp(ctx context.Context) bool {
// Let's try to get a non existent key. If the connection is up then there will be no error returned.
- if _, err := c.Get("non-existent-key", timeout); err != nil {
+ if _, err := c.Get(ctx, "non-existent-key"); err != nil {
return false
}
+ //cancel()
return true
}
// List returns an array of key-value pairs with key as a prefix. Timeout defines how long the function will
// wait for a response
-func (c *EtcdClient) List(key string, timeout int) (map[string]*KVPair, error) {
- duration := GetDuration(timeout)
-
- ctx, cancel := context.WithTimeout(context.Background(), duration)
-
+func (c *EtcdClient) List(ctx context.Context, key string) (map[string]*KVPair, error) {
resp, err := c.ectdAPI.Get(ctx, key, v3Client.WithPrefix())
- cancel()
if err != nil {
logger.Error(err)
return nil, err
@@ -94,13 +91,10 @@
// Get returns a key-value pair for a given key. Timeout defines how long the function will
// wait for a response
-func (c *EtcdClient) Get(key string, timeout int) (*KVPair, error) {
- duration := GetDuration(timeout)
-
- ctx, cancel := context.WithTimeout(context.Background(), duration)
+func (c *EtcdClient) Get(ctx context.Context, key string) (*KVPair, error) {
resp, err := c.ectdAPI.Get(ctx, key)
- cancel()
+
if err != nil {
logger.Error(err)
return nil, err
@@ -115,7 +109,7 @@
// Put writes a key-value pair to the KV store. Value can only be a string or []byte since the etcd API
// accepts only a string as a value for a put operation. Timeout defines how long the function will
// wait for a response
-func (c *EtcdClient) Put(key string, value interface{}, timeout int) error {
+func (c *EtcdClient) Put(ctx context.Context, key string, value interface{}) error {
// Validate that we can convert value to a string as etcd API expects a string
var val string
@@ -124,10 +118,6 @@
return fmt.Errorf("unexpected-type-%T", value)
}
- duration := GetDuration(timeout)
-
- ctx, cancel := context.WithTimeout(context.Background(), duration)
-
c.writeLock.Lock()
defer c.writeLock.Unlock()
@@ -139,7 +129,7 @@
} else {
_, err = c.ectdAPI.Put(ctx, key, val)
}
- cancel()
+
if err != nil {
switch err {
case context.Canceled:
@@ -158,13 +148,7 @@
// Delete removes a key from the KV store. Timeout defines how long the function will
// wait for a response
-func (c *EtcdClient) Delete(key string, timeout int) error {
-
- duration := GetDuration(timeout)
-
- ctx, cancel := context.WithTimeout(context.Background(), duration)
-
- defer cancel()
+func (c *EtcdClient) Delete(ctx context.Context, key string) error {
c.writeLock.Lock()
defer c.writeLock.Unlock()
@@ -183,7 +167,7 @@
// defines how long that reservation is valid. When TTL expires the key is unreserved by the KV store itself.
// If the key is acquired then the value returned will be the value passed in. If the key is already acquired
// then the value assigned to that key will be returned.
-func (c *EtcdClient) Reserve(key string, value interface{}, ttl int64) (interface{}, error) {
+func (c *EtcdClient) Reserve(ctx context.Context, key string, value interface{}, ttl int64) (interface{}, error) {
// Validate that we can convert value to a string as etcd API expects a string
var val string
var er error
@@ -191,12 +175,6 @@
return nil, fmt.Errorf("unexpected-type%T", value)
}
- duration := GetDuration(connTimeout)
-
- // Create a lease
- ctx, cancel := context.WithTimeout(context.Background(), duration)
- defer cancel()
-
resp, err := c.ectdAPI.Grant(ctx, ttl)
if err != nil {
logger.Error(err)
@@ -211,7 +189,7 @@
reservationSuccessful := false
defer func() {
if !reservationSuccessful {
- if err = c.ReleaseReservation(key); err != nil {
+ if err = c.ReleaseReservation(context.Background(), key); err != nil {
logger.Error("cannot-release-lease")
}
}
@@ -241,7 +219,7 @@
}
} else {
// Read the Key to ensure this is our Key
- m, err := c.Get(key, defaultKVGetTimeout)
+ m, err := c.Get(ctx, key)
if err != nil {
return nil, err
}
@@ -260,12 +238,9 @@
}
// ReleaseAllReservations releases all key reservations previously made (using Reserve API)
-func (c *EtcdClient) ReleaseAllReservations() error {
+func (c *EtcdClient) ReleaseAllReservations(ctx context.Context) error {
c.writeLock.Lock()
defer c.writeLock.Unlock()
- duration := GetDuration(connTimeout)
- ctx, cancel := context.WithTimeout(context.Background(), duration)
- defer cancel()
for key, leaseID := range c.keyReservations {
_, err := c.ectdAPI.Revoke(ctx, *leaseID)
@@ -279,7 +254,7 @@
}
// ReleaseReservation releases reservation for a specific key.
-func (c *EtcdClient) ReleaseReservation(key string) error {
+func (c *EtcdClient) ReleaseReservation(ctx context.Context, key string) error {
// Get the leaseid using the key
logger.Debugw("Release-reservation", log.Fields{"key": key})
var ok bool
@@ -289,9 +264,6 @@
if leaseID, ok = c.keyReservations[key]; !ok {
return nil
}
- duration := GetDuration(connTimeout)
- ctx, cancel := context.WithTimeout(context.Background(), duration)
- defer cancel()
if leaseID != nil {
_, err := c.ectdAPI.Revoke(ctx, *leaseID)
@@ -306,7 +278,7 @@
// RenewReservation renews a reservation. A reservation will go stale after the specified TTL (Time To Live)
// period specified when reserving the key
-func (c *EtcdClient) RenewReservation(key string) error {
+func (c *EtcdClient) RenewReservation(ctx context.Context, key string) error {
// Get the leaseid using the key
var ok bool
var leaseID *v3Client.LeaseID
@@ -315,9 +287,6 @@
if leaseID, ok = c.keyReservations[key]; !ok {
return errors.New("key-not-reserved")
}
- duration := GetDuration(connTimeout)
- ctx, cancel := context.WithTimeout(context.Background(), duration)
- defer cancel()
if leaseID != nil {
_, err := c.ectdAPI.KeepAliveOnce(ctx, *leaseID)
@@ -333,9 +302,9 @@
// Watch provides the watch capability on a given key. It returns a channel onto which the callee needs to
// listen to receive Events.
-func (c *EtcdClient) Watch(key string) chan *Event {
+func (c *EtcdClient) Watch(ctx context.Context, key string) chan *Event {
w := v3Client.NewWatcher(c.ectdAPI)
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context.WithCancel(ctx)
channel := w.Watch(ctx, key)
// Create a new channel
@@ -490,14 +459,11 @@
return lock, session
}
-func (c *EtcdClient) AcquireLock(lockName string, timeout int) error {
- duration := GetDuration(timeout)
- ctx, cancel := context.WithTimeout(context.Background(), duration)
- defer cancel()
+func (c *EtcdClient) AcquireLock(ctx context.Context, lockName string, timeout int) error {
session, _ := v3Concurrency.NewSession(c.ectdAPI, v3Concurrency.WithContext(ctx))
mu := v3Concurrency.NewMutex(session, "/devicelock_"+lockName)
if err := mu.Lock(context.Background()); err != nil {
- cancel()
+ //cancel()
return err
}
c.addLockName(lockName, mu, session)
diff --git a/pkg/mocks/etcd_server_test.go b/pkg/mocks/etcd_server_test.go
index c2efd58..7a861d4 100644
--- a/pkg/mocks/etcd_server_test.go
+++ b/pkg/mocks/etcd_server_test.go
@@ -17,6 +17,7 @@
package mocks
import (
+ "context"
"fmt"
"github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
"github.com/phayes/freeport"
@@ -52,9 +53,9 @@
func TestEtcdServerRW(t *testing.T) {
key := "myKey-1"
value := "myVal-1"
- err := client.Put(key, value, 10)
+ err := client.Put(context.Background(), key, value)
assert.Nil(t, err)
- kvp, err := client.Get(key, 10)
+ kvp, err := client.Get(context.Background(), key)
assert.Nil(t, err)
assert.NotNil(t, kvp)
assert.Equal(t, kvp.Key, key)
@@ -66,7 +67,7 @@
func TestEtcdServerReserve(t *testing.T) {
assert.NotNil(t, client)
txId := "tnxId-1"
- val, err := client.Reserve("transactions", txId, 10)
+ val, err := client.Reserve(context.Background(), "transactions", txId, 10)
assert.Nil(t, err)
assert.NotNil(t, val)
assert.Equal(t, val, txId)
diff --git a/pkg/ponresourcemanager/ponresourcemanager.go b/pkg/ponresourcemanager/ponresourcemanager.go
index 4587675..ad2150a 100755
--- a/pkg/ponresourcemanager/ponresourcemanager.go
+++ b/pkg/ponresourcemanager/ponresourcemanager.go
@@ -17,6 +17,7 @@
package ponresourcemanager
import (
+ "context"
"encoding/base64"
"encoding/json"
"errors"
@@ -209,7 +210,7 @@
OLT model key, if available
*/
-func (PONRMgr *PONResourceManager) InitResourceRangesFromKVStore() bool {
+func (PONRMgr *PONResourceManager) InitResourceRangesFromKVStore(ctx context.Context) bool {
//Initialize PON resource ranges with config fetched from kv store.
//:return boolean: True if PON resource ranges initialized else false
// Try to initialize the PON Resource Ranges from KV store based on the
@@ -220,7 +221,7 @@
}
Path := fmt.Sprintf(PON_RESOURCE_RANGE_CONFIG_PATH, PONRMgr.OLTModel)
//get resource from kv store
- Result, err := PONRMgr.KVStore.Get(Path)
+ Result, err := PONRMgr.KVStore.Get(ctx, Path)
if err != nil {
log.Debugf("Error in fetching resource %s from KV strore", Path)
return false
@@ -327,7 +328,7 @@
return true
}
-func (PONRMgr *PONResourceManager) InitDeviceResourcePool() error {
+func (PONRMgr *PONResourceManager) InitDeviceResourcePool(ctx context.Context) error {
//Initialize resource pool for all PON ports.
@@ -339,7 +340,7 @@
if SharedPoolID != 0 {
Intf = SharedPoolID
}
- if err = PONRMgr.InitResourceIDPool(Intf, ONU_ID,
+ if err = PONRMgr.InitResourceIDPool(ctx, Intf, ONU_ID,
PONRMgr.PonResourceRanges[ONU_ID_START_IDX].(uint32),
PONRMgr.PonResourceRanges[ONU_ID_END_IDX].(uint32)); err != nil {
log.Error("Failed to init ONU ID resource pool")
@@ -355,7 +356,7 @@
if SharedPoolID != 0 {
Intf = SharedPoolID
}
- if err = PONRMgr.InitResourceIDPool(Intf, ALLOC_ID,
+ if err = PONRMgr.InitResourceIDPool(ctx, Intf, ALLOC_ID,
PONRMgr.PonResourceRanges[ALLOC_ID_START_IDX].(uint32),
PONRMgr.PonResourceRanges[ALLOC_ID_END_IDX].(uint32)); err != nil {
log.Error("Failed to init ALLOC ID resource pool ")
@@ -370,7 +371,7 @@
if SharedPoolID != 0 {
Intf = SharedPoolID
}
- if err = PONRMgr.InitResourceIDPool(Intf, GEMPORT_ID,
+ if err = PONRMgr.InitResourceIDPool(ctx, Intf, GEMPORT_ID,
PONRMgr.PonResourceRanges[GEMPORT_ID_START_IDX].(uint32),
PONRMgr.PonResourceRanges[GEMPORT_ID_END_IDX].(uint32)); err != nil {
log.Error("Failed to init GEMPORT ID resource pool")
@@ -386,7 +387,7 @@
if SharedPoolID != 0 {
Intf = SharedPoolID
}
- if err = PONRMgr.InitResourceIDPool(Intf, FLOW_ID,
+ if err = PONRMgr.InitResourceIDPool(ctx, Intf, FLOW_ID,
PONRMgr.PonResourceRanges[FLOW_ID_START_IDX].(uint32),
PONRMgr.PonResourceRanges[FLOW_ID_END_IDX].(uint32)); err != nil {
log.Error("Failed to init FLOW ID resource pool")
@@ -399,7 +400,7 @@
return err
}
-func (PONRMgr *PONResourceManager) ClearDeviceResourcePool() error {
+func (PONRMgr *PONResourceManager) ClearDeviceResourcePool(ctx context.Context) error {
//Clear resource pool for all PON ports.
@@ -410,7 +411,7 @@
if SharedPoolID != 0 {
Intf = SharedPoolID
}
- if status := PONRMgr.ClearResourceIDPool(Intf, ONU_ID); status != true {
+ if status := PONRMgr.ClearResourceIDPool(ctx, Intf, ONU_ID); status != true {
log.Error("Failed to clear ONU ID resource pool")
return errors.New("Failed to clear ONU ID resource pool")
}
@@ -424,7 +425,7 @@
if SharedPoolID != 0 {
Intf = SharedPoolID
}
- if status := PONRMgr.ClearResourceIDPool(Intf, ALLOC_ID); status != true {
+ if status := PONRMgr.ClearResourceIDPool(ctx, Intf, ALLOC_ID); status != true {
log.Error("Failed to clear ALLOC ID resource pool ")
return errors.New("Failed to clear ALLOC ID resource pool")
}
@@ -437,7 +438,7 @@
if SharedPoolID != 0 {
Intf = SharedPoolID
}
- if status := PONRMgr.ClearResourceIDPool(Intf, GEMPORT_ID); status != true {
+ if status := PONRMgr.ClearResourceIDPool(ctx, Intf, GEMPORT_ID); status != true {
log.Error("Failed to clear GEMPORT ID resource pool")
return errors.New("Failed to clear GEMPORT ID resource pool")
}
@@ -451,7 +452,7 @@
if SharedPoolID != 0 {
Intf = SharedPoolID
}
- if status := PONRMgr.ClearResourceIDPool(Intf, FLOW_ID); status != true {
+ if status := PONRMgr.ClearResourceIDPool(ctx, Intf, FLOW_ID); status != true {
log.Error("Failed to clear FLOW ID resource pool")
return errors.New("Failed to clear FLOW ID resource pool")
}
@@ -462,7 +463,7 @@
return nil
}
-func (PONRMgr *PONResourceManager) InitResourceIDPool(Intf uint32, ResourceType string, StartID uint32, EndID uint32) error {
+func (PONRMgr *PONResourceManager) InitResourceIDPool(ctx context.Context, Intf uint32, ResourceType string, StartID uint32, EndID uint32) error {
/*Initialize Resource ID pool for a given Resource Type on a given PON Port
@@ -476,7 +477,7 @@
// delegate to the master instance if sharing enabled across instances
SharedResourceMgr := PONRMgr.SharedResourceMgrs[PONRMgr.SharedIdxByType[ResourceType]]
if SharedResourceMgr != nil && PONRMgr != SharedResourceMgr {
- return SharedResourceMgr.InitResourceIDPool(Intf, ResourceType, StartID, EndID)
+ return SharedResourceMgr.InitResourceIDPool(ctx, Intf, ResourceType, StartID, EndID)
}
Path := PONRMgr.GetPath(Intf, ResourceType)
@@ -487,7 +488,7 @@
//In case of adapter reboot and reconciliation resource in kv store
//checked for its presence if not kv store update happens
- Res, err := PONRMgr.GetResource(Path)
+ Res, err := PONRMgr.GetResource(ctx, Path)
if (err == nil) && (Res != nil) {
log.Debugf("Resource %s already present in store ", Path)
return nil
@@ -498,7 +499,7 @@
return err
}
// Add resource as json in kv store.
- err = PONRMgr.KVStore.Put(Path, FormatResult)
+ err = PONRMgr.KVStore.Put(ctx, Path, FormatResult)
if err == nil {
log.Debug("Successfuly posted to kv store")
return err
@@ -542,7 +543,7 @@
}
return Value, err
}
-func (PONRMgr *PONResourceManager) GetResource(Path string) (map[string]interface{}, error) {
+func (PONRMgr *PONResourceManager) GetResource(ctx context.Context, Path string) (map[string]interface{}, error) {
/*
Get resource from kv store.
@@ -555,7 +556,7 @@
Result := make(map[string]interface{})
var Str string
- Resource, err := PONRMgr.KVStore.Get(Path)
+ Resource, err := PONRMgr.KVStore.Get(ctx, Path)
if (err != nil) || (Resource == nil) {
log.Debugf("Resource unavailable at %s", Path)
return nil, err
@@ -620,7 +621,7 @@
return Path
}
-func (PONRMgr *PONResourceManager) GetResourceID(IntfID uint32, ResourceType string, NumIDs uint32) ([]uint32, error) {
+func (PONRMgr *PONResourceManager) GetResourceID(ctx context.Context, IntfID uint32, ResourceType string, NumIDs uint32) ([]uint32, error) {
/*
Create alloc/gemport/onu/flow id for given OLT PON interface.
:param pon_intf_id: OLT PON interface id
@@ -637,7 +638,7 @@
SharedResourceMgr := PONRMgr.SharedResourceMgrs[PONRMgr.SharedIdxByType[ResourceType]]
if SharedResourceMgr != nil && PONRMgr != SharedResourceMgr {
- return SharedResourceMgr.GetResourceID(IntfID, ResourceType, NumIDs)
+ return SharedResourceMgr.GetResourceID(ctx, IntfID, ResourceType, NumIDs)
}
log.Debugf("Fetching resource from %s rsrc mgr for resource %s", PONRMgr.Globalorlocal, ResourceType)
@@ -649,7 +650,7 @@
log.Debugf("Get resource for type %s on path %s", ResourceType, Path)
var Result []uint32
var NextID uint32
- Resource, err := PONRMgr.GetResource(Path)
+ Resource, err := PONRMgr.GetResource(ctx, Path)
if (err == nil) && (ResourceType == ONU_ID) || (ResourceType == FLOW_ID) {
if NextID, err = PONRMgr.GenerateNextID(Resource); err != nil {
log.Error("Failed to Generate ID")
@@ -679,7 +680,7 @@
}
//Update resource in kv store
- if PONRMgr.UpdateResource(Path, Resource) != nil {
+ if PONRMgr.UpdateResource(ctx, Path, Resource) != nil {
log.Errorf("Failed to update resource %s", Path)
return nil, errors.New(fmt.Sprintf("Failed to update resource %s", Path))
}
@@ -697,7 +698,7 @@
return false
}
-func (PONRMgr *PONResourceManager) FreeResourceID(IntfID uint32, ResourceType string, ReleaseContent []uint32) bool {
+func (PONRMgr *PONResourceManager) FreeResourceID(ctx context.Context, IntfID uint32, ResourceType string, ReleaseContent []uint32) bool {
/*
Release alloc/gemport/onu/flow id for given OLT PON interface.
:param pon_intf_id: OLT PON interface id
@@ -716,14 +717,14 @@
// delegate to the master instance if sharing enabled across instances
SharedResourceMgr := PONRMgr.SharedResourceMgrs[PONRMgr.SharedIdxByType[ResourceType]]
if SharedResourceMgr != nil && PONRMgr != SharedResourceMgr {
- return SharedResourceMgr.FreeResourceID(IntfID, ResourceType, ReleaseContent)
+ return SharedResourceMgr.FreeResourceID(ctx, IntfID, ResourceType, ReleaseContent)
}
Path := PONRMgr.GetPath(IntfID, ResourceType)
if Path == "" {
log.Error("Failed to get path")
return false
}
- Resource, err := PONRMgr.GetResource(Path)
+ Resource, err := PONRMgr.GetResource(ctx, Path)
if err != nil {
log.Error("Failed to get resource")
return false
@@ -731,14 +732,14 @@
for _, Val := range ReleaseContent {
PONRMgr.ReleaseID(Resource, Val)
}
- if PONRMgr.UpdateResource(Path, Resource) != nil {
+ if PONRMgr.UpdateResource(ctx, Path, Resource) != nil {
log.Errorf("Free resource for %s failed", Path)
return false
}
return true
}
-func (PONRMgr *PONResourceManager) UpdateResource(Path string, Resource map[string]interface{}) error {
+func (PONRMgr *PONResourceManager) UpdateResource(ctx context.Context, Path string, Resource map[string]interface{}) error {
/*
Update resource in resource kv store.
:param path: path to update resource
@@ -751,7 +752,7 @@
log.Error("failed to Marshal")
return err
}
- err = PONRMgr.KVStore.Put(Path, Value)
+ err = PONRMgr.KVStore.Put(ctx, Path, Value)
if err != nil {
log.Error("failed to put data to kv store %s", Path)
return err
@@ -759,7 +760,7 @@
return nil
}
-func (PONRMgr *PONResourceManager) ClearResourceIDPool(IntfID uint32, ResourceType string) bool {
+func (PONRMgr *PONResourceManager) ClearResourceIDPool(ctx context.Context, contIntfID uint32, ResourceType string) bool {
/*
Clear Resource Pool for a given Resource Type on a given PON Port.
:return boolean: True if removed else False
@@ -768,15 +769,15 @@
// delegate to the master instance if sharing enabled across instances
SharedResourceMgr := PONRMgr.SharedResourceMgrs[PONRMgr.SharedIdxByType[ResourceType]]
if SharedResourceMgr != nil && PONRMgr != SharedResourceMgr {
- return SharedResourceMgr.ClearResourceIDPool(IntfID, ResourceType)
+ return SharedResourceMgr.ClearResourceIDPool(ctx, contIntfID, ResourceType)
}
- Path := PONRMgr.GetPath(IntfID, ResourceType)
+ Path := PONRMgr.GetPath(contIntfID, ResourceType)
if Path == "" {
log.Error("Failed to get path")
return false
}
- if err := PONRMgr.KVStore.Delete(Path); err != nil {
+ if err := PONRMgr.KVStore.Delete(ctx, Path); err != nil {
log.Errorf("Failed to delete resource %s", Path)
return false
}
@@ -784,7 +785,7 @@
return true
}
-func (PONRMgr PONResourceManager) InitResourceMap(PONIntfONUID string) {
+func (PONRMgr PONResourceManager) InitResourceMap(ctx context.Context, PONIntfONUID string) {
/*
Initialize resource map
:param pon_intf_onu_id: reference of PON interface id and onu id
@@ -792,7 +793,7 @@
// initialize pon_intf_onu_id tuple to alloc_ids map
AllocIDPath := fmt.Sprintf(ALLOC_ID_RESOURCE_MAP_PATH, PONRMgr.DeviceID, PONIntfONUID)
var AllocIDs []byte
- Result := PONRMgr.KVStore.Put(AllocIDPath, AllocIDs)
+ Result := PONRMgr.KVStore.Put(ctx, AllocIDPath, AllocIDs)
if Result != nil {
log.Error("Failed to update the KV store")
return
@@ -800,14 +801,14 @@
// initialize pon_intf_onu_id tuple to gemport_ids map
GEMPortIDPath := fmt.Sprintf(GEMPORT_ID_RESOURCE_MAP_PATH, PONRMgr.DeviceID, PONIntfONUID)
var GEMPortIDs []byte
- Result = PONRMgr.KVStore.Put(GEMPortIDPath, GEMPortIDs)
+ Result = PONRMgr.KVStore.Put(ctx, GEMPortIDPath, GEMPortIDs)
if Result != nil {
log.Error("Failed to update the KV store")
return
}
}
-func (PONRMgr PONResourceManager) RemoveResourceMap(PONIntfONUID string) bool {
+func (PONRMgr PONResourceManager) RemoveResourceMap(ctx context.Context, PONIntfONUID string) bool {
/*
Remove resource map
:param pon_intf_onu_id: reference of PON interface id and onu id
@@ -815,30 +816,30 @@
// remove pon_intf_onu_id tuple to alloc_ids map
var err error
AllocIDPath := fmt.Sprintf(ALLOC_ID_RESOURCE_MAP_PATH, PONRMgr.DeviceID, PONIntfONUID)
- if err = PONRMgr.KVStore.Delete(AllocIDPath); err != nil {
+ if err = PONRMgr.KVStore.Delete(ctx, AllocIDPath); err != nil {
log.Errorf("Failed to remove resource %s", AllocIDPath)
return false
}
// remove pon_intf_onu_id tuple to gemport_ids map
GEMPortIDPath := fmt.Sprintf(GEMPORT_ID_RESOURCE_MAP_PATH, PONRMgr.DeviceID, PONIntfONUID)
- err = PONRMgr.KVStore.Delete(GEMPortIDPath)
+ err = PONRMgr.KVStore.Delete(ctx, GEMPortIDPath)
if err != nil {
log.Errorf("Failed to remove resource %s", GEMPortIDPath)
return false
}
FlowIDPath := fmt.Sprintf(FLOW_ID_RESOURCE_MAP_PATH, PONRMgr.DeviceID, PONIntfONUID)
- if FlowIDs, err := PONRMgr.KVStore.List(FlowIDPath); err != nil {
+ if FlowIDs, err := PONRMgr.KVStore.List(ctx, FlowIDPath); err != nil {
for _, Flow := range FlowIDs {
FlowIDInfoPath := fmt.Sprintf(FLOW_ID_INFO_PATH, PONRMgr.DeviceID, PONIntfONUID, Flow.Value)
- if err = PONRMgr.KVStore.Delete(FlowIDInfoPath); err != nil {
+ if err = PONRMgr.KVStore.Delete(ctx, FlowIDInfoPath); err != nil {
log.Errorf("Failed to remove resource %s", FlowIDInfoPath)
return false
}
}
}
- if err = PONRMgr.KVStore.Delete(FlowIDPath); err != nil {
+ if err = PONRMgr.KVStore.Delete(ctx, FlowIDPath); err != nil {
log.Errorf("Failed to remove resource %s", FlowIDPath)
return false
}
@@ -846,7 +847,7 @@
return true
}
-func (PONRMgr *PONResourceManager) GetCurrentAllocIDForOnu(IntfONUID string) []uint32 {
+func (PONRMgr *PONResourceManager) GetCurrentAllocIDForOnu(ctx context.Context, IntfONUID string) []uint32 {
/*
Get currently configured alloc ids for given pon_intf_onu_id
:param pon_intf_onu_id: reference of PON interface id and onu id
@@ -855,7 +856,7 @@
Path := fmt.Sprintf(ALLOC_ID_RESOURCE_MAP_PATH, PONRMgr.DeviceID, IntfONUID)
var Data []uint32
- Value, err := PONRMgr.KVStore.Get(Path)
+ Value, err := PONRMgr.KVStore.Get(ctx, Path)
if err == nil {
if Value != nil {
Val, err := ToByte(Value.Value)
@@ -872,7 +873,7 @@
return Data
}
-func (PONRMgr *PONResourceManager) GetCurrentGEMPortIDsForOnu(IntfONUID string) []uint32 {
+func (PONRMgr *PONResourceManager) GetCurrentGEMPortIDsForOnu(ctx context.Context, IntfONUID string) []uint32 {
/*
Get currently configured gemport ids for given pon_intf_onu_id
:param pon_intf_onu_id: reference of PON interface id and onu id
@@ -882,7 +883,7 @@
Path := fmt.Sprintf(GEMPORT_ID_RESOURCE_MAP_PATH, PONRMgr.DeviceID, IntfONUID)
log.Debugf("Getting current gemports for %s", Path)
var Data []uint32
- Value, err := PONRMgr.KVStore.Get(Path)
+ Value, err := PONRMgr.KVStore.Get(ctx, Path)
if err == nil {
if Value != nil {
Val, _ := ToByte(Value.Value)
@@ -897,7 +898,7 @@
return Data
}
-func (PONRMgr *PONResourceManager) GetCurrentFlowIDsForOnu(IntfONUID string) []uint32 {
+func (PONRMgr *PONResourceManager) GetCurrentFlowIDsForOnu(ctx context.Context, IntfONUID string) []uint32 {
/*
Get currently configured flow ids for given pon_intf_onu_id
:param pon_intf_onu_id: reference of PON interface id and onu id
@@ -907,7 +908,7 @@
Path := fmt.Sprintf(FLOW_ID_RESOURCE_MAP_PATH, PONRMgr.DeviceID, IntfONUID)
var Data []uint32
- Value, err := PONRMgr.KVStore.Get(Path)
+ Value, err := PONRMgr.KVStore.Get(ctx, Path)
if err == nil {
if Value != nil {
Val, _ := ToByte(Value.Value)
@@ -920,7 +921,7 @@
return Data
}
-func (PONRMgr *PONResourceManager) GetFlowIDInfo(IntfONUID string, FlowID uint32, Data interface{}) error {
+func (PONRMgr *PONResourceManager) GetFlowIDInfo(ctx context.Context, IntfONUID string, FlowID uint32, Data interface{}) error {
/*
Get flow details configured for the ONU.
:param pon_intf_onu_id: reference of PON interface id and onu id
@@ -931,7 +932,7 @@
Path := fmt.Sprintf(FLOW_ID_INFO_PATH, PONRMgr.DeviceID, IntfONUID, FlowID)
- Value, err := PONRMgr.KVStore.Get(Path)
+ Value, err := PONRMgr.KVStore.Get(ctx, Path)
if err == nil {
if Value != nil {
Val, err := ToByte(Value.Value)
@@ -948,7 +949,7 @@
return err
}
-func (PONRMgr *PONResourceManager) RemoveFlowIDInfo(IntfONUID string, FlowID uint32) bool {
+func (PONRMgr *PONResourceManager) RemoveFlowIDInfo(ctx context.Context, IntfONUID string, FlowID uint32) bool {
/*
Get flow_id details configured for the ONU.
:param pon_intf_onu_id: reference of PON interface id and onu id
@@ -956,14 +957,14 @@
*/
Path := fmt.Sprintf(FLOW_ID_INFO_PATH, PONRMgr.DeviceID, IntfONUID, FlowID)
- if err := PONRMgr.KVStore.Delete(Path); err != nil {
+ if err := PONRMgr.KVStore.Delete(ctx, Path); err != nil {
log.Errorf("Falied to remove resource %s", Path)
return false
}
return true
}
-func (PONRMgr *PONResourceManager) UpdateAllocIdsForOnu(IntfONUID string, AllocIDs []uint32) error {
+func (PONRMgr *PONResourceManager) UpdateAllocIdsForOnu(ctx context.Context, IntfONUID string, AllocIDs []uint32) error {
/*
Update currently configured alloc ids for given pon_intf_onu_id
:param pon_intf_onu_id: reference of PON interface id and onu id
@@ -978,14 +979,14 @@
return err
}
- if err = PONRMgr.KVStore.Put(Path, Value); err != nil {
+ if err = PONRMgr.KVStore.Put(ctx, Path, Value); err != nil {
log.Errorf("Failed to update resource %s", Path)
return err
}
return err
}
-func (PONRMgr *PONResourceManager) UpdateGEMPortIDsForOnu(IntfONUID string, GEMPortIDs []uint32) error {
+func (PONRMgr *PONResourceManager) UpdateGEMPortIDsForOnu(ctx context.Context, IntfONUID string, GEMPortIDs []uint32) error {
/*
Update currently configured gemport ids for given pon_intf_onu_id
:param pon_intf_onu_id: reference of PON interface id and onu id
@@ -1002,7 +1003,7 @@
return err
}
- if err = PONRMgr.KVStore.Put(Path, Value); err != nil {
+ if err = PONRMgr.KVStore.Put(ctx, Path, Value); err != nil {
log.Errorf("Failed to update resource %s", Path)
return err
}
@@ -1025,7 +1026,7 @@
return false, 0
}
-func (PONRMgr *PONResourceManager) UpdateFlowIDForOnu(IntfONUID string, FlowID uint32, Add bool) error {
+func (PONRMgr *PONResourceManager) UpdateFlowIDForOnu(ctx context.Context, IntfONUID string, FlowID uint32, Add bool) error {
/*
Update the flow_id list of the ONU (add or remove flow_id from the list)
:param pon_intf_onu_id: reference of PON interface id and onu id
@@ -1038,7 +1039,7 @@
var RetVal bool
var IDx uint32
Path := fmt.Sprintf(FLOW_ID_RESOURCE_MAP_PATH, PONRMgr.DeviceID, IntfONUID)
- FlowIDs := PONRMgr.GetCurrentFlowIDsForOnu(IntfONUID)
+ FlowIDs := PONRMgr.GetCurrentFlowIDsForOnu(ctx, IntfONUID)
if Add {
if RetVal, IDx = checkForFlowIDInList(FlowIDs, FlowID); RetVal == true {
@@ -1058,14 +1059,14 @@
return err
}
- if err = PONRMgr.KVStore.Put(Path, Value); err != nil {
+ if err = PONRMgr.KVStore.Put(ctx, Path, Value); err != nil {
log.Errorf("Failed to update resource %s", Path)
return err
}
return err
}
-func (PONRMgr *PONResourceManager) UpdateFlowIDInfoForOnu(IntfONUID string, FlowID uint32, FlowData interface{}) error {
+func (PONRMgr *PONResourceManager) UpdateFlowIDInfoForOnu(ctx context.Context, IntfONUID string, FlowID uint32, FlowData interface{}) error {
/*
Update any metadata associated with the flow_id. The flow_data could be json
or any of other data structure. The resource manager doesnt care
@@ -1082,7 +1083,7 @@
return err
}
- if err = PONRMgr.KVStore.Put(Path, Value); err != nil {
+ if err = PONRMgr.KVStore.Put(ctx, Path, Value); err != nil {
log.Errorf("Failed to update resource %s", Path)
return err
}
@@ -1183,7 +1184,7 @@
}
}
-func (PONRMgr *PONResourceManager) AddOnuGemInfo(intfID uint32, onuGemData interface{}) error {
+func (PONRMgr *PONResourceManager) AddOnuGemInfo(ctx context.Context, intfID uint32, onuGemData interface{}) error {
/*
Update onugem info map,
:param pon_intf_id: reference of PON interface id
@@ -1198,14 +1199,14 @@
return err
}
- if err = PONRMgr.KVStore.Put(Path, Value); err != nil {
+ if err = PONRMgr.KVStore.Put(ctx, Path, Value); err != nil {
log.Errorf("Failed to update resource %s", Path)
return err
}
return err
}
-func (PONRMgr *PONResourceManager) GetOnuGemInfo(IntfId uint32, onuGemInfo interface{}) error {
+func (PONRMgr *PONResourceManager) GetOnuGemInfo(ctx context.Context, IntfId uint32, onuGemInfo interface{}) error {
/*
Get onugeminfo map from kvstore
:param intfid: refremce pon intfid
@@ -1214,7 +1215,7 @@
var Val []byte
path := fmt.Sprintf(ONU_GEM_INFO_PATH, PONRMgr.DeviceID, IntfId)
- value, err := PONRMgr.KVStore.Get(path)
+ value, err := PONRMgr.KVStore.Get(ctx, path)
if err != nil {
log.Errorw("Failed to get from kv store", log.Fields{"path": path})
return err
@@ -1235,14 +1236,14 @@
return err
}
-func (PONRMgr *PONResourceManager) DelOnuGemInfoForIntf(intfId uint32) error {
+func (PONRMgr *PONResourceManager) DelOnuGemInfoForIntf(ctx context.Context, intfId uint32) error {
/*
delete onugem info for an interface from kvstore
:param intfid: refremce pon intfid
*/
path := fmt.Sprintf(ONU_GEM_INFO_PATH, PONRMgr.DeviceID, intfId)
- if err := PONRMgr.KVStore.Delete(path); err != nil {
+ if err := PONRMgr.KVStore.Delete(ctx, path); err != nil {
log.Errorf("Falied to remove resource %s", path)
return err
}
diff --git a/pkg/probe/probe_test.go b/pkg/probe/probe_test.go
index 37e8013..9edc561 100644
--- a/pkg/probe/probe_test.go
+++ b/pkg/probe/probe_test.go
@@ -18,12 +18,14 @@
import (
"context"
"encoding/json"
- "github.com/opencord/voltha-lib-go/v3/pkg/log"
- "github.com/stretchr/testify/assert"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"
+ "time"
+
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ "github.com/stretchr/testify/assert"
)
func init() {
@@ -343,7 +345,8 @@
}
func TestGetProbeFromContextMssing(t *testing.T) {
- ctx := context.Background()
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
pc := GetProbeFromContext(ctx)
assert.Nil(t, pc, "Context had a non-nil probe when it should have been nil")
}
@@ -375,7 +378,8 @@
func TestUpdateStatusFromContextWithoutProbe(t *testing.T) {
p := &Probe{}
p.RegisterService("one")
- ctx := context.Background()
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
UpdateStatusFromContext(ctx, "one", ServiceStatusRunning)
assert.Equal(t, 1, len(p.status), "wrong number of services")
diff --git a/pkg/techprofile/tech_profile.go b/pkg/techprofile/tech_profile.go
index 0358291..b268a4c 100644
--- a/pkg/techprofile/tech_profile.go
+++ b/pkg/techprofile/tech_profile.go
@@ -17,6 +17,7 @@
package techprofile
import (
+ "context"
"encoding/json"
"errors"
"fmt"
@@ -32,7 +33,7 @@
// Interface to pon resource manager APIs
type iPonResourceMgr interface {
- GetResourceID(IntfID uint32, ResourceType string, NumIDs uint32) ([]uint32, error)
+ GetResourceID(ctx context.Context, IntfID uint32, ResourceType string, NumIDs uint32) ([]uint32, error)
GetResourceTypeAllocID() string
GetResourceTypeGemPortID() string
GetTechnology() string
@@ -298,13 +299,13 @@
return fmt.Sprintf(t.config.TPInstanceKVPath, t.resourceMgr.GetTechnology(), techProfiletblID, uniPortName)
}
-func (t *TechProfileMgr) GetTPInstanceFromKVStore(techProfiletblID uint32, path string) (*TechProfile, error) {
+func (t *TechProfileMgr) GetTPInstanceFromKVStore(ctx context.Context, techProfiletblID uint32, path string) (*TechProfile, error) {
var KvTpIns TechProfile
var resPtr *TechProfile = &KvTpIns
var err error
var kvResult *kvstore.KVPair
- kvResult, _ = t.config.KVBackend.Get(path)
+ kvResult, _ = t.config.KVBackend.Get(ctx, path)
if kvResult == nil {
log.Infow("tp-instance-not-found-on-kv", log.Fields{"key": path})
return nil, nil
@@ -321,24 +322,24 @@
return nil, err
}
-func (t *TechProfileMgr) addTechProfInstanceToKVStore(techProfiletblID uint32, uniPortName string, tpInstance *TechProfile) error {
+func (t *TechProfileMgr) addTechProfInstanceToKVStore(ctx context.Context, techProfiletblID uint32, uniPortName string, tpInstance *TechProfile) error {
path := t.GetTechProfileInstanceKVPath(techProfiletblID, uniPortName)
log.Debugw("Adding techprof instance to kvstore", log.Fields{"key": path, "tpinstance": tpInstance})
tpInstanceJson, err := json.Marshal(*tpInstance)
if err == nil {
// Backend will convert JSON byte array into string format
log.Debugw("Storing tech profile instance to KV Store", log.Fields{"key": path, "val": tpInstanceJson})
- err = t.config.KVBackend.Put(path, tpInstanceJson)
+ err = t.config.KVBackend.Put(ctx, path, tpInstanceJson)
} else {
log.Errorw("Error in marshaling into Json format", log.Fields{"key": path, "tpinstance": tpInstance})
}
return err
}
-func (t *TechProfileMgr) getTPFromKVStore(techProfiletblID uint32) *DefaultTechProfile {
+func (t *TechProfileMgr) getTPFromKVStore(ctx context.Context, techProfiletblID uint32) *DefaultTechProfile {
var kvtechprofile DefaultTechProfile
key := fmt.Sprintf(t.config.TPFileKVPath, t.resourceMgr.GetTechnology(), techProfiletblID)
log.Debugw("Getting techprofile from KV store", log.Fields{"techProfiletblID": techProfiletblID, "Key": key})
- kvresult, err := t.config.KVBackend.Get(key)
+ kvresult, err := t.config.KVBackend.Get(ctx, key)
if err != nil {
log.Errorw("Error while fetching value from KV store", log.Fields{"key": key})
return nil
@@ -358,7 +359,7 @@
return nil
}
-func (t *TechProfileMgr) CreateTechProfInstance(techProfiletblID uint32, uniPortName string, intfId uint32) (*TechProfile, error) {
+func (t *TechProfileMgr) CreateTechProfInstance(ctx context.Context, techProfiletblID uint32, uniPortName string, intfId uint32) (*TechProfile, error) {
var tpInstance *TechProfile
log.Infow("creating-tp-instance", log.Fields{"tableid": techProfiletblID, "uni": uniPortName, "intId": intfId})
@@ -368,7 +369,7 @@
return nil, errors.New("uni-port-name-not-confirming-to-format")
}
- tp := t.getTPFromKVStore(techProfiletblID)
+ tp := t.getTPFromKVStore(ctx, techProfiletblID)
if tp != nil {
if err := t.validateInstanceControlAttr(tp.InstanceCtrl); err != nil {
log.Error("invalid-instance-ctrl-attr--using-default-tp")
@@ -381,11 +382,11 @@
tp = t.getDefaultTechProfile()
}
tpInstancePath := t.GetTechProfileInstanceKVPath(techProfiletblID, uniPortName)
- if tpInstance = t.allocateTPInstance(uniPortName, tp, intfId, tpInstancePath); tpInstance == nil {
+ if tpInstance = t.allocateTPInstance(ctx, uniPortName, tp, intfId, tpInstancePath); tpInstance == nil {
log.Error("tp-intance-allocation-failed")
return nil, errors.New("tp-intance-allocation-failed")
}
- if err := t.addTechProfInstanceToKVStore(techProfiletblID, uniPortName, tpInstance); err != nil {
+ if err := t.addTechProfInstanceToKVStore(ctx, techProfiletblID, uniPortName, tpInstance); err != nil {
log.Errorw("error-adding-tp-to-kv-store", log.Fields{"tableid": techProfiletblID, "uni": uniPortName})
return nil, errors.New("error-adding-tp-to-kv-store")
}
@@ -394,9 +395,9 @@
return tpInstance, nil
}
-func (t *TechProfileMgr) DeleteTechProfileInstance(techProfiletblID uint32, uniPortName string) error {
+func (t *TechProfileMgr) DeleteTechProfileInstance(ctx context.Context, techProfiletblID uint32, uniPortName string) error {
path := t.GetTechProfileInstanceKVPath(techProfiletblID, uniPortName)
- return t.config.KVBackend.Delete(path)
+ return t.config.KVBackend.Delete(ctx, path)
}
func (t *TechProfileMgr) validateInstanceControlAttr(instCtl InstanceControl) error {
@@ -418,7 +419,7 @@
return nil
}
-func (t *TechProfileMgr) allocateTPInstance(uniPortName string, tp *DefaultTechProfile, intfId uint32, tpInstPath string) *TechProfile {
+func (t *TechProfileMgr) allocateTPInstance(ctx context.Context, uniPortName string, tp *DefaultTechProfile, intfId uint32, tpInstPath string) *TechProfile {
var usGemPortAttributeList []iGemPortAttribute
var dsGemPortAttributeList []iGemPortAttribute
@@ -431,16 +432,16 @@
log.Infow("Allocating TechProfileMgr instance from techprofile template", log.Fields{"uniPortName": uniPortName, "intfId": intfId, "numGem": tp.NumGemPorts})
if tp.InstanceCtrl.Onu == "multi-instance" {
- if tcontIDs, err = t.resourceMgr.GetResourceID(intfId, t.resourceMgr.GetResourceTypeAllocID(), 1); err != nil {
+ if tcontIDs, err = t.resourceMgr.GetResourceID(ctx, intfId, t.resourceMgr.GetResourceTypeAllocID(), 1); err != nil {
log.Errorw("Error getting alloc id from rsrcrMgr", log.Fields{"intfId": intfId})
return nil
}
} else { // "single-instance"
- tpInst, err := t.getSingleInstanceTp(tpInstPath)
+ tpInst, err := t.getSingleInstanceTp(ctx, tpInstPath)
if tpInst == nil {
// No "single-instance" tp found on one any uni port for the given TP ID
// Allocate a new TcontID or AllocID
- if tcontIDs, err = t.resourceMgr.GetResourceID(intfId, t.resourceMgr.GetResourceTypeAllocID(), 1); err != nil {
+ if tcontIDs, err = t.resourceMgr.GetResourceID(ctx, intfId, t.resourceMgr.GetResourceTypeAllocID(), 1); err != nil {
log.Errorw("Error getting alloc id from rsrcrMgr", log.Fields{"intfId": intfId})
return nil
}
@@ -450,7 +451,7 @@
}
}
log.Debugw("Num GEM ports in TP:", log.Fields{"NumGemPorts": tp.NumGemPorts})
- if gemPorts, err = t.resourceMgr.GetResourceID(intfId, t.resourceMgr.GetResourceTypeGemPortID(), tp.NumGemPorts); err != nil {
+ if gemPorts, err = t.resourceMgr.GetResourceID(ctx, intfId, t.resourceMgr.GetResourceTypeGemPortID(), tp.NumGemPorts); err != nil {
log.Errorw("Error getting gemport ids from rsrcrMgr", log.Fields{"intfId": intfId, "numGemports": tp.NumGemPorts})
return nil
}
@@ -544,14 +545,14 @@
// getSingleInstanceTp returns another TpInstance for an ONU on a different
// uni port for the same TP ID, if it finds one, else nil.
-func (t *TechProfileMgr) getSingleInstanceTp(tpPath string) (*TechProfile, error) {
+func (t *TechProfileMgr) getSingleInstanceTp(ctx context.Context, tpPath string) (*TechProfile, error) {
var tpInst TechProfile
// For example:
// tpPath like "service/voltha/technology_profiles/xgspon/64/pon-{0}/onu-{1}/uni-{1}"
// is broken into ["service/voltha/technology_profiles/xgspon/64/pon-{0}/onu-{1}" ""]
uniPathSlice := regexp.MustCompile(`/uni-{[0-9]+}$`).Split(tpPath, 2)
- kvPairs, _ := t.config.KVBackend.List(uniPathSlice[0])
+ kvPairs, _ := t.config.KVBackend.List(ctx, uniPathSlice[0])
// Find a valid TP Instance among all the UNIs of that ONU for the given TP ID
for keyPath, kvPair := range kvPairs {
@@ -899,11 +900,11 @@
}
// FindAllTpInstances returns all TechProfile instances for a given TechProfile table-id, pon interface ID and onu ID.
-func (t *TechProfileMgr) FindAllTpInstances(techProfiletblID uint32, ponIntf uint32, onuID uint32) []TechProfile {
+func (t *TechProfileMgr) FindAllTpInstances(ctx context.Context, techProfiletblID uint32, ponIntf uint32, onuID uint32) []TechProfile {
var tp TechProfile
onuTpInstancePath := fmt.Sprintf("%s/%d/pon-{%d}/onu-{%d}", t.resourceMgr.GetTechnology(), techProfiletblID, ponIntf, onuID)
- if kvPairs, _ := t.config.KVBackend.List(onuTpInstancePath); kvPairs != nil {
+ if kvPairs, _ := t.config.KVBackend.List(ctx, onuTpInstancePath); kvPairs != nil {
tpInstances := make([]TechProfile, 0, len(kvPairs))
for kvPath, kvPair := range kvPairs {
if value, err := kvstore.ToByte(kvPair.Value); err == nil {
diff --git a/pkg/techprofile/tech_profile_if.go b/pkg/techprofile/tech_profile_if.go
index 9184b5b..e605d49 100644
--- a/pkg/techprofile/tech_profile_if.go
+++ b/pkg/techprofile/tech_profile_if.go
@@ -17,6 +17,8 @@
package techprofile
import (
+ "context"
+
"github.com/opencord/voltha-lib-go/v3/pkg/db"
tp_pb "github.com/opencord/voltha-protos/v3/go/tech_profile"
)
@@ -24,9 +26,9 @@
type TechProfileIf interface {
SetKVClient() *db.Backend
GetTechProfileInstanceKVPath(techProfiletblID uint32, uniPortName string) string
- GetTPInstanceFromKVStore(techProfiletblID uint32, path string) (*TechProfile, error)
- CreateTechProfInstance(techProfiletblID uint32, uniPortName string, intfId uint32) (*TechProfile, error)
- DeleteTechProfileInstance(techProfiletblID uint32, uniPortName string) error
+ GetTPInstanceFromKVStore(ctx context.Context, techProfiletblID uint32, path string) (*TechProfile, error)
+ CreateTechProfInstance(ctx context.Context, techProfiletblID uint32, uniPortName string, intfId uint32) (*TechProfile, error)
+ DeleteTechProfileInstance(ctx context.Context, techProfiletblID uint32, uniPortName string) error
GetprotoBufParamValue(paramType string, paramKey string) int32
GetUsScheduler(tpInstance *TechProfile) (*tp_pb.SchedulerConfig, error)
GetDsScheduler(tpInstance *TechProfile) (*tp_pb.SchedulerConfig, error)
@@ -35,5 +37,5 @@
GetTrafficQueues(tp *TechProfile, Dir tp_pb.Direction) ([]*tp_pb.TrafficQueue, error)
GetMulticastTrafficQueues(tp *TechProfile) []*tp_pb.TrafficQueue
GetGemportIDForPbit(tp *TechProfile, Dir tp_pb.Direction, pbit uint32) uint32
- FindAllTpInstances(techProfiletblID uint32, ponIntf uint32, onuID uint32) []TechProfile
+ FindAllTpInstances(ctx context.Context, techProfiletblID uint32, ponIntf uint32, onuID uint32) []TechProfile
}