VOL-1867 move simulated olt from voltha-go to voltha-simolt-adapter
Sourced from voltha-go commit 251a11c0ffe60512318a644cd6ce0dc4e12f4018
Change-Id: I8e7ee4da1fed739b3c461917301d2729a79307f5
diff --git a/vendor/github.com/hashicorp/consul/api/semaphore.go b/vendor/github.com/hashicorp/consul/api/semaphore.go
new file mode 100644
index 0000000..bc4f885
--- /dev/null
+++ b/vendor/github.com/hashicorp/consul/api/semaphore.go
@@ -0,0 +1,514 @@
+package api
+
+import (
+ "encoding/json"
+ "fmt"
+ "path"
+ "sync"
+ "time"
+)
+
+const (
+ // DefaultSemaphoreSessionName is the Session Name we assign if none is provided
+ DefaultSemaphoreSessionName = "Consul API Semaphore"
+
+ // DefaultSemaphoreSessionTTL is the default session TTL if no Session is provided
+ // when creating a new Semaphore. This is used because we do not have any
+ // other check to depend upon.
+ DefaultSemaphoreSessionTTL = "15s"
+
+ // DefaultSemaphoreWaitTime is how long we block for at a time to check if semaphore
+ // acquisition is possible. This affects the minimum time it takes to cancel
+ // a Semaphore acquisition.
+ DefaultSemaphoreWaitTime = 15 * time.Second
+
+ // DefaultSemaphoreKey is the key name, joined onto the prefix, that holds
+ // the shared state used for coordination between all the contenders.
+ DefaultSemaphoreKey = ".lock"
+
+ // SemaphoreFlagValue is a magic flag we set on both the contender entries
+ // and the coordination key to indicate a key is being used for a semaphore.
+ // It is used to detect a potential conflict with a lock.
+ SemaphoreFlagValue = 0xe0f69a2baa414de0
+)
+
+var (
+ // ErrSemaphoreHeld is returned by Acquire if we attempt to double lock, and by Destroy if we hold a slot
+ ErrSemaphoreHeld = fmt.Errorf("Semaphore already held")
+
+ // ErrSemaphoreNotHeld is returned by Release if we attempt to unlock a
+ // semaphore that we do not hold.
+ ErrSemaphoreNotHeld = fmt.Errorf("Semaphore not held")
+
+ // ErrSemaphoreInUse is returned by Destroy if we attempt to destroy a
+ // semaphore that still has holders or that changed under us.
+ ErrSemaphoreInUse = fmt.Errorf("Semaphore in use")
+
+ // ErrSemaphoreConflict is returned if the flags on the coordination key
+ // do not equal SemaphoreFlagValue.
+ ErrSemaphoreConflict = fmt.Errorf("Existing key does not match semaphore use")
+)
+
+// Semaphore is used to implement a distributed semaphore using the Consul
+// KV primitives. Acquire, Release and Destroy hold l for their whole run.
+type Semaphore struct {
+ c *Client
+ opts *SemaphoreOptions
+
+ isHeld bool
+ sessionRenew chan struct{}
+ lockSession string
+ l sync.Mutex
+}
+
+// SemaphoreOptions is used to parameterize the Semaphore
+type SemaphoreOptions struct {
+ Prefix string // Must be set and have write permissions
+ Limit int // Must be set, and be positive
+ Value []byte // Optional, value to associate with the contender entry
+ Session string // Optional, created if not specified
+ SessionName string // Optional, defaults to DefaultSemaphoreSessionName
+ SessionTTL string // Optional, defaults to DefaultSemaphoreSessionTTL; must parse with time.ParseDuration
+ MonitorRetries int // Optional, defaults to 0 which means no retries
+ MonitorRetryTime time.Duration // Optional, defaults to DefaultMonitorRetryTime
+ SemaphoreWaitTime time.Duration // Optional, defaults to DefaultSemaphoreWaitTime
+ SemaphoreTryOnce bool // Optional, defaults to false which means try forever
+}
+
+// semaphoreLock is written, JSON encoded, under the DefaultSemaphoreKey
+// and is used to coordinate between all the contenders.
+type semaphoreLock struct {
+ // Limit is the integer limit of holders. This is used to
+ // verify that all the holders agree on the value.
+ Limit int
+
+ // Holders is the set of all the semaphore holders.
+ // It maps the session ID to true; only the keys are meaningful.
+ Holders map[string]bool
+}
+
+// SemaphorePrefix is used to create a Semaphore which will operate
+// at the given KV prefix and uses the given limit for the semaphore.
+// The prefix must have write privileges, and the limit must be agreed
+// upon by all contenders. All other options take their defaults.
+func (c *Client) SemaphorePrefix(prefix string, limit int) (*Semaphore, error) {
+ opts := &SemaphoreOptions{
+ Prefix: prefix,
+ Limit: limit,
+ }
+ return c.SemaphoreOpts(opts)
+}
+
+// SemaphoreOpts is used to create a Semaphore with the given options. The prefix must have
+// write privileges, and the limit must be agreed upon by all contenders. If a Session is not
+// provided, one will be created by Acquire. Unset optional fields of opts are defaulted in place.
+func (c *Client) SemaphoreOpts(opts *SemaphoreOptions) (*Semaphore, error) {
+ if opts.Prefix == "" {
+ return nil, fmt.Errorf("missing prefix")
+ }
+ if opts.Limit <= 0 {
+ return nil, fmt.Errorf("semaphore limit must be positive")
+ }
+ if opts.SessionName == "" {
+ opts.SessionName = DefaultSemaphoreSessionName
+ }
+ if opts.SessionTTL == "" {
+ opts.SessionTTL = DefaultSemaphoreSessionTTL
+ } else {
+ if _, err := time.ParseDuration(opts.SessionTTL); err != nil {
+ return nil, fmt.Errorf("invalid SessionTTL: %v", err)
+ }
+ }
+ if opts.MonitorRetryTime == 0 {
+ opts.MonitorRetryTime = DefaultMonitorRetryTime
+ }
+ if opts.SemaphoreWaitTime == 0 {
+ opts.SemaphoreWaitTime = DefaultSemaphoreWaitTime
+ }
+ s := &Semaphore{
+ c: c,
+ opts: opts,
+ }
+ return s, nil
+}
+
+// Acquire attempts to reserve a slot in the semaphore, blocking until
+// success, interrupted via the stopCh or an error is encountered.
+// Providing a non-nil stopCh can be used to abort the attempt; an aborted
+// attempt, or a SemaphoreTryOnce attempt that runs out of time, returns a
+// nil channel together with a nil error. On success, a channel is returned
+// that represents our slot. This channel could be closed at any time due to
+// session invalidation, communication errors, operator intervention, etc.
+// It is NOT safe to assume that the slot is held until Release() unless the
+// Session is specifically created without any associated health checks. By
+// default Consul sessions prefer liveness over safety; callers must cope with loss.
+func (s *Semaphore) Acquire(stopCh <-chan struct{}) (<-chan struct{}, error) {
+ // Hold the lock as we try to acquire
+ s.l.Lock()
+ defer s.l.Unlock()
+
+ // Check if we already hold the semaphore
+ if s.isHeld {
+ return nil, ErrSemaphoreHeld
+ }
+
+ // Use the session from the options if set, otherwise create one and start renewing it
+ s.lockSession = s.opts.Session
+ if s.lockSession == "" {
+ sess, err := s.createSession()
+ if err != nil {
+ return nil, fmt.Errorf("failed to create session: %v", err)
+ }
+
+ s.sessionRenew = make(chan struct{})
+ s.lockSession = sess
+ session := s.c.Session()
+ go session.RenewPeriodic(s.opts.SessionTTL, sess, nil, s.sessionRenew)
+
+ // If we return without holding the slot, signal the renew goroutine to stop
+ defer func() {
+ if !s.isHeld {
+ close(s.sessionRenew)
+ s.sessionRenew = nil
+ }
+ }()
+ }
+
+ // Create the contender entry, a key under the prefix tied to our session
+ kv := s.c.KV()
+ made, _, err := kv.Acquire(s.contenderEntry(s.lockSession), nil)
+ if err != nil || !made {
+ return nil, fmt.Errorf("failed to make contender entry: %v", err)
+ }
+
+ // Setup the query options
+ qOpts := &QueryOptions{
+ WaitTime: s.opts.SemaphoreWaitTime,
+ }
+
+ start := time.Now()
+ attempts := 0
+WAIT:
+ // Check if we should quit
+ select {
+ case <-stopCh:
+ return nil, nil
+ default:
+ }
+
+ // Handle the one-shot mode: after the first attempt, give up once SemaphoreWaitTime has elapsed.
+ if s.opts.SemaphoreTryOnce && attempts > 0 {
+ elapsed := time.Since(start)
+ if elapsed > s.opts.SemaphoreWaitTime {
+ return nil, nil
+ }
+
+ // Query wait time should not exceed the semaphore wait time
+ qOpts.WaitTime = s.opts.SemaphoreWaitTime - elapsed
+ }
+ attempts++
+
+ // Read the prefix
+ pairs, meta, err := kv.List(s.opts.Prefix, qOpts)
+ if err != nil {
+ return nil, fmt.Errorf("failed to read prefix: %v", err)
+ }
+
+ // Decode the lock (findLock yields a placeholder pair when the key does not exist yet)
+ lockPair := s.findLock(pairs)
+ if lockPair.Flags != SemaphoreFlagValue {
+ return nil, ErrSemaphoreConflict
+ }
+ lock, err := s.decodeLock(lockPair)
+ if err != nil {
+ return nil, err
+ }
+
+ // Verify we agree with the limit
+ if lock.Limit != s.opts.Limit {
+ return nil, fmt.Errorf("semaphore limit conflict (lock: %d, local: %d)",
+ lock.Limit, s.opts.Limit)
+ }
+
+ // Prune the dead holders
+ s.pruneDeadHolders(lock, pairs)
+
+ // If every slot is taken, list again starting from the index we just saw
+ if len(lock.Holders) >= lock.Limit {
+ qOpts.WaitIndex = meta.LastIndex
+ goto WAIT
+ }
+
+ // Create a new lock with us as a holder
+ lock.Holders[s.lockSession] = true
+ newLock, err := s.encodeLock(lock, lockPair.ModifyIndex)
+ if err != nil {
+ return nil, err
+ }
+
+ // Attempt the acquisition with a check-and-set against the ModifyIndex we read
+ didSet, _, err := kv.CAS(newLock, nil)
+ if err != nil {
+ return nil, fmt.Errorf("failed to update lock: %v", err)
+ }
+ if !didSet {
+ // Update failed, could have been a race with another contender,
+ // retry the operation
+ goto WAIT
+ }
+
+ // Watch to ensure we maintain ownership of the slot
+ lockCh := make(chan struct{})
+ go s.monitorLock(s.lockSession, lockCh)
+
+ // Set that we own the lock
+ s.isHeld = true
+
+ // Acquired! All done
+ return lockCh, nil
+}
+
+// Release is used to voluntarily give up our semaphore slot. It returns
+// ErrSemaphoreNotHeld if the semaphore has not been acquired.
+func (s *Semaphore) Release() error {
+ // Hold the lock as we try to release
+ s.l.Lock()
+ defer s.l.Unlock()
+
+ // Ensure the lock is actually held
+ if !s.isHeld {
+ return ErrSemaphoreNotHeld
+ }
+
+ // Set that we no longer own the lock; this is done before the KV updates, so it sticks even if they fail
+ s.isHeld = false
+
+ // Stop the session renew when we return (the channel is only set if Acquire created the session)
+ if s.sessionRenew != nil {
+ defer func() {
+ close(s.sessionRenew)
+ s.sessionRenew = nil
+ }()
+ }
+
+ // Get and clear the lock session
+ lockSession := s.lockSession
+ s.lockSession = ""
+
+ // Remove ourselves as a lock holder, re-reading the lock whenever the CAS loses a race
+ kv := s.c.KV()
+ key := path.Join(s.opts.Prefix, DefaultSemaphoreKey)
+READ:
+ pair, _, err := kv.Get(key, nil)
+ if err != nil {
+ return err
+ }
+ if pair == nil {
+ pair = &KVPair{}
+ }
+ lock, err := s.decodeLock(pair)
+ if err != nil {
+ return err
+ }
+
+ // Create a new lock without us as a holder (skipped if we are not listed)
+ if _, ok := lock.Holders[lockSession]; ok {
+ delete(lock.Holders, lockSession)
+ newLock, err := s.encodeLock(lock, pair.ModifyIndex)
+ if err != nil {
+ return err
+ }
+
+ // Swap the locks
+ didSet, _, err := kv.CAS(newLock, nil)
+ if err != nil {
+ return fmt.Errorf("failed to update lock: %v", err)
+ }
+ if !didSet {
+ goto READ
+ }
+ }
+
+ // Destroy the contender entry
+ contenderKey := path.Join(s.opts.Prefix, lockSession)
+ if _, err := kv.Delete(contenderKey, nil); err != nil {
+ return err
+ }
+ return nil
+}
+
+// Destroy is used to cleanup the semaphore entry. It is not necessary to invoke. It fails with
+// ErrSemaphoreHeld if we hold a slot ourselves and with ErrSemaphoreInUse if any live holder remains.
+func (s *Semaphore) Destroy() error {
+ // Hold the lock as we try to destroy
+ s.l.Lock()
+ defer s.l.Unlock()
+
+ // Refuse to destroy a semaphore that we currently hold
+ if s.isHeld {
+ return ErrSemaphoreHeld
+ }
+
+ // List for the semaphore
+ kv := s.c.KV()
+ pairs, _, err := kv.List(s.opts.Prefix, nil)
+ if err != nil {
+ return fmt.Errorf("failed to read prefix: %v", err)
+ }
+
+ // Find the lock pair, bail if it doesn't exist (the placeholder has a zero ModifyIndex)
+ lockPair := s.findLock(pairs)
+ if lockPair.ModifyIndex == 0 {
+ return nil
+ }
+ if lockPair.Flags != SemaphoreFlagValue {
+ return ErrSemaphoreConflict
+ }
+
+ // Decode the lock
+ lock, err := s.decodeLock(lockPair)
+ if err != nil {
+ return err
+ }
+
+ // Prune the dead holders
+ s.pruneDeadHolders(lock, pairs)
+
+ // Check if there are any holders
+ if len(lock.Holders) > 0 {
+ return ErrSemaphoreInUse
+ }
+
+ // Attempt the delete; it is a check-and-set, so a concurrent update makes it fail
+ didRemove, _, err := kv.DeleteCAS(lockPair, nil)
+ if err != nil {
+ return fmt.Errorf("failed to remove semaphore: %v", err)
+ }
+ if !didRemove {
+ return ErrSemaphoreInUse
+ }
+ return nil
+}
+
+// createSession creates a session with the configured name and TTL and the delete behavior, returning its ID
+func (s *Semaphore) createSession() (string, error) {
+ session := s.c.Session()
+ se := &SessionEntry{
+ Name: s.opts.SessionName,
+ TTL: s.opts.SessionTTL,
+ Behavior: SessionBehaviorDelete,
+ }
+ id, _, err := session.Create(se, nil)
+ if err != nil {
+ return "", err
+ }
+ return id, nil
+}
+
+// contenderEntry returns the KVPair for the contender: key <Prefix>/<session>, carrying the configured Value
+func (s *Semaphore) contenderEntry(session string) *KVPair {
+ return &KVPair{
+ Key: path.Join(s.opts.Prefix, session),
+ Value: s.opts.Value,
+ Session: session,
+ Flags: SemaphoreFlagValue,
+ }
+}
+
+// findLock returns the coordination KV Pair from pairs, or a placeholder with only Flags set if it is absent
+func (s *Semaphore) findLock(pairs KVPairs) *KVPair {
+ key := path.Join(s.opts.Prefix, DefaultSemaphoreKey)
+ for _, pair := range pairs {
+ if pair.Key == key {
+ return pair
+ }
+ }
+ return &KVPair{Flags: SemaphoreFlagValue}
+}
+
+// decodeLock is used to decode a semaphoreLock from the JSON value of an
+// entry in Consul. An error is returned only if the JSON is invalid.
+func (s *Semaphore) decodeLock(pair *KVPair) (*semaphoreLock, error) {
+ // Handle if there is no lock: start from an empty holder set with our own limit
+ if pair == nil || pair.Value == nil {
+ return &semaphoreLock{
+ Limit: s.opts.Limit,
+ Holders: make(map[string]bool),
+ }, nil
+ }
+
+ l := &semaphoreLock{}
+ if err := json.Unmarshal(pair.Value, l); err != nil {
+ return nil, fmt.Errorf("lock decoding failed: %v", err)
+ }
+ return l, nil
+}
+
+// encodeLock is used to encode a semaphoreLock as JSON into a KVPair for the
+// coordination key; oldIndex becomes the pair's ModifyIndex.
+func (s *Semaphore) encodeLock(l *semaphoreLock, oldIndex uint64) (*KVPair, error) {
+ enc, err := json.Marshal(l)
+ if err != nil {
+ return nil, fmt.Errorf("lock encoding failed: %v", err)
+ }
+ pair := &KVPair{
+ Key: path.Join(s.opts.Prefix, DefaultSemaphoreKey),
+ Value: enc,
+ Flags: SemaphoreFlagValue,
+ ModifyIndex: oldIndex,
+ }
+ return pair, nil
+}
+
+// pruneDeadHolders removes, in place, every holder of lock whose session is not attached to any of pairs
+func (s *Semaphore) pruneDeadHolders(lock *semaphoreLock, pairs KVPairs) {
+ // Gather all the live holders: every session that still owns a key in the listing
+ alive := make(map[string]struct{}, len(pairs))
+ for _, pair := range pairs {
+ if pair.Session != "" {
+ alive[pair.Session] = struct{}{}
+ }
+ }
+
+ // Remove any holders that are dead
+ for holder := range lock.Holders {
+ if _, ok := alive[holder]; !ok {
+ delete(lock.Holders, holder)
+ }
+ }
+}
+
+// monitorLock is a long running routine to monitor a semaphore ownership.
+// It closes the stopCh if we lose our slot, or if listing or decoding the lock fails.
+func (s *Semaphore) monitorLock(session string, stopCh chan struct{}) {
+ defer close(stopCh)
+ kv := s.c.KV()
+ opts := &QueryOptions{RequireConsistent: true}
+WAIT:
+ retries := s.opts.MonitorRetries
+RETRY:
+ pairs, meta, err := kv.List(s.opts.Prefix, opts)
+ if err != nil {
+ // If configured we can try to ride out a brief Consul unavailability
+ // by doing retries. Note that we have to attempt the retry in a non-
+ // blocking fashion (WaitIndex reset to 0) so that we have a clean place
+ // to reset the retry counter if service is restored.
+ if retries > 0 && IsRetryableError(err) {
+ time.Sleep(s.opts.MonitorRetryTime)
+ retries--
+ opts.WaitIndex = 0
+ goto RETRY
+ }
+ return
+ }
+ lockPair := s.findLock(pairs)
+ lock, err := s.decodeLock(lockPair)
+ if err != nil {
+ return
+ }
+ s.pruneDeadHolders(lock, pairs)
+ if _, ok := lock.Holders[session]; ok {
+ opts.WaitIndex = meta.LastIndex
+ goto WAIT
+ }
+}