blob: f5a3be3b23984121a03ba53cf29ff8de7eb54e8b [file] [log] [blame]
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package v3election
16
17import (
18 "context"
19 "errors"
20
21 "go.etcd.io/etcd/clientv3"
22 "go.etcd.io/etcd/clientv3/concurrency"
23 epb "go.etcd.io/etcd/etcdserver/api/v3election/v3electionpb"
24)
25
// ErrMissingLeaderKey is the error reported for an election API request
// whose "leader" field has not been set.
var ErrMissingLeaderKey = errors.New(`"leader" field must be provided`)
29
30type electionServer struct {
31 c *clientv3.Client
32}
33
34func NewElectionServer(c *clientv3.Client) epb.ElectionServer {
35 return &electionServer{c}
36}
37
38func (es *electionServer) Campaign(ctx context.Context, req *epb.CampaignRequest) (*epb.CampaignResponse, error) {
39 s, err := es.session(ctx, req.Lease)
40 if err != nil {
41 return nil, err
42 }
43 e := concurrency.NewElection(s, string(req.Name))
44 if err = e.Campaign(ctx, string(req.Value)); err != nil {
45 return nil, err
46 }
47 return &epb.CampaignResponse{
48 Header: e.Header(),
49 Leader: &epb.LeaderKey{
50 Name: req.Name,
51 Key: []byte(e.Key()),
52 Rev: e.Rev(),
53 Lease: int64(s.Lease()),
54 },
55 }, nil
56}
57
58func (es *electionServer) Proclaim(ctx context.Context, req *epb.ProclaimRequest) (*epb.ProclaimResponse, error) {
59 if req.Leader == nil {
60 return nil, ErrMissingLeaderKey
61 }
62 s, err := es.session(ctx, req.Leader.Lease)
63 if err != nil {
64 return nil, err
65 }
66 e := concurrency.ResumeElection(s, string(req.Leader.Name), string(req.Leader.Key), req.Leader.Rev)
67 if err := e.Proclaim(ctx, string(req.Value)); err != nil {
68 return nil, err
69 }
70 return &epb.ProclaimResponse{Header: e.Header()}, nil
71}
72
73func (es *electionServer) Observe(req *epb.LeaderRequest, stream epb.Election_ObserveServer) error {
74 s, err := es.session(stream.Context(), -1)
75 if err != nil {
76 return err
77 }
78 e := concurrency.NewElection(s, string(req.Name))
79 ch := e.Observe(stream.Context())
80 for stream.Context().Err() == nil {
81 select {
82 case <-stream.Context().Done():
83 case resp, ok := <-ch:
84 if !ok {
85 return nil
86 }
87 lresp := &epb.LeaderResponse{Header: resp.Header, Kv: resp.Kvs[0]}
88 if err := stream.Send(lresp); err != nil {
89 return err
90 }
91 }
92 }
93 return stream.Context().Err()
94}
95
96func (es *electionServer) Leader(ctx context.Context, req *epb.LeaderRequest) (*epb.LeaderResponse, error) {
97 s, err := es.session(ctx, -1)
98 if err != nil {
99 return nil, err
100 }
101 l, lerr := concurrency.NewElection(s, string(req.Name)).Leader(ctx)
102 if lerr != nil {
103 return nil, lerr
104 }
105 return &epb.LeaderResponse{Header: l.Header, Kv: l.Kvs[0]}, nil
106}
107
108func (es *electionServer) Resign(ctx context.Context, req *epb.ResignRequest) (*epb.ResignResponse, error) {
109 if req.Leader == nil {
110 return nil, ErrMissingLeaderKey
111 }
112 s, err := es.session(ctx, req.Leader.Lease)
113 if err != nil {
114 return nil, err
115 }
116 e := concurrency.ResumeElection(s, string(req.Leader.Name), string(req.Leader.Key), req.Leader.Rev)
117 if err := e.Resign(ctx); err != nil {
118 return nil, err
119 }
120 return &epb.ResignResponse{Header: e.Header()}, nil
121}
122
123func (es *electionServer) session(ctx context.Context, lease int64) (*concurrency.Session, error) {
124 s, err := concurrency.NewSession(
125 es.c,
126 concurrency.WithLease(clientv3.LeaseID(lease)),
127 concurrency.WithContext(ctx),
128 )
129 if err != nil {
130 return nil, err
131 }
132 s.Orphan()
133 return s, nil
134}