blob: e8367cb8993b29cc4461b6203b8a4930bb7209f9 [file] [log] [blame]
khenaidoo5fc5cea2021-08-11 17:39:16 -04001/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "context"
23 "io"
24 "sync"
25
26 "google.golang.org/grpc/balancer"
27 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/internal/channelz"
29 "google.golang.org/grpc/internal/transport"
30 "google.golang.org/grpc/status"
31)
32
33// pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
34// actions and unblock when there's a picker update.
35type pickerWrapper struct {
36 mu sync.Mutex
37 done bool
38 blockingCh chan struct{}
39 picker balancer.Picker
40}
41
42func newPickerWrapper() *pickerWrapper {
43 return &pickerWrapper{blockingCh: make(chan struct{})}
44}
45
46// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
47func (pw *pickerWrapper) updatePicker(p balancer.Picker) {
48 pw.mu.Lock()
49 if pw.done {
50 pw.mu.Unlock()
51 return
52 }
53 pw.picker = p
54 // pw.blockingCh should never be nil.
55 close(pw.blockingCh)
56 pw.blockingCh = make(chan struct{})
57 pw.mu.Unlock()
58}
59
60func doneChannelzWrapper(acw *acBalancerWrapper, done func(balancer.DoneInfo)) func(balancer.DoneInfo) {
61 acw.mu.Lock()
62 ac := acw.ac
63 acw.mu.Unlock()
64 ac.incrCallsStarted()
65 return func(b balancer.DoneInfo) {
66 if b.Err != nil && b.Err != io.EOF {
67 ac.incrCallsFailed()
68 } else {
69 ac.incrCallsSucceeded()
70 }
71 if done != nil {
72 done(b)
73 }
74 }
75}
76
77// pick returns the transport that will be used for the RPC.
78// It may block in the following cases:
79// - there's no picker
80// - the current picker returns ErrNoSubConnAvailable
81// - the current picker returns other errors and failfast is false.
82// - the subConn returned by the current picker is not READY
83// When one of these situations happens, pick blocks until the picker gets updated.
84func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, func(balancer.DoneInfo), error) {
85 var ch chan struct{}
86
87 var lastPickErr error
88 for {
89 pw.mu.Lock()
90 if pw.done {
91 pw.mu.Unlock()
92 return nil, nil, ErrClientConnClosing
93 }
94
95 if pw.picker == nil {
96 ch = pw.blockingCh
97 }
98 if ch == pw.blockingCh {
99 // This could happen when either:
100 // - pw.picker is nil (the previous if condition), or
101 // - has called pick on the current picker.
102 pw.mu.Unlock()
103 select {
104 case <-ctx.Done():
105 var errStr string
106 if lastPickErr != nil {
107 errStr = "latest balancer error: " + lastPickErr.Error()
108 } else {
109 errStr = ctx.Err().Error()
110 }
111 switch ctx.Err() {
112 case context.DeadlineExceeded:
113 return nil, nil, status.Error(codes.DeadlineExceeded, errStr)
114 case context.Canceled:
115 return nil, nil, status.Error(codes.Canceled, errStr)
116 }
117 case <-ch:
118 }
119 continue
120 }
121
122 ch = pw.blockingCh
123 p := pw.picker
124 pw.mu.Unlock()
125
126 pickResult, err := p.Pick(info)
127
128 if err != nil {
129 if err == balancer.ErrNoSubConnAvailable {
130 continue
131 }
132 if _, ok := status.FromError(err); ok {
133 // Status error: end the RPC unconditionally with this status.
134 return nil, nil, err
135 }
136 // For all other errors, wait for ready RPCs should block and other
137 // RPCs should fail with unavailable.
138 if !failfast {
139 lastPickErr = err
140 continue
141 }
142 return nil, nil, status.Error(codes.Unavailable, err.Error())
143 }
144
145 acw, ok := pickResult.SubConn.(*acBalancerWrapper)
146 if !ok {
khenaidoo5cb0d402021-12-08 14:09:16 -0500147 logger.Errorf("subconn returned from pick is type %T, not *acBalancerWrapper", pickResult.SubConn)
khenaidoo5fc5cea2021-08-11 17:39:16 -0400148 continue
149 }
150 if t := acw.getAddrConn().getReadyTransport(); t != nil {
151 if channelz.IsOn() {
152 return t, doneChannelzWrapper(acw, pickResult.Done), nil
153 }
154 return t, pickResult.Done, nil
155 }
156 if pickResult.Done != nil {
157 // Calling done with nil error, no bytes sent and no bytes received.
158 // DoneInfo with default value works.
159 pickResult.Done(balancer.DoneInfo{})
160 }
161 logger.Infof("blockingPicker: the picked transport is not ready, loop back to repick")
162 // If ok == false, ac.state is not READY.
163 // A valid picker always returns READY subConn. This means the state of ac
164 // just changed, and picker will be updated shortly.
165 // continue back to the beginning of the for loop to repick.
166 }
167}
168
169func (pw *pickerWrapper) close() {
170 pw.mu.Lock()
171 defer pw.mu.Unlock()
172 if pw.done {
173 return
174 }
175 pw.done = true
176 close(pw.blockingCh)
177}