/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18
19package grpc
20
21import (
22 "context"
23 "sync"
24
25 "google.golang.org/grpc/balancer"
26 "google.golang.org/grpc/connectivity"
27 "google.golang.org/grpc/grpclog"
28 "google.golang.org/grpc/resolver"
29)
30
31type balancerWrapperBuilder struct {
32 b Balancer // The v1 balancer.
33}
34
35func (bwb *balancerWrapperBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
36 bwb.b.Start(opts.Target.Endpoint, BalancerConfig{
37 DialCreds: opts.DialCreds,
38 Dialer: opts.Dialer,
39 })
40 _, pickfirst := bwb.b.(*pickFirst)
41 bw := &balancerWrapper{
42 balancer: bwb.b,
43 pickfirst: pickfirst,
44 cc: cc,
45 targetAddr: opts.Target.Endpoint,
46 startCh: make(chan struct{}),
47 conns: make(map[resolver.Address]balancer.SubConn),
48 connSt: make(map[balancer.SubConn]*scState),
49 csEvltr: &balancer.ConnectivityStateEvaluator{},
50 state: connectivity.Idle,
51 }
52 cc.UpdateBalancerState(connectivity.Idle, bw)
53 go bw.lbWatcher()
54 return bw
55}
56
57func (bwb *balancerWrapperBuilder) Name() string {
58 return "wrapper"
59}
60
61type scState struct {
62 addr Address // The v1 address type.
63 s connectivity.State
64 down func(error)
65}
66
67type balancerWrapper struct {
68 balancer Balancer // The v1 balancer.
69 pickfirst bool
70
71 cc balancer.ClientConn
72 targetAddr string // Target without the scheme.
73
74 mu sync.Mutex
75 conns map[resolver.Address]balancer.SubConn
76 connSt map[balancer.SubConn]*scState
77 // This channel is closed when handling the first resolver result.
78 // lbWatcher blocks until this is closed, to avoid race between
79 // - NewSubConn is created, cc wants to notify balancer of state changes;
80 // - Build hasn't return, cc doesn't have access to balancer.
81 startCh chan struct{}
82
83 // To aggregate the connectivity state.
84 csEvltr *balancer.ConnectivityStateEvaluator
85 state connectivity.State
86}
87
88// lbWatcher watches the Notify channel of the balancer and manages
89// connections accordingly.
90func (bw *balancerWrapper) lbWatcher() {
91 <-bw.startCh
92 notifyCh := bw.balancer.Notify()
93 if notifyCh == nil {
94 // There's no resolver in the balancer. Connect directly.
95 a := resolver.Address{
96 Addr: bw.targetAddr,
97 Type: resolver.Backend,
98 }
99 sc, err := bw.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
100 if err != nil {
101 grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
102 } else {
103 bw.mu.Lock()
104 bw.conns[a] = sc
105 bw.connSt[sc] = &scState{
106 addr: Address{Addr: bw.targetAddr},
107 s: connectivity.Idle,
108 }
109 bw.mu.Unlock()
110 sc.Connect()
111 }
112 return
113 }
114
115 for addrs := range notifyCh {
116 grpclog.Infof("balancerWrapper: got update addr from Notify: %v", addrs)
117 if bw.pickfirst {
118 var (
119 oldA resolver.Address
120 oldSC balancer.SubConn
121 )
122 bw.mu.Lock()
123 for oldA, oldSC = range bw.conns {
124 break
125 }
126 bw.mu.Unlock()
127 if len(addrs) <= 0 {
128 if oldSC != nil {
129 // Teardown old sc.
130 bw.mu.Lock()
131 delete(bw.conns, oldA)
132 delete(bw.connSt, oldSC)
133 bw.mu.Unlock()
134 bw.cc.RemoveSubConn(oldSC)
135 }
136 continue
137 }
138
139 var newAddrs []resolver.Address
140 for _, a := range addrs {
141 newAddr := resolver.Address{
142 Addr: a.Addr,
143 Type: resolver.Backend, // All addresses from balancer are all backends.
144 ServerName: "",
145 Metadata: a.Metadata,
146 }
147 newAddrs = append(newAddrs, newAddr)
148 }
149 if oldSC == nil {
150 // Create new sc.
151 sc, err := bw.cc.NewSubConn(newAddrs, balancer.NewSubConnOptions{})
152 if err != nil {
153 grpclog.Warningf("Error creating connection to %v. Err: %v", newAddrs, err)
154 } else {
155 bw.mu.Lock()
156 // For pickfirst, there should be only one SubConn, so the
157 // address doesn't matter. All states updating (up and down)
158 // and picking should all happen on that only SubConn.
159 bw.conns[resolver.Address{}] = sc
160 bw.connSt[sc] = &scState{
161 addr: addrs[0], // Use the first address.
162 s: connectivity.Idle,
163 }
164 bw.mu.Unlock()
165 sc.Connect()
166 }
167 } else {
168 bw.mu.Lock()
169 bw.connSt[oldSC].addr = addrs[0]
170 bw.mu.Unlock()
171 oldSC.UpdateAddresses(newAddrs)
172 }
173 } else {
174 var (
175 add []resolver.Address // Addresses need to setup connections.
176 del []balancer.SubConn // Connections need to tear down.
177 )
178 resAddrs := make(map[resolver.Address]Address)
179 for _, a := range addrs {
180 resAddrs[resolver.Address{
181 Addr: a.Addr,
182 Type: resolver.Backend, // All addresses from balancer are all backends.
183 ServerName: "",
184 Metadata: a.Metadata,
185 }] = a
186 }
187 bw.mu.Lock()
188 for a := range resAddrs {
189 if _, ok := bw.conns[a]; !ok {
190 add = append(add, a)
191 }
192 }
193 for a, c := range bw.conns {
194 if _, ok := resAddrs[a]; !ok {
195 del = append(del, c)
196 delete(bw.conns, a)
197 // Keep the state of this sc in bw.connSt until its state becomes Shutdown.
198 }
199 }
200 bw.mu.Unlock()
201 for _, a := range add {
202 sc, err := bw.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
203 if err != nil {
204 grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
205 } else {
206 bw.mu.Lock()
207 bw.conns[a] = sc
208 bw.connSt[sc] = &scState{
209 addr: resAddrs[a],
210 s: connectivity.Idle,
211 }
212 bw.mu.Unlock()
213 sc.Connect()
214 }
215 }
216 for _, c := range del {
217 bw.cc.RemoveSubConn(c)
218 }
219 }
220 }
221}
222
223func (bw *balancerWrapper) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
224 bw.mu.Lock()
225 defer bw.mu.Unlock()
226 scSt, ok := bw.connSt[sc]
227 if !ok {
228 return
229 }
230 if s == connectivity.Idle {
231 sc.Connect()
232 }
233 oldS := scSt.s
234 scSt.s = s
235 if oldS != connectivity.Ready && s == connectivity.Ready {
236 scSt.down = bw.balancer.Up(scSt.addr)
237 } else if oldS == connectivity.Ready && s != connectivity.Ready {
238 if scSt.down != nil {
239 scSt.down(errConnClosing)
240 }
241 }
242 sa := bw.csEvltr.RecordTransition(oldS, s)
243 if bw.state != sa {
244 bw.state = sa
245 }
246 bw.cc.UpdateBalancerState(bw.state, bw)
247 if s == connectivity.Shutdown {
248 // Remove state for this sc.
249 delete(bw.connSt, sc)
250 }
251}
252
253func (bw *balancerWrapper) HandleResolvedAddrs([]resolver.Address, error) {
254 bw.mu.Lock()
255 defer bw.mu.Unlock()
256 select {
257 case <-bw.startCh:
258 default:
259 close(bw.startCh)
260 }
261 // There should be a resolver inside the balancer.
262 // All updates here, if any, are ignored.
263}
264
265func (bw *balancerWrapper) Close() {
266 bw.mu.Lock()
267 defer bw.mu.Unlock()
268 select {
269 case <-bw.startCh:
270 default:
271 close(bw.startCh)
272 }
273 bw.balancer.Close()
274}
275
276// The picker is the balancerWrapper itself.
277// It either blocks or returns error, consistent with v1 balancer Get().
278func (bw *balancerWrapper) Pick(ctx context.Context, opts balancer.PickOptions) (sc balancer.SubConn, done func(balancer.DoneInfo), err error) {
279 failfast := true // Default failfast is true.
280 if ss, ok := rpcInfoFromContext(ctx); ok {
281 failfast = ss.failfast
282 }
283 a, p, err := bw.balancer.Get(ctx, BalancerGetOptions{BlockingWait: !failfast})
284 if err != nil {
285 return nil, nil, err
286 }
287 if p != nil {
288 done = func(balancer.DoneInfo) { p() }
289 defer func() {
290 if err != nil {
291 p()
292 }
293 }()
294 }
295
296 bw.mu.Lock()
297 defer bw.mu.Unlock()
298 if bw.pickfirst {
299 // Get the first sc in conns.
300 for _, sc := range bw.conns {
301 return sc, done, nil
302 }
303 return nil, nil, balancer.ErrNoSubConnAvailable
304 }
305 sc, ok1 := bw.conns[resolver.Address{
306 Addr: a.Addr,
307 Type: resolver.Backend,
308 ServerName: "",
309 Metadata: a.Metadata,
310 }]
311 s, ok2 := bw.connSt[sc]
312 if !ok1 || !ok2 {
313 // This can only happen due to a race where Get() returned an address
314 // that was subsequently removed by Notify. In this case we should
315 // retry always.
316 return nil, nil, balancer.ErrNoSubConnAvailable
317 }
318 switch s.s {
319 case connectivity.Ready, connectivity.Idle:
320 return sc, done, nil
321 case connectivity.Shutdown, connectivity.TransientFailure:
322 // If the returned sc has been shut down or is in transient failure,
323 // return error, and this RPC will fail or wait for another picker (if
324 // non-failfast).
325 return nil, nil, balancer.ErrTransientFailure
326 default:
327 // For other states (connecting or unknown), the v1 balancer would
328 // traditionally wait until ready and then issue the RPC. Returning
329 // ErrNoSubConnAvailable will be a slight improvement in that it will
330 // allow the balancer to choose another address in case others are
331 // connected.
332 return nil, nil, balancer.ErrNoSubConnAvailable
333 }
334}