[VOL-2163] Supporting Async request

Introduces InvokeAsyncRPC to support aynchronous requests

Change-Id: Ica947a30140605d46518aa6c73f6661c0645ce92
diff --git a/pkg/db/kvstore/etcdclient.go b/pkg/db/kvstore/etcdclient.go
index 1014ada..d38f0f6 100644
--- a/pkg/db/kvstore/etcdclient.go
+++ b/pkg/db/kvstore/etcdclient.go
@@ -29,13 +29,13 @@
 
 // EtcdClient represents the Etcd KV store client
 type EtcdClient struct {
-	ectdAPI          *v3Client.Client
-	keyReservations  map[string]*v3Client.LeaseID
-	watchedChannels  sync.Map
-	writeLock        sync.Mutex
-	lockToMutexMap   map[string]*v3Concurrency.Mutex
-	lockToSessionMap map[string]*v3Concurrency.Session
-	lockToMutexLock  sync.Mutex
+	ectdAPI             *v3Client.Client
+	keyReservations     map[string]*v3Client.LeaseID
+	watchedChannels     sync.Map
+	keyReservationsLock sync.RWMutex
+	lockToMutexMap      map[string]*v3Concurrency.Mutex
+	lockToSessionMap    map[string]*v3Concurrency.Session
+	lockToMutexLock     sync.Mutex
 }
 
 // NewEtcdClient returns a new client for the Etcd KV store
@@ -114,13 +114,13 @@
 		return fmt.Errorf("unexpected-type-%T", value)
 	}
 
-	c.writeLock.Lock()
-	defer c.writeLock.Unlock()
-
 	var err error
 	// Check if there is already a lease for this key - if there is then use it, otherwise a PUT will make
 	// that KV key permanent instead of automatically removing it after a lease expiration
-	if leaseID, ok := c.keyReservations[key]; ok {
+	c.keyReservationsLock.RLock()
+	leaseID, ok := c.keyReservations[key]
+	c.keyReservationsLock.RUnlock()
+	if ok {
 		_, err = c.ectdAPI.Put(ctx, key, val, v3Client.WithLease(*leaseID))
 	} else {
 		_, err = c.ectdAPI.Put(ctx, key, val)
@@ -146,9 +146,6 @@
 // wait for a response
 func (c *EtcdClient) Delete(ctx context.Context, key string) error {
 
-	c.writeLock.Lock()
-	defer c.writeLock.Unlock()
-
 	// delete the key
 	if _, err := c.ectdAPI.Delete(ctx, key); err != nil {
 		logger.Errorw("failed-to-delete-key", log.Fields{"key": key, "error": err})
@@ -177,9 +174,9 @@
 		return nil, err
 	}
 	// Register the lease id
-	c.writeLock.Lock()
+	c.keyReservationsLock.Lock()
 	c.keyReservations[key] = &resp.ID
-	c.writeLock.Unlock()
+	c.keyReservationsLock.Unlock()
 
 	// Revoke lease if reservation is not successful
 	reservationSuccessful := false
@@ -235,8 +232,8 @@
 
 // ReleaseAllReservations releases all key reservations previously made (using Reserve API)
 func (c *EtcdClient) ReleaseAllReservations(ctx context.Context) error {
-	c.writeLock.Lock()
-	defer c.writeLock.Unlock()
+	c.keyReservationsLock.Lock()
+	defer c.keyReservationsLock.Unlock()
 
 	for key, leaseID := range c.keyReservations {
 		_, err := c.ectdAPI.Revoke(ctx, *leaseID)
@@ -255,8 +252,8 @@
 	logger.Debugw("Release-reservation", log.Fields{"key": key})
 	var ok bool
 	var leaseID *v3Client.LeaseID
-	c.writeLock.Lock()
-	defer c.writeLock.Unlock()
+	c.keyReservationsLock.Lock()
+	defer c.keyReservationsLock.Unlock()
 	if leaseID, ok = c.keyReservations[key]; !ok {
 		return nil
 	}
@@ -278,9 +275,11 @@
 	// Get the leaseid using the key
 	var ok bool
 	var leaseID *v3Client.LeaseID
-	c.writeLock.Lock()
-	defer c.writeLock.Unlock()
-	if leaseID, ok = c.keyReservations[key]; !ok {
+	c.keyReservationsLock.RLock()
+	leaseID, ok = c.keyReservations[key]
+	c.keyReservationsLock.RUnlock()
+
+	if !ok {
 		return errors.New("key-not-reserved")
 	}
 
@@ -372,8 +371,6 @@
 	// Get the array of channels mapping
 	var watchedChannels []map[chan *Event]v3Client.Watcher
 	var ok bool
-	c.writeLock.Lock()
-	defer c.writeLock.Unlock()
 
 	if watchedChannels, ok = c.getChannelMaps(key); !ok {
 		logger.Warnw("key-has-no-watched-channels", log.Fields{"key": key})
@@ -425,8 +422,6 @@
 
 // Close closes the KV store client
 func (c *EtcdClient) Close() {
-	c.writeLock.Lock()
-	defer c.writeLock.Unlock()
 	if err := c.ectdAPI.Close(); err != nil {
 		logger.Errorw("error-closing-client", log.Fields{"error": err})
 	}