blob: f4ea61746823c6cce9658235c46059c55628fad7 [file] [log] [blame]
khenaidoo5fc5cea2021-08-11 17:39:16 -04001/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "fmt"
23 "sync"
24
25 "google.golang.org/grpc/balancer"
26 "google.golang.org/grpc/connectivity"
27 "google.golang.org/grpc/internal/buffer"
28 "google.golang.org/grpc/internal/channelz"
29 "google.golang.org/grpc/internal/grpcsync"
30 "google.golang.org/grpc/resolver"
31)
32
33// scStateUpdate contains the subConn and the new state it changed to.
34type scStateUpdate struct {
35 sc balancer.SubConn
36 state connectivity.State
37 err error
38}
39
40// exitIdle contains no data and is just a signal sent on the updateCh in
41// ccBalancerWrapper to instruct the balancer to exit idle.
42type exitIdle struct{}
43
44// ccBalancerWrapper is a wrapper on top of cc for balancers.
45// It implements balancer.ClientConn interface.
46type ccBalancerWrapper struct {
47 cc *ClientConn
48 balancerMu sync.Mutex // synchronizes calls to the balancer
49 balancer balancer.Balancer
50 hasExitIdle bool
51 updateCh *buffer.Unbounded
52 closed *grpcsync.Event
53 done *grpcsync.Event
54
55 mu sync.Mutex
56 subConns map[*acBalancerWrapper]struct{}
57}
58
59func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {
60 ccb := &ccBalancerWrapper{
61 cc: cc,
62 updateCh: buffer.NewUnbounded(),
63 closed: grpcsync.NewEvent(),
64 done: grpcsync.NewEvent(),
65 subConns: make(map[*acBalancerWrapper]struct{}),
66 }
67 go ccb.watcher()
68 ccb.balancer = b.Build(ccb, bopts)
69 _, ccb.hasExitIdle = ccb.balancer.(balancer.ExitIdler)
70 return ccb
71}
72
73// watcher balancer functions sequentially, so the balancer can be implemented
74// lock-free.
75func (ccb *ccBalancerWrapper) watcher() {
76 for {
77 select {
78 case t := <-ccb.updateCh.Get():
79 ccb.updateCh.Load()
80 if ccb.closed.HasFired() {
81 break
82 }
83 switch u := t.(type) {
84 case *scStateUpdate:
85 ccb.balancerMu.Lock()
86 ccb.balancer.UpdateSubConnState(u.sc, balancer.SubConnState{ConnectivityState: u.state, ConnectionError: u.err})
87 ccb.balancerMu.Unlock()
88 case *acBalancerWrapper:
89 ccb.mu.Lock()
90 if ccb.subConns != nil {
91 delete(ccb.subConns, u)
92 ccb.cc.removeAddrConn(u.getAddrConn(), errConnDrain)
93 }
94 ccb.mu.Unlock()
95 case exitIdle:
96 if ccb.cc.GetState() == connectivity.Idle {
97 if ei, ok := ccb.balancer.(balancer.ExitIdler); ok {
98 // We already checked that the balancer implements
99 // ExitIdle before pushing the event to updateCh, but
100 // check conditionally again as defensive programming.
101 ccb.balancerMu.Lock()
102 ei.ExitIdle()
103 ccb.balancerMu.Unlock()
104 }
105 }
106 default:
107 logger.Errorf("ccBalancerWrapper.watcher: unknown update %+v, type %T", t, t)
108 }
109 case <-ccb.closed.Done():
110 }
111
112 if ccb.closed.HasFired() {
113 ccb.balancerMu.Lock()
114 ccb.balancer.Close()
115 ccb.balancerMu.Unlock()
116 ccb.mu.Lock()
117 scs := ccb.subConns
118 ccb.subConns = nil
119 ccb.mu.Unlock()
120 ccb.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: nil})
121 ccb.done.Fire()
122 // Fire done before removing the addr conns. We can safely unblock
123 // ccb.close and allow the removeAddrConns to happen
124 // asynchronously.
125 for acbw := range scs {
126 ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
127 }
128 return
129 }
130 }
131}
132
133func (ccb *ccBalancerWrapper) close() {
134 ccb.closed.Fire()
135 <-ccb.done.Done()
136}
137
138func (ccb *ccBalancerWrapper) exitIdle() bool {
139 if !ccb.hasExitIdle {
140 return false
141 }
142 ccb.updateCh.Put(exitIdle{})
143 return true
144}
145
146func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
147 // When updating addresses for a SubConn, if the address in use is not in
148 // the new addresses, the old ac will be tearDown() and a new ac will be
149 // created. tearDown() generates a state change with Shutdown state, we
150 // don't want the balancer to receive this state change. So before
151 // tearDown() on the old ac, ac.acbw (acWrapper) will be set to nil, and
152 // this function will be called with (nil, Shutdown). We don't need to call
153 // balancer method in this case.
154 if sc == nil {
155 return
156 }
157 ccb.updateCh.Put(&scStateUpdate{
158 sc: sc,
159 state: s,
160 err: err,
161 })
162}
163
164func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
165 ccb.balancerMu.Lock()
166 defer ccb.balancerMu.Unlock()
167 return ccb.balancer.UpdateClientConnState(*ccs)
168}
169
170func (ccb *ccBalancerWrapper) resolverError(err error) {
171 ccb.balancerMu.Lock()
172 defer ccb.balancerMu.Unlock()
173 ccb.balancer.ResolverError(err)
174}
175
176func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
177 if len(addrs) <= 0 {
178 return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
179 }
180 ccb.mu.Lock()
181 defer ccb.mu.Unlock()
182 if ccb.subConns == nil {
183 return nil, fmt.Errorf("grpc: ClientConn balancer wrapper was closed")
184 }
185 ac, err := ccb.cc.newAddrConn(addrs, opts)
186 if err != nil {
187 return nil, err
188 }
189 acbw := &acBalancerWrapper{ac: ac}
190 acbw.ac.mu.Lock()
191 ac.acbw = acbw
192 acbw.ac.mu.Unlock()
193 ccb.subConns[acbw] = struct{}{}
194 return acbw, nil
195}
196
197func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
198 // The RemoveSubConn() is handled in the run() goroutine, to avoid deadlock
199 // during switchBalancer() if the old balancer calls RemoveSubConn() in its
200 // Close().
201 ccb.updateCh.Put(sc)
202}
203
204func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
205 acbw, ok := sc.(*acBalancerWrapper)
206 if !ok {
207 return
208 }
209 acbw.UpdateAddresses(addrs)
210}
211
212func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
213 ccb.mu.Lock()
214 defer ccb.mu.Unlock()
215 if ccb.subConns == nil {
216 return
217 }
218 // Update picker before updating state. Even though the ordering here does
219 // not matter, it can lead to multiple calls of Pick in the common start-up
220 // case where we wait for ready and then perform an RPC. If the picker is
221 // updated later, we could call the "connecting" picker when the state is
222 // updated, and then call the "ready" picker after the picker gets updated.
223 ccb.cc.blockingpicker.updatePicker(s.Picker)
224 ccb.cc.csMgr.updateState(s.ConnectivityState)
225}
226
227func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOptions) {
228 ccb.cc.resolveNow(o)
229}
230
231func (ccb *ccBalancerWrapper) Target() string {
232 return ccb.cc.target
233}
234
235// acBalancerWrapper is a wrapper on top of ac for balancers.
236// It implements balancer.SubConn interface.
237type acBalancerWrapper struct {
238 mu sync.Mutex
239 ac *addrConn
240}
241
242func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
243 acbw.mu.Lock()
244 defer acbw.mu.Unlock()
245 if len(addrs) <= 0 {
246 acbw.ac.cc.removeAddrConn(acbw.ac, errConnDrain)
247 return
248 }
249 if !acbw.ac.tryUpdateAddrs(addrs) {
250 cc := acbw.ac.cc
251 opts := acbw.ac.scopts
252 acbw.ac.mu.Lock()
253 // Set old ac.acbw to nil so the Shutdown state update will be ignored
254 // by balancer.
255 //
256 // TODO(bar) the state transition could be wrong when tearDown() old ac
257 // and creating new ac, fix the transition.
258 acbw.ac.acbw = nil
259 acbw.ac.mu.Unlock()
260 acState := acbw.ac.getState()
261 acbw.ac.cc.removeAddrConn(acbw.ac, errConnDrain)
262
263 if acState == connectivity.Shutdown {
264 return
265 }
266
267 newAC, err := cc.newAddrConn(addrs, opts)
268 if err != nil {
269 channelz.Warningf(logger, acbw.ac.channelzID, "acBalancerWrapper: UpdateAddresses: failed to newAddrConn: %v", err)
270 return
271 }
272 acbw.ac = newAC
273 newAC.mu.Lock()
274 newAC.acbw = acbw
275 newAC.mu.Unlock()
276 if acState != connectivity.Idle {
277 go newAC.connect()
278 }
279 }
280}
281
282func (acbw *acBalancerWrapper) Connect() {
283 acbw.mu.Lock()
284 defer acbw.mu.Unlock()
285 go acbw.ac.connect()
286}
287
288func (acbw *acBalancerWrapper) getAddrConn() *addrConn {
289 acbw.mu.Lock()
290 defer acbw.mu.Unlock()
291 return acbw.ac
292}