/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18
19package grpc
20
21import (
22 "fmt"
23 "sync"
24
25 "google.golang.org/grpc/balancer"
26 "google.golang.org/grpc/connectivity"
27 "google.golang.org/grpc/grpclog"
28 "google.golang.org/grpc/resolver"
29)
30
31// scStateUpdate contains the subConn and the new state it changed to.
32type scStateUpdate struct {
33 sc balancer.SubConn
34 state connectivity.State
35}
36
37// scStateUpdateBuffer is an unbounded channel for scStateChangeTuple.
38// TODO make a general purpose buffer that uses interface{}.
39type scStateUpdateBuffer struct {
40 c chan *scStateUpdate
41 mu sync.Mutex
42 backlog []*scStateUpdate
43}
44
45func newSCStateUpdateBuffer() *scStateUpdateBuffer {
46 return &scStateUpdateBuffer{
47 c: make(chan *scStateUpdate, 1),
48 }
49}
50
51func (b *scStateUpdateBuffer) put(t *scStateUpdate) {
52 b.mu.Lock()
53 defer b.mu.Unlock()
54 if len(b.backlog) == 0 {
55 select {
56 case b.c <- t:
57 return
58 default:
59 }
60 }
61 b.backlog = append(b.backlog, t)
62}
63
64func (b *scStateUpdateBuffer) load() {
65 b.mu.Lock()
66 defer b.mu.Unlock()
67 if len(b.backlog) > 0 {
68 select {
69 case b.c <- b.backlog[0]:
70 b.backlog[0] = nil
71 b.backlog = b.backlog[1:]
72 default:
73 }
74 }
75}
76
77// get returns the channel that the scStateUpdate will be sent to.
78//
79// Upon receiving, the caller should call load to send another
80// scStateChangeTuple onto the channel if there is any.
81func (b *scStateUpdateBuffer) get() <-chan *scStateUpdate {
82 return b.c
83}
84
85// ccBalancerWrapper is a wrapper on top of cc for balancers.
86// It implements balancer.ClientConn interface.
87type ccBalancerWrapper struct {
88 cc *ClientConn
89 balancer balancer.Balancer
90 stateChangeQueue *scStateUpdateBuffer
91 ccUpdateCh chan *balancer.ClientConnState
92 done chan struct{}
93
94 mu sync.Mutex
95 subConns map[*acBalancerWrapper]struct{}
96}
97
98func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {
99 ccb := &ccBalancerWrapper{
100 cc: cc,
101 stateChangeQueue: newSCStateUpdateBuffer(),
102 ccUpdateCh: make(chan *balancer.ClientConnState, 1),
103 done: make(chan struct{}),
104 subConns: make(map[*acBalancerWrapper]struct{}),
105 }
106 go ccb.watcher()
107 ccb.balancer = b.Build(ccb, bopts)
108 return ccb
109}
110
111// watcher balancer functions sequentially, so the balancer can be implemented
112// lock-free.
113func (ccb *ccBalancerWrapper) watcher() {
114 for {
115 select {
116 case t := <-ccb.stateChangeQueue.get():
117 ccb.stateChangeQueue.load()
118 select {
119 case <-ccb.done:
120 ccb.balancer.Close()
121 return
122 default:
123 }
124 if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
125 ub.UpdateSubConnState(t.sc, balancer.SubConnState{ConnectivityState: t.state})
126 } else {
127 ccb.balancer.HandleSubConnStateChange(t.sc, t.state)
128 }
129 case s := <-ccb.ccUpdateCh:
130 select {
131 case <-ccb.done:
132 ccb.balancer.Close()
133 return
134 default:
135 }
136 if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
137 ub.UpdateClientConnState(*s)
138 } else {
139 ccb.balancer.HandleResolvedAddrs(s.ResolverState.Addresses, nil)
140 }
141 case <-ccb.done:
142 }
143
144 select {
145 case <-ccb.done:
146 ccb.balancer.Close()
147 ccb.mu.Lock()
148 scs := ccb.subConns
149 ccb.subConns = nil
150 ccb.mu.Unlock()
151 for acbw := range scs {
152 ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
153 }
154 ccb.UpdateBalancerState(connectivity.Connecting, nil)
155 return
156 default:
157 }
158 ccb.cc.firstResolveEvent.Fire()
159 }
160}
161
162func (ccb *ccBalancerWrapper) close() {
163 close(ccb.done)
164}
165
166func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
167 // When updating addresses for a SubConn, if the address in use is not in
168 // the new addresses, the old ac will be tearDown() and a new ac will be
169 // created. tearDown() generates a state change with Shutdown state, we
170 // don't want the balancer to receive this state change. So before
171 // tearDown() on the old ac, ac.acbw (acWrapper) will be set to nil, and
172 // this function will be called with (nil, Shutdown). We don't need to call
173 // balancer method in this case.
174 if sc == nil {
175 return
176 }
177 ccb.stateChangeQueue.put(&scStateUpdate{
178 sc: sc,
179 state: s,
180 })
181}
182
183func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) {
184 if ccb.cc.curBalancerName != grpclbName {
185 // Filter any grpclb addresses since we don't have the grpclb balancer.
186 s := &ccs.ResolverState
187 for i := 0; i < len(s.Addresses); {
188 if s.Addresses[i].Type == resolver.GRPCLB {
189 copy(s.Addresses[i:], s.Addresses[i+1:])
190 s.Addresses = s.Addresses[:len(s.Addresses)-1]
191 continue
192 }
193 i++
194 }
195 }
196 select {
197 case <-ccb.ccUpdateCh:
198 default:
199 }
200 ccb.ccUpdateCh <- ccs
201}
202
203func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
204 if len(addrs) <= 0 {
205 return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
206 }
207 ccb.mu.Lock()
208 defer ccb.mu.Unlock()
209 if ccb.subConns == nil {
210 return nil, fmt.Errorf("grpc: ClientConn balancer wrapper was closed")
211 }
212 ac, err := ccb.cc.newAddrConn(addrs, opts)
213 if err != nil {
214 return nil, err
215 }
216 acbw := &acBalancerWrapper{ac: ac}
217 acbw.ac.mu.Lock()
218 ac.acbw = acbw
219 acbw.ac.mu.Unlock()
220 ccb.subConns[acbw] = struct{}{}
221 return acbw, nil
222}
223
224func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
225 acbw, ok := sc.(*acBalancerWrapper)
226 if !ok {
227 return
228 }
229 ccb.mu.Lock()
230 defer ccb.mu.Unlock()
231 if ccb.subConns == nil {
232 return
233 }
234 delete(ccb.subConns, acbw)
235 ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
236}
237
238func (ccb *ccBalancerWrapper) UpdateBalancerState(s connectivity.State, p balancer.Picker) {
239 ccb.mu.Lock()
240 defer ccb.mu.Unlock()
241 if ccb.subConns == nil {
242 return
243 }
244 // Update picker before updating state. Even though the ordering here does
245 // not matter, it can lead to multiple calls of Pick in the common start-up
246 // case where we wait for ready and then perform an RPC. If the picker is
247 // updated later, we could call the "connecting" picker when the state is
248 // updated, and then call the "ready" picker after the picker gets updated.
249 ccb.cc.blockingpicker.updatePicker(p)
250 ccb.cc.csMgr.updateState(s)
251}
252
253func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOption) {
254 ccb.cc.resolveNow(o)
255}
256
257func (ccb *ccBalancerWrapper) Target() string {
258 return ccb.cc.target
259}
260
261// acBalancerWrapper is a wrapper on top of ac for balancers.
262// It implements balancer.SubConn interface.
263type acBalancerWrapper struct {
264 mu sync.Mutex
265 ac *addrConn
266}
267
268func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
269 acbw.mu.Lock()
270 defer acbw.mu.Unlock()
271 if len(addrs) <= 0 {
272 acbw.ac.tearDown(errConnDrain)
273 return
274 }
275 if !acbw.ac.tryUpdateAddrs(addrs) {
276 cc := acbw.ac.cc
277 opts := acbw.ac.scopts
278 acbw.ac.mu.Lock()
279 // Set old ac.acbw to nil so the Shutdown state update will be ignored
280 // by balancer.
281 //
282 // TODO(bar) the state transition could be wrong when tearDown() old ac
283 // and creating new ac, fix the transition.
284 acbw.ac.acbw = nil
285 acbw.ac.mu.Unlock()
286 acState := acbw.ac.getState()
287 acbw.ac.tearDown(errConnDrain)
288
289 if acState == connectivity.Shutdown {
290 return
291 }
292
293 ac, err := cc.newAddrConn(addrs, opts)
294 if err != nil {
295 grpclog.Warningf("acBalancerWrapper: UpdateAddresses: failed to newAddrConn: %v", err)
296 return
297 }
298 acbw.ac = ac
299 ac.mu.Lock()
300 ac.acbw = acbw
301 ac.mu.Unlock()
302 if acState != connectivity.Idle {
303 ac.connect()
304 }
305 }
306}
307
308func (acbw *acBalancerWrapper) Connect() {
309 acbw.mu.Lock()
310 defer acbw.mu.Unlock()
311 acbw.ac.connect()
312}
313
314func (acbw *acBalancerWrapper) getAddrConn() *addrConn {
315 acbw.mu.Lock()
316 defer acbw.mu.Unlock()
317 return acbw.ac
318}