blob: 45baa2ae13da94a4db1a334d6576b1752494c256 [file] [log] [blame]
Takahiro Suzukid7bf8202020-12-17 20:21:59 +09001/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "context"
23 "io"
24 "sync"
25
26 "google.golang.org/grpc/balancer"
27 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/grpclog"
29 "google.golang.org/grpc/internal/channelz"
30 "google.golang.org/grpc/internal/transport"
31 "google.golang.org/grpc/status"
32)
33
34// pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
35// actions and unblock when there's a picker update.
36type pickerWrapper struct {
37 mu sync.Mutex
38 done bool
39 blockingCh chan struct{}
40 picker balancer.Picker
41
42 // The latest connection happened.
43 connErrMu sync.Mutex
44 connErr error
45}
46
47func newPickerWrapper() *pickerWrapper {
48 bp := &pickerWrapper{blockingCh: make(chan struct{})}
49 return bp
50}
51
52func (bp *pickerWrapper) updateConnectionError(err error) {
53 bp.connErrMu.Lock()
54 bp.connErr = err
55 bp.connErrMu.Unlock()
56}
57
58func (bp *pickerWrapper) connectionError() error {
59 bp.connErrMu.Lock()
60 err := bp.connErr
61 bp.connErrMu.Unlock()
62 return err
63}
64
65// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
66func (bp *pickerWrapper) updatePicker(p balancer.Picker) {
67 bp.mu.Lock()
68 if bp.done {
69 bp.mu.Unlock()
70 return
71 }
72 bp.picker = p
73 // bp.blockingCh should never be nil.
74 close(bp.blockingCh)
75 bp.blockingCh = make(chan struct{})
76 bp.mu.Unlock()
77}
78
79func doneChannelzWrapper(acw *acBalancerWrapper, done func(balancer.DoneInfo)) func(balancer.DoneInfo) {
80 acw.mu.Lock()
81 ac := acw.ac
82 acw.mu.Unlock()
83 ac.incrCallsStarted()
84 return func(b balancer.DoneInfo) {
85 if b.Err != nil && b.Err != io.EOF {
86 ac.incrCallsFailed()
87 } else {
88 ac.incrCallsSucceeded()
89 }
90 if done != nil {
91 done(b)
92 }
93 }
94}
95
96// pick returns the transport that will be used for the RPC.
97// It may block in the following cases:
98// - there's no picker
99// - the current picker returns ErrNoSubConnAvailable
100// - the current picker returns other errors and failfast is false.
101// - the subConn returned by the current picker is not READY
102// When one of these situations happens, pick blocks until the picker gets updated.
103func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer.PickOptions) (transport.ClientTransport, func(balancer.DoneInfo), error) {
104 var ch chan struct{}
105
106 for {
107 bp.mu.Lock()
108 if bp.done {
109 bp.mu.Unlock()
110 return nil, nil, ErrClientConnClosing
111 }
112
113 if bp.picker == nil {
114 ch = bp.blockingCh
115 }
116 if ch == bp.blockingCh {
117 // This could happen when either:
118 // - bp.picker is nil (the previous if condition), or
119 // - has called pick on the current picker.
120 bp.mu.Unlock()
121 select {
122 case <-ctx.Done():
123 if connectionErr := bp.connectionError(); connectionErr != nil {
124 switch ctx.Err() {
125 case context.DeadlineExceeded:
126 return nil, nil, status.Errorf(codes.DeadlineExceeded, "latest connection error: %v", connectionErr)
127 case context.Canceled:
128 return nil, nil, status.Errorf(codes.Canceled, "latest connection error: %v", connectionErr)
129 }
130 }
131 return nil, nil, ctx.Err()
132 case <-ch:
133 }
134 continue
135 }
136
137 ch = bp.blockingCh
138 p := bp.picker
139 bp.mu.Unlock()
140
141 subConn, done, err := p.Pick(ctx, opts)
142
143 if err != nil {
144 switch err {
145 case balancer.ErrNoSubConnAvailable:
146 continue
147 case balancer.ErrTransientFailure:
148 if !failfast {
149 continue
150 }
151 return nil, nil, status.Errorf(codes.Unavailable, "%v, latest connection error: %v", err, bp.connectionError())
152 case context.DeadlineExceeded:
153 return nil, nil, status.Error(codes.DeadlineExceeded, err.Error())
154 case context.Canceled:
155 return nil, nil, status.Error(codes.Canceled, err.Error())
156 default:
157 if _, ok := status.FromError(err); ok {
158 return nil, nil, err
159 }
160 // err is some other error.
161 return nil, nil, status.Error(codes.Unknown, err.Error())
162 }
163 }
164
165 acw, ok := subConn.(*acBalancerWrapper)
166 if !ok {
167 grpclog.Error("subconn returned from pick is not *acBalancerWrapper")
168 continue
169 }
170 if t, ok := acw.getAddrConn().getReadyTransport(); ok {
171 if channelz.IsOn() {
172 return t, doneChannelzWrapper(acw, done), nil
173 }
174 return t, done, nil
175 }
176 if done != nil {
177 // Calling done with nil error, no bytes sent and no bytes received.
178 // DoneInfo with default value works.
179 done(balancer.DoneInfo{})
180 }
181 grpclog.Infof("blockingPicker: the picked transport is not ready, loop back to repick")
182 // If ok == false, ac.state is not READY.
183 // A valid picker always returns READY subConn. This means the state of ac
184 // just changed, and picker will be updated shortly.
185 // continue back to the beginning of the for loop to repick.
186 }
187}
188
189func (bp *pickerWrapper) close() {
190 bp.mu.Lock()
191 defer bp.mu.Unlock()
192 if bp.done {
193 return
194 }
195 bp.done = true
196 close(bp.blockingCh)
197}