blob: 42b60feaa6c6a1053517e053b535d6cdecec1c5e [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "context"
23 "strings"
24 "sync"
25
26 "google.golang.org/grpc/balancer"
khenaidooac637102019-01-14 15:44:34 -050027 "google.golang.org/grpc/connectivity"
28 "google.golang.org/grpc/grpclog"
29 "google.golang.org/grpc/resolver"
khenaidooac637102019-01-14 15:44:34 -050030)
31
32type balancerWrapperBuilder struct {
33 b Balancer // The v1 balancer.
34}
35
36func (bwb *balancerWrapperBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
37 targetAddr := cc.Target()
38 targetSplitted := strings.Split(targetAddr, ":///")
39 if len(targetSplitted) >= 2 {
40 targetAddr = targetSplitted[1]
41 }
42
43 bwb.b.Start(targetAddr, BalancerConfig{
44 DialCreds: opts.DialCreds,
45 Dialer: opts.Dialer,
46 })
47 _, pickfirst := bwb.b.(*pickFirst)
48 bw := &balancerWrapper{
49 balancer: bwb.b,
50 pickfirst: pickfirst,
51 cc: cc,
52 targetAddr: targetAddr,
53 startCh: make(chan struct{}),
54 conns: make(map[resolver.Address]balancer.SubConn),
55 connSt: make(map[balancer.SubConn]*scState),
56 csEvltr: &balancer.ConnectivityStateEvaluator{},
57 state: connectivity.Idle,
58 }
59 cc.UpdateBalancerState(connectivity.Idle, bw)
60 go bw.lbWatcher()
61 return bw
62}
63
64func (bwb *balancerWrapperBuilder) Name() string {
65 return "wrapper"
66}
67
68type scState struct {
69 addr Address // The v1 address type.
70 s connectivity.State
71 down func(error)
72}
73
74type balancerWrapper struct {
75 balancer Balancer // The v1 balancer.
76 pickfirst bool
77
78 cc balancer.ClientConn
79 targetAddr string // Target without the scheme.
80
81 mu sync.Mutex
82 conns map[resolver.Address]balancer.SubConn
83 connSt map[balancer.SubConn]*scState
84 // This channel is closed when handling the first resolver result.
85 // lbWatcher blocks until this is closed, to avoid race between
86 // - NewSubConn is created, cc wants to notify balancer of state changes;
87 // - Build hasn't return, cc doesn't have access to balancer.
88 startCh chan struct{}
89
90 // To aggregate the connectivity state.
91 csEvltr *balancer.ConnectivityStateEvaluator
92 state connectivity.State
93}
94
95// lbWatcher watches the Notify channel of the balancer and manages
96// connections accordingly.
97func (bw *balancerWrapper) lbWatcher() {
98 <-bw.startCh
99 notifyCh := bw.balancer.Notify()
100 if notifyCh == nil {
101 // There's no resolver in the balancer. Connect directly.
102 a := resolver.Address{
103 Addr: bw.targetAddr,
104 Type: resolver.Backend,
105 }
106 sc, err := bw.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
107 if err != nil {
108 grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
109 } else {
110 bw.mu.Lock()
111 bw.conns[a] = sc
112 bw.connSt[sc] = &scState{
113 addr: Address{Addr: bw.targetAddr},
114 s: connectivity.Idle,
115 }
116 bw.mu.Unlock()
117 sc.Connect()
118 }
119 return
120 }
121
122 for addrs := range notifyCh {
123 grpclog.Infof("balancerWrapper: got update addr from Notify: %v\n", addrs)
124 if bw.pickfirst {
125 var (
126 oldA resolver.Address
127 oldSC balancer.SubConn
128 )
129 bw.mu.Lock()
130 for oldA, oldSC = range bw.conns {
131 break
132 }
133 bw.mu.Unlock()
134 if len(addrs) <= 0 {
135 if oldSC != nil {
136 // Teardown old sc.
137 bw.mu.Lock()
138 delete(bw.conns, oldA)
139 delete(bw.connSt, oldSC)
140 bw.mu.Unlock()
141 bw.cc.RemoveSubConn(oldSC)
142 }
143 continue
144 }
145
146 var newAddrs []resolver.Address
147 for _, a := range addrs {
148 newAddr := resolver.Address{
149 Addr: a.Addr,
150 Type: resolver.Backend, // All addresses from balancer are all backends.
151 ServerName: "",
152 Metadata: a.Metadata,
153 }
154 newAddrs = append(newAddrs, newAddr)
155 }
156 if oldSC == nil {
157 // Create new sc.
158 sc, err := bw.cc.NewSubConn(newAddrs, balancer.NewSubConnOptions{})
159 if err != nil {
160 grpclog.Warningf("Error creating connection to %v. Err: %v", newAddrs, err)
161 } else {
162 bw.mu.Lock()
163 // For pickfirst, there should be only one SubConn, so the
164 // address doesn't matter. All states updating (up and down)
165 // and picking should all happen on that only SubConn.
166 bw.conns[resolver.Address{}] = sc
167 bw.connSt[sc] = &scState{
168 addr: addrs[0], // Use the first address.
169 s: connectivity.Idle,
170 }
171 bw.mu.Unlock()
172 sc.Connect()
173 }
174 } else {
175 bw.mu.Lock()
176 bw.connSt[oldSC].addr = addrs[0]
177 bw.mu.Unlock()
178 oldSC.UpdateAddresses(newAddrs)
179 }
180 } else {
181 var (
182 add []resolver.Address // Addresses need to setup connections.
183 del []balancer.SubConn // Connections need to tear down.
184 )
185 resAddrs := make(map[resolver.Address]Address)
186 for _, a := range addrs {
187 resAddrs[resolver.Address{
188 Addr: a.Addr,
189 Type: resolver.Backend, // All addresses from balancer are all backends.
190 ServerName: "",
191 Metadata: a.Metadata,
192 }] = a
193 }
194 bw.mu.Lock()
195 for a := range resAddrs {
196 if _, ok := bw.conns[a]; !ok {
197 add = append(add, a)
198 }
199 }
200 for a, c := range bw.conns {
201 if _, ok := resAddrs[a]; !ok {
202 del = append(del, c)
203 delete(bw.conns, a)
204 // Keep the state of this sc in bw.connSt until its state becomes Shutdown.
205 }
206 }
207 bw.mu.Unlock()
208 for _, a := range add {
209 sc, err := bw.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
210 if err != nil {
211 grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
212 } else {
213 bw.mu.Lock()
214 bw.conns[a] = sc
215 bw.connSt[sc] = &scState{
216 addr: resAddrs[a],
217 s: connectivity.Idle,
218 }
219 bw.mu.Unlock()
220 sc.Connect()
221 }
222 }
223 for _, c := range del {
224 bw.cc.RemoveSubConn(c)
225 }
226 }
227 }
228}
229
230func (bw *balancerWrapper) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
231 bw.mu.Lock()
232 defer bw.mu.Unlock()
233 scSt, ok := bw.connSt[sc]
234 if !ok {
235 return
236 }
237 if s == connectivity.Idle {
238 sc.Connect()
239 }
240 oldS := scSt.s
241 scSt.s = s
242 if oldS != connectivity.Ready && s == connectivity.Ready {
243 scSt.down = bw.balancer.Up(scSt.addr)
244 } else if oldS == connectivity.Ready && s != connectivity.Ready {
245 if scSt.down != nil {
246 scSt.down(errConnClosing)
247 }
248 }
249 sa := bw.csEvltr.RecordTransition(oldS, s)
250 if bw.state != sa {
251 bw.state = sa
252 }
253 bw.cc.UpdateBalancerState(bw.state, bw)
254 if s == connectivity.Shutdown {
255 // Remove state for this sc.
256 delete(bw.connSt, sc)
257 }
258}
259
260func (bw *balancerWrapper) HandleResolvedAddrs([]resolver.Address, error) {
261 bw.mu.Lock()
262 defer bw.mu.Unlock()
263 select {
264 case <-bw.startCh:
265 default:
266 close(bw.startCh)
267 }
268 // There should be a resolver inside the balancer.
269 // All updates here, if any, are ignored.
270}
271
272func (bw *balancerWrapper) Close() {
273 bw.mu.Lock()
274 defer bw.mu.Unlock()
275 select {
276 case <-bw.startCh:
277 default:
278 close(bw.startCh)
279 }
280 bw.balancer.Close()
281}
282
283// The picker is the balancerWrapper itself.
284// Pick should never return ErrNoSubConnAvailable.
285// It either blocks or returns error, consistent with v1 balancer Get().
286func (bw *balancerWrapper) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
287 failfast := true // Default failfast is true.
288 if ss, ok := rpcInfoFromContext(ctx); ok {
289 failfast = ss.failfast
290 }
291 a, p, err := bw.balancer.Get(ctx, BalancerGetOptions{BlockingWait: !failfast})
292 if err != nil {
293 return nil, nil, err
294 }
295 var done func(balancer.DoneInfo)
296 if p != nil {
297 done = func(i balancer.DoneInfo) { p() }
298 }
299 var sc balancer.SubConn
300 bw.mu.Lock()
301 defer bw.mu.Unlock()
302 if bw.pickfirst {
303 // Get the first sc in conns.
304 for _, sc = range bw.conns {
305 break
306 }
307 } else {
308 var ok bool
309 sc, ok = bw.conns[resolver.Address{
310 Addr: a.Addr,
311 Type: resolver.Backend,
312 ServerName: "",
313 Metadata: a.Metadata,
314 }]
315 if !ok && failfast {
Stephane Barbarie260a5632019-02-26 16:12:49 -0500316 return nil, nil, balancer.ErrTransientFailure
khenaidooac637102019-01-14 15:44:34 -0500317 }
318 if s, ok := bw.connSt[sc]; failfast && (!ok || s.s != connectivity.Ready) {
319 // If the returned sc is not ready and RPC is failfast,
320 // return error, and this RPC will fail.
Stephane Barbarie260a5632019-02-26 16:12:49 -0500321 return nil, nil, balancer.ErrTransientFailure
khenaidooac637102019-01-14 15:44:34 -0500322 }
323 }
324
325 return sc, done, nil
326}