blob: 14f915d6768aa9338849bbedf281cccb88819621 [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "context"
23 "io"
24 "sync"
25
26 "google.golang.org/grpc/balancer"
27 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/grpclog"
29 "google.golang.org/grpc/internal/channelz"
30 "google.golang.org/grpc/internal/transport"
31 "google.golang.org/grpc/status"
32)
33
34// pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
35// actions and unblock when there's a picker update.
36type pickerWrapper struct {
37 mu sync.Mutex
38 done bool
39 blockingCh chan struct{}
40 picker balancer.Picker
41
42 // The latest connection happened.
43 connErrMu sync.Mutex
44 connErr error
45}
46
47func newPickerWrapper() *pickerWrapper {
48 bp := &pickerWrapper{blockingCh: make(chan struct{})}
49 return bp
50}
51
52func (bp *pickerWrapper) updateConnectionError(err error) {
53 bp.connErrMu.Lock()
54 bp.connErr = err
55 bp.connErrMu.Unlock()
56}
57
58func (bp *pickerWrapper) connectionError() error {
59 bp.connErrMu.Lock()
60 err := bp.connErr
61 bp.connErrMu.Unlock()
62 return err
63}
64
65// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
66func (bp *pickerWrapper) updatePicker(p balancer.Picker) {
67 bp.mu.Lock()
68 if bp.done {
69 bp.mu.Unlock()
70 return
71 }
72 bp.picker = p
73 // bp.blockingCh should never be nil.
74 close(bp.blockingCh)
75 bp.blockingCh = make(chan struct{})
76 bp.mu.Unlock()
77}
78
79func doneChannelzWrapper(acw *acBalancerWrapper, done func(balancer.DoneInfo)) func(balancer.DoneInfo) {
80 acw.mu.Lock()
81 ac := acw.ac
82 acw.mu.Unlock()
83 ac.incrCallsStarted()
84 return func(b balancer.DoneInfo) {
85 if b.Err != nil && b.Err != io.EOF {
86 ac.incrCallsFailed()
87 } else {
88 ac.incrCallsSucceeded()
89 }
90 if done != nil {
91 done(b)
92 }
93 }
94}
95
96// pick returns the transport that will be used for the RPC.
97// It may block in the following cases:
98// - there's no picker
99// - the current picker returns ErrNoSubConnAvailable
100// - the current picker returns other errors and failfast is false.
101// - the subConn returned by the current picker is not READY
102// When one of these situations happens, pick blocks until the picker gets updated.
103func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer.PickOptions) (transport.ClientTransport, func(balancer.DoneInfo), error) {
104 var (
105 p balancer.Picker
106 ch chan struct{}
107 )
108
109 for {
110 bp.mu.Lock()
111 if bp.done {
112 bp.mu.Unlock()
113 return nil, nil, ErrClientConnClosing
114 }
115
116 if bp.picker == nil {
117 ch = bp.blockingCh
118 }
119 if ch == bp.blockingCh {
120 // This could happen when either:
121 // - bp.picker is nil (the previous if condition), or
122 // - has called pick on the current picker.
123 bp.mu.Unlock()
124 select {
125 case <-ctx.Done():
126 return nil, nil, ctx.Err()
127 case <-ch:
128 }
129 continue
130 }
131
132 ch = bp.blockingCh
133 p = bp.picker
134 bp.mu.Unlock()
135
136 subConn, done, err := p.Pick(ctx, opts)
137
138 if err != nil {
139 switch err {
140 case balancer.ErrNoSubConnAvailable:
141 continue
142 case balancer.ErrTransientFailure:
143 if !failfast {
144 continue
145 }
146 return nil, nil, status.Errorf(codes.Unavailable, "%v, latest connection error: %v", err, bp.connectionError())
147 default:
148 // err is some other error.
149 return nil, nil, toRPCErr(err)
150 }
151 }
152
153 acw, ok := subConn.(*acBalancerWrapper)
154 if !ok {
155 grpclog.Infof("subconn returned from pick is not *acBalancerWrapper")
156 continue
157 }
158 if t, ok := acw.getAddrConn().getReadyTransport(); ok {
159 if channelz.IsOn() {
160 return t, doneChannelzWrapper(acw, done), nil
161 }
162 return t, done, nil
163 }
164 grpclog.Infof("blockingPicker: the picked transport is not ready, loop back to repick")
165 // If ok == false, ac.state is not READY.
166 // A valid picker always returns READY subConn. This means the state of ac
167 // just changed, and picker will be updated shortly.
168 // continue back to the beginning of the for loop to repick.
169 }
170}
171
172func (bp *pickerWrapper) close() {
173 bp.mu.Lock()
174 defer bp.mu.Unlock()
175 if bp.done {
176 return
177 }
178 bp.done = true
179 close(bp.blockingCh)
180}