blob: 5356194c34025ca93c9936204ca87a4cbfbc5621 [file] [log] [blame]
Takahiro Suzukid7bf8202020-12-17 20:21:59 +09001/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "fmt"
23 "sync"
24
25 "google.golang.org/grpc/balancer"
26 "google.golang.org/grpc/connectivity"
27 "google.golang.org/grpc/grpclog"
28 "google.golang.org/grpc/internal/buffer"
29 "google.golang.org/grpc/internal/grpcsync"
30 "google.golang.org/grpc/resolver"
31)
32
33// scStateUpdate contains the subConn and the new state it changed to.
34type scStateUpdate struct {
35 sc balancer.SubConn
36 state connectivity.State
37}
38
39// ccBalancerWrapper is a wrapper on top of cc for balancers.
40// It implements balancer.ClientConn interface.
41type ccBalancerWrapper struct {
42 cc *ClientConn
43 balancerMu sync.Mutex // synchronizes calls to the balancer
44 balancer balancer.Balancer
45 scBuffer *buffer.Unbounded
46 done *grpcsync.Event
47
48 mu sync.Mutex
49 subConns map[*acBalancerWrapper]struct{}
50}
51
52func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {
53 ccb := &ccBalancerWrapper{
54 cc: cc,
55 scBuffer: buffer.NewUnbounded(),
56 done: grpcsync.NewEvent(),
57 subConns: make(map[*acBalancerWrapper]struct{}),
58 }
59 go ccb.watcher()
60 ccb.balancer = b.Build(ccb, bopts)
61 return ccb
62}
63
64// watcher balancer functions sequentially, so the balancer can be implemented
65// lock-free.
66func (ccb *ccBalancerWrapper) watcher() {
67 for {
68 select {
69 case t := <-ccb.scBuffer.Get():
70 ccb.scBuffer.Load()
71 if ccb.done.HasFired() {
72 break
73 }
74 ccb.balancerMu.Lock()
75 su := t.(*scStateUpdate)
76 if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
77 ub.UpdateSubConnState(su.sc, balancer.SubConnState{ConnectivityState: su.state})
78 } else {
79 ccb.balancer.HandleSubConnStateChange(su.sc, su.state)
80 }
81 ccb.balancerMu.Unlock()
82 case <-ccb.done.Done():
83 }
84
85 if ccb.done.HasFired() {
86 ccb.balancer.Close()
87 ccb.mu.Lock()
88 scs := ccb.subConns
89 ccb.subConns = nil
90 ccb.mu.Unlock()
91 for acbw := range scs {
92 ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
93 }
94 ccb.UpdateBalancerState(connectivity.Connecting, nil)
95 return
96 }
97 }
98}
99
100func (ccb *ccBalancerWrapper) close() {
101 ccb.done.Fire()
102}
103
104func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
105 // When updating addresses for a SubConn, if the address in use is not in
106 // the new addresses, the old ac will be tearDown() and a new ac will be
107 // created. tearDown() generates a state change with Shutdown state, we
108 // don't want the balancer to receive this state change. So before
109 // tearDown() on the old ac, ac.acbw (acWrapper) will be set to nil, and
110 // this function will be called with (nil, Shutdown). We don't need to call
111 // balancer method in this case.
112 if sc == nil {
113 return
114 }
115 ccb.scBuffer.Put(&scStateUpdate{
116 sc: sc,
117 state: s,
118 })
119}
120
121func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
122 ccb.balancerMu.Lock()
123 defer ccb.balancerMu.Unlock()
124 if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
125 return ub.UpdateClientConnState(*ccs)
126 }
127 ccb.balancer.HandleResolvedAddrs(ccs.ResolverState.Addresses, nil)
128 return nil
129}
130
131func (ccb *ccBalancerWrapper) resolverError(err error) {
132 if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
133 ccb.balancerMu.Lock()
134 ub.ResolverError(err)
135 ccb.balancerMu.Unlock()
136 }
137}
138
139func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
140 if len(addrs) <= 0 {
141 return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
142 }
143 ccb.mu.Lock()
144 defer ccb.mu.Unlock()
145 if ccb.subConns == nil {
146 return nil, fmt.Errorf("grpc: ClientConn balancer wrapper was closed")
147 }
148 ac, err := ccb.cc.newAddrConn(addrs, opts)
149 if err != nil {
150 return nil, err
151 }
152 acbw := &acBalancerWrapper{ac: ac}
153 acbw.ac.mu.Lock()
154 ac.acbw = acbw
155 acbw.ac.mu.Unlock()
156 ccb.subConns[acbw] = struct{}{}
157 return acbw, nil
158}
159
160func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
161 acbw, ok := sc.(*acBalancerWrapper)
162 if !ok {
163 return
164 }
165 ccb.mu.Lock()
166 defer ccb.mu.Unlock()
167 if ccb.subConns == nil {
168 return
169 }
170 delete(ccb.subConns, acbw)
171 ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
172}
173
174func (ccb *ccBalancerWrapper) UpdateBalancerState(s connectivity.State, p balancer.Picker) {
175 ccb.mu.Lock()
176 defer ccb.mu.Unlock()
177 if ccb.subConns == nil {
178 return
179 }
180 // Update picker before updating state. Even though the ordering here does
181 // not matter, it can lead to multiple calls of Pick in the common start-up
182 // case where we wait for ready and then perform an RPC. If the picker is
183 // updated later, we could call the "connecting" picker when the state is
184 // updated, and then call the "ready" picker after the picker gets updated.
185 ccb.cc.blockingpicker.updatePicker(p)
186 ccb.cc.csMgr.updateState(s)
187}
188
189func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOption) {
190 ccb.cc.resolveNow(o)
191}
192
193func (ccb *ccBalancerWrapper) Target() string {
194 return ccb.cc.target
195}
196
197// acBalancerWrapper is a wrapper on top of ac for balancers.
198// It implements balancer.SubConn interface.
199type acBalancerWrapper struct {
200 mu sync.Mutex
201 ac *addrConn
202}
203
204func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
205 acbw.mu.Lock()
206 defer acbw.mu.Unlock()
207 if len(addrs) <= 0 {
208 acbw.ac.tearDown(errConnDrain)
209 return
210 }
211 if !acbw.ac.tryUpdateAddrs(addrs) {
212 cc := acbw.ac.cc
213 opts := acbw.ac.scopts
214 acbw.ac.mu.Lock()
215 // Set old ac.acbw to nil so the Shutdown state update will be ignored
216 // by balancer.
217 //
218 // TODO(bar) the state transition could be wrong when tearDown() old ac
219 // and creating new ac, fix the transition.
220 acbw.ac.acbw = nil
221 acbw.ac.mu.Unlock()
222 acState := acbw.ac.getState()
223 acbw.ac.tearDown(errConnDrain)
224
225 if acState == connectivity.Shutdown {
226 return
227 }
228
229 ac, err := cc.newAddrConn(addrs, opts)
230 if err != nil {
231 grpclog.Warningf("acBalancerWrapper: UpdateAddresses: failed to newAddrConn: %v", err)
232 return
233 }
234 acbw.ac = ac
235 ac.mu.Lock()
236 ac.acbw = acbw
237 ac.mu.Unlock()
238 if acState != connectivity.Idle {
239 ac.connect()
240 }
241 }
242}
243
244func (acbw *acBalancerWrapper) Connect() {
245 acbw.mu.Lock()
246 defer acbw.mu.Unlock()
247 acbw.ac.connect()
248}
249
250func (acbw *acBalancerWrapper) getAddrConn() *addrConn {
251 acbw.mu.Lock()
252 defer acbw.mu.Unlock()
253 return acbw.ac
254}