[VOL-1505] This update enables the core to add a key when
publishing an event onto Kafka. The corresponding update is
done in the adapter Go components. Similar changes remain to
be done in pyvoltha.
Change-Id: I0bb1e3cb8c2fa9e0214f96d863819755d34a0bb9
diff --git a/db/kvstore/client.go b/db/kvstore/client.go
index a8e6311..34ab711 100644
--- a/db/kvstore/client.go
+++ b/db/kvstore/client.go
@@ -84,6 +84,8 @@
ReleaseAllReservations() error
RenewReservation(key string) error
Watch(key string) chan *Event
+ AcquireLock(lockName string, timeout int) error
+ ReleaseLock(lockName string) error
CloseWatch(key string, ch chan *Event)
Close()
}
diff --git a/db/kvstore/consulclient.go b/db/kvstore/consulclient.go
index a5c71ac..738ca92 100644
--- a/db/kvstore/consulclient.go
+++ b/db/kvstore/consulclient.go
@@ -497,3 +497,12 @@
log.Errorw("error-closing-client", log.Fields{"error": err})
}
}
+
+
+func (c *ConsulClient) AcquireLock(lockName string, timeout int) error {
+ return nil
+}
+
+func (c *ConsulClient) ReleaseLock(lockName string) error {
+ return nil
+}
\ No newline at end of file
diff --git a/db/kvstore/etcdclient.go b/db/kvstore/etcdclient.go
index 9ecddca..2caa990 100644
--- a/db/kvstore/etcdclient.go
+++ b/db/kvstore/etcdclient.go
@@ -33,6 +33,10 @@
keyReservations map[string]*v3Client.LeaseID
watchedChannels map[string][]map[chan *Event]v3Client.Watcher
writeLock sync.Mutex
+ lockToMutexMap map[string]*v3Concurrency.Mutex
+ lockToSessionMap map[string]*v3Concurrency.Session
+ lockToMutexLock sync.Mutex
+
}
// NewEtcdClient returns a new client for the Etcd KV store
@@ -49,8 +53,10 @@
}
wc := make(map[string][]map[chan *Event]v3Client.Watcher)
reservations := make(map[string]*v3Client.LeaseID)
+ lockMutexMap := make(map[string]*v3Concurrency.Mutex)
+ lockSessionMap := make(map[string]*v3Concurrency.Session)
- return &EtcdClient{ectdAPI: c, watchedChannels: wc, keyReservations: reservations}, nil
+ return &EtcdClient{ectdAPI: c, watchedChannels: wc, keyReservations:reservations, lockToMutexMap:lockMutexMap, lockToSessionMap:lockSessionMap}, nil
}
// List returns an array of key-value pairs with key as a prefix. Timeout defines how long the function will
@@ -431,3 +437,63 @@
log.Errorw("error-closing-client", log.Fields{"error": err})
}
}
+
+func (c *EtcdClient) addLockName(lockName string, lock *v3Concurrency.Mutex, session *v3Concurrency.Session) {
+ c.lockToMutexLock.Lock()
+ defer c.lockToMutexLock.Unlock()
+ c.lockToMutexMap[lockName] = lock
+ c.lockToSessionMap[lockName] = session
+}
+
+func (c *EtcdClient) deleteLockName(lockName string) {
+ c.lockToMutexLock.Lock()
+ defer c.lockToMutexLock.Unlock()
+ delete(c.lockToMutexMap, lockName)
+ delete(c.lockToSessionMap, lockName)
+}
+
+func (c *EtcdClient) getLock(lockName string) (*v3Concurrency.Mutex, *v3Concurrency.Session) {
+ c.lockToMutexLock.Lock()
+ defer c.lockToMutexLock.Unlock()
+ var lock *v3Concurrency.Mutex
+ var session *v3Concurrency.Session
+ if l, exist := c.lockToMutexMap[lockName]; exist {
+ lock = l
+ }
+ if s, exist := c.lockToSessionMap[lockName]; exist {
+ session = s
+ }
+ return lock, session
+}
+
+
+func (c *EtcdClient) AcquireLock(lockName string, timeout int) error {
+ duration := GetDuration(timeout)
+ ctx, cancel := context.WithTimeout(context.Background(), duration)
+ session, _ := v3Concurrency.NewSession(c.ectdAPI, v3Concurrency.WithContext(ctx))
+ mu := v3Concurrency.NewMutex(session, "/devicelock_" + lockName)
+ if err := mu.Lock(context.Background()); err != nil {
+ return err
+ }
+ c.addLockName(lockName, mu, session)
+ cancel()
+ return nil
+}
+
+func (c *EtcdClient) ReleaseLock(lockName string) error {
+ lock, session := c.getLock(lockName)
+ var err error
+ if lock != nil {
+ if e := lock.Unlock(context.Background()); e != nil {
+ err = e
+ }
+ }
+ if session != nil {
+ if e := session.Close(); e != nil {
+ err = e
+ }
+ }
+ c.deleteLockName(lockName)
+
+ return err
+}
\ No newline at end of file