blob: 45baa2ae13da94a4db1a334d6576b1752494c256 [file] [log] [blame]
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -07001/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "context"
23 "io"
24 "sync"
25
26 "google.golang.org/grpc/balancer"
27 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/grpclog"
29 "google.golang.org/grpc/internal/channelz"
30 "google.golang.org/grpc/internal/transport"
31 "google.golang.org/grpc/status"
32)
33
34// pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
35// actions and unblock when there's a picker update.
36type pickerWrapper struct {
37 mu sync.Mutex
38 done bool
39 blockingCh chan struct{}
David K. Bainbridgec415efe2021-08-19 13:05:21 +000040 picker balancer.Picker
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -070041
David K. Bainbridgec415efe2021-08-19 13:05:21 +000042 // The latest connection happened.
43 connErrMu sync.Mutex
44 connErr error
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -070045}
46
Arjun E K57a7fcb2020-01-30 06:44:45 +000047func newPickerWrapper() *pickerWrapper {
David K. Bainbridgec415efe2021-08-19 13:05:21 +000048 bp := &pickerWrapper{blockingCh: make(chan struct{})}
49 return bp
50}
51
52func (bp *pickerWrapper) updateConnectionError(err error) {
53 bp.connErrMu.Lock()
54 bp.connErr = err
55 bp.connErrMu.Unlock()
56}
57
58func (bp *pickerWrapper) connectionError() error {
59 bp.connErrMu.Lock()
60 err := bp.connErr
61 bp.connErrMu.Unlock()
62 return err
Arjun E K57a7fcb2020-01-30 06:44:45 +000063}
64
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -070065// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
David K. Bainbridgec415efe2021-08-19 13:05:21 +000066func (bp *pickerWrapper) updatePicker(p balancer.Picker) {
67 bp.mu.Lock()
68 if bp.done {
69 bp.mu.Unlock()
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -070070 return
71 }
David K. Bainbridgec415efe2021-08-19 13:05:21 +000072 bp.picker = p
73 // bp.blockingCh should never be nil.
74 close(bp.blockingCh)
75 bp.blockingCh = make(chan struct{})
76 bp.mu.Unlock()
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -070077}
78
79func doneChannelzWrapper(acw *acBalancerWrapper, done func(balancer.DoneInfo)) func(balancer.DoneInfo) {
80 acw.mu.Lock()
81 ac := acw.ac
82 acw.mu.Unlock()
83 ac.incrCallsStarted()
84 return func(b balancer.DoneInfo) {
85 if b.Err != nil && b.Err != io.EOF {
86 ac.incrCallsFailed()
87 } else {
88 ac.incrCallsSucceeded()
89 }
90 if done != nil {
91 done(b)
92 }
93 }
94}
95
96// pick returns the transport that will be used for the RPC.
97// It may block in the following cases:
98// - there's no picker
99// - the current picker returns ErrNoSubConnAvailable
100// - the current picker returns other errors and failfast is false.
101// - the subConn returned by the current picker is not READY
102// When one of these situations happens, pick blocks until the picker gets updated.
David K. Bainbridgec415efe2021-08-19 13:05:21 +0000103func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer.PickOptions) (transport.ClientTransport, func(balancer.DoneInfo), error) {
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700104 var ch chan struct{}
105
106 for {
David K. Bainbridgec415efe2021-08-19 13:05:21 +0000107 bp.mu.Lock()
108 if bp.done {
109 bp.mu.Unlock()
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700110 return nil, nil, ErrClientConnClosing
111 }
112
David K. Bainbridgec415efe2021-08-19 13:05:21 +0000113 if bp.picker == nil {
114 ch = bp.blockingCh
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700115 }
David K. Bainbridgec415efe2021-08-19 13:05:21 +0000116 if ch == bp.blockingCh {
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700117 // This could happen when either:
David K. Bainbridgec415efe2021-08-19 13:05:21 +0000118 // - bp.picker is nil (the previous if condition), or
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700119 // - has called pick on the current picker.
David K. Bainbridgec415efe2021-08-19 13:05:21 +0000120 bp.mu.Unlock()
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700121 select {
122 case <-ctx.Done():
David K. Bainbridgec415efe2021-08-19 13:05:21 +0000123 if connectionErr := bp.connectionError(); connectionErr != nil {
124 switch ctx.Err() {
125 case context.DeadlineExceeded:
126 return nil, nil, status.Errorf(codes.DeadlineExceeded, "latest connection error: %v", connectionErr)
127 case context.Canceled:
128 return nil, nil, status.Errorf(codes.Canceled, "latest connection error: %v", connectionErr)
129 }
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700130 }
David K. Bainbridgec415efe2021-08-19 13:05:21 +0000131 return nil, nil, ctx.Err()
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700132 case <-ch:
133 }
134 continue
135 }
136
David K. Bainbridgec415efe2021-08-19 13:05:21 +0000137 ch = bp.blockingCh
138 p := bp.picker
139 bp.mu.Unlock()
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700140
David K. Bainbridgec415efe2021-08-19 13:05:21 +0000141 subConn, done, err := p.Pick(ctx, opts)
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700142
143 if err != nil {
David K. Bainbridgec415efe2021-08-19 13:05:21 +0000144 switch err {
145 case balancer.ErrNoSubConnAvailable:
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700146 continue
David K. Bainbridgec415efe2021-08-19 13:05:21 +0000147 case balancer.ErrTransientFailure:
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700148 if !failfast {
149 continue
150 }
David K. Bainbridgec415efe2021-08-19 13:05:21 +0000151 return nil, nil, status.Errorf(codes.Unavailable, "%v, latest connection error: %v", err, bp.connectionError())
152 case context.DeadlineExceeded:
153 return nil, nil, status.Error(codes.DeadlineExceeded, err.Error())
154 case context.Canceled:
155 return nil, nil, status.Error(codes.Canceled, err.Error())
156 default:
157 if _, ok := status.FromError(err); ok {
158 return nil, nil, err
159 }
160 // err is some other error.
161 return nil, nil, status.Error(codes.Unknown, err.Error())
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700162 }
163 }
164
David K. Bainbridgec415efe2021-08-19 13:05:21 +0000165 acw, ok := subConn.(*acBalancerWrapper)
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700166 if !ok {
167 grpclog.Error("subconn returned from pick is not *acBalancerWrapper")
168 continue
169 }
170 if t, ok := acw.getAddrConn().getReadyTransport(); ok {
171 if channelz.IsOn() {
David K. Bainbridgec415efe2021-08-19 13:05:21 +0000172 return t, doneChannelzWrapper(acw, done), nil
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700173 }
David K. Bainbridgec415efe2021-08-19 13:05:21 +0000174 return t, done, nil
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700175 }
David K. Bainbridgec415efe2021-08-19 13:05:21 +0000176 if done != nil {
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700177 // Calling done with nil error, no bytes sent and no bytes received.
178 // DoneInfo with default value works.
David K. Bainbridgec415efe2021-08-19 13:05:21 +0000179 done(balancer.DoneInfo{})
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700180 }
181 grpclog.Infof("blockingPicker: the picked transport is not ready, loop back to repick")
182 // If ok == false, ac.state is not READY.
183 // A valid picker always returns READY subConn. This means the state of ac
184 // just changed, and picker will be updated shortly.
185 // continue back to the beginning of the for loop to repick.
186 }
187}
188
David K. Bainbridgec415efe2021-08-19 13:05:21 +0000189func (bp *pickerWrapper) close() {
190 bp.mu.Lock()
191 defer bp.mu.Unlock()
192 if bp.done {
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700193 return
194 }
David K. Bainbridgec415efe2021-08-19 13:05:21 +0000195 bp.done = true
196 close(bp.blockingCh)
Matteo Scandoloa6a3aee2019-11-26 13:30:14 -0700197}