blob: 7233ade293bc0a69d7f880203ba4be42886ba1e5 [file] [log] [blame]
Scott Bakere7144bc2019-10-01 14:16:47 -07001/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "fmt"
23 "sync"
24
25 "google.golang.org/grpc/balancer"
26 "google.golang.org/grpc/connectivity"
27 "google.golang.org/grpc/grpclog"
28 "google.golang.org/grpc/resolver"
29)
30
31// scStateUpdate contains the subConn and the new state it changed to.
32type scStateUpdate struct {
33 sc balancer.SubConn
34 state connectivity.State
35}
36
37// scStateUpdateBuffer is an unbounded channel for scStateChangeTuple.
38// TODO make a general purpose buffer that uses interface{}.
39type scStateUpdateBuffer struct {
40 c chan *scStateUpdate
41 mu sync.Mutex
42 backlog []*scStateUpdate
43}
44
45func newSCStateUpdateBuffer() *scStateUpdateBuffer {
46 return &scStateUpdateBuffer{
47 c: make(chan *scStateUpdate, 1),
48 }
49}
50
51func (b *scStateUpdateBuffer) put(t *scStateUpdate) {
52 b.mu.Lock()
53 defer b.mu.Unlock()
54 if len(b.backlog) == 0 {
55 select {
56 case b.c <- t:
57 return
58 default:
59 }
60 }
61 b.backlog = append(b.backlog, t)
62}
63
64func (b *scStateUpdateBuffer) load() {
65 b.mu.Lock()
66 defer b.mu.Unlock()
67 if len(b.backlog) > 0 {
68 select {
69 case b.c <- b.backlog[0]:
70 b.backlog[0] = nil
71 b.backlog = b.backlog[1:]
72 default:
73 }
74 }
75}
76
77// get returns the channel that the scStateUpdate will be sent to.
78//
79// Upon receiving, the caller should call load to send another
80// scStateChangeTuple onto the channel if there is any.
81func (b *scStateUpdateBuffer) get() <-chan *scStateUpdate {
82 return b.c
83}
84
85// resolverUpdate contains the new resolved addresses or error if there's
86// any.
87type resolverUpdate struct {
88 addrs []resolver.Address
89 err error
90}
91
92// ccBalancerWrapper is a wrapper on top of cc for balancers.
93// It implements balancer.ClientConn interface.
94type ccBalancerWrapper struct {
95 cc *ClientConn
96 balancer balancer.Balancer
97 stateChangeQueue *scStateUpdateBuffer
98 resolverUpdateCh chan *resolverUpdate
99 done chan struct{}
100
101 mu sync.Mutex
102 subConns map[*acBalancerWrapper]struct{}
103}
104
105func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {
106 ccb := &ccBalancerWrapper{
107 cc: cc,
108 stateChangeQueue: newSCStateUpdateBuffer(),
109 resolverUpdateCh: make(chan *resolverUpdate, 1),
110 done: make(chan struct{}),
111 subConns: make(map[*acBalancerWrapper]struct{}),
112 }
113 go ccb.watcher()
114 ccb.balancer = b.Build(ccb, bopts)
115 return ccb
116}
117
118// watcher balancer functions sequentially, so the balancer can be implemented
119// lock-free.
120func (ccb *ccBalancerWrapper) watcher() {
121 for {
122 select {
123 case t := <-ccb.stateChangeQueue.get():
124 ccb.stateChangeQueue.load()
125 select {
126 case <-ccb.done:
127 ccb.balancer.Close()
128 return
129 default:
130 }
131 ccb.balancer.HandleSubConnStateChange(t.sc, t.state)
132 case t := <-ccb.resolverUpdateCh:
133 select {
134 case <-ccb.done:
135 ccb.balancer.Close()
136 return
137 default:
138 }
139 ccb.balancer.HandleResolvedAddrs(t.addrs, t.err)
140 case <-ccb.done:
141 }
142
143 select {
144 case <-ccb.done:
145 ccb.balancer.Close()
146 ccb.mu.Lock()
147 scs := ccb.subConns
148 ccb.subConns = nil
149 ccb.mu.Unlock()
150 for acbw := range scs {
151 ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
152 }
153 return
154 default:
155 }
156 }
157}
158
159func (ccb *ccBalancerWrapper) close() {
160 close(ccb.done)
161}
162
163func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
164 // When updating addresses for a SubConn, if the address in use is not in
165 // the new addresses, the old ac will be tearDown() and a new ac will be
166 // created. tearDown() generates a state change with Shutdown state, we
167 // don't want the balancer to receive this state change. So before
168 // tearDown() on the old ac, ac.acbw (acWrapper) will be set to nil, and
169 // this function will be called with (nil, Shutdown). We don't need to call
170 // balancer method in this case.
171 if sc == nil {
172 return
173 }
174 ccb.stateChangeQueue.put(&scStateUpdate{
175 sc: sc,
176 state: s,
177 })
178}
179
180func (ccb *ccBalancerWrapper) handleResolvedAddrs(addrs []resolver.Address, err error) {
181 if ccb.cc.curBalancerName != grpclbName {
182 var containsGRPCLB bool
183 for _, a := range addrs {
184 if a.Type == resolver.GRPCLB {
185 containsGRPCLB = true
186 break
187 }
188 }
189 if containsGRPCLB {
190 // The current balancer is not grpclb, but addresses contain grpclb
191 // address. This means we failed to switch to grpclb, most likely
192 // because grpclb is not registered. Filter out all grpclb addresses
193 // from addrs before sending to balancer.
194 tempAddrs := make([]resolver.Address, 0, len(addrs))
195 for _, a := range addrs {
196 if a.Type != resolver.GRPCLB {
197 tempAddrs = append(tempAddrs, a)
198 }
199 }
200 addrs = tempAddrs
201 }
202 }
203 select {
204 case <-ccb.resolverUpdateCh:
205 default:
206 }
207 ccb.resolverUpdateCh <- &resolverUpdate{
208 addrs: addrs,
209 err: err,
210 }
211}
212
213func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
214 if len(addrs) <= 0 {
215 return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
216 }
217 ccb.mu.Lock()
218 defer ccb.mu.Unlock()
219 if ccb.subConns == nil {
220 return nil, fmt.Errorf("grpc: ClientConn balancer wrapper was closed")
221 }
222 ac, err := ccb.cc.newAddrConn(addrs, opts)
223 if err != nil {
224 return nil, err
225 }
226 acbw := &acBalancerWrapper{ac: ac}
227 acbw.ac.mu.Lock()
228 ac.acbw = acbw
229 acbw.ac.mu.Unlock()
230 ccb.subConns[acbw] = struct{}{}
231 return acbw, nil
232}
233
234func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
235 acbw, ok := sc.(*acBalancerWrapper)
236 if !ok {
237 return
238 }
239 ccb.mu.Lock()
240 defer ccb.mu.Unlock()
241 if ccb.subConns == nil {
242 return
243 }
244 delete(ccb.subConns, acbw)
245 ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
246}
247
248func (ccb *ccBalancerWrapper) UpdateBalancerState(s connectivity.State, p balancer.Picker) {
249 ccb.mu.Lock()
250 defer ccb.mu.Unlock()
251 if ccb.subConns == nil {
252 return
253 }
254 // Update picker before updating state. Even though the ordering here does
255 // not matter, it can lead to multiple calls of Pick in the common start-up
256 // case where we wait for ready and then perform an RPC. If the picker is
257 // updated later, we could call the "connecting" picker when the state is
258 // updated, and then call the "ready" picker after the picker gets updated.
259 ccb.cc.blockingpicker.updatePicker(p)
260 ccb.cc.csMgr.updateState(s)
261}
262
263func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOption) {
264 ccb.cc.resolveNow(o)
265}
266
267func (ccb *ccBalancerWrapper) Target() string {
268 return ccb.cc.target
269}
270
271// acBalancerWrapper is a wrapper on top of ac for balancers.
272// It implements balancer.SubConn interface.
273type acBalancerWrapper struct {
274 mu sync.Mutex
275 ac *addrConn
276}
277
278func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
279 acbw.mu.Lock()
280 defer acbw.mu.Unlock()
281 if len(addrs) <= 0 {
282 acbw.ac.tearDown(errConnDrain)
283 return
284 }
285 if !acbw.ac.tryUpdateAddrs(addrs) {
286 cc := acbw.ac.cc
287 opts := acbw.ac.scopts
288 acbw.ac.mu.Lock()
289 // Set old ac.acbw to nil so the Shutdown state update will be ignored
290 // by balancer.
291 //
292 // TODO(bar) the state transition could be wrong when tearDown() old ac
293 // and creating new ac, fix the transition.
294 acbw.ac.acbw = nil
295 acbw.ac.mu.Unlock()
296 acState := acbw.ac.getState()
297 acbw.ac.tearDown(errConnDrain)
298
299 if acState == connectivity.Shutdown {
300 return
301 }
302
303 ac, err := cc.newAddrConn(addrs, opts)
304 if err != nil {
305 grpclog.Warningf("acBalancerWrapper: UpdateAddresses: failed to newAddrConn: %v", err)
306 return
307 }
308 acbw.ac = ac
309 ac.mu.Lock()
310 ac.acbw = acbw
311 ac.mu.Unlock()
312 if acState != connectivity.Idle {
313 ac.connect()
314 }
315 }
316}
317
318func (acbw *acBalancerWrapper) Connect() {
319 acbw.mu.Lock()
320 defer acbw.mu.Unlock()
321 acbw.ac.connect()
322}
323
324func (acbw *acBalancerWrapper) getAddrConn() *addrConn {
325 acbw.mu.Lock()
326 defer acbw.mu.Unlock()
327 return acbw.ac
328}