// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package balancer implements client balancer.
16package balancer
17
18import (
19 "strconv"
20 "sync"
21 "time"
22
23 "github.com/coreos/etcd/clientv3/balancer/connectivity"
24 "github.com/coreos/etcd/clientv3/balancer/picker"
25
26 "go.uber.org/zap"
27 "google.golang.org/grpc/balancer"
28 grpcconnectivity "google.golang.org/grpc/connectivity"
29 "google.golang.org/grpc/resolver"
30 _ "google.golang.org/grpc/resolver/dns" // register DNS resolver
31 _ "google.golang.org/grpc/resolver/passthrough" // register passthrough resolver
32)
33
34// Config defines balancer configurations.
35type Config struct {
36 // Policy configures balancer policy.
37 Policy picker.Policy
38
39 // Picker implements gRPC picker.
40 // Leave empty if "Policy" field is not custom.
41 // TODO: currently custom policy is not supported.
42 // Picker picker.Picker
43
44 // Name defines an additional name for balancer.
45 // Useful for balancer testing to avoid register conflicts.
46 // If empty, defaults to policy name.
47 Name string
48
49 // Logger configures balancer logging.
50 // If nil, logs are discarded.
51 Logger *zap.Logger
52}
53
54// RegisterBuilder creates and registers a builder. Since this function calls balancer.Register, it
55// must be invoked at initialization time.
56func RegisterBuilder(cfg Config) {
57 bb := &builder{cfg}
58 balancer.Register(bb)
59
60 bb.cfg.Logger.Debug(
61 "registered balancer",
62 zap.String("policy", bb.cfg.Policy.String()),
63 zap.String("name", bb.cfg.Name),
64 )
65}
66
67type builder struct {
68 cfg Config
69}
70
71// Build is called initially when creating "ccBalancerWrapper".
72// "grpc.Dial" is called to this client connection.
73// Then, resolved addresses will be handled via "HandleResolvedAddrs".
74func (b *builder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
75 bb := &baseBalancer{
76 id: strconv.FormatInt(time.Now().UnixNano(), 36),
77 policy: b.cfg.Policy,
78 name: b.cfg.Name,
79 lg: b.cfg.Logger,
80
81 addrToSc: make(map[resolver.Address]balancer.SubConn),
82 scToAddr: make(map[balancer.SubConn]resolver.Address),
83 scToSt: make(map[balancer.SubConn]grpcconnectivity.State),
84
85 currentConn: nil,
86 connectivityRecorder: connectivity.New(b.cfg.Logger),
87
88 // initialize picker always returns "ErrNoSubConnAvailable"
89 picker: picker.NewErr(balancer.ErrNoSubConnAvailable),
90 }
91
92 // TODO: support multiple connections
93 bb.mu.Lock()
94 bb.currentConn = cc
95 bb.mu.Unlock()
96
97 bb.lg.Info(
98 "built balancer",
99 zap.String("balancer-id", bb.id),
100 zap.String("policy", bb.policy.String()),
101 zap.String("resolver-target", cc.Target()),
102 )
103 return bb
104}
105
106// Name implements "grpc/balancer.Builder" interface.
107func (b *builder) Name() string { return b.cfg.Name }
108
109// Balancer defines client balancer interface.
110type Balancer interface {
111 // Balancer is called on specified client connection. Client initiates gRPC
112 // connection with "grpc.Dial(addr, grpc.WithBalancerName)", and then those resolved
113 // addresses are passed to "grpc/balancer.Balancer.HandleResolvedAddrs".
114 // For each resolved address, balancer calls "balancer.ClientConn.NewSubConn".
115 // "grpc/balancer.Balancer.HandleSubConnStateChange" is called when connectivity state
116 // changes, thus requires failover logic in this method.
117 balancer.Balancer
118
119 // Picker calls "Pick" for every client request.
120 picker.Picker
121}
122
123type baseBalancer struct {
124 id string
125 policy picker.Policy
126 name string
127 lg *zap.Logger
128
129 mu sync.RWMutex
130
131 addrToSc map[resolver.Address]balancer.SubConn
132 scToAddr map[balancer.SubConn]resolver.Address
133 scToSt map[balancer.SubConn]grpcconnectivity.State
134
135 currentConn balancer.ClientConn
136 connectivityRecorder connectivity.Recorder
137
138 picker picker.Picker
139}
140
141// HandleResolvedAddrs implements "grpc/balancer.Balancer" interface.
142// gRPC sends initial or updated resolved addresses from "Build".
143func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
144 if err != nil {
145 bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err))
146 return
147 }
148 bb.lg.Info("resolved",
149 zap.String("picker", bb.picker.String()),
150 zap.String("balancer-id", bb.id),
151 zap.Strings("addresses", addrsToStrings(addrs)),
152 )
153
154 bb.mu.Lock()
155 defer bb.mu.Unlock()
156
157 resolved := make(map[resolver.Address]struct{})
158 for _, addr := range addrs {
159 resolved[addr] = struct{}{}
160 if _, ok := bb.addrToSc[addr]; !ok {
161 sc, err := bb.currentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
162 if err != nil {
163 bb.lg.Warn("NewSubConn failed", zap.String("picker", bb.picker.String()), zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr))
164 continue
165 }
166 bb.lg.Info("created subconn", zap.String("address", addr.Addr))
167 bb.addrToSc[addr] = sc
168 bb.scToAddr[sc] = addr
169 bb.scToSt[sc] = grpcconnectivity.Idle
170 sc.Connect()
171 }
172 }
173
174 for addr, sc := range bb.addrToSc {
175 if _, ok := resolved[addr]; !ok {
176 // was removed by resolver or failed to create subconn
177 bb.currentConn.RemoveSubConn(sc)
178 delete(bb.addrToSc, addr)
179
180 bb.lg.Info(
181 "removed subconn",
182 zap.String("picker", bb.picker.String()),
183 zap.String("balancer-id", bb.id),
184 zap.String("address", addr.Addr),
185 zap.String("subconn", scToString(sc)),
186 )
187
188 // Keep the state of this sc in bb.scToSt until sc's state becomes Shutdown.
189 // The entry will be deleted in HandleSubConnStateChange.
190 // (DO NOT) delete(bb.scToAddr, sc)
191 // (DO NOT) delete(bb.scToSt, sc)
192 }
193 }
194}
195
196// HandleSubConnStateChange implements "grpc/balancer.Balancer" interface.
197func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s grpcconnectivity.State) {
198 bb.mu.Lock()
199 defer bb.mu.Unlock()
200
201 old, ok := bb.scToSt[sc]
202 if !ok {
203 bb.lg.Warn(
204 "state change for an unknown subconn",
205 zap.String("picker", bb.picker.String()),
206 zap.String("balancer-id", bb.id),
207 zap.String("subconn", scToString(sc)),
208 zap.Int("subconn-size", len(bb.scToAddr)),
209 zap.String("state", s.String()),
210 )
211 return
212 }
213
214 bb.lg.Info(
215 "state changed",
216 zap.String("picker", bb.picker.String()),
217 zap.String("balancer-id", bb.id),
218 zap.Bool("connected", s == grpcconnectivity.Ready),
219 zap.String("subconn", scToString(sc)),
220 zap.Int("subconn-size", len(bb.scToAddr)),
221 zap.String("address", bb.scToAddr[sc].Addr),
222 zap.String("old-state", old.String()),
223 zap.String("new-state", s.String()),
224 )
225
226 bb.scToSt[sc] = s
227 switch s {
228 case grpcconnectivity.Idle:
229 sc.Connect()
230 case grpcconnectivity.Shutdown:
231 // When an address was removed by resolver, b called RemoveSubConn but
232 // kept the sc's state in scToSt. Remove state for this sc here.
233 delete(bb.scToAddr, sc)
234 delete(bb.scToSt, sc)
235 }
236
237 oldAggrState := bb.connectivityRecorder.GetCurrentState()
238 bb.connectivityRecorder.RecordTransition(old, s)
239
240 // Update balancer picker when one of the following happens:
241 // - this sc became ready from not-ready
242 // - this sc became not-ready from ready
243 // - the aggregated state of balancer became TransientFailure from non-TransientFailure
244 // - the aggregated state of balancer became non-TransientFailure from TransientFailure
245 if (s == grpcconnectivity.Ready) != (old == grpcconnectivity.Ready) ||
246 (bb.connectivityRecorder.GetCurrentState() == grpcconnectivity.TransientFailure) != (oldAggrState == grpcconnectivity.TransientFailure) {
247 bb.updatePicker()
248 }
249
250 bb.currentConn.UpdateBalancerState(bb.connectivityRecorder.GetCurrentState(), bb.picker)
251}
252
253func (bb *baseBalancer) updatePicker() {
254 if bb.connectivityRecorder.GetCurrentState() == grpcconnectivity.TransientFailure {
255 bb.picker = picker.NewErr(balancer.ErrTransientFailure)
256 bb.lg.Info(
257 "updated picker to transient error picker",
258 zap.String("picker", bb.picker.String()),
259 zap.String("balancer-id", bb.id),
260 zap.String("policy", bb.policy.String()),
261 )
262 return
263 }
264
265 // only pass ready subconns to picker
266 scToAddr := make(map[balancer.SubConn]resolver.Address)
267 for addr, sc := range bb.addrToSc {
268 if st, ok := bb.scToSt[sc]; ok && st == grpcconnectivity.Ready {
269 scToAddr[sc] = addr
270 }
271 }
272
273 bb.picker = picker.New(picker.Config{
274 Policy: bb.policy,
275 Logger: bb.lg,
276 SubConnToResolverAddress: scToAddr,
277 })
278 bb.lg.Info(
279 "updated picker",
280 zap.String("picker", bb.picker.String()),
281 zap.String("balancer-id", bb.id),
282 zap.String("policy", bb.policy.String()),
283 zap.Strings("subconn-ready", scsToStrings(scToAddr)),
284 zap.Int("subconn-size", len(scToAddr)),
285 )
286}
287
288// Close implements "grpc/balancer.Balancer" interface.
289// Close is a nop because base balancer doesn't have internal state to clean up,
290// and it doesn't need to call RemoveSubConn for the SubConns.
291func (bb *baseBalancer) Close() {
292 // TODO
293}