VOL-4154: Changes to techprofile module for etcd storage improvements.
- using protobuf definitions of techprofile template and instance
- store a smaller-footprint resource instance in the kv store
- store techprofile instance in cache
- reconcile techprofile instance from resource instance on adapter restart
- retry etcd get/put/delete on failure
- remove handling of onu-gem-info data from the PONResourceManager module,
as the adapter has to deal with this.
Change-Id: I741181e3f0dc5c4a419ffbed577eb4d21b73c4d6
diff --git a/pkg/db/kvstore/etcdclient.go b/pkg/db/kvstore/etcdclient.go
index 98f0559..c2a38c6 100644
--- a/pkg/db/kvstore/etcdclient.go
+++ b/pkg/db/kvstore/etcdclient.go
@@ -22,7 +22,7 @@
"sync"
"time"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
v3Client "go.etcd.io/etcd/clientv3"
v3Concurrency "go.etcd.io/etcd/clientv3/concurrency"
@@ -99,17 +99,43 @@
// wait for a response
func (c *EtcdClient) Get(ctx context.Context, key string) (*KVPair, error) {
- resp, err := c.ectdAPI.Get(ctx, key)
+ attempt := 0
+startLoop:
+ for {
+ resp, err := c.ectdAPI.Get(ctx, key)
+ if err != nil {
+ switch err {
+ case context.Canceled:
+ logger.Warnw(ctx, "context-cancelled", log.Fields{"error": err})
+ case context.DeadlineExceeded:
+ logger.Warnw(ctx, "context-deadline-exceeded", log.Fields{"error": err, "context": ctx})
+ case v3rpcTypes.ErrEmptyKey:
+ logger.Warnw(ctx, "etcd-client-error", log.Fields{"error": err})
+ case v3rpcTypes.ErrLeaderChanged,
+ v3rpcTypes.ErrGRPCNoLeader,
+ v3rpcTypes.ErrTimeout,
+ v3rpcTypes.ErrTimeoutDueToLeaderFail,
+ v3rpcTypes.ErrTimeoutDueToConnectionLost:
+ // Retry for these server errors
+ attempt += 1
+ if er := backoff(ctx, attempt); er != nil {
+ logger.Warnw(ctx, "get-retries-failed", log.Fields{"key": key, "error": er, "attempt": attempt})
+ return nil, err
+ }
+ logger.Warnw(ctx, "retrying-get", log.Fields{"key": key, "error": err, "attempt": attempt})
+ goto startLoop
+ default:
+ logger.Warnw(ctx, "etcd-server-error", log.Fields{"error": err})
+ }
+ return nil, err
+ }
- if err != nil {
- logger.Error(ctx, err)
- return nil, err
+ for _, ev := range resp.Kvs {
+ // Only one value is returned
+ return NewKVPair(string(ev.Key), ev.Value, "", ev.Lease, ev.Version), nil
+ }
+ return nil, nil
}
- for _, ev := range resp.Kvs {
- // Only one value is returned
- return NewKVPair(string(ev.Key), ev.Value, "", ev.Lease, ev.Version), nil
- }
- return nil, nil
}
// Put writes a key-value pair to the KV store. Value can only be a string or []byte since the etcd API
@@ -124,45 +150,87 @@
return fmt.Errorf("unexpected-type-%T", value)
}
- var err error
// Check if there is already a lease for this key - if there is then use it, otherwise a PUT will make
// that KV key permanent instead of automatically removing it after a lease expiration
c.keyReservationsLock.RLock()
leaseID, ok := c.keyReservations[key]
c.keyReservationsLock.RUnlock()
- if ok {
- _, err = c.ectdAPI.Put(ctx, key, val, v3Client.WithLease(*leaseID))
- } else {
- _, err = c.ectdAPI.Put(ctx, key, val)
- }
-
- if err != nil {
- switch err {
- case context.Canceled:
- logger.Warnw(ctx, "context-cancelled", log.Fields{"error": err})
- case context.DeadlineExceeded:
- logger.Warnw(ctx, "context-deadline-exceeded", log.Fields{"error": err})
- case v3rpcTypes.ErrEmptyKey:
- logger.Warnw(ctx, "etcd-client-error", log.Fields{"error": err})
- default:
- logger.Warnw(ctx, "bad-endpoints", log.Fields{"error": err})
+ attempt := 0
+startLoop:
+ for {
+ var err error
+ if ok {
+ _, err = c.ectdAPI.Put(ctx, key, val, v3Client.WithLease(*leaseID))
+ } else {
+ _, err = c.ectdAPI.Put(ctx, key, val)
}
- return err
+ if err != nil {
+ switch err {
+ case context.Canceled:
+ logger.Warnw(ctx, "context-cancelled", log.Fields{"error": err})
+ case context.DeadlineExceeded:
+ logger.Warnw(ctx, "context-deadline-exceeded", log.Fields{"error": err, "context": ctx})
+ case v3rpcTypes.ErrEmptyKey:
+ logger.Warnw(ctx, "etcd-client-error", log.Fields{"error": err})
+ case v3rpcTypes.ErrLeaderChanged,
+ v3rpcTypes.ErrGRPCNoLeader,
+ v3rpcTypes.ErrTimeout,
+ v3rpcTypes.ErrTimeoutDueToLeaderFail,
+ v3rpcTypes.ErrTimeoutDueToConnectionLost:
+ // Retry for these server errors
+ attempt += 1
+ if er := backoff(ctx, attempt); er != nil {
+ logger.Warnw(ctx, "put-retries-failed", log.Fields{"key": key, "error": er, "attempt": attempt})
+ return err
+ }
+ logger.Warnw(ctx, "retrying-put", log.Fields{"key": key, "error": err, "attempt": attempt})
+ goto startLoop
+ default:
+ logger.Warnw(ctx, "etcd-server-error", log.Fields{"error": err})
+ }
+ return err
+ }
+ return nil
}
- return nil
}
// Delete removes a key from the KV store. Timeout defines how long the function will
// wait for a response
func (c *EtcdClient) Delete(ctx context.Context, key string) error {
- // delete the key
- if _, err := c.ectdAPI.Delete(ctx, key); err != nil {
- logger.Errorw(ctx, "failed-to-delete-key", log.Fields{"key": key, "error": err})
- return err
+ attempt := 0
+startLoop:
+ for {
+ _, err := c.ectdAPI.Delete(ctx, key)
+ if err != nil {
+ switch err {
+ case context.Canceled:
+ logger.Warnw(ctx, "context-cancelled", log.Fields{"error": err})
+ case context.DeadlineExceeded:
+ logger.Warnw(ctx, "context-deadline-exceeded", log.Fields{"error": err, "context": ctx})
+ case v3rpcTypes.ErrEmptyKey:
+ logger.Warnw(ctx, "etcd-client-error", log.Fields{"error": err})
+ case v3rpcTypes.ErrLeaderChanged,
+ v3rpcTypes.ErrGRPCNoLeader,
+ v3rpcTypes.ErrTimeout,
+ v3rpcTypes.ErrTimeoutDueToLeaderFail,
+ v3rpcTypes.ErrTimeoutDueToConnectionLost:
+ // Retry for these server errors
+ attempt += 1
+ if er := backoff(ctx, attempt); er != nil {
+ logger.Warnw(ctx, "delete-retries-failed", log.Fields{"key": key, "error": er, "attempt": attempt})
+ return err
+ }
+ logger.Warnw(ctx, "retrying-delete", log.Fields{"key": key, "error": err, "attempt": attempt})
+ goto startLoop
+ default:
+ logger.Warnw(ctx, "etcd-server-error", log.Fields{"error": err})
+ }
+ return err
+ }
+ logger.Debugw(ctx, "key(s)-deleted", log.Fields{"key": key})
+ return nil
}
- logger.Debugw(ctx, "key(s)-deleted", log.Fields{"key": key})
- return nil
}
func (c *EtcdClient) DeleteWithPrefix(ctx context.Context, prefixKey string) error {