blob: c39702ec47c16eee1038e85345646f61f93668a5 [file] [log] [blame]
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package balancer
16
17import (
18 "fmt"
19 "strconv"
20 "sync"
21 "time"
22
23 "go.etcd.io/etcd/clientv3/balancer/picker"
24
25 "go.uber.org/zap"
26 "google.golang.org/grpc/balancer"
27 "google.golang.org/grpc/connectivity"
28 "google.golang.org/grpc/resolver"
29 _ "google.golang.org/grpc/resolver/dns" // register DNS resolver
30 _ "google.golang.org/grpc/resolver/passthrough" // register passthrough resolver
31)
32
33// RegisterBuilder creates and registers a builder. Since this function calls balancer.Register, it
34// must be invoked at initialization time.
35func RegisterBuilder(cfg Config) {
36 bb := &builder{cfg}
37 balancer.Register(bb)
38
39 bb.cfg.Logger.Debug(
40 "registered balancer",
41 zap.String("policy", bb.cfg.Policy.String()),
42 zap.String("name", bb.cfg.Name),
43 )
44}
45
46type builder struct {
47 cfg Config
48}
49
50// Build is called initially when creating "ccBalancerWrapper".
51// "grpc.Dial" is called to this client connection.
52// Then, resolved addresses will be handled via "HandleResolvedAddrs".
53func (b *builder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
54 bb := &baseBalancer{
55 id: strconv.FormatInt(time.Now().UnixNano(), 36),
56 policy: b.cfg.Policy,
Abhilash S.L3b494632019-07-16 15:51:09 +053057 name: b.cfg.Name,
William Kurkianea869482019-04-09 15:16:11 -040058 lg: b.cfg.Logger,
59
60 addrToSc: make(map[resolver.Address]balancer.SubConn),
61 scToAddr: make(map[balancer.SubConn]resolver.Address),
62 scToSt: make(map[balancer.SubConn]connectivity.State),
63
64 currentConn: nil,
65 csEvltr: &connectivityStateEvaluator{},
66
67 // initialize picker always returns "ErrNoSubConnAvailable"
68 Picker: picker.NewErr(balancer.ErrNoSubConnAvailable),
69 }
William Kurkianea869482019-04-09 15:16:11 -040070 if bb.lg == nil {
71 bb.lg = zap.NewNop()
72 }
73
74 // TODO: support multiple connections
75 bb.mu.Lock()
76 bb.currentConn = cc
77 bb.mu.Unlock()
78
79 bb.lg.Info(
80 "built balancer",
81 zap.String("balancer-id", bb.id),
82 zap.String("policy", bb.policy.String()),
83 zap.String("resolver-target", cc.Target()),
84 )
85 return bb
86}
87
88// Name implements "grpc/balancer.Builder" interface.
89func (b *builder) Name() string { return b.cfg.Name }
90
91// Balancer defines client balancer interface.
92type Balancer interface {
93 // Balancer is called on specified client connection. Client initiates gRPC
94 // connection with "grpc.Dial(addr, grpc.WithBalancerName)", and then those resolved
95 // addresses are passed to "grpc/balancer.Balancer.HandleResolvedAddrs".
96 // For each resolved address, balancer calls "balancer.ClientConn.NewSubConn".
97 // "grpc/balancer.Balancer.HandleSubConnStateChange" is called when connectivity state
98 // changes, thus requires failover logic in this method.
99 balancer.Balancer
100
101 // Picker calls "Pick" for every client request.
102 picker.Picker
103}
104
105type baseBalancer struct {
106 id string
107 policy picker.Policy
108 name string
109 lg *zap.Logger
110
111 mu sync.RWMutex
112
113 addrToSc map[resolver.Address]balancer.SubConn
114 scToAddr map[balancer.SubConn]resolver.Address
115 scToSt map[balancer.SubConn]connectivity.State
116
117 currentConn balancer.ClientConn
118 currentState connectivity.State
119 csEvltr *connectivityStateEvaluator
120
121 picker.Picker
122}
123
124// HandleResolvedAddrs implements "grpc/balancer.Balancer" interface.
125// gRPC sends initial or updated resolved addresses from "Build".
126func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
127 if err != nil {
128 bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err))
129 return
130 }
131 bb.lg.Info("resolved", zap.String("balancer-id", bb.id), zap.Strings("addresses", addrsToStrings(addrs)))
132
133 bb.mu.Lock()
134 defer bb.mu.Unlock()
135
136 resolved := make(map[resolver.Address]struct{})
137 for _, addr := range addrs {
138 resolved[addr] = struct{}{}
139 if _, ok := bb.addrToSc[addr]; !ok {
140 sc, err := bb.currentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
141 if err != nil {
142 bb.lg.Warn("NewSubConn failed", zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr))
143 continue
144 }
145 bb.addrToSc[addr] = sc
146 bb.scToAddr[sc] = addr
147 bb.scToSt[sc] = connectivity.Idle
148 sc.Connect()
149 }
150 }
151
152 for addr, sc := range bb.addrToSc {
153 if _, ok := resolved[addr]; !ok {
154 // was removed by resolver or failed to create subconn
155 bb.currentConn.RemoveSubConn(sc)
156 delete(bb.addrToSc, addr)
157
158 bb.lg.Info(
159 "removed subconn",
160 zap.String("balancer-id", bb.id),
161 zap.String("address", addr.Addr),
162 zap.String("subconn", scToString(sc)),
163 )
164
165 // Keep the state of this sc in bb.scToSt until sc's state becomes Shutdown.
166 // The entry will be deleted in HandleSubConnStateChange.
167 // (DO NOT) delete(bb.scToAddr, sc)
168 // (DO NOT) delete(bb.scToSt, sc)
169 }
170 }
171}
172
173// HandleSubConnStateChange implements "grpc/balancer.Balancer" interface.
174func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
175 bb.mu.Lock()
176 defer bb.mu.Unlock()
177
178 old, ok := bb.scToSt[sc]
179 if !ok {
180 bb.lg.Warn(
181 "state change for an unknown subconn",
182 zap.String("balancer-id", bb.id),
183 zap.String("subconn", scToString(sc)),
184 zap.String("state", s.String()),
185 )
186 return
187 }
188
189 bb.lg.Info(
190 "state changed",
191 zap.String("balancer-id", bb.id),
192 zap.Bool("connected", s == connectivity.Ready),
193 zap.String("subconn", scToString(sc)),
194 zap.String("address", bb.scToAddr[sc].Addr),
195 zap.String("old-state", old.String()),
196 zap.String("new-state", s.String()),
197 )
198
199 bb.scToSt[sc] = s
200 switch s {
201 case connectivity.Idle:
202 sc.Connect()
203 case connectivity.Shutdown:
204 // When an address was removed by resolver, b called RemoveSubConn but
205 // kept the sc's state in scToSt. Remove state for this sc here.
206 delete(bb.scToAddr, sc)
207 delete(bb.scToSt, sc)
208 }
209
210 oldAggrState := bb.currentState
211 bb.currentState = bb.csEvltr.recordTransition(old, s)
212
213 // Regenerate picker when one of the following happens:
214 // - this sc became ready from not-ready
215 // - this sc became not-ready from ready
216 // - the aggregated state of balancer became TransientFailure from non-TransientFailure
217 // - the aggregated state of balancer became non-TransientFailure from TransientFailure
218 if (s == connectivity.Ready) != (old == connectivity.Ready) ||
219 (bb.currentState == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
220 bb.regeneratePicker()
221 }
222
223 bb.currentConn.UpdateBalancerState(bb.currentState, bb.Picker)
William Kurkianea869482019-04-09 15:16:11 -0400224}
225
226func (bb *baseBalancer) regeneratePicker() {
227 if bb.currentState == connectivity.TransientFailure {
228 bb.lg.Info(
229 "generated transient error picker",
230 zap.String("balancer-id", bb.id),
231 zap.String("policy", bb.policy.String()),
232 )
233 bb.Picker = picker.NewErr(balancer.ErrTransientFailure)
234 return
235 }
236
237 // only pass ready subconns to picker
238 scs := make([]balancer.SubConn, 0)
239 addrToSc := make(map[resolver.Address]balancer.SubConn)
240 scToAddr := make(map[balancer.SubConn]resolver.Address)
241 for addr, sc := range bb.addrToSc {
242 if st, ok := bb.scToSt[sc]; ok && st == connectivity.Ready {
243 scs = append(scs, sc)
244 addrToSc[addr] = sc
245 scToAddr[sc] = addr
246 }
247 }
248
249 switch bb.policy {
250 case picker.RoundrobinBalanced:
251 bb.Picker = picker.NewRoundrobinBalanced(bb.lg, scs, addrToSc, scToAddr)
252
253 default:
254 panic(fmt.Errorf("invalid balancer picker policy (%d)", bb.policy))
255 }
256
257 bb.lg.Info(
258 "generated picker",
259 zap.String("balancer-id", bb.id),
260 zap.String("policy", bb.policy.String()),
261 zap.Strings("subconn-ready", scsToStrings(addrToSc)),
262 zap.Int("subconn-size", len(addrToSc)),
263 )
264}
265
266// Close implements "grpc/balancer.Balancer" interface.
267// Close is a nop because base balancer doesn't have internal state to clean up,
268// and it doesn't need to call RemoveSubConn for the SubConns.
269func (bb *baseBalancer) Close() {
270 // TODO
271}