VOL-2017 voltha-lib moved from voltha-go;
release version 2.2.1
Based on voltha-go commit 5259f8e52b3e3f5c7ad422a4b0e506e1d07f6b36
Change-Id: I8bbecdf456e420714a4016120eafc0d237c80565
diff --git a/vendor/go.etcd.io/etcd/clientv3/concurrency/doc.go b/vendor/go.etcd.io/etcd/clientv3/concurrency/doc.go
new file mode 100644
index 0000000..dcdbf51
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/concurrency/doc.go
@@ -0,0 +1,17 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package concurrency implements concurrency operations on top of etcd, such
+// as sessions, distributed mutexes, leader elections and software transactional memory.
+package concurrency
diff --git a/vendor/go.etcd.io/etcd/clientv3/concurrency/election.go b/vendor/go.etcd.io/etcd/clientv3/concurrency/election.go
new file mode 100644
index 0000000..2521db6
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/concurrency/election.go
@@ -0,0 +1,285 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package concurrency
+
+import (
+ "context"
+ "errors"
+ "fmt"
+
+ v3 "go.etcd.io/etcd/clientv3"
+ pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
+ "go.etcd.io/etcd/mvcc/mvccpb"
+)
+
+var (
+ // ErrElectionNotLeader is returned by Proclaim when this Election holds no
+ // candidate key, or that key no longer has its recorded create revision.
+ ErrElectionNotLeader = errors.New("election: not leader")
+ // ErrElectionNoLeader is returned by Leader when no key exists under the
+ // election prefix.
+ ErrElectionNoLeader = errors.New("election: no leader")
+)
+
+// Election is a leader election on a key prefix, built on a Session. Each
+// candidate writes a key under the prefix bound to its session lease; the key
+// with the lowest create revision is the leader.
+type Election struct {
+ // session supplies the client and the lease candidate keys are bound to.
+ session *Session
+
+ // keyPrefix is the prefix under which candidate keys are created.
+ keyPrefix string
+
+ // leaderKey and leaderRev identify this Election's own candidate key;
+ // leaderSession is nil when it is neither campaigning nor leading.
+ leaderKey string
+ leaderRev int64
+ leaderSession *Session
+ // hdr is the response header from the last successful election request.
+ hdr *pb.ResponseHeader
+}
+
+// NewElection returns a new election on a given key prefix.
+// Candidate keys are created under pfx + "/".
+func NewElection(s *Session, pfx string) *Election {
+ return &Election{session: s, keyPrefix: pfx + "/"}
+}
+
+// ResumeElection initializes an election with a known leader.
+//
+// NOTE(review): unlike NewElection, pfx is used as-is ("/" is not appended),
+// so pass the full candidate prefix (e.g. name + "/") if the resumed election
+// will Campaign, Observe or query Leader.
+func ResumeElection(s *Session, pfx string, leaderKey string, leaderRev int64) *Election {
+ return &Election{
+ keyPrefix: pfx,
+ session: s,
+ leaderKey: leaderKey,
+ leaderRev: leaderRev,
+ leaderSession: s,
+ }
+}
+
+// Campaign puts a value as eligible for the election on the prefix key and
+// blocks until this session becomes the leader. Multiple sessions can
+// participate in the election for the same prefix, but only one can be the
+// leader at a time. If this session already has a candidate key holding a
+// different value, that value is replaced via Proclaim.
+//
+// With a context that is never cancelled (e.g. context.Background()),
+// Campaign keeps waiting for earlier candidate keys to be deleted unless the
+// server returns a non-recoverable error (e.g. ErrCompacted). Otherwise
+// Campaign blocks until it becomes the leader or the context is cancelled or
+// times out; on cancellation it resigns to remove its candidate key.
+func (e *Election) Campaign(ctx context.Context, val string) error {
+ s := e.session
+ client := e.session.Client()
+
+ // one candidate key per session lease; create it only if it does not exist
+ k := fmt.Sprintf("%s%x", e.keyPrefix, s.Lease())
+ txn := client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0))
+ txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease())))
+ txn = txn.Else(v3.OpGet(k))
+ resp, err := txn.Commit()
+ if err != nil {
+ return err
+ }
+ e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s
+ if !resp.Succeeded {
+ // key already existed: keep its original create revision as our rank
+ kv := resp.Responses[0].GetResponseRange().Kvs[0]
+ e.leaderRev = kv.CreateRevision
+ if string(kv.Value) != val {
+ if err = e.Proclaim(ctx, val); err != nil {
+ e.Resign(ctx) // best-effort cleanup; the Proclaim error is reported
+ return err
+ }
+ }
+ }
+
+ // we lead once every candidate key created before ours has been deleted
+ _, err = waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1)
+ if err != nil {
+ // clean up in case of context cancel
+ select {
+ case <-ctx.Done():
+ e.Resign(client.Ctx())
+ default:
+ e.leaderSession = nil
+ }
+ return err
+ }
+ e.hdr = resp.Header
+
+ return nil
+}
+
+// Proclaim lets the leader announce a new value without another election.
+// The put only applies while the leader key keeps its recorded create
+// revision; otherwise ErrElectionNotLeader is returned.
+func (e *Election) Proclaim(ctx context.Context, val string) error {
+ if e.leaderSession == nil {
+ return ErrElectionNotLeader
+ }
+ client := e.session.Client()
+ cmp := v3.Compare(v3.CreateRevision(e.leaderKey), "=", e.leaderRev)
+ txn := client.Txn(ctx).If(cmp)
+ txn = txn.Then(v3.OpPut(e.leaderKey, val, v3.WithLease(e.leaderSession.Lease())))
+ tresp, terr := txn.Commit()
+ if terr != nil {
+ return terr
+ }
+ if !tresp.Succeeded {
+ // the leader key is gone or was recreated: no longer the leader
+ e.leaderKey = ""
+ return ErrElectionNotLeader
+ }
+
+ e.hdr = tresp.Header
+ return nil
+}
+
+// Resign lets a leader start a new election.
+// It deletes this Election's key if the key still has the expected create
+// revision, and clears the local leader state even if the request fails.
+// It is a no-op when the Election holds no candidate key.
+func (e *Election) Resign(ctx context.Context) (err error) {
+ if e.leaderSession == nil {
+ return nil
+ }
+ client := e.session.Client()
+ cmp := v3.Compare(v3.CreateRevision(e.leaderKey), "=", e.leaderRev)
+ resp, err := client.Txn(ctx).If(cmp).Then(v3.OpDelete(e.leaderKey)).Commit()
+ if err == nil {
+ e.hdr = resp.Header
+ }
+ e.leaderKey = ""
+ e.leaderSession = nil
+ return err
+}
+
+// Leader returns the leader value for the current election.
+// The leader is the key under the prefix with the lowest create revision;
+// ErrElectionNoLeader is returned if the prefix has no keys.
+func (e *Election) Leader(ctx context.Context) (*v3.GetResponse, error) {
+ client := e.session.Client()
+ resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)
+ if err != nil {
+ return nil, err
+ } else if len(resp.Kvs) == 0 {
+ // no leader currently elected
+ return nil, ErrElectionNoLeader
+ }
+ return resp, nil
+}
+
+// Observe returns a channel that reliably observes ordered leader proposals
+// as GetResponse values on every current elected leader key. It will not
+// necessarily fetch all historical leader updates, but will always post the
+// most recent leader value.
+//
+// The channel closes when the context is canceled or the underlying watcher
+// is otherwise disrupted.
+func (e *Election) Observe(ctx context.Context) <-chan v3.GetResponse {
+ retc := make(chan v3.GetResponse)
+ go e.observe(ctx, retc)
+ return retc
+}
+
+// observe implements Observe: it repeatedly finds the current leader key,
+// sends it on ch, then forwards that key's updates until it is deleted.
+func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
+ client := e.session.Client()
+
+ defer close(ch)
+ for {
+ resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)
+ if err != nil {
+ return
+ }
+
+ var kv *mvccpb.KeyValue
+ var hdr *pb.ResponseHeader
+
+ if len(resp.Kvs) == 0 {
+ cctx, cancel := context.WithCancel(ctx)
+ // wait for first key put on prefix
+ opts := []v3.OpOption{v3.WithRev(resp.Header.Revision), v3.WithPrefix()}
+ wch := client.Watch(cctx, e.keyPrefix, opts...)
+ for kv == nil {
+ wr, ok := <-wch
+ if !ok || wr.Err() != nil {
+ cancel()
+ return
+ }
+ // only accept puts; a delete will make observe() spin
+ for _, ev := range wr.Events {
+ if ev.Type == mvccpb.PUT {
+ hdr, kv = &wr.Header, ev.Kv
+ // may have multiple revs; hdr.rev = the last rev
+ // set to kv's rev in case batch has multiple Puts
+ hdr.Revision = kv.ModRevision
+ break
+ }
+ }
+ }
+ cancel()
+ } else {
+ hdr, kv = resp.Header, resp.Kvs[0]
+ }
+
+ select {
+ case ch <- v3.GetResponse{Header: hdr, Kvs: []*mvccpb.KeyValue{kv}}:
+ case <-ctx.Done():
+ return
+ }
+
+ cctx, cancel := context.WithCancel(ctx)
+ wch := client.Watch(cctx, string(kv.Key), v3.WithRev(hdr.Revision+1))
+ keyDeleted := false
+ for !keyDeleted {
+ wr, ok := <-wch
+ if !ok {
+ cancel()
+ return
+ }
+ for _, ev := range wr.Events {
+ if ev.Type == mvccpb.DELETE {
+ keyDeleted = true
+ break
+ }
+ resp.Header = &wr.Header
+ resp.Kvs = []*mvccpb.KeyValue{ev.Kv}
+ select {
+ case ch <- *resp:
+ case <-cctx.Done():
+ cancel()
+ return
+ }
+ }
+ }
+ cancel()
+ }
+}
+
+// Key returns the leader key if elected, empty string otherwise.
+func (e *Election) Key() string { return e.leaderKey }
+
+// Rev returns the leader key's creation revision, if elected.
+func (e *Election) Rev() int64 { return e.leaderRev }
+
+// Header is the response header from the last successful election proposal.
+func (e *Election) Header() *pb.ResponseHeader { return e.hdr }
diff --git a/vendor/go.etcd.io/etcd/clientv3/concurrency/key.go b/vendor/go.etcd.io/etcd/clientv3/concurrency/key.go
new file mode 100644
index 0000000..e4cf775
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/concurrency/key.go
@@ -0,0 +1,70 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package concurrency
+
+import (
+ "context"
+ "fmt"
+
+ v3 "go.etcd.io/etcd/clientv3"
+ pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
+ "go.etcd.io/etcd/mvcc/mvccpb"
+)
+
+// waitDelete blocks until key is deleted, watching from revision rev. It
+// returns nil on a DELETE event; otherwise it returns the watch error, ctx's
+// error, or an error if the watch channel closed without either.
+func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {
+ cctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
+ var wr v3.WatchResponse
+ wch := client.Watch(cctx, key, v3.WithRev(rev))
+ for wr = range wch {
+ for _, ev := range wr.Events {
+ if ev.Type == mvccpb.DELETE {
+ return nil
+ }
+ }
+ }
+ if err := wr.Err(); err != nil {
+ return err
+ }
+ if err := ctx.Err(); err != nil {
+ return err
+ }
+ return fmt.Errorf("lost watcher waiting for delete")
+}
+
+// waitDeletes efficiently waits until all keys matching the prefix with a
+// create revision no greater than maxCreateRev have been deleted. Each round
+// it waits only on the most recently created matching key, then re-checks;
+// the returned header is from the final Get that found no such keys.
+func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
+ getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
+ for {
+ resp, err := client.Get(ctx, pfx, getOpts...)
+ if err != nil {
+ return nil, err
+ }
+ if len(resp.Kvs) == 0 {
+ return resp.Header, nil
+ }
+ lastKey := string(resp.Kvs[0].Key)
+ if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
+ return nil, err
+ }
+ }
+}
diff --git a/vendor/go.etcd.io/etcd/clientv3/concurrency/mutex.go b/vendor/go.etcd.io/etcd/clientv3/concurrency/mutex.go
new file mode 100644
index 0000000..306470b
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/concurrency/mutex.go
@@ -0,0 +1,179 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package concurrency
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "sync"
+
+ v3 "go.etcd.io/etcd/clientv3"
+ pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
+)
+
+// ErrLocked is returned by TryLock when Mutex is already locked by another session.
+var ErrLocked = errors.New("mutex: Locked by another session")
+
+// ErrSessionExpired is returned by Lock when the mutex session's lease expired,
+// removing this session's lock key, while waiting for the lock.
+var ErrSessionExpired = errors.New("mutex: session is expired")
+
+// Mutex implements a distributed mutex with etcd. Its Lock and Unlock take a
+// context; use NewLocker for a sync.Locker.
+type Mutex struct {
+ s *Session
+
+ pfx string // prefix shared by all waiters, including the trailing "/"
+ myKey string // this session's lock key under pfx
+ myRev int64 // create revision of myKey; -1 when not registered
+ hdr *pb.ResponseHeader
+}
+
+// NewMutex returns a Mutex whose lock keys are created under pfx + "/".
+func NewMutex(s *Session, pfx string) *Mutex {
+ return &Mutex{s, pfx + "/", "", -1, nil}
+}
+
+// TryLock locks the mutex if not already locked by another session.
+// If the lock is held by another session, it removes this session's key and
+// returns ErrLocked immediately. The ctx argument is used for its RPCs.
+func (m *Mutex) TryLock(ctx context.Context) error {
+ resp, err := m.tryAcquire(ctx)
+ if err != nil {
+ return err
+ }
+ // if no key on prefix / the minimum rev is key, already hold the lock
+ ownerKey := resp.Responses[1].GetResponseRange().Kvs
+ if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
+ m.hdr = resp.Header
+ return nil
+ }
+ client := m.s.Client()
+ // Cannot lock, so delete the key
+ if _, err := client.Delete(ctx, m.myKey); err != nil {
+ return err
+ }
+ m.myKey = "\x00"
+ m.myRev = -1
+ return ErrLocked
+}
+
+// Lock locks the mutex with a cancelable context. If the context is canceled
+// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
+// If the session lease expires while waiting, so this session's lock key is
+// gone, Lock returns ErrSessionExpired instead of reporting success.
+func (m *Mutex) Lock(ctx context.Context) error {
+ resp, err := m.tryAcquire(ctx)
+ if err != nil {
+ return err
+ }
+ // if no key on prefix / the minimum rev is key, already hold the lock
+ ownerKey := resp.Responses[1].GetResponseRange().Kvs
+ if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
+ m.hdr = resp.Header
+ return nil
+ }
+ client := m.s.Client()
+ // wait for deletion revisions prior to myKey
+ if _, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1); werr != nil {
+ // release lock key if wait failed (best effort; werr is returned)
+ m.Unlock(client.Ctx())
+ return werr
+ }
+ // every earlier waiter is gone, but our own key may have vanished too if
+ // the session lease expired meanwhile; confirm we still hold it
+ gresp, gerr := client.Get(ctx, m.myKey)
+ if gerr != nil {
+ m.Unlock(client.Ctx())
+ return gerr
+ }
+ if len(gresp.Kvs) == 0 {
+ return ErrSessionExpired
+ }
+ m.hdr = gresp.Header
+ return nil
+}
+
+// tryAcquire registers this session as a lock waiter (reusing its key if it
+// already exists) and fetches the current owner in one transaction. It sets
+// myKey and myRev; Responses[1] of the result holds the owner lookup.
+func (m *Mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) {
+ s := m.s
+ client := m.s.Client()
+
+ m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
+ cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
+ // put self in lock waiters via myKey; oldest waiter holds lock
+ put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
+ // reuse key in case this session already holds the lock
+ get := v3.OpGet(m.myKey)
+ // fetch current holder to complete uncontended path with only one RPC
+ getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
+ resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
+ if err != nil {
+ return nil, err
+ }
+ m.myRev = resp.Header.Revision
+ if !resp.Succeeded {
+ m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
+ }
+ return resp, nil
+}
+
+// Unlock releases the lock by deleting this session's key and resets myKey/myRev.
+func (m *Mutex) Unlock(ctx context.Context) error {
+ client := m.s.Client()
+ if _, err := client.Delete(ctx, m.myKey); err != nil {
+ return err
+ }
+ m.myKey = "\x00"
+ m.myRev = -1
+ return nil
+}
+
+// IsOwner returns a comparison, usable as a Txn condition, that succeeds only
+// while this mutex's key still has the create revision recorded for it.
+func (m *Mutex) IsOwner() v3.Cmp {
+ return v3.Compare(v3.CreateRevision(m.myKey), "=", m.myRev)
+}
+
+// Key returns this session's lock key ("\x00" after Unlock or a failed TryLock).
+func (m *Mutex) Key() string { return m.myKey }
+
+// Header is the response header received from etcd on acquiring the lock.
+func (m *Mutex) Header() *pb.ResponseHeader { return m.hdr }
+
+// lockerMutex adapts Mutex to sync.Locker using the client's context. It
+// panics on error because sync.Locker methods cannot return one.
+type lockerMutex struct{ *Mutex }
+
+func (lm *lockerMutex) Lock() {
+ client := lm.s.Client()
+ if err := lm.Mutex.Lock(client.Ctx()); err != nil {
+ panic(err)
+ }
+}
+func (lm *lockerMutex) Unlock() {
+ client := lm.s.Client()
+ if err := lm.Mutex.Unlock(client.Ctx()); err != nil {
+ panic(err)
+ }
+}
+
+// NewLocker creates a sync.Locker backed by an etcd mutex.
+func NewLocker(s *Session, pfx string) sync.Locker {
+ return &lockerMutex{NewMutex(s, pfx)}
+}
diff --git a/vendor/go.etcd.io/etcd/clientv3/concurrency/session.go b/vendor/go.etcd.io/etcd/clientv3/concurrency/session.go
new file mode 100644
index 0000000..97eb763
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/concurrency/session.go
@@ -0,0 +1,151 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package concurrency
+
+import (
+ "context"
+ "errors"
+ "time"
+
+ v3 "go.etcd.io/etcd/clientv3"
+)
+
+// defaultSessionTTL is the lease TTL, in seconds, used when WithTTL is not
+// given a positive value.
+const defaultSessionTTL = 60
+
+// Session represents a lease kept alive for the lifetime of a client.
+// Fault-tolerant applications may use sessions to reason about liveness.
+type Session struct {
+ client *v3.Client
+ opts *sessionOptions
+ id v3.LeaseID
+
+ cancel context.CancelFunc // stops the lease keep-alive (see Orphan)
+ donec <-chan struct{} // closed once the keep-alive channel has closed
+}
+
+// NewSession gets the leased session for a client. Unless WithLease supplies
+// a lease, one is granted with the configured TTL; it is then kept alive in
+// the background until the session is orphaned/closed or keep-alive stops.
+func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
+ ops := &sessionOptions{ttl: defaultSessionTTL, ctx: client.Ctx()}
+ for _, opt := range opts {
+ opt(ops)
+ }
+
+ id := ops.leaseID
+ if id == v3.NoLease {
+ resp, err := client.Grant(ops.ctx, int64(ops.ttl))
+ if err != nil {
+ return nil, err
+ }
+ id = resp.ID
+ }
+
+ ctx, cancel := context.WithCancel(ops.ctx)
+ keepAlive, err := client.KeepAlive(ctx, id)
+ if err != nil || keepAlive == nil {
+ cancel()
+ if err == nil {
+ // never return a nil *Session together with a nil error
+ err = errors.New("session: keepalive channel is nil")
+ }
+ return nil, err
+ }
+
+ donec := make(chan struct{})
+ s := &Session{client: client, opts: ops, id: id, cancel: cancel, donec: donec}
+
+ // keep the lease alive until client error or cancelled context
+ go func() {
+ defer close(donec)
+ for range keepAlive {
+ // eat messages until keep alive channel closes
+ }
+ }()
+
+ return s, nil
+}
+
+// Client is the etcd client that is attached to the session.
+func (s *Session) Client() *v3.Client {
+ return s.client
+}
+
+// Lease is the lease ID for keys bound to the session.
+func (s *Session) Lease() v3.LeaseID { return s.id }
+
+// Done returns a channel that closes when the lease is orphaned, expires, or
+// is otherwise no longer being refreshed.
+func (s *Session) Done() <-chan struct{} { return s.donec }
+
+// Orphan ends the refresh for the session lease. This is useful
+// in case the state of the client connection is indeterminate (revoke
+// would fail) or when transferring lease ownership.
+func (s *Session) Orphan() {
+ s.cancel()
+ <-s.donec
+}
+
+// Close orphans the session and revokes the session lease.
+func (s *Session) Close() error {
+ s.Orphan()
+ // if revoke takes longer than the ttl, lease is expired anyway
+ ctx, cancel := context.WithTimeout(s.opts.ctx, time.Duration(s.opts.ttl)*time.Second)
+ _, err := s.client.Revoke(ctx, s.id)
+ cancel()
+ return err
+}
+
+// sessionOptions holds the settings applied by SessionOption values.
+type sessionOptions struct {
+ ttl int
+ leaseID v3.LeaseID
+ ctx context.Context
+}
+
+// SessionOption configures Session.
+type SessionOption func(*sessionOptions)
+
+// WithTTL configures the session's TTL in seconds.
+// If TTL is <= 0, the default 60 seconds TTL will be used.
+func WithTTL(ttl int) SessionOption {
+ return func(so *sessionOptions) {
+ if ttl > 0 {
+ so.ttl = ttl
+ }
+ }
+}
+
+// WithLease specifies the existing leaseID to be used for the session.
+// This is useful in process restart scenario, for example, to reclaim
+// leadership from an election prior to restart.
+func WithLease(leaseID v3.LeaseID) SessionOption {
+ return func(so *sessionOptions) {
+ so.leaseID = leaseID
+ }
+}
+
+// WithContext assigns a context to the session instead of defaulting to
+// using the client context. This is useful for canceling NewSession and
+// Close operations immediately without having to close the client. If the
+// context is canceled before Close() completes, the session's lease will be
+// abandoned and left to expire instead of being revoked.
+func WithContext(ctx context.Context) SessionOption {
+ return func(so *sessionOptions) {
+ so.ctx = ctx
+ }
+}
diff --git a/vendor/go.etcd.io/etcd/clientv3/concurrency/stm.go b/vendor/go.etcd.io/etcd/clientv3/concurrency/stm.go
new file mode 100644
index 0000000..ee11510
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/clientv3/concurrency/stm.go
@@ -0,0 +1,417 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package concurrency
+
+import (
+ "context"
+ "math"
+
+ v3 "go.etcd.io/etcd/clientv3"
+)
+
+// STM is an interface for software transactional memory.
+type STM interface {
+ // Get returns the value for a key and inserts the key in the txn's read set.
+ // If Get fails, it aborts the transaction with an error, never returning.
+ Get(key ...string) string
+ // Put adds a value for a key to the write set.
+ Put(key, val string, opts ...v3.OpOption)
+ // Rev returns the revision of a key in the read set.
+ Rev(key string) int64
+ // Del deletes a key.
+ Del(key string)
+
+ // commit attempts to apply the txn's changes to the server.
+ commit() *v3.TxnResponse
+ reset()
+}
+
+// Isolation is an enumeration of transactional isolation levels which
+// describes how transactions should interfere and conflict.
+type Isolation int
+
+const (
+ // SerializableSnapshot provides serializable isolation and also checks
+ // for write conflicts.
+ SerializableSnapshot Isolation = iota
+ // Serializable reads within the same transaction attempt return data
+ // from the revision of the first read.
+ Serializable
+ // RepeatableReads reads within the same transaction attempt always
+ // return the same data.
+ RepeatableReads
+ // ReadCommitted reads keys from any committed revision.
+ ReadCommitted
+)
+
+// stmError safely passes STM errors through panic to the STM error channel.
+type stmError struct{ err error }
+
+type stmOptions struct {
+ iso Isolation
+ ctx context.Context
+ prefetch []string
+}
+
+// stmOption configures how NewSTM runs a transaction.
+type stmOption func(*stmOptions)
+
+// WithIsolation specifies the transaction isolation level.
+func WithIsolation(lvl Isolation) stmOption {
+ return func(so *stmOptions) { so.iso = lvl }
+}
+
+// WithAbortContext specifies the context for permanently aborting the transaction.
+func WithAbortContext(ctx context.Context) stmOption {
+ return func(so *stmOptions) { so.ctx = ctx }
+}
+
+// WithPrefetch is a hint to prefetch a list of keys before trying to apply.
+// If an STM transaction will unconditionally fetch a set of keys, prefetching
+// those keys will save the round-trip cost from requesting each key one by one
+// with Get().
+func WithPrefetch(keys ...string) stmOption {
+ return func(so *stmOptions) { so.prefetch = append(so.prefetch, keys...) }
+}
+
+// NewSTM initiates a new STM instance, using serializable snapshot isolation by default.
+func NewSTM(c *v3.Client, apply func(STM) error, so ...stmOption) (*v3.TxnResponse, error) {
+ opts := &stmOptions{ctx: c.Ctx()}
+ for _, f := range so {
+ f(opts)
+ }
+ if len(opts.prefetch) != 0 {
+ f := apply
+ apply = func(s STM) error {
+ s.Get(opts.prefetch...)
+ return f(s)
+ }
+ }
+ return runSTM(mkSTM(c, opts), apply)
+}
+
+// mkSTM builds the STM implementation for the requested isolation level.
+func mkSTM(c *v3.Client, opts *stmOptions) STM {
+ switch opts.iso {
+ case SerializableSnapshot:
+ s := &stmSerializable{
+ stm: stm{client: c, ctx: opts.ctx},
+ prefetch: make(map[string]*v3.GetResponse),
+ }
+ s.conflicts = func() []v3.Cmp {
+ return append(s.rset.cmps(), s.wset.cmps(s.rset.first()+1)...)
+ }
+ return s
+ case Serializable:
+ s := &stmSerializable{
+ stm: stm{client: c, ctx: opts.ctx},
+ prefetch: make(map[string]*v3.GetResponse),
+ }
+ s.conflicts = func() []v3.Cmp { return s.rset.cmps() }
+ return s
+ case RepeatableReads:
+ s := &stm{client: c, ctx: opts.ctx, getOpts: []v3.OpOption{v3.WithSerializable()}}
+ s.conflicts = func() []v3.Cmp { return s.rset.cmps() }
+ return s
+ case ReadCommitted:
+ s := &stm{client: c, ctx: opts.ctx, getOpts: []v3.OpOption{v3.WithSerializable()}}
+ s.conflicts = func() []v3.Cmp { return nil }
+ return s
+ default:
+ panic("unsupported stm")
+ }
+}
+
+// stmResponse carries runSTM's result out of its worker goroutine.
+type stmResponse struct {
+ resp *v3.TxnResponse
+ err error
+}
+
+// runSTM repeatedly resets s and runs apply then commit until commit succeeds
+// or apply returns an error. stmError panics raised by STM operations are
+// returned as errors; any other panic is re-raised on the worker goroutine.
+func runSTM(s STM, apply func(STM) error) (*v3.TxnResponse, error) {
+ outc := make(chan stmResponse, 1)
+ go func() {
+ defer func() {
+ if r := recover(); r != nil {
+ e, ok := r.(stmError)
+ if !ok {
+ // client apply panicked
+ panic(r)
+ }
+ outc <- stmResponse{nil, e.err}
+ }
+ }()
+ var out stmResponse
+ for {
+ s.reset()
+ if out.err = apply(s); out.err != nil {
+ break
+ }
+ if out.resp = s.commit(); out.resp != nil {
+ break
+ }
+ }
+ outc <- out
+ }()
+ r := <-outc
+ return r.resp, r.err
+}
+
+// stm implements repeatable-read software transactional memory over etcd
+type stm struct {
+ client *v3.Client
+ ctx context.Context
+ // rset holds read key values and revisions
+ rset readSet
+ // wset holds overwritten keys and their values
+ wset writeSet
+ // getOpts are the opts used for gets
+ getOpts []v3.OpOption
+ // conflicts computes the current conflicts on the txn
+ conflicts func() []v3.Cmp
+}
+
+// stmPut is a pending write: the value Get should return and the op to commit.
+type stmPut struct {
+ val string
+ op v3.Op
+}
+
+// readSet maps each read key to the Get response it was read from.
+type readSet map[string]*v3.GetResponse
+
+func (rs readSet) add(keys []string, txnresp *v3.TxnResponse) {
+ for i, resp := range txnresp.Responses {
+ rs[keys[i]] = (*v3.GetResponse)(resp.GetResponseRange())
+ }
+}
+
+// first returns the store revision from the first fetch
+func (rs readSet) first() int64 {
+ ret := int64(math.MaxInt64 - 1)
+ for _, resp := range rs {
+ if rev := resp.Header.Revision; rev < ret {
+ ret = rev
+ }
+ }
+ return ret
+}
+
+// cmps guards the txn from updates to read set
+func (rs readSet) cmps() []v3.Cmp {
+ cmps := make([]v3.Cmp, 0, len(rs))
+ for k, rk := range rs {
+ cmps = append(cmps, isKeyCurrent(k, rk))
+ }
+ return cmps
+}
+
+// writeSet maps each written key to its pending write.
+type writeSet map[string]stmPut
+
+// get returns the pending write for the first of keys present in ws, or nil.
+func (ws writeSet) get(keys ...string) *stmPut {
+ for _, key := range keys {
+ if wv, ok := ws[key]; ok {
+ return &wv
+ }
+ }
+ return nil
+}
+
+// cmps returns a cmp list testing no writes have happened past rev
+func (ws writeSet) cmps(rev int64) []v3.Cmp {
+ cmps := make([]v3.Cmp, 0, len(ws))
+ for key := range ws {
+ cmps = append(cmps, v3.Compare(v3.ModRevision(key), "<", rev))
+ }
+ return cmps
+}
+
+// puts is the list of ops for all pending writes
+func (ws writeSet) puts() []v3.Op {
+ puts := make([]v3.Op, 0, len(ws))
+ for _, v := range ws {
+ puts = append(puts, v.op)
+ }
+ return puts
+}
+
+func (s *stm) Get(keys ...string) string {
+ if wv := s.wset.get(keys...); wv != nil {
+ return wv.val
+ }
+ return respToValue(s.fetch(keys...))
+}
+
+func (s *stm) Put(key, val string, opts ...v3.OpOption) {
+ s.wset[key] = stmPut{val, v3.OpPut(key, val, opts...)}
+}
+
+func (s *stm) Del(key string) { s.wset[key] = stmPut{"", v3.OpDelete(key)} }
+
+func (s *stm) Rev(key string) int64 {
+ if resp := s.fetch(key); resp != nil && len(resp.Kvs) != 0 {
+ return resp.Kvs[0].ModRevision
+ }
+ return 0
+}
+
+// commit applies the write set if all conflict checks hold; nil means retry.
+func (s *stm) commit() *v3.TxnResponse {
+ txnresp, err := s.client.Txn(s.ctx).If(s.conflicts()...).Then(s.wset.puts()...).Commit()
+ if err != nil {
+ panic(stmError{err})
+ }
+ if txnresp.Succeeded {
+ return txnresp
+ }
+ return nil
+}
+
+// fetch returns the cached response for the first key already in the read set;
+// otherwise it reads all keys in one txn, caches them, and returns the first.
+func (s *stm) fetch(keys ...string) *v3.GetResponse {
+ if len(keys) == 0 {
+ return nil
+ }
+ ops := make([]v3.Op, len(keys))
+ for i, key := range keys {
+ if resp, ok := s.rset[key]; ok {
+ return resp
+ }
+ ops[i] = v3.OpGet(key, s.getOpts...)
+ }
+ txnresp, err := s.client.Txn(s.ctx).Then(ops...).Commit()
+ if err != nil {
+ panic(stmError{err})
+ }
+ s.rset.add(keys, txnresp)
+ return (*v3.GetResponse)(txnresp.Responses[0].GetResponseRange())
+}
+
+// reset clears the read and write sets before a new attempt.
+func (s *stm) reset() {
+ s.rset = make(map[string]*v3.GetResponse)
+ s.wset = make(map[string]stmPut)
+}
+
+// stmSerializable is an stm that pins reads to the revision of the first read
+// and reuses values prefetched when a commit attempt fails.
+type stmSerializable struct {
+ stm
+ prefetch map[string]*v3.GetResponse
+}
+
+// Get reads keys at the revision of the transaction's first read, first
+// moving any values prefetched by a failed commit into the read set.
+func (s *stmSerializable) Get(keys ...string) string {
+ if wv := s.wset.get(keys...); wv != nil {
+ return wv.val
+ }
+ firstRead := len(s.rset) == 0
+ for _, key := range keys {
+ if resp, ok := s.prefetch[key]; ok {
+ delete(s.prefetch, key)
+ s.rset[key] = resp
+ }
+ }
+ resp := s.stm.fetch(keys...)
+ // resp is nil when no keys were given; that must not pin a revision
+ if firstRead && resp != nil {
+ // txn's base revision is defined by the first read
+ s.getOpts = []v3.OpOption{
+ v3.WithRev(resp.Header.Revision),
+ v3.WithSerializable(),
+ }
+ }
+ return respToValue(resp)
+}
+
+func (s *stmSerializable) Rev(key string) int64 {
+ s.Get(key)
+ return s.stm.Rev(key)
+}
+
+// gets returns the read-set keys with matching Get ops, for prefetching.
+func (s *stmSerializable) gets() ([]string, []v3.Op) {
+ keys := make([]string, 0, len(s.rset))
+ ops := make([]v3.Op, 0, len(s.rset))
+ for k := range s.rset {
+ keys = append(keys, k)
+ ops = append(ops, v3.OpGet(k))
+ }
+ return keys, ops
+}
+
+// commit is like stm.commit but, on conflict, re-reads the read set in the
+// same txn so the next attempt can start from those prefetched values.
+func (s *stmSerializable) commit() *v3.TxnResponse {
+ keys, getops := s.gets()
+ txn := s.client.Txn(s.ctx).If(s.conflicts()...).Then(s.wset.puts()...)
+ // use Else to prefetch keys in case of conflict to save a round trip
+ txnresp, err := txn.Else(getops...).Commit()
+ if err != nil {
+ panic(stmError{err})
+ }
+ if txnresp.Succeeded {
+ return txnresp
+ }
+ // load prefetch with Else data
+ s.rset.add(keys, txnresp)
+ s.prefetch = s.rset
+ s.getOpts = nil
+ return nil
+}
+
+// isKeyCurrent compares k's mod revision with the one recorded in r (0 if absent).
+func isKeyCurrent(k string, r *v3.GetResponse) v3.Cmp {
+ if len(r.Kvs) != 0 {
+ return v3.Compare(v3.ModRevision(k), "=", r.Kvs[0].ModRevision)
+ }
+ return v3.Compare(v3.ModRevision(k), "=", 0)
+}
+
+// respToValue returns the first value in resp, or "" if there is none.
+func respToValue(resp *v3.GetResponse) string {
+ if resp == nil || len(resp.Kvs) == 0 {
+ return ""
+ }
+ return string(resp.Kvs[0].Value)
+}
+
+// NewSTMRepeatable is deprecated.
+//
+// Deprecated: use NewSTM with WithAbortContext(ctx) and WithIsolation(RepeatableReads).
+func NewSTMRepeatable(ctx context.Context, c *v3.Client, apply func(STM) error) (*v3.TxnResponse, error) {
+ return NewSTM(c, apply, WithAbortContext(ctx), WithIsolation(RepeatableReads))
+}
+
+// NewSTMSerializable is deprecated.
+//
+// Deprecated: use NewSTM with WithAbortContext(ctx) and WithIsolation(Serializable).
+func NewSTMSerializable(ctx context.Context, c *v3.Client, apply func(STM) error) (*v3.TxnResponse, error) {
+ return NewSTM(c, apply, WithAbortContext(ctx), WithIsolation(Serializable))
+}
+
+// NewSTMReadCommitted is deprecated.
+//
+// Deprecated: use NewSTM with WithAbortContext(ctx) and WithIsolation(ReadCommitted).
+func NewSTMReadCommitted(ctx context.Context, c *v3.Client, apply func(STM) error) (*v3.TxnResponse, error) {
+ return NewSTM(c, apply, WithAbortContext(ctx), WithIsolation(ReadCommitted))
+}