blob: e18a0ed4ad92d0213c6aff6da57f21fd800afea4 [file] [log] [blame]
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package concurrency
16
17import (
18 "context"
19 "errors"
20 "fmt"
21
22 v3 "github.com/coreos/etcd/clientv3"
23 pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
24 "github.com/coreos/etcd/mvcc/mvccpb"
25)
26
// Sentinel errors reported by Election methods.
var (
	// ErrElectionNotLeader is returned by Proclaim when this Election holds
	// no leader session, or when the leader key no longer has the create
	// revision this Election recorded for it.
	ErrElectionNotLeader = errors.New("election: not leader")

	// ErrElectionNoLeader is returned by Leader when no key exists under
	// the election prefix.
	ErrElectionNoLeader = errors.New("election: no leader")
)
31
32type Election struct {
33 session *Session
34
35 keyPrefix string
36
37 leaderKey string
38 leaderRev int64
39 leaderSession *Session
40 hdr *pb.ResponseHeader
41}
42
43// NewElection returns a new election on a given key prefix.
44func NewElection(s *Session, pfx string) *Election {
45 return &Election{session: s, keyPrefix: pfx + "/"}
46}
47
48// ResumeElection initializes an election with a known leader.
49func ResumeElection(s *Session, pfx string, leaderKey string, leaderRev int64) *Election {
50 return &Election{
51 session: s,
52 leaderKey: leaderKey,
53 leaderRev: leaderRev,
54 leaderSession: s,
55 }
56}
57
58// Campaign puts a value as eligible for the election. It blocks until
59// it is elected, an error occurs, or the context is cancelled.
60func (e *Election) Campaign(ctx context.Context, val string) error {
61 s := e.session
62 client := e.session.Client()
63
64 k := fmt.Sprintf("%s%x", e.keyPrefix, s.Lease())
65 txn := client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0))
66 txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease())))
67 txn = txn.Else(v3.OpGet(k))
68 resp, err := txn.Commit()
69 if err != nil {
70 return err
71 }
72 e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s
73 if !resp.Succeeded {
74 kv := resp.Responses[0].GetResponseRange().Kvs[0]
75 e.leaderRev = kv.CreateRevision
76 if string(kv.Value) != val {
77 if err = e.Proclaim(ctx, val); err != nil {
78 e.Resign(ctx)
79 return err
80 }
81 }
82 }
83
84 _, err = waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1)
85 if err != nil {
86 // clean up in case of context cancel
87 select {
88 case <-ctx.Done():
89 e.Resign(client.Ctx())
90 default:
91 e.leaderSession = nil
92 }
93 return err
94 }
95 e.hdr = resp.Header
96
97 return nil
98}
99
100// Proclaim lets the leader announce a new value without another election.
101func (e *Election) Proclaim(ctx context.Context, val string) error {
102 if e.leaderSession == nil {
103 return ErrElectionNotLeader
104 }
105 client := e.session.Client()
106 cmp := v3.Compare(v3.CreateRevision(e.leaderKey), "=", e.leaderRev)
107 txn := client.Txn(ctx).If(cmp)
108 txn = txn.Then(v3.OpPut(e.leaderKey, val, v3.WithLease(e.leaderSession.Lease())))
109 tresp, terr := txn.Commit()
110 if terr != nil {
111 return terr
112 }
113 if !tresp.Succeeded {
114 e.leaderKey = ""
115 return ErrElectionNotLeader
116 }
117
118 e.hdr = tresp.Header
119 return nil
120}
121
122// Resign lets a leader start a new election.
123func (e *Election) Resign(ctx context.Context) (err error) {
124 if e.leaderSession == nil {
125 return nil
126 }
127 client := e.session.Client()
128 cmp := v3.Compare(v3.CreateRevision(e.leaderKey), "=", e.leaderRev)
129 resp, err := client.Txn(ctx).If(cmp).Then(v3.OpDelete(e.leaderKey)).Commit()
130 if err == nil {
131 e.hdr = resp.Header
132 }
133 e.leaderKey = ""
134 e.leaderSession = nil
135 return err
136}
137
138// Leader returns the leader value for the current election.
139func (e *Election) Leader(ctx context.Context) (*v3.GetResponse, error) {
140 client := e.session.Client()
141 resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)
142 if err != nil {
143 return nil, err
144 } else if len(resp.Kvs) == 0 {
145 // no leader currently elected
146 return nil, ErrElectionNoLeader
147 }
148 return resp, nil
149}
150
151// Observe returns a channel that reliably observes ordered leader proposals
152// as GetResponse values on every current elected leader key. It will not
153// necessarily fetch all historical leader updates, but will always post the
154// most recent leader value.
155//
156// The channel closes when the context is canceled or the underlying watcher
157// is otherwise disrupted.
158func (e *Election) Observe(ctx context.Context) <-chan v3.GetResponse {
159 retc := make(chan v3.GetResponse)
160 go e.observe(ctx, retc)
161 return retc
162}
163
164func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
165 client := e.session.Client()
166
167 defer close(ch)
168 for {
169 resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)
170 if err != nil {
171 return
172 }
173
174 var kv *mvccpb.KeyValue
175 var hdr *pb.ResponseHeader
176
177 if len(resp.Kvs) == 0 {
178 cctx, cancel := context.WithCancel(ctx)
179 // wait for first key put on prefix
180 opts := []v3.OpOption{v3.WithRev(resp.Header.Revision), v3.WithPrefix()}
181 wch := client.Watch(cctx, e.keyPrefix, opts...)
182 for kv == nil {
183 wr, ok := <-wch
184 if !ok || wr.Err() != nil {
185 cancel()
186 return
187 }
188 // only accept puts; a delete will make observe() spin
189 for _, ev := range wr.Events {
190 if ev.Type == mvccpb.PUT {
191 hdr, kv = &wr.Header, ev.Kv
192 // may have multiple revs; hdr.rev = the last rev
193 // set to kv's rev in case batch has multiple Puts
194 hdr.Revision = kv.ModRevision
195 break
196 }
197 }
198 }
199 cancel()
200 } else {
201 hdr, kv = resp.Header, resp.Kvs[0]
202 }
203
204 select {
205 case ch <- v3.GetResponse{Header: hdr, Kvs: []*mvccpb.KeyValue{kv}}:
206 case <-ctx.Done():
207 return
208 }
209
210 cctx, cancel := context.WithCancel(ctx)
211 wch := client.Watch(cctx, string(kv.Key), v3.WithRev(hdr.Revision+1))
212 keyDeleted := false
213 for !keyDeleted {
214 wr, ok := <-wch
215 if !ok {
216 cancel()
217 return
218 }
219 for _, ev := range wr.Events {
220 if ev.Type == mvccpb.DELETE {
221 keyDeleted = true
222 break
223 }
224 resp.Header = &wr.Header
225 resp.Kvs = []*mvccpb.KeyValue{ev.Kv}
226 select {
227 case ch <- *resp:
228 case <-cctx.Done():
229 cancel()
230 return
231 }
232 }
233 }
234 cancel()
235 }
236}
237
238// Key returns the leader key if elected, empty string otherwise.
239func (e *Election) Key() string { return e.leaderKey }
240
241// Rev returns the leader key's creation revision, if elected.
242func (e *Election) Rev() int64 { return e.leaderRev }
243
244// Header is the response header from the last successful election proposal.
245func (e *Election) Header() *pb.ResponseHeader { return e.hdr }