/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18
19package grpc
20
21import (
22 "fmt"
23 "sync"
24
25 "google.golang.org/grpc/balancer"
26 "google.golang.org/grpc/connectivity"
27 "google.golang.org/grpc/grpclog"
28 "google.golang.org/grpc/resolver"
29)
30
31// scStateUpdate contains the subConn and the new state it changed to.
32type scStateUpdate struct {
33 sc balancer.SubConn
34 state connectivity.State
35}
36
37// scStateUpdateBuffer is an unbounded channel for scStateChangeTuple.
38// TODO make a general purpose buffer that uses interface{}.
39type scStateUpdateBuffer struct {
40 c chan *scStateUpdate
41 mu sync.Mutex
42 backlog []*scStateUpdate
43}
44
45func newSCStateUpdateBuffer() *scStateUpdateBuffer {
46 return &scStateUpdateBuffer{
47 c: make(chan *scStateUpdate, 1),
48 }
49}
50
51func (b *scStateUpdateBuffer) put(t *scStateUpdate) {
52 b.mu.Lock()
53 defer b.mu.Unlock()
54 if len(b.backlog) == 0 {
55 select {
56 case b.c <- t:
57 return
58 default:
59 }
60 }
61 b.backlog = append(b.backlog, t)
62}
63
64func (b *scStateUpdateBuffer) load() {
65 b.mu.Lock()
66 defer b.mu.Unlock()
67 if len(b.backlog) > 0 {
68 select {
69 case b.c <- b.backlog[0]:
70 b.backlog[0] = nil
71 b.backlog = b.backlog[1:]
72 default:
73 }
74 }
75}
76
77// get returns the channel that the scStateUpdate will be sent to.
78//
79// Upon receiving, the caller should call load to send another
80// scStateChangeTuple onto the channel if there is any.
81func (b *scStateUpdateBuffer) get() <-chan *scStateUpdate {
82 return b.c
83}
84
85// ccBalancerWrapper is a wrapper on top of cc for balancers.
86// It implements balancer.ClientConn interface.
87type ccBalancerWrapper struct {
88 cc *ClientConn
89 balancer balancer.Balancer
90 stateChangeQueue *scStateUpdateBuffer
91 resolverUpdateCh chan *resolver.State
92 done chan struct{}
93
94 mu sync.Mutex
95 subConns map[*acBalancerWrapper]struct{}
96}
97
98func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {
99 ccb := &ccBalancerWrapper{
100 cc: cc,
101 stateChangeQueue: newSCStateUpdateBuffer(),
102 resolverUpdateCh: make(chan *resolver.State, 1),
103 done: make(chan struct{}),
104 subConns: make(map[*acBalancerWrapper]struct{}),
105 }
106 go ccb.watcher()
107 ccb.balancer = b.Build(ccb, bopts)
108 return ccb
109}
110
111// watcher balancer functions sequentially, so the balancer can be implemented
112// lock-free.
113func (ccb *ccBalancerWrapper) watcher() {
114 for {
115 select {
116 case t := <-ccb.stateChangeQueue.get():
117 ccb.stateChangeQueue.load()
118 select {
119 case <-ccb.done:
120 ccb.balancer.Close()
121 return
122 default:
123 }
124 if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
125 ub.UpdateSubConnState(t.sc, balancer.SubConnState{ConnectivityState: t.state})
126 } else {
127 ccb.balancer.HandleSubConnStateChange(t.sc, t.state)
128 }
129 case s := <-ccb.resolverUpdateCh:
130 select {
131 case <-ccb.done:
132 ccb.balancer.Close()
133 return
134 default:
135 }
136 if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
137 ub.UpdateResolverState(*s)
138 } else {
139 ccb.balancer.HandleResolvedAddrs(s.Addresses, nil)
140 }
141 case <-ccb.done:
142 }
143
144 select {
145 case <-ccb.done:
146 ccb.balancer.Close()
147 ccb.mu.Lock()
148 scs := ccb.subConns
149 ccb.subConns = nil
150 ccb.mu.Unlock()
151 for acbw := range scs {
152 ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
153 }
154 return
155 default:
156 }
157 }
158}
159
160func (ccb *ccBalancerWrapper) close() {
161 close(ccb.done)
162}
163
164func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
165 // When updating addresses for a SubConn, if the address in use is not in
166 // the new addresses, the old ac will be tearDown() and a new ac will be
167 // created. tearDown() generates a state change with Shutdown state, we
168 // don't want the balancer to receive this state change. So before
169 // tearDown() on the old ac, ac.acbw (acWrapper) will be set to nil, and
170 // this function will be called with (nil, Shutdown). We don't need to call
171 // balancer method in this case.
172 if sc == nil {
173 return
174 }
175 ccb.stateChangeQueue.put(&scStateUpdate{
176 sc: sc,
177 state: s,
178 })
179}
180
181func (ccb *ccBalancerWrapper) updateResolverState(s resolver.State) {
182 if ccb.cc.curBalancerName != grpclbName {
183 // Filter any grpclb addresses since we don't have the grpclb balancer.
184 for i := 0; i < len(s.Addresses); {
185 if s.Addresses[i].Type == resolver.GRPCLB {
186 copy(s.Addresses[i:], s.Addresses[i+1:])
187 s.Addresses = s.Addresses[:len(s.Addresses)-1]
188 continue
189 }
190 i++
191 }
192 }
193 select {
194 case <-ccb.resolverUpdateCh:
195 default:
196 }
197 ccb.resolverUpdateCh <- &s
198}
199
200func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
201 if len(addrs) <= 0 {
202 return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
203 }
204 ccb.mu.Lock()
205 defer ccb.mu.Unlock()
206 if ccb.subConns == nil {
207 return nil, fmt.Errorf("grpc: ClientConn balancer wrapper was closed")
208 }
209 ac, err := ccb.cc.newAddrConn(addrs, opts)
210 if err != nil {
211 return nil, err
212 }
213 acbw := &acBalancerWrapper{ac: ac}
214 acbw.ac.mu.Lock()
215 ac.acbw = acbw
216 acbw.ac.mu.Unlock()
217 ccb.subConns[acbw] = struct{}{}
218 return acbw, nil
219}
220
221func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
222 acbw, ok := sc.(*acBalancerWrapper)
223 if !ok {
224 return
225 }
226 ccb.mu.Lock()
227 defer ccb.mu.Unlock()
228 if ccb.subConns == nil {
229 return
230 }
231 delete(ccb.subConns, acbw)
232 ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
233}
234
235func (ccb *ccBalancerWrapper) UpdateBalancerState(s connectivity.State, p balancer.Picker) {
236 ccb.mu.Lock()
237 defer ccb.mu.Unlock()
238 if ccb.subConns == nil {
239 return
240 }
241 // Update picker before updating state. Even though the ordering here does
242 // not matter, it can lead to multiple calls of Pick in the common start-up
243 // case where we wait for ready and then perform an RPC. If the picker is
244 // updated later, we could call the "connecting" picker when the state is
245 // updated, and then call the "ready" picker after the picker gets updated.
246 ccb.cc.blockingpicker.updatePicker(p)
247 ccb.cc.csMgr.updateState(s)
248}
249
250func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOption) {
251 ccb.cc.resolveNow(o)
252}
253
254func (ccb *ccBalancerWrapper) Target() string {
255 return ccb.cc.target
256}
257
258// acBalancerWrapper is a wrapper on top of ac for balancers.
259// It implements balancer.SubConn interface.
260type acBalancerWrapper struct {
261 mu sync.Mutex
262 ac *addrConn
263}
264
265func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
266 acbw.mu.Lock()
267 defer acbw.mu.Unlock()
268 if len(addrs) <= 0 {
269 acbw.ac.tearDown(errConnDrain)
270 return
271 }
272 if !acbw.ac.tryUpdateAddrs(addrs) {
273 cc := acbw.ac.cc
274 opts := acbw.ac.scopts
275 acbw.ac.mu.Lock()
276 // Set old ac.acbw to nil so the Shutdown state update will be ignored
277 // by balancer.
278 //
279 // TODO(bar) the state transition could be wrong when tearDown() old ac
280 // and creating new ac, fix the transition.
281 acbw.ac.acbw = nil
282 acbw.ac.mu.Unlock()
283 acState := acbw.ac.getState()
284 acbw.ac.tearDown(errConnDrain)
285
286 if acState == connectivity.Shutdown {
287 return
288 }
289
290 ac, err := cc.newAddrConn(addrs, opts)
291 if err != nil {
292 grpclog.Warningf("acBalancerWrapper: UpdateAddresses: failed to newAddrConn: %v", err)
293 return
294 }
295 acbw.ac = ac
296 ac.mu.Lock()
297 ac.acbw = acbw
298 ac.mu.Unlock()
299 if acState != connectivity.Idle {
300 ac.connect()
301 }
302 }
303}
304
305func (acbw *acBalancerWrapper) Connect() {
306 acbw.mu.Lock()
307 defer acbw.mu.Unlock()
308 acbw.ac.connect()
309}
310
311func (acbw *acBalancerWrapper) getAddrConn() *addrConn {
312 acbw.mu.Lock()
313 defer acbw.mu.Unlock()
314 return acbw.ac
315}