VOL-4154: Changes to techprofile module for etcd storage improvements.
- use protobuf definitions of the techprofile template and instance
- store a smaller-footprint resource instance in the kv store
- store the techprofile instance in a cache
- reconcile the techprofile instance from the resource instance on adapter restart
- retry etcd get/put/delete on failure
- remove handling of onu-gem-info data from the PONResourceManager module,
as the adapter has to deal with it.
Change-Id: I741181e3f0dc5c4a419ffbed577eb4d21b73c4d6
diff --git a/pkg/db/backend.go b/pkg/db/backend.go
index bf30a48..ff0b5b7 100644
--- a/pkg/db/backend.go
+++ b/pkg/db/backend.go
@@ -23,8 +23,8 @@
"sync"
"time"
- "github.com/opencord/voltha-lib-go/v4/pkg/db/kvstore"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/db/kvstore"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
diff --git a/pkg/db/backend_test.go b/pkg/db/backend_test.go
index a5659e4..7f1d878 100644
--- a/pkg/db/backend_test.go
+++ b/pkg/db/backend_test.go
@@ -23,7 +23,7 @@
"testing"
"time"
- mocks "github.com/opencord/voltha-lib-go/v4/pkg/mocks/etcd"
+ mocks "github.com/opencord/voltha-lib-go/v5/pkg/mocks/etcd"
"github.com/phayes/freeport"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"
diff --git a/pkg/db/common.go b/pkg/db/common.go
index 25cddf5..4bc92b1 100644
--- a/pkg/db/common.go
+++ b/pkg/db/common.go
@@ -16,7 +16,7 @@
package db
import (
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
)
var logger log.CLogger
diff --git a/pkg/db/kvstore/common.go b/pkg/db/kvstore/common.go
index 99c603d..b8509db 100644
--- a/pkg/db/kvstore/common.go
+++ b/pkg/db/kvstore/common.go
@@ -16,7 +16,7 @@
package kvstore
import (
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
)
var logger log.CLogger
diff --git a/pkg/db/kvstore/etcdclient.go b/pkg/db/kvstore/etcdclient.go
index 98f0559..c2a38c6 100644
--- a/pkg/db/kvstore/etcdclient.go
+++ b/pkg/db/kvstore/etcdclient.go
@@ -22,7 +22,7 @@
"sync"
"time"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
v3Client "go.etcd.io/etcd/clientv3"
v3Concurrency "go.etcd.io/etcd/clientv3/concurrency"
@@ -99,17 +99,43 @@
// wait for a response
func (c *EtcdClient) Get(ctx context.Context, key string) (*KVPair, error) {
- resp, err := c.ectdAPI.Get(ctx, key)
+	// Retry the read with backoff while etcd reports a transient server-side
+	// condition (leader change, no leader, request timeout). Any other error is
+	// logged and returned immediately; backoff giving up (ctx done) returns the
+	// last etcd error.
+	attempt := 0
+	for {
+		resp, err := c.ectdAPI.Get(ctx, key)
+		if err != nil {
+			switch err {
+			case context.Canceled:
+				logger.Warnw(ctx, "context-cancelled", log.Fields{"error": err})
+			case context.DeadlineExceeded:
+				logger.Warnw(ctx, "context-deadline-exceeded", log.Fields{"error": err, "context": ctx})
+			case v3rpcTypes.ErrEmptyKey:
+				logger.Warnw(ctx, "etcd-client-error", log.Fields{"error": err})
+			case v3rpcTypes.ErrLeaderChanged,
+				v3rpcTypes.ErrNoLeader,
+				v3rpcTypes.ErrGRPCNoLeader,
+				v3rpcTypes.ErrTimeout,
+				v3rpcTypes.ErrTimeoutDueToLeaderFail,
+				v3rpcTypes.ErrTimeoutDueToConnectionLost:
+				// Retry for these server errors. clientv3 maps gRPC errors to
+				// rpctypes.EtcdError values, so "no leader" surfaces as ErrNoLeader;
+				// ErrGRPCNoLeader is kept for backward compatibility.
+				attempt++
+				if er := backoff(ctx, attempt); er != nil {
+					logger.Warnw(ctx, "get-retries-failed", log.Fields{"key": key, "error": er, "attempt": attempt})
+					return nil, err
+				}
+				logger.Warnw(ctx, "retrying-get", log.Fields{"key": key, "error": err, "attempt": attempt})
+				// continue inside a switch restarts the enclosing for loop.
+				continue
+			default:
+				logger.Warnw(ctx, "etcd-server-error", log.Fields{"error": err})
+			}
+			return nil, err
+		}
- if err != nil {
- logger.Error(ctx, err)
- return nil, err
+		for _, ev := range resp.Kvs {
+			// Only one value is returned
+			return NewKVPair(string(ev.Key), ev.Value, "", ev.Lease, ev.Version), nil
+		}
+		// Key not present.
+		return nil, nil
}
- for _, ev := range resp.Kvs {
- // Only one value is returned
- return NewKVPair(string(ev.Key), ev.Value, "", ev.Lease, ev.Version), nil
- }
- return nil, nil
}
// Put writes a key-value pair to the KV store. Value can only be a string or []byte since the etcd API
@@ -124,45 +150,87 @@
return fmt.Errorf("unexpected-type-%T", value)
}
- var err error
// Check if there is already a lease for this key - if there is then use it, otherwise a PUT will make
// that KV key permanent instead of automatically removing it after a lease expiration
c.keyReservationsLock.RLock()
leaseID, ok := c.keyReservations[key]
c.keyReservationsLock.RUnlock()
- if ok {
- _, err = c.ectdAPI.Put(ctx, key, val, v3Client.WithLease(*leaseID))
- } else {
- _, err = c.ectdAPI.Put(ctx, key, val)
- }
-
- if err != nil {
- switch err {
- case context.Canceled:
- logger.Warnw(ctx, "context-cancelled", log.Fields{"error": err})
- case context.DeadlineExceeded:
- logger.Warnw(ctx, "context-deadline-exceeded", log.Fields{"error": err})
- case v3rpcTypes.ErrEmptyKey:
- logger.Warnw(ctx, "etcd-client-error", log.Fields{"error": err})
- default:
- logger.Warnw(ctx, "bad-endpoints", log.Fields{"error": err})
+	// Retry the write with backoff while etcd reports a transient server-side
+	// condition (leader change, no leader, request timeout). The lease looked up
+	// above is reused on every attempt. Any other error is logged and returned.
+	attempt := 0
+	for {
+		var err error
+		if ok {
+			_, err = c.ectdAPI.Put(ctx, key, val, v3Client.WithLease(*leaseID))
+		} else {
+			_, err = c.ectdAPI.Put(ctx, key, val)
}
- return err
+		if err != nil {
+			switch err {
+			case context.Canceled:
+				logger.Warnw(ctx, "context-cancelled", log.Fields{"error": err})
+			case context.DeadlineExceeded:
+				logger.Warnw(ctx, "context-deadline-exceeded", log.Fields{"error": err, "context": ctx})
+			case v3rpcTypes.ErrEmptyKey:
+				logger.Warnw(ctx, "etcd-client-error", log.Fields{"error": err})
+			case v3rpcTypes.ErrLeaderChanged,
+				v3rpcTypes.ErrNoLeader,
+				v3rpcTypes.ErrGRPCNoLeader,
+				v3rpcTypes.ErrTimeout,
+				v3rpcTypes.ErrTimeoutDueToLeaderFail,
+				v3rpcTypes.ErrTimeoutDueToConnectionLost:
+				// Retry for these server errors. clientv3 maps gRPC errors to
+				// rpctypes.EtcdError values, so "no leader" surfaces as ErrNoLeader;
+				// ErrGRPCNoLeader is kept for backward compatibility.
+				attempt++
+				if er := backoff(ctx, attempt); er != nil {
+					logger.Warnw(ctx, "put-retries-failed", log.Fields{"key": key, "error": er, "attempt": attempt})
+					return err
+				}
+				logger.Warnw(ctx, "retrying-put", log.Fields{"key": key, "error": err, "attempt": attempt})
+				// continue inside a switch restarts the enclosing for loop.
+				continue
+			default:
+				logger.Warnw(ctx, "etcd-server-error", log.Fields{"error": err})
+			}
+			return err
+		}
+		return nil
}
- return nil
}
// Delete removes a key from the KV store. Timeout defines how long the function will
// wait for a response
func (c *EtcdClient) Delete(ctx context.Context, key string) error {
- // delete the key
- if _, err := c.ectdAPI.Delete(ctx, key); err != nil {
- logger.Errorw(ctx, "failed-to-delete-key", log.Fields{"key": key, "error": err})
- return err
+	// Retry the delete with backoff while etcd reports a transient server-side
+	// condition (leader change, no leader, request timeout). Any other error is
+	// logged and returned; backoff giving up returns the last etcd error.
+	attempt := 0
+	for {
+		_, err := c.ectdAPI.Delete(ctx, key)
+		if err != nil {
+			switch err {
+			case context.Canceled:
+				logger.Warnw(ctx, "context-cancelled", log.Fields{"error": err})
+			case context.DeadlineExceeded:
+				logger.Warnw(ctx, "context-deadline-exceeded", log.Fields{"error": err, "context": ctx})
+			case v3rpcTypes.ErrEmptyKey:
+				logger.Warnw(ctx, "etcd-client-error", log.Fields{"error": err})
+			case v3rpcTypes.ErrLeaderChanged,
+				v3rpcTypes.ErrNoLeader,
+				v3rpcTypes.ErrGRPCNoLeader,
+				v3rpcTypes.ErrTimeout,
+				v3rpcTypes.ErrTimeoutDueToLeaderFail,
+				v3rpcTypes.ErrTimeoutDueToConnectionLost:
+				// Retry for these server errors. clientv3 maps gRPC errors to
+				// rpctypes.EtcdError values, so "no leader" surfaces as ErrNoLeader;
+				// ErrGRPCNoLeader is kept for backward compatibility.
+				attempt++
+				if er := backoff(ctx, attempt); er != nil {
+					logger.Warnw(ctx, "delete-retries-failed", log.Fields{"key": key, "error": er, "attempt": attempt})
+					return err
+				}
+				logger.Warnw(ctx, "retrying-delete", log.Fields{"key": key, "error": err, "attempt": attempt})
+				// continue inside a switch restarts the enclosing for loop.
+				continue
+			default:
+				logger.Warnw(ctx, "etcd-server-error", log.Fields{"error": err})
+			}
+			return err
+		}
+		logger.Debugw(ctx, "key(s)-deleted", log.Fields{"key": key})
+		return nil
}
- logger.Debugw(ctx, "key(s)-deleted", log.Fields{"key": key})
- return nil
}
func (c *EtcdClient) DeleteWithPrefix(ctx context.Context, prefixKey string) error {
diff --git a/pkg/db/kvstore/kvutils.go b/pkg/db/kvstore/kvutils.go
index 70bd977..946dbf2 100644
--- a/pkg/db/kvstore/kvutils.go
+++ b/pkg/db/kvstore/kvutils.go
@@ -17,7 +17,18 @@
import (
"bytes"
+ "context"
"fmt"
+ "math"
+ "math/rand"
+ "time"
+)
+
+// Tuning parameters for backoff; the interval values are milliseconds.
+const (
+ // minRetryInterval is the base delay added to every computed wait.
+ minRetryInterval = 100
+ // maxRetryInterval is the upper bound on a single computed wait.
+ maxRetryInterval = 5000
+ // incrementalFactor scales the e^attempt growth term of the wait.
+ incrementalFactor = 1.2
+ // jitter is the fractional randomization range (±jitter) used by backoff.
+ jitter = 0.2
)
// ToString converts an interface value to a string. The interface should either be of
@@ -56,3 +67,24 @@
}
return val1 == val2
}
+
+// backoff blocks for a delay that grows exponentially with attempt and then
+// returns nil, or returns ctx.Err() if ctx is done first. attempt == 0 returns
+// nil immediately. The delay, in milliseconds, is
+//
+//	minRetryInterval + incrementalFactor*e^attempt*(1 ± jitter)
+//
+// capped at maxRetryInterval. Jitter is applied to the growth term only, so
+// the delay never drops below minRetryInterval.
+func backoff(ctx context.Context, attempt int) error {
+	if attempt == 0 {
+		return nil
+	}
+	// Stay in float64 until after clamping: math.Exp exceeds the int64 range
+	// (eventually +Inf) for large attempts, and converting an out-of-range float
+	// to int is implementation-dependent (e.g. a negative value on amd64), which
+	// would bypass the cap and hand a non-positive duration to the timer.
+	// The jitter factor must also stay fractional; truncating it to int yields 0.
+	growth := incrementalFactor * math.Exp(float64(attempt))
+	growth *= 1 + jitter*(rand.Float64()*2-1)
+	wait := math.Min(minRetryInterval+growth, maxRetryInterval)
+	timer := time.NewTimer(time.Duration(wait * float64(time.Millisecond)))
+	defer timer.Stop()
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-timer.C:
+		return nil
+	}
+}
diff --git a/pkg/db/kvstore/kvutils_test.go b/pkg/db/kvstore/kvutils_test.go
index 98c96c9..5c2ef8c 100644
--- a/pkg/db/kvstore/kvutils_test.go
+++ b/pkg/db/kvstore/kvutils_test.go
@@ -16,8 +16,10 @@
package kvstore
import (
+ "context"
"github.com/stretchr/testify/assert"
"testing"
+ "time"
)
func TestToStringWithString(t *testing.T) {
@@ -58,3 +60,32 @@
assert.Equal(t, expectedResult, actualResult)
assert.NotEqual(t, error, nil)
}
+
+// TestBackoffNoWait checks that attempt 0 returns nil straight away, even with
+// a context whose deadline is only a millisecond away.
+func TestBackoffNoWait(t *testing.T) {
+	shortCtx, stop := context.WithTimeout(context.Background(), time.Millisecond)
+	defer stop()
+	assert.Nil(t, backoff(shortCtx, 0))
+}
+
+// TestBackoffSuccess checks that the first few attempts complete within the
+// context deadline, each waits at least minRetryInterval, and the delay grows
+// with the attempt number.
+func TestBackoffSuccess(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond)
+	defer cancel()
+	var first, last time.Duration
+	for i := 1; i < 5; i++ {
+		start := time.Now()
+		err := backoff(ctx, i)
+		assert.Nil(t, err)
+		elapsed := time.Since(start)
+		assert.GreaterOrEqual(t, elapsed.Milliseconds(), int64(minRetryInterval))
+		if i == 1 {
+			first = elapsed
+		}
+		last = elapsed
+	}
+	// Adjacent attempts differ by only a few milliseconds, which scheduler and
+	// timer latency can swamp; comparing the first and last waits is robust.
+	assert.Greater(t, last.Milliseconds(), first.Milliseconds())
+}
+
+// TestBackoffContextTimeout checks that backoff gives up with the context's
+// error when the context expires before the retry delay for attempt 10 ends.
+func TestBackoffContextTimeout(t *testing.T) {
+	deadlineCtx, stop := context.WithTimeout(context.Background(), time.Second)
+	defer stop()
+	got := backoff(deadlineCtx, 10)
+	assert.NotNil(t, got)
+	assert.Equal(t, context.DeadlineExceeded, got)
+}