[VOL-2163] Supporting Async request
Introduces InvokeAsyncRPC to support aynchronous requests
Change-Id: Ica947a30140605d46518aa6c73f6661c0645ce92
diff --git a/pkg/db/kvstore/etcdclient.go b/pkg/db/kvstore/etcdclient.go
index 1014ada..d38f0f6 100644
--- a/pkg/db/kvstore/etcdclient.go
+++ b/pkg/db/kvstore/etcdclient.go
@@ -29,13 +29,13 @@
// EtcdClient represents the Etcd KV store client
type EtcdClient struct {
- ectdAPI *v3Client.Client
- keyReservations map[string]*v3Client.LeaseID
- watchedChannels sync.Map
- writeLock sync.Mutex
- lockToMutexMap map[string]*v3Concurrency.Mutex
- lockToSessionMap map[string]*v3Concurrency.Session
- lockToMutexLock sync.Mutex
+ ectdAPI *v3Client.Client
+ keyReservations map[string]*v3Client.LeaseID
+ watchedChannels sync.Map
+ keyReservationsLock sync.RWMutex
+ lockToMutexMap map[string]*v3Concurrency.Mutex
+ lockToSessionMap map[string]*v3Concurrency.Session
+ lockToMutexLock sync.Mutex
}
// NewEtcdClient returns a new client for the Etcd KV store
@@ -114,13 +114,13 @@
return fmt.Errorf("unexpected-type-%T", value)
}
- c.writeLock.Lock()
- defer c.writeLock.Unlock()
-
var err error
// Check if there is already a lease for this key - if there is then use it, otherwise a PUT will make
// that KV key permanent instead of automatically removing it after a lease expiration
- if leaseID, ok := c.keyReservations[key]; ok {
+ c.keyReservationsLock.RLock()
+ leaseID, ok := c.keyReservations[key]
+ c.keyReservationsLock.RUnlock()
+ if ok {
_, err = c.ectdAPI.Put(ctx, key, val, v3Client.WithLease(*leaseID))
} else {
_, err = c.ectdAPI.Put(ctx, key, val)
@@ -146,9 +146,6 @@
// wait for a response
func (c *EtcdClient) Delete(ctx context.Context, key string) error {
- c.writeLock.Lock()
- defer c.writeLock.Unlock()
-
// delete the key
if _, err := c.ectdAPI.Delete(ctx, key); err != nil {
logger.Errorw("failed-to-delete-key", log.Fields{"key": key, "error": err})
@@ -177,9 +174,9 @@
return nil, err
}
// Register the lease id
- c.writeLock.Lock()
+ c.keyReservationsLock.Lock()
c.keyReservations[key] = &resp.ID
- c.writeLock.Unlock()
+ c.keyReservationsLock.Unlock()
// Revoke lease if reservation is not successful
reservationSuccessful := false
@@ -235,8 +232,8 @@
// ReleaseAllReservations releases all key reservations previously made (using Reserve API)
func (c *EtcdClient) ReleaseAllReservations(ctx context.Context) error {
- c.writeLock.Lock()
- defer c.writeLock.Unlock()
+ c.keyReservationsLock.Lock()
+ defer c.keyReservationsLock.Unlock()
for key, leaseID := range c.keyReservations {
_, err := c.ectdAPI.Revoke(ctx, *leaseID)
@@ -255,8 +252,8 @@
logger.Debugw("Release-reservation", log.Fields{"key": key})
var ok bool
var leaseID *v3Client.LeaseID
- c.writeLock.Lock()
- defer c.writeLock.Unlock()
+ c.keyReservationsLock.Lock()
+ defer c.keyReservationsLock.Unlock()
if leaseID, ok = c.keyReservations[key]; !ok {
return nil
}
@@ -278,9 +275,11 @@
// Get the leaseid using the key
var ok bool
var leaseID *v3Client.LeaseID
- c.writeLock.Lock()
- defer c.writeLock.Unlock()
- if leaseID, ok = c.keyReservations[key]; !ok {
+ c.keyReservationsLock.RLock()
+ leaseID, ok = c.keyReservations[key]
+ c.keyReservationsLock.RUnlock()
+
+ if !ok {
return errors.New("key-not-reserved")
}
@@ -372,8 +371,6 @@
// Get the array of channels mapping
var watchedChannels []map[chan *Event]v3Client.Watcher
var ok bool
- c.writeLock.Lock()
- defer c.writeLock.Unlock()
if watchedChannels, ok = c.getChannelMaps(key); !ok {
logger.Warnw("key-has-no-watched-channels", log.Fields{"key": key})
@@ -425,8 +422,6 @@
// Close closes the KV store client
func (c *EtcdClient) Close() {
- c.writeLock.Lock()
- defer c.writeLock.Unlock()
if err := c.ectdAPI.Close(); err != nil {
logger.Errorw("error-closing-client", log.Fields{"error": err})
}