/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18
19package grpc
20
21import (
22 "context"
23 "strings"
24 "sync"
25
26 "google.golang.org/grpc/balancer"
27 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/connectivity"
29 "google.golang.org/grpc/grpclog"
30 "google.golang.org/grpc/resolver"
31 "google.golang.org/grpc/status"
32)
33
34type balancerWrapperBuilder struct {
35 b Balancer // The v1 balancer.
36}
37
38func (bwb *balancerWrapperBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
39 targetAddr := cc.Target()
40 targetSplitted := strings.Split(targetAddr, ":///")
41 if len(targetSplitted) >= 2 {
42 targetAddr = targetSplitted[1]
43 }
44
45 bwb.b.Start(targetAddr, BalancerConfig{
46 DialCreds: opts.DialCreds,
47 Dialer: opts.Dialer,
48 })
49 _, pickfirst := bwb.b.(*pickFirst)
50 bw := &balancerWrapper{
51 balancer: bwb.b,
52 pickfirst: pickfirst,
53 cc: cc,
54 targetAddr: targetAddr,
55 startCh: make(chan struct{}),
56 conns: make(map[resolver.Address]balancer.SubConn),
57 connSt: make(map[balancer.SubConn]*scState),
58 csEvltr: &balancer.ConnectivityStateEvaluator{},
59 state: connectivity.Idle,
60 }
61 cc.UpdateBalancerState(connectivity.Idle, bw)
62 go bw.lbWatcher()
63 return bw
64}
65
66func (bwb *balancerWrapperBuilder) Name() string {
67 return "wrapper"
68}
69
70type scState struct {
71 addr Address // The v1 address type.
72 s connectivity.State
73 down func(error)
74}
75
76type balancerWrapper struct {
77 balancer Balancer // The v1 balancer.
78 pickfirst bool
79
80 cc balancer.ClientConn
81 targetAddr string // Target without the scheme.
82
83 mu sync.Mutex
84 conns map[resolver.Address]balancer.SubConn
85 connSt map[balancer.SubConn]*scState
86 // This channel is closed when handling the first resolver result.
87 // lbWatcher blocks until this is closed, to avoid race between
88 // - NewSubConn is created, cc wants to notify balancer of state changes;
89 // - Build hasn't return, cc doesn't have access to balancer.
90 startCh chan struct{}
91
92 // To aggregate the connectivity state.
93 csEvltr *balancer.ConnectivityStateEvaluator
94 state connectivity.State
95}
96
97// lbWatcher watches the Notify channel of the balancer and manages
98// connections accordingly.
99func (bw *balancerWrapper) lbWatcher() {
100 <-bw.startCh
101 notifyCh := bw.balancer.Notify()
102 if notifyCh == nil {
103 // There's no resolver in the balancer. Connect directly.
104 a := resolver.Address{
105 Addr: bw.targetAddr,
106 Type: resolver.Backend,
107 }
108 sc, err := bw.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
109 if err != nil {
110 grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
111 } else {
112 bw.mu.Lock()
113 bw.conns[a] = sc
114 bw.connSt[sc] = &scState{
115 addr: Address{Addr: bw.targetAddr},
116 s: connectivity.Idle,
117 }
118 bw.mu.Unlock()
119 sc.Connect()
120 }
121 return
122 }
123
124 for addrs := range notifyCh {
125 grpclog.Infof("balancerWrapper: got update addr from Notify: %v\n", addrs)
126 if bw.pickfirst {
127 var (
128 oldA resolver.Address
129 oldSC balancer.SubConn
130 )
131 bw.mu.Lock()
132 for oldA, oldSC = range bw.conns {
133 break
134 }
135 bw.mu.Unlock()
136 if len(addrs) <= 0 {
137 if oldSC != nil {
138 // Teardown old sc.
139 bw.mu.Lock()
140 delete(bw.conns, oldA)
141 delete(bw.connSt, oldSC)
142 bw.mu.Unlock()
143 bw.cc.RemoveSubConn(oldSC)
144 }
145 continue
146 }
147
148 var newAddrs []resolver.Address
149 for _, a := range addrs {
150 newAddr := resolver.Address{
151 Addr: a.Addr,
152 Type: resolver.Backend, // All addresses from balancer are all backends.
153 ServerName: "",
154 Metadata: a.Metadata,
155 }
156 newAddrs = append(newAddrs, newAddr)
157 }
158 if oldSC == nil {
159 // Create new sc.
160 sc, err := bw.cc.NewSubConn(newAddrs, balancer.NewSubConnOptions{})
161 if err != nil {
162 grpclog.Warningf("Error creating connection to %v. Err: %v", newAddrs, err)
163 } else {
164 bw.mu.Lock()
165 // For pickfirst, there should be only one SubConn, so the
166 // address doesn't matter. All states updating (up and down)
167 // and picking should all happen on that only SubConn.
168 bw.conns[resolver.Address{}] = sc
169 bw.connSt[sc] = &scState{
170 addr: addrs[0], // Use the first address.
171 s: connectivity.Idle,
172 }
173 bw.mu.Unlock()
174 sc.Connect()
175 }
176 } else {
177 bw.mu.Lock()
178 bw.connSt[oldSC].addr = addrs[0]
179 bw.mu.Unlock()
180 oldSC.UpdateAddresses(newAddrs)
181 }
182 } else {
183 var (
184 add []resolver.Address // Addresses need to setup connections.
185 del []balancer.SubConn // Connections need to tear down.
186 )
187 resAddrs := make(map[resolver.Address]Address)
188 for _, a := range addrs {
189 resAddrs[resolver.Address{
190 Addr: a.Addr,
191 Type: resolver.Backend, // All addresses from balancer are all backends.
192 ServerName: "",
193 Metadata: a.Metadata,
194 }] = a
195 }
196 bw.mu.Lock()
197 for a := range resAddrs {
198 if _, ok := bw.conns[a]; !ok {
199 add = append(add, a)
200 }
201 }
202 for a, c := range bw.conns {
203 if _, ok := resAddrs[a]; !ok {
204 del = append(del, c)
205 delete(bw.conns, a)
206 // Keep the state of this sc in bw.connSt until its state becomes Shutdown.
207 }
208 }
209 bw.mu.Unlock()
210 for _, a := range add {
211 sc, err := bw.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
212 if err != nil {
213 grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
214 } else {
215 bw.mu.Lock()
216 bw.conns[a] = sc
217 bw.connSt[sc] = &scState{
218 addr: resAddrs[a],
219 s: connectivity.Idle,
220 }
221 bw.mu.Unlock()
222 sc.Connect()
223 }
224 }
225 for _, c := range del {
226 bw.cc.RemoveSubConn(c)
227 }
228 }
229 }
230}
231
232func (bw *balancerWrapper) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
233 bw.mu.Lock()
234 defer bw.mu.Unlock()
235 scSt, ok := bw.connSt[sc]
236 if !ok {
237 return
238 }
239 if s == connectivity.Idle {
240 sc.Connect()
241 }
242 oldS := scSt.s
243 scSt.s = s
244 if oldS != connectivity.Ready && s == connectivity.Ready {
245 scSt.down = bw.balancer.Up(scSt.addr)
246 } else if oldS == connectivity.Ready && s != connectivity.Ready {
247 if scSt.down != nil {
248 scSt.down(errConnClosing)
249 }
250 }
251 sa := bw.csEvltr.RecordTransition(oldS, s)
252 if bw.state != sa {
253 bw.state = sa
254 }
255 bw.cc.UpdateBalancerState(bw.state, bw)
256 if s == connectivity.Shutdown {
257 // Remove state for this sc.
258 delete(bw.connSt, sc)
259 }
260}
261
262func (bw *balancerWrapper) HandleResolvedAddrs([]resolver.Address, error) {
263 bw.mu.Lock()
264 defer bw.mu.Unlock()
265 select {
266 case <-bw.startCh:
267 default:
268 close(bw.startCh)
269 }
270 // There should be a resolver inside the balancer.
271 // All updates here, if any, are ignored.
272}
273
274func (bw *balancerWrapper) Close() {
275 bw.mu.Lock()
276 defer bw.mu.Unlock()
277 select {
278 case <-bw.startCh:
279 default:
280 close(bw.startCh)
281 }
282 bw.balancer.Close()
283}
284
285// The picker is the balancerWrapper itself.
286// Pick should never return ErrNoSubConnAvailable.
287// It either blocks or returns error, consistent with v1 balancer Get().
288func (bw *balancerWrapper) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
289 failfast := true // Default failfast is true.
290 if ss, ok := rpcInfoFromContext(ctx); ok {
291 failfast = ss.failfast
292 }
293 a, p, err := bw.balancer.Get(ctx, BalancerGetOptions{BlockingWait: !failfast})
294 if err != nil {
295 return nil, nil, err
296 }
297 var done func(balancer.DoneInfo)
298 if p != nil {
299 done = func(i balancer.DoneInfo) { p() }
300 }
301 var sc balancer.SubConn
302 bw.mu.Lock()
303 defer bw.mu.Unlock()
304 if bw.pickfirst {
305 // Get the first sc in conns.
306 for _, sc = range bw.conns {
307 break
308 }
309 } else {
310 var ok bool
311 sc, ok = bw.conns[resolver.Address{
312 Addr: a.Addr,
313 Type: resolver.Backend,
314 ServerName: "",
315 Metadata: a.Metadata,
316 }]
317 if !ok && failfast {
318 return nil, nil, status.Errorf(codes.Unavailable, "there is no connection available")
319 }
320 if s, ok := bw.connSt[sc]; failfast && (!ok || s.s != connectivity.Ready) {
321 // If the returned sc is not ready and RPC is failfast,
322 // return error, and this RPC will fail.
323 return nil, nil, status.Errorf(codes.Unavailable, "there is no connection available")
324 }
325 }
326
327 return sc, done, nil
328}