[VOL-1505]  This update enables the core to add a key when
publishing an event onto kafka.   The corresponding update is
done in the adapter GO components.   Similar changes remain to
be done in pyvoltha.

Change-Id: I0bb1e3cb8c2fa9e0214f96d863819755d34a0bb9
diff --git a/db/kvstore/client.go b/db/kvstore/client.go
index a8e6311..34ab711 100644
--- a/db/kvstore/client.go
+++ b/db/kvstore/client.go
@@ -84,6 +84,8 @@
 	ReleaseAllReservations() error
 	RenewReservation(key string) error
 	Watch(key string) chan *Event
+	AcquireLock(lockName string, timeout int) error
+	ReleaseLock(lockName string) error
 	CloseWatch(key string, ch chan *Event)
 	Close()
 }
diff --git a/db/kvstore/consulclient.go b/db/kvstore/consulclient.go
index a5c71ac..738ca92 100644
--- a/db/kvstore/consulclient.go
+++ b/db/kvstore/consulclient.go
@@ -497,3 +497,12 @@
 		log.Errorw("error-closing-client", log.Fields{"error": err})
 	}
 }
+
+
+func (c *ConsulClient)  AcquireLock(lockName string, timeout int) error {
+	return nil
+}
+
+func (c *ConsulClient)  ReleaseLock(lockName string) error {
+	return nil
+}
\ No newline at end of file
diff --git a/db/kvstore/etcdclient.go b/db/kvstore/etcdclient.go
index 9ecddca..2caa990 100644
--- a/db/kvstore/etcdclient.go
+++ b/db/kvstore/etcdclient.go
@@ -33,6 +33,10 @@
 	keyReservations map[string]*v3Client.LeaseID
 	watchedChannels map[string][]map[chan *Event]v3Client.Watcher
 	writeLock       sync.Mutex
+	lockToMutexMap map[string]*v3Concurrency.Mutex
+	lockToSessionMap map[string]*v3Concurrency.Session
+	lockToMutexLock sync.Mutex
+
 }
 
 // NewEtcdClient returns a new client for the Etcd KV store
@@ -49,8 +53,10 @@
 	}
 	wc := make(map[string][]map[chan *Event]v3Client.Watcher)
 	reservations := make(map[string]*v3Client.LeaseID)
+	lockMutexMap := make(map[string]*v3Concurrency.Mutex)
+	lockSessionMap := make(map[string]*v3Concurrency.Session)
 
-	return &EtcdClient{ectdAPI: c, watchedChannels: wc, keyReservations: reservations}, nil
+	return &EtcdClient{ectdAPI: c, watchedChannels: wc, keyReservations:reservations, lockToMutexMap:lockMutexMap, lockToSessionMap:lockSessionMap}, nil
 }
 
 // List returns an array of key-value pairs with key as a prefix.  Timeout defines how long the function will
@@ -431,3 +437,63 @@
 		log.Errorw("error-closing-client", log.Fields{"error": err})
 	}
 }
+
+func (c *EtcdClient) addLockName(lockName string, lock *v3Concurrency.Mutex, session *v3Concurrency.Session) {
+	c.lockToMutexLock.Lock()
+	defer c.lockToMutexLock.Unlock()
+	c.lockToMutexMap[lockName] = lock
+	c.lockToSessionMap[lockName] = session
+}
+
+func (c *EtcdClient) deleteLockName(lockName string) {
+	c.lockToMutexLock.Lock()
+	defer c.lockToMutexLock.Unlock()
+	delete(c.lockToMutexMap, lockName)
+	delete(c.lockToSessionMap, lockName)
+}
+
+func (c *EtcdClient) getLock(lockName string) (*v3Concurrency.Mutex, *v3Concurrency.Session) {
+	c.lockToMutexLock.Lock()
+	defer c.lockToMutexLock.Unlock()
+	var lock *v3Concurrency.Mutex
+	var session *v3Concurrency.Session
+	if l, exist := c.lockToMutexMap[lockName]; exist {
+		lock = l
+	}
+	if s, exist := c.lockToSessionMap[lockName]; exist {
+		session = s
+	}
+	return lock, session
+}
+
+
+func (c *EtcdClient)  AcquireLock(lockName string, timeout int) error {
+	duration := GetDuration(timeout)
+	ctx, cancel := context.WithTimeout(context.Background(), duration)
+	session, _ := v3Concurrency.NewSession(c.ectdAPI, v3Concurrency.WithContext(ctx))
+	mu := v3Concurrency.NewMutex(session, "/devicelock_" + lockName)
+	if err := mu.Lock(context.Background()); err != nil {
+		return err
+	}
+	c.addLockName(lockName, mu, session)
+	cancel()
+	return nil
+}
+
+func (c *EtcdClient)  ReleaseLock(lockName string) error {
+	lock, session := c.getLock(lockName)
+	var err error
+	if lock != nil {
+		if e := lock.Unlock(context.Background()); e != nil {
+			err = e
+		}
+	}
+	if session != nil {
+		if e := session.Close(); e != nil {
+			err = e
+		}
+	}
+	c.deleteLockName(lockName)
+
+	return err
+}
\ No newline at end of file