blob: c3c2d2485693cc2e08eab84e5140be80ceb69939 [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package clientv3
16
17import (
18 "context"
19 "sync"
20
21 pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
22
23 "google.golang.org/grpc"
24)
25
26// Txn is the interface that wraps mini-transactions.
27//
28// Txn(context.TODO()).If(
29// Compare(Value(k1), ">", v1),
30// Compare(Version(k1), "=", 2)
31// ).Then(
32// OpPut(k2,v2), OpPut(k3,v3)
33// ).Else(
34// OpPut(k4,v4), OpPut(k5,v5)
35// ).Commit()
36//
37type Txn interface {
38 // If takes a list of comparison. If all comparisons passed in succeed,
39 // the operations passed into Then() will be executed. Or the operations
40 // passed into Else() will be executed.
41 If(cs ...Cmp) Txn
42
43 // Then takes a list of operations. The Ops list will be executed, if the
44 // comparisons passed in If() succeed.
45 Then(ops ...Op) Txn
46
47 // Else takes a list of operations. The Ops list will be executed, if the
48 // comparisons passed in If() fail.
49 Else(ops ...Op) Txn
50
51 // Commit tries to commit the transaction.
52 Commit() (*TxnResponse, error)
53}
54
55type txn struct {
56 kv *kv
57 ctx context.Context
58
59 mu sync.Mutex
60 cif bool
61 cthen bool
62 celse bool
63
64 isWrite bool
65
66 cmps []*pb.Compare
67
68 sus []*pb.RequestOp
69 fas []*pb.RequestOp
70
71 callOpts []grpc.CallOption
72}
73
74func (txn *txn) If(cs ...Cmp) Txn {
75 txn.mu.Lock()
76 defer txn.mu.Unlock()
77
78 if txn.cif {
79 panic("cannot call If twice!")
80 }
81
82 if txn.cthen {
83 panic("cannot call If after Then!")
84 }
85
86 if txn.celse {
87 panic("cannot call If after Else!")
88 }
89
90 txn.cif = true
91
92 for i := range cs {
93 txn.cmps = append(txn.cmps, (*pb.Compare)(&cs[i]))
94 }
95
96 return txn
97}
98
99func (txn *txn) Then(ops ...Op) Txn {
100 txn.mu.Lock()
101 defer txn.mu.Unlock()
102
103 if txn.cthen {
104 panic("cannot call Then twice!")
105 }
106 if txn.celse {
107 panic("cannot call Then after Else!")
108 }
109
110 txn.cthen = true
111
112 for _, op := range ops {
113 txn.isWrite = txn.isWrite || op.isWrite()
114 txn.sus = append(txn.sus, op.toRequestOp())
115 }
116
117 return txn
118}
119
120func (txn *txn) Else(ops ...Op) Txn {
121 txn.mu.Lock()
122 defer txn.mu.Unlock()
123
124 if txn.celse {
125 panic("cannot call Else twice!")
126 }
127
128 txn.celse = true
129
130 for _, op := range ops {
131 txn.isWrite = txn.isWrite || op.isWrite()
132 txn.fas = append(txn.fas, op.toRequestOp())
133 }
134
135 return txn
136}
137
138func (txn *txn) Commit() (*TxnResponse, error) {
139 txn.mu.Lock()
140 defer txn.mu.Unlock()
141
142 r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas}
143
144 var resp *pb.TxnResponse
145 var err error
146 resp, err = txn.kv.remote.Txn(txn.ctx, r, txn.callOpts...)
147 if err != nil {
148 return nil, toErr(txn.ctx, err)
149 }
150 return (*TxnResponse)(resp), nil
151}