// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package concurrency
16
17import (
18 "context"
19 "errors"
20 "fmt"
21
22 v3 "go.etcd.io/etcd/clientv3"
23 pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
24 "go.etcd.io/etcd/mvcc/mvccpb"
25)
26
// Sentinel errors reported by Election methods.
var (
	// ErrElectionNoLeader is returned by Leader when no key exists under
	// the election prefix.
	ErrElectionNoLeader = errors.New("election: no leader")

	// ErrElectionNotLeader is returned by Proclaim when this Election does
	// not hold leadership, or when its leader key is no longer the one it
	// created.
	ErrElectionNotLeader = errors.New("election: not leader")
)
31
32type Election struct {
33 session *Session
34
35 keyPrefix string
36
37 leaderKey string
38 leaderRev int64
39 leaderSession *Session
40 hdr *pb.ResponseHeader
41}
42
43// NewElection returns a new election on a given key prefix.
44func NewElection(s *Session, pfx string) *Election {
45 return &Election{session: s, keyPrefix: pfx + "/"}
46}
47
48// ResumeElection initializes an election with a known leader.
49func ResumeElection(s *Session, pfx string, leaderKey string, leaderRev int64) *Election {
50 return &Election{
51 keyPrefix: pfx,
52 session: s,
53 leaderKey: leaderKey,
54 leaderRev: leaderRev,
55 leaderSession: s,
56 }
57}
58
59// Campaign puts a value as eligible for the election on the prefix
60// key.
61// Multiple sessions can participate in the election for the
62// same prefix, but only one can be the leader at a time.
63//
64// If the context is 'context.TODO()/context.Background()', the Campaign
65// will continue to be blocked for other keys to be deleted, unless server
66// returns a non-recoverable error (e.g. ErrCompacted).
67// Otherwise, until the context is not cancelled or timed-out, Campaign will
68// continue to be blocked until it becomes the leader.
69func (e *Election) Campaign(ctx context.Context, val string) error {
70 s := e.session
71 client := e.session.Client()
72
73 k := fmt.Sprintf("%s%x", e.keyPrefix, s.Lease())
74 txn := client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0))
75 txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease())))
76 txn = txn.Else(v3.OpGet(k))
77 resp, err := txn.Commit()
78 if err != nil {
79 return err
80 }
81 e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s
82 if !resp.Succeeded {
83 kv := resp.Responses[0].GetResponseRange().Kvs[0]
84 e.leaderRev = kv.CreateRevision
85 if string(kv.Value) != val {
86 if err = e.Proclaim(ctx, val); err != nil {
87 e.Resign(ctx)
88 return err
89 }
90 }
91 }
92
93 _, err = waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1)
94 if err != nil {
95 // clean up in case of context cancel
96 select {
97 case <-ctx.Done():
98 e.Resign(client.Ctx())
99 default:
100 e.leaderSession = nil
101 }
102 return err
103 }
104 e.hdr = resp.Header
105
106 return nil
107}
108
109// Proclaim lets the leader announce a new value without another election.
110func (e *Election) Proclaim(ctx context.Context, val string) error {
111 if e.leaderSession == nil {
112 return ErrElectionNotLeader
113 }
114 client := e.session.Client()
115 cmp := v3.Compare(v3.CreateRevision(e.leaderKey), "=", e.leaderRev)
116 txn := client.Txn(ctx).If(cmp)
117 txn = txn.Then(v3.OpPut(e.leaderKey, val, v3.WithLease(e.leaderSession.Lease())))
118 tresp, terr := txn.Commit()
119 if terr != nil {
120 return terr
121 }
122 if !tresp.Succeeded {
123 e.leaderKey = ""
124 return ErrElectionNotLeader
125 }
126
127 e.hdr = tresp.Header
128 return nil
129}
130
131// Resign lets a leader start a new election.
132func (e *Election) Resign(ctx context.Context) (err error) {
133 if e.leaderSession == nil {
134 return nil
135 }
136 client := e.session.Client()
137 cmp := v3.Compare(v3.CreateRevision(e.leaderKey), "=", e.leaderRev)
138 resp, err := client.Txn(ctx).If(cmp).Then(v3.OpDelete(e.leaderKey)).Commit()
139 if err == nil {
140 e.hdr = resp.Header
141 }
142 e.leaderKey = ""
143 e.leaderSession = nil
144 return err
145}
146
147// Leader returns the leader value for the current election.
148func (e *Election) Leader(ctx context.Context) (*v3.GetResponse, error) {
149 client := e.session.Client()
150 resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)
151 if err != nil {
152 return nil, err
153 } else if len(resp.Kvs) == 0 {
154 // no leader currently elected
155 return nil, ErrElectionNoLeader
156 }
157 return resp, nil
158}
159
160// Observe returns a channel that reliably observes ordered leader proposals
161// as GetResponse values on every current elected leader key. It will not
162// necessarily fetch all historical leader updates, but will always post the
163// most recent leader value.
164//
165// The channel closes when the context is canceled or the underlying watcher
166// is otherwise disrupted.
167func (e *Election) Observe(ctx context.Context) <-chan v3.GetResponse {
168 retc := make(chan v3.GetResponse)
169 go e.observe(ctx, retc)
170 return retc
171}
172
173func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
174 client := e.session.Client()
175
176 defer close(ch)
177 for {
178 resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)
179 if err != nil {
180 return
181 }
182
183 var kv *mvccpb.KeyValue
184 var hdr *pb.ResponseHeader
185
186 if len(resp.Kvs) == 0 {
187 cctx, cancel := context.WithCancel(ctx)
188 // wait for first key put on prefix
189 opts := []v3.OpOption{v3.WithRev(resp.Header.Revision), v3.WithPrefix()}
190 wch := client.Watch(cctx, e.keyPrefix, opts...)
191 for kv == nil {
192 wr, ok := <-wch
193 if !ok || wr.Err() != nil {
194 cancel()
195 return
196 }
197 // only accept puts; a delete will make observe() spin
198 for _, ev := range wr.Events {
199 if ev.Type == mvccpb.PUT {
200 hdr, kv = &wr.Header, ev.Kv
201 // may have multiple revs; hdr.rev = the last rev
202 // set to kv's rev in case batch has multiple Puts
203 hdr.Revision = kv.ModRevision
204 break
205 }
206 }
207 }
208 cancel()
209 } else {
210 hdr, kv = resp.Header, resp.Kvs[0]
211 }
212
213 select {
214 case ch <- v3.GetResponse{Header: hdr, Kvs: []*mvccpb.KeyValue{kv}}:
215 case <-ctx.Done():
216 return
217 }
218
219 cctx, cancel := context.WithCancel(ctx)
220 wch := client.Watch(cctx, string(kv.Key), v3.WithRev(hdr.Revision+1))
221 keyDeleted := false
222 for !keyDeleted {
223 wr, ok := <-wch
224 if !ok {
225 cancel()
226 return
227 }
228 for _, ev := range wr.Events {
229 if ev.Type == mvccpb.DELETE {
230 keyDeleted = true
231 break
232 }
233 resp.Header = &wr.Header
234 resp.Kvs = []*mvccpb.KeyValue{ev.Kv}
235 select {
236 case ch <- *resp:
237 case <-cctx.Done():
238 cancel()
239 return
240 }
241 }
242 }
243 cancel()
244 }
245}
246
247// Key returns the leader key if elected, empty string otherwise.
248func (e *Election) Key() string { return e.leaderKey }
249
250// Rev returns the leader key's creation revision, if elected.
251func (e *Election) Rev() int64 { return e.leaderRev }
252
253// Header is the response header from the last successful election proposal.
254func (e *Election) Header() *pb.ResponseHeader { return e.hdr }