/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18
19package grpc
20
21import (
22 "context"
Arjun E K57a7fcb2020-01-30 06:44:45 +000023 "fmt"
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -070024 "io"
25 "sync"
26
27 "google.golang.org/grpc/balancer"
28 "google.golang.org/grpc/codes"
29 "google.golang.org/grpc/grpclog"
30 "google.golang.org/grpc/internal/channelz"
31 "google.golang.org/grpc/internal/transport"
32 "google.golang.org/grpc/status"
33)
34
Arjun E K57a7fcb2020-01-30 06:44:45 +000035// v2PickerWrapper wraps a balancer.Picker while providing the
36// balancer.V2Picker API. It requires a pickerWrapper to generate errors
37// including the latest connectionError. To be deleted when balancer.Picker is
38// updated to the balancer.V2Picker API.
39type v2PickerWrapper struct {
40 picker balancer.Picker
41 connErr *connErr
42}
43
44func (v *v2PickerWrapper) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
45 sc, done, err := v.picker.Pick(info.Ctx, info)
46 if err != nil {
47 if err == balancer.ErrTransientFailure {
48 return balancer.PickResult{}, balancer.TransientFailureError(fmt.Errorf("%v, latest connection error: %v", err, v.connErr.connectionError()))
49 }
50 return balancer.PickResult{}, err
51 }
52 return balancer.PickResult{SubConn: sc, Done: done}, nil
53}
54
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -070055// pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
56// actions and unblock when there's a picker update.
57type pickerWrapper struct {
58 mu sync.Mutex
59 done bool
60 blockingCh chan struct{}
Arjun E K57a7fcb2020-01-30 06:44:45 +000061 picker balancer.V2Picker
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -070062
Arjun E K57a7fcb2020-01-30 06:44:45 +000063 // The latest connection error. TODO: remove when V1 picker is deprecated;
64 // balancer should be responsible for providing the error.
65 *connErr
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -070066}
67
// connErr stores the most recent connection error behind its own mutex so it
// can be read and written from multiple goroutines.
type connErr struct {
	// mu guards err.
	mu sync.Mutex
	// err is the latest recorded connection error; nil if none was recorded.
	err error
}
72
Arjun E K57a7fcb2020-01-30 06:44:45 +000073func (c *connErr) updateConnectionError(err error) {
74 c.mu.Lock()
75 c.err = err
76 c.mu.Unlock()
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -070077}
78
Arjun E K57a7fcb2020-01-30 06:44:45 +000079func (c *connErr) connectionError() error {
80 c.mu.Lock()
81 err := c.err
82 c.mu.Unlock()
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -070083 return err
84}
85
Arjun E K57a7fcb2020-01-30 06:44:45 +000086func newPickerWrapper() *pickerWrapper {
87 return &pickerWrapper{blockingCh: make(chan struct{}), connErr: &connErr{}}
88}
89
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -070090// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
Arjun E K57a7fcb2020-01-30 06:44:45 +000091func (pw *pickerWrapper) updatePicker(p balancer.Picker) {
92 pw.updatePickerV2(&v2PickerWrapper{picker: p, connErr: pw.connErr})
93}
94
95// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
96func (pw *pickerWrapper) updatePickerV2(p balancer.V2Picker) {
97 pw.mu.Lock()
98 if pw.done {
99 pw.mu.Unlock()
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700100 return
101 }
Arjun E K57a7fcb2020-01-30 06:44:45 +0000102 pw.picker = p
103 // pw.blockingCh should never be nil.
104 close(pw.blockingCh)
105 pw.blockingCh = make(chan struct{})
106 pw.mu.Unlock()
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700107}
108
109func doneChannelzWrapper(acw *acBalancerWrapper, done func(balancer.DoneInfo)) func(balancer.DoneInfo) {
110 acw.mu.Lock()
111 ac := acw.ac
112 acw.mu.Unlock()
113 ac.incrCallsStarted()
114 return func(b balancer.DoneInfo) {
115 if b.Err != nil && b.Err != io.EOF {
116 ac.incrCallsFailed()
117 } else {
118 ac.incrCallsSucceeded()
119 }
120 if done != nil {
121 done(b)
122 }
123 }
124}
125
126// pick returns the transport that will be used for the RPC.
127// It may block in the following cases:
128// - there's no picker
129// - the current picker returns ErrNoSubConnAvailable
130// - the current picker returns other errors and failfast is false.
131// - the subConn returned by the current picker is not READY
132// When one of these situations happens, pick blocks until the picker gets updated.
Arjun E K57a7fcb2020-01-30 06:44:45 +0000133func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, func(balancer.DoneInfo), error) {
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700134 var ch chan struct{}
135
Arjun E K57a7fcb2020-01-30 06:44:45 +0000136 var lastPickErr error
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700137 for {
Arjun E K57a7fcb2020-01-30 06:44:45 +0000138 pw.mu.Lock()
139 if pw.done {
140 pw.mu.Unlock()
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700141 return nil, nil, ErrClientConnClosing
142 }
143
Arjun E K57a7fcb2020-01-30 06:44:45 +0000144 if pw.picker == nil {
145 ch = pw.blockingCh
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700146 }
Arjun E K57a7fcb2020-01-30 06:44:45 +0000147 if ch == pw.blockingCh {
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700148 // This could happen when either:
Arjun E K57a7fcb2020-01-30 06:44:45 +0000149 // - pw.picker is nil (the previous if condition), or
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700150 // - has called pick on the current picker.
Arjun E K57a7fcb2020-01-30 06:44:45 +0000151 pw.mu.Unlock()
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700152 select {
153 case <-ctx.Done():
Arjun E K57a7fcb2020-01-30 06:44:45 +0000154 var errStr string
155 if lastPickErr != nil {
156 errStr = "latest balancer error: " + lastPickErr.Error()
157 } else if connectionErr := pw.connectionError(); connectionErr != nil {
158 errStr = "latest connection error: " + connectionErr.Error()
159 } else {
160 errStr = ctx.Err().Error()
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700161 }
Arjun E K57a7fcb2020-01-30 06:44:45 +0000162 switch ctx.Err() {
163 case context.DeadlineExceeded:
164 return nil, nil, status.Error(codes.DeadlineExceeded, errStr)
165 case context.Canceled:
166 return nil, nil, status.Error(codes.Canceled, errStr)
167 }
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700168 case <-ch:
169 }
170 continue
171 }
172
Arjun E K57a7fcb2020-01-30 06:44:45 +0000173 ch = pw.blockingCh
174 p := pw.picker
175 pw.mu.Unlock()
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700176
Arjun E K57a7fcb2020-01-30 06:44:45 +0000177 pickResult, err := p.Pick(info)
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700178
179 if err != nil {
Arjun E K57a7fcb2020-01-30 06:44:45 +0000180 if err == balancer.ErrNoSubConnAvailable {
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700181 continue
Arjun E K57a7fcb2020-01-30 06:44:45 +0000182 }
183 if tfe, ok := err.(interface{ IsTransientFailure() bool }); ok && tfe.IsTransientFailure() {
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700184 if !failfast {
Arjun E K57a7fcb2020-01-30 06:44:45 +0000185 lastPickErr = err
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700186 continue
187 }
Arjun E K57a7fcb2020-01-30 06:44:45 +0000188 return nil, nil, status.Error(codes.Unavailable, err.Error())
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700189 }
Arjun E K57a7fcb2020-01-30 06:44:45 +0000190 if _, ok := status.FromError(err); ok {
191 return nil, nil, err
192 }
193 // err is some other error.
194 return nil, nil, status.Error(codes.Unknown, err.Error())
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700195 }
196
Arjun E K57a7fcb2020-01-30 06:44:45 +0000197 acw, ok := pickResult.SubConn.(*acBalancerWrapper)
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700198 if !ok {
199 grpclog.Error("subconn returned from pick is not *acBalancerWrapper")
200 continue
201 }
202 if t, ok := acw.getAddrConn().getReadyTransport(); ok {
203 if channelz.IsOn() {
Arjun E K57a7fcb2020-01-30 06:44:45 +0000204 return t, doneChannelzWrapper(acw, pickResult.Done), nil
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700205 }
Arjun E K57a7fcb2020-01-30 06:44:45 +0000206 return t, pickResult.Done, nil
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700207 }
Arjun E K57a7fcb2020-01-30 06:44:45 +0000208 if pickResult.Done != nil {
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700209 // Calling done with nil error, no bytes sent and no bytes received.
210 // DoneInfo with default value works.
Arjun E K57a7fcb2020-01-30 06:44:45 +0000211 pickResult.Done(balancer.DoneInfo{})
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700212 }
213 grpclog.Infof("blockingPicker: the picked transport is not ready, loop back to repick")
214 // If ok == false, ac.state is not READY.
215 // A valid picker always returns READY subConn. This means the state of ac
216 // just changed, and picker will be updated shortly.
217 // continue back to the beginning of the for loop to repick.
218 }
219}
220
Arjun E K57a7fcb2020-01-30 06:44:45 +0000221func (pw *pickerWrapper) close() {
222 pw.mu.Lock()
223 defer pw.mu.Unlock()
224 if pw.done {
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700225 return
226 }
Arjun E K57a7fcb2020-01-30 06:44:45 +0000227 pw.done = true
228 close(pw.blockingCh)
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700229}