blob: 84b6dbe3eabf93679950cbf7dd2b71337b7a41c2 [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "context"
23 "errors"
24 "fmt"
25 "math"
26 "net"
27 "reflect"
28 "strings"
29 "sync"
30 "sync/atomic"
31 "time"
32
33 "google.golang.org/grpc/balancer"
34 _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
35 "google.golang.org/grpc/codes"
36 "google.golang.org/grpc/connectivity"
37 "google.golang.org/grpc/credentials"
38 "google.golang.org/grpc/grpclog"
39 "google.golang.org/grpc/internal"
40 "google.golang.org/grpc/internal/backoff"
41 "google.golang.org/grpc/internal/channelz"
42 "google.golang.org/grpc/internal/envconfig"
43 "google.golang.org/grpc/internal/grpcsync"
44 "google.golang.org/grpc/internal/transport"
45 "google.golang.org/grpc/keepalive"
46 "google.golang.org/grpc/metadata"
47 "google.golang.org/grpc/resolver"
48 _ "google.golang.org/grpc/resolver/dns" // To register dns resolver.
49 _ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
50 "google.golang.org/grpc/status"
51)
52
53const (
54 // minimum time to give a connection to complete
55 minConnectTimeout = 20 * time.Second
56 // must match grpclbName in grpclb/grpclb.go
57 grpclbName = "grpclb"
58)
59
60var (
61 // ErrClientConnClosing indicates that the operation is illegal because
62 // the ClientConn is closing.
63 //
64 // Deprecated: this error should not be relied upon by users; use the status
65 // code of Canceled instead.
66 ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
67 // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
68 errConnDrain = errors.New("grpc: the connection is drained")
69 // errConnClosing indicates that the connection is closing.
70 errConnClosing = errors.New("grpc: the connection is closing")
71 // errBalancerClosed indicates that the balancer is closed.
72 errBalancerClosed = errors.New("grpc: balancer is closed")
73 // We use an accessor so that minConnectTimeout can be
74 // atomically read and updated while testing.
75 getMinConnectTimeout = func() time.Duration {
76 return minConnectTimeout
77 }
78)
79
80// The following errors are returned from Dial and DialContext
81var (
82 // errNoTransportSecurity indicates that there is no transport security
83 // being set for ClientConn. Users should either set one or explicitly
84 // call WithInsecure DialOption to disable security.
85 errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
86 // errTransportCredsAndBundle indicates that creds bundle is used together
87 // with other individual Transport Credentials.
88 errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
89 // errTransportCredentialsMissing indicates that users want to transmit security
90 // information (e.g., oauth2 token) which requires secure connection on an insecure
91 // connection.
92 errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
93 // errCredentialsConflict indicates that grpc.WithTransportCredentials()
94 // and grpc.WithInsecure() are both called for a connection.
95 errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
96)
97
98const (
99 defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
100 defaultClientMaxSendMessageSize = math.MaxInt32
101 // http2IOBufSize specifies the buffer size for sending frames.
102 defaultWriteBufSize = 32 * 1024
103 defaultReadBufSize = 32 * 1024
104)
105
106// Dial creates a client connection to the given target.
107func Dial(target string, opts ...DialOption) (*ClientConn, error) {
108 return DialContext(context.Background(), target, opts...)
109}
110
111// DialContext creates a client connection to the given target. By default, it's
112// a non-blocking dial (the function won't wait for connections to be
113// established, and connecting happens in the background). To make it a blocking
114// dial, use WithBlock() dial option.
115//
116// In the non-blocking case, the ctx does not act against the connection. It
117// only controls the setup steps.
118//
119// In the blocking case, ctx can be used to cancel or expire the pending
120// connection. Once this function returns, the cancellation and expiration of
121// ctx will be noop. Users should call ClientConn.Close to terminate all the
122// pending operations after this function returns.
123//
124// The target name syntax is defined in
125// https://github.com/grpc/grpc/blob/master/doc/naming.md.
126// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
127func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
128 cc := &ClientConn{
129 target: target,
130 csMgr: &connectivityStateManager{},
131 conns: make(map[*addrConn]struct{}),
132 dopts: defaultDialOptions(),
133 blockingpicker: newPickerWrapper(),
134 czData: new(channelzData),
135 firstResolveEvent: grpcsync.NewEvent(),
136 }
137 cc.retryThrottler.Store((*retryThrottler)(nil))
138 cc.ctx, cc.cancel = context.WithCancel(context.Background())
139
140 for _, opt := range opts {
141 opt.apply(&cc.dopts)
142 }
143
144 if channelz.IsOn() {
145 if cc.dopts.channelzParentID != 0 {
146 cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
147 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
148 Desc: "Channel Created",
149 Severity: channelz.CtINFO,
150 Parent: &channelz.TraceEventDesc{
151 Desc: fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
152 Severity: channelz.CtINFO,
153 },
154 })
155 } else {
156 cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
157 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
158 Desc: "Channel Created",
159 Severity: channelz.CtINFO,
160 })
161 }
162 cc.csMgr.channelzID = cc.channelzID
163 }
164
165 if !cc.dopts.insecure {
166 if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
167 return nil, errNoTransportSecurity
168 }
169 if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
170 return nil, errTransportCredsAndBundle
171 }
172 } else {
173 if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
174 return nil, errCredentialsConflict
175 }
176 for _, cd := range cc.dopts.copts.PerRPCCredentials {
177 if cd.RequireTransportSecurity() {
178 return nil, errTransportCredentialsMissing
179 }
180 }
181 }
182
183 cc.mkp = cc.dopts.copts.KeepaliveParams
184
185 if cc.dopts.copts.Dialer == nil {
186 cc.dopts.copts.Dialer = newProxyDialer(
187 func(ctx context.Context, addr string) (net.Conn, error) {
188 network, addr := parseDialTarget(addr)
189 return (&net.Dialer{}).DialContext(ctx, network, addr)
190 },
191 )
192 }
193
194 if cc.dopts.copts.UserAgent != "" {
195 cc.dopts.copts.UserAgent += " " + grpcUA
196 } else {
197 cc.dopts.copts.UserAgent = grpcUA
198 }
199
200 if cc.dopts.timeout > 0 {
201 var cancel context.CancelFunc
202 ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
203 defer cancel()
204 }
205
206 defer func() {
207 select {
208 case <-ctx.Done():
209 conn, err = nil, ctx.Err()
210 default:
211 }
212
213 if err != nil {
214 cc.Close()
215 }
216 }()
217
218 scSet := false
219 if cc.dopts.scChan != nil {
220 // Try to get an initial service config.
221 select {
222 case sc, ok := <-cc.dopts.scChan:
223 if ok {
224 cc.sc = sc
225 scSet = true
226 }
227 default:
228 }
229 }
230 if cc.dopts.bs == nil {
231 cc.dopts.bs = backoff.Exponential{
232 MaxDelay: DefaultBackoffConfig.MaxDelay,
233 }
234 }
235 if cc.dopts.resolverBuilder == nil {
236 // Only try to parse target when resolver builder is not already set.
237 cc.parsedTarget = parseTarget(cc.target)
238 grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
239 cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
240 if cc.dopts.resolverBuilder == nil {
241 // If resolver builder is still nil, the parse target's scheme is
242 // not registered. Fallback to default resolver and set Endpoint to
243 // the original unparsed target.
244 grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
245 cc.parsedTarget = resolver.Target{
246 Scheme: resolver.GetDefaultScheme(),
247 Endpoint: target,
248 }
249 cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
250 }
251 } else {
252 cc.parsedTarget = resolver.Target{Endpoint: target}
253 }
254 creds := cc.dopts.copts.TransportCredentials
255 if creds != nil && creds.Info().ServerName != "" {
256 cc.authority = creds.Info().ServerName
257 } else if cc.dopts.insecure && cc.dopts.authority != "" {
258 cc.authority = cc.dopts.authority
259 } else {
260 // Use endpoint from "scheme://authority/endpoint" as the default
261 // authority for ClientConn.
262 cc.authority = cc.parsedTarget.Endpoint
263 }
264
265 if cc.dopts.scChan != nil && !scSet {
266 // Blocking wait for the initial service config.
267 select {
268 case sc, ok := <-cc.dopts.scChan:
269 if ok {
270 cc.sc = sc
271 }
272 case <-ctx.Done():
273 return nil, ctx.Err()
274 }
275 }
276 if cc.dopts.scChan != nil {
277 go cc.scWatcher()
278 }
279
280 var credsClone credentials.TransportCredentials
281 if creds := cc.dopts.copts.TransportCredentials; creds != nil {
282 credsClone = creds.Clone()
283 }
284 cc.balancerBuildOpts = balancer.BuildOptions{
285 DialCreds: credsClone,
286 CredsBundle: cc.dopts.copts.CredsBundle,
287 Dialer: cc.dopts.copts.Dialer,
288 ChannelzParentID: cc.channelzID,
289 }
290
291 // Build the resolver.
292 rWrapper, err := newCCResolverWrapper(cc)
293 if err != nil {
294 return nil, fmt.Errorf("failed to build resolver: %v", err)
295 }
296
297 cc.mu.Lock()
298 cc.resolverWrapper = rWrapper
299 cc.mu.Unlock()
300 // A blocking dial blocks until the clientConn is ready.
301 if cc.dopts.block {
302 for {
303 s := cc.GetState()
304 if s == connectivity.Ready {
305 break
306 } else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
307 if err = cc.blockingpicker.connectionError(); err != nil {
308 terr, ok := err.(interface {
309 Temporary() bool
310 })
311 if ok && !terr.Temporary() {
312 return nil, err
313 }
314 }
315 }
316 if !cc.WaitForStateChange(ctx, s) {
317 // ctx got timeout or canceled.
318 return nil, ctx.Err()
319 }
320 }
321 }
322
323 return cc, nil
324}
325
326// connectivityStateManager keeps the connectivity.State of ClientConn.
327// This struct will eventually be exported so the balancers can access it.
328type connectivityStateManager struct {
329 mu sync.Mutex
330 state connectivity.State
331 notifyChan chan struct{}
332 channelzID int64
333}
334
335// updateState updates the connectivity.State of ClientConn.
336// If there's a change it notifies goroutines waiting on state change to
337// happen.
338func (csm *connectivityStateManager) updateState(state connectivity.State) {
339 csm.mu.Lock()
340 defer csm.mu.Unlock()
341 if csm.state == connectivity.Shutdown {
342 return
343 }
344 if csm.state == state {
345 return
346 }
347 csm.state = state
348 if channelz.IsOn() {
349 channelz.AddTraceEvent(csm.channelzID, &channelz.TraceEventDesc{
350 Desc: fmt.Sprintf("Channel Connectivity change to %v", state),
351 Severity: channelz.CtINFO,
352 })
353 }
354 if csm.notifyChan != nil {
355 // There are other goroutines waiting on this channel.
356 close(csm.notifyChan)
357 csm.notifyChan = nil
358 }
359}
360
361func (csm *connectivityStateManager) getState() connectivity.State {
362 csm.mu.Lock()
363 defer csm.mu.Unlock()
364 return csm.state
365}
366
367func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
368 csm.mu.Lock()
369 defer csm.mu.Unlock()
370 if csm.notifyChan == nil {
371 csm.notifyChan = make(chan struct{})
372 }
373 return csm.notifyChan
374}
375
376// ClientConn represents a client connection to an RPC server.
377type ClientConn struct {
378 ctx context.Context
379 cancel context.CancelFunc
380
381 target string
382 parsedTarget resolver.Target
383 authority string
384 dopts dialOptions
385 csMgr *connectivityStateManager
386
387 balancerBuildOpts balancer.BuildOptions
388 blockingpicker *pickerWrapper
389
390 mu sync.RWMutex
391 resolverWrapper *ccResolverWrapper
392 sc ServiceConfig
393 scRaw string
394 conns map[*addrConn]struct{}
395 // Keepalive parameter can be updated if a GoAway is received.
396 mkp keepalive.ClientParameters
397 curBalancerName string
398 preBalancerName string // previous balancer name.
399 curAddresses []resolver.Address
400 balancerWrapper *ccBalancerWrapper
401 retryThrottler atomic.Value
402
403 firstResolveEvent *grpcsync.Event
404
405 channelzID int64 // channelz unique identification number
406 czData *channelzData
407}
408
409// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
410// ctx expires. A true value is returned in former case and false in latter.
411// This is an EXPERIMENTAL API.
412func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
413 ch := cc.csMgr.getNotifyChan()
414 if cc.csMgr.getState() != sourceState {
415 return true
416 }
417 select {
418 case <-ctx.Done():
419 return false
420 case <-ch:
421 return true
422 }
423}
424
425// GetState returns the connectivity.State of ClientConn.
426// This is an EXPERIMENTAL API.
427func (cc *ClientConn) GetState() connectivity.State {
428 return cc.csMgr.getState()
429}
430
431func (cc *ClientConn) scWatcher() {
432 for {
433 select {
434 case sc, ok := <-cc.dopts.scChan:
435 if !ok {
436 return
437 }
438 cc.mu.Lock()
439 // TODO: load balance policy runtime change is ignored.
440 // We may revist this decision in the future.
441 cc.sc = sc
442 cc.scRaw = ""
443 cc.mu.Unlock()
444 case <-cc.ctx.Done():
445 return
446 }
447 }
448}
449
450// waitForResolvedAddrs blocks until the resolver has provided addresses or the
451// context expires. Returns nil unless the context expires first; otherwise
452// returns a status error based on the context.
453func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
454 // This is on the RPC path, so we use a fast path to avoid the
455 // more-expensive "select" below after the resolver has returned once.
456 if cc.firstResolveEvent.HasFired() {
457 return nil
458 }
459 select {
460 case <-cc.firstResolveEvent.Done():
461 return nil
462 case <-ctx.Done():
463 return status.FromContextError(ctx.Err()).Err()
464 case <-cc.ctx.Done():
465 return ErrClientConnClosing
466 }
467}
468
469func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) {
470 cc.mu.Lock()
471 defer cc.mu.Unlock()
472 if cc.conns == nil {
473 // cc was closed.
474 return
475 }
476
477 if reflect.DeepEqual(cc.curAddresses, addrs) {
478 return
479 }
480
481 cc.curAddresses = addrs
482 cc.firstResolveEvent.Fire()
483
484 if cc.dopts.balancerBuilder == nil {
485 // Only look at balancer types and switch balancer if balancer dial
486 // option is not set.
487 var isGRPCLB bool
488 for _, a := range addrs {
489 if a.Type == resolver.GRPCLB {
490 isGRPCLB = true
491 break
492 }
493 }
494 var newBalancerName string
495 if isGRPCLB {
496 newBalancerName = grpclbName
497 } else {
498 // Address list doesn't contain grpclb address. Try to pick a
499 // non-grpclb balancer.
500 newBalancerName = cc.curBalancerName
501 // If current balancer is grpclb, switch to the previous one.
502 if newBalancerName == grpclbName {
503 newBalancerName = cc.preBalancerName
504 }
505 // The following could be true in two cases:
506 // - the first time handling resolved addresses
507 // (curBalancerName="")
508 // - the first time handling non-grpclb addresses
509 // (curBalancerName="grpclb", preBalancerName="")
510 if newBalancerName == "" {
511 newBalancerName = PickFirstBalancerName
512 }
513 }
514 cc.switchBalancer(newBalancerName)
515 } else if cc.balancerWrapper == nil {
516 // Balancer dial option was set, and this is the first time handling
517 // resolved addresses. Build a balancer with dopts.balancerBuilder.
518 cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
519 }
520
521 cc.balancerWrapper.handleResolvedAddrs(addrs, nil)
522}
523
524// switchBalancer starts the switching from current balancer to the balancer
525// with the given name.
526//
527// It will NOT send the current address list to the new balancer. If needed,
528// caller of this function should send address list to the new balancer after
529// this function returns.
530//
531// Caller must hold cc.mu.
532func (cc *ClientConn) switchBalancer(name string) {
533 if cc.conns == nil {
534 return
535 }
536
537 if strings.ToLower(cc.curBalancerName) == strings.ToLower(name) {
538 return
539 }
540
541 grpclog.Infof("ClientConn switching balancer to %q", name)
542 if cc.dopts.balancerBuilder != nil {
543 grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead")
544 return
545 }
546 // TODO(bar switching) change this to two steps: drain and close.
547 // Keep track of sc in wrapper.
548 if cc.balancerWrapper != nil {
549 cc.balancerWrapper.close()
550 }
551
552 builder := balancer.Get(name)
553 // TODO(yuxuanli): If user send a service config that does not contain a valid balancer name, should
554 // we reuse previous one?
555 if channelz.IsOn() {
556 if builder == nil {
557 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
558 Desc: fmt.Sprintf("Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName),
559 Severity: channelz.CtWarning,
560 })
561 } else {
562 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
563 Desc: fmt.Sprintf("Channel switches to new LB policy %q", name),
564 Severity: channelz.CtINFO,
565 })
566 }
567 }
568 if builder == nil {
569 grpclog.Infof("failed to get balancer builder for: %v, using pick_first instead", name)
570 builder = newPickfirstBuilder()
571 }
572
573 cc.preBalancerName = cc.curBalancerName
574 cc.curBalancerName = builder.Name()
575 cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
576}
577
578func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
579 cc.mu.Lock()
580 if cc.conns == nil {
581 cc.mu.Unlock()
582 return
583 }
584 // TODO(bar switching) send updates to all balancer wrappers when balancer
585 // gracefully switching is supported.
586 cc.balancerWrapper.handleSubConnStateChange(sc, s)
587 cc.mu.Unlock()
588}
589
590// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
591//
592// Caller needs to make sure len(addrs) > 0.
593func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
594 ac := &addrConn{
595 cc: cc,
596 addrs: addrs,
597 scopts: opts,
598 dopts: cc.dopts,
599 czData: new(channelzData),
600 successfulHandshake: true, // make the first nextAddr() call _not_ move addrIdx up by 1
601 resetBackoff: make(chan struct{}),
602 }
603 ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
604 // Track ac in cc. This needs to be done before any getTransport(...) is called.
605 cc.mu.Lock()
606 if cc.conns == nil {
607 cc.mu.Unlock()
608 return nil, ErrClientConnClosing
609 }
610 if channelz.IsOn() {
611 ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
612 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
613 Desc: "Subchannel Created",
614 Severity: channelz.CtINFO,
615 Parent: &channelz.TraceEventDesc{
616 Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),
617 Severity: channelz.CtINFO,
618 },
619 })
620 }
621 cc.conns[ac] = struct{}{}
622 cc.mu.Unlock()
623 return ac, nil
624}
625
626// removeAddrConn removes the addrConn in the subConn from clientConn.
627// It also tears down the ac with the given error.
628func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
629 cc.mu.Lock()
630 if cc.conns == nil {
631 cc.mu.Unlock()
632 return
633 }
634 delete(cc.conns, ac)
635 cc.mu.Unlock()
636 ac.tearDown(err)
637}
638
639func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric {
640 return &channelz.ChannelInternalMetric{
641 State: cc.GetState(),
642 Target: cc.target,
643 CallsStarted: atomic.LoadInt64(&cc.czData.callsStarted),
644 CallsSucceeded: atomic.LoadInt64(&cc.czData.callsSucceeded),
645 CallsFailed: atomic.LoadInt64(&cc.czData.callsFailed),
646 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)),
647 }
648}
649
650// Target returns the target string of the ClientConn.
651// This is an EXPERIMENTAL API.
652func (cc *ClientConn) Target() string {
653 return cc.target
654}
655
656func (cc *ClientConn) incrCallsStarted() {
657 atomic.AddInt64(&cc.czData.callsStarted, 1)
658 atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano())
659}
660
661func (cc *ClientConn) incrCallsSucceeded() {
662 atomic.AddInt64(&cc.czData.callsSucceeded, 1)
663}
664
665func (cc *ClientConn) incrCallsFailed() {
666 atomic.AddInt64(&cc.czData.callsFailed, 1)
667}
668
669// connect starts creating a transport.
670// It does nothing if the ac is not IDLE.
671// TODO(bar) Move this to the addrConn section.
672func (ac *addrConn) connect() error {
673 ac.mu.Lock()
674 if ac.state == connectivity.Shutdown {
675 ac.mu.Unlock()
676 return errConnClosing
677 }
678 if ac.state != connectivity.Idle {
679 ac.mu.Unlock()
680 return nil
681 }
682 ac.updateConnectivityState(connectivity.Connecting)
683 ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
684 ac.mu.Unlock()
685
686 // Start a goroutine connecting to the server asynchronously.
687 go ac.resetTransport(false)
688 return nil
689}
690
691// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
692//
693// It checks whether current connected address of ac is in the new addrs list.
694// - If true, it updates ac.addrs and returns true. The ac will keep using
695// the existing connection.
696// - If false, it does nothing and returns false.
697func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
698 ac.mu.Lock()
699 defer ac.mu.Unlock()
700 grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
701 if ac.state == connectivity.Shutdown {
702 ac.addrs = addrs
703 return true
704 }
705
706 var curAddrFound bool
707 for _, a := range addrs {
708 if reflect.DeepEqual(ac.curAddr, a) {
709 curAddrFound = true
710 break
711 }
712 }
713 grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
714 if curAddrFound {
715 ac.addrs = addrs
716 ac.addrIdx = 0 // Start reconnecting from beginning in the new list.
717 }
718
719 return curAddrFound
720}
721
722// GetMethodConfig gets the method config of the input method.
723// If there's an exact match for input method (i.e. /service/method), we return
724// the corresponding MethodConfig.
725// If there isn't an exact match for the input method, we look for the default config
726// under the service (i.e /service/). If there is a default MethodConfig for
727// the service, we return it.
728// Otherwise, we return an empty MethodConfig.
729func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
730 // TODO: Avoid the locking here.
731 cc.mu.RLock()
732 defer cc.mu.RUnlock()
733 m, ok := cc.sc.Methods[method]
734 if !ok {
735 i := strings.LastIndex(method, "/")
736 m = cc.sc.Methods[method[:i+1]]
737 }
738 return m
739}
740
741func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
742 cc.mu.RLock()
743 defer cc.mu.RUnlock()
744 return cc.sc.healthCheckConfig
745}
746
747func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
748 hdr, _ := metadata.FromOutgoingContext(ctx)
749 t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{
750 FullMethodName: method,
751 Header: hdr,
752 })
753 if err != nil {
754 return nil, nil, toRPCErr(err)
755 }
756 return t, done, nil
757}
758
759// handleServiceConfig parses the service config string in JSON format to Go native
760// struct ServiceConfig, and store both the struct and the JSON string in ClientConn.
761func (cc *ClientConn) handleServiceConfig(js string) error {
762 if cc.dopts.disableServiceConfig {
763 return nil
764 }
765 if cc.scRaw == js {
766 return nil
767 }
768 if channelz.IsOn() {
769 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
770 // The special formatting of \"%s\" instead of %q is to provide nice printing of service config
771 // for human consumption.
772 Desc: fmt.Sprintf("Channel has a new service config \"%s\"", js),
773 Severity: channelz.CtINFO,
774 })
775 }
776 sc, err := parseServiceConfig(js)
777 if err != nil {
778 return err
779 }
780 cc.mu.Lock()
781 // Check if the ClientConn is already closed. Some fields (e.g.
782 // balancerWrapper) are set to nil when closing the ClientConn, and could
783 // cause nil pointer panic if we don't have this check.
784 if cc.conns == nil {
785 cc.mu.Unlock()
786 return nil
787 }
788 cc.scRaw = js
789 cc.sc = sc
790
791 if sc.retryThrottling != nil {
792 newThrottler := &retryThrottler{
793 tokens: sc.retryThrottling.MaxTokens,
794 max: sc.retryThrottling.MaxTokens,
795 thresh: sc.retryThrottling.MaxTokens / 2,
796 ratio: sc.retryThrottling.TokenRatio,
797 }
798 cc.retryThrottler.Store(newThrottler)
799 } else {
800 cc.retryThrottler.Store((*retryThrottler)(nil))
801 }
802
803 if sc.LB != nil && *sc.LB != grpclbName { // "grpclb" is not a valid balancer option in service config.
804 if cc.curBalancerName == grpclbName {
805 // If current balancer is grpclb, there's at least one grpclb
806 // balancer address in the resolved list. Don't switch the balancer,
807 // but change the previous balancer name, so if a new resolved
808 // address list doesn't contain grpclb address, balancer will be
809 // switched to *sc.LB.
810 cc.preBalancerName = *sc.LB
811 } else {
812 cc.switchBalancer(*sc.LB)
813 cc.balancerWrapper.handleResolvedAddrs(cc.curAddresses, nil)
814 }
815 }
816
817 cc.mu.Unlock()
818 return nil
819}
820
821func (cc *ClientConn) resolveNow(o resolver.ResolveNowOption) {
822 cc.mu.RLock()
823 r := cc.resolverWrapper
824 cc.mu.RUnlock()
825 if r == nil {
826 return
827 }
828 go r.resolveNow(o)
829}
830
831// ResetConnectBackoff wakes up all subchannels in transient failure and causes
832// them to attempt another connection immediately. It also resets the backoff
833// times used for subsequent attempts regardless of the current state.
834//
835// In general, this function should not be used. Typical service or network
836// outages result in a reasonable client reconnection strategy by default.
837// However, if a previously unavailable network becomes available, this may be
838// used to trigger an immediate reconnect.
839//
840// This API is EXPERIMENTAL.
841func (cc *ClientConn) ResetConnectBackoff() {
842 cc.mu.Lock()
843 defer cc.mu.Unlock()
844 for ac := range cc.conns {
845 ac.resetConnectBackoff()
846 }
847}
848
849// Close tears down the ClientConn and all underlying connections.
850func (cc *ClientConn) Close() error {
851 defer cc.cancel()
852
853 cc.mu.Lock()
854 if cc.conns == nil {
855 cc.mu.Unlock()
856 return ErrClientConnClosing
857 }
858 conns := cc.conns
859 cc.conns = nil
860 cc.csMgr.updateState(connectivity.Shutdown)
861
862 rWrapper := cc.resolverWrapper
863 cc.resolverWrapper = nil
864 bWrapper := cc.balancerWrapper
865 cc.balancerWrapper = nil
866 cc.mu.Unlock()
867
868 cc.blockingpicker.close()
869
870 if rWrapper != nil {
871 rWrapper.close()
872 }
873 if bWrapper != nil {
874 bWrapper.close()
875 }
876
877 for ac := range conns {
878 ac.tearDown(ErrClientConnClosing)
879 }
880 if channelz.IsOn() {
881 ted := &channelz.TraceEventDesc{
882 Desc: "Channel Deleted",
883 Severity: channelz.CtINFO,
884 }
885 if cc.dopts.channelzParentID != 0 {
886 ted.Parent = &channelz.TraceEventDesc{
887 Desc: fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID),
888 Severity: channelz.CtINFO,
889 }
890 }
891 channelz.AddTraceEvent(cc.channelzID, ted)
892 // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
893 // the entity beng deleted, and thus prevent it from being deleted right away.
894 channelz.RemoveEntry(cc.channelzID)
895 }
896 return nil
897}
898
899// addrConn is a network connection to a given address.
900type addrConn struct {
901 ctx context.Context
902 cancel context.CancelFunc
903
904 cc *ClientConn
905 dopts dialOptions
906 acbw balancer.SubConn
907 scopts balancer.NewSubConnOptions
908
909 // transport is set when there's a viable transport (note: ac state may not be READY as LB channel
910 // health checking may require server to report healthy to set ac to READY), and is reset
911 // to nil when the current transport should no longer be used to create a stream (e.g. after GoAway
912 // is received, transport is closed, ac has been torn down).
913 transport transport.ClientTransport // The current transport.
914
915 mu sync.Mutex
916 addrIdx int // The index in addrs list to start reconnecting from.
917 curAddr resolver.Address // The current address.
918 addrs []resolver.Address // All addresses that the resolver resolved to.
919
920 // Use updateConnectivityState for updating addrConn's connectivity state.
921 state connectivity.State
922
923 tearDownErr error // The reason this addrConn is torn down.
924
925 backoffIdx int
926 // backoffDeadline is the time until which resetTransport needs to
927 // wait before increasing backoffIdx count.
928 backoffDeadline time.Time
929 // connectDeadline is the time by which all connection
930 // negotiations must complete.
931 connectDeadline time.Time
932
933 resetBackoff chan struct{}
934
935 channelzID int64 // channelz unique identification number
936 czData *channelzData
937
938 successfulHandshake bool
939
940 healthCheckEnabled bool
941}
942
943// Note: this requires a lock on ac.mu.
944func (ac *addrConn) updateConnectivityState(s connectivity.State) {
945 ac.state = s
946 if channelz.IsOn() {
947 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
948 Desc: fmt.Sprintf("Subchannel Connectivity change to %v", s),
949 Severity: channelz.CtINFO,
950 })
951 }
952}
953
954// adjustParams updates parameters used to create transports upon
955// receiving a GoAway.
956func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
957 switch r {
958 case transport.GoAwayTooManyPings:
959 v := 2 * ac.dopts.copts.KeepaliveParams.Time
960 ac.cc.mu.Lock()
961 if v > ac.cc.mkp.Time {
962 ac.cc.mkp.Time = v
963 }
964 ac.cc.mu.Unlock()
965 }
966}
967
// resetTransport makes sure that a healthy ac.transport exists.
//
// The transport will close itself when it encounters an error, or on GOAWAY, or on deadline waiting for handshake, or
// when the clientconn is closed. Each iteration creating a new transport will try a different address that the balancer
// assigned to the addrConn, until it has tried all addresses. Once it has tried all addresses, it will re-resolve to
// get a new address list. If an error is received, the list is re-resolved and the next reset attempt will try from the
// beginning. This method has backoff built in. The backoff amount starts at 0 and increases each time resolution occurs
// (addresses are exhausted). The backoff amount is reset to 0 each time a handshake is received.
//
// If the DialOption WithWaitForHandshake was set, resetTransport returns successfully only after handshake is received.
//
// Each loop iteration does the following:
//  1. If resolveNow is set, it asks the ClientConn to re-resolve.
//  2. It drops the old transport and reports TRANSIENT_FAILURE when warranted.
//  3. It advances to the next address via nextAddr, which may block for backoff.
//  4. It computes the backoff and connect deadlines.
//  5. It moves to CONNECTING.
//  6. It calls createTransport.
//
// The loop exits when ac is shut down, when nextAddr returns an error, or when
// createTransport returns nil. A non-nil error from createTransport moves on
// to the next address.
//
// NOTE(review): resolveNow is never cleared inside the loop. When it is true,
// every iteration triggers a re-resolution, not only the first one as the
// comment inside the loop suggests. Confirm whether that is intended.
func (ac *addrConn) resetTransport(resolveNow bool) {
	for {
		// If this is the first in a line of resets, we want to resolve immediately. The only other time we
		// want to reset is if we have tried all the addresses handed to us.
		if resolveNow {
			ac.mu.Lock()
			ac.cc.resolveNow(resolver.ResolveNowOption{})
			ac.mu.Unlock()
		}

		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			return
		}

		// The transport that was used before is no longer viable.
		ac.transport = nil
		// If the connection is READY, a failure must have occurred.
		// Otherwise, we consider this a transient failure when all of these hold:
		//  - we've exhausted all addresses,
		//  - we're in CONNECTING,
		//  - and it's not the very first addr to try. TODO(deklerk) find a better way to do this than checking ac.successfulHandshake
		if ac.state == connectivity.Ready || (ac.addrIdx == len(ac.addrs)-1 && ac.state == connectivity.Connecting && !ac.successfulHandshake) {
			ac.updateConnectivityState(connectivity.TransientFailure)
			ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
		}
		// Redundant with the assignment above; harmless.
		ac.transport = nil
		ac.mu.Unlock()

		// nextAddr must be called without ac.mu held. It may wait out the
		// backoff, and it returns an error if ac is closing or its context is done.
		if err := ac.nextAddr(); err != nil {
			return
		}

		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			return
		}

		backoffIdx := ac.backoffIdx
		backoffFor := ac.dopts.bs.Backoff(backoffIdx)

		// This will be the duration that dial gets to finish.
		dialDuration := getMinConnectTimeout()
		if backoffFor > dialDuration {
			// Give dial more time as we keep failing to connect.
			dialDuration = backoffFor
		}
		start := time.Now()
		connectDeadline := start.Add(dialDuration)
		// nextAddr waits until backoffDeadline once all addresses are exhausted.
		ac.backoffDeadline = start.Add(backoffFor)
		ac.connectDeadline = connectDeadline

		ac.mu.Unlock()

		// Pick up the ClientConn-wide keepalive params, which adjustParams may
		// have raised after a GOAWAY (too many pings).
		// NOTE(review): this writes ac.dopts while holding only cc.mu (read
		// lock), not ac.mu.
		ac.cc.mu.RLock()
		ac.dopts.copts.KeepaliveParams = ac.cc.mkp
		ac.cc.mu.RUnlock()

		ac.mu.Lock()

		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			return
		}

		if ac.state != connectivity.Connecting {
			ac.updateConnectivityState(connectivity.Connecting)
			ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
		}

		// Snapshot everything createTransport needs while ac.mu is held.
		addr := ac.addrs[ac.addrIdx]
		copts := ac.dopts.copts
		if ac.scopts.CredsBundle != nil {
			copts.CredsBundle = ac.scopts.CredsBundle
		}
		ac.mu.Unlock()

		if channelz.IsOn() {
			channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
				Severity: channelz.CtINFO,
			})
		}

		// On failure, loop around and try the next address.
		if err := ac.createTransport(backoffIdx, addr, copts, connectDeadline); err != nil {
			continue
		}

		return
	}
}
1071
1072// createTransport creates a connection to one of the backends in addrs.
1073func (ac *addrConn) createTransport(backoffNum int, addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
1074 oneReset := sync.Once{}
1075 skipReset := make(chan struct{})
1076 allowedToReset := make(chan struct{})
1077 prefaceReceived := make(chan struct{})
1078 onCloseCalled := make(chan struct{})
1079
1080 var prefaceMu sync.Mutex
1081 var serverPrefaceReceived bool
1082 var clientPrefaceWrote bool
1083
1084 hcCtx, hcCancel := context.WithCancel(ac.ctx)
1085
1086 onGoAway := func(r transport.GoAwayReason) {
1087 hcCancel()
1088 ac.mu.Lock()
1089 ac.adjustParams(r)
1090 ac.mu.Unlock()
1091 select {
1092 case <-skipReset: // The outer resetTransport loop will handle reconnection.
1093 return
1094 case <-allowedToReset: // We're in the clear to reset.
1095 go oneReset.Do(func() { ac.resetTransport(false) })
1096 }
1097 }
1098
1099 prefaceTimer := time.NewTimer(connectDeadline.Sub(time.Now()))
1100
1101 onClose := func() {
1102 hcCancel()
1103 close(onCloseCalled)
1104 prefaceTimer.Stop()
1105
1106 select {
1107 case <-skipReset: // The outer resetTransport loop will handle reconnection.
1108 return
1109 case <-allowedToReset: // We're in the clear to reset.
1110 oneReset.Do(func() { ac.resetTransport(false) })
1111 }
1112 }
1113
1114 target := transport.TargetInfo{
1115 Addr: addr.Addr,
1116 Metadata: addr.Metadata,
1117 Authority: ac.cc.authority,
1118 }
1119
1120 onPrefaceReceipt := func() {
1121 close(prefaceReceived)
1122 prefaceTimer.Stop()
1123
1124 // TODO(deklerk): optimization; does anyone else actually use this lock? maybe we can just remove it for this scope
1125 ac.mu.Lock()
1126
1127 prefaceMu.Lock()
1128 serverPrefaceReceived = true
1129 if clientPrefaceWrote {
1130 ac.successfulHandshake = true
1131 }
1132 prefaceMu.Unlock()
1133
1134 ac.mu.Unlock()
1135 }
1136
1137 connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
1138 defer cancel()
1139 if channelz.IsOn() {
1140 copts.ChannelzParentID = ac.channelzID
1141 }
1142
1143 newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
1144
1145 if err == nil {
1146 prefaceMu.Lock()
1147 clientPrefaceWrote = true
1148 if serverPrefaceReceived || ac.dopts.reqHandshake == envconfig.RequireHandshakeOff {
1149 ac.successfulHandshake = true
1150 }
1151 prefaceMu.Unlock()
1152
1153 if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn {
1154 select {
1155 case <-prefaceTimer.C:
1156 // We didn't get the preface in time.
1157 newTr.Close()
1158 err = errors.New("timed out waiting for server handshake")
1159 case <-prefaceReceived:
1160 // We got the preface - huzzah! things are good.
1161 case <-onCloseCalled:
1162 // The transport has already closed - noop.
1163 close(allowedToReset)
1164 return nil
1165 }
1166 } else if ac.dopts.reqHandshake == envconfig.RequireHandshakeHybrid {
1167 go func() {
1168 select {
1169 case <-prefaceTimer.C:
1170 // We didn't get the preface in time.
1171 newTr.Close()
1172 case <-prefaceReceived:
1173 // We got the preface just in the nick of time - huzzah!
1174 case <-onCloseCalled:
1175 // The transport has already closed - noop.
1176 }
1177 }()
1178 }
1179 }
1180
1181 if err != nil {
1182 // newTr is either nil, or closed.
1183 ac.cc.blockingpicker.updateConnectionError(err)
1184 ac.mu.Lock()
1185 if ac.state == connectivity.Shutdown {
1186 // ac.tearDown(...) has been invoked.
1187 ac.mu.Unlock()
1188
1189 // We don't want to reset during this close because we prefer to kick out of this function and let the loop
1190 // in resetTransport take care of reconnecting.
1191 close(skipReset)
1192
1193 return errConnClosing
1194 }
1195 ac.mu.Unlock()
1196 grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
1197
1198 // We don't want to reset during this close because we prefer to kick out of this function and let the loop
1199 // in resetTransport take care of reconnecting.
1200 close(skipReset)
1201
1202 return err
1203 }
1204
1205 // Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport.
1206 ac.mu.Lock()
1207 if ac.state == connectivity.Shutdown {
1208 ac.mu.Unlock()
1209 close(skipReset)
1210 newTr.Close()
1211 return nil
1212 }
1213 ac.transport = newTr
1214 ac.mu.Unlock()
1215
1216 healthCheckConfig := ac.cc.healthCheckConfig()
1217 // LB channel health checking is only enabled when all the four requirements below are met:
1218 // 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
1219 // 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
1220 // 3. a service config with non-empty healthCheckConfig field is provided,
1221 // 4. the current load balancer allows it.
1222 if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
1223 if internal.HealthCheckFunc != nil {
1224 go ac.startHealthCheck(hcCtx, newTr, addr, healthCheckConfig.ServiceName)
1225 close(allowedToReset)
1226 return nil
1227 }
1228 // TODO: add a link to the health check doc in the error message.
1229 grpclog.Error("the client side LB channel health check function has not been set.")
1230 }
1231
1232 // No LB channel health check case
1233 ac.mu.Lock()
1234
1235 if ac.state == connectivity.Shutdown {
1236 ac.mu.Unlock()
1237
1238 // unblock onGoAway/onClose callback.
1239 close(skipReset)
1240 return errConnClosing
1241 }
1242
1243 ac.updateConnectivityState(connectivity.Ready)
1244 ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
1245 ac.curAddr = addr
1246
1247 ac.mu.Unlock()
1248
1249 // Ok, _now_ we will finally let the transport reset if it encounters a closable error. Without this, the reader
1250 // goroutine failing races with all the code in this method that sets the connection to "ready".
1251 close(allowedToReset)
1252 return nil
1253}
1254
1255func (ac *addrConn) startHealthCheck(ctx context.Context, newTr transport.ClientTransport, addr resolver.Address, serviceName string) {
1256 // Set up the health check helper functions
1257 newStream := func() (interface{}, error) {
1258 return ac.newClientStream(ctx, &StreamDesc{ServerStreams: true}, "/grpc.health.v1.Health/Watch", newTr)
1259 }
1260 firstReady := true
1261 reportHealth := func(ok bool) {
1262 ac.mu.Lock()
1263 defer ac.mu.Unlock()
1264 if ac.transport != newTr {
1265 return
1266 }
1267 if ok {
1268 if firstReady {
1269 firstReady = false
1270 ac.curAddr = addr
1271 }
1272 if ac.state != connectivity.Ready {
1273 ac.updateConnectivityState(connectivity.Ready)
1274 ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
1275 }
1276 } else {
1277 if ac.state != connectivity.TransientFailure {
1278 ac.updateConnectivityState(connectivity.TransientFailure)
1279 ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
1280 }
1281 }
1282 }
1283
1284 err := internal.HealthCheckFunc(ctx, newStream, reportHealth, serviceName)
1285 if err != nil {
1286 if status.Code(err) == codes.Unimplemented {
1287 if channelz.IsOn() {
1288 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
1289 Desc: "Subchannel health check is unimplemented at server side, thus health check is disabled",
1290 Severity: channelz.CtError,
1291 })
1292 }
1293 grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled")
1294 } else {
1295 grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err)
1296 }
1297 }
1298}
1299
1300// nextAddr increments the addrIdx if there are more addresses to try. If
1301// there are no more addrs to try it will re-resolve, set addrIdx to 0, and
1302// increment the backoffIdx.
1303//
1304// nextAddr must be called without ac.mu being held.
1305func (ac *addrConn) nextAddr() error {
1306 ac.mu.Lock()
1307
1308 // If a handshake has been observed, we want the next usage to start at
1309 // index 0 immediately.
1310 if ac.successfulHandshake {
1311 ac.successfulHandshake = false
1312 ac.backoffDeadline = time.Time{}
1313 ac.connectDeadline = time.Time{}
1314 ac.addrIdx = 0
1315 ac.backoffIdx = 0
1316 ac.mu.Unlock()
1317 return nil
1318 }
1319
1320 if ac.addrIdx < len(ac.addrs)-1 {
1321 ac.addrIdx++
1322 ac.mu.Unlock()
1323 return nil
1324 }
1325
1326 ac.addrIdx = 0
1327 ac.backoffIdx++
1328
1329 if ac.state == connectivity.Shutdown {
1330 ac.mu.Unlock()
1331 return errConnClosing
1332 }
1333 ac.cc.resolveNow(resolver.ResolveNowOption{})
1334 backoffDeadline := ac.backoffDeadline
1335 b := ac.resetBackoff
1336 ac.mu.Unlock()
1337 timer := time.NewTimer(backoffDeadline.Sub(time.Now()))
1338 select {
1339 case <-timer.C:
1340 case <-b:
1341 timer.Stop()
1342 case <-ac.ctx.Done():
1343 timer.Stop()
1344 return ac.ctx.Err()
1345 }
1346 return nil
1347}
1348
1349func (ac *addrConn) resetConnectBackoff() {
1350 ac.mu.Lock()
1351 close(ac.resetBackoff)
1352 ac.backoffIdx = 0
1353 ac.resetBackoff = make(chan struct{})
1354 ac.mu.Unlock()
1355}
1356
1357// getReadyTransport returns the transport if ac's state is READY.
1358// Otherwise it returns nil, false.
1359// If ac's state is IDLE, it will trigger ac to connect.
1360func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
1361 ac.mu.Lock()
1362 if ac.state == connectivity.Ready && ac.transport != nil {
1363 t := ac.transport
1364 ac.mu.Unlock()
1365 return t, true
1366 }
1367 var idle bool
1368 if ac.state == connectivity.Idle {
1369 idle = true
1370 }
1371 ac.mu.Unlock()
1372 // Trigger idle ac to connect.
1373 if idle {
1374 ac.connect()
1375 }
1376 return nil, false
1377}
1378
// tearDown starts to tear down the addrConn.
//
// If ac is already Shutdown, this is a no-op. Otherwise it does the following:
//   - marks ac Shutdown,
//   - cancels ac's context,
//   - records err as the teardown reason,
//   - notifies the ClientConn of the state change,
//   - clears the current transport and address.
//
// If err is errConnDrain and a transport exists, that transport is closed
// gracefully. When channelz is enabled, a deletion trace event is added and
// the channelz entry is removed.
//
// NOTE(review): for other values of err, curTr is not closed here. It is
// presumably closed when the ClientConn's context (passed to
// NewClientTransport) is cancelled. Confirm.
//
// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
// some edge cases (e.g., the caller opens and closes many addrConn's in a
// tight loop).
// tearDown doesn't remove ac from ac.cc.conns.
func (ac *addrConn) tearDown(err error) {
	ac.mu.Lock()
	if ac.state == connectivity.Shutdown {
		ac.mu.Unlock()
		return
	}
	curTr := ac.transport
	ac.transport = nil
	// We have to set the state to Shutdown before anything else to prevent races
	// between setting the state and logic that waits on context cancellation / etc.
	ac.updateConnectivityState(connectivity.Shutdown)
	ac.cancel()
	ac.tearDownErr = err
	ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
	ac.curAddr = resolver.Address{}
	if err == errConnDrain && curTr != nil {
		// GracefulClose(...) may be executed multiple times when
		// i) receiving multiple GoAway frames from the server; or
		// ii) there are concurrent name resolver/Balancer triggered
		// address removal and GoAway.
		// We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
		ac.mu.Unlock()
		curTr.GracefulClose()
		ac.mu.Lock()
	}
	if channelz.IsOn() {
		channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
			Desc:     "Subchannel Deleted",
			Severity: channelz.CtINFO,
			Parent: &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID),
				Severity: channelz.CtINFO,
			},
		})
		// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
		// the entity being deleted, and thus prevent it from being deleted right away.
		channelz.RemoveEntry(ac.channelzID)
	}
	ac.mu.Unlock()
}
1424
1425func (ac *addrConn) getState() connectivity.State {
1426 ac.mu.Lock()
1427 defer ac.mu.Unlock()
1428 return ac.state
1429}
1430
1431func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
1432 ac.mu.Lock()
1433 addr := ac.curAddr.Addr
1434 ac.mu.Unlock()
1435 return &channelz.ChannelInternalMetric{
1436 State: ac.getState(),
1437 Target: addr,
1438 CallsStarted: atomic.LoadInt64(&ac.czData.callsStarted),
1439 CallsSucceeded: atomic.LoadInt64(&ac.czData.callsSucceeded),
1440 CallsFailed: atomic.LoadInt64(&ac.czData.callsFailed),
1441 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)),
1442 }
1443}
1444
1445func (ac *addrConn) incrCallsStarted() {
1446 atomic.AddInt64(&ac.czData.callsStarted, 1)
1447 atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano())
1448}
1449
1450func (ac *addrConn) incrCallsSucceeded() {
1451 atomic.AddInt64(&ac.czData.callsSucceeded, 1)
1452}
1453
1454func (ac *addrConn) incrCallsFailed() {
1455 atomic.AddInt64(&ac.czData.callsFailed, 1)
1456}
1457
// retryThrottler is a token bucket that decides whether retries are allowed.
// Each retry attempt spends one token, and each successful RPC earns back
// ratio tokens, up to max. Retries are throttled while the pool is at or
// below thresh. A nil *retryThrottler disables throttling.
type retryThrottler struct {
	max    float64 // upper bound on tokens
	thresh float64 // throttle while tokens <= thresh
	ratio  float64 // tokens earned per successful RPC

	mu     sync.Mutex
	tokens float64 // TODO(dfawley): replace with atomic and remove lock.
}

// throttle subtracts a retry token from the pool and returns whether a retry
// should be throttled (disallowed) based upon the retry throttling policy in
// the service config. The pool never drops below zero.
func (rt *retryThrottler) throttle() bool {
	if rt == nil {
		return false // no throttling configured
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()
	remaining := rt.tokens - 1
	if remaining < 0 {
		remaining = 0
	}
	rt.tokens = remaining
	return remaining <= rt.thresh
}

// successfulRPC credits the pool with ratio tokens, capped at max.
func (rt *retryThrottler) successfulRPC() {
	if rt == nil {
		return
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()
	refilled := rt.tokens + rt.ratio
	if refilled > rt.max {
		refilled = rt.max
	}
	rt.tokens = refilled
}
1494
1495type channelzChannel struct {
1496 cc *ClientConn
1497}
1498
1499func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
1500 return c.cc.channelzMetric()
1501}
1502
var (
	// ErrClientConnTimeout indicates that the ClientConn cannot establish the
	// underlying connections within the specified timeout.
	//
	// Deprecated: This error is never returned by grpc and should not be
	// referenced by users.
	ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
)