blob: a2575c9637b09620e899b4660780415844107b4b [file] [log] [blame]
Scott Bakere7144bc2019-10-01 14:16:47 -07001/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "context"
23 "io"
24 "sync"
25
26 "google.golang.org/grpc/balancer"
27 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/grpclog"
29 "google.golang.org/grpc/internal/channelz"
30 "google.golang.org/grpc/internal/transport"
31 "google.golang.org/grpc/status"
32)
33
34// pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
35// actions and unblock when there's a picker update.
36type pickerWrapper struct {
37 mu sync.Mutex
38 done bool
39 blockingCh chan struct{}
40 picker balancer.Picker
41
42 // The latest connection happened.
43 connErrMu sync.Mutex
44 connErr error
45}
46
47func newPickerWrapper() *pickerWrapper {
48 bp := &pickerWrapper{blockingCh: make(chan struct{})}
49 return bp
50}
51
52func (bp *pickerWrapper) updateConnectionError(err error) {
53 bp.connErrMu.Lock()
54 bp.connErr = err
55 bp.connErrMu.Unlock()
56}
57
58func (bp *pickerWrapper) connectionError() error {
59 bp.connErrMu.Lock()
60 err := bp.connErr
61 bp.connErrMu.Unlock()
62 return err
63}
64
65// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
66func (bp *pickerWrapper) updatePicker(p balancer.Picker) {
67 bp.mu.Lock()
68 if bp.done {
69 bp.mu.Unlock()
70 return
71 }
72 bp.picker = p
73 // bp.blockingCh should never be nil.
74 close(bp.blockingCh)
75 bp.blockingCh = make(chan struct{})
76 bp.mu.Unlock()
77}
78
79func doneChannelzWrapper(acw *acBalancerWrapper, done func(balancer.DoneInfo)) func(balancer.DoneInfo) {
80 acw.mu.Lock()
81 ac := acw.ac
82 acw.mu.Unlock()
83 ac.incrCallsStarted()
84 return func(b balancer.DoneInfo) {
85 if b.Err != nil && b.Err != io.EOF {
86 ac.incrCallsFailed()
87 } else {
88 ac.incrCallsSucceeded()
89 }
90 if done != nil {
91 done(b)
92 }
93 }
94}
95
96// pick returns the transport that will be used for the RPC.
97// It may block in the following cases:
98// - there's no picker
99// - the current picker returns ErrNoSubConnAvailable
100// - the current picker returns other errors and failfast is false.
101// - the subConn returned by the current picker is not READY
102// When one of these situations happens, pick blocks until the picker gets updated.
103func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer.PickOptions) (transport.ClientTransport, func(balancer.DoneInfo), error) {
104 var ch chan struct{}
105
106 for {
107 bp.mu.Lock()
108 if bp.done {
109 bp.mu.Unlock()
110 return nil, nil, ErrClientConnClosing
111 }
112
113 if bp.picker == nil {
114 ch = bp.blockingCh
115 }
116 if ch == bp.blockingCh {
117 // This could happen when either:
118 // - bp.picker is nil (the previous if condition), or
119 // - has called pick on the current picker.
120 bp.mu.Unlock()
121 select {
122 case <-ctx.Done():
123 return nil, nil, ctx.Err()
124 case <-ch:
125 }
126 continue
127 }
128
129 ch = bp.blockingCh
130 p := bp.picker
131 bp.mu.Unlock()
132
133 subConn, done, err := p.Pick(ctx, opts)
134
135 if err != nil {
136 switch err {
137 case balancer.ErrNoSubConnAvailable:
138 continue
139 case balancer.ErrTransientFailure:
140 if !failfast {
141 continue
142 }
143 return nil, nil, status.Errorf(codes.Unavailable, "%v, latest connection error: %v", err, bp.connectionError())
144 case context.DeadlineExceeded:
145 return nil, nil, status.Error(codes.DeadlineExceeded, err.Error())
146 case context.Canceled:
147 return nil, nil, status.Error(codes.Canceled, err.Error())
148 default:
149 if _, ok := status.FromError(err); ok {
150 return nil, nil, err
151 }
152 // err is some other error.
153 return nil, nil, status.Error(codes.Unknown, err.Error())
154 }
155 }
156
157 acw, ok := subConn.(*acBalancerWrapper)
158 if !ok {
159 grpclog.Error("subconn returned from pick is not *acBalancerWrapper")
160 continue
161 }
162 if t, ok := acw.getAddrConn().getReadyTransport(); ok {
163 if channelz.IsOn() {
164 return t, doneChannelzWrapper(acw, done), nil
165 }
166 return t, done, nil
167 }
168 grpclog.Infof("blockingPicker: the picked transport is not ready, loop back to repick")
169 // If ok == false, ac.state is not READY.
170 // A valid picker always returns READY subConn. This means the state of ac
171 // just changed, and picker will be updated shortly.
172 // continue back to the beginning of the for loop to repick.
173 }
174}
175
176func (bp *pickerWrapper) close() {
177 bp.mu.Lock()
178 defer bp.mu.Unlock()
179 if bp.done {
180 return
181 }
182 bp.done = true
183 close(bp.blockingCh)
184}