blob: d02a7eec7c3765c11e326820571bc3768512767f [file] [log] [blame]
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
Scott Baker8461e152019-10-01 14:44:30 -070015// Package balancer implements client balancer.
Stephane Barbarie260a5632019-02-26 16:12:49 -050016package balancer
17
18import (
Stephane Barbarie260a5632019-02-26 16:12:49 -050019 "strconv"
20 "sync"
21 "time"
22
Scott Baker8461e152019-10-01 14:44:30 -070023 "go.etcd.io/etcd/clientv3/balancer/connectivity"
Stephane Barbarie260a5632019-02-26 16:12:49 -050024 "go.etcd.io/etcd/clientv3/balancer/picker"
25
26 "go.uber.org/zap"
27 "google.golang.org/grpc/balancer"
Scott Baker8461e152019-10-01 14:44:30 -070028 grpcconnectivity "google.golang.org/grpc/connectivity"
Stephane Barbarie260a5632019-02-26 16:12:49 -050029 "google.golang.org/grpc/resolver"
30 _ "google.golang.org/grpc/resolver/dns" // register DNS resolver
31 _ "google.golang.org/grpc/resolver/passthrough" // register passthrough resolver
32)
33
Scott Baker8461e152019-10-01 14:44:30 -070034// Config defines balancer configurations.
35type Config struct {
36 // Policy configures balancer policy.
37 Policy picker.Policy
38
39 // Picker implements gRPC picker.
40 // Leave empty if "Policy" field is not custom.
41 // TODO: currently custom policy is not supported.
42 // Picker picker.Picker
43
44 // Name defines an additional name for balancer.
45 // Useful for balancer testing to avoid register conflicts.
46 // If empty, defaults to policy name.
47 Name string
48
49 // Logger configures balancer logging.
50 // If nil, logs are discarded.
51 Logger *zap.Logger
52}
53
Stephane Barbarie260a5632019-02-26 16:12:49 -050054// RegisterBuilder creates and registers a builder. Since this function calls balancer.Register, it
55// must be invoked at initialization time.
56func RegisterBuilder(cfg Config) {
57 bb := &builder{cfg}
58 balancer.Register(bb)
59
William Kurkiandaa6bb22019-03-07 12:26:28 -050060 bb.cfg.Logger.Debug(
Stephane Barbarie260a5632019-02-26 16:12:49 -050061 "registered balancer",
62 zap.String("policy", bb.cfg.Policy.String()),
63 zap.String("name", bb.cfg.Name),
64 )
65}
66
67type builder struct {
68 cfg Config
69}
70
71// Build is called initially when creating "ccBalancerWrapper".
72// "grpc.Dial" is called to this client connection.
73// Then, resolved addresses will be handled via "HandleResolvedAddrs".
74func (b *builder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
75 bb := &baseBalancer{
76 id: strconv.FormatInt(time.Now().UnixNano(), 36),
77 policy: b.cfg.Policy,
Scott Baker8461e152019-10-01 14:44:30 -070078 name: b.cfg.Name,
Stephane Barbarie260a5632019-02-26 16:12:49 -050079 lg: b.cfg.Logger,
80
81 addrToSc: make(map[resolver.Address]balancer.SubConn),
82 scToAddr: make(map[balancer.SubConn]resolver.Address),
Scott Baker8461e152019-10-01 14:44:30 -070083 scToSt: make(map[balancer.SubConn]grpcconnectivity.State),
Stephane Barbarie260a5632019-02-26 16:12:49 -050084
Scott Baker8461e152019-10-01 14:44:30 -070085 currentConn: nil,
86 connectivityRecorder: connectivity.New(b.cfg.Logger),
Stephane Barbarie260a5632019-02-26 16:12:49 -050087
88 // initialize picker always returns "ErrNoSubConnAvailable"
Scott Baker8461e152019-10-01 14:44:30 -070089 picker: picker.NewErr(balancer.ErrNoSubConnAvailable),
Stephane Barbarie260a5632019-02-26 16:12:49 -050090 }
91
92 // TODO: support multiple connections
93 bb.mu.Lock()
94 bb.currentConn = cc
95 bb.mu.Unlock()
96
97 bb.lg.Info(
98 "built balancer",
99 zap.String("balancer-id", bb.id),
100 zap.String("policy", bb.policy.String()),
101 zap.String("resolver-target", cc.Target()),
102 )
103 return bb
104}
105
106// Name implements "grpc/balancer.Builder" interface.
107func (b *builder) Name() string { return b.cfg.Name }
108
109// Balancer defines client balancer interface.
110type Balancer interface {
111 // Balancer is called on specified client connection. Client initiates gRPC
112 // connection with "grpc.Dial(addr, grpc.WithBalancerName)", and then those resolved
113 // addresses are passed to "grpc/balancer.Balancer.HandleResolvedAddrs".
114 // For each resolved address, balancer calls "balancer.ClientConn.NewSubConn".
115 // "grpc/balancer.Balancer.HandleSubConnStateChange" is called when connectivity state
116 // changes, thus requires failover logic in this method.
117 balancer.Balancer
118
119 // Picker calls "Pick" for every client request.
120 picker.Picker
121}
122
123type baseBalancer struct {
124 id string
125 policy picker.Policy
126 name string
127 lg *zap.Logger
128
129 mu sync.RWMutex
130
131 addrToSc map[resolver.Address]balancer.SubConn
132 scToAddr map[balancer.SubConn]resolver.Address
Scott Baker8461e152019-10-01 14:44:30 -0700133 scToSt map[balancer.SubConn]grpcconnectivity.State
Stephane Barbarie260a5632019-02-26 16:12:49 -0500134
Scott Baker8461e152019-10-01 14:44:30 -0700135 currentConn balancer.ClientConn
136 connectivityRecorder connectivity.Recorder
Stephane Barbarie260a5632019-02-26 16:12:49 -0500137
Scott Baker8461e152019-10-01 14:44:30 -0700138 picker picker.Picker
Stephane Barbarie260a5632019-02-26 16:12:49 -0500139}
140
141// HandleResolvedAddrs implements "grpc/balancer.Balancer" interface.
142// gRPC sends initial or updated resolved addresses from "Build".
143func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
144 if err != nil {
145 bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err))
146 return
147 }
Scott Baker8461e152019-10-01 14:44:30 -0700148 bb.lg.Info("resolved",
149 zap.String("picker", bb.picker.String()),
150 zap.String("balancer-id", bb.id),
151 zap.Strings("addresses", addrsToStrings(addrs)),
152 )
Stephane Barbarie260a5632019-02-26 16:12:49 -0500153
154 bb.mu.Lock()
155 defer bb.mu.Unlock()
156
157 resolved := make(map[resolver.Address]struct{})
158 for _, addr := range addrs {
159 resolved[addr] = struct{}{}
160 if _, ok := bb.addrToSc[addr]; !ok {
161 sc, err := bb.currentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
162 if err != nil {
Scott Baker8461e152019-10-01 14:44:30 -0700163 bb.lg.Warn("NewSubConn failed", zap.String("picker", bb.picker.String()), zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr))
Stephane Barbarie260a5632019-02-26 16:12:49 -0500164 continue
165 }
Scott Baker8461e152019-10-01 14:44:30 -0700166 bb.lg.Info("created subconn", zap.String("address", addr.Addr))
Stephane Barbarie260a5632019-02-26 16:12:49 -0500167 bb.addrToSc[addr] = sc
168 bb.scToAddr[sc] = addr
Scott Baker8461e152019-10-01 14:44:30 -0700169 bb.scToSt[sc] = grpcconnectivity.Idle
Stephane Barbarie260a5632019-02-26 16:12:49 -0500170 sc.Connect()
171 }
172 }
173
174 for addr, sc := range bb.addrToSc {
175 if _, ok := resolved[addr]; !ok {
176 // was removed by resolver or failed to create subconn
177 bb.currentConn.RemoveSubConn(sc)
178 delete(bb.addrToSc, addr)
179
180 bb.lg.Info(
181 "removed subconn",
Scott Baker8461e152019-10-01 14:44:30 -0700182 zap.String("picker", bb.picker.String()),
Stephane Barbarie260a5632019-02-26 16:12:49 -0500183 zap.String("balancer-id", bb.id),
184 zap.String("address", addr.Addr),
185 zap.String("subconn", scToString(sc)),
186 )
187
188 // Keep the state of this sc in bb.scToSt until sc's state becomes Shutdown.
189 // The entry will be deleted in HandleSubConnStateChange.
190 // (DO NOT) delete(bb.scToAddr, sc)
191 // (DO NOT) delete(bb.scToSt, sc)
192 }
193 }
194}
195
196// HandleSubConnStateChange implements "grpc/balancer.Balancer" interface.
Scott Baker8461e152019-10-01 14:44:30 -0700197func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s grpcconnectivity.State) {
Stephane Barbarie260a5632019-02-26 16:12:49 -0500198 bb.mu.Lock()
199 defer bb.mu.Unlock()
200
201 old, ok := bb.scToSt[sc]
202 if !ok {
203 bb.lg.Warn(
204 "state change for an unknown subconn",
Scott Baker8461e152019-10-01 14:44:30 -0700205 zap.String("picker", bb.picker.String()),
Stephane Barbarie260a5632019-02-26 16:12:49 -0500206 zap.String("balancer-id", bb.id),
207 zap.String("subconn", scToString(sc)),
Scott Baker8461e152019-10-01 14:44:30 -0700208 zap.Int("subconn-size", len(bb.scToAddr)),
Stephane Barbarie260a5632019-02-26 16:12:49 -0500209 zap.String("state", s.String()),
210 )
211 return
212 }
213
214 bb.lg.Info(
215 "state changed",
Scott Baker8461e152019-10-01 14:44:30 -0700216 zap.String("picker", bb.picker.String()),
Stephane Barbarie260a5632019-02-26 16:12:49 -0500217 zap.String("balancer-id", bb.id),
Scott Baker8461e152019-10-01 14:44:30 -0700218 zap.Bool("connected", s == grpcconnectivity.Ready),
Stephane Barbarie260a5632019-02-26 16:12:49 -0500219 zap.String("subconn", scToString(sc)),
Scott Baker8461e152019-10-01 14:44:30 -0700220 zap.Int("subconn-size", len(bb.scToAddr)),
Stephane Barbarie260a5632019-02-26 16:12:49 -0500221 zap.String("address", bb.scToAddr[sc].Addr),
222 zap.String("old-state", old.String()),
223 zap.String("new-state", s.String()),
224 )
225
226 bb.scToSt[sc] = s
227 switch s {
Scott Baker8461e152019-10-01 14:44:30 -0700228 case grpcconnectivity.Idle:
Stephane Barbarie260a5632019-02-26 16:12:49 -0500229 sc.Connect()
Scott Baker8461e152019-10-01 14:44:30 -0700230 case grpcconnectivity.Shutdown:
Stephane Barbarie260a5632019-02-26 16:12:49 -0500231 // When an address was removed by resolver, b called RemoveSubConn but
232 // kept the sc's state in scToSt. Remove state for this sc here.
233 delete(bb.scToAddr, sc)
234 delete(bb.scToSt, sc)
235 }
236
Scott Baker8461e152019-10-01 14:44:30 -0700237 oldAggrState := bb.connectivityRecorder.GetCurrentState()
238 bb.connectivityRecorder.RecordTransition(old, s)
Stephane Barbarie260a5632019-02-26 16:12:49 -0500239
Scott Baker8461e152019-10-01 14:44:30 -0700240 // Update balancer picker when one of the following happens:
Stephane Barbarie260a5632019-02-26 16:12:49 -0500241 // - this sc became ready from not-ready
242 // - this sc became not-ready from ready
243 // - the aggregated state of balancer became TransientFailure from non-TransientFailure
244 // - the aggregated state of balancer became non-TransientFailure from TransientFailure
Scott Baker8461e152019-10-01 14:44:30 -0700245 if (s == grpcconnectivity.Ready) != (old == grpcconnectivity.Ready) ||
246 (bb.connectivityRecorder.GetCurrentState() == grpcconnectivity.TransientFailure) != (oldAggrState == grpcconnectivity.TransientFailure) {
247 bb.updatePicker()
Stephane Barbarie260a5632019-02-26 16:12:49 -0500248 }
249
Scott Baker8461e152019-10-01 14:44:30 -0700250 bb.currentConn.UpdateBalancerState(bb.connectivityRecorder.GetCurrentState(), bb.picker)
Stephane Barbarie260a5632019-02-26 16:12:49 -0500251}
252
Scott Baker8461e152019-10-01 14:44:30 -0700253func (bb *baseBalancer) updatePicker() {
254 if bb.connectivityRecorder.GetCurrentState() == grpcconnectivity.TransientFailure {
255 bb.picker = picker.NewErr(balancer.ErrTransientFailure)
Stephane Barbarie260a5632019-02-26 16:12:49 -0500256 bb.lg.Info(
Scott Baker8461e152019-10-01 14:44:30 -0700257 "updated picker to transient error picker",
258 zap.String("picker", bb.picker.String()),
Stephane Barbarie260a5632019-02-26 16:12:49 -0500259 zap.String("balancer-id", bb.id),
260 zap.String("policy", bb.policy.String()),
261 )
Stephane Barbarie260a5632019-02-26 16:12:49 -0500262 return
263 }
264
265 // only pass ready subconns to picker
Stephane Barbarie260a5632019-02-26 16:12:49 -0500266 scToAddr := make(map[balancer.SubConn]resolver.Address)
267 for addr, sc := range bb.addrToSc {
Scott Baker8461e152019-10-01 14:44:30 -0700268 if st, ok := bb.scToSt[sc]; ok && st == grpcconnectivity.Ready {
Stephane Barbarie260a5632019-02-26 16:12:49 -0500269 scToAddr[sc] = addr
270 }
271 }
272
Scott Baker8461e152019-10-01 14:44:30 -0700273 bb.picker = picker.New(picker.Config{
274 Policy: bb.policy,
275 Logger: bb.lg,
276 SubConnToResolverAddress: scToAddr,
277 })
Stephane Barbarie260a5632019-02-26 16:12:49 -0500278 bb.lg.Info(
Scott Baker8461e152019-10-01 14:44:30 -0700279 "updated picker",
280 zap.String("picker", bb.picker.String()),
Stephane Barbarie260a5632019-02-26 16:12:49 -0500281 zap.String("balancer-id", bb.id),
282 zap.String("policy", bb.policy.String()),
Scott Baker8461e152019-10-01 14:44:30 -0700283 zap.Strings("subconn-ready", scsToStrings(scToAddr)),
284 zap.Int("subconn-size", len(scToAddr)),
Stephane Barbarie260a5632019-02-26 16:12:49 -0500285 )
286}
287
288// Close implements "grpc/balancer.Balancer" interface.
289// Close is a nop because base balancer doesn't have internal state to clean up,
290// and it doesn't need to call RemoveSubConn for the SubConns.
291func (bb *baseBalancer) Close() {
292 // TODO
293}