blob: 00447894f07b33e3f7aecce1d59663a3ae48d416 [file] [log] [blame]
Scott Baker105df152020-04-13 15:55:14 -07001/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "context"
23 "fmt"
24 "io"
25 "sync"
26
27 "google.golang.org/grpc/balancer"
28 "google.golang.org/grpc/codes"
29 "google.golang.org/grpc/grpclog"
30 "google.golang.org/grpc/internal/channelz"
31 "google.golang.org/grpc/internal/transport"
32 "google.golang.org/grpc/status"
33)
34
35// v2PickerWrapper wraps a balancer.Picker while providing the
36// balancer.V2Picker API. It requires a pickerWrapper to generate errors
37// including the latest connectionError. To be deleted when balancer.Picker is
38// updated to the balancer.V2Picker API.
39type v2PickerWrapper struct {
40 picker balancer.Picker
41 connErr *connErr
42}
43
44func (v *v2PickerWrapper) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
45 sc, done, err := v.picker.Pick(info.Ctx, info)
46 if err != nil {
47 if err == balancer.ErrTransientFailure {
48 return balancer.PickResult{}, balancer.TransientFailureError(fmt.Errorf("%v, latest connection error: %v", err, v.connErr.connectionError()))
49 }
50 return balancer.PickResult{}, err
51 }
52 return balancer.PickResult{SubConn: sc, Done: done}, nil
53}
54
55// pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
56// actions and unblock when there's a picker update.
57type pickerWrapper struct {
58 mu sync.Mutex
59 done bool
60 blockingCh chan struct{}
61 picker balancer.V2Picker
62
63 // The latest connection error. TODO: remove when V1 picker is deprecated;
64 // balancer should be responsible for providing the error.
65 *connErr
66}
67
// connErr stores a single error value behind a mutex so that it can be
// written and read from different goroutines.
type connErr struct {
	mu  sync.Mutex
	err error
}

// updateConnectionError replaces the stored error with err (which may be nil).
func (c *connErr) updateConnectionError(err error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.err = err
}

// connectionError returns the error most recently stored by
// updateConnectionError, or nil if none has been stored.
func (c *connErr) connectionError() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.err
}
85
86func newPickerWrapper() *pickerWrapper {
87 return &pickerWrapper{blockingCh: make(chan struct{}), connErr: &connErr{}}
88}
89
90// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
91func (pw *pickerWrapper) updatePicker(p balancer.Picker) {
92 pw.updatePickerV2(&v2PickerWrapper{picker: p, connErr: pw.connErr})
93}
94
95// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
96func (pw *pickerWrapper) updatePickerV2(p balancer.V2Picker) {
97 pw.mu.Lock()
98 if pw.done {
99 pw.mu.Unlock()
100 return
101 }
102 pw.picker = p
103 // pw.blockingCh should never be nil.
104 close(pw.blockingCh)
105 pw.blockingCh = make(chan struct{})
106 pw.mu.Unlock()
107}
108
109func doneChannelzWrapper(acw *acBalancerWrapper, done func(balancer.DoneInfo)) func(balancer.DoneInfo) {
110 acw.mu.Lock()
111 ac := acw.ac
112 acw.mu.Unlock()
113 ac.incrCallsStarted()
114 return func(b balancer.DoneInfo) {
115 if b.Err != nil && b.Err != io.EOF {
116 ac.incrCallsFailed()
117 } else {
118 ac.incrCallsSucceeded()
119 }
120 if done != nil {
121 done(b)
122 }
123 }
124}
125
126// pick returns the transport that will be used for the RPC.
127// It may block in the following cases:
128// - there's no picker
129// - the current picker returns ErrNoSubConnAvailable
130// - the current picker returns other errors and failfast is false.
131// - the subConn returned by the current picker is not READY
132// When one of these situations happens, pick blocks until the picker gets updated.
133func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, func(balancer.DoneInfo), error) {
134 var ch chan struct{}
135
136 var lastPickErr error
137 for {
138 pw.mu.Lock()
139 if pw.done {
140 pw.mu.Unlock()
141 return nil, nil, ErrClientConnClosing
142 }
143
144 if pw.picker == nil {
145 ch = pw.blockingCh
146 }
147 if ch == pw.blockingCh {
148 // This could happen when either:
149 // - pw.picker is nil (the previous if condition), or
150 // - has called pick on the current picker.
151 pw.mu.Unlock()
152 select {
153 case <-ctx.Done():
154 var errStr string
155 if lastPickErr != nil {
156 errStr = "latest balancer error: " + lastPickErr.Error()
157 } else if connectionErr := pw.connectionError(); connectionErr != nil {
158 errStr = "latest connection error: " + connectionErr.Error()
159 } else {
160 errStr = ctx.Err().Error()
161 }
162 switch ctx.Err() {
163 case context.DeadlineExceeded:
164 return nil, nil, status.Error(codes.DeadlineExceeded, errStr)
165 case context.Canceled:
166 return nil, nil, status.Error(codes.Canceled, errStr)
167 }
168 case <-ch:
169 }
170 continue
171 }
172
173 ch = pw.blockingCh
174 p := pw.picker
175 pw.mu.Unlock()
176
177 pickResult, err := p.Pick(info)
178
179 if err != nil {
180 if err == balancer.ErrNoSubConnAvailable {
181 continue
182 }
183 if tfe, ok := err.(interface{ IsTransientFailure() bool }); ok && tfe.IsTransientFailure() {
184 if !failfast {
185 lastPickErr = err
186 continue
187 }
188 return nil, nil, status.Error(codes.Unavailable, err.Error())
189 }
190 if _, ok := status.FromError(err); ok {
191 return nil, nil, err
192 }
193 // err is some other error.
194 return nil, nil, status.Error(codes.Unknown, err.Error())
195 }
196
197 acw, ok := pickResult.SubConn.(*acBalancerWrapper)
198 if !ok {
199 grpclog.Error("subconn returned from pick is not *acBalancerWrapper")
200 continue
201 }
202 if t, ok := acw.getAddrConn().getReadyTransport(); ok {
203 if channelz.IsOn() {
204 return t, doneChannelzWrapper(acw, pickResult.Done), nil
205 }
206 return t, pickResult.Done, nil
207 }
208 if pickResult.Done != nil {
209 // Calling done with nil error, no bytes sent and no bytes received.
210 // DoneInfo with default value works.
211 pickResult.Done(balancer.DoneInfo{})
212 }
213 grpclog.Infof("blockingPicker: the picked transport is not ready, loop back to repick")
214 // If ok == false, ac.state is not READY.
215 // A valid picker always returns READY subConn. This means the state of ac
216 // just changed, and picker will be updated shortly.
217 // continue back to the beginning of the for loop to repick.
218 }
219}
220
221func (pw *pickerWrapper) close() {
222 pw.mu.Lock()
223 defer pw.mu.Unlock()
224 if pw.done {
225 return
226 }
227 pw.done = true
228 close(pw.blockingCh)
229}