blob: 25dc2b7e730b5d262cc88a80acee066d0d132cf6 [file] [log] [blame]
Stephane Barbarie260a5632019-02-26 16:12:49 -05001// Copyright 2018 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package balancer
16
17import (
18 "fmt"
19 "strconv"
20 "sync"
21 "time"
22
23 "go.etcd.io/etcd/clientv3/balancer/picker"
24
25 "go.uber.org/zap"
26 "google.golang.org/grpc/balancer"
27 "google.golang.org/grpc/connectivity"
28 "google.golang.org/grpc/resolver"
29 _ "google.golang.org/grpc/resolver/dns" // register DNS resolver
30 _ "google.golang.org/grpc/resolver/passthrough" // register passthrough resolver
31)
32
33// RegisterBuilder creates and registers a builder. Since this function calls balancer.Register, it
34// must be invoked at initialization time.
35func RegisterBuilder(cfg Config) {
36 bb := &builder{cfg}
37 balancer.Register(bb)
38
39 bb.cfg.Logger.Info(
40 "registered balancer",
41 zap.String("policy", bb.cfg.Policy.String()),
42 zap.String("name", bb.cfg.Name),
43 )
44}
45
46type builder struct {
47 cfg Config
48}
49
50// Build is called initially when creating "ccBalancerWrapper".
51// "grpc.Dial" is called to this client connection.
52// Then, resolved addresses will be handled via "HandleResolvedAddrs".
53func (b *builder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
54 bb := &baseBalancer{
55 id: strconv.FormatInt(time.Now().UnixNano(), 36),
56 policy: b.cfg.Policy,
57 name: b.cfg.Policy.String(),
58 lg: b.cfg.Logger,
59
60 addrToSc: make(map[resolver.Address]balancer.SubConn),
61 scToAddr: make(map[balancer.SubConn]resolver.Address),
62 scToSt: make(map[balancer.SubConn]connectivity.State),
63
64 currentConn: nil,
65 csEvltr: &connectivityStateEvaluator{},
66
67 // initialize picker always returns "ErrNoSubConnAvailable"
68 Picker: picker.NewErr(balancer.ErrNoSubConnAvailable),
69 }
70 if b.cfg.Name != "" {
71 bb.name = b.cfg.Name
72 }
73 if bb.lg == nil {
74 bb.lg = zap.NewNop()
75 }
76
77 // TODO: support multiple connections
78 bb.mu.Lock()
79 bb.currentConn = cc
80 bb.mu.Unlock()
81
82 bb.lg.Info(
83 "built balancer",
84 zap.String("balancer-id", bb.id),
85 zap.String("policy", bb.policy.String()),
86 zap.String("resolver-target", cc.Target()),
87 )
88 return bb
89}
90
91// Name implements "grpc/balancer.Builder" interface.
92func (b *builder) Name() string { return b.cfg.Name }
93
94// Balancer defines client balancer interface.
95type Balancer interface {
96 // Balancer is called on specified client connection. Client initiates gRPC
97 // connection with "grpc.Dial(addr, grpc.WithBalancerName)", and then those resolved
98 // addresses are passed to "grpc/balancer.Balancer.HandleResolvedAddrs".
99 // For each resolved address, balancer calls "balancer.ClientConn.NewSubConn".
100 // "grpc/balancer.Balancer.HandleSubConnStateChange" is called when connectivity state
101 // changes, thus requires failover logic in this method.
102 balancer.Balancer
103
104 // Picker calls "Pick" for every client request.
105 picker.Picker
106}
107
108type baseBalancer struct {
109 id string
110 policy picker.Policy
111 name string
112 lg *zap.Logger
113
114 mu sync.RWMutex
115
116 addrToSc map[resolver.Address]balancer.SubConn
117 scToAddr map[balancer.SubConn]resolver.Address
118 scToSt map[balancer.SubConn]connectivity.State
119
120 currentConn balancer.ClientConn
121 currentState connectivity.State
122 csEvltr *connectivityStateEvaluator
123
124 picker.Picker
125}
126
127// HandleResolvedAddrs implements "grpc/balancer.Balancer" interface.
128// gRPC sends initial or updated resolved addresses from "Build".
129func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
130 if err != nil {
131 bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err))
132 return
133 }
134 bb.lg.Info("resolved", zap.String("balancer-id", bb.id), zap.Strings("addresses", addrsToStrings(addrs)))
135
136 bb.mu.Lock()
137 defer bb.mu.Unlock()
138
139 resolved := make(map[resolver.Address]struct{})
140 for _, addr := range addrs {
141 resolved[addr] = struct{}{}
142 if _, ok := bb.addrToSc[addr]; !ok {
143 sc, err := bb.currentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
144 if err != nil {
145 bb.lg.Warn("NewSubConn failed", zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr))
146 continue
147 }
148 bb.addrToSc[addr] = sc
149 bb.scToAddr[sc] = addr
150 bb.scToSt[sc] = connectivity.Idle
151 sc.Connect()
152 }
153 }
154
155 for addr, sc := range bb.addrToSc {
156 if _, ok := resolved[addr]; !ok {
157 // was removed by resolver or failed to create subconn
158 bb.currentConn.RemoveSubConn(sc)
159 delete(bb.addrToSc, addr)
160
161 bb.lg.Info(
162 "removed subconn",
163 zap.String("balancer-id", bb.id),
164 zap.String("address", addr.Addr),
165 zap.String("subconn", scToString(sc)),
166 )
167
168 // Keep the state of this sc in bb.scToSt until sc's state becomes Shutdown.
169 // The entry will be deleted in HandleSubConnStateChange.
170 // (DO NOT) delete(bb.scToAddr, sc)
171 // (DO NOT) delete(bb.scToSt, sc)
172 }
173 }
174}
175
176// HandleSubConnStateChange implements "grpc/balancer.Balancer" interface.
177func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
178 bb.mu.Lock()
179 defer bb.mu.Unlock()
180
181 old, ok := bb.scToSt[sc]
182 if !ok {
183 bb.lg.Warn(
184 "state change for an unknown subconn",
185 zap.String("balancer-id", bb.id),
186 zap.String("subconn", scToString(sc)),
187 zap.String("state", s.String()),
188 )
189 return
190 }
191
192 bb.lg.Info(
193 "state changed",
194 zap.String("balancer-id", bb.id),
195 zap.Bool("connected", s == connectivity.Ready),
196 zap.String("subconn", scToString(sc)),
197 zap.String("address", bb.scToAddr[sc].Addr),
198 zap.String("old-state", old.String()),
199 zap.String("new-state", s.String()),
200 )
201
202 bb.scToSt[sc] = s
203 switch s {
204 case connectivity.Idle:
205 sc.Connect()
206 case connectivity.Shutdown:
207 // When an address was removed by resolver, b called RemoveSubConn but
208 // kept the sc's state in scToSt. Remove state for this sc here.
209 delete(bb.scToAddr, sc)
210 delete(bb.scToSt, sc)
211 }
212
213 oldAggrState := bb.currentState
214 bb.currentState = bb.csEvltr.recordTransition(old, s)
215
216 // Regenerate picker when one of the following happens:
217 // - this sc became ready from not-ready
218 // - this sc became not-ready from ready
219 // - the aggregated state of balancer became TransientFailure from non-TransientFailure
220 // - the aggregated state of balancer became non-TransientFailure from TransientFailure
221 if (s == connectivity.Ready) != (old == connectivity.Ready) ||
222 (bb.currentState == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
223 bb.regeneratePicker()
224 }
225
226 bb.currentConn.UpdateBalancerState(bb.currentState, bb.Picker)
227 return
228}
229
230func (bb *baseBalancer) regeneratePicker() {
231 if bb.currentState == connectivity.TransientFailure {
232 bb.lg.Info(
233 "generated transient error picker",
234 zap.String("balancer-id", bb.id),
235 zap.String("policy", bb.policy.String()),
236 )
237 bb.Picker = picker.NewErr(balancer.ErrTransientFailure)
238 return
239 }
240
241 // only pass ready subconns to picker
242 scs := make([]balancer.SubConn, 0)
243 addrToSc := make(map[resolver.Address]balancer.SubConn)
244 scToAddr := make(map[balancer.SubConn]resolver.Address)
245 for addr, sc := range bb.addrToSc {
246 if st, ok := bb.scToSt[sc]; ok && st == connectivity.Ready {
247 scs = append(scs, sc)
248 addrToSc[addr] = sc
249 scToAddr[sc] = addr
250 }
251 }
252
253 switch bb.policy {
254 case picker.RoundrobinBalanced:
255 bb.Picker = picker.NewRoundrobinBalanced(bb.lg, scs, addrToSc, scToAddr)
256
257 default:
258 panic(fmt.Errorf("invalid balancer picker policy (%d)", bb.policy))
259 }
260
261 bb.lg.Info(
262 "generated picker",
263 zap.String("balancer-id", bb.id),
264 zap.String("policy", bb.policy.String()),
265 zap.Strings("subconn-ready", scsToStrings(addrToSc)),
266 zap.Int("subconn-size", len(addrToSc)),
267 )
268}
269
270// Close implements "grpc/balancer.Balancer" interface.
271// Close is a nop because base balancer doesn't have internal state to clean up,
272// and it doesn't need to call RemoveSubConn for the SubConns.
273func (bb *baseBalancer) Close() {
274 // TODO
275}