// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package concurrency
16
17import (
18 "context"
19 "errors"
20 "fmt"
21
22 v3 "github.com/coreos/etcd/clientv3"
23 pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
24 "github.com/coreos/etcd/mvcc/mvccpb"
25)
26
var (
	// ErrElectionNotLeader is returned by Proclaim when this Election holds
	// no leader session, or when the leader key's create revision no longer
	// matches the revision recorded for this candidate.
	ErrElectionNotLeader = errors.New("election: not leader")
	// ErrElectionNoLeader is returned by Leader when no key exists under the
	// election prefix.
	ErrElectionNoLeader = errors.New("election: no leader")
)
31
// Election holds one participant's state for a leader election that is
// carried out on keys sharing a common prefix.
type Election struct {
	// session supplies the client used for all requests and the lease that
	// Campaign attaches to the candidate key.
	session *Session

	// keyPrefix is the prefix under which candidate keys are created and
	// under which Leader and Observe look for the current leader.
	keyPrefix string

	// leaderKey is the key written by Campaign or supplied to ResumeElection.
	// It is reset to "" by Resign and by a Proclaim whose comparison fails.
	leaderKey string
	// leaderRev is the create revision leaderKey is expected to have; it
	// guards the transactions issued by Proclaim and Resign.
	leaderRev int64
	// leaderSession is the session whose lease is attached to leaderKey.
	// When nil, Proclaim returns ErrElectionNotLeader and Resign is a no-op.
	leaderSession *Session
	// hdr is the response header saved by the last successful Campaign,
	// Proclaim, or Resign.
	hdr *pb.ResponseHeader
}
42
43// NewElection returns a new election on a given key prefix.
44func NewElection(s *Session, pfx string) *Election {
45 return &Election{session: s, keyPrefix: pfx + "/"}
46}
47
48// ResumeElection initializes an election with a known leader.
49func ResumeElection(s *Session, pfx string, leaderKey string, leaderRev int64) *Election {
50 return &Election{
51 session: s,
52 keyPrefix: pfx,
53 leaderKey: leaderKey,
54 leaderRev: leaderRev,
55 leaderSession: s,
56 }
57}
58
59// Campaign puts a value as eligible for the election. It blocks until
60// it is elected, an error occurs, or the context is cancelled.
61func (e *Election) Campaign(ctx context.Context, val string) error {
62 s := e.session
63 client := e.session.Client()
64
65 k := fmt.Sprintf("%s%x", e.keyPrefix, s.Lease())
66 txn := client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0))
67 txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease())))
68 txn = txn.Else(v3.OpGet(k))
69 resp, err := txn.Commit()
70 if err != nil {
71 return err
72 }
73 e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s
74 if !resp.Succeeded {
75 kv := resp.Responses[0].GetResponseRange().Kvs[0]
76 e.leaderRev = kv.CreateRevision
77 if string(kv.Value) != val {
78 if err = e.Proclaim(ctx, val); err != nil {
79 e.Resign(ctx)
80 return err
81 }
82 }
83 }
84
85 _, err = waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1)
86 if err != nil {
87 // clean up in case of context cancel
88 select {
89 case <-ctx.Done():
90 e.Resign(client.Ctx())
91 default:
92 e.leaderSession = nil
93 }
94 return err
95 }
96 e.hdr = resp.Header
97
98 return nil
99}
100
101// Proclaim lets the leader announce a new value without another election.
102func (e *Election) Proclaim(ctx context.Context, val string) error {
103 if e.leaderSession == nil {
104 return ErrElectionNotLeader
105 }
106 client := e.session.Client()
107 cmp := v3.Compare(v3.CreateRevision(e.leaderKey), "=", e.leaderRev)
108 txn := client.Txn(ctx).If(cmp)
109 txn = txn.Then(v3.OpPut(e.leaderKey, val, v3.WithLease(e.leaderSession.Lease())))
110 tresp, terr := txn.Commit()
111 if terr != nil {
112 return terr
113 }
114 if !tresp.Succeeded {
115 e.leaderKey = ""
116 return ErrElectionNotLeader
117 }
118
119 e.hdr = tresp.Header
120 return nil
121}
122
123// Resign lets a leader start a new election.
124func (e *Election) Resign(ctx context.Context) (err error) {
125 if e.leaderSession == nil {
126 return nil
127 }
128 client := e.session.Client()
129 cmp := v3.Compare(v3.CreateRevision(e.leaderKey), "=", e.leaderRev)
130 resp, err := client.Txn(ctx).If(cmp).Then(v3.OpDelete(e.leaderKey)).Commit()
131 if err == nil {
132 e.hdr = resp.Header
133 }
134 e.leaderKey = ""
135 e.leaderSession = nil
136 return err
137}
138
139// Leader returns the leader value for the current election.
140func (e *Election) Leader(ctx context.Context) (*v3.GetResponse, error) {
141 client := e.session.Client()
142 resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)
143 if err != nil {
144 return nil, err
145 } else if len(resp.Kvs) == 0 {
146 // no leader currently elected
147 return nil, ErrElectionNoLeader
148 }
149 return resp, nil
150}
151
152// Observe returns a channel that reliably observes ordered leader proposals
153// as GetResponse values on every current elected leader key. It will not
154// necessarily fetch all historical leader updates, but will always post the
155// most recent leader value.
156//
157// The channel closes when the context is canceled or the underlying watcher
158// is otherwise disrupted.
159func (e *Election) Observe(ctx context.Context) <-chan v3.GetResponse {
160 retc := make(chan v3.GetResponse)
161 go e.observe(ctx, retc)
162 return retc
163}
164
// observe is the goroutine body behind Observe. It repeatedly determines the
// current leader key, posts it on ch, and then forwards every further update
// of that key until the key is deleted, at which point it starts over. ch is
// closed when observe returns, which happens on any Get error, on a closed or
// failed watch channel, or when ctx is done while a send is pending.
func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
	client := e.session.Client()

	defer close(ch)
	for {
		// Look up the current leader: the key under the prefix selected by
		// the WithFirstCreate options.
		resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)
		if err != nil {
			return
		}

		var kv *mvccpb.KeyValue
		var hdr *pb.ResponseHeader

		if len(resp.Kvs) == 0 {
			// No leader yet. Watch the whole prefix, starting at the revision
			// the Get was served at, under a child context so this watch can
			// be torn down independently of ctx.
			cctx, cancel := context.WithCancel(ctx)
			// wait for first key put on prefix
			opts := []v3.OpOption{v3.WithRev(resp.Header.Revision), v3.WithPrefix()}
			wch := client.Watch(cctx, e.keyPrefix, opts...)
			for kv == nil {
				wr, ok := <-wch
				if !ok || wr.Err() != nil {
					cancel()
					return
				}
				// only accept puts; a delete will make observe() spin
				for _, ev := range wr.Events {
					if ev.Type == mvccpb.PUT {
						hdr, kv = &wr.Header, ev.Kv
						// may have multiple revs; hdr.rev = the last rev
						// set to kv's rev in case batch has multiple Puts
						hdr.Revision = kv.ModRevision
						break
					}
				}
			}
			cancel()
		} else {
			hdr, kv = resp.Header, resp.Kvs[0]
		}

		// Post the leader that was just found, unless ctx ends first.
		select {
		case ch <- v3.GetResponse{Header: hdr, Kvs: []*mvccpb.KeyValue{kv}}:
		case <-ctx.Done():
			return
		}

		// Follow the leader key from the revision right after the one that
		// was just reported.
		cctx, cancel := context.WithCancel(ctx)
		wch := client.Watch(cctx, string(kv.Key), v3.WithRev(hdr.Revision+1))
		keyDeleted := false
		for !keyDeleted {
			wr, ok := <-wch
			if !ok {
				cancel()
				return
			}
			for _, ev := range wr.Events {
				if ev.Type == mvccpb.DELETE {
					// The leader key is gone. Stop processing this batch and
					// let the outer loop look up the next leader.
					keyDeleted = true
					break
				}
				// Reuse resp as the carrier for the forwarded update; the
				// value sent on ch is a copy of it.
				resp.Header = &wr.Header
				resp.Kvs = []*mvccpb.KeyValue{ev.Kv}
				select {
				case ch <- *resp:
				case <-cctx.Done():
					cancel()
					return
				}
			}
		}
		cancel()
	}
}
238
239// Key returns the leader key if elected, empty string otherwise.
240func (e *Election) Key() string { return e.leaderKey }
241
242// Rev returns the leader key's creation revision, if elected.
243func (e *Election) Rev() int64 { return e.leaderRev }
244
245// Header is the response header from the last successful election proposal.
246func (e *Election) Header() *pb.ResponseHeader { return e.hdr }