/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18
19package grpc
20
21import (
22 "sync"
23
24 "google.golang.org/grpc/balancer"
25 "google.golang.org/grpc/connectivity"
26 "google.golang.org/grpc/grpclog"
27 "google.golang.org/grpc/resolver"
28)
29
// balancerWrapperBuilder implements balancer.Builder by wrapping a v1
// Balancer so it can be plugged into the v2 balancer machinery.
type balancerWrapperBuilder struct {
	b Balancer // The v1 balancer.
}
33
34func (bwb *balancerWrapperBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
35 bwb.b.Start(opts.Target.Endpoint, BalancerConfig{
36 DialCreds: opts.DialCreds,
37 Dialer: opts.Dialer,
38 })
39 _, pickfirst := bwb.b.(*pickFirst)
40 bw := &balancerWrapper{
41 balancer: bwb.b,
42 pickfirst: pickfirst,
43 cc: cc,
44 targetAddr: opts.Target.Endpoint,
45 startCh: make(chan struct{}),
46 conns: make(map[resolver.Address]balancer.SubConn),
47 connSt: make(map[balancer.SubConn]*scState),
48 csEvltr: &balancer.ConnectivityStateEvaluator{},
49 state: connectivity.Idle,
50 }
51 cc.UpdateState(balancer.State{ConnectivityState: connectivity.Idle, Picker: bw})
52 go bw.lbWatcher()
53 return bw
54}
55
56func (bwb *balancerWrapperBuilder) Name() string {
57 return "wrapper"
58}
59
// scState tracks the per-SubConn state the wrapper needs: the v1 address
// the SubConn maps to, its last observed connectivity state, and the
// teardown callback returned by Balancer.Up (nil until the SubConn has
// transitioned into Ready at least once).
type scState struct {
	addr Address // The v1 address type.
	s    connectivity.State
	down func(error)
}
65
// balancerWrapper adapts a v1 Balancer to the v2 balancer.Balancer and
// balancer.Picker interfaces. It owns the SubConns created on behalf of
// the v1 balancer and aggregates their connectivity states.
type balancerWrapper struct {
	balancer  Balancer // The v1 balancer.
	pickfirst bool     // True when the wrapped balancer is the v1 pickFirst.

	cc         balancer.ClientConn
	targetAddr string // Target without the scheme.

	mu     sync.Mutex // Guards conns, connSt, and state below.
	conns  map[resolver.Address]balancer.SubConn
	connSt map[balancer.SubConn]*scState
	// This channel is closed when handling the first resolver result.
	// lbWatcher blocks until this is closed, to avoid a race between
	// - NewSubConn is created, cc wants to notify balancer of state changes;
	// - Build hasn't returned, cc doesn't have access to balancer.
	startCh chan struct{}

	// To aggregate the connectivity state.
	csEvltr *balancer.ConnectivityStateEvaluator
	state   connectivity.State
}
86
// lbWatcher watches the Notify channel of the balancer and manages
// connections accordingly: it creates SubConns for newly announced
// addresses and removes SubConns for addresses that disappear. For a
// pickfirst balancer it maintains at most one SubConn covering all
// addresses; otherwise it keeps one SubConn per address.
func (bw *balancerWrapper) lbWatcher() {
	// Block until the first resolver result has been handled, so the v1
	// balancer is fully set up before we act on its notifications.
	<-bw.startCh
	notifyCh := bw.balancer.Notify()
	if notifyCh == nil {
		// There's no resolver in the balancer. Connect directly.
		a := resolver.Address{
			Addr: bw.targetAddr,
			Type: resolver.Backend,
		}
		sc, err := bw.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
		if err != nil {
			grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
		} else {
			bw.mu.Lock()
			bw.conns[a] = sc
			bw.connSt[sc] = &scState{
				addr: Address{Addr: bw.targetAddr},
				s:    connectivity.Idle,
			}
			bw.mu.Unlock()
			sc.Connect()
		}
		return
	}

	for addrs := range notifyCh {
		grpclog.Infof("balancerWrapper: got update addr from Notify: %v", addrs)
		if bw.pickfirst {
			var (
				oldA  resolver.Address
				oldSC balancer.SubConn
			)
			// Grab the (at most one) existing SubConn, if any.
			bw.mu.Lock()
			for oldA, oldSC = range bw.conns {
				break
			}
			bw.mu.Unlock()
			if len(addrs) <= 0 {
				if oldSC != nil {
					// Teardown old sc.
					bw.mu.Lock()
					delete(bw.conns, oldA)
					delete(bw.connSt, oldSC)
					bw.mu.Unlock()
					bw.cc.RemoveSubConn(oldSC)
				}
				continue
			}

			// Convert the v1 addresses to resolver addresses.
			var newAddrs []resolver.Address
			for _, a := range addrs {
				newAddr := resolver.Address{
					Addr:       a.Addr,
					Type:       resolver.Backend, // All addresses from balancer are all backends.
					ServerName: "",
					Metadata:   a.Metadata,
				}
				newAddrs = append(newAddrs, newAddr)
			}
			if oldSC == nil {
				// Create new sc.
				sc, err := bw.cc.NewSubConn(newAddrs, balancer.NewSubConnOptions{})
				if err != nil {
					grpclog.Warningf("Error creating connection to %v. Err: %v", newAddrs, err)
				} else {
					bw.mu.Lock()
					// For pickfirst, there should be only one SubConn, so the
					// address doesn't matter. All states updating (up and down)
					// and picking should all happen on that only SubConn.
					bw.conns[resolver.Address{}] = sc
					bw.connSt[sc] = &scState{
						addr: addrs[0], // Use the first address.
						s:    connectivity.Idle,
					}
					bw.mu.Unlock()
					sc.Connect()
				}
			} else {
				// Reuse the existing SubConn; just swap its address set.
				bw.mu.Lock()
				bw.connSt[oldSC].addr = addrs[0]
				bw.mu.Unlock()
				oldSC.UpdateAddresses(newAddrs)
			}
		} else {
			// Non-pickfirst: diff the new address set against bw.conns.
			var (
				add []resolver.Address // Addresses need to setup connections.
				del []balancer.SubConn // Connections need to tear down.
			)
			resAddrs := make(map[resolver.Address]Address)
			for _, a := range addrs {
				resAddrs[resolver.Address{
					Addr:       a.Addr,
					Type:       resolver.Backend, // All addresses from balancer are all backends.
					ServerName: "",
					Metadata:   a.Metadata,
				}] = a
			}
			// Compute the diff under the lock; defer the actual ClientConn
			// calls (NewSubConn/RemoveSubConn) until after it is released.
			bw.mu.Lock()
			for a := range resAddrs {
				if _, ok := bw.conns[a]; !ok {
					add = append(add, a)
				}
			}
			for a, c := range bw.conns {
				if _, ok := resAddrs[a]; !ok {
					del = append(del, c)
					delete(bw.conns, a)
					// Keep the state of this sc in bw.connSt until its state becomes Shutdown.
				}
			}
			bw.mu.Unlock()
			for _, a := range add {
				sc, err := bw.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
				if err != nil {
					grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
				} else {
					bw.mu.Lock()
					bw.conns[a] = sc
					bw.connSt[sc] = &scState{
						addr: resAddrs[a],
						s:    connectivity.Idle,
					}
					bw.mu.Unlock()
					sc.Connect()
				}
			}
			for _, c := range del {
				bw.cc.RemoveSubConn(c)
			}
		}
	}
}
221
222func (bw *balancerWrapper) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
223 bw.mu.Lock()
224 defer bw.mu.Unlock()
225 scSt, ok := bw.connSt[sc]
226 if !ok {
227 return
228 }
229 if s == connectivity.Idle {
230 sc.Connect()
231 }
232 oldS := scSt.s
233 scSt.s = s
234 if oldS != connectivity.Ready && s == connectivity.Ready {
235 scSt.down = bw.balancer.Up(scSt.addr)
236 } else if oldS == connectivity.Ready && s != connectivity.Ready {
237 if scSt.down != nil {
238 scSt.down(errConnClosing)
239 }
240 }
241 sa := bw.csEvltr.RecordTransition(oldS, s)
242 if bw.state != sa {
243 bw.state = sa
244 }
245 bw.cc.UpdateState(balancer.State{ConnectivityState: bw.state, Picker: bw})
246 if s == connectivity.Shutdown {
247 // Remove state for this sc.
248 delete(bw.connSt, sc)
249 }
250}
251
252func (bw *balancerWrapper) HandleResolvedAddrs([]resolver.Address, error) {
253 bw.mu.Lock()
254 defer bw.mu.Unlock()
255 select {
256 case <-bw.startCh:
257 default:
258 close(bw.startCh)
259 }
260 // There should be a resolver inside the balancer.
261 // All updates here, if any, are ignored.
262}
263
264func (bw *balancerWrapper) Close() {
265 bw.mu.Lock()
266 defer bw.mu.Unlock()
267 select {
268 case <-bw.startCh:
269 default:
270 close(bw.startCh)
271 }
272 bw.balancer.Close()
273}
274
// Pick implements balancer.Picker by delegating to the v1 balancer's Get.
// The picker is the balancerWrapper itself.
// It either blocks or returns error, consistent with v1 balancer Get().
func (bw *balancerWrapper) Pick(info balancer.PickInfo) (result balancer.PickResult, err error) {
	failfast := true // Default failfast is true.
	if ss, ok := rpcInfoFromContext(info.Ctx); ok {
		failfast = ss.failfast
	}
	// Ask the v1 balancer for an address; block only for non-failfast RPCs.
	a, p, err := bw.balancer.Get(info.Ctx, BalancerGetOptions{BlockingWait: !failfast})
	if err != nil {
		return balancer.PickResult{}, toRPCErr(err)
	}
	if p != nil {
		result.Done = func(balancer.DoneInfo) { p() }
		// On the error returns below, Done is never invoked, so release the
		// v1 put func here. This deferred closure reads the named return err.
		defer func() {
			if err != nil {
				p()
			}
		}()
	}

	bw.mu.Lock()
	defer bw.mu.Unlock()
	if bw.pickfirst {
		// Get the first sc in conns. For pickfirst there is at most one
		// SubConn, stored under the zero resolver.Address key.
		for _, result.SubConn = range bw.conns {
			return result, nil
		}
		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
	}
	// Map the v1 address returned by Get back to the SubConn created for it.
	var ok1 bool
	result.SubConn, ok1 = bw.conns[resolver.Address{
		Addr:       a.Addr,
		Type:       resolver.Backend,
		ServerName: "",
		Metadata:   a.Metadata,
	}]
	s, ok2 := bw.connSt[result.SubConn]
	if !ok1 || !ok2 {
		// This can only happen due to a race where Get() returned an address
		// that was subsequently removed by Notify. In this case we should
		// retry always.
		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
	}
	switch s.s {
	case connectivity.Ready, connectivity.Idle:
		return result, nil
	case connectivity.Shutdown, connectivity.TransientFailure:
		// If the returned sc has been shut down or is in transient failure,
		// return error, and this RPC will fail or wait for another picker (if
		// non-failfast).
		return balancer.PickResult{}, balancer.ErrTransientFailure
	default:
		// For other states (connecting or unknown), the v1 balancer would
		// traditionally wait until ready and then issue the RPC. Returning
		// ErrNoSubConnAvailable will be a slight improvement in that it will
		// allow the balancer to choose another address in case others are
		// connected.
		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
	}
}
334}