blob: 042c949b70806bbcde16f203b39c18b00be7daf2 [file] [log] [blame]
khenaidooffe076b2019-01-15 16:08:08 -05001// Copyright 2017 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package grpcproxy
16
17import (
18 "context"
19 "math"
20 "sync"
21
22 "github.com/coreos/etcd/clientv3"
23 "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
24
25 "golang.org/x/time/rate"
26 "google.golang.org/grpc"
27)
28
29const (
30 lostLeaderKey = "__lostleader" // watched to detect leader loss
31 retryPerSecond = 10
32)
33
34type leader struct {
35 ctx context.Context
36 w clientv3.Watcher
37 // mu protects leaderc updates.
38 mu sync.RWMutex
39 leaderc chan struct{}
40 disconnc chan struct{}
41 donec chan struct{}
42}
43
44func newLeader(ctx context.Context, w clientv3.Watcher) *leader {
45 l := &leader{
46 ctx: clientv3.WithRequireLeader(ctx),
47 w: w,
48 leaderc: make(chan struct{}),
49 disconnc: make(chan struct{}),
50 donec: make(chan struct{}),
51 }
52 // begin assuming leader is lost
53 close(l.leaderc)
54 go l.recvLoop()
55 return l
56}
57
58func (l *leader) recvLoop() {
59 defer close(l.donec)
60
61 limiter := rate.NewLimiter(rate.Limit(retryPerSecond), retryPerSecond)
62 rev := int64(math.MaxInt64 - 2)
63 for limiter.Wait(l.ctx) == nil {
64 wch := l.w.Watch(l.ctx, lostLeaderKey, clientv3.WithRev(rev), clientv3.WithCreatedNotify())
65 cresp, ok := <-wch
66 if !ok {
67 l.loseLeader()
68 continue
69 }
70 if cresp.Err() != nil {
71 l.loseLeader()
72 if rpctypes.ErrorDesc(cresp.Err()) == grpc.ErrClientConnClosing.Error() {
73 close(l.disconnc)
74 return
75 }
76 continue
77 }
78 l.gotLeader()
79 <-wch
80 l.loseLeader()
81 }
82}
83
84func (l *leader) loseLeader() {
85 l.mu.RLock()
86 defer l.mu.RUnlock()
87 select {
88 case <-l.leaderc:
89 default:
90 close(l.leaderc)
91 }
92}
93
94// gotLeader will force update the leadership status to having a leader.
95func (l *leader) gotLeader() {
96 l.mu.Lock()
97 defer l.mu.Unlock()
98 select {
99 case <-l.leaderc:
100 l.leaderc = make(chan struct{})
101 default:
102 }
103}
104
105func (l *leader) disconnectNotify() <-chan struct{} { return l.disconnc }
106
107func (l *leader) stopNotify() <-chan struct{} { return l.donec }
108
109// lostNotify returns a channel that is closed if there has been
110// a leader loss not yet followed by a leader reacquire.
111func (l *leader) lostNotify() <-chan struct{} {
112 l.mu.RLock()
113 defer l.mu.RUnlock()
114 return l.leaderc
115}